Python django.core.cache.cache 模块,set() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用django.core.cache.cache.set()

项目:sensu_drive    作者:ilavender    | 项目源码 | 文件源码
def onduty_members(self):
        """Return the primary keys of the members who are on duty right now.

        A cached ``'OnDuty'`` entry is returned as-is. Otherwise the next
        occurrence of the ``event=0`` schedule is looked up and, if the current
        UTC time falls inside it, the members of that event are collected and
        cached for ``settings.ON_DUTY_CACHE_MEMBERS`` seconds.

        Lookup failures are logged with their traceback and result in an empty
        list; they are never raised to the caller.
        """
        # A single get() replaces the keys()-then-get() pair: one round trip
        # and no window in which the entry can expire between the two calls.
        cached_members = cache.get('OnDuty')
        if cached_members is not None:
            return cached_members

        OnDuty = []
        try:
            event_start, event_end, instance = ScheduledOccurrence.objects.filter(
                event__in=ScheduledEvent.objects.filter(event=0)).next_occurrence()
            NOW = datetime.datetime.now(datetime.timezone.utc).timestamp()
            if event_start.timestamp() <= NOW <= event_end.timestamp():
                OnDuty = [user.pk for user in instance.event.members_list()]
                logger.debug('onduty_members found: %s' % OnDuty)
                cache.set('OnDuty', OnDuty, timeout=settings.ON_DUTY_CACHE_MEMBERS)
            else:
                logger.debug('onduty_members can not find onduty_members')
        except Exception:
            # Best effort: keep returning an empty list, but record why.
            logger.exception('onduty_members failed finding onduty_members')

        return OnDuty
项目:sensu_drive    作者:ilavender    | 项目源码 | 文件源码
def user_dnd(self, user_pk):
        """Return True when the given user is currently in a do-not-disturb window.

        A cached ``'DnD_<user_pk>'`` entry short-circuits the lookup. Otherwise
        the next occurrence of the ``event=1`` schedule that includes the user
        is fetched; if the current UTC time falls inside it the result is
        cached until the occurrence ends.

        Any failure during the schedule lookup is treated as "not in DnD".
        """
        cache_key = 'DnD_' + str(user_pk)
        # Only True is ever stored under this key, so a non-None hit means DnD.
        if cache.get(cache_key) is not None:
            return True

        DnD = False
        try:
            event_start, event_end, instance = ScheduledOccurrence.objects.filter(
                event__in=ScheduledEvent.objects.filter(event=1, members__in=[user_pk])).next_occurrence()
            NOW = datetime.datetime.now(datetime.timezone.utc).timestamp()
            if event_start.timestamp() <= NOW <= event_end.timestamp():
                DnD = True
                # The cache timeout is a number of seconds from now, not an
                # absolute epoch timestamp; clamp so zero is never passed.
                remaining_seconds = max(1, int(event_end.timestamp() - NOW))
                cache.set(cache_key, DnD, timeout=remaining_seconds)
        except Exception:
            # Deliberate best effort: a failed lookup means "not in DnD".
            pass

        return DnD
项目:django-lrucache-backend    作者:kogan    | 项目源码 | 文件源码
def test_binary_string(self):
        # Compressed (binary) payloads must round-trip through set, add and
        # set_many without being altered.
        from zlib import compress, decompress
        cache = self.cache
        value = 'value_to_be_compressed'
        compressed_value = compress(value.encode())

        def assert_roundtrip(key):
            # Fetch once and check both the raw bytes and the decoded text.
            compressed_result = cache.get(key)
            self.assertEqual(compressed_value, compressed_result)
            self.assertEqual(value, decompress(compressed_result).decode())

        # Test set
        cache.set('binary1', compressed_value)
        assert_roundtrip('binary1')

        # Test add
        cache.add('binary1-add', compressed_value)
        assert_roundtrip('binary1-add')

        # Test set_many
        cache.set_many({'binary1-set_many': compressed_value})
        assert_roundtrip('binary1-set_many')
项目:django-lrucache-backend    作者:kogan    | 项目源码 | 文件源码
def test_long_timeout(self):
        """
        Follow memcached's convention where a timeout greater than 30 days is
        treated as an absolute expiration timestamp instead of a relative
        offset (#12399).
        """
        cache = self.cache
        over_thirty_days = 60 * 60 * 24 * 30 + 1  # 30 days + 1 second

        cache.set('key1', 'eggs', over_thirty_days)
        self.assertEqual(cache.get('key1'), 'eggs')

        cache.add('key2', 'ham', over_thirty_days)
        self.assertEqual(cache.get('key2'), 'ham')

        many = {'key3': 'sausage', 'key4': 'lobster bisque'}
        cache.set_many(many, over_thirty_days)
        self.assertEqual(cache.get('key3'), 'sausage')
        self.assertEqual(cache.get('key4'), 'lobster bisque')
项目:django-lrucache-backend    作者:kogan    | 项目源码 | 文件源码
def test_forever_timeout(self):
        """
        A timeout of None stores the value with no expiry, for set, add and
        set_many alike.
        """
        cache = self.cache
        no_expiry = None

        cache.set('key1', 'eggs', no_expiry)
        self.assertEqual(cache.get('key1'), 'eggs')

        cache.add('key2', 'ham', no_expiry)
        self.assertEqual(cache.get('key2'), 'ham')

        # add() must refuse to overwrite the existing key1 entry.
        was_added = cache.add('key1', 'new eggs', no_expiry)
        self.assertIs(was_added, False)
        self.assertEqual(cache.get('key1'), 'eggs')

        many = {'key3': 'sausage', 'key4': 'lobster bisque'}
        cache.set_many(many, no_expiry)
        self.assertEqual(cache.get('key3'), 'sausage')
        self.assertEqual(cache.get('key4'), 'lobster bisque')
项目:project-status-dashboard    作者:cmheisel    | 项目源码 | 文件源码
def generate_dashboard():
    """Load the configured spreadsheet, enrich every row and cache the result.

    Each row's ``xtras`` gets a target date, then (when the corresponding
    keys are present) the current JIRA summary, the week-ago summary and
    forecasts. The rows and the refresh time are cached without expiry.
    Always returns True.
    """
    log = logging.getLogger("dashboard.jobs.generate_dashboard")
    log.info("Start")

    rows = sheets.load_sheet(
        settings.GOOGLE_SPREADSHEET_ID,
        settings.GOOGLE_SPREADSHEET_AUTH_FILE,
    )
    for row in rows:
        row.xtras = _add_target_date(row.xtras, row.xtras.get('_target_date'))

        if row.xtras.get('_jira_filter'):
            row.xtras = _add_current_jira_summary(
                row.xtras, row.xtras['_jira_filter'], log)

        if not row.xtras.get('jira_summary'):
            continue
        row.xtras = _add_week_ago_summary(
            row.xtras, row.xtras['jira_summary'], log)
        row.xtras = _add_forecasts(
            row.xtras, row.xtras['jira_summary'], log)

    cache.set('dashboard_data', rows, None)
    refreshed_at = datetime.datetime.now(get_default_timezone())
    cache.set('dashboard_data_updated', refreshed_at, None)
    log.info("End")
    return True
项目:project-status-dashboard    作者:cmheisel    | 项目源码 | 文件源码
def ready(self):
        """Schedule the recurring dashboard job once the app is ready.

        Nothing is scheduled when the cache flag says it was already done, or
        when the default RQ queue is configured with ``ASYNC`` false. Always
        returns True.
        """
        cache_key = 'dashboard.config.jobs_scheduled'

        if cache.get(cache_key, False):
            return True
        if not settings.RQ_QUEUES.get('default', {}).get('ASYNC', True):
            return True

        logger.info("Scheduling jobs")
        django_rq.get_scheduler('default').schedule(
            scheduled_time=datetime.utcnow(),
            func='dashboard.jobs.generate_dashboard',
            interval=INTERVAL,
            result_ttl=INTERVAL + 30
        )
        # Flag lives for one interval so the job is not scheduled twice.
        cache.set(cache_key, True, INTERVAL)
        return True
项目:tecken    作者:mozilla-services    | 项目源码 | 文件源码
def test_client_task_tester(client, clear_redis_store):
    url = reverse('task_tester')

    def fake_task(key, value, expires):
        # Stand-in for the async task: write to the cache synchronously.
        cache.set(key, value, expires)

    # (request function, expected status, expected body fragment), in order.
    steps = (
        (client.get, 400, b'Make a POST request to this URL first'),
        (client.post, 201, b'Now make a GET request to this URL'),
        (client.get, 200, b'It works!'),
    )

    with mock.patch('tecken.views.sample_task.delay', new=fake_task):
        for send, expected_status, expected_fragment in steps:
            response = send(url)
            assert response.status_code == expected_status
            assert expected_fragment in response.content
项目:tecken    作者:mozilla-services    | 项目源码 | 文件源码
def task_tester(request):
    """Smoke-test the task queue through the cache.

    POST seeds the ``'marco'`` cache key with ``'ping'`` and enqueues
    ``sample_task`` to overwrite it with ``'polo'``. Any other method polls
    the key once per second and reports whether the task ran.

    Returns 201 after a POST, 400 if no POST was made first, 200 once the
    value is ``'polo'``, and 500 when it never changes.
    """
    if request.method == 'POST':
        cache.set('marco', 'ping', 100)
        sample_task.delay('marco', 'polo', 10)
        return http.HttpResponse(
            'Now make a GET request to this URL\n',
            status=201,
        )

    if not cache.get('marco'):
        return http.HttpResponseBadRequest(
            'Make a POST request to this URL first\n'
        )

    # One attempt per second; the failure message is built from the same
    # number so it cannot drift from what the loop actually does.
    attempts = 3
    for _ in range(attempts):
        if cache.get('marco') == 'polo':
            return http.HttpResponse('It works!\n')
        time.sleep(1)

    return http.HttpResponseServerError(
        'Tried %d times (%d seconds) and no luck :(\n' % (attempts, attempts)
    )
项目:zing    作者:evernote    | 项目源码 | 文件源码
def resources(self):
        """Returns the sorted list of :cls:`~pootle_app.models.Directory` and
        :cls:`~pootle_store.models.Store` resource paths available for
        this :cls:`~pootle_project.models.Project` across all languages.

        The list is cached for ``settings.POOTLE_CACHE_TIMEOUT``.
        """
        cache_key = make_method_key(self, 'resources', self.code)
        cached_paths = cache.get(cache_key, None)
        if cached_paths is not None:
            return cached_paths

        store_paths = Store.objects.live().order_by().filter(
            translation_project__project__pk=self.pk
        ).values_list("pootle_path", flat=True)
        directory_paths = Directory.objects.live().order_by().filter(
            pootle_path__regex=r"^/[^/]*/%s/" % self.code
        ).values_list("pootle_path", flat=True)

        # Merge both sources, then de-duplicate again after conversion to
        # translation-project-relative paths.
        all_paths = set(store_paths) | set(directory_paths)
        relative_paths = {to_tp_relative_path(path) for path in all_paths}
        resources = sorted(relative_paths, key=get_path_sortkey)

        cache.set(cache_key, resources, settings.POOTLE_CACHE_TIMEOUT)
        return resources

    # # # # # # # # # # # # # #  Methods # # # # # # # # # # # # # # # # # # #
项目:valhalla    作者:LCOGT    | 项目源码 | 文件源码
def get(self, request):
        """Report whether request state has changed since the last query.

        ``last_query_time`` is read from the query string; when it is missing
        or unparsable the cached ``'isDirty_query_time'`` is used, falling
        back to seven days ago. New pond blocks since that time are fetched
        and applied, the query time is cached, and the response is
        ``{'isDirty': bool}``. A failed pond fetch yields a 500 response with
        a JSON ``error`` body.
        """
        import json

        try:
            last_query_time = parse(request.query_params.get('last_query_time'))
        except (TypeError, ValueError):
            last_query_time = cache.get('isDirty_query_time', (timezone.now() - timedelta(days=7)))

        url = settings.POND_URL + '/pond/pond/blocks/new/?since={}&using=default'.format(last_query_time.strftime('%Y-%m-%dT%H:%M:%S'))
        now = timezone.now()
        try:
            response = requests.get(url)
            response.raise_for_status()
        except Exception as e:
            # A dict passed as content would be iterated, sending only its
            # keys. Serialise it so the error message reaches the client.
            return HttpResponseServerError(
                json.dumps({'error': repr(e)}),
                content_type='application/json',
            )

        pond_blocks = response.json()
        is_dirty = update_request_states_from_pond_blocks(pond_blocks)
        cache.set('isDirty_query_time', now, None)

        # also factor in if a change in requests (added, updated, cancelled) has occurred since we last checked
        last_update_time = max(Request.objects.latest('modified').modified,
                               UserRequest.objects.latest('modified').modified)
        is_dirty |= last_update_time >= last_query_time

        return Response({'isDirty': is_dirty})
项目:valhalla    作者:LCOGT    | 项目源码 | 文件源码
def __init__(self, start, end, telescopes=None, sites=None, instrument_types=None):
        """Connect to Elasticsearch and load telescope event data.

        ``sites`` and ``telescopes`` default to everything found among the
        available telescopes. ``start`` and ``end`` are stored as UTC with
        microseconds dropped. Event data comes from the ``'tel_event_data'``
        cache entry when it is truthy; otherwise it is queried and cached
        for 1800 seconds.

        Raises ImproperlyConfigured when the Elasticsearch host is invalid.
        """
        try:
            self.es = Elasticsearch([settings.ELASTICSEARCH_URL])
        except LocationValueError:
            logger.error('Could not find host. Make sure ELASTICSEARCH_URL is set.')
            raise ImproperlyConfigured('ELASTICSEARCH_URL')

        self.instrument_types = instrument_types
        self.available_telescopes = self._get_available_telescopes()

        if not sites:
            sites = list({tk.site for tk in self.available_telescopes})
        if not telescopes:
            telescopes = list({
                tk.telescope for tk in self.available_telescopes if tk.site in sites
            })

        self.start = start.replace(tzinfo=timezone.utc).replace(microsecond=0)
        self.end = end.replace(tzinfo=timezone.utc).replace(microsecond=0)

        cached_event_data = cache.get('tel_event_data')
        if not cached_event_data:
            self.event_data = self._get_es_data(sites, telescopes)
            cache.set('tel_event_data', self.event_data, 1800)
            return
        self.event_data = cached_event_data
项目:malvo    作者:shivamMg    | 项目源码 | 文件源码
def set_mcqs_in_cache():
    """
    Populate the per-language MCQ cache entries unless ``'mcqs_flag'`` says
    they are already up to date, then set the flag.
    """
    # A truthy flag means the MCQs are cached and unchanged.
    if cache.get('mcqs_flag', False):
        return

    # (language code, cache key) pairs.
    language_cache_keys = (
        ('C', 'c_mcqs'),
        ('J', 'java_mcqs'),
    )
    for lang_code, cache_key in language_cache_keys:
        cache.set(cache_key, extract_mcqs(lang_code))

    # Mark MCQs as unchanged
    cache.set('mcqs_flag', True)
项目:django-wechat-base    作者:ChanMo    | 项目源码 | 文件源码
def get_token(self):
        """Return the WeChat access token, fetching and caching it on a miss.

        A fetched token is cached under ``'wx_access_token'`` for the
        ``expires_in`` seconds reported in the response.
        """
        cached_token = cache.get('wx_access_token')
        if cached_token:
            return cached_token

        param = {
            'grant_type': 'client_credential',
            'appid': self.appid,
            'secret': self.appsecret,
        }
        data = self.get_data(self.get_url('token', param))
        fresh_token = data['access_token']
        lifetime = int(data['expires_in'])
        cache.set('wx_access_token', fresh_token, lifetime)
        return fresh_token
项目:django-wechat-base    作者:ChanMo    | 项目源码 | 文件源码
def get_token(self):
        """Return the WeChat access token, fetching and caching it on a miss.

        A fetched token is cached under ``'wx_access_token'`` for the
        ``expires_in`` seconds reported in the response.
        """
        cached_token = cache.get('wx_access_token')
        if cached_token:
            return cached_token

        param = {
            'grant_type': 'client_credential',
            'appid': self.appid,
            'secret': self.appsecret,
        }
        data = self.get_data(self.get_url('token', param))
        fresh_token = data['access_token']
        lifetime = int(data['expires_in'])
        cache.set('wx_access_token', fresh_token, lifetime)
        return fresh_token
项目:endorsementdb.com    作者:endorsementdb    | 项目源码 | 文件源码
def search_endorsers(request):
    """Return up to five endorsers matching the ``q`` query parameter as JSON.

    Names starting with the query come first. If fewer than five of those
    exist, names merely containing the query fill the remaining slots.
    """
    query = request.GET.get('q')
    endorsers = []

    if query:
        # First find the endorsers whose names start with this query.
        prefix_matches = Endorser.objects.filter(name__istartswith=query)
        endorsers.extend(prefix_matches[:5])
        seen_pks = {endorser.pk for endorser in endorsers}

        if prefix_matches.count() < 5:
            for endorser in Endorser.objects.filter(name__icontains=query):
                if endorser.pk in seen_pks:
                    continue

                endorsers.append(endorser)
                if len(endorsers) == 5:
                    break

    payload = [{'pk': e.pk, 'name': e.name} for e in endorsers]
    return JsonResponse({
        'endorsers': payload,
    })
项目:feincms3    作者:matthiask    | 项目源码 | 文件源码
def render(self, region, context, timeout=None):
        """render(self, region, context, *, timeout=None)
        Render one region with the given context and return the HTML

        Caching is only used when ``timeout`` is not ``None``; in that case
        the rendered HTML is stored under ``self.cache_key(region)``.

        .. note::
           You should treat anything except for the ``region`` and ``context``
           argument as keyword-only.
        """
        use_cache = timeout is not None

        if use_cache:
            key = self.cache_key(region)
            cached_html = cache.get(key)
            if cached_html is not None:
                return cached_html

        fragments = (
            self._renderer.render_plugin_in_context(plugin, context)
            for plugin in self._contents[region]
        )
        html = mark_safe(''.join(fragments))

        if use_cache:
            cache.set(key, html, timeout=timeout)
        return html
项目:base_function    作者:Rockyzsu    | 项目源码 | 文件源码
def general_image(self, image_format='PNG'):
        """Return a ``BytesIO`` holding a placeholder image of the form's size.

        The image is ``cleaned_data['width']`` x ``cleaned_data['height']``
        and, when the text fits, has "<width>x<height>" drawn centred on it.
        The result is cached for one hour under a key built from width,
        height and format.

        :param image_format: format name passed to ``Image.save``
        """
        fm_width = self.cleaned_data['width']
        fm_height = self.cleaned_data['height']

        key = '{}.{}.{}'.format(fm_width, fm_height, image_format)
        content = cache.get(key)
        if content is None:
            image = Image.new('RGB', (fm_width, fm_height), color=122)

            draw = ImageDraw.Draw(image)
            text = '{}x{}'.format(fm_width, fm_height)
            text_width, text_height = draw.textsize(text)

            if text_width < fm_width and text_height < fm_height:
                text_top = (fm_height - text_height) // 2
                text_left = (fm_width - text_width) // 2

                # ImageDraw.text expects (x, y), i.e. (left, top). Passing
                # them swapped misplaces the label on non-square images.
                draw.text((text_left, text_top), text, fill=(255, 255, 255))

            content = BytesIO()
            image.save(content, image_format)
            content.seek(0)
            cache.set(key, content, 60 * 60)

        return content
项目:django-redis-sentinel-redux    作者:danigosa    | 项目源码 | 文件源码
def test_custom_key_function(self):
        seeded_keys = ("foo-aa", "foo-ab", "foo-bb", "foo-bc")
        for key in seeded_keys:
            self.cache.set(key, "foo")

        deleted = self.cache.delete_pattern("*foo-a*")
        self.assertTrue(bool(deleted))

        remaining = self.cache.keys("foo*")
        self.assertEqual(set(remaining), {"foo-bb", "foo-bc"})

        # ensure our custom function was actually called
        try:
            raw_keys = {
                k.decode('utf-8') for k in self.cache.raw_client.keys('*')
            }
            self.assertEqual(raw_keys, {'#1#foo-bc', '#1#foo-bb'})
        except (NotImplementedError, AttributeError):
            # not all clients support .keys()
            pass
项目:django-redis-sentinel-redux    作者:danigosa    | 项目源码 | 文件源码
def test_setnx(self):
        key = "test_key_nx"

        # Start from a clean slate: the key must not exist in redis.
        self.cache.delete(key)
        self.assertEqual(self.cache.get(key, None), None)

        # The first nx write succeeds ...
        first_write = self.cache.set(key, 1, nx=True)
        self.assertTrue(first_write)
        # ... the second is rejected and leaves the stored value untouched.
        second_write = self.cache.set(key, 2, nx=True)
        self.assertFalse(second_write)
        self.assertEqual(self.cache.get(key), 1)

        self.cache.delete(key)
        self.assertEqual(self.cache.get(key, None), None)
项目:django-redis-sentinel-redux    作者:danigosa    | 项目源码 | 文件源码
def test_setnx_timeout(self):
        key = "test_key_nx"

        # test that timeout still works for nx=True
        created = self.cache.set(key, 1, timeout=2, nx=True)
        self.assertTrue(created)
        time.sleep(3)
        self.assertEqual(self.cache.get(key, None), None)

        # test that timeout will not affect key, if it was there
        self.cache.set(key, 1)
        overwritten = self.cache.set(key, 2, timeout=2, nx=True)
        self.assertFalse(overwritten)
        time.sleep(3)
        self.assertEqual(self.cache.get(key, None), 1)

        self.cache.delete(key)
        self.assertEqual(self.cache.get(key, None), None)
项目:django-redis-sentinel-redux    作者:danigosa    | 项目源码 | 文件源码
def test_save_dict(self):
        if isinstance(self.cache.client._serializer,
                      json_serializer.JSONSerializer):
            self.skipTest("Datetimes are not JSON serializable")

        now_dt = datetime.datetime.now()
        if isinstance(self.cache.client._serializer,
                      msgpack_serializer.MSGPackSerializer):
            # MSGPackSerializer serializers use the isoformat for datetimes
            # https://github.com/msgpack/msgpack-python/issues/12
            now_dt = now_dt.isoformat()

        test_dict = {"id": 1, "date": now_dt, "name": "Foo"}
        self.cache.set("test_key", test_dict)
        stored = self.cache.get("test_key")

        self.assertIsInstance(stored, dict)
        self.assertEqual(stored["id"], 1)
        self.assertEqual(stored["name"], "Foo")
        self.assertEqual(stored["date"], now_dt)
项目:django-redis-sentinel-redux    作者:danigosa    | 项目源码 | 文件源码
def test_timeout_parameter_as_positional_argument(self):
        key = "test_key"

        # A negative timeout passed positionally leaves nothing readable.
        self.cache.set(key, 222, -1)
        self.assertIsNone(self.cache.get(key, None))

        # A one-second timeout: readable immediately, gone two seconds later.
        self.cache.set(key, 222, 1)
        before_expiry = self.cache.get(key, None)
        time.sleep(2)
        after_expiry = self.cache.get(key, None)
        self.assertEqual(before_expiry, 222)
        self.assertEqual(after_expiry, None)

        # nx=True should not overwrite expire of key already in db
        self.cache.set(key, 222, 0)
        self.cache.set(key, 222, -1, nx=True)
        self.assertEqual(self.cache.get(key, None), 222)
项目:django-redis-sentinel-redux    作者:danigosa    | 项目源码 | 文件源码
def test_iter_keys(self):
        cache = caches["default"]
        client_class = cache._params["OPTIONS"]["CLIENT_CLASS"]

        # The test is a no-op when the shard client is configured.
        if client_class == "django_redis.client.ShardClient":
            return

        for key in ("foo1", "foo2", "foo3"):
            cache.set(key, 1)

        # Test simple result
        found = set(cache.iter_keys("foo*"))
        self.assertEqual(found, {"foo1", "foo2", "foo3"})

        # Test limited result
        batched = list(cache.iter_keys("foo*", itersize=2))
        self.assertEqual(len(batched), 3)

        # Test generator object
        key_iterator = cache.iter_keys("foo*")
        self.assertNotEqual(next(key_iterator), None)
项目:django-redis-sentinel-redux    作者:danigosa    | 项目源码 | 文件源码
def test_sentinel_switching(self):
        if not isinstance(self.cache.client, SentinelClient):
            self.skipTest("Not Sentinel clients use default master-slave setup")

        try:
            client = caches["sample"].client
            master = client.get_client(write=True)
            slave = client.get_client(write=False)

            # A write through the master must be readable from the slave.
            master.set("Foo", "Bar")
            self.assertEqual(slave.get("Foo"), "Bar")

            master_role = master.info()['role']
            self.assertEqual(master_role, "master")
            slave_role = slave.info()['role']
            self.assertEqual(slave_role, "slave")
        except NotImplementedError:
            pass
项目:pony    作者:Eastwu5788    | 项目源码 | 文件源码
def query_user_by_id(user_id=0, use_cache=True):
        """
        Fetch a ``UserAccount`` by id, consulting the cache first if asked to.

        :param user_id: ID of the account to look up
        :param use_cache: when true, return a truthy cached entry if one exists
        :return: the account, or ``None`` when no such account exists
        """
        key = CACHE_KEY+str(user_id)

        cached_account = cache.get(key) if use_cache else None
        if cached_account:
            return cached_account

        try:
            account = UserAccount.objects.get(id=user_id)
        except UserAccount.DoesNotExist:
            return None

        cache.set(key, account, CACHE_TIME)
        return account
项目:pony    作者:Eastwu5788    | 项目源码 | 文件源码
def query_token_by_user_id(user_id, use_cache=True):
        """
        Fetch the ``AccessToken`` with ``status=1`` belonging to a user.

        :param user_id: ID of the user whose token is wanted
        :param use_cache: when true, return a truthy cached entry if one exists
        :return: the token, or ``None`` when no matching token exists
        """
        key = CACHE_TOKEN_ID+str(user_id)

        cached_token = cache.get(key) if use_cache else None
        if cached_token:
            return cached_token

        active_tokens = AccessToken.objects.order_by("-id").filter(status=1)
        try:
            token = active_tokens.get(user_id=user_id)
        except AccessToken.DoesNotExist:
            return None

        cache.set(key, token, CACHE_TIME)
        return token
项目:pony    作者:Eastwu5788    | 项目源码 | 文件源码
def query_token(token, use_cache=True):
        """
        Fetch the ``AccessToken`` with ``status=1`` for an access-token string.

        :param token: the access-token string to look up
        :param use_cache: when true, return a truthy cached entry if one exists
        :return: the token record, or ``None`` when no matching token exists
        """
        key = CACHE_TOKEN+token

        cached_record = cache.get(key) if use_cache else None
        if cached_record:
            return cached_record

        active_tokens = AccessToken.objects.order_by("-id").filter(status=1)
        try:
            record = active_tokens.get(access_token=token)
        except AccessToken.DoesNotExist:
            return None

        cache.set(key, record, CACHE_TIME)
        return record
项目:pony    作者:Eastwu5788    | 项目源码 | 文件源码
def query_user_meta_count(user_id, is_follow=True, use_cache=True):
        """
        Count the active (``status=1``) follow relations of a user.

        :param user_id: ID of the user to count for
        :param is_follow: True counts rows where the user is ``user_id``;
            False counts rows where the user is ``follow_user``
        :param use_cache: when true, return a truthy cached count if one exists
        :return: the count
        """
        key_prefix = CACHE_FOLLOW_COUNT_KEY if is_follow else CACHE_FANS_COUNT_KEY
        cache_key = key_prefix + str(user_id)

        if use_cache:
            cached_count = cache.get(cache_key)
            if cached_count:
                return cached_count

        if is_follow:
            relations = UserFollow.objects.filter(user_id=user_id, status=1)
        else:
            relations = UserFollow.objects.filter(follow_user=user_id, status=1)

        count = relations.aggregate(Count("id"))["id__count"]
        cache.set(cache_key, count, CACHE_COUNT_TIME)

        return count
项目:pony    作者:Eastwu5788    | 项目源码 | 文件源码
def query_format_info_by_user_id(user_id, use_cache=True):
        """
        Fetch a user's ``UserInfo`` in formatted form.

        :param user_id: ID of the user
        :param use_cache: when true, return a truthy cached entry if one exists
        :return: the output of ``UserInfo.format_user_info``, or ``None`` when
            the user has no ``UserInfo`` row
        """
        key = CACHE_KEY + str(user_id)

        cached_info = cache.get(key) if use_cache else None
        if cached_info:
            return cached_info

        try:
            user_info = UserInfo.objects.get(user_id=user_id)
            format_user_info = UserInfo.format_user_info(user_info)
            cache.set(key, format_user_info, CACHE_TIME)
        except UserInfo.DoesNotExist:
            return None
        return format_user_info
项目:pony    作者:Eastwu5788    | 项目源码 | 文件源码
def query_published_article_count(user_id, use_cache=True):
        """
        Count a user's ``BlogArticle`` rows that have ``status=1``.

        :param user_id: ID of the user
        :param use_cache: when true, return a truthy cached count if one exists
        :return: the count; only non-zero counts are written to the cache
        """
        cache_key = CACHE_ARTICLE_COUNT + str(user_id)

        cached_count = cache.get(cache_key) if use_cache else None
        if cached_count:
            return cached_count

        published = BlogArticle.objects.filter(user_id=user_id, status=1)
        count = published.aggregate(Count("id"))["id__count"]
        if count:
            cache.set(cache_key, count, CACHE_TIME)
        return count
项目:pony    作者:Eastwu5788    | 项目源码 | 文件源码
def query_article_by_id(article_id=0, use_cache=True):
        """
        Fetch a ``BlogArticle`` by id in formatted form.

        :param article_id: ID of the article
        :param use_cache: when true, return a truthy cached entry if one exists
        :return: the output of ``BlogArticle.format_article``, or ``None``
            when the article does not exist
        """
        key = CACHE_KEY_ID + str(article_id)

        cached_article = cache.get(key) if use_cache else None
        if cached_article:
            return cached_article

        try:
            raw_article = BlogArticle.objects.get(id=article_id)
            article = BlogArticle.format_article(raw_article)
            cache.set(key, article, CACHE_TIME)
        except BlogArticle.DoesNotExist:
            return None
        return article
项目:pony    作者:Eastwu5788    | 项目源码 | 文件源码
def query_top_list(order="hit", use_cache=True):
        """Return the first ten ``status=1`` article-meta rows, sorted
        descending by the ``order`` field.

        :param order: name of the field to sort by
        :param use_cache: when true, return a truthy cached list if one exists
        """
        key = CACHE_TOP_LIST_KEY+order

        cached_list = cache.get(key) if use_cache else None
        if cached_list:
            return cached_list

        try:
            # Keep only the first ten rows of the descending ordering.
            ranked = BlogArticleMeta.objects.filter(status=1).order_by("-"+order)
            top_list = ranked[:10]
            cache.set(key, top_list, CACHE_TOP_LIST_TIME)
        except BlogArticleMeta.DoesNotExist:
            return []
        return top_list
项目:steemprojects.com    作者:noisy    | 项目源码 | 文件源码
def commits_over_52(self):
        """Return weekly commit counts for the last 52 weeks as a CSV string.

        The result has 52 comma-separated integers ordered oldest week first.
        It is cached under the name produced by ``self.cache_namer`` and a
        cached value is returned as-is.
        """
        cache_name = self.cache_namer(self.commits_over_52)
        value = cache.get(cache_name)
        if value is not None:
            return value

        now = datetime.now()
        commits = self.commit_set.filter(
            commit_date__gt=now - timedelta(weeks=52),
        ).values_list('commit_date', flat=True)

        weeks = [0] * 52
        for cdate in commits:
            age_weeks = (now - cdate).days // 7
            # A commit dated in the future yields a negative age. Skip it,
            # because a negative index would silently count it in an old
            # week or raise IndexError.
            if 0 <= age_weeks < 52:
                weeks[age_weeks] += 1

        value = ','.join(map(str, reversed(weeks)))
        cache.set(cache_name, value)
        return value
项目:acacia_main    作者:AcaciaTrading    | 项目源码 | 文件源码
def prices_get(exchange, pair, num_prices=None, price_ratio=1, cached=True):
    """Load the JSON price list for an exchange/pair and return a slice of it.

    :param exchange: passed to ``get_filename`` to locate the data
    :param pair: passed to ``get_filename`` to locate the data
    :param num_prices: maximum number of entries to return, or None for all
    :param price_ratio: step for sub-sampling; every ``price_ratio``-th entry
        is kept
    :param cached: when truthy, read the raw text through the Django cache
        and store a freshly downloaded copy for ``MAX_CACHE_TIME``
    :return: the list of prices, or None if the download failed
    """
    filename = get_filename(exchange, pair)

    text = None
    if cached:
        from django.core.cache import cache
        text = cache.get(filename)

    if text is None:
        try:
            text = get_s3_text(filename)
            if cached:
                cache.set(filename, text, MAX_CACHE_TIME)
        except Exception as e:
            # print() with one argument behaves the same on Python 2 and 3.
            print(str(e))
            return None

    result = json.loads(text)[0::price_ratio]
    if num_prices is not None:
        return result[:num_prices]
    return result
项目:django-eth-events    作者:gnosis    | 项目源码 | 文件源码
def setUp(self):
        # Run mocked testrpc for reorgs
        print('Starting httpd...')
        mock_server = Process(target=start_mock_server)
        self.p = mock_server
        mock_server.start()

        cache.set('block_number', '0x0')
        sleep(1)
        print('served')

        self.rpc = RPCProvider(host='127.0.0.1', port='8545', ssl=0)
        self.web3 = Web3Service(self.rpc).web3
        # Mock web3
        self.daemon = DaemonFactory()
项目:django-eth-events    作者:gnosis    | 项目源码 | 文件源码
def test_reorg_ok(self):
        # Two consecutive blocks whose stored hash matches the cached hash:
        # no reorg must be reported for either.
        for number in (0, 1):
            block_hash = '{:040d}'.format(number)
            cache.set(hex(number), block_hash)
            cache.set('block_number', hex(number + 1))
            Block.objects.create(block_hash=block_hash, block_number=number, timestamp=0)
            Daemon.objects.all().update(block_number=number)

            (had_reorg, _) = check_reorg()
            self.assertFalse(had_reorg)
项目:django-eth-events    作者:gnosis    | 项目源码 | 文件源码
def test_reorg_happened(self):
        # Block 0: stored hash and cached hash agree, so no reorg.
        genesis_hash = '{:040d}'.format(0)
        cache.set('0x0', genesis_hash)
        cache.set('block_number', '0x1')
        Block.objects.create(block_hash=genesis_hash, block_number=0, timestamp=0)
        Daemon.objects.all().update(block_number=0)
        (had_reorg, _) = check_reorg()
        self.assertFalse(had_reorg)

        # Block 1: the database holds a different hash than the cache.
        cached_hash = '{:040d}'.format(1)
        stale_hash = '{:040d}'.format(1313)
        cache.set('0x1', cached_hash)
        cache.set('block_number', '0x2')
        Block.objects.create(block_hash=stale_hash, block_number=1, timestamp=0)
        Daemon.objects.all().update(block_number=1)
        (had_reorg, block_number) = check_reorg()
        self.assertTrue(had_reorg)
        self.assertEqual(block_number, 0)

        # Once the stored hash is corrected the reorg is gone.
        Block.objects.filter(block_number=1).update(block_hash=cached_hash, timestamp=0)
        (had_reorg, _) = check_reorg()
        self.assertFalse(had_reorg)
项目:django-eth-events    作者:gnosis    | 项目源码 | 文件源码
def test_reorg_mined_multiple_blocks_ok(self):
        # (stored block number, cached 'block_number'); the second step jumps
        # the cached block number by more than one.
        steps = ((0, '0x1'), (1, '0x9'))

        for number, cached_block_number in steps:
            block_hash = '{:040d}'.format(number)
            cache.set(hex(number), block_hash)  # set_mocked_testrpc_block_hash
            cache.set('block_number', cached_block_number)
            Block.objects.create(block_hash=block_hash, block_number=number, timestamp=0)
            Daemon.objects.all().update(block_number=number)

            (had_reorg, _) = check_reorg()
            self.assertFalse(had_reorg)
项目:prestashop-sync    作者:dragoon    | 项目源码 | 文件源码
def ajax_tag_autocomplete(request):
    """Return up to ten existing tag names starting with ``q``, one per line.

    The whole response is cached for 300 seconds per query string. Without a
    ``q`` parameter an empty response is returned.
    """
    if 'q' not in request.GET:
        return HttpResponse()

    q = request.GET['q']
    key = 'ajax_tag_auto_%s' % q

    cached_response = cache.get(key)
    if cached_response is not None:
        return cached_response

    matching_tags = list(Tag.objects.filter(name__istartswith=q)[:10])
    body = u'\n'.join(tag.name for tag in matching_tags)
    response = HttpResponse(body)
    cache.set(key, response, 300)

    return response
项目:django-open-volunteering-platform    作者:OpenVolunteeringPlatform    | 项目源码 | 文件源码
def cached(func):
  """ Used to decorate .get_queryset method on search viewsets.

      Caches the decorated method's result for 120 seconds under the key
      returned by ``self.get_cache_key()``. When that key is falsy the cache
      is bypassed entirely: nothing is read and nothing is written.
  """
  @wraps(func)
  def _impl(self, *args, **kwargs):
    key = self.get_cache_key()
    ttl = 120

    if key:
      result = cache.get(key)
      if result:
        return result

    result = func(self, *args, **kwargs)
    # Only store under a real key; writing under None/'' would either fail
    # or make unrelated querysets share one cache entry.
    if key:
      cache.set(key, result, ttl)

    return result
  return _impl
项目:open-ledger    作者:creativecommons    | 项目源码 | 文件源码
def about(request):
    """Information about the current site, its goals, and what content is loaded.

    Renders ``about.html`` with ``providers``: one dict per provider that has
    at least one hit (its settings entry plus a formatted ``hits`` count),
    followed by a ``Total`` entry. The list is cached under
    ``CACHE_STATS_NAME`` for ``CACHE_STATS_DURATION``.
    """
    # Provider counts
    providers = cache.get(CACHE_STATS_NAME)
    if not providers:
        providers = []
        for provider in sorted(settings.PROVIDERS.keys()):
            s = Search()
            q = Q('term', provider=provider)
            s = s.query(q)
            response = s.execute()
            if response.hits.total > 0:
                total = intcomma(response.hits.total)
                # Copy the settings entry: updating it in place would leak
                # per-request hit counts into the shared settings dict.
                data = dict(settings.PROVIDERS[provider], hits=total)
                providers.append(data)
        # All results
        s = Search()
        response = s.execute()
        total = intcomma(response.hits.total)
        providers.append({'display_name': 'Total', 'hits': total})
        # Store with the stats duration rather than the backend default.
        cache.set(CACHE_STATS_NAME, providers, CACHE_STATS_DURATION)
    return render(request, "about.html", {'providers': providers})
项目:basket    作者:mozmeao    | 项目源码 | 文件源码
def get_sf_session(force=False):
    """Return the Salesforce session info dict, logging in when needed.

    The cached entry is reused unless ``force`` is true or nothing is cached.
    A fresh login is counted in statsd and stored in the cache for
    ``settings.SFDC_SESSION_TIMEOUT`` seconds. The dict has the keys ``id``,
    ``instance`` and ``expires``.
    """
    cached_info = None if force else cache.get(SFDC_SESSION_CACHE_KEY)
    if cached_info is not None:
        return cached_info

    statsd.incr('news.backends.sfdc.session_refresh')
    session_id, sf_instance = sfapi.SalesforceLogin(**settings.SFDC_SETTINGS)
    timeout = settings.SFDC_SESSION_TIMEOUT
    fresh_info = {
        'id': session_id,
        'instance': sf_instance,
        'expires': time() + timeout,
    }
    cache.set(SFDC_SESSION_CACHE_KEY, fresh_info, settings.SFDC_SESSION_TIMEOUT)
    return fresh_info
项目:basket    作者:mozmeao    | 项目源码 | 文件源码
def refresh_auth_tokens_from_cache(self):
        """Load the auth token and related properties from the cache.

        Does nothing while the current token is still valid for more than
        ``MAX_BUFFER`` seconds, or when nothing is cached. A cached value that
        is not a dict is deleted. Otherwise every cached property listed in
        ``self.token_property_names`` is copied onto the instance and the SOAP
        client is rebuilt.
        """
        token_still_good = (
            self.authToken is not None
            and time() + MAX_BUFFER < self.authTokenExpiration
        )
        if token_still_good:
            return

        cached_tokens = cache.get(self.token_cache_key)
        if not cached_tokens:
            return

        if not isinstance(cached_tokens, dict):
            # something wrong was cached
            cache.delete(self.token_cache_key)
            return

        for name, cached_value in cached_tokens.items():
            if name in self.token_property_names:
                setattr(self, name, cached_value)

        # remember the loaded token so a later change can be detected
        self._old_authToken = self.authToken
        self.build_soap_client()
项目:kolibri    作者:learningequality    | 项目源码 | 文件源码
def _cache_kolibri_studio_channel_request(self, identifier=None):
        """Return the channel lookup result for ``identifier``, cached briefly.

        The lookup URL doubles as the cache key. On a cache miss the URL is
        fetched, each returned channel is converted with
        ``_studio_response_to_kolibri_response`` and the converted list is
        cached for 5 seconds.

        Raises Http404 when the remote server answers with status 404.
        """
        cache_key = get_channel_lookup_url(identifier=identifier)

        # Look the value up once: a second cache.get() could miss if the
        # short-lived entry expires between the two calls.
        cached_response = cache.get(cache_key)
        if cached_response is not None:
            return Response(cached_response)

        resp = requests.get(cache_key)

        # always check response code of request and set cache
        if resp.status_code == 404:
            raise Http404(
                _("The requested channel does not exist on the content server")
            )

        kolibri_mapped_response = [
            self._studio_response_to_kolibri_response(channel)
            for channel in resp.json()
        ]

        cache.set(cache_key, kolibri_mapped_response, 5)

        return Response(kolibri_mapped_response)
项目:django-authlib    作者:matthiask    | 项目源码 | 文件源码
def get_authentication_url(self):
        """Fetch an OAuth1 request token and return the authorization URL.

        The request token is cached for 3600 seconds under
        ``oa-token-<oauth_token>`` and its ``oauth_token`` value is stored in
        the request session under ``oa_token``.
        """
        session = OAuth1Session(self.client_id, client_secret=self.client_secret)
        request_token = session.fetch_request_token(self.request_token_url)
        token_key = request_token['oauth_token']

        cache.set('oa-token-%s' % token_key, request_token, timeout=3600)
        self._request.session['oa_token'] = token_key

        return session.authorization_url(self.authorization_base_url)
项目:ecs    作者:ecs-org    | 项目源码 | 文件源码
def timetable(self):
        """Return the scheduled entries and the users with constraints.

        Each entry that has a ``timetable_index`` is annotated, in index
        order, with:

        * ``users``: set of ``(user, ignored_for_optimization)`` pairs built
          from the meeting's participations (``user`` is None when the
          participant is not among ``self.users_with_constraints``),
        * ``has_ignored_participations``: whether any pair is flagged ignored,
        * ``start`` / ``end``: ``self.start`` plus the summed durations of the
          entries before / up to and including this one.

        Returns a ``(tuple_of_entries, set_of_users)`` pair.
        """
        elapsed = timedelta(seconds=0)
        users_by_id = {user.id: user for user in self.users_with_constraints}

        participants_by_entry = {}
        meeting_participations = Participation.objects.filter(entry__meeting=self)
        meeting_participations = meeting_participations.select_related('user')
        for part in meeting_participations.order_by('user__username'):
            pair = (users_by_id.get(part.user_id), part.ignored_for_optimization)
            participants_by_entry.setdefault(part.entry_id, set()).add(pair)

        scheduled_entries = (self.timetable_entries
            .filter(timetable_index__isnull=False)
            .select_related('submission')
            .order_by('timetable_index')
        )
        annotated = []
        for entry in scheduled_entries:
            entry.users = participants_by_entry.get(entry.id, set())
            entry.has_ignored_participations = any(flag for _user, flag in entry.users)
            entry.start = self.start + elapsed
            elapsed += entry.duration
            entry.end = self.start + elapsed
            annotated.append(entry)
        return tuple(annotated), set(users_by_id.values())
项目:heltour    作者:cyanfish    | 项目源码 | 文件源码
def _login_cookies():
    """Return lichess login cookies, logging in when none are cached.

    Credentials are read from ``settings.LICHESS_CREDS_FILE_PATH`` (username
    on the first line, password on the second). Cookies from a successful
    login are cached for 60 seconds. Returns None, after logging an error,
    when the login request does not answer with status 200.
    """
    cookies = cache.get('lichess_login_cookies')
    if cookies is not None:
        return cookies

    # Read the credentials
    with open(settings.LICHESS_CREDS_FILE_PATH) as creds_file:
        lines = creds_file.readlines()
    creds = {'username': lines[0].strip(), 'password': lines[1].strip()}

    # Send a login request
    login_url = settings.LICHESS_DOMAIN + 'login'
    login_response = requests.post(login_url, data=creds, headers=_headers)
    if login_response.status_code != 200:
        logger.error('Received status %s when trying to log in to lichess' % login_response.status_code)
        return None

    # Save the cookies (cached for 1 minute)
    cookies = dict(login_response.cookies)
    cache.set('lichess_login_cookies', cookies, 60)
    return cookies

# Sends a mail on lichess
项目:sentry-plugins    作者:getsentry    | 项目源码 | 文件源码
def get_token(self, token_only=True, scopes=None):
        """Obtain a client-credentials token for the given scopes.

        With ``token_only`` true (the default) only the access token string is
        returned; it is cached per client id and scope list until 20 seconds
        before the reported expiry. With ``token_only`` false the full token
        response is requested and returned without touching the cache.

        ``scopes`` defaults to ``['send_notification', 'view_room']``.

        Raises OauthClientInvalidError on a 401 response and Exception on any
        other non-200 response.
        """
        if scopes is None:
            scopes = ['send_notification', 'view_room']

        cache_key = 'hipchat-tokens:%s:%s' % (self.id, ','.join(scopes))

        def request_token():
            payload = {
                'grant_type': 'client_credentials',
                'scope': ' '.join(scopes),
            }
            resp = requests.post(
                self.token_url,
                data=payload,
                auth=HTTPBasicAuth(self.id, self.secret),
                timeout=10,
            )
            status = resp.status_code
            if status == 200:
                return resp.json()
            if status == 401:
                raise OauthClientInvalidError(self)
            raise Exception('Invalid token: %s' % resp.text)

        if not token_only:
            return request_token()

        token = cache.get(cache_key)
        if token:
            return token

        token_data = request_token()
        token = token_data['access_token']
        cache.set(cache_key, token, token_data['expires_in'] - 20)
        return token
项目:django-powerpages    作者:Open-E-WEB    | 项目源码 | 文件源码
def process_request(self, request, extra_context=None):
        """Render the page for ``request`` and wrap it in a response.

        The rendering context comes from ``get_rendering_context`` and is
        updated with ``extra_context`` when given. If ``get_cache_settings``
        yields a cache key, rendered content is read from the cache and, on a
        miss, stored under that key for the returned number of seconds.
        """
        context = self.get_rendering_context(request)
        if extra_context:
            context.update(extra_context)

        cache_key, seconds = self.get_cache_settings(request)
        content = cache.get(cache_key) if cache_key else None
        if content is None:
            content = self.render(context)
            if cache_key:
                cache.set(cache_key, content, seconds)
        return self.create_response(request, content)