Python django.db.connection 模块,queries() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用django.db.connection.queries(注意:queries 是属性而不是方法,使用时不需要加括号)

项目:django-performance-testing    作者:PaesslerAG    | 项目源码 | 文件源码
def get_sample_results(cls):
        """Build six sample result payloads from canned SQL log entries.

        For each combination of read/write/other statements a fresh ``cls()``
        is created, its ``queries`` attribute is set to a list of dicts with
        the keys ``sql`` and ``time`` (a random value formatted with three
        decimals), and ``get_results_to_send()`` is collected.

        :return: list with one ``get_results_to_send()`` value per combination
        """
        read_sql = (
            """QUERY - 'SELECT "auth_user"."id" FROM "auth_group"'"""
            """- PARAMS = ()"""
        )
        write_sql = (
            """QUERY - 'UPDATE "auth_group" SET "name" = %s'"""
            """- PARAMS = ('bar',)"""
        )
        other_sql = """QUERY - 'BEGIN TRANSACTION' - PARAMS = ()"""

        combinations = (
            (read_sql,),
            (read_sql, write_sql),
            (read_sql, read_sql),
            (write_sql, write_sql),
            (other_sql, other_sql),
            (read_sql, write_sql, other_sql),
        )

        samples = []
        for statements in combinations:
            collector = cls()
            collector.queries = [
                {'sql': statement, 'time': '%.3f' % random.random()}
                for statement in statements
            ]
            samples.append(collector.get_results_to_send())
        return samples
项目:zing    作者:evernote    | 项目源码 | 文件源码
def test_debug_sql_logger(caplog, settings):
    """Check what ``log_new_queries`` writes to the log.

    Nothing must be logged when no SQL ran since the recorded offset; after a
    query has run, the first record holds the timing and the second the SQL.
    """
    from pootle_project.models import Project

    from django.db import connection

    settings.DEBUG = True

    # Offset into the query log; only entries after it count as "new".
    queries = len(connection.queries)

    log_new_queries(queries)
    assert caplog.records == []

    # trigger some sql and log
    Project.objects.count()
    log_new_queries(queries)

    timing = caplog.records[0].message
    sql = caplog.records[1].message

    # match the timing, sql
    # Raw string: "\d" and "\." are invalid escape sequences in a plain
    # string literal.
    assert re.match(r"^\d+?\.\d+?$", timing)
    assert "SELECT COUNT" in sql
    assert "pootle_app_project" in sql
项目:zing    作者:evernote    | 项目源码 | 文件源码
def log_test_timing(debug_logger, timings, name, start):
    """Store timing/query statistics for one test and log a summary row.

    :param debug_logger: object whose ``debug`` method receives the row
    :param timings: dict; the entry ``timings["tests"][name]`` is written
    :param name: key under which the statistics are stored
    :param start: start timestamp, compared against ``time.time()``
    """
    from django.db import connection

    elapsed = time.time() - start

    # Queries whose recorded time parses to a value greater than zero.
    slow = []
    for entry in connection.queries:
        if float(entry["time"]) > 0:
            slow.append(entry)

    timings["tests"][name] = {
        "slow_queries": slow,
        "query_count": len(connection.queries),
        "timing": elapsed,
    }

    row = "{: <70} {: <10} {: <10}".format(
        name, round(elapsed, 4), len(connection.queries))
    debug_logger.debug(row)
项目:zing    作者:evernote    | 项目源码 | 文件源码
def log_test_report(debug_logger, timings):
    """Log the closing summary of a test run.

    Emits the end timestamp, the average time per query (only when at least
    one query was counted), the total elapsed time since ``timings["start"]``
    and the total query count summed over ``timings["tests"]``.

    :param debug_logger: object whose ``debug`` method receives the lines
    :param timings: dict with a ``start`` timestamp and a ``tests`` mapping
        whose values carry a ``query_count``
    """
    rule = "=" * 80
    debug_logger.debug("%s\nTESTS END: %s", rule, datetime.now())

    elapsed = time.time() - timings["start"]
    query_total = 0
    for stats in timings["tests"].values():
        query_total += stats["query_count"]

    if query_total:
        debug_logger.debug(
            "TESTS AVERAGE query time: %s", elapsed / query_total)
    debug_logger.debug("TESTS TOTAL test time: %s", elapsed)
    debug_logger.debug("TESTS TOTAL queries: %s", query_total)
    debug_logger.debug("%s\n" % (rule,))
项目:intel-manager-for-lustre    作者:intel-hpdd    | 项目源码 | 文件源码
def assertQueries(self, *prefixes):
        """Assert the correct queries are efficiently executed for a block.

        Generator with a single ``yield``: the queries logged between the
        yield and its resumption are the ones that get checked.

        :param prefixes: either one int (the expected number of queries) or
            the expected SQL prefixes, one per executed query, in order.
            With no prefixes at all, no query may have been executed.
        :raises AssertionError: when count or prefixes do not match
        """
        debug = connection.use_debug_cursor
        connection.use_debug_cursor = True
        try:
            count = len(connection.queries)
            yield

            executed = connection.queries[count:]
            # Guard against an empty ``prefixes`` before looking at [0].
            if prefixes and isinstance(prefixes[0], int):
                assert prefixes[0] == len(executed)
            else:
                # izip_longest pads the shorter side with None, so a missing
                # or surplus query fails the assertion as well.
                for prefix, query in itertools.izip_longest(prefixes, executed):
                    assert prefix and query and query['sql'].startswith(prefix), (prefix, query)
        finally:
            # Restore the previous cursor mode even when the wrapped block or
            # one of the assertions above raises.
            connection.use_debug_cursor = debug
项目:intel-manager-for-lustre    作者:intel-hpdd    | 项目源码 | 文件源码
def test_no_locks_query_count(self):
        """Check that query count to pull in available jobs hasn't changed

        If this test fails, consider changing the expected count, or why
        it regressed.
        """
        expected = 0

        # No jobs are locking this object.
        content_type = ContentType.objects.get_for_model(self.host.downcast())
        host_ct_key = content_type.natural_key()
        host_id = self.host.id

        # Loads up the caches
        scheduler = JobScheduler()

        reset_queries()
        scheduler.available_transitions([(host_ct_key, host_id)])

        observed = len(connection.queries)
        failure_message = (
            "something changed with queries! "
            "got %s expected %s" % (observed, expected))
        self.assertEqual(observed, expected, failure_message)
项目:djangolab    作者:DhiaTN    | 项目源码 | 文件源码
def query_statistic(func):
    """Decorator printing how many SQL queries ``func`` ran and their time.

    Entries whose SQL is exactly ``'BEGIN'`` are not counted and do not
    contribute to the reported time. The wrapped function's return value is
    passed through unchanged.
    """
    @wraps(func)
    def func_wrapper(*args, **kwargs):
        queries_before = len(connection.queries)
        result = func(*args, **kwargs)
        # Slice from the starting offset. The former negative slice
        # ``[-n:]`` degenerates to ``[-0:]`` (the whole log) when ``func``
        # ran no query, which counted unrelated earlier queries.
        performed = connection.queries[queries_before:]
        # ignore begin transaction
        counted = [query for query in performed if query['sql'] != 'BEGIN']
        elapsed = sum((float(query['time']) for query in counted), 0.0)
        message = "[Statistics] : {total} queries performed in {time}s."
        print(message.format(total=len(counted), time=elapsed))
        return result
    return func_wrapper
项目:grical    作者:wikical    | 项目源码 | 文件源码
def display_queries(request, stats, queries):
    """
    Generate a HttpResponse of SQL queries for a profiling run.

    _stats_ should contain a pstats.Stats of a hotshot session.
    _queries_ should contain a list of SQL queries.

    The sort order is read from the ``sort_by`` request parameter and
    defaults to ``'time'``.
    """
    sort = request.REQUEST.get('sort_by', 'time')
    sort_buttons = RadioButtons('sort_by', sort,
                                (('order', 'by order'),
                                 ('time', 'time'),
                                 ('queries', 'query count')))
    output = render_queries(queries, sort)
    # Rewind the buffer so readlines() starts at the beginning.
    output.reset()
    output = [html.escape(unicode(line))
              for line in output.readlines()]
    # ``content_type`` instead of ``mimetype``: the latter keyword was
    # removed from HttpResponse in Django 1.7.
    response = HttpResponse(content_type='text/html; charset=utf-8')
    # The raw queries and stats are embedded (pickled + base64) so the page
    # can be re-sorted without profiling again.
    response.content = (queries_template %
                        {'sort_buttons': sort_buttons,
                         'num_queries': len(queries),
                         'queries': "".join(output),
                         'rawqueries' : b64encode(cPickle.dumps(queries)),
                         'rawstats': b64encode(pickle_stats(stats)),
                         'url': request.path})
    return response
项目:django-icekit    作者:ic-labs    | 项目源码 | 文件源码
def display_queries(request, stats, queries):
    """
    Generate a HttpResponse of SQL queries for a profiling run.

    _stats_ should contain a pstats.Stats of a hotshot session.
    _queries_ should contain a list of SQL queries.

    The sort order is read from the ``sort_by`` request parameter and
    defaults to ``'time'``.
    """
    sort = request.REQUEST.get('sort_by', 'time')
    sort_buttons = RadioButtons('sort_by', sort, (
        ('order', 'by order'), ('time', 'time'), ('queries', 'query count')
    ))
    output = render_queries(queries, sort)
    # Rewind the buffer so readlines() starts at the beginning.
    output.reset()
    output = [html.escape(unicode(line)) for line in output.readlines()]
    # ``content_type`` instead of ``mimetype``: the latter keyword was
    # removed from HttpResponse in Django 1.7, and display_stats() of this
    # project already passes ``content_type``.
    response = HttpResponse(content_type='text/html; charset=utf-8')
    # The raw queries and stats are embedded (pickled + base64) so the page
    # can be re-sorted without profiling again.
    response.content = (queries_template % {
        'sort_buttons': sort_buttons,
        'num_queries': len(queries),
        'queries': "".join(output),
        'rawqueries' : b64encode(cPickle.dumps(queries)),
        'rawstats': b64encode(pickle_stats(stats)),
        'url': request.path
    })
    return response
项目:WebHashcat    作者:hegusung    | 项目源码 | 文件源码
def csv_masks(request, hashfile_id):
    """Return the password-mask counts of one hashfile as a CSV download.

    :param request: the incoming request (not inspected here)
    :param hashfile_id: id of the ``Hashfile``; a missing row yields a 404
    :return: HttpResponse carrying the CSV as an attachment
    """
    hashfile = get_object_or_404(Hashfile, id=hashfile_id)

    # No pure-ORM way was found for this query, so raw SQL is used; the
    # hashfile id is passed as a bound parameter, not interpolated.
    res = Cracked.objects.raw("SELECT id, password_mask, COUNT(*) AS count FROM Hashcat_cracked USE INDEX (hashfileid_id_index) WHERE hashfile_id=%s GROUP BY password_mask ORDER BY count DESC", [hashfile.id])

    # The context manager closes the temporary file once its contents have
    # been read; previously the handle was never closed.
    with tempfile.SpooledTemporaryFile(mode='w') as fp:
        csvfile = csv.writer(fp, quotechar='"', quoting=csv.QUOTE_ALL)
        for item in res:
            csvfile.writerow([item.count, item.password_mask])
        fp.seek(0)   # rewind the file handle

        csvfile_data = fp.read()

    # Debug output: SQL and timing of the most recent logged query, if any.
    for query in connection.queries[-1:]:
        print(query["sql"])
        print(query["time"])

    response = HttpResponse(csvfile_data, content_type='application/force-download') # mimetype is replaced by content_type for django 1.7
    response['Content-Disposition'] = 'attachment; filename=%s_masks.csv' % hashfile.name
    return response
项目:montage    作者:storyful    | 项目源码 | 文件源码
def log_sql_queries_to_console(path):
    """
        Print the logged SQL queries to the terminal when in debug mode.

        Nothing is printed unless ``settings.DEBUG`` is true and at least
        one query has been logged. ``path`` is only used in the
        "Request Started" header line.

        We need to import connection at runtime as this is
        used in the wsgi handlers for the API endpoints and
        django settings are not available at import time there.
    """
    from django.db import connection
    if settings.DEBUG and len(connection.queries) > 0:
        total_time = 0
        # The \033[...m sequences are ANSI escape codes wrapped around the text.
        output = "\033[1;31m[Request Started: %s]\033[0m\n" % (path)
        for query in connection.queries:
            total_time += float(query.get('time'))
            # One line per query: its time, then the SQL with all runs of
            # whitespace collapsed to single spaces.
            output = output + "\033[1;31m[%s]\033[0m \033[1m%s\033[0m\n" % (
                query.get('time'), " ".join(query['sql'].split()))
        # NOTE: the backslash continuation keeps the next line's leading
        # spaces inside the string literal.
        output = output + "\033[1;31m[Request Finished: %s queries in %s seconds] \
        \033[0m" % (len(connection.queries), total_time)
        print output.encode('utf-8')
项目:studentsdb2    作者:trivvet    | 项目源码 | 文件源码
def __call__(self, request):
        """Run the wrapped handler and report the DB time in HTML responses.

        The time of all queries logged while ``self.get_response`` ran is
        summed and substituted for the ``<p id="response-time-db">`` marker
        when the response's Content-Type contains ``text/html``.

        :param request: the incoming request
        :return: the response produced by ``self.get_response``
        """
        # get number of db queries logged before the view runs
        n = len(connection.queries)

        # time the view
        response = self.get_response(request)

        if settings.DEBUG == False and not n:
            return response

        # compute the db time for the queries just run. sum() with a float
        # start value returns 0.0 for an empty slice, whereas the former
        # reduce(add, ...) raised TypeError on one (possible when the log
        # became shorter than ``n`` while the view ran).
        db_time = sum(
            (float(q['time']) for q in connection.queries[n:]), 0.0)

        if 'text/html' in response.get('Content-Type', ''):
            response.content = response.content.replace('<p id="response-time-db">', '<p class="bg-info">Database found took %s' % str(db_time))

        return response
项目:django_postgres_extensions    作者:primal100    | 项目源码 | 文件源码
def test_traverse_GFK(self):
        """
        Test that we can traverse a 'content_object' with prefetch_related() and
        get to related objects on the other side (assuming it is suitably
        filtered)
        """
        # Tag three books and two readers with the same tag.
        tagged_objects = (
            self.book1, self.book2, self.book3, self.reader1, self.reader2,
        )
        for tagged in tagged_objects:
            TaggedItem.objects.create(tag="awesome", content_object=tagged)

        book_type = ContentType.objects.get_for_model(Book)

        # We get 3 queries - 1 for main query, 1 for content_objects since they
        # all use the same table, and 1 for the 'read_by' relation.
        with self.assertNumQueries(3):
            # If we limit to books, we know that they will have 'read_by'
            # attributes, so the following makes sense:
            awesome_book_tags = TaggedItem.objects.filter(
                content_type=book_type, tag='awesome',
            ).prefetch_related('content_object__read_by')
            reader_names = set()
            for item in awesome_book_tags:
                for reader in item.content_object.read_by.all():
                    reader_names.add(reader.name)
            self.assertEqual(reader_names, {"me", "you", "someone"})
项目:django-performance-testing    作者:PaesslerAG    | 项目源码 | 文件源码
def setup_sending_before_clearing_queries_log_signal():
    """Wrap ``connection.queries_log`` in a proxy that announces clears.

    After this call, ``connection.queries_log.clear()`` first sends
    ``before_clearing_queries_log`` with a tuple snapshot of the log and
    only then clears the wrapped object.
    """
    class SignalSendingBeforeClearingQueriesProxy(DelegatingProxy):
        def clear(self):
            snapshot = tuple(self)
            before_clearing_queries_log.send(sender=None, queries=snapshot)
            self.wrapped.clear()

    original_log = connection.queries_log
    connection.queries_log = SignalSendingBeforeClearingQueriesProxy(
        original_log)
项目:django-performance-testing    作者:PaesslerAG    | 项目源码 | 文件源码
def __init__(self, queries, name):
        """Keep ``queries`` and initialise the base class with their count.

        ``self.number_of_queries`` is handed to the base initialiser as
        ``value`` together with ``name``.
        """
        self.queries = queries
        base = super(QueryCountResult, self)
        base.__init__(name=name, value=self.number_of_queries)
项目:django-performance-testing    作者:PaesslerAG    | 项目源码 | 文件源码
def number_of_queries(self):
        """Return how many entries ``self.queries`` holds."""
        recorded = self.queries
        return len(recorded)
项目:django-performance-testing    作者:PaesslerAG    | 项目源码 | 文件源码
def __enter__(self):
        """Start collecting: remember the log offset and force debug cursors.

        The previous ``force_debug_cursor`` value is kept on the instance and
        the reset handler is connected to ``before_clearing_queries_log``.

        :return: ``self``
        """
        self.queries = []
        self.nr_of_queries_when_entering = len(connection.queries)

        previous_flag = connection.force_debug_cursor
        self.orig_force_debug_cursor = previous_flag
        connection.force_debug_cursor = True

        handler = self.queries_about_to_be_reset_handler
        before_clearing_queries_log.connect(handler)
        return self
项目:django-performance-testing    作者:PaesslerAG    | 项目源码 | 文件源码
def queries_about_to_be_reset_handler(self,
                                          signal, sender, queries, **kwargs):
        """Signal receiver: save the queries collected so far, then restart
        the offset at zero.

        The ``signal``, ``sender`` and ``queries`` arguments are accepted but
        not used here.
        """
        # Store first, while the current offset is still valid.
        self.store_queries()
        restart_offset = 0
        self.nr_of_queries_when_entering = restart_offset
def store_queries(self):
        """Append the log entries recorded since the stored offset to
        ``self.queries``."""
        offset = self.nr_of_queries_when_entering
        fresh_entries = connection.queries[offset:]
        self.queries += fresh_entries
项目:WPS-4th    作者:Fastcampus-WPS    | 项目源码 | 文件源码
def lsql():
    """Print the most recent entry of ``connection.queries``."""
    latest = connection.queries[-1]
    print(latest)
项目:CodingDojo    作者:ComputerSocietyUNB    | 项目源码 | 文件源码
def debug(request):
    """
    Returns context variables helpful for debugging.

    The result is empty unless ``settings.DEBUG`` is on and the request's
    ``REMOTE_ADDR`` is listed in ``settings.INTERNAL_IPS``.
    """
    context_extras = {}
    remote_addr_is_internal = (
        settings.DEBUG
        and request.META.get('REMOTE_ADDR') in settings.INTERNAL_IPS
    )
    if not remote_addr_is_internal:
        return context_extras

    from django.db import connection

    def current_queries():
        return connection.queries

    context_extras['debug'] = True
    # Return a lazy reference that computes connection.queries on access,
    # to ensure it contains queries triggered after this function runs.
    context_extras['sql_queries'] = lazy(current_queries, list)
    return context_extras
项目:django-strict    作者:kyle-eshares    | 项目源码 | 文件源码
def test_fetch(data):
    """Loading a book takes one query and ``fetch_author`` one more."""
    settings.DEBUG = True
    baseline = len(connection.queries)

    book = Book.objects.first()
    queries_after_book = len(connection.queries) - baseline
    assert queries_after_book == 1

    author = book.fetch_author()
    queries_after_author = len(connection.queries) - baseline
    assert queries_after_author == 2
    assert author.id == book.author_id
项目:django-strict    作者:kyle-eshares    | 项目源码 | 文件源码
def test_queryset_methods2(data):
    """Building the queryset runs no query; each ``to_list()`` call runs one."""
    settings.DEBUG = True
    baseline = len(connection.queries)

    books = Book.objects.all()
    issued = len(connection.queries) - baseline
    assert issued == 0

    book_list = books.to_list()
    issued = len(connection.queries) - baseline
    assert issued == 1

    books.to_list()
    issued = len(connection.queries) - baseline
    assert issued == 2
项目:blog    作者:tomming233    | 项目源码 | 文件源码
def process_response(self, request, response):
        """Print every logged SQL query when ``settings.DEBUG`` is on.

        Each line shows the query time followed by the SQL with runs of
        whitespace collapsed. The response is returned unchanged.
        """
        if not settings.DEBUG:
            return response
        line_format = "\033[1;31m[%s]\033[0m \033[1m%s\033[0m"
        for entry in connection.queries:
            print(line_format % (entry['time'], " ".join(entry['sql'].split())))
        return response
项目:zing    作者:evernote    | 项目源码 | 文件源码
def log_new_queries(queries, debug_logger=None):
    """Log time and SQL of every query recorded from index ``queries`` on.

    :param queries: offset into ``connection.queries``; earlier entries are
        skipped
    :param debug_logger: logger to use; falls back to the module ``logger``
    """
    from django.db import connection

    if not debug_logger:
        debug_logger = logger
    for entry in list(connection.queries[queries:]):
        debug_logger.debug(entry["time"])
        debug_logger.debug("\t%s", entry["sql"])
项目:zing    作者:evernote    | 项目源码 | 文件源码
def debug_sql(debug_logger=None):
    """Generator that turns ``settings.DEBUG`` on around a single ``yield``.

    When the generator is resumed or closed, the queries recorded since entry
    are passed to ``log_new_queries`` and the previous ``DEBUG`` value is put
    back.

    :param debug_logger: forwarded to ``log_new_queries``
    """
    from django.conf import settings
    from django.db import connection

    previous_debug = settings.DEBUG
    settings.DEBUG = True
    offset = len(connection.queries)
    try:
        yield
    finally:
        log_new_queries(offset, debug_logger)
        settings.DEBUG = previous_debug
项目:ks-python-api    作者:Kriegspiel    | 项目源码 | 文件源码
def post(self, request):
        """Sign a user in and answer with a newly created auth token.

        :raises AuthenticationError: when no user has the given email or the
            password check fails
        """
        data = SignInSerializer().load_data(request.POST)
        user = get_user_model().objects.filter(email=data['email']).first()

        credentials_ok = (
            user is not None and user.check_password(data['password']))
        if not credentials_ok:
            raise AuthenticationError()

        token = AuthToken.objects.create(user_id=user.id)
        # NOTE(review): debug output of the whole query log in a sign-in
        # handler -- confirm this is intended outside of development.
        print(connection.queries)
        return ApiResponse({'token': token.value})
项目:intel-manager-for-lustre    作者:intel-hpdd    | 项目源码 | 文件源码
def _measure_scaling(self, create_n, measured_resource, scaled_resource = None):
        """
        Measure how the query count of a list request grows with N objects.

        :param create_n: Function to create N of scaled_resource
        :param measured_resource: The resource we will measure the query load for
        :param scaled_resource: The object which is actually being scaled with N
        :return: Instance of Order1, OrderN, OrderBad
        """
        if scaled_resource is None:
            scaled_resource = measured_resource

        sizes = [5, 6, 7, 8]
        counts_by_size = {}
        for size in sizes:
            ObjectCache.clear()
            create_n(size)
            # Queries get reset at the start of a request
            self.assertEqual(scaled_resource._meta.queryset.count(), size)
            list_url = "/api/%s/" % measured_resource._meta.resource_name
            response = self.api_client.get(list_url, data = {'limit': 0})
            self.assertEqual(response.status_code, 200, "%s:%s" % (response.content, measured_resource._meta.resource_name))
            # Read the log length before the count() query below runs.
            logged = len(connection.queries)

            returned_objects = self.deserialize(response)['objects']
            self.assertEqual(len(returned_objects), measured_resource._meta.queryset.count())
            counts_by_size[size] = logged

        # Ignore sizes[0], it was just to clear out any setup overhead from first call to API
        second, third, fourth = sizes[1], sizes[2], sizes[3]

        # gradient between the second and third sample
        grad1 = (counts_by_size[third] - counts_by_size[second]) / (third - second)
        # gradient between the third and fourth sample
        grad2 = (counts_by_size[fourth] - counts_by_size[third]) / (fourth - third)

        if grad1 == 0 and grad2 == 0:
            # Hoorah, O(1)
            return Order1(counts_by_size[fourth])
        if grad1 > 0 and grad1 == grad2:
            # O(N)
            return OrderN(grad1)
        # Worse than O(N)
        return OrderBad()
项目:intel-manager-for-lustre    作者:intel-hpdd    | 项目源码 | 文件源码
def test_cached_hosts(self):
        """Reading ``hosts`` a second time must not add to the query log."""
        expected_hosts = [self.hosts[1]]
        mixin = HostListMixin()
        mixin.host_ids = json.dumps([self.hosts[1].id])
        self.assertListEqual(mixin.hosts, expected_hosts)

        logged_before = len(connection.queries)
        self.assertListEqual(mixin.hosts, expected_hosts)
        logged_after = len(connection.queries)
        self.assertEqual(logged_before, logged_after)
项目:intel-manager-for-lustre    作者:intel-hpdd    | 项目源码 | 文件源码
def test_changing_hosts(self):
        """Reading ``hosts`` after changing ``host_ids`` must add to the
        query log."""
        mixin = HostListMixin()
        mixin.host_ids = json.dumps([self.hosts[1].id])
        self.assertListEqual(mixin.hosts, [self.hosts[1]])

        logged_before = len(connection.queries)
        mixin.host_ids = json.dumps([self.hosts[0].id])
        self.assertListEqual(mixin.hosts, [self.hosts[0]])
        logged_after = len(connection.queries)
        self.assertNotEqual(logged_before, logged_after)
项目:intel-manager-for-lustre    作者:intel-hpdd    | 项目源码 | 文件源码
def assertQueries(*prefixes):
    """Assert the correct queries are efficiently executed for a block.

    Generator with one ``yield``. Every query logged after the yield must
    start with the matching prefix, and its EXPLAIN output must contain
    'Index Scan' unless the prefix is 'INSERT'.
    """
    start = len(connection.queries)
    yield
    executed = connection.queries[start:]
    for expected, entry in itertools.izip_longest(prefixes, executed):
        assert expected and entry and entry['sql'].startswith(expected), (expected, entry)
        explain_cursor = connection.cursor()
        explain_cursor.execute('EXPLAIN ' + entry['sql'])
        # Each result row is a 1-tuple holding one line of the plan.
        plan_lines = [line for line, in explain_cursor]
        plan = ''.join(plan_lines)
        assert expected == 'INSERT' or 'Index Scan' in plan, (plan, entry)
项目:intel-manager-for-lustre    作者:intel-hpdd    | 项目源码 | 文件源码
def test_locks_query_count(self):
        """Check that query count to pull in available jobs hasn't changed"""

        expected_minimum = 6  # but 3 are for setup

        content_type = ContentType.objects.get_for_model(self.host.downcast())
        host_ct_key = content_type.natural_key()
        host_id = self.host.id

        #  create 200 host ups and down jobs in 'pending' default state
        #  key point is they are not in the 'complete' state.
        for job_num in xrange(200):
            job_model = RebootHostJob if job_num % 2 == 0 else ShutdownHostJob
            job_model.objects.create(host=self.host)

        #  Loads up the caches, including the _lock_cache while should find
        #  these jobs.
        scheduler = JobScheduler()

        reset_queries()

        #  Getting jobs here may incur a higher cost.
        scheduler.available_jobs([(host_ct_key, host_id)])

        observed = len(connection.queries)
        failure_message = (
            "something changed with queries! "
            "got %s expected %s" % (observed, expected_minimum))
        self.assertGreaterEqual(observed, expected_minimum, failure_message)
项目:intel-manager-for-lustre    作者:intel-hpdd    | 项目源码 | 文件源码
def test_locks_query_count(self):
        """Check that query count to pull in available jobs hasn't changed"""

        expected = 0

        #  object to be locked by jobs
        content_type = ContentType.objects.get_for_model(self.host.downcast())
        host_ct_key = content_type.natural_key()
        host_id = self.host.id

        #  create 200 host ups and down jobs in 'pending' default state
        #  key point is they are not in the 'complete' state.
        for job_num in xrange(200):
            job_model = RebootHostJob if job_num % 2 == 0 else ShutdownHostJob
            job_model.objects.create(host=self.host)

        #  Loads up the caches, including the _lock_cache while should find
        #  these jobs.
        scheduler = JobScheduler()

        reset_queries()

        #  Getting jobs here may incur a higher cost.
        scheduler.available_jobs([(host_ct_key, host_id)])

        observed = len(connection.queries)
        failure_message = (
            "something changed with queries! "
            "got %s expected %s" % (observed, expected))
        self.assertEqual(observed, expected, failure_message)
项目:beg-django-e-commerce    作者:Apress    | 项目源码 | 文件源码
def show_category(request, category_slug, template_name="catalog/category.html"):
    """View for each individual category page.

    The category is looked up in the cache under the request path and, on a
    miss, loaded via ``get_object_or_404`` and cached for ``CACHE_TIMEOUT``.

    NOTE: every local variable of this function is handed to the template
    through ``locals()``, so the local names are part of its behaviour.
    """
    category_cache_key = request.path
    c = cache.get(category_cache_key)
    if not c:
        # Cache miss (or a falsy cached value): load and store the category.
        c = get_object_or_404(Category.active, slug=category_slug)
        cache.set(category_cache_key, c, CACHE_TIMEOUT)
    products = c.product_set.filter(is_active=True)
    page_title = c.name
    meta_keywords = c.meta_keywords
    meta_description = c.meta_description

    # Put the query log into a local so it reaches the template via locals().
    from django.db import connection
    queries = connection.queries
    return render_to_response(template_name, locals(), context_instance=RequestContext(request))
项目:fieldsight-kobocat    作者:awemulya    | 项目源码 | 文件源码
def process_response(self, request, response):
        from sys import stdout
        if stdout.isatty():
            for query in connection.queries:
                print "\033[1;31m[%s]\033[0m \033[1m%s\033[0m" % (
                    query['time'], " ".join(query['sql'].split()))

        return response
项目:grical    作者:wikical    | 项目源码 | 文件源码
def render_queries(queries, sort):
    """
    Returns a StringIO containing the formatted SQL queries.

    _sort_ is a field to sort by: 'order' lists the queries as given, while
    'time' and 'queries' aggregate identical SQL and sort descending by
    total time or by occurrence count.
    """
    output = StringIO()
    if sort == 'order':
        print >>output, "     time query"
        for entry in queries:
            print >>output, " %8s %s" % (entry["time"], entry["sql"])
        return output

    def sorter(left, right):
        # Items are (sql, [count, total_time]) pairs.
        if sort == 'time':
            return cmp(left[1][1], right[1][1])
        if sort == 'queries':
            return cmp(left[1][0], right[1][0])
        raise RuntimeError("Unknown sort: %s" % sort)

    print >>output, "  queries     time query"
    totals = {}
    for entry in queries:
        sql = entry["sql"]
        if sql in totals:
            aggregate = totals[sql]
            aggregate[0] += 1
            aggregate[1] += Decimal(entry["time"])
        else:
            totals[sql] = [1, Decimal(entry["time"])]

    ranked = sorted(totals.iteritems(), cmp=sorter, reverse=True)
    for sql, aggregate in ranked:
        print >>output, " %8d %8.3f %s" % (aggregate[0], aggregate[1], sql)
    return output
项目:grical    作者:wikical    | 项目源码 | 文件源码
def display_stats(request, stats, queries):
    """
    Generate a HttpResponse of functions for a profiling run.

    _stats_ should contain a pstats.Stats of a hotshot session.
    _queries_ should contain a list of SQL queries.

    Sort keys and output format are read from the ``sort_first``,
    ``sort_second`` and ``format`` request parameters.
    """
    sort = [request.REQUEST.get('sort_first', 'time'),
            request.REQUEST.get('sort_second', 'calls')]
    # Named ``fmt`` so the builtin ``format`` is not shadowed.
    fmt = request.REQUEST.get('format', 'print_stats')
    sort_first_buttons = RadioButtons('sort_first', sort[0],
                                      sort_categories)
    sort_second_buttons = RadioButtons('sort_second', sort[1],
                                       sort_categories)
    format_buttons = RadioButtons('format', fmt,
                                  (('print_stats', 'by function'),
                                   ('print_callers', 'by callers'),
                                   ('print_callees', 'by callees')))
    output = render_stats(stats, sort, fmt)
    # Rewind the buffer so readlines() starts at the beginning.
    output.reset()
    output = [html.escape(unicode(line)) for line in output.readlines()]
    # ``content_type`` instead of ``mimetype``: the latter keyword was
    # removed from HttpResponse in Django 1.7.
    response = HttpResponse(content_type='text/html; charset=utf-8')
    # The raw queries and stats are embedded (pickled + base64) so the page
    # can be re-sorted without profiling again.
    response.content = (stats_template %
                        {'format_buttons': format_buttons,
                         'sort_first_buttons': sort_first_buttons,
                         'sort_second_buttons': sort_second_buttons,
                         'rawqueries' : b64encode(cPickle.dumps(queries)),
                         'rawstats': b64encode(pickle_stats(stats)),
                         'stats': "".join(output),
                         'url': request.path})
    return response
项目:grical    作者:wikical    | 项目源码 | 文件源码
def process_request(self, request):
        """
        Setup the profiler for a profiling run and clear the SQL query log.

        If this is a resort of an existing profiling run, just return
        the resorted list.

        Returns None (implicitly) unless a re-sorted stats or queries page
        is produced from data carried in the request parameters.
        """
        def unpickle(params):
            # NOTE(review): this unpickles data taken from request
            # parameters; pickle is not safe on untrusted input -- confirm
            # who can reach this code path.
            stats = unpickle_stats(b64decode(params.get('stats', '')))
            queries = cPickle.loads(b64decode(params.get('queries', '')))
            return stats, queries

        # Skip non-GET requests whose content type is not one of the two
        # form encodings listed below.
        if request.method != 'GET' and \
           not (request.META.get('HTTP_CONTENT_TYPE',
                                 request.META.get('CONTENT_TYPE', '')) in
                ['multipart/form-data', 'application/x-www-form-urlencoded']):
            return
        # Profiling needs the 'profile' parameter plus DEBUG or a staff user.
        if (request.REQUEST.get('profile', False) and
            (settings.DEBUG == True or request.user.is_staff)):
            request.statsfile = tempfile.NamedTemporaryFile()
            params = request.REQUEST
            if (params.get('show_stats', False)
                and params.get('show_queries', '1') == '1'):
                # Instantly re-sort the existing stats data
                stats, queries = unpickle(params)
                return display_stats(request, stats, queries)
            elif (params.get('show_queries', False)
                  and params.get('show_stats', '1') == '1'):
                # Re-sort the existing query data
                stats, queries = unpickle(params)
                return display_queries(request, stats, queries)
            else:
                # We don't have previous data, so initialize the profiler
                request.profiler = hotshot.Profile(request.statsfile.name)
                reset_queries()
项目:grical    作者:wikical    | 项目源码 | 文件源码
def process_response(self, request, response):
        """Finish profiling and render the results.

        Requests without a ``profiler`` attribute pass through unchanged;
        otherwise the response is replaced by the queries or the stats page.
        """
        profiler = getattr(request, 'profiler', None)
        if not profiler:
            return response

        profiler.close()
        params = request.REQUEST
        stats = hotshot.stats.load(request.statsfile.name)
        queries = connection.queries

        wants_queries = (params.get('show_queries', False)
                         and params.get('show_stats', '1') == '1')
        if wants_queries:
            return display_queries(request, stats, queries)
        return display_stats(request, stats, queries)
项目:studentsdb    作者:PyDev777    | 项目源码 | 文件源码
def process_response(self, request, response):
        """Insert DB time and query count at the top of HTML pages in DEBUG.

        Updates ``self.db_qcount`` and adds the logged query times to
        ``self.db_time``; HTML responses with a ``<body>`` get a fixed
        ``<code>`` element showing both values.
        """
        if not settings.DEBUG:
            return response

        self.db_qcount = len(connection.queries)
        self.db_time += sum([float(entry['time']) for entry in connection.queries])

        if 'text/html' not in response.get('Content-Type', ''):
            return response

        soup = BeautifulSoup(response.content)
        if soup.body:
            banner = soup.new_tag('code', style='position: fixed; top: 0; left: 0px')
            banner.string = 'DB took: %s, DB queries count: %s' % (str(self.db_time), str(self.db_qcount))
            soup.body.insert(0, banner)
            response.content = soup.prettify()
        return response
项目:django-icekit    作者:ic-labs    | 项目源码 | 文件源码
def render_queries(queries, sort):
    """
    Returns a StringIO containing the formatted SQL queries.

    _sort_ is a field to sort by: 'order' lists the queries as given, while
    'time' and 'queries' aggregate identical SQL and sort descending by
    total time or by occurrence count. Any other value raises RuntimeError.
    """
    output = StringIO()
    if sort == 'order':
        print >>output, "     time query"
        for entry in queries:
            print >>output, " %8s %s" % (entry["time"], entry["sql"])
        return output

    # Position inside the aggregated [count, total_time] pair to sort on.
    if sort == 'time':
        column = 1
    elif sort == 'queries':
        column = 0
    else:
        raise RuntimeError("Unknown sort: %s" % sort)

    print >>output, "  queries     time query"
    totals = {}
    for entry in queries:
        sql = entry["sql"]
        if sql in totals:
            aggregate = totals[sql]
            aggregate[0] += 1
            aggregate[1] += Decimal(entry["time"])
        else:
            totals[sql] = [1, Decimal(entry["time"])]

    ranked = sorted(
        totals.iteritems(), key=lambda item: item[1][column], reverse=True)
    for sql, aggregate in ranked:
        print >>output, " %8d %8.3f %s" % (aggregate[0], aggregate[1], sql)
    return output
项目:django-icekit    作者:ic-labs    | 项目源码 | 文件源码
def display_stats(request, stats, queries):
    """
    Generate a HttpResponse of functions for a profiling run.

    _stats_ should contain a pstats.Stats of a hotshot session.
    _queries_ should contain a list of SQL queries.

    Sort keys and output format are read from the ``sort_first``,
    ``sort_second`` and ``format`` request parameters.
    """
    primary = request.REQUEST.get('sort_first', 'time')
    secondary = request.REQUEST.get('sort_second', 'calls')
    fmt = request.REQUEST.get('format', 'print_stats')

    sort_first_buttons = RadioButtons('sort_first', primary, sort_categories)
    sort_second_buttons = RadioButtons('sort_second', secondary, sort_categories)
    format_choices = (
        ('print_stats', 'by function'),
        ('print_callers', 'by callers'),
        ('print_callees', 'by callees'),
    )
    format_buttons = RadioButtons('format', fmt, format_choices)

    rendered = render_stats(stats, [primary, secondary], fmt)
    # Rewind the buffer so readlines() starts at the beginning.
    rendered.reset()
    escaped_lines = []
    for line in rendered.readlines():
        escaped_lines.append(html.escape(unicode(line)))

    response = HttpResponse(content_type='text/html; charset=utf-8')
    template_values = {
        'format_buttons': format_buttons,
        'sort_first_buttons': sort_first_buttons,
        'sort_second_buttons': sort_second_buttons,
        'rawqueries' : b64encode(cPickle.dumps(queries)),
        'rawstats': b64encode(pickle_stats(stats)),
        'stats': "".join(escaped_lines),
        'url': request.path
    }
    response.content = stats_template % template_values
    return response
项目:django-icekit    作者:ic-labs    | 项目源码 | 文件源码
def process_request(self, request):
        """
        Setup the profiler for a profiling run and clear the SQL query log.

        If this is a resort of an existing profiling run, just return
        the resorted list.

        Non-GET requests are only considered when their Content-Type is a
        form encoding (``multipart/form-data`` or
        ``application/x-www-form-urlencoded``); any parameters after the
        media type (``; boundary=...``, ``; charset=...``) are ignored for
        that comparison. Profiling is only enabled when the ``profile``
        request parameter is set and either ``settings.DEBUG`` is True or
        the user is staff.

        Returns None to let the request continue, or the response built by
        ``display_stats`` / ``display_queries`` when re-sorting existing data.
        """
        form_media_types = (
            'multipart/form-data',
            'application/x-www-form-urlencoded',
        )

        def unpickle(params):
            # NOTE(security): both payloads come straight from the request
            # parameters. Unpickling client-supplied bytes can execute
            # arbitrary code; the only protection is the DEBUG/is_staff gate
            # below. Consider signing the payload or keeping it server-side.
            stats = unpickle_stats(b64decode(params.get('stats', '')))
            queries = cPickle.loads(b64decode(params.get('queries', '')))
            return stats, queries

        if request.method != 'GET':
            content_type = request.META.get(
                'HTTP_CONTENT_TYPE', request.META.get('CONTENT_TYPE', '')
            )
            # Compare the bare media type only. Real headers carry
            # parameters ("multipart/form-data; boundary=...") and media
            # types are case-insensitive, so an exact string match would
            # reject every multipart form submission.
            media_type = (content_type or '').split(';', 1)[0].strip().lower()
            if media_type not in form_media_types:
                return
        if (request.REQUEST.get('profile', False) and
            (settings.DEBUG == True or request.user.is_staff)):
            request.statsfile = tempfile.NamedTemporaryFile()
            params = request.REQUEST
            if (params.get('show_stats', False)
                and params.get('show_queries', '1') == '1'):
                # Instantly re-sort the existing stats data
                stats, queries = unpickle(params)
                return display_stats(request, stats, queries)
            elif (params.get('show_queries', False)
                  and params.get('show_stats', '1') == '1'):
                stats, queries = unpickle(params)
                return display_queries(request, stats, queries)
            else:
                # We don't have previous data, so initialize the profiler
                request.profiler = hotshot.Profile(request.statsfile.name)
                reset_queries()
项目:django-icekit    作者:ic-labs    | 项目源码 | 文件源码
def process_response(self, request, response):
        """
        Finish profiling and render the results.

        When no profiler was attached to the request, the incoming response
        is returned untouched. Otherwise the profiler is closed, the hotshot
        stats are loaded from ``request.statsfile`` and the response is
        replaced by the query view (``show_queries`` set and ``show_stats``
        equal to '1', its default) or by the stats view.
        """
        active_profiler = getattr(request, 'profiler', None)
        if not active_profiler:
            return response

        active_profiler.close()
        params = request.REQUEST
        stats = hotshot.stats.load(request.statsfile.name)
        queries = connection.queries

        wants_queries = (
            params.get('show_queries', False)
            and params.get('show_stats', '1') == '1'
        )
        render = display_queries if wants_queries else display_stats
        return render(request, stats, queries)
项目:WebHashcat    作者:hegusung    | 项目源码 | 文件源码
def api_hashfile_top_password(request, hashfile_id, N):
    """
    Return the most frequent cracked passwords of a hashfile as JSON.

    Looks up the Hashfile by ``hashfile_id`` (404 if missing), runs a raw
    grouped query over its cracked passwords and responds with two parallel
    lists: ``top_password_list`` and ``count_list``. The most recent entry
    of ``connection.queries``, if any, is printed to stdout.
    """
    # NOTE(review): neither `params` nor `N` is used below -- the row limit
    # is fixed at 10 inside the SQL text. Confirm whether N was meant to
    # drive the LIMIT.
    params = request.POST if request.method == "POST" else request.GET

    hashfile = get_object_or_404(Hashfile, id=hashfile_id)

    top_sql = "SELECT id, password, COUNT(*) AS count FROM Hashcat_cracked USE INDEX (hashfileid_id_index) WHERE hashfile_id=%s GROUP BY BINARY password ORDER BY count DESC LIMIT 10"
    ranked_rows = Cracked.objects.raw(top_sql, [hashfile.id])

    # Single pass over the raw queryset, filling both result lists in step.
    passwords, counts = [], []
    for row in ranked_rows:
        passwords.append(row.password)
        counts.append(row.count)

    for entry in connection.queries[-1:]:
        print(entry["sql"])
        print(entry["time"])

    payload = json.dumps({
        "top_password_list": passwords,
        "count_list": counts,
    })
    return HttpResponse(payload, content_type="application/json")
项目:jiango    作者:yefei    | 项目源码 | 文件源码
def process_response(self, request, response):
        """
        Write every query in ``connection.queries`` to stdout.

        Each query is printed as ``[time] sql`` between two separator lines;
        nothing is printed when no queries were recorded. The response is
        returned unchanged.
        """
        executed = connection.queries
        if executed:
            emit = sys.stdout.write
            emit("SQL %s\n" % ('=' * 26))
            for entry in executed:
                emit("[%s] %s\n" % (entry['time'], entry['sql']))
            emit("%s\n" % ('=' * 30))
        return response
项目:montage    作者:storyful    | 项目源码 | 文件源码
def executemany(self, sql, param_list):
        """
            Run a batch of SQL statements and record it in an appstats trace.

            The recording is closed in a ``finally`` clause, so it also
            happens when the underlying ``executemany`` raises. The trace
            label is the SQL prefixed with the batch size, or ``?`` when
            ``param_list`` does not support ``len()``.
        """
        self.start_appstats_recording()
        try:
            outcome = super(CursorDebugWrapper, self).executemany(
                sql, param_list
            )
        finally:
            try:
                batch_size = len(param_list)
            except TypeError:
                # param_list could be an iterator, which has no length
                batch_size = '?'
            label = '{0} times: {1}'.format(batch_size, sql)
            self.end_appstats_recording(label)
        return outcome
项目:djanoDoc    作者:JustinChavez    | 项目源码 | 文件源码
def debug(request):
    """
    Returns context variables helpful for debugging.

    The extra variables are only provided when ``settings.DEBUG`` is on and
    the request's ``REMOTE_ADDR`` is listed in ``settings.INTERNAL_IPS``;
    otherwise an empty dict is returned.
    """
    debug_allowed = (
        settings.DEBUG
        and request.META.get('REMOTE_ADDR') in settings.INTERNAL_IPS
    )
    if not debug_allowed:
        return {}

    from django.db import connection
    return {
        'debug': True,
        # Lazy reference: connection.queries is read on access rather than
        # now, so queries triggered after this function runs are included.
        'sql_queries': lazy(lambda: connection.queries, list),
    }
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def debug(_request):
    """Returns context variables helpful for debugging.

    Same as django.core.context_processors.debug, just without the check
    against INTERNAL_IPS: with ``settings.DEBUG`` on, the result carries
    ``debug`` and ``sql_queries``; otherwise it is an empty dict."""
    if not settings.DEBUG:
        return {}

    from django.db import connection
    return {
        'debug': True,
        'sql_queries': connection.queries,
    }
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def log_last_django_query(logger):
    """Debug logs the latest SQL query made by Django.

    Will only work if the DEBUG=True in the Django settings. Nothing is
    logged when ``connection.queries`` is empty.

    :param logger: The logging.Logger object to use for logging.
    """
    # `_manage` is not referenced below; presumably imported for its side
    # effects -- verify before removing.
    from nav.models import manage as _manage
    from django.db import connection
    recorded = connection.queries
    if recorded:
        last_sql = recorded[-1]['sql']
        logger.debug("Last Django SQL query was: %s", last_sql)