我们从Python开源项目中,提取了以下20个代码示例,用于说明如何使用urllib.request.HTTPHandler()。
def send_signal(event, response_status, reason, response_data=None):
    """PUT a JSON status document to the URL given in ``event['ResponseURL']``.

    The document carries ``Status``, ``Reason``, ``PhysicalResourceId``,
    ``StackId``, ``RequestId``, ``LogicalResourceId`` and ``Data``.
    NOTE(review): the key names look like an AWS CloudFormation
    custom-resource callback -- confirm against the caller.

    Args:
        event: Mapping that must contain ``LogicalResourceId``, ``StackId``,
            ``RequestId`` and ``ResponseURL``; ``PhysicalResourceId`` is
            optional and defaults to the logical id.
        response_status: Value sent as ``Status``.
        reason: Value sent as ``Reason`` (stringified); a falsy value is
            replaced by ``'ReasonCanNotBeNone'``.
        response_data: Optional value sent as ``Data``; defaults to ``{}``.

    Raises:
        KeyError: If a required key is missing from ``event``.
        Whatever ``opener.open`` raises when the request fails.
    """
    response_body = json.dumps(
        {
            'Status': response_status,
            'Reason': str(reason or 'ReasonCanNotBeNone'),
            'PhysicalResourceId': event.get('PhysicalResourceId', event['LogicalResourceId']),
            'StackId': event['StackId'],
            'RequestId': event['RequestId'],
            'LogicalResourceId': event['LogicalResourceId'],
            'Data': response_data or {},
        },
        sort_keys=True,
    )
    logging.debug(response_body)

    # urllib on Python 3 refuses a text body for HTTP requests, so send bytes
    # and compute Content-Length from the encoded payload.
    payload = response_body.encode('utf-8')

    opener = build_opener(HTTPHandler)
    request = Request(event['ResponseURL'], data=payload)
    request.add_header('Content-Type', '')
    request.add_header('Content-Length', str(len(payload)))
    request.get_method = lambda: 'PUT'

    # The response content is not needed; close it to release the connection.
    response = opener.open(request)
    response.close()
def __init__(self, *args, **kwargs):
    """Remember the constructor arguments, then run the stock HTTPHandler setup.

    Positional arguments are stored on ``self.args`` and keyword arguments on
    ``self.kw``; this method itself does not consume them.
    """
    self.args, self.kw = args, kwargs
    # Explicit base-class call (no arguments are forwarded to it).
    urllib2.HTTPHandler.__init__(self)
def login(self, username, pwd, cookie_file):
    """Log in with user name, password and saved cookies.

    (1) If the cookie file exists, try to load the cookies from it and install
        a cookie-aware opener as the global ``urllib2`` opener.
    (2) If there is no cookie file, or it cannot be read or parsed, fall back
        to ``self.do_login``.

    Args:
        username: Passed through to ``self.do_login``.
        pwd: Passed through to ``self.do_login``.
        cookie_file: Path of the LWP-format cookie file.

    Returns:
        ``1`` when cookies were loaded and installed, otherwise whatever
        ``self.do_login(username, pwd, cookie_file)`` returns.
    """
    if not os.path.exists(cookie_file):
        # No saved cookies: do a real login.
        return self.do_login(username, pwd, cookie_file)

    try:
        cookie_jar = cookielib.LWPCookieJar(cookie_file)
        cookie_jar.load(ignore_discard=True, ignore_expires=True)
    except (cookielib.LoadError, IOError):
        # IOError covers a path that exists but cannot be opened (directory,
        # permissions); treat it like an unparsable file instead of crashing.
        LOG.info('Loading cookies error')
        return self.do_login(username, pwd, cookie_file)

    # Install the loaded cookies for every later urllib2.urlopen call.
    cookie_support = urllib2.HTTPCookieProcessor(cookie_jar)
    opener = urllib2.build_opener(cookie_support, urllib2.HTTPHandler)
    urllib2.install_opener(opener)
    LOG.info('Loading cookies success')
    return 1
def save_cookie(self, text, cookie_file=CONF.cookie_file):
    """Follow the login redirect found in ``text`` and save the session cookies.

    A fresh cookie jar is installed as the global ``urllib2`` opener, the URL
    inside ``location.replace('...')`` is fetched, and the JSON argument of
    ``feedBackUrlCallBack(...)`` in that response is inspected.

    Args:
        text: Body of the login response (bytes are decoded as GBK on
            Python 3).
        cookie_file: Where to save the cookies on success.

    Returns:
        ``1`` if the feedback JSON has a truthy ``result`` and the cookies were
        saved, ``0`` otherwise (including any error while following the
        redirect).
    """
    cookie_jar2 = cookielib.LWPCookieJar()
    cookie_support2 = urllib2.HTTPCookieProcessor(cookie_jar2)
    opener2 = urllib2.build_opener(cookie_support2, urllib2.HTTPHandler)
    urllib2.install_opener(opener2)

    # Only decode real bytes; text that is already str has no .decode on Python 3.
    if six.PY3 and isinstance(text, bytes):
        text = text.decode('gbk')

    try:
        # Search login redirection URL
        redirect = re.search(r"location\.replace\('(.*?)'\)", text)
        if redirect is None:
            return 0
        login_url = redirect.group(1)

        data = urllib2.urlopen(login_url).read()
        if six.PY3 and isinstance(data, bytes):
            # A text pattern cannot be searched in bytes on Python 3.
            # Assumes the feedback page uses the same GBK encoding as the
            # login page -- TODO confirm.
            data = data.decode('gbk')

        # Verify login feedback, check whether result is TRUE
        feedback = re.search(r'feedBackUrlCallBack\((.*)\)', data, re.MULTILINE)
        if feedback is None:
            return 0
        feedback_json = json.loads(feedback.group(1))
        if feedback_json['result']:
            cookie_jar2.save(cookie_file, ignore_discard=True, ignore_expires=True)
            return 1
        return 0
    except Exception:
        # Best effort: any network, decoding or parsing problem counts as a
        # failed login, while KeyboardInterrupt/SystemExit still propagate.
        return 0
def login(self, username, pwd, cookie_file):
    """Log in with user name, password and saved cookies.

    (1) If the cookie file exists, try to load the cookies from it and install
        a cookie-aware opener as the global ``urllib2`` opener.
    (2) If there is no cookie file, or it cannot be read or parsed, fall back
        to ``self.do_login``.

    Args:
        username: Passed through to ``self.do_login``.
        pwd: Passed through to ``self.do_login``.
        cookie_file: Path of the LWP-format cookie file.

    Returns:
        ``1`` when cookies were loaded and installed, otherwise whatever
        ``self.do_login(username, pwd, cookie_file)`` returns.
    """
    if not os.path.exists(cookie_file):
        # No saved cookies: do a real login.
        return self.do_login(username, pwd, cookie_file)

    try:
        cookie_jar = cookielib.LWPCookieJar(cookie_file)
        cookie_jar.load(ignore_discard=True, ignore_expires=True)
    except (cookielib.LoadError, IOError):
        # IOError covers a path that exists but cannot be opened (directory,
        # permissions); treat it like an unparsable file instead of crashing.
        print('Loading cookies error')
        return self.do_login(username, pwd, cookie_file)

    # Install the loaded cookies for every later urllib2.urlopen call.
    cookie_support = urllib2.HTTPCookieProcessor(cookie_jar)
    opener = urllib2.build_opener(cookie_support, urllib2.HTTPHandler)
    urllib2.install_opener(opener)
    print('Loading cookies success')
    return 1
def _request(self, url, method='GET', data=None):
    """Send one API request and return the parsed response.

    Args:
        url: Path appended to ``self._auth.endpoint``.
        method: HTTP verb. For ``GET`` and ``DELETE`` the url-encoded ``data``
            goes into the query string; for any other verb it is sent as the
            request body with ``POST_CONTENT_TYPE``.
        data: Optional parameters, passed through ``urlencode``.

    Returns:
        Whatever ``self._parse_response`` returns for the response body.

    Raises:
        ApiHandlerError: On an HTTP error status (carrying the parsed error
            body) or when a ``ValueError`` is raised while sending or parsing.
    """
    url = self._auth.endpoint + url
    # Work on a copy so the Content-Type added for body requests does not leak
    # into the shared auth headers and from there into later requests.
    headers = dict(self._auth.headers)

    if data is not None:
        data = urlencode(data)
        if method in ('GET', 'DELETE'):
            url = url + '?' + data
            data = None
        else:
            headers['Content-Type'] = POST_CONTENT_TYPE
            if sys.version_info > (3,):  # python3
                data = data.encode('utf-8')

    log.debug(method + ' ' + url)
    log.debug(data)

    try:
        opener = build_opener(HTTPHandler)
        request = Request(url, data=data, headers=headers)
        request.get_method = lambda: method
        response = opener.open(request)
        try:
            raw = response.read()
        finally:
            # Release the connection even if reading fails.
            response.close()
        data = self._parse_response(raw)
    except HTTPError as e:
        log.error(e)
        data = self._parse_response(e.read())
        raise ApiHandlerError('Invalid server response', data)
    except ValueError as e:
        log.error(e)
        raise ApiHandlerError('Invalid server response')

    return data
def hashed_download(url, temp, digest):
    """Download ``url`` to ``temp``, make sure it has the SHA-256 ``digest``,
    and return its path.

    Args:
        url: Location to fetch; the last segment of its path becomes the file
            name.
        temp: Directory the file is written into.
        digest: Expected hexadecimal SHA-256 digest of the content.

    Returns:
        Path of the downloaded file inside ``temp``.

    Raises:
        HashError: If the downloaded content does not match ``digest``.
    """
    # Based on pip 1.4.1's URLOpener but with cert verification removed. Python
    # >=2.7.9 verifies HTTPS certs itself, and, in any case, the cert
    # authenticity has only privacy (not arbitrary code execution)
    # implications, since we're checking hashes.
    def opener():
        director = build_opener(HTTPSHandler())
        # Strip out HTTPHandler to prevent MITM spoof. Iterate over a snapshot:
        # removing from the list being iterated skips the following element.
        for handler in list(director.handlers):
            if isinstance(handler, HTTPHandler):
                director.handlers.remove(handler)
        # NOTE(review): OpenerDirector dispatches through its handle_open /
        # process_* tables, so pruning ``.handlers`` alone may not stop plain
        # http:// URLs from being served -- confirm, and consider rejecting
        # non-HTTPS schemes explicitly.
        return director

    def read_chunks(response, chunk_size):
        while True:
            chunk = response.read(chunk_size)
            if not chunk:
                break
            yield chunk

    response = opener().open(url)
    try:
        path = join(temp, urlparse(url).path.split('/')[-1])
        actual_hash = sha256()
        with open(path, 'wb') as out:
            for chunk in read_chunks(response, 4096):
                out.write(chunk)
                actual_hash.update(chunk)
    finally:
        # Release the connection whether or not the download succeeded.
        response.close()

    actual_digest = actual_hash.hexdigest()
    if actual_digest != digest:
        raise HashError(url, path, actual_digest, digest)
    return path
def send(event, context, response_status, reason=None, response_data=None, physical_resource_id=None):
    """PUT a JSON status document to ``event['ResponseURL']``.

    NOTE(review): the key names look like an AWS CloudFormation
    custom-resource callback -- confirm against the caller.

    Args:
        event: Mapping with ``StackId``, ``RequestId``, ``LogicalResourceId``
            and ``ResponseURL``.
        context: Object whose ``log_stream_name`` is used in the default
            reason and as the default physical resource id.
        response_status: Value sent as ``Status``.
        reason: Optional text sent as ``Reason``; when falsy, a message
            pointing at ``context.log_stream_name`` is sent instead.
        response_data: Optional value sent as ``Data``; defaults to ``{}``.
        physical_resource_id: Optional value sent as ``PhysicalResourceId``.

    Returns:
        ``True`` if the request completed, ``False`` if it raised
        ``HTTPError``. Other exceptions propagate.
    """
    response_data = response_data or {}
    response_body = json.dumps(
        {
            'Status': response_status,
            'Reason': reason or "See the details in CloudWatch Log Stream: " + context.log_stream_name,
            'PhysicalResourceId': physical_resource_id or context.log_stream_name,
            'StackId': event['StackId'],
            'RequestId': event['RequestId'],
            'LogicalResourceId': event['LogicalResourceId'],
            'Data': response_data
        }
    )

    # urllib on Python 3 refuses a text body for HTTP requests, so send bytes
    # and compute Content-Length from the encoded payload.
    payload = response_body.encode('utf-8')

    opener = build_opener(HTTPHandler)
    request = Request(event['ResponseURL'], data=payload)
    request.add_header('Content-Type', '')
    request.add_header('Content-Length', str(len(payload)))
    request.get_method = lambda: 'PUT'

    try:
        response = opener.open(request)
    except HTTPError as exc:
        print("Failed executing HTTP request: {}".format(exc.code))
        return False

    try:
        print("Status code: {}".format(response.getcode()))
        print("Status message: {}".format(response.msg))
    finally:
        # Release the connection once the status has been reported.
        response.close()
    return True
def getFile(cls, getfile, unpack=True):
    """Fetch ``getfile`` and optionally unpack it based on its Content-Type.

    When ``cls.getProxy()`` is truthy, a proxy-aware opener is installed as the
    global opener before fetching.

    Args:
        getfile: URL to fetch.
        unpack: When true, gzip, bzip2 and zip payloads are unwrapped (for zip
            archives only the first member is read).

    Returns:
        ``(data, response)``. ``data`` is the raw bytes, or a file-like object
        when the payload was unpacked. ``(None, None)`` is returned when the
        fetch fails and ``cls.exitWhenNoSource()`` is false.

    Raises:
        SystemExit: When the fetch fails and ``cls.exitWhenNoSource()`` is
            true.
    """
    proxy_url = cls.getProxy()
    if proxy_url:
        proxy = req.ProxyHandler({'http': proxy_url, 'https': proxy_url})
        auth = req.HTTPBasicAuthHandler()
        opener = req.build_opener(proxy, auth, req.HTTPHandler)
        req.install_opener(opener)

    try:
        response = req.urlopen(getfile)
    except Exception:
        msg = "[!] Could not fetch file %s"%getfile
        if cls.exitWhenNoSource():
            sys.exit(msg)
        print(msg)
        # There is no response to read from; report the failure to the caller.
        return (None, None)

    data = response.read()
    # TODO: if data == text/plain; charset=utf-8, read and decode
    if unpack:
        # A missing header must not break the substring checks below.
        content_type = response.info().get('Content-Type') or ''
        # 'gzip' and 'bzip2' both contain 'zip', so they are tested first.
        if 'gzip' in content_type:
            data = gzip.GzipFile(fileobj=BytesIO(data))
        elif 'bzip2' in content_type:
            data = BytesIO(bz2.decompress(data))
        elif 'zip' in content_type:
            fzip = zipfile.ZipFile(BytesIO(data), 'r')
            if len(fzip.namelist()) > 0:
                data = BytesIO(fzip.read(fzip.namelist()[0]))
        # In case the webserver is being generic
        elif 'application/octet-stream' in content_type:
            if data[:4] == b'PK\x03\x04':  # Zip
                fzip = zipfile.ZipFile(BytesIO(data), 'r')
                if len(fzip.namelist()) > 0:
                    data = BytesIO(fzip.read(fzip.namelist()[0]))
    return (data, response)