我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用base64.encodestring()。
def main (): n = int (sys.argv[2], 16) keysize = n.bit_length() / 16 with open (sys.argv[1], "rb") as f: chunk = f.read (16384) while chunk: for offset in xrange (0, len (chunk) - keysize): p = long (''.join (["%02x" % ord (chunk[x]) for x in xrange (offset + keysize - 1, offset - 1, -1)]).strip(), 16) if gmpy.is_prime (p) and p != n and n % p == 0: e = 65537 q = n / p phi = (p - 1) * (q - 1) d = gmpy.invert (e, phi) dp = d % (p - 1) dq = d % (q - 1) qinv = gmpy.invert (q, p) seq = Sequence() for x in [0, n, e, d, p, q, dp, dq, qinv]: seq.setComponentByPosition (len (seq), Integer (x)) print "\n\n-----BEGIN RSA PRIVATE KEY-----\n%s-----END RSA PRIVATE KEY-----\n\n" % base64.encodestring(encoder.encode (seq)) sys.exit (0) chunk = f.read (16384) print "private key not found :("
def _encode_auth(auth): """ A function compatible with Python 2.3-3.3 that will encode auth from a URL suitable for an HTTP header. >>> str(_encode_auth('username%3Apassword')) 'dXNlcm5hbWU6cGFzc3dvcmQ=' Long auth strings should not cause a newline to be inserted. >>> long_auth = 'username:' + 'password'*10 >>> chr(10) in str(_encode_auth(long_auth)) False """ auth_s = urllib.parse.unquote(auth) # convert to bytes auth_bytes = auth_s.encode() # use the legacy interface for Python 2.3 support encoded_bytes = base64.encodestring(auth_bytes) # convert back to a string encoded = encoded_bytes.decode() # strip the trailing carriage return return encoded.replace('\n', '')
def save_pem(contents, pem_marker): '''Saves a PEM file. @param contents: the contents to encode in PEM format @param pem_marker: the marker of the PEM content, such as 'RSA PRIVATE KEY' when your file has '-----BEGIN RSA PRIVATE KEY-----' and '-----END RSA PRIVATE KEY-----' markers. @return the base64-encoded content between the start and end markers. ''' (pem_start, pem_end) = _markers(pem_marker) b64 = base64.encodestring(contents).replace(b('\n'), b('')) pem_lines = [pem_start] for block_start in range(0, len(b64), 64): block = b64[block_start:block_start + 64] pem_lines.append(block) pem_lines.append(pem_end) pem_lines.append(b('')) return b('\n').join(pem_lines)
def _encode_auth(auth): """ A function compatible with Python 2.3-3.3 that will encode auth from a URL suitable for an HTTP header. >>> str(_encode_auth('username%3Apassword')) 'dXNlcm5hbWU6cGFzc3dvcmQ=' Long auth strings should not cause a newline to be inserted. >>> long_auth = 'username:' + 'password'*10 >>> chr(10) in str(_encode_auth(long_auth)) False """ auth_s = unquote(auth) # convert to bytes auth_bytes = auth_s.encode() # use the legacy interface for Python 2.3 support encoded_bytes = base64.encodestring(auth_bytes) # convert back to a string encoded = encoded_bytes.decode() # strip the trailing carriage return return encoded.replace('\n','')
def get_proxy(self): if not self._ptype: proxy=socket.socket(socket.AF_INET,socket.SOCK_STREAM) proxy.connect((self._phost,self._pport)) proxy_authorization='' if self._puser: proxy_authorization='Proxy-authorization: Basic '+\ base64.encodestring(self._puser+':'+self._ppass).strip()+'\r\n' proxy_connect='CONNECT %s:%sHTTP/1.0\r\n'%(self.host,self._port) user_agent='User-Agent: pytunnel\r\n' proxy_pieces=proxy_connect+proxy_authorization+user_agent+'\r\n' proxy.sendall(proxy_pieces+'\r\n') response=recv_all(proxy,timeout=0.5) status=response.split()[1] if int(status)/100 !=2: print 'error',response raise status return proxy
def __init__(self, server, port, ssl, auth, timeout, base_uri, success_codes, name, combined_cert): self.server = server self.port = port self.ssl = ssl self.base_uri = base_uri self.timeout = timeout self.name = name self.success_codes = success_codes self.auth = None self.failed = False self.capabilities = [] # cache connection here to avoid a SSL handshake for every connection # self.currentconn = None if auth: self.auth = 'Basic ' + base64.encodestring(auth).strip() self.combined_cert = combined_cert
def _encode_auth(auth): """ A function compatible with Python 2.3-3.3 that will encode auth from a URL suitable for an HTTP header. >>> str(_encode_auth('username%3Apassword')) 'dXNlcm5hbWU6cGFzc3dvcmQ=' Long auth strings should not cause a newline to be inserted. >>> long_auth = 'username:' + 'password'*10 >>> chr(10) in str(_encode_auth(long_auth)) False """ auth_s = urllib.parse.unquote(auth) # convert to bytes auth_bytes = auth_s.encode() # use the legacy interface for Python 2.3 support encoded_bytes = base64.encodestring(auth_bytes) # convert back to a string encoded = encoded_bytes.decode() # strip the trailing carriage return return encoded.replace('\n','')
def do_AUTH(self, args=None): if not getattr(self.factory, 'challengers', None): self.failResponse("AUTH extension unsupported") return if args is None: self.successResponse("Supported authentication methods:") for a in self.factory.challengers: self.sendLine(a.upper()) self.sendLine(".") return auth = self.factory.challengers.get(args.strip().upper()) if not self.portal or not auth: self.failResponse("Unsupported SASL selected") return self._auth = auth() chal = self._auth.getChallenge() self.sendLine('+ ' + base64.encodestring(chal).rstrip('\n')) self.state = 'AUTH'
def testValidLogin(self): p = pop3.POP3() p.factory = TestServerFactory() p.factory.challengers = {'CRAM-MD5': cred.credentials.CramMD5Credentials} p.portal = cred.portal.Portal(TestRealm()) ch = cred.checkers.InMemoryUsernamePasswordDatabaseDontUse() ch.addUser('testuser', 'testpassword') p.portal.registerChecker(ch) s = StringIO.StringIO() p.transport = internet.protocol.FileWrapper(s) p.connectionMade() p.lineReceived("CAPA") self.failUnless(s.getvalue().find("SASL CRAM-MD5") >= 0) p.lineReceived("AUTH CRAM-MD5") chal = s.getvalue().splitlines()[-1][2:] chal = base64.decodestring(chal) response = hmac.HMAC('testpassword', chal).hexdigest() p.lineReceived(base64.encodestring('testuser ' + response).rstrip('\n')) self.failUnless(p.mbox) self.failUnless(s.getvalue().splitlines()[-1].find("+OK") >= 0) p.connectionLost(failure.Failure(Exception("Test harness disconnect")))
def getPage(url, contextFactory=None, *args, **kwargs): scheme, host, port, path, username, password = _parse(url) if username and password: url = scheme + '://' + host + ':' + str(port) + path basicAuth = encodestring("%s:%s" % (username, password)) authHeader = "Basic " + basicAuth.strip() AuthHeaders = {"Authorization": authHeader} if kwargs.has_key("headers"): kwargs["headers"].update(AuthHeaders) else: kwargs["headers"] = AuthHeaders factory = HTTPClientFactory(url, *args, **kwargs) reactor.connectTCP(host, port, factory) return factory.deferred
def generate_rsa_signature(http_request, consumer_key, rsa_key, timestamp, nonce, version, next='oob', token=None, token_secret=None, verifier=None): import base64 try: from tlslite.utils import keyfactory except ImportError: from gdata.tlslite.utils import keyfactory base_string = build_oauth_base_string( http_request, consumer_key, nonce, RSA_SHA1, timestamp, version, next, token, verifier=verifier) private_key = keyfactory.parsePrivateKey(rsa_key) # Sign using the key signed = private_key.hashAndSign(base_string) # Python2.3 does not have base64.b64encode. if hasattr(base64, 'b64encode'): return base64.b64encode(signed) else: return base64.encodestring(signed).replace('\n', '')
def UseBasicAuth(service, username, password, for_proxy=False): """Sets an Authenticaiton: Basic HTTP header containing plaintext. Deprecated, use AtomService.use_basic_auth insread. The username and password are base64 encoded and added to an HTTP header which will be included in each request. Note that your username and password are sent in plaintext. The auth header is added to the additional_headers dictionary in the service object. Args: service: atom.AtomService or a subclass which has an additional_headers dict as a member. username: str password: str """ deprecation('calling deprecated function UseBasicAuth') base_64_string = base64.encodestring('%s:%s' % (username, password)) base_64_string = base_64_string.strip() if for_proxy: header_name = 'Proxy-Authorization' else: header_name = 'Authorization' service.additional_headers[header_name] = 'Basic %s' % (base_64_string,)
def _upload(self, filename): fp = IOStream() zipped = zipfile.ZipFile(fp, 'w', zipfile.ZIP_DEFLATED) zipped.write(filename, os.path.split(filename)[1]) zipped.close() content = base64.encodestring(fp.getvalue()) if not isinstance(content, str): content = content.decode('utf-8') try: return self._execute(Command.UPLOAD_FILE, {'file': content})['value'] except WebDriverException as e: if "Unrecognized command: POST" in e.__str__(): return filename elif "Command not found: POST " in e.__str__(): return filename elif '{"status":405,"value":["GET","HEAD","DELETE"]}' in e.__str__(): return filename else: raise e
def process_record(self, record, dbm): if self.augment_record and dbm: record = self.get_record(record, dbm) if self.unit_system is not None: record = weewx.units.to_std_system(record, self.unit_system) url = '%s/write?db=%s' % (self.server_url, self.database) data = self.get_data(record) if weewx.debug >= 2: logdbg('url: %s' % url) logdbg('data: %s' % data) if self.skip_upload: raise AbortedPost() req = urllib2.Request(url, data) req.add_header("User-Agent", "weewx/%s" % weewx.__version__) if self.username is not None: b64s = base64.encodestring( '%s:%s' % (self.username, self.password)).replace('\n', '') req.add_header("Authorization", "Basic %s" % b64s) req.get_method = lambda: 'POST' self.post_with_retries(req)
def _build_auth_headers(self): """ Build a dictionary of the authentication required for accessing OAuth endpoints. Args: None Returns: value (dict) with the 'Authorization' key and value """ auth = base64.encodestring((self.client_id + ':' + self.client_secret).encode('latin-1')).decode('latin-1') auth = auth.replace('\n', '').replace(' ', '') auth = 'Basic {}'.format(auth) headers = { 'Authorization': auth } return headers
def _build_auth_headers(self): """Create authentication headers. Build a dictionary of the authentication required for accessing OAuth endpoints. Args: None Returns: value (dict) with the 'Authorization' key and value """ auth = base64.encodestring((self.client_id + ':' + self.client_secret).encode('latin-1')).decode('latin-1') auth = auth.replace('\n', '').replace(' ', '') auth = 'Basic {}'.format(auth) headers = { 'Authorization': auth } return headers
def hash_host(hostname, salt=None): """ Return a "hashed" form of the hostname, as used by openssh when storing hashed hostnames in the known_hosts file. @param hostname: the hostname to hash @type hostname: str @param salt: optional salt to use when hashing (must be 20 bytes long) @type salt: str @return: the hashed hostname @rtype: str """ if salt is None: salt = rng.read(SHA.digest_size) else: if salt.startswith('|1|'): salt = salt.split('|')[2] salt = base64.decodestring(salt) assert len(salt) == SHA.digest_size hmac = HMAC.HMAC(salt, hostname, SHA).digest() hostkey = '|1|%s|%s' % (base64.encodestring(salt), base64.encodestring(hmac)) return hostkey.replace('\n', '')
def commit_image_to_github(path, message, file_, name, email, sha=None, branch=u'master'): """ Save given image file content to github :param path: Path to file (<owner>/<repo>/<dir>/.../<filename>) :param message: Commit message to save file with :param file_: Open file object :param name: Name of author who wrote file :param email: Email address of author :param sha: Optional SHA of file if it already exists on github :param branch: Name of branch to commit file to (branch must already exist) :returns: SHA of commit or None for failure """ contents = base64.encodestring(file_.read()) return commit_file_to_github(path, message, contents, name, email, sha=sha, branch=branch, auto_encode=False)
def _tunnel(sock, host, port, auth): debug("Connecting proxy...") connect_header = "CONNECT %s:%d HTTP/1.0\r\n" % (host, port) # TODO: support digest auth. if auth and auth[0]: auth_str = auth[0] if auth[1]: auth_str += ":" + auth[1] encoded_str = base64encode(auth_str.encode()).strip().decode() connect_header += "Proxy-Authorization: Basic %s\r\n" % encoded_str connect_header += "\r\n" dump("request header", connect_header) send(sock, connect_header) try: status, resp_headers = read_headers(sock) except Exception as e: raise WebSocketProxyException(str(e)) if status != 200: raise WebSocketProxyException( "failed CONNECT via proxy status: %r" % status) return sock
def save_certificate_to_secret(certificate, domain, secret_name, namespace): check_cert_is_valid_for_domain(domain, certificate['cert']) secret = { "kind": "Secret", "apiVersion": "v1", "metadata": { "name": secret_name, }, "data": { "tls.crt": base64.encodestring(certificate['cert']), "tls.key": base64.encodestring(certificate['key']), }, "type": "kubernetes.io/tls" } kq = KubeQuery() r = kq.post(['secrets'], json.dumps(secret), ns=namespace, rest=True) if r.get('code') == 409: kq.put(['secrets', secret_name], json.dumps(secret), ns=namespace, rest=True)
def request(url, user, passwd, data=None, method=None): if data: data = json.dumps(data) # NOTE: fetch_url uses a password manager, which follows the # standard request-then-challenge basic-auth semantics. However as # JIRA allows some unauthorised operations it doesn't necessarily # send the challenge, so the request occurs as the anonymous user, # resulting in unexpected results. To work around this we manually # inject the basic-auth header up-front to ensure that JIRA treats # the requests as authorized for this user. auth = base64.encodestring('%s:%s' % (user, passwd)).replace('\n', '') response, info = fetch_url(module, url, data=data, method=method, headers={'Content-Type':'application/json', 'Authorization':"Basic %s" % auth}) if info['status'] not in (200, 201, 204): module.fail_json(msg=info['msg']) body = response.read() if body: return json.loads(body) else: return {}
def http_request(self, api_endpoint, data_json={}): request_url = self._nsc_protocol + '://' + self._nsc_host + self._nitro_base_url + api_endpoint data_json = urllib.urlencode(data_json) if not len(data_json): data_json = None auth = base64.encodestring('%s:%s' % (self._nsc_user, self._nsc_pass)).replace('\n', '').strip() headers = { 'Authorization': 'Basic %s' % auth, 'Content-Type' : 'application/x-www-form-urlencoded', } response, info = fetch_url(self.module, request_url, data=data_json, headers=headers) return json.load(response)
def _create(module, hookurl, oauthkey, repo, user, content_type): url = "%s/hooks" % repo values = { "active": True, "name": "web", "config": { "url": "%s" % hookurl, "content_type": "%s" % content_type } } data = json.dumps(values) auth = base64.encodestring('%s:%s' % (user, oauthkey)).replace('\n', '') headers = { 'Authorization': 'Basic %s' % auth, } response, info = fetch_url(module, url, data=data, headers=headers) if info['status'] != 200: return 0, '[]' else: return 0, response.read()