我们从 Python 开源项目中，提取了以下 50 个代码示例，用于说明如何使用 testtools.ExpectedException()。
def test_parse_invalid_port(self):
    """
    Parsing a listen address with an unusable port raises a ``ValueError``
    whose message quotes the rejected port text.
    """
    # (listen address, expected error pattern), checked in this order.
    invalid_cases = (
        (':foo', r"'foo' does not appear to be a valid port number"),
        (':0', r"'0' does not appear to be a valid port number"),
        (':65536', r"'65536' does not appear to be a valid port number"),
        (':', r"'' does not appear to be a valid port number"),
    )
    for listen_addr, error_pattern in invalid_cases:
        with ExpectedException(ValueError, error_pattern):
            parse_listen_addr(listen_addr)
def test_image_required(self, capfd):
    """
    Calling ``main`` without an image argument exits with code 2, writes
    nothing to stdout and reports the missing argument on stderr.
    """
    exits_with_code_2 = MatchesStructure(code=Equals(2))
    with ExpectedException(SystemExit, exits_with_code_2):
        main(['--tag', 'abc'])

    stdout_text, stderr_text = capfd.readouterr()
    assert_that(stdout_text, Equals(''))

    # argparse reports a more useful message on Python 3.
    if sys.version_info >= (3,):
        expected_error = (
            r'.*error: the following arguments are required: image$')
    else:
        expected_error = r'.*error: too few arguments$'
    # re.DOTALL lets the leading '.*' match across newlines as well.
    assert_that(stderr_text, MatchesRegex(expected_error, re.DOTALL))
def test_semver_precision_requires_version_semver(self, capfd):
    """
    Passing ``--semver-precision`` without ``--version-semver`` makes
    ``main`` exit with code 2, leave stdout empty and explain the missing
    option on stderr.
    """
    argv = ['--semver-precision', '2', 'test-image:abc']
    with ExpectedException(SystemExit, MatchesStructure(code=Equals(2))):
        main(argv)

    stdout_text, stderr_text = capfd.readouterr()
    assert_that(stdout_text, Equals(''))

    expected_error = MatchesRegex(
        r'.*error: the --semver-precision option requires '
        r'--version-semver$',
        re.DOTALL,
    )
    assert_that(stderr_text, expected_error)
def test_version_semver_requires_argument(self, capfd):
    """
    When ``--semver-precision`` is immediately followed by ``--`` (so it
    receives no value), ``main`` exits with code 2, leaves stdout empty
    and reports that -P/--semver-precision expected one argument.
    """
    argv = [
        '--version', '1.2.3',
        '--version-semver',
        '--semver-precision',
        '--',
        'test-image',
    ]
    with ExpectedException(SystemExit, MatchesStructure(code=Equals(2))):
        main(argv)

    stdout_text, stderr_text = capfd.readouterr()
    assert_that(stdout_text, Equals(''))

    expected_error = MatchesRegex(
        r'.*error: argument -P/--semver-precision: expected one argument$',
        re.DOTALL,
    )
    assert_that(stderr_text, expected_error)
def test_validate_provider_segment(self):
    """
    Check which provider segments the driver rejects: one naming a
    physical network (error matching ".*specified.*") and one carrying
    an unknown key (error matching ".*prohibited.*"). Segments without
    a physical network validate without raising.
    """
    provider_segment = {
        api.NETWORK_TYPE: self.TYPE,
        api.PHYSICAL_NETWORK: "phys_net",
        api.SEGMENTATION_ID: None,
    }
    with testtools.ExpectedException(exc.InvalidInput, ".*specified.*"):
        self.driver.validate_provider_segment(provider_segment)

    # After each of these in-place changes validation must not raise.
    accepted_changes = (
        (api.PHYSICAL_NETWORK, None),
        (api.SEGMENTATION_ID, 1),
    )
    for key, value in accepted_changes:
        provider_segment[key] = value
        self.driver.validate_provider_segment(provider_segment)

    segment_with_extra_key = {
        api.NETWORK_TYPE: self.TYPE,
        api.SEGMENTATION_ID: None,
        'bad_key': "bad_value",
    }
    with testtools.ExpectedException(exc.InvalidInput, ".*prohibited.*"):
        self.driver.validate_provider_segment(segment_with_extra_key)
def test_status_with_file_bad_encoder_fails(self):
    """
    A status message whose attached file declares the "uuencode"
    encoding makes ``processMessage`` raise ``ValueError``.
    """
    commissioning_node = factory.make_Node(
        interface=True, status=NODE_STATUS.COMMISSIONING)
    raw_contents = b'These are the contents of the file.'
    # The content is bz2-compressed and passed through encode_as_base64;
    # only the declared "encoding" value is the unusual part.
    attached_file = {
        "path": "sample.txt",
        "encoding": "uuencode",
        "compression": "bzip2",
        "content": encode_as_base64(bz2.compress(raw_contents)),
    }
    message = {
        'event_type': 'finish',
        'result': 'FAILURE',
        'origin': 'curtin',
        'name': 'commissioning',
        'description': 'Commissioning',
        'timestamp': datetime.utcnow(),
        'files': [attached_file],
    }
    with ExpectedException(ValueError):
        self.processMessage(commissioning_node, message)
def test_status_with_file_bad_compression_fails(self):
    """
    A status message whose attached file declares the "jpeg" compression
    makes ``processMessage`` raise ``ValueError``.
    """
    commissioning_node = factory.make_Node(
        interface=True, status=NODE_STATUS.COMMISSIONING)
    raw_contents = b'These are the contents of the file.'
    # The content is bz2-compressed and passed through encode_as_base64;
    # only the declared "compression" value is the unusual part.
    attached_file = {
        "path": "sample.txt",
        "encoding": "base64",
        "compression": "jpeg",
        "content": encode_as_base64(bz2.compress(raw_contents)),
    }
    message = {
        'event_type': 'finish',
        'result': 'FAILURE',
        'origin': 'curtin',
        'name': 'commissioning',
        'description': 'Commissioning',
        'timestamp': datetime.utcnow(),
        'files': [attached_file],
    }
    with ExpectedException(ValueError):
        self.processMessage(commissioning_node, message)
def test__create_node_fails_with_invalid_hostname(self):
    """
    ``create_node`` raises ``ValidationError`` when given one of the
    hostnames "---" or "Microsoft Windows" (picked at random).
    """
    self.prepare_rack_rpc()
    macs = [factory.make_mac_address() for _ in range(3)]
    arch = make_usable_architecture(self)
    invalid_hostnames = ["---", "Microsoft Windows"]
    bad_hostname = random.choice(invalid_hostnames)
    chosen_power_type = random.choice(self.power_types)['name']
    with ExpectedException(ValidationError):
        create_node(
            arch, chosen_power_type, {}, macs, hostname=bad_hostname)
def test_cannot_save_if_size_larger_than_volume_group(self):
    """
    Creating a virtual block device as large as the whole volume group,
    after half of the group is already allocated, raises a
    ``ValidationError`` reporting insufficient free space.
    """
    volume_group = factory.make_FilesystemGroup(
        group_type=FILESYSTEM_GROUP_TYPE.LVM_VG)
    # Occupy half of the group first.
    factory.make_VirtualBlockDevice(
        filesystem_group=volume_group,
        size=volume_group.get_size() / 2)

    requested_size = volume_group.get_size()
    readable_size = human_readable_bytes(requested_size)
    expected_message = re.escape(
        "{'__all__': ['There is not enough free space (%s) "
        "on volume group %s.']}" % (readable_size, volume_group.name))
    with ExpectedException(ValidationError, expected_message):
        factory.make_VirtualBlockDevice(
            filesystem_group=volume_group, size=requested_size)
def test_cannot_save_volume_group_if_logical_volumes_larger(self):
    """
    Saving a volume group after deleting one of its two LVM_PV
    filesystems, while a virtual block device sized to the original
    group still exists, raises ``ValidationError``.
    """
    node = factory.make_Node()
    pv_filesystems = [
        factory.make_Filesystem(
            fstype=FILESYSTEM_TYPE.LVM_PV,
            block_device=factory.make_PhysicalBlockDevice(node=node))
        for _ in range(2)
    ]
    group = factory.make_FilesystemGroup(
        group_type=FILESYSTEM_GROUP_TYPE.LVM_VG,
        filesystems=pv_filesystems)
    # A logical volume taking up the group's full size.
    factory.make_VirtualBlockDevice(
        size=group.get_size(), filesystem_group=group)

    # Remove the second filesystem from under the group.
    pv_filesystems[1].delete()

    expected_message = re.escape(
        "['Volume group cannot be smaller than its "
        "logical volumes.']")
    with ExpectedException(ValidationError, expected_message):
        group.save()
def test_cannot_save_raid_0_with_spare_raid_devices(self):
    """
    Creating a RAID 0 filesystem group from two RAID filesystems plus one
    RAID_SPARE filesystem raises ``ValidationError``.
    """
    node = factory.make_Node()
    raid_members = [
        factory.make_Filesystem(
            fstype=FILESYSTEM_TYPE.RAID,
            block_device=factory.make_PhysicalBlockDevice(node=node))
        for _ in range(2)
    ]
    spare = factory.make_Filesystem(
        fstype=FILESYSTEM_TYPE.RAID_SPARE,
        block_device=factory.make_PhysicalBlockDevice(node=node))
    all_filesystems = raid_members + [spare]

    expected_message = re.escape(
        "{'__all__': ['RAID level 0 must have at least 2 raid "
        "devices and no spares.']}")
    with ExpectedException(ValidationError, expected_message):
        factory.make_FilesystemGroup(
            group_type=FILESYSTEM_GROUP_TYPE.RAID_0,
            filesystems=all_filesystems)
def test_cannot_save_raid_5_with_less_than_3_raid_devices(self):
    """
    Creating a RAID 5 filesystem group from only one or two RAID
    filesystems raises ``ValidationError``.
    """
    node = factory.make_Node()
    member_count = random.randint(1, 2)
    raid_members = []
    for _ in range(member_count):
        backing_device = factory.make_PhysicalBlockDevice(node=node)
        raid_members.append(
            factory.make_Filesystem(
                fstype=FILESYSTEM_TYPE.RAID,
                block_device=backing_device))

    expected_message = re.escape(
        "{'__all__': ['RAID level 5 must have at least 3 raid "
        "devices and any number of spares.']}")
    with ExpectedException(ValidationError, expected_message):
        factory.make_FilesystemGroup(
            group_type=FILESYSTEM_GROUP_TYPE.RAID_5,
            filesystems=raid_members)
def test_cannot_save_raid_10_with_less_than_3_raid_devices(self):
    """
    Creating a RAID 10 filesystem group from only one or two RAID
    filesystems raises ``ValidationError``.
    """
    node = factory.make_Node()
    member_count = random.randint(1, 2)
    raid_members = []
    for _ in range(member_count):
        backing_device = factory.make_PhysicalBlockDevice(node=node)
        raid_members.append(
            factory.make_Filesystem(
                fstype=FILESYSTEM_TYPE.RAID,
                block_device=backing_device))

    expected_message = re.escape(
        "{'__all__': ['RAID level 10 must have at least 3 raid "
        "devices and any number of spares.']}")
    with ExpectedException(ValidationError, expected_message):
        factory.make_FilesystemGroup(
            group_type=FILESYSTEM_GROUP_TYPE.RAID_10,
            filesystems=raid_members)
def test_cannot_save_bcache_without_cache_set(self):
    """
    Building a bcache filesystem group, clearing its ``cache_set`` and
    saving it raises ``ValidationError`` somewhere in that sequence.
    """
    node = factory.make_Node()
    backing_device = factory.make_PhysicalBlockDevice(node=node)
    backing_filesystems = [
        factory.make_Filesystem(
            fstype=FILESYSTEM_TYPE.BCACHE_BACKING,
            block_device=backing_device),
    ]
    expected_message = re.escape(
        "{'__all__': ['Bcache requires an assigned cache "
        "set.']}")
    # Group creation stays inside the expectation, as in the original.
    with ExpectedException(ValidationError, expected_message):
        bcache_group = factory.make_FilesystemGroup(
            group_type=FILESYSTEM_GROUP_TYPE.BCACHE,
            filesystems=backing_filesystems)
        bcache_group.cache_set = None
        bcache_group.save()
def test_cannot_save_bcache_with_logical_volume_as_backing(self):
    """
    Creating a bcache filesystem group whose backing filesystem sits on a
    virtual block device raises ``ValidationError``.
    """
    node = factory.make_Node()
    node_cache_set = factory.make_CacheSet(node=node)
    virtual_device = factory.make_VirtualBlockDevice(node=node)
    backing_filesystems = [
        factory.make_Filesystem(
            fstype=FILESYSTEM_TYPE.BCACHE_BACKING,
            block_device=virtual_device),
    ]
    expected_message = re.escape(
        "{'__all__': ['Bcache cannot use a logical volume as a "
        "backing device.']}")
    with ExpectedException(ValidationError, expected_message):
        factory.make_FilesystemGroup(
            group_type=FILESYSTEM_GROUP_TYPE.BCACHE,
            cache_set=node_cache_set,
            filesystems=backing_filesystems)
def test_cannot_save_bcache_with_multiple_backings(self):
    """
    Creating a bcache filesystem group from between 2 and 10 backing
    filesystems raises ``ValidationError``.
    """
    node = factory.make_Node()
    node_cache_set = factory.make_CacheSet(node=node)
    backing_count = random.randint(2, 10)
    backing_filesystems = []
    for _ in range(backing_count):
        physical_device = factory.make_PhysicalBlockDevice(node=node)
        backing_filesystems.append(
            factory.make_Filesystem(
                fstype=FILESYSTEM_TYPE.BCACHE_BACKING,
                block_device=physical_device))

    expected_message = re.escape(
        "{'__all__': ['Bcache can only contain one backing "
        "device.']}")
    with ExpectedException(ValidationError, expected_message):
        factory.make_FilesystemGroup(
            group_type=FILESYSTEM_GROUP_TYPE.BCACHE,
            cache_set=node_cache_set,
            filesystems=backing_filesystems)
def test_create_raid_0_with_one_element_fails(self):
    """
    ``RAID.objects.create_raid`` raises ``ValidationError`` when asked
    for a RAID 0 array with a single block device.
    """
    node = factory.make_Node()
    only_device = factory.make_PhysicalBlockDevice(node=node)
    raid_uuid = str(uuid4())
    expected_message = re.escape(
        "{'__all__': ['RAID level 0 must have at least 2 raid "
        "devices and no spares.']}")
    with ExpectedException(ValidationError, expected_message):
        RAID.objects.create_raid(
            name='md0',
            level=FILESYSTEM_GROUP_TYPE.RAID_0,
            uuid=raid_uuid,
            block_devices=[only_device],
            partitions=[],
            spare_devices=[],
            spare_partitions=[])
def test_create_raid_1_with_one_element_fails(self):
    """
    ``RAID.objects.create_raid`` raises ``ValidationError`` when asked
    for a RAID 1 array with a single block device.
    """
    node = factory.make_Node()
    only_device = factory.make_PhysicalBlockDevice(node=node)
    raid_uuid = str(uuid4())
    expected_message = re.escape(
        "{'__all__': ['RAID level 1 must have at least 2 raid "
        "devices and any number of spares.']}")
    with ExpectedException(ValidationError, expected_message):
        RAID.objects.create_raid(
            name='md0',
            level=FILESYSTEM_GROUP_TYPE.RAID_1,
            uuid=raid_uuid,
            block_devices=[only_device],
            partitions=[],
            spare_devices=[],
            spare_partitions=[])
def test_create_raid_5_with_2_elements_fails(self):
    """
    ``RAID.objects.create_raid`` raises ``ValidationError`` when asked
    for a RAID 5 array with only two block devices.
    """
    node = factory.make_Node()
    device_size = 10 * 1000 ** 4
    member_devices = []
    for _ in range(2):
        member_devices.append(
            factory.make_PhysicalBlockDevice(node=node, size=device_size))
    raid_uuid = str(uuid4())
    expected_message = re.escape(
        "{'__all__': ['RAID level 5 must have at least 3 raid "
        "devices and any number of spares.']}")
    with ExpectedException(ValidationError, expected_message):
        RAID.objects.create_raid(
            name='md0',
            level=FILESYSTEM_GROUP_TYPE.RAID_5,
            uuid=raid_uuid,
            block_devices=member_devices,
            partitions=[],
            spare_devices=[],
            spare_partitions=[])
def test_create_raid_6_with_3_elements_fails(self):
    """
    ``RAID.objects.create_raid`` raises ``ValidationError`` when asked
    for a RAID 6 array with only three block devices.
    """
    node = factory.make_Node()
    member_devices = []
    for _ in range(3):
        member_devices.append(factory.make_PhysicalBlockDevice(node=node))
    raid_uuid = str(uuid4())
    expected_message = re.escape(
        "{'__all__': ['RAID level 6 must have at least 4 raid "
        "devices and any number of spares.']}")
    with ExpectedException(ValidationError, expected_message):
        RAID.objects.create_raid(
            name='md0',
            level=FILESYSTEM_GROUP_TYPE.RAID_6,
            uuid=raid_uuid,
            block_devices=member_devices,
            partitions=[],
            spare_devices=[],
            spare_partitions=[])
def test_create_raid_with_block_device_from_other_node_fails(self):
    """
    ``RAID.objects.create_raid`` raises ``ValidationError`` when the
    block devices handed to it come from two different nodes.
    """
    first_node = factory.make_Node()
    second_node = factory.make_Node()
    first_node_devices = [
        factory.make_PhysicalBlockDevice(node=first_node)
        for _ in range(5)
    ]
    second_node_devices = [
        factory.make_PhysicalBlockDevice(node=second_node)
        for _ in range(5)
    ]
    raid_uuid = str(uuid4())
    expected_message = re.escape(
        "{'__all__': ['All added filesystems must belong to the "
        "same node.']}")
    with ExpectedException(ValidationError, expected_message):
        RAID.objects.create_raid(
            name='md0',
            level=FILESYSTEM_GROUP_TYPE.RAID_1,
            uuid=raid_uuid,
            block_devices=first_node_devices + second_node_devices,
            partitions=[],
            spare_devices=[],
            spare_partitions=[])
def test_add_device_from_another_node_to_array_fails(self):
    """
    Adding a block device that belongs to a different node to a
    ten-device RAID 5 array raises ``ValidationError``; afterwards the
    array still has 10 filesystems and a size of 9 * device_size.
    """
    array_node = factory.make_Node()
    foreign_node = factory.make_Node()
    device_size = 10 * 1000 ** 4
    member_devices = [
        factory.make_PhysicalBlockDevice(node=array_node, size=device_size)
        for _ in range(10)
    ]
    raid_uuid = str(uuid4())
    array = RAID.objects.create_raid(
        name='md0',
        level=FILESYSTEM_GROUP_TYPE.RAID_5,
        uuid=raid_uuid,
        block_devices=member_devices)
    foreign_device = factory.make_PhysicalBlockDevice(
        node=foreign_node, size=device_size)

    expected_message = re.escape(
        "['Device needs to be from the same node as the rest of the "
        "array.']")
    with ExpectedException(ValidationError, expected_message):
        array.add_device(foreign_device, FILESYSTEM_TYPE.RAID)

    # Still 10 devices.
    self.assertEqual(10, array.filesystems.count())
    self.assertEqual(9 * device_size, array.get_size())
def test_add_partition_from_another_node_to_array_fails(self):
    """
    Adding a partition that lives on another node's block device to a
    ten-device RAID 5 array raises ``ValidationError``; afterwards the
    array still has 10 filesystems and a size of 9 * device_size.
    """
    array_node = factory.make_Node()
    foreign_node = factory.make_Node()
    device_size = 10 * 1000 ** 4
    member_devices = [
        factory.make_PhysicalBlockDevice(node=array_node, size=device_size)
        for _ in range(10)
    ]
    raid_uuid = str(uuid4())
    array = RAID.objects.create_raid(
        name='md0',
        level=FILESYSTEM_GROUP_TYPE.RAID_5,
        uuid=raid_uuid,
        block_devices=member_devices)

    foreign_device = factory.make_PhysicalBlockDevice(
        node=foreign_node, size=device_size)
    foreign_table = factory.make_PartitionTable(block_device=foreign_device)
    foreign_partition = foreign_table.add_partition()

    expected_message = re.escape(
        "['Partition must be on a device from the same node as "
        "the rest of the array.']")
    with ExpectedException(ValidationError, expected_message):
        array.add_partition(foreign_partition, FILESYSTEM_TYPE.RAID)

    # Nothing added.
    self.assertEqual(10, array.filesystems.count())
    self.assertEqual(9 * device_size, array.get_size())
def test_add_already_used_device_to_array_fails(self):
    """
    Adding a block device that already carries an EXT4 filesystem to a
    ten-device RAID 5 array raises ``ValidationError``; afterwards the
    array still has 10 filesystems and a size of 9 * device_size.
    """
    node = factory.make_Node()
    device_size = 10 * 1000 ** 4
    member_devices = [
        factory.make_PhysicalBlockDevice(node=node, size=device_size)
        for _ in range(10)
    ]
    raid_uuid = str(uuid4())
    array = RAID.objects.create_raid(
        name='md0',
        level=FILESYSTEM_GROUP_TYPE.RAID_5,
        uuid=raid_uuid,
        block_devices=member_devices)

    # A device on the same node that is already formatted and mounted.
    used_device = factory.make_PhysicalBlockDevice(
        node=node, size=device_size)
    Filesystem.objects.create(
        block_device=used_device,
        mount_point='/export/home',
        fstype=FILESYSTEM_TYPE.EXT4)

    expected_message = re.escape(
        "['There is another filesystem on this device.']")
    with ExpectedException(ValidationError, expected_message):
        array.add_device(used_device, FILESYSTEM_TYPE.RAID)

    # Nothing added.
    self.assertEqual(10, array.filesystems.count())
    self.assertEqual(9 * device_size, array.get_size())
def test_remove_device_from_array_fails(self):
    """
    Removing a block device that was never part of a ten-device RAID 5
    array raises ``ValidationError``; afterwards the array still has 10
    filesystems and a size of 9 * device_size.
    """
    node = factory.make_Node()
    device_size = 10 * 1000 ** 4
    member_devices = [
        factory.make_PhysicalBlockDevice(node=node, size=device_size)
        for _ in range(10)
    ]
    raid_uuid = str(uuid4())
    array = RAID.objects.create_raid(
        name='md0',
        level=FILESYSTEM_GROUP_TYPE.RAID_5,
        uuid=raid_uuid,
        block_devices=member_devices)

    expected_message = re.escape(
        "['Device does not belong to this array.']")
    with ExpectedException(ValidationError, expected_message):
        # The unrelated device is created inside the expectation, exactly
        # as in the original.
        unrelated_device = factory.make_PhysicalBlockDevice(
            node=node, size=device_size)
        array.remove_device(unrelated_device)

    self.assertEqual(10, array.filesystems.count())
    self.assertEqual(9 * device_size, array.get_size())
def test_allows_multiple_records_unless_cname(self):
    """
    A second DNSData record on the same resource is rejected with
    ``ValidationError`` when the first record is a CNAME; for any other
    record type the second record is created and two records exist.
    """
    first_record = factory.make_DNSData(no_ip_addresses=True)
    resource = first_record.dnsresource
    if first_record.rrtype != 'CNAME':
        factory.make_DNSData(
            dnsresource=resource, rrtype=first_record.rrtype)
        record_count = DNSData.objects.filter(
            dnsresource=resource).count()
        self.assertEqual(2, record_count)
    else:
        expected_message = re.escape(
            "{'__all__': ['%s']}" % MULTI_CNAME_MSG)
        with ExpectedException(ValidationError, expected_message):
            factory.make_DNSData(dnsresource=resource, rrtype='CNAME')
def test__switch_static_to_already_used_ip_address(self):
    """
    Updating an interface's sticky address to an IP that is already
    assigned to another interface on the same subnet raises
    ``StaticIPAddressUnavailable``.
    """
    nic = factory.make_Interface(INTERFACE_TYPE.PHYSICAL)
    subnet = factory.make_Subnet(vlan=nic.vlan)
    own_ip = factory.pick_ip_in_Subnet(subnet)
    sticky_address = factory.make_StaticIPAddress(
        alloc_type=IPADDRESS_TYPE.STICKY,
        ip=own_ip,
        subnet=subnet,
        interface=nic)

    # Give a second interface a different sticky IP on the same subnet.
    other_nic = factory.make_Interface(INTERFACE_TYPE.PHYSICAL)
    taken_ip = factory.pick_ip_in_Subnet(
        subnet, but_not=[sticky_address.ip])
    factory.make_StaticIPAddress(
        alloc_type=IPADDRESS_TYPE.STICKY,
        ip=taken_ip,
        subnet=subnet,
        interface=other_nic)

    with ExpectedException(StaticIPAddressUnavailable):
        nic.update_ip_address(
            sticky_address,
            INTERFACE_LINK_TYPE.STATIC,
            subnet,
            ip_address=taken_ip)
def test__claim_fails_if_subnet_missing(self):
    """
    ``claim_auto_ips`` raises ``StaticIPAddressUnavailable`` when the
    interface's AUTO address has had its subnet cleared, and the patched
    ``maaslog.error`` is called once with the "Could not find subnet"
    message.
    """
    with transaction.atomic():
        nic = factory.make_Interface(INTERFACE_TYPE.PHYSICAL)
        subnet = factory.make_Subnet(vlan=nic.vlan)
        auto_address = factory.make_StaticIPAddress(
            alloc_type=IPADDRESS_TYPE.AUTO,
            ip="",
            subnet=subnet,
            interface=nic)
        # Detach the address from its subnet.
        auto_address.subnet = None
        auto_address.save()
        maaslog_mock = self.patch_autospec(interface_module, "maaslog")

    with transaction.atomic(), ExpectedException(StaticIPAddressUnavailable):
        nic.claim_auto_ips()

    expected_log = (
        "Could not find subnet for interface %s." % nic.get_log_string())
    self.expectThat(maaslog_mock.error, MockCalledOnceWith(expected_log))
def test__lock_released_on_error(self):
    """
    When the patched ``refresh`` raises, ``region.refresh()`` propagates
    that exception type and the 'refresh' ``NamedLock`` is not locked
    afterwards.
    """
    failure = factory.make_exception()
    self.patch(node_module, 'refresh').side_effect = failure
    controller = yield deferToDatabase(factory.make_RegionController)
    self.patch(
        node_module, 'get_maas_id').return_value = controller.system_id
    sys_info = {
        'hostname': controller.hostname,
        'architecture': controller.architecture,
        'osystem': '',
        'distro_series': '',
        'interfaces': {},
    }
    self.patch(node_module, 'get_sys_info').return_value = sys_info

    with ExpectedException(type(failure)):
        yield controller.refresh()

    refresh_lock = NamedLock('refresh')
    self.assertFalse(refresh_lock.is_locked())
def test__no_save_duplicate_ipranges(self):
    """
    Saving a second dynamic range identical to an existing one raises
    ``ValidationError`` matching ``self.dynamic_overlaps``.
    """
    subnet = make_plain_subnet()
    existing_range = IPRange(
        subnet=subnet,
        type=IPRANGE_TYPE.DYNAMIC,
        start_ip="192.168.0.100",
        end_ip="192.168.0.150",
    )
    existing_range.save()

    # The very same range again; saving it should fail.
    duplicate_range = IPRange(
        subnet=subnet,
        type=IPRANGE_TYPE.DYNAMIC,
        start_ip="192.168.0.100",
        end_ip="192.168.0.150",
    )
    with ExpectedException(ValidationError, self.dynamic_overlaps):
        duplicate_range.save()
def test__no_save_range_overlap_begin(self):
    """
    A range that overlaps the first address of an existing dynamic range
    cannot be saved, either as a dynamic range or as a reserved range.
    """
    subnet = make_plain_subnet()
    existing_range = IPRange(
        subnet=subnet,
        type=IPRANGE_TYPE.DYNAMIC,
        start_ip="192.168.0.100",
        end_ip="192.168.0.150",
    )
    existing_range.save()

    # Ends exactly on the existing range's start_ip; should fail to save.
    overlapping_range = IPRange(
        subnet=subnet,
        type=IPRANGE_TYPE.DYNAMIC,
        start_ip="192.168.0.90",
        end_ip="192.168.0.100",
    )
    with ExpectedException(ValidationError, self.dynamic_overlaps):
        overlapping_range.save()

    # Same addresses, this time as a reserved range.
    overlapping_range.type = IPRANGE_TYPE.RESERVED
    with ExpectedException(ValidationError, self.reserved_overlaps):
        overlapping_range.save()
def test__no_save_range_overlap_end(self):
    """
    A dynamic range that straddles the last address of an existing
    dynamic range raises ``ValidationError`` on save.
    """
    subnet = make_plain_subnet()
    existing_range = IPRange(
        subnet=subnet,
        type=IPRANGE_TYPE.DYNAMIC,
        start_ip="192.168.0.100",
        end_ip="192.168.0.150",
    )
    existing_range.save()

    # Starts inside the existing range and runs past its end_ip.
    overlapping_range = IPRange(
        subnet=subnet,
        type=IPRANGE_TYPE.DYNAMIC,
        start_ip="192.168.0.140",
        end_ip="192.168.0.160",
    )
    with ExpectedException(ValidationError, self.dynamic_overlaps):
        overlapping_range.save()
def test__no_save_range_within_ranges(self):
    """
    A dynamic range fully contained in an existing dynamic range raises
    ``ValidationError`` on save.
    """
    subnet = make_plain_subnet()
    outer_range = IPRange(
        subnet=subnet,
        type=IPRANGE_TYPE.DYNAMIC,
        start_ip="192.168.0.100",
        end_ip="192.168.0.150",
    )
    outer_range.save()

    # Entirely inside the range saved above; should not save.
    inner_range = IPRange(
        subnet=subnet,
        type=IPRANGE_TYPE.DYNAMIC,
        start_ip="192.168.0.110",
        end_ip="192.168.0.140",
    )
    with ExpectedException(ValidationError, self.dynamic_overlaps):
        inner_range.save()
def test__no_save_range_within_existing_range(self):
    """
    A dynamic range that lies inside an already-saved dynamic range
    raises ``ValidationError`` on save.
    """
    subnet = make_plain_subnet()
    saved_range = IPRange(
        subnet=subnet,
        type=IPRANGE_TYPE.DYNAMIC,
        start_ip="192.168.0.100",
        end_ip="192.168.0.150",
    )
    saved_range.save()

    # A contained range; should not save.
    contained_range = IPRange(
        subnet=subnet,
        type=IPRANGE_TYPE.DYNAMIC,
        start_ip="192.168.0.110",
        end_ip="192.168.0.140",
    )
    with ExpectedException(ValidationError, self.dynamic_overlaps):
        contained_range.save()
def test__no_save_range_within_existing_reserved_range(self):
    """
    A dynamic range that lies inside an already-saved reserved range
    raises ``ValidationError`` matching ``self.dynamic_overlaps``.
    """
    subnet = make_plain_subnet()
    reserved_range = IPRange(
        subnet=subnet,
        type=IPRANGE_TYPE.RESERVED,
        start_ip="192.168.0.100",
        end_ip="192.168.0.150",
    )
    reserved_range.save()

    # A contained dynamic range; should not save.
    contained_range = IPRange(
        subnet=subnet,
        type=IPRANGE_TYPE.DYNAMIC,
        start_ip="192.168.0.110",
        end_ip="192.168.0.140",
    )
    with ExpectedException(ValidationError, self.dynamic_overlaps):
        contained_range.save()
def test__reserved_range_cannot_overlap_reserved_ranges(self):
    """
    A reserved range that shares an address (192.168.0.250) with an
    existing reserved range raises ``ValidationError`` matching
    ``self.reserved_overlaps``.
    """
    subnet = factory.make_Subnet(
        cidr='192.168.0.0/24',
        gateway_ip='192.168.0.1',
        dns_servers=['192.168.0.50', '192.168.0.200'])
    first_reserved = IPRange(
        subnet=subnet,
        type=IPRANGE_TYPE.RESERVED,
        start_ip="192.168.0.1",
        end_ip="192.168.0.250",
    )
    first_reserved.save()

    second_reserved = IPRange(
        subnet=subnet,
        type=IPRANGE_TYPE.RESERVED,
        start_ip="192.168.0.250",
        end_ip="192.168.0.254",
    )
    with ExpectedException(ValidationError, self.reserved_overlaps):
        second_reserved.save()
def test__dynamic_range_cannot_overlap_auto_address(self):
    """
    A dynamic range spanning 192.168.0.2-254 cannot be saved while an
    AUTO static address (any IP in the subnet except 192.168.0.1)
    exists on that subnet.
    """
    subnet = make_plain_subnet()
    occupied_ip = factory.pick_ip_in_network(
        IPNetwork(subnet.cidr), but_not=['192.168.0.1'])
    factory.make_StaticIPAddress(
        subnet=subnet,
        alloc_type=IPADDRESS_TYPE.AUTO,
        ip=occupied_ip)

    dynamic_range = IPRange(
        subnet=subnet,
        type=IPRANGE_TYPE.DYNAMIC,
        start_ip="192.168.0.2",
        end_ip="192.168.0.254",
    )
    with ExpectedException(ValidationError, self.dynamic_overlaps):
        dynamic_range.save()
def test__dynamic_range_cannot_overlap_sticky_address(self):
    """
    A dynamic range spanning 192.168.0.2-254 cannot be saved while a
    STICKY static address (any IP in the subnet except 192.168.0.1)
    exists on that subnet.
    """
    subnet = make_plain_subnet()
    occupied_ip = factory.pick_ip_in_network(
        IPNetwork(subnet.cidr), but_not=['192.168.0.1'])
    factory.make_StaticIPAddress(
        subnet=subnet,
        alloc_type=IPADDRESS_TYPE.STICKY,
        ip=occupied_ip)

    dynamic_range = IPRange(
        subnet=subnet,
        type=IPRANGE_TYPE.DYNAMIC,
        start_ip="192.168.0.2",
        end_ip="192.168.0.254",
    )
    with ExpectedException(ValidationError, self.dynamic_overlaps):
        dynamic_range.save()
def test__dynamic_range_cannot_overlap_user_reserved_address(self):
    """
    A dynamic range spanning 192.168.0.2-254 cannot be saved while a
    USER_RESERVED static address (any IP in the subnet except
    192.168.0.1) exists on that subnet.
    """
    subnet = make_plain_subnet()
    occupied_ip = factory.pick_ip_in_network(
        IPNetwork(subnet.cidr), but_not=['192.168.0.1'])
    factory.make_StaticIPAddress(
        subnet=subnet,
        alloc_type=IPADDRESS_TYPE.USER_RESERVED,
        ip=occupied_ip)

    dynamic_range = IPRange(
        subnet=subnet,
        type=IPRANGE_TYPE.DYNAMIC,
        start_ip="192.168.0.2",
        end_ip="192.168.0.254",
    )
    with ExpectedException(ValidationError, self.dynamic_overlaps):
        dynamic_range.save()

# Regression for lp:1580772.
# NOTE(review): this comment trails the test in the flattened source and
# may introduce a following definition that is not shown here.
def test__skips_notify_on_controllerinfo_interface_update(self):
    """
    After ``set_interface_update_info`` runs for a controller, waiting
    0.2s on the registered listener channel ends in ``CancelledError``
    (presumably meaning no notification was delivered).
    """
    yield deferToDatabase(register_websocket_triggers)
    notified = DeferredValue()
    listener = self.make_listener_without_delay()
    node_params = self.params.copy()
    controller = yield deferToDatabase(self.create_node, node_params)
    yield deferToDatabase(self.set_version, controller, '')
    listener.register(self.listener, lambda *args: notified.set(args))
    yield listener.startService()
    try:
        yield deferToDatabase(
            self.set_interface_update_info, controller, '{]', '{}')
        with ExpectedException(CancelledError):
            yield notified.get(timeout=0.2)
    finally:
        # Always shut the listener down, even when the expectation fails.
        yield listener.stopService()
def test_doesnt_sends_message_for_update_on_node_alloc_type_no_ip(self):
    """
    Changing a node IP address's ``alloc_type`` to STICKY yields nothing
    on the "sys_dns" channel: waiting 1s for it ends in
    ``CancelledError``.
    """
    yield deferToDatabase(register_system_triggers)
    node = yield deferToDatabase(self.create_node_with_interface)
    address = yield deferToDatabase(self.get_node_ip_address, node)
    yield self.capturePublication()

    notified = DeferredValue()
    listener = self.make_listener_without_delay()
    listener.register("sys_dns", lambda *args: notified.set(args))
    yield listener.startService()
    try:
        changes = {
            "alloc_type": IPADDRESS_TYPE.STICKY,
        }
        yield deferToDatabase(
            self.update_staticipaddress, address.id, changes)
        with ExpectedException(CancelledError):
            yield notified.get(timeout=1)
    finally:
        # Always shut the listener down, even when the expectation fails.
        yield listener.stopService()
def test_doesnt_send_message_for_nic_link_non_authorative_domain(self):
    """
    Creating a static IP address for an interface whose node is in a
    non-authoritative domain yields nothing on the "sys_dns" channel:
    waiting 1s for it ends in ``CancelledError``.
    """
    yield deferToDatabase(register_system_triggers)
    domain = yield deferToDatabase(
        self.create_domain, {'authoritative': False})
    node = yield deferToDatabase(self.create_node, {'domain': domain})
    nic = yield deferToDatabase(self.create_interface, {'node': node})
    subnet = yield deferToDatabase(self.create_subnet)
    yield self.capturePublication()

    notified = DeferredValue()
    listener = self.make_listener_without_delay()
    listener.register("sys_dns", lambda *args: notified.set(args))
    yield listener.startService()
    try:
        link_params = {
            "interface": nic,
            "subnet": subnet,
        }
        yield deferToDatabase(self.create_staticipaddress, link_params)
        with ExpectedException(CancelledError):
            yield notified.get(timeout=1)
    finally:
        # Always shut the listener down, even when the expectation fails.
        yield listener.stopService()
def test_doesnt_send_message_for_nic_unlink_non_authorative_domain(self):
    """
    Deleting the static IP address of an interface whose node is in a
    non-authoritative domain yields nothing on the "sys_dns" channel:
    waiting 1s for it ends in ``CancelledError``.
    """
    yield deferToDatabase(register_system_triggers)
    domain = yield deferToDatabase(
        self.create_domain, {'authoritative': False})
    node = yield deferToDatabase(self.create_node, {'domain': domain})
    nic = yield deferToDatabase(self.create_interface, {'node': node})
    subnet = yield deferToDatabase(self.create_subnet)
    link_params = {
        "interface": nic,
        "subnet": subnet,
    }
    address = yield deferToDatabase(
        self.create_staticipaddress, link_params)
    yield self.capturePublication()

    notified = DeferredValue()
    listener = self.make_listener_without_delay()
    listener.register("sys_dns", lambda *args: notified.set(args))
    yield listener.startService()
    try:
        yield deferToDatabase(self.delete_staticipaddress, address.id)
        with ExpectedException(CancelledError):
            yield notified.get(timeout=1)
    finally:
        # Always shut the listener down, even when the expectation fails.
        yield listener.stopService()
def test_doesnt_send_message_for_subnet_delete_disabled_rdns(self):
    """
    Deleting a subnet created with ``rdns_mode`` DISABLED yields nothing
    on the "sys_dns" channel: waiting 1s for it ends in
    ``CancelledError``.
    """
    yield deferToDatabase(register_system_triggers)
    subnet_params = {'rdns_mode': RDNS_MODE.DISABLED}
    subnet = yield deferToDatabase(self.create_subnet, subnet_params)
    yield self.capturePublication()

    notified = DeferredValue()
    listener = self.make_listener_without_delay()
    listener.register("sys_dns", lambda *args: notified.set(args))
    yield listener.startService()
    try:
        yield deferToDatabase(self.delete_subnet, subnet.id)
        with ExpectedException(CancelledError):
            yield notified.get(timeout=1)
    finally:
        # Always shut the listener down, even when the expectation fails.
        yield listener.stopService()
def test__tryConnection_logs_error(self):
    """
    When ``startConnection`` raises, ``tryConnection`` propagates the
    exception and exactly one error-level event with the "Unable to
    connect to database" format and the exception message is logged.
    """
    listener = PostgresListenerService()
    error_type = factory.make_exception_type()
    error_text = factory.make_name("message")
    connect_mock = self.patch(listener, "startConnection")
    connect_mock.side_effect = error_type(error_text)

    with TwistedLoggerFixture() as logger:
        with ExpectedException(error_type):
            yield listener.tryConnection()

    self.assertThat(logger.events, HasLength(1))
    expected_event = ContainsDict({
        "log_format": Equals("Unable to connect to database: {error}"),
        "log_level": Equals(LogLevel.error),
        "error": Equals(error_text),
    })
    self.assertThat(logger.events[0], expected_event)
def test__tryConnection_will_not_retry_if_autoReconnect_not_set(self):
    """
    With ``autoReconnect`` set to False, a failing ``startConnection``
    makes ``tryConnection`` raise and the patched ``deferLater`` is never
    called.
    """
    listener = PostgresListenerService()
    listener.autoReconnect = False
    error_type = factory.make_exception_type()
    error_text = factory.make_name("message")
    connect_mock = self.patch(listener, "startConnection")
    connect_mock.side_effect = error_type(error_text)
    defer_later_mock = self.patch(listener_module, "deferLater")
    defer_later_mock.return_value = sentinel.retry

    with ExpectedException(error_type):
        yield listener.tryConnection()

    self.assertThat(defer_later_mock, MockNotCalled())
def test__stopping_cancels_start(self):
    """
    Stopping the listener service right after starting it cancels the
    start: the observed ``starting`` deferred ends in ``CancelledError``.
    """
    listener = PostgresListenerService()

    # Kick off the start and stop it straight away, without waiting for
    # the start to finish.
    starting = listener.startService()
    start_outcome = DeferredValue()
    start_outcome.observe(starting)
    stopping = listener.stopService()

    # At this point both deferreds still have callbacks waiting to fire.
    for pending in (starting, stopping):
        self.assertThat(pending.callbacks, Not(Equals([])))

    # Let the listener finish stopping.
    yield stopping

    # Now neither deferred has callbacks left, because `stopping` chained
    # itself onto the end of `starting`.
    for finished in (starting, stopping):
        self.assertThat(finished.callbacks, Equals([]))

    # Confirm that `starting` really was cancelled.
    with ExpectedException(CancelledError):
        yield start_outcome.get()
def test__non_superuser_reloads_user(self):
    """
    A handler built for an admin whose ``is_superuser`` flag is cleared
    and saved afterwards rejects ``configure_dhcp`` with
    ``AssertionError`` ("Permission denied.").
    """
    user = factory.make_admin()
    handler = VLANHandler(user, {})
    # Demote the user only after the handler has been created.
    user.is_superuser = False
    user.save()

    rack = factory.make_RackController()
    vlan = factory.make_VLAN()
    factory.make_Interface(INTERFACE_TYPE.PHYSICAL, node=rack, vlan=vlan)
    vlan.dhcp_on = True
    vlan.primary_rack = rack
    vlan.save()
    factory.make_ipv4_Subnet_with_IPRanges(vlan=vlan)

    with ExpectedException(AssertionError, "Permission denied."):
        handler.configure_dhcp({
            "id": vlan.id,
            "controllers": [],
        })
def test__configure_dhcp_gateway_outside_subnet_raises(self):
    """
    ``configure_dhcp`` raises ``ValueError`` when the requested gateway
    (1.0.0.1) is not inside the subnet's CIDR (10.0.0.0/24).
    """
    admin = factory.make_admin()
    handler = VLANHandler(admin, {})
    vlan = factory.make_VLAN()
    rack = factory.make_RackController()
    factory.make_Interface(INTERFACE_TYPE.PHYSICAL, node=rack, vlan=vlan)
    subnet = factory.make_Subnet(
        vlan=vlan, cidr="10.0.0.0/24", gateway_ip="")
    # The subnet starts out without any dynamic range.
    dynamic_range_count = subnet.get_dynamic_ranges().count()
    self.assertThat(dynamic_range_count, Equals(0))

    with ExpectedException(ValueError):
        handler.configure_dhcp({
            "id": vlan.id,
            "controllers": [rack.system_id],
            "extra": {
                "subnet": subnet.id,
                "gateway": "1.0.0.1",
                "start": "10.0.0.2",
                "end": "10.0.0.99",
            },
        })
def test__configure_dhcp_gateway_inside_range_raises(self):
    """
    ``configure_dhcp`` raises ``ValueError`` when the requested gateway
    (10.0.0.1) is also the first address of the requested range.
    """
    admin = factory.make_admin()
    handler = VLANHandler(admin, {})
    vlan = factory.make_VLAN()
    rack = factory.make_RackController()
    factory.make_Interface(INTERFACE_TYPE.PHYSICAL, node=rack, vlan=vlan)
    subnet = factory.make_Subnet(
        vlan=vlan, cidr="10.0.0.0/24", gateway_ip="")
    # The subnet starts out without any dynamic range.
    dynamic_range_count = subnet.get_dynamic_ranges().count()
    self.assertThat(dynamic_range_count, Equals(0))

    with ExpectedException(ValueError):
        handler.configure_dhcp({
            "id": vlan.id,
            "controllers": [rack.system_id],
            "extra": {
                "subnet": subnet.id,
                "gateway": "10.0.0.1",
                "start": "10.0.0.1",
                "end": "10.0.0.99",
            },
        })
    # NOTE(review): the reloaded object is not used in the visible code;
    # the snippet may be truncated here - confirm against the full file.
    vlan = reload_object(vlan)