[ML Notes] PyTorch Workflow

The notes below were converted straight from a notebook to Markdown and verified to run on Google Colab.
You can open the notebook directly here 👉 https://colab.research.google.com/drive/1TDM6Vz5efbjTzvXcn1RQMhF5iijg8pzm?usp=sharing
What follows is the typical workflow for training a model in PyTorch.

Initialize

import torch
import matplotlib.pyplot as plt
from torch import nn
import torch.optim as optim

Data Preparation

# linear regression
weight = 0.7
bias = 0.3

start = 0
end = 1
step = 0.02
X = torch.arange(start, end, step).unsqueeze(dim = 1)
Y = weight * X + bias

X, Y, len(X), len(Y)
# split data
x = torch.arange(start, end, step).unsqueeze(dim = 1)
y = weight * x + bias
train_split = int(0.8 * len(x))
test_split = int(0.2 * len(x))
x_train = x[:train_split]
y_train = y[:train_split]
x_test = x[-test_split:]
y_test = y[-test_split:]
# y_test, x_test
# test_split
len(x_test)
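
Sanity-checking the split: with start=0, end=1, step=0.02 there are 50 samples, so the 80/20 split gives 40 training points and 10 test points. A quick check (the assertions are added here, not in the original notebook):

# 50 samples total => 40 train / 10 test
assert len(x) == 50
assert len(x_train) == 40
assert len(x_test) == 10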
# visualize data
def plot_predictions(train_data=x_train,
                     train_labels=y_train,
                     test_data=x_test,
                     test_labels=y_test,
                     predictions=None):

    plt.figure(figsize=(10, 7))

    # training data => blue
    plt.scatter(train_data, train_labels, c="b", s=4, label="Training data")

    # test data => green
    plt.scatter(test_data, test_labels, c="g", s=4, label="Test data")

    if predictions is not None:
        plt.scatter(test_data, predictions, c="r", s=4, label="Predictions")

    plt.legend(prop={"size": 14});
plot_predictions();

Build a model

The basic idea behind this ML setup: the linear formula itself is fixed, but the model starts from random weights and bias and keeps adjusting them to get ever closer to the true weights &
bias, until its predictions are good enough. Some common building blocks and what they do (a minimal demonstration follows the list, then the actual model):

  • torch.nn - contains all the building blocks a neural network needs
  • torch.nn.Parameter - the parameters to be trained, typically weights and biases
  • torch.nn.Module - the base class for models; subclasses must define a forward() method
  • torch.optim - exactly what it says: PyTorch's optimizers
  • def forward() - required by every nn.Module; defines the computation the model performs
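
As a minimal standalone sketch (not from the notebook): any nn.Parameter assigned as an attribute of an nn.Module is registered automatically and shows up in .parameters():

import torch
from torch import nn

class Toy(nn.Module):
    def __init__(self):
        super().__init__()
        self.w = nn.Parameter(torch.randn(1))  # registered automatically

print(list(Toy().parameters()))  # contains w, ready to hand to an optimizer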
class LinearRegressionModel(nn.Module):
    def __init__(self):
        super().__init__()
        # define the model's trainable parameters: weight and bias
        self.weight = nn.Parameter(torch.randn(1, requires_grad=True, dtype=torch.float))
        self.bias = nn.Parameter(torch.randn(1, requires_grad=True, dtype=torch.float))

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        # forward pass of linear regression: y = wx + b
        return self.weight * x + self.bias  # applied element-wise, one output per input

Use the model

# create a random seed
# the seed only fixes the random number generator's initial state; it does not depend on time.
# without a seed, random number generation is typically seeded from the timestamp
torch.manual_seed(42)

# create an instance of the model
model_0 = LinearRegressionModel()
list(model_0.parameters())
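
A quick aside on what the seed actually does (a standalone sketch, not part of the original notebook): re-seeding resets the generator's state, so the same draws come out again:

torch.manual_seed(42)
a = torch.randn(1)
torch.manual_seed(42)
b = torch.randn(1)
print(torch.equal(a, b))  # True: same seed, same initial state, same draw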
# show the parameters
# check the content of the model
model_0.state_dict()
# make predictions
# torch.inference_mode disables gradient tracking, which suits inference:
# PyTorch skips the bookkeeping needed for backprop, so it runs noticeably faster.
# older versions of PyTorch use the similar torch.no_grad()
with torch.inference_mode():
    y_preds = model_0(x_test)

y_preds, y_test
plot_predictions(predictions = y_preds)

Train the model

Time to bring out the loss function, which measures how far the predictions are from the ideal values. Some prerequisite concepts:

  • Loss function - a way to measure the error; lower is better
  • Optimizer - uses the measured loss to adjust the parameters, which is what training means

The training process consists of:

  • a training loop
  • a testing loop

The steps inside the training loop (gradient descent & backpropagation):

  • loop through the data
  • forward pass
  • compute the loss (loss_fn)
  • zero the optimizer's gradients (optimizer.zero_grad())
  • backpropagate to compute gradients (loss.backward())
  • update the parameters and move on (optimizer.step())
# set up a loss function
loss_fn = nn.L1Loss()

# set up an optimizer (not a function)
optimizer = torch.optim.SGD(params=model_0.parameters(),
                            lr=0.001)  # lr controls roughly which decimal place of the parameters changes per step
# build a training loop
torch.manual_seed(42)
epochs = 6000  # how many passes over the data
for epoch in range(epochs):
    model_0.train()                  # switch to training mode
    y_pred = model_0(x_train)        # get predictions from the model
    loss = loss_fn(y_pred, y_train)  # compute the loss
    # print(f"Loss: {loss}")
    optimizer.zero_grad()            # zero the gradients
    loss.backward()                  # backpropagation
    optimizer.step()                 # update the parameters



model_0.state_dict()
weight, bias
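
To see how close training got, you can compare the learned parameters against the true weight and bias used to generate the data (a small sketch added here for illustration):

learned = model_0.state_dict()
print(f"true weight: {weight} | learned: {learned['weight'].item():.4f}")
print(f"true bias:   {bias} | learned: {learned['bias'].item():.4f}")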

Test the model

Testing measures the model's accuracy, and the test step usually lives inside the training for loop (the Altogether section below does exactly that).
Useful values to log:

  • the epoch count
  • the training loss
  • the test loss
model_0.eval()  # switch to evaluation mode
with torch.inference_mode():  # disable gradient tracking
    y_preds_new = model_0(x_test)
    test_pred = model_0(x_test)
    test_loss = loss_fn(test_pred, y_test)
# plot_predictions(predictions = y_preds)
# plot_predictions(predictions = y_preds_new)

Saving a model

  • torch.save() - saves a model
  • torch.load() - loads a model
  • torch.nn.Module.load_state_dict() - loads a saved (parameter) state into a model so training can resume later
from pathlib import Path

# create model directory
MODEL_PATH = Path("models")
MODEL_PATH.mkdir(parents = True, exist_ok = True)

# create model save directory
MODEL_NAME = "model_0.pt"
MODEL_SAVE_PATH = MODEL_PATH / MODEL_NAME

MODEL_SAVE_PATH

# save the state dict
torch.save(model_0.state_dict(), MODEL_SAVE_PATH)

Loading a model

load_model_0 = LinearRegressionModel()
# load_model_0.state_dict()
load_model_0.load_state_dict(torch.load(f=MODEL_SAVE_PATH))
load_model_0.state_dict()
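
To confirm the load worked, you can check that the restored model produces the same predictions as the original (a sketch reusing the notebook's existing x_test and y_preds_new):

load_model_0.eval()
with torch.inference_mode():
    loaded_preds = load_model_0(x_test)
print(torch.equal(y_preds_new, loaded_preds))  # True if the state dicts match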

Altogether

import torch
import matplotlib.pyplot as plt
from torch import nn
import torch.optim as optim
import numpy as np

class LinearRegressionModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.weight = nn.Parameter(torch.randn(1, requires_grad=True, dtype=torch.float))
        self.bias = nn.Parameter(torch.randn(1, requires_grad=True, dtype=torch.float))

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.weight * x + self.bias  # applied element-wise, one output per input


weight = 0.7
bias = 0.3

start = 0
end = 1
step = 0.02

x = torch.arange(start, end, step).unsqueeze(dim=1)
y = weight * x + bias

train_split = int(0.8 * len(x))
test_split = int(0.2 * len(x))

x_train = x[:train_split]
y_train = y[:train_split]
x_test = x[-test_split:]
y_test = y[-test_split:]

def plot_predictions(train_data=x_train,
                     train_labels=y_train,
                     test_data=x_test,
                     test_labels=y_test,
                     predictions=None):
    plt.figure(figsize=(10, 7))
    plt.scatter(train_data, train_labels, c="b", s=4, label="Training data")
    plt.scatter(test_data, test_labels, c="g", s=4, label="Test data")
    if predictions is not None:
        plt.scatter(test_data, predictions, c="r", s=4, label="Predictions")
    plt.legend(prop={"size": 14});

model_0 = LinearRegressionModel()
epochs = 6000
loss_fn = nn.L1Loss()
optimizer = torch.optim.SGD(params=model_0.parameters(), lr=0.001)
epochs_count = []
train_loss = []
test_loss = []

for epoch in range(epochs):
    model_0.train()  # training mode
    y_pred = model_0(x_train)
    loss = loss_fn(y_pred, y_train)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    model_0.eval()
    if epoch % 10 == 0:
        with torch.inference_mode():
            y_test_pred = model_0(x_test)
            loss_test = loss_fn(y_test_pred, y_test)
            train_loss.append(loss.item())       # .item() detaches the scalar for plotting
            test_loss.append(loss_test.item())
            epochs_count.append(epoch)
            # print(f"epoch = {epoch} | train_loss = {loss} | test_loss = {loss_test}")

plt.plot(epochs_count, train_loss, label="Train loss")
plt.plot(epochs_count, test_loss, label="Test loss")
plt.title("Training and test loss curves")
plt.ylabel("Loss")
plt.xlabel("Epochs")
plt.legend()

Put everything together on a GPU

Remember to move the model and the relevant tensors onto the GPU, and to move anything you plot back to the CPU.

import torch
import matplotlib.pyplot as plt
from torch import nn
import torch.optim as optim
import numpy as np

# note the parentheses: torch.cuda.is_available is a function, so it must be called
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

print(f"device is {device}")

class LinearRegressionModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.linear_layer = nn.Linear(in_features=1, out_features=1)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.linear_layer(x)


weight = 0.7
bias = 0.3

start = 0
end = 1
step = 0.02

x = torch.arange(start, end, step).unsqueeze(dim=1)  # add a trailing dimension of size 1 so nn.Linear gets the shape it expects
y = weight * x + bias

train_split = int(0.8 * len(x))
test_split = int(0.2 * len(x))

x_train = x[:train_split]
y_train = y[:train_split]
x_test = x[-test_split:]
y_test = y[-test_split:]

def plot_predictions(train_data=x_train,
                     train_labels=y_train,
                     test_data=x_test,
                     test_labels=y_test,
                     predictions=None):
    plt.figure(figsize=(10, 7))
    plt.scatter(train_data, train_labels, c="b", s=4, label="Training data")
    plt.scatter(test_data, test_labels, c="g", s=4, label="Test data")
    if predictions is not None:
        plt.scatter(test_data, predictions, c="r", s=4, label="Predictions")
    plt.legend(prop={"size": 14});

torch.manual_seed(42)
model_0 = LinearRegressionModel()

# move the model and every tensor it touches onto the target device
model_0 = model_0.to(device)
x_train = x_train.to(device)
y_train = y_train.to(device)
x_test = x_test.to(device)
y_test = y_test.to(device)

epochs = 1000
loss_fn = nn.L1Loss()
optimizer = torch.optim.SGD(params=model_0.parameters(), lr=0.001)
epochs_count = []
train_loss = []
test_loss = []

for epoch in range(epochs):
    model_0.train()  # training mode
    y_pred = model_0(x_train)
    loss = loss_fn(y_pred, y_train)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    model_0.eval()
    if epoch % 10 == 0:
        with torch.inference_mode():
            y_test_pred = model_0(x_test)
            loss_test = loss_fn(y_test_pred, y_test)
            train_loss.append(loss.item())       # .item() copies the scalar back to the CPU
            test_loss.append(loss_test.item())
            epochs_count.append(epoch)
            # print(f"epoch = {epoch} | train_loss = {loss} | test_loss = {loss_test}")

plt.plot(epochs_count, train_loss, label="Train loss")
plt.plot(epochs_count, test_loss, label="Test loss")
plt.title("Training and test loss curves")
plt.ylabel("Loss")
plt.xlabel("Epochs")
plt.legend()
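
Finally, to visualize predictions from the GPU model, compute them on the device and move them back to the CPU before handing them to matplotlib (a sketch following the note above):

model_0.eval()
with torch.inference_mode():
    y_preds = model_0(x_test)

# matplotlib needs CPU tensors / NumPy arrays
plot_predictions(predictions=y_preds.cpu())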
