我们从Python开源项目中，提取了以下48个代码示例，用于说明如何使用arrow.Arrow()。
def convert_timestamp(obj, default_tz='America/Chicago'):
    """Build a Bark timestamp (ISO 8601 string) from a time-like object.

    The object is first normalised with :meth:`timestamp_to_datetime`. When
    the resulting datetime carries no timezone, `default_tz` is supplied to
    ``arrow.get`` alongside it.

    Args:
        obj: time object; see :meth:`timestamp_to_datetime` for supported
            types
        default_tz (str): timezone to use if `obj` is timezone-naive; must be
            a string from the `tz database
            <https://en.wikipedia.org/wiki/List_of_tz_database_time_zones>`_.

    Returns:
        str: the result of ``isoformat()`` on the Arrow built from `obj`
    """
    dt = timestamp_to_datetime(obj)
    stamp = arrow.get(dt) if dt.tzinfo else arrow.get(dt, default_tz)
    return stamp.isoformat()
def test_process_basic_information_sanity_checks(device):
    """Sanity-check a processed device record sourced from jamf."""
    pprint.pprint(device)

    required_keys = (
        'model',
        'name',
        'platform',
        'serial',
        'source',
        'last_sync',
        'software',
    )
    for required in required_keys:
        assert required in device

    last_sync = device['last_sync']
    assert isinstance(last_sync, arrow.Arrow)
    assert device['source'] == 'jamf'
def local_datetime(utcdatetime, format=None, timezone=None):
    """
    Return local datetime based on the timezone

    It will automatically format the date.
    To not format the date, set format=False

    :param utcdatetime: Arrow or string
    :param format: key of a format in the DATETIME_FORMAT config, or False
        to get the converted Arrow object back unformatted
    :param timezone: string, ie: US/Eastern
    :return: None if `utcdatetime` is None, the converted Arrow object if
        `format` is False, otherwise the formatted string
    """
    if utcdatetime is None:
        return None

    timezone = timezone or config("DATETIME_TIMEZONE", "US/Eastern")

    if isinstance(utcdatetime, arrow.Arrow):
        dt = utcdatetime.to(timezone)
    else:
        # arrow.get(str, str) interprets the second string as a *parse
        # format*, not a timezone, so the value is parsed first and then
        # converted to the target timezone explicitly.
        dt = arrow.get(utcdatetime).to(timezone)

    if format is False:
        return dt

    formats = config("DATETIME_FORMAT")
    if not format:
        format = formats.get("default") or "MM/DD/YYYY"
    else:
        format = formats.get(format)
    return dt.format(format)
def test_is_weekday(self):
    """14-18 Aug 2017 (Mon-Fri) are weekdays; 19-20 Aug (Sat, Sun) are not."""
    expectations = (
        (14, True),   # Monday
        (15, True),   # Tuesday
        (16, True),   # Wednesday
        (17, True),   # Thursday
        (18, True),   # Friday
        (19, False),  # Saturday
        (20, False),  # Sunday
    )
    for day_of_month, is_weekday in expectations:
        weekday_index = arrow.Arrow(2017, 8, day_of_month).weekday()
        self.assertEqual(weekday_index < 5, is_weekday)
def test_create_fails_if_pending_account_has_same_email():
    """
    Test checks to see if an email is tied to a pending account creation entry
    in logincreate. If so, login.create() will not permit another account to be
    made for the same address.
    """
    # Seed a pending registration that already uses the e-mail address.
    pending_row = {
        "token": 40 * "a",
        "username": "existing",
        "login_name": "existing",
        "hashpass": login.passhash(raw_password),
        "email": email_addr,
        "birthday": arrow.Arrow(2000, 1, 1),
        "unixtime": arrow.now(),
    }
    logincreate = d.meta.tables["logincreate"]
    d.engine.execute(logincreate.insert(), pending_row)

    # A second sign-up with the same address must be rejected.
    birth_year = arrow.now().year - 19
    form = Bag(
        username="test",
        password='0123456789',
        passcheck='0123456789',
        email=email_addr,
        emailcheck=email_addr,
        day='12',
        month='12',
        year=birth_year,
    )
    with pytest.raises(WeasylError) as err:
        login.create(form)
    assert 'emailExists' == err.value.value
def timestamp_to_datetime(obj): """Converts an object to a `datetime.datetime` object. Note that because floating point values are approximate, some conversions may not be reversible. Args: obj: may be any of the following: 1. `datetime.datetime` object 2. Arrow object 3. ISO 8601 string 4. `time.struct_time` object 5. integer (epoch time) 6. float (epoch time) 7. 2-tuple of integers (seconds and microseconds, epoch time) Returns: datetime.datetime: (local) timezone-aware object Raises: TypeError: if `obj` cannot be interpreted according to the list above """ import numbers from time import mktime, struct_time if isinstance(obj, datetime): dt = obj elif isinstance(obj, arrow.Arrow): dt = obj.datetime elif isinstance(obj, str): dt = arrow.get(obj).datetime elif isinstance(obj, struct_time): dt = mktime(obj) elif isinstance(obj, numbers.Number): dt = datetime.fromtimestamp(obj) elif hasattr(obj, '__getitem__'): dt = datetime.fromtimestamp(obj[0]) dt += timedelta(microseconds=int(obj[1])) else: raise TypeError("unable to convert %s to timestamp" % obj) return dt
def _safe_gremlin_argument(expected_type, argument_value):
    """Return a Gremlin string representing the given argument value.

    The GraphQL `expected_type` selects the representation helper; a type
    none of the branches recognise raises AssertionError.
    """
    if GraphQLString.is_same_type(expected_type):
        return _safe_gremlin_string(argument_value)

    if GraphQLID.is_same_type(expected_type):
        # IDs can be strings or numbers, but the GraphQL library coerces them to strings.
        # We will follow suit and treat them as strings.
        if isinstance(argument_value, six.string_types):
            id_text = argument_value
        elif isinstance(argument_value, bytes):  # should only happen in py3
            id_text = argument_value.decode('utf-8')
        else:
            id_text = six.text_type(argument_value)
        return _safe_gremlin_string(id_text)

    if GraphQLFloat.is_same_type(expected_type):
        return represent_float_as_str(argument_value)

    if GraphQLInt.is_same_type(expected_type):
        # Special case: in Python, isinstance(True, int) returns True.
        # Safeguard against this with an explicit check against bool type.
        if isinstance(argument_value, bool):
            raise GraphQLInvalidArgumentError(u'Attempting to represent a non-int as an int: '
                                              u'{}'.format(argument_value))
        return type_check_and_str(int, argument_value)

    if GraphQLBoolean.is_same_type(expected_type):
        return type_check_and_str(bool, argument_value)

    if GraphQLDate.is_same_type(expected_type):
        allowed_types = (datetime.date,)
        return _safe_gremlin_date_and_datetime(expected_type, allowed_types, argument_value)

    if GraphQLDateTime.is_same_type(expected_type):
        allowed_types = (datetime.datetime, arrow.Arrow)
        return _safe_gremlin_date_and_datetime(expected_type, allowed_types, argument_value)

    if isinstance(expected_type, GraphQLList):
        return _safe_gremlin_list(expected_type.of_type, argument_value)

    raise AssertionError(u'Could not safely represent the requested GraphQL type: '
                         u'{} {}'.format(expected_type, argument_value))


######
# Public API
######
def _safe_match_argument(expected_type, argument_value):
    """Return a MATCH (SQL) string representing the given argument value.

    The GraphQL `expected_type` selects the representation helper; a type
    none of the branches recognise raises AssertionError.
    """
    if GraphQLString.is_same_type(expected_type):
        return _safe_match_string(argument_value)

    if GraphQLID.is_same_type(expected_type):
        # IDs can be strings or numbers, but the GraphQL library coerces them to strings.
        # We will follow suit and treat them as strings.
        if isinstance(argument_value, six.string_types):
            id_text = argument_value
        elif isinstance(argument_value, bytes):  # should only happen in py3
            id_text = argument_value.decode('utf-8')
        else:
            id_text = six.text_type(argument_value)
        return _safe_match_string(id_text)

    if GraphQLFloat.is_same_type(expected_type):
        return represent_float_as_str(argument_value)

    if GraphQLInt.is_same_type(expected_type):
        # Special case: in Python, isinstance(True, int) returns True.
        # Safeguard against this with an explicit check against bool type.
        if isinstance(argument_value, bool):
            raise GraphQLInvalidArgumentError(u'Attempting to represent a non-int as an int: '
                                              u'{}'.format(argument_value))
        return type_check_and_str(int, argument_value)

    if GraphQLBoolean.is_same_type(expected_type):
        return type_check_and_str(bool, argument_value)

    if GraphQLDate.is_same_type(expected_type):
        allowed_types = (datetime.date,)
        return _safe_match_date_and_datetime(expected_type, allowed_types, argument_value)

    if GraphQLDateTime.is_same_type(expected_type):
        allowed_types = (datetime.datetime, arrow.Arrow)
        return _safe_match_date_and_datetime(expected_type, allowed_types, argument_value)

    if isinstance(expected_type, GraphQLList):
        return _safe_match_list(expected_type.of_type, argument_value)

    raise AssertionError(u'Could not safely represent the requested GraphQL type: '
                         u'{} {}'.format(expected_type, argument_value))


######
# Public API
######
def _serialize_datetime(value):
    """Serialize a DateTime object to its proper ISO-8601 representation.

    Raises ValueError when `value` is neither a datetime nor an Arrow object.
    """
    accepted_types = (datetime, arrow.Arrow)
    if isinstance(value, accepted_types):
        return value.isoformat()
    raise ValueError(u'The received object was not a datetime: '
                     u'{} {}'.format(type(value), value))
def _parse_datetime_value(value):
    """Deserialize a DateTime object from its proper ISO-8601 representation."""
    normalized = value
    if normalized.endswith('Z'):
        # Arrow doesn't support the "Z" literal to denote UTC time.
        # Swap the trailing "Z" for an explicit UTC offset instead.
        normalized = normalized[:-1] + '+00:00'

    parsed = arrow.get(normalized, 'YYYY-MM-DDTHH:mm:ssZ')
    return parsed.datetime
def fresh_timestamp(self):
    """
    Get a fresh timestamp for the model.

    :return: the ``naive`` attribute of a freshly created Arrow object
    """
    current = arrow.get()
    return current.naive
def from_datetime(self, value):
    """
    Convert a date-like value to a naive datetime.

    Arrow instances are used directly; anything else is first passed
    through ``arrow.get``.

    :rtype: datetime.datetime
    """
    stamp = value if isinstance(value, arrow.Arrow) else arrow.get(value)
    return stamp.naive
def as_datetime(self, value):
    """
    Return a timestamp as an Arrow object built by ``arrow.get``.

    :rtype: arrow.Arrow
    """
    converted = arrow.get(value)
    return converted
def _format_date(self, date): """ Format a date or timestamp. :param date: The date or timestamp :type date: datetime.datetime or datetime.date or arrow.Arrow :rtype: str """ if date is None: return date format = self.get_date_format() if format == 'iso': if isinstance(date, basestring): return arrow.get(date).isoformat() return date.isoformat() else: if isinstance(date, arrow.Arrow): return date.format(format) elif isinstance(date, basestring): return arrow.get(date).format(format) return date.strftime(format)
def getDate():
    """Return the datetime for the start (00:00) of the current UTC day."""
    now = arrow.utcnow()
    # Rebuilding from year/month/day only drops the time-of-day part.
    midnight = arrow.Arrow(now.year, now.month, now.day)
    return midnight.datetime
def _json_serial(obj):
    """JSON serializer for objects not serializable by default json code.

    Arrow objects become ISO strings, bytes go through ``to_jsonb64``;
    anything else raises TypeError.
    """
    if isinstance(obj, Arrow):
        return obj.isoformat()
    if isinstance(obj, bytes):
        return to_jsonb64(obj)
    raise TypeError("Type %s not serializable" % type(obj))
def get_timeframe(data: Dict[str, Dict]) -> Tuple[arrow.Arrow, arrow.Arrow]:
    """
    Get the maximum timeframe for the given backtest data

    Every entry's ``'T'`` value is parsed with ``arrow.get`` and all
    comparisons are made on the parsed values, so the ordering inside a
    group and across groups is consistent. Groups without entries are
    skipped.

    :param data: dictionary with backtesting data
    :return: tuple containing min_date, max_date
    """
    min_date, max_date = None, None
    for values in data.values():
        dates = [arrow.get(entry['T']) for entry in values]
        if not dates:
            # Nothing to contribute; avoids indexing into an empty sequence.
            continue
        # min()/max() are enough here; a full sort is not needed.
        earliest, latest = min(dates), max(dates)
        if min_date is None or earliest < min_date:
            min_date = earliest
        if max_date is None or latest > max_date:
            max_date = latest
    # When no dates were found both values are still None and are handed to
    # arrow.get() exactly as before.
    return arrow.get(min_date), arrow.get(max_date)
def datetime_to_str(d):
    """
    Render `d` in UTC as an ISO string, using ``Z`` instead of ``+00:00``.

    :param arrow.Arrow d:
    :return str:
    """
    utc_offset = '+00:00'
    iso_text = d.to('utc').isoformat()
    if not iso_text.endswith(utc_offset):
        return iso_text
    return iso_text[:-len(utc_offset)] + 'Z'
def str_to_datetime(s):
    """
    Parse `s` with ``arrow.get``.

    :param str s:
    :return arrow.Arrow:
    """
    parsed = arrow.get(s)
    return parsed
def datetime_to_timestamp(d):
    """
    Return the ``float_timestamp`` attribute of `d`.

    :param arrow.Arrow d: A datetime object.
    :return float: An equivalent UNIX timestamp.
    """
    unix_seconds = d.float_timestamp
    return unix_seconds
def format_time(self, time: Union[str, Arrow]) -> str:
    """Format a moment with TIME_FORMAT.

    An Arrow value is formatted as-is. Any other value is handed to
    ``Arrow.to()`` on the current UTC time, and the converted "now" is
    formatted instead.
    """
    if isinstance(time, Arrow):
        moment = time
    else:
        moment = arrow.utcnow().to(time)
    return moment.format(TIME_FORMAT)
def create_row(song):
    """Build the list of display columns for one song.

    Missing title/artist/album fall back to 'Unknown' and a missing year to
    '0000'; the added date is humanized from its year/month/day.
    """
    unknown = 'Unknown'
    title = song.Title or unknown
    album_artist = song.AlbumArtist or unknown
    album = song.Album or unknown
    length = time_helper.seconds_to_string(song.Length)
    year = song.Year or '0000'

    added = song.Added
    added_text = arrow.Arrow(added.year, added.month, added.day).humanize()

    return [
        song.Path,
        title,
        album_artist,
        album,
        length,
        year,
        added_text,
        song.Played,
        song.Genre,
        str(song.Tracknumber),
    ]
def msgpack_encoder(value):
    """
    Encoder used by msgpack serializer when an object is unknown.

    Arrow objects are wrapped in a msgpack ``ExtType`` with code 1 holding
    the encoded ISO string; every other value is returned unchanged.
    """
    if not isinstance(value, arrow.Arrow):
        return value
    payload = value.isoformat().encode()
    return msgpack.ExtType(1, payload)
def msgpack_ext_decoder(code, data):
    """
    Decoder used by msgpack deserializer when an ext type is found.

    Ext code 1 is decoded to an Arrow object; any other code is handed back
    as a msgpack ``ExtType``.
    """
    if code != 1:
        return msgpack.ExtType(code, data)

    try:
        return arrow.get(data.decode())
    except arrow.parser.ParserError:
        # Fall back to an explicit strptime pattern when arrow.get cannot
        # parse the payload.
        return arrow.Arrow.strptime(data.decode(), '%Y%m%dT%H:%M:%S.%f')
def get_month_name(cls, month_index):
    """Return the ``%B`` (full month name) rendering of month `month_index`."""
    # Year and day are arbitrary; only the month matters for the output.
    first_of_month = arrow.Arrow(year=2000, month=month_index, day=1)
    return first_of_month.strftime("%B")
def get_month_number(cls, month_index):
    """Return the ``%m`` (zero-padded month number) rendering of `month_index`."""
    # Year and day are arbitrary; only the month matters for the output.
    first_of_month = arrow.Arrow(year=2000, month=month_index, day=1)
    return first_of_month.strftime("%m")
def tweets_for_month(self, year, month_number):
    """Yield the tweets whose timestamp falls within the given month.

    The range is half-open, ``[first of month, first of next month)``, so a
    tweet stamped exactly on a month boundary belongs to exactly one month
    rather than to none.

    :param year: calendar year
    :param month_number: month of the year (passed to ``arrow.Arrow``)
    :return: generator of ``self.tweet_from_row(row)`` results, ordered by
        twitter_id
    """
    start_date = arrow.Arrow(year, month_number, 1)
    end_date = start_date.shift(months=+1)

    query = """
        SELECT twitter_id, text, timestamp
        FROM tweets
        WHERE (timestamp >= ?) AND (timestamp < ?)
        ORDER BY twitter_id ASC
    """

    # NOTE(review): `.timestamp` is a property in older arrow releases and a
    # method in arrow 1.x -- confirm against the pinned arrow version.
    for row in self.query_many(query, start_date.timestamp, end_date.timestamp):
        yield self.tweet_from_row(row)
def transform(cls, data):
    """
    Compute InfluxDB query expression from data in transformation dictionary.
    Also compute date range from query parameters "from" and "to".

    Returns a dict with the rendered query under 'expression' and the
    computed range under 'time_begin' / 'time_end'.
    """
    # The query is built with PyInfluxQL (https://github.com/jjmalina/pyinfluxql)
    # rather than a hand-written 'SELECT * FROM {measurement}' string.
    measurement = data.measurement

    # Fix up "measurement" if starting with numeric value
    # TODO: Fix should go to pyinfluxql
    starts_with_number = is_number(measurement[0])
    if starts_with_number:
        measurement = '"{measurement}"'.format(measurement=measurement)

    # TODO: Use ".date_range" API method
    time_begin, time_end = compute_daterange(data.get('from'), data.get('to'))

    # Tag filtering is currently disabled: InfluxDBAdapter.get_tags(data) is
    # not applied, so no extra conditions are added.
    tag_filters = {}

    query = Query('*').from_(measurement)
    expression = query.where(time__gte=time_begin, time__lte=time_end, **tag_filters)

    return {
        'expression': str(expression),
        'time_begin': time_begin,
        'time_end': time_end,
    }
def stat(self, **kwargs):
    """Gets information about an object.

    Issues a HEAD request for the object and, when the response is OK,
    builds a ``LocationStat`` from the response headers. When the response
    is not OK nothing is returned (implicit ``None``).
    """
    url_endpoint, params, headers, _ = self._get_parameters(**kwargs)
    resp = self.get_requests_session().head(
        url_endpoint, params=params, headers=headers)

    if resp.ok:
        # parsedate() yields (year, month, day, hour, minute, second, wday,
        # yday, isdst). Only the first six fields map onto Arrow's
        # positional arguments; a seventh would land in the microsecond
        # slot.
        modified_fields = rfc822.parsedate(resp.headers["Last-Modified"])[:6]
        return location.LocationStat.from_keywords(
            session=self._session,
            location=self,
            size=resp.headers["x-goog-stored-content-length"],
            generation=resp.headers["x-goog-generation"],
            created=arrow.Arrow(*modified_fields).timestamp,
        )
def _coerce_timestamp(value):
    """Return `value` as a float timestamp.

    Arrow objects contribute their ``float_timestamp``; anything else is
    passed through ``float()``.
    """
    return value.float_timestamp if isinstance(value, arrow.Arrow) else float(value)
def validate(self, value, session=None):
    """Normalise `value` to an Arrow object.

    Floats and ints are treated as timestamps, Arrow instances are returned
    as given, and any other type raises ValueError. `session` is not used
    by this method.
    """
    if isinstance(value, (float, int)):
        return arrow.Arrow.fromtimestamp(value)
    if isinstance(value, arrow.Arrow):
        return value
    raise ValueError("Value must be timestamp or arrow.Arrow instance.")
def default(self, obj):
    """Fallback serializer: format Arrow objects, use a placeholder otherwise."""
    if isinstance(obj, arrow.Arrow):
        return obj.format('YYYY-MM-DD HH:mm:ss ZZ')

    # Every Python value is an instance of `object`, so this check always
    # succeeds and the base-class call below is never reached.
    if isinstance(obj, object):
        return 'cant_serialize_object'

    return json.JSONEncoder.default(self, obj)
def check_processed_device(device):
    """Assert that a processed landesk device record has the expected shape."""
    required_keys = ('model', 'name', 'serial', 'source', 'type', 'last_sync')
    for required in required_keys:
        assert required in device

    last_sync = device['last_sync']
    assert isinstance(last_sync, arrow.Arrow)
    assert device['serial'] == '0xDECAFBAD'
    assert device['source'] == 'landesk'
def json_serialize_datetime(obj):
    """Serialize a `datetime.datetime` or `arrow.Arrow` by converting to string in ISO format.

    Intended as the ``default=`` hook of ``json.dumps``; any other type
    raises TypeError.

    >>> import json
    >>> json.dumps(arrow.get("2015-05-16 10:37"), default=json_serialize_datetime)
    '"2015-05-16T10:37:00+00:00"'
    >>> json.dumps(datetime.datetime.utcfromtimestamp(1431772620), default=json_serialize_datetime)
    '"2015-05-16T10:37:00"'
    """
    if not isinstance(obj, (datetime.datetime, arrow.Arrow)):
        raise TypeError("{!r} is not JSON serializable".format(obj))

    # The separator is passed as a byte string on Python 2 and as text
    # otherwise.
    separator = b'T' if six.PY2 else 'T'
    return obj.isoformat(separator)
def parse_parameters_list(dict_list):
    """Collapse a list of typed parameter dicts into a ``{name: value}`` dict.

    Each item carries a ``"name"`` plus one typed value key. The first
    matching key, in the order ``boolValue``, ``intValue``, ``stringValue``,
    ``datetimeValue``, decides how the value is converted.

    >>> import arrow
    >>> returned = parse_parameters_list([
    ...     { "name": "accounts:is_2sv_enrolled", "boolValue": False },
    ...     { "name": "accounts:last_name", "stringValue": "Smith" },
    ...     { "name": "accounts:drive_used_quota_in_mb", "intValue": "0" },
    ...     { "name": "accounts:creation_time",
    ...       "datetimeValue": "2010-10-28T10:26:35.000Z" },
    ... ])
    >>> returned == {
    ...     "accounts:is_2sv_enrolled": False,
    ...     "accounts:last_name": "Smith",
    ...     "accounts:drive_used_quota_in_mb": 0,
    ...     "accounts:creation_time": arrow.Arrow(2010, 10, 28, 10, 26, 35),
    ... }
    True
    >>> parse_parameters_list([{ "name": "fakeValue", "otherType": False }])
    Traceback (most recent call last):
        ...
    ValueError: Failed to determine appropriate type for parameter 'fakeValue'
    """
    # (value key, converter) pairs, tried in priority order.
    converters = (
        ('boolValue', lambda raw: raw),
        ('intValue', int),
        ('stringValue', lambda raw: raw),
        ('datetimeValue', lambda raw: arrow.get(raw)),
    )

    parsed = {}
    for entry in dict_list:
        for value_key, convert in converters:
            if value_key in entry:
                parsed[entry['name']] = convert(entry[value_key])
                break
        else:
            raise ValueError("Failed to determine appropriate type for parameter "
                             "{!r}".format(entry['name']))
    return parsed
def validate_arrow(value):
    """Assert that `value` is an Arrow object or None, then return it unchanged."""
    is_acceptable = value is None or isinstance(value, arrow.Arrow)
    assert is_acceptable, "Column only accepts arrow types, not {}".format(type(value))
    return value
def python_value(self, value):
    """Return the value in the data base as an arrow object.

    The parent field converts the raw value first; datetimes, dates and
    strings are then wrapped with ``arrow.get``. Any other value is
    returned as the parent produced it.

    Returns:
        arrow.Arrow: An instance of arrow with the field filled in.
    """
    converted = super(ArrowDateTimeField, self).python_value(value)
    wrappable_types = (datetime.datetime, datetime.date, string_types)
    if not isinstance(converted, wrappable_types):
        return converted
    return arrow.get(converted)
def db_value(self, value):
    """Convert the Arrow instance to a datetime for saving in the db.

    Strings are parsed with ``arrow.get`` first; values that are neither
    strings nor Arrow objects go to the parent field untouched.
    """
    candidate = arrow.get(value) if isinstance(value, string_types) else value
    if isinstance(candidate, arrow.Arrow):
        candidate = candidate.datetime
    return super(ArrowDateTimeField, self).db_value(candidate)
def default(self, obj):
    """Serialize Arrow, datetime and date objects; defer everything else."""
    if isinstance(obj, arrow.Arrow):
        return obj.for_json()

    # datetime.datetime is tested before datetime.date because it is a
    # subclass of date and needs the longer pattern.
    if isinstance(obj, datetime.datetime):
        return obj.strftime('%Y-%m-%dT%H:%M:%SZ')
    if isinstance(obj, datetime.date):
        return obj.strftime('%Y-%m-%d')

    # Let the base class default method raise the TypeError
    return json.JSONEncoder.default(self, obj)
def default(self, o):
    """Serialize Arrow objects via ``for_json``; defer everything else to the parent."""
    if isinstance(o, arrow.Arrow):
        return o.for_json()
    # NOTE(review): super(self.__class__, self) resolves relative to the
    # runtime class, so a subclass that inherits this method would call
    # back into it -- confirm this encoder is never subclassed.
    parent = super(self.__class__, self)
    return parent.default(o)
def utc_now():
    """
    Return the value of ``arrow.utcnow()``.

    :return: Arrow
    """
    current = arrow.utcnow()
    return current
def get_action_time(time: str) -> arrow.Arrow:
    """Turn an action time string into an Arrow in the local timezone.

    The input is stripped and every occurrence of 'at' is removed. The
    literal "now" yields the current time; anything else is parsed with the
    'MMMM D, YYYY hh:mma' pattern.
    """
    cleaned = time.strip().replace('at', '')
    local_zone = tz.tzlocal()

    if cleaned != "now":
        return arrow.get(cleaned, 'MMMM D, YYYY hh:mma', tzinfo=local_zone)
    return arrow.get(arrow.now(), tzinfo=local_zone)
def test_is_between(self):
    """is_between() covers the window, including both of its end points."""
    # (window start, window end, (hour, minute) of "now" on 2017-08-14, expected)
    cases = (
        ((10, 0), (20, 0), (12, 0), True),
        ((0, 0), (24, 0), (12, 0), True),
        ((10, 0), (24, 0), (9, 0), False),
        ((10, 0), (20, 0), (10, 0), True),
        ((10, 0), (20, 0), (20, 0), True),
    )
    for window_start, window_end, (hour, minute), expected in cases:
        moment = arrow.Arrow(2017, 8, 14, hour, minute)
        result = ArrowUtil.is_between(window_start, window_end, now=moment)
        self.assertEqual(result, expected)
def test_get_curr_time_diff(self):
    """A 12:00 -> 13:00 span is reported as 60, or as 1 with base_hour=True."""
    noon = arrow.Arrow(2017, 8, 14, 12, 0)
    one_pm = arrow.Arrow(2017, 8, 14, 13, 0)

    default_diff = ArrowUtil.get_curr_time_diff(start=noon, stop=one_pm)
    self.assertEqual(default_diff, 60)

    hour_diff = ArrowUtil.get_curr_time_diff(start=noon, stop=one_pm, base_hour=True)
    self.assertEqual(hour_diff, 1)
def local_arrow(dt):
    """Return `dt` as an Arrow object in the current session's timezone."""
    session_tz = get_current_request().weasyl_session.timezone
    localized = session_tz.localtime(dt)
    return arrow.Arrow.fromdatetime(localized)
def convert_to_localtime(target):
    """Convert `target` to local time in the current session's timezone.

    Arrow objects are converted through their ``datetime``. Anything else
    is treated as a timestamp (``get_time()`` when None), truncated to an
    int and reduced by ``_UNIXTIME_OFFSET`` before conversion.
    """
    session_tz = get_current_request().weasyl_session.timezone

    if not isinstance(target, arrow.Arrow):
        raw_time = get_time() if target is None else target
        adjusted = int(raw_time) - _UNIXTIME_OFFSET
        return session_tz.localtime_from_timestamp(adjusted)

    return session_tz.localtime(target.datetime)
def age_in_years(birthdate):
    """
    Determines an age in years based off of the given arrow.Arrow birthdate
    and the current date.
    """
    today = arrow.now()
    years = today.year - birthdate.year
    # One year less while this year's birthday is still ahead.
    if (today.month, today.day) < (birthdate.month, birthdate.day):
        years -= 1
    return years
def iso8601(unixtime):
    """Return `unixtime` as an ISO 8601 UTC string ending in ``Z``.

    :param unixtime: an ``arrow.Arrow`` object, or a numeric timestamp that
        is shifted by ``_UNIXTIME_OFFSET`` before conversion
    :return: for Arrow input, ``YYYY-MM-DDTHH:MM:SSZ`` in UTC with no
        fractional seconds; for numeric input, the ``isoformat()`` of the
        UTC datetime followed by ``Z``
    """
    if isinstance(unixtime, arrow.Arrow):
        # Cutting isoformat() at the first '.' only removes the offset when
        # microseconds are present; without them the '+00:00' offset stayed
        # in front of the 'Z'. Converting to UTC and formatting explicitly
        # gives the same shape in every case.
        return unixtime.to('utc').strftime('%Y-%m-%dT%H:%M:%SZ')
    else:
        return datetime.datetime.utcfromtimestamp(unixtime - _UNIXTIME_OFFSET).isoformat() + 'Z'