我们从 Python 开源项目中,提取了以下 50 个代码示例,用于说明如何使用 oslo_serialization.jsonutils.dumps()。
def add(self):
    """Handle a CNI 'addNetwork' request.

    Returns a (body, status, headers) tuple: the JSON-serialized VIF
    with 202 on success, empty body with 504 when the pod never showed
    up in the registry, or empty body with 500 on any other failure.
    """
    # Start as None so the broad handler below can still log params
    # even when CNIParameters() itself is what raised.
    params = None
    try:
        params = utils.CNIParameters(flask.request.get_json())
        LOG.debug('Received addNetwork request. CNI Params: %s', params)
        vif = self.plugin.add(params)
        data = jsonutils.dumps(vif.obj_to_primitive())
    except exceptions.ResourceNotReady as e:
        LOG.error("Timed out waiting for requested pod to appear in "
                  "registry: %s.", e)
        return '', httplib.GATEWAY_TIMEOUT, self.headers
    except Exception:
        LOG.exception('Error when processing addNetwork request. CNI '
                      'Params: %s', params)
        return '', httplib.INTERNAL_SERVER_ERROR, self.headers
    return data, httplib.ACCEPTED, self.headers
def test_do_POST_populate(self):
    """Populating a pool via POST reports the number of created ports."""
    trunk_ips = [u"10.0.0.6"]
    num_ports = 3
    method = 'create'
    path = "http://localhost/populatePool"
    body = jsonutils.dumps({"trunks": trunk_ips, "num_ports": num_ports})
    headers = {'Content-Type': 'application/json',
               'Connection': 'close',
               'Content-Length': len(body)}
    expected_resp = ('Ports pool at {} was populated with 3 ports.'
                     .format(trunk_ips)).encode()
    self._do_POST_helper(method, path, headers, body, expected_resp,
                         False, trunk_ips, num_ports)
def test_do_POST_populate_exception(self):
    """A failure while populating a pool yields a formatted error."""
    trunk_ips = [u"10.0.0.6"]
    num_ports = 3
    method = 'create'
    path = "http://localhost/populatePool"
    payload = {"trunks": trunk_ips, "num_ports": num_ports}
    body = jsonutils.dumps(payload)
    headers = {'Content-Type': 'application/json',
               'Connection': 'close',
               'Content-Length': len(body)}
    expected_resp = ('Error while populating pool {0} with {1} ports.'
                     .format(trunk_ips, num_ports)).encode()
    self._do_POST_helper(method, path, headers, body, expected_resp,
                         True, trunk_ips, num_ports)
def test_do_POST_populate_no_trunks(self):
    """POST with an empty trunk list reports the missing trunk IPs."""
    method = 'create'
    path = "http://localhost/populatePool"
    trunk_ips = []
    num_ports = 3
    body = jsonutils.dumps({"trunks": trunk_ips, "num_ports": num_ports})
    headers = {'Content-Type': 'application/json', 'Connection': 'close'}
    headers['Content-Length'] = len(body)
    trigger_exception = False
    # The message has no placeholders, so the original
    # ``.format(trunk_ips, num_ports)`` call was a no-op; dropped it.
    expected_resp = 'Trunk port IP(s) missing.'.encode()
    self._do_POST_helper(method, path, headers, body, expected_resp,
                         trigger_exception, trunk_ips, num_ports)
def test_do_GET_list(self):
    """Listing pools over GET returns the formatted pool summary."""
    method = 'list'
    # Canned string the mocked 'list' handler will return.
    method_resp = ('["10.0.0.6", "9d2b45c4efaa478481c30340b49fd4d2", '
                   '["00efc78c-f11c-414a-bfcd-a82e16dc07d1", '
                   '"fd6b13dc-7230-4cbe-9237-36b4614bc6b5"]] '
                   'has 5 ports')
    path = "http://localhost/listPools"
    body = jsonutils.dumps({})
    headers = {'Content-Type': 'application/json', 'Connection': 'close'}
    headers['Content-Length'] = len(body)
    trigger_exception = False
    expected_resp = ('Pools:\n{0}'.format(method_resp)).encode()
    self._do_GET_helper(method, method_resp, path, headers, body,
                        expected_resp, trigger_exception)
def test_do_GET_list_exception(self):
    """A failure while listing pools yields the generic error message."""
    method = 'list'
    # Canned handler response; unused for the error path but still
    # passed through the helper.
    method_resp = ('["10.0.0.6", "9d2b45c4efaa478481c30340b49fd4d2", '
                   '["00efc78c-f11c-414a-bfcd-a82e16dc07d1", '
                   '"fd6b13dc-7230-4cbe-9237-36b4614bc6b5"]] '
                   'has 5 ports')
    path = "http://localhost/listPools"
    body = jsonutils.dumps({})
    headers = {'Content-Type': 'application/json', 'Connection': 'close'}
    headers['Content-Length'] = len(body)
    trigger_exception = True
    expected_resp = ('Error listing the pools.').encode()
    self._do_GET_helper(method, method_resp, path, headers, body,
                        expected_resp, trigger_exception)
def test_do_GET_show(self):
    """Showing a pool lists its ports under the normalized pool key."""
    method = 'show'
    method_resp = "251f748d-2a0d-4143-bce8-2e616f7a6a4a"
    path = "http://localhost/showPool"
    pool_key = [u"10.0.0.6", u"9d2b45c4efaa478481c30340b49fd4d2",
                [u"00efc78c-f11c-414a-bfcd-a82e16dc07d1",
                 u"fd6b13dc-7230-4cbe-9237-36b4614bc6b5"]]
    body = jsonutils.dumps({"pool_key": pool_key})
    headers = {'Content-Type': 'application/json',
               'Connection': 'close',
               'Content-Length': len(body)}
    # The server normalizes the key: security groups become a sorted tuple.
    formated_key = (pool_key[0], pool_key[1], tuple(sorted(pool_key[2])))
    expected_resp = ('Pool {0} ports are:\n{1}'
                     .format(formated_key, method_resp)).encode()
    self._do_GET_helper(method, method_resp, path, headers, body,
                        expected_resp, False, pool_key)
def test_do_GET_show_exception(self):
    """An error while showing a pool reports the normalized pool key."""
    method = 'show'
    method_resp = "251f748d-2a0d-4143-bce8-2e616f7a6a4a"
    path = "http://localhost/showPool"
    pool_key = [u"10.0.0.6", u"9d2b45c4efaa478481c30340b49fd4d2",
                [u"00efc78c-f11c-414a-bfcd-a82e16dc07d1",
                 u"fd6b13dc-7230-4cbe-9237-36b4614bc6b5"]]
    body = jsonutils.dumps({"pool_key": pool_key})
    headers = {'Content-Type': 'application/json', 'Connection': 'close'}
    headers['Content-Length'] = len(body)
    trigger_exception = True
    # Security-group ids are normalized to a sorted tuple by the server.
    formated_key = (pool_key[0], pool_key[1], tuple(sorted(pool_key[2])))
    expected_resp = ('Error showing pool: {0}.'
                     .format(formated_key)).encode()
    self._do_GET_helper(method, method_resp, path, headers, body,
                        expected_resp, trigger_exception, pool_key)
def test_do_GET_show_empty(self):
    """Showing an empty pool still formats the normalized key."""
    method = 'show'
    method_resp = "Empty pool"
    path = "http://localhost/showPool"
    pool_key = [u"10.0.0.6", u"9d2b45c4efaa478481c30340b49fd4d2",
                [u"00efc78c-f11c-414a-bfcd-a82e16dc07d1",
                 u"fd6b13dc-7230-4cbe-9237-36b4614bc6b5"]]
    body = jsonutils.dumps({"pool_key": pool_key})
    headers = {'Content-Type': 'application/json', 'Connection': 'close'}
    headers['Content-Length'] = len(body)
    trigger_exception = False
    # Security-group ids are normalized to a sorted tuple by the server.
    formated_key = (pool_key[0], pool_key[1], tuple(sorted(pool_key[2])))
    expected_resp = ('Pool {0} ports are:\n{1}'
                     .format(formated_key, method_resp)).encode()
    self._do_GET_helper(method, method_resp, path, headers, body,
                        expected_resp, trigger_exception, pool_key)
def test_annotate(self, m_patch, m_count):
    """annotate() PATCHes the annotations and returns them on success."""
    m_count.return_value = list(range(1, 5))
    path = '/test'
    annotations = {'a1': 'v1', 'a2': 'v2'}
    resource_version = "123"
    ret = {'metadata': {'annotations': annotations,
                        "resourceVersion": resource_version}}
    # sort_keys makes the serialized payload deterministic for the
    # assert_called_once_with comparison below.
    data = jsonutils.dumps(ret, sort_keys=True)
    m_resp = mock.MagicMock()
    m_resp.ok = True
    m_resp.json.return_value = ret
    m_patch.return_value = m_resp
    self.assertEqual(annotations, self.client.annotate(
        path, annotations, resource_version=resource_version))
    m_patch.assert_called_once_with(self.base_url + path,
                                    data=data, headers=mock.ANY,
                                    cert=(None, None), verify=False)
def test_watch(self, m_get):
    """watch() re-issues the GET and yields decoded objects each cycle."""
    path = '/test'
    data = [{'obj': 'obj%s' % i} for i in range(3)]
    lines = [jsonutils.dumps(i) for i in data]
    m_resp = mock.MagicMock()
    m_resp.ok = True
    m_resp.iter_lines.return_value = lines
    m_get.return_value = m_resp
    cycles = 3
    # islice caps the generator so the infinite watch loop terminates.
    self.assertEqual(
        data * cycles,
        list(itertools.islice(self.client.watch(path),
                              len(data) * cycles)))
    # One GET (and one response close) per completed cycle.
    self.assertEqual(cycles, m_get.call_count)
    self.assertEqual(cycles, m_resp.close.call_count)
    m_get.assert_called_with(self.base_url + path, headers={},
                             stream=True, params={'watch': 'true'},
                             cert=(None, None), verify=False)
def _set_lbaas_spec(self, service, lbaas_spec):
    """Persist (or clear) the LBaaS spec annotation on both the
    endpoints object and the service.

    A ``None`` spec clears the annotation; otherwise the spec is
    serialized to sorted JSON. The endpoints object is annotated first;
    only then is the service annotated with optimistic concurrency via
    its resourceVersion.
    """
    # TODO(ivc): extract annotation interactions
    if lbaas_spec is None:
        LOG.debug("Removing LBaaSServiceSpec annotation: %r", lbaas_spec)
        annotation = None
    else:
        lbaas_spec.obj_reset_changes(recursive=True)
        LOG.debug("Setting LBaaSServiceSpec annotation: %r", lbaas_spec)
        annotation = jsonutils.dumps(lbaas_spec.obj_to_primitive(),
                                     sort_keys=True)
    svc_link = service['metadata']['selfLink']
    ep_link = self._get_endpoints_link(service)
    k8s = clients.get_kubernetes_client()
    try:
        k8s.annotate(ep_link,
                     {k_const.K8S_ANNOTATION_LBAAS_SPEC: annotation})
    except k_exc.K8sClientException:
        # REVISIT(ivc): only raise ResourceNotReady for NotFound
        raise k_exc.ResourceNotReady(ep_link)
    k8s.annotate(svc_link,
                 {k_const.K8S_ANNOTATION_LBAAS_SPEC: annotation},
                 resource_version=service['metadata']['resourceVersion'])
def run(self):
    """Execute the target test case and record its progress/result."""
    self.handler.insert({'task_id': self.args.get('task_id'),
                         'status': 'IN PROGRESS'})
    LOGGER.info('Starting running test case')
    try:
        data = self.target(self.args)
    except Exception as err:  # pylint: disable=broad-except
        LOGGER.exception('Task Failed')
        self.handler.update_attr(self.args.get('task_id'),
                                 {'status': 'FAIL', 'error': str(err)})
    else:
        LOGGER.info('Task Finished')
        LOGGER.debug('Result: %s', data)
        self.handler.update_attr(
            self.args.get('task_id'),
            {'status': 'FINISHED',
             'result': jsonutils.dumps(data.get('result', {}))})
def _get_file_contents(from_data, files):
    """Recursively collect remote file contents referenced by *from_data*
    into the *files* dict (URL -> content).

    Dict values that survive _ignore_if() must be http(s) URLs; each URL
    is fetched once, and URLs pointing at templates are re-serialized to
    JSON. Recursion then continues into nested values.
    """
    if not isinstance(from_data, (dict, list)):
        return
    if isinstance(from_data, dict):
        recurse_data = six.itervalues(from_data)
        for key, value in six.iteritems(from_data):
            if _ignore_if(key, value):
                continue
            if not value.startswith(('http://', 'https://')):
                raise exceptions.GetFileError(value, 'get_file')
            if value not in files:
                file_content = heat_utils.read_url_content(value)
                if template_utils.is_template(file_content):
                    # Nested template: fetch its own files and store the
                    # parsed template as JSON instead of the raw body.
                    template = get_template_files(template_url=value)[1]
                    file_content = jsonutils.dumps(template)
                files[value] = file_content
    else:
        recurse_data = from_data
    for value in recurse_data:
        _get_file_contents(value, files)
def sendjson(self, method, urlpath, obj=None):
    """Send json to the OpenDaylight controller.

    :param method: HTTP method name (GET/PUT/POST/...)
    :param urlpath: path appended to the controller base URL
    :param obj: optional payload, serialized to JSON when truthy
    :returns: the decoded JSON body on success, the raw response on
              HTTP error, or None when the body is not JSON
    """
    headers = {'Content-Type': 'application/json'}
    data = jsonutils.dumps(obj, indent=2) if obj else None
    url = '/'.join([self.url, urlpath])
    # Lazy %-args: the message is only rendered when DEBUG is enabled
    # (the original formatted eagerly with the '%' operator).
    LOG.debug("Sending METHOD (%(method)s) URL (%(url)s) JSON (%(obj)s)",
              {'method': method, 'url': url, 'obj': obj})
    r = requests.request(method, url=url, headers=headers, data=data,
                         auth=self.auth, timeout=self.timeout)
    try:
        r.raise_for_status()
    except Exception as ex:
        # Original concatenation ran "...(%(url)s)" and "JSON..."
        # together without a space; fixed.
        LOG.error("Error Sending METHOD (%(method)s) URL (%(url)s) "
                  "JSON (%(obj)s) return: %(r)s ex: %(ex)s rtext: "
                  "%(rtext)s",
                  {'method': method, 'url': url, 'obj': obj,
                   'r': r, 'ex': ex, 'rtext': r.text})
        return r
    try:
        return json.loads(r.content)
    except Exception:
        # Body was not JSON (e.g. empty 204 response); best-effort.
        LOG.debug("%s", r)
        return
def format_nested_dict(d, fields, column_names):
    """Render a nested dict as a two-column, left-aligned pretty table."""
    if d is None:
        return ''
    table = prettytable.PrettyTable(caching=False, print_empty=False,
                                    header=True, field_names=column_names)
    for name in column_names:
        table.align[name] = 'l'
    for field in sorted(d.keys()):
        value = d[field]
        if not isinstance(value, six.string_types):
            value = jsonutils.dumps(value, indent=2, ensure_ascii=False)
        table.add_row([field, value.strip('"')])
    return table.get_string()
def update_compute_node(self, context, node_uuid, values):
    """Merge *values* into an existing compute-node record in etcd.

    :raises: InvalidParameterValue if *values* tries to change the UUID
    :raises: ComputeNodeNotFound if the etcd key does not exist
    :returns: the updated record translated from the etcd result
    """
    if 'uuid' in values:
        msg = _('Cannot overwrite UUID for an existing node.')
        raise exception.InvalidParameterValue(err=msg)
    try:
        # Read-modify-write: merge the new values into the stored JSON.
        target = self.client.read('/compute_nodes/' + node_uuid)
        target_value = json.loads(target.value)
        target_value.update(values)
        target.value = json.dumps(target_value)
        self.client.update(target)
    except etcd.EtcdKeyNotFound:
        raise exception.ComputeNodeNotFound(compute_node=node_uuid)
    except Exception as e:
        LOG.error(
            'Error occurred while updating compute node: %s',
            six.text_type(e))
        raise
    return translate_etcd_result(target, 'compute_node')
def test_detach_volume(self, mock_cinder_api_cls,
                       mock_get_connector_prprts,
                       mock_get_volume_connector):
    """Happy-path detach runs begin_detaching -> disconnect ->
    terminate_connection -> detach and never rolls back."""
    volume = mock.MagicMock()
    volume.volume_id = self.fake_volume_id
    volume.connection_info = jsonutils.dumps(self.fake_conn_info)
    mock_cinder_api = mock.MagicMock()
    mock_cinder_api_cls.return_value = mock_cinder_api
    mock_connector = mock.MagicMock()
    mock_get_connector_prprts.return_value = self.fake_conn_prprts
    mock_get_volume_connector.return_value = mock_connector
    cinder = cinder_workflow.CinderWorkflow(self.context)
    cinder.detach_volume(volume)
    mock_cinder_api.begin_detaching.assert_called_once_with(
        self.fake_volume_id)
    # disconnect receives the deserialized 'data' part of the stored
    # connection_info.
    mock_connector.disconnect_volume.assert_called_once_with(
        self.fake_conn_info['data'], None)
    mock_cinder_api.terminate_connection.assert_called_once_with(
        self.fake_volume_id, self.fake_conn_prprts)
    mock_cinder_api.detach.assert_called_once_with(
        self.fake_volume_id)
    mock_cinder_api.roll_detaching.assert_not_called()
def get_pci_resources(self):
    """Enumerate PCI devices via lspci and return their info as JSON."""
    try:
        output, status = processutils.execute('lspci', '-D', '-nnmm')
    except processutils.ProcessExecutionError:
        raise exception.CommandError(cmd='lspci')
    # First whitespace-separated column of each non-empty line is the
    # PCI address.
    addresses = [line.split()[0]
                 for line in output.split('\n') if line]
    pci_info = [self._get_pci_dev_info(addr) for addr in addresses]
    return jsonutils.dumps(pci_info)
def serialize(self, entity):
    """Serialize *entity* to JSON, using a registered serializer when
    one exists for its type; otherwise fall back to generic conversion.
    """
    if entity is None:
        return None
    key = self._get_serialization_key(type(entity))
    if not key:
        # Primitive or not registered type.
        primitive = jsonutils.to_primitive(entity, convert_instances=True)
        return jsonutils.dumps(primitive)
    serializer = self.serializers.get(key)
    if not serializer:
        raise RuntimeError(
            "Failed to find a serializer for the key: %s" % key
        )
    return jsonutils.dumps({
        '__serial_key': key,
        '__serial_data': serializer.serialize(entity)
    })
def test_clear_port_bindings(self):
    """Updating with an empty bindings list PUTs the port without them."""
    fake_port = copy.copy(test_constants.FAKE_PORT)
    fake_port['address_bindings'] = ['a', 'b']
    mocked_resource = self.get_mocked_resource()

    # Stub get() so update() sees the port with the bindings present.
    def get_fake_port(*args):
        return fake_port

    mocked_resource.get = get_fake_port
    mocked_resource.update(
        fake_port['id'], fake_port['id'], address_bindings=[])
    # The PUT body should be the same port with the bindings cleared.
    fake_port['address_bindings'] = []
    test_client.assert_json_call(
        'put', mocked_resource,
        'https://1.2.3.4/api/v1/logical-ports/%s' % fake_port['id'],
        data=jsonutils.dumps(fake_port, sort_keys=True),
        headers=self.default_headers())
def test_create_logical_router(self):
    """Test creating a router returns the correct response and 201 status.
    """
    fake_router = test_constants.FAKE_ROUTER.copy()
    router = self.get_mocked_resource()
    tier0_router = True
    description = 'dummy'
    router.create(fake_router['display_name'], None, None, tier0_router,
                  description=description)
    # Expected POST payload; 'TIER0' because tier0_router is True.
    data = {
        'display_name': fake_router['display_name'],
        'router_type': 'TIER0' if tier0_router else 'TIER1',
        'tags': None,
        'description': description
    }
    test_client.assert_json_call(
        'post', router,
        'https://1.2.3.4/api/v1/logical-routers',
        data=jsonutils.dumps(data, sort_keys=True),
        headers=self.default_headers())
def test_update_advertisement(self):
    """On NSX 2.1.0 all advertisement flags, including the LB ones,
    are included in the PUT payload."""
    router = self.get_mocked_resource()
    router_id = test_constants.FAKE_ROUTER_UUID
    data = {'advertise_nat_routes': 'a',
            'advertise_nsx_connected_routes': 'b',
            'advertise_static_routes': False,
            'enabled': True,
            'advertise_lb_vip': False,
            'advertise_lb_snat_ip': False}
    with mock.patch("vmware_nsxlib.v3.NsxLib.get_version",
                    return_value='2.1.0'), \
            mock.patch.object(router.client, 'get', return_value={}):
        router.update_advertisement(
            router_id, **data)
    test_client.assert_json_call(
        'put', router,
        ('https://1.2.3.4/api/v1/logical-routers/%s/routing/'
         'advertisement' % router_id),
        data=jsonutils.dumps(data, sort_keys=True),
        headers=self.default_headers())
def test_update_advertisement_no_lb(self):
    """On an older NSX (1.1.0) the LB advertisement kwargs are dropped
    from the PUT payload."""
    router = self.get_mocked_resource()
    router_id = test_constants.FAKE_ROUTER_UUID
    # 'data' deliberately excludes the lb flags: it is both the kwargs
    # set and the expected payload.
    data = {'advertise_nat_routes': 'a',
            'advertise_nsx_connected_routes': 'b',
            'advertise_static_routes': False,
            'enabled': True}
    with mock.patch("vmware_nsxlib.v3.NsxLib.get_version",
                    return_value='1.1.0'), \
            mock.patch.object(router.client, 'get', return_value={}):
        # lb args will be ignored on this nsx version
        router.update_advertisement(
            router_id,
            advertise_lb_vip=False,
            advertise_lb_snat_ip=False,
            **data)
    test_client.assert_json_call(
        'put', router,
        ('https://1.2.3.4/api/v1/logical-routers/%s/routing/'
         'advertisement' % router_id),
        data=jsonutils.dumps(data, sort_keys=True),
        headers=self.default_headers())
def test_update_logical_router_port(self):
    """Updating a router port with a relay service UUID adds the
    corresponding service_bindings entry to the PUT payload."""
    fake_router_port = test_constants.FAKE_ROUTER_PORT.copy()
    uuid = fake_router_port['id']
    fake_relay_uuid = uuidutils.generate_uuid()
    lrport = self.get_mocked_resource()
    with mock.patch.object(lrport, 'get',
                           return_value=fake_router_port),\
            mock.patch("vmware_nsxlib.v3.NsxLib.get_version",
                       return_value='2.0.0'):
        lrport.update(uuid, relay_service_uuid=fake_relay_uuid)
    data = {
        'id': uuid,
        'display_name': fake_router_port['display_name'],
        'logical_router_id': fake_router_port['logical_router_id'],
        'resource_type': fake_router_port['resource_type'],
        "revision": 0,
        'service_bindings': [{'service_id': {
            'target_type': 'LogicalService',
            'target_id': fake_relay_uuid}}]
    }
    test_client.assert_json_call(
        'put', lrport,
        'https://1.2.3.4/api/v1/logical-router-ports/%s' % uuid,
        data=jsonutils.dumps(data, sort_keys=True),
        headers=self.default_headers())
def test_create_ip_pool_no_ranges_with_gateway(self):
    """With no explicit ranges, the CIDR is split around the gateway."""
    pool = self.get_mocked_resource()
    cidr = '2.2.2.0/30'
    gateway_ip = '2.2.2.1'
    pool.create(cidr, allocation_ranges=None, gateway_ip=gateway_ip)
    # The /30 minus the gateway: one range before it, one after.
    exp_ranges = [{'start': '2.2.2.0', 'end': '2.2.2.0'},
                  {'start': '2.2.2.2', 'end': '2.2.2.3'}]
    data = {
        'subnets': [{
            'gateway_ip': gateway_ip,
            'allocation_ranges': exp_ranges,
            'cidr': cidr,
        }]
    }
    test_client.assert_json_call(
        'post', pool,
        'https://1.2.3.4/api/v1/pools/ip-pools',
        data=jsonutils.dumps(data, sort_keys=True),
        headers=self.default_headers())
def test_create_ip_pool_no_ranges_no_gateway(self):
    """Without ranges or a gateway, the whole CIDR becomes one range."""
    pool = self.get_mocked_resource()
    cidr = '2.2.2.0/30'
    pool.create(cidr, allocation_ranges=None)
    exp_ranges = [{'start': '2.2.2.0', 'end': '2.2.2.3'}]
    data = {
        'subnets': [{
            'allocation_ranges': exp_ranges,
            'cidr': cidr,
        }]
    }
    test_client.assert_json_call(
        'post', pool,
        'https://1.2.3.4/api/v1/pools/ip-pools',
        data=jsonutils.dumps(data, sort_keys=True),
        headers=self.default_headers())
def test_update_metadata_proxy(self):
    """Updating MD-proxy fields PUTs the merged object back."""
    fake_md = test_constants.FAKE_MD.copy()
    md = self.get_mocked_resource()
    new_url = "http://2.2.2.20:3500/xyz"
    new_secret = 'abc'
    new_edge = uuidutils.generate_uuid()
    with mock.patch.object(md.client, 'url_get', return_value=fake_md):
        md.update(fake_md['id'], server_url=new_url, secret=new_secret,
                  edge_cluster_id=new_edge)
    # Expected PUT body: the fetched object with the three fields merged.
    fake_md.update({'metadata_server_url': new_url,
                    'secret': new_secret,
                    'edge_cluster_id': new_edge})
    test_client.assert_json_call(
        'put', md,
        'https://1.2.3.4/api/v1/md-proxies/%s' % fake_md['id'],
        data=jsonutils.dumps(fake_md, sort_keys=True),
        headers=self.default_headers())
def _verify_backend_create(self, mocked_trust, cert_pem): """Verify API calls to create cert and identity on backend""" # verify API call to import cert on backend base_uri = 'https://1.2.3.4/api/v1/trust-management' uri = base_uri + '/certificates?action=import' expected_body = {'pem_encoded': cert_pem} test_client.assert_json_call('post', mocked_trust.client, uri, single_call=False, data=jsonutils.dumps(expected_body)) # verify API call to bind cert to identity on backend uri = base_uri + '/principal-identities' expected_body = {'name': self.identity, 'node_id': self.node_id, 'permission_group': 'read_write_api_users', 'certificate_id': self.cert_id, 'is_protected': True} test_client.assert_json_call('post', mocked_trust.client, uri, single_call=False, data=jsonutils.dumps(expected_body, sort_keys=True))
def test_json_request(self):
    """A JSON POST sends the serialized body and decodes the response."""
    resp = mocks.MockRequestsResponse(
        200, jsonutils.dumps({'result': {'ok': 200}}))
    api = self.new_mocked_client(client.JSONRESTClient,
                                 session_response=resp,
                                 url_prefix='api/v2/nat')
    # 'resp' is rebound here to the decoded response dict.
    resp = api.create(body={'name': 'mgmt-egress'})
    assert_json_call(
        'post', api,
        'https://1.2.3.4/api/v2/nat',
        data=jsonutils.dumps({'name': 'mgmt-egress'}))
    self.assertEqual(resp, {'result': {'ok': 200}})
def test_volumedriver_create(self):
    """Creating a new volume through the driver API returns empty Err."""
    self.volume_providers_setup(['cinder'])
    fake_request = {
        u'Name': u'test-vol',
        u'Opts': {u'size': u'1'},
    }
    for provider in app.volume_providers.values():
        provider.check_exist = mock.MagicMock(return_value=False)
        provider.create = mock.MagicMock()
    response = self.app.post('/VolumeDriver.Create',
                             content_type='application/json',
                             data=jsonutils.dumps(fake_request))
    self.assertEqual(200, response.status_code)
    self.assertEqual({u'Err': u''}, jsonutils.loads(response.data))
def test_volumedriver_create_invalid_volume_provider(self):
    """An unknown volume_provider option yields a non-empty Err."""
    self.volume_providers_setup(['cinder'])
    fake_request = {
        u'Name': u'test-vol',
        u'Opts': {u'size': u'1', u'volume_provider': u'provider'}}
    for provider in app.volume_providers.values():
        provider.check_exist = mock.MagicMock()
        provider.check_exist.return_value = False
        provider.create = mock.MagicMock()
    # Use an absolute path like every other test in this file; the
    # original posted to 'VolumeDriver.Create' without the leading '/'.
    response = self.app.post('/VolumeDriver.Create',
                             content_type='application/json',
                             data=jsonutils.dumps(fake_request))
    fake_response = {
        u'Err': u''
    }
    self.assertEqual(200, response.status_code)
    self.assertNotEqual(fake_response, jsonutils.loads(response.data))
def test_volumedriver_remove(self):
    """Removing a volume returns an empty Err on success."""
    self.volume_providers_setup(['cinder'])
    request_body = jsonutils.dumps({u'Name': u'test-vol'})
    for provider in app.volume_providers.values():
        provider.delete = mock.MagicMock(return_value=True)
    response = self.app.post('/VolumeDriver.Remove',
                             content_type='application/json',
                             data=request_body)
    self.assertEqual({u'Err': u''}, jsonutils.loads(response.data))
def test_volumedriver_mount(self):
    """Mounting an existing volume returns its mountpoint."""
    self.volume_providers_setup(['cinder'])
    fake_name = u'test-vol'
    fake_request = {
        u'Name': fake_name
    }
    for provider in app.volume_providers.values():
        provider.check_exist = mock.MagicMock()
        provider.check_exist.return_value = True
        provider.mount = mock.MagicMock()
        provider.mount.return_value = fake_mountpoint(fake_name)
    response = self.app.post('/VolumeDriver.Mount',
                             content_type='application/json',
                             data=jsonutils.dumps(fake_request))
    fake_response = {
        u'Mountpoint': fake_mountpoint(fake_name),
        u'Err': u''
    }
    self.assertEqual(fake_response, jsonutils.loads(response.data))
def test_volumedriver_mount_with_volume_not_exist(self):
    """Mounting a volume that does not exist must not return success."""
    self.volume_providers_setup(['cinder'])
    fake_name = u'test-vol'
    fake_request = {
        u'Name': fake_name,
    }
    for provider in app.volume_providers.values():
        # The original mocked 'check_exit' -- a misspelling that left
        # the real check_exist untouched; mock the method the driver
        # actually calls.
        provider.check_exist = mock.MagicMock()
        provider.check_exist.return_value = False
    response = self.app.post('/VolumeDriver.Mount',
                             content_type='application/json',
                             data=jsonutils.dumps(fake_request))
    fake_response = {
        u'Mountpoint': fake_mountpoint(fake_name),
        u'Err': u''
    }
    self.assertEqual(200, response.status_code)
    self.assertNotEqual(fake_response, jsonutils.loads(response.data))
def test_volumedriver_path(self):
    """Path returns the mountpoint of a known volume."""
    self.volume_providers_setup(['cinder'])
    fake_name = u'test-vol'
    fake_request = {
        u'Name': fake_name
    }
    for provider in app.volume_providers.values():
        provider.show = mock.MagicMock()
        provider.show.return_value = fake_volume(fake_name)
    response = self.app.post('/VolumeDriver.Path',
                             content_type='application/json',
                             data=jsonutils.dumps(fake_request))
    fake_response = {
        u'Mountpoint': fake_mountpoint(fake_name),
        u'Err': u''
    }
    self.assertEqual(fake_response, jsonutils.loads(response.data))
def test_volumedriver_path_with_volume_not_exist(self):
    """Path for a missing volume reports 'Mountpoint Not Found'."""
    self.volume_providers_setup(['cinder'])
    fake_docker_volume_name = u'test-vol'
    fake_request = {
        u'Name': fake_docker_volume_name
    }
    for provider in app.volume_providers.values():
        # show() raising NotFound simulates the missing volume.
        provider.show = mock.MagicMock(side_effect=exceptions.NotFound)
    response = self.app.post('/VolumeDriver.Path',
                             content_type='application/json',
                             data=jsonutils.dumps(fake_request))
    fake_response = {
        u'Err': u'Mountpoint Not Found'
    }
    self.assertEqual(200, response.status_code)
    self.assertEqual(fake_response, jsonutils.loads(response.data))
def test_volumedriver_get_with_volume_not_exist(self):
    """Get for a missing volume reports 'Volume Not Found'."""
    self.volume_providers_setup(['cinder'])
    fake_docker_volume_name = u'test-vol'
    fake_request = {
        u'Name': fake_docker_volume_name
    }
    for provider in app.volume_providers.values():
        # show() raising NotFound simulates the missing volume.
        provider.show = mock.MagicMock(side_effect=exceptions.NotFound())
    response = self.app.post('/VolumeDriver.Get',
                             content_type='application/json',
                             data=jsonutils.dumps(fake_request))
    fake_response = {
        u'Err': u'Volume Not Found'
    }
    self.assertEqual(200, response.status_code)
    self.assertEqual(fake_response, jsonutils.loads(response.data))
def test_ipam_driver_request_pool_with_default_v6pool(
        self, mock_list_subnetpools):
    """RequestPool with V6 set and no pool given falls back to the
    default 'kuryr6' subnetpool."""
    fake_kuryr_subnetpool_id = uuidutils.generate_uuid()
    fake_name = 'kuryr6'
    kuryr_subnetpools = self._get_fake_v6_subnetpools(
        fake_kuryr_subnetpool_id, prefixes=['fe80::/64'])
    mock_list_subnetpools.return_value = {
        'subnetpools': kuryr_subnetpools['subnetpools']}
    fake_request = {
        'AddressSpace': '',
        'Pool': '',
        'SubPool': '',  # In the case --ip-range is not given
        'Options': {},
        'V6': True
    }
    response = self.app.post('/IpamDriver.RequestPool',
                             content_type='application/json',
                             data=jsonutils.dumps(fake_request))
    self.assertEqual(200, response.status_code)
    mock_list_subnetpools.assert_called_with(name=fake_name)
    decoded_json = jsonutils.loads(response.data)
    self.assertEqual(fake_kuryr_subnetpool_id, decoded_json['PoolID'])
def test_network_driver_endpoint_operational_info_with_no_port(self):
    """EndpointOperInfo returns an empty Value when no port matches."""
    docker_network_id = lib_utils.get_hash()
    docker_endpoint_id = lib_utils.get_hash()
    # Neutron reports no ports for the endpoint's derived name.
    fake_port_response = {"ports": []}
    with mock.patch.object(app.neutron, 'list_ports') as mock_list_ports:
        data = {
            'NetworkID': docker_network_id,
            'EndpointID': docker_endpoint_id,
        }
        mock_list_ports.return_value = fake_port_response
        response = self.app.post('/NetworkDriver.EndpointOperInfo',
                                 content_type='application/json',
                                 data=jsonutils.dumps(data))
        decoded_json = jsonutils.loads(response.data)
        self.assertEqual(200, response.status_code)
        port_name = utils.get_neutron_port_name(docker_endpoint_id)
        mock_list_ports.assert_called_once_with(name=port_name)
        self.assertEqual({}, decoded_json['Value'])
def test_network_driver_allocate_network(self):
    """AllocateNetwork replies with empty Options for a valid request."""
    docker_network_id = lib_utils.get_hash()
    allocate_network_request = {
        'NetworkID': docker_network_id,
        'IPv4Data': [{
            'AddressSpace': 'foo',
            'Pool': '192.168.42.0/24',
            'Gateway': '192.168.42.1/24',
        }],
        'IPv6Data': [],
        'Options': {}
    }
    response = self.app.post(
        '/NetworkDriver.AllocateNetwork',
        content_type='application/json',
        data=jsonutils.dumps(allocate_network_request))
    self.assertEqual(200, response.status_code)
    decoded_json = jsonutils.loads(response.data)
    self.assertEqual({'Options': {}}, decoded_json)
def test_node_reassign_handler_with_roles(self, mcast):
    """Reassigning with explicit roles sets them as pending roles and
    triggers re-provisioning (202 + mcast)."""
    cluster = self.env.create(
        cluster_kwargs={'api': False},
        nodes_kwargs=[{'status': consts.NODE_STATUSES.ready,
                       'roles': ['controller']}])
    node = cluster.nodes[0]
    seed_cluster = self.env.create_cluster(api=False)
    # NOTE(akscram): reprovision=True means that the node will be
    # re-provisioned during the reassigning. This is a default behavior.
    data = {'nodes_ids': [node.id],
            'reprovision': True,
            'roles': ['compute']}
    resp = self.app.post(
        reverse('NodeReassignHandler',
                kwargs={'cluster_id': seed_cluster.id}),
        jsonutils.dumps(data),
        headers=self.default_headers)
    self.assertEqual(202, resp.status_code)
    # Roles become pending until provisioning completes.
    self.assertEqual(node.roles, [])
    self.assertEqual(node.pending_roles, ['compute'])
    self.assertTrue(mcast.called)
def test_node_reassign_handler_without_reprovisioning(self, mcast):
    """With reprovision=False the role is applied immediately and no
    provisioning task is cast (200, no mcast)."""
    cluster = self.env.create(
        cluster_kwargs={'api': False},
        nodes_kwargs=[{'status': consts.NODE_STATUSES.ready,
                       'roles': ['controller']}])
    node = cluster.nodes[0]
    seed_cluster = self.env.create_cluster(api=False)
    data = {'nodes_ids': [node.id],
            'reprovision': False,
            'roles': ['compute']}
    resp = self.app.post(
        reverse('NodeReassignHandler',
                kwargs={'cluster_id': seed_cluster.id}),
        jsonutils.dumps(data),
        headers=self.default_headers)
    self.assertEqual(200, resp.status_code)
    self.assertFalse(mcast.called)
    self.assertEqual(node.roles, ['compute'])
def playbook_treeview(playbook):
    """
    Creates a fake filesystem with playbook files and uses generate_tree()
    to recurse and return a JSON structure suitable for bootstrap-treeview.
    """
    fs = fake_filesystem.FakeFilesystem()
    mock_os = fake_filesystem.FakeOsModule(fs)
    files = models.File.query.filter(models.File.playbook_id.in_([playbook]))
    # Materialize each file in the fake fs and map its path to its id.
    paths = {}
    for playbook_file in files:
        fs.CreateFile(playbook_file.path)
        paths[playbook_file.path] = playbook_file.id
    tree = generate_tree('/', paths, mock_os)
    return jsonutils.dumps(tree, sort_keys=True, indent=2)
def _init_all_fc_dvs(self):
    """Query FusionCompute for all DVSwitches and rebuild the
    name->id and physnet mappings.

    :raises: DVSwitchNotFound when the response carries no dvswitchs
    """
    LOG.debug("loading dvs mapping ")
    dvs_map_temp = {}
    physnet_map_temp = {}
    data = self.get(self.site.dvswitchs_uri)
    if not data.get(constant.DVSWITCHS):
        raise fc_exc.DVSwitchNotFound()
    dvs = data.get(constant.DVSWITCHS)
    # Plain truthiness covers both None and empty list; the original
    # 'dvs and len(dvs) > 0' was redundant.
    if dvs:
        for dvswitch in dvs:
            dvs_id = utils.get_id_from_urn(dvswitch.get('urn'))
            dvs_map_temp[dvswitch["name"]] = dvs_id
            self.update_physnet_map(dvs_id, physnet_map_temp)
    LOG.debug(
        "init all fc dvs dvs map is %s, physnet map is %s",
        jsonutils.dumps(dvs_map_temp),
        jsonutils.dumps(physnet_map_temp))
    # Swap in the freshly built maps atomically at the end.
    self.dvs_mapping = dvs_map_temp
    self.physnet_mapping = physnet_map_temp
def create_vsp(self, dvs_id, pg_urn, vif):
    """Ask FusionCompute to create a VSP on the given DVS/port group.

    :param dvs_id: id of the distributed virtual switch
    :param pg_urn: URN of the target port group
    :param vif: VIF dict whose 'id' names and tags the new VSP
    :return: the FusionCompute response
    """
    port_id = vif['id']
    payload = {
        'name': port_id,
        'portGroupUrn': pg_urn,
        'tags': [{'tagKey': constant.VSP_TAG_KEY,
                  'tagValue': port_id}]
    }
    vsp_path = self.get_path_by_site(constant.VSP_URI, dvs_id=dvs_id)
    return self.post(vsp_path, data=jsonutils.dumps(payload))
def create_subports(num_ports, trunk_ips, timeout=180):
    """POST a populate-pool request over the kuryr manager socket and
    print the daemon's reply."""
    body = jsonutils.dumps({"trunks": trunk_ips, "num_ports": num_ports})
    headers = {'Content-Type': 'application/json',
               'Connection': 'close',
               'Content-Length': len(body)}
    path = 'http://localhost{0}'.format(constants.VIF_POOL_POPULATE)
    conn = UnixDomainHttpConnection(constants.MANAGER_SOCKET_FILE, timeout)
    conn.request('POST', path, body=body, headers=headers)
    resp = conn.getresponse()
    print(resp.read())
def delete_subports(trunk_ips, timeout=180):
    """POST a free-pool request over the kuryr manager socket and print
    the daemon's reply."""
    body = jsonutils.dumps({"trunks": trunk_ips})
    headers = {'Content-Type': 'application/json',
               'Connection': 'close',
               'Content-Length': len(body)}
    path = 'http://localhost{0}'.format(constants.VIF_POOL_FREE)
    conn = UnixDomainHttpConnection(constants.MANAGER_SOCKET_FILE, timeout)
    conn.request('POST', path, body=body, headers=headers)
    resp = conn.getresponse()
    print(resp.read())
def list_pools(timeout=180):
    """GET the list of pools from the kuryr manager socket and print it."""
    method = 'GET'
    body = jsonutils.dumps({})
    # Fixed the misspelled 'Context-Type'/'Context-Length' header names:
    # the standard HTTP fields are Content-Type and Content-Length, as
    # used by create_subports/delete_subports.
    headers = {'Content-Type': 'application/json', 'Connection': 'close'}
    headers['Content-Length'] = len(body)
    path = 'http://localhost{0}'.format(constants.VIF_POOL_LIST)
    socket_path = constants.MANAGER_SOCKET_FILE
    conn = UnixDomainHttpConnection(socket_path, timeout)
    conn.request(method, path, body=body, headers=headers)
    resp = conn.getresponse()
    print(resp.read())
def show_pool(trunk_ip, project_id, sg, timeout=180):
    """GET the contents of one pool (keyed by trunk IP, project and
    security groups) from the kuryr manager socket and print it."""
    method = 'GET'
    body = jsonutils.dumps({"pool_key": [trunk_ip, project_id, sg]})
    # Fixed the misspelled 'Context-Type'/'Context-Length' header names:
    # the standard HTTP fields are Content-Type and Content-Length, as
    # used by create_subports/delete_subports.
    headers = {'Content-Type': 'application/json', 'Connection': 'close'}
    headers['Content-Length'] = len(body)
    path = 'http://localhost{0}'.format(constants.VIF_POOL_SHOW)
    socket_path = constants.MANAGER_SOCKET_FILE
    conn = UnixDomainHttpConnection(socket_path, timeout)
    conn.request(method, path, body=body, headers=headers)
    resp = conn.getresponse()
    print(resp.read())