The following 50 code examples, extracted from open source Python projects, illustrate how to use django.core.cache.cache.delete().
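Before the project examples, a minimal sketch of the call itself: cache.delete() removes a single key from the configured cache backend and is harmless if the key is absent. The key name below is purely illustrative.

from django.core.cache import cache

# Store a value, then invalidate it explicitly.
cache.set("greeting", "hello", timeout=60)
assert cache.get("greeting") == "hello"

cache.delete("greeting")          # removes the key; no error if it does not exist
assert cache.get("greeting") is None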
def delete(self, *args, **kwargs):
    directory = self.directory

    # Just doing a plain delete will collect all related objects in memory
    # before deleting: translation projects, stores, units, quality checks,
    # suggestions, and submissions.
    # This can easily take down a process. If we do a translation project
    # at a time and force garbage collection, things stay much more
    # manageable.
    import gc
    gc.collect()
    for tp in self.translationproject_set.iterator():
        tp.delete()
        gc.collect()

    super(Project, self).delete(*args, **kwargs)

    directory.delete()
def invalidate_resources_cache(**kwargs):
    instance = kwargs["instance"]
    if instance.__class__.__name__ not in ['Directory', 'Store']:
        return

    # Don't invalidate if the save didn't create new objects
    no_new_objects = (
        ('created' in kwargs and 'raw' in kwargs) and
        (not kwargs['created'] or kwargs['raw']))
    if no_new_objects and instance.parent.get_children():
        return

    proj_code = split_pootle_path(instance.pootle_path)[1]
    if proj_code is not None:
        cache.delete(make_method_key(Project, 'resources', proj_code))
def single_instance_task(timeout):
    def task_exc(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            lock_id = "celery-single-instance-" + func.__name__

            def acquire_lock():
                return cache.add(lock_id, "true", timeout)

            def release_lock():
                return cache.delete(lock_id)

            if acquire_lock():
                try:
                    func(*args, **kwargs)
                finally:
                    release_lock()
        return wrapper
    return task_exc
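A hedged sketch of how the decorator above would typically be applied; the Celery app and task name are assumptions for illustration, not taken from the original project.

from celery import Celery

app = Celery("tasks")  # assumed Celery app; not part of the original example

@app.task
@single_instance_task(timeout=60 * 5)  # cache lock expires after 5 minutes
def rebuild_search_index():
    # Long-running work that must not run concurrently goes here.
    pass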
def business_area_handler(request, pk):
    if request.method == 'POST':
        data = json.loads(request.body)
        if pk is None:
            # New business area
            BusinessArea.objects.create(name=data['name'])
            return JsonResponse(ReturnStatus(True, 'OK').to_dict())
        else:
            # existing business area update
            try:
                ba = BusinessArea.objects.get(pk=pk)
                ba.name = data['name']
                ba.save()
            except BusinessArea.DoesNotExist:
                return JsonResponse(ReturnStatus(False, 'Key does not exist').to_dict())
            return JsonResponse(ReturnStatus(True, 'OK').to_dict())
    elif request.method == 'DELETE':
        try:
            ba = BusinessArea.objects.get(pk=pk)
            if ba.mission_set.all().count() != 0:
                return JsonResponse(ReturnStatus(
                    False,
                    'Business Areas can not be deleted while missions are '
                    'still associated with them.').to_dict())
            ba.delete()
        except BusinessArea.DoesNotExist:
            return JsonResponse(ReturnStatus(False, 'Key does not exist').to_dict())
        return JsonResponse(ReturnStatus(True, 'OK').to_dict())
def merge(self, profile):
    for account in profile.account_set.all():
        account.profile = self
        if account.user_social_auth:
            account.user_social_auth.user = self.user
            account.user_social_auth.save()

    from package.models import Project
    change = False
    for project in Project.objects.filter(usage=profile.user):
        if self.user not in project.usage.all():
            project.usage.add(self.user)
            change = True
    if change:
        cache.delete("sitewide_used_packages_list_{}".format(self.user.pk))

    # TODO: add merge of verified_by, and email
    profile.user.delete()
    profile.delete()
def refresh_auth_tokens_from_cache(self):
    """Refresh the auth token and other values from cache"""
    if self.authToken is not None and time() + MAX_BUFFER < self.authTokenExpiration:
        # no need to refresh if the current tokens are still good
        return

    tokens = cache.get(self.token_cache_key)
    if tokens:
        if not isinstance(tokens, dict):
            # something wrong was cached
            cache.delete(self.token_cache_key)
            return

        for prop, value in tokens.items():
            if prop in self.token_property_names:
                setattr(self, prop, value)

        # set the value so we can detect if it changed later
        self._old_authToken = self.authToken
        self.build_soap_client()
def delete_row(self, de_name, token=None, email=None):
    """
    Delete a row from a data extension. Either token or email is required.

    @param de_name: name of the data extension
    @param token: user's token
    @param email: user's email address
    @return: None
    """
    assert token or email, 'token or email required'
    if token:
        values = {'TOKEN': token}
    else:
        values = {'EMAIL_ADDRESS_': email}
    row = self._get_row_obj(de_name, values)
    resp = row.delete()
    assert_response(resp)
def transcode_video(public_video_id, delete=True):
    """
    Args:
        public_video_id (str)
        delete (bool): delete video on failure
    """
    with Lock('TASK_LOCK_TRANSCODE_VIDEO:' + public_video_id, 3600) as lock:
        if lock.is_acquired:
            try:
                models.invalidate_cache(public_video_id)
                _transcode_video(public_video_id, delete=delete)
            except Exception as e:
                # Store error message
                message = "\n".join([str(arg) for arg in e.args])
                models.ProcessingState.objects.filter(
                    video__public_id=public_video_id
                ).update(
                    status=models.ProcessingState.STATUS_FAILED,
                    message=message,
                )
                raise
            finally:
                models.invalidate_cache(public_video_id)
def download_once(request, ref_key=None):
    cache_key = 'document-ref-{}'.format(ref_key)
    doc_id = cache.get(cache_key)
    if not doc_id:
        raise Http404()
    cache.delete(cache_key)

    doc = get_object_or_404(Document, pk=doc_id)
    response = FileResponse(doc.retrieve(request.user, 'view'),
                            content_type=doc.mimetype)
    # Disable browser caching, so the PDF won't end up on the user's hard disk.
    response['Cache-Control'] = 'no-cache, no-store, must-revalidate'
    response['Pragma'] = 'no-cache'
    response['Expires'] = '0'
    response['Vary'] = '*'
    return response
def all_nictags(cls, clear_cache=False):
    """Return a dictionary of nictags {name: type}"""
    if clear_cache:
        cache.delete(NICTAGS_ALL_KEY)

    nictags = cache.get(NICTAGS_ALL_KEY)

    if not nictags:
        nodes = cls.objects.all()
        nictags = {}

        for node in nodes:
            for nic in node.nictags:
                nic_name = nic['name']
                nic_type = nic['type'].replace('aggr', 'normal')

                # if nictag name is already present and types are not equal
                if nic_name in nictags and nic_type != nictags[nic_name]:
                    raise ValueError('Duplicate NIC tag name with different type '
                                     'exists on another compute node!')

                nictags[nic_name] = nic_type

        cache.set(NICTAGS_ALL_KEY, nictags, NICTAGS_ALL_EXPIRES)

    return nictags
def register(self, account, password, fullname, phone, captcha):
    if not account or not password or not fullname or not phone or not captcha:
        rsp = self.rsp_handler.generate_rsp_msg(29001, None)
        return rsp

    user = User.objects(account=account)
    if user:
        rsp = self.rsp_handler.generate_rsp_msg(21001, None)
        return rsp

    key = 'register:captcha:%s' % account
    if cache.get(key) != captcha:
        rsp = self.rsp_handler.generate_rsp_msg(21002, None)
        return rsp
    cache.delete(key)

    now_time = time.strftime('%Y-%m-%d %H:%M:%S')
    user = User(account=account,
                password=self.crypt_handler.encrypt(password),
                username=fullname,
                phone=phone,
                create_time=now_time)
    user.save()

    rsp = self.rsp_handler.generate_rsp_msg(200, None)
    return rsp
def remove_permission(self, name):
    if not name:
        rsp = self.rsp_handler.generate_rsp_msg(29001, None)
        return rsp

    permissions = Permission.objects(name=name)
    if not permissions:
        rsp = self.rsp_handler.generate_rsp_msg(22002, None)
        return rsp

    permission = permissions[0]
    permission.delete()

    roles = Role.objects()
    for role in roles:
        if name in role.permissions:
            role.permissions.remove(name)
            role.save()

    rsp = self.rsp_handler.generate_rsp_msg(200, None)
    return rsp
def redeem(self, user=None):
    if (user is None and self.redeemed) or self.accesspermissions.exists():
        raise self.RedeemError('Already redeemed.')

    if timezone.now() > self.valid_until + timedelta(minutes=5 if self.redeemed else 0):
        raise self.RedeemError('No longer valid.')

    if user:
        with transaction.atomic():
            if self.author_id and self.unique_key:
                AccessPermission.objects.filter(author_id=self.author_id,
                                                unique_key=self.unique_key).delete()
            for restriction in self.restrictions:
                AccessPermission.objects.create(
                    user=user,
                    access_restriction_id=restriction.pk,
                    author_id=self.author_id,
                    expire_date=restriction.expire_date,
                    can_grant=self.can_grant,
                    unique_key=self.unique_key,
                    token=self if self.pk else None,
                )

    if self.pk and not self.unlimited:
        self.redeemed = True
        self.save()
def save(self, *args, **kwargs):
    '''
    Reset all cached infos
    '''
    self.name = smart_capitalize(self.name_original)
    super(Exercise, self).save(*args, **kwargs)

    # Cached objects
    cache.delete(cache_mapper.get_exercise_muscle_bg_key(self))

    # Cached template fragments
    for language in Language.objects.all():
        delete_template_fragment_cache('muscle-overview', language.id)
        delete_template_fragment_cache('exercise-overview', language.id)
        delete_template_fragment_cache('exercise-overview-mobile', language.id)
        delete_template_fragment_cache('equipment-overview', language.id)

    # Cached workouts
    for set in self.set_set.all():
        reset_workout_canonical_form(set.exerciseday.training_id)
def delete(self, *args, **kwargs):
    '''
    Reset all cached infos
    '''
    # Cached objects
    cache.delete(cache_mapper.get_exercise_muscle_bg_key(self))

    # Cached template fragments
    for language in Language.objects.all():
        delete_template_fragment_cache('muscle-overview', language.id)
        delete_template_fragment_cache('exercise-overview', language.id)
        delete_template_fragment_cache('exercise-overview-mobile', language.id)
        delete_template_fragment_cache('equipment-overview', language.id)

    # Cached workouts
    for set in self.set_set.all():
        reset_workout_canonical_form(set.exerciseday.training.pk)

    super(Exercise, self).delete(*args, **kwargs)
def delete(self, *args, **kwargs):
    '''
    Reset all cached infos
    '''
    super(ExerciseImage, self).delete(*args, **kwargs)

    for language in Language.objects.all():
        delete_template_fragment_cache('muscle-overview', language.id)
        delete_template_fragment_cache('exercise-overview', language.id)
        delete_template_fragment_cache('exercise-overview-mobile', language.id)
        delete_template_fragment_cache('equipment-overview', language.id)

    # Make sure there is always a main image
    if not ExerciseImage.objects.accepted() \
            .filter(exercise=self.exercise, is_main=True).count() \
            and ExerciseImage.objects.accepted() \
            .filter(exercise=self.exercise) \
            .filter(is_main=False) \
            .count():
        image = ExerciseImage.objects.accepted() \
            .filter(exercise=self.exercise, is_main=False)[0]
        image.is_main = True
        image.save()
def remove_obj_perms_connected_with_user(sender, instance, **kwargs):
    """
    Remove user's permissions upon user deletion (``pre_delete.connect``).

    :param sender: sender object
    :param instance: user instance
    :param kwargs: dictionary argument
    :return: None
    """
    filters = Q(content_type=ContentType.objects.get_for_model(instance),
                object_pk=instance.pk)
    UserObjectPermission.objects.filter(filters).delete()
    GroupObjectPermission.objects.filter(filters).delete()
    if instance.profile:
        instance.profile.delete()
    if instance.preference:
        for i in instance.preference.all():
            i.delete()
def page_changed(sender, **kwargs):
    """
    post_save receiver for Page model:

    * clears Page-related cache keys,
    * refreshes mappings: alias <-> page real url,
    * clears cache keys related to PageChanges.
    """
    page = kwargs['instance']
    cache_key = cachekeys.template_source(page.pk)
    cache.delete(cache_key)
    PageURLCache.refresh()
def sensu_client_list():
    API_URL = settings.SENSU_API_URL + '/clients'
    userAndPass = base64.b64encode(
        str.encode("%s:%s" % (settings.SENSU_API_USER, settings.SENSU_API_PASSWORD))
    ).decode("ascii")
    headers = {
        'X_REQUESTED_WITH': 'XMLHttpRequest',
        'Accept': 'application/json, text/javascript, */*; q=0.01',
        'Authorization': 'Basic %s' % userAndPass
    }

    try:
        request = http.request('GET', API_URL, None, headers, preload_content=False)
        response = request.status
        if response == 200:
            reader = codecs.getreader('utf-8')
            data = json.load(reader(request))
            request.release_conn()
        else:
            logger.error('response: %s' % str(response))
    except:
        logger.error("sensu_client_list failed")
        raise

    subscriptions = []
    [r.delete(subscription) for subscription in r.keys("subscription_*")]
    # [ cache.delete(client) for client in cache.keys("client_*") ]

    for object in data:
        cache.set('client_' + object['name'], object,
                  timeout=settings.CACHE_CLIENT_TTL + 300)
        if 'subscriptions' in object:
            subscriptions.extend(object['subscriptions'])
            for subscription in object['subscriptions']:
                logger.debug("sensu_client_list update subscription_%s adding %s"
                             % (subscription, object['name']))
                r.rpush('subscription_' + subscription, object['name'])

    cache.set('subscriptions', list(set(subscriptions)),
              timeout=settings.CACHE_CLIENT_TTL + 300)
def user_rules(message):
    users_rules = {}

    try:
        for obj in Rule.objects.filter(owner=message['user_id']):
            if obj.owner.id not in users_rules:
                users_rules[obj.owner.id] = {}
            for status in obj.status:
                if int(status) not in users_rules[obj.owner.id]:
                    users_rules[obj.owner.id][int(status)] = []
                users_rules[obj.owner.id][int(status)].append(obj.regex_string)
    except:
        logger.error("user_rules failed")
        raise

    for user_id in users_rules:
        for status in users_rules[user_id]:
            uni_regex = '|'.join(users_rules[user_id][status])
            logger.debug("user_rules build rule for user: %s status: %s" % (user_id, status))
            r.set('regexrule_%s_%s' % (user_id, status),
                  pickle.dumps(re.compile(r'(?:%s)' % uni_regex, re.IGNORECASE)))
        for status in [1, 2]:
            if status not in users_rules[user_id]:
                logger.debug("user_rules clean rule for user: %s status: %s" % (user_id, status))
                r.delete('regexrule_%s_%s' % (user_id, status))
    return
def unlock(self):
    if not self.scratch:
        cache.delete(self.lock_id)
def delete_scratch_orgs():
    reset_database_connection()

    from metaci.cumulusci.models import ScratchOrgInstance

    count = 0
    for org in ScratchOrgInstance.objects.filter(deleted=False, delete_error__isnull=False):
        delete_scratch_org.delay(org.id)
        count += 1

    if not count:
        return 'No orgs found to delete'

    return 'Scheduled deletion attempts for {} orgs'.format(count)
def delete_scratch_org(org_instance_id):
    reset_database_connection()

    from metaci.cumulusci.models import ScratchOrgInstance
    try:
        org = ScratchOrgInstance.objects.get(id=org_instance_id)
    except ScratchOrgInstance.DoesNotExist:
        return 'Failed: could not find ScratchOrgInstance with id {}'.format(
            org_instance_id)

    org.delete_org()
    if org.deleted:
        return 'Deleted org instance #{}'.format(org.id)
    else:
        return 'Failed to delete org instance #{}'.format(org.id)
def test_delete(self):
    # Cache keys can be deleted
    cache = self.cache
    cache.set("key1", "spam")
    cache.set("key2", "eggs")
    self.assertEqual(cache.get("key1"), "spam")
    cache.delete("key1")
    self.assertIsNone(cache.get("key1"))
    self.assertEqual(cache.get("key2"), "eggs")
def test_unicode(self):
    # Unicode values can be cached
    cache = self.cache
    stuff = {
        'ascii': 'ascii_value',
        'unicode_ascii': 'Iñtërnâtiônàlizætiøn1',
        'Iñtërnâtiônàlizætiøn': 'Iñtërnâtiônàlizætiøn2',
        'ascii2': {'x': 1}
    }

    # Test `set`
    for (key, value) in stuff.items():
        cache.set(key, value)
        self.assertEqual(cache.get(key), value)

    # Test `add`
    for (key, value) in stuff.items():
        cache.delete(key)
        cache.add(key, value)
        self.assertEqual(cache.get(key), value)

    # Test `set_many`
    for (key, value) in stuff.items():
        cache.delete(key)
    cache.set_many(stuff)
    for (key, value) in stuff.items():
        self.assertEqual(cache.get(key), value)
def test_cache_versioning_delete(self):
    cache = self.cache
    cache2 = LRUObjectCache('lru2', dict(VERSION=2))
    cache2._cache = cache._cache

    cache.set('answer1', 37, version=1)
    cache.set('answer1', 42, version=2)
    cache.delete('answer1')
    self.assertIsNone(cache.get('answer1', version=1))
    self.assertEqual(cache.get('answer1', version=2), 42)

    cache.set('answer2', 37, version=1)
    cache.set('answer2', 42, version=2)
    cache.delete('answer2', version=2)
    self.assertEqual(cache.get('answer2', version=1), 37)
    self.assertIsNone(cache.get('answer2', version=2))

    cache.set('answer3', 37, version=1)
    cache.set('answer3', 42, version=2)
    cache2.delete('answer3')
    self.assertEqual(cache.get('answer3', version=1), 37)
    self.assertIsNone(cache.get('answer3', version=2))

    cache.set('answer4', 37, version=1)
    cache.set('answer4', 42, version=2)
    cache2.delete('answer4', version=1)
    self.assertIsNone(cache.get('answer4', version=1))
    self.assertEqual(cache.get('answer4', version=2), 42)
def save(self, *args, **kwargs):
    super(PermissionSet, self).save(*args, **kwargs)
    # FIXME: can we use `post_save` signals or invalidate caches in model
    # managers, please?
    key = iri_to_uri('Permissions:%s' % self.user.username)
    cache.delete(key)
def delete(self, *args, **kwargs):
    super(PermissionSet, self).delete(*args, **kwargs)
    # FIXME: can we use `post_delete` signals or invalidate caches in model
    # managers, please?
    key = iri_to_uri('Permissions:%s' % self.user.username)
    cache.delete(key)
def invalidate_accessible_projects_cache(**kwargs):
    instance = kwargs["instance"]
    # XXX: maybe use custom signals or simple function calls?
    if (instance.__class__.__name__ not in
            ['Project', 'TranslationProject', 'PermissionSet']):
        return

    cache.delete_pattern(make_method_key('Project', 'cached_dict', '*'))
    cache.delete('projects:all')
    cache.delete_pattern('projects:accessible:*')
def post(self, request, *args, **kwargs):
    # Delete cached data for the legends
    legend_top_key = make_template_fragment_key('legend_partial_top')
    cache.delete(legend_top_key)
    legend_bottom_key = make_template_fragment_key('legend_partial_bottom')
    cache.delete(legend_bottom_key)

    # Delete cached data for the host format string
    cache.delete('host_output_format_string')

    return super(UpdateDynamicSettingsView, self).post(request, *args, **kwargs)
def get_context_data(self, **kwargs):
    logger.debug('GET: DeleteMissionView (Confirm delete {mission_id})'
                 .format(mission_id=self.object.id))
    return super(DeleteMissionView, self).get_context_data(**kwargs)
def save(self, *args, **kwargs):
    # reset the last_updated and commits_over_52 caches on the package
    package = self.package
    cache.delete(package.cache_namer(self.package.last_updated))
    cache.delete(package.cache_namer(package.commits_over_52))
    self.package.last_updated()
    super(Commit, self).save(*args, **kwargs)
def save(self, *args, **kwargs):
    self.license = normalize_license(self.license)

    # reset the latest_version cache on the package
    cache_name = self.package.cache_namer(self.package.last_released)
    cache.delete(cache_name)
    get_version(self.package)

    # reset the pypi_version cache on the package
    cache_name = self.package.cache_namer(self.package.pypi_version)
    cache.delete(cache_name)
    get_pypi_version(self.package)

    super(Version, self).save(*args, **kwargs)
def invalidate_user_cache(sender, **kwargs):
    key = generate_cache_key(kwargs.get('instance'))
    cache.delete(key)
def invalidate_group_cache(sender, **kwargs):
    users = kwargs.get('instance').user_set.all()
    for user in users:
        key = generate_cache_key(user)
        cache.delete(key)
def invalidate_user_m2m_cache(sender, **kwargs):
    key = generate_cache_key(kwargs.get('instance'))
    cache.delete(key)
def ready(self):
    from django.shortcuts import resolve_url
    from django.core.cache import cache
    from libs.js_storage import JS_STORAGE

    cache.delete('attachable_block_types')

    JS_STORAGE.update({
        'ajax_attached_block': resolve_url('blocks:ajax'),
    })
def clear_newsletter_cache(*args, **kwargs):
    cache.delete(CACHE_KEY)
def clear_sms_cache(*args, **kwargs):
    cache.delete(SMS_CACHE_KEY)
def delete_user(modeladmin, request, queryset):
    for world in queryset:
        user = User.objects.get(id=world.worldid)
        user.is_active = False
        user.save()
        world.delete()
def ban_user(modeladmin, request, queryset):
    for world in queryset:
        banip = world.lastloggedinip
        user = User.objects.get(id=world.worldid)
        user.is_active = False
        user.save()
        world.delete()
        try:
            Ban.objects.create(address=banip, reason='Multying')
        except:
            pass
def clear_ban(modeladmin, request, queryset):
    for ban in queryset:
        cache.delete('BAN:' + ban.address)
        ban.delete()
def invalidate(public_video_id):
    cache.delete(_cache_key(public_video_id))
def release_lock(name):
    """
    Release a lock for all.

    Note that the lock will be released even if it was never acquired.
    """
    # Note that in unit tests, and in case the wrapped code raises an
    # IntegrityError, releasing the cache will result in a
    # TransactionManagementError. This is because unit tests run inside atomic
    # blocks. We cannot execute queries inside an atomic block if a transaction
    # needs to be rolled back.
    try:
        cache.delete(name)
    except TransactionManagementError:
        logger.error("Could not release lock %s", name)
def transcode_video_restart():
    with Lock('TASK_LOCK_TRANSCODE_VIDEO_RESTART', 60) as lock:
        if lock.is_acquired:
            for processing_state in models.ProcessingState.objects.filter(
                    status=models.ProcessingState.STATUS_RESTART):
                send_task('transcode_video',
                          args=(processing_state.video.public_id,),
                          kwargs={'delete': False})
def clean_upload_urls():
    """
    Remove video upload urls which cannot be used anymore.
    """
    models.VideoUploadUrl.objects.obsolete().delete()
def clean_old_cache_content():
    """Clean CACHE data from old versions of django-modern-rpc"""
    cache.delete('__rpc_registry__', version=1)
def expire_json_cache(system_name):
    cache.delete('json_cache:%s' % system_name)
def cache_evict(sender, **kwargs):
    """
    Signal for updating a model instance in the cache; any Model class using
    this signal must have a uniquely identifying 'cache_key' property.
    """
    item = kwargs.get('instance')
    cache.delete(item.cache_key)
def cache_meeting_page(timeout=60 * 10, render_timeout=15):
    def _decorator(fn):
        @wraps(fn)
        def _inner(*args, **kwargs):
            meeting_pk = kwargs.pop('meeting_pk')
            # getting the meeting from the database is an implicit permission check
            meeting = get_object_or_404(Meeting, pk=meeting_pk)
            kwargs['meeting'] = meeting
            cache_key = cache_key_for_meeting_page(meeting, fn)
            lock_key = cache_key + ':render-lock'
            while True:
                html = cache.get(cache_key)
                if html is None:
                    if cache.add(lock_key, 'in-progress', render_timeout):
                        break
                    else:
                        time.sleep(1)
                else:
                    return HttpResponse(html)
            try:
                html = fn(*args, **kwargs)
                cache.set(cache_key, html, timeout)
                return HttpResponse(html)
            finally:
                cache.delete(lock_key)
        return _inner
    return _decorator
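A hedged sketch of how a view might use the decorator above; the view name, URL keyword argument, and template path are illustrative assumptions, not from the original project.

from django.template.loader import render_to_string

# Hypothetical view: the URLconf passes meeting_pk, the decorator resolves it
# to a Meeting, injects it as `meeting`, and wraps the returned HTML string
# in an HttpResponse (caching it under the meeting's cache key).
@cache_meeting_page(timeout=60 * 10)
def meeting_detail(request, meeting):
    return render_to_string('meetings/detail.html', {'meeting': meeting})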