LSTM Text Sentiment Prediction

Data Preprocessing

Stopwords are removed and the text is tokenized; the final output is a matrix in which each row is the processed word list of one tweet.

import pandas as pd
import nltk
import re

train_data = pd.read_csv('./train.csv', encoding='ISO-8859-1')
size_train_data = len(train_data)
tweet_text = train_data['OriginalTweet']

# Split each tweet into sentences and strip punctuation
tweet_sentences = []
for i in range(size_train_data):
    tweet = tweet_text[i]
    tweets = nltk.sent_tokenize(tweet)
    for j in range(len(tweets)):
        sentence = tweets[j]
        tweets[j] = re.sub(r'[^a-zA-Z0-9\s]', '', sentence)
        tweets[j] = tweets[j].replace("\r", "").replace("\n", "").replace("\t", "")
    tweet_sentences.append(tweets)
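
The stopword removal and word tokenization described above are not shown in the original listing; the sketch below is one plausible version, assuming NLTK's English stopword list and word_tokenize. It produces text_combine (the token lists fed to Word2Vec in the next section) and saves them as the sentences.npy file that the training script loads later:

from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
import numpy as np

# Requires nltk.download('stopwords') and nltk.download('punkt') on first run
stop_words = set(stopwords.words('english'))

# One token list per tweet: lowercase, tokenize, drop stopwords
text_combine = []
for sentences in tweet_sentences:
    tokens = []
    for sentence in sentences:
        tokens += [w for w in word_tokenize(sentence.lower()) if w not in stop_words]
    text_combine.append(tokens)

# Saved for reuse (loaded later as sentences.npy)
np.save('./sentences.npy', np.array(text_combine, dtype=object))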

Training the Word2vec Model

from gensim.models import Word2Vec

# Train skip-gram (sg=1) vectors on the preprocessed token lists
model = Word2Vec(text_combine, vector_size=100, window=5, min_count=5, epochs=7, negative=10, sg=1)
model.save('./models/Word2vec_v4')  # save the model
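
A quick way to sanity-check the trained embeddings is to inspect nearest neighbours in vector space; 'good' below is just an illustrative query word (it must have survived the min_count=5 cutoff):

from gensim.models import Word2Vec

model = Word2Vec.load('./models/Word2vec_v4')
# Words close in embedding space should be semantically related
print(model.wv.most_similar('good', topn=5))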

Building the Dictionary

Vector representation of a sentence: sentence = [word_index1, word_index2, ..., word_indexn]

Note that a sentence is represented as a list of word indices, not directly as word embeddings: if, say, "good" has index 12 and "day" has index 87, then ["good", "day"] becomes [12, 87]. The Embedding layer looks up the actual vectors by index during training.

import pickle
import os
from gensim.corpora import Dictionary
from gensim.models import Word2Vec

os.environ["CUDA_VISIBLE_DEVICES"] = "0"  # select GPU 0

def create_dictionaries(model=None):
    """
    Build the word dictionary; return the word-to-index and word-to-vector
    mappings for the vocabulary of the Word2vec model.
    :param model: a trained gensim Word2Vec model
    :return: (w2index, w2vec)
    """
    gensim_dict = Dictionary()
    # doc2bow builds a bag-of-words (BoW) representation: grammar and word
    # order are ignored and a document is treated as an unordered collection
    # of words. With a vocabulary of m words, a sentence is represented as
    # [n1, ..., nm], where ni is the count of the word with index i.
    # Here it is only used to register the vocabulary in the dictionary.
    gensim_dict.doc2bow(model.wv.key_to_index.keys(), allow_update=True)

    # Shift every index by 1 so that 0 is reserved for padding/unknown words
    w2index = {v: k + 1 for k, v in gensim_dict.items()}
    w2vec = {word: model.wv.get_vector(word) for word in w2index.keys()}

    return w2index, w2vec

# Load the Word2vec model
model = Word2Vec.load('./models/Word2vec_v4')
index_dict, word_vectors = create_dictionaries(model)

output = open("dictW.pkl", 'wb')
pickle.dump(index_dict, output)    # word -> index dictionary
pickle.dump(word_vectors, output)  # word -> vector dictionary
output.close()
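
As a quick illustration of the index representation, the pickled dictionary can be applied to a single tokenized sentence (the tokens here are hypothetical; a word missing from the dictionary maps to 0, the same convention the training script uses below):

import pickle

with open("dictW.pkl", 'rb') as f:
    index_dict = pickle.load(f)

# A tokenized sentence becomes a list of dictionary indices
sentence = ['virus', 'lockdown', 'store']        # hypothetical tokens
print([index_dict.get(w, 0) for w in sentence])  # unknown words -> 0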

LSTM Training and Prediction

import pickle
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import os
from keras import Sequential
from keras.layers import Embedding, LSTM, Dropout, Dense
from keras.models import load_model
from sklearn.model_selection import train_test_split
from tensorflow import keras

os.environ["CUDA_VISIBLE_DEVICES"] = "0"

vocab_dim = 100   # embedding dimension
maxlen = 70       # maximum sequence length to keep
batch_size = 32   # number of samples fed to the model per step
n_epoch = 4       # number of training epochs


# Open the word dictionaries
f = open("dictW.pkl", 'rb')
index_dict = pickle.load(f)    # key: word, value: index
word_vectors = pickle.load(f)  # key: word, value: vector

n_symbols = len(index_dict) + 1  # number of indexed words (+1 for padding index 0)

# Build an n_symbols * vocab_dim matrix holding all the word vectors
embedding_weights = np.zeros((n_symbols, vocab_dim))
for w, index in index_dict.items():
    embedding_weights[index, :] = word_vectors[w]


def text_to_index_array(p_new_dic=None, p_sen=None):
    """Convert tokenized sentences to lists of dictionary indices."""
    new_sentences = []
    for sen in p_sen:
        new_sen = []
        for word in sen:
            try:
                # Fill the sentence with word indices
                new_sen.append(p_new_dic[word])
            except KeyError:
                # Words missing from the dictionary get index 0
                new_sen.append(0)
        new_sentences.append(new_sen)
    # dtype=object because the sentences have different lengths
    return np.array(new_sentences, dtype=object)


train_data = pd.read_csv('./train.csv', encoding='ISO-8859-1')
train_label = train_data['Sentiment']

# One-hot encode the five sentiment labels
train_label_New = []
for label in train_label:
    labelArray = [0] * 5
    if label == "Positive":
        index = 0
    elif label == "Negative":
        index = 1
    elif label == "Neutral":
        index = 2
    elif label == "Extremely Positive":
        index = 3
    else:  # "Extremely Negative"
        index = 4
    labelArray[index] = 1
    train_label_New.append(labelArray)
train_label = train_label_New

train_data = np.load("./sentences.npy", allow_pickle=True)
# Randomly split into training and development sets at a 9:1 ratio
X_train, X_dev, y_train, y_dev = train_test_split(train_data, train_label, test_size=0.1)

X_train_new = text_to_index_array(index_dict, X_train)
X_dev_new = text_to_index_array(index_dict, X_dev)

# Pad/truncate every sequence to maxlen
X_train_new = keras.preprocessing.sequence.pad_sequences(X_train_new, maxlen=maxlen)
X_dev_new = keras.preprocessing.sequence.pad_sequences(X_dev_new, maxlen=maxlen)

y_train_new = np.array(y_train)
y_dev_new = np.array(y_dev)

print("Training set shape:", X_train_new.shape)
print("Dev set shape:", X_dev_new.shape)


def train_lstm(p_n_symbols, p_embedding_weights, p_X_train, p_y_train, p_X_test, p_y_test, X_test_l):
    print('Building model...')
    model = Sequential()
    model.add(Embedding(output_dim=vocab_dim,           # output vector dimension
                        input_dim=p_n_symbols,          # vocabulary size
                        mask_zero=True,                 # mask the padding 0s so they do not affect training
                        weights=[p_embedding_weights],  # initialize with the pretrained Word2vec vectors
                        input_length=maxlen))           # length of each input sequence

    model.add(LSTM(units=100,
                   activation='sigmoid',
                   recurrent_activation='hard_sigmoid'))
    model.add(Dropout(0.5))  # randomly drop 50% of units to reduce overfitting
    model.add(Dense(units=512,
                    activation='relu'))
    model.add(Dropout(0.5))
    model.add(Dense(units=5,  # 5 output units, one per sentiment class
                    activation='sigmoid'))
    model.summary()

    print('Compiling model...')
    # Note: for 5 mutually exclusive classes, softmax + categorical_crossentropy
    # would be the standard choice; sigmoid + binary_crossentropy scores each
    # class independently
    model.compile(loss='binary_crossentropy',
                  optimizer='adam',
                  metrics=['accuracy'])

    print("Training...")
    train_history = model.fit(p_X_train, p_y_train, batch_size=batch_size, epochs=n_epoch,
                              validation_data=(p_X_test, p_y_test))

    print("Evaluating...")
    score, acc = model.evaluate(p_X_test, p_y_test, batch_size=batch_size)
    label = model.predict(p_X_test)
    print('Test score:', score)
    print('Test accuracy:', acc)
    # for (a, b, c) in zip(p_y_test, X_test_l, label):
    #     print("Original text: " + " ".join(b))
    #     print("True label:", a)
    #     print("Prediction:", c)

    # show_train_history(train_history, 'accuracy', 'val_accuracy')  # accuracy curves
    # show_train_history(train_history, 'loss', 'val_loss')          # loss curves

    # Save the model
    model.save('./model/emotion_model_LSTM2.h5')
    print("Model saved")


def show_train_history(train_history, train, validation):
    """
    Plot one training-history metric for the training and validation sets.
    :param train_history: the History object returned by model.fit
    :param train: key of the training metric, e.g. 'accuracy'
    :param validation: key of the validation metric, e.g. 'val_accuracy'
    """
    plt.plot(train_history.history[train])
    plt.plot(train_history.history[validation])
    plt.title("Train History")
    plt.xlabel('Epoch')
    plt.ylabel(train)
    plt.legend(['train', 'validation'], loc='upper left')
    plt.show()


def get_max_index(listRe):
    """Return the index of the largest probability (equivalent to np.argmax)."""
    maxProb = 0.0
    maxProbIndex = 0
    i = 0
    for prob in listRe:
        if prob > maxProb:
            maxProb = prob
            maxProbIndex = i
        i = i + 1
    return maxProbIndex


def prediction(test_data, raw_test_data):
    keras.backend.clear_session()
    model = load_model('./model/emotion_model_LSTM2.h5')
    label = model.predict(test_data)
    # 'with' guarantees the file is closed even if prediction fails
    with open('./submission.txt', 'w') as file:
        i = 0
        for re in label:
            index = get_max_index(re)
            # file.writelines(raw_test_data[i])
            i = i + 1
            if index == 0:
                file.write("Positive" + '\n')
            elif index == 1:
                file.write("Negative" + '\n')
            elif index == 2:
                file.write("Neutral" + '\n')
            elif index == 3:
                file.write("Extremely Positive" + '\n')
            else:
                file.write("Extremely Negative" + '\n')


train_lstm(n_symbols, embedding_weights, X_train_new, y_train_new, X_dev_new, y_dev_new, X_dev)

# Prediction
raw_test_data = np.load("./test_sentences1.npy", allow_pickle=True)
test_data = text_to_index_array(index_dict, raw_test_data)
test_data = keras.preprocessing.sequence.pad_sequences(test_data, maxlen=maxlen)
prediction(test_data, raw_test_data)
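
As a side note, the get_max_index plus if/elif chain above can be collapsed with np.argmax and a label list. A minimal sketch (prediction_compact is a hypothetical name; behavior matches prediction above):

import numpy as np
from keras.models import load_model

# Class order matches the one-hot encoding used during training
LABELS = ["Positive", "Negative", "Neutral", "Extremely Positive", "Extremely Negative"]

def prediction_compact(test_data):
    model = load_model('./model/emotion_model_LSTM2.h5')
    probs = model.predict(test_data)
    with open('./submission.txt', 'w') as f:
        for row in probs:
            # Pick the class with the highest predicted probability
            f.write(LABELS[int(np.argmax(row))] + '\n')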

The final result is not great, only about 80% accuracy, and there is still plenty of room for improvement. For example, it is worth revisiting whether stopwords and punctuation should be stripped during preprocessing, since marks such as question marks carry tone that matters for sentiment.

Full report