The following 47 code examples, extracted from open-source Python projects, illustrate how to use os.EX_OK.
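To set the stage before the examples: os.EX_OK is the conventional "success" exit status and is always 0. Like the rest of the os.EX_* family, it mirrors BSD's sysexits.h and is only defined on Unix platforms. A minimal sketch of the typical pattern, not taken from any project below:

import os
import sys


def main():
    # ... real work would happen here ...
    return os.EX_OK  # 0, the POSIX "success" exit status


if __name__ == "__main__":
    sys.exit(main())
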
def manage_events(self, notify):
    filename = os.path.basename(self.source_path)
    while True:
        try:
            events = notify.read()
        except KeyboardInterrupt:
            return os.EX_OK
        else:
            LOG.debug("Caught %d events", len(events))
            events = self.filter_events(filename, events)
            descriptions = self.describe_events(events)
            LOG.debug("Got %d events after filtration: %s",
                      len(descriptions), descriptions)
            if events:
                self.output()
                LOG.info("Config was managed. Going to the next loop.")

def __str__(self):
    try:
        assert self.project_name
        assert self.case_name
        result = 'PASS' if (self.is_successful(
            ) == TestCase.EX_OK) else 'FAIL'
        msg = prettytable.PrettyTable(
            header_style='upper', padding_width=5,
            field_names=['test case', 'project', 'duration', 'result'])
        msg.add_row([self.case_name, self.project_name,
                     self.get_duration(), result])
        return msg.get_string()
    except AssertionError:
        self.__logger.error("We cannot print invalid objects")
        return super(TestCase, self).__str__()

def run_tier(self, tier):
    """Run one tier"""
    tier_name = tier.get_name()
    tests = tier.get_tests()
    if not tests:
        LOGGER.info("There are no supported test cases in this tier "
                    "for the given scenario")
        self.overall_result = Result.EX_ERROR
    else:
        LOGGER.info("Running tier '%s'", tier_name)
        for test in tests:
            self.run_test(test)
            test_case = self.executed_test_cases[test.get_name()]
            if test_case.is_successful() != testcase.TestCase.EX_OK:
                LOGGER.error("The test case '%s' failed.", test.get_name())
                if test.get_project() == "functest":
                    self.overall_result = Result.EX_ERROR
                if test.is_blocking():
                    raise BlockingTestFailed(
                        "The test case {} failed and is blocking".format(
                            test.get_name()))
    return self.overall_result

def test_option_repeat_interval(self):
    """test --retry and --interval options"""
    # run with --retry, see 2 lines, then kill -INT
    cmd, output = runCmdOutput(['-p', '7788', '-r'], wait=False, limit=2)
    cmd.send_signal(signal.SIGINT)
    self.assertEqual(cmd.wait(), 1)
    cmd.stdout.close()
    # run with --retry, see 4 lines, then kill -INT
    cmd, output = runCmdOutput(['-p', '7788', '-r', '-i', '1'],
                               wait=False, limit=4)
    cmd.send_signal(signal.SIGINT)
    self.assertEqual(cmd.wait(), 1)
    cmd.stdout.close()
    # invalid --interval option argument (int > 0)
    cmd, output = runCmdOutput(['-p', '7788', '-i', '0'])
    self.assertEqual(cmd.returncode, os.EX_USAGE)
    # --interval option argument ignored if no --retry
    cmd, output = runCmdOutput(['-p', '7788', '-i', '1000'])
    self.assertEqual(cmd.returncode, os.EX_OK)

def _instancecheck_impl(self, value, info: Info) -> InfoMsg:
    if not isinstance(value, str) or value == "":
        return info.errormsg(self)
    value = os.path.expanduser(value)
    if self.allow_std and value == "-" \
            and (self.constraint is None or self.constraint(value)):
        return info.wrap(True)
    if os.path.exists(value):
        if os.path.isfile(value) and os.access(os.path.abspath(value), os.W_OK) \
                and (self.constraint is None or self.constraint(value)):
            return info.wrap(True)
        return info.errormsg(self)
    if not self.allow_non_existent:
        return info.errormsg(self, "File doesn't exist")
    abs_name = os.path.abspath(value)
    dir_name = os.path.dirname(abs_name)
    if os.path.exists(dir_name) and os.access(dir_name, os.EX_OK) \
            and os.access(dir_name, os.W_OK) \
            and (self.constraint is None or self.constraint(value)):
        return info.wrap(True)
    return info.errormsg(self)

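A note on the os.access(dir_name, os.EX_OK) call above: os.EX_OK is 0, the same value as os.F_OK, so the call amounts to a plain existence check on the directory. A quick way to convince yourself:

import os

# os.EX_OK (success exit status) and os.F_OK (existence-test mode)
# share the value 0, so these two calls behave identically:
assert os.EX_OK == os.F_OK == 0
print(os.access("/tmp", os.EX_OK))  # same result as os.access("/tmp", os.F_OK)
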
def test_main_list(monkeypatch, capsys, mocked_sysexit, mocked_configure):
    server_id = pytest.faux.gen_uuid()
    host = pytest.faux.gen_alphanumeric()
    username = pytest.faux.gen_alphanumeric()
    initiator_id = pytest.faux.gen_uuid()
    tsk = task.ServerDiscoveryTask(server_id, host, username, initiator_id)
    tsk = tsk.create()

    monkeypatch.setenv(process.ENV_ENTRY_POINT, "server_discovery")
    monkeypatch.setenv(process.ENV_TASK_ID, str(tsk._id))
    monkeypatch.setattr("sys.argv", ["progname", "--list"])

    assert inventory.main() == os.EX_OK
    mocked_sysexit.assert_not_called()

    out, _ = capsys.readouterr()
    arg = json.loads(out)
    assert arg["new"]["hosts"] == [host]
    assert arg["_meta"]["hostvars"][host]["ansible_user"] == username

def test_main_host_ok(monkeypatch, capsys, mocked_sysexit, mocked_configure):
    server_id = pytest.faux.gen_uuid()
    host = pytest.faux.gen_alphanumeric()
    username = pytest.faux.gen_alphanumeric()
    initiator_id = pytest.faux.gen_uuid()
    tsk = task.ServerDiscoveryTask(server_id, host, username, initiator_id)
    tsk = tsk.create()

    monkeypatch.setenv(process.ENV_ENTRY_POINT, "server_discovery")
    monkeypatch.setenv(process.ENV_TASK_ID, str(tsk._id))
    monkeypatch.setattr("sys.argv", ["progname", "--host", host])

    assert inventory.main() == os.EX_OK
    mocked_sysexit.assert_not_called()

    out, _ = capsys.readouterr()
    arg = json.loads(out)
    assert arg["ansible_user"] == username

def run(self):
    if self.finished:
        return
    self.process = subprocess.Popen(
        [str(self.path)],
        stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
    LOG.info("Run %s. Pid %d", self.path, self.process.pid)
    self.process.wait()
    logmethod = LOG.info if self.process.returncode == os.EX_OK \
        else LOG.warning
    logmethod("%s has been finished. Exit code %s",
              self.path, self.process.returncode)
    self.stdout = self.process.stdout.read().decode("utf-8")
    self.stderr = self.process.stderr.read().decode("utf-8")
    if self.process.returncode != os.EX_OK:
        # .format() was missing here; the message was never interpolated
        raise RuntimeError(
            "Program {0} has been finished with exit code {1}".format(
                self.path, self.process.returncode))

async def get_package_version(prefix, connection, package_name):
    # coroutine: the body awaits connection.run(), so the def must be async
    command = "dpkg-query --showformat='${Version}' --show %s" % shlex.quote(
        package_name)
    result = await connection.run(command)
    if result.exit_status != os.EX_OK:
        click.echo(
            "{0}package (failed {1}): {2} - {3}".format(
                prefix, result.exit_status, package_name,
                result.stderr.strip()))
    else:
        click.echo(
            "{0}package (ok): {1}=={2}".format(
                prefix, package_name, result.stdout.strip()))

def call(self, cmd, **kwargs):
    print('Running "{}"'.format(cmd), file=sys.stderr)
    expect = kwargs.pop("expect", [dict(return_codes=[os.EX_OK],
                                        stdout=None, stderr=None)])
    # pop "stdin" so it is not passed twice through **kwargs
    process = subprocess.Popen(cmd, stdin=kwargs.pop("stdin", subprocess.PIPE),
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE, **kwargs)
    out, err = process.communicate()
    return_code = process.poll()
    out = out.decode(sys.stdin.encoding)
    err = err.decode(sys.stdin.encoding)

    def match(return_code, out, err, expected):
        exit_ok = return_code in expected["return_codes"]
        stdout_ok = re.search(expected.get("stdout") or "", out)
        stderr_ok = re.search(expected.get("stderr") or "", err)
        return exit_ok and stdout_ok and stderr_ok

    if not any(match(return_code, out, err, exp) for exp in expect):
        print(err)
        e = subprocess.CalledProcessError(return_code, cmd, output=out)
        e.stdout, e.stderr = out, err
        raise e
    return self.SubprocessResult(out, err, return_code)

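The expect mechanism above is what the aegea tests further down this list (test_dry_run_commands, test_secrets) rely on: each dict describes one acceptable outcome, as a set of allowed return codes plus optional stdout/stderr regexes, and the call succeeds if any of the dicts matches.
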
def grep(args):
    filter_args = dict(logGroupName=args.log_group)
    if args.log_stream:
        filter_args.update(logStreamNames=[args.log_stream])
    if args.pattern:
        filter_args.update(filterPattern=args.pattern)
    if args.start_time:
        filter_args.update(startTime=int(timestamp(args.start_time) * 1000))
    if args.end_time:
        filter_args.update(endTime=int(timestamp(args.end_time) * 1000))
    num_results = 0
    while True:
        for event in paginate(clients.logs.get_paginator("filter_log_events"),
                              **filter_args):
            if "timestamp" not in event or "message" not in event:
                continue
            print(str(Timestamp(event["timestamp"])), event["message"])
            num_results += 1
        if args.follow:
            time.sleep(1)
        else:
            return SystemExit(os.EX_OK if num_results > 0 else os.EX_DATAERR)

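The EX_DATAERR fallback above follows the sysexits.h convention that the os.EX_* constants mirror: EX_OK is 0, EX_USAGE is 64, EX_DATAERR is 65, EX_SOFTWARE is 70. Several later examples lean on these values; they can be checked directly (Unix only):

import os

print(os.EX_OK)        # 0
print(os.EX_USAGE)     # 64, command-line usage error
print(os.EX_DATAERR)   # 65, bad input data
print(os.EX_SOFTWARE)  # 70, internal software error
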
def build(config, docs, **kwargs):
    if not config.pubdir:
        return ERR_NEEDPUBDIR + "to --build"
    ready, error = builddir_setup(config)
    if not ready:
        return error
    ready, error = prepare_docs_build_mode(config, docs)
    if not ready:
        return error
    buildsuccess, results = docbuild(config, docs, **kwargs)
    for x, (buildcode, source) in enumerate(results, 1):
        if buildcode:
            logger.info("success (%d of %d) available in %s",
                        x, len(results), source.working.dirname)
        else:
            logger.info("FAILURE (%d of %d) available in %s",
                        x, len(results), source.working.dirname)
    if buildsuccess:
        return os.EX_OK
    else:
        return "Build failed, see logging output in %s." % (config.builddir,)

def publish(config, docs, **kwargs):
    config.build = True
    result = build(config, docs, **kwargs)
    if result != os.EX_OK:
        return result
    for x, source in enumerate(docs, 1):
        logger.info("Publishing (%d of %d) to %s.",
                    x, len(docs), source.output.dirname)
        # -- swapdirs must raise an error if there are problems
        #
        swapdirs(source.working.dirname, source.output.dirname)
        if os.path.isdir(source.working.dirname):
            logger.debug("%s removing old directory %s",
                         source.stem, source.working.dirname)
            shutil.rmtree(source.working.dirname)
    workingdirs = list(set([x.dtworkingdir for x in docs]))
    workingdirs.append(config.builddir)
    post_publish_cleanup(workingdirs)
    return os.EX_OK

def test_summary_longnames(self):
    c = self.config
    names = self.publishDocumentsWithLongNames(5)
    stdout = io.StringIO()
    result = tldp.driver.summary(c, file=stdout)
    self.assertEqual(result, os.EX_OK)
    stdout.seek(0)
    data = stdout.read()
    self.assertTrue('and 4 more' in data)
    c.verbose = True
    stdout = io.StringIO()
    result = tldp.driver.summary(c, file=stdout)
    self.assertEqual(result, os.EX_OK)
    stdout.seek(0)
    data = stdout.read()
    for name in names:
        self.assertTrue(name in data)

def test_run(self):
    c = self.config
    ex = example.ex_linuxdoc
    self.add_published('Published-HOWTO', ex)
    self.add_new('New-HOWTO', ex)
    self.add_stale('Stale-HOWTO', ex)
    self.add_orphan('Orphan-HOWTO', ex)
    self.add_broken('Broken-HOWTO', ex)
    fullpath = opj(self.tempdir, 'sources', 'New-HOWTO.sgml')
    argv = self.argv
    argv.extend(['--publish', 'stale', 'Orphan-HOWTO', fullpath])
    exitcode = tldp.driver.run(argv)
    self.assertEqual(exitcode, os.EX_OK)
    inv = tldp.inventory.Inventory(c.pubdir, c.sourcedir)
    self.assertEqual(4, len(inv.published.keys()))
    self.assertEqual(1, len(inv.broken.keys()))

def is_successful(self):
    """Interpret the result of the test case.

    It allows getting the result of TestCase. It completes run()
    which only returns the execution status. It can be overridden
    if checking the result is not suitable.

    Returns:
        TestCase.EX_OK if result is 'PASS'.
        TestCase.EX_TESTCASE_FAILED otherwise.
    """
    try:
        assert self.criteria
        assert self.result is not None
        if (not isinstance(self.result, str) and
                not isinstance(self.criteria, str)):
            if self.result >= self.criteria:
                return TestCase.EX_OK
        else:
            # Backward compatibility
            # It must be removed as soon as TestCase subclasses
            # stop setting result = 'PASS' or 'FAIL'.
            # In this case criteria is unread.
            self.__logger.warning(
                "Please update result which must be an int!")
            if self.result == 'PASS':
                return TestCase.EX_OK
    except AssertionError:
        self.__logger.error("Please run test before checking the results")
    return TestCase.EX_TESTCASE_FAILED

def push_to_db(self):
    """Push the results of the test case to the DB.

    It allows publishing the results and checking the status.
    It could be overridden if the common implementation is not
    suitable.

    The following attributes must be set before pushing the results to DB:

        * project_name,
        * case_name,
        * result,
        * start_time,
        * stop_time.

    Returns:
        TestCase.EX_OK if results were pushed to DB.
        TestCase.EX_PUSH_TO_DB_ERROR otherwise.
    """
    try:
        assert self.project_name
        assert self.case_name
        assert self.start_time
        assert self.stop_time
        pub_result = 'PASS' if self.is_successful(
            ) == TestCase.EX_OK else 'FAIL'
        if ft_utils.push_results_to_db(
                self.project_name, self.case_name, self.start_time,
                self.stop_time, pub_result, self.details):
            self.__logger.info(
                "The results were successfully pushed to DB")
            return TestCase.EX_OK
        else:
            self.__logger.error("The results cannot be pushed to DB")
            return TestCase.EX_PUSH_TO_DB_ERROR
    except Exception:  # pylint: disable=broad-except
        self.__logger.exception("The results cannot be pushed to DB")
        return TestCase.EX_PUSH_TO_DB_ERROR

def _run(self, args):  # pylint: disable=no-self-use
    """The built_in function to run a test case"""
    case_name = args.get('testcase')
    self._update_logging_ini(args.get('task_id'))
    try:
        cmd = "run_tests -t {}".format(case_name)
        runner = ft_utils.execute_command(cmd)
    except Exception:  # pylint: disable=broad-except
        result = 'FAIL'
        LOGGER.exception("Running test case %s failed!", case_name)
    else:
        # checking runner in the else branch avoids a NameError when
        # execute_command() itself raised
        result = 'PASS' if runner == os.EX_OK else 'FAIL'
    env_info = {
        'installer': CONST.__getattribute__('INSTALLER_TYPE'),
        'scenario': CONST.__getattribute__('DEPLOY_SCENARIO'),
        'build_tag': CONST.__getattribute__('BUILD_TAG'),
        'ci_loop': CONST.__getattribute__('CI_LOOP')
    }
    result = {
        'task_id': args.get('task_id'),
        'testcase': case_name,
        'env_info': env_info,
        'result': result
    }
    return {'result': result}

def __init__(self):
    self.executed_test_cases = {}
    self.overall_result = Result.EX_OK
    self.clean_flag = True
    self.report_flag = False
    self._tiers = tb.TierBuilder(
        CONST.__getattribute__('INSTALLER_TYPE'),
        CONST.__getattribute__('DEPLOY_SCENARIO'),
        pkg_resources.resource_filename('functest', 'ci/testcases.yaml'))

def main(self, **kwargs):
    """Entry point of class Runner"""
    if 'noclean' in kwargs:
        self.clean_flag = not kwargs['noclean']
    if 'report' in kwargs:
        self.report_flag = kwargs['report']
    try:
        if 'test' in kwargs:
            self.source_rc_file()
            LOGGER.debug("Test args: %s", kwargs['test'])
            if self._tiers.get_tier(kwargs['test']):
                self.run_tier(self._tiers.get_tier(kwargs['test']))
            elif self._tiers.get_test(kwargs['test']):
                result = self.run_test(
                    self._tiers.get_test(kwargs['test']))
                if result != testcase.TestCase.EX_OK:
                    LOGGER.error("The test case '%s' failed.",
                                 kwargs['test'])
                    self.overall_result = Result.EX_ERROR
            elif kwargs['test'] == "all":
                self.run_all()
            else:
                LOGGER.error("Unknown test case or tier '%s', or not "
                             "supported by the given scenario '%s'.",
                             kwargs['test'],
                             CONST.__getattribute__('DEPLOY_SCENARIO'))
                LOGGER.debug("Available tiers are:\n\n%s", self._tiers)
                return Result.EX_ERROR
        else:
            self.run_all()
    except BlockingTestFailed:
        pass
    except Exception:  # pylint: disable=broad-except
        LOGGER.exception("Failures when running testcase(s)")
        self.overall_result = Result.EX_ERROR
    if not self._tiers.get_test(kwargs['test']):
        self.summary(self._tiers.get_tier(kwargs['test']))
    LOGGER.info("Execution exit value: %s", self.overall_result)
    return self.overall_result

def summary(self, tier=None):
    """To generate functest report showing the overall results"""
    msg = prettytable.PrettyTable(
        header_style='upper', padding_width=5,
        field_names=['env var', 'value'])
    for env_var in ['INSTALLER_TYPE', 'DEPLOY_SCENARIO', 'BUILD_TAG',
                    'CI_LOOP']:
        msg.add_row([env_var, CONST.__getattribute__(env_var)])
    LOGGER.info("Deployment description:\n\n%s\n", msg)
    msg = prettytable.PrettyTable(
        header_style='upper', padding_width=5,
        field_names=['test case', 'project', 'tier', 'duration', 'result'])
    tiers = [tier] if tier else self._tiers.get_tiers()
    for each_tier in tiers:
        for test in each_tier.get_tests():
            try:
                test_case = self.executed_test_cases[test.get_name()]
            except KeyError:
                msg.add_row([test.get_name(), test.get_project(),
                             each_tier.get_name(), "00:00", "SKIP"])
            else:
                result = 'PASS' if (test_case.is_successful(
                    ) == test_case.EX_OK) else 'FAIL'
                msg.add_row(
                    [test_case.case_name, test_case.project_name,
                     self._tiers.get_tier_name(test_case.case_name),
                     test_case.get_duration(), result])
        for test in each_tier.get_skipped_test():
            msg.add_row([test.get_name(), test.get_project(),
                         each_tier.get_name(), "00:00", "SKIP"])
    LOGGER.info("FUNCTEST REPORT:\n\n%s\n", msg)

def main():
    version_info = 'pyrate version ' + __version__
    try:
        if os.environ.get('TESTOLDIMPORTS'):
            raise ImportError()
        import argparse
        parser = argparse.ArgumentParser()
        parser.add_argument('build_file', nargs='?', default='build.py',
                            help='name of the input file - default: build.py')
        parser.add_argument('-V', '--version', action='version',
                            version=version_info)
        parser.add_argument('-M', '--makefile', action='store_true',
                            help='enable makefile mode')
        parser.add_argument('-o', '--output', nargs=1, default=None,
                            help='name of output build file')
        args = parser.parse_args()
        if args.output:
            args.output = args.output[0]
        bfn = args.build_file
    except ImportError:
        optparse = __import__('optparse')
        parser = optparse.OptionParser(usage='pyrate [options] build_file')
        parser.add_option('-V', '--version', action='store_true',
                          help='display version')
        parser.add_option('-M', '--makefile', action='store_true',
                          help='enable makefile mode')
        parser.add_option('-o', '--output', default=None,
                          help='name of output build file', dest='output')
        (args, posargs) = parser.parse_args()
        if len(posargs) > 1:
            sys.stderr.write('too many build_file arguments provided! %s\n'
                             % repr(posargs))
            return os.EX_USAGE
        elif not posargs:
            posargs = ['build.py']
        bfn = posargs[0]
        if args.version:
            sys.stderr.write(version_info + '\n')
            sys.exit(os.EX_OK)
    generate_build_file(bfn, args.output, args.makefile)

################################################################################
# Externals + helper functions
################################################################################

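One caveat with sys.exit(os.EX_OK) as used above: the os.EX_* constants do not exist on Windows, so code that references them raises AttributeError there. A hedged, portable fallback (the numeric defaults below assume you want the sysexits.h values everywhere):

import os
import sys

# Fall back to the sysexits.h numbers on platforms without os.EX_*.
EX_OK = getattr(os, "EX_OK", 0)
EX_USAGE = getattr(os, "EX_USAGE", 64)

sys.exit(EX_OK)
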
def test_option_help(self):
    """test --help option"""
    cmd, output = runCmdOutput(['--help'])
    self.assertEqual(cmd.returncode, os.EX_OK)

def test_option_version(self):
    """test --version option"""
    cmd, output = runCmdOutput(['--version'])
    self.assertEqual(cmd.returncode, os.EX_OK)

def test_logging_config(self):
    """test logging config from file or default"""
    topdir = os.path.dirname(os.path.dirname(__file__))
    # logging config from default
    os.system('rm %s/logging.conf' % topdir)
    cmd, output = runCmdOutput(['-p', '7788'])
    self.assertEqual(cmd.returncode, os.EX_OK)
    # logging config from file
    os.system('cp %s/logging.conf.sample %s/logging.conf'
              % (topdir, topdir))
    cmd, output = runCmdOutput(['-p', '7788'])
    self.assertEqual(cmd.returncode, os.EX_OK)

def parse_args():
    to_parse, cmd, capturer = split_args_to_parse()
    global_argparser = create_argparser()
    args = global_argparser.parse_args(to_parse)
    if capturer:
        return args, cmd, capturer
    else:
        global_argparser.print_help()
        sys.exit(os.EX_OK)

def _instancecheck_impl(self, value, info: Info) -> InfoMsg:
    if not isinstance(value, str):
        return info.errormsg(self)
    if os.path.exists(value):
        if os.path.isdir(value) and os.access(os.path.abspath(value), os.W_OK) \
                and (self.constraint is None or self.constraint(value)):
            return info.wrap(True)
        return info.errormsg(self)
    abs_name = os.path.abspath(value)
    dir_name = os.path.dirname(abs_name)
    if os.path.exists(dir_name) and os.access(dir_name, os.EX_OK) \
            and os.access(dir_name, os.W_OK) \
            and (self.constraint is None or self.constraint(value)):
        return info.wrap(True)
    return info.errormsg(self)

def main():
    argument_parser = get_argument_parser()
    options = argument_parser.parse_args()
    if options.list_checkers:
        list_checkers()
        return
    program = Program(options)
    program.check()
    program.print_issues()
    errors = [e for e in program.issues
              if e.level == 'error' or e.level == 'syntax-error']
    sys.exit(os.EX_OK if len(errors) == 0 else os.EX_DATAERR)

def check_if_run():
    pid = read_file(PIDFILE)
    current_pid = os.getpid()
    if pid is None:
        return
    if int(pid) > 0 and int(pid) != current_pid:
        if os.path.exists("/proc/%d" % int(pid)):
            log("[%s] Already running - keepalive done." % time.ctime())
            sys.exit(os.EX_OK)

def mocked_plugin():
    patch = unittest.mock.patch(
        "decapod_common.plugins.get_playbook_plugins")
    with patch as ptch:
        plugin = unittest.mock.MagicMock()
        required_mock = unittest.mock.MagicMock()
        required_mock.pid = 100
        required_mock.returncode = os.EX_OK
        plugin.execute.return_value.__enter__.return_value = required_mock
        ptch.return_value.get.return_value.return_value = plugin
        yield required_mock

def test_command_result(proc):
    proc.options["-c"] = ""
    result = proc.run()
    time.sleep(2)
    assert result.pid
    assert result.returncode == os.EX_OK
    assert result.stdout
    assert result.stdin is None
    assert not result.alive()
    assert str(result)
    assert repr(result)

def test_command_result_running(proc):
    proc.options["-c"] = "import time; time.sleep(2)"
    result = proc.run()
    assert result.pid
    assert result.returncode is None
    assert result.alive()
    assert str(result)
    assert repr(result)
    time.sleep(3.5)
    assert result.pid
    assert result.returncode == os.EX_OK
    assert not result.alive()

def execute(self, tsk, stop_ev):
    # Small hack to prevent execution of the callback BEFORE the task
    # arrives in self.data. This is possible because submitting a task
    # into the pool is eager.
    while tsk.id not in self.data:
        time.sleep(0.1)
    plugin = self.get_plugin(tsk)
    with plugin.execute(tsk) as process:
        tsk = tsk.set_executor_data(platform.node(), process.pid)
        LOG.info(
            "Management process for task %s was started. Pid %d",
            tsk, process.pid)
        while not stop_ev.is_set() and process.alive():
            stop_ev.wait(0.5)
        process.stop()
        LOG.info(
            "Management process for task %s with PID %d has "
            "stopped with exit code %d",
            tsk, process.pid, process.returncode)
        if process.returncode != os.EX_OK:
            raise ChildProcessError(
                "Process exit with code {0}".format(process.returncode))

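When comparing a child's returncode against os.EX_OK as above, remember that subprocess reports a negative returncode when the child was killed by a signal, so "not EX_OK" covers both ordinary failures and signal deaths. A small illustration (assumes a Unix system with a `true` binary on PATH):

import os
import signal
import subprocess

proc = subprocess.run(["true"])
if proc.returncode == os.EX_OK:
    print("success")
elif proc.returncode < 0:
    # negative means the child died from a signal
    print("killed by", signal.Signals(-proc.returncode).name)
else:
    print("failed with exit code", proc.returncode)
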
def main():
    if not PATH_ANSIBLE:
        sys.exit("Cannot find ansible-playbook executable.")
    logging.basicConfig(
        format="%(asctime)s [%(levelname)-5s] %(message)s",
        level=logging.DEBUG)
    if not os.path.isfile(ANSIBLE_CONFIG_PATH):
        LOG.warning("Cannot find Ansible config at %r", ANSIBLE_CONFIG_PATH)
    for cluster in get_clusters():
        if not cluster["configuration"]:
            LOG.info("Skip cluster %s because it has no servers",
                     cluster["name"])
            continue
        server = random.choice(cluster["configuration"])
        server = server["server_id"]
        server = get_server_by_id(server)
        commandline = get_ansible_commandline(
            server["username"], server["ip"], cluster["name"])
        LOG.info("Collect from %s", cluster["name"])
        LOG.debug("Execute %s", commandline)
        code = execute(commandline)
        if code == os.EX_OK:
            LOG.info("Collected from %s", cluster["name"])
        else:
            LOG.warning("Cannot collect from %s: %d",
                        cluster["name"], code)

async def get_ceph_version(prefix, connection, cluster_name):
    # coroutine: the body awaits connection.run(), so the def must be async
    command = "sudo -EHn -- ceph --cluster {0} version".format(
        shlex.quote(cluster_name))
    result = await connection.run(command)
    if result.exit_status != os.EX_OK:
        click.echo(
            "{0}ceph-version (failed {1}): {2}".format(
                prefix, result.exit_status, result.stderr.strip()))
    else:
        click.echo(
            "{0}ceph-version (ok): {1}".format(
                prefix, result.stdout.strip()))

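This coroutine and get_package_version above compare an SSH command's exit_status against os.EX_OK; the connection.run(...) / result.exit_status shape matches an asyncssh-style API, though which library the project actually uses is an assumption here. A minimal hedged driver under that assumption (the hostname is hypothetical):

import asyncio

import asyncssh  # assumption: connection objects come from asyncssh


async def main():
    async with asyncssh.connect("ceph-node.example.com") as connection:
        await get_ceph_version("demo: ", connection, "ceph")

asyncio.run(main())
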
def test_dry_run_commands(self):
    unauthorized_ok = [dict(return_codes=[os.EX_OK]),
                       dict(return_codes=[1, os.EX_SOFTWARE],
                            stderr="UnauthorizedOperation")]
    self.call("aegea launch unittest --dry-run --no-verify-ssh-key-pem-file",
              shell=True, expect=unauthorized_ok)
    self.call("aegea launch unittest --dry-run --spot --no-verify-ssh-key-pem-file",
              shell=True, expect=unauthorized_ok)
    self.call("aegea launch unittest --dry-run --duration-hours 1 --no-verify-ssh-key-pem-file",
              shell=True, expect=unauthorized_ok)
    self.call("aegea launch unittest --duration 0.5 --min-mem 6 --cores 2 --dry-run --no-verify --client-token t",
              shell=True, expect=unauthorized_ok)
    self.call("aegea build_ami i --dry-run --no-verify-ssh-key-pem-file",
              shell=True, expect=unauthorized_ok)

def test_secrets(self):
    unauthorized_ok = [dict(return_codes=[os.EX_OK]),
                       dict(return_codes=[1, os.EX_SOFTWARE],
                            stderr="(AccessDenied|NoSuchKey)")]
    self.call("test_secret=test aegea secrets put test_secret --iam-role aegea.launch",
              shell=True, expect=unauthorized_ok)
    self.call("aegea secrets put test_secret --generate-ssh-key --iam-role aegea.launch",
              shell=True, expect=unauthorized_ok)
    self.call("aegea secrets ls", shell=True, expect=unauthorized_ok)
    self.call("aegea secrets ls --json", shell=True, expect=unauthorized_ok)
    self.call("aegea secrets get test_secret --iam-role aegea.launch",
              shell=True, expect=unauthorized_ok)
    self.call("aegea secrets delete test_secret --iam-role aegea.launch",
              shell=True, expect=unauthorized_ok)

def check_output(self, command, input_data=None, stderr=sys.stderr):
    logger.debug('Running "%s"', command)
    ssh_stdin, ssh_stdout, ssh_stderr = self.exec_command(command)
    if input_data is not None:
        ssh_stdin.write(input_data)
    exit_code = ssh_stdout.channel.recv_exit_status()
    stderr.write(ssh_stderr.read().decode("utf-8"))
    if exit_code != os.EX_OK:
        # needs `import errno`; os.errno is not available on modern Python
        raise Exception('Error while running "{}": {}'.format(
            command, errno.errorcode.get(exit_code)))
    return ssh_stdout.read().decode("utf-8")

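One caution about the errorcode lookup above: a process exit status is not an errno value, so mapping it through errno.errorcode is at best a hint; a remote command exiting with status 1, for instance, would be reported as 'EPERM'. The raw exit code is the trustworthy part of the message.
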
def main(argv):
    parser = get_parser()
    # The parser arguments (cfg.args) are accessible everywhere after
    # this call.
    cfg.args = parser.parse_args()
    # This initiates the global yml configuration instance so it will
    # be accessible everywhere after this call.
    cfg.initiate_config()
    if not cfg.args.file and not cfg.args.q:
        eprint("No file provided and not in query mode\n")
        parser.print_help()
        sys.exit(os.EX_USAGE)
    jira, username = jiralogin.get_jira_instance(cfg.args.t)
    if cfg.args.x or cfg.args.e:
        if not cfg.args.q:
            eprint("Arguments '-x' and '-e' can only be used together "
                   "with '-q'")
            sys.exit(os.EX_USAGE)
    if cfg.args.p and not cfg.args.q:
        eprint("Argument '-p' can only be used together with '-q'")
        sys.exit(os.EX_USAGE)
    if cfg.args.q:
        filename = get_jira_issues(jira, username)
        if cfg.args.p:
            print_status_file(filename)
            sys.exit(os.EX_OK)
    elif cfg.args.file is not None:
        filename = cfg.args.file
    else:
        eprint("Trying to run script with unsupported configuration. "
               "Try using --help.")
        sys.exit(os.EX_USAGE)
    if get_editor():
        open_editor(filename)
    parse_status_file(jira, filename)

def ping_vm0(ovirt_prefix):
    nt.assert_equals(_ping(ovirt_prefix, VM0_PING_DEST), EX_OK)

def restore_vm0_networking(ovirt_prefix):
    # Networking may not work after resume. We need this pseudo-test for
    # the purpose of reviving VM networking by rebooting the VM. We must
    # be careful to reboot just the guest OS, not to restart the whole
    # VM, to keep checking for contingent failures after resume.
    # A better solution might be using a guest OS other than Cirros.
    if _ping(ovirt_prefix, VM0_PING_DEST) == EX_OK:
        return
    host = _vm_host(ovirt_prefix, VM0_NAME)
    uri = 'qemu+tls://%s/system' % host.name()
    ret = host.ssh(['virsh', '-c', uri, 'reboot', '--mode', 'acpi',
                    VM0_NAME])
    nt.assert_equals(ret.code, EX_OK)
    # We might want to wait until the ssh server inside the VM gets up.
    # But the interim tests, especially *_recovery, and repeated ssh
    # connection attempts in host.ssh calls should give enough time.

def script(config, docs, **kwargs):
    ready, error = prepare_docs_script_mode(config, docs)
    if not ready:
        return error
    file = kwargs.get('file', sys.stdout)
    print(preamble, file=file)
    buildsuccess, results = docbuild(config, docs, **kwargs)
    print(postamble, file=file)
    for errcode, source in results:
        if not errcode:
            logger.error("Could not generate script for %s", source.stem)
    if buildsuccess:
        return os.EX_OK
    else:
        return "Script generation failed."

def test_run_detail(self):
    self.add_published('Published-HOWTO', example.ex_linuxdoc)
    self.add_new('New-HOWTO', example.ex_linuxdoc)
    self.add_stale('Stale-HOWTO', example.ex_linuxdoc)
    self.add_orphan('Orphan-HOWTO', example.ex_linuxdoc)
    self.add_broken('Broken-HOWTO', example.ex_linuxdoc)
    argv = self.argv
    argv.append('--detail')
    exitcode = tldp.driver.run(argv)
    self.assertEqual(exitcode, os.EX_OK)

def test_run_doctypes(self):
    exitcode = tldp.driver.run(['--doctypes'])
    self.assertEqual(exitcode, os.EX_OK)

def test_show_statustypes(self):
    stdout = io.StringIO()
    result = tldp.driver.show_statustypes(Namespace(), file=stdout)
    self.assertEqual(result, os.EX_OK)
    stdout.seek(0)
    data = stdout.read()
    for status in status_types:
        self.assertTrue(stypes[status] in data)

def test_run_statustypes(self):
    exitcode = tldp.driver.run(['--statustypes'])
    self.assertEqual(exitcode, os.EX_OK)

def test_run_summary(self):
    self.add_published('Published-HOWTO', example.ex_linuxdoc)
    self.add_new('New-HOWTO', example.ex_linuxdoc)
    self.add_stale('Stale-HOWTO', example.ex_linuxdoc)
    self.add_orphan('Orphan-HOWTO', example.ex_linuxdoc)
    self.add_broken('Broken-HOWTO', example.ex_linuxdoc)
    argv = self.argv
    argv.append('--summary')
    exitcode = tldp.driver.run(argv)
    self.assertEqual(exitcode, os.EX_OK)
