我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用binascii.hexlify()。
def assert_fingerprint(cert, fingerprint):
    """
    Checks if given fingerprint matches the supplied certificate.

    :param cert:
        Certificate as bytes object.
    :param fingerprint:
        Fingerprint as string of hexdigits, can be interspersed by colons.
    :raises SSLError:
        If no hash function is registered for the fingerprint's length, or
        if the certificate digest differs from the expected fingerprint.
    """
    fingerprint = fingerprint.replace(':', '').lower()

    # The number of hex digits is the key used to pick the hash function.
    digest_length = len(fingerprint)
    hashfunc = HASHFUNC_MAP.get(digest_length)
    if not hashfunc:
        raise SSLError(
            'Fingerprint of invalid length: {0}'.format(fingerprint))

    # We need encode() here for py32; works on py2 and p33.
    fingerprint_bytes = unhexlify(fingerprint.encode())

    cert_digest = hashfunc(cert).digest()

    if not _const_compare_digest(cert_digest, fingerprint_bytes):
        # hexlify() returns bytes on Python 3; decode it so the message shows
        # plain hex digits instead of a b'...' repr.
        actual = hexlify(cert_digest).decode('ascii')
        raise SSLError('Fingerprints did not match. Expected "{0}", got "{1}".'
                       .format(fingerprint, actual))
def parse_slave_and_channel(file_path):
    """Extract the slave address and channel number from a dumped SDR file.

    :param file_path: file path of dumped SDR file.
    :return: ``(slave_address, channel)`` as two hex strings (for example
        ``('0x2c', '0x06')``), or None when the marker bytes are not
        present in the file.
    """
    # According to Intel Node Manager spec, section 4.5, for Intel NM
    # discovery OEM SDR records are type C0h. It contains manufacture ID
    # and OEM data in the record body.
    # 0-2 bytes are OEM ID, byte 3 is 0Dh and byte 4 is 01h. Byte 5, 6
    # is Intel NM device slave address and channel number/sensor owner LUN.
    marker = '5701000d01'

    with open(file_path, 'rb') as dump:
        hex_text = binascii.hexlify(dump.read())

    # hexlify() yields bytes on Python 3 and a native str on Python 2; the
    # search below needs a native string in both cases.
    if not isinstance(hex_text, str):
        hex_text = hex_text.decode()

    position = hex_text.find(marker)
    if position == -1:
        return None

    start = position + len(marker)
    ret = hex_text[start:start + 4]
    # Byte 5 is slave address. [7:4] from byte 6 is channel
    # number, so just pick ret[2] here.
    return ('0x' + ret[0:2], '0x0' + ret[2])
def hadleSslCommunication(self, deviceId, sslSocket):
    """Serve one SSL-connected device until reading from its socket fails.

    Each received payload is stored on the device's session (a session is
    created on first contact) and forwarded through ``self.passToHandler``.
    The loop has no normal exit: it ends when any call raises an exception,
    which is logged before the session is removed and the socket closed.

    :param deviceId: identifier of the device; also the key in
        ``self.sessions``.
    :param sslSocket: connected socket the payloads are read from.
    """
    try:
        while True:
            payload = iotcommon.recvMessage(sslSocket)
            clientAddr = sslSocket.getpeername()
            self.logger.info("Received SSL payload from {0} at {1}:{2}: {3}".format(binascii.hexlify(deviceId), clientAddr[0], clientAddr[1], payload))
            if deviceId in self.sessions:
                session = self.sessions[deviceId]
            else:
                self.logger.debug(" creating new session for SSL device: %s", binascii.hexlify(deviceId))
                session = IotSession(deviceId, IotSession.TYPE_SSL)
                self.sessions[deviceId] = session
            session.lastUpdateTime = datetime.datetime.now()
            session.lastPayload = payload
            if self.logger.getEffectiveLevel() == logging.DEBUG:
                self.dumpSessions()
            self.passToHandler(deviceId, payload)
    except Exception as e:
        self.logger.exception(e)
        # Best-effort cleanup. Every step runs on its own so that a failure
        # in one of them (e.g. shutdown() on an already disconnected peer)
        # can no longer prevent the socket from being closed.
        cleanup_steps = (
            lambda: self.removeSession(deviceId),
            lambda: sslSocket.shutdown(socket.SHUT_RDWR),
            sslSocket.close,
        )
        for step in cleanup_steps:
            try:
                step()
            except Exception:
                self.logger.debug("cleanup step failed for device %s", binascii.hexlify(deviceId), exc_info=True)
def handleUdpMessage(self, message, clientAddr):
    """Accept or discard a decoded UDP message coming from a device.

    The message is accepted only when its ``counter1`` is greater than the
    stored ``udpReceivedCounter`` for that device.  On acceptance the session
    (created if the device is unknown) and the counter are updated and the
    payload is handed to ``self.passToHandler``; otherwise a warning is
    logged and nothing is changed.

    :param message: decoded packet exposing ``deviceId``, ``counter1`` and
        ``payload``.
    :param clientAddr: ``(host, port)`` pair the packet came from.
    """
    log = self.logger
    device_id = message.deviceId
    log.debug(" handling decoded UDP message from device")

    is_new = device_id not in self.sessions
    if is_new:
        log.debug(" attemping to create new session for UDP device: %s", binascii.hexlify(device_id))
        session = IotSession(device_id, IotSession.TYPE_UDP)
    else:
        session = self.sessions[device_id]

    counter = self.getCounter(device_id)
    log.debug(" Validating counters: local={0}, incoming={1}".format(counter.udpReceivedCounter, message.counter1))

    if not (message.counter1 > counter.udpReceivedCounter):
        log.warning("Invalid counter in message from device {0}, local={1}, incoming={2} - discarding".format(binascii.hexlify(device_id), counter.udpReceivedCounter, message.counter1))
        return

    log.debug(" Counter OK. updating session for device %s", binascii.hexlify(device_id))
    session.lastUdpMessage = message
    session.lastPayload = message.payload
    session.clientAddr = clientAddr
    session.lastUpdateTime = datetime.datetime.now()
    counter.udpReceivedCounter = message.counter1
    # A brand-new session is registered only once its first message passed
    # the counter check.
    if is_new:
        self.sessions[device_id] = session
    log.info("Received valid UDP message from {0}:{1}, deviceId={2}, payload={3}. Calling server handler.".format(clientAddr[0], clientAddr[1], binascii.hexlify(device_id), message.payload))
    self.passToHandler(device_id, message.payload)
def sendMessage(self, deviceId, payload):
    """Send ``payload`` to a connected device over its session's protocol.

    UDP sessions get an encrypted ``UdpPacket`` sent through
    ``self.udpServer``; SSL sessions get the payload written with
    ``iotcommon.sendMessage``.

    :param deviceId: identifier of the target device (key in
        ``self.sessions``).
    :param payload: data to deliver.
    :return: False when the device has no session.  None in every other
        case (callers cannot distinguish success from an unknown protocol).
    """
    self.logger.debug("Attempting do send message to device %s, payload %s", binascii.hexlify(deviceId), payload)
    if deviceId not in self.sessions:
        # The original logged ``message.deviceId`` here, but no ``message``
        # exists on this path, so the call died with a NameError instead of
        # returning False.
        self.logger.warning("could not send message to device - device %s is not connected", binascii.hexlify(deviceId))
        return False

    session = self.sessions[deviceId]
    if session.protocol == IotSession.TYPE_UDP:
        counter = self.getCounter(deviceId)
        message = UdpPacket(deviceId, UdpPacket.SERVER_TO_CLIENT, self.IOT_PROTOCOL_VERSION, counter.udpSentCounter, session.lastUdpMessage.counter1, payload)
        deviceKey = iotcommon.deriveKey(self.masterKey, deviceId)
        data = message.createPacket(deviceKey)
        self.logger.info("Sending {0} bytes in UDP to device {1} at {2}:{3}".format(len(data), binascii.hexlify(message.deviceId), session.clientAddr[0], session.clientAddr[1]))
        with session.lock:
            self.udpServer.socket.sendto(data, session.clientAddr)
        # The sent counter is bumped and persisted after every UDP send.
        counter.udpSentCounter += 1
        self.saveState()
    elif session.protocol == IotSession.TYPE_SSL:
        self.logger.info("Sending {0} bytes by SSL to device {1} at {2}:{3}".format(len(payload), binascii.hexlify(deviceId), session.clientAddr[0], session.clientAddr[1]))
        with session.lock:
            iotcommon.sendMessage(session.sslSocket, payload)
def handle(self):
    """Dispatch one incoming UDP datagram.

    The datagram is ``self.request[0]``.  Packets starting with
    ``IOT\\xff`` are heartbeats and are ignored.  Packets starting with
    ``IOT\\0`` that are at least 88 bytes long and whose length minus 72 is
    a multiple of 16 are passed to ``self.handleIotPacket``.  Anything else
    is logged as unknown.  No exception leaves this method.
    """
    try:
        data = self.request[0]
        clientAddr = self.client_address
        self.logger.debug("UDP packet from {0}:{1}, length {2}".format(clientAddr[0], clientAddr[1], len(data)))
        self.logger.debug("message hex : %s", binascii.hexlify(data))
        # Bytes literals: socket data is bytes on Python 3, where a
        # comparison against a text literal is always False.  On Python 2
        # b"..." is the same type as "...", so behaviour there is unchanged.
        header = data[0:4]
        if header == b"IOT\xff":
            self.logger.debug("heartbeat packet - ignoring")
        elif header == b"IOT\0" and len(data) >= 88 and ((len(data) - 72) % 16) == 0:
            self.handleIotPacket(data, clientAddr)
        else:
            self.logger.warning("unknown packet - ignoring")
    except Exception as e:
        self.logger.exception(e)
    except BaseException:
        # Deliberate catch-all kept from the original code: even
        # non-Exception errors are only logged, never propagated.
        self.logger.error("error on handling incomming packet: {0} ".format(sys.exc_info()[0]))
def mapColor(self, t):
    """Map the value ``t`` onto an ``"#rrggbb"`` colour string.

    The scale runs through fixed bands (the band comments below presumably
    refer to temperatures); inside a band each channel is interpolated with
    ``self.linearMap``.

    :param t: value to colour-code.
    :return: native ``str`` such as ``"#8080ff"``.
    :raises ValueError: if a channel falls outside 0..255 (raised by the
        ``bytearray`` range check).
    """
    if t <= -30:
        r, g, b = 128, 128, 255  # below -30: dark blue
    elif t <= -10:
        r, g, b = self.linearMap(t, -30, -10, 128, 192), self.linearMap(t, -30, -10, 128, 192), 255  # -30 to -10: to light blue
    elif t <= 5:
        r, g, b = self.linearMap(t, -10, 5, 192, 0), self.linearMap(t, -10, 5, 192, 255), self.linearMap(t, -10, 5, 255, 192)  # -10 to 5: to aquamarin
    elif t <= 15:
        r, g, b = self.linearMap(t, 5, 15, 0, 128), 255, self.linearMap(t, 5, 15, 192, 128)  # 5 to 15: to green
    elif t <= 25:
        r, g, b = self.linearMap(t, 15, 25, 128, 255), 255, 128  # 15 to 25: to yellow
    elif t <= 35:
        r, g, b = 255, self.linearMap(t, 25, 35, 255, 128), 128  # 25 to 35: to red
    else:
        r, g, b = 255, 128, 128  # above 35: red

    # "#" + binascii.hexlify(...) is str + bytes on Python 3 (TypeError).
    # Formatting each byte gives the same digits as a native str on both
    # major versions; bytearray() still validates the 0..255 range.
    return "#" + "".join("%02x" % channel for channel in bytearray([r, g, b]))
def loadFromSession(self, session, deviceConfig, payloadDict):
    """Fill this object from a live session and return it.

    :param session: session providing ``deviceId``, ``protocol``,
        ``lastUpdateTime`` and ``clientAddr``.
    :param deviceConfig: mapping from hex device id to that device's
        configuration (``"name"`` and per-value ``"label"``/``"unit"``).
    :param payloadDict: last decoded payload, or a falsy value; only its
        ``"values"`` entry is read.
    :return: ``self``.
    """
    self.deviceId = binascii.hexlify(session.deviceId)
    self.protocol = session.protocol
    self.lastContact = session.lastUpdateTime.strftime('%Y-%m-%d %H:%M:%S')
    self.address = "{0[0]}:{0[1]}".format(session.clientAddr)
    self.values = []
    self.commands = []
    self.images = []
    self.isOnline = True

    conf = deviceConfig.get(self.deviceId, None)
    if conf:
        # The name is only assigned for configured devices.
        self.name = conf.get("name", self.deviceId)

    if not payloadDict or "values" not in payloadDict:
        return self

    for attr, value in payloadDict["values"].items():
        if conf:
            varConf = conf.get("values", {}).get(attr, {})
            sensor = SensorValue(attr, varConf.get("label", attr), value, varConf.get("unit"))
        else:
            # Unconfigured device: the attribute name is its own label.
            sensor = SensorValue(attr, attr, value, "")
        self.values.append(sensor)
    return self
def seed(self, a=None):
    """Initialize internal state.

    With ``a`` omitted or None the seed is built from 16 bytes of
    ``_urandom``; if that source raises NotImplementedError the current
    time (in 1/256 second units) is used instead.  Any other value is
    passed unchanged to the parent class' ``seed``.

    Also clears the cached ``gauss_next`` value.
    """
    if a is None:
        try:
            # int() rather than long(): long does not exist on Python 3,
            # and on Python 2 int() promotes to long automatically.
            a = int(_hexlify(_urandom(16)), 16)
        except NotImplementedError:
            import time
            a = int(time.time() * 256)  # use fractional seconds

    super(Random, self).seed(a)
    self.gauss_next = None
def _wait_for_recognized_message(self):
    """Move a complete message from ``self._buffer`` to the receive queue.

    The second buffer byte is taken as the message code.  For every entry
    of ``PP`` equal to that code (as an int or as a one-byte ``bytes``) the
    expected size is looked up; when the buffer length equals that size the
    message is appended to ``self._recv_queue`` and removed from the
    buffer, otherwise a debug note says more bytes are needed.
    """
    code = self._buffer[1]
    for ppcode in PP:
        recognized = ppcode == code or ppcode == bytes([code])
        if not recognized:
            continue

        ppc = PP.lookup(code, fullmessage=self._buffer)
        self.log.debug('Found a code %02x message which is %d bytes', code, ppc.size)

        if len(self._buffer) != ppc.size:
            self.log.debug('Need more bytes to process message.')
            continue

        new_message = self._buffer[0:ppc.size]
        self.log.debug('new message is: %s', binascii.hexlify(new_message))
        self._recv_queue.append(new_message)
        self._buffer = self._buffer[ppc.size:]
def SendAndAccount(self, binary_data):
    """Send ``binary_data`` to this peer and account for the bytes sent.

    UDP peers send through ``self._swarm.SendData``; other peers send
    through ``self._proto`` and additionally bump the swarm-wide
    ``_all_data_tx``.  When the TCP protocol object is already gone nothing
    is sent and no counter changes.

    :param binary_data: the raw bytes to transmit.
    """
    # Keep this check! It avoids hex-dumping every packet unless debug
    # output is actually enabled.
    if self._logger.isEnabledFor(logging.DEBUG):
        # Log through the same logger that was just tested; the original
        # used the root ``logging`` module, whose level/handlers may differ.
        self._logger.debug("!! Sending BIN data: {0}".format(binascii.hexlify(binary_data)))

    datalen = len(binary_data)
    if self._is_udp:
        self._swarm.SendData(self.ip_address, self.udp_port, binary_data)
    elif self._proto is not None:
        self._proto.send_data(binary_data)
        self._swarm._all_data_tx += datalen
    else:
        # Prevent crashes when TCP connection is already removed, but some
        # sending is still pending. No need to increase sent data counter...
        return

    self._total_data_tx += datalen
def test_send_receive(self):
    """Each sender delivers one random message to its paired receiver.

    The swarm is shuffled and split in two halves; for every
    (sender, receiver) pair the receiver's queue must be empty before the
    send, contain exactly the sent message afterwards, and be empty again
    after it has been read.
    """
    random.shuffle(self.swarm)
    # Floor division: "/" yields a float on Python 3, which is not a valid
    # slice index.
    half = len(self.swarm) // 2
    senders = self.swarm[:half]
    receivers = self.swarm[half:]
    for sender, receiver in zip(senders, receivers):
        message = binascii.hexlify(os.urandom(64))

        # check queue previously empty
        self.assertFalse(bool(receiver.message_list()))

        # send message
        self.assertTrue(sender.message_send(receiver.dht_id(), message))

        # check received
        received = receiver.message_list()
        self.assertTrue(sender.dht_id() in received)
        messages = received[sender.dht_id()]
        self.assertTrue(len(messages) == 1)
        self.assertEqual(messages[0], message)

        # check queue empty after call to message_list
        self.assertFalse(bool(receiver.message_list()))
def test_flood(self):
    """One published event must reach every subscribed node of the swarm."""
    topic_suffix = binascii.hexlify(os.urandom(32))
    topic = "test_flood_{0}".format(topic_suffix)

    # every node subscribes and should receive the event
    for subscriber in self.swarm:
        subscriber.pubsub_subscribe(topic)

    # wait until subscriptions propagate
    time.sleep(SLEEP_TIME)

    # send event from a randomly chosen node
    publisher = random.choice(self.swarm)
    event = binascii.hexlify(os.urandom(32))
    publisher.pubsub_publish(topic, event)

    # wait until event propagates
    time.sleep(SLEEP_TIME)

    # check all peers received the event
    for subscriber in self.swarm:
        delivered = subscriber.pubsub_events(topic)
        self.assertEqual(delivered, [event])
def test_multihop(self):
    """An event published by one node reaches a subscriber elsewhere.

    The swarm is shuffled and split in two halves; each receiver subscribes
    to a fresh random topic, its paired sender publishes one event, and the
    receiver must see exactly that event.
    """
    random.shuffle(self.swarm)
    # Floor division: "/" yields a float on Python 3, which is not a valid
    # slice index.
    half = len(self.swarm) // 2
    senders = self.swarm[:half]
    receivers = self.swarm[half:]
    for sender, receiver in zip(senders, receivers):
        # receiver subscribes to topic
        topic = "test_miltihop_{0}".format(binascii.hexlify(os.urandom(32)))
        receiver.pubsub_subscribe(topic)

        # wait until subscriptions propagate
        time.sleep(SLEEP_TIME)

        # send event
        event = binascii.hexlify(os.urandom(32))
        sender.pubsub_publish(topic, event)

        # wait until event propagates
        time.sleep(SLEEP_TIME)

        # check the receiver got the event
        events = receiver.pubsub_events(topic)
        self.assertEqual(events, [event])
def recv(self, x=512, timeout_secs=10.0):
    """Read one HCI event from the USB device and parse it.

    :param x: maximum number of bytes to read from the event endpoint.
    :param timeout_secs: read timeout in seconds.
    :return: the parsed ``HCI_Hdr`` packet, or None when the read timed out.
    :raises usb.core.USBError: for any USB error other than a timeout.
    """
    # FIXME: Don't know how many bytes to expect here, so callers default
    # to 512 bytes -- will this fly if there's another event right after
    # it? Or is each event guaranteed to be put in a USB packet of its own?
    try:
        # Honour the caller's size; the original ignored ``x`` and always
        # asked for 512 bytes (still the default).
        data_array = self.pyusb_dev.read(
            USB_ENDPOINT_HCI_EVT, x, int(timeout_secs * 1000.0))
    except usb.core.USBError as e:
        if e.errno == errno.ETIMEDOUT:
            return None
        # Bare raise keeps the original traceback.
        raise

    data = ''.join([chr(c) for c in data_array])  # Ugh.. array return val
    data = "\4" + data  # Prepend H4 'Event' packet indicator
    scapy_packet = HCI_Hdr(data)
    LOG.debug("recv %s" % scapy_packet.lastlayer().summary())
    LOG.debug("recv bytes: " + binascii.hexlify(data))
    return scapy_packet
def encodeControllerString(cont_str, hex_encode = True):
    '''
    Encode a string for transmission over the control port.
    If hex_encode is True, encode in hexadecimal, otherwise, encode
    in the Tor Control Protocol QuotedString format.
    Does not support the CString encoding.
    Only some strings in the tor control protocol need encoding.
    The same encodings are used by tor and the controller.

    Returns the encoded string.  Asserts that decoding the result yields
    the original input.
    '''
    if hex_encode:
        # hex encoded strings do not have an 0x prefix
        encoded = hexlify(cont_str)
    else: # QuotedString
        # quoted strings escape \ and " with \, then quote with "
        # the order of these replacements is important: they ensure that
        # " becomes \" rather than \\"
        # The escaped text gets its own name: the original overwrote
        # cont_str, so the sanity check below compared the decoded value
        # against the *escaped* string instead of the caller's input.
        escaped = cont_str.replace("\\", "\\\\")
        escaped = escaped.replace("\"", "\\\"")
        encoded = "\"" + escaped + "\""
    # sanity check
    # NOTE(review): assumes decodeControllerString reverses the escaping
    # above -- confirm against its implementation.
    assert TorControlProtocol.decodeControllerString(encoded) == cont_str
    return encoded
def hexlify(val):
    """
    Render binary data as a friendly hexadecimal text string.

    .. seealso::
        :meth:`LinkageEntity:friendly_hash`

    Note:
        - ``binascii.hexlify`` itself returns the hexadecimal
          representation as bytes; this wrapper decodes it to text.
        - The returned string is twice as long as the input data.

    :param val: binary
    :rtype: string
    """
    hex_bytes = binascii.hexlify(val)
    return hex_bytes.decode()
def main():
    """Receive packets with RfCat on the frequency given as argv[1].

    Every received frame is printed as hex until the user interrupts with
    Ctrl-C.  The dongle is put back into idle mode before exiting.
    """
    # Validate arguments before touching the device.  The original did this
    # inside the try block, so the usage exit reached the finally clause
    # with ``d`` unbound and died with a NameError.
    if len(sys.argv) != 2:
        print("Usage : %s 908000000" % sys.argv[0])
        sys.exit(0)

    d = None
    try:
        d = RfCat()
        # int() instead of long(): works on Python 2 and 3.
        ConfigureD(d, int(sys.argv[1]))
        d.setModeRX()
        while True:
            pkt = d.RFrecv(timeout=120000)
            frame = hexlify(pkt[0])
            print("[rfcat recv] : '%s'\n" % (frame,))
    except KeyboardInterrupt as e:
        print("W: interrupt received, proceeding")
        print(e)
    finally:
        # Only reset a device that was actually opened.
        if d is not None:
            d.setModeIDLE()
        # NOTE: as in the original, exiting here also masks any other
        # exception raised above.
        sys.exit(0)
def _space_separated_little_endian(integer_value, byte_len): """INTERNAL. Get an integer in format for WAV file header.""" if byte_len <= 1: pack_type = '<B' elif byte_len <= 2: pack_type = '<H' elif byte_len <= 4: pack_type = '<I' elif byte_len <= 8: pack_type = '<Q' else: PTLogger.info("Value cannot be represented in 8 bytes - exiting") exit() hex_string = pack(pack_type, integer_value) temp = hexlify(hex_string).decode() return ' '.join([temp[i:i + 2] for i in range(0, len(temp), 2)])
def encode(self, password, salt):
    """Return ``"<algorithm>$<bcrypt hash>"`` for the given password.

    When ``self.digest`` is set the password is first reduced to the hex
    digest of that hash, so bcrypt never sees (and truncates) an overlong
    password.  See: https://code.djangoproject.com/ticket/20138
    """
    bcrypt = self._load_library()
    # Need to reevaluate the force_bytes call once bcrypt is supported on
    # Python 3
    raw = force_bytes(password)
    if self.digest is None:
        password = raw
    else:
        # binascii.hexlify is used because it returns a bytestring on
        # Python 3 as well, unlike hexdigest().
        password = binascii.hexlify(self.digest(raw).digest())

    hashed = bcrypt.hashpw(password, salt)
    return "%s$%s" % (self.algorithm, force_text(hashed))
def from_address(text):
    """Convert an IPv4 or IPv6 address in textual form into a Name object whose
    value is the reverse-map domain name of the address.

    The text is first parsed as IPv6 (IPv4-mapped addresses are reversed
    under the IPv4 domain); if that fails it is parsed as IPv4.

    @param text: an IPv4 or IPv6 address in textual form (e.g. '127.0.0.1',
    '::1')
    @type text: str
    @rtype: dns.name.Name object
    """
    try:
        v6 = dns.ipv6.inet_aton(text)
        if dns.ipv6.is_mapped(v6):
            # bytearray() iterates as ints on Python 2 and 3 alike, so no
            # version switch is needed for the trailing four bytes.
            parts = ['%d' % byte for byte in bytearray(v6[12:])]
            origin = ipv4_reverse_domain
        else:
            # one label per hex nibble
            parts = list(binascii.hexlify(v6).decode())
            origin = ipv6_reverse_domain
    except Exception:
        # Not IPv6: fall back to IPv4.  "except Exception" rather than a
        # bare except so KeyboardInterrupt/SystemExit are not swallowed.
        parts = ['%d' % byte for byte in bytearray(dns.ipv4.inet_aton(text))]
        origin = ipv4_reverse_domain
    parts.reverse()
    return dns.name.from_text('.'.join(parts), origin=origin)
def seed(self, a=None):
    """Initialize internal state.

    With ``a`` omitted or None the seed is built from 2500 bytes of
    ``_urandom``; if that source raises NotImplementedError the current
    time (in 1/256 second units) is used instead.  Any other value is
    passed unchanged to the parent class' ``seed``.

    Also clears the cached ``gauss_next`` value.
    """
    if a is None:
        try:
            # Seed with enough bytes to span the 19937 bit
            # state space for the Mersenne Twister.
            # int() rather than long(): long does not exist on Python 3,
            # and on Python 2 int() promotes to long automatically.
            a = int(_hexlify(_urandom(2500)), 16)
        except NotImplementedError:
            import time
            a = int(time.time() * 256)  # use fractional seconds

    super(Random, self).seed(a)
    self.gauss_next = None
def kciCloudHelper(iCloudKey):
    """Decrypt the iCloud account token plist and return its tokens as text.

    NOTE(review): this block reads account credentials and builds a shell
    command by string interpolation (``shell=True``). It is documented
    as-is, not improved; it needs a security review before any reuse.

    :param iCloudKey: base64 encoded iCloud key.
    :return: a text block with the primary e-mail, full name, dsPrsID and
        mmeAuthToken read from the decrypted plist.
    """
    # this function is tailored to keychaindump. Takes an iCloud key, and returns tokens
    msg = base64.b64decode(iCloudKey)
    key = "t9s\"lx^awe.580Gj%'ld+0LG<#9xa?>vb)-fkwb92[}"
    # The AES key passed to openssl is the hex HMAC-MD5 of the decoded key.
    hashed = hmac.new(key, msg, digestmod=hashlib.md5).digest()
    hexedKey = binascii.hexlify(hashed)
    IV = 16 * '0'
    mme_token_file = glob("/Users/%s/Library/Application Support/iCloud/Accounts/*" % get_bella_user())  # this doesn't need to be globbed because only the current user's info can be decrypted
    # Keep the entry whose file name is purely numeric (the last such match
    # wins); if none matches, the name stays bound to the glob list.
    for x in mme_token_file:
        try:
            int(x.split("/")[-1])
            mme_token_file = x
        except ValueError:
            continue
    send_msg("\t%sDecrypting token plist\n\t [%s]\n" % (blue_star, mme_token_file), False)
    decryptedBinary = subprocess.check_output("openssl enc -d -aes-128-cbc -iv '%s' -K %s < '%s'" % (IV, hexedKey, mme_token_file), shell=True)
    # Imported locally, so the PyObjC Foundation module is only needed when
    # this function actually runs.
    from Foundation import NSData, NSPropertyListSerialization
    binToPlist = NSData.dataWithBytes_length_(decryptedBinary, len(decryptedBinary))
    token_plist = NSPropertyListSerialization.propertyListWithData_options_format_error_(binToPlist, 0, None, None)[0]
    tokz = "[%s | %s]\n" % (token_plist["appleAccountInfo"]["primaryEmail"], token_plist["appleAccountInfo"]["fullName"])
    tokz += "%s:%s\n" % (token_plist["appleAccountInfo"]["dsPrsID"], token_plist["tokens"]["mmeAuthToken"])
    return tokz
def load_trace(path, *args, **kwargs):
    """Read a packet trace file, return a :class:`wltrace.common.WlTrace` object.

    The file's magic (its first ``MAGIC_LEN`` bytes) selects the handler
    from ``FILE_TYPE_HANDLER``; that handler is then called with the path
    and any extra arguments to process the file.

    Args:
        path (str): the file's path to be loaded.

    Returns:
        ``WlTrace`` object.

    Raises:
        Exception: if no handler is registered for the file's magic.
    """
    with open(path, 'rb') as trace_file:
        magic = trace_file.read(MAGIC_LEN)

    if magic in FILE_TYPE_HANDLER:
        handler = FILE_TYPE_HANDLER[magic]
        return handler(path, *args, **kwargs)

    raise Exception('Unknown file magic: %s' % (binascii.hexlify(magic)))
def raw_lmhash(secret, encoding="ascii", hex=False):
    """Encode a password with the DES-based LMHASH algorithm.

    :param secret: password as bytes or unicode.
    :param encoding: codec used to encode a unicode ``secret``.
    :param hex: return unicode hex digits instead of the raw bytes.
    :return: string of raw bytes, or unicode hex when ``hex`` is true.
    """
    # NOTE: various references say LMHASH uses the OEM codepage of the host
    #       for its encoding. until a clear reference is found,
    #       as well as a path for getting the encoding,
    #       letting this default to "ascii" to prevent incorrect hashes
    #       from being made w/o user explicitly choosing an encoding.
    if isinstance(secret, unicode):
        secret = secret.encode(encoding)

    # upper-case, truncate to 14 bytes and NUL-pad to exactly 14 bytes
    ns = secret.upper()[:14].ljust(14, b"\x00")
    first_half = des_encrypt_block(ns[:7], LM_MAGIC)
    second_half = des_encrypt_block(ns[7:], LM_MAGIC)
    out = first_half + second_half

    if hex:
        return hexlify(out).decode("ascii")
    return out

#=============================================================================
# eoc
#=============================================================================
def disassemble(self):
    """Disassemble ``self.byte_file`` and print one line per instruction.

    ``self.byte_file`` is first tried as a path to a binary file; if it
    cannot be opened it is used directly as the code to disassemble.
    Disassembler errors are reported on stdout rather than raised.
    """
    try:
        with open(self.byte_file, "rb") as fd:
            binCode = fd.read()
    except Exception:
        # Not a readable path: treat the attribute itself as the code.
        binCode = self.byte_file

    try:
        print("Disassembling shellcode in {:s}-{:s} architecture".format(self.arch, self.mode))
        engine = Cs(ARCH[self.arch], MODE[self.mode])
        # Collect everything first so that nothing is printed when the
        # disassembler fails part-way through.
        completed_disassembly = [
            ("0x%x: %s\t%s %s" % (
                insn.address,
                binascii.hexlify(insn.bytes).decode('utf-8'),
                insn.mnemonic,
                insn.op_str)).expandtabs(25)
            for insn in engine.disasm(binCode, 0x1000)
        ]
        for line in completed_disassembly:
            print(line)
    except CsError as e:
        # "{!s}" converts explicitly; "{:s}" applied to an exception object
        # raises TypeError on Python 3 and would hide the real error.
        print("Something went wrong: {!s}".format(e))
def parseStringInCode(self, code, as_string=False):
    """
    Replace pointer-sized quoted strings in assembly lines by hex numbers.

    Each line is searched for its first double-quoted string.  When that
    string is exactly ``arch.ptrsize`` bytes long, every occurrence of it
    (quotes included) is replaced by ``0x`` followed by the hex encoding of
    its bytes.  Lines without a string, with an unterminated string or with
    a string of another length are kept unchanged.

    :param code: iterable of assembly lines (bytes).
    :param as_string: join the result with newlines instead of returning
        a list.
    """
    arch = self.emulator.parent.arch
    parsed = []
    for line in code:
        opening = line.find(b'"')
        closing = line.find(b'"', opening + 1) if opening != -1 else -1

        if opening == -1 or closing == -1:
            # no string at all, or an unfinished one
            parsed.append(line)
            continue

        if closing - opening - 1 != arch.ptrsize:
            # incorrect size
            parsed.append(line)
            continue

        literal = line[opening + 1:closing]
        hex_digits = binascii.hexlify(literal)
        parsed.append(line.replace(b'"%s"' % literal, b'0x%s' % hex_digits))

    return b'\n'.join(parsed) if as_string else parsed
def b16encode(s):
    """Return the Base16 (upper-case hex) encoding of the bytes-like object s.

    The result is a bytes object.
    """
    hex_digits = binascii.hexlify(s)
    return hex_digits.upper()
def _bytehex(data): """Iterate by one byte with hexlify() output.""" for i in range(0, len(data), 2): yield data[i:i + 2]
def _hexarray(data): """Converting binary data to list of hex bytes as strings.""" return ['0x' + x.decode() for x in _bytehex(binascii.hexlify(data))]
def handleUdpMessage(self, message, remoteAddr):
    """Accept or discard a decoded UDP message coming from the server.

    A message is accepted when it carries this device's id, its
    ``counter1`` is greater than the local received counter and its
    ``counter2`` is not more than 5 behind the local sent counter.  An
    accepted message updates the received counter, persists the state and
    hands the payload to ``self.passToHandler``; anything else is only
    logged.

    :param message: decoded packet exposing ``deviceId``, ``counter1``,
        ``counter2`` and ``payload``.
    :param remoteAddr: ``(host, port)`` pair of the sender.
    """
    log = self.logger
    log.debug(" handling decoded UDP message from server")

    if str(message.deviceId) != str(self.deviceId):
        log.warning("Device key mismatch! local=%s, incoming=%s", binascii.hexlify(self.deviceId), binascii.hexlify(message.deviceId))
        return

    log.debug("Validating counters in incoming server message: local={0},{1}, message={2},{3}".format(self.udpSentCounter, self.udpReceivedCounter, message.counter1, message.counter2))
    counters_ok = (message.counter1 > self.udpReceivedCounter
                   and message.counter2 >= self.udpSentCounter - 5)
    if not counters_ok:
        log.warning("Invalid counters in incoming message. local={0},{1}, message={2},{3} - discarding".format(self.udpSentCounter, self.udpReceivedCounter, message.counter1, message.counter2))
        return

    self.udpReceivedCounter = message.counter1
    log.info("Counters OK. Received valid message from server at {0}:{1} with payload {2}".format(remoteAddr[0], remoteAddr[1], message.payload))
    self.saveState()
    self.passToHandler(message.payload)