我们从 Python 开源项目中提取了以下 33 个代码示例,用于说明如何使用 pathlib.Path.cwd()。
def details(exercise_folder):
    """Print the details, assignment text, file list and generated ReCodEx
    configuration for the exercise stored in ``exercise_folder``."""
    soup = load_content(exercise_folder)

    print("### Exercise details")
    pprint(load_details(soup))
    print()

    print("### Exercise assignment text")
    pprint(load_active_text(soup))
    print()

    config = Config.load(Path.cwd() / "import-config.yml")
    api = ApiClient(config.api_url, config.api_token)
    tests = load_codex_test_config(Path(exercise_folder) / "testdata" / "config")

    # Placeholder UUIDs stand in for real file ids in the dry-run output
    files = defaultdict(lambda: "random-file-uuid")

    print("### Exercise files")
    for name, path in load_exercise_files(exercise_folder):
        print(f"{path} as {name}")
        # BUG FIX: defaultdict.get() does NOT invoke the default factory, so
        # the original `files.get(name)` never inserted the key. Indexing
        # triggers __missing__ and stores the placeholder, which is what the
        # "make sure the keys are present" intent requires.
        files[name]

    print("### Exercise configuration")
    pprint(make_exercise_config(config, soup, files, api.get_pipelines(), tests))
    print()
def get_id(exercise_folder, config_path=None):
    """For every folder given, find the exercise with the same name on the
    API and print its id (or a not-found message)."""
    config = Config.load(Path.cwd() / (config_path or "import-config.yml"))
    api = ApiClient(config.api_url, config.api_token)
    exercises = api.get_exercises()

    for folder in exercise_folder:
        details = load_details(load_content(folder))
        # First exercise whose name matches this folder's exercise name
        match = next((e for e in exercises if e["name"] == details["name"]), None)
        if match is not None:
            print(folder, match["id"])
        else:
            print(folder, "Nothing found")
def what(dir: Optional[str]) -> Optional[RepoSpec]:
    """Return the repospec of the clone containing `dir` (or the cwd).

    Raises RuntimeError when the directory is not inside a got repository.
    """
    def fail():
        base = Path(dir) if dir is not None else Path.cwd()
        raise RuntimeError(f"Not a got repository: {base.resolve()}")

    path = findRoot(dir)
    if path is None:
        fail()

    # If any clone has this exact path already, return it
    clone = Clone.load(path=str(path))
    if clone is not None:
        return clone.repospec

    # If not, try resolving each path to find a match
    for clone in Clone.loadAll():
        if clone.path.resolve() == path:
            return clone.repospec

    # Shouldn't be able to get here
    fail()
def test_deps_cwd(self):
    """Running `--deps` from inside each repo prints that repo's transitive
    dependency paths; a repo with no deps also warns on stderr."""
    self.deps_helper()
    root = Path.cwd()
    cases = {
        'repo1': {'repo1', 'repo2', 'repo3', 'repo4'},
        'repo2': {'repo2', 'repo4'},
        'repo3': {'repo3'},
        'repo4': {'repo4'},
    }
    for name, deps in cases.items():
        with self.subTest(name=name), chdir(name), GotRun(['--deps']) as r:
            actual = set(r.stdout.strip().split(os.linesep))
            wanted = {str((root / dep).resolve()) for dep in deps}
            self.assertEqual(actual, wanted)
            # A repo whose only "dependency" is itself has no deps file
            if len(deps) == 1:
                r.assertInStderr(f"host:{name} has no dependencies file")
def test_load_config_local():
    """
    Test `load_config()` with local configuration file.
    """
    with TemporaryDirectory() as tmpdir:
        tmpdir = Path(tmpdir)
        # Change home directory for testing
        os.environ['MARKDOWNREVEAL_HOME'] = str(tmpdir)
        # Create local configuration file
        config_file = tmpdir / 'config.yaml'
        config_file.write_text('footer: "local footer"\n'
                               'header: "local header"\n'
                               'style: "https://other/style/file.tar.gz"')
        # Load configuration from inside the temp dir.
        # FIX: restore the previous working directory even if load_config()
        # raises — the original left the process chdir'ed into the (deleted)
        # temp dir on failure, breaking every subsequent test.
        old = Path.cwd()
        os.chdir(str(tmpdir))
        try:
            config = load_config()
        finally:
            os.chdir(str(old))
        assert config['local_path'] == tmpdir / '.markdownreveal'
        assert config['output_path'] == config['local_path'] / 'out'
        assert config['footer'] == 'local footer'
        assert config['header'] == 'local header'
        assert 'other/style' in config['style']
def media(filename):
    """Serve `filename` out of the application's MEDIA_ROOT directory."""
    root = Path.cwd() / Path(current_app.config.get('MEDIA_ROOT'))
    return send_from_directory(root, filename)
def configure(app):
    """Wire the active theme's template folders into the app's Jinja loader
    and expose the theme's static files under /theme/<filename>."""
    app.jinja_env.add_extension('jinja2.ext.do')
    app.jinja_env.add_extension('jinja2.ext.i18n')

    overload_enabled = app.theme_context.get('OVERLOAD_ENABLED', True)
    templates = Path('templates')
    theme_folder = Path(app.theme_context.get('FOLDER', 'themes'))
    active_name = app.theme_context.get('ACTIVE', 'default')
    active = Path(active_name)

    theme_template_folder = theme_folder / active / templates
    prefixed_template_folder = theme_folder / Path(f'pelican-{active_name}') / templates
    abs_theme_static_folder = Path.cwd() / theme_folder / active / Path('static')

    default_path = Path(app.jinja_loader.searchpath[0])
    overload_folder = default_path / f'overload_{active}' / templates

    # Overload folder (if enabled) wins, then theme, then pelican-prefixed theme
    folders = [theme_template_folder, prefixed_template_folder]
    if overload_enabled:
        folders.insert(0, overload_folder)

    app.jinja_loader = jinja2.ChoiceLoader([
        QuokkaTemplateLoader(folders),
        app.jinja_loader,
    ])

    @app.route('/theme/<path:filename>')
    def theme_static(filename):
        return send_from_directory(abs_theme_static_folder, filename)
def get_main_dir():
    """Directory containing the __main__ script, or the cwd when running
    interactively (no __file__ on __main__)."""
    main_module = sys.modules['__main__']
    if not hasattr(main_module, '__file__'):
        return Path.cwd()
    return Path(main_module.__file__).resolve().parent
def current():
    """Return the process's current working directory as a Path."""
    here = Path.cwd()
    return here
def add_localization(language, exercise_id, config_path):
    """Read HTML from stdin and append it to the exercise as a localized
    text for `language`, then push the update to the API."""
    config = Config.load(Path.cwd() / (config_path or "import-config.yml"))
    api = ApiClient(config.api_url, config.api_token)

    exercise = api.get_exercise(exercise_id)
    localized = {
        "locale": language,
        "text": html2text(sys.stdin.read()),
    }
    exercise["localizedTexts"].append(localized)
    api.update_exercise(exercise_id, exercise)
def set_score_config(exercise_id, config_path, exercise_folder):
    """Upload per-test point weights, read from the exercise folder's CodEx
    test config, as the exercise's score configuration."""
    config = Config.load(Path.cwd() / (config_path or "import-config.yml"))
    api = ApiClient(config.api_url, config.api_token)

    tests = load_codex_test_config(Path(exercise_folder) / "testdata" / "config")
    weights = {}
    for test in tests:
        weights[test.name] = int(test.points)

    payload = yaml.dump({"testWeights": weights}, default_flow_style=False)
    api.set_exercise_score_config(exercise_id, payload)
def evaluate_all_rs(config_path):
    """
    Request evaluation for all reference solutions
    """
    config = Config.load(Path.cwd() / (config_path or "import-config.yml"))
    api = ApiClient(config.api_url, config.api_token)
    with click.progressbar(api.get_exercises()) as bar:
        for exercise in bar:
            exercise_id = exercise["id"]
            try:
                api.evaluate_reference_solutions(exercise_id)
            except Exception as error:
                # Best-effort: log and keep going with the remaining exercises
                logging.error("Error in exercise {}: {}".format(exercise_id, str(error)))
def init(self, event, requires_permission=False, script_path='alexa_skill_kit/script.yml'):
    """Populate request/session/context fields from an Alexa `event` dict and
    load the skill script YAML from `script_path` (relative to the cwd).

    NOTE(review): `requires_permission` is accepted but never used here —
    presumably consumed by a caller; confirm before removing.
    """
    self.event = event
    request = event['request']
    session = event['session']
    context = event['context']
    perm = session['user']['permissions']

    self.device_id = context['System']['device']['deviceId']
    self.request_app_id = session['application']['applicationId']
    self.user_id = session['user']['userId']
    self.new_session = session['new']
    # dict.get() replaces the manual membership test; False is kept as the
    # "no consent token" sentinel for backward compatibility.
    self.token = perm.get('consentToken', False)
    self.request_id = request['requestId']
    self.timestamp = request['timestamp']
    self.request_type = request['type']

    if 'intent' in request:
        self.intent_name = request['intent']['name']
        self.slots = request['intent']['slots']
    else:
        self.intent_name = False
        self.slots = False

    with Path.cwd().joinpath(script_path).open() as f:
        # BUG FIX: yaml.load() without an explicit Loader is deprecated/unsafe
        # and raises TypeError under PyYAML >= 6. The script file needs no
        # custom tags, so safe_load is the correct call.
        self.script = yaml.safe_load(f)
    # print('script is ', self.script)
def get_transformation(location: str) -> Transformation:
    """Load and return a Transformation from `location`.

    `location` is a module path, optionally suffixed with ':<name>' to select
    a specific transformation. Without a suffix, the symbol 'main' is
    preferred, falling back to the first Transformation found in the module.
    """
    # TODO allow use of contributed transformations
    if ':' in location:
        module_path, transformation_name = location.split(':')
    else:
        module_path, transformation_name = location, None

    if not module_path.endswith('.py'):
        module_path += '.py'
    module_path = (Path.cwd() / module_path).resolve()
    dbg(f'Transformation module path: {module_path}')

    dbg('Loading module.')
    spec = importlib.util.spec_from_file_location('transformation_file', module_path)
    loaded = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(loaded)

    dbg('Inspecting module.')
    candidates = {name: obj for name, obj in vars(loaded).items()
                  if isinstance(obj, Transformation)}

    if transformation_name is None:
        if 'main' in candidates:
            dbg("Selected symbol 'main' from module as transformation.")
            return candidates['main']
        name, instance = next(iter(candidates.items()))
        dbg(f"Selected symbol '{name}' from module as transformation.")
        return instance

    # Prefer a transformation whose declared name matches the suffix...
    for instance in candidates.values():
        if instance.name == transformation_name:
            dbg(f"Selected transformation named '{transformation_name}' from module "
                f"as transformation.")
            return instance

    # ...otherwise fall back to the module symbol of that name.
    dbg(f"Selected symbol '{transformation_name}' from module as transformation.")
    return candidates[transformation_name]
def load_conf(*filenames, **kwargs):
    """Build a MergeDict configuration from `filenames`.

    Remaining keyword arguments override the http host/port defaults; an
    optional `search_dirs` keyword controls where files are looked up
    (defaults to the cwd).
    """
    search_dirs = kwargs.pop('search_dirs', None)
    loader = Config(search_dirs=search_dirs or [Path.cwd()])
    conf = MergeDict({
        'http.port': 0,
        'http.host': '0.0.0.0',
        **kwargs,
    })
    conf(loader.load(*filenames))
    return conf
def findRoot(dir: Optional[str]) -> Optional[Path]:
    """Walk upward from `dir` (or the cwd) and return the first ancestor
    (including the start) that is a known clone root, else None."""
    # This could theoretically be one query that ORs together a bunch of paths
    start = (Path(dir) if dir is not None else Path.cwd()).resolve()
    for candidate in (start, *start.parents):
        clone = Clone.tryLoad(path=str(candidate))
        if clone:
            return clone.path
    return None
def run(repos: Iterable[Iterable[RepoSpec]], cmd: List[str], bg: bool, ignore_errors: bool):
    """Run `cmd` in every repo's clone directory (foreground by default);
    exit with the number of processes that failed."""
    env = dict(os.environ)
    procs = []
    for repo_group in repos:  # renamed: original shadowed the builtin `set`
        for repo in repo_group:
            clone: Clone = where(repo, 'py', 'clone')
            print(str(clone.repospec), file=sys.stderr)
            env['GOT_REPOSPEC'] = str(clone.repospec)

            if platform.system() == 'Windows':
                # Passing a whole command as a single string inside a list won't
                # work on Windows, e.g. -x 'foo bar baz'. Turn it into a raw
                # string instead
                shell = True
                if len(cmd) == 1:
                    cmd = cmd[0]
            else:
                shell = (len(cmd) == 1)

            proc = subprocess.Popen(cmd, cwd=str(clone.path), shell=shell, env=env)
            procs.append(proc)
            if not bg:
                proc.wait()
                if not ignore_errors and proc.returncode != 0:
                    raise RuntimeError(f"Failed on {repo}: exit code {proc.returncode}")
                print()

    # Wait for every process to exit. Then exit with the number of processes that failed
    exit(sum(proc.wait() != 0 for proc in procs))
def makeEnvironment(self):
    """Copy os.environ with GOT_ROOT set to the test-case directory.

    The got root is the test case directory directly under runDir; the
    current directory may be that directory itself or somewhere deeper
    inside it.
    """
    env = os.environ.copy()
    cwd = Path.cwd()
    if cwd.parent == runDir:
        root = cwd
    else:
        matches = [p for p in cwd.parents if p.parent == runDir]
        if not matches:
            raise RuntimeError(f"Current directory {Path.cwd().resolve()} is not within a test case rundir")
        root = matches[0]  # parents are ordered nearest-first
    env['GOT_ROOT'] = str(root)
    return env
def test_find_root_cwd(self):
    """`--find-root` run from inside a repo prints that repo's root."""
    self.deps_helper()
    with chdir('repo1'), GotRun(['--find-root']) as r:
        reported = Path(r.stdout.strip())
        self.assertEqual(Path.cwd().resolve(), reported)
def test_find_root_bad_dir(self):
    """`--find-root` outside any repository fails with a clear message."""
    with GotRun(['--find-root']) as r:
        r.assertFails()
        message = f"{Path.cwd().resolve()} is not within a got repository"
        r.assertInStderr(message)
def chdir(path):
    """Context-manager generator: run the body with `path` as the working
    directory, restoring the previous one afterwards (even on error)."""
    previous = Path.cwd()
    os.chdir(path)
    try:
        yield
    finally:
        os.chdir(previous)
def tmp():
    """Fixture generator: chdir into ./tmp/int/cli (created if missing),
    yield that path, and chdir back when the test finishes."""
    original = Path.cwd()
    workdir = Path("tmp", "int", "cli").resolve()
    workdir.mkdir(parents=True, exist_ok=True)
    os.chdir(workdir)
    yield workdir
    os.chdir(original)
def describe_do_run():
    # pytest-describe style test group for do_run().

    @pytest.yield_fixture
    def config(tmpdir):
        # Build a Config in an isolated tmpdir containing one dotenv file and
        # one Python source file, plus an environment whose command reports
        # FOO=3 — three different sources for the same variable.
        cwd = os.getcwd()
        tmpdir.chdir()
        with Path(".env").open('w') as f:
            f.write("FOO=1")
        with Path("app.py").open('w') as f:
            f.write("os.getenv('FOO', 2)")
        yield Config.new(
            sourcefiles=[
                SourceFile(".env"),
                SourceFile("app.py"),
            ],
            environments=[
                Environment("test", command="echo FOO=3"),
            ],
        )
        # Teardown: undo the tmpdir.chdir() above so later tests run from
        # the original working directory.
        os.chdir(cwd)

    def it_returns_table_data(runner, config):
        print(config.sourcefiles)
        data = do_run(config)
        # Header row plus one row per variable, one column per source
        expect(list(data)) == [
            ['Variable', 'File: .env', 'File: app.py', 'Environment: test'],
            ['FOO', 'FOO=1', "os.getenv('FOO', 2)", '3'],
        ]
def do_report(rows):
    """Write `rows` as Markdown and CSV reports in the cwd, announcing each
    created file on stderr."""
    targets = (
        ("env-diff.md", utils.write_markdown, "Created Markdown report: "),
        ("env-diff.csv", utils.write_csv, "Created CSV report: "),
    )
    for filename, writer, label in targets:
        path = Path.cwd().joinpath(filename)
        writer(rows, path)
        click.echo(green(label) + white(f"{path}", bold=True), err=True)
def __init__(self, filename="env-diff.yml", root=None, sourcefiles=None, environments=None):
    """Hold report configuration.

    Falsy arguments fall back to defaults: the cwd for `root`, empty lists
    for `sourcefiles` and `environments`.
    """
    self.filename = filename
    self.root = root or Path.cwd()
    self.sourcefiles = sourcefiles or []
    self.environments = environments or []
def check_config(cfg):
    """Resolve root in config file, then validate paths."""
    all_section = cfg['all']
    root = verify(all_section['root']) if 'root' in all_section else Path.cwd()
    # Check paths for each subsection against the resolved root
    return {section: validate_paths(values, root)
            for section, values in cfg.items()}
def __init__(self, name, log_dic=None):
    """Prepare log locations under <log_dic or cwd>/.wolo/.<name>.

    Nothing is created or opened here; the log itself is loaded lazily.
    """
    base = Path(log_dic) if log_dic else Path.cwd()
    self._log_dic = base / ".wolo"
    self._log_path = self._log_dic / ".{}".format(name)
    self._log = None
    self._flattened = None
def __init__(self):
    """Load Earth-orientation data (finals, finals2000A, tai-utc files) from
    the configured 'eop' folder, defaulting to the cwd."""
    # BUG FIX: config.get() returns a plain string when the option is present
    # in the config file (the Path fallback is only used when it is absent),
    # which would break the `path / 'finals.all'` joins below. Wrapping in
    # Path() handles both cases; Path(Path) is a no-op.
    path = Path(config.get('eop', 'folder', fallback=Path.cwd()))

    # Data reading
    f = Finals(path / 'finals.all')
    f2 = Finals2000A(path / 'finals2000A.all')
    t = TaiUtc(path / "tai-utc.dat")

    # Extracting data from finals files: merge finals2000A values into the
    # per-date finals records
    self._finals = {}
    for date, values in f.items():
        self._finals[date] = values
        self._finals[date].update(f2[date])

    self._tai_utc = t.data.copy()
def main():
    """Convert every RST source through pandoc into numbered Markdown files,
    opening each result in Sublime Text."""
    base = Path.cwd().parent
    source_dir = base / "src"
    temp_dir = base / "temp"
    dest_dir = base / "Markdown"

    def source_file(name):
        return source_dir / (name + ".rst")

    def pre_converted(name):
        return temp_dir / (name + ".rst")

    def converted(name):
        return temp_dir / (name + ".md")

    def destination(name, index):
        return dest_dir / ("%02d_" % index + name + ".md")

    assert all(source_file(name).exists() for name in source_order)

    all_files = [
        (source_file(name), pre_converted(name), converted(name),
         destination(name, index), index)
        for index, name in enumerate(source_order)
    ]

    create_clean_dir(temp_dir)
    create_clean_dir(dest_dir)

    for rst, pre, conv, md, index in all_files:
        print(f"{rst.name} -> {md.name}:")
        pre_convert(rst, pre)
        os.system(f"pandoc {pre} -o {conv}")
        post_convert(conv, md)
        # NOTE(review): index > -1 is always true for enumerate indices —
        # presumably a leftover debugging switch; kept for fidelity.
        if index > -1:
            os.system(f"subl {md}")
def pdf(url: str):
    """
    Generate a PDF file with the presentation.
    """
    try:
        status = request.urlopen(url).getcode()
    except URLError:
        raise ValueError('Invalid URL provided!')
    if status != 200:
        raise ValueError('Unexpected server response!')

    with TemporaryDirectory() as tmpdir:
        name = 'slides.pdf'
        tmpdir = Path(tmpdir)
        # Run decktape inside Docker as the current user (so the output file
        # is owned by us), sharing the temp dir with the container.
        command = ('docker run --user={uid}:{gid} --rm --net="host" '
                   '-v {tmp}:{tmp}:Z -w {tmp} astefanutti/decktape '
                   '{url} {name}').format(
            uid=os.getuid(),
            gid=os.getgid(),
            tmp=tmpdir,
            url=url,
            name=name,
        )
        run(shlex.split(command))
        move(str(tmpdir / name), str(Path.cwd()))
def test_load_config_local_and_style():
    """
    Test `load_config()` with local and style configuration files.

    Local file should always override style, which should override template.
    """
    with TemporaryDirectory() as tmpdir:
        tmpdir = Path(tmpdir)
        # Change home directory for testing
        os.environ['MARKDOWNREVEAL_HOME'] = str(tmpdir)
        # Create local configuration file
        config_file = tmpdir / 'config.yaml'
        config_file.write_text('footer: "local footer"')
        # Create style configuration file
        style_path = tmpdir / '.markdownreveal' / 'out' / 'markdownrevealstyle'
        style_path.mkdir(parents=True)
        config_file = style_path / 'config.yaml'
        config_file.write_text('footer: "style footer"\n'
                               'header: "style header"')
        # Load configuration from inside the temp dir.
        # FIX: restore the previous working directory even if load_config()
        # raises — the original left the process chdir'ed into the (deleted)
        # temp dir on failure, breaking every subsequent test.
        old = Path.cwd()
        os.chdir(str(tmpdir))
        try:
            config = load_config()
        finally:
            os.chdir(str(old))
        assert config['local_path'] == tmpdir / '.markdownreveal'
        assert config['output_path'] == config['local_path'] / 'out'
        assert config['footer'] == 'local footer'
        assert config['header'] == 'style header'
        assert 'markdownreveal/style-default' in config['style']
def share_notebook(request, course, student, notebook): """ the URL to create static snapshots; it is intended to be fetched through ajax * computes a hash for storing the output * runs nbconvert in the student's container * stores the result in /nbhosting/snapshots/<course>/<hash>.html * returns a JSON-encoded dict that is either * { url: "/snapshots/flotpython/5465789765789.html" } * or { error: "the error message" } """ # the ipynb extension is removed from the notebook name in urls.py notebook_withext = notebook + ".ipynb" # compute hash from the input, so that a second run on the same notebook # will override any previsouly published static snapshot hasher = hashlib.sha1(bytes('{}-{}-{}'.format(course, student, notebook), encoding='utf-8')) hash = hasher.hexdigest() subcommand = 'docker-share-student-course-notebook-in-hash' command = ['nbh', '-d', sitesettings.root] if DEBUG: command.append('-x') command.append(subcommand) command += [ student, course, notebook_withext, hash] logger.info("In {}\n-> Running command {}".format(Path.cwd(), " ".join(command))) completed_process = subprocess.run( command, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) log_completed_process(completed_process, subcommand) if completed_process.returncode != 0: message = "command {} returned {}\nstderr:{}"\ .format(" ".join(command), completed_process.returncode, completed_process.stderr) return JsonResponse(dict(error=message)) # expect docker-share-student-course-notebook to write a url_path on its stdout url_path = completed_process.stdout.strip() logger.info("reading url_path={}".format(url_path)) # rebuild a full URL with proto and hostname, url = "{scheme}://{hostname}{path}"\ .format(scheme=request.scheme, hostname=request.get_host(), path=url_path) return JsonResponse(dict(url_path=url_path, url=url))
def setup(args=None):
    """Build the CLI argument parser (layer / bake / search subcommands) and
    parse `args` (None means sys.argv)."""
    parser = argparse.ArgumentParser()
    parser.add_argument("-l", "--log-level", default=logging.INFO)
    # With no subcommand selected, the default action prints help
    parser.set_defaults(func=lambda options: parser.print_help())
    parsers = parser.add_subparsers()

    # "layer": pull the named layers into a directory and install them;
    # the help line is the first line of layer_main's docstring
    layer = parsers.add_parser("layer", help=layer_main.__doc__.split("\n", 1)[0])
    layer.add_argument("--layer-endpoint",
                       help="API endpoint for metadata",
                       default="http://layer-cake.io")
    layer.add_argument("-d", "--directory", default=Path.cwd())
    layer.add_argument("-f", "--force", action="store_true",
                       help=("Force overwrite of existing layers "
                             "in directory (-d)"))
    layer.add_argument("-n", "--no-install", action="store_true",
                       help=("when set exit after pulling layers, "
                             "and before the install phase"))
    layer.add_argument(
        "layer", nargs="+",
        help=("The name of the layer to include, if more "
              "than one is provided they will be included in order"))
    layer.set_defaults(func=layer_main)

    # "bake": process a Dockerfile described by a cake.conf-style config
    baker = parsers.add_parser("bake", help=bake_main.__doc__.split("\n", 1)[0])
    baker.add_argument("-d", "--dockerfile",
                       help="Dockerfile to process", )
    baker.add_argument("--layer-endpoint",
                       help="API endpoint for metadata",
                       default="http://layer-cake.io")
    baker.add_argument("-n", "--no-build", action="store_true",
                       help="Don't build Dockerfile")
    baker.add_argument("--use-devel", action="store_true")
    baker.add_argument("config", nargs="?", default="cake.conf")
    baker.set_defaults(func=bake_main)

    # "search": query the layer index
    search = parsers.add_parser("search")
    search.add_argument("--layer-endpoint",
                        help="API endpoint for metadata",
                        default="http://layer-cake.io")
    search.add_argument("-f", "--format", default="text",
                        help="Options text|json|yaml")
    search.add_argument("term", nargs="+")
    search.set_defaults(func=search_main)

    options = parser.parse_args(args)
    return options