我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 os.path.expanduser()。
def handle_template(self, template, subdir):
    """
    Determine where the app or project templates are.

    Use django.__path__[0] as the default because we don't know into
    which directory Django has been installed.

    :param template: template name/path/URL given by the user, or None
    :param subdir: template subdirectory under django/conf to use by default
    :raises CommandError: if the template cannot be located or extracted
    """
    if template is None:
        return path.join(django.__path__[0], 'conf', subdir)
    else:
        if template.startswith('file://'):
            # strip the URL scheme; the rest is a plain filesystem path
            template = template[7:]
        expanded_template = path.expanduser(template)
        expanded_template = path.normpath(expanded_template)
        if path.isdir(expanded_template):
            return expanded_template
        if self.is_url(template):
            # downloads the file and returns the path
            absolute_path = self.download(template)
        else:
            absolute_path = path.abspath(expanded_template)
        if path.exists(absolute_path):
            return self.extract(absolute_path)

    raise CommandError("couldn't handle %s template %s." %
                       (self.app_or_project, template))
def tidy_path(path):
    """take a filename and or directory and attempts to tidy it up by removing
    trailing slashes and correcting any formatting issues.

    For example: ////absolute//path// becomes: /absolute/path
    """
    # Windows
    path = TIDY_WIN_PATH_RE.sub('\\1', path.strip())
    # Linux
    path = TIDY_NUX_PATH_RE.sub('\\1', path.strip())
    # Linux Based Trim
    path = TIDY_NUX_TRIM_RE.sub('\\1', path.strip())
    # Windows Based Trim (also expands a leading '~' to the user's home)
    path = expanduser(TIDY_WIN_TRIM_RE.sub('\\1', path.strip()))
    return path
def connect(proxy_prefix, verbose=False, nowait=False, **kwargs):
    """Connect to a FastScore proxy, optionally waiting for it to become
    healthy, and persist the session to ~/.fastscore.

    :param proxy_prefix: base URL of the FastScore proxy
    :param verbose: print progress dots while waiting and a success message
    :param nowait: skip the health-check polling loop
    """
    connect = Connect(proxy_prefix)
    if not nowait:
        if verbose:
            sys.stdout.write("Waiting...")
            sys.stdout.flush()
        while True:
            try:
                connect.check_health()
                if verbose:
                    print()
                break
            except Exception:
                # proxy not ready yet; keep polling every half second
                if verbose:
                    sys.stdout.write('.')
                    sys.stdout.flush()
                sleep(0.5)
    savefile = expanduser('~/.fastscore')
    connect.dump(savefile)
    if verbose:
        print("Connected to FastScore proxy at %s" % proxy_prefix)
def rinexobs(obsfn, writeh5=None, maxtimes=None):
    """Read a RINEX observation file (*.o) or a previously saved HDF5 file.

    :param obsfn: path to the RINEX OBS or .h5 file ('~' is expanded)
    :param writeh5: if truthy, also save the parsed data to <stem>.h5
    :param maxtimes: unused here; kept for interface compatibility
    :return: parsed observation data
    """
    stem, ext = splitext(expanduser(obsfn))
    if ext[-1].lower() == 'o':  # raw text file
        with open(obsfn, 'r') as f:
            t = time.time()
            lines = f.read().splitlines(True)
            lines.append('')
            header, version, headlines, obstimes, sats, svset = scan(lines)
            print('{} is a RINEX {} file, {} kB.'.format(
                obsfn, version, getsize(obsfn) / 1000.0))
            data = processBlocks(lines, header, obstimes, svset, headlines, sats)
            print("finished in {0:.2f} seconds".format(time.time() - t))
        # %% save to disk (optional)
        if writeh5:
            h5fn = stem + '.h5'
            print('saving OBS data to {}'.format(h5fn))
            data.to_hdf(h5fn, key='OBS', mode='a', complevel=6, append=False)
    elif ext.lower() == '.h5':
        data = read_hdf(obsfn, key='OBS')
        # NOTE(review): `blocks` is not defined in this scope — this print would
        # raise NameError; presumably it was meant to report the loaded time span.
        print('loaded OBS data from {} to {}'.format(blocks.items[0], blocks.items[-1]))
    return data
def __init__(self, rt):
    """Download/update the Precise wake-word model and start the engine
    as a subprocess whose stdout is monitored on a daemon thread."""
    super().__init__(rt)
    self.update_freq = 24  # in hours
    self.url_base = 'https://raw.githubusercontent.com/MycroftAI/precise-data/'
    self.exe_name = 'precise-stream'

    # Derive the model file name from the configured wake word
    ww = self.listener_config['wake_word']
    model_name = ww.replace(' ', '-') + '.pb'
    model_folder = expanduser('~/.mycroft/precise')
    if not isdir(model_folder):
        mkdir(model_folder)
    model_path = join(model_folder, model_name)

    exe_file = self.find_download_exe()
    log.info('Found precise executable: ' + exe_file)
    self.update_model(model_name, model_path)

    args = [exe_file, model_path, '1024']
    self.proc = Popen(args, stdin=PIPE, stdout=PIPE)
    self.has_found = False
    self.cooldown = 20
    t = Thread(target=self.check_stdout)
    t.daemon = True
    t.start()
def removeall_clicked(self, button, store):
    """
    Same as the past function but remove all lines of the treeview.

    Also backs up ~/.config/mama/mama.xml to .mama.bak before clearing.
    """
    # if there is still an entry in the model
    old = expanduser('~') + '/.config/mama/mama.xml'
    new = expanduser('~') + '/.config/mama/.mama.bak'
    if os.path.exists(old):
        os.rename(old, new)
    if len(store) != 0:
        # remove all the entries in the model
        self.labelState.set_text('Remove all commands')
        for i in range(len(store)):
            # always remove row 0: the store shrinks as we go
            iter = store.get_iter(0)
            store.remove(iter)
        self.saveTree(store)
        print("Empty list")
def declare_engines(cell, mode='new', **kwargs):
    """Parse `name=connection` lines from *cell* and persist them to the
    SQLCell engines.py startup file.

    :param cell: newline-separated `name=connection-string` declarations
    :param mode: 'new' starts a fresh engine dict; anything else extends
        the existing __ENGINES_JSON__
    :return: empty string (cell magic convention)
    """
    home = expanduser("~")
    filepath = home + '/.ipython/profile_default/startup/SQLCell/engines/engines.py'
    engines_json = {} if mode == 'new' else __ENGINES_JSON__
    for n, i in enumerate(cell.split('\n')):
        # split only on the first '=' so connection strings may contain '='
        eng = i.split('=', 1)
        name, conn = str(eng[0]), str(eng[1])
        engines_json[name] = {
            'engine': conn,
            'caution_level': 'warning',
            'order': n,
        }
    with open(filepath, 'w') as f:
        f.write(
            'import os\nimport json\n\n\n__ENGINES_JSON__ = {0}\n\n'
            '__ENGINES_JSON_DUMPS__ = json.dumps(__ENGINES_JSON__)'.format(engines_json)
        )
    __SQLCell_GLOBAL_VARS__.__ENGINES_JSON_DUMPS__ = json.dumps(engines_json)
    print('new engines created')
    return ''
def set_logfile(self, path=expanduser('~'), interval=5, bucket=None, prefix=''):
    """Attach a rotating file handler (or S3 batch handler) to the
    telemetry logger.

    :param path: log file path; NOTE(review): the default is evaluated once
        at definition time, which is fine for expanduser but worth knowing
    :param interval: rotation interval in minutes
    :param bucket: if given, log to S3 via log.S3Batch instead of local disk
    :param prefix: S3 key prefix for batched uploads
    """
    self.data_logger = logging.getLogger('telemetry_file_logger')
    self.data_logger.setLevel(logging.INFO)
    if not bucket:
        handler = TimedRotatingFileHandler(
            path, when='S', interval=interval * 60, backupCount=0)
    else:
        handler = log.S3Batch(
            path, bucket, prefix, when='S', interval=interval * 60)
    # empty formatter: raw telemetry lines are written as-is
    handler.setFormatter('')
    self.data_logger.addHandler(handler)
def write_cert(content, type_, unique_name):
    """
    Write a certificate to the filesystem

    args:
        content (str): content of certificate file
        type_ (str): type of certificate file
        unique_name (str): description of file

    returns:
        str: full path to certificate file
    """
    home = expanduser("~")
    path = home + "/.cert/nm-openvpn/" + unique_name + "_" + type_ + ".pem"
    logger.info("writing {} file to {}".format(type_, path))
    if not os.path.exists(os.path.dirname(path)):
        os.makedirs(os.path.dirname(path))
    with open(path, "w") as f:
        f.write(content)
    # private key material: owner read/write only
    os.chmod(path, 0o600)
    return path
def _cygexpath(drive, path):
    """Expand *path* (user/env vars) and convert it to a cygwin-style path,
    honouring an optional Windows *drive* letter."""
    if osp.isabs(path) and not drive:
        ## Invoked from `cygpath()` directly with `D:Apps\123`?
        # It's an error; leave it alone (only the slashes are normalized below).
        p = path
    else:
        p = path and osp.normpath(osp.expandvars(osp.expanduser(path)))
        if osp.isabs(p):
            if drive:
                # Confusing, maybe a remote system should expand vars.
                p = path
            else:
                p = cygpath(p)
        elif drive:
            p = '/cygdrive/%s/%s' % (drive.lower(), p)
    return p.replace('\\', '/')
def read_config():
    """
    If the file ~/.macshrew/MacShrew.conf does not exist, just copy the
    default values and load the config.

    :return:
    """
    global IKEC_PATH, IKED_PATH, SELECTED_PROFILE, DEBUG_ENABLED
    config = ConfigParser.ConfigParser()
    if not os.path.isdir(os.path.expanduser("~/.macshrew")):
        os.makedirs(os.path.expanduser("~/.macshrew"))
    if not os.path.isfile(os.path.expanduser("~/.macshrew") + "/MacShrew.conf"):
        # seed the user config from the bundled defaults
        copyfile("resources/conf/MacShrew.conf",
                 os.path.expanduser("~/.macshrew/MacShrew.conf"))
    config.readfp(open(os.path.expanduser("~/.macshrew/MacShrew.conf")))
    SELECTED_PROFILE = config.get("UI", "Profile")
    DEBUG_ENABLED = config.getboolean("UI", "VerboseLogging")
    # only override the compiled-in paths when the config provides non-empty ones
    if len(config.get("IKE", "ikedpath", IKED_PATH)) > 0:
        IKED_PATH = config.get("IKE", "ikedpath", IKED_PATH)
    if len(config.get("IKE", "ikecpath", IKEC_PATH)) > 0:
        IKEC_PATH = config.get("IKE", "ikecpath", IKEC_PATH)
def main():
    """Update Python requirements files across all configured repos.

    Requires REPOS and OAUTHTOKEN in the environment; returns 1 when missing.
    """
    # Check for environment variables
    if "REPOS" not in env:
        print("No repos. Export REPOS")
        return 1
    if "OAUTHTOKEN" not in env:
        print("No Oauth token. Export OAUTHTOKEN")
        return 1
    # Get into the right directory or set it up if it doesn't exist.
    root_path = path.expanduser(path.join('~', 'strongjobs-data'))
    if not path.exists(root_path):
        os.makedirs(root_path)
    for repo_path in env["REPOS"].split():
        repo = GitRepo(root_path, repo_path)
        # Make sure everything is nice and up to date
        repo.update()
        package_updates = get_package_updates(repo, REQUIREMENTS_FILES, piprot)
        update_packages(repo, env["OAUTHTOKEN"], REQUIREMENTS_FILES,
                        package_updates, update_requirements)
def main():
    """Update npm package files across all configured repos.

    Requires REPOS and OAUTHTOKEN in the environment; returns 1 when missing.
    """
    # Check for environment variables
    if "REPOS" not in env:
        print("No repos. Export REPOS")
        return 1
    if "OAUTHTOKEN" not in env:
        print("No Oauth token. Export OAUTHTOKEN")
        return 1
    # Get into the right directory or set it up if it doesn't exist.
    root_path = path.expanduser(path.join('~', 'strongjobs-data'))
    if not path.exists(root_path):
        os.makedirs(root_path)
    for repo_path in env["REPOS"].split():
        repo = GitRepo(root_path, repo_path)
        # Make sure everything is nice and up to date
        repo.update()
        package_updates = get_package_updates(repo, PACKAGE_FILES, npm_outdated)
        update_packages(repo, env["OAUTHTOKEN"], PACKAGE_FILES,
                        package_updates, update_package_json)
def main():
    """Download the VOC and/or SBD datasets to --data-dir (default
    ~/data/datasets/VOC), selected via --voc / --sbd flags."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '-d', '--data-dir', default=None,
        help='Dataset Destination: default $HOME/data/datasets/VOC')
    parser.add_argument('--voc', action='store_true')
    parser.add_argument('--sbd', action='store_true')
    args = parser.parse_args()

    data_dir = args.data_dir
    if data_dir is None:
        data_dir = osp.expanduser('~/data/datasets/VOC')

    download_funcs = []
    if args.voc:
        download_funcs.append(get_voc)
    if args.sbd:
        download_funcs.append(get_sbd)

    for get_dataset in download_funcs:
        print('Downloading datasets')
        get_dataset(data_dir)
def catalyst_root(environ=None):
    """
    Get the root directory for all catalyst-managed files.

    For testing purposes, this accepts a dictionary to interpret as the os
    environment.

    Parameters
    ----------
    environ : dict, optional
        A dict to interpret as the os environment.

    Returns
    -------
    root : string
        Path to the catalyst root dir.
    """
    if environ is None:
        environ = os.environ

    # ZIPLINE_ROOT overrides the default ~/.catalyst location
    root = environ.get('ZIPLINE_ROOT', None)
    if root is None:
        root = os.path.join(expanduser('~'), '.catalyst')

    return root
def login_as_bot():
    """
    Login as the bot account "octogrid", if user isn't authenticated on Plotly.

    Signs in with the bot credentials when the local Plotly credentials file
    is missing or contains empty username/api_key fields.
    """
    plotly_credentials_file = join(
        join(expanduser('~'), PLOTLY_DIRECTORY), PLOTLY_CREDENTIALS_FILENAME)
    if isfile(plotly_credentials_file):
        with open(plotly_credentials_file, 'r') as f:
            credentials = loads(f.read())
            if (credentials['username'] == '' or credentials['api_key'] == ''):
                # credentials file exists but is unpopulated
                plotly.sign_in(BOT_USERNAME, BOT_API_KEY)
    else:
        plotly.sign_in(BOT_USERNAME, BOT_API_KEY)
def get_pgpass(pgpass=None):
    """
    Get postgres' password using pgpass file.

    http://www.postgresql.org/docs/9.2/static/libpq-pgpass.html
    http://wiki.postgresql.org/wiki/Pgpass

    :param pgpass: path to the pgpass file; defaults to ~/.pgpass
    :return: list of (host, port, database, user, password) tuples
    :raises IOError: if the pgpass file cannot be opened
    """
    if pgpass is None:
        from os.path import expanduser
        home = expanduser("~")
        pgpass = "{0}/.pgpass".format(str(home))
    ret = []
    with open(pgpass, 'r') as filep:
        content = filep.readlines()
    for line in content:
        # host:port:database:user:password (password may contain ':')
        res = re.match(r"^([^:]+):([^:]+):([^:]+):([^:]+):(.*)$", line)
        if res is not None:
            ret.append(res.group(1, 2, 3, 4, 5))
    return ret
def _get_profile(self, profile="default"):
    """Read Azure credentials for *profile* from ~/.azure/credentials.

    :param profile: section name in the credentials ini file
    :return: dict of credentials if a subscription_id is present, else None
    """
    path = expanduser("~/.azure/credentials")
    try:
        config = ConfigParser.ConfigParser()
        config.read(path)
    except Exception as exc:
        self.fail("Failed to access {0}. Check that the file exists and you have read "
                  "access. {1}".format(path, str(exc)))
    credentials = dict()
    for key in AZURE_CREDENTIAL_ENV_MAPPING:
        try:
            credentials[key] = config.get(profile, key, raw=True)
        except Exception:
            # option missing from this profile: best-effort, skip it
            pass
    if credentials.get('subscription_id'):
        return credentials
    return None
def __init__(self, argsv):
    """Build the `pkcs cert` argument parser with default certificate
    output path (~/pkcs/cert) and default x509 subject fields."""
    self.certout = os.path.join(os.path.join(expanduser("~"), 'pkcs'), 'cert')
    self.argsv = argsv
    self.parser = argparse.ArgumentParser(prog='pkcs cert', description='pkcs tool set')
    self.parser.add_argument('-ca-key', '--ca-key', dest='cakey',
                             help="ca key for sign cert")
    self.parser.add_argument('-ca-cert', '--ca-cert', dest='cacert',
                             help="ca cert for sign cert")
    self.parser.add_argument('-ca-conf', '--ca-conf', dest='caconf',
                             help="ca openssl.conf file path")
    self.parser.add_argument('-cert-out', '--cert-out', dest='certout',
                             default=self.certout, help="cert output path")
    self.parser.add_argument('-len', '--key-len', dest='len', default=4096,
                             help="ca key length")
    self.parser.add_argument('-c', '--country', dest='c', default='CN',
                             help="country name in subject")
    self.parser.add_argument('-st', '--state', dest='st', default='GuangDong',
                             help="state or province name in subject")
    self.parser.add_argument('-l', '--locality', dest='l', default='ShenZhen',
                             help="locality name in subject")
    self.parser.add_argument('-o', '--organization', dest='o', default='YunWeiPai',
                             help="organization name in subject")
    self.parser.add_argument('-ou', '--organization-unit', dest='ou', default='YunWeiPai',
                             help="organization unit name in subject")
    self.parser.add_argument('-cn', '--common-name', dest='cn', default='YunWeiPai',
                             help="common name in subject")
    self.parser.add_argument('-subj-alt-name', '--subject-alt-name', dest='subjectAltName',
                             default='*.yunweipai.com',
                             help="subject alternative name in x509 extension")
    self.args = None
def invoke_ansible(arg_vars, project_root, playbook, extras=None):
    """Run an Ansible playbook using AWS credentials from ~/.engraver.

    :param arg_vars: CLI argument dict (cluster_id, optional extra ansible args)
    :param project_root: path to the engraver project
    :param playbook: playbook file name under <project_root>/ansible
    :param extras: extra env-var style parameters passed through form_env_vars
    """
    if extras is None:
        # avoid the shared mutable-default-argument pitfall
        extras = {}
    config = ConfigParser.ConfigParser()
    engraver_profile = expanduser("~") + "/.engraver"
    config.read(engraver_profile)
    aws_access_key = config.get('aws', 'aws_access_key', 0)
    aws_secret_key = config.get('aws', 'aws_secret_key', 0)
    aws_key_name = config.get('aws', 'aws_key_name', 0)
    pem_file_path = config.get('aws', 'pem_file_name', 0)
    remote_user = config.get('aws', 'remote_user', 0)
    chdir(project_root + "/ansible")
    pre = ["ansible-playbook", "--private-key", pem_file_path,
           "-i", ",",
           "-e", ("remote_user=" + remote_user),
           "-e", ("onyx_cluster_id=" + arg_vars['cluster_id']),
           "-e", ("aws_key_name=" + aws_key_name),
           "-e", ("aws_access_key=" + aws_access_key),
           "-e", ("aws_secret_key=" + aws_secret_key),
           "-e", ("engraver_root=" + project_root)]
    raw = shlex.split(arg_vars.get('ansible') or "")
    post = [project_root + "/ansible/" + playbook]
    call(pre + raw + form_env_vars(extras) + post)
def stream_logs(arg_vars, project_root):
    """SSH to the target host and stream Docker logs for a service's
    container, looked up from the service's Ansible role defaults."""
    config = ConfigParser.ConfigParser()
    engraver_profile = expanduser("~") + "/.engraver"
    config.read(engraver_profile)
    pem_file_path = config.get('aws', 'pem_file_name', 0)
    remote_user = config.get('aws', 'remote_user', 0)

    service = arg_vars['service']
    container_name = service + "_container_name"
    f = util.service_path(project_root, service)
    if util.verify_cluster_exists(arg_vars, project_root):
        if exists(f):
            with open(f + "/defaults/main.yml", "r") as stream:
                # NOTE(review): yaml.load without an explicit Loader; the role
                # file is local/trusted, but yaml.safe_load would be safer.
                content = yaml.load(stream)
                if content.get(container_name):
                    container = content[container_name]
                    call(["ssh", "-t", "-i", pem_file_path,
                          remote_user + "@" + arg_vars['host'],
                          "docker logs -f " + container])
                else:
                    base = ("Service does not define {0}_container_name in "
                            "defaults/main.yml of its Ansible role. Cannot stream logs.")
                    print_fail(base.format(service))
        else:
            print_fail("Service not found.")
def main():
    """CLI entry point: dispatch the `tune` sub-command for BIC clustering."""
    arguments = docopt(__doc__, version='BIC clustering')

    db_yml = expanduser(arguments['--database'])
    protocol_name = arguments['<database.task.protocol>']
    subset = arguments['--subset']

    if arguments['tune']:
        experiment_dir = arguments['<experiment_dir>']
        if subset is None:
            subset = 'development'
        application = BICClustering(experiment_dir, db_yml=db_yml)
        application.tune(protocol_name, subset=subset)

    # if arguments['apply']:
    #     tune_dir = arguments['<tune_dir>']
    #     if subset is None:
    #         subset = 'test'
    #     application = BICClustering.from_tune_dir(
    #         tune_dir, db_yml=db_yml)
    #     application.apply(protocol_name, subset=subset)
def show_dialog(self, importing=False):
    """
    Displays FileChooserDialog with ePub file filters and returns
    Gtk.ResponseType and filename string.

    :return (response, filename):
    """
    dialog = Gtk.FileChooserDialog(_("Please choose a file"), self,
                                   Gtk.FileChooserAction.OPEN,
                                   (Gtk.STOCK_CANCEL, Gtk.ResponseType.CANCEL,
                                    Gtk.STOCK_OPEN, Gtk.ResponseType.OK))
    # Always start in user dir
    dialog.set_current_folder(path.expanduser("~"))

    # Add filters so only .epub files show
    # TODO: Filter list for all conversion supported ebooks
    self.__add_filters(dialog, importing)

    response = dialog.run()
    filename = dialog.get_filename()
    dialog.destroy()

    return response, filename
def main():
    """Regenerate .app launchers for Android AVDs and Genymotion VMs
    under ~/Applications/Quick Emulators."""
    avds = get_emulators(avd_path, get_avds_in_dir)
    genymotion_vms = get_emulators(genymotion_path, get_genymotion_vms_in_dir)

    if verbose:
        print('creating .app files for %d Genymotion Virtual Machines' % len(genymotion_vms))
        print('creating .app files for %d Android Virtual Devices' % len(avds))

    # wipe previously generated launchers; missing dir is fine
    rmtree("%s/Applications/Quick Emulators" % expanduser('~'), ignore_errors=True)

    emulator_path = get_emulator_path()
    if emulator_path:
        for avd in avds:
            create_app(avd, get_avd_launch_command(emulator_path, avd))
    for vm in genymotion_vms:
        create_app(vm, get_genymotion_launch_command(vm), True)
def before_exit():
    """Persist the interactive session's code on exit: locally when
    configured, otherwise to a GitHub gist (with a local-file fallback)."""
    lines_of_code = process_history()
    if not PySession.save or len(lines_of_code) == 0:
        stdout.write(DO_NOTHING)
        return

    filename = expanduser(os.getenv('PYSESSION_FILENAME', 'session.py'))

    if PySession.save_locally:
        stdout.write(SAVING_FILE.format(filename=filename))
        PySession.save_to_file('\n'.join(lines_of_code), filename)
        stdout.write(SUCCESS)
        return

    try:
        stdout.write(SAVING_GIST.format(filename=filename))
        gist_response = PySession.save_to_gist('\n'.join(lines_of_code), filename)
        gist_url = gist_response['html_url']
        PySession.save_gist_url(gist_url)
        webbrowser.open_new_tab(gist_url)
        stdout.write(SUCCESS)
    except Exception:
        # gist upload failed (network, auth, ...): fall back to a local file
        stdout.write(FAILED)
        PySession.save_to_file('\n'.join(lines_of_code), filename)
def __init__(self, *args, **kwargs):
    """Initialize custom class variables."""
    super(pycuda, self).__init__(*args, **kwargs)

    self.sitecfg = None
    self.sitecfgfn = 'site.cfg'
    self.sitecfglibdir = None
    self.sitecfgincdir = None
    self.testinstall = False
    self.testcmd = None
    self.unpack_options = ''

    self.pylibdir = UNKNOWN

    # make sure there's no site.cfg in $HOME, because setup.py will find it and use it
    if os.path.exists(os.path.join(expanduser('~'), 'site.cfg')):
        raise EasyBuildError("Found site.cfg in your home directory (%s), please remove it.",
                             expanduser('~'))

    if 'modulename' not in self.options:
        self.options['modulename'] = self.name.lower()
def mim_tcpdump(): while True: t = datetime.datetime.now() print 'Select one of the following interface to listen:\n' for i in get_interfaces(): print i tcp_face = raw_input('\033[1;32mYour Selection >> \033[1;m') if tcp_face == 'back': break else: cmd = 'tcpdump -ni {0} -w {1}/{2}.pcap >/dev/null 2>&1 &'\ .format(tcp_face, expanduser('~'), t.strftime("%s")) print '\033[1;31mRunning the following command: \033[1;m' print cmd print '-i (interface)' print "-n (don't convert addresses to names)" print '-w (write pcap file)' print 'sniffing...\n' print 'This will create a pcap file in your home dir.' os.system(cmd)
def __init__(self, config):
    """
    Initializer

    :param config: configuration object
    """
    self.config = config
    self.platform = platform.system().lower()
    self.ROOT = os.path.abspath(os.sep)
    if WINDOWS in self.platform:
        self.ROOT = "\\"
    # prefer the configured music folder, fall back to the user's home
    if self.config[AUDIO][MUSIC_FOLDER]:
        self.USER_HOME = self.config[AUDIO][MUSIC_FOLDER]
    else:
        self.USER_HOME = expanduser("~")
    self.current_folder = self.config[FILE_PLAYBACK][CURRENT_FOLDER] or self.USER_HOME
    self.cre = compile(r'(\d+)')  # compiled regular expression
def __init__(self, model, max_size, extra_dirs, output_dir=None, uniq=None,
             addons=None, addons_file=None, exclude=None, compression='xz'):
    """Set up a build working directory (a temp dir under the user's home)
    and record build options; chdirs into the temp dir as a side effect."""
    if model:
        set_model(model)
    self.max_size = max_size
    self.extra_dirs = extra_dirs
    self.cwd = os.getcwd()
    self.tempdir = tempfile.mkdtemp(dir=expanduser('~'))
    os.chdir(self.tempdir)
    self.uniq = uniq or uuid.uuid4()
    self.output_dir = output_dir or '.'
    self.addons = addons
    self.addons_file = addons_file
    if exclude is None:
        exclude = tuple()
    self.exclude = exclude
    self.compression = compression
def _load_config():
    """
    Load the configuration file, if it exists

    :returns: dict
    """
    config_dict = {_USER_KEY: None, _PASS_KEY: None, _URL_KEY: None}
    conf_file_path = path.expanduser(path.join('~', CONFIG_FILE_NAME))
    if path.exists(conf_file_path):
        config_parser = ConfigParser()
        config_parser.read(conf_file_path)
        t_conf = config_parser['TRAW']
        config_dict[_USER_KEY] = t_conf.get('username', None)
        # prefer the API key; fall back to a legacy password entry
        config_dict[_PASS_KEY] = (t_conf.get('user_api_key', None)
                                  or t_conf.get('password', None))
        config_dict[_URL_KEY] = t_conf.get('url', None)
    return config_dict
def _template_to_command(self, template, identifier=None):
    """Render the device type's fencing-command template with this device's
    connection details and split it into an argv list."""
    import re
    from os.path import expanduser
    cmd_str = getattr(self.device_type, "%s_template" % template) % {
        'agent': self.device_type.agent,
        'address': self.address,
        'port': self.port,
        'username': self.username,
        'password': self.password,
        'identifier': identifier,
        'options': self.options,
        'home': expanduser("~"),
        # fence agents renamed 'list' to 'list-status' from EL 7.0
        'list_parameter': ('list-status'
                           if platform_info.distro_version >= 7.0 else 'list'),
    }
    return re.split(r'\s+', cmd_str)
def tmp_user_dir(request):
    """Pytest fixture: point settings.USER_DIR at a temp dir (~/.gobble.tmp)
    and restore/clean up when the test finishes."""
    original = settings.USER_DIR
    settings.USER_DIR = join(expanduser('~'), '.gobble.tmp')
    try:
        makedirs(settings.USER_DIR)
    except IOError:
        pass

    def switch_back():
        # Remember the temp dir BEFORE restoring the setting: the previous
        # version restored first and then rmtree'd settings.USER_DIR, which
        # deleted the ORIGINAL user dir instead of the temporary one.
        tmp_dir = settings.USER_DIR
        settings.USER_DIR = original
        try:
            rmtree(tmp_dir)
        except IOError:
            pass

    request.addfinalizer(switch_back)
def apply_subscription_update(subscription_id, src_manifest_path, shards):
    """Copy new shard files and a staged manifest into the user's
    subscription folder for ekn's downloader to pick up."""
    user_subscriptions_folder = path.expanduser(
        '~/.local/share/com.endlessm.subscriptions/%s/' % (subscription_id, ))
    mkdir_p(user_subscriptions_folder)

    # now look at this manifest, that i just found
    # (loading also validates that the file is well-formed JSON)
    with open(src_manifest_path, 'r') as f:
        manifest_obj = json.load(f)

    # Place the new shards into the zone...
    for src_shard_entry in shards:
        src_shard_path = src_shard_entry['cache_path']
        dst_shard_filename = src_shard_entry['manifest_path']
        dst_shard_path = path.join(user_subscriptions_folder, dst_shard_filename)
        if path.exists(dst_shard_path):
            # Skip existing shards...
            continue
        copyfile(src_shard_path, dst_shard_path)

    # Place the new manifest into the zone...
    new_manifest_path = path.join(user_subscriptions_folder, 'manifest.json.new')
    copyfile(src_manifest_path, new_manifest_path)
    # Let ekn's downloader apply updates itself.
def create_streaming_context(spark_context, config):
    """
    Create a streaming context with a custom Streaming Listener
    that will log every event.

    :param spark_context: Spark context
    :type spark_context: pyspark.SparkContext
    :param config: dict
    :return: Returns a new streaming context from the given context.
    :rtype: pyspark.streaming.StreamingContext
    """
    ssc = streaming.StreamingContext(
        spark_context,
        config["spark_config"]["streaming"]["batch_interval"])
    ssc.addStreamingListener(DriverStreamingListener)
    directory = os_path.expanduser("~/checkpointing")
    logger.info("Checkpointing to `{}`".format(directory))
    # Commented out to fix a crash occurring when
    # phase 1 is used. The reason of the crash is still unclear
    # but Spark complains about the SSC being transferred
    # to workers.
    # ssc.checkpoint(directory)
    return ssc
def save(self, cfg_file=None):
    """
    Save's configuration back to disk.

    :param cfg_file: alternate destination path; defaults to the current one
    :return: True on success, False if the file could not be written
    """
    if cfg_file is None:
        cfg_file = self.cfg_file
    else:
        # acquire new configuration path
        cfg_file = abspath(expanduser(cfg_file))

    try:
        with open(cfg_file, 'w') as fp:
            yaml.dump(self.cfg_data, fp, default_flow_style=False)
    except IOError as e:
        logger.debug('%s' % (str(e)))
        logger.error('Failed to write configuration file %s' % (
            cfg_file,
        ))
        return False

    # Update central configuration
    self.cfg_file = cfg_file

    if hasattr(self, '__mask_re'):
        # Presumably something has changed if the user called save so we
        # destroy our cached mask to be safe.
        # NOTE(review): inside a class, `self.__mask_re` is name-mangled to
        # `_ClassName__mask_re`, so this hasattr check for the literal
        # '__mask_re' may never be true — confirm against the full class.
        del self.__mask_re

    return True
def get_creds():
    """Load JSON credentials from ~/.kauth.

    :return: parsed credentials dict, or None (implicitly) if the file
        does not exist
    """
    # TODO: check permissions on file and fail if not set to 600
    homeDir = expanduser("~")
    credsFile = ".kauth"
    credsFile = homeDir + "/" + credsFile
    if os.path.isfile(credsFile):
        with open(credsFile) as f:
            content = f.read()
            creds = json.loads(content)
            return creds
    else:
        pass
def load_config():
    """Return (name, content) of the first config file that yields content,
    checking ./config.json then ~/.gimel/config.json."""
    config_filenames = (realpath('config.json'),
                        expanduser('~/.gimel/config.json'))
    for config_filename in config_filenames:
        name, content = _load_config(config_filename)
        if content:
            break
    return name, content
def generate_config(config_filename=None):
    """Write a fresh config template to *config_filename* (default
    ~/.gimel/config.json) and return the path written."""
    if config_filename is None:
        config_filename = expanduser('~/.gimel/config.json')
    _create_file(config_filename)
    with open(config_filename, 'w') as config_file:
        config_file.write(_config_template())
    return config_filename
def find_config():
    """Look for **foremast.cfg** in config_locations.

    Raises:
        SystemExit: No configuration file found.

    Returns:
        ConfigParser: found configuration file

    NOTE(review): the docstring advertises SystemExit, but the code below
    only warns and returns defaults when nothing is found — confirm intent.
    """
    config_locations = [
        '/etc/foremast/foremast.cfg',
        expanduser('~/.foremast/foremast.cfg'),
        './.foremast/foremast.cfg',
    ]
    configurations = ConfigParser()
    cfg_file = configurations.read(config_locations)
    dynamic_config_file = '{path}/config.py'.format(path=getcwd())

    if cfg_file:
        LOG.info('Loading static configuration file.')
    elif exists(dynamic_config_file):
        LOG.info('Loading dynamic configuration file.')
        load_dynamic_config(configurations)
    else:
        config_locations.append(dynamic_config_file)
        LOG.warning('No configuration found in the following locations:\n%s',
                    '\n'.join(config_locations))
        LOG.warning('Using defaults...')

    return configurations
def __expand(self, path):
    """Expand the ${MECHANIC_ROOT_DIR} placeholder, a leading '~', and any
    environment variables in *path*."""
    path = path.replace("${MECHANIC_ROOT_DIR}", self.mechanicRootDir)
    path = expanduser(path)
    path = expandvars(path)
    return path
def __init__(self):
    """
    Initialize with default values
    """
    self.game_config = {
        "window": {
            "width": 800,
            "height": 600,
            "ticks_per_second": 60,
            "resizeable": True,
            "exclusive_mouse": True,
        },
        "controls": {
            "forward": "W",
            "backward": "S",
            "right": "D",
            "left": "A",
            "jump": "SPACE",
            "down": "LSHIFT",
            "fly": "TAB",
        },
        "world": {
            "gravity": 20.0,
            "player_height": 2,
            "max_jump_height": 2.0,
            "terminal_velocity": 50,
            "walking_speed": 5,
            "flying_speed": 15,
        },
    }
    # Prepare access to the configuration file
    home_directory = expanduser("~")
    self.configuration_file_path = home_directory + "/.pycraftconfig.json"
def readIPFSConfig():
    """Load the IPFS config JSON from $IPFS_PATH/config (or ~/.ipfs/config).

    Exits the process with an error message when the file is missing.
    """
    ipfsConfigPath = expanduser("~") + '/.ipfs/config'
    if 'IPFS_PATH' in os.environ:
        ipfsConfigPath = os.environ.get('IPFS_PATH') + '/config'
    try:
        with open(ipfsConfigPath, 'r') as f:
            return json.load(f)
    except IOError:
        logError("IPFS config not found.")
        logError("Have you installed ipfs and run ipfs init?")
        sys.exit()
def writeIPFSConfig(jsonToWrite):
    """Pretty-print *jsonToWrite* to the IPFS config file at
    $IPFS_PATH/config (or ~/.ipfs/config)."""
    ipfsConfigPath = expanduser("~") + '/.ipfs/config'
    if 'IPFS_PATH' in os.environ:
        ipfsConfigPath = os.environ.get('IPFS_PATH') + '/config'
    with open(ipfsConfigPath, 'w') as f:
        f.write(json.dumps(jsonToWrite, indent=4, sort_keys=True))