Python unittest2 模块,SkipTest() 实例源码

我们从 Python 开源项目中提取了以下 49 个代码示例,用于说明如何使用 unittest2.SkipTest()。

项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def download_test_files_if_not_present(self):
        """Download the test files for this IO class from G-node.

        The files listed in ``self.files_to_download`` are fetched from
        ``url_for_tests + self.shortname`` into ``self.local_test_dir``.
        ``url_for_tests`` is a global defined at the beginning of this file.

        Raises:
            unittest.SkipTest: if ``self.use_network`` is false, or if
                creating the directories or downloading raises ``IOError``.
        """
        if not self.use_network:
            raise unittest.SkipTest("Requires download of data from the web")

        url = url_for_tests + self.shortname
        try:
            make_all_directories(self.files_to_download, self.local_test_dir)
            download_test_file(self.files_to_download,
                               self.local_test_dir, url)
        except IOError as exc:
            # An I/O failure while fetching data is reported as a skip,
            # not as a test failure.
            raise unittest.SkipTest(exc)
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def test_paged_result_handling(self):
        """Check that slicing a model query returns every row with a fetch size of 1.

        Skipped when the native protocol version is below 2.
        """
        if PROTOCOL_VERSION < 2:
            skip_reason = "Paging requires native protocol 2+, currently using: {0}".format(PROTOCOL_VERSION)
            raise unittest.SkipTest(skip_reason)

        # addresses #225
        class PagingTest(Model):
            id = columns.Integer(primary_key=True)
            val = columns.Integer()

        sync_table(PagingTest)

        for key in (1, 2):
            PagingTest.create(id=key, val=key)

        # Temporarily shrink the session's fetch size to a single row;
        # presumably this forces the two rows onto separate pages.
        with mock.patch.object(get_session(), 'default_fetch_size', 1):
            rows = PagingTest.objects()[:]

        assert len(rows) == 2
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def setUp(self):
        """
        Connect to the cluster, trying the highest supported (beta) protocol first.

        The test is skipped if run with native protocol version < 4.  When no
        host accepts the highest protocol version, the connection is retried
        with PROTOCOL_VERSION and ``self.support_v5`` is set to False.
        """
        self.support_v5 = True
        if PROTOCOL_VERSION < 4:
            reason = ("Native protocol 4,0+ is required for custom payloads, currently using %r"
                      % (PROTOCOL_VERSION,))
            raise unittest.SkipTest(reason)

        try:
            self.cluster = Cluster(
                protocol_version=ProtocolVersion.MAX_SUPPORTED,
                allow_beta_protocol_version=True,
            )
            self.session = self.cluster.connect()
        except NoHostAvailable:
            log.info("Protocol Version 5 not supported,")
            # Fall back to the protocol version the test run is configured for.
            self.cluster = Cluster(protocol_version=PROTOCOL_VERSION)
            self.session = self.cluster.connect()
            self.support_v5 = False

        self.nodes_currently_failing = []
        nodes = get_cluster().nodes.values()
        self.node1, self.node2, self.node3 = nodes
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def setUp(self):
        """
        Connect to the cluster and create the ``test3rf.lwt`` table.

        The test is skipped if run with native protocol version < 2.
        """
        if PROTOCOL_VERSION < 2:
            reason = ("Protocol 2.0+ is required for Lightweight transactions, currently testing against %r"
                      % (PROTOCOL_VERSION,))
            raise unittest.SkipTest(reason)

        cluster = Cluster(protocol_version=PROTOCOL_VERSION)
        self.cluster = cluster
        self.session = cluster.connect()

        create_lwt_table = '''
            CREATE TABLE test3rf.lwt (
                k int PRIMARY KEY,
                v int )'''
        self.session.execute(create_lwt_table)
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def test_cql_compatibility(self):
        """Check ``is_cql_compatible`` for a compact table, with and without clustering columns.

        Skipped when the server version is 3.0 or newer.
        """
        if CASS_SERVER_VERSION >= (3, 0):
            raise unittest.SkipTest("cql compatibility does not apply Cassandra 3.0+")

        # Several non-PK columns are fine as long as the table has no
        # clustering columns.
        self.session.execute(
            self.make_create_statement(["a"], [], ["b", "c", "d"], compact=True))
        table = self.get_table_metadata()

        partition_names = [col.name for col in table.partition_key]
        self.assertEqual([u'a'], partition_names)
        self.assertEqual([], table.clustering_key)
        column_names = sorted(table.columns.keys())
        self.assertEqual([u'a', u'b', u'c', u'd'], column_names)

        self.assertTrue(table.is_cql_compatible)

        # With clustering columns the table is no longer CQL compatible.
        # Clustering columns are simulated by patching the metadata (a hack).
        table.clustering_key = ["foo", "bar"]
        for fake_column in ("foo", "bar"):
            table.columns[fake_column] = None
        self.assertFalse(table.is_cql_compatible)
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def test_replicas(self):
        """
        Ensure cluster.metadata.get_replicas return correctly when not attached to keyspace

        Before connecting, no replicas are reported for the key; after
        connecting to ``test3rf`` the first replica is expected in datacenter
        ``dc1`` and rack ``r1``.  Skipped when the murmur3 extension is missing.
        """
        if murmur3 is None:
            raise unittest.SkipTest('the murmur3 extension is not available')

        cluster = Cluster(protocol_version=PROTOCOL_VERSION)
        # Shut the cluster down even when an assertion fails, so a failing
        # test does not leave the cluster object and its resources behind.
        try:
            self.assertEqual(cluster.metadata.get_replicas('test3rf', 'key'), [])

            cluster.connect('test3rf')

            replicas = list(cluster.metadata.get_replicas('test3rf', six.b('key')))
            self.assertNotEqual(replicas, [])
            host = replicas[0]
            self.assertEqual(host.datacenter, 'dc1')
            self.assertEqual(host.rack, 'r1')
        finally:
            cluster.shutdown()
项目:cqlmapper    作者:reddit    | 项目源码 | 文件源码
def test_paged_result_handling(self):
        """Check that ``find_all`` returns every row with a fetch size of 1.

        Skipped when the native protocol version is below 2.  The table
        created for the test is dropped again even if the test fails.
        """
        if PROTOCOL_VERSION < 2:
            raise unittest.SkipTest(
                "Paging requires native protocol 2+, "
                "currently using: {0}".format(PROTOCOL_VERSION)
            )

        # addresses #225
        class PagingTest(Model):
            id = columns.Integer(primary_key=True)
            val = columns.Integer()
        sync_table(self.conn, PagingTest)

        # From here on the table exists: always drop it, so a failed
        # assertion does not leave it behind for later tests.
        try:
            PagingTest.create(self.conn, id=1, val=1)
            PagingTest.create(self.conn, id=2, val=2)

            with mock.patch.object(self.conn.session, 'default_fetch_size', 1):
                results = PagingTest.objects().find_all(self.conn)

            assert len(results) == 2
        finally:
            drop_table(self.conn, PagingTest)
项目:devsecops-example-helloworld    作者:boozallen    | 项目源码 | 文件源码
def test_skiptest_in_setupclass(self):
        """A SkipTest raised in setUpClass yields one skip, no errors and no tests run."""
        class Test(unittest2.TestCase):
            @classmethod
            def setUpClass(cls):
                raise unittest2.SkipTest('foo')

            def test_one(self):
                pass

            def test_two(self):
                pass

        outcome = self.runTests(Test)
        self.assertEqual(outcome.testsRun, 0)
        self.assertEqual(len(outcome.errors), 0)
        self.assertEqual(len(outcome.skipped), 1)

        skipped_holder = outcome.skipped[0][0]
        # __qualname__ is used when the class has one, else the plain name.
        class_name = getattr(Test, '__qualname__', Test.__name__)
        expected = 'setUpClass (%s.%s)' % (__name__, class_name)
        self.assertEqual(str(skipped_holder), expected)
项目:devsecops-example-helloworld    作者:boozallen    | 项目源码 | 文件源码
def test_skiptest_in_setupmodule(self):
        """A SkipTest raised in setUpModule yields one skip, no errors and no tests run."""
        class Module(object):
            @staticmethod
            def setUpModule():
                raise unittest2.SkipTest('foo')

        class Test(unittest2.TestCase):
            def test_one(self):
                pass

            def test_two(self):
                pass

        # Register the fake module under the name the test class reports
        # as its module.
        sys.modules['Module'] = Module
        Test.__module__ = 'Module'

        outcome = self.runTests(Test)
        self.assertEqual(outcome.testsRun, 0)
        self.assertEqual(len(outcome.errors), 0)
        self.assertEqual(len(outcome.skipped), 1)
        skipped_holder = outcome.skipped[0][0]
        self.assertEqual(str(skipped_holder), 'setUpModule (Module)')
项目:devsecops-example-helloworld    作者:boozallen    | 项目源码 | 文件源码
def test_discover_with_init_module_that_raises_SkipTest_on_import(self):
        """Discovery of a package whose import raises SkipTest gives one skipped test.

        The resulting suite must also be picklable with every protocol.
        """
        root = abspath('/foo')
        vfs = {
            root: ['my_package'],
            abspath('/foo/my_package'): ['__init__.py', 'test_module.py'],
        }
        self.setup_import_issue_package_tests(vfs)

        import_calls = []

        def _get_module_from_name(name):
            # Record the import attempt, then behave like a module that
            # skips itself on import.
            import_calls.append(name)
            raise unittest.SkipTest('skipperoo')

        loader = unittest.TestLoader()
        loader._get_module_from_name = _get_module_from_name
        suite = loader.discover(root)

        self.assertIn(root, sys.path)
        self.assertEqual(suite.countTestCases(), 1)

        outcome = unittest.TestResult()
        suite.run(outcome)
        self.assertEqual(len(outcome.skipped), 1)
        self.assertEqual(outcome.testsRun, 1)
        self.assertEqual(import_calls, ['my_package'])

        # Check picklability
        for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
            pickle.loads(pickle.dumps(suite, protocol))
项目:devsecops-example-helloworld    作者:boozallen    | 项目源码 | 文件源码
def assert_garbage_collect_test_after_run(self, TestSuiteClass):
        """Assert that a suite releases a test once it has been run.

        A test case is put into a ``TestSuiteClass`` suite and run; afterwards
        the suite slot must be ``None`` and the test must be collectable.
        Raises ``unittest.SkipTest`` when suite cleanup is disabled.
        """
        cleanup_enabled = unittest.BaseTestSuite._cleanup
        if not cleanup_enabled:
            raise unittest.SkipTest("Suite cleanup is disabled")

        class Foo(unittest.TestCase):
            def test_nothing(self):
                pass

        case = Foo('test_nothing')
        case_ref = weakref.ref(case)

        suite = TestSuiteClass([case_ref()])
        suite.run(unittest.TestResult())

        # Drop the last strong reference held by this function.
        del case

        # for the benefit of non-reference counting implementations
        gc.collect()

        self.assertEqual(suite._tests, [None])
        self.assertIsNone(case_ref())
项目:watchmen    作者:lycclsltt    | 项目源码 | 文件源码
def testSocketAuthInstallPlugin(self):
        """Install the socket auth plugin, run the real test, then uninstall the plugin.

        Two install statements are tried in turn; if both raise
        ``InternalError`` the test is skipped.
        """
        # The plugin is required, so try to install it first.
        cursor = self.connections[0].cursor()
        try:
            cursor.execute("install plugin auth_socket soname 'auth_socket.so'")
            TestAuthentication.socket_found = True
            self.socket_plugin_name = 'auth_socket'
            self.realtestSocketAuth()
        except pymysql.err.InternalError:
            # Second attempt, using the other install statement.
            try:
                cursor.execute("install soname 'auth_socket'")
                TestAuthentication.socket_found = True
                self.socket_plugin_name = 'unix_socket'
                self.realtestSocketAuth()
            except pymysql.err.InternalError:
                TestAuthentication.socket_found = False
                raise unittest2.SkipTest("we couldn't install the socket plugin")
        finally:
            # Only uninstall when the class-level flag says a plugin was found.
            if TestAuthentication.socket_found:
                uninstall_sql = "uninstall plugin %s" % self.socket_plugin_name
                cursor.execute(uninstall_sql)
项目:watchmen    作者:lycclsltt    | 项目源码 | 文件源码
def test_json(self):
        """Round-trip a JSON document through a JSON column and ``CAST(... AS JSON)``.

        A dedicated ``utf8mb4`` connection is opened from the first configured
        database.  The test is skipped when ``mysql_server_is(conn, (5, 7, 0))``
        is false, i.e. the server has no JSON type.
        """
        args = self.databases[0].copy()
        args["charset"] = "utf8mb4"
        conn = pymysql.connect(**args)
        # Close the dedicated connection whether the test is skipped, fails or
        # passes; previously it was never closed.  Cleanups run last-in,
        # first-out, so registering this first means any cleanup added later
        # (presumably safe_create_table adds one) still sees an open connection.
        self.addCleanup(conn.close)
        if not self.mysql_server_is(conn, (5, 7, 0)):
            raise SkipTest("JSON type is not supported on MySQL <= 5.6")

        self.safe_create_table(conn, "test_json", """\
create table test_json (
    id int not null,
    json JSON not null,
    primary key (id)
);""")
        cur = conn.cursor()

        json_str = u'{"hello": "?????"}'
        cur.execute("INSERT INTO test_json (id, `json`) values (42, %s)", (json_str,))
        cur.execute("SELECT `json` from `test_json` WHERE `id`=42")
        res = cur.fetchone()[0]
        # Compare parsed documents rather than the raw text.
        self.assertEqual(json.loads(res), json.loads(json_str))

        cur.execute("SELECT CAST(%s AS JSON) AS x", (json_str,))
        res = cur.fetchone()[0]
        self.assertEqual(json.loads(res), json.loads(json_str))
项目:touch-pay-client    作者:HackPucBemobi    | 项目源码 | 文件源码
def testSocketAuthInstallPlugin(self):
        """Install the socket auth plugin, run the real test, then uninstall the plugin.

        Two install statements are tried in turn; if both raise
        ``InternalError`` the test is skipped.
        """
        # The plugin is required, so try to install it first.
        cursor = self.connections[0].cursor()
        try:
            cursor.execute("install plugin auth_socket soname 'auth_socket.so'")
            TestAuthentication.socket_found = True
            self.socket_plugin_name = 'auth_socket'
            self.realtestSocketAuth()
        except pymysql.err.InternalError:
            # Second attempt, using the other install statement.
            try:
                cursor.execute("install soname 'auth_socket'")
                TestAuthentication.socket_found = True
                self.socket_plugin_name = 'unix_socket'
                self.realtestSocketAuth()
            except pymysql.err.InternalError:
                TestAuthentication.socket_found = False
                raise unittest2.SkipTest("we couldn't install the socket plugin")
        finally:
            # Only uninstall when the class-level flag says a plugin was found.
            if TestAuthentication.socket_found:
                uninstall_sql = "uninstall plugin %s" % self.socket_plugin_name
                cursor.execute(uninstall_sql)
项目:touch-pay-client    作者:HackPucBemobi    | 项目源码 | 文件源码
def test_json(self):
        """Round-trip a JSON document through a JSON column and ``CAST(... AS JSON)``.

        A dedicated ``utf8mb4`` connection is opened from the first configured
        database.  The test is skipped when ``mysql_server_is(conn, (5, 7, 0))``
        is false, i.e. the server has no JSON type.
        """
        args = self.databases[0].copy()
        args["charset"] = "utf8mb4"
        conn = pymysql.connect(**args)
        # Close the dedicated connection whether the test is skipped, fails or
        # passes; previously it was never closed.  Cleanups run last-in,
        # first-out, so registering this first means any cleanup added later
        # (presumably safe_create_table adds one) still sees an open connection.
        self.addCleanup(conn.close)
        if not self.mysql_server_is(conn, (5, 7, 0)):
            raise SkipTest("JSON type is not supported on MySQL <= 5.6")

        self.safe_create_table(conn, "test_json", """\
create table test_json (
    id int not null,
    json JSON not null,
    primary key (id)
);""")
        cur = conn.cursor()

        json_str = u'{"hello": "?????"}'
        cur.execute("INSERT INTO test_json (id, `json`) values (42, %s)", (json_str,))
        cur.execute("SELECT `json` from `test_json` WHERE `id`=42")
        res = cur.fetchone()[0]
        # Compare parsed documents rather than the raw text.
        self.assertEqual(json.loads(res), json.loads(json_str))

        cur.execute("SELECT CAST(%s AS JSON) AS x", (json_str,))
        res = cur.fetchone()[0]
        self.assertEqual(json.loads(res), json.loads(json_str))
项目:hakkuframework    作者:4shadoww    | 项目源码 | 文件源码
def requires(resource, msg=None):
    """Check that *resource* may be used by the calling test.

    Raises unittest.SkipTest when the 'gui' resource is requested but is not
    available.  Otherwise, a caller whose module is __main__ is always
    allowed and the function simply returns.  For any other caller,
    ResourceDenied is raised (with *msg*, or a default message) when the
    resource has not been enabled.
    """
    wants_gui = resource == 'gui'
    if wants_gui and not _is_gui_available():
        raise unittest.SkipTest("Cannot use the 'gui' resource")

    # A caller running as __main__ is treated as if the resource was set.
    caller_globals = sys._getframe(1).f_globals
    if caller_globals.get("__name__") == "__main__":
        return

    if is_resource_enabled(resource):
        return
    if msg is None:
        msg = "Use of the %r resource not enabled" % resource
    raise ResourceDenied(msg)
项目:hakkuframework    作者:4shadoww    | 项目源码 | 文件源码
def bigaddrspacetest(f):
    """Decorator for tests that fill the address space.

    The wrapped test runs only when ``max_memuse`` is at least
    ``MAX_Py_ssize_t``; otherwise ``unittest.SkipTest`` is raised with a
    message saying how much memory is needed.  The wrapper keeps the name
    and docstring of *f*, so reports and tracebacks identify the real test
    instead of a generic ``wrapper``.
    """
    # Imported locally because the file's import block is not visible here.
    import functools

    @functools.wraps(f)
    def wrapper(self):
        if max_memuse >= MAX_Py_ssize_t:
            return f(self)
        # 64-bit sized address space with at least 2 GiB allowed: the test
        # cannot run here, so the message suggests a 32-bit build.
        if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31:
            raise unittest.SkipTest(
                "not enough memory: try a 32-bit build instead")
        raise unittest.SkipTest(
            "not enough memory: %.1fG minimum needed"
            % (MAX_Py_ssize_t / (1024 ** 3)))
    return wrapper

#=======================================================================
# unittest integration.
项目:bawk    作者:jttwnsnd    | 项目源码 | 文件源码
def testSocketAuthInstallPlugin(self):
        """Install the socket auth plugin, run the real test, then uninstall the plugin.

        Two install statements are tried in turn; if both raise
        ``InternalError`` the test is skipped.
        """
        # The plugin is required, so try to install it first.
        cursor = self.connections[0].cursor()
        try:
            cursor.execute("install plugin auth_socket soname 'auth_socket.so'")
            TestAuthentication.socket_found = True
            self.socket_plugin_name = 'auth_socket'
            self.realtestSocketAuth()
        except pymysql.err.InternalError:
            # Second attempt, using the other install statement.
            try:
                cursor.execute("install soname 'auth_socket'")
                TestAuthentication.socket_found = True
                self.socket_plugin_name = 'unix_socket'
                self.realtestSocketAuth()
            except pymysql.err.InternalError:
                TestAuthentication.socket_found = False
                raise unittest2.SkipTest("we couldn't install the socket plugin")
        finally:
            # Only uninstall when the class-level flag says a plugin was found.
            if TestAuthentication.socket_found:
                uninstall_sql = "uninstall plugin %s" % self.socket_plugin_name
                cursor.execute(uninstall_sql)
项目:bawk    作者:jttwnsnd    | 项目源码 | 文件源码
def test_json(self):
        """Round-trip a JSON document through a JSON column and ``CAST(... AS JSON)``.

        A dedicated ``utf8mb4`` connection is opened from the first configured
        database.  The test is skipped when ``mysql_server_is(conn, (5, 7, 0))``
        is false, i.e. the server has no JSON type.
        """
        args = self.databases[0].copy()
        args["charset"] = "utf8mb4"
        conn = pymysql.connect(**args)
        # Close the dedicated connection whether the test is skipped, fails or
        # passes; previously it was never closed.  Cleanups run last-in,
        # first-out, so registering this first means any cleanup added later
        # (presumably safe_create_table adds one) still sees an open connection.
        self.addCleanup(conn.close)
        if not self.mysql_server_is(conn, (5, 7, 0)):
            raise SkipTest("JSON type is not supported on MySQL <= 5.6")

        self.safe_create_table(conn, "test_json", """\
create table test_json (
    id int not null,
    json JSON not null,
    primary key (id)
);""")
        cur = conn.cursor()

        json_str = u'{"hello": "?????"}'
        cur.execute("INSERT INTO test_json (id, `json`) values (42, %s)", (json_str,))
        cur.execute("SELECT `json` from `test_json` WHERE `id`=42")
        res = cur.fetchone()[0]
        # Compare parsed documents rather than the raw text.
        self.assertEqual(json.loads(res), json.loads(json_str))

        cur.execute("SELECT CAST(%s AS JSON) AS x", (json_str,))
        res = cur.fetchone()[0]
        self.assertEqual(json.loads(res), json.loads(json_str))
项目:packaging    作者:blockstack    | 项目源码 | 文件源码
def requires(resource, msg=None):
    """Check that *resource* may be used by the calling test.

    Raises unittest.SkipTest when the 'gui' resource is requested but is not
    available.  Otherwise, a caller whose module is __main__ is always
    allowed and the function simply returns.  For any other caller,
    ResourceDenied is raised (with *msg*, or a default message) when the
    resource has not been enabled.
    """
    wants_gui = resource == 'gui'
    if wants_gui and not _is_gui_available():
        raise unittest.SkipTest("Cannot use the 'gui' resource")

    # A caller running as __main__ is treated as if the resource was set.
    caller_globals = sys._getframe(1).f_globals
    if caller_globals.get("__name__") == "__main__":
        return

    if is_resource_enabled(resource):
        return
    if msg is None:
        msg = "Use of the %r resource not enabled" % resource
    raise ResourceDenied(msg)
项目:packaging    作者:blockstack    | 项目源码 | 文件源码
def bigaddrspacetest(f):
    """Decorator for tests that fill the address space.

    The wrapped test runs only when ``max_memuse`` is at least
    ``MAX_Py_ssize_t``; otherwise ``unittest.SkipTest`` is raised with a
    message saying how much memory is needed.  The wrapper keeps the name
    and docstring of *f*, so reports and tracebacks identify the real test
    instead of a generic ``wrapper``.
    """
    # Imported locally because the file's import block is not visible here.
    import functools

    @functools.wraps(f)
    def wrapper(self):
        if max_memuse >= MAX_Py_ssize_t:
            return f(self)
        # 64-bit sized address space with at least 2 GiB allowed: the test
        # cannot run here, so the message suggests a 32-bit build.
        if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31:
            raise unittest.SkipTest(
                "not enough memory: try a 32-bit build instead")
        raise unittest.SkipTest(
            "not enough memory: %.1fG minimum needed"
            % (MAX_Py_ssize_t / (1024 ** 3)))
    return wrapper

#=======================================================================
# unittest integration.
项目:islam-buddy    作者:hamir    | 项目源码 | 文件源码
def requires(resource, msg=None):
    """Check that *resource* may be used by the calling test.

    Raises unittest.SkipTest when the 'gui' resource is requested but is not
    available.  Otherwise, a caller whose module is __main__ is always
    allowed and the function simply returns.  For any other caller,
    ResourceDenied is raised (with *msg*, or a default message) when the
    resource has not been enabled.
    """
    wants_gui = resource == 'gui'
    if wants_gui and not _is_gui_available():
        raise unittest.SkipTest("Cannot use the 'gui' resource")

    # A caller running as __main__ is treated as if the resource was set.
    caller_globals = sys._getframe(1).f_globals
    if caller_globals.get("__name__") == "__main__":
        return

    if is_resource_enabled(resource):
        return
    if msg is None:
        msg = "Use of the %r resource not enabled" % resource
    raise ResourceDenied(msg)
项目:islam-buddy    作者:hamir    | 项目源码 | 文件源码
def bigaddrspacetest(f):
    """Decorator for tests that fill the address space.

    The wrapped test runs only when ``max_memuse`` is at least
    ``MAX_Py_ssize_t``; otherwise ``unittest.SkipTest`` is raised with a
    message saying how much memory is needed.  The wrapper keeps the name
    and docstring of *f*, so reports and tracebacks identify the real test
    instead of a generic ``wrapper``.
    """
    # Imported locally because the file's import block is not visible here.
    import functools

    @functools.wraps(f)
    def wrapper(self):
        if max_memuse >= MAX_Py_ssize_t:
            return f(self)
        # 64-bit sized address space with at least 2 GiB allowed: the test
        # cannot run here, so the message suggests a 32-bit build.
        if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31:
            raise unittest.SkipTest(
                "not enough memory: try a 32-bit build instead")
        raise unittest.SkipTest(
            "not enough memory: %.1fG minimum needed"
            % (MAX_Py_ssize_t / (1024 ** 3)))
    return wrapper

#=======================================================================
# unittest integration.
项目:PDF_text_extract    作者:theemadnes    | 项目源码 | 文件源码
def get_test_client(nowait=False, **kwargs):
    """Return an Elasticsearch client once the cluster reports yellow status.

    Client options start from a 30 second timeout, optionally take the
    connection class named by the TEST_ES_CONNECTION environment variable,
    and are then overridden by *kwargs*.  The server is read from
    TEST_ES_SERVER.  The health check is tried once when *nowait* is true,
    otherwise up to 100 times with a 0.1 second pause after each failure.

    Raises SkipTest when no attempt succeeds.
    """
    # construct kwargs from the environment
    client_kwargs = {'timeout': 30}
    if 'TEST_ES_CONNECTION' in os.environ:
        from elasticsearch import connection
        class_name = os.environ['TEST_ES_CONNECTION']
        client_kwargs['connection_class'] = getattr(connection, class_name)
    client_kwargs.update(kwargs)

    server = os.environ.get('TEST_ES_SERVER', {})
    client = Elasticsearch([server], **client_kwargs)

    # wait for yellow status
    attempts = 1 if nowait else 100
    for _attempt in range(attempts):
        try:
            client.cluster.health(wait_for_status='yellow')
        except ConnectionError:
            time.sleep(.1)
        else:
            return client

    # timeout
    raise SkipTest("Elasticsearch failed to start.")
项目:Flask-NvRay-Blog    作者:rui7157    | 项目源码 | 文件源码
def testSocketAuthInstallPlugin(self):
        """Install the socket auth plugin, run the real test, then uninstall the plugin.

        Two install statements are tried in turn; if both raise
        ``InternalError`` the test is skipped.
        """
        # The plugin is required, so try to install it first.
        cursor = self.connections[0].cursor()
        try:
            cursor.execute("install plugin auth_socket soname 'auth_socket.so'")
            TestAuthentication.socket_found = True
            self.socket_plugin_name = 'auth_socket'
            self.realtestSocketAuth()
        except pymysql.err.InternalError:
            # Second attempt, using the other install statement.
            try:
                cursor.execute("install soname 'auth_socket'")
                TestAuthentication.socket_found = True
                self.socket_plugin_name = 'unix_socket'
                self.realtestSocketAuth()
            except pymysql.err.InternalError:
                TestAuthentication.socket_found = False
                raise unittest2.SkipTest("we couldn't install the socket plugin")
        finally:
            # Only uninstall when the class-level flag says a plugin was found.
            if TestAuthentication.socket_found:
                uninstall_sql = "uninstall plugin %s" % self.socket_plugin_name
                cursor.execute(uninstall_sql)
项目:Flask-NvRay-Blog    作者:rui7157    | 项目源码 | 文件源码
def testSocketAuthInstallPlugin(self):
        """Install the socket auth plugin, run the real test, then uninstall the plugin.

        Two install statements are tried in turn; if both raise
        ``InternalError`` the test is skipped.
        """
        # The plugin is required, so try to install it first.
        cursor = self.connections[0].cursor()
        try:
            cursor.execute("install plugin auth_socket soname 'auth_socket.so'")
            TestAuthentication.socket_found = True
            self.socket_plugin_name = 'auth_socket'
            self.realtestSocketAuth()
        except pymysql.err.InternalError:
            # Second attempt, using the other install statement.
            try:
                cursor.execute("install soname 'auth_socket'")
                TestAuthentication.socket_found = True
                self.socket_plugin_name = 'unix_socket'
                self.realtestSocketAuth()
            except pymysql.err.InternalError:
                TestAuthentication.socket_found = False
                raise unittest2.SkipTest("we couldn't install the socket plugin")
        finally:
            # Only uninstall when the class-level flag says a plugin was found.
            if TestAuthentication.socket_found:
                uninstall_sql = "uninstall plugin %s" % self.socket_plugin_name
                cursor.execute(uninstall_sql)
项目:rekall-agent-server    作者:rekall-innovations    | 项目源码 | 文件源码
def testSocketAuthInstallPlugin(self):
        """Install the socket auth plugin, run the real test, then uninstall the plugin.

        Two install statements are tried in turn; if both raise
        ``InternalError`` the test is skipped.
        """
        # The plugin is required, so try to install it first.
        cursor = self.connections[0].cursor()
        try:
            cursor.execute("install plugin auth_socket soname 'auth_socket.so'")
            TestAuthentication.socket_found = True
            self.socket_plugin_name = 'auth_socket'
            self.realtestSocketAuth()
        except pymysql.err.InternalError:
            # Second attempt, using the other install statement.
            try:
                cursor.execute("install soname 'auth_socket'")
                TestAuthentication.socket_found = True
                self.socket_plugin_name = 'unix_socket'
                self.realtestSocketAuth()
            except pymysql.err.InternalError:
                TestAuthentication.socket_found = False
                raise unittest2.SkipTest("we couldn't install the socket plugin")
        finally:
            # Only uninstall when the class-level flag says a plugin was found.
            if TestAuthentication.socket_found:
                uninstall_sql = "uninstall plugin %s" % self.socket_plugin_name
                cursor.execute(uninstall_sql)
项目:rekall-agent-server    作者:rekall-innovations    | 项目源码 | 文件源码
def test_json(self):
        """Round-trip a JSON document through a JSON column and ``CAST(... AS JSON)``.

        A dedicated ``utf8mb4`` connection is opened from the first configured
        database.  The test is skipped when ``mysql_server_is(conn, (5, 7, 0))``
        is false, i.e. the server has no JSON type.
        """
        args = self.databases[0].copy()
        args["charset"] = "utf8mb4"
        conn = pymysql.connect(**args)
        # Close the dedicated connection whether the test is skipped, fails or
        # passes; previously it was never closed.  Cleanups run last-in,
        # first-out, so registering this first means any cleanup added later
        # (presumably safe_create_table adds one) still sees an open connection.
        self.addCleanup(conn.close)
        if not self.mysql_server_is(conn, (5, 7, 0)):
            raise SkipTest("JSON type is not supported on MySQL <= 5.6")

        self.safe_create_table(conn, "test_json", """\
create table test_json (
    id int not null,
    json JSON not null,
    primary key (id)
);""")
        cur = conn.cursor()

        json_str = u'{"hello": "?????"}'
        cur.execute("INSERT INTO test_json (id, `json`) values (42, %s)", (json_str,))
        cur.execute("SELECT `json` from `test_json` WHERE `id`=42")
        res = cur.fetchone()[0]
        # Compare parsed documents rather than the raw text.
        self.assertEqual(json.loads(res), json.loads(json_str))

        cur.execute("SELECT CAST(%s AS JSON) AS x", (json_str,))
        res = cur.fetchone()[0]
        self.assertEqual(json.loads(res), json.loads(json_str))
项目:FightstickDisplay    作者:calexil    | 项目源码 | 文件源码
def requires(resource, msg=None):
    """Check that *resource* may be used by the calling test.

    Raises unittest.SkipTest when the 'gui' resource is requested but is not
    available.  Otherwise, a caller whose module is __main__ is always
    allowed and the function simply returns.  For any other caller,
    ResourceDenied is raised (with *msg*, or a default message) when the
    resource has not been enabled.
    """
    wants_gui = resource == 'gui'
    if wants_gui and not _is_gui_available():
        raise unittest.SkipTest("Cannot use the 'gui' resource")

    # A caller running as __main__ is treated as if the resource was set.
    caller_globals = sys._getframe(1).f_globals
    if caller_globals.get("__name__") == "__main__":
        return

    if is_resource_enabled(resource):
        return
    if msg is None:
        msg = "Use of the %r resource not enabled" % resource
    raise ResourceDenied(msg)
项目:FightstickDisplay    作者:calexil    | 项目源码 | 文件源码
def bigaddrspacetest(f):
    """Decorator for tests that fill the address space.

    The wrapped test runs only when ``max_memuse`` is at least
    ``MAX_Py_ssize_t``; otherwise ``unittest.SkipTest`` is raised with a
    message saying how much memory is needed.  The wrapper keeps the name
    and docstring of *f*, so reports and tracebacks identify the real test
    instead of a generic ``wrapper``.
    """
    # Imported locally because the file's import block is not visible here.
    import functools

    @functools.wraps(f)
    def wrapper(self):
        if max_memuse >= MAX_Py_ssize_t:
            return f(self)
        # 64-bit sized address space with at least 2 GiB allowed: the test
        # cannot run here, so the message suggests a 32-bit build.
        if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31:
            raise unittest.SkipTest(
                "not enough memory: try a 32-bit build instead")
        raise unittest.SkipTest(
            "not enough memory: %.1fG minimum needed"
            % (MAX_Py_ssize_t / (1024 ** 3)))
    return wrapper

#=======================================================================
# unittest integration.
项目:deb-python-pymysql    作者:openstack    | 项目源码 | 文件源码
def testSocketAuthInstallPlugin(self):
        """Install the socket auth plugin, run the real test, then uninstall the plugin.

        Two install statements are tried in turn; if both raise
        ``InternalError`` the test is skipped.
        """
        # The plugin is required, so try to install it first.
        cursor = self.connections[0].cursor()
        try:
            cursor.execute("install plugin auth_socket soname 'auth_socket.so'")
            TestAuthentication.socket_found = True
            self.socket_plugin_name = 'auth_socket'
            self.realtestSocketAuth()
        except pymysql.err.InternalError:
            # Second attempt, using the other install statement.
            try:
                cursor.execute("install soname 'auth_socket'")
                TestAuthentication.socket_found = True
                self.socket_plugin_name = 'unix_socket'
                self.realtestSocketAuth()
            except pymysql.err.InternalError:
                TestAuthentication.socket_found = False
                raise unittest2.SkipTest("we couldn't install the socket plugin")
        finally:
            # Only uninstall when the class-level flag says a plugin was found.
            if TestAuthentication.socket_found:
                uninstall_sql = "uninstall plugin %s" % self.socket_plugin_name
                cursor.execute(uninstall_sql)
项目:deb-python-pymysql    作者:openstack    | 项目源码 | 文件源码
def test_json(self):
        """Round-trip a JSON document through a JSON column and ``CAST(... AS JSON)``.

        A dedicated ``utf8mb4`` connection is opened from the first configured
        database.  The test is skipped when ``mysql_server_is(conn, (5, 7, 0))``
        is false, i.e. the server has no JSON type.
        """
        args = self.databases[0].copy()
        args["charset"] = "utf8mb4"
        conn = pymysql.connect(**args)
        # Close the dedicated connection whether the test is skipped, fails or
        # passes; previously it was never closed.  Cleanups run last-in,
        # first-out, so registering this first means any cleanup added later
        # (presumably safe_create_table adds one) still sees an open connection.
        self.addCleanup(conn.close)
        if not self.mysql_server_is(conn, (5, 7, 0)):
            raise SkipTest("JSON type is not supported on MySQL <= 5.6")

        self.safe_create_table(conn, "test_json", """\
create table test_json (
    id int not null,
    json JSON not null,
    primary key (id)
);""")
        cur = conn.cursor()

        json_str = u'{"hello": "?????"}'
        cur.execute("INSERT INTO test_json (id, `json`) values (42, %s)", (json_str,))
        cur.execute("SELECT `json` from `test_json` WHERE `id`=42")
        res = cur.fetchone()[0]
        # Compare parsed documents rather than the raw text.
        self.assertEqual(json.loads(res), json.loads(json_str))

        cur.execute("SELECT CAST(%s AS JSON) AS x", (json_str,))
        res = cur.fetchone()[0]
        self.assertEqual(json.loads(res), json.loads(json_str))
项目:ServerlessCrawler-VancouverRealState    作者:MarcelloLins    | 项目源码 | 文件源码
def testSocketAuthInstallPlugin(self):
        """Install the socket auth plugin, run the real test, then uninstall the plugin.

        Two install statements are tried in turn; if both raise
        ``InternalError`` the test is skipped.
        """
        # The plugin is required, so try to install it first.
        cursor = self.connections[0].cursor()
        try:
            cursor.execute("install plugin auth_socket soname 'auth_socket.so'")
            TestAuthentication.socket_found = True
            self.socket_plugin_name = 'auth_socket'
            self.realtestSocketAuth()
        except pymysql.err.InternalError:
            # Second attempt, using the other install statement.
            try:
                cursor.execute("install soname 'auth_socket'")
                TestAuthentication.socket_found = True
                self.socket_plugin_name = 'unix_socket'
                self.realtestSocketAuth()
            except pymysql.err.InternalError:
                TestAuthentication.socket_found = False
                raise unittest2.SkipTest("we couldn't install the socket plugin")
        finally:
            # Only uninstall when the class-level flag says a plugin was found.
            if TestAuthentication.socket_found:
                uninstall_sql = "uninstall plugin %s" % self.socket_plugin_name
                cursor.execute(uninstall_sql)
项目:ServerlessCrawler-VancouverRealState    作者:MarcelloLins    | 项目源码 | 文件源码
def test_json(self):
        """Round-trip a JSON document through a JSON column and ``CAST(... AS JSON)``.

        A dedicated ``utf8mb4`` connection is opened from the first configured
        database.  The test is skipped when ``mysql_server_is(conn, (5, 7, 0))``
        is false, i.e. the server has no JSON type.
        """
        args = self.databases[0].copy()
        args["charset"] = "utf8mb4"
        conn = pymysql.connect(**args)
        # Close the dedicated connection whether the test is skipped, fails or
        # passes; previously it was never closed.  Cleanups run last-in,
        # first-out, so registering this first means any cleanup added later
        # (presumably safe_create_table adds one) still sees an open connection.
        self.addCleanup(conn.close)
        if not self.mysql_server_is(conn, (5, 7, 0)):
            raise SkipTest("JSON type is not supported on MySQL <= 5.6")

        self.safe_create_table(conn, "test_json", """\
create table test_json (
    id int not null,
    json JSON not null,
    primary key (id)
);""")
        cur = conn.cursor()

        json_str = u'{"hello": "?????"}'
        cur.execute("INSERT INTO test_json (id, `json`) values (42, %s)", (json_str,))
        cur.execute("SELECT `json` from `test_json` WHERE `id`=42")
        res = cur.fetchone()[0]
        # Compare parsed documents rather than the raw text.
        self.assertEqual(json.loads(res), json.loads(json_str))

        cur.execute("SELECT CAST(%s AS JSON) AS x", (json_str,))
        res = cur.fetchone()[0]
        self.assertEqual(json.loads(res), json.loads(json_str))
项目:cryptogram    作者:xinmingzhang    | 项目源码 | 文件源码
def requires(resource, msg=None):
    """Check that *resource* may be used by the calling test.

    Raises unittest.SkipTest when the 'gui' resource is requested but is not
    available.  Otherwise, a caller whose module is __main__ is always
    allowed and the function simply returns.  For any other caller,
    ResourceDenied is raised (with *msg*, or a default message) when the
    resource has not been enabled.
    """
    wants_gui = resource == 'gui'
    if wants_gui and not _is_gui_available():
        raise unittest.SkipTest("Cannot use the 'gui' resource")

    # A caller running as __main__ is treated as if the resource was set.
    caller_globals = sys._getframe(1).f_globals
    if caller_globals.get("__name__") == "__main__":
        return

    if is_resource_enabled(resource):
        return
    if msg is None:
        msg = "Use of the %r resource not enabled" % resource
    raise ResourceDenied(msg)
项目:cryptogram    作者:xinmingzhang    | 项目源码 | 文件源码
def bigaddrspacetest(f):
    """Decorator for tests that fill the address space.

    The wrapped test runs only when ``max_memuse`` is at least
    ``MAX_Py_ssize_t``; otherwise ``unittest.SkipTest`` is raised with a
    message saying how much memory is needed.  The wrapper keeps the name
    and docstring of *f*, so reports and tracebacks identify the real test
    instead of a generic ``wrapper``.
    """
    # Imported locally because the file's import block is not visible here.
    import functools

    @functools.wraps(f)
    def wrapper(self):
        if max_memuse >= MAX_Py_ssize_t:
            return f(self)
        # 64-bit sized address space with at least 2 GiB allowed: the test
        # cannot run here, so the message suggests a 32-bit build.
        if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31:
            raise unittest.SkipTest(
                "not enough memory: try a 32-bit build instead")
        raise unittest.SkipTest(
            "not enough memory: %.1fG minimum needed"
            % (MAX_Py_ssize_t / (1024 ** 3)))
    return wrapper

#=======================================================================
# unittest integration.
项目:Repobot    作者:Desgard    | 项目源码 | 文件源码
def requires(resource, msg=None):
    """Check that *resource* may be used by the calling test.

    Raises unittest.SkipTest when the 'gui' resource is requested but is not
    available.  Otherwise, a caller whose module is __main__ is always
    allowed and the function simply returns.  For any other caller,
    ResourceDenied is raised (with *msg*, or a default message) when the
    resource has not been enabled.
    """
    wants_gui = resource == 'gui'
    if wants_gui and not _is_gui_available():
        raise unittest.SkipTest("Cannot use the 'gui' resource")

    # A caller running as __main__ is treated as if the resource was set.
    caller_globals = sys._getframe(1).f_globals
    if caller_globals.get("__name__") == "__main__":
        return

    if is_resource_enabled(resource):
        return
    if msg is None:
        msg = "Use of the %r resource not enabled" % resource
    raise ResourceDenied(msg)
项目:Repobot    作者:Desgard    | 项目源码 | 文件源码
def bigaddrspacetest(f):
    """Decorator for tests that fill the address space.

    The decorated test is only run when the module-level ``max_memuse``
    limit is at least ``MAX_Py_ssize_t``.  Otherwise ``unittest.SkipTest``
    is raised with a message describing what would be needed.

    The returned wrapper is built with ``functools.wraps`` so it keeps the
    name and docstring of the decorated test rather than reporting itself
    as ``wrapper``.
    """
    import functools  # function-scope import keeps the block self-contained

    @functools.wraps(f)
    def wrapper(self):
        if max_memuse < MAX_Py_ssize_t:
            if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31:
                # The limit already allows 2**31 bytes but the address space
                # is far larger than that; the message points at a 32-bit
                # build as the practical way to run this test.
                raise unittest.SkipTest(
                    "not enough memory: try a 32-bit build instead")
            else:
                raise unittest.SkipTest(
                    "not enough memory: %.1fG minimum needed"
                    % (MAX_Py_ssize_t / (1024 ** 3)))
        else:
            return f(self)
    return wrapper

#=======================================================================
# unittest integration.
项目:pyspark    作者:v-v-vishnevskiy    | 项目源码 | 文件源码
def setUpClass(cls):
        """Prepare class-level Hive fixtures, skipping when Hive is missing.

        Runs the ``ReusedPySparkTestCase`` class setup, reserves a temporary
        file name in ``cls.tempdir``, and probes the JVM for ``HiveConf``.
        If the probe raises ``Py4JError`` or ``TypeError`` the class is torn
        down again and ``unittest.SkipTest`` is raised.  Otherwise the
        temporary file is removed and ``cls.sqlCtx``, ``cls.testData`` and
        ``cls.df`` are created.
        """
        ReusedPySparkTestCase.setUpClass()
        cls.tempdir = tempfile.NamedTemporaryFile(delete=False)
        try:
            cls.sc._jvm.org.apache.hadoop.hive.conf.HiveConf()
        except (py4j.protocol.Py4JError, TypeError):
            # Either failure is handled the same way: undo the class setup
            # and report the whole class as skipped.
            cls.tearDownClass()
            raise unittest.SkipTest("Hive is not available")

        # Only the name of the temporary file is kept; the file itself goes.
        os.unlink(cls.tempdir.name)

        hive_test_context = \
            cls.sc._jvm.org.apache.spark.sql.hive.test.TestHiveContext
        scala_context = hive_test_context(cls.sc._jsc.sc())
        cls.sqlCtx = HiveContext(cls.sc, scala_context)

        cls.testData = [Row(key=i, value=str(i)) for i in range(100)]
        test_rdd = cls.sc.parallelize(cls.testData)
        cls.df = test_rdd.toDF()
项目:Chromium_DepotTools    作者:p07r0457    | 项目源码 | 文件源码
def quiet_run(self, result, func, *args, **kwargs):
        """Run ``func``, reporting problems to ``result`` instead of raising.

        ``func`` is called with ``*args`` and ``**kwargs``.

        * ``KeyboardInterrupt`` and ``SystemExit`` propagate unchanged.
        * ``unittest.SkipTest`` is recorded through ``result.addSkip``; when
          ``result`` has no ``addSkip`` a ``RuntimeWarning`` is issued and a
          success is recorded instead.
        * Anything else is recorded through ``result.addError``.

        Returns True when ``func`` finished without raising, False when a
        skip or an error was recorded.
        """
        try:
            func(*args, **kwargs)
        except (KeyboardInterrupt, SystemExit):
            raise
        except unittest.SkipTest as skip:
            if not hasattr(result, 'addSkip'):
                warnings.warn("TestResult has no addSkip method, skips not reported",
                              RuntimeWarning, 2)
                result.addSuccess(self)
            else:
                result.addSkip(self, str(skip))
            return False
        except:
            # Intentionally bare: every remaining exception is reported as
            # an error on the result object.
            result.addError(self, self.__exc_info())
            return False
        else:
            return True
项目:QualquerMerdaAPI    作者:tiagovizoto    | 项目源码 | 文件源码
def testSocketAuthInstallPlugin(self):
        # Install a socket authentication plugin, run realtestSocketAuth()
        # against it, and uninstall the plugin again afterwards.  The test is
        # skipped when neither install statement succeeds.
        # needs plugin. lets install it.
        cur = self.connections[0].cursor()
        try:
            # First attempt: plugin registered under the name 'auth_socket'.
            cur.execute("install plugin auth_socket soname 'auth_socket.so'")
            TestAuthentication.socket_found = True
            self.socket_plugin_name = 'auth_socket'
            self.realtestSocketAuth()
        except pymysql.err.InternalError:
            # Reached when the install statement OR realtestSocketAuth()
            # above raised InternalError.
            try:
                # Second attempt with the alternative install syntax; the
                # plugin is then referred to as 'unix_socket'
                # (presumably the MariaDB variant -- confirm).
                cur.execute("install soname 'auth_socket'")
                TestAuthentication.socket_found = True
                self.socket_plugin_name = 'unix_socket'
                self.realtestSocketAuth()
            except pymysql.err.InternalError:
                # Neither variant worked: remember that on the class and
                # report the test as skipped.
                TestAuthentication.socket_found = False
                raise unittest2.SkipTest('we couldn\'t install the socket plugin')
        finally:
            # Runs on every exit path.  socket_found is a class attribute, so
            # NOTE(review): if an exception other than InternalError escapes
            # before it is assigned here, its previous value is what gets
            # checked.
            if TestAuthentication.socket_found:
                cur.execute("uninstall plugin %s" % self.socket_plugin_name)
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def setUp(self):
        """Locate the 'klustakwik/test1' data directory or skip the test.

        ``self.dirname`` is set to the directory below
        ``<tempdir>/files_for_testing_neo``.  ``unittest.SkipTest`` is raised
        when that directory does not exist.
        """
        data_root = os.path.join(tempfile.gettempdir(),
                                 'files_for_testing_neo')
        self.dirname = os.path.join(data_root, 'klustakwik/test1')
        if os.path.exists(self.dirname):
            return
        raise unittest.SkipTest('data directory does not exist: ' +
                                self.dirname)
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def test1(self):
        """Tests that files can be loaded by basename

        Reads the 'fet' filenames for the 'basename' dataset, expects two
        entries, and checks that the entries under keys 0 and 1 resolve to
        'basename.fet.0' and 'basename.fet.1' inside ``self.dirname``.
        """
        dataset = os.path.join(self.dirname, 'basename')
        kio = KlustaKwikIO(filename=dataset)
        if not BaseTestIO.use_network:
            raise unittest.SkipTest("Requires download of data from the web")
        fetfiles = kio._fp.read_filenames('fet')

        self.assertEqual(len(fetfiles), 2)
        expected_names = ((0, 'basename.fet.0'), (1, 'basename.fet.1'))
        for key, name in expected_names:
            found = os.path.abspath(fetfiles[key])
            wanted = os.path.abspath(os.path.join(self.dirname, name))
            self.assertEqual(found, wanted)
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def test3(self):
        """Tests that files can be loaded by basename2

        Reads the 'clu' filenames for the 'basename2' dataset, expects a
        single entry, and checks that the entry under key 1 resolves to
        'basename2.clu.1' inside ``self.dirname``.
        """
        dataset = os.path.join(self.dirname, 'basename2')
        kio = KlustaKwikIO(filename=dataset)
        if not BaseTestIO.use_network:
            raise unittest.SkipTest("Requires download of data from the web")
        clufiles = kio._fp.read_filenames('clu')

        found_count = len(clufiles)
        self.assertEqual(found_count, 1)
        # Key 1 is looked up although only one entry is expected, so the
        # result is presumably keyed by group number rather than by
        # position -- TODO confirm against read_filenames.
        actual_path = os.path.abspath(clufiles[1])
        wanted_path = os.path.abspath(
            os.path.join(self.dirname, 'basename2.clu.1'))
        self.assertEqual(actual_path, wanted_path)
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def setUp(self):
        """Locate the 'klustakwik/test2' data directory or skip the test.

        ``self.dirname`` is set to the directory below
        ``<tempdir>/files_for_testing_neo``.  ``unittest.SkipTest`` is raised
        when that directory does not exist.
        """
        data_root = os.path.join(tempfile.gettempdir(),
                                 'files_for_testing_neo')
        self.dirname = os.path.join(data_root, 'klustakwik/test2')
        if os.path.exists(self.dirname):
            return
        raise unittest.SkipTest('data directory does not exist: ' +
                                self.dirname)
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def setUp(self):
        """Locate the 'klustakwik/test3' data directory or skip the test.

        ``self.dirname`` is set to the directory below
        ``<tempdir>/files_for_testing_neo``.  ``unittest.SkipTest`` is raised
        when that directory does not exist.
        """
        data_root = os.path.join(tempfile.gettempdir(),
                                 'files_for_testing_neo')
        self.dirname = os.path.join(data_root, 'klustakwik/test3')
        if os.path.exists(self.dirname):
            return
        raise unittest.SkipTest('data directory does not exist: ' +
                                self.dirname)
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def setUp(self):
        """Resolve the Cheetah data paths for this version or skip the test.

        After the parent ``setUp`` has run, ``self.sn`` and ``self.pd`` are
        set to the 'original_data' and 'plain_data' entries below
        ``<local_test_dir>/Cheetah_v<cheetah_version>``.
        ``unittest.SkipTest`` is raised when ``self.sn`` does not exist.
        """
        super(CommonTests, self).setUp()
        test_root = self.local_test_dir
        version_dir = 'Cheetah_v{}'.format(self.cheetah_version)
        data_dir = os.path.join(test_root, version_dir)

        self.sn = os.path.join(data_dir, 'original_data')
        self.pd = os.path.join(data_dir, 'plain_data')
        if os.path.exists(self.sn):
            return
        raise unittest.SkipTest('data file does not exist:' + self.sn)
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def setUp(self):
        """Make sure the nsMCDLibrary shared library is available locally.

        After ``BaseTestIO.setUp`` has run, the vendor archive matching the
        current platform and architecture is downloaded into the system
        temporary directory (unless a copy is already there) and the library
        is extracted from it (unless it was extracted before).
        ``self.dllname`` is set to the path of the extracted library.

        Raises:
            unittest.SkipTest: on platforms other than Windows and Linux.
        """
        BaseTestIO.setUp(self)
        tmpdir = tempfile.gettempdir()
        base_url = ('http://download.multichannelsystems.com/'
                    'download_data/software/neuroshare/')

        if sys.platform.startswith('win'):
            archive_name = 'nsMCDLibrary_3.7b.zip'
            distantfile = base_url + archive_name
            localfile = os.path.join(tmpdir, archive_name)
            if not os.path.exists(localfile):
                urlretrieve(distantfile, localfile)
            # One archive carries both DLLs; pick the member for this build.
            if platform.architecture()[0].startswith('64'):
                member = ('Matlab/Matlab-Import-Filter/Matlab_Interface/'
                          'nsMCDLibrary64.dll')
            else:
                member = ('Matlab/Matlab-Import-Filter/Matlab_Interface/'
                          'nsMCDLibrary.dll')
            self.dllname = os.path.join(tmpdir, member)
            if not os.path.exists(self.dllname):
                # The context manager guarantees the archive handle is
                # closed even if extraction fails.
                with zipfile.ZipFile(localfile) as archive:
                    archive.extract(member, path=tmpdir)

        elif sys.platform.startswith('linux'):
            # Separate archives are published for 32- and 64-bit Linux.
            if platform.architecture()[0].startswith('64'):
                archive_name = 'nsMCDLibrary_Linux64_3.7b.tar.gz'
            else:
                archive_name = 'nsMCDLibrary_Linux32_3.7b.tar.gz'
            distantfile = base_url + archive_name
            localfile = os.path.join(tmpdir, archive_name)
            if not os.path.exists(localfile):
                urlretrieve(distantfile, localfile)
            member = 'nsMCDLibrary/nsMCDLibrary.so'
            self.dllname = os.path.join(tmpdir, member)
            if not os.path.exists(self.dllname):
                with tarfile.open(localfile) as archive:
                    archive.extract(member, path=tmpdir)
        else:
            raise unittest.SkipTest("Not currently supported on OS X")
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def setUp(self):
        """Locate the 'klustakwik/test1' data directory or skip the test.

        ``self.dirname`` is set to the directory below
        ``<tempdir>/files_for_testing_neo``.  ``unittest.SkipTest`` is raised
        when that directory does not exist.
        """
        data_root = os.path.join(tempfile.gettempdir(),
                                 'files_for_testing_neo')
        self.dirname = os.path.join(data_root, 'klustakwik/test1')
        if os.path.exists(self.dirname):
            return
        raise unittest.SkipTest('data directory does not exist: ' +
                                self.dirname)