我们从Python开源项目中，提取了以下24个代码示例，用于说明如何使用pytest.exit()。
def pytest_sessionstart(session):
    """Validate command line options and create the test environment.

    A "conftest" module logger is attached to the session config first. Each
    option is then compared with its list of supported values; the first
    unsupported value is logged and the run is stopped with ``pytest.exit``.
    When every option is valid, a ``common3.Environment`` built from the
    options is stored on ``session.config.env``.

    Args:
        session: pytest session whose ``config`` is inspected and extended.

    """
    config = session.config
    config.ctlogger = loggers.module_logger("conftest")
    config.ctlogger.debug("Session start...")

    # (option attribute, supported values, complaint) - checked in this order.
    option_checks = (
        ("setup_scope",
         ["session", "module", "class", "function"],
         "Incorrect --setup_scope option."),
        ("call_check",
         ["none", "complete", "fast", "sanity_check_only"],
         "Incorrect --call_check option."),
        ("teardown_check",
         ["none", "complete", "fast", "sanity_check_only"],
         "Incorrect --teardown_check option."),
        ("fail_ctrl",
         ["stop", "restart", "ignore"],
         "Incorrect --fail_ctrl option."),
    )
    for attr_name, supported, complaint in option_checks:
        if getattr(config.option, attr_name) not in supported:
            config.ctlogger.error(complaint)
            pytest.exit(complaint)

    # Define environment
    config.env = common3.Environment(config.option)
def check(self):
    """Check that the switch is operational using the waiton method.

    Returns:
        The status returned by ``self.waiton()``, or None when the switch has
        Off status and the check is skipped.

    Notes:
        This is a mandatory method for all environment classes.
        An empty Ports table ends the run with ``pytest.exit`` when
        ``fail_ctrl`` is 'stop' and fails the test with ``pytest.fail``
        otherwise.

    """
    if not self.status:
        self.class_logger.info("Skip switch id:%s(%s) check because it's has Off status." % (self.id, self.name))
        return None

    status = self.waiton()

    # Verify Ports table is not empty
    ports_table_is_empty = self.ui.get_table_ports() == []
    if not ports_table_is_empty:
        return status

    if self.opts.fail_ctrl == 'stop':
        self.class_logger.debug("Exit switch check. Ports table is empty!")
        pytest.exit('Ports table is empty!')
    else:
        self.class_logger.debug("Fail switch check. Ports table is empty!")
        pytest.fail('Ports table is empty!')
    return status
def env_init(self, request):
    """Validate command line options and hand out the environment.

    Each option is compared with its set of supported values; the first
    unsupported value is logged through ``request.config.ctlogger`` and the
    run is stopped with ``pytest.exit``.

    Args:
        request(pytest.request): pytest request

    Returns:
        The object stored in ``request.config.env``, with its
        ``testenv_checkstatus`` attribute reset to False.

    """
    config = request.config

    # (option attribute, supported values, complaint) - checked in this order.
    option_checks = (
        ("setup_scope",
         {"session", "module", "class", "function"},
         "Incorrect --setup_scope option."),
        ("call_check",
         {"none", "complete", "fast", "sanity_check_only"},
         "Incorrect --call_check option."),
        ("teardown_check",
         {"none", "complete", "fast", "sanity_check_only"},
         "Incorrect --teardown_check option."),
        ("fail_ctrl",
         {"stop", "restart", "ignore"},
         "Incorrect --fail_ctrl option."),
    )
    for attr_name, supported, complaint in option_checks:
        if getattr(config.option, attr_name) not in supported:
            config.ctlogger.error(complaint)
            pytest.exit(complaint)

    config.env.testenv_checkstatus = False
    return config.env
def pytest_configure(config):
    """Hook to check steps consistency.

    Every attribute of every step class whose name is listed in ``STEPS`` is
    unwrapped and passed through ``StepValidator``. If any validator reports
    errors, the run is stopped with ``pytest.exit`` and the collected errors
    are part of the exit message. The check is skipped, with a warning, when
    the ``disable_steps_checker`` option is set.

    Args:
        config: pytest config object.

    """
    if config.option.disable_steps_checker:
        config.warn('P1', 'Step consistency checker is disabled!')
        return

    errors = []
    for step_cls in _get_step_classes():
        step_names = (name for name in dir(step_cls) if name in STEPS)
        for name in step_names:
            unbound = six.get_unbound_function(getattr(step_cls, name))
            unwrapped = utils.get_unwrapped_func(unbound)
            errors.extend(StepValidator(unwrapped).validate())

    if not errors:
        return
    pytest.exit('Some steps are not consistent!\n' + '\n'.join(errors))
def exit_gracefully(signum, frame):
    """Stop the pytest run; written with a signal-handler signature.

    Args:
        signum: signal number passed by the signal machinery (unused).
        frame: interrupted stack frame passed by the signal machinery (unused).

    Raises:
        pytest.exit.Exception: always, raised by ``pytest.exit``.

    """
    # pytest.exit() raises its exception itself and never returns, so the
    # call must not be wrapped in ``raise``: there is no return value to raise.
    pytest.exit('Interrupting from SIGTERM')
def _stop_if_necessary(self):
    """Report one queued error, if any, and stop the run unless errors are ignored.

    A single entry is taken from ``self._errors`` without blocking. Its
    traceback is printed to stderr and, unless ``self.ignore_errors`` is
    set, the run is stopped with ``pytest.exit``. An empty queue is not an
    error: the method simply returns.
    """
    try:
        exc, msg, tb = self._errors.get(False)
    except queue.Empty:
        return

    traceback.print_exception(exc, msg, tb)
    sys.stderr.flush()
    if self.ignore_errors:
        return
    pytest.exit(msg)
def get(self, init_start=False, retry_count=5):
    """Check the OVS Controller, restarting it on failure when allowed.

    The first attempt starts the controller (``init_start``) or waits for it;
    every following attempt restarts it. Each failure is written to stderr
    and to the class logger. After the last failed attempt the run is stopped
    with ``pytest.exit`` when ``fail_ctrl`` is "stop", otherwise the test is
    failed with ``pytest.fail``.

    Args:
        init_start(bool): Flag to start OVS Controller
        retry_count(int): Retry attempts count

    """
    # If fail_ctrl != "restart", restart retries won't be performed
    if self.opts.fail_ctrl != "restart":
        retry_count = 1

    # Try to restart if necessary at least several times
    attempt = 1
    is_first_attempt = True
    while attempt <= retry_count:
        try:
            if not is_first_attempt:
                self.restart()
            elif init_start:
                self.start()
            else:
                self.waiton()
        except Exception:
            self.class_logger.info("Error while checking Ovs Controller %s:%s..." % (self.ipaddr, self.port))
            attempt += 1
            is_first_attempt = False
            message = "Error while checking Ovs Controller %s:%s:\n%s" % (self.ipaddr, self.port, traceback.format_exc())
            sys.stderr.write(message)
            sys.stderr.flush()
            self.class_logger.log(loggers.levels['ERROR'], message)
            if attempt >= retry_count + 1:
                # That was the last allowed attempt.
                sys.stderr.write("Could not restart Ovs Controller for %s times. Something goes wrong \n" % (retry_count, ))
                sys.stderr.flush()
                if self.opts.fail_ctrl == "stop":
                    pytest.exit(message)
                else:
                    pytest.fail(message)
        else:
            # Success: nothing left to retry.
            break
def get(self, init_start=False, retry_count=1):
    """Get or start linux host instance.

    Args:
        init_start(bool): Perform start operation (True) or only wait for the host (False)
        retry_count(int): Number of retries to start(restart) linux host; not used by this method

    Returns:
        None or raise an exception.

    Notes:
        A KeyboardInterrupt sanitizes the device and stops the run with
        ``pytest.exit``; any other error fails the test with ``pytest.fail``.

    """
    # retry_count is intentionally ignored: restart is not implemented for
    # lhosts, so retries make no sense here.
    try:
        if init_start:
            self.start()
        else:
            self.waiton()
    except KeyboardInterrupt:
        message = "KeyboardInterrupt while checking device {0}({1})...".format(
            self.name, self.ipaddr)
        self.class_logger.info(message)
        self.sanitize()
        pytest.exit(message)
    except Exception:
        self.class_logger.error(
            "Error while checking device %s(%s)...", self.name, self.ipaddr)
        failure_trace = traceback.format_exc()
        message = "Error while checking device {0}({1}):\n{2}".format(
            self.name, self.ipaddr, failure_trace)
        sys.stderr.write(message)
        sys.stderr.flush()
        pytest.fail(message)
def pytest_configure(config):
    """Register the plugin, or stop the run when no setup option is given.

    Args:
        config: pytest config object; ``config.option.setup`` decides whether
            ``OnsEnvPlugin`` is registered under the name "_onsenv".

    """
    if not config.option.setup:
        config.ctlogger.error("SETUP")
        pytest.exit("SETUP")
    else:
        config.pluginmanager.register(OnsEnvPlugin(), "_onsenv")
def stop(self):
    """Stop the LogCatcher instance and perform resource cleanup.

    Sets the termination flag, then joins the logger thread in 0.5 second
    slices, logging a message after each slice, until the thread is no
    longer alive.
    """
    self._termination_flag.set()

    while True:
        if not self._logger_thread.is_alive():
            break
        self._logger_thread.join(timeout=0.5)
        log.info("Waiting for LogCatcher thread to exit")

    log.info("LogCatcher thread has terminated, bye!")
def start(self):
    """Start a subprocess

    This method makes python actually spawn the subprocess and wait for it
    to finish initializing. If initialization does not finish successfully,
    the subprocess is stopped and the run is ended with ``pytest.exit``.
    """
    self._start_subprocess()
    self._register_stdout_stderr_to_logcatcher()

    if self._wait_for_subprocess_to_finish_init():
        return

    self.stop()
    pytest.exit("Failed to start `{}` process".format(self.id))
def stop(self):
    """Stop ManagedSubprocess instance and perform a cleanup

    This method makes sure that there are no child processes left after
    the object destruction finalizes. The process is asked to stop with
    SIGINT first; if it does not exit within half of ``_EXIT_TIMEOUT`` it
    gets SIGTERM, and after another half its whole process group gets
    SIGKILL.

    If the process is found to have terminated already, the run is ended
    with ``pytest.exit``.
    """
    proc = self._process

    proc.poll()
    if proc.returncode is not None:
        msg_fmt = "`%s` process has already terminated with code `%s`"
        pytest.exit(msg_fmt % (self.id, proc.returncode))
        return

    log.info("Send SIGINT to `%s` session leader", self.id)
    proc.send_signal(signal.SIGINT)
    try:
        proc.wait(self._EXIT_TIMEOUT / 2.0)
    except subprocess.TimeoutExpired:
        # SIGINT was not enough, escalate.
        log.info("Send SIGTERM to `%s` session leader", self.id)
        proc.send_signal(signal.SIGTERM)
        try:
            proc.wait(self._EXIT_TIMEOUT / 2.0)
        except subprocess.TimeoutExpired:
            # Last resort: kill the whole process group.
            log.info("Send SIGKILL to all `%s` processess", self.id)
            group_id = os.getpgid(proc.pid)
            os.killpg(group_id, signal.SIGKILL)
            log.info("wait() for `%s` session leader to die", self.id)
            proc.wait()

    log.info("`%s` session leader has terminated", self.id)
def test_exit_propagates(self, testdir):
    """The exit exception raised inside a test item must reach the caller of runitem."""
    item_source = """
        import pytest
        def test_func():
            raise pytest.exit.Exception()
    """
    try:
        testdir.runitem(item_source)
    except pytest.exit.Exception:
        # Expected: the exception propagated out of the item run.
        return
    pytest.fail("did not raise")
def test_pytest_exit():
    """``pytest.exit`` must raise its exception, which is a KeyboardInterrupt.

    The original version only asserted inside the ``except`` branch, so it
    would pass without checking anything if ``pytest.exit`` did not raise.
    """
    try:
        pytest.exit("hello")
    except pytest.exit.Exception:
        excinfo = py.code.ExceptionInfo()
        assert excinfo.errisinstance(KeyboardInterrupt)
    else:
        # Without this branch a non-raising pytest.exit would go unnoticed.
        pytest.fail("did not raise")
def pytest_collection_modifyitems(session, items):
    """Append the idempotent id to test names and optionally verify the ids.

    Every item for which ``get_item_id`` returns an id gets ``[id-<id>]``
    appended to its name. When the ``check_idempotent_id`` option is set,
    tests without an id and ids shared by several tests are printed and the
    run is stopped with ``pytest.exit``.

    Args:
        session: pytest session, used to read the command line options.
        items: collected test items; their names are modified in place.

    """
    items_by_id = defaultdict(list)
    for item in items:
        test_id = get_item_id(item)
        items_by_id[test_id].append(item)
        if test_id is not None:
            item.name += '[id-{}]'.format(test_id)

    if not session.config.option.check_idempotent_id:
        return

    errors = []

    unmarked = items_by_id.pop(None, [])
    if unmarked:
        errors.append("Tests without idempotent_id:")
        errors.extend(' ' + node.nodeid for node in unmarked)

    for sharing_one_id in items_by_id.values():
        if len(sharing_one_id) > 1:
            errors.append("Single idempotent_id for many cases:")
            errors.extend(' ' + node.nodeid for node in sharing_one_id)

    if errors:
        print('')
        print('\n'.join(errors))
        pytest.exit('Errors with idempotent_id')
def pytest_collection_modifyitems(config, items):
    """Hook to detect forbidden calls inside test.

    For each collected item the permitted calls are ``PERMITTED_CALLS``,
    ``STEPS`` and the item's own fixture names. Errors reported by
    ``TestValidator`` for any item stop the run with ``pytest.exit``. The
    check is skipped, with a warning, when the ``disable_steps_checker``
    option is set.

    Args:
        config: pytest config object.
        items: collected test items.

    """
    if config.option.disable_steps_checker:
        config.warn('P1', 'Permitted calls checker is disabled!')
        return

    errors = [
        error
        for item in items
        for error in TestValidator(
            item.function,
            PERMITTED_CALLS + STEPS + item.funcargnames).validate()
    ]

    if not errors:
        return
    pytest.exit("Only steps and fixtures must be called in test!\n" + '\n'.join(errors))
def pytest_collection_modifyitems(config, items):
    """Hook to prevent run destructive tests without snapshot_name.

    Destructive tests are refused only when no ``snapshot_name`` option was
    given and ``force_destructive`` is not set. In that case the first item
    carrying the ``DESTRUCTIVE`` marker stops the run with ``pytest.exit``.

    Args:
        config: pytest config object holding the command line options.
        items: collected test items.

    """
    stop_destructive = (config.option.snapshot_name is None and
                        not config.option.force_destructive)

    # The decision does not depend on the items, so when destructive tests
    # are allowed there is no reason to look up markers at all.
    if not stop_destructive:
        return

    for item in items:
        if item.get_marker(DESTRUCTIVE):
            pytest.exit("You try to start destructive tests without passing "
                        "--snapshot-name for cloud reverting. Such tests can "
                        "break your cloud. To run destructive tests without "
                        "--snapshot-name you should pass --force-destructive "
                        "argument.")
def pytest_runtest_call():
    """
    This function must be run after setup_module(), it does standardized post
    setup routines. It is only being used for the 'topology-only' option.

    With that option set, the topology CLI is opened (when a topology
    generator exists) and the run is then stopped with ``pytest.exit``.
    """
    # NOTE(review): the ``pytest.config`` global used below is not available
    # in recent pytest releases - confirm the supported pytest version.
    # pylint: disable=E1101
    # Trust me, 'config' exists.
    topology_only = pytest.config.getoption('--topology-only')
    if not topology_only:
        return

    tgen = get_topogen()
    if tgen is not None:
        # Allow user to play with the setup.
        tgen.mininet_cli()

    pytest.exit('the topology executed successfully')
def pytest_configure(config):
    """Assert that the environment is correctly configured.

    Stops the run with ``pytest.exit`` when ``diagnose_env()`` reports a
    problem (returns a false value).

    Args:
        config: pytest config object (not used by this hook).

    """
    environment_ok = diagnose_env()
    if environment_ok:
        return
    pytest.exit('enviroment has errors, please read the logs')
def test_history_id():
    """
    >>> allure_report = getfixture('allure_report')
    >>> assert_that(allure_report,
    ...             has_test_case('test_history_id',
    ...                           has_history_id()
    ...             )
    ... )
    """
    # The docstring above is written with doctest prompts (``>>>`` / ``...``),
    # so the actual check lives there and the function body does nothing.
    # NOTE(review): presumably the docstring is collected and run as a doctest
    # by the project's test run - confirm.
    pass  # a ``pytest.exit("Just test")`` call was left here commented out
def _connect_to_switch(self, prompt, timeout=20): """SSH connect to switch and wait until prompt string appeared. Args: prompt(str): expected CLI prompt. timeout(int): connection timeout. Returns: None Examples:: self._connect_to_switches(sw_keys=1, prompt="Switch ") """ cli_start_path = '' self.suite_logger.debug("Login on switch with login: {0} and expected prompt is: {1}".format(self.login, prompt)) self.conn.login(self.login, self.passw, timeout=self.timeout) self.suite_logger.debug("Create Shell") self.conn.open_shell(raw_output=True) self.conn.shell.settimeout(self.timeout) # lxc: run command "python main.py" if self.devtype == 'lxc': self.suite_logger.debug("Launched CLI on LXC") if os.path.exists(os.path.join(self.build_path, self.img_path, 'main.py')) is True: cli_start_path = os.path.join(self.build_path, self.img_path) self.conn.shell_command('cd %s && python main.py -a %s -p %s' % (cli_start_path, self.ipaddr, self.xmlrpcport), timeout=5, ret_code=False, quiet=True) else: self.suite_logger.error("Path to CLI image does not exist: %s" % (os.path.join(cli_start_path, 'main.py'))) pytest.exit("Path to CLI image does not exist: %s" % (os.path.join(cli_start_path, 'main.py'))) else: self.suite_logger.debug("Waiting for CLI on Real switch") alter = [] # Add one or few expected prompt(s) and action(s) to alternatives list if isinstance(prompt, str): prompt = [prompt] for single_prompt in prompt: alter.append((single_prompt, None, True, False)) iterations = 12 for i in range(iterations): time.sleep(10) login = self.conn.action_on_connect(self.conn.shell, alternatives=alter, timeout=timeout, is_shell=self.is_shell) if any(login.find(str(single_prompt)) != -1 for single_prompt in prompt): break else: self.suite_logger.debug("Waiting, current prompt is {0}".format(login)) if i == iterations: login = self.conn.action_on_expect(self.conn.shell, alternatives=alter, timeout=timeout, is_shell=self.is_shell)
def waiton(self, timeout=DEFAULT_SERVER_WAIT_ON_TIMEOUT):
    """Wait until device is fully operational.

    The device is probed roughly every 0.75 seconds until the probe reports
    it as up or the timeout elapses.

    Args:
        timeout(int): Wait timeout

    Raises:
        Exception: device doesn't respond before the timeout elapses

    Returns:
        dict: Status dictionary from probe method or raise an exception.

    Notes:
        A KeyboardInterrupt during probing sanitizes the device and stops
        the run with ``pytest.exit``.

    """
    message = "Waiting until device {0}({1}) is up.".format(self.name, self.ipaddr)
    self.class_logger.info(message)

    deadline = time.time() + timeout
    while True:
        if loggers.LOG_STREAM:
            # Progress indicator, one dot per probing round.
            sys.stdout.write(".")
            sys.stdout.flush()

        if time.time() >= deadline:
            # Time is elapsed.
            port = self._get_port_for_probe()
            message = "Timeout exceeded. IP address {0} port {1} doesn't respond.".format(
                self.ipaddr, port)
            self.class_logger.warning(message)
            raise Exception(message)

        # While time isn't elapsed continue probing switch.
        try:
            status = self.probe()
        except KeyboardInterrupt:
            message = "KeyboardInterrupt while checking switch {0}({1})...".format(
                self.name, self.ipaddr)
            self.class_logger.info(message)
            self.sanitize()
            pytest.exit(message)

        if status["isup"]:
            return status
        time.sleep(0.75)
def waiton(self, timeout=90):
    """Wait until switch is fully operational.

    The switch is probed roughly every 0.75 seconds until the probe reports
    a running "switchpp" instance or the timeout elapses.

    Args:
        timeout(int): Wait timeout

    Raises:
        SwitchException: device doesn't respond in time, answers on the port
            with something that is not switchpp, or reports empty Platform data

    Returns:
        dict: Status dictionary from probe method or raise an exception.

    Notes:
        A KeyboardInterrupt during probing sanitizes the device and stops
        the run with ``pytest.exit``.

    """
    status = None
    message = "Waiting until switch %s(%s) is up." % (self.name, self.ipaddr)
    self.class_logger.info(message)
    stop_flag = False
    end_time = time.time() + timeout
    while not stop_flag:
        if loggers.LOG_STREAM:
            sys.stdout.write(".")
            sys.stdout.flush()
        if time.time() < end_time:
            # While time isn't elapsed continue probing switch.
            try:
                status = self.probe()
            except KeyboardInterrupt:
                message = "KeyboardInterrupt while checking switch %s(%s)..." % (self.name, self.ipaddr)
                self.class_logger.info(message)
                self.sanitize()
                pytest.exit(message)
            if status["isup"] and status["type"] == "switchpp":
                stop_flag = True
                if status['prop'] == {}:
                    # If type == switchpp but prop == empty dict then Platform table return incorrect data.
                    message = "Found running switchpp on %s but Platform data is corrupted." % (self.ipaddr, )
                    self.class_logger.warning(message)
                    raise SwitchException(message)
                self.class_logger.info("Switch instance on %s(%s) is OK." % (self.name, self.ipaddr))
        else:
            # Time is elapsed.
            # ``status`` is still None when the timeout ran out before the
            # first probe (e.g. timeout <= 0). Subscripting it would raise a
            # TypeError and hide the real timeout error.
            if status is not None and status["isup"] and status["type"] != "switchpp":
                message = ("Port %s on host %s is opened but doesn't response queries." +
                           " %s Check your environment!") % (self.port, self.ipaddr, self.waiton_err_message)
            else:
                port = self._get_port_for_probe()
                message = "Timeout exceeded. IP address %s port %s doesn't respond" % (self.ipaddr, port)
            self.class_logger.warning(message)
            raise SwitchException(message)
        if not stop_flag:
            time.sleep(0.75)
    return status
def get(self, init_start=False, retry_count=7):
    """Get or start switch instance.

    The first attempt starts the switch (``init_start``) or waits for it;
    every following attempt restarts it. Retries are performed only when
    ``fail_ctrl`` is "restart".

    Args:
        init_start(bool): Perform switch start operation or not
        retry_count(int): Number of retries to start(restart) switch

    Returns:
        None or raise an exception.

    Notes:
        Also self.opts.fail_ctrl attribute affects logic of this method.
        fail_ctrl is set in py.test command line options (read py.test --help for more information).
        When the last attempt fails, the run is stopped with ``pytest.exit``
        unless fail_ctrl is "ignore", in which case the test is failed with
        ``pytest.fail``. A KeyboardInterrupt sanitizes the switch and stops
        the run.

    """
    # If fail_ctrl != "restart", restart retries won't be performed
    if self.opts.fail_ctrl != "restart":
        retry_count = 1

    for retry in range(retry_count):
        try:
            if retry == 0:
                if init_start:
                    self.start()
                else:
                    self.waiton()
            else:
                self.restart(mode=self.default_restart_type)
            break
        except KeyboardInterrupt:
            message = "KeyboardInterrupt while checking switch %s(%s)..." % (self.name, self.ipaddr)
            self.class_logger.info(message)
            self.sanitize()
            pytest.exit(message)
        except Exception:
            self.class_logger.warning("Error while checking switch %s(%s)..." % (self.name, self.ipaddr))
            failed_attempts = retry + 1
            message = "Error while checking switch %s(%s):\n%s" % (self.name, self.ipaddr, traceback.format_exc())
            sys.stderr.write(message)
            sys.stderr.flush()
            self.class_logger.error(message)
            if failed_attempts > 4:
                self.class_logger.warning(
                    "Could not complete switch start method for the fourth time. Trying to "
                    "reset the DB...")
                self.db_corruption = True
            # The loop variable never exceeds retry_count - 1, so the old
            # test ``retry >= retry_count + 1`` could never succeed and an
            # exhausted retry budget was silently ignored.
            if failed_attempts >= retry_count:
                message = "Could not complete start switch method after {0} retries. Something " \
                          "went wrong...\n".format(retry_count)
                sys.stderr.write(message)
                sys.stderr.flush()
                self.class_logger.error(message)
                if self.opts.fail_ctrl != "ignore":
                    pytest.exit(message)
                else:
                    pytest.fail(message)