我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 base64.b32encode()。
def check_for_wildcards(args, server, name, rectype, tries=4):
    """
    Verify that the DNS server doesn't return wildcard results for domains
    which don't exist, it should correctly return NXDOMAIN.

    ``tries`` random base32 labels are generated and ``name`` is appended
    directly to each one (no separator is inserted here).  Every resulting
    name is queried for ``rectype`` against ``server`` only, with
    ``args.timeout`` used as both the resolver timeout and lifetime.

    Returns False as soon as any probe receives an answer.  Otherwise
    returns True only when more than half of the probes ended in
    NXDOMAIN or NoNameservers; other DNS errors are skipped and count
    towards neither outcome.
    """
    resolver = Resolver()
    resolver.timeout = args.timeout
    resolver.lifetime = args.timeout
    resolver.nameservers = [server]

    # b32encode() returns bytes on Python 3, so decode before using the
    # text-only strip()/concatenation below.
    nx_names = [
        base64.b32encode(os.urandom(random.randint(8, 10)))
        .decode('ascii').strip('=').lower() + name
        for _ in range(tries)
    ]

    correct_result_count = 0
    for check_nx_name in nx_names:
        try:
            resolver.query(check_nx_name, rectype)
        except (NXDOMAIN, NoNameservers):
            correct_result_count += 1
        except DNSException:
            continue
        else:
            return False  # Any valid response = immediate fail!
    return correct_result_count > (tries / 2.0)
def _generate_uri(hotp, type_name, account_name, issuer, extra_parameters):
    """Build an ``otpauth://`` provisioning URI.

    The query string carries, in order: ``digits`` (``hotp._length``),
    ``secret`` (base32 of ``hotp._key``), ``algorithm`` (upper-cased
    ``hotp._algorithm.name``), ``issuer`` when it is not None, and then
    every pair from ``extra_parameters``.

    The label is ``issuer:account_name`` (both URL-quoted) when ``issuer``
    is truthy, otherwise just the quoted ``account_name``.
    """
    query = [
        ("digits", hotp._length),
        ("secret", base64.b32encode(hotp._key)),
        ("algorithm", hotp._algorithm.name.upper()),
    ]
    if issuer is not None:
        query.append(("issuer", issuer))
    query.extend(extra_parameters)

    # Note: the label test is truthiness, the parameter test is "is not None".
    if issuer:
        label = "%s:%s" % (quote(issuer), quote(account_name))
    else:
        label = quote(account_name)

    return "otpauth://{type}/{label}?{parameters}".format(
        type=type_name,
        label=label,
        parameters=urlencode(query),
    )
def to_text(self, origin=None, relativize=True, **kw):
    """Render this record as a single text string.

    Output layout: ``algorithm flags iterations salt next`` followed by a
    space-separated list of type mnemonics for every bit set in the
    window bitmaps.  ``next`` is base32-encoded, mapped through
    ``b32_normal_to_hex`` and lower-cased; an empty salt is shown as
    ``-``, otherwise as hex.  ``origin``, ``relativize`` and ``kw`` are
    accepted but not used here.
    """
    next_text = base64.b32encode(self.next)
    next_text = next_text.translate(b32_normal_to_hex).lower().decode()

    if self.salt != b'':
        salt = binascii.hexlify(self.salt).decode()
    else:
        salt = '-'

    window_texts = []
    for (window, bitmap) in self.windows:
        mnemonics = []
        for offset, byte in enumerate(bitmap):
            first_type = window * 256 + offset * 8
            # Bits are numbered from the most significant bit of each byte.
            mnemonics.extend(
                dns.rdatatype.to_text(first_type + bit)
                for bit in range(8)
                if byte & (0x80 >> bit)
            )
        window_texts.append(u' ' + u' '.join(mnemonics))
    text = u''.join(window_texts)

    return u'%u %u %u %s %s%s' % (self.algorithm, self.flags,
                                  self.iterations, salt, next_text, text)
def __init__(self, root, env, data):
    """Initialise the object from ``root``, ``env`` and ``data``.

    Assigns a random id, then asks ``get_title()`` and ``get_jobs()`` for
    the title and job list, marks the status as ``"pending"`` and builds
    ``name_job_map`` keyed by each job's ``config["name"]``.
    """
    self.root, self.env, self.data = root, env, data

    # 10 random bytes encode to exactly 16 base32 characters (no padding).
    token = base64.b32encode(os.urandom(10))
    self.id = token.decode("ascii")

    self.title = self.get_title()
    self.jobs = self.get_jobs()

    self.status = "pending"
    self.tasks = []
    self.task_job_map = {}
    # Later jobs win if two jobs share a name.
    self.name_job_map = {job.config["name"]: job for job in self.jobs}
def to_text(self, origin=None, relativize=True, **kw):
    """Render this record as a single text string.

    Output layout: ``algorithm flags iterations salt next`` followed by
    the mnemonics of all types whose bit is set in the window bitmaps.
    ``next`` is base32-encoded, translated through ``b32_normal_to_hex``
    and lower-cased; an empty salt prints as ``-``, otherwise as hex.
    ``origin``, ``relativize`` and ``kw`` are accepted but not used here.
    """
    next_text = base64.b32encode(self.next).translate(b32_normal_to_hex).lower()
    salt_text = '-' if self.salt == '' else self.salt.encode('hex-codec')

    chunks = []
    for (window, bitmap) in self.windows:
        names = []
        for index, raw in enumerate(bitmap):
            octet = ord(raw)
            for bit in xrange(0, 8):
                # Bit 0 is the most significant bit of the octet.
                if not octet & (0x80 >> bit):
                    continue
                names.append(dns.rdatatype.to_text(window * 256 +
                                                   index * 8 + bit))
        chunks.append(' ' + ' '.join(names))

    return '%u %u %u %s %s%s' % (self.algorithm, self.flags, self.iterations,
                                 salt_text, next_text, ''.join(chunks))
def get_id(source_uuid):
    """Derive a short (12 character) id from a random UUID.

    ``source_uuid`` may be a UUID object or its string form; strings are
    parsed with ``uuid.UUID``.  Only version 4 UUIDs are accepted.

    Raises:
        ValueError: if the UUID's version is not 4.
    """
    if isinstance(source_uuid, six.string_types):
        source_uuid = uuid.UUID(source_uuid)

    version = source_uuid.version
    if version != 4:
        raise ValueError(_('Invalid UUID version (%d)') % version)

    # The "time" field of a v4 UUID contains 60 random bits
    # (see RFC4122, Section 4.4)
    random_bytes = _to_byte_string(source_uuid.time, 60)

    # 12 base32 characters carry 12 * 5 = 60 bits, i.e. all of our data.
    short_id = base64.b32encode(six.b(random_bytes))[:12].lower()
    return short_id.decode('utf-8') if six.PY3 else short_id
def writeServerPassword( password ):
    """
    Dump our ScrambleSuit server descriptor to file.

    The file should make it easy for bridge operators to obtain copy &
    pasteable server descriptors.  It is written to
    `const.PASSWORD_FILE' inside `const.STATE_LOCATION' and consists of
    three explanatory `#' comment lines followed by a `password=<base32>'
    line.

    Previously every line was assigned to the same variable with `=', so
    only the final `password=' line ever reached the file; the lines are
    now joined so that the explanation is written as well.

    An IOError while writing is logged and not re-raised.
    """

    assert len(password) == const.SHARED_SECRET_LENGTH
    assert const.STATE_LOCATION != ""

    passwordFile = os.path.join(const.STATE_LOCATION, const.PASSWORD_FILE)
    log.info("Writing server password to file `%s'." % passwordFile)

    encoded = base64.b32encode(password)
    if not isinstance(encoded, str):
        # Python 3 returns bytes; avoid writing a "b'...'" repr to the file.
        encoded = encoded.decode("ascii")

    password_str = "\n".join([
        "# You are supposed to give this password to your clients to append it to their Bridge line",
        "# For example: Bridge scramblesuit 192.0.2.1:5555 EXAMPLEFINGERPRINTNOTREAL password=EXAMPLEPASSWORDNOTREAL",
        "# Here is your password:",
        "password=%s\n" % encoded,
    ])

    try:
        with open(passwordFile, 'w') as fd:
            fd.write(password_str)
    except IOError as err:
        log.error("Error writing password file to `%s': %s" %
                  (passwordFile, err))
def get_public_server_options( cls, transportOptions ):
    """
    Return ScrambleSuit's BridgeDB parameters, i.e., the shared secret.

    When `transportOptions' already contains a "password" key it is
    returned unchanged.  Otherwise the stored state is loaded, a new dict
    holding the base32-encoded fallback password is returned, and
    `cls.uniformDHSecret' is set to the raw fallback password.
    """

    if "password" in transportOptions:
        return transportOptions

    # The operator did not supply a password (e.g. via Tor's
    # `ServerTransportOptions'), so fall back to the automatically
    # generated one kept in the state file.
    srv = state.load()
    fallbackOptions = {"password": base64.b32encode(srv.fallbackPassword)}
    cls.uniformDHSecret = srv.fallbackPassword

    return fallbackOptions
def main():
    """Prompt for a flag, obfuscate it and print the result.

    The input is lower-cased and reversed, every character that is a key
    of ``BASE`` is replaced by its mapped value, and the UTF-8 encoding
    of the result is printed as base16.
    """
    submission = raw_input("flag> ")
    reversed_flag = submission.lower()[::-1]

    substituted = "".join(
        BASE[ch] if ch in BASE else ch for ch in reversed_flag
    )

    # The original author also tried base32 and base64 here; base16 is
    # the encoding actually in use.
    to_give = base64.b16encode(substituted.encode('utf8'))
    print(to_give)
def setCacheEntryObjectCacheDbms(srvObj, cacheEntryObj):
    """
    Store a serialized copy of a cache entry object in the local Cache
    Contents DBMS.

    The object is pickled, base32-encoded and substituted, together with
    its Disk ID, File ID and File Version, into the
    _SET_CACHE_ENTRY_OBJECT_CACHE_DBMS_QUERY template, which is then
    executed via queryCacheDbms().

    srvObj:         Reference to server object (ngamsServer).

    cacheEntryObj:  Cache entry object instance (ngamsCacheEntry).

    Returns:        Void.
    """
    T = TRACE()

    pickledEntry = cPickle.dumps(cacheEntryObj)
    # Base32 keeps the pickle within a plain alphanumeric alphabet before
    # it is interpolated into the SQL text.
    encodedEntry = base64.b32encode(pickledEntry)
    queryArgs = (encodedEntry,
                 cacheEntryObj.getDiskId(),
                 cacheEntryObj.getFileId(),
                 cacheEntryObj.getFileVersion())
    sqlQuery = _SET_CACHE_ENTRY_OBJECT_CACHE_DBMS_QUERY % queryArgs
    queryCacheDbms(srvObj, sqlQuery)
def address_bytes_to_string(proto, buf):
    """Convert the hex-encoded value of one address component to text.

    ``buf`` holds hexadecimal digits; how it is decoded depends on
    ``proto.code``:

    * IP4 / IP6: parsed as an integer and formatted as an IP address.
    * TCP / UDP / DCCP / SCTP: a big-endian 16-bit number, as a string.
    * ONION: base32 of everything but the last two bytes (lower-cased),
      then ``:`` and the port taken from the last two bytes.
    * IPFS: a varint length prefix followed by exactly that many bytes,
      which are returned base58-encoded.

    Raises:
        ValueError: for an unknown protocol code, or when the IPFS
            length prefix does not match the remaining payload.
    """
    from .util import decode_big_endian_16

    code = proto.code

    if code == P_IP4:
        return str(IPAddress(int(buf, 16), 4).ipv4())

    if code == P_IP6:
        return str(IPAddress(int(buf, 16), 6).ipv6())

    if code in [P_TCP, P_UDP, P_DCCP, P_SCTP]:
        return str(decode_big_endian_16(binascii.unhexlify(buf)))

    if code == P_ONION:
        raw = binascii.unhexlify(buf)
        host = base64.b32encode(raw[:-2]).decode('ascii').lower()
        port = str(decode_big_endian_16(raw[-2:]))
        return ':'.join([host, port])

    if code == P_IPFS:
        raw = binascii.unhexlify(buf)
        size, num_bytes_read = read_varint_code(raw)
        payload = raw[num_bytes_read:]
        if len(payload) != size:
            raise ValueError("inconsistent lengths")
        return base58.b58encode(payload)

    raise ValueError("unknown protocol")
def tth_generate(self,file):
    """Generate the Tiger Tree Hash (Merkle tree root) of a file.

    The file is read in 1024-byte blocks.  Each block is hashed with
    ``self.tiger_hash`` after prepending the leaf marker 0x00; pairs of
    hashes are then combined by hashing them with the internal-node
    marker 0x01 prepended, level by level, until one root remains.  A
    hash without a partner is promoted to the next level unchanged.

    Returns the base32 encoding of the root with its last character
    removed (presumably ``tiger_hash`` yields 24 bytes, so that the
    40-character encoding ends in a single '=' -- TODO confirm), or None
    when the file cannot be opened.  ``os.path.getsize`` errors (e.g. a
    missing file) still propagate to the caller.

    Fixes relative to the earlier version:
    * an empty file now yields the same encoded string form as any other
      file instead of a nested list holding the raw hash;
    * only I/O errors from ``open`` are swallowed, not every exception;
    * the handle is closed even if a read fails;
    * pairing uses integer steps, so it also works where ``/`` is true
      division.
    """
    blocksize = 1024  # Standard Block Size
    leaf_marker = b"\x00"
    node_marker = b"\x01"

    if os.path.getsize(file) == 0:
        # Hash for an empty file: a single leaf over no data.
        return base64.b32encode(self.tiger_hash(leaf_marker))[:-1]

    try:
        handle = open(file, "rb")
    except (IOError, OSError):
        return None  # Inaccessible: don't bother.

    with handle:
        level = [
            self.tiger_hash(leaf_marker + block)
            for block in iter(lambda: handle.read(blocksize), b"")
        ]
    if not level:
        # The file was emptied between the size check and the read.
        level = [self.tiger_hash(leaf_marker)]

    # Collapse the tree bottom-up; lower levels are discarded as we go.
    while len(level) > 1:
        parents = [
            self.tiger_hash(node_marker + level[i] + level[i + 1])
            for i in range(0, len(level) - 1, 2)
        ]
        if len(level) % 2 == 1:
            parents.append(level[-1])  # Unpaired hash is promoted untouched.
        level = parents

    return base64.b32encode(level[0])[:-1]
def _to_safe_path_param_name(matched_parameter):
    """Creates a safe string to be used as a regex group name.

    Only alphanumeric characters and underscore are allowed in variable name
    tokens, and numeric are not allowed as the first character.

    We cast the matched_parameter to base32 (since the alphabet is safe),
    strip the padding (= not safe) and prepend with _, since we know a token
    can begin with underscore.

    Text input is UTF-8 encoded before base32 encoding and the encoded
    result is converted back to text, so the function works whether
    base64.b32encode returns str or bytes.

    Args:
      matched_parameter: A string containing the parameter matched from the URL
        template.

    Returns:
      A string that's safe to be used as a regex group name.
    """
    if not isinstance(matched_parameter, bytes):
        matched_parameter = matched_parameter.encode('utf-8')
    encoded = base64.b32encode(matched_parameter)
    if not isinstance(encoded, str):
        encoded = encoded.decode('ascii')
    return '_' + encoded.rstrip('=')
def to_text(self, origin=None, relativize=True, **kw):
    """Render this record as a single text string.

    Output layout: ``algorithm flags iterations salt next`` followed by
    the mnemonics of all types whose bit is set in the window bitmaps.
    ``next`` is base32-encoded, translated through ``b32_normal_to_hex``
    and lower-cased; an empty salt prints as ``-``, otherwise as hex.
    ``origin``, ``relativize`` and ``kw`` are accepted but not used here.
    """
    encoded_next = base64.b32encode(self.next)
    encoded_next = encoded_next.translate(b32_normal_to_hex)
    encoded_next = encoded_next.lower()

    if self.salt == '':
        salt = '-'
    else:
        salt = self.salt.encode('hex-codec')

    suffix = ''
    for (window, bitmap) in self.windows:
        window_base = window * 256
        present = []
        for position, char in enumerate(bitmap):
            value = ord(char)
            # Walk the byte from its most significant bit downwards.
            present.extend(
                rdatatype.to_text(window_base + position * 8 + shift)
                for shift in xrange(0, 8)
                if value & (0x80 >> shift)
            )
        suffix += ' ' + ' '.join(present)

    return '%u %u %u %s %s%s' % (self.algorithm, self.flags, self.iterations,
                                 salt, encoded_next, suffix)
def _create_jwt(self, claim: Dict[str, Any], key_pemstr: str) -> str:
    """Sign ``claim`` as an RS256 JWT using the PEM key ``key_pemstr``.

    The ``kid`` header is derived from the key: the DER-encoded public
    key is hashed with SHA-256, the first 30 bytes of the digest are
    base32-encoded, and the result is split into 4-character groups
    joined by ``:``.

    Returns the encoded token as text.
    """
    _log.debug("Encoding JWT response: %s", claim)

    public_der = RSA.importKey(key_pemstr).publickey().exportKey(format="DER")
    digest = SHA256.new(data=public_der).digest()
    # 30 bytes = 240 bits = exactly 48 base32 characters, so the
    # fingerprint carries no '=' padding.
    fp = base64.b32encode(digest[0:30]).decode('utf-8')
    kid = ":".join(fp[start:start + 4] for start in range(0, len(fp), 4))

    headers = {
        "typ": "JWT",
        "alg": "RS256",
        "kid": kid,
    }
    token = jwt.encode(claim, headers=headers, key=key_pemstr,
                       algorithm="RS256")
    jwtstr = token.decode('utf-8')

    _log.debug("JWT response: %s", jwtstr)
    return jwtstr
def create_magnet_from_es_info():
    """Return a dict exposing the magnet-link builder under its own name."""

    def _create_magnet_from_es_info(display_name, info_hash, max_trackers=5, trackers=None):
        """Build a ``magnet:`` URI for a hex ``info_hash``.

        The hash is converted from hex to base32.  The query carries the
        display name (``dn``) and at most ``max_trackers`` trackers
        (``tr``); when ``trackers`` is None the default tracker list is
        used.
        """
        if trackers is None:
            trackers = get_default_trackers()

        magnet_parts = [('dn', display_name)]
        magnet_parts += [('tr', tracker) for tracker in trackers[:max_trackers]]

        raw_hash = bytes.fromhex(info_hash)
        b32_info_hash = b32encode(raw_hash).decode('utf-8')
        query = urlencode(magnet_parts)
        return 'magnet:?xt=urn:btih:' + b32_info_hash + '&' + query

    return dict(create_magnet_from_es_info=_create_magnet_from_es_info)


# ######################### TEMPLATE GLOBALS #########################
def rand_name():
    """Return a random lower-case base32 name of 10 to 30 characters.

    The name is a prefix of the base32 encoding of 50 random bytes, in
    whatever type ``base64.b32encode`` returns.
    """
    encoded = base64.b32encode(os.urandom(50))
    length = random.randint(10, 30)
    return encoded[:length].lower()
def create_random_name(prefix='aztest', length=24):
    """Return ``prefix`` followed by random lower-case base32 characters.

    The result is exactly ``length`` characters long.

    Raises:
        ValueError: if ``prefix`` is longer than ``length``, or if fewer
            than 4 characters would be left for the random part.
    """
    prefix_len = len(prefix)
    if prefix_len > length:
        raise ValueError('The length of the prefix must not be longer than random name length')

    padding_size = length - prefix_len
    if padding_size < 4:
        raise ValueError(
            'The randomized part of the name is shorter than 4, '
            'which may not be able to offer enough randomness'
        )

    # Every 5 random bytes produce 8 base32 characters; round up to whole
    # groups and trim the surplus afterwards.
    byte_count = int(math.ceil(float(padding_size) / 8) * 5)
    encoded = base64.b32encode(os.urandom(byte_count))
    suffix = encoded[:padding_size].decode().lower()
    return str(prefix + suffix)
def random_branch():
    """Return a random lower-case base32 string.

    The string is the base32 encoding of ``pysodium.randombytes(10)``
    (presumably 10 random bytes, i.e. 16 characters -- confirm against
    pysodium).
    """
    raw = pysodium.randombytes(10)
    encoded = base64.b32encode(raw)
    return encoded.decode('utf-8').lower()
async def _oauth2_handler(self, request):
    """Finish the OAuth2 login and start a session.

    Exchanges the ``code`` and ``state`` query parameters for an
    authenticated client via ``self.oauth.oauth``, fetches ``"user"``
    through that client, stores the client's token in ``self.tokens``
    keyed by the UTF-8 encoded login, and maps a freshly generated
    session id to that login in ``self.sessions``.

    Returns an HTTP redirect to ``/monitor/`` that sets the session id in
    the cookie named by ``self.config["cookie_name"]``.

    The function awaits, so it must be declared ``async def``; a plain
    ``def`` containing ``await`` does not compile.
    """
    client = await self.oauth.oauth(request.GET["code"], request.GET["state"])
    user_data = await client.get("user")

    login = user_data["login"].encode("utf8")
    self.tokens[login] = client.token

    # 15 random bytes encode to exactly 24 base32 characters (no padding).
    sid = base64.b32encode(os.urandom(15)).decode('ascii')
    self.sessions[sid] = login

    resp = web.HTTPFound("/monitor/")
    resp.set_cookie(self.config["cookie_name"], sid)
    return resp