我们从Python开源项目中,提取了以下47个代码示例,用于说明如何使用base64.encodebytes()。
def _read_file(self, blob, format):
    """Download a non-notebook blob and return ``(content, format)``.

    blob: object exposing ``download_as_string()`` (documented upstream as
        an instance of :class:`google.cloud.storage.Blob`).
    format:
        "text"   -- decode the payload as UTF-8; undecodable bytes raise
                    ``web.HTTPError`` (400, reason "bad format").
        "base64" -- return the raw bytes base64-encoded as ASCII text.
        None     -- try UTF-8 first and fall back to base64.
    """
    raw = blob.download_as_string()
    text_required = format == "text"

    if text_required or format is None:
        try:
            decoded = raw.decode("utf8")
        except UnicodeError:
            # Only an explicit "text" request turns undecodable bytes into
            # an error; an unspecified format falls through to base64.
            if text_required:
                raise web.HTTPError(
                    400,
                    "%s is not UTF-8 encoded" % self._get_blob_path(blob),
                    reason="bad format",
                )
        else:
            return decoded, "text"

    return base64.encodebytes(raw).decode("ascii"), "base64"
def _tunnel(sock, host, port, auth):
    """Send an HTTP ``CONNECT`` request for ``host:port`` over ``sock``.

    sock: connection handed to ``send()`` and ``read_headers()``.
    host, port: target formatted into the ``CONNECT`` request line.
    auth: falsy, or a ``(user, password)`` pair; when ``auth[0]`` is set a
        ``Proxy-Authorization: Basic`` header is added (the password part
        is optional).

    Returns ``sock`` when the proxy answers with status 200.
    Raises ``WebSocketProxyException`` when reading the response headers
    fails or the status is anything other than 200.
    """
    debug("Connecting proxy...")
    connect_header = "CONNECT %s:%d HTTP/1.0\r\n" % (host, port)
    # TODO: support digest auth.
    if auth and auth[0]:
        auth_str = auth[0]
        if auth[1]:
            auth_str += ":" + auth[1]
        # The encoder wraps its output every 76 characters. A line break
        # inside the header value would corrupt the request, so remove
        # every newline rather than only the trailing one.
        encoded_str = base64encode(auth_str.encode()).decode().replace("\n", "")
        connect_header += "Proxy-Authorization: Basic %s\r\n" % encoded_str
    connect_header += "\r\n"
    dump("request header", connect_header)

    send(sock, connect_header)

    try:
        status, resp_headers = read_headers(sock)
    except Exception as e:
        raise WebSocketProxyException(str(e))

    if status != 200:
        raise WebSocketProxyException(
            "failed CONNECT via proxy status: %r" % status)

    return sock
def get_host_info(self, host):
    """Split ``host`` into the bare host, extra headers and x509 info.

    ``host`` is either a string or a ``(host, x509)`` tuple. Credentials
    embedded as ``user:password@host`` are turned into an HTTP Basic
    ``Authorization`` header.

    Returns ``(host, extra_headers, x509)``; ``extra_headers`` is an empty
    list when the host carries no credentials.
    """
    x509 = {}
    if isinstance(host, tuple):
        host, x509 = host

    credentials, host = urllib_parse.splituser(host)
    if not credentials:
        return host, [], x509

    raw = urllib_parse.unquote_to_bytes(credentials)
    token = base64.encodebytes(raw).decode("utf-8")
    # encodebytes() emits line breaks; drop every whitespace character so
    # the header value stays on one line.
    token = "".join(token.split())
    return host, [("Authorization", "Basic " + token)], x509

##
# Connect to server.
#
# @param host Target host.
# @return An HTTPConnection object
def storage(self, session_id):
    """Persist one session's data to a file in the storage directory.

    Entries of ``self.__session_map__[session_id]`` whose key is not listed
    in ``self.ignore_key`` are serialized as JSON, base64-encoded and
    written to ``<self.__storage_path__>/<session_id>``.

    Does nothing (returns ``None``) when ``self.__storage_path__`` is
    ``None``.
    """
    # Check the storage path before building the file path: joining a None
    # directory raises TypeError, which made the original guard unreachable.
    if self.__storage_path__ is None:
        return

    session_path = os.path.join(self.__storage_path__, session_id)

    data = {
        key: value
        for key, value in self.__session_map__[session_id].items()
        if key not in self.ignore_key
    }

    # Serialize before opening the file so a serialization error cannot
    # leave an empty or truncated session file behind.
    payload = base64.encodebytes(json.dumps(data).encode())

    with open(session_path, 'wb') as f:
        f.write(payload)
def download_comments_terms(username, password):
    """Fetch term comments from ``TERM_API_COMMENTS`` using HTTP Basic auth.

    username, password: credentials; they are joined as ``user:password``
        and must be ASCII-encodable.

    Returns a dict keyed by ``str(script(term["IEML"]["value"]))`` whose
    values are dicts with the keys ``"comment"`` and ``"drupal_nid"``.
    """
    userpass = '%s:%s' % (username, password)
    # b64encode() never inserts line breaks. encodebytes()[:-1] only removed
    # the final newline and left interior ones for credentials longer than
    # 57 bytes, producing an invalid header.
    auth_encoded = base64.b64encode(userpass.encode('ascii')).decode('ascii')

    # NOTE(review): hostname checking and certificate verification are
    # switched off, so the Basic credentials travel over an unauthenticated
    # TLS connection -- confirm this is intentional.
    ctx = ssl.create_default_context()
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE

    req = Request(TERM_API_COMMENTS)
    req.add_header('Authorization', 'Basic %s' % auth_encoded)

    # Close the HTTP response deterministically once the body is read.
    with urlopen(req, context=ctx) as response:
        str_response = response.read().decode('utf-8')

    j = json.loads(str_response)
    return {
        str(script(_t["IEML"]["value"])): {
            "comment": _t["commentaire_sur_terme"],
            "drupal_nid": _t["Nid"],
        }
        for _t in j
    }
def get_response_sspi(self, challenge=None):
    """Produce the next base64-encoded SSPI response token.

    challenge: optional base64 challenge, as ``str`` or ``bytes``; it is
        decoded before being passed to ``self.sspi_client.authorize``.

    Returns the base64 response as a ``str`` with all newlines removed, or
    ``None`` when ``authorize`` raises ``pywintypes.error`` (the traceback
    is printed to stdout in that case).
    """
    dprint("pywin32 SSPI")
    if challenge:
        # Normalize to bytes explicitly. The previous bare-except fallback
        # relied on base64.decodestring, which was removed in Python 3.9.
        if isinstance(challenge, str):
            challenge = challenge.encode("utf-8")
        challenge = base64.decodebytes(challenge)

    output_buffer = None
    try:
        error_msg, output_buffer = self.sspi_client.authorize(challenge)
    except pywintypes.error:
        traceback.print_exc(file=sys.stdout)
        return None

    response_msg = output_buffer[0].Buffer
    # The buffer may be text or bytes; encodebytes() needs bytes. The old
    # fallback (base64.encodestring) no longer exists on Python 3.9+.
    if isinstance(response_msg, str):
        response_msg = response_msg.encode("utf-8")
    response_msg = base64.encodebytes(response_msg)

    # Remove the line breaks encodebytes() inserts.
    response_msg = response_msg.decode("utf-8").replace('\012', '')
    return response_msg
def GetAuthHeader(self, http_method, http_url):
    """Build the secure AuthSub ``Authorization`` header.

    The signed payload is the string

        data = http_method http_url timestamp nonce

    and the header value is assembled as
    ``<AUTHSUB_AUTH_LABEL>"<token>" data="<data>" sig="<sig>" sigalg="rsa-sha1"``.

    Args:
      http_method: string HTTP method, e.g. GET, POST, PUT.
      http_url: string or atom.url.Url the request is made to; it is
          converted with ``str()``.

    Returns:
      dict with the single key ``'Authorization'``.
    """
    issued_at = int(math.floor(time.time()))
    nonce = '%lu' % random.randrange(1, 2 ** 64)
    data = '%s %s %d %s' % (http_method, str(http_url), issued_at, nonce)

    raw_signature = self.rsa_key.hashAndSign(data)
    # NOTE(review): the signature is converted with str() before being
    # base64-encoded; base64.encodebytes only accepts bytes-like input on
    # Python 3 -- confirm the intended runtime / return type of hashAndSign.
    # noinspection PyDeprecation
    sig = encodebytes(str(raw_signature)).rstrip()

    authorization = '%s"%s" data="%s" sig="%s" sigalg="rsa-sha1"' % (
        AUTHSUB_AUTH_LABEL, self.token_string, data, sig)
    return {'Authorization': authorization}
def get_host_info(self, host):
    """Return ``(host, extra_headers, x509)`` for a connection target.

    ``host`` may be a plain string or a ``(host, x509)`` tuple. When the
    host string embeds ``user:password@`` credentials they are removed
    from the host and sent as an HTTP Basic ``Authorization`` header;
    otherwise ``extra_headers`` is an empty list.
    """
    x509 = {}
    if isinstance(host, tuple):
        host, x509 = host

    userinfo, host = urllib.parse.splituser(host)
    if not userinfo:
        return host, [], x509

    encoded = base64.encodebytes(urllib.parse.unquote_to_bytes(userinfo))
    # Collapse the line breaks encodebytes() adds so the header value is a
    # single whitespace-free token.
    basic_token = "".join(encoded.decode("utf-8").split())
    return host, [("Authorization", "Basic " + basic_token)], x509

##
# Connect to server.
#
# @param host Target host.
# @return An HTTPConnection object
def _base_set(self, mode, key, value, timeout=None):
    """Write ``value`` under ``key`` into ``self._coll``.

    mode: ``'set'`` upserts the document unconditionally; ``'add'`` inserts
        it only when no document with that key exists.
    value: pickled with the highest protocol and stored base64-encoded.
    timeout: lifetime in seconds; a falsy value selects
        ``self.default_timeout``.

    Returns ``False`` when ``mode`` is ``'add'`` and the key already
    exists, ``True`` in every other case (including unrecognized modes,
    which write nothing).
    """
    ttl = timeout or self.default_timeout
    # Only UTC here for Mongo auto-purge
    expires = datetime.utcnow() + timedelta(seconds=ttl)

    pickled = pickle.dumps(value, pickle.HIGHEST_PROTOCOL)
    encoded = base64.encodebytes(pickled).strip()
    document = {'key': key, 'data': encoded, 'expires': expires}

    if mode == 'set':
        # Set sets new data; it does not matter whether the key exists.
        self._coll.update({'key': key}, document, upsert=True)
    elif mode == 'add':
        if self._coll.find_one({'key': key}):
            return False
        self._coll.insert(document)
    return True
def _tunnel(sock, host, port, auth):
    """Open a proxy tunnel by sending ``CONNECT host:port`` over ``sock``.

    sock: connection passed to ``send()`` and ``read_headers()``.
    host, port: destination placed in the ``CONNECT`` request line.
    auth: falsy, or a ``(user, password)`` pair; a non-empty ``auth[0]``
        adds a ``Proxy-Authorization: Basic`` header (password optional).

    Returns ``sock`` once the proxy replies with status 200.
    Raises ``WebSocketProxyException`` if the response headers cannot be
    read or the status is not 200.
    """
    debug("Connecting proxy...")
    connect_header = "CONNECT %s:%d HTTP/1.0\r\n" % (host, port)
    # TODO: support digest auth.
    if auth and auth[0]:
        auth_str = auth[0]
        if auth[1]:
            auth_str += ":" + auth[1]
        # The encoder breaks its output into 76-character lines; strip all
        # newlines (not just the trailing one) so long credentials do not
        # split the header.
        encoded_str = base64encode(auth_str.encode()).decode().replace("\n", "")
        connect_header += "Proxy-Authorization: Basic %s\r\n" % encoded_str
    connect_header += "\r\n"
    dump("request header", connect_header)

    send(sock, connect_header)

    try:
        status, resp_headers = read_headers(sock)
    except Exception as e:
        raise WebSocketProxyException(str(e))

    if status != 200:
        # Format with %, not +: concatenating the status onto the message
        # raised TypeError for a non-string status and hid the real
        # failure from the caller.
        raise WebSocketProxyException(
            "failed CONNECT via proxy status: %r" % status)

    return sock
def write(self, fd, bytes):
    """
    Write a block of bytes to an open file descriptor (one opened in a
    writing mode).

    The block is base64-encoded and sent through ``self._client.json`` as
    the ``filesystem.write`` command.

    :param fd: file descriptor
    :param bytes: bytes block to write
    :return: whatever ``self._client.json`` returns

    :note: don't overload the node with large byte chunks; for large file
           uploads check the upload method.
    """
    encoded_block = base64.encodebytes(bytes).decode()
    payload = dict(fd=fd, block=encoded_block)
    return self._client.json('filesystem.write', payload)
def base64_encode(input, errors='strict'):
    """Base64-encode ``input`` and report how many input items were used.

    Only the ``'strict'`` error mode is supported (enforced by assertion).
    Returns ``(encoded_bytes, len(input))``.
    """
    assert errors == 'strict'
    encoded = base64.encodebytes(input)
    consumed = len(input)
    return encoded, consumed
def encode(self, input, final=False):
    """Return ``input`` base64-encoded.

    ``self.errors`` must be ``'strict'`` (enforced by assertion); ``final``
    is accepted but not used.
    """
    assert self.errors == 'strict'
    encoded = base64.encodebytes(input)
    return encoded
def encrypt(self, source_obj):
    """Serialize, zero-pad, encrypt and URL-quote ``source_obj``.

    The object is dumped as JSON, encoded with ``self._encoding``, padded
    with NUL bytes to a multiple of 16, run through
    ``self._cipher.encryptor()``, base64-encoded and finally passed to
    ``urllib.parse.quote``.

    Returns the quoted base64 text.
    """
    encryptor = self._cipher.encryptor()
    plaintext = json.dumps(source_obj).encode(self._encoding)

    # 16 - (len % 16) is always in 1..16, so padding is always appended;
    # input that is already block-aligned gains a full block of NUL bytes.
    pad_len = 16 - (len(plaintext) % 16)
    plaintext += b'\x00' * pad_len

    ciphertext = encryptor.update(plaintext) + encryptor.finalize()
    return urllib.parse.quote(base64.encodebytes(ciphertext))
def download_image_base64(self, url):
    """Fetch ``url`` with ``requests.get`` and return the response body
    base64-encoded (as bytes)."""
    body = requests.get(url).content
    return base64.encodebytes(body)
def represent_binary(self, data):
    """Represent ``data`` as a ``tag:yaml.org,2002:binary`` scalar.

    The bytes are base64-encoded to ASCII text and emitted through
    ``self.represent_scalar`` with style ``'|'``.
    """
    # Use encodebytes when the base64 module offers it, otherwise fall back
    # to the older encodestring name.
    encoder = getattr(base64, 'encodebytes', None)
    if encoder is None:
        encoder = base64.encodestring
    text = encoder(data).decode('ascii')
    return self.represent_scalar('tag:yaml.org,2002:binary', text, style='|')
def base64_encodebytes(data):
    """Base64-encode ``data`` with ``base64.encodebytes``, falling back to
    ``base64.encodestring`` when ``encodebytes`` is not available."""
    try:
        encoder = base64.encodebytes
    except AttributeError:
        encoder = base64.encodestring
    return encoder(data)
def test_save_file(self):
    """Saving a text file and a base64 file stores the expected raw bytes.

    Each case saves a model through ``self.contents_manager``, checks that
    the blob exists in ``self.bucket`` and that its downloaded bytes match,
    and deletes the blob afterwards even if the comparison fails.
    """
    obj = {"one": 1, "two": [2, 3]}
    pickled = pickle.dumps(obj)

    # (blob name, model content, model format, expected stored bytes)
    cases = (
        ("test.txt", "blah-blah-blah", "text", b"blah-blah-blah"),
        ("test.pickle", base64.encodebytes(pickled).decode("ascii"),
         "base64", pickled),
    )

    for name, content, fmt, expected in cases:
        self.contents_manager.save({
            "type": "file",
            "content": content,
            "format": fmt
        }, self.path(name))

        blob = self.bucket.blob(name)
        self.assertTrue(blob.exists())
        try:
            self.assertEqual(blob.download_as_string(), expected)
        finally:
            blob.delete()
def sign_sha1(body, pri_key):
    '''
    Sign ``body`` with ``pri_key`` over its SHA hash and return the
    signature base64-encoded (as bytes).

    body: binary
    pri_key: binary key material accepted by ``RSA.importKey``
    '''
    signer = PKCS1_v1_5.new(RSA.importKey(pri_key))
    digest = SHA.new(body)
    signature = signer.sign(digest)
    return base64.encodebytes(signature)
def encode_image(cls, bpy_image):
    """Save ``bpy_image`` as a temporary file and return it base64-encoded.

    The image's ``filepath_raw`` is pointed at ``i.png`` inside a fresh
    temporary directory, ``bpy_image.save()`` is called, and the resulting
    file is read back.

    Note: ``bpy_image.filepath_raw`` is left pointing at the temporary
    path, which no longer exists once this function returns.

    Returns the file content base64-encoded as an ASCII ``str``.
    """
    import base64
    import os
    import tempfile

    with tempfile.TemporaryDirectory() as directory:
        filename = os.path.join(directory, "i.png")
        bpy_image.filepath_raw = filename
        bpy_image.save()
        # Close the file before the temporary directory is removed; an open
        # handle leaks and can prevent the cleanup on some platforms.
        with open(filename, "rb") as image_file:
            raw = image_file.read()
    return base64.encodebytes(raw).decode('ascii')
def _validate(headers, key, subprotocols):
    """Validate the server's handshake response headers.

    headers: mapping of response header names to values, looked up with
        lower-case names.
    key: the key sent by the client; it is combined with the fixed GUID and
        SHA-1 hashed to compute the expected ``sec-websocket-accept`` value.
    subprotocols: optional list of acceptable subprotocol names.

    Returns ``(True, subproto)`` on success, where ``subproto`` is the
    lower-cased negotiated subprotocol (``None`` when no subprotocols were
    requested), and ``(False, None)`` on any validation failure.
    """
    subproto = None
    for k, v in _HEADERS_TO_CHECK.items():
        r = headers.get(k, None)
        if not r:
            return False, None
        r = r.lower()
        if v != r:
            return False, None

    if subprotocols:
        # Lower-case only when the header is present: calling .lower() on a
        # missing header (None) raised AttributeError instead of cleanly
        # rejecting the handshake.
        subproto = headers.get("sec-websocket-protocol", None)
        if subproto:
            subproto = subproto.lower()
        if not subproto or subproto not in [s.lower() for s in subprotocols]:
            error("Invalid subprotocol: " + str(subprotocols))
            return False, None

    result = headers.get("sec-websocket-accept", None)
    if not result:
        return False, None
    result = result.lower()

    if isinstance(result, six.text_type):
        result = result.encode('utf-8')

    value = (key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11").encode('utf-8')
    hashed = base64encode(hashlib.sha1(value).digest()).strip().lower()

    if compare_digest(hashed, result):
        return True, subproto
    return False, None
def _create_sec_websocket_key():
    """Return a new handshake key: 16 bytes from ``os.urandom``,
    base64-encoded and returned as ``str`` without surrounding whitespace."""
    random_bytes = os.urandom(16)
    encoded = base64encode(random_bytes)
    return encoded.decode('utf-8').strip()
def base64_encodestring(s):
    """Base64-encode ``s`` on either major Python version.

    When ``six.PY2`` is true, ``base64.encodestring(s)`` is returned
    unchanged. Otherwise ``str`` input is first encoded as UTF-8 and the
    base64 result is returned as ``str``.
    """
    if six.PY2:
        return base64.encodestring(s)

    data = s.encode('utf8') if isinstance(s, str) else s
    return base64.encodebytes(data).decode('utf8')
def generate_headers(self):
    """Return the HTTP Basic ``Authorization`` header for this object.

    ``self.username`` and ``self.password`` are joined as
    ``user:password``, UTF-8 encoded and base64-encoded.

    Returns a dict with the single bytes key ``b"Authorization"`` and a
    bytes value ``b"Basic <token>"``.
    """
    credentials = ("%s:%s" % (self.username, self.password)).encode('utf8')
    # encodebytes() inserts a newline every 76 output characters. Stripping
    # only the ends left line breaks inside the header value for
    # credentials longer than 57 bytes, so remove all of them.
    token = encodebytes(credentials).replace(b'\n', b'')
    return {b"Authorization": b"Basic " + token}