我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用urllib.request.build_opener()。
def download(self, url, retry_count=3, headers=None, proxy=None, data=None):
    """Fetch ``url`` and return the raw response body.

    Args:
        url: Address to fetch; ``None`` short-circuits to ``None``.
        retry_count: Number of additional attempts allowed after a 5XX reply.
        headers: Optional mapping of request headers.
        proxy: Optional proxy address, applied to the scheme of ``url``.
        data: Optional request body handed to ``Request``.

    Returns:
        Whatever ``read()`` yields for the response, or ``None`` on failure.
    """
    if url is None:
        return None
    try:
        # Request() iterates over the header mapping, so None must become {}.
        req = request.Request(url, headers=headers or {}, data=data)
        opener = request.build_opener()
        if proxy:
            proxies = {urlparse(url).scheme: proxy}
            opener.add_handler(request.ProxyHandler(proxies))
        with opener.open(req) as response:
            content = response.read()
    except error.URLError as e:
        print('HtmlDownLoader download error:', e.reason)
        content = None
        if retry_count > 0:
            # Only an HTTPError carries ``code``; retry just for 5XX replies.
            if hasattr(e, 'code') and 500 <= e.code < 600:
                return self.download(url, retry_count - 1, headers, proxy, data)
    return content
def get_response(url, faker = False):
    """Open ``url`` and return the response with its decoded body attached.

    When the module-level ``cookies`` is truthy, an opener carrying it is
    installed first. With ``faker`` set, the request is sent with
    ``fake_headers``. The body is read, un-gzipped or un-deflated according
    to the ``Content-Encoding`` header, and stored on ``response.data``.
    """
    logging.debug('get_response: %s' % url)

    # install cookies
    if cookies:
        cookie_opener = request.build_opener(request.HTTPCookieProcessor(cookies))
        request.install_opener(cookie_opener)

    target = request.Request(url, headers = fake_headers) if faker else url
    response = request.urlopen(target)

    body = response.read()
    encoding = response.info().get('Content-Encoding')
    if encoding == 'gzip':
        body = ungzip(body)
    elif encoding == 'deflate':
        body = undeflate(body)

    response.data = body
    return response

# DEPRECATED in favor of get_content()
async def youtube(self, ctx, *, ytsearch: str):
    """Does a little YouTube search."""
    # The body awaits, so this must be a coroutine function.
    from urllib.parse import quote_plus

    opener = request.build_opener()
    opener.addheaders = [('User-agent', 'Mozilla/5.0')]

    # Collapse whitespace, then percent-encode so '&', '#' and non-ASCII
    # characters cannot corrupt the query string.
    query = quote_plus(" ".join(ytsearch.split()))
    url = 'https://www.youtube.com/results?search_query={}'.format(query)

    # NOTE(review): this is a blocking HTTP call inside a coroutine.
    with opener.open(url) as response:
        page = response.read()
    await self.bot.type()

    soup = bs(page, "html.parser")
    anchors = soup.find_all('a', attrs={'href': re.compile(r'^/watch\?v=.*')})
    matches = re.findall(r'"(/watch\?v=.*?)"', str(anchors))
    try:
        await self.bot.say('{}: https://www.youtube.com{}'.format(ctx.message.author.mention, matches[0]))
    except IndexError:
        await self.bot.say('Sorry I could not find any results containing the name `{}`'.format(ytsearch))
def __init__(self, timeout=None, proxy=None, cacert=None, sessions=False):
    """Build the opener used by this urllib2-based transport.

    Raises RuntimeError when a timeout is requested but unsupported, or when
    ``proxy`` or ``cacert`` is given. Stores ``opener.open`` as
    ``self.request_opener`` and keeps ``timeout`` in ``self._timeout``.
    """
    if (timeout is not None) and not self.supports_feature('timeout'):
        raise RuntimeError('timeout is not supported with urllib2 transport')

    unsupported = (
        (proxy, 'proxy is not supported with urllib2 transport'),
        (cacert, 'cacert is not support with urllib2 transport'),
    )
    for value, message in unsupported:
        if value:
            raise RuntimeError(message)

    version = sys.version_info
    on_py2 = version[0] == 2 and version >= (2,7,9)
    on_py3 = version[0] == 3 and version >= (3,2,0)

    handlers = []
    if on_py2 or on_py3:
        # Hostname checking and certificate verification are switched off.
        context = ssl.create_default_context()
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
        handlers.append(urllib2.HTTPSHandler(context=context))

    if sessions:
        handlers.append(urllib2.HTTPCookieProcessor(CookieJar()))

    self.request_opener = urllib2.build_opener(*handlers).open
    self._timeout = timeout
def follow_redirects(link, sites= None):
    """Follow redirects for ``link`` while they stay on the given sites.

    Args:
        link: URL to resolve.
        sites: Container of allowed hostnames, or ``None`` to allow any host.

    Returns:
        The resolved URL. If the request fails, the last redirect target seen
        (when there was one), otherwise ``link`` itself.
    """
    def follow(url):
        return sites is None or urlparse.urlparse(url).hostname in sites

    class RedirectHandler(urllib2.HTTPRedirectHandler):
        def __init__(self):
            self.last_url = None

        def redirect_request(self, req, fp, code, msg, hdrs, newurl):
            self.last_url = newurl
            if not follow(newurl):
                return None
            r = urllib2.HTTPRedirectHandler.redirect_request(
                self, req, fp, code, msg, hdrs, newurl)
            # Keep using HEAD on every hop so no body is downloaded.
            r.get_method = lambda : 'HEAD'
            return r

    if not follow(link):
        return link

    redirect_handler = RedirectHandler()
    opener = urllib2.build_opener(redirect_handler)
    req = urllib2.Request(link)
    req.get_method = lambda : 'HEAD'
    try:
        with contextlib.closing(opener.open(req,timeout=1)) as site:
            return site.url
    except Exception:
        # Best effort: keep the fallback, but let KeyboardInterrupt and
        # SystemExit propagate.
        return redirect_handler.last_url if redirect_handler.last_url else link
def __init__(self):
    """Set up empty results, a cookie jar and an opener that uses the jar."""
    self.articles = []
    self.query = None
    self.cjar = MozillaCookieJar()

    # Load a stored cookie file when one is configured and exists.
    cookie_path = ScholarConf.COOKIE_JAR_FILE
    if cookie_path and os.path.exists(cookie_path):
        try:
            self.cjar.load(cookie_path, ignore_discard=True)
            ScholarUtils.log('info', 'loaded cookies file')
        except Exception as msg:
            ScholarUtils.log('warn', 'could not load cookies file: %s' % msg)
            self.cjar = MozillaCookieJar() # Just to be safe

    cookie_handler = HTTPCookieProcessor(self.cjar)
    self.opener = build_opener(cookie_handler)
    self.settings = None # Last settings object, if any
def get_access_token(self, code, state=None):
    '''
    Exchange the ``code`` (and optional ``state``) from the callback url
    http://host/callback?code=123&state=xyz for an access token.

    Raises ApiAuthError when the reply contains an error or the HTTP call
    fails with HTTPError.
    '''
    params = {
        'client_id': self._client_id,
        'client_secret': self._client_secret,
        'code': code,
    }
    if self._redirect_uri:
        params['redirect_uri'] = self._redirect_uri
    if state:
        params['state'] = state

    token_request = Request('https://github.com/login/oauth/access_token', data=_encode_params(params))
    token_request.get_method = _METHOD_MAP['POST']
    token_request.add_header('Accept', 'application/json')

    opener = build_opener(HTTPSHandler)
    try:
        reply = opener.open(token_request, timeout=TIMEOUT)
        parsed = _parse_json(reply.read())
        if 'error' in parsed:
            raise ApiAuthError(str(parsed.error))
        return str(parsed.access_token)
    except HTTPError:
        raise ApiAuthError('HTTPError when get access token')
def cookie_friendly_download(referer_url, file_url, store_dir='.', timeout=1000):
    """Download ``file_url`` after first visiting ``referer_url`` for cookies.

    Args:
        referer_url: Page opened first so the shared cookie jar is populated.
        file_url: Address of the file to download.
        store_dir: Directory the file is written to (created if missing).
        timeout: Timeout in seconds passed to both requests.

    Returns:
        Path of the stored file.

    Raises:
        ValueError: If no file name can be derived from the response or URL.
    """
    import os
    from http.cookiejar import CookieJar
    from os import path
    from urllib import request
    from urllib.parse import unquote, urlparse

    opener = request.build_opener(request.HTTPCookieProcessor(CookieJar()))

    # Opening the referer is enough for the cookie processor to record cookies.
    with opener.open(referer_url, timeout=timeout) as fin:
        fin.headers.items()

    with opener.open(file_url, timeout=timeout) as fin:
        file_bin = fin.read()
        # get_filename() parses Content-Disposition properly (quoted values,
        # parameter order) and returns None when the header is absent.
        filename = fin.headers.get_filename()

    if not filename:
        # No usable header: fall back to the last segment of the URL path.
        filename = unquote(urlparse(file_url).path)
    # The name is server-controlled; keep only its final component so it
    # cannot escape ``store_dir``.
    filename = path.basename(filename.replace('\\', '/'))
    if not filename:
        raise ValueError('cannot determine a file name for %s' % file_url)

    os.makedirs(store_dir, exist_ok=True)
    target = path.join(store_dir, filename)
    with open(target, mode='wb') as fout:
        fout.write(file_bin)
    return target
def get_req(self, start_size, end_size):
    '''Open a ranged request for bytes ``start_size``..``end_size`` of self.url.

    Tries up to RETRIES times. Returns the open response, or None after
    reporting BATCH_ERROR on ``self.queue`` when no attempt succeeded.
    '''
    logger.debug('DownloadBatch.get_req: %s, %s' % (start_size, end_size))
    opener = request.build_opener()
    content_range = 'bytes={0}-{1}'.format(start_size, end_size)
    opener.addheaders = [
        ('Range', content_range),
        ('User-Agent', const.USER_AGENT),
        ('Referer', const.PAN_REFERER),
    ]
    for _ in range(RETRIES):
        try:
            return opener.open(self.url, timeout=self.timeout)
        except OSError:
            # Network-level failure: log it and let the loop try again.
            # Previously the function returned here, so it never retried.
            logger.error(traceback.format_exc())
        except Exception:
            # Anything else is not expected to improve on retry.
            break
    # Report the failure exactly once, after all attempts are used up.
    self.queue.put((self.id_, BATCH_ERROR), block=False)
    return None
def authenticate(top_level_url=u'https://api.github.com'):
    """Install a global opener that sends HTTP basic auth to ``top_level_url``.

    The username comes from the ``GH_AUTH_USER`` environment variable and the
    password from ``GH_AUTH_PASS``; whichever is missing is prompted for.
    Ctrl-C during a prompt exits the process.
    """
    try:
        if 'GH_AUTH_USER' not in os.environ:
            try:
                username = raw_input(u'Username: ')
            except NameError:
                # Python 3 has no raw_input().
                username = input(u'Username: ')
        else:
            username = os.environ['GH_AUTH_USER']
        if 'GH_AUTH_PASS' not in os.environ:
            password = getpass.getpass(u'Password: ')
        else:
            # Previously this read GH_AUTH_USER, so the user name was sent
            # as the password.
            password = os.environ['GH_AUTH_PASS']
    except KeyboardInterrupt:
        sys.exit(u'')

    try:
        import urllib.request as urllib_alias
    except ImportError:
        import urllib2 as urllib_alias

    password_mgr = urllib_alias.HTTPPasswordMgrWithDefaultRealm()
    password_mgr.add_password(None, top_level_url, username, password)
    handler = urllib_alias.HTTPBasicAuthHandler(password_mgr)
    opener = urllib_alias.build_opener(handler)
    urllib_alias.install_opener(opener)
def supports_site(url):
    """
    Rss Crawler are supported by every site containing an rss feed.

    Determines if this crawler works on the given url.

    :param str url: The url to test
    :return bool: Determines whether this crawler works on the given url
    """
    import contextlib

    # The opener follows redirects itself, so the page only needs to be
    # fetched once; closing() releases the connection afterwards.
    opener = urllib2.build_opener(urllib2.HTTPRedirectHandler)
    with contextlib.closing(opener.open(url)) as page:
        response = page.read()

    # Check if a standard rss feed exists. Undecodable bytes are replaced
    # so a page in another encoding cannot raise.
    return re.search(
        r'(<link[^>]*href[^>]*type ?= ?"application\/rss\+xml"|' +
        r'<link[^>]*type ?= ?"application\/rss\+xml"[^>]*href)',
        response.decode('utf-8', 'replace')) is not None
def main(which_days):
    """Fetch any missing puzzle input for each day, then run that day's module.

    Inputs are cached as ``input_NN.txt`` under ``root_dir``. Downloading one
    requires the AOC_SESSION_TOKEN environment variable; ValueError is raised
    when it is unset.
    """
    for day in which_days:
        input_path = os.path.join(root_dir, 'input_{0:02d}.txt'.format(day))

        if not os.path.exists(input_path):
            token = os.environ.get("AOC_SESSION_TOKEN")
            if token is None:
                raise ValueError("Must set AOC_SESSION_TOKEN environment variable!")

            opener = build_opener()
            opener.addheaders.append(('Cookie', 'session={0}'.format(token)))
            reply = opener.open('https://adventofcode.com/2016/day/{0}/input'.format(day))
            with open(input_path, 'w') as out:
                out.write(reply.read().decode("utf-8"))

        print("Solutions to Day {0:02d}\n-------------------".format(day))
        # Horrible way to run scripts, but I did not want to rewrite old solutions.
        __import__('{0:02d}'.format(day))
        print('')
def proxyurllib():
    """Install a global opener that uses a fixed HTTP proxy and fetch a page."""
    print(COLOR_GREEN+'-'*30+COLOR_NONE)
    #TODO proxy
    proxy_handler = request.ProxyHandler({'http':'http://10.112.5.173:49908'})
    # Disabled example of adding proxy authentication:
    #   proxy_auth_handler = urllib.request.ProxyBasicAuthHandler()
    #   proxy_auth_handler.add_password('realm', 'host', 'username', 'password')
    request.install_opener(request.build_opener(proxy_handler))
    # (Original comment is garbled.) Once installed, urlopen() goes through
    # the opener built above.
    google = request.urlopen('http://www.google.com')
    print(google.read())
    print("?????",request.getproxies())

#proxyurllib()
#FIXME ROBOT.TXT??
def make_request(self, method, request_body=None, query_params=None, request_headers=None):
    """Send one HTTP request and record its response on the instance.

    ``method`` is upper-cased and used as the HTTP verb. A truthy
    ``request_body`` is JSON-encoded; ``request_headers``, when given, are
    applied through ``_set_headers`` before sending.
    """
    method = method.upper()
    if request_headers:
        self._set_headers(request_headers)

    payload = json.dumps(request_body) if request_body else None
    params = query_params or None

    outgoing = urllib.Request(self._build_url(params), data=payload)
    for name, value in self.request_headers.iteritems():
        outgoing.add_header(name, value)
    outgoing.get_method = lambda: method

    self._response = urllib.build_opener().open(outgoing)
    self._set_response(self._response)
    self._reset()
def make_request(*args):
    """Open ``*args`` with a randomised User-agent and return the stripped body.

    On Windows the opener is built around an explicit TLS 1.2 context. Any
    failure is re-raised as a plain Exception carrying the error body when it
    can be read, otherwise the original exception.
    """
    on_windows = platform.system() == "Windows"
    if on_windows: #pragma: no cover
        tls_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
        https_handler = urllib2.HTTPSHandler(debuglevel=0, context=tls_context)
        opener = urllib2.build_opener(https_handler)
    else:
        opener = build_opener()

    agent = 'Mozilla/5.0' + str(random.randrange(1000000))
    opener.addheaders = [('User-agent', agent)]

    try:
        return opener.open(*args).read().strip()
    except Exception as e:
        try:
            detail = e.read().strip()
        except BaseException:
            detail = e
        raise Exception(detail)
def index(request):
    """Render the login page with a freshly fetched QR code for GET requests.

    Other methods fall through and return None.
    """
    if request.method != "GET":
        return None

    try:
        # Disables HTTPS certificate verification for the default context
        # and installs a cookie-aware global opener.
        ssl._create_default_https_context = ssl._create_unverified_context
        cookie_opener = wdf_urllib.build_opener(
            wdf_urllib.HTTPCookieProcessor(CookieJar()))
        wdf_urllib.install_opener(cookie_opener)
    except BaseException:
        pass

    uuid = getUUID()
    qr_url = 'https://login.weixin.qq.com/qrcode/' + uuid
    params = {
        't': 'webwx',
        '_': int(time.time()),
    }

    qr_request = getRequest(url=qr_url, data=urlencode(params))
    response = wdf_urllib.urlopen(qr_request)
    context = {
        'uuid': uuid,
        'response': response.read(),
        'delyou': '',
    }
    return render_to_response('index.html', context)
def send_signal(event, response_status, reason, response_data=None):
    """PUT a JSON status document to ``event['ResponseURL']``.

    Args:
        event: Mapping with 'LogicalResourceId', 'StackId', 'RequestId' and
            'ResponseURL'; 'PhysicalResourceId' is optional and defaults to
            the logical id.
        response_status: Value sent as 'Status'.
        reason: Value sent as 'Reason'; a placeholder is used when falsy.
        response_data: Optional mapping sent as 'Data'.
    """
    response_body = json.dumps(
        {
            'Status': response_status,
            'Reason': str(reason or 'ReasonCanNotBeNone'),
            'PhysicalResourceId': event.get('PhysicalResourceId', event['LogicalResourceId']),
            'StackId': event['StackId'],
            'RequestId': event['RequestId'],
            'LogicalResourceId': event['LogicalResourceId'],
            'Data': response_data or {}
        },
        sort_keys=True,
    )
    logging.debug(response_body)

    # urllib on Python 3 requires a bytes body and string header values.
    payload = response_body.encode('utf-8')

    opener = build_opener(HTTPHandler)
    request = Request(event['ResponseURL'], data=payload)
    request.add_header('Content-Type', '')
    request.add_header('Content-Length', str(len(payload)))
    request.get_method = lambda: 'PUT'
    # Close the response so the connection is released.
    opener.open(request).close()
def __init__(self, writing=WRITING_NATIVE, opener=None, retry_times=4, executor=_g_executor, timeout=4, service_urls=('http://translate.google.com',), debug=False):
    """Store the configuration and make sure an opener is available.

    When ``opener`` is falsy, one is built with HTTP and HTTPS handlers whose
    debug level follows ``debug``. ``service_urls`` is stored as given when
    ``_is_sequence`` accepts it, otherwise wrapped in a one-element tuple.
    """
    self._DEBUG = debug
    self._MIN_TASKS_FOR_CONCURRENT = 2
    self._languages = None
    self._TIMEOUT = timeout
    self._RETRY_TIMES = retry_times
    self._executor = executor
    self._writing = writing

    if opener:
        self._opener = opener
    else:
        level = 1 if self._DEBUG else 0
        self._opener = build_opener(
            HTTPHandler(debuglevel=level),
            HTTPSHandler(debuglevel=level))

    self._service_urls = service_urls if _is_sequence(service_urls) else (service_urls,)
def __init__(self):
    """Set up empty results, a cookie jar and an opener that uses the jar.

    A configured cookie file is loaded when it exists; a load failure is
    reported and a fresh jar is used instead.
    """
    self.articles = []
    self.query = None
    self.cjar = MozillaCookieJar()

    # If we have a cookie file, load it:
    if ScholarConf.COOKIE_JAR_FILE and \
       os.path.exists(ScholarConf.COOKIE_JAR_FILE):
        try:
            self.cjar.load(ScholarConf.COOKIE_JAR_FILE,
                           ignore_discard=True)
            # Single-argument print(...) gives the same output on Python 2
            # and is valid syntax on Python 3.
            print("Using cookie file")
            ScholarUtils.log('info', 'loaded cookies file')
        except Exception as msg:
            print("Ignoring cookie file: %s" % msg)
            ScholarUtils.log('warn', 'could not load cookies file: %s' % msg)
            self.cjar = MozillaCookieJar() # Just to be safe

    self.opener = build_opener(HTTPCookieProcessor(self.cjar))
    self.settings = None # Last settings object, if any
def __get_cookies(self, req):
    """Open ``req`` and return the cookie jar filled by the response.

    On a 200 reply the CSRF token, when present in the page, is stored in
    ``self.csrfmiddlewaretoken`` and the jar is returned. Any other status
    prints a notice and returns None. A URLError prints diagnostics and
    exits the process.
    """
    cookies = cookiejar.CookieJar()
    handler = request.HTTPCookieProcessor(cookies)
    opener = request.build_opener(handler)
    try:
        with opener.open(req) as f:
            if f.code == 200:
                pattern = re.compile(r"<input.*?type='hidden'.*?name='csrfmiddlewaretoken'.*?value='(.*?)'.*>")
                try:
                    self.csrfmiddlewaretoken = pattern.search(f.read().decode("utf-8")).group(1)
                    print("Achieved cookies and csrfmiddlewaretoken sucessfully")
                except Exception:
                    # No token in the page (or body not decodable): the
                    # cookies are still usable.
                    print("Achieved cookies sucessfully")
                return cookies
            else:
                print("Lost cookies")
    except error.URLError as e:
        # Every URLError has ``reason``, so ``code`` (HTTPError only) must be
        # tested first or the HTTP branch can never run.
        if hasattr(e, "code"):
            print("The server couldn't fulfill the request.")
            print("Error code: {}".format(e.code))
        elif hasattr(e, "reason"):
            print ("We failed to reach a server. Please check your url and read the Reason")
            print ("Reason: {}".format(e.reason))
        exit()
def __init__(self, server, port, username, password):
    """Record the connection settings, bypass proxies globally and log in."""
    self.server, self.port = server, port
    self.username, self.password = username, password
    self.url = 'https://{0}:{1}'.format(self.server,self.port)
    self.api = '/api/1.1/xml'
    self.authtoken = ''
    self.response = None
    self.sync_id = ''

    # An empty proxy mapping forces urllib2 to not use a proxy.
    no_proxy_opener = urllib2.build_opener(urllib2.ProxyHandler({}))
    urllib2.install_opener(no_proxy_opener)

    self.login() #Gets called in __init__
def default_urllib2_opener(config):
    """Build an opener using the proxy and user agent from ``config``.

    ``config`` may be None. It is queried with ``get("http", "proxy")`` and
    ``get("http", "useragent")``; a missing user agent falls back to
    ``default_user_agent_string()``.
    """
    have_config = config is not None

    proxy_server = config.get("http", "proxy") if have_config else None
    handlers = []
    if proxy_server is not None:
        handlers.append(urllib2.ProxyHandler({"http": proxy_server}))
    opener = urllib2.build_opener(*handlers)

    user_agent = config.get("http", "useragent") if have_config else None
    if user_agent is None:
        user_agent = default_user_agent_string()
    opener.addheaders = [('User-agent', user_agent)]
    return opener
def make_request(*args):
    """Open ``*args`` with a randomised User-agent and return the stripped body.

    Any failure is re-raised as a plain Exception carrying the error body
    when it can be read, otherwise the original exception.
    """
    agent = 'Mozilla/5.0'+str(random.randrange(1000000))
    opener = build_opener()
    opener.addheaders = [('User-agent', agent)]
    try:
        body = opener.open(*args).read()
        return body.strip()
    except Exception as e:
        try:
            detail = e.read().strip()
        except BaseException:
            detail = e
        raise Exception(detail)
def login(self, username, pwd, cookie_file):
    """Log in from a saved cookie file when possible, else with credentials.

    Returns 1 after installing an opener with the loaded cookies; otherwise
    returns whatever ``self.do_login`` returns.
    """
    # No cookie file at all: go straight to a real login.
    if not os.path.exists(cookie_file):
        return self.do_login(username, pwd, cookie_file)

    try:
        jar = cookielib.LWPCookieJar(cookie_file)
        jar.load(ignore_discard=True, ignore_expires=True)
    except cookielib.LoadError:
        LOG.info('Loading cookies error')
        return self.do_login(username, pwd, cookie_file)

    # install loaded cookies for urllib2
    cookie_opener = urllib2.build_opener(
        urllib2.HTTPCookieProcessor(jar), urllib2.HTTPHandler)
    urllib2.install_opener(cookie_opener)
    LOG.info('Loading cookies success')
    return 1
def save_cookie(self, text, cookie_file=CONF.cookie_file):
    """Follow the login redirect found in ``text`` and save the cookies.

    Args:
        text: Login response body; decoded from GBK on Python 3.
        cookie_file: Destination file for the cookie jar.

    Returns:
        1 when the feedback reports success and the cookies were saved,
        0 otherwise (including any error along the way).
    """
    cookie_jar2 = cookielib.LWPCookieJar()
    cookie_support2 = urllib2.HTTPCookieProcessor(cookie_jar2)
    opener2 = urllib2.build_opener(cookie_support2, urllib2.HTTPHandler)
    urllib2.install_opener(opener2)
    if six.PY3:
        text = text.decode('gbk')
    # Matches location.replace('<url>') and captures the redirect URL.
    # (The original comments here are garbled; they appear to discuss how
    # the quotes in this pattern are escaped.)
    p = re.compile(r"location\.replace\('(.*?)'\)")
    try:
        # Search login redirection URL
        login_url = p.search(text).group(1)
        data = urllib2.urlopen(login_url).read()
        if six.PY3:
            # read() yields bytes on Python 3; searching them with a str
            # pattern raised TypeError, which was swallowed below, so this
            # function always returned 0.
            # NOTE(review): GBK is assumed to match ``text`` above - confirm.
            data = data.decode('gbk', 'replace')
        # Verify login feedback, check whether result is TRUE
        patt_feedback = r'feedBackUrlCallBack\((.*)\)'
        p = re.compile(patt_feedback, re.MULTILINE)
        feedback = p.search(data).group(1)
        feedback_json = json.loads(feedback)
        if feedback_json['result']:
            cookie_jar2.save(cookie_file, ignore_discard=True, ignore_expires=True)
            return 1
        else:
            return 0
    except Exception:
        # Best effort: any failure counts as "not logged in".
        return 0
def login(self, username, pwd, cookie_file):
    """Log in from a saved cookie file when possible, else with credentials.

    Returns 1 after installing an opener with the loaded cookies; otherwise
    returns whatever ``self.do_login`` returns.
    """
    # No cookie file at all: go straight to a real login.
    if not os.path.exists(cookie_file):
        return self.do_login(username, pwd, cookie_file)

    try:
        jar = cookielib.LWPCookieJar(cookie_file)
        jar.load(ignore_discard=True, ignore_expires=True)
    except cookielib.LoadError:
        print('Loading cookies error')
        return self.do_login(username, pwd, cookie_file)

    #install loaded cookies for urllib2
    cookie_opener = urllib2.build_opener(
        urllib2.HTTPCookieProcessor(jar), urllib2.HTTPHandler)
    urllib2.install_opener(cookie_opener)
    print('Loading cookies success')
    return 1
def request(self, host, handler, request_body, verbose=0):
    """Send XMLRPC request

    POSTs ``request_body`` to the URI built from the scheme, ``host`` and
    ``handler`` and returns the parsed response. Connection problems are
    raised as InterfaceError.
    """
    uri = '{scheme}://{host}{handler}'.format(scheme=self._scheme,
                                              host=host, handler=handler)

    if self._passmgr:
        self._passmgr.add_password(None, uri, self._username,
                                   self._password)
    if self.verbose:
        _LOGGER.debug("FabricTransport: {0}".format(uri))

    opener = urllib2.build_opener(*self._handlers)
    req = urllib2.Request(
        uri,
        request_body,
        headers={
            'Content-Type': 'text/xml',
            'User-Agent': self.user_agent,
        })

    try:
        return self.parse_response(opener.open(req))
    except (urllib2.URLError, urllib2.HTTPError) as exc:
        try:
            # Only a 400 reports its real status code; everything else
            # keeps -1 alongside the reported reason.
            if exc.code == 400:
                reason, code = 'Permission denied', exc.code
            else:
                reason, code = exc.reason, -1
            msg = "{reason} ({code})".format(reason=reason, code=code)
        except AttributeError:
            # The exception has no code/reason attribute to report.
            msg = "SSL error" if 'SSL' in str(exc) else str(exc)
        raise InterfaceError("Connection with Fabric failed: " + msg)
    except BadStatusLine:
        raise InterfaceError("Connection with Fabric failed: check SSL")
def _request(self, url, method='GET', data=None):
    """Send an API request and return the parsed response.

    Args:
        url: Path appended to ``self._auth.endpoint``.
        method: HTTP verb. For GET and DELETE the encoded ``data`` goes into
            the query string, otherwise into the request body.
        data: Optional mapping of parameters.

    Returns:
        The result of ``self._parse_response`` on the response body.

    Raises:
        ApiHandlerError: On an HTTP error reply or a ValueError while
            handling the response.
    """
    url = self._auth.endpoint + url
    # Work on a copy: update() below would otherwise leave Content-Type in
    # the shared auth headers for every later request.
    headers = dict(self._auth.headers)

    if data is not None:
        data = urlencode(data)
        if method in ['GET', 'DELETE']:
            url = url + '?' + data
            data = None
        else:
            headers.update({'Content-Type': POST_CONTENT_TYPE})
            if sys.version_info > (3,):  # python3
                data = data.encode('utf-8')

    log.debug(method + ' ' + url)
    log.debug(data)

    try:
        opener = build_opener(HTTPHandler)
        request = Request(url, data=data, headers=headers)
        request.get_method = lambda: method
        response = opener.open(request).read()
        data = self._parse_response(response)
    except HTTPError as e:
        log.error(e)
        data = self._parse_response(e.read())
        raise ApiHandlerError('Invalid server response', data)
    except ValueError as e:
        log.error(e)
        raise ApiHandlerError('Invalid server response')

    return data
def build_opener():
    """Install a global cookie-aware opener with passport.weibo.cn headers."""
    default_headers = [
        ("User-Agent", "Mozilla/5.0 (iPhone; CPU iPhone OS 9_1 like Mac OS X) AppleWebKit/601.1.46 (KHTML, like Gecko) Version/9.0 Mobile/13B143 Safari/601.1"),
        ("Referer", "https://passport.weibo.cn"),
        ("Origin", "https://passport.weibo.cn"),
        ("Host", "passport.weibo.cn"),
    ]
    jar = http.cookiejar.CookieJar()
    cookie_opener = request.build_opener(request.HTTPCookieProcessor(jar))
    cookie_opener.addheaders = default_headers
    request.install_opener(cookie_opener)
def build_opener():
    """Install a global cookie-aware opener with cn.v2ex.com headers."""
    default_headers = [
        ("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:49.0) Gecko/20100101 Firefox/49.0"),
        ("Referer", "http://cn.v2ex.com/signin"),
        ("Origin", "http://cn.v2ex.com"),
        ("Host", "cn.v2ex.com"),
    ]
    jar = http.cookiejar.CookieJar()
    cookie_opener = request.build_opener(request.HTTPCookieProcessor(jar))
    cookie_opener.addheaders = default_headers
    request.install_opener(cookie_opener)
def build_opener():
    """Install a global cookie-aware opener with wx.qq.com headers."""
    default_headers = [
        ("User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.71 Safari/537.36"),
        ("Referer", "https://wx.qq.com/"),
        ("Origin", "https://wx.qq.com/"),
        ("Host", "wx.qq.com"),
    ]
    jar = http.cookiejar.CookieJar()
    cookie_opener = request.build_opener(request.HTTPCookieProcessor(jar))
    cookie_opener.addheaders = default_headers
    request.install_opener(cookie_opener)
def build_opener():
    """Install a global cookie-aware opener with www.zhihu.com headers."""
    default_headers = [
        ("User-Agent", "Mozilla/5.0 (iPhone; CPU iPhone OS 9_1 like Mac OS X) AppleWebKit/601.1.46 (KHTML, like Gecko) Version/9.0 Mobile/13B143 Safari/601.1"),
        ("Referer", "https://www.zhihu.com/"),
        ("Origin", "https://www.zhihu.com/"),
        ("Host", "www.zhihu.com"),
    ]
    jar = http.cookiejar.CookieJar()
    cookie_opener = request.build_opener(request.HTTPCookieProcessor(jar))
    cookie_opener.addheaders = default_headers
    request.install_opener(cookie_opener)
def getUrllibOpener():
    """Return an opener that sends JSON content-type and the tool's user agent.

    When ``pythonVersion`` is above 3.0 the HTTPS handler gets a context with
    hostname checking and certificate verification turned off.
    """
    if pythonVersion > 3.0:
        ctx = ssl.create_default_context()
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
        https_handler = urllib2.HTTPSHandler(context=ctx)
    else:
        https_handler = urllib2.HTTPSHandler()

    opener = urllib2.build_opener(https_handler)
    opener.addheaders = [
        ('Content-Type', 'application/json'),
        ('User-Agent', 'vulners-getsploit-v%s' % __version__),
    ]
    return opener
def hashed_download(url, temp, digest):
    """Download ``url`` to ``temp``, make sure it has the SHA-256 ``digest``,
    and return its path.

    Raises HashError when the downloaded content does not match ``digest``.
    """
    # Based on pip 1.4.1's URLOpener but with cert verification removed. Python
    # >=2.7.9 verifies HTTPS certs itself, and, in any case, the cert
    # authenticity has only privacy (not arbitrary code execution)
    # implications, since we're checking hashes.
    from contextlib import closing

    def opener():
        opener = build_opener(HTTPSHandler())
        # Strip out HTTPHandler to prevent MITM spoof. Iterate over a copy:
        # removing from the list being iterated skips elements.
        # NOTE(review): removal from ``handlers`` alone may not stop the
        # opener dispatching http:// URLs - confirm against OpenerDirector.
        for handler in list(opener.handlers):
            if isinstance(handler, HTTPHandler):
                opener.handlers.remove(handler)
        return opener

    def read_chunks(response, chunk_size):
        while True:
            chunk = response.read(chunk_size)
            if not chunk:
                break
            yield chunk

    actual_hash = sha256()
    # closing() releases the connection even if writing fails part-way.
    with closing(opener().open(url)) as response:
        path = join(temp, urlparse(url).path.split('/')[-1])
        with open(path, 'wb') as file:
            for chunk in read_chunks(response, 4096):
                file.write(chunk)
                actual_hash.update(chunk)

    actual_digest = actual_hash.hexdigest()
    if actual_digest != digest:
        raise HashError(url, path, actual_digest, digest)
    return path
def send(event, context, response_status, reason=None, response_data=None, physical_resource_id=None):
    """PUT a JSON status document to ``event['ResponseURL']``.

    Args:
        event: Mapping with 'StackId', 'RequestId', 'LogicalResourceId' and
            'ResponseURL'.
        context: Object whose ``log_stream_name`` is used in the default
            reason and as the default physical resource id.
        response_status: Value sent as 'Status'.
        reason: Optional text sent as 'Reason'.
        response_data: Optional mapping sent as 'Data'.
        physical_resource_id: Optional value sent as 'PhysicalResourceId'.

    Returns:
        True when the request completed, False when it raised HTTPError.
    """
    response_data = response_data or {}
    response_body = json.dumps(
        {
            'Status': response_status,
            'Reason': reason or "See the details in CloudWatch Log Stream: " + context.log_stream_name,
            'PhysicalResourceId': physical_resource_id or context.log_stream_name,
            'StackId': event['StackId'],
            'RequestId': event['RequestId'],
            'LogicalResourceId': event['LogicalResourceId'],
            'Data': response_data
        }
    )

    # urllib on Python 3 requires a bytes body and string header values.
    payload = response_body.encode('utf-8')

    opener = build_opener(HTTPHandler)
    request = Request(event['ResponseURL'], data=payload)
    request.add_header('Content-Type', '')
    request.add_header('Content-Length', str(len(payload)))
    request.get_method = lambda: 'PUT'
    try:
        response = opener.open(request)
        try:
            print("Status code: {}".format(response.getcode()))
            print("Status message: {}".format(response.msg))
        finally:
            # Release the connection once the status has been reported.
            response.close()
        return True
    except HTTPError as exc:
        print("Failed executing HTTP request: {}".format(exc.code))
        return False
def __init__(self):
    """Reset all session state and install a global cookie-aware opener."""
    self.DEBUG = False
    self.autoReplyMode = False

    # Identifiers and endpoints, all empty until set elsewhere.
    self.appid = 'wx782c26e4c19acffb'
    self.uuid = ''
    self.base_uri = ''
    self.redirect_uri = ''
    self.uin = ''
    self.sid = ''
    self.skey = ''
    self.pass_ticket = ''
    self.deviceId = 'e' + repr(random.random())[2:17]
    self.BaseRequest = {}
    self.synckey = ''
    self.SyncKey = []
    self.syncHost = ''

    # Account data.
    self.User = []
    self.MemberList = []
    self.ContactList = []
    self.GroupList = []

    # One handler list per known message type, plus two extra buckets.
    self._handlers = {kind: [] for kind in self.message_types}
    self._handlers['location'] = []
    self._handlers['all'] = []
    self._filters = dict()

    cookie_opener = request.build_opener(request.HTTPCookieProcessor(CookieJar()))
    cookie_opener.addheaders = [
        ('User-agent', 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/48.0.2564.109 Safari/537.36'),
        ('Referer','https://wx2.qq.com/'),
    ]
    request.install_opener(cookie_opener)
def __init__(self, secure: bool=True) -> None:
    """Initialise the base handler with a default opener and no credentials.

    ``secure`` is stored for later use by ``set_authentication``.
    """
    transmissionrpc.HTTPHandler.__init__(self)
    self.secure = secure
    self.auth: DataDict = None
    self.http_opener = build_opener()
def set_authentication(self, uri: str, login: str, password: str) -> None:
    """Rebuild the HTTPS opener and store a Basic Authorization header.

    With ``self.secure`` false, the TLS context performs no certificate or
    hostname verification. ``uri`` is not used.
    """
    if not self.secure:
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
        context.verify_mode = ssl.CERT_NONE
        context.check_hostname = False
    else:
        context = ssl.create_default_context()

    self.http_opener = build_opener(HTTPSHandler(context=context))

    credentials = (login + ":" + password).encode()
    token = b64encode(credentials).decode('utf-8')
    self.auth = {'Authorization': 'Basic %s' % token}
def getFile(cls, getfile, unpack=True):
    """Fetch ``getfile`` and optionally unpack it based on its Content-Type.

    Args:
        getfile: URL to fetch.
        unpack: When true, gzip, bzip2 and zip payloads are opened and a
            file-like object over the unpacked data is returned instead of
            the raw bytes.

    Returns:
        ``(data, response)``. When the fetch fails and
        ``cls.exitWhenNoSource()`` is false, ``(None, None)``.
    """
    if cls.getProxy():
        proxy = req.ProxyHandler({'http': cls.getProxy(), 'https': cls.getProxy()})
        auth = req.HTTPBasicAuthHandler()
        opener = req.build_opener(proxy, auth, req.HTTPHandler)
        req.install_opener(opener)
    try:
        response = req.urlopen(getfile)
    except Exception:
        msg = "[!] Could not fetch file %s"%getfile
        if cls.exitWhenNoSource():
            sys.exit(msg)
        print(msg)
        # There is no response to read from; falling through used to fail
        # on the unbound ``response`` name.
        return (None, None)

    data = response.read()
    # TODO: if data == text/plain; charset=utf-8, read and decode
    if unpack:
        # A missing header would make the ``in`` tests below raise TypeError.
        content_type = response.info().get('Content-Type') or ''
        if 'gzip' in content_type:
            data = gzip.GzipFile(fileobj = BytesIO(data))
        elif 'bzip2' in content_type:
            data = BytesIO(bz2.decompress(data))
        elif 'zip' in content_type:
            fzip = zipfile.ZipFile(BytesIO(data), 'r')
            if len(fzip.namelist())>0:
                data=BytesIO(fzip.read(fzip.namelist()[0]))
        # In case the webserver is being generic
        elif 'application/octet-stream' in content_type:
            if data[:4] == b'PK\x03\x04': # Zip
                fzip = zipfile.ZipFile(BytesIO(data), 'r')
                if len(fzip.namelist())>0:
                    data=BytesIO(fzip.read(fzip.namelist()[0]))
    return (data, response)
def test_urllib_request_opener(self):
    """
    When making a request via urllib.request.OpenerDirector
        we return the original response
        we capture a span for the request
    """
    opener = build_opener()
    # The request is issued while the test tracer is installed globally.
    with override_global_tracer(self.tracer):
        resp = opener.open(URL_200)

    # The caller still receives the original, unmodified response.
    self.assertEqual(self.to_str(resp.read()), '')
    self.assertEqual(resp.getcode(), 200)

    # Exactly one span must have been recorded for the request.
    spans = self.tracer.writer.pop()
    self.assertEqual(len(spans), 1)
    span = spans[0]
    self.assertEqual(span.span_type, 'http')
    self.assertIsNone(span.service)
    self.assertEqual(span.name, self.SPAN_NAME)
    self.assertEqual(span.error, 0)
    self.assertEqual(span.get_tag('http.method'), 'GET')
    self.assertEqual(span.get_tag('http.status_code'), '200')
    self.assertEqual(span.get_tag('http.url'), URL_200)

# Additional Python2 test cases for urllib
def __init__(self, timeout=None, proxy=None, cacert=None, sessions=False):
    """Choose the callable used to open requests for this transport.

    Raises RuntimeError when a timeout is requested but unsupported, or when
    ``proxy`` or ``cacert`` is given. With ``sessions`` set, a cookie-aware
    opener is used; otherwise plain ``urllib2.urlopen``.
    """
    if (timeout is not None) and not self.supports_feature('timeout'):
        raise RuntimeError('timeout is not supported with urllib2 transport')

    unsupported = (
        (proxy, 'proxy is not supported with urllib2 transport'),
        (cacert, 'cacert is not support with urllib2 transport'),
    )
    for value, message in unsupported:
        if value:
            raise RuntimeError(message)

    if sessions:
        cookie_opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(CookieJar()))
        self.request_opener = cookie_opener.open
    else:
        self.request_opener = urllib2.urlopen
    self._timeout = timeout
def create_cookie_opener(self):
    '''
    Build an opener that keeps cookies in a fresh in-memory jar.

    :return: the opener wired to the new cookie jar
    '''
    jar = cookiejar.CookieJar()
    return request.build_opener(request.HTTPCookieProcessor(jar))