我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用base64.decodestring()。
def test_tile_symmetry(self): ''' Make sure that tiles are symmetric ''' upload_file = open('data/Dixon2012-J1-NcoI-R1-filtered.100kb.multires.cool', 'rb') tileset = tm.Tileset.objects.create( datafile=dcfu.SimpleUploadedFile(upload_file.name, upload_file.read()), filetype='cooler', datatype='matrix', owner=self.user1, uuid='aa') ret = self.client.get('/api/v1/tiles/?d=aa.0.0.0') contents = json.loads(ret.content.decode('utf-8')) import base64 r = base64.decodestring(contents['aa.0.0.0']['dense'].encode('utf-8')) q = np.frombuffer(r, dtype=np.float16) q = q.reshape((256,256))
def base64_decode(input,errors='strict'): """ Decodes the object input and returns a tuple (output object, length consumed). input must be an object which provides the bf_getreadbuf buffer slot. Python strings, buffer objects and memory mapped files are examples of objects providing this slot. errors defines the error handling to apply. It defaults to 'strict' handling which is the only currently supported error handling for this codec. """ assert errors == 'strict' output = base64.decodestring(input) return (output, len(input))
def asciiDecompress(code): """ decompress result of asciiCompress """ code = base64.decodestring(code) csum = zlib.crc32(code) data = zlib.decompress(code) return data, csum
def parse_request(self): if not BaseHTTPServer.BaseHTTPRequestHandler.parse_request(self): return False authorization = self.headers.get('Authorization', '') if authorization: scheme, credentials = authorization.split() if scheme != 'Basic': self.send_error(501) return False credentials = base64.decodestring(credentials) user, password = credentials.split(':', 2) if not self.get_userinfo(user, password, self.command): self.send_autherror(401, "Authorization Required") return False else: if not self.get_userinfo(None, None, self.command): self.send_autherror(401, "Authorization Required") return False return True
def parse_request(self): res = SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.parse_request(self) if not res: return res database_name = self.path[1:] if not database_name: self.tryton = {'user': None, 'session': None} return res try: method, up64 = self.headers['Authorization'].split(None, 1) if method.strip().lower() == 'basic': user, password = base64.decodestring(up64).split(':', 1) user_id, session = security.login(database_name, user, password) self.tryton = {'user': user_id, 'session': session} return res except Exception: pass self.send_error(401, 'Unauthorized') self.send_header("WWW-Authenticate", 'Basic realm="Tryton"') return False
def parse_basic_auth(src_ip_port, dst_ip_port, headers, authorization_header): ''' Parse basic authentication over HTTP ''' if authorization_header: # authorization_header sometimes is triggered by failed ftp try: header_val = headers[authorization_header.group()] except KeyError: return b64_auth_re = re.match('basic (.+)', header_val, re.IGNORECASE) if b64_auth_re != None: basic_auth_b64 = b64_auth_re.group(1) basic_auth_creds = base64.decodestring(basic_auth_b64) msg = 'Basic Authentication: %s' % basic_auth_creds printer(src_ip_port, dst_ip_port, msg)
def parse_netntlm_resp_msg(headers, resp_header, seq): ''' Parse the client response to the challenge ''' try: header_val3 = headers[resp_header] except KeyError: return header_val3 = header_val3.split(' ', 1) # The header value can either start with NTLM or Negotiate if header_val3[0] == 'NTLM' or header_val3[0] == 'Negotiate': try: msg3 = base64.decodestring(header_val3[1]) except binascii.Error: return return parse_ntlm_resp(msg3, seq)
def tocRVOUS_PROPOSE(self,data): """ Handle a message that looks like:: RVOUS_PROPOSE:<user>:<uuid>:<cookie>:<seq>:<rip>:<pip>:<vip>:<port> [:tlv tag1:tlv value1[:tlv tag2:tlv value2[:...]]] """ user,uid,cookie,seq,rip,pip,vip,port=data[:8] cookie=base64.decodestring(cookie) port=int(port) tlvs={} for i in range(8,len(data),2): key=data[i] value=base64.decodestring(data[i+1]) tlvs[key]=value name=UUIDS[uid] try: func=getattr(self,"toc%s"%name) except: self._debug("no function for UID %s" % uid) return func(user,cookie,seq,pip,vip,port,tlvs)
def _authorize(self): # Authorization, (mostly) per the RFC try: authh = self.getHeader("Authorization") if not authh: self.user = self.password = '' return bas, upw = authh.split() if bas.lower() != "basic": raise ValueError upw = base64.decodestring(upw) self.user, self.password = upw.split(':', 1) except (binascii.Error, ValueError): self.user = self.password = "" except: log.err() self.user = self.password = ""
def state_AUTH(self, line): self.state = "COMMAND" try: parts = base64.decodestring(line).split(None, 1) except binascii.Error: self.failResponse("Invalid BASE64 encoding") else: if len(parts) != 2: self.failResponse("Invalid AUTH response") return self._auth.username = parts[0] self._auth.response = parts[1] d = self.portal.login(self._auth, None, IMailbox) d.addCallback(self._cbMailbox, parts[0]) d.addErrback(self._ebMailbox) d.addErrback(self._ebUnexpected)
def _authResponse(self, auth, challenge): self._failresponse = self.esmtpAUTHDeclined try: challenge = base64.decodestring(challenge) except binascii.Error, e: # Illegal challenge, give up, then quit self.sendLine('*') self._okresponse = self.esmtpAUTHMalformedChallenge self._failresponse = self.esmtpAUTHMalformedChallenge else: resp = auth.challengeResponse(self.secret, challenge) self._expected = [235] self._okresponse = self.smtpState_from self.sendLine(encode_base64(resp, eol="")) if auth.getName() == "LOGIN" and challenge == "Username:": self._expected = [334] self._authinfo = auth self._okresponse = self.esmtpState_challenge
def testValidLogin(self): p = pop3.POP3() p.factory = TestServerFactory() p.factory.challengers = {'CRAM-MD5': cred.credentials.CramMD5Credentials} p.portal = cred.portal.Portal(TestRealm()) ch = cred.checkers.InMemoryUsernamePasswordDatabaseDontUse() ch.addUser('testuser', 'testpassword') p.portal.registerChecker(ch) s = StringIO.StringIO() p.transport = internet.protocol.FileWrapper(s) p.connectionMade() p.lineReceived("CAPA") self.failUnless(s.getvalue().find("SASL CRAM-MD5") >= 0) p.lineReceived("AUTH CRAM-MD5") chal = s.getvalue().splitlines()[-1][2:] chal = base64.decodestring(chal) response = hmac.HMAC('testpassword', chal).hexdigest() p.lineReceived(base64.encodestring('testuser ' + response).rstrip('\n')) self.failUnless(p.mbox) self.failUnless(s.getvalue().splitlines()[-1].find("+OK") >= 0) p.connectionLost(failure.Failure(Exception("Test harness disconnect")))
def main(): tmpdir = None try: # Create a temporary working directory tmpdir = tempfile.mkdtemp() # Unpack the zipfile into the temporary directory pip_zip = os.path.join(tmpdir, "pip.zip") with open(pip_zip, "wb") as fp: fp.write(base64.decodestring(ZIPFILE)) # Add the zipfile to sys.path so that we can import it sys.path = [pip_zip] + sys.path # Run the bootstrap bootstrap(tmpdir=tmpdir) finally: # Clean up our temporary working directory if tmpdir: shutil.rmtree(tmpdir, ignore_errors=True)
def hash_host(hostname, salt=None): """ Return a "hashed" form of the hostname, as used by openssh when storing hashed hostnames in the known_hosts file. @param hostname: the hostname to hash @type hostname: str @param salt: optional salt to use when hashing (must be 20 bytes long) @type salt: str @return: the hashed hostname @rtype: str """ if salt is None: salt = rng.read(SHA.digest_size) else: if salt.startswith('|1|'): salt = salt.split('|')[2] salt = base64.decodestring(salt) assert len(salt) == SHA.digest_size hmac = HMAC.HMAC(salt, hostname, SHA).digest() hostkey = '|1|%s|%s' % (base64.encodestring(salt), base64.encodestring(hmac)) return hostkey.replace('\n', '')
def _prepare_bucket_entry_hmac(self, shard_array): """ Args: shard_array (): . """ storj_keyring = model.Keyring() encryption_key = storj_keyring.get_encryption_key('test') current_hmac = '' for shard in shard_array: base64_decoded = '%s%s' % (base64.decodestring(shard.hash), current_hmac) current_hmac = self._calculate_hmac(base64_decoded, encryption_key) self.__logger.debug('current_hmac=%s' % current_hmac) return current_hmac
def decode_param(param): name, v = param.split('=', 1) values = v.split('\n') value_results = [] for value in values: match = re.search(r'=\?((?:\w|-)+)\?(Q|B)\?(.+)\?=', value) if match: encoding, type_, code = match.groups() if type_ == 'Q': value = quopri.decodestring(code) elif type_ == 'B': value = base64.decodestring(code) value = str_encode(value, encoding) value_results.append(value) if value_results: v = ''.join(value_results) logger.debug("Decoded parameter {} - {}".format(name, v)) return name, v
def prepare(self): if self.request.method == 'OPTIONS': return auth_header = self.request.headers.get('Authorization') if not auth_header or not auth_header.startswith('Basic '): raise exceptions.HTTPError(401, 'Unauthenticated') decoded = unquote_plus(base64.decodestring(auth_header[6:])) client_id, client_secret = decoded.split(':', 1) service = yield Service.authenticate(client_id, client_secret) if not service: raise exceptions.HTTPError(401, 'Unauthenticated') self.request.client_id = client_id self.request.client = service grant_type = self.request.body_arguments.get('grant_type', [None])[0] self.request.grant_type = grant_type
def init_channels(self, binaries=()): """Initialize the Apt channels as needed. @param binaries: A possibly empty list of 3-tuples of the form (hash, id, deb), holding the hash, the id and the content of additional Debian packages that should be loaded in the channels. """ binaries_path = self._config.binaries_path # Clean up the binaries we wrote in former runs self._clear_binaries() if binaries: hash_ids = {} for hash, id, deb in binaries: create_binary_file(os.path.join(binaries_path, "%d.deb" % id), base64.decodestring(deb)) hash_ids[hash] = id self._store.set_hash_ids(hash_ids) self._facade.add_channel_deb_dir(binaries_path) self._facade.reload_channels(force_reload_binaries=True) self._facade.ensure_channels_reloaded()
def load_invisible_font(self): font = """ eJzdlk1sG0UUx/+zs3btNEmrUKpCPxikSqRS4jpfFURUagmkEQQoiRXgAl07Y3vL2mvt2ml8APXG hQPiUEGEVDhWVHyIC1REPSAhBOWA+BCgSoULUqsKcWhVBKjhzfPU+VCi3Flrdn7vzZv33ryZ3TUE gC6chsTx8fHck1ONd98D0jnS7jn26GPjyMIleZhk9fT0wcHFl1/9GRDPkTxTqHg1dMkzJH9CbbTk xbWlJfKEdB+Np0pBswi+nH/Nvay92VtfJp4nvEztUJkUHXsdksUOkveXK/X5FNuLD838ICx4dv4N I1e8+ZqbxwCNP2jyqXoV/fmhy+WW/2SqFsb1pX68SfEpZ/TCrI3aHzcP//jitodvYmvL+6Xcr5mV vb1ScCzRnPRPfz+LsRSWNasuwRrZlh1sx0E8AriddyzEDfE6EkglFhJDJO5u9fJbFJ0etEMB78D5 4Djm/7kjT0wqhSNURyS+u/2MGJKRu+0ExNkrt1pJti9p2x6b3TBJgmUXuzgnDmI8UWMbkVxeinCw Mo311/l/v3rF7+01D+OkZYE0PrbsYAu+sSyxU0jLLtIiYzmBrFiwnCT9FcsdOOK8ZHbFleSn0znP nDCnxbnAnGT9JeYtrP+FOcV8nTlNnsoc3bBAD85adtCNRcsSffjBsoseca/lBE7Q09LiJOm/ttyB 0+IqcwfncJt5q4krO5k7jV7uY+5m7mPebuLKUea7iHvk48w72OYF5rvZT8C8k/WvMN/Dc19j3s02 bzPvZZv3me9j/ox5P9t/xdzPzPVJcc7yGnPL/1+GO1lPVTXM+VNWOTRRg0YRHgrUK5yj1kvaEA1E xAWiCtl4qJL2ADKkG6Q3XxYjzEcR0E9hCj5KtBd1xCxp6jV5mKP7LJBr1nTRK2h1TvU2w0akCmGl 5lWbBzJqMJsdyaijQaCm/FK5HqspHetoTtMsn4LO0T2mlqcwmlTVOT/28wGhCVKiNANKLiJRlxqB F603axQznIzRhDSq6EWZ4UUs+xud0VHsh1U1kMlmNwu9kTuFaRqpURU0VS3PVmZ0iE7gct0MG/8+ 2fmUvKlfRLYmisd1w8pk1LSu1XUlryM1MNTH9epTftWv+16gIh1oL9abJZyjrfF5a4qccp3oFAcz Wxxx4DpvlaKKxuytRDzeth5rW4W8qBFesvEX8RFRmLBHoB+TpCmRVCCb1gFCruzHqhhW6+qUF6tC pL26nlWN2K+W1LhRjxlVGKmRTFYVo7CiJug09E+GJb+QocMCPMWBK1wvEOfRFF2U0klK8CppqqvG pylRc2Zn+XDQWZIL8iO5KC9S+1RekOex1uOyZGR/w/Hf1lhzqVfFsxE39B/ws7Rm3N3nDrhPuMfc w3R/aE28KsfY2J+RPNp+j+KaOoCey4h+Dd48b9O5G0v2K7j0AM6s+5WQ/E0wVoK+pA6/3bup7bJf CMGjwvxTsr74/f/F95m3TH9x8o0/TU//N+7/D/ScVcA= """ ttf = cStringIO.StringIO(zlib.decompress(base64.decodestring(font))) pdfmetrics.registerFont(TTFont('invisible', ttf))