Pytorch常用代码片段

3月 21, 2023 · 4 分钟阅读时长

仿照d2l的pytorch内容进行记录

设备切换

尝试使用GPU

def try_gpu(i=0):
    if torch.cuda.device_count() >= i + 1:
        return torch.device(f'cuda:{i}')
    return torch.device('cpu')
def try_all_gpus():
    """找到所有可以使用的GPU"""
    devices = [torch.device(f'cuda:{i}')
               for i in range(torch.cuda.device_count())]
    return devices if devices else [torch.device('cpu')]

计数器

class Accumulator:
    """用于计数"""
    def __init__(self, n):
        self.data = [0.0] * n

    def add(self, *args):
        """将一组数据加载本身的数据之上"""
        self.data = [a + float(b) for a, b in zip(self.data, args)]

    def reset(self):
        self.data = [0.0] * len(self.data)

    def __getitem__(self, idx):
        return self.data[idx]

精确性

对于训练精度评价可以使用如下代码

def accuracy(y_hat, y):
    """计算准确度"""
    if len(y_hat.shape) >1 and y_hat.shape[1] > 1:
        y_hat = y_hat.argmax(axis=1)
    cmp = y_hat.type(y.dtype) == y
    return float(cmp.type(y_hat.dtype).sum())

def evaluate_accuracy(net, data_iter):
    """准确度计算"""
    if isinstance(net, nn.Module):
        net.eval() # 设置为评估模式
    # 使用计数器分别表示正确预测的个数和总的预测个数
    metric = Accumulator(2)
    with torch.no_grad():
        for X, y in data_iter:
            metric.add(accuracy(net(X), y), y.numel())
    return metric[0] / metric[1]

当使用GPU的时候需要进行额外的处理,主要可以使用如下内容

def evaluate_accuracy_gpu(net, data_iter, device=None):
    """在GPU上的准确度计算"""
    if isinstance(net, nn.Module):
        net.eval() # 设置为评估模式
        if not device: # 如果没指定device,则使用网络所处的device
            device = next(iter(net.parameters())).device

    # 使用计数器分别表示正确预测的个数和总的预测个数
    metric = Accumulator(2)

    with torch.no_grad():
        for X, y in data_iter:
            if isinstance(X, list): # 当X是一个列表的时候就需要把每个元素分别放入GPU中
                X = [x.to(device) for x in X]
            else:
                X = X.to(device)
            y = y.to(device)
            metric.add(accuracy(net(X), y),y.numel())
    return metric[0] / metric[1]

图片绘制

定义了绘制图片常用的类,再后面训练的时候会被反复使用

import matplotlib.pyplot as plt
from IPython import display


def set_axes(axes, xlabel, ylabel, xlim, ylim, xscale, yscale, legend):

    axes.set_xlabel(xlabel)
    axes.set_ylabel(ylabel)
    axes.set_xscale(xscale)
    axes.set_yscale(yscale)
    axes.set_xlim(xlim)
    axes.set_ylim(ylim)
    if legend:
        axes.legend(legend)
    axes.grid()


class Animator:
    """用于图像的绘制"""

    def __init__(self, xlabel=None, ylabel=None, legend=None, xlim=None,
                 ylim=None, xscale='linear', yscale='linear',
                 fmts=('-', 'm--', 'g-.', 'r:'), nrows=1, ncols=1,
                 figsize=(3.5, 2.5)):

        if legend is None:
            legend = []
        display.set_matplotlib_formats('svg')  # 设置绘图方式为svg
        self.fig, self.axes = plt.subplots(nrows, ncols, figsize=figsize)

        if nrows * ncols == 1:
            self.axes = [self.axes, ]

        # 对每一个图片进行设置
        self.config_axes = lambda: set_axes(
            self.axes[0], xlabel, ylabel, xlim, ylim, xscale, yscale, legend)

        self.X, self.Y, self.fmts = None, None, fmts

    def add(self, x, y):
        """添加一组数据"""
        if not hasattr(y, "__len__"):
            y = [y]
        n = len(y)
        if not hasattr(x, "__len__"):
            x = [x] * n
        if not self.X:
            self.X = [[] for _ in range(n)]
        if not self.Y:
            self.Y = [[] for _ in range(n)]
        for i, (a, b) in enumerate(zip(x, y)):
            if a is not None and b is not None:
                self.X[i].append(a)
                self.Y[i].append(b)
        self.axes[0].cla()
        for x, y, fmt in zip(self.X, self.Y, self.fmts):
            self.axes[0].plot(x, y, fmt)
        self.config_axes()

        display.display(self.fig)
        display.clear_output(wait=True)

训练

使用GPU的神经网络训练

def train(net, train_iter, test_iter, num_epochs, lr, device):
    # 初始化每一层的权重
    def init_weights(m):
        if type(m) == nn.Linear or type(m) == nn.Conv2d:
            nn.init.xavier_uniform_(m.weight)
    net.apply(init_weights)

    # 选择训练用的设备
    print('training on', device)
    net.to(device)
    # 定义优化器与损失函数
    optimizer = torch.optim.SGD(net.parameters(), lr=lr)
    loss = nn.CrossEntropyLoss()

    # 得到绘制训练损失的图表
    animator = d2l.Animator(xlabel='epoch', xlim=[1, num_epochs],
                            legend=['train loss', 'train acc', 'test acc'])

    # 得到批量的大小
    num_batches = len(train_iter)

    for epoch in range(num_epochs):
        # 开一个长度为3的计数器,其中分别储存:损失,正确预测的个数,总的样本数
        metric = d2l.Accumulator(3)

        net.train()  # 网络进入训练模式
        for i, (X, y) in enumerate(train_iter):
            optimizer.zero_grad()  # 梯度归0
            X, y = X.to(device), y.to(device)  # 放入对应的设备

            # 前向传播
            y_hat = net(X)
            l = loss(y_hat, y)
            # 反向传播
            l.backward()
            optimizer.step()

            with torch.no_grad():
                # 第一个统计的是批量损失,第二个统计的是正确预测的个数,第三个统计的是总的样本数
                metric.add(l * X.shape[0], d2l.accuracy(y_hat, y), X.shape[0])

            train_l = metric[0] / metric[2]
            train_acc = metric[1] / metric[2]

            # 每隔一定的batch数量,计算一次测试集的准确率
            if (i + 1) % (num_batches // 5) == 0 or i == num_batches - 1:
                animator.add(epoch + (i + 1) / num_batches,
                             (train_l, train_acc, None))

        # 计算测试集上的准确率,并进行绘制
        test_acc = evaluate_accuracy_gpu(net, test_iter)
        animator.add(epoch + 1, (None, None, test_acc))

    print(f'loss {train_l:.3f}, train acc {train_acc:.3f}, '
          f'test acc {test_acc:.3f}')

使用更多GPU进行并行训练

def train_batch(net, X, y, loss, trainer, device):
    """使用多GPU进行小批量训练"""
    if isinstance(X, list):
        X = [x.to(device) for x in X]
    else:
        X = X.to(device[0])

    net.train()  # 切换为训练模式
    trainer.zero_grad()  # 清零梯度
    pred=net(X)
    l=loss(pred,y)
    l.sum().backward() # 反向传播
    trainer.step() # 更新参数

    # 计算损失
    train_loss_sum=l.sum()
    train_acc_sum=torch.sum(torch.argmax(pred,dim=1)==y).float()
    return train_loss_sum,train_acc_sum

def train(net, train_iter, test_iter, loss, trainer, num_epochs, devices=try_all_gpus()):
    num_batches = len(train_iter)
    animator = Animator(xlabel='epoch', xlim=[1, num_epochs], ylim=[0, 1],
                        legend=['train loss', 'train acc', 'test acc'])
    net = nn.DataParallel(net, device_ids=devices).to(devices[0])
    for epoch in range(num_epochs):
        metric = Accumulator(4)
        for i, (features, labels) in enumerate(train_iter):
            l, acc = train_batch(net, features, labels, loss, trainer, devices)
            metric.add(l, acc, labels.shape[0], labels.numel())
            if (i + 1) % (num_batches // 5) == 0 or i == num_batches - 1:
                animator.add(epoch + (i + 1) / num_batches,
                             (metric[0] / metric[2], metric[1] / metric[3],
                              None))
        test_acc = d2l.evaluate_accuracy_gpu(net, test_iter)
        animator.add(epoch + 1, (None, None, test_acc))
    print(f'loss {metric[0] / metric[2]:.3f}, train acc '
          f'{metric[1] / metric[3]:.3f}, test acc {test_acc:.3f}')

查看层的结构

X = torch.rand(size=(1,1,224,224)) # 定义输入的形状
for layer in net:
    X=layer(X) # 向前推进一层
    print(layer.__class__.__name__, 'output shape:', X.shape)