我们从 Python 开源项目中提取了以下 13 个代码示例，用于说明如何使用 testtools.StreamSummary()。
def main():
    """Consume a subunit stream from stdin and report on it.

    The parsed stream is copied to three receivers: a ``Starts`` reporter,
    a ``StreamToDict`` that hands each finished test to ``show_outcome``,
    and a ``StreamSummary`` used for the final exit status.

    Returns:
        1 if ``count_tests`` reports that no tests were recorded or the
        summary says the run was not successful, otherwise 0.
    """
    options = parse_args()
    source = subunit.ByteStreamToStreamResult(
        sys.stdin, non_subunit_name='stdout')
    start_reporter = Starts(sys.stdout)
    on_outcome = functools.partial(
        show_outcome,
        sys.stdout,
        print_failures=options.print_failures,
        failonly=options.failonly,
    )
    outcome_collector = testtools.StreamToDict(on_outcome)
    summary = testtools.StreamSummary()
    fanout = testtools.CopyStreamResult(
        [start_reporter, outcome_collector, summary])

    fanout.startTestRun()
    try:
        source.run(fanout)
    finally:
        # Always close the run, even if parsing the stream blew up.
        fanout.stopTestRun()

    if count_tests('status', '.*') == 0:
        print("The test run didn't actually run any tests")
        return 1

    if options.post_fails:
        print_fails(sys.stdout)
    print_summary(sys.stdout)

    if summary.wasSuccessful():
        return 0
    return 1
def main():
    """Trace a subunit stream read from stdin and return an exit code.

    Every event in the stream is forwarded to a ``Starts`` reporter, to a
    ``StreamToDict`` wrapping ``show_outcome``, and to a ``StreamSummary``.

    Returns:
        0 when at least one test was counted and the summary reports
        success; 1 otherwise.
    """
    cli_args = parse_args()
    byte_stream = subunit.ByteStreamToStreamResult(
        sys.stdin, non_subunit_name='stdout')

    starts = Starts(sys.stdout)
    outcome_printer = functools.partial(
        show_outcome, sys.stdout,
        print_failures=cli_args.print_failures,
        failonly=cli_args.failonly)
    outcomes = testtools.StreamToDict(outcome_printer)
    summary = testtools.StreamSummary()
    receivers = [starts, outcomes, summary]
    combined = testtools.CopyStreamResult(receivers)

    combined.startTestRun()
    try:
        byte_stream.run(combined)
    finally:
        combined.stopTestRun()

    total_seen = count_tests('status', '.*')
    if total_seen == 0:
        print("The test run didn't actually run any tests")
        return 1

    if cli_args.post_fails:
        print_fails(sys.stdout)
    print_summary(sys.stdout)

    exit_code = 0 if summary.wasSuccessful() else 1
    return exit_code
def trace(stdin, stdout, print_failures=False, failonly=False,
          enable_diff=False, abbreviate=False, color=False, post_fails=False,
          no_summary=False):
    """Parse a subunit byte stream and report its results on ``stdout``.

    Args:
        stdin: Byte stream handed to ``subunit.ByteStreamToStreamResult``.
        stdout: Stream passed to ``show_outcome``, ``CatFiles``,
            ``print_fails`` and ``print_summary``.
        print_failures: Forwarded to ``show_outcome``.
        failonly: Forwarded to ``show_outcome``.
        enable_diff: Forwarded to ``show_outcome``.
        abbreviate: Forwarded to ``show_outcome``.
        color: Forwarded to ``show_outcome`` as ``enable_color``.
        post_fails: When true, call ``print_fails`` after the run.
        no_summary: When true, skip ``print_summary``.

    Returns:
        1 if no tests were counted, if no test has a ``success`` status, or
        if the ``StreamSummary`` reports an unsuccessful run; otherwise 0.
    """
    stream = subunit.ByteStreamToStreamResult(
        stdin, non_subunit_name='stdout')
    outcomes = testtools.StreamToDict(
        functools.partial(show_outcome, stdout,
                          print_failures=print_failures,
                          failonly=failonly,
                          enable_diff=enable_diff,
                          abbreviate=abbreviate,
                          enable_color=color))
    summary = testtools.StreamSummary()
    result = testtools.CopyStreamResult([outcomes, summary])
    result = testtools.StreamResultRouter(result)
    # Events without a test id are routed to CatFiles, which writes to stdout.
    cat = subunit.test_results.CatFiles(stdout)
    result.add_rule(cat, 'test_id', test_id=None)

    # datetime.utcnow() is deprecated since Python 3.12; a timezone-aware
    # "now" yields the same timedelta when the two readings are subtracted.
    start_time = datetime.datetime.now(datetime.timezone.utc)
    result.startTestRun()
    try:
        stream.run(result)
    finally:
        result.stopTestRun()
    stop_time = datetime.datetime.now(datetime.timezone.utc)
    elapsed_time = stop_time - start_time

    # NOTE(review): the two messages below go to sys.stdout via print(),
    # not to the ``stdout`` argument -- confirm whether that is intended.
    if count_tests('status', '.*') == 0:
        print("The test run didn't actually run any tests")
        return 1
    if post_fails:
        print_fails(stdout)
    if not no_summary:
        print_summary(stdout, elapsed_time)

    # NOTE(mtreinish): Ideally this should live in testtools streamSummary
    # this is just in place until the behavior lands there (if it ever does)
    if count_tests('status', '^success$') == 0:
        print("\nNo tests were successful during the run")
        return 1
    return 0 if summary.wasSuccessful() else 1
def make_result(get_id, output=sys.stdout):
    """Build a pass-through subunit result plus an already-closed summary.

    Args:
        get_id: Accepted but not read by this implementation.
        output: Stream given to ``subunit.StreamResultToBytes``.

    Returns:
        A ``(result, summary)`` tuple: ``result`` serialises events to
        ``output`` and ``summary`` is a ``StreamSummary`` whose run was
        started and stopped without any events being fed to it here.
    """
    # Bypass user transforms - just forward everything as subunit bytes.
    passthrough = subunit.StreamResultToBytes(output)

    # The summary receives nothing in this function, so the run it reports
    # on is an empty one (the original author treats that as success).
    finished_summary = testtools.StreamSummary()
    finished_summary.startTestRun()
    finished_summary.stopTestRun()

    return passthrough, finished_summary
def assertRunExit(self, cmd, expected, subunit=False, stdin=None):
    """Run ``cmd`` through the shell and assert on its exit status.

    Args:
        cmd: Command formatted with ``"%s"`` and executed with
            ``shell=True``.
        expected: Return code the process must exit with.
        subunit: When true, additionally parse the captured stdout as a
            subunit stream and require that it contains at least one test.
            (This parameter shadows the module name, which is why the
            library is referenced as ``subunit_lib``.)
        stdin: Optional data sent to the process; a falsy value means no
            stdin pipe is opened at all.

    Returns:
        The ``(out, err)`` tuple captured from the process.
    """
    feed_input = bool(stdin)
    proc = subprocess.Popen(
        "%s" % cmd,
        shell=True,
        stdin=subprocess.PIPE if feed_input else None,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE)
    out, err = proc.communicate(stdin if feed_input else None)

    if not subunit:
        self.assertEqual(
            proc.returncode, expected,
            "Stdout: %s; Stderr: %s" % (out, err))
        return (out, err)

    self.assertEqual(proc.returncode, expected,
                     "Expected return code: %s doesn't match actual "
                     "return code of: %s" % (expected, proc.returncode))

    # Replay the captured bytes as a subunit stream and collect each
    # completed test dict so we can check that something actually ran.
    collected = []
    byte_source = subunit_lib.ByteStreamToStreamResult(io.BytesIO(out))
    sinks = [
        testtools.StreamResult(),
        testtools.StreamToDict(collected.append),
        testtools.StreamSummary(),
    ]
    fanout = testtools.CopyStreamResult(sinks)
    fanout.startTestRun()
    try:
        byte_source.run(fanout)
    finally:
        fanout.stopTestRun()

    self.assertThat(len(collected), testtools.matchers.GreaterThan(0))
    return (out, err)
def _make_result(repo, list_tests=False, stdout=sys.stdout):
    """Create the ``(output_result, summary_result)`` pair for a load.

    Args:
        repo: Repository queried lazily for the id of its latest run.
        list_tests: When true, a single ``StreamSummary`` is used for both
            elements of the returned pair.
        stdout: Stream passed to ``results.CLITestResult``.

    Returns:
        A two-tuple of the result that receives output and the result that
        summarises the run.
    """
    if list_tests:
        shared = testtools.StreamSummary()
        return shared, shared

    # The id is looked up on demand, not when the result is constructed.
    cli_result = results.CLITestResult(
        lambda: repo.get_latest_run().get_id(), stdout, None)
    return cli_result, cli_result.get_summary()
def _run_tests(cmd, failing, analyze_isolation, isolated, until_failure,
               subunit_out=False, combine_id=None, repo_type='file',
               repo_url=None, pretty_out=True, color=False, stdout=sys.stdout,
               abbreviate=False):
    """Run the tests cmd was parameterised with.

    Args:
        cmd: Object providing ``setUp()``, ``run_tests()`` and ``cleanUp()``.
        failing, analyze_isolation, isolated: Accepted but not read here.
        until_failure: When true, keep re-running until a run yields a
            truthy result.
        subunit_out, repo_type, repo_url, pretty_out, color, stdout,
            abbreviate: Forwarded to ``load.load``.
        combine_id: Forwarded to ``load.load`` as ``run_id``.

    Returns:
        1 when ``cmd.run_tests()`` yields no processes; otherwise the value
        returned by ``load.load`` (forced to 1 in ``until_failure`` mode
        with subunit output when the stored run was unsuccessful).
    """
    def _single_pass():
        in_streams = []
        for proc in cmd.run_tests():
            in_streams.append(('subunit', output.ReturnCodeToSubunit(proc)))
        if not in_streams:
            stdout.write("The specified regex doesn't match with anything")
            return 1
        return load.load((None, None), in_streams=in_streams,
                         subunit_out=subunit_out,
                         repo_type=repo_type,
                         repo_url=repo_url, run_id=combine_id,
                         pretty_out=pretty_out,
                         color=color, stdout=stdout, abbreviate=abbreviate)

    def _latest_run_failed():
        # Replay the most recent stored run through a StreamSummary.
        repo = util.get_repo_open(repo_type, repo_url)
        summary = testtools.StreamSummary()
        last_run = repo.get_latest_run().get_subunit_stream()
        replay = subunit.ByteStreamToStreamResult(last_run)
        summary.startTestRun()
        try:
            replay.run(summary)
        finally:
            summary.stopTestRun()
        return not summary.wasSuccessful()

    cmd.setUp()
    try:
        if not until_failure:
            return _single_pass()

        while True:
            status = _single_pass()
            # If we're using subunit output we want to make sure to check
            # the result from the repository because load() returns 0
            # always on subunit output
            if subunit_out and _latest_run_failed():
                status = 1
            if status:
                return status
    finally:
        cmd.cleanUp()