我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用unittest.mock.patch.object()。
def test_checkout_exception(self):
    """A failing ``Lendable.checkout`` is reported as a message, nothing is stored.

    ``Lendable.checkout`` is patched to raise; the response's first message
    must carry the exception text and no lendable may exist afterwards.
    """
    self.c.login(username=self.user.username, password='str0ngpa$$w0rd')

    checkout_url = reverse('library:checkout', args=['lendable'])
    failure = Exception('Checkout Failed!')
    with patch.object(Lendable, 'checkout', side_effect=failure):
        response = self.c.get(checkout_url, follow=True)

    # Confirm the error message is displayed.
    first_message = list(response.context['messages'])[0]
    self.assertEqual(first_message.message, 'Checkout Failed!')

    # Confirm no lendable was created.
    self.assertEqual(Lendable.all_types.count(), 0)
def test_set_username(self):
    """Test set username method."""
    account = self.aws_account

    # No clashing IAM user: unicode is converted to ASCII (J?hn -> John).
    with patch.object(AmazonAccountUtils, 'iam_user_exists',
                      return_value=False):
        account._set_username()
        self.assertEqual(account.username, 'John')

    with patch.object(AmazonAccountUtils, 'iam_user_exists',
                      return_value=True):
        # An existing IAM user forces a random 20-character username.
        account._set_username()
        self.assertNotEqual(account.username, 'John')
        self.assertEqual(len(account.username), 20)

        # A username that is still invalid after conversion (??? stays ???)
        # is replaced by a random one as well.
        self.user.username = '???'
        account._set_username()
        self.assertNotEqual(account.username, '???')
        self.assertEqual(len(account.username), 20)
def setUp(self):
    """Patch ``module.request`` / ``module.services`` and seed storage config.

    The pre-test configuration is copied to ``self.original_config`` before
    the storage-related keys are overridden.
    """
    # Every patcher started with .start() is stopped again on cleanup.
    self.addCleanup(patch.stopall)

    self.request = patch.object(module, 'request').start()
    self.services = patch.object(module, 'services').start()

    self.original_config = dict(module.config)
    self.bucket = 'buckbuck'
    overrides = {
        'STORAGE_BUCKET_NAME': self.bucket,
        'STORAGE_ACCESS_KEY_ID': '',
        'STORAGE_SECRET_ACCESS_KEY': '',
        'ACCESS_KEY_EXPIRES_IN': '',
        'STORAGE_PATH_PATTERN': '{owner}/{dataset}/{path}',
    }
    for key, value in overrides.items():
        module.config[key] = value

    self.s3 = boto3.client('s3')
def async_test_scarlett_os(loop):
    """Return a ScarlettOS object pointing at test config dir."""
    # loop._thread_ident = threading.get_ident()
    ss = s.ScarlettSystem(loop)

    # Fixed test configuration, applied attribute by attribute.
    settings = {
        'location_name': 'test scarlett',
        'config_dir': get_test_config_dir(),
        'latitude': 32.87336,
        'longitude': -117.22743,
        'elevation': 0,
        'time_zone': date_utility.get_time_zone('US/Pacific'),
        'units': METRIC_SYSTEM,
        'skip_pip': True,
    }
    for attribute, value in settings.items():
        setattr(ss.config, attribute, value)

    # if 'custom_automations.test' not in loader.AVAILABLE_COMPONENTS:
    #     yield from loop.run_in_executor(None, loader.prepare, ss)

    ss.state = s.CoreState.running
    return ss
def test_close_disabled(self):
    """Past the close deadline with closing disabled: warn, do not close.

    Second case: time is beyond close deadline, and there is no comment yet
    but the global option to allow closing has not been enabled. Since there
    is no comment, the warning gets posted (rather than the 'epilogue').
    """
    self.get_issues.return_value = ['123']
    self.get_label_added_date.return_value = now() - 34443
    self.find_comments.return_value = []

    with app.app_context(), patch.object(app, 'stale_issue_close', False):
        process_issues('repo', 'installation')

    assert self.submit_comment.call_count == 1
    warning = ISSUE_CLOSE_WARNING.format(pasttime='9 hours ago',
                                         futuretime='5 hours')
    self.submit_comment.assert_called_with(warning)
    assert self.close.call_count == 0
def setup_method(self, method):
    """Create and start the RepoHandler / PullRequestHandler patches.

    Each patcher is kept on ``self.patch_*``; the mock returned by
    ``start()`` is stored under the shorter attribute name the tests use.
    """
    # Repository-level hooks.
    self.patch_repo_config = patch.object(RepoHandler, 'get_config_value')
    self.autoclose_stale = self.patch_repo_config.start()

    self.patch_open_pull_requests = patch.object(RepoHandler,
                                                 'open_pull_requests')
    self.open_pull_requests = self.patch_open_pull_requests.start()

    # Pull-request properties are patched with PropertyMock.
    self.patch_labels = patch.object(PullRequestHandler, 'labels',
                                     new_callable=PropertyMock)
    self.labels = self.patch_labels.start()

    self.patch_last_commit_date = patch.object(PullRequestHandler,
                                               'last_commit_date',
                                               new_callable=PropertyMock)
    self.last_commit_date = self.patch_last_commit_date.start()

    # Pull-request actions.
    self.patch_find_comments = patch.object(PullRequestHandler,
                                            'find_comments')
    self.find_comments = self.patch_find_comments.start()

    self.patch_submit_comment = patch.object(PullRequestHandler,
                                             'submit_comment')
    self.submit_comment = self.patch_submit_comment.start()

    self.patch_close = patch.object(PullRequestHandler, 'close')
    self.close = self.patch_close.start()
def test_close_disabled(self):
    """Past the close deadline with closing disabled: warn, do not close.

    Time is beyond close deadline, and there is no comment yet but the
    global option to allow closing has not been enabled. Since there is no
    comment, the warning gets posted (rather than the 'epilogue').
    """
    self.open_pull_requests.return_value = ['123']
    self.last_commit_date.return_value = now() - 241
    self.find_comments.return_value = []

    with app.app_context(), \
            patch.object(app, 'stale_pull_requests_close', False):
        process_pull_requests('repo', 'installation')

    assert self.submit_comment.call_count == 1
    warning = PULL_REQUESTS_CLOSE_WARNING.format(pasttime='4 minutes',
                                                 futuretime='20 seconds')
    self.submit_comment.assert_called_with(warning)
    assert self.close.call_count == 0
def test_request_error_handler(api_path, _error_handler=True):
    """Check that a Request reaches ``client.request`` with the right arguments.

    ``client.request`` and ``client.error_handler`` are both patched. The
    error handler must have been used exactly when ``_error_handler`` is
    true.
    """
    client = api_path._client

    with patch.object(client, 'request', side_effect=dummy) as client_request, \
            patch.object(client, 'error_handler',
                         side_effect=dummy_error_handler) as error_handler:
        client.loop.run_until_complete(
            requests.Request(api_path, 'get',
                             _error_handling=_error_handler,
                             test=1, _test=2)
        )

        # `mock.called_with(...)` is not an assertion: it is an auto-created
        # child mock (always truthy) and is rejected outright by newer
        # versions of unittest.mock.  Inspect the recorded call instead.
        assert client_request.called
        _, called_kwargs = client_request.call_args
        expected_kwargs = {
            'method': 'get',
            'url': url,
            'skip_params': False,
            'test': 2,
            'params': {'test': 1},
        }
        # Only the expected keys are compared, so additional keyword
        # arguments passed by the library do not fail the test.
        for key, value in expected_kwargs.items():
            assert called_kwargs[key] == value

        assert error_handler.called is _error_handler
async def test_error_handler_service_unavailable():
    """Exercise ``utils.error_handler`` against repeated 503 responses.

    The wrapped coroutine raises a 503-based exception while the module
    level ``tries`` counter is positive and returns a plain response
    afterwards.

    The body awaits, so this has to be a coroutine function (a plain ``def``
    containing ``await`` does not compile).  It presumably needs an
    asyncio-aware test runner/marker defined outside this view - confirm.
    """
    global tries
    tries = 4

    async def service_unavailable(**kwargs):
        global tries
        tries -= 1

        if tries > 0:
            response = MockResponse(status=503)
            raise await exceptions.throw(response)
        else:
            return MockResponse()

    with pytest.raises(exceptions.ServiceUnavailable):
        # asyncio.sleep is replaced by `dummy` for the duration of the call.
        with patch.object(asyncio, 'sleep', side_effect=dummy) as sleep:
            await utils.error_handler(service_unavailable)()
            # NOTE(review): only reached if the call above does not raise.
            assert sleep.called

    await utils.error_handler(service_unavailable)()
async def test_streaming_apis(dummy_client):
    """Check which client method serves ``.api`` and ``.stream`` requests.

    For ``dummy_client``: ``api`` goes through ``request`` and ``stream``
    through ``stream_request``.  For a client created with
    ``streaming_apis={'api'}`` the two are swapped.

    Declared ``async`` because the body awaits requests; ``await`` inside a
    plain ``def`` is a syntax error.
    """
    with patch.object(dummy_client, 'request', side_effect=dummy) as request:
        await dummy_client.api.test.get()
        assert request.called

    with patch.object(dummy_client, 'stream_request') as request:
        dummy_client.stream.test.get()
        assert request.called

    client = BasePeonyClient("", "", streaming_apis={'api'})

    with patch.object(client, 'stream_request') as request:
        client.api.test.get()
        assert request.called

    with patch.object(client, 'request', side_effect=dummy) as request:
        await client.stream.test.get()
        assert request.called
async def test_request_proxy(dummy_client):
    """Check that the ``proxy`` argument is forwarded to ``session.request``.

    ``session.request`` is patched with a class whose constructor raises
    ``RuntimeError(proxy)``, so the proxy value can be read back from the
    exception message.

    Declared ``async`` because the body uses ``async with`` and ``await``,
    which are syntax errors inside a plain ``def``.
    """
    class RaiseProxy:
        """Raise the received ``proxy`` value as soon as it is instantiated."""

        def __init__(self, *args, proxy=None, **kwargs):
            raise RuntimeError(proxy)

    async with aiohttp.ClientSession() as session:
        with patch.object(session, 'request', side_effect=RaiseProxy):
            # NOTE(review): if no RuntimeError is raised the test passes
            # without checking anything.
            try:
                await dummy_client.request(method='get',
                                           url="http://hello.com",
                                           proxy="http://some.proxy.com",
                                           session=session,
                                           future=asyncio.Future())
            except RuntimeError as e:
                assert str(e) == "http://some.proxy.com"
async def test_chunked_upload_fail(dummy_peony_client, medias):
    """A chunked video upload must raise ``MediaProcessingError``.

    ``client.request`` is served by a ``DummyRequest`` and ``asyncio.sleep``
    is patched; after the failure the last sleep must have been requested
    with 5.

    Declared ``async`` because the body awaits; ``await`` inside a plain
    ``def`` is a syntax error.
    """
    media = medias['video']
    media_data = await media.download()

    chunk_size = 1024**2

    dummy_request = DummyRequest(dummy_peony_client, media, chunk_size, True)

    with patch.object(dummy_peony_client, 'request',
                      side_effect=dummy_request):
        with patch.object(asyncio, 'sleep', side_effect=dummy) as sleep:
            with pytest.raises(peony.exceptions.MediaProcessingError):
                await dummy_peony_client.upload_media(
                    media_data, chunk_size=chunk_size, chunked=True
                )

            sleep.assert_called_with(5)
async def test_upload_from_url(dummy_peony_client, medias, media_request, url):
    """Uploading from a URL downloads it and posts the content.

    The client's session ``get`` is replaced by a stub returning
    ``media_request``; the patched ``request`` asserts on the upload call and
    resolves the future it is given.

    Declared ``async`` because the body awaits; ``await`` inside a plain
    ``def`` is a syntax error.
    """
    async def dummy_get(get_url):
        assert get_url == url
        return media_request

    async def dummy_request(url, method, future, data=None, skip_params=None):
        assert url == dummy_peony_client.upload.media.upload.url()
        assert method.lower() == 'post'
        assert data['media'] == media_request.content
        assert skip_params
        future.set_result(None)

    with patch.object(dummy_peony_client, '_session') as session:
        session.get = dummy_get

        with patch.object(dummy_peony_client, 'request',
                          side_effect=dummy_request):
            await dummy_peony_client.upload_media(url)
async def test_stream_reconnection_disconnection(stream):
    """Check the messages yielded by the stream in the DISCONNECTION state.

    The first item announces the connection; afterwards odd turns carry the
    reconnection delay (growing linearly with the turn, checked against
    ``MAX_DISCONNECTION_TIMEOUT`` once exceeded) and even turns announce the
    stream restart.

    Declared ``async`` because ``async for`` is a syntax error inside a plain
    ``def``.
    """
    async def dummy(*args, **kwargs):
        pass

    turn = -1

    with patch.object(stream, '_connect', side_effect=response_disconnection):
        with patch.object(peony.stream.asyncio, 'sleep', side_effect=dummy):
            async for data in stream:
                assert stream._state == DISCONNECTION
                turn += 1

                if turn == 0:
                    assert data == {'connected': True}
                elif turn % 2 == 1:
                    timeout = DISCONNECTION_TIMEOUT * (turn + 1) / 2

                    # Stop as soon as the computed delay passes the maximum.
                    if timeout > MAX_DISCONNECTION_TIMEOUT:
                        actual = data['reconnecting_in']
                        assert actual == MAX_DISCONNECTION_TIMEOUT
                        break

                    assert data == {'reconnecting_in': timeout, 'error': None}
                else:
                    assert data == {'stream_restart': True}
async def test_stream_reconnection_reconnect(stream):
    """Check the messages yielded by the stream in the RECONNECTION state.

    The first item announces the connection; afterwards odd turns carry the
    reconnection delay (doubling every second turn, checked against
    ``MAX_RECONNECTION_TIMEOUT`` once exceeded) and even turns announce the
    stream restart.

    Declared ``async`` because ``async for`` is a syntax error inside a plain
    ``def``.
    """
    async def dummy(*args, **kwargs):
        pass

    turn = -1

    with patch.object(stream, '_connect', side_effect=response_reconnection):
        with patch.object(peony.stream.asyncio, 'sleep', side_effect=dummy):
            async for data in stream:
                assert stream._state == RECONNECTION
                turn += 1

                if turn == 0:
                    assert data == {'connected': True}
                elif turn % 2 == 1:
                    timeout = RECONNECTION_TIMEOUT * 2**(turn // 2)

                    # Stop as soon as the computed delay passes the maximum.
                    if timeout > MAX_RECONNECTION_TIMEOUT:
                        actual = data['reconnecting_in']
                        assert actual == MAX_RECONNECTION_TIMEOUT
                        break

                    assert data == {'reconnecting_in': timeout, 'error': None}
                else:
                    assert data == {'stream_restart': True}
async def test_stream_reconnection_enhance_your_calm(stream):
    """Check the messages yielded by the stream in the ENHANCE_YOUR_CALM state.

    The first item announces the connection; afterwards odd turns carry the
    reconnection delay (doubling every second turn) and even turns announce
    the stream restart.  The loop is cut off after 100 turns.

    Declared ``async`` because ``async for`` is a syntax error inside a plain
    ``def``.
    """
    async def dummy(*args, **kwargs):
        pass

    turn = -1

    with patch.object(stream, '_connect', side_effect=response_calm):
        with patch.object(peony.stream.asyncio, 'sleep', side_effect=dummy):
            async for data in stream:
                assert stream._state == ENHANCE_YOUR_CALM
                turn += 1

                if turn >= 100:
                    break

                if turn == 0:
                    assert data == {'connected': True}
                elif turn % 2 == 1:
                    timeout = ENHANCE_YOUR_CALM_TIMEOUT * 2**(turn // 2)
                    assert data == {'reconnecting_in': timeout, 'error': None}
                else:
                    assert data == {'stream_restart': True}
async def test_stream_cancel(event_loop):
    """Cancel a task that iterates over a stream forever.

    One task keeps running stream iterations while a second task cancels it
    after a short sleep; both are awaited under a one second timeout.

    Declared ``async`` because the body awaits; ``await`` inside a plain
    ``def`` is a syntax error.
    """
    async def cancel(task):
        await asyncio.sleep(0.001)
        task.cancel()

    async def test_stream_iterations(stream):
        while True:
            await test_stream_iteration(stream)

    # NOTE(review): the synchronous `with ClientSession(...)` form and
    # `aiohttp.Timeout` look specific to the aiohttp version this test
    # targets - confirm before upgrading aiohttp.
    with aiohttp.ClientSession(loop=event_loop) as session:
        client = peony.client.BasePeonyClient("", "", session=session)
        context = peony.stream.StreamResponse(method='GET',
                                              url="http://whatever.com",
                                              client=client)

        with context as stream:
            with patch.object(stream, '_connect',
                              side_effect=stream_content):
                coro = test_stream_iterations(stream)
                task = event_loop.create_task(coro)
                cancel_task = event_loop.create_task(cancel(task))

                with aiohttp.Timeout(1):
                    await asyncio.wait([task, cancel_task])
async def post(self, _data=None, _headers=None, **kwargs):
    """Stand-in for a token request: validate the input, return a fixed token.

    Increments ``self.count`` on every call.  When given, ``_data`` must
    encode to ``access_token=abc`` and ``_headers`` must carry the expected
    HTTP Basic ``Authorization`` value.

    Declared ``async`` because the body awaits ``asyncio.sleep``; ``await``
    inside a plain ``def`` is a syntax error.

    Returns:
        ``{'access_token': "abc"}``
    """
    self.count += 1

    if _data is not None:
        with patch.object(oauth.aiohttp.payload, 'BytesPayload',
                          side_effect=dummy_func):
            assert _data._gen_form_urlencoded() == b"access_token=abc"

    if _headers is not None:
        key = "1234567890:0987654321"
        auth = base64.b64encode(key.encode('utf-8')).decode('utf-8')
        assert _headers['Authorization'] == 'Basic ' + auth

    # This is needed to run `test_oauth2_concurrent_refreshes`
    # without that, refresh tasks would be executed sequentially
    # In a sense it is a simulation of a request being fetched
    await asyncio.sleep(0.001)
    return {'access_token': "abc"}
def test_full_program_user_serialization_with__no_passed_course(self):
    """
    Tests that full ProgramEnrollment serialization works as expected
    when user has no passed courses.
    """
    with patch.object(MMTrack, 'count_courses_passed', return_value=0):
        self.profile.refresh_from_db()
        enrollment = self.program_enrollment

        expected = dict(
            id=enrollment.program.id,
            enrollments=self.serialized_enrollments,
            grade_average=75,
            is_learner=True,
            num_courses_passed=0,
            total_courses=1,
        )
        actual = UserProgramSearchSerializer.serialize(enrollment)
        assert actual == expected
def test_create(self):
    """Integration that asserts routing a message to the create channel.

    Asserts response is correct and an object is created.
    """
    request = {
        'action': 'create',
        'pk': None,
        'request_id': 'client-request-id',
        'data': {'name': 'some-thing'},
    }
    message = self._build_message("testmodel", request)
    json_content = self._send_and_consume("websocket.receive", message)

    # it should create an object
    self.assertEqual(TestModel.objects.count(), 1)

    created = TestModel.objects.first()
    expected = {
        'action': 'create',
        'data': TestModelSerializer(created).data,
        'errors': [],
        'request_id': 'client-request-id',
        'response_status': 201,
    }

    # it should respond with the serializer.data
    self.assertEqual(json_content['payload'], expected)
def test_create_failure(self):
    """Integration that asserts error handling of a message to the create channel."""
    request = {
        'action': 'create',
        'pk': None,
        'request_id': 'client-request-id',
        'data': {},
    }
    message = self._build_message("testmodel", request)
    json_content = self._send_and_consume('websocket.receive', message)

    # it should not create an object
    self.assertEqual(TestModel.objects.count(), 0)

    expected = {
        'action': 'create',
        'data': None,
        'request_id': 'client-request-id',
        'errors': [{'name': ['This field is required.']}],
        'response_status': 400,
    }

    # it should respond with an error
    self.assertEqual(json_content['payload'], expected)
def test_bad_permission_reply(self):
    """A denied ``has_permission`` yields a 401 'Permission Denied' payload."""
    deny_all = Mock(return_value=False)

    with patch.object(TestModelResourceBinding, 'has_permission', deny_all):
        request = {
            'action': 'named_detail',
            'pk': 546,
            'data': {},
            'request_id': 'client-request-id',
        }
        message = self._build_message('testmodel', request)
        json_content = self._send_and_consume('websocket.receive', message)

        expected = {
            'action': 'named_detail',
            'errors': ['Permission Denied'],
            'data': None,
            'response_status': 401,
            'request_id': 'client-request-id',
        }
        self.assertEqual(json_content['payload'], expected)
        # The permission hook must actually have been consulted.
        self.assertTrue(deny_all.called)
def test_is_authenticated_permission(self):
    """With ``IsAuthenticated`` the same request is 401 before login, 200 after."""
    with patch.object(TestModelResourceBinding, 'permission_classes',
                      (IsAuthenticated,)):
        content = {
            'action': 'test_list',
            'pk': None,
            'data': {},
            'request_id': 'client-request-id',
        }

        def status_of_request():
            # Send `content` and return the response status of the payload.
            message = self._build_message('testmodel', content)
            reply = self._send_and_consume('websocket.receive', message)
            return reply['payload']['response_status']

        # It should block the request
        self.assertEqual(status_of_request(), 401)

        user = User.objects.create(username="testuser", password="123")
        self.client.force_login(user)
        self.client._session_cookie = True

        self.assertEqual(status_of_request(), 200)
def test_parent_inheritance(aiohttp_session):
    """A child api object takes session, loop and request kwargs from its parent."""
    parent_kwargs = {'arg1': 10, 'arg2': 20}
    own_kwargs = {'arg3': 30}

    parent = base.BaseApiObject(None,
                                request_kwargs=parent_kwargs,
                                client_session=aiohttp_session)
    child = base.BaseApiObject(parent, request_kwargs=own_kwargs)

    assert child.client_session == aiohttp_session
    assert child.loop == aiohttp_session.loop
    # Parent and child keyword arguments are combined.
    assert child.request_kwargs == {'arg1': 10, 'arg2': 20, 'arg3': 30}
def test_result_list(session):
    """Test the result object."""
    response = {
        "_metadata": {"count": 2, "next": None, "previous": None},
        "results": [
            {"attr1": "value1", "attr2": "value2"},
            {"attr1": "value3"},
        ],
    }
    obj = base.ResultList(parent=session, item_class=MockDataObject,
                          resp=response)

    run = session.loop.run_until_complete
    next_list = run(obj.get_next())
    previous_list = run(obj.get_previous())

    # Neither direction has a page to offer.
    assert obj.next is None
    assert obj.previous is None
    assert next_list is None
    assert previous_list is None

    assert len(obj) == 2
    found = sorted([item.attr1 for item in obj])
    assert found == sorted(["value1", "value3"])
def test_fetch_by_trigger(self):
    """``fetch_by_trigger`` requests ``event/<trigger id>`` with paging params."""
    client = Client(self.api_url)
    event_manager = EventManager(client)

    trigger_id = '1'
    trigger = Trigger(client, 'Name', ['tag'], ['target'], 0, 1,
                      id=trigger_id)

    with patch.object(client, 'get', return_value={'list': []}) as get_mock:
        event_manager.fetch_by_trigger(trigger)

    self.assertTrue(get_mock.called)
    expected_params = {'p': 0, 'size': MAX_FETCH_LIMIT}
    get_mock.assert_called_with('event/' + trigger_id, params=expected_params)
def test_fetch_by_id(self):
    """``fetch_by_id`` returns a trigger carrying the requested id.

    ``client.get`` answers with the state payload first and the trigger
    payload second.
    """
    client = Client(self.api_url)
    trigger_manager = TriggerManager(client)

    trigger_id = '1'
    state_payload = {'state': 'OK', 'trigger_id': trigger_id}
    trigger_payload = {
        'id': trigger_id,
        'name': 'trigger_name',
        'tags': ['tag'],
        'targets': ['pattern'],
        'warn_value': 0,
        'error_value': 1,
    }
    responses = [state_payload, trigger_payload]

    with patch.object(client, 'get', side_effect=responses) as get_mock:
        fetched = trigger_manager.fetch_by_id(trigger_id)

    self.assertTrue(get_mock.called)
    self.assertEqual(trigger_id, fetched.id)
def test_add_bad_response(self):
    """``add`` raises ``ResponseStructureError`` when the PUT reply is empty."""
    client = Client(self.api_url)
    contact_manager = ContactManager(client)
    contact_value = '#channel'

    put_patch = patch.object(client, 'put', return_value={})
    get_patch = patch.object(client, 'get', return_value={'contacts': []})
    with put_patch as put_mock, get_patch as get_mock:
        with self.assertRaises(ResponseStructureError):
            contact_manager.add(contact_value, CONTACT_SLACK)

    self.assertTrue(put_mock.called)
    self.assertTrue(get_mock.called)
    expected_request_data = {'value': contact_value, 'type': CONTACT_SLACK}
    put_mock.assert_called_with('contact', json=expected_request_data)
def test_get_invalid_response(self):
    """``Client.get`` raises ``InvalidJSONError`` for a ``FakeResponse`` reply.

    ``requests.get`` is patched to return a ``FakeResponse``; the call made
    to it is then checked for URL, headers and auth.
    """
    # The former `side_effects=get` argument was a misspelling of
    # `side_effect`: it only set an unused attribute on the mock, so it and
    # the helper it referenced were dead code.  `return_value` alone drives
    # the mock, exactly as before.
    response = FakeResponse()

    with patch.object(requests, 'get', return_value=response) as mock_get:
        test_path = 'test_path'
        client = Client(TEST_API_URL, AUTH_HEADERS)

        with self.assertRaises(InvalidJSONError):
            client.get(test_path)

    self.assertTrue(mock_get.called)
    expected_url_call = TEST_API_URL + '/' + test_path
    mock_get.assert_called_with(expected_url_call, headers=AUTH_HEADERS,
                                auth=None)
def test_put_invalid_response(self):
    """``Client.put`` raises ``InvalidJSONError`` for a ``FakeResponse`` reply.

    ``requests.put`` is patched to return a ``FakeResponse``; the call made
    to it is then checked for URL, data, headers and auth.
    """
    test_data = {'test': 'test'}

    # The former `side_effects=put` argument was a misspelling of
    # `side_effect`: it only set an unused attribute on the mock, so it and
    # the helper it referenced were dead code.  `return_value` alone drives
    # the mock, exactly as before.
    response = FakeResponse()

    with patch.object(requests, 'put', return_value=response) as mock_put:
        test_path = 'test_path'
        client = Client(TEST_API_URL, AUTH_HEADERS)

        with self.assertRaises(InvalidJSONError):
            client.put(test_path, data=test_data)

    self.assertTrue(mock_put.called)
    expected_url_call = TEST_API_URL + '/' + test_path
    mock_put.assert_called_with(expected_url_call, data=test_data,
                                headers=AUTH_HEADERS, auth=None)
def test_pprint_compact(self):
    """The display hook wraps printed lists according to the terminal width.

    ``subprocess.check_output`` is patched to report the terminal size and
    ``sys.stdout`` is captured in a ``StringIO``.
    """
    check_output = 'check_output'

    with patch('sys.stdout', new_callable=StringIO) as mock_stdout:
        # - test compact pprint-ing with 80x25 terminal
        with patch.object(pythonrc.subprocess, check_output,
                          return_value='25 80'):
            sys.displayhook(list(range(22)))
            self.assertIn('20, 21]', sys.stdout.getvalue())

            sys.displayhook(list(range(23)))
            self.assertIn('21,\n 22]', sys.stdout.getvalue())

        # - test compact pprint-ing with resized 100x25 terminal
        with patch.object(pythonrc.subprocess, check_output,
                          return_value='25 100'):
            sys.displayhook(list(range(23)))
            self.assertIn('21, 22]', sys.stdout.getvalue())
def test_completer(self):
    """Exercise the improved completer for each kind of completion.

    ``readline.get_line_buffer`` is patched to simulate the current line.
    """
    completer = self.pymp.improved_rlcompleter()
    rl = pythonrc.readline

    def line_buffer(text):
        # Patch readline so that the current input line reads `text`.
        return patch.object(rl, 'get_line_buffer', return_value=text)

    # - no leading characters
    with line_buffer('\t'):
        self.assertEqual(completer('\t', 0), '    ')

    # - keyword completion
    with line_buffer('imp\t'):
        self.assertEqual(completer('imp', 0), 'import ')

    # - module name completion
    with line_buffer('from '):
        candidates = ('this', 'threading')
        self.assertIn(completer('th', 0), candidates)
        self.assertIn(completer('th', 1), candidates)

    # - pathname completion
    with line_buffer('./p'):
        self.assertEqual(completer('./py', 0), './pythonrc.py')
def test_remove_all_report_tpr_success(caplog):
    """Removing all third-party-resource reports logs both removals."""
    fake_report = MagicMock()
    fake_report.remove.return_value = 0

    with patch('kubernetes.config.load_incluster_config',
               MagicMock(return_value=0)), \
            patch('kubernetes.client.ExtensionsV1beta1Api',
                  MagicMock(return_value=0)), \
            patch.object(third_party.ThirdPartyResourceType, 'create',
                         MagicMock(return_value=fake_report)):
        uninstall.remove_all_report()

        records = caplog.record_tuples
        node = os.getenv("NODE_NAME")
        # Index 2 of a record tuple is the log message.
        assert records[-1][2] == \
            '"Reconcilereport" for node "{}" removed.'.format(node)
        assert records[-3][2] == \
            '"Nodereport" for node "{}" removed.'.format(node)
def test_remove_all_report_crd_success(caplog):
    """Removing all custom-resource reports logs both removals."""
    fake_report = MagicMock()
    fake_report.remove.return_value = 0

    with patch('kubernetes.config.load_incluster_config',
               MagicMock(return_value=0)), \
            patch('kubernetes.client.ExtensionsV1beta1Api',
                  MagicMock(return_value=0)), \
            patch.object(custom_resource.CustomResourceDefinitionType,
                         'create', MagicMock(return_value=fake_report)):
        uninstall.remove_all_report()

        records = caplog.record_tuples
        node = os.getenv("NODE_NAME")
        # Index 2 of a record tuple is the log message.
        assert records[-1][2] == \
            '"cmk-reconcilereport" for node "{}" removed.'.format(node)
        assert records[-3][2] == \
            '"cmk-nodereport" for node "{}" removed.'.format(node)
def test_remove_report_tpr_success(caplog):
    """Removing one third-party-resource report logs its removal."""
    fake_tpr_report = MagicMock()
    fake_tpr_report.remove.return_value = 0

    with patch('kubernetes.config.load_incluster_config',
               MagicMock(return_value=0)), \
            patch('kubernetes.client.ExtensionsV1beta1Api',
                  MagicMock(return_value=0)), \
            patch.object(third_party.ThirdPartyResourceType, 'create',
                         MagicMock(return_value=fake_tpr_report)):
        uninstall.remove_report_tpr("NodeReport")

        last_message = caplog.record_tuples[-1][2]
        node = os.getenv("NODE_NAME")
        assert last_message == \
            '"NodeReport" for node "{}" removed.'.format(node)


# Remove success due to not existing report
def test_remove_report_tpr_success2(caplog):
    """A 'NotFound' API error while removing is logged, not fatal."""
    fake_http_resp = FakeHTTPResponse(500,
                                      '{"message":"fake message"}',
                                      '{"reason":"NotFound"}')
    fake_tpr_report = MagicMock()
    fake_tpr_report.remove.side_effect = \
        K8sApiException(http_resp=fake_http_resp)

    with patch('kubernetes.config.load_incluster_config',
               MagicMock(return_value=0)), \
            patch('kubernetes.client.ExtensionsV1beta1Api',
                  MagicMock(return_value=0)), \
            patch.object(third_party.ThirdPartyResourceType, 'create',
                         MagicMock(return_value=fake_tpr_report)):
        uninstall.remove_report_tpr("NodeReport")

        records = caplog.record_tuples
        node = os.getenv("NODE_NAME")
        # "does not exist" is logged first, "removed" last.
        assert records[-2][2] == \
            '"NodeReport" for node "{}" does not exist.'.format(node)
        assert records[-1][2] == \
            '"NodeReport" for node "{}" removed.'.format(node)
def test_remove_report_tpr_failure(caplog):
    """Any other API error while removing aborts with ``SystemExit``."""
    fake_http_resp = FakeHTTPResponse(500,
                                      '{"message":"fake message"}',
                                      '{"reason":"WrongReason"}')
    fake_tpr_report = MagicMock()
    fake_tpr_report.remove.side_effect = \
        K8sApiException(http_resp=fake_http_resp)

    with patch('kubernetes.config.load_incluster_config',
               MagicMock(return_value=0)), \
            patch('kubernetes.client.ExtensionsV1beta1Api',
                  MagicMock(return_value=0)), \
            patch.object(third_party.ThirdPartyResourceType, 'create',
                         MagicMock(return_value=fake_tpr_report)):
        with pytest.raises(SystemExit):
            uninstall.remove_report_tpr("NodeReport")

        exp_err = ('Aborting uninstall: Exception when removing third party '
                   'resource "NodeReport"')
        exp_log_err = get_expected_log_error(exp_err, fake_http_resp)
        # The abort message must be the last thing logged.
        assert caplog.record_tuples[-1][2] == exp_log_err
def test_remove_report_crd_success2(caplog):
    """A 'NotFound' API error while removing is logged, not fatal."""
    fake_http_resp = FakeHTTPResponse(500,
                                      '{"message":"fake message"}',
                                      '{"reason":"NotFound"}')
    fake_crd_report = MagicMock()
    fake_crd_report.remove.side_effect = \
        K8sApiException(http_resp=fake_http_resp)

    with patch('kubernetes.config.load_incluster_config',
               MagicMock(return_value=0)), \
            patch('kubernetes.client.ExtensionsV1beta1Api',
                  MagicMock(return_value=0)), \
            patch.object(custom_resource.CustomResourceDefinitionType,
                         'create', MagicMock(return_value=fake_crd_report)):
        uninstall.remove_report_crd("cmk-nodereport", ["cmk-nr"])

        records = caplog.record_tuples
        node = os.getenv("NODE_NAME")
        # "does not exist" is logged first, "removed" last.
        assert records[-2][2] == \
            '"cmk-nodereport" for node "{}" does not exist.'.format(node)
        assert records[-1][2] == \
            '"cmk-nodereport" for node "{}" removed.'.format(node)
def test_remove_report_crd_failure(caplog):
    """Any other API error while removing aborts with ``SystemExit``."""
    fake_http_resp = FakeHTTPResponse(500,
                                      '{"message":"fake message"}',
                                      '{"reason":"WrongReason"}')
    fake_crd_report = MagicMock()
    fake_crd_report.remove.side_effect = \
        K8sApiException(http_resp=fake_http_resp)

    with patch('kubernetes.config.load_incluster_config',
               MagicMock(return_value=0)), \
            patch('kubernetes.client.ExtensionsV1beta1Api',
                  MagicMock(return_value=0)), \
            patch.object(custom_resource.CustomResourceDefinitionType,
                         'create', MagicMock(return_value=fake_crd_report)):
        with pytest.raises(SystemExit):
            uninstall.remove_report_crd("cmk-nodereport", ["cmk-nr"])

        exp_err = ('Aborting uninstall: Exception when removing custom '
                   'resource definition "cmk-nodereport"')
        exp_log_err = get_expected_log_error(exp_err, fake_http_resp)
        # The abort message must be the last thing logged.
        assert caplog.record_tuples[-1][2] == exp_log_err
def mock_crypto_passthrough():
    """Replace the crypto primitives with pass-through stand-ins, then yield.

    While the generator is suspended at its ``yield``, encryption and
    decryption return their input unchanged, signing/verifying are stubbed
    and ``parsec.crypto.urandom`` returns predictable, counter-based bytes.
    Presumably wrapped as a fixture/context manager by a decorator outside
    this view - confirm.
    """
    calls = 0

    def mocked_urandom(size):
        # Deterministic replacement for urandom: the call counter,
        # zero-padded so that the result is `size` characters long.
        nonlocal calls
        calls += 1
        template = ('<dummy-key-{:0>%s}>' % (size - 12) if size > 16
                    else '{:0>%s}' % size)
        return template.format(calls).encode()

    def passthrough(_, txt):
        return txt

    def export_key(_, pwd):
        return ('<mock-exported-key with password %s>' % pwd).encode()

    with patch.object(RSAPublicKey, 'encrypt', new=passthrough), \
            patch.object(RSAPublicKey, 'verify',
                         new=lambda _, sign, txt: None), \
            patch.object(RSAPrivateKey, 'decrypt', new=passthrough), \
            patch.object(RSAPrivateKey, 'sign',
                         new=lambda _, txt: '<mock-signature>'), \
            patch.object(RSAPrivateKey, 'export', new=export_key), \
            patch.object(AESKey, 'encrypt', new=passthrough), \
            patch.object(AESKey, 'decrypt', new=passthrough), \
            patch('parsec.crypto.urandom', new=mocked_urandom), \
            patch('parsec.crypto.encrypt_with_password',
                  new=lambda p, s, t: t), \
            patch('parsec.crypto.decrypt_with_password',
                  new=lambda p, s, c: c):
        yield
def test_pype_use_parent_context_with_swallow(mock_run_pipeline):
    """pype swallowing error in child pipeline."""
    pype_args = {
        'name': 'pipe name',
        'pipeArg': 'argument here',
        'useParentContext': True,
        'skipParse': True,
        'raiseError': False,
    }
    context = Context({'pype': pype_args})

    step_logger = logging.getLogger('pypyr.steps.pype')
    with patch.object(step_logger, 'error') as mock_logger_error:
        pype.run_step(context)

    mock_run_pipeline.assert_called_once_with(
        pipeline_name='pipe name',
        pipeline_context_input='argument here',
        context=context,
        parse_input=False)

    # The child failure is logged rather than raised.
    mock_logger_error.assert_called_once_with(
        'Something went wrong pyping pipe name. RuntimeError: whoops')

# ------------------------ run_step --------------------------------------
def test_run_failure_step_group_swallows():
    """Failure step group runner swallows errors."""
    runner_logger = pypyr.log.logger.get_logger('pypyr.stepsrunner')

    with patch('pypyr.stepsrunner.run_step_group',
               side_effect=ContextError('arb error')) as mock_run_group, \
            patch.object(runner_logger, 'error') as mock_logger_error:
        pypyr.stepsrunner.run_failure_step_group({'pipe': 'val'}, Context())

    mock_logger_error.assert_any_call(
        "Failure handler also failed. Swallowing.")

    mock_run_group.assert_called_once_with(
        pipeline_definition={'pipe': 'val'},
        step_group_name='on_failure',
        context=Context())

# ------------------------- run_failure_step_group----------------------------#

# ------------------------- run_pipeline_step---------------------------------#
def test_get_current_version(self):
    """The current version is read from the first released heading."""
    changelog_lines = [
        "## Unreleased\n",
        "---\n",
        "\n",
        "### New\n",
        "\n",
        "### Fixes\n",
        "\n",
        "### Breaks\n",
        "\n",
        "\n",
        "## 0.3.2 - (2017-06-09)\n",
        "---\n",
    ]
    with patch.object(ChangelogUtils, 'get_changelog_data',
                      return_value=changelog_lines):
        version = ChangelogUtils().get_current_version()

    self.assertEqual(version, '0.3.2')
def test_get_changes(self):
    """Unreleased changes expose both a 'new' and a 'fix' entry."""
    changelog_lines = [
        "## Unreleased\n",
        "---\n",
        "\n",
        "### New\n",
        "* added feature x\n",
        "\n",
        "### Fixes\n",
        "* fixed bug 1\n",
        "\n",
        "### Breaks\n",
        "\n",
        "\n",
        "## 0.3.2 - (2017-06-09)\n",
        "---\n",
    ]
    with patch.object(ChangelogUtils, 'get_changelog_data',
                      return_value=changelog_lines):
        changes = ChangelogUtils().get_changes()

    self.assertIn('new', changes)
    self.assertIn('fix', changes)
def doctest_reset_del():
    """Test that resetting doesn't cause errors in __del__ methods.

    In [2]: class A(object):
       ...:     def __del__(self):
       ...:         print str("Hi")
       ...:

    In [3]: a = A()

    In [4]: get_ipython().reset()
    Hi

    In [5]: 1+1
    Out[5]: 2
    """

# For some tests, it will be handy to organize them in a class with a common
# setup that makes a temp file
def test_obj_del(self):
    """Test that object's __del__ methods are called on exit."""
    if sys.platform == 'win32':
        # The import only probes for pywin32; the module itself is unused.
        try:
            import win32api
        except ImportError:
            raise SkipTest("Test requires pywin32")

    # The embedded script must keep real indentation: with the method and
    # its body collapsed to a single leading space it does not compile.
    src = ("class A(object):\n"
           "    def __del__(self):\n"
           "        print 'object A deleted'\n"
           "a = A()\n")
    self.mktmp(py3compat.doctest_refactor_print(src))

    if dec.module_not_available('sqlite3'):
        err = 'WARNING: IPython History requires SQLite, your history will not be saved\n'
    else:
        err = None
    tt.ipexec_validate(self.fname, 'object A deleted', err)
def test_tclass(self):
    """Run the ``tclass`` script three times and check the deletion order.

    Each run's object is expected to be reported as deleted once the next
    run has started, the last one at the end.
    """
    here = os.path.dirname(__file__)
    tc = os.path.join(here, 'tclass')

    template = ("%%run '%s' C-first\n"
                "%%run '%s' C-second\n"
                "%%run '%s' C-third\n")
    self.mktmp(template % (tc, tc, tc), '.ipy')

    out = ("ARGV 1-: ['C-first']\n"
           "ARGV 1-: ['C-second']\n"
           "tclass.py: deleting object: C-first\n"
           "ARGV 1-: ['C-third']\n"
           "tclass.py: deleting object: C-second\n"
           "tclass.py: deleting object: C-third\n")

    err = None
    if dec.module_not_available('sqlite3'):
        err = 'WARNING: IPython History requires SQLite, your history will not be saved\n'
    tt.ipexec_validate(self.fname, out, err)
def test_get_content_ids__returns_cached_ids_if_enough_in_cache(self, mock_queryset):
    """With enough cached ids for a page, the queryset is never consulted."""
    stream = FollowedStream(user=self.user)
    stream.paginate_by = 1

    content_id = self.content1.id
    cached = [content_id], {content_id: content_id}

    # A separate local name keeps the `mock_queryset` argument untouched.
    with patch.object(stream, "get_queryset") as queryset_mock, \
            patch.object(stream, "get_cached_content_ids",
                         return_value=cached):
        stream.get_content_ids()
        self.assertEqual(queryset_mock.call_count, 0)
def test_get_content_ids__uses_cached_ids(self):
    """``get_content_ids`` asks the cache exactly once, without arguments."""
    empty_cache = ([], {})
    with patch.object(self.stream, "get_cached_content_ids",
                      return_value=empty_cache) as mock_cached:
        self.stream.get_content_ids()

    mock_cached.assert_called_once_with()