我们从 Python 开源项目中,提取了以下 50 个代码示例,用于说明如何使用 codecs.getwriter()。
def write_exports(exports, stream):
    """Serialize *exports* to *stream* in INI format.

    *exports* maps section names to dicts of entry objects; each entry
    has ``name``, ``prefix``, ``suffix`` and ``flags`` attributes and is
    written as ``name = prefix[:suffix] [flag, ...]``.
    """
    if sys.version_info[0] >= 3:
        # ConfigParser.write expects a text stream; wrap binary streams
        # so their bytes come out UTF-8 encoded.
        stream = codecs.getwriter('utf-8')(stream)
    parser = configparser.ConfigParser()
    for section, entries in exports.items():
        # TODO check section, entries for valid values
        parser.add_section(section)
        for entry in entries.values():
            if entry.suffix is None:
                value = entry.prefix
            else:
                value = '%s:%s' % (entry.prefix, entry.suffix)
            if entry.flags:
                value = '%s [%s]' % (value, ', '.join(entry.flags))
            parser.set(section, entry.name, value)
    parser.write(stream)
def main():
    """Lint every file named on the command line and exit non-zero on errors."""
    filenames = ParseArguments(sys.argv[1:])

    # Re-wrap stderr so undecodable characters are replaced instead of
    # killing the process when an error message contains non-ASCII text.
    sys.stderr = codecs.StreamReaderWriter(
        sys.stderr,
        codecs.getreader('utf8'),
        codecs.getwriter('utf8'),
        'replace')

    _cpplint_state.ResetErrorCounts()
    for path in filenames:
        ProcessFile(path, _cpplint_state.verbose_level)
    _cpplint_state.PrintErrorCounts()

    # Truthy (non-zero error count) maps to exit status 1.
    sys.exit(_cpplint_state.error_count > 0)
def write_handle(self, handle):
    """
    Write the database to the specified file handle.

    When compression is enabled and gzip support is available the
    payload is gzip-compressed; the XML text itself is always encoded
    as UTF-8.  Returns 1 on completion (legacy success flag).
    """
    if self.compress and _gzip_ok:
        try:
            g = gzip.GzipFile(mode="wb", fileobj=handle)
        except Exception:
            # Best effort: fall back to uncompressed output.  (This was
            # a bare ``except:``, which also swallowed SystemExit and
            # KeyboardInterrupt.)
            g = handle
    else:
        g = handle

    self.g = codecs.getwriter("utf8")(g)

    try:
        self.write_xml_data()
    finally:
        # Close the (possibly gzip) stream even if the XML writer
        # raises, so the handle is not leaked.
        g.close()
    return 1
def _get_text_writer(stream, errors): # In python3, all the sys.stdout/sys.stderr streams are in text # mode. This means they expect unicode, and will encode the # unicode automatically before actually writing to stdout/stderr. # In python2, that's not the case. In order to provide a consistent # interface, we can create a wrapper around sys.stdout that will take # unicode, and automatically encode it to the preferred encoding. # That way consumers can just call get_text_writer(stream) and write # unicode to the returned stream. Note that get_text_writer # just returns the stream in the PY3 section above because python3 # handles this. # We're going to use the preferred encoding, but in cases that there is # no preferred encoding we're going to fall back to assuming ASCII is # what we should use. This will currently break the use of # PYTHONIOENCODING, which would require checking stream.encoding first, # however, the existing behavior is to only use # locale.getpreferredencoding() and so in the hope of not breaking what # is currently working, we will continue to only use that. encoding = locale.getpreferredencoding() if encoding is None: encoding = "ascii" return codecs.getwriter(encoding)(stream, errors)
def __init__(self, stream, fieldnames, encoding='utf-8', **kwds):
    """Initializer.

    Wraps a csv.DictWriter so rows are emitted to *stream* in the
    requested encoding.

    Args:
      stream: Stream to write to.
      fieldnames: Fieldnames to pass to the DictWriter.
      encoding: Desired encoding.
      kwds: Additional arguments to pass to the DictWriter.
    """
    writer = codecs.getwriter(encoding)
    # Fast path for these codecs: write csv output straight to *stream*
    # and encode with self.encoder.  The no_recoding flag is consumed by
    # code outside this view — presumably the row-writing method; verify
    # against the rest of the class.
    if (writer is encodings.utf_8.StreamWriter or
            writer is encodings.ascii.StreamWriter or
            writer is encodings.latin_1.StreamWriter or
            writer is encodings.cp1252.StreamWriter):
        self.no_recoding = True
        self.encoder = codecs.getencoder(encoding)
        self.writer = csv.DictWriter(stream, fieldnames, **kwds)
    else:
        # Slow path: buffer csv output as UTF-8 in an in-memory queue,
        # to be re-encoded into the target codec via self.stream.
        # NOTE(review): cStringIO exists only on Python 2 — this branch
        # assumes a Python 2 runtime; confirm before porting.
        self.no_recoding = False
        self.encoder = codecs.getencoder('utf-8')
        self.queue = cStringIO.StringIO()
        self.writer = csv.DictWriter(self.queue, fieldnames, **kwds)
        self.stream = writer(stream)
def test_incrementaldecoder(self): UTF8Writer = codecs.getwriter('utf-8') for sizehint in [None, -1] + list(range(1, 33)) + \ [64, 128, 256, 512, 1024]: istream = BytesIO(self.tstring[0]) ostream = UTF8Writer(BytesIO()) decoder = self.incrementaldecoder() while 1: data = istream.read(sizehint) if not data: break else: u = decoder.decode(data) ostream.write(u) self.assertEqual(ostream.getvalue(), self.tstring[1])
def test_streamreader(self): UTF8Writer = codecs.getwriter('utf-8') for name in ["read", "readline", "readlines"]: for sizehint in [None, -1] + list(range(1, 33)) + \ [64, 128, 256, 512, 1024]: istream = self.reader(BytesIO(self.tstring[0])) ostream = UTF8Writer(BytesIO()) func = getattr(istream, name) while 1: data = func(sizehint) if not data: break if name == "readlines": ostream.writelines(data) else: ostream.write(data) self.assertEqual(ostream.getvalue(), self.tstring[1])
def test_gb18030(self):
    # Exercise the gb18030 StreamWriter, in particular how it buffers a
    # lone UTF-16 high surrogate delivered in a separate write() call.
    #
    # NOTE(review): the subscripts '\U00012345'[0] / '\U00012345'[1]
    # assume a "narrow" unicode build (Python <= 3.2), where a non-BMP
    # literal is a surrogate pair of length 2.  On Python 3.3+ the
    # literal has length 1 and the [1] subscript raises IndexError —
    # confirm which interpreter this suite targets.
    s = io.BytesIO()
    c = codecs.getwriter('gb18030')(s)
    c.write('123')
    self.assertEqual(s.getvalue(), b'123')
    c.write('\U00012345')
    self.assertEqual(s.getvalue(), b'123\x907\x959')
    # A lone high surrogate is buffered: nothing reaches the stream yet.
    c.write('\U00012345'[0])
    self.assertEqual(s.getvalue(), b'123\x907\x959')
    # Completing the pair, then a full astral char and two BMP chars.
    c.write('\U00012345'[1] + '\U00012345' + '\uac00\u00ac')
    self.assertEqual(s.getvalue(),
                     b'123\x907\x959\x907\x959\x907\x959\x827\xcf5\x810\x851')
    # Another dangling high surrogate: again buffered, stream unchanged.
    c.write('\U00012345'[0])
    self.assertEqual(s.getvalue(),
                     b'123\x907\x959\x907\x959\x907\x959\x827\xcf5\x810\x851')
    # reset() must fail: the pending surrogate cannot be flushed as
    # valid gb18030, and the stream contents stay untouched.
    self.assertRaises(UnicodeError, c.reset)
    self.assertEqual(s.getvalue(),
                     b'123\x907\x959\x907\x959\x907\x959\x827\xcf5\x810\x851')
def test_encoding_cyrillic_unicode(self): log = logging.getLogger("test") #Get a message in Unicode: Do svidanya in Cyrillic (meaning goodbye) message = '\u0434\u043e \u0441\u0432\u0438\u0434\u0430\u043d\u0438\u044f' #Ensure it's written in a Cyrillic encoding writer_class = codecs.getwriter('cp1251') writer_class.encoding = 'cp1251' stream = io.BytesIO() writer = writer_class(stream, 'strict') handler = logging.StreamHandler(writer) log.addHandler(handler) try: log.warning(message) finally: log.removeHandler(handler) handler.close() # check we wrote exactly those bytes, ignoring trailing \n etc s = stream.getvalue() #Compare against what the data should be when encoded in CP-1251 self.assertEqual(s, b'\xe4\xee \xf1\xe2\xe8\xe4\xe0\xed\xe8\xff\n')
def open_temp(**kwargs):
    """Opens a new temporary file for writing with unicode interface.

    A ``_%Y%m%d_%H%M%S`` timestamp is appended to any given ``prefix``
    (or becomes the prefix itself).

    Parameters
    ----------
    **kwargs
        the tempfile keyword arguments.  See documentation for
        :func:`tempfile.NamedTemporaryFile`.

    Returns
    -------
    file : file
        the opened file that accepts unicode input.
    """
    stamp = time.strftime("_%Y%m%d_%H%M%S")
    # Equivalent to: append to an existing prefix, else use the stamp alone.
    kwargs['prefix'] = kwargs.get('prefix', '') + stamp
    raw = tempfile.NamedTemporaryFile(**kwargs)
    return codecs.getwriter(bag_encoding)(raw, errors=bag_codec_error)