Python testtools 模块,ExpectedException() 实例源码

我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 testtools.ExpectedException()。

项目:marathon-acme    作者:praekeltfoundation    | 项目源码 | 文件源码
def test_parse_invalid_port(self):
        """
        Parsing a listen address whose port part is invalid raises a
        ``ValueError`` whose message quotes the offending port.
        """
        # Each case pairs a listen address with the error pattern expected
        # for its port component.
        cases = (
            (':foo', r"'foo' does not appear to be a valid port number"),
            (':0', r"'0' does not appear to be a valid port number"),
            (':65536', r"'65536' does not appear to be a valid port number"),
            (':', r"'' does not appear to be a valid port number"),
        )
        for listen_addr, message_re in cases:
            with ExpectedException(ValueError, message_re):
                parse_listen_addr(listen_addr)
项目:docker-ci-deploy    作者:praekeltfoundation    | 项目源码 | 文件源码
def test_image_required(self, capfd):
        """
        Calling main without an image argument exits with status 2, writes
        nothing to stdout, and reports the missing argument on stderr.
        """
        exits_with_status_2 = MatchesStructure(code=Equals(2))
        with ExpectedException(SystemExit, exits_with_status_2):
            main(['--tag', 'abc'])

        captured_out, captured_err = capfd.readouterr()
        assert_that(captured_out, Equals(''))

        # A more useful error message was added to argparse in Python 3, so
        # the expected stderr text depends on the interpreter version.
        if sys.version_info >= (3,):
            expected_re = (
                r'.*error: the following arguments are required: image$')
        else:
            expected_re = r'.*error: too few arguments$'
        # re.DOTALL makes the leading '.*' match newlines as well.
        assert_that(captured_err, MatchesRegex(expected_re, re.DOTALL))
项目:docker-ci-deploy    作者:praekeltfoundation    | 项目源码 | 文件源码
def test_semver_precision_requires_version_semver(self, capfd):
        """
        Passing `--semver-precision` without `--version-semver` makes main
        exit with status 2 and explain the missing option on stderr.
        """
        cli_args = ['--semver-precision', '2', 'test-image:abc']
        with ExpectedException(SystemExit, MatchesStructure(code=Equals(2))):
            main(cli_args)

        stdout_text, stderr_text = capfd.readouterr()
        assert_that(stdout_text, Equals(''))

        # re.DOTALL makes the leading '.*' match newlines as well.
        expected_re = (
            r'.*error: the --semver-precision option requires '
            r'--version-semver$')
        assert_that(stderr_text, MatchesRegex(expected_re, re.DOTALL))
项目:docker-ci-deploy    作者:praekeltfoundation    | 项目源码 | 文件源码
def test_version_semver_requires_argument(self, capfd):
        """
        When `--semver-precision` is passed without a value, main exits
        with status 2, prints nothing to stdout, and stderr ends with an
        "expected one argument" error for `-P/--semver-precision`.
        """
        cli_args = [
            '--version', '1.2.3',
            '--version-semver',
            '--semver-precision',
            '--', 'test-image',
        ]
        with ExpectedException(SystemExit, MatchesStructure(code=Equals(2))):
            main(cli_args)

        stdout_text, stderr_text = capfd.readouterr()
        assert_that(stdout_text, Equals(''))

        # re.DOTALL makes the leading '.*' match newlines as well.
        expected_re = (
            r'.*error: argument -P/--semver-precision: expected one argument$')
        assert_that(stderr_text, MatchesRegex(expected_re, re.DOTALL))
项目:networking-avaya    作者:openstack    | 项目源码 | 文件源码
def test_validate_provider_segment(self):
        """
        Check which provider segments the driver's validator rejects.

        A segment with a physical network and no segmentation ID must raise
        ``InvalidInput`` matching "specified". The same segment with the
        physical network cleared, and then with a segmentation ID set as
        well, must validate without raising. A segment carrying an unknown
        key must raise ``InvalidInput`` matching "prohibited".
        """
        candidate = {
            api.NETWORK_TYPE: self.TYPE,
            api.PHYSICAL_NETWORK: "phys_net",
            api.SEGMENTATION_ID: None,
        }
        with testtools.ExpectedException(exc.InvalidInput, ".*specified.*"):
            self.driver.validate_provider_segment(candidate)

        # These two variations are expected to pass validation silently.
        candidate[api.PHYSICAL_NETWORK] = None
        self.driver.validate_provider_segment(candidate)

        candidate[api.SEGMENTATION_ID] = 1
        self.driver.validate_provider_segment(candidate)

        with_unknown_key = {
            api.NETWORK_TYPE: self.TYPE,
            api.SEGMENTATION_ID: None,
            'bad_key': "bad_value",
        }
        with testtools.ExpectedException(exc.InvalidInput, ".*prohibited.*"):
            self.driver.validate_provider_segment(with_unknown_key)
项目:maas    作者:maas    | 项目源码 | 文件源码
def test_status_with_file_bad_encoder_fails(self):
        """
        A status message whose attached file declares the "uuencode"
        encoding makes processMessage raise ValueError.
        """
        node = factory.make_Node(
            interface=True, status=NODE_STATUS.COMMISSIONING)
        raw_bytes = b'These are the contents of the file.'
        # The content is bzip2-compressed and then base64-encoded, while
        # the entry itself declares "uuencode" as its encoding.
        file_entry = {
            "path": "sample.txt",
            "encoding": "uuencode",
            "compression": "bzip2",
            "content": encode_as_base64(bz2.compress(raw_bytes)),
        }
        message = {
            'event_type': 'finish',
            'result': 'FAILURE',
            'origin': 'curtin',
            'name': 'commissioning',
            'description': 'Commissioning',
            'timestamp': datetime.utcnow(),
            'files': [file_entry],
        }
        with ExpectedException(ValueError):
            self.processMessage(node, message)
项目:maas    作者:maas    | 项目源码 | 文件源码
def test_status_with_file_bad_compression_fails(self):
        """
        A status message whose attached file declares the "jpeg"
        compression makes processMessage raise ValueError.
        """
        node = factory.make_Node(
            interface=True, status=NODE_STATUS.COMMISSIONING)
        raw_bytes = b'These are the contents of the file.'
        # The content is bzip2-compressed and then base64-encoded, while
        # the entry itself declares "jpeg" as its compression.
        file_entry = {
            "path": "sample.txt",
            "encoding": "base64",
            "compression": "jpeg",
            "content": encode_as_base64(bz2.compress(raw_bytes)),
        }
        message = {
            'event_type': 'finish',
            'result': 'FAILURE',
            'origin': 'curtin',
            'name': 'commissioning',
            'description': 'Commissioning',
            'timestamp': datetime.utcnow(),
            'files': [file_entry],
        }
        with ExpectedException(ValueError):
            self.processMessage(node, message)
项目:maas    作者:maas    | 项目源码 | 文件源码
def test__create_node_fails_with_invalid_hostname(self):
        """
        create_node raises ValidationError when called with a hostname
        chosen at random from "---" and "Microsoft Windows".
        """
        self.prepare_rack_rpc()

        macs = [factory.make_mac_address() for _ in range(3)]
        arch = make_usable_architecture(self)
        hostname_choices = [
            "---",
            "Microsoft Windows",
        ]
        bad_hostname = random.choice(hostname_choices)
        power_type_name = random.choice(self.power_types)['name']
        empty_power_parameters = {}

        with ExpectedException(ValidationError):
            create_node(
                arch, power_type_name, empty_power_parameters, macs,
                hostname=bad_hostname)
项目:maas    作者:maas    | 项目源码 | 文件源码
def test_cannot_save_if_size_larger_than_volume_group(self):
        """
        With one virtual block device already using half the volume
        group's size, creating another as large as the whole group raises
        ValidationError reporting insufficient free space.
        """
        volume_group = factory.make_FilesystemGroup(
            group_type=FILESYSTEM_GROUP_TYPE.LVM_VG)
        half_of_group = volume_group.get_size() / 2
        factory.make_VirtualBlockDevice(
            filesystem_group=volume_group, size=half_of_group)

        requested_size = volume_group.get_size()
        readable_size = human_readable_bytes(requested_size)
        expected_error = re.escape(
            "{'__all__': ['There is not enough free space (%s) "
            "on volume group %s.']}" % (
                readable_size,
                volume_group.name,
            ))
        with ExpectedException(ValidationError, expected_error):
            factory.make_VirtualBlockDevice(
                filesystem_group=volume_group, size=requested_size)
项目:maas    作者:maas    | 项目源码 | 文件源码
def test_cannot_save_volume_group_if_logical_volumes_larger(self):
        """
        After a virtual block device the size of the whole volume group
        exists, deleting one of the group's two LVM_PV filesystems makes
        saving the group raise ValidationError.
        """
        node = factory.make_Node()
        physical_volumes = [
            factory.make_Filesystem(
                fstype=FILESYSTEM_TYPE.LVM_PV,
                block_device=factory.make_PhysicalBlockDevice(node=node))
            for _ in range(2)
        ]
        group = factory.make_FilesystemGroup(
            group_type=FILESYSTEM_GROUP_TYPE.LVM_VG,
            filesystems=physical_volumes)
        factory.make_VirtualBlockDevice(
            size=group.get_size(), filesystem_group=group)

        # Drop the second filesystem that was created.
        physical_volumes[1].delete()

        expected_error = re.escape(
            "['Volume group cannot be smaller than its "
            "logical volumes.']")
        with ExpectedException(ValidationError, expected_error):
            group.save()
项目:maas    作者:maas    | 项目源码 | 文件源码
def test_cannot_save_raid_0_with_spare_raid_devices(self):
        """
        Building a RAID_0 filesystem group from two RAID filesystems plus
        one RAID_SPARE filesystem raises ValidationError.
        """
        node = factory.make_Node()
        raid_members = [
            factory.make_Filesystem(
                fstype=FILESYSTEM_TYPE.RAID,
                block_device=factory.make_PhysicalBlockDevice(node=node))
            for _ in range(2)
        ]
        spare = factory.make_Filesystem(
            fstype=FILESYSTEM_TYPE.RAID_SPARE,
            block_device=factory.make_PhysicalBlockDevice(node=node))
        members_and_spare = raid_members + [spare]

        expected_error = re.escape(
            "{'__all__': ['RAID level 0 must have at least 2 raid "
            "devices and no spares.']}")
        with ExpectedException(ValidationError, expected_error):
            factory.make_FilesystemGroup(
                group_type=FILESYSTEM_GROUP_TYPE.RAID_0,
                filesystems=members_and_spare)
项目:maas    作者:maas    | 项目源码 | 文件源码
def test_cannot_save_raid_5_with_less_than_3_raid_devices(self):
        """
        Building a RAID_5 filesystem group from only one or two RAID
        filesystems (count chosen at random) raises ValidationError.
        """
        node = factory.make_Node()
        member_count = random.randint(1, 2)
        raid_members = [
            factory.make_Filesystem(
                fstype=FILESYSTEM_TYPE.RAID,
                block_device=factory.make_PhysicalBlockDevice(node=node))
            for _ in range(member_count)
        ]
        expected_error = re.escape(
            "{'__all__': ['RAID level 5 must have at least 3 raid "
            "devices and any number of spares.']}")
        with ExpectedException(ValidationError, expected_error):
            factory.make_FilesystemGroup(
                group_type=FILESYSTEM_GROUP_TYPE.RAID_5,
                filesystems=raid_members)
项目:maas    作者:maas    | 项目源码 | 文件源码
def test_cannot_save_raid_10_with_less_than_3_raid_devices(self):
        """
        Building a RAID_10 filesystem group from only one or two RAID
        filesystems (count chosen at random) raises ValidationError.
        """
        node = factory.make_Node()
        member_count = random.randint(1, 2)
        raid_members = [
            factory.make_Filesystem(
                fstype=FILESYSTEM_TYPE.RAID,
                block_device=factory.make_PhysicalBlockDevice(node=node))
            for _ in range(member_count)
        ]
        expected_error = re.escape(
            "{'__all__': ['RAID level 10 must have at least 3 raid "
            "devices and any number of spares.']}")
        with ExpectedException(ValidationError, expected_error):
            factory.make_FilesystemGroup(
                group_type=FILESYSTEM_GROUP_TYPE.RAID_10,
                filesystems=raid_members)
项目:maas    作者:maas    | 项目源码 | 文件源码
def test_cannot_save_bcache_without_cache_set(self):
        """
        Creating a BCACHE filesystem group, clearing its cache_set and
        saving it is expected to raise ValidationError somewhere in that
        sequence.
        """
        node = factory.make_Node()
        backing = factory.make_Filesystem(
            fstype=FILESYSTEM_TYPE.BCACHE_BACKING,
            block_device=factory.make_PhysicalBlockDevice(node=node))
        backing_filesystems = [backing]

        expected_error = re.escape(
            "{'__all__': ['Bcache requires an assigned cache "
            "set.']}")
        # All three steps stay inside the context manager, so the error
        # may come from any of them.
        with ExpectedException(ValidationError, expected_error):
            bcache_group = factory.make_FilesystemGroup(
                group_type=FILESYSTEM_GROUP_TYPE.BCACHE,
                filesystems=backing_filesystems)
            bcache_group.cache_set = None
            bcache_group.save()
项目:maas    作者:maas    | 项目源码 | 文件源码
def test_cannot_save_bcache_with_logical_volume_as_backing(self):
        """
        Creating a BCACHE filesystem group whose backing filesystem sits
        on a virtual block device raises ValidationError.
        """
        node = factory.make_Node()
        cache = factory.make_CacheSet(node=node)
        backing = factory.make_Filesystem(
            fstype=FILESYSTEM_TYPE.BCACHE_BACKING,
            block_device=factory.make_VirtualBlockDevice(node=node))
        backing_filesystems = [backing]

        expected_error = re.escape(
            "{'__all__': ['Bcache cannot use a logical volume as a "
            "backing device.']}")
        with ExpectedException(ValidationError, expected_error):
            factory.make_FilesystemGroup(
                group_type=FILESYSTEM_GROUP_TYPE.BCACHE,
                cache_set=cache,
                filesystems=backing_filesystems)
项目:maas    作者:maas    | 项目源码 | 文件源码
def test_cannot_save_bcache_with_multiple_backings(self):
        """
        Creating a BCACHE filesystem group with between 2 and 10 backing
        filesystems (count chosen at random) raises ValidationError.
        """
        node = factory.make_Node()
        cache = factory.make_CacheSet(node=node)
        backing_count = random.randint(2, 10)
        backing_filesystems = [
            factory.make_Filesystem(
                fstype=FILESYSTEM_TYPE.BCACHE_BACKING,
                block_device=factory.make_PhysicalBlockDevice(node=node))
            for _ in range(backing_count)
        ]
        expected_error = re.escape(
            "{'__all__': ['Bcache can only contain one backing "
            "device.']}")
        with ExpectedException(ValidationError, expected_error):
            factory.make_FilesystemGroup(
                group_type=FILESYSTEM_GROUP_TYPE.BCACHE,
                cache_set=cache,
                filesystems=backing_filesystems)
项目:maas    作者:maas    | 项目源码 | 文件源码
def test_create_raid_0_with_one_element_fails(self):
        """
        create_raid at RAID_0 with a single block device and no spares
        raises ValidationError.
        """
        node = factory.make_Node()
        only_device = factory.make_PhysicalBlockDevice(node=node)
        raid_kwargs = dict(
            name='md0',
            level=FILESYSTEM_GROUP_TYPE.RAID_0,
            uuid=str(uuid4()),
            block_devices=[only_device],
            partitions=[],
            spare_devices=[],
            spare_partitions=[],
        )
        expected_error = re.escape(
            "{'__all__': ['RAID level 0 must have at least 2 raid "
            "devices and no spares.']}")
        with ExpectedException(ValidationError, expected_error):
            RAID.objects.create_raid(**raid_kwargs)
项目:maas    作者:maas    | 项目源码 | 文件源码
def test_create_raid_1_with_one_element_fails(self):
        """
        create_raid at RAID_1 with a single block device and no spares
        raises ValidationError.
        """
        node = factory.make_Node()
        only_device = factory.make_PhysicalBlockDevice(node=node)
        raid_kwargs = dict(
            name='md0',
            level=FILESYSTEM_GROUP_TYPE.RAID_1,
            uuid=str(uuid4()),
            block_devices=[only_device],
            partitions=[],
            spare_devices=[],
            spare_partitions=[],
        )
        expected_error = re.escape(
            "{'__all__': ['RAID level 1 must have at least 2 raid "
            "devices and any number of spares.']}")
        with ExpectedException(ValidationError, expected_error):
            RAID.objects.create_raid(**raid_kwargs)
项目:maas    作者:maas    | 项目源码 | 文件源码
def test_create_raid_5_with_2_elements_fails(self):
        """
        create_raid at RAID_5 with only two block devices and no spares
        raises ValidationError.
        """
        node = factory.make_Node()
        two_devices = [
            factory.make_PhysicalBlockDevice(node=node, size=10 * 1000 ** 4)
            for _ in range(2)
        ]
        raid_kwargs = dict(
            name='md0',
            level=FILESYSTEM_GROUP_TYPE.RAID_5,
            uuid=str(uuid4()),
            block_devices=two_devices,
            partitions=[],
            spare_devices=[],
            spare_partitions=[],
        )
        expected_error = re.escape(
            "{'__all__': ['RAID level 5 must have at least 3 raid "
            "devices and any number of spares.']}")
        with ExpectedException(ValidationError, expected_error):
            RAID.objects.create_raid(**raid_kwargs)
项目:maas    作者:maas    | 项目源码 | 文件源码
def test_create_raid_6_with_3_elements_fails(self):
        """
        create_raid at RAID_6 with only three block devices and no spares
        raises ValidationError.
        """
        node = factory.make_Node()
        three_devices = [
            factory.make_PhysicalBlockDevice(node=node)
            for _ in range(3)
        ]
        raid_kwargs = dict(
            name='md0',
            level=FILESYSTEM_GROUP_TYPE.RAID_6,
            uuid=str(uuid4()),
            block_devices=three_devices,
            partitions=[],
            spare_devices=[],
            spare_partitions=[],
        )
        expected_error = re.escape(
            "{'__all__': ['RAID level 6 must have at least 4 raid "
            "devices and any number of spares.']}")
        with ExpectedException(ValidationError, expected_error):
            RAID.objects.create_raid(**raid_kwargs)
项目:maas    作者:maas    | 项目源码 | 文件源码
def test_create_raid_with_block_device_from_other_node_fails(self):
        """
        create_raid at RAID_1 raises ValidationError when its block
        devices come from two different nodes (five from each).
        """
        first_node = factory.make_Node()
        second_node = factory.make_Node()
        first_node_devices = [
            factory.make_PhysicalBlockDevice(node=first_node)
            for _ in range(5)
        ]
        second_node_devices = [
            factory.make_PhysicalBlockDevice(node=second_node)
            for _ in range(5)
        ]
        raid_kwargs = dict(
            name='md0',
            level=FILESYSTEM_GROUP_TYPE.RAID_1,
            uuid=str(uuid4()),
            block_devices=first_node_devices + second_node_devices,
            partitions=[],
            spare_devices=[],
            spare_partitions=[],
        )
        expected_error = re.escape(
            "{'__all__': ['All added filesystems must belong to the "
            "same node.']}")
        with ExpectedException(ValidationError, expected_error):
            RAID.objects.create_raid(**raid_kwargs)
项目:maas    作者:maas    | 项目源码 | 文件源码
def test_add_device_from_another_node_to_array_fails(self):
        """
        Adding a block device from a different node to a ten-device
        RAID_5 array raises ValidationError and leaves the array's
        filesystem count and size unchanged.
        """
        node = factory.make_Node()
        foreign_node = factory.make_Node()
        device_size = 10 * 1000 ** 4
        member_devices = [
            factory.make_PhysicalBlockDevice(node=node, size=device_size)
            for _ in range(10)
        ]
        array = RAID.objects.create_raid(
            name='md0',
            level=FILESYSTEM_GROUP_TYPE.RAID_5,
            uuid=str(uuid4()),
            block_devices=member_devices)
        foreign_device = factory.make_PhysicalBlockDevice(
            node=foreign_node, size=device_size)

        expected_error = re.escape(
            "['Device needs to be from the same node as the rest of the "
            "array.']")
        with ExpectedException(ValidationError, expected_error):
            array.add_device(foreign_device, FILESYSTEM_TYPE.RAID)

        # Still 10 devices
        self.assertEqual(10, array.filesystems.count())
        self.assertEqual(9 * device_size, array.get_size())
项目:maas    作者:maas    | 项目源码 | 文件源码
def test_add_partition_from_another_node_to_array_fails(self):
        """
        Adding a partition that lives on another node's block device to a
        ten-device RAID_5 array raises ValidationError and leaves the
        array's filesystem count and size unchanged.
        """
        node = factory.make_Node()
        foreign_node = factory.make_Node()
        device_size = 10 * 1000 ** 4
        member_devices = [
            factory.make_PhysicalBlockDevice(node=node, size=device_size)
            for _ in range(10)
        ]
        array = RAID.objects.create_raid(
            name='md0',
            level=FILESYSTEM_GROUP_TYPE.RAID_5,
            uuid=str(uuid4()),
            block_devices=member_devices)

        foreign_device = factory.make_PhysicalBlockDevice(
            node=foreign_node, size=device_size)
        foreign_table = factory.make_PartitionTable(
            block_device=foreign_device)
        foreign_partition = foreign_table.add_partition()

        expected_error = re.escape(
            "['Partition must be on a device from the same node as "
            "the rest of the array.']")
        with ExpectedException(ValidationError, expected_error):
            array.add_partition(foreign_partition, FILESYSTEM_TYPE.RAID)

        # Nothing added
        self.assertEqual(10, array.filesystems.count())
        self.assertEqual(9 * device_size, array.get_size())
项目:maas    作者:maas    | 项目源码 | 文件源码
def test_add_already_used_device_to_array_fails(self):
        """
        Adding a block device that already carries an EXT4 filesystem to a
        ten-device RAID_5 array raises ValidationError and leaves the
        array's filesystem count and size unchanged.
        """
        node = factory.make_Node()
        device_size = 10 * 1000 ** 4
        member_devices = [
            factory.make_PhysicalBlockDevice(node=node, size=device_size)
            for _ in range(10)
        ]
        array = RAID.objects.create_raid(
            name='md0',
            level=FILESYSTEM_GROUP_TYPE.RAID_5,
            uuid=str(uuid4()),
            block_devices=member_devices)

        used_device = factory.make_PhysicalBlockDevice(
            node=node, size=device_size)
        Filesystem.objects.create(
            block_device=used_device, mount_point='/export/home',
            fstype=FILESYSTEM_TYPE.EXT4)

        expected_error = re.escape(
            "['There is another filesystem on this device.']")
        with ExpectedException(ValidationError, expected_error):
            array.add_device(used_device, FILESYSTEM_TYPE.RAID)

        # Nothing added.
        self.assertEqual(10, array.filesystems.count())
        self.assertEqual(9 * device_size, array.get_size())
项目:maas    作者:maas    | 项目源码 | 文件源码
def test_remove_device_from_array_fails(self):
        """
        Removing a freshly made block device that was never added to a
        ten-device RAID_5 array raises ValidationError and leaves the
        array's filesystem count and size unchanged.
        """
        node = factory.make_Node()
        device_size = 10 * 1000 ** 4
        member_devices = [
            factory.make_PhysicalBlockDevice(node=node, size=device_size)
            for _ in range(10)
        ]
        array = RAID.objects.create_raid(
            name='md0',
            level=FILESYSTEM_GROUP_TYPE.RAID_5,
            uuid=str(uuid4()),
            block_devices=member_devices)

        expected_error = re.escape(
            "['Device does not belong to this array.']")
        with ExpectedException(ValidationError, expected_error):
            # The outsider device is created inside the context manager,
            # as part of the call under test.
            array.remove_device(
                factory.make_PhysicalBlockDevice(node=node, size=device_size))

        self.assertEqual(10, array.filesystems.count())
        self.assertEqual(9 * device_size, array.get_size())
项目:maas    作者:maas    | 项目源码 | 文件源码
def test_allows_multiple_records_unless_cname(self):
        """
        A second DNSData record on the same resource is accepted for any
        record type except CNAME, where it raises ValidationError.
        """
        first_record = factory.make_DNSData(no_ip_addresses=True)
        resource = first_record.dnsresource
        is_cname = first_record.rrtype == 'CNAME'

        if not is_cname:
            # Same type on the same resource: both records must exist.
            factory.make_DNSData(
                dnsresource=resource,
                rrtype=first_record.rrtype)
            self.assertEqual(2, DNSData.objects.filter(
                dnsresource=resource).count())
            return

        expected_error = re.escape(
            "{'__all__': ['%s']}" % MULTI_CNAME_MSG)
        with ExpectedException(ValidationError, expected_error):
            factory.make_DNSData(
                dnsresource=resource,
                rrtype='CNAME')
项目:maas    作者:maas    | 项目源码 | 文件源码
def test__switch_static_to_already_used_ip_address(self):
        """
        Updating an interface's sticky IP to an address already assigned
        to another interface raises StaticIPAddressUnavailable.
        """
        nic = factory.make_Interface(INTERFACE_TYPE.PHYSICAL)
        subnet = factory.make_Subnet(vlan=nic.vlan)
        own_ip = factory.make_StaticIPAddress(
            alloc_type=IPADDRESS_TYPE.STICKY,
            ip=factory.pick_ip_in_Subnet(subnet),
            subnet=subnet, interface=nic)

        # Give a second interface a different address in the same subnet.
        other_nic = factory.make_Interface(INTERFACE_TYPE.PHYSICAL)
        taken_address = factory.pick_ip_in_Subnet(
            subnet, but_not=[own_ip.ip])
        factory.make_StaticIPAddress(
            alloc_type=IPADDRESS_TYPE.STICKY,
            ip=taken_address,
            subnet=subnet, interface=other_nic)

        with ExpectedException(StaticIPAddressUnavailable):
            nic.update_ip_address(
                own_ip, INTERFACE_LINK_TYPE.STATIC, subnet,
                ip_address=taken_address)
项目:maas    作者:maas    | 项目源码 | 文件源码
def test__claim_fails_if_subnet_missing(self):
        """
        claim_auto_ips raises StaticIPAddressUnavailable when the
        interface's AUTO address has no subnet, and the patched maaslog
        records the error once.
        """
        # Set-up and the call under test run in separate transactions.
        with transaction.atomic():
            nic = factory.make_Interface(INTERFACE_TYPE.PHYSICAL)
            subnet = factory.make_Subnet(vlan=nic.vlan)
            auto_ip = factory.make_StaticIPAddress(
                alloc_type=IPADDRESS_TYPE.AUTO, ip="",
                subnet=subnet, interface=nic)
            auto_ip.subnet = None
            auto_ip.save()
            maaslog = self.patch_autospec(interface_module, "maaslog")

        with transaction.atomic(), ExpectedException(
                StaticIPAddressUnavailable):
            nic.claim_auto_ips()

        expected_log = (
            "Could not find subnet for interface %s." %
            nic.get_log_string())
        self.expectThat(maaslog.error, MockCalledOnceWith(expected_log))
项目:maas    作者:maas    | 项目源码 | 文件源码
def test__lock_released_on_error(self):
        """
        When the patched refresh raises, region.refresh() propagates that
        exception type and the 'refresh' NamedLock is not left locked.

        NOTE(review): this is a generator test; it presumably runs under
        a Deferred-aware decorator that is not visible here -- confirm.
        """
        error = factory.make_exception()
        self.patch(node_module, 'refresh').side_effect = error
        region = yield deferToDatabase(factory.make_RegionController)
        self.patch(node_module, 'get_maas_id').return_value = region.system_id
        fake_sys_info = {
            'hostname': region.hostname,
            'architecture': region.architecture,
            'osystem': '',
            'distro_series': '',
            'interfaces': {},
        }
        self.patch(node_module, 'get_sys_info').return_value = fake_sys_info

        with ExpectedException(type(error)):
            yield region.refresh()

        self.assertFalse(NamedLock('refresh').is_locked())
项目:maas    作者:maas    | 项目源码 | 文件源码
def test__no_save_duplicate_ipranges(self):
        """
        Saving a second dynamic range with the same start and end as an
        existing one raises ValidationError (self.dynamic_overlaps).
        """
        subnet = make_plain_subnet()
        bounds = dict(start_ip="192.168.0.100", end_ip="192.168.0.150")

        existing = IPRange(
            subnet=subnet, type=IPRANGE_TYPE.DYNAMIC, **bounds)
        existing.save()

        # Make the same range again, should fail to save.
        duplicate = IPRange(
            subnet=subnet, type=IPRANGE_TYPE.DYNAMIC, **bounds)
        with ExpectedException(ValidationError, self.dynamic_overlaps):
            duplicate.save()
项目:maas    作者:maas    | 项目源码 | 文件源码
def test__no_save_range_overlap_begin(self):
        """
        A range ending on an existing dynamic range's start address cannot
        be saved, neither as a dynamic nor as a reserved range.
        """
        subnet = make_plain_subnet()
        existing = IPRange(
            subnet=subnet,
            type=IPRANGE_TYPE.DYNAMIC,
            start_ip="192.168.0.100",
            end_ip="192.168.0.150",
        )
        existing.save()

        # Make an overlapping range across start_ip, should fail to save.
        overlapping = IPRange(
            subnet=subnet,
            type=IPRANGE_TYPE.DYNAMIC,
            start_ip="192.168.0.90",
            end_ip="192.168.0.100",
        )
        with ExpectedException(ValidationError, self.dynamic_overlaps):
            overlapping.save()

        # Try as reserved range.
        overlapping.type = IPRANGE_TYPE.RESERVED
        with ExpectedException(ValidationError, self.reserved_overlaps):
            overlapping.save()
项目:maas    作者:maas    | 项目源码 | 文件源码
def test__no_save_range_overlap_end(self):
        """
        A dynamic range that starts inside an existing dynamic range and
        ends beyond it raises ValidationError on save.
        """
        subnet = make_plain_subnet()
        existing = IPRange(
            subnet=subnet,
            type=IPRANGE_TYPE.DYNAMIC,
            start_ip="192.168.0.100",
            end_ip="192.168.0.150",
        )
        existing.save()

        # Make an overlapping range across end_ip, should fail to save.
        overlapping = IPRange(
            subnet=subnet,
            type=IPRANGE_TYPE.DYNAMIC,
            start_ip="192.168.0.140",
            end_ip="192.168.0.160",
        )
        with ExpectedException(ValidationError, self.dynamic_overlaps):
            overlapping.save()
项目:maas    作者:maas    | 项目源码 | 文件源码
def test__no_save_range_within_ranges(self):
        """
        A dynamic range lying entirely inside an existing dynamic range
        raises ValidationError on save.
        """
        subnet = make_plain_subnet()
        outer = IPRange(
            subnet=subnet,
            type=IPRANGE_TYPE.DYNAMIC,
            start_ip="192.168.0.100",
            end_ip="192.168.0.150",
        )
        outer.save()

        # Make a contained range, should not save.
        inner = IPRange(
            subnet=subnet,
            type=IPRANGE_TYPE.DYNAMIC,
            start_ip="192.168.0.110",
            end_ip="192.168.0.140",
        )
        with ExpectedException(ValidationError, self.dynamic_overlaps):
            inner.save()
项目:maas    作者:maas    | 项目源码 | 文件源码
def test__no_save_range_within_existing_range(self):
        """
        A dynamic range contained in an already saved dynamic range is
        rejected with ValidationError when saved.
        """
        subnet = make_plain_subnet()
        common = dict(subnet=subnet, type=IPRANGE_TYPE.DYNAMIC)

        IPRange(
            start_ip="192.168.0.100", end_ip="192.168.0.150", **common
        ).save()

        # Make a contained range, should not save.
        contained = IPRange(
            start_ip="192.168.0.110", end_ip="192.168.0.140", **common)
        with ExpectedException(ValidationError, self.dynamic_overlaps):
            contained.save()
项目:maas    作者:maas    | 项目源码 | 文件源码
def test__no_save_range_within_existing_reserved_range(self):
        """
        A dynamic range contained in an already saved reserved range is
        rejected with ValidationError (self.dynamic_overlaps) when saved.
        """
        subnet = make_plain_subnet()
        reserved = IPRange(
            subnet=subnet,
            type=IPRANGE_TYPE.RESERVED,
            start_ip="192.168.0.100",
            end_ip="192.168.0.150",
        )
        reserved.save()

        # Make a contained range, should not save.
        contained = IPRange(
            subnet=subnet,
            type=IPRANGE_TYPE.DYNAMIC,
            start_ip="192.168.0.110",
            end_ip="192.168.0.140",
        )
        with ExpectedException(ValidationError, self.dynamic_overlaps):
            contained.save()
项目:maas    作者:maas    | 项目源码 | 文件源码
def test__reserved_range_cannot_overlap_reserved_ranges(self):
        """
        A reserved range starting on the last address of an existing
        reserved range raises ValidationError (self.reserved_overlaps).
        """
        subnet = factory.make_Subnet(
            cidr='192.168.0.0/24',
            gateway_ip='192.168.0.1',
            dns_servers=['192.168.0.50', '192.168.0.200'])

        existing = IPRange(
            subnet=subnet,
            type=IPRANGE_TYPE.RESERVED,
            start_ip="192.168.0.1",
            end_ip="192.168.0.250",
        )
        existing.save()

        # Shares 192.168.0.250 with the range saved above.
        overlapping = IPRange(
            subnet=subnet,
            type=IPRANGE_TYPE.RESERVED,
            start_ip="192.168.0.250",
            end_ip="192.168.0.254",
        )
        with ExpectedException(ValidationError, self.reserved_overlaps):
            overlapping.save()
项目:maas    作者:maas    | 项目源码 | 文件源码
def test__dynamic_range_cannot_overlap_auto_address(self):
        """
        With an AUTO static IP allocated in the subnet (any address but
        192.168.0.1), saving a dynamic range over .2-.254 raises
        ValidationError.
        """
        subnet = make_plain_subnet()
        allocated_ip = factory.pick_ip_in_network(
            IPNetwork(subnet.cidr),
            but_not=['192.168.0.1'])
        factory.make_StaticIPAddress(
            subnet=subnet,
            alloc_type=IPADDRESS_TYPE.AUTO,
            ip=allocated_ip)

        wide_range = IPRange(
            subnet=subnet,
            type=IPRANGE_TYPE.DYNAMIC,
            start_ip="192.168.0.2",
            end_ip="192.168.0.254",
        )
        with ExpectedException(ValidationError, self.dynamic_overlaps):
            wide_range.save()
项目:maas    作者:maas    | 项目源码 | 文件源码
def test__dynamic_range_cannot_overlap_sticky_address(self):
        """
        With a STICKY static IP allocated in the subnet (any address but
        192.168.0.1), saving a dynamic range over .2-.254 raises
        ValidationError.
        """
        subnet = make_plain_subnet()
        allocated_ip = factory.pick_ip_in_network(
            IPNetwork(subnet.cidr),
            but_not=['192.168.0.1'])
        factory.make_StaticIPAddress(
            subnet=subnet,
            alloc_type=IPADDRESS_TYPE.STICKY,
            ip=allocated_ip)

        wide_range = IPRange(
            subnet=subnet,
            type=IPRANGE_TYPE.DYNAMIC,
            start_ip="192.168.0.2",
            end_ip="192.168.0.254",
        )
        with ExpectedException(ValidationError, self.dynamic_overlaps):
            wide_range.save()
项目:maas    作者:maas    | 项目源码 | 文件源码
def test__dynamic_range_cannot_overlap_user_reserved_address(self):
        """
        With a USER_RESERVED static IP allocated in the subnet (any
        address but 192.168.0.1), saving a dynamic range over .2-.254
        raises ValidationError.
        """
        subnet = make_plain_subnet()
        allocated_ip = factory.pick_ip_in_network(
            IPNetwork(subnet.cidr),
            but_not=['192.168.0.1'])
        factory.make_StaticIPAddress(
            subnet=subnet,
            alloc_type=IPADDRESS_TYPE.USER_RESERVED,
            ip=allocated_ip)

        wide_range = IPRange(
            subnet=subnet,
            type=IPRANGE_TYPE.DYNAMIC,
            start_ip="192.168.0.2",
            end_ip="192.168.0.254",
        )
        with ExpectedException(ValidationError, self.dynamic_overlaps):
            wide_range.save()

    # Regression for lp:1580772.
项目:maas    作者:maas    | 项目源码 | 文件源码
def test__skips_notify_on_controllerinfo_interface_update(self):
        """No notification arrives after interface update info is set.

        Generator-style test: every ``yield`` hands a Deferred back to the
        runner (presumably an ``inlineCallbacks`` wrapper; the decorators
        are not visible here). After the websocket triggers are registered
        and a controller is created, a handler is registered on
        ``self.listener``. Waiting on that handler's value is expected to
        end in CancelledError.
        """
        yield deferToDatabase(register_websocket_triggers)
        listener = self.make_listener_without_delay()
        notified = DeferredValue()

        def on_notify(*args):
            # Capture whatever the listener delivers, if anything.
            return notified.set(args)

        node_params = self.params.copy()
        controller = yield deferToDatabase(self.create_node, node_params)
        yield deferToDatabase(self.set_version, controller, '')
        listener.register(self.listener, on_notify)
        yield listener.startService()
        try:
            # NOTE(review): '{]' is not well-formed JSON; confirm whether
            # this is deliberate or a typo for '{}'.
            yield deferToDatabase(
                self.set_interface_update_info, controller, '{]', '{}')
            with ExpectedException(CancelledError):
                yield notified.get(timeout=0.2)
        finally:
            # Always shut the listener down, even if the assertion failed.
            yield listener.stopService()
项目:maas    作者:maas    | 项目源码 | 文件源码
def test_doesnt_sends_message_for_update_on_node_alloc_type_no_ip(self):
        """Switching an address to STICKY publishes nothing on "sys_dns".

        Generator-style test (each ``yield`` waits on a Deferred). With the
        system triggers registered, the static IP address looked up for a
        freshly created node has its ``alloc_type`` updated. Waiting on the
        "sys_dns" handler's value is expected to end in CancelledError.
        """
        yield deferToDatabase(register_system_triggers)
        node = yield deferToDatabase(self.create_node_with_interface)
        sip = yield deferToDatabase(self.get_node_ip_address, node)
        yield self.capturePublication()
        published = DeferredValue()

        def on_sys_dns(*args):
            return published.set(args)

        listener = self.make_listener_without_delay()
        listener.register("sys_dns", on_sys_dns)
        yield listener.startService()
        try:
            changes = {"alloc_type": IPADDRESS_TYPE.STICKY}
            yield deferToDatabase(
                self.update_staticipaddress, sip.id, changes)
            with ExpectedException(CancelledError):
                yield published.get(timeout=1)
        finally:
            # Stop the listener regardless of the outcome above.
            yield listener.stopService()
项目:maas    作者:maas    | 项目源码 | 文件源码
def test_doesnt_send_message_for_nic_link_non_authorative_domain(self):
        """Linking an IP in a non-authoritative domain publishes nothing.

        Generator-style test (each ``yield`` waits on a Deferred). A domain
        created with ``authoritative=False`` gets a node and an interface.
        A static IP address is then created for that interface on a new
        subnet. Waiting on the "sys_dns" handler's value is expected to end
        in CancelledError.
        """
        yield deferToDatabase(register_system_triggers)
        domain_params = {'authoritative': False}
        domain = yield deferToDatabase(self.create_domain, domain_params)
        node = yield deferToDatabase(self.create_node, {'domain': domain})
        interface = yield deferToDatabase(
            self.create_interface, {'node': node})
        subnet = yield deferToDatabase(self.create_subnet)
        yield self.capturePublication()
        published = DeferredValue()

        def on_sys_dns(*args):
            return published.set(args)

        listener = self.make_listener_without_delay()
        listener.register("sys_dns", on_sys_dns)
        yield listener.startService()
        try:
            link_params = {
                "interface": interface,
                "subnet": subnet,
            }
            yield deferToDatabase(self.create_staticipaddress, link_params)
            with ExpectedException(CancelledError):
                yield published.get(timeout=1)
        finally:
            # Stop the listener regardless of the outcome above.
            yield listener.stopService()
项目:maas    作者:maas    | 项目源码 | 文件源码
def test_doesnt_send_message_for_nic_unlink_non_authorative_domain(self):
        """Unlinking an IP in a non-authoritative domain publishes nothing.

        Generator-style test (each ``yield`` waits on a Deferred). A domain
        created with ``authoritative=False`` gets a node, an interface and
        a static IP address on a new subnet. That address is then deleted.
        Waiting on the "sys_dns" handler's value is expected to end in
        CancelledError.
        """
        yield deferToDatabase(register_system_triggers)
        domain_params = {'authoritative': False}
        domain = yield deferToDatabase(self.create_domain, domain_params)
        node = yield deferToDatabase(self.create_node, {'domain': domain})
        interface = yield deferToDatabase(
            self.create_interface, {'node': node})
        subnet = yield deferToDatabase(self.create_subnet)
        link_params = {
            "interface": interface,
            "subnet": subnet,
        }
        sip = yield deferToDatabase(self.create_staticipaddress, link_params)
        yield self.capturePublication()
        published = DeferredValue()

        def on_sys_dns(*args):
            return published.set(args)

        listener = self.make_listener_without_delay()
        listener.register("sys_dns", on_sys_dns)
        yield listener.startService()
        try:
            yield deferToDatabase(self.delete_staticipaddress, sip.id)
            with ExpectedException(CancelledError):
                yield published.get(timeout=1)
        finally:
            # Stop the listener regardless of the outcome above.
            yield listener.stopService()
项目:maas    作者:maas    | 项目源码 | 文件源码
def test_doesnt_send_message_for_subnet_delete_disabled_rdns(self):
        """Deleting a subnet with rDNS disabled publishes nothing.

        Generator-style test (each ``yield`` waits on a Deferred). A subnet
        is created with ``rdns_mode`` set to ``RDNS_MODE.DISABLED`` and then
        deleted. Waiting on the "sys_dns" handler's value is expected to end
        in CancelledError.
        """
        yield deferToDatabase(register_system_triggers)
        subnet_params = {'rdns_mode': RDNS_MODE.DISABLED}
        subnet = yield deferToDatabase(self.create_subnet, subnet_params)
        yield self.capturePublication()
        published = DeferredValue()

        def on_sys_dns(*args):
            return published.set(args)

        listener = self.make_listener_without_delay()
        listener.register("sys_dns", on_sys_dns)
        yield listener.startService()
        try:
            yield deferToDatabase(self.delete_subnet, subnet.id)
            with ExpectedException(CancelledError):
                yield published.get(timeout=1)
        finally:
            # Stop the listener regardless of the outcome above.
            yield listener.stopService()
项目:maas    作者:maas    | 项目源码 | 文件源码
def test__tryConnection_logs_error(self):
        """A failing ``startConnection`` is re-raised and logged once.

        ``startConnection`` is patched to raise a factory-made exception.
        ``tryConnection`` is expected to propagate that exception type, and
        exactly one log event is expected, at error level, carrying the
        exception message.
        """
        listener = PostgresListenerService()

        exception_type = factory.make_exception_type()
        exception_message = factory.make_name("message")
        failure = exception_type(exception_message)
        self.patch(listener, "startConnection").side_effect = failure

        # Both managers in one statement; exit order matches the nesting
        # (ExpectedException first, then the logger fixture).
        with TwistedLoggerFixture() as logger, \
                ExpectedException(exception_type):
            yield listener.tryConnection()

        events = logger.events
        self.assertThat(events, HasLength(1))
        expected_event = ContainsDict({
            "log_format": Equals("Unable to connect to database: {error}"),
            "log_level": Equals(LogLevel.error),
            "error": Equals(exception_message),
        })
        self.assertThat(events[0], expected_event)
项目:maas    作者:maas    | 项目源码 | 文件源码
def test__tryConnection_will_not_retry_if_autoReconnect_not_set(self):
        """With ``autoReconnect`` off, a failed connection is not retried.

        ``startConnection`` is patched to raise a factory-made exception and
        the module-level ``deferLater`` is replaced with a mock.
        ``tryConnection`` is expected to propagate the exception without
        ever calling ``deferLater``.
        """
        listener = PostgresListenerService()
        listener.autoReconnect = False

        exception_type = factory.make_exception_type()
        exception_message = factory.make_name("message")
        failure = exception_type(exception_message)

        self.patch(listener, "startConnection").side_effect = failure
        retry_scheduler = self.patch(listener_module, "deferLater")
        retry_scheduler.return_value = sentinel.retry

        with ExpectedException(exception_type):
            yield listener.tryConnection()

        self.assertThat(retry_scheduler, MockNotCalled())
项目:maas    作者:maas    | 项目源码 | 文件源码
def test__stopping_cancels_start(self):
        """Stopping the service right after starting cancels the start.

        The Deferred from ``startService`` is observed through a
        DeferredValue, and ``stopService`` is called before the start has
        completed. Once the stop finishes, reading the observed start
        result is expected to raise CancelledError.
        """
        listener = PostgresListenerService()

        # Kick off start and stop back to back; the start is never awaited.
        starting = listener.startService()
        start_outcome = DeferredValue()
        start_outcome.observe(starting)
        stopping = listener.stopService()
        pending = (starting, stopping)

        # At this point each Deferred still has callbacks waiting to fire.
        for deferred in pending:
            self.assertThat(deferred.callbacks, Not(Equals([])))

        # Let the shutdown run to completion.
        yield stopping

        # Both callback chains are now drained, because `stopping` chained
        # itself onto the end of `starting`.
        for deferred in pending:
            self.assertThat(deferred.callbacks, Equals([]))

        # The observed start result must be a cancellation.
        with ExpectedException(CancelledError):
            yield start_outcome.get()
项目:maas    作者:maas    | 项目源码 | 文件源码
def test__non_superuser_reloads_user(self):
        """A user demoted after the handler was built is denied.

        The handler is created while the user is still an admin; the user
        then has ``is_superuser`` cleared and is saved. ``configure_dhcp``
        is expected to raise AssertionError matching "Permission denied.".
        """
        user = factory.make_admin()
        handler = VLANHandler(user, {})
        # Demote the user only after the handler already holds a reference.
        user.is_superuser = False
        user.save()

        rack = factory.make_RackController()
        vlan = factory.make_VLAN()
        factory.make_Interface(INTERFACE_TYPE.PHYSICAL, node=rack, vlan=vlan)
        vlan.dhcp_on = True
        vlan.primary_rack = rack
        vlan.save()
        factory.make_ipv4_Subnet_with_IPRanges(vlan=vlan)

        with ExpectedException(AssertionError, "Permission denied."):
            request = {
                "id": vlan.id,
                "controllers": []
            }
            handler.configure_dhcp(request)
项目:maas    作者:maas    | 项目源码 | 文件源码
def test__configure_dhcp_gateway_outside_subnet_raises(self):
        """A gateway outside the subnet's CIDR makes configure_dhcp fail.

        The subnet is 10.0.0.0/24 with no gateway and no dynamic ranges.
        The request asks for gateway 1.0.0.1 with a range of 10.0.0.2 to
        10.0.0.99, and ``configure_dhcp`` is expected to raise ValueError.
        """
        user = factory.make_admin()
        handler = VLANHandler(user, {})
        vlan = factory.make_VLAN()
        rack = factory.make_RackController()
        factory.make_Interface(INTERFACE_TYPE.PHYSICAL, node=rack, vlan=vlan)
        subnet = factory.make_Subnet(
            vlan=vlan, cidr="10.0.0.0/24", gateway_ip="")
        # Precondition: the subnet starts with no dynamic ranges.
        dynamic_range_count = subnet.get_dynamic_ranges().count()
        self.assertThat(dynamic_range_count, Equals(0))

        with ExpectedException(ValueError):
            extra = {
                "subnet": subnet.id,
                "gateway": "1.0.0.1",
                "start": "10.0.0.2",
                "end": "10.0.0.99"
            }
            request = {
                "id": vlan.id,
                "controllers": [rack.system_id],
                "extra": extra,
            }
            handler.configure_dhcp(request)
项目:maas    作者:maas    | 项目源码 | 文件源码
def test__configure_dhcp_gateway_inside_range_raises(self):
        user = factory.make_admin()
        handler = VLANHandler(user, {})
        vlan = factory.make_VLAN()
        rack = factory.make_RackController()
        factory.make_Interface(INTERFACE_TYPE.PHYSICAL, node=rack, vlan=vlan)
        subnet = factory.make_Subnet(
            vlan=vlan, cidr="10.0.0.0/24", gateway_ip="")
        self.assertThat(subnet.get_dynamic_ranges().count(), Equals(0))
        with ExpectedException(ValueError):
            handler.configure_dhcp({
                "id": vlan.id,
                "controllers": [rack.system_id],
                "extra": {
                    "subnet": subnet.id,
                    "gateway": "10.0.0.1",
                    "start": "10.0.0.1",
                    "end": "10.0.0.99"
                }
            })
        vlan = reload_object(vlan)