The following 50 code examples, extracted from open source Python projects, illustrate how to use oslo_utils.uuidutils.generate_uuid().
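Before the project examples, here is a minimal standalone sketch of the call itself (not taken from any of the projects below). generate_uuid() returns a random version 4 UUID as a string; passing dashed=False (as one of the examples below does) returns the 32-hex-digit form without hyphens, and is_uuid_like() is the companion validator in the same module.

from oslo_utils import uuidutils

# Returns a random (version 4) UUID as a string,
# e.g. 'f47ac10b-58cc-4372-a567-0e02b2c3d479'.
uuid_str = uuidutils.generate_uuid()

# dashed=False yields the same kind of value without hyphens
# (32 hexadecimal characters).
compact = uuidutils.generate_uuid(dashed=False)

# is_uuid_like() validates UUID-shaped strings.
assert uuidutils.is_uuid_like(uuid_str)
print(uuid_str, compact)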
def test_add_another_offset(self):
    topic_1 = uuidutils.generate_uuid()
    partition_1 = random.randint(0, 1024)
    until_offset_1 = random.randint(0, sys.maxsize)
    from_offset_1 = random.randint(0, sys.maxsize)
    app_name_1 = uuidutils.generate_uuid()
    offset_key_1 = "%s_%s_%s" % (app_name_1, topic_1, partition_1)
    my_batch_time = self.get_dummy_batch_time()

    used_values = {}
    self.kafka_offset_specs.add(topic=topic_1, partition=partition_1,
                                app_name=app_name_1,
                                from_offset=from_offset_1,
                                until_offset=until_offset_1,
                                batch_time_info=my_batch_time)
    used_values[offset_key_1] = {
        "topic": topic_1, "partition": partition_1,
        "app_name": app_name_1, "from_offset": from_offset_1,
        "until_offset": until_offset_1
    }

    kafka_offset_specs = self.kafka_offset_specs.get_kafka_offsets(
        app_name_1)
    offset_value_1 = kafka_offset_specs.get(offset_key_1)
    self.assertions_on_offset(used_value=used_values.get(offset_key_1),
                              offset_value=offset_value_1)
    self.assertEqual(1,
                     len(self.kafka_offset_specs.get_kafka_offsets(
                         app_name_1)))
def test_send_magic_packets(self, mock_socket):
    fake_socket = mock.Mock(spec=socket, spec_set=True)
    mock_socket.return_value = fake_socket()
    obj_utils.create_test_port(self.context,
                               uuid=uuidutils.generate_uuid(),
                               address='aa:bb:cc:dd:ee:ff',
                               node_id=self.node.id)
    with task_manager.acquire(
            self.context, self.node.uuid, shared=True) as task:
        wol_power._send_magic_packets(task, '255.255.255.255', 9)

        expected_calls = [
            mock.call(),
            mock.call().setsockopt(socket.SOL_SOCKET,
                                   socket.SO_BROADCAST, 1),
            mock.call().sendto(mock.ANY, ('255.255.255.255', 9)),
            mock.call().sendto(mock.ANY, ('255.255.255.255', 9)),
            mock.call().close()]
        fake_socket.assert_has_calls(expected_calls)
        self.assertEqual(1, mock_socket.call_count)
def test_create_audit_template(self):
    goal_name = "dummy"
    _, goal = self.client.show_goal(goal_name)
    params = {
        'name': 'my at name %s' % uuidutils.generate_uuid(),
        'description': 'my at description',
        'goal': goal['uuid']}
    expected_data = {
        'name': params['name'],
        'description': params['description'],
        'goal_uuid': params['goal'],
        'goal_name': goal_name,
        'strategy_uuid': None,
        'strategy_name': None}

    _, body = self.create_audit_template(**params)
    self.assert_expected(expected_data, body)

    _, audit_template = self.client.show_audit_template(body['uuid'])
    self.assert_expected(audit_template, body)
def test_create_audit_template_unicode_description(self):
    goal_name = "dummy"
    _, goal = self.client.show_goal(goal_name)
    # Use a unicode string for testing:
    params = {
        'name': 'my at name %s' % uuidutils.generate_uuid(),
        'description': 'my àt déscrïptïôn',
        'goal': goal['uuid']}
    expected_data = {
        'name': params['name'],
        'description': params['description'],
        'goal_uuid': params['goal'],
        'goal_name': goal_name,
        'strategy_uuid': None,
        'strategy_name': None}

    _, body = self.create_audit_template(**params)
    self.assert_expected(expected_data, body)

    _, audit_template = self.client.show_audit_template(body['uuid'])
    self.assert_expected(audit_template, body)
def test_update_audit_template_remove(self):
    description = 'my at description'
    name = 'my at name %s' % uuidutils.generate_uuid()
    params = {'name': name,
              'description': description,
              'goal': self.goal['uuid']}
    _, audit_template = self.create_audit_template(**params)

    # Removing the description
    self.client.update_audit_template(
        audit_template['uuid'],
        [{'path': '/description', 'op': 'remove'}])

    _, body = self.client.show_audit_template(audit_template['uuid'])
    self.assertIsNone(body.get('description'))

    # Assert nothing else was changed
    self.assertEqual(name, body['name'])
    self.assertIsNone(body['description'])
    self.assertEqual(self.goal['uuid'], body['goal_uuid'])
def post(self, context, request_data):
    """Create a new user. Requires project admin privileges."""
    # NOTE(sulo): Instead of using context project_id from
    # header, here we always ensure, user create gets project_id
    # from request param.
    project_id = request_data["project_id"]
    dbapi.projects_get_by_id(context, project_id)
    api_key = uuidutils.generate_uuid()
    request_data["api_key"] = api_key
    user_obj = dbapi.users_create(context, request_data)

    location = v1.api.url_for(
        UserById, id=user_obj.id, _external=True
    )
    headers = {'Location': location}

    return jsonutils.to_primitive(user_obj), 201, headers
def start(conf):
    persistence = _get_persistence_backend(conf)

    if conf.taskflow.db_upgrade:
        with contextlib.closing(persistence.get_connection()) as conn:
            LOG.info('Checking for database schema upgrade')
            conn.upgrade()

    my_name = uuidutils.generate_uuid()
    LOG.info('I am %s', my_name)
    board = _get_jobboard_backend(conf, persistence=persistence)
    conductor = conductors.fetch(
        'nonblocking', my_name, board,
        engine='parallel',
        max_simultaneous_jobs=conf.max_simultaneous_jobs,
        persistence=persistence)

    board.connect()
    LOG.debug('Starting taskflow conductor loop')
    threading.Thread(target=conductor.run).start()

    return persistence, board, conductor
def accelerator_create(self, context, values):
    if not values.get('uuid'):
        values['uuid'] = uuidutils.generate_uuid()
    if not values.get('description'):
        values['description'] = ''

    accelerator = models.Accelerator()
    accelerator.update(values)

    with _session_for_write() as session:
        try:
            session.add(accelerator)
            session.flush()
        except db_exc.DBDuplicateEntry:
            raise exception.AcceleratorAlreadyExists(uuid=values['uuid'])
        return accelerator
def create_board(self, values):
    # ensure defaults are present for new boards
    if 'uuid' not in values:
        values['uuid'] = uuidutils.generate_uuid()
    if 'status' not in values:
        values['status'] = states.REGISTERED

    board = models.Board()
    board.update(values)
    try:
        board.save()
    except db_exc.DBDuplicateEntry as exc:
        if 'code' in exc.columns:
            raise exception.DuplicateCode(code=values['code'])
        raise exception.BoardAlreadyExists(uuid=values['uuid'])
    return board
def register_job(context, _type, resource_id):
    try:
        context.session.begin()
        job_dict = {'id': uuidutils.generate_uuid(),
                    'type': _type,
                    'status': constants.JS_Running,
                    'resource_id': resource_id,
                    'extra_id': constants.SP_EXTRA_ID}
        job = core.create_resource(context, models.Job, job_dict)
        context.session.commit()
        return job
    except db_exc.DBDuplicateEntry:
        context.session.rollback()
        return None
    except db_exc.DBDeadlock:
        context.session.rollback()
        return None
    finally:
        context.session.close()
def test_update_subproject_not_in_hierarchy(self):
    # Create another project hierarchy
    E = self.FakeProject(id=uuidutils.generate_uuid(), parent_id=None)
    F = self.FakeProject(id=uuidutils.generate_uuid(), parent_id=E.id)
    E.subtree = {F.id: F.subtree}
    self.project_by_id[E.id] = E
    self.project_by_id[F.id] = F

    qso = quota.QuotaSetOperation(self.A.id)
    qso._get_project = mock.Mock()
    qso._get_project.side_effect = self._get_project
    self.ctx.project_id = self.A.id

    updated = _make_body(tenant_id=None, root=True,
                         **self.test_class_quota)
    expected = _make_body(tenant_id=None, root=True,
                          **self.test_class_expected_result)

    result = qso.update(self.ctx, **updated)
    self.assertDictMatch(expected, result)

    # Update the quota of B to be equal to its parent quota
    qso.update_hierarchy(F.id)
    self.assertRaises(exceptions.HTTPForbiddenError, qso.update,
                      self.ctx, **updated)
def setUp(self):
    super(HBaseManager, self).setUp()
    self.connection = storage.get_connection(
        self.url, self.conf)
    # Unique prefix for each test to keep data is distinguished because
    # all test data is stored in one table
    data_prefix = uuidutils.generate_uuid(dashed=False)

    def table(conn, name):
        return mocks.MockHBaseTable(name, conn, data_prefix)

    # Mock only real HBase connection, MConnection "table" method
    # stays origin.
    mock.patch('happybase.Connection.table', new=table).start()
    # We shouldn't delete data and tables after each test,
    # because it last for too long.
    # All tests tables will be deleted in setup-test-env.sh
    mock.patch("happybase.Connection.disable_table",
               new=mock.MagicMock()).start()
    mock.patch("happybase.Connection.delete_table",
               new=mock.MagicMock()).start()
    mock.patch("happybase.Connection.create_table",
               new=mock.MagicMock()).start()
def _generate_models(self):
    event_models = []
    base = 0
    self.s_time = datetime.datetime(2013, 12, 31, 5, 0)
    self.trait_time = datetime.datetime(2013, 12, 31, 5, 0)
    for i in range(20):
        trait_models = [models.Trait(name, type, value)
                        for name, type, value in [
                            ('trait_A', models.Trait.TEXT_TYPE,
                             "my_text"),
                            ('trait_B', models.Trait.INT_TYPE, base + 1),
                            ('trait_C', models.Trait.FLOAT_TYPE,
                             float(base) + 0.123456),
                            ('trait_D', models.Trait.DATETIME_TYPE,
                             self.trait_time)]]

        event_models.append(
            models.Event(message_id=uuidutils.generate_uuid(),
                         event_type='foo.bar',
                         generated=self.trait_time,
                         traits=trait_models,
                         raw={'status': {'nested': 'started'}}))
        self.trait_time += datetime.timedelta(seconds=1)
    self.conn.record_events(event_models)
def create_container(self, context, values):
    # ensure defaults are present for new containers
    if not values.get('uuid'):
        values['uuid'] = uuidutils.generate_uuid()

    if values.get('name'):
        self._validate_unique_container_name(context, values['name'])

    container = models.Container()
    container.update(values)
    try:
        container.save()
    except db_exc.DBDuplicateEntry:
        raise exception.ContainerAlreadyExists(field='UUID',
                                               value=values['uuid'])
    return container
def create_container(self, context, container_data):
    # ensure defaults are present for new containers
    if not container_data.get('uuid'):
        container_data['uuid'] = uuidutils.generate_uuid()

    if container_data.get('name'):
        self._validate_unique_container_name(context,
                                             container_data['name'])

    container = models.Container(container_data)
    try:
        container.save()
    except Exception:
        raise
    return container
def test_container_actions_get_by_container(self):
    """Ensure we can get actions by UUID."""
    uuid1 = uuidutils.generate_uuid()
    expected = []

    action_values = self._create_action_values(uuid1)
    action = dbapi.action_start(self.context, action_values)
    expected.append(action)

    action_values['action'] = 'test-action'
    action = dbapi.action_start(self.context, action_values)
    expected.append(action)

    # Create an other container action.
    uuid2 = uuidutils.generate_uuid()
    action_values = self._create_action_values(uuid2, 'test-action')
    dbapi.action_start(self.context, action_values)

    actions = dbapi.actions_get(self.context, uuid1)
    self._assertEqualListsOfObjects(expected, actions)
def test_container_action_get_by_container_and_request(self):
    """Ensure we can get an action by container UUID and request_id"""
    uuid1 = uuidutils.generate_uuid()

    action_values = self._create_action_values(uuid1)
    dbapi.action_start(self.context, action_values)
    request_id = action_values['request_id']

    # An other action using a different req id
    action_values['action'] = 'test-action'
    action_values['request_id'] = 'req-00000000-7522-4d99-7ff-111111111111'
    dbapi.action_start(self.context, action_values)

    action = dbapi.action_get_by_request_id(self.context, uuid1,
                                            request_id)
    self.assertEqual('create_container', action['action'])
    self.assertEqual(self.context.request_id, action['request_id'])
def test_container_action_event_start(self):
    """Create a container action event."""
    uuid = uuidutils.generate_uuid()

    action_values = self._create_action_values(uuid)
    action = dbapi.action_start(self.context, action_values)

    event_values = self._create_event_values(uuid)
    event = dbapi.action_event_start(self.context, event_values)

    event_values['action_id'] = action['id']
    ignored_keys = self.IGNORED_FIELDS + ['finish_time', 'traceback',
                                          'result']
    self._assertEqualObjects(event_values, event, ignored_keys)

    self._assertActionEventSaved(event, action['id'])
def test_container_action_event_finish_success(self):
    """Finish a container action event."""
    uuid = uuidutils.generate_uuid()

    action = dbapi.action_start(self.context,
                                self._create_action_values(uuid))

    dbapi.action_event_start(self.context,
                             self._create_event_values(uuid))

    event_values = {
        'finish_time': timeutils.utcnow() + datetime.timedelta(seconds=5),
        'result': 'Success'
    }
    event_values = self._create_event_values(uuid, extra=event_values)
    event = dbapi.action_event_finish(self.context, event_values)

    self._assertActionEventSaved(event, action['id'])
    action = dbapi.action_get_by_request_id(self.context, uuid,
                                            self.context.request_id)
    self.assertNotEqual('Error', action['message'])
def test_list_capsules_with_filters(self):
    capsule1 = utils.create_test_capsule(
        name='capsule1',
        uuid=uuidutils.generate_uuid(),
        context=self.context)
    capsule2 = utils.create_test_capsule(
        name='capsule2',
        uuid=uuidutils.generate_uuid(),
        context=self.context)

    res = dbapi.list_capsules(
        self.context, filters={'uuid': capsule1.uuid})
    self.assertEqual([capsule1.id], [r.id for r in res])

    res = dbapi.list_capsules(
        self.context, filters={'uuid': capsule2.uuid})
    self.assertEqual([capsule2.id], [r.id for r in res])

    res = dbapi.list_capsules(
        self.context, filters={'uuid': 'unknow-uuid'})
    self.assertEqual([], [r.id for r in res])
def test_list_capsules(self, mock_write, mock_read):
    uuids = []
    capsules = []
    mock_read.side_effect = etcd.EtcdKeyNotFound
    for i in range(1, 6):
        capsule = utils.create_test_capsule(
            uuid=uuidutils.generate_uuid(),
            context=self.context,
            name='capsule' + str(i))
        capsules.append(capsule.as_dict())
        uuids.append(six.text_type(capsule['uuid']))
    mock_read.side_effect = lambda *args: FakeEtcdMultipleResult(
        capsules)
    res = dbapi.list_capsules(self.context)
    res_uuids = [r.uuid for r in res]
    self.assertEqual(sorted(uuids), sorted(res_uuids))
def test_list_inventories(self):
    totals = []
    for i in range(1, 6):
        provider = utils.create_test_resource_provider(
            id=i,
            uuid=uuidutils.generate_uuid(),
            context=self.context)
        inventory = utils.create_test_inventory(
            id=i,
            resource_provider_id=provider.id,
            total=i,
            context=self.context)
        totals.append(inventory['total'])
    res = dbapi.list_inventories(self.context)
    res_totals = [r.total for r in res]
    self.assertEqual(sorted(totals), sorted(res_totals))
def test_list_inventories_sorted(self):
    totals = []
    for i in range(5):
        provider = utils.create_test_resource_provider(
            id=i,
            uuid=uuidutils.generate_uuid(),
            context=self.context)
        inventory = utils.create_test_inventory(
            id=i,
            resource_provider_id=provider.id,
            total=10 - i,
            context=self.context)
        totals.append(inventory['total'])
    res = dbapi.list_inventories(self.context, sort_key='total')
    res_totals = [r.total for r in res]
    self.assertEqual(sorted(totals), res_totals)

    self.assertRaises(exception.InvalidParameterValue,
                      dbapi.list_inventories,
                      self.context,
                      sort_key='foo')
def test_list_volume_mappings(self, mock_write, mock_read):
    uuids = []
    volume_mappings = []
    mock_read.side_effect = etcd.EtcdKeyNotFound
    for i in range(0, 6):
        volume_mapping = utils.create_test_volume_mapping(
            uuid=uuidutils.generate_uuid(),
            context=self.context,
            name='volume_mapping' + str(i))
        volume_mappings.append(volume_mapping.as_dict())
        uuids.append(six.text_type(volume_mapping['uuid']))
    mock_read.side_effect = lambda *args: FakeEtcdMultipleResult(
        volume_mappings)
    res = dbapi.list_volume_mappings(self.context)
    res_uuids = [r.uuid for r in res]
    self.assertEqual(sorted(uuids), sorted(res_uuids))
def test_list_volume_mappings_sorted(self, mock_write, mock_read):
    uuids = []
    volume_mappings = []
    mock_read.side_effect = etcd.EtcdKeyNotFound
    for i in range(0, 6):
        volume_mapping = utils.create_test_volume_mapping(
            uuid=uuidutils.generate_uuid(),
            context=self.context,
            name='volume_mapping' + str(i))
        volume_mappings.append(volume_mapping.as_dict())
        uuids.append(six.text_type(volume_mapping['uuid']))
    mock_read.side_effect = lambda *args: FakeEtcdMultipleResult(
        volume_mappings)
    res = dbapi.list_volume_mappings(self.context, sort_key='uuid')
    res_uuids = [r.uuid for r in res]
    self.assertEqual(sorted(uuids), res_uuids)

    self.assertRaises(exception.InvalidParameterValue,
                      dbapi.list_volume_mappings,
                      self.context,
                      sort_key='wrong_key')
def test_list_containers_sorted(self):
    uuids = []
    for i in range(5):
        container = utils.create_test_container(
            uuid=uuidutils.generate_uuid(),
            context=self.context,
            name='container' + str(i))
        uuids.append(six.text_type(container.uuid))
    res = dbapi.list_containers(self.context, sort_key='uuid')
    res_uuids = [r.uuid for r in res]
    self.assertEqual(sorted(uuids), res_uuids)

    self.assertRaises(exception.InvalidParameterValue,
                      dbapi.list_containers,
                      self.context,
                      sort_key='foo')
def test_update_container_with_the_same_name(self):
    CONF.set_override("unique_container_name_scope", "project",
                      group="compute")
    container1 = utils.create_test_container(
        name='container-one',
        uuid=uuidutils.generate_uuid(),
        context=self.context)
    container2 = utils.create_test_container(
        name='container-two',
        uuid=uuidutils.generate_uuid(),
        context=self.context)
    new_name = 'new_name'
    dbapi.update_container(self.context, container1.id,
                           {'name': new_name})
    self.assertRaises(exception.ContainerAlreadyExists,
                      dbapi.update_container, self.context,
                      container2.id, {'name': new_name})
def test_list_containers(self, mock_write, mock_read):
    uuids = []
    containers = []
    mock_read.side_effect = etcd.EtcdKeyNotFound
    for i in range(1, 6):
        container = utils.create_test_container(
            uuid=uuidutils.generate_uuid(),
            context=self.context,
            name='cont' + str(i))
        containers.append(container.as_dict())
        uuids.append(six.text_type(container['uuid']))
    mock_read.side_effect = lambda *args: FakeEtcdMultipleResult(
        containers)
    res = dbapi.list_containers(self.context)
    res_uuids = [r.uuid for r in res]
    self.assertEqual(sorted(uuids), sorted(res_uuids))
def test_update_container_with_the_same_name(self, mock_update,
                                             mock_write, mock_read):
    CONF.set_override("unique_container_name_scope", "project",
                      group="compute")
    mock_read.side_effect = etcd.EtcdKeyNotFound
    container1 = utils.create_test_container(
        name='container-one',
        uuid=uuidutils.generate_uuid(),
        context=self.context)
    container2 = utils.create_test_container(
        name='container-two',
        uuid=uuidutils.generate_uuid(),
        context=self.context)
    mock_read.side_effect = lambda *args: FakeEtcdMultipleResult(
        [container1.as_dict(), container2.as_dict()])
    self.assertRaises(exception.ContainerAlreadyExists,
                      dbapi.update_container, self.context,
                      container2.uuid, {'name': 'container-one'})
def test_list_resource_classes_sorted(self):
    names = []
    for i in range(5):
        resource = utils.create_test_resource_class(
            context=self.context,
            uuid=uuidutils.generate_uuid(),
            name='class' + str(i))
        names.append(six.text_type(resource.name))
    res = dbapi.list_resource_classes(self.context, sort_key='name')
    res_names = [r.name for r in res]
    self.assertEqual(sorted(names), res_names)

    self.assertRaises(exception.InvalidParameterValue,
                      dbapi.list_resource_classes,
                      self.context,
                      sort_key='foo')
def test_list_allocations(self):
    cids = []
    for i in range(1, 6):
        provider = utils.create_test_resource_provider(
            id=i,
            uuid=uuidutils.generate_uuid(),
            context=self.context)
        allocation = utils.create_test_allocation(
            id=i,
            resource_provider_id=provider.id,
            consumer_id=uuidutils.generate_uuid(),
            context=self.context)
        cids.append(allocation['consumer_id'])
    res = dbapi.list_allocations(self.context)
    res_cids = [r.consumer_id for r in res]
    self.assertEqual(sorted(cids), sorted(res_cids))
def test_list_compute_nodes_sorted(self):
    uuids = []
    for i in range(5):
        node = utils.create_test_compute_node(
            uuid=uuidutils.generate_uuid(),
            context=self.context,
            hostname='node' + str(i))
        uuids.append(six.text_type(node.uuid))
    res = dbapi.list_compute_nodes(self.context, sort_key='uuid')
    res_uuids = [r.uuid for r in res]
    self.assertEqual(sorted(uuids), res_uuids)

    self.assertRaises(exception.InvalidParameterValue,
                      dbapi.list_compute_nodes,
                      self.context,
                      sort_key='foo')
def test_list_images_with_filters(self):
    image1 = utils.create_test_image(
        context=self.context,
        repo='image-one',
        uuid=uuidutils.generate_uuid())
    image2 = utils.create_test_image(
        context=self.context,
        repo='image-two',
        uuid=uuidutils.generate_uuid())

    res = self.dbapi.list_images(self.context,
                                 filters={'repo': 'image-one'})
    self.assertEqual([image1.id], [r.id for r in res])

    res = self.dbapi.list_images(self.context,
                                 filters={'repo': 'image-two'})
    self.assertEqual([image2.id], [r.id for r in res])

    res = self.dbapi.list_images(self.context,
                                 filters={'repo': 'bad-image'})
    self.assertEqual([], [r.id for r in res])

    res = self.dbapi.list_images(
        self.context, filters={'repo': image1.repo})
    self.assertEqual([image1.id], [r.id for r in res])
def test_list_resource_providers_sorted(self):
    uuids = []
    for i in range(5):
        provider = utils.create_test_resource_provider(
            uuid=uuidutils.generate_uuid(),
            context=self.context,
            name='provider' + str(i))
        uuids.append(six.text_type(provider.uuid))
    res = dbapi.list_resource_providers(self.context, sort_key='uuid')
    res_uuids = [r.uuid for r in res]
    self.assertEqual(sorted(uuids), res_uuids)

    self.assertRaises(exception.InvalidParameterValue,
                      dbapi.list_resource_providers,
                      self.context,
                      sort_key='foo')
def test_get_all_images_with_pagination_marker(self, mock_image_list):
    image_list = []
    for id_ in range(4):
        test_image = utils.create_test_image(
            context=self.context,
            id=id_,
            repo='testrepo' + str(id_),
            uuid=uuidutils.generate_uuid())
        image_list.append(objects.Image(self.context, **test_image))
    mock_image_list.return_value = image_list[-1:]
    response = self.get('/v1/images/?limit=3&marker=%s'
                        % image_list[2].uuid)

    self.assertEqual(200, response.status_int)
    actual_images = response.json['images']
    self.assertEqual(1, len(actual_images))
    self.assertEqual(image_list[-1].uuid,
                     actual_images[0].get('uuid'))
def test_get_all_hosts_with_pagination_marker(self, mock_host_list,
                                              mock_policy):
    mock_policy.return_value = True
    host_list = []
    for id_ in range(4):
        test_host = utils.create_test_compute_node(
            context=self.context,
            uuid=uuidutils.generate_uuid())
        numat = numa.NUMATopology._from_dict(test_host['numa_topology'])
        test_host['numa_topology'] = numat
        host = objects.ComputeNode(self.context, **test_host)
        host_list.append(host)
    mock_host_list.return_value = host_list[-1:]
    response = self.get('/v1/hosts?limit=3&marker=%s'
                        % host_list[2].uuid)

    self.assertEqual(200, response.status_int)
    actual_hosts = response.json['hosts']
    self.assertEqual(1, len(actual_hosts))
    self.assertEqual(host_list[-1].uuid,
                     actual_hosts[0].get('uuid'))
def test_get_all_containers_with_pagination_marker(self,
                                                   mock_container_list,
                                                   mock_container_show):
    container_list = []
    for id_ in range(4):
        test_container = utils.create_test_container(
            id=id_,
            uuid=uuidutils.generate_uuid(),
            name='container' + str(id_),
            context=self.context)
        container_list.append(objects.Container(self.context,
                                                **test_container))
    mock_container_list.return_value = container_list[-1:]
    mock_container_show.return_value = container_list[-1]
    response = self.get('/v1/containers/?limit=3&marker=%s'
                        % container_list[2].uuid)

    self.assertEqual(200, response.status_int)
    actual_containers = response.json['containers']
    self.assertEqual(1, len(actual_containers))
    self.assertEqual(container_list[-1].uuid,
                     actual_containers[0].get('uuid'))
def test_refresh(self):
    uuid = self.fake_volume_mapping['uuid']
    new_uuid = uuidutils.generate_uuid()
    returns = [dict(self.fake_volume_mapping, uuid=uuid),
               dict(self.fake_volume_mapping, uuid=new_uuid)]
    expected = [mock.call(self.context, uuid),
                mock.call(self.context, uuid)]
    with mock.patch.object(self.dbapi, 'get_volume_mapping_by_uuid',
                           side_effect=returns,
                           autospec=True) as mock_get_volume_mapping:
        volume_mapping = objects.VolumeMapping.get_by_uuid(self.context,
                                                           uuid)
        self.assertEqual(uuid, volume_mapping.uuid)
        volume_mapping.refresh()
        self.assertEqual(new_uuid, volume_mapping.uuid)
        self.assertEqual(expected,
                         mock_get_volume_mapping.call_args_list)
        self.assertEqual(self.context, volume_mapping._context)
def test_refresh(self):
    uuid = self.fake_container['uuid']
    new_uuid = uuidutils.generate_uuid()
    returns = [dict(self.fake_container, uuid=uuid),
               dict(self.fake_container, uuid=new_uuid)]
    expected = [mock.call(self.context, uuid),
                mock.call(self.context, uuid)]
    with mock.patch.object(self.dbapi, 'get_container_by_uuid',
                           side_effect=returns,
                           autospec=True) as mock_get_container:
        container = objects.Container.get_by_uuid(self.context, uuid)
        self.assertEqual(uuid, container.uuid)
        container.refresh()
        self.assertEqual(new_uuid, container.uuid)
        self.assertEqual(expected, mock_get_container.call_args_list)
        self.assertEqual(self.context, container._context)
def container_attach(self, context, container):
    LOG.debug('Get websocket url from the container: %s', container.uuid)
    try:
        url = self.driver.get_websocket_url(context, container)
        token = uuidutils.generate_uuid()
        access_url = '%s?token=%s&uuid=%s' % (
            CONF.websocket_proxy.base_url, token, container.uuid)
        container.websocket_url = url
        container.websocket_token = token
        container.save(context)
        return access_url
    except Exception as e:
        LOG.error("Error occurred while calling "
                  "get websocket url function: %s",
                  six.text_type(e))
        raise
def test_update_logical_router_port(self):
    fake_router_port = test_constants.FAKE_ROUTER_PORT.copy()
    uuid = fake_router_port['id']
    fake_relay_uuid = uuidutils.generate_uuid()
    lrport = self.get_mocked_resource()
    with mock.patch.object(lrport, 'get',
                           return_value=fake_router_port),\
        mock.patch("vmware_nsxlib.v3.NsxLib.get_version",
                   return_value='2.0.0'):
        lrport.update(uuid, relay_service_uuid=fake_relay_uuid)
        data = {
            'id': uuid,
            'display_name': fake_router_port['display_name'],
            'logical_router_id': fake_router_port['logical_router_id'],
            'resource_type': fake_router_port['resource_type'],
            "revision": 0,
            'service_bindings': [{'service_id': {
                'target_type': 'LogicalService',
                'target_id': fake_relay_uuid}}]
        }
        test_client.assert_json_call(
            'put', lrport,
            'https://1.2.3.4/api/v1/logical-router-ports/%s' % uuid,
            data=jsonutils.dumps(data, sort_keys=True),
            headers=self.default_headers())
def test_get_default_network_id(self, mock_get_port_from_host,
                                mock_conf):
    mock_conf.binding.link_iface = 'eth0'
    fake_endpoint_id = lib_utils.get_hash()
    fake_neutron_port_id = uuidutils.generate_uuid()
    fake_neutron_net_id = uuidutils.generate_uuid()
    fake_neutron_v4_subnet_id = uuidutils.generate_uuid()
    fake_neutron_v6_subnet_id = uuidutils.generate_uuid()
    fake_vm_port = self._get_fake_port(
        fake_endpoint_id, fake_neutron_net_id, fake_neutron_port_id,
        lib_const.PORT_STATUS_ACTIVE,
        fake_neutron_v4_subnet_id,
        fake_neutron_v6_subnet_id)['port']
    mock_get_port_from_host.return_value = fake_vm_port

    nested_driver = nested.NestedDriver()
    host_network_id = nested_driver.get_default_network_id()

    mock_get_port_from_host.assert_called_with('eth0')
    self.assertEqual(host_network_id, fake_vm_port['network_id'])
def test_get_segmentation_id(self, mock_alloc_seg_id, mock_trunk_port,
                             mock_vlan_check):
    mock_trunk_port.return_value = None
    mock_vlan_check.return_value = None
    fake_neutron_port1_id = uuidutils.generate_uuid()
    fake_neutron_port2_id = uuidutils.generate_uuid()
    mock_alloc_seg_id.side_effect = [1, 2]
    vlan_driver = vlan.VlanDriver()

    response = vlan_driver._get_segmentation_id(fake_neutron_port1_id)
    mock_alloc_seg_id.assert_called_once()
    self.assertEqual(response, 1)

    mock_alloc_seg_id.reset_mock()
    response = vlan_driver._get_segmentation_id(fake_neutron_port1_id)
    mock_alloc_seg_id.assert_not_called()
    self.assertEqual(response, 1)

    response = vlan_driver._get_segmentation_id(fake_neutron_port2_id)
    mock_alloc_seg_id.assert_called_once()
    self.assertEqual(response, 2)
def test_create_host_iface(self, mock_port_bind):
    veth_driver = veth.VethDriver()
    fake_endpoint_id = lib_utils.get_hash()
    fake_neutron_port = uuidutils.generate_uuid()
    fake_neutron_net_id = uuidutils.generate_uuid()
    fake_neutron_v4_subnet_id = uuidutils.generate_uuid()
    fake_neutron_v6_subnet_id = uuidutils.generate_uuid()
    fake_subnets = self._get_fake_subnets(
        fake_endpoint_id, fake_neutron_net_id,
        fake_neutron_v4_subnet_id, fake_neutron_v6_subnet_id)
    fake_network = mock.sentinel.binding_network
    fake_exec_response = ('fake_stdout', '')
    mock_port_bind.return_value = ('fake_host_ifname',
                                   'fake_container_ifname',
                                   fake_exec_response)

    response = veth_driver.create_host_iface(fake_endpoint_id,
                                             fake_neutron_port,
                                             fake_subnets,
                                             fake_network)

    mock_port_bind.assert_called_with(fake_endpoint_id,
                                      fake_neutron_port,
                                      fake_subnets, fake_network)
    self.assertEqual(response, fake_exec_response)
def test_add_offset(self):
    topic_1 = uuidutils.generate_uuid()
    partition_1 = random.randint(0, 1024)
    until_offset_1 = random.randint(0, sys.maxsize)
    from_offset_1 = random.randint(0, sys.maxsize)
    app_name_1 = uuidutils.generate_uuid()
    offset_key_1 = "%s_%s_%s" % (app_name_1, topic_1, partition_1)
    my_batch_time = self.get_dummy_batch_time()

    used_values = {}
    self.kafka_offset_specs.add(topic=topic_1, partition=partition_1,
                                app_name=app_name_1,
                                from_offset=from_offset_1,
                                until_offset=until_offset_1,
                                batch_time_info=my_batch_time)
    used_values[offset_key_1] = {
        "topic": topic_1, "partition": partition_1,
        "app_name": app_name_1, "from_offset": from_offset_1,
        "until_offset": until_offset_1
    }

    kafka_offset_specs = self.kafka_offset_specs.get_kafka_offsets(
        app_name_1)
    offset_value_1 = kafka_offset_specs.get(offset_key_1)
    self.assertions_on_offset(used_value=used_values.get(offset_key_1),
                              offset_value=offset_value_1)
def test_update_offset_values(self):
    topic_1 = uuidutils.generate_uuid()
    partition_1 = random.randint(0, 1024)
    until_offset_1 = random.randint(0, sys.maxsize)
    from_offset_1 = random.randint(0, sys.maxsize)
    app_name_1 = uuidutils.generate_uuid()
    offset_key_1 = "%s_%s_%s" % (app_name_1, topic_1, partition_1)
    my_batch_time = self.get_dummy_batch_time()

    self.kafka_offset_specs.add(topic=topic_1, partition=partition_1,
                                app_name=app_name_1,
                                from_offset=from_offset_1,
                                until_offset=until_offset_1,
                                batch_time_info=my_batch_time)

    until_offset_2 = random.randint(0, sys.maxsize)
    while until_offset_2 == until_offset_1:
        until_offset_2 = random.randint(0, sys.maxsize)

    from_offset_2 = random.randint(0, sys.maxsize)
    while from_offset_2 == from_offset_1:
        from_offset_2 = random.randint(0, sys.maxsize)

    self.kafka_offset_specs.add(topic=topic_1, partition=partition_1,
                                app_name=app_name_1,
                                from_offset=from_offset_2,
                                until_offset=until_offset_2,
                                batch_time_info=my_batch_time)

    kafka_offset_specs = self.kafka_offset_specs.get_kafka_offsets(
        app_name_1)
    updated_offset_value = kafka_offset_specs.get(offset_key_1)
    self.assertEqual(from_offset_2,
                     updated_offset_value.get_from_offset())
    self.assertEqual(until_offset_2,
                     updated_offset_value.get_until_offset())
def test_neutron_to_osvif_network(self):
    network_id = uuidutils.generate_uuid()
    network_name = 'test-net'
    network_mtu = 1500
    neutron_network = {
        'id': network_id,
        'name': network_name,
        'mtu': network_mtu,
    }

    network = ovu.neutron_to_osvif_network(neutron_network)

    self.assertEqual(network_id, network.id)
    self.assertEqual(network_name, network.label)
    self.assertEqual(network_mtu, network.mtu)
def test_neutron_to_osvif_network_no_name(self):
    network_id = uuidutils.generate_uuid()
    network_mtu = 1500
    neutron_network = {
        'id': network_id,
        'mtu': network_mtu,
    }

    network = ovu.neutron_to_osvif_network(neutron_network)

    self.assertFalse(network.obj_attr_is_set('label'))
def test_neutron_to_osvif_network_no_mtu(self):
    network_id = uuidutils.generate_uuid()
    network_name = 'test-net'
    neutron_network = {
        'id': network_id,
        'name': network_name,
    }

    network = ovu.neutron_to_osvif_network(neutron_network)

    self.assertIsNone(network.mtu)
def test_neutron_to_osvif_vif_ovs_no_bridge(self):
    vif_plugin = 'ovs'
    port = {'id': uuidutils.generate_uuid()}
    subnets = {}
    self.assertRaises(o_cfg.RequiredOptError,
                      ovu.neutron_to_osvif_vif_ovs,
                      vif_plugin, port, subnets)