我们从Python开源项目中,提取了以下48个代码示例,用于说明如何使用cStringIO.StringIO()。
def rrset_to_text(m):
    """Render a DNSDB rrset record *m* (a dict) as dig-style presentation text.

    Optional metadata keys become ';;' comment lines, in a fixed order, and
    each rdata entry becomes one '<rrname> IN <rrtype> <rdata>' line.
    """
    out = StringIO()
    emit = out.write
    if 'bailiwick' in m:
        emit(';; bailiwick: %s\n' % m['bailiwick'])
    if 'count' in m:
        emit(';; count: %s\n' % locale.format('%d', m['count'], True))
    # The four timestamp fields share one rendering pattern; keep their order.
    for key, label in (('time_first', 'first seen'),
                       ('time_last', 'last seen'),
                       ('zone_time_first', 'first seen in zone file'),
                       ('zone_time_last', 'last seen in zone file')):
        if key in m:
            emit(';; %s: %s\n' % (label, sec_to_text(m[key])))
    if 'rdata' in m:
        for rd in m['rdata']:
            emit('%s IN %s %s\n' % (m['rrname'], m['rrtype'], rd))
    return out.getvalue()
def vertical_flip(self):
    """Mirror the image top-to-bottom by reversing the order of pixel rows."""
    if self.empty == True:
        die("Attempted to call vertical_flip() on an empty BMP object!")
    # Let's pretend it's a file to make things easy: slice the raw pixel
    # data into rows of (width * 3 bytes-per-pixel + padding) each.
    stream = StringIO(self.bitmap_data)
    row_len = self.width * 3 + self.padding_size
    rows = [stream.read(row_len) for _ in range(self.height)]
    self.bitmap_data = "".join(reversed(rows))
    return self
def grayscale(self): """ Convert the image into a (24-bit) grayscale one, using the Y'UV method. """ # http://en.wikipedia.org/wiki/YUV Wr = 0.299 Wb = 0.114 Wg = 0.587 mod_bitmap = "" f = StringIO(self.bitmap_data) for row_num in xrange(0, self.height): for pix in xrange(0, self.width): pixel = struct.unpack("3B", f.read(3)) out_pix = chr(int(Wr * pixel[2] + Wg * pixel[1] + Wb * pixel[0])) mod_bitmap += out_pix * 3 mod_bitmap += chr(0x00) * self.padding_size f.seek(self.padding_size, 1) self.bitmap_data = mod_bitmap return self
def _try_config_test(self, logcfg, epattern, foundTest=None ):
    """Apply a log4py configuration string and assert on its stdout output.

    logcfg    -- configuration text handed to log4py's strConfig().
    epattern  -- a regex (or list of regexes) searched for in the captured
                 stdout lines.
    foundTest -- expected match-flag list; defaults to one True per pattern.
    """
    import ossie.utils.log4py.config
    # Capture everything strConfig() prints so it can be pattern-matched.
    with stdout_redirect(cStringIO.StringIO()) as new_stdout:
        ossie.utils.log4py.config.strConfig(logcfg,None)
    new_stdout.seek(0)
    found = []
    epats=[]
    # Accept either a single pattern string or a list of patterns.
    if type(epattern) == str:
        epats.append(epattern)
    else:
        epats = epattern
    if foundTest == None:
        foundTest = len(epats)*[True]
    # Record a True for every pattern hit on every captured line; the
    # comparison below therefore also checks the number of hits.
    for x in new_stdout.readlines():
        for epat in epats:
            m=re.search( epat, x )
            if m :
                found.append( True )
    self.assertEqual(found, foundTest )
def _showPorts(self, ports, destfile=None):
    """Write a two-column table of port names and interfaces to destfile.

    With destfile=None the output is captured in an in-memory buffer and
    displayed through pydoc's pager instead.
    """
    paged = destfile == None
    if paged:
        destfile = cStringIO.StringIO()
    if not ports:
        destfile.write("None\n")
    else:
        table = TablePrinter('Port Name', 'Port Interface')
        for entry in ports.itervalues():
            table.append(entry['Port Name'], entry['Port Interface'])
        table.write(f=destfile)
    if paged:
        pydoc.pager(destfile.getvalue())
        destfile.close()
def api(self, destfile=None):
    """Print the provides (input) and uses (output) port tables.

    With destfile=None the output is buffered and shown via pydoc's pager.
    """
    paged = destfile == None
    if paged:
        destfile = cStringIO.StringIO()
    destfile.write("Provides (Input) Ports ==============\n")
    self._showPorts(self._providesPortDict, destfile=destfile)
    destfile.write("\n\n")
    destfile.write("Uses (Output) Ports ==============\n")
    self._showPorts(self._usesPortDict, destfile=destfile)
    destfile.write("\n\n")
    if paged:
        pydoc.pager(destfile.getvalue())
        destfile.close()
def api(self, destfile=None):
    """Write a table of this object's allocation properties.

    With destfile=None the output is buffered and shown via pydoc's pager;
    otherwise it is written to the given file object.
    """
    localdef_dest = False
    if destfile == None:
        localdef_dest = True
        destfile = cStringIO.StringIO()
    print >>destfile, 'Allocation Properties ======'
    # BUG FIX: the original returned early when there were no allocation
    # properties, skipping the pager/close step below — so with the default
    # destfile the 'None' output was never shown and the buffer leaked.
    if not self._allocProps:
        print >>destfile, 'None'
    else:
        table = TablePrinter('Property Name', '(Data Type)', 'Action')
        for prop in self._allocProps:
            table.append(prop.clean_name, '('+prop.type+')', prop.action)
            if prop.type in ('struct', 'structSeq'):
                # struct/structSeq members are listed indented under the parent.
                if prop.type == 'structSeq':
                    structdef = prop.structDef
                else:
                    structdef = prop
                for member in structdef.members.itervalues():
                    table.append(' '+member.clean_name, member.type)
        table.write(f=destfile)
    if localdef_dest:
        pydoc.pager(destfile.getvalue())
        destfile.close()
def api(self, showComponentName=True, showInterfaces=True, showProperties=True, externalPropInfo=None, destfile=None):
    '''
    Inspect interfaces and properties for the component.

    Each show* flag selects one section of the report; with destfile=None
    the output is buffered and displayed through pydoc's pager.
    '''
    paged = destfile == None
    if paged:
        destfile = cStringIO.StringIO()
    if showComponentName == True:
        destfile.write(self.__class__.__name__ + " [" + str(self.name) + "]:\n")
    if showInterfaces == True:
        PortSupplier.api(self, destfile=destfile)
    if showProperties == True and self._properties != None:
        PropertySet.api(self, externalPropInfo, destfile=destfile)
    if paged:
        pydoc.pager(destfile.getvalue())
        destfile.close()
def redirectSTDOUT(filename):
    """Redirect sys.stdout and sys.stderr to *filename*.

    filename may be either a path string (opened for writing) or an
    existing cStringIO output object; anything else is rejected with a
    message.  Note the previous stdout/stderr are not saved by this call.
    """
    if _DEBUG == True:
        print "redirectSTDOUT(): redirecting stdout/stderr to filename " + str(filename)
    if type(filename) == str:
        dirname = os.path.dirname(filename)
        # Accept a bare filename (empty dirname) or a path whose directory
        # already exists; otherwise fall through without redirecting.
        if len(dirname) == 0 or (len(dirname) > 0 and os.path.isdir(dirname)):
            try:
                f = open(filename,'w')
                # Send stdout and stderr to provided filename
                sys.stdout = f
                sys.stderr = f
            except Exception, e:
                print "redirectSTDOUT(): ERROR - Unable to open file " + str(filename) + " for writing stdout and stderr " + str(e)
    elif type(filename) == cStringIO.OutputType:
        # A cStringIO write-mode object: use it directly as both streams.
        sys.stdout = filename
        sys.stderr = filename
    else:
        print 'redirectSTDOUT(): failed to redirect stdout/stderr to ' + str(filename)
        print 'redirectSTDOUT(): argument must be: string filename, cStringIO.StringIO object'
def api(self, destfile=None):
    """
    Prints application programming interface (API) information and returns.
    """
    paged = destfile == None
    if paged:
        destfile = cStringIO.StringIO()
    destfile.write("Component " + self.__class__.__name__ + " :\n")
    PortSupplier.api(self, destfile=destfile)
    if paged:
        pydoc.pager(destfile.getvalue())
        destfile.close()
def read_string4(f):
    r"""Read a string with a 4-byte signed length prefix from *f*.

    The count comes from read_int4; a negative count or a short read
    raises ValueError.

    >>> import StringIO
    >>> read_string4(StringIO.StringIO("\x00\x00\x00\x00abc"))
    ''
    >>> read_string4(StringIO.StringIO("\x03\x00\x00\x00abcdef"))
    'abc'
    """
    count = read_int4(f)
    if count < 0:
        raise ValueError("string4 byte count < 0: %d" % count)
    payload = f.read(count)
    if len(payload) != count:
        raise ValueError("expected %d bytes in a string4, but only %d remain" %
                         (count, len(payload)))
    return payload
def read_string1(f):
    r"""Read a string with a 1-byte unsigned length prefix from *f*.

    >>> import StringIO
    >>> read_string1(StringIO.StringIO("\x00"))
    ''
    >>> read_string1(StringIO.StringIO("\x03abcdef"))
    'abc'
    """
    count = read_uint1(f)
    assert count >= 0
    payload = f.read(count)
    if len(payload) != count:
        raise ValueError("expected %d bytes in a string1, but only %d remain" %
                         (count, len(payload)))
    return payload
def read_long1(f):
    r"""Read a long with a 1-byte length prefix from *f* and decode it.

    >>> import StringIO
    >>> read_long1(StringIO.StringIO("\x00"))
    0L
    >>> read_long1(StringIO.StringIO("\x02\xff\x00"))
    255L
    >>> read_long1(StringIO.StringIO("\x02\x00\x80"))
    -32768L
    """
    count = read_uint1(f)
    payload = f.read(count)
    if len(payload) != count:
        raise ValueError("not enough data in stream to read long1")
    return decode_long(payload)
def read_long4(f):
    r"""Read a long with a 4-byte signed length prefix from *f* and decode it.

    A negative count or a short read raises ValueError.

    >>> import StringIO
    >>> read_long4(StringIO.StringIO("\x02\x00\x00\x00\xff\x00"))
    255L
    >>> read_long4(StringIO.StringIO("\x02\x00\x00\x00\xff\x7f"))
    32767L
    >>> read_long4(StringIO.StringIO("\x02\x00\x00\x00\x00\xff"))
    -256L
    >>> read_long4(StringIO.StringIO("\x02\x00\x00\x00\x00\x80"))
    -32768L
    >>> read_long4(StringIO.StringIO("\x00\x00\x00\x00"))
    0L
    """
    # DOC FIX: the last doctest above originally invoked read_long1, a
    # copy-paste slip that left the zero-count case of THIS function untested.
    n = read_int4(f)
    if n < 0:
        raise ValueError("long4 byte count < 0: %d" % n)
    data = f.read(n)
    if len(data) != n:
        raise ValueError("not enough data in stream to read long4")
    return decode_long(data)
def _handle_message_delivery_status(self, msg):
    """Flatten a message/delivery-status payload without a trailing blank line.

    Each sub-part is rendered into its own buffer so a final empty line can
    be stripped before joining; writing the headers straight to self's file
    object would leave an extra newline between the last header block and
    the boundary.
    """
    rendered = []
    for part in msg.get_payload():
        buf = StringIO()
        self.clone(buf).flatten(part, unixfrom=False)
        text = buf.getvalue()
        pieces = text.split('\n')
        # Drop the unnecessary trailing empty line, if present.
        if pieces and pieces[-1] == '':
            rendered.append(NL.join(pieces[:-1]))
        else:
            rendered.append(text)
    # Joining with NL separates the blocks by exactly one empty line
    # without adding one after the final block.
    self._fp.write(NL.join(rendered))
def parseString(string, handler, errorHandler=ErrorHandler()):
    """Parse an XML document held in *string*, driving *handler* callbacks.

    errorHandler defaults to a fresh (stateless) xml.sax ErrorHandler;
    passing None explicitly gets the same default.
    """
    try:
        from cStringIO import StringIO
    except ImportError:
        try:
            from StringIO import StringIO
        except ImportError:
            # BUG FIX: neither cStringIO nor StringIO exists on Python 3;
            # fall back to io.StringIO so the function works there too.
            from io import StringIO

    if errorHandler is None:
        errorHandler = ErrorHandler()
    parser = make_parser()
    parser.setContentHandler(handler)
    parser.setErrorHandler(errorHandler)

    inpsrc = InputSource()
    inpsrc.setByteStream(StringIO(string))
    parser.parse(inpsrc)

# this is the parser list used by the make_parser function if no
# alternatives are given as parameters to the function
def gzip_encode(data):
    """data -> gzip encoded data

    Encode data using the gzip content encoding as described in RFC 1952
    """
    # `gzip` is conditionally imported at module level; it is falsy when
    # the import failed, so compression support is simply unavailable.
    if not gzip:
        raise NotImplementedError
    f = StringIO.StringIO()
    # compresslevel=1 favours speed over ratio for transport encoding.
    gzf = gzip.GzipFile(mode="wb", fileobj=f, compresslevel=1)
    gzf.write(data)
    # Closing the GzipFile flushes the trailer into the underlying buffer.
    gzf.close()
    encoded = f.getvalue()
    f.close()
    return encoded

##
# Decode a string using the gzip content encoding such as specified by the
# Content-Encoding: gzip
# in the HTTP header, as described in RFC 1952
#
# @param data The encoded data
# @return the unencoded data
# @raises ValueError if data is not correctly coded.
def gzip_decode(data):
    """gzip encoded data -> unencoded data

    Decode data using the gzip content encoding as described in RFC 1952

    Raises ValueError if *data* is not valid gzip data, and
    NotImplementedError if the gzip module is unavailable.
    """
    if not gzip:
        raise NotImplementedError
    f = StringIO.StringIO(data)
    gzf = gzip.GzipFile(mode="rb", fileobj=f)
    # BUG FIX: the original raised ValueError without closing gzf or f,
    # leaking both objects on the invalid-data path; close them always.
    try:
        try:
            decoded = gzf.read()
        except IOError:
            raise ValueError("invalid data")
    finally:
        gzf.close()
        f.close()
    return decoded

##
# Return a decoded file-like object for the gzip encoding
# as described in RFC 1952.
#
# @param response A stream supporting a read() method
# @return a file-like object that the decoded data can be read() from
def rollover(self):
    """Move the in-memory buffer's contents into a real temporary file.

    A no-op once already rolled over; the current seek position is
    preserved across the switch.
    """
    if self._rolled:
        return
    buf = self._file
    disk = self._file = TemporaryFile(*self._TemporaryFileArgs)
    del self._TemporaryFileArgs
    disk.write(buf.getvalue())
    disk.seek(buf.tell(), 0)
    self._rolled = True

# The method caching trick from NamedTemporaryFile
# won't work here, because _file may change from a
# _StringIO instance to a real file. So we list
# all the methods directly.

# Context management protocol
def getbodytext(self, decode = 1):
    """Return the message's body text as string.  This undoes a
    Content-Transfer-Encoding, but does not interpret other MIME
    features (e.g. multipart messages).  To suppress decoding,
    pass 0 as an argument."""
    # startofbody was recorded when the headers were parsed.
    self.fp.seek(self.startofbody)
    encoding = self.getencoding()
    # Identity encodings need no transformation; return the raw body.
    if not decode or encoding in ('', '7bit', '8bit', 'binary'):
        return self.fp.read()
    # Prefer the C implementation of StringIO when available (Python 2).
    try:
        from cStringIO import StringIO
    except ImportError:
        from StringIO import StringIO
    output = StringIO()
    # mimetools.decode handles e.g. base64 and quoted-printable bodies.
    mimetools.decode(self.fp, output, encoding)
    return output.getvalue()
def file(self):
    """
    Returns a file pointer to this binary

    :example:

    >>> process_obj = c.select(Process).where("process_name:svch0st.exe").first()
    >>> binary_obj = process_obj.binary
    >>> print(binary_obj.file.read(2))
    MZ
    """
    # TODO: I don't like reaching through to the session...
    url = "/api/v1/binary/{0:s}".format(self.md5sum)
    with closing(self._cb.session.get(url, stream=True)) as resp:
        # The server returns a zip archive whose 'filedata' member is the
        # actual binary content.
        archive = ZipFile(StringIO(resp.content))
        return archive.open('filedata')
def put (url, data, headers={}):
    """Make a PUT request to the url, using data in the message body,
    with the additional headers, if any.  Returns the HTTP status code,
    or -1 when the request could not be performed."""
    reply = -1  # default, non-http response
    curl = pycurl.Curl()
    curl.setopt(pycurl.URL, url)
    if len(headers) > 0:
        curl.setopt(pycurl.HTTPHEADER, [k + ': ' + v for k, v in headers.items()])
    curl.setopt(pycurl.PUT, 1)
    curl.setopt(pycurl.INFILESIZE, len(data))
    # pycurl pulls the request body through a read callback.
    curl.setopt(pycurl.READFUNCTION, StringIO(data).read)
    try:
        curl.perform()
        reply = curl.getinfo(pycurl.HTTP_CODE)
    except Exception:
        # Best-effort: any transport failure leaves reply at -1.
        pass
    curl.close()
    return reply
def __init__(self, sock, mode='rb', bufsize=-1, close=False):
    """File-like wrapper around *sock*; bufsize < 0 selects the default."""
    self._sock = sock
    self.mode = mode  # Not actually used in this version
    if bufsize < 0:
        bufsize = self.default_bufsize
    self.bufsize = bufsize
    self.softspace = False
    # _rbufsize is the suggested recv buffer size.  It is *strictly*
    # obeyed within readline() for recv calls.  If it is larger than
    # default_bufsize it will be used for recv calls within read().
    # 0 means unbuffered (recv one byte at a time); 1 means line-buffered,
    # which still recv()s in default-sized chunks.
    self._rbufsize = {0: 1, 1: self.default_bufsize}.get(bufsize, bufsize)
    self._wbufsize = bufsize
    # A StringIO read buffer avoids holding many variously sized string
    # objects, which have been known to fragment the heap.
    self._rbuf = StringIO()
    self._wbuf = []  # A list of strings
    self._wbuf_len = 0
    self._close = close
def __init__(self, ofile, maxresultrows=None): self._maxresultrows = 50000 if maxresultrows is None else maxresultrows self._ofile = ofile self._fieldnames = None self._buffer = StringIO() self._writer = csv.writer(self._buffer, dialect=CsvDialect) self._writerow = self._writer.writerow self._finished = False self._flushed = False self._inspector = OrderedDict() self._chunk_count = 0 self._record_count = 0 self._total_record_count = 0L
# Checks that an overlong header is folded onto a continuation line by the
# Generator.  NOTE(review): collapsed one-liner kept verbatim — the expected
# value is a triple-quoted literal whose exact line breaks cannot be safely
# reconstructed from this flattened form.
def test_header_splitter(self): eq = self.ndiffAssertEqual msg = MIMEText('') # It'd be great if we could use add_header() here, but that doesn't # guarantee an order of the parameters. msg['X-Foobar-Spoink-Defrobnit'] = ( 'wasnipoop; giraffes="very-long-necked-animals"; ' 'spooge="yummy"; hippos="gargantuan"; marshmallows="gooey"') sfp = StringIO() g = Generator(sfp) g.flatten(msg) eq(sfp.getvalue(), '''\ Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Foobar-Spoink-Defrobnit: wasnipoop; giraffes="very-long-necked-animals"; \tspooge="yummy"; hippos="gargantuan"; marshmallows="gooey" ''')
# Verifies the parsed structure of msg_39.txt, whose inner parts reuse the
# outer parts' boundary.  NOTE(review): collapsed one-liner kept verbatim —
# the expected structure dump is a triple-quoted literal whose exact line
# breaks cannot be safely reconstructed from this flattened form.
def test_nested_with_same_boundary(self): eq = self.ndiffAssertEqual # msg 39.txt is similarly evil in that it's got inner parts that use # the same boundary as outer parts. Again, I believe the way this is # parsed is closest to the spirit of RFC 2046 msg = self._msgobj('msg_39.txt') sfp = StringIO() iterators._structure(msg, sfp) eq(sfp.getvalue(), """\ multipart/mixed multipart/mixed multipart/alternative application/octet-stream application/octet-stream text/plain """)
def test_invalid_content_type(self):
    """Invalid Content-Type values must degrade to text/plain (RFC 2045 $5.2)."""
    eq = self.assertEqual
    neq = self.ndiffAssertEqual
    msg = Message()
    # Both a bare maintype and a completely bogus value yield text/plain.
    for bogus in ('text', 'foo'):
        # Clear any old value first (a no-op on the first pass).
        del msg['content-type']
        msg['Content-Type'] = bogus
        eq(msg.get_content_maintype(), 'text')
        eq(msg.get_content_subtype(), 'plain')
        eq(msg.get_content_type(), 'text/plain')
    # Still, make sure that the message is idempotently generated
    s = StringIO()
    g = Generator(s)
    g.flatten(msg)
    neq(s.getvalue(), 'Content-Type: foo\n\n')
# Flattens a message/rfc822 wrapper and checks the generated text.
# NOTE(review): collapsed one-liner kept verbatim — the expected value is a
# triple-quoted literal whose exact line breaks cannot be safely
# reconstructed from this flattened form.
def test_generate(self): # First craft the message to be encapsulated m = Message() m['Subject'] = 'An enclosed message' m.set_payload('Here is the body of the message.\n') r = MIMEMessage(m) r['Subject'] = 'The enclosing message' s = StringIO() g = Generator(s) g.flatten(r) self.assertEqual(s.getvalue(), """\ Content-Type: message/rfc822 MIME-Version: 1.0 Subject: The enclosing message Subject: An enclosed message Here is the body of the message. """)
# Same check as the earlier variant but driven through the (capitalized)
# Iterators module.  NOTE(review): collapsed one-liner kept verbatim — the
# expected structure dump is a triple-quoted literal whose exact line breaks
# cannot be safely reconstructed from this flattened form.
def test_nested_with_same_boundary(self): eq = self.ndiffAssertEqual # msg 39.txt is similarly evil in that it's got inner parts that use # the same boundary as outer parts. Again, I believe the way this is # parsed is closest to the spirit of RFC 2046 msg = self._msgobj('msg_39.txt') sfp = StringIO() Iterators._structure(msg, sfp) eq(sfp.getvalue(), """\ multipart/mixed multipart/mixed multipart/alternative application/octet-stream application/octet-stream text/plain """)
def remove_blank_lines(source):
    """
    Removes blank lines from 'source' and returns the result.

    Example:

    .. code-block:: python

        test = "foo"

        test2 = "bar"

    Will become:

    .. code-block:: python

        test = "foo"
        test2 = "bar"
    """
    buf = cStringIO.StringIO(source)
    kept = [line for line in buf.readlines() if line.strip()]
    return "".join(kept)
def rev_readlines2(arg,bufsize=8192):
    """Yield the lines of file *arg* last-to-first.

    The file is scanned backwards in chunks of up to *bufsize* bytes; each
    yielded line is stripped of surrounding whitespace.
    """
    f1=open(arg,'rb')
    try:
        f1.seek(0,2)  # go to the end
        leftover=''
        while f1.tell():
            # BUG FIX: removed the leftover debug statement
            # "print f1.tell()" which polluted stdout on every chunk.
            # Clamp the chunk size to what remains before the file start.
            if f1.tell()<bufsize:
                bufsize=f1.tell()
            f1.seek(-bufsize,1)
            # Prepend this chunk to any partial line carried over from the
            # previous (later-in-file) chunk.
            in_memory=f1.read(bufsize)+leftover
            f1.seek(-bufsize,1)
            buffer=cStringIO.StringIO(in_memory)
            buffer.seek(0,2)  # go to the end
            line=collections.deque()
            # Walk the chunk backwards one character at a time, emitting a
            # line every time a newline is crossed.
            while buffer.tell():
                buffer.seek(-1,1)
                c=buffer.read(1)
                buffer.seek(-1,1)
                line.appendleft(c)
                if c =='\n':
                    yield ''.join(line).strip()
                    line.clear()
            leftover=''.join(line).strip()
        yield leftover
    finally:
        # BUG FIX: the file handle was never closed; the finally clause
        # runs when the generator is exhausted or closed.
        f1.close()
def write(self, data):
    """Buffer the input, then send as many bytes as possible"""
    self.buffer.write(data)
    if self.writable():
        buff = self.buffer.getvalue()
        # next try/except clause suggested by Robert Brown
        try:
            sent = self.sock.send(buff)
        except:
            # Catch socket exceptions and abort
            # writing the buffer
            # NOTE(review): the bare except plus sent = len(data) silently
            # drops len(data) bytes from the front of the buffer on any
            # failure — presumably a deliberate best-effort recovery, but
            # worth confirming against the surrounding dispatcher code.
            sent = len(data)
        # reset the buffer to the data that has not yet be sent
        self.buffer=cStringIO.StringIO()
        self.buffer.write(buff[sent:])
def runTokenizerTest(test):
    """Run one html5lib tokenizer test case (*test* is a dict from the suite JSON)."""
    #XXX - move this out into the setup function
    #concatenate all consecutive character tokens into a single token
    if 'doubleEscaped' in test:
        test = unescape_test(test)

    expected = concatenateCharacterTokens(test['output'])
    if 'lastStartTag' not in test:
        test['lastStartTag'] = None
    # Capture anything the tokenizer prints while parsing.
    # BUG FIX: the original saved sys.stdout into `stdout` but never
    # restored it, leaving stdout redirected for the rest of the process
    # (and unconditionally so if parse() raised).
    outBuffer = cStringIO.StringIO()
    stdout = sys.stdout
    sys.stdout = outBuffer
    try:
        parser = TokenizerTestParser(test['initialState'],
                                     test['lastStartTag'])
        tokens = parser.parse(test['input'])
    finally:
        sys.stdout = stdout
    tokens = concatenateCharacterTokens(tokens)
    received = normalizeTokens(tokens)
    errorMsg = u"\n".join(["\n\nInitial state:",
                           test['initialState'],
                           "\nInput:", unicode(test['input']),
                           "\nExpected:", unicode(expected),
                           "\nreceived:", unicode(tokens)])
    errorMsg = errorMsg.encode("utf-8")
    ignoreErrorOrder = test.get('ignoreErrorOrder', False)
    assert tokensMatch(expected, received, ignoreErrorOrder), errorMsg
def remove_blank_lines(source):
    """
    Removes blank lines from *source* and returns the result.

    Example:

    .. code-block:: python

        test = "foo"

        test2 = "bar"

    Will become:

    .. code-block:: python

        test = "foo"
        test2 = "bar"
    """
    # Lines keep their trailing newlines, so joining preserves layout.
    return "".join(line for line in io.StringIO(source).readlines() if line.strip())
def make_input_stream(input, charset):
    """Coerce *input* (stream, bytes, text, or None) into a binary input stream."""
    # Is already an input stream.
    if hasattr(input, 'read'):
        # Python 2 accepts any readable object; Python 3 needs a binary reader.
        if PY2:
            return input
        rv = _find_binary_reader(input)
        if rv is not None:
            return rv
        raise TypeError('Could not find binary reader for input stream.')

    if input is None:
        input = b''
    elif not isinstance(input, bytes):
        input = input.encode(charset)
    return StringIO(input) if PY2 else io.BytesIO(input)
def open_resource(name):
    """Load the object from the datastore"""
    import logging
    from cStringIO import StringIO
    try:
        data = ndb.Key('Zoneinfo', name, namespace=NDB_NAMESPACE).get().data
    except AttributeError:
        # Missing zone info; test for GMT - which would be there if the
        # Zoneinfo has been initialized.
        if ndb.Key('Zoneinfo', 'GMT', namespace=NDB_NAMESPACE).get():
            # the user asked for a zone that doesn't seem to exist.
            logging.exception("Requested zone '%s' is not in the database." % name)
            raise
        # First access: populate the database, then retry the lookup.
        init_zoneinfo()
        return open_resource(name)
    return StringIO(data)
def emit(events, stream=None, Dumper=Dumper,
        canonical=None, indent=None, width=None,
        allow_unicode=None, line_break=None):
    """
    Emit YAML parsing events into a stream.
    If stream is None, return the produced string instead.
    """
    capture = stream is None
    if capture:
        from StringIO import StringIO
        stream = StringIO()
    dumper = Dumper(stream, canonical=canonical, indent=indent, width=width,
            allow_unicode=allow_unicode, line_break=line_break)
    # dispose() must run even when emitting fails part-way.
    try:
        for event in events:
            dumper.emit(event)
    finally:
        dumper.dispose()
    if capture:
        return stream.getvalue()
def runProfiler(logger, func, args=tuple(), kw={}, verbose=True, nb_func=25,
        sort_by=('cumulative', 'calls')):
    """Profile func(*args, **kw), log the top *nb_func* entries, return func's result.

    Profile data is written to a private temporary file which is always
    removed afterwards.  NOTE: the mutable default kw={} is kept for
    interface compatibility; it is never mutated here.
    """
    import os
    import tempfile
    # SECURITY FIX: the original used the fixed, world-predictable path
    # "/tmp/profiler", which is vulnerable to symlink attacks and is
    # clobbered by concurrent runs; mkstemp creates a private file.
    fd, profile_filename = tempfile.mkstemp(prefix="profiler-")
    os.close(fd)
    prof = Profile(profile_filename)
    try:
        logger.warning("Run profiler")
        result = prof.runcall(func, *args, **kw)
        prof.close()
        logger.error("Profiler: Process data...")
        stat = loadStats(profile_filename)
        stat.strip_dirs()
        stat.sort_stats(*sort_by)

        logger.error("Profiler: Result:")
        log = StringIO()
        stat.stream = log
        stat.print_stats(nb_func)
        log.seek(0)
        for line in log:
            logger.error(line.rstrip())
        return result
    finally:
        unlink(profile_filename)
def __init__(self, buf=None):
    """
    Initialise the internal buffer from *buf*, which may be a string, a
    StringIO-like object (anything with getvalue), a generic file-like
    object (read/seek/tell), or None for an empty buffer.

    @raise TypeError: Unable to coerce C{buf} to C{StringIO}.
    """
    self._buffer = StringIO()

    if isinstance(buf, python.str_types):
        self._buffer.write(buf)
    elif hasattr(buf, 'getvalue'):
        self._buffer.write(buf.getvalue())
    elif hasattr(buf, 'read') and hasattr(buf, 'seek') and hasattr(buf, 'tell'):
        # Copy the stream's full contents while leaving its position intact.
        saved = buf.tell()
        buf.seek(0)
        self._buffer.write(buf.read())
        buf.seek(saved)
    elif buf is not None:
        raise TypeError("Unable to coerce buf->StringIO got %r" % (buf,))

    self._get_len()
    self._len_changed = False
    self._buffer.seek(0, 0)
def truncate(self, size=0):
    """
    Truncates the stream to the specified length.

    @param size: The length of the stream, in bytes.
    @type size: C{int}
    """
    if size == 0:
        # Fast path: discard everything.
        self._buffer = StringIO()
        self._len_changed = True
        return

    # Keep only the first *size* bytes, then restore the caller's position.
    pos = self.tell()
    self.seek(0)
    head = self.read(size)

    self._buffer = StringIO()
    self._buffer.write(head)
    self.seek(pos)
    self._len_changed = True
def _decode(self, headers, fileobj):
    """Undo the part's Content-Transfer-Encoding, returning a file-like object.

    headers -- list of parsed header blocks; the innermost (last) one's
               content-transfer-encoding is consulted, defaulting to 7bit.
    fileobj -- the raw (still encoded) payload stream.
    Unknown/identity encodings return fileobj unchanged.
    """
    encoding = headers[-1].get_all("content-transfer-encoding", ["7bit"])[0]
    encoding = encoding.lower()

    if encoding == "base64":
        try:
            data = base64.b64decode(fileobj.read())
        except TypeError as error:
            # b64decode raises TypeError on malformed input here.
            self.log.error("Base64 decoding failed ({0})".format(error))
            # NOTE(review): idiokit.stop presumably aborts this coroutine,
            # so the return below is only reached on success — confirm
            # against idiokit's semantics.
            idiokit.stop(False)
        return StringIO(data)

    if encoding == "quoted-printable":
        output = StringIO()
        quopri.decode(fileobj, output)
        # Rewind so callers can read the decoded content from the start.
        output.seek(0)
        return output

    return fileobj
def format(self, parts, events, filename, *args):
    """Zip the formatted event data and append it to *parts* as a MIME attachment.

    filename determines both the archive member name and the attachment
    name; a '.zip' extension is added unless already present.
    """
    prefix, ext = os.path.splitext(filename)
    if ext.lower() == ".zip":
        zip_name, raw_name = filename, prefix
    else:
        zip_name, raw_name = filename + ".zip", filename

    payload = self.formatter.format(parts, events, *args)

    # Build the zip archive entirely in memory.
    memfile = StringIO()
    archive = zipfile.ZipFile(memfile, 'w', zipfile.ZIP_DEFLATED)
    archive.writestr(raw_name, payload.encode("utf-8"))
    archive.close()
    memfile.flush()
    memfile.seek(0)

    part = MIMEBase("application", "zip")
    part.set_payload(memfile.read())
    encode_base64(part)
    part.add_header("Content-Disposition", "attachment", filename=zip_name)
    parts.append(part)
    return u""