The following 39 code examples, extracted from open source Python projects, illustrate how to use nltk.tokenize.TweetTokenizer().
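Before the project code, here is a minimal usage sketch of the class itself. The constructor flags and the tokenize() call are NLTK's documented interface; the sample tweet is the one used in NLTK's own doctest.

from nltk.tokenize import TweetTokenizer

# strip_handles drops @mentions; reduce_len caps runs of repeated characters at three
tknzr = TweetTokenizer(strip_handles=True, reduce_len=True)
print(tknzr.tokenize("@remy: This is waaaaayyyy too much for you!!!!!!"))
# [':', 'This', 'is', 'waaayyy', 'too', 'much', 'for', 'you', '!', '!', '!']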
def __init__(self, dictionary_file):
    with open(dictionary_file, 'r') as f:
        self.word2i = json.load(f)['word2i']

    self.wpt = TweetTokenizer(preserve_case=False)

    if "<stop_dialogue>" not in self.word2i:
        self.word2i["<stop_dialogue>"] = len(self.word2i)

    self.i2word = {}
    for (k, v) in self.word2i.items():
        self.i2word[v] = k

    # Retrieve key values
    self.no_words = len(self.word2i)
    self.start_token = self.word2i["<start>"]
    self.stop_token = self.word2i["?"]
    self.stop_dialogue = self.word2i["<stop_dialogue>"]
    self.padding_token = self.word2i["<padding>"]
    self.yes_token = self.word2i["<yes>"]
    self.no_token = self.word2i["<no>"]
    self.non_applicable_token = self.word2i["<n/a>"]

    self.answers = [self.yes_token, self.no_token, self.non_applicable_token]
def __init__(self, root, fileids=None,
             word_tokenizer=TweetTokenizer(),
             encoding='utf8'):
    """
    :param root: The root directory for this corpus.
    :param fileids: A list or regexp specifying the fileids in this corpus.
    :param word_tokenizer: Tokenizer for breaking the text of Tweets into
        smaller units, including but not limited to words.
    """
    CorpusReader.__init__(self, root, fileids, encoding)

    for path in self.abspaths(self._fileids):
        if isinstance(path, ZipFilePathPointer):
            pass
        elif os.path.getsize(path) == 0:
            raise ValueError("File {} is empty".format(path))
    """Check that all user-created corpus files are non-empty."""

    self._word_tokenizer = word_tokenizer
def tokenize_texts(texts, words):
    results = []
    for text in texts:
        t = text.lower().strip()
        t = t.replace('\n', ' ').replace('\t', ' ')
        t = t.replace("'s", " 's ")
        t = t.replace("'ll", " 'll ")
        t = t.replace('-', ' - ')
        t = t.replace('.', ' . ')
        res = TweetTokenizer(preserve_case=False, reduce_len=True).tokenize(t)
        ids = []
        for w in res:
            w_id = words.get(w)
            if w_id is None:
                # log.warning("Unknown word found: %s", w)
                w_id = 0
            ids.append(w_id)
        results.append(ids)
    return results
def read_data(file=file_path):
    col_names = ['System-Id', 'Message', 'drug-offset-start', 'drug-offset-end',
                 'sideEffect-offset-start', 'sideEffect-offset-end', 'WM1', 'WM2', 'relType']
    data_frame = pd.read_csv(file, skipinitialspace=True, usecols=col_names)
    mssg_frame = data_frame['Message'].drop_duplicates()
    tokenizer = TweetTokenizer()
    string = []
    for mssg in mssg_frame:
        tokens = tokenizer.tokenize(mssg)
        for token in tokens:
            if is_word(token):
                string.append(token.lower())
    if not os.path.isfile("words.txt"):
        with open("words.txt", "w") as text_file:
            print(string, file=text_file)
    return data_frame

# TODO use space splitter and then strip the word
# TODO change regex to [a-z0-9].+
def preprocess(tweet):
    preprocessed = copy.copy(tweet)
    preprocessed = preprocessed.lower()
    # remove some emoticons the TweetTokenizer does not know
    preprocessed = remove_emoticons(preprocessed)
    # split contractions like "he's" -> "he s",
    # by using imported contractions dictionary
    preprocessed = split_contractions(preprocessed)
    # split compounds like "next-level" -> "next level"
    preprocessed = split_compounds(preprocessed)
    # remove links
    preprocessed = remove_links(preprocessed)
    # remove all special characters and return tokenized text
    preprocessed = remove_special_characters(preprocessed)
    preprocessed = remove_empty_sentences(preprocessed)
    return preprocessed
def rank_by_inverted_words(raw_query, filehashes=None):
    from nltk.tokenize import TweetTokenizer
    tokenizer = TweetTokenizer()
    keywords = tokenizer.tokenize(raw_query)
    kv_paperwords = lambda filehash: KeyValueStore('paperwords:' + filehash)
    if not filehashes:
        # retrieve all from db. complexity warning.
        scopes = KeyValueStore.scopes('paper:*')
        filehashes = [scope[len('paper:'):] for scope in scopes]
    score_by_filehash = {}
    for filehash in filehashes:
        word_dict = kv_paperwords(filehash)
        score = 0.
        for word in keywords:
            score += word_dict.get(word, default=0.)
        score_by_filehash[filehash] = score
    print score_by_filehash
    return sorted(score_by_filehash, key=lambda k: score_by_filehash[k], reverse=True)
def predict(input_string):
    mask = lambda w, v: 1 if w not in v else v[w]
    tknzr = TweetTokenizer(reduce_len=True, preserve_case=False)
    words = tknzr.tokenize(input_string)
    vec = [[mask(w, pd.vocab) for w in words]]
    vec = np.array(vec, dtype="int32")
    vec = pad_sequences(vec, maxlen=pd.max_sequence)
    predictions = model.predict(vec)
    sarcasm = round(predictions[0][1], 2) * 100
    return (words, sarcasm)

##################################################################
def twitter_tokenizer(x):
    return TweetTokenizer(strip_handles=True).tokenize(x)
def get_tweet_tags(tweet):
    """ Break up a tweet into individual word parts """
    tknzr = TweetTokenizer()
    tokens = tknzr.tokenize(tweet)
    # replace handles with real names
    for n, tok in enumerate(tokens):
        if tok.startswith('@'):
            handle = tok.strip("@")
            if handle in user.students:
                # If we have a database entry for the mentioned user, we can
                # easily substitute a full name.
                usr = user.NPUser(handle)
                tokens[n] = usr.fullname
            else:
                # If there is no database entry, we use the user's alias. While
                # this is the full name in many cases, it is often not reliable
                usr = api.get_user(handle)
                tokens[n] = usr.name
    tagged = nltk.pos_tag(tokens)
    # In nltk, if a teacher's name is written with a period after an
    # abbreviated prefix, it is awkwardly broken up into 3 tags
    for n, tag in enumerate(tagged):
        # If there is the weird period after the prefix,
        if tag[1] == '.':
            # and it is in fact splitting up a person's name,
            if tagged[n - 1][1] == 'NNP' and tagged[n + 1][1] == 'NNP':
                if tagged[n - 1][0] in ['Mr', 'Ms', 'Mrs', 'Mx']:
                    # combine it into the actual name,
                    tagged[n - 1] = ('{}. {}'.format(tagged[n - 1][0],
                                                     tagged[n + 1][0]), 'NNP')
                    # and then remove the extra tags.
                    del tagged[n + 1]
                    del tagged[n]
    return tagged
def preprocess_tweets(docs, stopwords, min_df=3, min_term_length=2,
                      ngram_range=(1, 1), apply_tfidf=True, apply_norm=True):
    """
    Preprocess a list containing text documents stored as strings, where the documents
    have already been tokenized and are separated by whitespace
    """
    from nltk.tokenize import TweetTokenizer
    tweet_tokenizer = TweetTokenizer(preserve_case=False, strip_handles=True, reduce_len=True)

    def custom_tokenizer(s):
        # need to manually replace quotes
        s = s.replace("'", " ").replace('"', ' ')
        tokens = []
        for x in tweet_tokenizer.tokenize(s):
            if len(x) >= min_term_length:
                if x[0] == "#" or x[0].isalpha():
                    tokens.append(x)
        return tokens

    # Build the Vector Space Model, apply TF-IDF and normalize lines to unit length all in one call
    if apply_norm:
        norm_function = "l2"
    else:
        norm_function = None
    tfidf = TfidfVectorizer(stop_words=stopwords, lowercase=True, strip_accents="unicode",
                            tokenizer=custom_tokenizer, use_idf=apply_tfidf, norm=norm_function,
                            min_df=min_df, ngram_range=ngram_range)
    X = tfidf.fit_transform(docs)
    terms = []
    # store the vocabulary map
    v = tfidf.vocabulary_
    for i in range(len(v)):
        terms.append("")
    for term in v.keys():
        terms[v[term]] = term
    return (X, terms)

# --------------------------------------------------------------
def test_tweet_tokenizer(self):
    """
    Test TweetTokenizer using words with special and accented characters.
    """
    tokenizer = TweetTokenizer(strip_handles=True, reduce_len=True)
    s9 = "@myke: Let's test these words: resumé España München français"
    tokens = tokenizer.tokenize(s9)
    expected = [':', "Let's", 'test', 'these', 'words', ':', 'resumé',
                'España', 'München', 'français']
    self.assertEqual(tokens, expected)
def tweet_tokenize(self, tweet):
    # http://www.nltk.org/api/nltk.tokenize.html
    tknzr = TweetTokenizer()
    tokens = tknzr.tokenize(tweet)
    return tokens
def tokenize(tweet):
    tknzr = TweetTokenizer(strip_handles=True, reduce_len=True, preserve_case=False)
    return tknzr.tokenize(tweet)

# Read cleaned training tweets file into pandas and randomize it
def __init__(self):
    self.tokenizers = {
        'en': TweetTokenizer(),
        'de': WordPunctTokenizer(),
        'it': WordPunctTokenizer(),
        'fr': WordPunctTokenizer(),
        'default': WordPunctTokenizer()
    }
    self.tokenizer = TweetTokenizer()
def load_tweetkeywords():
    """
    Check and see which keywords are used in each tweet, and load the
    association table linking tweets and keywords
    """
    # TweetKeyword.query.delete()
    tweets = Tweet.query.all()
    keyword_query = Keyword.query.all()
    keywords = []
    [keywords.append(word.keyword) for word in keyword_query]
    tknzr = TweetTokenizer()
    for tweet in tweets:
        tokenized_tweets = tknzr.tokenize(tweet.text)
        for token in tokenized_tweets:
            if token in keywords:
                tweet_id = Tweet.query.filter(Tweet.tweet_id == tweet.tweet_id).one()
                keyword_id = Keyword.query.filter(Keyword.keyword == token).one()
                tweet_keyword = TweetKeyword(keyword_id=keyword_id.keyword_id, tweet_id=tweet_id.tweet_id)
                print "Added to TweetKeyword table: {}".format(tweet_keyword.keyword_id)
                db.session.add(tweet_keyword)
    db.session.commit()

################################################################################
def load_data_and_labels_sam():
    # load
    with open("./input/2780_freshmen_tweets.csv", 'rU') as f:
        rdr = csv.reader(f)
        dataset = list(rdr)[1:]  # remove header

    # filter out tweets with unknown sentiment
    dataset = [entry for entry in dataset if entry[4] != '0']

    # generate x
    tk = TweetTokenizer(reduce_len=True)
    x_text = [entry[3] for entry in dataset]
    x_text = [clean_str(tweet) for tweet in x_text]
    x_text = [tk.tokenize(tweet) for tweet in x_text]

    # generate y
    y = [entry[4] for entry in dataset]
    for idx, label in enumerate(y):
        if label == '1':  # positive
            y[idx] = [1, 0, 0]
        elif label == '2':  # neutral
            y[idx] = [0, 1, 0]
        elif label == '3':  # negative
            y[idx] = [0, 0, 1]
        else:
            print 'wrong label in sam: ' + label

    return [x_text, y]
def load_data_and_labels_gameforum():
    # load
    with open("./input/gameforum-1000.csv", 'rU') as f:
        rdr = csv.reader(f)
        dataset = list(rdr)[1:]  # remove header

    dataset = [entry for entry in dataset
               if (entry[1] == '1' or entry[1] == '2' or entry[1] == '3')]

    # generate x
    tk = TweetTokenizer(reduce_len=True)
    x_text = [entry[0] for entry in dataset]
    x_text = [clean_str(post) for post in x_text]
    x_text = [tk.tokenize(post) for post in x_text]

    # generate y
    y = [entry[1] for entry in dataset]
    for idx, label in enumerate(y):
        if label == '1':  # positive
            y[idx] = [1, 0, 0]
        elif label == '2':  # neutral
            y[idx] = [0, 1, 0]
        elif label == '3':  # negative
            y[idx] = [0, 0, 1]
        else:
            print 'wrong label in gameforum: ' + label

    return [x_text, y]
def __init__(self, input_text, state_size=2, chain=None):
    self.tokenizer = TweetTokenizer(reduce_len=True)
    self.tag_sep = "@::@"
    # Circumvent some limitations of markovify by allowing one to create a
    # POSifiedText from a markovify.Text instance
    if isinstance(input_text, markovify.Text):
        m = input_text
        self.input_text = m.input_text
        self.rejoined_text = m.rejoined_text
        self.chain = m.chain
    else:
        super().__init__(input_text, state_size, chain)
def load_model(config, model):
    """
    Load a complete model and censor with path to model
    :param config:
    :param model:
    :return:
    """
    # Load model
    model = Classifier.load(model)
    censor = CensorModel(config)

    # Tokenizer
    tokenizer = TweetTokenizer()

    # Join features
    bow = features.BagOfGrams()

    # Bag of gram, 2-grams, 3-grams
    bow.add(features.BagOfWords())
    bow.add(features.BagOf2Grams())
    bow.add(features.BagOf3Grams())

    return tokenizer, bow, model, censor
# end load_model

# end Classifier
def tokenize(tweets, sentiment):
    # NLTK has a tokenizer built out specifically for short messaging data
    # here we will use some of it's features to:
    # turn all words to lowercase,
    # reduce the length of repeated characters ('hiiiiiiiii' and 'hiiiii' both become 'hiii' with three repeats of the 'i'),
    # and get rid of any handles that might exist in the message
    tokenizer = TweetTokenizer(preserve_case=False, reduce_len=True, strip_handles=True)

    tokenizedTweets = []
    cleanedSentiment = []

    asciiIssues = 0
    for rowIdx, tweet in enumerate(tweets):
        try:
            tokenizedWords = tokenizer.tokenize(tweet)
            tokenizedTweets.append(tokenizedWords)
            cleanedSentiment.append(sentiment[rowIdx])
        except:
            # there are some weird ascii encoding issues present in a small part of our dataset.
            # they represent < 1% of our dataset
            # for MVP, i'm going to ignore them to focus on the 99% use case
            # these issues do not exist in the test data set, so it is safe to ignore these rows
            asciiIssues += 1
    return tokenizedTweets, cleanedSentiment

# some algorithms do not train well on ordered data. This function shuffles our data so we don't
# have one big block of positive documents followed by another large block of negative documents
def tokenize(text, tokenizer=TweetTokenizer()):
    return [tokenizer.tokenize(sentence) for sentence in sent_tokenize(text)]
def remove_emoticons(text):
    # build regexp with imported emoticon list
    smileys = '|'.join(map(re.escape, emoticons))
    emoticonsPattern = re.compile('({})'.format(smileys), flags=re.IGNORECASE)
    removed = re.sub(emoticonsPattern, '', text)
    # remove unnecessary white spaces utilizing the TweetTokenizer
    removed = tokenize(removed)
    return " ".join(sum(removed, []))
def _get_user_tweets(self, screen_name):
    # TODO: Implement tweet limit
    # Twitter only allows access to a users most recent 3240 tweets with this method

    # initialize a list to hold all the tweepy Tweets
    alltweets = []

    # make initial request for most recent tweets (200 is the maximum allowed count)
    new_tweets = self._api.user_timeline(screen_name=screen_name, count=200)

    # save most recent tweets
    alltweets.extend(new_tweets)

    # save the id of the oldest tweet less one
    oldest = alltweets[-1].id - 1

    # keep grabbing tweets until there are no tweets left to grab
    while len(new_tweets) > 0:
        # all subsequent requests use the max_id param to prevent duplicates
        new_tweets = self._api.user_timeline(screen_name=screen_name, count=200, max_id=oldest)

        # save most recent tweets
        alltweets.extend(new_tweets)

        # update the id of the oldest tweet less one
        oldest = alltweets[-1].id - 1

    # transform the tweepy tweets into a 2D array that will populate the csv
    outtweets = {tweet.id_str: {'created': tweet.created_at, 'text': tweet.text} for tweet in alltweets}

    # Twitter-aware tokenizer
    tknzr = TweetTokenizer()

    # Extend data with linguistic processing
    for tweet_id in outtweets:
        # Get tweet data from dictionary
        tweet = outtweets[tweet_id]

        # Lowercase tokenized tweet text
        tweet_tokens = tknzr.tokenize(tweet['text'])

        # Parts-of-speech tags for tokenized text
        tweet_pos = nltk.pos_tag(tweet_tokens)

        # Is the tweet a rewteet?
        tweet['retweet'] = tweet_pos[0][0] == 'RT'

        # If retweeted, who was the original author?
        if tweet['retweet'] is True:
            tweet['rt_author'] = tweet_pos[1][0]
        else:
            tweet['rt_author'] = ''

    return outtweets

# TODO: Might have encoding issues. See: https://stackoverflow.com/questions/6539881/python-converting-from-iso-8859-1-latin1-to-utf-8
def k_tokenizer(text):
    text = text.encode('ascii', errors='ignore').replace('-', '')
    """ We should use a better way to remove non-english words """

    tokenizer = TweetTokenizer(preserve_case=False)
    tokens = tokenizer.tokenize(text)

    # stopset = set(stopwords.words('english'))
    # tokens = [word for word in tokens if not word in stopset]

    """ Synonyms using wordnet """
    mwe_tokenizer = MWETokenizer([('ios', '9'),])
    mwe_tokens = mwe_tokenizer.tokenize(tokens)

    """ We might want to tokenize by sentence and then tag each sentence and aggregate the results """

    """ train -> train_NN train_V"""
    tagged = nltk.pos_tag(mwe_tokens)

    def get_wordnet_pos(treebank_tag):
        if treebank_tag.startswith('J'):
            return wordnet.ADJ
        elif treebank_tag.startswith('V'):
            return wordnet.VERB
        elif treebank_tag.startswith('N'):
            return wordnet.NOUN
        elif treebank_tag.startswith('R'):
            return wordnet.ADV
        else:
            return wordnet.NOUN  # we preserve the original form of any unknown word

    wordnet_lemmatizer = WordNetLemmatizer()
    final_doc = []
    for token, tag in tagged:
        word = tag + '_' + wordnet_lemmatizer.lemmatize(token, get_wordnet_pos(tag))
        final_doc.append(word)

    # porter = PorterStemmer()
    # final_doc = []
    # for token in mwe_tokens:
    #     final_doc.append(porter.stem(token))

    return final_doc
def parsedata(lines, word_list, split_word_list, emoji_dict, normalize_text=False,
              split_hashtag=False, ignore_profiles=False, lowercase=False, replace_emoji=True):
    data = []
    for i, line in enumerate(lines):
        if (i % 100 == 0):
            print(str(i) + '...', end='', flush=True)

        try:
            # convert the line to lowercase
            if (lowercase):
                line = line.lower()

            # split into token
            token = line.split('\t')

            # label
            label = int(token[1].strip())

            # tweet text
            target_text = TweetTokenizer().tokenize(token[2].strip())

            # filter text
            target_text = filter_text(target_text, word_list, split_word_list, emoji_dict,
                                      normalize_text, split_hashtag, ignore_profiles,
                                      replace_emoji=replace_emoji)

            # awc dimensions
            dimensions = []
            if (len(token) > 3 and token[3].strip() != 'NA'):
                dimensions = [dimension.split('@@')[1] for dimension in token[3].strip().split('|')]

            # context tweet
            context = []
            if (len(token) > 4):
                if (token[4] != 'NA'):
                    context = TweetTokenizer().tokenize(token[4].strip())
                    context = filter_text(context, word_list, normalize_text, split_hashtag, ignore_profiles)

            # author
            author = 'NA'
            if (len(token) > 5):
                author = token[5]

            if (len(target_text) != 0):
                # print((label, target_text, dimensions, context, author))
                data.append((label, target_text, dimensions, context, author))
        except:
            raise
    print('')
    return data
def load_data_and_labels_semeval():
    # load the entire semeval dataset
    old_dataset = list(open("./input/2013-dev"))
    old_dataset.extend(list(open("./input/2013-devtest")))
    old_dataset.extend(list(open("./input/2013-train")))
    old_dataset.extend(list(open("./input/2014-devtest")))

    new_dataset = list(open("./input/2016-train"))
    new_dataset.extend(list(open("./input/2016-dev")))
    new_dataset.extend(list(open("./input/2016-devtest")))

    # filter out invalid tweets from new dataset
    new_dataset = [entry for entry in new_dataset if entry.split('\t')[2] != 'Not Available\n']

    # generate x from old
    tk = TweetTokenizer(reduce_len=True)  # handles punctuations
    x_text = [entry.split('\t')[3] for entry in old_dataset]
    x_text = [clean_str(tweet) for tweet in x_text]
    x_text = [tk.tokenize(tweet) for tweet in x_text]

    # generate x from new
    x_text_new = [entry.split('\t')[2] for entry in new_dataset]
    x_text_new = [clean_str(tweet) for tweet in x_text_new]
    x_text_new = [tk.tokenize(tweet) for tweet in x_text_new]

    # concat x and x_new
    x_text.extend(x_text_new)

    # generate y from old
    y = [entry.split('\t')[2] for entry in old_dataset]
    for idx, label in enumerate(y):
        if label == 'positive':
            y[idx] = [1, 0, 0]
        elif label == 'neutral':
            y[idx] = [0, 1, 0]
        elif label == 'negative':
            y[idx] = [0, 0, 1]
        else:
            print 'wrong label in semeval: ' + label

    # generate y from new
    y_new = [entry.split('\t')[1] for entry in new_dataset]
    for idx, label in enumerate(y_new):
        if label == 'positive':
            y_new[idx] = [1, 0, 0]
        elif label == 'neutral':
            y_new[idx] = [0, 1, 0]
        elif label == 'negative':
            y_new[idx] = [0, 0, 1]
        else:
            print 'wrong label in semeval: ' + label

    # concat y and y_new
    y.extend(y_new)

    return [x_text, y]
def store_file(f_in, f_out, alphabet_words, alphabet_hashtags, dummy_word_idx, hashtag_fname=None):
    # stores the tweets in batches so it fits in memory
    tknzr = TweetTokenizer(reduce_len=True)
    counter = 0
    batch_counter = 0
    output = open(f_out, 'wb')
    output_hashtag = open(hashtag_fname, 'wb')
    batch_size = 500000
    tweet_batch = []
    hashtag_batch = []
    with gzip.open(f_in, 'r') as f:
        for tweet in f:
            tweet, hashtags = preprocess_tweet(tweet)
            if len(hashtags) == 1:
                ht = hashtags[0]
                alphabet_hashtags.add(ht)
                ht_idx = alphabet_hashtags.get(ht, UNKNOWN_HASHTAG_IDX)

                tweet = tweet.replace(ht, '')
                tweet_tok = tknzr.tokenize(tweet.decode('utf-8'))

                tweet_batch.append(tweet_tok)
                hashtag_batch.append(ht_idx)

                batch_counter += 1
                for token in tweet_tok:
                    alphabet_words.add(token)

                if batch_counter >= batch_size:
                    tweet_idx = convert2indices(tweet_batch, alphabet_words, dummy_word_idx)
                    np.save(output, tweet_idx)
                    np.save(output_hashtag, hashtag_batch)
                    print 'Saved tweets:', tweet_idx.shape
                    tweet_batch = []
                    hashtag_batch = []
                    batch_counter = 0

                counter += 1
                if (counter % 1000000) == 0:
                    print "Elements processed:", counter

    tweet_idx = convert2indices(tweet_batch, alphabet_words, dummy_word_idx)
    np.save(output, tweet_idx)
    np.save(output_hashtag, hashtag_batch)
    print len(alphabet_hashtags)
    print len(alphabet_words)
    print 'Saved tweets:', tweet_idx.shape
    return counter
def inverse_indexing_once():
    kv_paperwords = lambda filehash: KeyValueStore('paperwords:' + filehash)
    scopes = KeyValueStore.scopes('paper:*')

    from nltk.tokenize import TweetTokenizer
    tokenizer = TweetTokenizer()

    def make_dict(text, weight=1., prefix_weight=0.):
        if not text:
            return {}
        words = tokenizer.tokenize(text.lower().strip())
        result = {}
        for word in words:
            for i in range(1, len(word)):
                prefix = word[:i]
                if prefix not in result:
                    result[prefix] = 0.
                result[prefix] += prefix_weight
            if word not in result:
                result[word] = 0.
            result[word] += weight
        return result

    def merge_dict(dict1, dict2):
        new_dict = {}
        for word in set(dict1.keys()).union(dict2.keys()):
            weight1 = dict1.get(word, 0.)
            weight2 = dict2.get(word, 0.)
            new_dict[word] = weight1 + weight2
        return new_dict

    for scope in scopes:
        filehash = scope[len('paper:'):]
        meta = KeyValueStore(scope_name=scope)
        title = meta['title']
        abstract = meta.get('abstract', default='')
        dict_title = make_dict(title, weight=6., prefix_weight=0.06)
        dict_abstract = make_dict(abstract, weight=2., prefix_weight=0.02)
        final_dict = merge_dict(dict_title, dict_abstract)
        authors = meta['authors']
        if authors:
            for author in authors:
                dict_author = make_dict(author['first_name'] + ' ' + author['last_name'])
                final_dict = merge_dict(dict_author, final_dict)
        kv_paperwords(filehash).update(final_dict)