1 Python Basics Exercises

1-1 Creating a Matrix

import numpy as np

def createMatrixList(num, rows, cols):
    # build num matrices of shape rows x cols; each value encodes the
    # (matrix, row, column) position of the element
    matrices = []
    for i in range(num):
        matrix = []
        for j in range(rows):
            row_list = []
            for k in range(cols):
                row_list.append((i + 1) * 10 + j * cols + k + 1)
            matrix.append(row_list)
        matrices.append(matrix)
    result = np.array(matrices, dtype='ubyte')
    return result
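
A quick sanity check of the generator (the sizes below are illustrative; any small values work):

matrices = createMatrixList(2, 3, 4)
print(matrices.shape)  # (2, 3, 4)
print(matrices[0])     # the first 3x4 matrix: values 11..22, laid out row by row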

1-2 Writing a Matrix to an IDX File

import numpy as np
import struct

def writeMatrixList(filename, matrix_list):
    with open(filename, 'wb') as f:
        shapes = matrix_list.shape

        # IDX header: two zero bytes, a type code (0x08 = unsigned byte),
        # and the number of dimensions, all big-endian
        file_head_fmt = '>HBB'
        file_head = struct.pack(file_head_fmt, 0, 8, len(shapes))
        f.write(file_head)

        # each dimension size is written as a 4-byte big-endian unsigned int
        file_head_fmt = '>I'
        for i in shapes:
            file_head = struct.pack(file_head_fmt, i)
            f.write(file_head)
        f.write(matrix_list)
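
For reference, the header this writes for a (2, 3, 4) ubyte array lays out byte by byte as follows (big-endian throughout):

# 00 00        two padding bytes (the H field)
# 08           type code: 0x08 = unsigned byte (B)
# 03           number of dimensions (B)
# 00 00 00 02  size of dimension 0 (I)
# 00 00 00 03  size of dimension 1 (I)
# 00 00 00 04  size of dimension 2 (I)
# ...followed by 2*3*4 = 24 raw data bytes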

1-3 Restoring a Matrix from an IDX File

import numpy as np
import struct

def readMatrixFromFile(filename):
    with open(filename, 'rb') as f:
        data_buf = f.read()
        off_set = 0
        # header: two padding bytes, the type code, and the number of dimensions
        file_head_fmt = '>HBB'
        _, _, dimslen = struct.unpack_from(file_head_fmt, data_buf, off_set)
        off_set += struct.calcsize(file_head_fmt)

        # one 4-byte unsigned int per dimension
        file_head_fmt = '>{}I'.format(dimslen)
        shapes = struct.unpack_from(file_head_fmt, data_buf, off_set)
        off_set += struct.calcsize(file_head_fmt)

        # read prod(shapes) unsigned bytes and restore the original shape
        data_fmt = '>' + str(np.prod(shapes)) + 'B'
        matrix_list = struct.unpack_from(data_fmt, data_buf, off_set)
        matrix_list = np.reshape(matrix_list, shapes)
    return matrix_list
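
Chaining 1-1 through 1-3 gives a round-trip check ('demo.idx' is a throwaway file name):

m = createMatrixList(2, 3, 4)
writeMatrixList('demo.idx', m)
restored = readMatrixFromFile('demo.idx')
print((restored == m).all())  # True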

1-4 Creating a Batch Sample Generator

import numpy as np
import struct

def trainBatchReader(idx_filename, batch_size, drop_last):
    def batch_reader():
        # parse the IDX file exactly as in 1-3
        with open(idx_filename, 'rb') as f:
            data_buf = f.read()
            off_set = 0
            file_head_fmt = '>HBB'
            _, _, dimslen = struct.unpack_from(file_head_fmt, data_buf, off_set)
            off_set += struct.calcsize(file_head_fmt)

            file_head_fmt = '>{}I'.format(dimslen)
            shapes = struct.unpack_from(file_head_fmt, data_buf, off_set)
            off_set += struct.calcsize(file_head_fmt)

            data_fmt = '>' + str(np.prod(shapes)) + 'B'
            matrix_list = struct.unpack_from(data_fmt, data_buf, off_set)
            matrix_list = np.reshape(matrix_list, shapes)
        # group instances along the first axis into batches
        b = []
        for instance in matrix_list:
            b.append(instance)
            if len(b) == batch_size:
                yield b
                b = []
        # unless drop_last is set, emit the final, possibly smaller batch
        if not drop_last and len(b) != 0:
            yield b
    return batch_reader()
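
The generator can then be consumed directly ('demo.idx' reuses the file written in the round-trip check above):

for batch in trainBatchReader('demo.idx', batch_size=1, drop_last=False):
    print(len(batch), np.array(batch).shape)  # 1 (1, 3, 4), once per matrix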

2 Linear Regression with PyTorch

2-1 Testing the Environment

import random
import torch
import numpy as np
def synthetic_data(w, b, num_examples):
    """Generate y = Xw + b + noise"""
    X = torch.normal(0, 1, (num_examples, len(w)))
    y = torch.matmul(X, w) + b
    y += torch.normal(0, 0.01, y.shape)
    return X, y.reshape((-1, 1))
true_w = torch.tensor([2, -3.4])
true_b = 4.2
features, labels = synthetic_data(true_w, true_b, 1000)
# print('features:', features[0],'\nlabel:', labels[0])
def data_iter(batch_size, features, labels):
    num_examples = len(features)
    indices = list(range(num_examples))
    # the samples are read in random order, with no particular sequence
    random.shuffle(indices)
    for i in range(0, num_examples, batch_size):
        batch_indices = torch.tensor(
            indices[i: min(i + batch_size, num_examples)])
        yield features[batch_indices], labels[batch_indices]
batch_size = 10
# for X, y in data_iter(batch_size, features, labels):
#     print(X, '\n', y)
#     break
w = torch.normal(0, 0.01, size=(2,1), requires_grad=True)
b = torch.zeros(1, requires_grad=True)
def linreg(X, w, b):
    """The linear regression model"""
    return torch.matmul(X, w) + b
def squared_loss(y_predict, y):
    """Squared loss"""
    return (y_predict - y.reshape(y_predict.shape)) ** 2 / 2
def sgd(params, lr, batch_size):
    """Minibatch stochastic gradient descent"""
    with torch.no_grad():
        for param in params:
            param -= lr * param.grad / batch_size
            param.grad.zero_()
lr = 0.03
num_epochs = 3
net = linreg
for epoch in range(num_epochs):
    for X, y in data_iter(batch_size, features, labels):
        loss = squared_loss(net(X, w, b), y)  # minibatch loss on X and y
        # loss has shape (batch_size, 1) rather than being a scalar; summing its
        # elements gives the scalar whose gradients w.r.t. [w, b] are computed
        loss.sum().backward()
        sgd([w, b], lr, batch_size)  # update the parameters using their gradients
    with torch.no_grad():
        train_l = squared_loss(net(features, w, b), labels)
        # print(f'epoch {epoch + 1}, loss {float(train_l.mean()):f}')
# if sum(true_w - w.reshape(true_w.shape)) <1.0:
#     print(True)
# if true_b - b<1.0:
#     print(True)
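
To confirm training converged, the learned parameters can be compared against the ones used to generate the data (mirroring the commented-out checks above):

print(f'error in estimating w: {true_w - w.reshape(true_w.shape)}')
print(f'error in estimating b: {true_b - b}')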

3 Logistic Regression

3-1 Binary Classification: Logistic Regression with PyTorch

import torch
import torch.nn as nn
import numpy as np

#german.data-numeric here has already been converted to numeric form for numpy, so we can read it directly with numpy's loadtxt
data=np.loadtxt("/data/workspace/myshixun/logistic_regression/data/german.data-numeric")

n,l=data.shape
#show the row/column counts: the rows are samples, columns-1 is the feature vector length, and the last column is the class label

print(n,l)
#each column is one feature with its own scale, so normalize column by column
#the last column is the class label, so it is not normalized
for j in range(l-1):
    #Task 1: compute the column mean
    meanVal=np.mean(data[:,j])
    #Task 2: compute the column standard deviation
    stdVal=np.std(data[:,j])
    data[:,j]=(data[:,j]-meanVal)/stdVal

#shuffle the data
np.random.shuffle(data)

#use the first 900 rows for training and the last 100 for testing
train_data=data[:900,:l-1]
train_lab=data[:900,l-1]-1
test_data=data[900:,:l-1]
test_lab=data[900:,l-1]-1


class LR(nn.Module):
    def __init__(self):
        super(LR, self).__init__()
        #Task 3: define the forward model: a linear layer from 24-dim features to 2 classes
        self.fc = nn.Linear(24,2)

    def forward(self, x):
        out = self.fc(x)
        #Task 4: output the standard logistic regression value
        out = torch.sigmoid(out)
        return out

def test(pred,lab):
    t=pred.max(-1)[1]==lab
    return torch.mean(t.float())

#create the regression model
net=LR()
#set the loss function
criterion=nn.CrossEntropyLoss() # use CrossEntropyLoss
#set the parameters to optimize and the optimization method
optm=torch.optim.Adam(net.parameters()) # Adam optimizer
epochs=1000 # train for 1000 iterations
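
The listing stops just before the training loop; a minimal sketch of the loop that typically follows (note the model already applies sigmoid, so CrossEntropyLoss operates on probabilities rather than logits here; it still trains):

for i in range(epochs):
    net.train()
    x = torch.from_numpy(train_data).float()
    y = torch.from_numpy(train_lab).long()
    y_hat = net(x)
    loss = criterion(y_hat, y)
    optm.zero_grad()
    loss.backward()
    optm.step()
    if (i + 1) % 100 == 0:  # report every 100 iterations
        net.eval()
        test_out = net(torch.from_numpy(test_data).float())
        accu = test(test_out, torch.from_numpy(test_lab).long()).item()
        print('Epoch:{}, Loss:{:.4f}, Accuracy:{:.2f}'.format(i + 1, loss.item(), accu))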

4 Handwritten Digit Recognition on the MNIST Dataset

4-1 Creating a Training Batch Generator

######## 1. Load the data
import struct,random,numpy as np
code2type = {0x08: 'B', 0x09: 'b', 0x0B: 'h', 0x0c: 'i', 0x0D: 'f', 0x0E: 'd'}
def readMatrix(filename):
    with open(filename,'rb') as f:
        buff = f.read()
        offset = 0
        fmt = '>HBB' # format spec: '>' means big-endian (high byte first); 'I' below is a 4-byte integer
        _,dcode,dimslen = struct.unpack_from(fmt,buff,offset)
        offset += struct.calcsize(fmt)

        fmt = '>{}I'.format(dimslen)
        shapes = struct.unpack_from(fmt,buff,offset)
        offset += struct.calcsize(fmt)

        fmt = '>'+ str(np.prod(shapes)) + code2type[dcode]
        matrix = struct.unpack_from(fmt,buff,offset)
        matrix = np.reshape(matrix,shapes).astype(code2type[dcode])

    return matrix

def dataReader(imgfile, labelfile, batch_size, drop_last):
    images = readMatrix(imgfile)
    labels = readMatrix(labelfile)
    buff = list(zip(images, labels))
    batchnum = len(images) // batch_size
    random.shuffle(buff)
    def batch_reader():
        for i in range(batchnum):
            yield buff[i * batch_size:(i + 1) * batch_size]
        # unless drop_last is set, also emit the final incomplete batch
        if not drop_last and len(images) % batch_size != 0:
            yield buff[batchnum * batch_size:]
    return batch_reader
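
A usage sketch (the short file names stand in for the full MNIST paths used in 4-2's __main__ below):

loader = dataReader('train-images-idx3-ubyte', 'train-labels-idx1-ubyte', 16, True)
for batch in loader():
    images, labels = zip(*batch)
    print(np.array(images).shape, labels[:4])  # (16, 28, 28) plus the first few labels
    break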

4-2 Building the Convolutional Network

import numpy as np
import torch
import torch.nn as nn,torch.nn.functional as F,torch.optim as optim
from loader import dataReader


######### 2. Define the convolutional neural network
class MnistNet(nn.Module):
    def __init__(self):
        super(MnistNet, self).__init__()
        self.conv1 = nn.Conv2d(in_channels=1,out_channels=6,kernel_size=5,stride=1,padding=2)
        self.pool = nn.MaxPool2d(kernel_size=2,stride=2)
        self.conv2 = nn.Conv2d(6,16,5)
        self.fc1 = nn.Linear(16*5*5,120)
        self.fc2 = nn.Linear(120,84)
        self.fc3 = nn.Linear(84,10)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(-1,16*5*5)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x
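
Tracing the shapes of a 28x28 MNIST input explains the 16*5*5 input size of fc1; a minimal check:

# (N,1,28,28) -conv1(5x5, pad 2)-> (N,6,28,28) -pool-> (N,6,14,14)
# -conv2(5x5)-> (N,16,10,10) -pool-> (N,16,5,5) -flatten-> (N,400)
net = MnistNet()
print(net(torch.zeros(1, 1, 28, 28)).shape)  # torch.Size([1, 10])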

# 3. Train the network
def train(loader):
    model = MnistNet()
    ######## Use the cross-entropy classification loss and SGD with momentum.
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
    for epoch in range(1):
        running_loss = 0.0
        for i, data in enumerate(loader()):
            inputs, labels = zip(*data)
            inputs = np.array(inputs).astype('float32')
            labels = np.array(labels).astype('int64')
            inputs = torch.from_numpy(inputs).unsqueeze(1) # add the channel dimension: NCHW
            labels = torch.from_numpy(labels)
            # zero the parameter gradients
            optimizer.zero_grad()
            # forward + backward + optimize
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            # print statistics
            running_loss += loss.item()
            if i % 100 == 99:
                last_loss = running_loss / 100  # loss per batch
                print('  batch {} loss: {}'.format(i + 1, last_loss))
                running_loss = 0.
            if i==199:
                break
    print('Finished Training')
    return model

# 4. Test the network
def test(PATH,loader):
    # reload the saved model
    model = MnistNet()
    model.load_state_dict(torch.load(PATH))
    correct = 0
    total = 0
    with torch.no_grad():
        for data in loader():
            images, labels = zip(*data)
            images = np.array(images).astype('float32')
            labels = np.array(labels).astype('int64')
            images = torch.from_numpy(images).unsqueeze(1)
            labels = torch.from_numpy(labels)
            outputs = model(images)
            _, predicted = torch.max(outputs, 1)#torch.argmax
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    print('Accuracy of the network on the {:d} test images: {:f}%'.format(total,100 * correct / total))
    return model

if __name__ == '__main__':
    BATCH_SIZE = 16
    PATH = '/data/workspace/myshixun/mnist/model/mnist_model.pth'
    train_loader = dataReader('/data/workspace/myshixun/mnist/data/train-images-idx3-ubyte', '/data/workspace/myshixun/mnist/data/train-labels-idx1-ubyte', BATCH_SIZE, True)
    test_loader = dataReader('/data/workspace/myshixun/mnist/data/t10k-images-idx3-ubyte', '/data/workspace/myshixun/mnist/data/t10k-labels-idx1-ubyte', BATCH_SIZE, False)
    model = train(train_loader)
    # quick-save the trained model:
    torch.save(model.state_dict(), PATH)
    test(PATH,test_loader)

5 Recurrent Neural Networks

5-1 Reading Text Files from a Zip Archive and Building a Vocabulary

import zipfile  # https://docs.python.org/zh-cn/2/library/zipfile.html
# import tarfile #https://docs.python.org/zh-cn/3/library/tarfile.html
import re  # https://docs.python.org/zh-cn/3/library/re.html
import collections #https://docs.python.org/zh-cn/3/library/collections.html
import json #https://docs.python.org/3/library/json.html
import numpy as np

# 1. Read the directory and file list from the zip archive
def getFileListFromZip(zippath):
    myZip = zipfile.ZipFile(zippath)
    fileList = myZip.namelist()
    #print(fileList)
    return fileList

# 2. Read the given text file from the zip archive, decode it as 'utf-8', and return it as a string
def getTextFromZip(zippath, filepath):
    myZip = zipfile.ZipFile(zippath)
    textstr = myZip.read(filepath).decode('utf-8')
    return textstr

# 3. Check whether the path is a text file name ending in .txt
def isTxtFile(filepath):
    return bool(re.match(r'.*\.txt$', filepath))

# 4. Convert the string into a word list according to the cleaning rules
def str2words(txtstr):
    wordlist = re.sub(r'[\r\n]', ' ', txtstr)
    wordlist = re.sub(r'(<.*>)|([^A-Za-z ])','', wordlist)
    wordlist = re.sub(r'\s+', ' ', wordlist)
    wordlist = wordlist.strip()
    wordlist = wordlist.lower()
    wordlist = wordlist.split(' ')
    return wordlist

# 5. Build the vocabulary
def buildDict(zippath, cutoff=50):
    myzip = zipfile.ZipFile(zippath)
    filelist = myzip.namelist()
    wordDict = collections.defaultdict(int)
    for filepath in filelist:
        if not isTxtFile(filepath):
            continue
        txtstr = myzip.read(filepath).decode('utf-8')
        wordlist = str2words(txtstr)
        for word in wordlist:
            wordDict[word] += 1
    wordDict = [x for x in wordDict.items() if x[1] > cutoff]
    dictionary = sorted(wordDict, key=lambda x: (-x[1],x[0]))
    words, _ = list(zip(*dictionary))
    word_idx = dict(list(zip(words,range(len(words)))))
    word_idx['<unk>'] = len(words)
    return word_idx

# 6. Convert a text passage into an index sequence using the vocabulary
def txt2digseq(txtstr, word_dict):
    words = str2words(txtstr)
    UNK = word_dict['<unk>']
    tagseq = list(np.int64([word_dict.get(word, UNK) for word in words]))
    return tagseq

# Store the dictionary dic as a JSON-format file
def writeDict(dic, filename='/data/workspace/myshixun/text_proj/data/word_dict.json'):
    with open(filename, 'wb') as f:
        # with the indent argument, line breaks and indentation are added automatically
        f.write(json.dumps(dic, ensure_ascii=False, indent=4).encode('utf8'))

# Restore the dictionary dic from the JSON-format file
def readDict(filename='/data/workspace/myshixun/text_proj/data/word_dict.json'):
    with open(filename, 'r', encoding='utf-8') as f:
        dic = json.load(f)
    return dic
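
Typical usage for building and persisting the vocabulary (the zip path is hypothetical; the archive is expected to contain the minitrain/... text files referenced in 5-2):

word_dict = buildDict('/data/workspace/myshixun/text_proj/data/minitrain.zip', cutoff=50)
writeDict(word_dict)
word_dict = readDict()
print(len(word_dict), txt2digseq('This movie is great! <br />', word_dict)[:4])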

5-2 Creating a Sequence-Oriented Data Generator

from build_dict import readDict,txt2digseq
import torch
from torch.utils.data import Dataset,DataLoader
import re
import zipfile

class TxtDataset(Dataset):
    def __init__(self,zip_path,word_dict):
        self.word_dict = word_dict
        self.txtzip = zipfile.ZipFile(zip_path)
        self.filelist = []
        for filepath in self.txtzip.namelist():
            if bool(re.match(r'.*\.txt$', filepath)):
                if bool(re.match(r'^minitrain/pos',filepath)):
                    self.filelist.append([filepath,1])
                else:
                    self.filelist.append([filepath,0])
        self.len = len(self.filelist)

    def __getitem__(self, index):
        txtstr = self.txtzip.read(self.filelist[index][0]).decode('utf-8')
        return self.filelist[index][1], txt2digseq(txtstr,self.word_dict)

    def __len__(self):
        return self.len

def collate_batch(batch):
    # flatten the variable-length sequences into one 1-D tensor and record each
    # sequence's start offset, the (text, offsets) format nn.EmbeddingBag expects
    label_list, text_list, offsets = [], [], [0]
    for (_label, _text) in batch:
        label_list.append(_label)
        processed_text = torch.tensor(_text, dtype=torch.int64)
        text_list.append(processed_text)
        offsets.append(processed_text.size(0))
    label_list = torch.tensor(label_list, dtype=torch.int64)
    offsets = torch.tensor(offsets[:-1]).cumsum(dim=0)
    text_list = torch.cat(text_list)

    return label_list, text_list, offsets
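
A usage sketch wiring the dataset and collate function into a DataLoader (the zip path is hypothetical, as in 5-1; offsets mark where each review starts in the flattened text tensor):

word_dict = readDict()
dataset = TxtDataset('/data/workspace/myshixun/text_proj/data/minitrain.zip', word_dict)
loader = DataLoader(dataset, batch_size=8, shuffle=True, collate_fn=collate_batch)
labels, texts, offsets = next(iter(loader))
print(labels.shape, texts.shape, offsets)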

5-3 Creating a Text Classifier

import torch
from create_loader import readDict,txt2digseq
from torch.utils.data import Dataset,DataLoader

class TextClassifyModel(torch.nn.Module):
    def __init__(self, vocab_size, embed_dim, num_class):
        super(TextClassifyModel, self).__init__()
        self.embedding = torch.nn.EmbeddingBag(vocab_size, embed_dim, sparse=True) # filled-in code
        self.fc = torch.nn.Linear(embed_dim, num_class) # filled-in code
        self.init_weights()

    def init_weights(self):
        initrange = 0.5
        self.embedding.weight.data.uniform_(-initrange, initrange)
        self.fc.weight.data.uniform_(-initrange, initrange)
        self.fc.bias.data.zero_()

    def forward(self, text, offsets):
        embedded = self.embedding(text, offsets)  # filled-in code
        return self.fc(embedded)

# Define functions to train the model
class ModelRun():
    def __init__(self,vocab_size,emsize,num_class,lr,epochs):
        self.criterion = torch.nn.CrossEntropyLoss()
        self.model = TextClassifyModel(vocab_size, emsize, num_class)
        self.optimizer = torch.optim.SGD(self.model.parameters(), lr=lr)
        self.epochs = epochs

    def train(self, dataloader):
        self.model.train()  # filled-in code
        for epoch in range(1, self.epochs + 1):
            total_acc, total_count = 0, 0
            log_interval = 10

            for idx, (label, text, offsets) in enumerate(dataloader):
                self.optimizer.zero_grad()
                predicted_label = self.model(text, offsets)
                loss = self.criterion(predicted_label, label)
                loss.backward()
                torch.nn.utils.clip_grad_norm_(self.model.parameters(), 0.1)
                self.optimizer.step()

                total_acc += (predicted_label.argmax(1) == label).sum().item()
                total_count += label.size(0)
                if idx % log_interval == 0 and idx > 0:
                    # print('| epoch {:3d} | {:5d}/{:5d} batches '
                    #       '| accuracy {:8.3f}'.format(epoch, idx, len(dataloader), total_acc / total_count))
                    total_acc, total_count = 0, 0
        return self.model

def predict(text,word_dict, model):
    model.eval()  # filled-in code
    text = torch.tensor(txt2digseq(text, word_dict))
    output = model(text, torch.tensor([0]))
    return output.argmax(1).item()
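
A minimal end-to-end sketch, assuming TxtDataset and collate_batch from 5-2 are importable here (module names vary across these listings) and with illustrative hyperparameter values:

word_dict = readDict()
dataset = TxtDataset('/data/workspace/myshixun/text_proj/data/minitrain.zip', word_dict)  # hypothetical path
dataloader = DataLoader(dataset, batch_size=8, shuffle=True, collate_fn=collate_batch)
run = ModelRun(vocab_size=len(word_dict), emsize=64, num_class=2, lr=5.0, epochs=10)
model = run.train(dataloader)
print(predict('an excellent and moving film', word_dict, model))  # index of the predicted class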

6 Reinforcement Learning

6-1 Reinforcement Learning: Pole-Balancing Control Policy Learning Based on Policy Estimation

6-1-1 Defining the Policy Network

import torch
import torch.nn as nn
import numpy as np
import gym
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical

class PolicyNet(nn.Module):
    def __init__(self, n_features, n_actions):
        super(PolicyNet, self).__init__()
        #********1 define the computational units*********#

        self.fc1 = nn.Sequential(
            nn.Linear(n_features, 10),
            nn.Tanh()
            )
        self.fc2 = nn.Sequential(
              nn.Linear(10, n_actions),
              nn.Softmax(dim = 1)
        )

        #********1 define the computational units*********#

    def forward(self, x):
        #********2 define the computation structure*********#
        x = self.fc1(x)
        x = self.fc2(x)
        return x
        #********2 define the computation structure*********#
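
A quick sanity check that the network emits a valid action distribution (sizes match CartPole: 4 state features, 2 actions):

net = PolicyNet(4, 2)
probs = net(torch.zeros(1, 4))
print(probs, probs.sum())  # one row of action probabilities, summing to 1 via Softmax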

6-1-2 Defining the Loss Function

import torch
import torch.nn as nn
import numpy as np
import gym
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical

from policy_net import PolicyNet

class Agent():
    def __init__(
            self,
            n_actions,
            n_features,
            learning_rate=0.01,
            reward_decay=0.95
    ):
        self.n_actions = n_actions
        self.n_features = n_features
        self.lr = learning_rate
        self.gamma = reward_decay

        self.obs = []
        self.acs = []
        self.rws = []

        self.net = PolicyNet(n_features,n_actions )
        self.loss = nn.CrossEntropyLoss()  # defined but unused; learn() below builds the policy-gradient loss manually
        self.optimizer = torch.optim.Adam(self.net.parameters(), lr=learning_rate)

    def choose_action(self, observation):
        self.net.eval()
        actions = self.net(torch.Tensor(observation[np.newaxis, :]))
        action = np.random.choice(range(actions.shape[1]), p=actions.view(-1).detach().numpy())
        return action

    def store_transition(self, s, a, r):
        self.obs.append(s)
        self.acs.append(a)
        self.rws.append(r)

    def learn(self):
        self.net.train()
        discount = self._discount_and_norm_rewards()
        output = self.net(torch.Tensor(self.obs))
        one_hot = torch.zeros(len(self.acs), self.n_actions).\
            scatter_(1, torch.LongTensor(self.acs).view(-1,1), 1)
        neg = torch.sum(-torch.log(output) * one_hot, 1)
        #********** define the loss function *****#
        loss = neg * torch.Tensor(discount)
        loss = loss.mean()
        #********** define the loss function *****#
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        self.acs = []
        self.obs = []
        self.rws = []
        return discount

    def _discount_and_norm_rewards(self):
        discount = np.zeros_like(self.rws)
        tmp = 0
        for i in reversed(range(len(self.rws))):
            tmp = tmp * self.gamma + self.rws[i]
            discount[i] = tmp
        discount -= np.mean(discount)
        discount /= np.std(discount) + 0.0001  # standardize to roughly unit variance
        return discount
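
As a hand-worked check of the discounting (gamma defaults to 0.95 in the constructor):

agent = Agent(n_actions=2, n_features=4)
agent.rws = [1.0, 1.0, -1.0]
# raw returns, computed back to front:
#   G2 = -1, G1 = 1 + 0.95*(-1) = 0.05, G0 = 1 + 0.95*0.05 = 1.0475
# these are then shifted to zero mean and scaled toward unit variance
print(agent._discount_and_norm_rewards())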

def train():
    env = gym.make('CartPole-v0')
    env.seed(1)
    # env = env.unwrapped # remove the episode step limit

    # print(env.action_space)
    # print(env.observation_space)
    # print(env.observation_space.high)
    # print(env.observation_space.low)

    RL = Agent(
        n_actions=env.action_space.n,
        n_features=env.observation_space.shape[0],
        learning_rate=0.02,
        reward_decay=0.99
    )

    for i_episode in range(2):
        s0 = env.reset()
        while True:
            # env.render()
            a0 = RL.choose_action(s0)
            s1, r1, done, info = env.step(a0)

            if done:
                r1 = -1

            RL.store_transition(s0, a0, r1)
            if done:
                ep_rs_sum = sum(RL.rws)
                # print("episode:", i_episode, "  reward:", int(ep_rs_sum))
                print("episode:", i_episode)
                RL.learn()
                break
            s0 = s1
    torch.save(RL.net.state_dict(), '/data/workspace/myshixun/RL_Policy_proj/src/policy.pt')

6-1-3 Applying the Policy Model

import torch
import torch.nn as nn
import numpy as np
import gym
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical
from policy_net import PolicyNet
from agent import Agent
class Test():
    def __init__(self,modelpath= '/data/workspace/myshixun/RL_Policy_proj/src/policy.pt'):
        self.modelpath = modelpath

    def select_action(self, state):
        state = torch.from_numpy(state).float().unsqueeze(0)
        #**** select an action according to the policy model ***#
        probs = self.model(state)
        m = Categorical(probs)
        action = m.sample()
        return action.item()
        #**** select an action according to the policy model ***#

    def eval(self):
        self.model = PolicyNet(4, 2)
        self.model.load_state_dict(torch.load(self.modelpath))
        self.model.eval()
        env = gym.make('CartPole-v1')
        t_all = []

        for i_episode in range(2):
            s0 = env.reset()
            for t in range(1000):
                # env.render()
                cp, cv, pa, pv = s0
                a0 = self.select_action(s0)
                s1, r1, done, info = env.step(a0)
                s0 = s1
                if done:
                    # print("Episode finished after {} timesteps".format(t + 1))
                    t_all.append(t)
                    break
        env.close()
        return sum(t_all) / len(t_all)

6-2 Reinforcement Learning: Pole-Balancing Control Policy Learning Based on Value-Function Estimation

6-2-1 Q_learning

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

class Net(nn.Module):
    def __init__(self, input_size,  output_size):
        super().__init__()
        #****Task 1****** define the network's computational units ****#
        self.linear=nn.Linear(input_size,output_size)

        #****Task 1****** define the network's computational units ****#

    def forward(self, x):
        #****Task 2****** network structure ****#
        x=self.linear(x)

        #****Task 2****** network structure ****#
        return x

6-2-2 Q_learning: Defining the Agent's Loss

# coding: utf-8
__author__ = 'zhenhang.sun@gmail.com'
__version__ = '1.0.0'

import gym
import math
import random
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

from Q_net import Net

class Agent(object):
    def __init__(self, **kwargs):
        for key, value in kwargs.items():
            setattr(self, key, value)
        self.eval_net = Net(self.state_space_dim, self.action_space_dim)
        self.optimizer = optim.Adam(self.eval_net.parameters(), lr=self.lr)
        self.buffer = []
        self.steps = 0

    def act(self, s0):
        self.steps += 1
        epsi = self.epsi_low + (self.epsi_high - self.epsi_low) * (math.exp(-1.0 * self.steps / self.decay))
        if random.random() < epsi:
            a0 = random.randrange(self.action_space_dim)
        else:
            s0 = torch.tensor(s0, dtype=torch.float).view(1, -1)
            #***Task 1** choose the best action for the current state ***#
            a0 = torch.argmax(self.eval_net(s0)).item()
            #***Task 1** choose the best action for the current state ***#
        return a0

    def put(self, *transition):
        if len(self.buffer) == self.capacity:
            self.buffer.pop(0)
        self.buffer.append(transition)

    def learn(self):
        if (len(self.buffer)) < self.batch_size:
            return

        samples = random.sample(self.buffer, self.batch_size)
        s0, a0, r1, s1 = zip(*samples)
        s0 = torch.tensor(s0, dtype=torch.float)
        a0 = torch.tensor(a0, dtype=torch.long).view(self.batch_size, -1)
        r1 = torch.tensor(r1, dtype=torch.float).view(self.batch_size, -1)
        s1 = torch.tensor(s1, dtype=torch.float)

        loss_fn = nn.MSELoss()

        #****Task 2: define the loss ****#
        # TD target: r1 + gamma * max_a' Q(s1, a'); prediction: Q(s0, a0)
        y_true = r1 + self.gamma * torch.max(self.eval_net(s1).detach(), dim=1)[0].view(self.batch_size, -1)
        y_pred = self.eval_net(s0).gather(1, a0)
        loss = loss_fn(y_pred, y_true)
        #****Task 2: define the loss ****#

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
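
Agent pulls all of its hyperparameters out of **kwargs, so the expected keys are implicit; the attribute accesses above imply a construction like the following (the numeric values are illustrative, not from the source):

agent = Agent(
    state_space_dim=4,      # CartPole observation size
    action_space_dim=2,     # CartPole action count
    lr=0.001,               # Adam learning rate
    gamma=0.8,              # discount factor used in the TD target
    epsi_high=0.9,          # initial exploration rate
    epsi_low=0.05,          # final exploration rate
    decay=200,              # exploration decay constant
    capacity=10000,         # replay buffer size
    batch_size=64,
)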

6-2-3 Applying the Q-Function Model

import gym
import math
import random
import torch

from Q_net import Net

def test():
    eval_net=torch.load('/data/workspace/myshixun/RL_Q_learning_proj/src/qdn.pt')
    env = gym.make('CartPole-v0')
    s0 = env.reset()
    total_reward = 0
    while True:
        # env.render()
        s0 = torch.tensor(s0, dtype=torch.float).view(1, -1)
        #***Task 1***** select the action ***#
        a0 = torch.argmax(eval_net(s0)).item()
        #***Task 1***** select the action ***#
        s0, r1, done, _ = env.step(a0)

        if done:
            break
        total_reward += r1
    env.close()
    return(total_reward)

7 Uncategorized Projects

7-1 Building Deep Models: yaml2net

# https://jiuaidu.com/jianzhan/666463/
# Define the conv block: we first define a conv block CBL, where C is the convolution (Conv), B the BN layer, and L the activation; here I use ReLU.
import torch.nn as nn
from pathlib import Path
import yaml
import torch
from copy import deepcopy

class BaseConv(nn.Module):
    def __init__(self, in_channels, out_channels, k=1, s=1, p=None):
        super().__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.conv = nn.Conv2d(in_channels, out_channels, k, s, autopad(k, p))
        self.bn = nn.BatchNorm2d(out_channels)
        self.act_fn = nn.ReLU(inplace=True)

    def forward(self, x):
        return self.act_fn(self.bn(self.conv(x)))  # forward pass built from the units above: convolution + batch norm + activation



# autopad in the convolution fills in the padding automatically; k can be an int, or a list like [5,7] when the kernel width and height differ. The code:
def autopad(k, p=None):
    if p is None:
        p = k // 2 if isinstance(k, int) else [x // 2 for x in k]
    return p
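
A couple of worked values (k=3 with the default p gives 'same' padding at stride 1):

print(autopad(3))       # 1
print(autopad([5, 7]))  # [2, 3]
print(autopad(3, 0))    # 0: an explicit p overrides the automatic choice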


# Define a Bottleneck
class Bottleneck(nn.Module):
    def __init__(self, in_channels, out_channels, shortcut=True):
        super(Bottleneck, self).__init__()
        self.conv1 = BaseConv(in_channels, out_channels, k=1, s=1)
        self.conv2 = BaseConv(out_channels, out_channels, k=3, s=1)
        self.add = shortcut and in_channels == out_channels

    def forward(self, x):
        """
        x-->conv1-->conv2-->add

        |_________________|
        """
        return x + self.conv2(self.conv1(x)) if self.add else self.conv2(self.conv1(x))


# The Model class, i.e. our own network. Compared with just reading the yaml file,
# there is one extra line, ch = self.yaml["ch"] = 3, which inserts a key/value pair
# into the yaml contents; 3 means 3 channels, since our images are 3-channel.
# parse_model, described below, is where the parameters are passed in.

class Model(nn.Module):
    def __init__(self, cfg='yaml2net.yaml', ch=3 ):
        super().__init__()
        self.yaml = cfg

        import yaml
        yaml_file = Path(cfg).name
        with open(yaml_file, errors='ignore')as f:
            self.yaml = yaml.safe_load(f)
            self.backbone = parse_model(deepcopy(self.yaml), ch=[ch])

    def forward(self, x):
        output = self.backbone(x)
        return output

    # Passing in the parameters: this is the most critical step. We define a function
    # that feeds the conv parameters from the yaml into our network, using the very,
    # very important built-in function eval().

def parse_model(yaml_cfg, ch):
    """
    :param yaml_cfg: yaml file
    :param ch: init in_channels default is 3
    :return: model
    """
    layer, out_channels = [], ch[-1]   # lists used to hold each layer's module and the running output channel count
    for i, (f, number, Module_name, args) in enumerate(yaml_cfg['backbone']):
        """
        f: where this layer's input comes from (the previous layer's output)
        number: how many layers this module has, i.e. how many times to repeat it
        Module_name: name of the layer class
        args: parameters, including the output channel count, k, s, p, etc.
        """
        # eval() turns the str into our own BaseConv class
        m = eval(Module_name) if isinstance(Module_name, str) else Module_name
        for j, a in enumerate(args):
            # eval() turns the str into an int, giving the output channel count
            args[j] = eval(a) if isinstance(a, str) else a
        # update the channels
        # args[0] is the output channel count
        if m in [BaseConv, Bottleneck]:
            in_channels, out_channels = ch[f], args[0]
            args = [in_channels, out_channels, *args[1:]]  # args=[in_channels, out_channels, k, s, p]
            # pass the parameters into the module
            model_ = nn.Sequential(*[m(*args) for _ in range(number)]) if number > 1 else m(*args)
            # update the channel list, recording each layer's output channels
            ch.append(out_channels)
            layer.append(model_)

    return nn.Sequential(*layer)    # build the computation graph from the collected layers
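
For reference, the parsed dict printed in __main__ below corresponds to a yaml file shaped like this (reconstructed from that output; each row is [from, number, module, args]):

# yaml2net.yaml
# backbone:
#   - [-1, 1, BaseConv, [32, 3, 1]]   # out_channels=32, k=3, s=1
#   - [-1, 1, BaseConv, [64, 1, 1]]
#   - [-1, 2, Bottleneck, [64]]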


if __name__ == '__main__':
    import os  # import the os module
    os.chdir('yaml2net/src/')
    #
    # get the yaml file name
    yaml_file = Path('yaml2net.yaml').name
    with open(yaml_file, errors='ignore') as f:
        yaml_ = yaml.safe_load(f)
        print(yaml_)
    #
    # output:
    #
    # {'backbone': [[-1, 1, 'BaseConv', [32, 3, 1]], [-1, 1, 'BaseConv', [64, 1, 1]], [-1, 2, 'Bottleneck', [64]]]}

    cfg = 'yaml2net.yaml'
    model = Model(cfg,3)
    print(model)
    # for name,param in model.named_parameters():
    #     print('name=',name)#,'param=',param)
    # model.eval()
    # print(model)
    # x = torch.ones(1, 3, 512, 512)
    # output = model(x)
    # torch.save(model, "model.pth")
    #
    # model = torch.load('model.pth')
    # model.eval()
    # x = torch.ones(1,3,512,512)
    # input_name = ['input']
    # output_name = ['output']
    # torch.onnx.export(model, x, 'myonnx.onnx', verbose=True)

7-2 Face Matching + Transfer Learning

7-2-1 Accessing Model Parameters and Modifying the Model Structure

import torch
import torch.nn as nn
import numpy as np
class MnistLinearNet(nn.Module):
    def __init__(self):
        super(MnistLinearNet, self).__init__()
        self.conv1 =nn.Conv2d(1,1,kernel_size=3)
        self.flat = nn.Flatten()
        self.fc1 = nn.Linear(9, 5)
        self.fc2 = nn.Linear(5, 5)
        self.net = nn.Sequential(self.conv1,nn.ReLU(),self.flat,self.fc1,nn.ReLU(),self.fc2)
        #If the model were defined this way instead, how would the parameter names change?
        # self.net =     nn.Sequential(   nn.Conv2d(1,1,kernel_size=3)   ,
        #                                 nn.ReLU(),
        #                                 nn.Flatten()   ,
        #                                 nn.Linear(9, 5)    ,
        #                                 nn.Linear(5, 5))

    def forward(self, x):
        ''' Forward pass through the network, returns the output logits '''
        x=self.net(x)
        return x

#Return the model's parameter names, in order
#0 Input: an nn.Module; returns a name list like ['conv1.weight', 'conv1.bias', ...]
def get_param_names(model):
     return [name for name, _ in model.named_parameters()]



#1 Return the bias parameter of the model's convolution, as a Tensor
def get_param_value(model_path):
    model = torch.load(model_path)
    return model.net[0].bias.data

#2 Look up the Tensor for a given parameter name
def get_param_by_name(model,param_name):
    return model.state_dict()[param_name]

#3 Load the model, replace the last layer with a 2-output fully connected layer nn.Linear(5, 2), and return the model
def modify_model(model_path):
    model = torch.load(model_path)
    model.net[5] = nn.Linear(5, 2)
    return model


#4 Load model parameters; the models are identical
def load_model_param(model,param_path):
    model.load_state_dict(torch.load(param_path))
    return model


#5 Load partial parameters; the two models' fc2 layers differ, so the fc2 parameters cannot be loaded
def load_model2_param(model,param_path):
    pretrain_dict = torch.load(param_path)
    # filter out fc2 keys
    model_dict2 = {k: v for k, v in pretrain_dict.items() if 'fc2' not in k}
    model.load_state_dict(model_dict2, strict=False)
    return model




if __name__ == '__main__':
    model =    MnistLinearNet()
    x = torch.tensor(np.arange(0,25).astype('float32').reshape(5,5)).unsqueeze(0).unsqueeze(0)

    print(get_param_names(model))
    print(get_param_value('transfer_learning/src/model.pth'))

    model = torch.load('transfer_learning/src/model.pth')
    print(get_param_by_name(model,'conv1.bias'))

    print('model(x):',model(x).data)
    model2 = modify_model('transfer_learning/src/model.pth')
    model2.net[5].weight.data = torch.tensor(np.arange(0,10).astype('float32').reshape(2,5))
    model2.net[5].bias.data = torch.tensor(np.arange(0, 2).astype('float32'))
    print('model2(x):',model2(x).data)

    model = load_model_param(model ,'transfer_learning/src/model_dict.pth')
    print(model.fc2.bias)

    model = MnistLinearNet()
    model.fc2 = nn.Linear(10, 2)
    model = load_model2_param(model,'transfer_learning/src/model_dict.pth')
    print(model.conv1.weight)

7-2-2 Using the Dataset Interface to Create a Sample Generator

from torch.utils.data import Dataset ,DataLoader
import random
import numpy as np
import zipfile
from PIL import Image
from io import BytesIO
import re


class SiameseNetworkDataset(Dataset):
    def __init__(self, imgzipfile):
        self.imgzip = zipfile.ZipFile(imgzipfile)
        self.namelist = self.imgzip.namelist()
        self.persons = [x for x in self.namelist if re.match(r'.*/training/.*/$', x)]
        self.bmps = [x  for x in self.namelist if  re.match(r'.*/training/.*\.bmp$',x)]
        self.person_dict = {}
        for k in self.persons:
            self.person_dict[k] = [v for v in self.namelist if re.match(k + r'.*\.bmp$', v)]
    def __getitem__(self, index):
        label = random.randint(0, 1)
        if label:   # pick two samples of the same person
            person = random.choice(self.persons)
            img0,img1 = random.sample(self.person_dict[person], 2)
        else:
            persons = random.sample(self.persons, 2)
            img0 = random.choice(self.person_dict[persons[0]])
            img1 = random.choice(self.person_dict[persons[1]])
        img0 = np.array(Image.open(BytesIO(self.imgzip.read(img0)))).astype('float32')[None, :]
        img1 = np.array(Image.open(BytesIO(self.imgzip.read(img1)))).astype('float32')[None, :]
        return img0,img1,label
    def __len__(self):
        return len(self.bmps)

if __name__ == '__main__':
    siamese_dataset = SiameseNetworkDataset('transfer_learning/faces.zip')
    train_dataloader = DataLoader(siamese_dataset,
                                  shuffle=True,
                                  num_workers=0,
                                  batch_size=8)
    for  idx,data in enumerate(train_dataloader):
        img0,img1,label = data
        print(img0.shape,label.sum()<8)
        break

7-2-3 Custom Network: Face Matching with a Siamese Network

Note: because the system's compile time varies, you may need to run the evaluation several times before it finishes in under 20s; the code itself is fine.

import torch.nn as nn
import torch
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader  # needed by the __main__ block below
from dataloader import SiameseNetworkDataset
class SiameseNetwork(nn.Module):
    def __init__(self):
        super(SiameseNetwork, self).__init__()
        self.net = nn.Sequential(
            nn.Conv2d(1, 24, 3),
            nn.ReLU(),
            nn.MaxPool2d(3),
            nn.Conv2d(24, 32, 3),
            nn.ReLU(),
            nn.MaxPool2d(3),
            nn.Flatten(),
            nn.Linear(3168, 1024),
            nn.ReLU(inplace=True),
            nn.Linear(1024, 128)
            )


    def forward(self, input1, input2):
        output1 = self.net(input1)
        output2 = self.net(input2)
        return output1, output2

class ContrastiveLoss(torch.nn.Module):
    def __init__(self, margin=1.0):
        super(ContrastiveLoss, self).__init__()
        self.margin = margin

    def forward(self, output1, output2, label):
        d = F.pairwise_distance(output1, output2)
        # note: SiameseNetworkDataset marks same-person pairs with label 1, while this
        # classic formulation treats label 0 as the similar case, so here a small
        # distance ends up meaning a *different* person; flip the label (or swap the
        # two terms) to follow the usual convention
        loss = torch.mean((1-label)*torch.pow(d, 2)+(label)*torch.pow(torch.clamp(self.margin - d, min=0.0), 2))
        return loss




def  train(train_dataloader):
    net = SiameseNetwork()
    criterion = ContrastiveLoss()
    optimizer = optim.Adam(net.parameters(), lr=0.0005)
    for epoch in range(2):
        for i, data in enumerate(train_dataloader, 0):
            img0, img1, label = data

            output1, output2 = net(img0, img1)
            optimizer.zero_grad()
            loss_contrastive = criterion(output1, output2, label)
            loss_contrastive.backward()
            optimizer.step()

            if i % 10 == 0:
                print("Epoch:{},  Current loss {}\n".format(epoch, loss_contrastive.item()))

if __name__ == '__main__':
    siamese_dataset = SiameseNetworkDataset('data/faces.zip')
    train_dataloader = DataLoader(siamese_dataset,
                                  shuffle=True,
                                  num_workers=0,
                                  batch_size=20)
    for  idx,data in enumerate(train_dataloader):
        img0,img1,label = data
        print(img0.shape,label)
        break

    train(train_dataloader)

7-3 Image Semantic Segmentation: U-Net

import cv2
import numpy as np
#glob is used to list directories and files
from glob import glob
import random
import torch
import torch.nn.functional as F
import torch.nn as nn
from torch import optim
from torch.utils.data import Dataset ,DataLoader

class ImgSegDataSet(Dataset):
    def __init__(self, data_path):
        self.data_path = data_path
        self.imgs_path = glob(data_path+'*.png')

    #flip the image
    def augment(self, image, flipcode):
        flip = cv2.flip(image, flipcode)
        return flip

    def __getitem__(self, index):
        #read the image and its label
        image_path = self.imgs_path[index]
        label_path = image_path.replace('image', 'label')
        #cv2 reads images as numpy.ndarray
        image = cv2.imread(image_path,cv2.IMREAD_GRAYSCALE)
        label = cv2.imread(label_path,cv2.IMREAD_GRAYSCALE)
        #preprocess the image
        if label.max() > 1:
            label = label / 255
        #augmentation: 0 = vertical flip, 1 = horizontal flip, -1 = both, 2 = no flip
        flipcode = random.choice([-1, 0, 1, 2])
        if flipcode != 2:
            image = self.augment(image, flipcode)
            label = self.augment(label, flipcode)
        return image, label

    def __len__(self):
        return len(self.imgs_path)

class DoubleConv(nn.Module):
    """(convolution => [BN] => ReLU) * 2"""

    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.double_conv = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True)
        )

    def forward(self, x):
        return self.double_conv(x)

class Down(nn.Module):
    """Downscaling with maxpool then double conv"""

    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.maxpool_conv = nn.Sequential(
            nn.MaxPool2d(2),
            DoubleConv(in_channels, out_channels)
        )

    def forward(self, x):
        return self.maxpool_conv(x)

class Up(nn.Module):
    """Upscaling then double conv"""
    def __init__(self, in_channels, out_channels, bilinear=True):
        super().__init__()
        # note: bilinear upsampling keeps the channel count, so the concatenation in
        # forward only matches DoubleConv's in_channels when bilinear=False, where
        # ConvTranspose2d halves the channels first; __main__ below uses bilinear=False
        if bilinear:
            self.up = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
        else:
            self.up = nn.ConvTranspose2d(in_channels, out_channels, kernel_size=2, stride=2)

        self.conv = DoubleConv(in_channels, out_channels)

    def forward(self, x1, x2):
        x1 = self.up(x1)
        #input is CHW
        diffY = torch.tensor([x2.size()[2] - x1.size()[2]])
        diffX = torch.tensor([x2.size()[3] - x1.size()[3]])

        x1 = F.pad(x1, [diffX // 2, diffX - diffX // 2,
                        diffY // 2, diffY - diffY // 2])

        x = torch.cat([x2, x1], dim=1)
        return self.conv(x)

class OutConv(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(OutConv, self).__init__()
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size=1)

    def forward(self, x):
        return self.conv(x)

class UNet(nn.Module):
    def __init__(self, n_channels, n_classes, bilinear=True):
        super(UNet, self).__init__()
        self.n_channels = n_channels
        self.n_classes = n_classes
        self.bilinear = bilinear

        self.inc = DoubleConv(n_channels, 64)
        self.down1 = Down(64, 128)
        self.down2 = Down(128, 256)
        self.down3 = Down(256, 512)
        self.down4 = Down(512, 1024)
        self.up1 = Up(1024, 512, bilinear)
        self.up2 = Up(512, 256, bilinear)
        self.up3 = Up(256, 128, bilinear)
        self.up4 = Up(128, 64, bilinear)
        self.outc = OutConv(64, n_classes)

    def forward(self, x):
        # filled-in code:
        x1 = self.inc(x)
        x2 = self.down1(x1)
        x3 = self.down2(x2)
        x4 = self.down3(x3)
        x5 = self.down4(x4)
        x = self.up1(x5, x4)
        x = self.up2(x, x3)
        x = self.up3(x, x2)
        x = self.up4(x, x1)
        logits = self.outc(x)
        return logits
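
A minimal shape check (the input sides must be divisible by 16 for the four downsamplings; bilinear=False matches the construction in __main__ below):

unet = UNet(n_channels=1, n_classes=1, bilinear=False)
x = torch.zeros(1, 1, 96, 96)
print(unet(x).shape)  # torch.Size([1, 1, 96, 96])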

#original medical image dataset
def Train_Unet(unet,device,data_path,model_path,batch_size,epochs):
    #load the dataset
    train_dataset = ImgSegDataSet(data_path)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    #define the optimization algorithm
    opt = optim.Adam(unet.parameters())
    #define the loss function
    loss_fun = nn.BCEWithLogitsLoss()
    unet.train()
    for epoch in range(epochs):
        for image, label in train_loader:
            opt.zero_grad()
            image = image.to(device=device, dtype=torch.float32).unsqueeze(1)
            label = label.to(device=device, dtype=torch.float32).unsqueeze(1)
            pred=unet(image)
            loss = loss_fun(pred, label)
            loss.backward()
            opt.step()
            print('epoch: {:d}, loss: {:f}'.format(epoch, loss.item()))
            break



if __name__ == '__main__':
    data_path = "project/data/train_image/"
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    unet = UNet(n_channels= 1, n_classes= 1, bilinear=False)
    unet.to(device=device)
    # Train_Unet(unet, device, data_path,model_path='unet.pth', epochs=1, batch_size=1)