我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用logging.getLogger()。
def configure_logging(path_to_log_directory):
    """
    Attach a dated file handler and a stdout handler to ``importer_logger``.

    The log file is named after the current date (``YYYY-MM-DD.log``) and is
    created inside ``path_to_log_directory``. The logger and both handlers are
    set to the module-level ``LOG_LEVEL`` and share one formatter.

    :param path_to_log_directory: path to directory to write log file in
    :return: None
    """
    today = datetime.datetime.now().strftime('%Y-%m-%d')
    log_path = os.path.join(path_to_log_directory, today + '.log')

    importer_logger = logging.getLogger('importer_logger')
    importer_logger.setLevel(LOG_LEVEL)

    line_format = logging.Formatter('%(asctime)s : %(levelname)s : %(message)s')
    # File handler first, then stdout, so handlers are registered in that order.
    for handler in (logging.FileHandler(filename=log_path),
                    logging.StreamHandler(sys.stdout)):
        handler.setLevel(LOG_LEVEL)
        handler.setFormatter(line_format)
        importer_logger.addHandler(handler)
def create_logger():
    """
    Configure the root logger for console output and return this module's logger.

    The root logger is set to INFO and receives one ``StreamHandler``. A
    ``colorlog`` formatter is used when ``HAVE_COLORLOG`` is truthy and file
    descriptor 2 is a terminal; otherwise a plain ``logging.Formatter`` is used.

    :return: the logger named after this module (``__name__``)
    """
    format_str = '%(asctime)s - %(levelname)-8s - %(message)s'
    date_format = '%Y-%m-%d %H:%M:%S'

    root = logging.getLogger()
    root.setLevel(logging.INFO)

    if not (HAVE_COLORLOG and os.isatty(2)):
        formatter = logging.Formatter(format_str, date_format)
    else:
        level_colors = {
            'DEBUG': 'reset',
            'INFO': 'reset',
            'WARNING': 'bold_yellow',
            'ERROR': 'bold_red',
            'CRITICAL': 'bold_red',
        }
        formatter = colorlog.ColoredFormatter('%(log_color)s' + format_str,
                                              date_format,
                                              log_colors=level_colors)

    console = logging.StreamHandler()
    console.setFormatter(formatter)
    root.addHandler(console)

    return logging.getLogger(__name__)
def setup_logging(log_level=logging.INFO):
    """
    Set up the logging.

    Calls ``logging.basicConfig`` with ``log_level``, lowers the verbosity of
    a few third-party loggers to WARNING, installs a coloured formatter on the
    first root handler when ``colorlog`` can be imported, and finally sets the
    root logger level to ``log_level``.

    :param log_level: level applied to the root logger
    """
    logging.basicConfig(level=log_level)

    fmt = ("%(asctime)s %(levelname)s (%(threadName)s) "
           "[%(name)s] %(message)s")
    colorfmt = "%(log_color)s{}%(reset)s".format(fmt)
    datefmt = '%Y-%m-%d %H:%M:%S'

    # Suppress overly verbose logs from libraries that aren't helpful
    for noisy_name in ('requests', 'urllib3', 'aiohttp.access'):
        logging.getLogger(noisy_name).setLevel(logging.WARNING)

    try:
        from colorlog import ColoredFormatter
        palette = {
            'DEBUG': 'cyan',
            'INFO': 'green',
            'WARNING': 'yellow',
            'ERROR': 'red',
            'CRITICAL': 'red',
        }
        colored = ColoredFormatter(colorfmt, datefmt=datefmt, reset=True,
                                   log_colors=palette)
        logging.getLogger().handlers[0].setFormatter(colored)
    except ImportError:
        # colorlog is optional; keep the formatter installed by basicConfig.
        pass

    logging.getLogger('').setLevel(log_level)
def kas(argv):
    """
    The main entry point of kas.

    Sets up logging, parses ``argv``, registers SIGINT/SIGTERM handlers on the
    asyncio event loop and an atexit hook, then offers the parsed arguments to
    each plugin in turn. Returns as soon as one plugin's ``run`` returns a
    truthy value; prints the parser help when none does.

    :param argv: list of command line arguments handed to the argument parser
    """
    create_logger()

    parser = kas_get_argparser()
    args = parser.parse_args(argv)

    if args.debug:
        logging.getLogger().setLevel(logging.DEBUG)

    program = os.path.basename(sys.argv[0])
    logging.info('%s %s started', program, __version__)

    event_loop = asyncio.get_event_loop()
    for signum in (signal.SIGINT, signal.SIGTERM):
        event_loop.add_signal_handler(signum, interruption)

    atexit.register(_atexit_handler)

    # any() stops at the first plugin that reports it handled the arguments.
    plugin_classes = getattr(kasplugin, 'plugins', [])
    if any(plugin_cls().run(args) for plugin_cls in plugin_classes):
        return

    parser.print_help()
def __init__(self, pool_names, max_restarts=0, options=None):
    """
    Store the pool configuration and create the shared queue.

    :param pool_names: stored as ``self.names``
    :param max_restarts: stored as ``self.max_restarts``
    :param options: optional mapping; an empty dict is used when falsy
    """
    self.names = pool_names
    self.queue = multiprocessing.Queue()
    self.pool = {}
    self.max_restarts = max_restarts
    self.options = options if options else {}

    self.dog_path = os.curdir
    self.dog_handler = LiveReload(self)
    # self.dog_observer = Observer()
    # self.dog_observer.schedule(self.dog_handler, self.dog_path, recursive=True)

    start_method = multiprocessing.get_start_method()
    if start_method != 'fork':  # pragma: no cover
        # Forward queued records to whatever handlers the root logger has.
        handlers = logging.getLogger().handlers
        self.log_listener = QueueListener(self.queue, *handlers)

    # TODO: Find out how to get the watchdog + livereload working on a later moment.
    # self.dog_observer.start()

    self._restarts = {}
def init_logger(logger_name):
    """
    Create a file-backed logger from the service configuration.

    The log file path is ``service_log_path/service_log_filename`` from
    ``cfg.CONF.service``. The level is DEBUG when
    ``enable_debug_log_entries`` is truthy, otherwise INFO.

    :param logger_name: name passed to ``logging.getLogger``
    :return: the configured logger
    """
    log = logging.getLogger(logger_name)

    log_path = '%s/%s' % (cfg.CONF.service.service_log_path,
                          cfg.CONF.service.service_log_filename)
    file_handler = logging.FileHandler(log_path)
    # The surrounding single quotes are part of the emitted record format.
    record_format = ("'%(asctime)s - %(pathname)s:"
                     "%(lineno)s - %(levelname)s"
                     " - %(message)s'")
    file_handler.setFormatter(logging.Formatter(record_format))
    log.addHandler(file_handler)

    debug_enabled = cfg.CONF.service.enable_debug_log_entries
    log.setLevel(logging.DEBUG if debug_enabled else logging.INFO)
    return log
def configure_logging(self):
    """
    Configure logging to log to std output as well as to log file.

    A DEBUG-level logger named ``sp_logger`` receives a file handler writing
    to ``<self.log_dir>/<YYYY-MM-DD>.log`` and a stream handler on stdout,
    both sharing the same formatter.

    The log path is built with ``os.path.join`` so that ``self.log_dir`` works
    with or without a trailing path separator.
    """
    # Local import keeps this method self-contained.
    import os

    log_level = logging.DEBUG
    log_filename = datetime.now().strftime('%Y-%m-%d') + '.log'
    log_path = os.path.join(self.log_dir, log_filename)

    sp_logger = logging.getLogger('sp_logger')
    sp_logger.setLevel(log_level)
    formatter = logging.Formatter('%(asctime)s : %(levelname)s : %(message)s')

    fh = logging.FileHandler(filename=log_path)
    fh.setLevel(log_level)
    fh.setFormatter(formatter)
    sp_logger.addHandler(fh)

    sh = logging.StreamHandler(sys.stdout)
    sh.setLevel(log_level)
    sh.setFormatter(formatter)
    sp_logger.addHandler(sh)
def get_audit_actions(self, date_modified, offset=0, page_length=100):
    """
    Search for actions modified from a given date and page through the results.

    Posts a search request to ``<api_url>actions/search`` and, when the
    response is usable, delegates to ``get_page_of_actions`` to collect the
    pages.

    :param date_modified: ISO formatted date/time string used as the
                          ``modified_at.from`` filter
    :param offset: the index to start retrieving actions from
    :param page_length: how many actions to fetch for each page; forwarded to
                        ``get_page_of_actions``
    :return: result of ``get_page_of_actions``, or None when the request did
             not succeed or the response lacks ``count``, ``offset``,
             ``total`` or ``actions``
    """
    logger = logging.getLogger('sp_logger')
    actions_url = self.api_url + 'actions/search'

    search_body = {
        "modified_at": {"from": str(date_modified)},
        "offset": offset,
        "status": [0, 10, 50, 60]
    }
    response = self.authenticated_request_post(actions_url, data=json.dumps(search_body))

    if response.status_code == requests.codes.ok:
        result = self.parse_json(response.content)
    else:
        result = None
    self.log_http_status(response.status_code, 'GET actions')

    if result is None:
        return None

    required_fields = [result.get('count'), result.get('offset'),
                       result.get('total'), result.get('actions')]
    if None in required_fields:
        return None

    return self.get_page_of_actions(logger, date_modified, result, offset, page_length)
def init(verbose=2, sendto=True, backupCount=5):
    """
    Set up some simple default handling to make it easier for those wrapping
    this library.

    You do not need to call this function if you don't want to; ideally one
    might want to set up things their own way.

    :param verbose: when truthy, passed on to ``set_verbosity``
    :param sendto: forwarded to ``add_handler`` for both loggers
    :param backupCount: forwarded to ``add_handler`` for both loggers
    """
    # Add our handlers at the parent level. The caller's ``sendto`` is
    # honoured here instead of a hard-coded True.
    for logger_name in (SQLALCHEMY_LOGGER, NEWSREAP_LOGGER):
        add_handler(
            logging.getLogger(logger_name),
            sendto=sendto,
            backupCount=backupCount,
        )

    if verbose:
        set_verbosity(verbose=verbose)
def forwarder(tasks, interval, batch_size, source, dest):
    '''Forward items from one storage to another.

    Repeatedly takes up to ``batch_size`` items from the source store and puts
    them into the destination store, sleeping ``interval`` seconds whenever a
    batch is empty. If the destination write fails the batch is put back into
    the source store, the error is logged and the exception is re-raised.

    Exits with status 1 when neither ``tasks`` nor ``source`` is given.
    '''
    from .utils import RunFlag, load_manager, redis_client
    from .store import QueueStore

    log = logging.getLogger('dsq.forwarder')

    if not (tasks or source):
        print('--tasks or --source must be provided')
        sys.exit(1)

    if source:
        src_store = QueueStore(redis_client(source))
    else:
        src_store = load_manager(tasks).queue
    dst_store = QueueStore(redis_client(dest))

    run = RunFlag()
    while run:
        batch = src_store.take_many(batch_size)
        if not (batch['schedule'] or batch['queues']):
            # Nothing to forward right now.
            time.sleep(interval)
            continue
        try:
            dst_store.put_many(batch)
        except Exception:
            # Return the batch to the source store before propagating.
            src_store.put_many(batch)
            log.exception('Forward error')
            raise
def main():
    """
    Command line entry point: log distro information to stdout.

    With ``--json``/``-j`` the output of ``info()`` is logged as indented,
    key-sorted JSON; otherwise the name is logged, followed by the version
    and codename when they are non-empty.
    """
    import argparse

    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)
    logger.addHandler(logging.StreamHandler(sys.stdout))

    parser = argparse.ArgumentParser(description="Linux distro info tool")
    parser.add_argument(
        '--json',
        '-j',
        help="Output in machine readable format",
        action="store_true")
    args = parser.parse_args()

    if args.json:
        logger.info(json.dumps(info(), indent=4, sort_keys=True))
        return

    logger.info('Name: %s', name(pretty=True))

    distribution_version = version(pretty=True)
    if distribution_version:
        logger.info('Version: %s', distribution_version)

    distribution_codename = codename()
    if distribution_codename:
        logger.info('Codename: %s', distribution_codename)
def add_stderr_logger(level=logging.DEBUG):
    """
    Helper for quickly adding a StreamHandler to the logger. Useful for
    debugging.

    :param level: level applied to this module's logger
    :return: the handler after adding it
    """
    # This method needs to be in this __init__.py to get the __name__ correct
    # even if urllib3 is vendored within another package.
    module_logger = logging.getLogger(__name__)

    stream_handler = logging.StreamHandler()
    line_format = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
    stream_handler.setFormatter(line_format)

    module_logger.addHandler(stream_handler)
    module_logger.setLevel(level)
    module_logger.debug('Added a stderr logging handler to logger: %s', __name__)
    return stream_handler
def configureLogging(level, console, file):
    """
    Configure the root logger with optional console and file handlers.

    :param level: level applied to the root logger and to every handler added
    :param console: when truthy, add a ``StreamHandler``
    :param file: when truthy, path handed to ``logging.FileHandler``
    """
    root = logging.getLogger()
    root.setLevel(level)
    line_format = logging.Formatter('%(asctime)s %(levelname)s %(message)s')

    def attach(handler):
        # Every handler gets the same level and format.
        handler.setLevel(level)
        handler.setFormatter(line_format)
        root.addHandler(handler)

    if console:
        attach(logging.StreamHandler())
        print("logging to console")

    if file:
        attach(logging.FileHandler(file))
        print("logging to file {0}".format(file))
def init_logging(logfile, debug=True, level=None):
    """
    Simple configuration of logging.

    :param logfile: passed to ``logging.basicConfig`` as ``filename``
                    (opened in append mode)
    :param debug: selects DEBUG when truthy, INFO otherwise
    :param level: when truthy, overrides the level chosen by ``debug``
    :return: the logger named ``"circus"``
    """
    # An explicit level wins over the debug flag.
    log_level = level or (logging.DEBUG if debug else logging.INFO)

    logging.basicConfig(
        level=log_level,
        format='%(asctime)s %(levelname)-8s [%(name)s] %(message)s',
        filename=logfile,
        filemode='a',
    )
    return logging.getLogger("circus")
def setup_platform(hass, config, add_devices, discovery_info=None):
    """
    Set up the MyQ garage door.

    Logs an error and returns without adding devices when the username or
    password is missing, or when the configured brand is not a key of
    ``BRAND_MAPPINGS``. Otherwise one ``MyQCoverDevice`` is added per door
    returned by ``MyQAPI.get_garage_doors()``.
    """
    logger = logging.getLogger(__name__)

    username = config.get(CONF_USERNAME)
    password = config.get(CONF_PASSWORD)
    if username is None or password is None:
        logger.error("MyQ Cover - Missing username or password.")
        return

    try:
        brand = BRAND_MAPPINGS[config.get(CONF_BRAND)]
    except KeyError:
        supported = ', '.join(SUPPORTED_BRANDS)
        logger.error("MyQ Cover - Missing or unsupported brand. Supported brands: %s",
                     supported)
        return

    myq = MyQAPI(username, password, brand, logger)
    add_devices(MyQCoverDevice(myq, door) for door in myq.get_garage_doors())
def __init__(self, parent_orb, execparams, poa):
    """
    Initialise the object from its execution parameters.

    :param parent_orb: the parent ORB for this object
    :param execparams: mapping that must contain ``NAME_BINDING`` and
                       ``COMPONENT_IDENTIFIER``
    :param poa: the CORBA portable object adapter
    """
    # The CORBA name this object is registered under
    self.naming_service_name = execparams['NAME_BINDING']
    # The parent ORB for this object
    self.parent_orb = parent_orb
    # The CORBA portable object adapter
    self.poa = poa
    # The uuid assigned to this instance of the component
    self.uuid = execparams['COMPONENT_IDENTIFIER']

    # The storage of property values that don't have getters/setters
    self.propertySet = {}
    # All execparams flattened into one space-separated "key value" string.
    execparams_value = " ".join("%s %s" % (key, value)
                                for key, value in execparams.items())
    self.propertySet[getId("execparams")] = CF.DataType(
        id=getId("execparams"),
        value=omniORB.any.to_any(execparams_value))

    # The PID of the child process
    self._pid = None
    self._log = logging.getLogger(self.naming_service_name)

######################################
# Implement the Resource interface
def __init__(self, devmgr=None, uuid=None, label=None, softwareProfile=None):
    """
    Initialise the device state and register with the device manager.

    :param devmgr: device manager; when truthy, ``registerDevice`` is called
                   on it with ``self._this()``
    :param uuid: stored as ``self.uuid``
    :param label: stored as ``self._label`` and used as the logger name
    :param softwareProfile: stored as ``self._softwareProfile``
    """
    self.props = {}
    self.uuid = uuid
    self._devmgr = devmgr
    self._label = label
    self._softwareProfile = softwareProfile
    self._compositeDevice = None

    # Initial device states.
    self._usageState = CF.Device.IDLE
    self._adminState = CF.Device.UNLOCKED
    self._operationalState = CF.Device.ENABLED

    self._log = logging.getLogger(label)

    if self._devmgr:
        self._devmgr.registerDevice(self._this())

# Helper Methods
def main():
    """
    Start the browse window application.

    Installs an INFO-level console handler on the root logger, reads the
    ``--domainname=`` and ``-v``/``--verbose`` options from ``sys.argv`` and
    passes them to ``BrowseWindow`` before entering the Qt event loop.
    """
    # Set up a console logger.
    root_logger = logging.getLogger()
    console = logging.StreamHandler()
    console.setFormatter(
        logging.Formatter("%(asctime)s %(name)-12s:%(levelname)-8s: %(message)s"))
    root_logger.addHandler(console)
    root_logger.setLevel(logging.INFO)

    window_kwargs = {}
    opts, _ = getopt.getopt(sys.argv[1:], 'v', ['domainname=', 'verbose'])
    for flag, value in opts:
        if flag == '--domainname':
            window_kwargs['domainName'] = value
        if flag in ['-v', '--verbose']:
            window_kwargs['verbose'] = True

    app = QApplication(sys.argv)
    QObject.connect(app, SIGNAL("lastWindowClosed()"), app, SLOT("quit()"))
    window = BrowseWindow(**window_kwargs)
    window.show()
    app.exec_()
def __init__(self, resource=None ):
    """
    Initialise the manager and try to resolve the EventChannelManager.

    :param resource: optional object providing ``getDomainManager()``; when
                     truthy, the EventChannelManager reference is looked up
                     through it and stored in ``self._ecm``. Any failure
                     during the lookup is logged as a warning and ``_ecm``
                     stays ``None``.
    """
    self._mgr_lock = threading.Lock()
    self._ecm = None
    self._logger = logging.getLogger("ossie.events.Manager")
    self._logger.setLevel(logging.INFO)
    self._allow = True
    self._registrations = []

    if resource:
        try:
            self._logger.debug("Requesting Domain Manager Access....")
            dom = resource.getDomainManager()
            self._logger.debug("Requesting EventChannelManager Access....")
            self._ecm = dom.getRef()._get_eventChannelMgr()
            self._logger.debug("Acquired reference to EventChannelManager")
        except Exception:
            # Best effort: construction must not fail when the domain is
            # unreachable. ``except Exception`` (rather than a bare except)
            # lets KeyboardInterrupt/SystemExit propagate.
            self._logger.warning("EventChannelManager - unable to resolve DomainManager's EventChannelManager ")
def run(self, args=None, namespace=None):
    """
    Parse the command line, adjust log levels and dispatch to a handler.

    :param args: forwarded to ``self.parser.parse_args``
    :param namespace: forwarded to ``self.parser.parse_args``
    :return: whatever the selected handler returns when called with
             ``(logger, options)``
    :raises AttributeError: when the parsed options carry no ``handler`` and
                            ``self.default_handler`` is not callable
    """
    options = self.parser.parse_args(args=args, namespace=namespace)

    enable_pretty_logging()
    logger = logging.getLogger(__name__)

    # getLogger() with no name is the root logger on every Python version;
    # getLogger('root') only resolves to it from Python 3.9 onwards.
    root_logger = logging.getLogger()

    # todo configure_logger() method ?
    if options.debug:
        root_logger.setLevel(logging.INFO)

    if options.verbose:
        if options.verbose >= 1:
            root_logger.setLevel(logging.DEBUG)
        if options.verbose >= 2:
            # Inside this branch verbose is never < 2, so the level is DEBUG.
            logging.getLogger('sqlalchemy.engine').setLevel(logging.DEBUG)

    try:
        handler = options.handler
    except AttributeError:
        if not callable(self.default_handler):
            raise
        handler = None

    return (handler or self.default_handler)(logger, options)
def setup(name=__name__, level=logging.INFO):
    """
    Return a logger with a single stream handler, creating it on first use.

    If the named logger already has handlers it is returned untouched.
    Otherwise it is set to ``level`` and given a ``StreamHandler`` whose
    formatter is ``ColorFormatter`` when ``click`` is importable and
    ``CustomFormatter`` when it is not.

    :param name: logger name
    :param level: level for a newly configured logger; no longer overwritten
                  with INFO at the end of the function
    :return: the configured logger
    """
    logger = logging.getLogger(name)
    if logger.handlers:
        return logger

    logger.setLevel(level)

    try:
        # check if click exists to swap the logger
        import click  # noqa
        formatter = ColorFormatter('[.] %(message)s')
    except ImportError:
        formatter = CustomFormatter('[.] %(message)s')

    handler = logging.StreamHandler(None)
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    return logger
def merge_files(groups, outdir):
    """
    Merge files that belong to the same filename group.

    Merged files are created in the output directory. Each group's files are
    concatenated byte-for-byte, in list order, into ``outdir/<groupname>``.

    Args:
        groups: Dictionary of filename groups from `group_filenames`, mapping
            a group name to a list of file paths.
        outdir: Output path for merged files.
    """
    logger = logging.getLogger("mergeFQs." + "merge")

    # dict.items() works on both Python 2 and 3; iteritems() is Python 2 only.
    for groupname, filenames in groups.items():
        logger.info("Merging group %s with %s files...", groupname, len(filenames))
        outpath = os.path.join(outdir, groupname)
        logger.info("Creating merge file %s...", outpath)

        with open(outpath, "wb") as outfile:
            for filename in filenames:
                logger.info("Adding %s...", filename)
                with open(filename, "rb") as fq_file:
                    shutil.copyfileobj(fq_file, outfile)
def main():
    """
    Run main code

    1. Get arguments
    2. Setup logging
    3. Group filenames
    4. Merge files

    An example merge of the first input file is logged before the real
    grouping so the effect of the filename pattern is visible.
    """
    args = get_args()
    setup_logging(args.outdir)
    logger = logging.getLogger("mergeFQs." + __name__)

    input_count = str(len(args.fastqs))
    logger.info(input_count + " input files provided")
    logger.info("Filename pattern is " + args.pattern)

    pattern = args.pattern.split(args.separator)

    # Show what the pattern does to the first input file.
    ex_file = args.fastqs[0]
    ex_merge = merge_filename(ex_file, pattern, args.separator)
    ex_target = os.path.join(args.outdir, ex_merge)
    logger.info("Example merge: " + ex_file + " -> " + ex_target)

    file_groups = group_filenames(args.fastqs, pattern, args.separator)
    group_count = str(len(file_groups))
    logger.info(group_count + " file groups found...")

    merge_files(file_groups, args.outdir)
def __init__(self, interval_in_seconds, service_name, result_dict,
             max_delay_seconds, disable=False):
    """
    Initialise the alert manager.

    :param interval_in_seconds: forwarded to the parent initialiser
    :param service_name: stored and used to build the logger name
    :param result_dict: passed to ``_setup_ok_result_dict``
    :param max_delay_seconds: stored as a ``timedelta`` in ``_max_delay``
    :param disable: stored as ``_disable``
    """
    super(SensuAlertManager, self).__init__(interval_in_seconds)
    self._service_name = service_name

    # Prepare the result dictionaries.
    self._setup_ok_result_dict(result_dict)
    self._setup_delayed_result_dict()
    self._setup_disabled_alert_dict()

    logger_name = '{}.util.sensu_alert_manager'.format(service_name)
    self._log = logging.getLogger(logger_name)

    self._disable = disable
    self._should_send_sensu_disabled_message = False
    self._max_delay = timedelta(seconds=max_delay_seconds)
def __init__(self, session, api_id, api_hash,
             proxy=None, timeout=timedelta(seconds=5)):
    """Initializes the Telegram client with the specified API ID and Hash.

    Session must always be a Session instance, and an optional proxy
    can also be specified to be used on the connection.

    ``api_id`` is converted with ``int()``; ``timeout`` is stored as
    ``_timeout``.
    """
    self.session = session
    self.api_id = int(api_id)
    self.api_hash = api_hash
    self.proxy = proxy
    self._timeout = timeout
    self._logger = logging.getLogger(__name__)

    # Cache "exported" senders 'dc_id: TelegramBareClient' and
    # their corresponding sessions not to recreate them all
    # the time since it's a (somewhat expensive) process.
    self._cached_clients = dict()

    # These will be set later
    self.dc_options = None
    self._sender = None

# endregion
# region Connecting
def run_daemon(server, pidfile, daemonize=True):
    """Run the server as a daemon

    :param server: cutlery (a Spoon or Spork)
    :param pidfile: the file to keep the parent PID; may be None or empty,
                    in which case no PID file is written or removed
    :param daemonize: if True fork the processes into a daemon.
    :return:
    """
    logger = logging.getLogger(server.server_logger)

    if daemonize:
        detach(pidfile=pidfile, logger=logger)
    elif pidfile:
        with open(pidfile, "w+") as pidf:
            pidf.write("%s\n" % os.getpid())

    try:
        server.serve_forever()
    finally:
        # Only attempt removal when a pidfile was given: os.remove(None)
        # raises TypeError, which the OSError guard below would not catch.
        if pidfile:
            try:
                os.remove(pidfile)
            except OSError:
                # Best effort: the file may already be gone.
                pass
def __init__(self, address):
    """
    Initialise the server for ``address`` without binding it immediately.

    The address family is AF_INET6 when the host part (``address[0]``)
    contains a colon, otherwise AF_INET. After the parent initialiser runs
    with ``bind_and_activate=False``, the configuration is loaded, the socket
    is set up and the reload/shutdown signal handlers are installed for the
    signals that are not None.

    :param address: address sequence whose first item is the host
    """
    self.log = logging.getLogger(self.server_logger)
    self.socket = None

    host = address[0]
    self.address_family = socket.AF_INET6 if ":" in host else socket.AF_INET

    self.log.debug("Listening on %s", address)
    super(_SpoonMixIn, self).__init__(address, self.handler_klass,
                                      bind_and_activate=False)
    self.load_config()
    self._setup_socket()

    # Finally, set signals
    if self.signal_reload is not None:
        signal.signal(self.signal_reload, self.reload_handler)
    if self.signal_shutdown is not None:
        signal.signal(self.signal_shutdown, self.shutdown_handler)
def instantiate(p):
    """
    Create the module-level ``logger`` and log that the instance was created.

    While holding ``rlock``, the global ``logger`` is bound to the
    ``freepydius-logger`` logger at INFO level with a file handler on
    ``_LOG_FILE`` that rotates at midnight.

    :param p: value printed to stdout for diagnostics
    :return: 0
    """
    global logger

    print("*** instantiate ***")
    print(p)

    with rlock:
        logger = logging.getLogger("freepydius-logger")
        logger.setLevel(logging.INFO)

        rotating = TimedRotatingFileHandler(_LOG_FILE,
                                            when="midnight",
                                            interval=1)
        rotating.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
        logger.addHandler(rotating)

    instance_log = Log("INSTANCE")
    instance_log.log((
        ('Response', 'created'),
    ))

    # return 0 for success or -1 for failure
    return 0
def monitor():
    """Wrapper to call console with a loop.

    Schedules ``console(loop, log, devicelist)`` on the asyncio event loop
    and runs the loop forever.
    """
    devicelist = (
        {
            "address": "3c4fc5",
            "cat": 0x05,
            "subcat": 0x0b,
            "firmware": 0x00
        },
        {
            "address": "43af9b",
            "cat": 0x02,
            "subcat": 0x1a,
            "firmware": 0x00
        }
    )

    log = logging.getLogger(__name__)
    loop = asyncio.get_event_loop()

    # ``asyncio.async`` cannot be parsed on Python 3.7+ because ``async`` is a
    # reserved keyword; ``ensure_future`` is its direct replacement.
    asyncio.ensure_future(console(loop, log, devicelist), loop=loop)
    loop.run_forever()
def __init__(self):
    """Create the Protocol object.

    Starts with an empty code list and registers every known message code
    through ``self.add``, in the order listed below.
    """
    self.log = logging.getLogger(__name__)
    self._codelist = []

    # (code, keyword arguments for self.add); rsize only where it applies.
    known_codes = (
        (0x50, dict(name='INSTEON Standard Message Received', size=11)),
        (0x51, dict(name='INSTEON Extended Message Received', size=25)),
        (0x52, dict(name='X10 Message Received', size=4)),
        (0x53, dict(name='ALL-Linking Completed', size=10)),
        (0x54, dict(name='Button Event Report', size=3)),
        (0x55, dict(name='User Reset Detected', size=2)),
        (0x56, dict(name='ALL-Link CLeanup Failure Report', size=2)),
        (0x57, dict(name='ALL-Link Record Response', size=10)),
        (0x58, dict(name='ALL-Link Cleanup Status Report', size=3)),
        (0x60, dict(name='Get IM Info', size=2, rsize=9)),
        (0x61, dict(name='Send ALL-Link Command', size=5, rsize=6)),
        (0x62, dict(name='INSTEON Fragmented Message', size=8, rsize=9)),
        (0x64, dict(name='Start ALL-Linking', size=4, rsize=5)),
        (0x65, dict(name='Cancel ALL-Linking', size=4)),
        (0x67, dict(name='Reset the IM', size=2, rsize=3)),
        (0x69, dict(name='Get First ALL-Link Record', size=2)),
        (0x6a, dict(name='Get Next ALL-Link Record', size=2)),
        (0x73, dict(name='Get IM Configuration', size=2, rsize=6)),
    )
    for code, details in known_codes:
        self.add(code, **details)
def __init__(self, strat):
    """
    Build the generator for ``strat``.

    Inspects the signature of ``strat.generate`` and forwards only those
    entries of ``strat._kws`` that match a named parameter. If a ``**kwargs``
    parameter is reached, all of ``strat._kws`` is merged in and the scan
    stops. The generator is then created with ``strat._depth``, the
    positional ``strat._args`` and the selected keywords.

    :param strat: strategy object providing ``generate``, ``_kws``,
                  ``_depth`` and ``_args``
    """
    logger_name = 'strategy.StrategyIterator({})'.format(str(strat))
    self.log = logging.getLogger(logger_name)
    self.strategy = strat

    parameters = inspect.signature(strat.generate).parameters
    selected = {}
    self.log.debug('init')

    for kw, parameter in parameters.items():
        if kw in strat._kws:
            self.log.debug('add keyword {kw}'.format(kw=kw))
            selected[kw] = strat._kws[kw]
            continue
        if parameter.kind == inspect.Parameter.VAR_KEYWORD:
            self.log.debug('merge keywords on VAR_KEYWORD {kw}'.format(kw=kw))
            selected.update(strat._kws)
            break

    self._generator = strat.generate(strat._depth, *strat._args, **selected)
def main():
    """Run newer stuffs.

    Parses the command line (including the required ``--elb-subnet``), sets
    the log level of the top-level package from ``--debug`` and creates the
    ELB DNS record through ``SpinnakerDns``.
    """
    logging.basicConfig(format=LOGGING_FORMAT)
    log = logging.getLogger(__name__)

    parser = argparse.ArgumentParser()
    for register in (add_debug, add_app, add_env, add_region, add_properties):
        register(parser)
    parser.add_argument("--elb-subnet",
                        help="Subnetnet type, e.g. external, internal",
                        required=True)
    args = parser.parse_args()

    package_root = __package__.split('.')[0]
    logging.getLogger(package_root).setLevel(args.debug)

    log.debug('Parsed arguments: %s', args)

    spinnakerapps = SpinnakerDns(
        app=args.app,
        env=args.env,
        region=args.region,
        prop_path=args.properties,
        elb_subnet=args.elb_subnet)
    spinnakerapps.create_elb_dns()
def main():
    """Destroy any DNS related resources of an application

    Records in any Hosted Zone for an Environment will be deleted.

    Raises:
        AssertionError: when ``destroy_dns`` returns a falsy value.
    """
    logging.basicConfig(format=LOGGING_FORMAT)

    parser = argparse.ArgumentParser(description=main.__doc__)
    add_debug(parser)
    add_app(parser)
    add_env(parser)
    args = parser.parse_args()

    logging.getLogger(__package__.split('.')[0]).setLevel(args.debug)

    # The call must not live inside an ``assert`` statement: under
    # ``python -O`` asserts are stripped and nothing would be destroyed.
    destroyed = destroy_dns(**vars(args))
    if not destroyed:
        raise AssertionError('destroy_dns did not report success')
def main():
    """Command to create IAM Instance Profiles, Roles, Users, and Groups.

    IAM Roles will retain any attached Managed Policies. Inline Policies that do
    not match the name *iam-project_repo_policy* will also be left untouched.

    **WARNING**: Inline Policies named *iam-project_repo_policy* will be
    rewritten.

    Raises:
        AssertionError: when ``create_iam_resources`` returns a falsy value.
    """
    logging.basicConfig(format=LOGGING_FORMAT)

    parser = argparse.ArgumentParser(description=main.__doc__)
    add_debug(parser)
    add_app(parser)
    add_env(parser)
    args = parser.parse_args()

    logging.getLogger(__package__.split('.')[0]).setLevel(args.debug)

    # The call must not live inside an ``assert`` statement: under
    # ``python -O`` asserts are stripped and nothing would be created.
    created = create_iam_resources(**vars(args))
    if not created:
        raise AssertionError('create_iam_resources did not report success')
def main():
    """CLI entrypoint for scaling policy creation

    Parses the command line, sets the log level of the top-level package from
    ``--debug`` and creates the policy through ``AutoScalingPolicy``.
    """
    logging.basicConfig(format=LOGGING_FORMAT)
    log = logging.getLogger(__name__)

    parser = argparse.ArgumentParser()
    for register in (add_debug, add_app, add_properties, add_env, add_region):
        register(parser)
    args = parser.parse_args()

    package_root = __package__.split('.')[0]
    logging.getLogger(package_root).setLevel(args.debug)

    log.debug('Parsed arguments: %s', args)

    asgpolicy = AutoScalingPolicy(
        app=args.app,
        prop_path=args.properties,
        env=args.env,
        region=args.region)
    asgpolicy.create_policy()
def main():
    """Entry point for ELB creation

    Parses the command line, sets the log level of the top-level package from
    ``--debug`` and creates the ELB through ``SpinnakerELB``.
    """
    logging.basicConfig(format=LOGGING_FORMAT)

    parser = argparse.ArgumentParser(description='Example with non-optional arguments')
    for register in (add_debug, add_app, add_env, add_region, add_properties):
        register(parser)
    args = parser.parse_args()

    package_root = __package__.split('.')[0]
    logging.getLogger(package_root).setLevel(args.debug)

    elb = SpinnakerELB(
        app=args.app,
        env=args.env,
        region=args.region,
        prop_path=args.properties)
    elb.create_elb()
def main():
    """Send Slack notification to a configured channel.

    A message is only posted when the environment name contains ``prod``;
    otherwise the skip is logged.
    """
    logging.basicConfig(format=LOGGING_FORMAT)
    log = logging.getLogger(__name__)

    parser = argparse.ArgumentParser()
    for register in (add_debug, add_app, add_env, add_properties):
        register(parser)
    args = parser.parse_args()

    package_root = __package__.split(".")[0]
    logging.getLogger(package_root).setLevel(args.debug)

    log.debug('Parsed arguements: %s', args)

    if "prod" in args.env:
        log.info("Sending slack message, production environment")
        slacknotify = SlackNotification(app=args.app,
                                        env=args.env,
                                        prop_path=args.properties)
        slacknotify.post_message()
    else:
        log.info('No slack message sent, not a production environment')
def __init__(self, app='', trigger_job='', prop_path='', base='', runway_dir=''):
    """
    Collect the settings needed for this pipeline object.

    :param app: application name passed to ``get_details``
    :param trigger_job: stored as ``self.trigger_job``
    :param prop_path: path handed to ``get_properties``
    :param base: stored as ``self.base``
    :param runway_dir: directory path; ``~`` and environment variables are
                       expanded, and a falsy value becomes ``''``
    """
    self.log = logging.getLogger(__name__)

    self.header = {'content-type': 'application/json'}
    self.here = os.path.dirname(os.path.realpath(__file__))

    # Expand "~" first, then environment variables.
    user_expanded = os.path.expanduser(runway_dir or '')
    self.runway_dir = os.path.expandvars(user_expanded)

    self.base = base
    self.trigger_job = trigger_job

    self.generated = get_details(app=app)
    self.app_name = self.generated.app_name()
    self.group_name = self.generated.project

    self.settings = get_properties(prop_path)
    self.environments = self.settings['pipeline']['env']
def main():
    """Create Lambda events.

    Creates the Lambda function first and then its events, both built from
    the same ``app``/``env``/``region``/``properties`` arguments.
    """
    logging.basicConfig(format=LOGGING_FORMAT)
    log = logging.getLogger(__name__)

    parser = argparse.ArgumentParser(description=main.__doc__)
    for register in (add_debug, add_app, add_env, add_properties, add_region):
        register(parser)
    args = parser.parse_args()

    package_root = __package__.split('.')[0]
    logging.getLogger(package_root).setLevel(args.debug)

    log.debug('Parsed arguments: %s', args)

    lambda_function = LambdaFunction(
        app=args.app,
        env=args.env,
        region=args.region,
        prop_path=args.properties)
    lambda_function.create_lambda_function()

    lambda_event = LambdaEvent(
        app=args.app,
        env=args.env,
        region=args.region,
        prop_path=args.properties)
    lambda_event.create_lambda_events()
def main():
    """Create any API Gateway event related resources.

    Every parsed command line argument is passed to ``APIGateway`` as a
    keyword argument.
    """
    logging.basicConfig(format=LOGGING_FORMAT)

    parser = argparse.ArgumentParser(description=main.__doc__)
    for register in (add_debug, add_app, add_env, add_region, add_properties):
        register(parser)
    args = parser.parse_args()

    package_root = __package__.split('.')[0]
    logging.getLogger(package_root).setLevel(args.debug)

    gateway_kwargs = vars(args)
    apigateway = APIGateway(**gateway_kwargs)
    apigateway.setup_lambda_api()
def __init__(self, app='', env='', region='', rules=None, prop_path=''):
    """
    Set up the API Gateway and Lambda clients for one application.

    :param app: application name passed to ``get_details``
    :param env: environment name; also used as the boto3 profile name
    :param region: AWS region name for the boto3 session
    :param rules: trigger settings mapping; a fresh empty dict is used when
                  omitted, so instances never share one default object
    :param prop_path: properties file handed to ``get_properties``
    """
    self.log = logging.getLogger(__name__)
    self.generated = get_details(app=app, env=env)
    self.trigger_settings = {} if rules is None else rules
    self.app_name = self.generated.app_name()
    self.env = env
    self.account_id = get_env_credential(env=self.env)['accountId']
    self.region = region
    self.properties = get_properties(properties_file=prop_path, env=self.env)

    session = boto3.Session(profile_name=env, region_name=region)
    self.client = session.client('apigateway')
    self.lambda_client = session.client('lambda')
    self.api_version = self.lambda_client.meta.service_model.api_version

    self.api_id = self.find_api_id()
    self.resource_id, self.parent_id = self.find_resource_ids()
def __init__(self):
    """Bind the configured database object and the ``redberry`` logger."""
    configured_db = cms.config['db']
    self.db = configured_db
    self.logger = logging.getLogger('redberry')
def __init__(self, methodName):
    """
    Initialise the test case.

    :param methodName: forwarded to the parent test case initialiser
    """
    super(RedTestCase, self).__init__(methodName)

    # Handles shared by the tests: client, database, logger and URL prefix.
    self.test_client = app.test_client()
    self.db = db
    self.logger = logging.getLogger('redberry.tests')
    self.url_prefix = url_prefix
def init_logger():
    """
    Give the ``redberry`` logger a DEBUG level and a console handler.

    Each record is formatted with timestamp, level, message and the source
    path and line number.
    """
    redberry_logger = logging.getLogger('redberry')
    redberry_logger.setLevel(logging.DEBUG)

    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(logging.Formatter(
        '%(asctime)s %(levelname)s: %(message)s [in %(pathname)s:%(lineno)d]'))
    redberry_logger.addHandler(stream_handler)
def __init__(self, *args):
    """
    Connect to the cluster and make sure the schema exists.

    Creates the ``public`` and ``internal`` keyspaces and the
    ``public.users`` and ``public.contracts`` tables when they are missing.

    :param args: the first positional argument is handed to ``Cluster``
    """
    self.log = logging.getLogger(resource_filename(__name__, __file__))
    self.cluster = Cluster(args[0])
    self.connection = self.cluster.connect()
    self.connection.row_factory = tuple_factory

    # Executed in order: keyspaces first, then the tables inside ``public``.
    schema_statements = (
        "CREATE KEYSPACE IF NOT EXISTS public \
            WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };",
        "CREATE KEYSPACE IF NOT EXISTS internal \
            WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };",
        "CREATE TABLE IF NOT EXISTS public.users ( \
            name text PRIMARY KEY, \
            n text, \
            e text, \
            secret text);",
        "CREATE TABLE IF NOT EXISTS public.contracts ( \
            id uuid PRIMARY KEY, \
            owner text, \
            package text, \
            template blob, \
            example blob);",
    )
    for statement in schema_statements:
        self.connection.execute(statement)