Pytorch常用代码片段
仿照d2l的pytorch内容进行记录
设备切换
尝试使用GPU
def try_gpu(i=0):
if torch.cuda.device_count() >= i + 1:
return torch.device(f'cuda:{i}')
return torch.device('cpu')
def try_all_gpus():
"""找到所有可以使用的GPU"""
devices = [torch.device(f'cuda:{i}')
for i in range(torch.cuda.device_count())]
return devices if devices else [torch.device('cpu')]
计数器
class Accumulator:
"""用于计数"""
def __init__(self, n):
self.data = [0.0] * n
def add(self, *args):
"""将一组数据加载本身的数据之上"""
self.data = [a + float(b) for a, b in zip(self.data, args)]
def reset(self):
self.data = [0.0] * len(self.data)
def __getitem__(self, idx):
return self.data[idx]
精确性
对于训练精度评价可以使用如下代码
def accuracy(y_hat, y):
"""计算准确度"""
if len(y_hat.shape) >1 and y_hat.shape[1] > 1:
y_hat = y_hat.argmax(axis=1)
cmp = y_hat.type(y.dtype) == y
return float(cmp.type(y_hat.dtype).sum())
def evaluate_accuracy(net, data_iter):
"""准确度计算"""
if isinstance(net, nn.Module):
net.eval() # 设置为评估模式
# 使用计数器分别表示正确预测的个数和总的预测个数
metric = Accumulator(2)
with torch.no_grad():
for X, y in data_iter:
metric.add(accuracy(net(X), y), y.numel())
return metric[0] / metric[1]
当使用GPU的时候需要进行额外的处理,主要可以使用如下内容
def evaluate_accuracy_gpu(net, data_iter, device=None):
"""在GPU上的准确度计算"""
if isinstance(net, nn.Module):
net.eval() # 设置为评估模式
if not device: # 如果没指定device,则使用网络所处的device
device = next(iter(net.parameters())).device
# 使用计数器分别表示正确预测的个数和总的预测个数
metric = Accumulator(2)
with torch.no_grad():
for X, y in data_iter:
if isinstance(X, list): # 当X是一个列表的时候就需要把每个元素分别放入GPU中
X = [x.to(device) for x in X]
else:
X = X.to(device)
y = y.to(device)
metric.add(accuracy(net(X), y),y.numel())
return metric[0] / metric[1]
图片绘制
定义了绘制图片常用的类,再后面训练的时候会被反复使用
import matplotlib.pyplot as plt
from IPython import display
def set_axes(axes, xlabel, ylabel, xlim, ylim, xscale, yscale, legend):
axes.set_xlabel(xlabel)
axes.set_ylabel(ylabel)
axes.set_xscale(xscale)
axes.set_yscale(yscale)
axes.set_xlim(xlim)
axes.set_ylim(ylim)
if legend:
axes.legend(legend)
axes.grid()
class Animator:
"""用于图像的绘制"""
def __init__(self, xlabel=None, ylabel=None, legend=None, xlim=None,
ylim=None, xscale='linear', yscale='linear',
fmts=('-', 'm--', 'g-.', 'r:'), nrows=1, ncols=1,
figsize=(3.5, 2.5)):
if legend is None:
legend = []
display.set_matplotlib_formats('svg') # 设置绘图方式为svg
self.fig, self.axes = plt.subplots(nrows, ncols, figsize=figsize)
if nrows * ncols == 1:
self.axes = [self.axes, ]
# 对每一个图片进行设置
self.config_axes = lambda: set_axes(
self.axes[0], xlabel, ylabel, xlim, ylim, xscale, yscale, legend)
self.X, self.Y, self.fmts = None, None, fmts
def add(self, x, y):
"""添加一组数据"""
if not hasattr(y, "__len__"):
y = [y]
n = len(y)
if not hasattr(x, "__len__"):
x = [x] * n
if not self.X:
self.X = [[] for _ in range(n)]
if not self.Y:
self.Y = [[] for _ in range(n)]
for i, (a, b) in enumerate(zip(x, y)):
if a is not None and b is not None:
self.X[i].append(a)
self.Y[i].append(b)
self.axes[0].cla()
for x, y, fmt in zip(self.X, self.Y, self.fmts):
self.axes[0].plot(x, y, fmt)
self.config_axes()
display.display(self.fig)
display.clear_output(wait=True)
训练
使用GPU的神经网络训练
def train(net, train_iter, test_iter, num_epochs, lr, device):
# 初始化每一层的权重
def init_weights(m):
if type(m) == nn.Linear or type(m) == nn.Conv2d:
nn.init.xavier_uniform_(m.weight)
net.apply(init_weights)
# 选择训练用的设备
print('training on', device)
net.to(device)
# 定义优化器与损失函数
optimizer = torch.optim.SGD(net.parameters(), lr=lr)
loss = nn.CrossEntropyLoss()
# 得到绘制训练损失的图表
animator = d2l.Animator(xlabel='epoch', xlim=[1, num_epochs],
legend=['train loss', 'train acc', 'test acc'])
# 得到批量的大小
num_batches = len(train_iter)
for epoch in range(num_epochs):
# 开一个长度为3的计数器,其中分别储存:损失,正确预测的个数,总的样本数
metric = d2l.Accumulator(3)
net.train() # 网络进入训练模式
for i, (X, y) in enumerate(train_iter):
optimizer.zero_grad() # 梯度归0
X, y = X.to(device), y.to(device) # 放入对应的设备
# 前向传播
y_hat = net(X)
l = loss(y_hat, y)
# 反向传播
l.backward()
optimizer.step()
with torch.no_grad():
# 第一个统计的是批量损失,第二个统计的是正确预测的个数,第三个统计的是总的样本数
metric.add(l * X.shape[0], d2l.accuracy(y_hat, y), X.shape[0])
train_l = metric[0] / metric[2]
train_acc = metric[1] / metric[2]
# 每隔一定的batch数量,计算一次测试集的准确率
if (i + 1) % (num_batches // 5) == 0 or i == num_batches - 1:
animator.add(epoch + (i + 1) / num_batches,
(train_l, train_acc, None))
# 计算测试集上的准确率,并进行绘制
test_acc = evaluate_accuracy_gpu(net, test_iter)
animator.add(epoch + 1, (None, None, test_acc))
print(f'loss {train_l:.3f}, train acc {train_acc:.3f}, '
f'test acc {test_acc:.3f}')
使用更多GPU进行并行训练
def train_batch(net, X, y, loss, trainer, device):
"""使用多GPU进行小批量训练"""
if isinstance(X, list):
X = [x.to(device) for x in X]
else:
X = X.to(device[0])
net.train() # 切换为训练模式
trainer.zero_grad() # 清零梯度
pred=net(X)
l=loss(pred,y)
l.sum().backward() # 反向传播
trainer.step() # 更新参数
# 计算损失
train_loss_sum=l.sum()
train_acc_sum=torch.sum(torch.argmax(pred,dim=1)==y).float()
return train_loss_sum,train_acc_sum
def train(net, train_iter, test_iter, loss, trainer, num_epochs, devices=try_all_gpus()):
num_batches = len(train_iter)
animator = Animator(xlabel='epoch', xlim=[1, num_epochs], ylim=[0, 1],
legend=['train loss', 'train acc', 'test acc'])
net = nn.DataParallel(net, device_ids=devices).to(devices[0])
for epoch in range(num_epochs):
metric = Accumulator(4)
for i, (features, labels) in enumerate(train_iter):
l, acc = train_batch(net, features, labels, loss, trainer, devices)
metric.add(l, acc, labels.shape[0], labels.numel())
if (i + 1) % (num_batches // 5) == 0 or i == num_batches - 1:
animator.add(epoch + (i + 1) / num_batches,
(metric[0] / metric[2], metric[1] / metric[3],
None))
test_acc = d2l.evaluate_accuracy_gpu(net, test_iter)
animator.add(epoch + 1, (None, None, test_acc))
print(f'loss {metric[0] / metric[2]:.3f}, train acc '
f'{metric[1] / metric[3]:.3f}, test acc {test_acc:.3f}')
查看层的结构
X = torch.rand(size=(1,1,224,224)) # 定义输入的形状
for layer in net:
X=layer(X) # 向前推进一层
print(layer.__class__.__name__, 'output shape:', X.shape)