Python testfixtures 模块,LogCapture() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用testfixtures.LogCapture()

项目:cxflow-tensorflow    作者:Cognexa    | 项目源码 | 文件源码
def test_missing_variable(self):
        """Test how ``WriteTensorBoard`` reacts to an image variable missing from the epoch data.

        The ``ignore`` policy must log nothing, ``warn`` must log exactly one
        warning and ``error`` must raise ``KeyError``.
        """
        epoch_data_without_plot = {'valid': {}}

        def build_hook(policy):
            """Create a hook expecting the ``plot`` image variable under the given policy."""
            return WriteTensorBoard(output_dir=self.tmpdir, model=self.get_model(), image_variables=['plot'],
                                    on_missing_variable=policy)

        with mock.patch.dict('sys.modules', cv2=cv2_mock):
            # 'ignore': nothing may be logged
            silent_hook = build_hook('ignore')
            with LogCapture(level=logging.INFO) as ignore_capture:
                silent_hook.after_epoch(42, epoch_data_without_plot)
            ignore_capture.check()

            # 'warn': a single warning naming the missing variable
            warning_hook = build_hook('warn')
            with LogCapture(level=logging.INFO) as warn_capture:
                warning_hook.after_epoch(42, epoch_data_without_plot)
            warn_capture.check(('root', 'WARNING', '`plot` not found in epoch data.'))

            # 'error': the missing variable surfaces as a KeyError
            failing_hook = build_hook('error')
            with self.assertRaises(KeyError):
                failing_hook.after_epoch(42, epoch_data_without_plot)
项目:micromasters    作者:mitodl    | 项目源码 | 文件源码
def test_two_social(self):
        """
        get_social_username should return None, and log the lookup error, if
        there are two social edX accounts for a user
        """
        UserSocialAuthFactory.create(user=self.user, uid='other name')
        expected_record = (
            'profiles.api',
            'ERROR',
            'Unexpected error retrieving social auth username: get() returned more than '
            'one UserSocialAuth -- it returned 2!'
        )

        with LogCapture() as captured:
            username = get_social_username(self.user)
            assert username is None
            captured.check(expected_record)
项目:UrbanSearch    作者:urbansearchTUD    | 项目源码 | 文件源码
def test__store_indices_db(self, mock_db, mock_event,
                               mock_pd, mock_classify,
                               mock_coOc, mock_pre_process,
                               mock_config):
        """Check ``Workers._store_indices_db`` when the mocked ``store_indices`` returns False.

        The index list is a ``MagicMock`` reporting a length of 40001. The test
        expects the index to be appended, the store call to be attempted, the
        list to be cleared and at least one log record to be emitted.
        """
        mock_index = Mock(return_value=True)
        mock_indice = MagicMock()
        mock_indice.return_value = True
        mock_indice.__len__.return_value = 40001
        mock_config.return_value = 0
        mock_db.store_indices.return_value = False

        w = Workers()
        from testfixtures import LogCapture
        with LogCapture() as captured:
            w._store_indices_db(mock_index, mock_indice)
            # ``__sizeof__()`` is positive for every object, so asserting on it
            # could never fail; check the captured records themselves instead.
            assert captured.records
            assert mock_indice.append.called
            assert mock_db.store_indices.called
            assert mock_indice.clear.called
项目:UrbanSearch    作者:urbansearchTUD    | 项目源码 | 文件源码
def test__store_info_db(self, mock_event,
                            mock_pd, mock_classify,
                            mock_coOc, mock_pre_process,
                            mock_config):
        """Check ``Workers._store_info_db`` with a mocked item and utility function.

        The test expects the item's list to be appended to and cleared, and at
        least one log record to be emitted while doing so.
        """
        mock_config.return_value = 0

        mock_digests = Mock()
        mock_itm = Mock()
        mock_list = MagicMock()
        mock_list.__len__.return_value = 1
        mock_item = [mock_itm, mock_list]
        util_func = Mock()
        # a plain Mock has no __name__, so one is assigned explicitly
        util_func.__name__ = 'mocked_util_func'

        w = Workers()
        from testfixtures import LogCapture
        with LogCapture() as captured:
            w._store_info_db(mock_digests, mock_item, util_func)
            # ``__sizeof__()`` is positive for every object, so asserting on it
            # could never fail; check the captured records themselves instead.
            assert captured.records
            assert mock_list.append.called
            assert mock_list.clear.called
项目:github-snooze-button    作者:tdsmith    | 项目源码 | 文件源码
def test_bad_message_is_logged(self, config, trivial_message):
        """Polling a non-JSON message, or a message whose callback raises, must show up in the log."""
        responses.add(responses.POST, "https://api.github.com/repos/tdsmith/test_repo/hooks")
        repo_listener = snooze.RepositoryListener(
            events=snooze.LISTEN_EVENTS,
            **config["tdsmith/test_repo"])

        sqs_resource = boto3.resource("sqs", region_name="us-west-2")
        all_queues = list(sqs_resource.queues.all())
        queue = all_queues[0]

        # first case: a body that cannot be parsed as JSON
        queue.send_message(MessageBody="this isn't a json message at all")
        with LogCapture() as malformed_capture:
            repo_listener.poll()
            assert 'ERROR' in str(malformed_capture)

        # second case: a valid message whose callback raises
        def my_callback(event, message):
            raise ValueError("I object!")
        queue.send_message(MessageBody=trivial_message)
        repo_listener.register_callback(my_callback)
        with LogCapture() as callback_capture:
            repo_listener.poll()
            assert 'I object!' in str(callback_capture)
项目:stor    作者:counsyl    | 项目源码 | 文件源码
def test_upload_progress_logging(self, mock_getsize, mock_files):
        """Uploading 20 mocked files should emit the expected progress records."""
        file_names = ['file%s' % i for i in range(20)]
        mock_files.return_value = dict.fromkeys(file_names, 20)
        mock_getsize.return_value = 20

        logger_name = 'stor.s3.progress'
        expected_records = [
            (logger_name, 'INFO', 'starting upload of 20 objects'),
            (logger_name, 'INFO', '10/20\t0:00:00\t0.00 MB\t0.00 MB/s'),
            (logger_name, 'INFO', '20/20\t0:00:00\t0.00 MB\t0.00 MB/s'),
            (logger_name, 'INFO', 'upload complete - 20/20\t0:00:00\t0.00 MB\t0.00 MB/s'),  # nopep8
        ]

        bucket = S3Path('s3://bucket')
        with LogCapture(logger_name) as progress_log:
            bucket.upload(['upload'])
            progress_log.check(*expected_records)
项目:stor    作者:counsyl    | 项目源码 | 文件源码
def test_download_progress_logging(self, mock_list, mock_getsize, mock_make_dest_dir):
        """Downloading a mocked listing of 20 entries should emit the expected progress records."""
        listing = [S3Path('s3://bucket/file%s' % i) for i in range(19)]
        listing.append(S3Path('s3://bucket/dir'))
        mock_list.return_value = listing
        mock_getsize.return_value = 100

        logger_name = 'stor.s3.progress'
        expected_records = [
            (logger_name, 'INFO', 'starting download of 20 objects'),
            (logger_name, 'INFO', '10/20\t0:00:00\t0.00 MB\t0.00 MB/s'),
            (logger_name, 'INFO', '20/20\t0:00:00\t0.00 MB\t0.00 MB/s'),
            (logger_name, 'INFO', 'download complete - 20/20\t0:00:00\t0.00 MB\t0.00 MB/s'),  # nopep8
        ]

        bucket = S3Path('s3://bucket')
        with LogCapture(logger_name) as progress_log:
            bucket.download('output_dir')
            progress_log.check(*expected_records)
项目:stor    作者:counsyl    | 项目源码 | 文件源码
def test_progress_logging(self):
        """A mocked swift download of 20 objects plus one unrelated action should emit the expected progress records."""
        download_results = []
        for _ in range(20):
            download_results.append({
                'action': 'download_object',
                'read_length': 100
            })
        download_results.append({'action': 'random_action'})
        self.mock_swift.download.return_value = download_results

        logger_name = 'stor.swift.progress'
        expected_records = [
            (logger_name, 'INFO', 'starting download'),
            (logger_name, 'INFO', '10\t0:00:00\t0.00 MB\t0.00 MB/s'),
            (logger_name, 'INFO', '20\t0:00:00\t0.00 MB\t0.00 MB/s'),
            (logger_name, 'INFO', 'download complete - 20\t0:00:00\t0.00 MB\t0.00 MB/s'),  # nopep8
        ]

        container = SwiftPath('swift://tenant/container')
        with LogCapture(logger_name) as progress_log:
            container.download('output_dir')
            progress_log.check(*expected_records)
项目:logfury    作者:ppolewicz    | 项目源码 | 文件源码
def test_complex_signature_py2(self):
        """
        Check the DEBUG record emitted by ``trace_call`` for a function mixing
        positional, defaulted, ``*args`` and ``**kwargs`` parameters.
        """
        with LogCapture() as captured:

            @trace_call(self.logger)
            def foo(a, b, c, d, e, g='G', h='H', i='ii', j='jj', *varargs_, **varkwargs_):
                pass

            positional_tail = ['c', 'd']
            extra_keywords = {'g': 'g', 'h': 'h'}
            foo('a', 'b', *positional_tail, e='E', Z='Z', **extra_keywords)

            expected_message = (
                "calling %sfoo(a='a', b='b', c='c', d='d', e='E', "
                "g='g', h='h', varkwargs_={'Z': 'Z'}, "
                "i='ii', j='jj', varargs_=<class '%s._empty'>)"
            ) % (self._get_prefix(), INSPECT_MODULE_NAME)
            captured.check(
                ('test.v0_1.test_base', 'DEBUG', expected_message),
            )
项目:logfury    作者:ppolewicz    | 项目源码 | 文件源码
def test_complex_signature_py3(self):
        """
        Check the DEBUG record emitted by ``trace_call`` for a function whose
        signature uses Python 3 only syntax (keyword-only arguments after
        ``*varargs_`` and an annotation on ``**varkwargs_``).

        Skipped on Python 2.
        """
        if six.PY2:
            raise SkipTest()
        with LogCapture() as l:
            # without this exec Python 2 and pyflakes complain about syntax errors etc
            # NOTE(review): locals() is passed in exec's globals position and
            # globals() in its locals position -- confirm this order is intentional
            exec (
                """@trace_call(self.logger)
def foo(a, b, c, d, e, *varargs_, f=None, g='G', h='H', i='ii', j='jj', **varkwargs_: None):
    pass
foo('a', 'b', *['c', 'd'], e='E', f='F', Z='Z', **{'g':'g', 'h':'h'})
""", locals(), globals()
            )
            # exactly one record is expected: the traced call with every bound argument
            l.check(
                (
                    'test.v0_1.test_base',
                    'DEBUG',
                    "calling foo(a='a', b='b', c='c', d='d', e='E', f='F', "
                    "g='g', h='h', varkwargs_={'Z': 'Z'}, varargs_=<class '%s._empty'>, "
                    "i='ii', j='jj')" % (INSPECT_MODULE_NAME,)  # prefix does not work because of the eval, inspect module is for pypy3
                ),
            )
项目:logfury    作者:ppolewicz    | 项目源码 | 文件源码
def test_disable_trace(self):
        """
        Calls to a method marked with ``disable_trace``, and to a subclass
        override of it, should leave the log empty.
        """
        @six.add_metaclass(TraceAllPublicCallsMeta)
        class Ala(object):
            @disable_trace
            def bar(self, a, b, c=None):
                return True

            def __repr__(self):
                return '<%s object>' % (self.__class__.__name__,)

        class Bela(Ala):
            def bar(self, a, b, c=None):
                return False

        with LogCapture() as captured:
            base_instance = Ala()
            base_instance.bar(1, 2, 3)
            base_instance.bar(1, b=2)
            derived_instance = Bela()
            derived_instance.bar(1, 2, 3)
            derived_instance.bar(1, b=2)
            # no expected records: nothing may have been logged
            captured.check()
项目:solidfire-cli    作者:solidfire    | 项目源码 | 文件源码
def check_api_error():
    """Invoke the CLI at ``--debug`` levels 0, 1 and 2 and check the log records each level produces."""
    runner = CliRunner()
    logger_name = 'element.cli.cli'
    # (debug level, records expected in the log, confirmation printed afterwards)
    scenarios = (
        ('0', (), "Critical setting working."),
        ('1',
         ((logger_name, "ERROR", "xUnknownAccount"),),
         "Error setting working."),
        ('2',
         ((logger_name, "INFO", "account_id = 1000000;"),
          (logger_name, "ERROR", "xUnknownAccount")),
         "Info setting is working."),
    )
    for debug_level, expected_records, confirmation in scenarios:
        with LogCapture() as captured:
            runner.invoke(
                cli.cli,
                ['--debug', debug_level, '-c', '0', 'Account', 'GetByID', '--account_id', '1000000'])
            captured.check(*expected_records)
        print(confirmation)
项目:cyphon    作者:dunbarcyber    | 项目源码 | 文件源码
def test_natural_key_exception():
        """
        Tests that get_by_natural_key logs two errors for a ContextFilter
        that doesn't exist.
        """
        natural_key = ['dummy_context', 'mongodb', 'test_database', 'test_posts',
                       'host', 'from']
        context_error = ('Context dummy_context:mongodb.test_database.'
                         'test_posts does not exist')
        filter_error = ('ContextFilter dummy_context:mongodb.test_database.'
                        'test_posts (host -> from) does not exist')
        with LogCapture() as log_capture:
            ContextFilter.objects.get_by_natural_key(*natural_key)
            log_capture.check(
                ('contexts.models', 'ERROR', context_error),
                ('contexts.models', 'ERROR', filter_error)
            )
项目:cyphon    作者:dunbarcyber    | 项目源码 | 文件源码
def test_process_msg_exception(self):
        """
        Tests that process_msg logs an error when an exception is raised
        while the message is being parsed.
        """
        logging.disable(logging.NOTSET)
        expected_record = (
            'receiver',
            'ERROR',
            'An error occurred while processing the message \'{"@uuid": "12345", '
            '"collection": "elasticsearch.test_index.test_logs", "message": '
            '"foobar"}\':\n'
            '  foo',
        )
        with patch('receiver.receiver.logging.getLogger', return_value=LOGGER), \
                patch('receiver.receiver.json.loads', side_effect=Exception('foo')), \
                LogCapture() as log_capture:
            process_msg(**self.kwargs)
            log_capture.check(expected_record)
项目:cyphon    作者:dunbarcyber    | 项目源码 | 文件源码
def test_natural_key_exception(self):
        """
        Tests that get_by_natural_key logs two errors when the Distillery
        does not exist.
        """
        natural_key = ['elasticsearch', 'test_index', 'fake_doctype']
        collection_error = (
            'warehouses.models',
            'ERROR',
            'Collection elasticsearch.test_index.fake_doctype does '
            'not exist',
        )
        distillery_error = (
            'distilleries.models',
            'ERROR',
            'Distillery for Collection elasticsearch.test_index.'
            'fake_doctype does not exist',
        )
        with LogCapture() as log_capture:
            Distillery.objects.get_by_natural_key(*natural_key)
            log_capture.check(collection_error, distillery_error)
项目:cyphon    作者:dunbarcyber    | 项目源码 | 文件源码
def test_add_raw_data_info_for_none(self):
        """
        Tests that _add_raw_data_info logs an error and returns the document
        unchanged when no Collection name is given.
        """
        expected_record = (
            'cyphon.documents',
            'ERROR',
            'Info for raw data document None:1 could not be added',
        )
        with LogCapture() as log_capture:
            doc_without_collection = self.doc_obj
            doc_without_collection.collection = None
            actual = self.distillery._add_raw_data_info(
                self.doc, doc_without_collection)
            log_capture.check(expected_record)
            self.assertEqual(actual, self.doc)
项目:cyphon    作者:dunbarcyber    | 项目源码 | 文件源码
def test_integrity_error(self):
        """
        Tests the message returned and the error logged by the configuration
        test tool when saving the form raises an IntegrityError.
        """
        self.page.config_test_value = 'test text'
        expected_log = (
            'An error occurred while creating a test instance: '
            '<WSGIRequest: POST '
            "'/admin/mailcondensers/mailcondenser/1/change/test/'>"
        )

        with patch('django.forms.ModelForm.save',
                   side_effect=IntegrityError('foo')), \
                LogCapture('cyphon.admin') as log_capture:
            result = self.page.run_test()
            self.assertEqual(
                result, "Could not create an object for testing: foo")
            log_capture.check(
                ('cyphon.admin', 'ERROR', expected_log),
            )
项目:cyphon    作者:dunbarcyber    | 项目源码 | 文件源码
def test_validation_error(self):
        """
        Tests the message returned and the error logged by the configuration
        test tool when a ValidationError is raised.
        """
        self.page.config_test_value = 'test text'
        expected_log = (
            'An error occurred while initializing a config test: '
            '<WSGIRequest: POST '
            "'/admin/mailcondensers/mailcondenser/1/change/test/'>"
        )

        with patch(
            'sifter.mailsifter.mailcondensers.admin.MailCondenserAdmin._get_result',
            side_effect=ValidationError('foo')), \
                LogCapture('cyphon.admin') as log_capture:
            result = self.page.run_test()
            self.assertEqual(result, "A validation error occurred: ['foo']")
            log_capture.check(
                ('cyphon.admin', 'ERROR', expected_log),
            )
项目:cyphon    作者:dunbarcyber    | 项目源码 | 文件源码
def test_decode_error(self):
        """
        Tests that get_email_value returns a placeholder message and logs an
        error when a UnicodeDecodeError is raised.
        """
        decode_failure = UnicodeDecodeError('funnycodec', b'\x00\x00', 1, 2,
                                            'Something went wrong!')
        placeholder = ('The Subject of this email could not be displayed '
                       'due to an error.')
        expected_log = ('An error was encountered while parsing the '
                        'Subject field of an email.')
        with patch('sifter.mailsifter.accessors.bleach.clean',
                   side_effect=decode_failure), \
                LogCapture() as log_capture:
            result = accessors.get_email_value('Subject', {'Subject': 'test'})
            self.assertEqual(result, placeholder)
            log_capture.check(
                ('sifter.mailsifter.accessors', 'ERROR', expected_log),
            )
项目:cyphon    作者:dunbarcyber    | 项目源码 | 文件源码
def test_no_file_path(self):
        """
        Tests that save_attachment returns None and logs a warning that the
        attachment is not an allowed file type.
        """
        accessor_settings = {
            'ALLOWED_EMAIL_ATTACHMENTS': ('application/java',)
        }
        with patch.dict('sifter.mailsifter.accessors.settings.MAILSIFTER',
                        accessor_settings):
            self.msg.attach(self.java)
            attachment = get_first_attachment(self.msg)
            with patch('sifter.mailsifter.attachments.settings',
                       self.mock_settings), \
                    LogCapture() as log_capture:
                saved = attachments.save_attachment(attachment)
                self.assertEqual(saved, None)
                warning = 'The attachment %s is not an allowed file type' \
                          % self.java_file
                log_capture.check(
                    ('sifter.mailsifter.attachments', 'WARNING', warning),
                )
项目:cyphon    作者:dunbarcyber    | 项目源码 | 文件源码
def test_no_match_missing_munger(self):
        """
        Tests the process_email receiver for an email that doesn't match
        an existing MailChute when a default MailChute is enabled but
        the default MailMunger can't be found.
        """
        unmatched_doc = self.doc_obj
        unmatched_doc.data['Subject'] = 'nothing to see here'
        munger_config = {
            'DEFAULT_MUNGER': 'missing_munger',
            'DEFAULT_MUNGER_ENABLED': True
        }
        expected_error = 'Default MailMunger "missing_munger" is not configured.'
        with patch.dict('sifter.mailsifter.mailchutes.models.conf.MAILSIFTER',
                        munger_config):
            with LogCapture() as log_capture:
                MailChute.objects.process(unmatched_doc)
                log_capture.check(
                    ('sifter.chutes.models', 'ERROR', expected_error),
                )
项目:cyphon    作者:dunbarcyber    | 项目源码 | 文件源码
def test_integrity_error(self):
        """
        Tests the message returned and the error logged by the configuration
        test tool when saving the form raises an IntegrityError.
        """
        self.page.config_test_value = json.dumps({'text': 'test'})
        expected_log = (
            'An error occurred while creating a test instance: '
            '<WSGIRequest: POST '
            "'/admin/datacondensers/datacondenser/1/change/test/'>"
        )

        with patch('django.forms.ModelForm.save',
                   side_effect=IntegrityError('foo')), \
                LogCapture('cyphon.admin') as log_capture:
            result = self.page.run_test()
            self.assertEqual(
                result, "Could not create an object for testing: foo")
            log_capture.check(
                ('cyphon.admin', 'ERROR', expected_log),
            )
项目:cyphon    作者:dunbarcyber    | 项目源码 | 文件源码
def test_validation_error(self):
        """
        Tests the message returned and the error logged by the configuration
        test tool when a ValidationError is raised.
        """
        self.page.config_test_value = json.dumps({'text': 'test'})
        expected_log = (
            'An error occurred while initializing a config test: '
            '<WSGIRequest: POST '
            "'/admin/datacondensers/datacondenser/1/change/test/'>"
        )

        with patch(
            'sifter.datasifter.datacondensers.admin.DataCondenserAdmin._get_result',
            side_effect=ValidationError('foo')), \
                LogCapture('cyphon.admin') as log_capture:
            result = self.page.run_test()
            self.assertEqual(result, "A validation error occurred: ['foo']")
            log_capture.check(
                ('cyphon.admin', 'ERROR', expected_log),
            )
项目:cyphon    作者:dunbarcyber    | 项目源码 | 文件源码
def test_integrity_error(self):
        """
        Tests the message returned and the error logged by the configuration
        test tool when saving the form raises an IntegrityError.
        """
        expected_log = (
            'An error occurred while creating a test instance: '
            '<WSGIRequest: POST '
            "'/admin/logcondensers/logcondenser/1/change/test/'>"
        )
        with patch('django.forms.ModelForm.save',
                   side_effect=IntegrityError('foo')), \
                LogCapture('cyphon.admin') as log_capture:
            result = self.page.run_test()
            self.assertEqual(
                result, "Could not create an object for testing: foo")
            log_capture.check(
                ('cyphon.admin', 'ERROR', expected_log),
            )
项目:cyphon    作者:dunbarcyber    | 项目源码 | 文件源码
def test_get_default_no_chute(self):
        """
        Tests that _default_munger is None, the default munger is reported as
        disabled and an error is logged when the default LogMunger does not
        exist.
        """
        munger_config = {
            'DEFAULT_MUNGER': 'dummy_munger',
            'DEFAULT_MUNGER_ENABLED': True
        }
        expected_record = (
            'sifter.chutes.models',
            'ERROR',
            'Default LogMunger "dummy_munger" is not configured.',
        )
        with patch.dict('sifter.logsifter.logchutes.models.conf.LOGSIFTER',
                        munger_config):
            with LogCapture() as log_capture:
                default_munger = LogChute.objects._default_munger
                self.assertEqual(default_munger, None)
                self.assertFalse(LogChute.objects._default_munger_enabled)
                log_capture.check(expected_record)
项目:cyphon    作者:dunbarcyber    | 项目源码 | 文件源码
def test_email_error(self):
        """
        Tests that an error message is logged when an
        SMTPAuthenticationError is encountered.
        """
        auth_failure = SMTPAuthenticationError(535, 'foobar')
        mock_email = Mock()
        mock_email.send = Mock(side_effect=auth_failure)
        expected_record = (
            'alerts.signals',
            'ERROR',
            'An error occurred when sending an email '
            'notification: (535, \'foobar\')',
        )
        with patch('alerts.signals.emails_enabled', return_value=True), \
                patch('alerts.signals.compose_comment_email',
                      return_value=mock_email), \
                LogCapture() as log_capture:
            # clearing the pk makes save() store the comment as a new row
            comment = Comment.objects.get(pk=1)
            comment.pk = None
            comment.save()
            log_capture.check(expected_record)
项目:cyphon    作者:dunbarcyber    | 项目源码 | 文件源码
def test_cannot_connect(self, mock_index):
        """
        Tests that the catch_connection_error decorator logs an error when
        a connection cannot be established.
        """
        @catch_connection_error
        def test_decorator():
            """Test the catch_connection_error decorator."""
            self.engine.insert({'foo': 'bar'})

        expected_record = (
            'engines.elasticsearch.engine',
            'ERROR',
            'Cannot connect to Elasticsearch',
        )
        with LogCapture() as log_capture:
            test_decorator()
            log_capture.check(expected_record)
项目:cyphon    作者:dunbarcyber    | 项目源码 | 文件源码
def test_normal_streaming_query(self):
        """
        Tests the start method for a Pump with a streaming Pipe and a query that
        doesn't exceed the Pipe's specs.
        """
        with LogCapture() as log_capture:
            pump = self.stream_pump
            pump._factor_query = Mock(return_value=[self.subquery1])
            pump._process_streaming_query = Mock()
            pump.start(self.subquery1)

            # _factor_query() must receive exactly what was passed to start()
            pump._factor_query.assert_called_once_with(self.subquery1)

            # _process_streaming_query() must receive the single query
            # returned by _factor_query()
            pump._process_streaming_query.assert_called_once_with(
                self.subquery1)

            # nothing may have been logged
            log_capture.check()
项目:cyphon    作者:dunbarcyber    | 项目源码 | 文件源码
def test_large_streaming_query(self):
        """
        Tests the start method for a Pump with a streaming Pipe and a query that
        exceeds the Pipe's specs.
        """
        with LogCapture() as log_capture:
            pump = self.stream_pump
            pump._factor_query = Mock(return_value=self.query_list)
            pump._process_streaming_query = Mock()
            pump.start(self.query)

            # _factor_query() must receive exactly what was passed to start()
            pump._factor_query.assert_called_once_with(self.query)

            # _process_streaming_query() must receive only the first element
            # of the query list returned by _factor_query()
            pump._process_streaming_query.assert_called_once_with(
                self.query_list[0])

            # a warning about the truncated query must have been logged
            warning = ('Query was too large for Pipe "Twitter PublicStreamsAPI." '
                       'A smaller version of the query was submitted.')
            log_capture.check(
                ('aggregator.pumproom.pump', 'WARNING', warning),
            )
项目:edx-enterprise    作者:edx    | 项目源码 | 文件源码
def test_transmit_course_metadata_task_no_channel(self):
        """
        Test that the data transmission task logs nothing when no integrated
        channel is configured.
        """
        catalog_user = factories.UserFactory(username='john_doe')
        factories.EnterpriseCustomerFactory(
            catalog=1,
            name='Veridian Dynamics',
        )

        # Remove all integrated channels
        for channel_configuration in (
                SAPSuccessFactorsEnterpriseCustomerConfiguration,
                DegreedEnterpriseCustomerConfiguration,
        ):
            channel_configuration.objects.all().delete()

        with LogCapture(level=logging.INFO) as log_capture:
            call_command('transmit_course_metadata', '--catalog_user', catalog_user.username)

            # Because there are no IntegratedChannels, the process will end early.
            assert not log_capture.records
项目:edx-enterprise    作者:edx    | 项目源码 | 文件源码
def test_transmit_course_metadata_task_no_catalog(self):
        """
        Test that the data transmission task logs nothing for an enterprise
        customer that has no course catalog.
        """
        self.mock_ent_courses_api_with_pagination(
            enterprise_uuid=str(self.enterprise_customer.uuid),
            course_run_ids=['course-v1:edX+DemoX+Demo_Course']
        )
        # detach the catalog from the customer behind the SAPSF channel
        customer_without_catalog = self.sapsf.enterprise_customer
        customer_without_catalog.catalog = None
        customer_without_catalog.save()

        with LogCapture(level=logging.INFO) as log_capture:
            call_command('transmit_course_metadata', '--catalog_user', self.user.username)

            # Because there are no EnterpriseCustomers with a catalog, the process will end early.
            assert not log_capture.records
项目:edx-enterprise    作者:edx    | 项目源码 | 文件源码
def test_transmit_learner_data(
            self,
            command_kwargs,
            certificate,
            self_paced,
            end_date,
            passed,
            expected_completion,
    ):
        """
        Test the log output from a successful run of the transmit_learner_data management command,
        using all the ways we can invoke it.
        """
        command_context = transmit_learner_data_context(command_kwargs, certificate, self_paced, end_date, passed)
        with command_context as (args, kwargs), LogCapture(level=logging.INFO) as log_capture:
            expected_output = get_expected_output(**expected_completion)
            call_command('transmit_learner_data', *args, **kwargs)
            # each expected message must appear in the record at the same position
            for position, expected_message in enumerate(expected_output):
                logged_message = log_capture.records[position].getMessage()
                assert expected_message in logged_message
项目:vpc-router    作者:romana    | 项目源码 | 文件源码
def setUp(self):
        """Install a filtered DEBUG-level log capture and replace ``handle_spec`` with a no-op."""
        self.lc = capture = LogCapture()
        capture.setLevel(logging.DEBUG)
        capture.addFilter(test_common.MyLogCaptureFilter())

        self.additional_setup()

        self.addCleanup(self.cleanup)

        # keep the original so it can be put back later
        self.old_handle_spec = vpc.handle_spec

        # The watcher calls handle_spec, which is defined in the vpc module
        # but was imported directly into the watcher module, so the watcher
        # holds its own reference to it. The replacement therefore has to be
        # installed in the watcher module; for safety it is installed in
        # both the watcher and the vpc module.
        def _ignore_spec(*args, **kwargs):
            return None

        watcher.handle_spec = _ignore_spec
        vpc.handle_spec = _ignore_spec
项目:cxflow    作者:Cognexa    | 项目源码 | 文件源码
def test_unsupported(self):
        """Test how ``StopOnNaN`` handles a variable of an unsupported type."""

        def unsupported_data():
            """Build epoch data whose variable is a function."""
            return StopOnNaNTest._get_data(lambda: 0)

        # 'error' policy raises
        with self.assertRaises(ValueError):
            StopOnNaN(on_unknown_type='error').after_epoch(epoch_data=unsupported_data())

        # an unknown policy name is rejected at construction time
        with self.assertRaises(AssertionError):
            StopOnNaN(on_unknown_type='bad value')

        # 'warn' policy logs a single warning
        with LogCapture() as log_capture:
            StopOnNaN(on_unknown_type='warn').after_epoch(epoch_data=unsupported_data())

        expected_warning = (
            'root', 'WARNING', 'Variable `var` of type `<class \'function\'>` can not be checked for NaNs.')
        log_capture.check(
            expected_warning,
        )

        # the default policy must not raise
        StopOnNaN().after_epoch(epoch_data=unsupported_data())
项目:cxflow    作者:Cognexa    | 项目源码 | 文件源码
def test_missing_train(self):
        """Test that missing profile entries are logged as zero durations."""
        zero_profile_records = (
            ('root', 'INFO', '\tT read data:\t0.000000'),
            ('root', 'INFO', '\tT train:\t0.000000'),
            ('root', 'INFO', '\tT eval:\t0.000000'),
            ('root', 'INFO', '\tT hooks:\t0.000000'),
        )

        # completely empty profile
        with LogCapture() as empty_profile_capture:
            self._hook.after_epoch_profile(0, {}, [])
        empty_profile_capture.check(*zero_profile_records)

        # profile holding only an unrelated entry
        with LogCapture() as unrelated_profile_capture:
            self._hook.after_epoch_profile(0, {'some_contents': 1}, [])
        unrelated_profile_capture.check(*zero_profile_records)
项目:cxflow    作者:Cognexa    | 项目源码 | 文件源码
def test_log_variables_selected(self):
        """
        Test that only the selected variables from the `epoch_data` streams are logged.
        """
        selected_variables = ['accuracy', 'loss2']
        expected_records = [
            ('root', 'INFO', '\ttrain accuracy: 1.000000'),
            ('root', 'INFO', '\ttrain loss2:'),
            ('root', 'INFO', '\t\tmean: 1.000000'),
            ('root', 'INFO', '\t\tmedian: 11.000000'),
            ('root', 'INFO', '\ttest accuracy: 2.000000'),
            ('root', 'INFO', '\ttest loss2:'),
            ('root', 'INFO', '\t\tmean: 2.000000'),
            ('root', 'INFO', '\t\tmedian: 22.000000'),
        ]

        with LogCapture() as log_capture:
            LogVariables(selected_variables).after_epoch(_EPOCH_ID, _get_epoch_data())

        log_capture.check(*expected_records)
项目:pyaptly    作者:adfinis-sygroup    | 项目源码 | 文件源码
def do_publish_create_republish(config):
    """Test if creating republishes works."""
    with testfixtures.LogCapture() as captured:
        do_publish_create(config)
        # At least one CRITICAL record must mention the deferral.
        assert any(
            rec.levelname == "CRITICAL" and "has been deferred" in rec.msg
            for rec in captured.records
        )

    # Run the publish command through the CLI entry point, then verify the
    # resulting publish shows up in the system state.
    main(['-c', config, 'publish', 'create'])
    system_state = SystemStateReader()
    system_state.read()
    assert 'fakerepo01-stable main' in system_state.publishes
项目:marvin    作者:bassdread    | 项目源码 | 文件源码
def test_init():
    """Check the attributes of the bot returned by ``init_rtmbot`` and its log output."""
    with LogCapture() as captured:
        bot = init_rtmbot()

    assert bot.token == 'test-12345'
    assert bot.directory == '/tmp/'
    assert bot.debug == True

    # Exactly one record is expected from initialisation.
    expected_record = ('root', 'INFO', 'Initialized in: /tmp/')
    captured.check(expected_record)
项目:cxflow-tensorflow    作者:Cognexa    | 项目源码 | 文件源码
def test_unknown_type(self):
        """Test if ``WriteTensorBoard`` handles unknown variable types as expected."""
        epoch_data = {'valid': {'accuracy': 'bad_type'}}
        expected_warning = (
            'Variable `accuracy` in stream `valid` has to be of type `int` '
            'or `float` (or a `dict` with a key named `mean` or `nanmean` '
            'whose corresponding value is of type `int` or `float`), '
            "found `<class 'str'>` instead."
        )

        # no ``on_unknown_type`` given: nothing may be captured
        default_hook = WriteTensorBoard(output_dir=self.tmpdir, model=self.get_model())
        with LogCapture(level=logging.INFO) as silent_capture:
            default_hook.after_epoch(42, epoch_data)
        silent_capture.check()

        # ``on_unknown_type='warn'``: a single warning is captured
        warning_hook = WriteTensorBoard(output_dir=self.tmpdir, model=self.get_model(), on_unknown_type='warn')
        with LogCapture(level=logging.INFO) as warning_capture:
            warning_hook.after_epoch(42, epoch_data)
        warning_capture.check(('root', 'WARNING', expected_warning))

        # ``on_unknown_type='error'``: the hook raises
        error_hook = WriteTensorBoard(output_dir=self.tmpdir, model=self.get_model(), on_unknown_type='error')
        with self.assertRaises(ValueError):
            error_hook.after_epoch(42, epoch_data)

        with mock.patch.dict('sys.modules', **{'cv2': cv2_mock}):
            # a variable listed in ``image_variables`` must not raise even with 'error'
            image_hook = WriteTensorBoard(output_dir=self.tmpdir, model=self.get_model(), on_unknown_type='error',
                                          image_variables=['accuracy'])
            image_hook.after_epoch(42, {'valid': {'accuracy': np.zeros((10, 10, 3))}})
            image_hook._summary_writer.close()
项目:vpcrouter-romana-plugin    作者:romana    | 项目源码 | 文件源码
def setUp(self):
        """Create a DEBUG-level ``LogCapture`` filtered by ``MyLogCaptureFilter`` and register cleanup."""
        capture = LogCapture()
        capture.setLevel(logging.DEBUG)
        capture.addFilter(test_common.MyLogCaptureFilter())
        self.lc = capture
        self.addCleanup(self.cleanup)
项目:bottery    作者:rougeth    | 项目源码 | 文件源码
def test_ColoredFormatter():
    """Test if logs are being colored"""

    logging.config.dictConfig(DEFAULT_LOGGING)

    with LogCapture(names='bottery') as logs:
        bottery_logger = logging.getLogger('bottery')
        bottery_logger.debug('DEBUG')
        bottery_logger.info('INFO')
        bottery_logger.warning('WARN')
        bottery_logger.error('ERROR')
        bottery_logger.critical('CRITICAL')
        # Snapshot the records before the capture is torn down.
        captured_records = list(logs.records)

    # Run every captured record through ColoredFormatter
    formatter = ColoredFormatter()
    formatted = [formatter.format(rec) for rec in captured_records]

    # DEBUG and INFO stay plain; the higher levels are wrapped in ANSI escapes.
    assert formatted == [
        'DEBUG',
        'INFO',
        '\x1b[33mWARN\x1b[0m',
        '\x1b[31mERROR\x1b[0m',
        '\x1b[30m\x1b[41mCRITICAL\x1b[0m',
    ]
项目:python-zhmcclient    作者:zhmcclient    | 项目源码 | 文件源码
def capture():
    """
    Yield a ``LogCapture`` created with ``level=logging.DEBUG``.

    The fixture is written as a generator because using the decorator
    testfixtures.log_capture() instead makes pytest fail with
    "fixture 'capture' not found".
    """
    with LogCapture(level=logging.DEBUG) as captured_log:
        yield captured_log


#
# Test cases
#
项目:UrbanSearch    作者:urbansearchTUD    | 项目源码 | 文件源码
def test_classify_documents_from_indices(mock_manager, mock_workers,
                                         mock_indices_selector):
    """Check that ``main.classify_documents_from_indices`` uses its collaborators.

    The indices selector, workers and manager are replaced by mocks; the test
    asserts that each was called and that the first worker returned by both
    ``run_workers`` and ``run_classifying_workers`` was joined.
    """
    with main.app.app_context():
        with patch('urbansearch.main.request') as mock_flask_request:
            mock_flask_request.args.get.return_value = MagicMock(side_effect=[1, 1, Mock()])
            ind_sel = mock_indices_selector.return_value = Mock()
            cworker = mock_workers.return_value = Mock()
            man = mock_manager.return_value = Mock()

            # Tracked workers: the first element of each returned list, so
            # that ``join`` can be asserted on them below.
            a = Mock()
            b = Mock()

            ind_sel.run_workers.return_value = [a, Mock()]
            cworker.run_classifying_workers.return_value = [b, Mock()]

            # Bugs other fixtures if imported globally.
            from testfixtures import LogCapture
            with LogCapture() as log_capture:
                main.classify_documents_from_indices()
                # NOTE(review): ``__sizeof__()`` is the object's memory size
                # and is always positive, so this never fails. If the intent
                # is "something was logged", assert on ``log_capture.records``
                # instead -- confirm the function logs before tightening.
                assert log_capture.__sizeof__() > 0

            assert mock_indices_selector.called
            assert mock_workers.called
            assert mock_manager.called
            assert man.Queue.called
            assert ind_sel.run_workers.called
            assert cworker.run_classifying_workers.called
            assert cworker.set_producers_done.called
            assert a.join.called
            assert b.join.called
项目:UrbanSearch    作者:urbansearchTUD    | 项目源码 | 文件源码
def test_classify_indices_to_db_not_connected(mock_db_connected, mock_workers):
    """Check that no workers are created when the database reports not connected."""
    with main.app.app_context():
        # The request object only needs to be patched out; it is never inspected.
        with patch('urbansearch.main.request'):
            mock_db_connected.return_value = False

            from testfixtures import LogCapture
            with LogCapture() as log_capture:
                main.classify_indices_to_db()
                # NOTE(review): ``__sizeof__()`` is always positive, so this
                # assertion cannot fail. Assert on ``log_capture.records`` if
                # the intent is to verify that something was logged.
                assert log_capture.__sizeof__() > 0
                assert not mock_workers.called
项目:UrbanSearch    作者:urbansearchTUD    | 项目源码 | 文件源码
def test_mock_classify_text_files_to_db(mock_manager, mock_workers,
                                        mock_indices_selector,
                                        mock_db_utils):
    """Check that ``main.classify_textfiles_to_db`` uses its collaborators.

    With the database mocked as connected, the test asserts that the workers
    and manager mocks were called and that the first worker returned by both
    ``run_read_files_worker`` and ``run_classifying_workers`` was joined.
    """
    mock_db_utils.connected.return_value = True
    w = mock_workers.return_value = Mock()
    man = mock_manager.return_value = Mock()

    # Tracked workers: the first element of each returned list, so that
    # ``join`` can be asserted on them below.
    a = Mock()
    b = Mock()

    w.run_read_files_worker.return_value = [a, Mock()]
    w.run_classifying_workers.return_value = [b, Mock()]

    # Bugs other fixtures if imported globally.
    from testfixtures import LogCapture
    with LogCapture() as log_capture:
        main.classify_textfiles_to_db(Mock(), True, 1)
        # NOTE(review): ``__sizeof__()`` is always positive, so this assertion
        # cannot fail. Assert on ``log_capture.records`` if the intent is to
        # verify that something was logged.
        assert log_capture.__sizeof__() > 0

        assert mock_workers.called
        assert mock_manager.called
        assert man.Queue.called
        assert w.run_read_files_worker.called
        assert w.run_classifying_workers.called
        assert w.set_file_producers_done.called
        assert w.clear_file_producers_done.called
        assert a.join.called
        assert b.join.called
项目:UrbanSearch    作者:urbansearchTUD    | 项目源码 | 文件源码
def test_mock_classify_text_files_to_db_not_connected(mock_db_connected,
                                                      mock_workers):
    """Check that no workers are created when the database reports not connected."""
    with main.app.app_context():
        # The request object only needs to be patched out; it is never inspected.
        with patch('urbansearch.main.request'):

            mock_db_connected.return_value = False

            from testfixtures import LogCapture
            with LogCapture() as log_capture:
                main.classify_textfiles_to_db(Mock(), Mock(), 1, True)
                # NOTE(review): ``__sizeof__()`` is always positive, so this
                # assertion cannot fail. Assert on ``log_capture.records`` if
                # the intent is to verify that something was logged.
                assert log_capture.__sizeof__() > 0
                assert not mock_workers.called
项目:UrbanSearch    作者:urbansearchTUD    | 项目源码 | 文件源码
def test_run_classifying_workers(self, mock_event, mock_pd,
                                     mock_classify, mock_pre_process,
                                     mock_coOc, mock_config,
                                     mock_process):
        """Check that ``run_classifying_workers`` reaches the mocked pre-processing step."""
        work_queue = Mock()
        workers = Workers()

        mock_pre_process.return_value = Mock()

        # Bugs other fixtures if imported globally.
        from testfixtures import LogCapture
        with LogCapture() as log_capture:
            workers.run_classifying_workers(1, work_queue, 1, pre_downloaded=True)
            # NOTE(review): ``__sizeof__()`` is always positive, so this
            # assertion holds regardless of what was logged.
            assert log_capture.__sizeof__() > 0
            assert mock_pre_process.called
项目:hatchery    作者:ajk8    | 项目源码 | 文件源码
def test__log_failure_and_die():
    """Check which parts of a call result ``main._log_failure_and_die`` logs.

    With the third argument ``False`` only the error text may appear in the
    captured messages; with ``True`` the stdout and stderr text must appear
    as well. Both calls must raise ``SystemExit``.
    """
    with testfixtures.LogCapture() as captured:
        call_result = executor.CallResult(1, 'happy_stdout', 'sad_stderr')

        with pytest.raises(SystemExit):
            main._log_failure_and_die('error', call_result, False)
        assert _somewhere_in_messages(captured, 'error')
        for stream_text in ('happy_stdout', 'sad_stderr'):
            assert _nowhere_in_messages(captured, stream_text)

        with pytest.raises(SystemExit):
            main._log_failure_and_die('error', call_result, True)
        for expected_text in ('error', 'happy_stdout', 'sad_stderr'):
            assert _somewhere_in_messages(captured, expected_text)
项目:sdining    作者:Lurance    | 项目源码 | 文件源码
def test_empty_pool_fallback(self):
        """Check that picking from an empty pool logs an error and generates a captcha.

        ``CAPTCHA_GET_FROM_POOL`` is switched on for the duration of the test
        and restored afterwards, even when the test fails.
        """
        original_setting = settings.CAPTCHA_GET_FROM_POOL
        settings.CAPTCHA_GET_FROM_POOL = True
        try:
            CaptchaStore.objects.all().delete()  # Delete objects created during SetUp
            with LogCapture() as captured:
                CaptchaStore.pick()
            captured.check(('captcha.models', 'ERROR', "Couldn't get a captcha from pool, generating"),)
            self.assertEqual(CaptchaStore.objects.count(), 1)
        finally:
            # Restore in ``finally`` so a failing assertion cannot leave the
            # overridden setting active for the tests that run afterwards.
            settings.CAPTCHA_GET_FROM_POOL = original_setting
项目:github-snooze-button    作者:tdsmith    | 项目源码 | 文件源码
def test_bad_callback_type_is_logged(self, config):
        with LogCapture() as l:
            snooze.github_callback("foobar", None, None, None, None)
            assert "WARNING" in str(l)