我们从 Python 开源项目中，提取了以下 31 个代码示例，用于说明如何使用 click.get_app_dir()。
def __init__(self, config_file=None, default_profile='default'):
    """Initialise the in-memory configuration and resolve the config path.

    :param config_file: path of the configuration file. When falsy, the
        file ``config.yml`` inside ``click.get_app_dir('datafs')`` is used.
    :param default_profile: value stored under the ``'default-profile'``
        key of ``self.config``.
    """
    self.default_profile = default_profile
    self.config = {
        'default-profile': self.default_profile,
        'requirements': None,
        'profiles': {},
    }

    if config_file:
        # A user-supplied file is taken as-is; creating its directory is
        # the caller's responsibility.
        self.config_file = config_file
        return

    self.config_file = os.path.join(
        click.get_app_dir('datafs'), 'config.yml')

    # Create the default directory if it does not exist yet.
    # NOTE(review): the directory creation is placed in the default-path
    # branch because the original comment says user-supplied paths are not
    # created here -- confirm against the unflattened source.
    default_dir = os.path.dirname(self.config_file)
    if not os.path.isdir(default_dir):
        os.makedirs(default_dir)
def load(self):
    """Read configuration from every candidate file that can be parsed.

    The candidates are ``~/.hipcat.ini`` and ``config.ini`` in the click
    application directory for ``APP_NAME`` (native and ``force_posix``
    variants). The resulting parser is stored on ``self.config``.

    :returns: ``self``
    :raises ConfigurationError: when none of the candidate files was read.
    """
    home_ini = os.path.expanduser('~/.hipcat.ini')
    native_ini = os.path.join(click.get_app_dir(APP_NAME), 'config.ini')
    posix_ini = os.path.join(
        click.get_app_dir(APP_NAME, force_posix=True), 'config.ini')
    config_paths = [home_ini, native_ini, posix_ini]

    parser = configparser.ConfigParser()
    if not parser.read(config_paths):
        raise ConfigurationError(
            """ Missing configuration! You must provide configuration in one of the following places: """
            + "\n".join(' * {}'.format(path) for path in config_paths)
            + """ """
        )

    self.config = parser
    return self
def main():
    """Entry point: prepare the config directory, then run the CLI.

    Sets the module-level ``cfgfile`` to ``config.ini`` inside the click
    application directory for ``"ecxcli"`` and creates that directory when
    it is missing. Any exception raised by ``cli()`` is logged with its
    traceback, summarised in red on the terminal and passed on to
    ``process_http_error``.
    """
    global cfgfile

    init_logging()

    cfgfile = os.path.join(click.get_app_dir("ecxcli"), 'config.ini')
    config_dir = os.path.dirname(cfgfile)
    if not os.path.exists(config_dir):
        os.makedirs(config_dir)

    try:
        cli()
    except Exception as e:
        logging.error(traceback.format_exc())
        exc_type, exc_value = sys.exc_info()[:2]
        summary = traceback.format_exception_only(exc_type, exc_value)[0]
        click.secho(summary, fg='red')
        process_http_error(e)
def setup_assistant():
    """Load OAuth2 credentials and build the Assistant gRPC client stub.

    The loaded credentials are stored in the module-level ``creds`` and the
    client stub in the module-level ``assistant``.

    :returns: ``0`` on success, ``-1`` when loading the credentials raised.
    """
    global creds
    global assistant

    # Load credentials.
    try:
        app_dir = click.get_app_dir(common_settings.ASSISTANT_APP_NAME)
        credentials = os.path.join(
            app_dir, common_settings.ASSISTANT_CREDENTIALS_FILENAME)
        oauth_scopes = [
            common_settings.ASSISTANT_OAUTH_SCOPE,
            common_settings.PUBSUB_OAUTH_SCOPE,
        ]
        creds = auth_helpers.load_credentials(credentials, scopes=oauth_scopes)
    except Exception as e:
        logging.error('Error loading credentials: %s', e)
        logging.error('Run auth_helpers to initialize new OAuth2 credentials.')
        return -1

    # Create gRPC channel
    channel = auth_helpers.create_grpc_channel(ASSISTANT_API_ENDPOINT, creds)
    logging.info('Connecting to %s', ASSISTANT_API_ENDPOINT)

    # Create Google Assistant API gRPC client.
    assistant = embedded_assistant_pb2.EmbeddedAssistantStub(channel)
    return 0
def __init__(self, app_name, mode='w+', cfg_name=None, box=None, keyring=True, service_name=None, **kwargs):
    """Configure file access and keyring handling for an application.

    :param app_name: passed to ``get_filename`` and used as the fallback
        for the keyring service name.
    :param mode: access mode string; ``'r'``, ``'w'`` and ``'+'`` are the
        characters inspected here. A falsy value is treated as ``''``.
    :param cfg_name: config file name; falls back to ``Config.cfg_name``.
    :param box: stored unchanged on ``self.box``.
    :param keyring: ``True`` to use the default keyring, a falsy value to
        skip keyring setup, or any other value, which is handed to
        ``KeyringAttrDict.set_keyring``.
    :param service_name: overrides ``app_name`` in the keyring service name.
    :param kwargs: extra keyword arguments, grouped per target function by
        ``group_kwargs_by_funct``.
    """
    args = (kwargs, Config.funct_args, Config.bad_kwds, Config.safe_kwds)
    self.kwargs = group_kwargs_by_funct(*args)
    self.box = box
    mode = mode or ''
    frozen = kwargs.get('frozen_box')
    # Precedence: 'r' in mode  OR  ('+' in mode AND not frozen).
    self.readable = 'r' in mode or '+' in mode and not frozen
    self.writeable = 'w' in mode or '+' in mode
    # A filename is only resolved when the file will actually be accessed.
    if self.readable or self.writeable:
        cfg_name = cfg_name or Config.cfg_name
        app_dir_kwargs = self.kwargs['get_app_dir']
        self.filename = get_filename(app_name, cfg_name, **app_dir_kwargs)
    self.keyring = keyring
    if keyring:
        # These are class-level attributes on KeyringAttrDict, so they are
        # shared by everything that uses that class.
        service = (service_name or app_name) + '_' + get_user()
        KeyringAttrDict.service = service
        KeyringAttrDict.keyring = KeyringAttrDict.keyring or keyring_module
    # Anything truthy other than the literal True is treated as a keyring
    # object to install.
    if keyring and keyring is not True:
        KeyringAttrDict.set_keyring(keyring)
def write_config(self, fp=None):
    """Dump ``self.config`` as YAML into *fp*.

    :param fp: destination handed to ``open_filelike``. When ``None``, the
        file ``config.yml`` inside ``click.get_app_dir('datafs')`` is used.
    """
    target = fp
    if target is None:
        target = os.path.join(click.get_app_dir('datafs'), 'config.yml')

    with open_filelike(target, 'w+') as handle:
        handle.write(yaml.dump(self.config))
def main_group(ctx, name, daemon, mode, master, slave, etcd_host, quiet, verbose, config): """This is the command line interface to ScarlettOS. """ # NOTE: ctx # Most public functions are actually methods of a 'context' object which # is passed as the first parameter (ctx). The context object stores the # precision, cached data, and a few other things. It also defines # conversions so that the same high-level code can be used for several # different base types (mpf, mpfs in Sage, intervals, Python floats) by # switching contexts. # # The default context is called 'mp'. You can call most functions as # mpmath.mp.foo(). The top-level function mpmath.foo() is just an alias # for this. ctx.obj = {} config = config or os.path.join(click.get_app_dir('scarlett_os'), 'scarlett_os.ini') cfg = read_config(config) if cfg: ctx.obj['config_file'] = config ctx.obj['cfg'] = cfg ctx.default_map = cfg verbosity = (os.environ.get('SCARLETTOS_VERBOSE') or ctx.lookup_default('scarlett_os.verbosity') or 0) if verbose or quiet: verbosity = verbose - quiet verbosity = int(verbosity) configure_logging(verbosity) ctx.obj['verbosity'] = verbosity
def read_config():
    r"""Read the storj client credentials from ``storj.ini``.

    The file is looked up in ``click.get_app_dir(APP_NAME, force_posix=True)``:

        Mac OS X (POSIX): ~/.foo-bar
        Unix (POSIX): ~/.foo-bar
        Win XP (not roaming):
            ``C:\Documents and Settings\<user>\Application Data\storj``
        Win 7 (not roaming):
            ``C:\Users\<user>\AppData\Local\storj``

    Returns:
        (tuple[str, str]): storj account credentials (email, password).
    """
    # OSX: /Users/<username>/.storj
    config_dir = click.get_app_dir(APP_NAME, force_posix=True)
    cfg = os.path.join(config_dir, 'storj.ini')

    parser = RawConfigParser()
    parser.read([cfg])

    email = parser.get(APP_NAME, CFG_EMAIL)
    password = parser.get(APP_NAME, CFG_PASSWORD)
    return email, password
def __init__(self, api_key=None, library_id=None, library_type='user', autosync=False):
    """
    Service class for communicating with the Zotero API.

    This is mainly a thin wrapper around :py:class:`pyzotero.zotero.Zotero`
    that handles things like transparent HTML<->[edit-format] conversion.

    :param api_key:         API key for the Zotero API, will be loaded from
                            the configuration if not specified
    :param library_id:      Zotero library ID the API key is valid for, will
                            be loaded from the configuration if not specified
    :param library_type:    Type of the library, can be 'user' or 'group'
    :param autosync:        When true, synchronize during construction if at
                            least ``zotcli.sync_interval`` seconds (default
                            300) have passed since the index was modified
    :raises ValueError:     When no API key or library ID is available
    """
    self._logger = logging.getLogger()
    idx_path = os.path.join(click.get_app_dir(APP_NAME), 'index.sqlite')

    self.config = load_config()
    self.note_format = self.config['zotcli.note_format']
    self.storage_dir = self.config.get('zotcli.storage_dir')

    # Explicit arguments win over configured values.
    api_key = api_key or self.config.get('zotcli.api_key')
    library_id = library_id or self.config.get('zotcli.library_id')
    if not (api_key and library_id):
        raise ValueError(
            "Please set your API key and library ID by running "
            "`zotcli configure` or pass them as command-line options.")

    self._zot = Zotero(library_id=library_id, api_key=api_key,
                       library_type=library_type)
    self._index = SearchIndex(idx_path)

    sync_interval = self.config.get('zotcli.sync_interval', 300)
    since_last_sync = int(time.time()) - self._index.last_modified
    sync_due = autosync and since_last_sync >= int(sync_interval)
    if sync_due:
        click.echo("{} seconds since last sync, synchronizing."
                   .format(since_last_sync))
        num_updated = self.synchronize()
        click.echo("Updated {} items".format(num_updated))
def _get_config_path():
    """Return the path of ``config.ini`` in the click app dir for APP_NAME."""
    app_dir = click.get_app_dir(APP_NAME)
    return os.path.join(app_dir, 'config.ini')
def get_app_dir():
    """Return the non-roaming click application directory for 'q2cli'."""
    # click is imported inside the function rather than at module level.
    import click

    app_dir = click.get_app_dir('q2cli', roaming=False)
    return app_dir


# NOTE: `get_cache_dir` and `get_completion_path` are defined here rather than
# in `q2cli.cache` because importing `q2cli.cache` can be slow.
# `get_completion_path` (which depends on `get_cache_dir`) is imported and run
# by the Bash completion function every time the user presses <tab>, so it has
# to be quick to import.
def get_cache_dir():
    """Return the ``cache`` directory path inside the application directory."""
    import os.path

    cache_dir = os.path.join(get_app_dir(), 'cache')
    return cache_dir
def get_app_dir(*args):
    """Join *args* onto the click application directory for 'cget'."""
    base_dir = click.get_app_dir('cget')
    return os.path.join(base_dir, *args)
def get_cache_path(*args):
    """Return a path below the ``cache`` entry of the application directory."""
    segments = ('cache',) + args
    return get_app_dir(*segments)
def read_config(config_path):
    """Return the default configuration updated with ``config.yml`` values.

    :param config_path: directory containing ``config.yml``; when falsy the
        click application directory for ``APP_NAME`` is used, otherwise the
        given path is made absolute.
    :returns: a copy (``copy.copy``) of ``DEFAULT_CONFIG``, passed through
        ``update`` together with the parsed file content when the file
        exists.
    """
    if config_path:
        config_path = os.path.abspath(config_path)
    else:
        config_path = click.get_app_dir(APP_NAME)

    config_file = os.path.join(config_path, 'config.yml')
    config_dict = copy.copy(DEFAULT_CONFIG)

    # Without a config file the defaults are returned untouched.
    if not os.path.exists(config_file):
        return config_dict

    with io.open(config_file, encoding='utf-8') as file_handle:
        raw_text = file_handle.read()
        yaml_dict = poyo.parse_string(raw_text)
        update(config_dict, yaml_dict)
    return config_dict
def get_template_path(template_name, config_path):
    """Return the directory of the named template after validating it.

    :param template_name: name of the sub-directory below ``templates``.
    :param config_path: configuration directory; when falsy the click
        application directory for ``APP_NAME`` is used, otherwise the given
        path is made absolute.
    :returns: path of ``<config_path>/templates/<template_name>``.
    :raises ValueError: when the template directory does not exist or does
        not contain a ``Dockerfile``.
    """
    if config_path:
        config_path = os.path.abspath(config_path)
    else:
        config_path = click.get_app_dir(APP_NAME)

    template_path = os.path.join(config_path, 'templates', template_name)
    dockerfile = os.path.join(template_path, "Dockerfile")

    if not os.path.isdir(template_path):
        message = "{0} does not exist. Please create the template directory"
        raise ValueError(message.format(template_path))

    if not os.path.isfile(dockerfile):
        message = "{0}: Dockerfile was not found in {1}."
        raise ValueError(message.format(template_name, template_path))

    return template_path
def setup_logging():
    r"""Configure logging for the Storj GUI from ``logging.conf``.

    The file is looked up in ``click.get_app_dir(APP_NAME, force_posix=True)``.
    When it is not an existing regular file, or loading it raises
    ``RuntimeError``, the default logging configuration is loaded instead.

        Mac OS X (POSIX): ~/.storj-gui
        Unix (POSIX): ~/.storj-gui
        Win XP (not roaming):
            ``C:\Documents and Settings\<user>\Application Data\storj-gui``
        Win 7 (not roaming):
            ``C:\Users\<user>\AppData\Local\storj-gui``
    """
    app_dir = click.get_app_dir(APP_NAME, force_posix=True)
    logging_conf = os.path.join(app_dir, 'logging.conf')

    # A path that is not a regular file (missing, or e.g. a directory)
    # falls back to the default configuration.
    if not os.path.isfile(logging_conf):
        load_default_logging()
        # The logger is looked up only after the configuration call.
        logging.getLogger(__name__).warning(
            '%s logging configuration file does not exist', logging_conf)
        return

    try:
        config.fileConfig(logging_conf, disable_existing_loggers=False)
        logging.getLogger(__name__).info(
            '%s configuration file was loaded.', logging_conf)
    except RuntimeError:
        load_default_logging()
        logging.getLogger(__name__).warning(
            'failed to load configuration from %s', logging_conf)
        return

    logging.getLogger(__name__).info(
        'using logging configuration from %s', logging_conf)
def completion_dir() -> str:
    """Return the completion directory (the click app dir for "temci")."""
    directory = click.get_app_dir("temci")
    return directory
def load_from_config_dir(self):
    """
    Load ``config.yaml`` from the click application directory for "temci"
    (e.g. in the user's home folder) when that file exists.
    """
    conf = os.path.join(click.get_app_dir("temci"), "config.yaml")
    # isfile() is false for missing paths, so no separate existence check.
    if not os.path.isfile(conf):
        return
    self.load_file(conf)
def __init__(self, loop):
    """
    Collect host/user information and prepare the configuration directory.

    :param asyncio.AbstractEventLoop | None loop: stored on ``self.loop``.
    """
    # Information about host and user.
    uname_info = os.uname()
    self.host_name = uname_info[1]
    self.user_name = get_login_username()
    passwd_entry = getpwnam(self.user_name)
    self.user_uid = passwd_entry.pw_uid
    self.user_home = os.path.expanduser('~' + self.user_name)

    # Configuration lives in the click application directory.
    self.config_dir = click.get_app_dir('onedrived')
    self._create_config_dir_if_missing()
    self.config = self.DEFAULT_CONFIG

    self.loop = loop
    self._watcher = None
def setUp(self):
    """Redirect the click app dir to a temp directory and load a context.

    ``click.get_app_dir`` is replaced with a function returning a path
    inside a fresh temporary directory, then ``od_main.context`` is
    reloaded and its config directory created. Both the patch and the
    temporary directory are registered for cleanup so that they do not
    leak into other tests.
    """
    self.tempdir = tempfile.TemporaryDirectory()
    # Remove the directory after the test; calling cleanup() again later
    # (e.g. from a tearDown) does no harm.
    self.addCleanup(self.tempdir.cleanup)

    # Restore the real click.get_app_dir once the test is over. Cleanups
    # run in reverse order, so this happens before the directory is removed.
    original_get_app_dir = click.get_app_dir
    self.addCleanup(setattr, click, 'get_app_dir', original_get_app_dir)
    click.get_app_dir = lambda x: self.tempdir.name + '/' + x

    od_main.context = od_main.load_context()
    od_main.context._create_config_dir_if_missing()
def setUp(self):
    """Redirect the click app dir to a temp directory and load a context.

    ``click.get_app_dir`` is replaced with a function returning a path
    inside a fresh temporary directory, then ``od_pref.context`` is
    reloaded and its config directory created. Both the patch and the
    temporary directory are registered for cleanup so that they do not
    leak into other tests.
    """
    self.tempdir = tempfile.TemporaryDirectory()
    # Remove the directory after the test; calling cleanup() again later
    # (e.g. from a tearDown) does no harm.
    self.addCleanup(self.tempdir.cleanup)

    # Restore the real click.get_app_dir once the test is over. Cleanups
    # run in reverse order, so this happens before the directory is removed.
    original_get_app_dir = click.get_app_dir
    self.addCleanup(setattr, click, 'get_app_dir', original_get_app_dir)
    click.get_app_dir = lambda x: self.tempdir.name + '/' + x

    od_pref.context = od_pref.load_context()
    od_pref.context._create_config_dir_if_missing()
def initialize(self):
    """Set up the Assistant client, the audio stream and the state listener.

    Loads OAuth2 credentials from the click application directory, opens a
    gRPC channel to the Assistant endpoint, creates the audio
    source/sink and conversation stream, and registers ``self.startGH`` as
    a state listener. Returns early (after logging two errors) when the
    credentials cannot be loaded.
    """
    # Constants mirrored onto the instance.
    self.ASSISTANT_API_ENDPOINT = 'embeddedassistant.googleapis.com'
    self.END_OF_UTTERANCE = embedded_assistant_pb2.ConverseResponse.END_OF_UTTERANCE
    self.DIALOG_FOLLOW_ON = embedded_assistant_pb2.ConverseResult.DIALOG_FOLLOW_ON
    self.CLOSE_MICROPHONE = embedded_assistant_pb2.ConverseResult.CLOSE_MICROPHONE
    api_endpoint=self.ASSISTANT_API_ENDPOINT
    credentials=os.path.join(click.get_app_dir(common_settings.ASSISTANT_APP_NAME), common_settings.ASSISTANT_CREDENTIALS_FILENAME)
    # NOTE(review): `verbose` is assigned but not read anywhere in this method.
    verbose=False
    # Audio and gRPC settings taken from the shared defaults.
    self.audio_sample_rate=common_settings.DEFAULT_AUDIO_SAMPLE_RATE
    self.audio_sample_width=common_settings.DEFAULT_AUDIO_SAMPLE_WIDTH
    self.audio_iter_size=common_settings.DEFAULT_AUDIO_ITER_SIZE
    self.audio_block_size=common_settings.DEFAULT_AUDIO_DEVICE_BLOCK_SIZE
    self.audio_flush_size=common_settings.DEFAULT_AUDIO_DEVICE_FLUSH_SIZE
    self.grpc_deadline=common_settings.DEFAULT_GRPC_DEADLINE
    # Load credentials.
    try:
        creds = auth_helpers.load_credentials(credentials, scopes=[common_settings.ASSISTANT_OAUTH_SCOPE])
    except Exception as e:
        self.error('Error loading credentials: %s', e)
        self.error('Run auth_helpers to initialize new OAuth2 credentials.')
        return
    # Create gRPC channel
    grpc_channel = auth_helpers.create_grpc_channel(api_endpoint, creds, ssl_credentials_file="", grpc_channel_options="")
    self.log('Connecting to google')
    # Create Google Assistant API gRPC client.
    self.assistant = embedded_assistant_pb2.EmbeddedAssistantStub(grpc_channel)
    # Configure audio source and sink.
    self.audio_device = None
    # audio_device is None here, so the first statement creates a stream and
    # stores it as both audio_source and audio_device. The second statement
    # then reuses that object for audio_sink (presumably the stream object
    # is truthy; otherwise a second stream would be created).
    self.audio_source = self.audio_device = (self.audio_device or audio_helpers.SoundDeviceStream(sample_rate=self.audio_sample_rate, sample_width=self.audio_sample_width, block_size=self.audio_block_size, flush_size=self.audio_flush_size))
    self.audio_sink = self.audio_device = (self.audio_device or audio_helpers.SoundDeviceStream(sample_rate=self.audio_sample_rate, sample_width=self.audio_sample_width, block_size=self.audio_block_size, flush_size=self.audio_flush_size))
    # Create conversation stream with the given audio source and sink.
    self.conversation_stream = audio_helpers.ConversationStream(source=self.audio_source, sink=self.audio_sink, iter_size=self.audio_iter_size)
    self.conversation_state_bytes = None
    self.volume_percentage = 70
    # Register startGH for the entity named by the "activation_boolean"
    # argument, filtered with new="on".
    self.listen_state(self.startGH,self.args["activation_boolean"],new="on")
    self.log("App started. now listening to Homeassistant input")
def cli(ctx, status_file):
    """ Runs the tutorial. """
    # The status file lives in the click application directory.
    status_path = os.path.join(
        click.get_app_dir('Click Tutorial'), status_file)
    ctx.obj = {'status_filename': status_path}

    lessons = get_lessons(ctx.obj['status_filename'])
    if not lessons:
        # Nothing to run without lessons.
        ctx.abort()
    else:
        ctx.obj['lessons'] = lessons
def get_dir_path(cls):
    """Return the click app dir for ``cls.APP_NAME`` (non-roaming, native)."""
    return click.get_app_dir(
        cls.APP_NAME, roaming=False, force_posix=False)
def __init__(self, auction_contract_addr, chain, state_file_path: str=click.get_app_dir('event_sampler')):
    """Attach to a DutchAuction contract and start following its events.

    :param auction_contract_addr: address of the auction contract.
    :param chain: chain object; its ``provider.get_contract_factory`` is
        used to obtain the ``'DutchAuction'`` contract factory.
    :param state_file_path: path handed to ``EventSamplerState``. The
        default is evaluated once, when the function is defined.
    """
    self.contract_addr = auction_contract_addr
    self.chain = chain

    auction_factory = self.chain.provider.get_contract_factory('DutchAuction')
    self.auction_contract = auction_factory(address=auction_contract_addr)
    self.auction_contract_addr = auction_contract_addr
    self.state = EventSamplerState(state_file_path)

    # Event name -> handler method.
    handlers = {
        'BidSubmission': self.on_bid_submission,
        'AuctionEnded': self.on_auction_end,
        'Deployed': self.on_deployed_event,
        'AuctionStarted': self.on_auction_start,
        'ClaimedTokens': self.on_claimed_tokens,
    }

    self.events = defaultdict(list)
    self.auction_start_block = None
    self.total_claimed = 0
    self.final_price = None
    self.auction_start_time = None
    self.auction_end_time = None
    self.price_start = None
    self.price_constant = None
    self.price_exponent = None

    # Sync each event type first, then start watching it.
    for event_name, handler in handlers.items():
        self.sync_events(event_name, handler)
        watch_logs(self.auction_contract, event_name, handler)

    # start state save event - after the events are synced
    self.save_event = StateSave(self.state)
    self.save_event.start()
def default_config_dir():
    """Return the default config directory (click app dir for APP_NAME)."""
    config_dir = click.get_app_dir(APP_NAME)
    return config_dir
def get_game(path=None):
    """Return the module-level game, loading or creating it on first use.

    :param path: config file location; when falsy, ``config.json`` in the
        click application directory for ``APP_NAME`` is used. Ignored when
        ``GAME`` is already truthy.
    :returns: the value of the module-level ``GAME``.
    """
    global GAME

    if not GAME:
        config_path = path or os.path.join(
            click.get_app_dir(APP_NAME), 'config.json')
        if os.path.exists(config_path):
            GAME = Game.load(config_path)
        else:
            GAME = Game.create(config_path)
    return GAME
def __write_settings_file(url, username, password, authMethod, json):
    """Write the connection settings to ``Configuration.file_path`` as YAML.

    :param url: project URL; everything after the last ``/`` is stored as
        the project name and everything before it as the base URL. A URL
        without ``/`` is stored unchanged in both fields.
    :param username: stored under ``'username'``.
    :param password: stored under ``'password'``.
    :param authMethod: stored under ``'authMethod'``.
    :param json: mapping whose ``'value'`` entry is an iterable of dicts
        with ``'name'`` and ``'id'`` keys; used to build the ``'repo_id'``
        lookup keyed by lower-cased name.
    """
    # Split on the last slash only. Replacing "/<project>" throughout the
    # URL would also strip earlier occurrences of the same text.
    url_parts = url.rsplit('/', 1)
    base_url = url_parts[0]
    project_name = url_parts[-1]

    data = {
        'url': base_url,
        'project': project_name,
        'authMethod': authMethod,
        'username': username,
        # NOTE(review): the password is written to the settings file as given.
        'password': password,
        'repo_id': {
            project['name'].lower(): project['id']
            for project in json['value']
        },
    }

    app_dir = click.get_app_dir("Codereview")
    if not os.path.exists(app_dir):
        os.makedirs(app_dir)

    # The context manager closes the file handle when the dump is done.
    with open(Configuration.file_path, 'w') as stream:
        yaml.dump(data, stream)
def test_kwarg_splitter():
    """Check that keyword arguments are grouped under the expected functions."""
    kwargs = {'frozen_box': True, 'indent': 2}
    expect = {
        'open': {},
        'box': {'frozen_box': True},
        'get_app_dir': {},
        'load': {},
        'dump': {'indent': 2},
    }
    assert group_kwargs_by_funct(kwargs, FUNCTS) == expect
def get_filename(app_name, cfg_name, **app_dir_kwargs):
    """Return the config file path for *app_name*, creating its directory.

    :param app_name: either a bare application name or a path. When it has
        no directory part, the click application directory for that name is
        used together with *cfg_name*.
    :param cfg_name: file name used when *app_name* has no directory part.
    :param app_dir_kwargs: forwarded to ``click.get_app_dir``.
    :returns: the joined directory and file name.
    """
    directory, filename = os.path.split(app_name)
    # NOTE(review): cfg_name is applied only in the bare-name case here;
    # confirm against the unflattened source.
    if not directory:
        directory = click.get_app_dir(app_name, **app_dir_kwargs)
        filename = cfg_name

    directory_missing = not os.path.exists(directory)
    if directory_missing:
        mkdirs(directory)
    return os.path.join(directory, filename)