我们从 Python 开源项目中,提取了以下 50 个代码示例,用于说明如何使用 datetime.datetime.combine()。
def test_iter_quarters():
    """``iter_quarters`` must yield, as a generator, back-to-back (start, end) pairs."""
    start = timezone.make_aware(datetime(2015, 11, 30, 1, 2, 3))
    end = timezone.make_aware(datetime(2017, 2, 28, 11, 22, 33))

    quarters = iter_quarters(start, end)
    assert type(quarters) is types.GeneratorType

    # Expected period boundaries; each one carries the time of day (and
    # tzinfo) of ``start``.
    boundary_days = (
        (2015, 11, 30),
        (2016, 2, 29),  # leap!
        (2016, 5, 30),
        (2016, 8, 30),
        (2016, 11, 30),
        (2017, 2, 28),
    )
    time_of_day = start.timetz()
    starts = []
    for year, month, day in boundary_days:
        starts.append(datetime.combine(datetime(year, month, day).date(), time_of_day))
    # Every period ends where the next one starts; the last ends at ``end``.
    ends = starts[1:] + [end]

    assert list(quarters) == list(zip(starts, ends))
def test_iter_years():
    """``iter_years`` must yield, as a generator, back-to-back (start, end) pairs."""
    start = timezone.make_aware(datetime(2016, 2, 29, 1, 2, 3))
    end = timezone.make_aware(datetime(2019, 2, 28, 11, 22, 33))

    years = iter_years(start, end)
    assert type(years) is types.GeneratorType

    # Expected period boundaries; each one carries the time of day (and
    # tzinfo) of ``start``.
    boundary_days = (
        (2016, 2, 29),  # leap!
        (2017, 2, 28),
        (2018, 2, 28),
        (2019, 2, 28),
    )
    time_of_day = start.timetz()
    starts = []
    for year, month, day in boundary_days:
        starts.append(datetime.combine(datetime(year, month, day).date(), time_of_day))
    # Every period ends where the next one starts; the last ends at ``end``.
    ends = starts[1:] + [end]

    assert list(years) == list(zip(starts, ends))
def format_date(utc, isoformat=False):
    """Parse Twitter's UTC date and render it in the ``Local`` timezone.

    :param utc: timestamp text laid out as ``%a %b %d %H:%M:%S +0000 %Y``.
    :param isoformat: when true return ISO 8601 text, otherwise
        ``%Y-%m-%d %H:%M:%S %Z`` text.
    :return: the formatted local-time string.
    """
    # The numeric "+0000" offset is swapped for the zone name so that the
    # %Z directive can consume it.
    parsed = datetime.strptime(utc.replace('+0000', 'UTC'), '%a %b %d %H:%M:%S %Z %Y')
    # The parsed value is naive; stamp it as UTC in one step instead of
    # rebuilding it field by field through datetime.combine().
    aware = parsed.replace(tzinfo=UTC)
    # Convert to localtime
    local = aware.astimezone(Local)
    if isoformat:
        return local.isoformat()
    return local.strftime('%Y-%m-%d %H:%M:%S %Z')
def _get_day_attack_schedule(self):
    """Return a list of random datetimes for today according to the planner args.

    Reads ``min_time`` / ``max_time`` (``"%H:%M"`` text) and ``times`` from
    ``self.planner_config["args"]``. One datetime is drawn uniformly from each
    (start, end) pair produced by ``self._split_date_range``.
    """
    planner_args = self.planner_config["args"]
    # Resolve "today" once so both bounds land on the same calendar day even
    # when the call straddles midnight.
    today = datetime.today().date()
    start_date = datetime.combine(
        today, datetime.strptime(planner_args["min_time"], "%H:%M").time())
    end_date = datetime.combine(
        today, datetime.strptime(planner_args["max_time"], "%H:%M").time())

    random.seed()
    # random.uniform() also works on datetimes: start + (end - start) * fraction.
    return [
        random.uniform(start, end)
        for start, end in self._split_date_range(start_date, end_date, planner_args["times"])
    ]
def _validate_mindate(self, min_date, field, value):
    """ {'type': ['date', 'datetime']} """
    # NOTE(review): the docstring above looks like a rule-schema literal that
    # the validation framework reads at runtime, so it is kept verbatim --
    # confirm before rewording it.
    #
    # Reports an error on ``field`` when ``value`` lies before ``min_date``.
    # A "/"-separated range is split and each part is checked on its own.
    #
    # Remarks
    # -------
    # the yaml-reader prepares datetime.date objects when possible,
    # the dwca-reader is not doing this, so compatibility needs to be better
    # ensured
    if self._dateisrange(value):
        # Plain loop instead of a throw-away list comprehension, matching
        # _validate_maxdate.
        for valdate in value.split("/"):
            self._validate_mindate(min_date, field, valdate)
        return

    # convert schema info to datetime to enable comparison
    # NOTE(review): datetime is a subclass of date, so a datetime limit is
    # also reset to midnight here -- confirm that is intended.
    if isinstance(min_date, date):
        min_date = datetime.combine(min_date, datetime.min.time())

    # try to parse the datetime-format
    event_date = self._parse_date(field, value)
    if event_date and event_date < min_date:
        self._error(field, "date is before min limit " + min_date.date().isoformat())
def _validate_maxdate(self, max_date, field, value):
    """ {'type': ['date', 'datetime']} """
    # NOTE(review): the docstring above looks like a rule-schema literal that
    # the validation framework reads at runtime, so it is kept verbatim.
    #
    # Reports an error on ``field`` when ``value`` lies after ``max_date``.
    # A "/"-separated range is split and each part is checked on its own.
    #
    # Remarks
    # -------
    # the yaml-reader prepares datetime.date objects when possible,
    # the dwca-reader is not doing this, so compatibility needs to be better
    # ensured
    if not self._dateisrange(value):
        limit = max_date
        # convert schema info to datetime to enable comparison
        if isinstance(limit, date):
            limit = datetime.combine(limit, datetime.min.time())
        # try to parse the datetime-format
        event_date = self._parse_date(field, value)
        if event_date and event_date > limit:
            self._error(field, "date is after max limit " + limit.date().isoformat())
        return

    for part in value.split("/"):
        self._validate_maxdate(max_date, field, part)
def test_combine(self):
    """Exercise ``theclass.combine`` with valid and invalid argument lists."""
    d = date(2002, 3, 4)
    t = time(18, 45, 3, 1234)
    expected = self.theclass(2002, 3, 4, 18, 45, 3, 1234)
    combine = self.theclass.combine

    # Positional and keyword spellings must build the same value.
    positional = combine(d, t)
    self.assertEqual(positional, expected)
    by_keyword = combine(time=t, date=d)
    self.assertEqual(by_keyword, expected)

    # Splitting the result gives back the parts, and recombining round-trips.
    self.assertEqual(d, by_keyword.date())
    self.assertEqual(t, by_keyword.time())
    self.assertEqual(by_keyword, combine(by_keyword.date(), by_keyword.time()))

    rejected_calls = [
        (),                # need an arg
        (d,),              # need two args
        (t, d),            # args reversed
        (d, t, 1),         # too many args
        ("date", "time"),  # wrong types
        (d, "time"),       # wrong type
        ("date", t),       # wrong type
    ]
    for args in rejected_calls:
        self.assertRaises(TypeError, combine, *args)
def calculate_mean_departure_datetime(departure_datetimes):
    """
    Return the mean time of day of the given values, placed on the date of
    the first entry. Fractions of a second are truncated.

    :param departure_datetimes: [datetime]
    :return: mean_departure_datetime: datetime
    """
    seconds_since_midnight = sum(
        (moment.hour * 3600) + (moment.minute * 60) + moment.second
        for moment in departure_datetimes
    )
    mean_seconds = seconds_since_midnight / len(departure_datetimes)
    whole_minutes, seconds = divmod(int(mean_seconds), 60)
    hours, minutes = divmod(whole_minutes, 60)
    return datetime.combine(departure_datetimes[0].date(), time(hours, minutes, seconds))
def get_task_instances(
        self, session, start_date=None, end_date=None, state=None):
    """Return this DAG's task instances inside an execution-date window.

    :param session: database session used to build the query.
    :param start_date: inclusive lower bound; defaults to midnight 30 days
        before ``timezone.utcnow()``.
    :param end_date: inclusive upper bound; defaults to ``timezone.utcnow()``.
    :param state: when given, only instances in this state are returned.
    :return: list of task instances ordered by execution date.
    """
    TI = TaskInstance
    if not start_date:
        # Truncate to midnight with replace() rather than going through a bare
        # date and datetime.combine(): this keeps whatever tzinfo
        # timezone.utcnow() supplied, so the default lower bound is not a naive
        # value next to the default upper bound.
        # NOTE(review): presumably timezone.utcnow() is timezone-aware -- confirm.
        start_date = (timezone.utcnow() - timedelta(30)).replace(
            hour=0, minute=0, second=0, microsecond=0)
    end_date = end_date or timezone.utcnow()
    query = session.query(TI).filter(
        TI.dag_id == self.dag_id,
        TI.execution_date >= start_date,
        TI.execution_date <= end_date,
        TI.task_id.in_([t.task_id for t in self.tasks]),
    )
    if state:
        query = query.filter(TI.state == state)
    return query.order_by(TI.execution_date).all()
def transform_inverse(self, df):
    """Spread the single-column ``df`` over the time-of-day grid of the price file.

    For every day returned by ``extract_days(df)`` one output row is emitted
    per timestamp of the price index. The value comes from the ``df`` row under
    a cursor that moves one row forward whenever the grid time is later than
    that row's time of day.
    """
    price = pd.read_csv(self.path_to_price, parse_dates=True, index_col='Unnamed: 0')
    #
    stamps = []
    samples = []
    days = extract_days(df)
    time_intervs_in_day = [d.time() for d in price.index]
    for day in days:
        # NOTE(review): the cursor restarts at row 0 for every day -- confirm
        # that this is intended.
        row = 0
        for time_intv in time_intervs_in_day:
            if not (time_intv <= df.index[row].time()):
                row += 1
            samples.append(df.values[row][0])
            stamps.append(datetime.combine(day, time_intv))
    return pd.DataFrame(samples, columns=[df.columns[0]], index=stamps)
def test_combine(self):
    """Exercise ``theclass.combine`` with valid and invalid argument lists."""
    d = date(2002, 3, 4)
    t = time(18, 45, 3, 1234)
    expected = self.theclass(2002, 3, 4, 18, 45, 3, 1234)
    combine = self.theclass.combine

    # Positional and keyword spellings must build the same value.
    positional = combine(d, t)
    self.assertEqual(positional, expected)
    by_keyword = combine(time=t, date=d)
    self.assertEqual(by_keyword, expected)

    # Splitting the result gives back the parts, and recombining round-trips.
    self.assertEqual(d, by_keyword.date())
    self.assertEqual(t, by_keyword.time())
    self.assertEqual(by_keyword, combine(by_keyword.date(), by_keyword.time()))

    rejected_calls = [
        (),                # need an arg
        (d,),              # need two args
        (t, d),            # args reversed
        (d, t, 1),         # too many args
        ("date", "time"),  # wrong types
    ]
    for args in rejected_calls:
        self.assertRaises(TypeError, combine, *args)
def timetable_entries_which_violate_constraints(self):
    """Return the entries whose time span overlaps a constraint window of a participant.

    Each constraint's start/end time is placed on the date of ``self.start``
    and made aware in the current timezone. An entry is reported once per
    overlapping (constraint, participation) pair.
    """
    meeting_day = self.start.date()
    violations = []
    for constraint in self.constraints.all():
        window_start = timezone.make_aware(
            datetime.combine(meeting_day, constraint.start_time),
            timezone.get_current_timezone())
        window_end = timezone.make_aware(
            datetime.combine(meeting_day, constraint.end_time),
            timezone.get_current_timezone())
        participations = Participation.objects.filter(
            entry__meeting=self,
            user=constraint.user,
            ignored_for_optimization=False,
            entry__timetable_index__isnull=False)
        for participation in participations:
            start = participation.entry.start
            end = participation.entry.end
            window_starts_inside = start <= window_start < end
            window_ends_inside = start < window_end <= end
            window_covers_entry = window_start <= start and window_end >= end
            if window_starts_inside or window_ends_inside or window_covers_entry:
                violations.append(participation.entry)
    return violations
def clean(self):
    '''
    Only allow submission if the chosen instructor has no existing slot that
    starts inside the submitted window. The window is widened backwards by
    one lesson-length interval.
    '''
    super(SlotCreationForm,self).clean()

    data = self.cleaned_data
    startDate = data.get('startDate')
    endDate = data.get('endDate')
    startTime = data.get('startTime')
    endTime = data.get('endTime')
    instructor = data.get('instructorId')

    lookback = timedelta(minutes=getConstant('privateLessons__lessonLengthInterval'))
    window_start = ensure_localtime(datetime.combine(startDate,startTime)) - lookback
    window_end = ensure_localtime(datetime.combine(endDate,endTime))

    existingSlots = InstructorAvailabilitySlot.objects.filter(
        instructor=instructor,
        startTime__gt=window_start,
        startTime__lt=window_end,
    )
    if existingSlots.exists():
        raise ValidationError(_('Newly created slots cannot overlap existing slots for this instructor.'),code='invalid')
def shouldStartAfter(self):
    """Return the ISO-formatted earliest start moment, or None when none applies.

    Nothing is returned once ``endDate`` is set, when the tender has lots, or
    when the tender is outside the tendering / stand-still / auction statuses.
    """
    if self.endDate:
        return
    tender = self.__parent__
    relevant_statuses = ['active.tendering', 'active.pre-qualification.stand-still', 'active.auction']
    if tender.lots or tender.status not in relevant_statuses:
        return

    start_after = None
    if tender.status == 'active.tendering' and tender.tenderPeriod.endDate:
        start_after = calculate_business_date(tender.tenderPeriod.endDate, TENDERING_AUCTION, tender)
    elif self.startDate and get_now() > calc_auction_end_time(tender.numberOfBids, self.startDate):
        start_after = calc_auction_end_time(tender.numberOfBids, self.startDate)
    elif tender.qualificationPeriod and tender.qualificationPeriod.endDate:
        # Midnight three days after each complaint decision, in that
        # decision's own timezone.
        decision_dates = []
        for qualification in tender.qualifications:
            for complaint in qualification.complaints:
                if not complaint.dateDecision:
                    continue
                decision_dates.append(datetime.combine(
                    complaint.dateDecision.date() + timedelta(days=3),
                    time(0, tzinfo=complaint.dateDecision.tzinfo)))
        decision_dates.append(tender.qualificationPeriod.endDate)
        start_after = max(decision_dates)

    if start_after:
        return rounding_shouldStartAfter(start_after, tender).isoformat()
def shouldStartAfter(self):
    """Return the ISO-formatted earliest start moment for this lot, or None.

    Nothing is returned once ``endDate`` is set, when the tender is outside
    the tendering / stand-still / auction statuses, or when the parent lot is
    not active.
    """
    if self.endDate:
        return
    tender = get_tender(self)
    lot = self.__parent__
    relevant_statuses = ['active.tendering', 'active.pre-qualification.stand-still', 'active.auction']
    if tender.status not in relevant_statuses or lot.status != 'active':
        return

    start_after = None
    if tender.status == 'active.tendering' and tender.tenderPeriod.endDate:
        start_after = calculate_business_date(tender.tenderPeriod.endDate, TENDERING_AUCTION, tender)
    elif self.startDate and get_now() > calc_auction_end_time(lot.numberOfBids, self.startDate):
        start_after = calc_auction_end_time(lot.numberOfBids, self.startDate)
    elif tender.qualificationPeriod and tender.qualificationPeriod.endDate:
        # Midnight three days after each complaint decision, in that
        # decision's own timezone.
        decision_dates = []
        for qualification in tender.qualifications:
            for complaint in qualification.complaints:
                if not complaint.dateDecision:
                    continue
                decision_dates.append(datetime.combine(
                    complaint.dateDecision.date() + timedelta(days=3),
                    time(0, tzinfo=complaint.dateDecision.tzinfo)))
        decision_dates.append(tender.qualificationPeriod.endDate)
        start_after = max(decision_dates)

    if start_after:
        return rounding_shouldStartAfter(start_after, tender).isoformat()
def _load(probe, starttime, endtime, instrument, product_id, cdfkeys):
    """Load one CDF file per day of the interval, downloading missing ones.

    Days whose download fails are skipped after printing the error. Raises
    RuntimeError when no day could be loaded; otherwise returns the loaded
    data passed through ``helper.timefilter``.
    """
    data = []
    for interval in helper._daysplitinterval(starttime, endtime):
        day_date = interval[0]
        year = str(day_date.year)
        month = str(day_date.month).zfill(2)
        day = str(day_date.day).zfill(2)
        local_dir = os.path.join(cluster_dir, 'c' + probe, instrument, year)
        local_fname = 'C' + probe + '_' + product_id + '__' + year + month + day + '.cdf'
        local_path = os.path.join(local_dir, local_fname)

        # If we don't have local file download it
        if not os.path.exists(local_path):
            thisstart = datetime.combine(day_date, time.min)
            thisend = datetime.combine(day_date, time.max)
            try:
                _download(probe, thisstart, thisend, instrument, product_id)
            except Exception as err:
                print(str(err), '\n')
                continue

        from pycdf import pycdf
        cdf = pycdf.CDF(local_path)
        # The key mapped to 'Time' is handed to cdf2df as the index key.
        for key, value in cdfkeys.items():
            if value == 'Time':
                index_key = key
                break
        data.append(helper.cdf2df(cdf, index_key, cdfkeys))

    if not data:
        raise RuntimeError('No data available to download during requested times')
    return helper.timefilter(data, starttime, endtime)
def to_json(self):
    """Return a dict describing this object, with its speakers serialised in turn.

    Start and end are built from ``self.date`` plus the respective time and
    passed through ``datetime_to_utc``.
    """
    speakers = [speaker.to_json() for speaker in self.speakers.all()]
    starts_at = datetime.combine(self.date, self.startTime)
    ends_at = datetime.combine(self.date, self.endTime)
    return {
        'title': self.title,
        'shortTitle': self.shortTitle,
        'subTitle': self.subTitle,
        'startTime': datetime_to_utc(starts_at),
        'endTime': datetime_to_utc(ends_at),
        'desc': self.description,
        'img': self.image.url,
        'location': self.location,
        'speakers': speakers,
        'registrationUrl': self.registrationUrl,
        'prettyUrl': self.prettyUrl
    }
def test_NaT_methods(self):
    """Check how NaT answers the datetime-style methods it exposes."""
    # GH 9513
    raise_methods = ['astimezone', 'combine', 'ctime', 'dst',
                     'fromordinal', 'fromtimestamp', 'isocalendar',
                     'strftime', 'strptime', 'time', 'timestamp',
                     'timetuple', 'timetz', 'toordinal', 'tzname',
                     'utcfromtimestamp', 'utcnow', 'utcoffset',
                     'utctimetuple']
    nat_methods = ['date', 'now', 'replace', 'to_datetime', 'today']
    nan_methods = ['weekday', 'isoweekday']

    def available(names):
        # Only methods that NaT actually has are exercised.
        return [name for name in names if hasattr(NaT, name)]

    for method in available(raise_methods):
        self.assertRaises(ValueError, getattr(NaT, method))

    for method in available(nan_methods):
        self.assertTrue(np.isnan(getattr(NaT, method)()))

    for method in available(nat_methods):
        self.assertIs(getattr(NaT, method)(), NaT)

    # GH 12300
    self.assertEqual(NaT.isoformat(), 'NaT')
def test_class_ops_pytz(self):
    """Timestamp class-level constructors must agree with datetime's (pytz zones)."""
    tm._skip_if_no_pytz()
    from pytz import timezone

    def compare(x, y):
        # Compare at whole-second resolution: each pair is created a moment apart.
        left_seconds = int(Timestamp(x).value / 1e9)
        right_seconds = int(Timestamp(y).value / 1e9)
        self.assertEqual(left_seconds, right_seconds)

    compare(Timestamp.now(), datetime.now())
    compare(Timestamp.now('UTC'), datetime.now(timezone('UTC')))
    compare(Timestamp.utcnow(), datetime.utcnow())
    compare(Timestamp.today(), datetime.today())

    current_time = calendar.timegm(datetime.now().utctimetuple())
    compare(Timestamp.utcfromtimestamp(current_time),
            datetime.utcfromtimestamp(current_time))
    compare(Timestamp.fromtimestamp(current_time),
            datetime.fromtimestamp(current_time))

    date_component = datetime.utcnow()
    time_component = (date_component + timedelta(minutes=10)).time()
    compare(Timestamp.combine(date_component, time_component),
            datetime.combine(date_component, time_component))
def test_class_ops_dateutil(self):
    """Timestamp class-level constructors must agree with datetime's (dateutil zones)."""
    tm._skip_if_no_dateutil()
    from dateutil.tz import tzutc

    def compare(x, y):
        # Compare at rounded whole-second resolution: each pair is created a
        # moment apart.
        left_seconds = int(np.round(Timestamp(x).value / 1e9))
        right_seconds = int(np.round(Timestamp(y).value / 1e9))
        self.assertEqual(left_seconds, right_seconds)

    compare(Timestamp.now(), datetime.now())
    compare(Timestamp.now('UTC'), datetime.now(tzutc()))
    compare(Timestamp.utcnow(), datetime.utcnow())
    compare(Timestamp.today(), datetime.today())

    current_time = calendar.timegm(datetime.now().utctimetuple())
    compare(Timestamp.utcfromtimestamp(current_time),
            datetime.utcfromtimestamp(current_time))
    compare(Timestamp.fromtimestamp(current_time),
            datetime.fromtimestamp(current_time))

    date_component = datetime.utcnow()
    time_component = (date_component + timedelta(minutes=10)).time()
    compare(Timestamp.combine(date_component, time_component),
            datetime.combine(date_component, time_component))
def town_factory(**custom):
    """Generate a `Town` namedtuple from `custom` parameters.

    Required parameters: `dep`, `com` and `nccenr`.

    Defaults are filled in first and then overridden by `custom`. `depcom`,
    `id`, `start_datetime` and `end_datetime` are derived values.
    """
    params = dict(
        actual='1',
        modification=0,
        ancestors='',
        successors='',
        start_date=START_DATE,
        end_date=END_DATE,
        population='NULL',
        parents='',
    )
    custom['depcom'] = custom['dep'] + custom['com']
    params.update(custom)

    params['id'] = compute_id(params['depcom'], params['start_date'])
    # The validity period spans whole days: first instant of the start date
    # to the last instant of the end date.
    first_instant = datetime.min.time()
    last_instant = datetime.max.time()
    params['start_datetime'] = datetime.combine(params['start_date'], first_instant)
    params['end_datetime'] = datetime.combine(params['end_date'], last_instant)
    return Town(**params)
def block(user_id):
    """Handle the "block user" form for the user with the given id.

    On a valid submission the user is blocked until midnight of the chosen
    date, or without an end date when "unlimited" is ticked. Success
    redirects to the user page. A ValueError from the library is flashed and
    the form is rendered again.
    """
    form = UserBlockForm()
    myUser = User.q.get(user_id)
    if form.validate_on_submit():
        # Decide on "unlimited" first so that the date field is only read
        # (and combined) when an end date is actually needed. Combining an
        # empty date would raise.
        if form.unlimited.data:
            end_date = None
        else:
            end_date = datetime.combine(form.date.data, time(0))

        try:
            lib.user.block(
                user=myUser,
                date=end_date,
                reason=form.reason.data,
                processor=current_user)
        except ValueError as e:
            # NOTE(review): ``e.message`` exists only on Python 2 exceptions --
            # confirm the target runtime.
            flash(e.message, 'error')
        else:
            flash(u'Nutzer gesperrt', 'success')
            return redirect(url_for('.user_show', user_id=user_id))
    return render_template('user/user_block.html', form=form, user_id=user_id)
def move_out(user_id):
    """Handle the "move out" form for the user with the given id.

    Unknown ids flash an error and abort with 404. A valid submission moves
    the user out at midnight of the chosen date and redirects to the user page.
    """
    form = UserMoveOutForm()
    myUser = User.q.get(user_id)
    if myUser is None:
        flash(u"Nutzer mit ID %s existiert nicht!" % (user_id,), 'error')
        abort(404)

    if not form.validate_on_submit():
        return render_template('user/user_moveout.html', form=form, user_id=user_id)

    move_out_at = datetime.combine(form.date.data, time(0))
    lib.user.move_out(
        user=myUser,
        date=move_out_at,
        processor=current_user,
        comment=form.comment.data
    )
    flash(u'Nutzer wurde ausgezogen', 'success')
    return redirect(url_for('.user_show', user_id=myUser.id))
def move_out_tmp(user_id):
    """Handle the temporary "move out" form for the user with the given id.

    Unknown ids flash an error and abort with 404. A valid submission
    registers the temporary move-out at midnight of the chosen date and
    redirects to the page of the user returned by the library call.
    """
    form = UserMoveOutForm()
    my_user = User.q.get(user_id)
    if my_user is None:
        flash(u"Nutzer mit ID %s existiert nicht!" % (user_id,), 'error')
        abort(404)

    if not form.validate_on_submit():
        return render_template('user/user_moveout.html', form=form, user_id=user_id)

    move_out_at = datetime.combine(form.date.data,time(0))
    changed_user = lib.user.move_out_tmp(
        user=my_user,
        date=move_out_at,
        comment=form.comment.data,
        processor=current_user
    )
    flash(u'Nutzer zieht am %s vorübegehend aus' % form.date.data, 'success')
    return redirect(url_for('.user_show', user_id=changed_user.id))
def get_task_instances(
        self, session, start_date=None, end_date=None, state=None):
    """Return this DAG's task instances inside an execution-date window.

    ``start_date`` defaults to midnight 30 days before today and ``end_date``
    to the current time. ``state``, when given, narrows the result further.
    """
    TI = TaskInstance
    if not start_date:
        thirty_days_back = (datetime.today()-timedelta(30)).date()
        start_date = datetime.combine(thirty_days_back, datetime.min.time())
    if not end_date:
        end_date = datetime.now()

    query = session.query(TI).filter(
        TI.dag_id == self.dag_id,
        TI.execution_date >= start_date,
        TI.execution_date <= end_date,
        TI.task_id.in_([t.task_id for t in self.tasks]),
    )
    if state:
        query = query.filter(TI.state == state)
    return query.all()
def get_price_quote(self, d=None, column='adj_close'):
    """
    Get the price of an Asset.

    :param date or datetime d: The datetime of when to retrieve the price
        quote from. When omitted, a live quote is fetched and stamped with
        today's date plus the time reported by the quote.
    :param str column: The header of the column to use to get the price
        quote from. (default: ``adj_close``)
    :return: namedtuple with the price and the datetime
    :rtype: namedtuple
    """
    quote = namedtuple('Quote', 'price time')
    if d is not None:
        # Historical lookup from the stored OHLCV data.
        price = self.ohlcv.ix[d][column][0]
        return quote(price=price, time=d)

    df = web.get_quote_yahoo(self.ticker)
    d = date.today()
    quote_time = dt_utils.parse_date(df['time'][0]).time()
    stamped = datetime.combine(d, time=quote_time)
    return quote(price=df['last'], time=stamped)
def create_future_events(self, date_stop=None):
    """Create and save one event per enabled weekday up to the stop date.

    The stop date is the earlier of the argument and ``self.date_stop``, and
    at least one of them must be set. Generation starts at midnight of the
    latest of ``self.time_start``, now, and the day after the last existing
    event. Returns the list of events that were added.
    """
    if not (self.date_stop or date_stop):
        raise ValidationError(_("Stop date should be specified."))
    date_stop = min(stop for stop in (date_stop, self.date_stop) if stop)

    current_date = max(self.time_start, timezone.now())
    last_event = self.events.order_by('-date_start').first()
    if last_event:
        current_date = max(current_date, last_event.date_start + timedelta(days=1))
    current_date = make_aware(datetime.combine(current_date, time.min))

    added_events = []
    for day in iter_daterange(current_date, date_stop):
        # ``on_day0`` .. ``on_day6`` flag the weekdays this applies to.
        if getattr(self, 'on_day%d' % day.weekday()):
            event = self.gen_future_event(day)
            event.full_clean()
            event.save(force_insert=True)
            added_events.append(event)
    return added_events
def get_govuk_capture_time(self, govuk_payment):
    """Return the UTC capture time of a payment, clamped onto its captured date.

    The submit time (or now, if missing) is used as-is when it falls on the
    captured date. Otherwise the start or end of the captured date is
    returned, whichever is nearer to the submit time. Raises
    GovUkPaymentStatusException when no captured date is available.
    """
    try:
        capture_submit_time = parse_datetime(
            govuk_payment['settlement_summary'].get('capture_submit_time', '')
        )
        captured_date = parse_date(
            govuk_payment['settlement_summary'].get('captured_date', '')
        )
        if captured_date is not None:
            capture_submit_time = (
                capture_submit_time or timezone.now()
            ).astimezone(timezone.utc)
            submitted_on = capture_submit_time.date()
            if submitted_on < captured_date:
                clamp_to = time.min
            elif submitted_on > captured_date:
                clamp_to = time.max
            else:
                return capture_submit_time
            return datetime.combine(captured_date, clamp_to).replace(tzinfo=timezone.utc)
    except (KeyError, TypeError):
        pass
    raise GovUkPaymentStatusException(
        'Capture date not yet available for payment %s' % govuk_payment['reference']
    )
def __get_occur_time_list(self, weekday_table, course_time_table):
    """Turn the whitespace-separated occurrence string into lists of datetimes.

    The first character of each token is looked up in ``weekday_table`` to
    get a date, and tokens with an unknown first character are skipped. The
    first two numbers in the token are a 1-based, inclusive range of
    positions in ``course_time_table``. One list of datetimes is produced
    per token.
    """
    occur_time_list = []
    for token in self.__occur_time_str.split():
        try:
            weekday = weekday_table[token[0]]
        except KeyError:
            continue

        numbers = re.findall(pattern=r'[0-9]+', string=token)
        first_slot = int(numbers[0]) - 1
        last_slot = int(numbers[1]) - 1

        times_in_a_day = []
        for slot in range(first_slot, last_slot + 1):
            times_in_a_day.append(
                datetime.combine(date=weekday, time=course_time_table[slot]))
        occur_time_list.append(times_in_a_day)
    return occur_time_list
def get_user_free_intervals(account, day, start = time(0,0,0), end = time(23,59,59)):
    """Return the free time intervals of ``account`` on ``day`` between ``start`` and ``end``.

    Busy intervals come from tasks repeating on this weekday plus category-6
    tasks whose start equals ``day``. They are merged with ``combine`` and
    then inverted with ``complement``.
    NOTE(review): ``combine`` here is an interval-union helper in scope, not
    ``datetime.combine`` -- confirm against the imports.
    """
    # Get the first two characters of weekday string
    weekday = day.strftime("%a")[0:2]

    repeating_tasks = account.tasks.filter(category__in=[0,3,5,6], repeat__contains=weekday, )
    busy_intervals = [(task.start.time(), task.end.time()) for task in repeating_tasks]

    # A year/month/day match on ``start`` was tried before; the filter in use
    # compares ``start`` with ``day`` directly.
    all_non_repeat_tasks = account.tasks.filter(category=6, start=day)
    busy_intervals.extend(
        (task.start.time(), task.end.time()) for task in all_non_repeat_tasks)

    # union
    merged = combine(busy_intervals)
    # compute the complement of all the intervals
    return complement(merged, first=start, last=end)
# Copied from http://nullege.com/codes/search/Intervals.complement
def Create(user, title, due=None):
    """Build a Task for ``user``, computing a default due time when none is given.

    The default is today at the preferred due hour in the user's timezone.
    It is pushed one day ahead when the current local hour is not before the
    "same day" cut-off, and the result is converted with
    ``tools.server_time``. A caller-supplied ``due`` is stored unchanged.
    """
    if not due:
        tz = user.get_timezone()
        local_now = tools.local_time(tz)
        task_prefs = user.get_setting_prop(['tasks', 'preferences'], {})
        same_day_hour = tools.safe_number(task_prefs.get('same_day_hour', 16), default=16, integer=True)
        due_hour = tools.safe_number(task_prefs.get('due_hour', 22), default=22, integer=True)
        schedule_for_same_day = local_now.hour < same_day_hour

        dt_due = local_now
        if due_hour > 23:
            # An hour past 23 rolls over to midnight of the next day.
            due_hour = 0
            dt_due += timedelta(days=1)
        if due_hour < 0:
            due_hour = 0

        due = datetime.combine(dt_due.date(), time(due_hour, 0))
        if not schedule_for_same_day:
            due += timedelta(days=1)
        # NOTE(review): ``tz`` is only bound in this branch, so the conversion
        # is kept here -- confirm against the original layout.
        if due:
            due = tools.server_time(tz, due)
    return Task(title=tools.capitalize(title), dt_due=due, parent=user.key)
def _tasks_request(self):
    """Build the spoken summary of today's completed and still-open tasks."""
    tasks = Task.Recent(self.user)
    midnight_today = datetime.combine(datetime.today(), time(0,0))
    n_done = Task.CountCompletedSince(self.user, midnight_today)
    tasks_undone = [task.title for task in tasks if not task.is_done()]

    if n_done:
        text = "You've completed %d %s for today." % (n_done, tools.pluralize('task', n_done))
    else:
        text = "You haven't completed any tasks yet."

    if tasks_undone:
        text += " You still need to do %s." % tools.english_list(tasks_undone)

    # Nothing done and nothing pending: suggest how to add a first task.
    if not n_done and not tasks_undone:
        text += " Try adding tasks by saying 'add task Q2 planning'"
    return text
def to_jalali(cls, year, month=None, day=None, hour=None, minute=None, second=None, microsecond=None, tzinfo=None):
    """Build an instance from Gregorian components or from a datetime object.

    :param year: the Gregorian year, or, when ``month`` is omitted, a datetime
        (an instance of ``dt``) from which every component is taken.
    :param month, day: Gregorian date parts, passed to ``JalaliDate.to_jalali``.
    :param hour, minute, second, microsecond: time of day; parts left at
        ``None`` count as 0.
    :param tzinfo: timezone attached to the time part.
    :return: ``cls.combine`` of the converted date and the time of day.
    """
    if month is None and isinstance(year, dt):
        source = year
        year, month, day = source.year, source.month, source.day
        hour, minute, second = source.hour, source.minute, source.second
        microsecond = source.microsecond
        tzinfo = source.tzinfo

    j_date = JalaliDate.to_jalali(year, month, day)
    # Omitted time parts arrive as None; fall back to 0 so a date-only call
    # yields midnight instead of passing None on.
    # NOTE(review): assumes ``_time`` is ``datetime.time``, which rejects None
    # for these fields -- confirm against the imports.
    return cls.combine(j_date, _time(hour=hour or 0,
                                     minute=minute or 0,
                                     second=second or 0,
                                     microsecond=microsecond or 0,
                                     tzinfo=tzinfo))
def __add__(self, other):
    """Return this value shifted by the timedelta ``other``.

    Returns NotImplemented for other operand types and raises OverflowError
    when the resulting day ordinal leaves ``1 .. _MAXORDINAL``.
    """
    if not isinstance(other, timedelta):
        return NotImplemented

    # Express this instant as a timedelta from ordinal 0 and add the offset.
    total = timedelta(self.toordinal(),
                      hours=self._hour,
                      minutes=self._minute,
                      seconds=self._second,
                      microseconds=self._microsecond)
    total += other

    hour, rem = divmod(total.seconds, 3600)
    minute, second = divmod(rem, 60)
    if not (0 < total.days <= _MAXORDINAL):
        raise OverflowError("result out of range")
    return JalaliDateTime.combine(
        JalaliDate.fromordinal(total.days),
        _time(hour, minute, second, total.microseconds, tzinfo=self._tzinfo))
def parse_datetime(datetimestring):
    '''
    Parses ISO 8601 date-times into datetime.datetime objects.

    The text is split at the 'T' designator, and the halves are handed to
    parse_date and parse_time. This accepts more combinations of date and
    time representations than the ISO 8601:2004 standard itself allows.
    Raises ISO8601Error when the string does not split into exactly two
    parts at 'T'.
    '''
    try:
        datestring, timestring = datetimestring.split('T')
    except ValueError:
        raise ISO8601Error("ISO 8601 time designator 'T' missing. Unable to"
                           " parse datetime string %r" % datetimestring)
    else:
        date_part = parse_date(datestring)
        time_part = parse_time(timestring)
        return datetime.combine(date_part, time_part)
def get_queryset(self):
    """Return the actions that fall inside the requested ``start``/``end`` period.

    Raises ParseError when a bound is missing or cannot be parsed. An inverted
    period yields an empty queryset.
    """
    start_date = self.request.GET.get('start')
    end_date = self.request.GET.get('end')
    if not start_date or not end_date:
        raise ParseError("Period frame missing")

    queryset = self.model.objects.all()
    queryset = self._apply_in_action_type_lookup(queryset)
    queryset = self._apply_in_charge_lookup(queryset)

    try:
        start_date = self._parse_date(start_date)
        end_date = self._parse_date(end_date)
    except ValueError:
        raise ParseError("Invalid period frame")

    # The period covers whole days: first instant of the start date to the
    # last instant of the end date.
    start_datetime = datetime.combine(start_date, time.min)
    end_datetime = datetime.combine(end_date, time.max)
    if end_datetime < start_datetime:
        return self.model.objects.none()

    # starts before, ends after period
    in_period = Q(planned_date__lte=start_datetime, end_datetime__gte=end_datetime)
    # starts and ends during period
    in_period |= Q(planned_date__gte=start_datetime, end_datetime__lte=end_datetime)
    # starts before, ends during
    in_period |= Q(planned_date__lte=start_datetime, end_datetime__gte=start_datetime)
    # starts during period, ends after
    in_period |= Q(planned_date__lte=end_datetime, end_datetime__gte=end_datetime)
    # no end, starts during period
    in_period |= Q(
        planned_date__gte=start_datetime,
        end_datetime__isnull=True,
        planned_date__lte=end_datetime
    )
    return queryset.filter(in_period)
def _get_datetimes(self):
    """Return the selected bounds as (first instant of start day, last instant of end day)."""
    first_day, last_day = get_date_bounds(self.value)
    range_start = datetime.combine(first_day, time.min)
    range_end = datetime.combine(last_day, time.max)
    return range_start, range_end
def form_valid(self, form):
    """Create a new sale from the validated form.

    An action of the analysis code's type is created at midnight of the
    chosen date. When that action carries a sale, the sale is tagged with the
    analysis code and receives a single item for the given amount.
    """
    cleaned = form.cleaned_data
    analysis_code = cleaned['analysis_code']
    planned_date = datetime.combine(cleaned['date'], time.min)
    amount = cleaned['amount']
    vat_rate = cleaned['vat_rate']

    action = Action.objects.create(type=analysis_code.action_type, planned_date=planned_date)
    sale = action.sale
    if sale:
        sale.analysis_code = analysis_code
        sale.save()
        SaleItem.objects.create(
            sale=sale,
            pre_tax_price=amount,
            text=analysis_code.name,
            vat_rate=vat_rate,
            quantity=Decimal(1)
        )
    return super(AddExtraSaleView, self).form_valid(form)
def download_files(start_date, number_of_days, lon, lat, target_directory):
    """Download the ``LAC_OC.nc`` files listed by the OceanColor browse service.

    :param start_date: first calendar day to query.
    :param number_of_days: how many consecutive days to query.
    :param lon: (west, east) bounds sent as the ``w``/``e`` query parameters.
    :param lat: (south, north) bounds sent as the ``s``/``n`` query parameters.
    :param target_directory: files are written to ``<target_directory>/originals``.
        Files that already exist there are not downloaded again.

    Failures while following a listing link are printed and skipped.
    """
    print('Downloading files...')
    originals_dir = os.path.join(target_directory, 'originals')
    # exist_ok makes this idempotent, and it creates target_directory as well.
    os.makedirs(originals_dir, exist_ok=True)

    # The service addresses days as a count of days since 1970-01-01. Plain
    # calendar arithmetic is used instead of a local-time timestamp, which
    # lands on the previous day in time zones east of UTC.
    first_day = start_date.toordinal() - datetime(1970, 1, 1).toordinal()

    BASE_URL = 'https://oceancolor.gsfc.nasa.gov/cgi/browse.pl'
    url = BASE_URL + '?sub=level1or2list&sen=am&per=DAY&day={}&prm=CHL&n={}&s={}&w={}&e={}'
    href_pattern = re.compile(r'(?<=<a href=")[^"]*')

    for d in range(first_day, first_day + number_of_days):
        _url = url.format(d, lat[1], lat[0], lon[0], lon[1])
        _data = requests.get(_url)
        if not _data:
            continue
        for a_href in href_pattern.findall(str(_data.content)):
            if 'file' not in a_href:
                continue
            try:
                response = requests.get(BASE_URL + a_href, timeout=(3, 60))
                for link in href_pattern.findall(str(response.content)):
                    if 'LAC_OC.nc' not in link:
                        continue
                    filename = link.split('/')[-1]
                    destination = os.path.join(originals_dir, filename)
                    # Check before fetching so that a file already on disk
                    # costs no download.
                    if os.path.exists(destination):
                        continue
                    r = requests.get(link, timeout=(3, 60))
                    with open(destination, 'wb') as f:
                        f.write(r.content)
                    print('downloaded file {}'.format(filename))
            except Exception as e:
                print('Failed to download file due to: {}'.format(e))
    print('Done downloading files...')
def read(self):
    """Collect one reading from GNGGA / GNRMC sentences on the serial port.

    Lines are read until every requested field holds a truthy value or
    ``self.timeout`` seconds have passed. Returns a dict of the collected
    fields, with the timestamp as ISO 8601 text and the position as a
    ``{'type': 'Point', 'coordinates': [longitude, latitude]}`` entry under
    ``'pos'``.
    """
    # One slot per requested attribute, plus the helper attributes that are
    # stripped again before returning.
    out = {k: None for k in self.FIELDS + self.EXTRA_FIELD}
    start = datetime.now()
    self.ser.reset_input_buffer()
    # all() is a truthiness test: a field whose value is falsy (None, but also
    # 0 or 0.0) keeps the loop going until the timeout.
    while not all(out.values()):
        line = self.readline()
        if (datetime.now() - start).total_seconds() > self.timeout:
            break
        # Strip control characters and the leading "$".
        line = re.sub(r'[\x00-\x1F]|\r|\n|\t|\$', "", line)
        cmd = line.split(',')[0]
        if cmd not in ['GNGGA', 'GNRMC']:
            continue
        try:
            msg = pynmea2.parse(line)
            # Copy over every requested attribute this sentence provides.
            for key in out:
                if hasattr(msg, key):
                    out[key] = getattr(msg, key)
        except pynmea2.ParseError as e:
            print("Parse error:", e)
    if out['datestamp'] is not None and out['timestamp'] is not None:
        # Merge the date and time parts into one UTC timestamp string.
        timestamp = datetime.combine(out['datestamp'], out['timestamp']).replace(tzinfo=timezone.utc)
        out['timestamp'] = timestamp.isoformat()
    else:
        del out['timestamp']
    # The last entry of FIELDS is scaled by KNOTS_PER_KMPH
    # (NOTE(review): presumably a speed unit conversion -- confirm).
    if out[self.FIELDS[-1]] is not None:
        out[self.FIELDS[-1]] *= self.KNOTS_PER_KMPH
    if out.get('latitude') is not None and out.get('longitude') is not None:
        # 0.0 in either coordinate is not reported as a position.
        if out['latitude'] != 0.0 and out['longitude'] != 0.0:
            out['pos'] = {
                'type': 'Point',
                'coordinates': [out['longitude'], out['latitude']]
            }
        # NOTE(review): the nesting level of these two deletions is assumed;
        # confirm whether they should also run when a coordinate is missing.
        del out['latitude']
        del out['longitude']
    for f in self.EXTRA_FIELD:
        if f in out:
            del out[f]
    return out
def test_datetime_date_support(self):
    """A plain date stored in a datetime column must read back as midnight of that day."""
    today = date.today()
    self.DatetimeTest.objects.create(test_id=2, created_at=today)

    dt2 = self.DatetimeTest.objects(test_id=2).first()
    self.assertEqual(dt2.created_at.isoformat(), datetime(today.year, today.month, today.day).isoformat())

    # The same row must come back when filtering by id alone and when
    # filtering by the date value as well.
    for lookup in ({'test_id': 2}, {'test_id': 2, 'created_at': today}):
        result = self.DatetimeTest.objects.all().allow_filtering().filter(**lookup).first()
        self.assertEqual(result.created_at, datetime.combine(today, datetime.min.time()))
def strftime(self, _date, _format):
    """Convert date to string according to format.

    The date is first widened to a datetime at midnight, so time directives
    in the format render as zeros.

    :param date _date: date to convert
    :param str _format: format to use (same as for datetime.strftime)
    :return str: converted date
    """
    midnight = datetime.min.time()
    return datetime.combine(_date, midnight).strftime(_format)