The following 16 code examples, drawn from open-source Python projects, illustrate how to use netaddr.cidr_merge().
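For reference, netaddr.cidr_merge() accepts any iterable of IP addresses, subnets, or their string forms and returns the smallest possible list of IPNetwork objects covering the same address space. A minimal sketch, using arbitrary documentation-range addresses rather than anything from the examples below:

import netaddr

# Two adjacent /25s collapse into one /24; a lone address becomes a /32.
nets = ['192.0.2.0/25', '192.0.2.128/25', netaddr.IPAddress('198.51.100.1')]
print(netaddr.cidr_merge(nets))
# [IPNetwork('192.0.2.0/24'), IPNetwork('198.51.100.1/32')]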
def test_ipnetwork_cidr_merge():
    ip_list = (
        list(IPNetwork('fe80::/120'))
        + [
            IPNetwork('192.0.2.0/24'),
            IPNetwork('192.0.4.0/25'),
            IPNetwork('192.0.4.128/25'),
        ]
        + list(map(str, IPNetwork('192.0.3.0/24')))
    )

    assert len(ip_list) == 515

    assert cidr_merge(ip_list) == [
        IPNetwork('192.0.2.0/23'),
        IPNetwork('192.0.4.0/24'),
        IPNetwork('fe80::/120'),
    ]
def getRadarAs(asNumber):
    radarResponse = requests.get("https://radar.qrator.net/api/prefixes/%s?tab_id=current&page=1" % asNumber).json()
    totalPrefixes = int(radarResponse.get('total'))
    initalPageSoup = bs4.BeautifulSoup(radarResponse.get('page'), "html.parser")

    networkRawSet = set()
    for a in initalPageSoup.find_all(text=re.compile(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}/\d+?$")):
        networkRawSet.add("%s" % a)

    startPage = 1
    while len(networkRawSet) < totalPrefixes:
        radarResponse = requests.get("https://radar.qrator.net/api/prefixes/%s?tab_id=current&page=%s" % (asNumber, startPage)).json()
        pageSoup = bs4.BeautifulSoup(radarResponse.get('page'), "html.parser")
        for a in pageSoup.find_all(text=re.compile(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}/\d+?$")):
            networkRawSet.add("%s" % a)
        startPage += 1

    # Now aggregate the collected prefixes into the minimal CIDR list
    networkSet = netaddr.IPSet([netaddr.IPNetwork(item) for item in networkRawSet])
    mergedNetworks = netaddr.cidr_merge(networkSet)
    if not mergedNetworks:
        print("Nothing found. Wrong AS number?")
    else:
        print("\n".join(["%s" % network for network in mergedNetworks]))
def test_whole_network_cidr_merge_v6():
    assert cidr_merge(['::/0', 'fe80::1']) == [IPNetwork('::/0')]
    assert cidr_merge(['::/0', '::']) == [IPNetwork('::/0')]
    assert cidr_merge(['::/0', '::192.0.2.0/124', 'ff00::101']) == [IPNetwork('::/0')]
    assert cidr_merge(['0.0.0.0/0', '0.0.0.0', '::/0', '::']) == [
        IPNetwork('0.0.0.0/0'),
        IPNetwork('::/0'),
    ]
def test_ip_range():
    ip_list = list(iter_iprange('192.0.2.1', '192.0.2.14'))

    assert len(ip_list) == 14

    assert ip_list == [
        IPAddress('192.0.2.1'),
        IPAddress('192.0.2.2'),
        IPAddress('192.0.2.3'),
        IPAddress('192.0.2.4'),
        IPAddress('192.0.2.5'),
        IPAddress('192.0.2.6'),
        IPAddress('192.0.2.7'),
        IPAddress('192.0.2.8'),
        IPAddress('192.0.2.9'),
        IPAddress('192.0.2.10'),
        IPAddress('192.0.2.11'),
        IPAddress('192.0.2.12'),
        IPAddress('192.0.2.13'),
        IPAddress('192.0.2.14'),
    ]

    assert cidr_merge(ip_list) == [
        IPNetwork('192.0.2.1/32'),
        IPNetwork('192.0.2.2/31'),
        IPNetwork('192.0.2.4/30'),
        IPNetwork('192.0.2.8/30'),
        IPNetwork('192.0.2.12/31'),
        IPNetwork('192.0.2.14/32'),
    ]
def func(self, args):
    """Add networks together, aggregating as much as possible.

    Uses self._get_networks() to get the networks. Subclasses may want
    to redefine that method.
    """
    networks = self._get_networks(args)
    merged = netaddr.cidr_merge(networks)
    for i in merged:
        print(i)
def func(self, args):
    """Evaluate an expression of adding and subtracting networks."""
    expr = args.expression
    accum = [_network_address(expr.pop(0))]
    while len(expr) >= 2:
        operator = expr.pop(0)
        # right-hand side of the expression
        rhs = _network_address(expr.pop(0))
        if operator in ("+", "add", "merge"):
            # add (merge) in a new network
            accum = netaddr.cidr_merge(accum + [rhs])
        elif operator in ("-", "sub", "remove"):
            # subtract (remove) a network
            # for each network in accum, remove the RHS from it, then
            # chain everything into a single flat generator sequence
            # (RHS may be partially contained in more than one accum
            # network)
            minus_rhs = itertools.chain.from_iterable(
                netaddr.cidr_exclude(network, rhs) for network in accum
            )
            accum = netaddr.cidr_merge(minus_rhs)
        else:
            raise CommandParseError("invalid operator '%s'" % operator)
    if expr:
        self.warn("ignoring extra argument '%s'" % ' '.join(expr))
    for i in accum:
        print(i)
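The add/subtract behaviour this command builds on can be reproduced in isolation. A minimal sketch, assuming nothing beyond the public netaddr API and using arbitrary documentation-range addresses:

import netaddr

# Removing a /26 from a /24 leaves the surrounding space as CIDRs...
remaining = netaddr.cidr_exclude(netaddr.IPNetwork('192.0.2.0/24'),
                                 netaddr.IPNetwork('192.0.2.64/26'))
print(remaining)   # [IPNetwork('192.0.2.0/26'), IPNetwork('192.0.2.128/25')]

# ...and merging the excluded block back in restores the original /24.
print(netaddr.cidr_merge(remaining + [netaddr.IPNetwork('192.0.2.64/26')]))
# [IPNetwork('192.0.2.0/24')]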
def unfold(objarr):
    for obj in objarr:
        unfold_rec(objarr[obj], objarr)
        if not args.noaggr and objarr is netgrp:
            objarr[obj] = netaddr.cidr_merge(objarr[obj])

# Unfold all included objects
def get_scanner_targets(self):
    """
    Get a list of strings representing the IP addresses and network ranges
    that this scanner is configured to scan.
    :return: A list of strings representing the IP addresses and network
        ranges that this scanner is configured to scan.
    """
    all_ips = self.get_all_ips()
    merged = cidr_merge(all_ips)
    return [str(x) for x in merged]
def group_nets(nets):
    # nets = { src: [dst1, dst2, ...], ...}
    revnets = {}  # nets reversed: { dst: [src1, src2, ...], ... }

    # next iteration
    debug("group_nets -- Begin ====================", 4)
    debug("group_nets -- Before first phase of grouping (nets)", 4)
    debug(nets, 4)
    for src in nets:
        debug("group_nets -- The source", 5)
        debug(src, 5)
        debug("group_nets -- 1F The destination", 5)
        debug(nets[src], 5)
        nets[src] = netaddr.cidr_merge(nets[src])
        debug("group_nets -- 1F After CIDR-merge", 5)
        debug(nets[src], 5)
        if len(nets) == 1:
            debug("group_nets -- Only one pair", 4)
            return {(src,): nets[src]}
        for dst in nets[src]:
            debug("group_nets -- For the destination", 5)
            debug(dst, 5)
            if dst not in revnets:
                revnets[dst] = []
            if src not in revnets[dst]:
                revnets[dst].append(src)
                debug("group_nets -- Added the following source", 5)
                debug(src, 5)
            debug("group_nets -- Current revnets[dst]", 5)
            debug(revnets[dst], 5)

    # grouping
    debug("group_nets -- The result of the first phase of grouping (revnets)", 4)
    debug(revnets, 4)
    debug("group_nets -- Second phase of grouping", 4)
    nets = {}
    for dst in revnets:
        debug("group_nets -- 2F The destination", 5)
        debug(dst, 5)
        debug("group_nets -- The corresponding sources", 5)
        debug(revnets[dst], 5)
        revnets[dst] = netaddr.cidr_merge(revnets[dst])
        debug("group_nets -- 2F After CIDR-merge", 5)
        debug(revnets[dst], 5)
        add_net_pair(tuple(revnets[dst]), dst, nets)

    debug("group_nets -- The result of grouping (nets)", 4)
    debug(nets, 4)
    debug("group_nets -- End ====================", 4)
    return nets
def alter_queryset(self, request):
    if request.GET.get('family') == '6':
        family = 6
        denominator = 2 ** 64  # Count /64s for IPv6 rather than individual IPs
    else:
        family = 4
        denominator = 1
    rirs = []
    for rir in self.queryset:
        stats = {
            'total': 0,
            'active': 0,
            'reserved': 0,
            'deprecated': 0,
            'available': 0,
        }
        aggregate_list = Aggregate.objects.filter(family=family, rir=rir)
        for aggregate in aggregate_list:

            queryset = Prefix.objects.filter(prefix__net_contained_or_equal=str(aggregate.prefix))

            # Find all consumed space for each prefix status (we ignore containers for this purpose).
            active_prefixes = netaddr.cidr_merge([p.prefix for p in queryset.filter(status=PREFIX_STATUS_ACTIVE)])
            reserved_prefixes = netaddr.cidr_merge([p.prefix for p in queryset.filter(status=PREFIX_STATUS_RESERVED)])
            deprecated_prefixes = netaddr.cidr_merge([p.prefix for p in queryset.filter(status=PREFIX_STATUS_DEPRECATED)])

            # Find all available prefixes by subtracting each of the existing prefix sets from the aggregate prefix.
            available_prefixes = (
                netaddr.IPSet([aggregate.prefix]) -
                netaddr.IPSet(active_prefixes) -
                netaddr.IPSet(reserved_prefixes) -
                netaddr.IPSet(deprecated_prefixes)
            )

            # Add the size of each metric to the RIR total.
            stats['total'] += aggregate.prefix.size / denominator
            stats['active'] += netaddr.IPSet(active_prefixes).size / denominator
            stats['reserved'] += netaddr.IPSet(reserved_prefixes).size / denominator
            stats['deprecated'] += netaddr.IPSet(deprecated_prefixes).size / denominator
            stats['available'] += available_prefixes.size / denominator

        # Calculate the percentage of total space for each prefix status.
        total = float(stats['total'])
        stats['percentages'] = {
            'active': float('{:.2f}'.format(stats['active'] / total * 100)) if total else 0,
            'reserved': float('{:.2f}'.format(stats['reserved'] / total * 100)) if total else 0,
            'deprecated': float('{:.2f}'.format(stats['deprecated'] / total * 100)) if total else 0,
        }
        stats['percentages']['available'] = (
            100 -
            stats['percentages']['active'] -
            stats['percentages']['reserved'] -
            stats['percentages']['deprecated']
        )
        rir.stats = stats
        rirs.append(rir)

    return rirs
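The available-space calculation above is ordinary IPSet arithmetic and works the same way outside a Django view. A minimal sketch with made-up prefixes (the Aggregate/Prefix ORM models in the example are application-specific and not needed here):

import netaddr

aggregate = netaddr.IPSet(['10.0.0.0/16'])
consumed = netaddr.IPSet(netaddr.cidr_merge(['10.0.0.0/24', '10.0.1.0/24']))
available = aggregate - consumed

print(available.size)          # 65024 addresses remain unassigned
print(available.iter_cidrs())  # the leftover space expressed as CIDRs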