pytorch迁移学习基准测试

随笔 2019-05-03 788 次浏览 次点赞

pytorch提供多种预训练模型可以用于图像分类任务中的训练和测试,下面提供它的训练和测试代码:

注意训练所用的图像目录结构必须是: data/train/a b .. data/val/a b .. ,a b 为不同类别的文件目录

from __future__ import print_function
from __future__ import division
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import torchvision
from torchvision import datasets, models, transforms
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import time
import os
import copy
from cnn_finetune import make_model
print("PyTorch Version: ",torch.__version__)
print("Torchvision Version: ",torchvision.__version__)

data_dir = "/data"

# 可选择 [resnet, alexnet, vgg, squeezenet, densenet, inception] 预训练模型
model_name = "densenet"

# 类别数量。与文件夹对应
num_classes = 2

# batch size 越大占用内存越多,一般为8N
batch_size = 16

num_epochs = 30

# 特征提取的标志, 当为False时,微调整个模型,当True时只更新添加的网络参数
feature_extract = False

def train_model(model, dataloaders, criterion, optimizer, num_epochs=25, is_inception=False):
    since = time.time()

    val_acc_history = []

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)

        # 每轮进行一次验证
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()  # 设为训练模式
            else:
                model.eval()   # 设为评价模式

            running_loss = 0.0
            running_corrects = 0

            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)

                # 梯度置零
                optimizer.zero_grad()

                # 前向传播
                # 训练模式下保存训练记录
                with torch.set_grad_enabled(phase == 'train'):
                    # 计算损失率                          
                    # 测试模式只需要考虑输出
                    # 如果是inception结构的网络,loss计算有所不同
                    #if is_inception and phase == 'train':

                    #    outputs, aux_outputs = model(inputs)
                    #    loss1 = criterion(outputs, labels)
                    #    loss2 = criterion(aux_outputs, labels)
                    #    loss = loss1 + 0.4*loss2
                    #else:
                    outputs = model(inputs)
                    loss = criterion(outputs, labels)

                    _, preds = torch.max(outputs, 1)

                    # 在训练模式中才进行后向传播优化
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)

            epoch_loss = running_loss / len(dataloaders[phase].dataset)
            epoch_acc = running_corrects.double() / len(dataloaders[phase].dataset)

            print('{} Loss: {:.4f} Acc: {:.4f}'.format(phase, epoch_loss, epoch_acc))

            # 拷贝保存最优模型
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())
                torch.save(best_model_wts,'../model/'+str(best_acc)+'.pth')
            if phase == 'val':
                val_acc_history.append(epoch_acc)

        print()

    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
    print('Best val Acc: {:4f}'.format(best_acc))

    # 加载模型
    model.load_state_dict(best_model_wts)
    return model, val_acc_history

def set_parameter_requires_grad(model, feature_extracting):
    if feature_extracting:
        for param in model.parameters():
            param.requires_grad = False

def initialize_model(model_name, num_classes, feature_extract, use_pretrained=True):
    # 初始化特定模型参数,因为不同的网络input size和output numbers不同
    model_ft = None
    input_size = 0

    if model_name == "resnet":
        """ Resnet18
        """
        model_ft = models.resnet18(pretrained=use_pretrained)
        set_parameter_requires_grad(model_ft, feature_extract)
        num_ftrs = model_ft.fc.in_features
        model_ft.fc = nn.Linear(num_ftrs, num_classes)
        input_size = 224

    elif model_name == "alexnet":
        """ Alexnet
        """
        model_ft = models.alexnet(pretrained=use_pretrained)
        set_parameter_requires_grad(model_ft, feature_extract)
        num_ftrs = model_ft.classifier[6].in_features
        model_ft.classifier[6] = nn.Linear(num_ftrs,num_classes)
        input_size = 224

    elif model_name == "vgg":
        """ VGG11_bn
        """
        model_ft = models.vgg11_bn(pretrained=use_pretrained)
        set_parameter_requires_grad(model_ft, feature_extract)
        num_ftrs = model_ft.classifier[6].in_features
        model_ft.classifier[6] = nn.Linear(num_ftrs,num_classes)
        input_size = 224

    elif model_name == "squeezenet":
        """ Squeezenet
        """
        model_ft = models.squeezenet1_0(pretrained=use_pretrained)
        set_parameter_requires_grad(model_ft, feature_extract)
        model_ft.classifier[1] = nn.Conv2d(512, num_classes, kernel_size=(1,1), stride=(1,1))
        model_ft.num_classes = num_classes
        input_size = 224

    elif model_name == "densenet":
        """ Densenet
        """
        model_ft = models.densenet161(pretrained=use_pretrained)
        set_parameter_requires_grad(model_ft, feature_extract)
        num_ftrs = model_ft.classifier.in_features
        model_ft.classifier = nn.Linear(num_ftrs, num_classes)
        input_size = 224

    elif model_name == "inception":
        """ Inception v3
        Be careful, expects (299,299) sized images and has auxiliary output
        """
        model_ft = models.inception_v3(pretrained=use_pretrained)
        set_parameter_requires_grad(model_ft, feature_extract)
        # Handle the auxilary net
        num_ftrs = model_ft.AuxLogits.fc.in_features
        model_ft.AuxLogits.fc = nn.Linear(num_ftrs, num_classes)
        # Handle the primary net
        num_ftrs = model_ft.fc.in_features
        model_ft.fc = nn.Linear(num_ftrs,num_classes)
        input_size = 299

    else:
        print("Invalid model name, exiting...")
        exit()

    return model_ft, input_size

# 初始化并开始训练
model_ft, input_size = initialize_model(model_name, num_classes, feature_extract, use_pretrained=True)

# 打印模型结构
print(model_ft)

#对图像进行预处理,归一化等操作
data_transforms = {
    'train': transforms.Compose([
        transforms.RandomResizedCrop(input_size),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'val': transforms.Compose([
        transforms.Resize(input_size),
        transforms.CenterCrop(input_size),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
}

print("Initializing Datasets and Dataloaders...")

# 加载训练和验证数据
image_datasets = {x: datasets.ImageFolder(os.path.join(data_dir, x), data_transforms[x]) for x in ['train', 'val']}

dataloaders_dict = {x: torch.utils.data.DataLoader(image_datasets[x], batch_size=batch_size, shuffle=True, num_workers=4) for x in ['train', 'val']}

# 指定gpu
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# 模型加载到gpu
model_ft = model_ft.to(device)

params_to_update = model_ft.parameters()
print("Params to learn:")
if feature_extract:
    params_to_update = []
    for name,param in model_ft.named_parameters():
        if param.requires_grad == True:
            params_to_update.append(param)
            print("\t",name)
else:
    for name,param in model_ft.named_parameters():
        if param.requires_grad == True:
            print("\t",name)

# 设置优化器
optimizer_ft = optim.SGD(params_to_update, lr=0.001, momentum=0.9)

# 设置损失函数
criterion = nn.CrossEntropyLoss()

# 开始训练
model_ft, hist = train_model(model_ft, dataloaders_dict, criterion, optimizer_ft, num_epochs=num_epochs, is_inception=(model_name=="densenet"))

# 设置不使用预训练模型的版本
scratch_model,_ = initialize_model(model_name, num_classes, feature_extract=False, use_pretrained=False)
scratch_model = scratch_model.to(device)
scratch_optimizer = optim.SGD(scratch_model.parameters(), lr=0.001, momentum=0.9)
scratch_criterion = nn.CrossEntropyLoss()
_,scratch_hist = train_model(scratch_model, dataloaders_dict, scratch_criterion, scratch_optimizer, num_epochs=num_epochs, is_inception=(model_name=="densenet"))

# 使用matplotlib绘制两种训练方式的结果对比
ohist = []
shist = []

ohist = [h.cpu().numpy() for h in hist]
shist = [h.cpu().numpy() for h in scratch_hist]

plt.title("Validation Accuracy vs. Number of Training Epochs")
plt.xlabel("Training Epochs")
plt.ylabel("Validation Accuracy")
plt.plot(range(1,num_epochs+1),ohist,label="Pretrained")
plt.plot(range(1,num_epochs+1),shist,label="Scratch")
plt.ylim((0,1.))
plt.xticks(np.arange(1, num_epochs+1, 1.0))
plt.legend()
plt.savefig("train.png")

当获得模型之后,加载模型对未知图片进行批量测试:

import torch
import torch.nn as nn
from torchvision import datasets, models, transforms
from torch.autograd import Variable
from PIL import Image
import os
import pandas as pd
from tqdm import tqdm

csv_name = 'submit.csv' # 输出结果文件
num_classes = 2
input_size = 224
class_names = ['good', 'bad']
#device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
device = torch.device("cpu")

# 和训练集相同的数据预处理
data_transforms = transforms.Compose([
        transforms.RandomResizedCrop(input_size),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])

test_folder = '../test/'

# 这里的预训练模型设置方式要和训练时的对应
model_ft = models.densenet161(pretrained=True)

num_ftrs = model_ft.classifier.in_features
model_ft.classifier = nn.Linear(num_ftrs, num_classes)

model_ft = model_ft.to(device)
model_ft.load_state_dict(torch.load('../model/0.9795.pth', map_location='cpu'))

# 必须要先设置为评价模式
model_ft.eval()

id = []
label = []

# 批量测试
for filename in tqdm(os.listdir(test_folder)):
    name = (filename.split(".tif"))[0]
    #print(name)
    id.append(name)
    image = Image.open(test_folder+filename)
    image_tensor = data_transforms(image).float()
    image_tensor = image_tensor.unsqueeze_(0)
    input = Variable(image_tensor)
    input = input.to(device)
    output = model_ft(input)
    _, preds = torch.max(output, 1) # 输出可能性最大的类
    for j in range(input.size()[0]):
        #print("pred label:{}".format(class_names[preds[j]]))
        #print(int(preds[j]))
         label.append(int(preds[j]))

dataframe = pd.DataFrame({'id':id,'label':label})
dataframe.to_csv(csv_name, index=False, sep=',')

本文由 Tony 创作,采用 署名-非商业性使用-相同方式共享 3.0,可自由转载、引用,但需署名作者且注明文章出处。

赏个馒头吧