基于pytorch的神经网络模型实现情感分类任务
)
学习笔记,仅供参考!
学习内容来源于书籍《基于预训练模型的方法-自然语言处理》
持续更新!!!
1. 情感分类实战
使用pytorch实现下面四种深度学习模型,多层感知器,卷积神经网络,LSTM,Transformer,来解决文本分类问题,主要以cnn模型为例来介绍。为了完成该任务,还需要编写词表映射,词向量层,数据处理,模型构建和训练。
1.1 数据处理
1.1.1 词表映射
我们需要将输入的语言符号,也就是我们的文本数据,映射为小于词表大小的整数,记为标记(token),这个整数也被称为一个标记的的索引值或者下标。下面编写一个Vocab类实现标记与索引值之间的相互映射,代码如下:`
from collections import defaultdict, Counter
class Vocab:
def __init__(self, tokens=None):
self.idx_to_token = list()
self.token_to_idx = dict()
if tokens is not None:
if "<unk>" not in tokens:
tokens = tokens + ["<unk>"]
for token in tokens:
self.idx_to_token.append(token)
self.token_to_idx[token] = len(self.idx_to_token) - 1
self.unk = self.token_to_idx['<unk>']
@classmethod
def build(cls, text, min_freq=1, reserved_tokens=None):
token_freqs = defaultdict(int)
for sentence in text:
for token in sentence:
token_freqs[token] += 1
uniq_tokens = ["<unk>"] + (reserved_tokens if reserved_tokens else [])
uniq_tokens += [token for token, freq in token_freqs.items() \
if freq >= min_freq and token != "<unk>"]
return cls(uniq_tokens)
def __len__(self):
#返回词表的大小,词表中互不相同的标记token
return len(self.idx_to_token)
def __getitem__(self, token):
#查找输入标记对应的索引值,如果不存在,返回unk
return self.token_to_idx.get(token, self.unk)
def convert_tokens_to_ids(self, tokens):
#查找一系列输入标记的索引值
return [self[token] for token in tokens]
def convert_ids_to_tokens(self, indices):
#查找一系列索引值对应的标记
return [self.idx_to_token[index] for index in indices]
def save_vocab(vocab, path):
with open(path, 'w') as writer:
writer.write("\n".join(vocab.idx_to_token))
def read_vocab(path):
with open(path, 'r') as f:
tokens = f.read().split('\n')
return Vocab(tokens)
1.1.2 数据处理
获取情感分类数据集,数据来源于NLTK库提供的语料库sentence_polarity,数据已经经过一定的标记解析,可以通过Vocab类直接将数据每个标记映射到相应的索引值,具体代码如下:
#数据处理
def load_sentence_polarity():
from nltk.corpus import sentence_polarity
#创建词表,数据来源于NLTK语料库sentence_polarity
vocab = Vocab.build(sentence_polarity.sents())
"""
褒贬各4000句作为训练数据,并使用创建的词表将标记映射为相应的索引值
褒义标签为0,贬义标签为1
每个样例由一个索引值和标签组成的元组
"""
train_data = [(vocab.convert_tokens_to_ids(sentence), 0)
for sentence in sentence_polarity.sents(categories='pos')[:4000]] \
+ [(vocab.convert_tokens_to_ids(sentence), 1)
for sentence in sentence_polarity.sents(categories='neg')[:4000]]
#其余的数据作为测试数据
test_data = [(vocab.convert_tokens_to_ids(sentence), 0)
for sentence in sentence_polarity.sents(categories='pos')[4000:]] \
+ [(vocab.convert_tokens_to_ids(sentence), 1)
for sentence in sentence_polarity.sents(categories='neg')[4000:]]
return train_data, test_data, vocab
对于上述处理的数据可以放在DataLoader类里面(在torch.utils.data包里),例如,创建下列语句来建立一个DataLoader对象:
data_loader = DataLoader(
dataset,
batch_size=64,
collate_fn=collate_fn,
shuffle=True
)
对于上面四个参数,做以下介绍,以CNN模型为例:
dataset参数:dataset是Dataset类的一个对象,用于存储数据:
# 数据处理,数据的存储
class CnnDataset(Dataset):
def __init__(self, data):
# data为原始数据,如使用load_sentence_polarity获取的训练数据和测试数据
self.data = data
def __len__(self):
# 返回数据中样例的数目
return len(self.data)
def __getitem__(self, i):
# 返回下标为i的样例
return self.data[i]
collate_fn:对一个批次做处理,实现对多个batch可以做处理:
def collate_fn(examples):
"""
:param examples: 从独立样本中构建各批次的输入输出
CnnDataset类定义了一个样本的数据结构,输入标签和输出标签的元组
因此,将输入inputs定义为一个张量的列表,其中每个张量为原始句子中标记序列
"""
# 对应的索引值序列ex[0]
inputs = [torch.tensor(ex[0]) for ex in examples]
# 输出的目标targets为该批次中全部样例输出结果(0或1)构成的张量
targets = torch.tensor([ex[1] for ex in examples], dtype=torch.long)
# 对batch内的样本进行padding,使其具有相同长度
# batch_first=True表示第一维代表批次的大小
inputs = pad_sequence(inputs, batch_first=True)
return inputs, targets
1.2 CNN神经网络模型
卷积神经网络,对于文本数据,选用一维卷积Conv1d,进行模型的建立,向前传播,再对数据加载,模型加载,训练模型,测试结果,完整代码如下:
# 模型建立
class CNN(nn.Module):
# 模型构建
def __init__(self, vocab_size, embedding_dim, filter_size, num_filter, num_class):
super(CNN, self).__init__()
# 词向量层
self.embedding = nn.Embedding(vocab_size, embedding_dim)
# 卷积层,num_filter为卷积核,padding表示在卷积之前,在序列的前后各补充一个输入
self.conv1d = nn.Conv1d(embedding_dim, num_filter, filter_size, padding=1)
# 激活函数
self.activate = F.relu
# 输出层
self.linear = nn.Linear(num_filter, num_class)
# 模型的向前过程
def forward(self, inputs):
embedding = self.embedding(inputs)
convolution = self.activate(self.conv1d(embedding.permute(0, 2, 1)))
pooling = F.max_pool1d(convolution, kernel_size=convolution.shape[2])
outputs = self.linear(pooling.squeeze(dim=2))
log_probs = F.log_softmax(outputs, dim=1) # 取对数,避免数据溢出
return log_probs
# tqdm是一个Pyth模块,能以进度条的方式显示迭代的进度
from tqdm.auto import tqdm
# 超参数设置
embedding_dim = 128
hidden_dim = 256
num_class = 2
batch_size = 32
num_epoch = 5
filter_size = 3
num_filter = 100
# 加载数据
train_data, test_data, vocab = load_sentence_polarity()
train_dataset = CnnDataset(train_data)
test_dataset = CnnDataset(test_data)
train_data_loader = DataLoader(train_dataset, batch_size=batch_size, collate_fn=collate_fn, shuffle=True)
test_data_loader = DataLoader(test_dataset, batch_size=1, collate_fn=collate_fn, shuffle=False)
# 加载模型
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = CNN(len(vocab), embedding_dim, filter_size, num_filter, num_class)
model.to(device) # 将模型加载到CPU或GPU设备
# 训练过程
nll_loss = nn.NLLLoss() # 定义损失函数
optimizer = optim.Adam(model.parameters(), lr=0.001) # 使用Adam优化器
model.train()
for epoch in range(num_epoch):
total_loss = 0
for batch in tqdm(train_data_loader, desc=f"Training Epoch {epoch}"):
inputs, targets = [x.to(device) for x in batch]
log_probs = model(inputs) # 调用模型,预测输出结果
loss = nll_loss(log_probs, targets) # 对比预测值和真实值,计算损失
optimizer.zero_grad() # 在反向传播之前,将优化器的梯度置为0,避免每次循环梯度累加
loss.backward() # 反向传播计算梯度
optimizer.step() # 在优化器类更新参数
total_loss += loss.item()
print(f"Loss: {total_loss:.2f}")
# 测试过程
acc = 0
for batch in tqdm(test_data_loader, desc=f"Testing"):
inputs, targets = [x.to(device) for x in batch]
with torch.no_grad():
output = model(inputs)
acc += (output.argmax(dim=1) == targets).sum().item()
# 输出在测试集上的准确率
print(f"Acc: {acc / len(test_data_loader):.2f}")
1.3 循环神经网络模型
1.4 Transformer框架
更多推荐