# Modify.py
#%% Import all required packages
import os
import random

import numpy as np
import pandas as pd
import deepquantum as dq
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from torchvision.datasets import FashionMNIST
from tqdm import tqdm
from torch.utils.data import DataLoader
from multiprocessing import freeze_support

#%% Set random seeds for reproducibility
def seed_torch(seed=1024):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True

#%% Accuracy metric
def calculate_score(y_true, y_preds):
    preds_prob = torch.softmax(y_preds, dim=1)
    preds_class = torch.argmax(preds_prob, dim=1)
    correct = (preds_class == y_true).float()
    return (correct.sum() / len(correct)).cpu().numpy()

#%% Training and validation
def train_model(model, criterion, optimizer, scheduler, train_loader, valid_loader,
                num_epochs, device, save_path):
    model.to(device)
    best_acc = 0.0
    metrics = {'epoch': [], 'train_acc': [], 'valid_acc': [],
               'train_loss': [], 'valid_loss': []}
    for epoch in range(1, num_epochs + 1):
        # --- Training phase ---
        model.train()
        running_loss, running_acc = 0.0, 0.0
        for imgs, labels in train_loader:
            imgs, labels = imgs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(imgs)
            loss = criterion(outputs, labels)
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            optimizer.step()
            running_loss += loss.item()
            running_acc += calculate_score(labels, outputs)
        train_loss = running_loss / len(train_loader)
        train_acc = running_acc / len(train_loader)
        scheduler.step()

        # --- Validation phase ---
        model.eval()
        val_loss, val_acc = 0.0, 0.0
        with torch.no_grad():
            for imgs, labels in valid_loader:
                imgs, labels = imgs.to(device), labels.to(device)
                outputs = model(imgs)
                loss = criterion(outputs, labels)
                val_loss += loss.item()
                val_acc += calculate_score(labels, outputs)
        valid_loss = val_loss / len(valid_loader)
        valid_acc = val_acc / len(valid_loader)

        metrics['epoch'].append(epoch)
        metrics['train_loss'].append(train_loss)
        metrics['valid_loss'].append(valid_loss)
        metrics['train_acc'].append(train_acc)
        metrics['valid_acc'].append(valid_acc)

        tqdm.write(f"[{save_path}] Epoch {epoch}/{num_epochs} "
                   f"Train Acc: {train_acc:.4f} Valid Acc: {valid_acc:.4f}")

        # Keep the checkpoint with the best validation accuracy
        if valid_acc > best_acc:
            best_acc = valid_acc
            torch.save(model.state_dict(), save_path)
    return model, metrics

#%% Test function
def test_model(model, test_loader, device):
    model.to(device).eval()
    acc = 0.0
    with torch.no_grad():
        for imgs, labels in test_loader:
            imgs, labels = imgs.to(device), labels.to(device)
            outputs = model(imgs)
            acc += calculate_score(labels, outputs)
    acc /= len(test_loader)
    print(f"Test Accuracy: {acc:.4f}")
    return acc
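
#%% (Optional) patch-extraction illustration -- a small sketch added here, not part of the
# original script. It shows the tensor mechanics the quantum layers below rely on: an 18x18
# image is cut into 9x9 non-overlapping 2x2 patches, and each patch is flattened into 4
# values that a 4-qubit circuit then encodes as rotation angles.
def show_patch_extraction():
    x = torch.arange(18 * 18, dtype=torch.float32).reshape(1, 1, 18, 18)
    k, s = 2, 2
    patches = x.unfold(2, k, s).unfold(3, k, s)   # shape (1, 1, 9, 9, 2, 2)
    flat = patches.reshape(-1, k * k)             # shape (81, 4): one row per patch
    print(patches.shape, flat.shape)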
#%% Quantum convolutional layers and models
singlegate_list = ['rx', 'ry', 'rz', 's', 't', 'p', 'u3']
doublegate_list = ['rxx', 'ryy', 'rzz', 'swap', 'cnot', 'cp', 'ch', 'cu', 'ct', 'cz']

class RandomQuantumConvolutionalLayer(nn.Module):
    def __init__(self, nqubit, num_circuits, seed=1024):
        super().__init__()
        random.seed(seed)
        self.nqubit = nqubit
        self.cirs = nn.ModuleList([self.circuit(nqubit) for _ in range(num_circuits)])

    def circuit(self, nqubit):
        cir = dq.QubitCircuit(nqubit)
        cir.rxlayer(encode=True)  # encode the flattened 2x2 patch as rotation angles
        cir.barrier()
        for _ in range(3):
            for i in range(nqubit):
                getattr(cir, random.choice(singlegate_list))(i)
            c, t = random.sample(range(nqubit), 2)
            gate = random.choice(doublegate_list)
            if gate[0] in ['r', 's']:      # rxx/ryy/rzz/swap take a wire list
                getattr(cir, gate)([c, t])
            else:                          # controlled gates take (control, target)
                getattr(cir, gate)(c, t)
            cir.barrier()
        cir.observable(0)  # read out one expectation value per patch
        return cir

    def forward(self, x):
        k, s = 2, 2
        # Split the image into non-overlapping 2x2 patches: (B, 1, w, w, 2, 2)
        x_unf = x.unfold(2, k, s).unfold(3, k, s)
        w = (x.shape[-1] - k) // s + 1
        x_r = x_unf.reshape(-1, self.nqubit)  # one row of 4 angles per patch
        exps = []
        for cir in self.cirs:
            cir(x_r)
            exps.append(cir.expectation())
        # One output channel per circuit: (B, num_circuits, w, w)
        exps = torch.stack(exps, 1).reshape(x.size(0), len(self.cirs), w, w)
        return exps

class RandomQCCNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv = nn.Sequential(
            RandomQuantumConvolutionalLayer(4, 3, seed=1024),  # (B, 3, 9, 9) for 18x18 input
            nn.ReLU(),
            nn.MaxPool2d(2, 1),
            nn.Conv2d(3, 6, 2, 1),
            nn.ReLU(),
            nn.MaxPool2d(2, 1)
        )
        self.fc = nn.Sequential(
            nn.Linear(6 * 6 * 6, 1024),
            nn.Dropout(0.4),
            nn.Linear(1024, 10)
        )

    def forward(self, x):
        x = self.conv(x)
        x = x.view(x.size(0), -1)
        return self.fc(x)

class ParameterizedQuantumConvolutionalLayer(nn.Module):
    def __init__(self, nqubit, num_circuits):
        super().__init__()
        self.nqubit = nqubit
        self.cirs = nn.ModuleList([self.circuit(nqubit) for _ in range(num_circuits)])

    def circuit(self, nqubit):
        cir = dq.QubitCircuit(nqubit)
        cir.rxlayer(encode=True)
        cir.barrier()
        for _ in range(4):
            cir.rylayer()     # trainable single-qubit rotations
            cir.cnot_ring()   # ring of entangling CNOTs
            cir.barrier()
        cir.observable(0)
        return cir

    def forward(self, x):
        k, s = 2, 2
        x_unf = x.unfold(2, k, s).unfold(3, k, s)
        w = (x.shape[-1] - k) // s + 1
        x_r = x_unf.reshape(-1, self.nqubit)
        exps = []
        for cir in self.cirs:
            cir(x_r)
            exps.append(cir.expectation())
        exps = torch.stack(exps, 1).reshape(x.size(0), len(self.cirs), w, w)
        return exps

class QCCNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv = nn.Sequential(
            ParameterizedQuantumConvolutionalLayer(4, 3),
            nn.ReLU(),
            nn.MaxPool2d(2, 1)
        )
        self.fc = nn.Sequential(
            nn.Linear(8 * 8 * 3, 128),
            nn.Dropout(0.4),
            nn.ReLU(),
            nn.Linear(128, 10)
        )

    def forward(self, x):
        x = self.conv(x)
        x = x.view(x.size(0), -1)
        return self.fc(x)

def vgg_block(in_c, out_c, n_convs):
    layers = [nn.Conv2d(in_c, out_c, 3, padding=1), nn.ReLU()]
    for _ in range(n_convs - 1):
        layers += [nn.Conv2d(out_c, out_c, 3, padding=1), nn.ReLU()]
    layers.append(nn.MaxPool2d(2, 2))
    return nn.Sequential(*layers)

# Classical baseline. It outputs raw logits: the trailing nn.Softmax of the original
# definition is dropped because CrossEntropyLoss already applies softmax internally.
VGG = nn.Sequential(
    vgg_block(1, 10, 3),
    vgg_block(10, 16, 3),
    nn.Flatten(),
    nn.Linear(16 * 4 * 4, 120), nn.Sigmoid(),
    nn.Linear(120, 84), nn.Sigmoid(),
    nn.Linear(84, 10)
)
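
#%% (Optional) shape sanity check -- a minimal sketch, not part of the original script.
# It assumes the 18x18 single-channel inputs produced by the Resize((18, 18)) transforms in
# the main block, and the same circuit-call/expectation pattern used in forward() above.
# Call smoke_test_shapes() manually before launching a full training run.
def smoke_test_shapes():
    x = torch.randn(2, 1, 18, 18)          # dummy batch of two 18x18 grayscale images
    for net in (RandomQCCNN(), QCCNN(), VGG):
        out = net(x)                       # every model should emit 10 class logits
        assert out.shape == (2, 10), f"unexpected output shape {tuple(out.shape)}"
    print("All models map (2, 1, 18, 18) -> (2, 10).")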
#%% Main entry point
if __name__ == '__main__':
    freeze_support()

    # Data augmentation and loading
    train_transform = transforms.Compose([
        transforms.Resize((18, 18)),
        transforms.RandomRotation(15),
        transforms.RandomHorizontalFlip(),
        transforms.RandomVerticalFlip(0.3),
        transforms.ToTensor(),
        transforms.Normalize((0.5,), (0.5,))
    ])
    eval_transform = transforms.Compose([
        transforms.Resize((18, 18)),
        transforms.ToTensor(),
        transforms.Normalize((0.5,), (0.5,))
    ])
    full_train = FashionMNIST(root='./data/notebook2', train=True, transform=train_transform, download=True)
    test_dataset = FashionMNIST(root='./data/notebook2', train=False, transform=eval_transform, download=True)
    train_size = int(0.8 * len(full_train))
    valid_size = len(full_train) - train_size
    train_ds, valid_ds = torch.utils.data.random_split(full_train, [train_size, valid_size])
    # Give the validation subset its own, un-augmented copy of the training data; overriding
    # full_train.transform directly would also switch off augmentation for train_ds.
    valid_ds.dataset = FashionMNIST(root='./data/notebook2', train=True,
                                    transform=eval_transform, download=False)

    batch_size = 128
    train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True, drop_last=True, num_workers=4)
    valid_loader = DataLoader(valid_ds, batch_size=batch_size, shuffle=False, drop_last=True, num_workers=4)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, drop_last=False, num_workers=4)

    # Three model configurations: (model, learning rate, checkpoint path)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    models = {
        'random_qccnn': (RandomQCCNN(), 1e-3, './data/notebook2/random_qccnn_best.pt'),
        'qccnn': (QCCNN(), 1e-4, './data/notebook2/qccnn_best.pt'),
        'vgg': (VGG, 1e-4, './data/notebook2/vgg_best.pt')
    }

    all_metrics = {}
    for name, (model, lr, save_path) in models.items():
        seed_torch(1024)
        model = model.to(device)
        criterion = nn.CrossEntropyLoss()
        optimizer = optim.AdamW(model.parameters(), lr=lr, weight_decay=1e-4)
        scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=50)
        print(f"\n=== Training {name} ===")
        _, metrics = train_model(
            model, criterion, optimizer, scheduler,
            train_loader, valid_loader,
            num_epochs=50, device=device, save_path=save_path
        )
        all_metrics[name] = metrics
        pd.DataFrame(metrics).to_csv(f'./data/notebook2/{name}_metrics.csv', index=False)

    # Testing and visualization
    plt.figure(figsize=(12, 5))
    for i, (name, metrics) in enumerate(all_metrics.items(), 1):
        model, _, save_path = models[name]
        best_model = model.to(device)
        best_model.load_state_dict(torch.load(save_path, map_location=device))
        print(f"\n--- Testing {name} ---")
        test_model(best_model, test_loader, device)
        plt.subplot(1, 3, i)
        plt.plot(metrics['epoch'], metrics['valid_acc'], label=f'{name} Val Acc')
        plt.xlabel('Epoch')
        plt.ylabel('Valid Acc')
        plt.title(name)
        plt.legend()
    plt.tight_layout()
    plt.show()

    # Parameter counts
    def count_parameters(m):
        return sum(p.numel() for p in m.parameters() if p.requires_grad)

    print("\nParameter Counts:")
    for name, (model, _, _) in models.items():
        print(f"{name}: {count_parameters(model)}")
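
    # (Optional) loss-curve comparison -- a short sketch added here, not part of the original
    # script; it only re-plots the train/valid losses already collected in all_metrics above.
    plt.figure(figsize=(12, 5))
    for i, (name, metrics) in enumerate(all_metrics.items(), 1):
        plt.subplot(1, 3, i)
        plt.plot(metrics['epoch'], metrics['train_loss'], label='Train Loss')
        plt.plot(metrics['epoch'], metrics['valid_loss'], label='Valid Loss')
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.title(name)
        plt.legend()
    plt.tight_layout()
    plt.show()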