我们从 Python 开源项目中，提取了以下 50 个代码示例，用于说明如何使用 unittest.mock.PropertyMock()。
def setup_method(self, method):
    """Create and start the handler patchers shared by the tests.

    Each patcher is stored on ``self`` as ``patch_*``; the mock returned by
    ``start()`` is stored under a shorter attribute so tests can configure
    and inspect it.
    """
    # Patchers for RepoHandler methods.
    self.patch_repo_config = patch.object(RepoHandler, 'get_config_value')
    self.patch_open_pull_requests = patch.object(
        RepoHandler, 'open_pull_requests')

    # Patchers for PullRequestHandler attributes replaced by PropertyMock.
    self.patch_labels = patch.object(
        PullRequestHandler, 'labels', new_callable=PropertyMock)
    self.patch_last_commit_date = patch.object(
        PullRequestHandler, 'last_commit_date', new_callable=PropertyMock)

    # Patchers for PullRequestHandler methods.
    self.patch_find_comments = patch.object(
        PullRequestHandler, 'find_comments')
    self.patch_submit_comment = patch.object(
        PullRequestHandler, 'submit_comment')
    self.patch_close = patch.object(PullRequestHandler, 'close')

    # Activate every patcher and keep the resulting mocks.
    self.autoclose_stale = self.patch_repo_config.start()
    self.open_pull_requests = self.patch_open_pull_requests.start()
    self.labels = self.patch_labels.start()
    self.last_commit_date = self.patch_last_commit_date.start()
    self.find_comments = self.patch_find_comments.start()
    self.submit_comment = self.patch_submit_comment.start()
    self.close = self.patch_close.start()
def test_worker_thread_transfer():
    """Run the upload transfer worker over two queued items.

    The second call to ``_process_transfer`` raises, and the test expects
    exactly one entry in ``_exceptions`` afterwards.
    """
    uploader = ops.Uploader(
        mock.MagicMock(), mock.MagicMock(), mock.MagicMock())

    # Queue two placeholder work items (tuples of the MagicMock class).
    for _ in range(2):
        uploader._transfer_queue.put(
            (mock.MagicMock, mock.MagicMock, mock.MagicMock, mock.MagicMock)
        )

    uploader._process_transfer = mock.MagicMock()
    uploader._process_transfer.side_effect = [None, Exception()]

    with mock.patch(
            'blobxfer.operations.upload.Uploader.termination_check',
            new_callable=mock.PropertyMock) as termination_check:
        # Third read of the property reports termination.
        termination_check.side_effect = [False, False, True]
        uploader._worker_thread_transfer()
        assert uploader._process_transfer.call_count == 2
        assert len(uploader._exceptions) == 1
def test_worker_thread_upload(ts):
    """Run the upload worker over two queued descriptors.

    The second call to ``_process_upload_descriptor`` raises, and the test
    expects exactly one entry in ``_exceptions`` afterwards.
    """
    uploader = ops.Uploader(
        mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
    uploader._general_options.concurrency.transfer_threads = 1

    # len(_transfer_set) yields 5 once, then 0 on later calls.
    uploader._transfer_set = mock.MagicMock()
    uploader._transfer_set.__len__.side_effect = [5, 0, 0, 0]

    for _ in range(2):
        uploader._upload_queue.put(mock.MagicMock)

    uploader._process_upload_descriptor = mock.MagicMock()
    uploader._process_upload_descriptor.side_effect = [None, Exception()]

    with mock.patch(
            'blobxfer.operations.upload.Uploader.termination_check',
            new_callable=mock.PropertyMock) as termination_check:
        termination_check.side_effect = [False, False, False, False, True]
        uploader._worker_thread_upload()
        assert uploader._process_upload_descriptor.call_count == 2
        assert len(uploader._exceptions) == 1
def test_is_timer_task_completed_multiple_pages(
        self, page1_response, page2_response, dt1, dt2, dt3, mocker):
    """Check a timer started on page 2 and fired on page 1 is completed.

    After the lookup the history must report no further event page.
    """
    timer_fired = {'eventId': 3,
                   'eventType': 'TimerFired',
                   'eventTimestamp': dt3,
                   'timerFiredEventAttributes': {'timerId': 't_id'}}
    decision_completed = {'eventId': 2,
                          'eventType': 'DecisionTaskCompleted',
                          'eventTimestamp': dt2}
    timer_started = {'eventId': 1,
                     'eventType': 'TimerStarted',
                     'eventTimestamp': dt1,
                     'timerStartedEventAttributes': {'timerId': 't_id'}}

    page1_response['events'] = [timer_fired, decision_completed]
    page2_response['events'] = [timer_started]
    # Page 2 is the last page: make sure it carries no continuation token.
    page2_response.pop('nextPageToken', None)

    swf_mock = SwfMock()
    swf_mock.pages['page2'] = page2_response
    mocker.patch('floto.api.Swf.client',
                 new_callable=PropertyMock,
                 return_value=swf_mock)

    history = floto.History(
        domain='d', task_list='tl', response=page1_response)
    assert history._has_next_event_page()
    assert history.is_timer_task_completed('t_id')
    assert not history._has_next_event_page()
def test_get_id_previous_started_multi_page(
        self, page1_response, page2_response, dt1, dt2, dt3, mocker):
    """Check the previous DecisionTaskStarted id is found on page 2."""
    latest_started = {'eventId': 3,
                      'eventType': 'DecisionTaskStarted',
                      'eventTimestamp': dt3}
    decision_completed = {'eventId': 2,
                          'eventType': 'DecisionTaskCompleted',
                          'eventTimestamp': dt2}
    earlier_started = {'eventId': 1,
                       'eventType': 'DecisionTaskStarted',
                       'eventTimestamp': dt1}

    page1_response['events'] = [latest_started, decision_completed]
    page2_response['events'] = [earlier_started]

    swf_mock = SwfMock()
    swf_mock.pages['page2'] = page2_response
    mocker.patch('floto.api.Swf.client',
                 new_callable=PropertyMock,
                 return_value=swf_mock)

    history = floto.History(
        domain='d', task_list='tl', response=page1_response)
    first_event = page1_response['events'][0]
    assert history.get_id_previous_started(first_event) == 1
def test_get_id_activity_task_event_multi_page(
        self, page1_response, page2_response, dt1, dt2, dt3, mocker):
    """Check the activity id of a failed task is resolved from page 2.

    The failure event on page 1 points at a scheduled event (id 1) that is
    only present on page 2.
    """
    failed = {'eventId': 3,
              'eventType': 'ActivityTaskFailed',
              'eventTimestamp': dt3,
              'activityTaskFailedEventAttributes': {'scheduledEventId': 1}}
    completed = {'eventId': 2,
                 'eventType': 'DecisionTaskCompleted',
                 'eventTimestamp': dt2}
    scheduled = {'eventId': 1,
                 'eventType': 'ActivityTaskScheduled',
                 'eventTimestamp': dt1,
                 'activityTaskScheduledEventAttributes': {'activityId': 'a_id'}}

    page1_response['events'] = [failed, completed]
    page2_response['events'] = [scheduled]

    swf_mock = SwfMock()
    swf_mock.pages['page2'] = page2_response
    mocker.patch('floto.api.Swf.client',
                 new_callable=PropertyMock,
                 return_value=swf_mock)

    history = floto.History(
        domain='d', task_list='tl', response=page1_response)
    assert history.get_id_activity_task_event(failed) == 'a_id'
def test_register_type_does_not_raise(self, mocker):
    """Check ``register_type`` does not propagate TypeAlreadyExistsFault.

    The stubbed client raises a ``ClientError`` with that code from
    ``register_workflow_type``; the call must still complete and must have
    forwarded the workflow type's SWF attributes.
    """
    already_exists = botocore.exceptions.ClientError(
        error_response={'Error': {'Code': 'TypeAlreadyExistsFault'}},
        operation_name="op_name")
    register_stub = Mock(side_effect=already_exists)

    # Minimal client class exposing only the method under test.
    client_cls = type(
        "ClientMock", (object,), {"register_workflow_type": register_stub})
    mocker.patch('floto.api.Swf.client',
                 new_callable=PropertyMock,
                 return_value=client_cls())

    swf = floto.api.Swf()
    workflow_type = floto.api.WorkflowType(
        domain='test_domain', name='my_workflow_type', version='v1')
    expected_kwargs = workflow_type.swf_attributes

    swf.register_type(workflow_type)
    swf.client.register_workflow_type.assert_called_once_with(
        **expected_kwargs)
def test_submit_run(self, mock_requests):
    """Check ``submit_run`` posts the payload and returns the run id.

    ``mock_requests`` stands in for the ``requests`` module: the POST
    response reports status 200 and a JSON body carrying ``run_id``.
    """
    mock_requests.codes.ok = 200
    mock_requests.post.return_value.json.return_value = {'run_id': '1'}
    # status_code is read as an attribute, so attach it as a PropertyMock
    # on the response mock's type.
    status_code_mock = mock.PropertyMock(return_value=200)
    type(mock_requests.post.return_value).status_code = status_code_mock

    # Named ``payload`` rather than ``json`` to avoid shadowing the
    # well-known module name.
    payload = {
        'notebook_task': NOTEBOOK_TASK,
        'new_cluster': NEW_CLUSTER
    }
    run_id = self.hook.submit_run(payload)

    # assertEqual: the assertEquals alias was removed in Python 3.12.
    self.assertEqual(run_id, '1')
    mock_requests.post.assert_called_once_with(
        submit_run_endpoint(HOST),
        json={
            'notebook_task': NOTEBOOK_TASK,
            'new_cluster': NEW_CLUSTER,
        },
        auth=(LOGIN, PASSWORD),
        headers=USER_AGENT_HEADER,
        timeout=self.hook.timeout_seconds)
def test_get_run_state(self, mock_requests):
    """Check ``get_run_state`` maps the GET response to a ``RunState``.

    ``mock_requests`` stands in for the ``requests`` module: the GET
    response reports status 200 and returns ``GET_RUN_RESPONSE`` as JSON.
    """
    mock_requests.codes.ok = 200
    mock_requests.get.return_value.json.return_value = GET_RUN_RESPONSE
    # status_code is read as an attribute, so attach it as a PropertyMock
    # on the response mock's type.
    status_code_mock = mock.PropertyMock(return_value=200)
    type(mock_requests.get.return_value).status_code = status_code_mock

    run_state = self.hook.get_run_state(RUN_ID)

    # assertEqual: the assertEquals alias was removed in Python 3.12.
    self.assertEqual(
        run_state,
        RunState(LIFE_CYCLE_STATE, RESULT_STATE, STATE_MESSAGE))
    mock_requests.get.assert_called_once_with(
        get_run_endpoint(HOST),
        json={'run_id': RUN_ID},
        auth=(LOGIN, PASSWORD),
        headers=USER_AGENT_HEADER,
        timeout=self.hook.timeout_seconds)
def test_task_functions(self, input_patch, output_patch):
    """Exercise the RelationMergingTask helpers with stubbed I/O.

    ``workflow_resources`` is replaced by a ``PropertyMock`` that returns an
    empty dict. ``new_callable`` must be given the mock *class* (a factory);
    the desired value goes through ``return_value`` instead of overriding
    ``__get__`` on the created mock by hand.
    """
    with mock.patch(
        "bwg.tasks.relation_merging.RelationMergingTask.workflow_resources",
        new_callable=mock.PropertyMock,
        return_value={}
    ):
        task_config = {
            "CORPUS_ENCODING": "",
            "RELATION_MERGING_OUTPUT_PATH": ""
        }

        output_patch.return_value = MockOutput()
        input_patch.return_value = (
            MockInput(RELATION_MERGING_TASK["input"][0]),
            MockInput(RELATION_MERGING_TASK["input"][0])
        )

        task = bwg.tasks.relation_merging.RelationMergingTask(
            task_config=task_config)

        # Testing
        self._test_get_relations_from_sentence_json()
        self._test_is_relevant_article(task)
        self._test_is_relevant_sentence(task)
def test_task_functions(self, input_patch, output_patch):
    """Exercise the DependencyParseTask helpers with stubbed I/O.

    ``workflow_resources`` is replaced by a ``PropertyMock`` that returns a
    dict holding a ``MockParser``. ``new_callable`` must be given the mock
    *class* (a factory); the desired value goes through ``return_value``
    instead of overriding ``__get__`` on the created mock by hand.
    """
    with mock.patch(
        "bwg.tasks.dependency_parsing.DependencyParseTask.workflow_resources",
        new_callable=mock.PropertyMock,
        return_value={
            "dependency_parser": MockParser()
        }
    ):
        task_config = {
            "STANFORD_DEPENDENCY_MODEL_PATH": "",
            "STANFORD_CORENLP_MODELS_PATH": "",
            "CORPUS_ENCODING": "",
            "DEPENDENCY_OUTPUT_PATH": ""
        }

        output_patch.return_value = MockOutput()
        input_patch.return_value = MockInput(DEPENDENCY_TASK["input"])

        task = bwg.tasks.dependency_parsing.DependencyParseTask(
            task_config=task_config)

        # Testing
        self._test_task(task)
        self._test_dependency_parse(task)
def test_find_payments(self):
    """Check LowestBalanceFirstMethod's payments for four statements.

    With 100.00 available, the expected result gives 91.00 to the fourth
    statement and the configured minimum payment to the other three.
    """
    method = LowestBalanceFirstMethod(Decimal('100.00'))

    # (minimum_payment, principal) per mocked statement, in order.
    figures = (
        ('2.00', '10.00'),
        ('5.00', '25.00'),
        ('2.00', '1234.56'),
        ('7.00', '3.00'),
    )
    statements = []
    for min_payment, principal in figures:
        stmt = Mock(spec_set=CCStatement)
        type(stmt).minimum_payment = PropertyMock(
            return_value=Decimal(min_payment))
        type(stmt).principal = PropertyMock(
            return_value=Decimal(principal))
        statements.append(stmt)

    assert method.find_payments(statements) == [
        Decimal('2.00'),
        Decimal('5.00'),
        Decimal('2.00'),
        Decimal('91.00')
    ]
def test_find_payments_total_too_low(self):
    """Check LowestBalanceFirstMethod raises TypeError with only 3.00.

    The four mocked statements have minimum payments of 2.00, 5.00, 2.00
    and 7.00.
    """
    method = LowestBalanceFirstMethod(Decimal('3.00'))

    # (minimum_payment, principal) per mocked statement, in order.
    figures = (
        ('2.00', '10.00'),
        ('5.00', '25.00'),
        ('2.00', '1234.56'),
        ('7.00', '3.00'),
    )
    statements = []
    for min_payment, principal in figures:
        stmt = Mock(spec_set=CCStatement)
        type(stmt).minimum_payment = PropertyMock(
            return_value=Decimal(min_payment))
        type(stmt).principal = PropertyMock(
            return_value=Decimal(principal))
        statements.append(stmt)

    with pytest.raises(TypeError):
        method.find_payments(statements)
def test_find_payments(self):
    """Check HighestBalanceFirstMethod's payments for four statements.

    With 100.00 available, the expected result gives 86.00 to the third
    statement and the configured minimum payment to the other three.
    """
    method = HighestBalanceFirstMethod(Decimal('100.00'))

    # (minimum_payment, principal) per mocked statement, in order.
    figures = (
        ('2.00', '10.00'),
        ('5.00', '25.00'),
        ('2.00', '1234.56'),
        ('7.00', '3.00'),
    )
    statements = []
    for min_payment, principal in figures:
        stmt = Mock(spec_set=CCStatement)
        type(stmt).minimum_payment = PropertyMock(
            return_value=Decimal(min_payment))
        type(stmt).principal = PropertyMock(
            return_value=Decimal(principal))
        statements.append(stmt)

    assert method.find_payments(statements) == [
        Decimal('2.00'),
        Decimal('5.00'),
        Decimal('86.00'),
        Decimal('7.00')
    ]
def test_find_payments(self):
    """Check HighestInterestRateFirstMethod's payments for four statements.

    With 100.00 available, the expected result gives 86.00 to the third
    statement and the configured minimum payment to the other three.
    """
    method = HighestInterestRateFirstMethod(Decimal('100.00'))

    # (minimum_payment, apr, principal) per mocked statement, in order.
    figures = (
        ('2.00', '0.0100', '10.00'),
        ('5.00', '0.0200', '25.00'),
        ('2.00', '0.0800', '1234.56'),
        ('7.00', '0.0300', '3.00'),
    )
    statements = []
    for min_payment, apr, principal in figures:
        stmt = Mock(spec_set=CCStatement)
        type(stmt).minimum_payment = PropertyMock(
            return_value=Decimal(min_payment))
        type(stmt).apr = PropertyMock(return_value=Decimal(apr))
        type(stmt).principal = PropertyMock(
            return_value=Decimal(principal))
        statements.append(stmt)

    assert method.find_payments(statements) == [
        Decimal('2.00'),
        Decimal('5.00'),
        Decimal('86.00'),
        Decimal('7.00')
    ]
def test_find_payments_total_too_low(self):
    """Check HighestInterestRateFirstMethod raises TypeError with only 3.00.

    Only ``minimum_payment`` and ``principal`` are configured on the mocked
    statements; ``apr`` is deliberately left as in the original test.
    """
    method = HighestInterestRateFirstMethod(Decimal('3.00'))

    # (minimum_payment, principal) per mocked statement, in order.
    figures = (
        ('2.00', '10.00'),
        ('5.00', '25.00'),
        ('2.00', '1234.56'),
        ('7.00', '3.00'),
    )
    statements = []
    for min_payment, principal in figures:
        stmt = Mock(spec_set=CCStatement)
        type(stmt).minimum_payment = PropertyMock(
            return_value=Decimal(min_payment))
        type(stmt).principal = PropertyMock(
            return_value=Decimal(principal))
        statements.append(stmt)

    with pytest.raises(TypeError):
        method.find_payments(statements)
def test_find_payments(self):
    """Check LowestInterestRateFirstMethod's payments for four statements.

    With 100.00 available, the expected result gives 86.00 to the first
    statement and the configured minimum payment to the other three.
    """
    method = LowestInterestRateFirstMethod(Decimal('100.00'))

    # (minimum_payment, apr, principal) per mocked statement, in order.
    figures = (
        ('2.00', '0.0100', '10.00'),
        ('5.00', '0.0200', '25.00'),
        ('2.00', '0.0800', '1234.56'),
        ('7.00', '0.0300', '3.00'),
    )
    statements = []
    for min_payment, apr, principal in figures:
        stmt = Mock(spec_set=CCStatement)
        type(stmt).minimum_payment = PropertyMock(
            return_value=Decimal(min_payment))
        type(stmt).apr = PropertyMock(return_value=Decimal(apr))
        type(stmt).principal = PropertyMock(
            return_value=Decimal(principal))
        statements.append(stmt)

    assert method.find_payments(statements) == [
        Decimal('86.00'),
        Decimal('5.00'),
        Decimal('2.00'),
        Decimal('7.00')
    ]
def test_find_payments_total_too_low(self):
    """Check LowestInterestRateFirstMethod raises TypeError with only 3.00.

    Only ``minimum_payment`` and ``principal`` are configured on the mocked
    statements; ``apr`` is deliberately left as in the original test.
    """
    method = LowestInterestRateFirstMethod(Decimal('3.00'))

    # (minimum_payment, principal) per mocked statement, in order.
    figures = (
        ('2.00', '10.00'),
        ('5.00', '25.00'),
        ('2.00', '1234.56'),
        ('7.00', '3.00'),
    )
    statements = []
    for min_payment, principal in figures:
        stmt = Mock(spec_set=CCStatement)
        type(stmt).minimum_payment = PropertyMock(
            return_value=Decimal(min_payment))
        type(stmt).principal = PropertyMock(
            return_value=Decimal(principal))
        statements.append(stmt)

    with pytest.raises(TypeError):
        method.find_payments(statements)
def test_next_with_transactions(self):
    """Check ``next_with_transactions`` builds the follow-up statement.

    The patched ``CCStatement`` class must be called once with the same
    interest and payment objects, the current end balance, the billing
    period's ``next_period`` and the given transactions.
    """
    period = Mock(spec_set=_BillingPeriod)
    following_period = Mock(spec_set=_BillingPeriod)
    type(period).next_period = PropertyMock(return_value=following_period)
    interest = Mock(spec_set=_InterestCalculation)
    min_formula = Mock(spec_set=_MinPaymentFormula)

    statement = CCStatement(
        interest, Decimal('1.23'), min_formula, period,
        end_balance=Decimal('1.50'), interest_amt=Decimal('0.27')
    )
    replacement = Mock(spec_set=CCStatement)

    with patch('biweeklybudget.interest.CCStatement', autospec=True) as m:
        m.return_value = replacement
        result = statement.next_with_transactions({'foo': 'bar'})

    assert result == replacement
    expected_calls = [
        call(
            interest, Decimal('1.50'), min_formula, following_period,
            transactions={'foo': 'bar'}
        )
    ]
    assert m.mock_calls == expected_calls
def test_pay(self):
    """Check ``pay`` delegates to ``next_with_transactions``.

    The payment must be passed as a single transaction keyed by the next
    billing period's ``payment_date``.
    """
    period = Mock(spec_set=_BillingPeriod)
    following_period = Mock(spec_set=_BillingPeriod)
    type(following_period).payment_date = PropertyMock(
        return_value=date(2017, 1, 15))
    type(period).next_period = PropertyMock(return_value=following_period)
    interest = Mock(spec_set=_InterestCalculation)
    min_formula = Mock(spec_set=_MinPaymentFormula)

    statement = CCStatement(
        interest, Decimal('1.23'), min_formula, period,
        end_balance=Decimal('1.50'), interest_amt=Decimal('0.27')
    )
    replacement = Mock(spec_set=CCStatement)

    with patch(
        'biweeklybudget.interest.CCStatement.next_with_transactions',
        autospec=True
    ) as m:
        m.return_value = replacement
        result = statement.pay(Decimal('98.76'))

    assert result == replacement
    # autospec on a method records the instance as the first argument.
    assert m.mock_calls == [
        call(statement, {date(2017, 1, 15): Decimal('98.76')})
    ]
def test_get_s3_file_object_http_400_error():
    """
    Tests Get S3 file object with HTTP 400 error.

    Looks like HTTP 400 is returned when AWS token expires and
    S3.Object.load is called. The header lookup must then return ``None``
    and flag the metadata for token renewal.
    """
    bad_request = botocore.exceptions.ClientError(
        {'Error': {'Code': u'400', 'Message': 'Bad Request'}},
        operation_name='mock load')
    s3object = MagicMock(load=MagicMock(side_effect=bad_request))

    client = Mock()
    client.Object.return_value = s3object
    client.load.return_value = None
    type(client).s3path = PropertyMock(return_value='s3://testbucket/')

    meta = {
        u'client': client,
        u'stage_info': {
            u'location': 'sfc-teststage/rwyitestacco/users/1234/',
            u'locationType': 'S3',
        }
    }

    header = SnowflakeS3Util.get_file_header(meta, "/path1/file2.txt")
    assert header is None
    assert meta['result_status'] == ResultStatus.RENEW_TOKEN
def _init_rest(application, post_requset):
    """Build a ``SnowflakeRestful`` wired to a mocked connection.

    :param application: value returned by the connection's ``application``
        property.
    :param post_requset: callable installed as the REST object's
        ``_post_request``.
    :return: the configured ``SnowflakeRestful`` instance.
    """
    connection = MagicMock()
    connection._login_timeout = 120
    connection.errorhandler = Mock(return_value=None)

    # Read-only attributes are attached as PropertyMocks on the mock's type.
    connection_type = type(connection)
    connection_type.application = PropertyMock(return_value=application)
    connection_type._internal_application_name = PropertyMock(
        return_value=CLIENT_NAME
    )
    connection_type._internal_application_version = PropertyMock(
        return_value=CLIENT_VERSION
    )

    rest = SnowflakeRestful(
        host='testaccount.snowflakecomputing.com',
        port=443,
        connection=connection)
    rest._post_request = post_requset
    return rest
def setUp(self):
    """Build a mocked 'webmasters' v3 service that returns an empty report.

    The HTTP sequence first serves the contents of
    ``build_response_data.json`` (located next to this file) and then the
    empty aggregation response, both with status 200.
    """
    self.clientquery = {'property_uri': 'https://www.example.com/',
                        'siteMode': 'en-us',
                        'clientName': 'Example',
                        'query_date': '2016-09-01'}
    self.emptyresponse = '''{"responseAggregationType": "byPage"}'''
    self.p = (os.path.dirname(os.path.abspath(__file__)))
    self.build_response_data = '%s/build_response_data.json' % (self.p)

    # Read through a context manager so the file handle is closed
    # deterministically instead of being leaked.
    with open(self.build_response_data, 'rb') as build_response_file:
        build_response = build_response_file.read()

    self.http_auth = HttpMockSequence([
        ({'status': '200'}, build_response),
        ({'status': '200'}, self.emptyresponse.encode('UTF-8'))
    ])
    self.service = build('webmasters', 'v3',
                         http=self.http_auth,
                         developerKey='mocked_api_key_1234')
    # Mock the service attribute within Apiclient
    self.mocked_prop = PropertyMock(return_value=self.service)
def setUp(self):
    """Build a mocked 'webmasters' v3 service that answers with HTTP 403.

    The HTTP sequence first serves the contents of
    ``build_response_data.json`` (located next to this file) and then the
    permission-error payload, both with status 403.
    """
    self.clientquery = {'property_uri': 'https://www.example.com/',
                        'siteMode': 'en-us',
                        'clientName': 'Example',
                        'query_date': '2016-09-01'}
    self.response = '''{"error": {"errors": [{"domain": "global","reason": "forbidden", "message": "User does not have sufficient permission for site 'https://www.example.com/'. See also: https://support.google.com/webmasters/answer/2451999."}], "code": 403, "message": "User does not have sufficient permission for site 'https://www.example.com/'. See also: https://support.google.com/webmasters/answer/2451999."}}'''
    self.p = (os.path.dirname(os.path.abspath(__file__)))
    self.build_response_data = '%s/build_response_data.json' % (self.p)

    # Read through a context manager so the file handle is closed
    # deterministically instead of being leaked.
    with open(self.build_response_data, 'rb') as build_response_file:
        build_response = build_response_file.read()

    self.http_auth = HttpMockSequence([
        ({'status': '403'}, build_response),
        ({'status': '403'}, self.response.encode('UTF-8'))
    ])
    self.service = build('webmasters', 'v3',
                         http=self.http_auth,
                         developerKey='mocked_api_key_1234')
    # PropertyMock standing in for the client's service attribute.
    self.mocked_prop = PropertyMock(return_value=self.service)
def test_get_queryset(km_user_factory, media_resource_factory):
    """
    The queryset should return all the items owned by the Know Me user
    that is provided as context.
    """
    owner = km_user_factory()
    media_resource_factory(km_user=owner)
    # A second resource created without the owner; the expected queryset
    # is built from the owner's resources only.
    media_resource_factory()

    context_target = 'know_me.serializers.fields.MediaResourceField.context'
    with mock.patch(
            context_target,
            new_callable=mock.PropertyMock) as mock_context:
        mock_context.return_value = {
            'km_user': owner,
        }

        field = fields.MediaResourceField()
        actual = field.get_queryset()
        wanted = owner.media_resources.all()

        assert list(actual) == list(wanted)
def test_to_internal_value(media_resource_factory):
    """
    ``.to_internal_value()`` should return the media resource that has
    the provided ID.
    """
    media_resource = media_resource_factory()

    context_target = 'know_me.serializers.fields.MediaResourceField.context'
    with mock.patch(
            context_target,
            new_callable=mock.PropertyMock) as mock_context:
        mock_context.return_value = {
            'km_user': media_resource.km_user,
        }

        field = fields.MediaResourceField()
        found = field.to_internal_value(media_resource.id)

        assert found == media_resource
def test_to_internal_value_non_existent(km_user_factory):
    """
    If there is no media resource matching the provided ID, a
    ``ValidationError`` should be raised.
    """
    context_target = 'know_me.serializers.fields.MediaResourceField.context'
    with mock.patch(
            context_target,
            new_callable=mock.PropertyMock) as mock_context:
        mock_context.return_value = {
            'km_user': km_user_factory(),
        }

        field = fields.MediaResourceField()

        with pytest.raises(ValidationError):
            field.to_internal_value(1)
def test_to_representation(media_resource_factory, serializer_context):
    """
    ``.to_representation()`` should return the serialized version of the
    provided media resource.
    """
    media_resource = media_resource_factory()
    serializer = MediaResourceSerializer(
        media_resource,
        context=serializer_context)

    context_target = 'know_me.serializers.fields.MediaResourceField.context'
    with mock.patch(
            context_target,
            new_callable=mock.PropertyMock) as mock_context:
        mock_context.return_value = serializer_context

        field = fields.MediaResourceField()
        rendered = field.to_representation(media_resource)

        assert rendered == serializer.data
def test_lists_unreadable_android_directory_using_helper_method(self, mock_runner):
    """Check the Android listing output for an unreadable directory.

    The runner's last message is a successful response whose ``data``
    property describes a path that is neither readable nor writable.
    """
    listing = {
        'path': '/foo/bar',
        'readable': False,
        'writable': False,
        'files': {}
    }
    response = mock.Mock()
    response.is_successful.return_value = True
    type(response).data = mock.PropertyMock(return_value=listing)
    mock_runner.return_value.get_last_message.return_value = response

    with capture(_ls_android, ['/foo/bar']) as captured:
        output = captured

    # NOTE(review): whitespace inside this literal is reproduced exactly as
    # found - confirm against the helper's real output format.
    self.assertEqual(output, '\nReadable: No Writable: No\n')
def test_prints_ios_environment_via_platform_helpers(self, mock_runner):
    """Check the iOS environment table printed for a single entry.

    The runner's last message is a successful response whose ``data``
    property is ``{'foo': '/bar'}``.
    """
    response = mock.Mock()
    response.is_successful.return_value = True
    type(response).data = mock.PropertyMock(return_value={'foo': '/bar'})
    mock_runner.return_value.get_last_message.return_value = response

    with capture(_get_ios_environment) as captured:
        output = captured

    # NOTE(review): whitespace inside this literal is reproduced exactly as
    # found - confirm against the helper's real table layout.
    expected_output = """ Name Path ------ ------ foo /bar """

    self.assertEqual(output, expected_output)
def test_prints_android_environment_via_platform_helpers(self, mock_runner):
    """Check the Android environment table printed for a single entry.

    The runner's last message is a successful response whose ``data``
    property is ``{'foo': '/bar'}``.
    """
    response = mock.Mock()
    response.is_successful.return_value = True
    type(response).data = mock.PropertyMock(return_value={'foo': '/bar'})
    mock_runner.return_value.get_last_message.return_value = response

    with capture(_get_android_environment) as captured:
        output = captured

    # NOTE(review): whitespace inside this literal is reproduced exactly as
    # found - confirm against the helper's real table layout.
    expected_output = """ Name Path ------ ------ foo /bar """

    self.assertEqual(output, expected_output)
def test_get_ssh_args_instance_name(env_mock, exit_mock):
    """Check ``get_ssh_args`` resolves key path, user and IP by instance name.

    The project is looked up from the (patched) current working directory,
    and the instance is looked up by the name given on the command line.
    """
    with patch('os.getcwd') as cwd_mock:
        cwd_mock.return_value = '/path/to/cwd'

        # Project exposing key_path as a property.
        key_path_mock = PropertyMock(return_value='/path/to/key.pem')
        project = MagicMock()
        type(project).key_path = key_path_mock

        env_mock.is_initialized.return_value = True
        env_mock.return_value.find_project.return_value = project

        # Instance exposing ip as a property and a user-name getter.
        ip_mock = PropertyMock(return_value='0.0.0.0')
        instance = MagicMock()
        type(instance).ip = ip_mock
        instance.get_user_name.return_value = 'test_user'
        project.get_instance.return_value = instance

        args = cli.get_ssh_args(['fooinst'])

        env_mock.return_value.find_project.assert_called_with('/path/to/cwd')
        project.get_instance.assert_called_with('fooinst')
        key_path_mock.assert_called_with()
        instance.get_user_name.assert_called_with()
        ip_mock.assert_called_with()

        assert len(args) == 3
        key_path, user_name, ip_address = args[0], args[1], args[2]
        assert key_path == '/path/to/key.pem'
        assert user_name == 'test_user'
        assert ip_address == '0.0.0.0'
def test_get_access_key(self, mock_session):
    """Check ``get_access_key`` returns the session credentials' key."""
    access_key_prop = PropertyMock(return_value='test')
    credentials = MagicMock()
    mock_session.return_value.get_credentials.return_value = credentials
    # access_key is read as an attribute, so attach it on the mock's type.
    type(credentials).access_key = access_key_prop

    result = AwsClient().get_access_key()

    self.assertEqual(result, 'test')
def test_add_lambda_permissions(self, mock_lambda_client, mock_config, mock_uuid):
    """Check ``add_lambda_permissions`` grants S3 the invoke permission.

    The lambda client must be created once and ``add_permission`` must be
    called with the configured function name, the bucket ARN and the
    generated statement id.
    """
    type(mock_config).lambda_name = PropertyMock(return_value='test_name')
    mock_uuid.return_value = 'test_uuid'

    AwsClient().add_lambda_permissions('test_bucket')

    expected_call = call().add_permission(
        Action='lambda:InvokeFunction',
        FunctionName='test_name',
        Principal='s3.amazonaws.com',
        SourceArn='arn:aws:s3:::test_bucket',
        StatementId='test_uuid')
    self.assertEqual(mock_lambda_client.call_count, 1)
    self.assertIn(expected_call, mock_lambda_client.mock_calls)
def setup_method(self, method):
    """Create and start the patchers used by the pull-request checker tests.

    Patchers are stored on ``self`` as ``patch_*``; the mocks returned by
    ``start()`` are stored under shorter names, except for the ``json``
    property mock, whose started mock is not kept.
    """
    # Closed PR already tested in TestHook.test_invalid()
    pr_json = {
        'state': 'open',
        'user': {'login': 'user'},
        'head': {'repo': {'full_name': 'repo'}, 'ref': 'branch'},
    }

    self.patch_repo_config = patch.object(RepoHandler, 'get_config_value')
    self.patch_pr_json = patch.object(
        PullRequestHandler, 'json',
        new_callable=PropertyMock, return_value=pr_json)
    self.patch_pr_comments = patch.object(
        PullRequestHandler, 'find_comments', return_value=[])
    self.patch_pr_labels = patch.object(
        PullRequestHandler, 'labels', new_callable=PropertyMock)
    self.patch_submit_comment = patch.object(
        PullRequestHandler, 'submit_comment', return_value='url')
    self.patch_set_status = patch.object(PullRequestHandler, 'set_status')
    self.patch_check_changelog = patch(
        'changebot.blueprints.pull_request_checker.check_changelog_consistency')

    # Activate the patchers in the same order they were declared.
    self.changelog_cfg = self.patch_repo_config.start()
    self.patch_pr_json.start()
    self.comment_ids = self.patch_pr_comments.start()
    self.labels = self.patch_pr_labels.start()
    self.submit_comment = self.patch_submit_comment.start()
    self.set_status = self.patch_set_status.start()
    self.issues = self.patch_check_changelog.start()
def test_is_closed(self, state, answer):
    """Check ``is_closed`` is ``answer`` when the issue JSON has ``state``."""
    json_target = 'changebot.github.github_api.IssueHandler.json'
    with patch(json_target, new_callable=PropertyMock) as mock_json:
        mock_json.return_value = {'state': state}
        assert self.issue.is_closed is answer
def test_company_profile_edit_condition_address_feature_flag_off(
    letter_sent, preverified, expected, settings, sso_request
):
    """Check ``condition_show_address`` with the OAuth2 feature flag off.

    ``company_profile`` on the view is patched with the given letter-sent
    and pre-verified flags, and the result must be ``expected``.
    """
    settings.FEATURE_COMPANIES_HOUSE_OAUTH2_ENABLED = False

    view = views.CompanyProfileEditView()
    view.request = sso_request

    profile = {
        'is_verification_letter_sent': letter_sent,
        'verified_with_preverified_enrolment': preverified,
    }
    property_mock = PropertyMock(return_value=profile)

    # NOTE(review): new_callable receives a PropertyMock *instance*, so the
    # patched attribute is the value returned by calling it (the dict above).
    with patch.object(view, 'company_profile', new_callable=property_mock):
        assert view.condition_show_address() is expected
def mock_snowboy(request):
    """Patch the Snowboy detector's library import path for one test.

    Yields the object returned by starting the patcher, then stops the
    patcher once the generator is resumed.
    """
    target = (
        'command_lifecycle.wakeword.SnowboyWakewordDetector.'
        'wakeword_library_import_path'
    )
    replacement = PropertyMock(return_value='unittest.mock.Mock')
    patcher = patch(target, replacement)

    started = patcher.start()
    yield started
    patcher.stop()
def build_request(self, method_return_value, FILES_return_value):
    """Create a mocked request with ``method`` and ``FILES`` properties.

    :param method_return_value: value returned when ``request.method`` is
        read.
    :param FILES_return_value: value returned when ``request.FILES`` is read.
    :return: tuple ``(request, method, FILES)`` where the last two are the
        ``PropertyMock`` objects, so callers can assert on attribute access.
    """
    method = PropertyMock(return_value=method_return_value)
    FILES = PropertyMock(return_value=FILES_return_value)

    request = MagicMock()
    # Properties live on the type; each MagicMock has its own subclass, so
    # this does not leak into other mocks.
    request_type = type(request)
    request_type.method = method
    request_type.FILES = FILES

    return request, method, FILES
def test_get_service_input(self):
    """Check ``get_service_input`` reads the form's ``cleaned_data`` once.

    The returned value must equal the (empty) cleaned data dict.
    """
    cleaned_data = PropertyMock(return_value={})
    form = MagicMock()
    type(form).cleaned_data = cleaned_data

    result = MockView().get_service_input(form)

    cleaned_data.assert_called_once_with()
    self.assertEqual({}, result)
def test_old_tracer_correlation(self):
    """Check ``old_tracer_correlation`` is 2.0 for 10.0 over 5 jumps."""
    simulation = self.simulation
    simulation.atoms = Mock()
    # NOTE(review): assigned on the Mock instance rather than its type, so
    # attribute access yields the PropertyMock itself; presumably the code
    # under test calls sum_dr_squared() - confirm.
    simulation.atoms.sum_dr_squared = PropertyMock(return_value=10.0)
    simulation.number_of_jumps = 5

    self.assertEqual(simulation.old_tracer_correlation, 2.0)
def test_collective_diffusion_coefficient_per_atom(self):
    """Check the per-atom coefficient is 4.0 for 8.0 over 2 atoms."""
    simulation = self.simulation
    target = 'lattice_mc.simulation.Simulation.collective_diffusion_coefficient'
    with patch(target, new_callable=PropertyMock) as mock_cdc:
        mock_cdc.return_value = 8.0
        simulation.number_of_atoms = 2
        self.assertEqual(
            simulation.collective_diffusion_coefficient_per_atom, 4.0)
def test_worker_thread_transfer():
    """Run the synccopy transfer worker over two queued descriptors.

    The second call to ``_process_synccopy_descriptor`` raises, and the test
    expects exactly one entry in ``_exceptions`` afterwards.
    """
    sync_copy = ops.SyncCopy(
        mock.MagicMock(), mock.MagicMock(), mock.MagicMock())

    for _ in range(2):
        sync_copy._transfer_queue.put(mock.MagicMock())

    sync_copy._process_synccopy_descriptor = mock.MagicMock()
    sync_copy._process_synccopy_descriptor.side_effect = [None, Exception()]

    with mock.patch(
            'blobxfer.operations.synccopy.SyncCopy.termination_check',
            new_callable=mock.PropertyMock) as termination_check:
        # Third read of the property reports termination.
        termination_check.side_effect = [False, False, True]
        sync_copy._worker_thread_transfer()
        assert sync_copy._process_synccopy_descriptor.call_count == 2
        assert len(sync_copy._exceptions) == 1
def test_check_for_uploads_from_md5():
    """Check one completed MD5 result triggers one post-MD5 skip check.

    ``pop_done_queue`` yields ``None``, a result, then ``None``; only the
    middle value is expected to reach ``_post_md5_skip_on_check``.
    """
    uploader = ops.Uploader(
        mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
    uploader._md5_offload = mock.MagicMock()
    uploader._post_md5_skip_on_check = mock.MagicMock()

    with mock.patch(
            'blobxfer.operations.upload.Uploader.termination_check_md5',
            new_callable=mock.PropertyMock) as termination_check_md5:
        termination_check_md5.side_effect = [False, False, False, True, True]
        uploader._md5_offload.pop_done_queue.side_effect = [
            None, mock.MagicMock(), None
        ]
        uploader._check_for_uploads_from_md5()
        assert uploader._post_md5_skip_on_check.call_count == 1
def test_worker_thread_disk():
    """Run the download disk worker in a success and a failure scenario.

    First ``_process_data`` succeeds and must be called once; then it
    raises and exactly one exception must be recorded.
    """
    termination_target = (
        'blobxfer.operations.download.Downloader.termination_check')

    # Scenario 1: the queued item is processed normally.
    with mock.patch(
            termination_target,
            new_callable=mock.PropertyMock) as termination_check:
        downloader = ops.Downloader(
            mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
        downloader._general_options.concurrency.disk_threads = 1
        downloader._disk_queue = mock.MagicMock()
        downloader._disk_queue.get.side_effect = [
            (mock.MagicMock(), mock.MagicMock(), mock.MagicMock()),
        ]
        downloader._process_data = mock.MagicMock()
        termination_check.side_effect = [False, True]

        downloader._worker_thread_disk()
        assert downloader._process_data.call_count == 1

    # Scenario 2: processing the queued item raises.
    with mock.patch(
            termination_target,
            new_callable=mock.PropertyMock) as termination_check:
        downloader = ops.Downloader(
            mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
        downloader._general_options.concurrency.disk_threads = 1
        downloader._disk_queue = mock.MagicMock()
        downloader._disk_queue.get.side_effect = [
            (mock.MagicMock(), mock.MagicMock(), mock.MagicMock()),
        ]
        downloader._process_data = mock.MagicMock()
        downloader._process_data.side_effect = Exception()
        termination_check.side_effect = [False, True]

        downloader._worker_thread_disk()
        assert len(downloader._exceptions) == 1
def test_events_by_id_multi_page(self, mocker, page1_response, page2_response):
    """Check ``get_event`` lowers ``lowest_event_id`` from 2 to 1.

    Event 1 is requested while page 2 is served by the mocked SWF client.
    """
    swf_mock = SwfMock()
    swf_mock.pages['page2'] = page2_response
    mocker.patch('floto.api.Swf.client',
                 new_callable=PropertyMock,
                 return_value=swf_mock)

    history = floto.History(
        domain='d', task_list='tl', response=page1_response)

    assert history.lowest_event_id == 2
    assert history.get_event(1)
    assert history.lowest_event_id == 1