我们从 Python 开源项目中,提取了以下 50 个代码示例,用于说明如何使用 django.utils.timezone.now()。
def make_inactive_productlist_query(queryset): now = timezone.now() # Create a query of things are definitively inactive. Some of the ones # filtered here might be out of stock, but we include that later. inactive_candidates = ( queryset .exclude( Q(active=True) & (Q(deactivate_date=None) | Q(deactivate_date__gte=now))) .values("id") ) inactive_out_of_stock = ( queryset .filter(sale__timestamp__gt=F("start_date")) .annotate(c=Count("sale__id")) .filter(c__gte=F("quantity")) .values("id") ) return ( queryset .filter( Q(id__in=inactive_candidates) | Q(id__in=inactive_out_of_stock)) )
def send(self, request):
    """Send an invitation mail to ``self.user``, rate-limited to one mail
    per 12 hours; report failures through the Django messages framework.
    """
    if (not self.last_email) or self.last_email + timedelta(hours=12) < now():
        # TODO: make the 12-hour TIMEDELTA configurable
        old_lang = translation.get_language()
        # Render the mail in the recipient's language, not the sender's.
        translation.activate(self.user.language)
        link = reverse('poll_vote', args=(self.poll.url,))
        # TODO: link directly to the poll or to the vote here?
        email_content = render_to_string('invitations/mail_invite.txt', {
            'receiver': self.user.username,
            'creator': self.creator.username,
            'link': link})
        try:
            send_mail("Invitation to vote on {}".format(self.poll.title), email_content, None, [self.user.email])
            # Only record the send time once the mail was accepted.
            self.last_email = now()
            self.save()
        except SMTPRecipientsRefused:
            # NOTE(review): _() is applied to an already-formatted string,
            # so this message cannot be found in the translation catalog.
            translation.activate(old_lang)
            messages.error(
                request,
                _("The mail server had an error sending the notification to {}".format(self.user.username))
            )
        # Restore the language that was active before rendering the mail.
        translation.activate(old_lang)
    else:
        messages.error(
            request,
            _("You have send an Email for {} in the last 12 Hours".format(self.user.username))
        )
def razzia_wizard(request):
    """Render the razzia date/product selection wizard.

    On POST, redirect to the razzia view with the chosen parameters encoded
    in the query string; on GET, render the wizard with suggested dates
    covering the last half year.
    """
    if request.method == 'POST':
        return redirect(
            reverse("razzia_view") + "?start={0}-{1}-{2}&end={3}-{4}-{5}&products={6}&username=&razzia_title={7}"
            .format(int(request.POST['start_year']), int(request.POST['start_month']),
                    int(request.POST['start_day']), int(request.POST['end_year']),
                    int(request.POST['end_month']), int(request.POST['end_day']),
                    request.POST.get('products'), request.POST.get('razzia_title')))

    # BUG FIX: the original subtracted timedelta(days=-180), which produced a
    # suggested *start* date 180 days in the future. Subtract a positive 180
    # days to suggest the last half year instead.
    suggested_start_date = timezone.now() - datetime.timedelta(days=180)
    suggested_end_date = timezone.now()

    start_date_picker = fields.DateField(
        widget=extras.SelectDateWidget(years=[x for x in range(2000, timezone.now().year + 1)]))
    end_date_picker = fields.DateField(
        widget=extras.SelectDateWidget(years=[x for x in range(2000, timezone.now().year + 1)]))

    return render(request, 'admin/stregsystem/razzia/wizard.html',
                  {
                      'start_date_picker': start_date_picker.widget.render("start", suggested_start_date),
                      'end_date_picker': end_date_picker.widget.render("end", suggested_end_date)},
                  )
def nodeinfo_view(request):
    """Generate a NodeInfo document.

    Usage statistics are only included when SOCIALHOME_STATISTICS is on;
    otherwise an empty users dict is published.
    """
    site = Site.objects.get_current()
    usage = {"users": {}}
    if settings.SOCIALHOME_STATISTICS:
        usage = {
            "users": {
                "total": User.objects.count(),
                # Users who logged in within the last ~half year / month.
                "activeHalfyear": User.objects.filter(
                    last_login__gte=now() - datetime.timedelta(days=180)).count(),
                "activeMonth": User.objects.filter(
                    last_login__gte=now() - datetime.timedelta(days=30)).count(),
            },
            # Only content by local users (author has a linked User) counts.
            "localPosts": Content.objects.filter(
                author__user__isnull=False, content_type=ContentType.CONTENT).count(),
            "localComments": Content.objects.filter(
                author__user__isnull=False, content_type=ContentType.REPLY).count(),
        }
    nodeinfo = NodeInfo(
        software={"name": "socialhome", "version": version},
        protocols={"inbound": ["diaspora"], "outbound": ["diaspora"]},
        services={"inbound": [], "outbound": []},
        open_registrations=settings.ACCOUNT_ALLOW_REGISTRATION,
        usage=usage,
        metadata={"nodeName": site.name}
    )
    return JsonResponse(nodeinfo.doc)
def test_view_responds_stats_on(self):
    """With statistics enabled, the nodeinfo document must expose the
    expected usage block (user counts and local post/comment counts)."""
    self.get(NODEINFO_DOCUMENT_PATH)
    self.response_200()
    # Expected values are recomputed from the DB with the same filters the
    # view uses, so this asserts structure more than exact numbers.
    self.assertEqual(
        json.loads(decode_if_bytes(self.last_response.content))["usage"],
        {
            "users": {
                "total": User.objects.count(),
                "activeHalfyear": User.objects.filter(
                    last_login__gte=now() - datetime.timedelta(days=180)).count(),
                "activeMonth": User.objects.filter(
                    last_login__gte=now() - datetime.timedelta(days=30)).count(),
            },
            "localPosts": Content.objects.filter(
                author__user__isnull=False, content_type=ContentType.CONTENT).count(),
            "localComments": Content.objects.filter(
                author__user__isnull=False, content_type=ContentType.REPLY).count(),
        }
    )
def mention_event(project, group, tenant, event=None):
    """Record that (group, event) was mentioned for ``tenant`` in Redis.

    Maintains a per-tenant sorted set of recent mentions (scored by
    timestamp, trimmed by age and by size) plus one JSON payload per
    mention with a matching TTL.
    """
    ts = to_timestamp(timezone.now())
    id = '%s/%s' % (group.id, event.id if event is not None else '-')
    item = json.dumps(
        {
            'project': project.id,
            'group': group.id,
            'event': event.id if event is not None else None,
            'last_mentioned': ts,
        }
    )
    # Keep entries slightly longer than the recency window itself.
    expires = (RECENT_HOURS + 1) * 60 * 60
    with cluster.map() as client:
        key = get_key(tenant)
        client.zadd(key, ts, id)
        client.expire(key, expires)
        client.setex('%s:%s' % (key, id), expires, item)
        # BUG FIX: the original trimmed with RECENT_HOURS * 60 (minutes),
        # inconsistent with the TTL above; hours convert to seconds with
        # * 60 * 60.
        client.zremrangebyscore(key, '-inf', time.time() - (RECENT_HOURS * 60 * 60))
        # Cap the set at MAX_RECENT entries, dropping the oldest.
        client.zremrangebyrank(key, 0, -MAX_RECENT - 1)
def test_create_oauth2_token(self):
    """An OAuth2 application and a matching access token can be created."""
    staff_user = mixer.blend('auth.User', is_staff=True, is_superuser=True)

    oauth_app = Application.objects.create(
        name='SuperAPI OAUTH2 APP',
        user=staff_user,
        client_type=Application.CLIENT_PUBLIC,
        authorization_grant_type=Application.GRANT_PASSWORD,
    )
    assert Application.objects.count() == 1, "Should be equal"

    # Token value: random prefix + owner's username, valid for 5 minutes.
    token_prefix = get_random_string(length=16)
    issued_token = AccessToken.objects.create(
        user=staff_user,
        scope='read write',
        expires=timezone.now() + timedelta(minutes=5),
        token=f'{token_prefix}---{staff_user.username}',
        application=oauth_app,
    )
    assert issued_token is not None, "??? ???"
def create_modifications(cls, instance, previous, current):
    """Persist one FieldModification per field that differs between the
    ``previous`` and ``current`` state dicts of ``instance``.

    Missing keys read as ``None`` via defaultdict; keys present in only one
    of the two dicts are always recorded as modified.
    """
    prev = defaultdict(lambda: None, previous)
    curr = defaultdict(lambda: None, current)
    # Compute difference between previous and current
    diffkeys = set([k for k in prev if prev[k] != curr[k]])
    in_previous_not_current = set([k for k in prev if k not in curr])
    in_current_not_previous = set([k for k in curr if k not in prev])
    diffkeys = diffkeys.union(in_previous_not_current).union(in_current_not_previous)
    # Single timestamp so all rows of this change set share the same time.
    current_datetime = timezone.now()
    for key in diffkeys:
        FieldModification.objects.create(
            field_name=key,
            previous_value=prev[key],
            current_value=curr[key],
            content_object=instance,
            created=current_datetime,
        )
def make_active_productlist_query(queryset): now = timezone.now() # Create a query for the set of products that MIGHT be active. Might # because they can be out of stock. Which we compute later active_candidates = ( queryset .filter( Q(active=True) & (Q(deactivate_date=None) | Q(deactivate_date__gte=now))) ) # This query selects all the candidates that are out of stock. candidates_out_of_stock = ( active_candidates .filter(sale__timestamp__gt=F("start_date")) .annotate(c=Count("sale__id")) .filter(c__gte=F("quantity")) .values("id") ) # We can now create a query that selects all the candidates which are not # out of stock. return ( active_candidates .exclude( Q(start_date__isnull=False) & Q(id__in=candidates_out_of_stock)))
def test_update_and_stop_seeding_that_seeding_return_none(self, mock_get_torrent, mock_hset):
    """A torrent past the 24h threshold that is still seeding gets stopped;
    the task returns None and zeroes the cached upload rate."""
    mock_get_torrent.return_value = self.torrent(
        status='seeding',
        progress=Decimal('100.00'),
        ratio=Decimal('9.99'),
        rateUpload=10500,
        rateDownload=105000,
        stop=mock_stop
    )
    # Backdate the model to just past 24 hours old.
    self.torrent_model.created = timezone.now() + timezone.timedelta(hours=-24, seconds=-1)
    self.torrent_model.save()
    self.assertIsNone(update_and_stop_seeding(self.torrent_model.pk))
    mock_get_torrent.assert_called_with(self.torrent_model.hash)
    # The redis hash entry for the torrent must record a zero upload rate.
    mock_hset.assert_called_with('torrent:{}'.format(self.torrent_model.pk), 'rate_upload', 0)
def _sync_report(self, h1_report, now):
    """Create or update the local Report row for one HackerOne report,
    then sync its bounties and activities."""
    self.stdout.write(f"Synchronizing #{h1_report.id}.")
    scope = h1_report.structured_scope
    # Create or update the report
    report, created = Report.objects.update_or_create(
        defaults=dict(
            title=h1_report.title,
            created_at=h1_report.created_at,
            triaged_at=h1_report.triaged_at,
            closed_at=h1_report.closed_at,
            disclosed_at=h1_report.disclosed_at,
            state=h1_report.state,
            issue_tracker_reference_url=h1_report.issue_tracker_reference_url or "",
            weakness=h1_report.weakness.name if h1_report.weakness else "",
            # scope may be None; `scope and scope.x` then stores None.
            asset_identifier=scope and scope.asset_identifier,
            asset_type=scope and scope.asset_type,
            is_eligible_for_bounty=scope and scope.eligible_for_bounty,
            last_synced_at=now,
        ),
        id=h1_report.id
    )
    self._sync_bounties(report, h1_report)
    self._sync_activities(report, h1_report)
def build_rebuild(request, build_id):
    """Queue a rebuild of an existing build; staff only."""
    build = get_object_or_404(Build, id=build_id)

    if not request.user.is_staff:
        return HttpResponseForbidden(
            'You are not authorized to rebuild builds')

    queued_rebuild = Rebuild(
        build=build,
        user=request.user,
        status='queued',
    )
    queued_rebuild.save()

    # Append a restart marker to the build log, initialising it if empty.
    if not build.log:
        build.log = ''
    build.log += '\n=== Build restarted at {} by {} ===\n'.format(
        timezone.now(), request.user.username)
    build.current_rebuild = queued_rebuild
    build.save()

    return HttpResponseRedirect('/builds/{}'.format(build.id))
def handle(self, *args, **options):
    """Ensure the check_waiting_builds repeatable job exists (idempotent)."""
    job_defaults = {
        'interval': 1,
        'interval_unit': 'minutes',
        'scheduled_time': timezone.now(),
    }
    job, created = RepeatableJob.objects.get_or_create(
        callable='metaci.build.tasks.check_waiting_builds',
        enabled=True,
        name='check_waiting_builds',
        queue='short',
        defaults=job_defaults,
    )
    # Report whether the job was freshly created or already present.
    if created:
        message = 'Created job check_waiting_builds with id {}'.format(job.id)
    else:
        message = 'Scheduled job check_waiting_builds with id {} already exists and is {}.'.format(
            job.id, 'enabled' if job.enabled else 'disabled')
    self.stdout.write(self.style.SUCCESS(message))
def teste_cenario_1(self):
    """Scenario 1: valid title, start = today, end = today + 10 days.

    publicar() must return None (no validation error).
    """
    inicio = timezone.now()
    fim = inicio + datetime.timedelta(days=10)

    Usuario(nome="Test User").save()
    Localidade(nome="Web").save()

    anunciante = Usuario.objects.get(nome="Test User")
    localidade = Localidade.objects.get(nome="Web")

    anuncio = Anuncio(anunciante=anunciante,
                      titulo="Choppada Engenharia Eletrônica",
                      descricao="",
                      data_inicio=inicio,
                      data_fim=fim,
                      localidade=localidade)
    self.assertIs(anuncio.publicar(), None)

####################################################
# Scenario 2:
#
# Title: Choppada of several engineering courses, Psychology and Social
#        Sciences all at once (invalid: too long)
# Start date: current date (valid)
# End date: current date + 10 days (valid)
####################################################
def teste_cenario_2(self):
    """Scenario 2: over-long title (invalid), valid start/end dates.

    publicar() must return a non-None validation error.
    """
    Usuario(nome="Test User").save()
    Localidade(nome="Web").save()

    anunciante = Usuario.objects.get(nome="Test User")
    localidade = Localidade.objects.get(nome="Web")

    inicio = timezone.now()
    fim = inicio + datetime.timedelta(days=10)
    titulo = "Choppada de Engenharia Eletrônica, de Engenharia de Controle e Automação, de Engenharia de Computação e Informação, de Engenharia de Produção, de Engenharia Metalúrgica, de Psicologia e de Ciências Sociais"

    anuncio = Anuncio(anunciante=anunciante, titulo=titulo,
                      data_inicio=inicio, data_fim=fim,
                      localidade=localidade)
    self.assertIsNot(anuncio.publicar(), None)

####################################################
# Scenario 3:
#
# Title: Choppada Engenharia Eletrônica (valid)
# Start date: blank (invalid)
# End date: current date + 10 days (valid)
####################################################
def teste_cenario_3(self):
    """Scenario 3: missing start date (invalid), end = today + 10 days.

    publicar() must return a non-None validation error.
    """
    Usuario(nome="Test User").save()
    Localidade(nome="Web").save()

    anunciante = Usuario.objects.get(nome="Test User")
    localidade = Localidade.objects.get(nome="Web")

    fim = timezone.now() + datetime.timedelta(days=10)
    titulo = "Choppada de Engenharia Eletrônica, de Engenharia de Controle e Automação, de Engenharia de Computação e Informação, de Engenharia de Produção, de Engenharia Metalúrgica, de Psicologia e de Ciências Sociais"

    # No data_inicio is passed on purpose.
    anuncio = Anuncio(anunciante=anunciante, titulo=titulo,
                      data_fim=fim, localidade=localidade)
    self.assertIsNot(anuncio.publicar(), None)

####################################################
# Scenario 4:
#
# Title: Choppada Engenharia Eletrônica (valid)
# Start date: blank (invalid)
# End date: current date + 10 days (valid)
####################################################
def filter_time(self, queryset, name, value):
    """
    Filter to valid parkings at given time stamp.

    If there is no valid parkings at given time, but there is a parking
    within a day from given time, then return the parking that has the
    latest ending time.

    :type queryset: parkings.models.ParkingQuerySet
    :type name: str
    :type value: datetime.datetime
    """
    moment = value if value else timezone.now()

    currently_valid = queryset.valid_at(moment)
    if currently_valid:
        return currently_valid

    # Nothing valid right now: fall back to the single parking with the
    # latest end time that started before `moment` and ended after the
    # visibility cut-off.
    cutoff = moment - get_time_old_parkings_visible()
    recently_ended = queryset.starts_before(moment).ends_after(cutoff)
    return recently_ended.order_by('-time_end')[:1]
def send(self):
    """Queue this mail for asynchronous sending; refuse to send twice."""
    if self.sent:
        raise Exception('This mail has been sent already. It cannot be sent again.')

    from byro.mails.send import mail_send_task

    task_kwargs = {
        'to': self.to.split(','),
        'subject': self.subject,
        'body': self.text,
        'sender': self.reply_to,
        # cc/bcc may be None; normalise to '' before splitting.
        'cc': (self.cc or '').split(','),
        'bcc': (self.bcc or '').split(','),
    }
    mail_send_task.apply_async(kwargs=task_kwargs)

    self.sent = now()
    self.save()
def execute(self):
    """
    Execute the PreparedBillingAgreement by creating and executing a
    matching BillingAgreement.
    """
    # Record the attempt time before executing: if execute() fails,
    # executed_at is set while executed_agreement stays unset.
    self.executed_at = now()
    self.save()
    with transaction.atomic():
        agreement = BillingAgreement.execute(self.id)
        agreement.user = self.user
        agreement.save()
        self.executed_agreement = agreement
        self.save()
    return agreement
def check_identity(self, token):
    """
    Lookup token on identity service and create/update local user.
    """
    logger.info("checking identity server {}".format(settings.KEL["IDENTITY_URL"]))
    resp = requests.get(
        "{}/tokeninfo/".format(settings.KEL["IDENTITY_URL"]),
        params={"access_token": token},
    )
    if not resp.ok:
        # Token unknown or rejected by the identity server.
        return None
    payload = resp.json()
    username = payload["user"]["username"]
    with transaction.atomic():
        user = next(iter(User.objects.filter(username=username)), None)
        if user is None:
            user = User.objects.create(username=username)
        else:
            # Existing user: refresh the last-login timestamp.
            user.last_login = timezone.now()
            user.save()
    return user
def customer_monthly_usage(subscription_start, start=None, end=None):
    """Yield labelled monthly usage dicts from ``start`` (defaults to the
    subscription start) until ``end`` (defaults to now)."""
    start = subscription_start if start is None else start
    # regardless of start, the monthly iterator must use subscription_start
    # for the sake of enumerating.
    periods = iter_months(start=subscription_start, end=end or timezone.now())
    for month_no, usage in enumerate(usage_for_periods(periods), 1):
        # Skip months that ended before the requested window.
        if usage['period']['end'] <= start:
            continue
        usage.update(
            labels=dict(
                year_month=label_year_month_m(month_no),
                year_quarter=label_year_quarter_m(month_no),
                year=label_year_m(month_no),
            ),
        )
        yield usage
def customer_quarterly_usage(subscription_start, start=None, end=None):
    """Yield labelled quarterly usage dicts from ``start`` (defaults to the
    subscription start) until ``end`` (defaults to now)."""
    start = subscription_start if start is None else start
    # regardless of start, the quarterly iterator must use subscription_start
    # for the sake of enumerating.
    periods = iter_quarters(start=subscription_start, end=end or timezone.now())
    for quarter_no, usage in enumerate(usage_for_periods(periods), 1):
        # Skip quarters that ended before the requested window.
        if usage['period']['end'] <= start:
            continue
        usage.update(
            labels=dict(
                year_quarter=label_year_quarter_q(quarter_no),
                year=label_year_q(quarter_no),
            ),
        )
        yield usage
def customer_yearly_usage(subscription_start, start=None, end=None):
    """Yield labelled yearly usage dicts from ``start`` (defaults to the
    subscription start) until ``end`` (defaults to now)."""
    start = subscription_start if start is None else start
    # regardless of start, the yearly iterator must use subscription_start
    # for the sake of enumerating.
    periods = iter_years(start=subscription_start, end=end or timezone.now())
    for year_no, usage in enumerate(usage_for_periods(periods), 1):
        # Skip years that ended before the requested window.
        if usage['period']['end'] <= start:
            continue
        usage.update(
            labels=dict(
                year=label_year_y(year_no)
            ),
        )
        yield usage
def post(self, request, server_id):
    """Sync endpoint: accept character/clan payloads for a server that
    authenticates via its private_secret query parameter."""
    if 'private_secret' not in request.GET:
        return HttpResponse('missing required param private_secret', status=400)

    server = (Server.objects
              .filter(id=server_id, private_secret=request.GET['private_secret'])
              .first())
    if server is None:
        return HttpResponse('server does not exist', status=404)

    payload = json.loads(request.body)
    # Fan out the heavy lifting to background tasks.
    if 'characters' in payload:
        sync_characters_task.delay(server.id, payload['characters'], request.GET)
    if 'clans' in payload:
        sync_clans_task.delay(server.id, payload['clans'])

    server.last_sync = timezone.now()
    server.save()
    delete_old_history.delay()
    return HttpResponse(status=200)
def save(self, *args, **kwargs): """ Guarda el objeto en BD, en realidad lo único que hace es actualizar los datetimes. El datetime de actualización se actualiza siempre, el de creación sólo al guardar de nuevas. """ # Datetime con el momento actual en UTC now_datetime = datetime.datetime.now() # Si no se ha guardado aún, el datetime de creación es la fecha actual if not self.id: self.creation_datetime = localize_datetime(now_datetime) # El datetime de actualización es la fecha actual self.last_update_datetime = localize_datetime(now_datetime) # Llamada al constructor del padre super(VPOSPaymentOperation, self).save(*args, **kwargs) #################################################################### #################################################################### # Excepción para indicar que la operación charge ha devuelto una respuesta incorrecta o de fallo
def save(self, *args, **kwargs): """ Guarda el objeto en BD, en realidad lo único que hace es actualizar los datetimes. El datetime de actualización se actualiza siempre, el de creación sólo al guardar de nuevas. """ # Datetime con el momento actual en UTC now_datetime = datetime.datetime.now() # Si no se ha guardado aún, el datetime de creación es la fecha actual if not self.id: self.creation_datetime = localize_datetime(now_datetime) # El datetime de actualización es la fecha actual self.last_update_datetime = localize_datetime(now_datetime) # Llamada al constructor del padre super(VPOSRefundOperation, self).save(*args, **kwargs) ######################################################################################################################## ######################################################################################################################## ####################################################### TPV Ceca ####################################################### ######################################################################################################################## ########################################################################################################################
def save(self, *args, **kwargs):
    """Fill ``date_taken`` from EXIF DateTimeOriginal (or now) before saving.

    EXIF timestamps look like "YYYY:MM:DD HH:MM:SS"; when USE_TZ is on, the
    parsed value is made aware in the current timezone.
    """
    if self.date_taken is None:
        try:
            exif_date = self.exif.get('DateTimeOriginal', None)
            if exif_date is not None:
                d, t = exif_date.split(" ")
                year, month, day = d.split(':')
                hour, minute, second = t.split(':')
                if getattr(settings, "USE_TZ", False):
                    tz = get_current_timezone()
                    self.date_taken = make_aware(datetime(
                        int(year), int(month), int(day),
                        int(hour), int(minute), int(second)), tz)
                else:
                    self.date_taken = datetime(
                        int(year), int(month), int(day),
                        int(hour), int(minute), int(second))
        except Exception:
            # Malformed/missing EXIF: fall through to the "now" default.
            pass
    if self.date_taken is None:
        self.date_taken = now()
    super(Image, self).save(*args, **kwargs)
def has_key(self, key, version=None):
    """Return True if ``key`` exists in the DB cache table and has not expired."""
    key = self.make_key(key, version=version)
    self.validate_key(key)
    db = router.db_for_read(self.cache_model_class)
    connection = connections[db]
    table = connection.ops.quote_name(self._table)

    # Expiry timestamps are stored naive: UTC when USE_TZ, local otherwise.
    if settings.USE_TZ:
        now = datetime.utcnow()
    else:
        now = datetime.now()
    # Truncate microseconds to match how expirations are stored.
    now = now.replace(microsecond=0)

    with connection.cursor() as cursor:
        cursor.execute("SELECT cache_key FROM %s "
                       "WHERE cache_key = %%s and expires > %%s" % table,
                       [key, connection.ops.adapt_datetimefield_value(now)])
        return cursor.fetchone() is not None
def _cull(self, db, cursor, now):
    """Trim the cache table: drop expired rows, then if still above
    ``_max_entries`` delete about 1/_cull_frequency of the remaining keys.

    A cull frequency of 0 means "clear the whole cache".
    """
    if self._cull_frequency == 0:
        self.clear()
    else:
        connection = connections[db]
        table = connection.ops.quote_name(self._table)
        cursor.execute("DELETE FROM %s WHERE expires < %%s" % table,
                       [connection.ops.adapt_datetimefield_value(now)])
        cursor.execute("SELECT COUNT(*) FROM %s" % table)
        num = cursor.fetchone()[0]
        if num > self._max_entries:
            cull_num = num // self._cull_frequency
            # Fetch the cache_key at offset cull_num (backend-specific SQL),
            # then delete every key that sorts before it.
            cursor.execute(
                connection.ops.cache_key_culling_sql() % table,
                [cull_num])
            cursor.execute("DELETE FROM %s "
                           "WHERE cache_key < %%s" % table,
                           [cursor.fetchone()[0]])
def get_expiry_age(self, **kwargs):
    """Get the number of seconds until the session expires.

    Optionally, this function accepts `modification` and `expiry` keyword
    arguments specifying the modification and expiry of the session.
    """
    if 'modification' in kwargs:
        modification = kwargs['modification']
    else:
        modification = timezone.now()

    # Distinguish "expiry=None passed in kwargs" from "expiry not passed",
    # so a provided expiry never triggers self.load().
    if 'expiry' in kwargs:
        expiry = kwargs['expiry']
    else:
        expiry = self.get('_session_expiry')

    if not expiry:  # Checks both None and 0 cases
        return settings.SESSION_COOKIE_AGE
    if not isinstance(expiry, datetime):
        # Expiry was stored as a number of seconds already.
        return expiry
    delta = expiry - modification
    return delta.days * 86400 + delta.seconds
def get_expiry_date(self, **kwargs):
    """Get session the expiry date (as a datetime object).

    Optionally, this function accepts `modification` and `expiry` keyword
    arguments specifying the modification and expiry of the session.
    """
    if 'modification' in kwargs:
        modification = kwargs['modification']
    else:
        modification = timezone.now()

    # Same comment as in get_expiry_age
    if 'expiry' in kwargs:
        expiry = kwargs['expiry']
    else:
        expiry = self.get('_session_expiry')

    if isinstance(expiry, datetime):
        return expiry
    if not expiry:  # Checks both None and 0 cases
        expiry = settings.SESSION_COOKIE_AGE
    return modification + timedelta(seconds=expiry)
def _ask_default(self):
    """Interactively prompt for a one-off default value, evaluated as Python.

    Exposes ``datetime`` (as datetime_safe) and ``timezone`` to the
    evaluated expression; entering 'exit' aborts.
    """
    print("Please enter the default value now, as valid Python")
    print("The datetime and django.utils.timezone modules are available, so you can do e.g. timezone.now()")
    while True:
        if six.PY3:
            # Six does not correctly abstract over the fact that
            # py3 input returns a unicode string, while py2 raw_input
            # returns a bytestring.
            code = input(">>> ")
        else:
            code = input(">>> ").decode(sys.stdin.encoding)
        if not code:
            print("Please enter some code, or 'exit' (with no quotes) to exit.")
        elif code == "exit":
            sys.exit(1)
        else:
            try:
                # eval of user input is intentional here: this is an
                # interactive developer prompt, not untrusted input.
                return eval(code, {}, {"datetime": datetime_safe, "timezone": timezone})
            except (SyntaxError, NameError) as e:
                print("Invalid input: %s" % e)
def ask_not_null_addition(self, field_name, model_name):
    "Adding a NOT NULL field to a model"
    # Dry runs never prompt; the migration will not be applied anyway.
    if self.dry_run:
        return None
    choice = self._choice_input(
        "You are trying to add a non-nullable field '%s' to %s without a default; "
        "we can't do that (the database needs something to populate existing rows).\n"
        "Please select a fix:" % (field_name, model_name),
        [
            "Provide a one-off default now (will be set on all existing rows)",
            "Quit, and let me add a default in models.py",
        ]
    )
    if choice == 2:
        sys.exit(3)
    # Option 1: ask the user for the one-off default value.
    return self._ask_default()
def ask_not_null_alteration(self, field_name, model_name):
    "Changing a NULL field to NOT NULL"
    # Dry runs never prompt; the migration will not be applied anyway.
    if self.dry_run:
        return None
    choice = self._choice_input(
        "You are trying to change the nullable field '%s' on %s to non-nullable "
        "without a default; we can't do that (the database needs something to "
        "populate existing rows).\n"
        "Please select a fix:" % (field_name, model_name),
        [
            "Provide a one-off default now (will be set on all existing rows)",
            ("Ignore for now, and let me handle existing rows with NULL myself "
             "(e.g. because you added a RunPython or RunSQL operation to handle "
             "NULL values in a previous data migration)"),
            "Quit, and let me add a default in models.py",
        ]
    )
    if choice == 2:
        # Caller interprets NOT_PROVIDED as "leave existing NULLs alone".
        return NOT_PROVIDED
    if choice == 3:
        sys.exit(3)
    # Option 1: ask the user for the one-off default value.
    return self._ask_default()
def get_dated_queryset(self, **lookup):
    """
    Get a queryset properly filtered according to `allow_future` and any
    extra lookup kwargs.
    """
    qs = self.get_queryset().filter(**lookup)
    date_field = self.get_date_field()
    allow_future = self.get_allow_future()
    allow_empty = self.get_allow_empty()
    paginate_by = self.get_paginate_by(qs)

    if not allow_future:
        # Compare datetimes against now, plain dates against today.
        now = timezone.now() if self.uses_datetime_field else timezone_today()
        qs = qs.filter(**{'%s__lte' % date_field: now})

    if not allow_empty:
        # When pagination is enabled, it's better to do a cheap query
        # than to load the unpaginated queryset in memory.
        is_empty = len(qs) == 0 if paginate_by is None else not qs.exists()
        if is_empty:
            raise Http404(_("No %(verbose_name_plural)s available") % {
                'verbose_name_plural': force_text(qs.model._meta.verbose_name_plural)
            })

    return qs
def load(self):
    """Load session data, trying the cache first and falling back to the DB.

    On a cache miss, the DB row is re-cached with its remaining lifetime.
    """
    try:
        data = self._cache.get(self.cache_key)
    except Exception:
        # Some backends (e.g. memcache) raise an exception on invalid
        # cache keys. If this happens, reset the session. See #17810.
        data = None
    if data is None:
        # Duplicate DBStore.load, because we need to keep track
        # of the expiry date to set it properly in the cache.
        try:
            s = self.model.objects.get(
                session_key=self.session_key,
                expire_date__gt=timezone.now()
            )
            data = self.decode(s.session_data)
            self._cache.set(self.cache_key, data, self.get_expiry_age(expiry=s.expire_date))
        except (self.model.DoesNotExist, SuspiciousOperation) as e:
            if isinstance(e, SuspiciousOperation):
                logger = logging.getLogger('django.security.%s' % e.__class__.__name__)
                logger.warning(force_text(e))
            # Missing or tampered session: start fresh with empty data.
            self._session_key = None
            data = {}
    return data
def validate_token_age(callback_token):
    """
    Returns True if a given token is within the age expiration limit.
    """
    try:
        token = CallbackToken.objects.get(key=callback_token, is_active=True)
    except CallbackToken.DoesNotExist:
        # No valid token.
        return False

    age_seconds = (timezone.now() - token.created_at).total_seconds()
    if age_seconds <= api_settings.PASSWORDLESS_TOKEN_EXPIRE_TIME:
        return True

    # Too old: invalidate our token so it can never be used again.
    token.is_active = False
    token.save()
    return False
def fetch_og_preview(content, urls):
    """Fetch first opengraph entry for a list of urls.

    Uses a 7-day OpenGraphCache; returns the cache object on success,
    False when no URL yielded usable OpenGraph data.
    """
    for url in urls:
        # See first if recently cached already
        if OpenGraphCache.objects.filter(url=url, modified__gte=now() - datetime.timedelta(days=7)).exists():
            opengraph = OpenGraphCache.objects.get(url=url)
            Content.objects.filter(id=content.id).update(opengraph=opengraph)
            return opengraph
        try:
            og = OpenGraph(url=url, parser="lxml")
        except AttributeError:
            continue
        # Require at least one useful OG property before caching anything.
        if not og or ("title" not in og and "site_name" not in og
                      and "description" not in og and "image" not in og):
            continue
        try:
            title = og.title if "title" in og else og.site_name if "site_name" in og else ""
            description = og.description if "description" in og else ""
            # NSFW content never gets a preview image.
            image = og.image if "image" in og and not content.is_nsfw else ""
            try:
                with transaction.atomic():
                    opengraph = OpenGraphCache.objects.create(
                        url=url,
                        title=truncate_letters(safe_text(title), 250),
                        description=safe_text(description),
                        image=safe_text(image),
                    )
            except DataError:
                continue
        except IntegrityError:
            # Some other process got ahead of us
            opengraph = OpenGraphCache.objects.get(url=url)
            Content.objects.filter(id=content.id).update(opengraph=opengraph)
            return opengraph
        Content.objects.filter(id=content.id).update(opengraph=opengraph)
        return opengraph
    return False
def fetch_oembed_preview(content, urls):
    """Fetch first oembed content for a list of urls.

    Uses a 7-day OEmbedCache; returns the cache object on success, False
    when no URL yielded embeddable content.
    """
    for url in urls:
        # See first if recently cached already
        if OEmbedCache.objects.filter(url=url, modified__gte=now()-datetime.timedelta(days=7)).exists():
            oembed = OEmbedCache.objects.get(url=url)
            Content.objects.filter(id=content.id).update(oembed=oembed)
            return oembed
        # Fetch oembed
        options = {}
        if url.startswith("https://twitter.com/"):
            # This probably has little effect since we fetch these on the backend...
            # But, DNT is always good to communicate if possible :)
            options = {"dnt": "true"}
        try:
            oembed = PyEmbed(discoverer=OEmbedDiscoverer()).embed(url, **options)
        except (PyEmbedError, PyEmbedDiscoveryError, PyEmbedConsumerError, ValueError):
            continue
        if not oembed:
            continue
        # Ensure width is 100% not fixed
        oembed = re.sub(r'width="[0-9]*"', 'width="100%"', oembed)
        oembed = re.sub(r'height="[0-9]*"', "", oembed)
        try:
            with transaction.atomic():
                oembed = OEmbedCache.objects.create(url=url, oembed=oembed)
        except IntegrityError:
            # Some other process got ahead of us
            oembed = OEmbedCache.objects.get(url=url)
            Content.objects.filter(id=content.id).update(oembed=oembed)
            return oembed
        Content.objects.filter(id=content.id).update(oembed=oembed)
        return oembed
    return False
def get_recommended_products(cls, lang, family=None, category=None, subcategory=None):
    """Return recommended (most-sold) products for ``lang``, optionally
    filtered by family/category/subcategory.

    Each resulting dict is flagged with ``'new'`` (1/0) depending on
    whether the product was created within CDNX_PRODUCTS_NOVELTY_DAYS.
    """
    products = []
    query = Q(most_sold=True) | Q(product__products_image__principal=True)
    if family is not None:
        # BUG FIX: the original filtered with ``category`` here, so the
        # ``family`` argument was silently ignored (the sibling
        # get_products correctly uses ``family``).
        query &= Q(product__family=family)
    if category is not None:
        query &= Q(product__category=category)
    if subcategory is not None:
        query &= Q(product__subcategory=subcategory)
    for product in cls.query_or(
        query,
        "{}__slug".format(lang),
        "offer", "created", "offer", "pk",
        "product__{}__name".format(lang),
        "product__model",
        "product__brand__{}__name".format(lang),
        "product__products_image__image",
        "{}__meta_title".format(lang),
        slug="{}__slug".format(lang),
        meta_title="{}__meta_title".format(lang),
        image="product__products_image__image",
        name="product__{}__name".format(lang),
        pop_annotations=True
    ):
        product['new'] = 1 if (timezone.now() - product['created']).days <= settings.CDNX_PRODUCTS_NOVELTY_DAYS else 0
        products.append(product)
    return products
def get_products(cls, lang, family=None, category=None, subcategory=None, brand=None):
    """Return products for ``lang`` with per-language annotations,
    optionally filtered by family/category/subcategory/brand.

    Each dict gains 'price' (computed per product) and a 'new' flag when
    created within CDNX_PRODUCTS_NOVELTY_DAYS.
    """
    products = []
    query = Q(product__products_image__principal=True)
    if family is not None:
        query &= Q(product__family=family)
    if category is not None:
        query &= Q(product__category=category)
    if subcategory is not None:
        query &= Q(product__subcategory=subcategory)
    if brand is not None:
        query &= Q(product__brand=brand)
    for product in cls.query_or(
        query,
        "{}__slug".format(lang),
        "offer", "created", "offer", "pk",
        "product__tax__tax",
        "product__{}__name".format(lang),
        "product__model",
        "product__brand__{}__name".format(lang),
        "product__products_image__image",
        "{}__meta_title".format(lang),
        slug="{}__slug".format(lang),
        meta_title="{}__meta_title".format(lang),
        image="product__products_image__image",
        name="product__{}__name".format(lang),
        pop_annotations=True
    ):
        # NOTE(review): one extra query per product (N+1) to compute the
        # price — acceptable for small result sets, worth checking at scale.
        prices = cls.objects.get(pk=product['pk']).calculate_price()
        product['price'] = prices['price_total']
        product['new'] = 1 if (timezone.now() - product['created']).days <= settings.CDNX_PRODUCTS_NOVELTY_DAYS else 0
        products.append(product)
    return products
def _save(self, name, content):
    """Store ``content`` under ``name`` in the in-memory cache and return
    the (possibly adjusted) name actually used."""
    # Make sure that the cache stores the file as bytes, like it would be
    # on disk.
    payload = content.read()
    try:
        payload = payload.encode()
    except AttributeError:
        # Already bytes.
        pass
    with self._lock.writer():
        # Never clobber an existing entry: pick a fresh name if taken.
        while name in self.cache:
            name = self.get_available_name(name)
        self.cache[name] = FakeContent(payload, now())
    return name
def post_list(request):
    """Render every post published up to now, oldest first."""
    published_posts = (Post.objects
                       .filter(published_date__lte=timezone.now())
                       .order_by('published_date'))
    return render(request, 'blog/post_list.html', {'posts': published_posts})
def publish(self):
    """Mark this object as published right now and persist it."""
    self.published_date = timezone.now()
    self.save()
def start_current(self, **data):
    """Stop any running time entry, then start and return a new one."""
    try:
        self.stop_current()
    except exceptions.NoCurrentEntry:
        # Nothing was running — that's fine.
        pass
    new_entry = TimeEntry(user=self.user, start=timezone.now(), **data)
    new_entry.save()
    return new_entry
def stop_current(self):
    """Set the stop time on the user's current entry and return it."""
    running_entry = self.current()
    running_entry.stop = timezone.now()
    running_entry.save()
    return running_entry