我们从 Python 开源项目中提取了以下 11 个代码示例,用于说明如何使用 socket.AF_PACKET。
def send_arp_reply(request):
    """Answer a broadcast ARP request with a reply claiming the requested IP.

    Only packets that carry an ARP layer with ``op == 1`` (request), were sent
    to the Ethernet broadcast address and have an all-zero ARP target MAC are
    answered; every other packet is ignored.

    The reply is built with ``_arp.make_response`` using
    ``_current_mac_address`` as both the Ethernet source and the ARP sender
    MAC, and is written to a raw ``AF_PACKET`` socket bound to
    ``_current_network_interface``.

    Each reply increments the module-level ``_current_number_of_packets``.
    Once that counter reaches ``_number_of_packets`` the process terminates
    through ``exit(0)``.

    Args:
        request: captured packet exposing ``haslayer()`` and layer indexing
            (``request[ARP]``, ``request[Ether]``).
    """
    # Only the counter is rebound here; the other module-level names are
    # read-only in this function and need no ``global`` declaration.
    global _current_number_of_packets

    if not request.haslayer(ARP):
        return

    arp_layer = request[ARP]
    if arp_layer.op != 1:
        return
    if (request[Ether].dst != "ff:ff:ff:ff:ff:ff"
            or arp_layer.hwdst != "00:00:00:00:00:00"):
        return

    # The socket used to be opened for every ARP packet and was only closed
    # on the exit path, leaking one descriptor per packet.  It is now opened
    # only when a reply is sent and is always closed afterwards.
    SOCK = socket(AF_PACKET, SOCK_RAW)
    try:
        SOCK.bind((_current_network_interface, 0))

        print(Base.c_info + "ARP request from MAC: " + arp_layer.hwsrc +
              " IP: " + arp_layer.pdst)

        reply = _arp.make_response(ethernet_src_mac=_current_mac_address,
                                   ethernet_dst_mac=arp_layer.hwsrc,
                                   sender_mac=_current_mac_address,
                                   sender_ip=arp_layer.pdst,
                                   target_mac=arp_layer.hwsrc,
                                   target_ip=arp_layer.psrc)
        SOCK.send(reply)
    finally:
        SOCK.close()

    _current_number_of_packets += 1
    if _current_number_of_packets >= _number_of_packets:
        exit(0)
def send_dhcpv6_solicit():
    """Build one DHCPv6 Solicit packet and send it on a raw socket.

    The packet is produced by ``dhcpv6r.make_solicit_packet`` from the
    module-level ``macsrc`` and ``ipv6src_link`` values and a random
    transaction id, then written to an ``AF_PACKET`` raw socket bound to
    ``current_network_interface``.

    On any error while opening, binding or sending, an error line is printed
    and the process terminates through ``exit(1)``.
    """
    client_duid = dhcpv6r.get_client_duid(macsrc)

    # Requested option codes.  NOTE(review): presumably DNS servers (23) and
    # domain search list (24) as defined in RFC 3646 - confirm.
    request_options = [23, 24]

    # 16777215 == 0xFFFFFF, the largest value that fits in 24 bits.
    pkt = dhcpv6r.make_solicit_packet(ethernet_src_mac=macsrc,
                                      ipv6_src=ipv6src_link,
                                      transaction_id=randint(1, 16777215),
                                      client_identifier=client_duid,
                                      option_request_list=request_options)

    # ``except Exception`` instead of a bare ``except`` so that Ctrl-C and
    # SystemExit are no longer reported as a send failure.
    try:
        SOCK = socket(AF_PACKET, SOCK_RAW)
        try:
            SOCK.bind((current_network_interface, 0))
            SOCK.send(pkt)
        finally:
            # Close the socket even when bind/send fails.
            SOCK.close()
        print(Base.c_info + "Send Solicit request to: [ff02::1:2]:547")
    except Exception:
        print(Base.c_error + "Do not send Solicit request.")
        exit(1)
def __init__(self, interface_name, on_ip_incoming, on_ip_outgoing):
    """Remember the callbacks and open the raw socket for one interface.

    Args:
        interface_name: name of the network interface the socket is bound to.
        on_ip_incoming: stored on the instance as ``on_ip_incoming``; it is
            not invoked here.
        on_ip_outgoing: stored on the instance as ``on_ip_outgoing``; it is
            not invoked here.
    """
    self.interface_name = interface_name
    self.on_ip_incoming = on_ip_incoming
    self.on_ip_outgoing = on_ip_outgoing

    # The raw in (listen) socket is an L2 raw socket that listens for all
    # packets going through a specific interface.  The protocol passed to
    # the socket constructor is converted with htons().
    ip_protocol = socket.htons(ETH_P_IP)
    self.sock = raw_sock = socket.socket(
        socket.AF_PACKET, socket.SOCK_RAW, ip_protocol)

    # Ask for a 2**30-byte receive buffer.
    # NOTE(review): the kernel may clamp this request - confirm.
    raw_sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 2 ** 30)

    raw_sock.bind((self.interface_name, ETH_P_IP))
def server_socket_init(self):
    """Bind the local server socket and start listening when applicable.

    Nothing is done when the local end type contains ``"stdin"``.  Otherwise
    the bind address depends on the platform and on the socket family.  A
    failed bind is reported through ``output`` and ends the process with
    ``sys.exit(0)``.  After a successful bind the local/remote end types are
    reported, and ``listen(self.max_conns)`` is called when the local end
    type is in ``ConnectionBased``.
    """
    if "stdin" in self.local_end_type:
        return

    try:
        srv = self.server_socket
        platform_name = system().lower()

        if "windows" in platform_name or "cygwin" in platform_name:
            srv.bind((self.lhost, self.lport))
        elif srv.family == socket.AF_PACKET:
            # proto 0 is the normal L2 socket case and proto 0x300 the
            # promiscuous socket case; neither is bound, and the function
            # returns before anything is reported.
            if srv.proto in (0, 0x300):
                return
        elif srv.family == socket.AF_UNIX:
            srv.bind(self.lhost)
        elif "darwin" not in platform_name and srv.family == socket.AF_PACKET:
            # NOTE(review): looks unreachable - AF_PACKET sockets are already
            # taken by the earlier branch on non-Windows hosts.  Kept as-is;
            # confirm the intended behaviour.
            srv.bind((self.lhost, 0))
        else:
            srv.bind((self.lhost, self.lport))

        output("[*.*] Listening on %s:%s" % (self.lhost, str(self.lport)), CYAN)
    except Exception as e:
        output(str(e), YELLOW)
        output("[x.x] Unable to bind to %s:%s" % (self.lhost, str(self.lport)), RED)
        sys.exit(0)

    output("[$.$] local:%s|remote:%s" % (self.local_end_type, self.remote_end_type), GREEN)

    if self.local_end_type not in ConnectionBased:
        return

    try:
        self.server_socket.listen(self.max_conns)
    except Exception as e:
        output(e)
def ack_sender():
    """Send the same DHCP ACK packet over a raw socket in an endless loop.

    The packet is built once by ``make_dhcp_ack_packet`` from the
    module-level ``transaction_id_global`` and ``requested_ip_address`` and
    is then sent repeatedly on an ``AF_PACKET`` raw socket bound to
    ``current_network_interface``, pausing 10 ms between sends.

    The loop has no exit condition, so the function only ends when an
    exception propagates out of it.
    """
    SOCK = socket(AF_PACKET, SOCK_RAW)
    try:
        SOCK.bind((current_network_interface, 0))
        ack_packet = make_dhcp_ack_packet(transaction_id_global,
                                          requested_ip_address)
        while True:
            SOCK.send(ack_packet)
            sleep(0.01)
    finally:
        # Release the descriptor when bind/send fails or the loop is
        # interrupted; it was previously never closed.
        SOCK.close()
def discover_sender():
    """Send DHCP Discover packets with random client MACs in an endless loop.

    Every iteration builds a fresh packet with ``dhcp.make_discover_packet``,
    using the module-level ``your_mac_address`` as source MAC and a new
    ``eth.get_random_mac()`` value as client MAC.  The packet is sent on an
    ``AF_PACKET`` raw socket bound to ``current_network_interface``, followed
    by a 10 ms pause.

    The loop has no exit condition, so the function only ends when an
    exception propagates out of it.
    """
    SOCK = socket(AF_PACKET, SOCK_RAW)
    try:
        SOCK.bind((current_network_interface, 0))
        while True:
            discover_packet = dhcp.make_discover_packet(
                source_mac=your_mac_address,
                client_mac=eth.get_random_mac())
            SOCK.send(discover_packet)
            sleep(0.01)
    finally:
        # Release the descriptor when bind/send fails or the loop is
        # interrupted; it was previously never closed.
        SOCK.close()
def _arp_loop(self):
    """Open a raw ARP packet socket on the interface and run the ARP loop.

    The socket is bound to ``self.interface.device_name`` and handed to
    ``self._arp_loop_socket``.  It is closed when that call returns or
    raises.  ``greenlet.GreenletExit`` is swallowed so that killing the
    thread ends the loop quietly.
    """
    try:
        arp_proto = socket.htons(ether.ETH_TYPE_ARP)
        raw_socket = socket.socket(
            socket.AF_PACKET, socket.SOCK_RAW, arp_proto)
        with contextlib.closing(raw_socket) as packet_socket:
            bind_address = (
                self.interface.device_name,
                arp_proto,
                socket.PACKET_BROADCAST,
                arp.ARP_HW_TYPE_ETHERNET,
                mac_lib.BROADCAST,
            )
            packet_socket.bind(bind_address)
            self._arp_loop_socket(packet_socket)
    except greenlet.GreenletExit:
        # Raised when the thread is killed; nothing to clean up here.
        pass
def test_net_if_addrs(self):
    """Check the structure and contents of ``psutil.net_if_addrs()``.

    For every interface: the name is a string and there are no duplicate
    address entries.  For every address: field types are checked, the family
    is one of the expected ones, addresses of interfaces that are up can be
    bound, non-IPv6 addresses pass ``check_net_address``, and broadcast and
    ptp are mutually exclusive.  Finally ``psutil.AF_LINK`` is compared with
    the platform's own constant.
    """
    nics = psutil.net_if_addrs()
    assert nics, nics
    nic_stats = psutil.net_if_stats()

    # The interface names are deliberately not compared with the keys of
    # psutil.net_io_counters(pernic=True): that is not reliable on all
    # platforms (net_if_addrs() reports more interfaces).

    families = set((socket.AF_INET, AF_INET6, psutil.AF_LINK))
    optional_str = (str, type(None))

    for nic, addrs in nics.items():
        self.assertIsInstance(nic, (str, unicode))
        self.assertEqual(len(set(addrs)), len(addrs))

        for addr in addrs:
            family = addr.family
            self.assertIsInstance(family, int)
            self.assertIsInstance(addr.address, str)
            self.assertIsInstance(addr.netmask, optional_str)
            self.assertIsInstance(addr.broadcast, optional_str)
            self.assertIn(family, families)
            if sys.version_info >= (3, 4):
                self.assertIsInstance(family, enum.IntEnum)

            # Binding is only attempted for interfaces that are up.
            if nic_stats[nic].isup:
                if family == socket.AF_INET:
                    with contextlib.closing(socket.socket(family)) as sock:
                        sock.bind((addr.address, 0))
                elif family == socket.AF_INET6:
                    af, socktype, proto, canonname, sa = socket.getaddrinfo(
                        addr.address, 0, socket.AF_INET6,
                        socket.SOCK_STREAM, 0, socket.AI_PASSIVE)[0]
                    with contextlib.closing(
                            socket.socket(af, socktype, proto)) as sock:
                        sock.bind(sa)

            for ip in (addr.address, addr.netmask, addr.broadcast, addr.ptp):
                # TODO: AF_INET6 is skipped for now because it fails with:
                # AddressValueError: Only hex digits permitted in
                # u'c6f3%lxcbr0' in u'fe80::c8e0:fff:fe54:c6f3%lxcbr0'
                if ip is not None and family != AF_INET6:
                    check_net_address(ip, family)

            # Broadcast and ptp addresses are mutually exclusive.
            if addr.broadcast:
                self.assertIsNone(addr.ptp)
            elif addr.ptp:
                self.assertIsNone(addr.broadcast)

    # psutil.AF_LINK must match what the platform itself exposes.
    if BSD or OSX or SUNOS:
        if hasattr(socket, "AF_LINK"):
            self.assertEqual(psutil.AF_LINK, socket.AF_LINK)
    elif LINUX:
        self.assertEqual(psutil.AF_LINK, socket.AF_PACKET)
    elif WINDOWS:
        self.assertEqual(psutil.AF_LINK, -1)
def test_net_if_addrs(self):
    """Check the structure and contents of ``psutil.net_if_addrs()``.

    For every interface there must be no duplicate address entries.  For
    every address: field types are checked, the family is one of the
    expected ones, addresses of interfaces that are up can be bound,
    non-IPv6 addresses pass ``check_net_address``, and broadcast and ptp are
    mutually exclusive.  Finally ``psutil.AF_LINK`` is compared with the
    platform's own constant.
    """
    nics = psutil.net_if_addrs()
    assert nics, nics
    nic_stats = psutil.net_if_stats()

    # The interface names are deliberately not compared with the keys of
    # psutil.net_io_counters(pernic=True): that is not reliable on all
    # platforms (net_if_addrs() reports more interfaces).

    families = set((socket.AF_INET, AF_INET6, psutil.AF_LINK))
    str_or_none = (str, type(None))

    for nic, addrs in nics.items():
        self.assertEqual(len(set(addrs)), len(addrs))

        for addr in addrs:
            fam = addr.family
            self.assertIsInstance(fam, int)
            self.assertIsInstance(addr.address, str)
            self.assertIsInstance(addr.netmask, str_or_none)
            self.assertIsInstance(addr.broadcast, str_or_none)
            self.assertIn(fam, families)
            if sys.version_info >= (3, 4):
                self.assertIsInstance(fam, enum.IntEnum)

            # Binding is only attempted for interfaces that are up.
            if nic_stats[nic].isup:
                if fam == socket.AF_INET:
                    with contextlib.closing(socket.socket(fam)) as bound:
                        bound.bind((addr.address, 0))
                elif fam == socket.AF_INET6:
                    af, socktype, proto, canonname, sa = socket.getaddrinfo(
                        addr.address, 0, socket.AF_INET6,
                        socket.SOCK_STREAM, 0, socket.AI_PASSIVE)[0]
                    with contextlib.closing(
                            socket.socket(af, socktype, proto)) as bound:
                        bound.bind(sa)

            for ip in (addr.address, addr.netmask, addr.broadcast, addr.ptp):
                # TODO: AF_INET6 is skipped for now because it fails with:
                # AddressValueError: Only hex digits permitted in
                # u'c6f3%lxcbr0' in u'fe80::c8e0:fff:fe54:c6f3%lxcbr0'
                if ip is not None and fam != AF_INET6:
                    check_net_address(ip, fam)

            # Broadcast and ptp addresses are mutually exclusive.
            if addr.broadcast:
                self.assertIsNone(addr.ptp)
            elif addr.ptp:
                self.assertIsNone(addr.broadcast)

    # psutil.AF_LINK must match what the platform itself exposes.
    if BSD or OSX or SUNOS:
        if hasattr(socket, "AF_LINK"):
            self.assertEqual(psutil.AF_LINK, socket.AF_LINK)
    elif LINUX:
        self.assertEqual(psutil.AF_LINK, socket.AF_PACKET)
    elif WINDOWS:
        self.assertEqual(psutil.AF_LINK, -1)