我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.getcwd()。
def _git_update_requirements(venv, package_dir, reqs_dir): """ Update from global requirements. Update an OpenStack git directory's requirements.txt and test-requirements.txt from global-requirements.txt. """ orig_dir = os.getcwd() os.chdir(reqs_dir) python = os.path.join(venv, 'bin/python') cmd = [python, 'update.py', package_dir] try: subprocess.check_call(cmd) except subprocess.CalledProcessError: package = os.path.basename(package_dir) error_out("Error updating {} from " "global-requirements.txt".format(package)) os.chdir(orig_dir)
def client_proc(job_id, data_file, rtask, task=None): # send input file to rtask.location; this will be saved to dispycos process's # working directory if (yield pycos.Pycos().send_file(rtask.location, data_file, timeout=10)) < 0: print('Could not send input data to %s' % rtask.location) # terminate remote task rtask.send(None) raise StopIteration(-1) # send info about input obj = C(job_id, data_file, random.uniform(5, 8), task) if (yield rtask.deliver(obj)) != 1: print('Could not send input to %s' % rtask.location) raise StopIteration(-1) # rtask sends result to this task as message result = yield task.receive() if not result.result_file: print('Processing %s failed' % obj.i) raise StopIteration(-1) # rtask saves results file at this client, which is saved in pycos's # dest_path, not current working directory! result_file = os.path.join(pycos.Pycos().dest_path, result.result_file) # move file to cwd target = os.path.join(os.getcwd(), os.path.basename(result_file)) os.rename(result_file, target) print(' job %s output is in %s' % (obj.i, target))
def build(self, file): if self.built: raise PermissionError("You cannot build multiple times!") if not self.loaded: self.load(file) old = os.getcwd() sys.path.append(os.path.dirname(os.path.abspath(file))) # for module import that aren't "include" call try: content = open(file, "rb").read() os.chdir(os.path.dirname(os.path.abspath(file))) # set the current working directory, for open() etc. exec(compile(content, file, 'exec'), self.user_functions) except Exception as err: print("An exception occured while building: ", file=sys.stderr) lines = traceback.format_exc(None, err).splitlines() print(" " + lines[-1], file=sys.stderr) for l in lines[3:-1]: print(l, file=sys.stderr) exit(1) os.chdir(old) sys.path.remove(os.path.dirname(os.path.abspath(file))) self.built = True
def load(self, file): if self.loaded: return sys.path.append(os.path.dirname(os.path.abspath(file))) # for module import that aren't "include" call old = os.getcwd() try: content = open(file, "rb").read() os.chdir(os.path.dirname(os.path.abspath(file))) # set the current working directory, for open() etc. exec(compile(content, file, 'exec'), self.user_functions) except Exception as err: print("An exception occured while loading: ", file=sys.stderr) lines = traceback.format_exc(None, err).splitlines() print(" " + lines[-1], file=sys.stderr) for l in lines[3:-1]: print(l, file=sys.stderr) exit(1) os.chdir(old) sys.path.remove(os.path.dirname(os.path.abspath(file))) self.loaded = True self.mem_offset = 0
def archive_context(filename): # extracting the archive tmpdir = tempfile.mkdtemp() log.warn('Extracting in %s', tmpdir) old_wd = os.getcwd() try: os.chdir(tmpdir) with get_zip_class()(filename) as archive: archive.extractall() # going in the directory subdir = os.path.join(tmpdir, os.listdir(tmpdir)[0]) os.chdir(subdir) log.warn('Now working in %s', subdir) yield finally: os.chdir(old_wd) shutil.rmtree(tmpdir)
def StartFileServer(fileServerDir): """ Start file server. """ if not fileServerDir: message = \ "The PYUPDATER_FILESERVER_DIR environment variable is not set." if hasattr(sys, "frozen"): logger.error(message) return None else: fileServerDir = os.path.join(os.getcwd(), 'pyu-data', 'deploy') message += "\n\tSetting fileServerDir to: %s\n" % fileServerDir logger.warning(message) fileServerPort = GetEphemeralPort() thread = threading.Thread(target=RunFileServer, args=(fileServerDir, fileServerPort)) thread.start() WaitForFileServerToStart(fileServerPort) return fileServerPort
def save_users_and_groups_to_csv(user_data, csv_output_filepath): """ Creates a CSV file with exported user data :param user_data: The exported user data :param csv_output_filepath: The output file to save :return: None """ full_output_path = os.path.join(os.getcwd(), csv_output_filepath) with open(full_output_path, 'wb') as f: fields = ['email', 'lastname', 'firstname', 'groups'] w = csv.DictWriter(f, fields) w.writeheader() for key, val in sorted(user_data.items()): val['groups'] = ", ".join(val['groups'][0::2]) row = {'email': key} row.update(val) w.writerow(row)
def configure(logger, path_to_config_file, export_formats): """ instantiate and configure logger, load config settings from file, instantiate SafetyCulture SDK :param logger: the logger :param path_to_config_file: path to config file :param export_formats: desired export formats :return: instance of SafetyCulture SDK object, config settings """ config_settings = load_config_settings(logger, path_to_config_file) config_settings[EXPORT_FORMATS] = export_formats sc_client = sp.SafetyCulture(config_settings[API_TOKEN]) if config_settings[EXPORT_PATH] is not None: create_directory_if_not_exists(logger, config_settings[EXPORT_PATH]) else: logger.info('Invalid export path was found in ' + path_to_config_file + ', defaulting to /exports') config_settings[EXPORT_PATH] = os.path.join(os.getcwd(), 'exports') create_directory_if_not_exists(logger, config_settings[EXPORT_PATH]) return sc_client, config_settings
def abspath(path): """Return the absolute version of a path.""" if path: # Empty path must return current working directory. path = os.fspath(path) try: path = _getfullpathname(path) except OSError: pass # Bad path - return unchanged. elif isinstance(path, bytes): path = os.getcwdb() else: path = os.getcwd() return normpath(path) # realpath is a no-op on systems without islink support
def _candidate_tempdir_list(): """Generate a list of candidate temporary directories which _get_default_tempdir will try.""" dirlist = [] # First, try the environment. for envname in 'TMPDIR', 'TEMP', 'TMP': dirname = _os.getenv(envname) if dirname: dirlist.append(dirname) # Failing that, try OS-specific locations. if _os.name == 'nt': dirlist.extend([ r'c:\temp', r'c:\tmp', r'\temp', r'\tmp' ]) else: dirlist.extend([ '/tmp', '/var/tmp', '/usr/tmp' ]) # As a last resort, the current directory. try: dirlist.append(_os.getcwd()) except (AttributeError, OSError): dirlist.append(_os.curdir) return dirlist
def init_work_dir(self): retval = os.getcwd() print '#current dir is : ' + retval # ?????? store_dir = retval + os.sep + 'tmp' print '#all imgs are going to be stored in dir :' + store_dir if not os.path.exists(store_dir): print '#tmp dir does not exist, attemp to mkdir' os.mkdir(store_dir) print '#mkdir sucessfully' else: print '#tmp dir is already exist' self.store_dir = store_dir # print '#now change current dir to tmp' # os.chdir(store_dir) #no neccessary # print os.getcwd()
def _run_local_lambda(self, lambda_config): prev_folder = os.getcwd() os.chdir(self.config.get_projectdir()) sys.path.append(self.config.get_projectdir()) lambda_name = lambda_config["FunctionName"] lambda_handler = self.import_function(lambda_config["Handler"]) # Run and set a counter start = time.time() results = lambda_handler({}, MockContext(lambda_name)) end = time.time() # restore folder os.chdir(prev_folder) # Print results logger.info("{0}".format(results)) logger.info("\nexecution time: {:.8f}s\nfunction execution " "timeout: {:2}s".format(end - start, lambda_config["Timeout"]))
def find_dotenv(filename='.env', raise_error_if_not_found=False, usecwd=False): """ Search in increasingly higher folders for the given file Returns path to the file if found, or an empty string otherwise """ if usecwd or '__file__' not in globals(): # should work without __file__, e.g. in REPL or IPython notebook path = os.getcwd() else: # will work for .py files frame_filename = sys._getframe().f_back.f_code.co_filename path = os.path.dirname(os.path.abspath(frame_filename)) for dirname in _walk_to_root(path): check_path = os.path.join(dirname, filename) if os.path.exists(check_path): return check_path if raise_error_if_not_found: raise IOError('File not found') return ''
def _changing_cd(f, *args, **kwargs): def inner(*args, **kwargs): try: state = State(args[0].view) except AttributeError: state = State(args[0].window.active_view()) old = os.getcwd() try: # FIXME: Under some circumstances, like when switching projects to # a file whose _cmdline_cd has not been set, _cmdline_cd might # return 'None'. In such cases, change to the actual current # directory as a last measure. (We should probably fix this anyway). os.chdir(state.settings.vi['_cmdline_cd'] or old) f(*args, **kwargs) finally: os.chdir(old) return inner
def test_comp_macro_directories_config_python(self): file_loc = os.getcwd() self.comp = sb.launch(self.cname, impl="python", execparams={'LOGGING_CONFIG_URI':'file://'+os.getcwd()+'/logconfig.cfg'} ) fp = None try: fp = open('foo/bar/test.log','r') except: pass try: os.remove('foo/bar/test.log') except: pass try: os.rmdir('foo/bar') except: pass try: os.rmdir('foo') except: pass self.assertNotEquals(fp, None)
def test_comp_macro_directories_config_cpp(self): file_loc = os.getcwd() self.comp = sb.launch(self.cname, impl="cpp", execparams={'LOGGING_CONFIG_URI':'file://'+os.getcwd()+'/logconfig.cfg'} ) fp = None try: fp = open('foo/bar/test.log','r') except: pass try: os.remove('foo/bar/test.log') except: pass try: os.rmdir('foo/bar') except: pass try: os.rmdir('foo') except: pass self.assertNotEquals(fp, None)
def test_comp_macro_directories_config_java(self): file_loc = os.getcwd() self.comp = sb.launch(self.cname, impl="java", execparams={'LOGGING_CONFIG_URI':'file://'+os.getcwd()+'/logconfig.cfg'} ) fp = None try: fp = open('foo/bar/test.log','r') except: pass try: os.remove('foo/bar/test.log') except: pass try: os.rmdir('foo/bar') except: pass try: os.rmdir('foo') except: pass self.assertNotEquals(fp, None)
def setUp(self): cfg = "log4j.rootLogger=TRACE,CONSOLE,FILE\n" + \ "log4j.debug=false\n" + \ "# Direct log messages to FILE\n" + \ "log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender\n" + \ "log4j.appender.CONSOLE.File=stdout\n" + \ "log4j.appender.FILE=org.apache.log4j.FileAppender\n" + \ "log4j.appender.FILE.File="+os.getcwd()+"/tmp_logfile.log\n" + \ "log4j.appender.CONSOLE.threshold=TRACE\n" + \ "log4j.appender.FILE.threshold=TRACE\n" + \ "log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout\n" + \ "log4j.appender.CONSOLE.layout.ConversionPattern=%p:%c - %m [%F:%L]%n\n" + \ "log4j.appender.FILE.layout=org.apache.log4j.PatternLayout\n" + \ "log4j.appender.FILE.layout.ConversionPattern=%d %p:%c - %m [%F:%L]%n\n" fp = open('tmp_logfile.config','w') fp.write(cfg) fp.close() nodebooter, self._domMgr = self.launchDomainManager() self._domBooter = nodebooter
def setUp(self): cfg = "log4j.rootLogger=DEBUG,STDOUT,FILE\n " + \ "# Direct log messages to FILE\n" + \ "log4j.appender.STDOUT=org.apache.log4j.ConsoleAppender\n" + \ "log4j.appender.STDOUT.layout=org.apache.log4j.PatternLayout\n" + \ "log4j.appender.FILE=org.apache.log4j.FileAppender\n" + \ "log4j.appender.FILE.File=tmp_logfile.log\n" + \ "log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout\n" + \ "log4j.appender.CONSOLE.layout.ConversionPattern=%p:%c - %m [%F:%L]%n\n" + \ "log4j.appender.FILE.layout=org.apache.log4j.PatternLayout\n" + \ "log4j.appender.FILE.layout.ConversionPattern=%d %p:%c - %m [%F:%L]%n\n" fp = open('tmp_logfile.config','w') fp.write(cfg) fp.close() self.domBooter, self._domMgr = self.launchDomainManager(loggingURI=os.getcwd()+'/tmp_logfile.config') self.devBooter, self._devMgr = self.launchDeviceManager("/nodes/test_ExecutableDevice_node/DeviceManager.dcd.xml") self._app = None
def get_json(org): d = {"nodes":[],"links":[]} for i in graph: dt = {} dt["id"]=i dt["group"]=1 d["nodes"].append(dt) for i in graph: for j in graph[i]: for k in j: dt={} dt["source"]=i dt["target"]=k[1::] dt["value"]=10 d["links"].append(dt) string_json = json.dumps(d) filename = org + ".json" f = open(os.path.join(os.getcwd(), "static", filename), "w") f.write(string_json) f.close()
def __init__(self, additional_compose_file=None, additional_services=None): # To resolve docker client server version mismatch issue. os.environ["COMPOSE_API_VERSION"] = "auto" dir_name = os.path.split(os.getcwd())[-1] self.project = "{}{}".format( re.sub(r'[^a-z0-9]', '', dir_name.lower()), getpass.getuser() ) self.additional_compose_file = additional_compose_file self.services = ["zookeeper", "schematizer", "kafka"] if additional_services is not None: self.services.extend(additional_services) # This variable is meant to capture the running/not-running state of # the dependent testing containers when tests start running. The idea # is, we'll only start and stop containers if they aren't already # running. If they are running, we'll just use the ones that exist. # It takes a while to start all the containers, so when running lots of # tests, it's best to start them out-of-band and leave them up for the # duration of the session. self.containers_already_running = self._are_containers_already_running()
def archive_context(filename): """ Unzip filename to a temporary directory, set to the cwd. The unzipped target is cleaned up after. """ tmpdir = tempfile.mkdtemp() log.warn('Extracting in %s', tmpdir) old_wd = os.getcwd() try: os.chdir(tmpdir) with ContextualZipFile(filename) as archive: archive.extractall() # going in the directory subdir = os.path.join(tmpdir, os.listdir(tmpdir)[0]) os.chdir(subdir) log.warn('Now working in %s', subdir) yield finally: os.chdir(old_wd) shutil.rmtree(tmpdir)
def startRPC(self, port, eventListenerPort): logging.basicConfig(filename='worldpay-within-wrapper.log', level=logging.DEBUG) reqOS = ["darwin", "win32", "windows", "linux"] reqArch = ["x64", "ia32"] cfg = launcher.Config(reqOS, reqArch) launcherLocal = launcher.launcher() # define log file name for rpc agent, so e.g # for "runConsumerOWP.py" it will be: "rpc-wpwithin-runConsumerOWP.log" logfilename = os.path.basename(sys.argv[0]) logfilename = "rpc-wpwithin-" + logfilename.rsplit(".", 1)[0] + ".log" args = [] if eventListenerPort > 0: logging.debug(str(os.getcwd()) + "" + "-port " + str(port) + " -logfile " + logfilename + " -loglevel debug,warn,error,fatal,info" + " -callbackport " + str(eventListenerPort)) args = ['-port', str(port), '-logfile', logfilename, '-loglevel', 'debug,warn,error,fatal,info', '-callbackport', str(eventListenerPort)] else: logging.debug(str(os.getcwd()) + "" + "-port " + str(port) + " -logfile " + logfilename + " -loglevel debug,warn,error,fatal,info") args = ['-port', str(port), '-logfile', logfilename, '-loglevel', 'debug,warn,error,fatal,info'] process = launcherLocal.launch(cfg, os.getcwd() + "", args) return process
def compose(env): """Compose the configuration.""" offset = _get_utils(env) rsync = ["rsync", "-aczv", USER_FOLDER, os.path.join(offset, USER_FOLDER), "--delete-after", _get_exclude("*.pyc"), _get_exclude("README.md"), _get_exclude("__init__.py"), _get_exclude("__config__.py")] call(rsync, "rsync user definitions") here = os.getcwd() composition = ["python2.7", "config_compose.py", "--output", os.path.join(here, FILE_NAME)] call(composition, "compose configuration", working_dir=offset)
def test_RTagsClientUpdateBuffers(self): try: os.chdir("dirty") except OSError: print("Test Error: Couldn't cd into 'dirty' test directory.") raise self.assertTrue(self.cmake_build_info["build_dir"].is_dir()) self.assertTrue(self.cmake_build_info["comp_data_cmake"].is_file()) self.plugin.setup_rtags_daemon() self.plugin.connect_rtags_client() self.plugin.update_rtags_buffers( [str(src_info["test_cpp"]), str(src_info["cpp"])]) try: rtags_client_status = subprocess.check_output( self.cmake_cmd_info["rtags_buffers"]) except subprocess.CalledProcessError as e: print(e.output) filepath = os.getcwd() + str(src_info["test_cpp"]) self.assertTrue(str(rtags_client_status).find(filepath))
def load_dynamic_config(configurations, config_dir=getcwd()): """Load and parse dynamic config""" # Create full path of config config_file = '{path}/config.py'.format(path=config_dir) # Insert config path so we can import it sys.path.insert(0, path.dirname(path.abspath(config_file))) try: config_module = __import__('config') for key, value in config_module.CONFIG.items(): LOG.debug('Importing %s with key %s', key, value) # Update configparser object configurations.update({key: value}) except ImportError: # Provide a default if config not found configurations = {}
def archive_context(filename): # extracting the archive tmpdir = tempfile.mkdtemp() log.warn('Extracting in %s', tmpdir) old_wd = os.getcwd() try: os.chdir(tmpdir) with ContextualZipFile(filename) as archive: archive.extractall() # going in the directory subdir = os.path.join(tmpdir, os.listdir(tmpdir)[0]) os.chdir(subdir) log.warn('Now working in %s', subdir) yield finally: os.chdir(old_wd) shutil.rmtree(tmpdir)
def pquery(command, stdin=None, **kwargs): if very_verbose: info('Query "'+' '.join(command)+'" in '+getcwd()) try: proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs) except OSError as e: if e[0] == errno.ENOENT: error( "Could not execute \"%s\".\n" "Please verify that it's installed and accessible from your current path by executing \"%s\".\n" % (command[0], command[0]), e[0]) else: raise e stdout, _ = proc.communicate(stdin) if very_verbose: log(str(stdout).strip()+"\n") if proc.returncode != 0: raise ProcessException(proc.returncode, command[0], ' '.join(command), getcwd()) return stdout
def seturl(url): info("Setting url to \"%s\" in %s" % (url, getcwd())) hgrc = os.path.join('.hg', 'hgrc') tagpaths = '[paths]' remote = 'default' lines = [] try: with open(hgrc) as f: lines = f.read().splitlines() except IOError: pass if tagpaths in lines: idx = lines.index(tagpaths) m = re.match(r'^([\w_]+)\s*=\s*(.*)$', lines[idx+1]) if m: remote = m.group(1) del lines[idx+1] lines.insert(idx, remote+' = '+url) else: lines.append(tagpaths) lines.append(remote+' = '+url)
def unignore(dest): Hg.ignore_file = os.path.join('.hg', 'hgignore') try: with open(Hg.ignore_file) as f: lines = f.read().splitlines() except IOError: lines = [] if dest in lines: lines.remove(dest) try: with open(Hg.ignore_file, 'w') as f: f.write('\n'.join(lines) + '\n') except IOError: error("Unable to write ignore file in \"%s\"" % os.path.join(getcwd(), Hg.ignore_file), 1) # pylint: disable=no-self-argument, no-method-argument, no-member, no-self-use, unused-argument
def checkout(rev, clean=False): if not rev: return info("Checkout \"%s\" in %s" % (rev, os.path.basename(getcwd()))) branch = None refs = Git.getbranches(rev) for ref in refs: # re-associate with a local or remote branch (rev is the same) m = re.match(r'^(.*?)\/(.*?)$', ref) if m and m.group(2) != "HEAD": # matches origin/<branch> and isn't HEAD ref if not os.path.exists(os.path.join('.git', 'refs', 'heads', m.group(2))): # okay only if local branch with that name doesn't exist (git will checkout the origin/<branch> in that case) branch = m.group(2) elif ref != "HEAD": branch = ref # matches local branch and isn't HEAD ref if branch: info("Revision \"%s\" matches a branch \"%s\" reference. Re-associating with branch" % (rev, branch)) popen([git_cmd, 'checkout', branch] + ([] if very_verbose else ['-q'])) break if not branch: popen([git_cmd, 'checkout', rev] + (['-f'] if clean else []) + ([] if very_verbose else ['-q']))
def update(rev=None, clean=False, clean_files=False, is_local=False): if not is_local: Git.fetch() if clean: Git.discard(clean_files) if rev: Git.checkout(rev, clean) else: remote = Git.getremote() branch = Git.getbranch() if remote and branch: try: Git.merge('%s/%s' % (remote, branch)) except ProcessException: pass else: err = "Unable to update \"%s\" in \"%s\"." % (os.path.basename(getcwd()), getcwd()) if not remote: info(err+" The local repository is not associated with a remote one.") if not branch: info(err+" Working set is not on a branch.")
def ignore(dest): try: with open(Git.ignore_file) as f: exists = dest in f.read().splitlines() except IOError: exists = False if not exists: try: ignore_file_parent_directory = os.path.dirname(Git.ignore_file) if not os.path.exists(ignore_file_parent_directory): os.mkdir(ignore_file_parent_directory) with open(Git.ignore_file, 'a') as f: f.write(dest.replace("\\", "/") + '\n') except IOError: error("Unable to write ignore file in \"%s\"" % os.path.join(getcwd(), Git.ignore_file), 1)
def fromrepo(cls, path=None): repo = cls() if path is None: path = Repo.findparent(getcwd()) if path is None: error( "Could not find mbed program in current path \"%s\".\n" "You can fix this by calling \"mbed new .\" or \"mbed config root .\" in the root of your program." % getcwd()) repo.path = os.path.abspath(path) repo.name = os.path.basename(repo.path) cache_cfg = Global().get_cfg('CACHE', '') if cache_repositories and cache_cfg and cache_cfg != 'none' and cache_cfg != 'off' and cache_cfg != 'disabled': loc = cache_cfg if (cache_cfg and cache_cfg != 'on' and cache_cfg != 'enabled') else None repo.cache = loc or os.path.join(tempfile.gettempdir(), 'mbed-repo-cache') repo.sync() if repo.scm is None: warning( "Program \"%s\" in \"%s\" does not use source control management.\n" "To fix this you should use \"mbed new .\" in the root of your program." % (repo.name, repo.path)) return repo
def pathtype(cls, path=None): path = os.path.abspath(path or getcwd()) depth = 0 while cd(path): tpath = path path = Repo.findparent(path) if path: depth += 1 path = os.path.split(path)[0] if tpath == path: # Reached root. break else: break return "directory" if depth == 0 else ("program" if depth == 1 else "library")
def __init__(self, path=None, print_warning=False): path = os.path.abspath(path or getcwd()) self.path = path self.is_cwd = True while cd(path): tpath = path if os.path.isfile(os.path.join(path, Cfg.file)): self.path = path self.is_cwd = False break path = os.path.split(path)[0] if tpath == path: # Reached root. break self.name = os.path.basename(self.path) self.is_classic = os.path.isfile(os.path.join(self.path, 'mbed.bld')) # is_cwd flag indicates that current dir is assumed to be root, not root repo if self.is_cwd and print_warning: warning( "Could not find mbed program in current path \"%s\".\n" "You can fix this by calling \"mbed new .\" in the root of your program." % self.path)
def main(): # Add examples to path. parent_dir = os.path.abspath(os.path.join(os.getcwd(), os.pardir)) sys.path.insert(0, parent_dir) # Grab all example files. example_dir = os.path.join(os.getcwd(), "..", "examples") example_files = [f for f in os.listdir(example_dir) if os.path.isfile(os.path.join(example_dir, f)) and "py" == f.split(".")[-1] and "init" not in f and "viz_exam" not in f] print("\n" + "="*32) print("== Running", len(example_files), "simple_rl tests ==") print("="*32 + "\n") total_passed = 0 for i, ex in enumerate(example_files): print("\t [Test", str(i + 1) + "] ", ex + ": ",) result = run_example(os.path.join(example_dir, ex)) if result: total_passed += 1 print("\t\tPASS.") else: print("\t\tFAIL.") print("\nResults:", total_passed, "/", len(example_files), "passed.")
def create(ctx, maintype, subtype, app_name, factory, args): args = args or [] maintype = maintype or ctx.obj['SETTINGS'].get( 'default_maintype', 'python') subtype = subtype or ctx.obj['SETTINGS'].get( 'default_subtype', 'app') click.echo('Type: {};\t Subtype: {};\t App name: {};'.format( maintype, subtype, app_name)) current_dir = os.getcwd() additional_dirs = ctx.obj['SETTINGS'].get('templates', []) factory_module, path = get_factory(maintype, factory, additional_dirs) if not factory_module: click.echo('ERROR: factory not found:{}'.format(maintype), err=True) exit(1) app_factory = factory_module.AppFactory(path, args) app_factory.setup(subtype, app_name, current_dir) app_factory.set_context(ctx.obj['SETTINGS'].get('context', {})) app_factory.run() click.echo('Done!')
def chdir(directory): """Change the current working directory to a different directory for a code block and return the previous directory after the block exits. Useful to run commands from a specificed directory. :param str directory: The directory path to change to for this context. """ cur = os.getcwd() try: yield os.chdir(directory) finally: os.chdir(cur)
def _add_services(self, this_service, other_services): """Add services. Add services to the deployment where this_service is the local charm that we're testing and other_services are the other services that are being used in the local amulet tests. """ if this_service['name'] != os.path.basename(os.getcwd()): s = this_service['name'] msg = "The charm's root directory name needs to be {}".format(s) amulet.raise_status(amulet.FAIL, msg=msg) if 'units' not in this_service: this_service['units'] = 1 self.d.add(this_service['name'], units=this_service['units'], constraints=this_service.get('constraints')) for svc in other_services: if 'location' in svc: branch_location = svc['location'] elif self.series: branch_location = 'cs:{}/{}'.format(self.series, svc['name']), else: branch_location = None if 'units' not in svc: svc['units'] = 1 self.d.add(svc['name'], charm=branch_location, units=svc['units'], constraints=svc.get('constraints'))
def show_io(input_dir, output_dir): ''' show directory structure and inputs and autputs to scoring program''' swrite('\n=== DIRECTORIES ===\n\n') # Show this directory swrite("-- Current directory " + pwd() + ":\n") write_list(ls('.')) write_list(ls('./*')) write_list(ls('./*/*')) swrite("\n") # List input and output directories swrite("-- Input directory " + input_dir + ":\n") write_list(ls(input_dir)) write_list(ls(input_dir + '/*')) write_list(ls(input_dir + '/*/*')) write_list(ls(input_dir + '/*/*/*')) swrite("\n") swrite("-- Output directory " + output_dir + ":\n") write_list(ls(output_dir)) write_list(ls(output_dir + '/*')) swrite("\n") # write meta data to sdterr swrite('\n=== METADATA ===\n\n') swrite("-- Current directory " + pwd() + ":\n") try: metadata = yaml.load(open('metadata', 'r')) for key,value in metadata.items(): swrite(key + ': ') swrite(str(value) + '\n') except: swrite("none\n"); swrite("-- Input directory " + input_dir + ":\n") try: metadata = yaml.load(open(os.path.join(input_dir, 'metadata'), 'r')) for key,value in metadata.items(): swrite(key + ': ') swrite(str(value) + '\n') swrite("\n") except: swrite("none\n");