Python struct 模块,calcsize() 实例源码


项目:Cypher    作者:NullArray    | 项目源码 | 文件源码
def decrypt_file(key, in_filename, out_filename=None, chunksize=24*1024):
    # Decrypt `in_filename` into `out_filename`; the cipher is set up with
    # AES.MODE_CBC and an IV (see below).
    # NOTE(review): this excerpt is damaged -- the expressions that read from
    # `infile`, construct the cipher and fill `chunk` are missing, and so are
    # the loop exit, the write step and the truncate call.

    # Split .crypt extension to restore file format
    # (the output name defaults to the input path minus its last extension)
    if not out_filename:
        out_filename = os.path.splitext(in_filename)[0]

    with open(in_filename, 'rb') as infile:
        # '<Q' is an unsigned 64-bit little-endian integer.
        origsize = struct.unpack('<Q','Q')))[0]
        iv =
        decryptor =, AES.MODE_CBC, iv)

        with open(out_filename, 'wb') as outfile:
            while True:
                chunk =
                if len(chunk) == 0:

        # Truncate file to original size
项目:lan-ichat    作者:Forec    | 项目源码 | 文件源码
def run(self):
        # Accept a single client on self.sock, then loop forever: read a
        # length-prefixed message, zlib-decompress it, unpickle it and show
        # the result in the 'Remote' OpenCV window.
        # NOTE(review): pickle.loads on bytes received from a socket can run
        # arbitrary code if the peer is not trusted.
        print("VEDIO server starts...")
        conn, addr = self.sock.accept()
        print("remote VEDIO client success connected...")
        data = "".encode("utf-8")
        # "L" is the platform's native unsigned long, so the width of the
        # length prefix depends on the machine this runs on.
        payload_size = struct.calcsize("L")
        cv2.namedWindow('Remote', cv2.WINDOW_NORMAL)
        while True:
            # Buffer until the whole length prefix has arrived.
            while len(data) < payload_size:
                data += conn.recv(81920)
            packed_size = data[:payload_size]
            data = data[payload_size:]
            msg_size = struct.unpack("L", packed_size)[0]
            # Buffer until the whole message body has arrived.
            while len(data) < msg_size:
                data += conn.recv(81920)
            zframe_data = data[:msg_size]
            data = data[msg_size:]
            frame_data = zlib.decompress(zframe_data)
            frame = pickle.loads(frame_data)
            cv2.imshow('Remote', frame)
            # 27 is the ASCII code for ESC.
            # NOTE(review): the body of this `if` is missing from the excerpt.
            if cv2.waitKey(1) & 0xFF == 27:
项目:otRebuilder    作者:Pal3love    | 项目源码 | 文件源码
def _getOffsets(self):
        """
        Calculate offsets to VDMX_Group records.

        For each ratRange return a list of offset values from the beginning of
        the VDMX table to a VDMX_Group.
        """
        lenHeader = sstruct.calcsize(VDMX_HeaderFmt)
        lenRatRange = sstruct.calcsize(VDMX_RatRangeFmt)
        lenOffset = struct.calcsize('>H')
        lenGroupHeader = sstruct.calcsize(VDMX_GroupFmt)
        lenVTable = sstruct.calcsize(VDMX_vTableFmt)
        # offset to the first group
        pos = lenHeader + self.numRatios*lenRatRange + self.numRatios*lenOffset
        groupOffsets = []
        for group in self.groups:
            # Record where this group starts before stepping past it.
            groupOffsets.append(pos)
            lenGroup = lenGroupHeader + len(group) * lenVTable
            pos += lenGroup  # offset to next group
        # One offset per ratio range, looked up through its groupIndex.
        return [groupOffsets[ratio['groupIndex']] for ratio in self.ratRanges]
项目:otRebuilder    作者:Pal3love    | 项目源码 | 文件源码
def decompile(self, data, ttFont):
        """Parse the table header from ``data`` and decompile each sub-table.

        The header layout depends on ``self.version`` (set by the first
        ``sstruct.unpack2`` call):

        * >= 5.0: ``data`` is decompressed first, then read with
          ``Silf_hdr_format_3``.
        * < 3.0: the count is read directly from bytes 4-6 and the offsets
          start at byte 8.
        * otherwise: read with ``Silf_hdr_format_3`` without decompression.
        """
        sstruct.unpack2(Silf_hdr_format, data, self)
        if self.version >= 5.0:
            (data, self.scheme) = grUtils.decompress(data)
            sstruct.unpack2(Silf_hdr_format_3, data, self)
            base = sstruct.calcsize(Silf_hdr_format_3)
        elif self.version < 3.0:
            # struct.unpack returns a 1-tuple; the %-format below accepts it.
            self.numSilf = struct.unpack('>H', data[4:6])
            self.scheme = 0
            self.compilerVersion = 0
            base = 8
        else:
            # Without this branch the v3 header parse also ran for
            # version < 3.0 and overwrote `base`.
            self.scheme = 0
            sstruct.unpack2(Silf_hdr_format_3, data, self)
            base = sstruct.calcsize(Silf_hdr_format_3)

        silfoffsets = struct.unpack_from(('>%dL' % self.numSilf), data[base:])
        for offset in silfoffsets:
            s = Silf()
            # NOTE(review): `s` is not stored anywhere in the visible code --
            # confirm whether a line collecting it was lost from this excerpt.
            s.decompile(data[offset:], ttFont, self.version)
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def read_f(self, offset=None):
        # Decode one record: for each (key, fmt) pair in self.description,
        # unpack `fmt` from `buf` and store the result in a dict under `key`.
        # Returns None as soon as a buffer does not match the format size.
        # NOTE(review): the excerpt is damaged -- the body of the `offset`
        # check and the expression that fills `buf` are missing.
        if offset is not None:
        d = {}
        for key, fmt in self.description:
            buf =
            if len(buf) != struct.calcsize(fmt):
                return None
            val = list(struct.unpack(fmt, buf))
            for i, ival in enumerate(val):
                # Values with a .replace method have \x03 and \x00 removed
                # and are then decoded as UTF-8.
                if hasattr(ival, 'replace'):
                    ival = ival.replace(unicode.encode('\x03'), unicode.encode(''))
                    ival = ival.replace(unicode.encode('\x00'), unicode.encode(''))
                    val[i] = ival.decode("utf-8")
            # A single-element result is stored bare rather than as a list.
            if len(val) == 1:
                val = val[0]
            d[key] = val
        return d
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def read_f(self, offset =None):
        # Decode one record: for each (key, fmt) pair in self.description,
        # unpack little-endian `fmt` from `buf` into a dict under `key`.
        # Returns None as soon as a buffer does not match the format size.
        # NOTE(review): the excerpt is damaged -- the body of the `offset`
        # check and the expression that fills `buf` are missing.
        if offset is not None :
        d = { }
        for key, fmt in self.description :
            fmt = '<' + fmt # ensures use of standard sizes
            buf =
            if len(buf) != struct.calcsize(fmt) : return None
            val = list(struct.unpack(fmt , buf))
            for i, ival in enumerate(val):
                # Values with a .split method are cut at the first NUL.
                if hasattr(ival, 'split'):
                    val[i] = ival.split('\x00', 1)[0]
            # A single-element result is stored bare rather than as a list.
            if len(val) == 1:
                val = val[0]
            d[key] = val
        return d
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def read_f(self, offset=None):
        # Decode one record: for each (key, fmt) pair in self.description,
        # unpack `fmt` from `buf` and store the result in a dict under `key`.
        # Returns None as soon as a buffer does not match the format size.
        # NOTE(review): the excerpt is damaged -- the body of the `offset`
        # check and the expression that fills `buf` are missing.
        if offset is not None:
        d = {}
        for key, fmt in self.description:
            buf =
            if len(buf) != struct.calcsize(fmt):
                return None
            val = list(struct.unpack(fmt, buf))
            for i, ival in enumerate(val):
                # Values with a .replace method have \x03 and \x00 removed
                # and are then decoded as UTF-8.
                if hasattr(ival, 'replace'):
                    ival = ival.replace(str.encode('\x03'), str.encode(''))
                    ival = ival.replace(str.encode('\x00'), str.encode(''))
                    val[i] = ival.decode("utf-8")
            # A single-element result is stored bare rather than as a list.
            if len(val) == 1:
                val = val[0]
            d[key] = val
        return d
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def read_f(self, offset =None):
        # Decode one record: for each (key, fmt) pair in self.description,
        # unpack little-endian `fmt` from `buf` into a dict under `key`.
        # Returns None as soon as a buffer does not match the format size.
        # NOTE(review): the excerpt is damaged -- the body of the `offset`
        # check and the expression that fills `buf` are missing.
        if offset is not None :
        d = { }
        for key, fmt in self.description :
            fmt = '<' + fmt # ensures use of standard sizes
            buf =
            if len(buf) != struct.calcsize(fmt) : return None
            val = list(struct.unpack(fmt , buf))
            for i, ival in enumerate(val):
                # Values with a .split method are cut at the first NUL.
                if hasattr(ival, 'split'):
                    val[i] = ival.split('\x00', 1)[0]
            # A single-element result is stored bare rather than as a list.
            if len(val) == 1:
                val = val[0]
            d[key] = val
        return d
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def _RegisterWndClass(self):
        # Register the "PythonSplash" window class once per process (guarded
        # by the module-level g_registeredClass flag) and return its name.
        # NOTE(review): one assignment below has lost its target in this
        # excerpt (the line reading "= 0").
        className = "PythonSplash"
        global g_registeredClass
        if not g_registeredClass:
            message_map = {}
            wc = win32gui.WNDCLASS()
            wc.SetDialogProc() # Make it a dialog class.
            self.hinst = wc.hInstance = win32api.GetModuleHandle(None)
            wc.lpszClassName = className
   = 0
            wc.hCursor = win32gui.LoadCursor( 0, win32con.IDC_ARROW )
            wc.hbrBackground = win32con.COLOR_WINDOW + 1
            wc.lpfnWndProc = message_map # could also specify a wndproc.
            # Extra per-window bytes: DLGWINDOWEXTRA plus the native size of
            # one pointer and one int ("Pi").
            wc.cbWndExtra = win32con.DLGWINDOWEXTRA + struct.calcsize("Pi")
            classAtom = win32gui.RegisterClass(wc)
            g_registeredClass = 1
        return className
项目:photinia    作者:XoriieInpottn    | 项目源码 | 文件源码
def _extract_images(filename):
        """Read a gzip-compressed MNIST image file.

        The file starts with four big-endian unsigned 32-bit integers
        (magic, image count, rows, columns) followed by one unsigned byte per
        pixel.

        :param filename: path of the gzip-compressed image file
        :return: 4-D numpy array [index, y, x, depth] of dtype np.float32,
            with pixel values rescaled from [0, 255] to [0, 1]
        :raises ValueError: if the magic number is not 2051
        """
        print('Extracting {}'.format(filename))
        # gzip.open owns the underlying file handle, so it is closed on exit.
        with gzip.open(filename, 'rb') as f:
            buf = f.read()
        header_fmt = '>IIII'
        magic, num_images, rows, cols = struct.unpack_from(header_fmt, buf, 0)
        if magic != 2051:
            raise ValueError('Invalid magic number {} in MNIST image file: {}'.format(magic, filename))
        # Use the header's dimensions instead of assuming 28 * 28 = 784.
        pixels = np.frombuffer(buf, dtype=np.uint8,
                               count=num_images * rows * cols,
                               offset=struct.calcsize(header_fmt))
        # Rescale [0, 255] to [0, 1] in float32.
        images = np.multiply(pixels.astype(np.float32), 1.0 / 255.0)
        return images.reshape(num_images, rows, cols, 1)
项目:photinia    作者:XoriieInpottn    | 项目源码 | 文件源码
def _extract_labels(filename, num_classes=10):
        """Read a gzip-compressed MNIST label file as one-hot vectors.

        The file starts with two big-endian unsigned 32-bit integers
        (magic, label count) followed by one unsigned byte per label.

        :param filename: path of the gzip-compressed label file
        :param num_classes: width of each one-hot vector (default 10)
        :return: 2-D numpy array [index, num_classes] of dtype np.float32
        :raises ValueError: if the magic number is not 2049
        :raises IndexError: if a label is >= num_classes
        """
        print('Extracting {}'.format(filename))
        # gzip.open owns the underlying file handle, so it is closed on exit.
        with gzip.open(filename, 'rb') as f:
            buf = f.read()
        header_fmt = '>II'
        magic, num_labels = struct.unpack_from(header_fmt, buf, 0)
        if magic != 2049:
            raise ValueError('Invalid magic number {} in MNIST label file: {}'.format(magic, filename))
        label_ids = np.frombuffer(buf, dtype=np.uint8, count=num_labels,
                                  offset=struct.calcsize(header_fmt))
        labels = np.zeros((num_labels, num_classes), dtype=np.float32)
        # Set the column named by each label to 1, one row per label.
        labels[np.arange(num_labels), label_ids] = 1
        return labels
项目:tensorflow-study    作者:bob-chen    | 项目源码 | 文件源码
def read_image(filename):
    # Walk an image file whose header is four big-endian unsigned 32-bit
    # integers (magic, image count, rows, columns), filling one pixel at a
    # time from single unsigned bytes.
    # NOTE(review): the excerpt is damaged -- the read into `buf`, the image
    # constructor call and the save call are incomplete.
    # NOTE(review): `f` is never closed in the visible code.
    f = open(filename, 'rb')
    index = 0
    buf =

    magic, images, rows, columns = struct.unpack_from('>IIII' , buf , index)
    index += struct.calcsize('>IIII')
    for i in xrange(images):
        #for i in xrange(2000):
        image ='L', (columns, rows))

        for x in xrange(rows):
            for y in xrange(columns):
                # putpixel takes (column, row), hence (y, x).
                image.putpixel((y, x), int(struct.unpack_from('>B', buf, index)[0]))
                index += struct.calcsize('>B')

        print 'save ' + str(i) + 'image''./test/' + str(i) + '.png')
项目:tensorflow-study    作者:bob-chen    | 项目源码 | 文件源码
def read_label(filename, saveFilename):
    """Convert a binary label file into a comma-separated text file.

    The input starts with two big-endian unsigned 32-bit integers (magic,
    label count) followed by one unsigned byte per label. The labels are
    written to ``saveFilename`` as decimal numbers joined by commas.

    :param filename: path of the binary label file
    :param saveFilename: path of the text file to write
    """
    # Context managers close both files; the output used to stay open, so
    # its contents were not guaranteed to be flushed.
    with open(filename, 'rb') as f:
        buf = f.read()

    header_fmt = '>II'
    magic, labels = struct.unpack_from(header_fmt, buf, 0)
    index = struct.calcsize(header_fmt)

    # Unpack every label byte in one call instead of one call per label.
    labelArr = struct.unpack_from('>%dB' % labels, buf, index)

    with open(saveFilename, 'w') as save:
        save.write(','.join(map(str, labelArr)))

    print('save labels success')
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def unpack(self, buf):
        # Parse a 'PYR2' or 'PYRT' buffer into self.essid, self._numElems,
        # self._digest, self._pmkbuffer and self._pwbuffer; raises
        # StorageError on malformed input.
        # NOTE(review): the excerpt is damaged -- the struct.unpack call for
        # the magic/essid-length head is cut at its line continuation, and
        # the final raise in the magic check has lost its `else:`.
        if len(buf) < self.pyr_len:
            raise StorageError("Buffer too short")
        self._magic, essidlen = struct.unpack(self.pyr_head, \
        if self._magic == 'PYR2':
            self._delimiter = '\n'
        elif self._magic == 'PYRT':
            self._delimiter = '\00'
            raise StorageError("Not a PYRT- or PYR2-buffer.")
        # Little-endian header: essid (essidlen bytes), int element count,
        # 16-byte digest.
        headfmt = "<%ssi16s" % (essidlen, )
        headsize = struct.calcsize(headfmt)
        header = buf[self.pyr_len:self.pyr_len + headsize]
        if len(header) != headsize:
            raise StorageError("Invalid header size")
        header = struct.unpack(headfmt, header)
        self.essid, self._numElems, self._digest = header
        # The pmk area holds 32 bytes per element; everything after it is
        # the password buffer.
        pmkoffset = self.pyr_len + headsize
        pwoffset = pmkoffset + self._numElems * 32
        self._pwbuffer = buf[pwoffset:]
        self._pmkbuffer = buf[pmkoffset:pwoffset]
        if len(self._pmkbuffer) % 32 != 0:
            raise StorageError("pmkbuffer seems truncated")
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def valid_ranges(*types):
    """Return a list of (min, max) pairs, one per given numeric type.

    Each type's ``_type_`` attribute is taken as a single format character
    compatible with the :mod:`struct` module. Four probe byte patterns (all
    zero bits, all one bits, 0x7F followed by zeros, 0x80 followed by ones)
    are unpacked with that format, and the smallest and largest of the four
    results form the pair.

    NOTE(review): for multi-byte signed integer formats these probes appear
    not to reach the true extremes (e.g. 'h' on a little-endian host yields
    (-128, 127)) -- confirm this is acceptable to callers.
    """
    # bytes literals: struct.unpack rejects str buffers on Python 3.
    probes = (
        b"\x00" * 32,
        b"\xFF" * 32,
        b"\x7F" + b"\x00" * 32,
        b"\x80" + b"\xFF" * 32,
    )
    result = []
    for t in types:
        fmt = t._type_
        size = struct.calcsize(fmt)
        values = [struct.unpack(fmt, probe[:size])[0] for probe in probes]
        result.append((min(values), max(values)))
    return result
项目:ripe-atlas-scripts    作者:farrokhi    | 项目源码 | 文件源码
def _types_bitmap(cls, data, error):
        """Decode a windowed type bitmap into a list of type numbers.

        ``data`` is a sequence of blocks, each made of a one-byte window
        number, a one-byte bitmap length and that many bitmap bytes. Bit
        ``j`` (most significant first) of bitmap byte ``i`` in window
        ``block`` stands for type ``block * 256 + i * 8 + j``.
        NOTE(review): this matches the NSEC type bit map layout of RFC 4034
        section 4.1.2 -- confirm that is the intended format.

        :param data: bytes of the encoded bitmap
        :param error: list that receives a (function, offset, message) tuple
            when ``data`` is truncated
        :return: list of type numbers, or None if ``data`` is truncated
        """
        bits = []
        o = 0
        fmt = "!BB"
        fmtsz = struct.calcsize(fmt)
        while o < len(data):
            dat = data[o:o+fmtsz]
            if len(dat) != fmtsz:
                e = ("_types_bitmap", o, 'offset out of range: data size = %d' % len(data))
                error.append(e)
                return None
            block, nbytes = struct.unpack(fmt, dat)
            o += fmtsz
            # Reject a bitmap that claims more bytes than remain, instead of
            # failing later with struct.error on an empty slice.
            if o + nbytes > len(data):
                e = ("_types_bitmap", o, 'offset out of range: data size = %d' % len(data))
                error.append(e)
                return None
            for i in range(nbytes):
                b = struct.unpack("!B", data[o+i:o+i+1])[0]
                for j in range(8):
                    if b & (1 << (7-j)):
                        bits.append(block * 256 + i * 8 + j)
            o += nbytes
        return bits
项目:ripe-atlas-scripts    作者:farrokhi    | 项目源码 | 文件源码
def _do_query(cls, buf, offset, error):
        """Parse one query entry (name, type, class) from ``buf``.

        The name is read by ``cls._do_name``; it is followed by two
        big-endian unsigned 16-bit integers (type, class) that are converted
        with ``cls._type_to_text`` and ``cls._class_to_text``.

        :param buf: bytes being parsed
        :param offset: position in ``buf`` where the entry starts
        :param error: list that receives a (function, offset, message) tuple
            on failure
        :return: ``(new_offset, qry)`` where ``qry`` has the keys 'Qname',
            'Qtype' and 'Qclass', or None on failure
        """
        qry = {}
        res = cls._do_name(buf, offset, 0, error)
        if res is None:
            e = ("_do_query", offset, "_do_name failed")
            # Record the failure; the tuple used to be built and dropped.
            error.append(e)
            return None
        offset, name = res
        qry['Qname'] = name

        fmt = "!HH"
        reqlen = struct.calcsize(fmt)
        strng = buf[offset:offset + reqlen]
        if len(strng) != reqlen:
            e = ("_do_query", offset, 'offset out of range: buf size = %d' % len(buf))
            error.append(e)
            return None
        res = struct.unpack(fmt, strng)
        qry['Qtype'] = cls._type_to_text(res[0])
        qry['Qclass'] = cls._class_to_text(res[1])

        return offset + reqlen, qry
项目:wltrace    作者:jhshi    | 项目源码 | 文件源码
def calc_padding(fmt, align):
    """Calculate how many padding bytes needed for ``fmt`` to be aligned to
    ``align``.

    Args:
        fmt (str): :mod:`struct` format.
        align (int): alignment (2, 4, 8, etc.)

    Returns:
        str: padding format (e.g., various number of 'x').

    >>> calc_padding('b', 2)
    'x'

    >>> calc_padding('b', 3)
    'xx'
    """
    remain = struct.calcsize(fmt) % align
    if remain == 0:
        return ""
    return 'x' * (align - remain)
项目:nstock    作者:ybenitezf    | 项目源码 | 文件源码
def _read_extras(self):
        """Prepare index-array access from ``self.extras``.

        Sets ``indexbase`` (current position of ``self.dbfile``),
        ``indexlen``, ``indexsize`` (byte size of one index entry) and
        ``_get_pos`` (the ``dbfile`` reader matching the index type).

        :raises Exception: if ``extras["indextype"]`` is not one of
            "B", "H", "i", "I", "q"
        """
        dbfile = self.dbfile

        # Read the extras
        # NOTE(review): no statement follows this comment in the excerpt, yet
        # self.extras is used below -- confirm whether a read was lost.

        # Set up for reading the index array
        indextype = self.extras["indextype"]
        self.indexbase = dbfile.tell()
        self.indexlen = self.extras["indexlen"]
        self.indexsize = struct.calcsize(indextype)
        # Set up the function to read values from the index array
        if indextype == "B":
            self._get_pos = dbfile.get_byte
        elif indextype == "H":
            self._get_pos = dbfile.get_ushort
        elif indextype == "i":
            self._get_pos = dbfile.get_int
        elif indextype == "I":
            self._get_pos = dbfile.get_uint
        elif indextype == "q":
            self._get_pos = dbfile.get_long
        else:
            # Only unknown types are an error; without the `else` this raise
            # ran for every index type.
            raise Exception("Unknown index type %r" % indextype)
项目:GAMADV-XTD    作者:taers232c    | 项目源码 | 文件源码
def doVersion(checkForArgs=True):
  # Print a version banner: program, Python version and pointer width,
  # library versions, platform and install path.
  # With checkForArgs, the remaining command arguments may set the
  # 'check' (forceCheck) and 'simple' flags.
  # NOTE(review): the bodies of `if simple:` and `if forceCheck:` are missing
  # from this excerpt.
  forceCheck = simple = False
  if checkForArgs:
    while Cmd.ArgumentsRemaining():
      myarg = getArgument()
      if myarg == u'check':
        forceCheck = True
      elif myarg == u'simple':
        simple = True
  if simple:
  import struct
  version_data = u'GAM {0} - {1}\n{2}\nPython {3}.{4}.{5} {6}-bit {7}\ngoogle-api-python-client {8}\noauth2client {9}\n{10} {11}\nPath: {12}\n'
  # struct.calcsize(u'P')*8 is the interpreter's pointer width in bits.
  writeStdout(version_data.format(__version__, GAM_URL, __author__, sys.version_info[0],
                                  sys.version_info[1], sys.version_info[2], struct.calcsize(u'P')*8,
                                  sys.version_info[3], googleapiclient.__version__, oauth2client.__version__, platform.platform(),
                                  platform.machine(), GM.Globals[GM.GAM_PATH]))
  if forceCheck:

# gam help
项目:Software-Architecture-with-Python    作者:PacktPublishing    | 项目源码 | 文件源码
def receive(channel):
    """ Receive a message from a channel.

    The wire format is a native ``"L"`` length prefix holding the payload
    size in network byte order, followed by a pickled sequence whose first
    element is the message.

    Returns the message, or ``''`` if the prefix cannot be decoded or the
    peer closes the connection before the whole payload has arrived.

    NOTE(review): ``pickle.loads`` can execute arbitrary code on crafted
    input; only use this on channels whose peer is trusted.
    """

    size = struct.calcsize("L")
    size = channel.recv(size)
    try:
        size = socket.ntohl(struct.unpack("L", size)[0])
    except struct.error:
        return ''

    # Accumulate chunks: recv may return fewer bytes than requested, and the
    # previous code replaced the buffer on every pass instead of extending it.
    chunks = []
    remaining = size
    while remaining > 0:
        chunk = channel.recv(remaining)
        if not chunk:
            # Peer closed early; looping again would never terminate.
            return ''
        chunks.append(chunk)
        remaining -= len(chunk)

    return pickle.loads(b"".join(chunks))[0]
项目:purelove    作者:hucmosin    | 项目源码 | 文件源码
def UnpackNMITEMACTIVATE(lparam):
    """Unpack the NMITEMACTIVATE structure that ``lparam`` points to.

    Returns the ``_MakeResult`` record with the fields hwndFrom, idFrom,
    code, iItem, iSubItem, uNewState, uOldState, uChanged, actionx, actiony
    and lParam.
    """
    fmt = _nmhdr_fmt + _nmhdr_align_padding
    if is64bit:
        # the struct module doesn't handle this correctly as some of the items
        # are actually structs in structs, which get individually aligned.
        fmt = fmt + "iiiiiiixxxxP"
    else:
        # Exactly one layout may be appended; without the `else` the 64-bit
        # path received both.
        fmt = fmt + "iiiiiiiP"
    buf = win32gui.PyMakeBuffer(struct.calcsize(fmt), lparam)
    return _MakeResult("NMITEMACTIVATE hwndFrom idFrom code iItem iSubItem uNewState uOldState uChanged actionx actiony lParam",
                       struct.unpack(fmt, buf))

# We use the struct module to pack and unpack strings as MENUITEMINFO
# structures.  We also have special handling for the 'fMask' item in that
# structure to avoid the caller needing to explicitly check validity
# (None is used if the mask excludes/should exclude the value)
项目:purelove    作者:hucmosin    | 项目源码 | 文件源码
def PackMENUINFO(dwStyle = None, cyMax = None,
                 hbrBack = None, dwContextHelpID = None, dwMenuData = None,
                 fMask = 0):
    # Pack the given values with struct.pack and return them as an
    # array.array of type "b".
    # For each argument: None becomes 0, any other value sets the matching
    # win32con.MIM_* bit in fMask.
    # NOTE(review): the struct.pack call below is damaged -- its format
    # argument, most of its values and the closing parenthesis are missing.
    if dwStyle is None: dwStyle = 0
    else: fMask |= win32con.MIM_STYLE
    if cyMax is None: cyMax = 0
    else: fMask |= win32con.MIM_MAXHEIGHT
    if hbrBack is None: hbrBack = 0
    else: fMask |= win32con.MIM_BACKGROUND
    if dwContextHelpID is None: dwContextHelpID = 0
    else: fMask |= win32con.MIM_HELPID
    if dwMenuData is None: dwMenuData = 0
    else: fMask |= win32con.MIM_MENUDATA
    # Create the struct.
    item = struct.pack(
                struct.calcsize(_menuinfo_fmt), # cbSize
    return array.array("b", item)
项目:deoplete-asm    作者:zchee    | 项目源码 | 文件源码
def _StructPackDecoder(wire_type, format):
  """Return a constructor for a decoder for a fixed-width field.

  Args:
      wire_type:  The field's wire type.
      format:  The format string to pass to struct.unpack().
  """

  value_size = struct.calcsize(format)
  local_unpack = struct.unpack

  # Reusing _SimpleDecoder is slightly slower than copying a bunch of code, but
  # not enough to make a significant difference.

  # Note that we expect someone up-stack to catch struct.error and convert
  # it to _DecodeError -- this way we don't have to set up exception-
  # handling blocks every time we parse one value.

  def InnerDecode(buffer, pos):
    # Decode one value at `pos`; return it with the position just past it.
    new_pos = pos + value_size
    result = local_unpack(format, buffer[pos:new_pos])[0]
    return (result, new_pos)
  return _SimpleDecoder(wire_type, InnerDecode)
项目:zeronet-debian    作者:bashrc    | 项目源码 | 文件源码
def _process_scrape(self, payload, trans):
        """Decode a scrape payload into per-hash statistics.

        ``payload`` is a sequence of 12-byte records, each holding three
        big-endian unsigned 32-bit integers: seeders, completed, leechers.
        Records are matched by position with ``trans['sent_hashes']``. A
        trailing partial record is ignored, and so is any record or hash
        without a counterpart.

        :return: dict mapping each hash to a dict with the keys 'seeders',
            'completed' and 'leechers'
        """
        info_struct = '!LLL'
        info_size = struct.calcsize(info_struct)
        # Floor division: plain `/` yields a float on Python 3, which
        # range() rejects.
        info_count = len(payload) // info_size
        hashes = trans['sent_hashes']
        response = {}
        for info_offset, info_hash in zip(range(info_count), hashes):
            seeders, completed, leechers = struct.unpack_from(
                info_struct, payload, info_size * info_offset)
            response[info_hash] = {
                'seeders': seeders,
                'completed': completed,
                'leechers': leechers,
            }
        return response
项目:NodePingManage    作者:hongfeioo    | 项目源码 | 文件源码
def send_one_ping(my_socket, dest_addr, ID):
    """
    Send one ping to the given >dest_addr<.

    The packet is an 8-byte header followed by 192 bytes of data: the
    current ``default_timer()`` value packed as a double, padded with "Q".
    """
    dest_addr  =  socket.gethostbyname(dest_addr)

    # Header is type (8), code (8), checksum (16), id (16), sequence (16)
    my_checksum = 0

    # Make a dummy header with a 0 checksum.
    header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, 0, my_checksum, ID, 1)
    bytesInDouble = struct.calcsize("d")
    # bytes literal so the padding can be joined to struct.pack's output on
    # Python 3 as well as Python 2.
    data = (192 - bytesInDouble) * b"Q"
    data = struct.pack("d", default_timer()) + data

    # Calculate the checksum on the data and the dummy header.
    my_checksum = checksum(header + data)

    # Now that we have the right checksum, we put that in. It's just easier
    # to make up a new header than to stuff it into the dummy.
    header = struct.pack(
        "bbHHh", ICMP_ECHO_REQUEST, 0, socket.htons(my_checksum), ID, 1
    )
    packet = header + data
    my_socket.sendto(packet, (dest_addr, 1)) # Don't know about the 1
项目:radar    作者:amoose136    | 项目源码 | 文件源码
def get_platform():
    """ get_platform()

    Get a string that specifies the platform more specific than
    sys.platform does. The result can be: linux32, linux64, win32,
    win64, osx32, osx64. Other platforms may be added in the future.

    Returns None when sys.platform is not one of the recognised families.
    """
    # Get platform
    if sys.platform.startswith('linux'):
        plat = 'linux%i'
    elif sys.platform.startswith('win'):
        plat = 'win%i'
    elif sys.platform.startswith('darwin'):
        plat = 'osx%i'
    else:  # pragma: no cover
        return None

    return plat % (struct.calcsize('P') * 8)  # 32 or 64 bits
项目:Python-GoogleDrive-VideoStream    作者:ddurdle    | 项目源码 | 文件源码
def decryptStreamChunkOld(self,response, wfile, chunksize=24*1024, startOffset=0):
            # Decrypt a stream chunk by chunk with a cipher set up with
            # AES.MODE_ECB, counting the chunks processed.
            # NOTE(review): the excerpt is heavily damaged -- the body of the
            # ENCRYPTION_ENABLE check, the reads that fill `origsize` and
            # `chunk`, the cipher constructor and every branch body in the
            # loop are missing.
            if ENCRYPTION_ENABLE == 0:
    #    with open(in_filename, 'rb') as infile:
            origsize = struct.unpack('<Q','Q')))[0]
            decryptor =, AES.MODE_ECB)

            count = 0
            while True:
                chunk =
                count = count + 1
                if len(chunk) == 0:
                responseChunk = decryptor.decrypt(chunk)
                if count == 1 and startOffset !=0:
                elif (len(chunk)) < (len(responseChunk.strip())):
项目:Python-GoogleDrive-VideoStream    作者:ddurdle    | 项目源码 | 文件源码
def decryptCalculatePadding(self,response, chunksize=24*1024):
            # Decrypt a chunk with a cipher set up with AES.MODE_ECB and
            # return how many bytes .strip() removes from the decrypted
            # chunk, i.e. the length difference used as the padding size.
            # NOTE(review): the excerpt is damaged -- the body of the
            # ENCRYPTION_ENABLE check, the reads that fill `origsize` and
            # `chunk`, the cipher constructor and the empty-chunk branch are
            # missing.
            if ENCRYPTION_ENABLE == 0:
    #    with open(in_filename, 'rb') as infile:
            origsize = struct.unpack('<Q','Q')))[0]
            decryptor =, AES.MODE_ECB)

            count = 0
            while True:
                chunk =
                count = count + 1
                if len(chunk) == 0:

                responseChunk = decryptor.decrypt(chunk)
                return int(len(chunk) - len(responseChunk.strip()))
项目:lan-ichat    作者:Forec    | 项目源码 | 文件源码
def run(self):
        # Accept a single client on self.sock, then loop forever: read a
        # length-prefixed message, zlib-decompress it, unpickle it and show
        # the result in the 'Remote' OpenCV window.
        # NOTE(review): pickle.loads on bytes received from a socket can run
        # arbitrary code if the peer is not trusted.
        print("VEDIO server starts...")
        conn, addr = self.sock.accept()
        print("remote VEDIO client success connected...")
        data = "".encode("utf-8")
        # "L" is the platform's native unsigned long, so the width of the
        # length prefix depends on the machine this runs on.
        payload_size = struct.calcsize("L")
        cv2.namedWindow('Remote', cv2.WINDOW_NORMAL)
        while True:
            # Buffer until the whole length prefix has arrived.
            while len(data) < payload_size:
                data += conn.recv(81920)
            packed_size = data[:payload_size]
            data = data[payload_size:]
            msg_size = struct.unpack("L", packed_size)[0]
            # Buffer until the whole message body has arrived.
            while len(data) < msg_size:
                data += conn.recv(81920)
            zframe_data = data[:msg_size]
            data = data[msg_size:]
            frame_data = zlib.decompress(zframe_data)
            frame = pickle.loads(frame_data)
            cv2.imshow('Remote', frame)
            # 27 is the ASCII code for ESC.
            # NOTE(review): the body of this `if` is missing from the excerpt.
            if cv2.waitKey(1) & 0xFF == 27:
项目:srctools    作者:TeamSpen210    | 项目源码 | 文件源码
def read_texture_names(self):
        """Iterate through all brush textures in the map."""
        # NOTE(review): the excerpt is damaged -- the struct.unpack call has
        # lost its data argument and closing parenthesis, the inner loop has
        # lost its exit/else structure, and the final format() call is cut.
        tex_data = self.get_lump(BSP_LUMPS.TEXDATA_STRING_DATA)
        tex_table = self.get_lump(BSP_LUMPS.TEXDATA_STRING_TABLE)
        # tex_table is an array of int offsets into tex_data. tex_data is a
        # null-terminated block of strings.

        table_offsets = struct.unpack(
            # The number of ints + i, for the repetitions in the struct.
            str(len(tex_table) // struct.calcsize('i')) + 'i',

        for off in table_offsets:
            # Look for the NULL at the end - strings are limited to 128 chars.
            str_off = 0
            for str_off in range(off, off + 128):
                if tex_data[str_off] == 0:
                    # Yield the bytes from the offset up to, not including,
                    # the NUL, decoded as ASCII.
                    yield tex_data[off: str_off].decode('ascii')
                # Reached the 128 char limit without finding a null.
                raise ValueError('Bad string at', off, 'in BSP! ("{}")'.format(
项目:capnpy    作者:antocuni    | 项目源码 | 文件源码
def get_size(self):
        """Return the byte size of this field's struct format.

        Void and bool fields report 0; every other field reports
        ``struct.calcsize`` of ``self.get_fmt()``.
        """
        # XXX: even more hackish, we need a better way
        import struct
        field_type = self.type
        # bool: not strictly correct, but we cannot return 1/8
        if field_type.is_void() or field_type.is_bool():
            return 0
        fmt = self.get_fmt()
        return struct.calcsize(fmt)

# =============================================
# hand-written union subclasses
# =============================================
# As of now, the compiler is not capable of generating different subclasses
# for each union tag. In the meantime, write it by hand
项目:capnpy    作者:antocuni    | 项目源码 | 文件源码
def check(self, fmt, value):
        """Check ``pack_into`` against ``struct.pack_into`` for one value.

        Both functions write ``value`` at offset 16 of identical random
        256-byte buffers, which must stay equal afterwards. Then
        ``pack_into`` must raise IndexError at the first offset where the
        value no longer fits.
        """
        from random import randrange
        # build a buffer which is surely big enough to contain what we need
        # and check:
        #   1) that we correctly write the bytes we expect
        #   2) that we do NOT write outside the bounds
        pattern = [six.int2byte(randrange(256)) for _ in range(256)]
        pattern = b''.join(pattern)
        buf = bytearray(pattern)
        buf2 = bytearray(pattern)
        offset = 16
        pack_into(ord(fmt), buf, offset, value)
        struct.pack_into(fmt, buf2, offset, value)
        assert buf == buf2
        # check that it raises if it's out of bound
        out_of_bound = 256-struct.calcsize(fmt)+1
        # Context-manager form: passing a code string to pytest.raises was
        # removed in pytest 5.
        with pytest.raises(IndexError):
            pack_into(ord(fmt), buf, out_of_bound, value)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def _get_launcher(self, kind):
            """Return the bytes of the launcher executable for ``kind``.

            The resource name is ``'<kind><bits>.exe'`` where ``bits`` is
            '64' when a pointer is 8 bytes wide and '32' otherwise. It is
            looked up in the parent package of this module.
            """
            # Pick exactly one value; without the `else` the result was
            # always '32'.
            if struct.calcsize('P') == 8:   # 64-bit
                bits = '64'
            else:
                bits = '32'
            name = '%s%s.exe' % (kind, bits)
            # Issue 31: don't hardcode an absolute package name, but
            # determine it relative to the current package
            distlib_package = __name__.rsplit('.', 1)[0]
            result = finder(distlib_package).find(name).bytes
            return result

    # Public API follows
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def is_64bit():
    """Return True when the interpreter's native pointer is 8 bytes wide."""
    pointer_bytes = struct.calcsize("P")
    return pointer_bytes == 8
项目:SceneDensity    作者:ImOmid    | 项目源码 | 文件源码
def _process_pHYs(self, data):
        """Store a pHYs chunk and decode its three fields.

        ``data`` must be exactly two big-endian unsigned 32-bit integers
        (pixels per unit on x and y) followed by a one-byte unit flag.
        Raises FormatError when the length differs.
        """
        self.phys = data
        layout = struct.Struct("!LLB")
        if layout.size != len(data):
            raise FormatError("pHYs chunk has incorrect length.")
        x_density, y_density, unit_flag = layout.unpack(data)
        self.x_pixels_per_unit = x_density
        self.y_pixels_per_unit = y_density
        # Any non-zero unit flag is treated as "meter".
        self.unit_is_meter = bool(unit_flag)
项目:estreamer    作者:spohara79    | 项目源码 | 文件源码
def __unpack__(self, type_, buf, _size=None):
        """Unpack one struct value from the front of ``buf``.

        The format is ``self.endian + type_``. ``_size`` overrides how many
        leading bytes are consumed (default: ``struct.calcsize`` of the
        format).

        :return: ``(values, rest)`` -- the tuple from ``struct.unpack`` and
            the bytes after the consumed prefix
        :raises UnpackError: chained from ``struct.error`` via ``raise_from``
        """
        fmt = self.endian + type_
        size = struct.calcsize(fmt) if _size is None else _size
        try:
            unpacked = struct.unpack(fmt, buf[:size]), buf[size:]
        except struct.error as exc:
            raise_from(UnpackError("Unable to unpack structure"), exc)
        else:
            return unpacked
项目:estreamer    作者:spohara79    | 项目源码 | 文件源码
def __len__(self):
        # Compute the length as struct.calcsize of the concatenated field
        # formats plus `more_len`, plus the length of the data payload when
        # a 'data' attribute exists.
        # NOTE(review): the excerpt is damaged -- the len(...) call that adds
        # the data length is cut.
        fmt = ''
        more_len = 0
        for field in self._field_names_:
            fmt_ = self._field_format_[field]
            if isinstance(fmt_, StructArray):
                fmt += fmt_.get_struct()
            elif isinstance(fmt_, type) and issubclass(fmt_, Struct):
                # NOTE(review): this stores the length of the format string
                # built so far, not a byte size -- confirm this is intended.
                more_len = len(fmt)
            # Fields marked 'variable' contribute nothing to the format.
            elif fmt_ != 'variable':
                fmt += fmt_
        hdr_len = struct.calcsize(fmt) + more_len
        if hasattr(self, 'data'):
            hdr_len += len(
        return hdr_len
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def _get_launcher(self, kind):
            """Return the bytes of the launcher executable for ``kind``.

            The resource name is ``'<kind><bits>.exe'`` where ``bits`` is
            '64' when a pointer is 8 bytes wide and '32' otherwise. It is
            looked up in the parent package of this module.
            """
            # Pick exactly one value; without the `else` the result was
            # always '32'.
            if struct.calcsize('P') == 8:   # 64-bit
                bits = '64'
            else:
                bits = '32'
            name = '%s%s.exe' % (kind, bits)
            # Issue 31: don't hardcode an absolute package name, but
            # determine it relative to the current package
            distlib_package = __name__.rsplit('.', 1)[0]
            result = finder(distlib_package).find(name).bytes
            return result

    # Public API follows
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def is_64bit():
    """Return True when the interpreter's native pointer is 8 bytes wide."""
    pointer_bytes = struct.calcsize("P")
    return pointer_bytes == 8
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def pack_main_header_keywords(hdr):
    # NOTE(review): the excerpt is damaged -- the descriptive text below has
    # lost its docstring quotes, the min(...) call is cut after its first
    # argument, and the empty-keywords branch has lost its `else:`.
    Replace the 'keywords' field of the given BLUE header dictionary
    with the X-Midas packed (str) form of the main header keywords.
    The order of the key value pairs is indeterminate.

    In packed form, keys are separated from values by a single '=',
    key-value pairs are separated from one another by a single '\0'
    and all values are stringized using str(). Hence, each key value
    pair takes up keylength + stringized value length + 2 characters.
    If the resulting packed string is longer than the max allowed for
    the BLUE header main keywords (96 characters) the string is
    truncated.  If no 'keywords' field is present or if it is an empty
    dict, then the keyword fields are updated to represent an empty
    main header keywords section.
    keydict = hdr.get('keywords', {})
    if keydict:
        # "key=value" pairs, each terminated by a NUL.
        hdr['keywords'] = '\0'.join([k + '=' + str(v)
                                     for k,v in keydict.items()]) + '\0'
        hdr['keylength'] = min(len(hdr['keywords']),
        if hdr['keylength'] < len(hdr['keywords']):
            print "WARNING: Main header keywords truncated"
        hdr['keywords'] = '\0'
        hdr['keylength'] = 0
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def _getCaps(self):
        """Build the ``gst.Caps`` matching ``self.datatype``.

        'f' and 'd' produce float caps. Any other struct type character
        produces int caps, signed for lowercase characters and unsigned for
        uppercase ones. The width (and depth, for int caps) is the struct
        size of the type in bits.
        """
        bits = struct.calcsize(self.datatype) * 8
        if self.datatype in ('f', 'd'):
            return gst.Caps('audio/x-raw-float,endianness=%s,width=%d,rate=%d,channels=1' % (self.ENDIANNESS, bits, self.sample_rate))
        else:
            # In struct module, unsigned types are uppercase, signed are lower
            if self.datatype.isupper():
                signed = 'false'
            else:
                signed = 'true'
            return gst.Caps('audio/x-raw-int,endianness=%s,signed=%s,width=%d,depth=%d,rate=%d,channels=1' % (self.ENDIANNESS, signed, bits, bits, self.sample_rate))
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def __init__(self, name, PortTypeClass, PortTransferType=TRANSFER_TYPE, logger=None, noData=None ): = name
        # Set up an output port: logger, port type, connection table, stats,
        # lock, SRI table, filter table, and the maximum number of samples
        # per push.
        # NOTE(review): the excerpt is damaged -- the `def` line has absorbed
        # the start of the next statement, the OutStats(...) and str(...)
        # calls have lost an argument, and the noData branch has lost its
        # `else:`.
        self.logger = logger
        self.PortType = PortTypeClass
        self.outConnections = {} # key=connectionId,  value=port
        self.stats = OutStats(, PortTransferType )
        self.port_lock = threading.Lock()
        self.sriDict = {} # key=streamID  value=SriMapStruct
        self.filterTable = []
        if noData==None:
           self.noData = []
           self.noData = noData

        # Determine maximum transfer size in advance
        self.byteSize = 1
        # NOTE(review): self.PortTransferType is read here but the visible
        # code only has the PortTransferType parameter -- confirm where the
        # attribute is set.
        if self.PortTransferType:
            self.byteSize = struct.calcsize(PortTransferType)
        # Multiply by some number < 1 to leave some margin for the CORBA header
        # NOTE(review): `/` is integer division only on Python 2.
        self.maxSamplesPerPush = int(MAX_TRANSFER_BYTES*.9)/self.byteSize
        # Make sure maxSamplesPerPush is even so that complex data case is handled properly
        if self.maxSamplesPerPush%2 != 0:
            self.maxSamplesPerPush = self.maxSamplesPerPush - 1

        if self.logger == None:
            self.logger = logging.getLogger("redhawk.bulkio.outport."+name)
        if self.logger:
            self.logger.debug('bulkio::OutPort CTOR port:' + str(
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def __init__(self, name, element_type ):
        # Initialise statistics state; the statistics containers are dicts
        # here.
        # NOTE(review): the excerpt is damaged -- the receivedStatistics line
        # has absorbed the start of the next statement ("= name").
        self.enabled = True
        # Size of one element in bits, from its struct format character.
        self.bitSize = struct.calcsize(element_type) * 8
        self.historyWindow = 10
        self.receivedStatistics = {} = name
        self.receivedStatistics_idx = {}
        self.activeStreamIDs = []
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def __init__(self, name, PortTypeClass, PortTransferType=TRANSFER_TYPE, logger=None, noData=None ): = name
        # Set up an output port: logger, port type, connection table, stats,
        # lock, SRI table, filter table, and the maximum number of samples
        # per push.
        # NOTE(review): the excerpt is damaged -- the `def` line has absorbed
        # the start of the next statement, the OutStats(...) and str(...)
        # calls have lost an argument, and the noData branch has lost its
        # `else:`.
        self.logger = logger
        self.PortType = PortTypeClass
        self.outConnections = {} # key=connectionId,  value=port
        self.stats = OutStats(, PortTransferType )
        self.port_lock = threading.Lock()
        self.sriDict = {} # key=streamID  value=SriMapStruct
        self.filterTable = []
        if noData==None:
           self.noData = []
           self.noData = noData

        # Determine maximum transfer size in advance
        self.byteSize = 1
        # NOTE(review): self.PortTransferType is read here but the visible
        # code only has the PortTransferType parameter -- confirm where the
        # attribute is set.
        if self.PortTransferType:
            self.byteSize = struct.calcsize(PortTransferType)
        # Multiply by some number < 1 to leave some margin for the CORBA header
        # NOTE(review): `/` is integer division only on Python 2.
        self.maxSamplesPerPush = int(MAX_TRANSFER_BYTES*.9)/self.byteSize
        # Make sure maxSamplesPerPush is even so that complex data case is handled properly
        if self.maxSamplesPerPush%2 != 0:
            self.maxSamplesPerPush = self.maxSamplesPerPush - 1

        if self.logger == None:
            self.logger = logging.getLogger("redhawk.bulkio.outport."+name)
        if self.logger:
            self.logger.debug('bulkio::OutPort CTOR port:' + str(
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def __init__(self, name, element_type ):
        """Initialize the statistics bookkeeping fields for a port.

        name         -- port name; stored on the instance as ``name``.
        element_type -- ``struct`` format string describing one element;
                        its packed size fixes ``bitSize``.

        Raises ``struct.error`` if ``element_type`` is not a valid format.
        """
        self.enabled = True
        self.flushTime = None
        self.historyWindow = 10
        self.receivedStatistics = []
        # The original read ``self.receivedStatistics = [] = name`` (a syntax
        # error); the lost assignment target is restored as ``self.name``.
        self.name = name
        self.receivedStatistics_idx = 0
        # struct.calcsize() reports bytes; the size is kept in bits.
        self.bitSize = struct.calcsize(element_type) * 8
        self.activeStreamIDs = []
        # NOTE(review): the loop body was missing from this snippet.  It is
        # restored here as pre-filling the history with ``historyWindow``
        # entries built by ``self.statPoint`` (expected to be defined on the
        # enclosing class) -- confirm against the full file.
        for _ in range(self.historyWindow):
            self.receivedStatistics.append(self.statPoint())
        self.runningStats = None
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def __init__(self, name, element_type ):
        """Initialize the statistics bookkeeping fields for a port.

        name         -- port name; stored on the instance as ``name``.
        element_type -- ``struct`` format string describing one element;
                        its packed size fixes ``bitSize``.

        Raises ``struct.error`` if ``element_type`` is not a valid format.
        """
        self.enabled = True
        # struct.calcsize() reports bytes; the size is kept in bits.
        self.bitSize = struct.calcsize(element_type) * 8
        self.historyWindow = 10
        self.receivedStatistics = {}
        # The original read ``self.receivedStatistics = {} = name`` (a syntax
        # error: a dict literal cannot be an assignment target).  The lost
        # target is restored as ``self.name``.
        self.name = name
        self.receivedStatistics_idx = {}
        self.activeStreamIDs = []
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def FileHeader(self):
        """Return the per-file header as a string.

        The result is the packed fixed-size header followed by the encoded
        filename and the extra field.

        Side effects: when either size exceeds ZIP64_LIMIT,
        ``self.extract_version`` and ``self.create_version`` are raised to at
        least 45.
        """
        dt = self.date_time
        # Pack the date/time tuple into the 16-bit DOS date and time fields
        # (seconds are stored halved).
        dosdate = (dt[0] - 1980) << 9 | dt[1] << 5 | dt[2]
        dostime = dt[3] << 11 | dt[4] << 5 | (dt[5] // 2)
        if self.flag_bits & 0x08:
            # Set these to zero because we write them after the file data
            CRC = compress_size = file_size = 0
        else:
            # The else branch is required: without it the three values are
            # unbound whenever bit 0x08 is clear, and never zeroed when set.
            CRC = self.CRC
            compress_size = self.compress_size
            file_size = self.file_size

        extra = self.extra

        if file_size > ZIP64_LIMIT or compress_size > ZIP64_LIMIT:
            # File is larger than what fits into a 4 byte integer,
            # fall back to the ZIP64 extension
            fmt = '<HHQQ'
            # Extra record: id 1, payload length (record size minus the two
            # 2-byte id/length fields), then the real 64-bit sizes.
            extra = extra + struct.pack(fmt,
                    1, struct.calcsize(fmt)-4, file_size, compress_size)
            # The 32-bit size fields carry the all-ones marker instead.
            file_size = 0xffffffff
            compress_size = 0xffffffff
            self.extract_version = max(45, self.extract_version)
            # Bump create_version from its own previous value; the original
            # used extract_version here, which could lower create_version.
            self.create_version = max(45, self.create_version)

        filename, flag_bits = self._encodeFilenameFlags()
        header = struct.pack(structFileHeader, stringFileHeader,
                 self.extract_version, self.reserved, flag_bits,
                 self.compress_type, dostime, dosdate, CRC,
                 compress_size, file_size,
                 len(filename), len(extra))
        return header + filename + extra
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def _findSoname_ldconfig(name):
            """Look up a library in the output of ``/sbin/ldconfig -p``.

            name -- library name without the ``lib`` prefix or suffix; it is
                    regex-escaped before being matched.

            Returns the first whitespace-delimited token of the matching
            ldconfig line (regex group 1), or None when nothing matches.
            """
            import struct
            # The size of a C long selects the 32- or 64-bit ABI tag.
            if struct.calcsize('l') == 4:
                machine = os.uname()[4] + '-32'
            else:
                machine = os.uname()[4] + '-64'
            mach_map = {
                'x86_64-64': 'libc6,x86-64',
                'ppc64-64': 'libc6,64bit',
                'sparc64-64': 'libc6,64bit',
                's390x-64': 'libc6,64bit',
                'ia64-64': 'libc6,IA-64',
                }
            # Anything not listed falls back to the plain 'libc6' tag.
            abi_type = mach_map.get(machine, 'libc6')

            # XXX assuming GLIBC's ldconfig (with option -p)
            expr = r'(\S+)\s+\((%s(?:, OS ABI:[^\)]*)?)\)[^/]*(/[^\(\)\s]*lib%s\.[^\(\)\s]*)' \
                   % (abi_type, re.escape(name))
            f = os.popen('/sbin/ldconfig -p 2>/dev/null')
            # Always close the pipe, even if reading fails.
            try:
                data = f.read()
            finally:
                f.close()
            res = re.search(expr, data)
            if not res:
                return None
            return res.group(1)
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def _check_size(typ, typecode=None):
    """Verify that ``sizeof(typ)`` agrees with ``struct.calcsize``.

    typ      -- the ctypes type to check.
    typecode -- struct format code to compare against; when None the
                type's own ``_type_`` code is used.

    Raises SystemError when the two sizes differ.  This should protect
    somewhat against a misconfigured libffi.
    """
    from struct import calcsize

    # Most _type_ codes are the same as used in struct, so the type's own
    # code is the default reference.
    code = typ._type_ if typecode is None else typecode
    actual = sizeof(typ)
    required = calcsize(code)
    if actual == required:
        return
    raise SystemError("sizeof(%s) wrong: %d instead of %d" % \
                      (typ, actual, required))