The following 49 code examples, extracted from open-source Python projects, illustrate how to use io.StringIO().
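Before the project-level examples, here is a minimal, self-contained sketch of the core io.StringIO() behavior the snippets below rely on (all of it standard-library behavior):

import io

# An in-memory text buffer: accepts str and behaves like a file opened in text mode.
buf = io.StringIO()
buf.write("hello ")
buf.write("world\n")
print(buf.getvalue())   # 'hello world\n' -- whole contents, regardless of position

# A buffer can also be seeded with an initial value and read like a file.
buf = io.StringIO("line1\nline2\n")
print(buf.readline())   # 'line1\n'
buf.seek(0)             # rewind before re-reading
print(buf.read())       # 'line1\nline2\n'
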
def get_environ(self):
    env = {}
    # The following code snippet does not follow PEP8 conventions
    # but it's formatted the way it is for demonstration purposes
    # to emphasize the required variables and their values
    #
    # Required WSGI variables
    env['wsgi.version']      = (1, 0)
    env['wsgi.url_scheme']   = 'http'
    env['wsgi.input']        = StringIO(self.request_data)
    env['wsgi.errors']       = sys.stderr
    env['wsgi.multithread']  = False
    env['wsgi.multiprocess'] = False
    env['wsgi.run_once']     = False
    # Required CGI variables
    env['REQUEST_METHOD']    = self.request_method    # GET
    env['PATH_INFO']         = self.path              # /hello
    env['SERVER_NAME']       = self.server_name       # localhost
    env['SERVER_PORT']       = str(self.server_port)  # 8888
    return env

def __init__(self, max_size=0, mode='w+b', buffering=-1,
             encoding=None, newline=None,
             suffix=None, prefix=None, dir=None):
    if 'b' in mode:
        self._file = _io.BytesIO()
    else:
        # Setting newline="\n" avoids newline translation;
        # this is important because otherwise on Windows we'd
        # get double newline translation upon rollover().
        self._file = _io.StringIO(newline="\n")
    self._max_size = max_size
    self._rolled = False
    self._TemporaryFileArgs = {'mode': mode, 'buffering': buffering,
                               'suffix': suffix, 'prefix': prefix,
                               'encoding': encoding, 'newline': newline,
                               'dir': dir}

def rollover(self):
    if self._rolled:
        return
    file = self._file
    newfile = self._file = TemporaryFile(**self._TemporaryFileArgs)
    del self._TemporaryFileArgs

    newfile.write(file.getvalue())
    newfile.seek(file.tell(), 0)

    self._rolled = True

# The method caching trick from NamedTemporaryFile
# won't work here, because _file may change from a
# BytesIO/StringIO instance to a real file. So we list
# all the methods directly.

# Context management protocol

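The two methods above come from CPython's tempfile.SpooledTemporaryFile, where a StringIO (or BytesIO) serves as the in-memory stage before rollover. For orientation, a short sketch of the class from the outside; the behavior shown (buffer in memory until max_size is exceeded, then a transparent move to a real temporary file) is standard-library behavior:

import tempfile

# Data lives in an in-memory StringIO until max_size bytes are written,
# at which point rollover() transparently moves it to a real file on disk.
with tempfile.SpooledTemporaryFile(max_size=16, mode='w+') as spool:
    spool.write("short")        # still backed by StringIO
    spool.write("x" * 100)      # exceeds max_size -> rolled over to disk
    spool.seek(0)
    print(spool.read())
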
def run_in_real_python(self, code):
    real_stdout = sys.stdout
    py_stdout = io.StringIO()
    sys.stdout = py_stdout

    py_value = py_exc = None
    globs = {
        '__builtins__': __builtins__,
        '__name__': '__main__',
        '__doc__': None,
        '__package__': None,
    }

    try:
        py_value = eval(code, globs, globs)
    except AssertionError:          # pragma: no cover
        raise
    except Exception as e:
        py_exc = e
    finally:
        sys.stdout = real_stdout

    return py_value, py_exc, py_stdout

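The swap-and-restore pattern above (save sys.stdout, replace it with a StringIO, restore it in finally) is common enough that the standard library packages it: since Python 3.4, contextlib.redirect_stdout does the same swap as a context manager. A minimal sketch:

import contextlib
import io

buf = io.StringIO()
with contextlib.redirect_stdout(buf):
    print("captured")   # written to buf, not the terminal
assert buf.getvalue() == "captured\n"
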
def parse_ab3_defines(defines_file):  # , pkg_name):
    try:
        with open(defines_file, 'rt') as fp:
            abd_cont = fp.read()
    except OSError:
        print('[E] Failed to load autobuild defines file! Do you have read permission?')
        return False
    script = "ARCH={}\n".format(get_arch_name()) + abd_cont + \
        gen_laundry_list(['PKGNAME', 'PKGDEP', 'BUILDDEP'])
    try:
        # Better to be replaced by subprocess.Popen
        abd_out = subprocess.check_output(script, shell=True)
    except subprocess.CalledProcessError:
        print('[E] Malformed Autobuild defines file found! Couldn\'t continue!')
        return False
    abd_fp = io.StringIO('[wrap]\n' + abd_out.decode('utf-8'))
    abd_config = RawConfigParser()
    abd_config.read_file(abd_fp)
    abd_config_dict = {}
    for i in abd_config['wrap']:
        abd_config_dict[i.upper()] = abd_config['wrap'][i]
    return abd_config_dict

def _make_csv_writer(self):
    """
    :return:
    """
    self._buffer = StringIO()
    self._bytes_written = 0
    now = datetime.now()
    # Note: the original passed '%Y%m%d_%H%M%S.csv' with no {} placeholder,
    # so .format() silently dropped the random suffix; the companion JSON
    # writer below shows the intended pattern.
    self._out_csv = open(self.log_folder + '/' +
                         now.strftime('%Y%m%d_%H%M%S_{}.csv'.format(self.make_random(6))), 'w')
    logging.warning("Writing to {} ({} bytes)".format(self._out_csv.name, self.max_bytes))
    self._out_writer = csv.DictWriter(self._buffer, fieldnames=self.fieldnames, restval=None)
    self._out_writer.writeheader()
    self._out_csv.write(self._buffer.getvalue())
    self._reset_buffer()
    self.writerow({'vid': self.vin})

def _make_writer(self):
    """
    :return:
    """
    self._buffer = StringIO()
    self._bytes_written = 0
    now = datetime.now()
    self.fname = self.log_folder + '/' + now.strftime('%Y%m%d_%H%M%S_{}.json'.format(self.make_random(6)))
    self.fname = str(pathlib.Path(self.fname))
    self._out_fh = open(self.fname, 'w')
    self.write_pid()
    logging.warning("Writing to {} ({} bytes)".format(self._out_fh.name, self.max_bytes))
    # compress any old files still lying around
    for fname in glob(self.log_folder + "/*.json"):
        if fname != self.fname:
            self._compress(fname)

def xpm_as_matrix(buff, border):
    """\
    Returns the XPM QR code as list of [0, 1] lists.

    :param io.StringIO buff: Buffer to read the matrix from.
    """
    res = []
    img_data = _img_data(buff.getvalue())
    height = int(img_data[0].split(' ')[0])
    img_data = img_data[3:]
    for i, row in enumerate(img_data):
        if i < border:
            continue
        if i >= height - border:
            break
        r = row[border:-border] if border else row
        res.append([(1 if b == 'X' else 0) for b in r])
    return res

def test_base(self):
    pipe = test_helper.get_mock_pipeline([helper.RUN_PIPELINE])
    _strings = strings.Subscriber(pipe)
    _strings.setup({
        'min_string_length': 4,
        'max_lines': 2
    })
    doc = document.get_document('mock')
    doc.set_size(12345)
    _strings.consume(doc, StringIO('AAAA\x00BBBB\x00CCCC'))
    # Two child documents produced.
    self.assertEquals(2, len(pipe.consumer.produced))
    expected = 'mock.00000.child'
    actual = pipe.consumer.produced[0][0].path
    self.assertEquals(expected, actual)

def test_info(self, process, process_repr):
    for cls_or_obj in [ExampleProcess, process]:
        buf = StringIO()
        cls_or_obj.info(buf=buf)
        actual = buf.getvalue()
        assert actual == process_repr

    class EmptyProcess(Process):
        pass

    expected = dedent("""\
    Variables:
        *empty*
    Meta:
        time_dependent: True""")

    buf = StringIO()
    EmptyProcess.info(buf=buf)
    actual = buf.getvalue()
    assert actual == expected

def process_replay(tup):
    game_string, isValue = tup
    game = json.load(StringIO(game_string))
    assert game[0]['type'] == 'metadata'
    metadata = game[0]
    result = []
    try:
        if isValue:
            result = process_eseq_value(game)
        else:
            assert metadata['won'] == True, "Input for policy features should always be from winning side"
            for state in all_states(game):
                result.extend(training_features(state))
    except Exception as e:
        print("Warning: encountered error `%s` while processing replay %s; skipping"
              % (str(e), metadata['replay_id']), file=sys.stderr)
    return result

def feed(self, markup):
    if isinstance(markup, bytes):
        markup = BytesIO(markup)
    elif isinstance(markup, str):
        markup = StringIO(markup)

    # Call feed() at least once, even if the markup is empty,
    # or the parser won't be initialized.
    data = markup.read(self.CHUNK_SIZE)
    try:
        self.parser = self.parser_for(self.soup.original_encoding)
        self.parser.feed(data)
        while len(data) != 0:
            # Now call feed() on the rest of the data, chunk by chunk.
            data = markup.read(self.CHUNK_SIZE)
            if len(data) != 0:
                self.parser.feed(data)
        self.parser.close()
    except (UnicodeDecodeError, LookupError, etree.ParserError) as e:
        raise ParserRejectedMarkup(str(e))

def test_show_config(self):
    config = Config()
    stdout = io.StringIO()
    with replace(sys, 'stdout', stdout):
        show_config(config, flat=True)
    result = stdout.getvalue()
    lines = result.splitlines()
    self.assertIn('run.commands_module => commands.py', lines)
    self.assertIn('run.config_file => runcommands.tests:commands.cfg', lines)
    self.assertIn('run.env => None', lines)
    self.assertIn('run.default_env => None', lines)
    self.assertIn('run.options => ', lines)
    self.assertIn('run.echo => False', lines)
    self.assertIn('run.hide => False', lines)
    self.assertIn('run.debug => False', lines)
    self.assertIn('version => X.Y.Z', lines)

def capture_context_buffer(**kw):
    if kw.pop('bytes_io', False):
        buf = io.BytesIO()
    else:
        buf = io.StringIO()

    kw.update({
        'dialect_name': "sqlite",
        'output_buffer': buf
    })
    conf = EnvironmentContext.configure

    def configure(*arg, **opt):
        opt.update(**kw)
        return conf(*arg, **opt)

    with mock.patch.object(EnvironmentContext, "configure", configure):
        yield buf

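The bytes_io toggle above reflects a general rule worth stating once: io.StringIO holds str and io.BytesIO holds bytes, and writing the wrong type raises TypeError. A quick sketch of the distinction (standard-library behavior, not from the snippet's project):

import io

io.StringIO().write("text")     # OK: str into a text buffer
io.BytesIO().write(b"bytes")    # OK: bytes into a binary buffer

try:
    io.StringIO().write(b"bytes")   # wrong type for a text buffer
except TypeError as exc:
    print(exc)
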
def __init__(self, raw_email, debug=False):
    '''Setup the base options of the copy/convert setup'''
    self.raw_email = raw_email
    self.log_processing = StringIO()
    self.log_content = StringIO()
    self.tree(self.raw_email)

    twiggy_out = outputs.StreamOutput(formats.shell_format, stream=self.log_processing)
    emitters['*'] = filters.Emitter(levels.DEBUG, True, twiggy_out)
    self.log_name = log.name('files')

    self.cur_attachment = None

    self.debug = debug
    if self.debug:
        if not os.path.exists('debug_logs'):
            os.makedirs('debug_logs')
        self.log_debug_err = os.path.join('debug_logs', 'debug_stderr.log')
        self.log_debug_out = os.path.join('debug_logs', 'debug_stdout.log')
    else:
        self.log_debug_err = os.devnull
        self.log_debug_out = os.devnull

def build_forbidden(clause):
    if not isinstance(clause, AbstractForbiddenComponent):
        raise TypeError("build_forbidden must be called with an instance of "
                        "'%s', got '%s'" %
                        (AbstractForbiddenComponent, type(clause)))

    if not isinstance(clause, (ForbiddenEqualsClause, ForbiddenAndConjunction)):
        # Note: the original placed the format arguments outside the % pair,
        # which would itself raise a TypeError; they belong in one tuple.
        raise NotImplementedError("IRACE cannot handle '%s' of type %s" %
                                  (str(clause), type(clause)))

    retval = io.StringIO()
    retval.write("(")

    # Really simple because everything is an AND-conjunction of equals
    # conditions
    dlcs = clause.get_descendant_literal_clauses()
    for dlc in dlcs:
        if retval.tell() > 1:
            retval.write(" && ")
        retval.write("%s==%s" % (dlc.hyperparameter.name, dlc.value))

    retval.write(")")
    retval.seek(0)
    return retval.getvalue()

def build_forbidden(clause):
    if not isinstance(clause, AbstractForbiddenComponent):
        raise TypeError("build_forbidden must be called with an instance of "
                        "'%s', got '%s'" %
                        (AbstractForbiddenComponent, type(clause)))

    retval = StringIO()
    retval.write("{")

    # Really simple because everything is an AND-conjunction of equals
    # conditions
    dlcs = clause.get_descendant_literal_clauses()
    for dlc in dlcs:
        if retval.tell() > 1:
            retval.write(", ")
        retval.write("%s=%s" % (dlc.hyperparameter.name, dlc.value))

    retval.write("}")
    retval.seek(0)
    return retval.getvalue()

def build_forbidden(clause):
    if not isinstance(clause, AbstractForbiddenComponent):
        raise TypeError("build_forbidden must be called with an instance of "
                        "'%s', got '%s'" %
                        (AbstractForbiddenComponent, type(clause)))

    if not isinstance(clause, (ForbiddenEqualsClause, ForbiddenAndConjunction)):
        # Same mis-parenthesized % formatting as the IRACE variant above,
        # fixed the same way.
        raise NotImplementedError("SMAC cannot handle '%s' of type %s" %
                                  (str(clause), type(clause)))

    retval = StringIO()
    retval.write("{")

    # Really simple because everything is an AND-conjunction of equals
    # conditions
    dlcs = clause.get_descendant_literal_clauses()
    for dlc in dlcs:
        if retval.tell() > 1:
            retval.write(", ")
        retval.write("%s=%s" % (dlc.hyperparameter.name, dlc.value))

    retval.write("}")
    retval.seek(0)
    return retval.getvalue()

def __repr__(self) -> str:
    self._populate_values()

    representation = io.StringIO()
    representation.write("Configuration:\n")

    hyperparameters = self.configuration_space.get_hyperparameters()
    hyperparameters.sort(key=lambda t: t.name)
    for hyperparameter in hyperparameters:
        hp_name = hyperparameter.name
        if hp_name in self._values and self._values[hp_name] is not None:
            representation.write("  ")
            value = repr(self._values[hp_name])
            if isinstance(hyperparameter, Constant):
                representation.write("%s, Constant: %s" % (hp_name, value))
            else:
                representation.write("%s, Value: %s" % (hp_name, value))
            representation.write("\n")

    return representation.getvalue()

def remove_blank_lines(source):
    """
    Removes blank lines from *source* and returns the result.

    Example:

    .. code-block:: python

        test = "foo"

        test2 = "bar"

    Will become:

    .. code-block:: python

        test = "foo"
        test2 = "bar"
    """
    io_obj = io.StringIO(source)
    source = [a for a in io_obj.readlines() if a.strip()]
    return "".join(source)

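A quick usage check for the helper above; the expected result follows directly from its list comprehension, which keeps only lines containing non-whitespace characters:

source = "x = 1\n\n\ny = 2\n"
assert remove_blank_lines(source) == "x = 1\ny = 2\n"
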
def get_assembly_report(self, taxid):
    if self.ass_sum is None:
        self.get_assembly_summaries()
    df = self.ass_sum.query("taxid == {} & refseq_category == 'reference genome'".format(taxid))
    if len(df) == 0:
        # try "representative genome" (needed for mouse and rat)
        df = self.ass_sum.query("taxid == {} & refseq_category == 'representative genome'".format(taxid))
    if len(df) != 1:
        raise ValueError("unknown reference: {}".format(df))
    print(df)
    ftp_path = list(df.ftp_path)[0]
    assembly = os.path.split(ftp_path)[1]
    url = os.path.join(ftp_path, assembly + "_assembly_report.txt")
    print(url)
    # read the column names from the file
    table = request.urlopen(request.Request(url)).read().decode()
    names = [x for x in table.split("\n") if x.startswith("#")][-1].strip().replace("# ", "").split("\t")
    self.chr_df[taxid] = pd.read_csv(StringIO(table), sep="\t", names=names, comment='#')
    self.chr_df[taxid] = self.chr_df[taxid].rename(columns={
        'Sequence-Name': 'SequenceName', 'Sequence-Role': 'SequenceRole',
        'Assigned-Molecule': 'AssignedMolecule',
        'Assigned-Molecule-Location/Type': 'AssignedMoleculeLocationType',
        'GenBank-Accn': 'GenBankAccn', 'RefSeq-Accn': 'RefSeqAccn',
        'UCSC-style-name': 'UCSCstylename'})
    # print(self.chr_df[taxid].query("SequenceRole == 'assembled-molecule'"))

def _get_added(cls, diff):
    patches = PatchSet(StringIO(diff))
    diff_contents = []
    for p in patches:
        if p.added > 0:
            contents = []
            for h in p:
                added = []
                for i, line in enumerate(h):
                    if line.is_added:
                        added_line = Line(line.target_line_no, line.value, i + 1)
                        added.append(added_line)
                contents += added
            diff_contents.append(
                DiffContent(p.path, contents)
            )
    return diff_contents

def test_default(self):
    """Does the default method print the proper information?"""
    with patch('sys.stdout', new=StringIO()) as fake_out:
        self.my_log.describe(mode="default")
        output = fake_out.getvalue()
        self.assertIn("Log containing 4 records from local git created at ", output)
        self.assertIn("\nOrigin:", output)
        self.assertNotIn("Filters:", output)
        self.assertIn("\nNumber of authors: 4\n", output)
        self.assertIn("\nNumber of files: 7\n", output)
        self.assertIn("\nMost common email address domains:", output)
        self.assertIn("\n\t @gmail.com [4 users]\n", output)
        self.assertIn("\nDate range: 2016-05-20 09:19:20-04:00 to 2016-05-26 11:21:03-04:00\n", output)
        self.assertIn("\nChange distribution summary:\n", output)
        self.assertIn("\n\t Files changed: Mean = 2.75, SD = 0.829\n", output)
        self.assertIn("\n\t Line insertions: Mean = 2.75, SD = 0.829\n", output)
        self.assertIn("\n\t Line deletions: Mean = nan, SD = nan\n", output)
        self.assertIn("\nNumber of merges: 0\n", output)
        self.assertIn("\nNumber of parsing errors: 0\n", output)

def test_not_default(self):
    """Does a non-default method print the proper information?
    Note: At this point, default is the only setting so they end up being the same."""
    with patch('sys.stdout', new=StringIO()) as fake_out:
        self.my_log.describe(mode="not default")
        output = fake_out.getvalue()
        self.assertIn("Log containing 4 records from local git created at ", output)
        self.assertIn("\nOrigin:", output)
        self.assertNotIn("Filters:", output)
        self.assertIn("\nNumber of authors: 4\n", output)
        self.assertIn("\nNumber of files: 7\n", output)
        self.assertIn("\nMost common email address domains:", output)
        self.assertIn("\n\t @gmail.com [4 users]\n", output)
        self.assertIn("\nDate range: 2016-05-20 09:19:20-04:00 to 2016-05-26 11:21:03-04:00\n", output)
        self.assertIn("\nChange distribution summary:\n", output)
        self.assertIn("\n\t Files changed: Mean = 2.75, SD = 0.829\n", output)
        self.assertIn("\n\t Line insertions: Mean = 2.75, SD = 0.829\n", output)
        self.assertIn("\n\t Line deletions: Mean = nan, SD = nan\n", output)
        self.assertIn("\nNumber of merges: 0\n", output)
        self.assertIn("\nNumber of parsing errors: 0\n", output)

def test_whole(self):
    """Is the entire output as expected?"""
    with patch('sys.stdout', new=StringIO()) as fake_out:
        self.my_log.describe()
        out = fake_out.getvalue()
        self.assertRegex(
            out,
            "Log containing 4 records from local git created at ....-..-.. ..:..:..\.......\.\n"
            "Origin: .*\n"
            "Number of authors: 4\n"
            "Number of files: 7\n"
            "Most common email address domains:\n"
            "\t @gmail.com \[4 users\]\n"
            "Date range: 2016-05-20 09:19:20-04:00 to 2016-05-26 11:21:03-04:00\n"
            "Change distribution summary:\n"
            "\t Files changed: Mean = 2.75, SD = 0.829\n"
            "\t Line insertions: Mean = 2.75, SD = 0.829\n"
            "\t Line deletions: Mean = nan, SD = nan\n"
            "Number of merges: 0\n"
            "Number of parsing errors: 0\n")

def test_exclude(self):
    """Does exclude prevent statistics from being printed?"""
    with patch('sys.stdout', new=StringIO()) as fake_out:
        self.my_log.describe(exclude=['merges', 'errors', 'files', 'summary', 'changes',
                                      'path', 'filters', 'authors', 'dates', 'emails'])
        output = fake_out.getvalue()
        self.assertNotIn("Log containing 4 records from local git created at ", output)
        self.assertNotIn("\nOrigin:", output)
        self.assertNotIn("Filters:", output)
        self.assertNotIn("\nNumber of authors: 4\n", output)
        self.assertNotIn("\nNumber of files: 7\n", output)
        self.assertNotIn("\nMost common email address domains:", output)
        self.assertNotIn("\n\t @gmail.com [4 users]\n", output)
        self.assertNotIn("\nDate range: 2016-05-20 09:19:20-04:00 to 2016-05-26 11:21:03-04:00\n", output)
        self.assertNotIn("\nChange distribution summary:\n", output)
        self.assertNotIn("\n\t Files changed: Mean = 2.75, SD = 0.829\n", output)
        self.assertNotIn("\n\t Line insertions: Mean = 2.75, SD = 0.829\n", output)
        self.assertNotIn("\n\t Line deletions: Mean = nan, SD = nan\n", output)
        self.assertNotIn("\nNumber of merges: 0\n", output)
        self.assertNotIn("\nNumber of parsing errors: 0\n", output)
        self.assertEqual(output, "")

def emit(events, stream=None, Dumper=Dumper,
         canonical=None, indent=None, width=None,
         allow_unicode=None, line_break=None):
    """
    Emit YAML parsing events into a stream.
    If stream is None, return the produced string instead.
    """
    getvalue = None
    if stream is None:
        stream = io.StringIO()
        getvalue = stream.getvalue
    dumper = Dumper(stream, canonical=canonical, indent=indent, width=width,
                    allow_unicode=allow_unicode, line_break=line_break)
    try:
        for event in events:
            dumper.emit(event)
    finally:
        dumper.dispose()
    if getvalue:
        return getvalue()

def _insert_zone_names(self):
    """
    Args:
        _zonetree (str): set in __init__

    Returns:
        List of dicts (str: str) devices by zone
    """
    self._zone_name = None
    self._zonetree_io = StringIO(self._zonetree)
    self._zonetree_csv = csv.reader(self._zonetree_io, delimiter=',')
    self._zonetree_lod = []

    for self._row in self._zonetree_csv:
        if self._row[0] == '1':
            self._zone_name = self._row[1]
            if self._zone_name == 'Undefined':
                self._zone_name = ''
            continue
        for self._dev in self._devtree:
            if self._dev['ds_id'] == self._row[2]:
                self._dev['zone_name'] = self._zone_name
    return self._devtree

def textanalyze(self, index_name, analyzer, text):
    # Create JSON string for request body
    reqobject = {}
    reqobject['text'] = text
    reqobject['analyzer'] = analyzer
    io = StringIO()   # note: this local shadows the io module within the function
    json.dump(reqobject, io)
    req_body = io.getvalue()

    # HTTP request to Azure search REST API
    conn = httplib.HTTPSConnection(self.__api_url)
    conn.request("POST",
                 u"/indexes/{0}/analyze?api-version={1}".format(index_name, _AZURE_SEARCH_API_VERSION),
                 req_body, self.headers)
    response = conn.getresponse()
    # print("status:", response.status, response.reason)
    data = (response.read()).decode('utf-8')
    # print("data:{}".format(data))
    conn.close()
    return data

def topngbytes(name, rows, x, y, **k):
    """Convenience function for creating a PNG file "in memory" as a string.
    Creates a :class:`Writer` instance using the keyword arguments, then
    passes `rows` to its :meth:`Writer.write` method. The resulting PNG
    file is returned as a string. `name` is used to identify the file for
    debugging.
    """
    import os
    print(name)
    f = BytesIO()
    w = Writer(x, y, **k)
    w.write(f, rows)
    if os.environ.get('PYPNG_TEST_TMP'):
        w = open(name, 'wb')
        w.write(f.getvalue())
        w.close()
    return f.getvalue()

def testPtrns(self):
    "Test colour type 3 and tRNS chunk (and 4-bit palette)."
    a = (50, 99, 50, 50)
    b = (200, 120, 120, 80)
    c = (255, 255, 255)
    d = (200, 120, 120)
    e = (50, 99, 50)
    w = Writer(3, 3, bitdepth=4, palette=[a, b, c, d, e])
    f = BytesIO()
    w.write_array(f, array('B', (4, 3, 2, 3, 2, 0, 2, 0, 1)))
    r = Reader(bytes=f.getvalue())
    x, y, pixels, meta = r.asRGBA8()
    self.assertEqual(x, 3)
    self.assertEqual(y, 3)
    c = c + (255,)
    d = d + (255,)
    e = e + (255,)
    boxed = [(e, d, c), (d, c, a), (c, a, b)]
    flat = [itertools.chain(*row) for row in boxed]
    self.assertEqual(list(map(list, pixels)), list(map(list, flat)))

def testPAMin(self):
    """Test that the command line tool can read PAM file."""
    def do():
        return _main(['testPAMin'])
    s = BytesIO()
    s.write(strtobytes('P7\nWIDTH 3\nHEIGHT 1\nDEPTH 4\nMAXVAL 255\n'
                       'TUPLTYPE RGB_ALPHA\nENDHDR\n'))
    # The pixels in flat row flat pixel format
    flat = [255, 0, 0, 255, 0, 255, 0, 120, 0, 0, 255, 30]
    asbytes = seqtobytes(flat)
    s.write(asbytes)
    s.flush()
    s.seek(0)
    o = BytesIO()
    testWithIO(s, o, do)
    r = Reader(bytes=o.getvalue())
    x, y, pixels, meta = r.read()
    self.assertTrue(r.alpha)
    self.assertTrue(not r.greyscale)
    self.assertEqual(list(itertools.chain(*pixels)), flat)

def test_write_captions(self):
    os.makedirs(OUTPUT_DIR)
    copy(self._get_file('one_caption.vtt'), OUTPUT_DIR)

    out = io.StringIO()
    self.webvtt.read(os.path.join(OUTPUT_DIR, 'one_caption.vtt'))
    new_caption = Caption('00:00:07.000', '00:00:11.890',
                          ['New caption text line1', 'New caption text line2'])
    self.webvtt.captions.append(new_caption)
    self.webvtt.write(out)

    out.seek(0)
    lines = [line.rstrip() for line in out.readlines()]

    expected_lines = [
        'WEBVTT',
        '',
        '00:00:00.500 --> 00:00:07.000',
        'Caption text #1',
        '',
        '00:00:07.000 --> 00:00:11.890',
        'New caption text line1',
        'New caption text line2'
    ]
    self.assertListEqual(lines, expected_lines)

def test_copy_text(self):
    self.conn.set_client_encoding('latin1')
    self._create_temp_table()  # the above call closed the xn

    if sys.version_info[0] < 3:
        abin = ''.join(map(chr, list(range(32, 127)) + list(range(160, 256))))
        about = abin.decode('latin1').replace('\\', '\\\\')
    else:
        abin = bytes(list(range(32, 127)) + list(range(160, 256))).decode('latin1')
        about = abin.replace('\\', '\\\\')

    curs = self.conn.cursor()
    curs.execute('insert into tcopy values (%s, %s)', (42, abin))
    import io
    f = io.StringIO()
    curs.copy_to(f, 'tcopy', columns=('data',))
    f.seek(0)
    self.assertEqual(f.readline().rstrip(), about)

def _copy_from(self, curs, nrecs, srec, copykw):
    f = StringIO()
    for i, c in zip(range(nrecs), cycle(string.ascii_letters)):
        l = c * srec
        f.write("%s\t%s\n" % (i, l))

    f.seek(0)
    curs.copy_from(MinimalRead(f), "tcopy", **copykw)

    curs.execute("select count(*) from tcopy")
    self.assertEqual(nrecs, curs.fetchone()[0])

    curs.execute("select data from tcopy where id < %s order by id",
                 (len(string.ascii_letters),))
    for i, (l,) in enumerate(curs):
        self.assertEqual(l, string.ascii_letters[i] * srec)

def test_copy_no_column_limit(self):
    cols = ["c%050d" % i for i in range(200)]

    curs = self.conn.cursor()
    curs.execute('CREATE TEMPORARY TABLE manycols (%s)' % ',\n'.join(
        ["%s int" % c for c in cols]))
    curs.execute("INSERT INTO manycols DEFAULT VALUES")

    f = StringIO()
    curs.copy_to(f, "manycols", columns=cols)
    f.seek(0)
    self.assertEqual(f.read().split(), ['\\N'] * len(cols))

    f.seek(0)
    curs.copy_from(f, "manycols", columns=cols)
    curs.execute("select count(*) from manycols;")
    self.assertEqual(curs.fetchone()[0], 2)

def test_copy_rowcount(self):
    curs = self.conn.cursor()

    curs.copy_from(StringIO('aaa\nbbb\nccc\n'), 'tcopy', columns=['data'])
    self.assertEqual(curs.rowcount, 3)

    curs.copy_expert(
        "copy tcopy (data) from stdin",
        StringIO('ddd\neee\n'))
    self.assertEqual(curs.rowcount, 2)

    curs.copy_to(StringIO(), "tcopy")
    self.assertEqual(curs.rowcount, 5)

    curs.execute("insert into tcopy (data) values ('fff')")
    curs.copy_expert("copy tcopy to stdout", StringIO())
    self.assertEqual(curs.rowcount, 6)

def test_copy_text(self):
    self.conn.set_client_encoding('latin1')
    self._create_temp_table()  # the above call closed the xn

    if sys.version_info[0] < 3:
        abin = ''.join(map(chr, range(32, 127) + range(160, 256)))
        about = abin.decode('latin1').replace('\\', '\\\\')
    else:
        # range objects can't be concatenated with + on Python 3, so the
        # original line would raise TypeError here; wrap them in list().
        abin = bytes(list(range(32, 127)) + list(range(160, 256))).decode('latin1')
        about = abin.replace('\\', '\\\\')

    curs = self.conn.cursor()
    curs.execute('insert into tcopy values (%s, %s)', (42, abin))
    import io
    f = io.StringIO()
    curs.copy_to(f, 'tcopy', columns=('data',))
    f.seek(0)
    self.assertEqual(f.readline().rstrip(), about)

def _get_config(self, unit, filename):
    """Get a ConfigParser object for parsing a unit's config file."""
    file_contents = unit.file_contents(filename)

    # NOTE(beisner): by default, ConfigParser does not handle options
    # with no value, such as the flags used in the mysql my.cnf file.
    # https://bugs.python.org/issue7005
    config = configparser.ConfigParser(allow_no_value=True)
    # read_file() supersedes the readfp() alias used in the original,
    # which was deprecated in Python 3.2 and removed in 3.12.
    config.read_file(io.StringIO(file_contents))
    return config

def post(self):
    payload = {
        'owner': request.form['owner'],
        'package': request.form['package'],
        'data': request.form['data']
    }

    owner = request.form['owner']
    package = request.form['package']
    data = request.form['data']

    b = ENGINE.get_named_secret(owner)
    print(b)
    secret = rsa.decrypt(eval(b), KEY[1])

    # data is a python tuple of the templated solidity at index 0
    # and an example payload at index 1
    # compilation of this code should return true
    # if there are errors, don't commit it to the db
    # otherwise, commit it
    raw_data = decrypt(secret, eval(data))
    package_data = json.loads(raw_data.decode('utf8'))
    '''
    payload = {
        'tsol' : open(code_path[0]).read(),
        'example' : example
    }
    '''
    # assert that the code compiles with the provided example
    tsol.compile(StringIO(package_data['tsol']), package_data['example'])

    template = pickle.dumps(package_data['tsol'])
    example = pickle.dumps(package_data['example'])

    if ENGINE.add_package(owner, package, template, example) == True:
        return success_payload(None, 'Package successfully uploaded.')
    return error_payload('Problem uploading package. Try again.')

def _restart_data(self, format_: str = 'json') -> None:
    assert format_ == 'json'
    with open(join(CURDIR, 'data', 'helloworld.py')) as f:
        testcode = f.read()

    self.data = Request({
        'filepath': 'test.py',
        'action': 'ParseAST',
        'content': testcode,
        'language': 'python',
    })

    bufferclass = io.StringIO if format_ == 'json' else io.BytesIO

    # This will mock the python_driver stdin
    self.sendbuffer = bufferclass()
    # This will mock the python_driver stdout
    self.recvbuffer = bufferclass()

def load_test_fixture(fixture_path):
    path = os.path.dirname(os.path.abspath(__file__))
    fixture_file = open(path + '/' + fixture_path)
    input = fixture_file.read()
    fixture_file.close()

    sys.stdin = StringIO(input)
    sys.stdout = StringIO()

def load_data(file: str):
    with io.open(os.path.join(__abspath__, 'test_data', file)) as afile:
        input_str = afile.read().replace('PATH', os.path.join(__abspath__, 'test_data'))
    sys.stdin = io.StringIO(input_str)
    sys.stdout = io.StringIO()

def open(self):
    """Open the ssh connection"""
    key_str = io.StringIO(self.config['key'])
    pkey = paramiko.RSAKey.from_private_key(key_str)
    self.client = paramiko.SSHClient()
    self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    self.client.connect(
        self.config['hostname'],
        username=self.config['username'],
        pkey=pkey,
        timeout=60,
        banner_timeout=60)
    self.transport = self.client.get_transport()
    self.transport.set_keepalive(60)
    self.script_filename = self.get_tmp_script_filename()

def run_script(self, script):
    self.stream = StringIO(script)

def openStream(self, source):
    """Produces a file object from source.

    source can be either a file object, local filename or a string.
    """
    # Already a file object
    if hasattr(source, 'read'):
        stream = source
    else:
        stream = StringIO(source)
    return stream

import contextlib

# Without the decorator, the yield-based body below would return a plain
# generator instead of acting as a context manager.
@contextlib.contextmanager
def captured_output(stream_name):
    """Return a context manager used by captured_stdout/stdin/stderr
    that temporarily replaces the sys stream *stream_name* with a StringIO.

    Taken from Lib/support/__init__.py in the CPython repo.
    """
    orig_stdout = getattr(sys, stream_name)
    setattr(sys, stream_name, StreamWrapper.from_stream(orig_stdout))
    try:
        yield getattr(sys, stream_name)
    finally:
        setattr(sys, stream_name, orig_stdout)

def export(request):
    print('starting csv export')
    output_rows, DATE = export_impl()
    data = StringIO()
    writer = csv.writer(data)
    writer.writerows(sorted(output_rows))

    r = Response(gzip.compress(data.getvalue().encode()))
    r.content_type = 'text/csv'
    r.headers.update({
        'Content-Disposition': 'attachment;filename = RRID-data-%s.csv' % DATE,
        'Content-Encoding': 'gzip'
    })
    return r

def run_in_vm(self, code):
    real_stdout = sys.stdout

    # Run the code through our VM.
    vm_stdout = io.StringIO()
    if CAPTURE_STDOUT:              # pragma: no branch
        sys.stdout = vm_stdout

    vm_value = vm_exc = None
    try:
        vm_value = run(code, None, None)
    except VirtualMachineError:     # pragma: no cover
        # If the VM code raises an error, show it.
        raise
    except AssertionError:          # pragma: no cover
        # If test code fails an assert, show it.
        raise
    except Exception as e:
        # Otherwise, keep the exception for comparison later.
        if not CAPTURE_EXCEPTION:   # pragma: no cover
            raise
        vm_exc = e
    finally:
        sys.stdout = real_stdout
        real_stdout.write("-- stdout ----------\n")
        real_stdout.write(vm_stdout.getvalue())

    return vm_value, vm_exc, vm_stdout