The following code examples, collected from open source Python projects, illustrate how to use urllib.unquote_plus().
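Before the project examples, a minimal sketch of what the function does (Python 2's urllib; on Python 3 the equivalent is urllib.parse.unquote_plus()):

import urllib

# unquote_plus() reverses form-style (application/x-www-form-urlencoded)
# escaping: '+' becomes a space and each %XX sequence becomes the byte it
# encodes. Plain unquote() decodes %XX but leaves '+' alone.
print urllib.unquote_plus('John+Doe%21')  # -> 'John Doe!'
print urllib.unquote('John+Doe%21')       # -> 'John+Doe!'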
def resolve(self, url):
    try:
        b = urlparse.urlparse(url).netloc
        b = re.compile('([\w]+[.][\w]+)$').findall(b)[0]

        if not b in base64.b64decode(self.b_link): return url

        u, p, h = url.split('|')
        r = urlparse.parse_qs(h)['Referer'][0]
        #u += '&app_id=Exodus'

        c = self.request(r, output='cookie', close=False)

        result = self.request(u, post=p, referer=r, cookie=c)

        url = result.split('url=')
        url = [urllib.unquote_plus(i.strip()) for i in url]
        url = [i for i in url if i.startswith('http')]
        url = url[-1]

        return url
    except:
        return
def parse_userinfo(userinfo):
    """Validates the format of user information in a MongoDB URI.
    Reserved characters like ':', '/', '+' and '@' must be escaped
    following RFC 2396.

    Returns a 2-tuple containing the unescaped username followed
    by the unescaped password.

    :Parameters:
        - `userinfo`: A string of the form <username>:<password>

    .. versionchanged:: 2.2
       Now uses `urllib.unquote_plus` so `+` characters must be escaped.
    """
    if '@' in userinfo or userinfo.count(':') > 1:
        raise InvalidURI("':' or '@' characters in a username or password "
                         "must be escaped according to RFC 2396.")
    user, _, passwd = _partition(userinfo, ":")
    # No password is expected with GSSAPI authentication.
    if not user:
        raise InvalidURI("The empty string is not a valid username.")
    user = unquote_plus(user)
    passwd = unquote_plus(passwd)

    return user, passwd
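For context, a quote_plus()/unquote_plus() round trip shows how parse_userinfo() recovers reserved characters from an escaped URI; the credential value below is made up for illustration:

import urllib

# Escape a password containing reserved characters before embedding it in
# a MongoDB URI, then recover it the way parse_userinfo() does.
escaped = urllib.quote_plus('p@ss:word+1')  # 'p%40ss%3Aword%2B1'
print urllib.unquote_plus(escaped)          # 'p@ss:word+1'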
def _parse_options(opts, delim):
    """Helper method for split_options which creates the options dict.
    Also handles the creation of a list for the URI tag_sets/
    readpreferencetags portion."""
    options = {}
    for opt in opts.split(delim):
        key, val = opt.split("=")
        if key.lower() == 'readpreferencetags':
            options.setdefault('readpreferencetags', []).append(val)
        else:
            # str(option) to ensure that a unicode URI results in plain 'str'
            # option names. 'normalized' is then suitable to be passed as
            # kwargs in all Python versions.
            if str(key) in options:
                warnings.warn("Duplicate URI option %s" % (str(key),))
            options[str(key)] = unquote_plus(val)

    # Special case for deprecated options
    if "wtimeout" in options:
        if "wtimeoutMS" in options:
            options.pop("wtimeout")
        warnings.warn("Option wtimeout is deprecated, use 'wtimeoutMS'"
                      " instead")

    return options
def get_smtp_server():
    """
    Instantiate, configure and return a SMTP or SMTP_SSL instance from
    smtplib.
    :return: A SMTP instance. The quit() method must be called when all
    the calls to sendmail() have been made.
    """
    uri = parse_uri(config.get('email', 'uri'))
    if uri.scheme.startswith('smtps'):
        smtp_server = smtplib.SMTP_SSL(uri.hostname, uri.port)
    else:
        smtp_server = smtplib.SMTP(uri.hostname, uri.port)

    if 'tls' in uri.scheme:
        smtp_server.starttls()

    if uri.username and uri.password:
        smtp_server.login(
            urllib.unquote_plus(uri.username),
            urllib.unquote_plus(uri.password))

    return smtp_server
def cursor(self, autocommit=False, readonly=False):
    conv = MySQLdb.converters.conversions.copy()
    conv[float] = lambda value, _: repr(value)
    conv[MySQLdb.constants.FIELD_TYPE.TIME] = MySQLdb.times.Time_or_None
    args = {
        'db': self.database_name,
        'sql_mode': 'traditional,postgresql',
        'use_unicode': True,
        'charset': 'utf8',
        'conv': conv,
    }
    uri = parse_uri(config.get('database', 'uri'))
    assert uri.scheme == 'mysql'
    if uri.hostname:
        args['host'] = uri.hostname
    if uri.port:
        args['port'] = uri.port
    if uri.username:
        args['user'] = uri.username
    if uri.password:
        args['passwd'] = urllib.unquote_plus(uri.password)
    conn = MySQLdb.connect(**args)
    cursor = Cursor(conn, self.database_name)
    cursor.execute('SET time_zone = "+00:00"')
    return cursor
def connect(self):
    if self._connpool is not None:
        return self
    logger.info('connect to "%s"', self.database_name)
    uri = parse_uri(config.get('database', 'uri'))
    assert uri.scheme == 'postgresql'
    host = uri.hostname and "host=%s" % uri.hostname or ''
    port = uri.port and "port=%s" % uri.port or ''
    name = "dbname=%s" % self.database_name
    user = uri.username and "user=%s" % uri.username or ''
    password = ("password=%s" % urllib.unquote_plus(uri.password)
                if uri.password else '')
    minconn = config.getint('database', 'minconn', default=1)
    maxconn = config.getint('database', 'maxconn', default=64)
    dsn = '%s %s %s %s %s' % (host, port, name, user, password)
    self._connpool = ThreadedConnectionPool(minconn, maxconn, dsn)
    return self
def _split_token_parts(blob):
    """Extracts and unescapes fields from the provided binary string.

    Reverses the packing performed by _join_token_parts. Used to extract
    the members of a token object.

    Note: An empty string from the blob will be interpreted as None.

    Args:
      blob: str A string of the form 1x|member1|member2|member3 as created
          by _join_token_parts

    Returns:
      A list of unescaped strings.
    """
    return [urllib.unquote_plus(part) or None for part in blob.split('|')]
def filter(self, handler):
    path = urlparse.urlsplit(handler.path).path
    if path.startswith('/'):
        path = urllib.unquote_plus(path.lstrip('/') or '.').decode('utf8')
    if os.path.isdir(path):
        index_file = os.path.join(path, self.index_file)
        if not os.path.isfile(index_file):
            content = self.format_index_html(path).encode('UTF-8')
            headers = {'Content-Type': 'text/html; charset=utf-8',
                       'Connection': 'close'}
            return 'mock', {'status': 200, 'headers': headers, 'body': content}
        else:
            path = index_file
    if os.path.isfile(path):
        content_type = 'application/octet-stream'
        try:
            import mimetypes
            content_type = mimetypes.types_map.get(os.path.splitext(path)[1])
            if os.path.splitext(path)[1].endswith(('crt', 'pem')):
                content_type = 'application/x-x509-ca-cert'
        except StandardError as e:
            logging.error('import mimetypes failed: %r', e)
        with open(path, 'rb') as fp:
            content = fp.read()
            headers = {'Connection': 'close', 'Content-Type': content_type}
            return 'mock', {'status': 200, 'headers': headers, 'body': content}
def fromurl(self, url):
    '''
    Builds an item from a text string. The string may have been created by
    the tourl() function or use the legacy format:
    plugin://plugin.video.pelisalacarta/?channel=... (+ other parameters)
    Usage: item.fromurl("string")
    '''
    if "?" in url:
        url = url.split("?")[1]

    try:
        STRItem = base64.b64decode(urllib.unquote(url))
        JSONItem = json.loads(STRItem, object_hook=self.toutf8)
        self.__dict__.update(JSONItem)
    except:
        url = urllib.unquote_plus(url)
        dct = dict([[param.split("=")[0], param.split("=")[1]]
                    for param in url.split("&") if "=" in param])
        self.__dict__.update(dct)
        self.__dict__ = self.toutf8(self.__dict__)
    return self
def prepare(self):
    if self.request.method == 'OPTIONS':
        return

    auth_header = self.request.headers.get('Authorization')
    if not auth_header or not auth_header.startswith('Basic '):
        raise exceptions.HTTPError(401, 'Unauthenticated')

    decoded = unquote_plus(base64.decodestring(auth_header[6:]))
    client_id, client_secret = decoded.split(':', 1)

    service = yield Service.authenticate(client_id, client_secret)
    if not service:
        raise exceptions.HTTPError(401, 'Unauthenticated')

    self.request.client_id = client_id
    self.request.client = service

    grant_type = self.request.body_arguments.get('grant_type', [None])[0]
    self.request.grant_type = grant_type
def process_message_event(message, resource, token, config):
    logging.debug('Processing message event')
    try:
        if str(message['_embedded']['message']['direction']) == 'Incoming':
            message_uri = message['_embedded']['message']['_links']['plainMessage']['href']
            logging.debug("Received raw message - %s" % message_uri)
            inbound_message = urllib.unquote_plus(DataURI(message_uri).data)
            logging.info("Received message - %s" % inbound_message)
            thread_uri = message['_embedded']['message']['_links']['messaging']['href']
            if MESSAGE_CALLBACK is not None:
                MESSAGE_CALLBACK(inbound_message, thread_uri, resource)
            # send_message(resource + thread_uri + '/messages', 'I found 4 matching incidents https://it12321.servicenow.com/search?query={0}'.format(inbound_message), token, config['redirect_uri'])
    except KeyError:
        logging.debug('not an inbound message')
        pass
def resolve(self, url):
    try:
        b = urlparse.urlparse(url).netloc
        b = re.compile('([\w]+[.][\w]+)$').findall(b)[0]

        if not b in base64.b64decode(self.b_link): return url

        u, p, h = url.split('|')
        r = urlparse.parse_qs(h)['Referer'][0]

        c = self.request(r, output='cookie', close=False)

        result = self.request(u, post=p, referer=r, cookie=c)

        url = result.split('url=')
        url = [urllib.unquote_plus(i.strip()) for i in url]
        url = [i for i in url if i.startswith('http')]
        url = url[-1]

        return url
    except:
        return
def lambda_handler(event, context):
    bucket = event['Records'][0]['s3']['bucket']['name']
    url = event['Records'][0]['s3']['object']['key'].encode('utf8')
    key = urllib.unquote_plus(url)
    s3_path = os.path.dirname(key)

    try:
        s3.download_file(bucket, key, '/tmp/target.zip')
        zfile = zipfile.ZipFile('/tmp/target.zip')
        namelist = zfile.namelist()

        for filename in namelist:
            data = zfile.read(filename)
            localpath = '/tmp/{}'.format(str(filename))
            f = open(localpath, 'wb')
            f.write(data)
            f.close()
            s3.upload_file(localpath, bucket, os.path.join(s3_path, filename))

        s3.delete_object(Bucket=bucket, Key=key)
        return "AWS Key -> {}".format(key)
    except Exception as e:
        print(e)
        raise e
def resolve(self, url):
    try:
        b = urlparse.urlparse(url).netloc
        b = re.compile('([\w]+[.][\w]+)$').findall(b)[0]

        if not b in base64.b64decode(self.b_link): return url

        u, p, h = url.split('|')
        r = urlparse.parse_qs(h)['Referer'][0]
        #u += '&app_id=Exodus'

        c = self.request(r, output='cookie', close=False)

        result = self.request(u, post=p, referer=r, cookie=c)

        url = re.compile('url=(.*)').findall(result)[0]
        url = urllib.unquote_plus(url)

        return url
    except:
        return
def test_unquoting(self):
    # Make sure unquoting of all ASCII values works
    escape_list = []
    for num in range(128):
        given = hexescape(chr(num))
        expect = chr(num)
        result = urllib.unquote(given)
        self.assertEqual(expect, result,
                         "using unquote(): %s != %s" % (expect, result))
        result = urllib.unquote_plus(given)
        self.assertEqual(expect, result,
                         "using unquote_plus(): %s != %s" %
                         (expect, result))
        escape_list.append(given)
    escape_string = ''.join(escape_list)
    del escape_list
    result = urllib.unquote(escape_string)
    self.assertEqual(result.count('%'), 1,
                     "using unquote(): not all characters escaped: "
                     "%s" % result)
def process(self, pyfile):
    name = re.search(self.NAME_PATTERN, pyfile.url).group(1)
    pyfile.name = urllib.unquote_plus(name)

    session = re.search(self.SESSION_PATTERN, pyfile.url).group(1)

    url = "http://flyfiles.net"

    #: Get download URL
    parsed_url = self.load(url, post={'getDownLink': session})
    self.log_debug("Parsed URL: %s" % parsed_url)

    if parsed_url == "#downlink|" or parsed_url == "#downlink|#":
        self.log_warning(
            _("Could not get the download URL. Please wait 10 minutes"))
        self.wait(10 * 60, True)
        self.retry()

    self.link = parsed_url.replace('#downlink|', '')
def print_var_node(xml_node, stream):
    name = xml_node.getAttribute('name')
    value = xml_node.getAttribute('value')
    val_type = xml_node.getAttribute('type')

    found_as = xml_node.getAttribute('found_as')
    stream.write('Name: ')
    stream.write(unquote_plus(name))
    stream.write(', Value: ')
    stream.write(unquote_plus(value))
    stream.write(', Type: ')
    stream.write(unquote_plus(val_type))

    if found_as:
        stream.write(', Found as: %s' % (unquote_plus(found_as),))

    stream.write('\n')

#===================================================================================================
# print_referrers
#===================================================================================================
def GetImports(module_name):
    try:
        processor = pycompletionserver.Processor()
        data = urllib.unquote_plus(module_name)
        def_file, completions = _pydev_imports_tipper.GenerateTip(data)
        return processor.formatCompletionMessage(def_file, completions)
    except:
        s = StringIO.StringIO()
        exc_info = sys.exc_info()
        traceback.print_exception(exc_info[0], exc_info[1], exc_info[2],
                                  limit=None, file=s)
        err = s.getvalue()
        pycompletionserver.dbg('Received error: ' + str(err),
                               pycompletionserver.ERROR)
        raise

#=======================================================================================================================
# main
#=======================================================================================================================
def get_video_url(page_url, user="", password="", video_password=""):
    video_urls = []
    urls = []
    response = httptools.downloadpage(page_url, cookies=False,
                                      headers={"Referer": page_url})
    cookies = ""
    cookie = response.headers["set-cookie"].split("HttpOnly, ")
    for c in cookie:
        cookies += c.split(";", 1)[0] + "; "
    data = response.data.decode('unicode-escape')
    data = urllib.unquote_plus(urllib.unquote_plus(data))
    headers_string = "|Cookie=" + cookies
    url_streams = scrapertools.find_single_match(
        data, 'url_encoded_fmt_stream_map=(.*)')
    streams = scrapertools.find_multiple_matches(
        url_streams,
        'itag=(\d+)&url=(.*?)(?:;.*?quality=.*?(?:,|&)|&quality=.*?(?:,|&))')
    itags = {'18': '360p', '22': '720p', '34': '360p', '35': '480p',
             '37': '1080p', '43': '360p', '59': '480p'}
    for itag, video_url in streams:
        if not video_url in urls:
            video_url += headers_string
            video_urls.append([itags[itag], video_url])
            urls.append(video_url)
    video_urls.sort(key=lambda video_urls: int(video_urls[0].replace("p", "")))
    return video_urls
def lambda_handler(event, context):
    """
    Demonstrates S3 trigger that uses Rekognition APIs to detect faces,
    labels and index faces in S3 Object.
    """
    # Get the object from the event
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.unquote_plus(
        event['Records'][0]['s3']['object']['key'].encode('utf8'))

    try:
        rekognition_faces_response = detect_faces(bucket, key)
        rekognition_faces_response_json = json.dumps(
            rekognition_faces_response, indent=4)
        rekognition_faces_response_csv = transform_json_to_csv(
            bucket, key, rekognition_faces_response)
        write_s3(bucket, key, rekognition_faces_response_json,
                 rekognition_faces_response_csv)
        return rekognition_faces_response
    except Exception as e:
        print("Error processing object {} from bucket {}".format(key, bucket))
        print("Exception: {}. {}".format(e, sys.exc_info()[0]))
        raise
def lambda_handler(event, context):
    #print("Received event: " + json.dumps(event, indent=2))

    # Get the object from the event and show its content type
    bucket = event['Records'][0]['s3']['bucket']['name']
    print(bucket)
    key = urllib.unquote_plus(
        event['Records'][0]['s3']['object']['key'].encode('utf8'))
    print(key)
    try:
        response = s3.get_object(Bucket=bucket, Key=key)
        print(response)
        print("CONTENT TYPE: " + response['ContentType'])
        s3.put_object(Body=response['Body'].read(),
                      Bucket='lambdabkt-testsave123', Key=key + '123')
        return response['ContentType']
    except Exception as e:
        print(e)
        print('Error getting object {} from bucket {}. Make sure they exist '
              'and your bucket is in the same region as this '
              'function.'.format(key, bucket))
        raise e
def playstrm(params, url, category):
    '''Play for videos in .strm files'''
    logger.info("[xbmctools.py] playstrm url=" + url)
    title = unicode(xbmc.getInfoLabel("ListItem.Title"), "utf-8")
    thumbnail = urllib.unquote_plus(params.get("thumbnail"))
    plot = unicode(xbmc.getInfoLabel("ListItem.Plot"), "utf-8")
    server = params["server"]
    if (params.has_key("Serie")):
        serie = params.get("Serie")
    else:
        serie = ""
    if (params.has_key("subtitle")):
        subtitle = params.get("subtitle")
    else:
        subtitle = ""

    from core.item import Item
    from platformcode.subtitletools import saveSubtitleName
    item = Item(title=title, show=serie)
    saveSubtitleName(item)

    play_video("Biblioteca streamondemand", server, url, category, title,
               thumbnail, plot, strmfile=True, Serie=serie, subtitle=subtitle)
def _encode_query(query):
    """
    `urlparse.parse_qsl` and `urllib.urlencode` modify blank query values
    so we had to roll our own.
    """
    kvps = urllib.unquote_plus(query).split("&")
    encoded_pairs = []
    for kvp in kvps:
        if "=" not in kvp:
            encoded_pairs.append(urllib.quote_plus(kvp))
        else:
            key, value = kvp.split("=")
            encoded_pairs.append("%s=%s" % (
                urllib.quote_plus(key),
                urllib.quote_plus(value)
            ))
    return "&".join(encoded_pairs)
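The docstring's complaint is easy to reproduce with the standard library; a small illustration (standard Python 2 urlparse/urllib, made-up query string):

import urllib
import urlparse

# parse_qsl() silently drops blank values, and round-tripping a bare key
# through urlencode() turns 'flag' into 'flag=' -- exactly the mutation
# _encode_query() above is written to avoid.
print urlparse.parse_qsl('flag&a=&b=1')
# -> [('b', '1')]
pairs = urlparse.parse_qsl('flag&a=&b=1', keep_blank_values=1)
print pairs
# -> [('flag', ''), ('a', ''), ('b', '1')]
print urllib.urlencode(pairs)
# -> 'flag=&a=&b=1'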
def get_params():
    param = {}
    if len(sys.argv) < 3:
        return {}
    paramstring = sys.argv[2]
    if len(paramstring) >= 2:
        params = sys.argv[2]
        cleanedparams = params.replace('?', '')
        if (params[len(params) - 1] == '/'):
            params = params[0:len(params) - 2]
        xbmc.log(str(cleanedparams), xbmc.LOGDEBUG)
        pairsofparams = cleanedparams.split('&')
        xbmc.log(str(pairsofparams), xbmc.LOGDEBUG)
        param = {}
        for i in range(len(pairsofparams)):
            splitparams = {}
            splitparams = pairsofparams[i].split('=')
            if (len(splitparams)) == 2:
                try:
                    param[splitparams[0]] = urllib.unquote_plus(splitparams[1])
                except:
                    pass
    return param
def do_GET(self):
    query = self.path.split("?", 1)[-1]
    filepath = urllib.unquote_plus(query)
    self.suppress_socket_error_report = None
    self.send_headers(filepath)
    print "sending data"
    try:
        self.write_response(filepath)
    except socket.error, e:
        if isinstance(e.args, tuple):
            if e[0] in (errno.EPIPE, errno.ECONNRESET):
                print "disconnected"
                self.suppress_socket_error_report = True
                return
        raise
def process_images():
    """Process the image

    No real error handling in this sample code. In case of error we'll put
    the message back in the queue and make it visible again. It will end up
    in the dead letter queue after five failed attempts.
    """
    for message in get_messages_from_sqs():
        try:
            message_content = json.loads(message.body)
            image = urllib.unquote_plus(
                message_content['Records'][0]['s3']['object']
                ['key']).encode('utf-8')
            s3.download_file(input_bucket_name, image, image)
            resize_image(image)
            upload_image(image)
            cleanup_files(image)
        except:
            message.change_visibility(VisibilityTimeout=0)
            continue
        else:
            message.delete()
def parse(self, response):
    """
    default parse method, rule is not useful now
    """
    # import pdb; pdb.set_trace()
    response = response.replace(
        url=HtmlParser.remove_url_parameter(response.url))
    hxs = HtmlXPathSelector(response)
    index_level = self.determine_level(response)
    log.msg("Parse: index level:" + str(index_level))
    if index_level in [1, 2, 3, 4]:
        self.save_to_file_system(index_level, response)
        relative_urls = self.get_follow_links(index_level, hxs)
        if relative_urls is not None:
            for url in relative_urls:
                log.msg('yield process, url:' + url)
                yield Request(url, callback=self.parse)
    elif index_level == 5:
        personProfile = HtmlParser.extract_person_profile(hxs)
        linkedin_id = self.get_linkedin_id(response.url)
        linkedin_id = UnicodeDammit(urllib.unquote_plus(linkedin_id)).markup
        if linkedin_id:
            personProfile['_id'] = linkedin_id
            personProfile['url'] = UnicodeDammit(response.url).markup
            yield personProfile
def _translate_single_text(self, text, target_language, source_lauguage):
    assert _is_bytes(text)

    def split_text(text):
        start = 0
        text = quote_plus(text)
        length = len(text)
        while (length - start) > self._MAX_LENGTH_PER_QUERY:
            for seperator in self._SEPERATORS:
                index = text.rfind(seperator, start,
                                   start + self._MAX_LENGTH_PER_QUERY)
                if index != -1:
                    break
            else:
                raise Error('input too large')
            end = index + len(seperator)
            yield unquote_plus(text[start:end])
            start = end

        yield unquote_plus(text[start:])

    def make_task(text):
        return lambda: self._basic_translate(text, target_language,
                                             source_lauguage)[0]

    results = list(self._execute(make_task(i) for i in split_text(text)))
    return tuple(''.join(i[n] for i in results)
                 for n in range(len(self._writing)))
def argument_query(query_str):
    if query_str and query_str.startswith('?'):
        warn("You don't need to use a leading '?' when setting the query"
             " string, this may be an error!", stacklevel=3)
    if not query_str:
        query_params = {}
    else:
        try:
            # much faster than parse_qsl()
            query_params = dict((
                map(unquote_plus, (to_utf8(token) + '=').split('=', 2)[:2])
                for token in query_str.split('&')
            ))
            if len(query_params) == 1 and not query_params.values()[0]:
                query_params = {}
            else:
                query = None
        except Exception:
            ##raise # XXX DEBUG
            query_params = {}
    return query_params

#------------------------------------------------------------------------------
def query(self, query):
    if query and query.startswith('?'):
        warn("You don't need to use a leading '?' when setting the query"
             " string, this may be an error!", stacklevel=3)
    if not query:
        query_params = {}
    else:
        try:
            # much faster than parse_qsl()
            query_params = dict((
                map(unquote_plus, (to_utf8(token) + '=').split('=', 2)[:2])
                for token in query.split('&')
            ))
            if len(query_params) == 1 and not query_params.values()[0]:
                query_params = {}
            else:
                query = None
        except Exception:
            ##raise # XXX DEBUG
            query_params = {}
    self.__query, self.__query_params = query, query_params
def netloc(self, netloc):
    if '@' in netloc:
        auth, host = netloc.split('@', 1)
    else:
        auth, host = None, netloc
    port = ''
    if host and host[0] == '[':
        host, port = host[1:].split(']', 1)
        if ':' in port:
            _host, port = port.split(':', 1)
            if not host:
                host = _host
    elif ':' in host:
        host, port = host.split(':', 1)
    if '%' in port:
        port = unquote(port)
    if port:
        port = int(port)
    if host:
        host = unquote_plus(host)
    self.auth = auth  # TODO: roll back changes if it fails
    self.host = host
    self.port = port
def _initialize(self, request):
    self.response.headers.add_header('Access-Control-Allow-Headers',
                                     'Content-Type')
    # We use _initialize instead of webapp2's initialize, so that
    # exceptions can be caught easily
    self.fbl = fb_api.FBLookup(None, None)

    if self.request.body:
        logging.info("Request body: %r", self.request.body)
        escaped_body = urllib.unquote_plus(self.request.body.strip('='))
        self.json_body = json.loads(escaped_body)
        logging.info("json_request: %r", self.json_body)
    else:
        self.json_body = None

    if self.requires_auth or self.supports_auth:
        if self.json_body.get('access_token'):
            access_token = self.json_body.get('access_token')
            self.fb_uid = get_user_id_for_token(access_token)
            self.fbl = fb_api.FBLookup(self.fb_uid, access_token)
            logging.info("Access token for user ID %s", self.fb_uid)
        elif self.requires_auth:
            self.add_error("Needs access_token parameter")
def getParameters(parameterString):
    log("", 5)
    commands = {}
    if getXBMCVersion() >= 12.0:
        parameterString = urllib.unquote_plus(parameterString)
    splitCommands = parameterString[parameterString.find('?') + 1:].split('&')
    for command in splitCommands:
        if (len(command) > 0):
            splitCommand = command.split('=')
            key = splitCommand[0]
            try:
                value = splitCommand[1].encode("utf-8")
            except:
                log("Error utf-8 encoding argument value: " +
                    repr(splitCommand[1]))
                value = splitCommand[1]
            commands[key] = value
    log(repr(commands), 5)
    return commands
def prettify_reddit_query(subreddit_entry):
    # for search queries; make the reddit query string presentable
    if subreddit_entry.startswith('?'):
        #log('************ prettify_reddit_query='+subreddit_entry)
        tbn = subreddit_entry.split('/')[-1]
        tbn = urllib.unquote_plus(tbn)
        tbn = tbn.replace('?q=', '[LIGHT]search:[/LIGHT]')
        tbn = tbn.replace('site:', '')
        tbn = tbn.replace('&sort=', '[LIGHT] sort by:[/LIGHT]')
        tbn = tbn.replace('&t=', '[LIGHT] from:[/LIGHT]')
        tbn = tbn.replace('subreddit:', 'r/')
        tbn = tbn.replace('author:', '[LIGHT] by:[/LIGHT]')
        tbn = tbn.replace('&restrict_sr=on', '')
        tbn = tbn.replace('&restrict_sr=', '')
        tbn = tbn.replace('nsfw:no', '')
        tbn = tbn.replace('nsfw:yes', 'nsfw')
        #log('************ prettify_reddit_query='+tbn)
        return tbn
    else:
        return subreddit_entry