我们从Python开源项目中,提取了以下47个代码示例,用于说明如何使用pytest.mark()。
def test_k8s_installed_default(self, underlay, k8s_actions): """Test for deploying an k8s environment and check it pytest.mark: k8s_installed_default Scenario: 1. Install k8s. 2. Check number of nodes. 3. Basic check of running containers on nodes. 4. Check requirement base settings. 5. Create nginx pod. 6. Check created pod is reached 7. Delete pod. """ k8s_actions.install_k8s() k8sclient = k8s_actions.api self.check_number_kube_nodes(underlay, k8sclient) self.check_list_required_images( underlay, required_images=self.base_images) self.check_etcd_health(underlay) nginx = self.get_nginx_spec() pod = k8s_actions.check_pod_create(body=nginx) self.check_nginx_pod_is_reached(underlay, pod.status.pod_ip) k8s_actions.check_pod_delete(pod)
def pytest_collection_modifyitems(self, session, config, items): env = config.getoption('--env') # TODO: Really naive, we need add a batch call. Until then, # this is going to be unworkable for anyone outside the # Toronto office... for item in items: for mark in self.get_marks(env=env, name=item.name): name = mark['name'] args = mark.get('args', []) kwargs = mark.get('kwargs', {}) pytest.log.info( "Applying {} mark to {}".format(name, item.name) ) mark = getattr(pytest.mark, name)(*args, **kwargs) item.add_marker(mark) # Proceed with the collection yield
def skip_by_skin_names(request, skin): """ Skip by skin name. We support validation for multi skin applications providing the best page object class match. We expect many failures we want to avoid because many tests will fail because the related page object implementation still not exists. If you want you can omit a test execution for a given skin adding a a ```@pytest.mark.skip_skins(['skin2'])``` decorator on your tests. Tests marked with a skin2 skip will be executed for all skins except for skin2. See http://bit.ly/2dYnOSv for further info. """ if request.node.get_marker('skip_skins'): if skin in request.node.get_marker('skip_skins').args[0]: pytest.skip('skipped on this skin: {}'.format(skin))
def __call__(self, fun): marker_name = self.marker_name or fun.__name__ @pytest.fixture() @wraps(fun) def _inner(request, *args, **kwargs): marker = request.node.get_marker(marker_name) print(request.node) return fun(request, *args, **dict(marker.kwargs, **kwargs)) def options(*args, **kwargs): return getattr(pytest.mark, marker_name)(*args, **kwargs) _inner.options = options _inner.__wrapped__ = fun return _inner
def test_query_one(): db = SqliteStore(config=MockConfig()) # Due to pytest.mark, this test runs after the insert, # so we should have this RegisteredCidr cidr = db.query_one(TEST_CIDR) assert cidr.cidr == TEST_CIDR assert cidr.description == TEST_DESCRIPTION assert cidr.location == TEST_LOCATION assert cidr.owner == TEST_OWNER assert cidr.expiration == TEST_EXPIRATION fake_cidr = db.query_one("FAKE CIDR") assert fake_cidr is None
def pytest_namespace(): return {'mark': MarkGenerator()}
def pytest_addoption(parser): group = parser.getgroup("general") group._addoption( '-k', action="store", dest="keyword", default='', metavar="EXPRESSION", help="only run tests which match the given substring expression. " "An expression is a python evaluatable expression " "where all names are substring-matched against test names " "and their parent classes. Example: -k 'test_method or test_" "other' matches all test functions and classes whose name " "contains 'test_method' or 'test_other'. " "Additionally keywords are matched to classes and functions " "containing extra names in their 'extra_keyword_matches' set, " "as well as functions which have names assigned directly to them." ) group._addoption( "-m", action="store", dest="markexpr", default="", metavar="MARKEXPR", help="only run tests matching given mark expression. " "example: -m 'mark1 and not mark2'." ) group.addoption( "--markers", action="store_true", help="show markers (builtin, plugin and per-project ones)." ) parser.addini("markers", "markers for test functions", 'linelist')
def pytest_cmdline_main(config): import _pytest.config if config.option.markers: config._do_configure() tw = _pytest.config.create_terminal_writer(config) for line in config.getini("markers"): name, rest = line.split(":", 1) tw.write("@pytest.mark.%s:" % name, bold=True) tw.line(rest) tw.line() config._ensure_unconfigure() return 0
def pytest_configure(config): import pytest if config.option.strict: pytest.mark._config = config
def __call__(self, *args, **kwargs): """ if passed a single callable argument: decorate it with mark info. otherwise add *args/**kwargs in-place to mark information. """ if args and not kwargs: func = args[0] is_class = inspect.isclass(func) if len(args) == 1 and (istestfunc(func) or is_class): if is_class: if hasattr(func, 'pytestmark'): mark_list = func.pytestmark if not isinstance(mark_list, list): mark_list = [mark_list] # always work on a copy to avoid updating pytestmark # from a superclass by accident mark_list = mark_list + [self] func.pytestmark = mark_list else: func.pytestmark = [self] else: holder = getattr(func, self.name, None) if holder is None: holder = MarkInfo( self.name, self.args, self.kwargs ) setattr(func, self.name, holder) else: holder.add(self.args, self.kwargs) return func kw = self.kwargs.copy() kw.update(kwargs) args = self.args + args return self.__class__(self.name, args=args, kwargs=kw)
def test_lcm_k8s_scale_up(self, hardware, underlay, k8scluster): """Test for scale an k8s environment pytest.mark: k8s_installed_default Require: - already installed k8s cluster with node roles 'k8s' - fuel-devops environment with additional node roles 'k8s_scale' Scenario: 1. Check number of kube nodes match underlay nodes. 2. Check etcd health. 3. Add to 'underlay' new nodes for k8s scale 4. Run fuel-ccp installer for old+new k8s nodes 5. Check number of kube nodes match underlay nodes. 6. Check etcd health. """ k8sclient = k8scluster.api self.check_number_kube_nodes(underlay, k8sclient) self.check_etcd_health(underlay) config_ssh_scale = hardware.get_ssh_data( roles=[ext.NODE_ROLE.k8s_scale]) underlay.add_config_ssh(config_ssh_scale) k8scluster.install_k8s() self.check_number_kube_nodes(underlay, k8sclient) self.check_etcd_health(underlay)
def test_k8s_installed_with_etcd_on_host(self, underlay, k8s_actions): """Test for deploying an k8s environment and check it pytest.mark: k8s_installed_with_etcd_on_host Scenario: 1. Install k8s with forced etcd on host. 2. Check number of nodes. 3. Basic check of running containers on nodes. 4. Check requirement base settings. 5. Create nginx pod. 6. Check created pod is reached 7. Delete pod. """ kube_settings = dict() kube_settings.update(self.kube_settings) kube_settings.update({ 'etcd_deployment_type': 'host', 'kube_network_plugin': 'calico' }) required_images = filter( lambda x: x != kube_settings.get( 'etcd_image_repo', settings.ETCD_IMAGE_REPO), self.custom_yaml_images) k8s_actions.install_k8s(custom_yaml=kube_settings) k8sclient = k8s_actions.api self.check_number_kube_nodes(underlay, k8sclient) self.check_list_required_images(underlay, required_images=required_images) self.check_etcd_health(underlay) nginx = self.get_nginx_spec() pod = k8s_actions.check_pod_create(body=nginx) self.check_nginx_pod_is_reached(underlay, pod.status.pod_ip) k8s_actions.check_pod_delete(pod)
def test_k8s_installed_with_etcd_in_container(self, underlay, k8s_actions): """Test for deploying an k8s environment and check it pytest.mark: k8s_installed_with_etcd_in_container Scenario: 1. Install k8s with forced etcd in container. 2. Check number of nodes. 3. Basic check of running containers on nodes. 4. Check requirement base settings. 5. Create nginx pod. 6. Check created pod is reached 7. Delete pod. """ kube_settings = dict() kube_settings.update(self.kube_settings) kube_settings.update({ 'etcd_deployment_type': 'docker', 'kube_network_plugin': 'calico', 'etcd_image_repo': settings.ETCD_IMAGE_REPO, 'etcd_image_tag': settings.ETCD_IMAGE_TAG, }) required_images = list(self.base_images) required_images.append(kube_settings['etcd_image_repo']) k8s_actions.install_k8s(custom_yaml=kube_settings) k8sclient = k8s_actions.api self.check_number_kube_nodes(underlay, k8sclient) self.check_list_required_images(underlay, required_images=required_images) self.check_etcd_health(underlay) nginx = self.get_nginx_spec() pod = k8s_actions.check_pod_create(body=nginx) self.check_nginx_pod_is_reached(underlay, pod.status.pod_ip) k8s_actions.check_pod_delete(pod)
def test_k8s_installed_with_ready_ssh_keys(self, ssh_keys_dir, underlay, k8s_actions): """Test for deploying an k8s environment and check it pytest.mark: k8s_installed_with_ready_ssh_keys Scenario: 1. Install k8s (with prepared ssh keys). 2. Check number of nodes. 3. Basic check of running containers on nodes. 4. Check requirement base settings. 5. Create nginx pod. 6. Check created pod is reached 7. Delete pod. """ add_var = { "WORKSPACE": ssh_keys_dir } k8s_actions.install_k8s(env_var=add_var) k8sclient = k8s_actions.api self.check_number_kube_nodes(underlay, k8sclient) self.check_list_required_images( underlay, required_images=self.base_images) self.check_etcd_health(underlay) nginx = self.get_nginx_spec() pod = k8s_actions.check_pod_create(body=nginx) self.check_nginx_pod_is_reached(underlay, pod.status.pod_ip) k8s_actions.check_pod_delete(pod)
def test_ccp_idempotency_with_etcd_on_host(self, config, underlay, k8s_actions): """Test for deploying an k8s environment and check it pytest.mark: k8s_installed_with_etcd_on_host Scenario: 1. Install k8s with forced etcd on host. 2. Check that count of changes in the ansible log is more than 0 3. Re-install k8s with forced etcd on host. 4. Check that count of changes in the ansible log is 0 """ kube_settings = dict() kube_settings.update(self.kube_settings) kube_settings.update({ 'etcd_deployment_type': 'host', 'kube_network_plugin': 'calico' }) result = k8s_actions.install_k8s(custom_yaml=kube_settings, verbose=False) changed = self.get_ansible_changes_count(result.stdout_str) assert changed != 0, "No changes during k8s install!" result = k8s_actions.install_k8s(custom_yaml=kube_settings, verbose=False) changed = self.get_ansible_changes_count(result.stdout_str) assert changed == 0, ( "Should be no changes during the second install " "of k8s while there are '{0}' changes!".format(changed))
def test_ccp_idempotency_with_etcd_in_container(self, config, underlay, k8s_actions): """Test for deploying an k8s environment and check it pytest.mark: k8s_installed_with_etcd_in_container Scenario: 1. Install k8s with forced etcd in container. 2. Check that count of changes in the ansible log is more than 0 3. Re-install k8s with forced etcd in container. 4. Check that count of changes in the ansible log is 0 """ kube_settings = dict() kube_settings.update(self.kube_settings) kube_settings.update({ 'etcd_deployment_type': 'docker', 'kube_network_plugin': 'calico', 'etcd_image_repo': settings.ETCD_IMAGE_REPO, 'etcd_image_tag': settings.ETCD_IMAGE_TAG, }) result = k8s_actions.install_k8s(custom_yaml=kube_settings, verbose=False) changed = self.get_ansible_changes_count(result.stdout_str) assert changed != 0, "No changes during k8s install!" result = k8s_actions.install_k8s(custom_yaml=kube_settings, verbose=False) changed = self.get_ansible_changes_count(result.stdout_str) assert changed == 0, ( "Should be no changes during the second install " "of k8s while there are '{0}' changes!".format(changed))
def test_ccp_idempotency_with_ready_ssh_keys(self, ssh_keys_dir, config, underlay, k8s_actions): """Test for deploying an k8s environment and check it pytest.mark: k8s_installed_with_ready_ssh_keys Scenario: 1. Install k8s (with prepared ssh keys). 2. Check that count of changes in the ansible log is more than 0 3. Re-install k8s (with prepared ssh keys). 4. Check that count of changes in the ansible log is 0 """ add_var = { "WORKSPACE": ssh_keys_dir } result = k8s_actions.install_k8s(env_var=add_var, verbose=False) changed = self.get_ansible_changes_count(result.stdout_str) assert changed != 0, "No changes during k8s install!" result = k8s_actions.install_k8s(env_var=add_var, verbose=False) changed = self.get_ansible_changes_count(result.stdout_str) assert changed == 0, ( "Should be no changes during the second install " "of k8s while there are '{0}' changes!".format(changed))
def case(*ids): """ Decorator to mark tests with testcase ids. ie. @pytestrail.case('C123', 'C12345') :return pytest.mark: """ return pytest.mark.testrail(ids=ids)
def testrail(*ids): """ Decorator to mark tests with testcase ids. ie. @testrail('C123', 'C12345') :return pytest.mark: """ deprecation_msg = ('pytest_testrail: the @testrail decorator is deprecated and will be removed. Please use the ' '@pytestrail.case decorator instead.') warnings.warn(deprecation_msg, DeprecatedTestDecorator) return pytestrail.case(*ids)
def get_marks(self, **params): api_url = urljoin(self.url, '/v1/mark') response = self.session.get(api_url, params=params) return response.json()
def filter_fixtures(all_fixtures, fixtures_base_dir, mark_fn=None, ignore_fn=None): """ Helper function for filtering test fixtures. - `fixtures_base_dir` should be the base directory that the fixtures were collected from. - `mark_fn` should be a function which either returns `None` or a `pytest.mark` object. - `ignore_fn` should be a function which returns `True` for any fixture which should be ignored. """ for fixture_data in all_fixtures: fixture_path = fixture_data[0] fixture_relpath = os.path.relpath(fixture_path, fixtures_base_dir) if ignore_fn: if ignore_fn(fixture_relpath, *fixture_data[1:]): continue if mark_fn is not None: mark = mark_fn(fixture_relpath, *fixture_data[1:]) if mark: yield pytest.param( (fixture_path, *fixture_data[1:]), marks=mark, ) continue yield fixture_data
def _backend(request): backend = request.param request.applymarker(getattr(pytest.mark, backend.__name__)()) inst = backend(**backend.settings_for('test', 'test', request.fixturename)) yield inst inst.unset_all()
def step(function): """Allow pytest -m stepX to run test up to a certain number.""" step_number = len(_steps) + 1 step_only_marker = "step{}only".format(step_number) marker_only = getattr(pytest.mark, step_only_marker) step_marker = "step{}".format(step_number) marker = getattr(pytest.mark, step_marker) def mark_function(marker): marker(function) for mark_step in _steps: mark_step(marker) _steps.append(mark_function) return marker_only(marker(function))
def pytest_configure(config): # register an additional marker config.addinivalue_line( "markers", "skip_skins(skins): mark test to be skipped for the given skin ids" )
def skip_skins(skins): """ Decorator to mark tests to be skipped for the given skin ids. ie. @skip_skins(['skin1', 'skin2']) :return pytest.mark: """ return pytest.mark.skip_skins(skins)
def test_namespace_has_default_and_env_plugins(testdir): p = testdir.makepyfile(""" import pytest pytest.mark """) result = testdir.runpython(p) assert result.ret == 0
def pytest_addoption(parser): group = parser.getgroup("general") group._addoption( '-k', action="store", dest="keyword", default='', metavar="EXPRESSION", help="only run tests which match the given substring expression. " "An expression is a python evaluatable expression " "where all names are substring-matched against test names " "and their parent classes. Example: -k 'test_method or test " "other' matches all test functions and classes whose name " "contains 'test_method' or 'test_other'. " "Additionally keywords are matched to classes and functions " "containing extra names in their 'extra_keyword_matches' set, " "as well as functions which have names assigned directly to them." ) group._addoption( "-m", action="store", dest="markexpr", default="", metavar="MARKEXPR", help="only run tests matching given mark expression. " "example: -m 'mark1 and not mark2'." ) group.addoption( "--markers", action="store_true", help="show markers (builtin, plugin and per-project ones)." ) parser.addini("markers", "markers for test functions", 'linelist')
def decorate_as_label(self, label_type, labels): allure_label_marker = '{prefix}.{label_type}'.format(prefix=ALLURE_LABEL_PREFIX, label_type=label_type) allure_label = getattr(pytest.mark, allure_label_marker) return allure_label(*labels, label_type=label_type)
def decorate_as_link(self, url, link_type, name): allure_link_marker = '{prefix}.{link_type}'.format(prefix=ALLURE_LINK_PREFIX, link_type=link_type) pattern = dict(self.config.option.allure_link_pattern).get(link_type, u'{}') url = pattern.format(url) allure_link = getattr(pytest.mark, allure_link_marker) return allure_link(url, name=name, link_type=link_type)