我们从 Python 开源项目中提取了以下 16 个代码示例,用于说明如何使用 six.moves.urllib.request.Request()。
def user_login(self, username, password):
    """Log in to the dashboard by submitting its login form.

    Fetches the page at ``CONF.dashboard.dashboard_url``, feeds it to a
    ``HorizonHTMLParser`` to obtain the CSRF token, the default region and
    the login path, then POSTs the credentials to the resulting login URL
    through the opener returned by ``self._get_opener()``.

    :param username: user name submitted in the login form
    :param password: password submitted in the login form
    """
    response = self._get_opener().open(CONF.dashboard.dashboard_url).read()
    # read() returns bytes on Python 3; the parser is presumably an
    # HTMLParser subclass, which only accepts text there. On Python 2 the
    # value already is a ``str`` and is passed through untouched.
    if not isinstance(response, str):
        response = response.decode('utf-8')

    # Grab the CSRF token and default region
    parser = HorizonHTMLParser()
    parser.feed(response)

    # construct login url for dashboard, discovery accommodates non-/ web
    # root for dashboard
    login_url = parse.urljoin(CONF.dashboard.dashboard_url, parser.login)

    # Prepare login form request
    req = request.Request(login_url)
    req.add_header('Content-type', 'application/x-www-form-urlencoded')
    req.add_header('Referer', CONF.dashboard.dashboard_url)

    # Pass the default domain name regardless of the auth version in order
    # to test the scenario of when horizon is running with keystone v3
    params = {'username': username,
              'password': password,
              'region': parser.region,
              'domain': CONF.auth.default_credentials_domain_name,
              'csrfmiddlewaretoken': parser.csrf_token}

    form_body = parse.urlencode(params)
    # urlencode() yields text, but urllib on Python 3 rejects anything other
    # than bytes as POST data. On Python 2 ``str`` is ``bytes`` already.
    if not isinstance(form_body, bytes):
        form_body = form_body.encode('utf-8')
    self._get_opener().open(req, form_body)
def create_credentials_on_jenkins(jenkins_api, _uuid):
    """Make sure a username/password credential with id ``_uuid`` exists.

    If ``has_credentials_on_jenkins`` already reports the credential, its
    result is returned straight away. Otherwise a ``createCredentials`` form
    request is sent through ``jenkins_api.jenkins_open`` and the lookup is
    repeated.

    :param jenkins_api: object providing ``_build_url`` and ``jenkins_open``
    :param _uuid: id to give the credential
    :returns: the result of ``has_credentials_on_jenkins`` for ``_uuid``
    """
    already_there = has_credentials_on_jenkins(jenkins_api, _uuid)
    if already_there:
        return already_there

    form_headers = {'Content-Type': 'application/x-www-form-urlencoded'}
    # Adjacent literals are concatenated at compile time, so the body sent
    # over the wire is one single string with ``%s`` replaced by the id.
    form_template = (
        'json={ "": "0", "credentials": { "scope": "GLOBAL", "id": "%s", '
        '"username": "root", "password": "123456", "description": "test", '
        '"$class": "com.cloudbees.plugins.credentials.impl.'
        'UsernamePasswordCredentialsImpl" } }'
    )
    form_body = form_template % _uuid

    create_url = jenkins_api._build_url(
        'credentials/store/system/domain/_/createCredentials'
    )
    jenkins_api.jenkins_open(Request(create_url, form_body, form_headers))

    return has_credentials_on_jenkins(jenkins_api, _uuid)
def _pd_api(self, url, data=None, method='GET'):
    """Call the API rooted at ``PD_API_BASE`` and return the decoded reply.

    :param url: path appended to ``PD_API_BASE``
    :param data: optional JSON-serialisable payload; when given it is sent
        as the UTF-8 encoded request body with a JSON content type
    :param method: HTTP verb to use for the request
    :returns: the JSON-decoded response body, or ``None`` when a ``GET``
        is answered with a 404
    :raises HTTPError: for every other HTTP error (after logging its body)
    """
    from contextlib import closing

    url = '%s/%s' % (PD_API_BASE, url)
    # Copy the shared headers so the per-request Content-Type does not leak
    # into self._pd_headers.
    request_args = {
        'headers': dict(self._pd_headers)
    }
    if six.PY3:  # pragma: no cover
        request_args['method'] = method

    if data is not None:
        request_args['data'] = json.dumps(data).encode('utf-8')
        request_args['headers']['Content-Type'] = APPLICATION_JSON

    request = Request(url, **request_args)
    if six.PY2:  # pragma: no cover
        # The Python 2 Request has no ``method`` argument; override the hook
        # urllib consults to pick the verb instead.
        request.get_method = lambda: method

    try:
        # closing() releases the connection even if decoding fails.
        with closing(urlopen(request)) as response:
            return json.loads(response.read().decode('utf-8'))
    except HTTPError as e:
        error_body = e.read().decode('utf-8')
        logger.warning("API error: %s", error_body)
        if method == 'GET' and e.code == 404:
            return None
        # Bare raise keeps the original traceback intact.
        raise
def post(api_key, url, data):
    """POST ``data`` as JSON and return the JSON-decoded reply.

    :param api_key: key handed to ``make_url`` together with ``url``
    :param url: location handed to ``make_url`` to build the final address
    :param data: JSON-serialisable payload for the request body
    :returns: the decoded JSON document from the response body
    """
    from contextlib import closing

    url = make_url(api_key, url)
    # urllib on Python 3 only accepts bytes as a request body, so the JSON
    # text is encoded explicitly (a no-op change on Python 2, where the
    # default ASCII-only dump already is a byte string).
    body = json.dumps(data).encode('utf-8')
    req = Request(url,
                  headers={'Content-Type': 'application/json'},
                  data=body)
    # closing() makes sure the connection is released once the body is read.
    with closing(urlopen(req)) as response:
        return json.loads(response.read().decode('utf-8'))
def ping_url(self, url):
    """Issue a single request to ``url`` and discard the response.

    The request carries ``self.user_agent`` as its User-Agent header. When
    ``self.method`` is anything other than ``get`` (case-insensitive) an
    empty body is attached to the request.

    :param url: address to ping
    :raises URLError: if the request fails and ``self.fail_silenty`` is
        falsy; with a truthy ``self.fail_silenty`` the failure is swallowed
    """
    method = self.method.lower()
    # urllib switches to POST as soon as a body is present. The body has to
    # be bytes: a ``str`` is rejected with TypeError on Python 3.
    data = None if method == 'get' else b''
    headers = {
        'User-Agent': self.user_agent,
    }
    request = Request(url, data=data, headers=headers)
    try:
        response = urlopen(request)
    except URLError:
        if not self.fail_silenty:
            raise
    else:
        # Only reachability matters; release the connection right away.
        response.close()
def has_credentials_on_jenkins(jenkins_api, _uuid):
    """Report whether the credential with id ``_uuid`` can be fetched.

    :param jenkins_api: object providing ``_build_url`` and ``jenkins_open``
    :param _uuid: id of the credential to look up
    :returns: ``_uuid`` when the lookup request succeeds, ``False`` when it
        raises ``jenkins.NotFoundException``
    """
    lookup_url = jenkins_api._build_url(
        'credentials/store/system/domain/_/credential/%(uuid)s/api/json',
        {'uuid': _uuid},
    )
    lookup = Request(lookup_url, headers={'Content-Type': 'application/json'})

    try:
        jenkins_api.jenkins_open(lookup)
    except jenkins.NotFoundException:
        return False
    return _uuid
def restart_jenkins(jenkins_api):
    """Send the ``restart`` form request, tolerating a 503 answer.

    :param jenkins_api: object providing ``_build_url`` and ``jenkins_open``
    :raises HTTPError: for any HTTP error whose code is not 503
    """
    restart_request = Request(
        jenkins_api._build_url('restart'),
        'json={} Submit: Yes ',
        {'Content-Type': 'application/x-www-form-urlencoded'},
    )
    try:
        jenkins_api.jenkins_open(restart_request)
    except HTTPError as err:
        # Presumably Jenkins answers 503 while it is going down for the
        # restart, so that status is not treated as a failure.
        if err.code == 503:
            return
        raise
def _call(self, base_url, url, body):
    """Open a network connection and send ``body`` to the server.

    ``url`` is resolved against ``base_url.geturl()``; ``body`` is passed
    through ``self._encode_params`` and sent as the request data.

    :returns: the response object produced by ``urlopen``
    """
    # Resolve the request-specific "url" against the session's base address.
    target = urljoin(base_url.geturl(), url)
    req = Request(
        target,
        headers={'User-Agent': 'Lightstreamer iOS client/1.2.7'},
    )
    encoded_body = self._encode_params(body)
    return urlopen(req, data=encoded_body)
def disconnect(self):
    """Close the stream connection held in ``self._stream_connection``.

    When no connection is stored, only a warning is logged.
    """
    connection = self._stream_connection
    if connection is None:
        log.warning("No connection to Lightstreamer")
        return

    # Close the HTTP connection
    connection.close()
    log.debug("Connection closed")
    print("DISCONNECTED FROM LIGHTSTREAMER")
def _request(self, host, handler, request_body, verbose=0):
    """Send a HTTP request and return the parsed response.

    The request is written to the connection from ``make_connection(host)``.
    If the server answers 401 and ``USE_KERBEROS`` is set, the request is
    sent once more with a ``Negotiate`` Authorization header.

    :param host: host passed to ``make_connection`` and used in the cookie URL
    :param handler: request handler path on the server
    :param request_body: body sent with the request
    :param verbose: when truthy, connection debugging is switched on
    :returns: the result of ``self._parse_response``
    :raises xmlrpclib.ProtocolError: when the first reply is neither 200 nor
        a 401 handled by the Kerberos retry
    """
    h = self.make_connection(host)
    self.verbose = verbose
    if verbose:
        h.set_debuglevel(1)

    # A Request object for the server root; it is handed to the cookie
    # helpers (send_cookies / _save_cookies) below.
    request_url = "%s://%s/" % (self.scheme, host)
    cookie_request = urllib2.Request(request_url)

    self.send_request(h, handler, request_body)
    self.send_host(h, host)
    self.send_cookies(h, cookie_request)
    self.send_user_agent(h)
    self.send_content(h, request_body)

    errcode, errmsg, headers = h.getreply()

    if errcode == 401 and USE_KERBEROS:
        vc, challenge = self._kerberos_client_request(host, handler, errcode, errmsg, headers)
        # retry the original request & add the Authorization header:
        self.send_request(h, handler, request_body)
        self.send_host(h, host)
        h.putheader("Authorization", "Negotiate %s" % challenge)
        self.send_cookies(h, cookie_request)
        self.send_user_agent(h)
        self.send_content(h, request_body)
        errcode, errmsg, headers = h.getreply()
        # NOTE(review): the status of the retried request is not compared
        # with 200 here; presumably _kerberos_verify_response raises on
        # failure - confirm.
        self._kerberos_verify_response(vc, host, handler, errcode, errmsg, headers)

    elif errcode != 200:
        raise xmlrpclib.ProtocolError(host + handler, errcode, errmsg, headers)

    self._save_cookies(headers, cookie_request)

    # Not every connection object exposes the underlying socket this way;
    # fall back to None when the attribute chain is missing.
    try:
        sock = h._conn.sock
    except AttributeError:
        sock = None

    return self._parse_response(h.getfile(), sock)
def oauth_request(self, url, method='GET', args={}, headers={}):
    """Send an OAuth 1.0 (HMAC-SHA1) signed request and return the response.

    A ``url`` starting with ``/`` is treated as an API path and substituted
    into ``self.base_api_url``. If such a path contains a ``:`` placeholder,
    the placeholder is replaced by the escaped ``args['id']`` when that is
    set; otherwise it is dropped together with the character before it.

    :param url: full URL or API path
    :param method: HTTP verb; ``'POST'`` sends ``args`` in the body, any
        other value appends them to the URL as a query string
    :param args: request parameters (copied, never modified)
    :param headers: extra request headers (copied, never modified)
    :returns: the ``urlopen`` response, extended with a ``json()`` callable
        that decodes the body
    """
    base_args = {
        'oauth_consumer_key': self.oauth_consumer['key'],
        'oauth_signature_method': 'HMAC-SHA1',
        'oauth_timestamp': oauth_timestamp(),
        'oauth_nonce': oauth_nonce(),
        'oauth_version': '1.0'
    }

    # Work on private copies so neither the caller's dicts nor the shared
    # default arguments are ever touched.
    args = args.copy()
    headers = headers.copy()
    headers.setdefault('User-Agent', 'fanfou-py')

    if url.startswith('/'):
        if ':' in url:
            path, _ = url.split(':')
            if args.get('id'):
                url = path + oauth_escape(args['id'], via='quote_plus', safe='')
            else:
                url = path[:-1]
        url = self.base_api_url % url

    if method == 'POST':
        headers.setdefault('Content-Type', self.form_urlencoded)
        # Only form-encoded parameters are merged into the signed arguments.
        if headers['Content-Type'] == self.form_urlencoded:
            base_args.update(args)
    else:
        base_args.update(args)
        url = url + '?' + oauth_query(args, via='quote_plus', safe='')

    if self.oauth_token:
        base_args.update({'oauth_token': self.oauth_token['key']})
    base_args['oauth_signature'] = self.oauth_signature(url, method, base_args)
    headers.update(self.oauth_header(base_args))

    req = request.Request(url, headers=headers)

    if headers.get('Content-Type') == self.form_urlencoded:
        data = oauth_query(args, via='quote_plus', safe='').encode()
    elif 'form-data' in headers.get('Content-Type', ''):
        # multipart/form-data
        data = args['form-data']
    else:
        data = None

    resp = request.urlopen(req, data=data)

    def _json():
        # An empty body decodes to the empty string instead of failing.
        return json.loads(resp.read().decode() or '""')

    resp.json = _json
    return resp
def _version_check(current, testing=False):
    """Build (but do not start) a thread that contacts the version server.

    The thread's request carries a User-Agent header made of ``current``,
    the Python version, the platform and ``uuid.getnode()``. The returned
    thread does nothing when ``_report_status()`` is falsy, when ``testing``
    is truthy, when pytest is loaded or when the ``CI`` environment variable
    is set. Any exception raised inside the thread is swallowed.

    Can be disabled by setting config.check_version = False.

    :param current: version string of the running package
    :param testing: force the no-op behaviour
    :returns: an unstarted ``threading.Thread``
    """
    import os
    import platform
    import sys
    import threading
    import uuid
    from contextlib import closing

    from six.moves.urllib.request import Request, urlopen

    if 'pytest' in sys.modules or os.getenv('CI', False):
        testing = True

    def _impl():
        try:
            if testing:
                request_headers = {}
            else:
                user_agent = (
                    'molpx-{molpx_version}-Py-{python_version}-{platform}-{addr}'
                    .format(molpx_version=current,
                            python_version=platform.python_version(),
                            platform=platform.platform(terse=True),
                            addr=uuid.getnode())
                )
                request_headers = {'User-Agent': user_agent}
            r = Request('http://emma-project.org/versions.json',
                        headers=request_headers)
            # The reply is not evaluated; the connection is closed at once.
            with closing(urlopen(r, timeout=30)):
                pass
            # TODO add warnings about versions
        except Exception:
            # Best effort only: a failed check must never disturb the caller.
            # TODO add loggers
            pass

    if _report_status() and not testing:
        target = _impl
    else:
        target = lambda: ''
    return threading.Thread(target=target)
def _single_request(self, host, handler, request_body, verbose=0):
    """Issue one XML-RPC request and return the parsed response.

    If the server answers 401 and ``USE_KERBEROS`` is set, the request is
    sent once more with a ``Negotiate`` Authorization header supplied via
    ``self._extra_headers``.

    :param host: host passed to ``make_connection`` and used in the cookie URL
    :param handler: request handler path on the server
    :param request_body: body sent with the request
    :param verbose: when truthy, connection debugging is switched on
    :returns: the result of ``self.parse_response`` for a 200 reply
    :raises xmlrpclib.ProtocolError: when the final reply is not 200
    """
    # issue XML-RPC request

    # A Request object for the server root; it is handed to the cookie
    # helpers (send_cookies / _save_cookies) below.
    request_url = "%s://%s/" % (self.scheme, host)
    cookie_request = urllib2.Request(request_url)

    h = self.make_connection(host)
    self.verbose = verbose
    if verbose:
        h.set_debuglevel(1)

    try:
        self.send_request(h, handler, request_body)
        self.send_host(h, host)
        self.send_cookies(h, cookie_request)
        self.send_user_agent(h)
        self.send_content(h, request_body)

        response = h.getresponse(buffering=True)

        if response.status == 401 and USE_KERBEROS:
            vc, challenge = self._kerberos_client_request(host, handler, response.status, response.reason, response.msg)

            # discard any response data
            if (response.getheader("content-length", 0)):
                response.read()

            # retry the original request & add the Authorization header:
            # NOTE(review): unlike the first attempt, send_cookies is not
            # called for the retry - confirm this is intentional.
            self.send_request(h, handler, request_body)
            self._extra_headers = [("Authorization", "Negotiate %s" % challenge)]
            self.send_host(h, host)
            self.send_user_agent(h)
            self.send_content(h, request_body)
            # Reset so the Authorization header is not reused afterwards.
            self._extra_headers = []
            response = h.getresponse(buffering=True)
            self._kerberos_verify_response(vc, host, handler, response.status, response.reason, response.msg)

        if response.status == 200:
            self.verbose = verbose
            self._save_cookies(response.msg, cookie_request)
            return self.parse_response(response)
    except xmlrpclib.Fault:
        # Faults pass through without closing the connection.
        raise
    except Exception:
        # All unexpected errors leave connection in
        # a strange state, so we clear it.
        self.close()
        raise

    # discard any response data and raise exception
    if (response.getheader("content-length", 0)):
        response.read()
    raise xmlrpclib.ProtocolError(host + handler, response.status, response.reason, response.msg)

# override the appropriate request method