We extracted the following 50 code examples from open-source Python projects to illustrate how to use codecs.open().
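Before the project examples, here is a minimal self-contained sketch of the basic read/write pattern with codecs.open(); the file name demo.txt is only a placeholder used for illustration:

import codecs

# Write UTF-8 text; codecs.open encodes the unicode string on write.
with codecs.open('demo.txt', 'w', encoding='utf-8') as f:
    f.write(u'first line\nsecond line\n')

# Read it back; each line is decoded from UTF-8 automatically.
with codecs.open('demo.txt', 'r', encoding='utf-8') as f:
    for line in f:
        print(line.strip())
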
def test_zipfile_timestamp():
    # An environment variable can be used to influence the timestamp on
    # TarInfo objects inside the zip.  See issue #143.  TemporaryDirectory is
    # not a context manager under Python 3.
    with temporary_directory() as tempdir:
        for filename in ('one', 'two', 'three'):
            path = os.path.join(tempdir, filename)
            with codecs.open(path, 'w', encoding='utf-8') as fp:
                fp.write(filename + '\n')
        zip_base_name = os.path.join(tempdir, 'dummy')
        # The earliest date representable in TarInfos, 1980-01-01
        with environ('SOURCE_DATE_EPOCH', '315576060'):
            zip_filename = wheel.archive.make_wheelfile_inner(
                zip_base_name, tempdir)
        with readable_zipfile(zip_filename) as zf:
            for info in zf.infolist():
                assert info.date_time[:3] == (1980, 1, 1)

def read_text(filename, rel_hash):
    id_counter = 0
    nodes = {}
    f = codecs.open(filename, "r", "utf-8")
    # Add some default relations if none have been supplied (at least 1 rst and 1 multinuc)
    if len(rel_hash) < 2:
        rel_hash["elaboration_r"] = "rst"
        rel_hash["joint_m"] = "multinuc"
    rels = collections.OrderedDict(sorted(rel_hash.items()))
    for line in f:
        id_counter += 1
        nodes[str(id_counter)] = NODE(str(id_counter), id_counter, id_counter, "0", 0, "edu",
                                      line.strip(), rels.keys()[0], rels.values()[0])
    return nodes

def shared_locations(self):
    """
    A dictionary of shared locations whose keys are in the set 'prefix',
    'purelib', 'platlib', 'scripts', 'headers', 'data' and 'namespace'.
    The corresponding value is the absolute path of that category for
    this distribution, and takes into account any paths selected by the
    user at installation time (e.g. via command-line arguments). In the
    case of the 'namespace' key, this would be a list of absolute paths
    for the roots of namespace packages in this distribution.

    The first time this property is accessed, the relevant information is
    read from the SHARED file in the .dist-info directory.
    """
    result = {}
    shared_path = os.path.join(self.path, 'SHARED')
    if os.path.isfile(shared_path):
        with codecs.open(shared_path, 'r', encoding='utf-8') as f:
            lines = f.read().splitlines()
        for line in lines:
            key, value = line.split('=', 1)
            if key == 'namespace':
                result.setdefault(key, []).append(value)
            else:
                result[key] = value
    return result

def compute_dt_dist(docs, labels, tags, model, max_len, batch_size, pad_id, idxvocab, output_file):
    # generate batches
    num_batches = int(math.ceil(float(len(docs)) / batch_size))
    dt_dist = []
    t = []
    combined = []
    docid = 0
    for i in xrange(num_batches):
        x, _, _, t, s = get_batch_doc(docs, labels, tags, i, max_len, cf.tag_len, batch_size, pad_id)
        attention, mean_topic = sess.run([model.attention, model.mean_topic], {model.doc: x, model.tag: t})
        dt_dist.extend(attention[:s])
        if debug:
            for si in xrange(s):
                d = x[si]
                print "\n\nDoc", docid, "=", " ".join([idxvocab[item] for item in d if (item != pad_id)])
                sorted_dist = matutils.argsort(attention[si], reverse=True)
                for ti in sorted_dist:
                    print "Topic", ti, "=", attention[si][ti]
                docid += 1
    np.save(open(output_file, "w"), dt_dist)

def gen_sent_on_topic(idxvocab, vocabxid, start_symbol, end_symbol, cf):
    output = codecs.open(args.gen_sent_on_topic, "w", "utf-8")
    topics, entropy = tm.get_topics(sess, topn=topn)
    with tf.variable_scope("model", reuse=True, initializer=initializer):
        mgen = LM(is_training=False, vocab_size=len(idxvocab), batch_size=1, num_steps=1, config=cf, \
            reuse_conv_variables=True)
    for t in range(cf.topic_number):
        output.write("\n" + "="*100 + "\n")
        output.write("Topic " + str(t) + ":\n")
        output.write(" ".join([ idxvocab[item] for item in topics[t] ]) + "\n\n")
        output.write("\nSentence generation (greedy; argmax):" + "\n")
        s = mgen.generate_on_topic(sess, t, vocabxid[start_symbol], 0, cf.lm_sent_len+10, vocabxid[end_symbol])
        output.write("[0] " + " ".join([ idxvocab[item] for item in s ]) + "\n")
        for temp in gen_temps:
            output.write("\nSentence generation (random; temperature = " + str(temp) + "):\n")
            for i in xrange(gen_num):
                s = mgen.generate_on_topic(sess, t, vocabxid[start_symbol], temp, cf.lm_sent_len+10, \
                    vocabxid[end_symbol])
                output.write("[" + str(i) + "] " + " ".join([ idxvocab[item] for item in s ]) + "\n")

def write_predictions(self, inputs):
    '''
    Outputs predictions in a file named <model_name_prefix>.predictions.
    '''
    predictions = numpy.argmax(self.model.predict(inputs), axis=1)
    test_output_file = open("%s.predictions" % self.model_name_prefix, "w")
    for input_indices, prediction in zip(inputs, predictions):
        # The predictions are indices of words in padded sentences. We need to readjust them.
        padding_length = 0
        for index in input_indices:
            if numpy.all(index == 0):
                padding_length += 1
            else:
                break
        prediction = prediction - padding_length + 1  # +1 because the indices start at 1.
        print >>test_output_file, prediction

def process_train_data(self, input_file, onto_aware):
    print >>sys.stderr, "Reading training data"
    label_ind = []
    tagged_sentences = []
    for line in open(input_file):
        lnstrp = line.strip()
        label, tagged_sentence = lnstrp.split("\t")
        if label not in self.label_map:
            self.label_map[label] = len(self.label_map)
        label_ind.append(self.label_map[label])
        tagged_sentences.append(tagged_sentence)
    # Shuffling so that when Keras does validation split, it is not always at the end.
    sentences_and_labels = zip(tagged_sentences, label_ind)
    random.shuffle(sentences_and_labels)
    tagged_sentences, label_ind = zip(*sentences_and_labels)
    print >>sys.stderr, "Indexing training data"
    train_inputs = self.data_processor.prepare_paired_input(tagged_sentences, onto_aware=onto_aware,
                                                            for_test=False, remove_singletons=True)
    train_labels = self.data_processor.make_one_hot(label_ind)
    return train_inputs, train_labels

def process_test_data(self, input_file, onto_aware, is_labeled=True):
    if not self.model:
        raise RuntimeError, "Model not trained yet!"
    print >>sys.stderr, "Reading test data"
    label_ind = []
    tagged_sentences = []
    for line in open(input_file):
        lnstrp = line.strip()
        if is_labeled:
            label, tagged_sentence = lnstrp.split("\t")
            if label not in self.label_map:
                self.label_map[label] = len(self.label_map)
            label_ind.append(self.label_map[label])
        else:
            tagged_sentence = lnstrp
        tagged_sentences.append(tagged_sentence)
    print >>sys.stderr, "Indexing test data"
    # Infer max sentence length if the model is trained
    input_shape = self.model.get_input_shape_at(0)[0]  # take the shape of the first of two inputs at 0.
    sentlenlimit = input_shape[1]  # (num_sentences, num_words, num_senses, num_hyps)
    test_inputs = self.data_processor.prepare_paired_input(tagged_sentences, onto_aware=onto_aware,
                                                           sentlenlimit=sentlenlimit, for_test=True)
    test_labels = self.data_processor.make_one_hot(label_ind)
    return test_inputs, test_labels

def test_zipfile_attributes():
    # With the change from ZipFile.write() to .writestr(), we need to manually
    # set member attributes.
    with temporary_directory() as tempdir:
        files = (('foo', 0o644), ('bar', 0o755))
        for filename, mode in files:
            path = os.path.join(tempdir, filename)
            with codecs.open(path, 'w', encoding='utf-8') as fp:
                fp.write(filename + '\n')
            os.chmod(path, mode)
        zip_base_name = os.path.join(tempdir, 'dummy')
        zip_filename = wheel.archive.make_wheelfile_inner(
            zip_base_name, tempdir)
        with readable_zipfile(zip_filename) as zf:
            for filename, mode in files:
                info = zf.getinfo(os.path.join(tempdir, filename))
                assert info.external_attr == (mode | 0o100000) << 16
                assert info.compress_type == zipfile.ZIP_DEFLATED

def copy_header(path):
    encoding = _get_encoding(path)
    try:
        file = codecs.open(path, "r", encoding)
    except:
        pass
    else:
        for row in file:
            if not row or row[0] != ';':
                break
            row = row.strip(" \n")
            if row == ';; okuri-ari entries.':
                break
            print(row)
        file.close()

def load(self, filename):
    """
    Load dictionary entries from filename into the trie.

    Each line holds three whitespace-separated fields; they are stored as
    key = field 1, value = [(int(field 2), int(field 3)), ...].
    """
    with codecs.open(filename, 'r', 'utf-8') as f:
        for line in f.readlines():
            items = line.strip().split()
            if len(items) == 3:
                self.setdefault(items[0], []).append((int(items[1]), int(items[2])))
    return True

def process_file(self, filename, out_filename=None):
    """
    Process a file line by line; return the combined results, or write
    them to out_filename if it is given.
    """
    results = {'words': [], 'tags': []}
    with codecs.open(filename, 'r', 'utf-8') as input_file:
        for line in input_file:
            print('PROCESS LINE:{}'.format(line))
            result = self.process(line.strip())
            print(self.format_result(result))
            results['words'].extend(result['words'])
            results['tags'].extend(result['tags'])
    if out_filename is None:
        return results
    else:
        with codecs.open(out_filename, 'w', 'utf-8') as output_file:
            output_file.write(self.format_result(results))
            output_file.write('\n')

def get_sqls(self):
    """This function extracts sqls from the java files with mybatis sqls.

    Returns:
        A list of :class:`SQL`. For example:

        [SQL('', u'select a.id, b.name from db.ac a join db.bc b on a.id=b.id or a.id=b.iid where a.cnt > 10')]
    """
    sqls = []
    for root, dirs, files in os.walk(self.dir):
        for file in files:
            if not file.endswith('.java'):
                continue
            with codecs.open(os.path.join(root, file), 'r', encoding=self.encoding) as f:
                sqls.extend(MybatisInlineSqlExtractor.get_selects_from_text(
                    MybatisInlineSqlExtractor.remove_comment(f.read())))
    return sqls

def get_sqls(self):
    """This function extracts sqls from mysql general log file.

    Returns:
        A list of :class:`SQL`. For example:

        [SQL('', u'select a.id, b.name from db.ac a join db.bc b on a.id=b.id or a.id=b.iid where a.cnt > 10')]
    """
    general_log = open(self.log_path)
    log = GeneralQueryLog(general_log)
    session_db_map = {}
    sqls = []
    for entry in log:
        if entry['command'] == 'Connect':
            m = re.search('\s+on\s(?P<name>\w+)', entry['argument'])
            if m:
                session_db_map[entry['session_id']] = m.groupdict()['name'].strip()
        elif entry['command'] == 'Init DB':
            session_db_map[entry['session_id']] = entry['argument'].strip()
        elif entry['command'] == 'Query':
            sql = entry['argument']
            if sql.strip()[:6].lower() == 'select':
                yield SQL(session_db_map.get(entry['session_id'], ''), sql)

def __init__(self, filename, mode='a', encoding=None, delay=0):
    """
    Open the specified file and use it as the stream for logging.
    """
    # keep the absolute path, otherwise derived classes which use this
    # may come a cropper when the current directory changes
    if codecs is None:
        encoding = None
    self.baseFilename = os.path.abspath(filename)
    self.mode = mode
    self.encoding = encoding
    if delay:
        # We don't open the stream, but we still need to call the
        # Handler constructor to set level, formatter, lock etc.
        Handler.__init__(self)
        self.stream = None
    else:
        StreamHandler.__init__(self, self._open())

def test_save_svgz_filename():
    import gzip
    qr = segno.make_qr('test')
    f = tempfile.NamedTemporaryFile('wb', suffix='.svgz', delete=False)
    f.close()
    qr.save(f.name)
    f = open(f.name, mode='rb')
    expected = b'\x1f\x8b\x08'  # gzip magic bytes
    val = f.read(len(expected))
    f.close()
    f = gzip.open(f.name)
    try:
        content = f.read(6)
    finally:
        f.close()
    os.unlink(f.name)
    assert expected == val
    assert b'<?xml ' == content

def save(self):
    statCache = open(self.statCacheFilePath, 'w')
    self.cache['version'] = self.cacheVersion
    self.cache['date'] = self.startDate
    if not self.cache.has_key('players'):
        self.cache['players'] = {}
    self.cache['players'][self.playerName] = {
        'battles': [] if self.fastCache else self.battles,
        'account': self.account,
        'accountTanks': self.accountTanks,
        'session': self.session,
        'impact': self.impact,
        'tanks': self.tanks
    }
    if self.fastCache:
        statCache.write(json.dumps(self.cache))
    else:
        statCache.write(json.dumps(self.cache, sort_keys=True, indent=4, separators=(',', ': ')))
    statCache.close()

def _readTxt(fname):
    '''Returns array of words and word embedding matrix
    '''
    words, vectors = [], []
    hook = codecs.open(fname, 'r', 'utf-8')

    # get summary info about vectors file
    (numWords, dim) = (int(s.strip()) for s in hook.readline().split())

    for line in hook:
        chunks = line.split()
        word, vector = chunks[0].strip(), np.array([float(n) for n in chunks[1:]])
        words.append(word)
        vectors.append(vector)
    hook.close()

    assert len(words) == numWords
    for v in vectors:
        assert len(v) == dim

    return (words, vectors)

def read(analogy_file, setting, strings_only=False):
    multi_b = setting == settings.ALL_INFO
    multi_d = setting in [settings.ALL_INFO, settings.MULTI_ANSWER]

    analogies = {}
    with codecs.open(analogy_file, 'r', 'utf-8') as stream:
        cur_relation, cur_analogies = None, []
        for line in stream:
            # relation separators
            if line[0] == '#':
                if cur_relation:
                    analogies[cur_relation] = cur_analogies
                cur_relation = line[2:].strip()
                cur_analogies = []
            # everything else is an analogy
            else:
                analogy = _parseLine(line, multi_b, multi_d, strings_only)
                cur_analogies.append(analogy)
        analogies[cur_relation] = cur_analogies
    return analogies

def readme(path='README.rst'):
    """Try to read README.rst or return empty string if failed.

    :param str path: Path to README file.

    :return: File contents.
    :rtype: str
    """
    path = os.path.realpath(os.path.join(os.path.dirname(__file__), path))
    handle = None
    url_prefix = 'https://raw.githubusercontent.com/Robpol86/{name}/v{version}/'.format(name=NAME, version=VERSION)
    try:
        handle = codecs.open(path, encoding='utf-8')
        return handle.read(131072).replace('.. image:: docs', '.. image:: {0}docs'.format(url_prefix))
    except IOError:
        return ''
    finally:
        getattr(handle, 'close', lambda: None)()

def _download(args):
    url, folderName, index = args
    session = setupSession()
    try:
        # time out is another parameter tuned
        # fit for the network about 10Mb
        image = session.get(url, timeout=5)
        imageName = str(index)
        with open(os.path.join(folderName, imageName), 'wb') as fout:
            fout.write(image.content)
        fileExtension = imghdr.what(os.path.join(folderName, imageName))
        if fileExtension is None:
            os.remove(os.path.join(folderName, imageName))
        else:
            newName = imageName + '.' + str(fileExtension)
            os.rename(os.path.join(folderName, imageName), os.path.join(folderName, newName))
    except Exception as e:
        print("failed to download one pages with url of " + str(url))

# basic function to get id list

def __init__(self):
    self.file = codecs.open('article.json', 'w', encoding="utf-8")

def __init__(self):
    self.file = open('articleexport.json', 'wb')
    self.exporter = JsonItemExporter(self.file, encoding="utf-8", ensure_ascii=False)
    self.exporter.start_exporting()

def handle(self, *args, **options):
    schema = getattr(settings, 'SWAGGER_SCHEMA', None)
    module = getattr(settings, 'SWAGGER_MODULE', None)
    if not schema:
        raise ImproperlyConfigured('You have to provide SWAGGER_SCHEMA setting pointing to desired schema')
    if not module:
        raise ImproperlyConfigured('You have to specify desired controller module name in SWAGGER_MODULE setting')

    router = SwaggerRouter()
    print('Inspecting available controllers...')
    router.update(True)
    router.process()
    print()
    print('Following classes and methods are going to be generated:')
    enum = router.get_enum()
    for name in enum:
        print("{} : {}".format(name, [x['method'] for x in enum[name]['methods']]))

    if options['generate']:
        template = Template()
        filename = module.split('.')[-1] + '.py'
        structure = [{'name': name, 'data': data} for name, data in six.iteritems(enum)]
        print('Generating handlers ({})...'.format(filename))
        with codecs.open(filename, 'w', 'utf-8') as f:
            f.write(template.render(template_name='view.jinja', names=structure))
        print('Done.')
    else:
        print()
        print('Use --generate option to create them')

def load_constraints(self, constraints_filepath):
    """
    This method reads a collection of constraints from the specified file, and returns a set with
    all constraints for which both of their constituent words are in the specified vocabulary.
    """
    constraints_filepath.strip()
    constraints = set()
    with codecs.open(constraints_filepath, "r", "utf-8") as f:
        for line in f:
            word_pair = line.split()
            if word_pair[0] in self.vocabulary and word_pair[1] in self.vocabulary and word_pair[0] != word_pair[1]:
                constraints |= {(self.vocab_index[word_pair[0]], self.vocab_index[word_pair[1]])}
    return constraints

def load_word_vectors(file_destination):
    """
    This method loads the word vectors from the supplied file destination.
    It loads the dictionary of word vectors and prints its size and the vector dimensionality.
    """
    print "Loading pretrained word vectors from", file_destination
    word_dictionary = {}

    try:
        f = codecs.open(file_destination, 'r', 'utf-8')
        for line in f:
            line = line.split(" ", 1)
            key = unicode(line[0].lower())
            word_dictionary[key] = numpy.fromstring(line[1], dtype="float32", sep=" ")
    except:
        print "Word vectors could not be loaded from:", file_destination
        return {}

    print len(word_dictionary), "vectors loaded from", file_destination
    return word_dictionary

def print_word_vectors(word_vectors, write_path):
    """
    This function prints the collection of word vectors to file, in a plain textual format.
    """
    f_write = codecs.open(write_path, 'w', 'utf-8')

    for key in word_vectors:
        print >>f_write, key, " ".join(map(unicode, numpy.round(word_vectors[key], decimals=6)))

    print "Printed", len(word_vectors), "word vectors to:", write_path

def read_relfile(filename):
    f = codecs.open(filename, "r", "utf-8")
    rels = {}
    for line in f:
        if line.find("\t") > 0:
            rel_data = line.split("\t")
            if rel_data[1].strip() == "rst":
                rels[rel_data[0].strip() + "_r"] = "rst"
            elif rel_data[1].strip() == "multinuc":
                rels[rel_data[0].strip() + "_m"] = "multinuc"
    return rels

def rs3topng(rs3_filepath, png_filepath=None):
    """Convert a RS3 file into a PNG image of the RST tree.

    If no output filename is given, the PNG image is returned
    as a string (which is useful for embedding).
    """
    try:
        from selenium import webdriver
        from selenium.common.exceptions import WebDriverException
    except ImportError:
        raise ImportError(
            'Please install selenium: pip install selenium')

    html_str = rs3tohtml(rs3_filepath)

    temp = tempfile.NamedTemporaryFile(suffix='.html', delete=False)
    temp.write(html_str.encode('utf8'))
    temp.close()

    try:
        driver = webdriver.PhantomJS()
    except WebDriverException as err:
        raise WebDriverException(
            'Please install phantomjs: http://phantomjs.org/\n' + err.msg)
    driver.get(temp.name)
    os.unlink(temp.name)
    png_str = driver.get_screenshot_as_png()

    if png_filepath:
        with open(png_filepath, 'w') as png_file:
            png_file.write(png_str)
    else:
        return png_str

def cli(argv=sys.argv[1:]):
    parser = argparse.ArgumentParser(
        description="Convert an RS3 file into an HTML file containing the RST tree.")
    parser.add_argument('rs3_file')
    parser.add_argument('output_file', nargs='?')
    parser.add_argument(
        '-f', '--output-format', nargs='?', default='html',
        help="output format: html (default), png")
    parser.add_argument(
        '-d', '--debug', action='store_true',
        help="drop into the pudb debugger")

    args = parser.parse_args(argv)

    if args.debug:
        import pudb; pudb.set_trace()

    if args.output_format == 'png':
        if args.output_file:
            rs3topng(args.rs3_file, args.output_file)
            sys.exit(0)
        else:
            sys.stderr.write("No PNG output file given.\n")
            sys.exit(1)

    if args.output_file:
        with codecs.open(args.output_file, 'w', 'utf8') as outfile:
            outfile.write(rs3tohtml(args.rs3_file))
    else:
        sys.stdout.write(rs3tohtml(args.rs3_file).encode('utf8'))

def alias_script(new_Locator):
    with codecs.open(textfile, 'r', encoding='UTF-8') as f:
        for line in f:
            #array.append(line)
            x = line.split(";")
            #Starting
            value = "<alias_def>\n"
            new_Locator.write(value)
            for name in x[1:11]:
                if len(name) > 1:
                    try:
                        name = name.replace("&", "und")
                    except:
                        continue
                    new_Locator.write("<alt>" + name + "</alt> \n")
                    laenge = len(name)
                    z = 4
                    while z <= laenge:
                        try:
                            value1 = "<alt>" + name[0:z] + "</alt>\n"
                            new_Locator.write(value1)
                        except:
                            continue
                        z = z + 1
            value = "</alias_def>\n"
            new_Locator.write(value)

def gen_sent_on_doc(docs, tags, idxvocab, vocabxid, start_symbol, end_symbol, cf):
    topics, _ = tm.get_topics(sess, topn=topn)
    topics = [ " ".join([idxvocab[w] for w in t]) for t in topics ]
    doc_text = [ item.replace("\t", "\n") for item in codecs.open(args.input_doc, "r", "utf-8").readlines() ]
    output = codecs.open(args.gen_sent_on_doc, "w", "utf-8")
    with tf.variable_scope("model", reuse=True, initializer=initializer):
        mgen = LM(is_training=False, vocab_size=len(idxvocab), batch_size=1, num_steps=1, config=cf, \
            reuse_conv_variables=True)
    for d in range(len(docs)):
        output.write("\n" + "="*100 + "\n")
        output.write("Doc " + str(d) + ":\n")
        output.write(doc_text[d])
        doc, _, _, t, _ = get_batch_doc(docs, None, tags, d, cf.doc_len, cf.tag_len, 1, vocabxid[pad_symbol])
        best_topics, best_words = mgen.get_topics_on_doc(sess, doc, t, topn)
        output.write("\nRepresentative topics:\n")
        output.write("\n".join([ ("[%.3f] %s: %s" % (item[1], str(item[0]).zfill(3), topics[item[0]])) \
            for item in best_topics ]) + "\n")
        output.write("\nRepresentative words:\n")
        output.write("\n".join([ ("[%.3f] %s" % (item[1], idxvocab[item[0]])) for item in best_words ]) + "\n")
        output.write("\nSentence generation (greedy; argmax):" + "\n")
        s = mgen.generate_on_doc(sess, doc, t, vocabxid[start_symbol], 0, cf.lm_sent_len+10, vocabxid[end_symbol])
        output.write("[0] " + " ".join([ idxvocab[item] for item in s ]) + "\n")
        for temp in gen_temps:
            output.write("\nSentence generation (random; temperature = " + str(temp) + "):\n")
            for i in xrange(gen_num):
                s = mgen.generate_on_doc(sess, doc, t, vocabxid[start_symbol], temp, cf.lm_sent_len+10, \
                    vocabxid[end_symbol])
                output.write("[" + str(i) + "] " + " ".join([ idxvocab[item] for item in s ]) + "\n")

######
#main#
######

#load the vocabulary

def gen_vocab(dummy_symbols, corpus, stopwords, vocab_minfreq, vocab_maxfreq, verbose):
    idxvocab = []
    vocabxid = defaultdict(int)
    vocab_freq = defaultdict(int)
    for line_id, line in enumerate(codecs.open(corpus, "r", "utf-8")):
        for word in line.strip().split():
            vocab_freq[word] += 1
        if line_id % 1000 == 0 and verbose:
            sys.stdout.write(str(line_id) + " processed\r")
            sys.stdout.flush()

    #add in dummy symbols into vocab
    for s in dummy_symbols:
        update_vocab(s, idxvocab, vocabxid)

    #remove low frequency words
    for w, f in sorted(vocab_freq.items(), key=operator.itemgetter(1), reverse=True):
        if f < vocab_minfreq:
            break
        else:
            update_vocab(w, idxvocab, vocabxid)

    #ignore stopwords, frequent words and symbols for the document input for topic model
    stopwords = set([item.strip().lower() for item in open(stopwords)])
    freqwords = set([item[0] for item in sorted(vocab_freq.items(), key=operator.itemgetter(1), \
        reverse=True)[:int(float(len(vocab_freq))*vocab_maxfreq)]]) #ignore top N% most frequent words for topic model
    alpha_check = re.compile("[a-zA-Z]")
    symbols = set([ w for w in vocabxid.keys() if ((alpha_check.search(w) == None) or w.startswith("'")) ])
    ignore = stopwords | freqwords | symbols | set(dummy_symbols) | set(["n't"])
    ignore = set([vocabxid[w] for w in ignore if w in vocabxid])

    return idxvocab, vocabxid, ignore

def read(fname):
    file_path = os.path.join(os.path.dirname(__file__), fname)
    return codecs.open(file_path, encoding='utf-8').read()

def process_data(self, input_file, onto_aware, for_test=False):
    '''
    Reads an input file and makes input for training or testing.
    '''
    dataset_type = "test" if for_test else "training"
    print >>sys.stderr, "Reading %s data" % dataset_type
    label_ind = []
    tagged_sentences = []
    max_sentence_length = 0
    all_sentence_lengths = []
    for line in open(input_file):
        lnstrp = line.strip()
        label, tagged_sentence = lnstrp.split("\t")
        sentence_length = len(tagged_sentence.split())
        all_sentence_lengths.append(sentence_length)
        if sentence_length > max_sentence_length:
            max_sentence_length = sentence_length
        label_ind.append(int(label))
        tagged_sentences.append(tagged_sentence)
    if for_test:
        if not self.model:
            raise RuntimeError("Model not trained yet!")
        input_shape = self.model.get_input_shape_at(0)  # (num_sentences, num_words, ...)
        sentlenlimit = input_shape[1]
    else:
        sentlenlimit = max_sentence_length
    # We need to readjust the labels because padding would affect the sentence indices.
    for i in range(len(label_ind)):
        length = all_sentence_lengths[i]
        label_ind[i] += sentlenlimit - length
    if not for_test:
        # Shuffling so that when Keras does validation split, it is not always at the end.
        sentences_and_labels = zip(tagged_sentences, label_ind)
        random.shuffle(sentences_and_labels)
        tagged_sentences, label_ind = zip(*sentences_and_labels)
    print >>sys.stderr, "Indexing %s data" % dataset_type
    inputs = self.data_processor.prepare_input(tagged_sentences, onto_aware=onto_aware,
                                               sentlenlimit=sentlenlimit, for_test=for_test,
                                               remove_singletons=False)
    labels = self.data_processor.make_one_hot(label_ind)
    return inputs, labels

def print_attention_values(self, input_file, test_inputs, output_file):
    sent_attention_outputs = self.get_attention(test_inputs)
    tagged_sentences = [x.strip().split("\t")[1] for x in codecs.open(input_file).readlines()]
    outfile = codecs.open(output_file, "w", "utf-8")
    full_json_struct = []
    for sent_attention, tagged_sentence in zip(sent_attention_outputs, tagged_sentences):
        sent_json = {}
        sent_json["input"] = tagged_sentence
        sent_json["tokens"] = []
        tagged_words = tagged_sentence.split()
        for tagged_word, word_attention in zip(tagged_words, sent_attention):
            token_json = {}
            token_json["surface_form"] = tagged_word
            token_json["senses"] = []
            for sense_num, sense_attention in enumerate(word_attention):
                if len(sense_attention) == 0:
                    continue
                sense_json = {}
                sense_json["id"] = sense_num
                sense_json["hypernyms"] = []
                for hyp_name, hyp_att in sense_attention:
                    if isinstance(hyp_att, tuple):
                        # Averaging forward and backward attention
                        sense_json["hypernyms"].append({hyp_name: {"forward": float(hyp_att[0]),
                                                                   "backward": float(hyp_att[1])}})
                    else:
                        sense_json["hypernyms"].append({hyp_name: float(hyp_att)})
                token_json["senses"].append(sense_json)
            sent_json["tokens"].append(token_json)
        full_json_struct.append(sent_json)
    print >>outfile, json.dumps(full_json_struct, indent=2)
    outfile.close()

def load_model(self, epoch=None):
    '''
    Loads a saved model. If epoch id is provided, will load the corresponding model. Or else,
    will load the best model.
    '''
    if not epoch:
        self.model = load_model("%s.model" % self.model_name_prefix,
                                custom_objects=self.custom_objects)
    else:
        self.model = load_model("%s_%d.model" % (self.model_name_prefix, epoch),
                                custom_objects=self.custom_objects)
    self.data_processor = pickle.load(open("%s.dataproc" % self.model_name_prefix, "rb"))
    self.label_map = pickle.load(open("%s.labelmap" % self.model_name_prefix, "rb"))

def print_attention_values(self, input_file, test_inputs, output_file):
    onto_aware = True
    sent1_attention_outputs = self.get_attention(test_inputs[0])
    sent2_attention_outputs = self.get_attention(test_inputs[1])
    tagged_sentences = [x.strip().split("\t")[1] for x in codecs.open(input_file).readlines()]
    outfile = codecs.open(output_file, "w", "utf-8")
    for sent1_attention, sent2_attention, tagged_sentence in zip(sent1_attention_outputs,
                                                                 sent2_attention_outputs,
                                                                 tagged_sentences):
        print >>outfile, tagged_sentence
        print >>outfile, "Sentence 1:"
        for word_attention in sent1_attention:
            for sense_attention in word_attention:
                print >>outfile, " ".join(["%s:%f" % (hyp, hyp_att) for hyp, hyp_att in sense_attention])
            print >>outfile
        print >>outfile, "\nSentence 2:"
        for word_attention in sent2_attention:
            for sense_attention in word_attention:
                print >>outfile, " ".join(["%s:%f" % (hyp, hyp_att) for hyp, hyp_att in sense_attention])
            print >>outfile
    outfile.close()

def fetch_all_transitions(self, language, ngram_length):
    """
    Generate a dict of counts for transitions for all n-grams in the language word list
    """
    wordlist = os.path.join(os.path.dirname(__file__), "wordlists/{0}.txt".format(language))
    if not os.path.exists(wordlist):
        raise SystemError("Language '{0}' does not exist".format(language))
    all_grams = []
    with codecs.open(wordlist, 'r', encoding='utf-8') as f:
        for line in f:
            words = line.strip('\n').lower().split()
            ngrams = reduce(lambda x, y: x + y,
                            map(lambda word: self.find_ngrams(word, ngram_length), words))
            all_grams += ngrams
    return dict(Counter(all_grams))

def _init_posix(vars):
    """Initialize the module as appropriate for POSIX systems."""
    # load the installed Makefile:
    makefile = get_makefile_filename()
    try:
        _parse_makefile(makefile, vars)
    except IOError as e:
        msg = "invalid Python installation: unable to open %s" % makefile
        if hasattr(e, "strerror"):
            msg = msg + " (%s)" % e.strerror
        raise IOError(msg)
    # load the installed pyconfig.h:
    config_h = get_config_h_filename()
    try:
        with open(config_h) as f:
            parse_config_h(f, vars)
    except IOError as e:
        msg = "invalid Python installation: unable to open %s" % config_h
        if hasattr(e, "strerror"):
            msg = msg + " (%s)" % e.strerror
        raise IOError(msg)
    # On AIX, there are wrong paths to the linker scripts in the Makefile
    # -- these paths are relative to the Python source, but when installed
    # the scripts are in another directory.
    if _PYTHON_BUILD:
        vars['LDSHARED'] = vars['BLDSHARED']

def write_exports(self, exports):
    """
    Write a dictionary of exports to a file in .ini format.

    :param exports: A dictionary of exports, mapping an export category to
                    a list of :class:`ExportEntry` instances describing the
                    individual export entries.
    """
    rf = self.get_distinfo_file(EXPORTS_FILENAME)
    with open(rf, 'w') as f:
        write_exports(exports, f)

def write_installed_files(self, paths, prefix, dry_run=False):
    """
    Writes the ``RECORD`` file, using the ``paths`` iterable passed in. Any
    existing ``RECORD`` file is silently overwritten.

    prefix is used to determine when to write absolute paths.
    """
    prefix = os.path.join(prefix, '')
    base = os.path.dirname(self.path)
    base_under_prefix = base.startswith(prefix)
    base = os.path.join(base, '')
    record_path = self.get_distinfo_file('RECORD')
    logger.info('creating %s', record_path)
    if dry_run:
        return None
    with CSVWriter(record_path) as writer:
        for path in paths:
            if os.path.isdir(path) or path.endswith(('.pyc', '.pyo')):
                # do not put size and hash, as in PEP-376
                hash_value = size = ''
            else:
                size = '%d' % os.path.getsize(path)
                with open(path, 'rb') as fp:
                    hash_value = self.get_hash(fp.read())
            if path.startswith(base) or (base_under_prefix and
                                         path.startswith(prefix)):
                path = os.path.relpath(path, base)
            writer.writerow((path, hash_value, size))

        # add the RECORD file itself
        if record_path.startswith(base):
            record_path = os.path.relpath(record_path, base)
        writer.writerow((record_path, '', ''))
    return record_path

def check_installed_files(self):
    """
    Checks that the hashes and sizes of the files in ``RECORD`` are
    matched by the files themselves. Returns a (possibly empty) list of
    mismatches. Each entry in the mismatch list will be a tuple consisting
    of the path, 'exists', 'size' or 'hash' according to what didn't match
    (existence is checked first, then size, then hash), the expected
    value and the actual value.
    """
    mismatches = []
    base = os.path.dirname(self.path)
    record_path = self.get_distinfo_file('RECORD')
    for path, hash_value, size in self.list_installed_files():
        if not os.path.isabs(path):
            path = os.path.join(base, path)
        if path == record_path:
            continue
        if not os.path.exists(path):
            mismatches.append((path, 'exists', True, False))
        elif os.path.isfile(path):
            actual_size = str(os.path.getsize(path))
            if size and actual_size != size:
                mismatches.append((path, 'size', size, actual_size))
            elif hash_value:
                if '=' in hash_value:
                    hasher = hash_value.split('=', 1)[0]
                else:
                    hasher = None
                with open(path, 'rb') as f:
                    actual_hash = self.get_hash(f.read(), hasher)
                if actual_hash != hash_value:
                    mismatches.append((path, 'hash', hash_value, actual_hash))
    return mismatches

def list_installed_files(self):
    """
    Iterates over the ``installed-files.txt`` entries and returns a tuple
    ``(path, hash, size)`` for each line.

    :returns: a list of (path, hash, size)
    """
    def _md5(path):
        f = open(path, 'rb')
        try:
            content = f.read()
        finally:
            f.close()
        return hashlib.md5(content).hexdigest()

    def _size(path):
        return os.stat(path).st_size

    record_path = os.path.join(self.path, 'installed-files.txt')
    result = []
    if os.path.exists(record_path):
        with codecs.open(record_path, 'r', encoding='utf-8') as f:
            for line in f:
                line = line.strip()
                p = os.path.normpath(os.path.join(self.path, line))
                # "./" is present as a marker between installed files
                # and installation metadata files
                if not os.path.exists(p):
                    logger.warning('Non-existent file: %s', p)
                    if p.endswith(('.pyc', '.pyo')):
                        continue
                    #otherwise fall through and fail
                if not os.path.isdir(p):
                    result.append((p, _md5(p), _size(p)))
        result.append((record_path, None, None))
    return result

def list_distinfo_files(self, absolute=False):
    """
    Iterates over the ``installed-files.txt`` entries and returns paths for
    each line if the path is pointing to a file located in the
    ``.egg-info`` directory or one of its subdirectories.

    :parameter absolute: If *absolute* is ``True``, each returned path is
                         transformed into a local absolute path. Otherwise the
                         raw value from ``installed-files.txt`` is returned.
    :type absolute: boolean
    :returns: iterator of paths
    """
    record_path = os.path.join(self.path, 'installed-files.txt')
    skip = True
    with codecs.open(record_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if line == './':
                skip = False
                continue
            if not skip:
                p = os.path.normpath(os.path.join(self.path, line))
                if p.startswith(self.path):
                    if absolute:
                        yield p
                    else:
                        yield line

def read(self, filepath):
    """Read the metadata values from a file path."""
    fp = codecs.open(filepath, 'r', encoding='utf-8')
    try:
        self.read_file(fp)
    finally:
        fp.close()

def write(self, filepath, skip_unknown=False):
    """Write the metadata fields to filepath."""
    fp = codecs.open(filepath, 'w', encoding='utf-8')
    try:
        self.write_file(fp, skip_unknown)
    finally:
        fp.close()