我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用binascii.b2a_hex()。
def get_pwd_rsa(self, pwd, servertime, nonce): """ Get rsa2 encrypted password, using RSA module from https://pypi.python.org/pypi/rsa/3.1.1, documents can be accessed at http://stuvel.eu/files/python-rsa-doc/index.html """ #n, n parameter of RSA public key, which is published by WEIBO.COM #hardcoded here but you can also find it from values return from prelogin status above weibo_rsa_n = 'EB2A38568661887FA180BDDB5CABD5F21C7BFD59C090CB2D245A87AC253062882729293E5506350508E7F9AA3BB77F4333231490F915F6D63C55FE2F08A49B353F444AD3993CACC02DB784ABBB8E42A9B1BBFFFB38BE18D78E87A0E41B9B8F73A928EE0CCEE1F6739884B9777E4FE9E88A1BBE495927AC4A799B3181D6442443' #e, exponent parameter of RSA public key, WEIBO uses 0x10001, which is 65537 in Decimal weibo_rsa_e = 65537 message = str(servertime) + '\t' + str(nonce) + '\n' + str(pwd) #construct WEIBO RSA Publickey using n and e above, note that n is a hex string key = rsa.PublicKey(int(weibo_rsa_n, 16), weibo_rsa_e) #get encrypted password encropy_pwd = rsa.encrypt(message, key) #trun back encrypted password binaries to hex string return binascii.b2a_hex(encropy_pwd)
def readnode(self, lnum, offs): """ read a node from a lnum + offset. """ ch = UbiFsCommonHeader() hdrdata = self.vol.read(lnum, offs, ch.hdrsize) ch.parse(hdrdata) ch.lnum = lnum ch.offs = offs node = ch.getnode() nodedata = self.vol.read(lnum, offs + ch.hdrsize, ch.len - ch.hdrsize) if crc32(hdrdata[8:] + nodedata) != ch.crc: print(ch, node) print(" %s + %s = %08x -> want = %08x" % ( b2a_hex(hdrdata), b2a_hex(nodedata), crc32(hdrdata[8:] + nodedata), ch.crc)) raise Exception("invalid node crc") node.parse(nodedata) return node
def yandex(url): try: cookie = client.request(url, output='cookie') r = client.request(url, cookie=cookie) r = re.sub(r'[^\x00-\x7F]+', ' ', r) sk = re.findall('"sk"\s*:\s*"([^"]+)', r)[0] idstring = re.findall('"id"\s*:\s*"([^"]+)', r)[0] idclient = binascii.b2a_hex(os.urandom(16)) post = {'idClient': idclient, 'version': '3.9.2', 'sk': sk, '_model.0': 'do-get-resource-url', 'id.0': idstring} post = urllib.urlencode(post) r = client.request('https://yadi.sk/models/?_m=do-get-resource-url', post=post, cookie=cookie) r = json.loads(r) url = r['models'][0]['data']['file'] return url except: return
def _oauth_request_token_url(self, callback_uri=None, extra_params=None): consumer_token = self._oauth_consumer_token() url = self._OAUTH_REQUEST_TOKEN_URL args = dict( oauth_consumer_key=escape.to_basestring(consumer_token["key"]), oauth_signature_method="HMAC-SHA1", oauth_timestamp=str(int(time.time())), oauth_nonce=escape.to_basestring(binascii.b2a_hex(uuid.uuid4().bytes)), oauth_version="1.0", ) if getattr(self, "_OAUTH_VERSION", "1.0a") == "1.0a": if callback_uri == "oob": args["oauth_callback"] = "oob" elif callback_uri: args["oauth_callback"] = urlparse.urljoin( self.request.full_url(), callback_uri) if extra_params: args.update(extra_params) signature = _oauth10a_signature(consumer_token, "GET", url, args) else: signature = _oauth_signature(consumer_token, "GET", url, args) args["oauth_signature"] = signature return url + "?" + urllib_parse.urlencode(args)
def _oauth_access_token_url(self, request_token): consumer_token = self._oauth_consumer_token() url = self._OAUTH_ACCESS_TOKEN_URL args = dict( oauth_consumer_key=escape.to_basestring(consumer_token["key"]), oauth_token=escape.to_basestring(request_token["key"]), oauth_signature_method="HMAC-SHA1", oauth_timestamp=str(int(time.time())), oauth_nonce=escape.to_basestring(binascii.b2a_hex(uuid.uuid4().bytes)), oauth_version="1.0", ) if "verifier" in request_token: args["oauth_verifier"] = request_token["verifier"] if getattr(self, "_OAUTH_VERSION", "1.0a") == "1.0a": signature = _oauth10a_signature(consumer_token, "GET", url, args, request_token) else: signature = _oauth_signature(consumer_token, "GET", url, args, request_token) args["oauth_signature"] = signature return url + "?" + urllib_parse.urlencode(args)
def encrypt(self, passwd=None, length=32): """ encrypt gen password ??????????? """ if not passwd: passwd = self.gen_rand_pass() cryptor = AES.new(self.key, self.mode, b'8122ca7d906ad5e1') try: count = len(passwd) except TypeError: raise ServerError('Encrypt password error, TYpe error.') add = (length - (count % length)) passwd += ('\0' * add) cipher_text = cryptor.encrypt(passwd) return b2a_hex(cipher_text)
def music_files(): """ Get a list of music files and file identifier hashes as JSON; also refresh internal cache of music files and hashes. """ global music_files_dict file_paths = sorted(glob.glob(path.join(settings.MUSIC_ROOT, '*'))) out = [] music_files_dict = dict() for file_path in file_paths: file_name = path.split(file_path)[1] file_hash = music_file_hash(file_name) out.append(dict(name=file_name, hash=binascii.b2a_hex(file_hash))) music_files_dict[file_hash] = file_name # set music files dict in RFID handler rfid_handler.set_music_files_dict(music_files_dict) return json.dumps(out)
def test_FortunaPool(self): """FortunaAccumulator.FortunaPool""" pool = FortunaAccumulator.FortunaPool() self.assertEqual(0, pool.length) self.assertEqual("5df6e0e2761359d30a8275058e299fcc0381534545f55cf43e41983f5d4c9456", pool.hexdigest()) pool.append(b('abc')) self.assertEqual(3, pool.length) self.assertEqual("4f8b42c22dd3729b519ba6f68d2da7cc5b2d606d05daed5ad5128cc03e6c6358", pool.hexdigest()) pool.append(b("dbcdecdefdefgefghfghighijhijkijkljklmklmnlmnomnopnopq")) self.assertEqual(56, pool.length) self.assertEqual(b('0cffe17f68954dac3a84fb1458bd5ec99209449749b2b308b7cb55812f9563af'), b2a_hex(pool.digest())) pool.reset() self.assertEqual(0, pool.length) pool.append(b('a') * 10**6) self.assertEqual(10**6, pool.length) self.assertEqual(b('80d1189477563e1b5206b2749f1afe4807e5705e8bd77887a60187a712156688'), b2a_hex(pool.digest()))
def runTest(self): plaintext = a2b_hex(self.plaintext) ciphertext = a2b_hex(self.ciphertext) ct1 = b2a_hex(self._new().encrypt(plaintext)) pt1 = b2a_hex(self._new(1).decrypt(ciphertext)) ct2 = b2a_hex(self._new().encrypt(plaintext)) pt2 = b2a_hex(self._new(1).decrypt(ciphertext)) if hasattr(self.module, "MODE_OPENPGP") and self.mode == self.module.MODE_OPENPGP: # In PGP mode, data returned by the first encrypt() # is prefixed with the encrypted IV. # Here we check it and then remove it from the ciphertexts. eilen = len(self.encrypted_iv) self.assertEqual(self.encrypted_iv, ct1[:eilen]) self.assertEqual(self.encrypted_iv, ct2[:eilen]) ct1 = ct1[eilen:] ct2 = ct2[eilen:] self.assertEqual(self.ciphertext, ct1) # encrypt self.assertEqual(self.ciphertext, ct2) # encrypt (second time) self.assertEqual(self.plaintext, pt1) # decrypt self.assertEqual(self.plaintext, pt2) # decrypt (second time)
def runTest(self): plaintext = a2b_hex(self.plaintext) ciphertext = a2b_hex(self.ciphertext) # The cipher should work like a stream cipher # Test counter mode encryption, 3 bytes at a time ct3 = [] cipher = self._new() for i in range(0, len(plaintext), 3): ct3.append(cipher.encrypt(plaintext[i:i+3])) ct3 = b2a_hex(b("").join(ct3)) self.assertEqual(self.ciphertext, ct3) # encryption (3 bytes at a time) # Test counter mode decryption, 3 bytes at a time pt3 = [] cipher = self._new() for i in range(0, len(ciphertext), 3): pt3.append(cipher.encrypt(ciphertext[i:i+3])) # PY3K: This is meant to be text, do not change to bytes (data) pt3 = b2a_hex(b("").join(pt3)) self.assertEqual(self.plaintext, pt3) # decryption (3 bytes at a time)
def runTest(self): from Crypto.Cipher import DES from binascii import b2a_hex X = [] X[0:] = [b('\x94\x74\xB8\xE8\xC7\x3B\xCA\x7D')] for i in range(16): c = DES.new(X[i],DES.MODE_ECB) if not (i&1): # (num&1) returns 1 for odd numbers X[i+1:] = [c.encrypt(X[i])] # even else: X[i+1:] = [c.decrypt(X[i])] # odd self.assertEqual(b2a_hex(X[16]), b2a_hex(b('\x1B\x1A\x2D\xDB\x4C\x64\x24\x38')))
def home(): form = SendToDeviceForm(request.form) if form.validate_on_submit(): device = PushDevice.query.filter_by(device_uid=form.device_uid.data).first() if device is None: flash('Device not found (please check id)', "danger") else: otp = binascii.b2a_hex(os.urandom(4)).decode() push_status_txt = sendpush(device.push_id, otp) push_json = json.loads(push_status_txt) if "status" in push_json: if push_json['status'] == "OK": flash("One Time Password Sent To Device", "success") else: flash("Could Not Communicate With Device ( " + push_status_txt + " )", "danger") return render_template('main/home.html', form=form)
def register(): ret_dict = {} if 'push_id' in request.form : device = PushDevice() device.push_id = request.form.get('push_id') device.device_uid = binascii.b2a_hex(os.urandom(4)).decode() db.session.add(device) db.session.commit() ret_dict['device_uid'] = device.device_uid else : ret_dict['error'] = 'could not register push device' return json.dumps(ret_dict)
def handle_msg(self, msg_pmt): msg = pmt.cdr(msg_pmt) if not pmt.is_u8vector(msg): print "[ERROR] Received invalid message type. Expected u8vector" return packet = bytearray(pmt.u8vector_elements(msg)) if len(packet) <= 3: return if self.verbose: print 'Spacecraft ID', binascii.b2a_hex(packet[:2]) if packet[2] == 0x50: print 'CSP downlink, protocol version 0' else: print 'Unknown packet type' data = packet[3:] self.message_port_pub(pmt.intern('out'), pmt.cons(pmt.PMT_NIL, pmt.init_u8vector(len(data), data)))
def _convert_host_to_hex(host): """ Convert the provided host to the format in /proc/net/tcp* /proc/net/tcp uses little-endian four byte hex for ipv4 /proc/net/tcp6 uses little-endian per 4B word for ipv6 Args: host: String with either hostname, IPv4, or IPv6 address Returns: List of tuples containing address family and the little-endian converted host """ ips = [] if host is not None: for family, ip in _convert_host_to_ip(host): hexip_nf = binascii.b2a_hex(socket.inet_pton(family, ip)) hexip_hf = "" for i in range(0, len(hexip_nf), 8): ipgroup_nf = hexip_nf[i:i+8] ipgroup_hf = socket.ntohl(int(ipgroup_nf, base=16)) hexip_hf = "%s%08X" % (hexip_hf, ipgroup_hf) ips.append((family, hexip_hf)) return ips
def run(self, objHmac, mk, ssid, fast=False): #Requires Python 2.7.8 or later #Ubuntu 14.04 LTS generally only updates to 2.7.6 :( #if fast==True: # return hashlib.pbkdf2_hmac('sha1', mk, ssid, 4095) x1 = objHmac.load(mk, ssid + '\0\0\0\1', fast) x2 = objHmac.load(mk, ssid + '\0\0\0\2', fast) f1 = a2b_hex(x1) f2 = a2b_hex(x2) for x in xrange(4095): x1 = objHmac.load(mk, a2b_hex(x1), fast) x2 = objHmac.load(mk, a2b_hex(x2), fast) f1 = self.xorString(a2b_hex(x1), f1) f2 = self.xorString(a2b_hex(x2), f2) out = b2a_hex(f1) + b2a_hex(f2) return out[0:64]
def request_hex_nonce(self): self.open_send_socket() # build nonce request payload string nonce_request = self.authentication.buildNonceRequestPayloadString() self.logger.debug("Sending nonce request with body of length %d", len(nonce_request)) self.logger.debug('Send: %s', nonce_request) self.sock.send(nonce_request) self.logger.debug('Nonce request sent.') resultbuf_hex = binascii.b2a_hex(self.receive_send_socket(max_receive_bytes=32)) if resultbuf_hex is None: raise HologramError('Internal nonce error') return resultbuf_hex
def encrypt(self,text): cryptor = AES.new(self.key,self.mode,b'0000000000000000') #????key ?????16?AES-128?, #24?AES-192?,??32 ?AES-256?Bytes ?? #??AES-128 ?????? length = 16 count = len(text) if count < length: add = (length-count) #\0 backspace text = text + ('\0' * add) elif count > length: add = (length-(count % length)) text = text + ('\0' * add) self.ciphertext = cryptor.encrypt(text) #??AES??????????????ascii?????????????????????? #?????????????????16????? return b2a_hex(self.ciphertext) #????????????strip() ??
def connect(self): """ Connects the sphero with the address given in the constructor """ self._device = bluepy.btle.Peripheral(self._addr, addrType=bluepy.btle.ADDR_TYPE_RANDOM) self._notifier = DelegateObj(self, self._notification_lock) #set notifier to be notified self._device.withDelegate(self._notifier) self._devModeOn() self._connected = True #Might need to change to be a callback format #get the command service cmd_service = self._device.getServiceByUUID(RobotControlService) self._cmd_characteristics = {} characteristic_list = cmd_service.getCharacteristics() for characteristic in characteristic_list: uuid_str = binascii.b2a_hex(characteristic.uuid.binVal).decode('utf-8') self._cmd_characteristics[uuid_str] = characteristic self._listening_flag = True self._listening_thread = threading.Thread(target=self._listening_loop) self._listening_thread.start()
def _devModeOn(self): """ A sequence of read/write that enables the developer mode """ service = self._device.getServiceByUUID(BLEService) characteristic_list = service.getCharacteristics() #make it into a dict characteristic_dict = {} for characteristic in characteristic_list: uuid_str = binascii.b2a_hex(characteristic.uuid.binVal).decode('utf-8') characteristic_dict[uuid_str] = characteristic characteristic = characteristic_dict[AntiDosCharacteristic] characteristic.write("011i3".encode(),True) characteristic = characteristic_dict[TXPowerCharacteristic] characteristic.write((7).to_bytes(1, 'big'),True) characteristic = characteristic_dict[WakeCharacteristic] characteristic.write((1).to_bytes(1, 'big'),True)
def oauth_request_parameters(consumer_token, url, access_token, parameters={}, method="GET", oauth_version="1.0a", override_version=""): base_args = dict( oauth_consumer_key=consumer_token["key"], oauth_token=access_token["key"], oauth_signature_method="HMAC-SHA1", oauth_timestamp=str(int(time.time())), oauth_nonce=binascii.b2a_hex(uuid.uuid4().bytes), oauth_version=override_version or oauth_version, ) args = base_args.copy() args.update(parameters) if oauth_version == "1.0a": signature = tornado.auth._oauth10a_signature(consumer_token, method, url, args, access_token) else: signature = tornado.auth._oauth_signature(consumer_token, method, url, args, access_token) base_args["oauth_signature"] = signature return base_args
def __init__(self, name, resultdir, vt_type, test=None, mux=None): self.id = binascii.b2a_hex(os.urandom(20)) self.name = str(name) self.shortname = "_".join(self.name.split('_')[1:]) self.job_dir = None self.type = str(name.split('_')[0]) self.resultdir = resultdir self.conf = None self.test = test self.mux = mux self.run = "Not_Run" self.runsummary = None self.runlink = None if self.type == 'guest': self.vt_type = vt_type else: self.vt_type = None
def parse_xbee_rf_data(data): if DEBUG: print data node_addr = binascii.b2a_hex(data['source_addr_long']) + binascii.b2a_hex((data['source_addr'])) readings = re.findall("{.*?}", data['rf_data']) # returns a list of K-V pair matches payload = {'node': node_addr} for reading in readings: item_dict = json.loads(reading) for k, v in item_dict.iteritems(): sensor_name = k.encode('utf-8') sensor_value = v.encode('utf-8') if sensor_name == "temp:": sensor_name = "temp" payload['sensor'] = sensor_name payload['val'] = sensor_value # now pass the payload to the RequestBuilder and send it print payload if PRODUCTION: sendHTTPPost(payload)
def get_sample_name(input_sample_name): """ Return sample name, only alphabet and digits are allowed. (no underscore, no space) If input_sample_name is None or empty string, return a random string starts with 'sample' """ if input_sample_name is None or str(input_sample_name) == "None": input_sample_name = "" else: input_sample_name = str(input_sample_name) input_sample_name = "".join([x for x in input_sample_name if x.isalpha() or x.isdigit()]) if len(input_sample_name) == 0: import binascii return str("sample"+binascii.b2a_hex(os.urandom(3))) else: return input_sample_name
def seed_callback(bot, update, args): user_id = update.message.from_user.id chat_id = update.message.chat_id lang_id = mysql_select_language(user_id) seed = mysql_select_seed(user_id) if (seed is False): seed = binascii.b2a_hex(os.urandom(8)).upper() mysql_set_seed(user_id, seed) seed_split = [seed[i:i+4] for i in range(0, len(seed), 4)] seed_text = seed_split[0] + '-' + seed_split[1] + '-' + seed_split[2] + '-' + seed_split[3] check = mysql_check_password(user_id) if ((len(args) > 0) and (check is not False)): password = args[0] hex = password_check(update, password) if (check == hex): message_markdown(bot, chat_id, lang_text('seed_creation', lang_id).format(seed_text)) else: text_reply(update, lang_text('password_error', lang_id)) elif (check is not False): text_reply(update, lang_text('password_error', lang_id)) else: message_markdown(bot, chat_id, lang_text('seed_creation', lang_id).format(seed_text))
def read_body(self, f, body_checksum): checksum = 0 data = f.read(32) while len(data) == 32: meta = dict() segment = "" segment, meta['incremental'], meta['base'], meta['encryption'], meta['compression'], sha1 = unpack('<2IH2B20s', data) meta['sha1_hash'] = b2a_hex(sha1) self.segments[segment] = meta checksum = crc32(data, checksum) data = f.read(32) if checksum != body_checksum: raise Exception('Body checksum does not match')
def main(): args = parse_args() manifest = driver.DriverManager( namespace='ekko.manifest.drivers', name=args.driver, invoke_on_load=True, invoke_args=[args.manifest] ).driver for segment in manifest.get_segments(manifest.get_metadata()): print(binascii.b2a_hex(segment.segment_hash))
def hex_encode(input, errors='strict'): assert errors == 'strict' return (binascii.b2a_hex(input), len(input))