我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 struct.unpack_from()。
def _parseiesubel_(info, f=_iesubel_):
    """
    Parse the variable length sub elements of an info element.

    Each sub element is laid out as id (1 octet) | length (1 octet) | data.

    :param info: packed string, next element starts at index 0
    :param f: function called as f(data, sid) on each sub element for
     further parsing
    :returns: list of (sub element id, parsed sub element) tuples
    """
    opt = []
    offset = 0
    # Test the unread remainder rather than len(info): info is never
    # shortened, so the old check never became false and the loop only ended
    # when unpack_from ran off the end of the buffer.
    # Only the 2-octet header is required: flags are 0-octet sub elements.
    while len(info) - offset >= 2:
        sid, slen = struct.unpack_from('=2B', info, offset)
        start = offset + 2
        opt.append((sid, f(info[start:start + slen], sid)))
        offset = start + slen
    return opt

#### OPTIONAL SUBELEMENTS -> the sub element id and length have been stripped
def _iesubeltfsreq_(s, sid):
    """
    Parse a TFS request sub element.

    :param s: packed sub element data
    :param sid: sub element id
    :returns: parsed tfs request subelement (s itself for unhandled ids)
    """
    ret = s
    if sid == std.EID_TFS_SUBELEMENT_TFS:
        # there are one or more tclas elements followed by an optional tclas
        # processing element
        ret = {}
        while len(s) >= 2:
            eid, tlen = struct.unpack_from('=2B', s)
            # payload of the embedded element, without its id/length header
            # NOTE(review): assumes _parseie_ takes the stripped payload, as
            # the other callers in this file pass it - confirm
            body = s[2:2 + tlen]
            if eid == std.EID_TCLAS:
                ret.setdefault('tclas', []).append(
                    _parseie_(std.EID_TCLAS, body))
            elif eid == std.EID_TCLAS_PRO:
                # store on the result dict: s is a string and cannot take
                # item assignment
                ret['tclas-pro'] = _parseie_(std.EID_TCLAS_PRO, body)
            # no break: keep walking to make sure there are no hanging
            # elements. Always step past the element so that an unexpected
            # element id cannot stall the loop.
            s = s[2 + tlen:]
    elif sid == std.EID_TFS_SUBELEMENT_VEND:
        ret = _parseie_(std.EID_VEND_SPEC, s)
    return ret

# MSMT Pilot subelements Std Table 8-117
def _iesubelmsmtreqstasta_(s, sid):
    """
    Parse a sta request sub element (sta counters) of a msmt request.

    :param s: packed sub element data
    :param sid: sub element id
    :returns: parsed subelement (s itself for unhandled ids)
    """
    if sid == std.EID_MSMT_REQ_SUBELEMENT_STA_RPT:
        # Std Fig. 8-117
        count, timeout, trigger = struct.unpack_from('=I2H', s)
        report = {'msmt-cnt': count,
                  'trigger-timeout': timeout,
                  'sta-cntr-trigger-cond': _stacntrtriggerconds_(trigger),
                  'thresholds': {}}
        # optional 4-byte count fields follow the 8-byte fixed part, assumed
        # to be appended in the order listed here
        remaining = s[8:]
        names = ['fail', 'fcs-error', 'mult-retry', 'dup',
                 'rts-fail', 'ack-fail', 'retry-cnt']
        for name in names:
            if not report['sta-cntr-trigger-cond'][name]:
                continue
            (report['thresholds'][name],) = struct.unpack_from('=I', remaining)
            remaining = remaining[4:]
        return report
    if sid == std.EID_MSMT_REQ_SUBELEMENT_STA_VEND:
        return _parseie_(std.EID_VEND_SPEC, s)
    return s

# STA Counter Trigger Conditions Std Fig. 8-118
def _iesubelmsmtreqstarsna_(s, sid):
    """
    Parse a sta request sub element (rsna counters) of a msmt request.

    :param s: packed sub element data
    :param sid: sub element id
    :returns: parsed subelement (s itself for unhandled ids)
    """
    ret = s
    if sid == std.EID_MSMT_REQ_SUBELEMENT_STA_RPT:
        # Std Fig. 8-121
        cnt, to, t = struct.unpack_from('=I2H', s)
        ts = s[8:]
        conds = _rsnacntrtriggerconds_(t)
        ret = {'msmt-cnt': cnt,
               'trigger-timeout': to,
               'rsna-cntr-trigger-cond': conds,
               'thresholds': {}}
        # optional count fields are 4-bytes assuming they are appended in order
        for thresh in ['cmacicv-err', 'cmarc-replay', 'robust-ccmp-replay',
                       'tkipicv-err', 'tkip-replay', 'ccmp-decrypt',
                       'ccmp-replay']:
            # look the flag up in the rsna conditions parsed above; the
            # 'sta-cntr-trigger-cond' key used before does not exist in ret
            if conds[thresh]:
                ret['thresholds'][thresh] = struct.unpack_from('=I', ts)[0]
                ts = ts[4:]
    elif sid == std.EID_MSMT_REQ_SUBELEMENT_STA_VEND:
        ret = _parseie_(std.EID_VEND_SPEC, s)
    return ret

# RSNA Counter Trigger Conditions Std Fig. 8-122
def _iesubelmsmtreqtx_(s, sid):
    """
    Parse the tx optional subfield of a msmt request.

    :param s: packed sub element data
    :param sid: sub element id
    :returns: parsed tx optional subfield (s itself for unhandled ids)
    """
    if sid == std.EID_MSMT_REQ_SUBELEMENT_TX_RPT:
        # six single-octet fields in wire order
        cond, avg_err, cons_err, delay, count, timeout = \
            struct.unpack_from('=6B', s)
        return {'trigger-cond': _eidmsmtreqtxtrigger_(cond),
                'avg-err-thresh': avg_err,
                'cons-err-thresh': cons_err,
                'delay-thresh': _eidmsmtreqtxdelay_(delay),
                'msmt-cnt': count,
                'trigger-timeout': timeout}
    if sid == std.EID_MSMT_REQ_SUBELEMENT_TX_VEND:
        return _parseie_(std.EID_VEND_SPEC, s)
    return s

# TX TRIGGER CONDITION Std Fig. 8-132
def _iesubelmsmtrptframe_(s, sid):
    """
    Parse a frame sub element of a msmt report.

    :param s: packed sub element data
    :param sid: sub element id
    :returns: parsed frame subelement (s itself for unhandled ids)
    """
    ret = s
    if sid == std.EID_MSMT_RPT_FRAME_CNT_RPT:
        # Fig 8-151, 8-152: a sequence of 19-octet entries
        ret = []
        # floor division: the entry count must be an int regardless of the
        # division semantics in effect; trailing partial entries are ignored
        for i in range(len(s) // 19):
            vs = struct.unpack_from('=17BH', s, i * 19)
            # NOTE(review): 'frmae-cnt' looks like a typo of 'frame-cnt' but
            # is kept because consumers may already rely on the key
            ret.append({'tx-addr': _hwaddr_(vs[0:6]),
                        'bssid': _hwaddr_(vs[6:12]),
                        'phy-type': vs[12],
                        'avg-rcpi': vs[13],
                        'last-rsni': vs[14],
                        'last-rcpi': vs[15],
                        'antenna-id': vs[16],
                        'frmae-cnt': vs[17]})
    elif sid == std.EID_MSMT_RPT_FRAME_VEND:
        ret = _parseie_(std.EID_VEND_SPEC, s)
    return ret

# MSMT Report subelements for Type STA statistics Std Table 8-89
def _iesubelmsmtrptlci_(s, sid):
    """
    Parse an LCI optional sub element of a msmt report.

    :param s: packed sub element data
    :param sid: sub element id
    :returns: parsed LCI optional subelement (s itself for unhandled ids)
    """
    if sid == std.EID_MSMT_RPT_LCI_AZIMUTH:
        (azimuth,) = struct.unpack_from('=H', s)
        return {'azimuth-rpt': _iesubelmsmtrptlicazimuth_(azimuth)}
    if sid == std.EID_MSMT_RPT_LCI_ORIGIN:
        # strict unpack: s must be exactly the 6 address octets
        return {'originator': _hwaddr_(struct.unpack('=6B', s))}
    if sid == std.EID_MSMT_RPT_LCI_TARGET:
        return {'target': _hwaddr_(struct.unpack('=6B', s))}
    if sid == std.EID_MSMT_RPT_LCI_VEND:
        return _parseie_(std.EID_VEND_SPEC, s)
    return s

# MSMT Report->Azimuth Report fields Std Fig. 8-164
def _iesubelevreqtransistion_(s, sid):
    """
    Parse a sub element of type transition in an event request.

    :param s: packed sub element data
    :param sid: sub element id
    :returns: parsed subelement (s itself for unhandled ids)
    """
    if sid == std.EVENT_REQUEST_TYPE_TRANSITION_TARGET:
        return {'tgt-bssid': _hwaddr_(struct.unpack_from('=6B', s))}
    if sid == std.EVENT_REQUEST_TYPE_TRANSITION_SOURCE:
        return {'src-bssid': _hwaddr_(struct.unpack_from('=6B', s))}
    if sid == std.EVENT_REQUEST_TYPE_TRANSITION_TIME_TH:
        (threshold,) = struct.unpack_from('=H', s)
        return {'trans-time-threshold': threshold}
    if sid == std.EVENT_REQUEST_TYPE_TRANSITION_RESULT:
        (match,) = struct.unpack_from('=B', s)
        return {'match-val': _eidevreqsubelmatchval_(match)}
    if sid == std.EVENT_REQUEST_TYPE_TRANSITION_FREQUENT:
        count, interval = struct.unpack_from('=BH', s)
        return {'freq-transistion-cnt-threahold': count, 'time-intv': interval}
    return s

# EVENT REQUEST match value for Type transition Std Fig 8-272
def _parseinfoeldse_(s):
    """
    Parse a dse location from packed string s.

    Every entry of _EID_DSE_FIELDS_ is used as (name, start index, length,
    number of zero pad octets, struct format).

    :param s: packed string
    :returns: parsed dse location dict
    """
    signed_fields = ('lat-int', 'lon-int', 'alt-int')
    dse = {}
    for name, start, length, pad, fmt in _EID_DSE_FIELDS_:
        raw = s[start:start + length] + '\x00' * pad
        if name in signed_fields:
            dse[name] = int2s(raw)
        else:
            # stored as the tuple returned by unpack_from
            dse[name] = struct.unpack_from(fmt, raw)

    # last three fields are byte centric: read them from the final 4 octets
    (dse['depend-enable-id'],
     dse['op-class'],
     dse['ch-num']) = struct.unpack_from('=H2B', s, len(s) - 4)
    return dse

# HT Capabilities Info field Std Fig 8-249
# octets are defined as 1|1|2|1|1|1|1|2|1|1|1|1|1|1 see Fig 8-249 for names
# See also Std Table 8-124 for definition of sub fields
def decompile(self, data, ttFont):
    """Decode the table header from *data* and decompile every sub-table.

    The header layout depends on ``self.version`` (read from the common
    header first). One ``Silf`` object is appended to ``self.silfs`` for each
    offset listed after the header.

    :param data: raw table data
    :param ttFont: passed through unchanged to each ``Silf.decompile``
    """
    sstruct.unpack2(Silf_hdr_format, data, self)
    if self.version >= 5.0:
        # from version 5 on, the data is decompressed before parsing
        (data, self.scheme) = grUtils.decompress(data)
        sstruct.unpack2(Silf_hdr_format_3, data, self)
        base = sstruct.calcsize(Silf_hdr_format_3)
    elif self.version < 3.0:
        # struct.unpack returns a 1-tuple: unwrap it so numSilf is an int
        # here just as it is on the sstruct-driven paths
        (self.numSilf,) = struct.unpack('>H', data[4:6])
        self.scheme = 0
        self.compilerVersion = 0
        base = 8
    else:
        self.scheme = 0
        sstruct.unpack2(Silf_hdr_format_3, data, self)
        base = sstruct.calcsize(Silf_hdr_format_3)

    # one big-endian 32-bit offset per sub-table, right after the header
    silfoffsets = struct.unpack_from('>%dL' % self.numSilf, data, base)
    for offset in silfoffsets:
        s = Silf()
        self.silfs.append(s)
        s.decompile(data[offset:], ttFont, self.version)
def register_schema(cls, typ, schema, typecode):
    """Register a packable for ``typ`` and return it.

    If ``typecode`` is given and ``typ`` already has a type code, the existing
    packable is returned, provided both the code and the schema match;
    otherwise ``ValueError`` is raised.

    :param typ: type to register; used directly as the packable when it is
        already a ``mapped_object_with_schema`` instance
    :param schema: schema wrapped into a new ``mapped_object_with_schema``
        when ``typ`` is not one itself
    :param typecode: code under which the packers are recorded in
        ``cls.TYPE_CODES`` / ``cls.OBJ_PACKERS``; ``None`` skips that step
    :returns: the packable registered for ``typ``
    :raises ValueError: ``typ`` is already registered with a different
        typecode or schema
    """
    # NOTE(review): block structure was reconstructed from a whitespace-mangled
    # source; confirm that the TYPES / PROXY_TYPES updates are unconditional.
    if typecode is not None and typ in cls.TYPE_CODES:
        if cls.TYPE_CODES[typ] != typecode or cls.OBJ_PACKERS[typecode][2].schema is not schema:
            raise ValueError("Registering different types with same typecode %r: %r" % (cls.TYPE_CODES[typ], typ))
        return cls.OBJ_PACKERS[typecode][2]

    if isinstance(typ, mapped_object_with_schema):
        packable = typ
    else:
        packable = mapped_object_with_schema(schema)

    # proxy property class bound to this particular packable
    class SchemaBufferProxyProperty(GenericBufferProxyProperty):
        typ = packable

    if typecode is not None:
        cls.TYPE_CODES[typ] = typecode
        cls.TYPE_CODES[packable] = typecode
        cls.OBJ_PACKERS[typecode] = (packable.pack_into, packable.unpack_from, packable)
    # both the original type and the packable resolve to the same entries
    TYPES[typ] = packable
    TYPES[packable] = packable
    PROXY_TYPES[typ] = SchemaBufferProxyProperty
    PROXY_TYPES[packable] = SchemaBufferProxyProperty
    return packable
def __get__(self, obj, klass):
    """Descriptor getter: unpack the value this property refers to.

    Returns the descriptor itself for class access (``obj is None``) and
    ``None`` when this property's bit is set in ``obj.none_bitmap``.
    Otherwise a signed 64-bit offset is read from the buffer at
    ``obj.offs + self.offs``, added to ``obj.offs``, and the value is
    unpacked from there with ``self.typ.unpack_from``. When ``obj.idmap`` is
    not None the unpacked value is stored in it under that offset.
    """
    # NOTE(review): block structure was reconstructed from a whitespace-mangled
    # source; confirm the nesting of the compiled branch.
    if obj is None:
        return self
    elif obj.none_bitmap & self.mask:
        return None
    if cython.compiled:
        # compiled path: read the offset straight from the raw buffer memory
        pybuf = cython.address(obj.pybuf)
        buflen = pybuf.len
        assert (obj.offs + self.offs + cython.sizeof(cython.longlong)) <= buflen
        offs = obj.offs + cython.cast(cython.p_longlong, cython.cast(cython.p_uchar, pybuf.buf) + obj.offs + self.offs)[0]
        if obj.idmap is not None:
            poffs = offs # python version of offs
            rv = obj.idmap.get(poffs, poffs) # idmap cannot possibly hold "poffs" for that offset
            # a hit is anything other than the default we passed in
            if rv is not poffs:
                return rv
        assert offs + cython.sizeof(cython.ushort) <= buflen
    else:
        # pure-python path: same offset read, done through struct
        poffs = offs = obj.offs + struct.unpack_from('q', obj.buf, obj.offs + self.offs)[0]
    rv = self.typ.unpack_from(obj.buf, offs)
    if obj.idmap is not None:
        obj.idmap[poffs] = rv
    return rv
def getter(self, proxy_into = None, no_idmap = False):
    """Build a function that unpacks the item at a given position.

    All lookups on ``self`` are done once here, so the returned function
    only works with captured locals.

    :param proxy_into: forwarded to ``schema.unpack_from`` on every call
    :param no_idmap: when true, ``None`` is passed instead of ``self.idmap``
    :returns: function ``getter(pos)`` returning the unpacked item at
        ``index[pos]``
    """
    schema = self.schema
    proxy_class = self.proxy_class
    index = self.index
    idmap = None if no_idmap else self.idmap
    buf = self.buf

    # pre-bind the proxy constructor when a proxy class is configured
    proxy_class_new = (
        functools.partial(proxy_class.__new__, proxy_class)
        if proxy_class is not None else None
    )

    @cython.locals(pos=int)
    def getter(pos):
        return schema.unpack_from(
            buf, index[pos], idmap, proxy_class_new, proxy_into)

    return getter
def iter_fast(self):
    """Yield every item in order, unpacking each into one shared proxy.

    A single ``schema.Proxy()`` instance is created and passed as the
    ``proxy_into`` argument for every item.
    """
    # getter inlined
    schema = self.schema
    proxy_class = self.proxy_class
    index = self.index
    idmap = self.idmap
    buf = self.buf

    proxy_class_new = (
        functools.partial(proxy_class.__new__, proxy_class)
        if proxy_class is not None else None
    )

    shared_proxy = schema.Proxy()
    for position in xrange(len(self)):
        yield schema.unpack_from(
            buf, index[position], idmap, proxy_class_new, shared_proxy)
def _extract_images(filename):
    """Read a gzip-compressed MNIST (IDX3) image file.

    :param filename: path of the gzip file
    :return: 4-D numpy array [index, y, x, depth] of dtype np.float32 with
        pixel values scaled from [0, 255] to [0, 1]
    :raises ValueError: the magic number is not 2051, or the file holds
        fewer pixels than its header announces
    """
    print('Extracting {}'.format(filename))
    # gzip.open owns the underlying file, so it is closed with the block
    with gzip.open(filename, 'rb') as f:
        buf = f.read()

    header_fmt = '>IIII'
    magic, num_images, rows, cols = struct.unpack_from(header_fmt, buf, 0)
    if magic != 2051:
        raise ValueError('Invalid magic number {} in MNIST image file: {}'.format(magic, filename))

    # Size each image from the header instead of assuming 28 * 28 = 784
    # pixels, and decode all pixels in one pass.
    pixels = np.frombuffer(buf, dtype=np.uint8,
                           count=num_images * rows * cols,
                           offset=struct.calcsize(header_fmt))
    images = np.multiply(pixels.astype(np.float32), 1.0 / 255.0)
    return images.reshape(num_images, rows, cols, 1)
def _extract_labels(filename, num_classes=10):
    """Read a gzip-compressed MNIST (IDX1) label file as one-hot rows.

    :param filename: path of the gzip file
    :param num_classes: width of each one-hot row, 10 by default
    :return: 2-D numpy array [index, num_classes] of dtype np.float32
    :raises ValueError: the magic number is not 2049, or the file holds
        fewer labels than its header announces
    :raises IndexError: a label is not smaller than num_classes
    """
    print('Extracting {}'.format(filename))
    # gzip.open owns the underlying file, so it is closed with the block
    with gzip.open(filename, 'rb') as f:
        buf = f.read()

    header_fmt = '>II'
    magic, num_labels = struct.unpack_from(header_fmt, buf, 0)
    if magic != 2049:
        raise ValueError('Invalid magic number {} in MNIST label file: {}'.format(magic, filename))

    # one unsigned byte per label, decoded in a single pass
    labels = np.frombuffer(buf, dtype=np.uint8, count=num_labels,
                           offset=struct.calcsize(header_fmt))
    one_hot = np.zeros((num_labels, num_classes), dtype=np.float32)
    one_hot[np.arange(num_labels), labels] = 1
    return one_hot
def read_image(filename):
    """Convert every image of an uncompressed IDX3 file into a PNG.

    Image ``i`` is written to ``./test/<i>.png`` as an 8-bit greyscale
    picture; the ``./test`` directory must already exist.

    :param filename: path of the IDX3 image file
    """
    # the context manager closes the file even when reading fails
    with open(filename, 'rb') as f:
        buf = f.read()

    header_fmt = '>IIII'
    _magic, images, rows, columns = struct.unpack_from(header_fmt, buf, 0)
    index = struct.calcsize(header_fmt)

    # Decode one whole image per unpack call; the pixels are stored row by
    # row, which is the order putdata fills the image in.
    pixel_fmt = '>%dB' % (rows * columns)
    image_size = struct.calcsize(pixel_fmt)
    for i in range(images):
        image = Image.new('L', (columns, rows))
        image.putdata(struct.unpack_from(pixel_fmt, buf, index))
        index += image_size
        print('save ' + str(i) + 'image')
        image.save('./test/' + str(i) + '.png')
def read_label(filename, saveFilename):
    """Dump the labels of an uncompressed IDX1 file as one CSV line.

    :param filename: path of the IDX1 label file
    :param saveFilename: path of the text file to write; it receives the
        labels joined by ',' followed by a newline
    """
    # context managers close both files even when an error occurs
    with open(filename, 'rb') as f:
        buf = f.read()

    header_fmt = '>II'
    _magic, labels = struct.unpack_from(header_fmt, buf, 0)
    # one unsigned byte per label, decoded in a single call
    labelArr = struct.unpack_from('>%dB' % labels, buf,
                                  struct.calcsize(header_fmt))

    with open(saveFilename, 'w') as save:
        save.write(','.join(map(str, labelArr)))
        save.write('\n')
    print('save labels success')
def parse(self, data, offset=0):
    """Load the header fields from raw data.

    :param data: buffer containing the packed header
    :param offset: position in data at which the header starts
    :return: size of the header (``self.size``)
    :raises Exception: fewer than ``self.size`` bytes are available from
        offset, or the stored header CRC differs from ``self._crc()``
    """
    # Measure what is left after offset: checking len(data) alone let a
    # truncated header through whenever offset was non-zero.
    if len(data) - offset < self.size:
        raise Exception("Header: Too small size of input data !")

    val = unpack_from(self.FORMAT, data, offset)
    self.MagicNumber = val[0]
    header_crc = val[1]
    self.TimeStamp = val[2]
    self.DataSize = val[3]
    self.LoadAddress = val[4]
    self.EntryAddress = val[5]
    self.DataCRC = val[6]
    self.OsType = val[7]
    self.ArchType = val[8]
    self.ImageType = val[9]
    self.Compression = val[10]
    # the name is a fixed-width field: drop the NUL padding
    self.Name = val[11].decode('utf-8').strip('\0')

    # compared only after every field is loaded
    # NOTE(review): presumably _crc() is computed from the fields set above - confirm
    if header_crc != self._crc():
        raise Exception("Header: Uncorrect CRC of input data !")
    return self.size
def get_img_type(data, offset=0):
    """ Help function for extracting image from raw data

    Scans forward in steps of 4 bytes for a header whose magic number and
    CRC both match.

    :param data: The raw data as byte array
    :param offset: The offset at which the search starts
    :return: Image type and offset where image start
    :raises Exception: no valid header found before the data runs out
    """
    while offset + Header.SIZE <= len(data):
        header_mn, header_crc = unpack_from('!2L', data, offset)
        # Check the magic number if is U-Boot image
        if header_mn == 0x27051956:
            # the CRC is computed with the CRC field itself zeroed
            candidate = bytearray(data[offset:offset + Header.SIZE])
            candidate[4:8] = [0] * 4
            if header_crc == CRC32(candidate):
                (image_type,) = unpack_from('B', data, offset + 30)
                return image_type, offset
        offset += 4

    raise Exception("Error: Not an U-Boot image !")
def uhid_parse_event_from_kernel(event):
    """Unpack a UHID event buffer according to its event type.

    The leading little-endian 32-bit word selects the struct format.

    :param event: event buffer, exactly 4380 bytes long
    :return: tuple of the fields unpacked with the matching format
    :raises ValueError: the event type is not one of the handled types
    """
    assert len(event) == 4380

    (ev_type,) = struct.unpack_from('< L', event)
    formats = {
        2: UHID_EVENT_FMT_START,
        3: UHID_EVENT_FMT_STOP,
        4: UHID_EVENT_FMT_OPEN,
        5: UHID_EVENT_FMT_CLOSE,
        6: UHID_EVENT_FMT_OUTPUT,
        9: UHID_EVENT_FMT_GETRPRT,
        13: UHID_EVENT_FMT_SETRPRT,
    }
    if ev_type not in formats:
        raise ValueError('unknown UHID event type from kernel %d' % ev_type)
    return struct.unpack_from(formats[ev_type], event)
def _find_firstheader(nsis_file):
    """Scan a file in 512-byte steps for the NSIS first header.

    A candidate header is decoded from the start of every chunk read.

    :param nsis_file: file object positioned where the scan should start
    :return: the matching first header with ``header_offset`` and
        ``data_offset`` set, or None when a read returns fewer bytes than
        one header
    """
    # The former ``firstheader_offset`` variable was never modified, so the
    # 32768-byte read size was unreachable and its ``== 0`` guard always
    # true; both are dropped without changing behaviour.
    pos = 0
    while True:
        chunk = nsis_file.read(512)
        if len(chunk) < _firstheader_pack.size:
            return None

        firstheader = FirstHeader._make(_firstheader_pack.unpack_from(chunk))
        firstheader.header_offset = pos
        firstheader.data_offset = pos + _firstheader_pack.size

        if firstheader.siginfo == FH_SIG and firstheader.magics == FH_MAGICS:
            # NSIS header found.
            return firstheader

        pos += len(chunk)
def _extract_header(nsis_file, firstheader):
    """Inflate and decode the header that follows the first header.

    The decoded header and its raw form are also stored on ``firstheader``.

    :param nsis_file: file object passed to ``inflate_header``
    :param firstheader: first header providing ``data_offset``
    :return: the decoded header with ``blocks`` and ``install_types`` set
    """
    inflated_data, data_size = inflate_header(nsis_file, firstheader.data_offset)
    header = Header._make(_header_pack.unpack_from(inflated_data))
    firstheader.header = header
    firstheader._raw_header = bytes(inflated_data)
    firstheader._raw_header_c_size = data_size

    # Parse the block headers, stored back to back in raw_blocks.
    header.blocks = [
        BlockHeader._make(_blockheader_pack.unpack_from(
            header.raw_blocks, number * _blockheader_pack.size))
        for number in range(BLOCKS_COUNT)
    ]

    # Parse the install types: consecutive little-endian 32-bit words.
    raw_types = header.raw_install_types
    install_types = []
    for position in range(0, len(raw_types), 4):
        (value,) = struct.unpack_from('<I', raw_types, position)
        install_types.append(value)
    header.install_types = install_types

    return header
def _init(self):
    """Read the calibration values from the device and configure it.

    Values are read from register 0x88 (t1-t3, p1-p9, h1) and from the 7
    bytes starting at register 0xE1 (h2-h6); 0x3f is then written to
    register 0xf4.
    """
    calibration = self._i2c.read_register(self._address, 0x88,
                                          fmt="<HhhHhhhhhhhhBB")
    # the 13th value is not used
    (self._t1, self._t2, self._t3,
     self._p1, self._p2, self._p3, self._p4, self._p5,
     self._p6, self._p7, self._p8, self._p9,
     _, self._h1) = calibration

    buf = self._i2c.read_register(self._address, 0xE1, amount=7)
    (self._h6,) = struct.unpack_from("<b", buf, 6)
    self._h2, self._h3 = struct.unpack("<hB", buf[0:3])

    # h4 and h5 are 12-bit values sharing byte 4: h4 takes its low nibble,
    # h5 its high nibble; the signed byte supplies the upper 8 bits
    (h4_high,) = struct.unpack_from("<b", buf, 3)
    self._h4 = (h4_high << 4) | (buf[4] & 0xf)
    (h5_high,) = struct.unpack_from("<b", buf, 5)
    self._h5 = (h5_high << 4) | (buf[4] >> 4)

    self._tfine = 0
    self._i2c.write(self._address, [0xf4, 0x3f])
    super()._init()
def init_aes(shared_secret, nonce):
    """ Initialize AES instance

        :param hex shared_secret: Shared Secret to use as encryption key
        :param int nonce: Random nonce
        :return: AES instance and checksum of the encryption key
        :rtype: length 2 tuple

    """
    # Seed: SHA-512 over the little-endian 64-bit nonce and the secret
    secret = unhexlify(shared_secret)
    packed_nonce = struct.pack("<Q", int(nonce))
    seed = hashlib.sha512(packed_nonce + secret).digest()

    # Check'sum': first 4 bytes of SHA-256(seed) as a little-endian integer
    (check,) = struct.unpack_from("<I", hashlib.sha256(seed).digest())

    # AES: bytes 0-31 of the seed are the key, bytes 32-47 the IV
    key = seed[:32]
    iv = seed[32:48]
    return AES.new(key, AES.MODE_CBC, iv), check
def unpack(self, buffer, _pos):
    """
    Unpacks data from a string buffer and sets class members

    Reads five consecutive big-endian 64-bit integers after the base class
    fields and returns the position following them.
    """
    _pos = LMCPObject.LMCPObject.unpack(self, buffer, _pos)
    for member in ("TaskID", "OptionID", "AssignedVehicle",
                   "TimeThreshold", "TimeTaskCompleted"):
        (value,) = struct.unpack_from(">q", buffer, _pos)
        setattr(self, member, value)
        _pos += 8
    return _pos
def unpack(self, buffer, _pos):
    """
    Unpacks data from a string buffer and sets class members

    Layout after the base class fields: EntityID (int64), a presence byte
    for PlanningPosition followed, when non-zero, by series (int64), type
    (uint32), version (uint16) and the object itself, then PlanningHeading
    (float32). Returns the position after the last field.
    """
    _pos = LMCPObject.LMCPObject.unpack(self, buffer, _pos)
    (self.EntityID,) = struct.unpack_from(">q", buffer, _pos)
    _pos += 8

    (_valid,) = struct.unpack_from("B", buffer, _pos)
    _pos += 1
    if _valid:
        # series / type / version are packed back to back (8 + 4 + 2 bytes)
        _series, _type, _version = struct.unpack_from(">qIH", buffer, _pos)
        _pos += 14
        from lmcp import LMCPFactory
        self.PlanningPosition = LMCPFactory.LMCPFactory().createObject(_series, _version, _type)
        _pos = self.PlanningPosition.unpack(buffer, _pos)
    else:
        self.PlanningPosition = None

    (self.PlanningHeading,) = struct.unpack_from(">f", buffer, _pos)
    _pos += 4
    return _pos
def unpack(self, buffer, _pos):
    """
    Unpacks data from a string buffer and sets class members

    Layout after the base class fields: TaskID (int64), RestartCompletely
    (one byte, true only when equal to 1), a presence byte for ReAssign
    followed, when non-zero, by series (int64), type (uint32), version
    (uint16) and the object itself. Returns the position after the last
    field.
    """
    _pos = LMCPObject.LMCPObject.unpack(self, buffer, _pos)
    (self.TaskID,) = struct.unpack_from(">q", buffer, _pos)
    _pos += 8

    (flag,) = struct.unpack_from(">B", buffer, _pos)
    self.RestartCompletely = flag == 1
    _pos += 1

    (_valid,) = struct.unpack_from("B", buffer, _pos)
    _pos += 1
    if _valid:
        # series / type / version are packed back to back (8 + 4 + 2 bytes)
        _series, _type, _version = struct.unpack_from(">qIH", buffer, _pos)
        _pos += 14
        from lmcp import LMCPFactory
        self.ReAssign = LMCPFactory.LMCPFactory().createObject(_series, _version, _type)
        _pos = self.ReAssign.unpack(buffer, _pos)
    else:
        self.ReAssign = None
    return _pos
def unpack(self, buffer, _pos):
    """
    Unpacks data from a string buffer and sets class members

    After the base class fields come two arrays, Vehicles and CanceledTasks,
    each stored as a big-endian uint16 element count followed by that many
    big-endian int64 values. Returns the position after the last array.
    """
    _pos = LMCPObject.LMCPObject.unpack(self, buffer, _pos)

    # The element count is read once (it used to be unpacked twice in a
    # row), and the format is built with %d instead of the Python-2-only
    # backtick repr.
    _arraylen = struct.unpack_from(">H", buffer, _pos)[0]
    self.Vehicles = [None] * _arraylen
    _pos += 2
    if _arraylen > 0:
        # NOTE: a non-empty array is stored as the tuple returned by
        # unpack_from, an empty one as an empty list
        self.Vehicles = struct.unpack_from(">%dq" % _arraylen, buffer, _pos)
        _pos += 8 * _arraylen

    _arraylen = struct.unpack_from(">H", buffer, _pos)[0]
    self.CanceledTasks = [None] * _arraylen
    _pos += 2
    if _arraylen > 0:
        self.CanceledTasks = struct.unpack_from(">%dq" % _arraylen, buffer, _pos)
        _pos += 8 * _arraylen
    return _pos
def unpack(self, buffer, _pos):
    """
    Unpacks data from a string buffer and sets class members

    Reads six consecutive big-endian 64-bit integers after the base class
    fields and returns the position following them.
    """
    _pos = LMCPObject.LMCPObject.unpack(self, buffer, _pos)
    for member in ("VehicleID", "IntialTaskID", "IntialTaskOption",
                   "DestinationTaskID", "DestinationTaskOption", "TimeToGo"):
        (value,) = struct.unpack_from(">q", buffer, _pos)
        setattr(self, member, value)
        _pos += 8
    return _pos
def unpack(self, buffer, _pos):
    """
    Unpacks data from a string buffer and sets class members

    Layout after the base class fields: ResponseID (int64), TaskID (int64),
    PercentComplete (float32), then the EntitiesEngaged array stored as a
    big-endian uint16 element count followed by that many int64 values.
    Returns the position after the array.
    """
    _pos = LMCPObject.LMCPObject.unpack(self, buffer, _pos)
    self.ResponseID = struct.unpack_from(">q", buffer, _pos)[0]
    _pos += 8
    self.TaskID = struct.unpack_from(">q", buffer, _pos)[0]
    _pos += 8
    self.PercentComplete = struct.unpack_from(">f", buffer, _pos)[0]
    _pos += 4

    # The element count is read once (it used to be unpacked twice in a
    # row), and the format is built with %d instead of the Python-2-only
    # backtick repr.
    _arraylen = struct.unpack_from(">H", buffer, _pos)[0]
    self.EntitiesEngaged = [None] * _arraylen
    _pos += 2
    if _arraylen > 0:
        # NOTE: a non-empty array is stored as the tuple returned by
        # unpack_from, an empty one as an empty list
        self.EntitiesEngaged = struct.unpack_from(">%dq" % _arraylen, buffer, _pos)
        _pos += 8 * _arraylen
    return _pos
def unpack(self, buffer, _pos):
    """
    Unpacks data from a string buffer and sets class members

    Layout after the base class fields: CoordinatedAutomationRequestID
    (int64), a presence byte for PlanningState followed, when non-zero, by
    series (int64), type (uint32), version (uint16) and the object itself.
    Returns the position after the last field.
    """
    _pos = LMCPObject.LMCPObject.unpack(self, buffer, _pos)
    (self.CoordinatedAutomationRequestID,) = struct.unpack_from(">q", buffer, _pos)
    _pos += 8

    (_valid,) = struct.unpack_from("B", buffer, _pos)
    _pos += 1
    if _valid:
        # series / type / version are packed back to back (8 + 4 + 2 bytes)
        _series, _type, _version = struct.unpack_from(">qIH", buffer, _pos)
        _pos += 14
        from lmcp import LMCPFactory
        self.PlanningState = LMCPFactory.LMCPFactory().createObject(_series, _version, _type)
        _pos = self.PlanningState.unpack(buffer, _pos)
    else:
        self.PlanningState = None
    return _pos
def unpack(self, buffer, _pos):
    """
    Unpacks data from a string buffer and sets class members

    Layout after the base class fields: TaskID (int64), the EntitiesInvolved
    array stored as a big-endian uint16 element count followed by that many
    int64 values, then TimeTaskCompleted (int64). Returns the position after
    the last field.
    """
    _pos = LMCPObject.LMCPObject.unpack(self, buffer, _pos)
    self.TaskID = struct.unpack_from(">q", buffer, _pos)[0]
    _pos += 8

    # The element count is read once (it used to be unpacked twice in a
    # row), and the format is built with %d instead of the Python-2-only
    # backtick repr.
    _arraylen = struct.unpack_from(">H", buffer, _pos)[0]
    self.EntitiesInvolved = [None] * _arraylen
    _pos += 2
    if _arraylen > 0:
        # NOTE: a non-empty array is stored as the tuple returned by
        # unpack_from, an empty one as an empty list
        self.EntitiesInvolved = struct.unpack_from(">%dq" % _arraylen, buffer, _pos)
        _pos += 8 * _arraylen

    self.TimeTaskCompleted = struct.unpack_from(">q", buffer, _pos)[0]
    _pos += 8
    return _pos
def calc_chksums(buf):
    """Return the (unsigned, signed) checksums of a member's header.

    All bytes are summed except the 8-byte chksum field (bytes 148-155),
    which counts as if it were filled with spaces. According to the GNU tar
    sources, some tars (Sun and NeXT) sum signed chars, which gives a
    different result when bytes with the high bit set are present, so both
    variants are computed.
    """
    # eight spaces stand in for the skipped chksum field: 8 * 0x20 == 256
    blank_field = 256
    layout = "148{0}8x356{0}"
    unsigned_chksum, signed_chksum = (
        blank_field + sum(struct.unpack_from(layout.format(code), buf))
        for code in ("B", "b")
    )
    return unsigned_chksum, signed_chksum
async def listen(self):
    """
    Listen to socket.

    Every message starts with an 8-byte header holding two little-endian
    32-bit words, the body size and the handle, followed by the body. Each
    decoded payload is handed to ``handle_payload`` in a new task.

    Declared ``async`` because the body awaits the stream reader; as a plain
    ``def`` the function does not compile.
    """
    try:
        while True:
            head = await self.reader.readexactly(8)
            size, handle = struct.unpack_from('<LL', head)
            body = await self.reader.readexactly(size)
            data = method = fault = None

            try:
                data, method = loads(body, use_builtin_types=True)
            except Fault as e:
                # faults are passed on to the payload handler
                fault = e
            except ExpatError as e:
                # See #121 for this solution.
                handle_exception(exception=e, module_name=__name__, func_name='listen', extra_data={'body': body})
                continue

            # unwrap single-element payloads
            if data and len(data) == 1:
                data = data[0]

            self.event_loop.create_task(self.handle_payload(handle, method, data, fault))
    except ConnectionResetError as e:
        logger.critical(
            'Connection with the dedicated server has been closed, we will now close down the subprocess! {}'.format(str(e))
        )
        # When the connection has been reset, we will close the controller process so it can be restarted by the god
        # process. Exit code 10 gives the information to the god process.
        exit(10)
    except Exception as e:
        handle_exception(exception=e, module_name=__name__, func_name='listen')
        raise
def _iesubelfte_(s, sid):
    """
    Parse an optional sub element of the FTE element.

    :param s: packed sub element data
    :param sid: sub element id
    :returns: parsed opt subelement of FTE element (s itself for reserved
     and unhandled ids)
    """
    if sid == std.EID_FTE_RSRV:
        return s
    if sid == std.EID_FTE_PMK_R1:
        # a 6-octet key, zero padded to 8 octets to unpack as one integer
        return {'r1kh-id': struct.unpack_from('=Q', s + '\x00\x00')[0]}
    if sid == std.EID_FTE_GTK:
        # Std Fig. 8-237 Key Info|Key Len|RSC|Wrapped Key
        #                       2|      1|  8|      24-40
        fixed = '=HBQ'
        info, length, rsc = struct.unpack_from(fixed, s)
        return {'key-info': {'key-id': bits.leastx(2, info),
                             'rsrv': bits.mostx(2, info)},
                'key-leng': length,
                'rsc': rsc,
                'wrapped-key': binascii.hexlify(s[struct.calcsize(fixed):])}
    if sid == std.EID_FTE_PMK_R0:
        # variable length 1-48 octets
        return {'r0kh-id': binascii.hexlify(s)}
    if sid == std.EID_FTE_IGTK:
        # Std Fig 8-239 Key ID|IPN|Key Length|Wrapped Key
        #                    2|  6|         1|         24
        (key_id,) = struct.unpack_from('=H', s)
        # 6-octet IPN, zero padded to 8 octets to unpack as one integer
        (ipn,) = struct.unpack_from('=Q', s[2:8] + '\x00\x00')
        (length,) = struct.unpack_from('=B', s, 8)
        return {'key-id': key_id,
                'ipn': ipn,
                'key-len': length,
                'wrapped-key': binascii.hexlify(s[9:])}
    return s

# Diagnostic Report/Request optional subelements Std Table 8-143 & figures commented below
def _iesubelfmsreq_(s, sid):
    """
    Parse a FMS request sub element.

    :param s: packed sub element data
    :param sid: sub element id
    :returns: parsed fms request subelement (s itself for unhandled ids)
    """
    ret = s
    if sid == std.EID_FMS_REQ_SUBELEMENT_FMS:
        # Std Fig. 8-327
        # Note: the 4-byte rate identification is defined in 8.4.1.32
        # as 1|1|2
        di, mi, m, i, r = struct.unpack_from('=4BH', s)
        rem = s[6:]
        ret = {'delv-intv': di,
               'max-delv-intv': mi,
               'rate-ident': {'mask': _rateidmask_(m),
                              'mcs-index': i,
                              'rate': r}}

        # there are one or more tclas elements followed by an optional tclas
        # processing element
        while len(rem) >= 2:
            eid, tlen = struct.unpack_from('=2B', rem)
            # payload of the embedded element, without its id/length header
            # NOTE(review): assumes _parseie_ takes the stripped payload, as
            # the other callers in this file pass it - confirm
            body = rem[2:2 + tlen]
            if eid == std.EID_TCLAS:
                ret.setdefault('tclas', []).append(
                    _parseie_(std.EID_TCLAS, body))
            elif eid == std.EID_TCLAS_PRO:
                ret['tclas-pro'] = _parseie_(std.EID_TCLAS_PRO, body)
            # no break: keep walking to make sure there are no hanging
            # elements. Advance the unread data (rem): the old code sliced
            # the result dict instead, which raised a TypeError, and never
            # advanced at all on an unexpected element id.
            rem = rem[2 + tlen:]
    elif sid == std.EID_FMS_REQ_SUBELEMENT_VEND:
        ret = _parseie_(std.EID_VEND_SPEC, s)
    return ret

# FMS Response subelements Std Table 8-159 & figures commented below
def _iesubelmsmtreqcl_(s, sid):
    """
    Parse a channel load sub element of a msmt request.

    :param s: packed sub element data
    :param sid: sub element id
    :returns: parsed subelement (s itself for unhandled ids)
    """
    if sid == std.EID_MSMT_REQ_SUBELEMENT_CL_RPT:
        # Std fig. 8-110
        condition, reference = struct.unpack_from('=2B', s)
        return {'rpt-condition': condition, 'ch-load-ref-val': reference}
    if sid == std.EID_MSMT_REQ_SUBELEMENT_CL_VEND:
        return _parseie_(std.EID_VEND_SPEC, s)
    return s

# MSMT Request subelements for type Noise Histogram Std Table 8-62 and figures below
def _iesubelmsmtreqnh_(s, sid):
    """
    Parse a noise histogram sub element of a msmt request.

    :param s: packed sub element data
    :param sid: sub element id
    :returns: parsed subelement (s itself for unhandled ids)
    """
    if sid == std.EID_MSMT_REQ_SUBELEMENT_NH_RPT:
        # Std fig. 8-112
        condition, reference = struct.unpack_from('=2B', s)
        return {'rpt-condition': condition, 'anpi-ref-val': reference}
    if sid == std.EID_MSMT_REQ_SUBELEMENT_NH_VEND:
        return _parseie_(std.EID_VEND_SPEC, s)
    return s

# MSMT Request subelements for type Beacon Std Table 8-65 and figures below
def _iesubelmsmtreqbeacon_(s, sid):
    """
    Parse a beacon sub element of a msmt request.

    :param s: packed sub element data
    :param sid: sub element id
    :returns: parsed subelement (s itself for unhandled ids)
    """
    ret = s
    if sid == std.EID_MSMT_REQ_SUBELEMENT_BEACON_SSID:
        ret = {'ssid': _iesubelssid_(s)}
    elif sid == std.EID_MSMT_REQ_SUBELEMENT_BEACON_BRI:
        # Std Fig. 8-114
        r = struct.unpack_from('=B', s)[0]
        # conditions 5 through 10 have their threshold converted with int2s,
        # the others are read as an unsigned octet
        if 5 <= r <= 10:
            t = int2s(s[1])
        else:
            t = struct.unpack_from('=B', s, 1)[0]
        ret = {'rpt-condition': r, 'threshold': t}
    elif sid == std.EID_MSMT_REQ_SUBELEMENT_BEACON_RPT:
        # Std Table 8-67
        # take the value out of the 1-tuple returned by unpack_from, as
        # every other single-octet field in this module does
        ret = {'rpt-detail': struct.unpack_from('=B', s)[0]}
    elif sid == std.EID_MSMT_REQ_SUBELEMENT_BEACON_REQ:
        # same as Std 8.4.2.13
        ret = _parseie_(std.EID_REQUEST, s)
    elif sid == std.EID_MSMT_REQ_SUBELEMENT_BEACON_AP_CH_RPT:
        # same as Std 8.4.2.38
        ret = _parseie_(std.EID_AP_CH_RPT, s)
    elif sid == std.EID_MSMT_REQ_SUBELEMENT_BEACON_VEND:
        ret = _parseie_(std.EID_VEND_SPEC, s)
    return ret

#### NOTE: next three could probably be combined

# MSMT Request subelements for type Frame Request Std Table 8-68 and figures below
def _iesubelmsmtreqlci_(s, sid):
    """
    Parse an lci optional subfield of a msmt request.

    :param s: packed sub element data
    :param sid: sub element id
    :returns: parsed lci optional subfield (s itself for unhandled ids)
    """
    if sid == std.EID_MSMT_REQ_SUBELEMENT_LCI_AZIMUTH:
        # std Fig. 8-124
        (azimuth,) = struct.unpack_from('=B', s)
        return {'azimuth-req': _eidmsmtreqlciazimuth_(azimuth)}
    if sid == std.EID_MSMT_REQ_SUBELEMENT_LCI_REQUESTING:
        return {'originator-mac': _hwaddr_(struct.unpack_from('=6B', s))}
    if sid == std.EID_MSMT_REQ_SUBELEMENT_LCI_TARGET:
        return {'target-mac': _hwaddr_(struct.unpack_from('=6B', s))}
    if sid == std.EID_MSMT_REQ_SUBELEMENT_LCI_VEND:
        return _parseie_(std.EID_VEND_SPEC, s)
    return s

# LCI AZIMUTH REQUEST AZIMUTH REQUEST FIELD Std Fig. 8-125
def _iesubelmsmtreqmcastdiag_(s, sid):
    """
    Parse a mcast diag sub element of a msmt request.

    :param s: packed sub element data
    :param sid: sub element id
    :returns: parsed subelement (s itself for unhandled ids)
    """
    if sid == std.EID_MSMT_REQ_SUBELEMENT_MCAST_TRIGGER:
        # three single-octet fields in wire order
        condition, timeout, delay = struct.unpack_from('=3B', s)
        trigger = {'trigger-condition': condition,
                   'inactivity-timeout': timeout,
                   'reactivation-delay': delay}
        return {'mcast-trigger-rpt': trigger}
    if sid == std.EID_MSMT_REQ_SUBELEMENT_MCAST_VEND:
        return _parseie_(std.EID_VEND_SPEC, s)
    return s

# MSMT Request subelements for type Location civic request Std Table 8-79
def _iesubelmsmtreqloccivic_(s, sid):
    """
    Parse a location civic sub element of a msmt request.

    :param s: packed sub element data
    :param sid: sub element id
    :returns: parsed subelement (s itself for unhandled ids)
    """
    if sid == std.EID_MSMT_REQ_SUBELEMENT_LOC_CIVIC_ORIGIN:
        return {'originator': _hwaddr_(struct.unpack_from('=6B', s))}
    if sid == std.EID_MSMT_REQ_SUBELEMENT_LOC_CIVIC_TARGET:
        return {'target': _hwaddr_(struct.unpack_from('=6B', s))}
    if sid == std.EID_MSMT_REQ_SUBELEMENT_LOC_CIVIC_VEND:
        return _parseie_(std.EID_VEND_SPEC, s)
    return s

# MSMT Request subelements for Type Location Id Std Table 8-80
def _iesubelmsmtrptsta_(s, sid):
    """
    Parse a STA optional sub element of a msmt report.

    :param s: packed sub element data
    :param sid: sub element id
    :returns: parsed STA optional subelement (s itself for unhandled ids)
    """
    if sid == std.EID_MSMT_RPT_STA_STAT_REASON:
        (reason,) = struct.unpack_from('=B', s)
        return {'reason': reason}
    if sid == std.EID_MSMT_RPT_STA_STAT_VEND:
        return _parseie_(std.EID_VEND_SPEC, s)
    return s

# MSMT Report subelements for Type LCI Std Table 8-90
def _iesubelmsmtrptlocid_(s, sid):
    """
    Parse a location id sub element.

    :param s: packed sub element data
    :param sid: sub element id
    :returns: parsed subelement (s itself for unhandled ids)
    """
    if sid == std.EID_MSMT_REQ_SUBELEMENT_LOC_ID_ORIGIN:
        return {'originator': _hwaddr_(struct.unpack_from('=6B', s))}
    if sid == std.EID_MSMT_REQ_SUBELEMENT_LOC_ID_TARGET:
        return {'target': _hwaddr_(struct.unpack_from('=6B', s))}
    if sid == std.EID_MSMT_REQ_SUBELEMENT_LOC_ID_VEND:
        return _parseie_(std.EID_VEND_SPEC, s)
    return s

# EVENT REQUEST subelements for Type transition Std 8.4.2.69.2
def _iesubelevreqp2p_(s, sid):
    """
    Parse a sub element of type P2P link in an event request.

    :param s: packed sub element data
    :param sid: sub element id
    :returns: parsed subelement (s itself for unhandled ids)
    """
    if sid == std.EVENT_REQUEST_TYPE_P2P_PEER:
        return {'peer-addr': _hwaddr_(struct.unpack_from('=6B', s))}
    if sid == std.EVENT_REQUEST_TYPE_P2P_CH_NUM:
        # TODO: make this a single function -> it appears multiple times
        op_class, channel = struct.unpack_from('=2B', s)
        return {'op-class': op_class, 'ch-num': channel}
    return s

# EVENT REQUEST subelements for Type Vend Std 8.4.2.69.5
def _parselcirpt_(s):
    """
    Parse the lci location elements from packed string s.

    Every entry of _EID_MSMT_RPT_LCI_FIELDS_ is used as (name, start index,
    length, number of zero pad octets, struct format).

    :param s: packed string
    :returns: parsed lci location elements
    """
    lci = {}
    for n, i, l, x, f in _EID_MSMT_RPT_LCI_FIELDS_:
        # Slice the field's full length: the end index used to be i+1 (digit
        # one) for the integer fields, which kept a single octet, whereas the
        # sibling dse parser slices i+l for the same fields.
        raw = s[i:i + l] + '\x00' * x
        if n == 'lat-int' or n == 'lon-int' or n == 'alt-int':
            # these are 2's complement
            lci[n] = int2s(raw)
        else:
            # stored as the tuple returned by unpack_from
            lci[n] = struct.unpack_from(f, raw)
    return lci

# MSMT Report->MCast Diagn report reason field Std Fig. 8-188
def _parsesuitesel_(s):
    """
    Parse a suite selector from packed string s.

    :param s: packed string, the first 4 octets are used
    :returns: dict with the 3-octet 'oui' (dash separated) and 'suite-type'
    """
    oui0, oui1, oui2, suite_type = struct.unpack_from('=4B', s)
    oui = _hwaddr_((oui0, oui1, oui2)).replace(':', '-')
    return {'oui': oui, 'suite-type': suite_type}

# RSN capabilities of the RSNE Std Fig 8-188
def _parsetimeval_(s):
    """
    Parse a time value from packed string s.

    The first 10 octets are read as year (2 octets), month, day, hours,
    minutes, seconds (1 octet each), milliseconds (2 octets) and a reserved
    octet.

    :param s: packed string
    :returns: dict mapping each field name to its value
    """
    names = ('year', 'month', 'day', 'hours',
             'minutes', 'seconds', 'milliseconds', 'rsrv')
    return dict(zip(names, struct.unpack_from('=H5BHB', s)))

################################################################################
#### CTRL Frames Std 8.3.1
################################################################################
def _tkip_(f, m):
    """
    Parse tkip data from frame f into the mpdu dict m.

    On success m['l3-crypt'] is set, m['offset'] is advanced past the IV and
    m['stripped'] grows by the MIC and ICV lengths. Any error is recorded in
    m['err'] instead of being raised.

    :param f: frame
    :param m: mpdu dict
    """
    try:
        start = m['offset']
        (keyid,) = struct.unpack_from('=B', f, start + _TKIP_KEY_BYTE_)
        key_bits = {
            'rsrv': bits.leastx(_TKIP_EXT_IV_, keyid),
            'ext-iv': bits.midx(_TKIP_EXT_IV_, _TKIP_EXT_IV_LEN_, keyid),
            'key-id': bits.mostx(_TKIP_EXT_IV_ + _TKIP_EXT_IV_LEN_, keyid),
        }
        iv = {'tsc1': f[start + _TKIP_TSC1_BYTE_],
              'wep-seed': f[start + _TKIP_WEPSEED_BYTE_],
              'tsc0': f[start + _TKIP_TSC0_BYTE_],
              'key-id': key_bits}
        ext_iv = {'tsc2': f[start + _TKIP_TSC2_BYTE_],
                  'tsc3': f[start + _TKIP_TSC3_BYTE_],
                  'tsc4': f[start + _TKIP_TSC4_BYTE_],
                  'tsc5': f[start + _TKIP_TSC5_BYTE_]}
        # the MIC and the ICV are taken from the end of the frame
        trailer = _TKIP_MIC_LEN_ + _TKIP_ICV_LEN_
        m['l3-crypt'] = {'type': 'tkip',
                         'iv': iv,
                         'ext-iv': ext_iv,
                         'mic': f[-trailer:-_TKIP_ICV_LEN_],
                         'icv': f[-_TKIP_ICV_LEN_:]}
        m['offset'] += _TKIP_IV_LEN_
        m['stripped'] += _TKIP_MIC_LEN_ + _TKIP_ICV_LEN_
    except Exception as e:
        m['err'].append(('l3-crypt.tkip', "parsing {0}".format(e)))

#### CCMP Std 11.4.3.2
# <MAC HDR>|CCMP HDR|DATA|MIC|FCS
# bytes var|       8| >=1|  8|  4
# where the CCMP Header is defined
# PN0|PN1|RSRV|RSRV|EXT IV|KeyID|PN2|PN3|PN4|PN5
# bits 8| 8| 8| 5| 1| 2| 8| 8| 8| 8