Python six 模块,StringIO() 实例源码

我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用six.StringIO()

项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def test_run_add(self, m_k8s_add):
        vif = fake._fake_vif()
        m_k8s_add.return_value = vif
        m_fin = StringIO()
        m_fout = StringIO()
        env = {
            'CNI_COMMAND': 'ADD',
            'CNI_ARGS': 'foo=bar',
        }
        self.runner.run(env, m_fin, m_fout)
        self.assertTrue(m_k8s_add.called)
        self.assertEqual('foo=bar', m_k8s_add.call_args[0][0].CNI_ARGS)
        result = jsonutils.loads(m_fout.getvalue())
        self.assertDictEqual(
            {"cniVersion": "0.3.0",
             "dns": {"nameservers": ["192.168.0.1"]},
             "ip4": {"gateway": "192.168.0.1", "ip": "192.168.0.2/24"}},
            result)
项目:run_lambda    作者:ethantkoenig    | 项目源码 | 文件源码
def test_successful_call(self):
        def handle(event_arg, context_arg):
            return event["number"] + int(context.memory_limit_in_mb)
        event = {"number": 5, "other": "ignored"}
        context = context_module.MockLambdaContext.Builder()\
            .set_function_name("test_handle")\
            .set_memory_limit_in_mb("100")\
            .build()
        result = call_module.run_lambda(handle, event, context)
        self.assertIsInstance(result, call_module.LambdaResult)
        self.assertFalse(result.timed_out)
        self.assertIsNone(result.exception)
        self.assertEqual(result.value, 105)
        self.assertTrue(result.summary.duration_in_millis >= 0)
        self.assertIsInstance(result.summary.duration_in_millis, int)
        self.assertTrue(result.summary.max_memory_used_in_mb >= 0)
        self.assertIsInstance(result.summary.log, str)
        output = six.StringIO()
        result.display(output)
项目:oscars2016    作者:0x0ece    | 项目源码 | 文件源码
def __init__(self, body, mimetype='application/octet-stream',
               chunksize=DEFAULT_CHUNK_SIZE, resumable=False):
    """Create a new MediaInMemoryUpload.

  DEPRECATED: Use MediaIoBaseUpload with either io.TextIOBase or StringIO for
  the stream.

  Args:
    body: string, Bytes of body content.
    mimetype: string, Mime-type of the file or default of
      'application/octet-stream'.
    chunksize: int, File will be uploaded in chunks of this many bytes. Only
      used if resumable=True.
    resumable: bool, True if this is a resumable upload. False means upload
      in a single request.
    """
    fd = BytesIO(body)
    super(MediaInMemoryUpload, self).__init__(fd, mimetype, chunksize=chunksize,
                                              resumable=resumable)
项目:shub-image    作者:scrapinghub    | 项目源码 | 文件源码
def test_load(self):
        config = ReleaseConfig()
        stream = StringIO(
            'projects:\n  dev: 123\n  prod: 321\n'
            'images:\n  dev: registry/user/project\n  prod: user/project\n'
            'endpoints:\n  dev: http://127.0.0.1/api/scrapyd/\n'
            'apikeys:\n  default: abcde\n'
            'version: GIT')
        config.load(stream)
        assert getattr(config, 'projects') == {'dev': 123, 'prod': 321}
        assert getattr(config, 'endpoints') == {
            'default': 'https://app.scrapinghub.com/api/',
            'dev': 'http://127.0.0.1/api/scrapyd/'}
        assert config.images == {
            'dev': 'registry/user/project',
            'prod': 'user/project'}
        assert getattr(config, 'apikeys') == {'default': 'abcde'}
        assert getattr(config, 'version') == 'GIT'
项目:htsget    作者:jeromekelleher    | 项目源码 | 文件源码
def test_unseekable_file(self):
        def tell_fails():
            raise IOError()
        ticket = get_ticket(urls=[get_http_ticket(EXAMPLE_URL)])
        for num_retries in range(10):
            temp_file = StringIO()
            temp_file.tell = tell_fails
            with mock.patch("time.sleep") as mock_sleep, \
                    mock.patch("logging.warning") as mock_warning:
                dm = RetryCountDownloadManager(
                    ticket, temp_file, max_retries=num_retries)
                self.assertEqual(dm.max_retries, num_retries)
                self.assertRaises(exceptions.RetryableError, dm.run)
                self.assertEqual(dm.attempt_counts[EXAMPLE_URL], 1)
                self.assertEqual(mock_sleep.call_count, 0)
                self.assertEqual(mock_warning.call_count, 0)
项目:mos-horizon    作者:Mirantis    | 项目源码 | 文件源码
def logger(test_case):
    log_buffer = StringIO()
    ROOT_LOGGER.handlers[:] = []

    stream_handler = logging.StreamHandler(stream=log_buffer)
    stream_handler.setLevel(logging.DEBUG)

    formatter = logging.Formatter(
        '%(asctime)s - %(levelname)s - %(name)s#%(lineno)d - %(message)s')
    stream_handler.setFormatter(formatter)
    ROOT_LOGGER.addHandler(stream_handler)

    yield log_buffer

    if test_case.is_failed:
        log_path = os.path.join(test_case._test_report_dir, 'test.log')

        with test_case.log_exception("Attach test log"):
            with open(log_path, 'w') as log_file:
                log_file.write(log_buffer.getvalue().encode('utf-8'))
项目:mos-horizon    作者:Mirantis    | 项目源码 | 文件源码
def test_plural_form(self):
        buf = StringIO(
            (
                '<html><translate translate-plural="hello {$count$} worlds!">'
                'hello one world!</translate></html>'
            ))

        messages = list(extract_angular(buf, [], [], {}))
        self.assertEqual(
            [
                (1, 'ngettext',
                 ('hello one world!',
                  'hello {$count$} worlds!'
                  ),
                 [])
            ], messages)
项目:mos-horizon    作者:Mirantis    | 项目源码 | 文件源码
def test_multiple_comments(self):
        buf = StringIO(
            '<html><translate '
            'translate-comment="What a beautiful world"'
            'translate-comment="Another comment"'
            '>hello world!</translate></html>')

        messages = list(extract_angular(buf, [], [], {}))
        self.assertEqual(
            [
                (1, 'gettext', 'hello world!',
                 [
                     'What a beautiful world',
                     'Another comment'
                 ])
            ],
            messages)
项目:mos-horizon    作者:Mirantis    | 项目源码 | 文件源码
def test_nested_variations(self):
        buf = StringIO(
            '''
            <p translate>To <a href="link">link</a> here</p>
            <p translate>To <!-- a comment!! --> here</p>
            <p translate>To trademark&reg; &#62; &#x3E; here</p>
            '''
        )
        messages = list(extract_angular(buf, [], [], {}))
        self.assertEqual(
            [
                (2, u'gettext', 'To <a href="link">link</a> here', []),
                (3, u'gettext', 'To <!-- a comment!! --> here', []),
                (4, u'gettext', u'To trademark® &#62; &#x3E; here', []),
            ],
            messages)
项目:workflows.kyoyue    作者:wizyoung    | 项目源码 | 文件源码
def test_print_ascii(self):
        qr = qrcode.QRCode(border=0)
        f = six.StringIO()
        qr.print_ascii(out=f)
        printed = f.getvalue()
        f.close()
        expected = u'\u2588\u2580\u2580\u2580\u2580\u2580\u2588'
        self.assertEqual(printed[:len(expected)], expected)

        f = six.StringIO()
        f.isatty = lambda: True
        qr.print_ascii(out=f, tty=True)
        printed = f.getvalue()
        f.close()
        expected = (
            u'\x1b[48;5;232m\x1b[38;5;255m' +
            u'\xa0\u2584\u2584\u2584\u2584\u2584\xa0')
        self.assertEqual(printed[:len(expected)], expected)
项目:ml-rest    作者:apinf    | 项目源码 | 文件源码
def generate_handler_stub(router, handler_template=HANDLER_TEMPLATE):
    output = StringIO()
    func_name_to_operation = {}
    for path in router.get_paths():
        for operation in path.get_operations():
            snake_operation_id = camel_case_to_spaces(operation.id).replace(' ', '_')
            func_name_to_operation[snake_operation_id] = operation
    for func_name, operation in sorted(func_name_to_operation.items()):
        parameter_names = [p['name'] for p in operation.parameters]
        handler = handler_template.format(
            func_name=func_name,
            operation_id=operation.id,
            parameters=', '.join(parameter_names),
        )
        output.write(handler)
        output.write('\n\n\n')
    return output.getvalue()
项目:txamqp    作者:txamqp    | 项目源码 | 文件源码
def read_content(queue):
    frame = yield queue.get()
    header = frame.payload
    children = []
    for i in range(header.weight):
        content = yield read_content(queue)
        children.append(content)
    size = header.size
    read = 0
    buf = six.StringIO()
    while read < size:
        body = yield queue.get()
        content = body.payload.content

        # if this is the first instance of real binary content, convert the string buffer to BytesIO
        # Not a nice fix but it preserves the original behaviour
        if six.PY3 and isinstance(content, bytes) and isinstance(buf, six.StringIO):
            buf = six.BytesIO()

        buf.write(content)
        read += len(content)
    defer.returnValue(Content(buf.getvalue(), children, header.properties.copy()))
项目:lepo    作者:akx    | 项目源码 | 文件源码
def generate_handler_stub(router, handler_template=HANDLER_TEMPLATE):
    output = StringIO()
    func_name_to_operation = {}
    for path in router.get_paths():
        for operation in path.get_operations():
            snake_operation_id = camel_case_to_spaces(operation.id).replace(' ', '_')
            func_name_to_operation[snake_operation_id] = operation
    for func_name, operation in sorted(func_name_to_operation.items()):
        parameter_names = [p['name'] for p in operation.parameters]
        handler = handler_template.format(
            func_name=func_name,
            operation_id=operation.id,
            parameters=', '.join(parameter_names),
        )
        output.write(handler)
        output.write('\n\n\n')
    return output.getvalue()
项目:GAMADV-XTD    作者:taers232c    | 项目源码 | 文件源码
def __init__(self, body, mimetype='application/octet-stream',
               chunksize=DEFAULT_CHUNK_SIZE, resumable=False):
    """Create a new MediaInMemoryUpload.

  DEPRECATED: Use MediaIoBaseUpload with either io.TextIOBase or StringIO for
  the stream.

  Args:
    body: string, Bytes of body content.
    mimetype: string, Mime-type of the file or default of
      'application/octet-stream'.
    chunksize: int, File will be uploaded in chunks of this many bytes. Only
      used if resumable=True.
    resumable: bool, True if this is a resumable upload. False means upload
      in a single request.
    """
    fd = BytesIO(body)
    super(MediaInMemoryUpload, self).__init__(fd, mimetype, chunksize=chunksize,
                                              resumable=resumable)
项目:microcosm-flask    作者:globality-corp    | 项目源码 | 文件源码
def assert_csv_response(self, response, status_code, expected_lines=None):
        assert_that(response.headers["Content-Type"], starts_with("text/csv"))
        # always validate status code
        assert_that(response.status_code, is_(equal_to(status_code)))

        # expect JSON data except on 204
        if status_code == 204:
            response_lines = None
        else:
            response_lines = [row for row in reader(StringIO(response.data.decode("utf-8")))]

        # validate data if provided
        assert_that(
            response_lines,
            has_length(len(expected_lines)),
        )
        if response_lines is not None and expected_lines is not None:
            for index, line in enumerate(response_lines):
                assert_that(
                    line,
                    contains(*expected_lines[index]),
                )
项目:fold    作者:tensorflow    | 项目源码 | 文件源码
def create_plan(self, loom_input_tensor):
    p = plan.TrainPlan()
    foo = tf.get_variable('foo', [], tf.float32, tf.constant_initializer(12))
    p.compiler = block_compiler.Compiler.create(
        blocks.Scalar() >> blocks.Function(lambda x: x * foo),
        loom_input_tensor=loom_input_tensor)
    p.losses['foo'] = p.compiler.output_tensors[0]
    p.finalize_stats()
    p.train_op = tf.train.GradientDescentOptimizer(1.0).minimize(
        p.loss_total, global_step=p.global_step)
    p.logdir = self.get_temp_dir()
    p.dev_examples = [2]
    p.is_chief_trainer = True
    p.batch_size = 2
    p.epochs = 3
    p.print_file = six.StringIO()
    return p
项目:fold    作者:tensorflow    | 项目源码 | 文件源码
def test_run_once(self):
    p = self._make_plan()
    p.save_best = False
    # We aren't using a managed session, so we need to run this ourselves.
    init_op = tf.global_variables_initializer()
    sv = p.create_supervisor()
    with self.test_session() as sess:
      p.run(sv, sess)
      log_str = p.print_file.getvalue()
      self.assertTrue(
          log_str.endswith('could not restore from %s\n' % p.logdir_restore),
          msg=log_str)

      p.print_file = six.StringIO()
      sess.run(init_op)
      tf.gfile.MkDir(p.logdir_restore)
      save_path = os.path.join(p.logdir_restore, 'model')
      sv.saver.save(sess, save_path, global_step=42)
      p.run(sv, sess)
      log_str = p.print_file.getvalue()
      expected_lines = ['restoring from %s-42' % save_path,
                        'step:       0 loss: 3.000e+00 foo: 4.200e+01']
      expected = '\n'.join(expected_lines) + '\n'
      self.assertTrue(log_str.endswith(expected), msg=log_str)
项目:python-bileanclient    作者:openstack    | 项目源码 | 文件源码
def _request(self, method, url, headers=None, data=None,
                 content_length=None):
        call = build_call_record(method, sort_url_by_query_keys(url),
                                 headers or {}, data)
        if content_length is not None:
            call = tuple(list(call) + [content_length])
        self.calls.append(call)

        fixture = self.fixtures[sort_url_by_query_keys(url)][method]

        data = fixture[1]
        if isinstance(fixture[1], six.string_types):
            try:
                data = json.loads(fixture[1])
            except ValueError:
                data = six.StringIO(fixture[1])

        return FakeResponse(fixture[0], fixture[1]), data
项目:cupy    作者:cupy    | 项目源码 | 文件源码
def test_print_report(self):
        hook = memory_hooks.LineProfileHook()
        p = self.pool.malloc(1000)
        del p
        with hook:
            p1 = self.pool.malloc(1000)
            p2 = self.pool.malloc(2000)
        del p1
        del p2
        io = six.StringIO()
        hook.print_report(file=io)
        actual = io.getvalue()
        expect = r'\A_root \(3\.00KB, 2\.00KB\)'
        six.assertRegex(self, actual, expect)
        expect = r'.*\.py:[0-9]+:test_print_report \(1\.00KB, 0\.00B\)'
        six.assertRegex(self, actual, expect)
        expect = r'.*\.py:[0-9]+:test_print_report \(2\.00KB, 2\.00KB\)'
        six.assertRegex(self, actual, expect)
项目:cupy    作者:cupy    | 项目源码 | 文件源码
def test_print_report_max_depth(self):
        hook = memory_hooks.LineProfileHook(max_depth=1)
        with hook:
            p = self.pool.malloc(1000)
        del p
        io = six.StringIO()
        hook.print_report(file=io)
        actual = io.getvalue()
        self.assertEqual(2, len(actual.split('\n')))

        hook = memory_hooks.LineProfileHook(max_depth=2)
        with hook:
            p = self.pool.malloc(1000)
        del p
        io = six.StringIO()
        hook.print_report(file=io)
        actual = io.getvalue()
        self.assertEqual(3, len(actual.split('\n')))
项目:cs234_reinforcement_learning    作者:hbghhy    | 项目源码 | 文件源码
def _render(self, mode='human', close=False):
        if close:
            return
        outfile = StringIO() if mode == 'ansi' else sys.stdout

        row, col = self.s // self.ncol, self.s % self.ncol
        desc = self.desc.tolist()
        desc = [[c.decode('utf-8') for c in line] for line in desc]
        desc[row][col] = utils.colorize(desc[row][col], "red", highlight=True)
        if self.lastaction is not None:
            outfile.write("  ({})\n".format(["Left","Down","Right","Up"][self.lastaction]))
        else:
            outfile.write("\n")
        outfile.write("\n".join(''.join(line) for line in desc)+"\n")

        return outfile
项目:TCP-IP    作者:JackZ0    | 项目源码 | 文件源码
def setUp(self):
        super(PostArgParseSetupTest, self).setUp()
        self.config.debug = False
        self.config.max_log_backups = 1000
        self.config.quiet = False
        self.config.verbose_count = constants.CLI_DEFAULTS['verbose_count']
        self.devnull = open(os.devnull, 'w')

        from certbot.log import ColoredStreamHandler
        self.stream_handler = ColoredStreamHandler(six.StringIO())
        from certbot.log import MemoryHandler, TempHandler
        self.temp_handler = TempHandler()
        self.temp_path = self.temp_handler.path
        self.memory_handler = MemoryHandler(self.temp_handler)
        self.root_logger = mock.MagicMock(
            handlers=[self.memory_handler, self.stream_handler])
项目:TCP-IP    作者:JackZ0    | 项目源码 | 文件源码
def _test_common(self, error_type, debug):
        """Returns the mocked logger and stderr output."""
        mock_err = six.StringIO()
        try:
            raise error_type(self.error_msg)
        except BaseException:
            exc_info = sys.exc_info()
            with mock.patch('certbot.log.logger') as mock_logger:
                with mock.patch('certbot.log.sys.stderr', mock_err):
                    try:
                        # pylint: disable=star-args
                        self._call(
                            *exc_info, debug=debug, log_path=self.log_path)
                    except SystemExit as exit_err:
                        mock_err.write(str(exit_err))
                    else:  # pragma: no cover
                        self.fail('SystemExit not raised.')

        output = mock_err.getvalue()
        return mock_logger, output
项目:warcio    作者:webrecorder    | 项目源码 | 文件源码
def test_to_str_exclude():
    def exclude(h):
        if h[0].lower() == 'multi-line':
            return None

        return h

    sah = StatusAndHeadersParser(['HTTP/1.0']).parse(StringIO(status_headers_1))
    res = sah.to_str(exclude)

    exp = "\
HTTP/1.0 200 OK\r\n\
Content-Type: ABC\r\n\
Some: Value\r\n\
"
    assert(res == exp)

    assert(sah.to_bytes(exclude) == (exp.encode('latin-1') + b'\r\n'))
项目:kripodb    作者:3D-e-Chem    | 项目源码 | 文件源码
def test_simmatrix_import_run():
    output_fn = tmpname()

    tsv = '''frag_id1   frag_id2    score
2mlm_2W7_frag1  2mlm_2W7_frag2  0.5877164873731594
2mlm_2W7_frag2  3wvm_STE_frag1  0.4633096818493935
'''
    inputfile = StringIO(tsv)

    try:
        script.simmatrix_import_run(inputfile=inputfile,
                                    inputformat='tsv',
                                    simmatrixfn=output_fn,
                                    fragmentsdb='data/fragments.sqlite',
                                    nrrows=2)

        simmatrix = SimilarityMatrix(output_fn)
        result = [r for r in simmatrix]
        simmatrix.close()
        expected = [('2mlm_2W7_frag1', '2mlm_2W7_frag2xx', 0.5877), ('2mlm_2W7_frag2', '3wvm_STE_frag1', 0.4633)]
        assert_array_almost_equal([r[2] for r in result], [r[2] for r in expected], 3)
        assert [(r[0], r[1],) for r in result] == [(r[0], r[1],) for r in result]
    finally:
        if os.path.exists(output_fn):
            os.remove(output_fn)
项目:kripodb    作者:3D-e-Chem    | 项目源码 | 文件源码
def test_simmatrix_importfpneigh_run():
    output_fn = tmpname()

    tsv = '''Compounds similar to 2mlm_2W7_frag1:
2mlm_2W7_frag1   1.0000
2mlm_2W7_frag2   0.5877
Compounds similar to 2mlm_2W7_frag2:
2mlm_2W7_frag2   1.0000
3wvm_STE_frag1   0.4633
'''
    inputfile = StringIO(tsv)

    try:
        script.simmatrix_importfpneigh_run(inputfile=inputfile,
                                           simmatrixfn=output_fn,
                                           fragmentsdb='data/fragments.sqlite',
                                           nrrows=3)

        simmatrix = SimilarityMatrix(output_fn)
        rows = [r for r in simmatrix]
        simmatrix.close()
        expected = [(u'2mlm_2W7_frag1', u'2mlm_2W7_frag2', 0.5877), (u'2mlm_2W7_frag2', u'3wvm_STE_frag1', 0.4633)]
        assert rows == expected
    finally:
        os.remove(output_fn)
项目:kripodb    作者:3D-e-Chem    | 项目源码 | 文件源码
def test_fpneigh2tsv_run():
    fpneigh_in = '''Compounds similar to 2mlm_2W7_frag1:
2mlm_2W7_frag1   1.0000
2mlm_2W7_frag2   0.5877
Compounds similar to 2mlm_2W7_frag2:
2mlm_2W7_frag2   1.0000
3wvm_STE_frag1   0.4633
'''
    inputfile = StringIO(fpneigh_in)

    outputfile = StringIO()

    script.fpneigh2tsv_run(inputfile, outputfile)

    expected = '''frag_id1\tfrag_id2\tscore
2mlm_2W7_frag1\t2mlm_2W7_frag2\t0.5877
2mlm_2W7_frag2\t3wvm_STE_frag1\t0.4633
'''
    assert outputfile.getvalue() == expected
项目:kripodb    作者:3D-e-Chem    | 项目源码 | 文件源码
def test_dump_pairs_astsv(self, bitsets, number_of_bits, label2id):
        out = StringIO()

        pairs.dump_pairs(bitsets,
                         bitsets,
                         'tsv',
                         'StringIO',
                         out,
                         number_of_bits,
                         0.4,
                         0.05,
                         label2id,
                         False,
                         True,
                         )
        result = out.getvalue()

        expected = "a\tc\t0.13556\n"
        assert result == expected
项目:kripodb    作者:3D-e-Chem    | 项目源码 | 文件源码
def test_dump_pairs_astsv_nomem(self, bitsets, number_of_bits, label2id):
        out = StringIO()

        pairs.dump_pairs(bitsets,
                         bitsets,
                         'tsv',
                         'StringIO',
                         out,
                         number_of_bits,
                         0.4,
                         0.05,
                         label2id,
                         True,
                         True,
                         )
        result = out.getvalue()

        expected = "a\tc\t0.13556\n"
        assert result == expected
项目:ngraph    作者:NervanaSystems    | 项目源码 | 文件源码
def make_audio(tensor, sample_rate, length_frames, num_channels):
    """Convert an numpy representation audio to Audio protobuf"""
    output = StringIO()
    wav_out = wave.open(output, "w")
    wav_out.setframerate(float(sample_rate))
    wav_out.setsampwidth(2)
    wav_out.setcomptype('NONE', 'not compressed')
    wav_out.setnchannels(num_channels)
    wav_out.writeframes(tensor.astype("int16").tostring())
    wav_out.close()
    output.flush()
    audio_string = output.getvalue()

    return Summary.Audio(sample_rate=float(sample_rate),
                         num_channels=num_channels,
                         length_frames=length_frames,
                         encoded_audio_string=audio_string,
                         content_type="audio/wav")
项目:ngraph    作者:NervanaSystems    | 项目源码 | 文件源码
def __init__(self, name, retain_file=False, gen_flex=False):
        self.num_kernels = 0
        self.module = None
        self.functions = dict()
        self.arg_descs = dict()
        self.cache = dict()

        self.compiled = False
        self.retain_file = retain_file

        self.buffer = StringIO()
        # Open file and add header
        if self.retain_file:
            self.f = tempfile.NamedTemporaryFile(mode='w', suffix='.c', prefix=name, delete=False)
            self.filename = self.f.name

        self.buffer.write(_includes_template)
        if gen_flex:
            self.buffer.write(_flex_includes_template)
项目:deb-python-lesscpy    作者:openstack    | 项目源码 | 文件源码
def test_parse_stream(self):
        """
        It can parse input from a file stream.
        """
        stream = StringIO("""
            @nice-blue: #5B83AD;
            """)

        self.parser.parse(file=stream)

        # A single object is parser which is the expected variable.
        self.assertEqual(1, len(self.parser.result))
        # This is a stream without a name so it sets default name.
        self.assertEqual('(stream)', self.parser.target)
        variable = self.parser.result[0]
        self.assertEqual('@nice-blue', variable.name)
        self.assertEqual(['#5b83ad'], variable.value)
项目:libtrust-py    作者:realityone    | 项目源码 | 文件源码
def test_sign(self):
        message = six.StringIO(u'Hello, World!')
        sig_algs = (hash_.ES256, hash_.ES384, hash_.ES521)
        origin_sig = (
            [88, 34, 78, 248, 120, 105, 162, 172, 14, 179, 252, 201, 193, 230, 124, 105, 227, 145, 236, 104, 31, 153, 215, 117,
             193, 126, 229, 98, 143, 88, 81, 216, 19, 67, 86, 127, 212, 195, 149, 200, 63, 117, 65, 155, 70, 6, 219, 234, 151,
             120, 229, 214, 193, 210, 165, 158, 135, 160, 244, 210, 41, 49, 132, 71],
            [65, 50, 99, 81, 100, 105, 169, 194, 75, 54, 38, 34, 69, 117, 22, 1, 105, 176, 138, 47, 254, 233, 225, 132, 39, 40,
             160, 2, 9, 208, 78, 11, 76, 20, 163, 57, 222, 176, 108, 93, 155, 163, 102, 185, 211, 138, 83, 92, 67, 92, 133, 51,
             119, 58, 20, 212, 51, 37, 20, 175, 202, 134, 85, 14],
            [112, 133, 177, 239, 79, 252, 240, 252, 28, 96, 174, 40, 109, 34, 109, 141, 183, 16, 246, 249, 102, 213, 128, 132,
             206, 148, 124, 150, 254, 248, 164, 62, 109, 95, 178, 120, 53, 52, 216, 135, 213, 193, 97, 109, 255, 0, 220, 219, 41,
             224, 112, 236, 14, 131, 170, 191, 223, 234, 36, 90, 152, 133, 138, 247]
        )

        for i, sa in enumerate(sig_algs):
            message.seek(0)
            sig, alg = self.private_key.sign(message, sa.hash_id)

            message.seek(0)
            self.assertTrue(self.public_key.verify(message, alg, sig))

            message.seek(0)
            self.assertTrue(self.public_key.verify(message, alg, bytes(bytearray(origin_sig[i]))))
项目:libtrust-py    作者:realityone    | 项目源码 | 文件源码
def sign(self, private_key, timestamp=None):
        protected = self.protected_header(timestamp=timestamp)
        sign_bytes = six.StringIO(self.sign_bytes(protected).decode())

        sig_bytes, algorithm = private_key.sign(sign_bytes, hash_.HashID.SHA256)
        self.signatures.append(
            JsSignature(
                JsHeader(
                    private_key.public_key(),
                    algorithm
                ),
                util.jose_base64_url_encode(sig_bytes),
                protected
            )
        )
        return sig_bytes, algorithm
项目:libtrust-py    作者:realityone    | 项目源码 | 文件源码
def verify(self):
        keys = []
        for sign in self.signatures:
            sign_bytes = self.sign_bytes(sign.protected)
            if sign.header.chain:
                raise NotImplementedError()
            elif sign.header.jwk:
                public_key = sign.header.jwk
            else:
                raise JSONSignError("missing public key")

            sig_bytes = util.jose_base64_url_decode(sign.signature)

            try:
                public_key.verify(six.StringIO(sign_bytes.decode()), sign.header.algorithm, sig_bytes)
            except Exception as e:
                raise e

            keys.append(public_key)

        return keys
项目:AutoML-Challenge    作者:postech-mlg-exbrain    | 项目源码 | 文件源码
def __repr__(self, verbosity=0):
        repr = StringIO()
        repr.write("Metafeatures for dataset %s\n" % self.name)
        for name in self.values:
            if verbosity == 0 and self.values[name].type_ != "METAFEATURE":
                continue
            if verbosity == 0:
                repr.write("  %s: %s\n" %
                           (str(name), str(self.values[name].value)))
            elif verbosity >= 1:
                repr.write("  %s: %10s  (%10fs)\n" %
                           (str(name), str(self.values[
                                               name].value)[:10],
                            self.values[name].time))

            # Add the reason for a crash if one happened!
            if verbosity > 1 and self.values[name].comment:
                repr.write("    %s\n" % self.values[name].comment)

        return repr.getvalue()
项目:cs234    作者:CalciferZh    | 项目源码 | 文件源码
def _render(self, mode='human', close=False):
        if close:
            return
        outfile = StringIO() if mode == 'ansi' else sys.stdout

        row, col = self.s // self.ncol, self.s % self.ncol
        desc = self.desc.tolist()
        desc = [[c.decode('utf-8') for c in line] for line in desc]
        desc[row][col] = utils.colorize(desc[row][col], "red", highlight=True)
        if self.lastaction is not None:
            outfile.write("  ({})\n".format(["Left","Down","Right","Up"][self.lastaction]))
        else:
            outfile.write("\n")
        outfile.write("\n".join(''.join(line) for line in desc)+"\n")

        return outfile
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def test_run_invalid(self, *args):
        m_fin = StringIO()
        m_fout = StringIO()
        code = self.runner.run(
            {'CNI_COMMAND': 'INVALID', 'CNI_ARGS': 'foo=bar'}, m_fin, m_fout)

        self.assertEqual(1, code)
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def test_run_write_version(self, *args):
        m_fin = StringIO()
        m_fout = StringIO()
        code = self.runner.run(
            {'CNI_COMMAND': 'VERSION', 'CNI_ARGS': 'foo=bar'}, m_fin, m_fout)
        result = jsonutils.loads(m_fout.getvalue())

        self.assertEqual(0, code)
        self.assertEqual(api.CNIRunner.SUPPORTED_VERSIONS,
                         result['supportedVersions'])
        self.assertEqual(api.CNIRunner.VERSION, result['cniVersion'])
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def test_run_del(self, m_k8s_delete):
        vif = fake._fake_vif()
        m_k8s_delete.return_value = vif
        m_fin = StringIO()
        m_fout = StringIO()
        env = {
            'CNI_COMMAND': 'DEL',
            'CNI_ARGS': 'foo=bar',
        }
        self.runner.run(env, m_fin, m_fout)
        self.assertTrue(m_k8s_delete.called)
        self.assertEqual('foo=bar', m_k8s_delete.call_args[0][0].CNI_ARGS)
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def _test_run(self, cni_cmd, path, m_post):
        m_fin = StringIO()
        m_fout = StringIO()
        env = {
            'CNI_COMMAND': cni_cmd,
            'CNI_ARGS': 'foo=bar',
        }
        result = self.runner.run(env, m_fin, m_fout)
        m_post.assert_called_with(
            'http://127.0.0.1:%d/%s' % (self.port, path),
            json=mock.ANY, headers={'Connection': 'close'})
        return result
项目:run_lambda    作者:ethantkoenig    | 项目源码 | 文件源码
def __init__(self, context):
            self._context = context

            self._start_mem = memory_profiler.memory_usage()[0]

            self._log = StringIO()
            self._log.write("START RequestId: {r} Version: {v}\n".format(
                r=context.aws_request_id, v=context.function_version
            ))
            self._start_time = timeit.default_timer()
            self._previous_stdout = sys.stdout

            handler = logging.StreamHandler(stream=self._log)
            logging.getLogger().addHandler(handler)
            sys.stdout = self._log
项目:run_lambda    作者:ethantkoenig    | 项目源码 | 文件源码
def call(arguments):
        output = six.StringIO()
        with mock.patch("sys.stdout", output):
            with mock.patch("sys.argv", arguments):
                main.main()
        return str(output.getvalue())
项目:umapi-client.py    作者:adobe-apiplatform    | 项目源码 | 文件源码
def log_stream():
    stream = StringIO()
    handler = logging.StreamHandler(stream)
    logger = logging.getLogger('test_logger')
    logger.setLevel(logging.WARNING)
    logger.addHandler(handler)
    yield stream, logger
    handler.close()
项目:umapi-client.py    作者:adobe-apiplatform    | 项目源码 | 文件源码
def log_stream():
    stream = StringIO()
    handler = logging.StreamHandler(stream)
    logger = logging.getLogger('test_logger')
    logger.setLevel(logging.WARNING)
    logger.addHandler(handler)
    prior_logger = v1.logger
    v1.logger = logger
    yield stream
    v1.logger = prior_logger
    handler.close()


# test the retry logic with a custom logger
项目:umapi-client.py    作者:adobe-apiplatform    | 项目源码 | 文件源码
def _get_auth(self, ims_host, ims_endpoint_jwt,
                  tech_acct_id=None, api_key=None, client_secret=None,
                  private_key_file=None, private_key_data=None,
                  **kwargs):
        tech_acct_id = tech_acct_id or kwargs.get("tech_acct")
        private_key_file = private_key_file or kwargs.get("priv_key_path")
        if not (tech_acct_id and api_key and client_secret and (private_key_data or private_key_file)):
            raise ArgumentError("Connector create: not all required auth parameters were supplied; please see docs")
        if private_key_data:
            jwt = JWT(self.org_id, tech_acct_id, ims_host, api_key, six.StringIO(private_key_data))
        else:
            with open(private_key_file, 'r') as private_key_stream:
                jwt = JWT(self.org_id, tech_acct_id, ims_host, api_key, private_key_stream)
        token = AccessRequest("https://" + ims_host + ims_endpoint_jwt, api_key, client_secret, jwt())
        return Auth(api_key, token())
项目:gransk    作者:pcbje    | 项目源码 | 文件源码
def _run_test(self, filename):
    mock_pipeline = test_helper.get_mock_pipeline([
        helper.DOCUMENT, helper.PICTURE,
        helper.ARCHIVE, helper.DISKIMAGE])

    detector = detect_type.Subscriber(mock_pipeline)

    with open('config.yml') as inp:
      detector.setup(yaml.load(inp.read()))

    doc = document.get_document(filename)

    detector.consume(doc, StringIO('dummy'))

    return doc
项目:oscars2016    作者:0x0ece    | 项目源码 | 文件源码
def from_string(cls, key, password='notasecret'):
        """Construct an RsaSigner instance from a string.

        Args:
            key: string, private key in PEM format.
            password: string, password for private key file. Unused for PEM
                      files.

        Returns:
            RsaSigner instance.

        Raises:
            ValueError if the key cannot be parsed as PKCS#1 or PKCS#8 in
            PEM format.
        """
        key = _from_bytes(key)  # pem expects str in Py3
        marker_id, key_bytes = pem.readPemBlocksFromFile(
            six.StringIO(key), _PKCS1_MARKER, _PKCS8_MARKER)

        if marker_id == 0:
            pkey = rsa.key.PrivateKey.load_pkcs1(key_bytes,
                                                 format='DER')
        elif marker_id == 1:
            key_info, remaining = decoder.decode(
                key_bytes, asn1Spec=_PKCS8_SPEC)
            if remaining != b'':
                raise ValueError('Unused bytes', remaining)
            pkey_info = key_info.getComponentByName('privateKey')
            pkey = rsa.key.PrivateKey.load_pkcs1(pkey_info.asOctets(),
                                                 format='DER')
        else:
            raise ValueError('No key could be detected.')

        return cls(pkey)
项目:goodbye-genbank    作者:biosustain    | 项目源码 | 文件源码
def test_genbank_record_fixing(self):
        with open(os.path.join(FILES_PATH, 'sample.gb')) as f:
            first_record = next(SeqIO.parse(f, 'genbank'))

        first_record.features = [unconvert_feature(convert_feature(f)) for f in first_record.features]

        output = StringIO()
        SeqIO.write(first_record, output, "genbank")
        with open(os.path.join(FILES_PATH, 'sample.fixed.gb')) as f:
            self.assertEqual(output.getvalue(), f.read())
        output.close()