Python urllib 模块,unquote_plus() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用urllib.unquote_plus()

项目:plugin.video.exodus    作者:lastship    | 项目源码 | 文件源码
def resolve(self, url):
        """Resolve a pipe-delimited source link to a direct stream URL.

        ``url`` is split into exactly three ``|``-separated parts: the
        request URL, the POST data and a query string that must carry a
        ``Referer`` entry.  When the last two labels of the URL's host are
        not found in the base64-decoded ``self.b_link`` the URL is returned
        unchanged.

        Returns the last ``http...`` value that follows a ``url=`` marker in
        the response, or ``None`` when any step fails.
        """
        try:
            host = urlparse.urlparse(url).netloc
            # Keep only the last two labels of the host, e.g. "example.com".
            host = re.compile(r'([\w]+[.][\w]+)$').findall(host)[0]

            if host not in base64.b64decode(self.b_link):
                return url

            target, post, headers = url.split('|')
            referer = urlparse.parse_qs(headers)['Referer'][0]

            # Ask for the referer's cookie first, then send it with the
            # actual request.
            cookie = self.request(referer, output='cookie', close=False)
            result = self.request(target, post=post, referer=referer,
                                  cookie=cookie)

            candidates = [urllib.unquote_plus(i.strip())
                          for i in result.split('url=')]
            candidates = [i for i in candidates if i.startswith('http')]
            return candidates[-1]
        except Exception:
            # Resolution is best-effort: any failure yields None, but
            # KeyboardInterrupt/SystemExit are no longer swallowed.
            return
项目:mongodb-monitoring    作者:jruaux    | 项目源码 | 文件源码
def parse_userinfo(userinfo):
    """Split and unescape the ``user:password`` part of a MongoDB URI.

    Reserved characters such as ':', '/', '+' and '@' have to be escaped
    (RFC 2396) inside either component.

    :Parameters:
        - `userinfo`: A string of the form <username>:<password>

    :Returns:
        A ``(username, password)`` 2-tuple with both values unescaped.

    :Raises:
        ``InvalidURI`` when `userinfo` contains an '@', more than one ':',
        or an empty username.

    .. versionchanged:: 2.2
       Now uses `urllib.unquote_plus` so `+` characters must be escaped.
    """
    unescaped_msg = ("':' or '@' characters in a username or password "
                     "must be escaped according to RFC 2396.")
    if '@' in userinfo:
        raise InvalidURI(unescaped_msg)
    if userinfo.count(':') > 1:
        raise InvalidURI(unescaped_msg)

    raw_user, _sep, raw_passwd = _partition(userinfo, ":")
    # Only the user name is mandatory; no password is expected with
    # GSSAPI authentication.
    if not raw_user:
        raise InvalidURI("The empty string is not valid username.")

    return unquote_plus(raw_user), unquote_plus(raw_passwd)
项目:mongodb-monitoring    作者:jruaux    | 项目源码 | 文件源码
def _parse_options(opts, delim):
    """Helper method for split_options which creates the options dict.
    Also handles the creation of a list for the URI tag_sets/
    readpreferencetags portion."""
    options = {}
    for opt in opts.split(delim):
        key, val = opt.split("=")
        if key.lower() == 'readpreferencetags':
            options.setdefault('readpreferencetags', []).append(val)
        else:
            # str(option) to ensure that a unicode URI results in plain 'str'
            # option names. 'normalized' is then suitable to be passed as
            # kwargs in all Python versions.
            if str(key) in options:
                warnings.warn("Duplicate URI option %s" % (str(key),))
            options[str(key)] = unquote_plus(val)

    # Special case for deprecated options
    if "wtimeout" in options:
        if "wtimeoutMS" in options:
            options.pop("wtimeout")
        warnings.warn("Option wtimeout is deprecated, use 'wtimeoutMS'"
                      " instead")

    return options
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def parse_userinfo(userinfo):
    """Split and unescape the ``user:password`` part of a MongoDB URI.

    Reserved characters such as ':', '/', '+' and '@' have to be escaped
    (RFC 2396) inside either component.

    :Parameters:
        - `userinfo`: A string of the form <username>:<password>

    :Returns:
        A ``(username, password)`` 2-tuple with both values unescaped.

    :Raises:
        ``InvalidURI`` when `userinfo` contains an '@', more than one ':',
        or an empty username.

    .. versionchanged:: 2.2
       Now uses `urllib.unquote_plus` so `+` characters must be escaped.
    """
    unescaped_msg = ("':' or '@' characters in a username or password "
                     "must be escaped according to RFC 2396.")
    if '@' in userinfo:
        raise InvalidURI(unescaped_msg)
    if userinfo.count(':') > 1:
        raise InvalidURI(unescaped_msg)

    raw_user, _sep, raw_passwd = _partition(userinfo, ":")
    # Only the user name is mandatory; no password is expected with
    # GSSAPI authentication.
    if not raw_user:
        raise InvalidURI("The empty string is not valid username.")

    return unquote_plus(raw_user), unquote_plus(raw_passwd)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def parse_userinfo(userinfo):
    """Split and unescape the ``user:password`` part of a MongoDB URI.

    Reserved characters such as ':', '/', '+' and '@' have to be escaped
    (RFC 2396) inside either component.

    :Parameters:
        - `userinfo`: A string of the form <username>:<password>

    :Returns:
        A ``(username, password)`` 2-tuple with both values unescaped.

    :Raises:
        ``InvalidURI`` when `userinfo` contains an '@', more than one ':',
        or an empty username.

    .. versionchanged:: 2.2
       Now uses `urllib.unquote_plus` so `+` characters must be escaped.
    """
    unescaped_msg = ("':' or '@' characters in a username or password "
                     "must be escaped according to RFC 2396.")
    if '@' in userinfo:
        raise InvalidURI(unescaped_msg)
    if userinfo.count(':') > 1:
        raise InvalidURI(unescaped_msg)

    raw_user, _sep, raw_passwd = _partition(userinfo, ":")
    # Only the user name is mandatory; no password is expected with
    # GSSAPI authentication.
    if not raw_user:
        raise InvalidURI("The empty string is not valid username.")

    return unquote_plus(raw_user), unquote_plus(raw_passwd)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def parse_userinfo(userinfo):
    """Split and unescape the ``user:password`` part of a MongoDB URI.

    Reserved characters such as ':', '/', '+' and '@' have to be escaped
    (RFC 2396) inside either component.

    :Parameters:
        - `userinfo`: A string of the form <username>:<password>

    :Returns:
        A ``(username, password)`` 2-tuple with both values unescaped.

    :Raises:
        ``InvalidURI`` when `userinfo` contains an '@', more than one ':',
        or an empty username.

    .. versionchanged:: 2.2
       Now uses `urllib.unquote_plus` so `+` characters must be escaped.
    """
    unescaped_msg = ("':' or '@' characters in a username or password "
                     "must be escaped according to RFC 2396.")
    if '@' in userinfo:
        raise InvalidURI(unescaped_msg)
    if userinfo.count(':') > 1:
        raise InvalidURI(unescaped_msg)

    raw_user, _sep, raw_passwd = _partition(userinfo, ":")
    # Only the user name is mandatory; no password is expected with
    # GSSAPI authentication.
    if not raw_user:
        raise InvalidURI("The empty string is not valid username.")

    return unquote_plus(raw_user), unquote_plus(raw_passwd)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def parse_userinfo(userinfo):
    """Split and unescape the ``user:password`` part of a MongoDB URI.

    Reserved characters such as ':', '/', '+' and '@' have to be escaped
    (RFC 2396) inside either component.

    :Parameters:
        - `userinfo`: A string of the form <username>:<password>

    :Returns:
        A ``(username, password)`` 2-tuple with both values unescaped.

    :Raises:
        ``InvalidURI`` when `userinfo` contains an '@', more than one ':',
        or an empty username.

    .. versionchanged:: 2.2
       Now uses `urllib.unquote_plus` so `+` characters must be escaped.
    """
    unescaped_msg = ("':' or '@' characters in a username or password "
                     "must be escaped according to RFC 2396.")
    if '@' in userinfo:
        raise InvalidURI(unescaped_msg)
    if userinfo.count(':') > 1:
        raise InvalidURI(unescaped_msg)

    raw_user, _sep, raw_passwd = _partition(userinfo, ":")
    # Only the user name is mandatory; no password is expected with
    # GSSAPI authentication.
    if not raw_user:
        raise InvalidURI("The empty string is not valid username.")

    return unquote_plus(raw_user), unquote_plus(raw_passwd)
项目:health-mosconi    作者:GNUHealth-Mosconi    | 项目源码 | 文件源码
def get_smtp_server():
    """
    Create, configure and return an SMTP or SMTP_SSL instance from smtplib.

    The connection details are read from the ``uri`` entry of the ``email``
    configuration section: a scheme starting with ``smtps`` selects
    SMTP_SSL, a scheme containing ``tls`` triggers STARTTLS, and a login is
    performed only when both a user name and a password are present (both
    are URL-unquoted first).

    :return: An SMTP instance. Its quit() method must be called once all
    the calls to sendmail() have been made.
    """
    uri = parse_uri(config.get('email', 'uri'))

    server_class = (smtplib.SMTP_SSL if uri.scheme.startswith('smtps')
                    else smtplib.SMTP)
    smtp_server = server_class(uri.hostname, uri.port)

    if 'tls' in uri.scheme:
        smtp_server.starttls()

    if not (uri.username and uri.password):
        return smtp_server

    smtp_server.login(
        urllib.unquote_plus(uri.username),
        urllib.unquote_plus(uri.password))
    return smtp_server
项目:health-mosconi    作者:GNUHealth-Mosconi    | 项目源码 | 文件源码
def cursor(self, autocommit=False, readonly=False):
        """Open a MySQL connection and return a Cursor wrapping it.

        Connection settings come from the ``uri`` entry of the ``database``
        configuration section, whose scheme must be ``mysql``.  Host, port,
        user and password are passed on only when present in the URI; the
        password is URL-unquoted first.  The session time zone is set to
        "+00:00" before the cursor is returned.

        ``autocommit`` and ``readonly`` are accepted but not used here.
        """
        conversions = MySQLdb.converters.conversions.copy()
        # Floats are sent using their repr().
        conversions[float] = lambda value, _: repr(value)
        conversions[MySQLdb.constants.FIELD_TYPE.TIME] = \
            MySQLdb.times.Time_or_None
        connect_kwargs = dict(
            db=self.database_name,
            sql_mode='traditional,postgresql',
            use_unicode=True,
            charset='utf8',
            conv=conversions,
        )

        uri = parse_uri(config.get('database', 'uri'))
        assert uri.scheme == 'mysql'
        optional = (
            ('host', uri.hostname),
            ('port', uri.port),
            ('user', uri.username),
        )
        for option, value in optional:
            if value:
                connect_kwargs[option] = value
        if uri.password:
            connect_kwargs['passwd'] = urllib.unquote_plus(uri.password)

        connection = MySQLdb.connect(**connect_kwargs)
        new_cursor = Cursor(connection, self.database_name)
        new_cursor.execute('SET time_zone = "+00:00"')
        return new_cursor
项目:health-mosconi    作者:GNUHealth-Mosconi    | 项目源码 | 文件源码
def connect(self):
        """Create the connection pool on first use and return ``self``.

        Does nothing when a pool already exists.  Otherwise a DSN is built
        from the ``uri`` entry of the ``database`` configuration section
        (scheme must be ``postgresql``) plus this database's name, and a
        ThreadedConnectionPool sized by the ``minconn``/``maxconn`` settings
        is stored on the instance.
        """
        if self._connpool is not None:
            return self

        logger.info('connect to "%s"', self.database_name)
        uri = parse_uri(config.get('database', 'uri'))
        assert uri.scheme == 'postgresql'

        # Each DSN component is left empty when the URI does not provide it.
        host = "host=%s" % uri.hostname if uri.hostname else ''
        port = "port=%s" % uri.port if uri.port else ''
        name = "dbname=%s" % self.database_name
        user = "user=%s" % uri.username if uri.username else ''
        if uri.password:
            password = "password=%s" % urllib.unquote_plus(uri.password)
        else:
            password = ''

        minconn = config.getint('database', 'minconn', default=1)
        maxconn = config.getint('database', 'maxconn', default=64)
        dsn = ' '.join([host, port, name, user, password])
        self._connpool = ThreadedConnectionPool(minconn, maxconn, dsn)
        return self
项目:GAMADV-XTD    作者:taers232c    | 项目源码 | 文件源码
def _split_token_parts(blob):
  """Extracts and unescapes fields from the provided binary string.

  Reverses the packing performed by _join_token_parts. Used to extract
  the members of a token object.

  Note: An empty string from the blob will be interpreted as None.

  Args:
    blob: str A string of the form 1x|member1|member2|member3 as created
        by _join_token_parts

  Returns:
    A list of unescaped strings.
  """
  return [urllib.unquote_plus(part) or None for part in blob.split('|')]
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def filter(self, handler):
        """Answer a request with a local file or a directory index.

        The path component of ``handler.path`` is URL-unquoted, decoded as
        UTF-8 and looked up on the local file system (an empty path means
        the current directory).  A directory without ``self.index_file``
        gets an HTML listing built by ``self.format_index_html``; a
        directory with one serves that file.

        Returns ``('mock', response_dict)`` when something was served, and
        ``None`` (implicitly) when the path does not start with '/' or does
        not name an existing file or directory.
        """
        path = urlparse.urlsplit(handler.path).path
        if path.startswith('/'):
            # NOTE(review): the decoded path is used as-is, so '..' segments
            # are not rejected here - confirm the handler is only reachable
            # by trusted clients.
            path = urllib.unquote_plus(path.lstrip('/') or '.').decode('utf8')
            if os.path.isdir(path):
                index_file = os.path.join(path, self.index_file)
                if not os.path.isfile(index_file):
                    content = self.format_index_html(path).encode('UTF-8')
                    headers = {'Content-Type': 'text/html; charset=utf-8', 'Connection': 'close'}
                    return 'mock', {'status': 200, 'headers': headers, 'body': content}
                else:
                    path = index_file
            if os.path.isfile(path):
                content_type = 'application/octet-stream'
                try:
                    import mimetypes
                    extension = os.path.splitext(path)[1]
                    # Keep the generic default for unknown extensions; a
                    # plain .get(extension) would replace it with None.
                    content_type = mimetypes.types_map.get(extension, content_type)
                    if extension.endswith(('crt', 'pem')):
                        content_type = 'application/x-x509-ca-cert'
                except StandardError as e:
                    logging.error('import mimetypes failed: %r', e)
                with open(path, 'rb') as fp:
                    content = fp.read()
                    headers = {'Connection': 'close', 'Content-Type': content_type}
                    return 'mock', {'status': 200, 'headers': headers, 'body': content}
项目:tvalacarta    作者:tvalacarta    | 项目源码 | 文件源码
def fromurl(self, url):
        '''
        Populate this item from a string and return the item.

        The string can be one created by tourl() (decoded here as
        URL-quoted, base64-encoded JSON) or use the old format:
        plugin://plugin.video.pelisalacarta/?channel=... (+ other parameters)
        Usage: item.fromurl("string")
        '''
        if "?" in url: url = url.split("?")[1]
        try:
            STRItem = base64.b64decode(urllib.unquote(url))
            JSONItem = json.loads(STRItem, object_hook=self.toutf8)
            self.__dict__.update(JSONItem)
        except Exception:
            # Not a base64/JSON payload: fall back to the old key=value
            # format.
            url = urllib.unquote_plus(url)
            # Split on the first '=' only, so a value that itself contains
            # '=' (e.g. a URL with a query string) is kept whole.
            # NOTE(review): the string is unquoted before splitting, so an
            # encoded '&' inside a value still splits it - confirm whether
            # old-format callers can produce that.
            dct = dict(param.split("=", 1) for param in url.split("&") if "=" in param)
            self.__dict__.update(dct)
            self.__dict__ = self.toutf8(self.__dict__)
        return self
项目:auth-srv    作者:openpermissions    | 项目源码 | 文件源码
def prepare(self):
        """Authenticate the client from the HTTP Basic ``Authorization`` header.

        OPTIONS requests are let through untouched.  For any other method
        the header must start with ``Basic ``; its payload is base64-decoded,
        URL-unquoted and split at the first ':' into client id and secret,
        which are handed to ``Service.authenticate``.  On success the client
        id, the service and the ``grant_type`` body argument (or None) are
        stored on the request.

        Raises ``exceptions.HTTPError(401, 'Unauthenticated')`` when the
        header is missing or not Basic, or when authentication yields
        nothing.
        """
        request = self.request
        if request.method == 'OPTIONS':
            return

        auth_header = request.headers.get('Authorization')
        if not (auth_header and auth_header.startswith('Basic ')):
            raise exceptions.HTTPError(401, 'Unauthenticated')

        # Strip the 6-character "Basic " prefix before decoding.
        credentials = unquote_plus(base64.decodestring(auth_header[6:]))
        client_id, client_secret = credentials.split(':', 1)

        service = yield Service.authenticate(client_id, client_secret)
        if not service:
            raise exceptions.HTTPError(401, 'Unauthenticated')

        request.client_id = client_id
        request.client = service
        request.grant_type = request.body_arguments.get('grant_type', [None])[0]
项目:pyucwa    作者:tonybaloney    | 项目源码 | 文件源码
def process_message_event(message, resource, token, config):
    """Hand an incoming chat message to the registered callback.

    Only events whose embedded message has direction ``Incoming`` are
    processed: the text is read from the ``plainMessage`` data URI,
    URL-unquoted, logged, and passed to ``MESSAGE_CALLBACK`` (when set)
    together with the ``messaging`` link and ``resource``.  A missing key
    anywhere in the event is logged at debug level and otherwise ignored.

    ``token`` and ``config`` are accepted but not used here.
    """
    logging.debug('Processing message event')
    try:
        event = message['_embedded']['message']
        if str(event['direction']) != 'Incoming':
            return

        links = event['_links']
        message_uri = links['plainMessage']['href']
        logging.debug("Received raw message - %s" % message_uri)

        inbound_message = urllib.unquote_plus(DataURI(message_uri).data)
        logging.info("Received message - %s" % inbound_message)

        thread_uri = links['messaging']['href']

        if MESSAGE_CALLBACK is not None:
            MESSAGE_CALLBACK(inbound_message, thread_uri, resource)
    except KeyError:
        logging.debug('not an inbound message')
项目:plugin.video.exodus    作者:huberyhe    | 项目源码 | 文件源码
def resolve(self, url):
        """Resolve a pipe-delimited source link to a direct stream URL.

        ``url`` is split into exactly three ``|``-separated parts: the
        request URL, the POST data and a query string that must carry a
        ``Referer`` entry.  When the last two labels of the URL's host are
        not found in the base64-decoded ``self.b_link`` the URL is returned
        unchanged.

        Returns the last ``http...`` value that follows a ``url=`` marker in
        the response, or ``None`` when any step fails.
        """
        try:
            host = urlparse.urlparse(url).netloc
            # Keep only the last two labels of the host, e.g. "example.com".
            host = re.compile(r'([\w]+[.][\w]+)$').findall(host)[0]

            if host not in base64.b64decode(self.b_link):
                return url

            target, post, headers = url.split('|')
            referer = urlparse.parse_qs(headers)['Referer'][0]

            # Ask for the referer's cookie first, then send it with the
            # actual request.
            cookie = self.request(referer, output='cookie', close=False)
            result = self.request(target, post=post, referer=referer,
                                  cookie=cookie)

            candidates = [urllib.unquote_plus(i.strip())
                          for i in result.split('url=')]
            candidates = [i for i in candidates if i.startswith('http')]
            return candidates[-1]
        except Exception:
            # Resolution is best-effort: any failure yields None, but
            # KeyboardInterrupt/SystemExit are no longer swallowed.
            return
项目:aws-lambda-unzip    作者:mehmetboraezer    | 项目源码 | 文件源码
def lambda_handler(event, context):
    """Unpack a zip archive uploaded to S3 next to where it was uploaded.

    The bucket and object key are taken from the first record of the S3
    event (the key is URL-unquoted).  The archive is downloaded to
    ``/tmp/target.zip``, every member is written under ``/tmp`` and
    uploaded to the archive's own key prefix, and the archive object is
    deleted afterwards.

    Returns a short status string containing the processed key.  Any
    exception is printed and re-raised.
    """
    s3_record = event['Records'][0]['s3']
    bucket = s3_record['bucket']['name']
    url = s3_record['object']['key'].encode('utf8')
    key = urllib.unquote_plus(url)
    s3_path = os.path.dirname(key)
    try:
        s3.download_file(bucket, key, '/tmp/target.zip')
        # Context managers make sure the archive and every extracted file
        # are closed even when a read, write or upload fails.
        with zipfile.ZipFile('/tmp/target.zip') as zfile:
            for filename in zfile.namelist():
                data = zfile.read(filename)
                # NOTE(review): member names are used verbatim, so entries
                # inside sub-directories need those directories to exist
                # under /tmp - confirm archives are expected to be flat.
                localpath = '/tmp/{}'.format(str(filename))
                with open(localpath, 'wb') as f:
                    f.write(data)
                s3.upload_file(localpath, bucket, os.path.join(s3_path, filename))
        s3.delete_object(Bucket=bucket, Key=key)
        return "AWS Key -> {}".format(key)
    except Exception as e:
        print(e)
        # Bare raise keeps the original traceback.
        raise
项目:exodus-favourite-library    作者:arcane47    | 项目源码 | 文件源码
def resolve(self, url):
        """Resolve a pipe-delimited source link to a direct stream URL.

        ``url`` is split into exactly three ``|``-separated parts: the
        request URL, the POST data and a query string that must carry a
        ``Referer`` entry.  When the last two labels of the URL's host are
        not found in the base64-decoded ``self.b_link`` the URL is returned
        unchanged.

        Returns the URL-unquoted text following the first ``url=`` match in
        the response, or ``None`` when any step fails.
        """
        try:
            host = urlparse.urlparse(url).netloc
            # Keep only the last two labels of the host, e.g. "example.com".
            host = re.compile(r'([\w]+[.][\w]+)$').findall(host)[0]

            if host not in base64.b64decode(self.b_link):
                return url

            target, post, headers = url.split('|')
            referer = urlparse.parse_qs(headers)['Referer'][0]

            # Ask for the referer's cookie first, then send it with the
            # actual request.
            cookie = self.request(referer, output='cookie', close=False)
            result = self.request(target, post=post, referer=referer,
                                  cookie=cookie)

            quoted = re.compile('url=(.*)').findall(result)[0]
            return urllib.unquote_plus(quoted)
        except Exception:
            # Resolution is best-effort: any failure yields None, but
            # KeyboardInterrupt/SystemExit are no longer swallowed.
            return
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_unquoting(self):
        """Check unquote() and unquote_plus() against every ASCII escape.

        Each of the 128 ASCII characters is escaped with hexescape() and
        must decode back to itself with both functions.  The concatenation
        of all escapes is then decoded with both functions and must contain
        exactly one '%'.
        """
        # Make sure unquoting of all ASCII values works
        escape_list = []
        for num in range(128):
            given = hexescape(chr(num))
            expect = chr(num)
            result = urllib.unquote(given)
            self.assertEqual(expect, result,
                             "using unquote(): %s != %s" % (expect, result))
            result = urllib.unquote_plus(given)
            self.assertEqual(expect, result,
                             "using unquote_plus(): %s != %s" %
                             (expect, result))
            escape_list.append(given)
        escape_string = ''.join(escape_list)
        del escape_list
        # Presumably the single remaining '%' is the decoded escape of '%'
        # itself; any additional one means an escape was left undecoded.
        result = urllib.unquote(escape_string)
        self.assertEqual(result.count('%'), 1,
                         "using unquote(): not all characters escaped: "
                         "%s" % result)
        # The joined string was previously checked twice with unquote();
        # the second check now covers unquote_plus().
        result = urllib.unquote_plus(escape_string)
        self.assertEqual(result.count('%'), 1,
                         "using unquote_plus(): not all characters escaped: "
                         "%s" % result)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_unquoting(self):
        """Check unquote() and unquote_plus() against every ASCII escape.

        Each of the 128 ASCII characters is escaped with hexescape() and
        must decode back to itself with both functions.  The concatenation
        of all escapes is then decoded with both functions and must contain
        exactly one '%'.
        """
        # Make sure unquoting of all ASCII values works
        escape_list = []
        for num in range(128):
            given = hexescape(chr(num))
            expect = chr(num)
            result = urllib.unquote(given)
            self.assertEqual(expect, result,
                             "using unquote(): %s != %s" % (expect, result))
            result = urllib.unquote_plus(given)
            self.assertEqual(expect, result,
                             "using unquote_plus(): %s != %s" %
                             (expect, result))
            escape_list.append(given)
        escape_string = ''.join(escape_list)
        del escape_list
        # Presumably the single remaining '%' is the decoded escape of '%'
        # itself; any additional one means an escape was left undecoded.
        result = urllib.unquote(escape_string)
        self.assertEqual(result.count('%'), 1,
                         "using unquote(): not all characters escaped: "
                         "%s" % result)
        # The joined string was previously checked twice with unquote();
        # the second check now covers unquote_plus().
        result = urllib.unquote_plus(escape_string)
        self.assertEqual(result.count('%'), 1,
                         "using unquote_plus(): not all characters escaped: "
                         "%s" % result)
项目:download-manager    作者:thispc    | 项目源码 | 文件源码
def process(self, pyfile):
        """Set the file name and download link for a flyfiles.net URL.

        The name (URL-unquoted) and the session id are both extracted from
        ``pyfile.url`` with the plugin's NAME_PATTERN and SESSION_PATTERN.
        The session id is posted to the site; an answer of ``#downlink|``
        or ``#downlink|#`` is treated as "no link yet", which logs a
        warning, waits ten minutes and calls retry().  The link is the
        answer with the ``#downlink|`` marker removed.
        """
        name_match = re.search(self.NAME_PATTERN, pyfile.url)
        pyfile.name = urllib.unquote_plus(name_match.group(1))

        session_match = re.search(self.SESSION_PATTERN, pyfile.url)
        session = session_match.group(1)

        site_url = "http://flyfiles.net"

        #: Get download URL
        answer = self.load(site_url, post={'getDownLink': session})
        self.log_debug("Parsed URL: %s" % answer)

        if answer in ("#downlink|", "#downlink|#"):
            self.log_warning(
                _("Could not get the download URL. Please wait 10 minutes"))
            self.wait(10 * 60, True)
            self.retry()

        self.link = answer.replace('#downlink|', '')
项目:covar_me_app    作者:CovarMe    | 项目源码 | 文件源码
def parse_userinfo(userinfo):
    """Split and unescape the ``user:password`` part of a MongoDB URI.

    Reserved characters such as ':', '/', '+' and '@' have to be escaped
    (RFC 2396) inside either component.

    :Parameters:
        - `userinfo`: A string of the form <username>:<password>

    :Returns:
        A ``(username, password)`` 2-tuple with both values unescaped.

    :Raises:
        ``InvalidURI`` when `userinfo` contains an '@', more than one ':',
        or an empty username.

    .. versionchanged:: 2.2
       Now uses `urllib.unquote_plus` so `+` characters must be escaped.
    """
    unescaped_msg = ("':' or '@' characters in a username or password "
                     "must be escaped according to RFC 2396.")
    if '@' in userinfo:
        raise InvalidURI(unescaped_msg)
    if userinfo.count(':') > 1:
        raise InvalidURI(unescaped_msg)

    raw_user, _sep, raw_passwd = _partition(userinfo, ":")
    # Only the user name is mandatory; no password is expected with
    # GSSAPI authentication.
    if not raw_user:
        raise InvalidURI("The empty string is not valid username.")

    return unquote_plus(raw_user), unquote_plus(raw_passwd)
项目:covar_me_app    作者:CovarMe    | 项目源码 | 文件源码
def _parse_options(opts, delim):
    """Helper method for split_options which creates the options dict.
    Also handles the creation of a list for the URI tag_sets/
    readpreferencetags portion."""
    options = {}
    for opt in opts.split(delim):
        key, val = opt.split("=")
        if key.lower() == 'readpreferencetags':
            options.setdefault('readpreferencetags', []).append(val)
        else:
            # str(option) to ensure that a unicode URI results in plain 'str'
            # option names. 'normalized' is then suitable to be passed as
            # kwargs in all Python versions.
            if str(key) in options:
                warnings.warn("Duplicate URI option %s" % (str(key),))
            options[str(key)] = unquote_plus(val)

    # Special case for deprecated options
    if "wtimeout" in options:
        if "wtimeoutMS" in options:
            options.pop("wtimeout")
        warnings.warn("Option wtimeout is deprecated, use 'wtimeoutMS'"
                      " instead")

    return options
项目:websearch    作者:abelkhan    | 项目源码 | 文件源码
def parse_userinfo(userinfo):
    """Split and unescape the ``user:password`` part of a MongoDB URI.

    Reserved characters such as ':', '/', '+' and '@' have to be escaped
    (RFC 2396) inside either component.

    :Parameters:
        - `userinfo`: A string of the form <username>:<password>

    :Returns:
        A ``(username, password)`` 2-tuple with both values unescaped.

    :Raises:
        ``InvalidURI`` when `userinfo` contains an '@', more than one ':',
        or an empty username.

    .. versionchanged:: 2.2
       Now uses `urllib.unquote_plus` so `+` characters must be escaped.
    """
    unescaped_msg = ("':' or '@' characters in a username or password "
                     "must be escaped according to RFC 2396.")
    if '@' in userinfo:
        raise InvalidURI(unescaped_msg)
    if userinfo.count(':') > 1:
        raise InvalidURI(unescaped_msg)

    raw_user, _sep, raw_passwd = _partition(userinfo, ":")
    # Only the user name is mandatory; no password is expected with
    # GSSAPI authentication.
    if not raw_user:
        raise InvalidURI("The empty string is not valid username.")

    return unquote_plus(raw_user), unquote_plus(raw_passwd)
项目:specto    作者:mrknow    | 项目源码 | 文件源码
def print_var_node(xml_node, stream):
    """Write a one-line description of a variable node to `stream`.

    The ``name``, ``value`` and ``type`` attributes of `xml_node` are
    URL-unquoted and written as ``Name: ..., Value: ..., Type: ...``.  A
    non-empty ``found_as`` attribute adds ``, Found as: ...``.  The line
    always ends with a newline.
    """
    read_attr = xml_node.getAttribute
    name = read_attr('name')
    value = read_attr('value')
    val_type = read_attr('type')
    found_as = read_attr('found_as')

    labelled = (
        ('Name: ', name),
        (', Value: ', value),
        (', Type: ', val_type),
    )
    for label, quoted in labelled:
        stream.write(label)
        stream.write(unquote_plus(quoted))

    if found_as:
        stream.write(', Found as: %s' % (unquote_plus(found_as),))
    stream.write('\n')

#===================================================================================================
# print_referrers
#===================================================================================================
项目:specto    作者:mrknow    | 项目源码 | 文件源码
def GetImports(module_name):
    """Return the formatted completion message for a quoted module name.

    `module_name` is URL-unquoted, passed to
    ``_pydev_imports_tipper.GenerateTip`` and the resulting definition
    file and completions are formatted by a
    ``pycompletionserver.Processor``.  If anything fails, the traceback
    text is reported through ``pycompletionserver.dbg`` and the exception
    is re-raised.
    """
    try:
        processor = pycompletionserver.Processor()
        def_file, completions = _pydev_imports_tipper.GenerateTip(
            urllib.unquote_plus(module_name))
        return processor.formatCompletionMessage(def_file, completions)
    except:
        # Report the full traceback, then let the exception propagate.
        err = traceback.format_exc()
        pycompletionserver.dbg('Received error: ' + str(err), pycompletionserver.ERROR)
        raise


#=======================================================================================================================
# main
#=======================================================================================================================
项目:pelisalacarta-ce    作者:pelisalacarta-ce    | 项目源码 | 文件源码
def get_video_url(page_url, user="", password="", video_password=""):
    """Return the playable streams found on `page_url`, sorted by quality.

    The page is downloaded with itself as referer; the name=value part of
    each cookie it sets is collected into a ``|Cookie=...`` suffix that is
    appended to every stream URL.  Streams are read from the
    ``url_encoded_fmt_stream_map`` section of the twice-unquoted page data.

    Returns a list of ``[quality_label, url]`` pairs in ascending order of
    resolution.  An itag missing from the known table raises ``KeyError``.

    ``user``, ``password`` and ``video_password`` are accepted but not
    used here.
    """
    video_urls = []
    response = httptools.downloadpage(page_url, cookies=False, headers={"Referer": page_url})
    cookie = response.headers["set-cookie"].split("HttpOnly, ")
    # Keep only the part of each cookie before its first ';'.
    cookies = "".join(c.split(";", 1)[0] + "; " for c in cookie)
    data = response.data.decode('unicode-escape')
    data = urllib.unquote_plus(urllib.unquote_plus(data))
    headers_string = "|Cookie=" + cookies
    url_streams = scrapertools.find_single_match(data, 'url_encoded_fmt_stream_map=(.*)')
    streams = scrapertools.find_multiple_matches(url_streams,
                                             r'itag=(\d+)&url=(.*?)(?:;.*?quality=.*?(?:,|&)|&quality=.*?(?:,|&))')
    itags = {'18':'360p', '22':'720p', '34':'360p', '35':'480p', '37':'1080p', '43':'360p', '59':'480p'}
    # Remember the raw URLs: the old check compared a raw URL against the
    # header-suffixed ones, so duplicates were never detected.
    seen_urls = set()
    for itag, video_url in streams:
        if video_url in seen_urls:
            continue
        seen_urls.add(video_url)
        video_urls.append([itags[itag], video_url + headers_string])
    # Sort by the numeric part of the label ("360p" -> 360).
    video_urls.sort(key=lambda entry: int(entry[0].replace("p", "")))
    return video_urls
项目:curso-aws-bigdata-ai    作者:paradigmadigital    | 项目源码 | 文件源码
def lambda_handler(event, context):
    """
    S3-triggered handler that runs face detection on the uploaded object.

    The bucket and URL-unquoted key come from the first record of the
    event.  The result of ``detect_faces`` is turned into JSON and CSV and
    both are handed to ``write_s3``.  The raw detection response is
    returned.  On failure the object, bucket and exception are printed
    and the exception is re-raised.
    """

    # Get the object from the event
    s3_record = event['Records'][0]['s3']
    bucket = s3_record['bucket']['name']
    key = urllib.unquote_plus(s3_record['object']['key'].encode('utf8'))

    try:
        faces = detect_faces(bucket, key)
        faces_json = json.dumps(faces, indent=4)
        faces_csv = transform_json_to_csv(bucket, key, faces)

        write_s3(bucket, key, faces_json, faces_csv)

        return faces

    except Exception as e:
        print("Error processing object {} from bucket {}".format(key, bucket))
        print("Exception: {}. {}".format(e, sys.exc_info()[0]))
        raise
项目:aws_lambda_backup_s3    作者:ogckw    | 项目源码 | 文件源码
def lambda_handler(event, context):
    """Copy the S3 object named in the event into the backup bucket.

    The bucket and URL-unquoted key are read from the first record of the
    event and printed.  The object is fetched and its body is stored in
    the ``lambdabkt-testsave123`` bucket under the same key with ``123``
    appended.  Returns the object's content type.  Errors are printed and
    re-raised.
    """
    # Get the object from the event and show its content type
    s3_record = event['Records'][0]['s3']
    bucket = s3_record['bucket']['name']
    print (bucket)
    key = urllib.unquote_plus(s3_record['object']['key'].encode('utf8'))
    print (key)
    try:
        response = s3.get_object(Bucket=bucket, Key=key)
        print (response)
        content_type = response['ContentType']
        print ("CONTENT TYPE: " + content_type)
        body = response['Body'].read()
        s3.put_object(Body=body, Bucket='lambdabkt-testsave123',Key=key+'123')
        return response['ContentType']
    except Exception as e:
        print(e)
        print('Error getting object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.'.format(key, bucket))
        raise e
项目:plugin.video.streamondemand-pureita    作者:orione7    | 项目源码 | 文件源码
def playstrm(params,url,category):
    '''Play a video referenced from a strm file.

    Title and plot are read from the current list item, the thumbnail
    (URL-unquoted) and server from `params`.  The optional "Serie" and
    "subtitle" entries default to an empty string.  The subtitle name is
    saved for an Item built from title and serie before playback starts.
    '''
    logger.info("[xbmctools.py] playstrm url="+url)

    title = unicode( xbmc.getInfoLabel( "ListItem.Title" ), "utf-8" )
    thumbnail = urllib.unquote_plus( params.get("thumbnail") )
    plot = unicode( xbmc.getInfoLabel( "ListItem.Plot" ), "utf-8" )
    server = params["server"]
    serie = params.get("Serie", "")
    subtitle = params.get("subtitle", "")

    from core.item import Item
    from platformcode.subtitletools import saveSubtitleName
    saveSubtitleName(Item(title=title,show=serie))
    play_video("Biblioteca streamondemand",server,url,category,title,thumbnail,plot,strmfile=True,Serie=serie,subtitle=subtitle)
项目:xxNet    作者:drzorm    | 项目源码 | 文件源码
def filter(self, handler):
        """Answer a request with a local file or a directory index.

        The path component of ``handler.path`` is URL-unquoted, decoded as
        UTF-8 and looked up on the local file system (an empty path means
        the current directory).  A directory without ``self.index_file``
        gets an HTML listing built by ``self.format_index_html``; a
        directory with one serves that file.

        Returns ``('mock', response_dict)`` when something was served, and
        ``None`` (implicitly) when the path does not start with '/' or does
        not name an existing file or directory.
        """
        path = urlparse.urlsplit(handler.path).path
        if path.startswith('/'):
            # NOTE(review): the decoded path is used as-is, so '..' segments
            # are not rejected here - confirm the handler is only reachable
            # by trusted clients.
            path = urllib.unquote_plus(path.lstrip('/') or '.').decode('utf8')
            if os.path.isdir(path):
                index_file = os.path.join(path, self.index_file)
                if not os.path.isfile(index_file):
                    content = self.format_index_html(path).encode('UTF-8')
                    headers = {'Content-Type': 'text/html; charset=utf-8', 'Connection': 'close'}
                    return 'mock', {'status': 200, 'headers': headers, 'body': content}
                else:
                    path = index_file
            if os.path.isfile(path):
                content_type = 'application/octet-stream'
                try:
                    import mimetypes
                    extension = os.path.splitext(path)[1]
                    # Keep the generic default for unknown extensions; a
                    # plain .get(extension) would replace it with None.
                    content_type = mimetypes.types_map.get(extension, content_type)
                    if extension.endswith(('crt', 'pem')):
                        content_type = 'application/x-x509-ca-cert'
                except StandardError as e:
                    logging.error('import mimetypes failed: %r', e)
                with open(path, 'rb') as fp:
                    content = fp.read()
                    headers = {'Connection': 'close', 'Content-Type': content_type}
                    return 'mock', {'status': 200, 'headers': headers, 'body': content}
项目:reddit-service-ads-tracking    作者:reddit    | 项目源码 | 文件源码
def _encode_query(query):
    """
    `urlparse.parse_qsl` and `urllib.encodeurl` modify
    blank query values so we had to roll our own.
    """
    kvps = urllib.unquote_plus(query).split("&")
    encoded_pairs = []

    for kvp in kvps:
        if "=" not in kvp:
            encoded_pairs.append(urllib.quote_plus(kvp))
        else:
            key, value = kvp.split("=")
            encoded_pairs.append("%s=%s" % (
                urllib.quote_plus(key),
                urllib.quote_plus(value)
            ))

    return "&".join(encoded_pairs)
项目:plugin.video.rtbfauvio    作者:Gaet81    | 项目源码 | 文件源码
def get_params():
    """Parse the plugin's query string (``sys.argv[2]``) into a dict.

    Returns an empty dict when fewer than three arguments are available or
    when the query string is shorter than two characters.  Otherwise every
    '?' is removed, the string is split on '&', and each piece that splits
    into exactly one key and one value on '=' is stored with its value
    URL-unquoted.  Pieces of any other shape are ignored.
    """
    if len(sys.argv) < 3:
        return {}

    param = {}
    paramstring = sys.argv[2]
    if len(paramstring) >= 2:
        cleanedparams = paramstring.replace('?', '')
        xbmc.log(str(cleanedparams),xbmc.LOGDEBUG)
        pairsofparams = cleanedparams.split('&')
        xbmc.log(str(pairsofparams),xbmc.LOGDEBUG)
        for pair in pairsofparams:
            pieces = pair.split('=')
            if len(pieces) != 2:
                continue
            try:
                param[pieces[0]] = urllib.unquote_plus(pieces[1])
            except:
                pass
    return param
项目:kekescan    作者:xiaoxiaoleo    | 项目源码 | 文件源码
def parse_userinfo(userinfo):
    """Split and unescape the ``<username>:<password>`` part of a MongoDB URI.

    Reserved characters such as ':', '/', '+' and '@' have to be escaped
    following RFC 2396 before they reach this function.

    :Parameters:
        - `userinfo`: A string of the form <username>:<password>

    :Returns:
        A 2-tuple ``(username, password)``, both passed through
        `unquote_plus`.

    :Raises:
        `InvalidURI` when `userinfo` contains '@' or more than one ':',
        or when the username part is empty.

    .. versionchanged:: 2.2
       Now uses `urllib.unquote_plus` so `+` characters must be escaped.
    """
    has_at_sign = '@' in userinfo
    if has_at_sign or userinfo.count(':') > 1:
        raise InvalidURI("':' or '@' characters in a username or password "
                         "must be escaped according to RFC 2396.")

    user, _, passwd = _partition(userinfo, ":")
    # Only the username is mandatory: no password is expected with
    # GSSAPI authentication.
    if not user:
        raise InvalidURI("The empty string is not valid username.")

    return unquote_plus(user), unquote_plus(passwd)
项目:kekescan    作者:xiaoxiaoleo    | 项目源码 | 文件源码
def _parse_options(opts, delim):
    """Helper method for split_options which creates the options dict.
    Also handles the creation of a list for the URI tag_sets/
    readpreferencetags portion."""
    options = {}
    for opt in opts.split(delim):
        key, val = opt.split("=")
        if key.lower() == 'readpreferencetags':
            options.setdefault('readpreferencetags', []).append(val)
        else:
            # str(option) to ensure that a unicode URI results in plain 'str'
            # option names. 'normalized' is then suitable to be passed as
            # kwargs in all Python versions.
            if str(key) in options:
                warnings.warn("Duplicate URI option %s" % (str(key),))
            options[str(key)] = unquote_plus(val)

    # Special case for deprecated options
    if "wtimeout" in options:
        if "wtimeoutMS" in options:
            options.pop("wtimeout")
        warnings.warn("Option wtimeout is deprecated, use 'wtimeoutMS'"
                      " instead")

    return options
项目:CodeLabs    作者:TheIoTLearningInitiative    | 项目源码 | 文件源码
def do_GET(self):
        """Serve the file named by the request's query string.

        Everything after the first '?' in ``self.path`` (or the whole path
        when there is no '?') is URL-decoded and handed to
        ``send_headers`` and ``write_response``. A socket error whose
        first argument is EPIPE or ECONNRESET is treated as a client
        disconnect: ``suppress_socket_error_report`` is set to True and the
        error is swallowed. Any other socket error is re-raised.
        """
        filepath = urllib.unquote_plus(self.path.split("?", 1)[-1])

        self.suppress_socket_error_report = None

        self.send_headers(filepath)

        print("sending data")
        try:
            self.write_response(filepath)
        except socket.error as err:
            client_gone = (isinstance(err.args, tuple)
                           and err.args[0] in (errno.EPIPE, errno.ECONNRESET))
            if not client_gone:
                raise
            print("disconnected")
            self.suppress_socket_error_report = True
项目:ecs-refarch-batch-processing    作者:awslabs    | 项目源码 | 文件源码
def process_images():
    """Process every image referenced by the messages read from SQS.

    For each message the S3 object key is taken from the first record of
    the JSON body, URL-decoded and UTF-8 encoded, then the image is
    downloaded, resized, uploaded and cleaned up. On success the message is
    deleted.

    No real error handling in this sample code. In case of error the
    message's visibility timeout is reset to 0 so it becomes visible in
    the queue again. Per the original sample notes, it ends up in the dead
    letter queue after five failed attempts (queue configuration is not
    visible here).
    """
    for message in get_messages_from_sqs():
        try:
            message_content = json.loads(message.body)
            record = message_content['Records'][0]
            image = urllib.unquote_plus(
                record['s3']['object']['key']).encode('utf-8')
            s3.download_file(input_bucket_name, image, image)
            resize_image(image)
            upload_image(image)
            cleanup_files(image)
        except Exception:
            # Catch Exception rather than everything so that
            # KeyboardInterrupt / SystemExit can still stop the worker.
            message.change_visibility(VisibilityTimeout=0)
        else:
            message.delete()
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_unquoting(self):
        """Check that unquote() and unquote_plus() decode every ASCII value.

        Each of the 128 ASCII characters is escaped with ``hexescape`` and
        must decode back to itself, both one at a time and when all the
        escapes are joined into a single string.
        """
        unquoters = (("unquote", urllib.unquote),
                     ("unquote_plus", urllib.unquote_plus))

        # Make sure unquoting of all ASCII values works
        escape_list = []
        for num in range(128):
            given = hexescape(chr(num))
            expect = chr(num)
            for label, unquoter in unquoters:
                result = unquoter(given)
                self.assertEqual(expect, result,
                                 "using %s(): %s != %s" %
                                 (label, expect, result))
            escape_list.append(given)
        escape_string = ''.join(escape_list)
        del escape_list

        # The decoded string must contain exactly one '%' (presumably the
        # decoded escape of '%' itself); more would mean an escape was
        # left undecoded.
        for label, unquoter in unquoters:
            result = unquoter(escape_string)
            self.assertEqual(result.count('%'), 1,
                             "using %s(): not all characters escaped: "
                             "%s" % (label, result))
项目:spiders    作者:poodarchu    | 项目源码 | 文件源码
def parse(self, response):
        """Default parse callback; dispatches on the page's index level.

        The response URL is first normalised with
        ``HtmlParser.remove_url_parameter``. For levels 1-4 the page is
        saved and a ``Request`` (with this method as callback) is yielded
        for every follow link. For level 5 a person profile is extracted
        and yielded, provided a LinkedIn id can be derived from the URL.
        Any other level yields nothing.
        """
        cleaned_url = HtmlParser.remove_url_parameter(response.url)
        response = response.replace(url=cleaned_url)
        hxs = HtmlXPathSelector(response)
        index_level = self.determine_level(response)
        log.msg("Parse: index level:" + str(index_level))

        if index_level in [1, 2, 3, 4]:
            self.save_to_file_system(index_level, response)
            relative_urls = self.get_follow_links(index_level, hxs)
            if relative_urls is None:
                return
            for link in relative_urls:
                log.msg('yield process, url:' + link)
                yield Request(link, callback=self.parse)
            return

        if index_level != 5:
            return

        profile = HtmlParser.extract_person_profile(hxs)
        raw_id = self.get_linkedin_id(response.url)
        linkedin_id = UnicodeDammit(urllib.unquote_plus(raw_id)).markup
        if linkedin_id:
            profile['_id'] = linkedin_id
            profile['url'] = UnicodeDammit(response.url).markup
            yield profile
项目:CVProject    作者:hieuxinhe94    | 项目源码 | 文件源码
def _translate_single_text(self, text, target_language, source_lauguage):
        """Translate one byte string, splitting it into query-sized chunks.

        The text is URL-quoted and cut so that each quoted chunk is at most
        ``_MAX_LENGTH_PER_QUERY`` long, always cutting right after the last
        occurrence of the first separator from ``_SEPERATORS`` found in the
        window. ``Error('input too large')`` is raised when no separator
        fits. Each chunk is unquoted again, translated through
        ``_basic_translate`` (scheduled via ``_execute``), and the pieces
        are joined per entry of ``_writing``.
        """
        assert _is_bytes(text)

        def split_text(text):
            # Work on the quoted form so lengths match what is sent.
            quoted = quote_plus(text)
            total = len(quoted)
            start = 0
            while (total - start) > self._MAX_LENGTH_PER_QUERY:
                window_end = start + self._MAX_LENGTH_PER_QUERY
                for seperator in self._SEPERATORS:
                    index = quoted.rfind(seperator, start, window_end)
                    if index != -1:
                        break
                else:
                    raise Error('input too large')
                end = index + len(seperator)
                yield unquote_plus(quoted[start:end])
                start = end

            yield unquote_plus(quoted[start:])

        def make_task(chunk):
            # Bind the chunk now; the task itself runs later.
            def task():
                return self._basic_translate(
                    chunk, target_language, source_lauguage)[0]
            return task

        tasks = (make_task(chunk) for chunk in split_text(text))
        results = list(self._execute(tasks))
        return tuple(''.join(part[n] for part in results)
                     for n in range(len(self._writing)))
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_unquoting(self):
        """Check that unquote() and unquote_plus() decode every ASCII value.

        Each of the 128 ASCII characters is escaped with ``hexescape`` and
        must decode back to itself, both one at a time and when all the
        escapes are joined into a single string.
        """
        unquoters = (("unquote", urllib.unquote),
                     ("unquote_plus", urllib.unquote_plus))

        # Make sure unquoting of all ASCII values works
        escape_list = []
        for num in range(128):
            given = hexescape(chr(num))
            expect = chr(num)
            for label, unquoter in unquoters:
                result = unquoter(given)
                self.assertEqual(expect, result,
                                 "using %s(): %s != %s" %
                                 (label, expect, result))
            escape_list.append(given)
        escape_string = ''.join(escape_list)
        del escape_list

        # The decoded string must contain exactly one '%' (presumably the
        # decoded escape of '%' itself); more would mean an escape was
        # left undecoded.
        for label, unquoter in unquoters:
            result = unquoter(escape_string)
            self.assertEqual(result.count('%'), 1,
                             "using %s(): not all characters escaped: "
                             "%s" % (label, result))
项目:lalascan    作者:blackye    | 项目源码 | 文件源码
def argument_query(query_str):
    """Parse a query string into a dict of URL-decoded parameters.

    Tokens are separated by ``&``; each token is split on its first ``=``
    into name and value (a token without ``=`` gets an empty value), and
    both sides are decoded with `unquote_plus`. Later duplicates override
    earlier ones.

    :param query_str: query string, normally without the leading ``?``
        (a warning is issued when one is present).
    :returns: dict of parameters. Empty when `query_str` is empty, when
        parsing fails, or when the only parameter has an empty value.
    """
    if query_str and query_str.startswith('?'):
        warn("You don't need to use a leading '?' when setting the query"
         " string, this may be an error!", stacklevel=3)
    if not query_str:
        return {}

    try:
        # much faster than parse_qsl()
        query_params = {}
        for token in query_str.split('&'):
            # partition() keeps any further '=' inside the value.
            name, _, value = to_utf8(token).partition('=')
            query_params[unquote_plus(name)] = unquote_plus(value)
    except Exception:
        # Best effort: an unparsable query counts as "no parameters".
        return {}

    if len(query_params) == 1 and not next(iter(query_params.values())):
        return {}
    return query_params


#------------------------------------------------------------------------------
项目:lalascan    作者:blackye    | 项目源码 | 文件源码
def query(self, query):
        """Set the query string, parsing it into parameters when possible.

        Tokens are separated by ``&``; each token is split on its first
        ``=`` into name and value (a token without ``=`` gets an empty
        value), and both sides are decoded with `unquote_plus`.

        When parsing yields usable parameters they are stored and the raw
        query is stored as None. When the query is empty, cannot be
        parsed, or consists of a single parameter with an empty value, the
        raw string is stored unchanged together with an empty dict.

        :param query: query string, normally without the leading ``?``
            (a warning is issued when one is present).
        """
        if query and query.startswith('?'):
            warn("You don't need to use a leading '?' when setting the query"
                 " string, this may be an error!", stacklevel=3)
        if not query:
            query_params = {}
        else:
            try:
                # much faster than parse_qsl()
                query_params = {}
                for token in query.split('&'):
                    # partition() keeps any further '=' inside the value.
                    name, _, value = to_utf8(token).partition('=')
                    query_params[unquote_plus(name)] = unquote_plus(value)
                if (len(query_params) == 1
                        and not next(iter(query_params.values()))):
                    query_params = {}
                else:
                    query = None
            except Exception:
                # Best effort: keep the raw string, drop the parameters.
                query_params = {}
        self.__query, self.__query_params = query, query_params
项目:lalascan    作者:blackye    | 项目源码 | 文件源码
def netloc(self, netloc):
        """Split a network location into auth, host and port and store them.

        ``auth`` is everything before the first '@' (None when there is no
        '@'). A host starting with '[' is read up to the matching ']'. The
        port is whatever follows the first ':' after the host; it is
        unquoted if it contains '%' and converted to int, or left as an
        empty string when absent. A non-empty host is decoded with
        `unquote_plus`.
        """
        if '@' in netloc:
            auth, _, host = netloc.partition('@')
        else:
            auth, host = None, netloc

        port = ''
        bracketed = bool(host) and host[0] == '['
        if bracketed:
            # A missing ']' makes this unpacking raise ValueError.
            host, port = host[1:].split(']', 1)
            if ':' in port:
                prefix, _, port = port.partition(':')
                host = host or prefix
        elif ':' in host:
            host, _, port = host.partition(':')

        if '%' in port:
            port = unquote(port)
        if port:
            port = int(port)
        if host:
            host = unquote_plus(host)

        self.auth = auth  # TODO: roll back changes if it fails
        self.host = host
        self.port = port
项目:dancedeets-monorepo    作者:mikelambert    | 项目源码 | 文件源码
def _initialize(self, request):
        """Prepare the handler: CORS header, Facebook lookup and JSON body.

        The payload is read from ``self.request.body`` (stripped of
        surrounding '=' and URL-decoded) and parsed into ``self.json_body``,
        which is None when the request has no body. When the handler
        requires or supports auth and the payload carries an
        ``access_token``, the lookup is rebuilt for that user. A missing
        token is reported through ``add_error`` only when auth is
        required.

        :param request: accepted for the caller's signature; the body is
            read from ``self.request``.
        """
        self.response.headers.add_header('Access-Control-Allow-Headers', 'Content-Type')

        # We use _initialize instead of webapp2's initialize, so that exceptions can be caught easily
        self.fbl = fb_api.FBLookup(None, None)

        if self.request.body:
            logging.info("Request body: %r", self.request.body)
            escaped_body = urllib.unquote_plus(self.request.body.strip('='))
            self.json_body = json.loads(escaped_body)
            logging.info("json_request: %r", self.json_body)
        else:
            self.json_body = None

        if self.requires_auth or self.supports_auth:
            # json_body is None for an empty request body (and could be a
            # non-dict JSON value); treat both as "no token supplied".
            payload = self.json_body if isinstance(self.json_body, dict) else {}
            access_token = payload.get('access_token')
            if access_token:
                self.fb_uid = get_user_id_for_token(access_token)
                self.fbl = fb_api.FBLookup(self.fb_uid, access_token)
                logging.info("Access token for user ID %s", self.fb_uid)
            elif self.requires_auth:
                self.add_error("Needs access_token parameter")
项目:script.reddit.reader    作者:gedisony    | 项目源码 | 文件源码
def getParameters(parameterString):
    """Parse a plugin parameter string into a dict of commands.

    When ``getXBMCVersion()`` is at least 12.0 the whole string is
    URL-decoded first. Everything after the first '?' is split on '&';
    each non-empty piece is split on its first '=' into key and value, and
    a piece without '=' gets an empty value. Values are UTF-8 encoded when
    possible and stored unchanged otherwise.

    :param parameterString: the raw parameter string.
    :returns: dict mapping each key to its value.
    """
    log("", 5)
    commands = {}
    if getXBMCVersion() >= 12.0:
        parameterString = urllib.unquote_plus(parameterString)
    splitCommands = parameterString[parameterString.find('?') + 1:].split('&')

    for command in splitCommands:
        if not command:
            continue
        # partition() tolerates a missing '=' and keeps any further '='
        # inside the value instead of dropping what follows it.
        key, _, raw_value = command.partition('=')
        try:
            value = raw_value.encode("utf-8")
        except UnicodeError:
            log("Error utf-8 encoding argument value: " + repr(raw_value))
            value = raw_value

        commands[key] = value

    log(repr(commands), 5)
    return commands
项目:script.reddit.reader    作者:gedisony    | 项目源码 | 文件源码
def prettify_reddit_query(subreddit_entry):
    """Make a reddit search query string presentable.

    Entries that do not start with '?' are returned untouched. Otherwise
    the part after the last '/' is URL-decoded and the query syntax is
    replaced, in a fixed order, by display labels.
    """
    if not subreddit_entry.startswith('?'):
        return subreddit_entry

    # Order matters: each substitution runs on the result of the previous.
    substitutions = (
        ('?q=', '[LIGHT]search:[/LIGHT]'),
        ('site:', ''),
        ('&sort=', '[LIGHT] sort by:[/LIGHT]'),
        ('&t=', '[LIGHT] from:[/LIGHT]'),
        ('subreddit:', 'r/'),
        ('author:', '[LIGHT] by:[/LIGHT]'),
        ('&restrict_sr=on', ''),
        ('&restrict_sr=', ''),
        ('nsfw:no', ''),
        ('nsfw:yes', 'nsfw'),
    )

    pretty = urllib.unquote_plus(subreddit_entry.split('/')[-1])
    for needle, label in substitutions:
        pretty = pretty.replace(needle, label)
    return pretty
项目:flask-zhenai-mongo-echarts    作者:Fretice    | 项目源码 | 文件源码
def parse_userinfo(userinfo):
    """Split and unescape the ``<username>:<password>`` part of a MongoDB URI.

    Reserved characters such as ':', '/', '+' and '@' have to be escaped
    following RFC 2396 before they reach this function.

    :Parameters:
        - `userinfo`: A string of the form <username>:<password>

    :Returns:
        A 2-tuple ``(username, password)``, both passed through
        `unquote_plus`.

    :Raises:
        `InvalidURI` when `userinfo` contains '@' or more than one ':',
        or when the username part is empty.

    .. versionchanged:: 2.2
       Now uses `urllib.unquote_plus` so `+` characters must be escaped.
    """
    has_at_sign = '@' in userinfo
    if has_at_sign or userinfo.count(':') > 1:
        raise InvalidURI("':' or '@' characters in a username or password "
                         "must be escaped according to RFC 2396.")

    user, _, passwd = _partition(userinfo, ":")
    # Only the username is mandatory: no password is expected with
    # GSSAPI authentication.
    if not user:
        raise InvalidURI("The empty string is not valid username.")

    return unquote_plus(user), unquote_plus(passwd)
项目:flask-zhenai-mongo-echarts    作者:Fretice    | 项目源码 | 文件源码
def _parse_options(opts, delim):
    """Helper method for split_options which creates the options dict.
    Also handles the creation of a list for the URI tag_sets/
    readpreferencetags portion."""
    options = {}
    for opt in opts.split(delim):
        key, val = opt.split("=")
        if key.lower() == 'readpreferencetags':
            options.setdefault('readpreferencetags', []).append(val)
        else:
            # str(option) to ensure that a unicode URI results in plain 'str'
            # option names. 'normalized' is then suitable to be passed as
            # kwargs in all Python versions.
            if str(key) in options:
                warnings.warn("Duplicate URI option %s" % (str(key),))
            options[str(key)] = unquote_plus(val)

    # Special case for deprecated options
    if "wtimeout" in options:
        if "wtimeoutMS" in options:
            options.pop("wtimeout")
        warnings.warn("Option wtimeout is deprecated, use 'wtimeoutMS'"
                      " instead")

    return options
项目:Data-visualization    作者:insta-code1    | 项目源码 | 文件源码
def parse_userinfo(userinfo):
    """Split and unescape the ``<username>:<password>`` part of a MongoDB URI.

    Reserved characters such as ':', '/', '+' and '@' have to be escaped
    following RFC 2396 before they reach this function.

    :Parameters:
        - `userinfo`: A string of the form <username>:<password>

    :Returns:
        A 2-tuple ``(username, password)``, both passed through
        `unquote_plus`.

    :Raises:
        `InvalidURI` when `userinfo` contains '@' or more than one ':',
        or when the username part is empty.

    .. versionchanged:: 2.2
       Now uses `urllib.unquote_plus` so `+` characters must be escaped.
    """
    has_at_sign = '@' in userinfo
    if has_at_sign or userinfo.count(':') > 1:
        raise InvalidURI("':' or '@' characters in a username or password "
                         "must be escaped according to RFC 2396.")

    user, _, passwd = _partition(userinfo, ":")
    # Only the username is mandatory: no password is expected with
    # GSSAPI authentication.
    if not user:
        raise InvalidURI("The empty string is not valid username.")

    return unquote_plus(user), unquote_plus(passwd)