我们从 Python 开源项目中提取了以下 48 个代码示例,用于说明如何使用 io.open()。
def add():
    """Flask view: add an address to, or remove it from, blastlist.txt.

    POST with submit == 'Add' appends the trimmed address; with
    submit == 'Remove' rewrites the file without any line containing it.
    """
    if request.method == 'POST':
        addr = request.form['addr'].strip()
        if request.form['submit'] == 'Add':
            # io.open with an explicit encoding accepts unicode text, so no
            # extra decode step is needed (str.decode breaks on Python 3).
            with io.open('blastlist.txt', 'a', encoding="utf-8") as f:
                f.write(addr + u'\r\n')
            return render_template('listadded.html', addr=addr)
        elif request.form['submit'] == 'Remove':
            with io.open('blastlist.txt', 'r', encoding="utf-8") as f:
                lines = f.readlines()
            # Rewrite the file, dropping every line that mentions the address.
            with io.open('blastlist.txt', 'w', encoding="utf-8") as f:
                for line in lines:
                    if addr not in line:
                        f.write(line)
            return render_template('listremoved.html', addr=addr)
def __iter__(self):
    """
    Read a file (or every file in a directory) where each line is of the
    form "word1 word2 ...".

    Yields lists of the form [begin, word1, word2, ..., end], lowercased
    and with ASCII punctuation removed.
    """
    if os.path.isdir(self.fname):
        filenames = [os.path.join(self.fname, f) for f in os.listdir(self.fname)]
    else:
        filenames = [self.fname]
    for filename in filenames:
        # Read as UTF-8 explicitly (the previously commented-out intent)
        # instead of relying on the locale default encoding.
        with io.open(filename, encoding='utf-8') as f:
            doc = f.read()
        for line in doc.split("\n"):
            # Lowercase and drop punctuation before tokenizing on whitespace.
            sent = "".join(ch for ch in line.lower() if ch not in string.punctuation).strip().split()
            yield [self.begin] + sent + [self.end]
def __iter__(self):
    """
    Read a file (or every file in a directory) where each line is of the
    form "word1 word2 ...".

    Yields lists of the form [begin, word1, word2, ..., end].
    """
    # Bug fix: `filenames` was undefined for a plain file (the else branch
    # was commented out) and the loop bound `langpath` while opening an
    # undefined `filename` -- both NameErrors at runtime.
    if os.path.isdir(self.fname):
        filenames = [os.path.join(self.fname, f) for f in os.listdir(self.fname)]
    else:
        filenames = [self.fname]
    for filename in filenames:
        with open(filename) as f:
            doc = f.read()
        for line in doc.split("\n"):
            # Lowercase, strip punctuation, then split on whitespace.
            sent = "".join(ch for ch in line.lower() if ch not in string.punctuation).strip().split()
            yield [self.begin] + sent + [self.end]
def run_script(self, script_name, namespace):
    """Execute the named metadata script inside *namespace*.

    Raises ResolutionError if no such script exists in the metadata.
    """
    script = 'scripts/' + script_name
    if not self.has_metadata(script):
        raise ResolutionError("No script named %r" % script_name)
    # Normalize all line endings to '\n' before compiling.
    script_text = self.get_metadata(script).replace('\r\n', '\n')
    script_text = script_text.replace('\r', '\n')
    script_filename = self._fn(self.egg_info, script)
    namespace['__file__'] = script_filename
    if os.path.exists(script_filename):
        # Bug fix: the file handle was opened without being closed;
        # use a context manager.
        with open(script_filename) as fp:
            source = fp.read()
        code = compile(source, script_filename, 'exec')
        exec(code, namespace, namespace)
    else:
        # No real file on disk: register the text with linecache so
        # tracebacks can still show source lines.
        from linecache import cache
        cache[script_filename] = (
            len(script_text), 0, script_text.split('\n'), script_filename
        )
        script_code = compile(script_text, script_filename, 'exec')
        exec(script_code, namespace, namespace)
def read_manifest(self):
    """Read the manifest file (named by 'self.manifest')
    and use it to fill in 'self.filelist', the list of files to
    include in the source distribution.
    """
    log.info("reading manifest file '%s'", self.manifest)
    # Bug fix: use a context manager so the file is closed even when a
    # decode error or append raises partway through.
    with open(self.manifest, 'rb') as manifest:
        for line in manifest:
            # The manifest must contain UTF-8. See #303.
            if six.PY3:
                try:
                    line = line.decode('UTF-8')
                except UnicodeDecodeError:
                    log.warn("%r not UTF-8 decodable -- skipping" % line)
                    continue
            # ignore comments and blank lines
            line = line.strip()
            if line.startswith('#') or not line:
                continue
            self.filelist.append(line)
def uninstall_link(self):
    """Remove the project's .egg-link file and its .pth entry.

    Aborts (with a warning) if the link points somewhere unexpected.
    """
    if os.path.exists(self.egg_link):
        log.info("Removing %s (link to %s)", self.egg_link, self.egg_base)
        # Bug fix: close the link file deterministically via `with`.
        with open(self.egg_link) as egg_link_file:
            contents = [line.rstrip() for line in egg_link_file]
        if contents not in ([self.egg_path], [self.egg_path, self.setup_path]):
            log.warn("Link points to %s: uninstall aborted", contents)
            return
        if not self.dry_run:
            os.unlink(self.egg_link)
    if not self.dry_run:
        self.update_pth(self.dist)  # remove any .pth link to us
    if self.distribution.scripts:
        # XXX should also check for entry point scripts!
        log.warn("Note: you must uninstall or replace scripts manually!")
def install_egg_scripts(self, dist):
    """Install scripts for *dist*, using develop-mode wrappers when
    *dist* is the project currently being developed."""
    if dist is not self.dist:
        # A dependency, not the develop target: defer to the stock logic.
        return easy_install.install_egg_scripts(self, dist)
    # New-style wrapper scripts in the script dir, pointing at dist.scripts.
    self.install_wrapper_scripts(dist)
    # Old-style scripts listed in setup(scripts=...).
    for name in self.distribution.scripts or []:
        abs_path = os.path.abspath(convert_path(name))
        base_name = os.path.basename(abs_path)
        with io.open(abs_path) as stream:
            text = stream.read()
        self.install_script(dist, base_name, text, abs_path)
def save(self):
    """Write changed .pth file back to disk"""
    if not self.dirty:
        return
    rel_paths = [self.make_relative(p) for p in self.paths]
    if rel_paths:
        log.debug("Saving %s", self.filename)
        data = '\n'.join(self._wrap_lines(rel_paths)) + '\n'
        # Replace a symlink with a regular file rather than writing
        # through it.
        if os.path.islink(self.filename):
            os.unlink(self.filename)
        with open(self.filename, 'wt') as f:
            f.write(data)
    elif os.path.exists(self.filename):
        log.debug("Deleting empty %s", self.filename)
        os.unlink(self.filename)
    self.dirty = False
def write_parallel_text(sources, targets, output_prefix):
    """
    Writes two files where each line corresponds to one example
      - [output_prefix]/sources.txt
      - [output_prefix]/targets.txt

    Args:
      sources: Iterator of source strings
      targets: Iterator of target strings
      output_prefix: Prefix for the output file
    """
    def _dump(records, name):
        # One record per line, UTF-8 encoded.
        out_path = os.path.abspath(os.path.join(output_prefix, name))
        with io.open(out_path, "w", encoding='utf8') as out_file:
            for record in records:
                out_file.write(record + "\n")
        print("Wrote {}".format(out_path))
    _dump(sources, "sources.txt")
    _dump(targets, "targets.txt")
def pngValidator(path=None, data=None, fileObj=None):
    """
    Version 3+.

    This checks the signature of the image data.
    """
    assert path is not None or data is not None or fileObj is not None
    if path is not None:
        with open(path, "rb") as f:
            signature = f.read(8)
    elif data is not None:
        signature = data[:8]
    else:
        # Peek at the first 8 bytes, then restore the read position.
        pos = fileObj.tell()
        signature = fileObj.read(8)
        fileObj.seek(pos)
    if signature == pngSignature:
        return True, None
    return False, "Image does not begin with the PNG signature."

# -------------------
# layercontents.plist
# -------------------
def testInvalidLayerContentsFormat(self):
    """A layercontents.plist that is not a list of pairs must make
    UFOReader.getGlyphSet raise UFOLibError."""
    # bogus, non-plist content
    self.makeUFO()
    path = os.path.join(self.ufoPath, "layercontents.plist")
    os.remove(path)
    with open(path, "w") as f:
        f.write("test")
    reader = UFOReader(self.ufoPath)
    self.assertRaises(UFOLibError, reader.getGlyphSet)
    # a dict where a list of [name, directory] pairs is required
    self.makeUFO()
    os.remove(path)
    layerContents = {
        "public.default": "glyphs",
        "layer 1": "glyphs.layer 1",
        "layer 2": "glyphs.layer 2",
    }
    with open(path, "wb") as f:
        writePlist(layerContents, f)
    reader = UFOReader(self.ufoPath)
    self.assertRaises(UFOLibError, reader.getGlyphSet)
# layer contents invalid name format
def testInvalidLayerContentsFormat(self):
    """UFOWriter construction must reject a layercontents.plist that
    is not a list of pairs."""
    # bogus, non-plist content
    self.makeUFO()
    path = os.path.join(self.ufoPath, "layercontents.plist")
    os.remove(path)
    with open(path, "w") as f:
        f.write("test")
    self.assertRaises(UFOLibError, UFOWriter, self.ufoPath)
    # a dict where a list of [name, directory] pairs is required
    self.makeUFO()
    os.remove(path)
    layerContents = {
        "public.default": "glyphs",
        "layer 1": "glyphs.layer 1",
        "layer 2": "glyphs.layer 2",
    }
    with open(path, "wb") as f:
        writePlist(layerContents, f)
    self.assertRaises(UFOLibError, UFOWriter, self.ufoPath)
# __init__: layer contents invalid name format
def testGetGlyphSets(self):
    """getGlyphSet must return the correct glyph set for the default
    layer and for each named layer."""
    self.makeUFO()
    # hack contents.plist so each named layer maps one distinct glyph
    for layer, glyph in (("layer 1", "b"), ("layer 2", "c")):
        path = os.path.join(self.ufoPath, "glyphs.%s" % layer, "contents.plist")
        with open(path, "wb") as f:
            writePlist({glyph: "a.glif"}, f)
    # now test
    writer = UFOWriter(self.ufoPath)
    # default layer
    self.assertEqual(["a"], list(writer.getGlyphSet().keys()))
    # named layers
    self.assertEqual(["b"], list(writer.getGlyphSet("layer 1", defaultLayer=False).keys()))
    self.assertEqual(["c"], list(writer.getGlyphSet("layer 2", defaultLayer=False).keys()))
# make a new font with two layers
def testNewFontThreeLayers(self):
    """Writing three layers into an empty UFO must create all three
    directories and record them, in order, in layercontents.plist."""
    self.clearUFO()
    writer = UFOWriter(self.ufoPath)
    writer.getGlyphSet("layer 1", defaultLayer=False)
    writer.getGlyphSet()
    writer.getGlyphSet("layer 2", defaultLayer=False)
    writer.writeLayerContents(["layer 1", "public.default", "layer 2"])
    # each layer directory must exist on disk
    for directory in ("glyphs", "glyphs.layer 1", "glyphs.layer 2"):
        self.assertEqual(True, os.path.exists(os.path.join(self.ufoPath, directory)))
    # layer contents
    with open(os.path.join(self.ufoPath, "layercontents.plist"), "rb") as f:
        result = readPlist(f)
    expected = [["layer 1", "glyphs.layer 1"], ["public.default", "glyphs"], ["layer 2", "glyphs.layer 2"]]
    self.assertEqual(expected, result)
# add a layer to an existing font
def testRenameLayer(self):
    """renameGlyphSet must move the layer directory and update
    layercontents.plist."""
    self.makeUFO()
    writer = UFOWriter(self.ufoPath)
    writer.renameGlyphSet("layer 1", "layer 3")
    writer.writeLayerContents(["public.default", "layer 3", "layer 2"])
    # old directory gone, new one present
    checks = [("glyphs", True), ("glyphs.layer 1", False), ("glyphs.layer 2", True), ("glyphs.layer 3", True)]
    for directory, expected in checks:
        self.assertEqual(expected, os.path.exists(os.path.join(self.ufoPath, directory)))
    # layer contents
    with open(os.path.join(self.ufoPath, "layercontents.plist"), "rb") as f:
        result = readPlist(f)
    expected = [['public.default', 'glyphs'], ['layer 3', 'glyphs.layer 3'], ['layer 2', 'glyphs.layer 2']]
    self.assertEqual(expected, result)
def testRemoveLayer(self):
    """deleteGlyphSet must remove the layer directory and drop the
    layer from layercontents.plist."""
    self.makeUFO()
    writer = UFOWriter(self.ufoPath)
    writer.deleteGlyphSet("layer 1")
    writer.writeLayerContents(["public.default", "layer 2"])
    # deleted directory gone, the others untouched
    for directory, expected in (("glyphs", True), ("glyphs.layer 1", False), ("glyphs.layer 2", True)):
        self.assertEqual(expected, os.path.exists(os.path.join(self.ufoPath, directory)))
    # layer contents
    with open(os.path.join(self.ufoPath, "layercontents.plist"), "rb") as f:
        result = readPlist(f)
    self.assertEqual([["public.default", "glyphs"], ["layer 2", "glyphs.layer 2"]], result)
# remove default layer
def testRemoveDefaultLayer(self):
    """Deleting public.default must remove the glyphs directory while
    leaving the named layers intact."""
    self.makeUFO()
    writer = UFOWriter(self.ufoPath)
    writer.deleteGlyphSet("public.default")
    # default directory gone, named layers untouched
    for directory, expected in (("glyphs", False), ("glyphs.layer 1", True), ("glyphs.layer 2", True)):
        self.assertEqual(expected, os.path.exists(os.path.join(self.ufoPath, directory)))
    # layer contents
    with open(os.path.join(self.ufoPath, "layercontents.plist"), "rb") as f:
        result = readPlist(f)
    self.assertEqual([["layer 1", "glyphs.layer 1"], ["layer 2", "glyphs.layer 2"]], result)
# remove unknown layer
def testWrite(self):
    """Down-converted groups and kerning must round-trip to disk."""
    writer = UFOWriter(self.dstDir, formatVersion=2)
    writer.setKerningGroupConversionRenameMaps(self.downConversionMapping)
    writer.writeKerning(self.kerning)
    writer.writeGroups(self.groups)
    # groups.plist on disk matches the expected down-converted groups
    with open(os.path.join(self.dstDir, "groups.plist"), "rb") as f:
        self.assertEqual(readPlist(f), self.expectedWrittenGroups)
    # kerning.plist on disk matches the expected down-converted kerning
    with open(os.path.join(self.dstDir, "kerning.plist"), "rb") as f:
        self.assertEqual(readPlist(f), self.expectedWrittenKerning)
    self.tearDownUFO()
def writeLayerInfo(self, info):
    """Validate *info* and write it to layerinfo.plist (UFO 3+ only)."""
    if self.ufoFormatVersion < 3:
        raise GlifLibError("layerinfo.plist is not allowed in UFO %d." % self.ufoFormatVersion)
    # Gather the supported attributes from the info object.
    infoData = {}
    for attr in list(layerInfoVersion3ValueData.keys()):
        if not hasattr(info, attr):
            continue
        try:
            value = getattr(info, attr)
        except AttributeError:
            raise GlifLibError("The supplied info object does not support getting a necessary attribute (%s)." % attr)
        # Skip unset values and an empty lib.
        if value is None or (attr == 'lib' and not value):
            continue
        infoData[attr] = value
    # Validate, then write the plist.
    infoData = validateLayerInfoVersion3Data(infoData)
    path = os.path.join(self.dirName, LAYERINFO_FILENAME)
    with open(path, "wb") as f:
        writePlist(infoData, f)
# read caching
def readBytesFromPath(self, path, encoding=None):
    """
    Returns the bytes in the file at the given path.
    The path must be relative to the UFO path.
    Returns None if the file does not exist.
    An encoding may be passed if needed.
    """
    fullPath = os.path.join(self._path, path)
    if not self._checkForFile(fullPath):
        return None
    if os.path.isdir(fullPath):
        raise UFOLibError("%s is a directory." % path)
    if encoding:
        # Text mode: decode using the requested encoding.
        f = open(fullPath, encoding=encoding)
    else:
        # Fix: the original passed encoding=encoding (always None here) to a
        # binary-mode open -- dead and misleading, and rejected outright by
        # Python 2's builtin open.
        f = open(fullPath, "rb")
    data = f.read()
    f.close()
    return data
def getReadFileForPath(self, path, encoding=None):
    """
    Returns a file (or file-like) object for the file at the given path.
    The path must be relative to the UFO path.
    Returns None if the file does not exist.
    An encoding may be passed if needed.

    Note: The caller is responsible for closing the open file.
    """
    fullPath = os.path.join(self._path, path)
    if not self._checkForFile(fullPath):
        return None
    if os.path.isdir(fullPath):
        raise UFOLibError("%s is a directory." % path)
    if encoding:
        # Bug fix: a non-None encoding is only legal in text mode; the
        # original opened "rb" with the encoding, which raises ValueError.
        return open(fullPath, "r", encoding=encoding)
    return open(fullPath, "r")
def writeFileAtomically(text, path, encoding="utf-8"):
    """
    Write text into a file at path. Do this sort of atomically
    making it harder to cause corrupt files. This also checks to see
    if text matches the text that is already in the file at path.
    If so, the file is not rewritten so that the modification date
    is preserved. An encoding may be passed if needed.
    """
    if os.path.exists(path):
        with open(path, "r", encoding=encoding) as f:
            if f.read() == text:
                # Identical content: leave the existing file untouched so
                # its modification date is preserved.
                return
        if not text:
            # New content is empty: remove the existing file rather than
            # leaving a zero-byte file behind.
            os.remove(path)
    if text:
        with open(path, "w", encoding=encoding) as f:
            f.write(text)
def consume(self, doc, payload):
    """
    Write text to target directory, using a combination of filename
    and file ID as path.

    :param doc: Document object.
    :param payload: File pointer belonging to document.
    :type doc: ``gransk.core.document.Document``
    :type payload: ``file``
    """
    basename = document.secure_path(os.path.basename(doc.path))
    target = os.path.join(self.root, '%s-%s' % (doc.docid[0:8], basename))
    # Create the output directory lazily on first use.
    if not os.path.exists(self.root):
        os.makedirs(self.root)
    with io.open(target, 'w', encoding='utf-8') as out:
        out.write(doc.text)
    doc.meta['text_file'] = target
def __call__(self, value):
    """Open *value* as a file path (relative paths resolve against
    ``self.directory``); ``None`` passes through unchanged.

    :raises ValueError: if the file cannot be opened.
    """
    if value is None:
        return value
    # Bug fix: `unicode` only exists on Python 2; fall back to `str`
    # so the validator also works on Python 3.
    try:
        text_type = unicode
    except NameError:
        text_type = str
    path = text_type(value)
    if not os.path.isabs(path):
        path = os.path.join(self.directory, path)
    try:
        if self.buffering is None:
            value = open(path, self.mode)
        else:
            value = open(path, self.mode, self.buffering)
    except IOError as error:
        raise ValueError('Cannot open {0} with mode={1} and buffering={2}: {3}'.format(
            value, self.mode, self.buffering, error))
    return value
def downloadAllJournalArticles(skip=False, dontskip=""):
    """Download articles for every journal listed in ../compsci_journals.txt.

    :param skip: skip journals that already have a download folder
        (we assume downloading finished for those).
    :param dontskip: name of one journal to download even when skipping.
    """
    if dontskip != "":
        skip = True
    # Bug fix: the journal-list file handle was never closed; use `with`.
    with io.open("../compsci_journals.txt") as f:  # all Elsevier CS journal names
        for l in f:
            if len(l) <= 2:
                continue
            j = l.strip("\n")
            # Skip journals that already have folders, unless explicitly
            # requested via `dontskip`.
            if skip and j.lower().replace(" ", "_") in os.listdir("../elsevier_papers_xml"):
                if j != dontskip:
                    print("Skipping journal:", j)
                    continue
            print("Downloading articles for journal:", j)
            jurl = getJournalURL(j)
            downloadArticles("../elsevier_papers_xml/" + j.lower().replace(" ", "_") + "/", jurl)
def whitelist():
    """Render the whitelist page with addresses obfuscated against
    scrapers ('@' -> ' [at] ', '.' -> ' [dot] ')."""
    with io.open('static/whitelist.txt', 'r', encoding="utf-8") as f:
        contents = f.read()
    data = contents.replace('@', ' [at] ').replace('.', ' [dot] ')
    return render_template('whitelist.html', data=data)
def tail():
    """Flask view: show the last n characters of a server-side file."""
    if request.method == 'POST':
        fi = request.form['file']
        if os.path.isfile(fi):
            n = int(request.form['n'])
            # Context manager closes the file even if the read fails.
            with io.open(fi, 'r', encoding='utf-8') as le:
                taildata = le.read()[-n:]
        else:
            taildata = "No such file."
        return render_template('tail.html', taildata=taildata)
def wladd():
    """Flask view: append a POSTed address to the whitelist."""
    if request.method == 'POST':
        addr = request.form['addr'].strip()
        # Form values are already text; decoding them again breaks on
        # Python 3 (str has no .decode) -- write the text directly.
        with io.open('static/whitelist.txt', 'a', encoding="utf-8") as f:
            f.write(addr + u'\r\n')
        return render_template('wladd.html')
def unsub():
    """Flask view: record an unsubscribe and drop the address from the
    whitelist."""
    if request.method == 'POST':
        addr = request.form['addr'].strip()
        # Form values and io.open lines are already unicode text; the
        # old .decode('utf-8') calls fail on Python 3.
        with io.open('unsubscribers.txt', 'a', encoding="utf-8") as f:
            f.write(addr + u'\r\n')
        with io.open('static/whitelist.txt', 'r', encoding="utf-8") as f:
            lines = f.readlines()
        # Rewrite the whitelist without any line containing the address.
        with io.open('static/whitelist.txt', 'w', encoding="utf-8") as f:
            for line in lines:
                if addr not in line:
                    f.write(line)
        return render_template('unsubscribed.html', addr=addr)
def save(self, filename):
    """Pickle this vocabulary's state to *filename*.

    Bug fix: pickle requires a binary-mode file on Python 3; the file
    was previously opened in text mode ("w"), which raises TypeError.
    """
    info_dict = {
        "tokens": self.tokens,
        "strings": self.strings,
        # defaultdicts are converted to plain dicts so the pickle does
        # not capture an unpicklable default factory.
        "s2t": dict(self.s2t),
        "i2t": dict(self.i2t),
        "unk": self.unk,
        "START_TOK": self.START_TOK,
        "END_TOK": self.END_TOK,
    }
    with open(filename, "wb") as f:
        pickle.dump(info_dict, f)
def load(cls, filename):
    """Reconstruct a Vocab from a pickle written by ``save``.

    Bug fix: pickled data is binary, so the file must be opened "rb";
    text mode breaks on Python 3.
    """
    with open(filename, "rb") as f:
        info_dict = pickle.load(f)
    v = Vocab()
    v.tokens = info_dict["tokens"]
    v.strings = info_dict["strings"]
    v.unk = info_dict["unk"]
    v.START_TOK = info_dict["START_TOK"]
    v.END_TOK = info_dict["END_TOK"]
    # Restore the default factory: unknown lookups map to the UNK token
    # when one exists, otherwise to Token.not_found.
    defaultf = (lambda: v.unk) if (v.unk is not None) else Token.not_found
    v.s2t = defaultdict(defaultf, info_dict["s2t"])
    v.i2t = defaultdict(defaultf, info_dict["i2t"])
    return v
def parse_init():
    """Extract (author, docstring, version) from the package __init__.py
    by applying the module-level AUTHOR/DOCSTRING/VERSION patterns."""
    init_path = os.path.join(HERE, PKG_NAME, '__init__.py')
    with open(init_path) as f:
        file_data = f.read()
    patterns = (AUTHOR, DOCSTRING, VERSION)
    return [pattern.search(file_data).group(2) for pattern in patterns]
def read(*filenames, **kwargs):
    """Read the given files and return their contents joined by *sep*.

    Keyword args:
      encoding: text encoding used for every file (default "utf-8").
      sep: separator placed between file contents (default newline).
    """
    encoding = kwargs.get('encoding', 'utf-8')
    sep = kwargs.get('sep', '\n')
    contents = []
    for name in filenames:
        with io.open(name, encoding=encoding) as f:
            contents.append(f.read())
    return sep.join(contents)
def rewrite_yml(data):
    """Serialize *data* to toc.yml as block-style, unicode-safe YAML."""
    with io.open('toc.yml', 'w', encoding='utf8') as handle:
        yaml.dump(data, handle, default_flow_style=False, allow_unicode=True)
def embeddings_to_dict(filename):
    '''
    :param filename: the file name of the word embeddings | file is assumed
        to follow this format:
        "word[tab]dimension 1[space]dimension 2[space]...[space]dimension 50"
    :return: a dictionary with keys that are words and values that are the
        embedding of a word
    '''
    word_vecs = {}
    with io.open(filename, 'r', encoding='utf-8') as f:
        for raw in f:
            fields = raw.strip('\n').split()
            # First field is the word; the rest are the vector components.
            word_vecs[fields[0]] = np.array([float(v) for v in fields[1:]])
    return word_vecs
def load_test_fixture(fixture_path):
    """Load a fixture file (relative to this module's directory) into a
    fake stdin and replace stdout with a capture buffer.

    :param fixture_path: fixture path relative to this file's directory.
    """
    base = os.path.dirname(os.path.abspath(__file__))
    # Fixes: use a context manager so the file is closed even on a read
    # error, and stop shadowing the built-in name `input`.
    with open(base + '/' + fixture_path) as fixture_file:
        contents = fixture_file.read()
    sys.stdin = StringIO(contents)
    sys.stdout = StringIO()
def load_data(file: str):
    """Feed the named test-data file (with PATH placeholders expanded to
    the test_data directory) into a fake stdin, and swap stdout for a
    capture buffer."""
    data_dir = os.path.join(__abspath__, 'test_data')
    with io.open(os.path.join(data_dir, file)) as afile:
        input_str = afile.read().replace('PATH', data_dir)
    sys.stdin = io.StringIO(input_str)
    sys.stdout = io.StringIO()
def run(path, quiet=False):
    """
    Downloads all available hash files to a given path.

    :param path: Path to download directory
    :param quiet: If set to True, no progressbar is displayed
    """
    # Guard clause instead of wrapping the whole body in an if/else.
    if not os.path.isdir(path):
        print('Given path is not a directory.')
        sys.exit(1)
    session = requests.Session()
    session.headers = {'User-agent': 'Mozilla/5.0 Chrome/57.0.2987.110'}
    # Scrape the index page for three-digit file numbers (e.g. ">123<")
    # and take the highest one as the count of available hash files.
    index_page = session.get('https://virusshare.com/hashes.4n6').text
    numbers = [int(re.sub(r'[\<\>]', '', m)) for m in re.findall(r'\>[1-9][0-9]{2}\<', index_page)]
    max_num = max(numbers)
    if not quiet:
        p = progressbar.ProgressBar(max_value=max_num)
    for i in range(max_num):
        filename = str(i).zfill(3) + '.md5'
        target = os.path.join(path, filename)
        # Skip files already downloaded on a previous run.
        if os.path.exists(target):
            continue
        if not quiet:
            p.update(i)
        url = URL + filename
        head = session.head(url)
        if head.status_code == 200:
            body = session.get(url, stream=True)
            with io.open(target, mode='wb') as afile:
                for chunk in body.iter_content(chunk_size=1024):
                    # Bug fix: `b'' + chunk` was a pointless per-chunk copy.
                    afile.write(chunk)
            body.close()
def run(self):
    """Look up an MD5 hash (given directly or computed from a file) in
    the local VirusShare hash lists and report whether it is present.

    NOTE(review): self.report() is invoked inside the search loop on a
    match and again unconditionally after the loop; this presumably
    relies on report() terminating the analyzer (cortexutils behavior)
    -- confirm, otherwise a hit would be reported twice.
    """
    searchhash = ''
    if self.data_type == 'hash':
        searchhash = self.getData()
        # Only plain MD5 (32 hex characters) is supported.
        if len(searchhash) != 32:
            self.report({'isonvs': 'unknown', 'hash': searchhash})
    elif self.data_type == 'file':
        filepath = self.getParam('file')
        hasher = hashlib.md5()
        # Hash the file in 64 KiB chunks to bound memory use.
        with io.open(filepath, mode='rb') as afile:
            for chunk in iter(lambda: afile.read(65536), b''):
                hasher.update(chunk)
        searchhash = hasher.hexdigest()
    else:
        self.error('Unsupported data type.')
    # Read files
    for file in self.filelist:
        filepath = os.path.join(self.path, file)
        # Skip anything that is not a regular hash-list file.
        if not os.path.isfile(filepath):
            continue
        with io.open(filepath, 'r') as afile:
            for line in afile:
                # Skipping comments
                if line[0] == '#':
                    continue
                # Case-insensitive substring match against the list line.
                if searchhash.lower() in line:
                    self.report({'isonvs': True, 'md5': searchhash})
    self.report({'isonvs': False, 'md5': searchhash})
def readme():
    """Return the contents of README.rst decoded as UTF-8."""
    import io
    with io.open('README.rst', "r", encoding="utf-8") as fp:
        return fp.read()