我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用string.encode()。
def send_content(self, connection, request_body): connection.putheader("Content-Type", "text/xml") #optionally encode the request if (self.encode_threshold is not None and self.encode_threshold < len(request_body) and gzip): connection.putheader("Content-Encoding", "gzip") request_body = gzip_encode(request_body) connection.putheader("Content-Length", str(len(request_body))) connection.endheaders(request_body) ## # Parse response. # # @param file Stream. # @return Response tuple and target method.
def serve_amfrpc(self, version=0): try: import pyamf import pyamf.remoting.gateway except: return "pyamf not installed or not in Python sys.path" request = current.request response = current.response if version == 3: services = self.amfrpc3_procedures base_gateway = pyamf.remoting.gateway.BaseGateway(services) pyamf_request = pyamf.remoting.decode(request.body) else: services = self.amfrpc_procedures base_gateway = pyamf.remoting.gateway.BaseGateway(services) context = pyamf.get_context(pyamf.AMF0) pyamf_request = pyamf.remoting.decode(request.body, context) pyamf_response = pyamf.remoting.Envelope(pyamf_request.amfVersion) for name, message in pyamf_request: pyamf_response[name] = base_gateway.getProcessor(message)(message) response.headers['Content-Type'] = pyamf.remoting.CONTENT_TYPE if version == 3: return pyamf.remoting.encode(pyamf_response).getvalue() else: return pyamf.remoting.encode(pyamf_response, context).getvalue()
def sign(self): logger.debug('Enter sign...') logger.debug('self.ret:%s' % (self.ret)) #string = '&'.join(['%s=%s' % (key.lower(), self.ret[key]) for key in sorted(self.ret)]) #string = 'appid=%s&body=%s&device_info=%s&mch_id=%s&nonce_str=%s' % (self.ret['appid'],self.ret['body'],self.ret['device_info'],self.ret['mch_id'],self.ret['nonce_str']) string = 'nima' logger.debug('string:' + str(string)) logger.debug('string:%s' % (string)) mch_key = System_Config.objects.get(name='wechat_mch_key').val logger.debug('mch_key:%s' %(mch_key)) #stringSignTemp= '%s&key=%s' % (string,mch_key) #logger.debug('stringSignTemp:%s' % (stringSignTemp)) #signature = hashlib.sha1(string.encode('utf8')).hexdigest() #signature = hashlib.md5(stringSignTemp.encode('utf8')).hexdigest().upper() #self.other_parameters['sign'] = signature #logger.debug('signature:%s' % (signature)) #return signature
def playTrack(asin): content = trackPostUnicodeGetHLSPage('https://music.amazon.de/dmls/', asin) temp_file_path = addonUserDataFolder if forceDVDPlayer: temp_file_path += "/temp.mp4" else: temp_file_path += "/temp.m3u8" if xbmcvfs.exists(temp_file_path): xbmcvfs.delete(temp_file_path) m3u_temp_file = xbmcvfs.File(temp_file_path, 'w') manifest_match = re.compile('manifest":"(.+?)"',re.DOTALL).findall(content) if manifest_match: m3u_string = manifest_match[0] m3u_string = m3u_string.replace("\\n", os.linesep) m3u_temp_file.write(m3u_string.encode("ascii")) m3u_temp_file.close() play_item = xbmcgui.ListItem(path=temp_file_path) play_item = setPlayItemInfo(play_item) xbmcplugin.setResolvedUrl(pluginhandle, True, listitem=play_item)
def search(type): keyboard = xbmc.Keyboard('', translation(30015)) keyboard.doModal() if keyboard.isConfirmed() and keyboard.getText(): search_string = unicode(keyboard.getText(), "utf-8").replace(" ", "+") search_string = urllib.quote_plus(search_string.encode("utf8")) if siteVersion=="de": if type=="albums": listAlbums(urlMain+"/s?rh=n%3A5686557031%2Ck%2Cp_n_format_browse-bin%3A180848031&keywords="+search_string+"&ie=UTF8") elif type=="songs": listSearchedSongs(urlMain+"/s/ref=sr_nr_p_n_format_browse-bi_2?fst=as%3Aoff&rh=n%3A5686557031%2Ck%2Cp_n_format_browse-bin%3A180849031&bbn=5686557031&keywords="+search_string+"&ie=UTF8") # elif siteVersion=="com": # if type=="movies": # listMovies(urlMain+"/mn/search/ajax/?_encoding=UTF8&url=node%3D7613704011&field-keywords="+search_string) # elif type=="tv": # listShows(urlMain+"/mn/search/ajax/?_encoding=UTF8&url=node%3D2858778011&field-keywords="+search_string) # elif siteVersion=="co.uk": # if type=="movies": # listMovies(urlMain+"/mn/search/ajax/?_encoding=UTF8&url=node%3D3356010031&field-keywords="+search_string) # elif type=="tv": # listShows(urlMain+"/mn/search/ajax/?_encoding=UTF8&url=node%3D3356011031&field-keywords="+search_string)
def __unquote_to_bytes(string): string = string.encode('utf-8') bits = string.split(b'%') if len(bits) == 1: return string res = [bits[0]] append = res.append # Delay the initialization of the table to not waste memory # if the function is never called global _hextobyte if _hextobyte is None: _hextobyte = dict((a+b, chr(int(a+b,16))) for a in _hexdig for b in _hexdig) for item in bits[1:]: try: append(_hextobyte[item[:2]]) append(item[2:]) except KeyError: append(b'%') append(item) return b''.join(res)
def deunicodise(string, encoding = None, errors = "replace"): """ Convert unicode 'string' to <type str>, by default replacing all invalid characters with '?' or raise an exception. """ if not encoding: encoding = Config.Config().encoding if type(string) != unicode: return str(string) debug("DeUnicodising %r using %s" % (string, encoding)) try: return string.encode(encoding, errors) except UnicodeEncodeError: raise UnicodeEncodeError("Conversion from unicode failed: %r" % string)
def _stringify(string): # convert to 7-bit ascii if possible try: return string.encode("ascii") except UnicodeError: return string
def encode(self, out): out.write("<value><boolean>%d</boolean></value>\n" % self.value)
def encode(self, out): out.write("<value><dateTime.iso8601>") out.write(self.value) out.write("</dateTime.iso8601></value>\n")
def encode(self, out): out.write("<value><base64>\n") base64.encode(StringIO.StringIO(self.data), out) out.write("</base64></value>\n")
def dump_unicode(self, value, write, escape=escape): value = value.encode(self.encoding) write("<value><string>") write(escape(value)) write("</string></value>\n")
def dump_instance(self, value, write): # check for special wrappers if value.__class__ in WRAPPERS: self.write = write value.encode(self) del self.write else: # store instance attributes as a struct (really?) self.dump_struct(value.__dict__, write)
def __init__(self, uri, transport=None, encoding=None, verbose=0, allow_none=0, use_datetime=0, context=None): # establish a "logical" server connection if isinstance(uri, unicode): uri = uri.encode('ISO-8859-1') # get the url import urllib type, uri = urllib.splittype(uri) if type not in ("http", "https"): raise IOError, "unsupported XML-RPC protocol" self.__host, self.__handler = urllib.splithost(uri) if not self.__handler: self.__handler = "/RPC2" if transport is None: if type == "https": transport = SafeTransport(use_datetime=use_datetime, context=context) else: transport = Transport(use_datetime=use_datetime) self.__transport = transport self.__encoding = encoding self.__verbose = verbose self.__allow_none = allow_none
def wx_signature(data): string = '&'.join(['%s=%s' % (key.lower(), data[key]) for key in sorted(data) if data[key] is not None and data[key] is not '']) return hashlib.sha1(string.encode('utf-8')).hexdigest()
def wx_sign_for_pay(params): content = '&'.join(['%s=%s' % (key, params[key]) for key in sorted(params) if params[key] is not None and params[key] is not '']) content += '&key=' + settings.WEIXIN_KEY return hashlib.md5(content.encode('utf-8')).hexdigest().upper()
def __init__( self, payload, filename=None, content_id=None, content_type=None, encoding='utf-8'): if isinstance(payload, str): if filename is None: filename = os.path.basename(payload) payload = read_file(payload, 'rb') else: if filename is None: raise Exception('Missing attachment name') payload = payload.read() filename = filename.encode(encoding) if content_type is None: content_type = contenttype(filename) self.my_filename = filename self.my_payload = payload MIMEBase.MIMEBase.__init__(self, *content_type.split('/', 1)) self.set_payload(payload) self['Content-Disposition'] = 'attachment; filename="%s"' % filename if content_id is not None: self['Content-Id'] = '<%s>' % content_id.encode(encoding) Encoders.encode_base64(self)
def jwt_b64e(string): if isinstance(string, unicode): string = string.encode('utf-8', 'strict') return base64.urlsafe_b64encode(string).strip(b'=')
def jwt_b64d(string): """base64 decodes a single bytestring (and is tolerant to getting called with a unicode string). The result is also a bytestring. """ if isinstance(string, unicode): string = string.encode('ascii', 'ignore') return base64.urlsafe_b64decode(string + '=' * (-len(string) % 4))
def generate_token(self, payload): secret = self.secret_key if self.salt: if callable(self.salt): secret = "%s$%s" % (secret, self.salt(payload)) else: secret = "%s$%s" % (secret, self.salt) if isinstance(secret, unicode): secret = secret.encode('ascii', 'ignore') b64h = self.cached_b64h b64p = self.jwt_b64e(serializers.json(payload)) jbody = b64h + '.' + b64p mauth = hmac.new(key=secret, msg=jbody, digestmod=self.digestmod) jsign = self.jwt_b64e(mauth.digest()) return jbody + '.' + jsign
def load_token(self, token): if isinstance(token, unicode): token = token.encode('utf-8', 'strict') body, sig = token.rsplit('.', 1) b64h, b64b = body.split('.', 1) if b64h != self.cached_b64h: # header not the same raise HTTP(400, u'Invalid JWT Header') secret = self.secret_key tokend = serializers.loads_json(self.jwt_b64d(b64b)) if self.salt: if callable(self.salt): secret = "%s$%s" % (secret, self.salt(tokend)) else: secret = "%s$%s" % (secret, self.salt) if isinstance(secret, unicode): secret = secret.encode('ascii', 'ignore') if not self.verify_signature(body, sig, secret): # signature verification failed raise HTTP(400, u'Token signature is invalid') if self.verify_expiration: now = time.mktime(datetime.datetime.utcnow().timetuple()) if tokend['exp'] + self.leeway < now: raise HTTP(400, u'Token is expired') if callable(self.before_authorization): self.before_authorization(tokend) return tokend
def getHash(self, string): '''return a unique and stable hash value from hashing the website url''' m = hashlib.sha1(string.encode('utf-8')) return m.hexdigest()
def getHash(url): '''return a unique and stable hash value from hashing the website url''' m = hashlib.sha1(url.encode('utf-8')) return m.hexdigest()
def sign(self): string = '&'.join(['%s=%s' % (key.lower(), self.ret[key]) for key in sorted(self.ret)]) self.ret['signature'] = hashlib.sha1(string.encode('utf-8')).hexdigest() return self.ret