我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 urllib2.HTTPBasicAuthHandler()。
def download_from_url(url):
    """Open ``url`` with ``urllib2.urlopen`` on a server thread.

    When the proxy settings returned by ``env_server.get_proxy()`` are
    enabled, a global urllib2 opener that routes HTTP traffic through that
    proxy (credentials embedded in the proxy URL) is installed first.

    Returns the thread's result on success, or ``False`` when the thread
    reports a failure.
    """
    proxy_cfg = env_server.get_proxy()
    if proxy_cfg['enabled']:
        # The scheme is stripped so it can be re-added in front of the
        # "login:pass@" credentials part.
        host = proxy_cfg['server'].replace('http://', '')
        proxy_url = 'http://{login}:{pass}@{0}'.format(host, **proxy_cfg)
        opener = urllib2.build_opener(
            urllib2.ProxyHandler({'http': proxy_url}),
            urllib2.HTTPBasicAuthHandler(),
            urllib2.HTTPHandler,
        )
        urllib2.install_opener(opener)

    worker = tc.ServerThread(env_inst.ui_main)
    worker.kwargs = dict(url=url, timeout=1)
    worker.routine = urllib2.urlopen
    worker.run()

    outcome = tc.treat_result(worker, silent=True)
    return False if outcome.isFailed() else outcome.result
def run(self): password = getword() try: print "-"*12 print "User:",username,"Password:",password req = urllib2.Request(sys.argv[1]) passman = urllib2.HTTPPasswordMgrWithDefaultRealm() passman.add_password(None, sys.argv[1], username, password) authhandler = urllib2.HTTPBasicAuthHandler(passman) opener = urllib2.build_opener(authhandler) fd = opener.open(req) print "\t\n\n[+] Login successful: Username:",username,"Password:",password,"\n" print "[+] Retrieved", fd.geturl() info = fd.info() for key, value in info.items(): print "%s = %s" % (key, value) sys.exit(2) except (urllib2.HTTPError,socket.error): pass
def run(self): username, password = getword() try: print "-"*12 print "User:",username,"Password:",password req = urllib2.Request(sys.argv[1]) passman = urllib2.HTTPPasswordMgrWithDefaultRealm() passman.add_password(None, sys.argv[1], username, password) authhandler = urllib2.HTTPBasicAuthHandler(passman) opener = urllib2.build_opener(authhandler) fd = opener.open(req) print "\t\n\nUsername:",username,"Password:",password,"----- Login successful!!!\n\n" print "Retrieved", fd.geturl() info = fd.info() for key, value in info.items(): print "%s = %s" % (key, value) sys.exit(2) except (urllib2.HTTPError, httplib.BadStatusLine,socket.error), msg: print "An error occurred:", msg pass
def _update_opener(self):
    '''
    Builds and installs a new opener to be used by all future calls to
    :func:`urllib2.urlopen`.

    The opener always carries a cookie processor bound to ``self._cj``, a
    basic-auth handler and an HTTP handler (with ``debuglevel=1`` when
    ``self._http_debug`` is set).  A proxy handler for ``self._proxy`` is
    added when a proxy is configured.
    '''
    if self._http_debug:
        http_handler = urllib2.HTTPHandler(debuglevel=1)
    else:
        http_handler = urllib2.HTTPHandler()

    handlers = [urllib2.HTTPCookieProcessor(self._cj)]
    if self._proxy:
        handlers.append(urllib2.ProxyHandler({'http': self._proxy}))
    handlers.append(urllib2.HTTPBasicAuthHandler())
    handlers.append(http_handler)

    urllib2.install_opener(urllib2.build_opener(*handlers))
def authenticate(top_level_url=u'https://api.github.com'):
    """Install a global opener that sends HTTP basic-auth credentials.

    The username is taken from the ``GH_AUTH_USER`` environment variable and
    the password from ``GH_AUTH_PASS``; whichever is missing is prompted for
    interactively.  Pressing Ctrl-C at a prompt exits the process.

    :param top_level_url: URL prefix the credentials are registered for.
    """
    try:
        if 'GH_AUTH_USER' in os.environ:
            username = os.environ['GH_AUTH_USER']
        else:
            try:
                username = raw_input(u'Username: ')
            except NameError:
                # Python 3 has no raw_input(); input() is the equivalent.
                username = input(u'Username: ')
        if 'GH_AUTH_PASS' in os.environ:
            # Fixed: this previously read GH_AUTH_USER, so the username was
            # sent as the password whenever GH_AUTH_PASS was set.
            password = os.environ['GH_AUTH_PASS']
        else:
            password = getpass.getpass(u'Password: ')
    except KeyboardInterrupt:
        sys.exit(u'')

    # Works on both Python 3 (urllib.request) and Python 2 (urllib2).
    try:
        import urllib.request as urllib_alias
    except ImportError:
        import urllib2 as urllib_alias

    password_mgr = urllib_alias.HTTPPasswordMgrWithDefaultRealm()
    password_mgr.add_password(None, top_level_url, username, password)
    handler = urllib_alias.HTTPBasicAuthHandler(password_mgr)
    opener = urllib_alias.build_opener(handler)
    urllib_alias.install_opener(opener)
def test_basic_auth_with_unquoted_realm(self):
    # A 401 whose WWW-Authenticate realm is not quoted is run through the
    # shared basic-auth check while expecting a UserWarning with the
    # message below.
    realm = "ACME Widget Store"
    protected_url = "http://acme.example.com/protected"
    expected_warning = ("Basic Auth Realm was unquoted", UserWarning)

    opener = OpenerDirector()
    password_manager = MockPasswordManager()
    auth_handler = urllib2.HTTPBasicAuthHandler(password_manager)
    http_handler = MockHTTPHandler(
        401, 'WWW-Authenticate: Basic realm=%s\r\n\r\n' % realm)
    for handler in (auth_handler, http_handler):
        opener.add_handler(handler)

    with test_support.check_warnings(expected_warning):
        self._test_basic_auth(opener, auth_handler, "Authorization",
                              realm, http_handler, password_manager,
                              protected_url, protected_url)
def __init__(self, configuration):
    """Apply ``configuration`` and install a proxy-aware global opener.

    ``self.setup(configuration)`` runs first.  ``self.echo`` is taken from
    the ``"ECHO"`` entry when present, else ``None``.  If proxy scheme, host
    and port are all set, a urllib2 opener using that proxy is installed
    globally; when both proxy username and password are set they are
    embedded in the proxy URL and a basic-auth handler is added.
    """
    self.setup(configuration)
    self.echo = configuration['ECHO'] if "ECHO" in configuration else None

    proxy_parts = (self.proxy_scheme, self.proxy_host, self.proxy_port)
    if any(part is None for part in proxy_parts):
        return

    have_credentials = (self.proxy_username is not None
                        and self.proxy_password is not None)
    if have_credentials:
        credentials = self.proxy_username + ":" + self.proxy_password + "@"
    else:
        credentials = ""

    proxy_url = (self.proxy_scheme + "://" + credentials +
                 self.proxy_host + ":" + self.proxy_port)
    handlers = [urllib2.ProxyHandler({self.proxy_scheme: proxy_url})]
    if credentials != '':
        handlers.append(urllib2.HTTPBasicAuthHandler())
        handlers.append(urllib2.HTTPHandler)
    urllib2.install_opener(urllib2.build_opener(*handlers))
def update_tags(self,tags,expression):
    """POST ``tags`` to the ``addTags`` endpoint for sessions matching ``expression``.

    Authentication is basic or digest depending on ``self.MOLOCH_AUTH``.
    On a 200 response the body is read and parsed as JSON.  Any exception is
    logged as a warning; nothing is returned.
    """
    # support cases where we might be doing basic auth through a proxy
    if self.MOLOCH_AUTH == "basic":
        password_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
        password_mgr.add_password(None, self.MOLOCH_URL, self.MOLOCH_USER, self.MOLOCH_PASSWORD)
        auth = urllib2.HTTPBasicAuthHandler(password_mgr)
    else:
        auth = urllib2.HTTPDigestAuthHandler()
        auth.add_password(self.MOLOCH_REALM, self.MOLOCH_URL, self.MOLOCH_USER, self.MOLOCH_PASSWORD)
    opener = urllib2.build_opener(auth)

    body = urllib.urlencode({'tags' : tags})
    query = urllib.urlencode({'date' : "-1",'expression' : expression})
    tag_url = self.MOLOCH_URL + 'addTags?' + query

    try:
        response = opener.open(tag_url,data=body)
        if response.code == 200:
            # The parsed value is unused; parsing still surfaces malformed
            # JSON through the warning below.
            json.loads(response.read())
    except Exception as e:
        log.warning("Moloch: Unable to update tags %s" % (e))
def __init__(self, base_url, exc_class=None, logger=None):
    """
    Creates an HTTP(S) client rooted at base_url.

    @param base_url: The base url to the API; trailing slashes are removed.
    @param exc_class: An exception class to handle non-200 results.
                      Defaults to RestException.
    @param logger: Logger to use. Defaults to the module-level LOG.
    """
    self._base_url = base_url.rstrip('/')
    self._exc_class = exc_class or RestException
    self._logger = logger or LOG
    self._headers = { }

    # The password manager starts empty, so the basic auth handler does
    # nothing until credentials are registered on it later.
    self._passmgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
    basic_auth = urllib2.HTTPBasicAuthHandler(self._passmgr)

    # Cookie support backed by a fresh in-memory jar.
    jar = cookielib.CookieJar()

    handlers = (
        HTTPErrorProcessor(),
        urllib2.HTTPCookieProcessor(jar),
        basic_auth,
    )
    self._opener = urllib2.build_opener(*handlers)
def httpConnection(url, proxy):
    """Open ``url``, going through ``proxy`` when it has a URL configured.

    ``proxy`` must expose ``auth``, ``url``, ``user`` and ``password``.  With
    ``proxy.auth == "ntlm"`` an NTLM auth handler is used, otherwise a basic
    auth handler.  When ``proxy.url`` is truthy, a global opener combining
    the proxy handler and the auth handler is installed.

    Returns the response object from ``urllib2.urlopen(url)``.
    """
    #TODO: enable NTLM authentication
    if proxy.auth == "ntlm":
        passman = urllib2.HTTPPasswordMgrWithDefaultRealm()
        passman.add_password(None, proxy.url, proxy.user, proxy.password)
        auth = HTTPNtlmAuthHandler.HTTPNtlmAuthHandler(passman)
    else:
        # NOTE(review): a plain HTTPPasswordMgr does not treat realm None as
        # a catch-all, so these credentials may never match a server realm;
        # HTTPPasswordMgrWithDefaultRealm may have been intended -- confirm.
        passman = urllib2.HTTPPasswordMgr()
        passman.add_password(None, proxy.url, proxy.user, proxy.password)
        auth = urllib2.HTTPBasicAuthHandler(passman)

    if proxy.url:
        # Fixed: the handler used to be bound to the name ``proxy`` and then
        # ``proxy.url`` was passed to build_opener(), which raised
        # AttributeError because a ProxyHandler has no ``url`` attribute.
        proxy_handler = urllib2.ProxyHandler({'http': proxy.url})
        opener = urllib2.build_opener(proxy_handler, auth, urllib2.HTTPHandler)
        urllib2.install_opener(opener)

    return urllib2.urlopen(url)
def threader(site): username, password = getword() global logins try: print "-"*12 print "User:",username,"Password:",password req = urllib2.Request(site) passman = urllib2.HTTPPasswordMgrWithDefaultRealm() passman.add_password(None, site, username, password) authhandler = urllib2.HTTPBasicAuthHandler(passman) opener = urllib2.build_opener(authhandler) fd = opener.open(req) site = urllib2.urlopen(fd.geturl()).read() print "\n[+] Checking the authenticity of the login...\n" if not re.search(('denied'), site.lower()): print "\t\n\n[+] Username:",username,"Password:",password,"----- Login successful!!!\n\n" print "[+] Writing Successful Login:",sys.argv[5],"\n" logins +=1 file = open(sys.argv[5], "a") file.writelines("Site: "+site+" Username: "+username+ " Password: "+password+"\n") file.close() print "Retrieved", fd.geturl() info = fd.info() for key, value in info.items(): print "%s = %s" % (key, value) else: print "- Redirection" except (urllib2.HTTPError, httplib.BadStatusLine,socket.error), msg: print "An error occurred:", msg pass
def run(self): username, password = getword() try: print "-"*12 print "User:",username,"Password:",password auth_handler = urllib2.HTTPBasicAuthHandler() auth_handler.add_password("cPanel", server, base64encodestring(username)[:-1], base64encodestring(password)[:-1]) opener = urllib2.build_opener(auth_handler) urllib2.install_opener(opener) urllib2.urlopen(server) print "\t\n\nUsername:",username,"Password:",password,"----- Login successful!!!\n\n" except (urllib2.HTTPError, httplib.BadStatusLine), msg: #print "An error occurred:", msg pass
def __init__(self, api_key, url=GCM_URL, proxy=None):
    """
    Store the API key and service URL and, if a proxy is given, install a
    global urllib2 opener that uses it.

    api_key : google api key
    url: url of gcm service.
    proxy: can be string "http://host:port" or dict {'https':'host:port'}
    """
    self.api_key = api_key
    self.url = url

    if not proxy:
        return

    if isinstance(proxy, basestring):
        # A bare proxy string is applied to the scheme of the service url.
        scheme = url.split(':')[0]
        proxy = {scheme: proxy}

    handlers = (
        urllib2.ProxyHandler(proxy),
        urllib2.HTTPBasicAuthHandler(),
        urllib2.HTTPHandler,
    )
    urllib2.install_opener(urllib2.build_opener(*handlers))
def __build_url_opener(self, uri):
    """
    Return an OpenerDirector that performs HTTP Basic Authentication for
    ``uri`` using ``self.client_id`` / ``self.client_secret``.
    """
    credentials = urllib2.HTTPPasswordMgrWithDefaultRealm()
    # realm=None registers the credentials for any realm at this uri.
    credentials.add_password(realm=None,
                             uri=uri,
                             user=self.client_id,
                             passwd=self.client_secret)
    return urllib2.build_opener(urllib2.HTTPBasicAuthHandler(credentials))
def getUrl(self,url, ischunkDownloading=False): try: post=None print 'url',url #openner = urllib2.build_opener(urllib2.HTTPHandler, urllib2.HTTPSHandler) cookie_handler = urllib2.HTTPCookieProcessor(self.cookieJar) openner = urllib2.build_opener(cookie_handler, urllib2.HTTPBasicAuthHandler(), urllib2.HTTPHandler()) if post: req = urllib2.Request(url, post) else: req = urllib2.Request(url) ua_header=False if self.clientHeader: for n,v in self.clientHeader: req.add_header(n,v) if n=='User-Agent': ua_header=True if not ua_header: req.add_header('User-Agent','Mozilla/5.0 (Windows NT 6.1; Win64; x64; Trident/7.0; rv:11.0) like Gecko') #response = urllib2.urlopen(req) if self.proxy and ( (not ischunkDownloading) or self.use_proxy_for_chunks ): req.set_proxy(self.proxy, 'http') response = openner.open(req) data=response.read() return data except: print 'Error in getUrl' traceback.print_exc() return None
def test_basic_auth_success(self):
    # With the correct password registered, opening the server URL must
    # succeed; an HTTPError is reported as a test failure.
    ah = urllib2.HTTPBasicAuthHandler()
    ah.add_password(self.REALM, self.server_url, self.USER, self.PASSWD)
    urllib2.install_opener(urllib2.build_opener(ah))
    try:
        self.assertTrue(urllib2.urlopen(self.server_url))
    except urllib2.HTTPError:
        self.fail("Basic Auth Failed for url: %s" % self.server_url)
    # Any other exception propagates unchanged.  The former
    # ``except Exception as e: raise e`` added nothing and replaced the
    # original traceback on Python 2.
def test_basic_auth_httperror(self):
    # With a wrong password registered, opening the server URL must raise
    # urllib2.HTTPError.
    wrong_auth = urllib2.HTTPBasicAuthHandler()
    wrong_auth.add_password(self.REALM, self.server_url,
                            self.USER, self.INCORRECT_PASSWD)
    opener = urllib2.build_opener(wrong_auth)
    urllib2.install_opener(opener)
    self.assertRaises(urllib2.HTTPError, urllib2.urlopen, self.server_url)
def test_basic_auth(self, quote_char='"'):
    # Runs the shared basic-auth check against a mock 401 response whose
    # realm is wrapped in ``quote_char``.
    realm = "ACME Widget Store"
    protected_url = "http://acme.example.com/protected"
    challenge = ('WWW-Authenticate: Basic realm=%s%s%s\r\n\r\n' %
                 (quote_char, realm, quote_char))

    opener = OpenerDirector()
    password_manager = MockPasswordManager()
    auth_handler = urllib2.HTTPBasicAuthHandler(password_manager)
    http_handler = MockHTTPHandler(401, challenge)
    for handler in (auth_handler, http_handler):
        opener.add_handler(handler)

    self._test_basic_auth(opener, auth_handler, "Authorization",
                          realm, http_handler, password_manager,
                          protected_url, protected_url)
def __init__(self, username, password=None):
    """Prepare the credentials and handlers used to reach the database.

    ``password`` is requested via ``getpass()`` when it is not supplied.
    The auth handler and cookie handler are only created here; this method
    does not attach them to ``self.opener``.
    """
    # Prompt only when the caller did not pass a password.
    if password is None:
        password = getpass()

    # Database endpoint.
    self.db_url = "http://galaxy-catalogue.dur.ac.uk:8080/Eagle"

    # Authentication: credentials apply to any realm under db_url.
    credentials = urllib2.HTTPPasswordMgrWithDefaultRealm()
    credentials.add_password(None, self.db_url, username, password)
    self.password_mgr = credentials

    # Opener plus the auth and cookie handlers.
    self.opener = urllib2.OpenerDirector()
    self.auth_handler = urllib2.HTTPBasicAuthHandler(credentials)
    self.cookie_handler = urllib2.HTTPCookieProcessor(cookie_jar)

## This function executes an SQL query on the database and returns the result as a record array.
def __init__(self, *args, **kwargs):
    """Initialise the base basic-auth handler and the retry bookkeeping.

    All arguments are forwarded to ``urllib2.HTTPBasicAuthHandler``.
    """
    urllib2.HTTPBasicAuthHandler.__init__(self, *args, **kwargs)
    # Retry counter, and the hashes of requests that have been seen.
    self.retried_count = 0
    self.retried_req = set()
def http_error_auth_reqed(self, auth_header, host, req, headers):
    """Delegate to the base handler while capping repeated auth retries.

    The first time a request is seen (keyed by ``hash(req)``) the retry
    counter is reset to zero.  On later calls for a request already seen,
    the counter is incremented, and once it exceeds 5 an
    ``urllib2.HTTPError`` with code 401 is raised instead of retrying.
    """
    request_key = hash(req)
    if request_key in self.retried_req:
        if self.retried_count > 5:
            raise urllib2.HTTPError(req.get_full_url(), 401,
                                    "basic auth failed", headers, None)
        self.retried_count += 1
    else:
        # Reset the retry counter once for each request.
        self.retried_req.add(request_key)
        self.retried_count = 0

    return urllib2.HTTPBasicAuthHandler.http_error_auth_reqed(
        self, auth_header, host, req, headers)
def call_api(self, path):
    '''GET ``/api/<path>`` with basic auth and return the decoded JSON body.'''
    url = '{0}://{1}:{2}/api/{3}'.format(self.protocol, self.host_name,
                                         self.port, path)

    credentials = urllib2.HTTPPasswordMgrWithDefaultRealm()
    credentials.add_password(None, url, self.user_name, self.password)
    basic_auth = urllib2.HTTPBasicAuthHandler(credentials)

    logging.debug('Issue a rabbit API call to get data on ' + path)

    opener = urllib2.build_opener(basic_auth)
    raw_body = opener.open(url).read()
    return json.loads(raw_body)
def test_urlopen():
    # Calls the URL-opening entry points of urllib, urllib2, urllib.request
    # and six.moves.urllib.request with ``file://`` URLs.
    # NOTE(review): presumably a fixture for a static-analysis check rather
    # than a runnable test (it mixes Python 2 and Python 3 APIs) -- confirm
    # before changing any statement here.

    # urllib
    url = urllib.quote('file:///bin/ls')
    urllib.urlopen(url, 'blah', 32)
    urllib.urlretrieve('file:///bin/ls', '/bin/ls2')
    opener = urllib.URLopener()
    opener.open('file:///bin/ls')
    opener.retrieve('file:///bin/ls')
    opener = urllib.FancyURLopener()
    opener.open('file:///bin/ls')
    opener.retrieve('file:///bin/ls')

    # urllib2
    handler = urllib2.HTTPBasicAuthHandler()
    # NOTE(review): no ``passwd`` argument is supplied in this call.
    handler.add_password(realm='test', uri='http://mysite.com', user='bob')
    opener = urllib2.build_opener(handler)
    urllib2.install_opener(opener)
    urllib2.urlopen('file:///bin/ls')
    urllib2.Request('file:///bin/ls')

    # Python 3
    urllib.request.urlopen('file:///bin/ls')
    urllib.request.urlretrieve('file:///bin/ls', '/bin/ls2')
    opener = urllib.request.URLopener()
    opener.open('file:///bin/ls')
    opener.retrieve('file:///bin/ls')
    opener = urllib.request.FancyURLopener()
    opener.open('file:///bin/ls')
    opener.retrieve('file:///bin/ls')

    # Six
    six.moves.urllib.request.urlopen('file:///bin/ls')
    six.moves.urllib.request.urlretrieve('file:///bin/ls', '/bin/ls2')
    opener = six.moves.urllib.request.URLopener()
    opener.open('file:///bin/ls')
    opener.retrieve('file:///bin/ls')
    opener = six.moves.urllib.request.FancyURLopener()
    opener.open('file:///bin/ls')
    opener.retrieve('file:///bin/ls')
def getUrl(self,url, ischunkDownloading=False): try: post=None print 'url',url openner = urllib2.build_opener(urllib2.HTTPHandler, urllib2.HTTPSHandler) #cookie_handler = urllib2.HTTPCookieProcessor(self.cookieJar) #openner = urllib2.build_opener(cookie_handler, urllib2.HTTPBasicAuthHandler(), urllib2.HTTPHandler()) if post: req = urllib2.Request(url, post) else: req = urllib2.Request(url) ua_header=False if self.clientHeader: for n,v in self.clientHeader: req.add_header(n,v) if n=='User-Agent': ua_header=True if not ua_header: req.add_header('User-Agent','Mozilla/5.0 (Windows NT 6.1; WOW64; rv:30.0) Gecko/20100101 Firefox/30.0') #response = urllib2.urlopen(req) if self.proxy and ( (not ischunkDownloading) or self.use_proxy_for_chunks ): req.set_proxy(self.proxy, 'http') response = openner.open(req) data=response.read() return data except: print 'Error in getUrl' traceback.print_exc() return None
def auth(usr, pwd, dhusAlt=None):
    """Fill the module-level SITE table with site specific string constants.

    ``dhusAlt`` selects the site name (default "SciHub"); "CODE-DE" switches
    the base URL and the collection spec.  The Basic ``Authorization`` header
    value for ``usr``/``pwd`` is precomputed and stored under SITE["AUTH"].
    """
    import base64

    name = dhusAlt if dhusAlt is not None else "SciHub"
    SITE["NAME"] = name
    if name == "CODE-DE":
        site, collspec = "https://code-de.org/", "platformname:Sentinel-2"
    else:
        site, collspec = "https://scihub.copernicus.eu/", "producttype:S2MSI%s"

    base = site + "dhus/"
    SITE["BASE"] = base

    # Installing a global urllib2.HTTPBasicAuthHandler opener does not work
    # transparently in combination with proxy support, so the header value
    # is stored instead (workaround: urlOpen()).
    SITE["AUTH"] = "Basic " + base64.b64encode("%s:%s" % (usr, pwd))

    SITE["SEARCH"] = base + "search?format=xml&sortedby=beginposition&order=desc&rows=%d&q=%s" % (ROWSSTEP, collspec)

    product = base + "odata/v1/Products('%s')/"
    SITE["CHECKSUM"] = product + "Checksum/Value/$value"
    SITE["SAFEZIP"] = product + "$value"
    SITE["SAFEROOT"] = product + "Nodes('%s.SAFE')/"
def authorize_basic(user=None, password=None, realm=None, uri=None):
    '''
    Install a global basic-auth opener; call before calling download.

    Does nothing unless user, password, realm and uri are all truthy.
    '''
    if not (user and password and realm and uri):
        return

    basic_auth = urllib2.HTTPBasicAuthHandler()
    basic_auth.add_password(realm=realm, uri=uri, user=user, passwd=password)
    urllib2.install_opener(urllib2.build_opener(basic_auth))
def _login_BA(self):
    """Fetch ``self.url`` using HTTP basic authentication.

    Returns the response body.  If the request fails with an error whose
    text contains ``Error 401``, raises ``Exception('Login credentials
    incorrect.')``; any other failure is swallowed and ``None`` is returned.
    """
    try:
        # Register the credentials for any realm at self.url.
        credentials = urllib2.HTTPPasswordMgrWithDefaultRealm()
        credentials.add_password(None, self.url, self.username, self.password)

        # Build an opener around the basic-auth handler.
        opener = urllib2.build_opener(urllib2.HTTPBasicAuthHandler(credentials))

        response = opener.open(self.url, timeout=8)
        body = response.read()
        response.close()
        return body
    except Exception as e:
        if 'Error 401' in str(e):
            raise Exception('Login credentials incorrect.')
    return None
def _open_url(self, url):
    """Open ``url`` with basic auth and return the response object.

    The API key is registered as the password, with an empty username, for
    ``self._routes.host``.
    """
    credentials = urllib2.HTTPPasswordMgrWithDefaultRealm()
    credentials.add_password(None, self._routes.host, '', self._api_key)
    basic_auth = urllib2.HTTPBasicAuthHandler(credentials)
    return urllib2.build_opener(basic_auth).open(url)