我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用fixtures.FakeLogger()。
def setUp(self): """Run before each test method to initialize test environment.""" super(TestCase, self).setUp() test_timeout = os.environ.get('OS_TEST_TIMEOUT', 0) try: test_timeout = int(test_timeout) except ValueError: # If timeout value is invalid do not set a timeout. test_timeout = 0 if test_timeout > 0: self.useFixture(fixtures.Timeout(test_timeout, gentle=True)) self.useFixture(fixtures.NestedTempfile()) self.useFixture(fixtures.TempHomeDir()) if os.environ.get('OS_STDOUT_CAPTURE') in _TRUE_VALUES: stdout = self.useFixture(fixtures.StringStream('stdout')).stream self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout)) if os.environ.get('OS_STDERR_CAPTURE') in _TRUE_VALUES: stderr = self.useFixture(fixtures.StringStream('stderr')).stream self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr)) self.log_fixture = self.useFixture(fixtures.FakeLogger())
def setup_logging(self): # Assign default logs to self.LOG so we can still # assert on bilean logs. default_level = logging.INFO if os.environ.get('OS_DEBUG') in _TRUE_VALUES: default_level = logging.DEBUG self.LOG = self.useFixture( fixtures.FakeLogger(level=default_level, format=_LOG_FORMAT)) base_list = set([nlog.split('.')[0] for nlog in logging.Logger.manager.loggerDict]) for base in base_list: if base in TEST_DEFAULT_LOGLEVELS: self.useFixture(fixtures.FakeLogger( level=TEST_DEFAULT_LOGLEVELS[base], name=base, format=_LOG_FORMAT)) elif base != 'bilean': self.useFixture(fixtures.FakeLogger( name=base, format=_LOG_FORMAT))
def test_logs_on_no_permissions(self): workdir = self.useFixture(fixtures.TempDir()) fake_logger = self.useFixture(fixtures.FakeLogger()) bad_path = os.path.join(workdir.path, 'path_not_readable') with open(bad_path, 'w') as f: f.write("# You can't read me") os.chmod(bad_path, 0) result = _build_doubles.extract_schemas_from_file(bad_path) self.assertIsNone(result) self.assertThat( fake_logger.output, Contains('Extracting schemas from %s' % bad_path)) self.assertThat( fake_logger.output, Contains('Cannot extract schemas: Permission denied'))
def test_logs_on_syntax_error(self): workdir = self.useFixture(fixtures.TempDir()) fake_logger = self.useFixture(fixtures.FakeLogger()) bad_path = os.path.join(workdir.path, 'foo.py') with open(bad_path, 'w') as f: f.write("not valid pyton") result = _build_doubles.extract_schemas_from_file(bad_path) self.assertIsNone(result) self.assertThat( fake_logger.output, Contains('Extracting schemas from %s' % bad_path)) self.assertThat( fake_logger.output, Contains('Cannot extract schemas: invalid syntax (foo.py, line 1)') )
def test_listen(self): logger = self.useFixture(FakeLogger(format="%(asctime)s %(message)s")) self.executable.listen(6666) self.executable.sleep(1) self.executable.spawn() # Try to connect to the socket sock = socket.socket() transient = (errno.ECONNREFUSED, errno.ECONNRESET) attempts = 10 for i in range(1, attempts + 1): try: sock.connect(("127.0.0.1", self.executable.port)) except socket.error as error: # pragma: no cover if error.errno in transient and i != attempts: time.sleep(0.05 * i) continue logging.error("connection attempt %d failed", i) raise error break self.assertEqual("127.0.0.1", sock.getsockname()[0])
def test__logs_if_suggests_previously_observed_neighbour(self): # Note: 10.0.0.0/30 --> 10.0.0.1 and 10.0.0.0.2 are usable. subnet = self.make_Subnet( cidr="10.0.0.0/30", gateway_ip=None, dns_servers=None) rackif = factory.make_Interface(vlan=subnet.vlan) now = datetime.now() yesterday = now - timedelta(days=1) factory.make_Discovery( ip="10.0.0.1", interface=rackif, updated=now) factory.make_Discovery( ip="10.0.0.2", interface=rackif, updated=yesterday) logger = self.useFixture(FakeLogger("maas")) ip = subnet.get_next_ip_for_allocation() self.assertThat(ip, Equals("10.0.0.2")) self.assertThat(logger.output, DocTestMatches( "Next IP address...observed previously..." ))
def test__handle_uncaught_exception_logs_other_failure(self): handler = views.WebApplicationHandler() request = make_request() request.path = factory.make_name("path") # Capture an exc_info tuple with traceback. exc_type = factory.make_exception_type() exc_msg = factory.make_name("message") try: raise exc_type(exc_msg) except exc_type: exc_info = sys.exc_info() with FakeLogger(views.__name__, logging.ERROR) as logger: handler.handle_uncaught_exception( request=request, resolver=get_resolver(None), exc_info=exc_info) self.assertThat( logger.output, DocTestMatches("""\ 500 Internal Server Error @ %s Traceback (most recent call last): ... maastesting.factory.TestException#...: %s """ % (request.path, exc_msg)))
def test__scan_as_admin_logs_the_fact_that_a_scan_happened(self): user = factory.make_admin() handler = SubnetHandler(user, {}) subnet = factory.make_Subnet(version=4) rack = factory.make_RackController() factory.make_Interface(node=rack, subnet=subnet) logger = self.useFixture(FakeLogger()) cidr = subnet.get_ipnetwork() handler.scan({ "id": subnet.id, }) # Use MatchesRegex here rather than DocTestMatches because usernames # can contain characters that confuse DocTestMatches (e.g. periods). self.assertThat(logger.output, MatchesRegex( "User '%s' initiated a neighbour discovery scan against subnet: %s" % (re.escape(user.username), re.escape(str(cidr)))))
def test_raises_error_when_omshell_not_connected(self): error = ExternalProcessError( returncode=2, cmd=("omshell",), output="") self.patch(ExternalProcessError, 'output_as_unicode', 'not connected.') omshell = Mock() omshell.remove.side_effect = error mac = factory.make_mac_address() with FakeLogger("maas.dhcp") as logger: error = self.assertRaises( exceptions.CannotRemoveHostMap, dhcp._remove_host_map, omshell, mac) # The CannotCreateHostMap exception includes a message describing the # problematic mapping. self.assertDocTestMatches( "Could not remove host map for %s: " "The DHCP server could not be reached." % (mac), str(error)) # A message is also written to the maas.dhcp logger that describes the # problematic mapping. self.assertDocTestMatches( "Could not remove host map for %s: " "The DHCP server could not be reached." % (mac), logger.output)
def test_raises_error_when_omshell_crashes(self): error_message = factory.make_name("error").encode("ascii") omshell = Mock() omshell.create.side_effect = ExternalProcessError( returncode=2, cmd=("omshell",), output=error_message) mac = factory.make_mac_address() ip = factory.make_ip_address() with FakeLogger("maas.dhcp") as logger: error = self.assertRaises( exceptions.CannotCreateHostMap, dhcp._create_host_map, omshell, mac, ip) # The CannotCreateHostMap exception includes a message describing the # problematic mapping. self.assertDocTestMatches( "Could not create host map for %s -> %s: ..." % (mac, ip), str(error)) # A message is also written to the maas.dhcp logger that describes the # problematic mapping. self.assertDocTestMatches( "Could not create host map for %s -> %s: ..." % (mac, ip), logger.output)
def test_raises_error_when_omshell_not_connected(self): error = ExternalProcessError( returncode=2, cmd=("omshell",), output="") self.patch(ExternalProcessError, 'output_as_unicode', 'not connected.') omshell = Mock() omshell.create.side_effect = error mac = factory.make_mac_address() ip = factory.make_ip_address() with FakeLogger("maas.dhcp") as logger: error = self.assertRaises( exceptions.CannotCreateHostMap, dhcp._create_host_map, omshell, mac, ip) # The CannotCreateHostMap exception includes a message describing the # problematic mapping. self.assertDocTestMatches( "Could not create host map for %s -> %s: " "The DHCP server could not be reached." % (mac, ip), str(error)) # A message is also written to the maas.dhcp logger that describes the # problematic mapping. self.assertDocTestMatches( "Could not create host map for %s -> %s: " "The DHCP server could not be reached." % (mac, ip), logger.output)
def test__does_log_other_exceptions_when_restarting(self): self.patch_sudo_write_file() self.patch_restartService().side_effect = ( factory.make_exception("DHCP is on strike today")) failover_peers = [make_failover_peer_config()] shared_networks = fix_shared_networks_failover( [make_shared_network()], failover_peers) with FakeLogger("maas") as logger: with ExpectedException(exceptions.CannotConfigureDHCP): yield self.configure( factory.make_name('key'), failover_peers, shared_networks, [make_host()], [make_interface()], make_global_dhcp_snippets()) self.assertDocTestMatches( "DHCPv... server failed to restart: " "DHCP is on strike today", logger.output)
def test_query_all_nodes_swallows_PowerActionFail(self): node1, node2 = self.make_nodes(2) new_state_2 = self.pick_alternate_state(node2['power_state']) get_power_state = self.patch(power, 'get_power_state') error_msg = factory.make_name("error") get_power_state.side_effect = [ fail(exceptions.PowerActionFail(error_msg)), succeed(new_state_2), ] suppress_reporting(self) with FakeLogger("maas.power", level=logging.DEBUG) as maaslog: yield power.query_all_nodes([node1, node2]) self.assertDocTestMatches( """\ hostname-...: Could not query power state: %s. hostname-...: Power state has changed from ... to ... """ % error_msg, maaslog.output)
def test_query_all_nodes_swallows_PowerError(self): node1, node2 = self.make_nodes(2) new_state_2 = self.pick_alternate_state(node2['power_state']) get_power_state = self.patch(power, 'get_power_state') error_msg = factory.make_name("error") get_power_state.side_effect = [ fail(PowerError(error_msg)), succeed(new_state_2), ] suppress_reporting(self) with FakeLogger("maas.power", level=logging.DEBUG) as maaslog: yield power.query_all_nodes([node1, node2]) self.assertDocTestMatches( """\ %s: Could not query power state: %s. %s: Power state has changed from %s to %s. """ % (node1['hostname'], error_msg, node2['hostname'], node2['power_state'], new_state_2), maaslog.output)
def test_query_all_nodes_swallows_NoSuchNode(self): node1, node2 = self.make_nodes(2) new_state_2 = self.pick_alternate_state(node2['power_state']) get_power_state = self.patch(power, 'get_power_state') get_power_state.side_effect = [ fail(exceptions.NoSuchNode()), succeed(new_state_2), ] suppress_reporting(self) with FakeLogger("maas.power", level=logging.DEBUG) as maaslog: yield power.query_all_nodes([node1, node2]) self.assertDocTestMatches( """\ hostname-...: Could not update power state: no such node. hostname-...: Power state has changed from ... to ... """, maaslog.output)
def test___performServiceAction_logs_error_if_action_fails(self): service = make_fake_service(SERVICE_STATE.ON) service_monitor = self.make_service_monitor() mock_execSystemDServiceAction = self.patch( service_monitor, "_execSystemDServiceAction") error_output = factory.make_name("error") mock_execSystemDServiceAction.return_value = (1, "", error_output) action = factory.make_name("action") with FakeLogger( "maas.service_monitor", level=logging.ERROR) as maaslog: with ExpectedException(ServiceActionError): yield service_monitor._performServiceAction(service, action) self.assertDocTestMatches( "Service '%s' failed to %s: %s" % ( service.name, action, error_output), maaslog.output)
def test___ensureService_logs_warning_in_mismatch_process_state(self): service = make_fake_service(SERVICE_STATE.ON) service_monitor = self.make_service_monitor([service]) invalid_process_state = factory.make_name("invalid_state") mock_getServiceState = self.patch( service_monitor, "getServiceState") mock_getServiceState.return_value = succeed( ServiceState(SERVICE_STATE.ON, invalid_process_state)) with FakeLogger( "maas.service_monitor", level=logging.WARNING) as maaslog: yield service_monitor._ensureService(service) self.assertDocTestMatches( "Service '%s' is %s but not in the expected state of " "'%s', its current state is '%s'." % ( service.service_name, SERVICE_STATE.ON.value, service_monitor.PROCESS_STATE[SERVICE_STATE.ON], invalid_process_state), maaslog.output)
def test___ensureService_logs_debug_in_expected_states(self): state = SERVICE_STATE.ON service = make_fake_service(state) service_monitor = self.make_service_monitor([service]) expected_process_state = service_monitor.PROCESS_STATE[state] mock_getServiceState = self.patch( service_monitor, "getServiceState") mock_getServiceState.return_value = succeed( ServiceState(SERVICE_STATE.ON, expected_process_state)) with FakeLogger( "maas.service_monitor", level=logging.DEBUG) as maaslog: yield service_monitor._ensureService(service) self.assertDocTestMatches( "Service '%s' is %s and '%s'." % ( service.service_name, state, expected_process_state), maaslog.output)
def test___ensureService_logs_mismatch_for_dead_process_state(self): service = make_fake_service(SERVICE_STATE.OFF) service_monitor = self.make_service_monitor([service]) invalid_process_state = factory.make_name("invalid") mock_getServiceState = self.patch( service_monitor, "getServiceState") mock_getServiceState.return_value = succeed( ServiceState(SERVICE_STATE.DEAD, invalid_process_state)) with FakeLogger( "maas.service_monitor", level=logging.WARNING) as maaslog: yield service_monitor._ensureService(service) self.assertDocTestMatches( "Service '%s' is %s but not in the expected state of " "'%s', its current state is '%s'." % ( service.service_name, SERVICE_STATE.DEAD.value, service_monitor.PROCESS_STATE[SERVICE_STATE.DEAD], invalid_process_state), maaslog.output)
def test___ensureService_performs_start_for_off_service(self): service = make_fake_service(SERVICE_STATE.ON) service_monitor = self.make_service_monitor([service]) mock_getServiceState = self.patch( service_monitor, "getServiceState") mock_getServiceState.side_effect = [ succeed(ServiceState(SERVICE_STATE.OFF, "waiting")), succeed(ServiceState(SERVICE_STATE.ON, "running")), ] mock_performServiceAction = self.patch( service_monitor, "_performServiceAction") mock_performServiceAction.return_value = succeed(None) with FakeLogger( "maas.service_monitor", level=logging.INFO) as maaslog: yield service_monitor._ensureService(service) self.assertThat( mock_performServiceAction, MockCalledOnceWith(service, "start")) self.assertDocTestMatches( """\ Service '%s' is not on, it will be started. Service '%s' has been started and is 'running'. """ % (service.service_name, service.service_name), maaslog.output)
def test__ensureService_performs_stop_for_on_service(self): service = make_fake_service(SERVICE_STATE.OFF) service_monitor = self.make_service_monitor([service]) mock_getServiceState = self.patch( service_monitor, "getServiceState") mock_getServiceState.side_effect = [ succeed(ServiceState(SERVICE_STATE.ON, "running")), succeed(ServiceState(SERVICE_STATE.OFF, "waiting")), ] mock_performServiceAction = self.patch( service_monitor, "_performServiceAction") mock_performServiceAction.return_value = succeed(None) with FakeLogger( "maas.service_monitor", level=logging.INFO) as maaslog: yield service_monitor._ensureService(service) self.assertThat( mock_performServiceAction, MockCalledOnceWith(service, "stop")) self.assertDocTestMatches( """\ Service '%s' is not off, it will be stopped. Service '%s' has been stopped and is 'waiting'. """ % (service.service_name, service.service_name), maaslog.output)
def test_logs_other_errors(self): service = ImageDownloadService( sentinel.rpc, sentinel.tftp_root, Clock()) maybe_start_download = self.patch(service, "maybe_start_download") maybe_start_download.return_value = defer.fail( ZeroDivisionError("Such a shame I can't divide by zero")) with FakeLogger("maas") as maaslog, TwistedLoggerFixture() as logger: d = service.try_download() self.assertEqual(None, extract_result(d)) self.assertDocTestMatches( "Failed to download images: " "Such a shame I can't divide by zero", maaslog.output) self.assertDocTestMatches( """\ Downloading images failed. Traceback (most recent call last): Failure: builtins.ZeroDivisionError: Such a shame ... """, logger.output)
def test_try_query_nodes_logs_other_errors(self): service = self.make_monitor_service() self.patch(npms, "getRegionClient").return_value = sentinel.client sentinel.client.localIdent = factory.make_UUID() query_nodes = self.patch(service, "query_nodes") query_nodes.return_value = fail( ZeroDivisionError("Such a shame I can't divide by zero")) with FakeLogger("maas") as maaslog, TwistedLoggerFixture(): d = service.try_query_nodes() self.assertEqual(None, extract_result(d)) self.assertDocTestMatches( "Failed to query nodes' power status: " "Such a shame I can't divide by zero", maaslog.output)
def setUp(self): super(StandardLogging, self).setUp() # set root logger to debug root = std_logging.getLogger() root.setLevel(std_logging.DEBUG) # supports collecting debug level for local runs if os.environ.get('OS_DEBUG') in _TRUE_VALUES: level = std_logging.DEBUG else: level = std_logging.INFO # Collect logs fs = '%(asctime)s %(levelname)s [%(name)s] %(message)s' self.logger = self.useFixture( fixtures.FakeLogger(format=fs, level=None)) # TODO(sdague): why can't we send level through the fake # logger? Tests prove that it breaks, but it's worth getting # to the bottom of. root.handlers[0].setLevel(level) if level > std_logging.DEBUG: # Just attempt to format debug level logs, but don't save them handler = NullHandler() self.useFixture(fixtures.LogHandler(handler, nuke_handlers=False)) handler.setLevel(std_logging.DEBUG) # Don't log every single DB migration step std_logging.getLogger( 'migrate.versioning.api').setLevel(std_logging.WARNING)
def setUp(self): super(DeckhandTestCase, self).setUp() self.useFixture(fixtures.FakeLogger('deckhand'))
def setUp(self): super(StandardLogging, self).setUp() # set root logger to debug root = logging.getLogger() root.setLevel(logging.DEBUG) # supports collecting debug level for local runs if os.environ.get('OS_DEBUG') in _TRUE_VALUES: level = logging.DEBUG else: level = logging.INFO # Collect logs fs = '%(asctime)s %(levelname)s [%(name)s] %(message)s' self.logger = self.useFixture( fixtures.FakeLogger(format=fs, level=None)) # TODO(sdague): why can't we send level through the fake # logger? Tests prove that it breaks, but it's worth getting # to the bottom of. root.handlers[0].setLevel(level) if level > logging.DEBUG: # Just attempt to format debug level logs, but don't save them handler = NullHandler() self.useFixture(fixtures.LogHandler(handler, nuke_handlers=False)) handler.setLevel(logging.DEBUG)
def setUp(self): super().setUp() # Capture logging self.useFixture(fixtures.FakeLogger(level=logging.DEBUG)) # Capturing printing stdout = self.useFixture(fixtures.StringStream('stdout')).stream self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
def setUp(self): super(BaseTestCase, self).setUp() self.useFixture(fixtures.FakeLogger())
def setUp(self): super(TestCase, self).setUp() self.requests_mock = self.useFixture(fixture.Fixture()) self.logger_mock = self.useFixture(fixtures.FakeLogger())
def setUp(self): super(BaseTestCase, self).setUp() self.useFixture(fixtures.FakeLogger()) # If enabled, stdout and/or stderr is captured and will appear in # test results if that test fails. if strutils.bool_from_string(os.environ.get('OS_STDOUT_CAPTURE')): stdout = self.useFixture(fixtures.StringStream('stdout')).stream self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout)) if strutils.bool_from_string(os.environ.get('OS_STDERR_CAPTURE')): stderr = self.useFixture(fixtures.StringStream('stderr')).stream self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
def setUp(self): super(BaseTestCase, self).setUp() test_timeout = os.environ.get('OS_TEST_TIMEOUT', 30) try: test_timeout = int(test_timeout) except ValueError: # If timeout value is invalid, fail hard. print("OS_TEST_TIMEOUT set to invalid value" " defaulting to no timeout") test_timeout = 0 if test_timeout > 0: self.useFixture(fixtures.Timeout(test_timeout, gentle=True)) if os.environ.get('OS_STDOUT_CAPTURE') in options.TRUE_VALUES: stdout = self.useFixture(fixtures.StringStream('stdout')).stream self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout)) if os.environ.get('OS_STDERR_CAPTURE') in options.TRUE_VALUES: stderr = self.useFixture(fixtures.StringStream('stderr')).stream self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr)) self.log_fixture = self.useFixture( fixtures.FakeLogger('pbr')) # Older git does not have config --local, so create a temporary home # directory to permit using git config --global without stepping on # developer configuration. self.useFixture(fixtures.TempHomeDir()) self.useFixture(fixtures.NestedTempfile()) self.useFixture(fixtures.FakeLogger()) self.useFixture(fixtures.EnvironmentVariable('PBR_VERSION', '0.0')) self.temp_dir = self.useFixture(fixtures.TempDir()).path self.package_dir = os.path.join(self.temp_dir, 'testpackage') shutil.copytree(os.path.join(os.path.dirname(__file__), 'testpackage'), self.package_dir) self.addCleanup(os.chdir, os.getcwd()) os.chdir(self.package_dir) self.addCleanup(self._discard_testpackage)
def setUp(self): super(ConfigureLogging, self).setUp() if self.capture_logs: self.logger = self.useFixture( fixtures.FakeLogger( format=self._format, level=self.level, nuke_handlers=True, ) ) else: logging.basicConfig(format=self._format, level=self.level)
def test_logs_on_missing_file(self): workdir = self.useFixture(fixtures.TempDir()) fake_logger = self.useFixture(fixtures.FakeLogger()) bad_path = os.path.join(workdir.path, 'path_does_not_exist') result = _build_doubles.extract_schemas_from_file(bad_path) self.assertIsNone(result) self.assertThat( fake_logger.output, Contains('Extracting schemas from %s' % bad_path)) self.assertThat( fake_logger.output, Contains('Cannot extract schemas: No such file or directory'))
def test_logs_on_schema_extraction(self): workdir = self.useFixture(fixtures.TempDir()) fake_logger = self.useFixture(fixtures.FakeLogger()) good_path = os.path.join(workdir.path, 'my.py') with open(good_path, 'w') as f: f.write(dedent( """ service = AcceptableService('vendor') root_api = service.api('/', 'root') @root_api.view(introduced_at='1.0') def my_view(): pass """)) [schema] = _build_doubles.extract_schemas_from_file(good_path) self.assertEqual('root', schema.view_name) self.assertEqual('1.0', schema.version) self.assertEqual(['GET'], schema.methods) self.assertEqual(None, schema.input_schema) self.assertEqual(None, schema.output_schema) self.assertThat( fake_logger.output, Contains('Extracting schemas from %s' % good_path)) self.assertThat( fake_logger.output, Contains('Extracted 1 schema')) # To support testing, we need a version of ArgumentParser that doesn't call # sys.exit on error, but rather throws an exception, so we can catch that in # our tests:
def setUp(self): super(TestDrivers, self).setUp() self.logger = self.useFixture( fixtures.FakeLogger( format="%(levelname)8s [%(name)s] %(message)s", level=logging.DEBUG, nuke_handlers=True, ) )
def test_output_not_containing_virtual_does_not_set_tag(self): logger = self.useFixture(FakeLogger()) node = factory.make_Node() self.assertTagsEqual(node, []) set_virtual_tag(node, b"", 0) self.assertTagsEqual(node, []) self.assertIn( "No virtual type reported in VIRTUALITY_SCRIPT output for node " "%s" % node.system_id, logger.output)
def test_output_not_containing_virtual_does_not_remove_tag(self): logger = self.useFixture(FakeLogger()) node = factory.make_Node() node.tags.add(self.getVirtualTag()) self.assertTagsEqual(node, ["virtual"]) set_virtual_tag(node, b"", 0) self.assertTagsEqual(node, ["virtual"]) self.assertIn( "No virtual type reported in VIRTUALITY_SCRIPT output for node " "%s" % node.system_id, logger.output)
def test_hardware_updates_logs_invalid_xml(self): logger = self.useFixture(FakeLogger()) update_hardware_details(factory.make_Node(), b"garbage", 0) expected_log = dedent("""\ Invalid lshw data. Traceback (most recent call last): ... lxml.etree.XMLSyntaxError: Start tag expected, ... """) self.assertThat( logger.output, DocTestMatches( expected_log, self.doctest_flags))
def test_hardware_updates_does_nothing_when_exit_status_is_not_zero(self): logger = self.useFixture(FakeLogger(name='commissioningscript')) update_hardware_details(factory.make_Node(), b"garbage", exit_status=1) self.assertEqual("", logger.output)
def test_update_services_logs_when_service_not_recognised(self): service_name = factory.make_name("service") service = self.make_service(service_name) rack_controller = factory.make_RackController() with FakeLogger(services.__name__) as logger: update_services(rack_controller.system_id, [service]) self.assertThat(logger.output, DocTestMatches( "Rack ... reported status for '...' but this is not a " "recognised service (status='...', info='...')."))
def test_logs_finding_existing_node_when_master(self): logger = self.useFixture(FakeLogger("maas")) node = factory.make_Node(node_type=NODE_TYPE.RACK_CONTROLLER) self.patch(rackcontrollers, "is_master_process").return_value = True register(system_id=node.system_id) self.assertEqual( "Existing rack controller '%s' running version 2.2 or below has " "connected to region '%s'." % ( node.hostname, self.this_region.hostname), logger.output.strip())
def test_logs_finding_existing_node_when_master_with_version(self): logger = self.useFixture(FakeLogger("maas")) node = factory.make_Node(node_type=NODE_TYPE.RACK_CONTROLLER) self.patch(rackcontrollers, "is_master_process").return_value = True register(system_id=node.system_id, version="4.0") self.assertEqual( "Existing rack controller '%s' running version 4.0 has " "connected to region '%s'." % ( node.hostname, self.this_region.hostname), logger.output.strip())
def test_logs_converting_region_controller(self): logger = self.useFixture(FakeLogger("maas")) node = factory.make_Node(node_type=NODE_TYPE.REGION_CONTROLLER) register(system_id=node.system_id) self.assertEqual( "Region controller '%s' running version 2.2 or below converted " "into a region and rack controller.\n" % node.hostname, logger.output)
def test_logs_converting_region_controller_with_version(self): logger = self.useFixture(FakeLogger("maas")) node = factory.make_Node(node_type=NODE_TYPE.REGION_CONTROLLER) register(system_id=node.system_id, version="7.8") self.assertEqual( "Region controller '%s' running version 7.8 converted " "into a region and rack controller.\n" % node.hostname, logger.output)
def test_logs_converting_existing_node(self): logger = self.useFixture(FakeLogger("maas")) node = factory.make_Node(node_type=NODE_TYPE.MACHINE) register(system_id=node.system_id) self.assertEqual( "Region controller '%s' converted '%s' running version 2.2 or " "below into a rack controller.\n" % ( self.this_region.hostname, node.hostname), logger.output)
def test_logs_creating_new_rackcontroller(self): logger = self.useFixture(FakeLogger("maas")) hostname = factory.make_name("hostname") register(hostname=hostname) self.assertEqual( "New rack controller '%s' running version 2.2 or below was " "created by region '%s' upon first connection." % ( hostname, self.this_region.hostname), logger.output.strip())
def test_logs_creating_new_rackcontroller_with_version(self): logger = self.useFixture(FakeLogger("maas")) hostname = factory.make_name("hostname") register(hostname=hostname, version="4.2") self.assertEqual( "New rack controller '%s' running version 4.2 was " "created by region '%s' upon first connection." % ( hostname, self.this_region.hostname), logger.output.strip())