We extracted the following 50 code examples from open-source Python projects to illustrate how to use urllib.parse.unquote_plus().
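Before the project examples, here is a minimal, self-contained sketch of what unquote_plus() does: it percent-decodes a string and additionally turns '+' into a space, which is how HTML form data and query strings encode spaces. The sample query string is illustrative only.

from urllib.parse import unquote, unquote_plus

# '+' becomes a space and %XX escapes are decoded (UTF-8 by default)
print(unquote_plus('q=caf%C3%A9+au+lait'))   # q=café au lait

# plain unquote() decodes %XX but leaves '+' untouched
print(unquote('q=caf%C3%A9+au+lait'))        # q=caf%C3%A9+au+lait -> q=café+au+lait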
def parse_userinfo(userinfo):
    """Validates the format of user information in a MongoDB URI.
    Reserved characters like ':', '/', '+' and '@' must be
    escaped following RFC 2396.

    Returns a 2-tuple containing the unescaped username followed
    by the unescaped password.

    :Parameters:
        - `userinfo`: A string of the form <username>:<password>

    .. versionchanged:: 2.2
       Now uses `urllib.unquote_plus` so `+` characters must be escaped.
    """
    if '@' in userinfo or userinfo.count(':') > 1:
        raise InvalidURI("':' or '@' characters in a username or password "
                         "must be escaped according to RFC 2396.")
    user, _, passwd = _partition(userinfo, ":")
    # No password is expected with GSSAPI authentication.
    if not user:
        raise InvalidURI("The empty string is not valid username.")
    user = unquote_plus(user)
    passwd = unquote_plus(passwd)
    return user, passwd
def _parse_options(opts, delim):
    """Helper method for split_options which creates the options dict.
    Also handles the creation of a list for the URI tag_sets/
    readpreferencetags portion."""
    options = {}
    for opt in opts.split(delim):
        key, val = opt.split("=")
        if key.lower() == 'readpreferencetags':
            options.setdefault('readpreferencetags', []).append(val)
        else:
            # str(option) to ensure that a unicode URI results in plain 'str'
            # option names. 'normalized' is then suitable to be passed as
            # kwargs in all Python versions.
            if str(key) in options:
                warnings.warn("Duplicate URI option %s" % (str(key),))
            options[str(key)] = unquote_plus(val)

    # Special case for deprecated options
    if "wtimeout" in options:
        if "wtimeoutMS" in options:
            options.pop("wtimeout")
        warnings.warn("Option wtimeout is deprecated, use 'wtimeoutMS'"
                      " instead")

    return options
def download(dest_path, url):
    try:
        file_name = url.split('/')[-1]
        path = os.path.realpath(os.path.join(dest_path, unquote_plus(file_name)))
        if not os.path.exists(path):
            f = urlopen(url)
            headers = f.headers['content-type'].split('/')
            md = 'w'
            if 'html' in headers:
                file_name = '{}.html'.format(uuid.uuid1())
            else:
                md = 'wb'
            with open(path, md) as local_file:
                local_file.write(f.read())
        if os.path.exists(path):
            return path
        else:
            logger.info("Wasn't able to find the file....!")
            return None
    except Exception as error:
        logger.error('download error %s', error)
def test_pack_metadata_statement():
    jb = FSJWKSBundle('', None, 'fo_jwks',
                      key_conv={'to': quote_plus, 'from': unquote_plus})
    _keyjar = build_keyjar(KEYDEFS)[1]
    op = Operator(keyjar=_keyjar, jwks_bundle=jb, iss='https://example.com/')
    req = MetadataStatement(issuer='https://example.org/op')
    sms = op.pack_metadata_statement(req)
    assert sms  # Should be a signed JWT
    _jwt = factory(sms)
    assert _jwt
    assert _jwt.jwt.headers['alg'] == 'RS256'
    _body = json.loads(as_unicode(_jwt.jwt.part[1]))
    assert _body['iss'] == op.iss
    assert _body['issuer'] == 'https://example.org/op'

    # verify signature
    r = _jwt.verify_compact(sms, _keyjar.get_signing_key())
    assert r
def test_unpack_metadata_statement_uri():
    s = signer[OA['sunet']]
    req = MetadataStatement(issuer='https://example.org/op')
    # Not intermediate
    ms = s.create_signed_metadata_statement(req, 'discovery', single=True)

    jb = FSJWKSBundle('', None, 'fo_jwks',
                      key_conv={'to': quote_plus, 'from': unquote_plus})

    mds = MetaDataStore('msd')
    op = Operator(jwks_bundle=jb)
    op.httpcli = MockHTTPClient(mds)
    res = op.unpack_metadata_statement(jwt_ms=ms)
    assert len(res.parsed_statement) == 3
    loel = op.evaluate_metadata_statement(res.result)
    assert len(loel) == 3
    assert set([l.fo for l in loel]) == {'https://swamid.sunet.se',
                                         'https://edugain.com',
                                         'https://www.feide.no'}
def test_unquoting(self):
    # Make sure unquoting of all ASCII values works
    escape_list = []
    for num in range(128):
        given = hexescape(chr(num))
        expect = chr(num)
        result = urllib_parse.unquote(given)
        self.assertEqual(expect, result,
                         "using unquote(): %r != %r" % (expect, result))
        result = urllib_parse.unquote_plus(given)
        self.assertEqual(expect, result,
                         "using unquote_plus(): %r != %r" % (expect, result))
        escape_list.append(given)
    escape_string = ''.join(escape_list)
    del escape_list
    result = urllib_parse.unquote(escape_string)
    self.assertEqual(result.count('%'), 1,
                     "using unquote(): not all characters escaped: "
                     "%s" % result)
    self.assertRaises((TypeError, AttributeError), urllib_parse.unquote, None)
    self.assertRaises((TypeError, AttributeError), urllib_parse.unquote, ())
    with support.check_warnings(('', BytesWarning), quiet=True):
        self.assertRaises((TypeError, AttributeError), urllib_parse.unquote,
                          bytes(b''))
def get_analysis_result(request):
    """
    Retrieves the analysis report for a given commit.

    Do provide the `repo`, `provider` and `sha` GET parameters.
    """
    repo = unquote_plus(request.GET.get('repo', ''))
    provider = request.GET.get('provider')
    sha = request.GET.get('sha')

    if not all((repo, sha, provider)):
        return Response(
            data={
                'detail': '`repo`, `provider` or `sha` GET parameter missing'
            },
            status=HTTP_400_BAD_REQUEST)

    db_repo = get_object_or_404(
        Repository,
        Q(full_name=repo) | Q(identifier=repo if repo.isdigit() else 0),
        provider=provider)
    result = get_object_or_404(AnalysisResults, repo=db_repo, sha=sha)
    return Response(data=ResultsSerializer(result).data)
def list(self, details=False):
    """ Listing cache files """
    clist = os.listdir(self.cachedir)
    if details:
        nlist = []
        for file in clist:
            filepath = os.path.join(self.cachedir, file)
            nlist.append('%s (%s bytes)' % (unquote_plus(file),
                                            os.path.getsize(filepath)))
        return nlist
    else:
        return clist
def _translate_single_text(self, text, target_language, source_lauguage):
    assert _is_bytes(text)

    def split_text(text):
        start = 0
        text = quote_plus(text)
        length = len(text)
        while (length - start) > self._MAX_LENGTH_PER_QUERY:
            for seperator in self._SEPERATORS:
                index = text.rfind(seperator, start,
                                   start + self._MAX_LENGTH_PER_QUERY)
                if index != -1:
                    break
            else:
                raise Error('input too large')
            end = index + len(seperator)
            yield unquote_plus(text[start:end])
            start = end
        yield unquote_plus(text[start:])

    def make_task(text):
        return lambda: self._basic_translate(text, target_language,
                                             source_lauguage)[0]

    results = list(self._execute(make_task(i) for i in split_text(text)))
    return tuple(''.join(i[n] for i in results)
                 for n in range(len(self._writing)))
def session_spider(request, spider):
    """ Scraped sessions from althingi """
    payload = {'spider_name': spider,
               'url': unquote_plus(utils.spider_url[spider])}
    r = requests.get('http://localhost:9080/crawl.json?', params=payload)
    spider_response = r.json()['items']
    for item in spider_response:
        date_from = datetime.datetime.strptime(item['date_from'], '%d.%m.%Y').date()
        if item['date_to']:
            date_to = datetime.datetime.strptime(item['date_to'], '%d.%m.%Y').date()
        else:
            date_to = ''
        Session.objects.update_or_create(
            session_id=item['session_id'],
            defaults={'date_to': date_from, 'date_from': date_from}
        )
    return Response(r.json()['items'])
def committee_spider(request, spider, session):
    """ Scraped committees from althingi """
    payload = {'spider_name': spider,
               'url': unquote_plus(utils.spider_url[spider] % session)}
    r = requests.get('http://localhost:9080/crawl.json?', params=payload)
    spider_response = r.json()['items']
    session_obj = Session.objects.get(session_id=int(session))
    for item in spider_response:
        committee_obj, created = Committee.objects.update_or_create(
            committee_id=item['committee_id'],
            defaults={
                'name': item['name'],
                'short_abbr': item['short_abbr'],
                'long_abbr': item['long_abbr'],
            }
        )
        committee_obj.session.add(session_obj)
    return Response(r.json()['items'])
def committee_meeting_spider(request, spider, session):
    """ Scraped committee meetings from althingi """
    payload = {'spider_name': spider,
               'url': unquote_plus(utils.spider_url[spider] % session)}
    r = requests.get('http://localhost:9080/crawl.json?', params=payload)
    spider_response = r.json()['items']
    session_obj = Session.objects.get(session_id=session)
    for item in spider_response:
        committee_obj = Committee.objects.get(committee_id=item['committee'])
        if (item['start_time'] and item['end_time']):
            CommitteeMeeting.objects.update_or_create(
                meeting_id=item['meeting_id'],
                defaults={
                    'session': session_obj,
                    'committee': committee_obj,
                    'date_from': datetime.datetime.strptime(
                        item['start_time'], '%Y-%m-%dT%H:%M:%S'),
                    'date_to': datetime.datetime.strptime(
                        item['end_time'], '%Y-%m-%dT%H:%M:%S'),
                }
            )
    return Response(r.json()['items'])
def issue_spider(request, spider, session):
    """ Scraped issues from althingi """
    payload = {'spider_name': spider,
               'url': unquote_plus(utils.spider_url[spider] % session)}
    r = requests.get('http://localhost:9080/crawl.json?', params=payload)
    spider_response = r.json()['items']
    session_obj = Session.objects.get(session_id=session)
    for item in spider_response:
        issue_obj, created = Issue.objects.update_or_create(
            issue_id=item['issue_id'],
            session=session_obj,
            defaults={'name': item['name'],
                      'url': item['url'],
                      'issue_id': item['issue_id']})
    return Response(r.json()['items'])
def url_unescape(value, encoding='utf-8', plus=True):
    """Decodes the given value from a URL.

    The argument may be either a byte or unicode string.

    If encoding is None, the result will be a byte string.  Otherwise,
    the result is a unicode string in the specified encoding.

    If ``plus`` is true (the default), plus signs will be interpreted
    as spaces (literal plus signs must be represented as "%2B").  This
    is appropriate for query strings and form-encoded values but not
    for the path component of a URL.  Note that this default is the
    reverse of Python's urllib module.

    .. versionadded:: 3.1
       The ``plus`` argument
    """
    unquote = (urllib_parse.unquote_plus if plus else urllib_parse.unquote)
    if encoding is None:
        return unquote(utf8(value))
    else:
        return unicode_type(unquote(utf8(value)), encoding)
def _decompose_fragment(self, frag):
    """Parse the fragment component"""
    from urllib.parse import unquote_plus

    if isinstance(frag, (list, tuple)):
        return frag

    if not frag:
        return None, None

    frag_parts = unquote_plus(frag).split(';')

    if not frag_parts:
        file, segment = None, None
    elif len(frag_parts) == 1:
        file = frag_parts[0]
        segment = None
    elif len(frag_parts) >= 2:
        file = frag_parts[0]
        segment = frag_parts[1]

    return file, segment
def _clean_link(content):
    def replacement(match):
        groups = match.groups('')
        if len(groups) < 2:
            replacement = groups[0]
        else:
            replacement = groups[1] or groups[0]
        if replacement:
            if replacement == '../':
                replacement = ''
            replacement = replacement.replace('/', ' ')
        return unquote_plus(replacement)

    for pattern in _patterns['link']:
        content = pattern.sub(replacement, content)
    return content
def lambda_handler(event, context):
    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        # key = record['s3']['object']['key']
        key = unquote_plus(record['s3']['object']['key'])
        try:
            download_path = '/tmp/{}{}'.format(uuid.uuid4(), key)
            upload_path = '/tmp/out/{}'.format(key)
            s3_client.download_file(bucket, key, download_path)
            if not os.path.exists(download_path):
                raise Exception('ERROR: Unable to save file {} at {}'.format(
                    key, download_path))
            if download_path.endswith('.zip'):
                print('Processing zipfile')
                process_sketch_archive(zip_path=download_path,
                                       compress_zip=True,
                                       output_path=upload_path)
            elif download_path.endswith('.json'):
                process_sketch_dump(download_path, compress_zip=True,
                                    output_path=upload_path)
            else:
                raise Exception(
                    'ERROR: Expected zip or json file but found {}'.format(
                        key))
            if target_bucket is not None:
                s3_client.upload_file(upload_path, target_bucket, key)
                print('Uploaded {} to {}/{}'.format(upload_path,
                                                    target_bucket, key))
        except Exception as e:
            print(e)
            print('Error processing on object {} from bucket {}. '
                  'Make sure they exist and your bucket is in the '
                  'same region as this function.'.format(key, bucket))
            traceback.print_exc(file=sys.stdout)
            raise e
def name_file(imgUrl):
    fileName = unquote_plus(unquote_plus(unquote_plus(basename(imgUrl))))
    if os.path.exists(fileName):
        base, ext = os.path.splitext(fileName)
        print(fileName)
        print(base + '*' + ext)
        nfiles = len(glob.glob(base + '*' + ext))
        fileName = base + '_' + str(nfiles) + ext
    return fileName
def compat_urllib_parse_unquote_plus(string, encoding='utf-8', errors='replace'):
    """Like unquote(), but also replace plus signs by spaces, as required for
    unquoting HTML form values.

    unquote_plus('%7e/abc+def') -> '~/abc def'
    """
    string = string.replace('+', ' ')
    return compat_urllib_parse_unquote(string, encoding, errors)
def test_encode_result_is_agnostic_to_url_quoting(string_num):
    """Test if querystringsafe_base64.encode returns a string that
    does not have characters that must be quoted."""
    original = test_strings[string_num]

    # quoting and unquoting has no impact on base64dotted:
    safe_encoded = querystringsafe_base64.encode(original)
    assert safe_encoded == quote_plus(safe_encoded)
    assert safe_encoded == unquote_plus(safe_encoded)

    # base64 has to be quoted:
    url_encoded = b64encode(original)
    assert url_encoded != quote_plus(url_encoded)
def unquote_str(value, encoding='utf-8'):
    # In python2, unquote() gives us a string back that has the urldecoded
    # bits, but not the unicode parts.  We need to decode this manually.
    # unquote has special logic in which if it receives a unicode object it
    # will decode it to latin1.  This is hard coded.  To avoid this, we'll
    # encode the string with the passed in encoding before trying to
    # unquote it.
    byte_string = value.encode(encoding)
    return unquote_plus(byte_string).decode(encoding)
def apply_filter(self, filters, filter_value, **kwargs):
    """
    Apply a filter submitted in the querystring to the ToasterTable

    filters: (str) in the format: '<filter name>:<action name>'
    filter_value: (str) parameters to pass to the named filter

    <filter name> and <action name> are used to look up the correct filter
    in the ToasterTable's filter map; the <action params> are set on
    TableFilterAction* before its filter is applied and may modify the
    queryset returned by the filter
    """
    self.setup_filters(**kwargs)

    try:
        filter_name, action_name = filters.split(':')
        action_params = unquote_plus(filter_value)
    except ValueError:
        return

    if "all" in action_name:
        return

    try:
        table_filter = self.filter_map.get_filter(filter_name)
        action = table_filter.get_action(action_name)
        action.set_filter_params(action_params)
        self.queryset = action.filter(self.queryset)
    except KeyError:
        # pass it to the user - programming error here
        raise
def narrow_url_to_stream_topic(url):
    parsed_url = urlparse(url)
    unquoted_fragment = unquote_plus(parsed_url.fragment.replace('.', '%'))
    split_fragment = re.split('/', unquoted_fragment)
    if len(split_fragment) < 5:
        return None, None
    stream = split_fragment[2]
    topic = split_fragment[4]
    return stream, topic
def get_creds(request):
    flow = jsonpickle.decode(request.session['flow'])
    credential = flow.step2_exchange(request.GET.get('code', False))
    storage = DjangoORMStorage(GoogleCredentials, 'id', request.user, 'credential')
    storage.put(credential)
    if request.GET.get('state', False):
        return redirect(unquote_plus(request.GET.get('state')))
    else:
        return HttpResponse(status=200)
def make_fs_jwks_bundle(iss, fo_liss, sign_keyjar, keydefs, base_path=''):
    """
    Given a list of Federation identifiers creates a FSJWKBundle containing
    all the signing keys.

    :param iss: The issuer ID of the entity owning the JWKSBundle
    :param fo_liss: List with federation identifiers as keys
    :param sign_keyjar: Keys that the JWKSBundel owner can use to sign
        an export version of the JWKS bundle.
    :param keydefs: What type of keys that should be created for each
        federation. The same for all of them.
    :param base_path: Where the pem versions of the keys are stored as files
    :return: A FSJWKSBundle instance.
    """
    jb = FSJWKSBundle(iss, sign_keyjar, 'fo_jwks',
                      key_conv={'to': quote_plus, 'from': unquote_plus})
    jb.clear()  # start from scratch

    # Need to save the private parts on disc
    jb.bundle.value_conv['to'] = keyjar_to_jwks_private

    for entity in fo_liss:
        _name = entity.replace('/', '_')
        try:
            _ = jb[entity]
        except KeyError:
            fname = os.path.join(base_path, 'keys', "{}.key".format(_name))
            _keydef = copy.deepcopy(keydefs)
            _keydef[0]['key'] = fname

            _jwks, _keyjar, _kidd = build_keyjar(_keydef)
            jb[entity] = _keyjar

    return jb
def create_federation_entity(iss, jwks_dir, sup='', fo_jwks=None, ms_dir='',
                             entity=None, sig_keys=None, sig_def_keys=None):
    fname = os.path.join(ms_dir, quote_plus(sup))

    if fo_jwks:
        _keybundle = FSJWKSBundle('', fdir=fo_jwks,
                                  key_conv={'to': quote_plus,
                                            'from': unquote_plus})

        # Organisation information
        _kj = _keybundle[sup]
        signer = Signer(InternalSigningService(sup, _kj), ms_dir=fname)
    else:
        signer = Signer(ms_dir=fname)

    # And then the FOs public keys
    _public_keybundle = FSJWKSBundle('', fdir=jwks_dir,
                                     key_conv={'to': quote_plus,
                                               'from': unquote_plus})

    # The OPs own signing keys
    if sig_keys is None:
        sig_keys = build_keyjar(sig_def_keys)[1]

    return FederationEntity(entity, iss=iss, keyjar=sig_keys, signer=signer,
                            fo_bundle=_public_keybundle)
def test_unquoting_parts(self):
    # Make sure unquoting works when have non-quoted characters
    # interspersed
    given = 'ab%sd' % hexescape('c')
    expect = "abcd"
    result = urllib_parse.unquote(given)
    self.assertEqual(expect, result,
                     "using quote(): %r != %r" % (expect, result))
    result = urllib_parse.unquote_plus(given)
    self.assertEqual(expect, result,
                     "using unquote_plus(): %r != %r" % (expect, result))