我们从Python开源项目中,提取了以下14个代码示例,用于说明如何使用urllib.request.HTTPPasswordMgrWithDefaultRealm()。
def authenticate(top_level_url=u'https://api.github.com'): try: if 'GH_AUTH_USER' not in os.environ: try: username = raw_input(u'Username: ') except NameError: username = input(u'Username: ') else: username = os.environ['GH_AUTH_USER'] if 'GH_AUTH_PASS' not in os.environ: password = getpass.getpass(u'Password: ') else: password = os.environ['GH_AUTH_USER'] except KeyboardInterrupt: sys.exit(u'') try: import urllib.request as urllib_alias except ImportError: import urllib2 as urllib_alias password_mgr = urllib_alias.HTTPPasswordMgrWithDefaultRealm() password_mgr.add_password(None, top_level_url, username, password) handler = urllib_alias.HTTPBasicAuthHandler(password_mgr) opener = urllib_alias.build_opener(handler) urllib_alias.install_opener(opener)
def __init__(self, username, password, # pylint: disable=E1002 verbose=0, use_datetime=False, https_handler=None): """Initialize""" if PY2: Transport.__init__(self, use_datetime=False) else: super().__init__(use_datetime=False) self._username = username self._password = password self._use_datetime = use_datetime self.verbose = verbose self._username = username self._password = password self._handlers = [] if self._username and self._password: self._passmgr = urllib2.HTTPPasswordMgrWithDefaultRealm() self._auth_handler = urllib2.HTTPDigestAuthHandler(self._passmgr) else: self._auth_handler = None self._passmgr = None if https_handler: self._handlers.append(https_handler) self._scheme = 'https' else: self._scheme = 'http' if self._auth_handler: self._handlers.append(self._auth_handler)
def __init__(self, username, password, #pylint: disable=E1002 verbose=0, use_datetime=False, https_handler=None): """Initialize""" if PY2: Transport.__init__(self, use_datetime=False) else: super().__init__(use_datetime=False) self._username = username self._password = password self._use_datetime = use_datetime self.verbose = verbose self._username = username self._password = password self._handlers = [] if self._username and self._password: self._passmgr = urllib2.HTTPPasswordMgrWithDefaultRealm() self._auth_handler = urllib2.HTTPDigestAuthHandler(self._passmgr) else: self._auth_handler = None self._passmgr = None if https_handler: self._handlers.append(https_handler) self._scheme = 'https' else: self._scheme = 'http' if self._auth_handler: self._handlers.append(self._auth_handler)
def _downloadFile(self, toDownload): ''' Downloads the file from the url and saves it in the directory folderPath with the name fileName. ''' fileName, url = toDownload # Opens the web page and creates a file in the folder folderPAth and with the name fileName try: #=============================================================================== # passman = request.HTTPPasswordMgrWithDefaultRealm() # passman.add_password(self.realm, url, self.username, self.password) # # authhandler = request.HTTPBasicAuthHandler(passman) # opener = request.build_opener(authhandler) # request.install_opener(opener) #=============================================================================== u = request.urlopen(url) f = open(fileName, 'wb') block_sz = 8192 while True: buffer = u.read(block_sz) if not buffer: break f.write(buffer) # Closes the file f.close() u.close() return os.path.getsize(fileName) except Exception as ex: warnings.warn(str(ex), UserWarning) return -1
def _open_url(self, url): password_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm() password_mgr.add_password(None, self._routes.host, '', self._api_key) handler = urllib2.HTTPBasicAuthHandler(password_mgr) opener = urllib2.build_opener(handler) return opener.open(url)
def readUrl(self,host,port,url,user,password): error=False tomcatUrl = "http://"+host+":"+str(port)+url try: pwdManager = urlconnection.HTTPPasswordMgrWithDefaultRealm() pwdManager.add_password(None,tomcatUrl,user,password) authHandler = urlconnection.HTTPBasicAuthHandler(pwdManager) opener=urlconnection.build_opener(authHandler) urlconnection.install_opener(opener) req = urlconnection.Request(tomcatUrl) handle = urlconnection.urlopen(req, None) data = handle.read() except HTTPError as e: if(e.code==401): data="ERROR: Unauthorized user. Does not have permissions. %s" %(e) elif(e.code==403): data="ERROR: Forbidden, yours credentials are not correct. %s" %(e) else: data="ERROR: The server couldn\'t fulfill the request. %s" %(e) error=True except URLError as e: data = 'ERROR: We failed to reach a server. Reason: %s' %(e.reason) error = True except socket.timeout as e: data = 'ERROR: Timeout error' error = True except socket.error as e: data = "ERROR: Unable to connect with host "+self.host+":"+self.port error = True except: data = "ERROR: Unexpected error: %s"%(sys.exc_info()[0]) error = True return data,error
def metricCollector(): data = {} #defaults data['plugin_version'] = PLUGIN_VERSION data['heartbeat_required']=HEARTBEAT data['units']=METRICS_UNITS URL = "http://"+COUCHDB_HOST+":"+COUCHDB_PORT+COUCHDB_STATS_URI try: if COUCHDB_USERNAME and COUCHDB_PASSWORD: password_mgr = connector.HTTPPasswordMgrWithDefaultRealm() password_mgr.add_password(REALM, URL, COUCHDB_USERNAME, COUCHDB_PASSWORD) auth_handler = connector.HTTPBasicAuthHandler(password_mgr) opener = connector.build_opener(auth_handler) connector.install_opener(opener) response = connector.urlopen(URL, timeout=10) byte_responseData = response.read() str_responseData = byte_responseData.decode('UTF-8') couch_dict = json.loads(str_responseData) for attribute, attribute_value in couch_dict.items(): for metric, val in attribute_value.items(): if 'current' in val and val['current'] is not None: if metric in METRICS_KEY_VS_NAME: metric = METRICS_KEY_VS_NAME[metric] data[metric]=val['current'] except Exception as e: data['status']=0 data['msg']=str(e) return data
def metricCollector(): data = {} #defaults data['plugin_version'] = PLUGIN_VERSION data['heartbeat_required']=HEARTBEAT data['units']=METRICS_UNITS URL = "http://"+RIAK_HOST+":"+RIAK_PORT+"/"+RIAK_STATS_URI try: if RIAK_USERNAME and RIAK_PASSWORD: password_mgr = connector.HTTPPasswordMgrWithDefaultRealm() password_mgr.add_password(REALM, URL, RIAK_USERNAME, RIAK_PASSWORD) auth_handler = connector.HTTPBasicAuthHandler(password_mgr) opener = connector.build_opener(auth_handler) connector.install_opener(opener) response = connector.urlopen(URL, timeout=10) byte_responseData = response.read() str_responseData = byte_responseData.decode('UTF-8') riak_dict = json.loads(str_responseData) for metric in riak_dict: if metric in METRICS_TO_BE_PUSHED_TO_SERVER: value=riak_dict[metric] if metric in BYTES_TO_MB_LIST: value=convertBytesToMB(value) data[metric]=value except Exception as e: data['status']=0 data['msg']=str(e) return data
def getOverview(data): try: URL = RABBITMQ_SERVER+RABBITMQ_API_URI if RABBITMQ_USERNAME and RABBITMQ_PASSWORD: password_mgr = connector.HTTPPasswordMgrWithDefaultRealm() password_mgr.add_password(REALM, URL, RABBITMQ_USERNAME, RABBITMQ_PASSWORD) auth_handler = connector.HTTPBasicAuthHandler(password_mgr) opener = connector.build_opener(auth_handler) connector.install_opener(opener) response = connector.urlopen(URL, timeout=10) byte_responseData = response.read() str_responseData = byte_responseData.decode('UTF-8') rabbit_dict = json.loads(str_responseData) if rabbit_dict: if 'consumers' in rabbit_dict['object_totals']: data['consumers']=rabbit_dict['object_totals']['consumers'] if 'queues' in rabbit_dict['object_totals']: data['queues']=rabbit_dict['object_totals']['queues'] if 'exchanges' in rabbit_dict['object_totals']: data['exchanges']=rabbit_dict['object_totals']['exchanges'] if 'channels' in rabbit_dict['object_totals']: data['channels']=rabbit_dict['object_totals']['channels'] data['messages_ready']=rabbit_dict['queue_totals']['messages_ready'] data['messages_unack']=rabbit_dict['queue_totals']['messages_unacknowledged'] data['messages']=rabbit_dict['queue_totals']['messages'] data['messages_rate']=rabbit_dict['queue_totals']['messages_details']['rate'] data['messages_ready_rate']=rabbit_dict['queue_totals']['messages_ready_details']['rate'] data['messages_unack_rate']=rabbit_dict['queue_totals']['messages_unacknowledged_details']['rate'] if 'deliver_details' in rabbit_dict['message_stats']: data['deliverrate']=rabbit_dict['message_stats']['deliver_details']['rate'] if 'ack_details' in rabbit_dict['message_stats']: data['ackrate']=rabbit_dict['message_stats']['ack_details']['rate'] if 'publish_details' in rabbit_dict['message_stats']: data['publishrate']=rabbit_dict['message_stats']['publish_details']['rate'] except Exception as e: data['status']=0 data['msg']=str(e)
def getNodes(data): try: NODES_URL=RABBITMQ_SERVER+RABBITMQ_NODES_URI if RABBITMQ_USERNAME and RABBITMQ_PASSWORD: password_mgr = connector.HTTPPasswordMgrWithDefaultRealm() password_mgr.add_password(REALM, NODES_URL, RABBITMQ_USERNAME, RABBITMQ_PASSWORD) auth_handler = connector.HTTPBasicAuthHandler(password_mgr) opener = connector.build_opener(auth_handler) connector.install_opener(opener) response = connector.urlopen(NODES_URL, timeout=10) byte_responseData = response.read() str_responseData = byte_responseData.decode('UTF-8') rabbit_nodes_dict = json.loads(str_responseData) nodes_dict=rabbit_nodes_dict[0] if nodes_dict: if 'mem_used' in nodes_dict: value = convertBytesToMB(nodes_dict['mem_used']) data['mem_used']=value if 'fd_used' in nodes_dict: data['fd_used']=nodes_dict['fd_used'] if 'run_queue' in nodes_dict: data['run_queue']=nodes_dict['run_queue'] if 'sockets_used' in nodes_dict: data['sockets_used']=nodes_dict['sockets_used'] if 'proc_used' in nodes_dict: data['proc_used']=nodes_dict['proc_used'] if 'processors' in nodes_dict: data['processors']=nodes_dict['processors'] if 'fd_total' in nodes_dict: data['fd_total']=nodes_dict['fd_total'] if 'sockets_total' in nodes_dict: data['sockets_total']=nodes_dict['sockets_total'] if 'disk_free_limit' in nodes_dict: value=convertBytesToMB(nodes_dict['disk_free_limit']) data['disk_free_limit']=value if 'partitions' in nodes_dict: partitions=nodes_dict['partitions'] data['partitions']=len(partitions) except Exception as e: data['status']=0 data['msg']=str(e)
def metricCollector(): data = {} #defaults data['plugin_version'] = PLUGIN_VERSION data['heartbeat_required']=HEARTBEAT data['units']=METRICS_UNITS URL = "http://"+COUCHBASE_SERVER_HOST+":"+COUCHBASE_SERVER_PORT+"/"+COUCHBASE_SERVER_STATS_URI try: if COUCHBASE_SERVER_USERNAME and COUCHBASE_SERVER_PASSWORD: password_mgr = connector.HTTPPasswordMgrWithDefaultRealm() password_mgr.add_password(REALM, URL, COUCHBASE_SERVER_USERNAME, COUCHBASE_SERVER_PASSWORD) auth_handler = connector.HTTPBasicAuthHandler(password_mgr) opener = connector.build_opener(auth_handler) connector.install_opener(opener) response = connector.urlopen(URL, timeout=10) byte_responseData = response.read() str_responseData = byte_responseData.decode('UTF-8') couchserver_dict = json.loads(str_responseData) data['hdd.total']=couchserver_dict['storageTotals']['hdd']['total'] data['hdd.quotaTotal']=couchserver_dict['storageTotals']['hdd']['quotaTotal'] data['hdd.usedByData']=couchserver_dict['storageTotals']['hdd']['usedByData'] data['hdd.used']=couchserver_dict['storageTotals']['hdd']['used'] data['ram.used']=couchserver_dict['storageTotals']['ram']['used'] data['ram.quotaUsed']=couchserver_dict['storageTotals']['ram']['quotaUsed'] data['ram.quotaUsedPerNode']=couchserver_dict['storageTotals']['ram']['quotaUsedPerNode'] data['ram.quotaTotalPerNode']=couchserver_dict['storageTotals']['ram']['quotaTotalPerNode'] data['ram.total']=couchserver_dict['storageTotals']['ram']['total'] for item in data: if '.' in item: data[item]=convertBytesToMB(data[item]) except Exception as e: data['status']=0 data['msg']=str(e) return data