我们从Python开源项目中,提取了以下24个代码示例,用于说明如何使用unittest.mock.DEFAULT。
def test_env_only_calls_get(): context = Context({ 'key1': 'value1', 'key2': 'value2', 'key3': 'value3', 'envGet': { 'key2': 'ARB_GET_ME1', 'key4': 'ARB_GET_ME2' } }) with patch.multiple('pypyr.steps.env', env_get=DEFAULT, env_set=DEFAULT, env_unset=DEFAULT ) as mock_env: pypyr.steps.env.run_step(context) mock_env['env_get'].assert_called_once() mock_env['env_set'].assert_not_called() mock_env['env_unset'].assert_not_called()
def test_env_only_calls_set(): context = Context({ 'key1': 'value1', 'key2': 'value2', 'key3': 'value3', 'envSet': { 'ARB_SET_ME1': 'key2', 'ARB_SET_ME2': 'key1' } }) with patch.multiple('pypyr.steps.env', env_get=DEFAULT, env_set=DEFAULT, env_unset=DEFAULT ) as mock_env: pypyr.steps.env.run_step(context) mock_env['env_get'].assert_not_called() mock_env['env_set'].assert_called_once() mock_env['env_unset'].assert_not_called()
def test_env_only_calls_unset(): context = Context({ 'key1': 'value1', 'key2': 'value2', 'key3': 'value3', 'envUnset': [ 'ARB_DELETE_ME1', 'ARB_DELETE_ME2' ] }) with patch.multiple('pypyr.steps.env', env_get=DEFAULT, env_set=DEFAULT, env_unset=DEFAULT ) as mock_env: pypyr.steps.env.run_step(context) mock_env['env_get'].assert_not_called() mock_env['env_set'].assert_not_called() mock_env['env_unset'].assert_called_once()
def test_env_only_calls_set_unset(): context = Context({ 'key1': 'value1', 'key2': 'value2', 'key3': 'value3', 'envUnset': [ 'ARB_DELETE_ME1', 'ARB_DELETE_ME2' ], 'envSet': { 'ARB_SET_ME1': 'key2', 'ARB_SET_ME2': 'key1' } }) with patch.multiple('pypyr.steps.env', env_get=DEFAULT, env_set=DEFAULT, env_unset=DEFAULT ) as mock_env: pypyr.steps.env.run_step(context) mock_env['env_get'].assert_not_called() mock_env['env_set'].assert_called_once() mock_env['env_unset'].assert_called_once()
def test_tar_only_calls_extract(): """Only calls extract if only extract specified.""" context = Context({ 'key1': 'value1', 'key2': 'value2', 'key3': 'value3', 'tarExtract': [ {'in': 'key2', 'out': 'ARB_GET_ME1'}, {'in': 'key4', 'out': 'ARB_GET_ME2'} ] }) with patch.multiple('pypyr.steps.tar', tar_archive=DEFAULT, tar_extract=DEFAULT ) as mock_tar: pypyr.steps.tar.run_step(context) mock_tar['tar_extract'].assert_called_once() mock_tar['tar_archive'].assert_not_called()
def test_tar_calls_archive_and_extract(): """Calls both extract and archive when both specified.""" context = Context({ 'key2': 'value2', 'key1': 'value1', 'key3': 'value3', 'tarArchive': [ {'in': 'key2', 'out': 'ARB_GET_ME1'}, {'in': 'key4', 'out': 'ARB_GET_ME2'} ], 'tarExtract': [ {'in': 'key2', 'out': 'ARB_GET_ME1'}, {'in': 'key4', 'out': 'ARB_GET_ME2'} ] }) with patch.multiple('pypyr.steps.tar', tar_archive=DEFAULT, tar_extract=DEFAULT ) as mock_tar: pypyr.steps.tar.run_step(context) mock_tar['tar_extract'].assert_called_once() mock_tar['tar_archive'].assert_called_once() # ------------------------- tar base ---------------------------------------# # # ------------------------- tar extract---------------------------------------#
def patch( target, new=mock.DEFAULT, spec=None, create=False, mocksignature=False, spec_set=None, autospec=False, new_callable=None, **kwargs ): """Mocks an async function. Should be a drop-in replacement for mock.patch that handles async automatically. The .asynq attribute is automatically created and shouldn't be used when accessing data on the mock. """ getter, attribute = _get_target(target) return _make_patch_async( getter, attribute, new, spec, create, mocksignature, spec_set, autospec, new_callable, kwargs )
def test_run(self, mCreateSession, mSample): mSample.return_value = 'XXX' iSession = MockSession() mCreateSession.return_value = (iSession, '123456') client = iSession.client('stepfunctions') client.list_activities.return_value = { 'activities':[{ 'name': 'name', 'activityArn': 'XXX' }] } client.get_activity_task.return_value = { 'taskToken': 'YYY', 'input': '{}' } target = mock.MagicMock() activity = ActivityMixin(handle_task = target) def stop_loop(*args, **kwargs): activity.polling = False return mock.DEFAULT target.side_effect = stop_loop activity.run('name') calls = [ mock.call.list_activities(), mock.call.get_activity_task(activityArn = 'XXX', workerName = 'name-XXX') ] self.assertEqual(client.mock_calls, calls) calls = [ mock.call('YYY', {}), mock.call().start() ] self.assertEqual(target.mock_calls, calls)
def test_run(self): self.cls.reactor = Mock(spec_set=reactor) with patch.multiple( pbm, logger=DEFAULT, Site=DEFAULT, LoopingCall=DEFAULT, VaultRedirectorSite=DEFAULT ) as mod_mocks: with patch.multiple( pb, get_active_node=DEFAULT, run_reactor=DEFAULT, listentcp=DEFAULT, add_update_loop=DEFAULT, listentls=DEFAULT ) as cls_mocks: cls_mocks['get_active_node'].return_value = 'consul:1234' self.cls.run() assert self.cls.active_node_ip_port == 'consul:1234' assert mod_mocks['logger'].mock_calls == [ call.warning('Initial Vault active node: %s', 'consul:1234'), call.warning('Starting Twisted reactor (event loop)') ] assert mod_mocks['VaultRedirectorSite'].mock_calls == [call(self.cls)] assert mod_mocks['Site'].mock_calls == [ call(mod_mocks['VaultRedirectorSite'].return_value) ] assert self.cls.reactor.mock_calls == [] assert cls_mocks['run_reactor'].mock_calls == [call()] assert mod_mocks['LoopingCall'].mock_calls == [] assert cls_mocks['listentcp'].mock_calls == [ call(mod_mocks['Site'].return_value) ] assert cls_mocks['add_update_loop'].mock_calls == [call()] assert cls_mocks['listentls'].mock_calls == []
def test_run_tls(self): self.cls.reactor = Mock(spec_set=reactor) self.cls.tls_factory = Mock() with patch.multiple( pbm, logger=DEFAULT, Site=DEFAULT, LoopingCall=DEFAULT, VaultRedirectorSite=DEFAULT ) as mod_mocks: with patch.multiple( pb, get_active_node=DEFAULT, run_reactor=DEFAULT, listentcp=DEFAULT, add_update_loop=DEFAULT, listentls=DEFAULT ) as cls_mocks: cls_mocks['get_active_node'].return_value = 'consul:1234' self.cls.run() assert self.cls.active_node_ip_port == 'consul:1234' assert mod_mocks['logger'].mock_calls == [ call.warning('Initial Vault active node: %s', 'consul:1234'), call.warning('Starting Twisted reactor (event loop)') ] assert mod_mocks['VaultRedirectorSite'].mock_calls == [call(self.cls)] assert mod_mocks['Site'].mock_calls == [ call(mod_mocks['VaultRedirectorSite'].return_value) ] assert self.cls.reactor.mock_calls == [] assert cls_mocks['run_reactor'].mock_calls == [call()] assert mod_mocks['LoopingCall'].mock_calls == [] assert cls_mocks['listentls'].mock_calls == [ call(mod_mocks['Site'].return_value) ] assert cls_mocks['add_update_loop'].mock_calls == [call()] assert cls_mocks['listentcp'].mock_calls == []
def test_run_error(self): self.cls.reactor = Mock(spec_set=reactor) with patch.multiple( pbm, logger=DEFAULT, Site=DEFAULT, LoopingCall=DEFAULT, VaultRedirectorSite=DEFAULT ) as mod_mocks: with patch.multiple( pb, get_active_node=DEFAULT, run_reactor=DEFAULT, listentcp=DEFAULT, add_update_loop=DEFAULT ) as cls_mocks: cls_mocks['get_active_node'].return_value = None with pytest.raises(SystemExit) as excinfo: self.cls.run() assert excinfo.value.code == 3 assert mod_mocks['logger'].mock_calls == [ call.critical("ERROR: Could not get active vault node from " "Consul. Exiting.") ] assert mod_mocks['VaultRedirectorSite'].mock_calls == [] assert mod_mocks['Site'].mock_calls == [] assert self.cls.reactor.mock_calls == [] assert cls_mocks['run_reactor'].mock_calls == [] assert mod_mocks['LoopingCall'].mock_calls == []
def testDEFAULT(self): self.assertTrue(DEFAULT is sentinel.DEFAULT)
def test_cached(self): mock_t = Mock() mock_std = Mock() mock_stpp = Mock() mock_stm = Mock() mock_mct = Mock() mock_mbs = Mock() mock_mos = Mock() with patch.multiple( pb, autospec=True, _transactions=DEFAULT, _scheduled_transactions_date=DEFAULT, _scheduled_transactions_per_period=DEFAULT, _scheduled_transactions_monthly=DEFAULT, _make_combined_transactions=DEFAULT, _make_budget_sums=DEFAULT, _make_overall_sums=DEFAULT ) as mocks: mocks['_transactions'].return_value.all.return_value = mock_t mocks['_scheduled_transactions_date' ''].return_value.all.return_value = mock_std mocks['_scheduled_transactions_per_period' ''].return_value.all.return_value = mock_stpp mocks['_scheduled_transactions_monthly' ''].return_value.all.return_value = mock_stm mocks['_make_combined_transactions'].return_value = mock_mct mocks['_make_budget_sums'].return_value = mock_mbs mocks['_make_overall_sums'].return_value = mock_mos self.cls._data_cache = {'foo': 'bar'} res = self.cls._data assert res == {'foo': 'bar'} assert mocks['_transactions'].mock_calls == [] assert mocks['_scheduled_transactions_date'].mock_calls == [] assert mocks['_scheduled_transactions_per_period'].mock_calls == [] assert mocks['_scheduled_transactions_monthly'].mock_calls == [] assert mocks['_make_combined_transactions'].mock_calls == [] assert mocks['_make_budget_sums'].mock_calls == [] assert mocks['_make_overall_sums'].mock_calls == []
def test_upload_journal_logs(self): ''' test upload_journal_logs() ''' tskey = '__REALTIME_TIMESTAMP' journal = systemd.journal.Reader(path=os.getcwd()) with patch('systemd.journal.Reader', return_value=journal) as journal: with patch('main.JournaldClient', MagicMock(autospec=True)) as reader: reader.return_value.__iter__.return_value = [sentinel.msg1, sentinel.msg2, sentinel.msg3, sentinel.msg4] with patch.multiple(self.client, retain_message=DEFAULT, group_messages=DEFAULT): log_group1 = Mock() log_group2 = Mock() self.client.retain_message.side_effect = [True, False, True, True] self.client.group_messages.return_value = [ ((log_group1, 'stream1'), [sentinel.msg1]), ((log_group2, 'stream2'), [sentinel.msg3, sentinel.msg4]), ] self.client.upload_journal_logs(os.getcwd()) # creates reader reader.assert_called_once_with(journal.return_value, self.CURSOR_CONTENT) # uploads log messages log_group1.log_messages.assert_called_once_with('stream1', [sentinel.msg1]) log_group2.log_messages.assert_called_once_with('stream2', [sentinel.msg3, sentinel.msg4])
def testDEFAULT(self): self.assertIs(DEFAULT, sentinel.DEFAULT)
def test_ioctl_fn_ptr_r(self, ioctl_mock): def _handle_ioctl(fd, request, int_ptr): assert fd == 12 assert request == 32 assert type(int_ptr) == ctypes.POINTER(ctypes.c_int) int_ptr.contents.value = 42 return mock.DEFAULT ioctl_mock.side_effect = _handle_ioctl fn = ioctl.ioctl_fn_ptr_r(32, ctypes.c_int) res = fn(12) assert res == 42
def test_ioctl_fn_ptr_w(self, ioctl_mock): def _handle_ioctl(fd, request, int_ptr): assert fd == 12 assert request == 32 assert type(int_ptr) == ctypes.POINTER(ctypes.c_int) assert int_ptr.contents.value == 42 return mock.DEFAULT ioctl_mock.side_effect = _handle_ioctl fn = ioctl.ioctl_fn_ptr_w(32, ctypes.c_int) fn(12, 42)
def test_ioctl_fn_ptr_wr(self, ioctl_mock): def _handle_ioctl(fd, request, int_ptr): assert fd == 12 assert request == 32 assert type(int_ptr) == ctypes.POINTER(ctypes.c_int) assert int_ptr.contents.value == 24 int_ptr.contents.value = 42 return mock.DEFAULT ioctl_mock.side_effect = _handle_ioctl fn = ioctl.ioctl_fn_ptr_wr(32, ctypes.c_int) res = fn(12, 24) assert res == 42
def test_ioctl_fn_w(self, ioctl_mock): def _handle_ioctl(fd, request, int_val): assert fd == 12 assert request == 32 assert type(int_val) == ctypes.c_int assert int_val.value == 42 return mock.DEFAULT ioctl_mock.side_effect = _handle_ioctl fn = ioctl.ioctl_fn_w(32, ctypes.c_int) fn(12, 42)
def _patch_object( target, attribute, new=mock.DEFAULT, spec=None, create=False, mocksignature=False, spec_set=None, autospec=False, new_callable=None, **kwargs ): getter = lambda: target return _make_patch_async( getter, attribute, new, spec, create, mocksignature, spec_set, autospec, new_callable, kwargs )
def _maybe_wrap_new(new): """If the mock replacement cannot have attributes set on it, wraps it in a function. Also, if the replacement object is a method, applies the async() decorator. This is needed so that we support patch(..., x.method) where x.method is an instancemethod object, because instancemethods do not support attribute assignment. """ if new is mock.DEFAULT: return new if inspect.isfunction(new) or isinstance(new, (classmethod, staticmethod)): return asynq(sync_fn=new)(new) elif not callable(new): return new try: new._maybe_wrap_new_test_attribute = None del new._maybe_wrap_new_test_attribute except (AttributeError, TypeError): # setting something on a bound method raises AttributeError, setting something on a # Cythonized class raises TypeError should_wrap = True else: should_wrap = False if should_wrap: # we can't just use a lambda because that overrides __get__ and creates bound methods we # don't want, so we make a wrapper class that overrides __call__ class Wrapper(object): def __call__(self, *args, **kwargs): return new(*args, **kwargs) return Wrapper() else: return new
def test_acceptance(self): logger.debug('starting acceptance test') with patch.multiple( pb, get_active_node=DEFAULT, run_reactor=DEFAULT, update_active_node=DEFAULT ) as cls_mocks: # setup some return values cls_mocks['run_reactor'].side_effect = self.se_run_reactor cls_mocks['get_active_node'].return_value = 'foo:1234' cls_mocks['update_active_node'].side_effect = self.se_update_active # instantiate class self.cls = VaultRedirector('consul:1234', poll_interval=0.5) self.cls.log_enabled = False # make sure active is None (starting state) assert self.cls.active_node_ip_port is None self.cls.bind_port = self.get_open_port() self.cls.log_enabled = True # setup an async task to make the HTTP request self.cls.reactor.callLater(2.0, self.se_requester) # do this in case the callLater for self.stop_reactor fails... signal.signal(signal.SIGALRM, self.stop_reactor) signal.alarm(20) # send SIGALRM in 20 seconds, to stop runaway loop self.cls.run() signal.alarm(0) # disable SIGALRM assert self.cls.active_node_ip_port == 'bar:5678' # from update_active assert self.update_active_called is True assert cls_mocks['update_active_node'].mock_calls[0] == call() assert cls_mocks['run_reactor'].mock_calls == [call()] assert cls_mocks['get_active_node'].mock_calls == [call()] # HTTP response checks resp = json.loads(self.response) # /bar/baz assert resp['/bar/baz']['headers'][ 'Server'] == "vault-redirector/%s/TwistedWeb/%s" % ( _VERSION, twisted_version.short() ) assert resp['/bar/baz']['headers'][ 'Location'] == 'http://bar:5678/bar/baz' assert resp['/bar/baz']['status_code'] == 307 # /vault-redirector-health assert resp['/vault-redirector-health']['status_code'] == 200 health_info = json.loads(resp['/vault-redirector-health']['text']) assert resp['/vault-redirector-health']['headers'][ 'Content-Type'] == 'application/json' assert health_info['healthy'] is True assert health_info['application'] == 'vault-redirector' assert health_info['version'] == _VERSION assert health_info['source'] == _PROJECT_URL assert health_info['consul_host_port'] == 'consul:1234' assert health_info['active_vault'] == 'bar:5678'
def test_initial(self): mock_t = Mock() mock_std = Mock() mock_stpp = Mock() mock_stm = Mock() mock_mct = Mock() mock_mbs = Mock() mock_mos = Mock() with patch.multiple( pb, autospec=True, _transactions=DEFAULT, _scheduled_transactions_date=DEFAULT, _scheduled_transactions_per_period=DEFAULT, _scheduled_transactions_monthly=DEFAULT, _make_combined_transactions=DEFAULT, _make_budget_sums=DEFAULT, _make_overall_sums=DEFAULT ) as mocks: mocks['_transactions'].return_value.all.return_value = mock_t mocks['_scheduled_transactions_date' ''].return_value.all.return_value = mock_std mocks['_scheduled_transactions_per_period' ''].return_value.all.return_value = mock_stpp mocks['_scheduled_transactions_monthly' ''].return_value.all.return_value = mock_stm mocks['_make_combined_transactions'].return_value = mock_mct mocks['_make_budget_sums'].return_value = mock_mbs mocks['_make_overall_sums'].return_value = mock_mos res = self.cls._data assert res == { 'transactions': mock_t, 'st_date': mock_std, 'st_per_period': mock_stpp, 'st_monthly': mock_stm, 'all_trans_list': mock_mct, 'budget_sums': mock_mbs, 'overall_sums': mock_mos } assert mocks['_transactions'].mock_calls == [ call(self.cls), call().all() ] assert mocks['_scheduled_transactions_date'].mock_calls == [ call(self.cls), call().all() ] assert mocks['_scheduled_transactions_per_period'].mock_calls == [ call(self.cls), call().all() ] assert mocks['_scheduled_transactions_monthly'].mock_calls == [ call(self.cls), call().all() ] assert mocks['_make_combined_transactions'].mock_calls == [ call(self.cls) ] assert mocks['_make_budget_sums'].mock_calls == [ call(self.cls) ] assert mocks['_make_overall_sums'].mock_calls == [ call(self.cls) ]