我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用unittest.skip()。
def allocDelayProg(size, duration): """Python program as str that allocates object of the specified size in Gigabytes and then waits for the specified time size - allocation size, bytes; None to skip allocation duration - execution duration, sec; None to skip the sleep return - Python program as str """ return """from __future__ import print_function, division # Required for stderr output, must be the first import import sys import time import array if {size} is not None: a = array.array('b', [0]) asize = sys.getsizeof(a) # Note: allocate at least empty size, i.e. empty list buffer = a * int(max({size} - asize + 1, 0)) #if _DEBUG_TRACE: # print(''.join((' allocDelayProg(), allocated ', str(sys.getsizeof(buffer)) # , ' bytes for ', str({duration}),' sec')), file=sys.stderr) if {duration} is not None: time.sleep({duration}) """.format(size=size, duration=duration)
def skip_if_no_uuid(f): """Decorator to skip a test if uuid is not supported by Py/PG.""" @wraps(f) def skip_if_no_uuid_(self): try: import uuid # noqa except ImportError: return self.skipTest("uuid not available in this Python version") try: cur = self.conn.cursor() cur.execute("select typname from pg_type where typname = 'uuid'") has = cur.fetchone() finally: self.conn.rollback() if has: return f(self) else: return self.skipTest("uuid type not available on the server") return skip_if_no_uuid_
def test_callback_is_run(self): await self.GIVEN_ConsumerRegistered(queue_name="fake_consumer_queue", exchange_name="fake_exch", exchange_type="direct", routing_keys=["fake_routing_key"], callback = self.fake_callback) await self.GIVEN_MessagePublished(exchange_name="fake_exch", msg="fake_message", routing_key="fake_routing_key") await self.WHEN_ProcessEventsNTimes(2) self.THEN_CallbackReceivesMessage("fake_message") # @unittest.skip("skipped")
def test_callback_is_not_run_if_routing_key_mismatch(self): fake_callback = Mock() await self.GIVEN_ConsumerRegistered(queue_name="fake_consumer_queue", exchange_name="fake_exch", exchange_type="direct", routing_keys=["fake_routing_key"], callback = fake_callback) await self.GIVEN_MessagePublished(exchange_name="fake_exch", msg="fake_message", routing_key="another_routing_key") await self.WHEN_ProcessEventsOnce() self.THEN_CallbackIsNotRun(fake_callback) # @unittest.skip("skipped")
def test_callback_run_if_multiple_routing_keys(self): await self.GIVEN_ConsumerRegistered(queue_name="fake_consumer_queue", exchange_name="fake_exch", exchange_type="direct", routing_keys=["fake_routing_key","fake_routing_key2"], callback = self.fake_callback) await self.GIVEN_MessagePublished(exchange_name="fake_exch", msg="fake_message", routing_key="fake_routing_key2") await self.WHEN_ProcessEventsOnce() self.THEN_CallbackReceivesMessage("fake_message") # @unittest.skip("skipped")
def test_callback_run_if_two_exclusive_queues_registered(self): await self.GIVEN_ConsumerRegistered(queue_name = None, exchange_name="fake_exch", exchange_type="direct", routing_keys=["fake_routing_key"], callback = self.fake_callback) await self.GIVEN_ConsumerRegistered(queue_name = None, exchange_name="fake_exch", exchange_type="direct", routing_keys=["fake_routing_key"], callback = self.fake_callback2) await self.GIVEN_MessagePublished(exchange_name="fake_exch", msg="fake_message", routing_key="fake_routing_key") await self.WHEN_ProcessEventsOnce() self.THEN_CallbackReceivesMessage("fake_message") self.THEN_Callback2ReceivesMessage("fake_message") # @unittest.skip("skipped")
def test_routes_to_all_consumer_queues(self): await self.GIVEN_ConsumerRegistered(queue_name="fake_consumer_queue1", exchange_name="fake_exch", exchange_type="fanout", routing_keys="", callback = self.fake_callback) await self.GIVEN_ConsumerRegistered(queue_name="fake_consumer_queue2", exchange_name="fake_exch", exchange_type="fanout", routing_keys="", callback = self.fake_callback2) await self.GIVEN_MessagePublished(exchange_name="fake_exch", msg="fake_msg", routing_key="") await self.WHEN_ProcessEventsNTimes(1) self.THEN_CallbackReceivesMessage("fake_msg") self.THEN_Callback2ReceivesMessage("fake_msg") # @unittest.skip("skipped")
def test_direct_runs_callback(self): await self.GIVEN_ProducerRegistered(exchange_name="fake_direct_exch", exchange_type="direct") await self.GIVEN_ConsumerRegistered(queue_name="fake_direct_consumer_queue", exchange_name="fake_direct_exch", exchange_type="direct", routing_keys=["fake_routing_key"], callback = self.fake_callback) await self.GIVEN_MessagePublished(exchange_name="fake_direct_exch", msg="fake_message", routing_key="fake_routing_key") await self.WHEN_ProcessEventsOnce() self.THEN_CallbackReceivesMessage("fake_message") # @unittest.skip("skipped")
def test_topic_runs_callback(self): await self.GIVEN_ProducerRegistered(exchange_name="fake_topic_exch", exchange_type="topic") await self.GIVEN_ConsumerRegistered(queue_name="fake_topic_consumer_queue", exchange_name="fake_topic_exch", exchange_type="topic", routing_keys=["*.red","ball.*"], callback = self.fake_callback) await self.GIVEN_MessagePublished(exchange_name="fake_topic_exch", msg="fake_message", routing_key="apple.red") await self.WHEN_ProcessEventsOnce() self.THEN_CallbackReceivesMessage("fake_message") # @unittest.skip("skipped")
def test_skipping(self): class Foo(unittest.TestCase): def test_skip_me(self): self.skipTest("skip") events = [] result = LoggingResult(events) test = Foo("test_skip_me") test.run(result) self.assertEqual(events, ['startTest', 'addSkip', 'stopTest']) self.assertEqual(result.skipped, [(test, "skip")]) # Try letting setUp skip the test now. class Foo(unittest.TestCase): def setUp(self): self.skipTest("testing") def test_nothing(self): pass events = [] result = LoggingResult(events) test = Foo("test_nothing") test.run(result) self.assertEqual(events, ['startTest', 'addSkip', 'stopTest']) self.assertEqual(result.skipped, [(test, "testing")]) self.assertEqual(result.testsRun, 1)
def test_decorated_skip(self): def decorator(func): def inner(*a): return func(*a) return inner class Foo(unittest.TestCase): @decorator @unittest.skip('testing') def test_1(self): pass result = unittest.TestResult() test = Foo("test_1") suite = unittest.TestSuite([test]) suite.run(result) self.assertEqual(result.skipped, [(test, "testing")])
def test_class_not_setup_or_torndown_when_skipped(self): class Test(unittest.TestCase): classSetUp = False tornDown = False @classmethod def setUpClass(cls): Test.classSetUp = True @classmethod def tearDownClass(cls): Test.tornDown = True def test_one(self): pass Test = unittest.skip("hop")(Test) self.runTests(Test) self.assertFalse(Test.classSetUp) self.assertFalse(Test.tornDown)
def test_notification_pub_sub_mix(self): """ Test notification messaging pattern mixed with basic pub/sub calls. """ self.m.register_notification_endpoint(self._simple_subscribe_cbf1, "test.notification1") self.m.subscribe(self._simple_subscribe_cbf1, "test.notification2") time.sleep(0.5) # give broker some time to register subscriptions # send publish to notify endpoint self.m.publish("test.notification1", "my-notification1") self.assertEqual(self.wait_for_messages()[0], "my-notification1") # send notify to subscribe endpoint self.m.notify("test.notification2", "my-notification2") #res = self.wait_for_messages(n_messages=2) self.assertTrue(self.wait_for_particular_messages("my-notification1")) self.assertTrue(self.wait_for_particular_messages("my-notification2")) #@unittest.skip("disabled")
def test_double_subscriptions(self): """ Ensure that messages are delivered to all subscriptions of a topic. (e.g. identifies queue setup problems) :return: """ self.m.subscribe(self._simple_subscribe_cbf1, "test.interleave") self.m.subscribe(self._simple_subscribe_cbf2, "test.interleave") time.sleep(0.5) # send publish to notify endpoint self.m.publish("test.interleave", "my-notification1") # enusre that it is received by each subscription self.assertTrue(self.wait_for_particular_messages("my-notification1", buffer=0)) self.assertTrue(self.wait_for_particular_messages("my-notification1", buffer=1)) #@unittest.skip("disabled")
def test_strings(self): clause = self.parser.parse_line_clause_body('X is \'bar\', S is format_str(\'test %d %s foo\', 42, X)') solutions = self.rt.search(clause) self.assertEqual (solutions[0]['S'].s, 'test 42 bar foo') clause = self.parser.parse_line_clause_body('X is \'foobar\', sub_string(X, 0, 2, _, Y)') solutions = self.rt.search(clause) self.assertEqual (solutions[0]['Y'].s, 'fo') clause = self.parser.parse_line_clause_body('atom_chars(foo, X), atom_chars(Y, "bar").') solutions = self.rt.search(clause) self.assertEqual (solutions[0]['X'].s, 'foo') self.assertEqual (solutions[0]['Y'].name, 'bar') # @unittest.skip("temporarily disabled")
def test_setz_multi(self): # self.rt.set_trace(True) clause = self.parser.parse_line_clause_body('I is ias2, setz(ias (I, a, _), a), setz(ias (I, b, _), b), setz(ias (I, c, _), c), ias(I, X, Y). ') solutions = self.rt.search(clause) logging.debug(repr(solutions)) self.assertEqual (len(solutions), 3) self.assertEqual (solutions[0]['X'].name, 'a') self.assertEqual (solutions[0]['Y'].name, 'a') self.assertEqual (solutions[1]['X'].name, 'b') self.assertEqual (solutions[1]['Y'].name, 'b') self.assertEqual (solutions[2]['X'].name, 'c') self.assertEqual (solutions[2]['Y'].name, 'c') # @unittest.skip("temporarily disabled")
def test_custom_builtins(self): global recorded_moves self.parser.compile_file('samples/hanoi2.pl', UNITTEST_MODULE) clause = self.parser.parse_line_clause_body('move(3,left,right,center)') logging.debug('clause: %s' % clause) # register our custom builtin recorded_moves = [] self.rt.register_builtin('record_move', record_move) solutions = self.rt.search(clause) logging.debug('solutions: %s' % repr(solutions)) self.assertEqual (len(solutions), 1) self.assertEqual (len(recorded_moves), 7) #@unittest.skip("temporarily disabled")
def test_double_negation(self): self.parser.compile_file('samples/not_test.pl', UNITTEST_MODULE) clause = self.parser.parse_line_clause_body('not(not(chancellor(helmut_kohl))).') logging.debug('clause: %s' % clause) solutions = self.rt.search(clause, {}) logging.debug('solutions: %s' % repr(solutions)) self.assertEqual (len(solutions), 1) clause = self.parser.parse_line_clause_body('not(not(chancellor(angela_merkel))).') logging.debug('clause: %s' % clause) solutions = self.rt.search(clause, {}) logging.debug('solutions: %s' % repr(solutions)) self.assertEqual (len(solutions), 1) clause = self.parser.parse_line_clause_body('not(not(chancellor(X))).') logging.debug('clause: %s' % clause) solutions = self.rt.search(clause, {}) logging.debug('solutions: %s' % repr(solutions)) self.assertEqual (len(solutions), 2) # @unittest.skip("temporarily disabled")
def test_parse_line_clauses(self): line = 'time_span(TE) :- date_time_stamp(+(D, 1.0)).' tree = self.parser.parse_line_clauses(line) logging.debug (unicode(tree[0].body)) self.assertEqual (tree[0].body.name, 'date_time_stamp') self.assertEqual (tree[0].head.name, 'time_span') line = 'time_span(tomorrow, TS, TE) :- context(currentTime, T), stamp_date_time(T, date(Y, M, D, H, Mn, S, "local")), date_time_stamp(date(Y, M, +(D, 1.0), 0.0, 0.0, 0.0, "local"), TS), date_time_stamp(date(Y, M, +(D, 1.0), 23.0, 59.0, 59.0, "local"), TE).' tree = self.parser.parse_line_clauses(line) logging.debug (unicode(tree[0].body)) self.assertEqual (tree[0].head.name, 'time_span') self.assertEqual (tree[0].body.name, 'and') self.assertEqual (len(tree[0].body.args), 4) # @unittest.skip("temporarily disabled")
def test_cut(self): self.parser.compile_file('samples/cut_test.pl', UNITTEST_MODULE) # self.rt.set_trace(True) clause = self.parser.parse_line_clause_body(u'bar(R, X)') logging.debug(u'clause: %s' % clause) solutions = self.rt.search(clause) logging.debug('solutions: %s' % repr(solutions)) self.assertEqual (len(solutions), 4) self.assertEqual (solutions[0]['R'].s, "one") self.assertEqual (solutions[1]['R'].s, "two") self.assertEqual (solutions[2]['R'].s, "many") self.assertEqual (solutions[3]['R'].s, "many") # @unittest.skip("temporarily disabled")
def test_create_signal_graph(self): node_signal = sptgraph.create_node_signal(gen_signal(), 'baseID', 'layer', False) for is_directed in [True, False]: g = utils.networkx_to_graphlab(gen_graph(is_directed)) sg = sptgraph.merge_signal_on_graph(g, node_signal, 'baseID', 'layer', use_fast=False, verbose=False) # Node 5 is never activated self.assertEqual(4, len(sg.vertices)) if not is_directed: self.assertEqual(8, len(sg.edges)) else: self.assertEqual(4, len(sg.edges)) actual_columns = set(sg.vertices.column_names()) expected_columns = {'layers', 'node_weight', '__id'} self.assertItemsEqual(expected_columns, actual_columns) # @unittest.skip('Skipping aggregate layers')
def test_aggregate_layers(self): signal = gl.SFrame(gen_signal()) nb_layers = signal['layer'].max() + 1 # starts at 0 # Python 'slow' original = sptgraph.create_node_signal(signal, 'baseID', 'layer', False) # Fast c++ version res = sptgraph_fast.aggregate_layers(signal, 'baseID', 'layer', nb_layers) # Transform output to compare l1 = original['layers'].apply(int) l2 = res['layers'].apply(utils.reform_layer_int_from_blocks) m = l1 == l2 self.assertTrue(m.all(), 'Layers should be equal') # @unittest.skip('Skipping rebuilt_bitset')
def test_directed_no_self_edge(self): # Directed no self-edge g = sptgraph.create_spatio_temporal_graph(gen_graph(True), gen_signal(), False, verbose=False) # The graph has only 1 connected component h = components.find_connected_components(g) cc = components.create_component_sframe(h) comps = components.extract_components(h, cc) self.assertEqual(1, len(comps)) # We remove the edge (7, 12) to create 2 weakly connected components nodes = g.vertices edges = g.edges.add_row_number('eid') to_remove = g.get_edges(7, 12) edges = edges[edges['eid'] != to_remove['eid'][0]] g = gl.SGraph(nodes, edges) h = components.find_connected_components(g) cc = components.create_component_sframe(h) comps = components.extract_components(h, cc) self.assertEqual(2, len(comps)) # @unittest.skip('Skipping undirected self edge')
def test_bayes_factor_delta(self): ndecimals = 5 res = self.getExperiment(['normal_same']).delta(method='bayes_factor', num_iters=2000) variants = find_list_of_dicts_element(res['kpis'], 'name', 'normal_same', 'variants') aStats = find_list_of_dicts_element(variants, 'name', 'A', 'delta_statistics') self.assertNumericalEqual(aStats['delta'], 0.033053, ndecimals) self.assertEqual(aStats['stop'], True, ndecimals) self.assertEqual(aStats['number_of_iterations'], 2000, ndecimals) self.assertNumericalEqual(aStats['confidence_interval'][0]['value'], -0.00829, ndecimals) self.assertNumericalEqual(aStats['confidence_interval'][1]['value'], 0.07127, ndecimals) self.assertEqual(aStats['treatment_sample_size'], 6108) self.assertEqual(aStats['control_sample_size'], 3892) self.assertNumericalEqual(aStats['treatment_mean'], 0.025219, ndecimals) self.assertNumericalEqual(aStats['control_mean'], -0.007833, ndecimals) self.assertNumericalEqual(aStats['statistical_power'], 0.36401, ndecimals) # @unittest.skip("sometimes takes too much time")
def test_bayes_precision_delta(self): ndecimals = 5 res = self.getExperiment(['normal_same']).delta(method='bayes_precision', num_iters=2000) variants = find_list_of_dicts_element(res['kpis'], 'name', 'normal_same', 'variants') aStats = find_list_of_dicts_element(variants, 'name', 'A', 'delta_statistics') self.assertNumericalEqual(aStats['delta'], 0.033053, ndecimals) self.assertEqual(aStats['stop'], True, ndecimals) self.assertEqual(aStats['number_of_iterations'], 2000, ndecimals) self.assertNumericalEqual(aStats['confidence_interval'][0]['value'], -0.00829, ndecimals) self.assertNumericalEqual(aStats['confidence_interval'][1]['value'], 0.07127, ndecimals) self.assertEqual(aStats['treatment_sample_size'], 6108) self.assertEqual(aStats['control_sample_size'], 3892) self.assertNumericalEqual(aStats['treatment_mean'], 0.025219, ndecimals) self.assertNumericalEqual(aStats['control_mean'], -0.007833, ndecimals) self.assertNumericalEqual(aStats['statistical_power'], 0.36401, ndecimals) # @unittest.skip("sometimes takes too much time")
def test_bayes_factor(self): """ Check the Bayes factor function. """ res = es.bayes_factor(self.rand_s1, self.rand_s2, num_iters=2000) self.assertEqual (res['stop'], True) self.assertAlmostEqual (res['delta'], -0.15887364780635896) value025 = find_list_of_dicts_element(res['confidence_interval'], 'percentile', 2.5, 'value') value975 = find_list_of_dicts_element(res['confidence_interval'], 'percentile', 97.5, 'value') self.assertAlmostEqual (value025, -0.24293384641452503) self.assertAlmostEqual (value975, -0.075064346336461404) self.assertEqual (res['treatment_sample_size'], 1000) self.assertEqual (res['control_sample_size'], 1000) self.assertAlmostEqual (res['treatment_mean'], -0.045256707490195384) self.assertAlmostEqual (res['control_mean'], 0.11361694031616358) # @unittest.skip("sometimes takes too much time")
def test_get_by_user(self): response = self.admin_open(self.item_url('FooBarUser')) self.assertAPIError(response, 404, 'UserNotFound') response = self.admin_open(self.item_url(self.user.username)) self.assert200(response) # only Admin has permission validator = UsageResponseValidator() if not validator.validate_get(response.json): self.fail(validator.errors) # first user only self.assertEqual(len(response.json['data']['ip_usage']), 2) # first user only self.assertEqual(len(response.json['data']['pd_usage']), 2) # @unittest.skip('')
def skip(reason, condition=None): """ Mark a test case or method for skipping. @param reason: message @type reason: str @param condition: skip only if the specified condition is True @type condition: bool/expression """ if isinstance(reason, types.FunctionType): raise TypeError('skip: no reason specified') if condition is None: return unittest.skip(reason) else: return unittest.skipIf(condition, reason)
def test_download_one_file(self): try: shutil.rmtree("/tmp/nitty_test") except: pass remote_path = "/alice/cern.ch/user/c/cbourjau/nitty_test/AnalysisResults.root" utils.download_file(remote_path, "/tmp/nitty_test/AnalysisResults.root") self.assertTrue(os.path.isfile("/tmp/nitty_test/AnalysisResults.root")) shutil.rmtree("/tmp/nitty_test") # without specifiying dest file name: utils.download_file(remote_path, "/tmp/nitty_test/") self.assertTrue(os.path.isfile("/tmp/nitty_test/AnalysisResults.root")) shutil.rmtree("/tmp/nitty_test") # @skip("Skip download test")
def mandatory(test_item): """A decorator to mark whole test cases or individual methods as mandatory. If a mandatory test fails, DataTestRunner will stop immediately (this is similar to the ``--failfast`` command line argument behavior):: @datatest.mandatory class TestFileFormat(datatest.DataTestCase): def test_columns(self): ... """ test_item.__datatest_mandatory__ = True return test_item # The standard unittest.skip decorators are reimplemented to add a # _wrapped attribute that points to the orignal object so that the # _sort_key() function can find the proper line number when test_item # gets wrapped by functools.wraps().
def skip(reason): """A decorator to unconditionally skip a test: .. code-block:: python @datatest.skip('Not finished collecting raw data.') class TestSumTotals(datatest.DataTestCase): def test_totals(self): ... """ def decorator(test_item): if not isinstance(test_item, type): orig_item = test_item # <- Not in unittest.skip() @functools.wraps(test_item) def skip_wrapper(*args, **kwargs): raise unittest.SkipTest(reason) test_item = skip_wrapper test_item._wrapped = orig_item # <- Not in unittest.skip() test_item.__unittest_skip__ = True test_item.__unittest_skip_why__ = reason return test_item return decorator
def _check_unchunked_files(self, d): self.assertEqual(d['$chunk.ref_contigset_id'], self.INPUT_FILES[1]) self.assertEqual(d['$chunk.ccsset_id'], self.INPUT_FILES[2]) #@unittest.skip("GMAP disabled") #class TestScatterContigSetGMAP(ContigSetScatterBase, # pbcommand.testkit.core.PbTestScatterApp): # DRIVER_BASE = "python -m pbtranscript.tasks.scatter_contigset_gmap" # INPUT_FILES = [ # identical to TestIcePartial inputs # op.join(MNT_DATA, "sa3", "nfl.contigset.xml"), # "/pbi/dept/secondary/siv/references/rat_UCSC/rat_UCSC.referenceset.xml", # ] # # def _check_unchunked_files(self, d): # self.assertEqual(d['$chunk.reference_id'], self.INPUT_FILES[1])
def test_skip_doesnt_run_setup(self): class Foo(unittest.TestCase): wasSetUp = False wasTornDown = False def setUp(self): Foo.wasSetUp = True def tornDown(self): Foo.wasTornDown = True @unittest.skip('testing') def test_1(self): pass result = unittest.TestResult() test = Foo("test_1") suite = unittest.TestSuite([test]) suite.run(result) self.assertEqual(result.skipped, [(test, "testing")]) self.assertFalse(Foo.wasSetUp) self.assertFalse(Foo.wasTornDown)
def test_gc(self): for tp in self._types: if not isinstance(tp, type): # If tp is a factory rather than a plain type, skip continue class MySource(tp): pass class MyObject: pass # Create a reference cycle through a memoryview object b = MySource(tp(b'abc')) m = self._view(b) o = MyObject() b.m = m b.o = o wr = weakref.ref(o) b = m = o = None # The cycle must be broken gc.collect() self.assertTrue(wr() is None, wr())
def test_1(self): try: from sys import getrefcount as grc except ImportError: return unittest.skip("no sys.getrefcount()") f = dll._testfunc_callback_i_if f.restype = ctypes.c_int f.argtypes = [ctypes.c_int, MyCallback] def callback(value): #print "called back with", value return value self.assertEqual(grc(callback), 2) cb = MyCallback(callback) self.assertTrue(grc(callback) > 2) result = f(-10, cb) self.assertEqual(result, -18) cb = None gc.collect() self.assertEqual(grc(callback), 2)
def testSingleDatacenter(self): """ Create a single data center and add check if its switch is up by using manually added hosts. Tests especially the data center specific addLink method. """ # create network self.createNet(nswitches=0, ndatacenter=1, nhosts=2, ndockers=0) # setup links self.net.addLink(self.dc[0], self.h[0]) self.net.addLink(self.h[1], self.dc[0]) # start Mininet network self.startNet() # check number of running nodes self.assertTrue(len(self.getContainernetContainers()) == 0) self.assertTrue(len(self.net.hosts) == 2) self.assertTrue(len(self.net.switches) == 1) # check connectivity by using ping self.assertTrue(self.net.ping([self.h[0], self.h[1]]) <= 0.0) # stop Mininet network self.stopNet() #@unittest.skip("disabled to test if CI fails because this is the first test.")
def test_Xlinked_Norine_BMX(self): """This gene has no X-linked variant""" db = 'lgueneau' ss = samples_selection_factory(db=db, groups = {'affected': ['09818','09819'], 'not_affected':['09960','09961']}) qs = Variant.objects.using(db).filter(gene_symbol__in=['BMX'], chrom='chrX') # Using GenotypesFilterXLinked.apply() variants = variants_collection_factory(db=db, qs=qs) var = GenotypesFilterXLinked(ss).apply(variants).variants self.assertEqual(len(var), 0) # Using FilterCollection.apply() fc = FiltersCollection([GenotypesFilterXLinked(ss)]) var = fc.apply(initqs=qs, db=db).variants self.assertEqual(len(var), 0) ############################ # COMPOUND HET # ############################ #@unittest.skip('')
def test_compound_Lucie_FNDC1(self): """These 2(3) variants are annotated on different transcripts of the same gene, and because of the group_by(transcript_id), the compound was not found. Check here that it is fixed. """ ss = samples_selection_factory(db='test', groups = {'affected': ['101563','101591'], 'not_affected':['101564','101565']}) variants = variants_collection_factory(db='test', qs=Variant.objects.using('test').filter(gene_symbol='FNDC1')) ## To select them directly: #variants = variants_collection_factory( # Variants.objects.using('test').filter(start__in=['159653504','159653634']), db='test') var = GenotypesFilterCompoundHeterozygous(ss).apply(variants).variants self.assertEqual(len(var), 3) ############################ # FROM REQUEST # ############################ #@unittest.skip('')
def test_genotypes_Nothing_filter_from_request_with_ss_from_request(self): """With a sound grouping passed by url.""" samples = [('samples', 'A=09818,09819'), ('samples', 'B=09960,09961')] filters = [('filter', 'genotype=nothing')] request = RequestFactory().get('', samples + filters) fc = variant_filters_from_request(request, db='test') self.assertIsInstance(fc, FiltersCollection) self.assertTrue(len(fc) == len(filters)) gf = fc['genotype'] self.assertEqual(sorted(gf.ss.groups.keys()), ['A','B']) self.assertEqual(gf.name, 'genotype') self.assertEqual(gf.val, 'nothing') self.assertEqual(gf.op, '=') self.assertIsInstance(gf, GenotypesFilterDoNothing) #@unittest.skip('')
def skip_if_no_uuid(f): """Decorator to skip a test if uuid is not supported by Py/PG.""" @wraps(f) def skip_if_no_uuid_(self): try: import uuid except ImportError: return self.skipTest("uuid not available in this Python version") try: cur = self.conn.cursor() cur.execute("select typname from pg_type where typname = 'uuid'") has = cur.fetchone() finally: self.conn.rollback() if has: return f(self) else: return self.skipTest("uuid type not available on the server") return skip_if_no_uuid_