We extracted the following 47 code examples from open source Python projects to illustrate how to use sh.ErrorReturnCode().
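All of the examples share the same basic shape: run a command through sh and catch sh.ErrorReturnCode when it exits with a non-zero status. As a minimal sketch of that pattern (the choice of sh.false here is arbitrary; any command that fails will do):

import sh

try:
    sh.false()  # `false` always exits with a non-zero status
except sh.ErrorReturnCode as e:
    # sh attaches the failed command line, the captured output and the
    # exit status to the exception object
    print(e.full_cmd)   # the command that was run
    print(e.exit_code)  # its non-zero exit code
    print(e.stderr)     # captured stderr, as bytes

Note that sh actually raises an exit-code-specific subclass such as sh.ErrorReturnCode_1; catching sh.ErrorReturnCode catches all of them, which is why some examples below list both a specific subclass and the base class.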
def _has_commit(version, debug=False):
    """
    Determine whether a version is a local git commit sha or not.

    :param version: A string containing the branch/tag/sha to be determined.
    :param debug: An optional bool to toggle debug output.
    :return: bool
    """
    if _has_tag(version, debug) or _has_branch(version, debug):
        return False
    cmd = sh.git.bake('cat-file', '-e', version)
    try:
        util.run_command(cmd, debug=debug)
        return True
    except sh.ErrorReturnCode:
        return False
def delete_path(self, path_or_fn, cascade=True):
    """Simple kubectl delete wrapper.

    :param path_or_fn: Path or filename of yaml resource descriptions
    """
    LOG.info('(-) kubectl delete -f %s', path_or_fn)
    try:
        self.kubectl.delete(
            '-f', path_or_fn,
            '--namespace={}'.format(self.config.namespace),
            '--context={}'.format(self.config.context),
            '--cascade={}'.format('true' if cascade else 'false'))
    except sh.ErrorReturnCode:
        return False
    return True
def update():
    for organization in get_organization_list():
        if not os.path.exists(os.path.join(get_env_path(organization),
                                           '.git')):
            info("Skip %s" % organization)
            continue
        info("Updating %s ..." % organization)
        os.chdir(get_env_path(organization))
        if not git('remote').strip():
            info("No remotes are set for this organization, skipping")
            continue
        try:
            vgit('pull')
        except ErrorReturnCode:
            fatal("Unable to update the organizations.")
        run_galaxy_install(organization)
    success("All the organizations have been updated.")
def _create_archive(self, archive_path, export_dir):
    """
    Create the final archive of all the exported project files.

    Args:
        archive_path: Path to the ``tar.xz`` archive.
        export_dir: Path to the directory containing the files to export.
    """
    try:
        logger.info('Creating archive %s', archive_path)
        create_archive = tar.bake(create=True, xz=True, verbose=True,
                                  file=archive_path, directory=export_dir,
                                  _fg=True, _out=sys.stdout, _err=sys.stderr)
        create_archive(self._project_name)
    except ErrorReturnCode as e:
        raise CommandError('Failed to archive project - %s' % e)
def _gen_html(self, lcov_info_path):
    """
    Generate an LCOV HTML report.

    Returns the directory containing the HTML report.
    """
    from sh import genhtml, ErrorReturnCode

    lcov_html_dir = self.project_path('s2e-last', 'lcov')
    try:
        genhtml(lcov_info_path, output_directory=lcov_html_dir,
                _out=sys.stdout, _err=sys.stderr, _fg=True)
    except ErrorReturnCode as e:
        raise CommandError(e)

    return lcov_html_dir
def _update_s2e_sources(self):
    """
    Update all of the S2E repositories with repo.
    """
    repo = sh.Command(self.install_path('bin', 'repo'))

    # cd into the S2E source directory
    orig_dir = os.getcwd()
    os.chdir(self.source_path('s2e'))

    try:
        logger.info('Updating S2E')
        repo.sync(_out=sys.stdout, _err=sys.stderr, _fg=True)
    except ErrorReturnCode as e:
        raise CommandError(e)
    finally:
        # Change back to the original directory
        os.chdir(orig_dir)

    # Success!
    logger.success('Updated S2E')
def test_readme(cookies):
    """The generated README.rst file should pass some sanity checks and
    validate as a PyPI long description."""
    extra_context = {'repo_name': 'helloworld'}
    with bake_in_temp_dir(cookies, extra_context=extra_context) as result:
        readme_file = result.project.join('README.rst')
        readme_lines = [x.strip() for x in readme_file.readlines(cr=False)]
        assert 'helloworld' in readme_lines
        assert 'The full documentation is at https://helloworld.readthedocs.org.' in readme_lines
        setup_path = str(result.project.join('setup.py'))
        try:
            sh.python(setup_path, 'check', restructuredtext=True, strict=True)
        except sh.ErrorReturnCode as exc:
            pytest.fail(str(exc))
def check_quality(result):
    """Run quality tests on the given generated output."""
    for dirpath, _dirnames, filenames in os.walk(str(result.project)):
        pylintrc = str(result.project.join('pylintrc'))
        for filename in filenames:
            name = os.path.join(dirpath, filename)
            if not name.endswith('.py'):
                continue
            try:
                sh.pylint(name, rcfile=pylintrc)
                sh.pylint(name, py3k=True)
                sh.pycodestyle(name)
                if filename != 'setup.py':
                    sh.pydocstyle(name)
                sh.isort(name, check_only=True)
            except sh.ErrorReturnCode as exc:
                pytest.fail(str(exc))

    tox_ini = result.project.join('tox.ini')
    docs_build_dir = result.project.join('docs/_build')
    try:
        # Sanity check the generated Makefile
        sh.make('help')
        # quality check docs
        sh.doc8(result.project.join("README.rst"), ignore_path=docs_build_dir,
                config=tox_ini)
        sh.doc8(result.project.join("docs"), ignore_path=docs_build_dir,
                config=tox_ini)
    except sh.ErrorReturnCode as exc:
        pytest.fail(str(exc))
def test_custom_yaml():
    from sh import ErrorReturnCode, chmod, ldap2pg, rm

    LDAP2PG_CONFIG = 'my-test-ldap2pg.yml'
    rm('-f', LDAP2PG_CONFIG)
    with pytest.raises(ErrorReturnCode):
        ldap2pg(_env=dict(os.environ, LDAP2PG_CONFIG=LDAP2PG_CONFIG))

    yaml = YAML_FMT % os.environ
    with open(LDAP2PG_CONFIG, 'w') as fo:
        fo.write(yaml)

    # Purge from the env the values set in the file. Others are read from
    # ldaprc.
    blacklist = ('LDAPURI', 'LDAPHOST', 'LDAPPORT', 'LDAPPASSWORD')
    ldapfree_env = dict(
        (k, v) for k, v in os.environ.items()
        if k not in blacklist
    )

    # Ensure a world-readable password is denied
    with pytest.raises(ErrorReturnCode):
        ldap2pg(config=LDAP2PG_CONFIG, _env=ldapfree_env)

    # ...and that fixing the file mode does the trick.
    chmod('0600', LDAP2PG_CONFIG)
    ldap2pg('--config', LDAP2PG_CONFIG, _env=ldapfree_env)
def _has_tag(version, debug=False):
    """
    Determine whether a version is a local git tag name or not.

    :param version: A string containing the branch/tag/sha to be determined.
    :param debug: An optional bool to toggle debug output.
    :return: bool
    """
    cmd = sh.git.bake('show-ref', '--verify', '--quiet',
                      "refs/tags/{}".format(version))
    try:
        util.run_command(cmd, debug=debug)
        return True
    except sh.ErrorReturnCode:
        return False
def _has_branch(version, debug=False):
    """
    Determine whether a version is a local git branch name or not.

    :param version: A string containing the branch/tag/sha to be determined.
    :param debug: An optional bool to toggle debug output.
    :return: bool
    """
    cmd = sh.git.bake('show-ref', '--verify', '--quiet',
                      "refs/heads/{}".format(version))
    try:
        util.run_command(cmd, debug=debug)
        return True
    except sh.ErrorReturnCode:
        return False
def test_flake8_compliance(cookies):
    """generated project should pass flake8"""
    result = cookies.bake()
    try:
        sh.flake8(str(result.project))
    except sh.ErrorReturnCode as e:
        pytest.fail(str(e))
def _wait_running_commands(self):
    for search_index, running_command in self.running_commands:
        training_label = self.training_label(search_index)
        logging('Waiting {} to finish..'.format(training_label))
        try:
            running_command.wait()
        except sh.ErrorReturnCode as e:
            logging('{} returned a non-zero code!'.format(training_label))
        except:
            traceback.print_exc(file=sys.stderr)
def _find_java_home(self):
    if self._java_home:
        return self._java_home

    # First check if the environment variable is set.
    java_home = os.environ.get("JAVA_HOME", None)
    if java_home:
        return java_home

    # On OS X, there's a magical command that gives you $JAVA_HOME
    if sys.platform == "darwin":
        try:
            cmd = sh.Command("/usr/libexec/java_home")
            return cmd().strip()
        except sh.ErrorReturnCode:
            pass

    # If only one Java is installed in the default Linux JVM folder, use
    # that
    if sys.platform in {"linux", "linux2"}:
        if os.path.isdir(self.DEFAULT_LINUX_JVM):
            javas = os.listdir(self.DEFAULT_LINUX_JVM)
            if len(javas) == 1:
                # os.listdir returns bare names, so join with the JVM folder
                # to get a usable path
                return os.path.join(self.DEFAULT_LINUX_JVM, javas[0])

    # Give up
    return None
def describe(self, name):
    """Return a yaml-ish text blob. Not helpful for automation, very
    helpful for humans.
    """
    try:
        return self.kubectl.describe(
            self.url_type, name,
            '--context={}'.format(self.config.context),
            '--namespace={}'.format(self.config.namespace))
    except sh.ErrorReturnCode as err:
        logging.error("Unexpected response: %s", err)
def delete(self, name):
    """Delete the named resource.

    TODO: should be easy to rewrite this as a kube api delete call
    instead of going through kubectl.
    """
    try:
        self.kubectl.delete(
            self.url_type, name,
            '--context={}'.format(self.config.context),
            '--namespace={}'.format(self.config.namespace))
    except sh.ErrorReturnCode as err:
        logging.error("Unexpected response: %s", err)
def pip_install(ctx, *specifiers):
    # type: (click.Context, str) -> None
    try:
        result = sh.pip.install(*specifiers, _err_to_out=True, _iter=True)
        for line in result:
            click.echo(line, nl=False)
    except sh.ErrorReturnCode:
        ctx.abort()
def test_flake8_compliance(cookies):
    """generated project should pass flake8"""
    result = cookies.bake()
    # try:
    #     sh.flake8(str(result.project))
    # except sh.ErrorReturnCode as e:
    #     print(e)
    #     pytest.fail(str(e))
def _get_code(self, nmpi_job, job_desc):
    """
    Obtain the code and place it in the working directory.

    If the experiment description is the URL of a Git repository, try to
    clone it. If it is the URL of a zip or .tar.gz archive, download and
    unpack it. Otherwise, the content of "code" is the code: write it to
    a file.
    """
    url_candidate = urlparse(nmpi_job['code'])
    logger.debug("Get code: %s %s", url_candidate.netloc, url_candidate.path)
    if url_candidate.scheme and url_candidate.path.endswith((".tar.gz", ".zip", ".tgz")):
        self._create_working_directory(job_desc.working_directory)
        target = os.path.join(job_desc.working_directory,
                              os.path.basename(url_candidate.path))
        # urlretrieve(nmpi_job['code'], target)  # not working via KIP https proxy
        curl(nmpi_job['code'], '-o', target)
        logger.info("Retrieved file from {} to local target {}".format(
            nmpi_job['code'], target))
        if url_candidate.path.endswith((".tar.gz", ".tgz")):
            tar("xzf", target, directory=job_desc.working_directory)
        elif url_candidate.path.endswith(".zip"):
            try:
                # -o for auto-overwrite
                unzip('-o', target, d=job_desc.working_directory)
            except:
                logger.error("Could not unzip file {}".format(target))
    else:
        try:
            # Check the "code" field for a git url (clone it into the
            # workdir) or a script (create a file in the workdir)
            # URL: use git clone
            git.clone('--recursive', nmpi_job['code'],
                      job_desc.working_directory)
            logger.info("Cloned repository {}".format(nmpi_job['code']))
        except (sh.ErrorReturnCode_128, sh.ErrorReturnCode):
            # SCRIPT: create file (in the current directory)
            logger.info("The code field appears to be a script.")
            self._create_working_directory(job_desc.working_directory)
            with codecs.open(job_desc.arguments[0], 'w',
                             encoding='utf8') as job_main_script:
                job_main_script.write(nmpi_job['code'])
def run(self, resume=1):
    """Execute ansible-playbook using information gathered from config.

    Args:
        resume (int): Used as list index - 1 from which to resume workflow.
    """
    # list index to start working on (for support of --resume)
    try:
        i = int(resume) - 1
    except ValueError:
        # generally if passed a non-int
        i = 0

    cmds = self._config.playbook_cmds
    kwargs = {
        '_out': self._print_stdout,
        '_err': self._print_stderr,
        '_env': self._config.env
    }

    for counter, cmd in enumerate(cmds):
        # skip execution until we reach our --resume index
        # using a list slice doesn't work here since we need to be aware of
        # the full list to produce a resume index on failure
        if counter < i:
            continue

        try:
            sh.ansible_playbook(*cmd, **kwargs)
        except (sh.ErrorReturnCode, sh.ErrorReturnCode_1):
            msg = ('An error was encountered during playbook execution. '
                   'Please resolve manually and then use the following '
                   'command to resume execution of this script:\n\n')
            cmd = self._construct_resume_cli(counter + 1)
            print(colorama.Fore.RED + msg + cmd)
            sys.exit(1)
def check_project_result(result):
    """Run common verification on a baked project."""
    assert result.exit_code == 0
    assert result.exception is None
    assert result.project.isdir()

    # Check project with flake8
    try:
        sh.flake8(str(result.project))
    except sh.ErrorReturnCode as e:
        pytest.fail(str(e))
def update():
    for inventory in os.listdir(inventory_path):
        if not os.path.exists(os.path.join(inventory_path, inventory,
                                           '.git')):
            info("Skip %s" % inventory)
            continue
        info("Updating %s ..." % inventory)
        os.chdir(os.path.join(inventory_path, inventory))
        try:
            vgit('pull')
        except ErrorReturnCode:
            fatal("Unable to update the inventories.")
    success("All the inventories have been updated.")
def update_inventory(name, dest_path):
    if not os.path.exists(os.path.join(dest_path, '.git')):
        warning("The %s inventory is not a git repository and has not "
                "been updated." % name)
        return

    click.echo('We will update the %s inventory' % name)
    os.chdir(dest_path)
    try:
        vgit('pull')
    except ErrorReturnCode:
        fatal("Unable to update the inventory %s." % name)
    success("The %s inventory has been updated." % name)
def install(name, path):
    """
    Install inventories.
    """
    if not name:
        update()
        return

    if not name.isalnum():
        fatal("Your inventory name should only contain alphanumeric "
              "characters.")

    dest_path = os.path.join(inventory_path, name)
    if os.path.exists(dest_path):
        update_inventory(name, dest_path)
        return

    if not path:
        fatal("You must specify a path to a local directory or a URL to a "
              "git repository to install a new inventory.")

    if os.path.exists(path) and os.path.isdir(path):
        if not os.path.exists(os.path.dirname(dest_path)):
            os.mkdir(os.path.dirname(dest_path))
        os.symlink(path, dest_path)
    else:
        click.echo('We will clone %s in %s\n' % (path, dest_path))
        try:
            vgit('clone', path, dest_path)
        except ErrorReturnCode:
            fatal("Unable to install the inventory %s." % name)

    success("The %s inventory has been installed." % name)
def update_organization(name, dest_path):
    click.echo('We will update the %s organization' % name)
    os.chdir(dest_path)
    if os.path.exists(os.path.join(dest_path, '.git')):
        try:
            vgit('pull')
        except ErrorReturnCode:
            fatal("Unable to update the organization %s." % name)
        success("The %s organization has been updated." % name)
    run_galaxy_install(name)
def _get_git_root():
    """
    Retrieve the git directory, or prompt to create one if not found
    """
    git_root = None
    try:
        git_root = str(git('rev-parse', '--show-toplevel')).strip()
    except ErrorReturnCode as e:
        if e.exit_code != 128:
            fatal(e.message, e.exit_code)

    if not git_root:
        warning('You must be in a git repository directory to '
                'initialize a new project.')
        if not click.confirm('Do you want to create a new git '
                             'repository here?', default=True):
            fatal('Please run %s' % style('git init', bold=True))
        try:
            vgit('init')
            git_root = os.getcwd()
        except ErrorReturnCode as e:
            fatal('An error occurred when trying to initialize a '
                  'new repo.', e.exit_code)

    if git_root == aeriscloud_path:
        fatal('You cannot init AerisCloud from the AerisCloud directory!')

    return git_root
def _run_nosetests():
    click.echo('Running unit tests ... ', nl=False)
    nose_bin = os.path.join(aeriscloud_path, 'venv/bin/nosetests')
    errors = 0
    try:
        python(nose_bin, '-v', '--with-id', module_path(),
               _err_to_out=True, _ok_code=[0])
        click.echo('[%s]' % click.style('OK', fg='green'))
    except ErrorReturnCode as e:
        click.echo('[%s]\n' % click.style('FAIL', fg='red'))
        for line in e.stdout.split('\n')[:-2]:
            if line.startswith('#'):
                print(line)
                (id, name, test_file, ellipsis, res) = \
                    line.rstrip().split(' ')
                if res == 'ok':
                    res = click.style(res, fg='green', bold=True)
                elif res == 'FAIL':
                    res = click.style(res, fg='red', bold=True)
                line = ' '.join([
                    click.style(id, bold=True, fg='yellow'),
                    click.style(name, fg='blue'),
                    test_file,
                    ellipsis,
                    res
                ])
            elif line.startswith('FAIL:'):
                (_, name, test_file) = line.split(' ')
                line = ' '.join([
                    click.style('FAIL', bold=True, fg='red') + ':',
                    click.style(name, fg='blue'),
                    test_file
                ])
            click.echo(' ' + line)
            errors += 1
    return errors
def _retry(func, *args, **kwargs):
    for _ in range(300):
        try:
            func(*args, **kwargs)
            break
        except sh.ErrorReturnCode:
            sleep(0.1)
    else:
        raise
def install_docker(version=None, container_id=None):
    container_id = container_id or work.last_container_id
    try:
        quiet_docker('exec', container_id, *'which docker'.split(' '))
        logger.info('Docker already installed on container. Doing nothing')
        return
    except sh.ErrorReturnCode:
        pass
    cp(resources.DIR / 'docker.repo', ':/etc/yum.repos.d/docker.repo',
       container_id=container_id)
    version = version or _get_docker_version()
    install_docker_command = 'yum install -y -q docker-engine-{}'.format(
        version)
    docker('exec', container_id, *install_docker_command.split(' '))
def _get_docker_version():
    try:
        version = quiet_docker.version('-f', '{{.Client.Version}}').strip()
    except sh.ErrorReturnCode as e:
        version = e.stdout.strip()

    # Replacing the -ce in the version with .ce, as the versions in
    # https://yum.dockerproject.org/repo/main/centos/7/Packages/
    # adhere to this notation
    if version.endswith('-ce'):
        version = version.replace('-ce', '.ce')

    return version
def _ssh_setup(container_id, container_ip):
    logger.info('Applying ssh configuration to manager container')
    try:
        known_hosts = path('~/.ssh/known_hosts').expanduser()
        # Known hosts file may not exist
        ssh_keygen('-R', container_ip)
        fingerprint = None
        while not fingerprint:
            fingerprint = ssh_keyscan(
                container_ip).stdout.split('\n')[0].strip()
            time.sleep(0.01)
        if fingerprint and known_hosts.exists():
            current = known_hosts.text()
            prefix = ''
            if not current.endswith('\n'):
                prefix = '\n'
            known_hosts.write_text(
                '{}{}\n'.format(prefix, fingerprint), append=True)
    except sh.ErrorReturnCode:
        pass
    quiet_docker('exec', container_id, 'mkdir', '-p', '/root/.ssh')
    ssh_public_key = ssh_keygen('-y', '-f', configuration.ssh_key_path).strip()
    with tempfile.NamedTemporaryFile() as f:
        f.write(ssh_public_key)
        f.flush()
        quiet_docker.cp(f.name, '{}:/root/.ssh/authorized_keys'.format(
            container_id))
    # due to a bug in docker 17.06, the file keeps ownership and is not
    # chowned to the main container user automatically
    quiet_docker('exec', container_id, 'chown', 'root:root',
                 '/root/.ssh/authorized_keys')
def _get_qstat_job_state(self):
    try:
        self.logger.debug('getting qstat infos')
        ssh_output = self.ssh_host(self.cfg.path_qstat, self.job_id)
        self.logger.debug('qstat output:\n{}'.format(ssh_output))
        if ssh_output != '':
            # slice off the header and the last line, which is empty
            jobs_displayed = ssh_output.split('\n')[2:-1]
            for job in jobs_displayed:
                # remove whitespace
                job_info = ' '.join(job.split())
                # split into fields
                (self.job_id, job_name, job_user, job_time,
                 job_status, job_queue) = job_info.split(' ')
                self.logger.debug(
                    'job {} has status {}'.format(self.job_id, job_status))
            return job_status
        else:
            return None
    except ErrorReturnCode as e:
        self.logger.error('\nError in ssh call:\n{}'.format(e))
        print(e.stderr)
        exit(1)
def main(argv=None):
    argv = argv or sys.argv[1:]
    if {'-h', '-help', '--help'}.intersection(argv):
        sh.ffmpeg(help=True, _fg=True)
        return 0

    notifier = ProgressNotifier()

    try:
        sh.ffmpeg(
            argv,
            _in=queue.Queue(),
            _err=notifier,
            _out_bufsize=0,
            _err_bufsize=0,
            # _in_bufsize=0,
            _no_out=True,
            _no_pipe=True,
            _tty_in=True,
            # _fg=True,
            # _bg=True,
        )
    except sh.ErrorReturnCode as err:
        print(notifier.lines[-1])
        return err.exit_code
    else:
        print()
        return 0
def run(self):
    ''' run command '''
    if self.roles is not None:
        print("Roles:\n{0}".format(
            yaml.dump(self.roles, default_flow_style=False)))
    if self.excludes is not None:
        print("Excludes:\n{0}".format(
            yaml.dump(self.excludes, default_flow_style=False)))

    starting_dir = '.'
    if self.roles is not None:
        starting_dir = 'roles'

    molecule_dirs = find_dirs(starting_dir, self.excludes, self.roles,
                              'molecule')
    print("Found:\n{0}".format(
        yaml.dump(molecule_dirs, default_flow_style=False)))

    base_dir = os.getcwd()
    errors = ""
    warnings = ""
    for role in molecule_dirs:
        role = os.path.dirname(role)
        print("Testing: {0}".format(role))
        os.chdir(role)
        try:
            print(sh.molecule.test())
        except ErrorReturnCode as e:
            print(e.stdout)
            errors += e.stdout
        os.chdir(base_dir)

    if len(warnings) > 0:
        print("Warnings:\n{0}\n".format(warnings))
    if len(errors) > 0:
        print("Errors:\n{0}\n".format(errors))
        sys.exit(1)
def git_clone(git_repo_url, git_repo_dir):
    try:
        logger.info('Fetching from %s to %s', git_repo_url, git_repo_dir)
        git.clone(git_repo_url, git_repo_dir, _out=sys.stdout,
                  _err=sys.stderr, _fg=True)
    except ErrorReturnCode as e:
        raise CommandError(e)
def _decompress(path):
    """
    Decompress a .tar.xz file at the given path. The decompressed data will
    be located in the same directory as ``path``.
    """
    logger.info('Decompressing %s', path)
    try:
        tar(extract=True, xz=True, verbose=True, file=path,
            directory=os.path.dirname(path), _fg=True, _out=sys.stdout,
            _err=sys.stderr)
    except ErrorReturnCode as e:
        raise CommandError(e)
def _install_dependencies():
    """
    Install S2E's dependencies.

    Only apt-get is supported for now.
    """
    logger.info('Installing S2E dependencies')

    ubuntu_ver = _get_ubuntu_version()
    if not ubuntu_ver:
        return

    install_packages = CONSTANTS['dependencies']['common'] + \
        CONSTANTS['dependencies']['ubuntu_%d' % ubuntu_ver] + \
        CONSTANTS['dependencies']['ida']

    try:
        # Enable 32-bit libraries
        dpkg_add_arch = sudo.bake('dpkg', add_architecture=True, _fg=True)
        dpkg_add_arch('i386')

        # Perform apt-get install
        apt_get = sudo.bake('apt-get', _fg=True)
        apt_get.update()
        apt_get.install(install_packages)
    except ErrorReturnCode as e:
        raise CommandError(e)
def _get_s2e_sources(env_path):
    """
    Download the S2E manifest repository and initialize all of the S2E
    repositories with repo.
    """
    # Download repo
    repo = _get_repo(env_path)

    s2e_source_path = os.path.join(env_path, 'source', 's2e')

    # Create the S2E source directory and cd to it to run repo
    os.mkdir(s2e_source_path)
    orig_dir = os.getcwd()
    os.chdir(s2e_source_path)

    git_url = CONSTANTS['repos']['url']
    git_s2e_repo = CONSTANTS['repos']['s2e']

    try:
        # Now use repo to initialize all the repositories
        logger.info('Fetching %s from %s', git_s2e_repo, git_url)
        repo.init(u='%s/%s' % (git_url, git_s2e_repo), _out=sys.stdout,
                  _err=sys.stderr, _fg=True)
        repo.sync(_out=sys.stdout, _err=sys.stderr, _fg=True)
    except ErrorReturnCode as e:
        # Clean up - remove the half-created S2E environment
        shutil.rmtree(env_path)
        raise CommandError(e)
    finally:
        # Change back to the original directory
        os.chdir(orig_dir)

    # Success!
    logger.success('Fetched %s', git_s2e_repo)
def _get_project_name(archive):
    """
    Get the project name from the archive.

    The project name is the name of the root directory in the archive.
    """
    try:
        contents = tar(exclude='*/*', list=True, file=archive)
        return os.path.dirname(str(contents))
    except ErrorReturnCode as e:
        raise CommandError('Failed to list archive - %s' % e)
def handle(self, *args, **options):
    # Exit if the makefile doesn't exist
    makefile = self.env_path('source', 's2e', 'Makefile')
    if not os.path.isfile(makefile):
        raise CommandError('No makefile found in %s' %
                           os.path.dirname(makefile))

    # If the build directory doesn't exist, create it
    build_dir = self.env_path('build', 's2e')
    if not os.path.isdir(build_dir):
        os.mkdir(build_dir)

    # Set up some environment variables
    env_vars = os.environ.copy()
    env_vars['S2EPREFIX'] = self.install_path()

    components = options['components']
    self._make = sh.Command('make').bake(directory=build_dir,
                                         file=makefile, _env=env_vars)

    # If the user has specified any components to rebuild, do this before
    # the build
    if components:
        self._rebuild_components(components)

    try:
        # Run make
        if options['debug']:
            logger.info('Building S2E (debug) in %s', build_dir)
            self._make('all-debug', _out=sys.stdout, _err=sys.stderr,
                       _fg=True)
        else:
            logger.info('Building S2E (release) in %s', build_dir)
            self._make('install', _out=sys.stdout, _err=sys.stderr,
                       _fg=True)
    except ErrorReturnCode as e:
        raise CommandError(e)

    return 'S2E built'
def _invoke_make(self, img_build_dir, rule_names, num_cores, iso_dir=''):
    env = os.environ.copy()
    env['S2E_INSTALL_ROOT'] = self.install_path()
    env['S2E_LINUX_KERNELS_ROOT'] = \
        self.source_path(CONSTANTS['repos']['images']['linux'])
    env['OUTDIR'] = self.image_path()
    if iso_dir:
        env['ISODIR'] = iso_dir

    logger.debug('Invoking makefile with:')
    logger.debug('export S2E_INSTALL_ROOT=%s', env['S2E_INSTALL_ROOT'])
    logger.debug('export S2E_LINUX_KERNELS_ROOT=%s',
                 env['S2E_LINUX_KERNELS_ROOT'])
    logger.debug('export OUTDIR=%s', env['OUTDIR'])
    logger.debug('export ISODIR=%s', env.get('ISODIR', ''))

    if not self._headless:
        env['GRAPHICS'] = ''
    else:
        logger.warn('Image creation will run in headless mode. '
                    'Use --gui to see graphic output for debugging.')

    try:
        make = sh.Command('make').bake(file=os.path.join(img_build_dir,
                                                         'Makefile'),
                                       directory=self.image_path(),
                                       _out=sys.stdout, _err=sys.stderr,
                                       _env=env, _fg=True)

        make_image = make.bake(j=num_cores)
        make_image(rule_names)
    except ErrorReturnCode as e:
        raise CommandError(e)
def run_docker_dev_test(path, coverage=False):
    """
    Method to check that docker runs with dev.yml
    """
    try:
        # build django, power up the stack and run the test
        sh.docker_compose("--file", "{}/dev.yml".format(path), "build",
                          "django")
        sh.docker_compose("--file", "{}/dev.yml".format(path), "build")
        if coverage:
            sh.docker_compose("--file", "{}/dev.yml".format(path), "run",
                              "django", "coverage", "run", "manage.py",
                              "test")
            sh.docker_compose("--file", "{}/dev.yml".format(path), "run",
                              "django", "coverage", "xml", "-o",
                              "coverage.xml")
            shutil.copyfile(os.path.join(str(path), ".coverage"),
                            os.path.join(PROJECT_DIR, ".coverage"))
            shutil.copyfile(os.path.join(str(path), "coverage.xml"),
                            os.path.join(PROJECT_DIR, "coverage.xml"))
        else:
            sh.docker_compose("--file", "{}/dev.yml".format(path), "run",
                              "django", "python", "manage.py", "test")

        # test that the development server is running
        sh.docker_compose("--file", "{}/dev.yml".format(path), "up", "-d")
        time.sleep(10)
        curl = sh.curl("-I", "http://localhost:8000/")
        assert "200 OK" in curl
        assert "Server: Werkzeug" in curl

        # since we are running a lot of tests with different configurations,
        # we need to clean up the environment. Stop all running containers,
        # remove them and remove the postgres_data volume.
        sh.docker_compose("--file", "{}/dev.yml".format(path), "stop")
        sh.docker_compose("--file", "{}/dev.yml".format(path), "rm", "-f")
        sh.docker("volume", "rm",
                  "cookiecuttersaastestproject_postgres_data_dev")
    except sh.ErrorReturnCode as e:
        # in case there are errors it's good to have full output of
        # stdout and stderr.
        pytest.fail("STDOUT: {} \n\n\n STDERR: {}".format(
            e.stdout.decode("utf-8"),
            e.stderr.decode("utf-8")))
def _run_ansible_lint(organization):
    al_bin = os.path.join(aeriscloud_path, 'venv/bin/ansible-lint')
    env = ansible_env(os.environ.copy())

    if organization:
        environment_files = glob.glob(get_env_path(organization) + '/*.yml')
    else:
        environment_files = glob.glob(organization_path + '/*/*.yml')

    if not environment_files:
        return 0

    args = environment_files + ['-r', os.path.join(ansible_path, 'rules')]

    click.echo('Running ansible-lint ... ', nl=False)
    errors = 0
    try:
        python(al_bin, *args, _env=env, _err_to_out=True, _ok_code=[0])
        click.echo('[%s]' % click.style('OK', fg='green'))
    except ErrorReturnCode as e:
        parser = re.compile(
            r'^\[(?P<error_code>[^\]]+)\] (?P<error_message>[^\n]+)\n'
            r'%s(?P<file_name>[^:]+):(?P<line_number>[0-9]+)\n'
            r'Task/Handler: (?P<task_name>[^\n]+)\n\n' % (ansible_path + '/'),
            re.MULTILINE
        )

        click.echo('[%s]\n' % click.style('FAIL', fg='red'))

        last_file = None
        pos = 0
        while pos < len(e.stdout):
            match = parser.match(e.stdout, pos)
            if not match:
                click.secho("Error: %s" % e.stdout)
                errors += 1
                break

            error = match.groupdict()
            if error['file_name'] != last_file:
                click.secho(' Errors in file: %s' % error['file_name'],
                            fg='blue', bold=True)
                last_file = error['file_name']

            click.echo(' line %s task %s: %s %s' % (
                click.style(error['line_number'], fg='green'),
                click.style(error['task_name'], fg='green'),
                click.style(error['error_code'], fg='red'),
                click.style(error['error_message'], fg='red'),
            ))
            errors += 1
            pos = match.end()

    return errors
def submit_job(self):
    """Submit the job to qsub, returns job_id."""
    self.logger.info('Submitting job ...')
    job_script_path = self._get_job_script_path()
    arg_list = self._build_qsub_args()
    arg_list.append(job_script_path)
    self.logger.debug('arg_list: {}'.format(arg_list))
    try:
        self.logger.debug(
            '{} {}'.format(self.cfg.path_qsub, arg_list)
        )
        # get starting time and convert
        self.time_stamp_jobstart = int(time.time()) * 1000
        self.logger.debug(
            'start time stamp: {}'.format(self.time_stamp_jobstart)
        )
        # Job submit
        ssh_output = self.ssh_host(self.cfg.path_qsub, *arg_list)
        # searching job id
        for line in ssh_output:
            self.logger.debug('searching for job id in \n{}'.format(line))
            if "hlrs.de" in line:
                self.logger.debug('possible job id found: {}'.format(line))
                self.job_id = str(line)
                if self.cfg.grafana:
                    self.logger.info(
                        'Job performance data at:\n'
                        '{}var-JobId=snapTask-{}-{}&'
                        'from={}&'
                        'to=now'.format(
                            self.cfg.grafana_base_string,
                            self.cfg.user_name,
                            self.job_id.rstrip(),
                            self.time_stamp_jobstart
                        )
                    )
                return
        self.logger.error(
            'no job id found in \n{}\nexiting!!!'.format(ssh_output)
        )
        exit(1)
    except ErrorReturnCode as e:
        self.logger.error('\nError in ssh call:\n{}'.format(e))
        print(e.stderr)
        exit(1)
def _get_basic_blocks(self):
    """
    Extract basic block information from the target binary using S2E's
    IDA Pro script.

    This extraction is done within a temporary directory so that we don't
    pollute the file system with temporary idbs and other such things.
    """
    logger.info('Generating basic block information from IDA Pro')

    try:
        with TemporaryDirectory() as temp_dir:
            target_path = self._project_desc['target_path']

            # Copy the binary to the temporary directory. Because projects
            # are created with a symlink to the target program, then IDA
            # Pro will generate the idb and bblist files in the symlinked
            # target's directory. Which is not what we want
            target_name = os.path.basename(target_path)
            temp_target_path = os.path.join(temp_dir, target_name)
            shutil.copyfile(target_path, temp_target_path)

            # Run the IDA Pro extractBasicBlocks script
            env_vars = os.environ.copy()
            env_vars['TVHEADLESS'] = '1'
            # This is required if s2e-env runs inside screen
            env_vars['TERM'] = 'xterm'

            ida = sh.Command(self._ida_path)
            ida('-A', '-B',
                '-S%s' % self.install_path('bin', 'extractBasicBlocks.py'),
                temp_target_path, _out=os.devnull, _tty_out=False,
                _cwd=temp_dir, _env=env_vars)

            # Check that the basic block list file was correctly generated
            bblist_file = os.path.join(temp_dir, '%s.bblist' % target_name)
            if not os.path.isfile(bblist_file):
                raise CommandError('Failed to generate bblist file for '
                                   '%s' % target_name)

            # Parse the basic block list file
            #
            # to_basic_block takes a 3-tuple read from the bblist file and
            # converts it to a BasicBlock
            to_basic_block = lambda tup: BasicBlock(int(tup[0], 16),
                                                    int(tup[1], 16),
                                                    tup[2])
            with open(bblist_file, 'r') as f:
                return [to_basic_block(l.rstrip().split(' ')) for l in f]
    except ErrorReturnCode as e:
        raise CommandError(e)