我们从Python开源项目中，提取了以下16个代码示例，用于说明如何使用ipaddr.IPv4Network()。
def get_sre(net, target_prefix_len):
    """Calculate the first and last /target_prefix_len subnets of ``net``.

    Args:
        net: an ``ipaddr.IPv4Network`` or ``ipaddr.IPv6Network``.
        target_prefix_len: prefix length of the subnets to enumerate; must be
            at least ``net.prefixlen`` and at most 64.

    Returns:
        A ``(first, last, cnt)`` tuple of integers: the value returned by
        ``UniqueSmallestRoutableEntriesMonitor.get_first`` for ``net``, the
        same value with the subnet-index bits all set, and the number of
        /target_prefix_len subnets contained in ``net``.

    Raises:
        AssertionError: if any of the preconditions above does not hold.
    """
    assert isinstance(net, (ipaddr.IPv4Network, ipaddr.IPv6Network))
    assert target_prefix_len <= 64, "Max target prefix length is 64"
    assert net.prefixlen <= target_prefix_len, (
        "Prefix length ({}) must be <= of the target prefix "
        "length ({}): {}".format(net.prefixlen, target_prefix_len, net)
    )

    first = UniqueSmallestRoutableEntriesMonitor.get_first(net)

    # Width, in bits, of the integer space "first" lives in.
    if net.version == 6:
        address_bits = 64
    else:
        address_bits = 32

    # first <= 2^63 - 1 to avoid overflows
    assert first <= 0x7FFFFFFFFFFFFFFF, \
        "Only prefixes <= 7fff:ffff:ffff:ffff::/64 can be processed"

    extra_bits = target_prefix_len - net.prefixlen
    subnet_count = 2 ** extra_bits

    # NOTE(review): for an IPv4 net, a target_prefix_len above 32 makes this
    # shift count negative - confirm callers never request that.
    last = first | ((subnet_count - 1) << (address_bits - target_prefix_len))
    return first, last, subnet_count
def get_free_ip(subnets):
    """Return the first unused host address found in ``subnets``.

    Subnets are tried in order. In each one, the first
    ``subnet.reserved_addresses`` host addresses are skipped, and the first
    remaining host address that is not among ``subnet.ips`` is returned.

    Args:
        subnets: iterable of objects exposing ``address``,
            ``reserved_addresses`` and ``ips`` (each item of ``ips``
            exposing ``address``).

    Returns:
        The compressed string form of the free address.

    Raises:
        SubnetFullException: if no subnet has a free address.
    """
    for subnet in subnets:
        reserved = subnet.reserved_addresses
        net = ipaddr.IPv4Network(subnet.address)
        used_ips = [ip.address for ip in subnet.ips]

        # Cheap capacity check before walking the hosts.
        # presumably the "- 2" excludes network and broadcast addresses.
        if (net.numhosts - 2 - reserved) <= len(used_ips):
            continue

        # Set lookup keeps the scan linear in the number of hosts instead of
        # hosts x used addresses.
        taken = set(used_ips)
        for ip in net.iterhosts():
            if reserved > 0:
                # Still inside the reserved range at the start of the subnet.
                reserved -= 1
                continue
            candidate = ip.compressed
            if candidate not in taken:
                return candidate

    raise SubnetFullException()
def fetch_url(self, i, fn_on_response):
    """Yield one request per (host, port) for every queued task.

    Tasks are pulled from ``self.get_next_task()`` until it returns ``None``.
    A task containing ``/`` is expanded to the host addresses of that IPv4
    network; any other task is used as a single host. For each host, HTTP
    URLs are built for every port in ``self.port_list`` and HTTPS URLs for
    ports 443 and 8443.

    Args:
        i: unused here; kept for interface compatibility.
        fn_on_response: passed through to ``self.do_request``.

    Yields:
        Whatever ``self.do_request(url, 'GET', fn_on_response)`` returns.
    """
    item = self.get_next_task()
    while item is not None:
        try:
            if '/' in item:
                mask = ipaddr.IPv4Network(item)
                ip_list = [text_type(t) for t in mask.iterhosts()]
            else:
                ip_list = [item]
        except Exception:
            # Best effort: a task that cannot be expanded is skipped.
            # Unlike a bare "except:", this lets KeyboardInterrupt and
            # SystemExit propagate.
            ip_list = []

        for t in ip_list:
            if t == '':
                continue
            url_list = ['http://%s:%s' % (t, p) for p in self.port_list]
            url_list.extend(['https://%s:%s' % (t, p) for p in [443, 8443]])
            for u in url_list:
                yield self.do_request(u, 'GET', fn_on_response)

        item = self.get_next_task()
def on_queue_empty(self, queue, max_num=100):
    """Refill ``queue`` from ``self.list_data`` and pop its next entry.

    Up to ``max_num`` entries are taken from the left of ``self.list_data``.
    An entry containing ``/`` is expanded to the host addresses of that IPv4
    network; any other entry is queued unchanged. Refilling stops early once
    ``self.list_data`` is exhausted.

    Args:
        queue: deque-like object supporting ``extend`` and ``popleft``.
        max_num: maximum number of source entries to move per call.

    Returns:
        The leftmost entry of ``queue`` after refilling, or ``None`` if the
        queue is empty.
    """
    for _ in range(max_num):
        try:
            entry = self.list_data.popleft()
            if '/' in entry:
                network = ipaddr.IPv4Network(entry)
                expanded = [text_type(host) for host in network.iterhosts()]
            else:
                expanded = [entry]
            queue.extend(expanded)
        except IndexError:
            # Source exhausted: nothing more to move.
            break

    try:
        return queue.popleft()
    except IndexError:
        return None
def is_reserved(ip):
    """Tell whether ``ip`` falls in a special-purpose IPv4 range.

    Args:
        ip: anything accepted by ``ipaddr.IPv4Network``.

    Returns:
        True if the address is multicast, private, link-local, loopback or
        reserved; False otherwise.
    """
    # Parse once and reuse, rather than building a network object per check.
    net = ipaddr.IPv4Network(ip)
    return (
        net.is_multicast
        or net.is_private
        or net.is_link_local
        or net.is_loopback
        or net.is_reserved
    )
def get_net(net):
    """Coerce ``net`` to an ``ipaddr`` network object.

    An ``ipaddr.IPv4Network`` or ``ipaddr.IPv6Network`` is returned
    unchanged; anything else is passed to ``ipaddr.IPNetwork``.
    """
    if not isinstance(net, (ipaddr.IPv4Network, ipaddr.IPv6Network)):
        net = ipaddr.IPNetwork(net)
    return net
def get_first(net):
    """Return the network address of ``net`` as an integer.

    For an IPv6 network only the upper 64 bits are returned (the value is
    shifted right by 64); for an IPv4 network the full value is returned.

    Raises:
        AssertionError: if ``net`` is not an ``ipaddr`` IPv4/IPv6 network.
    """
    assert isinstance(net, (ipaddr.IPv4Network, ipaddr.IPv6Network))
    # Shifting by zero leaves the IPv4 value untouched.
    shift = 64 if net.version == 6 else 0
    return int(net.network) >> shift
def ipListBuild(address):
    """Interactively convert one address or build a host-name list file.

    Option 1 (the default) prints ``numToEnToNum(address)`` and exits.
    Option 2 asks for a segment length, an encoding and a root domain, then
    appends one ``<encoded host>.<root domain>`` line per host address of
    ``address/<segment length>`` to a timestamped text file.

    Recognised encodings: empty input (dotted address), ``en``, ``int`` and
    ``hex``. Any other value produces no lines.

    Args:
        address: IPv4 address used as the base of the segment.
    """
    print('1. Single IP Covert For En\n2. Build IP List')
    opt_req = raw_input("[+] [1 By Default/2]") or '1'
    if opt_req == '1':
        print(numToEnToNum(address))
        exit()

    conf_main = conf_read('maindomain')[:-1]
    seg_len = raw_input("[+] Please Input Segment Length [24 By Default]") or 24
    encode_req = raw_input("[+] Please Input Encoding ['ipv4' By Default]")
    mainDomain = raw_input(
        "[+] Please Input Server Root Address [{} By Default]".format(conf_main)
    ) or conf_main

    # Build the network directly. The previous eval() of a string assembled
    # from "address" would execute arbitrary code embedded in that value.
    segment = ipaddr.IPv4Network(
        '{}/{}'.format(address, int(seg_len))
    ).iterhosts()

    save_file = "{}_{}_{}.txt".format(
        time.strftime("%Y%m%d%X", time.localtime()).replace(':', ''),
        mainDomain.replace('.', '_'),
        (encode_req if encode_req else 'ipv4'),
    )

    # One formatter per supported encoding; unknown encodings yield no output.
    encoders = {
        '': str,
        'en': lambda host: numToEnToNum(str(host)),
        'int': lambda host: int(ipaddr.IPAddress(str(host))),
        'hex': lambda host: str(host).encode('hex'),
    }

    results = []
    try:
        encoder = encoders.get(encode_req)
        if encoder is not None:
            results = [
                "{}.{}".format(encoder(host), mainDomain) for host in segment
            ]
        # "with" guarantees the file is closed even if a write fails.
        with open(save_file, 'a') as f:
            f.writelines(line + '\n' for line in results)
        print('[+] Stored in the {}'.format(save_file))
    except Exception as e:
        print(e)
        exit()
def default(self, obj):
    """Serialize ``ipaddress`` objects as strings.

    IPv4/IPv6 addresses and networks are rendered with ``str()``; every
    other object is delegated to ``json.JSONEncoder.default``.
    """
    ip_types = (
        ipaddress.IPv4Network,
        ipaddress.IPv6Network,
        ipaddress.IPv4Address,
        ipaddress.IPv6Address,
    )
    if not isinstance(obj, ip_types):
        return json.JSONEncoder.default(self, obj)
    return str(obj)
def main():
    """Build the target list from the command-line arguments.

    * ``-f <arg>``: returns ``extract_target(<arg>)``.
    * ``-i <network>``: returns one ``"address:port"`` string per address of
      the IPv4 network. The port is 6379 unless exactly five arguments are
      present, in which case the fifth is used.
    * anything else, or fewer than three arguments: returns an empty list.
    """
    if len(argv) <= 2:
        return []

    mode = argv[1]
    if mode == '-f':
        return extract_target(argv[2])

    targetlist = []
    if mode == '-i':
        port = int(argv[4]) if len(argv) == 5 else 6379
        targetlist = [
            "%s:%d" % (tar, port) for tar in ipaddr.IPv4Network(argv[2])
        ]
    return targetlist
def select_subnet_for_ip(ip, subnets):
    """Return the first subnet whose IPv4 network contains ``ip``.

    Args:
        ip: anything accepted by ``ipaddr.IPAddress``.
        subnets: iterable of objects exposing ``address`` (a network spec
            accepted by ``ipaddr.IPv4Network``).

    Returns:
        The matching subnet object, or ``None`` if no subnet contains ``ip``.
    """
    address = None
    for subnet in subnets:
        if address is None:
            # Parse once, on first use, rather than once per subnet; an empty
            # "subnets" still never touches "ip".
            address = ipaddr.IPAddress(ip)
        if address in ipaddr.IPv4Network(subnet.address):
            return subnet
    return None
def __init__(self):
    """Parse the WHATSAPP_IPV4 / WHATSAPP_IPV6 lists into network objects.

    Each constant is split on newlines. IPv4 entries that fail to parse are
    logged and skipped; IPv6 entries are parsed strictly, so a malformed
    line raises.
    """
    self.ipv4_networks = []
    for ip in WHATSAPP_IPV4.split("\n"):
        try:
            self.ipv4_networks.append(ipaddr.IPv4Network(ip))
        except Exception:
            log.err("IP is wrong")
            log.msg(ip)

    # Build a real list: on Python 3 a bare map() is a one-shot iterator that
    # would be empty after its first traversal.
    self.ipv6_networks = [
        ipaddr.IPv6Network(line) for line in WHATSAPP_IPV6.split("\n")
    ]
def main():
    """Scan a single host or every host of an IPv4 network.

    Options come from ``parse_options()``. ``opt.timeout`` is stored in the
    module-level ``timeout``; if it cannot be converted to a float, 0.5 is
    used. When ``opt.network`` contains ``/`` it is treated as a network and
    all its hosts are scanned by a pool of 16 worker processes; otherwise it
    is scanned directly as a single host.
    """
    global timeout
    opt = parse_options()
    scan_network = opt.network

    try:
        timeout = float(opt.timeout)
    except (TypeError, ValueError):
        # TypeError covers a missing option (None), ValueError a bad string.
        timeout = 0.5
        print("Use default timeout {}".format(timeout))

    if '/' in scan_network:
        network = IPv4Network(scan_network)
        hosts = [str(host) for host in network.iterhosts()]
        print('start to scan network {} for {} hosts...'.format(str(network), len(hosts)))
        pool = multiprocessing.Pool(processes=16)
        try:
            pool.map(scan, hosts)
        finally:
            # Always release the worker processes, even if a scan raised.
            pool.close()
            pool.join()
    else:
        print('start to scan host {}'.format(scan_network))
        scan(scan_network)
    print("done")
def default(self, obj):
    """Return a JSON-friendly value for ``obj``.

    ``ipaddress`` IPv4/IPv6 networks and addresses become their string
    form; anything else falls back to ``json.JSONEncoder.default``.
    """
    stringified = (
        ipaddress.IPv4Network,
        ipaddress.IPv6Network,
        ipaddress.IPv4Address,
        ipaddress.IPv6Address,
    )
    return (
        str(obj)
        if isinstance(obj, stringified)
        else json.JSONEncoder.default(self, obj)
    )
def get(self):
    """Record an access event for the caller and echo it back.

    The event name (truncated to 120 characters) comes from the parsed
    request arguments. The caller's address is matched against the network
    keys of the ``IP_RESOLVER`` config mapping to pick an organisation,
    falling back to its ``'default'`` entry. When an ``Auth-Token`` header
    is present, the token payload's ``app_name`` is added. The event is
    stored through the ``mp_access_store`` extension.

    Returns:
        ``CTTVResponse.OK`` wrapping the stored data, with the timestamp
        converted to a string.
    """
    # Looked up although the call that used it is commented out below.
    esstore = current_app.extensions['es_access_store']
    mpstore = current_app.extensions['mp_access_store']

    event = self.parser.parse_args()['event'][:120]
    auth_token = request.headers.get('Auth-Token')

    ip_resolver = current_app.config['IP_RESOLVER']
    ip = RateLimiter.get_remote_addr()
    ip_net = IPNetwork(ip)

    # First overlapping network wins. Non-network keys (such as 'default')
    # are ignored by the isinstance filter.
    fallback_org = ip_resolver['default']
    resolved_org = next(
        (
            org
            for net, org in ip_resolver.items()
            if isinstance(net, (IPv4Network, IPv6Network))
            and net.overlaps(ip_net)
        ),
        fallback_org,
    )

    data = {
        'ip': ip,
        'org': resolved_org,
        'host': request.host,
        'timestamp': datetime.now(),
        'event': event,
    }
    if auth_token:
        token_payload = TokenAuthentication.get_payload_from_token(auth_token)
        data['app_name'] = token_payload['app_name']

    # esstore.store_event(data)
    mpstore.store_event(data)

    # Stringify the timestamp for the response payload.
    data['timestamp'] = str(data['timestamp'])
    return CTTVResponse.OK(SimpleResult(None, data=data))
def add_random_net(ip_ver, target_prefix_len):
    """Generate a random network and add it to ``usres_monitor``.

    A random prefix length below ``target_prefix_len`` is chosen (at least 8
    for IPv4, at least 19 for IPv6), a random network id of that length is
    built, and the resulting network is passed to ``usres_monitor.add_net``.

    Args:
        ip_ver: 4 for IPv4; any other value selects IPv6.
        target_prefix_len: exclusive upper bound for the random prefix length.

    Returns:
        ``("dup", net)`` if ``add_net`` reports the network was already in
        the db, otherwise ``("ok", net)``.

    Raises:
        AssertionError: if the generated network has host bits set
            (``net.ip`` differs from ``net.network``).
    """
    def dump_vars():
        # Debug aid: print every intermediate value used to build the net.
        print("target_prefix_len", target_prefix_len)
        print("prefix_len", prefix_len)
        print("max_prefix_len", max_prefix_len)
        print("max_range_len", max_range_len)
        print("max_range", max_range)
        print("rand", rand)
        print("net_id", net_id)
        print("ip", ip)
        print("net", net)
        print("str(net.ip)", str(net.ip))
        print("str(net.network)", str(net.network))

    if ip_ver == 4:
        prefix_len = random.randint(8, target_prefix_len - 1)
        max_range_len = prefix_len
        max_prefix_len = 32
        ip_class = ipaddr.IPv4Address
        net_class = ipaddr.IPv6Network if False else ipaddr.IPv4Network
    else:
        prefix_len = random.randint(19, target_prefix_len - 1)
        max_range_len = prefix_len - 1  # to avoid overflow
        max_prefix_len = 64
        ip_class = ipaddr.IPv6Address
        net_class = ipaddr.IPv6Network

    max_range = 2 ** max_range_len - 1
    rand = random.randint(0, max_range)
    # Left-align the random bits so they form the network part.
    net_id = rand << (max_prefix_len - prefix_len)
    if ip_ver == 6:
        # Only the upper 64 bits of an IPv6 address are used.
        net_id = net_id << 64

    ip = ip_class(net_id)
    net = net_class("{}/{}".format(str(ip), prefix_len))

    try:
        assert str(net.ip) == str(net.network)
    except Exception:
        dump_vars()
        raise

    try:
        usres_monitor.add_net(net)
    except USRESMonitorException as e:
        if "it was already in the db" in str(e):
            return "dup", net
    except Exception:
        # NOTE(review): other failures are dumped but not re-raised, so the
        # result is still "ok" - confirm that is intended. Narrowed from a
        # bare "except:" so KeyboardInterrupt/SystemExit are not swallowed.
        dump_vars()

    return "ok", net