我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用nose.tools.assert_true()。
def test_powerview_create_title_missing(self):
    '''powerview_create rejects a data_dict that has no title.'''
    sysadmin = Sysadmin()
    context = {'user': sysadmin['name']}

    # Start from the standard create payload, then drop the title key.
    create_data = self._make_create_data_dict()
    del create_data['title']

    with nosetools.assert_raises(ValidationError) as raised:
        create_action = toolkit.get_action('powerview_create')
        create_action(context=context, data_dict=create_data)

    # error_dict maps a field name to the messages reported for that field.
    title_errors = raised.exception.error_dict['title']
    nosetools.assert_true("Missing value" in title_errors,
                          "Expected string not in exception message.")
def test_powerview_create_view_type_missing(self):
    '''powerview_create rejects a data_dict that has no view_type.'''
    sysadmin = Sysadmin()
    context = {'user': sysadmin['name']}

    # Start from the standard create payload, then drop the view_type key.
    create_data = self._make_create_data_dict()
    del create_data['view_type']

    with nosetools.assert_raises(ValidationError) as raised:
        create_action = toolkit.get_action('powerview_create')
        create_action(context=context, data_dict=create_data)

    view_type_errors = raised.exception.error_dict['view_type']
    nosetools.assert_true("Missing value" in view_type_errors,
                          "Expected string not in exception message.")
def test_powerview_create_config_must_be_json(self):
    '''powerview_create rejects a config value that is not a JSON string.'''
    sysadmin = Sysadmin()
    context = {'user': sysadmin['name']}

    # Overwrite config in the standard payload with text that is not JSON.
    create_data = self._make_create_data_dict()
    create_data['config'] = "I'm not json."

    with nosetools.assert_raises(ValidationError) as raised:
        create_action = toolkit.get_action('powerview_create')
        create_action(context=context, data_dict=create_data)

    config_errors = raised.exception.error_dict['config']
    nosetools.assert_true("Could not parse as valid JSON" in config_errors,
                          "Expected string not in exception message.")
def test_powerview_update_no_id(self):
    '''powerview_update rejects a data_dict that has no id.'''
    sysadmin = Sysadmin()
    context = {'user': sysadmin['name']}

    # Take a factory-made powerview dict and strip its id.
    update_data = factories.PowerView()
    del update_data['id']

    with nosetools.assert_raises(ValidationError) as raised:
        update_action = toolkit.get_action('powerview_update')
        update_action(context=context, data_dict=update_data)

    id_errors = raised.exception.error_dict['id']
    nosetools.assert_true("Missing value" in id_errors,
                          "Expected string not in exception message.")
def test_powerview_update_nonexisting_id(self):
    '''powerview_update rejects an id that matches no powerview.'''
    sysadmin = Sysadmin()
    context = {'user': sysadmin['name']}

    # Take a factory-made powerview dict and point it at a bogus id.
    update_data = factories.PowerView()
    update_data['id'] = 'non-existing-id'

    with nosetools.assert_raises(ValidationError) as raised:
        update_action = toolkit.get_action('powerview_update')
        update_action(context=context, data_dict=update_data)

    id_errors = raised.exception.error_dict['id']
    nosetools.assert_true("Not found: PowerView" in id_errors,
                          "Expected string not in exception message.")
def test_powerview_show_no_id(self):
    '''powerview_show rejects an empty data_dict (no id).'''
    sysadmin = Sysadmin()
    context = {'user': sysadmin['name']}

    # A powerview is created first; its dict is not used by the call below.
    factories.PowerView()

    with nosetools.assert_raises(ValidationError) as raised:
        show_action = toolkit.get_action('powerview_show')
        show_action(context=context, data_dict={})

    id_errors = raised.exception.error_dict['id']
    nosetools.assert_true("Missing value" in id_errors,
                          "Expected string not in exception message.")
def test_powerview_show_nonexisting_id(self):
    '''powerview_show rejects an id that matches no powerview.'''
    sysadmin = Sysadmin()
    context = {'user': sysadmin['name']}

    # A powerview is created first; the lookup below uses a different id.
    factories.PowerView()
    lookup = {'id': 'non-existin-id'}

    with nosetools.assert_raises(ValidationError) as raised:
        show_action = toolkit.get_action('powerview_show')
        show_action(context=context, data_dict=lookup)

    id_errors = raised.exception.error_dict['id']
    nosetools.assert_true("Not found: PowerView" in id_errors,
                          "Expected string not in exception message.")
def test_powerview_resource_list_nonexisting_id(self):
    '''powerview_resource_list rejects an id that matches no powerview.'''
    sysadmin = Sysadmin()
    context = {'user': sysadmin['name']}

    # A powerview is created first; the lookup below uses a different id.
    factories.PowerView()
    lookup = {'id': 'non-existin-id'}

    with nosetools.assert_raises(ValidationError) as raised:
        list_action = toolkit.get_action('powerview_resource_list')
        list_action(context=context, data_dict=lookup)

    id_errors = raised.exception.error_dict['id']
    nosetools.assert_true("Not found: PowerView" in id_errors,
                          "Expected string not in exception message.")
def test_powerview_delete_no_id(self):
    '''powerview_delete rejects an empty data_dict (no id).'''
    sysadmin = Sysadmin()
    context = {'user': sysadmin['name']}

    # A powerview is created first; its dict is not used by the call below.
    factories.PowerView()

    with nosetools.assert_raises(ValidationError) as raised:
        delete_action = toolkit.get_action('powerview_delete')
        delete_action(context=context, data_dict={})

    id_errors = raised.exception.error_dict['id']
    nosetools.assert_true("Missing value" in id_errors,
                          "Expected string not in exception message.")
def test_powerview_delete_nonexisting_id(self):
    '''powerview_delete rejects an id that matches no powerview.'''
    sysadmin = Sysadmin()
    context = {'user': sysadmin['name']}

    # A powerview is created first; the lookup below uses a different id.
    factories.PowerView()
    lookup = {'id': 'non-existin-id'}

    with nosetools.assert_raises(ValidationError) as raised:
        delete_action = toolkit.get_action('powerview_delete')
        delete_action(context=context, data_dict=lookup)

    id_errors = raised.exception.error_dict['id']
    nosetools.assert_true("Not found: PowerView" in id_errors,
                          "Expected string not in exception message.")
def test_powerview_add_resource_nonexisting_resource_id(self):
    '''powerview_add_resource rejects a resource_id that matches no
    resource.'''
    sysadmin = Sysadmin()
    context = {'user': sysadmin['name']}

    # The powerview id is real; only the resource id is bogus.
    powerview = factories.PowerView()
    request = {'id': powerview['id'], 'resource_id': 'non-existing-id'}

    with nosetools.assert_raises(ValidationError) as raised:
        add_action = toolkit.get_action('powerview_add_resource')
        add_action(context=context, data_dict=request)

    resource_errors = raised.exception.error_dict['resource_id']
    nosetools.assert_true("Not found: Resource" in resource_errors,
                          "Expected string not in exception message.")
def test_powerview_remove_resource_nonexisting_resource_id(self):
    '''powerview_remove_resource rejects a resource_id that matches no
    resource.'''
    sysadmin = Sysadmin()
    context = {'user': sysadmin['name']}

    # The powerview id is real; only the resource id is bogus.
    powerview = factories.PowerView()
    request = {'id': powerview['id'], 'resource_id': 'non-existing-id'}

    with nosetools.assert_raises(ValidationError) as raised:
        remove_action = toolkit.get_action('powerview_remove_resource')
        remove_action(context=context, data_dict=request)

    resource_errors = raised.exception.error_dict['resource_id']
    nosetools.assert_true("Not found: Resource" in resource_errors,
                          "Expected string not in exception message.")
def test_powerview_create_normal_user_allow_create_authed_resources(self):
    '''
    The create auth check passes for a normal user whose powerview lists
    private resources from an organization the user is a member of.

    NOTE(review): the test name refers to allow_user_create, but nothing
    in this function sets it - presumably a decorator or fixture does;
    confirm.
    '''
    a_user = factories.User()
    membership = {'name': a_user['name'], 'capacity': 'member'}
    org = factories.Organization(users=[membership])
    dataset = factories.Dataset(owner_org=org['id'], private="true")

    # Two resources inside the private dataset.
    resource_ids = [
        factories.Resource(package_id=dataset['id'])['id']
        for _ in range(2)
    ]
    pv_dict = self._make_create_data_dict(resources=resource_ids)

    context = {'user': a_user['name'], 'model': model}
    auth_result = helpers.call_auth('ckanext_powerview_create',
                                    context=context, **pv_dict)
    nosetools.assert_true(auth_result)
def test_powerview_update_normal_user_allow_create_add_authed_resources(self):
    '''
    The update auth check passes for a normal user who adds private
    resources, from an organization the user is a member of, to a
    powerview created for that user.

    NOTE(review): the test name refers to allow_user_create, but nothing
    in this function sets it - presumably a decorator or fixture does;
    confirm.
    '''
    a_user = factories.User()
    membership = {'name': a_user['name'], 'capacity': 'member'}
    org = factories.Organization(users=[membership])
    dataset = factories.Dataset(owner_org=org['id'], private="true")

    # Two resources inside the private dataset.
    resource_ids = [
        factories.Resource(package_id=dataset['id'])['id']
        for _ in range(2)
    ]

    # The powerview is created first; the resources are attached to the
    # dict that is handed to the auth function.
    pv = powerview_factories.PowerView(user=a_user, private=False)
    pv['resources'] = resource_ids

    context = {'user': a_user['name'], 'model': model}
    auth_result = helpers.call_auth('ckanext_powerview_update',
                                    context=context, **pv)
    nosetools.assert_true(auth_result)
def test_powerview_add_resource_sysadmin(self):
    '''
    The add-resource auth check passes for a sysadmin.
    '''
    a_sysadmin = factories.Sysadmin()
    dataset = factories.Dataset()
    resource = factories.Resource(package_id=dataset['id'])
    powerview = powerview_factories.PowerView()

    context = {'user': a_sysadmin['name'], 'model': model}
    auth_result = helpers.call_auth('ckanext_powerview_add_resource',
                                    context=context,
                                    powerview_id=powerview['id'],
                                    resource_id=resource['id'])
    nosetools.assert_true(auth_result)
def test_powerview_add_resource_normal_user_allow_create(self):
    '''
    The add-resource auth check passes for a normal user acting on a
    powerview created for that user and a resource from a
    factory-default dataset.
    '''
    a_user = factories.User()
    dataset = factories.Dataset()
    resource = factories.Resource(package_id=dataset['id'])
    powerview = powerview_factories.PowerView(user=a_user)

    context = {'user': a_user['name'], 'model': model}
    auth_result = helpers.call_auth('ckanext_powerview_add_resource',
                                    context=context,
                                    powerview_id=powerview['id'],
                                    resource_id=resource['id'])
    nosetools.assert_true(auth_result)
def test_powerview_add_resource_normal_user_allow_create_authed_resource(self):
    '''
    The add-resource auth check passes for a normal user acting on a
    powerview created for that user and a private resource from an
    organization the user is a member of.
    '''
    a_user = factories.User()
    membership = {'name': a_user['name'], 'capacity': 'member'}
    org = factories.Organization(users=[membership])
    dataset = factories.Dataset(owner_org=org['id'], private="true")
    resource = factories.Resource(package_id=dataset['id'])
    powerview = powerview_factories.PowerView(user=a_user)

    context = {'user': a_user['name'], 'model': model}
    auth_result = helpers.call_auth('ckanext_powerview_add_resource',
                                    context=context,
                                    powerview_id=powerview['id'],
                                    resource_id=resource['id'])
    nosetools.assert_true(auth_result)
def test_powerview_remove_resource_sysadmin(self):
    '''
    The remove-resource auth check passes for a sysadmin.
    '''
    a_sysadmin = factories.Sysadmin()
    dataset = factories.Dataset()
    resource = factories.Resource(package_id=dataset['id'])
    # The powerview already contains the resource that is to be removed.
    powerview = powerview_factories.PowerView(resources=[resource['id']])

    context = {'user': a_sysadmin['name'], 'model': model}
    auth_result = helpers.call_auth('ckanext_powerview_remove_resource',
                                    context=context,
                                    powerview_id=powerview['id'],
                                    resource_id=resource['id'])
    nosetools.assert_true(auth_result)
def test_powerview_remove_resource_normal_user_allow_create(self):
    '''
    The remove-resource auth check passes for a normal user acting on a
    powerview created for that user and a resource from a
    factory-default dataset.
    '''
    a_user = factories.User()
    dataset = factories.Dataset()
    resource = factories.Resource(package_id=dataset['id'])
    # The powerview already contains the resource that is to be removed.
    powerview = powerview_factories.PowerView(user=a_user,
                                              resources=[resource['id']])

    context = {'user': a_user['name'], 'model': model}
    auth_result = helpers.call_auth('ckanext_powerview_remove_resource',
                                    context=context,
                                    powerview_id=powerview['id'],
                                    resource_id=resource['id'])
    nosetools.assert_true(auth_result)
def test_powerview_remove_resource_normal_user_allow_create_authed_resource(self):
    '''
    The remove-resource auth check passes for a normal user acting on a
    powerview created for that user and a private resource from an
    organization the user is a member of.
    '''
    a_user = factories.User()
    membership = {'name': a_user['name'], 'capacity': 'member'}
    org = factories.Organization(users=[membership])
    dataset = factories.Dataset(owner_org=org['id'], private="true")
    resource = factories.Resource(package_id=dataset['id'])
    # The powerview already contains the resource that is to be removed.
    powerview = powerview_factories.PowerView(user=a_user,
                                              resources=[resource['id']])

    context = {'user': a_user['name'], 'model': model}
    auth_result = helpers.call_auth('ckanext_powerview_remove_resource',
                                    context=context,
                                    powerview_id=powerview['id'],
                                    resource_id=resource['id'])
    nosetools.assert_true(auth_result)
def test_powerview_resource_list_sysadmin_private_resources(self):
    '''
    The resource-list auth check passes for a sysadmin on a powerview
    whose resources belong to a private dataset.
    '''
    a_sysadmin = factories.Sysadmin()
    org = factories.Organization()
    dataset = factories.Dataset(owner_org=org['id'], private="true")

    # Two resources inside the private dataset.
    resource_ids = [
        factories.Resource(package_id=dataset['id'])['id']
        for _ in range(2)
    ]
    powerview = powerview_factories.PowerView(private='no',
                                              resources=resource_ids)

    context = {'user': a_sysadmin['name'], 'model': model}
    auth_result = helpers.call_auth('ckanext_powerview_resource_list',
                                    context=context,
                                    id=powerview['id'])
    nosetools.assert_true(auth_result)
def test_powerview_update(self):
    '''powerview_update with an unchanged dict only touches last_modified.'''
    sysadmin = Sysadmin()
    context = {'user': sysadmin['name']}

    created = factories.PowerView()
    # Pass a copy so the created dict stays available for comparison.
    update_action = toolkit.get_action('powerview_update')
    updated = update_action(context=context, data_dict=created.copy())

    # last_modified is unset on creation and set by the update...
    nosetools.assert_true(created['last_modified'] is None)
    nosetools.assert_true(updated['last_modified'] is not None)

    # ...and once it is blanked out again, both dicts must be equal.
    updated['last_modified'] = None
    nosetools.assert_equal(created, updated)
def test_powerview_show_with_resources(self):
    '''powerview_show returns the ids of the powerview's resources.'''
    sysadmin = Sysadmin()
    context = {'user': sysadmin['name']}

    # Three resources, all attached to the powerview at creation time.
    resource_ids = [Resource()['id'] for _ in range(3)]
    created = factories.PowerView(resources=resource_ids)

    show_action = toolkit.get_action('powerview_show')
    shown = show_action(context=context, data_dict={'id': created['id']})

    nosetools.assert_equal(created, shown)

    shown_resources = shown['resources']
    nosetools.assert_equal(len(shown_resources), 3)
    for resource_id in resource_ids:
        nosetools.assert_true(resource_id in shown_resources)
def test_powerview_list_offset_and_limit(self):
    '''
    Calling powerview_list with an offset and limit returns expected
    results.

    Twenty public powerviews titled powerview_1 .. powerview_20 are
    created; a request with limit 10 and offset 10 must return ten
    entries whose titles include powerview_11 .. powerview_20.
    '''
    # make some powerviews
    # range() rather than xrange(): xrange only exists on Python 2, while
    # range() works on both Python 2 and 3.
    for number in range(1, 21):
        factories.PowerView(title='powerview_{0}'.format(number),
                            private=False)

    powerview_list = toolkit.get_action('powerview_list')(
        data_dict={
            'limit': 10,
            'offset': 10
        }
    )

    nosetools.assert_equal(len(powerview_list), 10)

    # The membership check is on titles, not ids.
    titles = [pv['title'] for pv in powerview_list]
    for number in range(11, 21):
        nosetools.assert_true('powerview_{0}'.format(number) in titles)
def test_powerview_list_private_powerview(self):
    '''
    powerview_list, called by a different user, returns the owner's
    public powerview but not the private one.
    '''
    owner = User()
    viewer = User()

    public_pv = factories.PowerView(user=owner, private=False)
    private_pv = factories.PowerView(user=owner, private=True)

    list_action = toolkit.get_action('powerview_list')
    listed = list_action(context={'user': viewer['name']},
                         data_dict={'id': owner['name']})

    nosetools.assert_equal(len(listed), 1)
    nosetools.assert_true(public_pv in listed)
    nosetools.assert_true(private_pv not in listed)
def test_powerview_list_private_powerview_authorized(self):
    '''
    powerview_list, called by the user the powerviews were created for,
    returns both the public and the private one.
    '''
    owner = User()

    public_pv = factories.PowerView(user=owner, private=False)
    private_pv = factories.PowerView(user=owner, private=True)

    list_action = toolkit.get_action('powerview_list')
    listed = list_action(context={'user': owner['name']},
                         data_dict={'id': owner['name']})

    nosetools.assert_equal(len(listed), 2)
    nosetools.assert_true(public_pv in listed)
    nosetools.assert_true(private_pv in listed)
def test_powerview_list_user_powerviews(self):
    '''
    powerview_list returns only the powerviews of the user whose name is
    passed as id.
    '''
    other_owner = User()
    listed_owner = User()
    viewer = User()

    foreign_pv = factories.PowerView(user=other_owner, private=False)
    first_pv = factories.PowerView(user=listed_owner, private=False)
    second_pv = factories.PowerView(user=listed_owner, private=False)

    list_action = toolkit.get_action('powerview_list')
    listed = list_action(context={'user': viewer['name']},
                         data_dict={'id': listed_owner['name']})

    nosetools.assert_equal(len(listed), 2)
    nosetools.assert_true(foreign_pv not in listed)
    nosetools.assert_true(first_pv in listed)
    nosetools.assert_true(second_pv in listed)
def test_powerview_create(self):
    '''Creating a powerview returns a dict with expected values'''
    sysadmin = Sysadmin()

    powerview_result = toolkit.get_action('powerview_create')(
        context={'user': sysadmin['name']},
        data_dict=self._make_create_data_dict()
    )

    nosetools.assert_true(isinstance(powerview_result, dict))

    # assert_equal instead of assert_true(a == b): on failure it reports
    # both values rather than just "False is not true".
    nosetools.assert_equal(powerview_result['title'], 'Title')
    nosetools.assert_equal(powerview_result['description'],
                           'My short description.')
    nosetools.assert_equal(powerview_result['view_type'], 'my-view-type')

    nosetools.assert_true(isinstance(powerview_result['config'], dict))
    nosetools.assert_true(powerview_result['private'])

    # created_by field auto-populated
    nosetools.assert_equal(powerview_result['created_by'], sysadmin['id'])
def test_custom_search(self):
    '''Sorting the dataset listing by custom_text orders packages by it.'''

    def resubmit_sorted(page, sort_order):
        # Pick the sort order in the page's second form and submit it.
        sort_form = page.forms[1]
        sort_form.fields['sort'][0].value = sort_order
        return sort_form.submit()

    helpers.call_action('package_create', name='test_package_a',
                        custom_text='z')
    helpers.call_action('package_create', name='test_package_b',
                        custom_text='y')

    listing = self.app.get('/dataset')

    # Ascending: package_b (y) must appear before package_a (z).
    listing = resubmit_sorted(listing, 'custom_text asc')
    pos_a = listing.body.index('test_package_a')
    pos_b = listing.body.index('test_package_b')
    nt.assert_true(pos_b < pos_a)

    # Descending: package_a (z) must appear before package_b (y).
    listing = resubmit_sorted(listing, 'custom_text desc')
    pos_a = listing.body.index('test_package_a')
    pos_b = listing.body.index('test_package_b')
    nt.assert_true(pos_a < pos_b)
def test_upgrade_from_sha_with_unicode_password(self):
    '''Validating a non-ASCII password replaces an old-style hash with a
    pbkdf2_sha512 one that still verifies the full password.'''
    password = u'testpassword\xc2\xa0'
    user = factories.User()
    user_obj = model.User.by_name(user['name'])

    # Store a hash produced by the helper directly on the user record.
    legacy_hash = self._set_password(password)
    user_obj._password = legacy_hash
    user_obj.save()

    # Validation has to succeed and leave a different, pbkdf2 hash behind.
    nt.assert_true(user_obj.validate_password(password))
    nt.assert_not_equals(legacy_hash, user_obj.password)
    nt.assert_true(pbkdf2_sha512.identify(user_obj.password))
    nt.assert_true(pbkdf2_sha512.verify(password, user_obj.password))

    # The non-ASCII tail is part of the password: the bare ASCII prefix
    # must not verify.
    ascii_prefix_matches = pbkdf2_sha512.verify('testpassword',
                                                user_obj.password)
    nt.assert_false(ascii_prefix_matches)
def test_local_file_completions():
    """Files in the working directory are offered as completions."""
    ip = get_ipython()
    with TemporaryWorkingDirectory():
        prefix = './foo'
        suffixes = ['1', '2']
        paths = [prefix + suffix for suffix in suffixes]

        # Create an empty file for every expected completion.
        for path in paths:
            open(path, 'w').close()

        # Completing the bare prefix yields exactly the created files.
        matches = ip.complete(prefix)[1]
        nt.assert_equal(matches, paths)

        # Inside an unfinished function call the files must at least be
        # among the completions.
        line = 'a = f("%s' % prefix
        matches = ip.complete(prefix, line)[1]
        expected = set(prefix + suffix for suffix in suffixes)
        nt.assert_true(expected <= set(matches))
def test_rehashx():
    """%rehashx repopulates the alias table and the syscmdlist entry."""
    # Start from a clean slate.
    _ip.alias_manager.clear_aliases()
    del _ip.db['syscmdlist']

    _ip.magic('rehashx')

    # Practically every development system provides more than 10 aliases.
    alias_count = len(_ip.alias_manager.aliases)
    nt.assert_true(alias_count > 10)

    # Alias names must not contain dots.
    for alias_name, _command in _ip.alias_manager.aliases:
        nt.assert_not_in('.', alias_name)

    # rehashx must also have filled syscmdlist.
    syscmds = _ip.db['syscmdlist']
    nt.assert_true(len(syscmds) > 10)
def test_line_cell_info():
    """%%foo and %foo magics are distinguishable to inspect"""

    def inspect_magic(name):
        # Every lookup must be found and be reported as a magic.
        info = ip.object_inspect(name)
        nt.assert_true(info['found'])
        nt.assert_true(info['ismagic'])
        return info

    ip = get_ipython()
    ip.magics_manager.register(FooFoo)

    # The bare name resolves to a magic too.
    inspect_magic('foo')

    # The cell and line forms each report their own docstring.
    cell_info = inspect_magic('%%foo')
    nt.assert_equal(cell_info['docstring'], FooFoo.cell_foo.__doc__)

    line_info = inspect_magic('%foo')
    nt.assert_equal(line_info['docstring'], FooFoo.line_foo.__doc__)
def test_command_chain_dispatcher_ff():
    """Test two failing hooks"""
    first_hook = Fail(u'fail1')
    second_hook = Fail(u'fail2')
    # Each chain entry is a (priority, hook) pair.
    chain = [(0, first_hook), (10, second_hook)]
    dispatcher = CommandChainDispatcher(chain)

    try:
        dispatcher()
    except TryNext as exc:
        # The exception that escapes carries the second hook's message.
        nt.assert_equal(str(exc), u'fail2')
    else:
        assert False, "Expected exception was not raised."

    # Both hooks must have been tried.
    for hook in (first_hook, second_hook):
        nt.assert_true(hook.called)
def test_last_two_blanks():
    """Check isp.last_two_blanks against inputs it must reject and
    inputs it must accept."""
    rejected = (
        '',
        'abc',
        'abc\n',
        'abc\n\na',
        'abc\n \n',
        'abc\n\n',
    )
    accepted = (
        '\n\n',
        '\n\n ',
        '\n \n',
        'abc\n\n ',
        'abc\n\n\n',
        'abc\n\n \n',
        'abc\n\n \n ',
        'abc\n\n \n \n',
        'abc\nd\n\n\n',
        'abc\nd\ne\nf\n\n\n',
    )

    for text in rejected:
        nt.assert_false(isp.last_two_blanks(text))
    for text in accepted:
        nt.assert_true(isp.last_two_blanks(text))
def test_basic_class():
    """Pretty-printing a class goes through the printer registered for
    ``type`` and renders as ``<module>.MyObj``."""
    # Records whether the wrapper was invoked for MyObj.
    seen = {'called': False}

    def recording_type_pprint(obj, p, cycle):
        if obj is MyObj:
            seen['called'] = True
        return pretty._type_pprint(obj, p, cycle)

    buf = StringIO()
    printer = pretty.RepresentationPrinter(buf)
    printer.type_pprinters[type] = recording_type_pprint
    printer.pretty(MyObj)
    printer.flush()

    nt.assert_equal(buf.getvalue(), '%s.MyObj' % __name__)
    nt.assert_true(seen['called'])
def test_deepreload():
    "Test that dreload does deep reloads and skips excluded modules."
    with TemporaryDirectory() as tmpdir, prepended_to_syspath(tmpdir):
        # Module A defines a class; module B only imports A.
        module_sources = (
            ('A.py', "class Object(object):\n pass\n"),
            ('B.py', "import A\n"),
        )
        for filename, source in module_sources:
            with open(os.path.join(tmpdir, filename), 'w') as handle:
                handle.write(source)

        import A
        import B

        # With A excluded, an existing instance still matches A.Object.
        instance = A.Object()
        dreload(B, exclude=['A'])
        nt.assert_true(isinstance(instance, A.Object))

        # Without the exclusion, A.Object is a new class after the reload,
        # so the old instance no longer matches it.
        instance = A.Object()
        dreload(B)
        nt.assert_false(isinstance(instance, A.Object))
def test_correct_geometric_distortion_signal_type(self):
    """After a correction with the 3x3 identity matrix, the signal is
    still an ElectronDiffraction instance."""
    identity = np.array([[1., 0., 0.],
                         [0., 1., 0.],
                         [0., 0., 1.]])
    signal = self.signal
    signal.correct_geometric_distortion(D=identity)
    nt.assert_true(isinstance(signal, ElectronDiffraction))

# def test_geometric_distortion_rotation_origin(self):
#     dp = self.signal
#     dp.correct_geometric_distortion()
#     np.testing.assert_allclose(rp.data, np.array([[5., 4.25, 2.875,
#                                                    1.7, 0.92857143, 0.],
#                                                   [5., 4.75, 3.625,
#                                                    2.5, 1.71428571,0.6]]),
#                                atol=1e-3)

# def test_geometric_distortion(self):
#     dp = self.signal
#     rp = dp.get_radial_profile(centers=np.array([[4, 3], [4, 3]]))
#     np.testing.assert_allclose(rp.data, np.array([[5., 4.25, 2.875,
#                                                    1.7, 0.92857143, 0.],
#                                                   [5., 4.375, 3.5,
#                                                    2.4, 2.07142857, 1.]]),
#                                atol=1e-3)
def _test_calculate_on_random_set(self, alpha, random_str):
    """
    Test utils.entropy on random set generated from alphanumerics.

    Note, this method presumes a uniform distribution from the
    stringer.random_string method.

    The final comparison is currently disabled (see the note at the end
    of the body), so this helper runs the calculations but asserts
    nothing.

    :param alpha: alphabet the string was drawn from; only its length
        is used.
    :param random_str: string handed to ``self.ent.calculate``.
    """
    STRING_LENGTH = len(random_str)
    ALPHA_LEN = len(alpha)
    ent_calc = self.ent.calculate(random_str)

    # Force float division. Under Python 2 integer division, 1/ALPHA_LEN
    # is 0 for every alphabet longer than one symbol, and log2(0) then
    # yields a domain error or a non-finite value.
    p = 1.0 / ALPHA_LEN

    # this is the expected Shannon entropy for a uniform distribution
    exp_ent = STRING_LENGTH * p * log2(p)
    accepted_err = 10

    # NOTE(review): the original author left this check disabled because
    # it failed. Besides the integer division fixed above, the formula
    # looks suspect: p * log2(p) is never positive and covers a single
    # symbol, whereas the Shannon entropy of a uniform distribution over
    # ALPHA_LEN symbols is -log2(p) bits per symbol. Confirm what
    # self.ent.calculate returns (per symbol or per string) before
    # re-enabling.
    # assert_true(exp_ent - accepted_err <=
    #             ent_calc <=
    #             exp_ent + accepted_err)
def test_subprocess_error():
    """error in mp.Process doesn't crash"""
    # The child process calls int('hi'), which raises ValueError.
    source_lines = (
        "import multiprocessing as mp",
        "p = mp.Process(target=int, args=('hi',))",
        "p.start()",
        "p.join()",
    )
    with new_kernel() as kc:
        iopub = kc.iopub_channel
        code = '\n'.join(source_lines)

        _msg_id, _content = execute(kc=kc, code=code)
        stdout, stderr = assemble_output(iopub)

        # The failure shows up on stderr only.
        nt.assert_equal(stdout, '')
        nt.assert_true("ValueError" in stderr, stderr)

        _check_master(kc, expected=True)
        _check_master(kc, expected=True, stream="stderr")

# raw_input tests
def test_basic_class():
    """Pretty-printing a class goes through the printer registered for
    ``type`` and renders as ``<module>.MyObj``."""
    # Records whether the wrapper was invoked for MyObj.
    seen = {'called': False}

    def recording_type_pprint(obj, p, cycle):
        if obj is MyObj:
            seen['called'] = True
        return pretty._type_pprint(obj, p, cycle)

    buf = StringIO()
    printer = pretty.RepresentationPrinter(buf)
    printer.type_pprinters[type] = recording_type_pprint
    printer.pretty(MyObj)
    printer.flush()

    nt.assert_equal(buf.getvalue(), '%s.MyObj' % __name__)
    nt.assert_true(seen['called'])

# This is only run on Python 2 because in Python 3 the language prevents you
# from setting a non-unicode value for __qualname__ on a metaclass, and it
# doesn't respect the descriptor protocol if you subclass unicode and implement
# __get__.
def assign_labeled_network(api):
    """
    Add the labeled network to the cluster, then wait until every host
    in the cluster is attached to it.
    """
    engine = api.system_service()

    # Look the network up by name and take the first match.
    name_query = 'name={}'.format(LABELED_NET_NAME)
    labeled_net = engine.networks_service().list(search=name_query)[0]

    # the logical network will be automatically assigned to all host network
    # interfaces with that label asynchronously
    cluster_service = test_utils.get_cluster_service(engine, CLUSTER_NAME)
    cluster_networks = cluster_service.networks_service()
    nt.assert_true(cluster_networks.add(labeled_net))

    hosts_service = engine.hosts_service()
    for host in test_utils.hosts_in_cluster_v4(engine, CLUSTER_NAME):
        host_service = hosts_service.host_service(id=host.id)
        # The attachment happens asynchronously, so the check is retried.
        is_attached = functools.partial(_host_is_attached_to_network,
                                        engine, host_service,
                                        LABELED_NET_NAME)
        testlib.assert_true_within_short(is_attached)
def add_filter(ovirt_api4):
    """
    Create a vNIC profile carrying the NETWORK_FILTER_NAME network
    filter and assign it to the first NIC of VM0_NAME.
    """
    engine = ovirt_api4.system_service()

    # First NIC of the VM and the network behind its current profile.
    nics_service = test_utils.get_nics_service(engine, VM0_NAME)
    nic = nics_service.list()[0]
    network = ovirt_api4.follow_link(nic.vnic_profile).network

    # First filter with the wanted name; next() raises StopIteration
    # when there is none.
    network_filters_service = engine.network_filters_service()
    matching_filters = (
        candidate
        for candidate in network_filters_service.list()
        if candidate.name == NETWORK_FILTER_NAME
    )
    network_filter = next(matching_filters)

    # New profile on the same network, carrying the filter.
    profile_spec = types.VnicProfile(
        name='{}_profile'.format(network_filter.name),
        network=network,
        network_filter=network_filter
    )
    vnic_profile = engine.vnic_profiles_service().add(profile_spec)

    # Switch the NIC to the new profile.
    nic.vnic_profile = vnic_profile
    update_result = nics_service.nic_service(nic.id).update(nic)
    nt.assert_true(update_result)
def add_filter_parameter(prefix):
    """
    Add two network filter parameters for VM0_NAME: a fixed name/value
    pair, and one whose value is derived from the engine's IP address.
    """
    engine = prefix.virt_env.engine_vm()
    ovirt_api4 = engine.get_api(api_ver=4)

    # Keep the first three octets of the engine IP and use 1 as the last.
    leading_octets = engine.ip().split('.')[0:3]
    vm_gw = '.'.join(leading_octets + ['1'])

    network_filter_parameters_service = test_utils.get_network_fiter_parameters_service(
        ovirt_api4.system_service(), VM0_NAME)

    parameters = (
        (NETWORK_FILTER_PARAMETER0_NAME, NETWORK_FILTER_PARAMETER0_VALUE),
        (NETWORK_FILTER_PARAMETER1_NAME, vm_gw),
    )
    for parameter_name, parameter_value in parameters:
        parameter = types.NetworkFilterParameter(name=parameter_name,
                                                 value=parameter_value)
        nt.assert_true(network_filter_parameters_service.add(parameter))