我们从Python开源项目中,提取了以下10个代码示例,用于说明如何使用requests.sessions()。
def _get_shortname_from_docs(auth, types_url): # Get the server version from types.xsd. We can't necessarily use the service auth type since it may not be the # same as the auth type for docs. log.debug('Getting %s with auth type %s', types_url, auth.__class__.__name__) # Some servers send an empty response if we send 'Connection': 'close' header from .protocol import BaseProtocol with requests.sessions.Session() as s: s.mount('http://', adapter=BaseProtocol.get_adapter()) s.mount('https://', adapter=BaseProtocol.get_adapter()) r = s.get(url=types_url, auth=auth, allow_redirects=False, stream=False) log.debug('Request headers: %s', r.request.headers) log.debug('Response code: %s', r.status_code) log.debug('Response headers: %s', r.headers) if r.status_code != 200: raise TransportError('Unexpected HTTP status %s when getting %s (%s)' % (r.status_code, types_url, r.text)) if not is_xml(r.text): raise TransportError('Unexpected result when getting %s. Maybe this is not an EWS server?%s' % ( types_url, '\n\n%s[...]' % r.text[:200] if len(r.text) > 200 else '\n\n%s' % r.text if r.text else '', )) return to_xml(r.text).get('version')
def guess(cls, protocol): """ Tries to ask the server which version it has. We haven't set up an Account object yet, so we generate requests by hand. We only need a response header containing a ServerVersionInfo element. The types.xsd document contains a 'shortname' value that we can use as a key for VERSIONS to get the API version that we need in SOAP headers to generate valid requests. Unfortunately, the Exchagne server may be misconfigured to either block access to types.xsd or serve up a wrong version of the document. Therefore, we only use 'shortname' as a hint, but trust the SOAP version returned in response headers. To get API version and build numbers from the server, we need to send a valid SOAP request. We can't do that without a valid API version. To solve this chicken-and-egg problem, we try all possible API versions that this package supports, until we get a valid response. If we managed to get a 'shortname' previously, we try the corresponding API version first. """ log.debug('Asking server for version info') # We can't use a session object from the protocol pool for docs because sessions are created with service auth. auth = get_auth_instance(credentials=protocol.credentials, auth_type=protocol.docs_auth_type) try: shortname = cls._get_shortname_from_docs(auth=auth, types_url=protocol.types_url) log.debug('Shortname according to %s: %s', protocol.types_url, shortname) except (TransportError, ParseError) as e: log.info(text_type(e)) shortname = None api_version = VERSIONS[shortname][0] if shortname else None return cls._guess_version_from_service(protocol=protocol, hint=api_version)
def close(self): log.debug('Server %s: Closing sessions', self.server) while True: try: self._session_pool.get(block=False).close() except Empty: break
def get_session(self): _timeout = 60 # Rate-limit messages about session starvation while True: try: log.debug('Server %s: Waiting for session', self.server) session = self._session_pool.get(timeout=_timeout) log.debug('Server %s: Got session %s', self.server, session.session_id) return session except Empty: # This is normal when we have many worker threads starving for available sessions log.debug('Server %s: No sessions available for %s seconds', self.server, _timeout)
def release_session(self, session): # This should never fail, as we don't have more sessions than the queue contains log.debug('Server %s: Releasing session %s', self.server, session.session_id) try: self._session_pool.put(session, block=False) except Full: log.debug('Server %s: Session pool was already full %s', self.server, session.session_id)
def __init__(self, *args, **kwargs): version = kwargs.pop('version', None) super(Protocol, self).__init__(*args, **kwargs) scheme = 'https' if self.has_ssl else 'http' self.wsdl_url = '%s://%s/EWS/Services.wsdl' % (scheme, self.server) self.messages_url = '%s://%s/EWS/messages.xsd' % (scheme, self.server) self.types_url = '%s://%s/EWS/types.xsd' % (scheme, self.server) # Autodetect authentication type if necessary if self.auth_type is None: self.auth_type = get_service_authtype(service_endpoint=self.service_endpoint, versions=API_VERSIONS, name=self.credentials.username) # Default to the auth type used by the service. We only need this if 'version' is None self.docs_auth_type = self.auth_type # Try to behave nicely with the Exchange server. We want to keep the connection open between requests. # We also want to re-use sessions, to avoid the NTLM auth handshake on every request. self._session_pool = LifoQueue(maxsize=self.SESSION_POOLSIZE) for _ in range(self.SESSION_POOLSIZE): self._session_pool.put(self.create_session(), block=False) if version: isinstance(version, Version) self.version = version else: # Version.guess() needs auth objects and a working session pool try: # Try to get the auth_type of 'types.xsd' so we can fetch it and look at the version contained there self.docs_auth_type = get_docs_authtype(docs_url=self.types_url) except TransportError: pass self.version = Version.guess(self) # Used by services to process service requests that are able to run in parallel. Thread pool should be # larger than the connection pool so we have time to process data without idling the connection. # Create the pool as the last thing here, since we may fail in the version or auth type guessing, which would # leave open threads around to be garbage collected. thread_poolsize = 4 * self.SESSION_POOLSIZE self.thread_pool = ThreadPool(processes=thread_poolsize)
def test_basicauth_with_netrc(self, httpbin): auth = ('user', 'pass') wrong_auth = ('wronguser', 'wrongpass') url = httpbin('basic-auth', 'user', 'pass') old_auth = requests.sessions.get_netrc_auth try: def get_netrc_auth_mock(url): return auth requests.sessions.get_netrc_auth = get_netrc_auth_mock # Should use netrc and work. r = requests.get(url) assert r.status_code == 200 # Given auth should override and fail. r = requests.get(url, auth=wrong_auth) assert r.status_code == 401 s = requests.session() # Should use netrc and work. r = s.get(url) assert r.status_code == 200 # Given auth should override and fail. s.auth = wrong_auth r = s.get(url) assert r.status_code == 401 finally: requests.sessions.get_netrc_auth = old_auth