The following 50 code examples, drawn from open-source Python projects, illustrate how to use django.db.connection.queries.
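connection.queries is a per-connection list of dicts, each holding an 'sql' and a 'time' key. Django only populates it when DEBUG = True (or when the connection's debug cursor is forced on), and it can be emptied with django.db.reset_queries(). A minimal sketch of inspecting the log, assuming DEBUG = True and a hypothetical Book model:

from django.db import connection, reset_queries
from myapp.models import Book  # hypothetical app and model

reset_queries()                   # clear the per-connection query log
Book.objects.count()              # run a query so the log has an entry
for query in connection.queries:  # each entry: {'sql': ..., 'time': ...}
    print(query['time'], query['sql'])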
def get_sample_results(cls):
    read = """QUERY - 'SELECT "auth_user"."id" FROM "auth_group"'""" \
           """- PARAMS = ()"""
    write = """QUERY - 'UPDATE "auth_group" SET "name" = %s'""" \
            """- PARAMS = ('bar',)"""
    other = """QUERY - 'BEGIN TRANSACTION' - PARAMS = ()"""

    def to_query(sql):
        return {'sql': sql, 'time': '%.3f' % random.random()}

    def to_single_result(*sqls):
        qc = cls()
        qc.queries = [to_query(sql) for sql in sqls]
        return qc.get_results_to_send()

    return [
        to_single_result(*sqls)
        for sqls in [
            [read],
            [read, write],
            [read, read],
            [write, write],
            [other, other],
            [read, write, other]
        ]
    ]
def test_debug_sql_logger(caplog, settings):
    from pootle_project.models import Project
    from django.db import connection

    settings.DEBUG = True
    queries = len(connection.queries)
    log_new_queries(queries)
    assert caplog.records == []

    # trigger some sql and log
    Project.objects.count()
    log_new_queries(queries)
    timing = caplog.records[0].message
    sql = caplog.records[1].message

    # match the timing, sql
    assert re.match(r"^\d+?\.\d+?$", timing)
    assert "SELECT COUNT" in sql
    assert "pootle_app_project" in sql
def log_test_timing(debug_logger, timings, name, start):
    from django.db import connection

    time_taken = time.time() - start
    timings["tests"][name] = dict(
        slow_queries=[
            q for q in connection.queries
            if float(q["time"]) > 0],
        query_count=len(connection.queries),
        timing=time_taken)
    debug_logger.debug(
        "{: <70} {: <10} {: <10}".format(
            *(name,
              round(time_taken, 4),
              len(connection.queries))))
def log_test_report(debug_logger, timings):
    debug_logger.debug(
        "%s\nTESTS END: %s",
        "=" * 80,
        datetime.now())
    total_time = time.time() - timings["start"]
    total_queries = sum(
        t["query_count"] for t in timings["tests"].values())
    if total_queries:
        avg_query_time = total_time / total_queries
        debug_logger.debug(
            "TESTS AVERAGE query time: %s", avg_query_time)
    debug_logger.debug(
        "TESTS TOTAL test time: %s", total_time)
    debug_logger.debug(
        "TESTS TOTAL queries: %s", total_queries)
    debug_logger.debug("%s\n" % ("=" * 80))
def assertQueries(self, *prefixes):
    "Assert the correct queries are efficiently executed for a block."
    debug = connection.use_debug_cursor
    connection.use_debug_cursor = True
    count = len(connection.queries)
    yield
    if type(prefixes[0]) == int:
        assert prefixes[0] == len(connection.queries[count:])
    else:
        for prefix, query in itertools.izip_longest(prefixes, connection.queries[count:]):
            assert prefix and query and query['sql'].startswith(prefix), (prefix, query)
    connection.use_debug_cursor = debug
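In its source class this generator is presumably wrapped with contextlib.contextmanager; under that assumption, a hypothetical usage inside a test would be:

# assumes assertQueries is decorated with @contextlib.contextmanager,
# and that a Book model exists -- both are assumptions for illustration
with self.assertQueries('SELECT', 'UPDATE'):
    book = Book.objects.get(pk=1)  # first query must start with SELECT
    book.save()                    # second query must start with UPDATE

with self.assertQueries(1):        # or assert an exact query count
    Book.objects.count()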
def test_no_locks_query_count(self):
    """Check that query count to pull in available jobs hasn't changed

    If this test fails, consider changing the EXPECTED_QUERIES, or why
    it regressed.
    """
    EXPECTED_QUERIES = 0  # no jobs locking this object

    host_ct_key = ContentType.objects.get_for_model(
        self.host.downcast()).natural_key()
    host_id = self.host.id

    # Loads up the caches
    js = JobScheduler()

    reset_queries()
    js.available_transitions([(host_ct_key, host_id, ), ])

    query_sum = len(connection.queries)
    self.assertEqual(query_sum, EXPECTED_QUERIES,
                     "something changed with queries! "
                     "got %s expected %s" % (query_sum, EXPECTED_QUERIES))
def query_statistic(func):
    @wraps(func)
    def func_wrapper(*args, **kwargs):
        query_count = len(connection.queries)
        time = 0.0
        result = func(*args, **kwargs)
        queries_number = len(connection.queries) - query_count
        # slice from the saved offset; slicing with [-queries_number:]
        # would wrongly return the whole log when queries_number is 0
        performed_query_list = connection.queries[query_count:]
        for query in performed_query_list:
            if query['sql'] == 'BEGIN':
                queries_number -= 1  # ignore begin transaction
            else:
                time += float(query['time'])
        message = "[Statistics] : {total} queries performed in {time}s."
        print(message.format(total=queries_number, time=time))
        return result
    return func_wrapper
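A hypothetical way to apply this decorator (the function and model are placeholders, and DEBUG = True is assumed so the query log is populated):

@query_statistic
def list_books():  # hypothetical helper
    return list(Book.objects.all())

books = list_books()
# prints something like: [Statistics] : 1 queries performed in 0.002s.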
def display_queries(request, stats, queries):
    """
    Generate a HttpResponse of SQL queries for a profiling run.

    _stats_ should contain a pstats.Stats of a hotshot session.
    _queries_ should contain a list of SQL queries.
    """
    sort = request.REQUEST.get('sort_by', 'time')
    sort_buttons = RadioButtons('sort_by', sort,
                                (('order', 'by order'),
                                 ('time', 'time'),
                                 ('queries', 'query count')))

    output = render_queries(queries, sort)
    output.reset()
    output = [html.escape(unicode(line)) for line in output.readlines()]

    response = HttpResponse(mimetype='text/html; charset=utf-8')
    response.content = (queries_template %
                        {'sort_buttons': sort_buttons,
                         'num_queries': len(queries),
                         'queries': "".join(output),
                         'rawqueries': b64encode(cPickle.dumps(queries)),
                         'rawstats': b64encode(pickle_stats(stats)),
                         'url': request.path})
    return response
def display_queries(request, stats, queries):
    """
    Generate a HttpResponse of SQL queries for a profiling run.

    _stats_ should contain a pstats.Stats of a hotshot session.
    _queries_ should contain a list of SQL queries.
    """
    sort = request.REQUEST.get('sort_by', 'time')
    sort_buttons = RadioButtons('sort_by', sort, (
        ('order', 'by order'),
        ('time', 'time'),
        ('queries', 'query count')
    ))

    output = render_queries(queries, sort)
    output.reset()
    output = [html.escape(unicode(line)) for line in output.readlines()]

    response = HttpResponse(mimetype='text/html; charset=utf-8')
    response.content = (queries_template % {
        'sort_buttons': sort_buttons,
        'num_queries': len(queries),
        'queries': "".join(output),
        'rawqueries': b64encode(cPickle.dumps(queries)),
        'rawstats': b64encode(pickle_stats(stats)),
        'url': request.path
    })
    return response
def csv_masks(request, hashfile_id):
    hashfile = get_object_or_404(Hashfile, id=hashfile_id)

    # didn't find the correct way in pure django...
    res = Cracked.objects.raw("SELECT id, password_mask, COUNT(*) AS count FROM Hashcat_cracked USE INDEX (hashfileid_id_index) WHERE hashfile_id=%s GROUP BY password_mask ORDER BY count DESC", [hashfile.id])

    fp = tempfile.SpooledTemporaryFile(mode='w')
    csvfile = csv.writer(fp, quotechar='"', quoting=csv.QUOTE_ALL)
    for item in res:
        csvfile.writerow([item.count, item.password_mask])

    fp.seek(0)  # rewind the file handle
    csvfile_data = fp.read()

    for query in connection.queries[-1:]:
        print(query["sql"])
        print(query["time"])

    # mimetype is replaced by content_type for django 1.7
    response = HttpResponse(csvfile_data, content_type='application/force-download')
    response['Content-Disposition'] = 'attachment; filename=%s_masks.csv' % hashfile.name

    return response
def log_sql_queries_to_console(path):
    """
    Logs SQL queries to terminal if in debug mode. We need to import
    connection at runtime as this is used in the wsgi handlers for the API
    endpoints and django settings are not available at import time there.
    """
    from django.db import connection
    if settings.DEBUG and len(connection.queries) > 0:
        total_time = 0
        output = "\033[1;31m[Request Started: %s]\033[0m\n" % (path)
        for query in connection.queries:
            total_time += float(query.get('time'))
            output = output + "\033[1;31m[%s]\033[0m \033[1m%s\033[0m\n" % (
                query.get('time'), " ".join(query['sql'].split()))
        output = output + "\033[1;31m[Request Finished: %s queries in %s seconds] \033[0m" % (
            len(connection.queries), total_time)
        print output.encode('utf-8')
def __call__(self, request):
    # get number of db queries before the view runs
    n = len(connection.queries)

    # time the view
    response = self.get_response(request)

    if settings.DEBUG == False and not n:
        return response

    # compute the db time for the queries just run
    db_queries = len(connection.queries) - n
    if db_queries:
        db_time = reduce(add, [float(q['time'])
                               for q in connection.queries[n:]])
    else:
        db_time = 0.0

    if 'text/html' in response.get('Content-Type', ''):
        response.content = response.content.replace(
            '<p id="response-time-db">',
            '<p class="bg-info">Database found took %s' % str(db_time))
    return response
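New-style middleware like this (a class with __call__ and get_response) is enabled by adding its dotted path to MIDDLEWARE in settings; the path below is a placeholder:

# settings.py -- 'myapp.middleware.DbTimeMiddleware' is a hypothetical path
MIDDLEWARE = [
    'django.middleware.common.CommonMiddleware',
    # ...
    'myapp.middleware.DbTimeMiddleware',
]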
def test_traverse_GFK(self):
    """
    Test that we can traverse a 'content_object' with prefetch_related()
    and get to related objects on the other side (assuming it is suitably
    filtered)
    """
    TaggedItem.objects.create(tag="awesome", content_object=self.book1)
    TaggedItem.objects.create(tag="awesome", content_object=self.book2)
    TaggedItem.objects.create(tag="awesome", content_object=self.book3)
    TaggedItem.objects.create(tag="awesome", content_object=self.reader1)
    TaggedItem.objects.create(tag="awesome", content_object=self.reader2)

    ct = ContentType.objects.get_for_model(Book)

    # We get 3 queries - 1 for main query, 1 for content_objects since they
    # all use the same table, and 1 for the 'read_by' relation.
    with self.assertNumQueries(3):
        # If we limit to books, we know that they will have 'read_by'
        # attributes, so the following makes sense:
        qs = TaggedItem.objects.filter(
            content_type=ct, tag='awesome').prefetch_related('content_object__read_by')
        readers_of_awesome_books = {r.name for tag in qs
                                    for r in tag.content_object.read_by.all()}
    self.assertEqual(readers_of_awesome_books, {"me", "you", "someone"})
def setup_sending_before_clearing_queries_log_signal():
    class SignalSendingBeforeClearingQueriesProxy(DelegatingProxy):
        def clear(self):
            before_clearing_queries_log.send(
                sender=None, queries=tuple(self))
            self.wrapped.clear()

    connection.queries_log = SignalSendingBeforeClearingQueriesProxy(
        connection.queries_log)
def __init__(self, queries, name):
    self.queries = queries
    super(QueryCountResult, self).__init__(
        name=name, value=self.number_of_queries)
@property  # accessed without parentheses in __init__ above
def number_of_queries(self):
    return len(self.queries)
def __enter__(self):
    self.queries = []
    self.nr_of_queries_when_entering = len(connection.queries)
    self.orig_force_debug_cursor = connection.force_debug_cursor
    connection.force_debug_cursor = True
    before_clearing_queries_log.connect(
        self.queries_about_to_be_reset_handler)
    return self
def queries_about_to_be_reset_handler(self, signal, sender, queries, **kwargs):
    self.store_queries()
    self.nr_of_queries_when_entering = 0
def store_queries(self):
    self.queries += connection.queries[self.nr_of_queries_when_entering:]
def lsql():
    print(connection.queries[-1])
def debug(request):
    """
    Returns context variables helpful for debugging.
    """
    context_extras = {}
    if settings.DEBUG and request.META.get('REMOTE_ADDR') in settings.INTERNAL_IPS:
        context_extras['debug'] = True
        from django.db import connection
        # Return a lazy reference that computes connection.queries on
        # access, to ensure it contains queries triggered after this
        # function runs.
        context_extras['sql_queries'] = lazy(lambda: connection.queries, list)
    return context_extras
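A context processor like this one is wired up under the template engine's context_processors option in settings; the dotted path below is a placeholder:

# settings.py -- 'myapp.context_processors.debug' is a hypothetical path
TEMPLATES = [{
    'BACKEND': 'django.template.backends.django.DjangoTemplates',
    'APP_DIRS': True,
    'OPTIONS': {
        'context_processors': [
            'django.template.context_processors.request',
            'myapp.context_processors.debug',
        ],
    },
}]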
def test_fetch(data):
    settings.DEBUG = True
    base_queries = len(connection.queries)

    book = Book.objects.first()
    assert len(connection.queries) - base_queries == 1

    author = book.fetch_author()
    assert len(connection.queries) - base_queries == 2
    assert author.id == book.author_id
def test_queryset_methods2(data):
    settings.DEBUG = True
    base_queries = len(connection.queries)

    book_qs = Book.objects.all()
    assert len(connection.queries) - base_queries == 0

    book_list = book_qs.to_list()
    assert len(connection.queries) - base_queries == 1

    book_qs.to_list()
    assert len(connection.queries) - base_queries == 2
def process_response(self, request, response):
    if settings.DEBUG:
        for query in connection.queries:
            print(
                "\033[1;31m[%s]\033[0m \033[1m%s\033[0m" % (
                    query['time'], " ".join(query['sql'].split())))
    return response
def log_new_queries(queries, debug_logger=None):
    from django.db import connection

    debug_logger = debug_logger or logger
    new_queries = list(connection.queries[queries:])
    for query in new_queries:
        debug_logger.debug(query["time"])
        debug_logger.debug("\t%s", query["sql"])
def debug_sql(debug_logger=None):
    from django.conf import settings
    from django.db import connection

    debug = settings.DEBUG
    settings.DEBUG = True
    queries = len(connection.queries)
    try:
        yield
    finally:
        log_new_queries(
            queries, debug_logger)
        settings.DEBUG = debug
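Assuming debug_sql is decorated with @contextlib.contextmanager in its module (the yield suggests it), a hypothetical usage would be:

# every query executed inside the block is logged on exit
with debug_sql():
    Project.objects.count()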
def post(self, request):
    data = SignInSerializer().load_data(request.POST)
    user = get_user_model().objects.filter(email=data['email']).first()
    if user is None or not user.check_password(data['password']):
        raise AuthenticationError()
    token = AuthToken.objects.create(user_id=user.id)
    print(connection.queries)
    return ApiResponse({'token': token.value})
def _measure_scaling(self, create_n, measured_resource, scaled_resource=None):
    """
    :param create_n: Function to create N of scaled_resource
    :param measured_resource: The resource we will measure the query load for
    :param scaled_resource: The object which is actually being scaled with N
    :return: Instance of Order1, OrderN, OrderBad
    """
    if scaled_resource is None:
        scaled_resource = measured_resource

    query_counts = {}
    samples = [5, 6, 7, 8]
    for n in samples:
        ObjectCache.clear()
        create_n(n)

        # Queries get reset at the start of a request
        self.assertEqual(scaled_resource._meta.queryset.count(), n)
        response = self.api_client.get(
            "/api/%s/" % measured_resource._meta.resource_name,
            data={'limit': 0})
        self.assertEqual(
            response.status_code, 200,
            "%s:%s" % (response.content, measured_resource._meta.resource_name))
        query_count = len(connection.queries)
        self.assertEqual(
            len(self.deserialize(response)['objects']),
            measured_resource._meta.queryset.count())

        query_counts[n] = query_count

    # Ignore samples[0], it was just to clear out any setup overhead
    # from the first call to the API.
    # gradient between samples[1] and samples[2]
    grad1 = (query_counts[samples[2]] - query_counts[samples[1]]) / (samples[2] - samples[1])
    # gradient between samples[2] and samples[3]
    grad2 = (query_counts[samples[3]] - query_counts[samples[2]]) / (samples[3] - samples[2])

    if grad1 == 0 and grad2 == 0:
        # Hoorah, O(1)
        return Order1(query_counts[samples[3]])
    elif grad1 > 0 and grad1 == grad2:
        # O(N)
        return OrderN(grad1)
    else:
        # Worse than O(N)
        return OrderBad()
def test_cached_hosts(self):
    instance = HostListMixin()
    instance.host_ids = json.dumps([self.hosts[1].id])
    self.assertListEqual(instance.hosts, [self.hosts[1]])
    db_hits = len(connection.queries)
    self.assertListEqual(instance.hosts, [self.hosts[1]])
    self.assertEqual(db_hits, len(connection.queries))
def test_changing_hosts(self):
    instance = HostListMixin()
    instance.host_ids = json.dumps([self.hosts[1].id])
    self.assertListEqual(instance.hosts, [self.hosts[1]])
    db_hits = len(connection.queries)
    instance.host_ids = json.dumps([self.hosts[0].id])
    self.assertListEqual(instance.hosts, [self.hosts[0]])
    self.assertNotEqual(db_hits, len(connection.queries))
def assertQueries(*prefixes):
    "Assert the correct queries are efficiently executed for a block."
    count = len(connection.queries)
    yield
    for prefix, query in itertools.izip_longest(prefixes, connection.queries[count:]):
        assert prefix and query and query['sql'].startswith(prefix), (prefix, query)
        cursor = connection.cursor()
        cursor.execute('EXPLAIN ' + query['sql'])
        plan = ''.join(row for row, in cursor)
        assert prefix == 'INSERT' or 'Index Scan' in plan, (plan, query)
def test_locks_query_count(self):
    """Check that query count to pull in available jobs hasn't changed"""
    EXPECTED_QUERIES = 6  # but 3 are for setup

    host_ct_key = ContentType.objects.get_for_model(
        self.host.downcast()).natural_key()
    host_id = self.host.id

    # create 200 host up and down jobs in the 'pending' default state;
    # the key point is they are not in the 'complete' state.
    for job_num in xrange(200):
        if job_num % 2 == 0:
            RebootHostJob.objects.create(host=self.host)
        else:
            ShutdownHostJob.objects.create(host=self.host)

    # Loads up the caches, including the _lock_cache, which should find
    # these jobs.
    js = JobScheduler()

    reset_queries()

    # Getting jobs here may incur a higher cost.
    js.available_jobs([(host_ct_key, host_id), ])

    query_sum = len(connection.queries)
    self.assertGreaterEqual(query_sum, EXPECTED_QUERIES,
                            "something changed with queries! "
                            "got %s expected %s" % (query_sum, EXPECTED_QUERIES))
def test_locks_query_count(self):
    """Check that query count to pull in available jobs hasn't changed"""
    EXPECTED_QUERIES = 0  # object to be locked by jobs

    host_ct_key = ContentType.objects.get_for_model(
        self.host.downcast()).natural_key()
    host_id = self.host.id

    # create 200 host up and down jobs in the 'pending' default state;
    # the key point is they are not in the 'complete' state.
    for job_num in xrange(200):
        if job_num % 2 == 0:
            RebootHostJob.objects.create(host=self.host)
        else:
            ShutdownHostJob.objects.create(host=self.host)

    # Loads up the caches, including the _lock_cache, which should find
    # these jobs.
    js = JobScheduler()

    reset_queries()

    # Getting jobs here may incur a higher cost.
    js.available_jobs([(host_ct_key, host_id), ])

    query_sum = len(connection.queries)
    self.assertEqual(query_sum, EXPECTED_QUERIES,
                     "something changed with queries! "
                     "got %s expected %s" % (query_sum, EXPECTED_QUERIES))
def show_category(request, category_slug, template_name="catalog/category.html"):
    """ view for each individual category page """
    category_cache_key = request.path
    c = cache.get(category_cache_key)
    if not c:
        c = get_object_or_404(Category.active, slug=category_slug)
        cache.set(category_cache_key, c, CACHE_TIMEOUT)
    products = c.product_set.filter(is_active=True)
    page_title = c.name
    meta_keywords = c.meta_keywords
    meta_description = c.meta_description

    from django.db import connection
    queries = connection.queries

    return render_to_response(template_name, locals(),
                              context_instance=RequestContext(request))
def process_response(self, request, response):
    from sys import stdout
    if stdout.isatty():
        for query in connection.queries:
            print "\033[1;31m[%s]\033[0m \033[1m%s\033[0m" % (
                query['time'], " ".join(query['sql'].split()))
    return response
def render_queries(queries, sort):
    """
    Returns a StringIO containing the formatted SQL queries.

    _sort_ is a field to sort by.
    """
    output = StringIO()

    if sort == 'order':
        print >>output, "     time  query"
        for query in queries:
            print >>output, " %8s  %s" % (query["time"], query["sql"])
        return output

    def sorter(x, y):
        if sort == 'time':
            return cmp(x[1][1], y[1][1])
        elif sort == 'queries':
            return cmp(x[1][0], y[1][0])
        else:
            raise RuntimeError("Unknown sort: %s" % sort)

    print >>output, "  queries      time  query"

    results = {}
    for query in queries:
        try:
            result = results[query["sql"]]
            result[0] += 1
            result[1] += Decimal(query["time"])
        except KeyError:
            results[query["sql"]] = [1, Decimal(query["time"])]

    results = sorted(results.iteritems(), cmp=sorter, reverse=True)

    for result in results:
        print >>output, " %8d  %8.3f  %s" % (result[1][0], result[1][1], result[0])

    return output
def display_stats(request, stats, queries):
    """
    Generate a HttpResponse of functions for a profiling run.

    _stats_ should contain a pstats.Stats of a hotshot session.
    _queries_ should contain a list of SQL queries.
    """
    sort = [request.REQUEST.get('sort_first', 'time'),
            request.REQUEST.get('sort_second', 'calls')]
    format = request.REQUEST.get('format', 'print_stats')

    sort_first_buttons = RadioButtons('sort_first', sort[0], sort_categories)
    sort_second_buttons = RadioButtons('sort_second', sort[1], sort_categories)
    format_buttons = RadioButtons('format', format,
                                  (('print_stats', 'by function'),
                                   ('print_callers', 'by callers'),
                                   ('print_callees', 'by callees')))

    output = render_stats(stats, sort, format)
    output.reset()
    output = [html.escape(unicode(line)) for line in output.readlines()]

    response = HttpResponse(mimetype='text/html; charset=utf-8')
    response.content = (stats_template %
                        {'format_buttons': format_buttons,
                         'sort_first_buttons': sort_first_buttons,
                         'sort_second_buttons': sort_second_buttons,
                         'rawqueries': b64encode(cPickle.dumps(queries)),
                         'rawstats': b64encode(pickle_stats(stats)),
                         'stats': "".join(output),
                         'url': request.path})
    return response
def process_request(self, request):
    """
    Setup the profiler for a profiling run and clear the SQL query log.

    If this is a resort of an existing profiling run, just return the
    resorted list.
    """
    def unpickle(params):
        stats = unpickle_stats(b64decode(params.get('stats', '')))
        queries = cPickle.loads(b64decode(params.get('queries', '')))
        return stats, queries

    if request.method != 'GET' and \
       not (request.META.get('HTTP_CONTENT_TYPE',
                             request.META.get('CONTENT_TYPE', '')) in
            ['multipart/form-data', 'application/x-www-form-urlencoded']):
        return

    if (request.REQUEST.get('profile', False) and
            (settings.DEBUG == True or request.user.is_staff)):
        request.statsfile = tempfile.NamedTemporaryFile()
        params = request.REQUEST
        if (params.get('show_stats', False)
                and params.get('show_queries', '1') == '1'):
            # Instantly re-sort the existing stats data
            stats, queries = unpickle(params)
            return display_stats(request, stats, queries)
        elif (params.get('show_queries', False)
                and params.get('show_stats', '1') == '1'):
            stats, queries = unpickle(params)
            return display_queries(request, stats, queries)
        else:
            # We don't have previous data, so initialize the profiler
            request.profiler = hotshot.Profile(request.statsfile.name)
            reset_queries()
def process_response(self, request, response):
    """Finish profiling and render the results."""
    profiler = getattr(request, 'profiler', None)
    if profiler:
        profiler.close()
        params = request.REQUEST
        stats = hotshot.stats.load(request.statsfile.name)
        queries = connection.queries
        if (params.get('show_queries', False)
                and params.get('show_stats', '1') == '1'):
            response = display_queries(request, stats, queries)
        else:
            response = display_stats(request, stats, queries)
    return response
def process_response(self, request, response):
    if settings.DEBUG:
        self.db_qcount = len(connection.queries)
        self.db_time += sum([float(q['time']) for q in connection.queries])
        if 'text/html' in response.get('Content-Type', ''):
            soup = BeautifulSoup(response.content)
            if soup.body:
                tag = soup.new_tag('code', style='position: fixed; top: 0; left: 0px')
                tag.string = 'DB took: %s, DB queries count: %s' % (
                    str(self.db_time), str(self.db_qcount))
                soup.body.insert(0, tag)
                response.content = soup.prettify()
    return response
def render_queries(queries, sort):
    """
    Returns a StringIO containing the formatted SQL queries.

    _sort_ is a field to sort by.
    """
    output = StringIO()

    if sort == 'order':
        print >>output, "     time  query"
        for query in queries:
            print >>output, " %8s  %s" % (query["time"], query["sql"])
        return output

    if sort == 'time':
        def sorter(x, y):
            return cmp(x[1][1], y[1][1])
    elif sort == 'queries':
        def sorter(x, y):
            return cmp(x[1][0], y[1][0])
    else:
        raise RuntimeError("Unknown sort: %s" % sort)

    print >>output, "  queries      time  query"

    results = {}
    for query in queries:
        try:
            result = results[query["sql"]]
            result[0] += 1
            result[1] += Decimal(query["time"])
        except KeyError:
            results[query["sql"]] = [1, Decimal(query["time"])]

    results = sorted(results.iteritems(), cmp=sorter, reverse=True)

    for result in results:
        print >>output, " %8d  %8.3f  %s" % (
            result[1][0], result[1][1], result[0]
        )

    return output
def display_stats(request, stats, queries):
    """
    Generate a HttpResponse of functions for a profiling run.

    _stats_ should contain a pstats.Stats of a hotshot session.
    _queries_ should contain a list of SQL queries.
    """
    sort = [
        request.REQUEST.get('sort_first', 'time'),
        request.REQUEST.get('sort_second', 'calls')
    ]
    fmt = request.REQUEST.get('format', 'print_stats')

    sort_first_buttons = RadioButtons('sort_first', sort[0], sort_categories)
    sort_second_buttons = RadioButtons('sort_second', sort[1], sort_categories)
    format_buttons = RadioButtons('format', fmt, (
        ('print_stats', 'by function'),
        ('print_callers', 'by callers'),
        ('print_callees', 'by callees')
    ))

    output = render_stats(stats, sort, fmt)
    output.reset()
    output = [html.escape(unicode(line)) for line in output.readlines()]

    response = HttpResponse(content_type='text/html; charset=utf-8')
    response.content = (stats_template % {
        'format_buttons': format_buttons,
        'sort_first_buttons': sort_first_buttons,
        'sort_second_buttons': sort_second_buttons,
        'rawqueries': b64encode(cPickle.dumps(queries)),
        'rawstats': b64encode(pickle_stats(stats)),
        'stats': "".join(output),
        'url': request.path
    })
    return response
def process_request(self, request):
    """
    Setup the profiler for a profiling run and clear the SQL query log.

    If this is a resort of an existing profiling run, just return the
    resorted list.
    """
    def unpickle(params):
        stats = unpickle_stats(b64decode(params.get('stats', '')))
        queries = cPickle.loads(b64decode(params.get('queries', '')))
        return stats, queries

    if request.method != 'GET' and \
            not (request.META.get(
                'HTTP_CONTENT_TYPE',
                request.META.get('CONTENT_TYPE', '')
            ) in ['multipart/form-data', 'application/x-www-form-urlencoded']):
        return

    if (request.REQUEST.get('profile', False)
            and (settings.DEBUG == True or request.user.is_staff)):
        request.statsfile = tempfile.NamedTemporaryFile()
        params = request.REQUEST
        if (params.get('show_stats', False)
                and params.get('show_queries', '1') == '1'):
            # Instantly re-sort the existing stats data
            stats, queries = unpickle(params)
            return display_stats(request, stats, queries)
        elif (params.get('show_queries', False)
                and params.get('show_stats', '1') == '1'):
            stats, queries = unpickle(params)
            return display_queries(request, stats, queries)
        else:
            # We don't have previous data, so initialize the profiler
            request.profiler = hotshot.Profile(request.statsfile.name)
            reset_queries()
def api_hashfile_top_password(request, hashfile_id, N):
    if request.method == "POST":
        params = request.POST
    else:
        params = request.GET

    hashfile = get_object_or_404(Hashfile, id=hashfile_id)

    pass_count_list = Cracked.objects.raw("SELECT id, password, COUNT(*) AS count FROM Hashcat_cracked USE INDEX (hashfileid_id_index) WHERE hashfile_id=%s GROUP BY BINARY password ORDER BY count DESC LIMIT 10", [hashfile.id])

    top_password_list = []
    count_list = []
    for item in pass_count_list:
        top_password_list.append(item.password)
        count_list.append(item.count)

    res = {
        "top_password_list": top_password_list,
        "count_list": count_list,
    }

    for query in connection.queries[-1:]:
        print(query["sql"])
        print(query["time"])

    return HttpResponse(json.dumps(res), content_type="application/json")
def process_response(self, request, response):
    if connection.queries:
        sys.stdout.write("SQL %s\n" % ('=' * 26))
        for query in connection.queries:
            sys.stdout.write("[%s] %s\n" % (query['time'], query['sql']))
        sys.stdout.write("%s\n" % ('=' * 30))
    return response
def executemany(self, sql, param_list):
    """
    Outputs a batch of SQL queries to an appstats trace
    """
    self.start_appstats_recording()
    try:
        return super(CursorDebugWrapper, self).executemany(sql, param_list)
    finally:
        try:
            times = len(param_list)
        except TypeError:  # param_list could be an iterator
            times = '?'
        sql = '{0} times: {1}'.format(times, sql)
        self.end_appstats_recording(sql)
def debug(_request):
    """Returns context variables helpful for debugging.

    Same as django.core.context_processors.debug, just without the check
    against INTERNAL_IPS."""
    context_extras = {}
    if settings.DEBUG:
        context_extras['debug'] = True
        from django.db import connection
        context_extras['sql_queries'] = connection.queries
    return context_extras
def log_last_django_query(logger):
    """Debug logs the latest SQL query made by Django.

    Will only work if DEBUG=True in the Django settings.

    :param logger: The logging.Logger object to use for logging.
    """
    from nav.models import manage as _manage
    from django.db import connection

    if connection.queries:
        logger.debug("Last Django SQL query was: %s",
                     connection.queries[-1]['sql'])
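A hypothetical call site, wiring the helper to a module-level logger (the model is a placeholder):

import logging

logger = logging.getLogger(__name__)

SomeModel.objects.count()      # hypothetical query to populate the log
log_last_django_query(logger)  # emits the last SQL at DEBUG level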