我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 codecs.getincrementaldecoder()。
def stream_decode_response_unicode(iterator, r):
    """Lazily decode the byte chunks of ``iterator`` using ``r.encoding``.

    If ``r.encoding`` is None, every item is passed through untouched.
    Otherwise the chunks are fed to one incremental decoder (error handler
    ``'replace'``), and only non-empty decoded pieces are yielded. After the
    input is exhausted the decoder is flushed, and any remaining text is
    yielded as a last piece.
    """
    if r.encoding is not None:
        incremental = codecs.getincrementaldecoder(r.encoding)(errors='replace')
        for raw in iterator:
            text = incremental.decode(raw)
            # A chunk ending mid-character decodes to '' for now; skip it.
            if text:
                yield text
        # Flush whatever the decoder is still holding back.
        tail = incremental.decode(b'', final=True)
        if tail:
            yield tail
    else:
        # No encoding known: hand the items back exactly as received.
        for raw in iterator:
            yield raw
def posix_shell(chan, channel, log_name=None, width=90, height=40):
    """Relay data read from ``chan`` to ``channel`` via the channel layer.

    Repeatedly reads up to 1024 bytes from ``chan`` and forwards each piece
    as a JSON ``['stdout', text]`` message. When a read returns nothing, a
    ``['disconnect', ...]`` message is sent and the loop ends.

    Args:
        chan: Object providing ``settimeout``, ``recv`` and ``close``
            (presumably an SSH channel -- confirm against the caller).
        channel: Destination handed to ``channel_layer.send``.
        log_name: Not used by the code visible here.
        width: Not used by the code visible here.
        height: Not used by the code visible here.
    """
    from OpsManage.asgi import channel_layer

    # (delay, text) pairs for the session; nothing visible here reads them.
    stdout = []
    last_activity_time = time.time()
    # A single decoder for the whole session, so a multi-byte UTF-8 sequence
    # split across two reads is reassembled rather than replaced.
    decoder = codecs.getincrementaldecoder('UTF-8')('replace')

    chan.settimeout(0.0)
    # NOTE(review): the original was collapsed onto one line; both handlers
    # are assumed to belong to the inner ``try`` -- confirm upstream.
    # NOTE(review): with a 0.0 timeout and ``socket.timeout`` ignored, this
    # loop appears to poll without sleeping -- confirm that is intended.
    while True:
        try:
            x = u(chan.recv(1024))
            if len(x) == 0:
                channel_layer.send(channel, {'text': json.dumps(['disconnect', smart_unicode('\r\n*** EOF\r\n')])})
                break
            now = time.time()
            delay = now - last_activity_time
            last_activity_time = now
            if x == "exit\r\n" or x == "logout\r\n" or x == 'logout':
                chan.close()
            else:
                stdout.append([delay, decoder.decode(x)])
            channel_layer.send(channel, {'text': json.dumps(['stdout', smart_unicode(x)])})
        except socket.timeout:
            # Nothing to read yet; try again.
            pass
        except Exception as e:
            channel_layer.send(channel, {'text': json.dumps(['stdout', 'A bug find,You can report it to me' + smart_unicode(e)])})
def iter_slices_future(r, slice_length, decode_unicode=False):
    """Yield already-resolved futures, each holding one slice of ``r._content``.

    Args:
        r: Object exposing ``_content`` (the data to slice) and ``encoding``.
        slice_length: Size of each slice. ``None`` or a value <= 0 means
            the whole content is returned as a single slice.
        decode_unicode: When true and ``r.encoding`` is not None, each slice
            is passed through one incremental decoder (error handler
            ``'replace'``) before being stored in its future.

    Yields:
        ``Future`` objects whose result is already set. When decoding, one
        extra future is yielded at the end if flushing the decoder produces
        text (for example, content that stops mid-character).
    """
    pos, string = 0, r._content

    # Decoding needs both the caller's request and a known encoding.
    decoder = None
    if decode_unicode and r.encoding is not None:
        decoder = codecs.getincrementaldecoder(r.encoding)(errors='replace')

    if slice_length is None or slice_length <= 0:
        slice_length = len(string)

    while pos < len(string):
        chunk = string[pos:pos + slice_length]
        if decoder is not None:
            chunk = decoder.decode(chunk)
        future = Future()
        future.set_result(chunk)
        yield future
        pos += slice_length

    # Only flush when a decoder exists; the unguarded flush used to fail on
    # an unbound ``decoder`` whenever decoding was off.
    if decoder is None:
        return
    tail = decoder.decode(b'', final=True)
    if tail:
        future = Future()
        future.set_result(tail)
        yield future
def stream_decode_response_unicode(iterator, r):
    """Stream-decode an iterator of byte chunks using ``r.encoding``.

    Args:
        iterator: Iterable yielding the raw chunks.
        r: Object whose ``encoding`` attribute names the codec, or is None.

    Yields:
        The items unchanged when ``r.encoding`` is None. Otherwise the
        non-empty text pieces produced by an incremental decoder (error
        handler ``'replace'``), followed by any text left over once the
        decoder is flushed.
    """
    if r.encoding is None:
        for item in iterator:
            yield item
        return

    decoder = codecs.getincrementaldecoder(r.encoding)(errors='replace')
    for chunk in iterator:
        rv = decoder.decode(chunk)
        if rv:
            yield rv
    # The flush input must be bytes: a text literal here raises TypeError
    # on Python 3, where decoders only accept bytes-like input.
    rv = decoder.decode(b'', final=True)
    if rv:
        yield rv
def __init__(self, r=24, c=80, encoding='latin-1', encoding_errors='replace'):
    '''Set up an empty screen of ``r`` rows by ``c`` columns.

    If ``encoding`` is not None, an incremental decoder for that codec is
    created with ``encoding_errors`` as its error handler and kept on
    ``self.decoder``; otherwise ``self.decoder`` is None. The cursor and
    the saved cursor both start at row 1, column 1, the scroll region
    spans rows 1 through ``r``, and every cell of ``self.w`` holds SPACE.
    '''
    self.rows, self.cols = r, c
    self.encoding, self.encoding_errors = encoding, encoding_errors
    self.decoder = (
        None if encoding is None
        else codecs.getincrementaldecoder(encoding)(encoding_errors)
    )

    # Current and saved cursor positions.
    self.cur_r = self.cur_c = 1
    self.cur_saved_r = self.cur_saved_c = 1

    # The scroll region initially covers every row.
    self.scroll_row_start, self.scroll_row_end = 1, self.rows

    # Each row is its own list, so rows never share storage.
    grid = []
    for _ in range(self.rows):
        grid.append([SPACE] * self.cols)
    self.w = grid