Python binascii 模块,b2a_hex() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用binascii.b2a_hex()

项目:weibo    作者:windskyer    | 项目源码 | 文件源码
def get_pwd_rsa(self, pwd, servertime, nonce):
        """
        Encrypt the login password with WEIBO's RSA public key ("rsa2").

        The plaintext is str(servertime), a tab, str(nonce), a newline and
        str(pwd), in that order.  Encryption is done with the third-party
        ``rsa`` module (https://pypi.python.org/pypi/rsa/3.1.1, documented at
        http://stuvel.eu/files/python-rsa-doc/index.html).

        Args:
            pwd: the account password.
            servertime: server time value to mix into the plaintext
                (presumably taken from the prelogin response -- confirm).
            nonce: nonce value to mix into the plaintext
                (presumably taken from the prelogin response -- confirm).

        Returns:
            The RSA ciphertext hex-encoded by binascii.b2a_hex().
        """
        # n parameter of the RSA public key published by WEIBO.COM (hex string).
        # Hardcoded here; it can also be read from the prelogin response.
        weibo_rsa_n = 'EB2A38568661887FA180BDDB5CABD5F21C7BFD59C090CB2D245A87AC253062882729293E5506350508E7F9AA3BB77F4333231490F915F6D63C55FE2F08A49B353F444AD3993CACC02DB784ABBB8E42A9B1BBFFFB38BE18D78E87A0E41B9B8F73A928EE0CCEE1F6739884B9777E4FE9E88A1BBE495927AC4A799B3181D6442443'

        # e, the public exponent; WEIBO uses 0x10001 (65537 in decimal).
        weibo_rsa_e = 65537

        message = str(servertime) + '\t' + str(nonce) + '\n' + str(pwd)
        # rsa.encrypt() works on bytes.  On Python 2 a str already is bytes and
        # is left untouched; on Python 3 the text must be encoded first.
        if not isinstance(message, bytes):
            message = message.encode('utf-8')

        # Build the public key; n is given as a hex string, hence base 16.
        key = rsa.PublicKey(int(weibo_rsa_n, 16), weibo_rsa_e)

        # Encrypt, then turn the binary ciphertext into a hex string.
        encrypted_pwd = rsa.encrypt(message, key)
        return binascii.b2a_hex(encrypted_pwd)
项目:ubidump    作者:nlitsme    | 项目源码 | 文件源码
def readnode(self, lnum, offs):
        """
        Read, CRC-check and parse the node located at `offs` within `lnum`.

        The common header is read first; it supplies the total node length,
        the expected CRC and the node object to parse the remaining bytes with.

        Raises:
            Exception: when the computed CRC differs from the one in the header.
        """
        header = UbiFsCommonHeader()
        raw_header = self.vol.read(lnum, offs, header.hdrsize)
        header.parse(raw_header)

        # Remember where this node came from.
        header.lnum = lnum
        header.offs = offs

        node = header.getnode()
        body = self.vol.read(lnum, offs + header.hdrsize, header.len - header.hdrsize)

        # The CRC skips the first 8 header bytes and covers the rest of the node.
        computed_crc = crc32(raw_header[8:] + body)
        if computed_crc != header.crc:
            print(header, node)
            print(" %s + %s = %08x -> want = %08x" % ( b2a_hex(raw_header), b2a_hex(body), computed_crc, header.crc))
            raise Exception("invalid node crc")

        node.parse(body)
        return node
项目:plugin.video.exodus    作者:lastship    | 项目源码 | 文件源码
def yandex(url):
    """
    Resolve a Yandex.Disk share page URL to its direct file URL.

    The share page is fetched to obtain a session cookie plus the "sk" and
    "id" values embedded in it; these are posted to the
    ``do-get-resource-url`` model endpoint, whose JSON answer carries the
    file URL.

    Returns:
        The resolved URL, or None when any step fails (best effort).
    """
    try:
        cookie = client.request(url, output='cookie')

        r = client.request(url, cookie=cookie)
        # Collapse every run of non-ASCII characters into a single space.
        r = re.sub(r'[^\x00-\x7F]+', ' ', r)

        # Raw strings keep "\s" a regex escape rather than an invalid
        # string escape.
        sk = re.findall(r'"sk"\s*:\s*"([^"]+)', r)[0]

        idstring = re.findall(r'"id"\s*:\s*"([^"]+)', r)[0]

        # Random 16-byte client id, hex encoded.
        idclient = binascii.b2a_hex(os.urandom(16))

        post = {'idClient': idclient, 'version': '3.9.2', 'sk': sk, '_model.0': 'do-get-resource-url', 'id.0': idstring}
        post = urllib.urlencode(post)

        r = client.request('https://yadi.sk/models/?_m=do-get-resource-url', post=post, cookie=cookie)
        r = json.loads(r)

        return r['models'][0]['data']['file']
    except Exception:
        # Deliberate best effort: any failure means "could not resolve".
        # Unlike a bare except, KeyboardInterrupt/SystemExit still propagate.
        return None
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def _oauth_request_token_url(self, callback_uri=None, extra_params=None):
        """Build the signed GET URL used to obtain an OAuth request token.

        For OAuth 1.0a (the default when ``_OAUTH_VERSION`` is not set) the
        callback and any ``extra_params`` are included before signing; for
        other versions only the base arguments are signed.
        """
        consumer_token = self._oauth_consumer_token()
        url = self._OAUTH_REQUEST_TOKEN_URL
        args = {
            "oauth_consumer_key": escape.to_basestring(consumer_token["key"]),
            "oauth_signature_method": "HMAC-SHA1",
            "oauth_timestamp": str(int(time.time())),
            # Nonce: the 16 random bytes of a UUID4, hex encoded.
            "oauth_nonce": escape.to_basestring(binascii.b2a_hex(uuid.uuid4().bytes)),
            "oauth_version": "1.0",
        }

        if getattr(self, "_OAUTH_VERSION", "1.0a") != "1.0a":
            signature = _oauth_signature(consumer_token, "GET", url, args)
        else:
            if callback_uri == "oob":
                # Out-of-band callback is passed through verbatim.
                args["oauth_callback"] = "oob"
            elif callback_uri:
                # Resolve the callback relative to the current request URL.
                args["oauth_callback"] = urlparse.urljoin(
                    self.request.full_url(), callback_uri)
            if extra_params:
                args.update(extra_params)
            signature = _oauth10a_signature(consumer_token, "GET", url, args)

        args["oauth_signature"] = signature
        return url + "?" + urllib_parse.urlencode(args)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def _oauth_access_token_url(self, request_token):
        """Build the signed GET URL that trades a request token for an access token.

        ``request_token`` must provide a "key" entry; a "verifier" entry, when
        present, is forwarded as ``oauth_verifier``.
        """
        consumer_token = self._oauth_consumer_token()
        url = self._OAUTH_ACCESS_TOKEN_URL
        args = {
            "oauth_consumer_key": escape.to_basestring(consumer_token["key"]),
            "oauth_token": escape.to_basestring(request_token["key"]),
            "oauth_signature_method": "HMAC-SHA1",
            "oauth_timestamp": str(int(time.time())),
            # Nonce: the 16 random bytes of a UUID4, hex encoded.
            "oauth_nonce": escape.to_basestring(binascii.b2a_hex(uuid.uuid4().bytes)),
            "oauth_version": "1.0",
        }
        if "verifier" in request_token:
            args["oauth_verifier"] = request_token["verifier"]

        # OAuth 1.0a is assumed unless the class says otherwise.
        use_10a = getattr(self, "_OAUTH_VERSION", "1.0a") == "1.0a"
        sign = _oauth10a_signature if use_10a else _oauth_signature
        args["oauth_signature"] = sign(consumer_token, "GET", url, args,
                                       request_token)

        return url + "?" + urllib_parse.urlencode(args)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def _oauth_request_token_url(self, callback_uri=None, extra_params=None):
        """Build the signed GET URL used to obtain an OAuth request token.

        For OAuth 1.0a (the default when ``_OAUTH_VERSION`` is not set) the
        callback and any ``extra_params`` are included before signing; for
        other versions only the base arguments are signed.
        """
        consumer_token = self._oauth_consumer_token()
        url = self._OAUTH_REQUEST_TOKEN_URL
        args = {
            "oauth_consumer_key": escape.to_basestring(consumer_token["key"]),
            "oauth_signature_method": "HMAC-SHA1",
            "oauth_timestamp": str(int(time.time())),
            # Nonce: the 16 random bytes of a UUID4, hex encoded.
            "oauth_nonce": escape.to_basestring(binascii.b2a_hex(uuid.uuid4().bytes)),
            "oauth_version": "1.0",
        }

        if getattr(self, "_OAUTH_VERSION", "1.0a") != "1.0a":
            signature = _oauth_signature(consumer_token, "GET", url, args)
        else:
            if callback_uri == "oob":
                # Out-of-band callback is passed through verbatim.
                args["oauth_callback"] = "oob"
            elif callback_uri:
                # Resolve the callback relative to the current request URL.
                args["oauth_callback"] = urlparse.urljoin(
                    self.request.full_url(), callback_uri)
            if extra_params:
                args.update(extra_params)
            signature = _oauth10a_signature(consumer_token, "GET", url, args)

        args["oauth_signature"] = signature
        return url + "?" + urllib_parse.urlencode(args)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def _oauth_access_token_url(self, request_token):
        """Build the signed GET URL that trades a request token for an access token.

        ``request_token`` must provide a "key" entry; a "verifier" entry, when
        present, is forwarded as ``oauth_verifier``.
        """
        consumer_token = self._oauth_consumer_token()
        url = self._OAUTH_ACCESS_TOKEN_URL
        args = {
            "oauth_consumer_key": escape.to_basestring(consumer_token["key"]),
            "oauth_token": escape.to_basestring(request_token["key"]),
            "oauth_signature_method": "HMAC-SHA1",
            "oauth_timestamp": str(int(time.time())),
            # Nonce: the 16 random bytes of a UUID4, hex encoded.
            "oauth_nonce": escape.to_basestring(binascii.b2a_hex(uuid.uuid4().bytes)),
            "oauth_version": "1.0",
        }
        if "verifier" in request_token:
            args["oauth_verifier"] = request_token["verifier"]

        # OAuth 1.0a is assumed unless the class says otherwise.
        use_10a = getattr(self, "_OAUTH_VERSION", "1.0a") == "1.0a"
        sign = _oauth10a_signature if use_10a else _oauth_signature
        args["oauth_signature"] = sign(consumer_token, "GET", url, args,
                                       request_token)

        return url + "?" + urllib_parse.urlencode(args)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def _oauth_access_token_url(self, request_token):
        """Build the signed GET URL that trades a request token for an access token.

        ``request_token`` must provide a "key" entry; a "verifier" entry, when
        present, is forwarded as ``oauth_verifier``.
        """
        consumer_token = self._oauth_consumer_token()
        url = self._OAUTH_ACCESS_TOKEN_URL
        args = {
            "oauth_consumer_key": escape.to_basestring(consumer_token["key"]),
            "oauth_token": escape.to_basestring(request_token["key"]),
            "oauth_signature_method": "HMAC-SHA1",
            "oauth_timestamp": str(int(time.time())),
            # Nonce: the 16 random bytes of a UUID4, hex encoded.
            "oauth_nonce": escape.to_basestring(binascii.b2a_hex(uuid.uuid4().bytes)),
            "oauth_version": "1.0",
        }
        if "verifier" in request_token:
            args["oauth_verifier"] = request_token["verifier"]

        # OAuth 1.0a is assumed unless the class says otherwise.
        use_10a = getattr(self, "_OAUTH_VERSION", "1.0a") == "1.0a"
        sign = _oauth10a_signature if use_10a else _oauth_signature
        args["oauth_signature"] = sign(consumer_token, "GET", url, args,
                                       request_token)

        return url + "?" + urllib_parse.urlencode(args)
项目:geekcloud    作者:Mr-Linus    | 项目源码 | 文件源码
def encrypt(self, passwd=None, length=32):
        """
        Encrypt a password with AES and return the ciphertext hex-encoded.

        When `passwd` is empty or None, a random one from
        self.gen_rand_pass() is encrypted instead.  The password is padded
        with NUL characters up to the next multiple of `length`; a password
        whose size is already a multiple of `length` still receives one full
        extra block of padding.

        Raises:
            ServerError: when the password has no length (len() fails).
        """
        if not passwd:
            passwd = self.gen_rand_pass()

        cryptor = AES.new(self.key, self.mode, b'8122ca7d906ad5e1')
        try:
            size = len(passwd)
        except TypeError:
            raise ServerError('Encrypt password error, TYpe error.')

        # Always between 1 and `length` padding characters.
        pad_len = length - (size % length)
        passwd += '\0' * pad_len
        return b2a_hex(cryptor.encrypt(passwd))
项目:nfcmusik    作者:ehansis    | 项目源码 | 文件源码
def music_files():
    """
    Get a list of music files and file identifier hashes as JSON; also refresh
    the internal cache of music files and hashes.

    Every entry found directly under settings.MUSIC_ROOT is listed, sorted by
    path.  The module-level ``music_files_dict`` (hash -> file name) is rebuilt
    from scratch and handed to the RFID handler once the scan is complete.

    Returns:
        JSON string: a list of objects with "name" and "hash" (hex) members.
    """
    global music_files_dict

    file_paths = sorted(glob.glob(path.join(settings.MUSIC_ROOT, '*')))

    out = []
    music_files_dict = dict()
    for file_path in file_paths:
        file_name = path.split(file_path)[1]
        file_hash = music_file_hash(file_name)
        out.append(dict(name=file_name,
                        hash=binascii.b2a_hex(file_hash)))
        music_files_dict[file_hash] = file_name

    # Publish the finished dict to the RFID handler exactly once.  Doing this
    # inside the loop re-sent a partial dict for every file and left the
    # handler with a stale dict when the music directory was empty.
    rfid_handler.set_music_files_dict(music_files_dict)

    return json.dumps(out)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test_FortunaPool(self):
        """FortunaAccumulator.FortunaPool"""
        pool = FortunaAccumulator.FortunaPool()

        # A fresh pool is empty and has a fixed, known digest.
        empty_hexdigest = "5df6e0e2761359d30a8275058e299fcc0381534545f55cf43e41983f5d4c9456"
        self.assertEqual(0, pool.length)
        self.assertEqual(empty_hexdigest, pool.hexdigest())

        # After appending b'abc'.
        pool.append(b('abc'))
        abc_hexdigest = "4f8b42c22dd3729b519ba6f68d2da7cc5b2d606d05daed5ad5128cc03e6c6358"
        self.assertEqual(3, pool.length)
        self.assertEqual(abc_hexdigest, pool.hexdigest())

        # Appending accumulates: 3 + 53 bytes = 56 bytes in the pool.
        pool.append(b("dbcdecdefdefgefghfghighijhijkijkljklmklmnlmnomnopnopq"))
        accumulated_digest = b('0cffe17f68954dac3a84fb1458bd5ec99209449749b2b308b7cb55812f9563af')
        self.assertEqual(56, pool.length)
        self.assertEqual(accumulated_digest, b2a_hex(pool.digest()))

        # reset() empties the pool again.
        pool.reset()
        self.assertEqual(0, pool.length)

        # One million 'a' bytes appended in a single call.
        pool.append(b('a') * 10**6)
        million_a_digest = b('80d1189477563e1b5206b2749f1afe4807e5705e8bd77887a60187a712156688')
        self.assertEqual(10**6, pool.length)
        self.assertEqual(million_a_digest, b2a_hex(pool.digest()))
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def runTest(self):
        # Encrypt the known plaintext and decrypt the known ciphertext twice
        # each, always with a freshly created cipher, and compare the
        # hex-encoded results against the test vector.
        plaintext = a2b_hex(self.plaintext)
        ciphertext = a2b_hex(self.ciphertext)

        encrypted = []
        decrypted = []
        for _ in range(2):
            encrypted.append(b2a_hex(self._new().encrypt(plaintext)))
            decrypted.append(b2a_hex(self._new(1).decrypt(ciphertext)))

        openpgp = (hasattr(self.module, "MODE_OPENPGP")
                   and self.mode == self.module.MODE_OPENPGP)
        if openpgp:
            # In PGP mode, data returned by the first encrypt()
            # is prefixed with the encrypted IV.
            # Here we check it and then remove it from the ciphertexts.
            eilen = len(self.encrypted_iv)
            for ct in encrypted:
                self.assertEqual(self.encrypted_iv, ct[:eilen])
            encrypted = [ct[eilen:] for ct in encrypted]

        for ct in encrypted:    # encrypt (first and second time)
            self.assertEqual(self.ciphertext, ct)
        for pt in decrypted:    # decrypt (first and second time)
            self.assertEqual(self.plaintext, pt)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def runTest(self):
        # The cipher should work like a stream cipher: feeding the data in
        # 3-byte pieces must give the same result as the test vector.
        plaintext = a2b_hex(self.plaintext)
        ciphertext = a2b_hex(self.ciphertext)
        step = 3

        # Test counter mode encryption, 3 bytes at a time
        cipher = self._new()
        ct_pieces = [cipher.encrypt(plaintext[pos:pos+step])
                     for pos in range(0, len(plaintext), step)]
        ct3 = b2a_hex(b("").join(ct_pieces))
        self.assertEqual(self.ciphertext, ct3)  # encryption (3 bytes at a time)

        # Test counter mode decryption, 3 bytes at a time.
        # NOTE(review): this pass also calls encrypt(), not decrypt() --
        # presumably relying on both being the same operation in this mode;
        # confirm before changing.
        cipher = self._new()
        pt_pieces = [cipher.encrypt(ciphertext[pos:pos+step])
                     for pos in range(0, len(ciphertext), step)]
        # PY3K: This is meant to be text, do not change to bytes (data)
        pt3 = b2a_hex(b("").join(pt_pieces))
        self.assertEqual(self.plaintext, pt3)  # decryption (3 bytes at a time)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def runTest(self):
        # Chained DES check: every value is used both as the key and as the
        # data of the next step, alternating encrypt (even steps) and decrypt
        # (odd steps); the 16th result must match a known constant.
        from Crypto.Cipher import DES
        from binascii import b2a_hex

        chain = [b('\x94\x74\xB8\xE8\xC7\x3B\xCA\x7D')]

        for step in range(16):
            current = chain[step]
            cipher = DES.new(current, DES.MODE_ECB)
            if step & 1:    # (num&1) is 1 for odd numbers
                chain.append(cipher.decrypt(current))   # odd
            else:
                chain.append(cipher.encrypt(current))   # even

        self.assertEqual(b2a_hex(chain[16]),
            b2a_hex(b('\x1B\x1A\x2D\xDB\x4C\x64\x24\x38')))
项目:otp_demo_server    作者:ownpush    | 项目源码 | 文件源码
def home():
    """Render the home page; on a valid form post, push a one-time password.

    The device is looked up by the submitted device uid.  If found, a random
    8-hex-digit OTP is sent to it via sendpush() and the outcome is flashed
    to the user.  The page is rendered in every case.
    """
    form = SendToDeviceForm(request.form)

    if form.validate_on_submit():
        device = PushDevice.query.filter_by(device_uid=form.device_uid.data).first()

        if device is not None:
            # 4 random bytes -> 8 hex characters.
            otp = binascii.b2a_hex(os.urandom(4)).decode()
            push_status_txt = sendpush(device.push_id, otp)
            push_json = json.loads(push_status_txt)

            # A reply without a "status" member produces no message at all.
            if "status" in push_json:
                delivered = push_json['status'] == "OK"
                if delivered:
                    flash("One Time Password Sent To Device", "success")
                else:
                    flash("Could Not Communicate With Device ( " + push_status_txt + " )", "danger")
        else:
            flash('Device not found (please check id)', "danger")

    return render_template('main/home.html', form=form)
项目:otp_demo_server    作者:ownpush    | 项目源码 | 文件源码
def register():
    """Register a push device and answer with its generated device uid.

    Expects a ``push_id`` form field.  On success the new PushDevice is
    committed and ``{"device_uid": ...}`` is returned as JSON; without the
    field the JSON answer carries an ``error`` member instead.
    """
    if 'push_id' not in request.form:
        return json.dumps({'error': 'could not register push device'})

    device = PushDevice()
    device.push_id = request.form.get('push_id')
    # 4 random bytes -> 8 hex characters.
    device.device_uid = binascii.b2a_hex(os.urandom(4)).decode()
    db.session.add(device)
    db.session.commit()

    return json.dumps({'device_uid': device.device_uid})
项目:watchmen    作者:lycclsltt    | 项目源码 | 文件源码
def test_FortunaPool(self):
        """FortunaAccumulator.FortunaPool"""
        pool = FortunaAccumulator.FortunaPool()

        # A fresh pool is empty and has a fixed, known digest.
        empty_hexdigest = "5df6e0e2761359d30a8275058e299fcc0381534545f55cf43e41983f5d4c9456"
        self.assertEqual(0, pool.length)
        self.assertEqual(empty_hexdigest, pool.hexdigest())

        # After appending b'abc'.
        pool.append(b('abc'))
        abc_hexdigest = "4f8b42c22dd3729b519ba6f68d2da7cc5b2d606d05daed5ad5128cc03e6c6358"
        self.assertEqual(3, pool.length)
        self.assertEqual(abc_hexdigest, pool.hexdigest())

        # Appending accumulates: 3 + 53 bytes = 56 bytes in the pool.
        pool.append(b("dbcdecdefdefgefghfghighijhijkijkljklmklmnlmnomnopnopq"))
        accumulated_digest = b('0cffe17f68954dac3a84fb1458bd5ec99209449749b2b308b7cb55812f9563af')
        self.assertEqual(56, pool.length)
        self.assertEqual(accumulated_digest, b2a_hex(pool.digest()))

        # reset() empties the pool again.
        pool.reset()
        self.assertEqual(0, pool.length)

        # One million 'a' bytes appended in a single call.
        pool.append(b('a') * 10**6)
        million_a_digest = b('80d1189477563e1b5206b2749f1afe4807e5705e8bd77887a60187a712156688')
        self.assertEqual(10**6, pool.length)
        self.assertEqual(million_a_digest, b2a_hex(pool.digest()))
项目:watchmen    作者:lycclsltt    | 项目源码 | 文件源码
def runTest(self):
        # Encrypt the known plaintext and decrypt the known ciphertext twice
        # each, always with a freshly created cipher, and compare the
        # hex-encoded results against the test vector.
        plaintext = a2b_hex(self.plaintext)
        ciphertext = a2b_hex(self.ciphertext)

        encrypted = []
        decrypted = []
        for _ in range(2):
            encrypted.append(b2a_hex(self._new().encrypt(plaintext)))
            decrypted.append(b2a_hex(self._new(1).decrypt(ciphertext)))

        openpgp = (hasattr(self.module, "MODE_OPENPGP")
                   and self.mode == self.module.MODE_OPENPGP)
        if openpgp:
            # In PGP mode, data returned by the first encrypt()
            # is prefixed with the encrypted IV.
            # Here we check it and then remove it from the ciphertexts.
            eilen = len(self.encrypted_iv)
            for ct in encrypted:
                self.assertEqual(self.encrypted_iv, ct[:eilen])
            encrypted = [ct[eilen:] for ct in encrypted]

        for ct in encrypted:    # encrypt (first and second time)
            self.assertEqual(self.ciphertext, ct)
        for pt in decrypted:    # decrypt (first and second time)
            self.assertEqual(self.plaintext, pt)
项目:watchmen    作者:lycclsltt    | 项目源码 | 文件源码
def runTest(self):
        # The cipher should work like a stream cipher: feeding the data in
        # 3-byte pieces must give the same result as the test vector.
        plaintext = a2b_hex(self.plaintext)
        ciphertext = a2b_hex(self.ciphertext)
        step = 3

        # Test counter mode encryption, 3 bytes at a time
        cipher = self._new()
        ct_pieces = [cipher.encrypt(plaintext[pos:pos+step])
                     for pos in range(0, len(plaintext), step)]
        ct3 = b2a_hex(b("").join(ct_pieces))
        self.assertEqual(self.ciphertext, ct3)  # encryption (3 bytes at a time)

        # Test counter mode decryption, 3 bytes at a time.
        # NOTE(review): this pass also calls encrypt(), not decrypt() --
        # presumably relying on both being the same operation in this mode;
        # confirm before changing.
        cipher = self._new()
        pt_pieces = [cipher.encrypt(ciphertext[pos:pos+step])
                     for pos in range(0, len(ciphertext), step)]
        # PY3K: This is meant to be text, do not change to bytes (data)
        pt3 = b2a_hex(b("").join(pt_pieces))
        self.assertEqual(self.plaintext, pt3)  # decryption (3 bytes at a time)
项目:watchmen    作者:lycclsltt    | 项目源码 | 文件源码
def runTest(self):
        # Chained DES check: every value is used both as the key and as the
        # data of the next step, alternating encrypt (even steps) and decrypt
        # (odd steps); the 16th result must match a known constant.
        from Crypto.Cipher import DES
        from binascii import b2a_hex

        chain = [b('\x94\x74\xB8\xE8\xC7\x3B\xCA\x7D')]

        for step in range(16):
            current = chain[step]
            cipher = DES.new(current, DES.MODE_ECB)
            if step & 1:    # (num&1) is 1 for odd numbers
                chain.append(cipher.decrypt(current))   # odd
            else:
                chain.append(cipher.encrypt(current))   # even

        self.assertEqual(b2a_hex(chain[16]),
            b2a_hex(b('\x1B\x1A\x2D\xDB\x4C\x64\x24\x38')))
项目:gr-satellites    作者:daniestevez    | 项目源码 | 文件源码
def handle_msg(self, msg_pmt):
        """Strip the 3-byte header from an incoming PDU and republish the rest.

        The payload (cdr) of `msg_pmt` must be a u8vector.  Packets of 3 bytes
        or fewer are dropped silently.  In verbose mode the first two bytes
        are printed as the spacecraft ID and the third byte is reported as the
        packet type.  The remaining bytes are published on the 'out' port.
        """
        msg = pmt.cdr(msg_pmt)
        if not pmt.is_u8vector(msg):
            # Single-argument print(...) behaves the same on Python 2 and 3.
            print("[ERROR] Received invalid message type. Expected u8vector")
            return
        packet = bytearray(pmt.u8vector_elements(msg))

        if len(packet) <= 3:
            return

        if self.verbose:
            # Decode so Python 3 prints plain hex digits rather than b'...'.
            spacecraft_id = binascii.b2a_hex(packet[:2]).decode('ascii')
            print('Spacecraft ID %s' % spacecraft_id)
            if packet[2] == 0x50:
                print('CSP downlink, protocol version 0')
            else:
                print('Unknown packet type')

        data = packet[3:]
        self.message_port_pub(pmt.intern('out'),
                                  pmt.cons(pmt.PMT_NIL,
                                           pmt.init_u8vector(len(data), data)))
项目:aws-cfn-plex    作者:lordmuffin    | 项目源码 | 文件源码
def test_FortunaPool(self):
        """FortunaAccumulator.FortunaPool"""
        pool = FortunaAccumulator.FortunaPool()

        # A fresh pool is empty and has a fixed, known digest.
        empty_hexdigest = "5df6e0e2761359d30a8275058e299fcc0381534545f55cf43e41983f5d4c9456"
        self.assertEqual(0, pool.length)
        self.assertEqual(empty_hexdigest, pool.hexdigest())

        # After appending b'abc'.
        pool.append(b('abc'))
        abc_hexdigest = "4f8b42c22dd3729b519ba6f68d2da7cc5b2d606d05daed5ad5128cc03e6c6358"
        self.assertEqual(3, pool.length)
        self.assertEqual(abc_hexdigest, pool.hexdigest())

        # Appending accumulates: 3 + 53 bytes = 56 bytes in the pool.
        pool.append(b("dbcdecdefdefgefghfghighijhijkijkljklmklmnlmnomnopnopq"))
        accumulated_digest = b('0cffe17f68954dac3a84fb1458bd5ec99209449749b2b308b7cb55812f9563af')
        self.assertEqual(56, pool.length)
        self.assertEqual(accumulated_digest, b2a_hex(pool.digest()))

        # reset() empties the pool again.
        pool.reset()
        self.assertEqual(0, pool.length)

        # One million 'a' bytes appended in a single call.
        pool.append(b('a') * 10**6)
        million_a_digest = b('80d1189477563e1b5206b2749f1afe4807e5705e8bd77887a60187a712156688')
        self.assertEqual(10**6, pool.length)
        self.assertEqual(million_a_digest, b2a_hex(pool.digest()))
项目:aws-cfn-plex    作者:lordmuffin    | 项目源码 | 文件源码
def runTest(self):
        # Encrypt the known plaintext and decrypt the known ciphertext twice
        # each, always with a freshly created cipher, and compare the
        # hex-encoded results against the test vector.
        plaintext = a2b_hex(self.plaintext)
        ciphertext = a2b_hex(self.ciphertext)

        encrypted = []
        decrypted = []
        for _ in range(2):
            encrypted.append(b2a_hex(self._new().encrypt(plaintext)))
            decrypted.append(b2a_hex(self._new(1).decrypt(ciphertext)))

        openpgp = (hasattr(self.module, "MODE_OPENPGP")
                   and self.mode == self.module.MODE_OPENPGP)
        if openpgp:
            # In PGP mode, data returned by the first encrypt()
            # is prefixed with the encrypted IV.
            # Here we check it and then remove it from the ciphertexts.
            eilen = len(self.encrypted_iv)
            for ct in encrypted:
                self.assertEqual(self.encrypted_iv, ct[:eilen])
            encrypted = [ct[eilen:] for ct in encrypted]

        for ct in encrypted:    # encrypt (first and second time)
            self.assertEqual(self.ciphertext, ct)
        for pt in decrypted:    # decrypt (first and second time)
            self.assertEqual(self.plaintext, pt)
项目:aws-cfn-plex    作者:lordmuffin    | 项目源码 | 文件源码
def runTest(self):
        # The cipher should work like a stream cipher: feeding the data in
        # 3-byte pieces must give the same result as the test vector.
        plaintext = a2b_hex(self.plaintext)
        ciphertext = a2b_hex(self.ciphertext)
        step = 3

        # Test counter mode encryption, 3 bytes at a time
        cipher = self._new()
        ct_pieces = [cipher.encrypt(plaintext[pos:pos+step])
                     for pos in range(0, len(plaintext), step)]
        ct3 = b2a_hex(b("").join(ct_pieces))
        self.assertEqual(self.ciphertext, ct3)  # encryption (3 bytes at a time)

        # Test counter mode decryption, 3 bytes at a time.
        # NOTE(review): this pass also calls encrypt(), not decrypt() --
        # presumably relying on both being the same operation in this mode;
        # confirm before changing.
        cipher = self._new()
        pt_pieces = [cipher.encrypt(ciphertext[pos:pos+step])
                     for pos in range(0, len(ciphertext), step)]
        # PY3K: This is meant to be text, do not change to bytes (data)
        pt3 = b2a_hex(b("").join(pt_pieces))
        self.assertEqual(self.plaintext, pt3)  # decryption (3 bytes at a time)
项目:aws-cfn-plex    作者:lordmuffin    | 项目源码 | 文件源码
def runTest(self):
        # Chained DES check: every value is used both as the key and as the
        # data of the next step, alternating encrypt (even steps) and decrypt
        # (odd steps); the 16th result must match a known constant.
        from Crypto.Cipher import DES
        from binascii import b2a_hex

        chain = [b('\x94\x74\xB8\xE8\xC7\x3B\xCA\x7D')]

        for step in range(16):
            current = chain[step]
            cipher = DES.new(current, DES.MODE_ECB)
            if step & 1:    # (num&1) is 1 for odd numbers
                chain.append(cipher.decrypt(current))   # odd
            else:
                chain.append(cipher.encrypt(current))   # even

        self.assertEqual(b2a_hex(chain[16]),
            b2a_hex(b('\x1B\x1A\x2D\xDB\x4C\x64\x24\x38')))
项目:tidb-ansible    作者:pingcap    | 项目源码 | 文件源码
def _convert_host_to_hex(host):
    """
    Convert the provided host to the format in /proc/net/tcp*

    /proc/net/tcp uses little-endian four byte hex for ipv4
    /proc/net/tcp6 uses little-endian per 4B word for ipv6

    Args:
        host: String with either hostname, IPv4, or IPv6 address

    Returns:
        List of tuples containing address family and the
        little-endian converted host
    """
    ips = []
    if host is not None:
        for family, ip in _convert_host_to_ip(host):
            hexip_nf = binascii.b2a_hex(socket.inet_pton(family, ip))
            hexip_hf = ""
            for i in range(0, len(hexip_nf), 8):
                ipgroup_nf = hexip_nf[i:i+8]
                ipgroup_hf = socket.ntohl(int(ipgroup_nf, base=16))
                hexip_hf = "%s%08X" % (hexip_hf, ipgroup_hf)
            ips.append((family, hexip_hf))
    return ips
项目:WPA-Slowed-Down    作者:JarrettR    | 项目源码 | 文件源码
def run(self, objHmac, mk, ssid, fast=False):
        """
        Derive the 64-hex-digit key from `mk` and `ssid` by iterated HMAC.

        Two blocks are computed, seeded with `ssid` followed by the 4-byte
        block counters 1 and 2.  Each block is the XOR of the seed round and
        4095 further rounds, every round feeding the previous round's output
        back into objHmac.load().  The hex encodings of both blocks are
        concatenated and cut to the first 64 characters.

        Args:
            objHmac: object whose load(key, data, fast) returns a hex digest.
            mk: key handed to every load() call.
            ssid: salt; the block counter is appended to it.
            fast: passed through unchanged to load().

        Returns:
            The first 64 hex characters of block 1 + block 2.
        """
        # hashlib.pbkdf2_hmac needs Python 2.7.8 or later (Ubuntu 14.04 LTS
        # generally only updates to 2.7.6), hence the manual loop.

        # Keep each round's output in binary form so it is decoded only once.
        u1 = a2b_hex(objHmac.load(mk, ssid + '\0\0\0\1', fast))
        u2 = a2b_hex(objHmac.load(mk, ssid + '\0\0\0\2', fast))

        f1 = u1
        f2 = u2

        # range() instead of xrange() so this also runs on Python 3.
        for _ in range(4095):
            u1 = a2b_hex(objHmac.load(mk, u1, fast))
            u2 = a2b_hex(objHmac.load(mk, u2, fast))

            f1 = self.xorString(u1, f1)
            f2 = self.xorString(u2, f2)

        out = b2a_hex(f1) + b2a_hex(f2)
        return out[0:64]
项目:git_intgrtn_aws_s3    作者:droidlabour    | 项目源码 | 文件源码
def test_FortunaPool(self):
        """FortunaAccumulator.FortunaPool"""
        pool = FortunaAccumulator.FortunaPool()

        # A fresh pool is empty and has a fixed, known digest.
        empty_hexdigest = "5df6e0e2761359d30a8275058e299fcc0381534545f55cf43e41983f5d4c9456"
        self.assertEqual(0, pool.length)
        self.assertEqual(empty_hexdigest, pool.hexdigest())

        # After appending b'abc'.
        pool.append(b('abc'))
        abc_hexdigest = "4f8b42c22dd3729b519ba6f68d2da7cc5b2d606d05daed5ad5128cc03e6c6358"
        self.assertEqual(3, pool.length)
        self.assertEqual(abc_hexdigest, pool.hexdigest())

        # Appending accumulates: 3 + 53 bytes = 56 bytes in the pool.
        pool.append(b("dbcdecdefdefgefghfghighijhijkijkljklmklmnlmnomnopnopq"))
        accumulated_digest = b('0cffe17f68954dac3a84fb1458bd5ec99209449749b2b308b7cb55812f9563af')
        self.assertEqual(56, pool.length)
        self.assertEqual(accumulated_digest, b2a_hex(pool.digest()))

        # reset() empties the pool again.
        pool.reset()
        self.assertEqual(0, pool.length)

        # One million 'a' bytes appended in a single call.
        pool.append(b('a') * 10**6)
        million_a_digest = b('80d1189477563e1b5206b2749f1afe4807e5705e8bd77887a60187a712156688')
        self.assertEqual(10**6, pool.length)
        self.assertEqual(million_a_digest, b2a_hex(pool.digest()))
项目:git_intgrtn_aws_s3    作者:droidlabour    | 项目源码 | 文件源码
def runTest(self):
        # Encrypt the known plaintext and decrypt the known ciphertext twice
        # each, always with a freshly created cipher, and compare the
        # hex-encoded results against the test vector.
        plaintext = a2b_hex(self.plaintext)
        ciphertext = a2b_hex(self.ciphertext)

        encrypted = []
        decrypted = []
        for _ in range(2):
            encrypted.append(b2a_hex(self._new().encrypt(plaintext)))
            decrypted.append(b2a_hex(self._new(1).decrypt(ciphertext)))

        openpgp = (hasattr(self.module, "MODE_OPENPGP")
                   and self.mode == self.module.MODE_OPENPGP)
        if openpgp:
            # In PGP mode, data returned by the first encrypt()
            # is prefixed with the encrypted IV.
            # Here we check it and then remove it from the ciphertexts.
            eilen = len(self.encrypted_iv)
            for ct in encrypted:
                self.assertEqual(self.encrypted_iv, ct[:eilen])
            encrypted = [ct[eilen:] for ct in encrypted]

        for ct in encrypted:    # encrypt (first and second time)
            self.assertEqual(self.ciphertext, ct)
        for pt in decrypted:    # decrypt (first and second time)
            self.assertEqual(self.plaintext, pt)
项目:git_intgrtn_aws_s3    作者:droidlabour    | 项目源码 | 文件源码
def runTest(self):
        # The cipher should work like a stream cipher: feeding the data in
        # 3-byte pieces must give the same result as the test vector.
        plaintext = a2b_hex(self.plaintext)
        ciphertext = a2b_hex(self.ciphertext)
        step = 3

        # Test counter mode encryption, 3 bytes at a time
        cipher = self._new()
        ct_pieces = [cipher.encrypt(plaintext[pos:pos+step])
                     for pos in range(0, len(plaintext), step)]
        ct3 = b2a_hex(b("").join(ct_pieces))
        self.assertEqual(self.ciphertext, ct3)  # encryption (3 bytes at a time)

        # Test counter mode decryption, 3 bytes at a time.
        # NOTE(review): this pass also calls encrypt(), not decrypt() --
        # presumably relying on both being the same operation in this mode;
        # confirm before changing.
        cipher = self._new()
        pt_pieces = [cipher.encrypt(ciphertext[pos:pos+step])
                     for pos in range(0, len(ciphertext), step)]
        # PY3K: This is meant to be text, do not change to bytes (data)
        pt3 = b2a_hex(b("").join(pt_pieces))
        self.assertEqual(self.plaintext, pt3)  # decryption (3 bytes at a time)
项目:git_intgrtn_aws_s3    作者:droidlabour    | 项目源码 | 文件源码
def runTest(self):
        # Chained DES check: every value is used both as the key and as the
        # data of the next step, alternating encrypt (even steps) and decrypt
        # (odd steps); the 16th result must match a known constant.
        from Crypto.Cipher import DES
        from binascii import b2a_hex

        chain = [b('\x94\x74\xB8\xE8\xC7\x3B\xCA\x7D')]

        for step in range(16):
            current = chain[step]
            cipher = DES.new(current, DES.MODE_ECB)
            if step & 1:    # (num&1) is 1 for odd numbers
                chain.append(cipher.decrypt(current))   # odd
            else:
                chain.append(cipher.encrypt(current))   # even

        self.assertEqual(b2a_hex(chain[16]),
            b2a_hex(b('\x1B\x1A\x2D\xDB\x4C\x64\x24\x38')))
项目:hologram-python    作者:hologram-io    | 项目源码 | 文件源码
def request_hex_nonce(self):
        """
        Request a nonce over the send socket and return it hex-encoded.

        Opens the send socket, sends the nonce request payload built by
        self.authentication, then reads up to 32 bytes of reply.

        Returns:
            The received bytes encoded with binascii.b2a_hex().

        Raises:
            HologramError: when nothing (None) was received.
        """
        self.open_send_socket()

        # build nonce request payload string
        nonce_request = self.authentication.buildNonceRequestPayloadString()

        self.logger.debug("Sending nonce request with body of length %d", len(nonce_request))
        self.logger.debug('Send: %s', nonce_request)

        self.sock.send(nonce_request)
        self.logger.debug('Nonce request sent.')

        resultbuf = self.receive_send_socket(max_receive_bytes=32)

        # Validate BEFORE hex-encoding: b2a_hex() never returns None, so
        # checking its result could not catch a failed receive (a None buffer
        # made b2a_hex raise TypeError instead of the intended HologramError).
        if resultbuf is None:
            raise HologramError('Internal nonce error')

        return binascii.b2a_hex(resultbuf)
项目:wizard    作者:honor100    | 项目源码 | 文件源码
def encrypt(self,text):
        # Encrypt `text` with AES and return the ciphertext as a hex string.
        # The raw ciphertext is also kept in self.ciphertext.
        cryptor = AES.new(self.key,self.mode,b'0000000000000000')
        # The key must be 16 (AES-128), 24 (AES-192) or 32 (AES-256) bytes
        # long; AES-128 is sufficient here.
        block = 16
        size = len(text)
        if size != block:
            # Pad with NUL ('\0') characters up to the next multiple of the
            # block size.  Inputs longer than one block that are already
            # aligned still receive a full extra block; an input of exactly
            # one block is left as is.
            text = text + ('\0' * (block - (size % block)))
        self.ciphertext = cryptor.encrypt(text)
        # The AES output is not necessarily ASCII, which can cause trouble
        # when printing or storing it, so it is returned hex-encoded.
        return b2a_hex(self.ciphertext)

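    # NOTE(review): original comment was mis-encoded; it appears to say that
    # after decryption the padding is removed with strip().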
项目:plugin.video.lastship    作者:lastship    | 项目源码 | 文件源码
def yandex(url):
    """
    Resolve a Yandex.Disk share page URL to its direct file URL.

    The share page is fetched to obtain a session cookie plus the "sk" and
    "id" values embedded in it; these are posted to the
    ``do-get-resource-url`` model endpoint, whose JSON answer carries the
    file URL.

    Returns:
        The resolved URL, or None when any step fails (best effort).
    """
    try:
        cookie = client.request(url, output='cookie')

        r = client.request(url, cookie=cookie)
        # Collapse every run of non-ASCII characters into a single space.
        r = re.sub(r'[^\x00-\x7F]+', ' ', r)

        # Raw strings keep "\s" a regex escape rather than an invalid
        # string escape.
        sk = re.findall(r'"sk"\s*:\s*"([^"]+)', r)[0]

        idstring = re.findall(r'"id"\s*:\s*"([^"]+)', r)[0]

        # Random 16-byte client id, hex encoded.
        idclient = binascii.b2a_hex(os.urandom(16))

        post = {'idClient': idclient, 'version': '3.9.2', 'sk': sk, '_model.0': 'do-get-resource-url', 'id.0': idstring}
        post = urllib.urlencode(post)

        r = client.request('https://yadi.sk/models/?_m=do-get-resource-url', post=post, cookie=cookie)
        r = json.loads(r)

        return r['models'][0]['data']['file']
    except Exception:
        # Deliberate best effort: any failure means "could not resolve".
        # Unlike a bare except, KeyboardInterrupt/SystemExit still propagate.
        return None
项目:sphero_sprk    作者:CMU-ARM    | 项目源码 | 文件源码
def connect(self):
        """
        Connect to the sphero whose address was given to the constructor.

        Sets up the notification delegate, switches the device into developer
        mode, indexes the command service's characteristics by hex UUID and
        finally starts the listening thread.
        """
        self._device = bluepy.btle.Peripheral(self._addr, addrType=bluepy.btle.ADDR_TYPE_RANDOM)
        # Register the delegate that will receive notifications.
        self._notifier = DelegateObj(self, self._notification_lock)
        self._device.withDelegate(self._notifier)

        self._devModeOn()
        self._connected = True #Might need to change to be a callback format

        # Map "hex uuid string" -> characteristic for the command service.
        cmd_service = self._device.getServiceByUUID(RobotControlService)
        self._cmd_characteristics = {
            binascii.b2a_hex(ch.uuid.binVal).decode('utf-8'): ch
            for ch in cmd_service.getCharacteristics()
        }

        self._listening_flag = True
        listener = threading.Thread(target=self._listening_loop)
        self._listening_thread = listener
        listener.start()
项目:sphero_sprk    作者:CMU-ARM    | 项目源码 | 文件源码
def _devModeOn(self):
    """
    Run the write sequence that enables developer mode.

    Looks up the BLE service's characteristics by hex UUID string and
    writes, in order, to the anti-DoS, TX-power and wake characteristics.
    """
    ble_service = self._device.getServiceByUUID(BLEService)
    # Index the characteristics by their hex-encoded UUID.
    by_uuid = {
        binascii.b2a_hex(char.uuid.binVal).decode('utf-8'): char
        for char in ble_service.getCharacteristics()
    }

    # (characteristic key, payload) pairs, written in this exact order.
    write_sequence = (
        (AntiDosCharacteristic, "011i3".encode()),
        (TXPowerCharacteristic, (7).to_bytes(1, 'big')),
        (WakeCharacteristic, (1).to_bytes(1, 'big')),
    )
    for uuid_key, payload in write_sequence:
        by_uuid[uuid_key].write(payload, True)
项目:MCSManager-fsmodule    作者:Suwings    | 项目源码 | 文件源码
def test_FortunaPool(self):
    """FortunaAccumulator.FortunaPool"""
    # Expected digests after each stage of feeding the pool.
    empty_hex = "5df6e0e2761359d30a8275058e299fcc0381534545f55cf43e41983f5d4c9456"
    abc_hex = "4f8b42c22dd3729b519ba6f68d2da7cc5b2d606d05daed5ad5128cc03e6c6358"
    long_hex = b('0cffe17f68954dac3a84fb1458bd5ec99209449749b2b308b7cb55812f9563af')
    million_a_hex = b('80d1189477563e1b5206b2749f1afe4807e5705e8bd77887a60187a712156688')

    fortuna_pool = FortunaAccumulator.FortunaPool()

    # A fresh pool is empty.
    self.assertEqual(0, fortuna_pool.length)
    self.assertEqual(empty_hex, fortuna_pool.hexdigest())

    # First append: three bytes.
    fortuna_pool.append(b('abc'))
    self.assertEqual(3, fortuna_pool.length)
    self.assertEqual(abc_hex, fortuna_pool.hexdigest())

    # Second append brings the total length to 56 bytes.
    fortuna_pool.append(b("dbcdecdefdefgefghfghighijhijkijkljklmklmnlmnomnopnopq"))
    self.assertEqual(56, fortuna_pool.length)
    self.assertEqual(long_hex, b2a_hex(fortuna_pool.digest()))

    # reset() returns the pool to zero length.
    fortuna_pool.reset()
    self.assertEqual(0, fortuna_pool.length)

    # One million 'a' bytes appended in a single call.
    million = 10**6
    fortuna_pool.append(b('a') * million)
    self.assertEqual(million, fortuna_pool.length)
    self.assertEqual(million_a_hex, b2a_hex(fortuna_pool.digest()))
项目:MCSManager-fsmodule    作者:Suwings    | 项目源码 | 文件源码
def runTest(self):
    # Encrypts the known plaintext and decrypts the known ciphertext twice,
    # each time on a fresh cipher object, and compares the hex-encoded
    # results with the expected vectors.
    plain_bytes = a2b_hex(self.plaintext)
    cipher_bytes = a2b_hex(self.ciphertext)

    # Two passes; within each pass encryption runs before decryption.
    passes = []
    for _ in range(2):
        encrypted = b2a_hex(self._new().encrypt(plain_bytes))
        decrypted = b2a_hex(self._new(1).decrypt(cipher_bytes))
        passes.append((encrypted, decrypted))
    (ct1, pt1), (ct2, pt2) = passes

    cipher_module = self.module
    if hasattr(cipher_module, "MODE_OPENPGP") and self.mode == cipher_module.MODE_OPENPGP:
        # In PGP mode, data returned by the first encrypt()
        # is prefixed with the encrypted IV.
        # Verify that prefix, then drop it from both ciphertexts.
        iv_hex = self.encrypted_iv
        iv_len = len(iv_hex)
        self.assertEqual(iv_hex, ct1[:iv_len])
        self.assertEqual(iv_hex, ct2[:iv_len])
        ct1, ct2 = ct1[iv_len:], ct2[iv_len:]

    self.assertEqual(self.ciphertext, ct1)  # encrypt
    self.assertEqual(self.ciphertext, ct2)  # encrypt (second time)
    self.assertEqual(self.plaintext, pt1)   # decrypt
    self.assertEqual(self.plaintext, pt2)   # decrypt (second time)
项目:MCSManager-fsmodule    作者:Suwings    | 项目源码 | 文件源码
def runTest(self):
    # The cipher should work like a stream cipher: feeding the data in
    # 3-byte pieces must give the same result as the reference vectors.
    plain_bytes = a2b_hex(self.plaintext)
    cipher_bytes = a2b_hex(self.ciphertext)

    # Counter mode encryption, 3 bytes at a time
    enc = self._new()
    enc_pieces = [enc.encrypt(plain_bytes[pos:pos+3])
                  for pos in range(0, len(plain_bytes), 3)]
    ct3 = b2a_hex(b("").join(enc_pieces))
    self.assertEqual(self.ciphertext, ct3)  # encryption (3 bytes at a time)

    # Counter mode decryption, 3 bytes at a time.
    # encrypt() is called on the ciphertext here, exactly as before.
    dec = self._new()
    dec_pieces = [dec.encrypt(cipher_bytes[pos:pos+3])
                  for pos in range(0, len(cipher_bytes), 3)]
    # PY3K: This is meant to be text, do not change to bytes (data)
    pt3 = b2a_hex(b("").join(dec_pieces))
    self.assertEqual(self.plaintext, pt3)  # decryption (3 bytes at a time)
项目:MCSManager-fsmodule    作者:Suwings    | 项目源码 | 文件源码
def runTest(self):
    # Chains 16 DES-ECB operations: each round uses the current 8-byte
    # value as both key and data, encrypting on even rounds and decrypting
    # on odd rounds, then checks the final value against a known result.
    from Crypto.Cipher import DES
    from binascii import b2a_hex

    value = b('\x94\x74\xB8\xE8\xC7\x3B\xCA\x7D')

    for round_no in range(16):
        des = DES.new(value, DES.MODE_ECB)
        if round_no % 2 == 0:
            value = des.encrypt(value)  # even round
        else:
            value = des.decrypt(value)  # odd round

    expected = b('\x1B\x1A\x2D\xDB\x4C\x64\x24\x38')
    self.assertEqual(b2a_hex(value), b2a_hex(expected))
项目:geekcloud    作者:GeekCloud-Team    | 项目源码 | 文件源码
def encrypt(self, passwd=None, length=32):
    """
    Encrypt a password with AES and return the ciphertext hex-encoded.

    When ``passwd`` is falsy a password is obtained from
    ``self.gen_rand_pass()``. The password is padded with NUL characters
    up to the next multiple of ``length``; an already aligned password
    receives a full extra ``length`` characters of padding.

    Raises:
        ServerError: if ``len(passwd)`` raises TypeError.
    """
    passwd = passwd or self.gen_rand_pass()

    aes = AES.new(self.key, self.mode, b'8122ca7d906ad5e1')
    try:
        size = len(passwd)
    except TypeError:
        raise ServerError('Encrypt password error, TYpe error.')

    pad_len = length - size % length
    padded = passwd + '\0' * pad_len
    return b2a_hex(aes.encrypt(padded))
项目:DevOps    作者:YoLoveLife    | 项目源码 | 文件源码
def _convert_host_to_hex(host):
    """
    Convert the provided host to the hex format used in /proc/net/tcp*.

    Every address returned by ``_convert_host_to_ip`` is packed with
    ``socket.inet_pton``, hex-encoded, and then each 8-hex-digit (4-byte)
    word is passed through ``socket.ntohl`` and re-rendered as eight
    upper-case hex digits. An IPv4 address therefore yields one word and
    an IPv6 address four.

    Args:
        host: String with either hostname, IPv4, or IPv6 address, or None.

    Returns:
        List of ``(address family, converted hex string)`` tuples; an
        empty list when ``host`` is None.
    """
    if host is None:
        return []

    converted = []
    for family, ip in _convert_host_to_ip(host):
        packed_hex = binascii.b2a_hex(socket.inet_pton(family, ip))
        words = (packed_hex[pos:pos + 8] for pos in range(0, len(packed_hex), 8))
        host_order = "".join(
            "%08X" % socket.ntohl(int(word, base=16)) for word in words
        )
        converted.append((family, host_order))
    return converted
项目:slidoc    作者:mitotic    | 项目源码 | 文件源码
def oauth_request_parameters(consumer_token, url, access_token, parameters={},
                             method="GET", oauth_version="1.0a",
                             override_version=""):
    """Build the OAuth parameters, including the signature, for a request.

    The signature is computed over the base OAuth arguments merged with
    ``parameters``. The returned dict holds only the base arguments plus
    ``oauth_signature``. ``override_version``, when non-empty, replaces
    ``oauth_version`` in the emitted ``oauth_version`` field but does not
    affect which signature routine is used.
    """
    base_args = {
        "oauth_consumer_key": consumer_token["key"],
        "oauth_token": access_token["key"],
        "oauth_signature_method": "HMAC-SHA1",
        "oauth_timestamp": str(int(time.time())),
        "oauth_nonce": binascii.b2a_hex(uuid.uuid4().bytes),
        "oauth_version": override_version or oauth_version,
    }

    # Sign over a copy so the caller's parameters never leak into the result.
    signing_args = dict(base_args)
    signing_args.update(parameters)

    if oauth_version == "1.0a":
        sign = tornado.auth._oauth10a_signature
    else:
        sign = tornado.auth._oauth_signature

    base_args["oauth_signature"] = sign(consumer_token, method, url, signing_args, access_token)
    return base_args
项目:tests    作者:open-power-host-os    | 项目源码 | 文件源码
def __init__(self, name, resultdir, vt_type, test=None, mux=None):
    """
    Initialise the record from a name of the form ``<type>_<rest>``.

    ``type`` is the part of ``name`` before the first underscore and
    ``shortname`` is everything after it. A random 40-hex-digit ``id`` is
    generated. ``vt_type`` is kept only when the type is ``'guest'``;
    otherwise it is set to None.
    """
    self.id = binascii.b2a_hex(os.urandom(20))
    self.name = str(name)
    # Everything after the first underscore.
    self.shortname = self.name.partition('_')[2]
    self.job_dir = None
    # Everything before the first underscore.
    self.type = str(name.partition('_')[0])
    self.resultdir = resultdir
    self.conf = None
    self.test = test
    self.mux = mux
    self.run = "Not_Run"
    self.runsummary = None
    self.runlink = None
    self.vt_type = vt_type if self.type == 'guest' else None
项目:fewsn-zigbee-base-station    作者:sethmbaker    | 项目源码 | 文件源码
def parse_xbee_rf_data(data):
    """
    Extract sensor readings from an XBee frame and emit one payload each.

    The node address is the hex of ``source_addr_long`` followed by the hex
    of ``source_addr``. Every ``{...}`` fragment in ``rf_data`` is parsed as
    JSON; for each key/value pair the payload's ``sensor`` and ``val`` are
    set, the payload is printed, and it is posted when PRODUCTION is set.
    """
    if DEBUG:
        print(data)

    node_addr = binascii.b2a_hex(data['source_addr_long']) + binascii.b2a_hex((data['source_addr']))
    # Each match is one "{...}" fragment holding key/value pairs.
    fragments = re.findall("{.*?}", data['rf_data'])
    # A single payload dict is reused and overwritten for every reading.
    payload = {'node': node_addr}

    for fragment in fragments:
        for key, value in json.loads(fragment).iteritems():
            sensor_name = key.encode('utf-8')
            sensor_value = value.encode('utf-8')
            # Normalise the one key spelled with a trailing colon.
            if sensor_name == "temp:":
                sensor_name = "temp"
            payload.update(sensor=sensor_name, val=sensor_value)
            # Print the payload, then send it when running in production.
            print(payload)
            if PRODUCTION:
                sendHTTPPost(payload)
项目:pbtranscript    作者:PacificBiosciences    | 项目源码 | 文件源码
def get_sample_name(input_sample_name):
    """
    Return a sample name made only of letters and digits.

    Every character of ``input_sample_name`` that is neither alphabetic nor
    a digit (underscores, spaces, punctuation, ...) is dropped. If nothing
    remains, or the input is None or the string "None", a random name of
    the form ``sample`` + 6 hex digits is returned instead.

    Args:
        input_sample_name: Requested name; any object, converted with str().

    Returns:
        The cleaned name, or a random ``sampleXXXXXX`` name as a str.
    """
    if input_sample_name is None or str(input_sample_name) == "None":
        cleaned = ""
    else:
        cleaned = "".join(ch for ch in str(input_sample_name)
                          if ch.isalpha() or ch.isdigit())

    if cleaned:
        return cleaned

    import binascii
    # b2a_hex() returns bytes on Python 3; decode before concatenating so
    # this works on both Python 2 and 3 instead of raising TypeError.
    suffix = binascii.b2a_hex(os.urandom(3)).decode("ascii")
    return str("sample" + suffix)
项目:RaiWalletBot    作者:SergiySW    | 项目源码 | 文件源码
def seed_callback(bot, update, args):
    """
    Show the user's seed, creating one on first use.

    A missing seed (``mysql_select_seed`` returning False) is replaced by
    16 random upper-case hex digits and stored. The seed is displayed as
    four dash-separated groups of four characters. If the user has a
    password set, the seed is shown only when ``args[0]`` passes
    ``password_check``; otherwise a password error is sent.
    """
    user_id = update.message.from_user.id
    chat_id = update.message.chat_id
    lang_id = mysql_select_language(user_id)

    seed = mysql_select_seed(user_id)
    if seed is False:
        # NOTE(review): on Python 3 b2a_hex returns bytes - confirm the
        # str concatenation below is only ever reached with text.
        seed = binascii.b2a_hex(os.urandom(8)).upper()
        mysql_set_seed(user_id, seed)

    groups = [seed[pos:pos+4] for pos in range(0, len(seed), 4)]
    seed_text = groups[0] + '-' + groups[1] + '-' + groups[2] + '-' + groups[3]

    check = mysql_check_password(user_id)
    if check is False:
        # No password configured: show the seed straight away.
        message_markdown(bot, chat_id, lang_text('seed_creation', lang_id).format(seed_text))
        return

    if len(args) > 0:
        supplied = password_check(update, args[0])
        if check == supplied:
            message_markdown(bot, chat_id, lang_text('seed_creation', lang_id).format(seed_text))
            return

    # Password required but missing or wrong.
    text_reply(update, lang_text('password_error', lang_id))
项目:ekko    作者:openstack    | 项目源码 | 文件源码
def read_body(self, f, body_checksum):
    """
    Read 32-byte segment records from ``f`` into ``self.segments``.

    Each record is unpacked with the little-endian format ``<2IH2B20s``
    into segment, incremental, base, encryption, compression and a 20-byte
    SHA-1. The metadata dict (with the SHA-1 hex-encoded as ``sha1_hash``)
    is stored under the segment value. Reading stops at the first read
    that returns fewer than 32 bytes.

    Args:
        f: Binary file-like object positioned at the first record.
        body_checksum: Expected running CRC-32 over all records read.

    Raises:
        Exception: if the computed CRC-32 differs from ``body_checksum``.
    """
    checksum = 0

    data = f.read(32)
    while len(data) == 32:
        # One tuple-unpacking assignment. Written as separate lines, each
        # name would be its own expression statement and ``sha1`` would
        # receive the whole tuple.
        (segment,
         incremental,
         base,
         encryption,
         compression,
         sha1) = unpack('<2IH2B20s', data)

        self.segments[segment] = {
            'incremental': incremental,
            'base': base,
            'encryption': encryption,
            'compression': compression,
            'sha1_hash': b2a_hex(sha1),
        }

        checksum = crc32(data, checksum)
        data = f.read(32)

    if checksum != body_checksum:
        raise Exception('Body checksum does not match')
项目:ekko    作者:openstack    | 项目源码 | 文件源码
def main():
    """Print the hex-encoded hash of every segment in a manifest.

    Loads the manifest driver named by the command-line arguments from the
    ``ekko.manifest.drivers`` namespace, then prints one line per segment.
    """
    args = parse_args()
    manager = driver.DriverManager(
        namespace='ekko.manifest.drivers',
        name=args.driver,
        invoke_on_load=True,
        invoke_args=[args.manifest]
    )
    manifest = manager.driver

    metadata = manifest.get_metadata()
    for segment in manifest.get_segments(metadata):
        segment_hex = binascii.b2a_hex(segment.segment_hash)
        print(segment_hex)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def hex_encode(input, errors='strict'):
    """Hex-encode ``input`` and return ``(encoded, length of input)``.

    Only the 'strict' error mode is accepted; any other value fails the
    assertion.
    """
    assert errors == 'strict'
    encoded = binascii.b2a_hex(input)
    consumed = len(input)
    return (encoded, consumed)