Python netaddr 模块,cidr_merge() 实例源码

我们从Python开源项目中,提取了以下16个代码示例,用于说明如何使用netaddr.cidr_merge()

项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_ipnetwork_cidr_merge():
    """cidr_merge() collapses a mix of address objects, networks and strings."""
    # Three differently-typed inputs: expanded IPv6 hosts, IPv4 networks,
    # and the hosts of an IPv4 /24 rendered as plain strings.
    v6_hosts = list(IPNetwork('fe80::/120'))
    v4_networks = [
        IPNetwork('192.0.2.0/24'),
        IPNetwork('192.0.4.0/25'),
        IPNetwork('192.0.4.128/25'),
    ]
    v4_host_strings = [str(host) for host in IPNetwork('192.0.3.0/24')]

    ip_list = v6_hosts + v4_networks + v4_host_strings
    assert len(ip_list) == 515

    expected = [
        IPNetwork('192.0.2.0/23'),
        IPNetwork('192.0.4.0/24'),
        IPNetwork('fe80::/120'),
    ]
    assert cidr_merge(ip_list) == expected
项目:event-driven-security    作者:acantril    | 项目源码 | 文件源码
def test_ipnetwork_cidr_merge():
    """cidr_merge() collapses a mix of address objects, networks and strings."""
    # Three differently-typed inputs: expanded IPv6 hosts, IPv4 networks,
    # and the hosts of an IPv4 /24 rendered as plain strings.
    v6_hosts = list(IPNetwork('fe80::/120'))
    v4_networks = [
        IPNetwork('192.0.2.0/24'),
        IPNetwork('192.0.4.0/25'),
        IPNetwork('192.0.4.128/25'),
    ]
    v4_host_strings = [str(host) for host in IPNetwork('192.0.3.0/24')]

    ip_list = v6_hosts + v4_networks + v4_host_strings
    assert len(ip_list) == 515

    expected = [
        IPNetwork('192.0.2.0/23'),
        IPNetwork('192.0.4.0/24'),
        IPNetwork('fe80::/120'),
    ]
    assert cidr_merge(ip_list) == expected
项目:scripts    作者:vulnersCom    | 项目源码 | 文件源码
def getRadarAs(asNumber):
    """Print the merged IPv4 prefixes listed for an AS by the Qrator Radar API.

    Pages of the prefixes endpoint are fetched until the number of distinct
    prefixes collected reaches the ``total`` reported by the first response,
    or until a page contributes nothing new. The collected prefixes are then
    collapsed with ``netaddr.cidr_merge`` and printed one per line.

    :param asNumber: AS number interpolated into the API URL.
    """
    urlTemplate = "https://radar.qrator.net/api/prefixes/%s?tab_id=current&page=%s"
    # Only dotted-quad IPv4 CIDR strings are picked up; compiled once for all pages.
    prefixPattern = re.compile(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}/\d+?$")

    def fetchPage(pageNumber):
        # One API call; the response is expected to be a JSON object.
        return requests.get(urlTemplate % (asNumber, pageNumber)).json()

    def extractPrefixes(response):
        # A missing 'page' field is treated as an empty HTML fragment.
        soup = bs4.BeautifulSoup(response.get('page') or "", "html.parser")
        return set("%s" % node for node in soup.find_all(text=prefixPattern))

    radarResponse = fetchPage(1)
    # A missing 'total' is treated as zero instead of crashing in int(None).
    totalPrefixes = int(radarResponse.get('total') or 0)
    networkRawSet = extractPrefixes(radarResponse)

    # Page 1 is already consumed, so continue from page 2.
    nextPage = 2
    while len(networkRawSet) < totalPrefixes:
        pagePrefixes = extractPrefixes(fetchPage(nextPage))
        # Stop when a page adds nothing new: 'total' may count entries the
        # IPv4-only pattern never matches (NOTE(review): presumably IPv6
        # prefixes - confirm), which would otherwise loop forever.
        if pagePrefixes <= networkRawSet:
            break
        networkRawSet |= pagePrefixes
        nextPage += 1

    # Collapse to the minimal CIDR list. The networks are passed directly
    # rather than through an IPSet, whose iteration yields individual addresses.
    mergedNetworks = netaddr.cidr_merge([netaddr.IPNetwork(item) for item in networkRawSet])
    if not mergedNetworks:
        print("Nothing found. Wrong AS number?")
    else:
        print("\n".join(["%s" % network for network in mergedNetworks]))
项目:aCloudGuru-Event-Driven-Security    作者:mikegchambers    | 项目源码 | 文件源码
def test_ipnetwork_cidr_merge():
    """cidr_merge() collapses a mix of address objects, networks and strings."""
    # Three differently-typed inputs: expanded IPv6 hosts, IPv4 networks,
    # and the hosts of an IPv4 /24 rendered as plain strings.
    v6_hosts = list(IPNetwork('fe80::/120'))
    v4_networks = [
        IPNetwork('192.0.2.0/24'),
        IPNetwork('192.0.4.0/25'),
        IPNetwork('192.0.4.128/25'),
    ]
    v4_host_strings = [str(host) for host in IPNetwork('192.0.3.0/24')]

    ip_list = v6_hosts + v4_networks + v4_host_strings
    assert len(ip_list) == 515

    expected = [
        IPNetwork('192.0.2.0/23'),
        IPNetwork('192.0.4.0/24'),
        IPNetwork('fe80::/120'),
    ]
    assert cidr_merge(ip_list) == expected
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_whole_network_cidr_merge_v6():
    """Anything merged with a whole-address-space network yields just that network."""
    # (input addresses, expected merged CIDRs), checked in order.
    cases = (
        (['::/0', 'fe80::1'], ['::/0']),
        (['::/0', '::'], ['::/0']),
        (['::/0', '::192.0.2.0/124', 'ff00::101'], ['::/0']),
        (['0.0.0.0/0', '0.0.0.0', '::/0', '::'], ['0.0.0.0/0', '::/0']),
    )
    for addresses, merged in cases:
        assert cidr_merge(addresses) == [IPNetwork(cidr) for cidr in merged]
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_ip_range():
    """iter_iprange() yields every address in the range; cidr_merge() packs them into CIDRs."""
    first, last = '192.0.2.1', '192.0.2.14'
    ip_list = list(iter_iprange(first, last))

    assert len(ip_list) == 14

    # Hosts .1 through .14, in ascending order.
    expected_addresses = [IPAddress('192.0.2.%d' % host) for host in range(1, 15)]
    assert ip_list == expected_addresses

    # The range is not aligned to one block, so it splits into six CIDRs.
    expected_cidrs = [
        IPNetwork(cidr)
        for cidr in (
            '192.0.2.1/32',
            '192.0.2.2/31',
            '192.0.2.4/30',
            '192.0.2.8/30',
            '192.0.2.12/31',
            '192.0.2.14/32',
        )
    ]
    assert cidr_merge(ip_list) == expected_cidrs
项目:netcalc    作者:israel-lugo    | 项目源码 | 文件源码
def func(self, args):
        """Aggregate the given networks as far as possible and print the result.

        The networks are obtained from self._get_networks(), which subclasses
        may override, and each merged network is printed on its own line.

        """
        for network in netaddr.cidr_merge(self._get_networks(args)):
            print(network)
项目:netcalc    作者:israel-lugo    | 项目源码 | 文件源码
def func(self, args):
        """Evaluate an expression of adding and subtracting networks.

        ``args.expression`` is read as ``NET (OPERATOR NET)*``. The running
        result starts as the first network; each following operator/network
        pair either merges the network in ("+", "add", "merge") or removes
        it ("-", "sub", "remove"). The resulting networks are printed one
        per line. A trailing token without a partner is reported through
        ``self.warn`` and ignored.

        The expression list is not modified.

        Raises CommandParseError on an unknown operator, and IndexError if
        the expression is empty.
        """
        # Work on a copy so the caller's argument list is left untouched.
        tokens = list(args.expression)
        accum = [_network_address(tokens[0])]

        # Tokens after the first come in (operator, network) pairs.
        pair_count = (len(tokens) - 1) // 2
        for pair_index in range(pair_count):
            operator = tokens[1 + 2 * pair_index]
            # right-hand side of the expression (parsed before the operator
            # is validated, so a bad network is reported first)
            rhs = _network_address(tokens[2 + 2 * pair_index])

            if operator in ("+", "add", "merge"):
                # add (merge) in a new network
                accum = netaddr.cidr_merge(accum + [rhs])
            elif operator in ("-", "sub", "remove"):
                # subtract (remove) a network

                # for each network in accum, remove the RHS from it, then
                # chain everything into a single flat generator sequence
                # (RHS may be partially contained in more than one accum
                # network)
                minus_rhs = itertools.chain.from_iterable(
                                netaddr.cidr_exclude(network, rhs)
                                for network in accum
                            )
                accum = netaddr.cidr_merge(minus_rhs)
            else:
                raise CommandParseError("invalid operator '%s'" % operator)

        # Whatever is left after the last complete pair was not evaluated.
        leftover = tokens[1 + 2 * pair_count:]
        if leftover:
            self.warn("ignoring extra argument '%s'" % ' '.join(leftover))

        for i in accum:
            print(i)
项目:Cisco-ASA-ACL-toolkit    作者:AlekzNet    | 项目源码 | 文件源码
def unfold(objarr):
    """Expand every entry of objarr via unfold_rec, then optionally aggregate it.

    Entries are CIDR-merged only when objarr is the netgrp table and
    aggregation has not been disabled through args.noaggr.
    """
    for name in objarr:
        unfold_rec(objarr[name], objarr)
        if args.noaggr or objarr is not netgrp:
            continue
        objarr[name] = netaddr.cidr_merge(objarr[name])


# Unfold all included objects
项目:event-driven-security    作者:acantril    | 项目源码 | 文件源码
def test_whole_network_cidr_merge_v6():
    """Anything merged with a whole-address-space network yields just that network."""
    # (input addresses, expected merged CIDRs), checked in order.
    cases = (
        (['::/0', 'fe80::1'], ['::/0']),
        (['::/0', '::'], ['::/0']),
        (['::/0', '::192.0.2.0/124', 'ff00::101'], ['::/0']),
        (['0.0.0.0/0', '0.0.0.0', '::/0', '::'], ['0.0.0.0/0', '::/0']),
    )
    for addresses, merged in cases:
        assert cidr_merge(addresses) == [IPNetwork(cidr) for cidr in merged]
项目:event-driven-security    作者:acantril    | 项目源码 | 文件源码
def test_ip_range():
    """iter_iprange() yields every address in the range; cidr_merge() packs them into CIDRs."""
    first, last = '192.0.2.1', '192.0.2.14'
    ip_list = list(iter_iprange(first, last))

    assert len(ip_list) == 14

    # Hosts .1 through .14, in ascending order.
    expected_addresses = [IPAddress('192.0.2.%d' % host) for host in range(1, 15)]
    assert ip_list == expected_addresses

    # The range is not aligned to one block, so it splits into six CIDRs.
    expected_cidrs = [
        IPNetwork(cidr)
        for cidr in (
            '192.0.2.1/32',
            '192.0.2.2/31',
            '192.0.2.4/30',
            '192.0.2.8/30',
            '192.0.2.12/31',
            '192.0.2.14/32',
        )
    ]
    assert cidr_merge(ip_list) == expected_cidrs
项目:aCloudGuru-Event-Driven-Security    作者:mikegchambers    | 项目源码 | 文件源码
def test_whole_network_cidr_merge_v6():
    """Anything merged with a whole-address-space network yields just that network."""
    # (input addresses, expected merged CIDRs), checked in order.
    cases = (
        (['::/0', 'fe80::1'], ['::/0']),
        (['::/0', '::'], ['::/0']),
        (['::/0', '::192.0.2.0/124', 'ff00::101'], ['::/0']),
        (['0.0.0.0/0', '0.0.0.0', '::/0', '::'], ['0.0.0.0/0', '::/0']),
    )
    for addresses, merged in cases:
        assert cidr_merge(addresses) == [IPNetwork(cidr) for cidr in merged]
项目:aCloudGuru-Event-Driven-Security    作者:mikegchambers    | 项目源码 | 文件源码
def test_ip_range():
    """iter_iprange() yields every address in the range; cidr_merge() packs them into CIDRs."""
    first, last = '192.0.2.1', '192.0.2.14'
    ip_list = list(iter_iprange(first, last))

    assert len(ip_list) == 14

    # Hosts .1 through .14, in ascending order.
    expected_addresses = [IPAddress('192.0.2.%d' % host) for host in range(1, 15)]
    assert ip_list == expected_addresses

    # The range is not aligned to one block, so it splits into six CIDRs.
    expected_cidrs = [
        IPNetwork(cidr)
        for cidr in (
            '192.0.2.1/32',
            '192.0.2.2/31',
            '192.0.2.4/30',
            '192.0.2.8/30',
            '192.0.2.12/31',
            '192.0.2.14/32',
        )
    ]
    assert cidr_merge(ip_list) == expected_cidrs
项目:ws-backend-community    作者:lavalamp-    | 项目源码 | 文件源码
def get_scanner_targets(self):
        """
        Get the scan targets of this scanner in CIDR-merged form.
        :return: A list of strings, one per network produced by merging everything
        returned by self.get_all_ips().
        """
        return list(map(str, cidr_merge(self.get_all_ips())))
项目:Cisco-ASA-ACL-toolkit    作者:AlekzNet    | 项目源码 | 文件源码
def group_nets(nets):
    """Group source/destination network pairs in two CIDR-merging passes.

    nets maps each source to a list of destinations:
    { src: [dst1, dst2, ...], ... }. The destination lists are merged in
    place on the passed-in dict.

    First pass: merge each source's destinations and build the reverse
    mapping { dst: [src1, src2, ...], ... }. If nets holds a single source,
    { (src,): merged_destinations } is returned right away.

    Second pass: merge the sources of each destination and register every
    (tuple_of_sources, dst) pair through add_net_pair() into a fresh dict,
    which is returned.
    """
    by_destination = {}
    debug("group_nets -- Begin ====================",4)
    debug("group_nets -- Before first phase of grouping (nets)",4)
    debug(nets,4)

    for source in nets:

        debug("group_nets -- The source",5)
        debug(source,5)
        debug("group_nets -- 1F The destination",5)
        debug(nets[source],5)
        nets[source] = netaddr.cidr_merge(nets[source])
        debug("group_nets -- 1F After CIDR-merge",5)
        debug(nets[source],5)
        if len(nets) == 1:
            # Nothing to regroup when there is only one source.
            debug("group_nets -- Only one pair",4)
            return {(source,):nets[source]}
        for destination in nets[source]:
            debug("group_nets -- For the destination",5)
            debug(destination,5)

            # Record the source once per destination, keeping insertion order.
            sources = by_destination.setdefault(destination, [])
            if source not in sources:
                sources.append(source)
                debug("group_nets -- Added the following source",5)
                debug(source,5)
            debug("group_nets -- Current revnets[dst]",5)
            debug(sources,5)

    debug("group_nets -- The result of the first phase of grouping (revnets)",4)
    debug(by_destination,4)
    debug("group_nets -- Second phase of grouping",4)

    grouped = {}
    for destination in by_destination:
        debug("group_nets -- 2F The destination",5)
        debug(destination,5)
        debug("group_nets -- The corresponfing sources",5)
        debug(by_destination[destination],5)
        by_destination[destination] = netaddr.cidr_merge(by_destination[destination])
        debug("group_nets -- 2F After CIDR-merge",5)
        debug(by_destination[destination],5)
        add_net_pair(tuple(by_destination[destination]),destination,grouped)
    debug("group_nets -- The result of grouping (nets)",4)
    debug(grouped,4)

    debug("group_nets -- End ====================",4)
    return grouped
项目:netbox    作者:digitalocean    | 项目源码 | 文件源码
def alter_queryset(self, request):
        """Attach address-space utilization statistics to every RIR in self.queryset.

        The address family comes from the ``family`` GET parameter ('6' selects
        IPv6, anything else IPv4). For each RIR, the aggregates of that family
        are walked and the space taken by active, reserved and deprecated
        prefixes is summed, together with the total and the remaining
        available space. Each RIR gets a ``stats`` dict with those sums plus a
        ``percentages`` sub-dict.

        :param request: request object whose ``GET`` mapping is consulted.
        :return: list of the RIR objects, each with ``stats`` set.
        """

        if request.GET.get('family') == '6':
            family = 6
            denominator = 2 ** 64  # Count /64s for IPv6 rather than individual IPs
        else:
            family = 4
            denominator = 1

        # Statuses that consume space (containers are ignored for this
        # purpose), paired with the stats key each one feeds.
        consuming_statuses = (
            ('active', PREFIX_STATUS_ACTIVE),
            ('reserved', PREFIX_STATUS_RESERVED),
            ('deprecated', PREFIX_STATUS_DEPRECATED),
        )

        rirs = []
        for rir in self.queryset:

            stats = {
                'total': 0,
                'active': 0,
                'reserved': 0,
                'deprecated': 0,
                'available': 0,
            }
            aggregate_list = Aggregate.objects.filter(family=family, rir=rir)
            for aggregate in aggregate_list:

                queryset = Prefix.objects.filter(prefix__net_contained_or_equal=str(aggregate.prefix))

                # Start from the whole aggregate and subtract each status' space.
                available_prefixes = netaddr.IPSet([aggregate.prefix])
                for key, status in consuming_statuses:
                    # Build the set once; it serves both the size metric and
                    # the subtraction from the available space.
                    consumed = netaddr.IPSet(
                        netaddr.cidr_merge([p.prefix for p in queryset.filter(status=status)])
                    )
                    stats[key] += consumed.size / denominator
                    available_prefixes = available_prefixes - consumed

                # Add the aggregate-wide metrics to the RIR total.
                stats['total'] += aggregate.prefix.size / denominator
                stats['available'] += available_prefixes.size / denominator

            # Calculate the percentage of total space for each prefix status,
            # rounded to two decimals; an empty RIR reports 0 for each status.
            total = float(stats['total'])
            percentages = {
                key: float('{:.2f}'.format(stats[key] / total * 100)) if total else 0
                for key, _ in consuming_statuses
            }
            # Available is the remainder, so the four figures add up to 100.
            percentages['available'] = (
                100 -
                percentages['active'] -
                percentages['reserved'] -
                percentages['deprecated']
            )
            stats['percentages'] = percentages
            rir.stats = stats
            rirs.append(rir)

        return rirs