我们从Python开源项目中,提取了以下38个代码示例,用于说明如何使用mock.patch.multiple()。
def test_nrpe_dependency_installed(self, mock_config): config = copy.deepcopy(CHARM_CONFIG) mock_config.side_effect = lambda key: config[key] with patch.multiple(ceph_hooks, apt_install=DEFAULT, rsync=DEFAULT, log=DEFAULT, write_file=DEFAULT, nrpe=DEFAULT) as mocks: ceph_hooks.update_nrpe_config() mocks["apt_install"].assert_called_once_with( ["python-dbus", "lockfile-progs"])
def test_train_1(self): j = 2 ret_train = np.zeros((6, 3, N_CLASSES)) ret_dev = np.zeros((6, 3, N_CLASSES)) func_ret = np.zeros((1, N_CLASSES)) func_ret[0, j] = 1. with patch.multiple(self.wb, _gs=True, _generate_ts=lambda *x: (self.TRAIN_X, Y), _extract_features=MagicMock( return_value=FEATS), _model=MOCK_DEFAULT): with patch("dsenser.wang.wangbase.GridSearchCV"): self.wb._model.decision_function = \ MagicMock(return_value=func_ret) self.wb._model.classes_ = CLASSES_ self.wb.train(([(0, REL1)], [PARSE1]), ([(0, REL1)], [PARSE1]), 1, 1, ret_train, ret_dev)
def test_predict_1(self): x = np.array([[0, 1, 2, 3, 4], [[0, 1, 2, 3, 4]]]) y = np.array([0, 0, 0, 0]) def _predict_func_mock(*args, **kwargs): return np.array([[0, 0, 1, 0], [0, 0, 0, 1], [0, 0, 1, 0], [1, 0, 0, 0], ]) with patch.multiple(self.nnbs, _predict_func=_predict_func_mock, get_test_w_emb_i=None, _init_wemb_funcs=MagicMock(), _rel2x=MagicMock(return_value=x)): ret = np.array([[0] * 4]) self.nnbs.predict(None, (None, None), ret, 0) assert np.allclose(ret[0], y)
def test_init_wemb_funcs_0(self): word2vec = Mock() word2vec.load = MagicMock() word2vec.ndim = 2 fmock = MagicMock() with patch.multiple(self.nnbs, _plain_w2v=True, ndim=8, w2v=word2vec, _trained=True, _predict_func_emb=fmock): self.nnbs._init_wemb_funcs() assert word2vec.load.called assert self.nnbs.ndim == word2vec.ndim assert self.nnbs.get_train_w_emb_i == \ self.nnbs._get_train_w2v_emb_i assert self.nnbs.get_test_w_emb_i == \ self.nnbs._get_test_w2v_emb_i assert self.nnbs._predict_func == fmock
def test_init_wemb_funcs_2(self): word2vec = Mock() word2vec.load = MagicMock() word2vec.ndim = 2 fmock = MagicMock() with patch.multiple(self.nnbs, _plain_w2v=False, lstsq=True, ndim=8, w2v=None, _trained=True, _predict_func_emb=fmock): with patch.object(dsenser.nnbase, "Word2Vec", word2vec): self.nnbs._init_wemb_funcs() assert self.nnbs.ndim == DFLT_VDIM assert word2vec.load.called assert self.nnbs.get_train_w_emb_i == \ self.nnbs._get_train_w2v_emb_i assert self.nnbs.get_test_w_emb_i == \ self.nnbs._get_test_w2v_lstsq_emb_i
def test_upgrade_charm_with_nrpe_relation_installs_dependencies(self): with patch.multiple( ceph_hooks, apt_install=DEFAULT, rsync=DEFAULT, log=DEFAULT, write_file=DEFAULT, nrpe=DEFAULT, emit_cephconf=DEFAULT, mon_relation_joined=DEFAULT, is_relation_made=DEFAULT) as mocks, patch( "charmhelpers.contrib.hardening.harden.config"): mocks["is_relation_made"].return_value = True ceph_hooks.upgrade_charm() mocks["apt_install"].assert_called_with( ["python-dbus", "lockfile-progs"])
def test_not_implemented_errors(self): # Construction of the base class should not be possible self.assertRaises(NotImplementedError, StateMachineDevice) mandatory_methods = { '_get_state_handlers': Mock(return_value={'test': MagicMock()}), '_get_initial_state': Mock(return_value='test'), '_get_transition_handlers': Mock( return_value={('test', 'test'): Mock(return_value=True)}) } # If any of the mandatory methods is missing, a NotImplementedError must be raised for methods in itertools.combinations(mandatory_methods.items(), 2): with patch.multiple('lewis.devices.StateMachineDevice', **dict(methods)): self.assertRaises(NotImplementedError, StateMachineDevice) # If all are implemented, no exception should be raised with patch.multiple('lewis.devices.StateMachineDevice', **mandatory_methods): assertRaisesNothing(self, StateMachineDevice)
def mocks(request, testdir): testdir.makeini(""" [pytest] IMAGE = myregistry/defaultimage MINION_IMAGE = myregistry/minion_image """) plugin_mocks = patch.multiple( 'saltcontainers.plugin', Client=MagicMock(**{ 'return_value.inspect_container.return_value': { 'NetworkSettings': {'IPAddress': 'fake-ip'}} })) salt_key_mock = patch( 'saltcontainers.factories.MasterModel.salt_key', **{'return_value.__getitem__.return_value.__contains__.return_value': True}) plugin_mocks.start() request.addfinalizer(plugin_mocks.stop) salt_key_mock.start() request.addfinalizer(salt_key_mock.stop)
def test_authenticate_server(self): with patch.multiple(kerberos_module_name, authGSSClientStep=clientStep_complete): response_ok = requests.Response() response_ok.url = "http://www.example.org/" response_ok.status_code = 200 response_ok.headers = { 'www-authenticate': 'negotiate servertoken', 'authorization': 'Negotiate GSSRESPONSE'} auth = requests_kerberos.HTTPKerberosAuth() auth.context = {"www.example.org": "CTX"} result = auth.authenticate_server(response_ok) self.assertTrue(result) clientStep_complete.assert_called_with("CTX", "servertoken")
def test_upgrade_charm_with_nrpe_relation_installs_dependencies( self, mock_config): config = copy.deepcopy(CHARM_CONFIG) mock_config.side_effect = lambda key: config[key] with patch.multiple( ceph_hooks, apt_install=DEFAULT, rsync=DEFAULT, log=DEFAULT, write_file=DEFAULT, nrpe=DEFAULT, emit_cephconf=DEFAULT, mon_relation_joined=DEFAULT, is_relation_made=DEFAULT) as mocks, patch( "charmhelpers.contrib.hardening.harden.config"): mocks["is_relation_made"].return_value = True ceph_hooks.upgrade_charm() mocks["apt_install"].assert_called_with( ["python-dbus", "lockfile-progs"])
def test_send_request(self): """ Test the execution of a deferred Supervisor request. """ from supvisors.mainloop import SupvisorsMainLoop from supvisors.utils import DeferredRequestHeaders main_loop = SupvisorsMainLoop(self.supvisors) # patch main loop subscriber with patch.multiple(main_loop, check_address=DEFAULT, start_process=DEFAULT, stop_process=DEFAULT, restart=DEFAULT, shutdown=DEFAULT) as mocked_loop: # test check address self.check_call(main_loop, mocked_loop, 'check_address', DeferredRequestHeaders.CHECK_ADDRESS, ('10.0.0.2', )) # test start process self.check_call(main_loop, mocked_loop, 'start_process', DeferredRequestHeaders.START_PROCESS, ('10.0.0.2', 'dummy_process', 'extra args')) # test stop process self.check_call(main_loop, mocked_loop, 'stop_process', DeferredRequestHeaders.STOP_PROCESS, ('10.0.0.2', 'dummy_process')) # test restart self.check_call(main_loop, mocked_loop, 'restart', DeferredRequestHeaders.RESTART, ('10.0.0.2', )) # test shutdown self.check_call(main_loop, mocked_loop, 'shutdown', DeferredRequestHeaders.SHUTDOWN, ('10.0.0.2', ))
def test_on_remote_event(self): """ Test the reception of a Supervisor remote comm event. """ from supvisors.listener import SupervisorListener listener = SupervisorListener(self.supvisors) # add patches for what is tested just above with patch.multiple(listener, unstack_event=DEFAULT, unstack_info=DEFAULT, authorization=DEFAULT): # test unknown type event = Mock(type='unknown', data='') listener.on_remote_event(event) self.assertFalse(listener.unstack_event.called) self.assertFalse(listener.unstack_info.called) self.assertFalse(listener.authorization.called) # test event event = Mock(type='event', data={'state': 'RUNNING'}) listener.on_remote_event(event) self.assertEqual([call({'state': 'RUNNING'})], listener.unstack_event.call_args_list) self.assertFalse(listener.unstack_info.called) self.assertFalse(listener.authorization.called) listener.unstack_event.reset_mock() # test info event = Mock(type='info', data={'name': 'dummy_process'}) listener.on_remote_event(event) self.assertFalse(listener.unstack_event.called) self.assertEqual([call({'name': 'dummy_process'})], listener.unstack_info.call_args_list) self.assertFalse(listener.authorization.called) listener.unstack_info.reset_mock() # test authorization event = Mock(type='auth', data=('10.0.0.1', True)) listener.on_remote_event(event) self.assertFalse(listener.unstack_event.called) self.assertFalse(listener.unstack_info.called) self.assertEqual([call(('10.0.0.1', True))], listener.authorization.call_args_list)
def test_run(self): self.cls.reactor = Mock(spec_set=reactor) with patch.multiple( pbm, logger=DEFAULT, Site=DEFAULT, LoopingCall=DEFAULT, VaultRedirectorSite=DEFAULT ) as mod_mocks: with patch.multiple( pb, get_active_node=DEFAULT, run_reactor=DEFAULT, listentcp=DEFAULT, add_update_loop=DEFAULT, listentls=DEFAULT ) as cls_mocks: cls_mocks['get_active_node'].return_value = 'consul:1234' self.cls.run() assert self.cls.active_node_ip_port == 'consul:1234' assert mod_mocks['logger'].mock_calls == [ call.warning('Initial Vault active node: %s', 'consul:1234'), call.warning('Starting Twisted reactor (event loop)') ] assert mod_mocks['VaultRedirectorSite'].mock_calls == [call(self.cls)] assert mod_mocks['Site'].mock_calls == [ call(mod_mocks['VaultRedirectorSite'].return_value) ] assert self.cls.reactor.mock_calls == [] assert cls_mocks['run_reactor'].mock_calls == [call()] assert mod_mocks['LoopingCall'].mock_calls == [] assert cls_mocks['listentcp'].mock_calls == [ call(mod_mocks['Site'].return_value) ] assert cls_mocks['add_update_loop'].mock_calls == [call()] assert cls_mocks['listentls'].mock_calls == []
def test_run_tls(self): self.cls.reactor = Mock(spec_set=reactor) self.cls.tls_factory = Mock() with patch.multiple( pbm, logger=DEFAULT, Site=DEFAULT, LoopingCall=DEFAULT, VaultRedirectorSite=DEFAULT ) as mod_mocks: with patch.multiple( pb, get_active_node=DEFAULT, run_reactor=DEFAULT, listentcp=DEFAULT, add_update_loop=DEFAULT, listentls=DEFAULT ) as cls_mocks: cls_mocks['get_active_node'].return_value = 'consul:1234' self.cls.run() assert self.cls.active_node_ip_port == 'consul:1234' assert mod_mocks['logger'].mock_calls == [ call.warning('Initial Vault active node: %s', 'consul:1234'), call.warning('Starting Twisted reactor (event loop)') ] assert mod_mocks['VaultRedirectorSite'].mock_calls == [call(self.cls)] assert mod_mocks['Site'].mock_calls == [ call(mod_mocks['VaultRedirectorSite'].return_value) ] assert self.cls.reactor.mock_calls == [] assert cls_mocks['run_reactor'].mock_calls == [call()] assert mod_mocks['LoopingCall'].mock_calls == [] assert cls_mocks['listentls'].mock_calls == [ call(mod_mocks['Site'].return_value) ] assert cls_mocks['add_update_loop'].mock_calls == [call()] assert cls_mocks['listentcp'].mock_calls == []
def test_run_error(self): self.cls.reactor = Mock(spec_set=reactor) with patch.multiple( pbm, logger=DEFAULT, Site=DEFAULT, LoopingCall=DEFAULT, VaultRedirectorSite=DEFAULT ) as mod_mocks: with patch.multiple( pb, get_active_node=DEFAULT, run_reactor=DEFAULT, listentcp=DEFAULT, add_update_loop=DEFAULT ) as cls_mocks: cls_mocks['get_active_node'].return_value = None with pytest.raises(SystemExit) as excinfo: self.cls.run() assert excinfo.value.code == 3 assert mod_mocks['logger'].mock_calls == [ call.critical("ERROR: Could not get active vault node from " "Consul. Exiting.") ] assert mod_mocks['VaultRedirectorSite'].mock_calls == [] assert mod_mocks['Site'].mock_calls == [] assert self.cls.reactor.mock_calls == [] assert cls_mocks['run_reactor'].mock_calls == [] assert mod_mocks['LoopingCall'].mock_calls == []
def run_and_exit(cli_args=None, prompt_commands=None): """Run http-prompt executable, execute some prompt commands, and exit.""" if cli_args is None: cli_args = [] # Make sure last command is 'exit' if prompt_commands is None: prompt_commands = ['exit'] else: prompt_commands += ['exit'] # Fool cli() so that it believes we're running from CLI instead of pytest. # We will restore it at the end of the function. orig_argv = sys.argv sys.argv = ['http-prompt'] + cli_args try: with patch.multiple('http_prompt.cli', prompt=DEFAULT, execute=DEFAULT) as mocks: mocks['execute'].side_effect = execute # prompt() is mocked to return the command in 'prompt_commands' in # sequence, i.e., prompt() returns prompt_commands[i-1] when it is # called for the ith time mocks['prompt'].side_effect = prompt_commands result = CliRunner().invoke(cli, cli_args) context = mocks['execute'].call_args[0][1] return result, context finally: sys.argv = orig_argv
def test_train(self): with patch.multiple(self.ds, implicit=Mock(), explicit=Mock()): self.ds.train(None)
def test_train_0(self): with patch.multiple(self.wb, _extract_features=MOCK_DEFAULT, _model=MOCK_DEFAULT): self.wb.train(([], []), None)
def test_train_0(self): with patch("dsenser.wang.wangbase.GridSearchCV"): with patch.multiple(self.wb, _generate_ts=lambda *x: (self.TRAIN_X, Y), _model=MOCK_DEFAULT): self.wb.train(([], []), None)
def test_train_1(self): rels = [REL1] * NFOLDS parses = [PARSE1] * NFOLDS with patch("dsenser.wang.wangbase.GridSearchCV"): with patch.multiple(self.wb, _generate_ts=lambda *x: (self.TRAIN_X, Y), _model=MOCK_DEFAULT): self.wb.train((rels, parses), (rels, parses))
def test_get_train_w_emb_i(self): with patch.multiple(self.nnbs, w2emb_i={'1': 1, "hello": 2}, _w_stat={"world": 1, "z": 3}, w_i=3): with patch("dsenser.nnbase.UNK_PROB", MagicMock(return_value=True)): assert self.nnbs._get_train_w_emb_i("1024") == 1 assert self.nnbs._get_train_w_emb_i("HELLO") == 2 assert self.nnbs._get_train_w_emb_i("world") == \ self.nnbs.unk_w_i assert self.nnbs._get_train_w_emb_i("Z") == 3 assert self.nnbs.w_i == 4
def test_get_train_w2v_emb_i(self): with patch.multiple(self.nnbs, w2emb_i={'1': 1, "hello": 2}, w2v={"world": None}, w_i=3): assert self.nnbs._get_train_w2v_emb_i("1024") == 1 assert self.nnbs._get_train_w2v_emb_i("HELLO") == 2 assert self.nnbs._get_train_w2v_emb_i("world") == 3 assert self.nnbs._get_train_w2v_emb_i("Z") == self.nnbs.unk_w_i assert self.nnbs.w_i == 4
def test_get_test_w2v_emb_i(self): with patch.multiple(self.nnbs, w2emb_i={'1': 1, "hello": 2}, w2v={"world": VEC3}, W_EMB=np.vstack([VEC0, VEC1, VEC2])): assert np.allclose(self.nnbs._get_test_w2v_emb_i("Z"), VEC0) assert np.allclose(self.nnbs._get_test_w2v_emb_i("256"), VEC1) assert np.allclose(self.nnbs._get_test_w2v_emb_i("HELLO"), VEC2) assert np.allclose(self.nnbs._get_test_w2v_emb_i("world"), VEC3)
def test_get_train_c_emb_i_1(self): conn2emb = {} CONN = "USUALLY WHEN" with patch.multiple(self.nnbs, c_i=2, c2emb_i=conn2emb): ret = self.nnbs.get_train_c_emb_i(CONN) assert ret == 2 assert self.nnbs.c_i == 3
def test_init_w_emb(self): with patch.multiple(self.nnbs, _params=[], ndim=3): self.nnbs._init_w_emb() assert len(self.nnbs._params) == 1
def test_init_wemb_funcs_1(self): word2vec = Mock() with patch.multiple(self.nnbs, _plain_w2v=True, w2v=None, _trained=False): with patch.object(dsenser.nnbase, "Word2Vec", word2vec): self.nnbs._init_wemb_funcs() assert self.nnbs.get_test_w_emb_i == \ self.nnbs._get_train_w2v_emb_i
def test_init_funcs_0(self): rmsmock = MagicMock(return_value=(1, 2, 3)) theano_mock = Mock() theano_mock.function = MagicMock() with patch.multiple(dsenser.nnbase, rmsprop=rmsmock, theano=theano_mock): self.nnbs._init_funcs(True) assert rmsmock.called
def test_nrpe_dependency_installed(self): with patch.multiple(ceph_hooks, apt_install=DEFAULT, rsync=DEFAULT, log=DEFAULT, write_file=DEFAULT, nrpe=DEFAULT) as mocks: ceph_hooks.update_nrpe_config() mocks["apt_install"].assert_called_once_with( ["python-dbus", "lockfile-progs"])
def test_cached(self): mock_t = Mock() mock_std = Mock() mock_stpp = Mock() mock_stm = Mock() mock_mct = Mock() mock_mbs = Mock() mock_mos = Mock() with patch.multiple( pb, autospec=True, _transactions=DEFAULT, _scheduled_transactions_date=DEFAULT, _scheduled_transactions_per_period=DEFAULT, _scheduled_transactions_monthly=DEFAULT, _make_combined_transactions=DEFAULT, _make_budget_sums=DEFAULT, _make_overall_sums=DEFAULT ) as mocks: mocks['_transactions'].return_value.all.return_value = mock_t mocks['_scheduled_transactions_date' ''].return_value.all.return_value = mock_std mocks['_scheduled_transactions_per_period' ''].return_value.all.return_value = mock_stpp mocks['_scheduled_transactions_monthly' ''].return_value.all.return_value = mock_stm mocks['_make_combined_transactions'].return_value = mock_mct mocks['_make_budget_sums'].return_value = mock_mbs mocks['_make_overall_sums'].return_value = mock_mos self.cls._data_cache = {'foo': 'bar'} res = self.cls._data assert res == {'foo': 'bar'} assert mocks['_transactions'].mock_calls == [] assert mocks['_scheduled_transactions_date'].mock_calls == [] assert mocks['_scheduled_transactions_per_period'].mock_calls == [] assert mocks['_scheduled_transactions_monthly'].mock_calls == [] assert mocks['_make_combined_transactions'].mock_calls == [] assert mocks['_make_budget_sums'].mock_calls == [] assert mocks['_make_overall_sums'].mock_calls == []
def test_rebuild_indices(self): with patch.multiple( Command, _create=DEFAULT, _delete=DEFAULT, _populate=DEFAULT ) as handles: handles['_delete'].return_value = True call_command('search_index', stdout=self.out, action='rebuild') handles['_delete'].assert_called() handles['_create'].assert_called() handles['_populate'].assert_called()
def test_rebuild_indices_aborted(self): with patch.multiple( Command, _create=DEFAULT, _delete=DEFAULT, _populate=DEFAULT ) as handles: handles['_delete'].return_value = False call_command('search_index', stdout=self.out, action='rebuild') handles['_delete'].assert_called() handles['_create'].assert_not_called() handles['_populate'].assert_not_called()
def test_force_preemptive(self): with patch.multiple(kerberos_module_name, authGSSClientInit=clientInit_complete, authGSSClientResponse=clientResponse, authGSSClientStep=clientStep_continue): auth = requests_kerberos.HTTPKerberosAuth(force_preemptive=True) request = requests.Request(url="http://www.example.org") auth.__call__(request) self.assertTrue('Authorization' in request.headers) self.assertEqual(request.headers.get('Authorization'), 'Negotiate GSSRESPONSE')
def test_no_force_preemptive(self): with patch.multiple(kerberos_module_name, authGSSClientInit=clientInit_complete, authGSSClientResponse=clientResponse, authGSSClientStep=clientStep_continue): auth = requests_kerberos.HTTPKerberosAuth() request = requests.Request(url="http://www.example.org") auth.__call__(request) self.assertTrue('Authorization' not in request.headers)
def test_with_statement(self): with patch.multiple(WireMockServer, start=DEFAULT, stop=DEFAULT) as mocks: with WireMockServer() as wm: self.assertIsInstance(wm, WireMockServer) mocks['start'].assert_called_once_with() mocks['stop'].assert_called_once_with()
def test_connectionbase(): # Temporarily disable ABC checks with patch.multiple(ConnectionBase, __abstractmethods__=set()): with pytest.raises(TypeError): ConnectionBase() c = ConnectionBase('project') assert c.project_name == 'project' assert not c.project_namespace assert c.project_qualname == 'project' c = ConnectionBase('ns/project') assert c.project_name == 'project' assert c.project_namespace == 'ns' assert c.project_qualname == 'ns/project'
def test_acceptance(self): logger.debug('starting acceptance test') with patch.multiple( pb, get_active_node=DEFAULT, run_reactor=DEFAULT, update_active_node=DEFAULT ) as cls_mocks: # setup some return values cls_mocks['run_reactor'].side_effect = self.se_run_reactor cls_mocks['get_active_node'].return_value = 'foo:1234' cls_mocks['update_active_node'].side_effect = self.se_update_active # instantiate class self.cls = VaultRedirector('consul:1234', poll_interval=0.5) self.cls.log_enabled = False # make sure active is None (starting state) assert self.cls.active_node_ip_port is None self.cls.bind_port = self.get_open_port() self.cls.log_enabled = True # setup an async task to make the HTTP request self.cls.reactor.callLater(2.0, self.se_requester) # do this in case the callLater for self.stop_reactor fails... signal.signal(signal.SIGALRM, self.stop_reactor) signal.alarm(20) # send SIGALRM in 20 seconds, to stop runaway loop self.cls.run() signal.alarm(0) # disable SIGALRM assert self.cls.active_node_ip_port == 'bar:5678' # from update_active assert self.update_active_called is True assert cls_mocks['update_active_node'].mock_calls[0] == call() assert cls_mocks['run_reactor'].mock_calls == [call()] assert cls_mocks['get_active_node'].mock_calls == [call()] # HTTP response checks resp = json.loads(self.response) # /bar/baz assert resp['/bar/baz']['headers'][ 'Server'] == "vault-redirector/%s/TwistedWeb/%s" % ( _VERSION, twisted_version.short() ) assert resp['/bar/baz']['headers'][ 'Location'] == 'http://bar:5678/bar/baz' assert resp['/bar/baz']['status_code'] == 307 # /vault-redirector-health assert resp['/vault-redirector-health']['status_code'] == 200 health_info = json.loads(resp['/vault-redirector-health']['text']) assert resp['/vault-redirector-health']['headers'][ 'Content-Type'] == 'application/json' assert health_info['healthy'] is True assert health_info['application'] == 'vault-redirector' assert health_info['version'] == _VERSION assert health_info['source'] == _PROJECT_URL assert health_info['consul_host_port'] == 'consul:1234' assert health_info['active_vault'] == 'bar:5678'
def test_initial(self): mock_t = Mock() mock_std = Mock() mock_stpp = Mock() mock_stm = Mock() mock_mct = Mock() mock_mbs = Mock() mock_mos = Mock() with patch.multiple( pb, autospec=True, _transactions=DEFAULT, _scheduled_transactions_date=DEFAULT, _scheduled_transactions_per_period=DEFAULT, _scheduled_transactions_monthly=DEFAULT, _make_combined_transactions=DEFAULT, _make_budget_sums=DEFAULT, _make_overall_sums=DEFAULT ) as mocks: mocks['_transactions'].return_value.all.return_value = mock_t mocks['_scheduled_transactions_date' ''].return_value.all.return_value = mock_std mocks['_scheduled_transactions_per_period' ''].return_value.all.return_value = mock_stpp mocks['_scheduled_transactions_monthly' ''].return_value.all.return_value = mock_stm mocks['_make_combined_transactions'].return_value = mock_mct mocks['_make_budget_sums'].return_value = mock_mbs mocks['_make_overall_sums'].return_value = mock_mos res = self.cls._data assert res == { 'transactions': mock_t, 'st_date': mock_std, 'st_per_period': mock_stpp, 'st_monthly': mock_stm, 'all_trans_list': mock_mct, 'budget_sums': mock_mbs, 'overall_sums': mock_mos } assert mocks['_transactions'].mock_calls == [ call(self.cls), call().all() ] assert mocks['_scheduled_transactions_date'].mock_calls == [ call(self.cls), call().all() ] assert mocks['_scheduled_transactions_per_period'].mock_calls == [ call(self.cls), call().all() ] assert mocks['_scheduled_transactions_monthly'].mock_calls == [ call(self.cls), call().all() ] assert mocks['_make_combined_transactions'].mock_calls == [ call(self.cls) ] assert mocks['_make_budget_sums'].mock_calls == [ call(self.cls) ] assert mocks['_make_overall_sums'].mock_calls == [ call(self.cls) ]
def test_base(self): class MockGroup(MagicMock): def __init__(self, name): super(MockGroup, self).__init__() self.name = name hosts = [] vars = {} def __repr__(self): return str(self.name) group_all = MockGroup("all") compute = MockGroup("compute") controller = MockGroup("controller") ungrouped = MockGroup("ungrouped") group_all.hosts = ['localhost'] compute.hosts = ['localhost'] compute.vars = {'localhost': { "ansible_connection": "local", "neutron_physical_interface_mappings": "vlan:enp0s8," "external:enp0s3", "cinder_disk": "/dev/sdc"}} controller.hosts = ['localhost'] controller.vars = {'localhost': { "ansible_connection": "local" }} mocked_inv = MagicMock() mocked_inv.groups = {'all': group_all, 'compute': compute, 'controller': controller, 'ungrouped': ungrouped} mocked_dl = MagicMock() inventory_path = "NonePath" with patch.multiple('inventory.dynlxc', InventoryParser=mocked_inv, DataLoader=mocked_dl): res = dynlxc.read_inventory_file(inventory_path) mocked_dl.assert_called_once_with() assert "all" in res assert "_meta" in res assert "hostvars" in res["_meta"] assert "groupvars" in res["_meta"] assert "localhost" in res["all"] assert "compute" in res assert "localhost" in res["compute"]["hosts"]