# DeepQuantom-CNN/Modify.py
#%%
# First, import all the packages we need:
import os
import random
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from tqdm import tqdm
from torch.utils.data import DataLoader
from torchvision.datasets import FashionMNIST
import deepquantum as dq
import matplotlib.pyplot as plt
def seed_torch(seed=1024):
    """
    Set random seeds for reproducibility.

    Args:
        seed (int): Random seed number to use. Default is 1024.
    """
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    # Seed all GPUs with the same seed if using multi-GPU
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True

seed_torch(42)  # use a more common seed value
#%%
def calculate_score(y_true, y_preds):
    # Convert model outputs to a probability distribution
    preds_prob = torch.softmax(y_preds, dim=1)
    # Take the predicted class (the one with the highest probability)
    preds_class = torch.argmax(preds_prob, dim=1)
    # Compute accuracy
    correct = (preds_class == y_true).float()
    accuracy = correct.sum() / len(correct)
    return accuracy.cpu().numpy()
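
# A quick sanity check for calculate_score (illustrative, not part of the original pipeline):
# two toy logit rows whose argmax matches the labels should give accuracy 1.0.
_toy_logits = torch.tensor([[2.0, 0.1], [0.2, 3.0]])
_toy_labels = torch.tensor([0, 1])
assert calculate_score(_toy_labels, _toy_logits) == 1.0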
def train_model(model, criterion, optimizer, train_loader, valid_loader, num_epochs, device):
    """
    Train and validate the model.

    Args:
        model (torch.nn.Module): model to train
        criterion (torch.nn.Module): loss function
        optimizer (torch.optim.Optimizer): optimizer
        train_loader (torch.utils.data.DataLoader): training data loader
        valid_loader (torch.utils.data.DataLoader): validation data loader
        num_epochs (int): number of training epochs
    Returns:
        model (torch.nn.Module): the trained model
        metrics (dict): per-epoch training/validation loss and accuracy
    """
    train_loss_list = []
    valid_loss_list = []
    train_acc_list = []
    valid_acc_list = []
    best_valid_acc = 0.0
    patience = 50  # early-stopping patience
    counter = 0  # epochs without improvement
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', factor=0.5, patience=25)
    with tqdm(total=num_epochs) as pbar:
        for epoch in range(num_epochs):
            # Training phase; reset to train mode every epoch,
            # since the validation phase below switches to eval mode
            model.train()
            train_loss = 0.0
            train_acc = 0.0
            for images, labels in train_loader:
                images = images.to(device)
                labels = labels.to(device)
                optimizer.zero_grad()
                outputs = model(images)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()
                train_loss += loss.item()
                train_acc += calculate_score(labels, outputs)
            train_loss /= len(train_loader)
            train_acc /= len(train_loader)
            # Validation phase
            model.eval()
            valid_loss = 0.0
            valid_acc = 0.0
            with torch.no_grad():
                for images, labels in valid_loader:
                    images = images.to(device)
                    labels = labels.to(device)
                    outputs = model(images)
                    loss = criterion(outputs, labels)
                    valid_loss += loss.item()
                    valid_acc += calculate_score(labels, outputs)
            valid_loss /= len(valid_loader)
            valid_acc /= len(valid_loader)
            # Learning-rate scheduler update
            scheduler.step(valid_acc)
            # Early stopping
            if valid_acc > best_valid_acc:
                best_valid_acc = valid_acc
                torch.save(model.state_dict(), './data/notebook2/best_model.pt')
                counter = 0
            else:
                counter += 1
                if counter >= patience:
                    print(f'Early stopping at epoch {epoch+1} due to no improvement in validation accuracy.')
                    break
            pbar.set_description(f"Train loss: {train_loss:.3f} Valid Acc: {valid_acc:.3f}")
            pbar.update()
            train_loss_list.append(train_loss)
            valid_loss_list.append(valid_loss)
            train_acc_list.append(train_acc)
            valid_acc_list.append(valid_acc)
    # Load the best model weights
    if os.path.exists('./data/notebook2/best_model.pt'):
        model.load_state_dict(torch.load('./data/notebook2/best_model.pt'))
    # Build metrics so that all arrays have the same length
    metrics = {
        'epoch': list(range(1, len(train_loss_list) + 1)),
        'train_acc': train_acc_list,
        'valid_acc': valid_acc_list,
        'train_loss': train_loss_list,
        'valid_loss': valid_loss_list
    }
    return model, metrics
def test_model(model, test_loader, device):
    model.eval()
    test_acc = 0.0
    with torch.no_grad():
        for images, labels in test_loader:
            images = images.to(device)
            labels = labels.to(device)
            outputs = model(images)
            test_acc += calculate_score(labels, outputs)
    test_acc /= len(test_loader)
    print(f'Test Acc: {test_acc:.3f}')
    return test_acc
#%%
# Define image transforms
trans1 = transforms.Compose([
    transforms.RandomHorizontalFlip(),  # random horizontal flip
    transforms.RandomRotation(10),  # random rotation within ±10 degrees
    transforms.ColorJitter(brightness=0.2, contrast=0.2),  # color jitter
    transforms.ToTensor(),  # convert to tensor
    transforms.Normalize((0.5,), (0.5,))  # normalize to [-1, 1]
])
# Evaluation transform: no random augmentation, so test results are deterministic
trans2 = transforms.Compose([
    transforms.ToTensor(),  # convert to tensor
    transforms.Normalize((0.5,), (0.5,))  # normalize to [-1, 1]
])
train_dataset = FashionMNIST(root='./data/notebook2', train=True, transform=trans1, download=True)
test_dataset = FashionMNIST(root='./data/notebook2', train=False, transform=trans2, download=True)
# Define the train/validation split: 80% training, 20% validation
train_ratio = 0.8
total_samples = len(train_dataset)
train_size = int(train_ratio * total_samples)
valid_size = total_samples - train_size  # avoids off-by-one errors from rounding
# Split into training and validation sets
train_dataset, valid_dataset = torch.utils.data.random_split(train_dataset, [train_size, valid_size])
# Build the data loaders
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True, drop_last=True)
valid_loader = DataLoader(valid_dataset, batch_size=64, shuffle=False, drop_last=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False, drop_last=True)
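# Quick look at one batch to confirm the loader output (illustrative check):
# FashionMNIST yields 28x28 grayscale images, so each batch should be [64, 1, 28, 28].
_images, _labels = next(iter(train_loader))
print(f'batch images: {tuple(_images.shape)}, batch labels: {tuple(_labels.shape)}')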
#%%
singlegate_list = ['rx', 'ry', 'rz', 's', 't', 'p', 'u3']
doublegate_list = ['rxx', 'ryy', 'rzz', 'swap', 'cnot', 'cp', 'ch', 'cu', 'ct', 'cz']
#%%
# Random quantum convolutional layer
class RandomQuantumConvolutionalLayer(nn.Module):
    def __init__(self, nqubit, num_circuits, seed: int = 1024):
        super(RandomQuantumConvolutionalLayer, self).__init__()
        random.seed(seed)
        self.nqubit = nqubit
        self.cirs = nn.ModuleList([self.circuit(nqubit) for _ in range(num_circuits)])

    def circuit(self, nqubit):
        cir = dq.QubitCircuit(nqubit)
        cir.rxlayer(encode=True)  # data encoding only; does not change the circuit structure of the original paper
        cir.barrier()
        for _ in range(3):
            for i in range(nqubit):
                singlegate = random.choice(singlegate_list)
                getattr(cir, singlegate)(i)
            # sample two distinct qubits from all nqubit wires
            # (range(0, nqubit - 1) would never place a two-qubit gate on the last wire)
            control_bit, target_bit = random.sample(range(0, nqubit), 2)
            doublegate = random.choice(doublegate_list)
            if doublegate[0] in ['r', 's']:
                getattr(cir, doublegate)([control_bit, target_bit])
            else:
                getattr(cir, doublegate)(control_bit, target_bit)
            cir.barrier()
        cir.observable(0)
        return cir

    def forward(self, x):
        kernel_size, stride = 2, 2
        # [B, 1, 28, 28] -> [B, 1, 14, 28, 2] -> [B, 1, 14, 14, 2, 2]
        x_unfold = x.unfold(2, kernel_size, stride).unfold(3, kernel_size, stride)
        w = int((x.shape[-1] - kernel_size) / stride + 1)
        x_reshape = x_unfold.reshape(-1, self.nqubit)
        exps = []
        for cir in self.cirs:  # one circuit per output channel
            cir(x_reshape)
            exp = cir.expectation()
            exps.append(exp)
        exps = torch.stack(exps, dim=1)
        exps = exps.reshape(x.shape[0], len(self.cirs), w, w)
        return exps
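#%%
# Smoke test for the layer above (a minimal sketch, assuming 28x28 single-channel inputs):
# with kernel_size=stride=2, w = (28-2)/2 + 1 = 14, so the output should be [B, 3, 14, 14].
_layer = RandomQuantumConvolutionalLayer(nqubit=4, num_circuits=3, seed=1024)
_out = _layer(torch.rand(2, 1, 28, 28))
print(f'RandomQuantumConvolutionalLayer output: {tuple(_out.shape)}')  # expected (2, 3, 14, 14)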
#%%
net = RandomQuantumConvolutionalLayer(nqubit=4, num_circuits=3, seed=1024)
net.cirs[0].draw()
#%%
# Hybrid model built on the random quantum convolutional layer
class RandomQCCNN(nn.Module):
    def __init__(self):
        super(RandomQCCNN, self).__init__()
        self.conv = nn.Sequential(
            RandomQuantumConvolutionalLayer(nqubit=4, num_circuits=3),  # num_circuits=3: only 3 quantum kernels in the quanv1 layer
            nn.BatchNorm2d(3),  # batch normalization
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=1),
            nn.Conv2d(3, 6, kernel_size=2, stride=1),
            nn.BatchNorm2d(6),  # batch normalization
            nn.ReLU(),
            nn.AdaptiveMaxPool2d((9, 9))  # adaptive pooling guarantees a fixed output size
        )
        self.fc = nn.Sequential(
            nn.Linear(6 * 9 * 9, 1024),  # input dimension matches the adaptive-pooling output
            nn.BatchNorm1d(1024),
            nn.Dropout(0.5),
            nn.ReLU(),
            nn.Linear(1024, 512),
            nn.BatchNorm1d(512),
            nn.ReLU(),
            nn.Linear(512, 10)
        )

    def forward(self, x):
        x = self.conv(x)
        x = x.reshape(x.size(0), -1)
        x = self.fc(x)
        return x
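#%%
# End-to-end shape check for RandomQCCNN (a minimal sketch replacing the per-batch
# debug prints that would otherwise fire inside forward on every training step):
# a dummy batch of two 28x28 images should produce logits of shape [2, 10].
_m = RandomQCCNN()
_m.eval()  # eval mode so BatchNorm uses running statistics on the tiny batch
with torch.no_grad():
    print(f'RandomQCCNN output: {tuple(_m(torch.rand(2, 1, 28, 28)).shape)}')  # expected (2, 10)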
#%%
# Training setup for the RandomQCCNN model
num_epochs = 300
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)
seed_torch(42)  # same seed as above for comparability
model = RandomQCCNN()
model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=3e-4, weight_decay=1e-5)  # AdamW with a modest weight decay
optim_model, metrics = train_model(model, criterion, optimizer, train_loader, valid_loader, num_epochs, device)
torch.save(optim_model.state_dict(), './data/notebook2/random_qccnn_weights.pt')  # save the trained weights for later inference/testing
pd.DataFrame(metrics).to_csv('./data/notebook2/random_qccnn_metrics.csv', index=False)  # save the training history for the plots below
#%%
state_dict = torch.load('./data/notebook2/random_qccnn_weights.pt', map_location=device)
random_qccnn_model = RandomQCCNN()
random_qccnn_model.load_state_dict(state_dict)
random_qccnn_model.to(device)
test_acc = test_model(random_qccnn_model, test_loader, device)
#%%
data = pd.read_csv('./data/notebook2/random_qccnn_metrics.csv')
epoch = data['epoch']
train_loss = data['train_loss']
valid_loss = data['valid_loss']
train_acc = data['train_acc']
valid_acc = data['valid_acc']
# Create the figure and Axes objects
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))
# Plot the loss curves
ax1.plot(epoch, train_loss, label='Train Loss')
ax1.plot(epoch, valid_loss, label='Valid Loss')
ax1.set_title('Training Loss Curve')
ax1.set_xlabel('Epoch')
ax1.set_ylabel('Loss')
ax1.legend()
# Plot the accuracy curves
ax2.plot(epoch, train_acc, label='Train Accuracy')
ax2.plot(epoch, valid_acc, label='Valid Accuracy')
ax2.set_title('Training Accuracy Curve')
ax2.set_xlabel('Epoch')
ax2.set_ylabel('Accuracy')
ax2.legend()
plt.show()
#%%
class ParameterizedQuantumConvolutionalLayer(nn.Module):
    def __init__(self, nqubit, num_circuits):
        super().__init__()
        self.nqubit = nqubit
        self.cirs = nn.ModuleList([self.circuit(nqubit) for _ in range(num_circuits)])

    def circuit(self, nqubit):
        cir = dq.QubitCircuit(nqubit)
        cir.rxlayer(encode=True)  # data encoding
        cir.barrier()
        for _ in range(5):  # circuit depth increased from 4 to 5
            cir.rylayer()
            cir.cnot_ring()
            cir.barrier()
        cir.observable(0)
        return cir

    def forward(self, x):
        kernel_size, stride = 3, 3  # use 3x3 data blocks
        # each 3x3 patch is flattened to 9 features, so the circuit must encode 9 values
        assert kernel_size * kernel_size == self.nqubit, \
            "patch size must match the number of encoded qubits"
        x_unfold = x.unfold(2, kernel_size, stride).unfold(3, kernel_size, stride)
        # use the actual spatial size after unfolding
        w = x_unfold.shape[2]
        x_reshape = x_unfold.reshape(-1, kernel_size * kernel_size)  # flatten each 3x3 patch to 9 features
        exps = []
        for cir in self.cirs:
            cir(x_reshape)
            exp = cir.expectation()
            exps.append(exp)
        exps = torch.stack(exps, dim=1)
        out_channels = len(self.cirs)  # computed dynamically rather than hard-coded
        # sanity check: total element count must be consistent before reshaping
        assert exps.numel() == x.shape[0] * out_channels * w * w, \
            f"Element count mismatch: {exps.numel()} vs {x.shape[0] * out_channels * w * w}"
        exps = exps.reshape(x.shape[0], out_channels, w, w)
        return exps
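#%%
# Shape check for the parameterized layer (a minimal sketch): with 3x3 patches at
# stride 3 on a 28x28 input, w = (28-3)//3 + 1 = 9, so the output should be [B, 3, 9, 9].
# nqubit=9 matches the 9 features of each flattened patch.
_p = ParameterizedQuantumConvolutionalLayer(nqubit=9, num_circuits=3)
_out = _p(torch.rand(2, 1, 28, 28))
print(f'ParameterizedQuantumConvolutionalLayer output: {tuple(_out.shape)}')  # expected (2, 3, 9, 9)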
#%%
# Visualize the circuit structure of one of the quantum convolution kernels:
net = ParameterizedQuantumConvolutionalLayer(nqubit=9, num_circuits=3)
net.cirs[0].draw()
#%%
# Overall QCCNN architecture
class QCCNN(nn.Module):
    def __init__(self):
        super(QCCNN, self).__init__()
        self.conv = nn.Sequential(
            # nqubit=9 so that each flattened 3x3 patch matches the circuit's 9 encoded inputs
            ParameterizedQuantumConvolutionalLayer(nqubit=9, num_circuits=3),
            nn.BatchNorm2d(3),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=1),
            nn.Conv2d(3, 6, kernel_size=1),  # 1x1 convolution to mix information across channels
            nn.BatchNorm2d(6),
            nn.ReLU(),
            nn.AdaptiveMaxPool2d((9, 9))  # fixed output size
        )
        self.fc = nn.Sequential(
            # 6 channels, 9x9 feature map => 6*9*9 = 486 input features
            nn.Linear(6 * 9 * 9, 1024),
            nn.BatchNorm1d(1024),
            nn.Dropout(0.5),
            nn.ReLU(),
            nn.Linear(1024, 512),
            nn.BatchNorm1d(512),
            nn.ReLU(),
            nn.Linear(512, 10)
        )

    def forward(self, x):
        x = self.conv(x)
        x = x.reshape(x.size(0), -1)
        x = self.fc(x)
        return x
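#%%
# End-to-end shape check for QCCNN (illustrative): a dummy batch of two 28x28
# images should give logits of shape [2, 10].
_q = QCCNN()
_q.eval()  # eval mode so BatchNorm uses running statistics on the tiny batch
with torch.no_grad():
    print(f'QCCNN output: {tuple(_q(torch.rand(2, 1, 28, 28)).shape)}')  # expected (2, 10)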
#%%
# Training setup for the QCCNN model
num_epochs = 300
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = QCCNN()
model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=5e-4, weight_decay=5e-5, amsgrad=True)  # tuned learning rate and weight decay
optim_model, metrics = train_model(model, criterion, optimizer, train_loader, valid_loader, num_epochs, device)
torch.save(optim_model.state_dict(), './data/notebook2/qccnn_weights.pt')  # save the trained weights for later inference/testing
pd.DataFrame(metrics).to_csv('./data/notebook2/qccnn_metrics.csv', index=False)  # save the training history for the plots below
#%%
state_dict = torch.load('./data/notebook2/qccnn_weights.pt', map_location=device)
qccnn_model = QCCNN()
qccnn_model.load_state_dict(state_dict)
qccnn_model.to(device)
test_acc = test_model(qccnn_model, test_loader, device)
#%%
def vgg_block(in_channel,out_channel,num_convs):
layers = nn.ModuleList()
assert num_convs >= 1
layers.append(nn.Conv2d(in_channel,out_channel,kernel_size=3,padding=1))
layers.append(nn.ReLU())
for _ in range(num_convs-1):
layers.append(nn.Conv2d(out_channel,out_channel,kernel_size=3,padding=1))
layers.append(nn.ReLU())
layers.append(nn.MaxPool2d(kernel_size=2,stride=2))
return nn.Sequential(*layers)
VGG = nn.Sequential(
vgg_block(1, 32, 2), # 增加通道数和调整卷积层数量
vgg_block(32, 64, 2),
nn.Flatten(),
nn.Linear(64 * 7 * 7, 256), # 修改为正确的输入维度
nn.BatchNorm1d(256), # 添加批量归一化
nn.ReLU(),
nn.Dropout(0.5), # 增加dropout比例
nn.Linear(256, 128),
nn.BatchNorm1d(128), # 添加批量归一化
nn.ReLU(),
nn.Dropout(0.5),
nn.Linear(128, 10),
nn.Softmax(dim=-1)
)
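#%%
# Shape check for the VGG baseline (illustrative): 28 -> 14 -> 7 through the two
# pooling stages, so the flattened features are 64*7*7 and the output is [2, 10].
VGG.eval()
with torch.no_grad():
    print(f'VGG output: {tuple(VGG(torch.rand(2, 1, 28, 28)).shape)}')  # expected (2, 10)
VGG.train()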
#%%
# Training setup for the VGG baseline
num_epochs = 300
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
vgg_model = VGG
vgg_model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(vgg_model.parameters(), lr=3e-4, weight_decay=1e-5)  # AdamW with a modest weight decay
vgg_model, metrics = train_model(vgg_model, criterion, optimizer, train_loader, valid_loader, num_epochs, device)
torch.save(vgg_model.state_dict(), './data/notebook2/vgg_weights.pt')  # save the trained weights for later inference/testing
pd.DataFrame(metrics).to_csv('./data/notebook2/vgg_metrics.csv', index=False)  # save the training history for the plots below
#%%
state_dict = torch.load('./data/notebook2/vgg_weights.pt', map_location=device)
vgg_model = VGG
vgg_model.load_state_dict(state_dict)
vgg_model.to(device)
vgg_test_acc = test_model(vgg_model, test_loader, device)
#%%
vgg_data = pd.read_csv('./data/notebook2/vgg_metrics.csv')
qccnn_data = pd.read_csv('./data/notebook2/qccnn_metrics.csv')
vgg_epoch = vgg_data['epoch']
vgg_train_loss = vgg_data['train_loss']
vgg_valid_loss = vgg_data['valid_loss']
vgg_train_acc = vgg_data['train_acc']
vgg_valid_acc = vgg_data['valid_acc']
qccnn_epoch = qccnn_data['epoch']
qccnn_train_loss = qccnn_data['train_loss']
qccnn_valid_loss = qccnn_data['valid_loss']
qccnn_train_acc = qccnn_data['train_acc']
qccnn_valid_acc = qccnn_data['valid_acc']
# Create the figure and Axes objects
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))
# Plot the loss curves
ax1.plot(vgg_epoch, vgg_train_loss, label='VGG Train Loss')
ax1.plot(vgg_epoch, vgg_valid_loss, label='VGG Valid Loss')
ax1.plot(qccnn_epoch, qccnn_train_loss, label='QCCNN Train Loss')
ax1.plot(qccnn_epoch, qccnn_valid_loss, label='QCCNN Valid Loss')
ax1.set_title('Training Loss Curve')
ax1.set_xlabel('Epoch')
ax1.set_ylabel('Loss')
ax1.legend()
# Plot the accuracy curves
ax2.plot(vgg_epoch, vgg_train_acc, label='VGG Train Accuracy')
ax2.plot(vgg_epoch, vgg_valid_acc, label='VGG Valid Accuracy')
ax2.plot(qccnn_epoch, qccnn_train_acc, label='QCCNN Train Accuracy')
ax2.plot(qccnn_epoch, qccnn_valid_acc, label='QCCNN Valid Accuracy')
ax2.set_title('Training Accuracy Curve')
ax2.set_xlabel('Epoch')
ax2.set_ylabel('Accuracy')
ax2.legend()
plt.show()
#%%
# Compare the number of trainable parameters across models
def count_parameters(model):
    """
    Count the number of trainable parameters of a model.
    """
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

number_params_VGG = count_parameters(VGG)
number_params_QCCNN = count_parameters(QCCNN())
print(f'VGG trainable parameters: {number_params_VGG}\tQCCNN trainable parameters: {number_params_QCCNN}')
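# For completeness, the random-kernel hybrid model can be counted the same way
# (illustrative addition; RandomQCCNN is defined above):
number_params_RandomQCCNN = count_parameters(RandomQCCNN())
print(f'RandomQCCNN trainable parameters: {number_params_RandomQCCNN}')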