We extracted the following 49 code examples from open source Python projects to illustrate how pytest.skip() is used.
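Before the collected examples, here is a minimal sketch of the basic pattern: calling pytest.skip() imperatively inside a test (or fixture) aborts it immediately and reports it as skipped with the given reason. The test name and the platform check below are illustrative only and are not taken from any of the projects listed here.

import sys

import pytest


def test_requires_linux():
    # Imperative skip: the rest of the test body is not executed.
    if not sys.platform.startswith('linux'):
        pytest.skip('requires a Linux platform')
    assert sys.platform.startswith('linux')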
def collect(self): import doctest if self.fspath.basename == "conftest.py": module = self.config.pluginmanager._importconftest(self.fspath) else: try: # XXX patch pyimport in pytest._pytest.doctest.DoctestModule module = _patch_pyimport(self.fspath) except ImportError: if self.config.getoption('--cython-ignore-import-errors'): pytest.skip('unable to import module %r' % self.fspath) else: raise # uses internal doctest module parsing mechanism finder = doctest.DocTestFinder() optionflags = get_optionflags(self) checker = None if _get_checker is None else _get_checker() runner = doctest.DebugRunner(verbose=0, optionflags=optionflags, checker=checker) for test in finder.find(module, module.__name__): if test.examples: # skip empty doctests yield DoctestItem(test.name, self, runner, test)
def test_no_underscores_all_dashes(requirements_files=REQUIREMENTS_FILES):
    if all(
            not os.path.exists(reqfile)
            for reqfile in requirements_files
    ):  # pragma: no cover
        pytest.skip('No requirements files found')

    for requirement_file in requirements_files:
        if not os.path.exists(requirement_file):
            continue

        for line in get_lines_from_file(requirement_file):
            if '_' in line:
                raise AssertionError(
                    'Use dashes for package names {}: {}'.format(
                        requirement_file, line,
                    )
                )
def install_server(db):
    db.socket = str(db.installation_dir.join('mysql.sock'))
    db.installation_dir.ensure_dir('tmp')
    mycnf = db.installation_dir.join('my.cnf')
    mycnf.write('[mysqld]\n'
                'datadir=%(installation_dir)s\n'
                #'log\n'
                #'log-error\n'
                'skip-networking\n'
                'socket=mysql.sock\n'
                'pid-file=mysqld.pid\n'
                'tmpdir=tmp\n' % {'installation_dir': db.installation_dir})
    subprocess.check_call(['mysql_install_db', '--defaults-file=' + str(mycnf)])
    server = subprocess.Popen(['mysqld', '--defaults-file=' + str(mycnf)])
    import time, MySQLdb
    tries = 30
    for t in range(tries):
        try:
            with db.root() as cur:
                cur.execute("CREATE USER 'abe'@'localhost' IDENTIFIED BY 'Bitcoin'")
            return server
        except MySQLdb.OperationalError as e:
            if t + 1 == tries:
                raise e
            time.sleep(1)
def setUp(self):
    if self._should_be_skipped_due_to_version():
        pytest.skip('Test cannot run with Python %s.' % (sys.version.split(' ')[0],))
    missing = []
    for req in self._test_file.options['requires']:
        try:
            __import__(req)
        except ImportError:
            missing.append(req)
    if missing:
        pytest.skip('Requires %s to be present.' % (','.join(missing),))
    if self._test_file.options['except_implementations']:
        implementations = [
            item.strip() for item in
            self._test_file.options['except_implementations'].split(",")
        ]
        implementation = platform.python_implementation()
        if implementation in implementations:
            pytest.skip(
                'Test cannot run with Python implementation %r'
                % (implementation, ))
def test_cXY_E0(nr_sites, gamma, rgen, ldim=2):
    if sys.version_info[:2] == (3, 3) and gamma == -0.5:
        # Skip this test on Python 3.3 because it fails on Travis (but
        # only for Python 3.3). eigsh() fails with:
        # scipy.sparse.linalg.eigen.arpack.arpack.ArpackNoConvergence:
        # ARPACK error -1: No convergence (xxx iterations, 0/1
        # eigenvectors converged) [ARPACK error -14: ZNAUPD did not
        # find any eigenvalues to sufficient accuracy.]
        pt.skip("Test fails on Travis for unknown reason")
        return
    # Verify that the analytical solution of the ground state energy
    # matches the numerical value from eigsh()
    E0 = physics.cXY_E0(nr_sites, gamma)
    H = physics.sparse_cH(physics.cXY_local_terms(nr_sites, gamma))
    # Fix start vector for eigsh()
    v0 = rgen.randn(ldim**nr_sites) + 1j * rgen.randn(ldim**nr_sites)
    ev = eigsh(H, k=1, which='SR', v0=v0, return_eigenvectors=False).min()
    assert abs(E0 - ev) <= 1e-13
def spawn(self, cmd, expect_timeout=10.0):
    """Run a command using pexpect.

    The pexpect child is returned.
    """
    pexpect = pytest.importorskip("pexpect", "3.0")
    if hasattr(sys, 'pypy_version_info') and '64' in platform.machine():
        pytest.skip("pypy-64 bit not supported")
    if sys.platform == "darwin":
        pytest.xfail("pexpect does not work reliably on darwin?!")
    if sys.platform.startswith("freebsd"):
        pytest.xfail("pexpect does not work reliably on freebsd")
    logfile = self.tmpdir.join("spawn.out").open("wb")
    child = pexpect.spawn(cmd, logfile=logfile)
    self.request.addfinalizer(logfile.close)
    child.timeout = expect_timeout
    return child
def pytest_runtest_setup(item):
    # Check if skip or skipif are specified as pytest marks
    skipif_info = item.keywords.get('skipif')
    if isinstance(skipif_info, (MarkInfo, MarkDecorator)):
        eval_skipif = MarkEvaluator(item, 'skipif')
        if eval_skipif.istrue():
            item._evalskip = eval_skipif
            pytest.skip(eval_skipif.getexplanation())

    skip_info = item.keywords.get('skip')
    if isinstance(skip_info, (MarkInfo, MarkDecorator)):
        item._evalskip = True
        if 'reason' in skip_info.kwargs:
            pytest.skip(skip_info.kwargs['reason'])
        elif skip_info.args:
            pytest.skip(skip_info.args[0])
        else:
            pytest.skip("unconditional skip")

    item._evalxfail = MarkEvaluator(item, 'xfail')
    check_xfail_no_run(item)
def getoption(self, name, default=notset, skip=False):
    """ return command line option value.

    :arg name: name of the option. You may also specify
        the literal ``--OPT`` option instead of the "dest" option name.
    :arg default: default value if no option of that name exists.
    :arg skip: if True raise pytest.skip if option does not exist
        or has a None value.
    """
    name = self._opt2dest.get(name, name)
    try:
        val = getattr(self.option, name)
        if val is None and skip:
            raise AttributeError(name)
        return val
    except AttributeError:
        if default is not notset:
            return default
        if skip:
            import pytest
            pytest.skip("no %r option found" % (name,))
        raise ValueError("no option named %r" % (name,))
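The getoption() helper above is what makes option-dependent skipping convenient from test code. A minimal usage sketch follows, assuming an illustrative --profile-dir option; the option name and the fixture are hypothetical and not part of pytest itself.

import pytest


@pytest.fixture
def profile_dir(request):
    # With skip=True, a missing or None-valued option skips the test
    # that requested this fixture instead of raising an error.
    return request.config.getoption('--profile-dir', skip=True)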
def collect(self):
    import doctest
    if self.fspath.basename == "conftest.py":
        module = self.config.pluginmanager._importconftest(self.fspath)
    else:
        try:
            module = self.fspath.pyimport()
        except ImportError:
            if self.config.getvalue('doctest_ignore_import_errors'):
                pytest.skip('unable to import module %r' % self.fspath)
            else:
                raise
    # uses internal doctest module parsing mechanism
    finder = doctest.DocTestFinder()
    optionflags = get_optionflags(self)
    runner = doctest.DebugRunner(verbose=0, optionflags=optionflags,
                                 checker=_get_checker())
    for test in finder.find(module, module.__name__):
        if test.examples:  # skip empty doctests
            yield DoctestItem(test.name, self, runner, test)
def test_setup_teardown_sequence_numbers(self):
    if list(self.kwargs.keys()) == ['binary_customisation_script']:
        pytest.skip('Test only applies to chroot hooks.')
    generate_build_config._write_cloud_config(
        open(self.output_file.strpath, 'w'), **self.kwargs)
    cloud_config = yaml.load(self.output_file.open())
    sequence_numbers = {}
    for stanza in cloud_config['write_files']:
        sequence_number = stanza['path'].rsplit('/')[-1].split('-')[0]
        content = base64.b64decode(stanza['content']).decode('utf-8')
        if '-- chroot --' in content:
            sequence_numbers['chroot'] = sequence_number
        elif '-- setup --' in content:
            sequence_numbers['setup'] = sequence_number
        elif '-- teardown --' in content:
            sequence_numbers['teardown'] = sequence_number
    assert sequence_numbers['setup'] < sequence_numbers['chroot']
    assert sequence_numbers['chroot'] < sequence_numbers['teardown']
def test_setup_teardown_content_matches_template(self, hook, monkeypatch):
    if list(self.kwargs.keys()) == ['binary_customisation_script']:
        pytest.skip('Test only applies to chroot hooks.')
    expected_string = '#!/bin/sh\n-- specific test content --'
    monkeypatch.setattr(
        generate_build_config, "{}_CONTENT".format(hook.upper()),
        expected_string)
    generate_build_config._write_cloud_config(
        open(self.output_file.strpath, 'w'), **self.kwargs)
    cloud_config = yaml.load(self.output_file.open())
    contents = [base64.b64decode(stanza['content'])
                for stanza in cloud_config['write_files']]
    expected_bytes = expected_string.encode('utf-8')
    assert expected_bytes in contents
    assert 1 == len(
        [content for content in contents if expected_bytes == content])
def _get_style_test_options(filename):
    """ Returns (skip, ignores) for the specified source file.
    """
    skip = False
    ignores = []
    text = open(filename, 'rb').read().decode('utf-8')
    # Iterate over lines
    for i, line in enumerate(text.splitlines()):
        if i > 20:
            break
        if line.startswith('# styletest:'):
            if 'skip' in line:
                skip = True
            elif 'ignore' in line:
                words = line.replace(',', ' ').split(' ')
                words = [w.strip() for w in words if w.strip()]
                words = [w for w in words
                         if (w[1:].isnumeric() and w[0] in 'EWFCN')]
                ignores.extend(words)
    return skip, ignores
def test_batch_log_pdf_mask(dist):
    if dist.get_test_distribution_name() not in ('Normal', 'Bernoulli', 'Categorical'):
        pytest.skip('Batch pdf masking not supported for the distribution.')
    d = dist.pyro_dist
    for idx in range(dist.get_num_test_data()):
        dist_params = dist.get_dist_params(idx)
        x = dist.get_test_data(idx)
        with xfail_if_not_implemented():
            batch_pdf_shape = d.batch_shape(**dist_params) + (1,)
            batch_pdf_shape_broadcasted = d.batch_shape(x, **dist_params) + (1,)
            zeros_mask = ng_zeros(1)  # should be broadcasted to data dims
            ones_mask = ng_ones(batch_pdf_shape)  # should be broadcasted to data dims
            half_mask = ng_ones(1) * 0.5
            batch_log_pdf = d.batch_log_pdf(x, **dist_params)
            batch_log_pdf_zeros_mask = d.batch_log_pdf(x, log_pdf_mask=zeros_mask, **dist_params)
            batch_log_pdf_ones_mask = d.batch_log_pdf(x, log_pdf_mask=ones_mask, **dist_params)
            batch_log_pdf_half_mask = d.batch_log_pdf(x, log_pdf_mask=half_mask, **dist_params)
            assert_equal(batch_log_pdf_ones_mask, batch_log_pdf)
            assert_equal(batch_log_pdf_zeros_mask, ng_zeros(batch_pdf_shape_broadcasted))
            assert_equal(batch_log_pdf_half_mask, 0.5 * batch_log_pdf)
def test_uint64(self):
    if sys.maxsize != (1 << 63) - 1:
        pytest.skip('64 bit only')
    if IS_PYPY and sys.pypy_version_info < (5, 6):
        pytest.skip('Broken on PyPy<5.6')
    #
    buf = struct.pack('QQ', sys.maxsize, sys.maxsize + 1)
    s = BaseSegment(buf)
    #
    val = s.read_uint64_magic(0)
    assert val == sys.maxsize == s.read_uint64(0)
    assert type(val) is int
    #
    val = s.read_primitive(0, ord('Q'))
    assert val == sys.maxsize == s.read_uint64(0)
    assert type(val) is int
    #
    val = s.read_uint64_magic(8)
    assert val == sys.maxsize + 1 == s.read_uint64(8)
    if PY3:
        assert type(val) is int
    else:
        assert type(val) is long
def test_copy_pointer(self, schema, benchmark):
    # this is similar to test_struct, but the struct we set has a very
    # deep structure, which means that we are effectively measuring the
    # performance of copy_pointer
    if schema.__name__ not in ('Capnpy', 'PyCapnp'):
        pytest.skip('N/A')
    #
    #self._make_big_tree() # uncomment this if you want to regenerate the file
    s = self.BIG_TREE.read("rb")
    tree = schema.Tree.loads(s)

    def loop(oldtree):
        for i in range(1000):
            new_tree = schema.Tree(oldtree.root)
        return new_tree

    new_tree = benchmark(loop, tree)
    assert new_tree.root.x == 9999
def test_copy_buffer(self, schema, benchmark):
    # this is not really a dumps, but it is used as a baseline to compare
    # the performance
    if schema.__name__ != 'Capnpy':
        pytest.skip('N/A')
    #
    def dumps_N(obj):
        myobjs = (obj, obj)
        res = 0
        for i in range(self.N):
            obj = myobjs[i % 2]
            res = obj._seg.buf[:]
        return res
    #
    obj = get_obj(schema)
    res = benchmark(dumps_N, obj)
    assert type(res) is six.binary_type
def test_dumps_not_compact(self, schema, benchmark):
    if schema.__name__ != 'Capnpy':
        pytest.skip('N/A')
    #
    def dumps_N(obj):
        myobjs = (obj, obj)
        res = 0
        for i in range(self.N):
            obj = myobjs[i % 2]
            res = obj.dumps()
        return res
    #
    obj = get_obj(schema)
    container = schema.MyStructContainer(items=[obj, obj])
    obj0 = container.items[0]
    assert not obj0._is_compact()
    res = benchmark(dumps_N, obj0)
    assert type(res) is six.binary_type
def test_dumps_not_compact_no_fastpath(self, schema, benchmark):
    if schema.__name__ != 'Capnpy':
        pytest.skip('N/A')
    #
    def dumps_N(obj):
        myobjs = (obj, obj)
        res = 0
        for i in range(self.N):
            obj = myobjs[i % 2]
            res = obj.dumps(fastpath=False)
        return res
    #
    obj = get_obj(schema)
    container = schema.MyStructContainer(items=[obj, obj])
    obj0 = container.items[0]
    assert not obj0._is_compact()
    res = benchmark(dumps_N, obj0)
    assert type(res) is six.binary_type
async def test_count(self, test_db):
    assert 0 == await test_db.test.find().count()
    await test_db.test.insert_many([{'x': i} for i in range(10)])
    assert 10 == await test_db.test.find().count()
    assert isinstance(await test_db.test.find().count(), int)
    assert 10 == await test_db.test.find().limit(5).count()
    assert 10 == await test_db.test.find().skip(5).count()

    assert 1 == await test_db.test.find({'x': 1}).count()
    assert 5 == await test_db.test.find({'x': {'$lt': 5}}).count()

    a = test_db.test.find()
    b = await a.count()
    async for _ in a:
        break
    assert b == await a.count()

    assert 0 == await test_db.test.acollectionthatdoesntexist.find().count()
async def test_count_with_limit_and_skip(self, test_db):
    with pytest.raises(TypeError):
        await test_db.test.find().count('foo')

    async def check_len(cursor, length):
        assert len(await cursor.to_list()) == await cursor.count(True)
        assert length == await cursor.count(True)

    await test_db.test.insert_many([{'i': i} for i in range(100)])

    await check_len(test_db.test.find(), 100)

    await check_len(test_db.test.find().limit(10), 10)
    await check_len(test_db.test.find().limit(110), 100)

    await check_len(test_db.test.find().skip(10), 90)
    await check_len(test_db.test.find().skip(110), 0)

    await check_len(test_db.test.find().limit(10).skip(10), 10)
    await check_len(test_db.test.find().limit(10).skip(95), 5)
async def test_profiling_info(self, mongo, test_db):
    connection = await mongo.get_connection()
    if connection.is_mongos:
        pytest.skip('Profiling works only without mongos.')
        return
    await test_db.system.profile.drop()
    await test_db.set_profiling_level(ALL)
    await test_db.test.find_one()
    await test_db.set_profiling_level(OFF)

    info = await test_db.profiling_info()
    assert isinstance(info, list)

    assert len(info) >= 1
    # These basically clue us in to server changes.
    assert isinstance(info[0]['responseLength'], int)
    assert isinstance(info[0]['millis'], int)
    assert isinstance(info[0]['client'], str)
    assert isinstance(info[0]['user'], str)
    assert isinstance(info[0]['ns'], str)
    assert isinstance(info[0]['op'], str)
    assert isinstance(info[0]['ts'], datetime.datetime)
def skip_by_version(request, openshift_version):
    if request.node.cls.tasks.get('version_limits') and openshift_version:
        lowest_version = request.node.cls.tasks['version_limits'].get('min')
        highest_version = request.node.cls.tasks['version_limits'].get('max')
        skip_latest = request.node.cls.tasks['version_limits'].get('skip_latest')

        too_low = lowest_version and parse_version(lowest_version) > parse_version(openshift_version)
        too_high = highest_version and parse_version(highest_version) < parse_version(openshift_version)

        if openshift_version == 'latest':
            if skip_latest:
                pytest.skip('This API is not supported in the latest openshift version')
        elif too_low:
            pytest.skip('This API is not supported in openshift versions > {}. You are using version {}'.format(lowest_version, openshift_version))
        elif too_high:
            pytest.skip('This API is not supported in openshift versions < {}. You are using version {}'.format(highest_version, openshift_version))
def test_comm_broadcast_op(hetr_device):
    if hetr_device == 'gpu':
        pytest.skip('gpu communication broadcast op is not supported.')
    H = ng.make_axis(length=4, name='height')
    N = ng.make_axis(length=8, name='batch')
    weight = ng.make_axis(length=2, name='weight')

    x = ng.placeholder(axes=[N, H])
    # w will be broadcasted to devices
    w = ng.placeholder(axes=[H, weight])

    with ng.metadata(device_id=('0', '1'), parallel=N):
        dot = ng.dot(x, w)

    np_x = np.random.randint(100, size=[N.length, H.length])
    np_weight = np.random.randint(100, size=[H.length, weight.length])
    with ExecutorFactory() as ex:
        computation = ex.executor(dot, x, w)
        res = computation(np_x, np_weight)
        np.testing.assert_array_equal(res, np.dot(np_x, np_weight))
def test_hetr_benchmark(hetr_device, config):
    pytest.skip('Possible issue only on jenkins, disable until figured out.')
    """
    Description:
        Test to ensure benchmarks are working. Benchmark used for test is mini_resnet
    """
    from examples.benchmarks.mini_resnet import run_resnet_benchmark
    c = config
    run_resnet_benchmark(dataset=c['dataset'],
                         num_iterations=c['iter_count'],
                         n_skip=1,
                         batch_size=c['batch_size'],
                         device_id=c['device_id'],
                         transformer_type='hetr',
                         device=hetr_device,
                         bprop=c['bprop'],
                         batch_norm=c['batch_norm'],
                         visualize=False)
def test_allreduce_hint(hetr_device, config):
    if hetr_device == 'gpu':
        if 'gpu' not in ngt.transformer_choices():
            pytest.skip("GPUTransformer not available")

    input = config['input']
    device_id = config['device_id']
    axis_A = ng.make_axis(length=4, name='axis_A')
    parallel_axis = ng.make_axis(name='axis_parallel', length=16)

    with ng.metadata(device=hetr_device,
                     device_id=device_id,
                     parallel=parallel_axis):
        var_A = ng.variable(axes=[axis_A], initial_value=UniformInit(1, 1))
        var_B = ng.variable(axes=[axis_A], initial_value=UniformInit(input, input))
        var_B.metadata['reduce_func'] = 'sum'
        var_B_mean = var_B / len(device_id)
        var_minus = (var_A - var_B_mean)

    with closing(ngt.make_transformer_factory('hetr', device=hetr_device)()) as hetr:
        out_comp = hetr.computation(var_minus)
        result = out_comp()
        np_result = np.full((axis_A.length), config['expected_result'], np.float32)
        np.testing.assert_array_equal(result, np_result)
def test_multiple_gather_ops(hetr_device):
    if hetr_device == 'gpu':
        if 'gpu' not in ngt.transformer_choices():
            pytest.skip("GPUTransformer not available")
        pytest.xfail("Failure due to gather recv tensor being returned in wrong shape, "
                     " possible mismatch between op layout and op.tensor layout")

    H = ng.make_axis(length=2, name='height')
    W = ng.make_axis(length=4, name='width')
    x = ng.placeholder(axes=[H, W])
    with ng.metadata(device_id=('0', '1'), parallel=W):
        x_plus_one = x + 1

    x_mul_two = x_plus_one * 2

    input = np.random.randint(100, size=x.axes.lengths)
    with closing(ngt.make_transformer_factory('hetr', device=hetr_device)()) as hetr:
        plus = hetr.computation([x_mul_two, x_plus_one], x)
        result_mul_two, result_plus_one = plus(input)

        np.testing.assert_array_equal(result_plus_one, input + 1)
        np.testing.assert_array_equal(result_mul_two, (input + 1) * 2)
def pytest_configure(config):
    # when marking argon_disabled for a whole test, but flex_disabled only on one
    # parametrized version of that test, the argon marking disappeared
    config.flex_and_argon_disabled = pytest.mark.xfail(config.getvalue("transformer") == "flexgpu" or
                                                       config.getvalue("transformer") == "argon",
                                                       reason="Not supported by argon or flex backend",
                                                       strict=True)

    config.argon_disabled = pytest.mark.xfail(config.getvalue("transformer") == "argon",
                                              reason="Not supported by argon backend",
                                              strict=True)

    config.flex_disabled = pytest.mark.xfail(config.getvalue("transformer") == "flexgpu",
                                             reason="Failing test for Flex",
                                             strict=True)

    config.hetr_and_cpu_enabled_only = pytest.mark.xfail(config.getvalue("transformer") != "hetr" and
                                                         config.getvalue("transformer") != "cpu",
                                                         reason="Only Hetr/CPU and CPU transformers supported",
                                                         strict=True)

    config.flex_skip = pytest.mark.skipif(config.getvalue("transformer") == "flexgpu",
                                          reason="Randomly failing test for Flex")

    config.argon_skip = pytest.mark.skipif(config.getvalue("transformer") == "argon")

    config.flex_skip_now = pytest.skip if config.getvalue("transformer") == "flexgpu" \
        else pass_method

    config.argon_skip_now = pytest.skip if config.getvalue("transformer") == "argon" \
        else pass_method
def test_matmul(self):
    if not TEST_MATMUL:
        pytest.skip("matmul is only tested in Python 3.5+")

    D = {'shape': self.A.shape,
         'matvec': lambda x: np.dot(self.A, x).reshape(self.A.shape[0]),
         'rmatvec': lambda x: np.dot(self.A.T.conj(), x).reshape(self.A.shape[1]),
         'matmat': lambda x: np.dot(self.A, x)}
    A = interface.LinearOperator(**D)
    B = np.array([[1, 2, 3],
                  [4, 5, 6],
                  [7, 8, 9]])
    b = B[0]

    assert_equal(operator.matmul(A, b), A * b)
    assert_equal(operator.matmul(A, B), A * B)
    assert_raises(ValueError, operator.matmul, A, 2)
    assert_raises(ValueError, operator.matmul, 2, A)
def test_geth_installation_as_function_call(monkeypatch, tmpdir, platform, version):
    if get_platform() != platform:
        pytest.skip("Wrong platform for install script")

    base_install_path = str(tmpdir.mkdir("temporary-dir"))

    monkeypatch.setenv('GETH_BASE_INSTALL_PATH', base_install_path)

    # sanity check that it's not already installed.
    executable_path = get_executable_path(version)
    assert not os.path.exists(executable_path)

    install_geth(identifier=version, platform=platform)

    assert os.path.exists(executable_path)

    monkeypatch.setenv('GETH_BINARY', executable_path)

    actual_version = get_geth_version()
    expected_version = semantic_version.Spec(version.lstrip('v'))

    assert actual_version in expected_version
def test_unsigned_to_eip155_signed_transaction(txn_fixture, transaction_class):
    if txn_fixture['chainId'] is None:
        pytest.skip('No chain id for EIP155 signing')
    elif not hasattr(transaction_class, 'chain_id'):
        pytest.skip('Transaction class is not chain aware')

    key = keys.PrivateKey(decode_hex(txn_fixture['key']))
    unsigned_txn = transaction_class.create_unsigned_transaction(
        nonce=txn_fixture['nonce'],
        gas_price=txn_fixture['gasPrice'],
        gas=txn_fixture['gas'],
        to=(
            to_canonical_address(txn_fixture['to'])
            if txn_fixture['to'] else b''
        ),
        value=txn_fixture['value'],
        data=decode_hex(txn_fixture['data']),
    )
    signed_txn = unsigned_txn.as_signed_transaction(key, chain_id=txn_fixture['chainId'])

    assert is_same_address(signed_txn.sender, key.public_key.to_canonical_address())
    assert signed_txn.chain_id == txn_fixture['chainId']
def fixture_vm_class(fixture_data):
    _, _, fork_name, _ = fixture_data
    if fork_name == 'Frontier':
        return FrontierVMForTesting
    elif fork_name == 'Homestead':
        return HomesteadVMForTesting
    elif fork_name == 'EIP150':
        return EIP150VMForTesting
    elif fork_name == 'EIP158':
        return SpuriousDragonVMForTesting
    elif fork_name == 'Byzantium':
        return ByzantiumVMForTesting
    elif fork_name == 'Constantinople':
        pytest.skip("Constantinople VM has not been implemented")
    elif fork_name == 'Metropolis':
        pytest.skip("Metropolis VM has not been implemented")
    else:
        raise ValueError("Unknown Fork Name: {0}".format(fork_name))
def sample_result(collector_cls, collector_cls_with_sample_result):
    if collector_cls != collector_cls_with_sample_result[0]:
        pytest.skip('this sample result is not for this plugin')

    result = collector_cls_with_sample_result[-1]
    return result
def test_plat_name_ext(temp_ext_pkg):
    try:
        subprocess.check_call(
            [sys.executable, 'setup.py', 'bdist_wheel', '--plat-name=testplat.arch'],
            cwd=str(temp_ext_pkg))
    except subprocess.CalledProcessError:
        pytest.skip("Cannot compile C Extensions")

    dist_dir = temp_ext_pkg.join('dist')
    assert dist_dir.check(dir=1)

    wheels = dist_dir.listdir()
    assert len(wheels) == 1
    assert wheels[0].basename.endswith('-testplat_arch.whl')
    assert wheels[0].ext == '.whl'
def test_plat_name_ext_in_setupcfg(temp_ext_pkg):
    temp_ext_pkg.join('setup.cfg').write('[bdist_wheel]\nplat_name=testplat.arch')
    try:
        subprocess.check_call(
            [sys.executable, 'setup.py', 'bdist_wheel'],
            cwd=str(temp_ext_pkg))
    except subprocess.CalledProcessError:
        pytest.skip("Cannot compile C Extensions")

    dist_dir = temp_ext_pkg.join('dist')
    assert dist_dir.check(dir=1)

    wheels = dist_dir.listdir()
    assert len(wheels) == 1
    assert wheels[0].basename.endswith('-testplat_arch.whl')
    assert wheels[0].ext == '.whl'
def test_verifying_zipfile():
    if not hasattr(zipfile.ZipExtFile, '_update_crc'):
        pytest.skip('No ZIP verification. Missing ZipExtFile._update_crc.')

    sio = StringIO()
    zf = zipfile.ZipFile(sio, 'w')
    zf.writestr("one", b"first file")
    zf.writestr("two", b"second file")
    zf.writestr("three", b"third file")
    zf.close()

    # In default mode, VerifyingZipFile checks the hash of any read file
    # mentioned with set_expected_hash(). Files not mentioned with
    # set_expected_hash() are not checked.
    vzf = wheel.install.VerifyingZipFile(sio, 'r')
    vzf.set_expected_hash("one", hashlib.sha256(b"first file").digest())
    vzf.set_expected_hash("three", "blurble")
    vzf.open("one").read()
    vzf.open("two").read()
    try:
        vzf.open("three").read()
    except wheel.install.BadWheelFile:
        pass
    else:
        raise Exception("expected exception 'BadWheelFile()'")

    # In strict mode, VerifyingZipFile requires every read file to be
    # mentioned with set_expected_hash().
    vzf.strict = True
    try:
        vzf.open("two").read()
    except wheel.install.BadWheelFile:
        pass
    else:
        raise Exception("expected exception 'BadWheelFile()'")

    vzf.set_expected_hash("two", None)
    vzf.open("two").read()
def check_requirements_integrity():
    raw_requirements = _get_all_raw_requirements()
    if not raw_requirements:
        raise AssertionError(
            'check-requirements expects at least requirements-minimal.txt '
            'and requirements.txt',
        )
    incorrect = []
    for req, filename in raw_requirements:
        version = to_version(req)
        if version is None:  # Not pinned, just skip
            continue
        if req.key not in installed_things:
            raise AssertionError(
                '{} is required in {}, but is not installed'.format(
                    req.key, filename,
                )
            )
        installed_version = to_version(parse_requirement('{}=={}'.format(
            req.key, installed_things[req.key].version,
        )))
        if installed_version != version:
            incorrect.append((filename, req.key, version, installed_version))
    if incorrect:
        raise AssertionError(
            'Installed requirements do not match requirement files!\n'
            'Rebuild your virtualenv:\n{}'.format(''.join(
                ' - ({}) {}=={} (installed) {}=={}\n'.format(
                    filename, pkg, depped, pkg, installed,
                ) for filename, pkg, depped, installed in incorrect
            ))
        )
def test_requirements_pinned():
    raw_requirements = _get_all_raw_requirements()
    if raw_requirements is None:  # pragma: no cover
        pytest.skip('No requirements files found')

    unpinned_requirements = find_unpinned_requirements(raw_requirements)
    if unpinned_requirements:
        raise AssertionError(
            'Unpinned requirements detected!\n\n{}'.format(
                format_unpinned_requirements(unpinned_requirements),
            )
        )
def create_server(dbtype=None):
    if dbtype in (None, 'sqlite3', 'sqlite'):
        return SqliteMemoryDB()
    if dbtype in ('mysql', 'MySQLdb'):
        return MysqlDB()
    if dbtype in ('psycopg2', 'postgres'):
        return PostgresDB()
    pytest.skip('Unknown dbtype: %s' % dbtype)
def __init__(db, dbtype):
    pytest.importorskip(dbtype)
    import tempfile
    db.installation_dir = py.path.local(tempfile.mkdtemp(prefix='abe-test-'))
    print("Created temporary directory %s" % db.installation_dir)
    try:
        db.server = db.install_server()
    except Exception as e:
        #print("EXCEPTION %s" % e)
        db._delete_tmpdir()
        pytest.skip(e)
        raise
    DB.__init__(db, dbtype, db.get_connect_args())
def max200(testdb):
    try:
        Abe.util.sha3_256('x')
    except Exception as e:
        pytest.skip('SHA3 not working: %s' % e)
    dirname = os.path.join(os.path.split(__file__)[0], 'max200')
    store = testdb.load('--datadir', dirname)
    return store
def tagschecker(request):
    tags = set(request.config.getini('TAGS'))

    tags_marker = request.node.get_marker('tags')
    xfailtags_marker = request.node.get_marker('xfailtags')
    skiptags_marker = request.node.get_marker('skiptags')

    if xfailtags_marker and not tags.isdisjoint(set(xfailtags_marker.args)):
        request.node.add_marker(pytest.mark.xfail())
    elif (
        tags_marker and tags.isdisjoint(set(tags_marker.args)) or
        skiptags_marker and not tags.isdisjoint(set(skiptags_marker.args))
    ):
        pytest.skip('skipped for this tags: {}'.format(tags))
def connection(gremlin_url, event_loop, provider):
    try:
        conn = event_loop.run_until_complete(
            driver.Connection.open(
                gremlin_url, event_loop,
                message_serializer=GraphSONMessageSerializer,
                provider=provider))
    except OSError:
        pytest.skip('Gremlin Server is not running')
    return conn
def remote_connection(event_loop, gremlin_url):
    try:
        remote_conn = event_loop.run_until_complete(
            DriverRemoteConnection.open(gremlin_url, 'g'))
    except OSError:
        pytest.skip('Gremlin Server is not running')
    else:
        return remote_conn