Python pathlib.Path 模块,cwd() 实例源码

我们从Python开源项目中,提取了以下33个代码示例,用于说明如何使用pathlib.Path.cwd()

项目:utils    作者:ReCodEx    | 项目源码 | 文件源码
def details(exercise_folder):
    """Print a summary of an exercise folder prepared for import.

    Prints the exercise details, the active assignment text, the exercise
    files and the exercise configuration generated from them.  The API
    client is configured from ``import-config.yml`` in the current working
    directory.

    :param exercise_folder: path of the exercise folder to inspect
    """
    soup = load_content(exercise_folder)

    print("### Exercise details")
    pprint(load_details(soup))
    print()

    print("### Exercise assignment text")
    pprint(load_active_text(soup))
    print()

    config = Config.load(Path.cwd() / "import-config.yml")
    api = ApiClient(config.api_url, config.api_token)
    tests = load_codex_test_config(Path(exercise_folder) / "testdata" / "config")
    # Every file name maps to the same placeholder id
    files = defaultdict(lambda: "random-file-uuid")

    print("### Exercise files")
    for name, path in load_exercise_files(exercise_folder):
        print(f"{path} as {name}")
        # Make sure the keys are present in the exercise file map: indexing
        # runs the default factory and stores the key, ``.get()`` does not.
        files[name]

    print("### Exercise configuration")
    pprint(make_exercise_config(config, soup, files, api.get_pipelines(), tests))
    print()
项目:utils    作者:ReCodEx    | 项目源码 | 文件源码
def get_id(exercise_folder, config_path=None):
    """Print the id of the API exercise whose name matches each given folder.

    :param exercise_folder: iterable of exercise folders to look up
    :param config_path: configuration file relative to the current working
        directory; ``import-config.yml`` is used when omitted
    """
    config_file = Path.cwd() / (config_path or "import-config.yml")
    config = Config.load(config_file)
    api = ApiClient(config.api_url, config.api_token)
    exercises = api.get_exercises()

    for folder in exercise_folder:
        details = load_details(load_content(folder))
        for exercise in exercises:
            if exercise["name"] == details["name"]:
                # Only the first exercise with a matching name is reported
                print(folder, exercise["id"])
                break
        else:
            print(folder, "Nothing found")
项目:got    作者:mrozekma    | 项目源码 | 文件源码
def what(dir: Optional[str]) -> Optional[RepoSpec]:
    """Return the repospec of the clone whose root contains ``dir``.

    ``dir`` defaults to the current working directory when it is ``None``.
    Raises ``RuntimeError`` when no root is found or no clone matches it.
    """
    def not_a_repository() -> RuntimeError:
        # Built on demand so the directory is only resolved on failure
        target = Path.cwd() if dir is None else Path(dir)
        return RuntimeError(f"Not a got repository: {target.resolve()}")

    root = findRoot(dir)
    if root is None:
        raise not_a_repository()

    # Prefer a clone registered under exactly this path
    exact = Clone.load(path=str(root))
    if exact is not None:
        return exact.repospec

    # Otherwise compare against the resolved path of every known clone
    for candidate in Clone.loadAll():
        if candidate.path.resolve() == root:
            return candidate.repospec

    # Shouldn't be able to get here
    raise not_a_repository()
项目:got    作者:mrozekma    | 项目源码 | 文件源码
def test_deps_cwd(self):
        """Run ``--deps`` from inside each repo and compare the printed paths."""
        self.deps_helper()
        base = Path.cwd()
        cases = (
            ('repo1', {'repo1', 'repo2', 'repo3', 'repo4'}),
            ('repo2', {'repo2', 'repo4'}),
            ('repo3', {'repo3'}),
            ('repo4', {'repo4'}),
        )
        for name, expectedDeps in cases:
            with self.subTest(name = name), chdir(name), GotRun(['--deps']) as r:
                reported = set(r.stdout.strip().split(os.linesep))
                wanted = {str((base / dep).resolve()) for dep in expectedDeps}
                self.assertEqual(reported, wanted)
                # A repo that only lists itself is expected to report a missing dependencies file
                if len(expectedDeps) == 1:
                    r.assertInStderr(f"host:{name} has no dependencies file")
项目:markdownreveal    作者:markdownreveal    | 项目源码 | 文件源码
def test_load_config_local():
    """
    Test `load_config()` with local configuration file.
    """
    with TemporaryDirectory() as tmpdir:
        tmpdir = Path(tmpdir)
        # Change home directory for testing
        # NOTE(review): the variable is not restored afterwards, so it stays
        # set for anything else that runs in this process.
        os.environ['MARKDOWNREVEAL_HOME'] = str(tmpdir)
        # Create local configuration file
        config_file = tmpdir / 'config.yaml'
        config_file.write_text('footer: "local footer"\n'
                               'header: "local header"\n'
                               'style: "https://other/style/file.tar.gz"')
        # Load configuration
        old = Path.cwd()
        os.chdir(str(tmpdir))
        try:
            config = load_config()
        finally:
            # Always leave the temporary directory, even when loading fails,
            # so the process is not left inside a directory about to be
            # deleted.
            os.chdir(str(old))

    assert config['local_path'] == tmpdir / '.markdownreveal'
    assert config['output_path'] == config['local_path'] / 'out'
    assert config['footer'] == 'local footer'
    assert config['header'] == 'local header'
    assert 'other/style' in config['style']
项目:quokka_ng    作者:rochacbruno    | 项目源码 | 文件源码
def media(filename):
    """Serve ``filename`` from the configured ``MEDIA_ROOT`` directory.

    ``MEDIA_ROOT`` is read from the current app's config and joined onto
    the current working directory.
    """
    working_dir = Path.cwd()
    configured_root = Path(current_app.config.get('MEDIA_ROOT'))
    return send_from_directory(working_dir / configured_root, filename)
项目:quokka_ng    作者:rochacbruno    | 项目源码 | 文件源码
def configure(app):
    """Install theme-aware template loading and a theme static route on ``app``.

    Reads ``OVERLOAD_ENABLED``, ``FOLDER`` and ``ACTIVE`` from
    ``app.theme_context``, puts a ``QuokkaTemplateLoader`` over the theme
    template folders in front of the app's existing jinja loader, and
    registers ``/theme/<path:filename>`` to serve the active theme's static
    files.
    """
    for extension in ('jinja2.ext.do', 'jinja2.ext.i18n'):
        app.jinja_env.add_extension(extension)

    overload_enabled = app.theme_context.get('OVERLOAD_ENABLED', True)
    templates = Path('templates')
    theme_folder = Path(app.theme_context.get('FOLDER', 'themes'))
    active_name = app.theme_context.get('ACTIVE', 'default')
    active_theme = Path(active_name)

    theme_templates = theme_folder / active_theme / templates
    prefixed_templates = theme_folder / Path(f'pelican-{active_name}') / templates
    static_folder = Path.cwd() / (theme_folder / active_theme / Path('static'))

    # Read the default search path before app.jinja_loader is replaced below
    default_path = Path(app.jinja_loader.searchpath[0])
    overload_templates = default_path / f'overload_{active_theme}' / templates

    search_folders = [theme_templates, prefixed_templates]
    if overload_enabled:
        # Overload templates are searched before the theme's own templates
        search_folders = [overload_templates] + search_folders

    app.jinja_loader = jinja2.ChoiceLoader([
        QuokkaTemplateLoader(search_folders),
        app.jinja_loader
    ])

    @app.route('/theme/<path:filename>')
    def theme_static(filename):
        return send_from_directory(static_folder, filename)
项目:phredutils    作者:doctaphred    | 项目源码 | 文件源码
def get_main_dir():
    """Return the directory holding the ``__main__`` module's file.

    Falls back to the current working directory when ``__main__`` has no
    ``__file__`` attribute.
    """
    main_module = sys.modules['__main__']
    if not hasattr(main_module, '__file__'):
        return Path.cwd()
    return Path(main_module.__file__).resolve().parent
项目:utils    作者:rapydo    | 项目源码 | 文件源码
def current():
    """Return the current working directory as a ``Path``."""
    working_dir = Path.cwd()
    return working_dir
项目:utils    作者:ReCodEx    | 项目源码 | 文件源码
def add_localization(language, exercise_id, config_path):
    """Append a localized text read from stdin to an exercise and save it.

    :param language: locale stored with the new text
    :param exercise_id: id of the exercise to update
    :param config_path: configuration file relative to the current working
        directory; ``import-config.yml`` is used when it is falsy
    """
    config_file = Path.cwd() / (config_path or "import-config.yml")
    config = Config.load(config_file)
    api = ApiClient(config.api_url, config.api_token)

    exercise = api.get_exercise(exercise_id)
    localized_texts = exercise["localizedTexts"]
    # Standard input is passed through html2text before being stored
    localized_texts.append({
        "locale": language,
        "text": html2text(sys.stdin.read())
    })

    api.update_exercise(exercise_id, exercise)
项目:utils    作者:ReCodEx    | 项目源码 | 文件源码
def set_score_config(exercise_id, config_path, exercise_folder):
    """Upload per-test weights taken from the exercise folder's test config.

    :param exercise_id: id of the exercise whose score config is set
    :param config_path: configuration file relative to the current working
        directory; ``import-config.yml`` is used when it is falsy
    :param exercise_folder: folder containing ``testdata/config``
    """
    config_file = Path.cwd() / (config_path or "import-config.yml")
    config = Config.load(config_file)
    api = ApiClient(config.api_url, config.api_token)
    test_config_path = Path(exercise_folder) / "testdata" / "config"
    tests = load_codex_test_config(test_config_path)

    # One integer weight per test, keyed by test name
    weights = {test.name: int(test.points) for test in tests}
    serialized = yaml.dump({"testWeights": weights}, default_flow_style=False)
    api.set_exercise_score_config(exercise_id, serialized)
项目:utils    作者:ReCodEx    | 项目源码 | 文件源码
def evaluate_all_rs(config_path):
    """
    Request evaluation of the reference solutions of every exercise.

    A failure for one exercise is logged and the remaining exercises are
    still processed.

    :param config_path: configuration file relative to the current working
        directory; ``import-config.yml`` is used when it is falsy
    """
    config_file = Path.cwd() / (config_path or "import-config.yml")
    config = Config.load(config_file)
    api = ApiClient(config.api_url, config.api_token)

    with click.progressbar(api.get_exercises()) as progress:
        for exercise in progress:
            try:
                api.evaluate_reference_solutions(exercise["id"])
            except Exception as error:
                logging.error("Error in exercise {}: {}".format(exercise["id"], str(error)))
项目:alexa-skill-kit    作者:KNNCreative    | 项目源码 | 文件源码
def init(self, event, requires_permission=False, script_path='alexa_skill_kit/script.yml'):
        """Populate this object from a request event and load the script file.

        :param event: request event; must contain ``request``, ``session``
            and ``context`` entries
        :param requires_permission: accepted but not used by this method
        :param script_path: YAML script file, relative to the current
            working directory
        """
        self.event = event
        request = event['request']
        session = event['session']
        context = event['context']
        # The user entry may carry no 'permissions' at all; treat that the
        # same as a permissions object without a consent token.
        perm = session['user'].get('permissions', {})

        self.device_id = context['System']['device']['deviceId']

        self.request_app_id = session['application']['applicationId']
        self.user_id = session['user']['userId']
        self.new_session = session['new']
        self.token = perm.get('consentToken', False)

        self.request_id = request['requestId']
        self.timestamp = request['timestamp']
        self.request_type = request['type']

        if 'intent' in request:
            self.intent_name = request['intent']['name']
            # An intent without slots gets the same "absent" marker (False)
            # that is used when there is no intent at all.
            self.slots = request['intent'].get('slots', False)
        else:
            self.intent_name = False
            self.slots = False

        with Path.cwd().joinpath(script_path).open() as f:
            # NOTE(review): yaml.load() without an explicit Loader can build
            # arbitrary Python objects and is rejected by recent PyYAML
            # releases; switch to yaml.safe_load(f) if the script file only
            # uses plain YAML -- confirm before changing.
            self.script = yaml.load(f)
项目:inxs    作者:funkyfuture    | 项目源码 | 文件源码
def get_transformation(location: str) -> Transformation:
    """Load a ``Transformation`` instance from a Python file.

    :param location: ``<module path>`` or ``<module path>:<name>``; the
        module path is relative to the current working directory and
        ``.py`` is appended when missing
    :return: with no name given, the module-level symbol ``main`` if it is
        a ``Transformation``, otherwise the first ``Transformation`` found;
        with a name given, the instance whose ``name`` attribute matches,
        otherwise the module-level symbol of that name
    :raises StopIteration: no name was given and the module defines no
        ``Transformation``
    :raises KeyError: the given name matches neither an instance name nor
        a module-level symbol
    """
    # TODO allow use of contributed transformations
    if ':' in location:
        # Split on the last colon only, so a path part that itself contains
        # a colon no longer breaks the unpacking.
        module_path, transformation_name = location.rsplit(':', 1)
    else:
        module_path, transformation_name = location, None

    if not module_path.endswith('.py'):
        module_path += '.py'
    module_path = (Path.cwd() / module_path).resolve()
    dbg(f'Transformation module path: {module_path}')

    dbg('Loading module.')
    module_spec = importlib.util.spec_from_file_location('transformation_file', module_path)
    _module = importlib.util.module_from_spec(module_spec)
    module_spec.loader.exec_module(_module)

    dbg('Inspecting module.')
    transformation_objects = {k: v for k, v in vars(_module).items()
                              if isinstance(v, Transformation)}

    if transformation_name is None:
        if 'main' in transformation_objects:
            dbg("Selected symbol 'main' from module as transformation.")
            return transformation_objects['main']
        # Fall back to the first transformation in definition order
        name, instance = next(iter(transformation_objects.items()))
        dbg(f"Selected symbol '{name}' from module as transformation.")
        return instance

    # A transformation's own name takes precedence over the symbol name
    for instance in transformation_objects.values():
        if instance.name == transformation_name:
            dbg(f"Selected transformation named '{transformation_name}' from module "
                f"as transformation.")
            return instance

    dbg(f"Selected symbol '{transformation_name}' from module as transformation.")
    return transformation_objects[transformation_name]
项目:aioworkers    作者:aioworkers    | 项目源码 | 文件源码
def load_conf(*filenames, **kwargs):
    """Build a ``MergeDict`` of defaults and overrides, then merge loaded files in.

    :param filenames: configuration files handed to ``Config.load``
    :param kwargs: overrides for the defaults; ``search_dirs`` is consumed
        as the list of directories to search and defaults to the current
        working directory
    """
    search_dirs = kwargs.pop('search_dirs', None)
    if not search_dirs:
        search_dirs = [Path.cwd()]
    loader = Config(search_dirs=search_dirs)

    defaults = {
        'http.port': 0,
        'http.host': '0.0.0.0',
    }
    conf = MergeDict({**defaults, **kwargs})
    conf(loader.load(*filenames))
    return conf
项目:got    作者:mrozekma    | 项目源码 | 文件源码
def findRoot(dir: Optional[str]) -> Optional[Path]:
    """Return the path of the nearest clone at or above ``dir``.

    ``dir`` defaults to the current working directory when it is ``None``.
    Returns ``None`` when neither the directory nor any of its parents is a
    known clone.
    """
    # This could theoretically be one query that ORs together a bunch of paths
    start = (Path.cwd() if dir is None else Path(dir)).resolve()
    for candidate in (start, *start.parents):
        clone = Clone.tryLoad(path=str(candidate))
        if clone:
            return clone.path
    return None
项目:got    作者:mrozekma    | 项目源码 | 文件源码
def run(repos: Iterable[Iterable[RepoSpec]], cmd: List[str], bg: bool, ignore_errors: bool):
    """Run ``cmd`` in the clone directory of every repo, then exit.

    :param repos: groups of repos to run the command in
    :param cmd: command and arguments; a single-element list is run through
        the shell
    :param bg: when true, do not wait for one process before starting the
        next
    :param ignore_errors: when false (and not ``bg``), stop at the first
        non-zero exit code
    :raises RuntimeError: a foreground process failed and ``ignore_errors``
        is false
    :raises SystemExit: always on completion; the exit status is the number
        of processes that failed
    """
    env = dict(os.environ)

    # How the command is passed on does not depend on the repo, so decide it
    # once instead of rebinding ``cmd`` on every iteration.
    if platform.system() == 'Windows':
        # Passing a whole command as a single string inside a list won't work on Windows, e.g. -x 'foo bar baz'. Turn it into a raw string instead
        shell = True
        command = cmd[0] if len(cmd) == 1 else cmd
    else:
        shell = (len(cmd) == 1)
        command = cmd

    procs = []
    for repo_group in repos:
        for repo in repo_group:
            clone: Clone = where(repo, 'py', 'clone')
            print(str(clone.repospec), file = sys.stderr)
            env['GOT_REPOSPEC'] = str(clone.repospec)

            proc = subprocess.Popen(command, cwd = str(clone.path), shell = shell, env = env)
            procs.append(proc)
            if not bg:
                proc.wait()
                if not ignore_errors and proc.returncode != 0:
                    raise RuntimeError(f"Failed on {repo}: exit code {proc.returncode}")
                print()
    # Wait for every process to exit. Then exit with the number of processes that failed
    # (sys.exit rather than the ``exit`` helper, which only exists when the site module is loaded)
    sys.exit(sum(proc.wait() != 0 for proc in procs))
项目:got    作者:mrozekma    | 项目源码 | 文件源码
def makeEnvironment(self):
        """Return a copy of the environment with ``GOT_ROOT`` set.

        ``GOT_ROOT`` is the current directory, or the closest parent of it,
        whose own parent is ``runDir``.  Raises ``RuntimeError`` when no
        such directory exists.
        """
        env = os.environ.copy()
        # The got root is the test case directory under runDir, but we might be in a subdirectory
        here = Path.cwd()
        for candidate in (here, *here.parents):
            if candidate.parent == runDir:
                break
        else:
            raise RuntimeError(f"Current directory {Path.cwd().resolve()} is not within a test case rundir")
        env['GOT_ROOT'] = str(candidate)
        return env
项目:got    作者:mrozekma    | 项目源码 | 文件源码
def test_find_root_cwd(self):
        """``--find-root`` run from inside ``repo1`` prints that directory."""
        self.deps_helper()
        with chdir('repo1'), GotRun(['--find-root']) as r:
            # Both sides are evaluated while still inside repo1
            self.assertEqual(Path.cwd().resolve(), Path(r.stdout.strip()))
项目:got    作者:mrozekma    | 项目源码 | 文件源码
def test_find_root_bad_dir(self):
        """``--find-root`` fails and names the current directory in its error output."""
        with GotRun(['--find-root']) as r:
            r.assertFails()
            message = f"{Path.cwd().resolve()} is not within a got repository"
            r.assertInStderr(message)
项目:got    作者:mrozekma    | 项目源码 | 文件源码
def chdir(path):
    """Switch to ``path``, yield once, then switch back to the previous directory.

    The previous directory is restored even if the code running at the
    yield point raises.
    """
    previous = Path.cwd()
    os.chdir(path)
    try:
        yield
    finally:
        os.chdir(previous)
项目:env-diff    作者:jacebrowning    | 项目源码 | 文件源码
def tmp():
    """Create ``tmp/int/cli`` under the current directory and work inside it.

    Yields the resolved directory after switching into it, and switches
    back to the starting directory once resumed.
    """
    starting_dir = Path.cwd()
    workdir = (Path("tmp") / "int" / "cli").resolve()
    workdir.mkdir(parents=True, exist_ok=True)
    os.chdir(workdir)
    yield workdir
    os.chdir(starting_dir)
项目:env-diff    作者:jacebrowning    | 项目源码 | 文件源码
def describe_do_run():
    """Specs for ``do_run``."""

    @pytest.yield_fixture
    def config(tmpdir):
        """Yield a config over two source files written into ``tmpdir``.

        ``tmpdir`` is the working directory while the config is in use; the
        previous directory is restored afterwards.
        """
        previous_dir = os.getcwd()
        tmpdir.chdir()

        sample_files = (
            (".env", "FOO=1"),
            ("app.py", "os.getenv('FOO', 2)"),
        )
        for filename, content in sample_files:
            with Path(filename).open('w') as stream:
                stream.write(content)

        sourcefiles = [SourceFile(".env"), SourceFile("app.py")]
        environments = [Environment("test", command="echo FOO=3")]
        yield Config.new(sourcefiles=sourcefiles, environments=environments)

        os.chdir(previous_dir)

    def it_returns_table_data(runner, config):
        print(config.sourcefiles)
        rows = list(do_run(config))

        expect(rows) == [
            ['Variable', 'File: .env', 'File: app.py', 'Environment: test'],
            ['FOO', 'FOO=1', "os.getenv('FOO', 2)", '3'],
        ]
项目:env-diff    作者:jacebrowning    | 项目源码 | 文件源码
def do_report(rows):
    """Write ``rows`` as Markdown and CSV reports into the current directory.

    Each created file is announced on stderr.
    """
    reports = (
        ("env-diff.md", utils.write_markdown, "Created Markdown report: "),
        ("env-diff.csv", utils.write_csv, "Created CSV report: "),
    )
    for filename, write, announcement in reports:
        path = Path.cwd().joinpath(filename)
        write(rows, path)
        click.echo(green(announcement) +
                   white(f"{path}", bold=True), err=True)
项目:env-diff    作者:jacebrowning    | 项目源码 | 文件源码
def __init__(self, filename="env-diff.yml", root=None,
                 sourcefiles=None, environments=None):
        """Store the configuration values, filling in defaults for falsy ones.

        ``root`` falls back to the current working directory; ``sourcefiles``
        and ``environments`` each fall back to a new empty list.
        """
        if not root:
            root = Path.cwd()
        self.root = root
        self.filename = filename
        self.sourcefiles = sourcefiles if sourcefiles else []
        self.environments = environments if environments else []
项目:sunbeam    作者:eclarke    | 项目源码 | 文件源码
def check_config(cfg):
    """Resolve root in config file, then validate paths.

    The root is taken from ``cfg['all']['root']`` (after ``verify``) when
    present, otherwise it is the current working directory.  Returns a new
    dict with every section passed through ``validate_paths``.
    """
    common = cfg['all']
    root = verify(common['root']) if 'root' in common else Path.cwd()
    # Check the paths of each subsection against the same root
    return {
        section: validate_paths(values, root)
        for section, values in cfg.items()
    }
项目:python-WoLo    作者:AKuederle    | 项目源码 | 文件源码
def __init__(self, name, log_dic=None):
        """Set up the log locations for ``name``.

        The log directory is ``.wolo`` inside ``log_dic`` (or inside the
        current working directory when ``log_dic`` is falsy); the log file
        is ``.<name>`` inside that directory.
        """
        base_dir = Path(log_dic) if log_dic else Path.cwd()
        self._log_dic = base_dir / ".wolo"
        self._log_path = self._log_dic / ".{}".format(name)
        self._log = None
        self._flattened = None
项目:beyond    作者:galactics    | 项目源码 | 文件源码
def __init__(self):
        """Read the data files from the configured folder and keep their contents.

        The folder comes from the ``eop``/``folder`` config entry and falls
        back to the current working directory.
        """
        folder = config.get('eop', 'folder', fallback=Path.cwd())

        # Data reading
        finals = Finals(folder / 'finals.all')
        finals2000a = Finals2000A(folder / 'finals2000A.all')
        tai_utc = TaiUtc(folder / "tai-utc.dat")

        # Extracting data from finals files: each date's values from the
        # first file are extended in place with the second file's values
        merged = self._finals = {}
        for date, values in finals.items():
            merged[date] = values
            values.update(finals2000a[date])

        self._tai_utc = tai_utc.data.copy()
项目:ThinkingInPython    作者:BruceEckel    | 项目源码 | 文件源码
def main():
    """Convert every file in ``source_order`` from reStructuredText to Markdown.

    Works on the sibling directories ``src``, ``temp`` and ``Markdown`` of
    the current working directory: each source is pre-converted, run
    through ``pandoc``, post-converted into a numbered Markdown file and
    opened with ``subl``.
    """
    project_dir = Path.cwd().parent
    source_dir = project_dir / "src"
    temp_dir = project_dir / "temp"
    dest_dir = project_dir / "Markdown"

    def source_file(stem):
        return source_dir / (stem + ".rst")

    def pre_converted(stem):
        return temp_dir / (stem + ".rst")

    def converted(stem):
        return temp_dir / (stem + ".md")

    def destination(stem, index):
        return dest_dir / ("%02d_" % index + stem + ".md")

    assert all([source_file(stem).exists() for stem in source_order])
    jobs = [
        (source_file(stem), pre_converted(stem), converted(stem),
         destination(stem, index), index)
        for index, stem in enumerate(source_order)
    ]

    create_clean_dir(temp_dir)
    create_clean_dir(dest_dir)
    for rst, pre, conv, md, index in jobs:
        print(f"{rst.name} -> {md.name}:")
        pre_convert(rst, pre)
        # NOTE(review): the paths are interpolated into a shell command
        # unquoted, so a path containing spaces would break this call
        os.system(f"pandoc {pre} -o {conv}")
        post_convert(conv, md)
        # Always true for enumerate() indices
        if index > -1:
            os.system(f"subl {md}")
项目:markdownreveal    作者:markdownreveal    | 项目源码 | 文件源码
def pdf(url: str):
    """
    Generate a PDF file with the presentation.

    The URL is probed first; then ``astefanutti/decktape`` is run through
    docker in a temporary directory and the resulting ``slides.pdf`` is
    moved into the current working directory.

    :param url: address the presentation is served at
    :raises ValueError: the URL cannot be opened or does not answer with
        status 200
    """
    try:
        # Close the probe response instead of leaving the connection open
        with request.urlopen(url) as response:
            code = response.getcode()
    except URLError as error:
        raise ValueError('Invalid URL provided!') from error

    if code != 200:
        raise ValueError('Unexpected server response!')

    with TemporaryDirectory() as tmpdir:
        name = 'slides.pdf'
        tmpdir = Path(tmpdir)
        # Build the argument list directly: formatting one string and
        # re-splitting it would mangle a URL or path containing spaces or
        # quotes.
        command = [
            'docker', 'run',
            '--user={uid}:{gid}'.format(uid=os.getuid(), gid=os.getgid()),
            '--rm',
            '--net=host',
            '-v', '{tmp}:{tmp}:Z'.format(tmp=tmpdir),
            '-w', str(tmpdir),
            'astefanutti/decktape',
            url,
            name,
        ]
        run(command)
        move(str(tmpdir/name), str(Path.cwd()))
项目:markdownreveal    作者:markdownreveal    | 项目源码 | 文件源码
def test_load_config_local_and_style():
    """
    Test `load_config()` with local and style configuration files. Local file
    should always override style, which should override template.
    """
    with TemporaryDirectory() as tmpdir:
        tmpdir = Path(tmpdir)
        # Change home directory for testing
        # NOTE(review): the variable is not restored afterwards, so it stays
        # set for anything else that runs in this process.
        os.environ['MARKDOWNREVEAL_HOME'] = str(tmpdir)
        # Create local configuration file
        config_file = tmpdir / 'config.yaml'
        config_file.write_text('footer: "local footer"')
        # Create style configuration file
        style_path = tmpdir / '.markdownreveal' / 'out' / 'markdownrevealstyle'
        style_path.mkdir(parents=True)
        config_file = style_path / 'config.yaml'
        config_file.write_text('footer: "style footer"\n'
                               'header: "style header"')
        # Load configuration
        old = Path.cwd()
        os.chdir(str(tmpdir))
        try:
            config = load_config()
        finally:
            # Always leave the temporary directory, even when loading fails,
            # so the process is not left inside a directory about to be
            # deleted.
            os.chdir(str(old))

    assert config['local_path'] == tmpdir / '.markdownreveal'
    assert config['output_path'] == config['local_path'] / 'out'
    assert config['footer'] == 'local footer'
    assert config['header'] == 'style header'
    assert 'markdownreveal/style-default' in config['style']
项目:nbhosting    作者:parmentelat    | 项目源码 | 文件源码
def share_notebook(request, course, student, notebook):
    """
    the URL to create static snapshots; it is intended to be fetched through ajax

    * computes a hash of course, student and notebook name
    * runs the ``nbh`` subcommand
      ``docker-share-student-course-notebook-in-hash`` with that hash
    * expects the command to print the snapshot's URL path on its stdout
    * returns a JSON-encoded dict that is either
      * { url_path: <path printed by the command>, url: <full URL> }
      * or { error: "the error message" }
    """

    # urls.py strips the ipynb extension from the notebook name; put it back
    notebook_withext = notebook + ".ipynb"
    # the digest depends on the input only, so a second run on the same
    # notebook will override any previously published static snapshot
    identity = '{}-{}-{}'.format(course, student, notebook)
    digest = hashlib.sha1(identity.encode('utf-8')).hexdigest()

    subcommand = 'docker-share-student-course-notebook-in-hash'

    command = ['nbh', '-d', sitesettings.root]
    command += ['-x'] if DEBUG else []
    command += [subcommand, student, course, notebook_withext, digest]

    logger.info("In {}\n-> Running command {}".format(Path.cwd(), " ".join(command)))
    completed = subprocess.run(
        command, universal_newlines=True,
        stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    log_completed_process(completed, subcommand)

    if completed.returncode != 0:
        message = "command {} returned {}\nstderr:{}".format(
            " ".join(command), completed.returncode, completed.stderr)
        return JsonResponse(dict(error=message))

    # the command is expected to write a url_path on its stdout
    url_path = completed.stdout.strip()
    logger.info("reading url_path={}".format(url_path))
    # rebuild a full URL with proto and hostname
    url = "{scheme}://{hostname}{path}".format(
        scheme=request.scheme, hostname=request.get_host(), path=url_path)
    return JsonResponse(dict(url_path=url_path, url=url))
项目:layercake    作者:bcsaller    | 项目源码 | 文件源码
def setup(args=None):
    """Build the command line parser and return the parsed options.

    Defines the ``layer``, ``bake`` and ``search`` subcommands; each stores
    its handler in ``options.func``.  Without a subcommand, ``func`` prints
    the help text.

    :param args: argument list to parse; ``None`` is passed on to
        ``parse_args`` unchanged
    """
    def summary(handler):
        # First line of the handler's docstring
        return handler.__doc__.split("\n", 1)[0]

    def add_endpoint_option(subparser):
        # Every subcommand takes the same metadata endpoint option
        subparser.add_argument(
            "--layer-endpoint",
            help="API endpoint for metadata",
            default="http://layer-cake.io")

    parser = argparse.ArgumentParser()
    parser.add_argument("-l", "--log-level", default=logging.INFO)
    parser.set_defaults(func=lambda options: parser.print_help())
    subcommands = parser.add_subparsers()

    layer = subcommands.add_parser("layer", help=summary(layer_main))
    add_endpoint_option(layer)
    layer.add_argument("-d", "--directory", default=Path.cwd())
    layer.add_argument(
        "-f", "--force", action="store_true",
        help="Force overwrite of existing layers in directory (-d)")
    layer.add_argument(
        "-n", "--no-install", action="store_true",
        help="when set exit after pulling layers, and before the install phase")
    layer.add_argument(
        "layer", nargs="+",
        help=("The name of the layer to include, if more "
              "than one is provided they will be included in order"))
    layer.set_defaults(func=layer_main)

    baker = subcommands.add_parser("bake", help=summary(bake_main))
    baker.add_argument("-d", "--dockerfile", help="Dockerfile to process")
    add_endpoint_option(baker)
    baker.add_argument("-n", "--no-build", action="store_true",
                       help="Don't build Dockerfile")
    baker.add_argument("--use-devel", action="store_true")
    baker.add_argument("config", nargs="?", default="cake.conf")
    baker.set_defaults(func=bake_main)

    search = subcommands.add_parser("search")
    add_endpoint_option(search)
    search.add_argument("-f", "--format", default="text",
                        help="Options text|json|yaml")
    search.add_argument("term", nargs="+")
    search.set_defaults(func=search_main)

    return parser.parse_args(args)