Python struct 模块,pack() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用struct.pack()

项目:shellgen    作者:MarioVilas    | 项目源码 | 文件源码
def encode_8(bytes, key, terminator):
        """
        Encode the bytecode with the given 8-bit XOR key.

        :type  bytes: str
        :param bytes: Bytecode to encode.

        :type  key: str
        :param key: 8-bit XOR key.

        :type  terminator: str
        :param terminator: 8-bit terminator.

        :rtype:  str
        :return: Encoded bytecode.
        """
        if not bytes.endswith(terminator):
            bytes += terminator
        fmt = "B" * len(bytes)
        unpack = struct.unpack
        pad = unpack("B", key) * len(bytes)
        bytes = unpack(fmt, bytes)
        bytes = [ bytes[i] ^ pad[i] for i in xrange(len(bytes)) ]
        return struct.pack(fmt, *bytes)
项目:Adafruit_Python_PureIO    作者:adafruit    | 项目源码 | 文件源码
def process_call(self, addr, cmd, val):
        """Perform a smbus process call by writing a word (2 byte) value to
        the specified register of the device, and then reading a word of response
        data (which is returned).
        """
        assert self._device is not None, 'Bus must be opened before operations are made against it!'
        # Build ctypes values to marshall between ioctl and Python.
        data = create_string_buffer(struct.pack('=BH', cmd, val))
        result = c_uint16()
        # Build ioctl request.
        request = make_i2c_rdwr_data([
            (addr, 0, 3, cast(pointer(data), POINTER(c_uint8))),          # Write data.
            (addr, I2C_M_RD, 2, cast(pointer(result), POINTER(c_uint8)))  # Read word (2 bytes).
        ])
        # Make ioctl call and return result data.
        ioctl(self._device.fileno(), I2C_RDWR, request)
        # Note the python-smbus code appears to have a rather serious bug and
        # does not return the result value!  This is fixed below by returning it.
        return result.value
项目:Starfish    作者:BillWang139967    | 项目源码 | 文件源码
def get_iphostname():
    '''??linux?????????IP??'''
    def get_ip(ifname):
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 
        ipaddr = socket.inet_ntoa(fcntl.ioctl(
                            sock.fileno(), 
                            0x8915,  # SIOCGIFADDR 
                            struct.pack('256s', ifname[:15]) 
                            )[20:24]
                            )   
        sock.close()
        return ipaddr
    try:
        ip = get_ip('eth0')
    except IOError:
        ip = get_ip('eno1')
    hostname = socket.gethostname()
    return {'hostname': hostname, 'ip':ip}
项目:pbtk    作者:marin-m    | 项目源码 | 文件源码
def parse_default(field, ftype, fdefault):
    if not (ftype == 'bool' and fdefault == 'true'):
        try:
            fdefault = literal_eval(fdefault.rstrip('LDF'))
        except (ValueError, SyntaxError):
            fdefault = None

    if type(fdefault) is int:
        if ftype[0] != 'u' and ftype[:5] != 'fixed':
            if fdefault >> 63:
                fdefault = c_long(fdefault).value
            elif fdefault >> 31 and ftype[-2:] != '64':
                fdefault = c_int(fdefault).value
        else:
            fdefault &= (1 << int(ftype[-2:])) - 1

        if ftype == 'float' and abs(fdefault) >> 23:
            fdefault = unpack('=f', pack('=i', fdefault))[0]
        elif ftype == 'double' and abs(fdefault) >> 52:
            fdefault = unpack('=d', pack('=q', fdefault))[0]

    if fdefault:
        field.default_value = str(fdefault)
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def _write_header(self, initlength):
        assert not self._headerwritten
        self._file.write('RIFF')
        if not self._nframes:
            self._nframes = initlength / (self._nchannels * self._sampwidth)
        self._datalength = self._nframes * self._nchannels * self._sampwidth
        self._form_length_pos = self._file.tell()
        self._file.write(struct.pack('<l4s4slhhllhh4s',
            36 + self._datalength, 'WAVE', 'fmt ', 16,
            WAVE_FORMAT_PCM, self._nchannels, self._framerate,
            self._nchannels * self._framerate * self._sampwidth,
            self._nchannels * self._sampwidth,
            self._sampwidth * 8, 'data'))
        self._data_length_pos = self._file.tell()
        self._file.write(struct.pack('<l', self._datalength))
        self._headerwritten = True
项目:PyJFuzz    作者:mseclab    | 项目源码 | 文件源码
def send_testcase(json, ip, port):
        """
        Send a raw testcase
        """
        try:
            json = struct.pack("<I", len(json)) + json
            try:
                s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                s.connect((ip, int(port)))
                s.send(json)
                s.shutdown(socket.SHUT_RDWR)
                s.close()
                return True
            except socket.error:
                return False
        except socket.error as e:
            raise PJFSocketError(e.message if hasattr(e, "message") else str(e))
        except Exception as e:
            raise  PJFBaseException(e.message)
项目:Cortex-Analyzers    作者:CERT-BDF    | 项目源码 | 文件源码
def v4_int_to_packed(address):
    """The binary representation of this address.

    Args:
        address: An integer representation of an IPv4 IP address.

    Returns:
        The binary representation of this address.

    Raises:
        ValueError: If the integer is too large to be an IPv4 IP
          address.
    """
    if address > _BaseV4._ALL_ONES:
        raise ValueError('Address too large for IPv4')
    return Bytes(struct.pack('!I', address))
项目:CTF    作者:calee0219    | 项目源码 | 文件源码
def _create_header(cls, width, height):
        """ Internal function used to create headers when changing them is necessary, e.g. when image dimensions change.
            This function creates both a BMP header and a V3 DIB header, with some values left as defaults (e.g. pixels/meter) """

        total_header_size = cls.bmp_header_len + 40 # V3 len = 40 bytes
        padding_size = width & 3 # Magic stuff
        bitmap_size = ((width * 3) + padding_size) * height
        file_size = total_header_size + bitmap_size

        # BMP header: Magic (2 bytes), file size, 2 ignored values, bitmap offset
        header = struct.pack('<2s I 2H I', "BM", file_size, 0, 0, total_header_size)

        # DIB V3 header: header size, px width, px height, num of color planes, bpp, compression method,
        # bitmap data size, horizontal resolution, vertical resolution, number of colors in palette, number of important colors used
        # Few of these matter, so there are a bunch of default/"magic" numbers here...
        header += struct.pack('I 2i H H I I 2i 2I', 40, width, height, 1, 24, 0, bitmap_size, 0x0B13, 0x0B13, 0, 0)

        return header
项目:shellgen    作者:MarioVilas    | 项目源码 | 文件源码
def _load_bytecode_from_dump(input):

    # Read the data.
    data = input.read()

    # If it's an hexadecimal dump, decode and return it.
    if _re_is_hexa.match(data):
        hexstr  = ''.join(_re_get_hexa.findall(data))
        hexdump = [ int(hexstr[i:i+2], 16) for i in xrange(0, len(hexstr), 2) ]
        return struct.pack('B' * len(hexdump), *hexdump)

    # If it's base64 encoded data, decode and return it.
    if _re_is_b64.match(data):
        return data.decode('base64')

    # Assume it's a raw binary dump and return it unchanged.
    return data

#-----------------------------------------------------------------------------#
项目:Starfish    作者:BillWang139967    | 项目源码 | 文件源码
def get_iphostname():
    '''??linux?????????IP??'''
    def get_ip(ifname):
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 
        ipaddr = socket.inet_ntoa(fcntl.ioctl(
                            sock.fileno(), 
                            0x8915,  # SIOCGIFADDR 
                            struct.pack('256s', ifname[:15]) 
                            )[20:24]
                            )   
        sock.close()
        return ipaddr
    try:
        ip = get_ip('eth0')
    except IOError:
        ip = get_ip('eno1')
    hostname = socket.gethostname()
    return {'hostname': hostname, 'ip':ip}
项目:pyOSC3    作者:Qirky    | 项目源码 | 文件源码
def OSCTimeTag(time):
    """Convert a time in floating seconds to its
    OSC binary representation
    """
    if time > 0:
        fract, secs = math.modf(time)
        secs = secs - NTP_epoch
        binary = struct.pack('>LL', int(secs), int(fract * NTP_units_per_second))
    else:
        binary = struct.pack('>LL', 0, 1)

    return binary

######
#
# OSCMessage decoding functions
#
######
项目:pyOSC3    作者:Qirky    | 项目源码 | 文件源码
def OSCTimeTag(time):
    """Convert a time in floating seconds to its
    OSC binary representation
    """
    if time > 0:
        fract, secs = math.modf(time)
        secs = secs - NTP_epoch
        binary = struct.pack('>LL', long(secs), long(fract * NTP_units_per_second))
    else:
        binary = struct.pack('>LL', 0L, 1L)

    return binary

######
#
# OSCMessage decoding functions
#
######
项目:ekko    作者:openstack    | 项目源码 | 文件源码
def build_header(self):
        timestamp = utctimestamp()
        padding = str.encode('\0\0' * 14)

        data = pack(
            '<i2IQH14s',
            timestamp,
            self.metadata['incremental'],
            self.metadata['segment_size'],
            self.metadata['sectors'],
            len(self.metadata['bases']),
            padding
        )

        checksum = crc32(data)

        for i in self.metadata['bases']:
            data += i
            checksum = crc32(i, checksum)

        return data, checksum
项目:ekko    作者:openstack    | 项目源码 | 文件源码
def write_body(self, f):
        checksum = 0

        for segment, meta in self.segments.items():
            data = pack(
                '<2IH2B20s',
                segment,
                meta['incremental'],
                meta['base'],
                meta['encryption'],
                meta['compression'],
                meta['sha1_hash']
            )

            f.write(data)
            checksum = crc32(data, checksum)

        """ Backfill the body_checksum """
        f.seek(24, 0)
        f.write(pack('<I', checksum))
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def close(self):
        """Close the _Stream object. No operation should be
           done on it afterwards.
        """
        if self.closed:
            return

        self.closed = True
        try:
            if self.mode == "w" and self.comptype != "tar":
                self.buf += self.cmp.flush()

            if self.mode == "w" and self.buf:
                self.fileobj.write(self.buf)
                self.buf = b""
                if self.comptype == "gz":
                    self.fileobj.write(struct.pack("<L", self.crc))
                    self.fileobj.write(struct.pack("<L", self.pos & 0xffffFFFF))
        finally:
            if not self._extfileobj:
                self.fileobj.close()
项目:estreamer    作者:spohara79    | 项目源码 | 文件源码
def __pack__(self):
        fmt = self.endian
        value_list = []
        for field in self._field_names_:
            fmt_ = self._field_format_[field]
            val = getattr(self, field)
            if isinstance(fmt_, StructArray):
                value_list.extend([
                    getattr(struct_, field)
                    for struct_ in fmt_.structure_list
                    for field in struct_._field_names_
                ])
            elif isinstance(fmt_, basestring) and (fmt_.startswith('BBB') or fmt_.startswith('bbb')):
                value_list.extend([(val >> i & 0xFF) for i in [x for x in range(0, len(fmt_) * 8, 8)]])
            else:    
                try:
                    value_list.append(val.encode('ascii', 'ignore'))
                except AttributeError:
                    value_list.append(val)
            fmt += str(len(val)) + 's' if fmt_ == 'variable' else fmt_.get_struct() if isinstance(fmt_, StructArray) else fmt_
        try:
            return struct.pack(fmt, *value_list)
        except struct.error as exc:
            raise_from(PackError("Unable to pack structure"), exc)
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def test_ttag_values_packet(self):
        pkt = sdds_pkt.sdds_packet()

        ttag_=0
        ttage_=0
        sddstime=Time()
        sddstime.set(ttag_,ttage_)
        res=struct.pack("!QI", ttag_, ttage_)
        pkt.set_time( ttag_, ttage_)
        self.assertEqual( pkt.header.ttag.tstamp.asString(), res )
        self.assertEqual( pkt.get_SDDSTime(), sddstime )
        pkt.set_SDDSTime( sddstime )
        self.assertEqual( pkt.get_SDDSTime(), sddstime )

        ttag_= 4294967296
        ttage_= 8388608
        sddstime=Time()
        sddstime.set(ttag_,ttage_)
        res=struct.pack("!QI", ttag_, ttage_)
        res=struct.pack("!QI", ttag_, ttage_)
        pkt.set_time( ttag_, ttage_)
        self.assertEqual( pkt.header.ttag.tstamp.asString(), res )
        self.assertEqual( pkt.get_SDDSTime(), sddstime )
        pkt.set_SDDSTime( sddstime )
        self.assertEqual( pkt.get_SDDSTime(), sddstime )
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def test_ttag_info(self):
        msptr_=0
        msdelta_=0
        ttag_=0
        ttage_=0
        res=struct.pack("!HHQI", msptr_, msdelta_, ttag_, ttage_)
        ttag_val = ttag_info()
        self.assertEqual( ttag_val.asString(), res )

        # test big endian format for number
        ttag_= 4294967296
        ttage_= 8388608
        msptr_=256
        msdelta_=256
        res=struct.pack("!HHQI", msptr_, msdelta_, ttag_, ttage_)
        ttag_val.info.msptr.msptr=msptr_
        ttag_val.info.msptr.msdelta=msdelta_
        ttag_val.tstamp.ttag=ttag_
        ttag_val.tstamp.ttage=ttage_
        self.assertEqual( ttag_val.asString(), res )
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def pack_ext_header(hdr, structured=0):
    """
    Packs the value of the given BLUE file hdr dictionary's
    'ext_header' key into the BLUE file extended header format and
    updates the value of 'ext_size'.  The value of 'ext_header' can be
    a list of (key, value) tuples or a dict.  Before writing this out
    to disk at the end of a BLUE file it must be padded out to a
    multiple of 512 bytes.  If the keywords given are already a string
    it is presumed they are already packed and the 'ext_size' field is
    updated but the string itself is left alone.

    If <structured> is true, any embedded Python dictionaries, lists
    or tuples will pack their structure into the keywords with
    them. See pack_keywords() for more info.
    """
    packed = pack_keywords(hdr['ext_header'], _rep_tran[hdr['head_rep']],
                           structured=structured)
    hdr['ext_header'] = packed
    hdr['ext_size'] = len(packed)
项目:Cypher    作者:NullArray    | 项目源码 | 文件源码
def encrypt_file(key, in_filename, out_filename=None, chunksize=64*1024):

    if not out_filename:
        out_filename = in_filename + '.crypt'

    iv = ''.join(chr(random.randint(0, 0xFF)) for i in range(16))
    encryptor = AES.new(key, AES.MODE_CBC, iv)
    filesize = os.path.getsize(in_filename)

    with open(in_filename, 'rb') as infile:
        with open(out_filename, 'wb') as outfile:
            outfile.write(struct.pack('<Q', filesize))
            outfile.write(iv)

            while True:
                chunk = infile.read(chunksize)
                if len(chunk) == 0:
                    break
                elif len(chunk) % 16 != 0:
                    chunk += ' ' * (16 - len(chunk) % 16)

                outfile.write(encryptor.encrypt(chunk))
项目:Picamera    作者:loadstarCN    | 项目源码 | 文件源码
def Send_File_Client():
    sendSock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    sendSock.connect(ADDR)

    fhead=struct.pack('IdI',1,float(time.time()),os.stat(filename).st_size)
    print(fhead)
    sendSock.send(fhead)
    fp = open(filename,'rb')

    while 1:
        filedata = fp.read(BUFSIZE)
        if not filedata: 
            break
        sendSock.send(filedata)

    '''
    print u"?????????????...\n"

    fp.close()
    sendSock.close()
    print u"?????...\n" 

    '''
项目:rain    作者:scizzorz    | 项目源码 | 文件源码
def to_rain(cls, val):
    if val is None:
      return cls.new(typi.null, 0, 0, cls.null)
    elif val is False:
      return cls.new(typi.bool, 0, 0, cls.null)
    elif val is True:
      return cls.new(typi.bool, 0, 1, cls.null)
    elif isinstance(val, int):
      return cls.new(typi.int, 0, val, cls.null)
    elif isinstance(val, float):
      raw = struct.pack('d', val)
      intrep = struct.unpack('Q', raw)[0]
      return cls.new(typi.float, 0, intrep, cls.null)
    elif isinstance(val, str):
      str_p = ct.create_string_buffer(val.encode('utf-8'))
      cls._saves_.append(str_p)
      return cls.new(typi.str, len(val), ct.cast(str_p, ct.c_void_p).value, cls.null)

    raise Exception("Can't convert value {!r} to Rain".format(val))
项目:tockloader    作者:helena-project    | 项目源码 | 文件源码
def _change_baud_rate (self, baud_rate):
        '''
        If the bootloader on the board supports it and if it succeeds, try to
        increase the baud rate to make everything faster.
        '''
        pkt = struct.pack('<BI', 0x01, baud_rate)
        success, ret = self._issue_command(self.COMMAND_CHANGE_BAUD_RATE, pkt, True, 0, self.RESPONSE_OK, show_errors=False)

        if success:
            # The bootloader is new enough to support this.
            # Increase the baud rate
            self.sp.baudrate = baud_rate
            # Now confirm that everything is working.
            pkt = struct.pack('<BI', 0x02, baud_rate)
            success, ret = self._issue_command(self.COMMAND_CHANGE_BAUD_RATE, pkt, False, 0, self.RESPONSE_OK, show_errors=False)

            if not success:
                # Something went wrong. Go back to old baud rate
                self.sp.baudrate = 115200
项目:tockloader    作者:helena-project    | 项目源码 | 文件源码
def read_range (self, address, length):
        # Can only read up to 4095 bytes at a time.
        MAX_READ = 4095
        read = bytes()
        this_length = 0
        remaining = length
        while remaining > 0:
            if remaining > MAX_READ:
                this_length = MAX_READ
                remaining -= MAX_READ
            else:
                this_length = remaining
                remaining = 0

            message = struct.pack('<IH', address, this_length)
            success, flash = self._issue_command(self.COMMAND_READ_RANGE, message, True, this_length, self.RESPONSE_READ_RANGE)

            if not success:
                raise TockLoaderException('Error: Could not read flash')
            else:
                read += flash

            address += this_length

        return read
项目:tockloader    作者:helena-project    | 项目源码 | 文件源码
def _get_crc_internal_flash (self, address, length):
        '''
        Get the bootloader to compute a CRC.
        '''
        message = struct.pack('<II', address, length)
        success, crc = self._issue_command(self.COMMAND_CRC_INTERNAL_FLASH, message, True, 4, self.RESPONSE_CRC_INTERNAL_FLASH)

        # There is a bug in a version of the bootloader where the CRC returns 6
        # bytes and not just 4. Need to read just in case to grab those extra
        # bytes.
        self.sp.read(2)

        if not success:
            if crc[1] == self.RESPONSE_BADADDR:
                raise TockLoaderException('Error: RESPONSE_BADADDR: Invalid address for CRC (address: 0x{:X})'.format(address))
            elif crc[1] == self.RESPONSE_BADARGS:
                raise TockLoaderException('Error: RESPONSE_BADARGS: Invalid length for CRC check')
            else:
                raise TockLoaderException('Error: 0x{:X}'.format(crc[1]))

        return crc
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def save_int(self, obj, pack=struct.pack):
        if self.bin:
            # If the int is small enough to fit in a signed 4-byte 2's-comp
            # format, we can store it more efficiently than the general
            # case.
            # First one- and two-byte unsigned ints:
            if obj >= 0:
                if obj <= 0xff:
                    self.write(BININT1 + chr(obj))
                    return
                if obj <= 0xffff:
                    self.write("%c%c%c" % (BININT2, obj&0xff, obj>>8))
                    return
            # Next check for 4-byte signed ints:
            high_bits = obj >> 31  # note that Python shift sign-extends
            if high_bits == 0 or high_bits == -1:
                # All high bits are copies of bit 2**31, so the value
                # fits in a 4-byte signed int.
                self.write(BININT + pack("<i", obj))
                return
        # Text pickle, or int too big to fit in signed 4-byte format.
        self.write(INT + repr(obj) + '\n')
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def save_string(self, obj, pack=struct.pack):
            unicode = obj.isunicode()

            if self.bin:
                if unicode:
                    obj = obj.encode("utf-8")
                l = len(obj)
                if l < 256 and not unicode:
                    self.write(SHORT_BINSTRING + chr(l) + obj)
                else:
                    s = pack("<i", l)
                    if unicode:
                        self.write(BINUNICODE + s + obj)
                    else:
                        self.write(BINSTRING + s + obj)
            else:
                if unicode:
                    obj = obj.replace("\\", "\\u005c")
                    obj = obj.replace("\n", "\\u000a")
                    obj = obj.encode('raw-unicode-escape')
                    self.write(UNICODE + obj + '\n')
                else:
                    self.write(STRING + repr(obj) + '\n')
            self.memoize(obj)
项目:cbapi-python    作者:carbonblack    | 项目源码 | 文件源码
def parse_netconn(self, seq, netconn):
        parts = netconn.split('|')
        new_conn = {}
        timestamp = convert_event_time(parts[0])
        try:
            new_conn['remote_ip'] = socket.inet_ntoa(struct.pack('>i', int(parts[1])))
        except:
            new_conn['remote_ip'] = '0.0.0.0'
        new_conn['remote_port'] = int(parts[2])
        new_conn['proto'] = protocols[int(parts[3])]
        new_conn['domain'] = parts[4]
        if parts[5] == 'true':
            new_conn['direction'] = 'Outbound'
        else:
            new_conn['direction'] = 'Inbound'

        return CbNetConnEvent(self.process_model, timestamp, seq, new_conn)
项目:cbapi-python    作者:carbonblack    | 项目源码 | 文件源码
def parse_netconn(self, seq, netconn):
        new_conn = {}
        timestamp = convert_event_time(netconn.get("timestamp", None))
        direction = netconn.get("direction", "true")

        if direction == 'true':
            new_conn['direction'] = 'Outbound'
        else:
            new_conn['direction'] = 'Inbound'

        for ipfield in ('remote_ip', 'local_ip', 'proxy_ip'):
            try:
                new_conn[ipfield] = socket.inet_ntoa(struct.pack('>i', int(netconn.get(ipfield, 0))))
            except:
                new_conn[ipfield] = netconn.get(ipfield, '0.0.0.0')

        for portfield in ('remote_port', 'local_port', 'proxy_port'):
            new_conn[portfield] = int(netconn.get(portfield, 0))

        new_conn['proto'] = protocols.get(int(netconn.get('proto', 0)), "Unknown")
        new_conn['domain'] = netconn.get('domain', '')

        return CbNetConnEvent(self.process_model, timestamp, seq, new_conn, version=2)
项目:barpyrus    作者:t-wissmann    | 项目源码 | 文件源码
def spawn(self, lines, additional_args = [ '-p', ''], width = None):
        (mouse_x, mouse_y) = get_mouse_location()
        if not width:
            width = 100 # some default width
        width = max(width, 101) # width has to be 100 at least (rofi restriction)
        # first, compute the top left corner of the menu
        menu_x = min(max(mouse_x - width/2, self.x), self.x + self.panel_width - width)
        menu_y = self.y
        # then, specify these coordinates relative to the mouse cursor
        menu_x -= mouse_x
        menu_y -= mouse_y
        # compile rofi arguments
        cmd = ['rofi', '-dmenu', '-sep' , '\\0' ]
        cmd += ['-monitor', '-3' ] # position relative to mouse cursor
        cmd += ['-layout', '1' ] # specify top left corner of the menu
        cmd += ['-width', str(width) ]
        cmd += ['-xoffset', str(menu_x), '-yoffset', str(menu_y) ]
        cmd += self.rofi_args
        cmd += additional_args
        rofi = subprocess.Popen(cmd,stdout=subprocess.PIPE,stdin=subprocess.PIPE)
        for i in lines:
            rofi.stdin.write(i.encode('utf-8'))
            rofi.stdin.write(struct.pack('B', 0))
        rofi.stdin.close()
        rofi.wait()
项目:otRebuilder    作者:Pal3love    | 项目源码 | 文件源码
def encodeDeltaRunAsBytes_(deltas, offset, stream):
        runLength = 0
        pos = offset
        numDeltas = len(deltas)
        while pos < numDeltas and runLength < 64:
            value = deltas[pos]
            if value < -128 or value > 127:
                break
            # Within a byte-encoded run of deltas, a single zero
            # is best stored literally as 0x00 value. However,
            # if are two or more zeroes in a sequence, it is
            # better to start a new run. For example, the sequence
            # of deltas [15, 15, 0, 15, 15] becomes 6 bytes
            # (04 0F 0F 00 0F 0F) when storing the zero value
            # literally, but 7 bytes (01 0F 0F 80 01 0F 0F)
            # when starting a new run.
            if value == 0 and pos+1 < numDeltas and deltas[pos+1] == 0:
                break
            pos += 1
            runLength += 1
        assert runLength >= 1 and runLength <= 64
        stream.write(bytechr(runLength - 1))
        for i in range(offset, pos):
            stream.write(struct.pack('b', round(deltas[i])))
        return pos
项目:otRebuilder    作者:Pal3love    | 项目源码 | 文件源码
def compile(self, ttFont):
        vorgs = list(self.VOriginRecords.values())
        names = list(self.VOriginRecords.keys())
        nameMap = ttFont.getReverseGlyphMap()
        lenRecords = len(vorgs)
        try:
            gids = map(operator.getitem, [nameMap]*lenRecords, names)
        except KeyError:
            nameMap = ttFont.getReverseGlyphMap(rebuild=True)
            gids = map(operator.getitem, [nameMap]*lenRecords, names)
        vOriginTable = list(zip(gids, vorgs))
        self.numVertOriginYMetrics = lenRecords
        vOriginTable.sort() # must be in ascending GID order
        dataList = [ struct.pack(">Hh", rec[0], rec[1]) for rec in vOriginTable]
        header = struct.pack(">HHhH", self.majorVersion, self.minorVersion, self.defaultVertOriginY, self.numVertOriginYMetrics)
        dataList.insert(0, header)
        data = bytesjoin(dataList)
        return data
项目:otRebuilder    作者:Pal3love    | 项目源码 | 文件源码
def compile(self, ttFont):
        self.tables.sort()    # sort according to the spec; see CmapSubtable.__lt__()
        numSubTables = len(self.tables)
        totalOffset = 4 + 8 * numSubTables
        data = struct.pack(">HH", self.tableVersion, numSubTables)
        tableData = b""
        seen = {}  # Some tables are the same object reference. Don't compile them twice.
        done = {}  # Some tables are different objects, but compile to the same data chunk
        for table in self.tables:
            try:
                offset = seen[id(table.cmap)]
            except KeyError:
                chunk = table.compile(ttFont)
                if chunk in done:
                    offset = done[chunk]
                else:
                    offset = seen[id(table.cmap)] = done[chunk] = totalOffset + len(tableData)
                    tableData = tableData + chunk
            data = data + struct.pack(">HHl", table.platformID, table.platEncID, offset)
        return data + tableData
项目:otRebuilder    作者:Pal3love    | 项目源码 | 文件源码
def compile(self, ttFont):
        if self.data:
            return struct.pack(">HHH", self.format, self.length, self.language) + self.data
        cmap = self.cmap
        codes = sorted(cmap.keys())
        if codes: # yes, there are empty cmap tables.
            codes = list(range(codes[0], codes[-1] + 1))
            firstCode = codes[0]
            valueList = [cmap.get(code, ".notdef") for code in codes]
            valueList = map(ttFont.getGlyphID, valueList)
            gids = array.array("H", valueList)
            if sys.byteorder != "big":
                gids.byteswap()
            data = gids.tostring()
        else:
            data = b""
            firstCode = 0
        header = struct.pack(">HHHHH",
                6, len(data) + 10, self.language, firstCode, len(codes))
        return header + data
项目:otRebuilder    作者:Pal3love    | 项目源码 | 文件源码
def compile(self, ttFont):
        if not hasattr(self, "names"):
            # only happens when there are NO name table entries read
            # from the TTX file
            self.names = []
        names = self.names
        names.sort() # sort according to the spec; see NameRecord.__lt__()
        stringData = b""
        format = 0
        n = len(names)
        stringOffset = 6 + n * sstruct.calcsize(nameRecordFormat)
        data = struct.pack(b">HHH", format, n, stringOffset)
        lastoffset = 0
        done = {}  # remember the data so we can reuse the "pointers"
        for name in names:
            string = name.toBytes()
            if string in done:
                name.offset, name.length = done[string]
            else:
                name.offset, name.length = done[string] = len(stringData), len(string)
                stringData = bytesjoin([stringData, string])
            data = data + sstruct.pack(nameRecordFormat, name)
        return data + stringData
项目:otRebuilder    作者:Pal3love    | 项目源码 | 文件源码
def compile(self, ttFont):
        # First make sure that all the data lines up properly. Format 4
        # must have all its data lined up consecutively. If not this will fail.
        for curLoc, nxtLoc in zip(self.locations, self.locations[1:]):
            assert curLoc[1] == nxtLoc[0], "Data must be consecutive in indexSubTable format 4"

        offsets = list(self.locations[0]) + [loc[1] for loc in self.locations[1:]]
        # Image data offset must be less than or equal to the minimum of locations.
        # Resetting this offset may change the value for round tripping but is safer
        # and allows imageDataOffset to not be required to be in the XML version.
        self.imageDataOffset = min(offsets)
        offsets = [offset - self.imageDataOffset for offset in offsets]
        glyphIds = list(map(ttFont.getGlyphID, self.names))
        # Create an iterator over the ids plus a padding value.
        idsPlusPad = list(itertools.chain(glyphIds, [0]))

        dataList = [EblcIndexSubTable.compile(self, ttFont)]
        dataList.append(struct.pack(">L", len(glyphIds)))
        tmp = [struct.pack(codeOffsetPairFormat, *cop) for cop in zip(idsPlusPad, offsets)]
        dataList += tmp
        data = bytesjoin(dataList)
        return data
项目:pycos    作者:pgiri    | 项目源码 | 文件源码
def discover_peers(self, port=None):
        """This method can be invoked (periodically?) to broadcast message to
        discover peers, if there is a chance initial broadcast message may be
        lost (as these messages are sent over UDP).
        """
        ping_msg = {'signature': self._signature, 'name': self._name, 'version': __version__}

        def _discover(addrinfo, port, task=None):
            ping_sock = AsyncSocket(socket.socket(addrinfo.family, socket.SOCK_DGRAM))
            ping_sock.settimeout(2)
            if addrinfo.family == socket.AF_INET:
                ping_sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
            else:  # addrinfo.family == socket.AF_INET6
                ping_sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS,
                                     struct.pack('@i', 1))
                ping_sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_IF, addrinfo.ifn)
            ping_sock.bind((addrinfo.ip, 0))
            if not port:
                port = addrinfo.udp_sock.getsockname()[1]
            ping_msg['location'] = addrinfo.location
            try:
                yield ping_sock.sendto('ping:'.encode() + serialize(ping_msg),
                                       (addrinfo.broadcast, port))
            except:
                pass
            ping_sock.close()

        for addrinfo in self._addrinfos:
            SysTask(_discover, addrinfo, port)
项目:pycos    作者:pgiri    | 项目源码 | 文件源码
def _async_send_msg(self, data):
        """Internal use only; use 'send_msg' with 'yield' instead.

        Messages are tagged with length of the data, so on the receiving side,
        recv_msg knows how much data to receive.
        """
        yield self.sendall(struct.pack('>L', len(data)) + data)
项目:pycos    作者:pgiri    | 项目源码 | 文件源码
def _sync_send_msg(self, data):
        """Internal use only; use 'send_msg' instead.

        Synchronous version of async_send_msg.
        """
        return self._sync_sendall(struct.pack('>L', len(data)) + data)
项目:pycos    作者:pgiri    | 项目源码 | 文件源码
def discover_peers(self, port=None):
        """This method can be invoked (periodically?) to broadcast message to
        discover peers, if there is a chance initial broadcast message may be
        lost (as these messages are sent over UDP).
        """
        ping_msg = {'signature': self._signature, 'name': self._name, 'version': __version__}

        def _discover(addrinfo, port, task=None):
            ping_sock = AsyncSocket(socket.socket(addrinfo.family, socket.SOCK_DGRAM))
            ping_sock.settimeout(2)
            if addrinfo.family == socket.AF_INET:
                ping_sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
            else:  # addrinfo.family == socket.AF_INET6
                ping_sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS,
                                     struct.pack('@i', 1))
                ping_sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_IF, addrinfo.ifn)
            ping_sock.bind((addrinfo.ip, 0))
            if not port:
                port = addrinfo.udp_sock.getsockname()[1]
            ping_msg['location'] = addrinfo.location
            try:
                yield ping_sock.sendto('ping:'.encode() + serialize(ping_msg),
                                       (addrinfo.broadcast, port))
            except:
                pass
            ping_sock.close()

        for addrinfo in self._addrinfos:
            SysTask(_discover, addrinfo, port)
项目:pycos    作者:pgiri    | 项目源码 | 文件源码
def _async_send_msg(self, data):
        """Internal use only; use 'send_msg' with 'yield' instead.

        Messages are tagged with length of the data, so on the receiving side,
        recv_msg knows how much data to receive.
        """
        yield self.sendall(struct.pack('>L', len(data)) + data)
项目:Stitch    作者:nathanlopez    | 项目源码 | 文件源码
def get_pixels_slow(self, ximage):
        ''' Retrieve all pixels from a monitor. Pixels have to be RGB.
            (!) Insanely slow version, see doc/linux-slow-version. (!)
        '''

        # @TODO: this part takes most of the time. Need a better solution.
        def pix(pixel, _resultats={}, p__=pack):
            ''' Apply shifts to a pixel to get the RGB values.
                This method uses of memoization.
            '''

            # pylint: disable=dangerous-default-value

            if pixel not in _resultats:
                _resultats[pixel] = p__('<B', (pixel & rmask) >> 16) + \
                    p__('<B', (pixel & gmask) >> 8) + \
                    p__('<B', pixel & bmask)
            return _resultats[pixel]

        self.width = ximage.contents.width
        self.height = ximage.contents.height
        rmask = ximage.contents.red_mask
        bmask = ximage.contents.blue_mask
        gmask = ximage.contents.green_mask
        get_pix = self.xlib.XGetPixel
        pixels = [pix(get_pix(ximage, x, y))
                  for y in range(self.height) for x in range(self.width)]
        self.image = b''.join(pixels)
        return self.image
项目:Stitch    作者:nathanlopez    | 项目源码 | 文件源码
def decrypt_single_hash(rid, hbootkey, enc_hash, lmntstr):
    (des_k1,des_k2) = sid_to_key(rid)
    d1 = DES.new(des_k1, DES.MODE_ECB)
    d2 = DES.new(des_k2, DES.MODE_ECB)

    md5 = MD5.new()
    md5.update(hbootkey[:0x10] + pack("<L",rid) + lmntstr)
    rc4_key = md5.digest()
    rc4 = ARC4.new(rc4_key)
    obfkey = rc4.encrypt(enc_hash)
    hash = d1.decrypt(obfkey[:8]) + d2.decrypt(obfkey[8:])

    return hash
项目:Adafruit_Python_PureIO    作者:adafruit    | 项目源码 | 文件源码
def write_word_data(self, addr, cmd, val):
        """Write a word (2 bytes) of data to the specified cmd register of the
        device.  Note that this will write the data in the endianness of the
        processor running Python (typically little endian)!
        """
        assert self._device is not None, 'Bus must be opened before operations are made against it!'
        # Construct a string of data to send with the command register and word value.
        data = struct.pack('=BH', cmd & 0xFF, val & 0xFFFF)
        # Send the data to the device.
        self._select_device(addr)
        self._device.write(data)
项目:Cortex-Analyzers    作者:CERT-BDF    | 项目源码 | 文件源码
def _decode_pointer(self, size, offset):
        pointer_size = ((size >> 3) & 0x3) + 1
        new_offset = offset + pointer_size
        pointer_bytes = self._buffer[offset:new_offset]
        packed = pointer_bytes if pointer_size == 4 else struct.pack(
            b'!c', byte_from_int(size & 0x7)) + pointer_bytes
        unpacked = int_from_bytes(packed)
        pointer = unpacked + self._pointer_base + \
            self._pointer_value_offset[pointer_size]
        if self._pointer_test:
            return pointer, new_offset
        (value, _) = self.decode(pointer)
        return value, new_offset
项目:Cortex-Analyzers    作者:CERT-BDF    | 项目源码 | 文件源码
def v6_int_to_packed(address):
    """The binary representation of this address.

    Args:
        address: An integer representation of an IPv6 IP address.

    Returns:
        The binary representation of this address.
    """
    return Bytes(struct.pack('!QQ', address >> 64, address & (2**64 - 1)))
项目:pnet    作者:vodik    | 项目源码 | 文件源码
def ipv4_checksum(ip4, udp, payload):
    ip4_src = ip4['src']
    ip4_dst = ip4['dst']
    ip4_len = len(payload) + len(UDP)
    return checksum(ip4_src or b'\x00' * 4,
                    ip4_dst or b'\x00' * 4,
                    struct.pack('!HH', ip4['p'], ip4_len),
                    udp.view,
                    payload)
项目:shellgen    作者:MarioVilas    | 项目源码 | 文件源码
def _load_bytecode_from_source(input):

    # Load the regular expressions as local variables,
    # since we're using them in a loop.
    re_is_line    = _re_is_line
    re_parse_line = _re_parse_line

    # We'll accumulate the hexadecimal characters here.
    hexa = []

    # Flag to signal errors during parsing.
    parse_warning = False

    # For each line of text in the input file...
    for line in input.readlines():

        # Skip lines that don't contain bytecode.
        if re_is_line.match(line):

            # Extract hexadecimal sequences.
            sequence = re_parse_line.findall(line)
            if sequence:
                hexa.extend(sequence)

            # Flag parsing errors.
            elif '\\x' in line or '%' in line:
                parse_warning = True

    # Show a warning if we had parsing errors.
    if parse_warning:
        warnings.warn("Source code layout was changed, possible load errors!")

    # Raise an exception if no bytecode was extracted.
    if not hexa:
        raise IOError("Load error, no bytecode found")

    # Pack the bytecode and return it.
    hexdump = [int(x, 16) for x in hexa]
    return struct.pack('B' * len(hexdump), *hexdump)

#-----------------------------------------------------------------------------#
项目:pyOSC3    作者:Qirky    | 项目源码 | 文件源码
def OSCString(next):
    """Convert a string into a zero-padded OSC String.
    The length of the resulting string is always a multiple of 4 bytes.
    The string ends with 1 to 4 zero-bytes ('\x00') 
    """

    OSCstringLength = math.ceil((len(next)+1) / 4.0) * 4
    if sys.version_info[0] > 2:
        next = bytes(next.encode("UTF-8"))
    else:
        next = str(next)
    return struct.pack(">%ds" % (OSCstringLength), next)
项目:pyOSC3    作者:Qirky    | 项目源码 | 文件源码
def OSCBlob(next):
    """Convert a string into an OSC Blob.
    An OSC-Blob is a binary encoded block of data, prepended by a 'size' (int32).
    The size is always a mutiple of 4 bytes. 
    The blob ends with 0 to 3 zero-bytes ('\x00') 
    """

    if type(next) in (str, bytes):
        OSCblobLength = math.ceil((len(next)) / 4.0) * 4
        binary = struct.pack(">i%ds" % (OSCblobLength), OSCblobLength, next)
    else:
        binary = ""

    return binary