我们从Python开源项目中,提取了以下24个代码示例,用于说明如何使用logging.config.read()。
def __init__(self, ini_file, dry_run=False):
    """Initialise the builder from an INI configuration file.

    Reads ``ini_file`` and stores the directory settings it defines as
    attributes.  Every directory option is interpolated with the value of
    ``[base] root_dir``; ``apk_dir`` additionally receives ``day`` (today's
    date formatted as ``%m%d``).

    Args:
        ini_file: Path of the INI file to load.
        dry_run: Stored unchanged on ``self.dry_run``.

    Raises:
        ConfigParser.NoSectionError / NoOptionError: if a required section
            or option is missing (``read`` silently ignores a missing file,
            so a bad path also surfaces here).
    """
    self.dry_run = dry_run
    self.build_info = []
    self.logger = logging.getLogger('util.agentBuilder')

    # The module was renamed in Python 3; accept either spelling.
    try:
        import ConfigParser
    except ImportError:
        import configparser as ConfigParser
    from datetime import date

    # config = ConfigParser.RawConfigParser(allow_no_value=True)
    config = ConfigParser.ConfigParser()
    config.read(ini_file)

    root_dir = config.get('base', 'root_dir')
    root_vars = {'root_dir': root_dir}
    apk_vars = {'root_dir': root_dir,
                'day': date.today().strftime('%m%d')}

    def interpolated(section, option, variables):
        # ``raw``/``vars`` are keyword-only on Python 3 and accepted as
        # keywords on Python 2, so always pass them by name.
        return config.get(section, option, raw=False, vars=variables)

    self.lib_base_dir = interpolated('base', 'lib_base_dir', root_vars)
    self.channels_dir = interpolated('base', 'channels_dir', root_vars)
    self.demo_dir = interpolated('demo', 'demo_dir', root_vars)
    self.apk_dir = interpolated('demo', 'apk_dir', apk_vars)
    self.plugin_dir = interpolated('plugins', 'plugin_dir', root_vars)
def get_mapping_managers():
    """Load the manager-to-resource mapping from its JSON file.

    The file path comes from ``get_config('system', 'mapping-manager-file',
    '/etc/softfire/mapping-managers.json')``.  Example of the expected
    content::

        {
            'sdn-manager': ['SdnResource'],
            'nfv-manager': ['NfvResource'],
            'monitoring-manager': ['MonitoringResource'],
            'security-manager': ['SecurityResource'],
            'physical-device-manager': ['PhysicalResource']
        }

    Returns:
        The decoded JSON document.
    """
    mapping_path = get_config(
        'system',
        'mapping-manager-file',
        '/etc/softfire/mapping-managers.json',
    )
    with open(mapping_path) as mapping_file:
        return json.load(mapping_file)
def __call__(self, filename):
    """Build a config object for ``filename`` using the options in ``self._opts``.

    Returns ``None`` when neither a filename nor a default config is
    available.  Otherwise the configured ``ConfigClass`` is instantiated,
    the default config text (if any) is loaded first, the user file (if
    any) is read on top of it, and logging is optionally configured from
    the result.
    """
    opts = self._opts

    if not (filename or opts.default_config):
        return None

    assert(issubclass(opts.ConfigClass, Config))

    if filename:
        # Expand environment variables first, then a leading ``~``.
        filename = os.path.expanduser(os.path.expandvars(filename))

    config = opts.ConfigClass(filename,
                              **opts.configClassOpts(),
                              **opts.configparser_opts)

    # Defaults go in first so the user file can override them.
    if opts.default_config:
        config.read_string(opts.default_config, source="<default>")

    if filename:
        config.read()

    if opts.init_logging_fileConfig:
        logging.config.fileConfig(config)

    return config
def get_status_data(self):
    """Fetch ``self.url`` and return its content type and body.

    An ``Authorization: Basic ...`` header is added when ``self.basic_auth``
    is set.

    Returns:
        ``{'content-type': ..., 'body': ...}`` on success, or ``None`` when
        the request fails (the failure is logged via ``LOG.error``).
    """
    request = Request(self.url)
    if self.basic_auth:
        request.add_header('Authorization', "Basic %s" % self.basic_auth)

    try:
        response = urlopen(request)
    except HTTPError as e:
        LOG.error("request for %s returned %d", self.url, e.code)
        return None
    except URLError as e:
        LOG.error("request for %s failed: %s", self.url, e.reason)
        return None

    # Always release the connection, even if reading the body fails.
    try:
        content_type = response.info().getheader('Content-Type')
        return {
            'content-type': content_type,
            'body': response.read()
        }
    finally:
        response.close()
def getCamera(schoolId):
    """Fetch the camera list for ``schoolId`` and alert on failure.

    The URL is built from the module-level ``vod_camera`` template.  The
    call counts as successful only when the JSON response has
    ``resCode == '000000'``; any other outcome (request error, invalid
    JSON, different ``resCode``) sends a mail and an SMS notification.

    Returns:
        The raw response body, or ``""`` when the request itself failed.
    """
    cameraInfo = ""
    url = None
    # Tracked separately from the body so that a failure before or during
    # JSON decoding is still reported instead of raising NameError or being
    # mistaken for success.
    ok = False
    try:
        url = vod_camera.format(schoolId)
        body = urllib2.urlopen(url, timeout=10).read()
        cameraInfo = body
        jsonData = json.loads(body)
        ok = jsonData["resCode"] == '000000'
    except Exception as e:
        logging.error(e)

    if not ok:
        sendMailNotifier("vod_api_camera", "vod_api_camera", "vod????{0}".format(url))
        sendSMSNotifier("vod_api_camera", "[vod]vod_api_camera????")
    return cameraInfo
def getCamera(schoolId):
    """Fetch the camera list for ``schoolId`` and alert on failure.

    The URL is built from the module-level ``vod_camera`` template and the
    decoded response is logged at debug level.  The call counts as
    successful only when the JSON response has ``resCode == '000000'``; any
    other outcome (request error, invalid JSON, different ``resCode``)
    sends a mail and an SMS notification.

    Returns:
        The raw response body, or ``""`` when the request itself failed.
    """
    cameraInfo = ""
    url = None
    # Tracked separately from the body so that a failure before or during
    # JSON decoding is still reported instead of raising NameError or being
    # mistaken for success.
    ok = False
    try:
        url = vod_camera.format(schoolId)
        body = urllib2.urlopen(url, timeout=10).read()
        cameraInfo = body
        jsonData = json.loads(body)
        logging.debug(jsonData)
        ok = jsonData["resCode"] == '000000'
    except Exception as e:
        logging.error(e)

    if not ok:
        sendMailNotifier("vod_api_camera", "vod_api_camera", "vod????{0}".format(url))
        sendSMSNotifier("vod_api_camera", "[vod]vod_api_camera????")
    return cameraInfo
def __init__(self, filename, header):
    """Expose the options of one config section as instance attributes.

    Args:
        filename: Path of the config file to parse.
        header: Name of the section whose options are copied onto ``self``.

    Raises:
        ValueError: if the parser could not read ``filename``.
    """
    parser = SafeConfigParser()
    # ``read`` returns the list of files it parsed; empty means none.
    parsed_files = parser.read(filename)
    if not parsed_files:
        raise ValueError('Config file not found:', filename)

    for option, value in parser.items(header):
        self.__dict__[option] = value
def read(self):
    """
    Returns RFC 3339 timestamp string.

    Tries to read the bookmark file ``self.bmfile``.  If the file is
    missing, inaccessible, shorter than 10 bytes, or holds an invalid
    timestamp, ``self._generate_bm_time()`` is called and the resulting
    ``self.s_time`` is returned instead.
    """
    def fallback_time():
        # Shared fallback path: regenerate and return the bookmark time.
        self._generate_bm_time()
        return self.s_time

    logging.debug("Looking for bookmark file")
    try:
        size = os.path.getsize(self.bmfile)
    except FileNotFoundError:
        logging.debug("Bookmark file not found: %s.", self.bmfile)
        return fallback_time()
    except OSError:
        # e.g. permission denied or a non-directory path component; treat
        # it like any other unreadable bookmark instead of crashing.
        logging.debug("Bookmark file cannot be accessed: %s.", self.bmfile)
        return fallback_time()

    if size < 10:
        logging.error("Bookmark file appears corrupt: %s", self.bmfile)
        return fallback_time()

    try:
        with open(self.bmfile, 'r') as self.open_bmfile:
            logging.debug("Opening: %s", self.bmfile)
            self.bookmark = self.open_bmfile.read()
            logging.debug("File found. Reading timestamp: %s",
                          convert_time(self.bookmark))
            if validate_time('s', self.bookmark):
                logging.debug("Bookmark time is valid")
                self.s_time = self.bookmark
                return self.s_time
            logging.error("Invalid bookmark data. Using current time")
            return fallback_time()
    except OSError:
        logging.debug("Bookmark file cannot be accessed: %s.", self.bmfile)
        return fallback_time()
def update_index_html(self, ipa_name, url_base):
    """Regenerate ``index.html`` in ``self.base_dir``.

    The page consists of ``PlistBuddy.html_head`` (formatted with
    ``ipa_name``), one ``PlistBuddy.plist_link`` entry per ``.plist`` file
    found in ``self.base_dir`` (reverse-sorted by file name), and
    ``PlistBuddy.html_tail``.

    Args:
        ipa_name: Value substituted into the page head.
        url_base: Prefix joined with each plist file name to form its link.
    """
    # Collect the .plist files in self.base_dir, newest name first.
    plist_names = sorted(
        (name for name in os.listdir(self.base_dir) if name.endswith('.plist')),
        reverse=True,
    )

    parts = [PlistBuddy.html_head % ipa_name]
    parts.extend(
        PlistBuddy.plist_link % (os.path.join(url_base, name),
                                 os.path.splitext(name)[0])
        for name in plist_names
    )
    parts.append(PlistBuddy.html_tail)

    # Use a context manager so the handle is flushed and closed promptly.
    with open(os.path.join(self.base_dir, 'index.html'), 'w') as index_file:
        index_file.write(''.join(parts))
def get_config_parser():
    """
    Build a ConfigParser loaded from ``CONFIG_FILE_PATH``.

    Logs an error and terminates the process with exit status 1 when the
    path does not exist or is not a regular file.

    :return: ConfigParser object containing the system configurations
    """
    parser = configparser.ConfigParser()

    if not os.path.exists(CONFIG_FILE_PATH) or not os.path.isfile(CONFIG_FILE_PATH):
        logging.error("Config file not found, create %s" % CONFIG_FILE_PATH)
        exit(1)

    parser.read(CONFIG_FILE_PATH)
    return parser
def read(self, filenames=None, encoding=None, touch=False):
    """Read ``filenames`` and then this config's own file.

    Args:
        filenames: A single path or an iterable of paths handed to the
            parent ``read``; ``None``/empty means no extra files.
        encoding: Text encoding used for every file.
        touch: When true, create ``self.filename`` first if it is missing.

    Returns:
        ``self``, so calls can be chained.

    Raises:
        OSError: if ``self.filename`` cannot be opened (for example it does
            not exist and ``touch`` is false).
    """
    import os

    # Normalise to a list: a bare path must be recorded as ONE entry (``+=``
    # on a str would extend the record character by character), and a
    # one-shot iterable must survive being consumed by the parent ``read``.
    if not filenames:
        filenames = []
    elif isinstance(filenames, (str, bytes, os.PathLike)):
        filenames = [filenames]
    else:
        filenames = list(filenames)

    super().read(filenames, encoding=encoding)
    self.input_filenames += filenames

    # TODO: deprecate touch? see ctor
    if not self.filename.exists() and touch:
        self.filename.touch()

    with open(str(self.filename), encoding=encoding) as fp:
        self.read_file(fp, source=str(self.filename))

    return self
def __str__(self):
    """Return everything ``self.write`` emits, captured as one string."""
    buffer = io.StringIO()
    self.write(buffer)
    return buffer.getvalue()
def load_conf(conf_file):
    """Parse a conf file into a ConfigParser.

        Option names keep their original case because ``optionxform`` is
        replaced with ``str``.

        Args:
            conf_file: string with the conf file name

        Returns:
            ConfigParser object with conf_file configs

        Exception:
            OneViewRedfishResourceNotFoundError:
                - if conf file not found
    """
    if os.path.isfile(conf_file):
        parser = configparser.ConfigParser()
        parser.optionxform = str
        # Parsing errors propagate to the caller unchanged.
        parser.read(conf_file)
        return parser

    raise errors.OneViewRedfishResourceNotFoundError(conf_file, 'File')
def read_config(self):
    """Load ``self.config_file`` and register one collector per section.

    The ``[global]`` section may set ``poll_interval`` and
    ``newrelic_license_key``.  Every other section that defines both
    ``name`` and ``url`` becomes an ``NginxStatusCollector`` appended to
    ``self.sources``; sections missing either option are skipped.  When a
    section also has ``http_user`` and ``http_pass`` the collector gets a
    base64-encoded ``basic_auth`` value.  The parsed config is stored on
    ``self.config``.
    """
    config = ConfigParser.RawConfigParser()
    config.read(self.config_file)

    # Apply [global] first so that every collector sees the configured
    # poll interval no matter where [global] sits in the file.
    if config.has_section('global'):
        if config.has_option('global', 'poll_interval'):
            self.poll_interval = int(config.get('global', 'poll_interval'))
        if config.has_option('global', 'newrelic_license_key'):
            self.license_key = config.get('global', 'newrelic_license_key')

    for s in config.sections():
        if s == 'global':
            continue
        if not config.has_option(s, 'name') or not config.has_option(s, 'url'):
            continue
        ns = NginxStatusCollector(s, config.get(s, 'name'),
                                  config.get(s, 'url'), self.poll_interval)
        if config.has_option(s, 'http_user') and config.has_option(s, 'http_pass'):
            ns.basic_auth = base64.b64encode(
                config.get(s, 'http_user') + b':' + config.get(s, 'http_pass'))
        self.sources.append(ns)

    self.config = config
def getConfig(section, key):
    """Return option ``key`` of ``section`` from ``application.conf``.

    The file is looked up in the directory that contains this module (after
    resolving symlinks) and is re-read on every call.
    """
    module_dir = os.path.dirname(os.path.realpath(__file__))
    conf_path = module_dir + '/application.conf'

    parser = ConfigParser.ConfigParser()
    parser.read(conf_path)
    return parser.get(section, key)
def sendSMSNotifier(type, msg):
    """Send ``msg`` by SMS to the recipients registered for ``type``.

    Recipients are fetched from the ``vod_api_notice`` endpoint (its
    ``"data"`` field); when that field is null nothing is sent.  Any
    exception is logged and swallowed.
    """
    try:
        notice_url = vod_api_notice.format("sms_notice", type)
        notice = json.loads(urllib2.urlopen(notice_url, timeout=10).read())
        recipients = notice["data"]
        if recipients is None:
            return

        sms_request = sms_url.format(','.join(recipients), 'vod', msg)
        sms_response = urllib2.urlopen(sms_request, timeout=10).read()
        logging.debug(sms_response)
    except Exception as e:
        logging.error("??????:{0}".format(e))
def chkVodFile(checkDate=None):
    """Check the VOD state of the courses scheduled on ``checkDate``.

    With an explicit ``checkDate`` every course of that day is passed to
    ``chkVodDone``.  Without one, today's date (``getNowYmd()``) is used and
    a course is only checked when the current time is in the same hour as,
    and strictly between 5 and 10 minutes past, its start (``chkVodCapture``)
    or its end (``chkVodDone``).
    """
    explicit_date = checkDate is not None
    if not explicit_date:
        checkDate = getNowYmd()

    course_url = edu_api_course.format(schoolId, checkDate, '00:00', '23:59')
    payload = json.loads(urllib2.urlopen(course_url, timeout=10).read())
    logging.info("????????")

    if payload["status"] != "0":
        logging.info("{0}-{1}??????".format(schoolId, checkDate))
        return

    nowTime = time.localtime()
    for course in payload["data"]:
        if explicit_date:
            logging.info("????????{0}??".format(checkDate))
            chkVodDone(course)
            continue

        startTime = time.strptime(course["startTime"], "%H:%M")
        endTime = time.strptime(course["endTime"], "%H:%M")

        since_start = nowTime.tm_min - startTime.tm_min
        if startTime.tm_hour == nowTime.tm_hour and 5 < since_start < 10:
            logging.info("???????????:{0} ??????:{1}".format(time.strftime("%H:%M", nowTime), startTime))
            chkVodCapture(course)

        since_end = nowTime.tm_min - endTime.tm_min
        if endTime.tm_hour == nowTime.tm_hour and 5 < since_end < 10:
            logging.info("????????,????:{0} ??????:{1}".format(time.strftime("%H:%M", nowTime), endTime))
            chkVodDone(course)
def daily_report_capture(reportDate=None):
    """Build and mail the daily capture report for ``reportDate``.

    Counts the scheduled courses and, when the course API reports status
    ``"0"``, tallies the videos returned by the video API by post, capture
    and done status.  If the number of courses differs from the number of
    captured videos, ``chkClassRoom`` supplies the valid count and the
    detail table appended to the mail.

    Args:
        reportDate: Date string used in both API URLs; defaults to
            yesterday formatted as ``%Y-%m-%d``.

    Any exception is logged and swallowed.
    """
    try:
        if reportDate is None:
            reportDate = (datetime.now() + timedelta(days=-1)).strftime("%Y-%m-%d")
        videoUrl = vod_video.format(schoolId, reportDate)
        courseUrl = edu_api_course.format(schoolId, reportDate, '00:00', '23:59')

        valid = 0
        capture = 0
        post = 0
        done = 0
        mailDetail = ''

        courseData = json.loads(urllib2.urlopen(courseUrl, timeout=10).read())
        total = len(courseData["data"])

        if courseData["status"] == "0":
            videoData = json.loads(urllib2.urlopen(videoUrl, timeout=10).read())
            for data in videoData["data"]:
                valid += 1
                if data["postStatus"] == 2:
                    post += 1
                if data["captureStatus"] == 2:
                    capture += 1
                if data["videoStatus"] == 10:
                    done += 1
            if total != capture:
                # Counts disagree: recompute "valid" from the class-room check.
                _total, invalid, mailDetail = chkClassRoom(reportDate, False)
                valid = _total - invalid

        msg = "???{0} ?? {1} ???{2} ????{3} ???{4} ???{5} ???{6}\n"\
            .format(schoolName, reportDate, total, valid, capture, post, done)
        logging.info(msg)
        sendMailNotifier("daily_report_capture", "capture_Report", msg + mailDetail)
    except Exception as e:
        logging.error(e)
def chkClassRoom(checkDate=None, sendMsg=True):
    """Find scheduled courses whose class room has no matching camera.

    A course is invalid when none of the quoted, upper-cased variants of
    its ``classRoom`` value occurs in the upper-cased camera listing
    returned by ``getCamera(schoolId)``.  Each invalid course is logged and
    added as a row to an HTML table.

    Args:
        checkDate: Date placed in the course API URL; defaults to
            ``datetime.now() + timedelta(days=1)``.
        sendMsg: When ``True`` and at least one course is invalid, mail the
            table via ``sendMailNotifier``.

    Returns:
        ``(total, invalid, mailDetail)``; ``mailDetail`` is ``""`` when no
        course is invalid.
    """
    if checkDate is None:
        # NOTE(review): this is a datetime object, not a formatted date
        # string like the other callers pass - confirm the API accepts it.
        checkDate = datetime.now() + timedelta(days=1)

    invalid_dump = ""
    mailDetail = ""
    total = 0
    invalid = 0
    cell = "<td width=\"20%\"style=\"border: solid 1 #000000\"nowrap=\"\"><span>{0}</span></td>"

    camera = upper(getCamera(schoolId))
    courseUrl = edu_api_course.format(schoolId, checkDate, '00:00', '23:59')
    jsonData = json.loads(urllib2.urlopen(courseUrl, timeout=10).read())

    if jsonData["status"] == "0":
        mailDetail = "<div><br></div><div><span><div><table border=\"1\"bordercolor=\"#000000\"cellpadding=\"2\"cellspacing=\"0\"style=\"font-size: 10pt; border-collapse:collapse; border:none\"width=\"50%\"><caption><font size=\"2\"face=\"Verdana\">test</font></caption><tbody><tr><td width=\"20%\"style=\"border: solid 1 #000000\"nowrap=\"\"><font size=\"2\"face=\"Verdana\"><div> ??</div></font></td><td width=\"20%\"style=\"border: solid 1 #000000\"nowrap=\"\"><font size=\"2\"face=\"Verdana\"><div> ????</div></font></td><td width=\"20%\"style=\"border: solid 1 #000000\"nowrap=\"\"><font size=\"2\"face=\"Verdana\"><div> ??</div></font></td><td width=\"20%\"style=\"border: solid 1 #000000\"nowrap=\"\"><font size=\"2\"face=\"Verdana\"><div> ????</div></font></td><td width=\"20%\"style=\"border: solid 1 #000000\"nowrap=\"\"><font size=\"2\"face=\"Verdana\"><div> ??</div></font></td></tr>"
        total = len(jsonData["data"])
        for data in jsonData["data"]:
            room = upper(data["classRoom"])
            # Quoted spellings of the room that may appear in the listing.
            candidates = (
                "\"" + room + "\"",
                "\"" + room.replace("??", "") + "\"",
                "\"" + room + "??" + "\"",
            )
            if any(candidate in camera for candidate in candidates):
                continue

            invalid += 1
            dumped = json.dumps(data)
            invalid_dump += dumped + "\n"
            logging.error("classRoom??" + dumped)

            mailDetail += "<tr>"
            for value in (invalid, data["courseNo"], data["classIdx"],
                          data["startTime"], data["classRoom"]):
                mailDetail += cell.format(value)
            mailDetail += "</tr>"
        mailDetail += "</tbody></table></div></span></div>"

    if not invalid_dump:
        return total, invalid, ""

    if sendMsg == True:
        errmsg = "??:{0}\n ??:{1}\n ????:{2}\n ????:\n{3}\n"\
            .format(schoolId, checkDate, courseUrl, mailDetail)
        sendMailNotifier("edu_room", "edu_room", errmsg)
    return total, invalid, mailDetail
# NOTE(review): the original trailing comment was mis-encoded
# ("02 07:0-23:00 10???????????"); it looks like a schedule note - confirm.
def chkEduApiResult(url):
    """Check that the education API at ``url`` answers with status ``'0'``.

    The decoded response is logged at debug level.  On any failure (request
    error, invalid JSON, missing or different ``status``) a mail and an SMS
    notification are sent.

    Returns:
        ``True`` when the response has ``status == '0'``, else ``False``.
    """
    # Kept separate from the response body so a JSON failure cannot leave a
    # truthy string behind as the "result".
    ok = False
    try:
        body = urllib2.urlopen(url, timeout=10).read()
        jsonData = json.loads(body)
        logging.debug(jsonData)
        ok = jsonData["status"] == '0'
    except Exception as e:
        logging.error(e)

    if not ok:
        sendMailNotifier("edu_api", "edu_api", "??????{0}".format(url))
        sendSMSNotifier("edu_api", "[vod]" + "??????")
    return ok
def daily_report_capture(reportDate=None):
    """Build and log the daily capture report for ``reportDate``.

    Counts the scheduled courses and, when the course API reports status
    ``"0"``, tallies the videos returned by the video API by post, capture
    and done status.  If the number of courses differs from the number of
    captured videos, ``chkClassRoom`` supplies the valid count.  The report
    is only logged; the mail notification is commented out.

    Args:
        reportDate: Date string used in both API URLs; defaults to
            yesterday formatted as ``%Y-%m-%d``.

    Any exception is logged and swallowed.
    """
    try:
        if reportDate is None:
            reportDate = (datetime.now() + timedelta(days=-1)).strftime("%Y-%m-%d")
        videoUrl = vod_video.format(schoolId, reportDate)
        courseUrl = edu_api_course.format(schoolId, reportDate, '00:00', '23:59')

        valid = 0
        capture = 0
        post = 0
        done = 0
        mailDetail = ''

        courseData = json.loads(urllib2.urlopen(courseUrl, timeout=10).read())
        total = len(courseData["data"])

        if courseData["status"] == "0":
            videoData = json.loads(urllib2.urlopen(videoUrl, timeout=10).read())
            for data in videoData["data"]:
                valid += 1
                if data["postStatus"] == 2:
                    post += 1
                if data["captureStatus"] == 2:
                    capture += 1
                if data["videoStatus"] == 10:
                    done += 1
            if total != capture:
                # Counts disagree: recompute "valid" from the class-room check.
                _total, invalid, mailDetail = chkClassRoom(reportDate, False)
                valid = _total - invalid

        msg = "???{0} ?? {1} ???{2} ????{3} ???{4} ???{5} ???{6}\n"\
            .format(schoolId, reportDate, total, valid, capture, post, done)
        logging.info(msg)
        # sendMailNotifier("daily_report_capture","capture_Report",msg +mailDetail)
    except Exception as e:
        logging.error(e)
def load_config(conf_file):
    """Load the conf file and publish the derived state as module globals.

        Sets these module-level names, in order: ``config`` (the parsed
        file), ``ov_config`` (the ``oneview_config`` section plus a
        ``credentials`` dict, with ``api_version`` converted to int),
        ``schemas`` (the ``schemas`` section), ``ov_client`` (an
        ``OneViewClient`` built from ``ov_config``) and ``registry_dict``
        (result of ``load_registry``).  Finally calls ``store_schemas``
        with the ``redfish`` section's ``schema_dir``.

        Args:
            conf_file: string with the conf file name

        Returns:
            None

        Exception:
            OneViewRedfishError:
                - if creating the client, loading the registries or
                  storing the schemas fails
            Errors raised by ``load_conf`` or while reading the sections
            propagate unchanged.
    """
    module_state = globals()

    parsed = load_conf(conf_file)
    module_state['config'] = parsed

    # OneView connection settings come from two sections of the file.
    oneview_settings = dict(parsed.items('oneview_config'))
    oneview_settings['credentials'] = dict(parsed.items('credentials'))
    oneview_settings['api_version'] = int(oneview_settings['api_version'])
    module_state['ov_config'] = oneview_settings

    module_state['schemas'] = dict(parsed.items('schemas'))
    registries = dict(parsed.items('registry'))

    try:
        module_state['ov_client'] = OneViewClient(oneview_settings)
        module_state['registry_dict'] = load_registry(
            parsed['redfish']['registry_dir'], registries)
        store_schemas(parsed['redfish']['schema_dir'])
    except errors.OneViewRedfishResourceNotFoundError as e:
        raise errors.OneViewRedfishError(
            'Failed to load schemas or registries: {}'.format(e)
        )
    except Exception as e:
        raise errors.OneViewRedfishError(
            'Failed to connect to OneView: {}'.format(e)
        )