Python fixtures 模块,FakeLogger() 实例源码

我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 fixtures.FakeLogger()。

项目:deb-oslo.utils    作者:openstack    | 项目源码 | 文件源码
def setUp(self):
        """Prepare the environment shared by every test method.

        Installs, in order: an optional ``Timeout`` fixture driven by
        ``OS_TEST_TIMEOUT``, the ``NestedTempfile`` and ``TempHomeDir``
        fixtures, optional replacement of ``sys.stdout`` / ``sys.stderr``
        (``OS_STDOUT_CAPTURE`` / ``OS_STDERR_CAPTURE``) and finally a
        ``FakeLogger`` that is kept on ``self.log_fixture``.
        """
        super(TestCase, self).setUp()

        raw_timeout = os.environ.get('OS_TEST_TIMEOUT', 0)
        try:
            timeout_seconds = int(raw_timeout)
        except ValueError:
            # A value that is not an integer disables the timeout.
            timeout_seconds = 0
        if timeout_seconds > 0:
            self.useFixture(fixtures.Timeout(timeout_seconds, gentle=True))

        self.useFixture(fixtures.NestedTempfile())
        self.useFixture(fixtures.TempHomeDir())

        # Replace each standard stream only when its capture flag is set.
        for flag, stream_name in (('OS_STDOUT_CAPTURE', 'stdout'),
                                  ('OS_STDERR_CAPTURE', 'stderr')):
            if os.environ.get(flag) in _TRUE_VALUES:
                captured = self.useFixture(
                    fixtures.StringStream(stream_name)).stream
                self.useFixture(
                    fixtures.MonkeyPatch('sys.' + stream_name, captured))

        self.log_fixture = self.useFixture(fixtures.FakeLogger())
项目:deb-oslo.vmware    作者:openstack    | 项目源码 | 文件源码
def setUp(self):
        """Set up timeout, temp dirs, optional stream capture and logging.

        The ``FakeLogger`` fixture installed last is stored on
        ``self.log_fixture``.
        """
        super(TestCase, self).setUp()

        try:
            timeout = int(os.environ.get('OS_TEST_TIMEOUT', 0))
        except ValueError:
            # An unparsable OS_TEST_TIMEOUT is treated as "no timeout".
            timeout = 0
        if timeout > 0:
            self.useFixture(fixtures.Timeout(timeout, gentle=True))

        self.useFixture(fixtures.NestedTempfile())
        self.useFixture(fixtures.TempHomeDir())

        # Each standard stream is patched only when its flag is truthy.
        if os.environ.get('OS_STDOUT_CAPTURE') in _TRUE_VALUES:
            stdout_stream = fixtures.StringStream('stdout')
            self.useFixture(stdout_stream)
            self.useFixture(
                fixtures.MonkeyPatch('sys.stdout', stdout_stream.stream))
        if os.environ.get('OS_STDERR_CAPTURE') in _TRUE_VALUES:
            stderr_stream = fixtures.StringStream('stderr')
            self.useFixture(stderr_stream)
            self.useFixture(
                fixtures.MonkeyPatch('sys.stderr', stderr_stream.stream))

        self.log_fixture = self.useFixture(fixtures.FakeLogger())
项目:bilean    作者:openstack    | 项目源码 | 文件源码
def setup_logging(self):
        """Install ``FakeLogger`` fixtures for the default and named loggers.

        The default-level logger fixture is stored on ``self.LOG`` so that
        tests can still assert on bilean logs.  Every other top-level
        logger name known to the logging manager gets its own fixture,
        using the level from ``TEST_DEFAULT_LOGLEVELS`` when one is listed.
        """
        debug_requested = os.environ.get('OS_DEBUG') in _TRUE_VALUES
        default_level = logging.DEBUG if debug_requested else logging.INFO

        self.LOG = self.useFixture(
            fixtures.FakeLogger(level=default_level, format=_LOG_FORMAT))

        # Reduce dotted logger names ("a.b.c") to their first component.
        top_level_names = {
            logger_name.split('.')[0]
            for logger_name in logging.Logger.manager.loggerDict}
        for top_name in top_level_names:
            if top_name in TEST_DEFAULT_LOGLEVELS:
                self.useFixture(fixtures.FakeLogger(
                    level=TEST_DEFAULT_LOGLEVELS[top_name],
                    name=top_name, format=_LOG_FORMAT))
            elif top_name != 'bilean':
                # 'bilean' gets no fixture of its own here.
                self.useFixture(fixtures.FakeLogger(
                    name=top_name, format=_LOG_FORMAT))
项目:python-zunclient    作者:openstack    | 项目源码 | 文件源码
def setUp(self):
        """Prepare the environment shared by every test method.

        Applies an optional ``OS_TEST_TIMEOUT`` timeout, installs the
        ``NestedTempfile`` and ``TempHomeDir`` fixtures, optionally patches
        ``sys.stdout`` / ``sys.stderr`` with string streams, and stores a
        ``FakeLogger`` fixture on ``self.log_fixture``.
        """
        super(TestCase, self).setUp()

        configured_timeout = os.environ.get('OS_TEST_TIMEOUT', 0)
        try:
            configured_timeout = int(configured_timeout)
        except ValueError:
            # Invalid values fall back to running without a timeout.
            configured_timeout = 0
        if configured_timeout > 0:
            self.useFixture(
                fixtures.Timeout(configured_timeout, gentle=True))

        self.useFixture(fixtures.NestedTempfile())
        self.useFixture(fixtures.TempHomeDir())

        capture_settings = (
            ('OS_STDOUT_CAPTURE', 'stdout'),
            ('OS_STDERR_CAPTURE', 'stderr'),
        )
        for env_name, stream_name in capture_settings:
            if os.environ.get(env_name) in _TRUE_VALUES:
                replacement = self.useFixture(
                    fixtures.StringStream(stream_name)).stream
                self.useFixture(
                    fixtures.MonkeyPatch('sys.' + stream_name, replacement))

        self.log_fixture = self.useFixture(fixtures.FakeLogger())
项目:acceptable    作者:canonical-ols    | 项目源码 | 文件源码
def test_logs_on_no_permissions(self):
        # A file whose mode has been set to 0 must make the extractor
        # return None and log both the attempt and the permission error.
        tmp_dir = self.useFixture(fixtures.TempDir())
        log_capture = self.useFixture(fixtures.FakeLogger())

        unreadable = os.path.join(tmp_dir.path, 'path_not_readable')
        with open(unreadable, 'w') as handle:
            handle.write("# You can't read me")
        os.chmod(unreadable, 0)

        outcome = _build_doubles.extract_schemas_from_file(unreadable)

        self.assertIsNone(outcome)
        expected_fragments = (
            'Extracting schemas from %s' % unreadable,
            'Cannot extract schemas: Permission denied',
        )
        for fragment in expected_fragments:
            self.assertThat(log_capture.output, Contains(fragment))
项目:acceptable    作者:canonical-ols    | 项目源码 | 文件源码
def test_logs_on_syntax_error(self):
        # A file that does not parse as Python must make the extractor
        # return None and log both the attempt and the syntax error.
        tmp_dir = self.useFixture(fixtures.TempDir())
        log_capture = self.useFixture(fixtures.FakeLogger())

        broken_module = os.path.join(tmp_dir.path, 'foo.py')
        with open(broken_module, 'w') as handle:
            handle.write("not valid pyton")

        outcome = _build_doubles.extract_schemas_from_file(broken_module)

        self.assertIsNone(outcome)
        expected_fragments = (
            'Extracting schemas from %s' % broken_module,
            'Cannot extract schemas: invalid syntax (foo.py, line 1)',
        )
        for fragment in expected_fragments:
            self.assertThat(log_capture.output, Contains(fragment))
项目:systemfixtures    作者:testing-cabal    | 项目源码 | 文件源码
def test_listen(self):
        """Connect to the spawned executable over TCP on localhost.

        The executable is told to listen and then spawned; the test retries
        the connection while it is refused or reset, and finally checks
        that the client socket is bound to 127.0.0.1.
        """
        # The fixture is installed for its side effect only; its captured
        # output is never inspected by this test.
        self.useFixture(FakeLogger(format="%(asctime)s %(message)s"))
        self.executable.listen(6666)
        self.executable.sleep(1)
        self.executable.spawn()

        # Try to connect to the socket
        sock = socket.socket()
        # Always release the descriptor, even when connecting or the final
        # assertion fails.
        self.addCleanup(sock.close)
        transient = (errno.ECONNREFUSED, errno.ECONNRESET)

        attempts = 10
        for i in range(1, attempts + 1):
            try:
                sock.connect(("127.0.0.1", self.executable.port))
            except socket.error as error:  # pragma: no cover
                if error.errno in transient and i != attempts:
                    # Wait a little longer after every failed attempt.
                    time.sleep(0.05 * i)
                    continue
                logging.error("connection attempt %d failed", i)
                # Bare raise keeps the original traceback intact.
                raise
            break
        self.assertEqual("127.0.0.1", sock.getsockname()[0])
项目:maas    作者:maas    | 项目源码 | 文件源码
def test__logs_if_suggests_previously_observed_neighbour(self):
        # Note: 10.0.0.0/30 --> 10.0.0.1 and 10.0.0.2 are usable.
        subnet = self.make_Subnet(
            cidr="10.0.0.0/30", gateway_ip=None, dns_servers=None)
        rack_interface = factory.make_Interface(vlan=subnet.vlan)
        seen_now = datetime.now()
        seen_yesterday = seen_now - timedelta(days=1)
        # Both addresses have been observed; 10.0.0.2 has the older
        # timestamp.
        observations = (
            ("10.0.0.1", seen_now),
            ("10.0.0.2", seen_yesterday),
        )
        for address, seen_at in observations:
            factory.make_Discovery(
                ip=address, interface=rack_interface, updated=seen_at)
        log_capture = self.useFixture(FakeLogger("maas"))
        suggested_ip = subnet.get_next_ip_for_allocation()
        self.assertThat(suggested_ip, Equals("10.0.0.2"))
        expected_log = "Next IP address...observed previously..."
        self.assertThat(log_capture.output, DocTestMatches(expected_log))
项目:maas    作者:maas    | 项目源码 | 文件源码
def test__handle_uncaught_exception_logs_other_failure(self):
        # handle_uncaught_exception() is expected to log a 500 line plus
        # the traceback of the exception it was handed.
        handler = views.WebApplicationHandler()
        request = make_request()
        request.path = factory.make_name("path")

        # Raise and catch a throwaway exception so that a genuine exc_info
        # tuple, traceback included, is available.
        error_class = factory.make_exception_type()
        error_text = factory.make_name("message")
        try:
            raise error_class(error_text)
        except error_class:
            captured_exc_info = sys.exc_info()

        with FakeLogger(views.__name__, logging.ERROR) as log_capture:
            handler.handle_uncaught_exception(
                request=request, resolver=get_resolver(None),
                exc_info=captured_exc_info)

        expected_log = """\
            500 Internal Server Error @ %s
            Traceback (most recent call last):
            ...
            maastesting.factory.TestException#...: %s
            """ % (request.path, error_text)
        self.assertThat(log_capture.output, DocTestMatches(expected_log))
项目:maas    作者:maas    | 项目源码 | 文件源码
def test__scan_as_admin_logs_the_fact_that_a_scan_happened(self):
        # A scan requested through the handler must be logged together
        # with the requesting user's name and the subnet's CIDR.
        admin = factory.make_admin()
        handler = SubnetHandler(admin, {})
        subnet = factory.make_Subnet(version=4)
        rack = factory.make_RackController()
        factory.make_Interface(node=rack, subnet=subnet)
        log_capture = self.useFixture(FakeLogger())
        cidr = subnet.get_ipnetwork()
        scan_params = {"id": subnet.id}
        handler.scan(scan_params)
        # Use MatchesRegex here rather than DocTestMatches because usernames
        # can contain characters that confuse DocTestMatches (e.g. periods).
        expected_pattern = (
            "User '%s' initiated a neighbour discovery scan against subnet: %s"
            % (re.escape(admin.username), re.escape(str(cidr))))
        self.assertThat(log_capture.output, MatchesRegex(expected_pattern))
项目:maas    作者:maas    | 项目源码 | 文件源码
def test_raises_error_when_omshell_not_connected(self):
        # omshell.remove() fails with a "not connected." process error.
        process_error = ExternalProcessError(
            returncode=2, cmd=("omshell",), output="")
        self.patch(ExternalProcessError, 'output_as_unicode', 'not connected.')
        omshell = Mock()
        omshell.remove.side_effect = process_error
        mac = factory.make_mac_address()
        with FakeLogger("maas.dhcp") as log_capture:
            raised = self.assertRaises(
                exceptions.CannotRemoveHostMap, dhcp._remove_host_map,
                omshell, mac)
        expected_message = (
            "Could not remove host map for %s: "
            "The DHCP server could not be reached." % mac)
        # The CannotRemoveHostMap exception includes a message describing the
        # problematic mapping.
        self.assertDocTestMatches(expected_message, str(raised))
        # A message is also written to the maas.dhcp logger that describes the
        # problematic mapping.
        self.assertDocTestMatches(expected_message, log_capture.output)
项目:maas    作者:maas    | 项目源码 | 文件源码
def test_raises_error_when_omshell_crashes(self):
        # omshell.create() fails with an arbitrary process error.
        crash_output = factory.make_name("error").encode("ascii")
        omshell = Mock()
        omshell.create.side_effect = ExternalProcessError(
            returncode=2, cmd=("omshell",), output=crash_output)
        mac = factory.make_mac_address()
        ip = factory.make_ip_address()
        with FakeLogger("maas.dhcp") as log_capture:
            raised = self.assertRaises(
                exceptions.CannotCreateHostMap, dhcp._create_host_map,
                omshell, mac, ip)
        expected_message = (
            "Could not create host map for %s -> %s: ..." % (mac, ip))
        # The CannotCreateHostMap exception includes a message describing the
        # problematic mapping.
        self.assertDocTestMatches(expected_message, str(raised))
        # A message is also written to the maas.dhcp logger that describes the
        # problematic mapping.
        self.assertDocTestMatches(expected_message, log_capture.output)
项目:maas    作者:maas    | 项目源码 | 文件源码
def test_raises_error_when_omshell_not_connected(self):
        # omshell.create() fails with a "not connected." process error.
        process_error = ExternalProcessError(
            returncode=2, cmd=("omshell",), output="")
        self.patch(ExternalProcessError, 'output_as_unicode', 'not connected.')
        omshell = Mock()
        omshell.create.side_effect = process_error
        mac = factory.make_mac_address()
        ip = factory.make_ip_address()
        with FakeLogger("maas.dhcp") as log_capture:
            raised = self.assertRaises(
                exceptions.CannotCreateHostMap, dhcp._create_host_map,
                omshell, mac, ip)
        expected_message = (
            "Could not create host map for %s -> %s: "
            "The DHCP server could not be reached." % (mac, ip))
        # The CannotCreateHostMap exception includes a message describing the
        # problematic mapping.
        self.assertDocTestMatches(expected_message, str(raised))
        # A message is also written to the maas.dhcp logger that describes the
        # problematic mapping.
        self.assertDocTestMatches(expected_message, log_capture.output)
项目:maas    作者:maas    | 项目源码 | 文件源码
def test__does_log_other_exceptions_when_restarting(self):
        # The patched restart raises; configure() must surface that as
        # CannotConfigureDHCP and log the underlying message.
        self.patch_sudo_write_file()
        restart_failure = factory.make_exception("DHCP is on strike today")
        self.patch_restartService().side_effect = restart_failure
        failover_peers = [make_failover_peer_config()]
        shared_networks = fix_shared_networks_failover(
            [make_shared_network()], failover_peers)
        with FakeLogger("maas") as log_capture, \
                ExpectedException(exceptions.CannotConfigureDHCP):
            yield self.configure(
                factory.make_name('key'),
                failover_peers, shared_networks,
                [make_host()], [make_interface()],
                make_global_dhcp_snippets())
        expected_log = (
            "DHCPv... server failed to restart: "
            "DHCP is on strike today")
        self.assertDocTestMatches(expected_log, log_capture.output)
项目:maas    作者:maas    | 项目源码 | 文件源码
def test_query_all_nodes_swallows_PowerActionFail(self):
        # The first query fails with PowerActionFail, the second succeeds;
        # both outcomes must show up in the maas.power log.
        failing_node, changing_node = self.make_nodes(2)
        changed_state = self.pick_alternate_state(
            changing_node['power_state'])
        get_power_state = self.patch(power, 'get_power_state')
        error_msg = factory.make_name("error")
        query_outcomes = [
            fail(exceptions.PowerActionFail(error_msg)),
            succeed(changed_state),
        ]
        get_power_state.side_effect = query_outcomes
        suppress_reporting(self)

        with FakeLogger("maas.power", level=logging.DEBUG) as maaslog:
            yield power.query_all_nodes([failing_node, changing_node])

        expected_log = """\
            hostname-...: Could not query power state: %s.
            hostname-...: Power state has changed from ... to ...
            """ % error_msg
        self.assertDocTestMatches(expected_log, maaslog.output)
项目:maas    作者:maas    | 项目源码 | 文件源码
def test_query_all_nodes_swallows_PowerError(self):
        # The first query fails with PowerError, the second succeeds;
        # both outcomes must show up in the maas.power log.
        failing_node, changing_node = self.make_nodes(2)
        changed_state = self.pick_alternate_state(
            changing_node['power_state'])
        get_power_state = self.patch(power, 'get_power_state')
        error_msg = factory.make_name("error")
        query_outcomes = [
            fail(PowerError(error_msg)),
            succeed(changed_state),
        ]
        get_power_state.side_effect = query_outcomes
        suppress_reporting(self)

        with FakeLogger("maas.power", level=logging.DEBUG) as maaslog:
            yield power.query_all_nodes([failing_node, changing_node])

        # The node fields are read only after the query has completed.
        expected_values = (
            failing_node['hostname'], error_msg,
            changing_node['hostname'], changing_node['power_state'],
            changed_state)
        expected_log = """\
            %s: Could not query power state: %s.
            %s: Power state has changed from %s to %s.
            """ % expected_values
        self.assertDocTestMatches(expected_log, maaslog.output)
项目:maas    作者:maas    | 项目源码 | 文件源码
def test_query_all_nodes_swallows_NoSuchNode(self):
        # The first query fails with NoSuchNode, the second succeeds;
        # both outcomes must show up in the maas.power log.
        missing_node, changing_node = self.make_nodes(2)
        changed_state = self.pick_alternate_state(
            changing_node['power_state'])
        get_power_state = self.patch(power, 'get_power_state')
        query_outcomes = [
            fail(exceptions.NoSuchNode()),
            succeed(changed_state),
        ]
        get_power_state.side_effect = query_outcomes
        suppress_reporting(self)

        with FakeLogger("maas.power", level=logging.DEBUG) as maaslog:
            yield power.query_all_nodes([missing_node, changing_node])

        expected_log = """\
            hostname-...: Could not update power state: no such node.
            hostname-...: Power state has changed from ... to ...
            """
        self.assertDocTestMatches(expected_log, maaslog.output)
项目:maas    作者:maas    | 项目源码 | 文件源码
def test___performServiceAction_logs_error_if_action_fails(self):
        # The patched _execSystemDServiceAction returns (1, "", <error>);
        # the action must raise ServiceActionError and log that error.
        service = make_fake_service(SERVICE_STATE.ON)
        service_monitor = self.make_service_monitor()
        exec_action = self.patch(
            service_monitor, "_execSystemDServiceAction")
        failure_text = factory.make_name("error")
        exec_action.return_value = (1, "", failure_text)
        action = factory.make_name("action")
        with FakeLogger(
                "maas.service_monitor", level=logging.ERROR) as maaslog, \
                ExpectedException(ServiceActionError):
            yield service_monitor._performServiceAction(service, action)

        expected_log = "Service '%s' failed to %s: %s" % (
            service.name, action, failure_text)
        self.assertDocTestMatches(expected_log, maaslog.output)
项目:maas    作者:maas    | 项目源码 | 文件源码
def test___ensureService_logs_warning_in_mismatch_process_state(self):
        # The service reports ON together with an unexpected process
        # state; a warning naming both states must be logged.
        service = make_fake_service(SERVICE_STATE.ON)
        service_monitor = self.make_service_monitor([service])

        unexpected_state = factory.make_name("invalid_state")
        get_state = self.patch(service_monitor, "getServiceState")
        get_state.return_value = succeed(
            ServiceState(SERVICE_STATE.ON, unexpected_state))

        with FakeLogger(
                "maas.service_monitor", level=logging.WARNING) as maaslog:
            yield service_monitor._ensureService(service)

        expected_values = (
            service.service_name, SERVICE_STATE.ON.value,
            service_monitor.PROCESS_STATE[SERVICE_STATE.ON],
            unexpected_state)
        expected_log = (
            "Service '%s' is %s but not in the expected state of "
            "'%s', its current state is '%s'." % expected_values)
        self.assertDocTestMatches(expected_log, maaslog.output)
项目:maas    作者:maas    | 项目源码 | 文件源码
def test___ensureService_logs_debug_in_expected_states(self):
        # The service reports ON with the process state the monitor
        # expects; a debug line describing that must be logged.
        state = SERVICE_STATE.ON
        service = make_fake_service(state)
        service_monitor = self.make_service_monitor([service])

        matching_process_state = service_monitor.PROCESS_STATE[state]
        get_state = self.patch(service_monitor, "getServiceState")
        get_state.return_value = succeed(
            ServiceState(SERVICE_STATE.ON, matching_process_state))

        with FakeLogger(
                "maas.service_monitor", level=logging.DEBUG) as maaslog:
            yield service_monitor._ensureService(service)

        expected_log = "Service '%s' is %s and '%s'." % (
            service.service_name, state, matching_process_state)
        self.assertDocTestMatches(expected_log, maaslog.output)
项目:maas    作者:maas    | 项目源码 | 文件源码
def test___ensureService_logs_mismatch_for_dead_process_state(self):
        # The service reports DEAD together with an unexpected process
        # state; a warning naming both states must be logged.
        service = make_fake_service(SERVICE_STATE.OFF)
        service_monitor = self.make_service_monitor([service])

        unexpected_state = factory.make_name("invalid")
        get_state = self.patch(service_monitor, "getServiceState")
        get_state.return_value = succeed(
            ServiceState(SERVICE_STATE.DEAD, unexpected_state))

        with FakeLogger(
                "maas.service_monitor", level=logging.WARNING) as maaslog:
            yield service_monitor._ensureService(service)

        expected_values = (
            service.service_name, SERVICE_STATE.DEAD.value,
            service_monitor.PROCESS_STATE[SERVICE_STATE.DEAD],
            unexpected_state)
        expected_log = (
            "Service '%s' is %s but not in the expected state of "
            "'%s', its current state is '%s'." % expected_values)
        self.assertDocTestMatches(expected_log, maaslog.output)
项目:maas    作者:maas    | 项目源码 | 文件源码
def test___ensureService_performs_start_for_off_service(self):
        # The service should be ON but first reports OFF, then ON; a
        # single "start" action and two log lines are expected.
        service = make_fake_service(SERVICE_STATE.ON)
        service_monitor = self.make_service_monitor([service])

        get_state = self.patch(service_monitor, "getServiceState")
        reported_states = [
            succeed(ServiceState(SERVICE_STATE.OFF, "waiting")),
            succeed(ServiceState(SERVICE_STATE.ON, "running")),
        ]
        get_state.side_effect = reported_states
        perform_action = self.patch(
            service_monitor, "_performServiceAction")
        perform_action.return_value = succeed(None)

        with FakeLogger(
                "maas.service_monitor", level=logging.INFO) as maaslog:
            yield service_monitor._ensureService(service)

        self.assertThat(
            perform_action, MockCalledOnceWith(service, "start"))
        expected_log = """\
            Service '%s' is not on, it will be started.
            Service '%s' has been started and is 'running'.
            """ % (service.service_name, service.service_name)
        self.assertDocTestMatches(expected_log, maaslog.output)
项目:maas    作者:maas    | 项目源码 | 文件源码
def test__ensureService_performs_stop_for_on_service(self):
        # The service should be OFF but first reports ON, then OFF; a
        # single "stop" action and two log lines are expected.
        service = make_fake_service(SERVICE_STATE.OFF)
        service_monitor = self.make_service_monitor([service])

        get_state = self.patch(service_monitor, "getServiceState")
        reported_states = [
            succeed(ServiceState(SERVICE_STATE.ON, "running")),
            succeed(ServiceState(SERVICE_STATE.OFF, "waiting")),
        ]
        get_state.side_effect = reported_states
        perform_action = self.patch(
            service_monitor, "_performServiceAction")
        perform_action.return_value = succeed(None)

        with FakeLogger(
                "maas.service_monitor", level=logging.INFO) as maaslog:
            yield service_monitor._ensureService(service)

        self.assertThat(
            perform_action, MockCalledOnceWith(service, "stop"))
        expected_log = """\
            Service '%s' is not off, it will be stopped.
            Service '%s' has been stopped and is 'waiting'.
            """ % (service.service_name, service.service_name)
        self.assertDocTestMatches(expected_log, maaslog.output)
项目:maas    作者:maas    | 项目源码 | 文件源码
def test_logs_other_errors(self):
        # A ZeroDivisionError from maybe_start_download must be swallowed
        # by try_download() and reported through both loggers.
        service = ImageDownloadService(
            sentinel.rpc, sentinel.tftp_root, Clock())

        download_failure = ZeroDivisionError(
            "Such a shame I can't divide by zero")
        maybe_start_download = self.patch(service, "maybe_start_download")
        maybe_start_download.return_value = defer.fail(download_failure)

        with FakeLogger("maas") as maaslog, \
                TwistedLoggerFixture() as twisted_log:
            d = service.try_download()

        self.assertEqual(None, extract_result(d))
        expected_maaslog = (
            "Failed to download images: "
            "Such a shame I can't divide by zero")
        self.assertDocTestMatches(expected_maaslog, maaslog.output)
        expected_twisted_log = """\
            Downloading images failed.
            Traceback (most recent call last):
            Failure: builtins.ZeroDivisionError: Such a shame ...
            """
        self.assertDocTestMatches(expected_twisted_log, twisted_log.output)
项目:maas    作者:maas    | 项目源码 | 文件源码
def test_try_query_nodes_logs_other_errors(self):
        # A ZeroDivisionError from query_nodes must be swallowed by
        # try_query_nodes() and reported through the maas logger.
        service = self.make_monitor_service()
        self.patch(npms, "getRegionClient").return_value = sentinel.client
        sentinel.client.localIdent = factory.make_UUID()

        query_failure = ZeroDivisionError(
            "Such a shame I can't divide by zero")
        query_nodes = self.patch(service, "query_nodes")
        query_nodes.return_value = fail(query_failure)

        with FakeLogger("maas") as maaslog, TwistedLoggerFixture():
            d = service.try_query_nodes()

        self.assertEqual(None, extract_result(d))
        expected_log = (
            "Failed to query nodes' power status: "
            "Such a shame I can't divide by zero")
        self.assertDocTestMatches(expected_log, maaslog.output)
项目:eclsdk    作者:nttcom    | 项目源码 | 文件源码
def setUp(self):
        """Set up timeout, temp dirs, optional stream capture and logging.

        ``OS_TEST_TIMEOUT`` selects an optional timeout, while
        ``OS_STDOUT_CAPTURE`` and ``OS_STDERR_CAPTURE`` decide whether the
        matching standard stream is replaced by a string stream.  The
        ``FakeLogger`` fixture installed last is kept on
        ``self.log_fixture``.
        """
        super(TestCase, self).setUp()

        try:
            timeout = int(os.environ.get('OS_TEST_TIMEOUT', 0))
        except ValueError:
            # If timeout value is invalid do not set a timeout.
            timeout = 0
        if timeout > 0:
            self.useFixture(fixtures.Timeout(timeout, gentle=True))

        self.useFixture(fixtures.NestedTempfile())
        self.useFixture(fixtures.TempHomeDir())

        if os.environ.get('OS_STDOUT_CAPTURE') in _TRUE_VALUES:
            out_fixture = self.useFixture(fixtures.StringStream('stdout'))
            self.useFixture(
                fixtures.MonkeyPatch('sys.stdout', out_fixture.stream))
        if os.environ.get('OS_STDERR_CAPTURE') in _TRUE_VALUES:
            err_fixture = self.useFixture(fixtures.StringStream('stderr'))
            self.useFixture(
                fixtures.MonkeyPatch('sys.stderr', err_fixture.stream))

        self.log_fixture = self.useFixture(fixtures.FakeLogger())
项目:CAL    作者:HPCC-Cloud-Computing    | 项目源码 | 文件源码
def setUp(self):
        """Route log output into a ``FakeLogger`` kept on ``self.logger``.

        The root logger is opened up to DEBUG, and the first root handler
        is then set to DEBUG or INFO depending on ``OS_DEBUG``.  When
        collection is above DEBUG, a ``NullHandler`` at DEBUG level is
        added as well.
        """
        super(StandardLogging, self).setUp()

        # set root logger to debug
        root_logger = std_logging.getLogger()
        root_logger.setLevel(std_logging.DEBUG)

        # supports collecting debug level for local runs
        debug_requested = os.environ.get('OS_DEBUG') in _TRUE_VALUES
        level = std_logging.DEBUG if debug_requested else std_logging.INFO

        # Collect logs
        log_format = '%(asctime)s %(levelname)s [%(name)s] %(message)s'
        self.logger = self.useFixture(
            fixtures.FakeLogger(format=log_format, level=None))
        # TODO(sdague): why can't we send level through the fake
        # logger? Tests prove that it breaks, but it's worth getting
        # to the bottom of.
        root_logger.handlers[0].setLevel(level)

        if level > std_logging.DEBUG:
            # Just attempt to format debug level logs, but don't save them
            null_handler = NullHandler()
            self.useFixture(
                fixtures.LogHandler(null_handler, nuke_handlers=False))
            null_handler.setLevel(std_logging.DEBUG)

            # Don't log every single DB migration step
            migration_logger = std_logging.getLogger(
                'migrate.versioning.api')
            migration_logger.setLevel(std_logging.WARNING)
项目:deckhand    作者:att-comdev    | 项目源码 | 文件源码
def setUp(self):
        """Install a ``FakeLogger`` fixture named 'deckhand' for each test."""
        super(DeckhandTestCase, self).setUp()
        deckhand_log = fixtures.FakeLogger('deckhand')
        self.useFixture(deckhand_log)
项目:deb-oslo.versionedobjects    作者:openstack    | 项目源码 | 文件源码
def setUp(self):
        """Route log output into a ``FakeLogger`` kept on ``self.logger``.

        The root logger is opened up to DEBUG, and the first root handler
        is then set to DEBUG or INFO depending on ``OS_DEBUG``.  When
        collection is above DEBUG, a ``NullHandler`` at DEBUG level is
        added as well.
        """
        super(StandardLogging, self).setUp()

        # set root logger to debug
        root_logger = logging.getLogger()
        root_logger.setLevel(logging.DEBUG)

        # supports collecting debug level for local runs
        debug_requested = os.environ.get('OS_DEBUG') in _TRUE_VALUES
        level = logging.DEBUG if debug_requested else logging.INFO

        # Collect logs
        log_format = '%(asctime)s %(levelname)s [%(name)s] %(message)s'
        self.logger = self.useFixture(
            fixtures.FakeLogger(format=log_format, level=None))
        # TODO(sdague): why can't we send level through the fake
        # logger? Tests prove that it breaks, but it's worth getting
        # to the bottom of.
        root_logger.handlers[0].setLevel(level)

        if level > logging.DEBUG:
            # Just attempt to format debug level logs, but don't save them
            null_handler = NullHandler()
            self.useFixture(
                fixtures.LogHandler(null_handler, nuke_handlers=False))
            null_handler.setLevel(logging.DEBUG)
项目:imapautofiler    作者:dhellmann    | 项目源码 | 文件源码
def setUp(self):
        """Install a DEBUG-level ``FakeLogger`` and replace ``sys.stdout``."""
        super().setUp()
        # Capture logging
        debug_log = fixtures.FakeLogger(level=logging.DEBUG)
        self.useFixture(debug_log)
        # Capturing printing
        stdout_fixture = self.useFixture(fixtures.StringStream('stdout'))
        self.useFixture(
            fixtures.MonkeyPatch('sys.stdout', stdout_fixture.stream))
项目:python-zunclient    作者:openstack    | 项目源码 | 文件源码
def setUp(self):
        """Install a default ``FakeLogger`` fixture for each test."""
        super(BaseTestCase, self).setUp()
        fake_logger = fixtures.FakeLogger()
        self.useFixture(fake_logger)
项目:os-http    作者:openstack-dev    | 项目源码 | 文件源码
def setUp(self):
        """Install the requests and logger fixtures used by the tests.

        The fixtures are stored on ``self.requests_mock`` and
        ``self.logger_mock`` respectively.
        """
        super(TestCase, self).setUp()
        requests_fixture = fixture.Fixture()
        self.requests_mock = self.useFixture(requests_fixture)
        logger_fixture = fixtures.FakeLogger()
        self.logger_mock = self.useFixture(logger_fixture)
项目:python-valenceclient    作者:openstack    | 项目源码 | 文件源码
def setUp(self):
        """Install a ``FakeLogger`` and optional stdout/stderr capture."""
        super(BaseTestCase, self).setUp()
        self.useFixture(fixtures.FakeLogger())

        # If enabled, stdout and/or stderr is captured and will appear in
        # test results if that test fails.
        capture_settings = (
            ('OS_STDOUT_CAPTURE', 'stdout'),
            ('OS_STDERR_CAPTURE', 'stderr'),
        )
        for env_name, stream_name in capture_settings:
            if strutils.bool_from_string(os.environ.get(env_name)):
                captured = self.useFixture(
                    fixtures.StringStream(stream_name)).stream
                self.useFixture(
                    fixtures.MonkeyPatch('sys.' + stream_name, captured))
项目:My-Web-Server-Framework-With-Python2.7    作者:syjsu    | 项目源码 | 文件源码
def setUp(self):
        """Prepare timeout, stream capture, logging and a scratch package.

        Besides the usual fixtures, this copies the ``testpackage``
        directory that sits next to this file into a fresh temporary
        directory, changes into the copy, and registers cleanups that
        restore the previous working directory and discard the copy.
        """
        super(BaseTestCase, self).setUp()
        # Unlike an unset variable elsewhere, the default here is 30.
        test_timeout = os.environ.get('OS_TEST_TIMEOUT', 30)
        try:
            test_timeout = int(test_timeout)
        except ValueError:
            # If timeout value is invalid, warn and run without a timeout.
            print("OS_TEST_TIMEOUT set to invalid value"
                  " defaulting to no timeout")
            test_timeout = 0
        if test_timeout > 0:
            self.useFixture(fixtures.Timeout(test_timeout, gentle=True))

        # Replace each standard stream only when its capture flag is set.
        if os.environ.get('OS_STDOUT_CAPTURE') in options.TRUE_VALUES:
            stdout = self.useFixture(fixtures.StringStream('stdout')).stream
            self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
        if os.environ.get('OS_STDERR_CAPTURE') in options.TRUE_VALUES:
            stderr = self.useFixture(fixtures.StringStream('stderr')).stream
            self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
        # Logger fixture for the 'pbr' logger, kept for assertions.
        self.log_fixture = self.useFixture(
            fixtures.FakeLogger('pbr'))

        # Older git does not have config --local, so create a temporary home
        # directory to permit using git config --global without stepping on
        # developer configuration.
        self.useFixture(fixtures.TempHomeDir())
        self.useFixture(fixtures.NestedTempfile())
        # A second, unnamed logger fixture; its result is not kept.
        self.useFixture(fixtures.FakeLogger())
        self.useFixture(fixtures.EnvironmentVariable('PBR_VERSION', '0.0'))

        self.temp_dir = self.useFixture(fixtures.TempDir()).path
        self.package_dir = os.path.join(self.temp_dir, 'testpackage')
        shutil.copytree(os.path.join(os.path.dirname(__file__), 'testpackage'),
                        self.package_dir)
        # Register the chdir-back cleanup before leaving the current
        # directory, so the original location is the one restored.
        self.addCleanup(os.chdir, os.getcwd())
        os.chdir(self.package_dir)
        self.addCleanup(self._discard_testpackage)
项目:deb-oslotest    作者:openstack    | 项目源码 | 文件源码
def setUp(self):
        """Capture logs with a ``FakeLogger`` or configure plain logging.

        When ``self.capture_logs`` is true the fixture is stored on
        ``self.logger``; otherwise ``logging.basicConfig`` is called with
        the same format and level.
        """
        super(ConfigureLogging, self).setUp()
        if not self.capture_logs:
            logging.basicConfig(format=self._format, level=self.level)
            return

        capture_fixture = fixtures.FakeLogger(
            format=self._format,
            level=self.level,
            nuke_handlers=True,
        )
        self.logger = self.useFixture(capture_fixture)
项目:acceptable    作者:canonical-ols    | 项目源码 | 文件源码
def test_logs_on_missing_file(self):
        # A path that was never created must make the extractor return
        # None and log both the attempt and the missing-file error.
        tmp_dir = self.useFixture(fixtures.TempDir())
        log_capture = self.useFixture(fixtures.FakeLogger())

        missing_path = os.path.join(tmp_dir.path, 'path_does_not_exist')
        outcome = _build_doubles.extract_schemas_from_file(missing_path)

        self.assertIsNone(outcome)
        expected_fragments = (
            'Extracting schemas from %s' % missing_path,
            'Cannot extract schemas: No such file or directory',
        )
        for fragment in expected_fragments:
            self.assertThat(log_capture.output, Contains(fragment))
项目:acceptable    作者:canonical-ols    | 项目源码 | 文件源码
def test_logs_on_schema_extraction(self):
        # A module declaring a single view is expected to yield exactly one
        # schema, and the extraction is expected to show up in the log.
        tmp = self.useFixture(fixtures.TempDir())
        captured = self.useFixture(fixtures.FakeLogger())

        module_path = os.path.join(tmp.path, 'my.py')
        with open(module_path, 'w') as module_file:
            module_file.write(dedent(
                """
                service = AcceptableService('vendor')

                root_api = service.api('/', 'root')

                @root_api.view(introduced_at='1.0')
                def my_view():
                    pass
                """))
        # Unpacking into a one-element tuple fails unless exactly one schema
        # was extracted.
        (schema,) = _build_doubles.extract_schemas_from_file(module_path)

        expected_attributes = (
            ('root', 'view_name'),
            ('1.0', 'version'),
            (['GET'], 'methods'),
            (None, 'input_schema'),
            (None, 'output_schema'),
        )
        for expected, attribute in expected_attributes:
            self.assertEqual(expected, getattr(schema, attribute))

        expected_messages = (
            'Extracting schemas from %s' % module_path,
            'Extracted 1 schema',
        )
        for message in expected_messages:
            self.assertThat(captured.output, Contains(message))


# To support testing, we need a version of ArgumentParser that doesn't call
# sys.exit on error, but rather throws an exception, so we can catch that in
# our tests:
项目:pifpaf    作者:jd    | 项目源码 | 文件源码
def setUp(self):
        """Capture DEBUG-level logging and expose it as ``self.logger``."""
        super(TestDrivers, self).setUp()
        capture = fixtures.FakeLogger(
            format="%(levelname)8s [%(name)s] %(message)s",
            level=logging.DEBUG,
            nuke_handlers=True,
        )
        self.logger = self.useFixture(capture)
项目:maas    作者:maas    | 项目源码 | 文件源码
def test_output_not_containing_virtual_does_not_set_tag(self):
        # Empty script output must leave an untagged node untagged and log
        # that no virtual type was reported.
        captured = self.useFixture(FakeLogger())
        machine = factory.make_Node()
        self.assertTagsEqual(machine, [])
        set_virtual_tag(machine, b"", 0)
        self.assertTagsEqual(machine, [])
        expected = (
            "No virtual type reported in VIRTUALITY_SCRIPT output for node "
            "%s" % machine.system_id)
        self.assertIn(expected, captured.output)
项目:maas    作者:maas    | 项目源码 | 文件源码
def test_output_not_containing_virtual_does_not_remove_tag(self):
        # Empty script output must not strip an existing "virtual" tag; it
        # should only log that no virtual type was reported.
        captured = self.useFixture(FakeLogger())
        machine = factory.make_Node()
        machine.tags.add(self.getVirtualTag())
        self.assertTagsEqual(machine, ["virtual"])
        set_virtual_tag(machine, b"", 0)
        self.assertTagsEqual(machine, ["virtual"])
        expected = (
            "No virtual type reported in VIRTUALITY_SCRIPT output for node "
            "%s" % machine.system_id)
        self.assertIn(expected, captured.output)
项目:maas    作者:maas    | 项目源码 | 文件源码
def test_hardware_updates_logs_invalid_xml(self):
        # Unparseable lshw output is expected to be logged along with the
        # XML syntax error traceback.
        captured = self.useFixture(FakeLogger())
        update_hardware_details(factory.make_Node(), b"garbage", 0)
        expected_log = dedent("""\
        Invalid lshw data.
        Traceback (most recent call last):
        ...
        lxml.etree.XMLSyntaxError: Start tag expected, ...
        """)
        output = captured.output
        matcher = DocTestMatches(expected_log, self.doctest_flags)
        self.assertThat(output, matcher)
项目:maas    作者:maas    | 项目源码 | 文件源码
def test_hardware_updates_does_nothing_when_exit_status_is_not_zero(self):
        # With a non-zero exit status, nothing should be logged on the
        # 'commissioningscript' logger even though the data is garbage.
        captured = self.useFixture(FakeLogger(name='commissioningscript'))
        machine = factory.make_Node()
        update_hardware_details(machine, b"garbage", exit_status=1)
        self.assertEqual("", captured.output)
项目:maas    作者:maas    | 项目源码 | 文件源码
def test_update_services_logs_when_service_not_recognised(self):
        # Reporting a service with a made-up name should be logged as an
        # unrecognised service.
        unknown_name = factory.make_name("service")
        unknown_service = self.make_service(unknown_name)
        rack = factory.make_RackController()
        with FakeLogger(services.__name__) as captured:
            update_services(rack.system_id, [unknown_service])
        expected = (
            "Rack ... reported status for '...' but this is not a "
            "recognised service (status='...', info='...').")
        self.assertThat(captured.output, DocTestMatches(expected))
项目:maas    作者:maas    | 项目源码 | 文件源码
def test_logs_finding_existing_node_when_master(self):
        # Registering an existing rack controller without a version, while
        # is_master_process is patched to return True, logs the connection
        # with the "2.2 or below" wording.
        captured = self.useFixture(FakeLogger("maas"))
        rack = factory.make_Node(node_type=NODE_TYPE.RACK_CONTROLLER)
        self.patch(rackcontrollers, "is_master_process").return_value = True
        register(system_id=rack.system_id)
        expected = (
            "Existing rack controller '%s' running version 2.2 or below has "
            "connected to region '%s'." % (
                rack.hostname, self.this_region.hostname))
        self.assertEqual(expected, captured.output.strip())
项目:maas    作者:maas    | 项目源码 | 文件源码
def test_logs_finding_existing_node_when_master_with_version(self):
        # Same as the version-less case, but the explicit version passed to
        # register() must appear in the logged message.
        captured = self.useFixture(FakeLogger("maas"))
        rack = factory.make_Node(node_type=NODE_TYPE.RACK_CONTROLLER)
        self.patch(rackcontrollers, "is_master_process").return_value = True
        register(system_id=rack.system_id, version="4.0")
        expected = (
            "Existing rack controller '%s' running version 4.0 has "
            "connected to region '%s'." % (
                rack.hostname, self.this_region.hostname))
        self.assertEqual(expected, captured.output.strip())
项目:maas    作者:maas    | 项目源码 | 文件源码
def test_logs_converting_region_controller(self):
        # Registering a region controller without a version logs its
        # conversion into a region and rack controller.
        captured = self.useFixture(FakeLogger("maas"))
        region = factory.make_Node(node_type=NODE_TYPE.REGION_CONTROLLER)
        register(system_id=region.system_id)
        expected = (
            "Region controller '%s' running version 2.2 or below converted "
            "into a region and rack controller.\n" % region.hostname)
        self.assertEqual(expected, captured.output)
项目:maas    作者:maas    | 项目源码 | 文件源码
def test_logs_converting_region_controller_with_version(self):
        # The explicit version passed to register() must appear in the
        # conversion message.
        captured = self.useFixture(FakeLogger("maas"))
        region = factory.make_Node(node_type=NODE_TYPE.REGION_CONTROLLER)
        register(system_id=region.system_id, version="7.8")
        expected = (
            "Region controller '%s' running version 7.8 converted "
            "into a region and rack controller.\n" % region.hostname)
        self.assertEqual(expected, captured.output)
项目:maas    作者:maas    | 项目源码 | 文件源码
def test_logs_converting_existing_node(self):
        # Registering an existing machine logs its conversion into a rack
        # controller, naming both the region and the machine.
        captured = self.useFixture(FakeLogger("maas"))
        machine = factory.make_Node(node_type=NODE_TYPE.MACHINE)
        register(system_id=machine.system_id)
        expected = (
            "Region controller '%s' converted '%s' running version 2.2 or "
            "below into a rack controller.\n" % (
                self.this_region.hostname, machine.hostname))
        self.assertEqual(expected, captured.output)
项目:maas    作者:maas    | 项目源码 | 文件源码
def test_logs_creating_new_rackcontroller(self):
        # Registering a hostname with no system_id logs the creation of a
        # new rack controller.
        captured = self.useFixture(FakeLogger("maas"))
        new_hostname = factory.make_name("hostname")
        register(hostname=new_hostname)
        expected = (
            "New rack controller '%s' running version 2.2 or below was "
            "created by region '%s' upon first connection." % (
                new_hostname, self.this_region.hostname))
        self.assertEqual(expected, captured.output.strip())
项目:maas    作者:maas    | 项目源码 | 文件源码
def test_logs_creating_new_rackcontroller_with_version(self):
        # The explicit version passed to register() must appear in the
        # creation message.
        captured = self.useFixture(FakeLogger("maas"))
        new_hostname = factory.make_name("hostname")
        register(hostname=new_hostname, version="4.2")
        expected = (
            "New rack controller '%s' running version 4.2 was "
            "created by region '%s' upon first connection." % (
                new_hostname, self.this_region.hostname))
        self.assertEqual(expected, captured.output.strip())