Python struct 模块,unpack() 实例源码


项目:shellgen    作者:MarioVilas    | 项目源码 | 文件源码
def encode_8(bytes, key, terminator):
        """
        Encode the bytecode with the given 8-bit XOR key.

        The terminator is appended first when the bytecode does not already
        end with it, so the terminator is XOR-encoded together with the rest.

        :type  bytes: str
        :param bytes: Bytecode to encode.

        :type  key: str
        :param key: 8-bit XOR key.

        :type  terminator: str
        :param terminator: 8-bit terminator.

        :rtype:  str
        :return: Encoded bytecode.
        """
        if not bytes.endswith(terminator):
            bytes += terminator
        fmt = "%dB" % len(bytes)
        # The key is a single byte; unpack it once instead of building a pad.
        (key_byte,) = struct.unpack("B", key)
        encoded = [value ^ key_byte for value in struct.unpack(fmt, bytes)]
        return struct.pack(fmt, *encoded)
项目:Cypher    作者:NullArray    | 项目源码 | 文件源码
def decrypt_file(key, in_filename, out_filename=None, chunksize=24*1024):
    """Decrypt ``in_filename`` with AES in CBC mode and write the plaintext.

    The input starts with the original plaintext size as a little-endian
    unsigned 64-bit integer, followed by the IV and then the ciphertext.

    :param key: AES key handed to the cipher.
    :param in_filename: path of the encrypted file.
    :param out_filename: path of the output file; defaults to
        ``in_filename`` with its last extension removed.
    :param chunksize: number of bytes read and decrypted per iteration.
    """
    # Split .crypt extension to restore file format
    if not out_filename:
        out_filename = os.path.splitext(in_filename)[0]

    with open(in_filename, 'rb') as infile:
        # NOTE(review): the read/cipher/write expressions below were truncated
        # in the source and are reconstructed from what remains; the 16-byte
        # IV length (one AES block) in particular should be confirmed.
        origsize = struct.unpack('<Q', infile.read(struct.calcsize('Q')))[0]
        iv = infile.read(16)
        decryptor = AES.new(key, AES.MODE_CBC, iv)

        with open(out_filename, 'wb') as outfile:
            while True:
                chunk = infile.read(chunksize)
                if len(chunk) == 0:
                    break
                outfile.write(decryptor.decrypt(chunk))

            # Truncate file to original size
            outfile.truncate(origsize)
项目:pbtk    作者:marin-m    | 项目源码 | 文件源码
def parse_default(field, ftype, fdefault):
    """Normalise a textual default value and store it on ``field``.

    :param field: object that receives a ``default_value`` attribute.
    :param ftype: field type name; the code inspects its prefix
        (``u...``, ``fixed...``) and its last two characters (bit width).
    :param fdefault: default value as text.

    ``field.default_value`` is only set when the resulting value is truthy.
    """
    # The literal 'true' of a bool field is kept as text; everything else is
    # parsed as a Python literal after dropping trailing L/D/F suffixes.
    if not (ftype == 'bool' and fdefault == 'true'):
        try:
            fdefault = literal_eval(fdefault.rstrip('LDF'))
        except (ValueError, SyntaxError):
            fdefault = None

    if type(fdefault) is int:
        if ftype[0] != 'u' and ftype[:5] != 'fixed':
            # Signed types: reinterpret values with the sign bit set.
            # NOTE(review): c_long is only 64 bits wide on LP64 platforms.
            if fdefault >> 63:
                fdefault = c_long(fdefault).value
            elif fdefault >> 31 and ftype[-2:] != '64':
                fdefault = c_int(fdefault).value
        else:
            # Unsigned/fixed types: wrap to the width named by the type.
            fdefault &= (1 << int(ftype[-2:])) - 1

        # An integer too large for the mantissa is treated as the raw bit
        # pattern of the floating-point value.
        if ftype == 'float' and abs(fdefault) >> 23:
            fdefault = unpack('=f', pack('=i', fdefault))[0]
        elif ftype == 'double' and abs(fdefault) >> 52:
            fdefault = unpack('=d', pack('=q', fdefault))[0]

    if fdefault:
        field.default_value = str(fdefault)
项目:cellranger    作者:10XGenomics    | 项目源码 | 文件源码
def parse_bgzf_header(f):
    """Read one gzip/BGZF block header from ``f`` and locate the next block.

    :param f: binary file object positioned at the start of a block.
    :return: ``cur_pos + bsize + 1`` where ``bsize`` comes from
        ``get_bsize``, or ``None`` at end of file.
    :raises Exception: if the two magic bytes are not 31, 139.
    """
    cur_pos = f.tell()
    # gzip header fields are little-endian; "<" also rules out native padding.
    header_fmt = "<BBBBIBBH"

    # NOTE(review): the read call was truncated in the source; the size is
    # derived from the header format (12 bytes).
    d = f.read(struct.calcsize(header_fmt))

    # We are at EOF when read returns an empty string
    if not d:
        return None

    header = struct.unpack(header_fmt, d)

    # Check for a valid gzip header
    if header[0] != 31 or header[1] != 139:
        raise Exception("Not a valid gzip header")

    xlen = header[7]
    bsize = get_bsize(f, f.tell(), xlen)

    next_pos = cur_pos + bsize + 1
    return next_pos
项目:Stitch    作者:nathanlopez    | 项目源码 | 文件源码
def get_user_hashes(user_key, hbootkey):
    """Extract and decrypt the LM/NT hashes stored in a user key's ``V`` value.

    :param user_key: registry key object; its ``Name`` is parsed as a
        hexadecimal RID.
    :param hbootkey: key material passed through to ``decrypt_hashes``.
    :return: result of ``decrypt_hashes``, or ``None`` when no ``V`` value
        could be read.
    """
    # NOTE(review): both reads below were truncated in the source and are
    # reconstructed; confirm the attribute/method names.
    samaddr = user_key.space
    rid = int(user_key.Name, 16)
    V = None
    for v in values(user_key):
        if v.Name == 'V':
            V = samaddr.read(v.Data.value, v.DataLength.value)
    if not V:
        return None

    hash_offset = unpack("<L", V[0x9c:0x9c+4])[0] + 0xCC

    # A hash is considered present when its length field equals 20.
    lm_exists = unpack("<L", V[0x9c+4:0x9c+8])[0] == 20
    nt_exists = unpack("<L", V[0x9c+16:0x9c+20])[0] == 20

    enc_lm_hash = V[hash_offset+4:hash_offset+20] if lm_exists else ""
    # The NT hash follows the LM entry, whose size depends on its presence.
    nt_start = hash_offset + (24 if lm_exists else 8)
    enc_nt_hash = V[nt_start:nt_start+16] if nt_exists else ""

    return decrypt_hashes(rid, enc_lm_hash, enc_nt_hash, hbootkey)
项目:Cortex-Analyzers    作者:CERT-BDF    | 项目源码 | 文件源码
def decode(self, offset):
        """Decode a section of the data section starting at offset

        Arguments:
        offset -- the location of the data structure to decode

        Returns whatever the decoder registered in ``self._type_decoder``
        for the decoded type number returns.
        """
        new_offset = offset + 1
        (ctrl_byte,) = struct.unpack(b'!B', self._buffer[offset:new_offset])
        # The top three bits of the control byte hold the type number.
        type_num = ctrl_byte >> 5
        # Extended type
        if not type_num:
            (type_num, new_offset) = self._read_extended(new_offset)

        (size, new_offset) = self._size_from_ctrl_byte(
            ctrl_byte, new_offset, type_num)
        return self._type_decoder[type_num](self, size, new_offset)
项目:Cortex-Analyzers    作者:CERT-BDF    | 项目源码 | 文件源码
def _size_from_ctrl_byte(self, ctrl_byte, offset, type_num):
        """Return ``(size, next_offset)`` for the field described by ctrl_byte.

        The low five bits of ``ctrl_byte`` hold the size. Values 29, 30 and
        31 mean the real size is stored in the following 1, 2 or 3 bytes and
        is added to a fixed base. Type 1 always uses the raw five-bit value.
        """
        size = ctrl_byte & 0x1f
        if type_num == 1 or size < 29:
            return size, offset

        end = offset + (size - 28)
        raw = self._buffer[offset:end]

        # Using unpack rather than int_from_bytes as it is about 200 lookups
        # per second faster here.
        if size == 29:
            fmt, base = b'!B', 29
        elif size == 30:
            fmt, base = b'!H', 285
        else:
            fmt, base = b'!I', 65821
            raw = raw.rjust(4, b'\x00')

        return base + struct.unpack(fmt, raw)[0], end
项目:Cortex-Analyzers    作者:CERT-BDF    | 项目源码 | 文件源码
def _read_node(self, node_number, index):
        """Return record ``index`` (0 or 1) of search-tree node ``node_number``.

        Supports record sizes of 24, 28 and 32 bits as given by
        ``self._metadata.record_size``; any other size raises
        ``InvalidDatabaseError``.
        """
        base_offset = node_number * self._metadata.node_byte_size

        record_size = self._metadata.record_size
        if record_size == 24:
            offset = base_offset + index * 3
            node_bytes = b'\x00' + self._buffer[offset:offset + 3]
        elif record_size == 28:
            # The middle byte is shared: its high nibble belongs to record 0
            # and its low nibble to record 1.
            (middle,) = struct.unpack(
                b'!B', self._buffer[base_offset + 3:base_offset + 4])
            if index:
                middle &= 0x0F
            else:
                middle = (0xF0 & middle) >> 4
            offset = base_offset + index * 4
            node_bytes = byte_from_int(
                middle) + self._buffer[offset:offset + 3]
        elif record_size == 32:
            offset = base_offset + index * 4
            node_bytes = self._buffer[offset:offset + 4]
        else:
            raise InvalidDatabaseError(
                'Unknown record size: {0}'.format(record_size))
        return struct.unpack(b'!I', node_bytes)[0]
项目:CTF    作者:calee0219    | 项目源码 | 文件源码
def grayscale(self):
        """ Convert the image into a (24-bit) grayscale one, using the Y'UV method.

        Reads ``self.bitmap_data`` as rows of 3-byte pixels followed by
        ``self.padding_size`` padding bytes, replaces every pixel with three
        copies of its luma value and zero-fills the padding.

        Returns ``self``.
        """

        Wr = 0.299
        Wb = 0.114
        Wg = 0.587

        data = self.bitmap_data
        # NOTE(review): the per-pixel read and the padding skip were truncated
        # in the source; they are reconstructed here as offset arithmetic.
        row_stride = self.width * 3 + self.padding_size
        out = bytearray()

        for row_num in range(self.height):
            row_start = row_num * row_stride
            for pix in range(self.width):
                pixel = struct.unpack_from("3B", data, row_start + pix * 3)
                luma = int(Wr * pixel[2] + Wg * pixel[1] + Wb * pixel[0])
                out += bytearray((luma, luma, luma))

            out += bytearray(self.padding_size)

        self.bitmap_data = bytes(out)

        return self
项目:pyOSC3    作者:Qirky    | 项目源码 | 文件源码
def _receiveMsg(self):
        """ Receive OSC message from a socket and decode.

        Returns the decoded message, or None when the socket was closed
        while reading. Raises OSCError if the payload cannot be decoded.
        """
        # get OSC packet size from stream which is prepended each transmission
        chunk = self._receive(4)
        if chunk is None:
            print("SERVER: Socket has been closed.")
            return None
        # extract message length from big endian unsigned long (32 bit)
        slen = struct.unpack(">L", chunk)[0]
        # receive the actual message
        chunk = self._receive(slen)
        if chunk is None:
            print("SERVER: Socket has been closed.")
            return None
        # decode OSC data and dispatch
        msg = decodeOSC(chunk)
        if msg is None:
            raise OSCError("SERVER: Message decoding failed.")
        return msg
项目:pyOSC3    作者:Qirky    | 项目源码 | 文件源码
def _receiveMsgWithTimeout(self):
        """ Receive OSC message from a socket and decode.

        Returns the decoded message, or None when nothing could be read.
        Raises OSCError if the payload cannot be decoded.
        """
        # get OSC packet size from stream which is prepended each transmission
        chunk = self._receiveWithTimeout(4)
        if not chunk:
            return None
        # extract message length from big endian unsigned long (32 bit)
        slen = struct.unpack(">L", chunk)[0]
        # receive the actual message
        chunk = self._receiveWithTimeout(slen)
        if not chunk:
            return None
        # decode OSC content
        msg = decodeOSC(chunk)
        if msg is None:
            raise OSCError("CLIENT: Message decoding failed.")
        return msg
项目:pyOSC3    作者:Qirky    | 项目源码 | 文件源码
def _receiveMsg(self):
        """ Receive OSC message from a socket and decode.

        Returns the decoded message, or None when the socket was closed
        while reading. Raises OSCError if the payload cannot be decoded.
        """
        # get OSC packet size from stream which is prepended each transmission
        chunk = self._receive(4)
        if chunk is None:
            # Parenthesised single-argument print behaves the same whether
            # print is a statement or a function.
            print("SERVER: Socket has been closed.")
            return None
        # extract message length from big endian unsigned long (32 bit)
        slen = struct.unpack(">L", chunk)[0]
        # receive the actual message
        chunk = self._receive(slen)
        if chunk is None:
            print("SERVER: Socket has been closed.")
            return None
        # decode OSC data and dispatch
        msg = decodeOSC(chunk)
        if msg is None:
            raise OSCError("SERVER: Message decoding failed.")
        return msg
项目:pyOSC3    作者:Qirky    | 项目源码 | 文件源码
def _receiveMsgWithTimeout(self):
        """ Receive OSC message from a socket and decode.

        Returns the decoded message, or None when nothing could be read.
        Raises OSCError if the payload cannot be decoded.
        """
        # get OSC packet size from stream which is prepended each transmission
        chunk = self._receiveWithTimeout(4)
        if not chunk:
            return None
        # extract message length from big endian unsigned long (32 bit)
        slen = struct.unpack(">L", chunk)[0]
        # receive the actual message
        chunk = self._receiveWithTimeout(slen)
        if not chunk:
            return None
        # decode OSC content
        msg = decodeOSC(chunk)
        if msg is None:
            raise OSCError("CLIENT: Message decoding failed.")
        return msg
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def from_key_val_list(value):
    """Take an object and test to see if it can be represented as a
    dictionary. Unless it can not be represented as such, return an
    OrderedDict, e.g.,

    ::

        >>> from_key_val_list([('key', 'val')])
        OrderedDict([('key', 'val')])
        >>> from_key_val_list('string')
        Traceback (most recent call last):
            ...
        ValueError: cannot encode objects that are not 2-tuples
        >>> from_key_val_list({'key': 'val'})
        OrderedDict([('key', 'val')])

    ``None`` is passed through unchanged.

    :rtype: OrderedDict
    """
    if value is None:
        return None

    # Scalars can never be a sequence of key/value pairs.
    if isinstance(value, (str, bytes, bool, int)):
        raise ValueError('cannot encode objects that are not 2-tuples')

    return OrderedDict(value)
项目:cbapi-examples    作者:cbcommunity    | 项目源码 | 文件源码
def addressInNetwork(self, ip, cidr):
        """Return True when ``ip`` lies inside the IPv4 network ``cidr``.

        :param ip: the address as a decimal 32-bit integer (text or int);
            negative values are read as two's complement. The empty string
            means "unknown" and never matches.
        :param cidr: network in ``'a.b.c.d/bits'`` notation.
        :return: bool; False for any input that cannot be interpreted.
        """
        # the ip can be the empty string ('') in cases where the connection
        # is made via a web proxy.  in these cases the sensor cannot report
        # the true remote IP as DNS resolution happens on the web proxy (and
        # not the endpoint)
        if '' == ip:
            return False

        try:
            net, bits = cidr.split('/')
            prefix = int(bits)
            value = int(ip)
            if not 0 <= prefix <= 32:
                return False
            if not -0x80000000 <= value <= 0xFFFFFFFF:
                return False

            # Masking folds signed and unsigned representations together.
            ipaddr = value & 0xFFFFFFFF
            # Compare in network byte order so the prefix covers the leading
            # bits; a low-bit mask on little-endian values is only right
            # for prefixes that are a multiple of 8.
            netaddr = struct.unpack('!L', socket.inet_aton(net))[0]
            netmask = (0xFFFFFFFF << (32 - prefix)) & 0xFFFFFFFF

            return ipaddr & netmask == netaddr & netmask

        except Exception:
            # Best effort: malformed input simply does not match.
            return False
项目:SceneDensity    作者:ImOmid    | 项目源码 | 文件源码
def file_scanlines(self, infile):
        """
        Generates boxed rows in flat pixel format, from the input file
        `infile`.  It assumes that the input file is in a "Netpbm-like"
        binary format, and is positioned at the beginning of the first
        pixel.  The number of pixels to read is taken from the image
        dimensions (`width`, `height`, `planes`) and the number of bytes
        per value is implied by the image `bitdepth`.
        """

        # Values per row
        vpr = self.width * self.planes
        row_bytes = vpr
        # NOTE(review): the reads inside line() were truncated in the source;
        # reconstructed as infile.read(row_bytes).
        if self.bitdepth > 8:
            assert self.bitdepth == 16
            row_bytes *= 2
            fmt = '>%dH' % vpr
            def line():
                # 16-bit samples are stored big-endian.
                return array('H', struct.unpack(fmt, infile.read(row_bytes)))
        else:
            def line():
                scanline = array('B', infile.read(row_bytes))
                return scanline
        for y in range(self.height):
            yield line()
项目:SceneDensity    作者:ImOmid    | 项目源码 | 文件源码
def chunklentype(self):
        """Reads just enough of the input to determine the next
        chunk's length and type, returned as a (*length*, *type*) pair
        where *type* is the 4-byte chunk type as read from the input.
        If there are no more chunks, ``None`` is returned.

        Raises ``FormatError`` on a partial header or an oversized length.
        """

        # NOTE(review): the read expression was truncated in the source;
        # reconstructed as an 8-byte read from self.file - confirm.
        x = self.file.read(8)
        if not x:
            return None
        if len(x) != 8:
            raise FormatError(
              'End of file whilst reading chunk length and type.')
        length, chunk_type = struct.unpack('!I4s', x)
        if length > 2**31-1:
            raise FormatError(
              'Chunk %s is too large: %d.' % (chunk_type, length))
        return length, chunk_type
项目:SceneDensity    作者:ImOmid    | 项目源码 | 文件源码
def _process_tRNS(self, data):
        """Record the tRNS chunk payload and validate it.

        ``data`` is always stored as ``self.trns``. For colour-mapped images
        it must not have more entries than the palette. Otherwise it is
        unpacked as big-endian 16-bit values, one per colour plane, into
        ``self.transparent``. Raises ``FormatError`` on invalid chunks.
        """
        self.trns = data
        if self.colormap:
            if not self.plte:
                warnings.warn("PLTE chunk is required before tRNS chunk.")
            else:
                if len(data) > len(self.plte)/3:
                    # Was warning, but promoted to Error as it
                    # would otherwise cause pain later on.
                    raise FormatError("tRNS chunk is too long.")
        else:
            if self.alpha:
                # NOTE(review): the format argument was lost in the source;
                # reconstructed as self.color_type - confirm.
                raise FormatError(
                  "tRNS chunk is not valid with colour type %d." %
                  self.color_type)
            try:
                self.transparent = \
                    struct.unpack("!%dH" % self.color_planes, data)
            except struct.error:
                raise FormatError("tRNS chunk has incorrect length.")
项目:ironic-staging-drivers    作者:openstack    | 项目源码 | 文件源码
def parse_policy(raw_data):
    """Parse policy data.

    Bytes 3-5 of the converted response are decoded as flag/enum fields and
    the remaining bytes (from index 6) as four little-endian integers.

    :param raw_data: raw response, converted with ``_raw_to_int``.
    :return: dict describing the policy.
    """
    policy = {}
    raw_int = _raw_to_int(raw_data)

    policy['domain_id'] = DOMAINS_REV[raw_int[3] & 0x0F]
    policy['enabled'] = bool(raw_int[3] & 0x10)
    policy['per_domain_enabled'] = bool(raw_int[3] & 0x20)
    policy['global_enabled'] = bool(raw_int[3] & 0x40)
    policy['created_by_nm'] = not bool(raw_int[3] & 0x80)
    policy['policy_trigger'] = TRIGGERS_REV[raw_int[4] & 0x0F]
    policy['power_policy'] = bool(raw_int[4] & 0x10)
    power_correction = CPU_CORRECTION_REV[raw_int[4] & 0x60]
    policy['cpu_power_correction'] = power_correction
    policy['storage'] = STORAGE_REV[raw_int[4] & 0x80]
    policy['action'] = ACTIONS_REV[raw_int[5] & 0x01]
    policy['power_domain'] = POWER_DOMAIN_REV[raw_int[5] & 0x80]
    policy_values = struct.unpack('<HIHH', bytearray(raw_int[6:]))
    # NOTE(review): the fourth name was lost in the source; the format
    # yields four values, reconstructed as 'reporting_period' - confirm.
    policy_names = ('target_limit', 'correction_time', 'trigger_limit',
                    'reporting_period')
    _add_to_dict(policy, policy_values, policy_names)

    return policy
项目:ironic-staging-drivers    作者:openstack    | 项目源码 | 文件源码
def parse_capabilities(raw_data):
    """Parse capabilities data.

    :param raw_data: raw response, converted with ``_raw_to_int``.
    :return: dict with the policy limits and domain information.
    """
    capabilities = {}
    raw_int = _raw_to_int(raw_data)

    capabilities['max_policies'] = raw_int[3]
    # NOTE(review): the slice was lost in the source. '<HHIIHH' is 16 bytes
    # and sits between byte 3 (max_policies) and byte 20 (domain), hence
    # raw_int[4:20].
    capabilities_values = struct.unpack('<HHIIHH', bytearray(
        raw_int[4:20]))
    capabilities_names = ('max_limit_value', 'min_limit_value',
                          'min_correction_time', 'max_correction_time',
                          'min_reporting_period', 'max_reporting_period')
    _add_to_dict(capabilities, capabilities_values, capabilities_names)
    capabilities['domain_id'] = DOMAINS_REV[raw_int[20] & 0x0F]
    power_domain = POWER_DOMAIN_REV[raw_int[20] & 0x80]
    capabilities['power_domain'] = power_domain

    return capabilities
项目:estreamer    作者:spohara79    | 项目源码 | 文件源码
def __init__(self, *args, **kwargs):
        # Reads the record type from the first four bytes of args[0] and, for
        # archival record types, temporarily adds 'timestamp'/'reserved' fields.
        # NOTE(review): this block looks truncated - the body of the final
        # `for` loop is missing and the nesting around the super() call cannot
        # be determined from what remains.
        if args:

            # Record type: big-endian unsigned 32-bit integer.
            self.type = struct.unpack('>I', args[0][:4])[0]
            # check if it's  an event data record that uses archival timestamps and if we've set archival
            if self.type in ARCHIVAL_RCD_TYPES and config.test_bit(Struct.get_flags(), 23) and 'reserved' not in self._field_names_:
                self._fields_.extend([('timestamp', 'uint32', 0), ('reserved', 'uint32', 0)])
                self._field_names_.extend(['timestamp', 'reserved'])
                self._field_format_.update({'timestamp': 'I', 'reserved': 'I'})
                # The field values do not reset after being extended for some reason (metaclass). Without this, all events parsed after the first ARCHIVAL_RCD gets parsed as if it has the 'reserved' and 'timestamp field'
                #map(self._fields_.remove, [f for f in self._fields_ if f[0] in ['timestamp', 'reserved']])
                #map(self._field_names_.remove, [f for f in self._field_names_ if f[0] in ['timestamp', 'reserved']])
                self._fields_ = [f for f in self._fields_ if f[0] not in ['timestamp', 'reserved']]
                # NOTE(review): f[0] is the first character of each name here,
                # so this filter never removes 'timestamp'/'reserved' - verify.
                self._field_names_ = [f for f in self._field_names_ if f[0] not in ['timestamp', 'reserved']]
                for k in ['timestamp', 'reserved']:

            super(EventData, self).__init__(*args, **kwargs)
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def from_key_val_list(value):
    """Take an object and test to see if it can be represented as a
    dictionary. Unless it can not be represented as such, return an
    OrderedDict, e.g.,

    ::

        >>> from_key_val_list([('key', 'val')])
        OrderedDict([('key', 'val')])
        >>> from_key_val_list('string')
        Traceback (most recent call last):
            ...
        ValueError: cannot encode objects that are not 2-tuples
        >>> from_key_val_list({'key': 'val'})
        OrderedDict([('key', 'val')])

    ``None`` is passed through unchanged.

    :rtype: OrderedDict
    """
    if value is None:
        return None

    # Scalars can never be a sequence of key/value pairs.
    if isinstance(value, (str, bytes, bool, int)):
        raise ValueError('cannot encode objects that are not 2-tuples')

    return OrderedDict(value)
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def from_key_val_list(value):
    """Take an object and test to see if it can be represented as a
    dictionary. Unless it can not be represented as such, return an
    OrderedDict, e.g.,

    ::

        >>> from_key_val_list([('key', 'val')])
        OrderedDict([('key', 'val')])
        >>> from_key_val_list('string')
        Traceback (most recent call last):
            ...
        ValueError: cannot encode objects that are not 2-tuples
        >>> from_key_val_list({'key': 'val'})
        OrderedDict([('key', 'val')])

    ``None`` is passed through unchanged.

    :rtype: OrderedDict
    """
    if value is None:
        return None

    # Scalars can never be a sequence of key/value pairs.
    if isinstance(value, (str, bytes, bool, int)):
        raise ValueError('cannot encode objects that are not 2-tuples')

    return OrderedDict(value)
项目:PyPlanet    作者:PyPlanet    | 项目源码 | 文件源码
def __parse_header(self):
        # Parses the header: a length, a chunk count, then one (id, size)
        # pair per chunk, all as little-endian unsigned 32-bit integers.
        # NOTE(review): every `await` operand below was truncated in the
        # source, and `await` requires this to be declared `async def`;
        # restore both from the original file before use.
        self.header_length, = struct.unpack('<I', await
        self.header_chunk_count, = struct.unpack('<I', await

        self.header_chunks = dict()
        self.header = dict()

        # Save header data from binary.
        for nr in range(self.header_chunk_count):
            chunk_id, = struct.unpack('<I', await
            chunk_size, = struct.unpack('<I', await
            # Bit 31 of the size is cleared before it is stored.
            self.header_chunks[chunk_id] = chunk_size & ~0x80000000

        # Parse all header chunks.
        for chunk_id, chunk_size in self.header_chunks.items():
            self.header.update(await self.__parse_chunk(chunk_id, chunk_size))

        return self.header
项目:rain    作者:scizzorz    | 项目源码 | 文件源码
def to_rain(cls, val):
    # Converts a Python value (None, bool, int, float or str) into a Rain
    # value; any other type raises Exception.
    # NOTE(review): the callee of every `return` below was truncated in the
    # source; only the trailing arguments survive.
    if val is None:
      return, 0, 0, cls.null)
    # The bool checks come before the int check because bool is an int subclass.
    elif val is False:
      return, 0, 0, cls.null)
    elif val is True:
      return, 0, 1, cls.null)
    elif isinstance(val, int):
      return, 0, val, cls.null)
    elif isinstance(val, float):
      # Reinterpret the double's bytes as an unsigned 64-bit integer.
      raw = struct.pack('d', val)
      intrep = struct.unpack('Q', raw)[0]
      return, 0, intrep, cls.null)
    elif isinstance(val, str):
      str_p = ct.create_string_buffer(val.encode('utf-8'))
      # NOTE(review): len(val) counts characters, not UTF-8 bytes - confirm
      # which one the receiver expects.
      return, len(val), ct.cast(str_p, ct.c_void_p).value, cls.null)

    raise Exception("Can't convert value {!r} to Rain".format(val))
项目:tockloader    作者:helena-project    | 项目源码 | 文件源码
def _check_crc (self, address, binary):
        '''
        Compares the CRC of the local binary to the one returned by
        `_get_crc_internal_flash` for the same address and length.

        Raises TockLoaderException when the two values differ and prints a
        confirmation otherwise.
        '''
        # Check the CRC
        crc_data = self._get_crc_internal_flash(address, len(binary))

        # Now interpret the returned bytes as the CRC
        crc_bootloader = struct.unpack('<I', crc_data[0:4])[0]

        # Calculate the CRC locally
        crc_function = crcmod.mkCrcFun(0x104c11db7, initCrc=0, xorOut=0xFFFFFFFF)
        crc_loader = crc_function(binary, 0)

        if crc_bootloader != crc_loader:
            raise TockLoaderException('Error: CRC check failed. Expected: 0x{:04x}, Got: 0x{:04x}'.format(crc_loader, crc_bootloader))
        else:
            print('CRC check passed. Binaries successfully loaded.')
项目:tockloader    作者:helena-project    | 项目源码 | 文件源码
def _checksum (self, buffer):
        '''
        Calculate the TBF header checksum.

        The buffer is zero-padded to a multiple of four bytes and all
        little-endian 32-bit words are XORed together. The caller's buffer
        is never modified.
        '''
        # Add 0s to the end to make sure that we are multiple of 4. Work on a
        # copy: `+=` on a bytearray argument would grow the caller's object.
        padded = bytes(buffer) + bytes(-len(buffer) % 4)

        # XOR every 32-bit word into the running checksum.
        checksum = 0
        for i in range(0, len(padded), 4):
            checksum ^= struct.unpack_from('<I', padded, i)[0]

        return checksum
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def __init__(self, file, align=True, bigendian=True, inclheader=False):
        """Read a chunk header (4-byte name, 4-byte size) from ``file``.

        :param file: file-like object positioned at the start of a chunk.
        :param align: whether chunks are aligned to 2-byte boundaries.
        :param bigendian: byte order of the size field.
        :param inclheader: whether the stored size includes the 8-byte header.
        :raises EOFError: if the name or the size cannot be read completely.
        """
        import struct
        self.closed = False
        self.align = align      # whether to align to word (2-byte) boundaries
        if bigendian:
            strflag = '>'
        else:
            strflag = '<'
        self.file = file
        # NOTE(review): both read calls were truncated in the source and are
        # reconstructed as 4-byte reads from `file`.
        self.chunkname = file.read(4)
        if len(self.chunkname) < 4:
            raise EOFError
        try:
            self.chunksize = struct.unpack(strflag+'L', file.read(4))[0]
        except struct.error:
            raise EOFError
        if inclheader:
            self.chunksize = self.chunksize - 8 # subtract header
        self.size_read = 0
        # A file is treated as seekable when tell() works.
        try:
            self.offset = self.file.tell()
        except (AttributeError, IOError):
            self.seekable = False
        else:
            self.seekable = True
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def py_suffix_importer(filename, finfo, fqname):
    """Load the code object for ``filename``, preferring a cached bytecode file.

    The cached file (``filename`` with its last three characters replaced by
    ``_suffix``) is used when its magic number matches and its stored
    timestamp equals the source's; otherwise the source is compiled.

    :param filename: path of the source file.
    :param finfo: stat-like sequence; index 8 is used as the source timestamp.
    :param fqname: not used here.
    :return: ``(0, code, {'__file__': path})``.
    """
    file = filename[:-3] + _suffix
    t_py = long(finfo[8])
    t_pyc = _timestamp(file)

    code = None
    if t_pyc is not None and t_pyc >= t_py:
        # NOTE(review): the two reads were truncated in the source and are
        # reconstructed as 4-byte reads (magic number, then timestamp).
        with open(file, 'rb') as f:
            if f.read(4) == imp.get_magic():
                t = struct.unpack('<I', f.read(4))[0]
                if t == t_py:
                    code = marshal.load(f)
    if code is None:
        file = filename
        code = _compile(file, t_py)

    return 0, code, { '__file__' : file }
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def scan_opcodes(self, co,
                     unpack = struct.unpack):
        """Yield ``("store", (name,))`` and ``("import", (const, name))``
        events found in the bytecode of code object ``co``.
        """
        # Scan the code, and yield 'interesting' opcode combinations
        # Version for Python 2.4 and older
        code = co.co_code
        names = co.co_names
        consts = co.co_consts
        while code:
            c = code[0]
            if c in STORE_OPS:
                oparg, = unpack('<H', code[1:3])
                yield "store", (names[oparg],)
                code = code[3:]
                continue
            if c == LOAD_CONST and code[3] == IMPORT_NAME:
                oparg_1, oparg_2 = unpack('<xHxH', code[:6])
                yield "import", (consts[oparg_1], names[oparg_2])
                code = code[6:]
                continue
            # Anything else is skipped: three bytes for opcodes that carry
            # an argument, one byte otherwise.
            if c >= HAVE_ARGUMENT:
                code = code[3:]
            else:
                code = code[1:]
项目:cbapi-python    作者:carbonblack    | 项目源码 | 文件源码
def addressInNetwork(self, ip, cidr):
        """Return True when ``ip`` lies inside the IPv4 network ``cidr``.

        :param ip: the address as a decimal 32-bit integer (text or int);
            negative values are read as two's complement. The empty string
            means "unknown" and never matches.
        :param cidr: network in ``'a.b.c.d/bits'`` notation.
        :return: bool; False for any input that cannot be interpreted.
        """
        # the ip can be the empty string ('') in cases where the connection
        # is made via a web proxy.  in these cases the sensor cannot report
        # the true remote IP as DNS resolution happens on the web proxy (and
        # not the endpoint)
        if '' == ip:
            return False

        try:
            net, bits = cidr.split('/')
            prefix = int(bits)
            value = int(ip)
            if not 0 <= prefix <= 32:
                return False
            if not -0x80000000 <= value <= 0xFFFFFFFF:
                return False

            # Masking folds signed and unsigned representations together.
            ipaddr = value & 0xFFFFFFFF
            # Compare in network byte order so the prefix covers the leading
            # bits; a low-bit mask on little-endian values is only right
            # for prefixes that are a multiple of 8.
            netaddr = struct.unpack('!L', socket.inet_aton(net))[0]
            netmask = (0xFFFFFFFF << (32 - prefix)) & 0xFFFFFFFF

            return ipaddr & netmask == netaddr & netmask

        except Exception:
            # Best effort: malformed input simply does not match.
            return False
项目:rpi-can-logger    作者:JonnoFTW    | 项目源码 | 文件源码
def _fms(msg, gain, offset, sbyte, ebyte=None):
    """
    Extract a number from a series of bytes, a number 1,2,4 bytes long
    :param msg: the series of bytes
    :param gain: the  value to multiply the value by
    :param offset: the offset of the value
    :param sbyte:  the start byte of the value (indexed from 1)
    :param ebyte:  the end byte of the value (indexed from 1), if None, assume the value is 1 byte long
    :return: the extracted value multiplied by gain, plus offset
    :raises KeyError: if the byte range is not 1, 2 or 4 bytes long
    """
    if ebyte is None:
        return msg[sbyte - 1] * gain + offset
    start = sbyte - 1
    # interpret bytes as n-byte unsigned int,
    # multiply by gain + offset
    byte_len = ebyte - start
    fmt = {
        1: '>B',
        2: '<H',
        4: '<I'
    }

    return struct.unpack(fmt[byte_len], msg[start:ebyte])[0] * gain + offset
项目:itamae    作者:wraith-wireless    | 项目源码 | 文件源码
def _iesubelmsmtrptlci_(s,sid):
    """ :returns: parsed LCI optional subelement """
    # Unrecognised subelement ids fall through and return `s` unchanged.
    ret = s
    if sid == std.EID_MSMT_RPT_LCI_AZIMUTH:
        # NOTE(review): the body of this dict literal was lost in the source.
        ret = {
    elif sid == std.EID_MSMT_RPT_LCI_ORIGIN:
        # Six unsigned bytes are handed to _hwaddr_.
        ret = {'originator':_hwaddr_(struct.unpack('=6B',s))}
    elif sid == std.EID_MSMT_RPT_LCI_TARGET:
        ret = {'target':_hwaddr_(struct.unpack('=6B',s))}
    elif sid == std.EID_MSMT_RPT_LCI_VEND:
        ret = _parseie_(std.EID_VEND_SPEC,s)
    return ret

# MSMT Report->Azimuth Report fields Std Fig. 8-164
项目:pocket-cli    作者:rakanalh    | 项目源码 | 文件源码
def get_terminal_size():
        """Return the terminal size as a ``(columns, lines)`` tuple of ints.

        Tries the TIOCGWINSZ ioctl on stdin/stdout/stderr, then on the
        controlling terminal, then the LINES/COLUMNS environment variables,
        and finally falls back to 80x25.
        """
        def ioctl_GWINSZ(fd):
            # Returns (rows, columns) for fd, or None when the query fails
            # (not a terminal, or fcntl/termios unavailable).
            try:
                import fcntl
                import termios
                import struct
                cr = struct.unpack('hh', fcntl.ioctl(fd, termios.TIOCGWINSZ,
                                                     struct.pack('hh', 0, 0)))
            except Exception:
                return None
            return cr
        cr = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2)
        if not cr:
            try:
                fd = os.open(os.ctermid(), os.O_RDONLY)
                try:
                    cr = ioctl_GWINSZ(fd)
                finally:
                    os.close(fd)
            except Exception:
                pass
        if not cr:
            try:
                # os.environ, not os.env: the latter does not exist, so this
                # fallback could never succeed.
                cr = (int(os.environ['LINES']), int(os.environ['COLUMNS']))
            except (KeyError, ValueError):
                cr = (25, 80)
        return int(cr[1]), int(cr[0])
项目:googletranslate.popclipext    作者:wizyoung    | 项目源码 | 文件源码
def from_key_val_list(value):
    """Take an object and test to see if it can be represented as a
    dictionary. Unless it can not be represented as such, return an
    OrderedDict, e.g.,

    ::

        >>> from_key_val_list([('key', 'val')])
        OrderedDict([('key', 'val')])
        >>> from_key_val_list('string')
        Traceback (most recent call last):
            ...
        ValueError: cannot encode objects that are not 2-tuples
        >>> from_key_val_list({'key': 'val'})
        OrderedDict([('key', 'val')])

    ``None`` is passed through unchanged.

    :rtype: OrderedDict
    """
    if value is None:
        return None

    # Scalars can never be a sequence of key/value pairs.
    if isinstance(value, (str, bytes, bool, int)):
        raise ValueError('cannot encode objects that are not 2-tuples')

    return OrderedDict(value)
项目:PyJFuzz    作者:mseclab    | 项目源码 | 文件源码
def handle(self, sock):
        """
        Handle the actual TCP connection

        Reads a little-endian 32-bit length followed by that many bytes of
        payload. ``self.testcase`` is reset once it holds 100 entries.
        Socket errors are re-raised as PJFSocketError, anything else as
        PJFBaseException.
        """
        try:
            size = struct.unpack("<I", sock.recv(4))[0]
            chunks = []
            received = 0
            while received < size:
                chunk = sock.recv(size - received)
                if not chunk:
                    # An empty read means the peer closed the connection;
                    # without this check the loop would never end.
                    raise socket.error("connection closed before the whole test case was received")
                chunks.append(chunk)
                received += len(chunk)
            data = b"".join(chunks)
            if len(self.testcase) >= 100:
                del self.testcase
                self.testcase = list()
            # NOTE(review): lines were lost here in the source; storing the
            # payload and closing the socket are reconstructed - confirm.
            self.testcase.append(data)
            sock.close()
        except socket.error as e:
            raise PJFSocketError(e.message if hasattr(e, "message") else str(e))
        except Exception as e:
            raise PJFBaseException(e.message if hasattr(e, "message") else str(e))
项目:python-rst2ansi    作者:Snaipe    | 项目源码 | 文件源码
def _get_terminal_size(fd):
        """Return the visible console window size for ``fd`` on Windows.

        :param fd: key into ``_handle_ids`` selecting the standard handle.
        :return: ``terminal_size(columns, lines)``.
        :raises OSError: if the handle cannot be retrieved or queried.
        """
        handle = windll.kernel32.GetStdHandle(_handle_ids[fd])
        if handle == 0:
            raise OSError('handle cannot be retrieved')
        if handle == -1:
            raise WinError()
        csbi = create_string_buffer(22)
        res = windll.kernel32.GetConsoleScreenBufferInfo(handle, csbi)
        if res:
            res = struct.unpack("hhhhHhhhhhh", csbi.raw)
            # Fields 5-8 are the window rectangle; both edges are inclusive.
            left, top, right, bottom = res[5:9]
            columns = right - left + 1
            lines = bottom - top + 1
            return terminal_size(columns, lines)
        else:
            raise WinError()
项目:pycos    作者:pgiri    | 项目源码 | 文件源码
def _async_recv_msg(self):
        """Internal use only; use 'recv_msg' with 'yield' instead.

        Message is tagged with length of the payload (data). This method
        receives length of payload, then the payload and returns the payload.
        An empty string is returned when the peer hangs up or the data is
        incomplete.
        """
        # NOTE(review): this variant uses text '' sentinels, so it is
        # presumably the Python 2 module, where a generator can only deliver
        # a value through StopIteration(value); that form is kept here.
        n = AsyncSocket._MsgLengthSize
        try:
            data = yield self.recvall(n)
        except socket.error as err:
            if err.args[0] == 'hangup':
                # raise socket.error(errno.EPIPE, 'Insufficient data')
                raise StopIteration('')
            else:
                raise
        if len(data) != n:
            # raise socket.error(errno.EPIPE, 'Insufficient data: %s / %s' % (len(data), n))
            raise StopIteration('')
        n = struct.unpack('>L', data)[0]
        # assert n >= 0
        if n:
            try:
                data = yield self.recvall(n)
            except socket.error as err:
                if err.args[0] == 'hangup':
                    # raise socket.error(errno.EPIPE, 'Insufficient data')
                    raise StopIteration('')
                else:
                    raise
            if len(data) != n:
                # raise socket.error(errno.EPIPE, 'Insufficient data: %s / %s' % (len(data), n))
                raise StopIteration('')
            raise StopIteration(data)
        else:
            raise StopIteration('')
项目:pycos    作者:pgiri    | 项目源码 | 文件源码
def _sync_recv_msg(self):
        """Internal use only; use 'recv_msg' instead.

        Synchronous version of async_recv_msg. Returns the payload, or an
        empty string when the peer hangs up or the data is incomplete.
        """
        n = AsyncSocket._MsgLengthSize
        try:
            data = self._sync_recvall(n)
        except socket.error as err:
            if err.args[0] == 'hangup':
                # raise socket.error(errno.EPIPE, 'Insufficient data')
                return ''
            else:
                raise
        if len(data) != n:
            # raise socket.error(errno.EPIPE, 'Insufficient data: %s / %s' % (len(data), n))
            return ''
        # The length prefix is a big-endian unsigned 32-bit integer.
        n = struct.unpack('>L', data)[0]
        # assert n >= 0
        if n:
            try:
                data = self._sync_recvall(n)
            except socket.error as err:
                if err.args[0] == 'hangup':
                    # raise socket.error(errno.EPIPE, 'Insufficient data')
                    return ''
                else:
                    raise
            if len(data) != n:
                # raise socket.error(errno.EPIPE, 'Insufficient data: %s / %s' % (len(data), n))
                return ''
            return data
        else:
            return ''
项目:pycos    作者:pgiri    | 项目源码 | 文件源码
def _async_recv_msg(self):
        """Internal use only; use 'recv_msg' with 'yield' instead.

        Message is tagged with length of the payload (data). This method
        receives length of payload, then the payload and returns the payload.
        Empty bytes are returned when the peer hangs up or the data is
        incomplete.
        """
        # Values are delivered with `return`: raising StopIteration inside a
        # generator is turned into RuntimeError by PEP 479 (Python 3.7+),
        # while the caller driving the generator still sees
        # StopIteration(value) either way.
        n = AsyncSocket._MsgLengthSize
        try:
            data = yield self.recvall(n)
        except socket.error as err:
            if err.args[0] == 'hangup':
                # raise socket.error(errno.EPIPE, 'Insufficient data')
                return b''
            else:
                raise
        if len(data) != n:
            # raise socket.error(errno.EPIPE, 'Insufficient data: %s / %s' % (len(data), n))
            return b''
        n = struct.unpack('>L', data)[0]
        # assert n >= 0
        if n:
            try:
                data = yield self.recvall(n)
            except socket.error as err:
                if err.args[0] == 'hangup':
                    # raise socket.error(errno.EPIPE, 'Insufficient data')
                    return b''
                else:
                    raise
            if len(data) != n:
                # raise socket.error(errno.EPIPE, 'Insufficient data: %s / %s' % (len(data), n))
                return b''
            return data
        else:
            return b''
项目:pycos    作者:pgiri    | 项目源码 | 文件源码
def _sync_recv_msg(self):
        """Internal use only; use 'recv_msg' instead.

        Synchronous version of async_recv_msg.

        Reads a length prefix of 'AsyncSocket._MsgLengthSize' bytes, decodes
        it as a big-endian unsigned 32-bit integer ('>L'), then reads that
        many payload bytes with 'self._sync_recvall'.

        Returns the payload bytes, or b'' when the peer hangs up, when fewer
        bytes than requested arrive, or when the announced length is zero.

        Raises socket.error for any receive error other than 'hangup'.
        """
        # NOTE(review): this copy of the function had lost its 'try:' /
        # 'else:' lines and the docstring terminator; the structure below is
        # reconstructed from the surviving 'except' clauses and indentation.
        n = AsyncSocket._MsgLengthSize
        try:
            data = self._sync_recvall(n)
        except socket.error as err:
            if err.args[0] == 'hangup':
                # Peer closed the connection: report "no message".
                return b''
            # Any other error is not ours to hide.
            raise
        if len(data) != n:
            # Short read of the length prefix: treat as "no message".
            return b''
        n = struct.unpack('>L', data)[0]
        if not n:
            return b''
        try:
            data = self._sync_recvall(n)
        except socket.error as err:
            if err.args[0] == 'hangup':
                return b''
            raise
        if len(data) != n:
            # Short read of the payload: treat as "no message".
            return b''
        return data
项目:Stitch    作者:nathanlopez    | 项目源码 | 文件源码
def get_user_name(user_key):
    # Return the user name decoded from the 'V' value found among
    # values(user_key), or None when no non-empty 'V' value is available.
    #
    # NOTE(review): the right-hand side of the next assignment is missing in
    # this copy of the source (looks like extraction damage); restore it from
    # the upstream project before using this function.
    samaddr =
    V = None
    for v in values(user_key):
        if v.Name == 'V':
            # NOTE(review): the call in front of ", v.DataLength.value)" is
            # missing as well; presumably it reads v.DataLength.value bytes of
            # the value's data through samaddr -- confirm against upstream.
            V =, v.DataLength.value)
    if not V: return None

    # Two little-endian unsigned 32-bit fields inside V: bytes 0x0c-0x0f hold
    # the name offset (0xCC is added to it), bytes 0x10-0x13 the name length.
    name_offset = unpack("<L", V[0x0c:0x10])[0] + 0xCC
    name_length = unpack("<L", V[0x10:0x14])[0]

    # The selected slice of V is decoded as UTF-16-LE text.
    username = V[name_offset:name_offset+name_length].decode('utf-16-le')
    return username
项目:Stitch    作者:nathanlopez    | 项目源码 | 文件源码
def __init__(self, name, address, space):
        # Initialise the base object, then decode this primitive's value.
        # builtin_types[name] supplies the byte length and the struct format
        # used for decoding; self.value is None when no data was obtained.
        super(Primitive,self).__init__(name, address, space)
        length, fmt = builtin_types[name]
        # NOTE(review): the call in front of ",length)" is missing in this
        # copy of the source (extraction damage); presumably it reads `length`
        # bytes at `address` from `space` -- confirm against upstream.
        data =,length)
        if not data: self.value = None
        else: self.value = unpack(fmt,data)[0]
项目:Stitch    作者:nathanlopez    | 项目源码 | 文件源码
def read_long(self, addr):
        # Decode the bytes obtained for `addr` as one unsigned long and
        # return it as an int.
        # NOTE(review): the call in front of ", 4)" is missing in this copy of
        # the source (extraction damage); presumably a 4-byte read at `addr`
        # -- confirm against upstream.
        string =, 4)
        # NOTE(review): 'L' without a byte-order prefix uses the platform's
        # native size, which is 8 bytes on LP64 systems (64-bit Linux/macOS);
        # unpacking a 4-byte string would then raise struct.error. '<L' or
        # '=L' is always 4 bytes -- confirm the intended byte order.
        (longval, ) =  struct.unpack('L', string)
        return longval
项目:Stitch    作者:nathanlopez    | 项目源码 | 文件源码
def read_long_phys(self, addr):
        # Decode the bytes obtained for `addr` as one unsigned long and
        # return it as an int.
        # NOTE(review): the call in front of ", 4)" is missing in this copy of
        # the source (extraction damage); judging by the function name it is
        # presumably a 4-byte read from a physical address -- confirm against
        # upstream.
        string =, 4)
        # NOTE(review): 'L' without a byte-order prefix uses the platform's
        # native size, which is 8 bytes on LP64 systems (64-bit Linux/macOS);
        # unpacking a 4-byte string would then raise struct.error. '<L' or
        # '=L' is always 4 bytes -- confirm the intended byte order.
        (longval, ) =  struct.unpack('L', string)
        return longval
项目:Cortex-Analyzers    作者:CERT-BDF    | 项目源码 | 文件源码
def _decode_packed_type(type_code, type_size, pad=False):
        """Build a decoder function for one fixed-width ``struct`` type.

        :param type_code: ``struct`` format for a single value (e.g. ``'!i'``).
        :param type_size: number of bytes ``type_code`` consumes.
        :param pad: when true, the stored value may occupy fewer than
            ``type_size`` bytes; it is left-padded with zero bytes before
            unpacking. When false, ``self._verify_size(size, type_size)`` is
            called first and exactly ``type_size`` bytes are read.
        :return: a function ``unpack_type(self, size, offset)`` returning
            ``(value, new_offset)``, where ``new_offset`` is the position
            just past the bytes that were consumed from ``self._buffer``.
        """
        # pylint: disable=protected-access, missing-docstring
        def unpack_type(self, size, offset):
            if pad:
                # A padded value is stored in `size` bytes, not `type_size`.
                # Reading `type_size` bytes here would swallow bytes of the
                # following record, make the rjust() below a no-op, and
                # return an offset that is too large.
                width = size
            else:
                self._verify_size(size, type_size)
                width = type_size
            new_offset = offset + width
            packed_bytes = self._buffer[offset:new_offset]
            if pad:
                packed_bytes = packed_bytes.rjust(type_size, b'\x00')
            (value,) = struct.unpack(type_code, packed_bytes)
            return value, new_offset
        return unpack_type
项目:Cortex-Analyzers    作者:CERT-BDF    | 项目源码 | 文件源码
def _read_extended(self, offset):
        """Read the type byte of an extended type at ``offset``.

        The byte at ``self._buffer[offset]`` is read as an unsigned 8-bit
        integer and 7 is added to it to obtain the type number.

        :param offset: position of the extended-type byte in ``self._buffer``.
        :return: ``(type_num, offset + 1)``.
        :raises InvalidDatabaseError: if the resulting type number is below 7.
        """
        (next_byte,) = struct.unpack(b'!B', self._buffer[offset:offset + 1])
        type_num = next_byte + 7
        # NOTE(review): the check uses 7 while the message says "< 8"; an
        # unsigned byte plus 7 is never below 7, so this guard cannot fire as
        # written. Left unchanged pending confirmation of the intended bound.
        if type_num < 7:
            # The final argument line of this call was missing in this copy
            # of the source; the message is completed with the offending
            # type number so the call is well-formed again.
            raise InvalidDatabaseError(
                'Something went horribly wrong in the decoder. An '
                'extended type resolved to a type number < 8 '
                '({type})'.format(type=type_num))
        return type_num, offset + 1
项目:CTF    作者:calee0219    | 项目源码 | 文件源码
def rgb_split(self):
        """Splits one BMP object into three; one with only the red channel, one with the green, and one with the blue.

        Each pixel is read from self.bitmap_data as three bytes; byte 0 is
        written to the blue image, byte 1 to the green image and byte 2 to the
        red image, each in its original position with the other two bytes
        zeroed. Every output starts with self.all_headers and every row is
        followed by self.padding_size zero bytes.

        Returns a tuple (R, G, B) of BMP instances."""

        if self.empty == True:
            die("Attempted to call rgb_split() on an empty BMP object!")

        f = StringIO(self.bitmap_data)
        zero = chr(0x00)
        row_padding = zero * self.padding_size

        # Collect the pieces in lists and join once at the end; repeated
        # string += inside the pixel loop is quadratic in the worst case.
        red_parts = [self.all_headers]
        green_parts = [self.all_headers]
        blue_parts = [self.all_headers]

        for _row in xrange(0, self.height):
            for _col in xrange(0, self.width):
                # NOTE(review): the second argument of this unpack() call was
                # missing in this copy of the source; reading 3 bytes from `f`
                # is implied by the "3B" format -- confirm against upstream.
                pixel = struct.unpack("3B", f.read(3))
                red_parts.append(zero + zero + chr(pixel[2]))
                green_parts.append(zero + chr(pixel[1]) + zero)
                blue_parts.append(chr(pixel[0]) + zero + zero)

            red_parts.append(row_padding)
            blue_parts.append(row_padding)
            green_parts.append(row_padding)
            # NOTE(review): only ", 1)" of this statement survived; skipping
            # the input row padding with a relative seek (whence=1) is the
            # presumed original -- confirm against upstream.
            f.seek(self.padding_size, 1)

        # I'm not fond of the constructor hack... but it was the easiest way.
        return (BMP("".join(red_parts), True),
                BMP("".join(green_parts), True),
                BMP("".join(blue_parts), True))
项目:CTF    作者:calee0219    | 项目源码 | 文件源码
def rgb_merge(self, r, g, b):
        """ (Re)combine the red, green and blue color channels from three separate pictures of identical size into one.

        For every pixel, byte 2 is taken from `r`, byte 1 from `g` and byte 0
        from `b`; the merged pixel is written as (blue, green, red). Each row
        is followed by r.padding_size zero bytes.

        The three inputs must agree in width, height and bpp, otherwise die()
        is called. A warning is emitted when self is not empty.

        Returns a BMP built from r.all_headers plus the merged bitmap data.
        """
        if self.empty != True:
             warn("Running rgb_merge() on a non-empty BMP; the existing data will be overwritten!")

        # Ugh...
        if len(set((r.width, g.width, b.width))) != 1 or len(set((r.height, g.height, b.height))) != 1 or len(set((r.bpp, g.bpp, b.bpp))) != 1:
            die("The dimensions and/or bpp differs between the input images to rgb_merge()!")

        rf = StringIO(r.bitmap_data)
        gf = StringIO(g.bitmap_data)
        bf = StringIO(b.bitmap_data)

        row_padding = chr(0x00) * r.padding_size

        # Collect the pieces in a list and join once at the end; repeated
        # string += inside the pixel loop is quadratic in the worst case.
        out_parts = []

        for _row in xrange(0, b.height):
            for _col in xrange(0, b.width):
                # NOTE(review): the second argument of each unpack() call was
                # missing in this copy of the source; reading 3 bytes from the
                # matching stream is implied by the "3B" format and the
                # variable names -- confirm against upstream.
                red_pixel = struct.unpack("3B", rf.read(3))[2]
                green_pixel = struct.unpack("3B", gf.read(3))[1]
                blue_pixel = struct.unpack("3B", bf.read(3))[0]

                out_parts.append("".join( (chr(blue_pixel), chr(green_pixel), chr(red_pixel)) ))

            out_parts.append(row_padding)

            # NOTE(review): only ", 1)" of these three statements survived;
            # skipping each input's row padding with a relative seek
            # (whence=1) is the presumed original -- confirm against upstream.
            rf.seek(r.padding_size, 1)
            gf.seek(g.padding_size, 1)
            bf.seek(b.padding_size, 1)

        return BMP(r.all_headers + "".join(out_parts), True)
项目:pnet    作者:vodik    | 项目源码 | 文件源码
def checksum(*data):
    """Return the 16-bit one's-complement checksum of the given byte strings.

    The arguments are concatenated, padded with one zero byte when the total
    length is odd, and summed as big-endian 16-bit words. Carries above bit
    15 are folded back into the low 16 bits until none remain, and the
    bitwise complement of the result is returned.

    :param data: any number of ``bytes`` objects.
    :return: checksum as an int in ``range(0x10000)``.
    """
    data = b''.join(data)
    if len(data) % 2:
        data += b'\x00'

    # One unpack call for all words instead of one call per word.
    csum = sum(struct.unpack('!%dH' % (len(data) // 2), data))

    # Fold until the value fits in 16 bits. A fixed two-step fold is only
    # correct while the raw sum stays below 2**32 (inputs up to ~128 KiB);
    # beyond that a carry could be dropped.
    while csum >> 16:
        csum = (csum >> 16) + (csum & 0xffff)
    return ~csum & 0xffff