我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用requests.Session()。
def get_cookie(account, password): s = requests.Session() payload = { 'login_uid1': account, 'login_pwd1': password, 'agreeRule': "1", 'loginsubmit': "??", # 'redirect_to': "http://www.creprice.cn", # 'testcookie': "1" } response = s.post(login_url, data=payload, allow_redirects=False) cookies = response.cookies.get_dict() logger.warning("get cookie success!!!(account is:%s)" % account) return json.dumps(cookies) # ?Cookies??Redis???
def upload_prediction(self, file_path): filename, signedRequest, headers, status_code = self.authorize(file_path) if status_code!=200: return status_code dataset_id, comp_id, status_code = self.get_current_competition() if status_code!=200: return status_code with open(file_path, 'rb') as fp: r = requests.Request('PUT', signedRequest, data=fp.read()) prepped = r.prepare() s = requests.Session() resp = s.send(prepped) if resp.status_code!=200: return resp.status_code r = requests.post(self._submissions_url, data={'competition_id':comp_id, 'dataset_id':dataset_id, 'filename':filename}, headers=headers) return r.status_code
def prepareLogin(self): self.clientid = 53999199 self.session = requests.Session() self.session.headers.update({ 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:27.0) Gecko/20100101 Firefox/27.0', 'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8' }) self.urlGet( 'https://ui.ptlogin2.qq.com/cgi-bin/login?daid=164&target=self&style=16&mibao_css=m_webqq&' + \ 'appid=501004106&enable_qlogin=0&no_verifyimg=1&s_url=http%3A%2F%2Fw.qq.com%2Fproxy.html&' + \ 'f_url=loginerroralert&strong_login=1&login_state=10&t=20131024001' ) self.session.cookies.update(dict( RK='OfeLBai4FB', ptcz='ad3bf14f9da2738e09e498bfeb93dd9da7540dea2b7a71acfb97ed4d3da4e277', pgv_pvi='911366144', pgv_info='ssid pgv_pvid=1051433466', qrsig='hJ9GvNx*oIvLjP5I5dQ19KPa3zwxNI62eALLO*g2JLbKPYsZIRsnbJIxNe74NzQQ' )) self.getAuthStatus() self.session.cookies.pop('qrsig')
def hltb(bot,trigger): if not trigger.group(2): return bot.say("Enter a game name to search.") game = trigger.group(2) url = "http://howlongtobeat.com/search_main.php?page=1" payload = {"queryString":game,"t":"games","sorthead":"popular","sortd":"Normal Order","length_type":"main","detail":"0"} test = {'Content-type':'application/x-www-form-urlencoded', 'User-Agent':'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/48.0.2564.97 Safari/537.36','origin':'https://howlongtobeat.com','referer':'https://howlongtobeat.com'} session = requests.Session() session.post(url, headers=test, data=payload) r = session.post(url, headers=test, data=payload) if len(r.content) < 250: return bot.say("No results.") bs = BeautifulSoup(r.content) first = bs.findAll("div", {"class":"search_list_details"})[0] name = first.a.text time = first.findAll('div')[3].text bot.say('{} - {}'.format(name, time))
def AkamaiEdgeGridConfig_Setup(config_file, section): config_file = os.path.expanduser(config_file) if debug: print "DEBUG: config_file", config_file #Currently unused. required_options = ['client_token','client_secret','host','access_token'] EdgeGridConfig = {} if os.path.isfile(config_file): config = ConfigParser.ConfigParser() config.readfp(open(config_file)) for key, value in config.items(section): # ConfigParser lowercases magically EdgeGridConfig[key] = value else: print "Missing configuration file. Run python gen_creds.py to get your credentials file set up once you've provisioned credentials in LUNA." exit() EdgeGridConfig['host'] = '%s://%s' % ('https', EdgeGridConfig['host']) if debug: print EdgeGridConfig return EdgeGridConfig #Setup a EdgeGrid Session using the EdgeGridConfig previously loaded.
def __init__(self, **kwargs): """ Initialize the class, get the necessary parameters """ self.user_agent = 'python-cachetclient' try: self.endpoint = kwargs['endpoint'] except KeyError: raise KeyError('Cachet API endpoint is required') self.api_token = kwargs.get('api_token', None) self.timeout = kwargs.get('timeout', None) self.verify = kwargs.get('verify', None) self.pagination = kwargs.get('pagination', False) self.http = requests.Session()
def setupSession(): session = requests.Session() session.header = { 'User-Agent': "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:34.0) Gecko/20100101 Firefox/34.0","Accept-Encoding": "gzip, deflate, sdch"} return session
def __init__(self, name: str = None, description: str = None, version: str = None): self.app_id = {'X-TBA-App-Id': ""} self.session = requests.Session() self.session = CacheControl(self.session, heuristic=LastModified()) self.session.headers.update(self.app_id) if name is not None: self.set_api_key(name, description, version)
def __init__(self): Analyzer.__init__(self) self.service = self.get_param('config.service', None, 'EmergingThreats service is missing') self.apikey = self.get_param('config.key', None, 'EmergingThreats apikey is missing') self.session = requests.Session() self.session.headers.update({"Authorization": self.apikey})
def __init__(self, key, client_id, client_version='0.1'): self.api_key = key self.session = requests.Session() self.url = 'https://safebrowsing.googleapis.com/v4/threatMatches:find?key={}'.format(key) self.client_id = client_id self.client_version = client_version
def run(path, quiet=False): """ Downloads all available hash files to a given path. :param path: Path to download directory :param quiet: If set to True, no progressbar is displayed """ if os.path.isdir(path): session = requests.Session() session.headers = {'User-agent': 'Mozilla/5.0 Chrome/57.0.2987.110'} max_num = max(list(map(int, re.sub(r'[\<\>]', '', '\n'.join(re.findall(r'\>[1-9][0-9]{2}\<', session.get('https://virusshare.com/hashes.4n6').text ) ) ).split('\n') ) ) ) if not quiet: p = progressbar.ProgressBar(max_value=max_num) for i in range(max_num): filename = str(i).zfill(3) + '.md5' if os.path.exists(os.path.join(path, filename)): continue if not quiet: p.update(i) url = URL + filename head = session.head(url) if head.status_code == 200: body = session.get(url, stream=True) with io.open(os.path.join(path, str(i).zfill(3) + '.md5'), mode='wb') as afile: for chunk in body.iter_content(chunk_size=1024): afile.write(b'' + chunk) body.close() else: print('Given path is not a directory.') sys.exit(1)
def session(self, email, password): session = requests.Session() session.headers.update({ 'content-type': 'application/x-www-form-urlencoded' }) response = session.post( ZoomClient.SIGNIN_URL, data={'email': email, 'password': password} ) return session, response
def check(auth_ref, args): # We call get_keystone_client here as there is some logic within to get a # new token if previous one is bad. keystone = get_keystone_client(auth_ref) auth_token = keystone.auth_token registry_endpoint = 'http://{ip}:9191'.format(ip=args.ip) s = requests.Session() s.headers.update( {'Content-type': 'application/json', 'x-auth-token': auth_token}) try: # /images returns a list of public, non-deleted images r = s.get('%s/images' % registry_endpoint, verify=False, timeout=10) is_up = r.ok except (exc.ConnectionError, exc.HTTPError, exc.Timeout): is_up = False except Exception as e: status_err(str(e)) metric_values = dict() status_ok() metric_bool('glance_registry_local_status', is_up) # only want to send other metrics if api is up if is_up: milliseconds = r.elapsed.total_seconds() * 1000 metric('glance_registry_local_response_time', 'double', '%.3f' % milliseconds, 'ms') metric_values['glance_registry_local_response_time'] = ('%.3f' % milliseconds) metric_influx(INFLUX_MEASUREMENT_NAME, metric_values)
def check(args): metadata_endpoint = ('http://{ip}:8775'.format(ip=args.ip)) is_up = True s = requests.Session() try: # looks like we can only get / (ec2 versions) without specifying # an instance ID and other headers versions = s.get('%s/' % metadata_endpoint, verify=False, timeout=10) milliseconds = versions.elapsed.total_seconds() * 1000 if not versions.ok or '1.0' not in versions.content.splitlines(): is_up = False except (exc.ConnectionError, exc.HTTPError, exc.Timeout) as e: is_up = False except Exception as e: status_err(str(e)) metric_values = dict() status_ok() metric_bool('nova_api_metadata_local_status', is_up) # only want to send other metrics if api is up if is_up: metric('nova_api_metadata_local_response_time', 'double', '%.3f' % milliseconds, 'ms') metric_values['nova_api_metadata_local_response_time'] = ('%.3f' % milliseconds) metric_influx(INFLUX_MEASUREMENT_NAME, metric_values)
def upload(path, imagestore_string='fabric:ImageStore', show_progress=False): # pylint: disable=too-many-locals,missing-docstring from sfctl.config import (client_endpoint, no_verify_setting, ca_cert_info, cert_info) import requests abspath = validate_app_path(path) basename = os.path.basename(abspath) endpoint = client_endpoint() cert = cert_info() ca_cert = True if no_verify_setting(): ca_cert = False elif ca_cert_info(): ca_cert = ca_cert_info() if all([no_verify_setting(), ca_cert_info()]): raise CLIError('Cannot specify both CA cert info and no verify') # Upload to either to a folder, or native image store only if 'file:' in imagestore_string: dest_path = path_from_imagestore_string(imagestore_string) upload_to_fileshare(abspath, os.path.join(dest_path, basename), show_progress) elif imagestore_string == 'fabric:ImageStore': with requests.Session() as sesh: sesh.verify = ca_cert sesh.cert = cert upload_to_native_imagestore(sesh, endpoint, abspath, basename, show_progress) else: raise CLIError('Unsupported image store connection string')
def __init__(self, service, role_source, configfile=DEFAULT_CONFIGFILE): self.service = service self.role_source = role_source self.api_endpoint = 'http://127.0.0.1:8500/v1' self.api_session = requests.Session() self.hostname = gethostname() self.short_hostname = self.hostname.split('.')[0] self.update_service = False self.valid_states = ['master', 'slave', 'fail'] self.configfile = configfile self.leader_uri = self.api_endpoint + '/kv/session/' + self.service + '/leader'
def current_leader_session_id(self): check_current_leader = self.api_session.get(self.leader_uri) if check_current_leader.status_code == 200: return check_current_leader.json()[0].get('Session')
def __init__(self): self.URL_vendor = 'http://shop.bdgastore.com/' self.URL_product = 'http://shop.bdgastore.com/collections/footwear/products/y-3-pureboost-zg' self.URL_addToCart = 'http://shop.bdgastore.com/cart/add.js' self.URL_cart = 'http://shop.bdgastore.com/cart' self.user_size = '8' self.user_session = requests.Session()
def get_data(username, no): if no == 0: z = 'followers' else: z = 'following' # these lines of code gets the list of followers or the following on the first page # when there are no further pages of followers or following. And if there are go forward with the next page s = requests.Session() final=[] x = 1 pages = [""] data = [] while(pages != [] and x <= max_number/no_per_page): r = s.get('https://github.com/' + username + '?page=' + str(x) + '&tab=' + z) #first getting all the followers for z=0, and following for z=1 soup = BeautifulSoup(r.text) data = data + soup.find_all("div", {"class" : "d-table col-12 width-full py-4 border-bottom border-gray-light"}) pages = soup.find_all("div", {"class" : "pagination"}) x += 1 # getting company and area. for i in data: username = i.find_all("a")[0]['href'] try: company = i.find_all("span", {"class" : "mr-3"})[0].text.strip() except: company = "xxxxx" try: area = i.find_all("p", {"class" : "text-gray text-small mb-0"})[0].text.strip() except: area = "xxxxx" soup2 = BeautifulSoup(str(i)) name = soup2.find_all("span",{"class" : "f4 link-gray-dark"})[0].text final.append([username,company,area,name]) return final
def scrape_org(org,main_list,organisation): s = requests.Session() r = s.get('https://github.com/orgs/'+org+'/people') soup = BeautifulSoup(r.text) data = soup.find_all("li", {"class" : "table-list-item member-list-item js-bulk-actions-item "}) for i in data: soup2=BeautifulSoup(str(i)) data2=soup2.find_all("div",{"class" : "table-list-cell py-3 pl-3 v-align-middle member-avatar-cell css-truncate pr-0"}) username = data2[0].find_all("a")[0]['href'] data3 = soup2.find_all("div",{"class" : "table-list-cell py-3 v-align-middle member-info css-truncate pl-3"}) name = data3[0].find_all("a")[0].text.strip() main_list.append([username,name])
def update_org_list(main_list,organisation): s = requests.Session() for i in main_list: r = s.get('https://github.com/'+i[0]) soup = BeautifulSoup(r.text) data = soup.find_all("li",{"aria-label":"Organization"}) try: if data[0].text not in organisation: organisation.append(data[0].text) except: continue return organisation
def scrape_org_general(org,main_list,organisation): org.replace(" ","+") s = requests.Session() count = 1 k = "https://github.com/search?p="+str(count)+"&q="+org+"+type%3Auser&type=Users&utf8=%E2%9C%93" r = s.get(k) soup = BeautifulSoup(r.text,"lxml") data = soup.find_all("div",{"class":"user-list-info ml-2"}) while data!=[]: for i in data: username = i.find_all("a")[0]['href'] name = i.find_all("span",{"class":"f4 ml-1"})[0].text.strip() main_list.append([username,name]) count+=1 k = "https://github.com/search?p="+str(count)+"&q="+org+"+type%3Auser&type=Users&utf8=%E2%9C%93" r = s.get(k) soup = BeautifulSoup(r.text,"lxml") data = soup.find_all("div",{"class":"user-list-info ml-2"}) # scraping the github pages
def setUp(self, conf='/test.cfg'): settings = Settings() settings.setFile(base_path + conf) Env.set('settings', settings) Env.set('http_opener', requests.Session()) Env.set('cache', NoCache()) YGG.log.logger.setLevel('DEBUG') YGG.log.logger.addHandler(handler) return YGG()
def get_av_magnet(avcode): Referer={ "Referer": "123" } s = requests.Session() gid,uc = download_image(avcode) params={ 'gid':gid, 'uc':uc, 'lang':'zh' } r2 = s.get("http://www.javbus.com/ajax/uncledatoolsbyajax.php",params=params, proxies=proxy, headers=Referer) soup = BeautifulSoup(r2.content.decode('utf-8', 'ignore'),'html.parser') trs = soup.findAll('tr',attrs={"height":"35px"}) print '[*] get magnet link' for tr in trs: trsoup = BeautifulSoup(str(tr).decode('utf-8', 'ignore'),'html.parser') td2 = trsoup.findAll('td',attrs={"style":"text-align:center;white-space:nowrap"}) a = td2[0].find('a') magnet = a.get("href") #unicode object size = a.text.strip() print '[*] '+magnet,size os.chdir("../..")
def _connect(self): self._session = requests.Session() adaptator = requests.adapters.HTTPAdapter() adaptator.max_retries = HttpRetry( read=self.READ_MAX_RETRIES, connect=self.CONN_MAX_RETRIES, backoff_factor=self.BACKOFF_FACTOR) self._session.mount(str(self.url), adaptator) self.__conn = self._session.get( self.url, stream=True, timeout=(self.CONN_TIMEOUT, self.READ_TIMEOUT))
def makeRequestSession(self): host = self.requestHandler.headers.getheader('host',None) path = self.requestHandler.path self.url = self.uri + "://" + host + path session = requests.Session() for header in self.requestHandler.headers.keys(): if header != 'content-length': session.headers.update({header : self.requestHandler.headers.getheader(header)}) if self.proxies: session.proxies = self.proxies return session
def setUp(self): self.tls_adapter = CbAPISessionAdapter(force_tls_1_2=True) self.session = requests.Session() self.session.mount("https://", self.tls_adapter)
def _url_to_key(self, url): session = requests.Session() return self.create_key(session.prepare_request(requests.Request('GET', url)))
def uninstall_cache(): """ Restores ``requests.Session`` and disables cache """ _patch_session_factory(OriginalSession)
def __init__(self, server, ssl_verify=True, token=None, ignore_system_proxy=False, use_https_proxy=None, ssl_verify_hostname=True, use_http_proxy=None): """ Requires: server - URL to the Carbon Black server. Usually the same as the web GUI. ssl_verify - verify server SSL certificate token - this is for CLI API interface """ # We will uncomment this once cbapi 1.0.0 is released # warn("CbApi is deprecated and will be removed as of cbapi 2.0.0.", DeprecationWarning) if not server.startswith("http"): raise TypeError("Server must be URL: e.g, http://cb.example.com") if token is None: raise TypeError("Missing required authentication token.") self.server = server.rstrip("/") self.ssl_verify = ssl_verify self.token = token self.token_header = {'X-Auth-Token': self.token} self.session = requests.Session() if not ssl_verify_hostname: self.session.mount("https://", HostNameIgnoringAdapter()) self.proxies = {} if ignore_system_proxy: # see https://github.com/kennethreitz/requests/issues/879 self.proxies = { 'no': 'pass' } else: if use_http_proxy: self.proxies['http'] = use_http_proxy if use_https_proxy: self.proxies['https'] = use_https_proxy
def send(self, *a, **kw): a[0].url = a[0].url.replace(urllib.quote("<"), "<") a[0].url = a[0].url.replace(urllib.quote(" "), " ") a[0].url = a[0].url.replace(urllib.quote(">"), ">") return requests.Session.send(self, *a, **kw)
def index(): try: api_key = os.environ['DASHBOARD_DUTY_KEY'] service_key = os.environ['DASHBOARD_DUTY_SERVICE'] except KeyError: logging.error('Missing Environment Variable(s)') exit(1) session = requests.Session() d_session = dashboard_duty.Core(session, api_key, service_key) service = d_session.service() incidents = d_session.incident(service['id']) if service['escalation_policy']['escalation_rules'][0]['targets'][0]['type'] == 'schedule_reference': service_id = service['escalation_policy']['escalation_rules'][0]['targets'][0]['id'] oncall = d_session.oncall_schedule_policy(service_id) elif service['escalation_policy']['escalation_rules'][0]['targets'][0]['type'] == 'user_reference': username = service['escalation_policy']['escalation_rules'][0]['targets'][0]['summary'] oncall = d_session.oncall_user_policy(username) else: logging.error('Unable to handle oncall policy for %s' % service_key) exit(1) return render_template('index.html', service=service, incidents=incidents, oncall=oncall)
def list_DL(List): List=List[1:] List2=[] ID_list=[] List_dl=[] k=0 j=0 p=0 while p*2<len(List): List2.append(List[p*2]) p+=1 for i in List2: #-------------------- s1=requests.Session() s1.get(url0) url_list=s1.get(List2[j], headers=headers) #-------------------- #a1=requests.get(List2[j]) #html=a1.text html=url_list.text a2=html.find('<a href="javascript:void(0)" link="') a2_len=len('<a href="javascript:void(0)" link="') if a2<0: a2=html.find("RJ.currentMP3Url = 'mp3/") a2_len=len("RJ.currentMP3Url = 'mp3/") a3=html.find('" target="_blank" class="mp3_download_link">') if a3<0: a3=html.find("RJ.currentMP3 = '") a4=html[a2+a2_len:a3] if a4.find("'")>0: a4=a4[:a4.find("'")] ID_list.append(a4+'.mp3') while k < len(ID_list): List_dl.append(check_host(l_mp3[:4],ID_list[k])[0]) k+=1 else: List_dl.append(a4) j+=1 return(List_dl)
def __init__(self, service_urls=None, user_agent=DEFAULT_USER_AGENT): self.session = requests.Session() self.session.headers.update({ 'User-Agent': user_agent, }) self.service_urls = service_urls or ['translate.google.com'] self.token_acquirer = TokenAcquirer(session=self.session, host=self.service_urls[0]) # Use HTTP2 Adapter if hyper is installed try: # pragma: nocover from hyper.contrib import HTTP20Adapter self.session.mount(urls.BASE, HTTP20Adapter()) except ImportError: # pragma: nocover pass
def __init__(self, tkk='0', session=None, host='translate.google.com'): self.session = session or requests.Session() self.tkk = tkk self.host = host if 'http' in host else 'https://' + host
def AkamaiEdgeGridSession_Setup(AkamaiEdgeGridConfig): session = requests.Session() session.auth = EdgeGridAuth( client_token=AkamaiEdgeGridConfig['client_token'], client_secret=AkamaiEdgeGridConfig['client_secret'], access_token=AkamaiEdgeGridConfig['access_token'] ) return session #Actually call akamai and return the result.
def __init__(self, host="127.0.0.1", port=46657): # Tendermint endpoint self.uri = "http://{}:{}".format(host, port) # Keep a session self.session = requests.Session() # Request counter for json-rpc self.request_counter = itertools.count() # request headers self.headers = { 'user-agent': AGENT, 'Content-Type': 'application/json' }
def __init__(self, api_key=None, endpoint=None): if api_key is None or endpoint is None: try: from pymatgen import SETTINGS except ImportError: warnings.warn('MPResterBase: not using pymatgen SETTINGS!') SETTINGS = {} if api_key is not None: self.api_key = api_key else: self.api_key = SETTINGS.get("PMG_MAPI_KEY", "") if endpoint is not None: self.preamble = endpoint else: self.preamble = SETTINGS.get( "PMG_MAPI_ENDPOINT", "https://www.materialsproject.org/rest/v2" ) if not self.api_key: raise ValueError('API key not set. Run `pmg config --add PMG_MAPI_KEY <USER_API_KEY>`.') self.session = requests.Session() self.session.headers = {"x-api-key": self.api_key}
def __init__(self, broker="http://127.0.0.1:19820/api", encoding="utf-8", enc_key=None, enc_iv=None): super().__init__() self._endpoint = broker self._encoding = "utf-8" if enc_key == None or enc_iv == None: self._transport_enc = False self._transport_enc_key = None self._transport_enc_iv = None self._cipher = None else: self._transport_enc = True self._transport_enc_key = enc_key self._transport_enc_iv = enc_iv backend = default_backend() self._cipher = Cipher(algorithms.AES( enc_key), modes.CBC(enc_iv), backend=backend) self._session = requests.Session() self._event_dict = {'logon': self.on_login, 'logoff': self.on_logout, 'ping': self.on_ping, 'query_data': self.on_query_data, 'send_order': self.on_insert_order, 'cancel_order': self.on_cancel_order_event, 'get_quote': self.on_get_quote} self.client_id = '' self.account_id = ''
def test_hearthhead(self): with requests.Session() as s: self.assertEqual(scrape.getHearthHeadId('Quick Shot', 'Spell', s), 'quick-shot') self.assertEqual(scrape.getHearthHeadId('Undercity Valiant', 'Minion', s), 'undercity-valiant') self.assertEqual(scrape.getHearthHeadId('Gorehowl', 'Weapon', s), 'gorehowl') self.assertEqual(scrape.getHearthHeadId('V-07-TR-0N', 'Minion', s), 'v-07-tr-0n') self.assertEqual(scrape.getHearthHeadId("Al'Akir the Windlord", 'Minion', s), 'alakir-the-windlord')
def test_Hearthpwn(self): with requests.Session() as s: self.assertEqual(scrape.getHearthpwnIdAndUrl('Quick Shot', 'Blackrock Mountain', 'Spell', False, s), (14459, 'https://media-hearth.cursecdn.com/avatars/328/302/14459.png')) self.assertEqual(scrape.getHearthpwnIdAndUrl('Upgrade!', 'Classic', 'Spell', False, s), (638, 'https://media-hearth.cursecdn.com/avatars/330/899/638.png'))
def loadTokens(tokens = {}, wantedTokens = {}): resultCards = {} with requests.Session() as session: for name, ids in wantedTokens.items(): card = None if 'id' in ids: card = tokens[ids['id']] if name != card['name']: log.warning('loadTokens() names do not match: %s - %s', name, tokens[ids['id']]['name']) if 'id' not in ids: for token in tokens.values(): if name == token['name']: if card: log.warning('loadTokens() found token again: %s', name) card = token if not card: log.warning('loadTokens() could not find: %s', name) exit() r = session.get('http://www.hearthpwn.com/cards/{}'.format(ids['hpwn'])) r.raise_for_status() image = fromstring(r.text).xpath('//img[@class="hscard-static"]')[0].get('src') if not image: image = 'https://media-hearth.cursecdn.com/avatars/148/738/687.png' card['cdn'] = image.replace('http://', 'https://').lower() card['hpwn'] = ids['hpwn'] card['head'] = getHearthHeadId(card['name'], "ignored", "ignored") # since jade golem: overwrite scraped stats with prepared ones card['atk'] = ids.get('atk', card['atk']) card['cost'] = ids.get('cost', card['cost']) card['hp'] = ids.get('hp', card['hp']) resultCards[card['name']] = card print('.', end='') return resultCards
def download_file_from_google_drive(id, destination): URL = "https://docs.google.com/uc?export=download" session = requests.Session() response = session.get(URL, params={ 'id': id }, stream=True) token = get_confirm_token(response) if token: params = { 'id' : id, 'confirm' : token } response = session.get(URL, params=params, stream=True) save_response_content(response, destination)
def __init__(self, config): self.dbFilename = "downloads.db" self.showsFilename = "SHOWS" self.sourceIP = "" self.transmissionHostRemote = "" self.userpass = "" self.downloadQuality = [720, 1080] self.speakDownload = True # We don't retain 'proxs' or 'requestsFromSource' in this # instance since they get stored/retained inside TorrentApiController proxs = {} requestsFromSource = requests.Session() self.establishConfiguration(config, requestsFromSource, proxs) self.torrentController = TorrentApiController(requestsFromSource, proxs) self.establishDatabase()
def _reset_session(self): ''' Get a new request session and try logging into the current CVP node. If the login succeeded None will be returned and self.session will be valid. If the login failed then an exception error will be returned and self.session will be set to None. ''' self.session = requests.Session() error = None try: self._login() except (ConnectionError, CvpApiError, CvpRequestError, CvpSessionLogOutError, HTTPError, ReadTimeout, Timeout, TooManyRedirects) as error: self.log.error(error) # Any error that occurs during login is a good reason not to use # this CVP node. self.session = None return error
def getSession(self): if self._session is None: self._session = requests.Session() return self._session
def __init__(self, user_mode): self.user_mode = user_mode self.interface = None self.quota = None self.is_online = None self.new_api = None self.json_decoder = json.JSONDecoder() self.session = requests.Session() self.csrf_token = None self.resolver = dns.resolver.Resolver() self.resolver.nameservers = ['172.16.0.1'] self.api_host_ip = self.resolve_url(self.api_host) self.api_host_new_ip = self.resolve_url(self.api_host_new)
def gethtml(link): user_agent = "Mozilla/5.0 (Windows NT 6.1; rv:38.0) Gecko/20100101 Firefox/38.0" headers={'user-agent':user_agent} s = requests.Session() try: res = s.get(link,headers=headers).text except requests.exceptions.InvalidSchema: req=urllib2.Request(link,None,headers) r = urllib2.urlopen(req) res = r.read().decode('utf8') return res
def main(): # Start a session so we can have persistant cookies # Session() >> http://docs.python-requests.org/en/latest/api/#request-sessions session = requests.Session() # This is the form data that the page sends when logging in login_data = { 'username': RegisterNumber, 'password': DateofBirth, 'submit': 'id', } print login_data # Authenticate r = session.post(URL, data = login_data) # Try accessing a page that requires you to be logged in r = session.get('http://sdpdr.nic.in/annauniv/result') web = QWebView() web.load(QUrl("http://sdpdr.nic.in/annauniv/result")) #web.show() printer = QPrinter() printer.setPageSize(QPrinter.A4) printer.setOutputFormat(QPrinter.PdfFormat) printer.setOutputFileName("result.pdf") # convertion of page to pdf format
def __init__(self, word, dirpath=None, processNum=30): #if " " in word: #raise AttributeError("This Script Only Support Single Keyword!") self.word = word self.char_table = {ord(key): ord(value) for key, value in BaiduImgDownloader.char_table.items()} if not dirpath: dirpath = os.path.join(sys.path[0], 'results') self.dirpath = dirpath self.jsonUrlFile = os.path.join(sys.path[0], 'jsonUrl.txt') self.logFile = os.path.join(sys.path[0], 'logInfo.txt') self.errorFile = os.path.join(sys.path[0], 'errorUrl.txt') if os.path.exists(self.errorFile): os.remove(self.errorFile) if not os.path.exists(self.dirpath): os.mkdir(self.dirpath) self.pool = Pool(30) self.session = requests.Session() self.session.headers = BaiduImgDownloader.headers self.queue = Queue() self.messageQueue = Queue() self.index = 0 self.promptNum = 10 self.lock = threading.Lock() self.delay = 1.5 self.QUIT = "QUIT" self.printPrefix = "**" self.processNum = processNum