Python django.db.connections 模块,all() 实例源码

我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 django.db.connections.all()。

项目:wagtail-pg-search-backend    作者:wagtail    | 项目源码 | 文件源码
def keyword_split(keywords):
    """
    Split a keyword string into a list of individual keywords.

    Text wrapped in quotes is kept together as one keyword and the wrapping
    quotes are dropped:

    >>> keyword_split('Hello I\\'m looking for "something special"')
    ['Hello', "I'm", 'looking', 'for', 'something special']

    Quotes nested inside an outer quoted string are left untouched:

    >>> keyword_split("He said \\"I'm looking for 'something special'\\" so I've given him the 'special item'")
    ['He', 'said', "I'm looking for 'something special'", 'so', "I've", 'given', 'him', 'the', 'special item']

    """
    # Alternatives, tried in this order at each position: "double quoted",
    # 'single quoted', or a bare run of non-whitespace characters.
    pattern = r'"([^"]+)"|\'([^\']+)\'|(\S+)'
    found = []
    for double_quoted, single_quoted, bare in re.findall(pattern, keywords):
        # Exactly one of the three groups is non-empty for every match.
        found.append(double_quoted or single_quoted or bare)
    return found
项目:paas-tools    作者:imperodesign    | 项目源码 | 文件源码
def close_db_connections(func, *args, **kwargs):
    """
    Decorator to explicitly close db connections during threaded execution

    After ``func`` returns or raises, every connection yielded by
    ``django.db.connections.all()`` is closed. The return value (or the
    exception) of ``func`` is passed through to the caller unchanged.

    The wrapper copies ``func``'s metadata (``__name__``, ``__doc__``, ...)
    so the decorated callable stays recognisable in logs and tracebacks.

    The extra ``*args`` / ``**kwargs`` accepted by the decorator itself are
    not used; they remain in the signature for backward compatibility.

    Note this is necessary to work around:
    https://code.djangoproject.com/ticket/22420
    """
    # Local import keeps this block independent of module-level imports.
    from functools import WRAPPER_ASSIGNMENTS, WRAPPER_UPDATES, wraps

    # Only copy attributes that ``func`` really has, so callables without
    # ``__name__`` or ``__dict__`` can still be decorated on older Pythons.
    assigned = [attr for attr in WRAPPER_ASSIGNMENTS if hasattr(func, attr)]
    updated = [attr for attr in WRAPPER_UPDATES if hasattr(func, attr)]

    @wraps(func, assigned=assigned, updated=updated)
    def _close_db_connections(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        finally:
            # Imported at call time, as before, so Django is only required
            # when the wrapped function actually runs.
            from django.db import connections
            for conn in connections.all():
                conn.close()
    return _close_db_connections
项目:paas-tools    作者:imperodesign    | 项目源码 | 文件源码
def _etcd_publish_config(**kwargs):
    """
    Replace the etcd copy of an app's config with the newest instance.

    The config object is read from ``kwargs['instance']``. The directory
    ``/deis/config/<app>`` is deleted and then one key is written per entry
    of ``config.values``, with the key name lower-cased.
    """
    config = kwargs['instance']
    # we purge all existing config when adding the newest instance. This is because
    # deis config:unset would remove an existing value, but not delete the
    # old config object
    try:
        app_dir = '/deis/config/{}'.format(config.app)
        _etcd_client.delete(app_dir, prevExist=True, dir=True, recursive=True)
    except KeyError:
        # A KeyError from the delete is ignored on purpose.
        pass
    for name, value in config.values.iteritems():
        etcd_key = '/deis/config/{}/{}'.format(
            config.app,
            unicode(name).encode('utf-8').lower())
        _etcd_client.write(etcd_key, unicode(value).encode('utf-8'))
项目:paas-tools    作者:imperodesign    | 项目源码 | 文件源码
def close_db_connections(func, *args, **kwargs):
    """
    Decorator to explicitly close db connections during threaded execution

    The decorated callable runs ``func`` and, whether it returns or raises,
    closes every connection from ``django.db.connections.all()`` afterwards.
    The result or exception of ``func`` reaches the caller unchanged.

    ``func``'s metadata (``__name__``, ``__doc__``, ...) is copied onto the
    wrapper so introspection and tracebacks still show the original name.

    The decorator's own ``*args`` / ``**kwargs`` are unused and kept only
    for backward compatibility of the signature.

    Note this is necessary to work around:
    https://code.djangoproject.com/ticket/22420
    """
    # Function-scope import: no module-level import is needed for this fix.
    import functools

    # Restrict the copied attributes to those present on ``func`` so that
    # callables lacking ``__name__`` / ``__dict__`` do not break decoration.
    present_assignments = tuple(
        name for name in functools.WRAPPER_ASSIGNMENTS if hasattr(func, name))
    present_updates = tuple(
        name for name in functools.WRAPPER_UPDATES if hasattr(func, name))

    def _close_db_connections(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        finally:
            # Django is imported lazily, exactly when the call happens.
            from django.db import connections
            for conn in connections.all():
                conn.close()

    return functools.update_wrapper(
        _close_db_connections, func,
        assigned=present_assignments, updated=present_updates)
项目:paas-tools    作者:imperodesign    | 项目源码 | 文件源码
def _etcd_publish_config(**kwargs):
    """
    Publish the config passed as ``kwargs['instance']`` to etcd.

    First removes ``/deis/config/<app>`` recursively, then writes every
    item of ``config.values`` below that path as UTF-8 encoded text.
    """
    config = kwargs['instance']
    # we purge all existing config when adding the newest instance. This is because
    # deis config:unset would remove an existing value, but not delete the
    # old config object
    delete_options = dict(prevExist=True, dir=True, recursive=True)
    try:
        _etcd_client.delete('/deis/config/{}'.format(config.app), **delete_options)
    except KeyError:
        # Deliberately ignored, as in the original flow.
        pass
    for raw_key, raw_value in config.values.iteritems():
        lowered_key = unicode(raw_key).encode('utf-8').lower()
        path = '/deis/config/{}/{}'.format(config.app, lowered_key)
        _etcd_client.write(path, unicode(raw_value).encode('utf-8'))
项目:lifesoundtrack    作者:MTG    | 项目源码 | 文件源码
def update_connections_time_zone(**kwargs):
    """
    React to a changed setting given as ``kwargs['setting']`` / ``kwargs['value']``.

    For ``TIME_ZONE`` the process time zone (``TZ`` environment variable plus
    ``time.tzset()`` where available) and the cached default time zone are
    reset. For ``TIME_ZONE`` and ``USE_TZ`` the cached ``timezone`` and
    ``timezone_name`` attributes of every connection are dropped and
    ``ensure_timezone()`` is called on it.
    """
    setting = kwargs['setting']
    if setting == 'TIME_ZONE':
        # Reset process time zone
        if hasattr(time, 'tzset'):
            new_zone = kwargs['value']
            if new_zone:
                os.environ['TZ'] = new_zone
            else:
                os.environ.pop('TZ', None)
            time.tzset()

        # Reset local time zone cache
        timezone.get_default_timezone.cache_clear()

    # Reset the database connections' time zone
    if setting not in {'TIME_ZONE', 'USE_TZ'}:
        return
    for conn in connections.all():
        for cached_attr in ('timezone', 'timezone_name'):
            try:
                delattr(conn, cached_attr)
            except AttributeError:
                # Nothing cached yet for this attribute.
                pass
        conn.ensure_timezone()
项目:liberator    作者:libscie    | 项目源码 | 文件源码
def update_connections_time_zone(**kwargs):
    """
    Refresh time zone state after the ``TIME_ZONE`` or ``USE_TZ`` setting changes.

    ``kwargs['setting']`` names the changed setting and ``kwargs['value']``
    carries its new value. Other settings are ignored.
    """
    changed = kwargs['setting']
    tz_changed = changed == 'TIME_ZONE'

    if tz_changed:
        # Reset process time zone
        if hasattr(time, 'tzset'):
            if not kwargs['value']:
                os.environ.pop('TZ', None)
            else:
                os.environ['TZ'] = kwargs['value']
            time.tzset()

        # Reset local time zone cache
        timezone.get_default_timezone.cache_clear()

    # Reset the database connections' time zone
    if tz_changed or changed == 'USE_TZ':
        for db_conn in connections.all():
            # Drop both cached values; either may be absent.
            try:
                del db_conn.timezone
            except AttributeError:
                pass
            try:
                del db_conn.timezone_name
            except AttributeError:
                pass
            db_conn.ensure_timezone()
项目:wagtail-pg-search-backend    作者:wagtail    | 项目源码 | 文件源码
def get_postgresql_connections():
    """Return a list of the connections whose ``vendor`` is ``'postgresql'``."""
    postgres = []
    for candidate in connections.all():
        if candidate.vendor == 'postgresql':
            postgres.append(candidate)
    return postgres


# Reduce any iterable to a single value using a logical OR e.g. (a | b | ...)
项目:wagtail-pg-search-backend    作者:wagtail    | 项目源码 | 文件源码
def get_descendant_models(model):
    """
    Return the set made of ``model`` plus every model from
    ``apps.get_models()`` that is a subclass of it.
    """
    subclasses = set()
    for candidate in apps.get_models():
        if issubclass(candidate, model):
            subclasses.add(candidate)
    # The model itself is always part of the result.
    return subclasses | {model}
项目:django-cavalry    作者:valohai    | 项目源码 | 文件源码
def force_debug_cursor():
    """
    Turn on ``force_debug_cursor`` for every connection around a single yield.

    Each connection's previous flag is stashed on the connection as
    ``_cavalry_old_force_debug_cursor`` and put back afterwards. The restore
    runs in a ``finally`` block, so it also happens when an exception is
    raised at the yield point instead of leaving the flag forced on.

    NOTE(review): this generator is consumed with a ``with`` statement, so it
    is presumably decorated with ``contextlib.contextmanager`` where it is
    defined -- confirm the decorator is present in the real module.
    """
    for conn in connections.all():
        conn._cavalry_old_force_debug_cursor = conn.force_debug_cursor
        conn.force_debug_cursor = True
    try:
        yield
    finally:
        for conn in connections.all():
            conn.force_debug_cursor = conn._cavalry_old_force_debug_cursor
项目:django-cavalry    作者:valohai    | 项目源码 | 文件源码
def _process(request, get_response):
    """
    Run ``get_response`` for ``request`` while collecting timing and
    per-database query statistics, then inject and post those statistics.

    Posting happens on a separate thread when the ``CAVALRY_THREADED_POST``
    setting is truthy, otherwise inline. Returns the response produced by
    ``get_response``.
    """
    with force_debug_cursor():
        with managed(
            db_record_stacks=getattr(settings, 'CAVALRY_DB_RECORD_STACKS', True),
        ) as data:
            data['start_time'] = get_time()
            response = get_response(request)
            # Render template responses here so rendering happens before the
            # end time is taken.
            if isinstance(response, SimpleTemplateResponse):
                response.render()
            data['end_time'] = get_time()
            data['duration'] = data['end_time'] - data['start_time']
            data['databases'] = {}
            for conn in connections.all():
                executed = conn.queries
                query_count = len(executed)
                if executed:
                    total_time = sum(q.get('hrtime', 0) * 1000 for q in executed)
                else:
                    total_time = 0
                data['databases'][conn.alias] = {
                    'queries': executed,
                    'n_queries': query_count,
                    'time': total_time,
                }
    inject_stats(request, response, data)
    post_stats_kwargs = dict(request=request, response=response, data=data)

    if not getattr(settings, 'CAVALRY_THREADED_POST', False):
        post_stats(**post_stats_kwargs)
        return response
    Thread(name='cavalry poster', target=post_stats, kwargs=post_stats_kwargs, daemon=False).start()
    return response
项目:CodingDojo    作者:ComputerSocietyUNB    | 项目源码 | 文件源码
def make_view_atomic(self, view):
        """
        Wrap ``view`` in ``transaction.atomic`` once per eligible database.

        A database is eligible when its ``ATOMIC_REQUESTS`` setting is truthy
        and its alias is not in the view's ``_non_atomic_requests``.
        """
        exempt_aliases = getattr(view, '_non_atomic_requests', set())
        for db in connections.all():
            if not db.settings_dict['ATOMIC_REQUESTS']:
                continue
            if db.alias in exempt_aliases:
                continue
            view = transaction.atomic(using=db.alias)(view)
        return view
项目:CodingDojo    作者:ComputerSocietyUNB    | 项目源码 | 文件源码
def update_connections_time_zone(**kwargs):
    """
    React to a changed setting given as ``kwargs['setting']`` / ``kwargs['value']``.

    For ``TIME_ZONE`` the process time zone and the cached default time zone
    are reset. For ``TIME_ZONE`` and ``USE_TZ`` each connection's cached
    ``timezone`` / ``timezone_name`` are dropped and, when the backend
    provides time-zone SQL and the connection has a time zone name, that SQL
    is executed with the name as its parameter.
    """
    setting = kwargs['setting']
    if setting == 'TIME_ZONE':
        # Reset process time zone
        if hasattr(time, 'tzset'):
            new_zone = kwargs['value']
            if new_zone:
                os.environ['TZ'] = new_zone
            else:
                os.environ.pop('TZ', None)
            time.tzset()

        # Reset local time zone cache
        timezone.get_default_timezone.cache_clear()

    # Reset the database connections' time zone
    if setting not in {'TIME_ZONE', 'USE_TZ'}:
        return
    for conn in connections.all():
        for cached_attr in ('timezone', 'timezone_name'):
            try:
                delattr(conn, cached_attr)
            except AttributeError:
                # Nothing cached yet for this attribute.
                pass
        set_tz_sql = conn.ops.set_time_zone_sql()
        if not set_tz_sql or not conn.timezone_name:
            continue
        with conn.cursor() as cur:
            cur.execute(set_tz_sql, [conn.timezone_name])
项目:CodingDojo    作者:ComputerSocietyUNB    | 项目源码 | 文件源码
def _databases_names(cls, include_mirrors=True):
        """
        Return the database aliases the test case acts on.

        Without a truthy ``multi_db`` attribute on ``cls`` only the default
        alias is returned. Otherwise every alias is returned, leaving out
        mirrors when ``include_mirrors`` is false.
        """
        # If the test case has a multi_db=True flag, act on all databases,
        # including mirrors or not. Otherwise, just on the default DB.
        if not getattr(cls, 'multi_db', False):
            return [DEFAULT_DB_ALIAS]
        names = []
        for alias in connections:
            if include_mirrors or not connections[alias].settings_dict['TEST']['MIRROR']:
                names.append(alias)
        return names
项目:CodingDojo    作者:ComputerSocietyUNB    | 项目源码 | 文件源码
def _post_teardown(self):
        """Performs any post-test things. This includes:

        * Flushing the contents of the database, to leave a clean slate. If
          the class has an 'available_apps' attribute, post_migrate isn't fired.
        * Force-closing the connection, so the next test gets a clean cursor.

        Whether or not the steps above raise, the ``available_apps``
        restriction is lifted afterwards when ``self.available_apps`` is set.
        """
        try:
            self._fixture_teardown()
            super(TransactionTestCase, self)._post_teardown()
            if self._should_reload_connections():
                # Some DB cursors include SQL statements as part of cursor
                # creation. If you have a test that does a rollback, the effect
                # of these statements is lost, which can affect the operation of
                # tests (e.g., losing a timezone setting causing objects to be
                # created with the wrong time). To make sure this doesn't
                # happen, get a clean connection at the start of every test.
                for conn in connections.all():
                    conn.close()
        finally:
            # Runs even when the teardown above failed: undo the app
            # restriction and send setting_changed for INSTALLED_APPS with
            # enter=False.
            if self.available_apps is not None:
                apps.unset_available_apps()
                setting_changed.send(sender=settings._wrapped.__class__,
                                     setting='INSTALLED_APPS',
                                     value=settings.INSTALLED_APPS,
                                     enter=False)
项目:CodingDojo    作者:ComputerSocietyUNB    | 项目源码 | 文件源码
def tearDownClass(cls):
        """
        Roll back the class-level atomics and close every connection when all
        connections support transactions, then defer to the parent class.
        """
        transactional = connections_support_transactions()
        if transactional:
            cls._rollback_atomics(cls.cls_atomics)
            for db_conn in connections.all():
                db_conn.close()
        super(TestCase, cls).tearDownClass()
项目:CodingDojo    作者:ComputerSocietyUNB    | 项目源码 | 文件源码
def skipUnlessDBFeature(*features):
    """
    Skip a test unless a database has all the named features.
    """
    def missing_feature():
        # True as soon as one requested feature is absent or falsy.
        return any(not getattr(connection.features, name, False)
                   for name in features)

    return _deferredSkip(
        missing_feature,
        "Database doesn't support feature(s): %s" % ", ".join(features)
    )
项目:CodingDojo    作者:ComputerSocietyUNB    | 项目源码 | 文件源码
def _tearDownClassInternal(cls):
        """
        Stop the live server thread, if there is one, and turn thread sharing
        back off for in-memory sqlite connections.
        """
        # There may not be a 'server_thread' attribute if setUpClass() for some
        # reasons has raised an exception.
        if hasattr(cls, 'server_thread'):
            # Terminate the live server's thread
            live_thread = cls.server_thread
            live_thread.terminate()
            live_thread.join()

        # Restore sqlite in-memory database connections' non-shareability
        for conn in connections.all():
            if conn.vendor != 'sqlite':
                continue
            if conn.is_in_memory_db(conn.settings_dict['NAME']):
                conn.allow_thread_sharing = False
项目:NarshaTech    作者:KimJangHyeon    | 项目源码 | 文件源码
def check_database_backends(*args, **kwargs):
    """
    Collect the results of ``validation.check(**kwargs)`` from every
    connection into one list. Positional arguments are accepted but unused.
    """
    return [
        issue
        for conn in connections.all()
        for issue in conn.validation.check(**kwargs)
    ]
项目:NarshaTech    作者:KimJangHyeon    | 项目源码 | 文件源码
def make_view_atomic(self, view):
        """
        Return ``view`` wrapped in ``transaction.atomic`` for each database
        that has ``ATOMIC_REQUESTS`` enabled and is not listed in the view's
        ``_non_atomic_requests``.
        """
        opted_out = getattr(view, '_non_atomic_requests', set())
        wrapped = view
        for database in connections.all():
            wants_atomic = database.settings_dict['ATOMIC_REQUESTS']
            if wants_atomic and database.alias not in opted_out:
                wrapped = transaction.atomic(using=database.alias)(wrapped)
        return wrapped
项目:Scrum    作者:prakharchoudhary    | 项目源码 | 文件源码
def check_database_backends(*args, **kwargs):
    """
    Run ``validation.check(**kwargs)`` on every connection and return all
    reported issues as a single list. ``*args`` is accepted but ignored.
    """
    found = []
    for backend in connections.all():
        found += backend.validation.check(**kwargs)
    return found
项目:Scrum    作者:prakharchoudhary    | 项目源码 | 文件源码
def make_view_atomic(self, view):
        """
        Wrap ``view`` in ``transaction.atomic`` once per eligible database.

        A database is eligible when its ``ATOMIC_REQUESTS`` setting is truthy
        and its alias is not in the view's ``_non_atomic_requests``.
        """
        exempt_aliases = getattr(view, '_non_atomic_requests', set())
        for db in connections.all():
            if not db.settings_dict['ATOMIC_REQUESTS']:
                continue
            if db.alias in exempt_aliases:
                continue
            view = transaction.atomic(using=db.alias)(view)
        return view
项目:Scrum    作者:prakharchoudhary    | 项目源码 | 文件源码
def ready(self):
        """
        Register type handlers on already-open connections and on future ones,
        then register the Unaccent, SearchLookup and TrigramSimilar lookups on
        ``CharField`` and ``TextField``.
        """
        # Connections may already exist before we are called.
        for conn in connections.all():
            if conn.connection is None:
                continue
            register_type_handlers(conn)
        connection_created.connect(register_type_handlers)
        # Same registration order as before: each lookup goes onto CharField
        # first, then TextField.
        for lookup in (Unaccent, SearchLookup, TrigramSimilar):
            for field_class in (CharField, TextField):
                field_class.register_lookup(lookup)
项目:django    作者:alexsukhrin    | 项目源码 | 文件源码
def check_database_backends(*args, **kwargs):
    """
    Collect the results of ``validation.check(**kwargs)`` from every
    connection into one list. Positional arguments are accepted but unused.
    """
    return [
        issue
        for conn in connections.all()
        for issue in conn.validation.check(**kwargs)
    ]
项目:django    作者:alexsukhrin    | 项目源码 | 文件源码
def make_view_atomic(self, view):
        """
        Return ``view`` wrapped in ``transaction.atomic`` for each database
        that has ``ATOMIC_REQUESTS`` enabled and is not listed in the view's
        ``_non_atomic_requests``.
        """
        opted_out = getattr(view, '_non_atomic_requests', set())
        wrapped = view
        for database in connections.all():
            wants_atomic = database.settings_dict['ATOMIC_REQUESTS']
            if wants_atomic and database.alias not in opted_out:
                wrapped = transaction.atomic(using=database.alias)(wrapped)
        return wrapped
项目:django    作者:alexsukhrin    | 项目源码 | 文件源码
def ready(self):
        """
        Attach ``register_type_handlers`` to open and future connections and
        register three lookups on both ``CharField`` and ``TextField``.
        """
        # Connections may already exist before we are called.
        open_connections = [c for c in connections.all() if c.connection is not None]
        for open_conn in open_connections:
            register_type_handlers(open_conn)
        connection_created.connect(register_type_handlers)
        # (field, lookup) pairs in the original registration order.
        registrations = (
            (CharField, Unaccent),
            (TextField, Unaccent),
            (CharField, SearchLookup),
            (TextField, SearchLookup),
            (CharField, TrigramSimilar),
            (TextField, TrigramSimilar),
        )
        for field_class, lookup in registrations:
            field_class.register_lookup(lookup)
项目:paas-tools    作者:imperodesign    | 项目源码 | 文件源码
def delete(self, *args, **kwargs):
        """Delete this application including all containers"""
        try:
            # attempt to remove containers from the scheduler
            doomed = list(self.container_set.exclude(type='run'))
            self._destroy_containers(doomed)
        except RuntimeError:
            # Best effort: a RuntimeError here does not stop the deletion.
            pass
        self._clean_app_logs()
        parent = super(App, self)
        return parent.delete(*args, **kwargs)
项目:paas-tools    作者:imperodesign    | 项目源码 | 文件源码
def restart(self, **kwargs):
        """
        Restart containers of this object and return the restarted set.

        Starts from ``self.container_set.all()`` and narrows it by ``type``
        and ``num`` when those keyword arguments are given and truthy.
        """
        to_restart = self.container_set.all()
        for field in ('type', 'num'):
            wanted = kwargs.get(field)
            if wanted:
                to_restart = to_restart.filter(**{field: wanted})
        self._restart_containers(to_restart)
        return to_restart
项目:paas-tools    作者:imperodesign    | 项目源码 | 文件源码
def restart(self, **kwargs):
        """
        Restart the selected containers and return them.

        All containers are selected by default; truthy ``type`` and ``num``
        keyword arguments each add a filter on the field of the same name.
        """
        selection = self.container_set.all()
        container_type = kwargs.get('type')
        if container_type:
            selection = selection.filter(type=container_type)
        container_num = kwargs.get('num')
        if container_num:
            selection = selection.filter(num=container_num)
        self._restart_containers(selection)
        return selection
项目:dd-trace-py    作者:DataDog    | 项目源码 | 文件源码
def patch_db(tracer):
    """Call ``patch_conn(tracer, connection)`` for every database connection."""
    for db_connection in connections.all():
        patch_conn(tracer, db_connection)
项目:dd-trace-py    作者:DataDog    | 项目源码 | 文件源码
def unpatch_db():
    """Call ``unpatch_conn`` on every database connection."""
    for db_connection in connections.all():
        unpatch_conn(db_connection)
项目:tumanov_castleoaks    作者:Roamdev    | 项目源码 | 文件源码
def process_request(self, request):
        """
        Install a wrapping cursor factory on every database connection.

        For each connection not patched yet (no ``_devserver_cursor_old``
        attribute), start an empty ``_devserver_queries`` list, remember the
        original ``cursor`` callable and replace it with one that wraps new
        cursors in ``DevserverCursorWrapper``.
        """
        def make_cursor_factory(conn):
            # Bind ``conn`` per connection. A lambda created directly in the
            # loop looks up the loop variable only when called, so with more
            # than one database every patched connection would create its
            # cursors from the last connection in the loop.
            return lambda: DevserverCursorWrapper(conn._devserver_cursor_old(), conn)

        for connection in connections.all():
            if not hasattr(connection, '_devserver_cursor_old'):
                connection._devserver_queries = []
                connection._devserver_cursor_old = connection.cursor
                connection.cursor = make_cursor_factory(connection)
项目:Gypsy    作者:benticarlos    | 项目源码 | 文件源码
def check_database_backends(*args, **kwargs):
    """
    Run ``validation.check(**kwargs)`` on every connection and return all
    reported issues as a single list. ``*args`` is accepted but ignored.
    """
    found = []
    for backend in connections.all():
        found += backend.validation.check(**kwargs)
    return found
项目:Gypsy    作者:benticarlos    | 项目源码 | 文件源码
def make_view_atomic(self, view):
        """
        Wrap ``view`` in ``transaction.atomic`` once per eligible database.

        A database is eligible when its ``ATOMIC_REQUESTS`` setting is truthy
        and its alias is not in the view's ``_non_atomic_requests``.
        """
        exempt_aliases = getattr(view, '_non_atomic_requests', set())
        for db in connections.all():
            if not db.settings_dict['ATOMIC_REQUESTS']:
                continue
            if db.alias in exempt_aliases:
                continue
            view = transaction.atomic(using=db.alias)(view)
        return view
项目:DjangoBlog    作者:0daybug    | 项目源码 | 文件源码
def make_view_atomic(self, view):
        """
        Return ``view`` wrapped in ``transaction.atomic`` for each database
        that has ``ATOMIC_REQUESTS`` enabled and is not listed in the view's
        ``_non_atomic_requests``.
        """
        opted_out = getattr(view, '_non_atomic_requests', set())
        wrapped = view
        for database in connections.all():
            wants_atomic = database.settings_dict['ATOMIC_REQUESTS']
            if wants_atomic and database.alias not in opted_out:
                wrapped = transaction.atomic(using=database.alias)(wrapped)
        return wrapped
项目:wanblog    作者:wanzifa    | 项目源码 | 文件源码
def make_view_atomic(self, view):
        """
        Wrap ``view`` in ``transaction.atomic`` once per eligible database.

        A database is eligible when its ``ATOMIC_REQUESTS`` setting is truthy
        and its alias is not in the view's ``_non_atomic_requests``.
        """
        exempt_aliases = getattr(view, '_non_atomic_requests', set())
        for db in connections.all():
            if not db.settings_dict['ATOMIC_REQUESTS']:
                continue
            if db.alias in exempt_aliases:
                continue
            view = transaction.atomic(using=db.alias)(view)
        return view
项目:tabmaster    作者:NicolasMinghetti    | 项目源码 | 文件源码
def make_view_atomic(self, view):
        """
        Return ``view`` wrapped in ``transaction.atomic`` for each database
        that has ``ATOMIC_REQUESTS`` enabled and is not listed in the view's
        ``_non_atomic_requests``.
        """
        opted_out = getattr(view, '_non_atomic_requests', set())
        wrapped = view
        for database in connections.all():
            wants_atomic = database.settings_dict['ATOMIC_REQUESTS']
            if wants_atomic and database.alias not in opted_out:
                wrapped = transaction.atomic(using=database.alias)(wrapped)
        return wrapped
项目:trydjango18    作者:lucifer-yqh    | 项目源码 | 文件源码
def make_view_atomic(self, view):
        """
        Wrap ``view`` in ``transaction.atomic`` once per eligible database.

        A database is eligible when its ``ATOMIC_REQUESTS`` setting is truthy
        and its alias is not in the view's ``_non_atomic_requests``.
        """
        exempt_aliases = getattr(view, '_non_atomic_requests', set())
        for db in connections.all():
            if not db.settings_dict['ATOMIC_REQUESTS']:
                continue
            if db.alias in exempt_aliases:
                continue
            view = transaction.atomic(using=db.alias)(view)
        return view
项目:trydjango18    作者:wei0104    | 项目源码 | 文件源码
def make_view_atomic(self, view):
        """
        Return ``view`` wrapped in ``transaction.atomic`` for each database
        that has ``ATOMIC_REQUESTS`` enabled and is not listed in the view's
        ``_non_atomic_requests``.
        """
        opted_out = getattr(view, '_non_atomic_requests', set())
        wrapped = view
        for database in connections.all():
            wants_atomic = database.settings_dict['ATOMIC_REQUESTS']
            if wants_atomic and database.alias not in opted_out:
                wrapped = transaction.atomic(using=database.alias)(wrapped)
        return wrapped
项目:graphene-django    作者:graphql-python    | 项目源码 | 文件源码
def get_debug_promise(self):
        """
        Return the combined debug promise chained with
        ``self.on_resolve_all_promises``.

        The combined promise is built from ``self.promises`` on first use and
        stored on ``self.debug_promise`` for later calls.
        """
        if self.debug_promise:
            combined = self.debug_promise
        else:
            combined = self.debug_promise = Promise.all(self.promises)
        return combined.then(self.on_resolve_all_promises)
项目:graphene-django    作者:graphql-python    | 项目源码 | 文件源码
def enable_instrumentation(self):
        """Call ``wrap_cursor(connection, self)`` for every database connection."""
        # This is thread-safe because database connections are thread-local.
        for db_connection in connections.all():
            wrap_cursor(db_connection, self)
项目:graphene-django    作者:graphql-python    | 项目源码 | 文件源码
def disable_instrumentation(self):
        """Call ``unwrap_cursor`` on every database connection."""
        for db_connection in connections.all():
            unwrap_cursor(db_connection)
项目:ims    作者:ims-team    | 项目源码 | 文件源码
def check_database_backends(*args, **kwargs):
    """
    Collect the results of ``validation.check(**kwargs)`` from every
    connection into one list. Positional arguments are accepted but unused.
    """
    return [
        issue
        for conn in connections.all()
        for issue in conn.validation.check(**kwargs)
    ]
项目:ims    作者:ims-team    | 项目源码 | 文件源码
def make_view_atomic(self, view):
        """
        Wrap ``view`` in ``transaction.atomic`` once per eligible database.

        A database is eligible when its ``ATOMIC_REQUESTS`` setting is truthy
        and its alias is not in the view's ``_non_atomic_requests``.
        """
        exempt_aliases = getattr(view, '_non_atomic_requests', set())
        for db in connections.all():
            if not db.settings_dict['ATOMIC_REQUESTS']:
                continue
            if db.alias in exempt_aliases:
                continue
            view = transaction.atomic(using=db.alias)(view)
        return view
项目:lifesoundtrack    作者:MTG    | 项目源码 | 文件源码
def check_database_backends(*args, **kwargs):
    """
    Run ``validation.check(**kwargs)`` on every connection and return all
    reported issues as a single list. ``*args`` is accepted but ignored.
    """
    found = []
    for backend in connections.all():
        found += backend.validation.check(**kwargs)
    return found
项目:lifesoundtrack    作者:MTG    | 项目源码 | 文件源码
def make_view_atomic(self, view):
        """
        Return ``view`` wrapped in ``transaction.atomic`` for each database
        that has ``ATOMIC_REQUESTS`` enabled and is not listed in the view's
        ``_non_atomic_requests``.
        """
        opted_out = getattr(view, '_non_atomic_requests', set())
        wrapped = view
        for database in connections.all():
            wants_atomic = database.settings_dict['ATOMIC_REQUESTS']
            if wants_atomic and database.alias not in opted_out:
                wrapped = transaction.atomic(using=database.alias)(wrapped)
        return wrapped
项目:lifesoundtrack    作者:MTG    | 项目源码 | 文件源码
def ready(self):
        """
        Register the hstore handler on already-open connections and on future
        ones, then register the Unaccent, SearchLookup and TrigramSimilar
        lookups on ``CharField`` and ``TextField``.
        """
        # Connections may already exist before we are called.
        for conn in connections.all():
            if conn.connection is None:
                continue
            register_hstore_handler(conn)
        connection_created.connect(register_hstore_handler)
        # Same registration order as before: each lookup goes onto CharField
        # first, then TextField.
        for lookup in (Unaccent, SearchLookup, TrigramSimilar):
            for field_class in (CharField, TextField):
                field_class.register_lookup(lookup)
项目:lifesoundtrack    作者:MTG    | 项目源码 | 文件源码
def _post_teardown(self):
        """Performs any post-test things. This includes:

        * Flushing the contents of the database, to leave a clean slate. If
          the class has an 'available_apps' attribute, post_migrate isn't fired.
        * Force-closing the connection, so the next test gets a clean cursor.

        The ``finally`` clause lifts the ``available_apps`` restriction (when
        one is set) even if one of the steps above raises.
        """
        try:
            self._fixture_teardown()
            super(TransactionTestCase, self)._post_teardown()
            if self._should_reload_connections():
                # Some DB cursors include SQL statements as part of cursor
                # creation. If you have a test that does a rollback, the effect
                # of these statements is lost, which can affect the operation of
                # tests (e.g., losing a timezone setting causing objects to be
                # created with the wrong time). To make sure this doesn't
                # happen, get a clean connection at the start of every test.
                for conn in connections.all():
                    conn.close()
        finally:
            # Always executed: unset the restricted app list and send
            # setting_changed for INSTALLED_APPS with enter=False.
            if self.available_apps is not None:
                apps.unset_available_apps()
                setting_changed.send(sender=settings._wrapped.__class__,
                                     setting='INSTALLED_APPS',
                                     value=settings.INSTALLED_APPS,
                                     enter=False)
项目:lifesoundtrack    作者:MTG    | 项目源码 | 文件源码
def connections_support_transactions():
    """
    Returns True if all connections support transactions.
    """
    for conn in connections.all():
        # One connection without transaction support settles the answer.
        if not conn.features.supports_transactions:
            return False
    return True
项目:lifesoundtrack    作者:MTG    | 项目源码 | 文件源码
def tearDownClass(cls):
        """
        When every connection supports transactions, roll back the class-level
        atomics and close all connections; always call the parent
        ``tearDownClass`` afterwards.
        """
        if connections_support_transactions():
            class_atomics = cls.cls_atomics
            cls._rollback_atomics(class_atomics)
            for open_conn in connections.all():
                open_conn.close()
        parent = super(TestCase, cls)
        parent.tearDownClass()