我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用six.StringIO()。
def test_run_add(self, m_k8s_add):
    # An ADD command must reach the mocked k8s add handler with the CNI
    # arguments intact, and the JSON written to the output stream must match
    # the expected network description.
    m_k8s_add.return_value = fake._fake_vif()
    stdin_stub, stdout_stub = StringIO(), StringIO()
    cni_env = {
        'CNI_COMMAND': 'ADD',
        'CNI_ARGS': 'foo=bar',
    }

    self.runner.run(cni_env, stdin_stub, stdout_stub)

    self.assertTrue(m_k8s_add.called)
    passed_params = m_k8s_add.call_args[0][0]
    self.assertEqual('foo=bar', passed_params.CNI_ARGS)

    expected = {
        "cniVersion": "0.3.0",
        "dns": {"nameservers": ["192.168.0.1"]},
        "ip4": {"gateway": "192.168.0.1", "ip": "192.168.0.2/24"},
    }
    self.assertDictEqual(expected, jsonutils.loads(stdout_stub.getvalue()))
def test_successful_call(self):
    # A handler that returns normally yields a LambdaResult with the value,
    # no exception, no timeout and a populated summary.
    event = {"number": 5, "other": "ignored"}
    builder = context_module.MockLambdaContext.Builder()
    builder = builder.set_function_name("test_handle")
    builder = builder.set_memory_limit_in_mb("100")
    context = builder.build()

    def handle(event_arg, context_arg):
        # NOTE(review): reads the enclosing `event`/`context`, not the
        # arguments it is called with - confirm this is intended.
        return event["number"] + int(context.memory_limit_in_mb)

    result = call_module.run_lambda(handle, event, context)

    self.assertIsInstance(result, call_module.LambdaResult)
    self.assertFalse(result.timed_out)
    self.assertIsNone(result.exception)
    self.assertEqual(result.value, 105)

    summary = result.summary
    self.assertTrue(summary.duration_in_millis >= 0)
    self.assertIsInstance(summary.duration_in_millis, int)
    self.assertTrue(summary.max_memory_used_in_mb >= 0)
    self.assertIsInstance(summary.log, str)

    # display() only has to accept a text stream without raising.
    sink = six.StringIO()
    result.display(sink)
def __init__(self, body, mimetype='application/octet-stream',
             chunksize=DEFAULT_CHUNK_SIZE, resumable=False):
    """Create a new MediaInMemoryUpload backed by an in-memory byte stream.

    DEPRECATED: Use MediaIoBaseUpload with either io.TextIOBase or StringIO
    for the stream.

    Args:
      body: string, Bytes of body content.
      mimetype: string, Mime-type of the file or default of
        'application/octet-stream'.
      chunksize: int, File will be uploaded in chunks of this many bytes.
        Only used if resumable=True.
      resumable: bool, True if this is a resumable upload. False means
        upload in a single request.
    """
    # The body is wrapped in a BytesIO and passed to the parent initializer
    # as its stream argument.
    stream = BytesIO(body)
    parent = super(MediaInMemoryUpload, self)
    parent.__init__(stream, mimetype, chunksize=chunksize,
                    resumable=resumable)
def test_load(self):
    # Loading a YAML stream must populate projects, endpoints, images,
    # apikeys and version on the config object.
    yaml_text = (
        'projects:\n dev: 123\n prod: 321\n'
        'images:\n dev: registry/user/project\n prod: user/project\n'
        'endpoints:\n dev: http://127.0.0.1/api/scrapyd/\n'
        'apikeys:\n default: abcde\n'
        'version: GIT'
    )
    config = ReleaseConfig()
    config.load(StringIO(yaml_text))

    expected_attrs = [
        ('projects', {'dev': 123, 'prod': 321}),
        # A 'default' endpoint is expected even though the stream above
        # does not define one.
        ('endpoints', {
            'default': 'https://app.scrapinghub.com/api/',
            'dev': 'http://127.0.0.1/api/scrapyd/'}),
        ('images', {
            'dev': 'registry/user/project',
            'prod': 'user/project'}),
        ('apikeys', {'default': 'abcde'}),
        ('version', 'GIT'),
    ]
    for attr_name, expected in expected_attrs:
        assert getattr(config, attr_name) == expected
def test_unseekable_file(self):
    # When the destination's tell() raises IOError, run() must raise
    # RetryableError after exactly one attempt, without sleeping or
    # logging a warning, whatever max_retries is.
    def tell_fails():
        raise IOError()

    ticket = get_ticket(urls=[get_http_ticket(EXAMPLE_URL)])
    for max_retries in range(10):
        broken_file = StringIO()
        broken_file.tell = tell_fails
        sleep_patch = mock.patch("time.sleep")
        warning_patch = mock.patch("logging.warning")
        with sleep_patch as mock_sleep, warning_patch as mock_warning:
            manager = RetryCountDownloadManager(
                ticket, broken_file, max_retries=max_retries)
            self.assertEqual(manager.max_retries, max_retries)
            self.assertRaises(exceptions.RetryableError, manager.run)
            self.assertEqual(manager.attempt_counts[EXAMPLE_URL], 1)
            self.assertEqual(mock_sleep.call_count, 0)
            self.assertEqual(mock_warning.call_count, 0)
def logger(test_case):
    """Capture root-logger output for a test and save it if the test failed.

    Replaces every handler on ROOT_LOGGER with a single DEBUG-level stream
    handler that writes into an in-memory buffer, yields that buffer, and,
    once resumed, writes the buffer to ``test.log`` inside
    ``test_case._test_report_dir`` when ``test_case.is_failed`` is true.

    Args:
        test_case: object exposing ``is_failed``, ``_test_report_dir`` and a
            ``log_exception(message)`` context manager.

    Yields:
        The StringIO buffer receiving the formatted log records.
    """
    log_buffer = StringIO()
    # Drop all existing handlers in place so only the buffer receives records.
    ROOT_LOGGER.handlers[:] = []
    stream_handler = logging.StreamHandler(stream=log_buffer)
    stream_handler.setLevel(logging.DEBUG)
    formatter = logging.Formatter(
        '%(asctime)s - %(levelname)s - %(name)s#%(lineno)d - %(message)s')
    stream_handler.setFormatter(formatter)
    ROOT_LOGGER.addHandler(stream_handler)

    yield log_buffer

    if test_case.is_failed:
        log_path = os.path.join(test_case._test_report_dir, 'test.log')
        with test_case.log_exception("Attach test log"):
            # Binary mode: the payload is UTF-8 encoded bytes, which a
            # text-mode file object rejects on Python 3.
            with open(log_path, 'wb') as log_file:
                log_file.write(log_buffer.getvalue().encode('utf-8'))
def test_plural_form(self):
    # A translate element with translate-plural must be extracted as one
    # ngettext message carrying the (singular, plural) pair.
    markup = (
        '<html><translate translate-plural="hello {$count$} worlds!">'
        'hello one world!</translate></html>'
    )
    expected = [
        (1, 'ngettext',
         ('hello one world!', 'hello {$count$} worlds!'),
         []),
    ]

    extracted = list(extract_angular(StringIO(markup), [], [], {}))

    self.assertEqual(expected, extracted)
def test_multiple_comments(self):
    # Two translate-comment attributes on one element must both be
    # reported, in document order, as the message's comments.
    markup = (
        '<html><translate '
        'translate-comment="What a beautiful world"'
        'translate-comment="Another comment"'
        '>hello world!</translate></html>'
    )
    expected = [
        (1, 'gettext', 'hello world!',
         ['What a beautiful world', 'Another comment']),
    ]

    extracted = list(extract_angular(StringIO(markup), [], [], {}))

    self.assertEqual(expected, extracted)
def test_nested_variations(self):
    # Nested markup, comments and special characters inside a translatable
    # element must be kept verbatim in the extracted message.
    #
    # The expected line numbers (2, 3, 4) depend on each <p> sitting on its
    # own line after a leading newline, so the newlines are written as
    # explicit escapes instead of relying on the source layout.
    # NOTE(review): any leading indentation the fixture lines once had is
    # not reproduced here - confirm the extractor ignores it.
    markup = (
        '\n'
        '<p translate>To <a href="link">link</a> here</p>\n'
        '<p translate>To <!-- a comment!! --> here</p>\n'
        '<p translate>To trademark® > > here</p>\n'
    )
    expected = [
        (2, u'gettext', 'To <a href="link">link</a> here', []),
        (3, u'gettext', 'To <!-- a comment!! --> here', []),
        (4, u'gettext', u'To trademark® > > here', []),
    ]

    messages = list(extract_angular(StringIO(markup), [], [], {}))

    self.assertEqual(expected, messages)
def test_print_ascii(self):
    # print_ascii must start with the expected block characters for a plain
    # stream, and with the colour escape prefix when writing to a tty.
    qr = qrcode.QRCode(border=0)

    # Plain (non-tty) output.
    plain_out = six.StringIO()
    qr.print_ascii(out=plain_out)
    plain_text = plain_out.getvalue()
    plain_out.close()
    plain_prefix = u'\u2588\u2580\u2580\u2580\u2580\u2580\u2588'
    self.assertEqual(plain_text[:len(plain_prefix)], plain_prefix)

    # tty output: the stream has to claim to be a terminal.
    tty_out = six.StringIO()
    tty_out.isatty = lambda: True
    qr.print_ascii(out=tty_out, tty=True)
    tty_text = tty_out.getvalue()
    tty_out.close()
    tty_prefix = (
        u'\x1b[48;5;232m\x1b[38;5;255m' +
        u'\xa0\u2584\u2584\u2584\u2584\u2584\xa0')
    self.assertEqual(tty_text[:len(tty_prefix)], tty_prefix)
def generate_handler_stub(router, handler_template=HANDLER_TEMPLATE):
    """Render one handler stub per router operation and return the text.

    Each operation id is converted to a snake_case function name; the stubs
    are emitted sorted by that name, each followed by three newlines.

    Args:
        router: object whose ``get_paths()`` yields paths exposing
            ``get_operations()``.
        handler_template: format string taking ``func_name``,
            ``operation_id`` and ``parameters``.

    Returns:
        The concatenated stub source as a string.
    """
    # Later operations win when two ids map to the same function name.
    operations_by_func = {
        camel_case_to_spaces(operation.id).replace(' ', '_'): operation
        for path in router.get_paths()
        for operation in path.get_operations()
    }

    output = StringIO()
    for func_name, operation in sorted(operations_by_func.items()):
        parameters = ', '.join(p['name'] for p in operation.parameters)
        stub = handler_template.format(
            func_name=func_name,
            operation_id=operation.id,
            parameters=parameters,
        )
        output.write(stub)
        output.write('\n\n\n')
    return output.getvalue()
def read_content(queue):
    """Read one content (header, children, body) from a frame queue.

    Generator that yields the result of ``queue.get()`` for each frame it
    needs: first a header frame, then ``header.weight`` nested contents
    (read recursively), then body frames until ``header.size`` units have
    been accumulated. The assembled Content is handed back through
    ``defer.returnValue``.
    NOTE(review): presumably driven by an inlineCallbacks-style decorator
    that is not visible here - confirm.
    """
    header_frame = yield queue.get()
    header = header_frame.payload

    children = []
    for _ in range(header.weight):
        child = yield read_content(queue)
        children.append(child)

    expected_size = header.size
    received = 0
    buf = six.StringIO()
    while received < expected_size:
        body_frame = yield queue.get()
        chunk = body_frame.payload.content
        # if this is the first instance of real binary content, convert the
        # string buffer to BytesIO
        # Not a nice fix but it preserves the original behaviour
        needs_bytes_buffer = (
            six.PY3
            and isinstance(chunk, bytes)
            and isinstance(buf, six.StringIO)
        )
        if needs_bytes_buffer:
            buf = six.BytesIO()
        buf.write(chunk)
        received += len(chunk)

    defer.returnValue(
        Content(buf.getvalue(), children, header.properties.copy()))
def assert_csv_response(self, response, status_code, expected_lines=None):
    """Assert that a response is CSV with the given status and rows.

    Args:
        response: response object exposing ``headers``, ``status_code`` and
            a bytes ``data`` payload.
        status_code: expected HTTP status code.
        expected_lines: optional sequence of expected rows, each a sequence
            of cell values. When omitted, or when the status is 204, only
            the content type and status code are checked.
    """
    assert_that(response.headers["Content-Type"], starts_with("text/csv"))
    # always validate status code
    assert_that(response.status_code, is_(equal_to(status_code)))

    # expect CSV data except on 204
    if status_code == 204:
        response_lines = None
    else:
        response_lines = list(reader(StringIO(response.data.decode("utf-8"))))

    # validate data if provided; the length check lives behind the same
    # guard so that expected_lines=None (the default) or a 204 response
    # does not blow up on len(None) / has_length(None).
    if response_lines is None or expected_lines is None:
        return

    assert_that(
        response_lines,
        has_length(len(expected_lines)),
    )
    for line, expected in zip(response_lines, expected_lines):
        assert_that(
            line,
            contains(*expected),
        )
def create_plan(self, loom_input_tensor):
    """Build a small TrainPlan whose single loss is ``x * foo``.

    Args:
        loom_input_tensor: forwarded unchanged to the block compiler.

    Returns:
        The configured plan, with ``print_file`` set to an in-memory buffer.
    """
    train_plan = plan.TrainPlan()
    scale = tf.get_variable(
        'foo', [], tf.float32, tf.constant_initializer(12))
    model = blocks.Scalar() >> blocks.Function(lambda x: x * scale)
    train_plan.compiler = block_compiler.Compiler.create(
        model, loom_input_tensor=loom_input_tensor)
    train_plan.losses['foo'] = train_plan.compiler.output_tensors[0]
    train_plan.finalize_stats()

    optimizer = tf.train.GradientDescentOptimizer(1.0)
    train_plan.train_op = optimizer.minimize(
        train_plan.loss_total, global_step=train_plan.global_step)

    train_plan.logdir = self.get_temp_dir()
    train_plan.dev_examples = [2]
    train_plan.is_chief_trainer = True
    train_plan.batch_size = 2
    train_plan.epochs = 3
    # Collect the plan's printed output in memory.
    train_plan.print_file = six.StringIO()
    return train_plan
def test_run_once(self):
    # First run: nothing to restore, so the log must end with the
    # "could not restore" message. Second run: after saving a checkpoint at
    # step 42 the log must end with the restore line and the step report.
    test_plan = self._make_plan()
    test_plan.save_best = False

    # We aren't using a managed session, so we need to run this ourselves.
    init_op = tf.global_variables_initializer()
    supervisor = test_plan.create_supervisor()
    with self.test_session() as sess:
        test_plan.run(supervisor, sess)
        first_log = test_plan.print_file.getvalue()
        missing_msg = 'could not restore from %s\n' % test_plan.logdir_restore
        self.assertTrue(first_log.endswith(missing_msg), msg=first_log)

        test_plan.print_file = six.StringIO()
        sess.run(init_op)
        tf.gfile.MkDir(test_plan.logdir_restore)
        save_path = os.path.join(test_plan.logdir_restore, 'model')
        supervisor.saver.save(sess, save_path, global_step=42)
        test_plan.run(supervisor, sess)
        second_log = test_plan.print_file.getvalue()

        expected_lines = [
            'restoring from %s-42' % save_path,
            'step: 0 loss: 3.000e+00 foo: 4.200e+01',
        ]
        expected_tail = '\n'.join(expected_lines) + '\n'
        self.assertTrue(second_log.endswith(expected_tail), msg=second_log)
def _request(self, method, url, headers=None, data=None,
             content_length=None):
    """Record the call and answer it from the registered fixtures.

    The call record is appended to ``self.calls`` (with ``content_length``
    added as a trailing element when given). The fixture is looked up by the
    query-sorted URL and the method.

    Returns:
        A ``(FakeResponse, data)`` tuple. ``data`` is the fixture body
        itself, or, when the body is a string, its parsed JSON, falling back
        to a StringIO over the body when it is not valid JSON.
    """
    record = build_call_record(
        method, sort_url_by_query_keys(url), headers or {}, data)
    if content_length is not None:
        record = tuple(list(record) + [content_length])
    self.calls.append(record)

    fixture = self.fixtures[sort_url_by_query_keys(url)][method]
    status, body = fixture[0], fixture[1]

    payload = body
    if isinstance(body, six.string_types):
        try:
            payload = json.loads(body)
        except ValueError:
            # Not JSON: expose the raw text as a readable stream instead.
            payload = six.StringIO(body)
    return FakeResponse(status, body), payload
def test_print_report(self):
    # Only allocations made while the hook is active may show up in the
    # report: a root line with the totals plus one line per malloc site.
    hook = memory_hooks.LineProfileHook()

    # Allocation outside the hook.
    p = self.pool.malloc(1000)
    del p

    with hook:
        p1 = self.pool.malloc(1000)
        p2 = self.pool.malloc(2000)
    del p1
    del p2

    report = six.StringIO()
    hook.print_report(file=report)
    actual = report.getvalue()

    expected_patterns = (
        r'\A_root \(3\.00KB, 2\.00KB\)',
        r'.*\.py:[0-9]+:test_print_report \(1\.00KB, 0\.00B\)',
        r'.*\.py:[0-9]+:test_print_report \(2\.00KB, 2\.00KB\)',
    )
    for pattern in expected_patterns:
        six.assertRegex(self, actual, pattern)
def test_print_report_max_depth(self):
    # The number of newline-separated chunks in the report must grow with
    # max_depth: 2 for depth 1, 3 for depth 2.
    for max_depth, expected_chunks in ((1, 2), (2, 3)):
        hook = memory_hooks.LineProfileHook(max_depth=max_depth)
        with hook:
            p = self.pool.malloc(1000)
        del p

        report = six.StringIO()
        hook.print_report(file=report)
        actual = report.getvalue()
        self.assertEqual(expected_chunks, len(actual.split('\n')))
def _render(self, mode='human', close=False):
    """Write the grid, with the current cell highlighted, to a stream.

    Args:
        mode: 'ansi' writes into a new StringIO; any other value writes to
            ``sys.stdout``.
        close: when true, do nothing and return None.

    Returns:
        The stream that was written to (None when ``close`` is true).
    """
    if close:
        return

    outfile = sys.stdout
    if mode == 'ansi':
        outfile = StringIO()

    # self.s is the flat cell index; split it into grid coordinates.
    row, col = divmod(self.s, self.ncol)
    grid = [[cell.decode('utf-8') for cell in line]
            for line in self.desc.tolist()]
    grid[row][col] = utils.colorize(grid[row][col], "red", highlight=True)

    if self.lastaction is None:
        outfile.write("\n")
    else:
        action_name = ["Left","Down","Right","Up"][self.lastaction]
        outfile.write(" ({})\n".format(action_name))
    outfile.write("\n".join(''.join(line) for line in grid)+"\n")

    return outfile
def setUp(self):
    # Prepare a config with default logging options and a mocked root
    # logger that owns a memory handler and a coloured stream handler.
    super(PostArgParseSetupTest, self).setUp()

    from certbot.log import ColoredStreamHandler
    from certbot.log import MemoryHandler, TempHandler

    config = self.config
    config.debug = False
    config.max_log_backups = 1000
    config.quiet = False
    config.verbose_count = constants.CLI_DEFAULTS['verbose_count']

    self.devnull = open(os.devnull, 'w')

    self.stream_handler = ColoredStreamHandler(six.StringIO())
    self.temp_handler = TempHandler()
    self.temp_path = self.temp_handler.path
    self.memory_handler = MemoryHandler(self.temp_handler)
    handlers = [self.memory_handler, self.stream_handler]
    self.root_logger = mock.MagicMock(handlers=handlers)
def _test_common(self, error_type, debug):
    """Returns the mocked logger and stderr output."""
    fake_stderr = six.StringIO()

    # Raise and catch the error so that a real exc_info triple exists.
    try:
        raise error_type(self.error_msg)
    except BaseException:
        exc_info = sys.exc_info()

    logger_patch = mock.patch('certbot.log.logger')
    stderr_patch = mock.patch('certbot.log.sys.stderr', fake_stderr)
    with logger_patch as mock_logger, stderr_patch:
        try:
            # pylint: disable=star-args
            self._call(
                *exc_info, debug=debug, log_path=self.log_path)
        except SystemExit as exit_err:
            # The exit message is appended to the captured stderr text.
            fake_stderr.write(str(exit_err))
        else:  # pragma: no cover
            self.fail('SystemExit not raised.')

    return mock_logger, fake_stderr.getvalue()
def test_to_str_exclude():
    """Headers rejected by the exclude callback are dropped from the output.

    The callback returns None for the 'multi-line' header, so neither
    ``to_str`` nor ``to_bytes`` may contain it.
    """
    def exclude(h):
        if h[0].lower() == 'multi-line':
            return None
        return h

    sah = StatusAndHeadersParser(['HTTP/1.0']).parse(StringIO(status_headers_1))
    res = sah.to_str(exclude)

    # Built from adjacent literals so the expected text does not depend on
    # backslash-newline continuations inside a single string literal.
    exp = (
        "HTTP/1.0 200 OK\r\n"
        "Content-Type: ABC\r\n"
        "Some: Value\r\n"
    )
    assert res == exp
    assert sah.to_bytes(exclude) == (exp.encode('latin-1') + b'\r\n')
def test_simmatrix_import_run():
    """Importing a TSV similarity list yields the same pairs and scores.

    Scores are compared to 3 decimals; fragment id pairs are compared
    exactly against the expected rows.
    """
    output_fn = tmpname()
    # Tabs and newlines are spelled out: the TSV format depends on them and
    # they are invisible (and easily lost) as literal characters.
    tsv = (
        'frag_id1\tfrag_id2\tscore\n'
        '2mlm_2W7_frag1\t2mlm_2W7_frag2\t0.5877164873731594\n'
        '2mlm_2W7_frag2\t3wvm_STE_frag1\t0.4633096818493935\n'
    )
    inputfile = StringIO(tsv)

    try:
        script.simmatrix_import_run(inputfile=inputfile,
                                    inputformat='tsv',
                                    simmatrixfn=output_fn,
                                    fragmentsdb='data/fragments.sqlite',
                                    nrrows=2)

        simmatrix = SimilarityMatrix(output_fn)
        result = list(simmatrix)
        simmatrix.close()

        expected = [('2mlm_2W7_frag1', '2mlm_2W7_frag2', 0.5877),
                    ('2mlm_2W7_frag2', '3wvm_STE_frag1', 0.4633)]
        assert_array_almost_equal([r[2] for r in result],
                                  [r[2] for r in expected], 3)
        # Compare against `expected`; comparing `result` with itself could
        # never fail.
        assert [(r[0], r[1]) for r in result] == \
            [(r[0], r[1]) for r in expected]
    finally:
        if os.path.exists(output_fn):
            os.remove(output_fn)
def test_simmatrix_importfpneigh_run():
    """Importing fpneigh output stores each non-identical pair once."""
    output_fn = tmpname()
    # One record per line, newlines written explicitly.
    # NOTE(review): the id/score separator is reproduced as a single space -
    # confirm against the fpneigh format the importer expects.
    fpneigh = (
        'Compounds similar to 2mlm_2W7_frag1:\n'
        '2mlm_2W7_frag1 1.0000\n'
        '2mlm_2W7_frag2 0.5877\n'
        'Compounds similar to 2mlm_2W7_frag2:\n'
        '2mlm_2W7_frag2 1.0000\n'
        '3wvm_STE_frag1 0.4633\n'
    )
    inputfile = StringIO(fpneigh)

    try:
        script.simmatrix_importfpneigh_run(inputfile=inputfile,
                                           simmatrixfn=output_fn,
                                           fragmentsdb='data/fragments.sqlite',
                                           nrrows=3)

        simmatrix = SimilarityMatrix(output_fn)
        rows = list(simmatrix)
        simmatrix.close()

        expected = [(u'2mlm_2W7_frag1', u'2mlm_2W7_frag2', 0.5877),
                    (u'2mlm_2W7_frag2', u'3wvm_STE_frag1', 0.4633)]
        assert rows == expected
    finally:
        # Guarded so that a failure before the file exists is reported as
        # itself rather than being masked by an OSError from os.remove.
        if os.path.exists(output_fn):
            os.remove(output_fn)
def test_fpneigh2tsv_run():
    """fpneigh output is converted to a tab-separated table with a header."""
    # Newlines are explicit so the fixture and the expected text do not
    # depend on the layout of a multi-line literal.
    # NOTE(review): the id/score separator in the input is reproduced as a
    # single space - confirm against the fpneigh format.
    fpneigh_in = (
        'Compounds similar to 2mlm_2W7_frag1:\n'
        '2mlm_2W7_frag1 1.0000\n'
        '2mlm_2W7_frag2 0.5877\n'
        'Compounds similar to 2mlm_2W7_frag2:\n'
        '2mlm_2W7_frag2 1.0000\n'
        '3wvm_STE_frag1 0.4633\n'
    )
    inputfile = StringIO(fpneigh_in)
    outputfile = StringIO()

    script.fpneigh2tsv_run(inputfile, outputfile)

    expected = (
        'frag_id1\tfrag_id2\tscore\n'
        '2mlm_2W7_frag1\t2mlm_2W7_frag2\t0.5877\n'
        '2mlm_2W7_frag2\t3wvm_STE_frag1\t0.4633\n'
    )
    assert outputfile.getvalue() == expected
def test_dump_pairs_astsv(self, bitsets, number_of_bits, label2id):
    # Dumping the bitsets against themselves as TSV must write exactly one
    # pair line.
    sink = StringIO()

    pairs.dump_pairs(
        bitsets,
        bitsets,
        'tsv',
        'StringIO',
        sink,
        number_of_bits,
        0.4,
        0.05,
        label2id,
        False,
        True,
    )

    assert sink.getvalue() == "a\tc\t0.13556\n"
def test_dump_pairs_astsv_nomem(self, bitsets, number_of_bits, label2id):
    # Same as the plain TSV dump, but with the second-to-last positional
    # flag switched to True; the written line must be unchanged.
    sink = StringIO()

    pairs.dump_pairs(
        bitsets,
        bitsets,
        'tsv',
        'StringIO',
        sink,
        number_of_bits,
        0.4,
        0.05,
        label2id,
        True,
        True,
    )

    assert sink.getvalue() == "a\tc\t0.13556\n"
def make_audio(tensor, sample_rate, length_frames, num_channels):
    """Convert a numpy audio array to an Audio summary protobuf.

    The samples are cast to int16 and written as uncompressed WAV with a
    2-byte sample width; the encoded bytes become ``encoded_audio_string``.

    Args:
        tensor: numpy array of samples; cast with ``astype("int16")``.
        sample_rate: sample rate, passed to the WAV writer and the summary
            as a float.
        length_frames: stored unchanged in the summary.
        num_channels: number of channels for the WAV header and the summary.

    Returns:
        A ``Summary.Audio`` message with content type ``audio/wav``.
    """
    # Local import keeps this block self-contained.
    import io

    # wave writes bytes, so the in-memory target must be a binary buffer;
    # a text StringIO raises TypeError on Python 3.
    output = io.BytesIO()
    wav_out = wave.open(output, "w")
    wav_out.setframerate(float(sample_rate))
    wav_out.setsampwidth(2)
    wav_out.setcomptype('NONE', 'not compressed')
    wav_out.setnchannels(num_channels)
    # tobytes() replaces the deprecated tostring() alias.
    wav_out.writeframes(tensor.astype("int16").tobytes())
    wav_out.close()
    output.flush()
    audio_string = output.getvalue()

    return Summary.Audio(sample_rate=float(sample_rate),
                         num_channels=num_channels,
                         length_frames=length_frames,
                         encoded_audio_string=audio_string,
                         content_type="audio/wav")
def __init__(self, name, retain_file=False, gen_flex=False):
    """Set up empty bookkeeping and start the source buffer with headers.

    Args:
        name: used as the prefix of the retained temporary ``.c`` file.
        retain_file: when true, a named temporary file is created (and not
            deleted on close) and its path stored in ``self.filename``.
        gen_flex: when true, the flex include template is appended after
            the standard include template.
    """
    # Bookkeeping, all initially empty.
    self.num_kernels = 0
    self.module = None
    self.compiled = False
    self.functions = dict()
    self.arg_descs = dict()
    self.cache = dict()

    self.retain_file = retain_file
    self.buffer = StringIO()

    # Open file and add header
    # NOTE(review): the header is assumed to be written whether or not the
    # file is retained - confirm the intended nesting.
    if self.retain_file:
        self.f = tempfile.NamedTemporaryFile(
            mode='w', suffix='.c', prefix=name, delete=False)
        self.filename = self.f.name

    self.buffer.write(_includes_template)
    if gen_flex:
        self.buffer.write(_flex_includes_template)
def test_parse_stream(self):
    """
    It can parse input from a file stream.
    """
    source = StringIO(""" @nice-blue: #5B83AD; """)

    self.parser.parse(file=source)

    # Exactly one object is parsed: the expected variable.
    parsed = self.parser.result
    self.assertEqual(1, len(parsed))
    # The stream has no name, so the default target name is used.
    self.assertEqual('(stream)', self.parser.target)

    variable = parsed[0]
    self.assertEqual('@nice-blue', variable.name)
    self.assertEqual(['#5b83ad'], variable.value)
def test_sign(self):
    # For each signature algorithm: a freshly produced signature must
    # verify, and so must the stored reference signature for that algorithm.
    message = six.StringIO(u'Hello, World!')
    sig_algs = (hash_.ES256, hash_.ES384, hash_.ES521)
    origin_sig = (
        [88, 34, 78, 248, 120, 105, 162, 172, 14, 179, 252, 201, 193, 230, 124, 105, 227, 145, 236, 104, 31, 153, 215, 117, 193, 126, 229, 98, 143, 88, 81, 216, 19, 67, 86, 127, 212, 195, 149, 200, 63, 117, 65, 155, 70, 6, 219, 234, 151, 120, 229, 214, 193, 210, 165, 158, 135, 160, 244, 210, 41, 49, 132, 71],
        [65, 50, 99, 81, 100, 105, 169, 194, 75, 54, 38, 34, 69, 117, 22, 1, 105, 176, 138, 47, 254, 233, 225, 132, 39, 40, 160, 2, 9, 208, 78, 11, 76, 20, 163, 57, 222, 176, 108, 93, 155, 163, 102, 185, 211, 138, 83, 92, 67, 92, 133, 51, 119, 58, 20, 212, 51, 37, 20, 175, 202, 134, 85, 14],
        [112, 133, 177, 239, 79, 252, 240, 252, 28, 96, 174, 40, 109, 34, 109, 141, 183, 16, 246, 249, 102, 213, 128, 132, 206, 148, 124, 150, 254, 248, 164, 62, 109, 95, 178, 120, 53, 52, 216, 135, 213, 193, 97, 109, 255, 0, 220, 219, 41, 224, 112, 236, 14, 131, 170, 191, 223, 234, 36, 90, 152, 133, 138, 247]
    )

    for sig_alg, reference in zip(sig_algs, origin_sig):
        # The stream is rewound before every use of the message.
        message.seek(0)
        sig, alg = self.private_key.sign(message, sig_alg.hash_id)

        message.seek(0)
        self.assertTrue(self.public_key.verify(message, alg, sig))

        message.seek(0)
        reference_sig = bytes(bytearray(reference))
        self.assertTrue(self.public_key.verify(message, alg, reference_sig))
def sign(self, private_key, timestamp=None):
    """Sign the payload with ``private_key`` and record the signature.

    Args:
        private_key: key exposing ``sign(stream, hash_id)`` and
            ``public_key()``.
        timestamp: forwarded to ``protected_header``.

    Returns:
        The ``(sig_bytes, algorithm)`` pair returned by the key.
    """
    protected = self.protected_header(timestamp=timestamp)
    # The key signs from a text stream over the decoded signing bytes.
    payload_stream = six.StringIO(self.sign_bytes(protected).decode())
    sig_bytes, algorithm = private_key.sign(
        payload_stream, hash_.HashID.SHA256)

    header = JsHeader(private_key.public_key(), algorithm)
    encoded_sig = util.jose_base64_url_encode(sig_bytes)
    self.signatures.append(JsSignature(header, encoded_sig, protected))

    return sig_bytes, algorithm
def verify(self):
    """Verify every attached signature and return the keys that signed.

    Returns:
        List of the public keys (``header.jwk``) of the signatures, in
        order.

    Raises:
        NotImplementedError: a signature header carries a ``chain``.
        JSONSignError: a signature header has neither ``chain`` nor ``jwk``.
        Exception: whatever ``public_key.verify`` raises is propagated
            unchanged.
    """
    keys = []
    for sign in self.signatures:
        sign_bytes = self.sign_bytes(sign.protected)

        if sign.header.chain:
            raise NotImplementedError()
        if not sign.header.jwk:
            raise JSONSignError("missing public key")
        public_key = sign.header.jwk

        sig_bytes = util.jose_base64_url_decode(sign.signature)
        # No try/except here: catching an exception only to re-raise it
        # added nothing.
        public_key.verify(six.StringIO(sign_bytes.decode()),
                          sign.header.algorithm, sig_bytes)
        keys.append(public_key)
    return keys
def __repr__(self, verbosity=0):
    """Return a multi-line text listing of the stored values.

    Args:
        verbosity: 0 lists only entries whose ``type_`` is "METAFEATURE" as
            ``name: value``; 1 or more lists every entry with the value cut
            to 10 characters and its ``time``; above 1 additionally adds
            each entry's non-empty ``comment`` on its own line.

    Returns:
        The listing, starting with a header line naming the dataset.
    """
    # Parts are collected in a list and joined once; the local is no longer
    # called `repr`, which shadowed the builtin.
    parts = ["Metafeatures for dataset %s\n" % self.name]
    for name in self.values:
        entry = self.values[name]
        if verbosity == 0 and entry.type_ != "METAFEATURE":
            continue
        if verbosity == 0:
            parts.append(" %s: %s\n" % (str(name), str(entry.value)))
        elif verbosity >= 1:
            parts.append(" %s: %10s (%10fs)\n" %
                         (str(name), str(entry.value)[:10], entry.time))
        # Add the reason for a crash if one happened!
        if verbosity > 1 and entry.comment:
            parts.append(" %s\n" % entry.comment)
    return "".join(parts)
def test_run_invalid(self, *args):
    # An unknown CNI command must make the runner return exit code 1.
    stdin_stub, stdout_stub = StringIO(), StringIO()
    cni_env = {'CNI_COMMAND': 'INVALID', 'CNI_ARGS': 'foo=bar'}

    exit_code = self.runner.run(cni_env, stdin_stub, stdout_stub)

    self.assertEqual(1, exit_code)
def test_run_write_version(self, *args):
    # The VERSION command must exit with 0 and write the supported versions
    # and the runner's own CNI version as JSON.
    stdin_stub, stdout_stub = StringIO(), StringIO()
    cni_env = {'CNI_COMMAND': 'VERSION', 'CNI_ARGS': 'foo=bar'}

    exit_code = self.runner.run(cni_env, stdin_stub, stdout_stub)
    written = jsonutils.loads(stdout_stub.getvalue())

    self.assertEqual(0, exit_code)
    self.assertEqual(api.CNIRunner.SUPPORTED_VERSIONS,
                     written['supportedVersions'])
    self.assertEqual(api.CNIRunner.VERSION, written['cniVersion'])
def test_run_del(self, m_k8s_delete):
    # A DEL command must reach the mocked k8s delete handler with the CNI
    # arguments intact.
    m_k8s_delete.return_value = fake._fake_vif()
    stdin_stub, stdout_stub = StringIO(), StringIO()
    cni_env = {
        'CNI_COMMAND': 'DEL',
        'CNI_ARGS': 'foo=bar',
    }

    self.runner.run(cni_env, stdin_stub, stdout_stub)

    self.assertTrue(m_k8s_delete.called)
    passed_params = m_k8s_delete.call_args[0][0]
    self.assertEqual('foo=bar', passed_params.CNI_ARGS)
def _test_run(self, cni_cmd, path, m_post):
    """Run ``cni_cmd`` and check that it was POSTed to the local ``path``.

    Returns:
        Whatever ``self.runner.run`` returned.
    """
    stdin_stub, stdout_stub = StringIO(), StringIO()
    cni_env = {
        'CNI_COMMAND': cni_cmd,
        'CNI_ARGS': 'foo=bar',
    }

    outcome = self.runner.run(cni_env, stdin_stub, stdout_stub)

    expected_url = 'http://127.0.0.1:%d/%s' % (self.port, path)
    m_post.assert_called_with(
        expected_url, json=mock.ANY, headers={'Connection': 'close'})
    return outcome
def __init__(self, context):
    """Start capturing stdout and root-logger output for one invocation.

    Records the current memory usage and start time, writes a START line
    with the request id and function version into an in-memory log, adds a
    stream handler for that log to the root logger and redirects
    ``sys.stdout`` to it. The previous stdout is kept in
    ``self._previous_stdout``.
    """
    self._context = context
    self._start_mem = memory_profiler.memory_usage()[0]

    start_line = "START RequestId: {r} Version: {v}\n".format(
        r=context.aws_request_id,
        v=context.function_version,
    )
    self._log = StringIO()
    self._log.write(start_line)

    self._start_time = timeit.default_timer()
    self._previous_stdout = sys.stdout

    # Both logging output and plain prints end up in the same buffer.
    log_handler = logging.StreamHandler(stream=self._log)
    logging.getLogger().addHandler(log_handler)
    sys.stdout = self._log
def call(arguments):
    """Run ``main.main()`` with ``sys.argv`` replaced and return its stdout.

    Args:
        arguments: list installed as ``sys.argv`` for the call.

    Returns:
        Everything written to ``sys.stdout`` during the call, as ``str``.
    """
    captured = six.StringIO()
    stdout_patch = mock.patch("sys.stdout", captured)
    argv_patch = mock.patch("sys.argv", arguments)
    with stdout_patch, argv_patch:
        main.main()
    return str(captured.getvalue())
def log_stream():
    """Yield an in-memory stream together with a logger that writes to it.

    The logger is the shared 'test_logger' at WARNING level with a stream
    handler over the buffer attached.

    Yields:
        A ``(stream, logger)`` tuple.
    """
    stream = StringIO()
    handler = logging.StreamHandler(stream)
    logger = logging.getLogger('test_logger')
    logger.setLevel(logging.WARNING)
    logger.addHandler(handler)
    try:
        yield stream, logger
    finally:
        # getLogger returns the same logger on every call, so the handler
        # has to be detached as well as closed; otherwise handlers pile up
        # and later users also write into stale streams.
        logger.removeHandler(handler)
        handler.close()
def log_stream():
    """Yield a stream that receives everything logged through ``v1.logger``.

    ``v1.logger`` is replaced by the shared 'test_logger' (WARNING level)
    with a stream handler over an in-memory buffer, and restored afterwards.

    Yields:
        The StringIO buffer receiving the log output.
    """
    stream = StringIO()
    handler = logging.StreamHandler(stream)
    logger = logging.getLogger('test_logger')
    logger.setLevel(logging.WARNING)
    logger.addHandler(handler)
    prior_logger = v1.logger
    v1.logger = logger
    try:
        yield stream
    finally:
        # Restore the module logger and detach the handler even if the
        # consumer raised; 'test_logger' is shared, so a handler left
        # attached would leak into later uses.
        v1.logger = prior_logger
        logger.removeHandler(handler)
        handler.close()


# test the retry logic with a custom logger
def _get_auth(self, ims_host, ims_endpoint_jwt,
              tech_acct_id=None, api_key=None, client_secret=None,
              private_key_file=None, private_key_data=None,
              **kwargs):
    """Build an Auth object from a JWT exchanged for an access token.

    ``tech_acct`` and ``priv_key_path`` in ``kwargs`` are accepted as
    fallbacks for ``tech_acct_id`` and ``private_key_file``.

    Raises:
        ArgumentError: the technical account id, API key, client secret, or
            both private key sources are missing.
    """
    tech_acct_id = tech_acct_id or kwargs.get("tech_acct")
    private_key_file = private_key_file or kwargs.get("priv_key_path")

    required = (
        tech_acct_id,
        api_key,
        client_secret,
        private_key_data or private_key_file,
    )
    if not all(required):
        raise ArgumentError("Connector create: not all required auth parameters were supplied; please see docs")

    # Inline key data takes precedence over a key file.
    if private_key_data:
        jwt = JWT(self.org_id, tech_acct_id, ims_host, api_key,
                  six.StringIO(private_key_data))
    else:
        with open(private_key_file, 'r') as private_key_stream:
            jwt = JWT(self.org_id, tech_acct_id, ims_host, api_key,
                      private_key_stream)

    token_url = "https://" + ims_host + ims_endpoint_jwt
    token = AccessRequest(token_url, api_key, client_secret, jwt())
    return Auth(api_key, token())
def _run_test(self, filename):
    """Run the type detector on ``filename`` and return the document.

    The detector is configured from ``config.yml`` in the working directory
    and consumes the document together with a dummy payload stream.
    """
    mock_pipeline = test_helper.get_mock_pipeline([
        helper.DOCUMENT, helper.PICTURE,
        helper.ARCHIVE, helper.DISKIMAGE])

    detector = detect_type.Subscriber(mock_pipeline)
    with open('config.yml') as inp:
        # safe_load: a configuration file only needs plain YAML types, and
        # yaml.load without an explicit Loader is deprecated/rejected by
        # current PyYAML and can construct arbitrary objects.
        detector.setup(yaml.safe_load(inp.read()))

    doc = document.get_document(filename)
    detector.consume(doc, StringIO('dummy'))
    return doc
def from_string(cls, key, password='notasecret'):
    """Construct an RsaSigner instance from a string.

    Args:
        key: string, private key in PEM format.
        password: string, password for private key file. Unused for PEM
                  files.

    Returns:
        RsaSigner instance.

    Raises:
        ValueError if the key cannot be parsed as PKCS#1 or PKCS#8 in
        PEM format.
    """
    key = _from_bytes(key)  # pem expects str in Py3
    marker_id, key_bytes = pem.readPemBlocksFromFile(
        six.StringIO(key), _PKCS1_MARKER, _PKCS8_MARKER)

    # marker_id is the index of the marker that matched: 0 for the PKCS#1
    # marker, 1 for the PKCS#8 marker.
    if marker_id == 0:
        return cls(rsa.key.PrivateKey.load_pkcs1(key_bytes, format='DER'))

    if marker_id == 1:
        key_info, remaining = decoder.decode(
            key_bytes, asn1Spec=_PKCS8_SPEC)
        if remaining != b'':
            raise ValueError('Unused bytes', remaining)
        pkey_info = key_info.getComponentByName('privateKey')
        return cls(rsa.key.PrivateKey.load_pkcs1(
            pkey_info.asOctets(), format='DER'))

    raise ValueError('No key could be detected.')
def test_genbank_record_fixing(self):
    # Round-tripping every feature of the first sample record through
    # convert_feature/unconvert_feature and writing it as GenBank must
    # reproduce the stored 'sample.fixed.gb' file.
    sample_path = os.path.join(FILES_PATH, 'sample.gb')
    with open(sample_path) as sample_file:
        first_record = next(SeqIO.parse(sample_file, 'genbank'))

    first_record.features = [
        unconvert_feature(convert_feature(feature))
        for feature in first_record.features
    ]

    output = StringIO()
    SeqIO.write(first_record, output, "genbank")

    fixed_path = os.path.join(FILES_PATH, 'sample.fixed.gb')
    with open(fixed_path) as fixed_file:
        self.assertEqual(output.getvalue(), fixed_file.read())
    output.close()