Python six 模块,b() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用six.b()

项目:oscars2016    作者:0x0ece    | 项目源码 | 文件源码
def _parse_pem_key(raw_key_input):
    """Identify and extract PEM keys.

    Determines whether the given key is in the format of PEM key, and extracts
    the relevant part of the key if it is.

    Args:
        raw_key_input: The contents of a private key file (either PEM or
                       PKCS12).

    Returns:
        string, The actual key if the contents are from a PEM file, or
        else None.
    """
    offset = raw_key_input.find(b'-----BEGIN ')
    if offset != -1:
        return raw_key_input[offset:]
项目:pyupdater-wx-demo    作者:wettenhj    | 项目源码 | 文件源码
def setUp(self):
        """Build a throwaway file-server directory holding signed update metadata.

        Signs VERSIONS with the test private key, writes gzipped ``keys.gz``
        and ``versions.gz`` files into a fresh temporary directory, exports
        the environment variables read by the code under test and installs
        the test public key on the client configuration.
        """
        # mkdtemp() creates the directory in one step. Reserving a name with
        # NamedTemporaryFile, closing it and re-creating the path with
        # os.mkdir leaves a window in which the name can be taken by someone
        # else.
        self.fileServerDir = tempfile.mkdtemp()
        os.environ['PYUPDATER_FILESERVER_DIR'] = self.fileServerDir
        privateKey = ed25519.SigningKey(PRIVATE_KEY.encode('utf-8'),
                                        encoding='base64')
        signature = privateKey.sign(six.b(json.dumps(VERSIONS, sort_keys=True)),
                                    encoding='base64').decode()
        VERSIONS['signature'] = signature
        keysFilePath = os.path.join(self.fileServerDir, 'keys.gz')
        with gzip.open(keysFilePath, 'wb') as keysFile:
            # The file is opened in binary mode, so the JSON text has to be
            # converted to bytes first; writing a str raises TypeError on
            # Python 3. six.b() is a no-op on Python 2.
            keysFile.write(six.b(json.dumps(KEYS, sort_keys=True)))
        versionsFilePath = os.path.join(self.fileServerDir, 'versions.gz')
        with gzip.open(versionsFilePath, 'wb') as versionsFile:
            versionsFile.write(six.b(json.dumps(VERSIONS, sort_keys=True)))
        os.environ['WXUPDATEDEMO_TESTING'] = 'True'
        # Imported only after the environment variables above are set.
        from wxupdatedemo.config import CLIENT_CONFIG
        self.clientConfig = CLIENT_CONFIG
        self.clientConfig.PUBLIC_KEY = PUBLIC_KEY
项目:pyupdater-wx-demo    作者:wettenhj    | 项目源码 | 文件源码
def setUp(self):
        """Build a throwaway file-server directory holding signed update metadata.

        Signs VERSIONS with the test private key, writes gzipped ``keys.gz``
        and ``versions.gz`` files into a fresh temporary directory, exports
        the environment variables read by the code under test and installs
        the test public key and application name on the client configuration.
        """
        # mkdtemp() creates the directory in one step. Reserving a name with
        # NamedTemporaryFile, closing it and re-creating the path with
        # os.mkdir leaves a window in which the name can be taken by someone
        # else.
        self.fileServerDir = tempfile.mkdtemp()
        os.environ['PYUPDATER_FILESERVER_DIR'] = self.fileServerDir
        privateKey = ed25519.SigningKey(PRIVATE_KEY.encode('utf-8'),
                                        encoding='base64')
        signature = privateKey.sign(six.b(json.dumps(VERSIONS, sort_keys=True)),
                                    encoding='base64').decode()
        VERSIONS['signature'] = signature
        keysFilePath = os.path.join(self.fileServerDir, 'keys.gz')
        with gzip.open(keysFilePath, 'wb') as keysFile:
            # The file is opened in binary mode, so the JSON text has to be
            # converted to bytes first; writing a str raises TypeError on
            # Python 3. six.b() is a no-op on Python 2.
            keysFile.write(six.b(json.dumps(KEYS, sort_keys=True)))
        versionsFilePath = os.path.join(self.fileServerDir, 'versions.gz')
        with gzip.open(versionsFilePath, 'wb') as versionsFile:
            versionsFile.write(six.b(json.dumps(VERSIONS, sort_keys=True)))
        os.environ['WXUPDATEDEMO_TESTING'] = 'True'
        # Imported only after the environment variables above are set.
        from wxupdatedemo.config import CLIENT_CONFIG
        self.clientConfig = CLIENT_CONFIG
        self.clientConfig.PUBLIC_KEY = PUBLIC_KEY
        self.clientConfig.APP_NAME = APP_NAME
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def test_clear(self):
        """clear() empties a batch and resets the metadata it took from its statements."""
        expected_keyspace = 'keyspace'
        expected_routing_key = 'routing_key'
        payload = {'key': six.b('value')}

        statement = SimpleStatement(
            'whatever',
            keyspace=expected_keyspace,
            routing_key=expected_routing_key,
            custom_payload=payload)

        batch = BatchStatement()
        batch.add(statement)

        # After adding, the batch exposes the statement's keyspace, routing key and payload.
        self.assertTrue(batch._statements_and_parameters)
        self.assertEqual(batch.keyspace, expected_keyspace)
        self.assertEqual(batch.routing_key, expected_routing_key)
        self.assertEqual(batch.custom_payload, payload)

        batch.clear()

        # Everything recorded above must be gone again.
        self.assertFalse(batch._statements_and_parameters)
        for cleared in (batch.keyspace, batch.routing_key):
            self.assertIsNone(cleared)
        self.assertFalse(batch.custom_payload)

        # A cleared batch still accepts new statements.
        batch.add(statement)
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def test_non_frozen_udts(self):
        """
        Check that a non-frozen UDT column can be created, written, partially updated and read back.

        A single field of the UDT is updated in place, and the exported table CQL must not
        contain "<frozen>".

        @since 3.7.0
        @jira_ticket PYTHON-498
        @expected_result Non frozen UDT's are supported

        @test_category data_types, udt
        """
        keyspace = self.keyspace_name
        table = self.function_table_name
        execute = self.session.execute

        execute("USE {0}".format(keyspace))
        execute("CREATE TYPE user (state text, has_corn boolean)")
        execute("CREATE TABLE {0} (a int PRIMARY KEY, b user)".format(table))

        User = namedtuple('user', ('state', 'has_corn'))
        self.cluster.register_user_type(keyspace, "user", User)

        insert_cql = "INSERT INTO {0} (a, b) VALUES (%s, %s)".format(table)
        execute(insert_cql, (0, User("Nebraska", True)))
        # Update one UDT field on its own.
        execute("UPDATE {0} SET b.has_corn = False where a = 0".format(table))

        rows = execute("SELECT * FROM {0}".format(table))
        self.assertFalse(rows[0].b.has_corn)

        table_meta = self.cluster.metadata.keyspaces[keyspace].tables[table]
        self.assertNotIn("<frozen>", table_meta.as_cql_query())
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def test_raise_error_on_nonexisting_udts(self):
        """
        Test for ensuring that an error is raised for operating on a nonexisting udt or an invalid keyspace
        """

        c = Cluster(protocol_version=PROTOCOL_VERSION)
        # Shut the cluster down even when connect() or an assertion fails, so
        # that a failing run does not leave the cluster's resources behind.
        try:
            s = c.connect(self.keyspace_name, wait_for_all_pools=True)
            User = namedtuple('user', ('age', 'name'))

            # Registering against a keyspace that does not exist
            with self.assertRaises(UserTypeDoesNotExist):
                c.register_user_type("some_bad_keyspace", "user", User)

            # Registering against a keyspace that has no "user" type
            with self.assertRaises(UserTypeDoesNotExist):
                c.register_user_type("system", "user", User)

            # Referencing the type in a table definition without creating it first
            with self.assertRaises(InvalidRequest):
                s.execute("CREATE TABLE mytable (a int PRIMARY KEY, b frozen<user>)")
        finally:
            c.shutdown()
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def test_cluster_column_ordering_reversed_metadata(self):
        """
        Simple test to ensure that the metadata associated with clustering order is surfaced correctly.

        Creates a table with two clustering columns, one ascending and one descending, then
        checks the is_reversed flag reported for each of them.
        @since 3.0.0
        @jira_ticket PYTHON-402
        @expected_result is_reversed is set on DESC order, and is False on ASC

        @test_category metadata
        """

        ordering_clause = " AND CLUSTERING ORDER BY (b ASC, c DESC)"
        ddl = self.make_create_statement(["a"], ["b", "c"], ["d"], compact=True) + ordering_clause
        self.session.execute(ddl)

        columns = self.get_table_metadata().columns
        # b is declared ASC, c is declared DESC
        self.assertFalse(columns['b'].is_reversed)
        self.assertTrue(columns['c'].is_reversed)
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def test_cql_compatibility(self):
        """is_cql_compatible is True for a compact table without clustering columns and False once some are present."""
        if CASS_SERVER_VERSION >= (3, 0):
            raise unittest.SkipTest("cql compatibility does not apply Cassandra 3.0+")

        # Several non-PK columns are acceptable as long as there are no
        # clustering columns.
        self.session.execute(
            self.make_create_statement(["a"], [], ["b", "c", "d"], compact=True))
        tablemeta = self.get_table_metadata()

        partition_key_names = [c.name for c in tablemeta.partition_key]
        column_names = sorted(tablemeta.columns.keys())
        self.assertEqual([u'a'], partition_key_names)
        self.assertEqual([], tablemeta.clustering_key)
        self.assertEqual([u'a', u'b', u'c', u'd'], column_names)

        self.assertTrue(tablemeta.is_cql_compatible)

        # With clustering columns the table stops being CQL compatible.
        # They are faked here by patching the metadata object directly.
        tablemeta.clustering_key = ["foo", "bar"]
        for fake_column in ("foo", "bar"):
            tablemeta.columns[fake_column] = None
        self.assertFalse(tablemeta.is_cql_compatible)
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def test_indexes(self):
        """Index definitions show up in both the table-level and the keyspace-level CQL export."""
        table_ddl = self.make_create_statement(["a"], ["b", "c"], ["d", "e", "f"])
        table_ddl += " WITH CLUSTERING ORDER BY (b ASC, c ASC)"
        execute_until_pass(self.session, table_ddl)

        qualified_name = (self.keyspace_name, self.function_table_name)
        d_index = "CREATE INDEX d_index ON %s.%s (d)" % qualified_name
        e_index = "CREATE INDEX e_index ON %s.%s (e)" % qualified_name
        for index_ddl in (d_index, e_index):
            execute_until_pass(self.session, index_ddl)

        exported = self.get_table_metadata().export_as_string().strip()
        # Split the export into individual statements and drop empty fragments.
        pieces = [piece.strip() for piece in exported.split(';')]
        statements = [piece for piece in pieces if piece]
        # presumably the CREATE TABLE statement plus the two indexes
        self.assertEqual(3, len(statements))
        self.assertIn(d_index, statements)
        self.assertIn(e_index, statements)

        # make sure indexes are included in KeyspaceMetadata.export_as_string()
        keyspace_export = self.cluster.metadata.keyspaces[self.keyspace_name].export_as_string()
        for fragment in ('CREATE INDEX d_index', 'CREATE INDEX e_index'):
            self.assertIn(fragment, keyspace_export)
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def test_non_size_tiered_compaction(self):
        """
        Check the CQL exported for a table that uses a non-size-tiered compaction strategy

        The table is created with LeveledCompactionStrategy and one non-default option. The exported
        CQL must contain that option and the strategy name, and must not mention the legacy
        min_threshold / max_threshold parameters.

        @since 2.6.0
        @jira_ticket PYTHON-352
        @expected_result the options map for LeveledCompactionStrategy does not contain min_threshold, max_threshold

        @test_category metadata
        """
        compaction_clause = "WITH COMPACTION = {'class': 'LeveledCompactionStrategy', 'tombstone_threshold': '0.3'}"
        ddl = self.make_create_statement(["a"], [], ["b", "c"]) + compaction_clause
        self.session.execute(ddl)

        exported_cql = self.get_table_metadata().export_as_string()
        for expected in ("'tombstone_threshold': '0.3'", "LeveledCompactionStrategy"):
            self.assertIn(expected, exported_cql)
        for legacy_option in ("min_threshold", "max_threshold"):
            self.assertNotIn(legacy_option, exported_cql)
项目:captaincloud    作者:bpsagar    | 项目源码 | 文件源码
def test_simple(self):
        """Two tasks run on separate executors each end up with their own Output.c value."""
        small = TaskRegistry.create('test_executor_add')
        small.Input.a = 10
        small.Input.b = 20

        large = TaskRegistry.create('test_executor_add')
        large.Input.a = 100
        large.Input.b = 200

        # Each task gets a dedicated executor and they run one after the other.
        for task in (small, large):
            runner = TaskExecutor()
            runner.set_task(task)
            runner.execute()

        self.assertEqual(small.Output.c, 30)
        self.assertEqual(large.Output.c, 300)
项目:captaincloud    作者:bpsagar    | 项目源码 | 文件源码
def test_stream(self):
        """Run the stream-copy task on a file and check the output file has the same bytes.

        The two stream handles given to the task are closed, and both scratch
        files removed, in a ``finally`` block so that a failing run leaks
        neither file descriptors nor files in the working directory.
        """
        payload = six.b('Hello World ') * 100
        source = None
        sink = None
        try:
            with open('test.tmp', 'wb') as fd:
                fd.write(payload)

            task = TaskRegistry.create('test_executor_stream_copy')
            source = open('test.tmp', 'rb')
            task.Input.stream = source
            sink = open('test.out', 'wb')
            task.Output.stream = sink

            executor = TaskExecutor()
            executor.set_task(task)
            executor.execute()

            with open('test.out', 'rb') as fd:
                self.assertEqual(fd.read(), payload)
        finally:
            # close() on an already closed file is a no-op, so this is safe
            # whether or not the executor closed the streams itself.
            for handle in (source, sink):
                if handle is not None:
                    handle.close()
            for path in ('test.tmp', 'test.out'):
                if os.path.exists(path):
                    os.unlink(path)
项目:captaincloud    作者:bpsagar    | 项目源码 | 文件源码
def test_string_field(self):
        """StringField starts with no initial value, rejects non-text values and honours its default."""
        field = StringField()
        self.assertEqual(field.get_initial(), None)

        # An int, a float and a byte string must all be rejected.
        for invalid in (1, 1.5, six.b('ABC')):
            with self.assertRaises(InvalidValueException):
                field.validate(invalid)

        field = StringField(default=six.u('ABC'))
        self.assertEqual(field.get_initial(), six.u('ABC'))

        # A text value passes validation without raising.
        field.validate(six.u('hello'))
项目:captaincloud    作者:bpsagar    | 项目源码 | 文件源码
def test_byte_field(self):
        """ByteField starts with no initial value, rejects non-bytes values and honours its default."""
        field = ByteField()
        self.assertEqual(field.get_initial(), None)

        # An int, a float and a text string must all be rejected.
        for invalid in (1, 1.5, six.u('ABC')):
            with self.assertRaises(InvalidValueException):
                field.validate(invalid)

        field = ByteField(default=six.b('ABC'))
        self.assertEqual(field.get_initial(), six.b('ABC'))

        # A byte string passes validation without raising.
        field.validate(six.b('hello'))
项目:captaincloud    作者:bpsagar    | 项目源码 | 文件源码
def test_struct_field(self):
        """Values created from a StructField expose its members, including nested structs and defaults."""
        flat_field = StructField(a=IntegerField(), b=FloatField())
        flat = flat_field.create()
        flat.a = 100
        flat.b = 3.14
        self.assertEqual(flat.a, 100)

        # A struct whose "b" member is itself a struct with a defaulted string.
        inner_field = StructField(
            c=FloatField(),
            d=StringField(default=six.u('hello world'))
        )
        nested_field = StructField(a=IntegerField(), b=inner_field)

        nested = nested_field.create()
        nested.a = 100
        nested.b.c = 3.14
        self.assertEqual(nested.b.c, 3.14)
        # d was never assigned, so it must carry the declared default.
        self.assertEqual(nested.b.d, six.u('hello world'))
项目:aws-encryption-sdk-cli    作者:awslabs    | 项目源码 | 文件源码
def test_single_io_write_stream_encode_output(
        tmpdir,
        patch_aws_encryption_sdk_stream,
        patch_json_ready_header,
        patch_json_ready_header_auth
):
    """With encode_output enabled, _single_io_write writes the base64 encoding of the stream data."""
    # Stand-in for the SDK stream: yields DATA and carries a header attribute.
    fake_stream = io.BytesIO(DATA)
    fake_stream.header = MagicMock(encryption_context=sentinel.encryption_context)
    patch_aws_encryption_sdk_stream.return_value = fake_stream

    target_file = tmpdir.join('target')
    handler_kwargs = dict(GOOD_IOHANDLER_KWARGS, encode_output=True)
    handler = io_handling.IOHandler(**handler_kwargs)
    stream_args = {
        'mode': 'encrypt',
        'a': sentinel.a,
        'b': sentinel.b
    }

    with open(str(target_file), 'wb') as destination_writer:
        handler._single_io_write(
            stream_args=stream_args,
            source=MagicMock(),
            destination_writer=destination_writer
        )

    assert target_file.read('rb') == base64.b64encode(DATA)
项目:aws-encryption-sdk-cli    作者:awslabs    | 项目源码 | 文件源码
def test_should_write_file_does_exist(tmpdir, patch_input, interactive, no_overwrite, user_input, expected):
    """_should_write_file on an existing target matches the expected decision for each parameter combination."""
    # Create the (empty) target so that it exists before the check runs.
    target_file = tmpdir.join('target')
    target_file.write(b'')
    patch_input.return_value = user_input

    handler_kwargs = GOOD_IOHANDLER_KWARGS.copy()
    handler_kwargs.update(dict(
        interactive=interactive,
        no_overwrite=no_overwrite
    ))
    handler = io_handling.IOHandler(**handler_kwargs)

    decision = handler._should_write_file(str(target_file))

    # Only truthiness is compared, mirroring a plain "assert x" / "assert not x".
    assert bool(decision) is bool(expected)
项目:aws-encryption-sdk-cli    作者:awslabs    | 项目源码 | 文件源码
def write_metadata(self, **metadata):
        # type: (**Any) -> Optional[int]
        """Serialize the metadata kwargs as one JSON line and write it to the output stream.

        Nothing is written when output is suppressed. The line is encoded as
        UTF-8 when the output mode contains ``b`` and written as text otherwise.

        :param **metadata: JSON-serializeable metadata kwargs to write
        :returns: 0 when output is suppressed, otherwise whatever the output
            stream's ``write`` returns
        """
        if self.suppress_output:
            return 0  # wrote 0 bytes

        serialized = json.dumps(metadata, sort_keys=True) + os.linesep
        binary_mode = 'b' in self._output_mode
        payload = serialized.encode('utf-8') if binary_mode else serialized  # type: Union[str, bytes]
        return self._output_stream.write(payload)
项目:workflows.kyoyue    作者:wizyoung    | 项目源码 | 文件源码
def optimal_data_chunks(data, minimum=4):
    """
    An iterator returning QRData chunks optimized to the data content.

    The data is first split on runs of at least ``minimum`` digits, which are
    yielded in numeric mode. The remaining pieces are split again on runs of
    at least ``minimum`` characters from ALPHA_NUM, yielded in alphanumeric
    mode; everything else is yielded in 8-bit byte mode.

    :param data: The content to encode; it is converted with
        ``to_bytestring`` before being split.
    :param minimum: The minimum number of bytes in a row to split as a chunk.
    """
    data = to_bytestring(data)
    # Regex quantifier "{minimum,}" shared by both patterns.
    re_repeat = (
        six.b('{') + six.text_type(minimum).encode('ascii') + six.b(',}'))
    # Raw literal: a plain '\d' is an invalid escape sequence, which newer
    # Python versions warn about. The resulting pattern bytes are unchanged.
    num_pattern = re.compile(six.b(r'\d') + re_repeat)
    num_bits = _optimal_split(data, num_pattern)
    alpha_pattern = re.compile(
        six.b('[') + re.escape(ALPHA_NUM) + six.b(']') + re_repeat)
    for is_num, chunk in num_bits:
        if is_num:
            yield QRData(chunk, mode=MODE_NUMBER, check_data=False)
            continue
        # Non-numeric piece: look for alphanumeric runs inside it.
        for is_alpha, sub_chunk in _optimal_split(chunk, alpha_pattern):
            mode = MODE_ALPHA_NUM if is_alpha else MODE_8BIT_BYTE
            yield QRData(sub_chunk, mode=mode, check_data=False)
项目:GAMADV-XTD    作者:taers232c    | 项目源码 | 文件源码
def _parse_pem_key(raw_key_input):
    """Identify and extract PEM keys.

    Determines whether the given key is in the format of PEM key, and extracts
    the relevant part of the key if it is.

    Args:
        raw_key_input: The contents of a private key file (either PEM or
                       PKCS12).

    Returns:
        string, The actual key if the contents are from a PEM file, or
        else None.
    """
    offset = raw_key_input.find(b'-----BEGIN ')
    if offset != -1:
        return raw_key_input[offset:]
项目:gym-extensions    作者:Breakend    | 项目源码 | 文件源码
def rotate_vector(v, axis, theta):
    """
    Return ``v`` rotated counterclockwise about ``axis`` by ``theta`` radians.

    The axis is normalised first, so only its direction matters. A 3x3
    rotation matrix is then built from the half-angle terms (Euler-Rodrigues
    parameters a, b, c, d) and applied to ``v``; the rotated vector is
    returned, not the matrix itself.

    :param v: the 3-component vector to rotate
    :param axis: the 3-component rotation axis; must not be the zero vector,
        since it is divided by its own length
    :param theta: rotation angle in radians
    :returns: numpy array holding the rotated vector
    """
    axis = np.asarray(axis)
    axis = axis/math.sqrt(np.dot(axis, axis))
    a = math.cos(theta/2.0)
    b, c, d = -axis*math.sin(theta/2.0)
    aa, bb, cc, dd = a*a, b*b, c*c, d*d
    bc, ad, ac, ab, bd, cd = b*c, a*d, a*c, a*b, b*d, c*d
    R = np.array([[aa+bb-cc-dd, 2*(bc+ad), 2*(bd-ac)],
                  [2*(bc-ad), aa+cc-bb-dd, 2*(cd+ab)],
                  [2*(bd+ac), 2*(cd-ab), aa+dd-bb-cc]])

    return np.dot(R, v)
项目:devsecops-example-helloworld    作者:boozallen    | 项目源码 | 文件源码
def test_format_locals(self):  # format() of a summary extracted with capture_locals=True lists the frame's locals
        def some_inner(k, v):  # k and v are captured as locals too (see the expected output below)
            a = 1  # a and b are never read; they only need to exist as locals of this frame
            b = 2
            return traceback.StackSummary.extract(
                traceback.walk_stack(None), capture_locals=True, limit=1)
        s = some_inner(3, 4)
        self.assertEqual(
            ['  File "' + FNAME + '", line 651, '  # NOTE(review): hard-coded line number; inserting or removing lines above the walk_stack call invalidates it
             'in some_inner\n'
             '    traceback.walk_stack(None), capture_locals=True, limit=1)\n'  # source text of the reported line; must match the call above verbatim
             '    a = 1\n'
             '    b = 2\n'
             '    k = 3\n'
             '    v = 4\n'
            ], s.format())
项目:microcosm-flask    作者:globality-corp    | 项目源码 | 文件源码
def test_host_override(self):
        """With port_forwarding.host configured, the response body is built from that host."""
        forwarding_config = dict(host="service")
        loader = load_from_dict(port_forwarding=forwarding_config)
        client = self.init(loader)

        # The request still carries a forwarded port header.
        forwarded_headers = {"X-Forwarded-Port": "8080"}
        response = client.get("/", headers=forwarded_headers)

        assert_that(response.status_code, is_(equal_to(200)))
        expected_body = b("http://service/")
        assert_that(response.data, is_(equal_to(expected_body)))
项目:requests-staticmock    作者:tonybaloney    | 项目源码 | 文件源码
def test_class_context_manager_good_factory():
    """A mock class returning StaticResponseFactory.GoodResponse yields the given body, headers and status."""
    class TestMockClass(BaseMockClass):
        def _test_json(self, request):
            # Handler used for the /test.json request made below.
            return StaticResponseFactory.GoodResponse(
                request=request,
                body=b("it's my life"),
                headers={'now': 'never'},
                status_code=201
            )

    session = Session()
    base_url = 'http://test.com'
    with mock_session_with_class(session, TestMockClass, base_url):
        response = session.get('http://test.com/test.json')

    assert response.text == "it's my life"
    assert 'now' in response.headers.keys()
    assert response.headers['now'] == 'never'
    assert response.status_code == 201
项目:requests-staticmock    作者:tonybaloney    | 项目源码 | 文件源码
def test_class_context_manager_bad_factory():
    """A mock class returning StaticResponseFactory.BadResponse yields the default bad status code."""
    class TestMockClass(BaseMockClass):
        def _test_json(self, request):
            # No status_code is passed, so the factory's default applies.
            return StaticResponseFactory.BadResponse(
                request=request,
                body=b("it's not over"),
                headers={'now': 'never'},
            )

    session = Session()
    base_url = 'http://test.com'
    with mock_session_with_class(session, TestMockClass, base_url):
        response = session.get('http://test.com/test.json')

    assert response.text == "it's not over"
    assert 'now' in response.headers.keys()
    assert response.headers['now'] == 'never'
    assert response.status_code == DEFAULT_BAD_STATUS_CODE
项目:deb-python-traceback2    作者:openstack    | 项目源码 | 文件源码
def test_format_locals(self):  # format() of a summary extracted with capture_locals=True lists the frame's locals
        def some_inner(k, v):  # k and v are captured as locals too (see the expected output below)
            a = 1  # a and b are never read; they only need to exist as locals of this frame
            b = 2
            return traceback.StackSummary.extract(
                traceback.walk_stack(None), capture_locals=True, limit=1)
        s = some_inner(3, 4)
        self.assertEqual(
            ['  File "' + FNAME + '", line 651, '  # NOTE(review): hard-coded line number; inserting or removing lines above the walk_stack call invalidates it
             'in some_inner\n'
             '    traceback.walk_stack(None), capture_locals=True, limit=1)\n'  # source text of the reported line; must match the call above verbatim
             '    a = 1\n'
             '    b = 2\n'
             '    k = 3\n'
             '    v = 4\n'
            ], s.format())
项目:zun    作者:openstack    | 项目源码 | 文件源码
def __call__(self, environ, start_response):
        """Replace the error body with a JSON document describing the supported microversions.

        The existing body is parsed as JSON where possible (the last chunk of
        ``app_iter`` wins; chunks that are not valid JSON count as empty) and
        extended with the version range, an error code, a title and a help
        link. ``Content-Length`` is updated before delegating to the parent
        class.
        """
        # Start from an empty document so that `err` is defined even when
        # app_iter yields nothing.
        err = {}
        for err_str in self.app_iter:
            try:
                err = json.loads(err_str.decode('utf-8'))
            except ValueError:
                err = {}

        links = {'rel': 'help', 'href': 'https://developer.openstack.org'
                 '/api-guide/compute/microversions.html'}

        err['max_version'] = self.max_version
        err['min_version'] = self.min_version
        err['code'] = "zun.microversion-unsupported"
        err['links'] = [links]
        err['title'] = "Requested microversion is unsupported"

        body = json.dump_as_bytes(err)
        # six.b() only accepts text on Python 3 (it calls .encode), so apply
        # it only when the serializer did not already return bytes.
        if not isinstance(body, six.binary_type):
            body = six.b(body)
        self.app_iter = [body]
        self.headers['Content-Length'] = str(len(self.app_iter[0]))

        return super(HTTPNotAcceptableAPIVersion, self).__call__(
            environ, start_response)
项目:zun    作者:openstack    | 项目源码 | 文件源码
def get_id(source_uuid):
    """Derive a short (12 character) id from a random UUID.

    Accepts either a UUID object or its string form; strings are parsed
    first. Only version 4 UUIDs are accepted.

    :raises ValueError: if the UUID is not version 4
    """
    if isinstance(source_uuid, six.string_types):
        source_uuid = uuid.UUID(source_uuid)

    version = source_uuid.version
    if version != 4:
        raise ValueError(_('Invalid UUID version (%d)') % version)

    # The "time" field of a v4 UUID contains 60 random bits
    # (see RFC4122, Section 4.4)
    random_bytes = _to_byte_string(source_uuid.time, 60)
    # The first 12 bytes (= 60 bits) of base32-encoded output is our data
    short_id = base64.b32encode(six.b(random_bytes))[:12].lower()

    # b32encode returns bytes; hand back text on Python 3.
    return short_id.decode('utf-8') if six.PY3 else short_id
项目:capnpy    作者:antocuni    | 项目源码 | 文件源码
def check(self, fmt, value):
        """Compare pack_into against struct.pack_into for one format and value.

        Both functions write ``value`` at the same offset into identical
        random 256-byte buffers, and the buffers must still be equal
        afterwards. The call is then repeated at an offset one byte too far
        for the format to fit, which must raise IndexError.
        """
        from random import randrange
        # build a buffer which is surely big enough to contain what we need
        # and check:
        #   1) that we correctly write the bytes we expect
        #   2) that we do NOT write outside the bounds
        #
        pattern = b''.join(six.int2byte(randrange(256)) for _ in range(256))
        buf = bytearray(pattern)
        buf2 = bytearray(pattern)
        offset = 16
        pack_into(ord(fmt), buf, offset, value)
        struct.pack_into(fmt, buf2, offset, value)
        assert buf == buf2
        #
        # check that it raises if it's out of bound
        out_of_bound = 256-struct.calcsize(fmt)+1
        # Context-manager form: passing the code as a string to
        # pytest.raises is no longer supported by current pytest releases.
        with pytest.raises(IndexError):
            pack_into(ord(fmt), buf, out_of_bound, value)
项目:capnpy    作者:antocuni    | 项目源码 | 文件源码
def test_dumps_alignment():
    """dumps() of a struct read from a bare buffer yields header, root pointer and the struct bytes."""
    class Person(Struct):
        pass

    raw_struct = b('\x20\x00\x00\x00\x00\x00\x00\x00'   # age=32
                   '\x01\x00\x00\x00\x2a\x00\x00\x00'   # name=ptr
                   'J' 'o' 'h' 'n' '\x00\x00\x00\x00')  # John
    expected_message = b(
        '\x00\x00\x00\x00\x04\x00\x00\x00'   # message header: segment count field 0, size field 4
        '\x00\x00\x00\x00\x01\x00\x01\x00'   # ptr to payload
        '\x20\x00\x00\x00\x00\x00\x00\x00'   # age=32
        '\x01\x00\x00\x00\x2a\x00\x00\x00'   # name=ptr
        'J' 'o' 'h' 'n' '\x00\x00\x00\x00')  # John

    person = Person.from_buffer(raw_struct, 0, data_size=1, ptrs_size=1)
    assert dumps(person) == expected_message
项目:capnpy    作者:antocuni    | 项目源码 | 文件源码
def test_primitive(self):
        """Positional and keyword construction of a two-Int64 struct give the same fields and buffer."""
        schema = """
        @0xbf5147cbbecf40c1;
        struct Point {
            x @0 :Int64;
            y @1 :Int64;
        }
        """
        compiled = self.compile(schema)
        expected_buf = b('\x01\x00\x00\x00\x00\x00\x00\x00'  # 1
                         '\x02\x00\x00\x00\x00\x00\x00\x00') # 2

        # positional arguments
        point = compiled.Point(1, 2)
        assert (point.x, point.y) == (1, 2)
        assert point._seg.buf == expected_buf

        # keyword arguments, deliberately given out of field order
        point = compiled.Point(y=2, x=1)
        assert (point.x, point.y) == (1, 2)
        assert point._seg.buf == expected_buf
项目:capnpy    作者:antocuni    | 项目源码 | 文件源码
def test_enum(self):
        """A struct with an enum field keeps the enum member and writes its ordinal into the buffer."""
        schema = """
        @0xbf5147cbbecf40c1;
        enum Color {
            red @0;
            green @1;
            blue @2;
            yellow @3;
        }
        struct Point {
            x @0 :Int64;
            y @1 :Int64;
            color @2 :Color;
        }
        """
        compiled = self.compile(schema)
        yellow = compiled.Color.yellow
        expected_buf = b('\x01\x00\x00\x00\x00\x00\x00\x00'  # 1
                         '\x02\x00\x00\x00\x00\x00\x00\x00'  # 2
                         '\x03\x00\x00\x00\x00\x00\x00\x00') # yellow

        point = compiled.Point(1, 2, yellow)
        assert (point.x, point.y) == (1, 2)
        assert point.color == compiled.Color.yellow
        assert point._seg.buf == expected_buf
项目:capnpy    作者:antocuni    | 项目源码 | 文件源码
def test_void(self):
        """A Void field takes no space in the buffer and cannot be passed to the constructor."""
        schema = """
        @0xbf5147cbbecf40c1;
        struct Point {
            x @0 :Int64;
            y @1 :Int64;
            z @2 :Void;
        }
        """
        mod = self.compile(schema)
        buf = b('\x01\x00\x00\x00\x00\x00\x00\x00'  # 1
                '\x02\x00\x00\x00\x00\x00\x00\x00') # 2
        #
        p = mod.Point(1, 2)
        assert p.x == 1
        assert p.y == 2
        assert p._seg.buf == buf
        # Context-manager form: passing the code as a string to raises() is
        # no longer supported by current pytest releases.
        with py.test.raises(TypeError):
            mod.Point(z=None)
项目:capnpy    作者:antocuni    | 项目源码 | 文件源码
def test_struct(self):
        """A struct holding another struct stores a pointer followed by the nested struct's data."""
        schema = """
        @0xbf5147cbbecf40c1;
        struct Point {
            x @0 :Int64;
            y @1 :Int64;
        }
        struct Foo {
            x @0 :Point;
        }
        """
        compiled = self.compile(schema)
        point = compiled.Point(1, 2)
        wrapper = compiled.Foo(point)
        expected_buf = b('\x00\x00\x00\x00\x02\x00\x00\x00'  # ptr to point
                         '\x01\x00\x00\x00\x00\x00\x00\x00'  # p.x == 1
                         '\x02\x00\x00\x00\x00\x00\x00\x00') # p.y == 2
        assert wrapper._seg.buf == expected_buf
项目:capnpy    作者:antocuni    | 项目源码 | 文件源码
def test_list_of_text(self):
        """A List(Text) field is laid out as a pointer list followed by the zero-padded strings."""
        schema = """
        @0xbf5147cbbecf40c1;
        struct Foo {
            x @0 :List(Text);
        }
        """
        compiled = self.compile(schema)
        words = [b'foo', b'bar', b'baz']
        instance = compiled.Foo(words)
        expected_buf = b('\x01\x00\x00\x00\x1e\x00\x00\x00'    # ptrlist
                         '\x09\x00\x00\x00\x22\x00\x00\x00'
                         '\x09\x00\x00\x00\x22\x00\x00\x00'
                         '\x09\x00\x00\x00\x22\x00\x00\x00'
                         'foo\x00\x00\x00\x00\x00'
                         'bar\x00\x00\x00\x00\x00'
                         'baz\x00\x00\x00\x00\x00')
        assert instance._seg.buf == expected_buf
项目:capnpy    作者:antocuni    | 项目源码 | 文件源码
def test_list_of_lists(self):
        """A List(List(Int8)) field is laid out as a pointer list followed by each padded inner list."""
        schema = """
        @0xbf5147cbbecf40c1;
        struct Foo {
            x @0 :List(List(Int8));
        }
        """
        compiled = self.compile(schema)
        nested = [[1, 2, 3], [4, 5], [6, 7, 8, 9]]
        instance = compiled.Foo(nested)
        expected_buf = b('\x01\x00\x00\x00\x1e\x00\x00\x00'  # list<ptr> (3 items)
                         '\x09\x00\x00\x00\x1a\x00\x00\x00'  # list<8> (3 items)
                         '\x09\x00\x00\x00\x12\x00\x00\x00'  # list<8> (2 items)
                         '\x09\x00\x00\x00\x22\x00\x00\x00'  # list<8> (4 items)
                         '\x01\x02\x03\x00\x00\x00\x00\x00'
                         '\x04\x05\x00\x00\x00\x00\x00\x00'
                         '\x06\x07\x08\x09\x00\x00\x00\x00')
        assert instance._seg.buf == expected_buf
项目:capnpy    作者:antocuni    | 项目源码 | 文件源码
def test_group(self):
        """A group field can be initialised from a tuple and read back member by member."""
        schema = """
        @0xbf5147cbbecf40c1;
        struct Point {
            position :group {
                x @0 :Int64;
                y @1 :Int64;
            }
            color @2 :Text;
        }
        """
        compiled = self.compile(schema)
        point = compiled.Point(position=(1, 2), color=b'red')
        position = point.position
        assert position.x == 1
        assert position.y == 2
        assert point.color == b'red'
项目:capnpy    作者:antocuni    | 项目源码 | 文件源码
def test_nested_group(self):
        """Groups nested inside a group can be initialised from nested tuples."""
        schema = """
        @0xbf5147cbbecf40c1;
        struct Shape {
            position :group {
                a :group {
                    x @0 :Int64;
                    y @1 :Int64;
                }
                b :group {
                    x @2 :Int64;
                    y @3 :Int64;
                }
            }
        }
        """
        compiled = self.compile(schema)
        # outer tuple = (a, b), inner tuples = (x, y)
        shape = compiled.Shape(position=((1, 2), (3, 4)))
        assert shape.position.a.x == 1
        assert shape.position.a.y == 2
        assert shape.position.b.x == 3
        assert shape.position.b.y == 4
项目:capnpy    作者:antocuni    | 项目源码 | 文件源码
def test_group_named_params(self):
        """Group values can be built with the generated group constructors using keyword arguments."""
        schema = """
        @0xbf5147cbbecf40c1;
        struct Point {
            position :group {
                x @0 :Int64;
                y @1 :Int64;
            }
            color :group {
                alpha @2 :Float64;
                name  @3 :Text;
            }
        }
        """
        compiled = self.compile(schema)
        Point = compiled.Point
        position = Point.Position(x=1, y=2)
        color = Point.Color(alpha=1.0, name=b'red')
        point = Point(position=position, color=color)
        assert point.position.x == 1
        assert point.position.y == 2
        assert point.color.alpha == 1.0
        assert point.color.name == b'red'
项目:capnpy    作者:antocuni    | 项目源码 | 文件源码
def test_group_named_params(self):
        """Group constructors fill omitted members with the defaults declared in the schema."""
        schema = """
        @0xbf5147cbbecf40c1;
        struct Point {
            position :group {
                x @0 :Int64 = 42;
                y @1 :Int64;
            }
            color :group {
                alpha @2 :UInt8 = 255;
                name  @3 :Text;
            }
        }
        """
        compiled = self.compile(schema)
        Point = compiled.Point
        # With no arguments every member takes its default.
        assert Point.Position() == (42, 0)
        assert Point.Color() == (255, None)

        # Only the members without an explicit default are supplied here.
        point = Point(position=Point.Position(y=2),
                      color=Point.Color(name=b'red'))
        assert point.position.x == 42
        assert point.position.y == 2
        assert point.color.alpha == 255
        assert point.color.name == b'red'
项目:capnpy    作者:antocuni    | 项目源码 | 文件源码
def test_enum(self):
        """Enum fields read from a raw buffer are passed to self.check with their member names."""
        schema = """
        @0xbf5147cbbecf40c1;
        enum Color {
            red @0;
            green @1;
            blue @2;
        }
        struct P {
            x @0 :Color;
            y @1 :Color;
        }
        """
        self.mod = self.compile(schema)
        # First byte 0x01 with the rest zero; expected to read as x = green, y = red.
        raw = b'\x01\x00\x00\x00\x00\x00\x00\x00'
        instance = self.mod.P.from_buffer(raw, 0, 1, 0)
        self.check(instance, '(x = green, y = red)')
项目:capnpy    作者:antocuni    | 项目源码 | 文件源码
def test_text(self):
        """Check ``Person`` for every mix of set and ``None`` text fields.

        Each case builds a ``Person`` and hands it to ``self.check`` with the
        expected text; in the expectations below, fields passed as ``None``
        do not appear.
        """
        self.mod = self.compile("""
        @0xbf5147cbbecf40c1;
        struct Person {
            name @0 :Text;
            surname @1 :Text;
        }
        """)
        # (name, surname, expected text), run in this order.
        cases = [
            (None, None, '()'),
            (b"foo", None, '(name = "foo")'),
            (None, b"bar", '(surname = "bar")'),
            (b"foo", b"bar", '(name = "foo", surname = "bar")'),
        ]
        for name, surname, expected in cases:
            person = self.mod.Person(name=name, surname=surname)
            self.check(person, expected)
项目:capnpy    作者:antocuni    | 项目源码 | 文件源码
def test_text_special_chars(self):
        """Check text values containing quotes and non-ASCII bytes.

        The expected strings are raw literals, so the backslashes in them are
        part of the text handed to ``self.check``.
        """
        self.mod = self.compile("""
        @0xbf5147cbbecf40c1;
        struct P {
            txt @0 :Text;
        }
        """)
        # (value stored in txt, expected text), run in this order.
        cases = [
            (b'double "quotes"', r'(txt = "double \"quotes\"")'),
            (b"single 'quotes'", r'(txt = "single \'quotes\'")'),
            (b"tricky \" '", r'(txt = "tricky \" \'")'),
            # UTF-8 encoded text: the expectation spells out the two bytes
            # of the accented letter as \x escapes.
            (u'hellò'.encode('utf-8'), r'(txt = "hell\xc3\xb2")'),
        ]
        for value, expected in cases:
            obj = self.mod.P(txt=value)
            self.check(obj, expected)
项目:capnpy    作者:antocuni    | 项目源码 | 文件源码
def test_struct(self):
        """Check a struct whose fields are themselves structs.

        ``Rectangle`` gets two ``Point`` values and ``None`` for ``empty``;
        the expected text lists only ``a`` and ``b``.
        """
        self.mod = self.compile("""
        @0xbf5147cbbecf40c1;
        struct Point {
            x @0 :Int64;
            y @1 :Int64;
        }
        struct Rectangle {
            a @0 :Point;
            b @1 :Point;
            empty @2 :Point;
        }
        """)
        corner_a = self.mod.Point(1, 2)
        corner_b = self.mod.Point(3, 4)
        # Third positional argument is the `empty` field, left as None.
        rect = self.mod.Rectangle(corner_a, corner_b, None)
        self.check(rect, '(a = (x = 1, y = 2), b = (x = 3, y = 4))')
项目:capnpy    作者:antocuni    | 项目源码 | 文件源码
def test_list(self):
        """Check list fields of ints, structs and texts, one at a time.

        In each step exactly one list field is populated and the other two
        are ``None``; the expected text contains only the populated field.
        """
        self.mod = self.compile("""
        @0xbf5147cbbecf40c1;
        struct Point {
            x @0 :Int64;
            y @1 :Int64;
        }
        struct P {
            ints @0 :List(Int64);
            structs @1 :List(Point);
            texts @2 :List(Text);
        }
        """)
        # List of primitives.
        only_ints = self.mod.P(ints=[1, 2, 3], structs=None, texts=None)
        self.check(only_ints, '(ints = [1, 2, 3])')
        # List of structs.
        first = self.mod.Point(1, 2)
        second = self.mod.Point(3, 4)
        only_structs = self.mod.P(ints=None, structs=[first, second], texts=None)
        self.check(only_structs, '(structs = [(x = 1, y = 2), (x = 3, y = 4)])')
        # List of texts.
        words = [b'foo', b'bar', b'baz']
        only_texts = self.mod.P(ints=None, structs=None, texts=words)
        self.check(only_texts, '(texts = ["foo", "bar", "baz"])')
项目:capnpy    作者:antocuni    | 项目源码 | 文件源码
def test_union(self):
        """Check each variant of an anonymous union via its ``new_*`` factory.

        One object is created per union member (``x``, ``y``, ``z``) and
        handed to ``self.check`` with the text expected for that member.
        """
        self.mod = self.compile("""
        @0xbf5147cbbecf40c1;
        struct P {
            union {
                x @0 :Int64;
                y @1 :Void;
                z @2 :Text;
            }
        }
        """)
        # Int64 member.
        as_x = self.mod.P.new_x(1)
        self.check(as_x, '(x = 1)')
        # Void member: the factory takes no value.
        as_y = self.mod.P.new_y()
        self.check(as_y, '(y = void)')
        # Text member.
        as_z = self.mod.P.new_z(b'hello')
        self.check(as_z, '(z = "hello")')
项目:capnpy    作者:antocuni    | 项目源码 | 文件源码
def test_listbuilder_bug(self):
        """A text field followed by a one-element struct list reads back intact.

        ``Foo`` is built positionally from a name and a list holding a single
        ``Bar``; the list length and the element's fields are then asserted.
        """
        compiled = self.compile("""
            @0xbf5147cbbecf40c1;
            struct Bar {
                x @0 :Int64;
                y @1 :Int64;
            }

            struct Foo {
                name @0 :Text;
                bars @1 :List(Bar);
            }
        """)
        only_bar = compiled.Bar(1, 2)
        container = compiled.Foo(b'name', [only_bar])
        assert len(container.bars) == 1
        assert container.bars[0].x == 1
        assert container.bars[0].y == 2
项目:capnpy    作者:antocuni    | 项目源码 | 文件源码
def test_listbuilder_null_ptrs(self):
        """List elements whose text field is ``None`` keep it ``None``.

        Two ``Bar`` values with ``name=None`` are put in ``Foo.bars``; the
        first element is read back and all three of its fields are asserted.
        """
        compiled = self.compile("""
            @0xbf5147cbbecf40c1;
            struct Bar {
                x @0 :Int64;
                y @1 :Int64;
                name @2 :Text;
            }

            struct Foo {
                bars @0 :List(Bar);
            }
        """)
        first = compiled.Bar(1, 2, None)
        second = compiled.Bar(3, 4, None)
        container = compiled.Foo([first, second])
        # Only the first element is inspected.
        head = container.bars[0]
        assert head.x == 1
        assert head.y == 2
        assert head.name is None
项目:capnpy    作者:antocuni    | 项目源码 | 文件源码
def test_compact_structs(self):
        """A struct read from a padded buffer is stored packed in its parent.

        ``Person`` is loaded from offset 8 of a buffer that has unrelated
        ``garbage`` bytes before the pointer and between the pointer and the
        text. After wrapping it in ``Foo``, the test asserts that ``foo.key``
        starts at data offset 8 and that the segment bytes from there on are
        just the pointer followed by the text.
        """
        compiled = self.compile("""
            @0xbf5147cbbecf40c1;
            struct Person {
                name @0 :Text;
            }

            struct Foo {
                key @0 :Person;
            }
        """)
        raw = b('garbage0'
                '\x05\x00\x00\x00\x32\x00\x00\x00'  # ptr to name
                'garbage1'
                'dummy\x00\x00\x00')
        # NOTE(review): arguments look like (buffer, offset, data size,
        # pointers size) -- confirm against from_buffer's signature.
        person = compiled.Person.from_buffer(raw, 8, 0, 1)
        foo = compiled.Foo(person)
        assert foo.key.name == b'dummy'
        # we check that the structure has been packed
        assert foo.key._data_offset == 8
        tail = foo.key._seg.buf[8:]
        packed = b('\x01\x00\x00\x00\x32\x00\x00\x00'  # ptr to dummy
                   'dummy\x00\x00\x00')
        assert tail == packed
项目:capnpy    作者:antocuni    | 项目源码 | 文件源码
def test_compact_struct_inside_list(self):
        """Structs with two text fields survive being stored in a list.

        Two ``Person`` values are placed in ``Town.people`` and the name and
        surname of each element are read back and compared.
        """
        mod = self.compile("""
            @0xbf5147cbbecf40c1;
            struct Person {
                name @0 :Text;
                surname @1 :Text;
            }

            struct Town {
                people @0 :List(Person);
            }
        """)
        p1 = mod.Person(b'Mickey', b'Mouse')
        p2 = mod.Person(b'Donald', b'Duck')
        t = mod.Town([p1, p2])
        # Expected (name, surname) per list index, checked in order.
        expected = [(b'Mickey', b'Mouse'), (b'Donald', b'Duck')]
        for index, (name, surname) in enumerate(expected):
            assert t.people[index].name == name
            assert t.people[index].surname == surname