我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 bitstring.pack()。
def encode_fallback(fallback, currency):
    """Encode a fallback address as a tagged 'f' field.

    Only the 'bc' and 'tb' currencies are handled.  The address is first
    tried as bech32; when ``bech32_decode`` yields no human-readable part it
    is decoded as base58check instead.

    Args:
        fallback: the address string to encode.
        currency: currency prefix; must be 'bc' or 'tb'.

    Returns:
        The result of ``tagged('f', ...)`` over a 5-bit version followed by
        the witness program / address payload.

    Raises:
        NotImplementedError: for any other currency.
        ValueError: on a bech32 prefix mismatch, a witness version above 16,
            or a base58 address that is neither P2PKH nor P2SH.
    """
    if currency not in ('bc', 'tb'):
        raise NotImplementedError("Support for currency {} not implemented".format(currency))

    hrp, witness = bech32_decode(fallback)
    if hrp:
        if hrp != currency:
            raise ValueError("Not a bech32 address for this currency")
        version = witness[0]
        if version > 16:
            raise ValueError("Invalid witness version {}".format(witness[0]))
        program = u5_to_bitarray(witness[1:])
    else:
        decoded = base58.b58decode_check(fallback)
        prefix = decoded[0]
        # Versions 17 and 18 mark P2PKH and P2SH addresses respectively.
        if is_p2pkh(currency, prefix):
            version = 17
        elif is_p2sh(currency, prefix):
            version = 18
        else:
            raise ValueError("Unknown address type for {}".format(currency))
        program = decoded[1:]

    return tagged('f', bitstring.pack("uint:5", version) + program)
def _updateBitString(self):
    """Rebuild ``self._bits`` from the current field values.

    Packs Header (8 bits), Filter (3), Partition (3), CompanyPrefix and
    IndividualAssetReference (widths taken from the field definitions) and
    stores the result as a string of binary digits without a "0b" prefix.
    The format string used is kept in ``self._packStringFormat``.
    """
    self._packStringFormat = 'uint:8, uint:3, uint:3, uint:%s, uint:%s' % (
        self.getField("CompanyPrefix").getBitLength(),
        self.getField("IndividualAssetReference").getBitLength())

    # Pack the bitstring
    packed = bitstring.pack(self._packStringFormat,
                            self.getFieldValue("Header"),
                            self.getFieldValue("Filter"),
                            self.getFieldValue("Partition"),
                            self.getFieldValue("CompanyPrefix"),
                            self.getFieldValue("IndividualAssetReference"))

    # Set the _bits.  NOTE(review): the original comment called this
    # SGTIN-96, but the IndividualAssetReference field suggests GIAI-96 --
    # confirm.
    bits = packed.unpack("bin")[0]
    # Older bitstring releases prefix the binary text with "0b", newer ones
    # do not.  Strip the prefix only when present so that no real header
    # bits are dropped.
    if bits.startswith("0b"):
        bits = bits[2:]
    self._bits = bits
def _updateBitString(self):
    """Rebuild ``self._bits`` for the GSRN-96 from the current field values.

    Packs Header (8 bits), Filter (3), Partition (3), CompanyPrefix and
    ServiceReference (widths taken from the field definitions) and Reserved
    (24 bits), then stores the result as a string of binary digits without a
    "0b" prefix.  The format string used is kept in
    ``self._packStringFormat``.
    """
    self._packStringFormat = 'uint:8, uint:3, uint:3, uint:%s, uint:%s, uint:24' % (
        self.getField("CompanyPrefix").getBitLength(),
        self.getField("ServiceReference").getBitLength())

    # Pack the bitstring
    packed = bitstring.pack(self._packStringFormat,
                            self.getFieldValue("Header"),
                            self.getFieldValue("Filter"),
                            self.getFieldValue("Partition"),
                            self.getFieldValue("CompanyPrefix"),
                            self.getFieldValue("ServiceReference"),
                            self.getFieldValue("Reserved"))

    # Set the _bits for the GSRN-96
    bits = packed.unpack("bin")[0]
    # Older bitstring releases prefix the binary text with "0b", newer ones
    # do not.  Strip the prefix only when present so that no real header
    # bits are dropped.
    if bits.startswith("0b"):
        bits = bits[2:]
    self._bits = bits
def encode(self,generalManager,indicatorDigit,objectClass,filter_value,serialNumber=0):
    """Encode a GID-96 and return ``self``.

    Reloads the field definitions, configures the GeneralManager,
    ObjectClass and SerialNumber fields, packs them after the Header and
    stores the binary digits (without a "0b" prefix) in ``self._bits``.

    Args:
        generalManager: value for the 28-bit GeneralManager field
            (converted with ``int``).
        indicatorDigit: accepted for interface compatibility; not used here.
        objectClass: value for the 24-bit ObjectClass field (converted with
            ``int``).
        filter_value: accepted for interface compatibility; not used here.
        serialNumber: value for the 36-bit SerialNumber field; defaults to 0.

    Returns:
        ``self``, so calls can be chained.
    """
    #Reload fields
    self._loadFields()

    #General Manager
    generalManagerField = self.getField("GeneralManager")
    generalManagerField.setBitLength(28)
    # Floor division keeps the digit length an int on Python 3 as well.
    generalManagerField.setDigitLength(28 // 4)
    generalManagerField.setOffset(8)
    self.setFieldValue("GeneralManager", int(generalManager))

    #ObjectClass
    objectClassField = self.getField("ObjectClass")
    objectClassField.setBitLength(24)
    objectClassField.setDigitLength(24 // 8)
    objectClassField.setOffset(36)
    self.setFieldValue("ObjectClass", int(objectClass))

    #SerialNumber (no offset is set here, unlike the two fields above)
    serialNumberField = self.getField("SerialNumber")
    serialNumberField.setBitLength(36)
    serialNumberField.setDigitLength(36 // 4)
    self.setFieldValue("SerialNumber", int(serialNumber))

    #Set the PackString Format
    self._packStringFormat = 'uint:8, uint:28, uint:24, uint:36'

    #Pack the bitstring
    bsp = bitstring.pack(self._packStringFormat,
                         int(self.getFieldValue("Header")),
                         int(self.getFieldValue("GeneralManager")),
                         int(self.getFieldValue("ObjectClass")),
                         int(self.getFieldValue("SerialNumber")))

    #Set the _bits for the GID-96
    bits = bsp.unpack("bin")[0]
    # Older bitstring releases prefix the binary text with "0b", newer ones
    # do not.  Strip the prefix only when present so that no real header
    # bits are dropped.
    if bits.startswith("0b"):
        bits = bits[2:]
    self._bits = bits

    return self
def _updateBitString(self):
    """Rebuild ``self._bits`` for the SGLN-96 from the current field values.

    Packs Header (8 bits), Filter (3), Partition (3), CompanyPrefix and
    LocationReference (widths taken from the field definitions) and
    Extension (41 bits), then stores the result as a string of binary digits
    without a "0b" prefix.  The format string used is kept in
    ``self._packStringFormat``.
    """
    #Set the PackString Format
    self._packStringFormat = 'uint:8, uint:3, uint:3, uint:%s, uint:%s, uint:41' % (
        self.getField("CompanyPrefix").getBitLength(),
        self.getField("LocationReference").getBitLength())

    #Pack the bitstring
    packed = bitstring.pack(self._packStringFormat,
                            self.getFieldValue("Header"),
                            self.getFieldValue("Filter"),
                            self.getFieldValue("Partition"),
                            self.getFieldValue("CompanyPrefix"),
                            self.getFieldValue("LocationReference"),
                            self.getFieldValue("Extension"))

    #Set the _bits for the SGLN-96
    bits = packed.unpack("bin")[0]
    # Older bitstring releases prefix the binary text with "0b", newer ones
    # do not.  Strip the prefix only when present so that no real header
    # bits are dropped.
    if bits.startswith("0b"):
        bits = bits[2:]
    self._bits = bits
def _updateBitString(self):
    """Rebuild ``self._bits`` for the SSCC-96 from the current field values.

    Packs Header (8 bits), Filter (3), Partition (3), CompanyPrefix and
    SerialReference (widths taken from the field definitions) and Reserved
    (24 bits), then stores the result as a string of binary digits without a
    "0b" prefix.  The format string used is kept in
    ``self._packStringFormat``.
    """
    self._packStringFormat = 'uint:8, uint:3, uint:3, uint:%s, uint:%s, uint:24' % (
        self.getField("CompanyPrefix").getBitLength(),
        self.getField("SerialReference").getBitLength())

    #Pack the bitstring
    packed = bitstring.pack(self._packStringFormat,
                            self.getFieldValue("Header"),
                            self.getFieldValue("Filter"),
                            self.getFieldValue("Partition"),
                            self.getFieldValue("CompanyPrefix"),
                            self.getFieldValue("SerialReference"),
                            self.getFieldValue("Reserved"))

    #Set the _bits for the SSCC-96
    bits = packed.unpack("bin")[0]
    # Older bitstring releases prefix the binary text with "0b", newer ones
    # do not.  Strip the prefix only when present so that no real header
    # bits are dropped.
    if bits.startswith("0b"):
        bits = bits[2:]
    self._bits = bits
def _updateBitString(self):
    """Rebuild ``self._bits`` for the GRAI-96 from the current field values.

    Packs Header (8 bits), Filter (3), Partition (3), CompanyPrefix and
    AssetType (widths taken from the field definitions) and Serial (38
    bits), then stores the result as a string of binary digits without a
    "0b" prefix.  The format string used is kept in
    ``self._packStringFormat``.
    """
    #Set the PackString Format
    self._packStringFormat = 'uint:8, uint:3, uint:3, uint:%s, uint:%s, uint:38' % (
        self.getField("CompanyPrefix").getBitLength(),
        self.getField("AssetType").getBitLength())

    #Pack the bitstring
    packed = bitstring.pack(self._packStringFormat,
                            self.getFieldValue("Header"),
                            self.getFieldValue("Filter"),
                            self.getFieldValue("Partition"),
                            self.getFieldValue("CompanyPrefix"),
                            self.getFieldValue("AssetType"),
                            self.getFieldValue("Serial"))

    #Set the _bits for the GRAI-96
    bits = packed.unpack("bin")[0]
    # Older bitstring releases prefix the binary text with "0b", newer ones
    # do not.  Strip the prefix only when present so that no real header
    # bits are dropped.
    if bits.startswith("0b"):
        bits = bits[2:]
    self._bits = bits
def updateBitString(self):
    """Rebuild ``self._bits`` for the SGTIN-96 from the current field values.

    The layout is Header (8 bits), Filter (3), Partition (3), CompanyPrefix
    and ItemReference (widths taken from the field definitions) and
    SerialNumber (38 bits, converted with ``int``).  The binary text returned
    by bitstring is stored as is; no prefix is stripped here.
    """
    company_bits = self.getField("CompanyPrefix").getBitLength()
    item_bits = self.getField("ItemReference").getBitLength()
    self._packStringFormat = 'uint:8, uint:3, uint:3, uint:%s, uint:%s, uint:38' % (company_bits, item_bits)

    # Collect the values in the order the format string expects them.
    values = [
        self.getFieldValue("Header"),
        self.getFieldValue("Filter"),
        self.getFieldValue("Partition"),
        self.getFieldValue("CompanyPrefix"),
        self.getFieldValue("ItemReference"),
        int(self.getFieldValue("SerialNumber")),
    ]
    packed = bitstring.pack(self._packStringFormat, *values)

    # Set the _bits for the SGTIN-96
    self._bits = packed.unpack("bin")[0]
def _updateBitString(self):
    """Rebuild ``self._bits`` for the GDTI-113 from the current field values.

    Packs Header (8 bits), Filter (3), Partition (3), CompanyPrefix and
    DocumentType (widths taken from the field definitions) and Serial (58
    bits), then stores the result as a string of binary digits without a
    "0b" prefix.  The format string used is kept in
    ``self._packStringFormat``.
    """
    #Set the PackString Format
    self._packStringFormat = 'uint:8, uint:3, uint:3, uint:%s, uint:%s, uint:58' % (
        self.getField("CompanyPrefix").getBitLength(),
        self.getField("DocumentType").getBitLength())

    #Pack the bitstring
    packed = bitstring.pack(self._packStringFormat,
                            self.getFieldValue("Header"),
                            self.getFieldValue("Filter"),
                            self.getFieldValue("Partition"),
                            self.getFieldValue("CompanyPrefix"),
                            self.getFieldValue("DocumentType"),
                            self.getFieldValue("Serial"))

    #Set the _bits for the GDTI-113
    bits = packed.unpack("bin")[0]
    # Older bitstring releases prefix the binary text with "0b", newer ones
    # do not.  Strip the prefix only when present so that no real header
    # bits are dropped.
    if bits.startswith("0b"):
        bits = bits[2:]
    self._bits = bits
def u5_to_bitarray(arr):
    """Concatenate the items of ``arr`` into one BitArray, 5 bits per item.

    Args:
        arr: iterable of integers, each packed as ``uint:5``.

    Returns:
        A ``bitstring.BitArray`` holding the packed values in order (empty
        when ``arr`` is empty).
    """
    bits = bitstring.BitArray()
    for value in arr:
        chunk = bitstring.pack("uint:5", value)
        bits += chunk
    return bits
def tagged(char, l):
    """Return ``l`` prefixed with its 5-bit tag and 10-bit length header.

    ``l`` is zero-padded IN PLACE to a multiple of 5 bits.  The header is
    three 5-bit values: the index of ``char`` in ``CHARSET``, then the high
    and low 5-bit halves of the payload length counted in 5-bit groups.

    Args:
        char: tag character, looked up in ``CHARSET``.
        l: mutable bitstring holding the payload; modified by the padding.

    Returns:
        The packed header followed by the padded payload.
    """
    # Tagged fields need to be zero-padded to 5 bits.
    while l.len % 5 != 0:
        l.append('0b0')

    # Integer arithmetic: "/" would produce floats on Python 3 and rely on
    # bitstring coercing them back to ints.
    length_high, length_low = divmod(l.len // 5, 32)
    return bitstring.pack("uint:5, uint:5, uint:5",
                          CHARSET.find(char),
                          length_high,
                          length_low) + l

# Tagged field containing bytes
def packing(gateway_addr, node_addr , trans_direct, func_code, wind_direct, wind_speed, model, on_off, work_mode, temp):
    """Pack ten binary-string fields, in argument order, into one bitstring.

    Every argument is handed to ``bitstring.pack`` under a ``bin`` token.

    Returns:
        The bitstring produced by ``bitstring.pack``.
    """
    fields = (
        gateway_addr,
        node_addr,
        trans_direct,
        func_code,
        wind_direct,
        wind_speed,
        model,
        on_off,
        work_mode,
        temp,
    )
    return bitstring.pack('bin, bin, bin, bin, bin, bin, bin, bin, bin, bin', *fields)
def return_air_con_mqtt_str_bytes_to_send(str_bin):
    """Turn ``str_bin`` into the 7-byte air-conditioner frame.

    ``return_crc`` supplies the data units and the CRC; the first six units
    and the CRC are packed as seven unsigned bytes.  The length and repr of
    the frame are printed before it is returned.

    Args:
        str_bin: the value handed to ``return_crc``.

    Returns:
        The 7-byte packed frame.
    """
    units, crc = return_crc(str_bin)
    frame_values = [units[index] for index in range(6)]
    frame_values.append(crc)
    frame = struct.pack('7B', *frame_values)
    print(len(frame))
    print(repr(frame))
    return frame
def mqtt_pub_air_con(data):
    """Encode air-conditioner settings from ``data`` and publish them.

    Each setting is rendered as a fixed-width binary string, the strings are
    combined by ``return_str_bin``, framed by
    ``return_air_con_mqtt_str_bytes_to_send`` and sent with ``transmitMQTT``.

    Args:
        data: mapping with the keys used below, e.g.
            {'node_select': 2, 'wind_speed': 2, 'temp_setting': 28,
             'wind_directtion': 1, 'switch': 1, 'working_model': 1}
    """
    # (pack format, key in ``data``), in the order return_str_bin expects:
    # node address, wind direction, wind speed, on/off, work mode, temperature.
    layout = (
        ('uint:13', 'node_select'),
        ('uint:2', 'wind_directtion'),
        ('uint:2', 'wind_speed'),
        ('uint:2', 'switch'),
        ('uint:3', 'working_model'),
        ('uint:5', 'temp_setting'),
    )
    bin_fields = [bitstring.pack(fmt, data[key]).bin for fmt, key in layout]

    str_bin = return_str_bin(*bin_fields)
    str_bytes = return_air_con_mqtt_str_bytes_to_send(str_bin)
    transmitMQTT(str_bytes)
def mqtt_pub_node_setting():
    """Build the fixed node-setting frame and publish it.

    The nine hard-coded fields add up to 56 bits.  They are joined into one
    BitStream, ``return_crc`` supplies the data units and CRC, and the first
    seven units plus the CRC are packed as eight bytes and handed to
    ``transmitMQTT``.  Intermediate values are printed along the way.
    """
    gateway_addr = '0b001'              # 1
    node_addr = '0b0000000010100'       # 20
    trans_direct = '0b1'                # 1
    func_code = '0b0010010'             # 18
    new_gateway_addr = '0b001'
    # Alternative: new_node_addr = '0b0000000001111'
    new_node_addr = node_addr
    reserve = '0b0000'
    sleep_time = '0b0100101100'         # 300 -> 5 minutes
    # Alternatives: '0b0000110111' (55 seconds), '0b0001111000' (120 seconds)
    send_power = '0b11'

    fields = (
        gateway_addr,
        node_addr,
        trans_direct,
        func_code,
        new_gateway_addr,
        new_node_addr,
        reserve,
        sleep_time,
        send_power,
    )
    str_replaced = ''.join(replace_0b(field) for field in fields)
    print('str_repalced:', str_replaced)

    str_bin = BitStream('0b' + str_replaced)
    print('str_bin:', str_bin)

    units, crc = return_crc(str_bin)
    frame_values = [units[index] for index in range(7)]
    frame_values.append(crc)
    str_bytes = struct.pack('8B', *frame_values)
    print(str_bytes)
    print(len(str_bytes))
    print(repr(str_bytes))
    transmitMQTT(str_bytes)
def replace_0b(input):
    """Return ``input`` with every occurrence of '0b' removed.

    Args:
        input: string such as '0b001'.

    Returns:
        The string without any '0b' markers, e.g. '001'.
    """
    without_prefix = input.replace('0b', '')
    return without_prefix

# str_bin = return_str_bin(gateway_addr, node_addr, trans_direct, func_code, wind_direct, wind_speed, model, on_off, work_mode, temp)
# str_bin = bitstring.pack(gateway_addr, node_addr, trans_direct, func_code, wind_direct, wind_speed, model, on_off, work_mode, temp)
def bitwise_reverse(int_src):
    """Return ``int_src`` with each of its 8 bits inverted.

    Despite the name, the bit order is kept; every '0' becomes '1' and every
    other digit becomes '0' in the 8-bit binary rendering of ``int_src``.

    Args:
        int_src: integer packed as ``uint:8``.

    Returns:
        The integer value of the inverted bit pattern.
    """
    bin_str = bitstring.pack('uint:8', int_src).bin
    flipped = "".join("1" if digit == "0" else "0" for digit in bin_str)
    return int(flipped, 2)
def padding(length):  # pragma: no cover
    """Return a factory that builds a padding ``BreadField`` of ``length``.

    The produced field encodes a value ``n`` as ``pack('pad:n', n=value)``
    and always decodes to ``None``.

    Args:
        length: passed to ``BreadField`` as its first argument.

    Returns:
        ``make_padding_field(parent, **field_options)``; only the
        ``str_format`` option is read, and ``parent`` is not used.
    """

    def make_padding_field(parent, **field_options):
        def encode_pad(value):
            return pack('pad:n', n=value)

        def decode_pad(encoded):
            # Padding carries no information.
            return None

        str_format = field_options.get('str_format', None)
        return BreadField(length, encode_pad, decode_pad, str_format=str_format)

    return make_padding_field
def get_payload(self):
    """Record the Service and Port fields and return them as packed bytes.

    Appends ("Service", ...) and ("Port", ...) to ``self.payload_fields``,
    then encodes ``self.service`` as an 8-bit and ``self.port`` as a 32-bit
    unsigned integer, each passed through ``little_endian``.

    Returns:
        The encoded service followed by the encoded port.
    """
    self.payload_fields.append(("Service", self.service))
    self.payload_fields.append(("Port", self.port))

    # Explicit "uint:" tokens: a bare width is not interpreted as an
    # unsigned integer by every bitstring release.
    service = little_endian(bitstring.pack("uint:8", self.service))
    port = little_endian(bitstring.pack("uint:32", self.port))
    return service + port
def get_payload(self):
    """Record the signal/TX/RX/reserved fields and return them packed.

    Appends the four labelled values to ``self.payload_fields``, then
    encodes ``signal``, ``tx`` and ``rx`` as 32-bit and ``reserved1`` as a
    16-bit unsigned integer, each passed through ``little_endian``.

    Returns:
        signal + tx + rx + reserved1 as bytes.
    """
    self.payload_fields.append(("Signal (mW)", self.signal))
    self.payload_fields.append(("TX (bytes since on)", self.tx))
    self.payload_fields.append(("RX (bytes since on)", self.rx))
    self.payload_fields.append(("Reserved", self.reserved1))

    # Explicit "uint:" tokens: a bare width is not interpreted as an
    # unsigned integer by every bitstring release.
    signal = little_endian(bitstring.pack("uint:32", self.signal))
    tx = little_endian(bitstring.pack("uint:32", self.tx))
    rx = little_endian(bitstring.pack("uint:32", self.rx))
    reserved1 = little_endian(bitstring.pack("uint:16", self.reserved1))
    return signal + tx + rx + reserved1
def get_payload(self):
    """Record the build/reserved/version fields and return them packed.

    Appends the three labelled values to ``self.payload_fields``, then
    encodes ``build`` and ``reserved1`` as 64-bit and ``version`` as a
    32-bit unsigned integer, each passed through ``little_endian``.

    Returns:
        build + reserved1 + version as bytes.
    """
    self.payload_fields.append(("Timestamp of Build", self.build))
    self.payload_fields.append(("Reserved", self.reserved1))
    self.payload_fields.append(("Version", self.version))

    # Explicit "uint:" tokens: a bare width is not interpreted as an
    # unsigned integer by every bitstring release.
    build = little_endian(bitstring.pack("uint:64", self.build))
    reserved1 = little_endian(bitstring.pack("uint:64", self.reserved1))
    version = little_endian(bitstring.pack("uint:32", self.version))
    return build + reserved1 + version
def get_payload(self):
    """Record the Power field and return it as packed bytes.

    Appends ("Power", ...) to ``self.payload_fields`` and encodes
    ``self.power_level`` as a 16-bit unsigned integer passed through
    ``little_endian``.

    Returns:
        The encoded power level.
    """
    self.payload_fields.append(("Power", self.power_level))

    # Explicit "uint:" token: a bare width is not interpreted as an
    # unsigned integer by every bitstring release.
    return little_endian(bitstring.pack("uint:16", self.power_level))
def get_payload(self):
    """Record the Label field and return it as a zero-padded byte string.

    Appends ("Label", ...) to ``self.payload_fields``.  Each character of
    ``self.label`` is encoded via ``ord`` as one 8-bit unsigned integer and
    the result is padded with zero bytes up to 32 characters.  A label
    longer than 32 characters is not truncated.

    Returns:
        The encoded label followed by the padding.
    """
    self.payload_fields.append(("Label", self.label))
    field_len_bytes = 32

    # Explicit "uint:" tokens: a bare width is not interpreted as an
    # unsigned integer by every bitstring release.
    label = b"".join(little_endian(bitstring.pack("uint:8", ord(c))) for c in self.label)

    # The padding byte is the same every time, so encode it once.
    zero_byte = little_endian(bitstring.pack("uint:8", 0))
    padding = zero_byte * (field_len_bytes - len(self.label))
    return label + padding
def get_payload(self):
    """Record the vendor/product/version fields and return them packed.

    Appends the three labelled values to ``self.payload_fields`` and
    encodes each of ``vendor``, ``product`` and ``version`` as a 32-bit
    unsigned integer passed through ``little_endian``.

    Returns:
        vendor + product + version as bytes.
    """
    self.payload_fields.append(("Vendor", self.vendor))
    # This entry carries self.product; it was previously mislabelled
    # "Reserved".
    self.payload_fields.append(("Product", self.product))
    self.payload_fields.append(("Version", self.version))

    # Explicit "uint:" tokens: a bare width is not interpreted as an
    # unsigned integer by every bitstring release.
    vendor = little_endian(bitstring.pack("uint:32", self.vendor))
    product = little_endian(bitstring.pack("uint:32", self.product))
    version = little_endian(bitstring.pack("uint:32", self.version))
    return vendor + product + version
def get_payload(self):
    """Record the time/uptime/downtime fields and return them packed.

    Appends the three labelled values to ``self.payload_fields`` and
    encodes each of ``time``, ``uptime`` and ``downtime`` as a 64-bit
    unsigned integer passed through ``little_endian``.

    Returns:
        time + uptime + downtime as bytes.
    """
    self.payload_fields.append(("Current Time", self.time))
    self.payload_fields.append(("Uptime (ns)", self.uptime))
    self.payload_fields.append(("Last Downtime Duration (ns) (5 second error)", self.downtime))

    # Explicit "uint:" tokens: a bare width is not interpreted as an
    # unsigned integer by every bitstring release.
    time = little_endian(bitstring.pack("uint:64", self.time))
    uptime = little_endian(bitstring.pack("uint:64", self.uptime))
    downtime = little_endian(bitstring.pack("uint:64", self.downtime))
    return time + uptime + downtime
def get_payload(self):
    """Record the group/label/updated-at fields and return them packed.

    Appends the three labelled values to ``self.payload_fields``.  Every
    item of ``self.group`` is encoded as an 8-bit unsigned integer, the
    label is encoded one character per byte (via ``ord``) and zero-padded up
    to 32 characters, and ``updated_at`` is a 64-bit unsigned integer.  A
    label longer than 32 characters is not truncated.

    Returns:
        group + padded label + updated_at as bytes.
    """
    self.payload_fields.append(("Group ", self.group))
    self.payload_fields.append(("Label ", self.label))
    self.payload_fields.append(("Updated At ", self.updated_at))

    # Explicit "uint:" tokens: a bare width is not interpreted as an
    # unsigned integer by every bitstring release.
    group = b"".join(little_endian(bitstring.pack("uint:8", b)) for b in self.group)
    label = b"".join(little_endian(bitstring.pack("uint:8", ord(c))) for c in self.label)

    # The padding byte is the same every time, so encode it once.
    zero_byte = little_endian(bitstring.pack("uint:8", 0))
    label += zero_byte * (32 - len(self.label))

    updated_at = little_endian(bitstring.pack("uint:64", self.updated_at))
    return group + label + updated_at
def get_payload(self):
    """Record the byte array and return it as exactly 64 encoded bytes.

    Appends ("Byte Array", ...) to ``self.payload_fields``.  Every item of
    ``self.byte_array`` is encoded as an 8-bit unsigned integer; the result
    is zero-padded when shorter than 64 bytes and truncated when longer.

    Returns:
        The 64-byte payload.
    """
    field_len = 64
    self.payload_fields.append(("Byte Array", self.byte_array))

    # Explicit "uint:" tokens: a bare width is not interpreted as an
    # unsigned integer by every bitstring release.
    byte_array = b"".join(little_endian(bitstring.pack("uint:8", b)) for b in self.byte_array)
    byte_array_len = len(byte_array)

    if byte_array_len < field_len:
        # The padding byte is the same every time, so encode it once.
        zero_byte = little_endian(bitstring.pack("uint:8", 0))
        byte_array += zero_byte * (field_len - byte_array_len)
    elif byte_array_len > field_len:
        byte_array = byte_array[:field_len]
    return byte_array
def get_payload(self):
    """Record the byte array and return it encoded one byte per item.

    Appends ("Byte Array", ...) to ``self.payload_fields`` and encodes every
    item of ``self.byte_array`` as an 8-bit unsigned integer passed through
    ``little_endian``.  No padding or truncation is applied.

    Returns:
        The encoded bytes (empty when ``self.byte_array`` is empty).
    """
    self.payload_fields.append(("Byte Array", self.byte_array))

    # Explicit "uint:" token: a bare width is not interpreted as an
    # unsigned integer by every bitstring release.
    return b"".join(little_endian(bitstring.pack("uint:8", b)) for b in self.byte_array)

##### LIGHT MESSAGES #####
def get_payload(self):
    """Return the reserved byte, colour fields and duration as packed bytes.

    ``self.reserved`` is an 8-bit, each item of ``self.color`` a 16-bit and
    ``self.duration`` a 32-bit unsigned integer, each passed through
    ``little_endian``.

    Returns:
        reserved + color + duration as bytes.
    """
    # Explicit "uint:" tokens: a bare width is not interpreted as an
    # unsigned integer by every bitstring release.
    reserved_8 = little_endian(bitstring.pack("uint:8", self.reserved))
    color = b"".join(little_endian(bitstring.pack("uint:16", field)) for field in self.color)
    duration = little_endian(bitstring.pack("uint:32", self.duration))
    return reserved_8 + color + duration
def get_payload(self):
    """Return the waveform parameters as packed bytes.

    Field order and encodings: reserved (uint:8), transient (uint:8), each
    item of color (uint:16), period (uint:32), cycles (float:32),
    duty_cycle (int:16), waveform (uint:8).  Every field is passed through
    ``little_endian``.

    Returns:
        The concatenated encoded fields.
    """
    # "uint:" is spelled out for reserved and color to match the other
    # fields here: a bare width is not interpreted as an unsigned integer
    # by every bitstring release.
    reserved_8 = little_endian(bitstring.pack("uint:8", self.reserved))
    transient = little_endian(bitstring.pack("uint:8", self.transient))
    color = b"".join(little_endian(bitstring.pack("uint:16", field)) for field in self.color)
    period = little_endian(bitstring.pack("uint:32", self.period))
    cycles = little_endian(bitstring.pack("float:32", self.cycles))
    duty_cycle = little_endian(bitstring.pack("int:16", self.duty_cycle))
    waveform = little_endian(bitstring.pack("uint:8", self.waveform))
    return reserved_8 + transient + color + period + cycles + duty_cycle + waveform
def get_payload(self):
    """Return the power level and duration as packed bytes.

    ``self.power_level`` is a 16-bit and ``self.duration`` a 32-bit unsigned
    integer, each passed through ``little_endian``.

    Returns:
        power_level + duration as bytes.
    """
    # Explicit "uint:" tokens: a bare width is not interpreted as an
    # unsigned integer by every bitstring release.
    power_level = little_endian(bitstring.pack("uint:16", self.power_level))
    duration = little_endian(bitstring.pack("uint:32", self.duration))
    return power_level + duration
def get_payload(self):
    """Record the Power Level field and return it as packed bytes.

    Appends ("Power Level", ...) to ``self.payload_fields`` and encodes
    ``self.power_level`` as a 16-bit unsigned integer passed through
    ``little_endian``.

    Returns:
        The encoded power level.
    """
    self.payload_fields.append(("Power Level", self.power_level))

    # Explicit "uint:" token: a bare width is not interpreted as an
    # unsigned integer by every bitstring release.
    return little_endian(bitstring.pack("uint:16", self.power_level))

##### INFRARED MESSAGES #####
def get_payload(self):
    """Record the Infrared Brightness field and return it as packed bytes.

    Appends ("Infrared Brightness", ...) to ``self.payload_fields`` and
    encodes ``self.infrared_brightness`` as a 16-bit unsigned integer passed
    through ``little_endian``.

    Returns:
        The encoded brightness.
    """
    self.payload_fields.append(("Infrared Brightness", self.infrared_brightness))

    # Explicit "uint:" token: a bare width is not interpreted as an
    # unsigned integer by every bitstring release.
    return little_endian(bitstring.pack("uint:16", self.infrared_brightness))
def get_payload(self):
    """Return the infrared brightness as packed bytes.

    ``self.infrared_brightness`` is encoded as a 16-bit unsigned integer
    passed through ``little_endian``.  Unlike some sibling messages, nothing
    is appended to ``self.payload_fields`` here.

    Returns:
        The encoded brightness.
    """
    # Explicit "uint:" token: a bare width is not interpreted as an
    # unsigned integer by every bitstring release.
    return little_endian(bitstring.pack("uint:16", self.infrared_brightness))

##### MULTIZONE MESSAGES #####
def get_payload(self):
    """Record the count/index/colour fields and return them packed.

    Appends the three labelled values to ``self.payload_fields``.  ``count``
    and ``index`` are 8-bit unsigned integers; ``self.color`` is iterated as
    a sequence of colours, each itself a sequence of 16-bit unsigned fields.
    Every value is passed through ``little_endian``.

    Returns:
        count + index followed by all encoded colour fields.
    """
    self.payload_fields.append(("Count", self.count))
    self.payload_fields.append(("Index", self.index))
    self.payload_fields.append(("Color (HSBK)", self.color))

    # Explicit "uint:" tokens: a bare width is not interpreted as an
    # unsigned integer by every bitstring release.
    count = little_endian(bitstring.pack("uint:8", self.count))
    index = little_endian(bitstring.pack("uint:8", self.index))
    payload = count + index
    for color in self.color:
        payload += b"".join(little_endian(bitstring.pack("uint:16", field)) for field in color)
    return payload
def get_payload(self):
    """Return the zone range, colour, duration and apply flag packed.

    ``start_index``, ``end_index`` and ``apply`` are 8-bit, each item of
    ``self.color`` is 16-bit and ``duration`` is a 32-bit unsigned integer,
    each passed through ``little_endian``.

    Returns:
        start_index + end_index + color + duration + apply as bytes.
    """
    # Explicit "uint:" tokens: a bare width is not interpreted as an
    # unsigned integer by every bitstring release.
    start_index = little_endian(bitstring.pack("uint:8", self.start_index))
    end_index = little_endian(bitstring.pack("uint:8", self.end_index))
    color = b"".join(little_endian(bitstring.pack("uint:16", field)) for field in self.color)
    duration = little_endian(bitstring.pack("uint:32", self.duration))
    apply = little_endian(bitstring.pack("uint:8", self.apply))
    return start_index + end_index + color + duration + apply
def get_payload(self):
    """Return the start and end index as packed bytes.

    Both ``self.start_index`` and ``self.end_index`` are encoded as 8-bit
    unsigned integers passed through ``little_endian``.

    Returns:
        start_index + end_index as bytes.
    """
    # Explicit "uint:" tokens: a bare width is not interpreted as an
    # unsigned integer by every bitstring release.
    start_index = little_endian(bitstring.pack("uint:8", self.start_index))
    end_index = little_endian(bitstring.pack("uint:8", self.end_index))
    return start_index + end_index
def get_payload(self):
    """Return the payload of a message that carries no fields.

    An empty bitstring is packed and passed through ``little_endian``.
    """
    empty_bits = bitstring.pack("")
    return little_endian(empty_bits)
def get_frame(self):
    """Return the frame section of the header as packed bytes.

    The three pack formats come from ``self.frame_format``: index 0 for the
    size, index 1 for the flags (origin, tagged, addressable, protocol) and
    index 2 for the source id.  Each part is passed through
    ``little_endian``.

    Returns:
        size + flags + source_id as bytes.
    """
    size_fmt = self.frame_format[0]
    flags_fmt = self.frame_format[1]
    source_fmt = self.frame_format[2]

    size_bits = bitstring.pack(size_fmt, self.size)
    flag_bits = bitstring.pack(flags_fmt, self.origin, self.tagged, self.addressable, self.protocol)
    source_bits = bitstring.pack(source_fmt, self.source_id)

    return little_endian(size_bits) + little_endian(flag_bits) + little_endian(source_bits)
def get_frame_addr(self):
    """Return the frame-address section of the header as packed bytes.

    The four pack formats come from ``self.frame_addr_format``: index 0 for
    the target MAC address (converted with ``convert_MAC_to_int``), index 1
    for the reserved block, index 2 for the response flags (reserved,
    ack_requested, response_requested) and index 3 for the sequence number.
    Each part is passed through ``little_endian``.

    Returns:
        mac_addr + reserved + response_flags + seq_num as bytes.
    """
    mac_fmt = self.frame_addr_format[0]
    reserved_fmt = self.frame_addr_format[1]
    flags_fmt = self.frame_addr_format[2]
    seq_fmt = self.frame_addr_format[3]

    mac_bits = bitstring.pack(mac_fmt, convert_MAC_to_int(self.target_addr))
    mac_addr = little_endian(mac_bits)

    reserved_bits = bitstring.pack(reserved_fmt, self.reserved)
    reserved_48 = little_endian(reserved_bits)

    flag_bits = bitstring.pack(flags_fmt, self.reserved, self.ack_requested, self.response_requested)
    response_flags = little_endian(flag_bits)

    seq_bits = bitstring.pack(seq_fmt, self.seq_num)
    seq_num = little_endian(seq_bits)

    return mac_addr + reserved_48 + response_flags + seq_num
def little_endian(bs):
    """Return the whole bytes of ``bs`` in little-endian order.

    ``bs.uintbe`` is read as one big-endian unsigned integer and emitted
    least-significant byte first.

    Args:
        bs: object supporting ``len()`` (length in bits) and a ``uintbe``
            attribute, e.g. a bitstring.

    Returns:
        ``len(bs) // 8`` bytes; ``b""`` when ``bs`` holds fewer than 8 bits.
    """
    byte_count = len(bs) // 8
    if byte_count == 0:
        # Nothing to emit; uintbe is deliberately not read for an empty input.
        return b""

    # Read the integer once instead of once per output byte.
    value = bs.uintbe
    return b"".join(
        struct.pack("B", (value >> shift) & 0xff)
        for shift in range(0, byte_count * 8, 8)
    )