Python nose 模块,SkipTest() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用nose.SkipTest()

项目:MCSManager-fsmodule    作者:Suwings    | 项目源码 | 文件源码
def skip_if(predicate, reason=None):
    """Skip a test if predicate is true."""
    reason = reason or predicate.__name__

    from nose import SkipTest

    def decorate(fn):
        fn_name = fn.__name__

        def maybe(*args, **kw):
            if predicate():
                msg = "'%s' skipped: %s" % (
                    fn_name, reason)
                raise SkipTest(msg)
            else:
                return fn(*args, **kw)
        return function_named(maybe, fn_name)
    return decorate
项目:dati-ckan-docker    作者:italia    | 项目源码 | 文件源码
def setup_class(cls):
        if not tests.is_datastore_supported():
            raise nose.SkipTest("Datastore not supported")
        p.load('datastore')
        ctd.CreateTestData.create()
        cls.sysadmin_user = model.User.get('testsysadmin')
        cls.normal_user = model.User.get('annafan')
        resource = model.Package.get('annakarenina').resources[0]
        cls.data = {
            'resource_id': resource.id,
            'aliases': u'b\xfck2',
            'fields': [{'id': 'book', 'type': 'text'},
                       {'id': 'author', 'type': 'text'},
                       {'id': 'rating with %', 'type': 'text'}],
            'records': [{'book': 'annakarenina', 'author': 'tolstoy',
                         'rating with %': '90%'},
                        {'book': 'warandpeace', 'author': 'tolstoy',
                         'rating with %': '42%'}]
        }

        engine = db._get_engine(
            {'connection_url': config['ckan.datastore.write_url']})
        cls.Session = orm.scoped_session(orm.sessionmaker(bind=engine))
        set_url_type(
            model.Package.get('annakarenina').resources, cls.sysadmin_user)
项目:dati-ckan-docker    作者:italia    | 项目源码 | 文件源码
def setup_class(cls):
        wsgiapp = middleware.make_app(config['global_conf'], **config)
        cls.app = paste.fixture.TestApp(wsgiapp)
        if not tests.is_datastore_supported():
            raise nose.SkipTest("Datastore not supported")
        p.load('datastore')
        p.load('datapusher')
        p.load('test_datapusher_plugin')

        resource = factories.Resource(url_type='datastore')
        cls.dataset = factories.Dataset(resources=[resource])

        cls.sysadmin_user = factories.User(name='testsysadmin', sysadmin=True)
        cls.normal_user = factories.User(name='annafan')
        engine = db._get_engine(
            {'connection_url': config['ckan.datastore.write_url']})
        cls.Session = orm.scoped_session(orm.sessionmaker(bind=engine))
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def test_interrupted_systemcall(self):
        '''
        Make sure interrupted system calls don't break the world, since we
        can't control what all signals our connection thread will get
        '''
        if 'linux' not in platform:
            raise SkipTest('Unable to reproduce error case on'
                           ' non-linux platforms')

        path = 'interrupt_test'
        value = b"1"
        self.client.create(path, value)

        # set the euid to the current process' euid.
        # glibc sends SIGRT to all children, which will interrupt the
        # system call
        os.seteuid(os.geteuid())

        # basic sanity test that it worked alright
        assert self.client.get(path)[0] == value
项目:ckan-timeseries    作者:namgk    | 项目源码 | 文件源码
def setup_class(cls):

        if not pylons.config.get('ckan.datastore.read_url'):
            raise nose.SkipTest('Datastore runs on legacy mode, skipping...')

        engine = db._get_engine(
            {'connection_url': pylons.config['ckan.datastore.write_url']}
        )
        cls.Session = orm.scoped_session(orm.sessionmaker(bind=engine))

        datastore_test_helpers.clear_db(cls.Session)

        create_tables = [
            u'CREATE TABLE test_a (id_a text)',
            u'CREATE TABLE test_b (id_b text)',
            u'CREATE TABLE "TEST_C" (id_c text)',
            u'CREATE TABLE test_d ("?/?" integer)',
        ]
        for create_table_sql in create_tables:
            cls.Session.execute(create_table_sql)
项目:leetcode    作者:thomasyimgit    | 项目源码 | 文件源码
def skip(msg=None):
    """Decorator factory - mark a test function for skipping from test suite.

    Parameters
    ----------
      msg : string
        Optional message to be added.

    Returns
    -------
       decorator : function
         Decorator, which, when applied to a function, causes SkipTest
         to be raised, with the optional message added.
      """

    return skipif(True,msg)
项目:leetcode    作者:thomasyimgit    | 项目源码 | 文件源码
def test_obj_del(self):
        """Test that object's __del__ methods are called on exit."""
        if sys.platform == 'win32':
            try:
                import win32api
            except ImportError:
                raise SkipTest("Test requires pywin32")
        src = ("class A(object):\n"
               "    def __del__(self):\n"
               "        print 'object A deleted'\n"
               "a = A()\n")
        self.mktmp(py3compat.doctest_refactor_print(src))
        if dec.module_not_available('sqlite3'):
            err = 'WARNING: IPython History requires SQLite, your history will not be saved\n'
        else:
            err = None
        tt.ipexec_validate(self.fname, 'object A deleted', err)
项目:leetcode    作者:thomasyimgit    | 项目源码 | 文件源码
def test_not_writable_ipdir():
    tmpdir = tempfile.mkdtemp()
    os.name = "posix"
    env.pop('IPYTHON_DIR', None)
    env.pop('IPYTHONDIR', None)
    env.pop('XDG_CONFIG_HOME', None)
    env['HOME'] = tmpdir
    ipdir = os.path.join(tmpdir, '.ipython')
    os.mkdir(ipdir, 0o555)
    try:
        open(os.path.join(ipdir, "_foo_"), 'w').close()
    except IOError:
        pass
    else:
        # I can still write to an unwritable dir,
        # assume I'm root and skip the test
        raise SkipTest("I can't create directories that I can't write to")
    with AssertPrints('is not a writable location', channel='stderr'):
        ipdir = paths.get_ipython_dir()
    env.pop('IPYTHON_DIR', None)
项目:leetcode    作者:thomasyimgit    | 项目源码 | 文件源码
def monkeypatch_xunit():
    try:
        knownfailureif(True)(lambda: None)()
    except Exception as e:
        KnownFailureTest = type(e)

    def addError(self, test, err, capt=None):
        if issubclass(err[0], KnownFailureTest):
            err = (SkipTest,) + err[1:]
        return self.orig_addError(test, err, capt)

    Xunit.orig_addError = Xunit.addError
    Xunit.addError = addError

#-----------------------------------------------------------------------------
# Check which dependencies are installed and greater than minimum version.
#-----------------------------------------------------------------------------
项目:pyetje    作者:rorlika    | 项目源码 | 文件源码
def __call__(self, fn):
        @decorator
        def decorate(fn, *args, **kw):
            if self.predicate():
                if self.reason:
                    msg = "'%s' : %s" % (
                            fn.__name__,
                            self.reason
                        )
                else:
                    msg = "'%s': %s" % (
                            fn.__name__, self.predicate
                        )
                raise SkipTest(msg)
            else:
                if self._fails_on:
                    with self._fails_on.fail_if(name=fn.__name__):
                        return fn(*args, **kw)
                else:
                    return fn(*args, **kw)
        return decorate(fn)
项目:nifpga-python    作者:ni    | 项目源码 | 文件源码
def test_status_exceptions_can_be_pickled_across_processes(self):
        try:
            import jobrunner
        except ImportError:
            raise SkipTest("jobrunner not installed, skipping")
        runner = jobrunner.JobRunner(jobrunner.JobRunner.RUN_MODE_MULTIPROCESS,
                                     runnables=[raise_an_exception],
                                     auto_assert=False)
        result = runner.run()[0]
        self.assertTrue(result.exception_occured())
        self.assertEqual(str(result.err_type), str(nifpga.FifoTimeoutError))
        self.assertIn("session: 0xbeef", result.err_class)
        if python_version == 2:
            self.assertIn("a bogus string argument: 'I am a string'", result.err_class)
        else:
            self.assertIn("a bogus string argument: b'I am a string'", result.err_class)
项目:python-group-proj    作者:Sharcee    | 项目源码 | 文件源码
def skip_if(predicate, reason=None):
    """Skip a test if predicate is true."""
    reason = reason or predicate.__name__

    from nose import SkipTest

    def decorate(fn):
        fn_name = fn.__name__

        def maybe(*args, **kw):
            if predicate():
                msg = "'%s' skipped: %s" % (
                    fn_name, reason)
                raise SkipTest(msg)
            else:
                return fn(*args, **kw)
        return function_named(maybe, fn_name)
    return decorate
项目:logfury    作者:ppolewicz    | 项目源码 | 文件源码
def test_complex_signature_py3(self):
        if six.PY2:
            raise SkipTest()
        with LogCapture() as l:
            # without this exec Python 2 and pyflakes complain about syntax errors etc
            exec (
                """@trace_call(self.logger)
def foo(a, b, c, d, e, *varargs_, f=None, g='G', h='H', i='ii', j='jj', **varkwargs_: None):
    pass
foo('a', 'b', *['c', 'd'], e='E', f='F', Z='Z', **{'g':'g', 'h':'h'})
""", locals(), globals()
            )
            l.check(
                (
                    'test.v0_1.test_base',
                    'DEBUG',
                    "calling foo(a='a', b='b', c='c', d='d', e='E', f='F', "
                    "g='g', h='h', varkwargs_={'Z': 'Z'}, varargs_=<class '%s._empty'>, "
                    "i='ii', j='jj')" % (INSPECT_MODULE_NAME,)  # prefix does not work because of the eval, inspect module is for pypy3
                ),
            )
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda    作者:SignalMedia    | 项目源码 | 文件源码
def testWLS(self):
        # WLS centered SS changed (fixed) in 0.5.0
        sm_version = sm.version.version
        if sm_version < LooseVersion('0.5.0'):
            raise nose.SkipTest("WLS centered SS not fixed in statsmodels"
                                " version {0}".format(sm_version))

        X = DataFrame(np.random.randn(30, 4), columns=['A', 'B', 'C', 'D'])
        Y = Series(np.random.randn(30))
        weights = X.std(1)

        self._check_wls(X, Y, weights)

        weights.ix[[5, 15]] = np.nan
        Y[[2, 21]] = np.nan
        self._check_wls(X, Y, weights)
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda    作者:SignalMedia    | 项目源码 | 文件源码
def test_compat():
    # test we have compat with our version of nu

    from pandas.computation import _NUMEXPR_INSTALLED
    try:
        import numexpr as ne
        ver = ne.__version__
        if ver == LooseVersion('2.4.4'):
            assert not _NUMEXPR_INSTALLED
        elif ver < LooseVersion('2.1'):
            with tm.assert_produces_warning(UserWarning,
                                            check_stacklevel=False):
                assert not _NUMEXPR_INSTALLED
        else:
            assert _NUMEXPR_INSTALLED

    except ImportError:
        raise nose.SkipTest("not testing numexpr version compat")
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda    作者:SignalMedia    | 项目源码 | 文件源码
def check_invalid_numexpr_version(engine, parser):
    def testit():
        a, b = 1, 2
        res = pd.eval('a + b', engine=engine, parser=parser)
        tm.assert_equal(res, 3)

    if engine == 'numexpr':
        try:
            import numexpr as ne
        except ImportError:
            raise nose.SkipTest("no numexpr")
        else:
            if ne.__version__ < LooseVersion('2.1'):
                with tm.assertRaisesRegexp(ImportError, "'numexpr' version is "
                                           ".+, must be >= 2.1"):
                    testit()
            elif ne.__version__ == LooseVersion('2.4.4'):
                raise nose.SkipTest("numexpr version==2.4.4")
            else:
                testit()
    else:
        testit()
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda    作者:SignalMedia    | 项目源码 | 文件源码
def get_expected_pow_result(self, lhs, rhs):
        try:
            expected = _eval_single_bin(lhs, '**', rhs, self.engine)
        except ValueError as e:
            msg = 'negative number cannot be raised to a fractional power'
            try:
                emsg = e.message
            except AttributeError:
                emsg = e

            emsg = u(emsg)

            if emsg == msg:
                if self.engine == 'python':
                    raise nose.SkipTest(emsg)
                else:
                    expected = np.nan
            else:
                raise
        return expected
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda    作者:SignalMedia    | 项目源码 | 文件源码
def test_legacy_pickle(self):
        if PY3:
            raise nose.SkipTest("testing for legacy pickles not "
                                "support on py3")

        path = tm.get_data_path('multiindex_v1.pickle')
        obj = pd.read_pickle(path)

        obj2 = MultiIndex.from_tuples(obj.values)
        self.assertTrue(obj.equals(obj2))

        res = obj.get_indexer(obj)
        exp = np.arange(len(obj))
        assert_almost_equal(res, exp)

        res = obj.get_indexer(obj2[::-1])
        exp = obj.get_indexer(obj[::-1])
        exp2 = obj2.get_indexer(obj2[::-1])
        assert_almost_equal(res, exp)
        assert_almost_equal(exp, exp2)
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda    作者:SignalMedia    | 项目源码 | 文件源码
def test_quantile_interpolation_np_lt_1p9(self):
        # GH #10174
        if not _np_version_under1p9:
            raise nose.SkipTest("Numpy version is greater than 1.9")

        from numpy import percentile

        # interpolation = linear (default case)
        q = self.ts.quantile(0.1, interpolation='linear')
        self.assertEqual(q, percentile(self.ts.valid(), 10))
        q1 = self.ts.quantile(0.1)
        self.assertEqual(q1, percentile(self.ts.valid(), 10))

        # interpolation other than linear
        expErrMsg = "Interpolation methods other than "
        with tm.assertRaisesRegexp(ValueError, expErrMsg):
            self.ts.quantile(0.9, interpolation='nearest')

        # object dtype
        with tm.assertRaisesRegexp(ValueError, expErrMsg):
            q = Series(self.ts, dtype=object).quantile(0.7,
                                                       interpolation='higher')
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda    作者:SignalMedia    | 项目源码 | 文件源码
def test_to_excel(self):
        try:
            import xlwt  # noqa
            import xlrd  # noqa
            import openpyxl  # noqa
            from pandas.io.excel import ExcelFile
        except ImportError:
            raise nose.SkipTest("need xlwt xlrd openpyxl")

        for ext in ['xls', 'xlsx']:
            path = '__tmp__.' + ext
            with ensure_clean(path) as path:
                self.panel.to_excel(path)
                try:
                    reader = ExcelFile(path)
                except ImportError:
                    raise nose.SkipTest("need xlwt xlrd openpyxl")

                for item, df in self.panel.iteritems():
                    recdf = reader.parse(str(item), index_col=0)
                    assert_frame_equal(df, recdf)
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda    作者:SignalMedia    | 项目源码 | 文件源码
def test_to_excel_xlsxwriter(self):
        try:
            import xlrd  # noqa
            import xlsxwriter  # noqa
            from pandas.io.excel import ExcelFile
        except ImportError:
            raise nose.SkipTest("Requires xlrd and xlsxwriter. Skipping test.")

        path = '__tmp__.xlsx'
        with ensure_clean(path) as path:
            self.panel.to_excel(path, engine='xlsxwriter')
            try:
                reader = ExcelFile(path)
            except ImportError as e:
                raise nose.SkipTest("cannot write excel file: %s" % e)

            for item, df in self.panel.iteritems():
                recdf = reader.parse(str(item), index_col=0)
                assert_frame_equal(df, recdf)
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda    作者:SignalMedia    | 项目源码 | 文件源码
def test_setitem_ndarray(self):
        raise nose.SkipTest("skipping for now")
    #    from pandas import DateRange, datetools

    #    timeidx = DateRange(start=datetime(2009,1,1),
    #                        end=datetime(2009,12,31),
    #                        offset=datetools.MonthEnd())
    #    lons_coarse = np.linspace(-177.5, 177.5, 72)
    #    lats_coarse = np.linspace(-87.5, 87.5, 36)
    #    P = Panel(items=timeidx, major_axis=lons_coarse,
    #              minor_axis=lats_coarse)
    #    data = np.random.randn(72*36).reshape((72,36))
    #    key = datetime(2009,2,28)
    #    P[key] = data#

    #    assert_almost_equal(P[key].values, data)
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda    作者:SignalMedia    | 项目源码 | 文件源码
def test_shift(self):
        raise nose.SkipTest("skipping for now")
    #    # major
    #    idx = self.panel.major_axis[0]
    #    idx_lag = self.panel.major_axis[1]

    #    shifted = self.panel.shift(1)

    #    assert_frame_equal(self.panel.major_xs(idx),
    #                       shifted.major_xs(idx_lag))

    #    # minor
    #    idx = self.panel.minor_axis[0]
    #    idx_lag = self.panel.minor_axis[1]

    #    shifted = self.panel.shift(1, axis='minor')

    #    assert_frame_equal(self.panel.minor_xs(idx),
    #                       shifted.minor_xs(idx_lag))

    #    self.assertRaises(Exception, self.panel.shift, 1, axis='items')
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda    作者:SignalMedia    | 项目源码 | 文件源码
def test_multiindex_get(self):
        raise nose.SkipTest("skipping for now")
    #    ind = MultiIndex.from_tuples([('a', 1), ('a', 2), ('b', 1), ('b',2)],
    #                                 names=['first', 'second'])
    #    wp = Panel(np.random.random((4,5,5)),
    #                                items=ind,
    #                                major_axis=np.arange(5),
    #                                minor_axis=np.arange(5))
    #    f1 = wp['a']
    #    f2 = wp.ix['a']
    #    assert_panel_equal(f1, f2)

    #    self.assertTrue((f1.items == [1, 2]).all())
    #    self.assertTrue((f2.items == [1, 2]).all())

    #    ind = MultiIndex.from_tuples([('a', 1), ('a', 2), ('b', 1)],
    #                                 names=['first', 'second'])
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda    作者:SignalMedia    | 项目源码 | 文件源码
def test_unpacker_hook_refcnt(self):
        if not hasattr(sys, 'getrefcount'):
            raise nose.SkipTest('no sys.getrefcount()')
        result = []

        def hook(x):
            result.append(x)
            return x

        basecnt = sys.getrefcount(hook)

        up = Unpacker(object_hook=hook, list_hook=hook)

        assert sys.getrefcount(hook) >= basecnt + 2

        up.feed(packb([{}]))
        up.feed(packb([{}]))
        assert up.unpack() == [{}]
        assert up.unpack() == [{}]
        assert result == [{}, [{}], {}, [{}]]

        del up

        assert sys.getrefcount(hook) == basecnt
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda    作者:SignalMedia    | 项目源码 | 文件源码
def test_built_in_round(self):
        if not compat.PY3:
            raise nose.SkipTest("build in round cannot be overriden "
                                "prior to Python 3")

        # GH11763
        # Here's the test frame we'll be working with
        df = DataFrame(
            {'col1': [1.123, 2.123, 3.123], 'col2': [1.234, 2.234, 3.234]})

        # Default round to integer (i.e. decimals=0)
        expected_rounded = DataFrame(
            {'col1': [1., 2., 3.], 'col2': [1., 2., 3.]})
        assert_frame_equal(round(df), expected_rounded)

    # Clip
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda    作者:SignalMedia    | 项目源码 | 文件源码
def test_split_block_at(self):

        # with dup column support this method was taken out
        # GH3679
        raise nose.SkipTest("skipping for now")

        bs = list(self.fblock.split_block_at('a'))
        self.assertEqual(len(bs), 1)
        self.assertTrue(np.array_equal(bs[0].items, ['c', 'e']))

        bs = list(self.fblock.split_block_at('c'))
        self.assertEqual(len(bs), 2)
        self.assertTrue(np.array_equal(bs[0].items, ['a']))
        self.assertTrue(np.array_equal(bs[1].items, ['e']))

        bs = list(self.fblock.split_block_at('e'))
        self.assertEqual(len(bs), 1)
        self.assertTrue(np.array_equal(bs[0].items, ['a', 'c']))

        # bblock = get_bool_ex(['f'])
        # bs = list(bblock.split_block_at('f'))
        # self.assertEqual(len(bs), 0)
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda    作者:SignalMedia    | 项目源码 | 文件源码
def test_partial_ix_missing(self):
        raise nose.SkipTest("skipping for now")

        result = self.ymd.ix[2000, 0]
        expected = self.ymd.ix[2000]['A']
        assert_series_equal(result, expected)

        # need to put in some work here

        # self.ymd.ix[2000, 0] = 0
        # self.assertTrue((self.ymd.ix[2000]['A'] == 0).all())

        # Pretty sure the second (and maybe even the first) is already wrong.
        self.assertRaises(Exception, self.ymd.ix.__getitem__, (2000, 6))
        self.assertRaises(Exception, self.ymd.ix.__getitem__, (2000, 6), 0)

    # ---------------------------------------------------------------------
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda    作者:SignalMedia    | 项目源码 | 文件源码
def test_header_not_enough_lines_as_recarray(self):

        if compat.is_platform_windows():
            raise nose.SkipTest(
                "segfaults on win-64, only when all tests are run")

        data = ('skip this\n'
                'skip this\n'
                'a,b,c\n'
                '1,2,3\n'
                '4,5,6')

        reader = TextReader(StringIO(data), delimiter=',', header=2,
                            as_recarray=True)
        header = reader.header
        expected = [['a', 'b', 'c']]
        self.assertEqual(header, expected)

        recs = reader.read()
        expected = {'a': [1, 4], 'b': [2, 5], 'c': [3, 6]}
        assert_array_dicts_equal(expected, recs)

        # not enough rows
        self.assertRaises(parser.CParserError, TextReader, StringIO(data),
                          delimiter=',', header=5, as_recarray=True)
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda    作者:SignalMedia    | 项目源码 | 文件源码
def test_numpy_string_dtype_as_recarray(self):
        data = """\
a,1
aa,2
aaa,3
aaaa,4
aaaaa,5"""

        if compat.is_platform_windows():
            raise nose.SkipTest(
                "segfaults on win-64, only when all tests are run")

        def _make_reader(**kwds):
            return TextReader(StringIO(data), delimiter=',', header=None,
                              **kwds)

        reader = _make_reader(dtype='S4', as_recarray=True)
        result = reader.read()
        self.assertEqual(result['0'].dtype, 'S4')
        ex_values = np.array(['a', 'aa', 'aaa', 'aaaa', 'aaaa'], dtype='S4')
        self.assertTrue((result['0'] == ex_values).all())
        self.assertEqual(result['1'].dtype, 'S4')
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def configure(self, options, conf):
        super(NoseSQLAlchemy, self).configure(options, conf)
        plugin_base.pre_begin(options)

        plugin_base.set_coverage_flag(options.enable_plugin_coverage)

        plugin_base.set_skip_test(nose.SkipTest)
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def skip(reason):
    """
    Unconditionally skip a test.
    """
    def decorator(test_item):
        if not (isinstance(test_item, type) and issubclass(test_item, TestCase)):
            @functools.wraps(test_item)
            def skip_wrapper(*args, **kwargs):
                raise SkipTest(reason)
            test_item = skip_wrapper

        test_item.__unittest_skip__ = True
        test_item.__unittest_skip_why__ = reason
        return test_item
    return decorator
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def setUp(self):
        super(BaseZMQTestCase, self).setUp()
        if self.green and not have_gevent:
                raise SkipTest("requires gevent")
        self.context = self.Context.instance()
        self.sockets = []
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def ping_pong_json(self, s1, s2, o):
        if jsonapi.jsonmod is None:
            raise SkipTest("No json library")
        s1.send_json(o)
        o2 = s2.recv_json()
        s2.send_json(o2)
        o3 = s1.recv_json()
        return o3
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def assertRaisesErrno(self, errno, func, *args, **kwargs):
        if errno == zmq.EAGAIN:
            raise SkipTest("Skipping because we're green.")
        try:
            func(*args, **kwargs)
        except zmq.ZMQError:
            e = sys.exc_info()[1]
            self.assertEqual(e.errno, errno, "wrong error raised, expected '%s' \
got '%s'" % (zmq.ZMQError(errno), zmq.ZMQError(e.errno)))
        else:
            self.fail("Function did not raise any error")
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def skip_green(self):
        raise SkipTest("Skipping because we are green")
项目:httplib2    作者:httplib2    | 项目源码 | 文件源码
def setUp(self):
        if not socks:
            raise nose.SkipTest('socks module unavailable')
        if not subprocess:
            raise nose.SkipTest('subprocess module unavailable')

        # start a short-lived miniserver so we can get a likely port
        # for the proxy
        self.httpd, self.proxyport = miniserver.start_server(
            miniserver.ThisDirHandler)
        self.httpd.shutdown()
        self.httpd, self.port = miniserver.start_server(
            miniserver.ThisDirHandler)

        self.pidfile = tempfile.mktemp()
        self.logfile = tempfile.mktemp()
        fd, self.conffile = tempfile.mkstemp()
        f = os.fdopen(fd, 'w')
        our_cfg = tinyproxy_cfg % {'user': os.getlogin(),
                                   'pidfile': self.pidfile,
                                   'port': self.proxyport,
                                   'logfile': self.logfile}
        f.write(our_cfg)
        f.close()
        try:
            # TODO use subprocess.check_call when 2.4 is dropped
            ret = subprocess.call(['tinyproxy', '-c', self.conffile])
            self.assertEqual(0, ret)
        except OSError, e:
            if e.errno == errno.ENOENT:
                raise nose.SkipTest('tinyproxy not available')
            raise
项目:QXSConsolas    作者:qxsch    | 项目源码 | 文件源码
def configure(self, options, conf):
        super(NoseSQLAlchemy, self).configure(options, conf)
        plugin_base.pre_begin(options)

        plugin_base.set_coverage_flag(options.enable_plugin_coverage)

        plugin_base.set_skip_test(nose.SkipTest)
项目:pathspider    作者:mami-project    | 项目源码 | 文件源码
def test_plugin_load():
    try:
        # attempt to load dependencies of plugins
        import pldns
    except ImportError:
        raise nose.SkipTest

    expected_names = set(['TFO', 'ECN', 'DSCP', 'UDPZero', 'UDPOpts', 'DNSResolv', 'H2', 'EvilBit'])
    names = set()

    for plugin in pathspider.cmd.measure.plugins:
        assert issubclass(plugin, pathspider.base.Spider)
        names.add(plugin.__name__)

    assert names == expected_names
项目:pyexcel-odsr    作者:pyexcel    | 项目源码 | 文件源码
def test_issue_23():
    if not IN_TRAVIS:
        raise SkipTest()
    pe.get_book(url="https://github.com/pyexcel/pyexcel-ods/raw/master/tests/fixtures/white_space.ods", library='pyexcel-odsr');  # flake8: noqa
项目:dati-ckan-docker    作者:italia    | 项目源码 | 文件源码
def setup_class(cls):
        if p.toolkit.check_ckan_version(max_version='2.4.99'):
            raise nose.SkipTest()

        super(TestTranslations, cls).setup_class()
项目:dati-ckan-docker    作者:italia    | 项目源码 | 文件源码
def setup_class(cls):
        if not tests.is_datastore_supported():
            raise nose.SkipTest("Datastore not supported")
        p.load('datastore')
        ctd.CreateTestData.create()
        cls.sysadmin_user = model.User.get('testsysadmin')
        cls.normal_user = model.User.get('annafan')
        resource = model.Package.get('annakarenina').resources[0]
        cls.data = dict(
            resource_id=resource.id,
            force=True,
            fields=[
              {'id': 'id'},
              {'id': 'date', 'type':'date'},
              {'id': 'x'},
              {'id': 'y'},
              {'id': 'z'},
              {'id': 'country'},
              {'id': 'title'},
              {'id': 'lat'},
              {'id': 'lon'}
            ],
            records=[
              {'id': 0, 'date': '2011-01-01', 'x': 1, 'y': 2, 'z': 3, 'country': 'DE', 'title': 'first 99', 'lat':52.56, 'lon':13.40},
              {'id': 1, 'date': '2011-02-02', 'x': 2, 'y': 4, 'z': 24, 'country': 'UK', 'title': 'second', 'lat':54.97, 'lon':-1.60},
              {'id': 2, 'date': '2011-03-03', 'x': 3, 'y': 6, 'z': 9, 'country': 'US', 'title': 'third', 'lat':40.00, 'lon':-75.5},
              {'id': 3, 'date': '2011-04-04', 'x': 4, 'y': 8, 'z': 6, 'country': 'UK', 'title': 'fourth', 'lat':57.27, 'lon':-6.20},
              {'id': 4, 'date': '2011-05-04', 'x': 5, 'y': 10, 'z': 15, 'country': 'UK', 'title': 'fifth', 'lat':51.58, 'lon':0},
              {'id': 5, 'date': '2011-06-02', 'x': 6, 'y': 12, 'z': 18, 'country': 'DE', 'title': 'sixth 53.56', 'lat':51.04, 'lon':7.9}
            ]
        )
        postparams = '%s=1' % json.dumps(cls.data)
        auth = {'Authorization': str(cls.normal_user.apikey)}
        res = cls.app.post('/api/action/datastore_create', params=postparams,
                           extra_environ=auth)
        res_dict = json.loads(res.body)
        assert res_dict['success'] is True
项目:dati-ckan-docker    作者:italia    | 项目源码 | 文件源码
def test_pg_version_check(self):
        if not tests.is_datastore_supported():
            raise nose.SkipTest("Datastore not supported")
        engine = db._get_engine(
            {'connection_url': config['sqlalchemy.url']})
        connection = engine.connect()
        assert db._pg_version_is_at_least(connection, '8.0')
        assert not db._pg_version_is_at_least(connection, '10.0')
项目:dati-ckan-docker    作者:italia    | 项目源码 | 文件源码
def setup_class(cls):
        wsgiapp = middleware.make_app(config['global_conf'], **config)
        cls.app = paste.fixture.TestApp(wsgiapp)
        if not tests.is_datastore_supported():
            raise nose.SkipTest("Datastore not supported")
        p.load('datastore')
        ctd.CreateTestData.create()
        cls.sysadmin_user = model.User.get('testsysadmin')
        cls.normal_user = model.User.get('annafan')
        resource = model.Package.get('annakarenina').resources[0]
        cls.data = {
            'resource_id': resource.id,
            'force': True,
            'aliases': 'books',
            'fields': [{'id': u'b\xfck', 'type': 'text'},
                       {'id': 'author', 'type': 'text'},
                       {'id': 'published'},
                       {'id': u'characters', u'type': u'_text'},
                       {'id': 'random_letters', 'type': 'text[]'}],
            'records': [{u'b\xfck': 'annakarenina',
                         'author': 'tolstoy',
                         'published': '2005-03-01',
                         'nested': ['b', {'moo': 'moo'}],
                         u'characters': [u'Princess Anna', u'Sergius'],
                         'random_letters': ['a', 'e', 'x']},
                        {u'b\xfck': 'warandpeace', 'author': 'tolstoy',
                         'nested': {'a': 'b'}, 'random_letters': []}]
        }
        postparams = '%s=1' % json.dumps(cls.data)
        auth = {'Authorization': str(cls.sysadmin_user.apikey)}
        res = cls.app.post('/api/action/datastore_create', params=postparams,
                           extra_environ=auth)
        res_dict = json.loads(res.body)
        assert res_dict['success'] is True

        engine = db._get_engine({
            'connection_url': config['ckan.datastore.write_url']})
        cls.Session = orm.scoped_session(orm.sessionmaker(bind=engine))
项目:dati-ckan-docker    作者:italia    | 项目源码 | 文件源码
def setup_class(cls):
        if not tests.is_datastore_supported():
            raise nose.SkipTest("Datastore not supported")
        p.load('datastore')
        ctd.CreateTestData.create()
        cls.sysadmin_user = model.User.get('testsysadmin')
        cls.normal_user = model.User.get('annafan')
        set_url_type(
            model.Package.get('annakarenina').resources, cls.sysadmin_user)
        resource = model.Package.get('annakarenina').resources[0]
        cls.data = {
            'resource_id': resource.id,
            'fields': [{'id': u'b\xfck', 'type': 'text'},
                       {'id': 'author', 'type': 'text'},
                       {'id': 'nested', 'type': 'json'},
                       {'id': 'characters', 'type': 'text[]'},
                       {'id': 'published'}],
            'primary_key': u'b\xfck',
            'records': [{u'b\xfck': 'annakarenina', 'author': 'tolstoy',
                        'published': '2005-03-01', 'nested': ['b', {'moo': 'moo'}]},
                        {u'b\xfck': 'warandpeace', 'author': 'tolstoy',
                        'nested': {'a':'b'}}
                       ]
            }
        postparams = '%s=1' % json.dumps(cls.data)
        auth = {'Authorization': str(cls.sysadmin_user.apikey)}
        res = cls.app.post('/api/action/datastore_create', params=postparams,
                           extra_environ=auth)
        res_dict = json.loads(res.body)
        assert res_dict['success'] is True

        engine = db._get_engine(
            {'connection_url': config['ckan.datastore.write_url']})
        cls.Session = orm.scoped_session(orm.sessionmaker(bind=engine))
项目:dati-ckan-docker    作者:italia    | 项目源码 | 文件源码
def setup_class(cls):
        if not tests.is_datastore_supported():
            raise nose.SkipTest("Datastore not supported")
        p.load('datastore')
        ctd.CreateTestData.create()
        cls.sysadmin_user = model.User.get('testsysadmin')
        cls.normal_user = model.User.get('annafan')
        set_url_type(
            model.Package.get('annakarenina').resources, cls.sysadmin_user)
        resource = model.Package.get('annakarenina').resources[0]
        cls.data = {
            'resource_id': resource.id,
            'fields': [{'id': u'b\xfck', 'type': 'text'},
                       {'id': 'author', 'type': 'text'},
                       {'id': 'nested', 'type': 'json'},
                       {'id': 'characters', 'type': 'text[]'},
                       {'id': 'published'}],
            'primary_key': u'b\xfck',
            'records': [{u'b\xfck': 'annakarenina', 'author': 'tolstoy',
                        'published': '2005-03-01', 'nested': ['b', {'moo': 'moo'}]},
                        {u'b\xfck': 'warandpeace', 'author': 'tolstoy',
                        'nested': {'a':'b'}}
                       ]
            }
        postparams = '%s=1' % json.dumps(cls.data)
        auth = {'Authorization': str(cls.sysadmin_user.apikey)}
        res = cls.app.post('/api/action/datastore_create', params=postparams,
                           extra_environ=auth)
        res_dict = json.loads(res.body)
        assert res_dict['success'] is True

        engine = db._get_engine(
            {'connection_url': config['ckan.datastore.write_url']})
        cls.Session = orm.scoped_session(orm.sessionmaker(bind=engine))
项目:dati-ckan-docker    作者:italia    | 项目源码 | 文件源码
def setup_class(cls):
        if not tests.is_datastore_supported():
            raise nose.SkipTest("Datastore not supported")
        p.load('datastore')
        ctd.CreateTestData.create()
        cls.sysadmin_user = model.User.get('testsysadmin')
        cls.normal_user = model.User.get('annafan')
        set_url_type(
            model.Package.get('annakarenina').resources, cls.sysadmin_user)
        resource = model.Package.get('annakarenina').resources[0]
        hhguide = u"hitchhiker's guide to the galaxy"
        cls.data = {
            'resource_id': resource.id,
            'fields': [{'id': u'b\xfck', 'type': 'text'},
                       {'id': 'author', 'type': 'text'},
                       {'id': 'nested', 'type': 'json'},
                       {'id': 'characters', 'type': 'text[]'},
                       {'id': 'published'}],
            'primary_key': u'b\xfck',
            'records': [{u'b\xfck': 'annakarenina', 'author': 'tolstoy',
                        'published': '2005-03-01', 'nested': ['b', {'moo': 'moo'}]},
                        {u'b\xfck': 'warandpeace', 'author': 'tolstoy',
                        'nested': {'a':'b'}},
                        {'author': 'adams',
                        'characters': ['Arthur Dent', 'Marvin'],
                        'nested': {'foo': 'bar'},
                        u'b\xfck': hhguide}
                       ]
            }
        postparams = '%s=1' % json.dumps(cls.data)
        auth = {'Authorization': str(cls.sysadmin_user.apikey)}
        res = cls.app.post('/api/action/datastore_create', params=postparams,
                           extra_environ=auth)
        res_dict = json.loads(res.body)
        assert res_dict['success'] is True

        engine = db._get_engine(
            {'connection_url': config['ckan.datastore.write_url']})
        cls.Session = orm.scoped_session(orm.sessionmaker(bind=engine))
项目:dati-ckan-docker    作者:italia    | 项目源码 | 文件源码
def setup_class(cls):
        if not is_datastore_supported():
            raise nose.SkipTest("Datastore not supported")
        plugin = p.load('datastore')
        if plugin.legacy_mode:
            # make sure we undo adding the plugin
            p.unload('datastore')
            raise nose.SkipTest("Info is not supported in legacy mode")
项目:dati-ckan-docker    作者:italia    | 项目源码 | 文件源码
def setup_class(cls):

        wsgiapp = middleware.make_app(config['global_conf'], **config)
        cls.app = paste.fixture.TestApp(wsgiapp)
        if not tests.is_datastore_supported():
            raise nose.SkipTest("Datastore not supported")
        p.load('datastore')
        ctd.CreateTestData.create()
        cls.sysadmin_user = model.User.get('testsysadmin')
        cls.normal_user = model.User.get('annafan')
        engine = db._get_engine(
            {'connection_url': config['ckan.datastore.write_url']})
        cls.Session = orm.scoped_session(orm.sessionmaker(bind=engine))
        set_url_type(
            model.Package.get('annakarenina').resources, cls.sysadmin_user)
项目:dati-ckan-docker    作者:italia    | 项目源码 | 文件源码
def setup_class(cls):
        wsgiapp = middleware.make_app(config['global_conf'], **config)
        cls.app = paste.fixture.TestApp(wsgiapp)
        if not tests.is_datastore_supported():
            raise nose.SkipTest("Datastore not supported")
        p.load('datastore')
        p.load('datapusher')
        ctd.CreateTestData.create()
        cls.sysadmin_user = model.User.get('testsysadmin')
        cls.normal_user = model.User.get('annafan')
        engine = db._get_engine(
            {'connection_url': config['ckan.datastore.write_url']})
        cls.Session = orm.scoped_session(orm.sessionmaker(bind=engine))
        set_url_type(
            model.Package.get('annakarenina').resources, cls.sysadmin_user)