Python socket 模块,ntohs() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用socket.ntohs()

项目:dmpt    作者:sistason    | 项目源码 | 文件源码
def validate(self, data):
        """Decode the Ethernet and fixed IPv4 headers at the start of ``data``.

        The decoded fields are only bound to locals and nothing is returned,
        so the only observable effect is the ``struct.error`` raised when
        ``data`` is too short to hold a 14-byte Ethernet header followed by
        a 20-byte IPv4 header.
        """
        ETH_HEADER_LEN = 14
        IP_FIXED_LEN = 20

        _dst_mac, _src_mac, ether_type = struct.unpack(
            '!6s6sH', data[:ETH_HEADER_LEN])
        # NOTE(review): the '!' format already yields a host-order integer, so
        # this second conversion swaps the bytes again on little-endian hosts.
        eth_protocol = socket.ntohs(ether_type)

        ip_fields = struct.unpack(
            '!BBHHHBBH4s4s',
            data[ETH_HEADER_LEN:IP_FIXED_LEN + ETH_HEADER_LEN])
        # The low nibble of the first byte is multiplied by 4 to get a length.
        iph_length = (ip_fields[0] & 0xF) * 4
        protocol = ip_fields[6]
        s_addr, d_addr = (socket.inet_ntoa(raw) for raw in ip_fields[8:10])
项目:dmpt    作者:sistason    | 项目源码 | 文件源码
def validate(self, data):
        """Decode the Ethernet and fixed IPv4 headers at the start of ``data``.

        The decoded fields are only bound to locals and nothing is returned,
        so the only observable effect is the ``struct.error`` raised when
        ``data`` is too short to hold a 14-byte Ethernet header followed by
        a 20-byte IPv4 header.
        """
        ETH_HEADER_LEN = 14
        IP_FIXED_LEN = 20

        _dst_mac, _src_mac, ether_type = struct.unpack(
            '!6s6sH', data[:ETH_HEADER_LEN])
        # NOTE(review): the '!' format already yields a host-order integer, so
        # this second conversion swaps the bytes again on little-endian hosts.
        eth_protocol = socket.ntohs(ether_type)

        ip_fields = struct.unpack(
            '!BBHHHBBH4s4s',
            data[ETH_HEADER_LEN:IP_FIXED_LEN + ETH_HEADER_LEN])
        # The low nibble of the first byte is multiplied by 4 to get a length.
        iph_length = (ip_fields[0] & 0xF) * 4
        protocol = ip_fields[6]
        s_addr, d_addr = (socket.inet_ntoa(raw) for raw in ip_fields[8:10])
项目:fascinatedNight    作者:songshixuan    | 项目源码 | 文件源码
def user_login(self):
        """Build the login request frame and hand it to ``self.send``.

        Frame layout, in order:
          * 16-bit command ``1``, big-endian
          * 32-bit peer id ``632949210``, big-endian
          * 16-bit body length ``4``, big-endian
          * 32-bit own id ``632949211``, native byte order
        """
        print ("user login...")
        # The '!' formats emit network byte order directly, which yields the
        # same bytes as byte-swapping first and packing in native order.
        command = struct.pack('!h', 1)
        peer_id = struct.pack('!I', 632949210)
        msgbody_len = struct.pack('!h', 4)
        # NOTE(review): unlike the other fields this id goes out in native
        # byte order; kept as-is for wire compatibility -- confirm with the
        # receiving side.
        myid = struct.pack('I', 632949211)
        self.send(command + peer_id + msgbody_len + myid)
项目:scapy-vxlan    作者:p4lang    | 项目源码 | 文件源码
def reverse(self, val):
        """Byte-swap ``val`` between network and host order.

        Uses ``socket.ntohs`` when ``self.size`` is 16 and ``socket.ntohl``
        when it is 32; any other size returns ``val`` unchanged.
        """
        if self.size == 16:
            return socket.ntohs(val)
        if self.size == 32:
            return socket.ntohl(val)
        return val
项目:flare-fakenet-ng    作者:fireeye    | 项目源码 | 文件源码
def get_pid_port_tcp(self, port):
        """Return the PID owning the TCP endpoint bound to local ``port``.

        Scans the rows returned by ``self.get_extended_tcp_table()`` and
        returns ``dwOwningPid`` of the first row whose local port (converted
        from network to host byte order) equals ``port``; returns ``None``
        when no row matches.
        """
        for entry in self.get_extended_tcp_table():
            # dwLocalPort is a 32-bit field holding a 16-bit port; mask off
            # the upper half so stray high bits cannot overflow ntohs().
            local_port = socket.ntohs(entry.dwLocalPort & 0xFFFF)
            if local_port == port:
                return entry.dwOwningPid
        return None

    #################################################################################
    # The GetExtendedUdpTable function retrieves a table that contains a list of UDP endpoints available to the application.
    #
    # DWORD GetExtendedUdpTable(
    #   _Out_   PVOID           pUdpTable,
    #   _Inout_ PDWORD          pdwSize,
    #   _In_    BOOL            bOrder,
    #   _In_    ULONG           ulAf,
    #   _In_    UDP_TABLE_CLASS TableClass,
    #   _In_    ULONG           Reserved
    # );
项目:flare-fakenet-ng    作者:fireeye    | 项目源码 | 文件源码
def get_pid_port_udp(self, port):
        """Return the PID owning the UDP endpoint bound to local ``port``.

        Scans the rows returned by ``self.get_extended_udp_table()`` and
        returns ``dwOwningPid`` of the first row whose local port (converted
        from network to host byte order) equals ``port``; returns ``None``
        when no row matches.
        """
        for entry in self.get_extended_udp_table():
            # dwLocalPort is a 32-bit field holding a 16-bit port; mask off
            # the upper half so stray high bits cannot overflow ntohs().
            local_port = socket.ntohs(entry.dwLocalPort & 0xFFFF)
            if local_port == port:
                return entry.dwOwningPid
        return None

    ###############################################################################
    # Retrieves the name of the executable file for the specified process.
    #
    # DWORD WINAPI GetProcessImageFileName(
    #   _In_  HANDLE hProcess,
    #   _Out_ LPTSTR lpImageFileName,
    #   _In_  DWORD  nSize
    # );
项目:Malware    作者:vduddu    | 项目源码 | 文件源码
def main():
    """Capture frames on a raw packet socket forever and print a summary.

    Relies on names that are not defined in this snippet (``ethernet_frame``,
    ``ipv4_packet``, ``ICMP``, ``TAB_1``, ``TAB_2``); the loop never exits.
    """
    # Raw AF_PACKET socket; protocol value 3 is passed through ntohs().
    # NOTE(review): presumably ETH_P_ALL (capture every protocol) -- confirm.
    connection=socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.ntohs(3))
    while True:
        # Receive one frame (up to 65536 bytes); addr is not used below.
        raw_data,addr=connection.recvfrom(65536)
        dest_mac, src_mac, eth_proto, data = ethernet_frame(raw_data)
        print('\nEthernet Frame:')
        print(TAB_1+'Destination: {}, Source: {}, Protocol: {}'.format(dest_mac,src_mac,eth_proto))

        # NOTE(review): 8 presumably is the byte-swapped IPv4 EtherType as
        # returned by ethernet_frame() -- verify against that helper.
        if eth_proto==8:
            (version,header_length,ttl,proto,src,target,data)=ipv4_packet(data)
            print(TAB_1+'IPv4 Packet:')
            print(TAB_2 + 'Version: {}, Header Length: {}, TTL: {},'.format(version,header_length,ttl))
            print(TAB_2 + 'Protocol: {}, Source: {}, Target: {}'.format(proto, src, target))

        # NOTE(review): `ipv4` is not defined anywhere in this function, so
        # this branch would raise NameError if reached; the snippet also looks
        # truncated here -- confirm against the full source.
        elif eth_proto==1:
             icmp = ICMP(ipv4.data)
项目:amadash    作者:ipartola    | 项目源码 | 文件源码
def parse_ethernet_header(self, packet):
        """Split the 14-byte Ethernet header at the start of ``packet``.

        ``packet`` must index to integers (``bytes``/``bytearray``).

        Returns a tuple ``(dst_mac, src_mac, eth_protocol)`` where the MAC
        values are whatever ``self.bytes_to_mac`` produces for bytes 0-5 and
        6-11, and ``eth_protocol`` is the 16-bit value stored big-endian in
        bytes 12-13.
        """
        dst_mac = self.bytes_to_mac(packet[0:6])
        src_mac = self.bytes_to_mac(packet[6:12])
        # Combine the two bytes explicitly as big-endian so the result does
        # not depend on the byte order of the host running this code.
        eth_protocol = (packet[12] << 8) | packet[13]

        return dst_mac, src_mac, eth_protocol
项目:PythonForWindows    作者:hakril    | 项目源码 | 文件源码
def remote_port(self):
        """Remote port converted with ``socket.ntohs``, or ``None`` when the
        connection is not established.

        :type: :class:`int`
        """
        return socket.ntohs(self.dwRemotePort) if self.established else None
项目:PythonForWindows    作者:hakril    | 项目源码 | 文件源码
def local_port(self):
        """Local port converted with ``socket.ntohs``.

        :type: :class:`int`
        """
        raw_port = self.dwLocalPort
        return socket.ntohs(raw_port)
项目:PythonForWindows    作者:hakril    | 项目源码 | 文件源码
def remote_port(self):
        """Remote port converted with ``socket.ntohs``, or ``None`` when the
        connection is not established.

        :type: :class:`int`
        """
        return socket.ntohs(self.dwRemotePort) if self.established else None
项目:PythonForWindows    作者:hakril    | 项目源码 | 文件源码
def local_port(self):
        """Local port converted with ``socket.ntohs``.

        :type: :class:`int`
        """
        raw_port = self.dwLocalPort
        return socket.ntohs(raw_port)
项目:CyberScan    作者:medbenali    | 项目源码 | 文件源码
def reverse(self, val):
        """Byte-swap ``val`` between network and host order.

        Uses ``socket.ntohs`` when ``self.size`` is 16 and ``socket.ntohl``
        when it is 32; any other size returns ``val`` unchanged.
        """
        if self.size == 16:
            return socket.ntohs(val)
        if self.size == 32:
            return socket.ntohl(val)
        return val
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def reverse(self, val):
        """Byte-swap ``val`` between network and host order.

        Uses ``socket.ntohs`` when ``self.size`` is 16 and ``socket.ntohl``
        when it is 32; any other size returns ``val`` unchanged.
        """
        if self.size == 16:
            return socket.ntohs(val)
        if self.size == 32:
            return socket.ntohl(val)
        return val
项目:packet_analysis    作者:tanjiti    | 项目源码 | 文件源码
def getPortNumHostByteOrder(st_network):
    """
    Convert a 16-bit port number from network to host byte order.

    On machines where the host byte order is the same as network byte order
    this is a no-op; otherwise the two bytes are swapped (for example
    20480 -> 80 on a little-endian host).

    :param st_network: port in network byte order; anything ``int()`` accepts
    :return: the port in host byte order, or ``None`` (after logging an
        error) when the value cannot be converted
    """
    try:
        return socket.ntohs(int(st_network))
    except Exception as e:
        # Log with %r rather than %f: the offending value may be a
        # non-numeric string or None, which %f cannot format and which would
        # make the error handler itself raise TypeError.
        logging.error("[NetworkByteOrderPortFalse]: %r %r", st_network, e)
项目:CVE-2016-6366    作者:RiskSense-Ops    | 项目源码 | 文件源码
def reverse(self, val):
        """Byte-swap ``val`` between network and host order.

        Uses ``socket.ntohs`` when ``self.size`` is 16 and ``socket.ntohl``
        when it is 32; any other size returns ``val`` unchanged.
        """
        if self.size == 16:
            return socket.ntohs(val)
        if self.size == 32:
            return socket.ntohl(val)
        return val
项目:Python-Network-Programming-Cookbook-Second-Edition    作者:PacktPublishing    | 项目源码 | 文件源码
def convert_integer():
    """Print 1234 after 32-bit and 16-bit host/network byte-order conversion."""
    data = 1234

    # 32-bit conversions
    long_host = socket.ntohl(data)
    long_network = socket.htonl(data)
    print ("Original: %s => Long  host byte order: %s, Network byte order: %s" %(data, long_host, long_network))

    # 16-bit conversions
    short_host = socket.ntohs(data)
    short_network = socket.htons(data)
    print ("Original: %s => Short  host byte order: %s, Network byte order: %s" %(data, short_host, short_network))
项目:hakkuframework    作者:4shadoww    | 项目源码 | 文件源码
def reverse(self, val):
        """Byte-swap ``val`` between network and host order.

        Uses ``socket.ntohs`` when ``self.size`` is 16 and ``socket.ntohl``
        when it is 32; any other size returns ``val`` unchanged.
        """
        if self.size == 16:
            return socket.ntohs(val)
        if self.size == 32:
            return socket.ntohl(val)
        return val
项目:trio    作者:python-trio    | 项目源码 | 文件源码
def test_socket_has_some_reexports():
    # A sample of names (two constants, an exception class, a function) that
    # must compare equal between tsocket and the stdlib socket module.
    for name in ("SOL_SOCKET", "TCP_NODELAY", "gaierror", "ntohs"):
        assert getattr(tsocket, name) == getattr(stdlib_socket, name)


################################################################
# name resolution
################################################################
项目:Theseus    作者:Dylan-halls    | 项目源码 | 文件源码
def __init__(self, interface):
        """Open a raw packet socket, store it in the module-level ``s`` and
        bind it to ``interface``; print an error and exit if binding fails.
        """
        # Only `s` is assigned below; `redirect_to_mac` and `arp` are declared
        # global but not touched in this method.
        global s, redirect_to_mac, arp
        super(Arp_Spoof, self).__init__()
        # NOTE(review): 0x0800 presumably selects IPv4 frames (ETH_P_IP) --
        # confirm. It is passed through ntohs() here and htons() in bind().
        s = socket.socket(socket.PF_PACKET, socket.SOCK_RAW, socket.ntohs(0x0800))
        try:
            s.bind((interface, socket.htons(0x0800)))
        except socket.error:
            # The message is wrapped in ANSI colour escape sequences.
            print("\033[1;31mUnable to bind to interface... unknown type\033[00m")
            exit()
项目:Theseus    作者:Dylan-halls    | 项目源码 | 文件源码
def __init__(self, interface):
        """Reset the module-level counters, open a raw packet socket stored
        in the module-level ``s`` and bind it to ``interface``; print an error
        and exit if binding fails.
        """
        global s, sent, rev
        # Module-level counters start from zero for every new instance.
        sent = 0
        rev = 0
        super(Arp_Ping, self).__init__()
        # NOTE(review): 0x0800 presumably selects IPv4 frames (ETH_P_IP) --
        # confirm. It is passed through ntohs() here and htons() in bind().
        s = socket.socket(socket.PF_PACKET, socket.SOCK_RAW, socket.ntohs(0x0800))
        try:
            s.bind((interface, socket.htons(0x0800)))
        except socket.error:
            # The message is wrapped in ANSI colour escape sequences.
            print("\033[1;31mUnable to bind to interface... unknown type\033[00m")
            exit()
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def testNtoH(self):
        # Checks that htons etc. are their own inverse when only the lower
        # 16 or 32 bits are considered.
        samples = (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210)
        converters = ((socket.htonl, 32), (socket.ntohl, 32),
                      (socket.htons, 16), (socket.ntohs, 16))
        for convert, width in converters:
            all_ones = (1 << width) - 1
            for sample in samples:
                # Converting twice must give back the masked input.
                self.assertEqual(sample & all_ones,
                                 convert(convert(sample & all_ones)) & all_ones)

            # An all-ones value is unchanged by the conversion.
            self.assertEqual(convert(all_ones) & all_ones, all_ones)
            # A 35-bit value must be rejected.
            self.assertRaises(OverflowError, convert, 1 << 34)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def testNtoHErrors(self):
        # Every converter must accept small non-negative values and reject
        # negative ones with OverflowError or ValueError.
        converters = (socket.ntohl, socket.ntohs, socket.htonl, socket.htons)
        for accepted in [ 1, 2, 3, 1, 2, 3 ]:
            for convert in converters:
                convert(accepted)
        for rejected in [ -1, -2, -3, -1, -2, -3 ]:
            for convert in converters:
                self.assertRaises((OverflowError, ValueError), convert, rejected)
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def reverse(self, val):
        """Byte-swap ``val`` between network and host order.

        Uses ``socket.ntohs`` when ``self.size`` is 16 and ``socket.ntohl``
        when it is 32; any other size returns ``val`` unchanged.
        """
        if self.size == 16:
            return socket.ntohs(val)
        if self.size == 32:
            return socket.ntohl(val)
        return val
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def reverse(self, val):
        """Byte-swap ``val`` between network and host order.

        Uses ``socket.ntohs`` when ``self.size`` is 16 and ``socket.ntohl``
        when it is 32; any other size returns ``val`` unchanged.
        """
        if self.size == 16:
            return socket.ntohs(val)
        if self.size == 32:
            return socket.ntohl(val)
        return val
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def testNtoH(self):
        # This just checks that htons etc. are their own inverse,
        # when looking at the lower 16 or 32 bits.
        # Maps each byte-order converter to the bit width it operates on.
        sizes = {socket.htonl: 32, socket.ntohl: 32,
                 socket.htons: 16, socket.ntohs: 16}
        for func, size in sizes.items():
            # All-ones mask of `size` bits, built from a Python 2 long.
            mask = (1L<<size) - 1
            for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
                # Converting twice must give back the masked input.
                self.assertEqual(i & mask, func(func(i&mask)) & mask)

            # An all-ones value must come back unchanged from the conversion.
            swapped = func(mask)
            self.assertEqual(swapped & mask, mask)
            # A 35-bit value must be rejected by every converter.
            self.assertRaises(OverflowError, func, 1L<<34)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def testNtoHErrors(self):
        # Small non-negative ints and longs: every converter must accept them.
        good_values = [ 1, 2, 3, 1L, 2L, 3L ]
        # Negative ints and longs: every converter must raise OverflowError.
        bad_values = [ -1, -2, -3, -1L, -2L, -3L ]
        for k in good_values:
            # Passing means "no exception"; return values are not inspected.
            socket.ntohl(k)
            socket.ntohs(k)
            socket.htonl(k)
            socket.htons(k)
        for k in bad_values:
            self.assertRaises(OverflowError, socket.ntohl, k)
            self.assertRaises(OverflowError, socket.ntohs, k)
            self.assertRaises(OverflowError, socket.htonl, k)
            self.assertRaises(OverflowError, socket.htons, k)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def testNtoH(self):
        # This just checks that htons etc. are their own inverse,
        # when looking at the lower 16 or 32 bits.
        # Maps each byte-order converter to the bit width it operates on.
        sizes = {socket.htonl: 32, socket.ntohl: 32,
                 socket.htons: 16, socket.ntohs: 16}
        for func, size in sizes.items():
            # All-ones mask of `size` bits, built from a Python 2 long.
            mask = (1L<<size) - 1
            for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
                # Converting twice must give back the masked input.
                self.assertEqual(i & mask, func(func(i&mask)) & mask)

            # An all-ones value must come back unchanged from the conversion.
            swapped = func(mask)
            self.assertEqual(swapped & mask, mask)
            # A 35-bit value must be rejected by every converter.
            self.assertRaises(OverflowError, func, 1L<<34)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def testNtoHErrors(self):
        # Small non-negative ints and longs: every converter must accept them.
        good_values = [ 1, 2, 3, 1L, 2L, 3L ]
        # Negative ints and longs: every converter must raise OverflowError.
        bad_values = [ -1, -2, -3, -1L, -2L, -3L ]
        for k in good_values:
            # Passing means "no exception"; return values are not inspected.
            socket.ntohl(k)
            socket.ntohs(k)
            socket.htonl(k)
            socket.htons(k)
        for k in bad_values:
            self.assertRaises(OverflowError, socket.ntohl, k)
            self.assertRaises(OverflowError, socket.ntohs, k)
            self.assertRaises(OverflowError, socket.htonl, k)
            self.assertRaises(OverflowError, socket.htons, k)
项目:scapy-bpf    作者:guedou    | 项目源码 | 文件源码
def reverse(self, val):
        """Byte-swap ``val`` between network and host order.

        Uses ``socket.ntohs`` when ``self.size`` is 16 and ``socket.ntohl``
        when it is 32; any other size returns ``val`` unchanged.
        """
        if self.size == 16:
            return socket.ntohs(val)
        if self.size == 32:
            return socket.ntohl(val)
        return val
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def reverse(self, val):
        """Byte-swap ``val`` between network and host order.

        Uses ``socket.ntohs`` when ``self.size`` is 16 and ``socket.ntohl``
        when it is 32; any other size returns ``val`` unchanged.
        """
        if self.size == 16:
            return socket.ntohs(val)
        if self.size == 32:
            return socket.ntohl(val)
        return val
项目:scapy-radio    作者:BastilleResearch    | 项目源码 | 文件源码
def reverse(self, val):
        """Byte-swap ``val`` between network and host order.

        Uses ``socket.ntohs`` when ``self.size`` is 16 and ``socket.ntohl``
        when it is 32; any other size returns ``val`` unchanged.
        """
        if self.size == 16:
            return socket.ntohs(val)
        if self.size == 32:
            return socket.ntohl(val)
        return val
项目:wireless-network-reproduction    作者:FinalTheory    | 项目源码 | 文件源码
def get_total_length(self):
        """Return ``self.ip_len`` converted with ``ntohs``."""
        raw_length = self.ip_len
        return ntohs(raw_length)
项目:isf    作者:w3h    | 项目源码 | 文件源码
def reverse(self, val):
        """Byte-swap ``val`` between network and host order.

        Uses ``socket.ntohs`` when ``self.size`` is 16 and ``socket.ntohl``
        when it is 32; any other size returns ``val`` unchanged.
        """
        if self.size == 16:
            return socket.ntohs(val)
        if self.size == 32:
            return socket.ntohl(val)
        return val
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def testNtoH(self):
        # Checks that htons etc. are their own inverse when only the lower
        # 16 or 32 bits are considered.
        samples = (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210)
        converters = ((socket.htonl, 32), (socket.ntohl, 32),
                      (socket.htons, 16), (socket.ntohs, 16))
        for convert, width in converters:
            all_ones = (1 << width) - 1
            for sample in samples:
                # Converting twice must give back the masked input.
                self.assertEqual(sample & all_ones,
                                 convert(convert(sample & all_ones)) & all_ones)

            # An all-ones value is unchanged by the conversion.
            self.assertEqual(convert(all_ones) & all_ones, all_ones)
            # A 35-bit value must be rejected.
            self.assertRaises(OverflowError, convert, 1 << 34)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def testNtoHErrors(self):
        # Every converter must accept small non-negative values and reject
        # negative ones with OverflowError.
        converters = (socket.ntohl, socket.ntohs, socket.htonl, socket.htons)
        for accepted in [ 1, 2, 3, 1, 2, 3 ]:
            for convert in converters:
                convert(accepted)
        for rejected in [ -1, -2, -3, -1, -2, -3 ]:
            for convert in converters:
                self.assertRaises(OverflowError, convert, rejected)
项目:ryu-lagopus-ext    作者:lagopus    | 项目源码 | 文件源码
def checksum(data):
    """Return the 16-bit ones'-complement checksum of ``data``.

    ``data`` is first converted with ``six.binary_type`` and zero-padded to
    an even length; the folded, inverted sum of its 16-bit words is passed
    through ``socket.ntohs`` before being returned.
    """
    payload = six.binary_type(data)    # input can be bytearray.
    if len(payload) & 1:
        payload = payload + b'\x00'

    total = sum(array.array('H', payload))
    # Fold the carries above bit 15 back into the low 16 bits (twice).
    total = (total >> 16) + (total & 0xffff)
    total = total + (total >> 16)
    return socket.ntohs(0xffff & ~total)


# avoid circular import
项目:namedstruct    作者:hubo1016    | 项目源码 | 文件源码
def checksum(data):
    """Return the 16-bit ones'-complement checksum of ``data``.

    ``data`` is a bytes-like object (``bytes`` or ``bytearray``). Odd-length
    input is zero-padded on a copy, so a caller's ``bytearray`` is never
    modified. The folded, inverted sum of the 16-bit words is passed through
    ``ntohs`` before being returned.
    """
    if len(data) % 2:
        # Build a new object instead of `+=`, which would extend a
        # caller-owned bytearray in place.
        data = data + b'\x00'
    s = sum(array.array('H',data))
    # Fold the carries above bit 15 back into the low 16 bits (twice).
    s = (s & 0xffff) + (s >> 16)
    s += (s >> 16)
    return _socket.ntohs(~s & 0xffff)
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def testNtoH(self):
        # This just checks that htons etc. are their own inverse,
        # when looking at the lower 16 or 32 bits.
        # Maps each byte-order converter to the bit width it operates on.
        sizes = {socket.htonl: 32, socket.ntohl: 32,
                 socket.htons: 16, socket.ntohs: 16}
        for func, size in sizes.items():
            # All-ones mask of `size` bits, built from a Python 2 long.
            mask = (1L<<size) - 1
            for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
                # Converting twice must give back the masked input.
                self.assertEqual(i & mask, func(func(i&mask)) & mask)

            # An all-ones value must come back unchanged from the conversion.
            swapped = func(mask)
            self.assertEqual(swapped & mask, mask)
            # A 35-bit value must be rejected by every converter.
            self.assertRaises(OverflowError, func, 1L<<34)
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def testNtoHErrors(self):
        # Small non-negative ints and longs: every converter must accept them.
        good_values = [ 1, 2, 3, 1L, 2L, 3L ]
        # Negative ints and longs: every converter must raise OverflowError
        # or ValueError (either is accepted).
        bad_values = [ -1, -2, -3, -1L, -2L, -3L ]
        for k in good_values:
            # Passing means "no exception"; return values are not inspected.
            socket.ntohl(k)
            socket.ntohs(k)
            socket.htonl(k)
            socket.htons(k)
        for k in bad_values:
            self.assertRaises((OverflowError, ValueError), socket.ntohl, k)
            self.assertRaises((OverflowError, ValueError), socket.ntohs, k)
            self.assertRaises((OverflowError, ValueError), socket.htonl, k)
            self.assertRaises((OverflowError, ValueError), socket.htons, k)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def testNtoH(self):
        # Checks that htons etc. are their own inverse when only the lower
        # 16 or 32 bits are considered.
        samples = (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210)
        converters = ((socket.htonl, 32), (socket.ntohl, 32),
                      (socket.htons, 16), (socket.ntohs, 16))
        for convert, width in converters:
            all_ones = (1 << width) - 1
            for sample in samples:
                # Converting twice must give back the masked input.
                self.assertEqual(sample & all_ones,
                                 convert(convert(sample & all_ones)) & all_ones)

            # An all-ones value is unchanged by the conversion.
            self.assertEqual(convert(all_ones) & all_ones, all_ones)
            # A 35-bit value must be rejected.
            self.assertRaises(OverflowError, convert, 1 << 34)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def testNtoHErrors(self):
        # Every converter must accept small non-negative values and reject
        # negative ones with OverflowError.
        converters = (socket.ntohl, socket.ntohs, socket.htonl, socket.htons)
        for accepted in [ 1, 2, 3, 1, 2, 3 ]:
            for convert in converters:
                convert(accepted)
        for rejected in [ -1, -2, -3, -1, -2, -3 ]:
            for convert in converters:
                self.assertRaises(OverflowError, convert, rejected)
项目:fascinatedNight    作者:songshixuan    | 项目源码 | 文件源码
def worker_send(self):
        """Read lines from the console forever and send each as a message.

        Every message is ``action + peer_id + <16-bit length> + <body>``,
        where the body is the UTF-8 encoding of the typed line with trailing
        whitespace removed and the length field is the body's size in bytes,
        big-endian. Bodies longer than 65535 bytes raise ``struct.error``.
        """
        while True:
            payload = input('say sth: ').rstrip().encode('utf-8')
            # Measure the encoded bytes (not characters) so the length field
            # matches what is actually sent. '!H' packs an unsigned
            # big-endian 16-bit value, so lengths whose swapped form exceeds
            # 32767 no longer overflow a signed short.
            msgbody_len = struct.pack('!H', len(payload))
            # NOTE(review): `action` and `peer_id` are not defined in this
            # method; presumably module-level packed header fields -- confirm.
            send_msg = action + peer_id + msgbody_len + payload
            self.send(send_msg)
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def testNtoH(self):
        # This just checks that htons etc. are their own inverse,
        # when looking at the lower 16 or 32 bits.
        # Maps each byte-order converter to the bit width it operates on.
        sizes = {socket.htonl: 32, socket.ntohl: 32,
                 socket.htons: 16, socket.ntohs: 16}
        for func, size in sizes.items():
            # All-ones mask of `size` bits, built from a Python 2 long.
            mask = (1L<<size) - 1
            for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
                # Converting twice must give back the masked input.
                self.assertEqual(i & mask, func(func(i&mask)) & mask)

            # An all-ones value must come back unchanged from the conversion.
            swapped = func(mask)
            self.assertEqual(swapped & mask, mask)
            # A 35-bit value must be rejected by every converter.
            self.assertRaises(OverflowError, func, 1L<<34)
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def testNtoHErrors(self):
        # Small non-negative ints and longs: every converter must accept them.
        good_values = [ 1, 2, 3, 1L, 2L, 3L ]
        # Negative ints and longs: every converter must raise OverflowError.
        bad_values = [ -1, -2, -3, -1L, -2L, -3L ]
        for k in good_values:
            # Passing means "no exception"; return values are not inspected.
            socket.ntohl(k)
            socket.ntohs(k)
            socket.htonl(k)
            socket.htons(k)
        for k in bad_values:
            self.assertRaises(OverflowError, socket.ntohl, k)
            self.assertRaises(OverflowError, socket.ntohs, k)
            self.assertRaises(OverflowError, socket.htonl, k)
            self.assertRaises(OverflowError, socket.htons, k)
项目:fenrir-ocd    作者:Orange-Cyberdefense    | 项目源码 | 文件源码
def __init__(self):
        """Create the two raw packet sockets and try to bind them.

        One socket is bound to ``self.ifaceHost`` ("em1") and one to
        ``self.ifaceNetwork`` ("eth0"). A bind failure is reported on stdout
        but is not fatal: the instance is still created and both sockets are
        listed in ``self.inputs``.
        """
        self.hostmac = ""
        self.hostip = ""
        self.conf = True
        self.ifaceHost = "em1"
        self.ifaceNetwork = "eth0"
        # NOTE(review): protocol 0x0003 presumably is ETH_P_ALL (capture all
        # protocols) -- confirm.
        self.sockHost = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.ntohs(0x0003))
        self.sockNetwork = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.ntohs(0x0003))
        try:
            self.sockHost.bind((self.ifaceHost, 0))
            self.sockNetwork.bind((self.ifaceNetwork, 0))
        except socket.error:
            # Catch only socket-level failures (e.g. a missing interface) so
            # that KeyboardInterrupt and programming errors still propagate.
            print("You need 2 physical network interfaces to use FENRIR !")
        self.inputs = [self.sockHost, self.sockNetwork]
项目:fenrir-ocd    作者:Orange-Cyberdefense    | 项目源码 | 文件源码
def bindAllIface(self):
        """Create a raw ``AF_PACKET`` socket with protocol
        ``socket.ntohs(0x0003)`` and store it in ``self.s``.
        """
        raw_socket = socket.socket(
            socket.AF_PACKET,
            socket.SOCK_RAW,
            socket.ntohs(0x0003),
        )
        self.s = raw_socket
项目:snmpsim    作者:etingof    | 项目源码 | 文件源码
def parsePacket(s):
    """Parse a captured frame into a dict of IPv4 (and UDP) header fields.

    The link-layer header is stripped according to ``pcapObj.datalink()``;
    unknown link types are counted in ``stats`` and parsed unstripped.

    :param s: raw captured frame (a byte string indexable to 1-char strings)
    :return: dict with keys ``version``, ``header_len`` (in 32-bit words),
        ``tos``, ``total_len``, ``id``, ``flags``, ``fragment_offset``,
        ``ttl``, ``protocol``, ``checksum``, ``source_address``,
        ``destination_address``, ``options`` (``None`` when absent), ``data``
        and, for UDP (protocol 17), ``source_port``/``destination_port``
    """
    d = {}

    # Link-layer header length in bytes, keyed by pcap link type
    # (http://www.tcpdump.org/linktypes.html)
    llHeaders = {
        0: 4,
        1: 14,
        108: 4,
        228: 0
    }

    linktype = pcapObj.datalink()
    if linktype in llHeaders:
        s = s[llHeaders[linktype]:]
    else:
        stats['unknown L2 protocol'] += 1

    d['version'] = (ord(s[0]) & 0xf0) >> 4
    d['header_len'] = ord(s[0]) & 0x0f
    d['tos'] = ord(s[1])
    d['total_len'] = socket.ntohs(struct.unpack('H', s[2:4])[0])
    d['id'] = socket.ntohs(struct.unpack('H', s[4:6])[0])
    # Bytes 6-7 hold 3 flag bits followed by a 13-bit fragment offset.
    d['flags'] = (ord(s[6]) & 0xe0) >> 5
    # Convert to host order first, then keep the low 13 bits; masking the
    # unconverted value would select the wrong bits.
    d['fragment_offset'] = socket.ntohs(struct.unpack('H', s[6:8])[0]) & 0x1fff
    d['ttl'] = ord(s[8])
    d['protocol'] = ord(s[9])
    d['checksum'] = socket.ntohs(struct.unpack('H', s[10:12])[0])
    d['source_address'] = pcap.ntoa(struct.unpack('i', s[12:16])[0])
    d['destination_address'] = pcap.ntoa(struct.unpack('i', s[16:20])[0])
    if d['header_len'] > 5:
        # Options fill the space between the fixed 20 bytes and the end of
        # the header (header_len counts 32-bit words).
        d['options'] = s[20:4 * d['header_len']]
    else:
        d['options'] = None
    s = s[4 * d['header_len']:]
    if d['protocol'] == 17:
        d['source_port'] = socket.ntohs(struct.unpack('H', s[0:2])[0])
        d['destination_port'] = socket.ntohs(struct.unpack('H', s[2:4])[0])
        # Skip the 8-byte UDP header; what remains is the payload.
        s = s[8:]
        stats['UDP packets'] += 1
    d['data'] = s
    stats['IP packets'] += 1
    return d
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def testNtoH(self):
        # Checks that htons etc. are their own inverse when only the lower
        # 16 or 32 bits are considered.
        samples = (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210)
        converters = ((socket.htonl, 32), (socket.ntohl, 32),
                      (socket.htons, 16), (socket.ntohs, 16))
        for convert, width in converters:
            all_ones = (1 << width) - 1
            for sample in samples:
                # Converting twice must give back the masked input.
                self.assertEqual(sample & all_ones,
                                 convert(convert(sample & all_ones)) & all_ones)

            # An all-ones value is unchanged by the conversion.
            self.assertEqual(convert(all_ones) & all_ones, all_ones)
            # A 35-bit value must be rejected.
            self.assertRaises(OverflowError, convert, 1 << 34)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def testNtoHErrors(self):
        # Every converter must accept small non-negative values and reject
        # negative ones with OverflowError.
        converters = (socket.ntohl, socket.ntohs, socket.htonl, socket.htons)
        for accepted in [ 1, 2, 3, 1, 2, 3 ]:
            for convert in converters:
                convert(accepted)
        for rejected in [ -1, -2, -3, -1, -2, -3 ]:
            for convert in converters:
                self.assertRaises(OverflowError, convert, rejected)