The following 49 code examples, extracted from open source Python projects, illustrate how to use StringIO.StringIO().
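Note that the StringIO module these examples rely on is Python 2 only; Python 3 moved the equivalent classes into io (io.StringIO for text, io.BytesIO for bytes). A minimal compatibility import, shown here as a common pattern rather than code from any of the projects below:

try:
    from cStringIO import StringIO  # Python 2: fast C implementation
except ImportError:
    try:
        from StringIO import StringIO  # Python 2: pure-Python fallback
    except ImportError:
        from io import StringIO  # Python 3: text only; use io.BytesIO for bytes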
def get_iam_credential_report(self):
    report = None
    while report == None:
        try:
            report = self.iam_client.get_credential_report()
        except botocore.exceptions.ClientError as e:
            if 'ReportNotPresent' in e.message:
                self.iam_client.generate_credential_report()
            else:
                raise e
            time.sleep(5)
    document = StringIO.StringIO(report['Content'])
    reader = csv.DictReader(document)
    report_rows = []
    for row in reader:
        report_rows.append(row)
    return report_rows
def scan_file(self, this_file):
    """ Submit a file to be scanned by VirusTotal

    :param this_file: File to be scanned (32MB file size limit)
    :return: JSON response that contains scan_id and permalink.
    """
    params = {'apikey': self.api_key}
    try:
        if type(this_file) == str and os.path.isfile(this_file):
            files = {'file': (this_file, open(this_file, 'rb'))}
        elif isinstance(this_file, StringIO.StringIO):
            files = {'file': this_file.read()}
        else:
            files = {'file': this_file}
    except TypeError as e:
        return dict(error=e.message)

    try:
        response = requests.post(self.base + 'file/scan', files=files, params=params,
                                 proxies=self.proxies)
    except requests.RequestException as e:
        return dict(error=e.message)

    return _return_response_and_status_code(response)
def encode_public_key(self):
    """
    Based on spotnab, this is the gzipped version of the key
    with base64 applied to it. We encode it as such and return it.
    """
    fileobj = StringIO()
    with GzipFile(fileobj=fileobj, mode="wb") as f:
        try:
            f.write(self.public_pem())
        except TypeError:
            # It wasn't initialized yet
            return None
    return b64encode(fileobj.getvalue())
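The excerpt above only shows the encoder. A matching decoder would simply reverse the two steps; the helper below is a hypothetical sketch (decode_public_key is not part of the original project excerpt):

from base64 import b64decode
from gzip import GzipFile
from StringIO import StringIO

def decode_public_key(encoded):
    # Hypothetical inverse of encode_public_key(): base64-decode,
    # then gunzip to recover the original PEM text.
    fileobj = StringIO(b64decode(encoded))
    with GzipFile(fileobj=fileobj, mode="rb") as f:
        return f.read()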
def test_pop_zipfile():
    sio = StringIO()
    zf = wheel.install.VerifyingZipFile(sio, 'w')
    zf.writestr("one", b"first file")
    zf.writestr("two", b"second file")
    zf.close()

    try:
        zf.pop()
    except RuntimeError:
        pass  # already closed
    else:
        raise Exception("expected RuntimeError")

    zf = wheel.install.VerifyingZipFile(sio, 'a')
    zf.pop()
    zf.close()

    zf = wheel.install.VerifyingZipFile(sio, 'r')
    assert len(zf.infolist()) == 1
def test_validation_fails_on_upload(self, mock_open):
    invalid_file = StringIO.StringIO()
    invalid_file.write(INVALID_CSV)
    mock_upload = MockFieldStorage(invalid_file, 'invalid.csv')

    dataset = factories.Dataset()

    invalid_stream = io.BufferedReader(io.BytesIO(INVALID_CSV))

    with mock.patch('io.open', return_value=invalid_stream):
        with assert_raises(t.ValidationError) as e:
            call_action(
                'resource_create',
                package_id=dataset['id'],
                format='CSV',
                upload=mock_upload
            )

    assert 'validation' in e.exception.error_dict
    assert 'missing-value' in str(e.exception)
    assert 'Row 2 has a missing value in column 4' in str(e.exception)
def test_validation_passes_on_upload(self, mock_open):
    invalid_file = StringIO.StringIO()
    invalid_file.write(VALID_CSV)
    mock_upload = MockFieldStorage(invalid_file, 'invalid.csv')

    dataset = factories.Dataset()

    valid_stream = io.BufferedReader(io.BytesIO(VALID_CSV))

    with mock.patch('io.open', return_value=valid_stream):
        resource = call_action(
            'resource_create',
            package_id=dataset['id'],
            format='CSV',
            upload=mock_upload
        )

    assert_equals(resource['validation_status'], 'success')
    assert 'validation_timestamp' in resource
def test_schema_upload_field(self, mock_open):
    schema_file = StringIO.StringIO('{"fields":[{"name":"category"}]}')
    mock_upload = MockFieldStorage(schema_file, 'schema.json')
    dataset = factories.Dataset()

    resource = call_action(
        'resource_create',
        package_id=dataset['id'],
        url='http://example.com/file.csv',
        schema_upload=mock_upload
    )

    assert_equals(resource['schema'], {'fields': [{'name': 'category'}]})
    assert 'schema_upload' not in resource
    assert 'schema_url' not in resource
def test_apiHostCollocation(self):
    app = self._rhDom.createApplication("/waveforms/through_w/through_w.sad.xml")

    provides_ports = object.__getattribute__(app, '_providesPortDict')
    self.assertEquals(provides_ports, {})
    uses_ports = object.__getattribute__(app, '_usesPortDict')
    self.assertEquals(uses_ports, {})

    _destfile = StringIO.StringIO()
    app.api(destfile=_destfile)

    provides_ports = object.__getattribute__(app, '_providesPortDict')
    self.assertEquals(len(provides_ports), 1)
    self.assertEquals(provides_ports.keys()[0], 'input')
    self.assertEquals(provides_ports['input']['Port Interface'], 'IDL:CF/LifeCycle:1.0')
    self.assertEquals(provides_ports['input']['Port Name'], 'input')

    uses_ports = object.__getattribute__(app, '_usesPortDict')
    self.assertEquals(uses_ports.keys()[0], 'output')
    self.assertEquals(uses_ports['output']['Port Interface'], 'IDL:CF/LifeCycle:1.0')
    self.assertEquals(uses_ports['output']['Port Name'], 'output')
def dumpPackets(self, pkt_start=0, pkt_end=None, payload_start=0, payload_end=40,
                raw_payload=False, header_only=False, use_pager=True):
    genf = self._gen_packet(self.raw_data_, pkt_start)
    if pkt_end == None:
        pkt_end = self.npkts_
    else:
        pkt_end = pkt_end + 1
    res = StringIO()
    for i, pkt in enumerate(genf, pkt_start):
        if i < pkt_end:
            print >>res, 'Packet: ', str(i)
            print >>res, pkt.header_and_payload(payload_start, payload_end,
                                                header_only=header_only, raw=raw_payload)
        else:
            break
    if use_pager:
        _helpers.Pager(res.getvalue())
    else:
        print res.getvalue()
def parseString(inString):
    from StringIO import StringIO
    doc = parsexml_(StringIO(inString))
    rootNode = doc.getroot()
    rootTag, rootClass = get_root_tag(rootNode)
    if rootClass is None:
        rootTag = 'softPkg'
        rootClass = softPkg
    rootObj = rootClass.factory()
    rootObj.build(rootNode)
    # Enable Python to collect the space used by the DOM.
    doc = None
    ## sys.stdout.write('<?xml version="1.0" ?>\n')
    ## rootObj.export(sys.stdout, 0, name_="softPkg",
    ##     namespacedef_='')
    return rootObj
def parseString(inString):
    from StringIO import StringIO
    doc = parsexml_(StringIO(inString))
    rootNode = doc.getroot()
    rootTag, rootClass = get_root_tag(rootNode)
    if rootClass is None:
        rootTag = 'devicepkg'
        rootClass = devicepkg
    rootObj = rootClass.factory()
    rootObj.build(rootNode)
    # Enable Python to collect the space used by the DOM.
    doc = None
    ## sys.stdout.write('<?xml version="1.0" ?>\n')
    ## rootObj.export(sys.stdout, 0, name_="devicepkg",
    ##     namespacedef_='')
    return rootObj
def parseString(inString):
    from StringIO import StringIO
    doc = parsexml_(StringIO(inString))
    rootNode = doc.getroot()
    rootTag, rootClass = get_root_tag(rootNode)
    if rootClass is None:
        rootTag = 'deviceconfiguration'
        rootClass = deviceconfiguration
    rootObj = rootClass.factory()
    rootObj.build(rootNode)
    # Enable Python to collect the space used by the DOM.
    doc = None
    ## sys.stdout.write('<?xml version="1.0" ?>\n')
    ## rootObj.export(sys.stdout, 0, name_="deviceconfiguration",
    ##     namespacedef_='')
    return rootObj
def parseString(inString):
    from StringIO import StringIO
    doc = parsexml_(StringIO(inString))
    rootNode = doc.getroot()
    rootTag, rootClass = get_root_tag(rootNode)
    if rootClass is None:
        rootTag = 'profile'
        rootClass = profile
    rootObj = rootClass.factory()
    rootObj.build(rootNode)
    # Enable Python to collect the space used by the DOM.
    doc = None
    ## sys.stdout.write('<?xml version="1.0" ?>\n')
    ## rootObj.export(sys.stdout, 0, name_="profile",
    ##     namespacedef_='')
    return rootObj
def parseString(inString):
    from StringIO import StringIO
    doc = parsexml_(StringIO(inString))
    rootNode = doc.getroot()
    rootTag, rootClass = get_root_tag(rootNode)
    if rootClass is None:
        rootTag = 'softwareassembly'
        rootClass = softwareassembly
    rootObj = rootClass.factory()
    rootObj.build(rootNode)
    # Enable Python to collect the space used by the DOM.
    doc = None
    ## sys.stdout.write('<?xml version="1.0" ?>\n')
    ## rootObj.export(sys.stdout, 0, name_="softwareassembly",
    ##     namespacedef_='')
    return rootObj
def parseString(inString):
    from StringIO import StringIO
    doc = parsexml_(StringIO(inString))
    rootNode = doc.getroot()
    rootTag, rootClass = get_root_tag(rootNode)
    if rootClass is None:
        rootTag = 'softwarecomponent'
        rootClass = softwarecomponent
    rootObj = rootClass.factory()
    rootObj.build(rootNode)
    # Enable Python to collect the space used by the DOM.
    doc = None
    ## sys.stdout.write('<?xml version="1.0" ?>\n')
    ## rootObj.export(sys.stdout, 0, name_="softwarecomponent",
    ##     namespacedef_='')
    return rootObj
def parseString(inString):
    from StringIO import StringIO
    doc = parsexml_(StringIO(inString))
    rootNode = doc.getroot()
    rootTag, rootClass = get_root_tag(rootNode)
    if rootClass is None:
        rootTag = 'properties'
        rootClass = properties
    rootObj = rootClass.factory()
    rootObj.build(rootNode)
    # Enable Python to collect the space used by the DOM.
    doc = None
    ## sys.stdout.write('<?xml version="1.0" ?>\n')
    ## rootObj.export(sys.stdout, 0, name_="properties",
    ##     namespacedef_='')
    return rootObj
def toXML(self, level=0, version="2.2.2"):
    value = None
    if self.defvalue != None:
        value = to_xmlvalue(self.defvalue, self.type_)
    simp = ossie.parsers.prf.simple(id_=self.id_,
                                    type_=self.type_,
                                    name=self.name,
                                    mode=self.mode,
                                    description=self.__doc__,
                                    value=value,
                                    units=self.units,
                                    action=ossie.parsers.prf.action(type_=self.action))
    for kind in self.kinds:
        simp.add_kind(ossie.parsers.prf.kind(kindtype=kind))
    xml = StringIO.StringIO()
    simp.export(xml, level, name_='simple')
    return xml.getvalue()
def _body(self):
    try:
        read_func = self.environ['wsgi.input'].read
    except KeyError:
        self.environ['wsgi.input'] = BytesIO()
        return self.environ['wsgi.input']
    body_iter = self._iter_chunked if self.chunked else self._iter_body
    body, body_size, is_temp_file = BytesIO(), 0, False
    for part in body_iter(read_func, self.MEMFILE_MAX):
        body.write(part)
        body_size += len(part)
        if not is_temp_file and body_size > self.MEMFILE_MAX:
            body, tmp = TemporaryFile(mode='w+b'), body
            body.write(tmp.getvalue())
            del tmp
            is_temp_file = True
    self.environ['wsgi.input'] = body
    body.seek(0)
    return body
def _check_rst_data(self, data):
    """Returns warnings when the provided data doesn't compile."""
    source_path = StringIO()
    parser = Parser()
    settings = frontend.OptionParser().get_default_values()
    settings.tab_width = 4
    settings.pep_references = None
    settings.rfc_references = None
    reporter = SilentReporter(source_path,
                              settings.report_level,
                              settings.halt_level,
                              stream=settings.warning_stream,
                              debug=settings.debug,
                              encoding=settings.error_encoding,
                              error_handler=settings.error_encoding_error_handler)

    document = nodes.document(settings, reporter, source=source_path)
    document.note_source(source_path, -1)
    try:
        parser.parse(data, document)
    except AttributeError:
        reporter.messages.append((-1, 'Could not finish the parsing.', '', {}))

    return reporter.messages
def parseString(string, handler, errorHandler=ErrorHandler()):
    try:
        from cStringIO import StringIO
    except ImportError:
        from StringIO import StringIO

    if errorHandler is None:
        errorHandler = ErrorHandler()
    parser = make_parser()
    parser.setContentHandler(handler)
    parser.setErrorHandler(errorHandler)

    inpsrc = InputSource()
    inpsrc.setByteStream(StringIO(string))
    parser.parse(inpsrc)

# this is the parser list used by the make_parser function if no
# alternatives are given as parameters to the function
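For context, the stdlib function above backs xml.sax.parseString; a minimal usage sketch (the handler and XML string are made up for illustration):

import xml.sax

class ElementCounter(xml.sax.ContentHandler):
    def __init__(self):
        xml.sax.ContentHandler.__init__(self)
        self.count = 0
    def startElement(self, name, attrs):
        self.count += 1

handler = ElementCounter()
xml.sax.parseString("<doc><title/><body/></doc>", handler)
print handler.count   # 3 elements: doc, title, body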
def gzip_encode(data):
    """data -> gzip encoded data

    Encode data using the gzip content encoding as described in RFC 1952
    """
    if not gzip:
        raise NotImplementedError
    f = StringIO.StringIO()
    gzf = gzip.GzipFile(mode="wb", fileobj=f, compresslevel=1)
    gzf.write(data)
    gzf.close()
    encoded = f.getvalue()
    f.close()
    return encoded

##
# Decode a string using the gzip content encoding such as specified by the
# Content-Encoding: gzip
# in the HTTP header, as described in RFC 1952
#
# @param data The encoded data
# @return the unencoded data
# @raises ValueError if data is not correctly coded.
def gzip_decode(data):
    """gzip encoded data -> unencoded data

    Decode data using the gzip content encoding as described in RFC 1952
    """
    if not gzip:
        raise NotImplementedError
    f = StringIO.StringIO(data)
    gzf = gzip.GzipFile(mode="rb", fileobj=f)
    try:
        decoded = gzf.read()
    except IOError:
        raise ValueError("invalid data")
    f.close()
    gzf.close()
    return decoded

##
# Return a decoded file-like object for the gzip encoding
# as described in RFC 1952.
#
# @param response A stream supporting a read() method
# @return a file-like object that the decoded data can be read() from
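The two helpers above are inverses; a quick round-trip check (a sketch, assuming the surrounding module has imported gzip and StringIO as the functions expect):

payload = "hello world" * 100
compressed = gzip_encode(payload)
assert len(compressed) < len(payload)   # repetitive data compresses well
assert gzip_decode(compressed) == payload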
def rollover(self):
    if self._rolled:
        return
    file = self._file
    newfile = self._file = TemporaryFile(*self._TemporaryFileArgs)
    del self._TemporaryFileArgs

    newfile.write(file.getvalue())
    newfile.seek(file.tell(), 0)

    self._rolled = True

# The method caching trick from NamedTemporaryFile
# won't work here, because _file may change from a
# _StringIO instance to a real file. So we list
# all the methods directly.

# Context management protocol
def getbodytext(self, decode=1):
    """Return the message's body text as string.

    This undoes a Content-Transfer-Encoding, but does not interpret
    other MIME features (e.g. multipart messages). To suppress
    decoding, pass 0 as an argument."""
    self.fp.seek(self.startofbody)
    encoding = self.getencoding()
    if not decode or encoding in ('', '7bit', '8bit', 'binary'):
        return self.fp.read()
    try:
        from cStringIO import StringIO
    except ImportError:
        from StringIO import StringIO
    output = StringIO()
    mimetools.decode(self.fp, output, encoding)
    return output.getvalue()
def get_message(self, key):
    """Return a Message representation or raise a KeyError."""
    start, stop = self._lookup(key)
    self._file.seek(start)
    self._file.readline()   # Skip '1,' line specifying labels.
    original_headers = StringIO.StringIO()
    while True:
        line = self._file.readline()
        if line == '*** EOOH ***' + os.linesep or line == '':
            break
        original_headers.write(line.replace(os.linesep, '\n'))
    visible_headers = StringIO.StringIO()
    while True:
        line = self._file.readline()
        if line == os.linesep or line == '':
            break
        visible_headers.write(line.replace(os.linesep, '\n'))
    body = self._file.read(stop - self._file.tell()).replace(os.linesep, '\n')
    msg = BabylMessage(original_headers.getvalue() + body)
    msg.set_visible(visible_headers.getvalue())
    if key in self._labels:
        msg.set_labels(self._labels[key])
    return msg
def get_string(self, key):
    """Return a string representation or raise a KeyError."""
    start, stop = self._lookup(key)
    self._file.seek(start)
    self._file.readline()   # Skip '1,' line specifying labels.
    original_headers = StringIO.StringIO()
    while True:
        line = self._file.readline()
        if line == '*** EOOH ***' + os.linesep or line == '':
            break
        original_headers.write(line.replace(os.linesep, '\n'))
    while True:
        line = self._file.readline()
        if line == os.linesep or line == '':
            break
    return original_headers.getvalue() + \
           self._file.read(stop - self._file.tell()).replace(os.linesep, '\n')
def _get_result(response, limit=None):
    if limit == '0':
        result = response.read(224 * 1024)
    elif limit:
        result = response.read(int(limit) * 1024)
    else:
        result = response.read(5242880)
    try:
        encoding = response.info().getheader('Content-Encoding')
    except:
        encoding = None
    if encoding == 'gzip':
        result = gzip.GzipFile(fileobj=StringIO.StringIO(result)).read()
    return result
def __init__(self, sock, mode='rb', bufsize=-1, close=False):
    self._sock = sock
    self.mode = mode   # Not actually used in this version
    if bufsize < 0:
        bufsize = self.default_bufsize
    self.bufsize = bufsize
    self.softspace = False
    # _rbufsize is the suggested recv buffer size. It is *strictly*
    # obeyed within readline() for recv calls. If it is larger than
    # default_bufsize it will be used for recv calls within read().
    if bufsize == 0:
        self._rbufsize = 1
    elif bufsize == 1:
        self._rbufsize = self.default_bufsize
    else:
        self._rbufsize = bufsize
    self._wbufsize = bufsize
    # We use StringIO for the read buffer to avoid holding a list
    # of variously sized string objects which have been known to
    # fragment the heap due to how they are malloc()ed and often
    # realloc()ed down much smaller than their original allocation.
    self._rbuf = StringIO()
    self._wbuf = []   # A list of strings
    self._wbuf_len = 0
    self._close = close
def ini2value(ini_content):
    """
    INI FILE CONTENT TO Data
    """
    from ConfigParser import ConfigParser

    buff = StringIO.StringIO(ini_content)
    config = ConfigParser()
    config._read(buff, "dummy")

    output = {}
    for section in config.sections():
        output[section] = s = {}
        for k, v in config.items(section):
            s[k] = v

    return wrap(output)
def test_base(self):
    pipe = test_helper.get_mock_pipeline([helper.RUN_PIPELINE])
    _strings = strings.Subscriber(pipe)
    _strings.setup({
        'min_string_length': 4,
        'max_lines': 2
    })
    doc = document.get_document('mock')
    doc.set_size(12345)
    _strings.consume(doc, StringIO('AAAA\x00BBBB\x00CCCC'))

    # Two child documents produced.
    self.assertEquals(2, len(pipe.consumer.produced))

    expected = 'mock.00000.child'
    actual = pipe.consumer.produced[0][0].path
    self.assertEquals(expected, actual)
def _decompressContent(response, new_content):
    content = new_content
    try:
        encoding = response.get('content-encoding', None)
        if encoding in ['gzip', 'deflate']:
            if encoding == 'gzip':
                content = gzip.GzipFile(fileobj=StringIO.StringIO(new_content)).read()
            if encoding == 'deflate':
                content = zlib.decompress(content)
            response['content-length'] = str(len(content))
            # Record the historical presence of the encoding in a way that won't interfere.
            response['-content-encoding'] = response['content-encoding']
            del response['content-encoding']
    except IOError:
        content = ""
        raise FailedToDecompressContent(
            _("Content purported to be compressed with %s but failed to decompress.")
            % response.get('content-encoding'),
            response, content)
    return content
def render(file_name):
    """
    This function makes use of slsutil salt module to render sls files

    Args:
        file_name (str): the sls file path
    """
    err = StringIO.StringIO()
    out = StringIO.StringIO()
    exception = None
    with redirect_stderr(err):
        with redirect_stdout(out):
            try:
                result = SLSRenderer.caller.cmd('slsutil.renderer', file_name)
            except salt.exceptions.SaltException as ex:
                exception = StageRenderingException(file_name, ex.strerror)
    if exception:  # pylint: disable=E0702
        raise exception
    logger.info("Rendered SLS file %s, stdout\n%s", file_name, out.getvalue())
    logger.debug("Rendered SLS file %s, stderr\n%s", file_name, err.getvalue())
    return result, out.getvalue(), err.getvalue()
def topngbytes(name, rows, x, y, **k):
    """Convenience function for creating a PNG file "in memory" as a
    string.  Creates a :class:`Writer` instance using the keyword
    arguments, then passes `rows` to its :meth:`Writer.write` method.
    The resulting PNG file is returned as a string.  `name` is used
    to identify the file for debugging.
    """
    import os

    print(name)
    f = BytesIO()
    w = Writer(x, y, **k)
    w.write(f, rows)
    if os.environ.get('PYPNG_TEST_TMP'):
        w = open(name, 'wb')
        w.write(f.getvalue())
        w.close()
    return f.getvalue()
def testPtrns(self):
    "Test colour type 3 and tRNS chunk (and 4-bit palette)."
    a = (50, 99, 50, 50)
    b = (200, 120, 120, 80)
    c = (255, 255, 255)
    d = (200, 120, 120)
    e = (50, 99, 50)
    w = Writer(3, 3, bitdepth=4, palette=[a, b, c, d, e])
    f = BytesIO()
    w.write_array(f, array('B', (4, 3, 2, 3, 2, 0, 2, 0, 1)))
    r = Reader(bytes=f.getvalue())
    x, y, pixels, meta = r.asRGBA8()
    self.assertEqual(x, 3)
    self.assertEqual(y, 3)
    c = c + (255,)
    d = d + (255,)
    e = e + (255,)
    boxed = [(e, d, c), (d, c, a), (c, a, b)]
    flat = map(lambda row: itertools.chain(*row), boxed)
    self.assertEqual(map(list, pixels), map(list, flat))
def testPAMin(self):
    """Test that the command line tool can read PAM file."""
    def do():
        return _main(['testPAMin'])
    s = BytesIO()
    s.write(strtobytes('P7\nWIDTH 3\nHEIGHT 1\nDEPTH 4\nMAXVAL 255\n'
                       'TUPLTYPE RGB_ALPHA\nENDHDR\n'))
    # The pixels in flat row flat pixel format
    flat = [255, 0, 0, 255, 0, 255, 0, 120, 0, 0, 255, 30]
    asbytes = seqtobytes(flat)
    s.write(asbytes)
    s.flush()
    s.seek(0)
    o = BytesIO()
    testWithIO(s, o, do)
    r = Reader(bytes=o.getvalue())
    x, y, pixels, meta = r.read()
    self.assertTrue(r.alpha)
    self.assertTrue(not r.greyscale)
    self.assertEqual(list(itertools.chain(*pixels)), flat)
def fetch_quote(symbols, timestamp, cached_file=None):
    url = URL % '+'.join(symbols)
    if not cached_file:
        # fetch
        log('Fetching %s' % url)
        fp = urllib.urlopen(url)
        try:
            data = fp.read()
        finally:
            fp.close()
        # log result
        if LOG_DATA_FETCHED:
            log_filename = LOG_FILENAME % timestamp.replace(':', '-')
            out = open(log_filename, 'wb')
            try:
                log('Fetched %s bytes logged in %s' % (len(data), log_filename))
                out.write(data)
            finally:
                out.close()
    else:
        data = open(cached_file, 'rb').read()
    return StringIO(data)
def request(self, endpoint, post=None):
    buffer = BytesIO()

    ch = pycurl.Curl()
    ch.setopt(pycurl.URL, Constants.API_URL + endpoint)
    ch.setopt(pycurl.USERAGENT, self.userAgent)
    ch.setopt(pycurl.WRITEFUNCTION, buffer.write)
    ch.setopt(pycurl.FOLLOWLOCATION, True)
    ch.setopt(pycurl.HEADER, True)
    ch.setopt(pycurl.VERBOSE, False)
    ch.setopt(pycurl.COOKIEFILE, os.path.join(self.IGDataPath, self.username,
                                              self.username + "-cookies.dat"))
    ch.setopt(pycurl.COOKIEJAR, os.path.join(self.IGDataPath, self.username,
                                             self.username + "-cookies.dat"))

    if post is not None:
        ch.setopt(pycurl.POST, True)
        ch.setopt(pycurl.POSTFIELDS, post)

    if self.proxy:
        ch.setopt(pycurl.PROXY, self.proxyHost)
        if self.proxyAuth:
            ch.setopt(pycurl.PROXYUSERPWD, self.proxyAuth)

    ch.perform()
    resp = buffer.getvalue()
    header_len = ch.getinfo(pycurl.HEADER_SIZE)
    header = resp[0:header_len]
    body = resp[header_len:]
    ch.close()

    if self.debug:
        print("REQUEST: " + endpoint)
        if post is not None:
            if not isinstance(post, list):
                print("DATA: " + str(post))
        print("RESPONSE: " + body)

    return [header, json_decode(body)]
def run_test(innerHTML, input, expected, errors, treeClass):
    try:
        p = html5parser.HTMLParser(tree=treeClass["builder"])
        if innerHTML:
            document = p.parseFragment(StringIO.StringIO(input), innerHTML)
        else:
            document = p.parse(StringIO.StringIO(input))
    except constants.DataLossWarning:
        # Ignore testcases we know we don't pass
        return

    document = treeClass.get("adapter", lambda x: x)(document)
    try:
        output = convertTokens(treeClass["walker"](document))
        output = attrlist.sub(sortattrs, output)
        expected = attrlist.sub(sortattrs, convertExpected(expected))
        assert expected == output, "\n".join([
            "", "Input:", input,
            "", "Expected:", expected,
            "", "Received:", output
        ])
    except NotImplementedError:
        pass  # Amnesty for those that confess...
def loadRecord(line):
    """Load one CSV record from a line of text."""
    input_line = StringIO.StringIO(line)
    # row = unicodecsv.reader(input_line, encoding="utf-8")
    # return row.next()
    # reader = csv.DictReader(input_line, fieldnames=["id", "qid1", "qid2",
    #                                                 "question1", "question2", "is_duplicate"])
    reader = csv.reader(input_line)
    return reader.next()
    # data = []
    # for row in reader:
    #     print row
    #     data.append([unicode(cell, "utf-8") for cell in row])
    # return data[0]
    # return reader.next()

# raw_data = sc.textFile(train_file_path).map(loadRecord)
# print raw_data.take(10)
def feed(self, markup):
    if isinstance(markup, bytes):
        markup = BytesIO(markup)
    elif isinstance(markup, unicode):
        markup = StringIO(markup)

    # Call feed() at least once, even if the markup is empty,
    # or the parser won't be initialized.
    data = markup.read(self.CHUNK_SIZE)
    try:
        self.parser = self.parser_for(self.soup.original_encoding)
        self.parser.feed(data)
        while len(data) != 0:
            # Now call feed() on the rest of the data, chunk by chunk.
            data = markup.read(self.CHUNK_SIZE)
            if len(data) != 0:
                self.parser.feed(data)
        self.parser.close()
    except (UnicodeDecodeError, LookupError, etree.ParserError), e:
        raise ParserRejectedMarkup(str(e))
def CurlPOST(url, data, cookie):
    c = pycurl.Curl()
    b = StringIO.StringIO()
    c.setopt(pycurl.URL, url)
    c.setopt(pycurl.POST, 1)
    c.setopt(pycurl.HTTPHEADER, ['Content-Type: application/json'])
    # c.setopt(pycurl.TIMEOUT, 10)
    c.setopt(pycurl.WRITEFUNCTION, b.write)
    c.setopt(pycurl.COOKIEFILE, cookie)
    c.setopt(pycurl.COOKIEJAR, cookie)
    c.setopt(pycurl.POSTFIELDS, data)
    c.perform()
    html = b.getvalue()
    b.close()
    c.close()
    return html
def __init__(self, body, mimetype='application/octet-stream',
             chunksize=DEFAULT_CHUNK_SIZE, resumable=False):
    """Create a new MediaInMemoryUpload.

    DEPRECATED: Use MediaIoBaseUpload with either io.TextIOBase or StringIO for
    the stream.

    Args:
      body: string, Bytes of body content.
      mimetype: string, Mime-type of the file or default of
        'application/octet-stream'.
      chunksize: int, File will be uploaded in chunks of this many bytes. Only
        used if resumable=True.
      resumable: bool, True if this is a resumable upload. False means upload
        in a single request.
    """
    fd = StringIO.StringIO(body)
    super(MediaInMemoryUpload, self).__init__(fd, mimetype,
                                              chunksize=chunksize,
                                              resumable=resumable)
def emit(events, stream=None, Dumper=Dumper,
         canonical=None, indent=None, width=None,
         allow_unicode=None, line_break=None):
    """
    Emit YAML parsing events into a stream.
    If stream is None, return the produced string instead.
    """
    getvalue = None
    if stream is None:
        from StringIO import StringIO
        stream = StringIO()
        getvalue = stream.getvalue
    dumper = Dumper(stream, canonical=canonical, indent=indent, width=width,
                    allow_unicode=allow_unicode, line_break=line_break)
    try:
        for event in events:
            dumper.emit(event)
    finally:
        dumper.dispose()
    if getvalue:
        return getvalue()
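This matches PyYAML's yaml.emit, which pairs with yaml.parse for an event-level round trip; a minimal sketch:

import yaml

# parse() yields low-level parsing events; emit() serializes them back.
events = yaml.parse("a: 1\nb: [2, 3]\n")
print yaml.emit(events)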
def test_object_pairs_hook(self):
    s = '{"xkd":1, "kcw":2, "art":3, "hxm":4, "qrt":5, "pad":6, "hoy":7}'
    p = [("xkd", 1), ("kcw", 2), ("art", 3), ("hxm", 4),
         ("qrt", 5), ("pad", 6), ("hoy", 7)]
    self.assertEqual(self.loads(s), eval(s))
    self.assertEqual(self.loads(s, object_pairs_hook=lambda x: x), p)
    self.assertEqual(self.json.load(StringIO(s),
                                    object_pairs_hook=lambda x: x), p)
    od = self.loads(s, object_pairs_hook=OrderedDict)
    self.assertEqual(od, OrderedDict(p))
    self.assertEqual(type(od), OrderedDict)
    # the object_pairs_hook takes priority over the object_hook
    self.assertEqual(self.loads(s, object_pairs_hook=OrderedDict,
                                object_hook=lambda x: None),
                     OrderedDict(p))
    # check that empty object literals work (see #17368)
    self.assertEqual(self.loads('{}', object_pairs_hook=OrderedDict),
                     OrderedDict())
    self.assertEqual(self.loads('{"empty": {}}',
                                object_pairs_hook=OrderedDict),
                     OrderedDict([('empty', OrderedDict())]))