我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用bitstring.BitArray()。
def rx_stream(self, count=None, secs=None):
    """Yield ``(metadata, payload)`` pairs for packets read from the device.

    Calls ``self.set_rx_mode()`` first, then repeatedly reads 64 bytes via
    ``self.device.read(0x82, 64)`` (presumably USB IN endpoint 0x82 --
    confirm against the device wrapper).

    :param count: maximum number of reads; ``None`` means no limit.
    :param secs: stop once this many seconds have passed since streaming
        started; ``None`` means no time limit.  The deadline is tested after
        each read, so the packet read when it expires is dropped.
    :returns: generator of ``(dict, BitArray)`` -- the header fields keyed by
        name, and the packet bits from bit 112 onwards.
    """
    self.set_rx_mode()
    header_layout = ('uint:8, uint:8, uint:8, uint:8, uint:32,'
                     'int:8, int:8, int:8, uint:8')
    header_names = ('pkt_type', 'status', 'channel', 'clkn_high', 'clk100ns',
                    'rssi_max', 'rssi_min', 'rss_avg', 'rssi_count')
    started = time.time()
    remaining = count
    while remaining is None or remaining > 0:
        raw = self.device.read(0x82, 64)
        if remaining is not None:
            remaining -= 1
        if secs is not None and time.time() >= started + secs:
            break
        packet = BitArray(bytes=raw)
        header_values = packet.unpack(header_layout)
        yield dict(zip(header_names, header_values)), packet[112:]
def generateAuthCode(secret, offset=0):
    """Return the five-character auth code for a base64-encoded shared secret.

    The code is an HMAC-SHA1 over a 64-bit counter (the timestamp divided
    into 30-second steps), truncated to 31 bits and rendered as five symbols
    from a 26-character alphabet, least-significant symbol first.

    :param secret: base64-encoded shared secret.
    :param offset: forwarded to ``timeOffset``; presumably a clock correction
        in seconds -- confirm against that helper.
    :returns: five-character code string.
    """
    key = b64decode(secret)

    # 8-byte message: upper 32 bits stay zero, lower 32 bits hold the step.
    counter = BitArray(8 * 8)
    timestamp = timeOffset(offset)
    counter[4 * 8:] = int(timestamp / 30)

    digest = hmac.new(key, counter.tobytes(), hashlib.sha1).digest()
    digest_bits = BitArray(bytes=digest, length=len(digest) * 8)

    # The low nibble of byte 19 selects which 4-byte window of the digest to
    # use; the top bit of that window is masked off so the value is positive.
    start = (digest_bits[19 * 8:19 * 8 + 8] & BitArray('0x0f')).int
    window = digest_bits[start * 8:start * 8 + 4 * 8]
    fullcode = (window & BitArray("0x7fffffff")).int

    chars = '23456789BCDFGHJKMNPQRTVWXY'
    symbols = []
    for _ in range(5):
        # divmod keeps the arithmetic in integers; plain "/" would switch to
        # floating point on Python 3.
        fullcode, index = divmod(fullcode, len(chars))
        symbols.append(chars[index])
    return "".join(symbols)
def string(length):
    """Return a field factory for a string field ``length`` bytes wide.

    The returned callable takes the parent plus keyword field options and
    builds a ``BreadField`` of ``length * 8`` bits.  Only the ``str_format``
    option is read here.
    """
    def make_string_field(parent, **field_options):
        def encode_string(value):
            # Anything that is not exactly ``bytes`` is UTF-8 encoded first.
            raw = value if type(value) == bytes else value.encode('utf-8')
            return BitArray(bytes=raw)

        def decode_string(encoded):
            return encoded.bytes

        return BreadField(
            length * 8,
            encode_string,
            decode_string,
            str_format=field_options.get('str_format', None))

    return make_string_field
def new(spec, type_name='bread_struct', data=None):
    """Build a struct from ``spec`` and bind it to ``data``.

    :param spec: struct specification handed to ``build_struct``.
    :param type_name: name handed to ``build_struct``.
    :param data: bit data to bind; when ``None`` a zero-filled buffer just
        large enough (rounded up to whole bytes) is created.
    :raises ValueError: if ``data`` holds fewer bits than the struct needs.
    :returns: the struct, bound to the first ``len(struct)`` bits of
        ``data`` with its offset reset to 0.
    """
    struct = build_struct(spec, type_name)
    required_bits = len(struct)

    if data is None:
        byte_count = int(math.ceil(required_bits / 8.0))
        data = BitArray(bytearray(byte_count))

    if required_bits > len(data):
        raise ValueError(
            ("Data being parsed isn't long enough; expected at least %d "
             "bits, but data is only %d bits long") %
            (required_bits, len(data)))

    struct._set_data(data[:required_bits])
    struct._offset = 0
    return struct
def __encode_voice_header(self, _rx_slot, _sync, _dtype):
    """Assemble the bytes of a header/terminator frame for ``_rx_slot``.

    Side effect: stores the embedded LC on ``_rx_slot.emblc`` and replaces
    its entry 5 with 32 zero bits (the NULL message for frame F).

    :param _rx_slot: slot state; ``rf_src``, ``dst_id`` and ``cc`` are read.
    :param _sync: sync pattern, converted with ``decode.to_bits``.
    :param _dtype: data type (Header or Term); only the low 4 bits are used.
    :returns: the frame converted with ``decode.to_bytes``.
    """
    source_id = _rx_slot.rf_src
    dest_id = _rx_slot.dst_id
    color_code = _rx_slot.cc

    # LC layout: PF + Reserved + FLCO + FID + Service Options
    #            + Group Address + Source Address
    lc = '\x00\x00\x00' + dest_id + source_id

    full_lc_encode = bptc.encode_header_lc(lc)
    # Save the embedded LC for voice frames B-E.
    _rx_slot.emblc = bptc.encode_emblc(lc)
    _rx_slot.emblc[5] = bitarray(32)  # NULL message (F)

    # Slot type: colour code in the high nibble, data type in the low one,
    # then protected with the Golay (20,8,7) code.
    slot_type = chr((color_code << 4) | (_dtype & 0x0f))
    slot_with_fec = BitArray(uint=golay.encode_2087(slot_type), length=20)

    # info[0:98] + slot_type[0:10] + sync + slot_type[10:20] + info[98:196]
    sync_bits = decode.to_bits(_sync)
    frame_bits = (full_lc_encode[0:98]
                  + slot_with_fec[0:10]
                  + sync_bits
                  + slot_with_fec[10:20]
                  + full_lc_encode[98:196])
    return decode.to_bytes(frame_bits)


# Create a voice header DMR frame
def send_voice72(self, _rx_slot, _ambe):
    """Send one voice frame built from three 72-bit AMBE frames.

    ``_ambe`` is read as three consecutive 9-byte slices.  Each is converted
    to 49 bits, padded with one zero bit, and the three results are spliced
    into bytes 33..51 of the template frame ``self._tempVoice[_rx_slot.vf]``.
    The frame goes out through ``self.rewriteFrame`` and the slot's voice
    frame counter is advanced modulo 6.
    """
    frames72 = [BitArray('0x' + ahex(_ambe[start:start + 9]))[0:72]
                for start in (0, 9, 18)]
    frames49 = [ambe_utils.convert72BitTo49BitAMBE(bits) for bits in frames72]
    for bits in frames49:
        bits.append(False)
    ambe = frames49[0] + frames49[1] + frames49[2]

    # Insert the 3 49 bit AMBE frames
    template = self._tempVoice[_rx_slot.vf]
    _frame = template[:33] + ambe.tobytes() + template[52:]
    self.rewriteFrame(_frame, _rx_slot.slot, _rx_slot.dst_id,
                      _rx_slot.rf_src, _rx_slot.repeater_id)

    # the voice frame counter which is always mod 6
    _rx_slot.vf = (_rx_slot.vf + 1) % 6
def getBigEndian(hex_val):
    """Return ``hex_val`` with the order of its two-digit groups reversed.

    The input (hex digits without a ``0x`` prefix) is first passed through
    ``BitArray`` and read back via ``.hex``.  The digit string is then
    mirrored and each pair flipped back, so the pairs come out in reverse
    order with their own digits intact.
    """
    digits = BitArray("0x" + hex_val).hex
    mirrored = digits[::-1]
    return "".join(mirrored[pos:pos + 2][::-1]
                   for pos in range(0, len(digits), 2))
def getDifficult(bits):
    """Compute a difficulty figure from a compact ``bits`` hex string.

    ``bits`` is byte-swapped with ``getBigEndian``.  The first hex pair is
    then used as an exponent and the following six hex digits as a
    coefficient: ``target = coefficient * 2 ** (8 * (exponent - 3))``.  The
    result is the fixed reference value divided by that target, as a float.
    """
    compact = BitArray("0x" + getBigEndian(bits)).hex
    exponent = BitArray("0x" + compact[:2]).int
    coefficient = BitArray("0x" + compact[2:8]).int
    target = coefficient * 2**(8*(exponent - 3))
    reference = BitArray("0x00000000FFFF0000000000000000000000000000000000000000000000000000").int
    return float(reference/target)
def computeBlockHash(rawHash):
    """Double-SHA256 the bytes given as hex and return the reversed hash.

    :param rawHash: hex string (no ``0x`` prefix) of the data to hash.
    :returns: whatever ``hexHashReverser`` produces for the ``BitArray``
        holding the second SHA-256 digest.
    """
    payload = BitArray("0x" + rawHash).bytes
    inner_digest = hashlib.sha256(payload).digest()
    outer_hex = hashlib.sha256(inner_digest).hexdigest()
    return hexHashReverser(BitArray("0x" + outer_hex))
def computeTransHash(rawHash):
    """Double-SHA256 the bytes given as hex and return the reversed hash.

    :param rawHash: hex string (no ``0x`` prefix) of the data to hash.
    :returns: the result of ``hexHashReverser`` applied to the ``BitArray``
        of the second SHA-256 digest.
    """
    data = BitArray("0x" + rawHash).bytes
    once = hashlib.sha256(data)
    twice = hashlib.sha256(once.digest())
    digest_bits = BitArray("0x" + twice.hexdigest())
    return hexHashReverser(digest_bits)


#address must be in decimal form before entering it into base58encode
def getDifficult(bits):
    """Compute a difficulty figure, rounded to 2 decimals, from ``bits``.

    ``bits`` is byte-swapped with ``getBigEndian``.  Its first hex pair acts
    as an exponent and the next six hex digits as a coefficient, giving
    ``target = coefficient * 2 ** (8 * (exponent - 3))``.  The fixed
    reference value is divided by that target.
    """
    swapped = BitArray("0x" + getBigEndian(bits)).hex

    exponent_bits = BitArray("0x" + swapped[:2])
    coefficient_bits = BitArray("0x" + swapped[2:8])
    scale = 2**(8*(exponent_bits.int - 3))
    target = coefficient_bits.int * scale

    reference_bits = BitArray("0x00000000FFFF0000000000000000000000000000000000000000000000000000")
    return round(reference_bits.int/target, 2)
def recv_bitfield(self, length):
    """
    Received only at the start of a connection when a peer wants to tell
    you all the pieces it has in a very compact form.

    Reads the payload, treats it as one bit per piece in the iteration order
    of ``self.torrent.pieces``, and stores a ``self.PIECE`` for every piece
    in ``self.pieces`` keyed by its sha.

    :param length: The size of the payload, a number of bits representing
        the number of pieces
    """
    payload = self.recv(length)
    bits = BitArray(bytes=payload)
    # Walk the items once.  Indexing ``items()`` per piece rebuilt the list
    # each iteration and is not possible on Python 3 dict views.
    for index, (sha, _piece) in enumerate(self.torrent.pieces.items()):
        self.pieces[sha] = self.PIECE(sha, self, have=bits[index])
def send_bitfield(self):
    """
    Tell the peer all the pieces you have in a really compact form.

    One bit is produced per entry of ``self.torrent.pieces``, in iteration
    order.  The bit is set when the key (sha) equals that piece's ``digest``.
    The packed bits go out through ``self.send_payload`` with message id 5.
    """
    # the pieces are compacted in sequence in to bits
    # A comprehension replaces the tuple-unpacking lambda, which Python 3
    # rejects, and gives BitArray a concrete list rather than a lazy map.
    have = [sha == piece.digest
            for sha, piece in self.torrent.pieces.items()]
    field = BitArray(have)
    self.send_payload(5, field.tobytes())
def checklength(pdu, speclen):
    """Validate the byte length of a hex-encoded PDU and return its body bits.

    pdu is hex encoded, 4 bits per char.

    :param pdu: hex string with an even number of characters.
    :param speclen: expected length in bytes, or ``None`` to skip the check.
    :raises DataError: if ``speclen`` is given and the PDU has a different
        number of bytes.
    :returns: ``BitArray`` of the PDU without its first three bytes and
        without its last byte.
    """
    assert isinstance(pdu, str)
    assert speclen is None or isinstance(speclen, int)
    assert len(pdu) % 2 == 0

    # Floor division keeps the byte count an int on Python 3, so the error
    # message shows "5" rather than "5.0".
    byte_count = len(pdu) // 2
    if speclen is not None and byte_count != speclen:
        raise DataError("mtype=0x%s: Incorrect length %s (got %s) body: %s"
                        % (pdu[:3*2], speclen, byte_count, pdu[3*2:]))
    return BitArray(hex=pdu[3*2:-1*2])
def intdecoder(body, width=8, signed=False):
    """Decode ``body`` as a run of little-endian integers.

    :param body: ``BitArray`` to decode.
    :param width: bits per integer; only 8 and 16 are accepted.
    :param signed: read each slice via ``intle`` instead of ``uintle``.
    :returns: ``[("ints", <space-separated zero-padded values>),
        ("strbody", body.hex)]``
    """
    assert type(body) == BitArray
    assert width % 8 == 0
    assert width in [8, 16]  # for now, due to fmt.

    fmt = "%05i" if width == 16 else "%03i"
    reader = "intle" if signed else "uintle"
    rendered = [fmt % getattr(body[start:start + width], reader)
                for start in range(0, body.len, width)]
    return [("ints", " ".join(rendered)), ('strbody', body.hex)]
def pack(self):
    """Pack ``self.count`` values of ``self.size`` bits each into a bit array.

    Each value is scaled from the physical to the logical range and written
    into its own slot.  If scaling raises ``ArithmeticError`` and the
    ``NULL_STATE`` flag is set, that slot keeps its initial contents;
    otherwise the error propagates.
    """
    width = self.size
    packed = _BitArray(self.count * width)
    for index in range(self.count):
        begin = index * width
        try:
            packed[begin:begin + width] = int(
                self.physical_range.scale_to(self.logical_range,
                                             self._values[index]))
        except ArithmeticError:
            # If the value is outside of the physical range, and NULLs are
            # allowed, then do not modify the value
            if self.flags & ReportFlags.NULL_STATE:
                continue
            raise
    return packed
def serialize(self, report_type: ReportType) -> bytes:
    """Concatenate the serialized form of every item for ``report_type``.

    ``Report`` items are packed only when their ``report_type`` is the
    requested one.  Any other item is asked to serialize itself for that
    report type.

    NOTE(review): the annotation says ``bytes`` but the value returned is
    the ``_BitArray`` accumulator -- confirm what callers expect.
    """
    data = _BitArray()
    for item in self.items:
        if not isinstance(item, Report):
            data.append(item.serialize(report_type))
        elif item.report_type is report_type:
            data.append(item.pack())
    return data
def toHex(self):
    '''Returns a hex representation of the EPCNumber.

    ``self._bits`` is parsed as a base-2 string.  The result is upper-case,
    has no ``0x`` prefix, and does not keep leading zero digits.
    '''
    value = int(self._bits, 2)
    return hex(value).upper()[2:]
def toHex(self):
    '''Returns a hex representation of the GDTI-113.

    A single "0" bit is prepended to ``self._bits`` before conversion
    (original note: need to make the bits divisible by 4).  The result is
    upper-case without a ``0x`` prefix.
    '''
    h = BitArray("0b%s%s" % ("0", self._bits))
    digits = h.hex
    # Only bitstring releases before 3.0 prefix ``.hex`` with "0x"; slicing
    # two characters unconditionally drops real digits on newer releases.
    if digits.startswith("0x"):
        digits = digits[2:]
    return digits.upper()
def getBits(self):
    """Return the field value as a binary string ``self._bitLength`` long.

    When a value and a positive bit length are both set, the value is passed
    through ``Conversion().uint32`` and rendered as an unsigned integer of
    that width.  Otherwise a string of zeros padded to ``self._bitLength``
    is returned.
    """
    if (self._fieldValue is not None
            and self._bitLength is not None
            and self._bitLength > 0):
        ui = BitArray(uint=Conversion().uint32(self._fieldValue),
                      length=self._bitLength)
        bits = ui.bin
        # Only bitstring releases before 3.0 prefix ``.bin`` with "0b";
        # slicing two characters unconditionally drops the two most
        # significant bits on newer releases.
        if bits.startswith("0b"):
            bits = bits[2:]
        return bits
    return "0".zfill(self._bitLength)
def u5_to_bitarray(arr):
    """Pack each integer in ``arr`` as 5 unsigned bits, in order.

    :returns: a ``bitstring.BitArray`` holding the concatenated groups.
    """
    bits = bitstring.BitArray()
    for value in arr:
        bits.append(bitstring.pack("uint:5", value))
    return bits
def tagged_bytes(char, l):
    """Wrap ``l`` in a ``BitArray`` and pass it to ``tagged`` with ``char``."""
    payload = bitstring.BitArray(l)
    return tagged(char, payload)


# Discard trailing bits, convert to bytes.
def __init__(self, src_ase, dst_ase, block_list, options, resume_mgr):
    # type: (Descriptor, blobxfer.models.azure.StorageEntity,
    #        blobxfer.models.azure.StorageEntity, list,
    #        blobxfer.models.options.SyncCopy,
    #        blobxfer.operations.resume.SyncCopyResumeManager) -> None
    """Ctor for Descriptor
    :param Descriptor self: this
    :param blobxfer.models.azure.StorageEntity src_ase:
        source Azure Storage Entity
    :param blobxfer.models.azure.StorageEntity dst_ase:
        destination Azure Storage Entity
    :param list block_list: source blob block list
    :param blobxfer.models.options.SyncCopy options: synccopy options
    :param blobxfer.operations.resume.SyncCopyResumeManager resume_mgr:
        synccopy resume manager
    """
    # bookkeeping state
    self._offset = 0
    self._chunk_num = 0
    self._finalized = False
    self._meta_lock = threading.Lock()
    self._replica_counters = {}

    # collaborators and source description
    self._resume_mgr = resume_mgr
    self._src_ase = src_ase
    self._dst_ase = dst_ase
    self._src_block_list = block_list

    # chunk size comes first: the total chunk count is derived from it
    self._chunk_size = self._compute_chunk_size()
    self._total_chunks = self._compute_total_chunks(self._chunk_size)

    # every chunk is one operation, repeated for each replica target
    self._outstanding_ops = self._total_chunks
    replicas = self._dst_ase.replica_targets
    if blobxfer.util.is_not_empty(replicas):
        self._outstanding_ops = self._outstanding_ops * (len(replicas) + 1)

    # completion bitmap is only allocated when a resume manager is given
    if self._resume_mgr:
        self._completed_chunks = bitstring.BitArray(
            length=self._total_chunks)
def __init__(self, node_id):
    """Set up the table around this node's own id.

    ``node_id`` is converted to a ``BitArray``, which also seeds the root
    ``Bucket``.  The private node map starts empty.
    """
    own_id = BitArray(node_id)
    self.node_id = own_id
    self.root = Bucket(own_id)
    self.__nodes = {}
def get_neighbor(self, info_hash):
    """Return the first element of the root bucket's neighbor lookup.

    ``info_hash`` is converted to a ``BitArray`` before the lookup.
    """
    target = BitArray(info_hash)
    found = self.root.get_neighbor(target)
    return found[0]
def __init__(self, node_id, addr):
    """Record a node's id, address and freshness state.

    :param node_id: raw id bytes, stored as a ``BitArray`` in ``self.nid``.
    :param addr: the node's address, stored unchanged.
    """
    self.nid = BitArray(bytes=node_id)
    self.addr = addr
    # A new node starts out neither dubious nor bad, and counts as fresh
    # from the moment it is created.
    self.dubious, self.is_bad = False, False
    self.last_fresh = time.time()