The following 50 code examples, extracted from open-source Python projects, illustrate how to use urllib.request.Request().
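Before the harvested examples, here is a minimal sketch of the typical pattern they all build on: construct a Request with custom headers, open it with urlopen, and decode the response body. The URL, header value, and timeout below are illustrative placeholders, not taken from any of the projects quoted further down.

from urllib import request, error

url = 'http://example.com/'  # placeholder URL
req = request.Request(url, headers={'User-Agent': 'Mozilla/5.0'})
try:
    with request.urlopen(req, timeout=10) as resp:
        body = resp.read().decode('utf-8')
        print(body[:200])
except error.URLError as e:
    print('request failed:', e.reason)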
def download(self, url, retry_count=3, headers=None, proxy=None, data=None):
    if url is None:
        return None
    try:
        req = request.Request(url, headers=headers, data=data)
        cookie = cookiejar.CookieJar()
        cookie_process = request.HTTPCookieProcessor(cookie)
        opener = request.build_opener()
        if proxy:
            proxies = {urlparse(url).scheme: proxy}
            opener.add_handler(request.ProxyHandler(proxies))
        content = opener.open(req).read()
    except error.URLError as e:
        print('HtmlDownLoader download error:', e.reason)
        content = None
        if retry_count > 0:
            if hasattr(e, 'code') and 500 <= e.code < 600:
                # retry when the server answered with an HTTP 5xx status code
                return self.download(url, retry_count - 1, headers, proxy, data)
    return content
def get_cpi():
    """
    Fetch consumer price index (CPI) data.

    Return
    --------
    DataFrame
        month : reporting month
        cpi   : consumer price index
    """
    rdint = vs.random()
    request = Request(vs.MACRO_URL%(vs.P_TYPE['http'], vs.DOMAINS['sina'],
                                    rdint, vs.MACRO_TYPE[1], 0, 600, rdint))
    text = urlopen(request, timeout=10).read()
    text = text.decode('gbk') if ct.PY3 else text
    regSym = re.compile(r'\,count:(.*?)\}')
    datastr = regSym.findall(text)
    datastr = datastr[0]
    datastr = datastr.split('data:')[1]
    js = json.loads(datastr)
    df = pd.DataFrame(js, columns=vs.CPI_COLS)
    df['cpi'] = df['cpi'].astype(float)
    return df
def workthread(item, user_agent, path):
    strurl = 'http://yxpjw.club' + item[0]
    picname = item[1]
    print('Downloading %s...\n' % picname)
    req = request.Request(strurl)
    req.add_header('User-Agent', user_agent)
    response = request.urlopen(req)
    content = response.read().decode('gbk')
    strurl2 = re.search(r'^(.*)/', strurl).group(0)
    print('https headers...............%s' % strurl2)
    #destname = os.path.join(path, picname + '.txt')
    #with open(destname, 'w', encoding='gbk') as file:
    #    file.write(content)
    destdir = os.path.join(path, picname)
    os.makedirs(destdir)
    page = 1
    while True:
        content = getpagedata(content, destdir, page, strurl2)
        if not content:
            break
        page = page + 1
    print('%s done.\n' % picname)
def get_loan_rate():
    """
    Fetch loan interest rate data.

    Return
    --------
    DataFrame
        date      : effective date
        loan_type : type of loan
        rate      : interest rate (%)
    """
    rdint = vs.random()
    request = Request(vs.MACRO_URL%(vs.P_TYPE['http'], vs.DOMAINS['sina'],
                                    rdint, vs.MACRO_TYPE[2], 3, 800, rdint))
    text = urlopen(request, timeout=10).read()
    text = text.decode('gbk')
    regSym = re.compile(r'\,count:(.*?)\}')
    datastr = regSym.findall(text)
    datastr = datastr[0]
    datastr = datastr.split('data:')[1]
    js = json.loads(datastr)
    df = pd.DataFrame(js, columns=vs.LOAN_COLS)
    for i in df.columns:
        df[i] = df[i].apply(lambda x: np.where(x is None, '--', x))
    return df
def requestData(url, user_agent):
    content = None  # ensure content is defined even if the request fails
    try:
        req = request.Request(url)
        req.add_header('User-Agent', user_agent)
        response = request.urlopen(req, timeout=8)
        # decode the raw bytes into a str
        content = response.read().decode('gbk')
    except error.HTTPError as e:
        # HTTPError is a subclass of URLError, so it must be caught first
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)
        print('HTTPError!!!')
    except error.URLError as e:
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)
    return content
def requestData(self, url, user_agent):
    try:
        req = request.Request(url)
        req.add_header('User-Agent', user_agent)
        response = request.urlopen(req, timeout=8)
        # decode the raw bytes into a str
        content = response.read().decode('utf-8')
        return content
    except error.HTTPError as e:
        # HTTPError is a subclass of URLError, so it must be caught first
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)
        print('HTTPError!!!')
    except error.URLError as e:
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)
def requestData(self, url, user_agent):
    try:
        req = request.Request(url)
        req.add_header('User-Agent', user_agent)
        response = request.urlopen(req, timeout=3)
        # decode the raw bytes into a str
        content = response.read().decode('gbk')
        return content
    except error.HTTPError as e:
        # HTTPError is a subclass of URLError, so it must be caught first
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)
        print('HTTPError!!!')
    except error.URLError as e:
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)
def getAbstractInfo(self):
    try:
        req = request.Request(self.url)
        req.add_header('User-Agent', self.user_agent)
        response = request.urlopen(req)
        # decode the raw bytes into a str
        content = response.read().decode('gbk')
        self.getDetailList(content)
    except error.HTTPError as e:
        # HTTPError is a subclass of URLError, so it must be caught first
        print('HTTPError!!!')
    except error.URLError as e:
        if hasattr(e, 'code'):
            print(e.code)
        if hasattr(e, 'reason'):
            print(e.reason)
def test_download_and_verify_ok(self, mock_urlopen):
    mock_extract_tarball = self.mock_patch_object(
        self.glance.utils, 'extract_tarball')
    mock_md5 = mock.Mock()
    mock_md5.hexdigest.return_value = 'expect_cksum'
    mock_md5_new = self.mock_patch_object(
        self.glance.md5, 'new', mock_md5)
    mock_info = mock.Mock()
    mock_info.getheader.return_value = 'expect_cksum'
    mock_urlopen.return_value.info.return_value = mock_info
    fake_request = urllib2.Request('http://fakeurl.com')

    self.glance._download_tarball_and_verify(
        fake_request, 'fake_staging_path')

    mock_urlopen.assert_called_with(fake_request)
    mock_extract_tarball.assert_called_once()
    mock_md5_new.assert_called_once()
    mock_info.getheader.assert_called_once()
    mock_md5_new.return_value.hexdigest.assert_called_once()
def test_download_ok_verify_failed(self, mock_urlopen):
    mock_extract_tarball = self.mock_patch_object(
        self.glance.utils, 'extract_tarball')
    mock_md5 = mock.Mock()
    mock_md5.hexdigest.return_value = 'unexpect_cksum'
    mock_md5_new = self.mock_patch_object(
        self.glance.md5, 'new', mock_md5)
    mock_info = mock.Mock()
    mock_info.getheader.return_value = 'expect_cksum'
    mock_urlopen.return_value.info.return_value = mock_info
    fake_request = urllib2.Request('http://fakeurl.com')

    self.assertRaises(self.glance.RetryableError,
                      self.glance._download_tarball_and_verify,
                      fake_request, 'fake_staging_path')

    mock_urlopen.assert_called_with(fake_request)
    mock_extract_tarball.assert_called_once()
    mock_md5_new.assert_called_once()
    mock_md5_new.return_value.hexdigest.assert_called_once()
def execute(self):
    if hasattr(Context.g_module, 'publish'):
        Context.Context.execute(self)
    mod = Context.g_module

    rfile = getattr(self, 'rfile', send_package_name())
    if not os.path.isfile(rfile):
        self.fatal('Create the release file with "waf release" first! %r' % rfile)

    fdata = Utils.readf(rfile, m='rb')
    data = safe_urlencode([('pkgdata', fdata), ('pkgname', mod.APPNAME), ('pkgver', mod.VERSION)])

    req = Request(get_upload_url(), data)
    response = urlopen(req, timeout=TIMEOUT)
    data = response.read().strip()

    if sys.hexversion > 0x300000f:
        data = data.decode('utf-8')

    if data != 'ok':
        self.fatal('Could not publish the package %r' % data)
def compute_dependencies(self, filename=REQUIRES):
    text = Utils.readf(filename)
    data = safe_urlencode([('text', text)])

    if '--offline' in sys.argv:
        self.constraints = self.local_resolve(text)
    else:
        req = Request(get_resolve_url(), data)
        try:
            response = urlopen(req, timeout=TIMEOUT)
        except URLError as e:
            Logs.warn('The package server is down! %r' % e)
            self.constraints = self.local_resolve(text)
        else:
            ret = response.read()
            try:
                ret = ret.decode('utf-8')
            except Exception:
                pass
            self.trace(ret)
            self.constraints = parse_constraints(ret)
    self.check_errors()
def list_archive_timestamps(url, min_date, max_date, user_agent):
    """List the available archives between min_date and max_date for the given URL."""
    logger.info('Listing the archives for the url {url}'.format(url=url))

    # Construct the URL used to download the memento list
    parameters = {'url': url,
                  'output': 'json',
                  'from': min_date.strftime(WEB_ARCHIVE_TIMESTAMP_FORMAT),
                  'to': max_date.strftime(WEB_ARCHIVE_TIMESTAMP_FORMAT)}
    cdx_url = WEB_ARCHIVE_CDX_TEMPLATE.format(params=urlencode(parameters))

    req = Request(cdx_url, None, {'User-Agent': user_agent})
    with urlopen(req) as cdx:
        memento_json = cdx.read().decode("utf-8")

    timestamps = []
    # Ignore the first line which contains column names
    for url_key, timestamp, original, mime_type, status_code, digest, length in json.loads(memento_json)[1:]:
        # Ignore archives with a status code != OK
        if status_code == '200':
            timestamps.append(datetime.strptime(timestamp, WEB_ARCHIVE_TIMESTAMP_FORMAT))

    return timestamps
def fetch_file(self, url, filename):
    # if not os.path.exists(filename):
    #     os.makedirs(filename)
    try:
        req = request.Request(url, headers=self.__headers)
        data = request.urlopen(req).read()
        with open(filename, 'wb') as f:
            f.write(data)
            f.flush()
        self.__url_manager.set_url_status(url, 2)
    except Exception as e:
        self.__url_manager.set_url_status(url, -1)
        raise e
    finally:
        time.sleep(config['basic']['sleep'])
def retrieve_csv(self, url):
    '''
    Retrieve data from the Veneer service, at the given url path, in CSV format.

    url: Path to required resource, relative to the root of the Veneer service.

    NOTE: CSV responses are currently only available for time series results
    '''
    if PRINT_URLS:
        print("*** %s ***" % (url))

    req = Request(self.base_url + quote(url + self.data_ext), headers={"Accept": "text/csv"})
    text = urlopen(req).read().decode('utf-8')

    result = utils.read_veneer_csv(text)
    if PRINT_ALL:
        print(result)
        print("")
    return result
def read_directory(self, directory_url):
    """Parses the SecureDrop directory into a dictionary of instance details."""
    # CloudFlare will block us if we don't set user-agent
    dir_req = Request(directory_url)
    dir_req.add_header("User-Agent",
                       "Mozilla/5.0 (Windows NT 6.1; rv:45.0) "
                       "Gecko/20100101 Firefox/45.0")
    directory = urlopen(dir_req).read().decode()

    instances = []
    for line in directory.splitlines()[1:-1]:
        fields = line.split("\t")
        instances.append(dict(organization=fields[0],
                              landing_page=fields[1],
                              ths_address=fields[2]))
    return instances
def respond_to_checkpoint(self, response_code):
    headers = {
        'User-Agent': self.USER_AGENT,
        'Origin': 'https://i.instagram.com',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Language': 'en-US',
        'Accept-Encoding': 'gzip',
        'Referer': self.endpoint,
        'Cookie': self.cookie,
    }

    req = Request(self.endpoint, headers=headers)
    data = {'csrfmiddlewaretoken': self.csrftoken, 'response_code': response_code}
    res = urlopen(req, data=urlencode(data).encode('ascii'), timeout=self.timeout)

    if res.info().get('Content-Encoding') == 'gzip':
        buf = BytesIO(res.read())
        content = gzip.GzipFile(fileobj=buf).read().decode('utf-8')
    else:
        content = res.read().decode('utf-8')

    return res.code, content
def add_uri(self) -> None:
    user, passwd = '', ''
    if len(self.rpc_username) > 0 and len(self.rpc_password) > 0:
        user = self.rpc_username
        passwd = self.rpc_password
    elif len(self.rpc_secret) > 0:
        user = 'token'
        passwd = self.rpc_secret

    aria2_endpoint = '%s:%s/jsonrpc' % (self.rpc_host, self.rpc_port)
    headers = {'Content-Type': 'application/json'}
    payload = json.dumps({'jsonrpc': '2.0',
                          'id': 1,
                          'method': 'aria2.addUri',
                          'params': ['%s:%s' % (user, passwd), [self.link_url]]},
                         sort_keys=False).encode('utf-8')
    try:
        req = Request(aria2_endpoint, headers=headers, data=payload)
        res = urlopen(req).read().decode('utf-8')
        jsonres = json.loads(res)
        # res = requests.post(aria2_endpoint, headers=headers, data=payload)
        # jsonres = res.json()
        self.aria2Confirmation.emit('result' in jsonres.keys())
    except HTTPError:
        print(sys.exc_info())
        QMessageBox.critical(self, 'ERROR NOTIFICATION', sys.exc_info(), QMessageBox.Ok)
        self.aria2Confirmation.emit(False)
        # self.exit()
def __init__(self, url=None):
    self.url = url
    self.html = None
    self.links = []
    self.soup = None
    self.text = None
    self.title = None

    req = Request(self.url, headers={'User-Agent': "Magic Browser"})
    try:
        self.html = urlopen(req)
    except URLError as e:
        if hasattr(e, 'reason'):
            print('We failed to reach a server.')
            print('Reason: ', e.reason)
        elif hasattr(e, 'code'):
            print('The server couldn\'t fulfill the request.')
            print('Error code: ', e.code)
def _day_cinema(date=None, pNo=1, retry_count=3, pause=0.001):
    ct._write_console()
    for _ in range(retry_count):
        time.sleep(pause)
        try:
            request = Request(ct.BOXOFFICE_CBD%(ct.P_TYPE['http'], ct.DOMAINS['mbox'],
                                                ct.BOX, pNo, date))
            lines = urlopen(request, timeout=10).read()
            if len(lines) < 15:  # no data
                return None
        except Exception as e:
            print(e)
        else:
            js = json.loads(lines.decode('utf-8') if ct.PY3 else lines)
            df = pd.DataFrame(js['data1'])
            df = df.drop(['CinemaID'], axis=1)
            return df
def _get_detail(tag, retry_count=3, pause=0.001):
    for _ in range(retry_count):
        time.sleep(pause)
        try:
            ct._write_console()
            request = Request(ct.SINA_DATA_DETAIL_URL%(ct.P_TYPE['http'], ct.DOMAINS['vsf'],
                                                       ct.PAGES['jv'], tag))
            text = urlopen(request, timeout=10).read()
            text = text.decode('gbk')
        except _network_error_classes:
            pass
        else:
            reg = re.compile(r'\,(.*?)\:')
            text = reg.sub(r',"\1":', text)
            text = text.replace('"{symbol', '{"symbol')
            text = text.replace('{symbol', '{"symbol"')
            jstr = json.dumps(text)
            js = json.loads(jstr)
            df = pd.DataFrame(pd.read_json(js, dtype={'code': object}),
                              columns=ct.THE_FIELDS)
            df = df[ct.FOR_CLASSIFY_B_COLS]
            return df
    raise IOError(ct.NETWORK_URL_ERROR_MSG)
def _sz_hz(date='', retry_count=3, pause=0.001):
    for _ in range(retry_count):
        time.sleep(pause)
        ct._write_console()
        try:
            request = Request(rv.MAR_SZ_HZ_URL%(ct.P_TYPE['http'], ct.DOMAINS['szse'],
                                                ct.PAGES['szsefc'], date))
            lines = urlopen(request, timeout=10).read()
            if len(lines) <= 200:
                return pd.DataFrame()
            df = pd.read_html(lines, skiprows=[0])[0]
            df.columns = rv.MAR_SZ_HZ_COLS
            df['opDate'] = date
        except Exception as e:
            print(e)
        else:
            return df
    raise IOError(ct.NETWORK_URL_ERROR_MSG)
def _parase_fq_factor(code, start, end):
    symbol = _code_to_symbol(code)
    request = Request(ct.HIST_FQ_FACTOR_URL%(ct.P_TYPE['http'],
                                             ct.DOMAINS['vsf'], symbol))
    text = urlopen(request, timeout=10).read()
    text = text[1:len(text)-1]
    text = text.decode('utf-8') if ct.PY3 else text
    text = text.replace('{_', '{"')
    text = text.replace('total', '"total"')
    text = text.replace('data', '"data"')
    text = text.replace(':"', '":"')
    text = text.replace('",_', '","')
    text = text.replace('_', '-')
    text = json.loads(text)
    df = pd.DataFrame({'date': list(text['data'].keys()),
                       'factor': list(text['data'].values())})
    df['date'] = df['date'].map(_fun_except)  # for null case
    if df['date'].dtypes == np.object:
        df['date'] = df['date'].astype(np.datetime64)
    df = df.drop_duplicates('date')
    df['factor'] = df['factor'].astype(float)
    return df
def weibo(content):
    st = get_st()
    add_weibo_url = 'https://m.weibo.cn/api/statuses/update'
    # build the POST parameters
    params = {
        'content': content,
        'st': st
    }
    params = parse.urlencode(params).encode('utf-8')
    req = request.Request(add_weibo_url, params, method="POST")
    res = request.urlopen(req)
    html = res.read().decode('utf-8')
    print(html)
def login():
    login_data = configparser.ConfigParser()
    login_data.read("user.ini")  # read the account credentials from user.ini
    username = login_data.get("LoginInfo", "email")
    password = login_data.get("LoginInfo", "password")

    url = 'https://www.v2ex.com/signin'
    req = request.Request(url)
    res = request.urlopen(req)
    html = res.read().decode('gbk', 'ignore')  # ignore characters that fail to decode
    soup = BeautifulSoup(html, 'html.parser', from_encoding='utf-8')
    inputs = soup.find_all('input')

    # build the login form parameters
    params = {
        inputs[1]["name"]: username,
        inputs[2]["name"]: password,
        inputs[3]["name"]: inputs[3]["value"],
        inputs[5]["name"]: inputs[5]["value"]
    }
    params = parse.urlencode(params).encode('utf-8')
    req = request.Request(url, params, method="POST")  # submit the login form
    res = request.urlopen(req)
def daily():
    url = 'https://www.v2ex.com/mission/daily'  # daily mission page
    req = request.Request(url)
    res = request.urlopen(req)
    html = res.read().decode("utf-8")
    soup = BeautifulSoup(html, 'html.parser', from_encoding='utf-8')
    inputs = soup.find_all('input')
    try:
        # extract the redeem link for the daily check-in
        daily_link = 'https://www.v2ex.com' + re.search("location.href = '(.*)';",
                                                        inputs[1]['onclick']).group(1)
        if daily_link == 'https://www.v2ex.com/balance':
            print('Already checked in today.')
            return False
        req = request.Request(daily_link)
        res = request.urlopen(req)
        html = res.read().decode("utf-8")
        soup = BeautifulSoup(html, 'html.parser', from_encoding='utf-8')
        print(soup.find('div', class_="message").text)
    except Exception as e:
        print('Daily check-in failed.')
def getQrcode(uuid):
    print('Please scan the QR code.')
    url = 'https://login.weixin.qq.com/qrcode/' + uuid
    req = request.Request(url)
    res = request.urlopen(req)
    f = open(QRImagePath, 'wb')
    f.write(res.read())
    f.close()
    time.sleep(1)
    if sys.platform.find('darwin') >= 0:
        subprocess.call(['open', QRImagePath])
    elif sys.platform.find('linux') >= 0:
        subprocess.call(['xdg-open', QRImagePath])
    else:
        os.startfile(QRImagePath)
def is_login():
    uuid = get_uuid()
    getQrcode(uuid)
    while True:
        url = ('https://login.wx.qq.com/cgi-bin/mmwebwx-bin/login?loginicon=true&uuid='
               + uuid + '&tip=0&r=' + str(time.time()) + '&_=' + str(time.time()))
        req = request.Request(url)
        res = request.urlopen(req)
        html = res.read().decode('utf-8')
        status = re.search("window.code=(.*);", html).group(1)
        if str(status) == '200':
            print('login success.')
            index_url = re.search("window.redirect_uri=\"(.*)\"", html).group(1)
            # closeQRImage
            if sys.platform.find('darwin') >= 0:  # for OSX with Preview
                os.system("osascript -e 'quit app \"Preview\"'")
            return index_url
        elif str(status) == '201':
            print('confirm the login on the phone.')
def getSKey(url):
    global globalData
    req = request.Request(url + '&fun=new&version=v2&lang=zh_CN')
    res = request.urlopen(req)
    html = res.read().decode('utf-8')
    globalData = {}
    globalData['wxuin'] = re.search("<wxuin>(.*)</wxuin>", html).group(1)
    globalData['skey'] = re.search("<skey>(.*)</skey>", html).group(1)
    globalData['wxsid'] = re.search("<wxsid>(.*)</wxsid>", html).group(1)
    globalData['pass_ticket'] = re.search("<pass_ticket>(.*)</pass_ticket>", html).group(1)
    webwxinit()  # initialise the web WeChat session
    return globalData
def webwxstatusnotify():
    global globalData
    url = ('https://wx.qq.com/cgi-bin/mmwebwx-bin/webwxstatusnotify?lang=zh_CN&pass_ticket=%s'
           % (globalData['pass_ticket']))
    params = {
        'BaseRequest': globalData['BaseRequest'],
        "Code": 3,
        "FromUserName": globalData['UserName'],
        "ToUserName": globalData['UserName'],
        "ClientMsgId": math.floor(time.time()*1000)
    }
    params = json.dumps(params)
    params = bytes(params, 'utf8')
    headers = {'Content-Type': 'application/json;charset=UTF-8'}
    req = request.Request(url, params, method="POST", headers=headers)
    res = request.urlopen(req)
    result = json.loads(res.read().decode('utf-8'))
    if result['BaseResponse']['Ret'] == 0:
        print('MsgID:%s' % (result['MsgID']))
def webwxbatchgetcontact():
    global globalData
    url = ('https://wx.qq.com/cgi-bin/mmwebwx-bin/webwxbatchgetcontact?type=ex&r=%s&lang=zh_CN&pass_ticket=%s'
           % (int(time.time()), globalData['pass_ticket']))
    params = {
        'BaseRequest': globalData['BaseRequest'],
        "Count": 1,
        "List": [
            {
                "UserName": "@@a900dd2fe01da33c7d6b5db0b18eaeea678fa542570cd7dc2a9f7182605561de",
                "EncryChatRoomId": ""
            }
        ]
    }
    params = json.dumps(params)
    params = bytes(params, 'utf8')
    headers = {'Content-Type': 'application/json;charset=UTF-8'}
    req = request.Request(url, params, method="POST", headers=headers)
    res = request.urlopen(req)
    result = json.loads(res.read().decode('utf-8'))
    print(result)
def sendMsg(content):
    global globalData
    url = ('https://wx.qq.com/cgi-bin/mmwebwx-bin/webwxsendmsg?lang=zh_CN&pass_ticket=%s'
           % (globalData['pass_ticket']))
    ClientMsgId = math.floor(time.time()*1000)
    params = {
        'BaseRequest': globalData['BaseRequest'],
        "Msg": {
            'ClientMsgId': ClientMsgId,
            'Content': content,
            'FromUserName': globalData['UserName'],
            'LocalID': ClientMsgId,
            'ToUserName': "filehelper",
            'Type': 1
        },
        'Scene': 0
    }
    params = json.dumps(params)
    params = bytes(params, 'utf8')
    headers = {'Content-Type': 'application/json;charset=UTF-8'}
    req = request.Request(url, params, method="POST", headers=headers)
    res = request.urlopen(req)
    result = json.loads(res.read().decode('utf-8'))
    print(result)
def run(self):
    install.run(self)
    platform = self._get_platform()
    library_full_path = self._get_install_full_path(
        self._get_base_install_path(), self._LIBRARY_NAME[platform][1])

    get_latest_request = Request('https://github.com/yamachu/World/releases/latest',
                                 headers={'Accept': 'application/json'})
    get_latest_response = urlopen(get_latest_request)
    response_str = get_latest_response.read().decode('utf-8')
    response_json = json.loads(response_str)
    latest_version = response_json['tag_name']

    urlretrieve("{}/{}/{}".format(
        self._DOWNLOAD_BASE_URL, latest_version, self._LIBRARY_NAME[platform][0]),
        library_full_path)
def upload(url, filename=None):
    from urllib.request import Request, urlopen
    from urllib.parse import urlsplit
    import shutil

    def getFilename(url, openUrl):
        if 'Content-Disposition' in openUrl.info():
            # If the response has Content-Disposition, try to get filename from it
            cd = dict([x.strip().split('=') if '=' in x else (x.strip(), '')
                       for x in openUrl.info().split(';')])
            if 'filename' in cd:
                fname = cd['filename'].strip("\"'")
                if fname:
                    return fname
        # if no filename was found above, parse it out of the final URL.
        return os.path.basename(urlsplit(openUrl.url)[2])

    r = urlopen(Request(url))
    success = None
    try:
        filename = filename or "/tmp/%s" % getFilename(url, r)
        with open(filename, 'wb') as f:
            shutil.copyfileobj(r, f)
        success = filename
    finally:
        r.close()
    return success
def present():
    url = 'http://api.openweathermap.org/data/2.5/weather?q=daejeon,kr&units=metric'
    service_key = '709f54e9062fdbadbe73863ff0ac30b5'
    queryParams = '&' + urlencode({quote_plus('APPID'): service_key})

    request = Request(url + queryParams)
    request.get_method = lambda: 'GET'
    response_body = (urlopen(request).read()).decode("utf-8")

    # parse the JSON response and pull out the fields we need
    WeatherData = json.loads(response_body)
    weather = WeatherData['weather'][0]
    weather = weather['description']
    temp_min = WeatherData['main']['temp_min']
    temp_max = WeatherData['main']['temp_max']
    humidity = WeatherData['main']['humidity']
    temp = WeatherData['main']['temp']
    present_weather = [weather, temp, temp_max, temp_min, humidity]
    return present_weather
def week():
    url = "http://api.openweathermap.org/data/2.5/forecast?q=daejeon,kr&units=metric"
    service_key = '709f54e9062fdbadbe73863ff0ac30b5'
    queryParams = '&' + urlencode({quote_plus('APPID'): service_key})

    request = Request(url + queryParams)
    request.get_method = lambda: 'GET'
    response_body = (urlopen(request).read()).decode("utf-8")
    WeatherData = json.loads(response_body)

    day1 = WeatherData["list"][5]
    day2 = WeatherData["list"][12]
    day3 = WeatherData["list"][19]
    day4 = WeatherData["list"][26]
    day5 = WeatherData['list'][34]
    day1 = [day1['main']['temp'], day1['weather'][0]["description"]]
    day2 = [day2['main']['temp'], day2['weather'][0]["description"]]
    day3 = [day3['main']['temp'], day3['weather'][0]["description"]]
    day4 = [day4['main']['temp'], day4['weather'][0]["description"]]
    day5 = [day5['main']['temp'], day5['weather'][0]["description"]]
    days = [day1, day2, day3, day4, day5]
    return days
def request(cls, uri, params={}, client=None, wrapper=FreesoundObject, method='GET', data=False):
    p = params if params else {}
    url = '%s?%s' % (uri, urlencode(p)) if params else uri
    d = urllib.urlencode(data) if data else None
    headers = {'Authorization': client.header}
    req = Request(url, d, headers)
    try:
        f = urlopen(req)
    except HTTPError as e:
        resp = e.read()
        if e.code >= 200 and e.code < 300:
            return resp
        else:
            return FreesoundException(e.code, json.loads(resp.decode("utf-8")))
    resp = f.read()
    f.close()
    result = None
    try:
        result = json.loads(resp.decode("utf-8"))
    except:
        raise FreesoundException(0, "Couldn't parse response")
    if wrapper:
        return wrapper(result, client)
    return result
def __create_request(self, uri, data=None):
    """
    Internal method to create http/https web request

    Args:
        uri (str): protocol://web_address:port
        data (str): data to send over web request

    Returns:
        web request object
    """
    request_ = urllib2.Request(url=uri, data=data)
    headers = self.__headers
    for header in headers:
        request_.add_header(header, headers[header])
    return request_
def download(self, data_loader):
    display_name = self.data_def["displayName"]
    bytes_downloaded = 0
    if "path" in self.data_def:
        path = self.data_def["path"]
    else:
        url = self.data_def["url"]
        req = Request(url, None, self.headers)
        print("Downloading '{0}' from {1}".format(display_name, url))
        with tempfile.NamedTemporaryFile(delete=False) as f:
            bytes_downloaded = self.write(urlopen(req), f)
            path = f.name
        self.data_def["path"] = path = f.name
    if path:
        try:
            if bytes_downloaded > 0:
                print("Downloaded {} bytes".format(bytes_downloaded))
            print("Creating {1} DataFrame for '{0}'. Please wait...".format(display_name, 'pySpark'))
            return data_loader(path)
        finally:
            print("Successfully created {1} DataFrame for '{0}'".format(display_name, 'pySpark'))
def request(cls, url, verify_cert=True):
    """
    Web request
    :param: url: The url link
    :return JSON object
    """
    req = urlrequest.Request(url, headers={'User-Agent': 'Mozilla/5.0'})
    # res = urlrequest.urlopen(url)
    if verify_cert:
        res = urlrequest.urlopen(
            req, timeout=RESTfulApiSocket.DEFAULT_URLOPEN_TIMEOUT)
    else:
        res = urlrequest.urlopen(
            req, context=ssl._create_unverified_context(),
            timeout=RESTfulApiSocket.DEFAULT_URLOPEN_TIMEOUT)
    try:
        res = json.loads(res.read().decode('utf8'))
        return res
    except:
        return {}
def __init__(self, search_page_url):
    self.search_page_url = search_page_url
    req = Request(
        search_page_url,
        data=None,
        headers={
            'User-Agent': UserAgent().chrome
        }
    )
    self.html = urlopen(req).read().decode('utf-8')
    self.soup = BeautifulSoup(self.html, 'html.parser')
    self.num_results = None
    for f in self.soup.find_all('strong'):
        if '????????' in f.text:
            if f.text.split()[0].isdigit():
                self.num_results = int(f.text.split()[0])
def follow_redirects(link, sites=None):
    """Follow redirects for the link as long as the redirects are on the given
    sites and return the resolved link."""
    def follow(url):
        return sites == None or urlparse.urlparse(url).hostname in sites

    class RedirectHandler(urllib2.HTTPRedirectHandler):
        def __init__(self):
            self.last_url = None

        def redirect_request(self, req, fp, code, msg, hdrs, newurl):
            self.last_url = newurl
            if not follow(newurl):
                return None
            r = urllib2.HTTPRedirectHandler.redirect_request(
                self, req, fp, code, msg, hdrs, newurl)
            r.get_method = lambda: 'HEAD'
            return r

    if not follow(link):
        return link
    redirect_handler = RedirectHandler()
    opener = urllib2.build_opener(redirect_handler)
    req = urllib2.Request(link)
    req.get_method = lambda: 'HEAD'
    try:
        with contextlib.closing(opener.open(req, timeout=1)) as site:
            return site.url
    except:
        return redirect_handler.last_url if redirect_handler.last_url else link
def __do_collect(coin, pk):
    """
    Collect order-depth data for the given coin.
    :param coin:
    :return:
    """
    url = depth_url.format(coin, random.random())
    req = request.Request(url=url, headers=headers)
    key = order_key_prefix + str(pk) + "_" + coin
    try:
        if RedisPool.conn.exists(key) == 1:
            print("exists : " + key)
            return
        time.sleep(0.2)
        with request.urlopen(req, timeout=3, context=context) as resp:
            d = resp.read().decode()
            if RedisPool.conn.set(key, d, ex=3600, nx=True) == 1:
                RedisPool.conn.rpush(order_coll_queue, key)
    except:
        exstr = traceback.format_exc()
        logger.warn(exstr)
        logger.warn("collect {} error".format(key))
        time.sleep(2)  # back off for 2 seconds after an error