我们从Python开源项目中,提取了以下47个代码示例,用于说明如何使用netaddr.valid_ipv4()。
def get_ipv6_addr_by_EUI64(cidr, mac): """Generate a IPv6 addr by EUI-64 with CIDR and MAC :param str cidr: a IPv6 CIDR :param str mac: a MAC address :return: an IPv6 Address :rtype: netaddr.IPAddress """ # Check if the prefix is IPv4 address is_ipv4 = netaddr.valid_ipv4(cidr) if is_ipv4: msg = "Unable to generate IP address by EUI64 for IPv4 prefix" raise TypeError(msg) try: eui64 = int(netaddr.EUI(mac).eui64()) prefix = netaddr.IPNetwork(cidr) return netaddr.IPAddress(prefix.first + eui64 ^ (1 << 57)) except (ValueError, netaddr.AddrFormatError): raise TypeError('Bad prefix or mac format for generating IPv6 ' 'address by EUI-64: %(prefix)s, %(mac)s:' % {'prefix': cidr, 'mac': mac}) except TypeError: raise TypeError('Bad prefix type for generate IPv6 address by ' 'EUI-64: %s' % cidr)
def serialize(self): # fixup if (netaddr.valid_ipv4(self.peer_ip) and netaddr.valid_ipv4(self.local_ip)): self.afi = self.AFI_IPv4 elif (netaddr.valid_ipv6(self.peer_ip) and netaddr.valid_ipv6(self.local_ip)): self.afi = self.AFI_IPv6 else: raise ValueError( 'peer_ip and local_ip must be the same address family: ' 'peer_ip=%s, local_ip=%s' % (self.peer_ip, self.local_ip)) buf = struct.pack(self._HEADER_FMT, self.peer_as, self.local_as, self.if_index, self.afi) buf += ip.text_to_bin(self.peer_ip) buf += ip.text_to_bin(self.local_ip) buf += struct.pack(self._STATES_FMT, self.old_state, self.new_state) return buf
def serialize(self): # fixup if (netaddr.valid_ipv4(self.peer_ip) and netaddr.valid_ipv4(self.local_ip)): self.afi = self.AFI_IPv4 elif (netaddr.valid_ipv6(self.peer_ip) and netaddr.valid_ipv6(self.local_ip)): self.afi = self.AFI_IPv6 else: raise ValueError( 'peer_ip and local_ip must be the same address family: ' 'peer_ip=%s, local_ip=%s' % (self.peer_ip, self.local_ip)) buf = struct.pack(self._HEADER_FMT, self.peer_as, self.local_as, self.if_index, self.afi) buf += ip.text_to_bin(self.peer_ip) buf += ip.text_to_bin(self.local_ip) buf += self.bgp_message.serialize() return buf
def __init__(self, afi, safi, next_hop, nlri, flags=0, type_=None, length=None): super(BGPPathAttributeMpReachNLRI, self).__init__( flags=flags, type_=type_, length=length) self.afi = afi self.safi = safi if not isinstance(next_hop, (list, tuple)): next_hop = [next_hop] for n in next_hop: if not netaddr.valid_ipv4(n) and not netaddr.valid_ipv6(n): raise ValueError('Invalid address for next_hop: %s' % n) # Note: For the backward compatibility, stores the first next_hop # address and all next_hop addresses separately. if next_hop: self._next_hop = next_hop[0] else: self._next_hop = None self._next_hop_list = next_hop self.nlri = nlri addr_cls = _get_addr_class(afi, safi) for i in nlri: if not isinstance(i, addr_cls): raise ValueError('Invalid NRLI class for afi=%d and safi=%d' % (self.afi, self.safi))
def __init__(self, lp_status, te_metric, max_bw, max_reserved_bw, unreserved_bw, admin_group, remote_as, remote_ip, average_delay, min_delay, max_delay, delay_var, pkt_loss, residual_bw, average_bw, utilized_bw): super(InterfaceLinkParams, self).__init__() self.lp_status = lp_status self.te_metric = te_metric self.max_bw = max_bw self.max_reserved_bw = max_reserved_bw assert isinstance(unreserved_bw, (list, tuple)) assert len(unreserved_bw) == MAX_CLASS_TYPE self.unreserved_bw = unreserved_bw self.admin_group = admin_group self.remote_as = remote_as assert netaddr.valid_ipv4(remote_ip) self.remote_ip = remote_ip self.average_delay = average_delay self.min_delay = min_delay self.max_delay = max_delay self.delay_var = delay_var self.pkt_loss = pkt_loss self.residual_bw = residual_bw self.average_bw = average_bw self.utilized_bw = utilized_bw
def serialize(self): if ip.valid_ipv4(self.prefix): self.family = socket.AF_INET # fixup prefix_addr, prefix_num = self.prefix.split('/') body_bin = struct.pack( self._IPV4_BODY_FMT, addrconv.ipv4.text_to_bin(prefix_addr), int(prefix_num)) elif ip.valid_ipv6(self.prefix): self.family = socket.AF_INET6 # fixup prefix_addr, prefix_num = self.prefix.split('/') body_bin = struct.pack( self._IPV6_BODY_FMT, addrconv.ipv6.text_to_bin(prefix_addr), int(prefix_num)) else: raise ValueError('Invalid prefix: %s' % self.prefix) return struct.pack(self._FAMILY_FMT, self.family) + body_bin
def serialize(self): # fixup if ip.valid_ipv4(self.prefix): self.family = socket.AF_INET elif ip.valid_ipv6(self.prefix): self.family = socket.AF_INET6 else: raise ValueError('Invalid prefix: %s' % self.prefix) buf = struct.pack(self._FAMILY_FMT, self.family) buf += _serialize_ip_prefix(self.prefix) buf += struct.pack(self._METRIC_FMT, self.metric) return buf + _serialize_nexthops(self.nexthops)
def validate(value): if isinstance(value, six.string_types) and netaddr.valid_ipv4(value): return value raise ValueError(_("Expected IPv4 string, got '%s'") % value)
def validIP(self, address): return netaddr.valid_ipv4(address) # Clean DNS results to be a simple list
def matches(self, value, includeParents=False, includeChildren=True): value = value.lower() if value is None or value == "": return False if netaddr.valid_ipv4(value): # 1.1 if value in self.getAddresses(): return True # 1.2 if self.targetType == "NETBLOCK_OWNER": if netaddr.IPAddress(value) in netaddr.IPNetwork(self.targetValue): return True if self.targetType == "IP_ADDRESS": if netaddr.IPAddress(value) in \ netaddr.IPNetwork(netaddr.IPAddress(self.targetValue)): return True else: for name in self.getNames(): # 2.1 if value == name: return True # 2.2 if includeParents and name.endswith("." + value): return True # 2.3 if includeChildren and value.endswith("." + name): return True return None # Class for SpiderFoot Events
def is_valid_ipv4(address): """Verify that address represents a valid IPv4 address. :param address: Value to verify :type address: string :returns: bool .. versionadded:: 1.1 """ try: return netaddr.valid_ipv4(address) except netaddr.AddrFormatError: return False
def get_server_port_map(self, server, ip_addr=None): ports = self.handle._list_ports(device_id=server['id'], fixed_ip=ip_addr) # pylint: disable=protected-access port_map = [ (p['id'], fxip['ip_address']) for p in ports for fxip in p['fixed_ips'] if netaddr.valid_ipv4(fxip['ip_address']) and fxip['ip_address'] == ip_addr['addr'] ] return port_map
def netlist(nets): v4nets = [] v6nets = [] for net in nets: ipNetwork = netaddr.IPNetwork(net) parts = str(ipNetwork).split("/") ip = parts[0] mask = parts[1] if netaddr.valid_ipv4(ip) and int(mask) <= 32: v4nets.append(ipNetwork) elif netaddr.valid_ipv6(ip) and int(mask) <= 128: v6nets.append(ipNetwork) return v4nets, v6nets
def ipset(nets): v4nets = netaddr.IPSet() v6nets = netaddr.IPSet() for net in nets: ipNetwork = netaddr.IPNetwork(net) parts = str(ipNetwork).split("/") ip = parts[0] mask = parts[1] if netaddr.valid_ipv4(ip) and int(mask) <= 32: v4nets.add(ipNetwork) elif netaddr.valid_ipv6(ip) and int(mask) <= 128: v6nets.add(ipNetwork) return v4nets, v6nets
def valid_ip_address(addr): if not netaddr.valid_ipv4(addr) and not netaddr.valid_ipv6(addr): return False return True
def prefix_add(self, prefix, next_hop=None, route_dist=None): """ This method adds a new prefix to be advertized. ``prefix`` must be the string representation of an IP network (e.g., 10.1.1.0/24). ``next_hop`` specifies the next hop address for this prefix. This parameter is necessary for only VPNv4 and VPNv6 address families. ``route_dist`` specifies a route distinguisher value. This parameter is necessary for only VPNv4 and VPNv6 address families. """ func_name = 'network.add' networks = {} networks[PREFIX] = prefix if next_hop: networks[NEXT_HOP] = next_hop if route_dist: func_name = 'prefix.add_local' networks[ROUTE_DISTINGUISHER] = route_dist rf, p = self._check_rf_and_normalize(prefix) networks[ROUTE_FAMILY] = rf networks[PREFIX] = p if rf == vrfs.VRF_RF_IPV6 and netaddr.valid_ipv4(next_hop): # convert the next_hop to IPv4-Mapped IPv6 Address networks[NEXT_HOP] = \ str(netaddr.IPAddress(next_hop).ipv6()) return call(func_name, **networks)
def _connect_tcp(self, peer_addr, conn_handler, time_out=None, bind_address=None, password=None): """Creates a TCP connection to given peer address. Tries to create a socket for `timeout` number of seconds. If successful, uses the socket instance to start `client_factory`. The socket is bound to `bind_address` if specified. """ LOG.debug('Connect TCP called for %s:%s', peer_addr[0], peer_addr[1]) if netaddr.valid_ipv4(peer_addr[0]): family = socket.AF_INET else: family = socket.AF_INET6 with Timeout(time_out, socket.error): sock = socket.socket(family) if bind_address: sock.bind(bind_address) if password: sockopt.set_tcp_md5sig(sock, peer_addr[0], password) sock.connect(peer_addr) # socket.error exception is rasied in cese of timeout and # the following code is executed only when the connection # is established. # Connection name for pro-active connection is made up of # local end address + remote end address local = self.get_localname(sock)[0] remote = self.get_remotename(sock)[0] conn_name = ('L: ' + local + ', R: ' + remote) self._asso_socket_map[conn_name] = sock # If connection is established, we call connection handler # in a new thread. self._spawn(conn_name, conn_handler, sock) return sock # # Sink #
def add_to_global_table(self, prefix, nexthop=None, is_withdraw=False): src_ver_num = 1 peer = None # set mandatory path attributes origin = BGPPathAttributeOrigin(BGP_ATTR_ORIGIN_IGP) aspath = BGPPathAttributeAsPath([[]]) pathattrs = OrderedDict() pathattrs[BGP_ATTR_TYPE_ORIGIN] = origin pathattrs[BGP_ATTR_TYPE_AS_PATH] = aspath net = netaddr.IPNetwork(prefix) ip = str(net.ip) masklen = net.prefixlen if netaddr.valid_ipv4(ip): _nlri = IPAddrPrefix(masklen, ip) if nexthop is None: nexthop = '0.0.0.0' p = Ipv4Path else: _nlri = IP6AddrPrefix(masklen, ip) if nexthop is None: nexthop = '::' p = Ipv6Path new_path = p(peer, _nlri, src_ver_num, pattrs=pathattrs, nexthop=nexthop, is_withdraw=is_withdraw) # add to global ipv4 table and propagates to neighbors self.learn_path(new_path)
def _set_password(self, address, password): if netaddr.valid_ipv4(address): family = socket.AF_INET else: family = socket.AF_INET6 for sock in self.listen_sockets.values(): if sock.family == family: sockopt.set_tcp_md5sig(sock, address, password)
def validate_ip_addr(ip_addr): if netaddr.valid_ipv4(ip_addr): return lib_consts.IP_VERSION_4 elif netaddr.valid_ipv6(ip_addr): return lib_consts.IP_VERSION_6 else: raise bgp_driver_exc.InvalidParamType(param=ip_addr, param_type='ip-address')
def is_ip(self, ip_addr=None): """ Return true if valid IP address return false if invalid IP address :param ip_addr: optional IP to pass. Takes from root class if not specified >>> from ipinformation import IPInformation >>> print IPInformation(ip_address='8.8.8.8').is_ip() True >>> print IPInformation(ip_address='NotAnIP').is_ip() False """ if not ip_addr: ip_addr = self.ip_address valid = True if netaddr.valid_ipv4( ip_addr ): #IPv4 Address if not re.match( valid_ip_regex, ip_addr ): valid = False elif netaddr.valid_ipv6( ip_addr ): pass else: # print '"%s" is not a valid IP Address.' %ip_addr valid = False return valid
def validate_ip_addr(addr, version=None): """ Validates that an IP address is valid. Returns true if valid, false if not. Version can be "4", "6", None for "IPv4", "IPv6", or "either" respectively. """ if version == 4: return netaddr.valid_ipv4(addr) elif version == 6: return netaddr.valid_ipv6(addr) else: return netaddr.valid_ipv4(addr) or netaddr.valid_ipv6(addr)
def valid_ipv4(addr, flags=0): """ Wrapper function of "netaddr.valid_ipv4()". The function extends "netaddr.valid_ipv4()" to enable to validate IPv4 network address in "xxx.xxx.xxx.xxx/xx" format. :param addr: IP address to be validated. :param flags: See the "netaddr.valid_ipv4()" docs for details. :return: True is valid. False otherwise. """ return _valid_ip(netaddr.valid_ipv4, 32, addr, flags)
def next_hop(self, addr): if not netaddr.valid_ipv4(addr) and not netaddr.valid_ipv6(addr): raise ValueError('Invalid address for next_hop: %s' % addr) self._next_hop = addr self.next_hop_list[0] = addr
def _serialize_ip_prefix(prefix): if ip.valid_ipv4(prefix): prefix_addr, prefix_num = prefix.split('/') return bgp.IPAddrPrefix(int(prefix_num), prefix_addr).serialize() elif ip.valid_ipv6(prefix): prefix_addr, prefix_num = prefix.split('/') return IPv6Prefix(int(prefix_num), prefix_addr).serialize() else: raise ValueError('Invalid prefix: %s' % prefix)
def __init__(self, ifindex, ifc_flags, family, prefix, dest): super(_ZebraInterfaceAddress, self).__init__() self.ifindex = ifindex self.ifc_flags = ifc_flags self.family = family if isinstance(prefix, (IPv4Prefix, IPv6Prefix)): prefix = prefix.prefix self.prefix = prefix assert netaddr.valid_ipv4(dest) or netaddr.valid_ipv6(dest) self.dest = dest
def serialize(self): if ip.valid_ipv4(self.prefix): self.family = socket.AF_INET # fixup prefix_addr, prefix_num = self.prefix.split('/') body_bin = struct.pack( self._IPV4_BODY_FMT, addrconv.ipv4.text_to_bin(prefix_addr), int(prefix_num), addrconv.ipv4.text_to_bin(self.dest)) elif ip.valid_ipv6(self.prefix): self.family = socket.AF_INET6 # fixup prefix_addr, prefix_num = self.prefix.split('/') body_bin = struct.pack( self._IPV6_BODY_FMT, addrconv.ipv6.text_to_bin(prefix_addr), int(prefix_num), addrconv.ipv6.text_to_bin(self.dest)) else: raise ValueError( 'Invalid address family for prefix=%s and dest=%s' % (self.prefix, self.dest)) buf = struct.pack(self._HEADER_FMT, self.ifindex, self.ifc_flags, self.family) return buf + body_bin
def __init__(self, prefix, metric=None, nexthops=None): super(_ZebraIPImportLookup, self).__init__() assert netaddr.valid_ipv4(prefix) or netaddr.valid_ipv6(prefix) self.prefix = prefix self.metric = metric nexthops = nexthops or [] for nexthop in nexthops: assert isinstance(nexthop, _NextHop) self.nexthops = nexthops
def __init__(self, addr, distance, metric, nexthops=None): super(_ZebraIPNexthopLookupMRib, self).__init__() assert netaddr.valid_ipv4(addr) or netaddr.valid_ipv6(addr) self.addr = addr self.distance = distance self.metric = metric nexthops = nexthops or [] for nexthop in nexthops: assert isinstance(nexthop, _NextHop) self.nexthops = nexthops
def is_valid_ipv4(ipv4): """Returns True if given is a valid ipv4 address. Given value should be a dot-decimal notation string. Samples: - valid address: 10.0.0.1, 192.168.0.1 - invalid address: 11.0.0, 192:168:0:1, etc. """ return netaddr.valid_ipv4(ipv4)
def create_rt_extended_community(value, subtype=2): """ Creates an instance of the BGP Route Target Community (if "subtype=2") or Route Origin Community ("subtype=3"). :param value: String of Route Target or Route Origin value. :param subtype: Subtype of Extended Community. :return: An instance of Route Target or Route Origin Community. """ global_admin, local_admin = value.split(':') local_admin = int(local_admin) if global_admin.isdigit() and 0 <= int(global_admin) <= 0xffff: ext_com = BGPTwoOctetAsSpecificExtendedCommunity( subtype=subtype, as_number=int(global_admin), local_administrator=local_admin) elif global_admin.isdigit() and 0xffff < int(global_admin) <= 0xffffffff: ext_com = BGPFourOctetAsSpecificExtendedCommunity( subtype=subtype, as_number=int(global_admin), local_administrator=local_admin) elif netaddr.valid_ipv4(global_admin): ext_com = BGPIPv4AddressSpecificExtendedCommunity( subtype=subtype, ipv4_address=global_admin, local_administrator=local_admin) else: raise ValueError( 'Invalid Route Target or Route Origin value: %s' % value) return ext_com
def _connect_tcp(self, peer_addr, conn_handler, time_out=None, bind_address=None, password=None): """Creates a TCP connection to given peer address. Tries to create a socket for `timeout` number of seconds. If successful, uses the socket instance to start `client_factory`. The socket is bound to `bind_address` if specified. """ LOG.debug('Connect TCP called for %s:%s', peer_addr[0], peer_addr[1]) if netaddr.valid_ipv4(peer_addr[0]): family = socket.AF_INET else: family = socket.AF_INET6 with Timeout(time_out, socket.error): sock = socket.socket(family) if bind_address: sock.bind(bind_address) if password: sockopt.set_tcp_md5sig(sock, peer_addr[0], password) sock.connect(peer_addr) # socket.error exception is raised in case of timeout and # the following code is executed only when the connection # is established. # Connection name for pro-active connection is made up of # local end address + remote end address local = self.get_localname(sock)[0] remote = self.get_remotename(sock)[0] conn_name = ('L: ' + local + ', R: ' + remote) self._asso_socket_map[conn_name] = sock # If connection is established, we call connection handler # in a new thread. self._spawn(conn_name, conn_handler, sock) return sock # # Sink #
def update_global_table(self, prefix, next_hop=None, is_withdraw=False): """Update a BGP route in the Global table for the given `prefix` with the given `next_hop`. If `is_withdraw` is False, which is the default, add a BGP route to the Global table. If `is_withdraw` is True, remove a BGP route from the Global table. """ src_ver_num = 1 peer = None # set mandatory path attributes origin = BGPPathAttributeOrigin(BGP_ATTR_ORIGIN_IGP) aspath = BGPPathAttributeAsPath([[]]) pathattrs = OrderedDict() pathattrs[BGP_ATTR_TYPE_ORIGIN] = origin pathattrs[BGP_ATTR_TYPE_AS_PATH] = aspath net = netaddr.IPNetwork(prefix) ip = str(net.ip) masklen = net.prefixlen if netaddr.valid_ipv4(ip): _nlri = IPAddrPrefix(masklen, ip) if next_hop is None: next_hop = '0.0.0.0' p = Ipv4Path else: _nlri = IP6AddrPrefix(masklen, ip) if next_hop is None: next_hop = '::' p = Ipv6Path new_path = p(peer, _nlri, src_ver_num, pattrs=pathattrs, nexthop=next_hop, is_withdraw=is_withdraw) # add to global table and propagates to neighbors self.learn_path(new_path)
def create_connection(address): """ Wrapper for socket.create_connection() function. If *address* (a 2-tuple ``(host, port)``) contains a valid IPv4/v6 address, passes *address* to socket.create_connection(). If *host* is valid path to Unix Domain socket, tries to connect to the server listening on the given socket. :param address: IP address or path to Unix Domain socket. :return: Socket instance. """ host, _port = address if (netaddr.valid_ipv4(host) or netaddr.valid_ipv6(host)): return socket.create_connection(address) elif os.path.exists(host): sock = None try: sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) sock.connect(host) except socket.error as e: if sock is not None: sock.close() raise e return sock else: raise ValueError('Invalid IP address or Unix Socket: %s' % host)
def general_info(self): """ Return IP in bits, ip_type (ie: private, multicast, loopback,etc..), time updated/returned and version for an IP Address >>> from ipinformation import IPInformation >>> from pprint import pprint >>> pprint( IPInformation(ip_address='8.8.8.8').general_info() ) {'general': {'bits': '00001000000010000000100000001000', 'type': 'public', 'updated': datetime.datetime(2016, 1, 16, 18, 7, 4, 288512), 'version': '4'}} >>> pprint( IPInformation(ip_address='127.0.0.1').general_info() ) {'general': {'bits': '01111111000000000000000000000001', 'type': 'loopback', 'updated': datetime.datetime(2016, 1, 16, 18, 10, 6, 729149), 'version': '4'}} """ data = { 'general': { 'bits': None, 'type': None, 'updated': None, 'version': None} } if not self.ISIP: # print '"%s" is not a valid IP Address.' %self.ip_address # logging_file.error( '"{0}" is not a valid IP Address.'.format(self.ip_address) ) return data if netaddr.valid_ipv4( self.ip_address ): #IPv4 Address ip_version = '4' data['general'].update({'version':ip_version}) ip_bits = netaddr.IPAddress( self.ip_address ).bits().replace( '.', '' ) #Set the IP bits for searching by subnet data['general'].update({'bits':ip_bits}) ip_addr = netaddr.IPAddress(self.ip_address) if ip_addr.is_private(): ip_type = 'private' elif ip_addr.is_multicast(): ip_type = 'multicast' elif ip_addr.is_loopback(): ip_type = 'loopback' elif ip_addr.is_netmask(): ip_type = 'netmask' elif ip_addr.is_reserved(): ip_type = 'reserved' elif ip_addr.is_link_local(): ip_type = 'link_local' elif ip_addr.is_unicast(): ip_type = 'public' else: #Unknown Type ip_type = 'unknown' logging_file.error( '"{0}" is an unknown IP Address.'.format(self.ip_address) ) elif netaddr.valid_ipv6( self.ip_address ): #IPv6 Address#TODO:Finish IPv6 ip_version = '6' print 'Is IPv6' return False data['general'].update( { 'type': ip_type } ) data['general'].update( { 'updated': datetime.utcnow() } ) return data
def main(): module = AnsibleModule( argument_spec=dict( neighbor=dict(required=False, default=None), direction=dict(required=False, choices=['adv', 'rec']), prefix=dict(required=False, default=None) ), supports_check_mode=False ) m_args = module.params neighbor = m_args['neighbor'] direction = m_args['direction'] prefix = m_args['prefix'] regex_ip = re.compile('[0-9a-fA-F.:]+') regex_iprange = re.compile('[0-9a-fA-F.:]+\/\d+') regex_ipv4 = re.compile('[12][0-9]{0,2}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\/?\d+?') if neighbor == None and direction == None and prefix == None: module.fail_json(msg="No support of parsing 'show ip bgp' full prefix table yet") return if neighbor and ((not netaddr.valid_ipv4(neighbor)) and (not netaddr.valid_ipv6(neighbor))): err_message = "Invalid neighbor address %s ??" % neighbor module.fail_json(msg=err_message) return if (neighbor and not direction) or (neighbor and 'adv' not in direction.lower()): err_message = 'No support of parsing this command " show ip(v6) bgp neighbor %s %s" yet' % (neighbor, direction) module.fail_json(msg=err_message) return try: bgproute = BgpRoutes(neighbor, direction, prefix) if prefix: if regex_ipv4.match(prefix): command = "docker exec -i bgp vtysh -c 'show ip bgp " + str(prefix) + "'" else: command = "docker exec -i bgp vtysh -c 'show ipv6 bgp " + str(prefix) + "'" rc, out, err = module.run_command(command) if rc != 0: err_message = "command %s failed rc=%d, out=%s, err=%s" %(command, rt, out, err) module.fail_json(msg=err_message) return bgproute.parse_bgp_route_prefix(out) elif neighbor: if netaddr.valid_ipv4(neighbor): command = "docker exec -i bgp vtysh -c 'show ip bgp neighbor " + str(neighbor) + " " + str(direction) + "'" else: command = "docker exec -i bgp vtysh -c 'show ipv6 bgp neighbor " + str(neighbor) + " " + str(direction) + "'" rc, out, err = module.run_command(command) if rc != 0: err_message = "command %s failed rc=%d, out=%s, err=%s" %(command, rt, out, err) module.fail_json(msg=err_message) return bgproute.parse_bgp_route_adv(out) results = bgproute.get_facts() module.exit_json(ansible_facts=results) except Exception as e: fail_msg = "cannot correctly parse BGP Routing facts!\n" fail_msg += str(e) module.fail_json(msg=fail_msg) return
def check_host(host, allow_localhost=config.ALLOW_CONNECT_LOCALHOST): """Check if a given host is a valid DNS name or IPv4 address""" try: ipaddr = socket.gethostbyname(host) except UnicodeEncodeError: raise MistError('Please provide a valid DNS name') except socket.gaierror: raise MistError("Not a valid IP address or resolvable DNS name: '%s'." % host) if host != ipaddr: msg = "Host '%s' resolves to '%s' which" % (host, ipaddr) else: msg = "Host '%s'" % host if not netaddr.valid_ipv4(ipaddr): raise MistError(msg + " is not a valid IPv4 address.") forbidden_subnets = { '0.0.0.0/8': "used for broadcast messages to the current network", '100.64.0.0/10': ("used for communications between a service provider " "and its subscribers when using a " "Carrier-grade NAT"), '169.254.0.0/16': ("used for link-local addresses between two hosts " "on a single link when no IP address is otherwise " "specified"), '192.0.0.0/24': ("used for the IANA IPv4 Special Purpose Address " "Registry"), '192.0.2.0/24': ("assigned as 'TEST-NET' for use solely in " "documentation and example source code"), '192.88.99.0/24': "used by 6to4 anycast relays", '198.18.0.0/15': ("used for testing of inter-network communications " "between two separate subnets"), '198.51.100.0/24': ("assigned as 'TEST-NET-2' for use solely in " "documentation and example source code"), '203.0.113.0/24': ("assigned as 'TEST-NET-3' for use solely in " "documentation and example source code"), '224.0.0.0/4': "reserved for multicast assignments", '240.0.0.0/4': "reserved for future use", '255.255.255.255/32': ("reserved for the 'limited broadcast' " "destination address"), } if not allow_localhost: forbidden_subnets['127.0.0.0/8'] = ("used for loopback addresses " "to the local host") cidr = netaddr.smallest_matching_cidr(ipaddr, forbidden_subnets.keys()) if cidr: raise MistError("%s is not allowed. It belongs to '%s' " "which is %s." % (msg, cidr, forbidden_subnets[str(cidr)]))
def __init__(self, route_type, flags, message, safi=None, prefix=None, nexthops=None, ifindexes=None, distance=None, metric=None, mtu=None, tag=None, from_zebra=False): super(_ZebraIPRoute, self).__init__() self.route_type = route_type self.flags = flags self.message = message # SAFI should be included if this message sent to Zebra. if from_zebra: self.safi = None else: self.safi = safi or packet_safi.UNICAST assert prefix is not None if isinstance(prefix, (IPv4Prefix, IPv6Prefix)): prefix = prefix.prefix self.prefix = prefix # Nexthops should be a list of str representations of IP address # if this message sent from Zebra, otherwise a list of _Nexthop # subclasses. nexthops = nexthops or [] if from_zebra: for nexthop in nexthops: assert (netaddr.valid_ipv4(nexthop) or netaddr.valid_ipv6(nexthop)) else: for nexthop in nexthops: assert isinstance(nexthop, _NextHop) self.nexthops = nexthops # Interface indexes should be included if this message sent from # Zebra. if from_zebra: ifindexes = ifindexes or [] for ifindex in ifindexes: assert isinstance(ifindex, six.integer_types) self.ifindexes = ifindexes else: self.ifindexes = None self.distance = distance self.metric = metric self.mtu = mtu self.tag = tag # is this message sent from Zebra message or not. self.from_zebra = from_zebra
def _send_ip_route_impl( self, prefix, nexthops=None, safi=packet_safi.UNICAST, flags=zebra.ZEBRA_FLAG_INTERNAL, distance=None, metric=None, mtu=None, tag=None, is_withdraw=False): if ip.valid_ipv4(prefix): if is_withdraw: msg_cls = zebra.ZebraIPv4RouteDelete else: msg_cls = zebra.ZebraIPv4RouteAdd elif ip.valid_ipv6(prefix): if is_withdraw: msg_cls = zebra.ZebraIPv6RouteDelete else: msg_cls = zebra.ZebraIPv6RouteAdd else: raise ValueError('Invalid prefix: %s' % prefix) nexthop_list = [] for nexthop in nexthops: if netaddr.valid_ipv4(nexthop): nexthop_list.append(zebra.NextHopIPv4(addr=nexthop)) elif netaddr.valid_ipv6(nexthop): nexthop_list.append(zebra.NextHopIPv6(addr=nexthop)) else: raise ValueError('Invalid nexthop: %s' % nexthop) msg = zebra.ZebraMessage( version=self.zserv_ver, body=msg_cls( route_type=self.route_type, flags=flags, message=0, safi=safi, prefix=prefix, nexthops=nexthop_list, distance=distance, metric=metric, mtu=mtu, tag=tag)) self.send_msg(msg) return msg