preprocessor.py
import re
import string

import nltk
import pandas as pd
from nltk import pos_tag
from nltk.corpus import stopwords, wordnet
from nltk.stem import WordNetLemmatizer

from database_access import DatabaseAccess


class Preprocessor:

    def split_sentences(self, reviews):
        # Split every review in the collection into individual sentences.
        sentences = []
        for review in reviews:
            for sentence in self.__split_into_sentences(review):
                sentences.append(sentence)
        return sentences

    def split_review_into_sentences(self, review):
        # Split a single review into sentences.
        sentences = []
        for sentence in self.__split_into_sentences(review):
            sentences.append(sentence)
        return sentences

    def __split_into_sentences(self, text):
        # Rule-based splitter: protect periods that do not end a sentence
        # (abbreviations, acronyms, websites) with a <prd> placeholder, mark
        # real boundaries with <stop>, then split on <stop>.
        alphabets = r"([A-Za-z])"
        prefixes = r"(Mr|St|Mrs|Ms|Dr)[.]"
        suffixes = r"(Inc|Ltd|Jr|Sr|Co)"
        starters = r"(Mr|Mrs|Ms|Dr|He\s|She\s|It\s|They\s|Their\s|Our\s|We\s|But\s|However\s|That\s|This\s|Wherever)"
        acronyms = r"([A-Z][.][A-Z][.](?:[A-Z][.])?)"
        websites = r"[.](com|net|org|io|gov)"
        text = " " + text + " "
        text = text.replace("\n", " ")
        text = re.sub(prefixes, r"\1<prd>", text)
        text = re.sub(websites, r"<prd>\1", text)
        if "Ph.D" in text:
            text = text.replace("Ph.D.", "Ph<prd>D<prd>")
        text = re.sub(r"\s" + alphabets + "[.] ", r" \1<prd> ", text)
        text = re.sub(acronyms + " " + starters, r"\1<stop> \2", text)
        text = re.sub(alphabets + "[.]" + alphabets + "[.]" + alphabets + "[.]",
                      r"\1<prd>\2<prd>\3<prd>", text)
        text = re.sub(alphabets + "[.]" + alphabets + "[.]",
                      r"\1<prd>\2<prd>", text)
        text = re.sub(" " + suffixes + "[.] " + starters, r" \1<stop> \2", text)
        text = re.sub(" " + suffixes + "[.]", r" \1<prd>", text)
        text = re.sub(" " + alphabets + "[.]", r" \1<prd>", text)
        # move terminal punctuation outside closing quotes so the <stop>
        # marker lands after the quotation mark
        if "”" in text:
            text = text.replace(".”", "”.")
        if "\"" in text:
            text = text.replace(".\"", "\".")
        if "!" in text:
            text = text.replace("!\"", "\"!")
        if "?" in text:
            text = text.replace("?\"", "\"?")
        text = text.replace(".", ".<stop>")
        text = text.replace("?", "?<stop>")
        text = text.replace("!", "!<stop>")
        text = text.replace("<prd>", ".")
        sentences = text.split("<stop>")
        # strip whitespace and drop empty fragments; this also keeps the last
        # sentence when the text does not end with terminal punctuation
        sentences = [s.strip() for s in sentences if s.strip()]
        return sentences

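    # Illustrative example of the splitter (assumed behavior, not taken from
    # the project's tests):
    #   __split_into_sentences("I met Dr. Smith. He left.")
    #   returns ["I met Dr. Smith.", "He left."], because the abbreviation
    #   period in "Dr." is protected by the prefixes rule while the two
    #   sentence-final periods become <stop> markers.
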
    def __get_wordnet_pos(self, pos_tag):
        # Map a Penn Treebank POS tag to the WordNet tag that
        # WordNetLemmatizer expects; default to NOUN.
        if pos_tag.startswith('J'):
            return wordnet.ADJ
        elif pos_tag.startswith('V'):
            return wordnet.VERB
        elif pos_tag.startswith('N'):
            return wordnet.NOUN
        elif pos_tag.startswith('R'):
            return wordnet.ADV
        else:
            return wordnet.NOUN

    # cache of the NLTK English word list, built on first use
    _english_words = None

    def __clear_sentence(self, text):
        # convert to lower case
        text = text.lower()
        # keep only tokens found in the NLTK English word list; building the
        # set is expensive, so cache it on the class after the first call
        if Preprocessor._english_words is None:
            Preprocessor._english_words = set(nltk.corpus.words.words())
        text = " ".join(w for w in nltk.wordpunct_tokenize(text)
                        if w.lower() in Preprocessor._english_words)
        # tokenize text
        text = nltk.word_tokenize(text)
        # part-of-speech tagging
        pos_tags = pos_tag(text)
        # lemmatization: reduce each noun to its dictionary form;
        # non-noun tokens are dropped here
        lemmatizer = WordNetLemmatizer()
        text = [lemmatizer.lemmatize(tag[0], self.__get_wordnet_pos(tag[1]))
                for tag in pos_tags if tag[1].startswith('N')]
        # remove stopwords
        stopwords_en = stopwords.words('english')
        text = [word for word in text if word not in stopwords_en]
        # remove punctuation
        text = [word.strip(string.punctuation) for word in text]
        # remove words containing digits
        text = [word for word in text if not any(
            letter.isdigit() for letter in word)]
        # remove empty and one-letter words
        text = [word for word in text if len(word) > 1]
        return " ".join(text)

    def clear_doc(self, doc):
        # Iterate through the sentences of a document and return the
        # preprocessed ones.
        clean_doc = []
        for sentence in doc:
            clean_sentence = self.__clear_sentence(sentence)
            # eliminate short or empty strings
            if len(clean_sentence) > 2:
                clean_doc.append(clean_sentence)
        return clean_doc

    def clear_reviews(self, reviews, data_size):
        # Collect up to data_size preprocessed negative sentences, each tagged
        # with its review index and sentence index.
        from extracter_analyzer import get_polarity, isNegative
        clean_doc = []
        for i, review in enumerate(reviews):
            if len(clean_doc) >= data_size:
                break
            sentence_idx = 0
            for sentence in self.__split_into_sentences(review):
                polarity = get_polarity(sentence)
                if isNegative(polarity):
                    clean_sentence = self.__clear_sentence(sentence)
                    # eliminate short or empty strings
                    if len(clean_sentence) > 2:
                        clean_doc.append((clean_sentence, i, sentence_idx))
                        print("Original Sentence: ", sentence)
                        print("Preprocessed Sentence: ", clean_sentence)
                sentence_idx += 1
        return clean_doc
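

# Minimal usage sketch (illustrative, not part of the original module). It
# exercises only the splitter and the cleaner, so database_access and
# extracter_analyzer are not needed. Assumes the NLTK data packages 'punkt',
# 'words', 'stopwords', 'wordnet', and 'averaged_perceptron_tagger' have been
# downloaded, e.g. with nltk.download("punkt").
if __name__ == "__main__":
    preprocessor = Preprocessor()
    reviews = [
        "The battery died after two days. Dr. Smith said it was defective!",
        "Shipping was slow. The screen, however, looks great.",
    ]
    sentences = preprocessor.split_sentences(reviews)
    print(sentences)
    print(preprocessor.clear_doc(sentences))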