We have extracted the following 50 code examples from open-source Python projects to illustrate how django.db.connections.all() is used.
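Before the extracted examples, here is a minimal orientation sketch (not taken from any of the projects below; the function name is made up): connections.all() yields one DatabaseWrapper per database alias that has been opened in the current thread, and each wrapper exposes attributes such as alias and vendor.

from django.db import connections

def close_all_open_connections():
    # connections.all() yields one DatabaseWrapper per opened database alias.
    for conn in connections.all():
        print(conn.alias, conn.vendor)   # e.g. 'default', 'postgresql'
        conn.close()                     # safe to call; the next query reopens it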
def keyword_split(keywords):
    """
    Return all the keywords in a keyword string.

    Keeps keywords surrounded by quotes together, removing the surrounding quotes:

    >>> keyword_split('Hello I\\'m looking for "something special"')
    ['Hello', "I'm", 'looking', 'for', 'something special']

    Nested quoted strings are returned as is:

    >>> keyword_split("He said \\"I'm looking for 'something special'\\" so I've given him the 'special item'")
    ['He', 'said', "I'm looking for 'something special'", 'so', "I've", 'given', 'him', 'the', 'special item']
    """
    matches = re.findall(r'"([^"]+)"|\'([^\']+)\'|(\S+)', keywords)
    return [match[0] or match[1] or match[2] for match in matches]
def close_db_connections(func, *args, **kwargs):
    """
    Decorator to explicitly close db connections during threaded execution

    Note this is necessary to work around:
    https://code.djangoproject.com/ticket/22420
    """
    def _close_db_connections(*args, **kwargs):
        ret = None
        try:
            ret = func(*args, **kwargs)
        finally:
            from django.db import connections
            for conn in connections.all():
                conn.close()
        return ret
    return _close_db_connections
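A usage sketch for the decorator above (the worker function name here is hypothetical): any callable that runs on a separate thread can be wrapped so its database connections are closed when it returns.

from threading import Thread

@close_db_connections
def build_release(release_id):  # hypothetical worker run off the request thread
    ...

Thread(target=build_release, args=(42,)).start()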
def _etcd_publish_config(**kwargs):
    config = kwargs['instance']
    # we purge all existing config when adding the newest instance. This is because
    # deis config:unset would remove an existing value, but not delete the
    # old config object
    try:
        _etcd_client.delete('/deis/config/{}'.format(config.app),
                            prevExist=True, dir=True, recursive=True)
    except KeyError:
        pass
    for k, v in config.values.iteritems():
        _etcd_client.write(
            '/deis/config/{}/{}'.format(
                config.app,
                unicode(k).encode('utf-8').lower()),
            unicode(v).encode('utf-8'))
def update_connections_time_zone(**kwargs):
    if kwargs['setting'] == 'TIME_ZONE':
        # Reset process time zone
        if hasattr(time, 'tzset'):
            if kwargs['value']:
                os.environ['TZ'] = kwargs['value']
            else:
                os.environ.pop('TZ', None)
            time.tzset()

        # Reset local time zone cache
        timezone.get_default_timezone.cache_clear()

    # Reset the database connections' time zone
    if kwargs['setting'] in {'TIME_ZONE', 'USE_TZ'}:
        for conn in connections.all():
            try:
                del conn.timezone
            except AttributeError:
                pass
            try:
                del conn.timezone_name
            except AttributeError:
                pass
            conn.ensure_timezone()
def get_postgresql_connections():
    return [connection for connection in connections.all()
            if connection.vendor == 'postgresql']


# Reduce any iterable to a single value using a logical OR e.g. (a | b | ...)
def get_descendant_models(model):
    """
    Returns all descendants of a model, including the model itself.
    """
    descendant_models = {other_model for other_model in apps.get_models()
                         if issubclass(other_model, model)}
    descendant_models.add(model)
    return descendant_models
def force_debug_cursor():
    for conn in connections.all():
        conn._cavalry_old_force_debug_cursor = conn.force_debug_cursor
        conn.force_debug_cursor = True
    yield
    for conn in connections.all():
        conn.force_debug_cursor = conn._cavalry_old_force_debug_cursor
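Because the function yields exactly once, it is used as a context manager (the _process middleware in the next example calls it in a with statement); presumably it is decorated with contextlib.contextmanager in its project. A hedged usage sketch, with that decoration applied explicitly:

from contextlib import contextmanager

force_debug_cursor = contextmanager(force_debug_cursor)  # assumed decoration

with force_debug_cursor():
    # Queries executed here are recorded on each connection's queries log,
    # even when settings.DEBUG is False.
    pass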
def _process(request, get_response):
    with force_debug_cursor(), managed(
        db_record_stacks=getattr(settings, 'CAVALRY_DB_RECORD_STACKS', True),
    ) as data:
        data['start_time'] = get_time()
        response = get_response(request)
        if isinstance(response, SimpleTemplateResponse):
            response.render()
        data['end_time'] = get_time()
        data['duration'] = data['end_time'] - data['start_time']
        data['databases'] = {}
        for conn in connections.all():
            queries = conn.queries
            data['databases'][conn.alias] = {
                'queries': queries,
                'n_queries': len(queries),
                'time': (sum(q.get('hrtime', 0) * 1000 for q in queries) if queries else 0),
            }
    inject_stats(request, response, data)
    post_stats_kwargs = {'request': request, 'response': response, 'data': data}
    if getattr(settings, 'CAVALRY_THREADED_POST', False):
        Thread(
            name='cavalry poster',
            target=post_stats,
            kwargs=post_stats_kwargs,
            daemon=False,
        ).start()
    else:
        post_stats(**post_stats_kwargs)
    return response
def make_view_atomic(self, view):
    non_atomic_requests = getattr(view, '_non_atomic_requests', set())
    for db in connections.all():
        if (db.settings_dict['ATOMIC_REQUESTS']
                and db.alias not in non_atomic_requests):
            view = transaction.atomic(using=db.alias)(view)
    return view
def update_connections_time_zone(**kwargs):
    if kwargs['setting'] == 'TIME_ZONE':
        # Reset process time zone
        if hasattr(time, 'tzset'):
            if kwargs['value']:
                os.environ['TZ'] = kwargs['value']
            else:
                os.environ.pop('TZ', None)
            time.tzset()

        # Reset local time zone cache
        timezone.get_default_timezone.cache_clear()

    # Reset the database connections' time zone
    if kwargs['setting'] in {'TIME_ZONE', 'USE_TZ'}:
        for conn in connections.all():
            try:
                del conn.timezone
            except AttributeError:
                pass
            try:
                del conn.timezone_name
            except AttributeError:
                pass
            tz_sql = conn.ops.set_time_zone_sql()
            if tz_sql and conn.timezone_name:
                with conn.cursor() as cursor:
                    cursor.execute(tz_sql, [conn.timezone_name])
def _databases_names(cls, include_mirrors=True):
    # If the test case has a multi_db=True flag, act on all databases,
    # including mirrors or not. Otherwise, just on the default DB.
    if getattr(cls, 'multi_db', False):
        return [
            alias for alias in connections
            if include_mirrors or not connections[alias].settings_dict['TEST']['MIRROR']
        ]
    else:
        return [DEFAULT_DB_ALIAS]
def _post_teardown(self):
    """Performs any post-test things. This includes:

    * Flushing the contents of the database, to leave a clean slate. If
      the class has an 'available_apps' attribute, post_migrate isn't fired.
    * Force-closing the connection, so the next test gets a clean cursor.
    """
    try:
        self._fixture_teardown()
        super(TransactionTestCase, self)._post_teardown()
        if self._should_reload_connections():
            # Some DB cursors include SQL statements as part of cursor
            # creation. If you have a test that does a rollback, the effect
            # of these statements is lost, which can affect the operation of
            # tests (e.g., losing a timezone setting causing objects to be
            # created with the wrong time). To make sure this doesn't
            # happen, get a clean connection at the start of every test.
            for conn in connections.all():
                conn.close()
    finally:
        if self.available_apps is not None:
            apps.unset_available_apps()
            setting_changed.send(sender=settings._wrapped.__class__,
                                 setting='INSTALLED_APPS',
                                 value=settings.INSTALLED_APPS,
                                 enter=False)
def tearDownClass(cls):
    if connections_support_transactions():
        cls._rollback_atomics(cls.cls_atomics)
        for conn in connections.all():
            conn.close()
    super(TestCase, cls).tearDownClass()
def skipUnlessDBFeature(*features):
    """
    Skip a test unless a database has all the named features.
    """
    return _deferredSkip(
        lambda: not all(getattr(connection.features, feature, False) for feature in features),
        "Database doesn't support feature(s): %s" % ", ".join(features)
    )
def _tearDownClassInternal(cls):
    # There may not be a 'server_thread' attribute if setUpClass() for some
    # reasons has raised an exception.
    if hasattr(cls, 'server_thread'):
        # Terminate the live server's thread
        cls.server_thread.terminate()
        cls.server_thread.join()

    # Restore sqlite in-memory database connections' non-shareability
    for conn in connections.all():
        if conn.vendor == 'sqlite' and conn.is_in_memory_db(conn.settings_dict['NAME']):
            conn.allow_thread_sharing = False
def check_database_backends(*args, **kwargs):
    issues = []
    for conn in connections.all():
        issues.extend(conn.validation.check(**kwargs))
    return issues
def make_view_atomic(self, view):
    non_atomic_requests = getattr(view, '_non_atomic_requests', set())
    for db in connections.all():
        if db.settings_dict['ATOMIC_REQUESTS'] and db.alias not in non_atomic_requests:
            view = transaction.atomic(using=db.alias)(view)
    return view
def ready(self):
    # Connections may already exist before we are called.
    for conn in connections.all():
        if conn.connection is not None:
            register_type_handlers(conn)
    connection_created.connect(register_type_handlers)
    CharField.register_lookup(Unaccent)
    TextField.register_lookup(Unaccent)
    CharField.register_lookup(SearchLookup)
    TextField.register_lookup(SearchLookup)
    CharField.register_lookup(TrigramSimilar)
    TextField.register_lookup(TrigramSimilar)
def delete(self, *args, **kwargs):
    """Delete this application including all containers"""
    try:
        # attempt to remove containers from the scheduler
        self._destroy_containers(
            [c for c in self.container_set.exclude(type='run')])
    except RuntimeError:
        pass
    self._clean_app_logs()
    return super(App, self).delete(*args, **kwargs)
def restart(self, **kwargs):
    to_restart = self.container_set.all()
    if kwargs.get('type'):
        to_restart = to_restart.filter(type=kwargs.get('type'))
    if kwargs.get('num'):
        to_restart = to_restart.filter(num=kwargs.get('num'))
    self._restart_containers(to_restart)
    return to_restart
def patch_db(tracer):
    for c in connections.all():
        patch_conn(tracer, c)
def unpatch_db():
    for c in connections.all():
        unpatch_conn(c)
def process_request(self, request):
    for connection in connections.all():
        if not hasattr(connection, '_devserver_cursor_old'):
            connection._devserver_queries = []
            connection._devserver_cursor_old = connection.cursor
            # Bind the current connection as a default argument so the
            # wrapper does not close over the loop variable (which would
            # point at the last connection once the loop finishes).
            connection.cursor = lambda conn=connection: DevserverCursorWrapper(
                conn._devserver_cursor_old(), conn)
def get_debug_promise(self):
    if not self.debug_promise:
        self.debug_promise = Promise.all(self.promises)
    return self.debug_promise.then(self.on_resolve_all_promises)
def enable_instrumentation(self):
    # This is thread-safe because database connections are thread-local.
    for connection in connections.all():
        wrap_cursor(connection, self)
def disable_instrumentation(self):
    for connection in connections.all():
        unwrap_cursor(connection)
def ready(self):
    # Connections may already exist before we are called.
    for conn in connections.all():
        if conn.connection is not None:
            register_hstore_handler(conn)
    connection_created.connect(register_hstore_handler)
    CharField.register_lookup(Unaccent)
    TextField.register_lookup(Unaccent)
    CharField.register_lookup(SearchLookup)
    TextField.register_lookup(SearchLookup)
    CharField.register_lookup(TrigramSimilar)
    TextField.register_lookup(TrigramSimilar)
def connections_support_transactions():
    """
    Returns True if all connections support transactions.
    """
    return all(conn.features.supports_transactions
               for conn in connections.all())