我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用doctest.ELLIPSIS。
def pytest_addoption(parser): parser.addini('doctest_optionflags', 'option flags for doctests', type="args", default=["ELLIPSIS"]) group = parser.getgroup("collect") group.addoption("--doctest-modules", action="store_true", default=False, help="run doctests in all .py modules", dest="doctestmodules") group.addoption("--doctest-report", type=str.lower, default="udiff", help="choose another output format for diffs on doctest failure", choices=DOCTEST_REPORT_CHOICES, dest="doctestreport") group.addoption("--doctest-glob", action="append", default=[], metavar="pat", help="doctests file matching pattern, default: test*.txt", dest="doctestglob") group.addoption("--doctest-ignore-import-errors", action="store_true", default=False, help="ignore doctest ImportErrors", dest="doctest_ignore_import_errors")
def test_context_suppression(self): try: try: raise Exception except: raise_from(ZeroDivisionError, None) except ZeroDivisionError as _: e = _ tb = sys.exc_info()[2] lines = self.get_report(e, tb) self.assertThat(lines, DocTestMatches("""\ Traceback (most recent call last): File "...traceback2/tests/test_traceback.py", line ..., in test_context_suppression raise_from(ZeroDivisionError, None) File "<string>", line 2, in raise_from ZeroDivisionError """, doctest.ELLIPSIS))
def _test(): import doctest from pyspark.sql import SparkSession import pyspark.sql.column globs = pyspark.sql.column.__dict__.copy() spark = SparkSession.builder\ .master("local[4]")\ .appName("sql.column tests")\ .getOrCreate() sc = spark.sparkContext globs['sc'] = sc globs['df'] = sc.parallelize([(2, 'Alice'), (5, 'Bob')]) \ .toDF(StructType([StructField('age', IntegerType()), StructField('name', StringType())])) (failure_count, test_count) = doctest.testmod( pyspark.sql.column, globs=globs, optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF) spark.stop() if failure_count: exit(-1)
def _test(): import doctest from pyspark.sql import Row, SparkSession import pyspark.sql.functions globs = pyspark.sql.functions.__dict__.copy() spark = SparkSession.builder\ .master("local[4]")\ .appName("sql.functions tests")\ .getOrCreate() sc = spark.sparkContext globs['sc'] = sc globs['spark'] = spark globs['df'] = sc.parallelize([Row(name='Alice', age=2), Row(name='Bob', age=5)]).toDF() (failure_count, test_count) = doctest.testmod( pyspark.sql.functions, globs=globs, optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE) spark.stop() if failure_count: exit(-1)
def _test(): import os import doctest from pyspark.context import SparkContext from pyspark.sql import Row import pyspark.sql.session os.chdir(os.environ["SPARK_HOME"]) globs = pyspark.sql.session.__dict__.copy() sc = SparkContext('local[4]', 'PythonTest') globs['sc'] = sc globs['spark'] = SparkSession(sc) globs['rdd'] = rdd = sc.parallelize( [Row(field1=1, field2="row1"), Row(field1=2, field2="row2"), Row(field1=3, field2="row3")]) globs['df'] = rdd.toDF() (failure_count, test_count) = doctest.testmod( pyspark.sql.session, globs=globs, optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE) globs['sc'].stop() if failure_count: exit(-1)
def _test(): import doctest from pyspark.sql import SparkSession import pyspark.mllib.fpm globs = pyspark.mllib.fpm.__dict__.copy() spark = SparkSession.builder\ .master("local[4]")\ .appName("mllib.fpm tests")\ .getOrCreate() globs['sc'] = spark.sparkContext import tempfile temp_path = tempfile.mkdtemp() globs['temp_path'] = temp_path try: (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) spark.stop() finally: from shutil import rmtree try: rmtree(temp_path) except OSError: pass if failure_count: exit(-1)
def _test(): import doctest from pyspark.sql import SparkSession from pyspark.mllib.linalg import Matrices import pyspark.mllib.linalg.distributed globs = pyspark.mllib.linalg.distributed.__dict__.copy() spark = SparkSession.builder\ .master("local[2]")\ .appName("mllib.linalg.distributed tests")\ .getOrCreate() globs['sc'] = spark.sparkContext globs['Matrices'] = Matrices (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) spark.stop() if failure_count: exit(-1)
def _test(): import doctest from pyspark.sql import SparkSession globs = globals().copy() # The small batch size here ensures that we see multiple batches, # even in these small test examples: spark = SparkSession.builder\ .master("local[2]")\ .appName("mllib.util tests")\ .getOrCreate() globs['spark'] = spark globs['sc'] = spark.sparkContext (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) spark.stop() if failure_count: exit(-1)
def makeTest(self, obj, parent): """Look for doctests in the given object, which will be a function, method or class. """ #print 'Plugin analyzing:', obj, parent # dbg # always use whitespace and ellipsis options optionflags = doctest.NORMALIZE_WHITESPACE | doctest.ELLIPSIS doctests = self.finder.find(obj, module=getmodule(parent)) if doctests: for test in doctests: if len(test.examples) == 0: continue yield DocTestCase(test, obj=obj, optionflags=optionflags, checker=self.checker)
def suite(package): """Assemble test suite for doctests in path (recursively)""" from importlib import import_module for module in find_modules(package.__file__): try: module = import_module(module) yield DocTestSuite(module, globs=Context(module.__dict__.copy()), optionflags=ELLIPSIS | NORMALIZE_WHITESPACE) except ValueError: pass # No doctests in module except ImportError: import warnings warnings.warn('Unimportable module: {}'.format(module)) # Add documentation tests yield DocFileSuite(path.normpath(path.join(path.dirname(__file__), '..', '..', '..', 'doc', 'scripting.rst')), module_relative=False, globs=Context(module.__dict__.copy()), optionflags=ELLIPSIS | NORMALIZE_WHITESPACE )
def _test(): import doctest from pyspark.context import SparkContext from pyspark.sql import SQLContext import pyspark.sql.column globs = pyspark.sql.column.__dict__.copy() sc = SparkContext('local[4]', 'PythonTest') globs['sc'] = sc globs['sqlContext'] = SQLContext(sc) globs['df'] = sc.parallelize([(2, 'Alice'), (5, 'Bob')]) \ .toDF(StructType([StructField('age', IntegerType()), StructField('name', StringType())])) (failure_count, test_count) = doctest.testmod( pyspark.sql.column, globs=globs, optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF) globs['sc'].stop() if failure_count: exit(-1)
def _test(): import doctest from pyspark.context import SparkContext from pyspark.sql import Row, SQLContext import pyspark.sql.functions globs = pyspark.sql.functions.__dict__.copy() sc = SparkContext('local[4]', 'PythonTest') globs['sc'] = sc globs['sqlContext'] = SQLContext(sc) globs['df'] = sc.parallelize([Row(name='Alice', age=2), Row(name='Bob', age=5)]).toDF() (failure_count, test_count) = doctest.testmod( pyspark.sql.functions, globs=globs, optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE) globs['sc'].stop() if failure_count: exit(-1)
def load_tests(loader, tests, ignore): module_doctests = [ urwid.widget, urwid.wimp, urwid.decoration, urwid.display_common, urwid.main_loop, urwid.monitored_list, urwid.raw_display, 'urwid.split_repr', # override function with same name urwid.util, urwid.signals, ] for m in module_doctests: tests.addTests(doctest.DocTestSuite(m, optionflags=doctest.ELLIPSIS | doctest.IGNORE_EXCEPTION_DETAIL)) return tests
def pytest_addoption(parser): parser.addini('doctest_optionflags', 'option flags for doctests', type="args", default=["ELLIPSIS"]) group = parser.getgroup("collect") group.addoption("--doctest-modules", action="store_true", default=False, help="run doctests in all .py modules", dest="doctestmodules") group.addoption("--doctest-glob", action="store", default="test*.txt", metavar="pat", help="doctests file matching pattern, default: test*.txt", dest="doctestglob") group.addoption("--doctest-ignore-import-errors", action="store_true", default=False, help="ignore doctest ImportErrors", dest="doctest_ignore_import_errors")
def test_print_whats_next(self): profile = { "name": factory.make_name("profile"), "url": factory.make_name("url"), } stdout = self.patch(sys, "stdout", StringIO()) cli.cmd_login.print_whats_next(profile) expected = dedent("""\ You are now logged in to the MAAS server at %(url)s with the profile name '%(name)s'. For help with the available commands, try: maas %(name)s --help """) % profile observed = stdout.getvalue() flags = doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE self.assertThat(observed, DocTestMatches(expected, flags))
def additional_tests(): import doctest, unittest suite = unittest.TestSuite(( doctest.DocFileSuite( os.path.join('tests', 'api_tests.txt'), optionflags=doctest.ELLIPSIS, package='pkg_resources', ), )) if sys.platform == 'win32': suite.addTest(doctest.DocFileSuite('win_script_wrapper.txt')) return suite
def testDoc(filename, name=None): print "--- %s: Run tests" % filename failure, nb_test = testfile( filename, optionflags=ELLIPSIS, name=name) if failure: exit(1) print "--- %s: End of tests" % filename
def _get_flag_lookup(): import doctest return dict(DONT_ACCEPT_TRUE_FOR_1=doctest.DONT_ACCEPT_TRUE_FOR_1, DONT_ACCEPT_BLANKLINE=doctest.DONT_ACCEPT_BLANKLINE, NORMALIZE_WHITESPACE=doctest.NORMALIZE_WHITESPACE, ELLIPSIS=doctest.ELLIPSIS, IGNORE_EXCEPTION_DETAIL=doctest.IGNORE_EXCEPTION_DETAIL, COMPARISON_FLAGS=doctest.COMPARISON_FLAGS, ALLOW_UNICODE=_get_allow_unicode_flag(), ALLOW_BYTES=_get_allow_bytes_flag(), )
def applicationId(self): """ A unique identifier for the Spark application. Its format depends on the scheduler implementation. * in case of local spark app something like 'local-1433865536131' * in case of YARN something like 'application_1433865536131_34483' >>> sc.applicationId # doctest: +ELLIPSIS u'local-...' """ return self._jsc.sc().applicationId()
def _test(): import atexit import doctest import tempfile globs = globals().copy() globs['sc'] = SparkContext('local[4]', 'PythonTest') globs['tempdir'] = tempfile.mkdtemp() atexit.register(lambda: shutil.rmtree(globs['tempdir'])) (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) globs['sc'].stop() if failure_count: exit(-1)
def _test(): import doctest (failure_count, test_count) = doctest.testmod(optionflags=doctest.ELLIPSIS) if failure_count: exit(-1)
def _test(): import doctest from pyspark.context import SparkContext from pyspark.sql import SparkSession globs = globals() sc = SparkContext('local[4]', 'PythonTest') globs['sc'] = sc globs['spark'] = SparkSession.builder.getOrCreate() (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) globs['sc'].stop() if failure_count: exit(-1)
def _test(): import doctest from pyspark.context import SparkContext from pyspark.sql import Row, SQLContext, SparkSession import pyspark.sql.dataframe from pyspark.sql.functions import from_unixtime globs = pyspark.sql.dataframe.__dict__.copy() sc = SparkContext('local[4]', 'PythonTest') globs['sc'] = sc globs['sqlContext'] = SQLContext(sc) globs['spark'] = SparkSession(sc) globs['df'] = sc.parallelize([(2, 'Alice'), (5, 'Bob')])\ .toDF(StructType([StructField('age', IntegerType()), StructField('name', StringType())])) globs['df2'] = sc.parallelize([Row(name='Tom', height=80), Row(name='Bob', height=85)]).toDF() globs['df3'] = sc.parallelize([Row(name='Alice', age=2), Row(name='Bob', age=5)]).toDF() globs['df4'] = sc.parallelize([Row(name='Alice', age=10, height=80), Row(name='Bob', age=5, height=None), Row(name='Tom', age=None, height=None), Row(name=None, age=None, height=None)]).toDF() globs['sdf'] = sc.parallelize([Row(name='Tom', time=1479441846), Row(name='Bob', time=1479442946)]).toDF() (failure_count, test_count) = doctest.testmod( pyspark.sql.dataframe, globs=globs, optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF) globs['sc'].stop() if failure_count: exit(-1)
def _test(): import os import doctest import tempfile from pyspark.context import SparkContext from pyspark.sql import Row, SQLContext import pyspark.sql.context os.chdir(os.environ["SPARK_HOME"]) globs = pyspark.sql.context.__dict__.copy() sc = SparkContext('local[4]', 'PythonTest') globs['tempfile'] = tempfile globs['os'] = os globs['sc'] = sc globs['sqlContext'] = SQLContext(sc) globs['rdd'] = rdd = sc.parallelize( [Row(field1=1, field2="row1"), Row(field1=2, field2="row2"), Row(field1=3, field2="row3")] ) globs['df'] = rdd.toDF() jsonStrings = [ '{"field1": 1, "field2": "row1", "field3":{"field4":11}}', '{"field1" : 2, "field3":{"field4":22, "field5": [10, 11]},' '"field6":[{"field7": "row2"}]}', '{"field1" : null, "field2": "row3", ' '"field3":{"field4":33, "field5": []}}' ] globs['jsonStrings'] = jsonStrings globs['json'] = sc.parallelize(jsonStrings) (failure_count, test_count) = doctest.testmod( pyspark.sql.context, globs=globs, optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE) globs['sc'].stop() if failure_count: exit(-1)
def _test(): import doctest import os import tempfile import py4j from pyspark.context import SparkContext from pyspark.sql import SparkSession, Row import pyspark.sql.readwriter os.chdir(os.environ["SPARK_HOME"]) globs = pyspark.sql.readwriter.__dict__.copy() sc = SparkContext('local[4]', 'PythonTest') try: spark = SparkSession.builder.enableHiveSupport().getOrCreate() except py4j.protocol.Py4JError: spark = SparkSession(sc) globs['tempfile'] = tempfile globs['os'] = os globs['sc'] = sc globs['spark'] = spark globs['df'] = spark.read.parquet('python/test_support/sql/parquet_partitioned') (failure_count, test_count) = doctest.testmod( pyspark.sql.readwriter, globs=globs, optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF) sc.stop() if failure_count: exit(-1)
def _test(): import doctest import os import tempfile from pyspark.sql import Row, SparkSession, SQLContext import pyspark.sql.streaming os.chdir(os.environ["SPARK_HOME"]) globs = pyspark.sql.streaming.__dict__.copy() try: spark = SparkSession.builder.getOrCreate() except py4j.protocol.Py4JError: spark = SparkSession(sc) globs['tempfile'] = tempfile globs['os'] = os globs['spark'] = spark globs['sqlContext'] = SQLContext.getOrCreate(spark.sparkContext) globs['sdf'] = \ spark.readStream.format('text').load('python/test_support/sql/streaming') globs['sdf_schema'] = StructType([StructField("data", StringType(), False)]) globs['df'] = \ globs['spark'].readStream.format('text').load('python/test_support/sql/streaming') (failure_count, test_count) = doctest.testmod( pyspark.sql.streaming, globs=globs, optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF) globs['spark'].stop() if failure_count: exit(-1)
def _test(): import doctest from pyspark.context import SparkContext globs = globals().copy() # The small batch size here ensures that we see multiple batches, # even in these small test examples: globs['sc'] = SparkContext('local[4]', 'PythonTest') (failure_count, test_count) = doctest.testmod( globs=globs, optionflags=doctest.ELLIPSIS) globs['sc'].stop() if failure_count: exit(-1)
def _test(): import doctest from pyspark.sql import SparkSession import pyspark.mllib.evaluation globs = pyspark.mllib.evaluation.__dict__.copy() spark = SparkSession.builder\ .master("local[4]")\ .appName("mllib.evaluation tests")\ .getOrCreate() globs['sc'] = spark.sparkContext (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) spark.stop() if failure_count: exit(-1)
def _test(): import doctest from pyspark.sql import SparkSession globs = globals().copy() spark = SparkSession.builder\ .master("local[4]")\ .appName("mllib.stat.statistics tests")\ .getOrCreate() globs['sc'] = spark.sparkContext (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) spark.stop() if failure_count: exit(-1)
def _test(): import doctest from pyspark.sql import SparkSession import pyspark.mllib.classification globs = pyspark.mllib.classification.__dict__.copy() spark = SparkSession.builder\ .master("local[4]")\ .appName("mllib.classification tests")\ .getOrCreate() globs['sc'] = spark.sparkContext (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) spark.stop() if failure_count: exit(-1)
def _test(): import doctest globs = globals().copy() from pyspark.sql import SparkSession spark = SparkSession.builder\ .master("local[4]")\ .appName("mllib.tree tests")\ .getOrCreate() globs['sc'] = spark.sparkContext (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) spark.stop() if failure_count: exit(-1)
def _test(): import doctest from pyspark.sql import SparkSession globs = globals().copy() spark = SparkSession.builder\ .master("local[4]")\ .appName("mllib.feature tests")\ .getOrCreate() globs['sc'] = spark.sparkContext (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) spark.stop() if failure_count: exit(-1)
def _test(): import doctest from pyspark.sql import SparkSession globs = globals().copy() # The small batch size here ensures that we see multiple batches, # even in these small test examples: spark = SparkSession.builder\ .master("local[2]")\ .appName("mllib.random tests")\ .getOrCreate() globs['sc'] = spark.sparkContext (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) spark.stop() if failure_count: exit(-1)
def _test(): import doctest import pyspark.mllib.recommendation from pyspark.sql import SQLContext globs = pyspark.mllib.recommendation.__dict__.copy() sc = SparkContext('local[4]', 'PythonTest') globs['sc'] = sc globs['sqlContext'] = SQLContext(sc) (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) globs['sc'].stop() if failure_count: exit(-1)
def test_suite(): """Discover all tests in the tests dir""" test_loader = unittest.TestLoader() # Read in unit tests test_suite = test_loader.discover('tests') # Doctest all md and rst files for root, dirs, files in os.walk("."): for f in files: if f.endswith(".rst") or f.endswith(".md"): test_suite.addTests( doctest.DocFileSuite(os.path.join(root, f), optionflags=doctest.ELLIPSIS)) return test_suite
def test_suite(): import doctest return doctest.DocFileSuite( 'README.txt', optionflags=doctest.ELLIPSIS|doctest.REPORT_ONLY_FIRST_FAILURE, )
def loadTestsFromModule(self, module): #print '*** ipdoctest - lTM',module # dbg if not self.matches(module.__name__): log.debug("Doctest doesn't want module %s", module) return tests = self.finder.find(module,globs=self.globs, extraglobs=self.extraglobs) if not tests: return # always use whitespace and ellipsis options optionflags = doctest.NORMALIZE_WHITESPACE | doctest.ELLIPSIS tests.sort() module_file = module.__file__ if module_file[-4:] in ('.pyc', '.pyo'): module_file = module_file[:-1] for test in tests: if not test.examples: continue if not test.filename: test.filename = module_file yield DocTestCase(test, optionflags=optionflags, checker=self.checker)