Python struct 模块,unpack_from() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用struct.unpack_from()

项目:itamae    作者:wraith-wireless    | 项目源码 | 文件源码
def _parseiesubel_(info,f=_iesubel_):
    """
     parse a variable length info element sub element
     :param info: packed string, next element starts at index 0
     :param f: function to apply to each sub element for further parsing
     :returns: list of parsed subelements
    """
    opt = []
    offset = 0
    while len(info) >= 2: # may be flags (0-octet subelements)
        sid,slen = struct.unpack_from('=2B',info,offset)
        opt.append((sid,f(info[offset+2:offset+2+slen],sid)))
        offset += 2 + slen
    return opt

#### OPTIONAL SUBELEMENTS -> the sub element id and length have been stripped
项目:itamae    作者:wraith-wireless    | 项目源码 | 文件源码
def _iesubeltfsreq_(s,sid):
    """ :returns: parsed tfs request subelement """
    ret = s
    if sid == std.EID_TFS_SUBELEMENT_TFS:
        # there are one or more tclas elements folled by an option tclas
        # processing element
        ret = {}
        while s:
            eid,tlen = struct.unpack_from('=2B',s)
            if eid == std.EID_TCLAS:
                if not 'tclas' in ret: ret['tclas'] = []
                ret['tclas'].append(_parseie_(std.EID_TCLAS,s[:tlen]))
                s = s[2+tlen:]
            elif eid == std.EID_TCLAS_PRO:
                s['tclas-pro'] = _parseie_(std.EID_TCLAS_PRO,ret)
                # could use a break here but want to make sure
                # there are not hanging elements
                s = s[3:]
    elif sid == std.EID_TFS_SUBELEMENT_VEND:
        ret = _parseie_(std.EID_VEND_SPEC,s)
    return ret

# MSMT Pilot subelements Std Table 8-117
项目:itamae    作者:wraith-wireless    | 项目源码 | 文件源码
def _iesubelmsmtreqstasta_(s,sid):
    """ :returns: parsed subelement of type sta request for sta counters in msmt request """
    ret = s
    if sid == std.EID_MSMT_REQ_SUBELEMENT_STA_RPT:
        # Std Fig. 8-117
        cnt,to,t = struct.unpack_from('=I2H',s)
        ts = s[8:]
        ret = {'msmt-cnt':cnt,
               'trigger-timeout':to,
               'sta-cntr-trigger-cond':_stacntrtriggerconds_(t),
               'thresholds':{}}

        # optional count fields are 4-bytes assuming they are appending in order
        for thresh in ['fail','fcs-error','mult-retry','dup','rts-fail','ack-fail','retry-cnt']:
            if ret['sta-cntr-trigger-cond'][thresh]:
                ret['thresholds'][thresh] = struct.unpack_from('=I',ts)[0]
                ts = ts[4:]
    elif sid == std.EID_MSMT_REQ_SUBELEMENT_STA_VEND:
        ret = _parseie_(std.EID_VEND_SPEC,s)
    return ret

# STA Counter Trigger Conditions Std Fig. 8-118
项目:itamae    作者:wraith-wireless    | 项目源码 | 文件源码
def _iesubelmsmtreqstarsna_(s,sid):
    """ :returns: parsed subelement of type sta request for qos counters in msmt request """
    ret = s
    if sid == std.EID_MSMT_REQ_SUBELEMENT_STA_RPT:
        # Std Fig. 8-121
        cnt,to,t = struct.unpack_from('=I2H',s)
        ts = s[8:]
        ret = {'msmt-cnt':cnt,
               'trigger-timeout':to,
               'rsna-cntr-trigger-cond':_rsnacntrtriggerconds_(t),
               'thresholds':{}}

        # optional count fields are 4-bytes assuming they are appending in order
        for thresh in ['cmacicv-err','cmarc-replay','robust-ccmp-replay','tkipicv-err','tkip-replay','ccmp-decrypt','ccmp-replay']:
            if ret['sta-cntr-trigger-cond'][thresh]:
                ret['thresholds'][thresh] = struct.unpack_from('=I',ts)[0]
                ts = ts[4:]
    elif sid == std.EID_MSMT_REQ_SUBELEMENT_STA_VEND:
        ret = _parseie_(std.EID_VEND_SPEC,s)
    return ret

# RSNA Counter Trigger Conditions Std Fig. 8-122
项目:itamae    作者:wraith-wireless    | 项目源码 | 文件源码
def _iesubelmsmtreqtx_(s,sid):
    """ :returns: parsed tx optional subfield """
    ret = s
    if sid == std.EID_MSMT_REQ_SUBELEMENT_TX_RPT:
        c,ae,ce,d,m,to = struct.unpack_from('=6B',s)
        ret = {'trigger-cond':_eidmsmtreqtxtrigger_(c),
               'avg-err-thresh':ae,
               'cons-err-thresh':ce,
               'delay-thresh':_eidmsmtreqtxdelay_(d),
               'msmt-cnt':m,
               'trigger-timeout':to}
    elif sid == std.EID_MSMT_REQ_SUBELEMENT_TX_VEND:
        ret = _parseie_(std.EID_VEND_SPEC,s)
    return ret

# TX TRIGGER CONDITION Std Fig. 8-132
项目:itamae    作者:wraith-wireless    | 项目源码 | 文件源码
def _iesubelmsmtrptframe_(s,sid):
    """ :returns: parsed frame subelement """
    ret = s
    if sid == std.EID_MSMT_RPT_FRAME_CNT_RPT:
        # Fig 8-151, 8-152
        ret = []
        n = len(s)/19
        for i in xrange(n):
            vs = struct.unpack_from('=17BH',s,i*19)
            ent = {'tx-addr':_hwaddr_(vs[0:6]),
                   'bssid':_hwaddr_(vs[6:12]),
                   'phy-type':vs[12],
                   'avg-rcpi':vs[13],
                   'last-rsni':vs[14],
                   'last-rcpi':vs[15],
                   'antenna-id':vs[16],
                   'frmae-cnt':vs[17]}
            ret.append(ent)
    elif sid == std.EID_MSMT_RPT_FRAME_VEND:
        ret = _parseie_(std.EID_VEND_SPEC,s)
    return ret

# MSMT Report subelements for Type STA statistics Std Table 8-89
项目:itamae    作者:wraith-wireless    | 项目源码 | 文件源码
def _iesubelmsmtrptlci_(s,sid):
    """ :returns: parsed LCI optional subelement """
    ret = s
    if sid == std.EID_MSMT_RPT_LCI_AZIMUTH:
        ret = {
            'azimuth-rpt':_iesubelmsmtrptlicazimuth_(struct.unpack_from('=H',s)[0])
        }
    elif sid == std.EID_MSMT_RPT_LCI_ORIGIN:
        ret = {'originator':_hwaddr_(struct.unpack('=6B',s))}
    elif sid == std.EID_MSMT_RPT_LCI_TARGET:
        ret = {'target':_hwaddr_(struct.unpack('=6B',s))}
    elif sid == std.EID_MSMT_RPT_LCI_VEND:
        ret = _parseie_(std.EID_VEND_SPEC,s)
    return ret

# MSMT Report->Azimuth Report fields Std Fig. 8-164
项目:itamae    作者:wraith-wireless    | 项目源码 | 文件源码
def _iesubelevreqtransistion_(s,sid):
    """ :returns: parsed subelements of type transistion in event request """
    ret = s
    if sid == std.EVENT_REQUEST_TYPE_TRANSITION_TARGET:
        ret = {'tgt-bssid':_hwaddr_(struct.unpack_from('=6B',s))}
    elif sid == std.EVENT_REQUEST_TYPE_TRANSITION_SOURCE:
        ret = {'src-bssid':_hwaddr_(struct.unpack_from('=6B',s))}
    elif sid == std.EVENT_REQUEST_TYPE_TRANSITION_TIME_TH:
        ret = {'trans-time-threshold':struct.unpack_from('=H',s)[0]}
    elif sid == std.EVENT_REQUEST_TYPE_TRANSITION_RESULT:
        v = struct.unpack_from('=B',s)[0]
        ret = {'match-val':_eidevreqsubelmatchval_(v)}
    elif sid == std.EVENT_REQUEST_TYPE_TRANSITION_FREQUENT:
        ft,t = struct.unpack_from('=BH',s)
        ret = {'freq-transistion-cnt-threahold':ft,'time-intv':t}
    return ret

# EVENT REQUEST match value for Type transition Std Fig 8-272
项目:itamae    作者:wraith-wireless    | 项目源码 | 文件源码
def _parseinfoeldse_(s):
    """ :returns: parsed dse location from packed string s """
    dse = {}
    for n,i,l,x,f in _EID_DSE_FIELDS_:
        if n == 'lat-int' or n == 'lon-int' or n == 'alt-int':
            dse[n] = int2s(s[i:i+l]+'\x00'*x)
        else:
            dse[n] = struct.unpack_from(f,s[i:i+l]+'\x00'*x)

    # last three fields are byte centric
    dei,op,chn = struct.unpack_from('=H2B',s,len(s)-4)
    dse['depend-enable-id'] = dei
    dse['op-class'] = op
    dse['ch-num'] = chn
    return dse

# HT Capabilities Info field Std Fig 8-249
# octest are defined as 1|1|2|1|1|1|1|2|1|1|1|1|1|1 see Fig 8-249 for names
# See also Std Table 8-124 for definition of sub fields
项目:otRebuilder    作者:Pal3love    | 项目源码 | 文件源码
def decompile(self, data, ttFont):
        sstruct.unpack2(Silf_hdr_format, data, self)
        if self.version >= 5.0:
            (data, self.scheme) = grUtils.decompress(data)
            sstruct.unpack2(Silf_hdr_format_3, data, self)
            base = sstruct.calcsize(Silf_hdr_format_3)
        elif self.version < 3.0:
            self.numSilf = struct.unpack('>H', data[4:6])
            self.scheme = 0
            self.compilerVersion = 0
            base = 8
        else:
            self.scheme = 0
            sstruct.unpack2(Silf_hdr_format_3, data, self)
            base = sstruct.calcsize(Silf_hdr_format_3)

        silfoffsets = struct.unpack_from(('>%dL' % self.numSilf), data[base:])
        for offset in silfoffsets:
            s = Silf()
            self.silfs.append(s)
            s.decompile(data[offset:], ttFont, self.version)
项目:sharedbuffers    作者:jampp    | 项目源码 | 文件源码
def register_schema(cls, typ, schema, typecode):
        if typecode is not None and typ in cls.TYPE_CODES:
            if cls.TYPE_CODES[typ] != typecode or cls.OBJ_PACKERS[typecode][2].schema is not schema:
                raise ValueError("Registering different types with same typecode %r: %r" % (cls.TYPE_CODES[typ], typ))
            return cls.OBJ_PACKERS[typecode][2]

        if isinstance(typ, mapped_object_with_schema):
            packable = typ
        else:
            packable = mapped_object_with_schema(schema)
        class SchemaBufferProxyProperty(GenericBufferProxyProperty):
            typ = packable
        if typecode is not None:
            cls.TYPE_CODES[typ] = typecode
            cls.TYPE_CODES[packable] = typecode
            cls.OBJ_PACKERS[typecode] = (packable.pack_into, packable.unpack_from, packable)
        TYPES[typ] = packable
        TYPES[packable] = packable
        PROXY_TYPES[typ] = SchemaBufferProxyProperty
        PROXY_TYPES[packable] = SchemaBufferProxyProperty
        return packable
项目:sharedbuffers    作者:jampp    | 项目源码 | 文件源码
def __get__(self, obj, klass):
        if obj is None:
            return self
        elif obj.none_bitmap & self.mask:
            return None
        if cython.compiled:
            pybuf = cython.address(obj.pybuf)
            buflen = pybuf.len
            assert (obj.offs + self.offs + cython.sizeof(cython.longlong)) <= buflen
            offs = obj.offs + cython.cast(cython.p_longlong,
                cython.cast(cython.p_uchar, pybuf.buf) + obj.offs + self.offs)[0]
            if obj.idmap is not None:
                poffs = offs # python version of offs
                rv = obj.idmap.get(poffs, poffs) # idmap cannot possibly hold "poffs" for that offset
                if rv is not poffs:
                    return rv
            assert offs + cython.sizeof(cython.ushort) <= buflen
        else:
            poffs = offs = obj.offs + struct.unpack_from('q', obj.buf, obj.offs + self.offs)[0]
        rv = self.typ.unpack_from(obj.buf, offs)
        if obj.idmap is not None:
            obj.idmap[poffs] = rv
        return rv
项目:sharedbuffers    作者:jampp    | 项目源码 | 文件源码
def getter(self, proxy_into = None, no_idmap = False):
        schema = self.schema
        proxy_class = self.proxy_class
        index = self.index
        idmap = self.idmap if not no_idmap else None
        buf = self.buf

        if proxy_class is not None:
            proxy_class_new = functools.partial(proxy_class.__new__, proxy_class)
        else:
            proxy_class_new = None

        @cython.locals(pos=int)
        def getter(pos):
            return schema.unpack_from(buf, index[pos], idmap, proxy_class_new, proxy_into)
        return getter
项目:sharedbuffers    作者:jampp    | 项目源码 | 文件源码
def iter_fast(self):
        # getter inlined
        schema = self.schema
        proxy_class = self.proxy_class
        index = self.index
        idmap = self.idmap
        buf = self.buf

        if proxy_class is not None:
            proxy_class_new = functools.partial(proxy_class.__new__, proxy_class)
        else:
            proxy_class_new = None

        proxy_into = schema.Proxy()
        for i in xrange(len(self)):
            yield schema.unpack_from(buf, index[i], idmap, proxy_class_new, proxy_into)
项目:photinia    作者:XoriieInpottn    | 项目源码 | 文件源码
def _extract_images(filename):
        """???????????????????

        :param filename: ?????
        :return: 4??numpy??[index, y, x, depth]? ???np.float32
        """
        images = []
        print('Extracting {}'.format(filename))
        with gzip.GzipFile(fileobj=open(filename, 'rb')) as f:
            buf = f.read()
            index = 0
            magic, num_images, rows, cols = struct.unpack_from('>IIII', buf, index)
            if magic != 2051:
                raise ValueError('Invalid magic number {} in MNIST image file: {}'.format(magic, filename))
            index += struct.calcsize('>IIII')
            for i in range(num_images):
                img = struct.unpack_from('>784B', buf, index)
                index += struct.calcsize('>784B')
                img = np.array(img, dtype=np.float32)
                # ????[0,255]???[0,1]
                img = np.multiply(img, 1.0 / 255.0)
                img = img.reshape(rows, cols, 1)
                images.append(img)
        return np.array(images, dtype=np.float32)
项目:photinia    作者:XoriieInpottn    | 项目源码 | 文件源码
def _extract_labels(filename, num_classes=10):
        """???????????????

        :param filename: ?????
        :param num_classes: ??one-hot??????????10?
        :return: 2??numpy??[index, num_classes]? ???np.float32
        """
        labels = []
        print('Extracting {}'.format(filename))
        with gzip.GzipFile(fileobj=open(filename, 'rb')) as f:
            buf = f.read()
            index = 0
            magic, num_labels = struct.unpack_from('>II', buf, index)
            if magic != 2049:
                raise ValueError('Invalid magic number {} in MNIST label file: {}'.format(magic, filename))
            index += struct.calcsize('>II')
            for i in range(num_labels):
                label = struct.unpack_from('>B', buf, index)
                index += struct.calcsize('>B')
                label_one_hot = np.zeros(num_classes, dtype=np.float32)
                label_one_hot[label[0]] = 1
                labels.append(label_one_hot)
        return np.array(labels, dtype=np.float32)
项目:tensorflow-study    作者:bob-chen    | 项目源码 | 文件源码
def read_image(filename):
    f = open(filename, 'rb')
    index = 0
    buf = f.read()
    f.close()

    magic, images, rows, columns = struct.unpack_from('>IIII' , buf , index)
    index += struct.calcsize('>IIII')
    for i in xrange(images):
        #for i in xrange(2000):
        image = Image.new('L', (columns, rows))

        for x in xrange(rows):
            for y in xrange(columns):
                image.putpixel((y, x), int(struct.unpack_from('>B', buf, index)[0]))
                index += struct.calcsize('>B')


        print 'save ' + str(i) + 'image'
        image.save('./test/' + str(i) + '.png')
项目:tensorflow-study    作者:bob-chen    | 项目源码 | 文件源码
def read_label(filename, saveFilename):
    f = open(filename, 'rb')
    index = 0
    buf = f.read()
    f.close()

    magic, labels = struct.unpack_from('>II' , buf , index)
    index += struct.calcsize('>II')

    labelArr = [0] * labels
    #labelArr = [0] * 2000

    for x in xrange(labels):
        #for x in xrange(2000):
        labelArr[x] = int(struct.unpack_from('>B', buf, index)[0])
        index += struct.calcsize('>B')

    save = open(saveFilename, 'w')

    save.write(','.join(map(lambda x: str(x), labelArr)))
    save.write('\n')

    save.close()
    print 'save labels success'
项目:pyUBoot    作者:molejar    | 项目源码 | 文件源码
def parse(self, data, offset=0):
        if len(data) < self.size:
            raise Exception("Header: Too small size of input data !")

        val = unpack_from(self.FORMAT, data, offset)
        self.MagicNumber  = val[0]
        header_crc        = val[1]
        self.TimeStamp    = val[2]
        self.DataSize     = val[3]
        self.LoadAddress  = val[4]
        self.EntryAddress = val[5]
        self.DataCRC      = val[6]
        self.OsType       = val[7]
        self.ArchType     = val[8]
        self.ImageType    = val[9]
        self.Compression  = val[10]
        self.Name         = val[11].decode('utf-8').strip('\0')

        if header_crc != self._crc():
            raise Exception("Header: Uncorrect CRC of input data !")

        return self.size
项目:pyUBoot    作者:molejar    | 项目源码 | 文件源码
def get_img_type(data, offset=0):
    """ Help function for extracting image fom raw data
    :param data: The raw data as byte array
    :param offset: The offset
    :return: Image type and offset where image start
    """
    while True:
        if (offset + Header.SIZE) > len(data):
            raise Exception("Error: Not an U-Boot image !")

        (header_mn, header_crc,) = unpack_from('!2L', data, offset)
        # Check the magic number if is U-Boot image
        if header_mn == 0x27051956:
            header = bytearray(data[offset:offset+Header.SIZE])
            header[4:8] = [0]*4
            if header_crc == CRC32(header):
                break
        offset += 4

    (image_type,) = unpack_from('B', data, offset + 30)

    return image_type, offset
项目:v2f.py    作者:concise    | 项目源码 | 文件源码
def uhid_parse_event_from_kernel(event):
    assert len(event) == 4380
    ev_type = struct.unpack_from('< L', event)[0]
    if ev_type == 2:
        return struct.unpack_from(UHID_EVENT_FMT_START, event)
    elif ev_type == 6:
        return struct.unpack_from(UHID_EVENT_FMT_OUTPUT, event)
    elif ev_type == 4:
        return struct.unpack_from(UHID_EVENT_FMT_OPEN, event)
    elif ev_type == 5:
        return struct.unpack_from(UHID_EVENT_FMT_CLOSE, event)
    elif ev_type == 3:
        return struct.unpack_from(UHID_EVENT_FMT_STOP, event)
    elif ev_type == 9:
        return struct.unpack_from(UHID_EVENT_FMT_GETRPRT, event)
    elif ev_type == 13:
        return struct.unpack_from(UHID_EVENT_FMT_SETRPRT, event)
    else:
        raise ValueError('unknown UHID event type from kernel %d' % ev_type)
项目:nrs    作者:isra17    | 项目源码 | 文件源码
def _find_firstheader(nsis_file):
    firstheader_offset = 0
    pos = 0
    while True:
        chunk = nsis_file.read(32768 if firstheader_offset else 512)
        if len(chunk) < _firstheader_pack.size:
            return None

        if firstheader_offset == 0:
            firstheader = FirstHeader._make(
                    _firstheader_pack.unpack_from(chunk))
            firstheader.header_offset = pos
            firstheader.data_offset = pos + _firstheader_pack.size

            if firstheader.siginfo == FH_SIG and \
                    firstheader.magics == FH_MAGICS:
                # NSIS header found.
                return firstheader

        pos += len(chunk)
项目:nrs    作者:isra17    | 项目源码 | 文件源码
def _extract_header(nsis_file, firstheader):
    inflated_data, data_size = inflate_header(nsis_file, firstheader.data_offset)
    header = Header._make(_header_pack.unpack_from(inflated_data))
    firstheader.header = header
    firstheader._raw_header = bytes(inflated_data)
    firstheader._raw_header_c_size = data_size

    # Parse the block headers.
    block_headers = []
    for i in range(BLOCKS_COUNT):
        header_offset = i * _blockheader_pack.size
        block_header = BlockHeader._make(_blockheader_pack.unpack_from(
            header.raw_blocks[header_offset:]))
        block_headers.append(block_header)
    header.blocks = block_headers

    # Parse the install types.
    header.install_types = [
            struct.unpack_from('<I', header.raw_install_types[i:])[0]
                for i in range(0, len(header.raw_install_types), 4)]

    return header
项目:mauzr    作者:eqrx    | 项目源码 | 文件源码
def _init(self):
        self._t1, self._t2, self._t3, self._p1, \
            self._p2, self._p3, self._p4, self._p5, \
            self._p6, self._p7, self._p8, self._p9, \
            _, self._h1 = self._i2c.read_register(self._address, 0x88,
                                                  fmt="<HhhHhhhhhhhhBB")

        buf = self._i2c.read_register(self._address, 0xE1, amount=7)
        self._h6 = struct.unpack_from("<b", buf, 6)[0]
        self._h2, self._h3 = struct.unpack("<hB", buf[0:3])
        self._h4 = (struct.unpack_from("<b", buf, 3)[0] << 4) | (buf[4] & 0xf)
        self._h5 = (struct.unpack_from("<b", buf, 5)[0] << 4) | (buf[4] >> 4)
        self._tfine = 0
        self._i2c.write(self._address, [0xf4, 0x3f])

        super()._init()
项目:steem-python    作者:steemit    | 项目源码 | 文件源码
def init_aes(shared_secret, nonce):
    """ Initialize AES instance

        :param hex shared_secret: Shared Secret to use as encryption key
        :param int nonce: Random nonce
        :return: AES instance and checksum of the encryption key
        :rtype: length 2 tuple

    """
    " Seed "
    ss = unhexlify(shared_secret)
    n = struct.pack("<Q", int(nonce))
    encryption_key = hashlib.sha512(n + ss).hexdigest()
    " Check'sum' "
    check = hashlib.sha256(unhexlify(encryption_key)).digest()
    check = struct.unpack_from("<I", check[:4])[0]
    " AES "
    key = unhexlify(encryption_key[0:64])
    iv = unhexlify(encryption_key[64:96])
    return AES.new(key, AES.MODE_CBC, iv), check
项目:salty    作者:GaloisInc    | 项目源码 | 文件源码
def unpack(self, buffer, _pos):
        """
        Unpacks data from a string buffer and sets class members
        """
        _pos = LMCPObject.LMCPObject.unpack(self, buffer, _pos)
        self.TaskID = struct.unpack_from(">q", buffer, _pos)[0]
        _pos += 8
        self.OptionID = struct.unpack_from(">q", buffer, _pos)[0]
        _pos += 8
        self.AssignedVehicle = struct.unpack_from(">q", buffer, _pos)[0]
        _pos += 8
        self.TimeThreshold = struct.unpack_from(">q", buffer, _pos)[0]
        _pos += 8
        self.TimeTaskCompleted = struct.unpack_from(">q", buffer, _pos)[0]
        _pos += 8
        return _pos
项目:salty    作者:GaloisInc    | 项目源码 | 文件源码
def unpack(self, buffer, _pos):
        """
        Unpacks data from a string buffer and sets class members
        """
        _pos = LMCPObject.LMCPObject.unpack(self, buffer, _pos)
        self.EntityID = struct.unpack_from(">q", buffer, _pos)[0]
        _pos += 8
        _valid = struct.unpack_from("B", buffer, _pos )[0]
        _pos += 1
        if _valid:
            _series = struct.unpack_from(">q", buffer, _pos)[0]
            _pos += 8
            _type = struct.unpack_from(">I", buffer, _pos)[0]
            _pos += 4
            _version = struct.unpack_from(">H", buffer, _pos)[0]
            _pos += 2
            from lmcp import LMCPFactory
            self.PlanningPosition = LMCPFactory.LMCPFactory().createObject(_series, _version, _type )
            _pos = self.PlanningPosition.unpack(buffer, _pos)
        else:
            self.PlanningPosition = None
        self.PlanningHeading = struct.unpack_from(">f", buffer, _pos)[0]
        _pos += 4
        return _pos
项目:salty    作者:GaloisInc    | 项目源码 | 文件源码
def unpack(self, buffer, _pos):
        """
        Unpacks data from a string buffer and sets class members
        """
        _pos = LMCPObject.LMCPObject.unpack(self, buffer, _pos)
        self.TaskID = struct.unpack_from(">q", buffer, _pos)[0]
        _pos += 8
        boolChar = struct.unpack_from(">B", buffer, _pos)[0]
        self.RestartCompletely = True if boolChar == 1 else False
        _pos += 1
        _valid = struct.unpack_from("B", buffer, _pos )[0]
        _pos += 1
        if _valid:
            _series = struct.unpack_from(">q", buffer, _pos)[0]
            _pos += 8
            _type = struct.unpack_from(">I", buffer, _pos)[0]
            _pos += 4
            _version = struct.unpack_from(">H", buffer, _pos)[0]
            _pos += 2
            from lmcp import LMCPFactory
            self.ReAssign = LMCPFactory.LMCPFactory().createObject(_series, _version, _type )
            _pos = self.ReAssign.unpack(buffer, _pos)
        else:
            self.ReAssign = None
        return _pos
项目:salty    作者:GaloisInc    | 项目源码 | 文件源码
def unpack(self, buffer, _pos):
        """
        Unpacks data from a string buffer and sets class members
        """
        _pos = LMCPObject.LMCPObject.unpack(self, buffer, _pos)
        _arraylen = struct.unpack_from(">H", buffer, _pos )[0]
        _arraylen = struct.unpack_from(">H", buffer, _pos )[0]
        self.Vehicles = [None] * _arraylen
        _pos += 2
        if _arraylen > 0:
            self.Vehicles = struct.unpack_from(">" + `_arraylen` + "q", buffer, _pos )
            _pos += 8 * _arraylen
        _arraylen = struct.unpack_from(">H", buffer, _pos )[0]
        _arraylen = struct.unpack_from(">H", buffer, _pos )[0]
        self.CanceledTasks = [None] * _arraylen
        _pos += 2
        if _arraylen > 0:
            self.CanceledTasks = struct.unpack_from(">" + `_arraylen` + "q", buffer, _pos )
            _pos += 8 * _arraylen
        return _pos
项目:salty    作者:GaloisInc    | 项目源码 | 文件源码
def unpack(self, buffer, _pos):
        """
        Unpacks data from a string buffer and sets class members
        """
        _pos = LMCPObject.LMCPObject.unpack(self, buffer, _pos)
        self.VehicleID = struct.unpack_from(">q", buffer, _pos)[0]
        _pos += 8
        self.IntialTaskID = struct.unpack_from(">q", buffer, _pos)[0]
        _pos += 8
        self.IntialTaskOption = struct.unpack_from(">q", buffer, _pos)[0]
        _pos += 8
        self.DestinationTaskID = struct.unpack_from(">q", buffer, _pos)[0]
        _pos += 8
        self.DestinationTaskOption = struct.unpack_from(">q", buffer, _pos)[0]
        _pos += 8
        self.TimeToGo = struct.unpack_from(">q", buffer, _pos)[0]
        _pos += 8
        return _pos
项目:salty    作者:GaloisInc    | 项目源码 | 文件源码
def unpack(self, buffer, _pos):
        """
        Unpacks data from a string buffer and sets class members
        """
        _pos = LMCPObject.LMCPObject.unpack(self, buffer, _pos)
        self.ResponseID = struct.unpack_from(">q", buffer, _pos)[0]
        _pos += 8
        self.TaskID = struct.unpack_from(">q", buffer, _pos)[0]
        _pos += 8
        self.PercentComplete = struct.unpack_from(">f", buffer, _pos)[0]
        _pos += 4
        _arraylen = struct.unpack_from(">H", buffer, _pos )[0]
        _arraylen = struct.unpack_from(">H", buffer, _pos )[0]
        self.EntitiesEngaged = [None] * _arraylen
        _pos += 2
        if _arraylen > 0:
            self.EntitiesEngaged = struct.unpack_from(">" + `_arraylen` + "q", buffer, _pos )
            _pos += 8 * _arraylen
        return _pos
项目:salty    作者:GaloisInc    | 项目源码 | 文件源码
def unpack(self, buffer, _pos):
        """
        Unpacks data from a string buffer and sets class members
        """
        _pos = LMCPObject.LMCPObject.unpack(self, buffer, _pos)
        self.CoordinatedAutomationRequestID = struct.unpack_from(">q", buffer, _pos)[0]
        _pos += 8
        _valid = struct.unpack_from("B", buffer, _pos )[0]
        _pos += 1
        if _valid:
            _series = struct.unpack_from(">q", buffer, _pos)[0]
            _pos += 8
            _type = struct.unpack_from(">I", buffer, _pos)[0]
            _pos += 4
            _version = struct.unpack_from(">H", buffer, _pos)[0]
            _pos += 2
            from lmcp import LMCPFactory
            self.PlanningState = LMCPFactory.LMCPFactory().createObject(_series, _version, _type )
            _pos = self.PlanningState.unpack(buffer, _pos)
        else:
            self.PlanningState = None
        return _pos
项目:salty    作者:GaloisInc    | 项目源码 | 文件源码
def unpack(self, buffer, _pos):
        """
        Unpacks data from a string buffer and sets class members
        """
        _pos = LMCPObject.LMCPObject.unpack(self, buffer, _pos)
        self.TaskID = struct.unpack_from(">q", buffer, _pos)[0]
        _pos += 8
        _arraylen = struct.unpack_from(">H", buffer, _pos )[0]
        _arraylen = struct.unpack_from(">H", buffer, _pos )[0]
        self.EntitiesInvolved = [None] * _arraylen
        _pos += 2
        if _arraylen > 0:
            self.EntitiesInvolved = struct.unpack_from(">" + `_arraylen` + "q", buffer, _pos )
            _pos += 8 * _arraylen
        self.TimeTaskCompleted = struct.unpack_from(">q", buffer, _pos)[0]
        _pos += 8
        return _pos
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def calc_chksums(buf):
    """Calculate the checksum for a member's header by summing up all
       characters except for the chksum field which is treated as if
       it was filled with spaces. According to the GNU tar sources,
       some tars (Sun and NeXT) calculate chksum with signed char,
       which will be different if there are chars in the buffer with
       the high bit set. So we calculate two checksums, unsigned and
       signed.
    """
    unsigned_chksum = 256 + sum(struct.unpack_from("148B8x356B", buf))
    signed_chksum = 256 + sum(struct.unpack_from("148b8x356b", buf))
    return unsigned_chksum, signed_chksum
项目:PyPlanet    作者:PyPlanet    | 项目源码 | 文件源码
def listen(self):
        """
        Listen to socket.
        """
        try:
            while True:
                head = await self.reader.readexactly(8)
                size, handle = struct.unpack_from('<LL', head)
                body = await self.reader.readexactly(size)
                data = method = fault = None

                try:
                    data, method = loads(body, use_builtin_types=True)
                except Fault as e:
                    fault = e
                except ExpatError as e:
                    # See #121 for this solution.
                    handle_exception(exception=e, module_name=__name__, func_name='listen', extra_data={'body': body})
                    continue

                if data and len(data) == 1:
                    data = data[0]

                self.event_loop.create_task(self.handle_payload(handle, method, data, fault))
        except ConnectionResetError as e:
            logger.critical(
                'Connection with the dedicated server has been closed, we will now close down the subprocess! {}'.format(str(e))
            )
            # When the connection has been reset, we will close the controller process so it can be restarted by the god
            # process. Exit code 10 gives the information to the god process.
            exit(10)
        except Exception as e:
            handle_exception(exception=e, module_name=__name__, func_name='listen')
            raise
项目:itamae    作者:wraith-wireless    | 项目源码 | 文件源码
def _iesubelfte_(s,sid):
    """ :returns: parsed opt subelement of FTE element """
    ret = s
    if sid == std.EID_FTE_RSRV: pass
    elif sid == std.EID_FTE_PMK_R1:
        # a 6-octed key
        ret = {'r1kh-id':struct.unpack_from('=Q',s+'\x00\x00')[0]}
    elif sid == std.EID_FTE_GTK:
        # Std Fig. 8-237 Key Info|Key Len|RSC|Wrapped Key
        #                       2|      1|  8|      24-40
        ki,kl,r = struct.unpack_from('=HBQ',s)
        ret = {'key-info':{'key-id':bits.leastx(2,ki),
                           'rsrv':bits.mostx(2,ki)},
               'key-leng':kl,
               'rsc':r,
               'wrapped-key':binascii.hexlify(s[struct.calcsize('=HBQ'):])}
    elif sid == std.EID_FTE_PMK_R0:
        # variable length 1-48 octets
        ret = {'r0kh-id':binascii.hexlify(s)}
    elif sid == std.EID_FTE_IGTK:
        # Std Fig 8-239 Key ID|IPN|Key Length|Wrapped Key
        #                    2|  6|         1|         24
        ki = struct.unpack_from('=H',s)[0]
        ipn = struct.unpack_from('=Q',s[2:8]+'\x00\x00')[0]
        kl = struct.unpack_from('=B',s,8)[0]
        ret = {'key-id':ki,
               'ipn':ipn,
               'key-len':kl,
               'wrapped-key':binascii.hexlify(s[9:])}
    return ret

# Diagnositc Report/Request optional subelements Std Table 8-143 & figures commented below
项目:itamae    作者:wraith-wireless    | 项目源码 | 文件源码
def _iesubelfmsreq_(s,sid):
    """ :returns: parsed fms request subelement """
    ret = s
    if sid == std.EID_FMS_REQ_SUBELEMENT_FMS: # Std Fig. 8-327
        # Note: the 4-byte rate identification is defined in 8.4.1.32
        # as 1|1|2
        di,mi,m,i,r = struct.unpack_from('=4BH',s)
        rem = s[6:]
        ret = {'delv-intv':di,
               'max-delv-intv':mi,
               'rate-ident':{'mask':_rateidmask_(m),
                             'mcs-index':i,
                             'rate':r}}

        # there are one or more tclas elements folled by an option tclas
        # processing element
        while rem:
            eid,tlen = struct.unpack_from('=2B',rem)
            if eid == std.EID_TCLAS:
                if not 'tclas' in ret: ret['tclas'] = []
                ret['tclas'].append(_parseie_(std.EID_TCLAS,rem[:tlen]))
                ret = ret[2+tlen:]
            elif eid == std.EID_TCLAS_PRO:
                ret['tclas-pro'] = _parseie_(std.EID_TCLAS_PRO,ret)
                # could use a break here but want to make sure
                # there are not hanging elements
                ret = ret[3:]
    elif sid == std.EID_FMS_REQ_SUBELEMENT_VEND:
        ret = _parseie_(std.EID_VEND_SPEC,s)
    return ret

# FMS Response subelements Std Table 8-159 & figures commented below
项目:itamae    作者:wraith-wireless    | 项目源码 | 文件源码
def _iesubelmsmtreqcl_(s,sid):
    """ :returns: parsed subelement of type channel load in msmt request """
    ret = s
    if sid == std.EID_MSMT_REQ_SUBELEMENT_CL_RPT:
        # Std fig. 8-110
        c,r = struct.unpack_from('=2B',s)
        ret = {'rpt-condition':c,'ch-load-ref-val':r}
    elif sid == std.EID_MSMT_REQ_SUBELEMENT_CL_VEND:
        ret = _parseie_(std.EID_VEND_SPEC,s)
    return ret

# MSMT Request subelements for type Noise Histogram Std Table 8-62 and figures below
项目:itamae    作者:wraith-wireless    | 项目源码 | 文件源码
def _iesubelmsmtreqnh_(s,sid):
    """ :returns: parsed subelement of type channel load in msmt request """
    ret = s
    if sid == std.EID_MSMT_REQ_SUBELEMENT_NH_RPT:
        # Std fig. 8-112
        c,a = struct.unpack_from('=2B',s)
        ret = {'rpt-condition':c,'anpi-ref-val':a}
    elif sid == std.EID_MSMT_REQ_SUBELEMENT_NH_VEND:
        ret = _parseie_(std.EID_VEND_SPEC,s)
    return ret

# MSMT Request subelements for type Beaon Std Table 8-65 and figures below
项目:itamae    作者:wraith-wireless    | 项目源码 | 文件源码
def _iesubelmsmtreqbeacon_(s,sid):
    """ :returns: parsed subelement of type beacon in msmt request """
    ret = s
    if sid == std.EID_MSMT_REQ_SUBELEMENT_BEACON_SSID:
        ret = {'ssid':_iesubelssid_(s)}
    elif sid == std.EID_MSMT_REQ_SUBELEMENT_BEACON_BRI:
        # Std Fig. 8-114
        r = struct.unpack_from('=B',s)[0]
        if 5 <= r <= 10: t = int2s(s[1])
        else: t = struct.unpack_from('=B',s,1)[0]
        ret = {'rpt-condition':r,
               'threshold':t}
    elif sid == std.EID_MSMT_REQ_SUBELEMENT_BEACON_RPT:
        # Std Table 8-67
        ret = {'rpt-detail':struct.unpack_from('=B',s)}
    elif sid == std.EID_MSMT_REQ_SUBELEMENT_BEACON_REQ:
        # same as Std 8.4.2.13
        ret = _parseie_(std.EID_REQUEST,s)
    elif sid == std.EID_MSMT_REQ_SUBELEMENT_BEACON_AP_CH_RPT:
        # same as Std 8.4.2.38
        ret = _parseie_(std.EID_AP_CH_RPT,s)
    elif sid == std.EID_MSMT_REQ_SUBELEMENT_BEACON_VEND:
        ret = _parseie_(std.EID_VEND_SPEC,s)
    return ret

#### NOTE: next three could probably be combined

# MSMT Request subelements for type Frame Request Std Table 8-68 and figures below
项目:itamae    作者:wraith-wireless    | 项目源码 | 文件源码
def _iesubelmsmtreqlci_(s,sid):
    """ :returns: parsed lci optional subfield """
    ret = s
    if sid == std.EID_MSMT_REQ_SUBELEMENT_LCI_AZIMUTH: # std Fig. 8-124
        ret = {'azimuth-req':_eidmsmtreqlciazimuth_(struct.unpack_from('=B',s)[0])}
    elif sid == std.EID_MSMT_REQ_SUBELEMENT_LCI_REQUESTING:
        ret = {'originator-mac':_hwaddr_(struct.unpack_from('=6B',s))}
    elif sid == std.EID_MSMT_REQ_SUBELEMENT_LCI_TARGET:
        ret = {'target-mac':_hwaddr_(struct.unpack_from('=6B',s))}
    elif sid == std.EID_MSMT_REQ_SUBELEMENT_LCI_VEND:
        ret = _parseie_(std.EID_VEND_SPEC,s)
    return ret

# LCI AZIMUTH REQUEST AZIMUTH REQUST FIELD Std Fig. 8-125
项目:itamae    作者:wraith-wireless    | 项目源码 | 文件源码
def _iesubelmsmtreqmcastdiag_(s,sid):
    """ :returns: parsed subelement of type mcast diag """
    ret = s
    if sid == std.EID_MSMT_REQ_SUBELEMENT_MCAST_TRIGGER:
        c,t,d = struct.unpack_from('=3B',s)
        ret = {'mcast-trigger-rpt':{'trigger-condition': c,
                                    'inactivity-timeout': t,
                                    'reactivation-delay': d}}
    elif sid == std.EID_MSMT_REQ_SUBELEMENT_MCAST_VEND:
        ret =  _parseie_(std.EID_VEND_SPEC,s)
    return ret

# MSMT Request subelements for type Location civic request Std Table 8-79
项目:itamae    作者:wraith-wireless    | 项目源码 | 文件源码
def _iesubelmsmtreqloccivic_(s,sid):
    """ :returns: parsed subelement of type location civic in msmt request """
    ret = s
    if sid == std.EID_MSMT_REQ_SUBELEMENT_LOC_CIVIC_ORIGIN:
        ret = {'originator':_hwaddr_(struct.unpack_from('=6B',s))}
    elif sid == std.EID_MSMT_REQ_SUBELEMENT_LOC_CIVIC_TARGET:
        ret = {'target':_hwaddr_(struct.unpack_from('=6B',s))}
    elif sid == std.EID_MSMT_REQ_SUBELEMENT_LOC_CIVIC_VEND:
        ret = _parseie_(std.EID_VEND_SPEC,s)
    return ret

# MSMT Request subelements for Type Location Id Std Table 8-80
项目:itamae    作者:wraith-wireless    | 项目源码 | 文件源码
def _iesubelmsmtrptsta_(s,sid):
    """ :returns: parsed STA optional subelement """
    ret = s
    if sid == std.EID_MSMT_RPT_STA_STAT_REASON:
        ret = {'reason':struct.unpack_from('=B',s)[0]}
    elif sid == std.EID_MSMT_RPT_STA_STAT_VEND:
        ret = _parseie_(std.EID_VEND_SPEC,s)
    return ret

# MSMT Report subelements for Type LCI Std Table 8-90
项目:itamae    作者:wraith-wireless    | 项目源码 | 文件源码
def _iesubelmsmtrptlocid_(s,sid):
    """ :returns: parsed subelement of type location civic in msmt request """
    ret = s
    if sid == std.EID_MSMT_REQ_SUBELEMENT_LOC_ID_ORIGIN:
        ret = {'originator':_hwaddr_(struct.unpack_from('=6B',s))}
    elif sid == std.EID_MSMT_REQ_SUBELEMENT_LOC_ID_TARGET:
        ret = {'target':_hwaddr_(struct.unpack_from('=6B',s))}
    elif sid == std.EID_MSMT_REQ_SUBELEMENT_LOC_ID_VEND:
        ret = _parseie_(std.EID_VEND_SPEC,s)
    return ret

# EVENT REQUEST sublements for Type transition Std 8.4.2.69.2
项目:itamae    作者:wraith-wireless    | 项目源码 | 文件源码
def _iesubelevreqp2p_(s,sid):
    """ :returns: parsed sublements of type P2P link in event request """
    ret = s
    if sid == std.EVENT_REQUEST_TYPE_P2P_PEER:
        ret = {'peer-addr':_hwaddr_(struct.unpack_from('=6B',s))}
    elif sid == std.EVENT_REQUEST_TYPE_P2P_CH_NUM:
        # TODO: make this a single function -> it appears multiple times
        o,c = struct.unpack_from('=2B',s)
        ret = {'op-class':o,'ch-num':c}
    return ret

# EVENT REQUEST sublements for Type Vend Std 8.4.2.69.5
项目:itamae    作者:wraith-wireless    | 项目源码 | 文件源码
def _parselcirpt_(s):
    """ :returns: parsed lci location elements """
    lci = {}
    for n,i,l,x,f in _EID_MSMT_RPT_LCI_FIELDS_:
        if n == 'lat-int' or n == 'lon-int' or n == 'alt-int':
            # these are 2's complement
            lci[n] = int2s(s[i:i+1]+'\x00'*x)
        else:
            lci[n] = struct.unpack_from(f,s[i:i+l]+'\x00'*x)
    return lci

# MSMT Report->MCast Diagn report reason field Std Fig. 8-188
项目:itamae    作者:wraith-wireless    | 项目源码 | 文件源码
def _parsesuitesel_(s):
    """ :returns: parse suite selector from packed string s """
    vs = struct.unpack_from('=4B',s)
    return {'oui':_hwaddr_(vs[0:3]).replace(':','-'),'suite-type':vs[-1]}

# RSN capabilities of the RSNE Std Fig 8-188
项目:itamae    作者:wraith-wireless    | 项目源码 | 文件源码
def _parsetimeval_(s):
    """ :returns: a parsed time value from packed string s """
    tval = struct.unpack_from('=H5BHB',s)
    return {'year':tval[0],
            'month':tval[1],
            'day':tval[2],
            'hours':tval[3],
            'minutes':tval[4],
            'seconds':tval[5],
            'milliseconds':tval[6],
            'rsrv':tval[7]}

################################################################################
#### CTRL Frames Std 8.3.1
################################################################################
项目:itamae    作者:wraith-wireless    | 项目源码 | 文件源码
def _tkip_(f,m):
    """
     parse tkip data from frame f into mac dict
     :param f: frame
     :param m: mpdu dict
    """
    try:
        keyid = struct.unpack_from('=B',f,m['offset']+_TKIP_KEY_BYTE_)[0]
        m['l3-crypt'] = {'type':'tkip',
                         'iv':{'tsc1':f[m['offset']+_TKIP_TSC1_BYTE_],
                               'wep-seed':f[m['offset']+_TKIP_WEPSEED_BYTE_],
                               'tsc0':f[m['offset']+_TKIP_TSC0_BYTE_],
                               'key-id':{'rsrv':bits.leastx(_TKIP_EXT_IV_,keyid),
                                         'ext-iv':bits.midx(_TKIP_EXT_IV_,_TKIP_EXT_IV_LEN_,keyid),
                                         'key-id':bits.mostx(_TKIP_EXT_IV_+_TKIP_EXT_IV_LEN_,keyid)}},
                         'ext-iv':{'tsc2':f[m['offset']+_TKIP_TSC2_BYTE_],
                                   'tsc3':f[m['offset']+_TKIP_TSC3_BYTE_],
                                   'tsc4':f[m['offset']+_TKIP_TSC4_BYTE_],
                                   'tsc5':f[m['offset']+_TKIP_TSC5_BYTE_]},
                         'mic':f[-(_TKIP_MIC_LEN_ + _TKIP_ICV_LEN_):-_TKIP_ICV_LEN_],
                         'icv':f[-_TKIP_ICV_LEN_:]}
        m['offset'] += _TKIP_IV_LEN_
        m['stripped'] += _TKIP_MIC_LEN_ + _TKIP_ICV_LEN_
    except Exception as e:
        m['err'].append(('l3-crypt.tkip',"parsing {0}".format(e)))

#### CCMP Std 11.4.3.2
# <MAC HDR>|CCMP HDR|DATA|MIC|FCS
# bytes var|       8| >=1|  8|  4
# where the CCMP Header is defined
#    PN0|PN1|RSRV|RSRV|EXT IV|KeyID|PN2|PN3|PN4|PN5
# bits 8|  8|   8|   5|     1|    2|  8|  8|  8|  8