我们从 Python 开源项目中，提取了以下 50 个代码示例，用于说明如何使用 binascii.a2b_hex()。
def test_ttag_info_packet(self):
    """Check that each ttag validity flag toggles its own bit in a packet header.

    A fresh packet must serialize its ttag info word as all zeros. Setting
    msv, ttv or sscv on its own must yield 0x80000000, 0x40000000 and
    0x20000000 respectively, with the matching getter reporting 1.
    """
    pkt = sdds_pkt.sdds_packet()

    # Baseline: no flag set on a newly created packet.
    self.assertEqual(pkt.header.ttag.info.asString(),
                     binascii.a2b_hex('00000000'))
    self.assertEqual(pkt.get_msv(), 0)

    flag_cases = (
        ('80000000', pkt.set_msv, pkt.get_msv),
        ('40000000', pkt.set_ttv, pkt.get_ttv),
        ('20000000', pkt.set_sscv, pkt.get_sscv),
    )
    for expected_hex, set_flag, get_flag in flag_cases:
        expected = binascii.a2b_hex(expected_hex)
        set_flag()
        self.assertEqual(pkt.header.ttag.info.asString(), expected)
        self.assertEqual(get_flag(), 1)
        # Clear the flag again so the next case starts from a clean word.
        set_flag(False)
def hex_decode(input,errors='strict'):
    """Turn a hex-encoded buffer back into raw binary data.

    input is handed straight to binascii.a2b_hex(), so it must be
    something that function accepts (a string or buffer of hex digits).

    errors names the error-handling scheme. Only 'strict' is supported;
    this is enforced with an assert.

    Returns a tuple (decoded data, length consumed), where the length
    consumed is len(input).
    """
    assert errors == 'strict'
    return (binascii.a2b_hex(input), len(input))
def mongodb(self, user, pass_):
    """Probe a MongoDB server for unauthenticated access.

    Connects to ``self.ip`` / ``self.port`` and sends a pre-built
    ``ismaster`` command on ``admin.$cmd``. If the reply mentions
    ``ismaster``, a pre-built ``getLog`` (``startupWarnings``) command is
    sent as well.

    ``user`` and ``pass_`` are accepted for interface compatibility but are
    not used by the probe.

    Returns:
        ``"unauthorized"`` when the getLog reply contains
        ``totalLinesWritten``; ``3`` when it does not, or when any error
        occurs; ``None`` when the first reply does not mention ``ismaster``.
    """
    s = None
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.connect((self.ip, int(self.port)))
        data = binascii.a2b_hex("3a000000a741000000000000d40700000000000061646d696e2e24636d640000000000ffffffff130000001069736d6173746572000100000000")
        s.send(data)
        result = s.recv(1024)
        # Byte literals: recv() returns bytes on Python 3, and b"..." is a
        # plain str on Python 2, so the membership test works on both.
        if b"ismaster" in result:
            getlog_data = binascii.a2b_hex("480000000200000000000000d40700000000000061646d696e2e24636d6400000000000100000021000000026765744c6f670010000000737461727475705761726e696e67730000")
            s.send(getlog_data)
            result = s.recv(1024)
            if b"totalLinesWritten" in result:
                return "unauthorized"
            else:
                return 3
        # NOTE(review): a reply without "ismaster" falls through and
        # returns None, as in the original control flow.
    except Exception:
        return 3
    finally:
        # Always release the socket; the original leaked it on every path.
        if s is not None:
            s.close()
def test_body_encoding(self):
    """Check how unicode and byte-string request bodies are sent to /echopost."""
    unicode_body = u("\xe9")
    byte_body = binascii.a2b_hex(b"e9")

    def echo_post(body, **extra):
        # Every request in this test is a POST with the same content type;
        # a fresh headers dict is built for each call.
        return self.fetch("/echopost", method="POST", body=body,
                          headers={"Content-Type": "application/blah"},
                          **extra)

    # A unicode body is converted to utf8, so one character becomes two bytes.
    response = echo_post(unicode_body)
    self.assertEqual(response.headers["Content-Length"], "2")
    self.assertEqual(response.body, utf8(unicode_body))

    # A byte-string body passes through untouched.
    response = echo_post(byte_body)
    self.assertEqual(response.headers["Content-Length"], "1")
    self.assertEqual(response.body, byte_body)

    # Mixing a unicode header value with a byte-string body shouldn't
    # break anything.
    response = echo_post(byte_body, user_agent=u("foo"))
    self.assertEqual(response.headers["Content-Length"], "1")
    self.assertEqual(response.body, byte_body)
def runTest(self):
    """Encrypt and decrypt the test vector twice and compare against the expected hex.

    Each operation uses a newly created cipher (self._new() for encryption,
    self._new(1) for decryption). Results are compared in hex form with
    self.ciphertext and self.plaintext.
    """
    raw_plaintext = a2b_hex(self.plaintext)
    raw_ciphertext = a2b_hex(self.ciphertext)

    encrypted = []
    decrypted = []
    # Two rounds, interleaved as encrypt, decrypt, encrypt, decrypt.
    for _ in range(2):
        encrypted.append(b2a_hex(self._new().encrypt(raw_plaintext)))
        decrypted.append(b2a_hex(self._new(1).decrypt(raw_ciphertext)))

    if hasattr(self.module, "MODE_OPENPGP"):
        if self.mode == self.module.MODE_OPENPGP:
            # In PGP mode, the data returned by the first encrypt() call is
            # prefixed with the encrypted IV. Verify that prefix, then strip
            # it from the ciphertexts before comparing.
            iv_len = len(self.encrypted_iv)
            for ct in encrypted:
                self.assertEqual(self.encrypted_iv, ct[:iv_len])
            encrypted = [ct[iv_len:] for ct in encrypted]

    for ct in encrypted:
        self.assertEqual(self.ciphertext, ct)   # encrypt (first, then second time)
    for pt in decrypted:
        self.assertEqual(self.plaintext, pt)    # decrypt (first, then second time)
def runTest(self):
    """Check that the cipher behaves like a stream cipher when fed 3 bytes at a time."""
    raw_plaintext = a2b_hex(self.plaintext)
    raw_ciphertext = a2b_hex(self.ciphertext)

    def process_in_triples(data):
        # Use a fresh cipher per pass and feed it 3-byte slices in order.
        cipher = self._new()
        pieces = [cipher.encrypt(data[start:start + 3])
                  for start in range(0, len(data), 3)]
        # PY3K: the hex result is meant to be text, do not change to bytes (data)
        return b2a_hex(b("").join(pieces))

    # Counter mode encryption, 3 bytes at a time.
    self.assertEqual(self.ciphertext, process_in_triples(raw_plaintext))

    # Counter mode decryption, 3 bytes at a time.
    # NOTE(review): the decryption pass also calls encrypt(), exactly as the
    # original did; presumably valid because the mode is symmetric - confirm.
    self.assertEqual(self.plaintext, process_in_triples(raw_ciphertext))
def _get_notification(self, token_hex, payload):
    """
    Builds the binary notification frame for a device token and a payload.

    token_hex is the token as a hex string. payload must provide a json()
    method; its result is appended to the frame as-is.

    The frame is returned to the caller (nothing is sent here) and is laid
    out as: zero byte, token length, token, payload length, payload. Both
    length fields come from APNs.packed_ushort_big_endian().
    """
    token_bin = a2b_hex(token_hex)
    token_length_bin = APNs.packed_ushort_big_endian(len(token_bin))
    payload_json = payload.json()
    payload_length_bin = APNs.packed_ushort_big_endian(len(payload_json))

    # On Python 2 a plain '\0' string concatenates with the packed fields;
    # on any other major version it is converted to a bytes object first.
    if sys.version_info[0] == 2:
        command = '\0'
    else:
        command = bytes('\0', 'utf-8')

    return (command + token_length_bin + token_bin
            + payload_length_bin + payload_json)
def run(self, objHmac, mk, ssid, fast=False):
    """Derive a 64-character hex key from mk and ssid by iterated HMAC.

    Two chains are run side by side. They are seeded with ssid followed by
    the 4-byte suffixes '\\0\\0\\0\\1' and '\\0\\0\\0\\2'. Each chain is fed
    through objHmac.load() 4095 more times, and every hex result is decoded
    and folded into that chain's accumulator with self.xorString().

    objHmac.load(mk, data, fast) must return a hex string, because each
    result is decoded with a2b_hex(). The fast flag is passed through to it
    unchanged.

    Returns the first 64 hex characters of the two accumulators, hex-encoded
    and concatenated.
    """
    # Original author's note: hashlib.pbkdf2_hmac('sha1', mk, ssid, 4095)
    # was considered for the fast path, but it requires Python 2.7.8 or
    # later and Ubuntu 14.04 LTS generally only updates to 2.7.6.
    # NOTE(review): this looks like a hand-rolled PBKDF2-HMAC - confirm
    # before swapping in the library call.
    hex_one = objHmac.load(mk, ssid + '\0\0\0\1', fast)
    hex_two = objHmac.load(mk, ssid + '\0\0\0\2', fast)
    acc_one = a2b_hex(hex_one)
    acc_two = a2b_hex(hex_two)

    for _ in xrange(4095):
        hex_one = objHmac.load(mk, a2b_hex(hex_one), fast)
        hex_two = objHmac.load(mk, a2b_hex(hex_two), fast)
        acc_one = self.xorString(a2b_hex(hex_one), acc_one)
        acc_two = self.xorString(a2b_hex(hex_two), acc_two)

    combined = b2a_hex(acc_one) + b2a_hex(acc_two)
    return combined[:64]
def Heuristics(self, data, noDecode=False):
    """Try to recover an 'http:' URL from data using simple decodings.

    Checks, in order:
      1. data already starts with 'http:' (case-insensitive): returned as is.
      2. data reversed starts with 'http:': the reversed string is returned.
      3. noDecode is true: data is returned unchanged.
      4. data decodes as hex: the decoded value is re-checked with
         noDecode=True, so only steps 1-3 apply to it.
      5. data consists only of [0-9a-zA-Z/=] and decodes as base64: the
         decoded value is re-checked the same way.

    If nothing applies, data is returned unchanged.
    """
    if data.lower().startswith('http:'):
        return data
    if data[::-1].lower().startswith('http:'):
        return data[::-1]
    if noDecode:
        return data
    try:
        decoded = binascii.a2b_hex(data)
        return self.Heuristics(decoded, True)
    except Exception:
        # Was a bare "except:", which also swallowed KeyboardInterrupt and
        # SystemExit. Any ordinary failure still falls back to base64.
        # NOTE(review): the pattern below does not admit '+', which is a
        # valid base64 character - confirm whether that is intentional.
        if not re.compile(r'^[0-9a-zA-Z/=]+$').match(data):
            return data
        try:
            decoded = binascii.a2b_base64(data)
            return self.Heuristics(decoded, True)
        except Exception:
            return data

# bruteforce XOR; short strings (< 10) are keys
def HexDecode(hexstream, options):
    """Decode a sequence of hex-string entries into one concatenated string.

    hexstream is an iterable, or None. Each entry is handled as follows:
      * a str of hex digits is padded to an even length and decoded with
        binascii.a2b_hex();
      * anything else contributes its element at index 1 unchanged.

    Padding depends on options.hexshift:
      * odd length:  '0' is prepended when hexshift is true, appended
        otherwise;
      * even length: '0' is added on both sides when hexshift is true,
        nothing is added otherwise.

    Returns '' when hexstream is None or empty.
    """
    if hexstream is None:
        return ''

    # Collect the pieces and join once instead of repeated string "+=".
    pieces = []
    for entry in hexstream:
        if isinstance(entry, str):
            if len(entry) % 2 == 1:
                if options.hexshift:
                    hexdata = '0' + entry
                else:
                    hexdata = entry + '0'
            elif options.hexshift:
                hexdata = '0' + entry + '0'
            else:
                hexdata = entry
            pieces.append(binascii.a2b_hex(hexdata))
        else:
            pieces.append(entry[1])
    return ''.join(pieces)
def __init__(self, tl_line, is_method=None):
    """Parse one TL definition line and initialise the base class from it.

    TL_line_regex supplies four groups: name (1), hex CRC (2), parameter
    text (3) and result (4). The CRC is left-padded with zeros to eight
    hex digits and read as a big-endian signed 32-bit integer. Parameters
    whose type is exactly 'Type}' are dropped.
    """
    match = TL_line_regex.match(tl_line)
    name, crc_hex, params_text, result = match.group(1, 2, 3, 4)

    crc_bytes = binascii.a2b_hex('{:>08}'.format(crc_hex))
    (crc,) = struct.unpack('>i', crc_bytes)

    params = []
    for param_name, param_type in TL_param_regex.findall(params_text):
        if param_type == 'Type}':
            continue
        params.append({'name': param_name, 'type': param_type})

    super().__init__(
        name=name,
        crc=crc,
        params=params,
        result=result,
        is_method=is_method
    )
def from_tl_line(cls, tl_line, is_method, tl_info, no_crc_verify=False):
    """Build an instance of cls from one TL definition line.

    cls.TL_line_regex supplies four groups: name (1), hex CRC (2),
    parameter text (3) and result (4). The CRC is left-padded with zeros to
    eight hex digits and read as a big-endian signed 32-bit integer. Each
    (name, flags, type) triple found by cls.TL_param_regex becomes a
    TLTokenParam. is_method, tl_info and no_crc_verify are forwarded to
    cls unchanged.
    """
    match = cls.TL_line_regex.match(tl_line)
    name, crc_hex, params_text, result = match.group(1, 2, 3, 4)

    crc_bytes = binascii.a2b_hex('{:>08}'.format(crc_hex))
    (crc,) = struct.unpack('>i', crc_bytes)

    params = []
    for param_name, param_flags, param_type in cls.TL_param_regex.findall(params_text):
        params.append(
            TLTokenParam(name=param_name, type=param_type, flags=param_flags)
        )

    return cls(
        name=name,
        crc=crc,
        params=params,
        result=result,
        is_method=is_method,
        tl_info=tl_info,
        no_crc_verify=no_crc_verify,
    )
def check(ip, port, timeout):
    """Probe a MongoDB server at ip:port for unauthenticated access.

    Sets the process-wide default socket timeout to ``timeout``, connects,
    and sends a pre-built ``ismaster`` command on ``admin.$cmd``. If the
    reply mentions ``ismaster``, a pre-built ``getLog``
    (``startupWarnings``) command is sent as well.

    Returns:
        ``u"?????"`` when the getLog reply contains ``totalLinesWritten``;
        ``None`` in every other case, including any error, because the
        probe is deliberately best-effort.
    """
    s = None
    try:
        socket.setdefaulttimeout(timeout)
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.connect((ip, int(port)))
        data = binascii.a2b_hex(
            "3a000000a741000000000000d40700000000000061646d696e2e24636d640000000000ffffffff130000001069736d6173746572000100000000")
        s.send(data)
        result = s.recv(1024)
        # Byte literals: recv() returns bytes on Python 3, and b"..." is a
        # plain str on Python 2, so the membership test works on both.
        if b"ismaster" in result:
            getlog_data = binascii.a2b_hex(
                "480000000200000000000000d40700000000000061646d696e2e24636d6400000000000100000021000000026765744c6f670010000000737461727475705761726e696e67730000")
            s.send(getlog_data)
            result = s.recv(1024)
            if b"totalLinesWritten" in result:
                # NOTE(review): this literal looks like a mis-encoded
                # message; kept byte-for-byte because callers may rely on it.
                return u"?????"
    except Exception:
        # Best-effort probe: an unreachable or misbehaving host simply
        # yields no finding.
        pass
    finally:
        # Always release the socket; the original leaked it on every path.
        if s is not None:
            s.close()
    return None
def hex_decode(input, errors='strict'):
    """Decode hex digits in input to binary data.

    Only the 'strict' error scheme is supported (checked with an assert).
    Returns a (decoded data, len(input)) tuple.
    """
    assert errors == 'strict'
    decoded = binascii.a2b_hex(input)
    consumed = len(input)
    return (decoded, consumed)
def decode(self, input, final=False):
    """Decode hex digits in input to binary data.

    Requires self.errors to be 'strict' (checked with an assert). The
    final flag is accepted but not used.
    """
    assert self.errors == 'strict'
    decoded = binascii.a2b_hex(input)
    return decoded
def test_format_identifier_api(self):
    """Exercise format_identifier defaults, raw-buffer loading and setters."""
    # Default identifier serializes to C1 08 (original note: sf=1, sos=1,
    # dm=1, bps=8, all other fields 0).
    ident = format_identifier()
    self.assertEqual(ident.asString(), binascii.a2b_hex('C108'))

    # Loading the raw bytes C4 10 must give sf=1, sos=1, dm=4, bps=16.
    ident = format_identifier.from_buffer_copy('\xC4\x10')
    for field, expected in (('sf', 1), ('sos', 1), ('dm', 4), ('bps', 16)):
        self.assertEqual(getattr(ident, field), expected)

    # Assign a 32 bit value and get it back: the stored field reads 31
    # while get_bps() reports 32.
    ident = format_identifier()
    ident.set_bps(32)
    self.assertEqual(ident.bps, 31)
    self.assertEqual(ident.get_bps(), 32)

    # The data mode can be assigned from a list of three bits or an integer.
    ident.set_dmode([1, 1, 1])
    self.assertEqual(ident.dm, 7)
    ident.set_dmode(5)
    self.assertEqual(ident.dm, 5)
def test_frame_sequence(self):
    """Check frame_sequence serialization, increment and 16-bit rollover."""
    fsn = frame_sequence()

    def expect_wire(value):
        # The sequence number is compared in network (big-endian) 16-bit form.
        self.assertEqual(fsn.asString(), struct.pack("!H", value))

    # A default frame sequence is zero.
    self.assertEqual(fsn.asString(), binascii.a2b_hex('0000'))

    # Big-endian format for a number above one byte.
    fsn.seq = 256
    expect_wire(256)

    # Add one.
    fsn.inc()
    expect_wire(257)

    # Move to the top of the range...
    fsn.seq = 65535
    expect_wire(65535)

    # ...and one more increment rolls over to zero.
    fsn.inc()
    expect_wire(0)
def test_frame_sequence_packet(self):
    """Check the packet-level frame sequence accessors, increment and rollover."""
    pkt = sdds_pkt.sdds_packet()

    # A new packet starts with frame sequence zero.
    self.assertEqual(pkt.header.fsn.asString(), binascii.a2b_hex('0000'))

    # Each step is (action, sequence number expected afterwards).
    steps = (
        (lambda: pkt.set_fsn(256), 256),      # big-endian format for a number
        (pkt.inc_fsn, 257),                   # add one
        (lambda: pkt.set_fsn(65535), 65535),  # set up for rollover
        (pkt.inc_fsn, 0),                     # rollover back to zero
    )
    for action, expected_seq in steps:
        wire = struct.pack("!H", expected_seq)
        action()
        self.assertEqual(pkt.header.fsn.asString(), wire)
        self.assertEqual(pkt.get_fsn(), expected_seq)
def test_ttag_info_ttv(self):
    """Check ttag_info serialization before and after the ttv flag is set."""
    layout = "!HHQI"   # msptr, msdelta, ttag, ttage in network byte order
    ttag_val = ttag_info()

    # A default-constructed value packs as all-zero fields.
    self.assertEqual(ttag_val.asString(), struct.pack(layout, 0, 0, 0, 0))

    # Big-endian format once every field carries a number.
    msptr, msdelta = 2047, 256
    ttag, ttage = 4294967296, 8388608
    ttag_val.info.msptr.msptr = msptr
    ttag_val.info.msptr.msdelta = msdelta
    ttag_val.tstamp.ttag = ttag
    ttag_val.tstamp.ttage = ttage
    self.assertEqual(ttag_val.asString(),
                     struct.pack(layout, msptr, msdelta, ttag, ttage))

    # With ttv set, the leading four bytes read 47 FF 01 00 (the first bit
    # is msv); the timestamp part is unchanged.
    ttag_val.info.info.set_ttv()
    expected = binascii.a2b_hex('47FF0100') + struct.pack("!QI", ttag, ttage)
    self.assertEqual(ttag_val.asString(), expected)
def test_ttag_info_sscv(self):
    """Check ttag_info serialization before and after the sscv flag is set."""
    layout = "!HHQI"   # msptr, msdelta, ttag, ttage in network byte order
    ttag_val = ttag_info()

    # A default-constructed value packs as all-zero fields.
    self.assertEqual(ttag_val.asString(), struct.pack(layout, 0, 0, 0, 0))

    # Big-endian format once every field carries a number.
    msptr, msdelta = 2047, 256
    ttag, ttage = 4294967296, 8388608
    ttag_val.info.msptr.msptr = msptr
    ttag_val.info.msptr.msdelta = msdelta
    ttag_val.tstamp.ttag = ttag
    ttag_val.tstamp.ttage = ttage
    self.assertEqual(ttag_val.asString(),
                     struct.pack(layout, msptr, msdelta, ttag, ttage))

    # With sscv set, the leading four bytes read 27 FF 01 00 (the first bit
    # is msv); the timestamp part is unchanged.
    ttag_val.info.info.set_sscv()
    expected = binascii.a2b_hex('27FF0100') + struct.pack("!QI", ttag, ttage)
    self.assertEqual(ttag_val.asString(), expected)