#%%
# First, import all the required packages:
import os
import random

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from tqdm import tqdm
from torch.utils.data import DataLoader
from torchvision.datasets import FashionMNIST

import deepquantum as dq
import matplotlib.pyplot as plt


def seed_torch(seed=1024):
    """
    Set random seeds for reproducibility.

    Args:
        seed (int): Random seed number to use. Default is 1024.
    """
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    # Seed all GPUs with the same seed if using multi-GPU
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True

seed_torch(42)

#%%
def calculate_score(y_true, y_preds):
    # Convert the model outputs into a probability distribution
    preds_prob = torch.softmax(y_preds, dim=1)
    # Take the predicted class (the one with the highest probability)
    preds_class = torch.argmax(preds_prob, dim=1)
    # Compute the accuracy
    correct = (preds_class == y_true).float()
    accuracy = correct.sum() / len(correct)
    return accuracy.cpu().numpy()


def train_model(model, criterion, optimizer, train_loader, valid_loader, num_epochs, device):
    """
    Train and validate the model.

    Args:
        model (torch.nn.Module): Model to train.
        criterion (torch.nn.Module): Loss function.
        optimizer (torch.optim.Optimizer): Optimizer.
        train_loader (torch.utils.data.DataLoader): Training data loader.
        valid_loader (torch.utils.data.DataLoader): Validation data loader.
        num_epochs (int): Number of training epochs.
        device (torch.device): Device to train on.

    Returns:
        model (torch.nn.Module): Trained model.
        metrics (dict): Per-epoch loss and accuracy records.
    """
    train_loss_list = []
    valid_loss_list = []
    train_acc_list = []
    valid_acc_list = []

    best_valid_acc = 0.0
    patience = 10  # early-stopping patience
    counter = 0    # epochs without improvement
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', factor=0.5, patience=10)

    with tqdm(total=num_epochs) as pbar:
        for epoch in range(num_epochs):
            # Training phase (switch back to train mode after the previous epoch's validation)
            model.train()
            train_loss = 0.0
            train_acc = 0.0
            for images, labels in train_loader:
                images = images.to(device)
                labels = labels.to(device)
                optimizer.zero_grad()
                outputs = model(images)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()
                train_loss += loss.item()
                train_acc += calculate_score(labels, outputs)
            train_loss /= len(train_loader)
            train_acc /= len(train_loader)

            # Validation phase
            model.eval()
            valid_loss = 0.0
            valid_acc = 0.0
            with torch.no_grad():
                for images, labels in valid_loader:
                    images = images.to(device)
                    labels = labels.to(device)
                    outputs = model(images)
                    loss = criterion(outputs, labels)
                    valid_loss += loss.item()
                    valid_acc += calculate_score(labels, outputs)
            valid_loss /= len(valid_loader)
            valid_acc /= len(valid_loader)

            # Update the learning-rate scheduler
            scheduler.step(valid_acc)

            # Early stopping
            if valid_acc > best_valid_acc:
                best_valid_acc = valid_acc
                torch.save(model.state_dict(), './data/notebook2/best_model.pt')
                counter = 0
            else:
                counter += 1
                if counter >= patience:
                    print(f'Early stopping at epoch {epoch+1} due to no improvement in validation accuracy.')
                    break

            pbar.set_description(f"Train loss: {train_loss:.3f} Valid Acc: {valid_acc:.3f}")
            pbar.update()

            train_loss_list.append(train_loss)
            valid_loss_list.append(valid_loss)
            train_acc_list.append(train_acc)
            valid_acc_list.append(valid_acc)

    # Restore the best model weights
    if os.path.exists('./data/notebook2/best_model.pt'):
        model.load_state_dict(torch.load('./data/notebook2/best_model.pt'))

    # Build metrics so that all arrays have the same length
    metrics = {
        'epoch': list(range(1, len(train_loss_list) + 1)),
        'train_acc': train_acc_list,
        'valid_acc': valid_acc_list,
        'train_loss': train_loss_list,
        'valid_loss': valid_loss_list
    }
    return model, metrics
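#%%
# A minimal sanity check for calculate_score on a fabricated batch (illustrative
# values only, not drawn from the dataset): three samples, two classes.
logits = torch.tensor([[2.0, -1.0],   # predicted class 0
                       [0.1, 0.9],    # predicted class 1
                       [1.5, 0.2]])   # predicted class 0
targets = torch.tensor([0, 1, 1])     # the last prediction is wrong
print(calculate_score(targets, logits))  # expected: 2/3 ≈ 0.667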
#%%
def test_model(model, test_loader, device):
    model.eval()
    test_acc = 0.0
    with torch.no_grad():
        for images, labels in test_loader:
            images = images.to(device)
            labels = labels.to(device)
            outputs = model(images)
            test_acc += calculate_score(labels, outputs)
    test_acc /= len(test_loader)
    print(f'Test Acc: {test_acc:.3f}')
    return test_acc

#%%
# Define the image transforms
trans1 = transforms.Compose([
    transforms.RandomHorizontalFlip(),                      # random horizontal flip
    transforms.RandomRotation(10),                          # random rotation of up to ±10 degrees
    transforms.ColorJitter(brightness=0.2, contrast=0.2),   # color jitter
    transforms.Resize((18, 18)),                            # resize to 18x18
    transforms.ToTensor(),                                  # convert to tensor
    transforms.Normalize((0.5,), (0.5,))                    # normalize to [-1, 1]
])

trans2 = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.ColorJitter(brightness=0.2, contrast=0.2),
    transforms.Resize((16, 16)),                            # resize to 16x16
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))
])

# Evaluation transform: no random augmentation at test time
trans_eval = transforms.Compose([
    transforms.Resize((18, 18)),
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))
])

train_dataset = FashionMNIST(root='./data/notebook2', train=False, transform=trans1, download=True)
test_dataset = FashionMNIST(root='./data/notebook2', train=False, transform=trans_eval, download=True)

# Split ratio between training and validation sets
train_ratio = 0.8  # 80% for training, 20% for validation
valid_ratio = 0.2

total_samples = len(train_dataset)
train_size = int(train_ratio * total_samples)
valid_size = int(valid_ratio * total_samples)

# Split into training and validation sets
train_dataset, valid_dataset = torch.utils.data.random_split(train_dataset, [train_size, valid_size])

# Build the data loaders
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True, drop_last=True)
valid_loader = DataLoader(valid_dataset, batch_size=64, shuffle=False, drop_last=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False, drop_last=True)

#%%
singlegate_list = ['rx', 'ry', 'rz', 's', 't', 'p', 'u3']
doublegate_list = ['rxx', 'ryy', 'rzz', 'swap', 'cnot', 'cp', 'ch', 'cu', 'ct', 'cz']

#%%
# Random quantum convolutional layer
class RandomQuantumConvolutionalLayer(nn.Module):
    def __init__(self, nqubit, num_circuits, seed: int = 1024):
        super(RandomQuantumConvolutionalLayer, self).__init__()
        random.seed(seed)
        self.nqubit = nqubit
        self.cirs = nn.ModuleList([self.circuit(nqubit) for _ in range(num_circuits)])

    def circuit(self, nqubit):
        cir = dq.QubitCircuit(nqubit)
        cir.rxlayer(encode=True)  # data-encoding step; it does not affect the circuit structure from the original paper
        cir.barrier()
        for iter in range(3):
            for i in range(nqubit):
                singlegate = random.choice(singlegate_list)
                getattr(cir, singlegate)(i)
            control_bit, target_bit = random.sample(range(0, nqubit - 1), 2)
            doublegate = random.choice(doublegate_list)
            if doublegate[0] in ['r', 's']:
                getattr(cir, doublegate)([control_bit, target_bit])
            else:
                getattr(cir, doublegate)(control_bit, target_bit)
            cir.barrier()
        cir.observable(0)
        return cir

    def forward(self, x):
        kernel_size, stride = 2, 2
        # [64, 1, 18, 18] -> [64, 1, 9, 18, 2] -> [64, 1, 9, 9, 2, 2]
        x_unfold = x.unfold(2, kernel_size, stride).unfold(3, kernel_size, stride)
        w = int((x.shape[-1] - kernel_size) / stride + 1)
        x_reshape = x_unfold.reshape(-1, self.nqubit)
        exps = []
        for cir in self.cirs:  # one circuit per output channel
            cir(x_reshape)
            exp = cir.expectation()
            exps.append(exp)
        exps = torch.stack(exps, dim=1)
        exps = exps.reshape(x.shape[0], len(self.cirs), w, w)
        return exps

#%%
net = RandomQuantumConvolutionalLayer(nqubit=4, num_circuits=3, seed=1024)
net.cirs[0].draw()
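#%%
# A quick shape check of the patch extraction used in forward (a sketch with a
# random tensor, assuming the 18x18 inputs produced by trans1): each 2x2 patch
# becomes one row of 4 values, one value per qubit.
x = torch.randn(64, 1, 18, 18)
patches = x.unfold(2, 2, 2).unfold(3, 2, 2)
print(patches.shape)            # -> [64, 1, 9, 9, 2, 2]
flat = patches.reshape(-1, 4)
print(flat.shape)               # -> [5184, 4] (64 images x 81 patches)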
#%%
# Hybrid model based on the random quantum convolutional layer
class RandomQCCNN(nn.Module):
    def __init__(self):
        super(RandomQCCNN, self).__init__()
        self.conv = nn.Sequential(
            RandomQuantumConvolutionalLayer(nqubit=4, num_circuits=3, seed=1024),  # num_circuits=3 means the quanv1 layer uses only 3 quantum convolution kernels
            nn.BatchNorm2d(3),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=1),
            nn.Conv2d(3, 6, kernel_size=2, stride=1),
            nn.BatchNorm2d(6),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=1)
        )
        self.fc = nn.Sequential(
            nn.Linear(6 * 6 * 6, 1024),  # 6 channels x 6 x 6 after the conv/pool stages (18 -> 9 -> 8 -> 7 -> 6)
            nn.BatchNorm1d(1024),
            nn.Dropout(0.5),
            nn.ReLU(),
            nn.Linear(1024, 10)
        )

    def forward(self, x):
        x = self.conv(x)
        x = x.reshape(x.size(0), -1)
        x = self.fc(x)
        return x

#%%
# Training parameters for the RandomQCCNN model
num_epochs = 300
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

seed_torch(42)  # reuse the same seed
model = RandomQCCNN()
model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=3e-4, weight_decay=1e-5)  # AdamW with a modest weight decay
optim_model, metrics = train_model(model, criterion, optimizer, train_loader, valid_loader, num_epochs, device)
torch.save(optim_model.state_dict(), './data/notebook2/random_qccnn_weights.pt')  # save the trained weights for later inference or testing
pd.DataFrame(metrics).to_csv('./data/notebook2/random_qccnn_metrics.csv', index=False)  # save the training history for later plotting

#%%
state_dict = torch.load('./data/notebook2/random_qccnn_weights.pt', map_location=device)
random_qccnn_model = RandomQCCNN()
random_qccnn_model.load_state_dict(state_dict)
random_qccnn_model.to(device)

test_acc = test_model(random_qccnn_model, test_loader, device)

#%%
data = pd.read_csv('./data/notebook2/random_qccnn_metrics.csv')
epoch = data['epoch']
train_loss = data['train_loss']
valid_loss = data['valid_loss']
train_acc = data['train_acc']
valid_acc = data['valid_acc']

# Create the figure and Axes objects
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))

# Plot the loss curves
ax1.plot(epoch, train_loss, label='Train Loss')
ax1.plot(epoch, valid_loss, label='Valid Loss')
ax1.set_title('Training Loss Curve')
ax1.set_xlabel('Epoch')
ax1.set_ylabel('Loss')
ax1.legend()

# Plot the accuracy curves
ax2.plot(epoch, train_acc, label='Train Accuracy')
ax2.plot(epoch, valid_acc, label='Valid Accuracy')
ax2.set_title('Training Accuracy Curve')
ax2.set_xlabel('Epoch')
ax2.set_ylabel('Accuracy')
ax2.legend()

plt.show()

#%%
class ParameterizedQuantumConvolutionalLayer(nn.Module):
    def __init__(self, nqubit, num_circuits):
        super().__init__()
        self.nqubit = nqubit
        self.cirs = nn.ModuleList([self.circuit(nqubit) for _ in range(num_circuits)])

    def circuit(self, nqubit):
        cir = dq.QubitCircuit(nqubit)
        cir.rxlayer(encode=True)  # data-encoding step; it does not affect the circuit structure from the original paper
        cir.barrier()
        for iter in range(4):  # depth 4 per quantum convolution circuit, 16 trainable parameters in total, as in the original paper
            cir.rylayer()
            cir.cnot_ring()
            cir.barrier()
        cir.observable(0)
        return cir

    def forward(self, x):
        kernel_size, stride = 2, 2
        # [64, 1, 18, 18] -> [64, 1, 9, 18, 2] -> [64, 1, 9, 9, 2, 2]
        x_unfold = x.unfold(2, kernel_size, stride).unfold(3, kernel_size, stride)
        w = int((x.shape[-1] - kernel_size) / stride + 1)
        x_reshape = x_unfold.reshape(-1, self.nqubit)
        exps = []
        for cir in self.cirs:  # one circuit per output channel
            cir(x_reshape)
            exp = cir.expectation()
            exps.append(exp)
        exps = torch.stack(exps, dim=1)
        exps = exps.reshape(x.shape[0], len(self.cirs), w, w)
        return exps

#%%
# Visualize the circuit structure of one of the quantum convolution kernels:
net = ParameterizedQuantumConvolutionalLayer(nqubit=4, num_circuits=3)
net.cirs[0].draw()

#%%
# The overall QCCNN architecture:
class QCCNN(nn.Module):
    def __init__(self):
        super(QCCNN, self).__init__()
        self.conv = nn.Sequential(
            ParameterizedQuantumConvolutionalLayer(nqubit=4, num_circuits=3),
            nn.BatchNorm2d(3),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=1)
        )
        self.fc = nn.Sequential(
            nn.Linear(8 * 8 * 3, 128),  # 3 channels x 8 x 8 after the quantum conv (18 -> 9) and max pool (9 -> 8)
            nn.BatchNorm1d(128),
            nn.Dropout(0.5),
            nn.ReLU(),
            nn.Linear(128, 10)
        )

    def forward(self, x):
        x = self.conv(x)
        x = x.reshape(x.size(0), -1)
        x = self.fc(x)
        return x
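#%%
# Sanity check on the fc input size above (a sketch; out_size is a hypothetical
# helper implementing the usual formula out = (in - kernel) // stride + 1):
def out_size(in_size, kernel, stride):
    return (in_size - kernel) // stride + 1

s = out_size(18, 2, 2)  # quantum conv: 18 -> 9
s = out_size(s, 2, 1)   # max pool:      9 -> 8
print(s * s * 3)        # 192 = 8 * 8 * 3, matching nn.Linear(8 * 8 * 3, 128)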
#%%
# Training parameters for the QCCNN model
num_epochs = 300
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = QCCNN()
model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=3e-4, weight_decay=1e-5)  # AdamW with a modest weight decay
optim_model, metrics = train_model(model, criterion, optimizer, train_loader, valid_loader, num_epochs, device)
torch.save(optim_model.state_dict(), './data/notebook2/qccnn_weights.pt')  # save the trained weights for later inference or testing
pd.DataFrame(metrics).to_csv('./data/notebook2/qccnn_metrics.csv', index=False)  # save the training history for later plotting

#%%
state_dict = torch.load('./data/notebook2/qccnn_weights.pt', map_location=device)
qccnn_model = QCCNN()
qccnn_model.load_state_dict(state_dict)
qccnn_model.to(device)

test_acc = test_model(qccnn_model, test_loader, device)

#%%
def vgg_block(in_channel, out_channel, num_convs):
    layers = nn.ModuleList()
    assert num_convs >= 1
    layers.append(nn.Conv2d(in_channel, out_channel, kernel_size=3, padding=1))
    layers.append(nn.ReLU())
    for _ in range(num_convs - 1):
        layers.append(nn.Conv2d(out_channel, out_channel, kernel_size=3, padding=1))
        layers.append(nn.ReLU())
    layers.append(nn.MaxPool2d(kernel_size=2, stride=2))
    return nn.Sequential(*layers)

VGG = nn.Sequential(
    vgg_block(1, 32, 2),
    vgg_block(32, 64, 2),
    nn.Flatten(),
    nn.Linear(64 * 4 * 4, 256),
    nn.BatchNorm1d(256),
    nn.ReLU(),
    nn.Dropout(0.5),
    nn.Linear(256, 128),
    nn.BatchNorm1d(128),
    nn.ReLU(),
    nn.Dropout(0.5),
    nn.Linear(128, 10)
    # No final Softmax: nn.CrossEntropyLoss expects raw logits
)

#%%
# Training parameters for the VGG model
num_epochs = 300
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

vgg_model = VGG
vgg_model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(vgg_model.parameters(), lr=3e-4, weight_decay=1e-5)  # AdamW with a modest weight decay
vgg_model, metrics = train_model(vgg_model, criterion, optimizer, train_loader, valid_loader, num_epochs, device)
torch.save(vgg_model.state_dict(), './data/notebook2/vgg_weights.pt')  # save the trained weights for later inference or testing
pd.DataFrame(metrics).to_csv('./data/notebook2/vgg_metrics.csv', index=False)  # save the training history for later plotting

#%%
state_dict = torch.load('./data/notebook2/vgg_weights.pt', map_location=device)
vgg_model = VGG
vgg_model.load_state_dict(state_dict)
vgg_model.to(device)

vgg_test_acc = test_model(vgg_model, test_loader, device)

#%%
vgg_data = pd.read_csv('./data/notebook2/vgg_metrics.csv')
qccnn_data = pd.read_csv('./data/notebook2/qccnn_metrics.csv')

vgg_epoch = vgg_data['epoch']
vgg_train_loss = vgg_data['train_loss']
vgg_valid_loss = vgg_data['valid_loss']
vgg_train_acc = vgg_data['train_acc']
vgg_valid_acc = vgg_data['valid_acc']

qccnn_epoch = qccnn_data['epoch']
qccnn_train_loss = qccnn_data['train_loss']
qccnn_valid_loss = qccnn_data['valid_loss']
qccnn_train_acc = qccnn_data['train_acc']
qccnn_valid_acc = qccnn_data['valid_acc']

# Create the figure and Axes objects
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))

# Plot the loss curves
ax1.plot(vgg_epoch, vgg_train_loss, label='VGG Train Loss')
ax1.plot(vgg_epoch, vgg_valid_loss, label='VGG Valid Loss')
ax1.plot(qccnn_epoch, qccnn_train_loss, label='QCCNN Train Loss')
ax1.plot(qccnn_epoch, qccnn_valid_loss, label='QCCNN Valid Loss')
ax1.set_title('Training Loss Curve')
ax1.set_xlabel('Epoch')
ax1.set_ylabel('Loss')
ax1.legend()

# Plot the accuracy curves
ax2.plot(vgg_epoch, vgg_train_acc, label='VGG Train Accuracy')
ax2.plot(vgg_epoch, vgg_valid_acc, label='VGG Valid Accuracy')
ax2.plot(qccnn_epoch, qccnn_train_acc, label='QCCNN Train Accuracy')
ax2.plot(qccnn_epoch, qccnn_valid_acc, label='QCCNN Valid Accuracy')
ax2.set_title('Training Accuracy Curve')
ax2.set_xlabel('Epoch')
ax2.set_ylabel('Accuracy')
ax2.legend()

plt.show()
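#%%
# A quick single-batch inference sketch with the restored QCCNN weights
# (reuses qccnn_model, test_loader, and device from the cells above):
images, labels = next(iter(test_loader))
qccnn_model.eval()
with torch.no_grad():
    preds = qccnn_model(images.to(device)).argmax(dim=1).cpu()
print('predicted:', preds[:8].tolist())
print('actual:   ', labels[:8].tolist())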
#%%
# Compare the number of trainable parameters across the models
def count_parameters(model):
    """Count the trainable parameters of a model."""
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

number_params_VGG = count_parameters(VGG)
number_params_QCCNN = count_parameters(QCCNN())
print(f'VGG trainable parameters: {number_params_VGG}\tQCCNN trainable parameters: {number_params_QCCNN}')
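#%%
# Optional per-module breakdown (a sketch using only named_parameters; the
# printed names follow the attribute names defined in QCCNN above):
for name, p in QCCNN().named_parameters():
    if p.requires_grad:
        print(f'{name}: {p.numel()}')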