我们从 Python 开源项目中，提取了以下 50 个代码示例，用于说明如何使用 nose.SkipTest()。
def skip_if(predicate, reason=None):
    """Return a decorator that skips the test whenever ``predicate()`` is true.

    ``reason`` is included in the skip message; when it is falsy the
    predicate's ``__name__`` is used instead.
    """
    if not reason:
        reason = predicate.__name__
    from nose import SkipTest

    def decorate(fn):
        test_name = fn.__name__

        def maybe(*args, **kw):
            # The predicate is evaluated on every call, not at decoration time.
            if not predicate():
                return fn(*args, **kw)
            raise SkipTest("'%s' skipped: %s" % (test_name, reason))

        return function_named(maybe, test_name)

    return decorate
def setup_class(cls):
    """Load the datastore plugin and prepare fixture users, payload and session.

    Raises nose.SkipTest when ``tests.is_datastore_supported()`` is false.
    """
    if not tests.is_datastore_supported():
        raise nose.SkipTest("Datastore not supported")

    p.load('datastore')
    ctd.CreateTestData.create()
    cls.sysadmin_user = model.User.get('testsysadmin')
    cls.normal_user = model.User.get('annafan')

    first_resource = model.Package.get('annakarenina').resources[0]
    field_defs = [
        {'id': 'book', 'type': 'text'},
        {'id': 'author', 'type': 'text'},
        {'id': 'rating with %', 'type': 'text'},
    ]
    rows = [
        {'book': 'annakarenina', 'author': 'tolstoy',
         'rating with %': '90%'},
        {'book': 'warandpeace', 'author': 'tolstoy',
         'rating with %': '42%'},
    ]
    cls.data = {
        'resource_id': first_resource.id,
        'aliases': u'b\xfck2',
        'fields': field_defs,
        'records': rows,
    }

    write_engine = db._get_engine(
        {'connection_url': config['ckan.datastore.write_url']})
    session_factory = orm.sessionmaker(bind=write_engine)
    cls.Session = orm.scoped_session(session_factory)
    set_url_type(
        model.Package.get('annakarenina').resources, cls.sysadmin_user)
def setup_class(cls):
    """Build the test app, load datastore/datapusher plugins and fixtures.

    Raises nose.SkipTest when ``tests.is_datastore_supported()`` is false
    (the WSGI test app has already been created at that point).
    """
    cls.app = paste.fixture.TestApp(
        middleware.make_app(config['global_conf'], **config))
    if not tests.is_datastore_supported():
        raise nose.SkipTest("Datastore not supported")

    for plugin_name in ('datastore', 'datapusher', 'test_datapusher_plugin'):
        p.load(plugin_name)

    datastore_resource = factories.Resource(url_type='datastore')
    cls.dataset = factories.Dataset(resources=[datastore_resource])

    cls.sysadmin_user = factories.User(name='testsysadmin', sysadmin=True)
    cls.normal_user = factories.User(name='annafan')

    write_engine = db._get_engine(
        {'connection_url': config['ckan.datastore.write_url']})
    cls.Session = orm.scoped_session(orm.sessionmaker(bind=write_engine))
def test_interrupted_systemcall(self):
    '''
    Check that an interrupted system call does not break the client.

    We cannot control which signals the connection thread receives, so
    the client has to survive them.
    '''
    if 'linux' not in platform:
        raise SkipTest(
            'Unable to reproduce error case on non-linux platforms')

    node_path = 'interrupt_test'
    payload = b"1"
    self.client.create(node_path, payload)

    # Re-setting the euid to its current value makes glibc send SIGRT to
    # all children, which interrupts the system call in progress.
    current_euid = os.geteuid()
    os.seteuid(current_euid)

    # Basic sanity check: the node is still readable with the same data.
    stored = self.client.get(node_path)[0]
    assert stored == payload
def setup_class(cls):
    """Create a session on the datastore write URL and four scratch tables.

    Raises nose.SkipTest when ``ckan.datastore.read_url`` is not configured.
    """
    read_url = pylons.config.get('ckan.datastore.read_url')
    if not read_url:
        raise nose.SkipTest('Datastore runs on legacy mode, skipping...')

    write_engine = db._get_engine(
        {'connection_url': pylons.config['ckan.datastore.write_url']})
    cls.Session = orm.scoped_session(orm.sessionmaker(bind=write_engine))

    datastore_test_helpers.clear_db(cls.Session)

    ddl_statements = (
        u'CREATE TABLE test_a (id_a text)',
        u'CREATE TABLE test_b (id_b text)',
        u'CREATE TABLE "TEST_C" (id_c text)',
        u'CREATE TABLE test_d ("?/?" integer)',
    )
    for ddl in ddl_statements:
        cls.Session.execute(ddl)
def skip(msg=None):
    """Decorator factory that marks a test function as always skipped.

    Parameters
    ----------
    msg : string
        Optional message forwarded to ``skipif``.

    Returns
    -------
    decorator : function
        Whatever ``skipif(True, msg)`` returns.
    """
    always_skip = skipif(True, msg)
    return always_skip
def test_obj_del(self):
    """Test that object's __del__ methods are called on exit.

    Writes a small script defining a class with ``__del__``, runs it via
    ``tt.ipexec_validate`` and expects 'object A deleted' on stdout.

    Raises SkipTest on win32 when pywin32 cannot be imported.
    """
    if sys.platform == 'win32':
        try:
            # Imported only to probe availability; the name is not used.
            import win32api
        except ImportError:
            raise SkipTest("Test requires pywin32")

    # The method body must be indented deeper than its ``def`` line and the
    # ``def`` deeper than ``class``; otherwise the generated script fails
    # with an IndentationError before ``__del__`` can ever run.
    src = ("class A(object):\n"
           "    def __del__(self):\n"
           "        print 'object A deleted'\n"
           "a = A()\n")
    self.mktmp(py3compat.doctest_refactor_print(src))

    if dec.module_not_available('sqlite3'):
        # Expected stderr text when sqlite3 is reported as unavailable.
        err = 'WARNING: IPython History requires SQLite, your history will not be saved\n'
    else:
        err = None
    tt.ipexec_validate(self.fname, 'object A deleted', err)
def test_not_writable_ipdir():
    """Check the warning printed when the IPython dir is not writable.

    Points HOME at a fresh temp dir holding a read-only ``.ipython`` and
    expects 'is not a writable location' on stderr.  Skips when the
    read-only directory turns out to be writable anyway.
    """
    tmpdir = tempfile.mkdtemp()
    os.name = "posix"
    for var in ('IPYTHON_DIR', 'IPYTHONDIR', 'XDG_CONFIG_HOME'):
        env.pop(var, None)
    env['HOME'] = tmpdir

    ipdir = os.path.join(tmpdir, '.ipython')
    os.mkdir(ipdir, 0o555)

    probe_path = os.path.join(ipdir, "_foo_")
    try:
        open(probe_path, 'w').close()
    except IOError:
        still_writable = False
    else:
        still_writable = True
    if still_writable:
        # Writing into an unwritable dir succeeded: assume we are root
        # and skip the test.
        raise SkipTest("I can't create directories that I can't write to")

    with AssertPrints('is not a writable location', channel='stderr'):
        ipdir = paths.get_ipython_dir()
    env.pop('IPYTHON_DIR', None)
def monkeypatch_xunit():
    """Patch ``Xunit.addError`` so known failures are reported as skips.

    The known-failure exception type is discovered by calling a function
    decorated with ``knownfailureif(True)`` and inspecting what it raises.
    Errors of that type are then reported with ``SkipTest`` as their
    exception class; everything else is passed through unchanged.

    The patch is applied at most once: a second call would otherwise save
    the wrapper itself as ``orig_addError`` and make every ``addError``
    call recurse without end.
    """
    if hasattr(Xunit, 'orig_addError'):
        # Already patched by an earlier call.
        return

    try:
        knownfailureif(True)(lambda: None)()
    except Exception as e:
        KnownFailureTest = type(e)
    else:
        # The probe raised nothing, so there is no exception type to map.
        # Leave Xunit alone rather than install a wrapper that would hit
        # a NameError on the first reported error.
        return

    def addError(self, test, err, capt=None):
        # err is an exc_info-style tuple; only its first item is replaced.
        if issubclass(err[0], KnownFailureTest):
            err = (SkipTest,) + err[1:]
        return self.orig_addError(test, err, capt)

    Xunit.orig_addError = Xunit.addError
    Xunit.addError = addError

#-----------------------------------------------------------------------------
# Check which dependencies are installed and greater than minimum version.
#-----------------------------------------------------------------------------
def __call__(self, fn):
    """Wrap ``fn`` so it is skipped when ``self.predicate()`` is true.

    When not skipped, the call runs inside ``self._fails_on.fail_if(...)``
    if ``self._fails_on`` is set, and directly otherwise.
    """
    @decorator
    def decorate(fn, *args, **kw):
        if not self.predicate():
            if not self._fails_on:
                return fn(*args, **kw)
            with self._fails_on.fail_if(name=fn.__name__):
                return fn(*args, **kw)

        # Skipping: prefer the explicit reason, else show the predicate.
        if self.reason:
            msg = "'%s' : %s" % (fn.__name__, self.reason)
        else:
            msg = "'%s': %s" % (fn.__name__, self.predicate)
        raise SkipTest(msg)

    return decorate(fn)
def test_status_exceptions_can_be_pickled_across_processes(self):
    """Run a raising job in multiprocess mode and inspect the reported error.

    Skipped when the ``jobrunner`` package cannot be imported.
    """
    try:
        import jobrunner
    except ImportError:
        raise SkipTest("jobrunner not installed, skipping")

    mode = jobrunner.JobRunner.RUN_MODE_MULTIPROCESS
    runner = jobrunner.JobRunner(mode,
                                 runnables=[raise_an_exception],
                                 auto_assert=False)
    first_result = runner.run()[0]

    self.assertTrue(first_result.exception_occured())
    self.assertEqual(str(first_result.err_type),
                     str(nifpga.FifoTimeoutError))
    self.assertIn("session: 0xbeef", first_result.err_class)

    # The string argument is rendered with a b'' prefix outside Python 2.
    if python_version == 2:
        expected_fragment = "a bogus string argument: 'I am a string'"
    else:
        expected_fragment = "a bogus string argument: b'I am a string'"
    self.assertIn(expected_fragment, first_result.err_class)
def test_complex_signature_py3(self):
    """Check the log line ``trace_call`` emits for a Python-3-only signature.

    The decorated function uses keyword-only arguments and an annotation,
    so it is defined through ``exec`` from a string.  Skipped on Python 2.
    """
    if six.PY2:
        raise SkipTest()
    with LogCapture() as l:
        # without this exec Python 2 and pyflakes complain about syntax errors etc
        # NOTE(review): exec's positional order is (globals, locals); here
        # locals() is passed first - confirm this is intentional.
        # NOTE(review): the line breaks inside the string below were
        # reconstructed from a whitespace-collapsed source - confirm.
        exec (
"""@trace_call(self.logger)
def foo(a, b, c, d, e, *varargs_, f=None, g='G', h='H', i='ii', j='jj', **varkwargs_: None):
    pass
foo('a', 'b', *['c', 'd'], e='E', f='F', Z='Z', **{'g':'g', 'h':'h'})
""", locals(), globals()
        )
    # Exactly one DEBUG record is expected from the single call above.
    l.check(
        (
            'test.v0_1.test_base',
            'DEBUG',
            "calling foo(a='a', b='b', c='c', d='d', e='E', f='F', "
            "g='g', h='h', varkwargs_={'Z': 'Z'}, varargs_=<class '%s._empty'>, "
            "i='ii', j='jj')" % (INSPECT_MODULE_NAME,)  # prefix does not work because of the eval, inspect module is for pypy3
        ),
    )
def testWLS(self):
    """Run ``_check_wls`` on random data, then again with NaNs injected."""
    # WLS centered SS changed (fixed) in 0.5.0
    sm_version = sm.version.version
    if sm_version < LooseVersion('0.5.0'):
        skip_msg = "WLS centered SS not fixed in statsmodels version {0}"
        raise nose.SkipTest(skip_msg.format(sm_version))

    # X is drawn before Y so the random stream is consumed in this order.
    X = DataFrame(np.random.randn(30, 4), columns=['A', 'B', 'C', 'D'])
    Y = Series(np.random.randn(30))
    weights = X.std(1)

    self._check_wls(X, Y, weights)

    # Second pass with missing values in both the weights and the response.
    weights.ix[[5, 15]] = np.nan
    Y[[2, 21]] = np.nan
    self._check_wls(X, Y, weights)
def test_compat():
    """Check ``_NUMEXPR_INSTALLED`` against the installed numexpr version.

    Skipped when an ImportError is raised while importing or checking
    numexpr.
    """
    # test we have compat with our version of nu
    from pandas.computation import _NUMEXPR_INSTALLED

    try:
        import numexpr as ne
        installed = ne.__version__
        if installed == LooseVersion('2.4.4'):
            # 2.4.4 is expected to be treated as not installed.
            assert not _NUMEXPR_INSTALLED
        elif installed < LooseVersion('2.1'):
            # Releases before 2.1 are expected to warn and be disabled.
            with tm.assert_produces_warning(UserWarning,
                                            check_stacklevel=False):
                assert not _NUMEXPR_INSTALLED
        else:
            assert _NUMEXPR_INSTALLED
    except ImportError:
        raise nose.SkipTest("not testing numexpr version compat")
def check_invalid_numexpr_version(engine, parser):
    """Evaluate ``a + b`` with ``pd.eval`` under the given engine and parser.

    With the numexpr engine: skips when numexpr is missing or is 2.4.4,
    and expects an ImportError for versions below 2.1.
    """
    def testit():
        # pd.eval is given the names 'a' and 'b' as a string, so these
        # locals keep their names.
        a, b = 1, 2
        outcome = pd.eval('a + b', engine=engine, parser=parser)
        tm.assert_equal(outcome, 3)

    if engine != 'numexpr':
        testit()
        return

    try:
        import numexpr as ne
    except ImportError:
        raise nose.SkipTest("no numexpr")

    if ne.__version__ < LooseVersion('2.1'):
        too_old_pattern = "'numexpr' version is .+, must be >= 2.1"
        with tm.assertRaisesRegexp(ImportError, too_old_pattern):
            testit()
    elif ne.__version__ == LooseVersion('2.4.4'):
        raise nose.SkipTest("numexpr version==2.4.4")
    else:
        testit()
def get_expected_pow_result(self, lhs, rhs):
    """Return the expected value of ``lhs ** rhs`` for ``self.engine``.

    A ValueError about a negative number raised to a fractional power
    becomes a SkipTest for the 'python' engine and ``np.nan`` for any other
    engine.  Any other ValueError is re-raised.
    """
    try:
        expected = _eval_single_bin(lhs, '**', rhs, self.engine)
    except ValueError as err:
        # Use ``err.message`` when present, else the exception itself.
        error_text = u(getattr(err, 'message', err))
        if error_text != 'negative number cannot be raised to a fractional power':
            raise
        if self.engine == 'python':
            raise nose.SkipTest(error_text)
        return np.nan
    return expected
def test_legacy_pickle(self):
    """Load a v1 MultiIndex pickle and compare it with a rebuilt index.

    Skipped on Python 3.
    """
    if PY3:
        raise nose.SkipTest(
            "testing for legacy pickles not support on py3")

    pickle_path = tm.get_data_path('multiindex_v1.pickle')
    loaded = pd.read_pickle(pickle_path)
    rebuilt = MultiIndex.from_tuples(loaded.values)
    self.assertTrue(loaded.equals(rebuilt))

    # Indexing the loaded object against itself gives the identity mapping.
    assert_almost_equal(loaded.get_indexer(loaded), np.arange(len(loaded)))

    # The reversed lookups are computed in the same order as before.
    res = loaded.get_indexer(rebuilt[::-1])
    exp = loaded.get_indexer(loaded[::-1])
    exp2 = rebuilt.get_indexer(rebuilt[::-1])
    assert_almost_equal(res, exp)
    assert_almost_equal(exp, exp2)
def test_quantile_interpolation_np_lt_1p9(self):
    """Check ``Series.quantile`` interpolation handling.

    Skipped unless ``_np_version_under1p9`` is true.  Linear interpolation
    must match ``numpy.percentile``; other methods must raise ValueError.
    """
    # GH #10174
    if not _np_version_under1p9:
        raise nose.SkipTest("Numpy version is greater than 1.9")

    from numpy import percentile

    # interpolation = linear, passed explicitly
    explicit = self.ts.quantile(0.1, interpolation='linear')
    self.assertEqual(explicit, percentile(self.ts.valid(), 10))
    # interpolation = linear, as the default
    implicit = self.ts.quantile(0.1)
    self.assertEqual(implicit, percentile(self.ts.valid(), 10))

    # interpolation other than linear
    expErrMsg = "Interpolation methods other than "
    with tm.assertRaisesRegexp(ValueError, expErrMsg):
        self.ts.quantile(0.9, interpolation='nearest')

    # object dtype
    with tm.assertRaisesRegexp(ValueError, expErrMsg):
        as_object = Series(self.ts, dtype=object)
        q = as_object.quantile(0.7, interpolation='higher')
def test_to_excel(self):
    """Round-trip ``self.panel`` through .xls and .xlsx files.

    Skipped when xlwt, xlrd, openpyxl or ``ExcelFile`` cannot be imported.
    """
    missing_deps = "need xlwt xlrd openpyxl"
    try:
        import xlwt  # noqa
        import xlrd  # noqa
        import openpyxl  # noqa
        from pandas.io.excel import ExcelFile
    except ImportError:
        raise nose.SkipTest(missing_deps)

    for extension in ['xls', 'xlsx']:
        with ensure_clean('__tmp__.' + extension) as path:
            self.panel.to_excel(path)
            try:
                reader = ExcelFile(path)
            except ImportError:
                raise nose.SkipTest(missing_deps)

            # Each panel item is read back from the sheet named after it.
            for item, df in self.panel.iteritems():
                roundtripped = reader.parse(str(item), index_col=0)
                assert_frame_equal(df, roundtripped)
def test_to_excel_xlsxwriter(self):
    """Round-trip ``self.panel`` through an .xlsx file written by xlsxwriter.

    Skipped when xlrd, xlsxwriter or ``ExcelFile`` cannot be imported.
    """
    try:
        import xlrd  # noqa
        import xlsxwriter  # noqa
        from pandas.io.excel import ExcelFile
    except ImportError:
        raise nose.SkipTest("Requires xlrd and xlsxwriter. Skipping test.")

    with ensure_clean('__tmp__.xlsx') as path:
        self.panel.to_excel(path, engine='xlsxwriter')
        try:
            reader = ExcelFile(path)
        except ImportError as e:
            raise nose.SkipTest("cannot write excel file: %s" % e)

        # Each panel item is read back from the sheet named after it.
        for item, df in self.panel.iteritems():
            roundtripped = reader.parse(str(item), index_col=0)
            assert_frame_equal(df, roundtripped)
def test_setitem_ndarray(self):
    """Disabled test: always raises nose.SkipTest.

    The intended body is kept below as commented-out code.
    """
    raise nose.SkipTest("skipping for now")
    # from pandas import DateRange, datetools

    # timeidx = DateRange(start=datetime(2009,1,1),
    #                     end=datetime(2009,12,31),
    #                     offset=datetools.MonthEnd())
    # lons_coarse = np.linspace(-177.5, 177.5, 72)
    # lats_coarse = np.linspace(-87.5, 87.5, 36)
    # P = Panel(items=timeidx, major_axis=lons_coarse,
    #           minor_axis=lats_coarse)
    # data = np.random.randn(72*36).reshape((72,36))
    # key = datetime(2009,2,28)
    # P[key] = data#

    # assert_almost_equal(P[key].values, data)
def test_shift(self):
    """Disabled test: always raises nose.SkipTest.

    The intended body is kept below as commented-out code.
    """
    raise nose.SkipTest("skipping for now")
    # # major
    # idx = self.panel.major_axis[0]
    # idx_lag = self.panel.major_axis[1]

    # shifted = self.panel.shift(1)

    # assert_frame_equal(self.panel.major_xs(idx),
    #                    shifted.major_xs(idx_lag))

    # # minor
    # idx = self.panel.minor_axis[0]
    # idx_lag = self.panel.minor_axis[1]

    # shifted = self.panel.shift(1, axis='minor')

    # assert_frame_equal(self.panel.minor_xs(idx),
    #                    shifted.minor_xs(idx_lag))

    # self.assertRaises(Exception, self.panel.shift, 1, axis='items')
def test_multiindex_get(self):
    """Disabled test: always raises nose.SkipTest.

    The intended body is kept below as commented-out code.
    """
    raise nose.SkipTest("skipping for now")
    # ind = MultiIndex.from_tuples([('a', 1), ('a', 2), ('b', 1), ('b',2)],
    #                              names=['first', 'second'])
    # wp = Panel(np.random.random((4,5,5)),
    #                             items=ind,
    #                             major_axis=np.arange(5),
    #                             minor_axis=np.arange(5))
    # f1 = wp['a']
    # f2 = wp.ix['a']
    # assert_panel_equal(f1, f2)

    # self.assertTrue((f1.items == [1, 2]).all())
    # self.assertTrue((f2.items == [1, 2]).all())

    # ind = MultiIndex.from_tuples([('a', 1), ('a', 2), ('b', 1)],
    #                              names=['first', 'second'])
def test_unpacker_hook_refcnt(self):
    """Check how ``Unpacker`` changes the reference count of its hooks.

    Skipped when ``sys.getrefcount`` does not exist.
    """
    if not hasattr(sys, 'getrefcount'):
        raise nose.SkipTest('no sys.getrefcount()')

    result = []

    def hook(x):
        result.append(x)
        return x

    baseline = sys.getrefcount(hook)

    # The same callable is passed for both hook arguments.
    unpacker = Unpacker(object_hook=hook, list_hook=hook)
    assert sys.getrefcount(hook) >= baseline + 2

    # Feed two identical messages, then read both back.
    for _ in range(2):
        unpacker.feed(packb([{}]))
    assert unpacker.unpack() == [{}]
    assert unpacker.unpack() == [{}]

    # Per message the hook sees the inner dict first, then the outer list.
    assert result == [{}, [{}], {}, [{}]]

    # After the unpacker is deleted the count is back at the baseline.
    del unpacker
    assert sys.getrefcount(hook) == baseline
def test_built_in_round(self):
    """Check that the builtin ``round`` works on a DataFrame.

    Skipped unless ``compat.PY3`` is true.
    """
    if not compat.PY3:
        raise nose.SkipTest(
            "build in round cannot be overriden prior to Python 3")

    # GH11763
    # Frame under test: two float columns with fractional parts.
    frame = DataFrame({'col1': [1.123, 2.123, 3.123],
                       'col2': [1.234, 2.234, 3.234]})

    # round() with no digits rounds to whole numbers (decimals=0).
    whole_numbers = DataFrame({'col1': [1., 2., 3.],
                               'col2': [1., 2., 3.]})
    assert_frame_equal(round(frame), whole_numbers)

# Clip
def test_split_block_at(self):
    """Disabled test: always raises nose.SkipTest.

    Every statement after the ``raise`` is unreachable and is kept only as
    a record of what the test used to check.
    """

    # with dup column support this method was taken out
    # GH3679
    raise nose.SkipTest("skipping for now")

    # Unreachable: splitting at the first item leaves one remaining block.
    bs = list(self.fblock.split_block_at('a'))
    self.assertEqual(len(bs), 1)
    self.assertTrue(np.array_equal(bs[0].items, ['c', 'e']))

    # Unreachable: splitting at the middle item yields two blocks.
    bs = list(self.fblock.split_block_at('c'))
    self.assertEqual(len(bs), 2)
    self.assertTrue(np.array_equal(bs[0].items, ['a']))
    self.assertTrue(np.array_equal(bs[1].items, ['e']))

    # Unreachable: splitting at the last item leaves one remaining block.
    bs = list(self.fblock.split_block_at('e'))
    self.assertEqual(len(bs), 1)
    self.assertTrue(np.array_equal(bs[0].items, ['a', 'c']))

    # bblock = get_bool_ex(['f'])
    # bs = list(bblock.split_block_at('f'))
    # self.assertEqual(len(bs), 0)
def test_partial_ix_missing(self):
    """Disabled test: always raises nose.SkipTest.

    Every statement after the ``raise`` is unreachable and is kept only as
    a record of what the test used to check.
    """
    raise nose.SkipTest("skipping for now")

    # Unreachable from here on.
    result = self.ymd.ix[2000, 0]
    expected = self.ymd.ix[2000]['A']
    assert_series_equal(result, expected)

    # need to put in some work here

    # self.ymd.ix[2000, 0] = 0
    # self.assertTrue((self.ymd.ix[2000]['A'] == 0).all())

    # Pretty sure the second (and maybe even the first) is already wrong.
    self.assertRaises(Exception, self.ymd.ix.__getitem__, (2000, 6))
    self.assertRaises(Exception, self.ymd.ix.__getitem__, (2000, 6), 0)

# ---------------------------------------------------------------------
def test_header_not_enough_lines_as_recarray(self):
    """Check ``header=`` handling of ``TextReader`` with ``as_recarray=True``.

    Skipped on Windows.
    """
    if compat.is_platform_windows():
        raise nose.SkipTest(
            "segfaults on win-64, only when all tests are run")

    data = 'skip this\nskip this\na,b,c\n1,2,3\n4,5,6'

    reader = TextReader(StringIO(data), delimiter=',', header=2,
                        as_recarray=True)

    # The header is taken from line index 2.
    self.assertEqual(reader.header, [['a', 'b', 'c']])

    # The rows after the header come back keyed by column name.
    recs = reader.read()
    expected_columns = {'a': [1, 4], 'b': [2, 5], 'c': [3, 6]}
    assert_array_dicts_equal(expected_columns, recs)

    # not enough rows
    self.assertRaises(parser.CParserError, TextReader, StringIO(data),
                      delimiter=',', header=5, as_recarray=True)
def test_numpy_string_dtype_as_recarray(self):
    """Read two columns as dtype 'S4' with ``as_recarray=True``.

    Skipped on Windows.
    """
    # Five "text,number" rows; the last text value has five characters.
    data = 'a,1\naa,2\naaa,3\naaaa,4\naaaaa,5'

    if compat.is_platform_windows():
        raise nose.SkipTest(
            "segfaults on win-64, only when all tests are run")

    def _make_reader(**kwds):
        return TextReader(StringIO(data), delimiter=',', header=None,
                          **kwds)

    result = _make_reader(dtype='S4', as_recarray=True).read()

    first_column = result['0']
    self.assertEqual(first_column.dtype, 'S4')
    # The expected last value is 'aaaa', not the 'aaaaa' in the input.
    ex_values = np.array(['a', 'aa', 'aaa', 'aaaa', 'aaaa'], dtype='S4')
    self.assertTrue((first_column == ex_values).all())
    self.assertEqual(result['1'].dtype, 'S4')
def configure(self, options, conf):
    """Configure the plugin, then pass the parsed options to ``plugin_base``.

    Calls the parent ``configure`` first, then ``pre_begin``,
    ``set_coverage_flag`` and ``set_skip_test`` in that order.
    """
    super(NoseSQLAlchemy, self).configure(options, conf)
    plugin_base.pre_begin(options)

    plugin_base.set_coverage_flag(options.enable_plugin_coverage)

    # Hand nose's SkipTest class to plugin_base.
    plugin_base.set_skip_test(nose.SkipTest)
def skip(reason):
    """
    Unconditionally skip a test.

    Returns a decorator.  A ``TestCase`` subclass is only marked as skipped;
    anything else is replaced by a wrapper that raises ``SkipTest(reason)``
    when called, and the wrapper is marked as well.
    """
    def decorator(test_item):
        is_test_class = (isinstance(test_item, type) and
                         issubclass(test_item, TestCase))
        if not is_test_class:
            wrapped = test_item

            @functools.wraps(wrapped)
            def skip_wrapper(*args, **kwargs):
                raise SkipTest(reason)

            test_item = skip_wrapper

        # These two attributes are set in both cases.
        test_item.__unittest_skip__ = True
        test_item.__unittest_skip_why__ = reason
        return test_item

    return decorator
def setUp(self):
    """Create the shared context and an empty socket list for the test.

    Raises SkipTest when ``self.green`` is set but gevent is not available.
    """
    super(BaseZMQTestCase, self).setUp()

    gevent_missing = self.green and not have_gevent
    if gevent_missing:
        raise SkipTest("requires gevent")

    self.context = self.Context.instance()
    self.sockets = []
def ping_pong_json(self, s1, s2, o):
    """Send ``o`` as JSON from ``s1`` to ``s2`` and back; return what ``s1`` gets.

    Raises SkipTest when ``jsonapi.jsonmod`` is None.
    """
    if jsonapi.jsonmod is None:
        raise SkipTest("No json library")

    # Outbound leg: s1 -> s2.
    s1.send_json(o)
    received = s2.recv_json()

    # Return leg: s2 sends back what it received.
    s2.send_json(received)
    return s1.recv_json()
def assertRaisesErrno(self, errno, func, *args, **kwargs):
    """Assert that ``func(*args, **kwargs)`` raises ZMQError with ``errno``.

    Raises SkipTest instead when the expected errno is ``zmq.EAGAIN``.
    Fails the test when no ZMQError is raised at all.
    """
    if errno == zmq.EAGAIN:
        raise SkipTest("Skipping because we're green.")

    try:
        func(*args, **kwargs)
    except zmq.ZMQError:
        raised = sys.exc_info()[1]
        mismatch_msg = "wrong error raised, expected '%s' got '%s'" % (
            zmq.ZMQError(errno), zmq.ZMQError(raised.errno))
        self.assertEqual(raised.errno, errno, mismatch_msg)
    else:
        self.fail("Function did not raise any error")
def skip_green(self):
    """Unconditionally raise SkipTest with a fixed message."""
    raise SkipTest("Skipping because we are green")
def setUp(self):
    """Start a local HTTP server and a tinyproxy instance for the test.

    Raises nose.SkipTest when the ``socks`` or ``subprocess`` module is
    unavailable, or when the ``tinyproxy`` executable cannot be found.
    Any other OSError from launching tinyproxy is re-raised.
    """
    # Local import: the exception is fetched through sys.exc_info() so the
    # handler below parses on both Python 2 and Python 3 (the former
    # ``except OSError, e`` form is a SyntaxError on Python 3).
    import sys

    if not socks:
        raise nose.SkipTest('socks module unavailable')
    if not subprocess:
        raise nose.SkipTest('subprocess module unavailable')

    # start a short-lived miniserver so we can get a likely port
    # for the proxy
    self.httpd, self.proxyport = miniserver.start_server(
        miniserver.ThisDirHandler)
    self.httpd.shutdown()
    self.httpd, self.port = miniserver.start_server(
        miniserver.ThisDirHandler)

    # NOTE(review): mktemp() only returns a name and is documented as
    # race-prone; kept because nothing here creates these two files.
    self.pidfile = tempfile.mktemp()
    self.logfile = tempfile.mktemp()
    fd, self.conffile = tempfile.mkstemp()
    f = os.fdopen(fd, 'w')
    try:
        # Built inside the try so the handle is closed even if
        # os.getlogin() or the write fails.
        our_cfg = tinyproxy_cfg % {'user': os.getlogin(),
                                   'pidfile': self.pidfile,
                                   'port': self.proxyport,
                                   'logfile': self.logfile}
        f.write(our_cfg)
    finally:
        f.close()

    try:
        # TODO use subprocess.check_call when 2.4 is dropped
        ret = subprocess.call(['tinyproxy', '-c', self.conffile])
        self.assertEqual(0, ret)
    except OSError:
        e = sys.exc_info()[1]
        if e.errno == errno.ENOENT:
            # The tinyproxy executable was not found.
            raise nose.SkipTest('tinyproxy not available')
        raise
def test_plugin_load():
    """Check that exactly the expected set of Spider plugins is registered.

    Skipped when the ``pldns`` dependency cannot be imported.
    """
    try:
        # attempt to load dependencies of plugins
        import pldns
    except ImportError:
        raise nose.SkipTest

    expected_names = set(['TFO', 'ECN', 'DSCP', 'UDPZero', 'UDPOpts',
                          'DNSResolv', 'H2', 'EvilBit'])

    found_names = set()
    for plugin_cls in pathspider.cmd.measure.plugins:
        # Every registered plugin has to be a Spider subclass.
        assert issubclass(plugin_cls, pathspider.base.Spider)
        found_names.add(plugin_cls.__name__)

    assert found_names == expected_names
def test_issue_23():
    """Fetch a remote .ods fixture with the 'pyexcel-odsr' library.

    Skipped unless ``IN_TRAVIS`` is true.
    """
    if not IN_TRAVIS:
        raise SkipTest()
    fixture_url = "https://github.com/pyexcel/pyexcel-ods/raw/master/tests/fixtures/white_space.ods"  # flake8: noqa
    pe.get_book(url=fixture_url, library='pyexcel-odsr')
def setup_class(cls):
    """Run the parent class setup, or skip for CKAN versions up to 2.4.99."""
    version_too_old = p.toolkit.check_ckan_version(max_version='2.4.99')
    if version_too_old:
        raise nose.SkipTest()
    super(TestTranslations, cls).setup_class()
def setup_class(cls):
    """Create a nine-column datastore table with six rows via the action API.

    Raises nose.SkipTest when ``tests.is_datastore_supported()`` is false.
    """
    if not tests.is_datastore_supported():
        raise nose.SkipTest("Datastore not supported")

    p.load('datastore')
    ctd.CreateTestData.create()
    cls.sysadmin_user = model.User.get('testsysadmin')
    cls.normal_user = model.User.get('annafan')

    first_resource = model.Package.get('annakarenina').resources[0]
    field_defs = [
        {'id': 'id'},
        {'id': 'date', 'type': 'date'},
        {'id': 'x'},
        {'id': 'y'},
        {'id': 'z'},
        {'id': 'country'},
        {'id': 'title'},
        {'id': 'lat'},
        {'id': 'lon'},
    ]
    rows = [
        {'id': 0, 'date': '2011-01-01', 'x': 1, 'y': 2, 'z': 3,
         'country': 'DE', 'title': 'first 99', 'lat': 52.56, 'lon': 13.40},
        {'id': 1, 'date': '2011-02-02', 'x': 2, 'y': 4, 'z': 24,
         'country': 'UK', 'title': 'second', 'lat': 54.97, 'lon': -1.60},
        {'id': 2, 'date': '2011-03-03', 'x': 3, 'y': 6, 'z': 9,
         'country': 'US', 'title': 'third', 'lat': 40.00, 'lon': -75.5},
        {'id': 3, 'date': '2011-04-04', 'x': 4, 'y': 8, 'z': 6,
         'country': 'UK', 'title': 'fourth', 'lat': 57.27, 'lon': -6.20},
        {'id': 4, 'date': '2011-05-04', 'x': 5, 'y': 10, 'z': 15,
         'country': 'UK', 'title': 'fifth', 'lat': 51.58, 'lon': 0},
        {'id': 5, 'date': '2011-06-02', 'x': 6, 'y': 12, 'z': 18,
         'country': 'DE', 'title': 'sixth 53.56', 'lat': 51.04, 'lon': 7.9},
    ]
    cls.data = dict(
        resource_id=first_resource.id,
        force=True,
        fields=field_defs,
        records=rows,
    )

    # The table is created with the normal user's API key.
    request_body = '%s=1' % json.dumps(cls.data)
    auth_env = {'Authorization': str(cls.normal_user.apikey)}
    response = cls.app.post('/api/action/datastore_create',
                            params=request_body, extra_environ=auth_env)
    reply = json.loads(response.body)
    assert reply['success'] is True
def test_pg_version_check(self):
    """Check ``db._pg_version_is_at_least`` against the configured database.

    Raises nose.SkipTest when ``tests.is_datastore_supported()`` is false.
    """
    if not tests.is_datastore_supported():
        raise nose.SkipTest("Datastore not supported")
    engine = db._get_engine(
        {'connection_url': config['sqlalchemy.url']})
    connection = engine.connect()
    try:
        assert db._pg_version_is_at_least(connection, '8.0')
        # NOTE(review): this upper bound fails on servers whose version is
        # 10.0 or newer - confirm the intended limit.
        assert not db._pg_version_is_at_least(connection, '10.0')
    finally:
        # Close the connection even when an assertion fails, so the test
        # does not leave it open.
        connection.close()
def setup_class(cls):
    """Build the test app and create the 'books' datastore table.

    Raises nose.SkipTest when ``tests.is_datastore_supported()`` is false
    (the WSGI test app has already been created at that point).
    """
    cls.app = paste.fixture.TestApp(
        middleware.make_app(config['global_conf'], **config))
    if not tests.is_datastore_supported():
        raise nose.SkipTest("Datastore not supported")

    p.load('datastore')
    ctd.CreateTestData.create()
    cls.sysadmin_user = model.User.get('testsysadmin')
    cls.normal_user = model.User.get('annafan')

    first_resource = model.Package.get('annakarenina').resources[0]
    field_defs = [
        {'id': u'b\xfck', 'type': 'text'},
        {'id': 'author', 'type': 'text'},
        {'id': 'published'},
        {'id': u'characters', u'type': u'_text'},
        {'id': 'random_letters', 'type': 'text[]'},
    ]
    rows = [
        {u'b\xfck': 'annakarenina',
         'author': 'tolstoy',
         'published': '2005-03-01',
         'nested': ['b', {'moo': 'moo'}],
         u'characters': [u'Princess Anna', u'Sergius'],
         'random_letters': ['a', 'e', 'x']},
        {u'b\xfck': 'warandpeace',
         'author': 'tolstoy',
         'nested': {'a': 'b'},
         'random_letters': []},
    ]
    cls.data = {
        'resource_id': first_resource.id,
        'force': True,
        'aliases': 'books',
        'fields': field_defs,
        'records': rows,
    }

    # The table is created with the sysadmin's API key.
    request_body = '%s=1' % json.dumps(cls.data)
    auth_env = {'Authorization': str(cls.sysadmin_user.apikey)}
    response = cls.app.post('/api/action/datastore_create',
                            params=request_body, extra_environ=auth_env)
    reply = json.loads(response.body)
    assert reply['success'] is True

    write_engine = db._get_engine({
        'connection_url': config['ckan.datastore.write_url']})
    cls.Session = orm.scoped_session(orm.sessionmaker(bind=write_engine))
def setup_class(cls):
    """Create a two-row datastore table with a primary key.

    Raises nose.SkipTest when ``tests.is_datastore_supported()`` is false.
    """
    if not tests.is_datastore_supported():
        raise nose.SkipTest("Datastore not supported")

    p.load('datastore')
    ctd.CreateTestData.create()
    cls.sysadmin_user = model.User.get('testsysadmin')
    cls.normal_user = model.User.get('annafan')
    set_url_type(
        model.Package.get('annakarenina').resources, cls.sysadmin_user)

    first_resource = model.Package.get('annakarenina').resources[0]
    field_defs = [
        {'id': u'b\xfck', 'type': 'text'},
        {'id': 'author', 'type': 'text'},
        {'id': 'nested', 'type': 'json'},
        {'id': 'characters', 'type': 'text[]'},
        {'id': 'published'},
    ]
    rows = [
        {u'b\xfck': 'annakarenina', 'author': 'tolstoy',
         'published': '2005-03-01', 'nested': ['b', {'moo': 'moo'}]},
        {u'b\xfck': 'warandpeace', 'author': 'tolstoy',
         'nested': {'a': 'b'}},
    ]
    cls.data = {
        'resource_id': first_resource.id,
        'fields': field_defs,
        'primary_key': u'b\xfck',
        'records': rows,
    }

    # The table is created with the sysadmin's API key.
    request_body = '%s=1' % json.dumps(cls.data)
    auth_env = {'Authorization': str(cls.sysadmin_user.apikey)}
    response = cls.app.post('/api/action/datastore_create',
                            params=request_body, extra_environ=auth_env)
    reply = json.loads(response.body)
    assert reply['success'] is True

    write_engine = db._get_engine(
        {'connection_url': config['ckan.datastore.write_url']})
    cls.Session = orm.scoped_session(orm.sessionmaker(bind=write_engine))
def setup_class(cls):
    """Create a three-row datastore table with a primary key.

    Raises nose.SkipTest when ``tests.is_datastore_supported()`` is false.
    """
    if not tests.is_datastore_supported():
        raise nose.SkipTest("Datastore not supported")

    p.load('datastore')
    ctd.CreateTestData.create()
    cls.sysadmin_user = model.User.get('testsysadmin')
    cls.normal_user = model.User.get('annafan')
    set_url_type(
        model.Package.get('annakarenina').resources, cls.sysadmin_user)

    first_resource = model.Package.get('annakarenina').resources[0]
    hhguide = u"hitchhiker's guide to the galaxy"
    field_defs = [
        {'id': u'b\xfck', 'type': 'text'},
        {'id': 'author', 'type': 'text'},
        {'id': 'nested', 'type': 'json'},
        {'id': 'characters', 'type': 'text[]'},
        {'id': 'published'},
    ]
    rows = [
        {u'b\xfck': 'annakarenina', 'author': 'tolstoy',
         'published': '2005-03-01', 'nested': ['b', {'moo': 'moo'}]},
        {u'b\xfck': 'warandpeace', 'author': 'tolstoy',
         'nested': {'a': 'b'}},
        {'author': 'adams',
         'characters': ['Arthur Dent', 'Marvin'],
         'nested': {'foo': 'bar'},
         u'b\xfck': hhguide},
    ]
    cls.data = {
        'resource_id': first_resource.id,
        'fields': field_defs,
        'primary_key': u'b\xfck',
        'records': rows,
    }

    # The table is created with the sysadmin's API key.
    request_body = '%s=1' % json.dumps(cls.data)
    auth_env = {'Authorization': str(cls.sysadmin_user.apikey)}
    response = cls.app.post('/api/action/datastore_create',
                            params=request_body, extra_environ=auth_env)
    reply = json.loads(response.body)
    assert reply['success'] is True

    write_engine = db._get_engine(
        {'connection_url': config['ckan.datastore.write_url']})
    cls.Session = orm.scoped_session(orm.sessionmaker(bind=write_engine))
def setup_class(cls):
    """Load the datastore plugin; skip when unsupported or in legacy mode."""
    if not is_datastore_supported():
        raise nose.SkipTest("Datastore not supported")

    datastore_plugin = p.load('datastore')
    if not datastore_plugin.legacy_mode:
        return

    # make sure we undo adding the plugin
    p.unload('datastore')
    raise nose.SkipTest("Info is not supported in legacy mode")
def setup_class(cls):
    """Build the test app, load the datastore plugin and create fixtures.

    Raises nose.SkipTest when ``tests.is_datastore_supported()`` is false
    (the WSGI test app has already been created at that point).
    """
    cls.app = paste.fixture.TestApp(
        middleware.make_app(config['global_conf'], **config))
    if not tests.is_datastore_supported():
        raise nose.SkipTest("Datastore not supported")

    p.load('datastore')
    ctd.CreateTestData.create()
    cls.sysadmin_user = model.User.get('testsysadmin')
    cls.normal_user = model.User.get('annafan')

    write_engine = db._get_engine(
        {'connection_url': config['ckan.datastore.write_url']})
    session_factory = orm.sessionmaker(bind=write_engine)
    cls.Session = orm.scoped_session(session_factory)

    package_resources = model.Package.get('annakarenina').resources
    set_url_type(package_resources, cls.sysadmin_user)
def setup_class(cls):
    """Build the test app, load datastore and datapusher, create fixtures.

    Raises nose.SkipTest when ``tests.is_datastore_supported()`` is false
    (the WSGI test app has already been created at that point).
    """
    cls.app = paste.fixture.TestApp(
        middleware.make_app(config['global_conf'], **config))
    if not tests.is_datastore_supported():
        raise nose.SkipTest("Datastore not supported")

    for plugin_name in ('datastore', 'datapusher'):
        p.load(plugin_name)

    ctd.CreateTestData.create()
    cls.sysadmin_user = model.User.get('testsysadmin')
    cls.normal_user = model.User.get('annafan')

    write_engine = db._get_engine(
        {'connection_url': config['ckan.datastore.write_url']})
    session_factory = orm.sessionmaker(bind=write_engine)
    cls.Session = orm.scoped_session(session_factory)

    package_resources = model.Package.get('annakarenina').resources
    set_url_type(package_resources, cls.sysadmin_user)