我们从Python开源项目中,提取了以下37个代码示例,用于说明如何使用psycopg2.extras.DateRange()。
def test_cast_date(self): from psycopg2.extras import DateRange cur = self.conn.cursor() cur.execute("select '(2000-01-01,2012-12-31)'::daterange") r = cur.fetchone()[0] self.assertTrue(isinstance(r, DateRange)) self.assertTrue(not r.isempty) self.assertEqual(r.lower, date(2000, 1, 2)) self.assertEqual(r.upper, date(2012, 12, 31)) self.assertTrue(not r.lower_inf) self.assertTrue(not r.upper_inf) self.assertTrue(r.lower_inc) self.assertTrue(not r.upper_inc)
def test_adapt_date_range(self): from psycopg2.extras import DateRange, DateTimeRange, DateTimeTZRange from psycopg2.tz import FixedOffsetTimezone cur = self.conn.cursor() d1 = date(2012, 0o1, 0o1) d2 = date(2012, 12, 31) r = DateRange(d1, d2) cur.execute("select %s", (r,)) r1 = cur.fetchone()[0] self.assertTrue(isinstance(r1, DateRange)) self.assertEqual(r1.lower, d1) self.assertEqual(r1.upper, d2) self.assertTrue(r1.lower_inc) self.assertTrue(not r1.upper_inc) r = DateTimeRange(empty=True) cur.execute("select %s", (r,)) r1 = cur.fetchone()[0] self.assertTrue(isinstance(r1, DateTimeRange)) self.assertTrue(r1.isempty) ts1 = datetime(2000, 1, 1, tzinfo=FixedOffsetTimezone(600)) ts2 = datetime(2000, 12, 31, 23, 59, 59, 999, tzinfo=FixedOffsetTimezone(600)) r = DateTimeTZRange(ts1, ts2, '(]') cur.execute("select %s", (r,)) r1 = cur.fetchone()[0] self.assertTrue(isinstance(r1, DateTimeTZRange)) self.assertEqual(r1.lower, ts1) self.assertEqual(r1.upper, ts2) self.assertTrue(not r1.lower_inc) self.assertTrue(r1.upper_inc)
def test_cast_date(self): from psycopg2.extras import DateRange cur = self.conn.cursor() cur.execute("select '(2000-01-01,2012-12-31)'::daterange") r = cur.fetchone()[0] self.assert_(isinstance(r, DateRange)) self.assert_(not r.isempty) self.assertEqual(r.lower, date(2000, 1, 2)) self.assertEqual(r.upper, date(2012, 12, 31)) self.assert_(not r.lower_inf) self.assert_(not r.upper_inf) self.assert_(r.lower_inc) self.assert_(not r.upper_inc)
def test_adapt_date_range(self): from psycopg2.extras import DateRange, DateTimeRange, DateTimeTZRange from psycopg2.tz import FixedOffsetTimezone cur = self.conn.cursor() d1 = date(2012, 01, 01) d2 = date(2012, 12, 31) r = DateRange(d1, d2) cur.execute("select %s", (r,)) r1 = cur.fetchone()[0] self.assert_(isinstance(r1, DateRange)) self.assertEqual(r1.lower, d1) self.assertEqual(r1.upper, d2) self.assert_(r1.lower_inc) self.assert_(not r1.upper_inc) r = DateTimeRange(empty=True) cur.execute("select %s", (r,)) r1 = cur.fetchone()[0] self.assert_(isinstance(r1, DateTimeRange)) self.assert_(r1.isempty) ts1 = datetime(2000, 1, 1, tzinfo=FixedOffsetTimezone(600)) ts2 = datetime(2000, 12, 31, 23, 59, 59, 999, tzinfo=FixedOffsetTimezone(600)) r = DateTimeTZRange(ts1, ts2, '(]') cur.execute("select %s", (r,)) r1 = cur.fetchone()[0] self.assert_(isinstance(r1, DateTimeTZRange)) self.assertEqual(r1.lower, ts1) self.assertEqual(r1.upper, ts2) self.assert_(not r1.lower_inc) self.assert_(r1.upper_inc)
def test_cast_date(self): from psycopg2.extras import DateRange cur = self.conn.cursor() cur.execute("select '(2000-01-01,2012-12-31)'::daterange") r = cur.fetchone()[0] self.assert_(isinstance(r, DateRange)) self.assert_(not r.isempty) self.assertEqual(r.lower, date(2000,1,2)) self.assertEqual(r.upper, date(2012,12,31)) self.assert_(not r.lower_inf) self.assert_(not r.upper_inf) self.assert_(r.lower_inc) self.assert_(not r.upper_inc)
def test_adapt_date_range(self): from psycopg2.extras import DateRange, DateTimeRange, DateTimeTZRange from psycopg2.tz import FixedOffsetTimezone cur = self.conn.cursor() d1 = date(2012, 01, 01) d2 = date(2012, 12, 31) r = DateRange(d1, d2) cur.execute("select %s", (r,)) r1 = cur.fetchone()[0] self.assert_(isinstance(r1, DateRange)) self.assertEqual(r1.lower, d1) self.assertEqual(r1.upper, d2) self.assert_(r1.lower_inc) self.assert_(not r1.upper_inc) r = DateTimeRange(empty=True) cur.execute("select %s", (r,)) r1 = cur.fetchone()[0] self.assert_(isinstance(r1, DateTimeRange)) self.assert_(r1.isempty) ts1 = datetime(2000,1,1, tzinfo=FixedOffsetTimezone(600)) ts2 = datetime(2000,12,31,23,59,59,999, tzinfo=FixedOffsetTimezone(600)) r = DateTimeTZRange(ts1, ts2, '(]') cur.execute("select %s", (r,)) r1 = cur.fetchone()[0] self.assert_(isinstance(r1, DateTimeTZRange)) self.assertEqual(r1.lower, ts1) self.assertEqual(r1.upper, ts2) self.assert_(not r1.lower_inc) self.assert_(r1.upper_inc)
def get_queryset(self,): params = self.request.query_params showNulls = params.get('showNulls', None) sourceName = params.get('source_name', None) filteredFeatures = Feature.objects.all() if sourceName: filteredFeatures = filteredFeatures.filter(source_name=sourceName) defaultRange = 180 defaultStart = datetime.date.today() - datetime.timedelta(days=2) defaultEnd = defaultStart + datetime.timedelta(days=defaultRange) startDate = params.get('startDate', None) endDate = params.get('endDate', None) if startDate or endDate: if startDate and endDate: try: queryDateRange = DateRange(lower=startDate, upper=endDate) except DataError: queryDateRange = DateRange(lower=defaultStart, upper=defaultEnd) elif startDate: calcEnd = parser.parse(startDate) + datetime.timedelta(days=3) queryDateRange = DateRange(lower=startDate, upper=calcEnd) else: queryDateRange = DateRange(lower=defaultStart, upper=defaultEnd) filteredFeatures = filteredFeatures.filter(canonical_daterange__overlap=queryDateRange) elif not showNulls: filteredFeatures = filteredFeatures.exclude(canonical_daterange=None).exclude(canonical_daterange__isempty=True) print('featurecount', filteredFeatures.count()) return filteredFeatures
def get_queryset(self): params = self.request.query_params excludeStatuses = ['COMPLETED', 'COMPLETED', 'COMPLETE', 'DENIED', 'CANCELED'] minDist = int(params.get('distance', 100)) minDays = int(params.get('days', 14)) defaultStart = datetime.date.today() - datetime.timedelta(days=2) defaultEnd = defaultStart + datetime.timedelta(days=180) startDate = params.get('startDate', defaultStart.isoformat()) endDate = params.get('endDate', defaultEnd.isoformat()) queryDateRange = DateRange(lower=startDate, upper=endDate) excludeDateRange = DateRange(lower='1800-01-01', upper='2014-12-31') collisionGraph = cache.get('featureGraph') featureIDs = set() for u, v, d in collisionGraph.edges(data=True): if d['daysApart'] <= minDays and d['distance'] <= minDist: featureIDs = {u, v} | featureIDs filteredFeatures = Feature.objects\ .filter(pk__in=featureIDs)\ .filter(canonical_daterange__overlap=queryDateRange)\ .exclude(canonical_daterange__overlap=excludeDateRange)\ .exclude(canonical_status__in=excludeStatuses) print('featurecount', filteredFeatures.count()) return filteredFeatures
def get_queryset(self): params = self.request.query_params excludeStatuses = ['COMPLETED', 'COMPLETED', 'COMPLETE', 'DENIED', 'CANCELED'] minDist = params.get('distance', 100) minDays = params.get('days', 14) # queryDate = parser.parse(self.request.query_params['date']).date() # startDate = queryDate - datetime.timedelta(days=minDays) # endDate = queryDate + datetime.timedelta(days=minDays) # queryDateRange = DateRange(startDate, endDate) defaultStart = datetime.date.today() - datetime.timedelta(days=2) defaultEnd = defaultStart + datetime.timedelta(days=365) startDate = params.get('startDate', defaultStart.isoformat()) endDate = params.get('endDate', defaultEnd.isoformat()) queryDateRange = DateRange(lower=startDate, upper=endDate) excludeDateRange = DateRange(lower='1800-01-01', upper='2014-12-31') geoaddy = AddressGeocode.objects.using('geocoder').raw("SELECT g.rating, ST_X(g.geomout) AS lon, ST_Y(g.geomout) AS lat, pprint_addy(addy) AS address FROM geocode(%s) as g LIMIT 1", [self.request.query_params['address']])[0] queryPoint = GEOSGeometry('POINT({} {})'.format(geoaddy.lon, geoaddy.lat)) filteredFeatures = Feature.objects\ .filter(canonical_daterange__overlap=queryDateRange)\ .filter(geom__distance_lte=(queryPoint, D(m=minDist)))\ .exclude(canonical_daterange__overlap=excludeDateRange)\ .exclude(canonical_status__in=excludeStatuses) print('featurecount', filteredFeatures.count()) return filteredFeatures
def test_edit_on_double_wrapped(self, session): double_wrapped_session = temporal.temporal_session(session) t = models.SimpleTableTemporal( prop_a=1, prop_b='foo', prop_c=datetime.datetime(2016, 5, 11, 1, 2, 3, tzinfo=datetime.timezone.utc), prop_d={'foo': 'old value'}, prop_e=psql_extras.DateRange(datetime.date(2016, 1, 1), datetime.date(2016, 1, 10)), prop_f=['old', 'stuff'] ) double_wrapped_session.add(t) double_wrapped_session.commit() t = double_wrapped_session.query(models.SimpleTableTemporal).first() with t.clock_tick(): t.prop_a = 2 t.prop_b = 'bar' double_wrapped_session.commit() history_tables = { 'prop_a': temporal.get_history_model( models.SimpleTableTemporal.prop_a), 'prop_b': temporal.get_history_model( models.SimpleTableTemporal.prop_b), } for attr, history in history_tables.items(): clock_query = session.query(history) assert clock_query.count() == 2, \ "%r missing a history entry for initial value" % history recorded_history = clock_query[-1] assert 2 in recorded_history.vclock assert getattr(t, attr) == getattr(recorded_history, attr)
def test_cast_date(self): from psycopg2.extras import DateRange cur = self.conn.cursor() cur.execute("select '(2000-01-01,2012-12-31)'::daterange") r = cur.fetchone()[0] self.assertTrue(isinstance(r, DateRange)) self.assertTrue(not r.isempty) self.assertEqual(r.lower, date(2000,1,2)) self.assertEqual(r.upper, date(2012,12,31)) self.assertTrue(not r.lower_inf) self.assertTrue(not r.upper_inf) self.assertTrue(r.lower_inc) self.assertTrue(not r.upper_inc)
def test_adapt_date_range(self): from psycopg2.extras import DateRange, DateTimeRange, DateTimeTZRange from psycopg2.tz import FixedOffsetTimezone cur = self.conn.cursor() d1 = date(2012, 0o1, 0o1) d2 = date(2012, 12, 31) r = DateRange(d1, d2) cur.execute("select %s", (r,)) r1 = cur.fetchone()[0] self.assertTrue(isinstance(r1, DateRange)) self.assertEqual(r1.lower, d1) self.assertEqual(r1.upper, d2) self.assertTrue(r1.lower_inc) self.assertTrue(not r1.upper_inc) r = DateTimeRange(empty=True) cur.execute("select %s", (r,)) r1 = cur.fetchone()[0] self.assertTrue(isinstance(r1, DateTimeRange)) self.assertTrue(r1.isempty) ts1 = datetime(2000,1,1, tzinfo=FixedOffsetTimezone(600)) ts2 = datetime(2000,12,31,23,59,59,999, tzinfo=FixedOffsetTimezone(600)) r = DateTimeTZRange(ts1, ts2, '(]') cur.execute("select %s", (r,)) r1 = cur.fetchone()[0] self.assertTrue(isinstance(r1, DateTimeTZRange)) self.assertEqual(r1.lower, ts1) self.assertEqual(r1.upper, ts2) self.assertTrue(not r1.lower_inc) self.assertTrue(r1.upper_inc)
def test_clock_tick_editing(self, session): clock_table = models.SimpleConcreteChildTemporalTable \ .temporal_options.clock_table t = models.SimpleConcreteChildTemporalTable( prop_a=1, prop_b='foo', prop_c=datetime.datetime(2016, 5, 11, 1, 2, 3, tzinfo=datetime.timezone.utc), prop_d={'foo': 'old value'}, prop_e=psql_extras.DateRange(datetime.date(2016, 1, 1), datetime.date(2016, 1, 10)), prop_f=['old', 'stuff'] ) session.add(t) session.commit() with t.clock_tick(): t.prop_a = 2 t.prop_b = 'bar' t.prop_c = datetime.datetime.now(tz=datetime.timezone.utc) t.prop_d['foo'] = 'new value' t.prop_e = psql_extras.DateRange(datetime.date(2016, 2, 1), datetime.date(2016, 2, 10)) t.prop_f = ['new', 'stuff'] session.commit() t = session.query(models.SimpleConcreteChildTemporalTable).first() clock_query = session.query(clock_table) assert clock_query.count() == 2 create_clock = clock_query.first() update_clock = clock_query.order_by( clock_table.timestamp.desc()).first() assert create_clock.timestamp == t.date_created assert update_clock.timestamp == t.date_modified assert t.vclock == 2 assert t.clock.count() == 2 clock = clock_query.order_by( models.SimpleConcreteChildTemporalTable .temporal_options.clock_table.tick.desc() ).first() for history_table in ( models.SimpleConcreteChildTemporalTable .temporal_options.history_tables.values()): clock_query = session.query(history_table).count() assert clock_query == 2 history = session.query(history_table) \ .order_by(history_table.vclock.desc()).first() assert clock.tick in history.vclock
def test_clock_tick_editing(self, session): clock_table = models.SimpleTableTemporal.temporal_options.clock_table t = models.SimpleTableTemporal( prop_a=1, prop_b='foo', prop_c=datetime.datetime(2016, 5, 11, 1, 2, 3, tzinfo=datetime.timezone.utc), prop_d={'foo': 'old value'}, prop_e=psql_extras.DateRange(datetime.date(2016, 1, 1), datetime.date(2016, 1, 10)), prop_f=['old', 'stuff'] ) session.add(t) session.commit() with t.clock_tick(): t.prop_a = 2 t.prop_b = 'bar' t.prop_c = datetime.datetime.now(tz=datetime.timezone.utc) t.prop_d['foo'] = 'new value' t.prop_e = psql_extras.DateRange(datetime.date(2016, 2, 1), datetime.date(2016, 2, 10)) t.prop_f = ['new', 'stuff'] session.commit() t = session.query(models.SimpleTableTemporal).first() clock_query = session.query(clock_table) assert clock_query.count() == 2 create_clock = clock_query.first() update_clock = clock_query.order_by( clock_table.timestamp.desc()).first() assert create_clock.timestamp == t.date_created assert update_clock.timestamp == t.date_modified assert t.vclock == 2 assert t.clock.count() == 2 clock = ( clock_query .order_by(models.SimpleTableTemporal .temporal_options.clock_table.tick.desc()) .first()) for history_table in (models.SimpleTableTemporal .temporal_options.history_tables.values()): clock_query = session.query(history_table).count() assert clock_query == 2 history = ( session.query(history_table) .order_by(history_table.vclock.desc()).first()) assert clock.tick in history.vclock