我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用unittest.mock.call()。
def test_update_details(debug, service): service.current_iteration = 1 service.project_version = 1 service._cached = {'foo': 'bar'} name = 'foo' responses.add( responses.GET, 'https://www.pivotaltracker.com/services/v5/projects/123', json={'current_iteration_number': 1, 'name': name}, adding_headers={'X-Tracker-Project-Version': '2'}, ) responses.add( responses.GET, 'https://www.pivotaltracker.com/services/v5/projects/123/iterations' '/1?fields=%3Adefault%2Cvelocity%2Cstories', json={'velocity': 10, 'stories': []} ) result = service.update() debug.assert_has_calls([ mock.call('fetching Tracker project data'), mock.call('project updated, fetching iteration details'), ]) assert result == dict(velocity=10, stories={}, name=name)
def test_iam_role_policy(resource_action, get_template, get_properties, get_details, construct_policy, session, attach_profile_to_role): """IAM Role Policy should match deployment type.""" get_properties.return_value = {'type': 'ec2'} get_details.return_value.iam.return_value = {'group': 1, 'policy': 2, 'profile': 3, 'role': 4, 'user': 5} assert create_iam_resources() get_template.assert_called_with(EC2_TEMPLATE_NAME) calls = [ mock.call( mock.ANY, action='create_role', log_format=mock.ANY, RoleName=mock.ANY, AssumeRolePolicyDocument=get_template.return_value) ] resource_action.assert_has_calls(calls)
def test_create_s3_event_multiple_buckets(mock_get_properties, mock_create_s3_event, mock_remove_perms, mock_boto3, mock_arn, mock_perms): """Try to create a lambda with two S3 triggers from different buckets.""" triggers = TRIGGERS_BUCKET_A + TRIGGERS_BUCKET_B properties = get_properties_with_triggers(triggers) mock_get_properties.return_value = properties mock_arn.return_value = 'fake_arn' events = LambdaEvent(app='test_app', env='test_env', region='us-east-1', prop_path='other') events.create_lambda_events() s3_calls = [ mock.call(app_name='test_app', env='test_env', region='us-east-1', bucket='my.shared.bucket', triggers=TRIGGERS_BUCKET_A), mock.call(app_name='test_app', env='test_env', region='us-east-1', bucket='my.other.shared.bucket', triggers=TRIGGERS_BUCKET_B) ] mock_create_s3_event.assert_has_calls(s3_calls, any_order=True)
def test_call(self, mocked_predict, mocked_load): mocked_load.return_value = 'model' settings = MagicMock() settings.UNIQUE_IDS = ['number'] settings.CLASSIFIERS = {'answer': 42, 'another': 13} core = Core(settings, self.adapter) core.suspicions = MagicMock() core() # assert load and predict was called for each classifier mocked_load.assert_has_calls((call(42), call(13)), any_order=True) mocked_predict.assert_has_calls(( call('model', 'answer'), call('model', 'another') ), any_order=True) # assert suspicions.xz was created expected_path = os.path.join('tmp', 'test', 'suspicions.xz') core.suspicions.to_csv.assert_called_once_with( expected_path, compression='xz', encoding='utf-8', index=False )
def test_interactive(self, mock_print, mock_markdown, mock_print_heading, mock_import_readline): def MockInputFactory(return_values): _counter = -1 def mock_input(prompt=''): nonlocal _counter _counter += 1 if _counter < len(return_values): return return_values[_counter] elif _counter == len(return_values): raise EOFError else: raise KeyboardInterrupt return mock_input return_values = ['foo', 'bar', 'baz'] with patch('builtins.input', MockInputFactory(return_values)): cli.interactive(sentinel.RendererCls) mock_import_readline.assert_called_with() mock_print_heading.assert_called_with(sentinel.RendererCls) mock_markdown.assert_called_with(['foo\n', 'bar\n', 'baz\n'], sentinel.RendererCls) calls = [call('\nrendered text', end=''), call('\nExiting.')] mock_print.assert_has_calls(calls)
def test_normal_execution(self, mock_isfile, mock_glob): """ Test the normal behaviour of the function. """ # Set the mocked function returned values. mock_isfile.side_effect = [True] mock_glob.return_value = ["/my/path/mock_output"] # Test execution fslreorient2std(**self.kwargs) self.assertEqual([ mock.call(["which", "fslreorient2std"], env={}, stderr=-1, stdout=-1), mock.call(["fslreorient2std", self.kwargs["input_image"], self.kwargs["output_image"]], cwd=None, env={}, stderr=-1, stdout=-1)], self.mock_popen.call_args_list) self.assertEqual(len(self.mock_env.call_args_list), 1)
def test_get_account_info(api): await api.get_account_info() assert api._make_request.call_args == call('get_account_info') # # class CoroutineContextManager(CoroutineMock): # async def __aexit__(self, *args, **kwargs): # pass # # async def __aenter__(self, *args, **kwargs): # return Mock() # # # async def test_make_request(bot_configuration, loop): # api = Api(bot_configuration, CoroutineContextManager(), loop) # api._logger = Mock() # await api._make_request(api.endpoints.SEND_MESSAGE)
def test_credentials(self, mSession): iSession = mSession.return_value = MockSession() access = 'XXX' secret = 'YYY' account = '123456' region = 'east' credentials = { 'aws_access_key': access, 'aws_secret_key': secret, 'aws_account_id': account, 'aws_region': region } session, account_id = utils.create_session(credentials = credentials) self.assertEqual(session, iSession) self.assertEqual(account_id, account) self.assertEqual(iSession.region, region) call = mock.call(aws_access_key_id = access, aws_secret_access_key = secret) calls = [call] self.assertEqual(mSession.mock_calls, calls)
def test_keys(self, mSession): iSession = mSession.return_value = MockSession() access = 'XXX' secret = 'YYY' account = '123456' region = 'east' credentials = { 'aws_access_key': access, 'aws_secret_key': secret, 'aws_account_id': account, 'aws_region': region } # Note: Same as test_credentials, except passing the arguments are key word args session, account_id = utils.create_session(**credentials) self.assertEqual(session, iSession) self.assertEqual(account_id, account) self.assertEqual(iSession.region, region) call = mock.call(aws_access_key_id = access, aws_secret_access_key = secret) calls = [call] self.assertEqual(mSession.mock_calls, calls)
def test_constructor(self, mCreateSession): iSession = MockSession() mCreateSession.return_value = (iSession, '123456') client = iSession.client('stepfunctions') client.list_state_machines.return_value = { 'stateMachines':[{ 'name': 'name', 'stateMachineArn': 'XXX' }] } machine = StateMachine('name') self.assertEqual(machine.arn, 'XXX') calls = [ mock.call.list_state_machines() ] self.assertEqual(client.mock_calls, calls)
def test_create_exists(self, mCreateSession): iSession = MockSession() mCreateSession.return_value = (iSession, '123456') client = iSession.client('stepfunctions') client.list_state_machines.return_value = { 'stateMachines':[{ 'name': 'name', 'stateMachineArn': 'XXX' }] } machine = StateMachine('name') with self.assertRaises(Exception): machine.create('source', 'role') self.assertEqual(machine.arn, 'XXX') calls = [ mock.call.list_state_machines(), ] self.assertEqual(client.mock_calls, calls)
def test_delete_exception(self, mCreateSession): iSession = MockSession() mCreateSession.return_value = (iSession, '123456') client = iSession.client('stepfunctions') client.list_state_machines.return_value = { 'stateMachines':[] } machine = StateMachine('name') with self.assertRaises(Exception): machine.delete(True) self.assertEqual(machine.arn, None) calls = [ mock.call.list_state_machines(), ] self.assertEqual(client.mock_calls, calls)
def test_start_doesnt_exist(self, mCreateSession): iSession = MockSession() mCreateSession.return_value = (iSession, '123456') client = iSession.client('stepfunctions') client.list_state_machines.return_value = { 'stateMachines':[] } machine = StateMachine('name') with self.assertRaises(Exception): machine.start({}, 'run') calls = [ mock.call.list_state_machines(), ] self.assertEqual(client.mock_calls, calls)
def test_stop(self, mCreateSession): iSession = MockSession() mCreateSession.return_value = (iSession, '123456') client = iSession.client('stepfunctions') client.list_state_machines.return_value = { 'stateMachines':[{ 'name': 'name', 'stateMachineArn': 'XXX' }] } machine = StateMachine('name') machine.stop('arn', 'error', 'cause') calls = [ mock.call.list_state_machines(), mock.call.stop_execution(executionArn = 'arn', error = 'error', cause = 'cause') ] self.assertEqual(client.mock_calls, calls)
def test_success(self, mCreateSession): iSession = MockSession() client = iSession.client('stepfunctions') mCreateSession.return_value = (iSession, '123456') task = TaskMixin() task.token = 'token' self.assertEqual(task.token, 'token') task.success(None) self.assertEqual(task.token, None) call = mock.call.send_task_success(taskToken = 'token', output = 'null') self.assertEqual(client.mock_calls, [call])
def test_failure(self, mCreateSession): iSession = MockSession() client = iSession.client('stepfunctions') mCreateSession.return_value = (iSession, '123456') task = TaskMixin() task.token = 'token' self.assertEqual(task.token, 'token') task.failure(None, None) self.assertEqual(task.token, None) call = mock.call.send_task_failure(taskToken = 'token', error = None, cause = None) self.assertEqual(client.mock_calls, [call])
def test_run_generator(self, mCreateSession): iSession = MockSession() mCreateSession.return_value = (iSession, '123456') client = iSession.client('stepfunctions') def target(input_): yield yield return # Just make sure the target is actually a generator self.assertEqual(type(target(None)), types.GeneratorType) task = TaskMixin(process = target) task.handle_task('token', None) self.assertEqual(task.token, None) call = mock.call.send_task_success(taskToken = 'token', output = 'null') call_ = mock.call.send_task_heartbeat(taskToken = 'token') calls = [call_, call_, call] self.assertEqual(client.mock_calls, calls)
def test_run_exception(self, mCreateSession): iSession = MockSession() mCreateSession.return_value = (iSession, '123456') client = iSession.client('stepfunctions') target = mock.MagicMock() target.side_effect = BossError('cause') task = TaskMixin(process = target) task.handle_task('token', None) self.assertEqual(task.token, None) call = mock.call.send_task_failure(taskToken = 'token', error = 'BossError', cause = 'cause') self.assertEqual(client.mock_calls, [call])
def test_create_activity(self, mCreateSession): iSession = MockSession() mCreateSession.return_value = (iSession, '123456') client = iSession.client('stepfunctions') client.create_activity.return_value = { 'activityArn': 'XXX' } activity = ActivityMixin() activity.name = 'name' self.assertEqual(activity.arn, None) activity.create_activity() self.assertEqual(activity.arn, 'XXX') calls = [ mock.call.create_activity(name = 'name') ] self.assertEqual(client.mock_calls, calls)
def test_delete_activity(self, mCreateSession): iSession = MockSession() mCreateSession.return_value = (iSession, '123456') client = iSession.client('stepfunctions') activity = ActivityMixin() activity.arn = 'XXX' activity.delete_activity() self.assertEqual(activity.arn, None) calls = [ mock.call.delete_activity(activityArn = 'XXX') ] self.assertEqual(client.mock_calls, calls)
def test_poll_task(self, mCreateSession): iSession = MockSession() mCreateSession.return_value = (iSession, '123456') client = iSession.client('stepfunctions') client.get_activity_task.return_value = { 'taskToken': 'YYY', 'input': '{}' } activity = ActivityMixin() activity.arn = 'XXX' token, input_ = activity.poll_task('worker') self.assertEqual(token, 'YYY') self.assertEqual(input_, {}) calls = [ mock.call.get_activity_task(activityArn = 'XXX', workerName = 'worker') ] self.assertEqual(client.mock_calls, calls)
def test_poll_task_no_work(self, mCreateSession): iSession = MockSession() mCreateSession.return_value = (iSession, '123456') client = iSession.client('stepfunctions') client.get_activity_task.return_value = { 'taskToken': '' } activity = ActivityMixin() activity.arn = 'XXX' token, input_ = activity.poll_task('worker') self.assertEqual(token, None) self.assertEqual(input_, None) calls = [ mock.call.get_activity_task(activityArn = 'XXX', workerName = 'worker') ] self.assertEqual(client.mock_calls, calls)
def test_execute_calls_save_only_if_product_has_not_saved_before(self): def side_effect(): return [ (1, 'name', 'title', 'url'), (2, 'Name', 'Title', 'Url'), (1, 'name', 'title', 'url'), ] self.crawler_uc.execute.side_effect = side_effect GetProductsFromSiteUseCase.execute() assert 2 == self.saveinfo_uc.execute.call_count self.saveinfo_uc.execute.assert_has_calls([ call('name', 'title', 'url'), call('Name', 'Title', 'Url'), ])
def test_memoize(): """Test Caching with no Time-To-Live (TTL).""" global x for fn in memoize_fns: x = 0 assert_eq(4, fn(1)) assert_eq(1, x) assert_eq(8, fn(2, 4)) assert_eq(2, x) # should not result in another call assert_eq(8, fn(2, z=4)) assert_eq(2, x) assert_eq(8, fn(y=2, z=4)) assert_eq(2, x) fn.clear_cache() assert_eq(4, fn(1)) assert_eq(3, x)
def test_memoize_with_ttl_unhashable(): """Test Caching with TTL using dictionary arguments.""" global x x = 0 assert_eq(12, cached_fn_with_ttl_unhashable(2)) assert_eq(1, x) assert_eq(10, cached_fn_with_ttl_unhashable(1, z={'a': 2, 'b': 3, 'c': 5})) assert_eq(2, x) # should not result in another call assert_eq(10, cached_fn_with_ttl_unhashable(1, z={'a': 2, 'b': 3, 'c': 5})) assert_eq(2, x) assert_eq(12, cached_fn_with_ttl_unhashable(2, z={'a': 1, 'b': 2, 'c': 3})) assert_eq(2, x) cached_fn_with_ttl_unhashable.clear_cache() assert_eq(12, cached_fn_with_ttl_unhashable(2)) assert_eq(3, x)
def test_discriminator(discriminator, tf_module): with TmpMock(tf_module, 'variable_scope') as mock_variable_scope: image = tf.placeholder(tf.float32, [None, 28, 28, 3]) output, logits = discriminator(image) _assert_tensor_shape(output, [None, 1], 'Discriminator Training(reuse=false) output') _assert_tensor_shape(logits, [None, 1], 'Discriminator Training(reuse=false) Logits') assert mock_variable_scope.called,\ 'tf.variable_scope not called in Discriminator Training(reuse=false)' assert mock_variable_scope.call_args == mock.call('discriminator', reuse=False), \ 'tf.variable_scope called with wrong arguments in Discriminator Training(reuse=false)' mock_variable_scope.reset_mock() output_reuse, logits_reuse = discriminator(image, True) _assert_tensor_shape(output_reuse, [None, 1], 'Discriminator Inference(reuse=True) output') _assert_tensor_shape(logits_reuse, [None, 1], 'Discriminator Inference(reuse=True) Logits') assert mock_variable_scope.called, \ 'tf.variable_scope not called in Discriminator Inference(reuse=True)' assert mock_variable_scope.call_args == mock.call('discriminator', reuse=True), \ 'tf.variable_scope called with wrong arguments in Discriminator Inference(reuse=True)'
def test_generator(generator, tf_module): with TmpMock(tf_module, 'variable_scope') as mock_variable_scope: z = tf.placeholder(tf.float32, [None, 100]) out_channel_dim = 5 output = generator(z, out_channel_dim) _assert_tensor_shape(output, [None, 28, 28, out_channel_dim], 'Generator output (is_train=True)') assert mock_variable_scope.called, \ 'tf.variable_scope not called in Generator Training(reuse=false)' assert mock_variable_scope.call_args == mock.call('generator', reuse=False), \ 'tf.variable_scope called with wrong arguments in Generator Training(reuse=false)' mock_variable_scope.reset_mock() output = generator(z, out_channel_dim, False) _assert_tensor_shape(output, [None, 28, 28, out_channel_dim], 'Generator output (is_train=False)') assert mock_variable_scope.called, \ 'tf.variable_scope not called in Generator Inference(reuse=True)' assert mock_variable_scope.call_args == mock.call('generator', reuse=True), \ 'tf.variable_scope called with wrong arguments in Generator Inference(reuse=True)'
def test_normal_start(self, runner, process, getLogger): assert isinstance(process, mock.MagicMock) # Arrange process_count = 4 logger = getLogger.return_value = mock.MagicMock() expected_result = runner.return_value = "SOME_RETURN_VALUE" result_backend = { 'result_backend_class': 'easy_job.result_backends.dummy.DummyBackend' } # Act initializer = RabbitMQInitializer(count=process_count, logger="logger", result_backend=result_backend, options={'serialization_method': 'json', 'queue_name': 'queue', 'rabbitmq_configs': {}}) result = initializer.start() # Assert logger.log.assert_called_once_with(10, "Starting {} RabbitMQ workers...".format(process_count)) process.assert_has_calls([mock.call(args=(mock.ANY,), target=mock.ANY), mock.call().start()] * process_count) self.assertEqual(result, expected_result) _, args, kwargs = process.mock_calls[0] worker_object = kwargs['args'][0] self.assertIsInstance(worker_object, RabbitMQWorker)
def test_actually_calling_retrying_library_in_a_failing_condition(self, ): # Arrange function_side_effects = [IndentationError(), SyntaxError(), OSError()] func = mock.MagicMock(side_effect=function_side_effects) from easy_job.workers.common import call_with_retry as target retrying_params = { "stop_max_attempt_number": 3 } args = (1, 2, 3) kwargs = {"p": "arameter"} # Act with self.assertRaises(OSError): target(func, args, kwargs, retry_policy=retrying_params) # Assert func.assert_has_calls([mock.call(*args, **kwargs), mock.call(*args, **kwargs), mock.call(*args, **kwargs)] )
def test_case_study_create_api_success( mock_create_case_study, supplier_case_study_end_to_end, sso_user, all_case_study_data, api_response_200 ): mock_create_case_study.return_value = api_response_200 response = supplier_case_study_end_to_end() assert response.status_code == http.client.FOUND assert response.get('Location') == reverse('company-detail') data = { **all_case_study_data, 'image_one': ANY, 'image_two': ANY, 'image_three': ANY, } # django converts uploaded files to UploadedFile, which makes # `assert_called_once_with` tricky. assert mock_create_case_study.call_count == 1 assert mock_create_case_study.call_args == call( data=data, sso_session_id=sso_user.session_id, )
def test_supplier_sectors_edit_standalone_view_api_success( mock_update_profile, has_company_client, company_profile_sectors_standalone_data, api_response_200, sso_user, ): mock_update_profile.return_value = api_response_200 url = reverse('company-edit-sectors') has_company_client.post(url, company_profile_sectors_standalone_data) assert mock_update_profile.call_count == 1 assert mock_update_profile.call_args == call( sso_session_id=sso_user.session_id, data={ 'sectors': ['AGRICULTURE_HORTICULTURE_AND_FISHERIES'], 'export_destinations': ['CN', 'IN'], 'export_destinations_other': 'West Philadelphia', } )
def test_verify_company_address_end_to_end( mock_update_profile, settings, has_company_client, send_verification_letter_end_to_end ): settings.FEATURE_COMPANIES_HOUSE_OAUTH2_ENABLED = True view = views.SendVerificationLetterView response = send_verification_letter_end_to_end() assert response.status_code == 200 assert response.template_name == view.templates[view.SENT] assert mock_update_profile.call_count == 1 assert mock_update_profile.call_args == call( data={ 'postal_full_name': 'Jeremy', }, sso_session_id='213' )
def test_wait(mocker, config): # pylint: disable=W0621 """Assert runner waits before re-running the monitoring script""" mock_execution = mocker.Mock() mock_execution.returncode = Codes.CRITICAL.value mocker.patch('subprocess.run', return_value=mock_execution) mocker.patch('cachetclient.cachet.Components.put') time_sleep = mocker.patch('time.sleep') n_retries = 5 d_interval = 10 with pytest.raises(SystemExit): unit.main(script=mocker.Mock(), component_id=99, config_file=config, retries=n_retries, interval=d_interval) expected_calls = [call(10), call(10), call(10), call(10), call(10)] assert time_sleep.call_args_list == expected_calls
def test_create_amazon_refresh_token_no_defaults(mock_http_server, settings): settings.ALEXA_VOICE_SERVICE_CLIENT_ID = 'my-client-id' settings.ALEXA_VOICE_SERVICE_CLIENT_SECRET = 'my-client-secret' settings.ALEXA_VOICE_SERVICE_DEVICE_TYPE_ID = 'my-device-type-id' out = StringIO() call_command( 'create_amazon_refresh_token', '--address=127.0.0.1', '--port=9000', stdout=out ) assert mock_http_server.call_count == 1 assert mock_http_server.call_args == call( server_address=('127.0.0.1', 9000), RequestHandlerClass=handlers.AmazonAlexaServiceLoginHandler, client_id='my-client-id', client_secret='my-client-secret', device_type_id='my-device-type-id', ) assert 'running server on http://127.0.0.1:9000' in out.getvalue()
def test_internal_find_index_called_for_each_item(self, monkeypatch, tmp_folder): """Internal finding an index number whilst scanning only called within _insert_sorted() method during async scanning. So it should be called only per item. Testing for: def _find_index(self, summary, sort_by). :param monkeypatch: pytest monkey patch fixture :type monkeypatch: _pytest.monkeypatch.MonkeyPatch :param tmp_folder: Test params and dummy test folder factory fixture pair. :type tmp_folder: (dict, dirtools.tests.factory.DummyFolderFactory) """ params, factory = tmp_folder scan = Folder(factory.path, params['sort_by'], params['level']) find_index = Mock(return_value=0) monkeypatch.setattr(scan, '_find_index', find_index) for item in scan.items(humanise=False): assert call(item, params['sort_by']) in find_index.call_args_list
def test_cubic_lattice( self, mock_lattice, mock_site ): a, b, c, = 2, 2, 2 spacing = 1.0 mock_site.side_effect = range(2*2*2) mock_lattice.return_value = 'bar' lattice = init_lattice.cubic_lattice( a, b, c, spacing ) expected_site_calls = [ [ 1, np.array( [ 0., 0., 0. ] ), [ 2, 2, 3, 3, 5, 5 ], 0.0, 'L' ], [ 2, np.array( [ 1., 0., 0. ] ), [ 1, 1, 4, 4, 6, 6 ], 0.0, 'L' ], [ 3, np.array( [ 0., 1., 0. ] ), [ 4, 4, 1, 1, 7, 7 ], 0.0, 'L' ], [ 4, np.array( [ 1., 1., 0. ] ), [ 3, 3, 2, 2, 8, 8 ], 0.0, 'L' ], [ 5, np.array( [ 0., 0., 1. ] ), [ 6, 6, 7, 7, 1, 1 ], 0.0, 'L' ], [ 6, np.array( [ 1., 0., 1. ] ), [ 5, 5, 8, 8, 2, 2 ], 0.0, 'L' ], [ 7, np.array( [ 0., 1., 1. ] ), [ 8, 8, 5, 5, 3, 3 ], 0.0, 'L' ], [ 8, np.array( [ 1., 1., 1. ] ), [ 7, 7, 6, 6, 4, 4 ], 0.0, 'L' ] ] for call, e in zip( mock_site.mock_calls, expected_site_calls ): self.assertEqual( call[1][0], e[0] ) # site number np.testing.assert_array_equal( call[1][1], e[1] ) # site coordinates self.assertEqual( call[1][2], e[2] ) # neighbour lists self.assertEqual( call[1][3], e[3] ) # ? self.assertEqual( call[1][4], e[4] ) # site label self.assertEqual( mock_lattice.mock_calls[0][1][0], list(range(2*2*2)) ) self.assertEqual( lattice, 'bar' ) np.testing.assert_array_equal( mock_lattice.mock_calls[0][2]['cell_lengths'], np.array( [ a, b, c, ] ) )
def test_potential_jumps_if_lattice_is_mostly_empty( self, mock_Jump ): # lattice is mostly vacant jumps = [ Mock( spec=Jump ), Mock( spec=Jump ) ] mock_Jump.side_effect = jumps self.lattice.number_of_occupied_sites = 1 self.lattice_number_of_sites = 4 site = Mock( spec=Site ) site.neighbours = [ 2, 3 ] occupied_sites = [ site ] unoccupied_sites = [ Mock( spec=Site ), Mock( spec=Site ), Mock( spec=Site ) ] self.lattice.nn_energy = 'A' self.lattice.cn_energies = 'B' self.lattice.jump_lookup_table = 'C' for s in unoccupied_sites: s.is_occupied = False with patch( 'lattice_mc.lattice.Lattice.occupied_sites' ) as mock_occupied_sites: with patch( 'lattice_mc.lattice.Lattice.site_with_id' ) as mock_site_with_id: mock_occupied_sites.return_value = occupied_sites mock_site_with_id.side_effect = unoccupied_sites[:2] potential_jumps = self.lattice.potential_jumps() self.assertEqual( potential_jumps, jumps ) self.assertEqual( mock_Jump.mock_calls[0][1], ( site, unoccupied_sites[0], 'A', 'B', 'C' ) ) self.assertEqual( mock_Jump.mock_calls[1][1], ( site, unoccupied_sites[1], 'A', 'B', 'C' ) ) mock_site_with_id.assert_has_calls( [ call(2), call(3) ] )
def test_potential_jumps_if_lattice_is_mostly_filled( self, mock_Jump ): # lattice is mostly occupied mock_Jump.side_effect = [ 'jump1', 'jump2' ] self.lattice.number_of_occupied_sites = 3 self.lattice_number_of_sites = 4 site = Mock( spec=Site ) site.neighbours = [ 2, 3 ] vacant_sites = [ site ] occupied_sites = [ Mock( spec=Site ), Mock( spec=Site ), Mock( spec=Site ) ] for s in occupied_sites: s.is_occupied = True self.lattice.nn_energy = 'A' self.lattice.cn_energies = 'B' self.lattice.jump_lookup_table = 'C' with patch( 'lattice_mc.lattice.Lattice.vacant_sites' ) as mock_vacant_sites: with patch( 'lattice_mc.lattice.Lattice.site_with_id' ) as mock_site_with_id: mock_vacant_sites.return_value = vacant_sites mock_site_with_id.side_effect = occupied_sites[:2] jumps = self.lattice.potential_jumps() self.assertEqual( jumps, [ 'jump1', 'jump2' ] ) self.assertEqual( mock_Jump.mock_calls[0][1], ( occupied_sites[0], site, 'A', 'B', 'C' ) ) self.assertEqual( mock_Jump.mock_calls[1][1], ( occupied_sites[1], site, 'A', 'B', 'C' ) ) mock_site_with_id.assert_has_calls( [ call(2), call(3) ] )
def test_run_pipeline_pass_skip_parse_context(mocked_work_dir, mocked_get_pipe_def, mocked_get_parsed_context, mocked_run_step_group): """run_pipeline passes correct params to all methods.""" pypyr.pipelinerunner.run_pipeline( pipeline_name='arb pipe', working_dir='arb/dir', parse_input=False) mocked_work_dir.assert_not_called() mocked_get_pipe_def.assert_called_once_with(pipeline_name='arb pipe', working_dir='arb/dir') mocked_get_parsed_context.assert_not_called() # 1st called steps, then on_success expected_run_step_groups = [call(context={}, pipeline_definition='pipe def', step_group_name='steps'), call(context={}, pipeline_definition='pipe def', step_group_name='on_success')] mocked_run_step_group.assert_has_calls(expected_run_step_groups)
def test6_RGBLED_set(self, mock_wiringpi_softPwmCreate, mock_wiringpi_softPwmWrite, \ mock_wiringpi_ISR, mock_wiringpi_pinMode, mock_wiringpi_setup): gpio_pin_r = 10 gpio_pin_g = 8 gpio_pin_b = 6 led = RGBLED(gpio_pin_r, gpio_pin_g, gpio_pin_b) calls = [call(gpio_pin_r, 1), call(gpio_pin_g, 1), call(gpio_pin_b, 1)] mock_wiringpi_pinMode.assert_has_calls(calls) calls = [call(gpio_pin_r, 0, 100), call(gpio_pin_g, 0, 100), call(gpio_pin_b, 0, 100)] mock_wiringpi_softPwmCreate.assert_has_calls(calls) r_level = 1 g_level = 2 b_level = 3 led.set(r_level, g_level, b_level) calls = [call(gpio_pin_r, r_level), call(gpio_pin_g, g_level), call(gpio_pin_b, b_level)] mock_wiringpi_softPwmWrite.assert_has_calls(calls) self.assertEqual(led.get(), (1, 2, 3))
def test_send_message_to_chat(self, mock_requests): parsed_update = parse_update(TelegramUpdates.TEXT_OK_ID_OK_TEXT) parsed_update['parse_mode'] = 'HTML' parsed_update['text'] = 'Sentiment' result = send_message_to_chat.delay(parsed_update).get(timeout=5) # on success the Message sent to the chat is returned self.assertEqual(result['text'], parsed_update['text']) self.assertEqual(result['parse_mode'], parsed_update['parse_mode']) self.assertEqual(result['reply_to_message_id'], parsed_update['reply_to_message_id']) self.assertEqual(result['chat_id'], parsed_update['chat_id']) mock_requests.assert_called() self.assertIn(call(current_app.config['TELEGRAM_URL'] + 'sendMessage', json=parsed_update, timeout=current_app.config['TELEGRAM_REQUEST_TIMEOUT_SEC']), mock_requests.call_args_list)
def test_build_bundled_pdfs_if_some_are_not_prefilled( self, logger, get_parser, SimpleUploadedFile, slack, SubService): # two submissions get_parser.return_value.join_pdfs.return_value = b'pdf' mock_submissions = [Mock(), Mock()] mock_bundle = Mock(pk=2) mock_bundle.should_have_a_pdf.return_value = True # one is not prefilled mock_bundle.get_individual_filled_pdfs.return_value = [Mock()] mock_bundle.submissions.all.return_value = mock_submissions mock_bundle.organization.pk = 1 # run BundlesService.build_bundled_pdf_if_necessary(mock_bundle) error_msg = "Submissions for ApplicationBundle(pk=2) lack pdfs" logger.error.assert_called_once_with(error_msg) slack.assert_called_once_with(error_msg) self.assertEqual( len(mock_bundle.get_individual_filled_pdfs.mock_calls), 2) mock_bundle.save.assert_called_once_with() SubService.fill_pdfs_for_submission.assert_has_calls( [call(mock_sub) for mock_sub in mock_submissions], any_order=True)
def test_generate_endpoint_parser_noparam(addargument): """Generate a parser from endpoint metadata - no params""" name = 'put-stuff' metadata = { 'path': 'stuff', 'method': 'PUT', 'help': "Changes stuff", 'params': {}, } parser = ArgumentParser() subparsers = parser.add_subparsers() generate_endpoint_parser(subparsers, name, metadata) addargument.assert_has_calls([ # first helper for the main parser mock.call('-h', '--help', action='help', default=mock.ANY, help=mock.ANY), # second helper for the 'put-stuff' subparser mock.call('-h', '--help', action='help', default=mock.ANY, help=mock.ANY) ])
def test_download_lstree_git(self): git_url = 'git://githoobie.com/lol.git<%s>' % tree_object await self._do_download(git_url) # ensure git config was appended to, and that the output file was # opened self.open.assert_any_call('/t/mut/lol.git/config', 'a') self.open.assert_any_call('/t/res/lol.git', 'w+') # ensure the sequence of git commands were called assert self.subprocess.run.mock_calls == [ call(['git', 'clone', '--bare', 'git://githoobie.com/lol.git', '/t/mut/lol.git']), call(['git', 'ls-tree', '-r', '--long', '--full-tree', tree_object], cwd='/t/mut/lol.git', stdout=self.open()), ]
def test_download_directory_git(self): path = '/' git_url = 'git://githoobie.com/lol.git<%s><%s>' % (tree_object, path) await self._do_download(git_url) self._check_config() # ensure the sequence of git commands were called assert self.subprocess.run.mock_calls == [ call(['git', 'clone', '--bare', 'git://githoobie.com/lol.git', '/t/mut/lol.git'], cwd='/t/mut'), call(['git', 'rev-parse', '--quiet', '--verify', tree_object], cwd='/t/mut/lol.git', stdout=-1), call(['git', 'archive', '--prefix=/t/res/lol.git', '--format=directory', tree_object], cwd='/t/mut/lol.git'), ]
def test_download_lstree_git(self): git_url = 'git://githoobie.com/lol.git<%s>' % tree_object await self._do_download(git_url) # ensure git config was appended to, and that the output file was # opened self.open.assert_any_call('/t/mut/lol.git/config', 'a') self.open.assert_any_call('/t/res/lol.git', 'w+') # ensure the sequence of git commands were called assert self.subprocess.run.mock_calls == [ call(['git', 'clone', '--bare', 'git://githoobie.com/lol.git', '/t/mut/lol.git'], cwd='/t/mut'), call(['git', 'rev-parse', '--quiet', '--verify', tree_object], cwd='/t/mut/lol.git', stdout=-1), call(['git', 'ls-tree', '-r', '--long', '--full-tree', tree_object], cwd='/t/mut/lol.git', stdout=self.open()), ]
def test_update_when_hash_not_present(self): class FakeResult: stdout = b'' self.subprocess.run.return_value = FakeResult path = 'README.md' git_url = 'git://githoobie.com/lol.git<%s><%s>' % (tree_object, path) await self._do_download(git_url) self._check_config() # ensure the sequence of git commands were called assert self.subprocess.run.mock_calls == [ call(['git', 'clone', '--bare', 'git://githoobie.com/lol.git', '/t/mut/lol.git'], cwd='/t/mut'), call(['git', 'rev-parse', '--quiet', '--verify', tree_object], cwd='/t/mut/lol.git', stdout=-1), call(['git', 'fetch'], cwd='/t/mut/lol.git'), call(['git', 'archive', '--output=/t/res/README.md', '--format=raw', tree_object, 'README.md'], cwd='/t/mut/lol.git'), ]
def test_calls_update_streams_with_content(self, mock_update): # Calls on create content = ContentFactory() mock_update.assert_called_once_with(content) mock_update.reset_mock() # Does not call on update content.text = "update!" content.save() self.assertFalse(mock_update.called)
def test_local_content_with_parent_sent_as_reply(self, mock_send): user = UserFactory() parent = ContentFactory(author=user.profile) mock_send.reset_mock() content = ContentFactory(author=user.profile, parent=parent) self.assertTrue(content.local) call_args = [ call(send_reply_notifications, content.id), call(send_reply, content.id), ] assert mock_send.call_args_list == call_args