Python six.moves.urllib.request 模块,Request() 实例源码

我们从Python开源项目中,提取了以下16个代码示例,用于说明如何使用six.moves.urllib.request.Request()

项目:tempest-horizon    作者:openstack    | 项目源码 | 文件源码
def user_login(self, username, password):
        response = self._get_opener().open(CONF.dashboard.dashboard_url).read()

        # Grab the CSRF token and default region
        parser = HorizonHTMLParser()
        parser.feed(response)

        # construct login url for dashboard, discovery accommodates non-/ web
        # root for dashboard
        login_url = parse.urljoin(CONF.dashboard.dashboard_url, parser.login)

        # Prepare login form request
        req = request.Request(login_url)
        req.add_header('Content-type', 'application/x-www-form-urlencoded')
        req.add_header('Referer', CONF.dashboard.dashboard_url)

        # Pass the default domain name regardless of the auth version in order
        # to test the scenario of when horizon is running with keystone v3
        params = {'username': username,
                  'password': password,
                  'region': parser.region,
                  'domain': CONF.auth.default_credentials_domain_name,
                  'csrfmiddlewaretoken': parser.csrf_token}
        self._get_opener().open(req, parse.urlencode(params))
项目:lago-workshop    作者:lago-project    | 项目源码 | 文件源码
def create_credentials_on_jenkins(jenkins_api, _uuid):
    cred_exist = has_credentials_on_jenkins(jenkins_api, _uuid)
    if cred_exist:
        return cred_exist

    headers = {'Content-Type': 'application/x-www-form-urlencoded'}
    payload = '''json={
        "": "0",
        "credentials": {
            "scope": "GLOBAL",
            "id": "%s",
            "username": "root",
            "password": "123456",
            "description": "test",
            "$class": "com.cloudbees.plugins.credentials.impl.UsernamePasswordCredentialsImpl"
        }
    }''' % _uuid
    url = jenkins_api._build_url(
        'credentials/store/system/domain/_/createCredentials'
    )
    request = Request(url, payload, headers)
    jenkins_api.jenkins_open(request)

    return has_credentials_on_jenkins(jenkins_api, _uuid)
项目:lago-workshop    作者:lago-project    | 项目源码 | 文件源码
def create_credentials_on_jenkins(jenkins_api, _uuid):
    cred_exist = has_credentials_on_jenkins(jenkins_api, _uuid)
    if cred_exist:
        return cred_exist

    headers = {'Content-Type': 'application/x-www-form-urlencoded'}
    payload = '''json={
        "": "0",
        "credentials": {
            "scope": "GLOBAL",
            "id": "%s",
            "username": "root",
            "password": "123456",
            "description": "test",
            "$class": "com.cloudbees.plugins.credentials.impl.UsernamePasswordCredentialsImpl"
        }
    }''' % _uuid
    url = jenkins_api._build_url(
        'credentials/store/system/domain/_/createCredentials'
    )
    request = Request(url, payload, headers)
    jenkins_api.jenkins_open(request)

    return has_credentials_on_jenkins(jenkins_api, _uuid)
项目:spacel-provision    作者:pebble    | 项目源码 | 文件源码
def _pd_api(self, url, data=None, method='GET'):
        url = '%s/%s' % (PD_API_BASE, url)
        request_args = {
            'headers': dict(self._pd_headers)
        }
        if six.PY3:  # pragma: no cover
            request_args['method'] = method

        if data is not None:
            request_args['data'] = json.dumps(data).encode('utf-8')
            request_args['headers']['Content-Type'] = APPLICATION_JSON

        request = Request(url, **request_args)
        if six.PY2:  # pragma: no cover
            request.get_method = lambda: method

        try:
            response = urlopen(request)
            return json.loads(response.read().decode('utf-8'))
        except HTTPError as e:
            response = e.read().decode('utf-8')
            logger.warning("API error: %s", response)
            if method == 'GET' and e.code == 404:
                return None
            else:
                raise e
项目:cegr-galaxy    作者:seqcode    | 项目源码 | 文件源码
def post(api_key, url, data):
    url = make_url(api_key, url)
    response = Request(url, headers={'Content-Type': 'application/json'}, data=json.dumps(data))
    return json.loads(urlopen(response).read())
项目:babis    作者:glogiotatidis    | 项目源码 | 文件源码
def ping_url(self, url):
        method = self.method.lower()
        data = None if method == 'get' else ''
        headers = {
            'User-Agent': self.user_agent,
        }
        request = Request(url, data=data, headers=headers)
        try:
            urlopen(request)
        except URLError:
            if not self.fail_silenty:
                raise
项目:lago-workshop    作者:lago-project    | 项目源码 | 文件源码
def has_credentials_on_jenkins(jenkins_api, _uuid):
    headers = {'Content-Type': 'application/json'}
    path = 'credentials/store/system/domain/_/credential/%(uuid)s/api/json'
    url = jenkins_api._build_url(path, {'uuid': _uuid})
    request = Request(url, headers=headers)
    try:
        jenkins_api.jenkins_open(request)
        return _uuid
    except jenkins.NotFoundException:
        return False
项目:lago-workshop    作者:lago-project    | 项目源码 | 文件源码
def restart_jenkins(jenkins_api):
    headers = {'Content-Type': 'application/x-www-form-urlencoded'}
    path = 'restart'
    url = jenkins_api._build_url(path)
    payload = '''json={}
    Submit: Yes
    '''
    request = Request(url, payload, headers)
    try:
        jenkins_api.jenkins_open(request)
    except HTTPError as e:
        if e.code != 503:
            raise
项目:lago-workshop    作者:lago-project    | 项目源码 | 文件源码
def has_credentials_on_jenkins(jenkins_api, _uuid):
    headers = {'Content-Type': 'application/json'}
    path = 'credentials/store/system/domain/_/credential/%(uuid)s/api/json'
    url = jenkins_api._build_url(path, {'uuid': _uuid})
    request = Request(url, headers=headers)
    try:
        jenkins_api.jenkins_open(request)
        return _uuid
    except jenkins.NotFoundException:
        return False
项目:spacel-provision    作者:stratos    | 项目源码 | 文件源码
def _pd_api(self, url, data=None, method='GET'):
        url = '%s/%s' % (PD_API_BASE, url)
        request_args = {
            'headers': dict(self._pd_headers)
        }
        if six.PY3:  # pragma: no cover
            request_args['method'] = method

        if data is not None:
            request_args['data'] = json.dumps(data).encode('utf-8')
            request_args['headers']['Content-Type'] = APPLICATION_JSON

        request = Request(url, **request_args)
        if six.PY2:  # pragma: no cover
            request.get_method = lambda: method

        try:
            response = urlopen(request)
            return json.loads(response.read().decode('utf-8'))
        except HTTPError as e:
            response = e.read().decode('utf-8')
            logger.warning("API error: %s", response)
            if method == 'GET' and e.code == 404:
                return None
            else:
                raise e
项目:nadex    作者:knoguchi    | 项目源码 | 文件源码
def _call(self, base_url, url, body):
        """Open a network connection and performs HTTP Post
        with provided body.
        """
        # Combines the "base_url" with the
        # required "url" to be used for the specific request.
        url = urljoin(base_url.geturl(), url)
        headers = {'User-Agent': 'Lightstreamer iOS client/1.2.7'}
        req = Request(url)
        for k, v in headers.items():
            req.add_header(k, v)
        return urlopen(req, data=self._encode_params(body))
项目:nadex    作者:knoguchi    | 项目源码 | 文件源码
def disconnect(self):
        """Request to close the session previously opened with
        the connect() invocation.
        """
        if self._stream_connection is not None:
            # Close the HTTP connection
            self._stream_connection.close()
            log.debug("Connection closed")
            print("DISCONNECTED FROM LIGHTSTREAMER")
        else:
            log.warning("No connection to Lightstreamer")
项目:kobo    作者:release-engineering    | 项目源码 | 文件源码
def _request(self, host, handler, request_body, verbose=0):
        """Send a HTTP request."""
        h = self.make_connection(host)
        self.verbose = verbose
        if verbose:
            h.set_debuglevel(1)

        request_url = "%s://%s/" % (self.scheme, host)
        cookie_request = urllib2.Request(request_url)

        self.send_request(h, handler, request_body)
        self.send_host(h, host)
        self.send_cookies(h, cookie_request)
        self.send_user_agent(h)
        self.send_content(h, request_body)
        errcode, errmsg, headers = h.getreply()

        if errcode == 401 and USE_KERBEROS:
            vc, challenge = self._kerberos_client_request(host, handler, errcode, errmsg, headers)
            # retry the original request & add the Authorization header:
            self.send_request(h, handler, request_body)
            self.send_host(h, host)
            h.putheader("Authorization", "Negotiate %s" % challenge)
            self.send_cookies(h, cookie_request)
            self.send_user_agent(h)
            self.send_content(h, request_body)
            errcode, errmsg, headers = h.getreply()
            self._kerberos_verify_response(vc, host, handler, errcode, errmsg, headers)

        elif errcode != 200:
            raise xmlrpclib.ProtocolError(host + handler, errcode, errmsg, headers)

        self._save_cookies(headers, cookie_request)

        try:
            sock = h._conn.sock
        except AttributeError:
            sock = None

        return self._parse_response(h.getfile(), sock)
项目:fanfou-py    作者:akgnah    | 项目源码 | 文件源码
def oauth_request(self, url, method='GET', args={}, headers={}):
        base_args = {
            'oauth_consumer_key': self.oauth_consumer['key'],
            'oauth_signature_method': 'HMAC-SHA1',
            'oauth_timestamp': oauth_timestamp(),
            'oauth_nonce': oauth_nonce(),
            'oauth_version': '1.0'
        }

        args = args.copy()
        headers = headers.copy()
        headers['User-Agent'] = headers.get('User-Agent', 'fanfou-py')

        if url.startswith('/'):
            if ':' in url:
                path, _ = url.split(':')
                if args.get('id'):
                    url = path + oauth_escape(args['id'], via='quote_plus', safe='')
                else:
                    url = path[:-1]
            url = self.base_api_url % url

        if method == 'POST':
            headers['Content-Type'] = headers.get('Content-Type', self.form_urlencoded)
            (headers['Content-Type'] == self.form_urlencoded) and base_args.update(args)
        else:
            base_args.update(args)
            url = url + '?' + oauth_query(args, via='quote_plus', safe='')

        self.oauth_token and base_args.update({'oauth_token': self.oauth_token['key']})
        base_args['oauth_signature'] = self.oauth_signature(url, method, base_args)
        headers.update(self.oauth_header(base_args))

        req = request.Request(url, headers=headers)

        if headers.get('Content-Type') == self.form_urlencoded:
            data = oauth_query(args, via='quote_plus', safe='').encode()
        elif 'form-data' in headers.get('Content-Type', ''):  # multipart/form-data
            data = args['form-data']
        else:
            data = None

        resp = request.urlopen(req, data=data)
        resp.json = lambda: json.loads(resp.read().decode() or '""')
        return resp
项目:molPX    作者:markovmodel    | 项目源码 | 文件源码
def _version_check(current, testing=False):
    """ checks latest version online from the server and logs some user metadata.
    Can be disabled by setting config.check_version = False.

    """

    import platform
    import os
    from six.moves.urllib.request import urlopen, Request
    from contextlib import closing
    import threading
    import uuid

    import sys
    if 'pytest' in sys.modules or os.getenv('CI', False):
        testing = True

    def _impl():
        try:
            r = Request('http://emma-project.org/versions.json',
                        headers={'User-Agent': 'molpx-{molpx_version}-Py-{python_version}-{platform}-{addr}'
                        .format(molpx_version=current, python_version=platform.python_version(),
                                platform=platform.platform(terse=True), addr=uuid.getnode())} if not testing else {})
            with closing(urlopen(r, timeout=30)):
                pass
            """
            versions = json.loads(payload)
            latest_json = tuple(filter(lambda x: x['latest'], versions))[0]['version']
            latest = parse(latest_json)
            if parse(current) < latest:
                import warnings
                warnings.warn("You are not using the latest release of PyEMMA."
                              " Latest is {latest}, you have {current}."
                              .format(latest=latest, current=current), category=UserWarning)
            """
            # TODO add warnings about versions
        except Exception:
            pass
            """
            import logging
            logging.getLogger('pyemma').debug("error during version check", exc_info=True)
            """
            # TODO add loggers

    return threading.Thread(target=_impl if _report_status() and not testing else lambda: '')
项目:kobo    作者:release-engineering    | 项目源码 | 文件源码
def _single_request(self, host, handler, request_body, verbose=0):
        # issue XML-RPC request

        request_url = "%s://%s/" % (self.scheme, host)
        cookie_request = urllib2.Request(request_url)

        h = self.make_connection(host)
        self.verbose = verbose
        if verbose:
            h.set_debuglevel(1)

        try:
            self.send_request(h, handler, request_body)
            self.send_host(h, host)
            self.send_cookies(h, cookie_request)
            self.send_user_agent(h)
            self.send_content(h, request_body)

            response = h.getresponse(buffering=True)

            if response.status == 401 and USE_KERBEROS:
                vc, challenge = self._kerberos_client_request(host, handler, response.status, response.reason, response.msg)

                # discard any response data
                if (response.getheader("content-length", 0)):
                    response.read()

                # retry the original request & add the Authorization header:
                self.send_request(h, handler, request_body)
                self._extra_headers = [("Authorization", "Negotiate %s" % challenge)]
                self.send_host(h, host)
                self.send_user_agent(h)
                self.send_content(h, request_body)
                self._extra_headers = []
                response = h.getresponse(buffering=True)
                self._kerberos_verify_response(vc, host, handler, response.status, response.reason, response.msg)

            if response.status == 200:
                self.verbose = verbose
                self._save_cookies(response.msg, cookie_request)
                return self.parse_response(response)
        except xmlrpclib.Fault:
            raise
        except Exception:
            # All unexpected errors leave connection in
            # a strange state, so we clear it.
            self.close()
            raise

        # discard any response data and raise exception
        if (response.getheader("content-length", 0)):
            response.read()
        raise xmlrpclib.ProtocolError(host + handler, response.status, response.reason, response.msg)

    # override the appropriate request method