The following 35 code examples, extracted from open-source Python projects, illustrate how to use codecs.EncodedFile().
def test_all(self):
    """Verify that codecs.__all__ matches the documented public API exactly
    and that every exported name actually resolves on the module."""
    expected = (
        "encode", "decode", "register", "CodecInfo", "Codec",
        "IncrementalEncoder", "IncrementalDecoder", "StreamReader",
        "StreamWriter", "lookup", "getencoder", "getdecoder",
        "getincrementalencoder", "getincrementaldecoder", "getreader",
        "getwriter", "register_error", "lookup_error", "strict_errors",
        "replace_errors", "ignore_errors", "xmlcharrefreplace_errors",
        "backslashreplace_errors", "open", "EncodedFile", "iterencode",
        "iterdecode", "BOM", "BOM_BE", "BOM_LE", "BOM_UTF8", "BOM_UTF16",
        "BOM_UTF16_BE", "BOM_UTF16_LE", "BOM_UTF32", "BOM_UTF32_BE",
        "BOM_UTF32_LE", "BOM32_BE", "BOM32_LE", "BOM64_BE", "BOM64_LE",
        # Undocumented
        "StreamReaderWriter", "StreamRecoder",
    )
    self.assertEqual(sorted(expected), sorted(codecs.__all__))
    for name in codecs.__all__:
        getattr(codecs, name)
def test_all(self):
    """Verify that codecs.__all__ contains exactly the documented public API
    (order-insensitive) and that every exported name resolves."""
    expected = (
        "encode", "decode", "register", "CodecInfo", "Codec",
        "IncrementalEncoder", "IncrementalDecoder", "StreamReader",
        "StreamWriter", "lookup", "getencoder", "getdecoder",
        "getincrementalencoder", "getincrementaldecoder", "getreader",
        "getwriter", "register_error", "lookup_error", "strict_errors",
        "replace_errors", "ignore_errors", "xmlcharrefreplace_errors",
        "backslashreplace_errors", "open", "EncodedFile", "iterencode",
        "iterdecode", "BOM", "BOM_BE", "BOM_LE", "BOM_UTF8", "BOM_UTF16",
        "BOM_UTF16_BE", "BOM_UTF16_LE", "BOM_UTF32", "BOM_UTF32_BE",
        "BOM_UTF32_LE", "BOM32_BE", "BOM32_LE", "BOM64_BE", "BOM64_LE",
        # Undocumented
        "StreamReaderWriter", "StreamRecoder",
    )
    self.assertCountEqual(expected, codecs.__all__)
    for name in codecs.__all__:
        getattr(codecs, name)
def _get_packed_pkg_file_contents(self, override_file):
    """Read ``override_file`` out of this package's sublime-package (zip)
    archive.

    Returns a tuple ``(content_lines, display_source_path, mtime_string)``,
    or None when the archive entry is missing or its contents cannot be
    decoded as UTF-8.
    """
    try:
        with zipfile.ZipFile(self.package_file()) as zFile:
            # find_zip_entry is a project helper; presumably it resolves the
            # archive member corresponding to override_file -- confirm
            # against its definition.
            info = find_zip_entry(zFile, override_file)
            file = codecs.EncodedFile(zFile.open(info, mode="rU"), "utf-8")
            content = io.TextIOWrapper(file, encoding="utf-8").readlines()

            # Display path says where the package lives; installed packages
            # take precedence over shipped ones when installed_path is set.
            source = "Shipped Packages"
            if self.installed_path is not None:
                source = "Installed Packages"
            source = os.path.join(source, self.name, override_file)
            # Zip entries store their timestamp as a date_time tuple.
            mtime = datetime(*info.date_time).strftime("%Y-%m-%d %H:%M:%S")

            return (content, _fixPath(source), mtime)
    except (KeyError, FileNotFoundError):
        print("Error loading %s:%s; cannot find file in sublime-package" % (self.package_file(), override_file))
        return None
    except UnicodeDecodeError:
        print("Error loading %s:%s; unable to decode file contents" % (self.package_file(), override_file))
        return None
def load(self, file):
    """Load properties from an open file stream.

    The stream is wrapped in a latin-1 EncodedFile before reading, then
    the resulting lines are handed to self._parse().

    Fixes over the original: the wrapper is now closed even when
    readlines() raises (the original only closed on success), and the
    pointless ``except IOError: raise`` handler is gone -- it re-raised
    unchanged, so IOError still propagates to the caller.
    """
    unicodeFile = codecs.EncodedFile(file, 'latin-1')
    try:
        lines = unicodeFile.readlines()
    finally:
        # Always release the wrapper, even on a read error.
        unicodeFile.close()
    self._parse(lines)
def _store(self, outFile, header=""): unicodeOutFile = codecs.EncodedFile(outFile, 'latin-1') unicodeOutFile.write(header) unicodeOutFile.write(self.getAsString())
def test_recoding(self):
    """Regression test: recoding through EncodedFile and closing it must not
    crash at interpreter exit (historical refcount bug in _codecsmodule.c)."""
    underlying = io.BytesIO()
    recoder = codecs.EncodedFile(underlying, "unicode_internal", "utf-8")
    recoder.write("a")
    recoder.close()

# From RFC 3492
def test_basic(self):
    """EncodedFile recodes between the file encoding and the data encoding."""
    # Reading: UTF-8 bytes stored in the file come back re-encoded as
    # UTF-16-LE data.
    source = io.BytesIO(b'\xed\x95\x9c\n\xea\xb8\x80')
    recoder = codecs.EncodedFile(source, 'utf-16-le', 'utf-8')
    self.assertEqual(recoder.read(), b'\\\xd5\n\x00\x00\xae')

    # Writing: UTF-8 data is stored in the file as Latin-1 bytes.
    sink = io.BytesIO()
    recoder = codecs.EncodedFile(sink, 'utf-8', 'latin1')
    recoder.write(b'\xc3\xbc')
    self.assertEqual(sink.getvalue(), b'\xfc')
def test_encodedfile(self):
    """EncodedFile works as a context manager and recodes on read."""
    raw = io.BytesIO(b"\xc3\xbc")
    with codecs.EncodedFile(raw, "latin-1", "utf-8") as recoder:
        self.assertEqual(recoder.read(), b"\xfc")
def test_recoding(self):
    """Regression test for a refcount bug in _codecsmodule.c: recoding
    through EncodedFile and then closing used to crash Python at exit."""
    underlying = StringIO.StringIO()
    recoder = codecs.EncodedFile(underlying, "unicode_internal", "utf-8")
    recoder.write(u"a")
    recoder.close()

# From RFC 3492
def test_basic(self):
    """EncodedFile recodes between the file encoding and the data encoding
    (Python 2: byte strings throughout)."""
    # Reading: UTF-8 bytes in the file come back re-encoded as UTF-16-LE.
    source = StringIO.StringIO('\xed\x95\x9c\n\xea\xb8\x80')
    recoder = codecs.EncodedFile(source, 'utf-16-le', 'utf-8')
    self.assertEqual(recoder.read(), '\\\xd5\n\x00\x00\xae')

    # Writing: UTF-8 data is stored in the file as Latin-1.
    sink = StringIO.StringIO()
    recoder = codecs.EncodedFile(sink, 'utf-8', 'latin1')
    recoder.write('\xc3\xbc')
    self.assertEqual(sink.getvalue(), '\xfc')
def test_encodedfile(self):
    """EncodedFile works as a context manager and recodes on read
    (Python 2: byte strings throughout)."""
    raw = StringIO.StringIO("\xc3\xbc")
    with codecs.EncodedFile(raw, "latin-1", "utf-8") as recoder:
        self.assertEqual(recoder.read(), "\xfc")
def test_basic(self):
    """EncodedFile recodes between the file encoding and the data encoding."""
    # Reading: UTF-8 bytes stored in the file come back re-encoded as
    # UTF-16-LE data.
    source = io.BytesIO(b'\xed\x95\x9c\n\xea\xb8\x80')
    recoder = codecs.EncodedFile(source, 'utf-16-le', 'utf-8')
    self.assertEqual(recoder.read(), b'\\\xd5\n\x00\x00\xae')

    # Writing: UTF-8 data is stored in the file as Latin-1 bytes.
    sink = io.BytesIO()
    recoder = codecs.EncodedFile(sink, 'utf-8', 'latin-1')
    recoder.write(b'\xc3\xbc')
    self.assertEqual(sink.getvalue(), b'\xfc')
def itertriples(path):
    '''
    Iterates over an N-triples file returning triples as tuples.

    Parameters
    ----------
    path : string
        path to N-triples file, or an already-open file object
        (Python 2 only: ``file`` is the built-in file type). ``.gz``
        paths are transparently decompressed.

    Yields
    ------
    (subject, predicate, object) tuples of strings, recoded as UTF-8.
    '''
    if isinstance(path, file):
        ntfile = path
    else:
        if path.endswith('.gz'):
            ntfile = GzipFile(path)
        else:
            ntfile = open(path)
    ntfile = EncodedFile(ntfile, 'utf-8')
    with closing(ntfile):
        for line in ntfile:
            # '#' lines are comments in the N-triples format.
            if line.startswith('#'):
                continue
            # remove trailing newline and dot
            line = line.strip().strip('.').strip()
            # the first two whitespaces are guaranteed to split the line
            # correctly. The trailing part may be a property containing
            # whitespaces, so using str.split is not viable.
            s1 = line.find(' ')
            s2 = line.find(' ', s1 + 1)
            triple = line[:s1], line[s1 + 1:s2], line[s2 + 1:]
            yield triple
def main():
    """Command-line entry point (Python 2): split a sorted N-triples file
    into per-namespace outputs under the destination directory."""
    parser = ArgumentParser(description=__doc__)
    parser.add_argument('ns_file', metavar='namespaces', help='tab-separated list of namespace codes')
    parser.add_argument('nt_file', metavar='ntriples', help='N-Triples file')
    parser.add_argument('-p', '--properties', action='store_true', help='print properties')
    parser.add_argument('-D', '--destination', help='destination path')
    args = parser.parse_args()
    print
    print 'WARNING: the n-triples file must be already sorted by source,'\
            ' destination!'
    print
    global namespaces
    namespaces = NodesIndex.readns(args.ns_file)
    # Recode stdout so unicode output is written as UTF-8 (Python 2).
    sys.stdout = EncodedFile(sys.stdout, 'utf-8')
    # expand destination path, check it is not an existing file, create it in
    # case it does not exist
    args.destination = os.path.expanduser(os.path.expandvars(args.destination))
    if os.path.exists(args.destination) and not os.path.isdir(args.destination):
        print >> sys.stderr, 'error: not a directory: '\
                '{}'.format(args.destination)
        sys.exit(1)
    elif not os.path.exists(args.destination):
        os.mkdir(args.destination)
        print >> sys.stderr, 'info: created {}'.format(args.destination)
    try:
        tic = time()
        # Two passes over the triples file: the first builds the vertex map,
        # the second writes the output using it.
        vertexmap, num_triples = _first_pass(args.nt_file, args.properties,
                args.destination)
        _second_pass(args.nt_file, vertexmap, num_triples, args.properties,
                args.destination)
        toc = time()
        etime = timedelta(seconds=round(toc - tic))
        speed = num_triples / (toc - tic)
        print >> sys.stderr, 'info: {:d} triples processed in {} '\
                '({:.2f} triple/s)'.format(num_triples, etime, speed)
    except IOError, e:
        # Exit quietly when downstream (e.g. a pager) closes the pipe.
        if e.errno == errno.EPIPE: # broken pipe
            sys.exit(0)
        raise
def test_recoding(self):
    """Regression test: recoding through EncodedFile and closing it must not
    crash at interpreter exit (historical refcount bug in _codecsmodule.c)."""
    underlying = io.BytesIO()
    recoder = codecs.EncodedFile(underlying, "unicode_internal", "utf-8")
    recoder.write("a")
    recoder.close()
    # Closing the recoder must also close the wrapped stream.
    self.assertTrue(underlying.closed)

# From RFC 3492
def test_encodedfile(self):
    """EncodedFile as a context manager recodes on read and closes the
    underlying stream on exit."""
    raw = io.BytesIO(b"\xc3\xbc")
    with codecs.EncodedFile(raw, "latin-1", "utf-8") as recoder:
        self.assertEqual(recoder.read(), b"\xfc")
    self.assertTrue(raw.closed)
def part_upload_bom(request, part_id):
    """Django view (Python 2): handle upload of a CSV bill-of-materials for
    a part and create Subpart rows linking the part to each listed subpart.

    Expects CSV columns including 'part_number' and 'quantity'. Redirects
    back to the referrer on success, or to the error page on failure.
    """
    try:
        part = Part.objects.get(id=part_id)
    except ObjectDoesNotExist:
        messages.error(request, "No part found with given part_id.")
        return HttpResponseRedirect(reverse('error'))

    if request.method == 'POST':
        form = FileForm(request.POST, request.FILES)
        if form.is_valid():
            csvfile = request.FILES['file']
            # Sniff the CSV dialect from the first line, then reopen the
            # upload so the reader starts from the beginning again.
            dialect = csv.Sniffer().sniff(csvfile.readline())
            csvfile.open()
            reader = csv.reader(
                codecs.EncodedFile(
                    csvfile,
                    "utf-8"),
                delimiter=',',
                dialect=dialect)
            # First row is the header; remaining rows are keyed by it.
            headers = reader.next()
            # Subpart.objects.filter(assembly_part=part).delete()
            for row in reader:
                partData = {}
                for idx, item in enumerate(row):
                    partData[headers[idx]] = item
                if 'part_number' in partData and 'quantity' in partData:
                    civ = full_part_number_to_broken_part(
                        partData['part_number'])
                    subparts = Part.objects.filter(
                        number_class=civ['class'],
                        number_item=civ['item'],
                        number_variation=civ['variation'])
                    if len(subparts) == 0:
                        messages.info(
                            request,
                            "Subpart: {} doesn't exist".format(
                                partData['part_number']))
                        continue
                    # Only the first match is used -- presumably part
                    # numbers are unique; TODO confirm.
                    subpart = subparts[0]
                    count = partData['quantity']
                    if part == subpart:
                        messages.error(
                            request,
                            "Recursive part association: a part cant be a subpart of itsself")
                        return HttpResponseRedirect(reverse('error'))
                    sp = Subpart(
                        assembly_part=part,
                        assembly_subpart=subpart,
                        count=count)
                    sp.save()
        else:
            messages.error(
                request,
                "File form not valid: {}".format(
                    form.errors))
            return HttpResponseRedirect(reverse('error'))

    return HttpResponseRedirect(request.META.get('HTTP_REFERER', reverse('home')))
def run(self, edit):
    """Sublime Text command: convert the file on disk (saved as UTF-8)
    back to the view's forced or originally-detected encoding."""
    view = self.view
    # Forced encoding takes precedence over the detected one.
    encoding = view.settings().get('force_encoding')
    if not encoding:
        encoding = view.settings().get('origin_encoding')
    file_name = view.file_name()
    if not encoding or encoding == 'UTF-8':
        # Nothing to convert; drop any stale cache entry.
        encoding_cache.pop(file_name)
        return
    # remember current folded regions
    regions = [[x.a, x.b] for x in view.folded_regions()]
    if regions:
        view.settings().set('folded_regions', regions)
    vp = view.viewport_position()
    view.settings().set('viewport_position', [vp[0], vp[1]])
    fp = None
    try:
        # Recode the on-disk UTF-8 bytes into the target encoding.
        fp = open(file_name, 'rb')
        contents = codecs.EncodedFile(fp, encoding, 'UTF-8').read()
    except (LookupError, UnicodeEncodeError) as e:
        # Unknown codec or characters unrepresentable in the target
        # encoding: leave the file as UTF-8 and tell the user.
        sublime.error_message(u'Can not convert file encoding of {0} to {1}, it was saved as UTF-8 instead:\n\n{2}'.format
                (os.path.basename(file_name), encoding, e))
        return
    finally:
        if fp:
            fp.close()
    # write content to temporary file
    tmp_name = os.path.join(TMP_DIR, get_temp_name(file_name))
    fp = open(tmp_name, 'wb')
    fp.write(contents)
    fp.close()
    if not get_setting(view, 'lazy_reload'):
        # os.rename has "Invalid cross-device link" issue
        os.chmod(tmp_name, os.stat(file_name)[0])
        shutil.move(tmp_name, file_name)
    else:
        # copy the timestamp from original file
        mtime = os.path.getmtime(file_name)
        os.utime(tmp_name, (mtime, mtime))
    encoding_cache.set(file_name, encoding)
    # Avoid re-running encoding detection on the file we just rewrote.
    view.settings().set('prevent_detect', True)
    sublime.status_message('UTF8 -> {0}'.format(encoding))
def update_publisher(self, pub):
    """Updates the configuration information for the publisher
    defined by the provided Publisher object.

    The new pub.p5i content is written atomically: it goes to a temp file
    in the same directory (inheriting the old file's mode/ownership when
    it exists), which is then renamed over the original.

    Raises RepositoryMirrorError, RepositoryReadOnlyError, or
    RepositoryUnsupportedOperationError when this repository cannot be
    modified; PermissionsException / ReadOnlyFileSystemException on
    EACCES / EROFS.
    """
    if self.mirror:
        raise RepositoryMirrorError()
    if self.read_only:
        raise RepositoryReadOnlyError()
    if not self.root:
        raise RepositoryUnsupportedOperationError()

    p5ipath = os.path.join(self.root, "pub.p5i")
    fn = None
    try:
        dirname = os.path.dirname(p5ipath)
        fd, fn = tempfile.mkstemp(dir=dirname)

        st = None
        try:
            st = os.stat(p5ipath)
        except OSError as e:
            # A missing pub.p5i is fine (first write); anything else
            # is a real error.
            if e.errno != errno.ENOENT:
                raise

        if st:
            # Preserve the existing file's mode and ownership on the
            # replacement; ownership failures other than EPERM propagate.
            os.fchmod(fd, stat.S_IMODE(st.st_mode))
            try:
                portable.chown(fn, st.st_uid, st.st_gid)
            except OSError as e:
                if e.errno != errno.EPERM:
                    raise
        else:
            os.fchmod(fd, misc.PKG_FILE_MODE)

        if six.PY2:
            with os.fdopen(fd, "wb") as f:
                with codecs.EncodedFile(f, "utf-8") as ef:
                    p5i.write(ef, [pub])
        else:
            # we use simplejson.dump() in p5i.write(),
            # simplejson module will produce str objects
            # in Python 3, therefore fp.write()
            # must support str input.
            with open(fd, "w", encoding="utf-8") as fp:
                p5i.write(fp, [pub])
        # Atomic replace of the old configuration.
        portable.rename(fn, p5ipath)
    except EnvironmentError as e:
        if e.errno == errno.EACCES:
            raise apx.PermissionsException(e.filename)
        elif e.errno == errno.EROFS:
            raise apx.ReadOnlyFileSystemException(
                e.filename)
        raise
    finally:
        # Clean up the temp file if the rename never happened.
        if fn and os.path.exists(fn):
            os.unlink(fn)