我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用six.b()。
def _parse_pem_key(raw_key_input): """Identify and extract PEM keys. Determines whether the given key is in the format of PEM key, and extracts the relevant part of the key if it is. Args: raw_key_input: The contents of a private key file (either PEM or PKCS12). Returns: string, The actual key if the contents are from a PEM file, or else None. """ offset = raw_key_input.find(b'-----BEGIN ') if offset != -1: return raw_key_input[offset:]
def setUp(self): tempFile = tempfile.NamedTemporaryFile() self.fileServerDir = tempFile.name tempFile.close() os.mkdir(self.fileServerDir) os.environ['PYUPDATER_FILESERVER_DIR'] = self.fileServerDir privateKey = ed25519.SigningKey(PRIVATE_KEY.encode('utf-8'), encoding='base64') signature = privateKey.sign(six.b(json.dumps(VERSIONS, sort_keys=True)), encoding='base64').decode() VERSIONS['signature'] = signature keysFilePath = os.path.join(self.fileServerDir, 'keys.gz') with gzip.open(keysFilePath, 'wb') as keysFile: keysFile.write(json.dumps(KEYS, sort_keys=True)) versionsFilePath = os.path.join(self.fileServerDir, 'versions.gz') with gzip.open(versionsFilePath, 'wb') as versionsFile: versionsFile.write(json.dumps(VERSIONS, sort_keys=True)) os.environ['WXUPDATEDEMO_TESTING'] = 'True' from wxupdatedemo.config import CLIENT_CONFIG self.clientConfig = CLIENT_CONFIG self.clientConfig.PUBLIC_KEY = PUBLIC_KEY
def setUp(self): tempFile = tempfile.NamedTemporaryFile() self.fileServerDir = tempFile.name tempFile.close() os.mkdir(self.fileServerDir) os.environ['PYUPDATER_FILESERVER_DIR'] = self.fileServerDir privateKey = ed25519.SigningKey(PRIVATE_KEY.encode('utf-8'), encoding='base64') signature = privateKey.sign(six.b(json.dumps(VERSIONS, sort_keys=True)), encoding='base64').decode() VERSIONS['signature'] = signature keysFilePath = os.path.join(self.fileServerDir, 'keys.gz') with gzip.open(keysFilePath, 'wb') as keysFile: keysFile.write(json.dumps(KEYS, sort_keys=True)) versionsFilePath = os.path.join(self.fileServerDir, 'versions.gz') with gzip.open(versionsFilePath, 'wb') as versionsFile: versionsFile.write(json.dumps(VERSIONS, sort_keys=True)) os.environ['WXUPDATEDEMO_TESTING'] = 'True' from wxupdatedemo.config import CLIENT_CONFIG self.clientConfig = CLIENT_CONFIG self.clientConfig.PUBLIC_KEY = PUBLIC_KEY self.clientConfig.APP_NAME = APP_NAME
def test_clear(self): keyspace = 'keyspace' routing_key = 'routing_key' custom_payload = {'key': six.b('value')} ss = SimpleStatement('whatever', keyspace=keyspace, routing_key=routing_key, custom_payload=custom_payload) batch = BatchStatement() batch.add(ss) self.assertTrue(batch._statements_and_parameters) self.assertEqual(batch.keyspace, keyspace) self.assertEqual(batch.routing_key, routing_key) self.assertEqual(batch.custom_payload, custom_payload) batch.clear() self.assertFalse(batch._statements_and_parameters) self.assertIsNone(batch.keyspace) self.assertIsNone(batch.routing_key) self.assertFalse(batch.custom_payload) batch.add(ss)
def test_non_frozen_udts(self): """ Test to ensure that non frozen udt's work with C* >3.6. @since 3.7.0 @jira_ticket PYTHON-498 @expected_result Non frozen UDT's are supported @test_category data_types, udt """ self.session.execute("USE {0}".format(self.keyspace_name)) self.session.execute("CREATE TYPE user (state text, has_corn boolean)") self.session.execute("CREATE TABLE {0} (a int PRIMARY KEY, b user)".format(self.function_table_name)) User = namedtuple('user', ('state', 'has_corn')) self.cluster.register_user_type(self.keyspace_name, "user", User) self.session.execute("INSERT INTO {0} (a, b) VALUES (%s, %s)".format(self.function_table_name), (0, User("Nebraska", True))) self.session.execute("UPDATE {0} SET b.has_corn = False where a = 0".format(self.function_table_name)) result = self.session.execute("SELECT * FROM {0}".format(self.function_table_name)) self.assertFalse(result[0].b.has_corn) table_sql = self.cluster.metadata.keyspaces[self.keyspace_name].tables[self.function_table_name].as_cql_query() self.assertNotIn("<frozen>", table_sql)
def test_raise_error_on_nonexisting_udts(self): """ Test for ensuring that an error is raised for operating on a nonexisting udt or an invalid keyspace """ c = Cluster(protocol_version=PROTOCOL_VERSION) s = c.connect(self.keyspace_name, wait_for_all_pools=True) User = namedtuple('user', ('age', 'name')) with self.assertRaises(UserTypeDoesNotExist): c.register_user_type("some_bad_keyspace", "user", User) with self.assertRaises(UserTypeDoesNotExist): c.register_user_type("system", "user", User) with self.assertRaises(InvalidRequest): s.execute("CREATE TABLE mytable (a int PRIMARY KEY, b frozen<user>)") c.shutdown()
def test_cluster_column_ordering_reversed_metadata(self): """ Simple test to ensure that the metatdata associated with cluster ordering is surfaced is surfaced correctly. Creates a table with a few clustering keys. Then checks the clustering order associated with clustering columns and ensure it's set correctly. @since 3.0.0 @jira_ticket PYTHON-402 @expected_result is_reversed is set on DESC order, and is False on ASC @test_category metadata """ create_statement = self.make_create_statement(["a"], ["b", "c"], ["d"], compact=True) create_statement += " AND CLUSTERING ORDER BY (b ASC, c DESC)" self.session.execute(create_statement) tablemeta = self.get_table_metadata() b_column = tablemeta.columns['b'] self.assertFalse(b_column.is_reversed) c_column = tablemeta.columns['c'] self.assertTrue(c_column.is_reversed)
def test_cql_compatibility(self): if CASS_SERVER_VERSION >= (3, 0): raise unittest.SkipTest("cql compatibility does not apply Cassandra 3.0+") # having more than one non-PK column is okay if there aren't any # clustering columns create_statement = self.make_create_statement(["a"], [], ["b", "c", "d"], compact=True) self.session.execute(create_statement) tablemeta = self.get_table_metadata() self.assertEqual([u'a'], [c.name for c in tablemeta.partition_key]) self.assertEqual([], tablemeta.clustering_key) self.assertEqual([u'a', u'b', u'c', u'd'], sorted(tablemeta.columns.keys())) self.assertTrue(tablemeta.is_cql_compatible) # ... but if there are clustering columns, it's not CQL compatible. # This is a hacky way to simulate having clustering columns. tablemeta.clustering_key = ["foo", "bar"] tablemeta.columns["foo"] = None tablemeta.columns["bar"] = None self.assertFalse(tablemeta.is_cql_compatible)
def test_indexes(self): create_statement = self.make_create_statement(["a"], ["b", "c"], ["d", "e", "f"]) create_statement += " WITH CLUSTERING ORDER BY (b ASC, c ASC)" execute_until_pass(self.session, create_statement) d_index = "CREATE INDEX d_index ON %s.%s (d)" % (self.keyspace_name, self.function_table_name) e_index = "CREATE INDEX e_index ON %s.%s (e)" % (self.keyspace_name, self.function_table_name) execute_until_pass(self.session, d_index) execute_until_pass(self.session, e_index) tablemeta = self.get_table_metadata() statements = tablemeta.export_as_string().strip() statements = [s.strip() for s in statements.split(';')] statements = list(filter(bool, statements)) self.assertEqual(3, len(statements)) self.assertIn(d_index, statements) self.assertIn(e_index, statements) # make sure indexes are included in KeyspaceMetadata.export_as_string() ksmeta = self.cluster.metadata.keyspaces[self.keyspace_name] statement = ksmeta.export_as_string() self.assertIn('CREATE INDEX d_index', statement) self.assertIn('CREATE INDEX e_index', statement)
def test_non_size_tiered_compaction(self): """ test options for non-size-tiered compaction strategy Creates a table with LeveledCompactionStrategy, specifying one non-default option. Verifies that the option is present in generated CQL, and that other legacy table parameters (min_threshold, max_threshold) are not included. @since 2.6.0 @jira_ticket PYTHON-352 @expected_result the options map for LeveledCompactionStrategy does not contain min_threshold, max_threshold @test_category metadata """ create_statement = self.make_create_statement(["a"], [], ["b", "c"]) create_statement += "WITH COMPACTION = {'class': 'LeveledCompactionStrategy', 'tombstone_threshold': '0.3'}" self.session.execute(create_statement) table_meta = self.get_table_metadata() cql = table_meta.export_as_string() self.assertIn("'tombstone_threshold': '0.3'", cql) self.assertIn("LeveledCompactionStrategy", cql) self.assertNotIn("min_threshold", cql) self.assertNotIn("max_threshold", cql)
def test_simple(self): task1 = TaskRegistry.create('test_executor_add') task1.Input.a = 10 task1.Input.b = 20 task2 = TaskRegistry.create('test_executor_add') task2.Input.a = 100 task2.Input.b = 200 executor1 = TaskExecutor() executor1.set_task(task1) executor1.execute() executor2 = TaskExecutor() executor2.set_task(task2) executor2.execute() self.assertEqual(task1.Output.c, 30) self.assertEqual(task2.Output.c, 300)
def test_stream(self): with open('test.tmp', 'wb') as fd: fd.write(six.b('Hello World ') * 100) task = TaskRegistry.create('test_executor_stream_copy') task.Input.stream = open('test.tmp', 'rb') task.Output.stream = open('test.out', 'wb') executor = TaskExecutor() executor.set_task(task) executor.execute() with open('test.out', 'rb') as fd: self.assertEqual(fd.read(), six.b('Hello World ') * 100) os.unlink('test.tmp') os.unlink('test.out')
def test_string_field(self): instance = StringField() self.assertEqual(instance.get_initial(), None) with self.assertRaises(InvalidValueException): instance.validate(1) with self.assertRaises(InvalidValueException): instance.validate(1.5) with self.assertRaises(InvalidValueException): instance.validate(six.b('ABC')) instance = StringField(default=six.u('ABC')) self.assertEqual(instance.get_initial(), six.u('ABC')) instance.validate(six.u('hello'))
def test_byte_field(self): instance = ByteField() self.assertEqual(instance.get_initial(), None) with self.assertRaises(InvalidValueException): instance.validate(1) with self.assertRaises(InvalidValueException): instance.validate(1.5) with self.assertRaises(InvalidValueException): instance.validate(six.u('ABC')) instance = ByteField(default=six.b('ABC')) self.assertEqual(instance.get_initial(), six.b('ABC')) instance.validate(six.b('hello'))
def test_struct_field(self): instance = StructField(a=IntegerField(), b=FloatField()) val = instance.create() val.a = 100 val.b = 3.14 self.assertEqual(val.a, 100) nested_instance = StructField( a=IntegerField(), b=StructField( c=FloatField(), d=StringField(default=six.u('hello world')) ) ) val = nested_instance.create() val.a = 100 val.b.c = 3.14 self.assertEqual(val.b.c, 3.14) self.assertEqual(val.b.d, six.u('hello world'))
def test_single_io_write_stream_encode_output( tmpdir, patch_aws_encryption_sdk_stream, patch_json_ready_header, patch_json_ready_header_auth ): patch_aws_encryption_sdk_stream.return_value = io.BytesIO(DATA) patch_aws_encryption_sdk_stream.return_value.header = MagicMock(encryption_context=sentinel.encryption_context) target_file = tmpdir.join('target') mock_source = MagicMock() kwargs = GOOD_IOHANDLER_KWARGS.copy() kwargs['encode_output'] = True handler = io_handling.IOHandler(**kwargs) with open(str(target_file), 'wb') as destination_writer: handler._single_io_write( stream_args={ 'mode': 'encrypt', 'a': sentinel.a, 'b': sentinel.b }, source=mock_source, destination_writer=destination_writer ) assert target_file.read('rb') == base64.b64encode(DATA)
def test_should_write_file_does_exist(tmpdir, patch_input, interactive, no_overwrite, user_input, expected): target_file = tmpdir.join('target') target_file.write(b'') patch_input.return_value = user_input kwargs = GOOD_IOHANDLER_KWARGS.copy() kwargs.update(dict( interactive=interactive, no_overwrite=no_overwrite )) handler = io_handling.IOHandler(**kwargs) should_write = handler._should_write_file(str(target_file)) if expected: assert should_write else: assert not should_write
def write_metadata(self, **metadata): # type: (**Any) -> Optional[int] """Writes metadata to the output stream if output is not suppressed. :param **metadata: JSON-serializeable metadata kwargs to write """ if self.suppress_output: return 0 # wrote 0 bytes metadata_line = json.dumps(metadata, sort_keys=True) + os.linesep metadata_output = '' # type: Union[str, bytes] if 'b' in self._output_mode: metadata_output = metadata_line.encode('utf-8') else: metadata_output = metadata_line return self._output_stream.write(metadata_output)
def optimal_data_chunks(data, minimum=4): """ An iterator returning QRData chunks optimized to the data content. :param minimum: The minimum number of bytes in a row to split as a chunk. """ data = to_bytestring(data) re_repeat = ( six.b('{') + six.text_type(minimum).encode('ascii') + six.b(',}')) num_pattern = re.compile(six.b('\d') + re_repeat) num_bits = _optimal_split(data, num_pattern) alpha_pattern = re.compile( six.b('[') + re.escape(ALPHA_NUM) + six.b(']') + re_repeat) for is_num, chunk in num_bits: if is_num: yield QRData(chunk, mode=MODE_NUMBER, check_data=False) else: for is_alpha, sub_chunk in _optimal_split(chunk, alpha_pattern): if is_alpha: mode = MODE_ALPHA_NUM else: mode = MODE_8BIT_BYTE yield QRData(sub_chunk, mode=mode, check_data=False)
def rotate_vector(v, axis, theta): """ Return the rotation matrix associated with counterclockwise rotation about the given axis by theta radians. """ axis = np.asarray(axis) axis = axis/math.sqrt(np.dot(axis, axis)) a = math.cos(theta/2.0) b, c, d = -axis*math.sin(theta/2.0) aa, bb, cc, dd = a*a, b*b, c*c, d*d bc, ad, ac, ab, bd, cd = b*c, a*d, a*c, a*b, b*d, c*d R = np.array([[aa+bb-cc-dd, 2*(bc+ad), 2*(bd-ac)], [2*(bc-ad), aa+cc-bb-dd, 2*(cd+ab)], [2*(bd+ac), 2*(cd-ab), aa+dd-bb-cc]]) return np.dot(R, v)
def test_format_locals(self): def some_inner(k, v): a = 1 b = 2 return traceback.StackSummary.extract( traceback.walk_stack(None), capture_locals=True, limit=1) s = some_inner(3, 4) self.assertEqual( [' File "' + FNAME + '", line 651, ' 'in some_inner\n' ' traceback.walk_stack(None), capture_locals=True, limit=1)\n' ' a = 1\n' ' b = 2\n' ' k = 3\n' ' v = 4\n' ], s.format())
def test_host_override(self): loader = load_from_dict( port_forwarding=dict( host="service", ) ) client = self.init(loader) response = client.get( "/", headers={ "X-Forwarded-Port": "8080", }, ) assert_that(response.status_code, is_(equal_to(200))) assert_that(response.data, is_(equal_to(b("http://service/"))))
def test_class_context_manager_good_factory(): class_session = Session() class TestMockClass(BaseMockClass): def _test_json(self, request): return StaticResponseFactory.GoodResponse( request=request, body=b("it's my life"), headers={'now': 'never'}, status_code=201 ) with mock_session_with_class(class_session, TestMockClass, 'http://test.com'): response = class_session.get('http://test.com/test.json') assert response.text == "it's my life" assert 'now' in response.headers.keys() assert response.headers['now'] == 'never' assert response.status_code == 201
def test_class_context_manager_bad_factory(): class_session = Session() class TestMockClass(BaseMockClass): def _test_json(self, request): return StaticResponseFactory.BadResponse( request=request, body=b("it's not over"), headers={'now': 'never'}, ) with mock_session_with_class(class_session, TestMockClass, 'http://test.com'): response = class_session.get('http://test.com/test.json') assert response.text == "it's not over" assert 'now' in response.headers.keys() assert response.headers['now'] == 'never' assert response.status_code == DEFAULT_BAD_STATUS_CODE
def __call__(self, environ, start_response): for err_str in self.app_iter: err = {} try: err = json.loads(err_str.decode('utf-8')) except ValueError: pass links = {'rel': 'help', 'href': 'https://developer.openstack.org' '/api-guide/compute/microversions.html'} err['max_version'] = self.max_version err['min_version'] = self.min_version err['code'] = "zun.microversion-unsupported" err['links'] = [links] err['title'] = "Requested microversion is unsupported" self.app_iter = [six.b(json.dump_as_bytes(err))] self.headers['Content-Length'] = str(len(self.app_iter[0])) return super(HTTPNotAcceptableAPIVersion, self).__call__( environ, start_response)
def get_id(source_uuid): """Derive a short (12 character) id from a random UUID. The supplied UUID must be a version 4 UUID object. """ if isinstance(source_uuid, six.string_types): source_uuid = uuid.UUID(source_uuid) if source_uuid.version != 4: raise ValueError(_('Invalid UUID version (%d)') % source_uuid.version) # The "time" field of a v4 UUID contains 60 random bits # (see RFC4122, Section 4.4) random_bytes = _to_byte_string(source_uuid.time, 60) # The first 12 bytes (= 60 bits) of base32-encoded output is our data encoded = base64.b32encode(six.b(random_bytes))[:12] if six.PY3: return encoded.lower().decode('utf-8') else: return encoded.lower()
def check(self, fmt, value): from random import randrange # build a buffer which is surely big enough to contain what we need # and check: # 1) that we correctly write the bytes we expect # 2) that we do NOT write outside the bounds # pattern = [six.int2byte(randrange(256)) for _ in range(256)] pattern = b''.join(pattern) buf = bytearray(pattern) buf2 = bytearray(pattern) offset = 16 pack_into(ord(fmt), buf, offset, value) struct.pack_into(fmt, buf2, offset, value) assert buf == buf2 # # check that it raises if it's out of bound out_of_bound = 256-struct.calcsize(fmt)+1 pytest.raises(IndexError, "pack_into(ord(fmt), buf, out_of_bound, value)")
def test_dumps_alignment(): class Person(Struct): pass buf = b('\x20\x00\x00\x00\x00\x00\x00\x00' # age=32 '\x01\x00\x00\x00\x2a\x00\x00\x00' # name=ptr 'J' 'o' 'h' 'n' '\x00\x00\x00\x00') # John p = Person.from_buffer(buf, 0, data_size=1, ptrs_size=1) msg = dumps(p) exp = b('\x00\x00\x00\x00\x04\x00\x00\x00' # message header: 1 segment, size 3 words '\x00\x00\x00\x00\x01\x00\x01\x00' # ptr to payload '\x20\x00\x00\x00\x00\x00\x00\x00' # age=32 '\x01\x00\x00\x00\x2a\x00\x00\x00' # name=ptr 'J' 'o' 'h' 'n' '\x00\x00\x00\x00') # John assert msg == exp
def test_primitive(self): schema = """ @0xbf5147cbbecf40c1; struct Point { x @0 :Int64; y @1 :Int64; } """ mod = self.compile(schema) buf = b('\x01\x00\x00\x00\x00\x00\x00\x00' # 1 '\x02\x00\x00\x00\x00\x00\x00\x00') # 2 # p = mod.Point(1, 2) assert p.x == 1 assert p.y == 2 assert p._seg.buf == buf # p = mod.Point(y=2, x=1) assert p.x == 1 assert p.y == 2 assert p._seg.buf == buf
def test_enum(self): schema = """ @0xbf5147cbbecf40c1; enum Color { red @0; green @1; blue @2; yellow @3; } struct Point { x @0 :Int64; y @1 :Int64; color @2 :Color; } """ mod = self.compile(schema) buf = b('\x01\x00\x00\x00\x00\x00\x00\x00' # 1 '\x02\x00\x00\x00\x00\x00\x00\x00' # 2 '\x03\x00\x00\x00\x00\x00\x00\x00') # yellow # p = mod.Point(1, 2, mod.Color.yellow) assert p.x == 1 assert p.y == 2 assert p.color == mod.Color.yellow assert p._seg.buf == buf
def test_void(self): schema = """ @0xbf5147cbbecf40c1; struct Point { x @0 :Int64; y @1 :Int64; z @2 :Void; } """ mod = self.compile(schema) buf = b('\x01\x00\x00\x00\x00\x00\x00\x00' # 1 '\x02\x00\x00\x00\x00\x00\x00\x00') # 2 # p = mod.Point(1, 2) assert p.x == 1 assert p.y == 2 assert p._seg.buf == buf py.test.raises(TypeError, "mod.Point(z=None)")
def test_struct(self): schema = """ @0xbf5147cbbecf40c1; struct Point { x @0 :Int64; y @1 :Int64; } struct Foo { x @0 :Point; } """ mod = self.compile(schema) p = mod.Point(1, 2) foo = mod.Foo(p) assert foo._seg.buf == b('\x00\x00\x00\x00\x02\x00\x00\x00' # ptr to point '\x01\x00\x00\x00\x00\x00\x00\x00' # p.x == 1 '\x02\x00\x00\x00\x00\x00\x00\x00') # p.y == 2
def test_list_of_text(self): schema = """ @0xbf5147cbbecf40c1; struct Foo { x @0 :List(Text); } """ mod = self.compile(schema) foo = mod.Foo([b'foo', b'bar', b'baz']) expected = b('\x01\x00\x00\x00\x1e\x00\x00\x00' # ptrlist '\x09\x00\x00\x00\x22\x00\x00\x00' '\x09\x00\x00\x00\x22\x00\x00\x00' '\x09\x00\x00\x00\x22\x00\x00\x00' 'foo\x00\x00\x00\x00\x00' 'bar\x00\x00\x00\x00\x00' 'baz\x00\x00\x00\x00\x00') assert foo._seg.buf == expected
def test_list_of_lists(self): schema = """ @0xbf5147cbbecf40c1; struct Foo { x @0 :List(List(Int8)); } """ mod = self.compile(schema) foo = mod.Foo([[1, 2, 3], [4, 5], [6, 7, 8, 9]]) expected = b('\x01\x00\x00\x00\x1e\x00\x00\x00' # list<ptr> (3 items) '\x09\x00\x00\x00\x1a\x00\x00\x00' # list<8> (3 items) '\x09\x00\x00\x00\x12\x00\x00\x00' # list<8> (2 items) '\x09\x00\x00\x00\x22\x00\x00\x00' # list<8> (4 items) '\x01\x02\x03\x00\x00\x00\x00\x00' '\x04\x05\x00\x00\x00\x00\x00\x00' '\x06\x07\x08\x09\x00\x00\x00\x00') assert foo._seg.buf == expected
def test_group(self): schema = """ @0xbf5147cbbecf40c1; struct Point { position :group { x @0 :Int64; y @1 :Int64; } color @2 :Text; } """ mod = self.compile(schema) p = mod.Point(position=(1, 2), color=b'red') assert p.position.x == 1 assert p.position.y == 2 assert p.color == b'red'
def test_nested_group(self): schema = """ @0xbf5147cbbecf40c1; struct Shape { position :group { a :group { x @0 :Int64; y @1 :Int64; } b :group { x @2 :Int64; y @3 :Int64; } } } """ mod = self.compile(schema) p = mod.Shape(position=((1, 2), (3, 4))) assert p.position.a.x == 1 assert p.position.a.y == 2 assert p.position.b.x == 3 assert p.position.b.y == 4
def test_group_named_params(self): schema = """ @0xbf5147cbbecf40c1; struct Point { position :group { x @0 :Int64; y @1 :Int64; } color :group { alpha @2 :Float64; name @3 :Text; } } """ mod = self.compile(schema) p = mod.Point(position=mod.Point.Position(x=1, y=2), color=mod.Point.Color(alpha=1.0, name=b'red')) assert p.position.x == 1 assert p.position.y == 2 assert p.color.alpha == 1.0 assert p.color.name == b'red'
def test_group_named_params(self): schema = """ @0xbf5147cbbecf40c1; struct Point { position :group { x @0 :Int64 = 42; y @1 :Int64; } color :group { alpha @2 :UInt8 = 255; name @3 :Text; } } """ mod = self.compile(schema) assert mod.Point.Position() == (42, 0) assert mod.Point.Color() == (255, None) p = mod.Point(position=mod.Point.Position(y=2), color=mod.Point.Color(name=b'red')) assert p.position.x == 42 assert p.position.y == 2 assert p.color.alpha == 255 assert p.color.name == b'red'
def test_enum(self): schema = """ @0xbf5147cbbecf40c1; enum Color { red @0; green @1; blue @2; } struct P { x @0 :Color; y @1 :Color; } """ self.mod = self.compile(schema) buf = b'\x01\x00\x00\x00\x00\x00\x00\x00' p = self.mod.P.from_buffer(buf, 0, 1, 0) self.check(p, '(x = green, y = red)')
def test_text(self): schema = """ @0xbf5147cbbecf40c1; struct Person { name @0 :Text; surname @1 :Text; } """ self.mod = self.compile(schema) p = self.mod.Person(name=None, surname=None) self.check(p, '()') # p = self.mod.Person(name=b"foo", surname=None) self.check(p, '(name = "foo")') # p = self.mod.Person(name=None, surname=b"bar") self.check(p, '(surname = "bar")') # p = self.mod.Person(name=b"foo", surname=b"bar") self.check(p, '(name = "foo", surname = "bar")')
def test_text_special_chars(self): schema = """ @0xbf5147cbbecf40c1; struct P { txt @0 :Text; } """ self.mod = self.compile(schema) p = self.mod.P(txt=b'double "quotes"') self.check(p, r'(txt = "double \"quotes\"")') # p = self.mod.P(txt=b"single 'quotes'") self.check(p, r'(txt = "single \'quotes\'")') # p = self.mod.P(txt=b"tricky \" '") self.check(p, r'(txt = "tricky \" \'")') # p = self.mod.P(txt=u'hellò'.encode('utf-8')) self.check(p, r'(txt = "hell\xc3\xb2")')
def test_struct(self): schema = """ @0xbf5147cbbecf40c1; struct Point { x @0 :Int64; y @1 :Int64; } struct Rectangle { a @0 :Point; b @1 :Point; empty @2 :Point; } """ self.mod = self.compile(schema) p1 = self.mod.Point(1, 2) p2 = self.mod.Point(3, 4) r = self.mod.Rectangle(p1, p2, None) self.check(r, '(a = (x = 1, y = 2), b = (x = 3, y = 4))')
def test_list(self): schema = """ @0xbf5147cbbecf40c1; struct Point { x @0 :Int64; y @1 :Int64; } struct P { ints @0 :List(Int64); structs @1 :List(Point); texts @2 :List(Text); } """ self.mod = self.compile(schema) p = self.mod.P(ints=[1, 2, 3], structs=None, texts=None) self.check(p, '(ints = [1, 2, 3])') # p1 = self.mod.Point(1, 2) p2 = self.mod.Point(3, 4) p = self.mod.P(ints=None, structs=[p1, p2], texts=None) self.check(p, '(structs = [(x = 1, y = 2), (x = 3, y = 4)])') # p = self.mod.P(ints=None, structs=None, texts=[b'foo', b'bar', b'baz']) self.check(p, '(texts = ["foo", "bar", "baz"])')
def test_union(self): schema = """ @0xbf5147cbbecf40c1; struct P { union { x @0 :Int64; y @1 :Void; z @2 :Text; } } """ self.mod = self.compile(schema) p = self.mod.P.new_x(1) self.check(p, '(x = 1)') # p = self.mod.P.new_y() self.check(p, '(y = void)') # p = self.mod.P.new_z(b'hello') self.check(p, '(z = "hello")')
def test_listbuilder_bug(self): schema = """ @0xbf5147cbbecf40c1; struct Bar { x @0 :Int64; y @1 :Int64; } struct Foo { name @0 :Text; bars @1 :List(Bar); } """ mod = self.compile(schema) bars = [mod.Bar(1, 2)] foo = mod.Foo(b'name', bars) assert len(foo.bars) == 1 assert foo.bars[0].x == 1 assert foo.bars[0].y == 2
def test_listbuilder_null_ptrs(self): schema = """ @0xbf5147cbbecf40c1; struct Bar { x @0 :Int64; y @1 :Int64; name @2 :Text; } struct Foo { bars @0 :List(Bar); } """ mod = self.compile(schema) a = mod.Bar(1, 2, None) b = mod.Bar(3, 4, None) foo = mod.Foo([a, b]) a1 = foo.bars[0] assert a1.x == 1 assert a1.y == 2 assert a1.name is None
def test_compact_structs(self): schema = """ @0xbf5147cbbecf40c1; struct Person { name @0 :Text; } struct Foo { key @0 :Person; } """ mod = self.compile(schema) buf = b('garbage0' '\x05\x00\x00\x00\x32\x00\x00\x00' # ptr to name 'garbage1' 'dummy\x00\x00\x00') p = mod.Person.from_buffer(buf, 8, 0, 1) foo = mod.Foo(p) assert foo.key.name == b'dummy' # we check that the structure has been packed assert foo.key._data_offset == 8 assert foo.key._seg.buf[8:] == b('\x01\x00\x00\x00\x32\x00\x00\x00' # ptr to dummy 'dummy\x00\x00\x00')
def test_compact_struct_inside_list(self): schema = """ @0xbf5147cbbecf40c1; struct Person { name @0 :Text; surname @1 :Text; } struct Town { people @0 :List(Person); } """ mod = self.compile(schema) p1 = mod.Person(b'Mickey', b'Mouse') p2 = mod.Person(b'Donald', b'Duck') t = mod.Town([p1, p2]) assert t.people[0].name == b'Mickey' assert t.people[0].surname == b'Mouse' assert t.people[1].name == b'Donald' assert t.people[1].surname == b'Duck'