DeepQuantom-CNN/Origin.py
fly6516 a6c92a4031 refactor(Modify): 重构 Modify.py 脚本
- 添加数据增强、优化器与学习率调度等改进
- 统一训练框架,支持多模型训练
- 增加梯度裁剪、最佳模型保存等功能
- 优化代码结构,提高可维护性
2025-06-25 03:04:30 +08:00

460 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#%%
# 首先我们导入所有需要的包:
import os
import random

import deepquantum as dq
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from sklearn.metrics import roc_auc_score
from torch.utils.data import DataLoader
from torchvision.datasets import FashionMNIST
from tqdm import tqdm
def seed_torch(seed=1024):
"""
Set random seeds for reproducibility.
Args:
seed (int): Random seed number to use. Default is 1024.
"""
random.seed(seed)
os.environ['PYTHONHASHSEED'] = str(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
# Seed all GPUs with the same seed if using multi-GPU
torch.cuda.manual_seed_all(seed)
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True
seed_torch(1024)
#%%
def calculate_score(y_true, y_preds):
# 将模型预测结果转为概率分布
preds_prob = torch.softmax(y_preds, dim=1)
# 获得预测的类别(概率最高的一类)
preds_class = torch.argmax(preds_prob, dim=1)
# 计算准确率
correct = (preds_class == y_true).float()
accuracy = correct.sum() / len(correct)
return accuracy.cpu().numpy()
def train_model(model, criterion, optimizer, train_loader, valid_loader, num_epochs, device):
"""
训练和验证模型。
Args:
model (torch.nn.Module): 要训练的模型。
criterion (torch.nn.Module): 损失函数。
optimizer (torch.optim.Optimizer): 优化器。
train_loader (torch.utils.data.DataLoader): 训练数据加载器。
valid_loader (torch.utils.data.DataLoader): 验证数据加载器。
num_epochs (int): 训练的epoch数。
Returns:
model (torch.nn.Module): 训练后的模型。
"""
model.train()
train_loss_list = []
valid_loss_list = []
train_acc_list = []
valid_acc_list = []
with tqdm(total=num_epochs) as pbar:
for epoch in range(num_epochs):
# 训练阶段
train_loss = 0.0
train_acc = 0.0
for images, labels in train_loader:
images = images.to(device)
labels = labels.to(device)
optimizer.zero_grad()
outputs = model(images)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
train_loss += loss.item()
train_acc += calculate_score(labels, outputs)
train_loss /= len(train_loader)
train_acc /= len(train_loader)
# 验证阶段
model.eval()
valid_loss = 0.0
valid_acc = 0.0
with torch.no_grad():
for images, labels in valid_loader:
images = images.to(device)
labels = labels.to(device)
outputs = model(images)
loss = criterion(outputs, labels)
valid_loss += loss.item()
valid_acc += calculate_score(labels, outputs)
valid_loss /= len(valid_loader)
valid_acc /= len(valid_loader)
pbar.set_description(f"Train loss: {train_loss:.3f} Valid Acc: {valid_acc:.3f}")
pbar.update()
train_loss_list.append(train_loss)
valid_loss_list.append(valid_loss)
train_acc_list.append(train_acc)
valid_acc_list.append(valid_acc)
metrics = {'epoch': list(range(1, num_epochs + 1)),
'train_acc': train_acc_list,
'valid_acc': valid_acc_list,
'train_loss': train_loss_list,
'valid_loss': valid_loss_list}
return model, metrics
def test_model(model, test_loader, device):
model.eval()
test_acc = 0.0
with torch.no_grad():
for images, labels in test_loader:
images = images.to(device)
labels = labels.to(device)
outputs = model(images)
test_acc += calculate_score(labels, outputs)
test_acc /= len(test_loader)
print(f'Test Acc: {test_acc:.3f}')
return test_acc
#%%
# 定义图像变换
trans1 = transforms.Compose([
transforms.Resize((18, 18)), # 调整大小为18x18
transforms.ToTensor() # 转换为张量
])
trans2 = transforms.Compose([
transforms.Resize((16, 16)), # 调整大小为16x16
transforms.ToTensor() # 转换为张量
])
train_dataset = FashionMNIST(root='./data/notebook1', train=False, transform=trans1,download=True)
test_dataset = FashionMNIST(root='./data/notebook1', train=False, transform=trans1,download=True)
# 定义训练集和测试集的比例
train_ratio = 0.8 # 训练集比例为80%验证集比例为20%
valid_ratio = 0.2
total_samples = len(train_dataset)
train_size = int(train_ratio * total_samples)
valid_size = int(valid_ratio * total_samples)
# 分割训练集和测试集
train_dataset, valid_dataset = torch.utils.data.random_split(train_dataset, [train_size, valid_size])
# 加载随机抽取的训练数据集
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True, drop_last=True)
valid_loader = DataLoader(valid_dataset, batch_size=64, shuffle=False, drop_last=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False, drop_last=True)
#%%
singlegate_list = ['rx', 'ry', 'rz', 's', 't', 'p', 'u3']
doublegate_list = ['rxx', 'ryy', 'rzz', 'swap', 'cnot', 'cp', 'ch', 'cu', 'ct', 'cz']
#%%
# 随机量子卷积层
class RandomQuantumConvolutionalLayer(nn.Module):
def __init__(self, nqubit, num_circuits, seed:int=1024):
super(RandomQuantumConvolutionalLayer, self).__init__()
random.seed(seed)
self.nqubit = nqubit
self.cirs = nn.ModuleList([self.circuit(nqubit) for _ in range(num_circuits)])
def circuit(self, nqubit):
cir = dq.QubitCircuit(nqubit)
cir.rxlayer(encode=True) # 对原论文的量子线路结构并无影响,只是做了一个数据编码的操作
cir.barrier()
for iter in range(3):
for i in range(nqubit):
singlegate = random.choice(singlegate_list)
getattr(cir, singlegate)(i)
control_bit, target_bit = random.sample(range(0, nqubit - 1), 2)
doublegate = random.choice(doublegate_list)
if doublegate[0] in ['r', 's']:
getattr(cir, doublegate)([control_bit, target_bit])
else:
getattr(cir, doublegate)(control_bit, target_bit)
cir.barrier()
cir.observable(0)
return cir
def forward(self, x):
kernel_size, stride = 2, 2
# [64, 1, 18, 18] -> [64, 1, 9, 18, 2] -> [64, 1, 9, 9, 2, 2]
x_unflod = x.unfold(2, kernel_size, stride).unfold(3, kernel_size, stride)
w = int((x.shape[-1] - kernel_size) / stride + 1)
x_reshape = x_unflod.reshape(-1, self.nqubit)
exps = []
for cir in self.cirs: # out_channels
cir(x_reshape)
exp = cir.expectation()
exps.append(exp)
exps = torch.stack(exps, dim=1)
exps = exps.reshape(x.shape[0], 3, w, w)
return exps
#%%
net = RandomQuantumConvolutionalLayer(nqubit=4, num_circuits=3, seed=1024)
net.cirs[0].draw()
#%%
# 基于随机量子卷积层的混合模型
class RandomQCCNN(nn.Module):
def __init__(self):
super(RandomQCCNN, self).__init__()
self.conv = nn.Sequential(
RandomQuantumConvolutionalLayer(nqubit=4, num_circuits=3, seed=1024), # num_circuits=3代表我们在quanv1层只用了3个量子卷积核
nn.ReLU(),
nn.MaxPool2d(kernel_size=2, stride=1),
nn.Conv2d(3, 6, kernel_size=2, stride=1),
nn.ReLU(),
nn.MaxPool2d(kernel_size=2, stride=1)
)
self.fc = nn.Sequential(
nn.Linear(6 * 6 * 6, 1024),
nn.Dropout(0.4),
nn.Linear(1024, 10)
)
def forward(self, x):
x = self.conv(x)
x = x.reshape(x.size(0), -1)
x = self.fc(x)
return x
#%%
num_epochs = 300
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)
seed_torch(1024) # 重新设置随机种子
model = RandomQCCNN()
model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.01, weight_decay=0.001) # 添加正则化项
optim_model, metrics = train_model(model, criterion, optimizer, train_loader, valid_loader, num_epochs, device)
torch.save(optim_model.state_dict(), './data/notebook1/random_qccnn_weights.pt') # 保存训练好的模型参数,用于后续的推理或测试
pd.DataFrame(metrics).to_csv('./data/notebook1/random_qccnn_metrics.csv', index='None') # 保存模型训练过程,用于后续图标展示
#%%
state_dict = torch.load('./data/notebook1/random_qccnn_weights.pt', map_location=device)
random_qccnn_model = RandomQCCNN()
random_qccnn_model.load_state_dict(state_dict)
random_qccnn_model.to(device)
test_acc = test_model(random_qccnn_model, test_loader, device)
#%%
data = pd.read_csv('./data/notebook1/random_qccnn_metrics.csv')
epoch = data['epoch']
train_loss = data['train_loss']
valid_loss = data['valid_loss']
train_acc = data['train_acc']
valid_acc = data['valid_acc']
# 创建图和Axes对象
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))
# 绘制训练损失曲线
ax1.plot(epoch, train_loss, label='Train Loss')
ax1.plot(epoch, valid_loss, label='Valid Loss')
ax1.set_title('Training Loss Curve')
ax1.set_xlabel('Epoch')
ax1.set_ylabel('Loss')
ax1.legend()
# 绘制训练准确率曲线
ax2.plot(epoch, train_acc, label='Train Accuracy')
ax2.plot(epoch, valid_acc, label='Valid Accuracy')
ax2.set_title('Training Accuracy Curve')
ax2.set_xlabel('Epoch')
ax2.set_ylabel('Accuracy')
ax2.legend()
plt.show()
#%%
class ParameterizedQuantumConvolutionalLayer(nn.Module):
def __init__(self, nqubit, num_circuits):
super().__init__()
self.nqubit = nqubit
self.cirs = nn.ModuleList([self.circuit(nqubit) for _ in range(num_circuits)])
def circuit(self, nqubit):
cir = dq.QubitCircuit(nqubit)
cir.rxlayer(encode=True) #对原论文的量子线路结构并无影响,只是做了一个数据编码的操作
cir.barrier()
for iter in range(4): #对应原论文中一个量子卷积线路上的深度为4可控参数一共16个
cir.rylayer()
cir.cnot_ring()
cir.barrier()
cir.observable(0)
return cir
def forward(self, x):
kernel_size, stride = 2, 2
# [64, 1, 18, 18] -> [64, 1, 9, 18, 2] -> [64, 1, 9, 9, 2, 2]
x_unflod = x.unfold(2, kernel_size, stride).unfold(3, kernel_size, stride)
w = int((x.shape[-1] - kernel_size) / stride + 1)
x_reshape = x_unflod.reshape(-1, self.nqubit)
exps = []
for cir in self.cirs: # out_channels
cir(x_reshape)
exp = cir.expectation()
exps.append(exp)
exps = torch.stack(exps, dim=1)
exps = exps.reshape(x.shape[0], 3, w, w)
return exps
#%%
# 此处我们可视化其中一个量子卷积核的线路结构:
net = ParameterizedQuantumConvolutionalLayer(nqubit=4, num_circuits=3)
net.cirs[0].draw()
#%%
# QCCNN整体网络架构
class QCCNN(nn.Module):
def __init__(self):
super(QCCNN, self).__init__()
self.conv = nn.Sequential(
ParameterizedQuantumConvolutionalLayer(nqubit=4, num_circuits=3),
nn.ReLU(),
nn.MaxPool2d(kernel_size=2, stride=1)
)
self.fc = nn.Sequential(
nn.Linear(8 * 8 * 3, 128),
nn.Dropout(0.4),
nn.ReLU(),
nn.Linear(128, 10)
)
def forward(self, x):
x = self.conv(x)
x = x.reshape(x.size(0), -1)
x = self.fc(x)
return x
#%%
num_epochs = 300
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = QCCNN()
model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-5) # 添加正则化项
optim_model, metrics = train_model(model, criterion, optimizer, train_loader, valid_loader, num_epochs, device)
torch.save(optim_model.state_dict(), './data/notebook1/qccnn_weights.pt') # 保存训练好的模型参数,用于后续的推理或测试
pd.DataFrame(metrics).to_csv('./data/notebook1/qccnn_metrics.csv', index='None') # 保存模型训练过程,用于后续图标展示
#%%
state_dict = torch.load('./data/notebook1/qccnn_weights.pt', map_location=device)
qccnn_model = QCCNN()
qccnn_model.load_state_dict(state_dict)
qccnn_model.to(device)
test_acc = test_model(qccnn_model, test_loader, device)
#%%
def vgg_block(in_channel,out_channel,num_convs):
layers = nn.ModuleList()
assert num_convs >= 1
layers.append(nn.Conv2d(in_channel,out_channel,kernel_size=3,padding=1))
layers.append(nn.ReLU())
for _ in range(num_convs-1):
layers.append(nn.Conv2d(out_channel,out_channel,kernel_size=3,padding=1))
layers.append(nn.ReLU())
layers.append(nn.MaxPool2d(kernel_size=2,stride=2))
return nn.Sequential(*layers)
VGG = nn.Sequential(
vgg_block(1,10,3), # 14,14
vgg_block(10,16,3), # 4 * 4
nn.Flatten(),
nn.Linear(16 * 4 * 4, 120),
nn.Sigmoid(),
nn.Linear(120, 84),
nn.Sigmoid(),
nn.Linear(84,10),
nn.Softmax(dim=-1)
)
#%%
num_epochs = 300
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
vgg_model = VGG
vgg_model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(vgg_model.parameters(), lr=1e-5) # 添加正则化项
vgg_model, metrics = train_model(vgg_model, criterion, optimizer, train_loader, valid_loader, num_epochs, device)
torch.save(vgg_model.state_dict(), './data/notebook1/vgg_weights.pt') # 保存训练好的模型参数,用于后续的推理或测试
pd.DataFrame(metrics).to_csv('./data/notebook1/vgg_metrics.csv', index='None') # 保存模型训练过程,用于后续图标展示
#%%
state_dict = torch.load('./data/notebook1/vgg_weights.pt', map_location=device)
vgg_model = VGG
vgg_model.load_state_dict(state_dict)
vgg_model.to(device)
vgg_test_acc = test_model(vgg_model, test_loader, device)
#%%
vgg_data = pd.read_csv('./data/notebook1/vgg_metrics.csv')
qccnn_data = pd.read_csv('./data/notebook1/qccnn_metrics.csv')
vgg_epoch = vgg_data['epoch']
vgg_train_loss = vgg_data['train_loss']
vgg_valid_loss = vgg_data['valid_loss']
vgg_train_acc = vgg_data['train_acc']
vgg_valid_acc = vgg_data['valid_acc']
qccnn_epoch = qccnn_data['epoch']
qccnn_train_loss = qccnn_data['train_loss']
qccnn_valid_loss = qccnn_data['valid_loss']
qccnn_train_acc = qccnn_data['train_acc']
qccnn_valid_acc = qccnn_data['valid_acc']
# 创建图和Axes对象
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))
# 绘制训练损失曲线
ax1.plot(vgg_epoch, vgg_train_loss, label='VGG Train Loss')
ax1.plot(vgg_epoch, vgg_valid_loss, label='VGG Valid Loss')
ax1.plot(qccnn_epoch, qccnn_train_loss, label='QCCNN Valid Loss')
ax1.plot(qccnn_epoch, qccnn_valid_loss, label='QCCNN Valid Loss')
ax1.set_title('Training Loss Curve')
ax1.set_xlabel('Epoch')
ax1.set_ylabel('Loss')
ax1.legend()
# 绘制训练准确率曲线
ax2.plot(vgg_epoch, vgg_train_acc, label='VGG Train Accuracy')
ax2.plot(vgg_epoch, vgg_valid_acc, label='VGG Valid Accuracy')
ax2.plot(qccnn_epoch, qccnn_train_acc, label='QCCNN Train Accuracy')
ax2.plot(qccnn_epoch, qccnn_valid_acc, label='QCCNN Valid Accuracy')
ax2.set_title('Training Accuracy Curve')
ax2.set_xlabel('Epoch')
ax2.set_ylabel('Accuracy')
ax2.legend()
plt.show()
#%%
# 这里我们对比不同模型之间可训练参数量的区别
def count_parameters(model):
"""
计算模型的参数数量
"""
return sum(p.numel() for p in model.parameters() if p.requires_grad)
number_params_VGG = count_parameters(VGG)
number_params_QCCNN = count_parameters(QCCNN())
print(f'VGG 模型可训练参数量:{number_params_VGG}\t QCCNN模型可训练参数量{number_params_QCCNN}')