我们从 Python 开源项目中提取了以下 39 个代码示例,用于说明如何使用 socket.getprotobyname()。
def ping_once(self):
    """
    Send one ping over a raw ICMP socket and wait for the reply.

    Returns the value produced by ``self.receive_pong``: the delay
    (in seconds) or none on timeout.

    Raises ``socket.error`` if the raw socket cannot be created. When the
    failure is errno 1 (operation not permitted) the message is extended
    with a hint that root privileges are required; any other error is
    re-raised unchanged.
    """
    icmp = socket.getprotobyname("icmp")
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)
    except socket.error as e:
        if e.errno == 1:
            # Not superuser, so operation not permitted.
            # socket.error has no ``msg`` attribute; the OS message is in
            # ``strerror``.
            raise socket.error(
                e.strerror
                + "ICMP messages can only be sent from root user processes"
            )
        # Any other failure must propagate: continuing would only hit an
        # unbound ``sock`` a few lines below.
        raise

    my_ID = os.getpid() & 0xFFFF  # keep only the low 16 bits of the pid

    try:
        self.send_ping(sock, my_ID)
        delay = self.receive_pong(sock, my_ID, self.timeout)
    finally:
        # Release the socket even when sending or receiving raises.
        sock.close()
    return delay
async def getprotobyname(name):
    """Look up a protocol number by name. (Rarely used.)

    Like :func:`socket.getprotobyname`, but async: the stdlib call is
    handed to ``run_sync_in_worker_thread`` (with ``cancellable=True``)
    and its result is awaited and returned.

    This has to be a coroutine function (``async def``) because the body
    uses ``await``.
    """
    return await run_sync_in_worker_thread(
        _stdlib_socket.getprotobyname, name, cancellable=True
    )


# obsolete gethostbyname etc. intentionally omitted
# likewise for create_connection (use open_tcp_stream instead)

################################################################
# Socket "constructors"
################################################################
def _create_socket(self):
    """
    Resolve ``self.dest`` and open a raw ICMP socket on ``self.sock``.

    On success ``self.dest_ip`` and ``self.sock`` are set. On failure
    ``self.ret_code`` is set to one of the ``EXIT_STATUS`` error values
    and the method returns early. It always returns ``None``.
    """
    target = self.dest
    try:
        # Raises if ``target`` is not a valid IPv4 address literal.
        socket.inet_pton(socket.AF_INET, target)
    except socket.error:
        # Not an address literal: fall back to a name lookup.
        try:
            target = socket.gethostbyname(target)
        except socket.gaierror:
            self.ret_code = EXIT_STATUS.ERROR_HOST_NOT_FOUND
            return

    self.dest_ip = target

    try:
        self.sock = socket.socket(
            socket.AF_INET, socket.SOCK_RAW, socket.getprotobyname("icmp")
        )
    except socket.error as err:
        # errno 1 is reported as "root required"; anything else is a
        # generic "can't open socket" failure.
        self.ret_code = (
            EXIT_STATUS.ERROR_ROOT_REQUIRED
            if err.errno == 1
            else EXIT_STATUS.ERROR_CANT_OPEN_SOCKET
        )
        return
def ping_once(self):
    """
    Send one ping over a raw ICMP socket and wait for the reply.

    Returns the delay (in seconds) or none on timeout, as produced by
    ``self.receive_pong``.

    Raises ``socket.error`` when the raw socket cannot be created. For
    errno 1 the message gains a hint about root privileges; other errors
    are re-raised as they are.
    """
    icmp = socket.getprotobyname("icmp")
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)
    except socket.error as err:
        # ``except ... as`` works on Python 2.6+ and 3.x, and reading
        # errno/strerror avoids unpacking errors whose args are not a
        # 2-tuple.
        if err.errno == 1:
            # Not superuser, so operation not permitted
            msg = err.strerror
            msg += "ICMP messages can only be sent from root user processes"
            raise socket.error(msg)
        # Previously other failures fell through and the code crashed on
        # the unbound ``sock``; surface the real error instead.
        raise

    my_ID = os.getpid() & 0xFFFF  # low 16 bits of the process id

    try:
        self.send_ping(sock, my_ID)
        delay = self.receive_pong(sock, my_ID, self.timeout)
    finally:
        # Always release the raw socket, even if the exchange fails.
        sock.close()
    return delay
def do_one(dest_addr, timeout):
    """
    Send one ping to ``dest_addr`` and wait up to ``timeout`` for a reply.

    Returns either the delay (in seconds) or none on timeout, i.e. the
    value returned by ``receive_one_ping``.

    Raises ``socket.error`` if the raw ICMP socket cannot be opened. For
    errno 1 the message is extended with a note about root privileges;
    all other errors are re-raised unchanged.
    """
    icmp = socket.getprotobyname("icmp")
    try:
        my_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)
    except socket.error as err:
        # Read errno/strerror directly rather than unpacking ``err.args``,
        # which fails when the error does not carry exactly two arguments.
        if err.errno == 1:
            # Operation not permitted
            msg = err.strerror + (
                " - Note that ICMP messages can only be sent from processes"
                " running as root."
            )
            raise socket.error(msg)
        raise  # raise the original error

    my_ID = os.getpid() & 0xFFFF  # low 16 bits of the process id

    try:
        send_one_ping(my_socket, dest_addr, my_ID)
        delay = receive_one_ping(my_socket, my_ID, timeout)
    finally:
        # Close the socket even if sending or receiving raises.
        my_socket.close()
    return delay
def ping_once(self, dest_addr, timeout, sequence):
    """
    Send one ICMP request to ``dest_addr`` and wait for the response.

    ``sequence`` is forwarded to ``__send_icmp_request`` and ``timeout``
    to ``__receive_icmp_response``, whose return value is returned.

    Raises ``socket.error`` if the raw socket cannot be created. For
    errno 1 the message explains that root is required; other errors are
    re-raised unchanged.
    """
    icmp = socket.getprotobyname('icmp')
    try:
        icmp_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)
    except socket.error as err:
        # ``as`` syntax is valid on Python 2.6+ and 3.x; the attribute
        # access replaces tuple-unpacking of the exception.
        if err.errno == 1:
            msg = '%s : Just root can send ICMP Message' % err.strerror
            raise socket.error(msg)
        raise

    packet_id = os.getpid() & 0xFFFF  # low 16 bits of the process id

    try:
        self.__send_icmp_request(icmp_socket, dest_addr, packet_id, sequence)
        delay = self.__receive_icmp_response(icmp_socket, packet_id, timeout)
    finally:
        # Do not leak the raw socket when the exchange raises.
        icmp_socket.close()
    return delay
def ping_once(ip_addr, timeout):
    """
    Send one ping to ``ip_addr`` and wait up to ``timeout`` for the reply.

    Returns either the delay (in seconds) or none on timeout, i.e. the
    value returned by ``receive_ping``. Errors from creating the raw
    socket propagate to the caller unchanged.
    """
    # Translate the protocol name into the numeric constant expected as
    # the third argument of socket.socket(); this is usually only needed
    # for sockets opened in "raw" mode.
    icmp = socket.getprotobyname('icmp')

    # socket.socket(family, type, proto). The former
    # ``try: ... except socket.error: raise`` wrapper was a no-op, so the
    # call is made directly and failures still propagate.
    my_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)

    try:
        # Keep only the low 16 bits of the current process id
        # (0xFFFF == 65535).
        my_ID = os.getpid() & 0xFFFF

        send_ping(my_socket, ip_addr, my_ID)
        delay = receive_ping(my_socket, my_ID, timeout)
    finally:
        # Release the socket even if sending or receiving raises.
        my_socket.close()
    return delay
def make_sockets():
    """Makes and returns the raw IPv6 and IPv4 ICMP sockets.

    This needs to run as root before dropping privileges.

    Returns a two-element list ``[socketv6, socketv4]``. If either socket
    cannot be created the failure is logged and the exception re-raised;
    a v6 socket that was already opened is closed first so it does not
    leak.
    """
    try:
        socketv6 = socket.socket(
            socket.AF_INET6,
            socket.SOCK_RAW,
            socket.getprotobyname('ipv6-icmp'))
    except Exception:
        LOGGER.error("Could not create v6 socket")
        raise

    try:
        socketv4 = socket.socket(
            socket.AF_INET,
            socket.SOCK_RAW,
            socket.getprotobyname('icmp'))
    except Exception:
        # The message previously said "v6" here (copy-paste slip).
        LOGGER.error("Could not create v4 socket")
        socketv6.close()
        raise

    return [socketv6, socketv4]
def ping_once(self):
    """
    Perform a single ping exchange over a raw ICMP socket.

    Returns the delay (in seconds) or none on timeout; this is the value
    handed back by ``self.receive_pong``.

    Raises ``socket.error`` if the socket cannot be opened. An errno of 1
    gets an extra hint that root is required; every other error is
    re-raised untouched.
    """
    icmp = socket.getprotobyname("icmp")
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)
    except socket.error as exc:
        # Portable ``except ... as`` form (Python 2.6+ / 3.x).
        if exc.errno != 1:
            # Do not swallow the failure: carrying on would crash on the
            # unbound ``sock`` below and hide the real cause.
            raise
        # Not superuser, so operation not permitted
        raise socket.error(
            exc.strerror
            + "ICMP messages can only be sent from root user processes"
        )

    my_ID = os.getpid() & 0xFFFF  # low 16 bits of the process id

    try:
        self.send_ping(sock, my_ID)
        delay = self.receive_pong(sock, my_ID, self.timeout)
    finally:
        # Guarantee the raw socket is closed on every path.
        sock.close()
    return delay
def do_one(dest_addr, timeout):
    """
    Ping ``dest_addr`` once, waiting up to ``timeout`` for the reply.

    Returns either the delay (in seconds) or none on timeout, as returned
    by ``receive_one_ping``.

    Raises ``socket.error`` when the raw ICMP socket cannot be created.
    For errno 1 a note about needing root is appended to the message;
    other errors are re-raised as-is.
    """
    icmp = socket.getprotobyname("icmp")
    try:
        my_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)
    except socket.error as exc:
        # ``except X, (errno, msg)`` is Python 2-only and breaks on errors
        # that are not 2-tuples; use the exception attributes instead.
        if exc.errno == 1:
            # Operation not permitted
            msg = exc.strerror
            msg = msg + (
                " - Note that ICMP messages can only be sent from processes"
                " running as root."
            )
            raise socket.error(msg)
        raise  # raise the original error

    my_ID = os.getpid() & 0xFFFF  # low 16 bits of the process id

    try:
        send_one_ping(my_socket, dest_addr, my_ID)
        delay = receive_one_ping(my_socket, my_ID, timeout)
    finally:
        # Close the socket whether or not the exchange succeeded.
        my_socket.close()
    return delay
def __icmpSocket(self):
    """Create and return a new raw AF_INET socket for the "icmp" protocol."""
    icmp_proto = socket.getprotobyname("icmp")
    return socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp_proto)
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
    """Build an instance of ``cls`` from tokenized text.

    Reads an address string and a protocol (number or name) from ``tok``,
    then service tokens (numbers or names) until end of line/file. Each
    service number sets one bit in a bitmap (most significant bit first
    within each byte). The bitmap is passed through
    ``dns.rdata._truncate_bitmap`` before constructing the result.

    ``origin`` and ``relativize`` are accepted but not used here.

    Raises ``NotImplementedError`` when a service is given by name and the
    protocol is neither TCP nor UDP.
    """
    address = tok.get_string()
    proto_field = tok.get_string()
    if proto_field.isdigit():
        protocol = int(proto_field)
    else:
        protocol = socket.getprotobyname(proto_field)

    bitmap = bytearray()
    while True:
        token = tok.get().unescape()
        if token.is_eol_or_eof():
            break

        if token.value.isdigit():
            serv = int(token.value)
        elif protocol == _proto_udp:
            serv = socket.getservbyname(token.value, "udp")
        elif protocol == _proto_tcp:
            serv = socket.getservbyname(token.value, "tcp")
        else:
            # Service names can only be resolved for TCP or UDP.
            raise NotImplementedError("protocol must be TCP or UDP")

        byte_index, bit_offset = divmod(serv, 8)
        shortfall = byte_index + 1 - len(bitmap)
        if shortfall > 0:
            # Grow with zero bytes so ``byte_index`` is addressable.
            bitmap.extend(bytearray(shortfall))
        bitmap[byte_index] |= 0x80 >> bit_offset

    bitmap = dns.rdata._truncate_bitmap(bitmap)
    return cls(rdclass, rdtype, address, protocol, bitmap)
def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True):
    """Build an instance of ``cls`` from tokenized text.

    Consumes an address and a protocol (numeric or by name) from ``tok``,
    followed by service tokens up to end of line/file. The services are
    recorded in a bitmap kept as a list of one-character strings, one bit
    per service number with the high bit first. The list is handed to
    ``dns.rdata._truncate_bitmap`` before the instance is created.

    ``origin`` and ``relativize`` are accepted but not used here.

    Raises ``NotImplementedError`` if a service name is given for a
    protocol other than TCP or UDP.
    """
    address = tok.get_string()
    protocol = tok.get_string()
    protocol = int(protocol) if protocol.isdigit() else socket.getprotobyname(protocol)

    bitmap = []
    token = tok.get().unescape()
    while not token.is_eol_or_eof():
        if token.value.isdigit():
            serv = int(token.value)
        else:
            if protocol != _proto_udp and protocol != _proto_tcp:
                raise NotImplementedError("protocol must be TCP or UDP")
            protocol_text = "udp" if protocol == _proto_udp else "tcp"
            serv = socket.getservbyname(token.value, protocol_text)

        index = serv // 8
        missing = index + 1 - len(bitmap)
        if missing > 0:
            # Pad with NUL characters up to and including ``index``.
            bitmap.extend(['\x00'] * missing)
        mask = 0x80 >> (serv % 8)
        bitmap[index] = chr(ord(bitmap[index]) | mask)

        token = tok.get().unescape()

    bitmap = dns.rdata._truncate_bitmap(bitmap)
    return cls(rdclass, rdtype, address, protocol, bitmap)
def ping(host, timeout=2, ping_id=None, udp=False):
    """Ping remote host.

    :param str host: Host name/address.
    :param float timeout: timeout.
    :param int ping_id: 16 bit integer to identify packet. Defaults to the
        current process id; only the low 16 bits are used.
    :param bool udp: open the socket as ``SOCK_DGRAM`` instead of
        ``SOCK_RAW``.
    :return: ``r_recv_ts - start_ts`` for the first reply whose id and
        sequence code match the request, or ``None`` if no matching reply
        arrives before the deadline.
    """
    dest_addr = socket.gethostbyname(host)
    icmp = socket.getprotobyname('icmp')
    socket_type = socket.SOCK_DGRAM if udp else socket.SOCK_RAW
    raw_socket = socket.socket(socket.AF_INET, socket_type, icmp)

    try:
        ping_id = os.getpid() if ping_id is None else ping_id
        ping_id &= 0xffff
        seq_code = random.randint(1, 65535)

        latency = None
        start_ts = time.time()
        end_ts = start_ts + timeout

        send_ping(raw_socket, dest_addr, ping_id, seq_code)

        while time.time() < end_ts:
            # NOTE(review): the full ``timeout`` is passed on every
            # iteration rather than the time remaining until ``end_ts``;
            # confirm against receive_reply whether that is intended.
            r_ping_id, r_seq_code, r_recv_ts = receive_reply(raw_socket, timeout)
            if ping_id == r_ping_id and r_seq_code == seq_code:
                latency = r_recv_ts - start_ts
                break
    finally:
        # The socket used to be left open on every path; always close it.
        raw_socket.close()

    return latency
def ping(addr):
    """Send echo request (ping) to addr and print the reply.

    Resolves ``addr``, sends the request built by ``make_ping_request``
    over a raw ICMP socket with a 1 second timeout, and prints the request
    and response as hex, the elapsed time in milliseconds, and the decoded
    IPv4 and ICMP echo headers. Returns ``None`` in every case; on an
    unknown host, a socket error or an empty response it prints what it
    can and returns early.
    """
    def to_hex(blob):
        # bytes.hex() on Python 3, the 'hex' codec on Python 2.
        return blob.hex() if PY3 else blob.encode('hex')

    dest = ''
    try:
        dest = socket.gethostbyname(addr)
    except socket.gaierror:
        print('unknown host: {}'.format(addr))
        return

    print('PING {} ({})'.format(addr, dest))

    # prepare payload: serialize the request header into raw bytes
    request = make_ping_request()
    payload_buffer = BytesIO()
    payload_buffer.write(request)
    full_payload = payload_buffer.getvalue()

    print('request: {}'.format(to_hex(full_payload)))

    response = ''
    sock = None
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_RAW,
                             socket.getprotobyname('icmp'))
        sock.settimeout(1)
        start_time = time.time()
        sock.sendto(full_payload, (dest, 0))
        response, _ = sock.recvfrom(256)
        end_time = time.time()
    except Exception as ex:
        print(ex)
        return
    finally:
        if sock is not None:
            sock.close()

    if not response:
        return

    print('response: {}'.format(to_hex(response)))
    elapsed_ms = (end_time - start_time) * 1000
    print('time = {:.2f} ms'.format(elapsed_ms))

    # The response is read sequentially: IPv4 header first, then the ICMP
    # echo header.
    reader = BytesIO(response)
    ipv4_header = IPv4Header()
    reader.readinto(ipv4_header)
    icmp_echo_header = ICMPEchoHeader()
    reader.readinto(icmp_echo_header)

    rule = '====================================================='
    sections = (
        ('IPv4 header', ipv4_header),
        ('ICMP Echo header', icmp_echo_header),
    )
    for title, header in sections:
        print(rule)
        print(title)
        print(rule)
        print(header)
def parse_args(self):
    """Parse the command line and normalize several option values.

    After ``OptionParser.parse_args``:

    * ``options.proto_num`` is set from ``options.protocol`` via
      ``socket.getprotobyname``; the program exits if the lookup fails.
    * ``bind_address``, ``listen_address`` and ``connect_address`` are
      replaced by the result of ``addrparse``.
    * ``starttime`` and ``stoptime`` are converted to floats: either the
      value parsed directly as a number, or, for ``HH:MM:SS`` text,
      today's date combined with that time and formatted as
      ``YYYYmmddHHMMSS``. The program exits on any other text.

    Returns the ``(options, args)`` pair.
    """
    options, args = OptionParser.parse_args(self)

    try:
        options.proto_num = socket.getprotobyname(options.protocol)
    except socket.error:
        exit("Unsupported protocol " + options.protocol)

    for name in ('bind_address', 'listen_address', 'connect_address'):
        parsed = addrparse(getattr(options, name, None))
        setattr(options, name, parsed)

    for name in ('starttime', 'stoptime'):
        val = getattr(options, name, None)

        # First attempt: the value is already numeric.
        try:
            as_number = float(val)
        except ValueError:
            pass
        else:
            setattr(options, name, as_number)
            continue

        # Second attempt: a wall-clock time applied to today's date.
        try:
            clock = datetime.datetime.strptime(val, '%H:%M:%S')
            moment = datetime.datetime.now().replace(
                hour=clock.hour, minute=clock.minute, second=clock.second)
            as_number = float(moment.strftime("%Y%m%d%H%M%S"))
        except ValueError:
            exit("Invalid time %s" % val)
        else:
            setattr(options, name, as_number)

    return options, args
def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True):
    """Build an instance of ``cls`` from tokenized text.

    The text supplies an address, a protocol (number or name) and then
    service tokens (numbers or names) up to end of line/file. Services are
    collected into a bitmap held as a list of one-character strings: the
    service number selects the byte (``serv // 8``) and the bit within it
    (``0x80 >> (serv % 8)``). The list is passed through
    ``rdata._truncate_bitmap`` before the instance is constructed.

    ``origin`` and ``relativize`` are accepted but not used here.

    Raises ``NotImplementedError`` if a service is named while the
    protocol is neither TCP nor UDP.
    """
    address = tok.get_string()
    proto_text = tok.get_string()
    if proto_text.isdigit():
        protocol = int(proto_text)
    else:
        protocol = socket.getprotobyname(proto_text)

    bitmap = []
    while True:
        token = tok.get().unescape()
        if token.is_eol_or_eof():
            break

        if token.value.isdigit():
            serv = int(token.value)
        elif protocol == _proto_udp:
            serv = socket.getservbyname(token.value, "udp")
        elif protocol == _proto_tcp:
            serv = socket.getservbyname(token.value, "tcp")
        else:
            raise NotImplementedError("protocol must be TCP or UDP")

        slot, bit = divmod(serv, 8)
        # Extend with NUL characters until ``slot`` exists.
        while len(bitmap) < slot + 1:
            bitmap.append('\x00')
        bitmap[slot] = chr(ord(bitmap[slot]) | (0x80 >> bit))

    bitmap = rdata._truncate_bitmap(bitmap)
    return cls(rdclass, rdtype, address, protocol, bitmap)