Python struct 模块,Struct() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用struct.Struct()

项目:pyall    作者:pktrigg    | 项目源码 | 文件源码
def read(self):
        """Decode the datagram stored at ``self.offset`` into attributes.

        Seeks ``self.fileptr`` to the stored offset, unpacks one fixed-size
        record, copies the selected fields onto the instance and then asks
        ``readFooter`` for ``ETX`` and ``checksum``.
        """
        record = struct.Struct('=LBBHLLHHlBBH')
        self.fileptr.seek(self.offset, 0)
        fields = record.unpack_from(self.fileptr.read(record.size))

        # fields[0] (the leading 'L') is unpacked but not stored.
        self.STX             = fields[1]
        self.typeOfDatagram  = chr(fields[2])
        self.EMModel         = fields[3]
        self.RecordDate      = fields[4]
        self.Time            = fields[5] / 1000.0
        self.Counter         = fields[6]
        self.SerialNumber    = fields[7]
        self.Height          = fields[8] / 100.0
        self.HeightType      = fields[9]

        # NOTE(review): the trailing 'BH' of the format (fields[10], fields[11])
        # is consumed but unused, and readFooter then reads three more bytes.
        # Confirm against the datagram layout that this is intended.
        self.ETX, self.checksum = readFooter(self.numberOfBytes, self.fileptr)

###############################################################################
项目:pyall    作者:pktrigg    | 项目源码 | 文件源码
def readFooter(numberOfBytes, fileptr):
        """Read the datagram footer from *fileptr*.

        The footer is one unsigned byte (``ETX``) followed by an unsigned
        16-bit ``checksum``, read from the current file position.

        ``numberOfBytes`` is accepted but not used. An earlier variant
        (removed here; it was commented out) used it to skip a spare byte
        and to consume trailing bytes.

        Returns the tuple ``(ETX, checksum)``.
        """
        footer = struct.Struct('=BH')
        ETX, checksum = footer.unpack_from(fileptr.read(footer.size))
        return ETX, checksum

###############################################################################
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def _85encode(b, chars, chars2, pad=False, foldnuls=False, foldspaces=False):
    """Shared base-85 encoder behind a85encode and b85encode.

    *b* is converted to bytes when needed, zero-padded to a multiple of four
    and read as big-endian unsigned 32-bit words. Each word is rendered as
    two lookups in *chars2* plus one in *chars*, except that a zero word
    becomes ``b'z'`` when *foldnuls* is set and a word of four spaces becomes
    ``b'y'`` when *foldspaces* is set. Unless *pad* is true, the output that
    corresponds to the padding bytes is trimmed from the last chunk.
    """
    if not isinstance(b, bytes_types):
        b = memoryview(b).tobytes()

    padding = (-len(b)) % 4
    if padding:
        b = b + b'\0' * padding
    word_count = len(b) // 4
    words = struct.Struct('!%dI' % word_count).unpack(b)

    chunks = []
    for word in words:
        if foldnuls and not word:
            piece = b'z'
        elif foldspaces and word == 0x20202020:
            piece = b'y'
        else:
            piece = (chars2[word // 614125] +
                     chars2[word // 85 % 7225] +
                     chars[word % 85])
        chunks.append(piece)

    if padding and not pad:
        # A folded zero word must be expanded before it can be truncated.
        if chunks[-1] == b'z':
            chunks[-1] = chars[0] * 5
        chunks[-1] = chunks[-1][:-padding]

    return b''.join(chunks)
项目:sharedbuffers    作者:jampp    | 项目源码 | 文件源码
def get_packer(self, obj):
        """Return the ``(packer, padding)`` pair for the slots present in *obj*.

        The present-slot bitmap comes from ``self._get_bitmaps(obj)`` and is
        the key into ``self.packer_cache``. On a miss, a ``struct.Struct`` is
        built from the bitmap packer's format followed by the struct type of
        every slot whose bit is set, and the result is cached.

        ``padding`` is the number of bytes needed to round the packed size
        up to a multiple of ``self.alignment``.
        """
        has_bitmap, none_bitmap, present_bitmap = self._get_bitmaps(obj)
        rv = self.packer_cache.get(present_bitmap)
        if rv is None:
            packer = struct.Struct("".join([
                self.bitmap_packer.format,
            ] + [
                self.slot_struct_types[slot]
                for i,slot in enumerate(self.slot_keys)
                if present_bitmap & (cython.cast(cython.ulonglong, 1) << i)
            ]))
            alignment = self.alignment
            size = packer.size
            # Floor division is required: with true division the round-up
            # collapses and the result is a wrong, non-integer padding.
            padding = (size + alignment - 1) // alignment * alignment - size
            self.packer_cache[present_bitmap] = rv = (packer, padding)
        return rv
项目:sharedbuffers    作者:jampp    | 项目源码 | 文件源码
def get_unpacker(self, has_bitmap, none_bitmap):
        """Return ``(unpacker, padding, pformat, gfactory)`` for a slot bitmap.

        The present-slot bitmap is ``has_bitmap & ~none_bitmap``. The most
        recently returned entry is kept in ``self._last_unpacker`` as a
        one-element fast path ahead of ``self.unpacker_cache``. On a miss,
        the struct format is the concatenation of the struct types of all
        slots whose bit is set.

        ``padding`` rounds ``size + self.bitmap_size`` up to a multiple of
        ``self.alignment`` and subtracts ``size``.
        """
        present_bitmap = has_bitmap & ~none_bitmap
        if self._last_unpacker is not None and present_bitmap == self._last_unpacker_bitmap:
            return self._last_unpacker
        rv = self.unpacker_cache.get(present_bitmap)
        if rv is None:
            pformat = "".join([
                self.slot_struct_types[slot]
                for i,slot in enumerate(self.slot_keys)
                if present_bitmap & (cython.cast(cython.ulonglong, 1) << i)
            ])
            unpacker = struct.Struct(pformat)
            alignment = self.alignment
            size = unpacker.size
            # Floor division is required: with true division the round-up
            # collapses and the result is a wrong, non-integer padding.
            padding = (size + self.bitmap_size + alignment - 1) // alignment * alignment - size
            gfactory = GenericProxyClass(self.slot_keys, self.slot_types, present_bitmap, self.bitmap_size)
            rv = (unpacker, padding, pformat, gfactory)
            self.unpacker_cache[present_bitmap] = rv
        # Remember the latest lookup for the fast path above.
        self._last_unpacker_bitmap = present_bitmap
        self._last_unpacker = rv
        return rv
项目:Parallel.GAMIT    作者:demiangomez    | 项目源码 | 文件源码
def parse_station_record(self, line):
        """Parse one fixed-width station record line into a dict.

        A record line starts with a space and is at least 77 characters
        long; any other line (including an empty one) yields ``None``.

        The returned dict maps the names in ``fieldnames`` to the stripped
        column text. ``DateStart`` and ``DateEnd`` are replaced by the
        values returned from ``self.stninfodate2datetime`` and
        ``StationCode`` is lower-cased.
        """
        # Validate first. Slicing instead of indexing keeps an empty line
        # from raising IndexError.
        if line[:1] != ' ' or len(line) < 77:
            return None

        fieldnames = ['StationCode', 'StationName', 'DateStart', 'DateEnd', 'AntennaHeight', 'HeightCode', 'AntennaNorth', 'AntennaEast',
                      'ReceiverCode', 'ReceiverVers', 'ReceiverFirmware', 'ReceiverSerial', 'AntennaCode', 'RadomeCode', 'AntennaSerial']

        # Column widths. The first 1-character column is the leading blank
        # and is dropped below. A negative width would be treated as an
        # ignored padding field, though none is used here.
        fieldwidths = (
        1, 6, 18, 19, 19, 9, 7, 9, 9, 22, 22, 7, 22, 17, 7, 20)
        fmtstring = ' '.join('{}{}'.format(abs(fw), 'x' if fw < 0 else 's') for fw in fieldwidths)

        fieldstruct = struct.Struct(fmtstring)
        # Pad short lines so every column can be unpacked.
        values = fieldstruct.unpack_from(line.ljust(fieldstruct.size))[1:]
        record = dict(zip(fieldnames, map(str.strip, values)))

        # convert to datetime object
        DateStart, DateEnd = self.stninfodate2datetime(record['DateStart'], record['DateEnd'])
        record['DateStart'] = DateStart
        record['DateEnd'] = DateEnd
        record['StationCode'] = record['StationCode'].lower()

        return record
项目:centos-base-consul    作者:zeroc0d3lab    | 项目源码 | 文件源码
def __init__(self, cloexec=True, nonblock=True):
        """Load the inotify entry points and open an inotify descriptor.

        *cloexec* and *nonblock* select whether ``self.CLOEXEC`` and
        ``self.NONBLOCK`` are included in the flags passed to the init
        call. Raises ``INotifyError`` when the call returns -1.
        """
        (self._init1,
         self._add_watch,
         self._rm_watch,
         self._read) = load_inotify()

        flags = (self.CLOEXEC if cloexec else 0) | (self.NONBLOCK if nonblock else 0)
        self._inotify_fd = self._init1(flags)
        if self._inotify_fd == -1:
            raise INotifyError(os.strerror(ctypes.get_errno()))

        self._buf = ctypes.create_string_buffer(5000)
        self.fenc = get_preferred_file_name_encoding()
        self.hdr = struct.Struct(b'iIII')
        # Hold on to the os module so it is still reachable from __del__
        # while the interpreter is shutting down.
        self.os = os
项目:ivaochdoc    作者:ivaoch    | 项目源码 | 文件源码
def _85encode(b, chars, chars2, pad=False, foldnuls=False, foldspaces=False):
    """Shared base-85 encoder behind a85encode and b85encode.

    *b* is converted to bytes when needed, zero-padded to a multiple of four
    and read as big-endian unsigned 32-bit words. Each word is rendered as
    two lookups in *chars2* plus one in *chars*, except that a zero word
    becomes ``b'z'`` when *foldnuls* is set and a word of four spaces becomes
    ``b'y'`` when *foldspaces* is set. Unless *pad* is true, the output that
    corresponds to the padding bytes is trimmed from the last chunk.
    """
    if not isinstance(b, bytes_types):
        b = memoryview(b).tobytes()

    padding = (-len(b)) % 4
    if padding:
        b = b + b'\0' * padding
    word_count = len(b) // 4
    words = struct.Struct('!%dI' % word_count).unpack(b)

    chunks = []
    for word in words:
        if foldnuls and not word:
            piece = b'z'
        elif foldspaces and word == 0x20202020:
            piece = b'y'
        else:
            piece = (chars2[word // 614125] +
                     chars2[word // 85 % 7225] +
                     chars[word % 85])
        chunks.append(piece)

    if padding and not pad:
        # A folded zero word must be expanded before it can be truncated.
        if chunks[-1] == b'z':
            chunks[-1] = chars[0] * 5
        chunks[-1] = chunks[-1][:-padding]

    return b''.join(chunks)
项目:bpy_lambda    作者:bcongdon    | 项目源码 | 文件源码
def _binary_write(filepath, faces):
    """Write *faces* to *filepath* as binary records behind an 84-byte header.

    Each face is written as its normal (three little-endian floats), its
    flattened vertex coordinates (nine floats) and two zero bytes. The
    header (an 80-byte string plus a 32-bit face count) is written last,
    once the count is known, so *faces* is traversed only once.
    """
    header_fmt = '<80sI'
    pack_normal = struct.Struct('<3f').pack
    # 3 vertices == 9 floats
    pack_vertices = struct.Struct('<9f').pack

    with open(filepath, 'wb') as data:
        write = data.write
        # Reserve room for the header. It is filled in at the end so that
        # len(list(faces)), which may be expensive, is never needed.
        write(b'\0' * struct.calcsize(header_fmt))

        face_count = 0
        for face_count, face in enumerate(faces, 1):
            write(pack_normal(*normal(*face)) + pack_vertices(*itertools.chain.from_iterable(face)))
            # attribute byte count (unused)
            write(b'\0\0')

        # Go back and write the real header now that the count is known.
        data.seek(0)
        write(struct.pack(header_fmt, _header_version().encode('ascii'), face_count))
项目:pyall    作者:pktrigg    | 项目源码 | 文件源码
def read(self):
        """Decode the datagram stored at ``self.offset`` into attributes.

        Seeks ``self.fileptr`` to the stored offset, reads exactly one
        record and stores every field except the first on the instance.
        ``Time`` is the raw value divided by 1000.
        """
        record = struct.Struct('=LBBHLLHHLLBBH')
        self.fileptr.seek(self.offset, 0)
        (_number_of_bytes,  # read but deliberately not stored
         stx, datagram_type, em_model, record_date, time_raw,
         clock_counter, serial_number,
         external_date, external_time, pps,
         etx, checksum) = record.unpack(self.fileptr.read(record.size))

        self.STX             = stx
        self.typeOfDatagram  = chr(datagram_type)
        self.EMModel         = em_model
        self.RecordDate      = record_date
        self.Time            = time_raw / 1000.0
        self.ClockCounter    = clock_counter
        self.SerialNumber    = serial_number

        self.ExternalDate       = external_date
        self.ExternalTime       = external_time
        self.PPS                = pps
        self.ETX                = etx
        self.checksum           = checksum


###############################################################################
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_unpack_from(self):
        """Check ``unpack_from`` on Struct objects and at module level.

        Both forms must honour the offset argument for ``bytes`` and
        ``bytearray`` input and raise ``struct.error`` once fewer than four
        bytes remain after the offset.
        """
        test_string = b'abcd01234'
        fmt = '4s'
        s = struct.Struct(fmt)

        def module_level(*args):
            return struct.unpack_from(fmt, *args)

        # Struct method first, then the module-level function.
        for unpack_from in (s.unpack_from, module_level):
            for cls in (bytes, bytearray):
                data = cls(test_string)
                self.assertEqual(unpack_from(data), (b'abcd',))
                self.assertEqual(unpack_from(data, 2), (b'cd01',))
                self.assertEqual(unpack_from(data, 4), (b'0123',))
                # Offsets 0..5 leave at least four bytes to read.
                for i in range(6):
                    self.assertEqual(unpack_from(data, i), (data[i:i+4],))
                # Later offsets run past the end of the buffer.
                for i in range(6, len(test_string) + 1):
                    self.assertRaises(struct.error, unpack_from, data, i)
项目:news-for-good    作者:thecodinghub    | 项目源码 | 文件源码
def _85encode(b, chars, chars2, pad=False, foldnuls=False, foldspaces=False):
    """Shared base-85 encoder behind a85encode and b85encode.

    *b* is converted to bytes when needed, zero-padded to a multiple of four
    and read as big-endian unsigned 32-bit words. Each word is rendered as
    two lookups in *chars2* plus one in *chars*, except that a zero word
    becomes ``b'z'`` when *foldnuls* is set and a word of four spaces becomes
    ``b'y'`` when *foldspaces* is set. Unless *pad* is true, the output that
    corresponds to the padding bytes is trimmed from the last chunk.
    """
    if not isinstance(b, bytes_types):
        b = memoryview(b).tobytes()

    padding = (-len(b)) % 4
    if padding:
        b = b + b'\0' * padding
    word_count = len(b) // 4
    words = struct.Struct('!%dI' % word_count).unpack(b)

    chunks = []
    for word in words:
        if foldnuls and not word:
            piece = b'z'
        elif foldspaces and word == 0x20202020:
            piece = b'y'
        else:
            piece = (chars2[word // 614125] +
                     chars2[word // 85 % 7225] +
                     chars[word % 85])
        chunks.append(piece)

    if padding and not pad:
        # A folded zero word must be expanded before it can be truncated.
        if chunks[-1] == b'z':
            chunks[-1] = chars[0] * 5
        chunks[-1] = chunks[-1][:-padding]

    return b''.join(chunks)
项目:aiozk    作者:tipsi    | 项目源码 | 文件源码
def parse(cls, buff, offset):
        """
        Parse a length-prefixed value from ``buff`` starting at ``offset``.

        ``cls.size_primitive`` is parsed first and gives the number of bytes
        that follow. A size of -1 means "no value": ``None`` is returned
        together with the offset just past the size field. Otherwise the raw
        bytes are handed to ``cls.parse_value``.

        Returns ``(value, new_offset)``.
        """
        size, offset = cls.size_primitive.parse(buff, offset)
        if size == -1:
            return None, offset

        payload = struct.Struct("!%ds" % size)
        (raw,) = payload.unpack_from(buff, offset)

        return cls.parse_value(raw), offset + payload.size
项目:Peppy    作者:project-owner    | 项目源码 | 文件源码
def send_message(self, message):
        """ Write one framed message to the client.

        The frame is the byte 129 (0x81), a length field and the payload.
        Lengths up to 125 are written as a single byte; lengths up to 65535
        as the marker 126 plus a big-endian 16-bit value; anything larger as
        the marker 127 plus a big-endian 64-bit value. This appears to match
        the unmasked WebSocket frame layout of RFC 6455.

        :param message: the message to send
        """
        output = self.request.wfile
        one_byte = struct.Struct(">B").pack
        output.write(one_byte(129))

        length = len(message)
        if length <= 125:
            output.write(one_byte(length))
        elif length <= 65535:
            output.write(one_byte(126))
            output.write(struct.pack(">H", length))
        else:
            output.write(one_byte(127))
            output.write(struct.pack(">Q", length))

        output.write(message)
        logging.debug(message)
项目:Tencent_Cartoon_Download    作者:Fretice    | 项目源码 | 文件源码
def _85encode(b, chars, chars2, pad=False, foldnuls=False, foldspaces=False):
    """Shared base-85 encoder behind a85encode and b85encode.

    *b* is converted to bytes when needed, zero-padded to a multiple of four
    and read as big-endian unsigned 32-bit words. Each word is rendered as
    two lookups in *chars2* plus one in *chars*, except that a zero word
    becomes ``b'z'`` when *foldnuls* is set and a word of four spaces becomes
    ``b'y'`` when *foldspaces* is set. Unless *pad* is true, the output that
    corresponds to the padding bytes is trimmed from the last chunk.
    """
    if not isinstance(b, bytes_types):
        b = memoryview(b).tobytes()

    padding = (-len(b)) % 4
    if padding:
        b = b + b'\0' * padding
    word_count = len(b) // 4
    words = struct.Struct('!%dI' % word_count).unpack(b)

    chunks = []
    for word in words:
        if foldnuls and not word:
            piece = b'z'
        elif foldspaces and word == 0x20202020:
            piece = b'y'
        else:
            piece = (chars2[word // 614125] +
                     chars2[word // 85 % 7225] +
                     chars[word % 85])
        chunks.append(piece)

    if padding and not pad:
        # A folded zero word must be expanded before it can be truncated.
        if chunks[-1] == b'z':
            chunks[-1] = chars[0] * 5
        chunks[-1] = chunks[-1][:-padding]

    return b''.join(chunks)
项目:fieldsight-kobocat    作者:awemulya    | 项目源码 | 文件源码
def _85encode(b, chars, chars2, pad=False, foldnuls=False, foldspaces=False):
    """Shared base-85 encoder behind a85encode and b85encode.

    *b* is converted to bytes when needed, zero-padded to a multiple of four
    and read as big-endian unsigned 32-bit words. Each word is rendered as
    two lookups in *chars2* plus one in *chars*, except that a zero word
    becomes ``b'z'`` when *foldnuls* is set and a word of four spaces becomes
    ``b'y'`` when *foldspaces* is set. Unless *pad* is true, the output that
    corresponds to the padding bytes is trimmed from the last chunk.
    """
    if not isinstance(b, bytes_types):
        b = memoryview(b).tobytes()

    padding = (-len(b)) % 4
    if padding:
        b = b + b'\0' * padding
    word_count = len(b) // 4
    words = struct.Struct('!%dI' % word_count).unpack(b)

    chunks = []
    for word in words:
        if foldnuls and not word:
            piece = b'z'
        elif foldspaces and word == 0x20202020:
            piece = b'y'
        else:
            piece = (chars2[word // 614125] +
                     chars2[word // 85 % 7225] +
                     chars[word % 85])
        chunks.append(piece)

    if padding and not pad:
        # A folded zero word must be expanded before it can be truncated.
        if chunks[-1] == b'z':
            chunks[-1] = chars[0] * 5
        chunks[-1] = chunks[-1][:-padding]

    return b''.join(chunks)
项目:options-screener    作者:dcwangmit01    | 项目源码 | 文件源码
def test_ceph_key(self, mock_urandom):
        """Check that the generated key is a header plus the expected 16 bytes.

        The base64 result is decoded and split into a ``'<hiih'`` header and
        the key. The header's last field must equal the key length and the
        key must be the fixed byte string asserted below.
        """
        encoded = utils.JinjaUtils.ceph_key()
        blob = base64.b64decode(encoded.encode('ascii'))

        # Split the blob at the header boundary.
        header_layout = struct.Struct('<hiih')
        boundary = header_layout.size
        header, key = blob[:boundary], blob[boundary:]

        # Only the last header field (the key length) is checked.
        key_len = header_layout.unpack(header)[3]
        assert key_len == len(key)

        # Verify that the key is what it should be
        assert key == b'0123456789012345'
项目:pnet    作者:vodik    | 项目源码 | 文件源码
def __new__(mcs, clsname, clsbases, clsdict):
        """Create the class, precompiling its header layout if one is declared.

        ``__header__`` in the class body is a sequence of
        ``(attribute, struct_format)`` pairs. When it is non-empty, the
        formats are joined behind ``__byte_order__`` (default ``'>'``) into
        one ``struct.Struct``, and the class receives ``__slots__`` plus the
        ``_header_*`` attributes set below. Classes without headers are
        created unchanged.
        """
        headers = clsdict.get('__header__', [])
        if not headers:
            return type.__new__(mcs, clsname, clsbases, clsdict)

        names, formats = zip(*headers)
        byte_order = clsdict.get('__byte_order__', '>')
        layout = struct.Struct(byte_order + ''.join(formats))

        clsdict.update([
            ('__slots__', ('_fields', '_view', '_payload')),
            ('_header_fields', tuple(names)),
            ('_header_bytes_order', byte_order),
            ('_header_struct', layout),
            ('_header_size', layout.size),
        ])
        return type.__new__(mcs, clsname, clsbases, clsdict)
项目:kargo-aws-k8s    作者:net592    | 项目源码 | 文件源码
def b85decode(b):
        """Decode the base85 byte string *b* and return the raw bytes.

        The input is padded with ``b'~'`` to a multiple of five characters,
        each group of five is accumulated into one number in base 85 and
        packed as a big-endian unsigned 32-bit word, and the bytes produced
        by the padding are removed again.

        Raises ``ValueError`` for a character outside the alphabet or for a
        group whose value does not fit in 32 bits.
        """
        # Reverse lookup table: byte value -> digit (None = not in alphabet).
        _b85dec = [None] * 256
        for digit, code in enumerate(iterbytes(_b85alphabet)):
            _b85dec[code] = digit

        padding = (-len(b)) % 5
        b = b + b'~' * padding
        pack_word = struct.Struct('!I').pack
        out = []
        for start in range(0, len(b), 5):
            chunk = b[start:start + 5]
            acc = 0
            try:
                for code in iterbytes(chunk):
                    acc = acc * 85 + _b85dec[code]
            except TypeError:
                # Adding a None table entry raised; locate the first
                # offending character so its position can be reported.
                for position, code in enumerate(iterbytes(chunk), start):
                    if _b85dec[code] is None:
                        raise ValueError(
                            'bad base85 character at position %d' % position
                        )
                raise
            try:
                out.append(pack_word(acc))
            except struct.error:
                raise ValueError('base85 overflow in hunk starting at byte %d'
                                 % start)

        result = b''.join(out)
        return result[:-padding] if padding else result
项目:j3dview    作者:blank63    | 项目源码 | 文件源码
def __init__(self,format_string):
        """Compile *format_string* with ``_Struct`` and keep the result."""
        compiled = _Struct(format_string)
        self._struct = compiled
项目:sharedbuffers    作者:jampp    | 项目源码 | 文件源码
def _unpack_bytes_from_pybuffer(buf, offs, idmap):
    """Read a length-prefixed byte string from *buf* at offset *offs*.

    *idmap*, when given, is a cache keyed by offset: a hit is returned
    directly and the decoded result is stored back before returning.

    When running compiled under Cython, the work is delegated to
    ``_unpack_bytes_from_cbuffer`` on the raw buffer. Otherwise the value is
    decoded here with ``struct``.
    """
    if idmap is not None and offs in idmap:
        return idmap[offs]

    if cython.compiled:
        # NOTE(review): pybuf is not declared in this block; presumably it is
        # a Cython-typed local declared by a decorator outside this view.
        # NOTE(review): if _likebuffer or PyObject_GetBuffer raises, the
        # finally clause still releases pybuf. Confirm that this is safe.
        try:
            buf = _likebuffer(buf)
            PyObject_GetBuffer(buf, cython.address(pybuf), PyBUF_SIMPLE)  # lint:ok
            rv = _unpack_bytes_from_cbuffer(cython.cast(cython.p_char, pybuf.buf), offs, pybuf.len, None)  # lint:ok
        finally:
            PyBuffer_Release(cython.address(pybuf))  # lint:ok
    else:
        # Short header: one unsigned 16-bit value.
        hpacker = struct.Struct('=H')
        objlen = hpacker.unpack_from(buf, offs)[0]
        offs = int(offs)
        dataoffs = offs + hpacker.size
        # The top bit flags a compressed payload.
        compressed = (objlen & 0x8000) != 0
        if (objlen & 0x7FFF) == 0x7FFF:
            # All low bits set: the real length follows as a 64-bit value,
            # so re-read the header in its long form.
            qpacker = struct.Struct('=HQ')
            objlen = qpacker.unpack_from(buf, offs)[1]
            dataoffs = offs + qpacker.size
        else:
            # Otherwise the low 15 bits are the length itself.
            objlen = objlen & 0x7FFF
        # buffer() is the Python 2 builtin; this branch is Python 2 only.
        rv = buffer(buf, dataoffs, objlen)
        if compressed:
            rv = lz4_decompress(rv)
        else:
            rv = bytes(rv)

    if idmap is not None:
        idmap[offs] = rv
    return rv
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def set_nomenclature(self):
        """
        Write the fixed file header at the start of ``self.file``.

        The header is the title encoded as a pascal string (one length
        byte followed by 15 characters, 16 bytes in total) and a
        little-endian 16-bit value of 18. ``self.header_size`` is set to
        the number of bytes written.
        """
        self.file.seek(0)
        # struct's 's' code needs a bytes object on Python 3. A bytes
        # literal is the same plain str on Python 2, so this works on both.
        title = b'DAC2 objects'
        st = struct.Struct( '<B15sH' )
        header_rec = [len(title), title, 18] # constant header
        header_chr = st.pack( *header_rec )
        self.header_size = len( header_chr )
        self.file.write( header_chr )
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def set_nomenclature(self):
        """
        Write the fixed file header at the start of ``self.file``.

        The header is the title encoded as a pascal string (one length
        byte followed by 15 characters, 16 bytes in total) and a
        little-endian 16-bit value of 18. ``self.header_size`` is set to
        the number of bytes written.
        """
        self.file.seek(0)
        # struct's 's' code needs a bytes object on Python 3. A bytes
        # literal is the same plain str on Python 2, so this works on both.
        title = b'DAC2 objects'
        st = struct.Struct( '<B15sH' )
        header_rec = [len(title), title, 18] # constant header
        header_chr = st.pack( *header_rec )
        self.header_size = len( header_chr )
        self.file.write( header_chr )
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def _make_packer(format_string):
    """Return ``(pack, unpack)`` callables for *format_string*.

    ``pack`` is the compiled struct's ``pack`` method. ``unpack`` returns
    only the first unpacked value rather than the full tuple.
    """
    compiled = struct.Struct(format_string)

    def unpack(s):
        return compiled.unpack(s)[0]

    return compiled.pack, unpack
项目:geekcloud    作者:Mr-Linus    | 项目源码 | 文件源码
def b85decode(b):
        """Decode the base85 byte string *b* and return the raw bytes.

        The input is padded with ``b'~'`` to a multiple of five characters,
        each group of five is accumulated into one number in base 85 and
        packed as a big-endian unsigned 32-bit word, and the bytes produced
        by the padding are removed again.

        Raises ``ValueError`` for a character outside the alphabet or for a
        group whose value does not fit in 32 bits.
        """
        # Reverse lookup table: byte value -> digit (None = not in alphabet).
        _b85dec = [None] * 256
        for digit, code in enumerate(iterbytes(_b85alphabet)):
            _b85dec[code] = digit

        padding = (-len(b)) % 5
        b = b + b'~' * padding
        pack_word = struct.Struct('!I').pack
        out = []
        for start in range(0, len(b), 5):
            chunk = b[start:start + 5]
            acc = 0
            try:
                for code in iterbytes(chunk):
                    acc = acc * 85 + _b85dec[code]
            except TypeError:
                # Adding a None table entry raised; locate the first
                # offending character so its position can be reported.
                for position, code in enumerate(iterbytes(chunk), start):
                    if _b85dec[code] is None:
                        raise ValueError(
                            'bad base85 character at position %d' % position
                        )
                raise
            try:
                out.append(pack_word(acc))
            except struct.error:
                raise ValueError('base85 overflow in hunk starting at byte %d'
                                 % start)

        result = b''.join(out)
        return result[:-padding] if padding else result
项目:tensorflow-study    作者:bob-chen    | 项目源码 | 文件源码
def b85decode(b):
        """Decode the base85 byte string *b* and return the raw bytes.

        The input is padded with ``b'~'`` to a multiple of five characters,
        each group of five is accumulated into one number in base 85 and
        packed as a big-endian unsigned 32-bit word, and the bytes produced
        by the padding are removed again.

        Raises ``ValueError`` for a character outside the alphabet or for a
        group whose value does not fit in 32 bits.
        """
        # Reverse lookup table: byte value -> digit (None = not in alphabet).
        _b85dec = [None] * 256
        for digit, code in enumerate(iterbytes(_b85alphabet)):
            _b85dec[code] = digit

        padding = (-len(b)) % 5
        b = b + b'~' * padding
        pack_word = struct.Struct('!I').pack
        out = []
        for start in range(0, len(b), 5):
            chunk = b[start:start + 5]
            acc = 0
            try:
                for code in iterbytes(chunk):
                    acc = acc * 85 + _b85dec[code]
            except TypeError:
                # Adding a None table entry raised; locate the first
                # offending character so its position can be reported.
                for position, code in enumerate(iterbytes(chunk), start):
                    if _b85dec[code] is None:
                        raise ValueError(
                            'bad base85 character at position %d' % position
                        )
                raise
            try:
                out.append(pack_word(acc))
            except struct.error:
                raise ValueError('base85 overflow in hunk starting at byte %d'
                                 % start)

        result = b''.join(out)
        return result[:-padding] if padding else result
项目:wiktionary-translations-parser    作者:elfxiong    | 项目源码 | 文件源码
def __init__(self, rich_format):
        """Store *rich_format* together with its compiled plain struct form.

        The plain format comes from ``format_from_rich``. The compiled
        ``struct.Struct`` and its byte size are kept alongside it.
        """
        self.rich_fmt = rich_format
        self.fmt = plain_format = format_from_rich(rich_format)
        self.compiled = compiled = struct.Struct(plain_format)
        self.size = compiled.size
项目:wiktionary-translations-parser    作者:elfxiong    | 项目源码 | 文件源码
def __init__(self, rich_format):
        """Store *rich_format* together with its compiled plain struct form.

        The plain format comes from ``format_from_rich``. The compiled
        ``struct.Struct`` and its byte size are kept alongside it.
        """
        self.rich_fmt = rich_format
        self.fmt = plain_format = format_from_rich(rich_format)
        self.compiled = compiled = struct.Struct(plain_format)
        self.size = compiled.size
项目:machines-learning-movement    作者:katsully    | 项目源码 | 文件源码
def b85decode(b):
        """Decode the base85 byte string *b* and return the raw bytes.

        The input is padded with ``b'~'`` to a multiple of five characters,
        each group of five is accumulated into one number in base 85 and
        packed as a big-endian unsigned 32-bit word, and the bytes produced
        by the padding are removed again.

        Raises ``ValueError`` for a character outside the alphabet or for a
        group whose value does not fit in 32 bits.
        """
        # Reverse lookup table: byte value -> digit (None = not in alphabet).
        _b85dec = [None] * 256
        for digit, code in enumerate(iterbytes(_b85alphabet)):
            _b85dec[code] = digit

        padding = (-len(b)) % 5
        b = b + b'~' * padding
        pack_word = struct.Struct('!I').pack
        out = []
        for start in range(0, len(b), 5):
            chunk = b[start:start + 5]
            acc = 0
            try:
                for code in iterbytes(chunk):
                    acc = acc * 85 + _b85dec[code]
            except TypeError:
                # Adding a None table entry raised; locate the first
                # offending character so its position can be reported.
                for position, code in enumerate(iterbytes(chunk), start):
                    if _b85dec[code] is None:
                        raise ValueError(
                            'bad base85 character at position %d' % position
                        )
                raise
            try:
                out.append(pack_word(acc))
            except struct.error:
                raise ValueError('base85 overflow in hunk starting at byte %d'
                                 % start)

        result = b''.join(out)
        return result[:-padding] if padding else result
项目:AlmostSignificant    作者:bartongroup    | 项目源码 | 文件源码
def b85decode(b):
        """Decode the base85 byte string *b* and return the raw bytes.

        The input is padded with ``b'~'`` to a multiple of five characters,
        each group of five is accumulated into one number in base 85 and
        packed as a big-endian unsigned 32-bit word, and the bytes produced
        by the padding are removed again.

        Raises ``ValueError`` for a character outside the alphabet or for a
        group whose value does not fit in 32 bits.
        """
        # Reverse lookup table: byte value -> digit (None = not in alphabet).
        _b85dec = [None] * 256
        for digit, code in enumerate(iterbytes(_b85alphabet)):
            _b85dec[code] = digit

        padding = (-len(b)) % 5
        b = b + b'~' * padding
        pack_word = struct.Struct('!I').pack
        out = []
        for start in range(0, len(b), 5):
            chunk = b[start:start + 5]
            acc = 0
            try:
                for code in iterbytes(chunk):
                    acc = acc * 85 + _b85dec[code]
            except TypeError:
                # Adding a None table entry raised; locate the first
                # offending character so its position can be reported.
                for position, code in enumerate(iterbytes(chunk), start):
                    if _b85dec[code] is None:
                        raise ValueError(
                            'bad base85 character at position %d' % position
                        )
                raise
            try:
                out.append(pack_word(acc))
            except struct.error:
                raise ValueError('base85 overflow in hunk starting at byte %d'
                                 % start)

        result = b''.join(out)
        return result[:-padding] if padding else result
项目:Wall-EEG    作者:neurotechuoft    | 项目源码 | 文件源码
def __call__(self, sample, as_string=False):
        """Send ``sample.channel_data`` to every socket in ``self.CONNECTION_LIST``.

        With *as_string* the values are sent as ``str(values)`` plus a
        newline. Otherwise they are packed as network-order float32 values.
        Any socket whose send fails is removed from the list and closed
        once the loop is finished.
        """
        values = sample.channel_data
        # Sockets that failed are collected here and removed after the
        # loop, so the list is not modified while being iterated.
        outdated_list = []
        for sock in self.CONNECTION_LIST:
            try:
                if as_string:
                    sock.send(str(values) + "\n")
                else:
                    nb_channels = len(values)
                    # format for binary data, network endian (big) and float (float32)
                    packer = struct.Struct('!%sf' % nb_channels)
                    # convert values to bytes
                    packed_data = packer.pack(*values)
                    sock.send(packed_data)
                    # TODO: should check if the correct number of bytes passed through
            except Exception:
                # Exception, not a bare except, so KeyboardInterrupt and
                # SystemExit still propagate.
                # sometimes (always?) it's only during the second write to a close socket that an error is raised?
                print("Something bad happened, will close socket")
                outdated_list.append(sock)
        # now we are outside of the main list, it's time to remove outdated sockets, if any
        for bad_sock in outdated_list:
            print("Removing socket...")
            self.CONNECTION_LIST.remove(bad_sock)
            # not very costly to be polite
            bad_sock.close()
项目:warhexer    作者:sudasana    | 项目源码 | 文件源码
def blit(self, dest, fill_fore=True, fill_back=True):
        """Write this buffer to the console *dest* with libtcod's fill calls.

        *fill_back* copies the background colour channels. *fill_fore*
        copies the foreground colour channels and the characters.

        Raises ``ValueError`` when *dest* does not have this buffer's width
        and height.
        """
        # use libtcod's "fill" functions to write the buffer to a console.
        if (console_get_width(dest) != self.width or
            console_get_height(dest) != self.height):
            raise ValueError('ConsoleBuffer.blit: Destination console has an incorrect size.')

        # The struct.Struct previously built here was never used, so it has
        # been removed. The fill calls take ctypes int arrays directly.
        if fill_back:
            _lib.TCOD_console_fill_background(dest, (c_int * len(self.back_r))(*self.back_r), (c_int * len(self.back_g))(*self.back_g), (c_int * len(self.back_b))(*self.back_b))

        if fill_fore:
            _lib.TCOD_console_fill_foreground(dest, (c_int * len(self.fore_r))(*self.fore_r), (c_int * len(self.fore_g))(*self.fore_g), (c_int * len(self.fore_b))(*self.fore_b))
            _lib.TCOD_console_fill_char(dest, (c_int * len(self.char))(*self.char))
项目:nstock    作者:ybenitezf    | 项目源码 | 文件源码
def __setstate__(self, d):
        """Restore pickled state and rebuild the derived attributes.

        ``_struct`` is always recreated from ``sortable_typecode``. State
        that has no ``min_value`` gets ``min_value`` and ``max_value``
        recomputed with ``self._min_max()``.
        """
        self.__dict__.update(d)
        self._struct = struct.Struct(">" + str(self.sortable_typecode))
        if "min_value" not in d:
            low, high = self._min_max()
            # The state dict was already copied into __dict__ above, so
            # writing only to d would leave the instance without these
            # attributes. d is still updated, as it was before.
            d["min_value"], d["max_value"] = low, high
            self.min_value, self.max_value = low, high
项目:nstock    作者:ybenitezf    | 项目源码 | 文件源码
def __init__(self, dbfile, basepos, length, doccount, fixedlen):
            """Set up a reader whose item typecode is stored in the file.

            The typecode is the character held in the last byte of the
            region (``basepos + length - 1``). After the network-order
            struct for it is built, the file is positioned just past
            ``doccount`` items and the unique values are loaded with
            ``self._read_uniques()``.
            """
            self._dbfile = dbfile
            self._basepos = basepos
            self._doccount = doccount
            self._fixedlen = fixedlen

            typecode = chr(dbfile.get_byte(basepos + length - 1))
            self._typecode = typecode

            item = struct.Struct("!" + typecode)
            self._unpack = item.unpack
            self._itemsize = item.size

            dbfile.seek(basepos + doccount * item.size)
            self._uniques = self._read_uniques()
项目:nstock    作者:ybenitezf    | 项目源码 | 文件源码
def __init__(self, dbfile, typecode, default):
            """Set up a writer that packs values as network-order *typecode*.

            ``_defaultbytes`` is the packed form of *default* and
            ``_fixedlen`` is the byte length of one packed value.
            """
            self._dbfile = dbfile
            packer = struct.Struct("!" + typecode)
            self._pack = packer.pack
            self._default = default
            self._defaultbytes = self._pack(default)
            # Take the size from the same "!" format used for packing.
            # Native calcsize(typecode) can differ from it (for example
            # 'l' is 8 bytes natively on LP64 but 4 in standard size).
            self._fixedlen = packer.size
            self._count = 0
项目:nstock    作者:ybenitezf    | 项目源码 | 文件源码
def __init__(self, dbfile, basepos, length, doccount, typecode,
                     default):
            """Set up a reader for values stored as network-order *typecode*.

            ``_fixedlen`` is the byte length of one stored value and
            ``_count`` is the number of values that fit in *length* bytes.
            """
            self._dbfile = dbfile
            self._basepos = basepos
            self._doccount = doccount
            self._default = default
            self._reverse = False

            self._typecode = typecode
            item = struct.Struct("!" + typecode)
            self._unpack = item.unpack
            self._defaultbytes = item.pack(default)
            # Take the size from the same "!" format used for unpacking.
            # Native calcsize(typecode) can differ from it (for example
            # 'l' is 8 bytes natively on LP64 but 4 in standard size),
            # which would also make _count wrong.
            self._fixedlen = item.size
            self._count = length // self._fixedlen
项目:nstock    作者:ybenitezf    | 项目源码 | 文件源码
def __init__(self, dbfile, spec, default):
            """Set up a writer for fixed-size records described by *spec*.

            *spec* is a struct format string and *default* is a sequence of
            values for one record, kept both as given and packed.
            """
            self._dbfile = dbfile
            self._struct = record = struct.Struct(spec)
            self._fixedlen = record.size
            self._default = default
            self._defaultbytes = record.pack(*default)
            self._count = 0
项目:nstock    作者:ybenitezf    | 项目源码 | 文件源码
def __init__(self, dbfile, basepos, length, doccount, spec, default):
            """Set up a reader for fixed-size records described by *spec*.

            *spec* is a struct format string and *default* is a sequence of
            values for one record. ``_count`` is the number of whole records
            that fit in *length* bytes.
            """
            self._dbfile = dbfile
            self._basepos = basepos
            self._doccount = doccount
            self._struct = record = struct.Struct(spec)
            self._fixedlen = record_size = record.size
            self._default = default
            self._defaultbytes = record.pack(*default)
            self._count = length // record_size
项目:nstock    作者:ybenitezf    | 项目源码 | 文件源码
def __repr__(self):
            """Return the fixed tag that identifies this reader type."""
            tag = "<Struct.Reader>"
            return tag
项目:Parallel.GAMIT    作者:demiangomez    | 项目源码 | 文件源码
def read_fields(self, line, data, format_tuple):
        """Split a fixed-width record into fields described by *format_tuple*.

        Each entry of *format_tuple* is a printf-style spec such as ``%4i``,
        ``%6.2f`` or ``%3s``; only its width and type letter are used. Every
        column is first extracted as text, then converted: ``f`` columns to
        float (2.11 when unparsable), ``i`` columns to int (1 when
        unparsable) and ``s`` columns stripped of surrounding whitespace.

        Returns the list of converted fields.
        """
        # Turn the printf specs into a struct format of plain 'Ns' columns:
        # drop '%' and '-', map the f/i type letters to 's' and remove the
        # '.precision' part.
        spec_text = ' '.join(format_tuple).replace('%', '')
        spec_text = spec_text.replace('f', 's').replace('i', 's').replace('-', '')
        layout = struct.Struct(re.sub(r'\..', '', spec_text))

        # NOTE(review): the length test looks at `data`, but the padded or
        # truncated text is taken from `line`. Confirm that callers always
        # pass matching values.
        if len(data) < layout.size:
            # too short: left-justify and pad with spaces up to the width
            data = ('%-' + str(layout.size) + 's') % line
        elif len(data) > layout.size:
            # too long: keep only the part the layout covers
            data = line[0:layout.size]

        fields = list(layout.unpack_from(data))

        for i in range(len(fields)):
            spec = format_tuple[i]
            if 'f' in spec:
                try:
                    fields[i] = float(fields[i])
                except ValueError:
                    # invalid number in the field: use a harmless placeholder
                    fields[i] = float(2.11)
            elif 'i' in spec:
                try:
                    fields[i] = int(fields[i])
                except ValueError:
                    # invalid number in the field: use a harmless placeholder
                    fields[i] = int(1)
            elif 's' in spec:
                fields[i] = fields[i].strip()

        return fields
项目:Parallel.GAMIT    作者:demiangomez    | 项目源码 | 文件源码
def get_firstobs(self):
        """Return the date of the first observation epoch in the file.

        The file at ``self.rinex_path`` is scanned up to the line that
        contains ``END OF HEADER``. Each following epoch line is split into
        fixed-width columns. A flag of 1 or less marks an observation epoch,
        whose date is returned as a ``pyDate.Date``. A larger flag marks an
        event record, and the next column gives the number of lines to skip.

        Returns ``None`` when there is no header terminator or no
        observation epoch.
        """
        epoch_layout = struct.Struct('1s2s1s2s1s2s1s2s1s2s11s2s1s3s')

        with open(self.rinex_path,'r') as fileio:
            # any() stops right after the terminator line, leaving the
            # iterator positioned on the first line of the body.
            if not any('END OF HEADER' in line for line in fileio):
                return None

            skip = 0
            for line in fileio:
                if skip != 0:
                    skip -= 1
                    continue

                fields = list(epoch_layout.unpack_from(line))
                flag = int(fields[12])
                if flag > 1:
                    # event, skip lines indicated in next field
                    skip = int(fields[13])
                    continue

                # OK flag: this is the first observation
                return pyDate.Date(year=int(fields[1]), month=int(fields[3]), day=int(fields[5]),
                                   hour=int(fields[7]), minute=int(fields[9]), second=float(fields[10]))

        return None
项目:GAMADV-XTD    作者:taers232c    | 项目源码 | 文件源码
def __init__(self, n, r, p):
        """Store the n/r/p configuration and precompute the derived sizes.

        Sets ``smix_bytes``, ``iv_bytes``, ``bmix_len``, ``bmix_half_len`` and
        ``bmix_struct`` from ``r`` and ``p``, switches ``bmix`` to the
        ``_bmix_1`` variant when ``r == 1``, and selects an ``integerify``
        callable appropriate for the magnitude of ``n``.
        """
        # keep the raw configuration
        self.n = n
        self.r = r
        self.p = p

        # derived sizes
        word_count = r << 5                   # bmix block list length: 32*r integers
        self.smix_bytes = r << 7              # bytes in smix input: 2*r*16*4
        self.iv_bytes = self.smix_bytes * p
        self.bmix_len = word_count
        self.bmix_half_len = r << 4
        assert struct.calcsize("I") == 4
        # little-endian packer/unpacker for one whole bmix block list
        self.bmix_struct = struct.Struct("<%sI" % str(word_count))

        # r == 1 has a dedicated, optimized bmix implementation
        if r == 1:
            self.bmix = self._bmix_1

        # integerify(block) must read the last 64 bytes of the block as a
        # little-endian integer.  The result is immediately reduced % n, so for
        # n < 2**32 only the lowest 32 bits matter -- and the block is already
        # held as a sequence of 32-bit integers, so one item lookup suffices.
        lowest_word = operator.itemgetter(-16)
        if n <= 0xFFFFffff:
            integerify = lowest_word
        else:
            assert n <= 0xFFFFffffFFFFffff
            # NOTE(review): -17 is the word just *before* the final 16 words;
            # the next little-endian word would be at -15 -- confirm intent.
            second_word = operator.itemgetter(-17)
            def integerify(X):
                return lowest_word(X) | (second_word(X)<<32)
        self.integerify = integerify

    #=================================================================
    # frontend
    #=================================================================
项目:Twitter-and-IMDB-Sentimental-Analytics    作者:abhinandanramesh    | 项目源码 | 文件源码
def b85decode(b):
        """Decode the base85-encoded byte string *b* and return the raw bytes.

        Every group of five input symbols is turned into one 32-bit big-endian
        word.  Input whose length is not a multiple of five is padded with
        ``b'~'`` and the same number of bytes is trimmed from the result.

        Raises ValueError for a symbol outside ``_b85alphabet`` or for a group
        whose value does not fit into 32 bits.
        """
        # reverse lookup: byte value -> digit value (None marks invalid bytes)
        digit_of = [None] * 256
        for digit, symbol in enumerate(iterbytes(_b85alphabet)):
            digit_of[symbol] = digit

        pad_len = (-len(b)) % 5
        padded = b + b'~' * pad_len
        pack_word = struct.Struct('!I').pack
        words = []
        for start in range(0, len(padded), 5):
            group = padded[start:start + 5]
            value = 0
            try:
                for symbol in iterbytes(group):
                    value = value * 85 + digit_of[symbol]
            except TypeError:
                # adding None failed: locate the offending symbol for the message
                for offset, symbol in enumerate(iterbytes(group)):
                    if digit_of[symbol] is None:
                        raise ValueError(
                            'bad base85 character at position %d' % (start + offset)
                        )
                # TypeError had some other cause; let it propagate unchanged
                raise
            try:
                words.append(pack_word(value))
            except struct.error:
                raise ValueError('base85 overflow in hunk starting at byte %d'
                                 % start)

        decoded = b''.join(words)
        # drop the bytes that only exist because of the padding symbols
        return decoded[:-pad_len] if pad_len else decoded
项目:uecda-pyclient    作者:Hironsan    | 项目源码 | 文件源码
def recv_int(self):
        """Receive a 4-byte message and return the first value decoded from it.

        The payload is unpacked with the ``self.BINARY_INT`` struct format.
        """
        payload = self._recv_msg(byte_length=4)
        return struct.Struct(self.BINARY_INT).unpack(payload)[0]
项目:uecda-pyclient    作者:Hironsan    | 项目源码 | 文件源码
def recv_table(self):
        """Receive a 480-byte message and return it as 8 rows of 15 values.

        The payload is unpacked with the ``self.BINARY_TABLE`` struct format;
        each row is a tuple slice of the unpacked values.
        """
        payload = self._recv_msg(byte_length=480)
        flat = struct.Struct(self.BINARY_TABLE).unpack(payload)
        # cut the flat sequence into 8 consecutive rows of 15 items
        rows = []
        for start in range(0, 8 * 15, 15):
            rows.append(flat[start:start + 15])
        return rows
项目:uecda-pyclient    作者:Hironsan    | 项目源码 | 文件源码
def send_table(self, table):
        """Flatten *table* row by row, pack it and send it as one message.

        The flattened values are packed with the ``self.BINARY_TABLE`` struct
        format and handed to ``self._send_msg``.
        """
        # turn the nested rows into one flat list of values
        flat = []
        for row in table:
            flat.extend(row)
        payload = struct.Struct(self.BINARY_TABLE).pack(*flat)
        self._send_msg(payload)
项目:pgadmin4_installer    作者:HuasoFoundries    | 项目源码 | 文件源码
def b85decode(b):
        """Decode the base85-encoded byte string *b* and return the raw bytes.

        Every group of five input symbols is turned into one 32-bit big-endian
        word.  Input whose length is not a multiple of five is padded with
        ``b'~'`` and the same number of bytes is trimmed from the result.

        Raises ValueError for a symbol outside ``_b85alphabet`` or for a group
        whose value does not fit into 32 bits.
        """
        # reverse lookup: byte value -> digit value (None marks invalid bytes)
        digit_of = [None] * 256
        for digit, symbol in enumerate(iterbytes(_b85alphabet)):
            digit_of[symbol] = digit

        pad_len = (-len(b)) % 5
        padded = b + b'~' * pad_len
        pack_word = struct.Struct('!I').pack
        words = []
        for start in range(0, len(padded), 5):
            group = padded[start:start + 5]
            value = 0
            try:
                for symbol in iterbytes(group):
                    value = value * 85 + digit_of[symbol]
            except TypeError:
                # adding None failed: locate the offending symbol for the message
                for offset, symbol in enumerate(iterbytes(group)):
                    if digit_of[symbol] is None:
                        raise ValueError(
                            'bad base85 character at position %d' % (start + offset)
                        )
                # TypeError had some other cause; let it propagate unchanged
                raise
            try:
                words.append(pack_word(value))
            except struct.error:
                raise ValueError('base85 overflow in hunk starting at byte %d'
                                 % start)

        decoded = b''.join(words)
        # drop the bytes that only exist because of the padding symbols
        return decoded[:-pad_len] if pad_len else decoded
项目:gatekeeper    作者:otto-de    | 项目源码 | 文件源码
def b85decode(b):
        """Decode the base85-encoded byte string *b* and return the raw bytes.

        Every group of five input symbols is turned into one 32-bit big-endian
        word.  Input whose length is not a multiple of five is padded with
        ``b'~'`` and the same number of bytes is trimmed from the result.

        Raises ValueError for a symbol outside ``_b85alphabet`` or for a group
        whose value does not fit into 32 bits.
        """
        # reverse lookup: byte value -> digit value (None marks invalid bytes)
        digit_of = [None] * 256
        for digit, symbol in enumerate(iterbytes(_b85alphabet)):
            digit_of[symbol] = digit

        pad_len = (-len(b)) % 5
        padded = b + b'~' * pad_len
        pack_word = struct.Struct('!I').pack
        words = []
        for start in range(0, len(padded), 5):
            group = padded[start:start + 5]
            value = 0
            try:
                for symbol in iterbytes(group):
                    value = value * 85 + digit_of[symbol]
            except TypeError:
                # adding None failed: locate the offending symbol for the message
                for offset, symbol in enumerate(iterbytes(group)):
                    if digit_of[symbol] is None:
                        raise ValueError(
                            'bad base85 character at position %d' % (start + offset)
                        )
                # TypeError had some other cause; let it propagate unchanged
                raise
            try:
                words.append(pack_word(value))
            except struct.error:
                raise ValueError('base85 overflow in hunk starting at byte %d'
                                 % start)

        decoded = b''.join(words)
        # drop the bytes that only exist because of the padding symbols
        return decoded[:-pad_len] if pad_len else decoded
项目:watchmen    作者:lycclsltt    | 项目源码 | 文件源码
def read_struct(self, fmt):
        """Unpack *fmt* from ``self._data`` at the current position and advance.

        Returns the tuple of unpacked values; ``self._position`` is moved
        forward by the packed size of *fmt* only after a successful unpack.
        """
        layout = struct.Struct(fmt)
        start = self._position
        values = layout.unpack_from(self._data, start)
        self._position = start + layout.size
        return values
项目:warriorframework    作者:warriorframework    | 项目源码 | 文件源码
def b85decode(b):
        """Decode the base85-encoded byte string *b* and return the raw bytes.

        Every group of five input symbols is turned into one 32-bit big-endian
        word.  Input whose length is not a multiple of five is padded with
        ``b'~'`` and the same number of bytes is trimmed from the result.

        Raises ValueError for a symbol outside ``_b85alphabet`` or for a group
        whose value does not fit into 32 bits.
        """
        # reverse lookup: byte value -> digit value (None marks invalid bytes)
        digit_of = [None] * 256
        for digit, symbol in enumerate(iterbytes(_b85alphabet)):
            digit_of[symbol] = digit

        pad_len = (-len(b)) % 5
        padded = b + b'~' * pad_len
        pack_word = struct.Struct('!I').pack
        words = []
        for start in range(0, len(padded), 5):
            group = padded[start:start + 5]
            value = 0
            try:
                for symbol in iterbytes(group):
                    value = value * 85 + digit_of[symbol]
            except TypeError:
                # adding None failed: locate the offending symbol for the message
                for offset, symbol in enumerate(iterbytes(group)):
                    if digit_of[symbol] is None:
                        raise ValueError(
                            'bad base85 character at position %d' % (start + offset)
                        )
                # TypeError had some other cause; let it propagate unchanged
                raise
            try:
                words.append(pack_word(value))
            except struct.error:
                raise ValueError('base85 overflow in hunk starting at byte %d'
                                 % start)

        decoded = b''.join(words)
        # drop the bytes that only exist because of the padding symbols
        return decoded[:-pad_len] if pad_len else decoded