我们从 Python 开源项目中，提取了以下 15 个代码示例，用于说明如何使用 unittest.mock.patch.multiple()。
def test_env_only_calls_get():
    """Run the env step with only 'envGet' in context.

    env_get, env_set and env_unset are all patched in pypyr.steps.env;
    only env_get is expected to be called.
    """
    step_input = {
        'key1': 'value1',
        'key2': 'value2',
        'key3': 'value3',
        'envGet': {
            'key2': 'ARB_GET_ME1',
            'key4': 'ARB_GET_ME2'
        }
    }
    context = Context(step_input)

    env_patcher = patch.multiple('pypyr.steps.env',
                                 env_get=DEFAULT,
                                 env_set=DEFAULT,
                                 env_unset=DEFAULT)
    with env_patcher as mocks:
        pypyr.steps.env.run_step(context)

    mocks['env_get'].assert_called_once()
    mocks['env_set'].assert_not_called()
    mocks['env_unset'].assert_not_called()
def test_env_only_calls_set():
    """Run the env step with only 'envSet' in context.

    env_get, env_set and env_unset are all patched in pypyr.steps.env;
    only env_set is expected to be called.
    """
    step_input = {
        'key1': 'value1',
        'key2': 'value2',
        'key3': 'value3',
        'envSet': {
            'ARB_SET_ME1': 'key2',
            'ARB_SET_ME2': 'key1'
        }
    }
    context = Context(step_input)

    env_patcher = patch.multiple('pypyr.steps.env',
                                 env_get=DEFAULT,
                                 env_set=DEFAULT,
                                 env_unset=DEFAULT)
    with env_patcher as mocks:
        pypyr.steps.env.run_step(context)

    mocks['env_get'].assert_not_called()
    mocks['env_set'].assert_called_once()
    mocks['env_unset'].assert_not_called()
def test_env_only_calls_unset():
    """Run the env step with only 'envUnset' in context.

    env_get, env_set and env_unset are all patched in pypyr.steps.env;
    only env_unset is expected to be called.
    """
    step_input = {
        'key1': 'value1',
        'key2': 'value2',
        'key3': 'value3',
        'envUnset': [
            'ARB_DELETE_ME1',
            'ARB_DELETE_ME2'
        ]
    }
    context = Context(step_input)

    env_patcher = patch.multiple('pypyr.steps.env',
                                 env_get=DEFAULT,
                                 env_set=DEFAULT,
                                 env_unset=DEFAULT)
    with env_patcher as mocks:
        pypyr.steps.env.run_step(context)

    mocks['env_get'].assert_not_called()
    mocks['env_set'].assert_not_called()
    mocks['env_unset'].assert_called_once()
def test_env_only_calls_set_unset():
    """Run the env step with both 'envSet' and 'envUnset' in context.

    env_get, env_set and env_unset are all patched in pypyr.steps.env;
    env_set and env_unset are each expected once, env_get not at all.
    """
    step_input = {
        'key1': 'value1',
        'key2': 'value2',
        'key3': 'value3',
        'envUnset': [
            'ARB_DELETE_ME1',
            'ARB_DELETE_ME2'
        ],
        'envSet': {
            'ARB_SET_ME1': 'key2',
            'ARB_SET_ME2': 'key1'
        }
    }
    context = Context(step_input)

    env_patcher = patch.multiple('pypyr.steps.env',
                                 env_get=DEFAULT,
                                 env_set=DEFAULT,
                                 env_unset=DEFAULT)
    with env_patcher as mocks:
        pypyr.steps.env.run_step(context)

    mocks['env_get'].assert_not_called()
    mocks['env_set'].assert_called_once()
    mocks['env_unset'].assert_called_once()
def test_tar_only_calls_extract():
    """Run the tar step with only 'tarExtract' in context.

    tar_archive and tar_extract are patched in pypyr.steps.tar; tar_extract
    is expected exactly once and tar_archive not at all.
    """
    extract_items = [
        {'in': 'key2',
         'out': 'ARB_GET_ME1'},
        {'in': 'key4',
         'out': 'ARB_GET_ME2'}
    ]
    context = Context({
        'key1': 'value1',
        'key2': 'value2',
        'key3': 'value3',
        'tarExtract': extract_items
    })

    tar_patcher = patch.multiple('pypyr.steps.tar',
                                 tar_archive=DEFAULT,
                                 tar_extract=DEFAULT)
    with tar_patcher as mocks:
        pypyr.steps.tar.run_step(context)

    mocks['tar_extract'].assert_called_once()
    mocks['tar_archive'].assert_not_called()
def test_tar_calls_archive_and_extract():
    """Run the tar step with both 'tarArchive' and 'tarExtract' in context.

    tar_archive and tar_extract are patched in pypyr.steps.tar; each is
    expected to be called exactly once.
    """
    archive_items = [
        {'in': 'key2',
         'out': 'ARB_GET_ME1'},
        {'in': 'key4',
         'out': 'ARB_GET_ME2'}
    ]
    extract_items = [
        {'in': 'key2',
         'out': 'ARB_GET_ME1'},
        {'in': 'key4',
         'out': 'ARB_GET_ME2'}
    ]
    context = Context({
        'key2': 'value2',
        'key1': 'value1',
        'key3': 'value3',
        'tarArchive': archive_items,
        'tarExtract': extract_items
    })

    tar_patcher = patch.multiple('pypyr.steps.tar',
                                 tar_archive=DEFAULT,
                                 tar_extract=DEFAULT)
    with tar_patcher as mocks:
        pypyr.steps.tar.run_step(context)

    mocks['tar_extract'].assert_called_once()
    mocks['tar_archive'].assert_called_once()

# ------------------------- tar base ---------------------------------------#
#
# ------------------------- tar extract---------------------------------------#
def test_run(self):
    """Check run() when get_active_node returns a node and no TLS factory is set.

    Module-level names on ``pbm`` and methods on ``pb`` are patched. After
    run(), the test expects two warning log calls, a Site built from the
    VaultRedirectorSite instance, listentcp called with that Site, and
    listentls / LoopingCall / the reactor mock left untouched.
    """
    self.cls.reactor = Mock(spec_set=reactor)
    module_patcher = patch.multiple(
        pbm,
        logger=DEFAULT,
        Site=DEFAULT,
        LoopingCall=DEFAULT,
        VaultRedirectorSite=DEFAULT
    )
    class_patcher = patch.multiple(
        pb,
        get_active_node=DEFAULT,
        run_reactor=DEFAULT,
        listentcp=DEFAULT,
        add_update_loop=DEFAULT,
        listentls=DEFAULT
    )
    with module_patcher as mod_mocks, class_patcher as cls_mocks:
        cls_mocks['get_active_node'].return_value = 'consul:1234'
        self.cls.run()

    expected_log_calls = [
        call.warning('Initial Vault active node: %s', 'consul:1234'),
        call.warning('Starting Twisted reactor (event loop)')
    ]
    assert self.cls.active_node_ip_port == 'consul:1234'
    assert mod_mocks['logger'].mock_calls == expected_log_calls
    assert mod_mocks['VaultRedirectorSite'].mock_calls == [call(self.cls)]
    assert mod_mocks['Site'].mock_calls == [
        call(mod_mocks['VaultRedirectorSite'].return_value)
    ]
    assert self.cls.reactor.mock_calls == []
    assert cls_mocks['run_reactor'].mock_calls == [call()]
    assert mod_mocks['LoopingCall'].mock_calls == []
    assert cls_mocks['listentcp'].mock_calls == [
        call(mod_mocks['Site'].return_value)
    ]
    assert cls_mocks['add_update_loop'].mock_calls == [call()]
    assert cls_mocks['listentls'].mock_calls == []
def test_run_tls(self):
    """Check run() when a tls_factory is set on the instance.

    Same patching as the non-TLS case, but ``self.cls.tls_factory`` is a
    Mock. The test expects listentls to be called with the Site instance
    and listentcp not to be called at all.
    """
    self.cls.reactor = Mock(spec_set=reactor)
    self.cls.tls_factory = Mock()
    module_patcher = patch.multiple(
        pbm,
        logger=DEFAULT,
        Site=DEFAULT,
        LoopingCall=DEFAULT,
        VaultRedirectorSite=DEFAULT
    )
    class_patcher = patch.multiple(
        pb,
        get_active_node=DEFAULT,
        run_reactor=DEFAULT,
        listentcp=DEFAULT,
        add_update_loop=DEFAULT,
        listentls=DEFAULT
    )
    with module_patcher as mod_mocks, class_patcher as cls_mocks:
        cls_mocks['get_active_node'].return_value = 'consul:1234'
        self.cls.run()

    expected_log_calls = [
        call.warning('Initial Vault active node: %s', 'consul:1234'),
        call.warning('Starting Twisted reactor (event loop)')
    ]
    assert self.cls.active_node_ip_port == 'consul:1234'
    assert mod_mocks['logger'].mock_calls == expected_log_calls
    assert mod_mocks['VaultRedirectorSite'].mock_calls == [call(self.cls)]
    assert mod_mocks['Site'].mock_calls == [
        call(mod_mocks['VaultRedirectorSite'].return_value)
    ]
    assert self.cls.reactor.mock_calls == []
    assert cls_mocks['run_reactor'].mock_calls == [call()]
    assert mod_mocks['LoopingCall'].mock_calls == []
    assert cls_mocks['listentls'].mock_calls == [
        call(mod_mocks['Site'].return_value)
    ]
    assert cls_mocks['add_update_loop'].mock_calls == [call()]
    assert cls_mocks['listentcp'].mock_calls == []
def test_run_error(self):
    """Check run() exits with code 3 when get_active_node returns None.

    Expects a single critical log call and no Site, VaultRedirectorSite,
    LoopingCall, reactor or run_reactor activity.
    """
    self.cls.reactor = Mock(spec_set=reactor)
    module_patcher = patch.multiple(
        pbm,
        logger=DEFAULT,
        Site=DEFAULT,
        LoopingCall=DEFAULT,
        VaultRedirectorSite=DEFAULT
    )
    class_patcher = patch.multiple(
        pb,
        get_active_node=DEFAULT,
        run_reactor=DEFAULT,
        listentcp=DEFAULT,
        add_update_loop=DEFAULT
    )
    with module_patcher as mod_mocks, class_patcher as cls_mocks:
        cls_mocks['get_active_node'].return_value = None
        with pytest.raises(SystemExit) as excinfo:
            self.cls.run()

    expected_log_calls = [
        call.critical("ERROR: Could not get active vault node from "
                      "Consul. Exiting.")
    ]
    assert excinfo.value.code == 3
    assert mod_mocks['logger'].mock_calls == expected_log_calls
    assert mod_mocks['VaultRedirectorSite'].mock_calls == []
    assert mod_mocks['Site'].mock_calls == []
    assert self.cls.reactor.mock_calls == []
    assert cls_mocks['run_reactor'].mock_calls == []
    assert mod_mocks['LoopingCall'].mock_calls == []
def test_cached(self):
    """Check that _data returns a pre-populated _data_cache as-is.

    Seven helper methods on ``pb`` are autospec-patched and given return
    values. With ``_data_cache`` set, reading ``_data`` must return the
    cached dict and none of the patched helpers may have been called.
    """
    # helpers whose result is consumed via ``.all()``
    query_helpers = (
        '_transactions',
        '_scheduled_transactions_date',
        '_scheduled_transactions_per_period',
        '_scheduled_transactions_monthly',
    )
    # helpers whose return value is used directly
    builder_helpers = (
        '_make_combined_transactions',
        '_make_budget_sums',
        '_make_overall_sums',
    )
    all_helpers = query_helpers + builder_helpers
    targets = {name: DEFAULT for name in all_helpers}

    with patch.multiple(pb, autospec=True, **targets) as mocks:
        for name in query_helpers:
            mocks[name].return_value.all.return_value = Mock()
        for name in builder_helpers:
            mocks[name].return_value = Mock()
        self.cls._data_cache = {'foo': 'bar'}
        res = self.cls._data

    assert res == {'foo': 'bar'}
    for name in all_helpers:
        assert mocks[name].mock_calls == []
def test_upload_journal_logs(self):
    """Test upload_journal_logs().

    Patches ``systemd.journal.Reader`` to return a Reader opened on the
    current directory and replaces ``main.JournaldClient`` with a mock that
    iterates four sentinel messages. ``retain_message`` and
    ``group_messages`` are patched on ``self.client``. The test then checks
    that the reader was created with the journal and
    ``self.CURSOR_CONTENT``, and that each log group received its stream's
    messages.
    """
    real_journal = systemd.journal.Reader(path=os.getcwd())
    # Separate names for the real Reader and the patched class mock; the
    # mock's return_value is the real Reader created above.
    with patch('systemd.journal.Reader',
               return_value=real_journal) as journal_cls:
        # NOTE(review): autospec is an option of patch()/create_autospec();
        # passed to MagicMock it is only stored as a plain attribute, so
        # this mock is NOT spec'd. Left unchanged to keep the test's
        # behavior - confirm whether patch(..., autospec=True) was intended.
        with patch('main.JournaldClient',
                   MagicMock(autospec=True)) as reader:
            reader.return_value.__iter__.return_value = [
                sentinel.msg1, sentinel.msg2, sentinel.msg3, sentinel.msg4
            ]
            with patch.multiple(self.client,
                                retain_message=DEFAULT,
                                group_messages=DEFAULT):
                log_group1 = Mock()
                log_group2 = Mock()
                self.client.retain_message.side_effect = [
                    True, False, True, True
                ]
                self.client.group_messages.return_value = [
                    ((log_group1, 'stream1'), [sentinel.msg1]),
                    ((log_group2, 'stream2'), [sentinel.msg3, sentinel.msg4]),
                ]
                self.client.upload_journal_logs(os.getcwd())

    # creates reader
    reader.assert_called_once_with(journal_cls.return_value,
                                   self.CURSOR_CONTENT)
    # uploads log messages
    log_group1.log_messages.assert_called_once_with(
        'stream1', [sentinel.msg1])
    log_group2.log_messages.assert_called_once_with(
        'stream2', [sentinel.msg3, sentinel.msg4])
def test_connectionbase():
    """Check ConnectionBase construction and project name parsing.

    With the abstract-method set emptied so the class can be instantiated,
    a no-argument call must raise TypeError, 'project' must yield an empty
    namespace, and 'ns/project' must split into namespace and name.
    """
    # Temporarily disable ABC checks
    abc_bypass = patch.multiple(ConnectionBase, __abstractmethods__=set())
    with abc_bypass:
        with pytest.raises(TypeError):
            ConnectionBase()

        plain = ConnectionBase('project')
        assert plain.project_name == 'project'
        assert not plain.project_namespace
        assert plain.project_qualname == 'project'

        namespaced = ConnectionBase('ns/project')
        assert namespaced.project_name == 'project'
        assert namespaced.project_namespace == 'ns'
        assert namespaced.project_qualname == 'ns/project'
def test_acceptance(self):
    """Run a VaultRedirector through its reactor and check the HTTP responses.

    get_active_node, run_reactor and update_active_node are patched on
    ``pb`` with side effects supplied by this test class. A request task is
    scheduled on the reactor, run() is invoked under a 20-second SIGALRM
    guard, and the JSON stored in ``self.response`` is then checked for the
    redirect on /bar/baz and the health document on
    /vault-redirector-health.
    """
    logger.debug('starting acceptance test')
    with patch.multiple(
        pb,
        get_active_node=DEFAULT,
        run_reactor=DEFAULT,
        update_active_node=DEFAULT
    ) as cls_mocks:
        # setup some return values
        cls_mocks['run_reactor'].side_effect = self.se_run_reactor
        cls_mocks['get_active_node'].return_value = 'foo:1234'
        cls_mocks['update_active_node'].side_effect = self.se_update_active
        # instantiate class
        self.cls = VaultRedirector('consul:1234', poll_interval=0.5)
        self.cls.log_enabled = False
        # make sure active is None (starting state)
        assert self.cls.active_node_ip_port is None
        self.cls.bind_port = self.get_open_port()
        self.cls.log_enabled = True
        # setup an async task to make the HTTP request
        self.cls.reactor.callLater(2.0, self.se_requester)
        # do this in case the callLater for self.stop_reactor fails...
        signal.signal(signal.SIGALRM, self.stop_reactor)
        signal.alarm(20)  # send SIGALRM in 20 seconds, to stop runaway loop
        try:
            self.cls.run()
        finally:
            # Cancel the pending alarm even when run() raises, so a stale
            # SIGALRM cannot fire after this test has already failed.
            signal.alarm(0)  # disable SIGALRM
    assert self.cls.active_node_ip_port == 'bar:5678'  # from update_active
    assert self.update_active_called is True
    assert cls_mocks['update_active_node'].mock_calls[0] == call()
    assert cls_mocks['run_reactor'].mock_calls == [call()]
    assert cls_mocks['get_active_node'].mock_calls == [call()]
    # HTTP response checks
    resp = json.loads(self.response)
    # /bar/baz
    assert resp['/bar/baz']['headers'][
        'Server'] == "vault-redirector/%s/TwistedWeb/%s" % (
        _VERSION, twisted_version.short()
    )
    assert resp['/bar/baz']['headers'][
        'Location'] == 'http://bar:5678/bar/baz'
    assert resp['/bar/baz']['status_code'] == 307
    # /vault-redirector-health
    assert resp['/vault-redirector-health']['status_code'] == 200
    health_info = json.loads(resp['/vault-redirector-health']['text'])
    assert resp['/vault-redirector-health']['headers'][
        'Content-Type'] == 'application/json'
    assert health_info['healthy'] is True
    assert health_info['application'] == 'vault-redirector'
    assert health_info['version'] == _VERSION
    assert health_info['source'] == _PROJECT_URL
    assert health_info['consul_host_port'] == 'consul:1234'
    assert health_info['active_vault'] == 'bar:5678'
def test_initial(self):
    """Check that _data assembles its dict from the helper methods.

    Seven helper methods on ``pb`` are autospec-patched. Reading ``_data``
    must return a dict mapping each result key to the corresponding helper
    output, and each helper must have been called once with the instance
    (the query-style helpers followed by ``.all()``).
    """
    # patched helper name -> key expected in the resulting _data dict;
    # these helpers are consumed via ``.all()``
    query_helpers = {
        '_transactions': 'transactions',
        '_scheduled_transactions_date': 'st_date',
        '_scheduled_transactions_per_period': 'st_per_period',
        '_scheduled_transactions_monthly': 'st_monthly',
    }
    # these helpers' return values are used directly
    builder_helpers = {
        '_make_combined_transactions': 'all_trans_list',
        '_make_budget_sums': 'budget_sums',
        '_make_overall_sums': 'overall_sums',
    }
    targets = {name: DEFAULT for name in query_helpers}
    targets.update({name: DEFAULT for name in builder_helpers})

    expected = {}
    with patch.multiple(pb, autospec=True, **targets) as mocks:
        for helper, key in query_helpers.items():
            expected[key] = Mock()
            mocks[helper].return_value.all.return_value = expected[key]
        for helper, key in builder_helpers.items():
            expected[key] = Mock()
            mocks[helper].return_value = expected[key]
        res = self.cls._data

    assert res == expected
    for helper in query_helpers:
        assert mocks[helper].mock_calls == [
            call(self.cls), call().all()
        ]
    for helper in builder_helpers:
        assert mocks[helper].mock_calls == [
            call(self.cls)
        ]
def patch_jupyter_dirs():
    """
    Build patches that redirect jupyter paths into temporary directories.

    Only the patch objects and the directories are created here. The caller
    must start and stop the patches and call the returned cleanup function
    when the temporary tree is no longer needed.

    Returns a 3-tuple ``(jupyter_patches, jupyter_dirs, remove_jupyter_dirs)``:
    the list of un-started patches, the dict of created directories (with
    the temp root under ``'root'``), and a callable that deletes the tree.
    """
    test_dir = tempfile.mkdtemp(prefix='jupyter_')

    group_names = (
        'user_home', 'env_vars', 'system', 'sys_prefix', 'custom', 'server')
    jupyter_dirs = {}
    for group in group_names:
        jupyter_dirs[group] = make_dirs(test_dir, group)
    jupyter_dirs['root'] = test_dir

    for subdir in ('notebook', 'runtime'):
        server_path = os.path.join(test_dir, 'server', subdir)
        jupyter_dirs['server'][subdir] = server_path
        if not os.path.exists(server_path):
            os.makedirs(server_path)

    # patch relevant environment variables
    env_overrides = stringify_env({
        'HOME': jupyter_dirs['user_home']['root'],
        'JUPYTER_CONFIG_DIR': jupyter_dirs['env_vars']['conf'],
        'JUPYTER_DATA_DIR': jupyter_dirs['env_vars']['data'],
        'JUPYTER_RUNTIME_DIR': jupyter_dirs['server']['runtime'],
    })
    jupyter_patches = [patch.dict('os.environ', env_overrides)]

    # patch jupyter path variables in various modules
    # find the appropriate modules to patch according to compat.
    # Should include either
    # notebook.nbextensions
    # or
    # jupyter_contrib_core.notebook_compat._compat.nbextensions
    modules_to_patch = {
        jupyter_core.paths,
        sys.modules[nbextensions._get_config_dir.__module__],
        sys.modules[nbextensions._get_nbextension_dir.__module__],
    }
    path_patches = {
        'SYSTEM_CONFIG_PATH': [jupyter_dirs['system']['conf']],
        'ENV_CONFIG_PATH': [jupyter_dirs['sys_prefix']['conf']],
        'SYSTEM_JUPYTER_PATH': [jupyter_dirs['system']['data']],
        'ENV_JUPYTER_PATH': [jupyter_dirs['sys_prefix']['data']],
    }
    for mod in modules_to_patch:
        # only override the attributes this particular module defines
        applicable_patches = {}
        for attrname, newval in path_patches.items():
            if hasattr(mod, attrname):
                applicable_patches[attrname] = newval
        jupyter_patches.append(patch.multiple(mod, **applicable_patches))

    def remove_jupyter_dirs():
        """Delete the temporary directory tree created above."""
        shutil.rmtree(test_dir)

    return jupyter_patches, jupyter_dirs, remove_jupyter_dirs