Python cgi 模块,parse_qsl() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用cgi.parse_qsl()

项目:sndlatr    作者:Schibum    | 项目源码 | 文件源码
def do_GET(s):
    """Handle the GET request that ends the authentication flow.

    Parses the query parameters of the requested path, stores them on
    ``s.server.query_params`` and answers with a small static HTML page
    saying that the flow has completed. Note that we can't detect
    if an error occurred.

    Args:
      s: the request handler instance (plays the role of ``self``).
    """
    s.send_response(200)
    s.send_header("Content-type", "text/html")
    s.end_headers()
    # Only the text after the first '?' is a query string. partition()
    # yields '' when the path has no '?', whereas split('?')[-1] would
    # hand the whole path to the parser in that case.
    query_string = s.path.partition('?')[2]
    s.server.query_params = dict(parse_qsl(query_string))
    s.wfile.write("<html><head><title>Authentication Status</title></head>")
    s.wfile.write("<body><p>The authentication flow has completed.</p>")
    s.wfile.write("</body></html>")
项目:sndlatr    作者:Schibum    | 项目源码 | 文件源码
def _add_query_parameter(url, name, value):
  """Sets a single query parameter on a url.

  Any value the parameter already has in the URL is replaced.

  Args:
    url: string, url to add the query parameter to.
    name: string, query parameter name.
    value: string, query parameter value.

  Returns:
    The url with the query parameter set, or the url unchanged if value is
    None.
  """
  if value is None:
    return url
  scheme, netloc, path, path_params, query, fragment = urlparse.urlparse(url)
  # Going through a dict keeps a single value per parameter name.
  pairs = dict(parse_qsl(query))
  pairs[name] = value
  return urlparse.urlunparse(
      (scheme, netloc, path, path_params, urllib.urlencode(pairs), fragment))
项目:zeronet-debian    作者:bashrc    | 项目源码 | 文件源码
def handleRequest(self, env, start_response):
        """Route one request through a freshly created UiRequest.

        Args:
            env: request environment mapping; "PATH_INFO" and "QUERY_STRING"
                are read from it.
            start_response: handed to UiRequest unchanged.

        Returns:
            Whatever ui_request.route(path) returns. When config.debug is off
            and routing raises, the result of ui_request.error500() instead.
        """
        path = env["PATH_INFO"]
        if env.get("QUERY_STRING"):
            get = dict(cgi.parse_qsl(env['QUERY_STRING']))
        else:
            get = {}
        ui_request = UiRequest(self, get, env, start_response)
        if config.debug:  # Let the exception propagate so Werkzeug can catch it
            return ui_request.route(path)

        # Not in debug mode: catch and display the error
        try:
            return ui_request.route(path)
        except Exception as err:  # "as" form works on Python 2.6+ and 3
            details = Debug.formatException(err)
            logging.debug("UiRequest error: %s", details)
            return ui_request.error500("Err: %s" % details)

    # Reload the UiRequest class to prevent restarts in debug mode
项目:office-interoperability-tools    作者:milossramek    | 项目源码 | 文件源码
def _add_query_parameter(url, name, value):
  """Sets a single query parameter on a url.

  Any value the parameter already has in the URL is replaced.

  Args:
    url: string, url to add the query parameter to.
    name: string, query parameter name.
    value: string, query parameter value.

  Returns:
    The url with the query parameter set, or the url unchanged if value is
    None.
  """
  if value is None:
    return url
  scheme, netloc, path, path_params, query, fragment = urlparse.urlparse(url)
  # Going through a dict keeps a single value per parameter name.
  pairs = dict(parse_qsl(query))
  pairs[name] = value
  return urlparse.urlunparse(
      (scheme, netloc, path, path_params, urllib.urlencode(pairs), fragment))
项目:office-interoperability-tools    作者:milossramek    | 项目源码 | 文件源码
def do_GET(s):
    """Handle the GET request that ends the authentication flow.

    Parses the query parameters of the requested path, stores them on
    ``s.server.query_params`` and answers with a small static HTML page
    saying that the flow has completed. Note that we can't detect
    if an error occurred.

    Args:
      s: the request handler instance (plays the role of ``self``).
    """
    s.send_response(200)
    s.send_header("Content-type", "text/html")
    s.end_headers()
    # Only the text after the first '?' is a query string. partition()
    # yields '' when the path has no '?', whereas split('?')[-1] would
    # hand the whole path to the parser in that case.
    query_string = s.path.partition('?')[2]
    s.server.query_params = dict(parse_qsl(query_string))
    s.wfile.write("<html><head><title>Authentication Status</title></head>")
    s.wfile.write("<body><p>The authentication flow has completed.</p>")
    s.wfile.write("</body></html>")
项目:Chromium_DepotTools    作者:p07r0457    | 项目源码 | 文件源码
def do_GET(s):
    """Handle the GET request that ends the authentication flow.

    Parses the query parameters of the requested path, stores them on
    ``s.server.query_params`` and answers with a small static HTML page
    saying that the flow has completed. Note that we can't detect
    if an error occurred.

    Args:
      s: the request handler instance (plays the role of ``self``).
    """
    s.send_response(200)
    s.send_header("Content-type", "text/html")
    s.end_headers()
    # Only the text after the first '?' is a query string. partition()
    # yields '' when the path has no '?', whereas split('?')[-1] would
    # hand the whole path to the parser in that case.
    query_string = s.path.partition('?')[2]
    s.server.query_params = dict(parse_qsl(query_string))
    s.wfile.write("<html><head><title>Authentication Status</title></head>")
    s.wfile.write("<body><p>The authentication flow has completed.</p>")
    s.wfile.write("</body></html>")
项目:Chromium_DepotTools    作者:p07r0457    | 项目源码 | 文件源码
def _add_query_parameter(url, name, value):
  """Sets a single query parameter on a url.

  Any value the parameter already has in the URL is replaced.

  Args:
    url: string, url to add the query parameter to.
    name: string, query parameter name.
    value: string, query parameter value.

  Returns:
    The url with the query parameter set, or the url unchanged if value is
    None.
  """
  if value is None:
    return url
  scheme, netloc, path, path_params, query, fragment = urlparse.urlparse(url)
  # Going through a dict keeps a single value per parameter name.
  pairs = dict(parse_qsl(query))
  pairs[name] = value
  return urlparse.urlunparse(
      (scheme, netloc, path, path_params, urllib.urlencode(pairs), fragment))
项目:node-gn    作者:Shouqun    | 项目源码 | 文件源码
def do_GET(s):
    """Handle the GET request that ends the authentication flow.

    Parses the query parameters of the requested path, stores them on
    ``s.server.query_params`` and answers with a small static HTML page
    saying that the flow has completed. Note that we can't detect
    if an error occurred.

    Args:
      s: the request handler instance (plays the role of ``self``).
    """
    s.send_response(200)
    s.send_header("Content-type", "text/html")
    s.end_headers()
    # Only the text after the first '?' is a query string. partition()
    # yields '' when the path has no '?', whereas split('?')[-1] would
    # hand the whole path to the parser in that case.
    query_string = s.path.partition('?')[2]
    s.server.query_params = dict(parse_qsl(query_string))
    s.wfile.write("<html><head><title>Authentication Status</title></head>")
    s.wfile.write("<body><p>The authentication flow has completed.</p>")
    s.wfile.write("</body></html>")
项目:node-gn    作者:Shouqun    | 项目源码 | 文件源码
def _add_query_parameter(url, name, value):
  """Sets a single query parameter on a url.

  Any value the parameter already has in the URL is replaced.

  Args:
    url: string, url to add the query parameter to.
    name: string, query parameter name.
    value: string, query parameter value.

  Returns:
    The url with the query parameter set, or the url unchanged if value is
    None.
  """
  if value is None:
    return url
  scheme, netloc, path, path_params, query, fragment = urlparse.urlparse(url)
  # Going through a dict keeps a single value per parameter name.
  pairs = dict(parse_qsl(query))
  pairs[name] = value
  return urlparse.urlunparse(
      (scheme, netloc, path, path_params, urllib.urlencode(pairs), fragment))
项目:Deploy_XXNET_Server    作者:jzp820927    | 项目源码 | 文件源码
def parse_backend_uri(backend_uri):
    """
    Splits "backend_uri" into a (scheme, host, params) tuple.

    The scheme is the text before the first ':' (a cache scheme such as 'db'
    or 'memcached'), the host is what follows '//' up to an optional '?'
    with one trailing '/' removed, and params is a dict built from the
    query string after the '?' (empty when there is none).

    Raises InvalidCacheBackendError if the URI has no ':' or the part after
    it does not start with '//'.
    """
    scheme, colon, rest = backend_uri.partition(':')
    if not colon:
        raise InvalidCacheBackendError("Backend URI must start with scheme://")
    if not rest.startswith('//'):
        raise InvalidCacheBackendError("Backend URI must start with scheme://")

    # Everything past the leading '//' is "host[?query]".
    host, question_mark, query = rest[2:].partition('?')
    params = dict(parse_qsl(query)) if question_mark else {}
    if host.endswith('/'):
        host = host[:-1]

    return scheme, host, params
项目:depot_tools    作者:webrtc-uwp    | 项目源码 | 文件源码
def do_GET(s):
    """Handle the GET request that ends the authentication flow.

    Parses the query parameters of the requested path, stores them on
    ``s.server.query_params`` and answers with a small static HTML page
    saying that the flow has completed. Note that we can't detect
    if an error occurred.

    Args:
      s: the request handler instance (plays the role of ``self``).
    """
    s.send_response(200)
    s.send_header("Content-type", "text/html")
    s.end_headers()
    # Only the text after the first '?' is a query string. partition()
    # yields '' when the path has no '?', whereas split('?')[-1] would
    # hand the whole path to the parser in that case.
    query_string = s.path.partition('?')[2]
    s.server.query_params = dict(parse_qsl(query_string))
    s.wfile.write("<html><head><title>Authentication Status</title></head>")
    s.wfile.write("<body><p>The authentication flow has completed.</p>")
    s.wfile.write("</body></html>")
项目:depot_tools    作者:webrtc-uwp    | 项目源码 | 文件源码
def _add_query_parameter(url, name, value):
  """Sets a single query parameter on a url.

  Any value the parameter already has in the URL is replaced.

  Args:
    url: string, url to add the query parameter to.
    name: string, query parameter name.
    value: string, query parameter value.

  Returns:
    The url with the query parameter set, or the url unchanged if value is
    None.
  """
  if value is None:
    return url
  scheme, netloc, path, path_params, query, fragment = urlparse.urlparse(url)
  # Going through a dict keeps a single value per parameter name.
  pairs = dict(parse_qsl(query))
  pairs[name] = value
  return urlparse.urlunparse(
      (scheme, netloc, path, path_params, urllib.urlencode(pairs), fragment))
项目:sndlatr    作者:Schibum    | 项目源码 | 文件源码
def _update_query_params(uri, params):
  """Merges extra query parameters into a URI.

  Args:
    uri: string, A valid URI, with potential existing query parameters.
    params: dict, A dictionary of query parameters.

  Returns:
    The URI with its query string rebuilt from the existing parameters
    updated with params.
  """
  scheme, netloc, path, path_params, query, fragment = urlparse.urlparse(uri)
  merged = dict(parse_qsl(query))
  merged.update(params)
  return urlparse.urlunparse(
      (scheme, netloc, path, path_params, urllib.urlencode(merged), fragment))
项目:sndlatr    作者:Schibum    | 项目源码 | 文件源码
def _parse_exchange_token_response(content):
  """Parses response of an exchange token request.

  Most providers return JSON but some (e.g. Facebook) return a
  url-encoded string, so anything that fails to parse as JSON is parsed
  as a query string instead.

  Args:
    content: The body of a response

  Returns:
    Content as a dictionary object. Note that the dict could be empty,
    i.e. {}. That basically indicates a failure. An 'expires' entry is
    returned under the key 'expires_in'.
  """
  try:
    resp = simplejson.loads(content)
  except Exception:
    # Different JSON libs raise different exceptions, so this is a
    # catch-all. Exception is used because StandardError only exists on
    # Python 2.
    resp = dict(parse_qsl(content))

  # some providers respond with 'expires', others with 'expires_in'
  if resp and 'expires' in resp:
    resp['expires_in'] = resp.pop('expires')

  return resp
项目:darkc0de-old-stuff    作者:tuwid    | 项目源码 | 文件源码
def postReformat(self, postdata):
        """Parse postdata into (name, value) pairs and url-encode them again."""
        pairs = cgi.parse_qsl(postdata)
        return urllib.urlencode(pairs)
项目:darkc0de-old-stuff    作者:tuwid    | 项目源码 | 文件源码
def querystringReformat(self, qsdata):
        """Re-encode the query string of a URL that contains exactly one '?'.

        The part after the '?' is parsed into pairs and url-encoded again.
        Input with no '?' or with more than one is returned unchanged.
        """
        pieces = qsdata.split("?")
        if len(pieces) != 2:
            return qsdata
        base, query = pieces
        return base + "?" + urllib.urlencode(cgi.parse_qsl(query))
项目:yahoo-fantasy-football-metrics    作者:uberfastman    | 项目源码 | 文件源码
def test_y_token_to_string(self):
        # The serialized token must carry both the key and the secret.
        serialized = yql.YahooToken('test-key', 'test-secret').to_string()
        fields = dict(parse_qsl(serialized))
        self.assertEqual(fields.get('oauth_token'), 'test-key')
        self.assertEqual(fields.get('oauth_token_secret'), 'test-secret')
项目:yahoo-fantasy-football-metrics    作者:uberfastman    | 项目源码 | 文件源码
def test_y_token_to_string2(self):
        # Optional attributes set on the token must show up in to_string().
        token = yql.YahooToken('test-key', 'test-secret')
        token.timestamp = '1111'
        token.session_handle = 'poop'
        token.callback_confirmed = 'basilfawlty'

        fields = dict(parse_qsl(token.to_string()))

        expected_fields = (
            ('oauth_token', 'test-key'),
            ('oauth_token_secret', 'test-secret'),
            ('token_creation_timestamp', '1111'),
            ('oauth_callback_confirmed', 'basilfawlty'),
            ('oauth_session_handle', 'poop'),
        )
        for field_name, expected in expected_fields:
            self.assertEqual(fields.get(field_name), expected)
项目:yahoo-fantasy-football-metrics    作者:uberfastman    | 项目源码 | 文件源码
def test_request_for_two_legged(self):
        # The signed URL must carry the query text and the json format.
        query = 'SELECT * from foo'
        client = TestTwoLegged('test-api-key', 'test-secret',
                               httplib2_inst=httplib2.Http())
        signed_url = client.execute(query)
        query_string = signed_url.split('?')[1]
        params = dict(parse_qsl(query_string))
        self.assertEqual(params['q'], query)
        self.assertEqual(params['format'], 'json')
项目:yahoo-fantasy-football-metrics    作者:uberfastman    | 项目源码 | 文件源码
def test_request_for_three_legged(self):
        # Same check as the two-legged case, but executing with a token.
        query = 'SELECT * from foo'
        client = TestThreeLegged('test-api-key', 'test-secret',
                                 httplib2_inst=httplib2.Http())
        token = oauth.Token.from_string(
            'oauth_token=foo&oauth_token_secret=bar')
        signed_url = client.execute(query, token=token)
        query_string = signed_url.split('?')[1]
        params = dict(parse_qsl(query_string))
        self.assertEqual(params['q'], query)
        self.assertEqual(params['format'], 'json')
项目:yahoo-fantasy-football-metrics    作者:uberfastman    | 项目源码 | 文件源码
def query_params(self):
        """The query parameters of the uri used to call the YQL API.

        Returns an empty dict when self.uri is not set.
        """
        if not self.uri:
            return {}
        query_string = urlparse(self.uri).query
        return dict(parse_qsl(query_string))
项目:zeronet-debian    作者:bashrc    | 项目源码 | 文件源码
def getPosted(self):
        """Return the url-encoded fields of a POST request as a dict.

        A single line is read from self.env['wsgi.input'], decoded and
        parsed. Requests with any other method give an empty dict.
        """
        if self.env['REQUEST_METHOD'] != "POST":
            return {}
        body_line = self.env['wsgi.input'].readline().decode()
        return dict(cgi.parse_qsl(body_line))

    # Return: <dict> Cookies based on self.env
项目:zeronet-debian    作者:bashrc    | 项目源码 | 文件源码
def getCookies(self):
        """Return the cookies found in self.env['HTTP_COOKIE'] as a dict.

        The header is parsed with the query-string parser and surrounding
        whitespace is stripped from every cookie name. A missing or empty
        header gives an empty dict.
        """
        raw_cookies = self.env.get('HTTP_COOKIE')
        if not raw_cookies:
            return {}
        parsed = cgi.parse_qsl(raw_cookies)
        return dict((name.strip(), value) for name, value in parsed)
项目:office-interoperability-tools    作者:milossramek    | 项目源码 | 文件源码
def _parse_exchange_token_response(content):
  """Parses response of an exchange token request.

  Most providers return JSON but some (e.g. Facebook) return a
  url-encoded string, so anything that fails to parse as JSON is parsed
  as a query string instead.

  Args:
    content: The body of a response

  Returns:
    Content as a dictionary object. Note that the dict could be empty,
    i.e. {}. That basically indicates a failure. An 'expires' entry is
    returned under the key 'expires_in'.
  """
  try:
    resp = simplejson.loads(content)
  except Exception:
    # Different JSON libs raise different exceptions, so this is a
    # catch-all. Exception is used because StandardError only exists on
    # Python 2.
    resp = dict(parse_qsl(content))

  # some providers respond with 'expires', others with 'expires_in'
  if resp and 'expires' in resp:
    resp['expires_in'] = resp.pop('expires')

  return resp
项目:office-interoperability-tools    作者:milossramek    | 项目源码 | 文件源码
def step1_get_authorize_url(self, redirect_uri=None):
    """Returns a URI to redirect to the provider.

    Args:
      redirect_uri: string, Either the string 'urn:ietf:wg:oauth:2.0:oob' for
          a non-web-based application, or a URI that handles the callback from
          the authorization server. This parameter is deprecated, please move to
          passing the redirect_uri in via the constructor.

    Returns:
      A URI as a string to redirect the user to begin the authorization flow.

    Raises:
      ValueError: if no redirect_uri was passed here and none is set on the
          flow.
    """
    if redirect_uri is not None:
      # Adjacent string literals are joined as-is, so every piece has to end
      # with the space that separates it from the next one.
      logger.warning(('The redirect_uri parameter for '
          'OAuth2WebServerFlow.step1_get_authorize_url is deprecated. Please '
          'move to passing the redirect_uri in via the constructor.'))
      self.redirect_uri = redirect_uri

    if self.redirect_uri is None:
      raise ValueError('The value of redirect_uri must not be None.')

    query = {
        'response_type': 'code',
        'client_id': self.client_id,
        'redirect_uri': self.redirect_uri,
        'scope': self.scope,
        }
    query.update(self.params)
    parts = list(urlparse.urlparse(self.auth_uri))
    # Parameters already present in auth_uri win over the generated ones.
    query.update(dict(parse_qsl(parts[4]))) # 4 is the index of the query part
    parts[4] = urllib.urlencode(query)
    return urlparse.urlunparse(parts)
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_y_token_to_string(self):
        # The serialized token must carry both the key and the secret.
        serialized = yql.YahooToken('test-key', 'test-secret').to_string()
        fields = dict(parse_qsl(serialized))
        self.assertEqual(fields.get('oauth_token'), 'test-key')
        self.assertEqual(fields.get('oauth_token_secret'), 'test-secret')
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_y_token_to_string2(self):
        # Optional attributes set on the token must show up in to_string().
        token = yql.YahooToken('test-key', 'test-secret')
        token.timestamp = '1111'
        token.session_handle = 'poop'
        token.callback_confirmed = 'basilfawlty'

        fields = dict(parse_qsl(token.to_string()))

        expected_fields = (
            ('oauth_token', 'test-key'),
            ('oauth_token_secret', 'test-secret'),
            ('token_creation_timestamp', '1111'),
            ('oauth_callback_confirmed', 'basilfawlty'),
            ('oauth_session_handle', 'poop'),
        )
        for field_name, expected in expected_fields:
            self.assertEqual(fields.get(field_name), expected)
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_request_for_two_legged(self):
        # The signed URL must carry the query text and the json format.
        query = 'SELECT * from foo'
        client = TestTwoLegged('test-api-key', 'test-secret',
                               httplib2_inst=httplib2.Http())
        signed_url = client.execute(query)
        query_string = signed_url.split('?')[1]
        params = dict(parse_qsl(query_string))
        self.assertEqual(params['q'], query)
        self.assertEqual(params['format'], 'json')
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_request_for_three_legged(self):
        # Same check as the two-legged case, but executing with a token.
        query = 'SELECT * from foo'
        client = TestThreeLegged('test-api-key', 'test-secret',
                                 httplib2_inst=httplib2.Http())
        token = oauth.Token.from_string(
            'oauth_token=foo&oauth_token_secret=bar')
        signed_url = client.execute(query, token=token)
        query_string = signed_url.split('?')[1]
        params = dict(parse_qsl(query_string))
        self.assertEqual(params['q'], query)
        self.assertEqual(params['format'], 'json')
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def query_params(self):
        """The query parameters of the uri used to call the YQL API.

        Returns an empty dict when self.uri is not set.
        """
        if not self.uri:
            return {}
        query_string = urlparse(self.uri).query
        return dict(parse_qsl(query_string))
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def extract_params(self):
    """Returns the parameters for this task.

    POST and PULL tasks take their parameters from the task payload; every
    other method takes them from the query string of the relative url.

    Returns:
      A dictionary mapping each parameter name to its value as a string, or
      to a list of strings when the name occurs more than once. The
      dictionary is empty when the payload or query string is empty.

    Raises:
      ValueError: if the payload (POST and PULL) or the url query (all other
        methods) is not valid 'application/x-www-form-urlencoded' data.
    """
    if self.__method in ('PULL', 'POST'):
      raw = self.__payload
    else:
      raw = urlparse.urlparse(self.__relative_url).query

    if not raw:
      return {}

    grouped = {}
    pairs = cgi.parse_qsl(raw, keep_blank_values=True, strict_parsing=True)
    for name, item in pairs:
      grouped.setdefault(name, []).append(item)

    # A name that occurred once maps to the bare string, not a 1-item list.
    return dict(
        (name, items[0] if len(items) == 1 else items)
        for name, items in grouped.items())
项目:MKFQ    作者:maojingios    | 项目源码 | 文件源码
def extract_params(self):
    """Returns the parameters for this task.

    POST and PULL tasks take their parameters from the task payload; every
    other method takes them from the query string of the relative url.

    Returns:
      A dictionary mapping each parameter name to its value as a string, or
      to a list of strings when the name occurs more than once. The
      dictionary is empty when the payload or query string is empty.

    Raises:
      ValueError: if the payload (POST and PULL) or the url query (all other
        methods) is not valid 'application/x-www-form-urlencoded' data.
    """
    if self.__method in ('PULL', 'POST'):
      raw = self.__payload
    else:
      raw = urlparse.urlparse(self.__relative_url).query

    if not raw:
      return {}

    grouped = {}
    pairs = cgi.parse_qsl(raw, keep_blank_values=True, strict_parsing=True)
    for name, item in pairs:
      grouped.setdefault(name, []).append(item)

    # A name that occurred once maps to the bare string, not a 1-item list.
    return dict(
        (name, items[0] if len(items) == 1 else items)
        for name, items in grouped.items())
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def _verifyReturnToArgs(query):
        """Verify that the arguments in the return_to URL are present in this
        response.

        Every (key, value) pair in the return_to query string must appear in
        query with the same value, and every bare (non-OpenID) argument of
        the response must appear in the return_to query string. Raises
        ProtocolError when return_to is missing or either check fails.
        """
        message = Message.fromPostArgs(query)
        return_to = message.getArg(OPENID_NS, 'return_to')
        if return_to is None:
            raise ProtocolError('Response has no return_to')

        # Index 4 of the parsed URL is its query string.
        return_to_args = cgi.parse_qsl(urlparse(return_to)[4])

        for rt_key, rt_value in return_to_args:
            try:
                actual = query[rt_key]
            except KeyError:
                raise ProtocolError(
                    "return_to parameter %s absent from query %r"
                    % (rt_key, query))
            else:
                if rt_value != actual:
                    raise ProtocolError(
                        ("parameter %s value %r does not match "
                         "return_to's value %r") % (rt_key, actual, rt_value))

        # Make sure all non-OpenID arguments in the response are also
        # in the signed return_to.
        for pair in message.getArgs(BARE_NS).iteritems():
            if pair not in return_to_args:
                raise ProtocolError("Parameter %s not in return_to URL" % (pair[0],))
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_deprecated_parse_qsl(self):
        # parse_qsl now lives in urllib.parse; only check that the old cgi
        # name still parses and emits the expected deprecation warning.
        expected_warning = ('cgi.parse_qsl is deprecated, use urllib.parse.'
                            'parse_qsl instead', DeprecationWarning)
        with check_warnings(expected_warning):
            parsed = cgi.parse_qsl('a=A1&b=B2&B=B3')
            self.assertEqual([('a', 'A1'), ('b', 'B2'), ('B', 'B3')], parsed)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_deprecated_parse_qsl(self):
        # parse_qsl now lives in urlparse; only check that the old cgi name
        # still parses and emits the expected pending-deprecation warning.
        expected_warning = ('cgi.parse_qsl is deprecated, use urlparse.'
                            'parse_qsl instead', PendingDeprecationWarning)
        with check_warnings(expected_warning):
            parsed = cgi.parse_qsl('a=A1&b=B2&B=B3')
            self.assertEqual([('a', 'A1'), ('b', 'B2'), ('B', 'B3')], parsed)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_deprecated_parse_qsl(self):
        # parse_qsl now lives in urlparse; only check that the old cgi name
        # still parses and emits the expected pending-deprecation warning.
        expected_warning = ('cgi.parse_qsl is deprecated, use urlparse.'
                            'parse_qsl instead', PendingDeprecationWarning)
        with check_warnings(expected_warning):
            parsed = cgi.parse_qsl('a=A1&b=B2&B=B3')
            self.assertEqual([('a', 'A1'), ('b', 'B2'), ('B', 'B3')], parsed)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_deprecated_parse_qsl(self):
        # parse_qsl now lives in urllib.parse; only check that the old cgi
        # name still parses and emits the expected deprecation warning.
        expected_warning = ('cgi.parse_qsl is deprecated, use urllib.parse.'
                            'parse_qsl instead', DeprecationWarning)
        with check_warnings(expected_warning):
            parsed = cgi.parse_qsl('a=A1&b=B2&B=B3')
            self.assertEqual([('a', 'A1'), ('b', 'B2'), ('B', 'B3')], parsed)
项目:xxNet    作者:drzorm    | 项目源码 | 文件源码
def extract_params(self):
    """Returns the parameters for this task.

    POST and PULL tasks take their parameters from the task payload; every
    other method takes them from the query string of the relative url.

    Returns:
      A dictionary mapping each parameter name to its value as a string, or
      to a list of strings when the name occurs more than once. The
      dictionary is empty when the payload or query string is empty.

    Raises:
      ValueError: if the payload (POST and PULL) or the url query (all other
        methods) is not valid 'application/x-www-form-urlencoded' data.
    """
    if self.__method in ('PULL', 'POST'):
      raw = self.__payload
    else:
      raw = urlparse.urlparse(self.__relative_url).query

    if not raw:
      return {}

    grouped = {}
    pairs = cgi.parse_qsl(raw, keep_blank_values=True, strict_parsing=True)
    for name, item in pairs:
      grouped.setdefault(name, []).append(item)

    # A name that occurred once maps to the bare string, not a 1-item list.
    return dict(
        (name, items[0] if len(items) == 1 else items)
        for name, items in grouped.items())
项目:autosub-bootstrapbill    作者:BenjV    | 项目源码 | 文件源码
def request(self, uri, method="GET", body=None, headers=None,
        redirections=httplib2.DEFAULT_MAX_REDIRECTS, connection_type=None,
        force_auth_header=False):
        """Sign a request with this client's consumer and token, then send it.

        The signed parameters are taken from the url-encoded body for a POST
        with a body and from the query string of uri for a GET; other cases
        sign with no extra parameters. The signature travels in the POST
        body, in the GET url, or in the Authorization header for any other
        method. With force_auth_header set it always travels in the header
        and the POST body keeps only the parameters returned by
        get_nonoauth_parameters().

        Returns whatever httplib2.Http.request returns.
        """
        if not isinstance(headers, dict):
            headers = {}

        is_post = method == "POST"
        is_get = method == "GET"

        parameters = None
        if body and is_post:
            parameters = dict(parse_qsl(body))
        elif is_get:
            parameters = parse_qs(urlparse.urlparse(uri).query)

        req = Request.from_consumer_and_token(
            self.consumer, token=self.token, http_method=method,
            http_url=uri, parameters=parameters)
        req.sign_request(self.method, self.consumer, self.token)

        # The header is sent when forced, and is the only carrier for
        # methods other than GET and POST; either way it is added once.
        if force_auth_header or not (is_post or is_get):
            headers.update(req.to_header())

        if is_post:
            if force_auth_header:
                body = req.encode_postdata(req.get_nonoauth_parameters())
            else:
                body = req.to_postdata()
            headers['Content-Type'] = 'application/x-www-form-urlencoded'
        elif is_get and not force_auth_header:
            uri = req.to_url()

        return httplib2.Http.request(
            self, uri, method=method, body=body, headers=headers,
            redirections=redirections, connection_type=connection_type)
项目:gdata-python3    作者:dvska    | 项目源码 | 文件源码
def generate_authorize_url(self, redirect_uri='urn:ietf:wg:oauth:2.0:oob',
                               response_type='code', access_type='offline',
                               approval_prompt='auto', **kwargs):
        """Returns a URI to redirect to the provider.

        The redirect_uri is also stored on self.redirect_uri. Extra keyword
        arguments are added to the query string, and parameters already
        present in self.auth_uri take precedence over all of them.

        Args:
          redirect_uri: Either the string 'urn:ietf:wg:oauth:2.0:oob' for a
                        non-web-based application, or a URI that handles the
                        callback from the authorization server.
          response_type: Either the string 'code' for server-side or native
                         application, or the string 'token' for client-side
                         application.
          access_type: Either the string 'offline' to request a refresh token or
                       'online'.
          approval_prompt: Either the string 'auto' to let the OAuth mechanism
                           determine if the approval page is needed or 'force'
                           if the approval prompt is desired in all cases.

        If redirect_uri is 'urn:ietf:wg:oauth:2.0:oob' then pass in the
        generated verification code to get_access_token,
        otherwise pass in the query parameters received
        at the callback uri to get_access_token.
        If the response_type is 'token', no need to call
        get_access_token as the API will return it within
        the query parameters received at the callback:
          oauth2_token.access_token = YOUR_ACCESS_TOKEN
        """
        self.redirect_uri = redirect_uri

        query = dict(
            response_type=response_type,
            client_id=self.client_id,
            redirect_uri=redirect_uri,
            scope=self.scope,
            approval_prompt=approval_prompt,
            access_type=access_type,
        )
        query.update(kwargs)

        scheme, netloc, path, path_params, existing_query, fragment = (
            urllib.parse.urlparse(self.auth_uri))
        query.update(dict(parse_qsl(existing_query)))
        encoded_query = urllib.parse.urlencode(query)
        return urllib.parse.urlunparse(
            (scheme, netloc, path, path_params, encoded_query, fragment))
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_deprecated_parse_qsl(self):
        # parse_qsl now lives in urlparse; only check that the old cgi name
        # still parses and emits the expected pending-deprecation warning.
        expected_warning = ('cgi.parse_qsl is deprecated, use urlparse.'
                            'parse_qsl instead', PendingDeprecationWarning)
        with check_warnings(expected_warning):
            parsed = cgi.parse_qsl('a=A1&b=B2&B=B3')
            self.assertEqual([('a', 'A1'), ('b', 'B2'), ('B', 'B3')], parsed)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_deprecated_parse_qsl(self):
        # parse_qsl now lives in urllib.parse; only check that the old cgi
        # name still parses and emits the expected deprecation warning.
        expected_warning = ('cgi.parse_qsl is deprecated, use urllib.parse.'
                            'parse_qsl instead', DeprecationWarning)
        with check_warnings(expected_warning):
            parsed = cgi.parse_qsl('a=A1&b=B2&B=B3')
            self.assertEqual([('a', 'A1'), ('b', 'B2'), ('B', 'B3')], parsed)
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_deprecated_parse_qsl(self):
        # parse_qsl now lives in urlparse; only check that the old cgi name
        # still parses and emits the expected pending-deprecation warning.
        expected_warning = ('cgi.parse_qsl is deprecated, use urlparse.'
                            'parse_qsl instead', PendingDeprecationWarning)
        with check_warnings(expected_warning):
            parsed = cgi.parse_qsl('a=A1&b=B2&B=B3')
            self.assertEqual([('a', 'A1'), ('b', 'B2'), ('B', 'B3')], parsed)
项目:Chromium_DepotTools    作者:p07r0457    | 项目源码 | 文件源码
def _update_query_params(uri, params):
  """Merges extra query parameters into a URI.

  Args:
    uri: string, A valid URI, with potential existing query parameters.
    params: dict, A dictionary of query parameters.

  Returns:
    The URI with its query string rebuilt from the existing parameters
    updated with params.
  """
  scheme, netloc, path, path_params, query, fragment = urlparse.urlparse(uri)
  merged = dict(parse_qsl(query))
  merged.update(params)
  return urlparse.urlunparse(
      (scheme, netloc, path, path_params, urllib.urlencode(merged), fragment))
项目:Chromium_DepotTools    作者:p07r0457    | 项目源码 | 文件源码
def _parse_exchange_token_response(content):
  """Parses response of an exchange token request.

  Most providers return JSON but some (e.g. Facebook) return a
  url-encoded string, so anything that fails to parse as JSON is parsed
  as a query string instead.

  Args:
    content: The body of a response

  Returns:
    Content as a dictionary object. Note that the dict could be empty,
    i.e. {}. That basically indicates a failure. An 'expires' entry is
    returned under the key 'expires_in'.
  """
  try:
    resp = simplejson.loads(content)
  except Exception:
    # Different JSON libs raise different exceptions, so this is a
    # catch-all. Exception is used because StandardError only exists on
    # Python 2.
    resp = dict(parse_qsl(content))

  # some providers respond with 'expires', others with 'expires_in'
  if resp and 'expires' in resp:
    resp['expires_in'] = resp.pop('expires')

  return resp
项目:node-gn    作者:Shouqun    | 项目源码 | 文件源码
def _update_query_params(uri, params):
  """Merges extra query parameters into a URI.

  Args:
    uri: string, A valid URI, with potential existing query parameters.
    params: dict, A dictionary of query parameters.

  Returns:
    The URI with its query string rebuilt from the existing parameters
    updated with params.
  """
  scheme, netloc, path, path_params, query, fragment = urlparse.urlparse(uri)
  merged = dict(parse_qsl(query))
  merged.update(params)
  return urlparse.urlunparse(
      (scheme, netloc, path, path_params, urllib.urlencode(merged), fragment))
项目:node-gn    作者:Shouqun    | 项目源码 | 文件源码
def _parse_exchange_token_response(content):
  """Parses response of an exchange token request.

  Most providers return JSON but some (e.g. Facebook) return a
  url-encoded string, so anything that fails to parse as JSON is parsed
  as a query string instead.

  Args:
    content: The body of a response

  Returns:
    Content as a dictionary object. Note that the dict could be empty,
    i.e. {}. That basically indicates a failure. An 'expires' entry is
    returned under the key 'expires_in'.
  """
  try:
    resp = simplejson.loads(content)
  except Exception:
    # Different JSON libs raise different exceptions, so this is a
    # catch-all. Exception is used because StandardError only exists on
    # Python 2.
    resp = dict(parse_qsl(content))

  # some providers respond with 'expires', others with 'expires_in'
  if resp and 'expires' in resp:
    resp['expires_in'] = resp.pop('expires')

  return resp
项目:Deploy_XXNET_Server    作者:jzp820927    | 项目源码 | 文件源码
def extract_params(self):
    """Returns the parameters for this task.

    POST and PULL tasks take their parameters from the task payload; every
    other method takes them from the query string of the relative url.

    Returns:
      A dictionary mapping each parameter name to its value as a string, or
      to a list of strings when the name occurs more than once. The
      dictionary is empty when the payload or query string is empty.

    Raises:
      ValueError: if the payload (POST and PULL) or the url query (all other
        methods) is not valid 'application/x-www-form-urlencoded' data.
    """
    if self.__method in ('PULL', 'POST'):
      raw = self.__payload
    else:
      raw = urlparse.urlparse(self.__relative_url).query

    if not raw:
      return {}

    grouped = {}
    pairs = cgi.parse_qsl(raw, keep_blank_values=True, strict_parsing=True)
    for name, item in pairs:
      grouped.setdefault(name, []).append(item)

    # A name that occurred once maps to the bare string, not a 1-item list.
    return dict(
        (name, items[0] if len(items) == 1 else items)
        for name, items in grouped.items())
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_deprecated_parse_qsl(self):
        # parse_qsl now lives in urllib.parse; only check that the old cgi
        # name still parses and emits the expected deprecation warning.
        expected_warning = ('cgi.parse_qsl is deprecated, use urllib.parse.'
                            'parse_qsl instead', DeprecationWarning)
        with check_warnings(expected_warning):
            parsed = cgi.parse_qsl('a=A1&b=B2&B=B3')
            self.assertEqual([('a', 'A1'), ('b', 'B2'), ('B', 'B3')], parsed)
项目:micro-blog    作者:nickChenyx    | 项目源码 | 文件源码
def _verifyReturnToArgs(query):
        """Verify that the arguments in the return_to URL are present in this
        response.

        Every (key, value) pair in the return_to query string must appear in
        query with the same value, and every bare (non-OpenID) argument of
        the response must appear in the return_to query string. Raises
        ProtocolError when return_to is missing or either check fails.
        """
        message = Message.fromPostArgs(query)
        return_to = message.getArg(OPENID_NS, 'return_to')
        if return_to is None:
            raise ProtocolError('Response has no return_to')

        # Index 4 of the parsed URL is its query string.
        return_to_args = cgi.parse_qsl(urlparse(return_to)[4])

        for rt_key, rt_value in return_to_args:
            try:
                actual = query[rt_key]
            except KeyError:
                raise ProtocolError(
                    "return_to parameter %s absent from query %r"
                    % (rt_key, query))
            else:
                if rt_value != actual:
                    raise ProtocolError(
                        ("parameter %s value %r does not match "
                         "return_to's value %r") % (rt_key, actual, rt_value))

        # Make sure all non-OpenID arguments in the response are also
        # in the signed return_to.
        for pair in message.getArgs(BARE_NS).iteritems():
            if pair not in return_to_args:
                raise ProtocolError("Parameter %s not in return_to URL" % (pair[0],))