我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用numpy.datetime64()。
def _create_daily_stats(self, perfs):
    """Create the daily and cumulative stats dataframe.

    Every packet in ``perfs`` that has a ``'daily_perf'`` entry contributes
    one row: its ``'recorded_vars'`` and the packet's
    ``'cumulative_risk_metrics'`` are merged into the daily dict (in place).
    A packet without ``'daily_perf'`` is stored as ``self.risk_report``.

    :param perfs: iterable of performance packets (dict-like)
    :returns: DataFrame with one row per daily packet, indexed by the
        ``'period_close'`` of each packet
    """
    daily_perfs = []
    # TODO: the loop here could overwrite expected properties
    # of daily_perf. Could potentially raise or log a
    # warning.
    for perf in perfs:
        if 'daily_perf' not in perf:
            self.risk_report = perf
            continue
        daily = perf['daily_perf']
        daily.update(daily.pop('recorded_vars'))
        daily.update(perf['cumulative_risk_metrics'])
        daily_perfs.append(daily)

    # np.datetime64 accepts only (value, unit) positionally; the former
    # ``utc=True`` keyword was never part of its signature, so it is dropped.
    daily_dts = [np.datetime64(daily['period_close'])
                 for daily in daily_perfs]
    return pd.DataFrame(daily_perfs, index=daily_dts)
def drop_inconsistent_keys(self, columns, obj):
    """Drop inconsistent keys

    Remove keys of an unexpected type from a ValueCounts or Histogram
    object.

    :param list columns: columns key to retrieve desired datatypes
    :param object obj: ValueCounts or Histogram object to drop
        inconsistent keys from
    :returns: the same ``obj`` after the keys have been removed
    """
    # numeric and timestamp columns are compared as np.int64 keys;
    # every other column keeps its registered type
    converted_kinds = (np.number, np.datetime64)
    key_types = []
    for name in columns:
        declared = self.var_dtype[name]
        sample = np.dtype(declared).type()
        key_types.append(
            np.int64 if isinstance(sample, converted_kinds) else declared)

    # keep only keys of the expected types
    obj.remove_keys_of_inconsistent_type(prefered_key_type=key_types)
    return obj
def categorize_columns(self, df):
    """Categorize columns of dataframe by data type

    Each requested column is checked for presence in ``df``; its data type
    is stored in ``self.var_dtype`` when not yet registered, and the column
    name is appended to the numeric, timestamp or string column list.

    :param df: input (pandas) data frame
    :raises KeyError: if a requested column is missing from ``df``
    :raises TypeError: if a column's data type is in none of the supported
        type groups
    """
    # check presence and data type of requested columns
    # sort columns into numerical, timestamp and category based
    for c in self.columns:
        for col in c:
            if col not in df.columns:
                raise KeyError('column "{0:s}" not in dataframe "{1:s}"'.format(col, self.read_key))
            dt = self.get_data_type(df, col)
            if col not in self.var_dtype:
                self.var_dtype[col] = dt.type
                # np.bytes_ is the canonical name of the old np.string_
                # alias, which NumPy 2.0 removed
                if (self.var_dtype[col] is np.bytes_) or (self.var_dtype[col] is np.object_):
                    self.var_dtype[col] = str
            if not any(dt in types for types in (STRING_SUBSTR, NUMERIC_SUBSTR, TIME_SUBSTR)):
                raise TypeError('cannot process column "{0:s}" of data type "{1:s}"'.format(col, str(dt)))
            is_number = isinstance(dt.type(), np.number)
            is_timestamp = isinstance(dt.type(), np.datetime64)
            if is_number:
                colset = self.num_cols
            elif is_timestamp:
                colset = self.dt_cols
            else:
                colset = self.str_cols
            if col not in colset:
                colset.append(col)
            self.log().debug('Data type of column "%s" is "%s"', col, self.var_dtype[col])
def _dtype(self, c): n = ':'.join(c) if n in self.var_dtype: return self.var_dtype[n] # ranking in order is: float, int, short, char elif any(self.var_dtype[col] == np.dtype(float) for col in c if col in self.var_dtype): return np.dtype(float) elif any(self.var_dtype[col] == np.dtype(int) for col in c if col in self.var_dtype): return np.dtype(int) elif any(self.var_dtype[col] == np.datetime64 for col in c if col in self.var_dtype): return np.dtype(int) elif any(self.var_dtype[col] == np.dtype('short') for col in c if col in self.var_dtype): return np.dtype('short') elif any(self.var_dtype[col] == np.dtype('byte') for col in c if col in self.var_dtype): return np.dtype('byte') elif any(self.var_dtype[col] == np.dtype(bool) for col in c if col in self.var_dtype): return np.dtype(bool) # default is float return self._default_dtype
def test_make_scale_and_datetimes():
    """make_scale must pick the datetime scale class for every aesthetic."""
    aesthetics = ('x', 'color', 'fill', 'size', 'alpha')

    def check_all(series):
        # the scale class name is expected to be scale_<aesthetic>_datetime
        for aes in aesthetics:
            built = make_scale(aes, series)
            assert built.__class__.__name__ == 'scale_{}_datetime'.format(aes)

    # cpython
    check_all(pd.Series([datetime(year, 1, 1) for year in [2010, 2026, 2015]]))

    # numpy
    check_all(pd.Series([np.datetime64(i*10, 'D') for i in range(1, 10)]))
def get_price_yahoo(self, ticker):
    """Download the Yahoo price table for ``ticker`` and return adjusted prices.

    The CSV is saved next to this module, read back, and the Open/High/Low
    columns are rescaled by ``Adj Close / Close``; ``Close`` is replaced by
    ``Adj Close``. Rows with missing values are dropped and the row order
    is reversed.

    :param ticker: ticker symbol appended to the download URL
    :returns: DataFrame indexed by ``Date`` with Open, High, Low, Close
        and Volume columns
    :raises Exception: any error is re-raised after printing the ticker
    """
    try:
        # identity comparison: ``!= None`` can be overridden by the operand
        if Cache.data_request_delay is not None:
            time.sleep(Cache.data_request_delay)
        base_url = "http://ichart.finance.yahoo.com/table.csv?s="
        file_name = os.path.dirname(__file__) + "\\data\\"+ "yahoo_"+ticker+"_"+str(Pricing_Database.current_date)
        urllib.request.urlretrieve(base_url+ticker, file_name)
        df = pd.read_csv(file_name, index_col=False, header=0)
        for column in ('Open', 'High', 'Low'):
            df[column] = df[column]*df['Adj Close']/df['Close']
        df['Close'] = df['Adj Close']
        df['Date'] = df['Date'].apply(lambda x: np.datetime64(x))
        df = df[['Date', 'Open', 'High', 'Low', 'Close', 'Volume']]
        df = df.set_index(['Date'])
        df.dropna(inplace=True)
        df = df.iloc[::-1]
        return df
    except Exception:
        print("YAHOO/"+ticker)
        # bare raise keeps the original traceback intact
        raise
def __init__(self, x, y, t):
    """Store the coordinates and timestamps of a trajectory.

    :param x: sequence of x coordinates (stored as float64)
    :param y: sequence of y coordinates (stored as float64)
    :param t: sequence of timestamps (stored as datetime64 in seconds)
    :raises Exception: if an argument has no length, or if the three
        lengths differ
    """
    # Only the len() calls are guarded: the previous bare ``except`` also
    # caught the length-mismatch error raised inside the ``try`` and
    # replaced its message with the "not an array" one.
    try:
        sizes = (len(x), len(y), len(t))
    except TypeError:
        raise Exception('Os atributos x, y e t precisam ser um arrays')
    if sizes[0] != sizes[1] or sizes[1] != sizes[2]:
        raise Exception('Os arrays x, y e t precisam ser do mesmo tamanho')

    self.unit = 's'
    self.x = np.array(x, dtype='f8')
    self.y = np.array(y, dtype='f8')
    self.t = np.array(t, dtype='datetime64[{}]'.format(self.unit))
    # seconds elapsed since the Unix epoch, as floats
    self.seconds = (self.t - np.datetime64("1970-01-01T00:00:00")) / np.timedelta64(1, 's')
    # maps str() of each timestamp, exactly as passed in, to its position
    self._t = {str(v): i for i, v in enumerate(t)}
def test_scalar_none_comparison(self):
    """Comparing NumPy scalars with None must not emit a FutureWarning.

    The final assertion documents the behaviour of ``np.equal`` for
    NaT versus None outside the warning-recording block.
    """
    # Scalars should still just return False and not give a warnings.
    # The comparisons are flagged by pep8, ignore that.
    with warnings.catch_warnings(record=True) as w:
        # record every FutureWarning so the count can be checked below
        warnings.filterwarnings('always', '', FutureWarning)
        assert_(not np.float32(1) == None)
        assert_(not np.str_('test') == None)
        # This is dubious (see below):
        assert_(not np.datetime64('NaT') == None)

        assert_(np.float32(1) != None)
        assert_(np.str_('test') != None)
        # This is dubious (see below):
        assert_(np.datetime64('NaT') != None)
    assert_(len(w) == 0)

    # For documentation purposes, this is why the datetime is dubious.
    # At the time of deprecation this was no behaviour change, but
    # it has to be considered when the deprecations are done.
    assert_(np.equal(np.datetime64('NaT'), None))
def test_datetime_nat_casting(self): a = np.array('NaT', dtype='M8[D]') b = np.datetime64('NaT', '[D]') # Arrays assert_equal(a.astype('M8[s]'), np.array('NaT', dtype='M8[s]')) assert_equal(a.astype('M8[ms]'), np.array('NaT', dtype='M8[ms]')) assert_equal(a.astype('M8[M]'), np.array('NaT', dtype='M8[M]')) assert_equal(a.astype('M8[Y]'), np.array('NaT', dtype='M8[Y]')) assert_equal(a.astype('M8[W]'), np.array('NaT', dtype='M8[W]')) # Scalars -> Scalars assert_equal(np.datetime64(b, '[s]'), np.datetime64('NaT', '[s]')) assert_equal(np.datetime64(b, '[ms]'), np.datetime64('NaT', '[ms]')) assert_equal(np.datetime64(b, '[M]'), np.datetime64('NaT', '[M]')) assert_equal(np.datetime64(b, '[Y]'), np.datetime64('NaT', '[Y]')) assert_equal(np.datetime64(b, '[W]'), np.datetime64('NaT', '[W]')) # Arrays -> Scalars assert_equal(np.datetime64(a, '[s]'), np.datetime64('NaT', '[s]')) assert_equal(np.datetime64(a, '[ms]'), np.datetime64('NaT', '[ms]')) assert_equal(np.datetime64(a, '[M]'), np.datetime64('NaT', '[M]')) assert_equal(np.datetime64(a, '[Y]'), np.datetime64('NaT', '[Y]')) assert_equal(np.datetime64(a, '[W]'), np.datetime64('NaT', '[W]'))
def test_pydatetime_creation(self): a = np.array(['1960-03-12', datetime.date(1960, 3, 12)], dtype='M8[D]') assert_equal(a[0], a[1]) a = np.array(['1999-12-31', datetime.date(1999, 12, 31)], dtype='M8[D]') assert_equal(a[0], a[1]) a = np.array(['2000-01-01', datetime.date(2000, 1, 1)], dtype='M8[D]') assert_equal(a[0], a[1]) # Will fail if the date changes during the exact right moment a = np.array(['today', datetime.date.today()], dtype='M8[D]') assert_equal(a[0], a[1]) # datetime.datetime.now() returns local time, not UTC #a = np.array(['now', datetime.datetime.now()], dtype='M8[s]') #assert_equal(a[0], a[1]) # we can give a datetime.date time units assert_equal(np.array(datetime.date(1960, 3, 12), dtype='M8[s]'), np.array(np.datetime64('1960-03-12T00:00:00')))
def test_datetime_y2038(self):
    """Parsing must be correct on both sides of the signed 32-bit limit.

    The int64 view of a seconds-resolution datetime64 is compared with
    2**31 - 1 and 2**31.
    """
    # Test parsing on either side of the Y2038 boundary
    a = np.datetime64('2038-01-19T03:14:07')
    assert_equal(a.view(np.int64), 2**31 - 1)
    a = np.datetime64('2038-01-19T03:14:08')
    assert_equal(a.view(np.int64), 2**31)

    # Test parsing on either side of the Y2038 boundary with
    # a manually specified timezone offset
    # (the +0100 strings must give the same int64 values as above)
    with assert_warns(DeprecationWarning):
        a = np.datetime64('2038-01-19T04:14:07+0100')
    assert_equal(a.view(np.int64), 2**31 - 1)
    with assert_warns(DeprecationWarning):
        a = np.datetime64('2038-01-19T04:14:08+0100')
    assert_equal(a.view(np.int64), 2**31)

    # Test parsing a date after Y2038
    a = np.datetime64('2038-01-20T13:21:14')
    assert_equal(str(a), '2038-01-20T13:21:14')
def format_time(x):
    """Format date values.

    Timestamps (:class:`datetime.datetime`, numpy ``datetime64``) are passed
    to ``format_timestamp`` and time differences
    (:class:`datetime.timedelta`, numpy ``timedelta64``) to
    ``format_timedelta``. A numpy array is turned into a list, or into its
    single element when it is zero-dimensional.

    Parameters
    ----------
    x: object
        The value to format. If not a time object, the value is returned

    Returns
    -------
    str or `x`
        Either the formatted time object or the initial `x`
    """
    if isinstance(x, (datetime64, datetime)):
        return format_timestamp(x)
    if isinstance(x, (timedelta64, timedelta)):
        return format_timedelta(x)
    if not isinstance(x, ndarray):
        return x
    if x.ndim:
        return list(x)
    # zero-dimensional array: unwrap the single element
    return x[()]
def _parase_fq_factor(code, start, end):
    """Download and parse the price-adjustment factor table for ``code``.

    The response is rewritten into valid JSON by a fixed series of textual
    replacements and loaded into a DataFrame with ``date`` and ``factor``
    columns.

    :param code: code converted to a symbol with ``_code_to_symbol``
    :param start: not used by this function
    :param end: not used by this function
    :returns: DataFrame with de-duplicated ``date`` values and float
        ``factor`` values
    """
    symbol = _code_to_symbol(code)
    request = Request(ct.HIST_FQ_FACTOR_URL%(ct.P_TYPE['http'],
                                             ct.DOMAINS['vsf'], symbol))
    # NOTE(review): on Python 3 ``read()`` returns bytes while the
    # replacements below use str arguments -- confirm a decode step is not
    # needed here.
    text = urlopen(request, timeout=10).read()
    # strip the first and last character around the payload
    text = text[1:len(text)-1]
    # quote keys and turn '_' separators into '-' so json.loads accepts it
    replacements = (
        ('{_', '{"'),
        ('total', '"total"'),
        ('data', '"data"'),
        (':"', '":"'),
        ('",_', '","'),
        ('_', '-'),
    )
    for old, new in replacements:
        text = text.replace(old, new)
    text = json.loads(text)
    df = pd.DataFrame({'date': list(text['data'].keys()),
                       'factor': list(text['data'].values())})
    df['date'] = df['date'].map(_fun_except)  # for null case
    # ``np.object`` was removed in NumPy 1.24; the builtin is equivalent
    if df['date'].dtypes == object:
        # explicit unit: pandas 2 rejects the unit-less np.datetime64 dtype
        df['date'] = df['date'].astype('datetime64[ns]')
    df = df.drop_duplicates('date')
    df['factor'] = df['factor'].astype(float)
    return df
def test_netcdf_monitor_single_time_all_vars():
    """Store one state, write it, and check the content of the NetCDF file."""
    target = 'out.nc'
    expected_units = (('air_temperature', 'degK'), ('air_pressure', 'Pa'))
    try:
        assert not os.path.isfile(target)
        monitor = NetCDFMonitor(target)
        monitor.store(state)
        assert not os.path.isfile(target)  # not set to write on store
        monitor.write()
        assert os.path.isfile(target)
        with xr.open_dataset(target) as ds:
            assert len(ds.data_vars.keys()) == 2
            for name, units in expected_units:
                assert name in ds.data_vars.keys()
                assert ds.data_vars[name].attrs['units'] == units
                assert tuple(ds.data_vars[name].shape) == (1, nx, ny, nz)
            assert len(ds['time']) == 1
            assert ds['time'][0] == np.datetime64(state['time'])
    finally:
        # make sure we remove the output file
        if os.path.isfile(target):
            os.remove(target)
def test_too_late_start() -> None:
    """Model start after last release in file"""

    config = {
        'start_time': np.datetime64('2015-05-02 12'),
        'stop_time': np.datetime64('2015-05-03 12'),
        'particle_release_file': 'release.rls',
        'release_format': ['mult', 'release_time', 'X'],
        'release_dtype': dict(mult=int, release_time=np.datetime64, X=float),
        'release_type': 'discrete',
        'dt': 3600,
        'particle_variables': [],
    }

    # Make a release file
    with open('release.rls', mode='w') as f:
        f.write('2 2015-04-01 100\n')

    try:
        # Release should quit with SystemExit
        with pytest.raises(SystemExit):
            ParticleReleaser(config)
    finally:
        # Clean up even when the expectation above fails, so the file is
        # not left behind for other tests
        os.remove('release.rls')
def test_filter_date():
    """Filter a Table on its date column and check the remaining columns."""
    t = Table()
    t.a = np.random.rand(10)
    t.b = pd.date_range('2000-01-01', freq='D', periods=10)
    t.c = np.array([1, 2])
    t.add_column('d', np.array([1, 2]), align='bottom')

    thres1 = np.array(['2000-01-03'], dtype=np.datetime64)
    thres2 = np.array(['2000-01-05'], dtype=np.datetime64)

    def expect(filtered, c_values, d_values, a_values):
        # compare columns c, d and a of the filtered table, in that order
        assert np.all(filtered.c.values == np.array(c_values))
        assert np.all(filtered.d.values == np.array(d_values))
        assert np.all(filtered.a.values == a_values)

    expect(t.filter(t.b >= thres1), [], [1, 2], t.a.values[2:])
    expect(t.filter((t.b >= thres1) & (t.b <= thres2)),
           [], [], t.a.values[2:5])
    expect(t.filter(t.b.date_range(fr=thres1, to=thres2)),
           [], [], t.a.values[2:5])
def __init__(self):
    """Load the new-stock (IPO) table and record its first and last dates.

    ``self.start`` is taken from the last row and ``self.end`` from the
    first row of the ``ipo_date`` column.
    """
    self.ipo = ts.new_stocks()
    # explicit nanosecond unit: recent pandas rejects the unit-less
    # 'datetime64' dtype string
    self.ipo['ipo_date'] = self.ipo['ipo_date'].astype('datetime64[ns]')
    ipo_dates = self.ipo['ipo_date'].values
    self.start = ipo_dates[-1]
    self.end = ipo_dates[0]
    # call form prints the same text on Python 2 and also runs on Python 3
    print(type(self.end))
def how_to_use():
    """Walk through download, training, evaluation and prediction."""
    # Date ranges for the training and the test data
    train_start_date, train_end_date = (
        np.datetime64("2012-01-01"), np.datetime64("2015-12-31"))
    test_start_date, test_end_date = (
        np.datetime64("2016-01-01"), np.datetime64("2016-08-31"))

    # Download latest data
    stockdata = StockData()
    stockdata.download()

    # How to train
    model = NikkeiModel([], "YourModelName")
    model.prepare_training_data(train_start_date, train_end_date)
    model.train()

    # How to evaluate
    model.prepare_test_data(test_start_date, test_end_date)
    model.evaluate()
    model.backtest()

    # How to predict
    n225_open = 16500  # Today's N225 Open value
    today = np.datetime64("today")
    model.predict(n225_open, today, downloadData=False)
def unique1d(values):
    """
    Hash table-based unique

    The hash table is chosen from the dtype of ``values``; datetime64 and
    timedelta64 results are viewed back as 'M8[ns]' / 'm8[ns]'.
    """
    kind = values.dtype
    size = len(values)

    if np.issubdtype(kind, np.floating):
        floats = _hash.Float64HashTable(size).unique(_ensure_float64(values))
        return np.array(floats, dtype=np.float64)

    if np.issubdtype(kind, np.datetime64):
        stamps = _hash.Int64HashTable(size).unique(_ensure_int64(values))
        return stamps.view('M8[ns]')

    if np.issubdtype(kind, np.timedelta64):
        deltas = _hash.Int64HashTable(size).unique(_ensure_int64(values))
        return deltas.view('m8[ns]')

    if np.issubdtype(kind, np.integer):
        return _hash.Int64HashTable(size).unique(_ensure_int64(values))

    # anything else is hashed as Python objects
    return _hash.PyObjectHashTable(size).unique(_ensure_object(values))
def _astype(self, dtype, mgr=None, **kwargs):
    """
    these automatically copy, so copy=True has no effect
    raise on an except if raise == True

    A timezone-aware datetime64 target is handled here by converting the
    values to that timezone; every other dtype is delegated to the parent
    implementation.
    """
    if not com.is_datetime64tz_dtype(dtype):
        # delegate
        return super(DatetimeBlock, self)._astype(dtype=dtype, **kwargs)

    # we are passed a datetime64[ns, tz]
    tz_dtype = DatetimeTZDtype(dtype)
    stamps = self.values
    if getattr(stamps, 'tz', None) is None:
        # values without a tz are localized as UTC first
        stamps = DatetimeIndex(stamps).tz_localize('UTC')
    converted = stamps.tz_convert(tz_dtype.tz)
    return self.make_block(converted)
def _infer_fill_value(val):
    """
    infer the fill value for the nan/NaT from the provided
    scalar/ndarray/list-like if we are a NaT, return the correct dtyped
    element to provide proper block construction

    :param val: scalar, ndarray or list-like to inspect
    :returns: a 0-d 'NaT' array of the matching datetime/timedelta dtype,
        or ``np.nan`` otherwise
    """
    if not is_list_like(val):
        val = [val]
    # np.asarray avoids a copy when it can, as ``np.array(val, copy=False)``
    # did on NumPy 1.x; on NumPy 2 ``copy=False`` raises whenever a copy is
    # required (e.g. for the list built above).
    val = np.asarray(val)
    if is_datetimelike(val):
        return np.array('NaT', dtype=val.dtype)
    elif is_object_dtype(val.dtype):
        dtype = lib.infer_dtype(_ensure_object(val))
        if dtype in ['datetime', 'datetime64']:
            return np.array('NaT', dtype=_NS_DTYPE)
        elif dtype in ['timedelta', 'timedelta64']:
            return np.array('NaT', dtype=_TD_DTYPE)
    return np.nan
def test_normalize(self): rng = date_range('1/1/2000 9:30', periods=10, freq='D') result = rng.normalize() expected = date_range('1/1/2000', periods=10, freq='D') self.assertTrue(result.equals(expected)) rng_ns = pd.DatetimeIndex(np.array([1380585623454345752, 1380585612343234312]).astype( "datetime64[ns]")) rng_ns_normalized = rng_ns.normalize() expected = pd.DatetimeIndex(np.array([1380585600000000000, 1380585600000000000]).astype( "datetime64[ns]")) self.assertTrue(rng_ns_normalized.equals(expected)) self.assertTrue(result.is_normalized) self.assertFalse(rng.is_normalized)
def test_timestamp_compare_scalars(self): # case where ndim == 0 lhs = np.datetime64(datetime(2013, 12, 6)) rhs = Timestamp('now') nat = Timestamp('nat') ops = {'gt': 'lt', 'lt': 'gt', 'ge': 'le', 'le': 'ge', 'eq': 'eq', 'ne': 'ne'} for left, right in ops.items(): left_f = getattr(operator, left) right_f = getattr(operator, right) expected = left_f(lhs, rhs) result = right_f(rhs, lhs) self.assertEqual(result, expected) expected = left_f(rhs, nat) result = right_f(nat, rhs) self.assertEqual(result, expected)
def test_barely_oob_dts(self): one_us = np.timedelta64(1).astype('timedelta64[us]') # By definition we can't go out of bounds in [ns], so we # convert the datetime64s to [us] so we can go out of bounds min_ts_us = np.datetime64(Timestamp.min).astype('M8[us]') max_ts_us = np.datetime64(Timestamp.max).astype('M8[us]') # No error for the min/max datetimes Timestamp(min_ts_us) Timestamp(max_ts_us) # One us less than the minimum is an error self.assertRaises(ValueError, Timestamp, min_ts_us - one_us) # One us more than the maximum is an error self.assertRaises(ValueError, Timestamp, max_ts_us + one_us)
def datetime(x):
    """Convert a list of date strings into a numpy array of datetime64.

    :param x: values to convert
    :returns: array created with ``dtype=np.datetime64``
    """
    target_type = np.datetime64
    converted = np.array(x, dtype=target_type)
    return converted
def process_line(line):
    """Convert one price record into a tuple of typed values.

    :param line: mapping with the keys dt, sid, open, high, low, close
        and volume
    :returns: tuple ``(dt, sid, open, high, low, close, volume)`` where dt
        is the int64 value of the parsed datetime64, prices are floats and
        volume is an int
    """
    stamp = np.datetime64(line["dt"]).astype(np.int64)
    sid = line["sid"]
    prices = tuple(float(line[field])
                   for field in ("open", "high", "low", "close"))
    volume = int(line["volume"])
    return (stamp, sid) + prices + (volume,)
def getIntervals(data,sat_num,maxgap=3,maxjump=1.2):
    """
    scans through the phase tec of a satellite and determines where "good"
    intervals begin and end

    inputs:
        data - Panel4D with dimensions (parameter,satellite number,time,data/lli/ssi)
        sat_num - the number of the satellite to get good intervals for
        maxgap - maximum number of nans before starting new interval
        maxjump - maximum jump in phase TEC before starting new interval
    output:
        intervals - list of 2-tuples, beginning and end of each "good" interval
                    as a Pandas/numpy datetime64
    """
    # the fourth observable is C2 or P2, depending on what c2p2() reports
    fourth = 'C2' if c2p2(data,sat_num) else 'P2'
    observables = data[['L1','L2','C1',fourth],sat_num,:,'data']
    # positions where all four observables are finite
    finite_values = np.where(np.logical_and.reduce(np.isfinite(observables).T))[0]

    if len(finite_values)==0:
        return []

    phase_tec = 2.85E9*(data['L1',sat_num,:,'data']/f1
                        -data['L2',sat_num,:,'data']/f2)

    index_pairs = []
    start = finite_values[0]
    previous = finite_values[0]
    for current in finite_values[1:]:
        gap_too_long = current-previous>maxgap
        if gap_too_long or abs(phase_tec[current]-phase_tec[previous])>maxjump:
            # close the running interval and open a new one here
            index_pairs.append((start,previous))
            start = current
        previous = current
        if current==finite_values[-1]:
            # last finite sample: close whatever interval is still open
            index_pairs.append((start,previous))

    # translate positions into entries of the time axis
    return [(data.major_axis[first],data.major_axis[last])
            for first,last in index_pairs]
def getTec(data,sat_num,data_interval,satbias=None):
    """
    calculates slant TEC using phase tec shifted by the median difference
    between phase tec and pseudorange tec

    inputs:
        data - Panel4D with dimensions (parameter,satellite number,time,data/lli/ssi)
        sat_num - the number of the satellite to calculate TEC for
        data_interval - the interval made from getInterval(), it's a 2-tuple
                        marking the beginning and the end of a "good" interval
                        of data, each value is a Pandas/numpy datetime64
        satbias - not used by this function
    output:
        tec - phase tec minus the median phase/range difference
        median_error - interquartile width of the difference divided by
                       the square root of the number of finite samples
    """
    window = slice(data_interval[0],data_interval[1])

    def series(parameter):
        # one parameter of this satellite restricted to the interval
        return data[parameter,sat_num,window,'data']

    # the second pseudorange is C2 or P2, depending on what c2p2() reports
    second = 'C2' if c2p2(data,sat_num,data_interval) else 'P2'
    range_tec = (2.85E9/3.0E8)*(series(second)-series('C1'))
    phase_tec = 2.85E9*(series('L1')/f1-series('L2')/f2)

    offsets = np.array(sorted(phase_tec-range_tec))
    offsets = offsets[np.isfinite(offsets)]
    count = len(offsets)

    median_difference = offsets[int(count/2)]
    difference_width = offsets[int(count*.75)]-offsets[int(count*.25)]
    median_error = difference_width/np.sqrt(count)

    tec = phase_tec - median_difference
    return tec,median_error
def get_data_type(self, df, col):
    """Get data type of dataframe column

    :param df: input data frame
    :param str col: column
    :returns: numpy dtype of the column
    :raises KeyError: if the column is not in the data frame
    """
    if not (col in df.columns):
        raise KeyError('column "{0:s}" not in input dataframe'.format(col))

    declared = dict(df.dtypes)[col]

    # spark conversions to numpy or python equivalent
    if declared == 'string':
        return np.dtype('str')
    if declared == 'timestamp':
        return np.dtype(np.datetime64)
    return np.dtype(declared)
def process_columns(self, df):
    """Process columns before histogram filling

    Timestamp columns are converted with ``hf.to_ns``; numeric and
    timestamp columns are then converted to bin indices with
    ``hf.value_to_bin_index``.

    :param df: input (pandas) data frame
    :returns: output (pandas) data frame with converted timestamp columns
    :rtype: pandas DataFrame
    """
    # shallow copy of the string columns; converted columns are added to it
    idf = df[self.str_cols].copy(deep=False)

    # timestamp variables are converted to ns here
    for name in self.dt_cols:
        self.log().debug('Converting column "%s" of type "%s" to nanosec', name, self.var_dtype[name])
        idf[name] = df[name].apply(hf.to_ns)

    # numerical variables are converted to indices here
    for name in self.num_cols + self.dt_cols:
        self.log().debug('Converting column "%s" of type "%s" to index', name, self.var_dtype[name])
        column_dtype = df[name].dtype
        is_number = isinstance(column_dtype.type(), np.number)
        is_timestamp = isinstance(column_dtype.type(), np.datetime64)
        # timestamps were already converted above, so read them from idf
        source = idf if is_timestamp else df
        # column specific bin_specs; if not found, use the default for
        # the kind of column
        if is_number:
            fallback = self._unit_bin_specs
        else:
            fallback = self._unit_timestamp_specs
        specs = self.bin_specs.get(name, fallback)
        idf[name] = source[name].apply(hf.value_to_bin_index, **specs)

    return idf
def get_col_props(var_type):
    """Get column properties

    :param var_type: anything ``np.dtype`` accepts
    :returns dict: Column properties: ``dtype``, ``is_num``, ``is_int``
        and ``is_ts``; timestamps also count as numeric
    """
    npdtype = np.dtype(var_type)

    # determine data-type categories from an instance of the scalar type
    sample = npdtype.type()
    is_ts = isinstance(sample, np.datetime64)
    is_int = isinstance(sample, np.integer)
    is_num = True if is_ts else isinstance(sample, np.number)

    return {'dtype': npdtype, 'is_num': is_num, 'is_int': is_int, 'is_ts': is_ts}
def to_date_time(val):
    """Convert input to a pandas date-time value

    Values that cannot be parsed are coerced instead of raising.

    :param val: value to be evaluated
    :returns: result of ``pandas.to_datetime`` with ``errors='coerce'``
    """
    converted = pd.to_datetime(val, errors='coerce')
    return converted
def __getitem__(self, t):
    """Return the ``(x, y)`` pair stored for timestamp ``t``.

    :param t: anything ``np.datetime64`` can parse
    :returns: tuple ``(x, y)``, or None when ``t`` is not a stored key
    """
    # np.datetime64 has no ``unit`` keyword (the unit is its optional
    # second positional argument), so ``unit=self.unit`` was either ignored
    # or rejected depending on the NumPy version. It is dropped so that the
    # key keeps the precision of ``t`` as given.
    # NOTE(review): confirm whether keys should be normalised to self.unit.
    key = str(np.datetime64(t))
    i = self._t.get(key)
    if i is None:
        # interpolate?
        return None
    return self.x[i], self.y[i]
def __setitem__(self, t, point):
    """Overwrite the ``(x, y)`` pair stored for timestamp ``t``.

    Nothing happens when ``t`` is not a stored key.

    :param t: anything ``np.datetime64`` can parse
    :param point: sequence whose first two items are the new x and y
    """
    # np.datetime64 has no ``unit`` keyword (the unit is its optional
    # second positional argument), so ``unit=self.unit`` was either ignored
    # or rejected depending on the NumPy version. It is dropped so that the
    # key keeps the precision of ``t`` as given.
    # NOTE(review): confirm whether keys should be normalised to self.unit.
    key = str(np.datetime64(t))
    i = self._t.get(key)
    if i is None:
        # interpolate?
        return
    self.x[i] = point[0]
    self.y[i] = point[1]
def __to_Timestamp__(self, time):
    """Convert a number of seconds since 1970-01-01 into a datetime64.

    :param time: seconds since the epoch
    :returns: ``time`` seconds added to the epoch
    """
    one_second = np.timedelta64(1, 's')
    epoch = np.datetime64("1970-01-01 00:00:00")
    elapsed = time * one_second
    return elapsed + epoch
def after(self, t):
    """Return the part of the trajectory strictly later than ``t``.

    :param t: anything ``np.datetime64`` can parse
    :returns: a new Trajectory with the matching points, or None when no
        timestamp is later than ``t``
    """
    # np.datetime64 has no ``unit`` keyword (the unit is its optional
    # second positional argument); ``unit=self.unit`` was ignored or
    # rejected depending on the NumPy version. The comparison below does
    # not need it: NumPy aligns the units of both operands.
    t = np.datetime64(t)
    mask = self.t > t
    if not np.any(mask):
        return None
    return Trajectory(self.x[mask], self.y[mask], self.t[mask])
def during(self, t1, t2):
    """Return the part of the trajectory strictly between ``t1`` and ``t2``.

    :param t1: lower bound (exclusive), anything ``np.datetime64`` can parse
    :param t2: upper bound (exclusive), anything ``np.datetime64`` can parse
    :returns: a new Trajectory with the matching points, or None when no
        timestamp lies between the bounds
    """
    # np.datetime64 has no ``unit`` keyword (the unit is its optional
    # second positional argument); ``unit=self.unit`` was ignored or
    # rejected depending on the NumPy version. The comparisons below do
    # not need it: NumPy aligns the units of both operands.
    t1 = np.datetime64(t1)
    t2 = np.datetime64(t2)
    mask = (self.t > t1) & (self.t < t2)
    if not np.any(mask):
        return None
    return Trajectory(self.x[mask], self.y[mask], self.t[mask])
def test_intersection_sem_mock_do_test_2(self):
    """The first three times of the intersected trajectory must equal
    2000-02-01T00:01:00 expressed in seconds since the epoch."""
    corners = [(1, 1), (1, 3), (4, 3), (4, 1), (1, 1)]
    response = self.traj2.intersection_shapely(Polygon(corners))
    traj = self.traj2.to_Trajectory(response)

    epoch = np.datetime64("1970-01-01 00:00:00")
    moment = np.datetime64('2000-02-01T00:01:00')
    seconds = (moment - epoch) / np.timedelta64(1, 's')

    for position in range(3):
        assert (np.array_equal(traj.getTime()[position], seconds))
def test_startTime(self): dateTest = dt.datetime(year=2000, month=1, day=1, minute=1) dateTestNumPy = np.array(dateTest, dtype='datetime64[{}]'.format('s')) dateFalse = dt.datetime(year=2000, month=1, day=2, hour=1) self.assertTrue(np.array_equal(self.traj.begins(), dateTestNumPy)) self.assertNotIsInstance(self.traj.begins(), dt.datetime) self.assertNotEqual(self.traj.begins(), dateFalse) self.assertEqual(self.traj.begins(), dateTestNumPy)
def test_endTime(self): dateTest = dt.datetime(year=2000, month=4, day=1, minute=1) dateTestNumPy = np.array(dateTest, dtype='datetime64[{}]'.format('s')) dateFalse = dt.datetime(year=2000, month=3, day=2, hour=1) self.assertTrue(self.traj.ends(), dateTest) self.assertNotIsInstance(self.traj.ends(), dt.datetime) self.assertNotEqual(self.traj.ends(), dateFalse) self.assertEqual(self.traj.ends(), dateTestNumPy)
def test_datetime(self): a = np.array([0,0,0], dtype='datetime64[D]') b = np.array([2,1,0], dtype='datetime64[D]') idx = np.lexsort((b, a)) expected_idx = np.array([2, 1, 0]) assert_array_equal(idx, expected_idx) a = np.array([0,0,0], dtype='timedelta64[D]') b = np.array([2,1,0], dtype='timedelta64[D]') idx = np.lexsort((b, a)) expected_idx = np.array([2, 1, 0]) assert_array_equal(idx, expected_idx)
def test_string(self): self.assert_deprecated(np.datetime64, args=('2000-01-01T00+01',)) self.assert_deprecated(np.datetime64, args=('2000-01-01T00Z',))
def test_datetime(self):
    """A timezone-aware datetime is checked as deprecated input."""
    eastern = pytz.timezone('US/Eastern')
    aware = datetime.datetime(2000, 1, 1, 0, 0, tzinfo=eastern)
    self.assert_deprecated(np.datetime64, args=(aware,))
def test_datetime_dtype_creation(self): for unit in ['Y', 'M', 'W', 'D', 'h', 'm', 's', 'ms', 'us', 'ns', 'ps', 'fs', 'as']: dt1 = np.dtype('M8[750%s]' % unit) assert_(dt1 == np.dtype('datetime64[750%s]' % unit)) dt2 = np.dtype('m8[%s]' % unit) assert_(dt2 == np.dtype('timedelta64[%s]' % unit)) # Generic units shouldn't add [] to the end assert_equal(str(np.dtype("M8")), "datetime64") # Should be possible to specify the endianness assert_equal(np.dtype("=M8"), np.dtype("M8")) assert_equal(np.dtype("=M8[s]"), np.dtype("M8[s]")) assert_(np.dtype(">M8") == np.dtype("M8") or np.dtype("<M8") == np.dtype("M8")) assert_(np.dtype(">M8[D]") == np.dtype("M8[D]") or np.dtype("<M8[D]") == np.dtype("M8[D]")) assert_(np.dtype(">M8") != np.dtype("<M8")) assert_equal(np.dtype("=m8"), np.dtype("m8")) assert_equal(np.dtype("=m8[s]"), np.dtype("m8[s]")) assert_(np.dtype(">m8") == np.dtype("m8") or np.dtype("<m8") == np.dtype("m8")) assert_(np.dtype(">m8[D]") == np.dtype("m8[D]") or np.dtype("<m8[D]") == np.dtype("m8[D]")) assert_(np.dtype(">m8") != np.dtype("<m8")) # Check that the parser rejects bad datetime types assert_raises(TypeError, np.dtype, 'M8[badunit]') assert_raises(TypeError, np.dtype, 'm8[badunit]') assert_raises(TypeError, np.dtype, 'M8[YY]') assert_raises(TypeError, np.dtype, 'm8[YY]') assert_raises(TypeError, np.dtype, 'm4') assert_raises(TypeError, np.dtype, 'M7') assert_raises(TypeError, np.dtype, 'm7') assert_raises(TypeError, np.dtype, 'M16') assert_raises(TypeError, np.dtype, 'm16')
def test_datetime_scalar_construction_timezone(self):
    """Strings with an explicit timezone must parse and warn.

    Each parsed value is compared with the same instant written without
    a timezone.
    """
    # verify that supplying an explicit timezone works, but is deprecated
    with assert_warns(DeprecationWarning):
        assert_equal(np.datetime64('2000-01-01T00Z'),
                     np.datetime64('2000-01-01T00'))
    with assert_warns(DeprecationWarning):
        # 00:00 at offset -08 is compared with 08:00 without an offset
        assert_equal(np.datetime64('2000-01-01T00-08'),
                     np.datetime64('2000-01-01T08'))
def test_datetime_array_find_type(self): dt = np.datetime64('1970-01-01', 'M') arr = np.array([dt]) assert_equal(arr.dtype, np.dtype('M8[M]')) # at the moment, we don't automatically convert these to datetime64 dt = datetime.date(1970, 1, 1) arr = np.array([dt]) assert_equal(arr.dtype, np.dtype('O')) dt = datetime.datetime(1970, 1, 1, 12, 30, 40) arr = np.array([dt]) assert_equal(arr.dtype, np.dtype('O')) # find "supertype" for non-dates and dates b = np.bool_(True) dt = np.datetime64('1970-01-01', 'M') arr = np.array([b, dt]) assert_equal(arr.dtype, np.dtype('O')) dt = datetime.date(1970, 1, 1) arr = np.array([b, dt]) assert_equal(arr.dtype, np.dtype('O')) dt = datetime.datetime(1970, 1, 1, 12, 30, 40) arr = np.array([b, dt]) assert_equal(arr.dtype, np.dtype('O'))