1 Python基础练习
1-1 创建矩阵
import numpy as np
import struct
def createMatrixList(len, rows, cols):
    """Build a stack of ``len`` matrices with ``rows`` x ``cols`` entries each.

    Element ``[i][j][k]`` is ``(i + 1) * 10 + j * cols + k + 1``: every matrix
    counts upwards row by row, the first one starting at 11, the second at 21,
    and so on.

    Args:
        len: Number of matrices. The name shadows the builtin ``len``; it is
            kept because it is part of the public signature.
        rows: Rows per matrix.
        cols: Columns per matrix.

    Returns:
        A ``numpy.ndarray`` of dtype ``ubyte``; its shape is
        ``(len, rows, cols)`` when all three sizes are positive.
    """
    # A nested comprehension replaces the three append loops and avoids a
    # local variable named ``list`` that would shadow the builtin.
    matrices = [
        [
            [(i + 1) * 10 + j * cols + k + 1 for k in range(cols)]
            for j in range(rows)
        ]
        for i in range(len)
    ]
    return np.array(matrices, dtype='ubyte')
1-2 矩阵写入idx文件
import numpy as np
import struct
def writeMatrixList(filename, matrix_list):
    """Write an array to ``filename`` using the IDX binary layout.

    The file consists of a 4-byte magic number (two zero bytes, the type code
    ``8`` and the number of dimensions), one big-endian 4-byte unsigned
    integer per dimension, and finally the raw array bytes in C order.

    Args:
        filename: Path of the file to create or overwrite.
        matrix_list: ``numpy.ndarray`` to store. The header always declares
            type code 8 and the array bytes are written unchanged, so the
            caller is expected to pass single-byte data.
    """
    shapes = matrix_list.shape
    header = struct.pack('>HBB', 0, 8, len(shapes))
    # All dimension sizes are packed in one call instead of one per loop turn.
    header += struct.pack('>{}I'.format(len(shapes)), *shapes)
    with open(filename, 'wb') as f:
        f.write(header)
        # file.write() rejects non-contiguous buffers (transposed or sliced
        # arrays); ascontiguousarray copies only when it has to.
        f.write(np.ascontiguousarray(matrix_list))
1-3 从IDX文件中恢复矩阵
import numpy as np
import struct
def readMatrixFromFile(filename):
    """Load the array stored in an IDX file.

    The whole file is read into memory. The 4-byte header supplies the number
    of dimensions, the following big-endian 4-byte integers supply the shape,
    and the remaining payload is decoded as unsigned bytes.

    Args:
        filename: Path of the IDX file to read.

    Returns:
        A ``numpy.ndarray`` with the shape recorded in the file.
    """
    with open(filename, 'rb') as stream:
        raw = stream.read()

    # Header: two zero bytes, type code, number of dimensions.
    magic = struct.Struct('>HBB')
    ndim = magic.unpack_from(raw, 0)[2]
    cursor = magic.size

    dims_fmt = '>{}I'.format(ndim)
    shapes = struct.unpack_from(dims_fmt, raw, cursor)
    cursor += struct.calcsize(dims_fmt)

    # One unsigned byte per element of the array.
    payload_fmt = '>' + str(np.prod(shapes)) + 'B'
    values = struct.unpack_from(payload_fmt, raw, cursor)
    return np.reshape(values, shapes)
1-4 创建批量样本生成器
import numpy as np
import struct
def trainBatchReader(idx_filename, batch_size, drop_last):
    """Return a generator that yields batches of samples from an IDX file.

    The file is opened and decoded lazily, on the first ``next()`` call. The
    array is split along its first axis; each batch is a list holding
    ``batch_size`` consecutive entries.

    Args:
        idx_filename: Path of the IDX file (unsigned-byte payload).
        batch_size: Number of samples per batch.
        drop_last: When truthy, a final batch with fewer than ``batch_size``
            samples is discarded; when falsy it is yielded as well.

    Returns:
        A generator object (already instantiated, not the generator function).
    """
    def batch_reader():
        with open(idx_filename, 'rb') as f:
            data_buf = f.read()

        # Header: two zero bytes, type code, number of dimensions.
        head = struct.Struct('>HBB')
        _, _, dimslen = head.unpack_from(data_buf, 0)
        off_set = head.size

        dims_fmt = '>{}I'.format(dimslen)
        shapes = struct.unpack_from(dims_fmt, data_buf, off_set)
        off_set += struct.calcsize(dims_fmt)

        data_fmt = '>' + str(np.prod(shapes)) + 'B'
        matrix_list = np.reshape(
            struct.unpack_from(data_fmt, data_buf, off_set), shapes)

        batch = []
        for instance in matrix_list:
            batch.append(instance)
            if len(batch) == batch_size:
                yield batch
                batch = []
        # Truthiness instead of ``is False`` so that 0 and other falsy
        # non-bool values also keep the trailing partial batch.
        if batch and not drop_last:
            yield batch
    return batch_reader()
2 基于pytorch的线性回归
2-1 测试环境
import random
import torch
import numpy as np
def synthetic_data(w, b, num_examples):
    """Generate a synthetic data set following y = Xw + b + noise.

    Args:
        w: 1-D weight tensor; its length is the number of features.
        b: Bias added to every target.
        num_examples: Number of rows to generate.

    Returns:
        ``(X, y)`` where ``X`` is drawn from a normal distribution with mean 0
        and standard deviation 1, and ``y`` is a column of targets perturbed
        by normal noise with standard deviation 0.01.
    """
    features = torch.normal(0, 1, (num_examples, len(w)))
    targets = torch.matmul(features, w) + b
    noise = torch.normal(0, 0.01, targets.shape)
    targets += noise
    return features, targets.reshape((-1, 1))
# Ground-truth weights and bias used to synthesize the data set.
true_w = torch.tensor([2, -3.4])
true_b = 4.2
# 1000 synthetic (features, label) rows generated from the parameters above.
features, labels = synthetic_data(true_w, true_b, 1000)
# print('features:', features[0],'\nlabel:', labels[0])
def data_iter(batch_size, features, labels):
    """Yield shuffled minibatches of ``(features, labels)``.

    Args:
        batch_size: Maximum number of rows per batch; the last batch may be
            smaller.
        features: Tensor whose first dimension indexes the examples.
        labels: Tensor indexed the same way as ``features``.

    Yields:
        Tuples ``(features[idx], labels[idx])`` for consecutive slices of a
        random permutation of the example indices.
    """
    total = len(features)
    order = list(range(total))
    # The examples are visited in random order.
    random.shuffle(order)
    for start in range(0, total, batch_size):
        stop = min(start + batch_size, total)
        picked = torch.tensor(order[start:stop])
        yield features[picked], labels[picked]
# Minibatch size used by the training loop.
batch_size = 10
# for X, y in data_iter(batch_size, features, labels):
# print(X, '\n', y)
# break
# Model parameters: weights start as small normal noise, the bias starts at zero.
w = torch.normal(0, 0.01, size=(2,1), requires_grad=True)
b = torch.zeros(1, requires_grad=True)
def linreg(X, w, b):
    """Linear regression model: return the matrix product of X and w plus b."""
    projection = torch.matmul(X, w)
    return projection + b
def squared_loss(y_predict, y):
    """Element-wise squared loss, halved.

    ``y`` is reshaped to the shape of ``y_predict`` before the subtraction;
    no reduction is applied to the result.
    """
    residual = y_predict - y.reshape(y_predict.shape)
    return residual ** 2 / 2
def sgd(params, lr, batch_size):
    """Minibatch stochastic gradient descent step.

    Each parameter is decreased in place by ``lr * grad / batch_size`` and
    its gradient is then reset to zero.

    Args:
        params: Iterable of tensors whose ``.grad`` has been populated.
        lr: Learning rate.
        batch_size: Divisor applied to the gradient.
    """
    with torch.no_grad():
        for param in params:
            step = lr * param.grad / batch_size
            param.sub_(step)
            param.grad.zero_()
# Hyper-parameters and model selection for the training run below.
lr = 0.03
num_epochs = 3
net = linreg
for epoch in range(num_epochs):
    for X, y in data_iter(batch_size, features, labels):
        loss = squared_loss(net(X, w, b), y) # minibatch loss for X and y
        # loss has shape (batch_size, 1) rather than being a scalar, so its
        # elements are summed and the gradient w.r.t. [w, b] is taken of that sum
        loss.sum().backward()
        sgd([w, b], lr, batch_size) # update the parameters with their gradients
    # Loss over the whole data set after each epoch, computed without gradient tracking.
    with torch.no_grad():
        train_l = squared_loss(net(features, w, b), labels)
        # print(f'epoch {epoch + 1}, loss {float(train_l.mean()):f}')
# if sum(true_w - w.reshape(true_w.shape)) <1.0:
# print(True)
# if true_b - b<1.0:
# print(True)
3 逻辑回归
3-1 二分类问题-pytorch逻辑回归
import torch
import torch.nn as nn
import numpy as np
# german.data-numeric is already in numeric form, so numpy can load it directly
data=np.loadtxt("/data/workspace/myshixun/logistic_regression/data/german.data-numeric")
n,l=data.shape
# Show the row and column counts: rows are samples, (columns - 1) is the
# feature-vector length and the last column is the class label
print(n,l)
# Each column is one feature and the features have different scales, so they
# are standardized column by column; the last (label) column is left untouched
for j in range(l-1):
    # Task 1: mean of the column
    meanVal=np.mean(data[:,j])
    # Task 2: standard deviation of the column
    stdVal=np.std(data[:,j])
    data[:,j]=(data[:,j]-meanVal)/stdVal
# Shuffle the rows
np.random.shuffle(data)
# The first 900 rows are used for training, the rows from index 900 on for testing;
# 1 is subtracted from the label column in both splits
train_data=data[:900,:l-1]
train_lab=data[:900,l-1]-1
test_data=data[900:,:l-1]
test_lab=data[900:,l-1]-1
class LR(nn.Module):
    """Logistic-regression model: one linear layer followed by a sigmoid."""

    def __init__(self):
        super().__init__()
        # Task 3: linear model mapping a 24-dimensional feature vector to 2 classes.
        self.fc = nn.Linear(24, 2)

    def forward(self, x):
        """Return the sigmoid of the linear layer's output for ``x``."""
        # Task 4: squash the linear scores with the logistic function.
        return torch.sigmoid(self.fc(x))
def test(pred,lab):
t=pred.max(-1)[1]==lab
return torch.mean(t.float())
# Create the regression model
net=LR()
# Loss function
criterion=nn.CrossEntropyLoss() # cross-entropy loss
# Optimizer
optm=torch.optim.Adam(net.parameters()) # Adam optimizer
epochs=1000 # number of training iterations
4 基于mnist数据集的手写数字识别
4-1 创建训练样本批量生成器
########1.加载数据
import struct,random,numpy as np
# IDX data-type code -> type character (valid for both struct and NumPy).
code2type = {0x08: 'B', 0x09: 'b', 0x0B: 'h', 0x0c: 'i', 0x0D: 'f', 0x0E: 'd'}


def readMatrix(filename):
    """Load the array stored in an IDX file.

    The header supplies the element type code and the number of dimensions;
    the dimension sizes follow as big-endian 4-byte unsigned integers and the
    payload is stored big-endian as well.

    Args:
        filename: Path of the IDX file.

    Returns:
        A ``numpy.ndarray`` with the recorded shape and the native dtype that
        ``code2type`` maps the file's type code to.

    Raises:
        KeyError: If the type code is not present in ``code2type``.
        ValueError: If the file holds fewer payload bytes than the header
            announces.
    """
    with open(filename, 'rb') as f:
        buff = f.read()
    offset = 0
    fmt = '>HBB'  # big-endian: two zero bytes, type code, number of dimensions
    _, dcode, dimslen = struct.unpack_from(fmt, buff, offset)
    offset += struct.calcsize(fmt)
    fmt = '>{}I'.format(dimslen)  # one 4-byte unsigned integer per dimension
    shapes = struct.unpack_from(fmt, buff, offset)
    offset += struct.calcsize(fmt)

    typechar = code2type[dcode]
    count = 1
    for dim in shapes:
        count *= dim
    # Decode the payload in one vectorized step instead of materialising one
    # Python object per element through struct.unpack.
    stored = np.frombuffer(
        buff,
        dtype=np.dtype(typechar).newbyteorder('>'),
        count=count,
        offset=offset,
    )
    # astype() returns a writable copy in native byte order.
    return stored.reshape(shapes).astype(typechar)
def dataReader(imgfile, labelfile, batch_size, drop_last):
    """Create a batch generator over paired image/label IDX files.

    Both files are loaded once and the (image, label) pairs are shuffled
    once, when this function is called; every call of the returned function
    iterates over that same shuffled order.

    Args:
        imgfile: Path of the IDX file holding the images.
        labelfile: Path of the IDX file holding the labels.
        batch_size: Number of pairs per batch.
        drop_last: When truthy, a trailing batch with fewer than
            ``batch_size`` pairs is discarded; when falsy it is yielded too.

    Returns:
        A zero-argument generator function yielding lists of
        ``(image, label)`` tuples.
    """
    images = readMatrix(imgfile)
    labels = readMatrix(labelfile)
    buff = list(zip(images, labels))
    random.shuffle(buff)
    batchnum = len(buff) // batch_size

    def batch_reader():
        for i in range(batchnum):
            yield buff[i * batch_size:(i + 1) * batch_size]
        # The original condition was inverted: it emitted the partial batch
        # only when drop_last was set.
        tail = buff[batchnum * batch_size:]
        if tail and not drop_last:
            yield tail
    return batch_reader
4-2 创建卷积网络
import numpy as np
import torch
import torch.nn as nn,torch.nn.functional as F,torch.optim as optim
from loader import dataReader
#########2.定义卷积神经网络
class MnistNet(nn.Module):
    """Small convolutional classifier: two conv/pool stages and three linear layers.

    The first linear layer expects 16 * 5 * 5 features per sample, which is
    what the two conv/pool stages produce for a 1 x 28 x 28 input.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 6, kernel_size=5, stride=1, padding=2)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, kernel_size=5)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x):
        """Return the 10 class scores (logits) for a batch of images."""
        for conv in (self.conv1, self.conv2):
            x = self.pool(F.relu(conv(x)))
        # Flatten everything except the batch dimension.
        x = x.view(-1, 16 * 5 * 5)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return self.fc3(x)
# 3.训练网络
def train(loader):
    """Train a fresh ``MnistNet`` for at most 200 batches and return it.

    Args:
        loader: Zero-argument callable returning an iterable of batches; each
            batch is a sequence of ``(image, label)`` pairs.

    Returns:
        The trained model.
    """
    model = MnistNet()
    # Categorical cross-entropy loss, optimized with momentum SGD.
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
    for epoch in range(1):
        running_loss = 0.0
        for step, batch in enumerate(loader()):
            images, targets = zip(*batch)
            # unsqueeze(1) inserts the channel axis (N, H, W -> N, C, H, W).
            inputs = torch.from_numpy(
                np.array(images).astype('float32')).unsqueeze(1)
            labels = torch.from_numpy(np.array(targets).astype('int64'))

            # Reset gradients, then forward + backward + optimizer step.
            optimizer.zero_grad()
            loss = criterion(model(inputs), labels)
            loss.backward()
            optimizer.step()

            # Report the mean loss of every block of 100 batches.
            running_loss += loss.item()
            if step % 100 == 99:
                print(' batch {} loss: {}'.format(step + 1, running_loss / 100))
                running_loss = 0.
            if step == 199:
                break
    print('Finished Training')
    return model
# 4.测试网络
def test(PATH,loader):
    """Load saved weights into a new ``MnistNet`` and print its accuracy.

    Args:
        PATH: File containing the state dict to load.
        loader: Zero-argument callable returning an iterable of batches; each
            batch is a sequence of ``(image, label)`` pairs.

    Returns:
        The model with the loaded weights.
    """
    # Rebuild the network and restore the saved parameters.
    model = MnistNet()
    model.load_state_dict(torch.load(PATH))
    hits = 0
    seen = 0
    with torch.no_grad():
        for batch in loader():
            pictures, targets = zip(*batch)
            images = torch.from_numpy(
                np.array(pictures).astype('float32')).unsqueeze(1)
            labels = torch.from_numpy(np.array(targets).astype('int64'))
            outputs = model(images)
            # Index of the largest score per row is the predicted class.
            predicted = torch.max(outputs, 1)[1]
            seen += labels.size(0)
            hits += (predicted == labels).sum().item()
    print('Accuracy of the network on the {:d} test images: {:f}%'.format(seen,100 * hits / seen))
    return model
# Script entry point: train on the MNIST training files, save the weights,
# then reload them and report accuracy on the test files.
if __name__ == '__main__':
    BATCH_SIZE = 16
    PATH = '/data/workspace/myshixun/mnist/model/mnist_model.pth'
    train_loader = dataReader('/data/workspace/myshixun/mnist/data/train-images-idx3-ubyte', '/data/workspace/myshixun/mnist/data/train-labels-idx1-ubyte', BATCH_SIZE, True)
    test_loader = dataReader('/data/workspace/myshixun/mnist/data/t10k-images-idx3-ubyte', '/data/workspace/myshixun/mnist/data/t10k-labels-idx1-ubyte', BATCH_SIZE, False)
    model = train(train_loader)
    # Save the trained model's parameters
    torch.save(model.state_dict(), PATH)
    test(PATH,test_loader)
5 循环神经网络
5-1 从zip文件中读取文本文件,并创建字典
import zipfile # https://docs.python.org/zh-cn/2/library/zipfile.html
# import tarfile #https://docs.python.org/zh-cn/3/library/tarfile.html
import re # https://docs.python.org/zh-cn/3/library/re.html
import collections #https://docs.python.org/zh-cn/3/library/collections.html
import json #https://docs.python.org/3/library/json.html
import numpy as np
# 1.读取zip文件中的目录及文件列表
def getFileListFromZip(zippath):
    """Return the names of all entries (files and directories) in a zip archive.

    Args:
        zippath: Path of the zip file.

    Returns:
        List of entry names as reported by ``ZipFile.namelist()``.
    """
    # The context manager closes the archive; the original left it open.
    with zipfile.ZipFile(zippath) as myZip:
        return myZip.namelist()
# 2.从zip文件中读取指定的文本文件,用'utf-8'解码后以字符串形式返回
def getTextFromZip(zippath, filepath):
    """Read one member of a zip archive and return it decoded as UTF-8 text.

    Args:
        zippath: Path of the zip file.
        filepath: Name of the member inside the archive.

    Returns:
        The member's content as ``str``.
    """
    # The context manager closes the archive; the original left it open.
    with zipfile.ZipFile(zippath) as myZip:
        return myZip.read(filepath).decode('utf-8')
# 3.判断路径是否以.txt结尾的文本文件名
def isTxtFile(filepath):
    """Return True when ``filepath`` matches the pattern for a ``.txt`` name."""
    matched = re.match(r'.*\.txt$', filepath)
    return matched is not None
# 4.将字节串按规定转为单词列表
def str2words(txtstr):
    """Turn a text into a list of lower-case words.

    Line breaks become spaces, ``<...>`` spans and every character that is
    not an ASCII letter or a space are removed, runs of whitespace collapse
    to one space, and the result is lower-cased and split on single spaces.

    NOTE(review): ``<.*>`` is greedy, so on one line everything between the
    first ``<`` and the last ``>`` is removed, including words between two
    tags. Behaviour is kept as is; confirm whether that is intended.

    Args:
        txtstr: Input text.

    Returns:
        List of words; an empty input yields ``['']``.
    """
    cleanup_steps = (
        (r'[\r\n]', ' '),
        (r'(<.*>)|([^A-Za-z ])', ''),
        (r'\s+', ' '),
    )
    text = txtstr
    for pattern, replacement in cleanup_steps:
        text = re.sub(pattern, replacement, text)
    return text.strip().lower().split(' ')
# 5.创建字典
def buildDict(zippath, cutoff=50):
    """Build a word -> index dictionary from the ``.txt`` members of a zip file.

    Words are counted over all text members. Words occurring more than
    ``cutoff`` times are kept and numbered from 0 in order of decreasing
    count, ties broken alphabetically. The extra key ``'<unk>'`` gets the
    next free index.

    Args:
        zippath: Path of the zip archive.
        cutoff: Words must occur strictly more often than this to be kept.

    Returns:
        ``dict`` mapping each kept word, plus ``'<unk>'``, to its index. When
        no word passes the cutoff the result is ``{'<unk>': 0}``.
    """
    counts = collections.Counter()
    # The context manager closes the archive once counting is done.
    with zipfile.ZipFile(zippath) as myzip:
        for filepath in myzip.namelist():
            if not isTxtFile(filepath):
                continue
            txtstr = myzip.read(filepath).decode('utf-8')
            counts.update(str2words(txtstr))

    frequent = [item for item in counts.items() if item[1] > cutoff]
    frequent.sort(key=lambda item: (-item[1], item[0]))
    # Enumerating directly also works for an empty list, where unpacking
    # zip(*frequent) raised ValueError.
    word_idx = {word: idx for idx, (word, _) in enumerate(frequent)}
    word_idx['<unk>'] = len(frequent)
    return word_idx
# 6.将文本段根据字典转为编号序列
def txt2digseq(txtstr, word_dict):
    """Convert a text into a list of word indices.

    Args:
        txtstr: Input text; it is tokenized with ``str2words``.
        word_dict: Mapping from word to index; must contain ``'<unk>'``,
            whose index is used for words missing from the mapping.

    Returns:
        List of ``numpy.int64`` indices, one per word.
    """
    unknown = word_dict['<unk>']
    indices = []
    for word in str2words(txtstr):
        indices.append(word_dict.get(word, unknown))
    return list(np.int64(indices))
# 将字典dic存储为json格式的文件
def writeDict(dict, filename='/data/workspace/myshixun/text_proj/data/word_dict.json'):
    """Store ``dict`` as a UTF-8 encoded JSON file.

    Args:
        dict: The dictionary to serialize (the name shadows the builtin; it
            is kept because it is part of the signature).
        filename: Destination path.
    """
    with open(filename, 'wb') as out:
        # indent=4 makes json.dumps emit line breaks and indentation.
        text = json.dumps(dict, ensure_ascii=False, indent=4)
        payload = text.encode('utf8')
        out.write(payload)
# 从json格式文件中恢复字典dic
def readDict(filename='/data/workspace/myshixun/text_proj/data/word_dict.json'):
    """Load and return the dictionary stored in a UTF-8 JSON file."""
    with open(filename, 'r', encoding='utf-8') as source:
        return json.load(source)
5-2 创建面向序列的数据生成器
from build_dict import readDict,txt2digseq
import torch
from torch.utils.data import Dataset,DataLoader
import re
import zipfile
class TxtDataset(Dataset):
    """Dataset over the ``.txt`` members of a zip archive.

    Members whose path starts with ``minitrain/pos`` are labelled 1, all
    other text members 0. The archive stays open for the lifetime of the
    object because items are read from it on demand.
    """

    def __init__(self,zip_path,word_dict):
        self.word_dict = word_dict
        self.txtzip = zipfile.ZipFile(zip_path)
        txt_pattern = re.compile(r'.*\.txt$')
        pos_pattern = re.compile(r'^minitrain/pos')
        # Each entry is [member path, label].
        self.filelist = [
            [name, 1 if pos_pattern.match(name) else 0]
            for name in self.txtzip.namelist()
            if txt_pattern.match(name)
        ]
        self.len = len(self.filelist)

    def __getitem__(self, index):
        """Return ``(label, index sequence)`` for the ``index``-th text member."""
        path, label = self.filelist[index]
        txtstr = self.txtzip.read(path).decode('utf-8')
        return label, txt2digseq(txtstr, self.word_dict)

    def __len__(self):
        return self.len
def collate_batch(batch):
    """Merge ``(label, index sequence)`` samples into flat tensors.

    Args:
        batch: Iterable of ``(label, sequence)`` pairs.

    Returns:
        ``(labels, text, offsets)``: an int64 tensor of labels, all sequences
        concatenated into one int64 tensor, and the start position of each
        sequence inside that concatenation.
    """
    labels, texts, lengths = [], [], []
    for label, tokens in batch:
        tensor = torch.tensor(tokens, dtype=torch.int64)
        labels.append(label)
        texts.append(tensor)
        lengths.append(tensor.size(0))
    label_list = torch.tensor(labels, dtype=torch.int64)
    # Cumulative sum of [0, len_0, ..., len_{n-2}] gives each start offset.
    offsets = torch.tensor([0] + lengths[:-1]).cumsum(dim=0)
    text_list = torch.cat(texts)
    return label_list, text_list, offsets
5-3 创建文件分类器
import torch
from create_loader import readDict,txt2digseq
from torch.utils.data import Dataset,DataLoader
class TextClassifyModel(torch.nn.Module):
    """Text classifier: an ``EmbeddingBag`` followed by one linear layer."""

    def __init__(self, vocab_size, embed_dim, num_class):
        super().__init__()
        self.embedding = torch.nn.EmbeddingBag(vocab_size, embed_dim, sparse=True)
        self.fc = torch.nn.Linear(embed_dim, num_class)
        self.init_weights()

    def init_weights(self):
        """Draw both weight matrices uniformly from [-0.5, 0.5]; zero the bias."""
        initrange = 0.5
        with torch.no_grad():
            self.embedding.weight.uniform_(-initrange, initrange)
            self.fc.weight.uniform_(-initrange, initrange)
            self.fc.bias.zero_()

    def forward(self, text, offsets):
        """Return class scores for the bags in ``text`` delimited by ``offsets``."""
        pooled = self.embedding(text, offsets)
        return self.fc(pooled)
# Define functions to train the model
class ModelRun():
    """Bundles a ``TextClassifyModel`` with its loss, optimizer and training loop."""

    def __init__(self,vocab_size,emsize,num_class,lr,epochs):
        self.criterion = torch.nn.CrossEntropyLoss()
        self.model = TextClassifyModel(vocab_size, emsize, num_class)
        self.optimizer = torch.optim.SGD(self.model.parameters(), lr=lr)
        self.epochs = epochs

    def train(self, dataloader):
        """Train for ``self.epochs`` epochs and return the model.

        Args:
            dataloader: Iterable of ``(label, text, offsets)`` batches.
        """
        self.model.train()
        log_interval = 10
        for epoch in range(1, self.epochs + 1):
            total_acc = total_count = 0
            for idx, (label, text, offsets) in enumerate(dataloader):
                self.optimizer.zero_grad()
                scores = self.model(text, offsets)
                loss = self.criterion(scores, label)
                loss.backward()
                # Clip the gradient norm to 0.1 before the update.
                torch.nn.utils.clip_grad_norm_(self.model.parameters(), 0.1)
                self.optimizer.step()
                total_acc += (scores.argmax(1) == label).sum().item()
                total_count += label.size(0)
                if idx > 0 and idx % log_interval == 0:
                    # Progress printing is disabled; only the running
                    # accuracy counters are reset at each logging interval.
                    total_acc = total_count = 0
        return self.model
def predict(text,word_dict, model):
    """Classify one text and return the predicted class index.

    Args:
        text: Raw text; it is converted to word indices with ``txt2digseq``.
        word_dict: Word -> index mapping passed to ``txt2digseq``.
        model: Callable model taking ``(text, offsets)``; it is switched to
            evaluation mode.

    Returns:
        The index of the highest score as a Python ``int``.
    """
    model.eval()
    tokens = torch.tensor(txt2digseq(text, word_dict))
    # Inference only: skip autograd bookkeeping.
    with torch.no_grad():
        # A single bag that starts at offset 0 covers the whole text.
        output = model(tokens, torch.tensor([0]))
    return output.argmax(1).item()
6 强化学习
6-1 强化学习-游戏杆操作策略学习–基于策略估计
6-1-1 定义策略网络
import torch
import torch.nn as nn
import numpy as np
import gym
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical
class PolicyNet(nn.Module):
    """Two-layer policy network that outputs action probabilities.

    A 10-unit tanh hidden layer is followed by a linear layer whose scores
    are normalized with a softmax over dimension 1, so inputs need a leading
    batch dimension.
    """

    def __init__(self, n_features, n_actions):
        super().__init__()
        # Part 1: computation units.
        hidden = nn.Linear(n_features, 10)
        self.fc1 = nn.Sequential(hidden, nn.Tanh())
        scores = nn.Linear(10, n_actions)
        self.fc2 = nn.Sequential(scores, nn.Softmax(dim = 1))

    def forward(self, x):
        """Return one probability row per input row."""
        # Part 2: computation structure.
        return self.fc2(self.fc1(x))
6-1-2 定义损失函数
import torch
import torch.nn as nn
import numpy as np
import gym
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical
from policy_net import PolicyNet
class Agent():
    """Policy-gradient agent that learns from complete episodes.

    Transitions are collected with ``store_transition`` and consumed by
    ``learn``, which performs one optimizer step and clears the buffers.
    """

    def __init__(
            self,
            n_actions,
            n_features,
            learning_rate=0.01,
            reward_decay=0.95
    ):
        self.n_actions = n_actions
        self.n_features = n_features
        self.lr = learning_rate
        self.gamma = reward_decay
        # Per-episode buffers: observations, actions and rewards.
        self.obs = []
        self.acs = []
        self.rws = []
        self.net = PolicyNet(n_features, n_actions)
        # Not used by learn(); kept so the attribute remains available.
        self.loss = nn.CrossEntropyLoss()
        self.optimizer = torch.optim.Adam(self.net.parameters(), lr=learning_rate)

    def choose_action(self, observation):
        """Sample an action index from the policy's probabilities.

        Args:
            observation: 1-D ``numpy`` array; a batch axis is added before it
                is fed to the network.
        """
        self.net.eval()
        actions = self.net(torch.Tensor(observation[np.newaxis, :]))
        action = np.random.choice(
            range(actions.shape[1]), p=actions.view(-1).detach().numpy())
        return action

    def store_transition(self, s, a, r):
        """Append one (observation, action, reward) step to the buffers."""
        self.obs.append(s)
        self.acs.append(a)
        self.rws.append(r)

    def learn(self):
        """Run one policy-gradient update on the stored episode.

        Returns:
            The normalized discounted returns used to weight the update.
        """
        self.net.train()
        discount = self._discount_and_norm_rewards()
        # Stack the observations into one float32 array before conversion.
        output = self.net(torch.Tensor(np.asarray(self.obs, dtype=np.float32)))
        one_hot = torch.zeros(len(self.acs), self.n_actions).\
            scatter_(1, torch.LongTensor(self.acs).view(-1, 1), 1)
        # Negative log-probability of the action actually taken at each step.
        neg = torch.sum(-torch.log(output) * one_hot, 1)
        # Loss: negative log-likelihood weighted by the normalized return.
        loss = neg * torch.Tensor(discount)
        loss = loss.mean()
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        self.acs = []
        self.obs = []
        self.rws = []
        return discount

    def _discount_and_norm_rewards(self):
        """Return discounted returns, shifted to zero mean and scaled by std.

        The buffer is always float64. ``np.zeros_like(self.rws)`` produced an
        integer array when every stored reward was an ``int``, which
        truncated the returns and made the in-place normalization raise.
        """
        discount = np.zeros(len(self.rws), dtype=np.float64)
        tmp = 0
        for i in reversed(range(len(self.rws))):
            tmp = tmp * self.gamma + self.rws[i]
            discount[i] = tmp
        discount -= np.mean(discount)
        discount /= np.std(discount) + 0.0001  # small constant avoids division by zero
        return discount
def train():
    """Train the policy agent on CartPole for two episodes and save its weights."""
    env = gym.make('CartPole-v0')
    env.seed(1)
    # env = env.unwrapped  # would remove the step limit
    RL = Agent(
        n_actions=env.action_space.n,
        n_features=env.observation_space.shape[0],
        learning_rate=0.02,
        reward_decay=0.99
    )
    for i_episode in range(2):
        state = env.reset()
        done = False
        while not done:
            # env.render()
            action = RL.choose_action(state)
            next_state, reward, done, info = env.step(action)
            if done:
                # The terminal step is stored with a reward of -1.
                reward = -1
            RL.store_transition(state, action, reward)
            state = next_state
        print("episode:", i_episode)
        RL.learn()
    torch.save(RL.net.state_dict(), '/data/workspace/myshixun/RL_Policy_proj/src/policy.pt')
6-1-3 策略模型的应用
import torch
import torch.nn as nn
import numpy as np
import gym
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical
from policy_net import PolicyNet
from agent import Agent
class Test():
    """Runs a saved policy network on CartPole and reports the episode length."""

    def __init__(self,modelpath= '/data/workspace/myshixun/RL_Policy_proj/src/policy.pt'):
        self.modelpath = modelpath

    def select_action(self, state):
        """Sample an action from the policy for one observation.

        ``self.model`` must already exist; it is created by ``eval``.
        """
        batch = torch.from_numpy(state).float().unsqueeze(0)
        # Sample from the categorical distribution given by the policy output.
        distribution = Categorical(self.model(batch))
        return distribution.sample().item()

    def eval(self):
        """Load the policy, play two episodes and return the mean last step index."""
        self.model = PolicyNet(4, 2)
        self.model.load_state_dict(torch.load(self.modelpath))
        self.model.eval()
        env = gym.make('CartPole-v1')
        t_all = []
        for i_episode in range(2):
            s0 = env.reset()
            for t in range(1000):
                # env.render()
                # Unused values; the unpacking is kept so that an observation
                # without exactly four elements still fails here.
                _cp, _cv, _pa, _pv = s0
                s0, _reward, done, _info = env.step(self.select_action(s0))
                if done:
                    t_all.append(t)
                    break
        env.close()
        return sum(t_all) / len(t_all)
6-2 强化学习-游戏杆操作策略学习–基于值函数估计
6-2-1 Q_learning
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
class Net(nn.Module):
    """Q-value network made of a single linear layer."""

    def __init__(self, input_size, output_size):
        super(Net, self).__init__()
        # Task 1: computation unit.
        self.linear = nn.Linear(input_size, output_size)

    def forward(self, x):
        """Return the linear layer's output for ``x``."""
        # Task 2: network structure.
        return self.linear(x)
6-2-2 Q_learning定义Agent的损失
# coding: utf-8
__author__ = 'zhenhang.sun@gmail.com'
__version__ = '1.0.0'
import gym
import math
import random
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from Q_net import Net
class Agent(object):
    """Epsilon-greedy Q-learning agent with a replay buffer.

    Every keyword argument becomes an attribute. The methods read
    ``state_space_dim``, ``action_space_dim``, ``lr``, ``epsi_low``,
    ``epsi_high``, ``decay``, ``capacity``, ``batch_size`` and ``gamma``.

    NOTE(review): ``gamma`` (the discount factor) is needed by the completed
    ``learn``; confirm that callers pass it among the keyword arguments.
    """

    def __init__(self, **kwargs):
        for key, value in kwargs.items():
            setattr(self, key, value)
        self.eval_net = Net(self.state_space_dim, self.action_space_dim)
        self.optimizer = optim.Adam(self.eval_net.parameters(), lr=self.lr)
        self.buffer = []
        self.steps = 0

    def act(self, s0):
        """Choose an action for state ``s0`` with an epsilon-greedy policy.

        Epsilon decays exponentially from ``epsi_high`` towards ``epsi_low``
        as the number of calls grows.
        """
        self.steps += 1
        epsi = self.epsi_low + (self.epsi_high - self.epsi_low) * (
            math.exp(-1.0 * self.steps / self.decay))
        if random.random() < epsi:
            return random.randrange(self.action_space_dim)
        s0 = torch.tensor(s0, dtype=torch.float).view(1, -1)
        # Task 1: exploit by taking the action with the largest predicted
        # value. The placeholder here still drew a random action.
        with torch.no_grad():
            return torch.argmax(self.eval_net(s0)).item()

    def put(self, *transition):
        """Store a transition, evicting the oldest one when the buffer is full."""
        if len(self.buffer) == self.capacity:
            self.buffer.pop(0)
        self.buffer.append(transition)

    def learn(self):
        """Run one gradient step on a random minibatch of stored transitions.

        Does nothing until the buffer holds at least ``batch_size``
        transitions of the form ``(s0, a0, r1, s1)``.
        """
        if (len(self.buffer)) < self.batch_size:
            return
        samples = random.sample(self.buffer, self.batch_size)
        s0, a0, r1, s1 = zip(*samples)
        s0 = torch.tensor(s0, dtype=torch.float)
        a0 = torch.tensor(a0, dtype=torch.long).view(self.batch_size, -1)
        r1 = torch.tensor(r1, dtype=torch.float).view(self.batch_size, -1)
        s1 = torch.tensor(s1, dtype=torch.float)
        loss_fn = nn.MSELoss()
        # Task 2: temporal-difference loss. The target is the reward plus the
        # discounted best next-state value (detached, so no gradient flows
        # through it); the prediction is the value of the action taken.
        # The placeholder called loss_fn() without arguments, which raises.
        next_best = torch.max(self.eval_net(s1).detach(), dim=1)[0]
        y_true = r1 + self.gamma * next_best.view(self.batch_size, -1)
        y_pred = self.eval_net(s0).gather(1, a0)
        loss = loss_fn(y_pred, y_true)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
6-2-3 Q 函数模型的应用
import gym
import math
import random
import torch
from Q_net import Net
def test():
    """Play one CartPole episode with the saved Q-network and return the reward sum.

    The reward of the terminating step is not added to the total.

    NOTE(review): assumes the file holds a whole pickled network object that
    can be called on a state batch (not just a state dict) - confirm against
    the code that writes ``qdn.pt``.
    """
    eval_net=torch.load('/data/workspace/myshixun/RL_Q_learning_proj/src/qdn.pt')
    env = gym.make('CartPole-v0')
    s0 = env.reset()
    total_reward = 0
    while True:
        # env.render()
        s0 = torch.tensor(s0, dtype=torch.float).view(1, -1)
        # Task 1: act greedily on the loaded network's predicted values.
        # The placeholder drew a random action and left eval_net unused.
        with torch.no_grad():
            a0 = torch.argmax(eval_net(s0)).item()
        s0, r1, done, _ = env.step(a0)
        if done:
            break
        total_reward += r1
    env.close()
    return(total_reward)
7 未分配目录
7-1 深层模型构建-yaml2net
# https://jiuaidu.com/jianzhan/666463/
# 定义卷积块:我们可以先定义一个卷积块CBL,C指卷积Conv,B指BN层,L为激活函数,这里我用ReLu.
import torch.nn as nn
from pathlib import Path
import yaml
import torch
from copy import deepcopy
class BaseConv(nn.Module):
    """Convolution block: ``Conv2d`` -> ``BatchNorm2d`` -> ``ReLU``.

    Args:
        in_channels: Input channel count.
        out_channels: Output channel count.
        k: Kernel size (an int or a per-axis sequence).
        s: Stride.
        p: Padding; when ``None`` it is derived from ``k`` by ``autopad``.
    """

    def __init__(self, in_channels, out_channels, k=1, s=1, p=None):
        super(BaseConv, self).__init__()
        self.in_channels, self.out_channels = in_channels, out_channels
        padding = autopad(k, p)
        self.conv = nn.Conv2d(in_channels, out_channels, k, s, padding)
        self.bn = nn.BatchNorm2d(out_channels)
        self.act_fn = nn.ReLU(inplace=True)

    def forward(self, x):
        """Apply convolution, batch normalization and activation in that order."""
        x = self.conv(x)
        x = self.bn(x)
        return self.act_fn(x)
# 卷积中的autopad是自动补充pad,k可设为整数,也可设为列表形式[5,7]表示核的宽高不一样,代码如下:
def autopad(k, p=None):
    """Return the padding for kernel size ``k``.

    An explicit ``p`` is returned unchanged. Otherwise the padding is half
    the kernel size, computed per element when ``k`` is not an int.
    """
    if p is not None:
        return p
    if isinstance(k, int):
        return k // 2
    return [x // 2 for x in k]
# 定义一个Bottleneck
class Bottleneck(nn.Module):
    """Two stacked ``BaseConv`` blocks (1x1 then 3x3) with an optional shortcut.

    The input is added to the output only when ``shortcut`` is truthy and the
    input and output channel counts are equal.
    """

    def __init__(self, in_channels, out_channels, shortcut=True):
        super().__init__()
        self.conv1 = BaseConv(in_channels, out_channels, k=1, s=1)
        self.conv2 = BaseConv(out_channels, out_channels, k=3, s=1)
        self.add = shortcut and in_channels == out_channels

    def forward(self, x):
        """Return ``conv2(conv1(x))``, plus ``x`` when the shortcut is active."""
        out = self.conv2(self.conv1(x))
        if self.add:
            return x + out
        return out
# Model类,也就是定义自己的网络。可以看到与前面读取yaml文件相比,
# 多了一行 ch = self.yaml["ch"] = self.yaml["ch"] = 3
# 这个是在原yaml内容中加入一个key和value,3指的是3通道,因为我们的图像是3通道。parse_model是下面要说的传参过程。
class Model(nn.Module):
    """Network whose backbone is built from a yaml description.

    Args:
        cfg: Path of the yaml file. If no file exists at that path, the
            file's base name is looked up in the current working directory,
            which is what the previous implementation always did.
        ch: Number of input channels.
    """

    def __init__(self, cfg='yaml2net.yaml', ch=3 ):
        super().__init__()
        cfg_path = Path(cfg)
        if not cfg_path.is_file():
            # Backward-compatible fallback: base name relative to the cwd.
            cfg_path = Path(cfg_path.name)
        with open(cfg_path, errors='ignore') as f:
            self.yaml = yaml.safe_load(f)
        # parse_model modifies the config in place, so it gets a deep copy.
        self.backbone = parse_model(deepcopy(self.yaml), ch=[ch])

    def forward(self, x):
        """Return the backbone's output for ``x``."""
        return self.backbone(x)
# 传入参数: 这一步也是最关键的一步,我们需要定义传参的函数,
# 将yaml中的卷积参数传入我们定义的网络中,这里会用的一个非常非常重要的函数eval()
def parse_model(yaml_cfg, ch):
    """
    Build an ``nn.Sequential`` from the rows of ``yaml_cfg['backbone']``.

    NOTE(review): module names and string arguments are passed to ``eval``,
    which executes arbitrary expressions - only use trusted config files.

    :param yaml_cfg: parsed yaml content; ``yaml_cfg['backbone']`` is a list of
        ``[f, number, Module_name, args]`` rows (``args`` is modified in place)
    :param ch: list of channel counts; its last entry is the initial channel
        count and one entry is appended per row
    :return: model
    """
    layer, out_channels = [], ch[-1] # list collecting each layer, and the running output-channel count
    for i, (f, number, Module_name, args) in enumerate(yaml_cfg['backbone']):
        # The bare string below is a no-op statement kept verbatim. As used by
        # the code: f is an index into ch selecting the input channel count,
        # number is how many times the module is repeated, Module_name names
        # the module class, and args holds the output channel count followed
        # by further constructor arguments.
        """
f:上一层输出通道
number:该模块有几层,就是该模块要重复几次
Mdule_name:卷积层名字
args:参数,包含输出通道数,k,s,p等
"""
        # eval turns the class name string into the class object (e.g. BaseConv)
        m = eval(Module_name) if isinstance(Module_name, str) else Module_name
        for j, a in enumerate(args):
            # eval turns string arguments into values (e.g. the output channel count)
            args[j] = eval(a) if isinstance(a, str) else a
        # Update the channels
        # args[0] is the output channel count
        if m in [BaseConv, Bottleneck]:
            in_channels, out_channels = ch[f], args[0]
            args = [in_channels, out_channels, *args[1:]] # args=[in_channels, out_channels, k, s, p]
        # Instantiate the module with its arguments, repeated `number` times when number > 1
        model_ = nn.Sequential(*[m(*args) for _ in range(number)]) if number > 1 else m(*args)
        # Record this row's output channel count
        ch.append(out_channels)
        layer.append(model_)
    return nn.Sequential(*layer) # chain the collected layers into one sequential model
# Script entry point: print the parsed yaml config and the model built from it.
if __name__ == '__main__':
    import os # import the os module
    os.chdir('yaml2net/src/')
    #
    # Get the yaml file name
    yaml_file = Path('yaml2net.yaml').name
    with open(yaml_file, errors='ignore') as f:
        yaml_ = yaml.safe_load(f)
    print(yaml_)
    #
    # Output:
    #
    # {'backbone': [[-1, 1, 'BaseConv', [32, 3, 1]], [-1, 1, 'BaseConv', [64, 1, 1]], [-1, 2, 'Bottleneck', [64]]]}
    cfg = 'yaml2net.yaml'
    model = Model(cfg,3)
    print(model)
    # for name,param in model.named_parameters():
    # print('name=',name)#,'param=',param)
    # model.eval()
    # print(model)
    # x = torch.ones(1, 3, 512, 512)
    # output = model(x)
    # torch.save(model, "model.pth")
    #
    # model = torch.load('model.pth')
    # model.eval()
    # x = torch.ones(1,3,512,512)
    # input_name = ['input']
    # output_name = ['output']
    # torch.onnx.export(model, x, 'myonnx.onnx', verbose=True)
7-2 人脸比对+迁移学习
7-2-1 访问模型参数,修改模型结构
import torch
import torch.nn as nn
import numpy as np
class MnistLinearNet(nn.Module):
    """Tiny conv + two-layer linear network used to explore parameter handling.

    The layers are registered both as named attributes (``conv1``, ``flat``,
    ``fc1``, ``fc2``) and as members of the ``net`` sequence, so every layer
    object is reachable under two names.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 1, kernel_size=3)
        self.flat = nn.Flatten()
        self.fc1 = nn.Linear(9, 5)
        self.fc2 = nn.Linear(5, 5)
        pipeline = [self.conv1, nn.ReLU(), self.flat, self.fc1, nn.ReLU(), self.fc2]
        self.net = nn.Sequential(*pipeline)
        # Exercise question: how would the parameter names change if the
        # layers were created only inside nn.Sequential(...), without the
        # named attributes above?

    def forward(self, x):
        """Forward pass through the network; returns the output logits."""
        return self.net(x)
#按顺序返回模型中的参数名称列表
#0 输入为nn.Module对象,返回['conv1.weight', 'conv1.bias', ....]形式的参数名称列表
def get_param_names(model):
    """Return the model's parameter names in ``named_parameters()`` order."""
    names = []
    for name, _param in model.named_parameters():
        names.append(name)
    return names
#1 返回模型中卷积对应的bias参数值,要求返回类型为Tensor
def get_param_value(model_path):
    """Load a saved model and return the bias tensor of ``model.net[0]``.

    Args:
        model_path: File readable by ``torch.load`` that yields an object
            with an indexable ``net`` attribute.
    """
    model = torch.load(model_path)
    first_layer = model.net[0]
    return first_layer.bias.data
#2 根据参数名,查找参数对应的Tensor对象
def get_param_by_name(model,param_name):
    """Return the tensor stored under ``param_name`` in the model's state dict.

    Raises:
        KeyError: If the state dict has no entry with that name.
    """
    state = model.state_dict()
    return state[param_name]
#3 加载模型,并修改最后一层为输出为2的全连接层nn.Linear(5, 2),返回model
def modify_model(model_path):
    """Load a saved model and replace ``model.net[5]`` with ``nn.Linear(5, 2)``.

    Returns:
        The modified model.
    """
    model = torch.load(model_path)
    replacement = nn.Linear(5, 2)
    model.net[5] = replacement
    return model
#4 加载模型参数,模型相同
def load_model_param(model,param_path):
    """Load the state dict stored at ``param_path`` into ``model`` and return it."""
    state = torch.load(param_path)
    model.load_state_dict(state)
    return model
#5 加载模型部分参数,两个模型的fc2结构不同,因此不能加载fc2的参数
def load_model2_param(model,param_path):
    """Load the compatible part of a saved state dict into ``model``.

    Entries are skipped when their name contains ``'fc2'``, when the model
    has no entry of that name, or when the stored tensor's shape differs
    from the model's. Everything else is copied into the model.

    Args:
        model: Target module.
        param_path: File containing the saved state dict.

    Returns:
        The same ``model`` object.
    """
    pretrain_dict = torch.load(param_path)
    target_state = model.state_dict()
    # The original filter tested ``k in pretrain_dict``, which is always
    # true; the check has to be made against the target model.
    compatible = {
        k: v for k, v in pretrain_dict.items()
        if 'fc2' not in k
        and k in target_state
        and v.shape == target_state[k].shape
    }
    model.load_state_dict(compatible, strict=False)
    return model
# Script entry point: exercises each helper above against the saved model files.
if __name__ == '__main__':
    model = MnistLinearNet()
    # 5x5 ramp image with batch and channel axes added in front.
    x = torch.tensor(np.arange(0,25).astype('float32').reshape(5,5)).unsqueeze(0) .unsqueeze(0)
    print(get_param_names(model))
    print(get_param_value('transfer_learning/src/model.pth'))
    model = torch.load('transfer_learning/src/model.pth')
    print(get_param_by_name(model,'conv1.bias'))
    print('model(x):',model(x).data)
    # Replace the last layer, then give it fixed weights and bias before running it.
    model2 = modify_model('transfer_learning/src/model.pth')
    model2.net[5].weight.data = torch.tensor(np.arange(0,10).astype('float32').reshape(2,5))
    model2.net[5].bias.data = torch.tensor(np.arange(0, 2).astype('float32'))
    print('model2(x):',model2(x).data)
    model = load_model_param(model ,'transfer_learning/src/model_dict.pth')
    print(model.fc2.bias)
    # Fresh model whose fc2 attribute differs from the saved one; load the remaining parameters.
    model = MnistLinearNet()
    model.fc2 = nn.Linear(10, 2)
    model = load_model2_param(model,'transfer_learning/src/model_dict.pth')
    print(model.conv1.weight)
7-2-2 利用Dataset接口创建样本采样生成器
from torch.utils.data import Dataset ,DataLoader
import random
import numpy as np
import zipfile
from PIL import Image
from io import BytesIO
import re
class SiameseNetworkDataset(Dataset):
    """Dataset of random image pairs read from a zip archive of ``.bmp`` files.

    Directory entries matching ``.*/training/.*/`` are treated as one person
    each. An item is ``(img0, img1, label)`` where ``label`` is 1 when both
    images come from the same person directory and 0 when they come from two
    different ones. The archive stays open because images are read on demand.
    """

    def __init__(self, imgzipfile):
        self.imgzip = zipfile.ZipFile(imgzipfile)
        self.namelist = self.imgzip.namelist()
        self.persons = [x for x in self.namelist if re.match(r'.*/training/.*/$', x)]
        self.bmps = [x for x in self.namelist if re.match(r'.*/training/.*\.bmp$', x)]
        # re.escape keeps directory names containing regex metacharacters
        # (parentheses, '+', '.') from changing the pattern; the raw string
        # avoids the invalid-escape warning the old '\.' literal produced.
        self.person_dict = {
            k: [v for v in self.namelist if re.match(re.escape(k) + r'.*\.bmp$', v)]
            for k in self.persons
        }

    def __getitem__(self, index):
        """Return a random pair; ``index`` is ignored."""
        label = random.randint(0, 1)
        if label:  # two different images of the same person
            person = random.choice(self.persons)
            img0, img1 = random.sample(self.person_dict[person], 2)
        else:  # one image each from two different persons
            persons = random.sample(self.persons, 2)
            img0 = random.choice(self.person_dict[persons[0]])
            img1 = random.choice(self.person_dict[persons[1]])
        # Decode to float32 and add a leading axis.
        img0 = np.array(Image.open(BytesIO(self.imgzip.read(img0)))).astype('float32')[None, :]
        img1 = np.array(Image.open(BytesIO(self.imgzip.read(img1)))).astype('float32')[None, :]
        return img0, img1, label

    def __len__(self):
        """Number of training ``.bmp`` files in the archive."""
        return len(self.bmps)
# Script entry point: build the dataset and inspect one batch.
if __name__ == '__main__':
    siamese_dataset = SiameseNetworkDataset('transfer_learning/faces.zip')
    train_dataloader = DataLoader(siamese_dataset,
                                  shuffle=True,
                                  num_workers=0,
                                  batch_size=8)
    for idx,data in enumerate(train_dataloader):
        img0,img1,label = data
        # Print the image batch shape and whether fewer than 8 labels are 1.
        print(img0.shape,label.sum()<8)
        break
7-2-3 自定义网络,利用孪生网络实现人脸比对
注意:由于系统编译时间不固定,需要多评测几次才能小于20s,代码没有问题
import torch.nn as nn
import torch
import torch.nn.functional as F
import torch.optim as optim
from dataloader import SiameseNetworkDataset
class SiameseNetwork(nn.Module):
    """Siamese network: both inputs go through the same embedding network.

    The embedding network is two conv/ReLU/max-pool stages followed by two
    linear layers producing a 128-dimensional vector. The first linear layer
    expects 3168 flattened features per sample.
    """

    def __init__(self):
        super().__init__()
        layers = [
            nn.Conv2d(1, 24, 3),
            nn.ReLU(),
            nn.MaxPool2d(3),
            nn.Conv2d(24, 32, 3),
            nn.ReLU(),
            nn.MaxPool2d(3),
            nn.Flatten(),
            nn.Linear(3168, 1024),
            nn.ReLU(inplace=True),
            nn.Linear(1024, 128),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, input1, input2):
        """Return the embeddings of ``input1`` and ``input2`` as a pair."""
        return self.net(input1), self.net(input2)
class ContrastiveLoss(torch.nn.Module):
    """Contrastive loss on the pairwise distance of two embedding batches.

    For a pair with distance ``d``: ``label == 0`` contributes ``d ** 2`` and
    ``label == 1`` contributes ``max(margin - d, 0) ** 2``. The batch mean is
    returned.
    """

    def __init__(self, margin=1.0):
        super().__init__()
        self.margin = margin

    def forward(self, output1, output2, label):
        """Return the mean contrastive loss of the batch."""
        d = F.pairwise_distance(output1, output2)
        distance_term = (1 - label) * d.pow(2)
        margin_term = label * torch.clamp(self.margin - d, min=0.0).pow(2)
        return torch.mean(distance_term + margin_term)
def train(train_dataloader):
    """Train a ``SiameseNetwork`` with ``ContrastiveLoss`` for two epochs.

    Args:
        train_dataloader: Iterable of ``(img0, img1, label)`` batches.
            NOTE(review): this assumes ``label == 1`` marks a same-person
            pair, which is how ``SiameseNetworkDataset`` in this file
            generates labels - confirm for any other data source.
    """
    net = SiameseNetwork()
    criterion = ContrastiveLoss()
    optimizer = optim.Adam(net.parameters(), lr=0.0005)
    for epoch in range(2):
        for i, (img0, img1, label) in enumerate(train_dataloader, 0):
            optimizer.zero_grad()
            output1, output2 = net(img0, img1)
            # ContrastiveLoss minimizes the distance where its label is 0
            # and enforces the margin where it is 1. Passing the same-person
            # flag unchanged pulled different people together and pushed the
            # same person apart, so the flag is inverted here.
            loss_contrastive = criterion(output1, output2, 1 - label)
            loss_contrastive.backward()
            optimizer.step()
            if i % 10 == 0:
                print("Epoch:{}, Current loss {}\n".format(epoch, loss_contrastive.item()))
# Script entry point: build the dataset, show one batch, then train.
if __name__ == '__main__':
    siamese_dataset = SiameseNetworkDataset('data/faces.zip')
    train_dataloader = DataLoader(siamese_dataset,
                                  shuffle=True,
                                  num_workers=0,
                                  batch_size=20)
    # Print the image batch shape and the labels of the first batch only.
    for idx,data in enumerate(train_dataloader):
        img0,img1,label = data
        print(img0.shape,label)
        break
    train(train_dataloader)
7-3 图像语义分割U-Net
import cv2
import numpy as np
#glob用于获取目录或文件
from glob import glob
import random
import torch
import torch.nn.functional as F
import torch.nn as nn
from torch import optim
from torch.utils.data import Dataset ,DataLoader
class ImgSegDataSet(Dataset):
    """Image/label pairs for segmentation, read from ``.png`` files.

    The label path is derived from the image path by replacing every
    occurrence of ``'image'`` with ``'label'``.
    """

    def __init__(self, data_path):
        self.data_path = data_path
        # data_path is used as a plain string prefix of the glob pattern.
        self.imgs_path = glob(data_path+'*.png')

    def augment(self, image, flipcode):
        """Return ``image`` flipped with ``cv2.flip`` using ``flipcode``."""
        return cv2.flip(image, flipcode)

    def __getitem__(self, index):
        """Return the ``(image, label)`` pair at ``index``, randomly flipped."""
        image_path = self.imgs_path[index]
        label_path = image_path.replace('image', 'label')
        # Both files are read as single-channel arrays.
        image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
        label = cv2.imread(label_path, cv2.IMREAD_GRAYSCALE)
        # Labels with values above 1 are divided by 255.
        if label.max() > 1:
            label = label / 255
        # Flip codes: 0 vertical, 1 horizontal, -1 both; 2 means no flip.
        flipcode = random.choice([-1, 0, 1, 2])
        if flipcode == 2:
            return image, label
        return self.augment(image, flipcode), self.augment(label, flipcode)

    def __len__(self):
        return len(self.imgs_path)
class DoubleConv(nn.Module):
    """(convolution => [BN] => ReLU) * 2

    Both convolutions use a 3x3 kernel with padding 1; the first maps
    ``in_channels`` to ``out_channels`` and the second keeps ``out_channels``.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        stages = []
        channels = in_channels
        for _ in range(2):
            stages += [
                nn.Conv2d(channels, out_channels, kernel_size=3, padding=1),
                nn.BatchNorm2d(out_channels),
                nn.ReLU(inplace=True),
            ]
            channels = out_channels
        self.double_conv = nn.Sequential(*stages)

    def forward(self, x):
        return self.double_conv(x)
class Down(nn.Module):
    """Downscaling with maxpool then double conv"""

    def __init__(self, in_channels, out_channels):
        super().__init__()
        pool = nn.MaxPool2d(2)
        convs = DoubleConv(in_channels, out_channels)
        self.maxpool_conv = nn.Sequential(pool, convs)

    def forward(self, x):
        return self.maxpool_conv(x)
class Up(nn.Module):
    """Upscaling then double conv

    Args:
        in_channels: Channel count of the concatenated tensor fed to the
            double convolution.
        out_channels: Channel count produced by the block.
        bilinear: Use ``nn.Upsample`` (channel count unchanged) when true,
            otherwise a transposed convolution mapping ``in_channels`` to
            ``out_channels``.
    """

    def __init__(self, in_channels, out_channels, bilinear=True):
        super().__init__()
        if bilinear:
            self.up = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
        else:
            self.up = nn.ConvTranspose2d(in_channels, out_channels, kernel_size=2, stride=2)
        self.conv = DoubleConv(in_channels, out_channels)

    def forward(self, x1, x2):
        """Upsample ``x1``, pad it to the size of ``x2``, concatenate, convolve.

        Both inputs are indexed as 4-D tensors; dimensions 2 and 3 are the
        ones that get padded.
        """
        x1 = self.up(x1)
        # Plain ints: F.pad expects integer padding amounts, not the
        # one-element tensors used previously.
        diffY = x2.size(2) - x1.size(2)
        diffX = x2.size(3) - x1.size(3)
        x1 = F.pad(x1, [diffX // 2, diffX - diffX // 2,
                        diffY // 2, diffY - diffY // 2])
        x = torch.cat([x2, x1], dim=1)
        return self.conv(x)
class OutConv(nn.Module):
    """Final 1x1 convolution mapping ``in_channels`` to ``out_channels``."""

    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size=1)

    def forward(self, x):
        projected = self.conv(x)
        return projected
class UNet(nn.Module):
    """U-Net: four downscaling stages, four upscaling stages with skip connections.

    Args:
        n_channels: Channel count of the input.
        n_classes: Channel count of the output logits.
        bilinear: Passed to every ``Up`` block to choose bilinear upsampling
            over a transposed convolution.
    """

    def __init__(self, n_channels, n_classes, bilinear=True):
        super(UNet, self).__init__()
        self.n_channels = n_channels
        self.n_classes = n_classes
        self.bilinear = bilinear

        # Bilinear upsampling keeps the channel count, while the transposed
        # convolution in Up reduces it. With the old fixed widths, the
        # concatenation in Up had more channels than its DoubleConv accepted
        # whenever bilinear was true. Halving the bottleneck and decoder
        # widths in that mode makes the counts match; the transposed
        # convolution path (factor 1) is unchanged.
        factor = 2 if bilinear else 1
        self.inc = DoubleConv(n_channels, 64)
        self.down1 = Down(64, 128)
        self.down2 = Down(128, 256)
        self.down3 = Down(256, 512)
        self.down4 = Down(512, 1024 // factor)
        self.up1 = Up(1024, 512 // factor, bilinear)
        self.up2 = Up(512, 256 // factor, bilinear)
        self.up3 = Up(256, 128 // factor, bilinear)
        self.up4 = Up(128, 64, bilinear)
        self.outc = OutConv(64, n_classes)

    def forward(self, x):
        """Return the output logits for ``x``."""
        # Encoder: keep each stage's output for the skip connections.
        x1 = self.inc(x)
        x2 = self.down1(x1)
        x3 = self.down2(x2)
        x4 = self.down3(x3)
        x5 = self.down4(x4)
        # Decoder: each Up block combines the running tensor with a skip tensor.
        x = self.up1(x5, x4)
        x = self.up2(x, x3)
        x = self.up3(x, x2)
        x = self.up4(x, x1)
        logits = self.outc(x)
        return logits
#原医学图像数据集
def Train_Unet(unet,device,data_path,model_path,batch_size,epochs):
    """Train ``unet`` on the image/label pairs found under ``data_path``.

    ``model_path`` is accepted but not used in this function.

    NOTE(review): the source lost its indentation; the ``print`` and ``break``
    below are reconstructed as part of the batch loop, so each epoch trains
    on a single batch. Confirm against the original file.
    """
    # Load the data set
    train_dataset = ImgSegDataSet(data_path)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    # Optimizer
    opt = optim.Adam(unet.parameters())
    # Loss function
    loss_fun = nn.BCEWithLogitsLoss()
    unet.train()
    for epoch in range(epochs):
        for image, label in train_loader:
            opt.zero_grad()
            # Move to the device as float32 and insert a channel axis at position 1.
            image = image.to(device=device, dtype=torch.float32).unsqueeze(1)
            label = label.to(device=device, dtype=torch.float32).unsqueeze(1)
            pred=unet(image)
            loss = loss_fun(pred, label)
            loss.backward()
            opt.step()
            print('epoch: {:d}, loss: {:f}'.format(epoch, loss.item()))
            break
# Script entry point: build the network on the available device; the training call is disabled.
if __name__ == '__main__':
    data_path = "project/data/train_image/"
    # Use CUDA when it is available, otherwise the CPU.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    unet = UNet(n_channels= 1, n_classes= 1, bilinear=False)
    unet.to(device=device)
    # Train_Unet(unet, device, data_path,model_path='unet.pth', epochs=1, batch_size=1)