我们从Python开源项目中,提取了以下40个代码示例,用于说明如何使用urllib.request.install_opener()。
def get_response(url, faker=False):
    """Fetch *url* and return the response object with its decompressed
    body attached as ``response.data``."""
    logging.debug('get_response: %s' % url)
    # Install cookie support when a module-level cookie jar is configured.
    if cookies:
        cookie_opener = request.build_opener(request.HTTPCookieProcessor(cookies))
        request.install_opener(cookie_opener)

    if faker:
        # Send fake browser headers to avoid naive bot detection.
        response = request.urlopen(request.Request(url, headers=fake_headers), None)
    else:
        response = request.urlopen(url)

    data = response.read()
    encoding = response.info().get('Content-Encoding')
    if encoding == 'gzip':
        data = ungzip(data)
    elif encoding == 'deflate':
        data = undeflate(data)
    response.data = data
    return response

# DEPRECATED in favor of get_content()
def authenticate(top_level_url=u'https://api.github.com'):
    """Install a urllib opener that performs HTTP Basic auth against GitHub.

    Credentials come from the GH_AUTH_USER / GH_AUTH_PASS environment
    variables when set, otherwise the user is prompted interactively.
    """
    try:
        if 'GH_AUTH_USER' not in os.environ:
            try:
                username = raw_input(u'Username: ')
            except NameError:  # Python 3: raw_input does not exist
                username = input(u'Username: ')
        else:
            username = os.environ['GH_AUTH_USER']
        if 'GH_AUTH_PASS' not in os.environ:
            password = getpass.getpass(u'Password: ')
        else:
            # BUG FIX: previously this read GH_AUTH_USER, so the username
            # was silently used as the password.
            password = os.environ['GH_AUTH_PASS']
    except KeyboardInterrupt:
        sys.exit(u'')

    try:
        import urllib.request as urllib_alias
    except ImportError:  # Python 2 fallback
        import urllib2 as urllib_alias

    password_mgr = urllib_alias.HTTPPasswordMgrWithDefaultRealm()
    password_mgr.add_password(None, top_level_url, username, password)
    handler = urllib_alias.HTTPBasicAuthHandler(password_mgr)
    opener = urllib_alias.build_opener(handler)
    urllib_alias.install_opener(opener)
def proxyurllib():
    """Demonstrate routing urllib traffic through a hard-coded HTTP proxy."""
    print(COLOR_GREEN+'-'*30+COLOR_NONE)
    # TODO proxy
    handler = request.ProxyHandler({'http': 'http://10.112.5.173:49908'})
    '''
    proxy_auth_handler = urllib.request.ProxyBasicAuthHandler()
    proxy_auth_handler.add_password('realm', 'host', 'username', 'password')
    '''
    opener = request.build_opener(handler)
    request.install_opener(opener)
    # Once installed, plain urlopen() calls are routed through this opener.
    google = request.urlopen('http://www.google.com')
    print(google.read())
    print("?????",request.getproxies())

#proxyurllib()
#FIXME ROBOT.TXT??
def index(request):
    """Django view: fetch a WeChat login QR code and render index.html.

    Installs a cookie-aware opener (with certificate checks disabled) so
    that subsequent login polling shares the session cookies.
    """
    if request.method == "GET":
        try:
            ssl._create_default_https_context = ssl._create_unverified_context
            opener = wdf_urllib.build_opener(
                wdf_urllib.HTTPCookieProcessor(CookieJar()))
            wdf_urllib.install_opener(opener)
        except:
            pass
        uuid = getUUID()
        url = 'https://login.weixin.qq.com/qrcode/' + uuid
        params = {
            't': 'webwx',
            '_': int(time.time()),
        }
        # BUG FIX: the original rebound the Django ``request`` parameter to
        # the outgoing urllib request object, shadowing the view argument.
        qr_request = getRequest(url=url, data=urlencode(params))
        response = wdf_urllib.urlopen(qr_request)
        context = {
            'uuid': uuid,
            'response': response.read(),
            'delyou': '',
        }
        return render_to_response('index.html', context)
def __init__(self, server, port, username, password):
    """Connection class init: record endpoint details and log in."""
    self.server = server
    self.port = port
    self.username = username
    self.password = password
    self.url = 'https://{0}:{1}'.format(self.server, self.port)
    self.api = '/api/1.1/xml'
    self.authtoken = ''
    self.response = None
    self.sync_id = ''
    # Force urllib2 to bypass any system proxy for these requests.
    no_proxy = urllib2.ProxyHandler({})
    urllib2.install_opener(urllib2.build_opener(no_proxy))
    self.login()  # Gets called in __init__
def login(self, username, pwd, cookie_file):
    """Log in with username/password, reusing saved cookies when possible.

    (1) If the cookie file exists, try to load cookies from it;
    (2) otherwise, or on load failure, fall back to a fresh login.
    """
    if not os.path.exists(cookie_file):
        # No cookie file at all: do a normal login.
        return self.do_login(username, pwd, cookie_file)

    try:
        cookie_jar = cookielib.LWPCookieJar(cookie_file)
        cookie_jar.load(ignore_discard=True, ignore_expires=True)
        loaded = 1
    except cookielib.LoadError:
        loaded = 0
        LOG.info('Loading cookies error')

    if loaded:
        # Install the loaded cookies into urllib2's default opener.
        cookie_support = urllib2.HTTPCookieProcessor(cookie_jar)
        opener = urllib2.build_opener(cookie_support, urllib2.HTTPHandler)
        urllib2.install_opener(opener)
        LOG.info('Loading cookies success')
        return 1
    return self.do_login(username, pwd, cookie_file)
def save_cookie(self, text, cookie_file=CONF.cookie_file):
    """Extract the login redirect URL from *text*, follow it, and persist
    the resulting session cookies.

    Returns 1 when the login feedback reports success and the cookie jar
    was saved to *cookie_file*; 0 on any failure.
    """
    cookie_jar2 = cookielib.LWPCookieJar()
    cookie_support2 = urllib2.HTTPCookieProcessor(cookie_jar2)
    opener2 = urllib2.build_opener(cookie_support2, urllib2.HTTPHandler)
    urllib2.install_opener(opener2)
    if six.PY3:
        text = text.decode('gbk')
    # The response body contains: location.replace('http://weibo.com/...')
    # Capture the redirect target between the single quotes.
    p = re.compile('location\.replace\(\'(.*?)\'\)')
    try:
        # Search login redirection URL
        login_url = p.search(text).group(1)
        data = urllib2.urlopen(login_url).read()
        # Verify login feedback, check whether result is TRUE
        patt_feedback = 'feedBackUrlCallBack\((.*)\)'
        p = re.compile(patt_feedback, re.MULTILINE)
        feedback = p.search(data).group(1)
        feedback_json = json.loads(feedback)
        if feedback_json['result']:
            cookie_jar2.save(cookie_file, ignore_discard=True, ignore_expires=True)
            return 1
        else:
            return 0
    except:
        # Any parsing/network failure counts as an unsuccessful save.
        return 0
def login(self, username, pwd, cookie_file):
    """Log in, preferring cookies persisted in *cookie_file*.

    Falls back to a fresh credential login when the file is missing or
    its cookies cannot be loaded.
    """
    if os.path.exists(cookie_file):
        cookie_jar = cookielib.LWPCookieJar(cookie_file)
        try:
            cookie_jar.load(ignore_discard=True, ignore_expires=True)
        except cookielib.LoadError:
            print('Loading cookies error')
        else:
            # Install the loaded cookies as urllib2's default opener.
            processor = urllib2.HTTPCookieProcessor(cookie_jar)
            opener = urllib2.build_opener(processor, urllib2.HTTPHandler)
            urllib2.install_opener(opener)
            print('Loading cookies success')
            return 1
    return self.do_login(username, pwd, cookie_file)
def build_opener():
    """Install a global opener with a fresh cookie jar and mobile-Weibo headers."""
    jar = http.cookiejar.CookieJar()
    opener = request.build_opener(request.HTTPCookieProcessor(jar))
    opener.addheaders = [
        ("User-Agent", "Mozilla/5.0 (iPhone; CPU iPhone OS 9_1 like Mac OS X) AppleWebKit/601.1.46 (KHTML, like Gecko) Version/9.0 Mobile/13B143 Safari/601.1"),
        ("Referer", "https://passport.weibo.cn"),
        ("Origin", "https://passport.weibo.cn"),
        ("Host", "passport.weibo.cn"),
    ]
    request.install_opener(opener)
def build_opener():
    """Install a global opener with a fresh cookie jar and v2ex sign-in headers."""
    jar = http.cookiejar.CookieJar()
    opener = request.build_opener(request.HTTPCookieProcessor(jar))
    opener.addheaders = [
        ("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:49.0) Gecko/20100101 Firefox/49.0"),
        ("Referer", "http://cn.v2ex.com/signin"),
        ("Origin", "http://cn.v2ex.com"),
        ("Host", "cn.v2ex.com"),
    ]
    request.install_opener(opener)
def build_opener():
    """Install a global opener with a fresh cookie jar and WeChat-web headers."""
    jar = http.cookiejar.CookieJar()
    opener = request.build_opener(request.HTTPCookieProcessor(jar))
    opener.addheaders = [
        ("User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.71 Safari/537.36"),
        ("Referer", "https://wx.qq.com/"),
        ("Origin", "https://wx.qq.com/"),
        ("Host", "wx.qq.com"),
    ]
    request.install_opener(opener)
def build_opener():
    """Install a global opener with a fresh cookie jar and Zhihu headers."""
    jar = http.cookiejar.CookieJar()
    opener = request.build_opener(request.HTTPCookieProcessor(jar))
    opener.addheaders = [
        ("User-Agent", "Mozilla/5.0 (iPhone; CPU iPhone OS 9_1 like Mac OS X) AppleWebKit/601.1.46 (KHTML, like Gecko) Version/9.0 Mobile/13B143 Safari/601.1"),
        ("Referer", "https://www.zhihu.com/"),
        ("Origin", "https://www.zhihu.com/"),
        ("Host", "www.zhihu.com"),
    ]
    request.install_opener(opener)
def __init__(self):
    """Initialise WeChat client state and install a cookie-aware global opener."""
    self.DEBUG = False
    self.appid = 'wx782c26e4c19acffb'
    self.uuid = ''
    self.base_uri = ''
    self.redirect_uri = ''
    self.uin = ''
    self.sid = ''
    self.skey = ''
    self.pass_ticket = ''
    # Random 15-digit device id prefixed with 'e', mimicking the web client.
    self.deviceId = 'e' + repr(random.random())[2:17]
    self.BaseRequest = {}
    self.synckey = ''
    self.SyncKey = []
    self.User = []
    self.MemberList = []
    self.ContactList = []
    self.GroupList = []
    self.autoReplyMode = False
    self.syncHost = ''
    # One handler list per known message type, plus 'location' and 'all'.
    self._handlers = dict((k, []) for k in self.message_types)
    self._handlers['location'] = []
    self._handlers['all'] = []
    self._filters = dict()
    cookie_processor = request.HTTPCookieProcessor(CookieJar())
    opener = request.build_opener(cookie_processor)
    opener.addheaders = [
        ('User-agent', 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/48.0.2564.109 Safari/537.36'),
        ('Referer', 'https://wx2.qq.com/'),
    ]
    request.install_opener(opener)
def getFile(cls, getfile, unpack=True):
    """Download *getfile*, optionally transparently unpacking the payload.

    Returns a (data, response) tuple; *data* is raw bytes, or a BytesIO /
    GzipFile wrapper for packed content. On fetch failure either exits
    (when cls.exitWhenNoSource() says so) or returns (None, None).
    """
    if cls.getProxy():
        proxy = req.ProxyHandler({'http': cls.getProxy(), 'https': cls.getProxy()})
        auth = req.HTTPBasicAuthHandler()
        opener = req.build_opener(proxy, auth, req.HTTPHandler)
        req.install_opener(opener)
    try:
        response = req.urlopen(getfile)
    except:
        msg = "[!] Could not fetch file %s" % getfile
        if cls.exitWhenNoSource():
            sys.exit(msg)
        print(msg)
        # BUG FIX: the original fell through and dereferenced the undefined
        # ``response`` variable, raising NameError instead of failing softly.
        return (None, None)
    data = response.read()
    # TODO: if data == text/plain; charset=utf-8, read and decode
    if unpack:
        content_type = response.info().get('Content-Type')
        if 'gzip' in content_type:
            data = gzip.GzipFile(fileobj=BytesIO(data))
        elif 'bzip2' in content_type:
            data = BytesIO(bz2.decompress(data))
        elif 'zip' in content_type:
            fzip = zipfile.ZipFile(BytesIO(data), 'r')
            if len(fzip.namelist()) > 0:
                data = BytesIO(fzip.read(fzip.namelist()[0]))
        # In case the webserver is being generic
        elif 'application/octet-stream' in content_type:
            if data[:4] == b'PK\x03\x04':  # Zip magic number
                fzip = zipfile.ZipFile(BytesIO(data), 'r')
                if len(fzip.namelist()) > 0:
                    data = BytesIO(fzip.read(fzip.namelist()[0]))
    return (data, response)
def api_request_native(url, data=None, token=None, https_proxy=None, method=None): request = urllib.Request(url) # print('API request url:', request.get_full_url()) if method: request.get_method = lambda: method token = token if token != None else token_auth_string() request.add_header('Authorization', 'token ' + token) request.add_header('Accept', 'application/json') request.add_header('Content-Type', 'application/json') if data is not None: request.add_data(bytes(data.encode('utf8'))) # print('API request data:', request.get_data()) # print('API request header:', request.header_items()) # https_proxy = https_proxy if https_proxy != None else settings.get('https_proxy') # if https_proxy: # opener = urllib.build_opener(urllib.HTTPHandler(), urllib.HTTPSHandler(), # urllib.ProxyHandler({'https': https_proxy})) # urllib.install_opener(opener) try: with contextlib.closing(urllib.urlopen(request)) as response: if response.code == 204: # No Content return None else: return json.loads(response.read().decode('utf8', 'ignore')) except urllib.HTTPError as err: with contextlib.closing(err): raise SimpleHTTPError(err.code, err.read())
def set_proxy(self):
    """Install a global opener routing HTTP(S) through the module-level proxy."""
    endpoint = '%s:%s' % (_proxy, _port)
    handler = request.ProxyHandler({'http': endpoint, 'https': endpoint})
    request.install_opener(request.build_opener(handler))
    return

#Unset Proxy
def unset_proxy(self): proxy_handler = request.ProxyHandler({}) opener = request.build_opener(proxy_handler) request.install_opener(opener) return #Encode URL Download
def _downloadFile(self, toDownload): ''' Downloads the file from the url and saves it in the directory folderPath with the name fileName. ''' fileName, url = toDownload # Opens the web page and creates a file in the folder folderPAth and with the name fileName try: #=============================================================================== # passman = request.HTTPPasswordMgrWithDefaultRealm() # passman.add_password(self.realm, url, self.username, self.password) # # authhandler = request.HTTPBasicAuthHandler(passman) # opener = request.build_opener(authhandler) # request.install_opener(opener) #=============================================================================== u = request.urlopen(url) f = open(fileName, 'wb') block_sz = 8192 while True: buffer = u.read(block_sz) if not buffer: break f.write(buffer) # Closes the file f.close() u.close() return os.path.getsize(fileName) except Exception as ex: warnings.warn(str(ex), UserWarning) return -1
def __init__(self, bot, config_file):
    """Image-archiving cog: install a UA opener, read config, start tasks.

    NOTE(review): formatting reconstructed from a collapsed source line;
    the warning string's exact line breaks could not be recovered --
    verify against the original file.
    """
    super().__init__(bot)
    # Downloads use a browser-like User-Agent.
    opener = urllib_request.build_opener()
    opener.addheaders = [("User-Agent", "Mozilla/5.0 (X11; Linux x86_64) Tnybot/1.0 Chrome/55.0")]
    urllib_request.install_opener(opener)
    config = configparser.RawConfigParser()
    config.read(config_file)
    self.base_dir = config["Images"]["dir"]
    # Config stores the flag as the literal string "True".
    self.checksum = config["Images"]["checksum"] == "True" or False
    if self.checksum:
        print(
            """!!!!
            Warning! Using checksums to detect duplicate images.
            Processing image hashes may take awhile on older CPUs.
            This will save disk space, but will cause an increase in downloads on restart.
            """
        )
    self.channels = self.get_config_values(config, "Channels")
    self.merged_channels = self.get_config_values(config, "MergedChannels") or []
    self.upload_channels = self.get_config_values(config, "Upload")
    if not self.bot.unit_tests:  # pragma: no cover
        self.bot.loop.create_task(self.background())
        # self.bot.loop.create_task(self.upload())
def __init__(self, bot):
    """Set up the cog and install a global opener with the bot's User-Agent."""
    super().__init__(bot)
    ua_opener = urllib_request.build_opener()
    ua_opener.addheaders = [
        ("User-Agent", "Mozilla/5.0 (X11; Linux x86_64) Tnybot/1.0 Chrome/55.0")]
    urllib_request.install_opener(ua_opener)
def set_proxy(proxy):
    """Install a global opener routing HTTP and HTTPS through *proxy*.

    proxy: a (host, port) tuple.
    """
    address = '%s:%s' % proxy
    handler = request.ProxyHandler({'http': address, 'https': address})
    request.install_opener(request.build_opener(handler))
def unset_proxy(): proxy_handler = request.ProxyHandler({}) opener = request.build_opener(proxy_handler) request.install_opener(opener) # DEPRECATED in favor of set_proxy() and unset_proxy()
def set_http_proxy(proxy): if proxy == None: # Use system default setting proxy_support = request.ProxyHandler() elif proxy == '': # Don't use any proxy proxy_support = request.ProxyHandler({}) else: # Use proxy proxy_support = request.ProxyHandler({'http': '%s' % proxy, 'https': '%s' % proxy}) opener = request.build_opener(proxy_support) request.install_opener(opener)
def install_opener():
    """Install (and return) an opener that routes requests into WSGI handlers.

    Refuses to run when proxy environment variables are set, since they
    would bypass the WSGI interception.
    """
    for var in ('http_proxy', 'https_proxy'):
        if var in os.environ:
            raise RuntimeError(
                'http_proxy or https_proxy set in environment, please unset')
    handlers = [WSGI_HTTPHandler()]
    if WSGI_HTTPSHandler is not None:
        handlers.append(WSGI_HTTPSHandler())
    opener = url_lib.build_opener(*handlers)
    url_lib.install_opener(opener)
    return opener
def uninstall_opener():
    """Remove the globally installed opener, restoring urllib's default behaviour."""
    url_lib.install_opener(None)
def Proxy_read(proxy_ip_list, user_agent_list):
    """Fetch a blog view-count endpoint via a random proxy and user agent,
    sleeping a random 1-5 seconds first to look less bot-like."""
    proxy_ip = random.choice(proxy_ip_list)
    print('????ip?%s' % proxy_ip)
    user_agent = random.choice(user_agent_list)
    print('????user_agent?%s' % user_agent)
    sleep_time = random.randint(1, 5)
    print('?????%s' % sleep_time)
    time.sleep(sleep_time)
    print('????')
    headers = {
        'Host': 'www.baidu.com',
        'User-Agent': user_agent,
        'Accept': r'application/json, text/javascript, */*; q=0.01',
        'Referer': r'http://www.cnblogs.com/Lands-ljk/p/5589888.html',
    }
    proxy_support = request.ProxyHandler({'http': proxy_ip})
    request.install_opener(request.build_opener(proxy_support))
    req = request.Request(
        r'http://www.cnblogs.com/mvc/blog/ViewCountCommentCout.aspx?postId=5589888',
        headers=headers)
    try:
        html = request.urlopen(req).read().decode('utf-8')
    except Exception as e:
        print('?????')
    else:
        print('OK!')
def _openURL2(self):
    """Fetch the HAProxy stats URL (Python-2 flavour: uses response.getcode()).

    On success the decoded body is handed to self._parseStats(); on any
    failure a human-readable message is stored in
    self.dictInterfaceData['msg'].
    NOTE(review): ``urlconnection`` is presumably a urllib2/urllib.request
    alias -- confirm against the module imports.
    """
    try:
        if (self._userName and self._userPass):
            # Basic-auth the stats URL when credentials are configured.
            password_mgr = urlconnection.HTTPPasswordMgr()
            password_mgr.add_password(self._realm, self._url, self._userName, self._userPass)
            auth_handler = urlconnection.HTTPBasicAuthHandler(password_mgr)
            opener = urlconnection.build_opener(auth_handler)
            urlconnection.install_opener(opener)
        response = urlconnection.urlopen(self._url, timeout=10)
        if (response.getcode() == 200):
            byte_responseData = response.read()
            str_responseData = byte_responseData.decode('UTF-8')
            self._parseStats(str_responseData)
        else:
            #self.dictInterfaceData['status'] = 0
            self.dictInterfaceData['msg'] = 'Response status code from haproxy url is :' + str(response.getcode())
    except HTTPError as e:
        #self.dictInterfaceData['status'] = 0
        self.dictInterfaceData['msg'] ='Haproxy stats url has HTTP Error '+str(e.code)
    except URLError as e:
        #self.dictInterfaceData['status'] = 0
        self.dictInterfaceData['msg'] = 'Haproxy stats url has URL Error '+str(e.reason)
    except InvalidURL as e:
        #self.dictInterfaceData['status'] = 0
        self.dictInterfaceData['msg'] = 'Haproxy stats url is invalid URL'
    except Exception as e:
        #self.dictInterfaceData['status'] = 0
        self.dictInterfaceData['msg'] = 'Haproxy stats URL error : ' + str(e)
def _openURL3(self): try: if (self._userName and self._userPass): password_mgr = urlconnection.HTTPPasswordMgr() password_mgr.add_password(self._realm, self._url, self._userName, self._userPass) auth_handler = urlconnection.HTTPBasicAuthHandler(password_mgr) opener = urlconnection.build_opener(auth_handler) urlconnection.install_opener(opener) response = urlconnection.urlopen(self._url, timeout=10) if (response.status == 200): byte_responseData = response.read() str_responseData = byte_responseData.decode('UTF-8') self._parseStats(str_responseData) else: #self.dictInterfaceData['status'] = 0 self.dictInterfaceData['msg'] = 'Response status code from haproxy url is :' + str(response.status) except HTTPError as e: #self.dictInterfaceData['status'] = 0 self.dictInterfaceData['msg'] ='Haproxy stats url has HTTP Error '+str(e.code) except URLError as e: #self.dictInterfaceData['status'] = 0 self.dictInterfaceData['msg'] = 'Haproxy stats url has URL Error '+str(e.reason) except InvalidURL as e: #self.dictInterfaceData['status'] = 0 self.dictInterfaceData['msg'] = 'Haproxy stats url is invalid URL' except Exception as e: #self.dictInterfaceData['status'] = 0 self.dictInterfaceData['msg'] = 'Haproxy stats URL error : ' + str(e)
def readUrl(self, host, port, url, user, password):
    """Fetch a Tomcat manager URL using HTTP basic auth.

    Returns (data, error): *data* is the response body, or an
    "ERROR: ..." string when the fetch failed; *error* is True on failure.
    """
    error = False
    tomcatUrl = "http://" + host + ":" + str(port) + url
    try:
        pwdManager = urlconnection.HTTPPasswordMgrWithDefaultRealm()
        pwdManager.add_password(None, tomcatUrl, user, password)
        authHandler = urlconnection.HTTPBasicAuthHandler(pwdManager)
        opener = urlconnection.build_opener(authHandler)
        urlconnection.install_opener(opener)
        req = urlconnection.Request(tomcatUrl)
        handle = urlconnection.urlopen(req, None)
        data = handle.read()
    except HTTPError as e:
        if (e.code == 401):
            data = "ERROR: Unauthorized user. Does not have permissions. %s" % (e)
        elif (e.code == 403):
            data = "ERROR: Forbidden, yours credentials are not correct. %s" % (e)
        else:
            data = "ERROR: The server couldn\'t fulfill the request. %s" % (e)
        error = True
    except URLError as e:
        data = 'ERROR: We failed to reach a server. Reason: %s' % (e.reason)
        error = True
    except socket.timeout as e:
        data = 'ERROR: Timeout error'
        error = True
    except socket.error as e:
        # BUG FIX: the original referenced self.host/self.port (attributes
        # that need not exist) and concatenated a possibly-int port; use
        # the method parameters instead.
        data = "ERROR: Unable to connect with host " + host + ":" + str(port)
        error = True
    except:
        data = "ERROR: Unexpected error: %s" % (sys.exc_info()[0])
        error = True
    return data, error
def metricCollector3(self):
    """Collect Apache status (Python-3 path) into self.dictApacheData.

    Any failure zeroes 'status' and records a diagnostic 'msg'.
    """
    try:
        if (self._userName and self._userPass):
            # Attach basic-auth credentials when configured.
            mgr = urlconnection.HTTPPasswordMgr()
            mgr.add_password(self._realm, self._url, self._userName, self._userPass)
            handler = urlconnection.HTTPBasicAuthHandler(mgr)
            urlconnection.install_opener(urlconnection.build_opener(handler))
        response = urlconnection.urlopen(self._url, timeout=10)
        if response.status == 200:
            self._parseStats(response.read().decode('UTF-8'))
        else:
            self.dictApacheData['status'] = 0
            self.dictApacheData['msg'] = 'Error_code' + str(response.status)
    except HTTPError as e:
        self.dictApacheData['status'] = 0
        self.dictApacheData['msg'] = 'Error_code : HTTP Error ' + str(e.code)
    except URLError as e:
        self.dictApacheData['status'] = 0
        self.dictApacheData['msg'] = 'Error_code : URL Error ' + str(e.reason)
    except InvalidURL:
        self.dictApacheData['status'] = 0
        self.dictApacheData['msg'] = 'Error_code : Invalid URL'
    except Exception as e:
        self.dictApacheData['status'] = 0
        self.dictApacheData['msg'] = 'Exception occured in collecting data : ' + str(e)
def metricCollector():
    """Poll CouchDB stats and map each metric's 'current' value into a plugin dict."""
    data = {}
    # defaults
    data['plugin_version'] = PLUGIN_VERSION
    data['heartbeat_required'] = HEARTBEAT
    data['units'] = METRICS_UNITS
    URL = "http://" + COUCHDB_HOST + ":" + COUCHDB_PORT + COUCHDB_STATS_URI
    try:
        if COUCHDB_USERNAME and COUCHDB_PASSWORD:
            mgr = connector.HTTPPasswordMgrWithDefaultRealm()
            mgr.add_password(REALM, URL, COUCHDB_USERNAME, COUCHDB_PASSWORD)
            handler = connector.HTTPBasicAuthHandler(mgr)
            connector.install_opener(connector.build_opener(handler))
        response = connector.urlopen(URL, timeout=10)
        couch_dict = json.loads(response.read().decode('UTF-8'))
        for attribute_value in couch_dict.values():
            for metric, val in attribute_value.items():
                if 'current' not in val or val['current'] is None:
                    continue
                # Rename metrics that have a friendlier display name.
                data[METRICS_KEY_VS_NAME.get(metric, metric)] = val['current']
    except Exception as e:
        data['status'] = 0
        data['msg'] = str(e)
    return data
def metricCollector():
    """Poll ActiveMQ broker totals via the Jolokia API into a plugin dict."""
    data = {}
    data['plugin_version'] = PLUGIN_VERSION
    data['heartbeat_required'] = HEARTBEAT
    data['units'] = METRICS_UNITS
    URL = 'http://%s:%s/api/jolokia/read/org.apache.activemq:type=Broker,brokerName=localhost' % (ACTIVEMQ_HOST, ACTIVEMQ_PORT)
    try:
        if ACTIVEMQ_USERNAME and ACTIVEMQ_PASSWORD:
            mgr = connector.HTTPPasswordMgrWithDefaultRealm()
            mgr.add_password(REALM, URL, ACTIVEMQ_USERNAME, ACTIVEMQ_PASSWORD)
            handler = connector.HTTPBasicAuthHandler(mgr)
            connector.install_opener(connector.build_opener(handler))
        response = connector.urlopen(URL, timeout=10)
        payload = json.loads(response.read().decode('UTF-8'))
        totals = payload['value']
        data['total_message_count'] = totals['TotalMessageCount']
        data['total_connections_count'] = totals['TotalConnectionsCount']
        data['total_consumer_count'] = totals['TotalConsumerCount']
        data['total_producer_count'] = totals['TotalProducerCount']
    except Exception as e:
        data['status'] = 0
        data['msg'] = str(e)
    return data
def metricCollector():
    """Poll Riak /stats and forward whitelisted metrics (MB-converted where needed)."""
    data = {}
    # defaults
    data['plugin_version'] = PLUGIN_VERSION
    data['heartbeat_required'] = HEARTBEAT
    data['units'] = METRICS_UNITS
    URL = "http://" + RIAK_HOST + ":" + RIAK_PORT + "/" + RIAK_STATS_URI
    try:
        if RIAK_USERNAME and RIAK_PASSWORD:
            mgr = connector.HTTPPasswordMgrWithDefaultRealm()
            mgr.add_password(REALM, URL, RIAK_USERNAME, RIAK_PASSWORD)
            handler = connector.HTTPBasicAuthHandler(mgr)
            connector.install_opener(connector.build_opener(handler))
        response = connector.urlopen(URL, timeout=10)
        riak_dict = json.loads(response.read().decode('UTF-8'))
        for metric, value in riak_dict.items():
            if metric not in METRICS_TO_BE_PUSHED_TO_SERVER:
                continue
            if metric in BYTES_TO_MB_LIST:
                value = convertBytesToMB(value)
            data[metric] = value
    except Exception as e:
        data['status'] = 0
        data['msg'] = str(e)
    return data
def getOverview(data):
    """Populate *data* with totals from the RabbitMQ /api/overview endpoint.

    Any failure (network, auth, missing keys) zeroes 'status' and stores
    the error text in 'msg'.
    """
    try:
        URL = RABBITMQ_SERVER + RABBITMQ_API_URI
        if RABBITMQ_USERNAME and RABBITMQ_PASSWORD:
            mgr = connector.HTTPPasswordMgrWithDefaultRealm()
            mgr.add_password(REALM, URL, RABBITMQ_USERNAME, RABBITMQ_PASSWORD)
            handler = connector.HTTPBasicAuthHandler(mgr)
            connector.install_opener(connector.build_opener(handler))
        response = connector.urlopen(URL, timeout=10)
        overview = json.loads(response.read().decode('UTF-8'))
        if overview:
            # Object totals are individually optional in the payload.
            for key in ('consumers', 'queues', 'exchanges', 'channels'):
                if key in overview['object_totals']:
                    data[key] = overview['object_totals'][key]
            data['messages_ready'] = overview['queue_totals']['messages_ready']
            data['messages_unack'] = overview['queue_totals']['messages_unacknowledged']
            data['messages'] = overview['queue_totals']['messages']
            data['messages_rate'] = overview['queue_totals']['messages_details']['rate']
            data['messages_ready_rate'] = overview['queue_totals']['messages_ready_details']['rate']
            data['messages_unack_rate'] = overview['queue_totals']['messages_unacknowledged_details']['rate']
            msg_stats = overview['message_stats']
            if 'deliver_details' in msg_stats:
                data['deliverrate'] = msg_stats['deliver_details']['rate']
            if 'ack_details' in msg_stats:
                data['ackrate'] = msg_stats['ack_details']['rate']
            if 'publish_details' in msg_stats:
                data['publishrate'] = msg_stats['publish_details']['rate']
    except Exception as e:
        data['status'] = 0
        data['msg'] = str(e)
def getNodes(data):
    """Populate *data* with per-node metrics from the RabbitMQ nodes API.

    Only the first node of the reply is inspected; failures zero 'status'
    and store the error text in 'msg'.
    """
    try:
        NODES_URL = RABBITMQ_SERVER + RABBITMQ_NODES_URI
        if RABBITMQ_USERNAME and RABBITMQ_PASSWORD:
            mgr = connector.HTTPPasswordMgrWithDefaultRealm()
            mgr.add_password(REALM, NODES_URL, RABBITMQ_USERNAME, RABBITMQ_PASSWORD)
            handler = connector.HTTPBasicAuthHandler(mgr)
            connector.install_opener(connector.build_opener(handler))
        response = connector.urlopen(NODES_URL, timeout=10)
        all_nodes = json.loads(response.read().decode('UTF-8'))
        node = all_nodes[0]
        if node:
            if 'mem_used' in node:
                data['mem_used'] = convertBytesToMB(node['mem_used'])
            # These counters are copied through verbatim when present.
            for key in ('fd_used', 'run_queue', 'sockets_used', 'proc_used',
                        'processors', 'fd_total', 'sockets_total'):
                if key in node:
                    data[key] = node[key]
            if 'disk_free_limit' in node:
                data['disk_free_limit'] = convertBytesToMB(node['disk_free_limit'])
            if 'partitions' in node:
                data['partitions'] = len(node['partitions'])
    except Exception as e:
        data['status'] = 0
        data['msg'] = str(e)