我们从 Python 开源项目中提取了以下 49 个代码示例，用于说明如何使用 pytest.fail()。
def test_ufodiff_delta_class_instantiation_branch_with_ufo_filter():
    """Check Delta attributes for a branch comparison limited to one UFO.

    A testing branch is created up front and is always deleted again,
    whether the checks pass or fail.
    """
    make_testing_branch()
    try:
        deltaobj = Delta('.', ['Font-Regular.ufo'], is_branch_test=True,
                         compare_branch_name='testing_branch')
        assert deltaobj.is_commit_test is False
        assert deltaobj.is_branch_test is True
        assert deltaobj.compare_branch_name == "testing_branch"
        assert deltaobj.commit_number == "0"
        assert len(deltaobj.ufo_directory_list) == 1
        assert deltaobj.ufo_directory_list[0] == "Font-Regular.ufo"
    except Exception as e:
        # pytest.fail() takes a string reason, so format the exception
        # instead of handing over the exception object itself.
        pytest.fail("{}: {}".format(type(e).__name__, e))
    finally:
        # Single cleanup point for both the success and the failure path.
        delete_testing_branch()
def _runTest(self):
    """Lint the test module and compare the messages with the expected ones.

    On a mismatch the test fails with a report listing the missing and the
    unexpected messages; otherwise the output text is checked as well.
    """
    self._linter.check([self._test_file.module])
    expected_messages, expected_text = self._get_expected()
    received_messages, received_text = self._get_received()

    if expected_messages != received_messages:
        report = ['Wrong results for file "%s":' % (self._test_file.base)]
        missing, unexpected = multiset_difference(expected_messages,
                                                  received_messages)
        sections = (
            ('\nExpected in testdata:', missing),
            ('\nUnexpected in testdata:', unexpected),
        )
        for header, entries in sections:
            if entries:
                report.append(header)
                report.extend(' %3d: %s' % entry for entry in sorted(entries))
        pytest.fail('\n'.join(report))

    self._check_output_text(expected_messages, expected_text, received_text)
def test_connect():
    """Nodes can only be connected when both of them belong to the graph."""
    n1 = WithParameters(a=1, b=2)
    n2 = WithParameters(c=1, d=2)
    g = graph.Graph('testgraph', [n1, n2])
    not_included = WithParameters(e=1, f=2)

    # Errors when trying to link nodes coming from out of the graph.
    # pytest.raises already fails the test when nothing is raised, so an
    # extra pytest.fail() inside the block only hides that clearer message.
    with pytest.raises(exceptions.NodeConnectionError):
        g.connect(n1, not_included, 'blabla')

    with pytest.raises(exceptions.NodeConnectionError):
        g.connect(not_included, n2, 'blabla')

    # Now the correct procedure
    assert n2.output_label is None
    assert len(g.nxgraph.edges()) == 0
    g.connect(n1, n2, 'label')
    assert n2.output_label == 'label'
    assert len(g.nxgraph.edges()) == 1
def pytest_runtest_item(self, item):
    """Fail when new open files appear across the ``yield``.

    The open-file listing is taken before and after the yield; entries whose
    first field was not present beforehand are reported as leaked.
    """
    before = self.get_open_files()
    yield
    if hasattr(sys, "pypy_version_info"):
        gc.collect()
    after = self.get_open_files()

    known_fds = {entry[0] for entry in before}
    leaked_files = [entry for entry in after if entry[0] not in known_fds]
    if leaked_files:
        headline = "***** %s FD leakage detected" % len(leaked_files)
        error = [headline]
        error += [str(f) for f in leaked_files]
        error.append("*** Before:")
        error += [str(f) for f in before]
        error.append("*** After:")
        error += [str(f) for f in after]
        # The headline is repeated so it is visible at the end of the dump.
        error.append(headline)
        error.append("*** function %s:%s: %s " % item.location)
        pytest.fail("\n".join(error), pytrace=False)
# XXX copied from execnet's conftest.py - needs to be merged
def _addexcinfo(self, rawexcinfo):
    """Wrap ``rawexcinfo`` in an ExceptionInfo and append it to ``self._excinfo``.

    If ``ExceptionInfo(rawexcinfo)`` raises TypeError, a failure describing
    the raw value is raised via ``pytest.fail`` and immediately captured, so
    the stored entry is that failure instead.
    """
    # unwrap potential exception info (see twisted trial support below)
    rawexcinfo = getattr(rawexcinfo, '_rawexcinfo', rawexcinfo)
    try:
        excinfo = _pytest._code.ExceptionInfo(rawexcinfo)
    except TypeError:
        try:
            try:
                # First choice: render the raw value with the traceback module.
                l = traceback.format_exception(*rawexcinfo)
                l.insert(0, "NOTE: Incompatible Exception Representation, "
                         "displaying natively:\n\n")
                pytest.fail("".join(l), pytrace=False)
            except (pytest.fail.Exception, KeyboardInterrupt):
                # Let the failure raised just above (and Ctrl-C) through
                # to the outer handlers untouched.
                raise
            except:
                # Formatting itself broke: fall back to the repr of the value.
                pytest.fail("ERROR: Unknown Incompatible Exception "
                            "representation:\n%r" %(rawexcinfo,), pytrace=False)
        except KeyboardInterrupt:
            raise
        except pytest.fail.Exception:
            # Capture the failure that is currently being handled.
            excinfo = _pytest._code.ExceptionInfo()
    self.__dict__.setdefault('_excinfo', []).append(excinfo)
def test_binary_hook_sequence_is_lower_than_030(
        self, write_cloud_config_in_memory, monkeypatch):
    """The binary hook filter file must sort before '030-some-file.binary'."""
    # That's the lowest sequence of disks and we want to filter before that
    filter_template = '-- binary hook filter:{} --'
    hook_filter = 'some*glob'
    monkeypatch.setattr(
        generate_build_config, "BINARY_HOOK_FILTER_CONTENT", filter_template)
    output = write_cloud_config_in_memory(binary_hook_filter=hook_filter)
    # safe_load: yaml.load() without an explicit Loader is unsafe and is
    # rejected by recent PyYAML releases.
    cloud_config = yaml.safe_load(output)
    expected_content = filter_template.format(hook_filter)
    for stanza in cloud_config['write_files']:
        content = base64.b64decode(stanza['content']).decode('utf-8')
        if content == expected_content:
            break
    else:
        pytest.fail('Binary hook filter not correctly included.')
    # `stanza` is the entry the loop stopped on.
    path = stanza['path'].rsplit('/')[-1]
    assert path < '030-some-file.binary'
def assert_vec(vec, x, y, z, msg=''):
    """Asserts that Vec is equal to (x,y,z).

    Each axis is compared with ``pytest.approx``. On the first axis that
    differs, the test fails with a message naming that axis, followed by
    ``msg`` when one is given.
    """
    # Don't show in pytest tracebacks.
    __tracebackhide__ = True

    # Ignore slight variations
    if not vec.x == pytest.approx(x):
        failed = 'x'
    elif not vec.y == pytest.approx(y):
        failed = 'y'
    elif not vec.z == pytest.approx(z):
        failed = 'z'
    else:
        # Success!
        return

    # Five values are formatted, so the template needs five fields. With only
    # four, the axis name was printed in place of x and z was dropped.
    new_msg = "{!r}.{} != ({}, {}, {})".format(vec, failed, x, y, z)
    if msg:
        new_msg += ': ' + msg
    pytest.fail(new_msg)
def test_collect(self):
    """Exercise collect() for an existing, a data-less and an unknown UUID."""
    from dallinger.experiments import Bartlett1932

    experiment = Bartlett1932()
    collected = experiment.collect("12345-12345-12345-12345", recruiter=u'bots')
    assert isinstance(collected, dallinger.data.Data)

    dataless_uuid = "ed9e7ddd-3e97-452d-9e34-fee5d432258e"
    dallinger.data.register(dataless_uuid, 'https://bogus-url.com/something')
    raised = False
    try:
        experiment.collect(dataless_uuid, recruiter=u'bots')
    except RuntimeError:
        # This is expected for an already registered UUID with no accessible data
        raised = True
    if not raised:
        pytest.fail('Did not raise RuntimeError for existing UUID')

    # In debug mode an unknown UUID fails
    with pytest.raises(RuntimeError):
        experiment.collect("c85d5086-2fa7-4baf-9103-e142b9170cca",
                           mode=u'debug', recruiter=u'bots')
async def test_index_2dsphere(self, test_db):
    """Create a 2dsphere index, confirm it is listed, then run a $within query."""
    # Declared `async def`: `await`, `async with` and `async for` are a
    # SyntaxError inside a plain function.
    assert 'geo_2dsphere' == await test_db.test.create_index([('geo', GEOSPHERE)])

    index_info = await test_db.test.index_information()
    # Only the index descriptions are inspected, so iterate the values.
    for info in index_info.values():
        field, idx_type = info['key'][0]
        if field == 'geo' and idx_type == '2dsphere':
            break
    else:
        pytest.fail('2dsphere index not found.')

    poly = {'type': 'Polygon',
            'coordinates': [[[40, 5], [40, 6], [41, 6], [41, 5], [40, 5]]]}
    query = {'geo': {'$within': {'$geometry': poly}}}

    # This query will error without a 2dsphere index.
    async with test_db.test.find(query) as cursor:
        async for _ in cursor:
            pass
def test_forbidden(self, client, media_type):
    """POSTing to /fail yields a 403 whose JSON body matches the expected error."""
    help_link = {
        'text': 'Documentation related to this error',
        'href': 'http://example.com/api/rbac',
        'rel': 'help',
    }
    expected_body = {
        'title': 'Request denied',
        'description': ('You do not have write permissions for this '
                        'queue.'),
        'link': help_link,
    }

    response = client.simulate_post(path='/fail',
                                    headers={'Accept': media_type})

    assert response.status == falcon.HTTP_403
    assert response.json == expected_body
def test_epic_fail_json(self, client):
    """PUTting to /fail yields status 792 and the expected JSON error body."""
    help_link = {
        'text': 'Drill baby drill!',
        'href': 'http://example.com/api/climate',
        'rel': 'help',
    }
    expected_body = {
        'title': 'Internet crashed',
        'description': 'Catastrophic weather event due to climate change.',
        'code': 8733224,
        'link': help_link,
    }

    response = client.simulate_put('/fail',
                                   headers={'Accept': 'application/json'})

    assert response.status == falcon.HTTP_792
    assert response.json == expected_body
def test_epic_fail_xml(self, client, media_type):
    """PUTting to /fail yields status 792 and the expected XML error document."""
    expected_body = ''.join([
        '<?xml version="1.0" encoding="UTF-8"?>',
        '<error>',
        '<title>Internet crashed</title>',
        '<description>',
        'Catastrophic weather event due to climate change.',
        '</description>',
        '<code>8733224</code>',
        '<link>',
        '<text>Drill baby drill!</text>',
        '<href>http://example.com/api/climate</href>',
        '<rel>help</rel>',
        '</link>',
        '</error>',
    ])

    response = client.simulate_put(path='/fail',
                                   headers={'Accept': media_type})
    assert response.status == falcon.HTTP_792

    # The body must at least be parseable before it is compared verbatim.
    try:
        et.fromstring(response.content.decode('utf-8'))
    except ValueError:
        pytest.fail()

    assert response.text == expected_body
async def test_actxmgr_exception_nested():
    """An exception raised inside nested async contexts reaches the caller."""
    # Declared `async def`: `async with` is a SyntaxError in a plain function.

    @aiotools.actxmgr
    async def simple_ctx(msg):
        yield msg

    try:
        async with simple_ctx('hello') as msg1:
            async with simple_ctx('world') as msg2:
                assert msg1 == 'hello'
                assert msg2 == 'world'
                raise IndexError('bomb1')
    except BaseException as exc:
        assert isinstance(exc, IndexError)
        assert 'bomb1' == exc.args[0]
    else:
        # Reaching this point means the IndexError was swallowed.
        pytest.fail()
async def test_actxmgr_exception_chained():
    """An exception re-raised by the context manager keeps the original as its cause."""
    # Declared `async def`: `async with` is a SyntaxError in a plain function.

    @aiotools.actxmgr
    async def simple_ctx(msg):
        try:
            await asyncio.sleep(0)
            yield msg
        except Exception as e:
            await asyncio.sleep(0)
            # exception is chained
            raise ValueError('bomb2') from e

    try:
        async with simple_ctx('hello') as msg:
            assert msg == 'hello'
            raise IndexError('bomb1')
    except BaseException as exc:
        assert isinstance(exc, ValueError)
        assert 'bomb2' == exc.args[0]
        assert isinstance(exc.__cause__, IndexError)
        assert 'bomb1' == exc.__cause__.args[0]
    else:
        # Reaching this point means no exception came out of the context.
        pytest.fail()
def test():
    """timestamp_from_string parses epoch seconds and rejects non-numeric text."""
    assert timestamp_from_string("423423423") == datetime(
        1983, 6, 2, 17, 37, 3, tzinfo=pytz.utc)

    rejected = False
    try:
        timestamp_from_string("asd")
    except ValueError:
        rejected = True
    if not rejected:
        pytest.fail("This call should raise a ValueError")

    assert timestamp_from_string("0") == datetime(
        1970, 1, 1, 0, 0, 0, tzinfo=pytz.utc)

    assert timestamp_from_string("-1") == datetime(
        1969, 12, 31, 23, 59, 59, tzinfo=pytz.utc)
def test():
    """Check package listing, version lookup and install for known names."""
    packages = list_all()
    assert packages is not None
    assert isinstance(packages, list)
    assert len(packages) > 0

    # Every entry is inspected, exactly as a loop without `break` would.
    package_keys = [pkg._key for pkg in packages]
    if "rapydo-utils" not in package_keys:
        pytest.fail("rapydo.utils not found in the list of packages")

    assert check_version("rapydo-utils") is not None
    assert check_version("rapydo-blabla") is None

    install("docker-compose")
def test_repeatable_one_zero_rr_conflicts():
    """Building a parser for a grammar using both B+ and B* must not raise RRConflicts."""
    grammar = """ S: A B+ C; S: A B* D; A:; B:; C:; D:; """
    parsed_grammar = Grammar.from_string(grammar, _no_check_recognizers=True)

    # Check if parser construction raises exception
    try:
        Parser(parsed_grammar)
    except RRConflicts:
        pytest.fail("R/R conflicts not expected here.")
def test_exceptions_in_server_bubble_up(self):
    """An exception raised in the server callback reaches the handshake caller."""

    class SentinelException(Exception):
        """Marker type raised only by the server callback below."""

    def client_callback(*args):  # pragma: nocover
        pytest.fail("Should not be called")

    def server_callback(*args):
        raise SentinelException()

    client = self._client_connection(callback=client_callback, data=None)
    server = self._server_connection(callback=server_callback, data=None)

    with pytest.raises(SentinelException):
        handshake_in_memory(client, server)
def test_gateway_status_busy(self, gw):
    """remote_status() reports two busy executions, then none once they finish."""
    baseline_channels = gw.remote_status().numchannels
    first = gw.remote_exec("channel.send(1); channel.receive()")
    second = gw.remote_exec("channel.receive()")
    first.receive()

    status = gw.remote_status()
    assert status.numexecuting == 2  # number of active execution threads
    assert status.numchannels == baseline_channels + 2

    # Release both remote sides and wait for their channels to close.
    for channel in (first, second):
        channel.send(None)
    for channel in (first, second):
        channel.waitclose()

    for _attempt in range(10):
        status = gw.remote_status()
        if status.numexecuting == 0:
            break
    else:
        pytest.fail("did not get correct remote status")
    # race condition
    assert status.numchannels <= baseline_channels
def test_popen_filetracing(self, testdir, monkeypatch, makegateway):
    """With EXECNET_DEBUG=1 the remote trace file mentions the slave gateway."""
    tmpdir = testdir.tmpdir
    for variable in ("TMP", "TEMP"):  # TEMP is the windows one
        monkeypatch.setenv(variable, tmpdir)
    monkeypatch.setenv('EXECNET_DEBUG', "1")
    gw = makegateway("popen")

    # hack out the debuffilename
    channel = gw.remote_exec(
        "import execnet;channel.send(execnet.gateway_base.fn)"
    )
    slavefile = py.path.local(channel.receive())
    assert slavefile.check()

    slave_line = "creating slavegateway"
    if not any(slave_line in line for line in slavefile.readlines()):
        py.test.fail("did not find %r in tracefile" % (slave_line,))
    gw.exit()
def test_can_have_same_cors_configurations(self, sample_app):
    """Two views on one route, both with cors=True, must pass validation."""

    @sample_app.route('/cors', methods=['GET'], cors=True)
    def cors():
        pass

    @sample_app.route('/cors', methods=['PUT'], cors=True)
    def same_cors():
        pass

    failure_reason = (
        'A ValueError was unexpectedly thrown. Applications '
        'may have multiple view functions that share the same '
        'route and CORS configuration.'
    )
    try:
        validate_routes(sample_app.routes)
    except ValueError:
        pytest.fail(failure_reason)
def test_can_have_one_cors_configured_and_others_not(self, sample_app):
    """Two views on one route, only one with cors=True, must pass validation."""

    @sample_app.route('/cors', methods=['GET'], cors=True)
    def cors():
        pass

    @sample_app.route('/cors', methods=['PUT'])
    def no_cors():
        pass

    failure_reason = (
        'A ValueError was unexpectedly thrown. Applications '
        'may have multiple view functions that share the same '
        'route but only one is configured for CORS.'
    )
    try:
        validate_routes(sample_app.routes)
    except ValueError:
        pytest.fail(failure_reason)
def get_isis_topology(self, instance=None):
    """Get IS-IS topology table.

    Args:
        instance: UI object used to send the CLI command.

    Returns:
        list[dict]: List of dictionary with keys:
            vertex, type, metric, next_hop, interface, parent

    Raises:
        UIException: when no instance is given.
    """
    if not instance:
        raise UIException("UI instance isn't specified.")

    try:
        raw_output = instance.cli_send_command(
            'vtysh -c "show isis topology"').stdout
    except UICmdException as ex:
        self.class_logger.error(ex)
        pytest.fail('Failed to get IS-IS topology, node id {}.'.format(
            instance.switch.id))

    # The first four lines of the output are skipped before parsing.
    body_lines = raw_output.strip().splitlines()[4:]
    table = list(self.parse_isis_table_topology(body_lines))
    port_ids = instance.name_to_portid_map
    for record in table:
        record["interface"] = port_ids.get(record["interface"])
    return table
def get_node_hostname(self, instance=None):
    """Get node's hostname.

    Args:
        instance: UI object used to send the ``hostname`` command.

    Returns:
        str: Hostname of node.

    Raises:
        UIException: when no instance is given.
    """
    if not instance:
        raise UIException("UI instance isn't specified.")

    try:
        raw_output = instance.cli_send_command('hostname').stdout
    except UICmdException as ex:
        self.class_logger.error(ex)
        pytest.fail('Failed to get hostname, node id {}.'.format(
            instance.switch.id))
    return raw_output.strip()
def is_row_added_to_l2multicast_table(mac_address=None, port_id=None, vlan_id=1, switch_instance=None, result=True):
    """Check if row with specified parameters added to L2Multicast table.

    Returns True when the presence of a matching row equals ``result``;
    otherwise the test is failed with a message describing the mismatch.
    """
    # Need to wait until entry will be added to L2Multicast table.
    time.sleep(1)
    table = switch_instance.getprop_table("L2Multicast")

    # Every row is examined, exactly as a loop without `break` would.
    matches = [
        (row['macAddress'] == mac_address)
        and (row['portId'] == port_id)
        and (row['vlanId'] == vlan_id)
        for row in (table or [])
    ]
    is_entry_added = any(matches)

    if is_entry_added == result:
        return True
    if not is_entry_added:
        pytest.fail("Entry is not added to L2Multicast table!")
    pytest.fail("Entry is added to L2Multicast table (should not be)!")
def console_clear_config(self):
    """Clear device configuration using console connection.

    A one-line Python command is sent over ``self.telnet``. The test fails
    when the command reports an error or its output lacks ``returnCode=0.``.
    """
    statements = (
        "from xmlrpclib import ServerProxy",
        "rc = -1",
        "rc = ServerProxy('http://127.0.0.1:8081/RPC2').nb.clearConfig()",
        "print 'clearConfig() returnCode={0}.'.format(rc)",
    )
    command = 'python -c "%s"' % "; ".join(statements)
    output, err, _ = self.telnet.exec_command(command.encode("ascii"))

    if err:
        message = "Cannot perform clearConfig.\nCommand: %s.\nStdout: %s\nStdErr: %s" % (command, output, err)
        self.class_logger.error(message)
        self.db_corruption = True
        pytest.fail(message)

    if "returnCode=0." in output:
        self.class_logger.debug("ClearConfig finished. StdOut:\n%s" % (output, ))
        return

    message = "ClearConfig failed. StdOut:\n%s" % (output, )
    self.class_logger.error(message)
    pytest.fail(message)
def check(self):
    """Check if switch is operational using waiton method.

    Notes:
        This mandatory method for all environment classes.

    Returns:
        The value returned by ``self.waiton()``, or None when the switch
        status is falsy and the check is skipped.
    """
    if not self.status:
        self.class_logger.info("Skip switch id:%s(%s) check because it's has Off status." % (self.id, self.name))
        return

    status = self.waiton()

    # Verify Ports table is not empty
    if self.ui.get_table_ports() == []:
        reason = 'Ports table is empty!'
        if self.opts.fail_ctrl == 'stop':
            self.class_logger.debug("Exit switch check. Ports table is empty!")
            pytest.exit(reason)
        else:
            self.class_logger.debug("Fail switch check. Ports table is empty!")
            pytest.fail(reason)

    return status
def test_import_helpers_module():
    """Verify that all modules can be imported within 'helpers' module and classes objects can be created.

    An ImportError is turned into a test failure naming the module.
    """
    module_name = "helpers"

    # define test data
    def assertion_error_func():
        """Assertion error.

        """
        assert 0

    try:
        from testlib import helpers
        helpers.raises(AssertionError, "", assertion_error_func)
    except ImportError as err:
        pytest.fail("Import failure in '%s' module: %s" % (module_name, err))
def test_import_common3_module(monkeypatch):
    """Verify that all modules can be imported within 'common3' module and 'Cross'/'Environment' objects can be created.

    An ImportError is turned into a test failure naming the module.
    """
    module_name = "common3"

    def fake_get_conf(env_object, path_string):
        """Get config.

        """
        return {'env': []}

    try:
        from testlib import common3
        common3.Cross(None, None)
        # replace Environment _get_setup method
        monkeypatch.setattr(common3.Environment, "_get_setup", fake_get_conf)
        common3.Environment(FakeOpts())
    except ImportError as err:
        pytest.fail("Import failure in '%s' module: %s" % (module_name, err))
def test_readme(cookies):
    """The generated README.rst file should pass some sanity checks and validate as a PyPI long description."""
    with bake_in_temp_dir(cookies, extra_context={'repo_name': 'helloworld'}) as result:
        project = result.project
        readme_lines = [
            line.strip()
            for line in project.join('README.rst').readlines(cr=False)
        ]
        assert 'helloworld' in readme_lines
        assert 'The full documentation is at https://helloworld.readthedocs.org.' in readme_lines

        setup_path = str(project.join('setup.py'))
        try:
            sh.python(setup_path, 'check', restructuredtext=True, strict=True)
        except sh.ErrorReturnCode as exc:
            pytest.fail(str(exc))
def check_quality(result):
    """Run quality tests on the given generated output.

    Each ``.py`` file under the project is run through the lint tools, then
    the Makefile and the docs are checked. The first tool error fails the test.
    """
    project = result.project

    for dirpath, _dirnames, filenames in os.walk(str(project)):
        pylintrc = str(project.join('pylintrc'))
        python_files = (
            (filename, os.path.join(dirpath, filename))
            for filename in filenames
        )
        for filename, name in python_files:
            if not name.endswith('.py'):
                continue
            try:
                sh.pylint(name, rcfile=pylintrc)
                sh.pylint(name, py3k=True)
                sh.pycodestyle(name)
                if filename != 'setup.py':
                    sh.pydocstyle(name)
                sh.isort(name, check_only=True)
            except sh.ErrorReturnCode as exc:
                pytest.fail(str(exc))

    tox_ini = project.join('tox.ini')
    docs_build_dir = project.join('docs/_build')
    try:
        # Sanity check the generated Makefile
        sh.make('help')
        # quality check docs
        sh.doc8(project.join("README.rst"), ignore_path=docs_build_dir, config=tox_ini)
        sh.doc8(project.join("docs"), ignore_path=docs_build_dir, config=tox_ini)
    except sh.ErrorReturnCode as exc:
        pytest.fail(str(exc))
def test_all_iam_templates(template_name):
    """Verify all IAM templates render as proper JSON."""
    # Service name = last path component, up to its first dot.
    service_json = template_name.split('/')[-1]
    service = service_json.split('.')[0]

    if service == 'rds-db':
        items = {'resource1': 'user1', 'resource2': 'user2'}
    else:
        items = ['resource1', 'resource2']

    pipeline_settings = {'lambda': {'vpc_enabled': False}}
    try:
        rendered = render_policy_template(
            account_number='',
            app='coreforrest',
            env='dev',
            group='forrest',
            items=items,
            pipeline_settings=pipeline_settings,
            region='us-east-1',
            service=service)
    except json.decoder.JSONDecodeError:
        pytest.fail('Bad template: {0}'.format(template_name), pytrace=False)

    assert isinstance(rendered, list)
def _release(self, name): attr = getattr(self, '_orig_' + name) setattr(subprocess, name, attr) delattr(self, '_orig_' + name) responder = self.responders[name] if not responder.is_satisfied: pytest.fail('Unsatisfied responder: %s' % responder)
def test_ufodiff_delta_class_instantiation_commit():
    """Check Delta attributes for a commit comparison without a UFO filter."""
    try:
        deltaobj = Delta('.', [], is_commit_test=True, commit_number='2')
        assert deltaobj.is_commit_test is True
        assert deltaobj.is_branch_test is False
        assert deltaobj.commit_number == "2"
        assert deltaobj.compare_branch_name is None
        assert len(deltaobj.ufo_directory_list) == 0
    except Exception as e:
        # pytest.fail() takes a string reason, so format the exception
        # instead of handing over the exception object itself.
        pytest.fail("{}: {}".format(type(e).__name__, e))
def test_ufodiff_delta_class_instantiation_branch():
    """Check Delta attributes for a branch comparison without a UFO filter.

    A testing branch is created up front and is always deleted again,
    whether the checks pass or fail.
    """
    make_testing_branch()
    try:
        deltaobj = Delta('.', [], is_branch_test=True,
                         compare_branch_name='testing_branch')
        assert deltaobj.is_commit_test is False
        assert deltaobj.is_branch_test is True
        assert deltaobj.compare_branch_name == "testing_branch"
        assert deltaobj.commit_number == "0"
        assert len(deltaobj.ufo_directory_list) == 0
    except Exception as e:
        # pytest.fail() takes a string reason, so format the exception
        # instead of handing over the exception object itself.
        pytest.fail("{}: {}".format(type(e).__name__, e))
    finally:
        # Single cleanup point for both the success and the failure path.
        delete_testing_branch()
def test_ufodiff_delta_class_instantiation_commit_with_ufo_filter():
    """Check Delta attributes for a commit comparison limited to one UFO."""
    try:
        deltaobj = Delta('.', ['Font-Regular.ufo'], is_commit_test=True,
                         commit_number='2')
        assert deltaobj.is_commit_test is True
        assert deltaobj.is_branch_test is False
        assert deltaobj.commit_number == "2"
        assert deltaobj.compare_branch_name is None
        assert len(deltaobj.ufo_directory_list) == 1
        assert deltaobj.ufo_directory_list[0] == "Font-Regular.ufo"
    except Exception as e:
        # pytest.fail() takes a string reason, so format the exception
        # instead of handing over the exception object itself.
        pytest.fail("{}: {}".format(type(e).__name__, e))
def error_on_ResourceWarning():
    """Record ResourceWarnings around the yielded section and fail if any occurred.

    All warning filters are reset so that only ResourceWarning is recorded
    while the code after the ``yield`` point runs. Afterwards a garbage
    collection is forced and every recorded warning message is reported in a
    single failure, regardless of how the wrapped code itself ended.

    Where the name ResourceWarning does not exist (Python 2, PyPy) the
    generator only yields and does nothing else.

    This code is primarily based on the examples found here:
    https://stackoverflow.com/questions/24717027/convert-python-3-resourcewarnings-into-exception
    """
    try:
        ResourceWarning
    except NameError:
        # Python 2, PyPy
        yield
        return

    # Python 3, PyPy3
    with warnings.catch_warnings(record=True) as caught:
        warnings.resetwarnings()  # clear all filters
        warnings.simplefilter('ignore')  # ignore all
        warnings.simplefilter('always', ResourceWarning)  # add filter
        yield  # run tests in this context
        gc.collect()  # run garbage collection (for pypy3)
        if caught:
            leaked = '\n'.join(str(warning.message) for warning in caught)
            pytest.fail('The following file descriptors were not closed properly:\n' +
                        leaked,
                        pytrace=False)
def yocto_id_from_ext4(self, filename):
    """Read the artifact id stored inside an ext4 image.

    Runs ``debugfs`` on ``filename`` to print ``self.artifact_info_file`` and
    uses ``sed`` to extract the value following ``self.artifact_prefix=``.

    Returns:
        The stripped command output.

    The test is failed via ``pytest.fail`` when the command exits with an
    error or any other exception occurs.
    """
    try:
        cmd = "debugfs -R 'cat %s' %s| sed -n 's/^%s=//p'" % (self.artifact_info_file, filename, self.artifact_prefix)
        output = subprocess.check_output(cmd, shell=True).strip()
        logger.info("Running: " + cmd + " returned: " + output)
        return output

    except subprocess.CalledProcessError:
        pytest.fail("Unable to read: %s, is it a broken image?" % (filename))

    # `except X as e` is valid on Python 2.6+ and Python 3; the comma form
    # is a SyntaxError on Python 3.
    except Exception as e:
        pytest.fail("Unexpected error trying to read ext4 image: %s, error: %s" % (filename, str(e)))
def artifact_id_randomize(self, install_image, device_type="vexpress-qemu", specific_image_id=None):
    """Replace the artifact info file inside an ext4 image with a new id.

    Args:
        install_image: path of the image passed to ``debugfs``.
        device_type: accepted but not used by this function.
        specific_image_id: id to write; when falsy a random ``mender-<n>``
            id is generated.

    Returns:
        The image id that was written.

    The test is failed via ``pytest.fail`` when a ``debugfs`` command exits
    with an error or any other exception occurs. The temporary file holding
    the new content is always removed.
    """
    if specific_image_id:
        imageid = specific_image_id
    else:
        imageid = "mender-%s" % str(random.randint(0, 99999999))

    config_file = r"""%s=%s""" % (self.artifact_prefix, imageid)
    tfile = tempfile.NamedTemporaryFile(delete=False)
    tfile.write(config_file)
    tfile.close()

    try:
        # Remove the existing info file from the image ...
        cmd = "debugfs -w -R 'rm %s' %s" % (self.artifact_info_file, install_image)
        logger.info("Running: " + cmd)
        output = subprocess.check_output(cmd, shell=True).strip()
        logger.info("Returned: " + output)

        # ... then write the temporary file in its place.
        cmd = ("printf 'cd %s\nwrite %s %s\n' | debugfs -w %s"
               % (os.path.dirname(self.artifact_info_file),
                  tfile.name, os.path.basename(self.artifact_info_file), install_image))
        logger.info("Running: " + cmd)
        output = subprocess.check_output(cmd, shell=True).strip()
        logger.info("Returned: " + output)

    except subprocess.CalledProcessError:
        pytest.fail("Trying to modify ext4 image failed, probably because it's not a valid image.")

    # `except X as e` is valid on Python 2.6+ and Python 3; the comma form
    # is a SyntaxError on Python 3.
    except Exception as e:
        pytest.fail("Unexpted error trying to modify ext4 image: %s, error: %s" % (install_image, str(e)))

    finally:
        os.remove(tfile.name)

    return imageid
def verify_reboot_performed(self, max_wait=60*30):
    """Wait until ``self.tfile`` is gone on ten successful checks, or fail.

    Polls about once per second for up to ``max_wait`` seconds. Returns None
    after the tenth check that finds the file missing; otherwise the test is
    failed once the deadline has passed.
    """
    logger.info("waiting for system to reboot")

    successful_connections = 0
    timeout = time.time() + max_wait

    while time.time() <= timeout:
        try:
            with settings(warn_only=True, abort_exception=FabricFatalException):
                time.sleep(1)
                if exists(self.tfile):
                    logger.debug("temp. file still exists, device hasn't rebooted.")
                    continue
                else:
                    logger.debug("temp. file no longer exists, device has rebooted.")
                    successful_connections += 1

                # try connecting 10 times before returning
                if successful_connections <= 9:
                    continue
                return

        # Deliberately catches everything, per the log message below; note
        # this also swallows KeyboardInterrupt while polling.
        except (BaseException):
            logger.debug("system exit was caught, this is probably because SSH connectivity is broken while the system is rebooting")
            continue

    if time.time() > timeout:
        pytest.fail("Device never rebooted!")
def common_update_procedure(install_image, regenerate_image_id=True, device_type="vexpress-qemu", broken_image=False, verify_status=True, signed=False, devices=None, scripts=None):
    """Create an artifact from ``install_image``, upload it and trigger a deployment.

    Args:
        install_image: image used to build the artifact.
        regenerate_image_id: when true (and ``broken_image`` is false) a new
            id is written into the image; otherwise the id is read from it.
        device_type: passed to ``image.make_artifact``.
        broken_image: when true a random ``broken_image_<n>`` id is used.
        verify_status: when true, check that the deployment is "inprogress".
        signed: passed to ``image.make_artifact``.
        devices: device ids to deploy to; when None, the ids of all devices
            reported as "accepted" are used.
        scripts: passed to ``image.make_artifact``; None means an empty list.

    Returns:
        tuple: ``(deployment_id, artifact_id)``.

    The test is failed via ``pytest.fail`` when no artifact is created.
    """
    # A `[]` default is shared between calls; build a fresh list instead.
    if scripts is None:
        scripts = []

    with artifact_lock:
        if broken_image:
            artifact_id = "broken_image_" + str(random.randint(0, 999999))
        elif regenerate_image_id:
            artifact_id = Helpers.artifact_id_randomize(install_image)
            logger.debug("randomized image id: " + artifact_id)
        else:
            artifact_id = Helpers.yocto_id_from_ext4(install_image)

        # create artifact
        with tempfile.NamedTemporaryFile() as artifact_file:
            created_artifact = image.make_artifact(install_image,
                                                   device_type,
                                                   artifact_id,
                                                   artifact_file,
                                                   signed=signed,
                                                   scripts=scripts)

            if created_artifact:
                deploy.upload_image(created_artifact)
                if devices is None:
                    devices = list({device["device_id"]
                                    for device in adm.get_devices_status("accepted")})
                deployment_id = deploy.trigger_deployment(name="New valid update",
                                                          artifact_name=artifact_id,
                                                          devices=devices)
            else:
                # logger.warn is a deprecated alias of logger.warning.
                logger.warning("failed to create artifact")
                pytest.fail("error creating artifact")

    # wait until deployment is in correct state
    if verify_status:
        deploy.check_expected_status("inprogress", deployment_id)

    return deployment_id, artifact_id
def test_ssl_only(self):
    """ make sure we are not exposing any non-ssl connections in production environment

    The production test environment is started, every port reported by
    ``docker ps`` is connected to through a TLS-wrapped socket (up to three
    attempts with growing pauses), and the environment is always torn down.
    """
    done = False
    sleep_time = 2

    # start production environment
    subprocess.call(["./production_test_env.py", "--start"])

    try:
        # get all exposed ports from docker
        for _ in range(3):
            exposed_hosts = subprocess.check_output("docker ps | grep %s | grep -o -E '0.0.0.0:[0-9]*'" % ("testprod"), shell=True)

            try:
                for host in exposed_hosts.split():
                    with contextlib.closing(ssl.wrap_socket(socket.socket())) as sock:
                        logging.info("%s: connect to host with TLS" % host)
                        host, port = host.split(":")
                        sock.connect((host, int(port)))
                        # Only count success after a real TLS connect.
                        done = True
            except Exception:
                # Retry on ordinary errors only; a bare `except:` would also
                # swallow KeyboardInterrupt and SystemExit.
                sleep_time *= 2
                time.sleep(sleep_time)
                continue

            if done:
                break
    finally:
        # tear down production env, even when listing the ports fails
        subprocess.call(["./production_test_env.py", "--kill"])

    if not done:
        pytest.fail("failed to connect to production env. using SSL")
def test_token_token_expiration(self):
    """ verify that an expired token is handled correctly (client gets a new, valid one)
        and that deployments are still recieved by the client

    Without a current host the test re-runs itself on every mender client.
    Otherwise the mender journal is polled for up to 60 seconds for the
    "received new authorization data" line before an update is triggered.
    """
    if not env.host_string:
        execute(self.test_token_token_expiration,
                hosts=get_mender_clients())
        return

    timeout_time = int(time.time()) + 60

    while int(time.time()) < timeout_time:
        with quiet():
            output = run("journalctl -u mender -l --no-pager | grep \"received new authorization data\"")
            time.sleep(1)

            if output.return_code == 0:
                logging.info("mender logs indicate new authorization data available")
                break
    else:
        # Only reached when the loop ran out of time without a `break`.
        # Re-checking the clock after the loop could report a timeout even
        # though the line was found on the final iteration.
        pytest.fail("timed out waiting for download retries")

    # this call verifies that the deployment process goes into an "inprogress" state
    # which is only possible when the client has a valid token.
    common_update_procedure(install_image=conftest.get_valid_image())
def mender_log_contains_aborted_string(self, mender_client_container="mender-client"):
    """Poll the mender journal until the abort message appears, or fail.

    Up to 300 attempts are made with a 2 second pause after each miss.
    ``mender_client_container`` is accepted but not used here.
    """
    expected_string = "deployment aborted at the backend"
    grep_command = "journalctl -u mender | grep \"%s\"" % expected_string

    attempts = 60*5
    for _attempt in range(attempts):
        with settings(hide('everything'), warn_only=True):
            out = run(grep_command)
            if out.succeeded:
                return
            time.sleep(2)

    pytest.fail("deployment never aborted.")
def perform_update(self, mender_client_container="mender-client", fail=False):
    """Run the failing or the successful update task on the named client container."""
    if fail:
        execute(update_image_failed,
                hosts=get_mender_client_by_container_name(mender_client_container))
        return

    execute(update_image_successful,
            install_image=conftest.get_valid_image(),
            skip_reboot_verification=True,
            hosts=get_mender_client_by_container_name(mender_client_container))
def wait_for_containers(expected_containers, defined_in):
    """Wait until ``docker-compose ps -q`` lists exactly the expected number of containers.

    Up to 300 checks are made, one second apart. On a match the function
    sleeps a further 60 seconds and returns; otherwise the test is failed.
    """
    attempts = 60 * 5
    for _attempt in range(attempts):
        compose_files = "-f " + " -f ".join(defined_in)
        out = subprocess.check_output(
            "docker-compose -p %s %s ps -q" % (conftest.docker_compose_instance,
                                               compose_files),
            shell=True)
        if len(out.split()) != expected_containers:
            time.sleep(1)
            continue
        time.sleep(60)
        return

    pytest.fail("timeout: %d containers not running for docker-compose project: %s" % (expected_containers, conftest.docker_compose_instance))
def get_ursula_by_id(self, ursula_id):
    """Return the entry of ``self._ursulas`` stored under ``ursula_id``.

    The test is failed via ``pytest.fail`` when the lookup raises KeyError.
    """
    try:
        return self._ursulas[ursula_id]
    except KeyError:
        pytest.fail("No Ursula with ID {}".format(ursula_id))
def test_all_ursulas_know_about_all_other_ursulas(ursulas):
    """
    Once launched, all Ursulas know about - and can help locate - all other Ursulas in the network.
    """
    # Collect (propagating index, announcing index) pairs where the
    # announcing Ursula's digest is missing from the propagating one's storage.
    ignorance = [
        (counter, acounter)
        for acounter, announcing_ursula in enumerate(blockchain_client._ursulas_on_blockchain)
        for counter, propagating_ursula in enumerate(ursulas)
        if digest(announcing_ursula) not in propagating_ursula.server.storage
    ]
    if ignorance:
        complaints = ["{} didn't know about {}".format(counter, acounter)
                      for counter, acounter in ignorance]
        pytest.fail(str(complaints))
def test_execute_fails_with_graphs_with_isles():
    """Executing a graph that contains isles raises GraphExecutionError."""
    g = make_a_graph_with_isles()
    # pytest.raises already fails the test when nothing is raised, so an
    # extra pytest.fail() inside the block only hides that clearer message.
    with pytest.raises(exceptions.GraphExecutionError):
        g.execute()