我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用testfixtures.LogCapture()。
def test_missing_variable(self):
    """Test if ``WriteTensorBoard`` handles missing image variables as expected."""
    bad_epoch_data = {'valid': {}}
    # cv2 is stubbed out so the hook can be built without OpenCV installed.
    with mock.patch.dict('sys.modules', **{'cv2': cv2_mock}):
        # test ignore: a missing variable must produce no log output
        hook = WriteTensorBoard(output_dir=self.tmpdir, model=self.get_model(),
                                image_variables=['plot'], on_missing_variable='ignore')
        with LogCapture(level=logging.INFO) as log_capture:
            hook.after_epoch(42, bad_epoch_data)
        log_capture.check()
        # test warn: a single WARNING record is expected
        warn_hook = WriteTensorBoard(output_dir=self.tmpdir, model=self.get_model(),
                                     image_variables=['plot'], on_missing_variable='warn')
        with LogCapture(level=logging.INFO) as log_capture2:
            warn_hook.after_epoch(42, bad_epoch_data)
        log_capture2.check(('root', 'WARNING', '`plot` not found in epoch data.'))
        # test error: the hook must raise instead of logging
        raise_hook = WriteTensorBoard(output_dir=self.tmpdir, model=self.get_model(),
                                      image_variables=['plot'], on_missing_variable='error')
        with self.assertRaises(KeyError):
            raise_hook.after_epoch(42, bad_epoch_data)
def test_two_social(self):
    """
    get_social_username should return None if there are two social edX
    accounts for a user
    """
    # Creating a second social auth for the same user makes get() ambiguous.
    UserSocialAuthFactory.create(user=self.user, uid='other name')
    with LogCapture() as log_capture:
        assert get_social_username(self.user) is None
        # The ambiguity must be logged rather than raised to the caller.
        log_capture.check(
            (
                'profiles.api',
                'ERROR',
                'Unexpected error retrieving social auth username: get() returned more than '
                'one UserSocialAuth -- it returned 2!'
            )
        )
def test__store_indices_db(self, mock_db, mock_event, mock_pd, mock_classify,
                           mock_coOc, mock_pre_process, mock_config):
    """_store_indices_db should append, flush the buffer to the db and clear it.

    The buffer mock reports a length of 40001, i.e. past the flush
    threshold, so the worker is expected to store and clear it.
    """
    mock_index = Mock(return_value=True)
    mock_indice = MagicMock()
    mock_indice.return_value = True
    mock_indice.__len__.return_value = 40001
    mock_config.return_value = 0
    mock_db.store_indices.return_value = False
    w = Workers()
    # Imported locally: importing testfixtures globally bugs other fixtures.
    from testfixtures import LogCapture
    with LogCapture() as l:
        w._store_indices_db(mock_index, mock_indice)
    # The original asserted `l.__sizeof__() > 0`, which is vacuously true for
    # any Python object; assert that something was actually logged instead.
    assert len(l.records) > 0
    assert mock_indice.append.called
    assert mock_db.store_indices.called
    assert mock_indice.clear.called
def test__store_info_db(self, mock_event, mock_pd, mock_classify, mock_coOc,
                        mock_pre_process, mock_config):
    """_store_info_db should append the item, store via the util func and clear."""
    mock_config.return_value = 0
    mock_digests = Mock()
    mock_itm = Mock()
    mock_list = MagicMock()
    mock_list.__len__.return_value = 1
    mock_item = [mock_itm, mock_list]
    util_func = Mock()
    # The worker reads __name__ for logging, so the mock needs one.
    util_func.__name__ = 'mocked_util_func'
    w = Workers()
    # Imported locally: importing testfixtures globally bugs other fixtures.
    from testfixtures import LogCapture
    with LogCapture() as l:
        w._store_info_db(mock_digests, mock_item, util_func)
    # The original asserted `l.__sizeof__() > 0`, which is vacuously true for
    # any Python object; assert that something was actually logged instead.
    assert len(l.records) > 0
    assert mock_list.append.called
    assert mock_list.clear.called
def test_bad_message_is_logged(self, config, trivial_message):
    """Both malformed messages and raising callbacks must be logged, not raised."""
    responses.add(responses.POST, "https://api.github.com/repos/tdsmith/test_repo/hooks")
    repo_listener = snooze.RepositoryListener(
        events=snooze.LISTEN_EVENTS, **config["tdsmith/test_repo"])
    sqs = boto3.resource("sqs", region_name="us-west-2")
    sqs_queue = list(sqs.queues.all())[0]
    # Non-JSON payload: poll() should log an ERROR instead of crashing.
    sqs_queue.send_message(MessageBody="this isn't a json message at all")
    with LogCapture() as l:
        repo_listener.poll()
        assert 'ERROR' in str(l)

    def my_callback(event, message):
        raise ValueError("I object!")

    # A callback that raises: the exception text should end up in the log.
    sqs_queue.send_message(MessageBody=trivial_message)
    repo_listener.register_callback(my_callback)
    with LogCapture() as l:
        repo_listener.poll()
        assert 'I object!' in str(l)
def test_upload_progress_logging(self, mock_getsize, mock_files):
    """Uploading 20 objects should emit start, 10/20, 20/20 and completion logs."""
    # 20 files of 20 bytes each.
    mock_files.return_value = {
        'file%s' % i: 20
        for i in range(20)
    }
    mock_getsize.return_value = 20
    s3_p = S3Path('s3://bucket')
    with LogCapture('stor.s3.progress') as progress_log:
        s3_p.upload(['upload'])
        progress_log.check(
            ('stor.s3.progress', 'INFO', 'starting upload of 20 objects'),  # nopep8
            ('stor.s3.progress', 'INFO', '10/20\t0:00:00\t0.00 MB\t0.00 MB/s'),  # nopep8
            ('stor.s3.progress', 'INFO', '20/20\t0:00:00\t0.00 MB\t0.00 MB/s'),  # nopep8
            ('stor.s3.progress', 'INFO', 'upload complete - 20/20\t0:00:00\t0.00 MB\t0.00 MB/s'),  # nopep8
        )
def test_download_progress_logging(self, mock_list, mock_getsize, mock_make_dest_dir):
    """Downloading 20 objects should emit start, 10/20, 20/20 and completion logs."""
    # 19 files plus one directory marker = 20 listed objects.
    mock_list.return_value = [
        S3Path('s3://bucket/file%s' % i)
        for i in range(19)
    ] + [S3Path('s3://bucket/dir')]
    mock_getsize.return_value = 100
    s3_p = S3Path('s3://bucket')
    with LogCapture('stor.s3.progress') as progress_log:
        s3_p.download('output_dir')
        progress_log.check(
            ('stor.s3.progress', 'INFO', 'starting download of 20 objects'),  # nopep8
            ('stor.s3.progress', 'INFO', '10/20\t0:00:00\t0.00 MB\t0.00 MB/s'),  # nopep8
            ('stor.s3.progress', 'INFO', '20/20\t0:00:00\t0.00 MB\t0.00 MB/s'),  # nopep8
            ('stor.s3.progress', 'INFO', 'download complete - 20/20\t0:00:00\t0.00 MB\t0.00 MB/s'),  # nopep8
        )
def test_progress_logging(self):
    """Swift downloads should log progress; unknown actions must be ignored."""
    self.mock_swift.download.return_value = [
        {
            'action': 'download_object',
            'read_length': 100
        }
        for i in range(20)
    ]
    # An unrecognized action must not affect the progress counters.
    self.mock_swift.download.return_value.append({'action': 'random_action'})

    swift_p = SwiftPath('swift://tenant/container')
    with LogCapture('stor.swift.progress') as progress_log:
        swift_p.download('output_dir')
        progress_log.check(
            ('stor.swift.progress', 'INFO', 'starting download'),
            ('stor.swift.progress', 'INFO', '10\t0:00:00\t0.00 MB\t0.00 MB/s'),  # nopep8
            ('stor.swift.progress', 'INFO', '20\t0:00:00\t0.00 MB\t0.00 MB/s'),  # nopep8
            ('stor.swift.progress', 'INFO', 'download complete - 20\t0:00:00\t0.00 MB\t0.00 MB/s'),  # nopep8
        )
def test_complex_signature_py2(self):
    """trace_call must log a mixed positional/keyword/star-args call faithfully."""
    with LogCapture() as l:
        @trace_call(self.logger)
        def foo(a, b, c, d, e, g='G', h='H', i='ii', j='jj', *varargs_, **varkwargs_):
            pass

        foo('a', 'b', *['c', 'd'], e='E', Z='Z', **{'g': 'g', 'h': 'h'})
        # The logged line shows bound values, leftover kwargs and the
        # inspect-module sentinel for the unused *varargs_.
        l.check(
            (
                'test.v0_1.test_base',
                'DEBUG',
                "calling %sfoo(a='a', b='b', c='c', d='d', e='E', "
                "g='g', h='h', varkwargs_={'Z': 'Z'}, "
                "i='ii', j='jj', varargs_=<class '%s._empty'>)" % (self._get_prefix(), INSPECT_MODULE_NAME)
            ),
        )
def test_complex_signature_py3(self):
    """Same as the py2 variant but with keyword-only arguments (py3 syntax)."""
    if six.PY2:
        raise SkipTest()
    with LogCapture() as l:
        # without this exec Python 2 and pyflakes complain about syntax errors etc
        exec (
            """@trace_call(self.logger)
def foo(a, b, c, d, e, *varargs_, f=None, g='G', h='H', i='ii', j='jj', **varkwargs_: None): pass
foo('a', 'b', *['c', 'd'], e='E', f='F', Z='Z', **{'g':'g', 'h':'h'})
""",
            locals(), globals()
        )
        l.check(
            (
                'test.v0_1.test_base',
                'DEBUG',
                "calling foo(a='a', b='b', c='c', d='d', e='E', f='F', "
                "g='g', h='h', varkwargs_={'Z': 'Z'}, varargs_=<class '%s._empty'>, "
                "i='ii', j='jj')" % (INSPECT_MODULE_NAME,)
                # prefix does not work because of the eval, inspect module is for pypy3
            ),
        )
def test_disable_trace(self):
    """Methods marked @disable_trace must produce no log output, even in subclasses."""
    @six.add_metaclass(TraceAllPublicCallsMeta)
    class Ala(object):
        @disable_trace
        def bar(self, a, b, c=None):
            return True

        def __repr__(self):
            return '<%s object>' % (self.__class__.__name__,)

    class Bela(Ala):
        # Overriding a disabled method: the disable marker should carry over.
        def bar(self, a, b, c=None):
            return False

    with LogCapture() as l:
        a = Ala()
        a.bar(1, 2, 3)
        a.bar(1, b=2)
        b = Bela()
        b.bar(1, 2, 3)
        b.bar(1, b=2)
        # No calls may have been traced.
        l.check()
def check_api_error():
    """Exercise the CLI debug levels (0/1/2) against a known-bad account id."""
    runner = CliRunner()
    # --debug 0: critical only, so an unknown account logs nothing.
    with LogCapture() as l:
        result = runner.invoke(cli.cli, ['--debug', '0', '-c', '0', 'Account',
                                         'GetByID', '--account_id', '1000000'])
        l.check()
    print("Critical setting working.")
    # --debug 1: the API error is logged.
    with LogCapture() as l:
        result = runner.invoke(cli.cli, ['--debug', '1', '-c', '0', 'Account',
                                         'GetByID', '--account_id', '1000000'])
        l.check(('element.cli.cli', "ERROR", "xUnknownAccount"))
    print("Error setting working.")
    # --debug 2: the request parameters are logged as INFO before the error.
    with LogCapture() as l:
        result = runner.invoke(cli.cli, ['--debug', '2', '-c', '0', 'Account',
                                         'GetByID', '--account_id', '1000000'])
        l.check(
            ('element.cli.cli', "INFO", "account_id = 1000000;"),
            ('element.cli.cli', "ERROR", "xUnknownAccount")
        )
    print("Info setting is working.")
def test_natural_key_exception():
    """
    Tests the get_by_natural_key method for a ContextFilter that
    doesn't exist.
    """
    with LogCapture() as log_capture:
        key = ['dummy_context', 'mongodb', 'test_database',
               'test_posts', 'host', 'from']
        ContextFilter.objects.get_by_natural_key(*key)
        # Both the missing Context and the missing ContextFilter are logged.
        expected_1 = ('Context dummy_context:mongodb.test_database.'
                      'test_posts does not exist')
        expected_2 = ('ContextFilter dummy_context:mongodb.test_database.'
                      'test_posts (host -> from) does not exist')
        log_capture.check(
            ('contexts.models', 'ERROR', expected_1),
            ('contexts.models', 'ERROR', expected_2)
        )
def test_process_msg_exception(self):
    """
    Tests the process_msg function when an exception is raised.
    """
    # Re-enable logging in case a previous test disabled it.
    logging.disable(logging.NOTSET)
    with patch('receiver.receiver.logging.getLogger', return_value=LOGGER):
        # Force json.loads to blow up so the error path is exercised.
        with patch('receiver.receiver.json.loads', side_effect=Exception('foo')):
            with LogCapture() as log_capture:
                process_msg(**self.kwargs)
                log_capture.check(
                    ('receiver', 'ERROR',
                     'An error occurred while processing the message \'{"@uuid": "12345", '
                     '"collection": "elasticsearch.test_index.test_logs", "message": '
                     '"foobar"}\':\n'
                     '  foo'),
                )
def test_natural_key_exception(self):
    """
    Tests the get_by_natural_key method when the Distillery
    does not exist.
    """
    with LogCapture() as log_capture:
        natural_key = ['elasticsearch', 'test_index', 'fake_doctype']
        Distillery.objects.get_by_natural_key(*natural_key)
        # Both the missing Collection and the dependent Distillery are logged.
        log_capture.check(
            ('warehouses.models', 'ERROR',
             'Collection elasticsearch.test_index.fake_doctype does '
             'not exist'),
            ('distilleries.models', 'ERROR',
             'Distillery for Collection elasticsearch.test_index.'
             'fake_doctype does not exist')
        )
def test_add_raw_data_info_for_none(self):
    """
    Tests the _add_raw_data_info method when no Collection name is given.
    """
    with LogCapture() as log_capture:
        doc_obj = self.doc_obj
        doc_obj.collection = None
        actual = self.distillery._add_raw_data_info(self.doc, doc_obj)
        # The document must be returned unchanged and the failure logged.
        expected = self.doc
        log_capture.check(
            ('cyphon.documents', 'ERROR',
             'Info for raw data document None:1 could not be added'),
        )
        self.assertEqual(actual, expected)
def test_integrity_error(self):
    """
    Tests the configuration test tool when an IntegrityError is raised.
    """
    self.page.config_test_value = 'test text'
    # Force the model save to fail with an IntegrityError.
    with patch('django.forms.ModelForm.save',
               side_effect=IntegrityError('foo')):
        with LogCapture('cyphon.admin') as log_capture:
            actual = self.page.run_test()
            expected = "Could not create an object for testing: foo"
            self.assertEqual(actual, expected)
            msg = 'An error occurred while creating a test instance: ' + \
                  '<WSGIRequest: POST ' + \
                  "'/admin/mailcondensers/mailcondenser/1/change/test/'>"
            log_capture.check(
                ('cyphon.admin', 'ERROR', msg),
            )
def test_validation_error(self):
    """
    Tests the configuration test tool when a ValidationError is raised.
    """
    self.page.config_test_value = 'test text'
    # Force result computation to fail with a ValidationError.
    with patch(
            'sifter.mailsifter.mailcondensers.admin.MailCondenserAdmin._get_result',
            side_effect=ValidationError('foo')):
        with LogCapture('cyphon.admin') as log_capture:
            actual = self.page.run_test()
            expected = "A validation error occurred: ['foo']"
            self.assertEqual(actual, expected)
            msg = 'An error occurred while initializing a config test: ' + \
                  '<WSGIRequest: POST ' + \
                  "'/admin/mailcondensers/mailcondenser/1/change/test/'>"
            log_capture.check(
                ('cyphon.admin', 'ERROR', msg),
            )
def test_decode_error(self):
    """
    Tests the get_email_value function when a UnicodeDecodeError is raised.
    """
    error = UnicodeDecodeError('funnycodec', b'\x00\x00', 1, 2,
                               'Something went wrong!')
    # bleach.clean raising mid-parse should yield a placeholder, not a crash.
    with patch('sifter.mailsifter.accessors.bleach.clean', side_effect=error):
        with LogCapture() as log_capture:
            actual = accessors.get_email_value('Subject', {'Subject': 'test'})
            expected = 'The Subject of this email could not be displayed ' + \
                       'due to an error.'
            self.assertEqual(actual, expected)
            msg = ('An error was encountered while parsing the '
                   'Subject field of an email.')
            log_capture.check(
                ('sifter.mailsifter.accessors', 'ERROR', msg),
            )
def test_no_file_path(self):
    """
    Tests the save_attachment function.
    """
    # Only allow java attachments so the fixture attachment is rejected.
    mock_settings_1 = {
        'ALLOWED_EMAIL_ATTACHMENTS': ('application/java',)
    }
    with patch.dict('sifter.mailsifter.accessors.settings.MAILSIFTER',
                    mock_settings_1):
        self.msg.attach(self.java)
        attachment = get_first_attachment(self.msg)
        with patch('sifter.mailsifter.attachments.settings',
                   self.mock_settings):
            with LogCapture() as log_capture:
                # A disallowed file type must return None and log a warning.
                actual = attachments.save_attachment(attachment)
                expected = None
                self.assertEqual(actual, expected)
                msg = 'The attachment %s is not an allowed file type' \
                      % self.java_file
                log_capture.check(
                    ('sifter.mailsifter.attachments', 'WARNING', msg),
                )
def test_no_match_missing_munger(self):
    """
    Tests the process_email receiver for an email that doesn't match an
    existing MailChute when a default MailChute is enabled but the defaul
    MailMunger can't be found.
    """
    doc_obj = self.doc_obj
    doc_obj.data['Subject'] = 'nothing to see here'
    # Point the default at a munger name that is not configured.
    mock_config = {
        'DEFAULT_MUNGER': 'missing_munger',
        'DEFAULT_MUNGER_ENABLED': True
    }
    with patch.dict('sifter.mailsifter.mailchutes.models.conf.MAILSIFTER',
                    mock_config):
        with LogCapture() as log_capture:
            msg = 'Default MailMunger "missing_munger" is not configured.'
            MailChute.objects.process(doc_obj)
            log_capture.check(
                ('sifter.chutes.models', 'ERROR', msg),
            )
def test_integrity_error(self):
    """
    Tests the configuration test tool when an IntegrityError is raised.
    """
    self.page.config_test_value = json.dumps({'text': 'test'})
    # Force the model save to fail with an IntegrityError.
    with patch('django.forms.ModelForm.save',
               side_effect=IntegrityError('foo')):
        with LogCapture('cyphon.admin') as log_capture:
            actual = self.page.run_test()
            expected = "Could not create an object for testing: foo"
            self.assertEqual(actual, expected)
            msg = 'An error occurred while creating a test instance: ' + \
                  '<WSGIRequest: POST ' + \
                  "'/admin/datacondensers/datacondenser/1/change/test/'>"
            log_capture.check(
                ('cyphon.admin', 'ERROR', msg),
            )
def test_validation_error(self):
    """
    Tests the configuration test tool when a ValidationError is raised.
    """
    self.page.config_test_value = json.dumps({'text': 'test'})
    # Force result computation to fail with a ValidationError.
    with patch(
            'sifter.datasifter.datacondensers.admin.DataCondenserAdmin._get_result',
            side_effect=ValidationError('foo')):
        with LogCapture('cyphon.admin') as log_capture:
            actual = self.page.run_test()
            expected = "A validation error occurred: ['foo']"
            self.assertEqual(actual, expected)
            msg = 'An error occurred while initializing a config test: ' + \
                  '<WSGIRequest: POST ' + \
                  "'/admin/datacondensers/datacondenser/1/change/test/'>"
            log_capture.check(
                ('cyphon.admin', 'ERROR', msg),
            )
def test_integrity_error(self):
    """
    Tests the configuration test tool when an IntegrityError is raised.
    """
    # Force the model save to fail with an IntegrityError.
    with patch('django.forms.ModelForm.save',
               side_effect=IntegrityError('foo')):
        with LogCapture('cyphon.admin') as log_capture:
            actual = self.page.run_test()
            expected = "Could not create an object for testing: foo"
            self.assertEqual(actual, expected)
            msg = 'An error occurred while creating a test instance: ' + \
                  '<WSGIRequest: POST ' + \
                  "'/admin/logcondensers/logcondenser/1/change/test/'>"
            log_capture.check(
                ('cyphon.admin', 'ERROR', msg),
            )
def test_get_default_no_chute(self):
    """
    Tests the _default_munger function when the default LogMunger
    does not exist.
    """
    # Enable a default munger that is not actually configured.
    mock_config = {
        'DEFAULT_MUNGER': 'dummy_munger',
        'DEFAULT_MUNGER_ENABLED': True
    }
    with patch.dict('sifter.logsifter.logchutes.models.conf.LOGSIFTER',
                    mock_config):
        with LogCapture() as log_capture:
            actual = LogChute.objects._default_munger
            expected = None
            self.assertEqual(actual, expected)
            # The "enabled" flag must also report False for a missing munger.
            self.assertFalse(LogChute.objects._default_munger_enabled)
            log_capture.check(
                ('sifter.chutes.models', 'ERROR',
                 'Default LogMunger "dummy_munger" is not configured.'),
            )
def test_email_error(self):
    """
    Tests that an error message is logged when an SMTPAuthenticationErro
    is encountered.
    """
    mock_email = Mock()
    mock_email.send = Mock(
        side_effect=SMTPAuthenticationError(535, 'foobar'))
    with patch('alerts.signals.emails_enabled', return_value=True):
        with patch('alerts.signals.compose_comment_email',
                   return_value=mock_email):
            with LogCapture() as log_capture:
                # Saving a copied comment triggers the notification email.
                comment = Comment.objects.get(pk=1)
                comment.pk = None
                comment.save()
                log_capture.check(
                    ('alerts.signals', 'ERROR',
                     'An error occurred when sending an email '
                     'notification: (535, \'foobar\')'),
                )
def test_cannot_connect(self, mock_index):
    """
    Tests the catch_connection_error decorator when a connection
    cannot be established.
    """
    # NOTE(review): the original docstring said "when a connection is
    # established", but the expected log below shows the failure path.

    @catch_connection_error
    def test_decorator():
        """Test the catch_connection_error decorator."""
        self.engine.insert({'foo': 'bar'})

    with LogCapture() as log_capture:
        test_decorator()
        expected = 'Cannot connect to Elasticsearch'
        log_capture.check(
            ('engines.elasticsearch.engine', 'ERROR', expected),
        )
def test_normal_streaming_query(self):
    """
    Tests the start method for a Pump with a streaming Pipe and a query
    that doesn't exceed the Pipe's specs.
    """
    with LogCapture() as log_capture:
        self.stream_pump._factor_query = Mock(return_value=[self.subquery1])
        self.stream_pump._process_streaming_query = Mock()
        self.stream_pump.start(self.subquery1)

        # check that _factor_query() was called with the value that was
        # passed to start()
        self.stream_pump._factor_query.assert_called_once_with(self.subquery1)

        # check that _process_nonstreaming_queries() was called with the
        # first element of the query list returned by _factor_query()
        self.stream_pump._process_streaming_query.assert_called_once_with(
            self.subquery1)

        # no warnings expected for a query within spec
        log_capture.check()
def test_large_streaming_query(self):
    """
    Tests the start method for a Pump with a streaming Pipe and a query
    that exceeds the Pipe's specs.
    """
    with LogCapture() as log_capture:
        self.stream_pump._factor_query = Mock(return_value=self.query_list)
        self.stream_pump._process_streaming_query = Mock()
        self.stream_pump.start(self.query)

        # check that _factor_query() was called with the value that was
        # passed to start()
        self.stream_pump._factor_query.assert_called_once_with(self.query)

        # check that _process_nonstreaming_queries() was called with the
        # first element of the query list returned by _factor_query()
        self.stream_pump._process_streaming_query.assert_called_once_with(
            self.query_list[0])

        # check that a warning was generated
        msg = 'Query was too large for Pipe "Twitter PublicStreamsAPI." ' \
              + 'A smaller version of the query was submitted.'
        log_capture.check(
            ('aggregator.pumproom.pump', 'WARNING', msg),
        )
def test_transmit_course_metadata_task_no_channel(self):
    """
    Test the data transmission task without any integrated channel.
    """
    user = factories.UserFactory(username='john_doe')
    factories.EnterpriseCustomerFactory(
        catalog=1,
        name='Veridian Dynamics',
    )

    # Remove all integrated channels
    SAPSuccessFactorsEnterpriseCustomerConfiguration.objects.all().delete()
    DegreedEnterpriseCustomerConfiguration.objects.all().delete()
    with LogCapture(level=logging.INFO) as log_capture:
        call_command('transmit_course_metadata', '--catalog_user', user.username)

        # Because there are no IntegratedChannels, the process will end early.
        assert not log_capture.records
def test_transmit_course_metadata_task_no_catalog(self):
    """
    Test the data transmission task with enterprise customer that has
    no course catalog.
    """
    uuid = str(self.enterprise_customer.uuid)
    course_run_ids = ['course-v1:edX+DemoX+Demo_Course']
    self.mock_ent_courses_api_with_pagination(
        enterprise_uuid=uuid,
        course_run_ids=course_run_ids
    )
    # Strip the catalog from the channel's enterprise customer.
    integrated_channel_enterprise = self.sapsf.enterprise_customer
    integrated_channel_enterprise.catalog = None
    integrated_channel_enterprise.save()

    with LogCapture(level=logging.INFO) as log_capture:
        call_command('transmit_course_metadata', '--catalog_user', self.user.username)

        # Because there are no EnterpriseCustomers with a catalog, the process will end early.
        assert not log_capture.records
def test_transmit_learner_data(
        self,
        command_kwargs,
        certificate,
        self_paced,
        end_date,
        passed,
        expected_completion,
):
    """
    Test the log output from a successful run of the transmit_learner_data
    management command, using all the ways we can invoke it.
    """
    with transmit_learner_data_context(command_kwargs, certificate, self_paced,
                                       end_date, passed) as (args, kwargs):
        with LogCapture(level=logging.INFO) as log_capture:
            expected_output = get_expected_output(**expected_completion)
            call_command('transmit_learner_data', *args, **kwargs)
            # Each expected line must appear at the matching record index.
            for index, message in enumerate(expected_output):
                assert message in log_capture.records[index].getMessage()
def setUp(self):
    # Capture all log output (DEBUG and up) for assertions in the tests.
    self.lc = LogCapture()
    self.lc.setLevel(logging.DEBUG)
    self.lc.addFilter(test_common.MyLogCaptureFilter())
    self.additional_setup()
    self.addCleanup(self.cleanup)

    # Keep the original so cleanup can restore it.
    self.old_handle_spec = vpc.handle_spec

    # Monkey patch the handle_spec function, which is called by the
    # watcher. The handle_spec function is defined in the VPC module.
    # However, it was directly imported by the watcher module, so it's now
    # a copy in the watcher module namespace. Thus, the patch has to be
    # done actually in the watcher module. For safety, we'll do it in both
    # the vpc and watcher module.
    def new_handle_spec(*args, **kwargs):
        pass

    watcher.handle_spec = vpc.handle_spec = new_handle_spec
def test_unsupported(self):
    """Test handling of unsupported types."""
    # 'error' must raise on a variable type that can't be NaN-checked.
    with self.assertRaises(ValueError):
        StopOnNaN(on_unknown_type='error').after_epoch(epoch_data=StopOnNaNTest._get_data(lambda: 0))
    # An unrecognized policy name must be rejected at construction time.
    with self.assertRaises(AssertionError):
        StopOnNaN(on_unknown_type='bad value')
    # 'warn' logs a single WARNING instead of raising.
    with LogCapture() as log_capture:
        StopOnNaN(on_unknown_type='warn').after_epoch(epoch_data=StopOnNaNTest._get_data(lambda: 0))
        log_capture.check(
            ('root', 'WARNING', 'Variable `var` of type `<class \'function\'>` can not be checked for NaNs.'),
        )
    # Default policy silently ignores unsupported types.
    StopOnNaN().after_epoch(epoch_data=StopOnNaNTest._get_data(lambda: 0))
def test_missing_train(self):
    """Test that missing profile entries are reported as zero times.

    NOTE(review): the original docstring claimed a KeyError is raised, but
    the body asserts that absent entries default to 0.000000 in the log.
    """
    # Completely empty profile: all four timers default to zero.
    with LogCapture() as log_capture:
        self._hook.after_epoch_profile(0, {}, [])
        log_capture.check(
            ('root', 'INFO', '\tT read data:\t0.000000'),
            ('root', 'INFO', '\tT train:\t0.000000'),
            ('root', 'INFO', '\tT eval:\t0.000000'),
            ('root', 'INFO', '\tT hooks:\t0.000000')
        )
    # Profile with only unrelated keys behaves the same.
    with LogCapture() as log_capture:
        self._hook.after_epoch_profile(0, {'some_contents': 1}, [])
        log_capture.check(
            ('root', 'INFO', '\tT read data:\t0.000000'),
            ('root', 'INFO', '\tT train:\t0.000000'),
            ('root', 'INFO', '\tT eval:\t0.000000'),
            ('root', 'INFO', '\tT hooks:\t0.000000')
        )
def test_log_variables_selected(self):
    """
    Test logging of selected variables from `epoch_data` streams.
    """
    with LogCapture() as log_capture:
        # Only 'accuracy' and 'loss2' are selected; other variables are skipped.
        LogVariables(['accuracy', 'loss2']).after_epoch(_EPOCH_ID, _get_epoch_data())
        log_capture.check(
            ('root', 'INFO', '\ttrain accuracy: 1.000000'),
            ('root', 'INFO', '\ttrain loss2:'),
            ('root', 'INFO', '\t\tmean: 1.000000'),
            ('root', 'INFO', '\t\tmedian: 11.000000'),
            ('root', 'INFO', '\ttest accuracy: 2.000000'),
            ('root', 'INFO', '\ttest loss2:'),
            ('root', 'INFO', '\t\tmean: 2.000000'),
            ('root', 'INFO', '\t\tmedian: 22.000000')
        )
def do_publish_create_republish(config):
    """Test if creating republishes works."""
    with testfixtures.LogCapture() as l:
        do_publish_create(config)
    # The create must have logged a CRITICAL "has been deferred" record.
    found = False
    for rec in l.records:
        if rec.levelname == "CRITICAL":
            if "has been deferred" in rec.msg:
                found = True
    assert found
    args = [
        '-c', config,
        'publish', 'create',
    ]
    main(args)
    state = SystemStateReader()
    state.read()
    # The republish must now be visible in the system state.
    assert 'fakerepo01-stable main' in state.publishes
def test_init():
    """init_rtmbot should apply the config and log its working directory."""
    with LogCapture() as l:
        rtmbot = init_rtmbot()
        assert rtmbot.token == 'test-12345'
        assert rtmbot.directory == '/tmp/'
        assert rtmbot.debug == True
        l.check(
            ('root', 'INFO', 'Initialized in: /tmp/')
        )
def test_unknown_type(self):
    """Test if ``WriteTensorBoard`` handles unknown variable types as expected."""
    bad_epoch_data = {'valid': {'accuracy': 'bad_type'}}

    # test ignore: the default policy logs nothing
    hook = WriteTensorBoard(output_dir=self.tmpdir, model=self.get_model())
    with LogCapture(level=logging.INFO) as log_capture:
        hook.after_epoch(42, bad_epoch_data)
    log_capture.check()

    # test warn: a single WARNING describing the accepted types
    warn_hook = WriteTensorBoard(output_dir=self.tmpdir, model=self.get_model(), on_unknown_type='warn')
    with LogCapture(level=logging.INFO) as log_capture2:
        warn_hook.after_epoch(42, bad_epoch_data)
    log_capture2.check(('root', 'WARNING', 'Variable `accuracy` in stream `valid` has to be of type `int` '
                                           'or `float` (or a `dict` with a key named `mean` or `nanmean` '
                                           'whose corresponding value is of type `int` or `float`), '
                                           'found `<class \'str\'>` instead.'))

    # test error: the hook must raise
    raise_hook = WriteTensorBoard(output_dir=self.tmpdir, model=self.get_model(), on_unknown_type='error')
    with self.assertRaises(ValueError):
        raise_hook.after_epoch(42, bad_epoch_data)

    with mock.patch.dict('sys.modules', **{'cv2': cv2_mock}):
        # test skip image variables: image variables bypass the type check
        skip_hook = WriteTensorBoard(output_dir=self.tmpdir, model=self.get_model(), on_unknown_type='error',
                                     image_variables=['accuracy'])
        skip_hook.after_epoch(42, {'valid': {'accuracy': np.zeros((10, 10, 3))}})
        skip_hook._summary_writer.close()
def setUp(self):
    # Capture all log output (DEBUG and up) for assertions in the tests.
    self.lc = LogCapture()
    self.lc.setLevel(logging.DEBUG)
    self.lc.addFilter(test_common.MyLogCaptureFilter())
    self.addCleanup(self.cleanup)
def test_ColoredFormatter():
    """Test if logs are being colored"""
    logging.config.dictConfig(DEFAULT_LOGGING)
    with LogCapture(names='bottery') as logs:
        logger = logging.getLogger('bottery')
        logger.debug('DEBUG')
        logger.info('INFO')
        logger.warning('WARN')
        logger.error('ERROR')
        logger.critical('CRITICAL')

    # The original built this copy with `[record for record in logs.records]`;
    # a verbatim copy of an iterable is just list(...).
    records = list(logs.records)

    # Create a list of all records formated with ColoredFormatter
    colored_formatter = ColoredFormatter()
    formatted_records = [colored_formatter.format(record)
                         for record in records]

    # DEBUG/INFO are uncolored; WARN/ERROR/CRITICAL carry ANSI escapes.
    expected_records = [
        'DEBUG',
        'INFO',
        '\x1b[33mWARN\x1b[0m',
        '\x1b[31mERROR\x1b[0m',
        '\x1b[30m\x1b[41mCRITICAL\x1b[0m'
    ]

    assert formatted_records == expected_records
def capture():
    """
    This way of defining a fixture works around the issue that when
    using the decorator testfixtures.log_capture() instead, pytest
    fails with "fixture 'capture' not found".
    """
    # Yield the active LogCapture so tests can assert on captured records.
    with LogCapture(level=logging.DEBUG) as log:
        yield log


#
# Test cases
#
def test_classify_documents_from_indices(mock_manager, mock_workers,
                                         mock_indices_selector):
    """Wiring test: producer/consumer workers are started, flagged done and joined."""
    with main.app.app_context():
        with patch('urbansearch.main.request') as mock_flask_request:
            mock_flask_request.args.get.return_value = MagicMock(side_effect=[1, 1, Mock()])
            ind_sel = mock_indices_selector.return_value = Mock()
            cworker = mock_workers.return_value = Mock()
            man = mock_manager.return_value = Mock()
            a = Mock()
            b = Mock()
            # One tracked worker per pool is enough to verify join() is called.
            # (The original bound these lists to unused `producers`/`consumers`
            # locals, which have been dropped.)
            ind_sel.run_workers.return_value = [a, Mock()]
            cworker.run_classifying_workers.return_value = [b, Mock()]
            # Bugs other fixtures if imported globally.
            from testfixtures import LogCapture
            with LogCapture() as l:
                main.classify_documents_from_indices()
            # The original asserted `l.__sizeof__() > 0`, which is vacuously
            # true; assert that something was actually logged instead.
            assert len(l.records) > 0
            assert mock_indices_selector.called
            assert mock_workers.called
            assert mock_manager.called
            assert man.Queue.called
            assert ind_sel.run_workers.called
            assert cworker.run_classifying_workers.called
            assert cworker.set_producers_done.called
            assert a.join.called
            assert b.join.called
def test_classify_indices_to_db_not_connected(mock_db_connected, mock_workers):
    """Without a db connection the endpoint must log and start no workers."""
    with main.app.app_context():
        with patch('urbansearch.main.request') as mock_flask_request:
            mock_db_connected.return_value = False
            # Bugs other fixtures if imported globally.
            from testfixtures import LogCapture
            with LogCapture() as l:
                main.classify_indices_to_db()
            # The original asserted `l.__sizeof__() > 0`, which is vacuously
            # true; assert that something was actually logged instead.
            assert len(l.records) > 0
            assert not mock_workers.called
def test_mock_classify_text_files_to_db(mock_manager, mock_workers,
                                        mock_indices_selector, mock_db_utils):
    """Wiring test for the text-file pipeline: workers run, flags toggle, joins happen."""
    mock_db_utils.connected.return_value = True
    w = mock_workers.return_value = Mock()
    man = mock_manager.return_value = Mock()
    a = Mock()
    b = Mock()
    # One tracked worker per pool is enough to verify join() is called.
    # (The original bound these lists to unused `producers`/`consumers`
    # locals, which have been dropped.)
    w.run_read_files_worker.return_value = [a, Mock()]
    w.run_classifying_workers.return_value = [b, Mock()]
    # Bugs other fixtures if imported globally.
    from testfixtures import LogCapture
    with LogCapture() as l:
        main.classify_textfiles_to_db(Mock(), True, 1)
    # The original asserted `l.__sizeof__() > 0`, which is vacuously true;
    # assert that something was actually logged instead.
    assert len(l.records) > 0
    assert mock_workers.called
    assert mock_manager.called
    assert man.Queue.called
    assert w.run_read_files_worker.called
    assert w.run_classifying_workers.called
    assert w.set_file_producers_done.called
    assert w.clear_file_producers_done.called
    assert a.join.called
    assert b.join.called
def test_mock_classify_text_files_to_db_not_connected(mock_db_connected,
                                                      mock_workers):
    """Without a db connection the text-file pipeline must log and start no workers."""
    with main.app.app_context():
        with patch('urbansearch.main.request') as mock_flask_request:
            mock_db_connected.return_value = False
            # Bugs other fixtures if imported globally.
            from testfixtures import LogCapture
            with LogCapture() as l:
                main.classify_textfiles_to_db(Mock(), Mock(), 1, True)
            # The original asserted `l.__sizeof__() > 0`, which is vacuously
            # true; assert that something was actually logged instead.
            assert len(l.records) > 0
            assert not mock_workers.called
def test_run_classifying_workers(self, mock_event, mock_pd, mock_classify,
                                 mock_pre_process, mock_coOc, mock_config,
                                 mock_process):
    """run_classifying_workers should pre-process input and log its activity."""
    queue = Mock()
    w = Workers()
    mock_pre_process.return_value = Mock()
    # Bugs other fixtures if imported globally.
    from testfixtures import LogCapture
    with LogCapture() as l:
        w.run_classifying_workers(1, queue, 1, pre_downloaded=True)
    # The original asserted `l.__sizeof__() > 0`, which is vacuously true;
    # assert that something was actually logged instead.
    assert len(l.records) > 0
    assert mock_pre_process.called
def test__log_failure_and_die():
    """_log_failure_and_die exits; stdout/stderr are logged only when requested."""
    with testfixtures.LogCapture() as lc:
        cr = executor.CallResult(1, 'happy_stdout', 'sad_stderr')
        # verbose=False: only the error message is logged.
        with pytest.raises(SystemExit):
            main._log_failure_and_die('error', cr, False)
        assert _somewhere_in_messages(lc, 'error')
        assert _nowhere_in_messages(lc, 'happy_stdout')
        assert _nowhere_in_messages(lc, 'sad_stderr')
        # verbose=True: the call's stdout and stderr are logged too.
        with pytest.raises(SystemExit):
            main._log_failure_and_die('error', cr, True)
        assert _somewhere_in_messages(lc, 'error')
        assert _somewhere_in_messages(lc, 'happy_stdout')
        assert _somewhere_in_messages(lc, 'sad_stderr')
def test_empty_pool_fallback(self):
    """With an empty pool, pick() must log an error and generate a captcha."""
    # Save the setting so it can be restored at the end of the test.
    __current_test_get_from_pool_setting = settings.CAPTCHA_GET_FROM_POOL
    settings.CAPTCHA_GET_FROM_POOL = True
    CaptchaStore.objects.all().delete()  # Delete objects created during SetUp
    with LogCapture() as l:
        CaptchaStore.pick()
    l.check(('captcha.models', 'ERROR', "Couldn't get a captcha from pool, generating"),)
    # The fallback generation must have created exactly one captcha.
    self.assertEqual(CaptchaStore.objects.count(), 1)
    settings.CAPTCHA_GET_FROM_POOL = __current_test_get_from_pool_setting
def test_bad_callback_type_is_logged(self, config):
    """An unrecognized event type must be logged as a warning, not raised."""
    with LogCapture() as l:
        snooze.github_callback("foobar", None, None, None, None)
        assert "WARNING" in str(l)