The following code examples, extracted from open-source Python projects, illustrate how to use unittest.mock.ANY.
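Before the project examples, a quick orientation: mock.ANY is a sentinel object whose equality check always succeeds, so it stands in for any argument you don't want to pin down in a call assertion, and it also matches inside expected dicts, lists, and call objects. A minimal self-contained sketch (the mock and argument names here are illustrative, not taken from any of the projects below):

from unittest import mock

m = mock.Mock()
m.send(b'payload', timestamp=1234567890.0)

# ANY absorbs the unpredictable timestamp; the payload is still checked exactly.
m.send.assert_called_once_with(b'payload', timestamp=mock.ANY)

# ANY also matches in ordinary equality comparisons, e.g. inside expected dicts.
assert {'id': 7, 'token': 'abc123'} == {'id': 7, 'token': mock.ANY}

The examples below apply the same idea at larger scale: they pin down the arguments each test cares about while letting ANY absorb handles, callbacks, file objects, and other values that are awkward to reproduce exactly.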
def test_iam_role_policy(resource_action, get_template, get_properties,
                         get_details, construct_policy, session,
                         attach_profile_to_role):
    """IAM Role Policy should match deployment type."""
    get_properties.return_value = {'type': 'ec2'}
    get_details.return_value.iam.return_value = {
        'group': 1,
        'policy': 2,
        'profile': 3,
        'role': 4,
        'user': 5,
    }

    assert create_iam_resources()

    get_template.assert_called_with(EC2_TEMPLATE_NAME)

    calls = [
        mock.call(
            mock.ANY,
            action='create_role',
            log_format=mock.ANY,
            RoleName=mock.ANY,
            AssumeRolePolicyDocument=get_template.return_value)
    ]
    resource_action.assert_has_calls(calls)

async def test_create_kernel_url():
    mock_resp = asynctest.MagicMock(spec=aiohttp.ClientResponse)
    mock_resp.status = 201
    mock_resp.json = asynctest.MagicMock()
    mock_req_obj = asynctest.MagicMock(spec=Request)
    mock_req_obj.asend.return_value = mock_resp

    with asynctest.patch('ai.backend.client.kernel.Request',
                         return_value=mock_req_obj) as mock_req_cls:
        await Kernel.get_or_create('python')

        mock_req_cls.assert_called_once_with('POST', '/kernel/create', mock.ANY)
        mock_req_obj.asend.assert_called_once_with()
        mock_req_obj.asend.return_value.json.assert_called_once_with()

def test_run_event_source(self, mock_aws_client):
    mock_aws_client.return_value.get_s3_file_list.return_value = [
        'test_file_1', 'test_file_2', 'test_file_3', 'test_file_4']
    args = Args()
    args.verbose = False
    args.event_source = 'test_bucket'
    Scar().run(args)
    self.assertEqual(mock_aws_client.call_count, 1)
    # check_function_name_not_exists
    mock_aws_client.mock_calls[1].assert_called_with('test-name', False)
    # get_s3_file_list
    mock_aws_client.mock_calls[2].assert_called_with('test_bucket')
    # launch_request_response_event
    mock_aws_client.mock_calls[3].assert_called_with(
        'test_file_1',
        {'Records': [{'eventSource': 'aws:s3',
                      's3': {'bucket': {'name': 'test_bucket'},
                             'object': {'key': ''}}}]},
        ANY, ANY)
    # launch_async_event
    mock_aws_client.mock_calls[4].assert_called_with(
        'test_file_2',
        {'Records': [{'eventSource': 'aws:s3',
                      's3': {'bucket': {'name': 'test_bucket'},
                             'object': {'key': ''}}}]},
        ANY, ANY)
    # launch_async_event
    mock_aws_client.mock_calls[5].assert_called_with(
        'test_file_3',
        {'Records': [{'eventSource': 'aws:s3',
                      's3': {'bucket': {'name': 'test_bucket'},
                             'object': {'key': ''}}}]},
        ANY, ANY)
    # launch_async_event
    mock_aws_client.mock_calls[6].assert_called_with(
        'test_file_4',
        {'Records': [{'eventSource': 'aws:s3',
                      's3': {'bucket': {'name': 'test_bucket'},
                             'object': {'key': ''}}}]},
        ANY, ANY)

def test_callback_with_exception(self):
    def callback():
        raise ValueError()

    self.loop = mock.Mock()
    self.loop.call_exception_handler = mock.Mock()

    h = asyncio.Handle(callback, (), self.loop)
    h._run()

    self.loop.call_exception_handler.assert_called_with({
        'message': test_utils.MockPattern('Exception in callback.*'),
        'exception': mock.ANY,
        'handle': h,
        'source_traceback': h._source_traceback,
    })

def test_rabbitmq_worker_callback_method_with_invalid_serialization_method(
        self, getLogger, import_string):
    assert isinstance(import_string, mock.MagicMock)
    # Arrange
    queue_name = "queue_name"
    rabbitmq_configs = {}
    serialization_method = "invalid"
    result_backend = {
        "result_backend_class": "easy_job.result_backends.dummy.DummyBackend",
    }
    logger = getLogger.return_value = mock.MagicMock()
    # Act
    rbt = RabbitMQWorker(queue_name=queue_name,
                         rabbitmq_configs=rabbitmq_configs,
                         serialization_method=serialization_method,
                         result_backend=result_backend,
                         logger="log")
    rbt.callback(mock.MagicMock(), mock.MagicMock(), None, "")
    # Assert
    logger.log.assert_called_once_with(logging.ERROR, mock.ANY)

def test_case_study_create_api_success(
        mock_create_case_study, supplier_case_study_end_to_end, sso_user,
        all_case_study_data, api_response_200):
    mock_create_case_study.return_value = api_response_200

    response = supplier_case_study_end_to_end()

    assert response.status_code == http.client.FOUND
    assert response.get('Location') == reverse('company-detail')
    data = {
        **all_case_study_data,
        'image_one': ANY,
        'image_two': ANY,
        'image_three': ANY,
    }
    # django converts uploaded files to UploadedFile, which makes
    # `assert_called_once_with` tricky.
    assert mock_create_case_study.call_count == 1
    assert mock_create_case_study.call_args == call(
        data=data,
        sso_session_id=sso_user.session_id,
    )

def test_case_study_update_api_success(
        mock_update_case_study, supplier_case_study_end_to_end, sso_user,
        all_case_study_data, api_response_200):
    mock_update_case_study.return_value = api_response_200

    response = supplier_case_study_end_to_end(case_study_id='1')

    assert response.status_code == http.client.FOUND
    assert response.get('Location') == reverse('company-detail')
    # django converts uploaded files to UploadedFile, which makes
    # `assert_called_once_with` tricky.
    data = {
        **all_case_study_data,
        'image_one': ANY,
        'image_two': ANY,
        'image_three': ANY,
    }
    mock_update_case_study.assert_called_once_with(
        data=data,
        case_study_id='1',
        sso_session_id=sso_user.session_id,
    )

def test_generate_endpoint_parser_noparam(addargument):
    """Generate a parser from endpoint metadata - no params"""
    name = 'put-stuff'
    metadata = {
        'path': 'stuff',
        'method': 'PUT',
        'help': "Changes stuff",
        'params': {},
    }
    parser = ArgumentParser()
    subparsers = parser.add_subparsers()
    generate_endpoint_parser(subparsers, name, metadata)
    addargument.assert_has_calls([
        # first helper for the main parser
        mock.call('-h', '--help', action='help', default=mock.ANY,
                  help=mock.ANY),
        # second helper for the 'put-stuff' subparser
        mock.call('-h', '--help', action='help', default=mock.ANY,
                  help=mock.ANY)
    ])

def test_execute_without_failures(self, check_mock, wait_mock):
    client_mock = self.aws_hook_mock.return_value.get_client_type.return_value
    client_mock.run_task.return_value = RESPONSE_WITHOUT_FAILURES

    self.ecs.execute(None)

    self.aws_hook_mock.return_value.get_client_type.assert_called_once_with(
        'ecs', region_name='eu-west-1')
    client_mock.run_task.assert_called_once_with(
        cluster='c',
        overrides={},
        startedBy=mock.ANY,  # Can be 'airflow' or 'Airflow'
        taskDefinition='t'
    )

    wait_mock.assert_called_once_with()
    check_mock.assert_called_once_with()
    self.assertEqual(
        self.ecs.arn,
        'arn:aws:ecs:us-east-1:012345678910:task/'
        'd8c67b3c-ac87-4ffe-a847-4785bc3a8b55')

def test_exec(self, gcs_hook, dataflow_mock):
    """Test DataFlowHook is created and the right args are passed to
    start_python_workflow.
    """
    start_python_hook = dataflow_mock.return_value.start_python_dataflow
    gcs_download_hook = gcs_hook.return_value.google_cloud_to_local
    self.dataflow.execute(None)
    self.assertTrue(dataflow_mock.called)
    expected_options = {
        'project': 'test',
        'staging_location': 'gs://test/staging',
        'output': 'gs://test/output'
    }
    gcs_download_hook.assert_called_once_with(PY_FILE)
    start_python_hook.assert_called_once_with(TASK_ID, expected_options,
                                              mock.ANY, PY_OPTIONS)
    self.assertTrue(self.dataflow.py_file.startswith('/tmp/dataflow'))

def test_exit_stack_exception_propagate():
    h1 = mock.MagicMock()
    h2 = mock.MagicMock()
    v1 = mock.MagicMock()
    v2 = mock.MagicMock()
    error = ValueError('FUUU')
    with pytest.raises(ValueError) as exc:
        with ExitStack() as stack:
            v = stack.enter_context(AutoClose(h1, v=v1))
            assert v is v1
            v = stack.enter_context(AutoClose(h2, v=v2))
            assert v is v2
            raise error
    assert exc.value is error
    h2.close.assert_called_once_with(ValueError, error, mock.ANY)
    h1.close.assert_called_once_with(ValueError, error, mock.ANY)

async def test_exit_stack_exception_propagate():
    h1 = mock.MagicMock()
    h2 = mock.MagicMock()
    v1 = mock.MagicMock()
    v2 = mock.MagicMock()
    error = ValueError('FUUU')
    with pytest.raises(ValueError) as exc:
        async with AsyncExitStack() as stack:
            v = await stack.enter_context(AutoClose(h1, v=v1))
            assert v is v1
            v = await stack.enter_context(AutoClose(h2, v=v2))
            assert v is v2
            raise error
    assert exc.value is error
    h2.close.assert_called_once_with(ValueError, error, mock.ANY)
    h1.close.assert_called_once_with(ValueError, error, mock.ANY)

def test_Region_sync_custom_image_already_synced(tmpdir):
    """Test Region.sync_custom performs create and handles when it's already synced."""
    region = make_Region()
    image_path = tmpdir.join("image.tar.gz")
    image_path.write(b"data")
    region.sync_custom("image", {
        "path": str(image_path),
        "architecture": "amd64/generic",
        "title": "My Title",
    })
    assert call(
        "custom/image", "amd64/generic", ANY,
        title="My Title",
        filetype=BootResourceFileType.TGZ,
        progress_callback=ANY) == region.origin.BootResources.create.call_args
    assert (
        call("custom/image already in sync", level=MessageLevel.SUCCESS) ==
        region.print_msg.call_args)

def test_ingest(self, mock_process_pool, mock_chunk_writer):
    # The next three lines mock the ProcessPoolExecutor and its map
    # function.
    executor_mock = mock.Mock()
    executor_mock.map.return_value = []
    mock_process_pool.return_value.__enter__.return_value = executor_mock

    self.gulp_ingestor.adapter.__len__.return_value = 2
    self.gulp_ingestor()
    mock_chunk_writer.assert_called_once_with(self.adapter)

    executor_mock.map.assert_called_once_with(
        mock_chunk_writer.return_value.write_chunk,
        mock.ANY,
        [slice(0, 1), slice(1, 2)],
    )

def test_swagger_minimal(app):
    """ Test a swagger config for a minimal setup. """
    app.config['OPENAPI_INFO_VERSION'] = '1.2.3'
    openapi = OpenAPI(app)
    assert openapi.swagger == {
        'swagger': '2.0',
        'info': {
            'title': 'test_swagger_minimal',
            'version': '1.2.3'
        },
        'paths': ANY,
        'schemes': ['http'],
    }

def test_swagger_full(app):
    """ Test a swagger config for a fully configured setup. """
    app.config.update(
        SERVER_NAME='api.example.com',
        OPENAPI_SHOW_HOST=True,
        OPENAPI_INFO_VERSION='1.2.3'
    )
    openapi = OpenAPI(app)
    assert openapi.swagger == {
        'swagger': '2.0',
        'info': {
            'title': 'test_swagger_full',
            'version': '1.2.3'
        },
        'paths': ANY,
        'host': 'api.example.com',
        'schemes': ['http']
    }

def testCwdWithRelativeScriptPath(self, isdirMock, existsMock, accessMock,
                                  subprocessMock):
    """
    If a step has a cwd set and its script is a relative path, the path
    of the script that is executed must be used as specified (not
    converted to an absolute path).
    """
    subprocessMock.return_value = ''
    sp = SlurmPipeline(
        {
            'steps': [
                {
                    'cwd': '/tmp',
                    'name': 'name1',
                    'script': 'script1',
                },
            ],
        })
    sp.schedule()
    subprocessMock.assert_has_calls([
        call(['script1'], cwd='/tmp', universal_newlines=True,
             stdin=DEVNULL, env=ANY),
    ])

def testForce(self, existsMock, accessMock, subprocessMock):
    """
    If force=True is given to SlurmPipeline, SP_FORCE must be set to '1'
    in the step execution environment.
    """
    subprocessMock.return_value = ''
    sp = SlurmPipeline(
        {
            'steps': [
                {
                    'name': 'name1',
                    'script': 'script1',
                },
            ],
        })
    sp.schedule(force=True)
    subprocessMock.assert_has_calls([
        call(['script1'], cwd='.', universal_newlines=True, stdin=DEVNULL,
             env=ANY),
    ])
    env = subprocessMock.mock_calls[0][2]['env']
    self.assertEqual('1', env['SP_FORCE'])

def testDefaultNice(self, existsMock, accessMock, subprocessMock):
    """
    If no nice value is given to schedule, SP_NICE_ARG must be set to
    '--nice' in the step execution environment.
    """
    subprocessMock.return_value = ''
    sp = SlurmPipeline(
        {
            'steps': [
                {
                    'name': 'name1',
                    'script': 'script1',
                },
            ],
        })
    sp.schedule()
    subprocessMock.assert_has_calls([
        call(['script1'], cwd='.', universal_newlines=True, stdin=DEVNULL,
             env=ANY),
    ])
    env = subprocessMock.mock_calls[0][2]['env']
    self.assertEqual('--nice', env['SP_NICE_ARG'])

def testSpecificNice(self, existsMock, accessMock, subprocessMock):
    """
    If a specific nice value is given to schedule, SP_NICE_ARG must be
    set to the expected value in the step execution environment.
    """
    subprocessMock.return_value = ''
    sp = SlurmPipeline(
        {
            'steps': [
                {
                    'name': 'name1',
                    'script': 'script1',
                },
            ],
        })
    sp.schedule(nice=40)
    subprocessMock.assert_has_calls([
        call(['script1'], cwd='.', universal_newlines=True, stdin=DEVNULL,
             env=ANY),
    ])
    env = subprocessMock.mock_calls[0][2]['env']
    self.assertEqual('--nice 40', env['SP_NICE_ARG'])

def test_simple(self):
    sdm = MagicMock()
    sdm.classical = {"A": 0, "B": 1}
    sdm.apply_ptm = MagicMock()

    c = circuit.Circuit()
    c.add_gate("hadamard", "A", time=0, conditional_bit="B")
    c.apply_to(sdm)

    sdm.apply_ptm.assert_called_once_with("A", ptm=ANY)
    sdm.ensure_classical.assert_called_once_with("B")

    sdm = MagicMock()
    sdm.classical = {"A": 0, "B": 0}
    sdm.hadamard = MagicMock()

    c.apply_to(sdm)

    sdm.apply_ptm.assert_not_called()
    sdm.ensure_classical.assert_called_once_with("B")

def test_temporarysshkey_remove_failure(self):
    """
    Verify TemporarySSHKey.remove reacts properly to failure.
    """
    mock_logger = mock.MagicMock(logging.Logger('test'))
    key = TemporarySSHKey(TEST_HOST_CREDS, mock_logger)
    key.create()
    with mock.patch('os.unlink') as _unlink:
        _unlink.side_effect = Exception
        self.assertTrue(os.path.isfile(key.path))
        key.remove()
        self.assertTrue(os.path.isfile(key.path))
        # We should have a warning in the log
        mock_logger.warn.assert_called_once_with(
            mock.ANY, mock.ANY, mock.ANY)
    # Clean up the file
    key.remove()

def test_render(self):
    """Test that the flow cell delete POST works"""
    # Check precondition
    self.assertEqual(FlowCell.objects.all().count(), 1)

    # Simulate the POST
    with self.login(self.user):
        response = self.client.post(
            reverse('flowcell_delete', kwargs={'pk': self.flow_cell.pk}))

    # Check resulting database state
    self.assertEqual(FlowCell.objects.all().count(), 0)

    # Check call to sending emails
    self.email_mock.assert_called_once_with(self.user, ANY)
    m1 = model_to_dict(self.arg_flowcell)
    del m1['id']
    m2 = model_to_dict(self.flow_cell)
    del m2['id']
    self.assertEqual(m1, m2)

    # Check resulting response
    with self.login(self.user):
        self.assertRedirects(
            response, reverse('flowcell_list'))

def test_create_reader_instances_with_filenames(self):
    import satpy.scene
    filenames = ["bla", "foo", "bar"]
    sensors = None
    reader_name = None
    with mock.patch('satpy.scene.Scene._compute_metadata_from_readers'):
        with mock.patch('satpy.scene.ReaderFinder') as findermock:
            scene = satpy.scene.Scene(filenames=filenames)
            findermock.assert_called_once_with(ppp_config_dir=mock.ANY,
                                               base_dir=None,
                                               area=None,
                                               end_time=None,
                                               start_time=None)
            findermock.return_value.assert_called_once_with(
                reader=reader_name,
                sensor=set(),
                filenames=filenames,
                reader_kwargs=None,
                metadata={})

def test_create_reader_instances_with_sensor(self):
    import satpy.scene
    sensors = ["bla", "foo", "bar"]
    filenames = None
    reader_name = None
    with mock.patch('satpy.scene.Scene._compute_metadata_from_readers'):
        with mock.patch('satpy.scene.ReaderFinder') as findermock:
            scene = satpy.scene.Scene(sensor=sensors)
            findermock.assert_called_once_with(ppp_config_dir=mock.ANY,
                                               base_dir=None,
                                               area=None,
                                               end_time=None,
                                               start_time=None)
            findermock.return_value.assert_called_once_with(
                reader=reader_name,
                sensor=sensors,
                filenames=filenames,
                reader_kwargs=None,
                metadata={})

def test_create_reader_instances_with_sensor_and_filenames(self):
    import satpy.scene
    sensors = ["bla", "foo", "bar"]
    filenames = ["1", "2", "3"]
    reader_name = None
    with mock.patch('satpy.scene.Scene._compute_metadata_from_readers'):
        with mock.patch('satpy.scene.ReaderFinder') as findermock:
            scene = satpy.scene.Scene(sensor=sensors, filenames=filenames)
            findermock.assert_called_once_with(ppp_config_dir=mock.ANY,
                                               base_dir=None,
                                               area=None,
                                               end_time=None,
                                               start_time=None)
            findermock.return_value.assert_called_once_with(
                reader=reader_name,
                sensor=sensors,
                filenames=filenames,
                reader_kwargs=None,
                metadata={})

def test_create_reader_instances_with_reader(self):
    from satpy.scene import Scene
    reader = "foo"
    filenames = ["1", "2", "3"]
    sensors = set()
    with mock.patch('satpy.scene.Scene._compute_metadata_from_readers'):
        with mock.patch('satpy.scene.ReaderFinder') as findermock:
            scene = Scene(reader=reader, filenames=filenames)
            findermock.assert_called_once_with(ppp_config_dir=mock.ANY,
                                               base_dir=None,
                                               area=None,
                                               end_time=None,
                                               start_time=None)
            findermock.return_value.assert_called_once_with(
                reader=reader,
                sensor=sensors,
                filenames=filenames,
                reader_kwargs=None,
                metadata={})

def test_create_cluster_with_valid_network(self):
    """
    Verify create_cluster uses valid networks as expected.
    """
    bus = mock.MagicMock()
    cluster = Cluster.new(name='test', network='test')
    # The cluster doesn't exist yet
    bus.storage.get_cluster.side_effect = Exception
    # Network response
    bus.storage.get_network.return_value = Network.new(name='test')
    # Creation of the cluster
    bus.storage.save.return_value = cluster

    # Call the handler...
    clusters.create_cluster.handler(
        copy.deepcopy(NETWORK_CLUSTER_REQUEST), bus)
    bus.storage.save.assert_called_with(mock.ANY)

def test_create_cluster_with_invalid_network(self):
    """
    Verify create_cluster reacts to invalid networks as expected.
    """
    bus = mock.MagicMock()
    cluster = Cluster.new(name='test', network='test')
    # The cluster doesn't exist yet
    bus.storage.get_cluster.side_effect = Exception
    # The network doesn't exist
    bus.storage.get_network.side_effect = Exception
    # The cluster creation
    bus.storage.save.return_value = cluster

    # Call the handler...
    clusters.create_cluster.handler(
        copy.deepcopy(NETWORK_CLUSTER_REQUEST), bus)

    # Update clusters network to be 'default' as we expect 'test' to be
    # rejected by the handler
    cluster.network = 'default'
    bus.storage.save.assert_called_with(mock.ANY)

def test_delete_cluster_member_with_container_manager(self):
    """
    Verify that delete_cluster_member handles a container manager
    """
    bus = mock.MagicMock()
    cluster = Cluster.new(
        name='test',
        hostset=['127.0.0.1'],
        container_manager=C.CONTAINER_MANAGER_OPENSHIFT)
    bus.storage.get_cluster.return_value = cluster
    bus.storage.save.return_value = None
    self.assertEquals(
        create_jsonrpc_response(ID, []),
        clusters.delete_cluster_member.handler(CHECK_CLUSTER_REQUEST, bus))
    # Verify we had a 'container.remove_node'
    bus.request.assert_called_with('container.remove_node', params=mock.ANY)

def test_authentication_manager_multi_complex_deny(self):
    """
    Verify AuthenticationManager handles the complex forbidden case
    with multiple authenticators.
    """
    start_response = mock.MagicMock()
    response_code = '402 Payment Required'
    expected_result = [bytes('$$$', 'utf8')]

    def complex_auth(environ, start_response):
        start_response(response_code, [])
        return expected_result

    self.authentication_manager.authenticators = [
        mock.MagicMock(authenticate=mock.MagicMock(return_value=False)),
        mock.MagicMock(authenticate=complex_auth),
        mock.MagicMock(authenticate=mock.MagicMock(return_value=False)),
    ]
    result = self.authentication_manager(create_environ(), start_response)
    self.assertEquals(expected_result, result)
    start_response.assert_called_once_with(response_code, mock.ANY)

def test_authentication_manager_multi_complex_allow(self):
    """
    Verify AuthenticationManager handles the complex allow case
    with multiple authenticators.
    """
    expected_result = [bytes('itrustyou', 'utf8')]

    def complex_auth(environ, start_response):
        start_response('200 OK', [])
        return expected_result

    start_response = mock.MagicMock()
    self.authentication_manager.authenticators = [
        mock.MagicMock(authenticate=mock.MagicMock(return_value=False)),
        mock.MagicMock(authenticate=complex_auth),
        mock.MagicMock(authenticate=mock.MagicMock(return_value=False)),
    ]
    result = self.authentication_manager(create_environ(), start_response)
    self.assertEquals(expected_result, result)
    start_response.assert_called_once_with('200 OK', mock.ANY)

async def test_lock_one_retry(self, lock_manager_redis_patched, locked_lock):
    lock_manager, redis = lock_manager_redis_patched
    redis.set_lock = CoroutineMock(side_effect=[
        (False, 1),
        (True, 1)
    ])

    lock = await lock_manager.lock('resource')

    calls = [
        call('resource', ANY),
        call('resource', ANY)
    ]
    redis.set_lock.assert_has_calls(calls)
    assert lock.resource == 'resource'
    assert lock.id == ANY
    assert lock.valid is True

async def test_lock_expire_retries(self, lock_manager_redis_patched,
                                   locked_lock):
    lock_manager, redis = lock_manager_redis_patched
    redis.set_lock = CoroutineMock(side_effect=[
        (False, 1),
        (False, 1),
        (False, 1)
    ])

    lock = await lock_manager.lock('resource')

    calls = [
        call('resource', ANY),
        call('resource', ANY),
        call('resource', ANY)
    ]
    redis.set_lock.assert_has_calls(calls)
    assert lock.resource == 'resource'
    assert lock.id == ANY
    assert lock.valid is False

async def test_lock_one_timeout(self, lock_manager_redis_patched,
                                locked_lock):
    lock_manager, redis = lock_manager_redis_patched
    redis.set_lock = CoroutineMock(side_effect=[
        (True, 1500),
        (True, 1)
    ])

    lock = await lock_manager.lock('resource')

    calls = [
        call('resource', ANY),
        call('resource', ANY)
    ]
    redis.set_lock.assert_has_calls(calls)
    assert lock.resource == 'resource'
    assert lock.id == ANY
    assert lock.valid is True

async def test_lock_expire_retries_for_timeouts(self,
                                                lock_manager_redis_patched,
                                                locked_lock):
    lock_manager, redis = lock_manager_redis_patched
    redis.set_lock = CoroutineMock(side_effect=[
        (True, 1100),
        (True, 1001),
        (True, 2000)
    ])

    lock = await lock_manager.lock('resource')

    calls = [
        call('resource', ANY),
        call('resource', ANY),
        call('resource', ANY)
    ]
    redis.set_lock.assert_has_calls(calls)
    assert lock.resource == 'resource'
    assert lock.id == ANY
    assert lock.valid is False

def test_000_simple(self, mock_backup, mock_getpass, mock_input):
    mock_getpass.return_value = 'testpass'
    mock_input.return_value = 'Y'
    vm1 = BackupVM()
    vm1.name = 'test-vm'
    vm1.backup_path = 'path/in/backup'
    vm1.template = None
    vm1.klass = 'StandaloneVM'
    vm1.label = 'red'
    mock_restore_info = {
        1: BackupRestore.VMToRestore(vm1),
    }
    mock_backup.configure_mock(**{
        'return_value.get_restore_summary.return_value': '',
        'return_value.get_restore_info.return_value': mock_restore_info,
    })
    with mock.patch('qubesadmin.tools.qvm_backup_restore.handle_broken') \
            as mock_handle_broken:
        qubesadmin.tools.qvm_backup_restore.main(['/some/path'],
                                                 app=self.app)
        mock_handle_broken.assert_called_once_with(
            self.app, mock.ANY, mock_restore_info)
    mock_backup.assert_called_once_with(
        self.app, '/some/path', None, 'testpass')
    self.assertAllCalled()

def test_search_found(self, mock_requests_get):
    search_query = 'test'
    search_results = MalSearchResponseBuilder()
    search_results.add_result({'title': search_query})
    mock_requests_get.return_value = mock.Mock(
        status_code=200,
        text=search_results.get_response_xml()
    )

    results = self.mal.search(search_query)

    mock_requests_get.assert_called_with(
        ANY,
        params=dict(q=search_query),
        auth=ANY,
        headers=ANY
    )
    self.assertTrue(len(results) == 1)
    first_result = results[0]
    self.assertTrue(first_result['title'] == search_query)

def test_search_found_more_than_one(self, mock_requests_get):
    search_query = 'test'
    search_results = MalSearchResponseBuilder()
    search_results.add_result({'title': 'test1'})
    search_results.add_result({'title': 'test2'})
    mock_requests_get.return_value = mock.Mock(
        status_code=200,
        text=search_results.get_response_xml()
    )

    results = self.mal.search(search_query)

    mock_requests_get.assert_called_with(
        ANY,
        params=dict(q=search_query),
        auth=ANY,
        headers=ANY
    )
    self.assertTrue(len(results) > 1)
    for result in results:
        self.assertTrue(search_query in result['title'])

def test_update_post(self, mock_requests_post):
    item_id = 1
    entry = {'episode': 10}
    expected_xml = '<entry><episode>10</episode></entry>'
    expected_response_code = 200
    xml_header = '<?xml version="1.0" encoding="UTF-8"?>'
    mock_requests_post.return_value = mock.Mock(
        status_code=expected_response_code
    )

    result = self.mal.update(item_id, entry)

    mock_requests_post.assert_called_with(
        'https://myanimelist.net/api/animelist/update/{0}.xml'.format(
            item_id),
        data={'data': xml_header + expected_xml},
        auth=(MOCK_USER, MOCK_PASS),
        headers=ANY
    )
    self.assertTrue(result == expected_response_code)

async def test_exception_non_waited_job(make_scheduler, loop):
    exc_handler = mock.Mock()
    scheduler = await make_scheduler(exception_handler=exc_handler)
    exc = RuntimeError()

    async def coro():
        await asyncio.sleep(0, loop=loop)
        raise exc

    await scheduler.spawn(coro())
    assert len(scheduler) == 1

    await asyncio.sleep(0.05, loop=loop)
    assert len(scheduler) == 0

    expect = {'exception': exc,
              'job': mock.ANY,
              'message': 'Job processing failed'}
    if loop.get_debug():
        expect['source_traceback'] = mock.ANY
    exc_handler.assert_called_with(scheduler, expect)

async def test_exception_on_close(make_scheduler, loop):
    exc_handler = mock.Mock()
    scheduler = await make_scheduler(exception_handler=exc_handler)
    exc = RuntimeError()
    fut = asyncio.Future()

    async def coro():
        fut.set_result(None)
        raise exc

    await scheduler.spawn(coro())
    assert len(scheduler) == 1

    await scheduler.close()
    assert len(scheduler) == 0

    expect = {'exception': exc,
              'job': mock.ANY,
              'message': 'Job processing failed'}
    if loop.get_debug():
        expect['source_traceback'] = mock.ANY
    exc_handler.assert_called_with(scheduler, expect)

async def test_timeout_on_closing(make_scheduler, loop):
    exc_handler = mock.Mock()
    scheduler = await make_scheduler(exception_handler=exc_handler,
                                     close_timeout=0.01)
    fut1 = asyncio.Future()
    fut2 = asyncio.Future()

    async def coro():
        try:
            await fut1
        except asyncio.CancelledError:
            await fut2

    job = await scheduler.spawn(coro())
    await asyncio.sleep(0.001, loop=loop)
    await scheduler.close()
    assert job.closed
    assert fut1.cancelled()
    expect = {'message': 'Job closing timed out',
              'job': job,
              'exception': mock.ANY}
    if loop.get_debug():
        expect['source_traceback'] = mock.ANY
    exc_handler.assert_called_with(scheduler, expect)

async def test_exception_on_closing(make_scheduler, loop):
    exc_handler = mock.Mock()
    scheduler = await make_scheduler(exception_handler=exc_handler)
    fut = asyncio.Future()
    exc = RuntimeError()

    async def coro():
        fut.set_result(None)
        raise exc

    job = await scheduler.spawn(coro())
    await fut
    await scheduler.close()
    assert job.closed
    expect = {'message': 'Job processing failed',
              'job': job,
              'exception': exc}
    if loop.get_debug():
        expect['source_traceback'] = mock.ANY
    exc_handler.assert_called_with(scheduler, expect)

def test_serve_main_app_app_instance(tmpworkdir, loop, mocker):
    mktree(tmpworkdir, {
        'app.py': """\
from aiohttp import web

async def hello(request):
    return web.Response(text='<h1>hello world</h1>', content_type='text/html')

app = web.Application()
app.router.add_get('/', hello)
"""
    })
    asyncio.set_event_loop(loop)
    mocker.spy(loop, 'create_server')
    mock_modify_main_app = mocker.patch(
        'aiohttp_devtools.runserver.serve.modify_main_app')
    loop.call_later(0.5, loop.stop)
    config = Config(app_path='app.py')
    serve_main_app(config, '/dev/tty')
    assert loop.is_closed()
    loop.create_server.assert_called_with(mock.ANY, '0.0.0.0', 8000,
                                          backlog=128)
    mock_modify_main_app.assert_called_with(mock.ANY, config)

def test_data_preserves_translated_strings(db_connection):
    data.add_language('polish', 'pl')
    resource_pk = data.add_resource('r').pk
    string_pk = data.add_or_update_base_string(
        resource_pk, 'x', comment='comment', context='ctx')
    data.set_translated_string('pl', string_pk, translation='y',
                               translator_comment='tcomment')
    preserved_strings = list(data.get_translated_strings('pl', resource_pk))
    assert preserved_strings == [
        dila.application.structures.TranslatedStringData(
            pk=mock.ANY,
            base_string='x',
            plural='',
            context='ctx',
            translation='y',
            comment='comment',
            translator_comment='tcomment',
            resource_pk=resource_pk,
            plural_translations=None,
        )]

def test_adding_the_same_sting(db_connection):
    data.add_language('polish', 'pl')
    resource_pk = data.add_resource('r').pk
    data.add_or_update_base_string(resource_pk, 'x', comment='comment',
                                   context='ctx')
    data.add_or_update_base_string(resource_pk, 'x', comment='lolz',
                                   context='ctx')
    preserved_strings = list(data.get_translated_strings('pl', resource_pk))
    assert preserved_strings == [
        dila.application.structures.TranslatedStringData(
            pk=mock.ANY,
            base_string='x',
            plural='',
            context='ctx',
            translation='',
            comment='lolz',
            translator_comment='',
            resource_pk=resource_pk,
            plural_translations=None,
        )]