我们从Python开源项目中,提取了以下40个代码示例,用于说明如何使用requests.utils()。
def loadCartAndCheckout(self): #Import Cookies driver = webdriver.Chrome(executable_path="./chromedriver") driver.delete_all_cookies() driver.get(self.URL_cart) cookies = requests.utils.dict_from_cookiejar(self.user_session.cookies) for cookie in cookies.items(): cookie_dict = {'name': '', 'value': '', 'path': '/'} cookie_dict['name'] = cookie[0] cookie_dict['value'] = cookie[1] driver.add_cookie(cookie_dict) driver.get(self.URL_cart) #time.sleep(5) #driver.quit()
def acquire_authentication_cookie(self, options): """Retrieve SPO auth cookie""" logger = self.logger(self.acquire_authentication_cookie.__name__) url = options['endpoint'] session = requests.session() logger.debug_secrets("session: %s\nsession.post(%s, data=%s)", session, url, self.token) session.post(url, data=self.token, headers={'Content-Type': 'application/x-www-form-urlencoded'}) logger.debug_secrets("session.cookies: %s", session.cookies) cookies = requests.utils.dict_from_cookiejar(session.cookies) logger.debug_secrets("cookies: %s", cookies) if 'FedAuth' in cookies and 'rtFa' in cookies: self.FedAuth = cookies['FedAuth'] self.rtFa = cookies['rtFa'] return True self.error = "An error occurred while retrieving auth cookies" logger.error(self.error) return False
def load_session_from_file(self, username: str, filename: Optional[str] = None) -> None: """Internally stores :class:`requests.Session` object loaded from file. If filename is None, the file with the default session path is loaded. :raises FileNotFoundError: If the file does not exist. """ if filename is None: filename = get_default_session_filename(username) with open(filename, 'rb') as sessionfile: session = requests.Session() session.cookies = requests.utils.cookiejar_from_dict(pickle.load(sessionfile)) session.headers.update(self._default_http_header()) session.headers.update({'X-CSRFToken': session.cookies.get_dict()['csrftoken']}) self._log("Loaded session from %s." % filename) self.session = session self.username = username
def authenticated(func): def wrapper(self, *args, **kwargs): success = False # ??????cookie??, ???cookie???? if 'z_c0' in requests.utils.dict_from_cookiejar(self.cookies): from ..url import URL r = self._execute(method="get", url=URL.profile(user_slug="zhijun-liu")) success = r.ok while not success: account = input("???Email??????:") password = input("?????:") obj = Account() data = obj.login(account, password) if data.get("r") == 0: success = True self.cookies = obj.cookies else: print(data.get("msg")) else: return func(self, *args, **kwargs) return wrapper
def _search(self, limit, format): ''' Returns a list of result objects, with the url for the next page bing search url. ''' url = self.QUERY_URL.format(requests.utils.quote("'{}'".format(self.query)), min(50, limit), self.current_offset, format) r = requests.get(url, auth=("", self.api_key)) try: json_results = r.json() except ValueError as vE: if not self.safe: raise PyBingWebException("Request returned with code %s, error msg: %s" % (r.status_code, r.text)) else: print ("[ERROR] Request returned with code %s, error msg: %s. \nContinuing in 5 seconds." % ( r.status_code, r.text)) time.sleep(5) packaged_results = [WebResult(single_result_json) for single_result_json in json_results['d']['results']] self.current_offset += min(50, limit, len(packaged_results)) return packaged_results
def _search(self, limit, format): ''' Returns a list of result objects, with the url for the next page bing search url. ''' url = self.QUERY_URL.format(requests.utils.quote("'{}'".format(self.query)), min(50, limit), self.current_offset, format) r = requests.get(url, auth=("", self.api_key)) try: json_results = r.json() except ValueError as vE: if not self.safe: raise PyBingVideoException("Request returned with code %s, error msg: %s" % (r.status_code, r.text)) else: print ("[ERROR] Request returned with code %s, error msg: %s. \nContinuing in 5 seconds." % ( r.status_code, r.text)) time.sleep(5) packaged_results = [VideoResult(single_result_json) for single_result_json in json_results['d']['results']] self.current_offset += min(50, limit, len(packaged_results)) return packaged_results
def _search(self, limit, format): ''' Returns a list of result objects, with the url for the next page bing search url. ''' url = self.QUERY_URL.format(requests.utils.quote("'{}'".format(self.query)), min(50, limit), self.current_offset, format) r = requests.get(url, auth=("", self.api_key)) try: json_results = r.json() except ValueError as vE: if not self.safe: raise PyBingNewsException("Request returned with code %s, error msg: %s" % (r.status_code, r.text)) else: print ("[ERROR] Request returned with code %s, error msg: %s. \nContinuing in 5 seconds." % ( r.status_code, r.text)) time.sleep(5) packaged_results = [NewsResult(single_result_json) for single_result_json in json_results['d']['results']] self.current_offset += min(50, limit, len(packaged_results)) return packaged_results
def __init__(self, app, disable_background_sync): self.user_agent = 'Boartty/%s %s' % (boartty.version.version_info.release_string(), requests.utils.default_user_agent()) self.version = (0, 0, 0) self.offline = False self.app = app self.log = logging.getLogger('boartty.sync') self.queue = MultiQueue([HIGH_PRIORITY, NORMAL_PRIORITY, LOW_PRIORITY]) self.result_queue = queue.Queue() self.session = requests.Session() self.token = 'Bearer %s' % (self.app.config.token) self.submitTask(GetVersionTask(HIGH_PRIORITY)) self.submitTask(SyncOwnUserTask(HIGH_PRIORITY)) if not disable_background_sync: self.submitTask(UpdateStoriesTask(HIGH_PRIORITY)) self.submitTask(SyncProjectListTask(HIGH_PRIORITY)) self.submitTask(SyncUserListTask(HIGH_PRIORITY)) self.submitTask(SyncProjectSubscriptionsTask(NORMAL_PRIORITY)) self.submitTask(SyncSubscribedProjectsTask(NORMAL_PRIORITY)) self.submitTask(SyncBoardsTask(NORMAL_PRIORITY)) self.submitTask(SyncWorklistsTask(NORMAL_PRIORITY)) #self.submitTask(SyncSubscribedProjectBranchesTask(LOW_PRIORITY)) #self.submitTask(SyncOutdatedChangesTask(LOW_PRIORITY)) #self.submitTask(PruneDatabaseTask(self.app.config.expire_age, LOW_PRIORITY)) self.periodic_thread = threading.Thread(target=self.periodicSync) self.periodic_thread.daemon = True self.periodic_thread.start()
def copy_session(session: requests.Session) -> requests.Session: """Duplicates a requests.Session.""" new = requests.Session() new.cookies = \ requests.utils.cookiejar_from_dict(requests.utils.dict_from_cookiejar(session.cookies)) new.headers = session.headers.copy() return new
def save_session_to_file(self, filename: Optional[str] = None) -> None: """Saves internally stored :class:`requests.Session` object.""" if filename is None: filename = get_default_session_filename(self.username) dirname = os.path.dirname(filename) if dirname != '' and not os.path.exists(dirname): os.makedirs(dirname) os.chmod(dirname, 0o700) with open(filename, 'wb') as sessionfile: os.chmod(filename, 0o600) pickle.dump(requests.utils.dict_from_cookiejar(self.session.cookies), sessionfile) self._log("Saved session to %s." % filename)
def load_cookies(self, path): with open(path, 'rb') as f: self.session.cookies = requests.utils.cookiejar_from_dict(pickle.load(f))
def save_cookies(self, path): with open(path, 'wb') as f: cookies_dic = requests.utils.dict_from_cookiejar(self.session.cookies) pickle.dump(cookies_dic, f)
def login(ctx, url, username, password): """Performs login on the remote server at the specified URL.""" login_url = urljoin(url, "/hub/login") payload = {"username": username, "password": password} # Unfortunately, jupyterhub handles the afterlogin with an immediate # redirection, meaning that we have to check for a 302 and prevent # redirection in order to capture the cookies. try: response = requests.post(login_url, payload, verify=False, allow_redirects=False) except Exception as e: print("Could not perform request. {}".format(e), file=sys.stderr) sys.exit(1) if response.status_code == 302: cookies_dict = requests.utils.dict_from_cookiejar(response.cookies) cred = Credentials(url, username, cookies_dict) cred.write(ctx.obj.credentials_file) else: print("Failed to perform login. Server replied with error: {}".format( response.status_code), file=sys.stderr) sys.exit(1) # -------------------------------------------------------------------------
def test4(): from requests.utils import get_netrc_auth url = "http://www.126.com" print get_netrc_auth(url)
def _search(self, limit, format): ''' Returns a list of result objects, with the url for the next page bing search url. Image filters: Array of strings that filter the response the API sends based on size, aspect, color, style, face or any combination thereof. Valid values are: Size:Small, Size:Medium, Size:Large, Size:Width:[Width], Size:Height:[Height], Aspect:Square, Aspect:Wide, Aspect:Tall, Color:Color, Color:Monochrome, Style:Photo, Style:Graphics, Face:Face, Face:Portrait, Face:Other. ''' url = self.QUERY_URL.format(requests.utils.quote("'{}'".format(self.query)), min(50, limit), self.current_offset, format, requests.utils.quote("'{}'".format(self.image_filters))) r = requests.get(url, auth=("", self.api_key)) try: json_results = r.json() except ValueError as vE: if not self.safe: raise PyBingImageException("Request returned with code %s, error msg: %s" % (r.status_code, r.text)) else: print ("[ERROR] Request returned with code %s, error msg: %s. \nContinuing in 5 seconds." % ( r.status_code, r.text)) time.sleep(5) packaged_results = [ImageResult(single_result_json) for single_result_json in json_results['d']['results']] self.current_offset += min(50, limit, len(packaged_results)) return packaged_results
def save_cookies(self, path): with open(path, 'wb') as f: cookies_dic = requests.utils.dict_from_cookiejar(self.session.cookies) pickle.dump(cookies_dic, f) # ??????????
def save_cookies(self, path): with open(path, 'wb') as f: cookies_dic = requests.utils.dict_from_cookiejar(self.session.cookies) pickle.dump(cookies_dic, f) #??????
def login(): #cf = open('.cookie','r') if os.path.exists(cookie_file_name): cf = open(cookie_file_name,'r') cookies = json.load(cf) s.cookies.update(cookies) logging.info("Load cookies from cookie file: " + str(cookies)) r = s.get(website+"/user/login",headers = headers) print("Old cookies:" + str(r.headers)) else: user = config.get('user','id') password = config.get('user','password') logging.info("Login as " + user) url = website + '/User/Login/ajaxLogin' payload = 'account=%s&password=%s&from=loginpage&remember=0&url_back='%(user, password) r = s.post(url, headers=headers, data=payload) cookies = requests.utils.dict_from_cookiejar(r.cookies) logging.info("Login cookie " + str(cookies)) print("New Cookies:" + str(cookies)) with open(cookie_file_name,'w') as cf: json.dump(cookies, cf)
def test_html_charset(self): """HTML5 meta charset attribute""" content = '<meta charset="UTF-8">' encodings = requests.utils.get_encodings_from_content(content) assert len(encodings) == 1 assert encodings[0] == 'UTF-8'
def test_html4_pragma(self): """HTML4 pragma directive""" content = '<meta http-equiv="Content-type" content="text/html;charset=UTF-8">' encodings = requests.utils.get_encodings_from_content(content) assert len(encodings) == 1 assert encodings[0] == 'UTF-8'
def test_xhtml_pragma(self): """XHTML 1.x served with text/html MIME type""" content = '<meta http-equiv="Content-type" content="text/html;charset=UTF-8" />' encodings = requests.utils.get_encodings_from_content(content) assert len(encodings) == 1 assert encodings[0] == 'UTF-8'
def test_xml(self): """XHTML 1.x served as XML""" content = '<?xml version="1.0" encoding="UTF-8"?>' encodings = requests.utils.get_encodings_from_content(content) assert len(encodings) == 1 assert encodings[0] == 'UTF-8'
def test_precedence(self): content = ''' <?xml version="1.0" encoding="XML"?> <meta charset="HTML5"> <meta http-equiv="Content-type" content="text/html;charset=HTML4" /> '''.strip() encodings = requests.utils.get_encodings_from_content(content) assert encodings == ['HTML5', 'HTML4', 'XML']
def test_super_len_correctly_calculates_len_of_partially_read_file(self): """Ensure that we handle partially consumed file like objects.""" from requests.utils import super_len s = StringIO.StringIO() s.write('foobarbogus') assert super_len(s) == 0
def test_get_environ_proxies_ip_ranges(self): """Ensures that IP addresses are correctly matches with ranges in no_proxy variable.""" from requests.utils import get_environ_proxies os.environ['no_proxy'] = "192.168.0.0/24,127.0.0.1,localhost.localdomain,172.16.1.1" assert get_environ_proxies('http://192.168.0.1:5000/') == {} assert get_environ_proxies('http://192.168.0.1/') == {} assert get_environ_proxies('http://172.16.1.1/') == {} assert get_environ_proxies('http://172.16.1.1:5000/') == {} assert get_environ_proxies('http://192.168.1.1:5000/') != {} assert get_environ_proxies('http://192.168.1.1/') != {}
def test_get_environ_proxies(self): """Ensures that IP addresses are correctly matches with ranges in no_proxy variable.""" from requests.utils import get_environ_proxies os.environ['no_proxy'] = "127.0.0.1,localhost.localdomain,192.168.0.0/24,172.16.1.1" assert get_environ_proxies( 'http://localhost.localdomain:5000/v1.0/') == {} assert get_environ_proxies('http://www.requests.com/') != {}
def test_guess_filename_when_int(self): from requests.utils import guess_filename assert None is guess_filename(1)
def test_guess_filename_with_file_like_obj(self): from requests.utils import guess_filename from requests import compat fake = type('Fake', (object,), {'name': b'value'})() guessed_name = guess_filename(fake) assert b'value' == guessed_name assert isinstance(guessed_name, compat.bytes)
def test_guess_filename_with_unicode_name(self): from requests.utils import guess_filename from requests import compat filename = b'value'.decode('utf-8') fake = type('Fake', (object,), {'name': filename})() guessed_name = guess_filename(fake) assert filename == guessed_name assert isinstance(guessed_name, compat.str)
def test_is_ipv4_address(self): from requests.utils import is_ipv4_address assert is_ipv4_address('8.8.8.8') assert not is_ipv4_address('8.8.8.8.8') assert not is_ipv4_address('localhost.localdomain')
def test_is_valid_cidr(self): from requests.utils import is_valid_cidr assert not is_valid_cidr('8.8.8.8') assert is_valid_cidr('192.168.1.0/24')
def test_dotted_netmask(self): from requests.utils import dotted_netmask assert dotted_netmask(8) == '255.0.0.0' assert dotted_netmask(24) == '255.255.255.0' assert dotted_netmask(25) == '255.255.255.128'
def test_get_auth_from_url(self): """Ensures that username and password in well-encoded URI as per RFC 3986 are correclty extracted.""" from requests.utils import get_auth_from_url from requests.compat import quote percent_encoding_test_chars = "%!*'();:@&=+$,/?#[] " url_address = "request.com/url.html#test" url = "http://" + quote( percent_encoding_test_chars, '') + ':' + quote( percent_encoding_test_chars, '') + '@' + url_address (username, password) = get_auth_from_url(url) assert username == percent_encoding_test_chars assert password == percent_encoding_test_chars
def test_requote_uri_with_unquoted_percents(self): """Ensure we handle unquoted percent signs in redirects. See: https://github.com/kennethreitz/requests/issues/2356 """ from requests.utils import requote_uri bad_uri = 'http://example.com/fiz?buz=%ppicture' quoted = 'http://example.com/fiz?buz=%25ppicture' assert quoted == requote_uri(bad_uri)
def test_requote_uri_properly_requotes(self): """Ensure requoting doesn't break expectations.""" from requests.utils import requote_uri quoted = 'http://example.com/fiz?buz=%25ppicture' assert quoted == requote_uri(quoted)