The following code examples, extracted from open-source Python projects, illustrate how to use testtools.StreamToDict().
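For orientation, here is a minimal, self-contained sketch of the pattern all of these examples share: testtools.StreamToDict wraps a callback that is invoked once per test with a dict summarising that test's events (keys include 'id', 'status', 'tags', 'timestamps', and 'details'). The hand-fed events below are illustrative; in the real examples that follow, the events arrive from a subunit stream or a test case run instead.

import testtools

results = []

def on_test(test_dict):
    # Invoked once per test, after its events are complete.
    results.append((test_dict['id'], test_dict['status']))

result = testtools.StreamToDict(on_test)
result.startTestRun()
# Low-level StreamResult events, emitted by hand here for brevity;
# normally a subunit stream or test run produces them.
result.status(test_id='test_example', test_status='inprogress')
result.status(test_id='test_example', test_status='success')
result.stopTestRun()

print(results)  # [('test_example', 'success')]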
def main():
    args = parse_args()
    stream = subunit.ByteStreamToStreamResult(
        sys.stdin, non_subunit_name='stdout')
    starts = Starts(sys.stdout)
    outcomes = testtools.StreamToDict(
        functools.partial(show_outcome, sys.stdout,
                          print_failures=args.print_failures,
                          failonly=args.failonly))
    summary = testtools.StreamSummary()
    result = testtools.CopyStreamResult([starts, outcomes, summary])
    result.startTestRun()
    try:
        stream.run(result)
    finally:
        result.stopTestRun()
    if count_tests('status', '.*') == 0:
        print("The test run didn't actually run any tests")
        return 1
    if args.post_fails:
        print_fails(sys.stdout)
    print_summary(sys.stdout)
    return (0 if summary.wasSuccessful() else 1)
def _find_failing(repo):
    run = repo.get_failing()
    case = run.get_test()
    ids = []

    def gather_errors(test_dict):
        if test_dict['status'] == 'fail':
            ids.append(test_dict['id'])

    result = testtools.StreamToDict(gather_errors)
    result.startTestRun()
    try:
        case.run(result)
    finally:
        result.stopTestRun()
    return ids
def __init__(self, repository, partial=False, run_id=None):
    # XXX: Perhaps should factor into a decorator and use an unaltered
    # TestProtocolClient.
    self._repository = repository
    self._run_id = run_id
    if not self._run_id:
        fd, name = tempfile.mkstemp(dir=self._repository.base)
        self.fname = name
        stream = os.fdopen(fd, 'wb')
    else:
        self.fname = os.path.join(self._repository.base, self._run_id)
        stream = open(self.fname, 'ab')
    self.partial = partial
    # The time taken by each test, flushed at the end.
    self._times = {}
    self._test_start = None
    self._time = None
    subunit_client = testtools.StreamToExtendedDecorator(
        TestProtocolClient(stream))
    self.hook = testtools.CopyStreamResult([
        subunit_client, testtools.StreamToDict(self._handle_test)])
    self._stream = stream
def startTestRun(self):
    self._subunit = io.BytesIO()
    self.subunit_stream = subunit.v2.StreamResultToBytes(self._subunit)
    self.hook = testtools.CopyStreamResult([
        testtools.StreamToDict(self._handle_test),
        self.subunit_stream])
    self.hook.startTestRun()
    self.start_time = datetime.datetime.utcnow()
    session = self.session_factory()
    if not self._run_id:
        self.run = db_api.create_run(session=session)
        self._run_id = self.run.uuid
    else:
        int_id = db_api.get_run_id_from_uuid(self._run_id, session=session)
        self.run = db_api.get_run_by_id(int_id, session=session)
    session.close()
    self.totals = {}
def trace(stdin, stdout, print_failures=False, failonly=False,
          enable_diff=False, abbreviate=False, color=False,
          post_fails=False, no_summary=False):
    stream = subunit.ByteStreamToStreamResult(
        stdin, non_subunit_name='stdout')
    outcomes = testtools.StreamToDict(
        functools.partial(show_outcome, stdout,
                          print_failures=print_failures,
                          failonly=failonly,
                          enable_diff=enable_diff,
                          abbreviate=abbreviate,
                          enable_color=color))
    summary = testtools.StreamSummary()
    result = testtools.CopyStreamResult([outcomes, summary])
    result = testtools.StreamResultRouter(result)
    cat = subunit.test_results.CatFiles(stdout)
    result.add_rule(cat, 'test_id', test_id=None)
    start_time = datetime.datetime.utcnow()
    result.startTestRun()
    try:
        stream.run(result)
    finally:
        result.stopTestRun()
    stop_time = datetime.datetime.utcnow()
    elapsed_time = stop_time - start_time
    if count_tests('status', '.*') == 0:
        print("The test run didn't actually run any tests")
        return 1
    if post_fails:
        print_fails(stdout)
    if not no_summary:
        print_summary(stdout, elapsed_time)
    # NOTE(mtreinish): Ideally this should live in testtools StreamSummary;
    # this is just in place until the behavior lands there (if it ever does).
    if count_tests('status', '^success$') == 0:
        print("\nNo tests were successful during the run")
        return 1
    return 0 if summary.wasSuccessful() else 1
def _prior_tests(self, run, failing_id):
    """Calculate which tests from the test run ran before test_id.

    Tests that ran in a different worker are not included in the result.
    """
    if not getattr(self, '_worker_to_test', False):
        case = run.get_test()
        # Use None if there is no worker-N tag.
        # If there are multiple, map them all.
        # (worker-N -> [testid, ...])
        worker_to_test = {}
        # (testid -> [workerN, ...])
        test_to_worker = {}

        def map_test(test_dict):
            tags = test_dict['tags']
            id = test_dict['id']
            workers = []
            for tag in tags:
                if tag.startswith('worker-'):
                    workers.append(tag)
            if not workers:
                workers = [None]
            for worker in workers:
                worker_to_test.setdefault(worker, []).append(id)
            test_to_worker.setdefault(id, []).extend(workers)

        mapper = testtools.StreamToDict(map_test)
        mapper.startTestRun()
        try:
            case.run(mapper)
        finally:
            mapper.stopTestRun()
        self._worker_to_test = worker_to_test
        self._test_to_worker = test_to_worker
    failing_workers = self._test_to_worker[failing_id]
    prior_tests = []
    for worker in failing_workers:
        worker_tests = self._worker_to_test[worker]
        prior_tests.extend(worker_tests[:worker_tests.index(failing_id)])
    return prior_tests
def assertRunExit(self, cmd, expected, subunit=False, stdin=None):
    if stdin:
        p = subprocess.Popen(
            "%s" % cmd, shell=True, stdin=subprocess.PIPE,
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        out, err = p.communicate(stdin)
    else:
        p = subprocess.Popen(
            "%s" % cmd, shell=True,
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        out, err = p.communicate()
    if not subunit:
        self.assertEqual(
            p.returncode, expected,
            "Stdout: %s; Stderr: %s" % (out, err))
        return (out, err)
    else:
        self.assertEqual(
            p.returncode, expected,
            "Expected return code: %s doesn't match actual "
            "return code of: %s" % (expected, p.returncode))
        output_stream = io.BytesIO(out)
        stream = subunit_lib.ByteStreamToStreamResult(output_stream)
        starts = testtools.StreamResult()
        summary = testtools.StreamSummary()
        tests = []

        def _add_dict(test):
            tests.append(test)

        outcomes = testtools.StreamToDict(functools.partial(_add_dict))
        result = testtools.CopyStreamResult([starts, outcomes, summary])
        result.startTestRun()
        try:
            stream.run(result)
        finally:
            result.stopTestRun()
        self.assertThat(len(tests), testtools.matchers.GreaterThan(0))
        return (out, err)
def startTestRun(self):
    self._subunit = BytesIO()
    serialiser = subunit.v2.StreamResultToBytes(self._subunit)
    self._hook = testtools.CopyStreamResult([
        testtools.StreamToDict(self._handle_test),
        serialiser])
    self._hook.startTestRun()
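Several of the examples above combine testtools.StreamToDict with testtools.CopyStreamResult, which fans each event out to every target it wraps. Here is a minimal sketch of that fan-out, under the assumption that you want per-test dicts and a subunit v2 byte capture from the same run; the single hand-emitted event stands in for a real stream.

import io
import subunit.v2
import testtools

captured = []
buf = io.BytesIO()
# Every event is copied to both targets: one summarises tests as
# dicts, the other serialises the raw events as subunit v2 bytes.
hook = testtools.CopyStreamResult([
    testtools.StreamToDict(captured.append),
    subunit.v2.StreamResultToBytes(buf)])
hook.startTestRun()
hook.status(test_id='t1', test_status='success')
hook.stopTestRun()

print(captured[0]['status'])  # 'success'
# buf.getvalue() now holds the same run as subunit v2 bytes.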
def to_disk(argv=None, stdin=None, stdout=None):
    if stdout is None:
        stdout = sys.stdout
    if stdin is None:
        stdin = sys.stdin
    parser = optparse.OptionParser(
        description="Export a subunit stream to files on disk.",
        epilog=dedent("""\
            Creates a directory per test id, a JSON file with test
            metadata within that directory, and each attachment is
            written to their name relative to that directory.

            Global packages (no test id) are discarded.

            Exits 0 if the export was completed, or non-zero otherwise.
            """))
    parser.add_option(
        "-d", "--directory", help="Root directory to export to.",
        default=".")
    options, args = parser.parse_args(argv)
    if len(args) > 1:
        raise Exception("Unexpected arguments.")
    if len(args):
        source = io.open(args[0], 'rb')
    else:
        source = stdin
    exporter = DiskExporter(options.directory)
    result = StreamToDict(exporter.export)
    run_tests_from_stream(source, result, protocol_version=2)
    return 0