The following code examples, extracted from open-source Python projects, illustrate how to use codecs.StreamReaderWriter().
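Before the examples themselves, here is a minimal sketch of the pattern most of them rely on: StreamReaderWriter wraps a byte-oriented stream with a codec's StreamReader and StreamWriter so that reads return decoded text and writes accept text, and codecs.open() returns the same kind of object when an encoding is given. This sketch is illustrative only; the in-memory bytes and the file name 'example.txt' are placeholders, not taken from the examples below.

import codecs
import io

# Wrap an in-memory binary stream: reads are decoded from UTF-8,
# writes are encoded to UTF-8.
raw = io.BytesIO(b"\xc3\xbc")                      # UTF-8 bytes for '\xfc'
info = codecs.lookup("utf-8")
srw = codecs.StreamReaderWriter(raw, info.streamreader, info.streamwriter, "strict")
print(srw.read())                                  # -> 'ü'

# codecs.open() builds the same wrapper around a real file when an
# encoding is given; 'example.txt' is a placeholder path.
with codecs.open("example.txt", "w+", encoding="utf-8", errors="replace") as f:
    print(isinstance(f, codecs.StreamReaderWriter))  # True
    f.write("\xfc")
    f.seek(0)
    print(f.read())                                  # -> 'ü'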
def main():
    filenames = ParseArguments(sys.argv[1:])

    # Change stderr to write with replacement characters so we don't die
    # if we try to print something containing non-ASCII characters.
    sys.stderr = codecs.StreamReaderWriter(sys.stderr,
                                           codecs.getreader('utf8'),
                                           codecs.getwriter('utf8'),
                                           'replace')

    _cpplint_state.ResetErrorCounts()
    for filename in filenames:
        ProcessFile(filename, _cpplint_state.verbose_level)
    _cpplint_state.PrintErrorCounts()

    sys.exit(_cpplint_state.error_count > 0)
def test_all(self):
    api = (
        "encode", "decode",
        "register", "CodecInfo", "Codec", "IncrementalEncoder",
        "IncrementalDecoder", "StreamReader", "StreamWriter",
        "lookup", "getencoder", "getdecoder", "getincrementalencoder",
        "getincrementaldecoder", "getreader", "getwriter",
        "register_error", "lookup_error",
        "strict_errors", "replace_errors", "ignore_errors",
        "xmlcharrefreplace_errors", "backslashreplace_errors",
        "open", "EncodedFile",
        "iterencode", "iterdecode",
        "BOM", "BOM_BE", "BOM_LE",
        "BOM_UTF8", "BOM_UTF16", "BOM_UTF16_BE", "BOM_UTF16_LE",
        "BOM_UTF32", "BOM_UTF32_BE", "BOM_UTF32_LE",
        "BOM32_BE", "BOM32_LE", "BOM64_BE", "BOM64_LE",  # Undocumented
        "StreamReaderWriter", "StreamRecoder",
    )
    self.assertEqual(sorted(api), sorted(codecs.__all__))
    for api in codecs.__all__:
        getattr(codecs, api)
def read_with_encoding(self, filename, document, codec_info, encoding):
    f = None
    try:
        f = codecs.StreamReaderWriter(open(filename, 'rb'),
                                      codec_info[2], codec_info[3], 'strict')
        lines = f.readlines()
        lines = dedent_lines(lines, self.options.get('dedent'))
        return lines
    except (IOError, OSError):
        return [document.reporter.warning(
            'Include file %r not found or reading it failed' % filename,
            line=self.lineno)]
    except UnicodeError:
        return [document.reporter.warning(
            'Encoding %r used for reading included file %r seems to '
            'be wrong, try giving an :encoding: option' % (encoding, filename))]
    finally:
        if f is not None:
            f.close()
def __init__(
        self,
        file_path: str,
        mode: Optional[str] = 'a',
        encoding: Optional[str] = 'utf8',
        errors: Optional[str] = 'strict',
        buffering: Optional[int] = 1,
        name: Optional[str] = None,
        level: Optional[LogLevel] = None
):
    """Instantiates a new ``FileHandler``

    :param file_path: the path (full or relative) to the log file
    :param mode: the file mode
    :param encoding: the file encoding
    :param errors: how errors should be handled
    :param buffering: whether the output should be line-buffered
    :param name: the name of the handler
    :param level: the minimum level of verbosity/priority of the messages this will log
    """
    self.fh: StreamReaderWriter = codecs.open(
        file_path, mode=mode, encoding=encoding, errors=errors, buffering=buffering)
    super().__init__(name=name, level=level)
    self.encoding: str = encoding
def test_all(self):
    api = (
        "encode", "decode",
        "register", "CodecInfo", "Codec", "IncrementalEncoder",
        "IncrementalDecoder", "StreamReader", "StreamWriter",
        "lookup", "getencoder", "getdecoder", "getincrementalencoder",
        "getincrementaldecoder", "getreader", "getwriter",
        "register_error", "lookup_error",
        "strict_errors", "replace_errors", "ignore_errors",
        "xmlcharrefreplace_errors", "backslashreplace_errors",
        "open", "EncodedFile",
        "iterencode", "iterdecode",
        "BOM", "BOM_BE", "BOM_LE",
        "BOM_UTF8", "BOM_UTF16", "BOM_UTF16_BE", "BOM_UTF16_LE",
        "BOM_UTF32", "BOM_UTF32_BE", "BOM_UTF32_LE",
        "BOM32_BE", "BOM32_LE", "BOM64_BE", "BOM64_LE",  # Undocumented
        "StreamReaderWriter", "StreamRecoder",
    )
    self.assertCountEqual(api, codecs.__all__)
    for api in codecs.__all__:
        getattr(codecs, api)
def readline(self):
    "Read a line from elyxer.file"
    self.current = self.file.readline()
    if not isinstance(self.file, codecs.StreamReaderWriter):
        self.current = self.current.decode('utf-8')
    if len(self.current) == 0:
        self.depleted = True
    self.current = self.current.rstrip('\n\r')
    self.linenumber += 1
    self.mustread = False
    Trace.prefix = 'Line ' + unicode(self.linenumber) + ': '
    if self.linenumber % 1000 == 0:
        Trace.message('Parsing')
def test_streamreaderwriter(self):
    f = io.BytesIO(b"\xc3\xbc")
    info = codecs.lookup("utf-8")
    with codecs.StreamReaderWriter(f, info.streamreader, info.streamwriter,
                                   'strict') as srw:
        self.assertEqual(srw.read(), "\xfc")
def test_streamreaderwriter(self):
    f = StringIO.StringIO("\xc3\xbc")
    info = codecs.lookup("utf-8")
    with codecs.StreamReaderWriter(f, info.streamreader, info.streamwriter,
                                   'strict') as srw:
        self.assertEqual(srw.read(), u"\xfc")
def read_with_encoding(self, filename, document, codec_info, encoding):
    global cache
    f = None
    try:
        if self.arguments[0] not in cache:
            f = codecs.StreamReaderWriter(urllib2.urlopen(self.arguments[0]),
                                          codec_info[2], codec_info[3], 'strict')
            lines = f.readlines()
            cache[self.arguments[0]] = lines
        else:
            lines = cache[self.arguments[0]]
        lines = dedent_lines(lines, self.options.get('dedent'))
        return lines
    except (IOError, OSError, urllib2.URLError):
        return [document.reporter.warning(
            'Include file %r not found or reading it failed' % self.arguments[0],
            line=self.lineno)]
    except UnicodeError:
        return [document.reporter.warning(
            'Encoding %r used for reading included file %r seems to '
            'be wrong, try giving an :encoding: option' % (encoding, self.arguments[0]))]
    finally:
        if f is not None:
            f.close()
def test_open(self):
    self.addCleanup(support.unlink, support.TESTFN)
    for mode in ('w', 'r', 'r+', 'w+', 'a', 'a+'):
        with self.subTest(mode), \
                codecs.open(support.TESTFN, mode, 'ascii') as file:
            self.assertIsInstance(file, codecs.StreamReaderWriter)
def _gettextwriter(out, encoding):
    if out is None:
        import sys
        return sys.stdout

    if isinstance(out, io.TextIOBase):
        # use a text writer as is
        return out

    if isinstance(out, (codecs.StreamWriter, codecs.StreamReaderWriter)):
        # use a codecs stream writer as is
        return out

    # wrap a binary writer with TextIOWrapper
    if isinstance(out, io.RawIOBase):
        # Keep the original file open when the TextIOWrapper is
        # destroyed
        class _wrapper:
            __class__ = out.__class__
            def __getattr__(self, name):
                return getattr(out, name)
        buffer = _wrapper()
        buffer.close = lambda: None
    else:
        # This is to handle passed objects that aren't in the
        # IOBase hierarchy, but just have a write method
        buffer = io.BufferedIOBase()
        buffer.writable = lambda: True
        buffer.write = out.write
        try:
            # TextIOWrapper uses these methods to determine
            # if BOM (for UTF-16, etc) should be added
            buffer.seekable = out.seekable
            buffer.tell = out.tell
        except AttributeError:
            pass

    return io.TextIOWrapper(buffer, encoding=encoding,
                            errors='xmlcharrefreplace',
                            newline='\n',
                            write_through=True)
def readline(self):
    "Read a line from elyxer.file"
    self.current = self.file.readline()
    if not isinstance(self.file, codecs.StreamReaderWriter):
        self.current = self.current.decode('utf-8')
    if len(self.current) == 0:
        self.depleted = True
    self.current = self.current.rstrip('\n\r')
    self.linenumber += 1
    self.mustread = False
    Trace.prefix = 'Line ' + str(self.linenumber) + ': '
    if self.linenumber % 1000 == 0:
        Trace.message('Parsing')
def main():
    filenames = ParseArguments(sys.argv[1:])

    # Change stderr to write with replacement characters so we don't die
    # if we try to print something containing non-ASCII characters.
    sys.stderr = codecs.StreamReaderWriter(sys.stderr,
                                           codecs.getreader('utf8'),
                                           codecs.getwriter('utf8'),
                                           'replace')

    _cpplint_state.ResetErrorCounts()

    f = open(_output_file, 'w')
    title = 'path,dir,'
    for i in range(1, 16):
        title += 'PFM%d' % i + ','
    for i in range(1, 21):
        title += 'PRM%d' % i + ','
    for i in range(1, 26):
        title += 'PLM%d' % i + ','
    f.write(title)

    for filename in filenames:
        # added by Robert
        global _current_filenmae
        _current_filenmae = filename
        print 'current file name: ' + filename
        global _stat
        _stat = _Stat()
        # added by Robert end
        ProcessFile(filename, _cpplint_state.verbose_level, [ExtraCheckLine])
        # added by Robert
        global _output_file
        _stat.WriteFile(filename, f)
        # added by Robert end

    f.close()
    _cpplint_state.PrintErrorCounts()
    _stat.PrintValidFiles()  # added by Robert
    sys.exit(_cpplint_state.error_count > 0)