def __init__(self, timeout=None, proxy=None, cacert=None, sessions=False): if (timeout is not None) and not self.supports_feature('timeout'): raise RuntimeError('timeout is not supported with urllib2 transport') if proxy: raise RuntimeError('proxy is not supported with urllib2 transport') if cacert: raise RuntimeError('cacert is not support with urllib2 transport') handlers = [] if ((sys.version_info[0] == 2 and sys.version_info >= (2,7,9)) or (sys.version_info[0] == 3 and sys.version_info >= (3,2,0))): context = ssl.create_default_context() context.check_hostname = False context.verify_mode = ssl.CERT_NONE handlers.append(urllib2.HTTPSHandler(context=context)) if sessions: handlers.append(urllib2.HTTPCookieProcessor(CookieJar())) opener = urllib2.build_opener(*handlers) self.request_opener = opener.open self._timeout = timeout
def get_access_token(self, code, state=None): ''' In callback url: http://host/callback?code=123&state=xyz use code and state to get an access token. ''' kw = dict(client_id=self._client_id, client_secret=self._client_secret, code=code) if self._redirect_uri: kw['redirect_uri'] = self._redirect_uri if state: kw['state'] = state opener = build_opener(HTTPSHandler) request = Request('https://github.com/login/oauth/access_token', data=_encode_params(kw)) request.get_method = _METHOD_MAP['POST'] request.add_header('Accept', 'application/json') try: response = opener.open(request, timeout=TIMEOUT) r = _parse_json(response.read()) if 'error' in r: raise ApiAuthError(str(r.error)) return str(r.access_token) except HTTPError as e: raise ApiAuthError('HTTPError when get access token')
def u2handlers(self): """Get a collection of urllib handlers. """ handlers = suds.transport.http.HttpTransport.u2handlers(self) if self.ssl_context: try: handlers.append(HTTPSHandler(context=self.ssl_context, check_hostname=self.verify)) except TypeError: # Python 2.7.9 HTTPSHandler does not accept the # check_hostname keyword argument. # # Note that even older Python versions would also # croak on the context keyword argument. But these # old versions do not have SSLContext either, so we # will not end up here in the first place. handlers.append(HTTPSHandler(context=self.ssl_context)) return handlers
def __init__(self, ssl_config): # pylint: disable=E1002 """Initialize""" if PY2: urllib2.HTTPSHandler.__init__(self) else: super().__init__() # pylint: disable=W0104 self._ssl_config = ssl_config
def hashed_download(url, temp, digest): """Download ``url`` to ``temp``, make sure it has the SHA-256 ``digest``, and return its path.""" # Based on pip 1.4.1's URLOpener but with cert verification removed. Python # >=2.7.9 verifies HTTPS certs itself, and, in any case, the cert # authenticity has only privacy (not arbitrary code execution) # implications, since we're checking hashes. def opener(): opener = build_opener(HTTPSHandler()) # Strip out HTTPHandler to prevent MITM spoof: for handler in opener.handlers: if isinstance(handler, HTTPHandler): opener.handlers.remove(handler) return opener def read_chunks(response, chunk_size): while True: chunk = response.read(chunk_size) if not chunk: break yield chunk response = opener().open(url) path = join(temp, urlparse(url).path.split('/')[-1]) actual_hash = sha256() with open(path, 'wb') as file: for chunk in read_chunks(response, 4096): file.write(chunk) actual_hash.update(chunk) actual_digest = actual_hash.hexdigest() if actual_digest != digest: raise HashError(url, path, actual_digest, digest) return path
def __init__(self): urllib2.HTTPSHandler.__init__(self)
def set_authentication(self, uri: str, login: str, password: str) -> None: if self.secure: context = ssl.create_default_context() else: context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) context.verify_mode = ssl.CERT_NONE context.check_hostname = False self.http_opener = build_opener(HTTPSHandler(context=context)) self.auth = {'Authorization': 'Basic %s' % b64encode(str.encode(login + ":" + password)).decode('utf-8')}
def __init__(self, ssl_config): #pylint: disable=E1002 """Initialize""" if PY2: urllib2.HTTPSHandler.__init__(self) else: super().__init__() # pylint: disable=W0104 self._ssl_config = ssl_config
def _http(self, _method, _path, **kw): data = None params = None if _method=='GET' and kw: _path = '%s?%s' % (_path, _encode_params(kw)) if _method in ['POST', 'PATCH', 'PUT']: data = bytes(_encode_json(kw), 'utf-8') url = '%s%s' % (_URL, _path) opener = build_opener(HTTPSHandler) request = Request(url, data=data) request.get_method = _METHOD_MAP[_method] if self._authorization: request.add_header('Authorization', self._authorization) if _method in ['POST', 'PATCH', 'PUT']: request.add_header('Content-Type', 'application/x-www-form-urlencoded') try: response = opener.open(request, timeout=TIMEOUT) is_json = self._process_resp(response.headers) if is_json: return _parse_json(response.read().decode('utf-8')) except HTTPError as e: is_json = self._process_resp(e.headers) if is_json: json = _parse_json(e.read().decode('utf-8')) else: json = e.read().decode('utf-8') req = JsonObject(method=_method, url=url) resp = JsonObject(code=e.code, json=json) if resp.code==404: raise ApiNotFoundError(url, req, resp) raise ApiError(url, req, resp)
def api_request_native(url, data=None, token=None, https_proxy=None, method=None): request = urllib.Request(url) # print('API request url:', request.get_full_url()) if method: request.get_method = lambda: method token = token if token != None else token_auth_string() request.add_header('Authorization', 'token ' + token) request.add_header('Accept', 'application/json') request.add_header('Content-Type', 'application/json') if data is not None: request.add_data(bytes(data.encode('utf8'))) # print('API request data:', request.get_data()) # print('API request header:', request.header_items()) # https_proxy = https_proxy if https_proxy != None else settings.get('https_proxy') # if https_proxy: # opener = urllib.build_opener(urllib.HTTPHandler(), urllib.HTTPSHandler(), # urllib.ProxyHandler({'https': https_proxy})) # urllib.install_opener(opener) try: with contextlib.closing(urllib.urlopen(request)) as response: if response.code == 204: # No Content return None else: return json.loads(response.read().decode('utf8', 'ignore')) except urllib.HTTPError as err: with contextlib.closing(err): raise SimpleHTTPError(err.code, err.read())
def get_access_token(self, code, state=None): ''' In callback url: http://host/callback?code=123&state=xyz use code and state to get an access token. ''' kw = dict( client_id=self._client_id, client_secret=self._client_secret, code=code) if self._redirect_uri: kw['redirect_uri'] = self._redirect_uri if state: kw['state'] = state opener = build_opener(HTTPSHandler) request = Request( 'https://github.com/login/oauth/access_token', data=_encode_params(kw)) request.get_method = _METHOD_MAP['POST'] request.add_header('Accept', 'application/json') try: response = opener.open(request, timeout=TIMEOUT) r = _parse_json(response.read()) if 'error' in r: raise ApiAuthError(str(r.error)) return str(r.access_token) except HTTPError as e: raise ApiAuthError('HTTPError when get access token')
def _http(self, _method, _path, **kw): data = None params = None if _method == 'GET' and kw: _path = '%s?%s' % (_path, _encode_params(kw)) if _method in ['POST', 'PATCH', 'PUT']: data = bytes(_encode_json(kw), 'utf-8') url = '%s%s' % (_URL, _path) opener = build_opener(HTTPSHandler) request = Request(url, data=data) request.get_method = _METHOD_MAP[_method] if self._authorization: request.add_header('Authorization', self._authorization) if _method in ['POST', 'PATCH', 'PUT']: request.add_header('Content-Type', 'application/x-www-form-urlencoded') class Resp(): code = None resp = Resp() req = None try: response = opener.open(request, timeout=TIMEOUT) is_json = self._process_resp(response.headers) if is_json: return _parse_json(response.read().decode('utf-8')) except HTTPError as e: is_json = self._process_resp(e.headers) if is_json: json = _parse_json(e.read().decode('utf-8')) else: json = e.read().decode('utf-8') req = JsonObject(method=_method, url=url) resp = JsonObject(code=e.code, json=json) finally: if resp.code == 404: raise ApiNotFoundError(url, req, resp) elif req: raise ApiError(url, req, resp)
def __init__(self, username="admin", password="admin", hostname=""): self.username = username self.password = password self.hostname = hostname self.baseurl = "https://" + self.hostname try: self.gcontext = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) except AttributeError: try: self.gcontext = ssl.SSLContext(ssl.PROTOCOL_TLSv1_1) except AttributeError: self.gcontext = ssl.SSLContext(ssl.PROTOCOL_TLSv1) self.cj = CookieJar() self.opener = build_opener(HTTPCookieProcessor(self.cj), HTTPSHandler(context=self.gcontext)) self.loginresponse = self._login()
def __init__(self, client_cert=None, client_key=None, **kwargs): urllib_request.HTTPSHandler.__init__(self, **kwargs) self.client_cert = client_cert self.client_key = client_key