Python socket 模块,getprotobyname() 实例源码

我们从 Python 开源项目中提取了以下 39 个代码示例,用于说明如何使用 socket.getprotobyname()。

项目:Python-Network-Programming-Cookbook-Second-Edition    作者:PacktPublishing    | 项目源码 | 文件源码
def ping_once(self):
        """
        Send one ping over a fresh raw ICMP socket and wait for the answer.

        Returns the delay (in seconds) or none on timeout.

        Raises socket.error when the raw socket cannot be created. For
        errno 1 the message is extended with a hint that root privileges
        are required; every other error is re-raised unchanged.
        """
        icmp = socket.getprotobyname("icmp")
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)
        except socket.error as e:
            if e.errno == 1:
                # Not superuser, so operation not permitted
                msg = (e.strerror or str(e))
                msg += "ICMP messages can only be sent from root user processes"
                raise socket.error(msg)
            # Without a socket there is nothing to ping with, so propagate
            # the failure instead of continuing with an undefined socket.
            raise

        my_ID = os.getpid() & 0xFFFF

        try:
            self.send_ping(sock, my_ID)
            delay = self.receive_pong(sock, my_ID, self.timeout)
        finally:
            # Release the socket even when sending or receiving fails.
            sock.close()
        return delay
项目:trio    作者:python-trio    | 项目源码 | 文件源码
async def getprotobyname(name):
    """Look up a protocol number by name. (Rarely used.)

    Like :func:`socket.getprotobyname`, but async.

    The lookup is handed to ``run_sync_in_worker_thread`` with
    ``cancellable=True``; the awaited result is returned unchanged.

    """
    # ``await`` is only legal inside a coroutine function, hence ``async def``.
    return await run_sync_in_worker_thread(
        _stdlib_socket.getprotobyname, name, cancellable=True
    )


# obsolete gethostbyname etc. intentionally omitted
# likewise for create_connection (use open_tcp_stream instead)

################################################################
# Socket "constructors"
################################################################
项目:netmet    作者:godaddy    | 项目源码 | 文件源码
def _create_socket(self):
        """Resolve ``self.dest`` and open a raw ICMP socket on ``self.sock``.

        ``self.dest`` is used as-is when ``inet_pton`` accepts it as an IPv4
        address; otherwise it is resolved with ``gethostbyname``. The result
        is stored in ``self.dest_ip``. On failure ``self.ret_code`` is set to
        the matching ``EXIT_STATUS`` member and the method returns early.
        """
        try:
            socket.inet_pton(socket.AF_INET, self.dest)
        except socket.error:
            # Not a literal IPv4 address: fall back to a name lookup.
            try:
                resolved = socket.gethostbyname(self.dest)
            except socket.gaierror:
                self.ret_code = EXIT_STATUS.ERROR_HOST_NOT_FOUND
                return
        else:
            resolved = self.dest
        self.dest_ip = resolved

        try:
            icmp_proto = socket.getprotobyname("icmp")
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_RAW,
                                      icmp_proto)
        except socket.error as err:
            # errno 1 means the process lacks permission for raw sockets.
            self.ret_code = (EXIT_STATUS.ERROR_ROOT_REQUIRED
                             if err.errno == 1
                             else EXIT_STATUS.ERROR_CANT_OPEN_SOCKET)
项目:isf    作者:w3h    | 项目源码 | 文件源码
def ping_once(self):
        """
        Send one ping over a fresh raw ICMP socket and wait for the answer.

        Returns the delay (in seconds) or none on timeout.

        Raises socket.error when the raw socket cannot be created. For
        errno 1 the message is extended with a hint that root privileges
        are required; every other error is re-raised unchanged.
        """
        icmp = socket.getprotobyname("icmp")
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)
        except socket.error as e:
            # "except ... as" parses on Python 2.6+ and Python 3 alike.
            if e.errno == 1:
                # Not superuser, so operation not permitted
                msg = (e.strerror or str(e))
                msg += "ICMP messages can only be sent from root user processes"
                raise socket.error(msg)
            # Without a socket there is nothing to ping with, so propagate
            # the failure instead of continuing with an undefined socket.
            raise

        my_ID = os.getpid() & 0xFFFF

        try:
            self.send_ping(sock, my_ID)
            delay = self.receive_pong(sock, my_ID, self.timeout)
        finally:
            # Release the socket even when sending or receiving fails.
            sock.close()
        return delay
项目:g3ar    作者:VillanCh    | 项目源码 | 文件源码
def do_one(dest_addr, timeout):
    """
    Send one ping to dest_addr over a fresh raw ICMP socket.

    Returns either the delay (in seconds) or none on timeout.

    Raises socket.error when the raw socket cannot be created; for errno 1
    the message is extended with a note about root privileges.
    """
    icmp = socket.getprotobyname("icmp")
    try:
        my_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)
    except socket.error as err:
        # Read errno/strerror attributes rather than unpacking err.args,
        # which fails when the exception carries anything but two values.
        if err.errno == 1:
            # Operation not permitted
            raise socket.error(
                (err.strerror or str(err))
                + " - Note that ICMP messages can only be sent from processes"
                " running as root."
            )
        raise  # raise the original error

    my_ID = os.getpid() & 0xFFFF

    try:
        send_one_ping(my_socket, dest_addr, my_ID)
        delay = receive_one_ping(my_socket, my_ID, timeout)
    finally:
        # Release the socket even when sending or receiving fails.
        my_socket.close()
    return delay
项目:g3ar    作者:VillanCh    | 项目源码 | 文件源码
def do_one(dest_addr, timeout):
    """
    Send one ping to dest_addr over a fresh raw ICMP socket.

    Returns either the delay (in seconds) or none on timeout.

    Raises socket.error when the raw socket cannot be created; for errno 1
    the message is extended with a note about root privileges.
    """
    icmp = socket.getprotobyname("icmp")
    try:
        my_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)
    except socket.error as err:
        # Read errno/strerror attributes rather than unpacking err.args,
        # which fails when the exception carries anything but two values.
        if err.errno == 1:
            # Operation not permitted
            raise socket.error(
                (err.strerror or str(err))
                + " - Note that ICMP messages can only be sent from processes"
                " running as root."
            )
        raise  # raise the original error

    my_ID = os.getpid() & 0xFFFF

    try:
        send_one_ping(my_socket, dest_addr, my_ID)
        delay = receive_one_ping(my_socket, my_ID, timeout)
    finally:
        # Release the socket even when sending or receiving fails.
        my_socket.close()
    return delay
项目:py-sys    作者:vicky-tan    | 项目源码 | 文件源码
def ping_once(self, dest_addr, timeout, sequence):
        """
        Send one ICMP request to dest_addr and wait for the response.

        Opens a raw ICMP socket, sends the request with the given sequence
        value and returns the result of ``__receive_icmp_response``.

        Raises socket.error when the raw socket cannot be created; for
        errno 1 the message notes that only root can send ICMP messages.
        """
        icmp = socket.getprotobyname('icmp')

        try:
            icmp_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)
        except socket.error as err:
            # "except ... as" parses on Python 2.6+ and Python 3 alike.
            if err.errno == 1:
                msg = '%s : Just root can send ICMP Message' % (
                    err.strerror or str(err))
                raise socket.error(msg)
            raise

        # Only the low 16 bits of the process id are used as packet id.
        packet_id = os.getpid() & 0xFFFF

        try:
            self.__send_icmp_request(icmp_socket, dest_addr, packet_id, sequence)
            delay = self.__receive_icmp_response(icmp_socket, packet_id, timeout)
        finally:
            # Release the socket even when sending or receiving fails.
            icmp_socket.close()
        return delay
项目:Network    作者:Momingcoder    | 项目源码 | 文件源码
def ping_once(ip_addr, timeout):
    """
    Send one ping to ip_addr over a fresh raw ICMP socket.

    return either delay (in second) or none on timeout.

    Any socket.error raised while creating the raw socket propagates to
    the caller unchanged.
    """
    # Translate the protocol name into the number that socket() takes as
    # its third argument; this is usually only needed for raw sockets.
    icmp = socket.getprotobyname('icmp')
    # socket.socket(family, type, proto): raw IPv4 socket for ICMP.
    my_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)

    # Keep only the low 16 bits of the current process id (0..65535).
    my_ID = os.getpid() & 0xFFFF

    try:
        send_ping(my_socket, ip_addr, my_ID)
        delay = receive_ping(my_socket, my_ID, timeout)
    finally:
        # Release the socket even when sending or receiving fails.
        my_socket.close()
    return delay
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def make_sockets():
    """Makes and returns the raw IPv6 and IPv4 ICMP sockets.

    This needs to run as root before dropping privileges.

    Returns a two-element list ``[socketv6, socketv4]``. Any exception
    raised while creating either socket is logged and re-raised.

    """
    try:
        socketv6 = socket.socket(socket.AF_INET6, socket.SOCK_RAW,
                                 socket.getprotobyname('ipv6-icmp'))
    except Exception:
        LOGGER.error("Could not create v6 socket")
        raise

    try:
        socketv4 = socket.socket(socket.AF_INET, socket.SOCK_RAW,
                                 socket.getprotobyname('icmp'))
    except Exception:
        LOGGER.error("Could not create v4 socket")
        # The v6 socket was already opened; do not leak it on failure.
        socketv6.close()
        raise

    return [socketv6, socketv4]
项目:minihydra    作者:VillanCh    | 项目源码 | 文件源码
def do_one(dest_addr, timeout):
    """
    Send one ping to dest_addr over a fresh raw ICMP socket.

    Returns either the delay (in seconds) or none on timeout.

    Raises socket.error when the raw socket cannot be created; for errno 1
    the message is extended with a note about root privileges.
    """
    icmp = socket.getprotobyname("icmp")
    try:
        my_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)
    except socket.error as err:
        # Read errno/strerror attributes rather than unpacking err.args,
        # which fails when the exception carries anything but two values.
        if err.errno == 1:
            # Operation not permitted
            raise socket.error(
                (err.strerror or str(err))
                + " - Note that ICMP messages can only be sent from processes"
                " running as root."
            )
        raise  # raise the original error

    my_ID = os.getpid() & 0xFFFF

    try:
        send_one_ping(my_socket, dest_addr, my_ID)
        delay = receive_one_ping(my_socket, my_ID, timeout)
    finally:
        # Release the socket even when sending or receiving fails.
        my_socket.close()
    return delay
项目:011_python_network_programming_cookbook_demo    作者:jerry-0824    | 项目源码 | 文件源码
def ping_once(self):
        """
        Send one ping over a fresh raw ICMP socket and wait for the answer.

        Returns the delay (in seconds) or none on timeout.

        Raises socket.error when the raw socket cannot be created. For
        errno 1 the message is extended with a hint that root privileges
        are required; every other error is re-raised unchanged.
        """

        icmp = socket.getprotobyname("icmp")
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)
        except socket.error as e:
            # "except ... as" parses on Python 2.6+ and Python 3 alike.
            if e.errno == 1:
                # Not superuser, so operation not permitted
                msg = (e.strerror or str(e))
                msg += "ICMP messages can only be sent from root user processes"
                raise socket.error(msg)
            # Without a socket there is nothing to ping with, so propagate
            # the failure instead of continuing with an undefined socket.
            raise

        my_ID = os.getpid() & 0xFFFF

        try:
            self.send_ping(sock, my_ID)
            delay = self.receive_pong(sock, my_ID, self.timeout)
        finally:
            # Release the socket even when sending or receiving fails.
            sock.close()
        return delay
项目:Automation-Framework-for-devices    作者:tok-gogogo    | 项目源码 | 文件源码
def do_one(dest_addr, timeout):
    """
    Send one ping to dest_addr over a fresh raw ICMP socket.

    Returns either the delay (in seconds) or none on timeout.

    Raises socket.error when the raw socket cannot be created; for errno 1
    the message is extended with a note about root privileges.
    """
    icmp = socket.getprotobyname("icmp")
    try:
        my_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)
    except socket.error as err:
        # "except ... as" parses on Python 2.6+ and Python 3 alike.
        if err.errno == 1:
            # Operation not permitted
            raise socket.error(
                (err.strerror or str(err))
                + " - Note that ICMP messages can only be sent from processes"
                " running as root."
            )
        raise  # raise the original error

    my_ID = os.getpid() & 0xFFFF

    try:
        send_one_ping(my_socket, dest_addr, my_ID)
        delay = receive_one_ping(my_socket, my_ID, timeout)
    finally:
        # Release the socket even when sending or receiving fails.
        my_socket.close()
    return delay
项目:F-Scrack    作者:y1ng1996    | 项目源码 | 文件源码
def __icmpSocket(self):
        """Create and return a raw IPv4 socket for the ICMP protocol."""
        icmp_proto = socket.getprotobyname("icmp")
        return socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp_proto)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
        """Build an instance from tokens: address, protocol, then services.

        The protocol and each service may be numeric or a name; names are
        resolved through the socket module. Every service number ``serv``
        sets bit ``0x80 >> (serv % 8)`` in bitmap byte ``serv // 8``.
        ``origin`` and ``relativize`` are not used here.

        Raises NotImplementedError for a service name when the protocol is
        neither UDP nor TCP.
        """
        address = tok.get_string()
        proto_field = tok.get_string()
        protocol = (int(proto_field) if proto_field.isdigit()
                    else socket.getprotobyname(proto_field))
        bitmap = bytearray()
        while True:
            token = tok.get().unescape()
            if token.is_eol_or_eof():
                break
            if token.value.isdigit():
                serv = int(token.value)
            else:
                # Service names can only be looked up for UDP or TCP.
                if protocol == _proto_udp:
                    protocol_text = "udp"
                elif protocol == _proto_tcp:
                    protocol_text = "tcp"
                else:
                    raise NotImplementedError("protocol must be TCP or UDP")
                serv = socket.getservbyname(token.value, protocol_text)
            byte_index, bit_offset = divmod(serv, 8)
            # Pad with zero bytes until byte_index is addressable.
            shortfall = byte_index + 1 - len(bitmap)
            if shortfall > 0:
                bitmap.extend(bytearray(shortfall))
            bitmap[byte_index] |= 0x80 >> bit_offset
        bitmap = dns.rdata._truncate_bitmap(bitmap)
        return cls(rdclass, rdtype, address, protocol, bitmap)
项目:llk    作者:Tycx2ry    | 项目源码 | 文件源码
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
        """Build an instance from tokens: address, protocol, then services.

        The protocol and each service may be numeric or a name; names are
        resolved through the socket module. Every service number ``serv``
        sets bit ``0x80 >> (serv % 8)`` in bitmap entry ``serv // 8``; the
        bitmap is a list of one-character strings. ``origin`` and
        ``relativize`` are not used here.

        Raises NotImplementedError for a service name when the protocol is
        neither UDP nor TCP.
        """
        address = tok.get_string()
        proto_field = tok.get_string()
        protocol = (int(proto_field) if proto_field.isdigit()
                    else socket.getprotobyname(proto_field))
        bitmap = []
        while True:
            token = tok.get().unescape()
            if token.is_eol_or_eof():
                break
            if token.value.isdigit():
                serv = int(token.value)
            else:
                # Service names can only be looked up for UDP or TCP.
                if protocol == _proto_udp:
                    protocol_text = "udp"
                elif protocol == _proto_tcp:
                    protocol_text = "tcp"
                else:
                    raise NotImplementedError("protocol must be TCP or UDP")
                serv = socket.getservbyname(token.value, protocol_text)
            byte_index, bit_offset = divmod(serv, 8)
            # Pad with NUL characters until byte_index is addressable.
            shortfall = byte_index + 1 - len(bitmap)
            if shortfall > 0:
                bitmap.extend(['\x00'] * shortfall)
            bitmap[byte_index] = chr(ord(bitmap[byte_index]) | (0x80 >> bit_offset))
        bitmap = dns.rdata._truncate_bitmap(bitmap)
        return cls(rdclass, rdtype, address, protocol, bitmap)
项目:do-latency    作者:dizballanze    | 项目源码 | 文件源码
def ping(host, timeout=2, ping_id=None, udp=False):
    """Ping remote host.

    :param str host: Host name/address.
    :param float timeout: timeout.
    :param int ping_id: 16 bit integer to identify packet.
        Defaults to the current process id; only the low 16 bits are used.
    :param bool udp: open a ``SOCK_DGRAM`` ICMP socket instead of
        ``SOCK_RAW``.
    :return: ``r_recv_ts - start_ts`` for the first reply whose id and
        sequence code match, or ``None`` if none arrived before the deadline.
    """
    dest_addr = socket.gethostbyname(host)
    icmp = socket.getprotobyname('icmp')
    socket_type = socket.SOCK_DGRAM if udp else socket.SOCK_RAW
    raw_socket = socket.socket(socket.AF_INET, socket_type, icmp)

    try:
        ping_id = os.getpid() if ping_id is None else ping_id
        ping_id &= 0xffff

        seq_code = random.randint(1, 65535)

        latency = None

        start_ts = time.time()
        end_ts = start_ts + timeout

        send_ping(raw_socket, dest_addr, ping_id, seq_code)

        # Keep reading until our own reply shows up or the deadline passes;
        # replies with a different id or sequence code are skipped.
        while time.time() < end_ts:
            r_ping_id, r_seq_code, r_recv_ts = receive_reply(raw_socket, timeout)
            if ping_id == r_ping_id and r_seq_code == seq_code:
                latency = r_recv_ts - start_ts
                break
    finally:
        # The socket is local to this call; always release it.
        raw_socket.close()

    return latency
项目:spiderfoot    作者:wi-fi-analyzer    | 项目源码 | 文件源码
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
        """Build an instance from tokens: address, protocol, then services.

        The protocol and each service may be numeric or a name; names are
        resolved through the socket module. Every service number ``serv``
        sets bit ``0x80 >> (serv % 8)`` in bitmap entry ``serv // 8``; the
        bitmap is a list of one-character strings. ``origin`` and
        ``relativize`` are not used here.

        Raises NotImplementedError for a service name when the protocol is
        neither UDP nor TCP.
        """
        address = tok.get_string()
        proto_field = tok.get_string()
        protocol = (int(proto_field) if proto_field.isdigit()
                    else socket.getprotobyname(proto_field))
        bitmap = []
        while True:
            token = tok.get().unescape()
            if token.is_eol_or_eof():
                break
            if token.value.isdigit():
                serv = int(token.value)
            else:
                # Service names can only be looked up for UDP or TCP.
                if protocol == _proto_udp:
                    protocol_text = "udp"
                elif protocol == _proto_tcp:
                    protocol_text = "tcp"
                else:
                    raise NotImplementedError("protocol must be TCP or UDP")
                serv = socket.getservbyname(token.value, protocol_text)
            byte_index, bit_offset = divmod(serv, 8)
            # Pad with NUL characters until byte_index is addressable.
            shortfall = byte_index + 1 - len(bitmap)
            if shortfall > 0:
                bitmap.extend(['\x00'] * shortfall)
            bitmap[byte_index] = chr(ord(bitmap[byte_index]) | (0x80 >> bit_offset))
        bitmap = dns.rdata._truncate_bitmap(bitmap)
        return cls(rdclass, rdtype, address, protocol, bitmap)
项目:SameKeyProxy    作者:xzhou    | 项目源码 | 文件源码
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
        """Build an instance from tokens: address, protocol, then services.

        The protocol and each service may be numeric or a name; names are
        resolved through the socket module. Every service number ``serv``
        sets bit ``0x80 >> (serv % 8)`` in bitmap entry ``serv // 8``; the
        bitmap is a list of one-character strings. ``origin`` and
        ``relativize`` are not used here.

        Raises NotImplementedError for a service name when the protocol is
        neither UDP nor TCP.
        """
        address = tok.get_string()
        proto_field = tok.get_string()
        protocol = (int(proto_field) if proto_field.isdigit()
                    else socket.getprotobyname(proto_field))
        bitmap = []
        while True:
            token = tok.get().unescape()
            if token.is_eol_or_eof():
                break
            if token.value.isdigit():
                serv = int(token.value)
            else:
                # Service names can only be looked up for UDP or TCP.
                if protocol == _proto_udp:
                    protocol_text = "udp"
                elif protocol == _proto_tcp:
                    protocol_text = "tcp"
                else:
                    raise NotImplementedError("protocol must be TCP or UDP")
                serv = socket.getservbyname(token.value, protocol_text)
            byte_index, bit_offset = divmod(serv, 8)
            # Pad with NUL characters until byte_index is addressable.
            shortfall = byte_index + 1 - len(bitmap)
            if shortfall > 0:
                bitmap.extend(['\x00'] * shortfall)
            bitmap[byte_index] = chr(ord(bitmap[byte_index]) | (0x80 >> bit_offset))
        bitmap = dns.rdata._truncate_bitmap(bitmap)
        return cls(rdclass, rdtype, address, protocol, bitmap)
项目:networks    作者:delimitry    | 项目源码 | 文件源码
def ping(addr):
    """Send echo request (ping) to addr and get reply.

    Prints the request, the raw response, the elapsed time and the decoded
    IPv4 and ICMP echo headers. Returns None in every case; an unknown
    host, a socket error or an empty response ends the call early.
    """
    def _as_hex(data):
        # bytes.hex() when PY3 is set, the 'hex' codec otherwise
        return data.hex() if PY3 else data.encode('hex')

    try:
        dest = socket.gethostbyname(addr)
    except socket.gaierror:
        print('unknown host: {}'.format(addr))
        return

    print('PING {} ({})'.format(addr, dest))

    # prepare payload
    payload_buffer = BytesIO()
    payload_buffer.write(make_ping_request())
    full_payload = payload_buffer.getvalue()
    print('request: {}'.format(_as_hex(full_payload)))

    sock = None
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_RAW,
                             socket.getprotobyname('icmp'))
        sock.settimeout(1)
        sent_at = time.time()
        sock.sendto(full_payload, (dest, 0))
        response, _ = sock.recvfrom(256)
        received_at = time.time()
    except Exception as ex:
        print(ex)
        return
    finally:
        if sock:
            sock.close()

    if not response:
        return
    print('response: {}'.format(_as_hex(response)))
    print('time = {:.2f} ms'.format((received_at - sent_at) * 1000))

    reply_stream = BytesIO(response)

    # read IPv4 header
    ipv4_header = IPv4Header()
    reply_stream.readinto(ipv4_header)

    # read ICMP header
    icmp_echo_header = ICMPEchoHeader()
    reply_stream.readinto(icmp_echo_header)

    rule = '====================================================='
    sections = (
        ('IPv4 header', ipv4_header),
        ('ICMP Echo header', icmp_echo_header),
    )
    for title, header in sections:
        print(rule)
        print(title)
        print(rule)
        print(header)
项目:hakkuframework    作者:4shadoww    | 项目源码 | 文件源码
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
        """Build an instance from tokens: address, protocol, then services.

        The protocol and each service may be numeric or a name; names are
        resolved through the socket module. Every service number ``serv``
        sets bit ``0x80 >> (serv % 8)`` in bitmap byte ``serv // 8``.
        ``origin`` and ``relativize`` are not used here.

        Raises NotImplementedError for a service name when the protocol is
        neither UDP nor TCP.
        """
        address = tok.get_string()
        proto_field = tok.get_string()
        protocol = (int(proto_field) if proto_field.isdigit()
                    else socket.getprotobyname(proto_field))
        bitmap = bytearray()
        while True:
            token = tok.get().unescape()
            if token.is_eol_or_eof():
                break
            if token.value.isdigit():
                serv = int(token.value)
            else:
                # Service names can only be looked up for UDP or TCP.
                if protocol == _proto_udp:
                    protocol_text = "udp"
                elif protocol == _proto_tcp:
                    protocol_text = "tcp"
                else:
                    raise NotImplementedError("protocol must be TCP or UDP")
                serv = socket.getservbyname(token.value, protocol_text)
            byte_index, bit_offset = divmod(serv, 8)
            # Pad with zero bytes until byte_index is addressable.
            shortfall = byte_index + 1 - len(bitmap)
            if shortfall > 0:
                bitmap.extend(bytearray(shortfall))
            bitmap[byte_index] |= 0x80 >> bit_offset
        bitmap = dns.rdata._truncate_bitmap(bitmap)
        return cls(rdclass, rdtype, address, protocol, bitmap)
项目:heartbreaker    作者:lokori    | 项目源码 | 文件源码
def parse_args(self):
    """Parse the command line and normalise protocol, address and time options.

    Sets ``options.proto_num`` from ``options.protocol``, passes the three
    address options through ``addrparse`` and converts ``starttime`` and
    ``stoptime`` to floats. A time that is not a plain number must match
    ``%H:%M:%S`` and is combined with today's date. Calls ``exit`` with a
    message for an unsupported protocol or an invalid time.

    Returns the ``(options, args)`` pair.
    """
    options, args = OptionParser.parse_args(self)
    try:
      options.proto_num = socket.getprotobyname(options.protocol)
    except socket.error:
      exit("Unsupported protocol " + options.protocol)

    for name in ('bind_address','listen_address','connect_address'):
      parsed_addr = addrparse(getattr(options, name, None))
      setattr(options, name, parsed_addr)

    for name in ('starttime','stoptime'):
      raw = getattr(options, name, None)
      # First choice: the value is already a plain number.
      try:
        as_number = float(raw)
      except ValueError:
        pass
      else:
        setattr(options, name, as_number)
        continue
      # Otherwise treat it as a wall-clock time on today's date.
      today = datetime.datetime.now()
      try:
        clock = datetime.datetime.strptime(raw, '%H:%M:%S')
      except ValueError:
        exit("Invalid time %s" % raw)
      stamp = today.replace(hour=clock.hour, minute=clock.minute,
                            second=clock.second)
      setattr(options, name, float(stamp.strftime("%Y%m%d%H%M%S")))

    return options, args
项目:xunfeng    作者:ysrc    | 项目源码 | 文件源码
def __icmpSocket(self):
        """Create and return a raw IPv4 socket for the ICMP protocol."""
        icmp_proto = socket.getprotobyname("icmp")
        return socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp_proto)
项目:ysrc    作者:myDreamShadow    | 项目源码 | 文件源码
def __icmpSocket(self):
        """Create and return a raw IPv4 socket for the ICMP protocol."""
        icmp_proto = socket.getprotobyname("icmp")
        return socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp_proto)
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
        """Build an instance from tokens: address, protocol, then services.

        The protocol and each service may be numeric or a name; names are
        resolved through the socket module. Every service number ``serv``
        sets bit ``0x80 >> (serv % 8)`` in bitmap byte ``serv // 8``.
        ``origin`` and ``relativize`` are not used here.

        Raises NotImplementedError for a service name when the protocol is
        neither UDP nor TCP.
        """
        address = tok.get_string()
        proto_field = tok.get_string()
        protocol = (int(proto_field) if proto_field.isdigit()
                    else socket.getprotobyname(proto_field))
        bitmap = bytearray()
        while True:
            token = tok.get().unescape()
            if token.is_eol_or_eof():
                break
            if token.value.isdigit():
                serv = int(token.value)
            else:
                # Service names can only be looked up for UDP or TCP.
                if protocol == _proto_udp:
                    protocol_text = "udp"
                elif protocol == _proto_tcp:
                    protocol_text = "tcp"
                else:
                    raise NotImplementedError("protocol must be TCP or UDP")
                serv = socket.getservbyname(token.value, protocol_text)
            byte_index, bit_offset = divmod(serv, 8)
            # Pad with zero bytes until byte_index is addressable.
            shortfall = byte_index + 1 - len(bitmap)
            if shortfall > 0:
                bitmap.extend(bytearray(shortfall))
            bitmap[byte_index] |= 0x80 >> bit_offset
        bitmap = dns.rdata._truncate_bitmap(bitmap)
        return cls(rdclass, rdtype, address, protocol, bitmap)
项目:cheapstream    作者:miltador    | 项目源码 | 文件源码
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
        """Build an instance from tokens: address, protocol, then services.

        The protocol and each service may be numeric or a name; names are
        resolved through the socket module. Every service number ``serv``
        sets bit ``0x80 >> (serv % 8)`` in bitmap entry ``serv // 8``; the
        bitmap is a list of one-character strings. ``origin`` and
        ``relativize`` are not used here.

        Raises NotImplementedError for a service name when the protocol is
        neither UDP nor TCP.
        """
        address = tok.get_string()
        proto_field = tok.get_string()
        protocol = (int(proto_field) if proto_field.isdigit()
                    else socket.getprotobyname(proto_field))
        bitmap = []
        while True:
            token = tok.get().unescape()
            if token.is_eol_or_eof():
                break
            if token.value.isdigit():
                serv = int(token.value)
            else:
                # Service names can only be looked up for UDP or TCP.
                if protocol == _proto_udp:
                    protocol_text = "udp"
                elif protocol == _proto_tcp:
                    protocol_text = "tcp"
                else:
                    raise NotImplementedError("protocol must be TCP or UDP")
                serv = socket.getservbyname(token.value, protocol_text)
            byte_index, bit_offset = divmod(serv, 8)
            # Pad with NUL characters until byte_index is addressable.
            shortfall = byte_index + 1 - len(bitmap)
            if shortfall > 0:
                bitmap.extend(['\x00'] * shortfall)
            bitmap[byte_index] = chr(ord(bitmap[byte_index]) | (0x80 >> bit_offset))
        bitmap = dns.rdata._truncate_bitmap(bitmap)
        return cls(rdclass, rdtype, address, protocol, bitmap)
项目:Infrax-as-Code-1000-webservers-in-40-minutes    作者:ezeeetm    | 项目源码 | 文件源码
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
        """Build an instance from tokens: address, protocol, then services.

        The protocol and each service may be numeric or a name; names are
        resolved through the socket module. Every service number ``serv``
        sets bit ``0x80 >> (serv % 8)`` in bitmap byte ``serv // 8``.
        ``origin`` and ``relativize`` are not used here.

        Raises NotImplementedError for a service name when the protocol is
        neither UDP nor TCP.
        """
        address = tok.get_string()
        proto_field = tok.get_string()
        protocol = (int(proto_field) if proto_field.isdigit()
                    else socket.getprotobyname(proto_field))
        bitmap = bytearray()
        while True:
            token = tok.get().unescape()
            if token.is_eol_or_eof():
                break
            if token.value.isdigit():
                serv = int(token.value)
            else:
                # Service names can only be looked up for UDP or TCP.
                if protocol == _proto_udp:
                    protocol_text = "udp"
                elif protocol == _proto_tcp:
                    protocol_text = "tcp"
                else:
                    raise NotImplementedError("protocol must be TCP or UDP")
                serv = socket.getservbyname(token.value, protocol_text)
            byte_index, bit_offset = divmod(serv, 8)
            # Pad with zero bytes until byte_index is addressable.
            shortfall = byte_index + 1 - len(bitmap)
            if shortfall > 0:
                bitmap.extend(bytearray(shortfall))
            bitmap[byte_index] |= 0x80 >> bit_offset
        bitmap = dns.rdata._truncate_bitmap(bitmap)
        return cls(rdclass, rdtype, address, protocol, bitmap)
项目:kekescan    作者:xiaoxiaoleo    | 项目源码 | 文件源码
def __icmpSocket(self):
        """Create and return a raw IPv4 socket for the ICMP protocol."""
        icmp_proto = socket.getprotobyname("icmp")
        return socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp_proto)
项目:kekescan    作者:xiaoxiaoleo    | 项目源码 | 文件源码
def __icmpSocket(self):
        """Create and return a raw IPv4 socket for the ICMP protocol."""
        icmp_proto = socket.getprotobyname("icmp")
        return socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp_proto)
项目:kekescan    作者:xiaoxiaoleo    | 项目源码 | 文件源码
def __icmpSocket(self):
        """Create and return a raw IPv4 socket for the ICMP protocol."""
        icmp_proto = socket.getprotobyname("icmp")
        return socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp_proto)
项目:kekescan    作者:xiaoxiaoleo    | 项目源码 | 文件源码
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
        """Build an instance from tokens: address, protocol, then services.

        The protocol and each service may be numeric or a name; names are
        resolved through the socket module. Every service number ``serv``
        sets bit ``0x80 >> (serv % 8)`` in bitmap entry ``serv // 8``; the
        bitmap is a list of one-character strings. ``origin`` and
        ``relativize`` are not used here.

        Raises NotImplementedError for a service name when the protocol is
        neither UDP nor TCP.
        """
        address = tok.get_string()
        proto_field = tok.get_string()
        protocol = (int(proto_field) if proto_field.isdigit()
                    else socket.getprotobyname(proto_field))
        bitmap = []
        while True:
            token = tok.get().unescape()
            if token.is_eol_or_eof():
                break
            if token.value.isdigit():
                serv = int(token.value)
            else:
                # Service names can only be looked up for UDP or TCP.
                if protocol == _proto_udp:
                    protocol_text = "udp"
                elif protocol == _proto_tcp:
                    protocol_text = "tcp"
                else:
                    raise NotImplementedError("protocol must be TCP or UDP")
                serv = socket.getservbyname(token.value, protocol_text)
            byte_index, bit_offset = divmod(serv, 8)
            # Pad with NUL characters until byte_index is addressable.
            shortfall = byte_index + 1 - len(bitmap)
            if shortfall > 0:
                bitmap.extend(['\x00'] * shortfall)
            bitmap[byte_index] = chr(ord(bitmap[byte_index]) | (0x80 >> bit_offset))
        bitmap = dns.rdata._truncate_bitmap(bitmap)
        return cls(rdclass, rdtype, address, protocol, bitmap)
项目:kekescan    作者:xiaoxiaoleo    | 项目源码 | 文件源码
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
        """Build an instance from tokens: address, protocol, then services.

        The protocol and each service may be numeric or a name; names are
        resolved through the socket module. Every service number ``serv``
        sets bit ``0x80 >> (serv % 8)`` in bitmap entry ``serv // 8``; the
        bitmap is a list of one-character strings. ``origin`` and
        ``relativize`` are not used here.

        Raises NotImplementedError for a service name when the protocol is
        neither UDP nor TCP.
        """
        address = tok.get_string()
        proto_field = tok.get_string()
        protocol = (int(proto_field) if proto_field.isdigit()
                    else socket.getprotobyname(proto_field))
        bitmap = []
        while True:
            token = tok.get().unescape()
            if token.is_eol_or_eof():
                break
            if token.value.isdigit():
                serv = int(token.value)
            else:
                # Service names can only be looked up for UDP or TCP.
                if protocol == _proto_udp:
                    protocol_text = "udp"
                elif protocol == _proto_tcp:
                    protocol_text = "tcp"
                else:
                    raise NotImplementedError("protocol must be TCP or UDP")
                serv = socket.getservbyname(token.value, protocol_text)
            byte_index, bit_offset = divmod(serv, 8)
            # Pad with NUL characters until byte_index is addressable.
            shortfall = byte_index + 1 - len(bitmap)
            if shortfall > 0:
                bitmap.extend(['\x00'] * shortfall)
            bitmap[byte_index] = chr(ord(bitmap[byte_index]) | (0x80 >> bit_offset))
        bitmap = dns.rdata._truncate_bitmap(bitmap)
        return cls(rdclass, rdtype, address, protocol, bitmap)
项目:VulScript    作者:y1ng1996    | 项目源码 | 文件源码
def __icmpSocket(self):
        """Create and return a raw IPv4 socket for the ICMP protocol."""
        icmp_proto = socket.getprotobyname("icmp")
        return socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp_proto)
项目:minihydra    作者:VillanCh    | 项目源码 | 文件源码
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
        """Build an instance from tokens: address, protocol, then services.

        The protocol and each service may be numeric or a name; names are
        resolved through the socket module. Every service number ``serv``
        sets bit ``0x80 >> (serv % 8)`` in bitmap byte ``serv // 8``.
        ``origin`` and ``relativize`` are not used here.

        Raises NotImplementedError for a service name when the protocol is
        neither UDP nor TCP.
        """
        address = tok.get_string()
        proto_field = tok.get_string()
        protocol = (int(proto_field) if proto_field.isdigit()
                    else socket.getprotobyname(proto_field))
        bitmap = bytearray()
        while True:
            token = tok.get().unescape()
            if token.is_eol_or_eof():
                break
            if token.value.isdigit():
                serv = int(token.value)
            else:
                # Service names can only be looked up for UDP or TCP.
                if protocol == _proto_udp:
                    protocol_text = "udp"
                elif protocol == _proto_tcp:
                    protocol_text = "tcp"
                else:
                    raise NotImplementedError("protocol must be TCP or UDP")
                serv = socket.getservbyname(token.value, protocol_text)
            byte_index, bit_offset = divmod(serv, 8)
            # Pad with zero bytes until byte_index is addressable.
            shortfall = byte_index + 1 - len(bitmap)
            if shortfall > 0:
                bitmap.extend(bytearray(shortfall))
            bitmap[byte_index] |= 0x80 >> bit_offset
        bitmap = dns.rdata._truncate_bitmap(bitmap)
        return cls(rdclass, rdtype, address, protocol, bitmap)
项目:00scanner    作者:xiaoqin00    | 项目源码 | 文件源码
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
        """Build an instance from tokens: address, protocol, then services.

        The protocol and each service may be numeric or a name; names are
        resolved through the socket module. Every service number ``serv``
        sets bit ``0x80 >> (serv % 8)`` in bitmap entry ``serv // 8``; the
        bitmap is a list of one-character strings. ``origin`` and
        ``relativize`` are not used here.

        Raises NotImplementedError for a service name when the protocol is
        neither UDP nor TCP.
        """
        address = tok.get_string()
        proto_field = tok.get_string()
        protocol = (int(proto_field) if proto_field.isdigit()
                    else socket.getprotobyname(proto_field))
        bitmap = []
        while True:
            token = tok.get().unescape()
            if token.is_eol_or_eof():
                break
            if token.value.isdigit():
                serv = int(token.value)
            else:
                # Service names can only be looked up for UDP or TCP.
                if protocol == _proto_udp:
                    protocol_text = "udp"
                elif protocol == _proto_tcp:
                    protocol_text = "tcp"
                else:
                    raise NotImplementedError("protocol must be TCP or UDP")
                serv = socket.getservbyname(token.value, protocol_text)
            byte_index, bit_offset = divmod(serv, 8)
            # Pad with NUL characters until byte_index is addressable.
            shortfall = byte_index + 1 - len(bitmap)
            if shortfall > 0:
                bitmap.extend(['\x00'] * shortfall)
            bitmap[byte_index] = chr(ord(bitmap[byte_index]) | (0x80 >> bit_offset))
        bitmap = rdata._truncate_bitmap(bitmap)
        return cls(rdclass, rdtype, address, protocol, bitmap)
项目:00scanner    作者:xiaoqin00    | 项目源码 | 文件源码
def __icmpSocket(self):
        """Create and return a raw IPv4 socket for the ICMP protocol."""
        icmp_proto = socket.getprotobyname("icmp")
        return socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp_proto)
项目:mysplunk_csc    作者:patel-bhavin    | 项目源码 | 文件源码
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
        """Build an instance from tokens: address, protocol, then services.

        The protocol and each service may be numeric or a name; names are
        resolved through the socket module. Every service number ``serv``
        sets bit ``0x80 >> (serv % 8)`` in bitmap entry ``serv // 8``; the
        bitmap is a list of one-character strings. ``origin`` and
        ``relativize`` are not used here.

        Raises NotImplementedError for a service name when the protocol is
        neither UDP nor TCP.
        """
        address = tok.get_string()
        proto_field = tok.get_string()
        protocol = (int(proto_field) if proto_field.isdigit()
                    else socket.getprotobyname(proto_field))
        bitmap = []
        while True:
            token = tok.get().unescape()
            if token.is_eol_or_eof():
                break
            if token.value.isdigit():
                serv = int(token.value)
            else:
                # Service names can only be looked up for UDP or TCP.
                if protocol == _proto_udp:
                    protocol_text = "udp"
                elif protocol == _proto_tcp:
                    protocol_text = "tcp"
                else:
                    raise NotImplementedError("protocol must be TCP or UDP")
                serv = socket.getservbyname(token.value, protocol_text)
            byte_index, bit_offset = divmod(serv, 8)
            # Pad with NUL characters until byte_index is addressable.
            shortfall = byte_index + 1 - len(bitmap)
            if shortfall > 0:
                bitmap.extend(['\x00'] * shortfall)
            bitmap[byte_index] = chr(ord(bitmap[byte_index]) | (0x80 >> bit_offset))
        bitmap = dns.rdata._truncate_bitmap(bitmap)
        return cls(rdclass, rdtype, address, protocol, bitmap)
项目:deb-python-eventlet    作者:openstack    | 项目源码 | 文件源码
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
        """Build an instance from tokens: address, protocol, then services.

        The protocol and each service may be numeric or a name; names are
        resolved through the socket module. Every service number ``serv``
        sets bit ``0x80 >> (serv % 8)`` in bitmap byte ``serv // 8``.
        ``origin`` and ``relativize`` are not used here.

        Raises NotImplementedError for a service name when the protocol is
        neither UDP nor TCP.
        """
        address = tok.get_string()
        proto_field = tok.get_string()
        protocol = (int(proto_field) if proto_field.isdigit()
                    else socket.getprotobyname(proto_field))
        bitmap = bytearray()
        while True:
            token = tok.get().unescape()
            if token.is_eol_or_eof():
                break
            if token.value.isdigit():
                serv = int(token.value)
            else:
                # Service names can only be looked up for UDP or TCP.
                if protocol == _proto_udp:
                    protocol_text = "udp"
                elif protocol == _proto_tcp:
                    protocol_text = "tcp"
                else:
                    raise NotImplementedError("protocol must be TCP or UDP")
                serv = socket.getservbyname(token.value, protocol_text)
            byte_index, bit_offset = divmod(serv, 8)
            # Pad with zero bytes until byte_index is addressable.
            shortfall = byte_index + 1 - len(bitmap)
            if shortfall > 0:
                bitmap.extend(bytearray(shortfall))
            bitmap[byte_index] |= 0x80 >> bit_offset
        bitmap = dns.rdata._truncate_bitmap(bitmap)
        return cls(rdclass, rdtype, address, protocol, bitmap)
项目:inwx-zonefile-sync    作者:lukas2511    | 项目源码 | 文件源码
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
        """Build an instance from tokens: address, protocol, then services.

        The protocol and each service may be numeric or a name; names are
        resolved through the socket module. Every service number ``serv``
        sets bit ``0x80 >> (serv % 8)`` in bitmap byte ``serv // 8``.
        ``origin`` and ``relativize`` are not used here.

        Raises NotImplementedError for a service name when the protocol is
        neither UDP nor TCP.
        """
        address = tok.get_string()
        proto_field = tok.get_string()
        protocol = (int(proto_field) if proto_field.isdigit()
                    else socket.getprotobyname(proto_field))
        bitmap = bytearray()
        while True:
            token = tok.get().unescape()
            if token.is_eol_or_eof():
                break
            if token.value.isdigit():
                serv = int(token.value)
            else:
                # Service names can only be looked up for UDP or TCP.
                if protocol == _proto_udp:
                    protocol_text = "udp"
                elif protocol == _proto_tcp:
                    protocol_text = "tcp"
                else:
                    raise NotImplementedError("protocol must be TCP or UDP")
                serv = socket.getservbyname(token.value, protocol_text)
            byte_index, bit_offset = divmod(serv, 8)
            # Pad with zero bytes until byte_index is addressable.
            shortfall = byte_index + 1 - len(bitmap)
            if shortfall > 0:
                bitmap.extend(bytearray(shortfall))
            bitmap[byte_index] |= 0x80 >> bit_offset
        bitmap = dns.rdata._truncate_bitmap(bitmap)
        return cls(rdclass, rdtype, address, protocol, bitmap)
项目:spiderfoot    作者:ParrotSec    | 项目源码 | 文件源码
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
        """Build an instance from tokens: address, protocol, then services.

        The protocol and each service may be numeric or a name; names are
        resolved through the socket module. Every service number ``serv``
        sets bit ``0x80 >> (serv % 8)`` in bitmap entry ``serv // 8``; the
        bitmap is a list of one-character strings. ``origin`` and
        ``relativize`` are not used here.

        Raises NotImplementedError for a service name when the protocol is
        neither UDP nor TCP.
        """
        address = tok.get_string()
        proto_field = tok.get_string()
        protocol = (int(proto_field) if proto_field.isdigit()
                    else socket.getprotobyname(proto_field))
        bitmap = []
        while True:
            token = tok.get().unescape()
            if token.is_eol_or_eof():
                break
            if token.value.isdigit():
                serv = int(token.value)
            else:
                # Service names can only be looked up for UDP or TCP.
                if protocol == _proto_udp:
                    protocol_text = "udp"
                elif protocol == _proto_tcp:
                    protocol_text = "tcp"
                else:
                    raise NotImplementedError("protocol must be TCP or UDP")
                serv = socket.getservbyname(token.value, protocol_text)
            byte_index, bit_offset = divmod(serv, 8)
            # Pad with NUL characters until byte_index is addressable.
            shortfall = byte_index + 1 - len(bitmap)
            if shortfall > 0:
                bitmap.extend(['\x00'] * shortfall)
            bitmap[byte_index] = chr(ord(bitmap[byte_index]) | (0x80 >> bit_offset))
        bitmap = dns.rdata._truncate_bitmap(bitmap)
        return cls(rdclass, rdtype, address, protocol, bitmap)