我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 test.support 模块。
def _listener(cls, conn, families):
    # Child-side helper: for every requested family, open a Listener,
    # report its address over `conn`, accept one connection and ship the
    # accepted connection object back over `conn`.
    for family in families:
        listener = cls.connection.Listener(family=family)
        conn.send(listener.address)
        accepted = listener.accept()
        conn.send(accepted)
        accepted.close()
        listener.close()

    # Repeat the same exchange once more with a plain socket bound to an
    # ephemeral port on the test host.
    server = socket.socket()
    server.bind((test.support.HOST, 0))
    server.listen(1)
    conn.send(server.getsockname())
    accepted, _peer = server.accept()
    conn.send(accepted)
    accepted.close()
    server.close()

    # Presumably waits for the peer's final message before returning --
    # confirm against the caller.
    conn.recv()
def test_run_call_order__error_in_tearDown_default_result(self):
    # An exception raised from tearDown() must be reported through
    # addError, with the run bracketed by startTestRun/stopTestRun when
    # the default result object is used.

    class Foo(Test.LoggingTestCase):
        def defaultTestResult(self):
            return LoggingResult(self.events)

        def tearDown(self):
            super().tearDown()
            raise RuntimeError('raised by Foo.tearDown')

    recorded = []
    Foo(recorded).run()
    self.assertEqual(
        recorded,
        ['startTestRun', 'startTest', 'setUp', 'test', 'tearDown',
         'addError', 'stopTest', 'stopTestRun'],
    )

# "TestCase.run() still works when the defaultTestResult is a TestResult
# that does not support startTestRun and stopTestRun.
def check_enough_semaphores():
    """Check that the system supports enough semaphores to run the test."""
    # POSIX guarantees at least this many semaphores.
    nsems_min = 256
    try:
        nsems = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf is missing, or it does not know this setting: nothing
        # to check.
        return
    # -1 is accepted as-is; only a limit below the minimum is a problem.
    if nsems != -1 and nsems < nsems_min:
        raise unittest.SkipTest("The OS doesn't support enough semaphores "
                                "to run the test (required: %d)." % nsems_min)

#
# Creates a wrapper for a function which records the time it takes to finish
#
def test_stderr_flush(self): # sys.stderr is flushed at process shutdown (issue #13812) if self.TYPE == "threads": return testfn = test.support.TESTFN self.addCleanup(test.support.unlink, testfn) proc = self.Process(target=self._test_stderr_flush, args=(testfn,)) proc.start() proc.join() with open(testfn, 'r') as f: err = f.read() # The whole traceback was printed self.assertIn("ZeroDivisionError", err) self.assertIn("test_multiprocessing.py", err) self.assertIn("1/0 # MARKER", err)
def test_fd_transfer(self): if self.TYPE != 'processes': self.skipTest("only makes sense with processes") conn, child_conn = self.Pipe(duplex=True) p = self.Process(target=self._writefd, args=(child_conn, b"foo")) p.daemon = True p.start() self.addCleanup(test.support.unlink, test.support.TESTFN) with open(test.support.TESTFN, "wb") as f: fd = f.fileno() if msvcrt: fd = msvcrt.get_osfhandle(fd) reduction.send_handle(conn, fd, p.pid) p.join() with open(test.support.TESTFN, "rb") as f: self.assertEqual(f.read(), b"foo")
def test_large_fd_transfer(self): # With fd > 256 (issue #11657) if self.TYPE != 'processes': self.skipTest("only makes sense with processes") conn, child_conn = self.Pipe(duplex=True) p = self.Process(target=self._writefd, args=(child_conn, b"bar", True)) p.daemon = True p.start() self.addCleanup(test.support.unlink, test.support.TESTFN) with open(test.support.TESTFN, "wb") as f: fd = f.fileno() for newfd in range(256, MAXFD): if not self._is_fd_assigned(newfd): break else: self.fail("could not find an unassigned large file descriptor") os.dup2(fd, newfd) try: reduction.send_handle(conn, newfd, p.pid) finally: os.close(newfd) p.join() with open(test.support.TESTFN, "rb") as f: self.assertEqual(f.read(), b"bar")
def setUp(self):
    # Record the packed size of each struct format code used by the
    # size checks, as an attribute named after the code.
    for code, sample in (('c', b' '), ('H', 0), ('i', 0), ('l', 0), ('P', 0)):
        setattr(self, code, len(struct.pack(code, sample)))

    # due to missing size_t information from struct, it is assumed that
    # sizeof(Py_ssize_t) = sizeof(void*)
    # Two extra pointer-sized fields are appended to both headers when
    # sys.gettotalrefcount exists.
    extra = '2P' if hasattr(sys, "gettotalrefcount") else ''
    self.header = 'PP' + extra
    self.vheader = 'PP' + 'P' + extra

    self.longdigit = sys.int_info.sizeof_digit
    import _testcapi
    self.gc_headsize = _testcapi.SIZEOF_PYGC_HEAD
    self.file = open(test.support.TESTFN, 'wb')
def test_whichdb(self):
    for module in dbm_iterator():
        # Check whether whichdb correctly guesses module name
        # for databases opened with "module" module.
        name = module.__name__
        if name == 'dbm.dumb':
            continue   # whichdb can't support dbm.dumb
        delete_files()

        # Try with empty files first
        f = module.open(_fname, 'c')
        f.close()
        self.assertEqual(name, dbm.whichdb(_fname))

        # Now add a key
        f = module.open(_fname, 'w')
        try:
            f[b"1"] = b"1"
            # and test that we can find it
            self.assertIn(b"1", f)
            # and read it (assertEqual reports both values on failure)
            self.assertEqual(f[b"1"], b"1")
        finally:
            # Close even if an assertion fails, so the database is not
            # still open when later cleanup removes its files.
            f.close()
        self.assertEqual(name, dbm.whichdb(_fname))
def _run_object_doctest(obj, module):
    """Run the doctests found on `obj` and return (failures, tries).

    Raises test.support.TestFailed if any doctest fails.
    """
    # Use the object's fully qualified name if it has one
    # Otherwise, use the module's name
    try:
        qualified = "%s.%s" % (obj.__module__, obj.__name__)
    except AttributeError:
        qualified = module.__name__

    runner = None
    finder = doctest.DocTestFinder(verbose=verbose, recurse=False)
    runner = doctest.DocTestRunner(verbose=verbose)
    for example in finder.find(obj, qualified, module):
        runner.run(example)

    failed = runner.failures
    attempted = runner.tries
    if failed:
        raise test.support.TestFailed("%d of %d doctests failed" %
                                      (failed, attempted))
    if verbose:
        print('doctest (%s) ... %d tests with zero failures' %
              (module.__name__, attempted))
    return failed, attempted
def test_whichdb(self):
    for module in dbm_iterator():
        # Check whether whichdb correctly guesses module name
        # for databases opened with "module" module.
        name = module.__name__
        if name == 'dbm.dumb':
            continue   # whichdb can't support dbm.dumb
        delete_files()

        # Try with empty files first
        f = module.open(_fname, 'c')
        f.close()
        self.assertEqual(name, self.dbm.whichdb(_fname))

        # Now add a key
        f = module.open(_fname, 'w')
        try:
            f[b"1"] = b"1"
            # and test that we can find it
            self.assertIn(b"1", f)
            # and read it (assertEqual reports both values on failure)
            self.assertEqual(f[b"1"], b"1")
        finally:
            # Close even if an assertion fails, so the database is not
            # still open when later cleanup removes its files.
            f.close()
        self.assertEqual(name, self.dbm.whichdb(_fname))
def test_weakref(self): # Check memoryviews are weakrefable for tp in self._types: b = tp(self._source) m = self._view(b) L = [] def callback(wr, b=b): L.append(b) wr = weakref.ref(m, callback) self.assertIs(wr(), m) del m test.support.gc_collect() self.assertIs(wr(), None) self.assertIs(L[0], b) # Variations on source objects for the buffer: bytes-like objects, then arrays # with itemsize > 1. # NOTE: support for multi-dimensional objects is unimplemented.
def test_recursionlimit_fatalerror(self):
    # A fatal error occurs if a second recursion limit is hit when recovering
    # from a first one.
    # The child script must keep its line structure: it is passed to
    # `python -c` and has to be valid multi-line source after dedent().
    code = textwrap.dedent("""
        import sys

        def f():
            try:
                f()
            except RuntimeError:
                f()

        sys.setrecursionlimit(%d)
        f()""")
    with test.support.SuppressCrashReport():
        for i in (50, 1000):
            sub = subprocess.Popen([sys.executable, '-c', code % i],
                                   stderr=subprocess.PIPE)
            err = sub.communicate()[1]
            # A non-zero exit status is expected; it is also passed as the
            # failure message.
            self.assertTrue(sub.returncode, sub.returncode)
            self.assertIn(
                b"Fatal Python error: Cannot recover from stack overflow",
                err)
def test_pythontypes(self):
    # check all types defined in Python/
    calc_fixed = test.support.calcobjsize
    calc_var = test.support.calcvobjsize
    verify = self.check_sizeof

    # _ast.AST
    import _ast
    verify(_ast.AST(), calc_fixed('P'))

    # traceback
    try:
        raise TypeError
    except TypeError as caught:
        tb = caught.__traceback__
        if tb is not None:
            verify(tb, calc_fixed('2P2i'))

    # symtable entry
    # XXX

    # sys.flags
    flags_size = calc_var('') + self.P * len(sys.flags)
    verify(sys.flags, flags_size)
def test_stderr_flush(self): # sys.stderr is flushed at process shutdown (issue #13812) if self.TYPE == "threads": self.skipTest('test not appropriate for {}'.format(self.TYPE)) testfn = test.support.TESTFN self.addCleanup(test.support.unlink, testfn) proc = self.Process(target=self._test_stderr_flush, args=(testfn,)) proc.start() proc.join() with open(testfn, 'r') as f: err = f.read() # The whole traceback was printed self.assertIn("ZeroDivisionError", err) self.assertIn("test_multiprocessing.py", err) self.assertIn("1/0 # MARKER", err)
def test_no_import_lock_contention(self):
    # Importing a module that uses a multiprocessing.Queue at import time
    # must not time out on the queue (see Issue #22853).
    with test.support.temp_cwd():
        module_name = 'imported_by_an_imported_module'
        with open(module_name + '.py', 'w') as f:
            # The generated module must keep its line structure to be
            # importable; the leading "if 1:" lets the body be indented.
            f.write("""if 1:
                import multiprocessing

                q = multiprocessing.Queue()
                q.put('knock knock')
                q.get(timeout=3)
                q.close()
                del q
            """)

        with test.support.DirsOnSysPath(os.getcwd()):
            try:
                __import__(module_name)
            except pyqueue.Empty:
                self.fail("Probable regression on import lock contention;"
                          " see Issue #22853")
def test_threads_join(self):
    # Non-daemon threads should be joined at subinterpreter shutdown
    # (issue #18808)
    r, w = os.pipe()
    self.addCleanup(os.close, r)
    self.addCleanup(os.close, w)
    # The script must keep its line structure to be valid source for the
    # subinterpreter; the write end of the pipe is substituted for %d.
    code = r"""if 1:
        import os
        import threading
        import time

        def f():
            # Sleep a bit so that the thread is still running when
            # Py_EndInterpreter is called.
            time.sleep(0.05)
            os.write(%d, b"x")
        threading.Thread(target=f).start()
        """ % (w,)
    ret = test.support.run_in_subinterp(code)
    self.assertEqual(ret, 0)
    # The thread was joined properly.
    self.assertEqual(os.read(r, 1), b"x")
def test_daemon_threads_fatal_error(self):
    # Ending a subinterpreter while one of its daemon threads is still
    # running is expected to abort with a fatal error.
    # Both scripts must keep their line structure to be valid source.
    subinterp_code = r"""if 1:
        import os
        import threading
        import time

        def f():
            # Make sure the daemon thread is still running when
            # Py_EndInterpreter is called.
            time.sleep(10)
        threading.Thread(target=f, daemon=True).start()
        """
    script = r"""if 1:
        import _testcapi

        _testcapi.run_in_subinterp(%r)
        """ % (subinterp_code,)
    with test.support.SuppressCrashReport():
        rc, out, err = assert_python_failure("-c", script)
    self.assertIn("Fatal Python error: Py_EndInterpreter: "
                  "not the last thread", err.decode())
def test_issue22668(self): a = array.array('H', [256, 256, 256, 256]) x = memoryview(a) m = x.cast('B') b = m.cast('H') c = b[0:2] d = memoryview(b) del b self.assertEqual(c[0], 256) self.assertEqual(d[0], 256) self.assertEqual(c.format, "H") self.assertEqual(d.format, "H") _ = m.cast('I') self.assertEqual(c[0], 256) self.assertEqual(d[0], 256) self.assertEqual(c.format, "H") self.assertEqual(d.format, "H") # Variations on source objects for the buffer: bytes-like objects, then arrays # with itemsize > 1. # NOTE: support for multi-dimensional objects is unimplemented.
def test_traceback(self): # We want ensure that the traceback from the child process is # contained in the traceback raised in the main process. if self.TYPE == 'processes': with self.Pool(1) as p: try: p.apply(self._test_traceback) except Exception as e: exc = e else: raise AssertionError('expected RuntimeError') self.assertIs(type(exc), RuntimeError) self.assertEqual(exc.args, (123,)) cause = exc.__cause__ self.assertIs(type(cause), multiprocessing.pool.RemoteTraceback) self.assertIn('raise RuntimeError(123) # some comment', cause.tb) with test.support.captured_stderr() as f1: try: raise exc except RuntimeError: sys.excepthook(*sys.exc_info()) self.assertIn('raise RuntimeError(123) # some comment', f1.getvalue())
def tearDown(self):
    # Put back the sys hooks and streams saved on this instance, then
    # reap any child processes left behind.
    sys.displayhook = self.orig_displayhook
    sys.stderr = self.orig_stderr
    sys.stdout = self.orig_stdout
    test.support.reap_children()
def test_excepthook(self):
    # Calling sys.excepthook with non-exception arguments must report a
    # TypeError on stderr.
    with test.support.captured_output("stderr") as stderr:
        sys.excepthook(1, '1', 1)
    # assertIn shows the captured text on failure, unlike assertTrue.
    self.assertIn("TypeError: print_exception(): Exception expected for "
                  "value, str found", stderr.getvalue())

# FIXME: testing the code for a lost or replaced excepthook in
# Python/pythonrun.c::PyErr_PrintEx() is tricky.
def test_getwindowsversion(self): # Raise SkipTest if sys doesn't have getwindowsversion attribute test.support.get_attribute(sys, "getwindowsversion") v = sys.getwindowsversion() self.assertEqual(len(v), 5) self.assertIsInstance(v[0], int) self.assertIsInstance(v[1], int) self.assertIsInstance(v[2], int) self.assertIsInstance(v[3], int) self.assertIsInstance(v[4], str) self.assertRaises(IndexError, operator.getitem, v, 5) self.assertIsInstance(v.major, int) self.assertIsInstance(v.minor, int) self.assertIsInstance(v.build, int) self.assertIsInstance(v.platform, int) self.assertIsInstance(v.service_pack, str) self.assertIsInstance(v.service_pack_minor, int) self.assertIsInstance(v.service_pack_major, int) self.assertIsInstance(v.suite_mask, int) self.assertIsInstance(v.product_type, int) self.assertEqual(v[0], v.major) self.assertEqual(v[1], v.minor) self.assertEqual(v[2], v.build) self.assertEqual(v[3], v.platform) self.assertEqual(v[4], v.service_pack) # This is how platform.py calls it. Make sure tuple # still has 5 elements maj, min, buildno, plat, csd = sys.getwindowsversion()
def tearDown(self):
    # Close the scratch file held on this instance, then delete it.
    scratch = self.file
    scratch.close()
    test.support.unlink(test.support.TESTFN)
def delete_files():
    """Remove every file whose name starts with the test database name."""
    # we don't know the precise name the underlying database uses
    # so we use glob to locate all names
    leftovers = glob.glob(_fname + "*")
    for path in leftovers:
        test.support.unlink(path)
def setUp(self):
    # Start from a clean slate, then create the database file and close
    # it again straight away.
    delete_files()
    self.filename = path = test.support.TESTFN
    self.d = dbm.open(path, 'c')
    self.d.close()
def test_main():
    """Run WhichDBTestCase plus one AnyDBMTestCase subclass per dbm module."""
    # Each generated class carries its backend in the `module` attribute.
    per_backend = [
        type("TestCase-" + mod.__name__, (AnyDBMTestCase,), {'module': mod})
        for mod in dbm_iterator()
    ]
    test.support.run_unittest(WhichDBTestCase, *per_backend)
def setUp(self):
    # Keep the value returned by threading_setup() so tearDown can pass
    # it to threading_cleanup().
    snapshot = test.support.threading_setup()
    self._threads = snapshot
def tearDown(self):
    # Hand the state stored in self._threads back to threading_cleanup(),
    # then reap any child processes.
    saved = self._threads
    test.support.threading_cleanup(*saved)
    test.support.reap_children()
def test_various_ops_small_stack(self):
    # Re-run test_various_ops with a 256 KiB thread stack size.
    if verbose:
        print('with 256kB thread stack size...')
    try:
        threading.stack_size(262144)
    except _thread.error:
        raise unittest.SkipTest(
            'platform does not support changing thread stack size')
    try:
        self.test_various_ops()
    finally:
        # Always restore the default, even when the test fails, so the
        # changed stack size does not leak into later tests.
        threading.stack_size(0)

# run with a large thread stack size (1MB)
def test_various_ops_large_stack(self):
    # Re-run test_various_ops with a 1 MiB thread stack size.
    if verbose:
        print('with 1MB thread stack size...')
    try:
        threading.stack_size(0x100000)
    except _thread.error:
        raise unittest.SkipTest(
            'platform does not support changing thread stack size')
    try:
        self.test_various_ops()
    finally:
        # Always restore the default, even when the test fails, so the
        # changed stack size does not leak into later tests.
        threading.stack_size(0)
def test_main():
    """Run every test class listed below."""
    suites = (
        LockTests,
        PyRLockTests,
        CRLockTests,
        EventTests,
        ConditionAsRLockTests,
        ConditionTests,
        SemaphoreTests,
        BoundedSemaphoreTests,
        ThreadTests,
        ThreadJoinOnShutdown,
        ThreadingExceptionTests,
        BarrierTests,
    )
    test.support.run_unittest(*suites)
def test_writable_readonly(self): # Issue #10451: memoryview incorrectly exposes a readonly # buffer as writable causing a segfault if using mmap tp = self.ro_type if tp is None: return b = tp(self._source) m = self._view(b) i = io.BytesIO(b'ZZZZ') self.assertRaises(TypeError, i.readinto, m) # Variations on source objects for the buffer: bytes-like objects, then arrays # with itemsize > 1. # NOTE: support for multi-dimensional objects is unimplemented.
def test_main():
    """Hand this module's name to test.support.run_unittest()."""
    this_module = __name__
    test.support.run_unittest(this_module)
def test_sys_exit(self): # See Issue 13854 if self.TYPE == 'threads': return testfn = test.support.TESTFN self.addCleanup(test.support.unlink, testfn) for reason, code in (([1, 2, 3], 1), ('ignore this', 0)): p = self.Process(target=self._test_sys_exit, args=(reason, testfn)) p.daemon = True p.start() p.join(5) self.assertEqual(p.exitcode, code) with open(testfn, 'r') as f: self.assertEqual(f.read().rstrip(), str(reason)) for reason in (True, False, 8): p = self.Process(target=sys.exit, args=(reason,)) p.daemon = True p.start() p.join(5) self.assertEqual(p.exitcode, reason) # # #