我们从 Python 开源项目中，提取了以下 50 个代码示例，用于说明如何使用 tzlocal.get_localzone()。
def load_setting_export_timezone(logger, config_settings):
    """
    Resolve the timezone name used for dates in the generated audit report.

    The value stored at ``config_settings['export_options']['timezone']`` is
    used when it appears in ``pytz.all_timezones``; otherwise the zone
    returned by ``get_localzone()`` is used instead.

    :param logger: the logger
    :param config_settings: config settings loaded from config file
    :return: the chosen timezone converted to a string
    """
    try:
        configured = config_settings['export_options']['timezone']
        is_valid = configured is not None and configured in pytz.all_timezones
        if not is_valid:
            configured = get_localzone()
            logger.info('No valid timezone found in config file, defaulting to local timezone')
        return str(configured)
    except Exception as ex:
        # Any failure while reading the setting is logged and the local
        # zone is returned instead.
        log_critical_error(logger, ex, 'Exception parsing timezone from config file')
        return str(get_localzone())
def wait_until_stable(self):
    """
    Poll the deployment status until it succeeds or the attempts run out.

    Records the timezone-aware start time on ``self.its_run_start_time``,
    then checks ``self._show_current_status()`` up to 40 times, sleeping
    15 seconds before each check.

    Returns True as soon as a check succeeds, False after the last attempt.
    """
    self.its_run_start_time = datetime.now(tzlocal.get_localzone())

    for _attempt in range(40):
        time.sleep(15)
        if self._show_current_status():
            print("\nDeployment successful.\n")
            return True
        print("\nDeployment unready\n")

    print('Deployment failed...')
    # waiter = self.ecs.get_waiter('services_stable')
    # waiter.wait(cluster=self.clusterName, services=[self.serviceName])
    return False
def from_str(cls, time):
    '''
    Build a time period object from a string such as ``1h``, ``2m`` or ``today``.

    ``today`` sets ``hours`` to the current local hour.  ``<N>h`` sets
    ``hours`` to N; ``<N>m`` sets ``minutes`` to N with a floor of 5.
    Any other input returns a freshly constructed, unmodified ``cls()``.
    '''
    period = cls()

    if time == 'today':
        period.hours = int(dt.now(tzlocal.get_localzone()).hour)
        return period

    looks_like_period = time.lower().endswith(('h', 'm')) and time[:-1].isdigit()
    if not looks_like_period:
        return period

    unit = time[-1].lower()
    amount = int(time[:-1])
    if unit == 'h':
        period.time_str = 'Hour' if amount == 1 else 'Hours'
        period.hours = amount
    if unit == 'm':
        period.time_str = 'Minutes'
        # Minute periods shorter than five minutes are raised to five.
        period.minutes = max(amount, 5)
    return period
def __init__(self, test_id, status, message, timestamp=None):
    """
    Parametrized constructor for the TestProgress model

    The stored timestamp is always timezone-aware: a missing timestamp
    becomes the current instant in UTC, and a naive timestamp is treated
    as UTC.  An aware timestamp is stored unchanged.

    :param test_id: The value of the 'test_id' field of TestProgress model
    :type test_id: int
    :param status: The value of the 'status' field of TestProgress model
    :type status: TestStatus
    :param message: The value of the 'message' field of TestProgress model
    :type message: str
    :param timestamp: The value of the 'timestamp' field of TestProgress
        model (None by default)
    :type timestamp: datetime
    """
    self.test_id = test_id
    self.status = status
    if timestamp is None:
        # Ask for "now" directly in UTC.  Localizing a naive local "now"
        # with is_dst=None raises AmbiguousTimeError during the repeated
        # hour at the end of daylight saving time.
        timestamp = datetime.datetime.now(pytz.UTC)
    elif timestamp.tzinfo is None:
        timestamp = pytz.utc.localize(timestamp, is_dst=None)
    self.timestamp = timestamp
    self.message = message
def __init__(self, loop, base_url, username=None, password=None,
             verify_ssl=True, tz=None, logger=None):
    """
    Create the underlying RESTfm client and configure it.

    ``tz`` may be a timezone name, a tzinfo object, or None (use the
    local zone).  Basic auth is enabled only when both ``username`` and
    ``password`` are given.  ``logger`` falls back to ``mylogger``.
    """
    if logger is None:  # pragma: no coverage
        logger = mylogger

    self._client = RESTfm(loop, logger)
    self._client.verify_ssl = verify_ssl
    self._client.base_url = base_url
    self._closed = False

    if tz is None:  # pragma: no coverage
        resolved_tz = get_localzone()
    elif isinstance(tz, str):
        resolved_tz = timezone(tz)  # pragma: no coverage
    else:
        resolved_tz = tz
    self._client.timezone = resolved_tz

    has_credentials = username is not None and password is not None
    if has_credentials:
        self._client.basic_auth(username, password)  # pragma: no cover
def setUp(self):
    """Prepare a client in a non-local timezone plus sample date and UUID."""
    super(TestCase, self).setUp()

    # The client must use a zone different from the local one so that
    # conversions are exercised: Yakutsk, Russia by default, Vienna when
    # the machine itself is in Yakutsk.
    yakutsk = pytz.timezone('Asia/Yakutsk')
    if get_localzone() == yakutsk:
        self.other_timezone = pytz.timezone('Europe/Vienna')
    else:
        self.other_timezone = yakutsk

    self.client = Client(None, 'http://localhost').rest_client
    self.client.timezone = self.other_timezone

    utc_date = datetime.datetime(1986, 3, 6, 10, 28, 47, tzinfo=pytz.UTC)
    vienna_date = utc_date.astimezone(pytz.timezone('Europe/Vienna'))
    self.a_date = vienna_date.astimezone(get_localzone())
    self.a_uuid = uuid.uuid1()
def date_to_datetime(time_input, tz=None):
    """ Convert ISO 8601 and other date strings to datetime.datetime type.

    A parsed value without timezone information is interpreted as local
    time.  When ``tz`` is given, the result is converted to that zone.

    Args:
        time_input (string): The time input string (see formats above).
        tz (string): The time zone for the returned data.

    Returns:
        (datetime.datetime): Python datetime.datetime object, or None when
            the input could not be parsed.
    """
    dt = None
    if tz is not None:
        tz = timezone(tz)
    try:
        dt = parser.parse(time_input)
        if dt.tzinfo is None:
            # pytz zones must be attached with localize(); passing one to
            # replace(tzinfo=...) silently applies the zone's historical
            # LMT offset instead of the offset valid on that date.
            dt = timezone(get_localzone().zone).localize(dt)
        if tz is not None:
            dt = dt.astimezone(tz)
    except ValueError:
        # Deliberate best effort: unparseable input yields None.
        pass
    return dt
def parse_journal(data):
    """
    Parse systemd journal entries.

    We do this on the agent rather than the manager because:
    * It allows us to distribute this bit of load
    * It localizes the means of log acquisition entirely to the agent,
      the manager never has any idea that the journal is in use or what
      forwarding protocol is being used.

    The ``__REALTIME_TIMESTAMP`` value is localized to the local zone and
    reported as an ISO-formatted UTC string.
    """
    local_stamp = get_localzone().localize(data['__REALTIME_TIMESTAMP'], is_dst=None)
    utc_dt = local_stamp.astimezone(pytz.utc)
    entry = {
        'datetime': utc_dt.isoformat(),
        'severity': data['PRIORITY'],
        'facility': data['SYSLOG_FACILITY'],
        'source': data.get('SYSLOG_IDENTIFIER', "unknown"),
        'message': data['MESSAGE'],
    }
    return entry
def parse_date_local(date, milliseconds=True):
    """Parse a date with ``parse_date_utc`` and convert it to the local timezone.

    **Note that this function should not be used in time calculations.**
    **It is primarily intended for displaying dates and times to the user.**

    Args:
        date (str): A date string in either iso8601 or Epoch format.
        milliseconds (bool): If True, then epoch times are treated as
            millisecond values, otherwise they are evaluated as seconds.

    Returns:
        datetime: The parsed date time in local time.
    """
    utc_moment = parse_date_utc(date, milliseconds)
    return utc_moment.astimezone(get_localzone())
def refresh_client(self, from_dt=None, to_dt=None):
    """
    Refreshes the CalendarService endpoint, ensuring that the event data
    is up-to-date.

    If no 'from_dt' or 'to_dt' datetimes have been given, the range
    becomes this month (first day through last day).  The decoded JSON
    response is stored on ``self.response``.
    """
    today = datetime.today()
    # monthrange() returns (weekday of the 1st, number of days in month).
    # Only the day count is useful here; the month always starts on day 1.
    _, last_day = monthrange(today.year, today.month)
    if not from_dt:
        from_dt = datetime(today.year, today.month, 1)
    if not to_dt:
        to_dt = datetime(today.year, today.month, last_day)
    params = dict(self.params)
    params.update({
        'lang': 'en-us',
        'usertz': get_localzone().zone,
        'startDate': from_dt.strftime('%Y-%m-%d'),
        'endDate': to_dt.strftime('%Y-%m-%d')
    })
    req = self.session.get(self._calendar_refresh_url, params=params)
    self.response = req.json()
def __init__(self, project_name, cache_instance):
    """
    Initialize a ProjectStats class for the specified project.

    Reads the ``data_ts`` timestamp from the cache entry for the last
    cache date and stores it both raw and as a local-zone datetime.

    :param project_name: project name to calculate stats for
    :type project_name: str
    :param cache_instance: DataCache instance
    :type cache_instance: :py:class:`~.DiskDataCache`
    """
    logger.debug('Initializing ProjectStats for project: %s', project_name)
    self.project_name = project_name
    self.cache = cache_instance
    self.cache_data = {}
    self.cache_dates = self._get_cache_dates()

    newest_entry = self._cache_get(self.cache_dates[-1])
    self.as_of_timestamp = newest_entry['cache_metadata']['data_ts']

    as_of_utc = datetime.fromtimestamp(self.as_of_timestamp, utc)
    self.as_of_datetime = as_of_utc.astimezone(get_localzone())
def dispatch_history(self, request):
    """
    Return the change history of the vault named in the request as JSON.

    Items are read from the backend's change queue until it yields None.
    Each item contributes one entry holding its operation, user e-mail,
    creation time and bundle path.
    """
    vault = self.find_vault_by_id(request.match_info['id'])
    local_tz = get_localzone()
    queue = yield from vault.backend.changes(None, None, verbose=True)

    history = []
    item = yield from queue.get()
    while item is not None:
        store_hash, metadata, server_info = item
        bundle = VirtualBundle(None, vault, store_hash=store_hash)
        yield from bundle.write_encrypted_metadata(Once(metadata))

        # Decode in the same field order as before; the revision id is
        # decoded but not part of the response.
        rev_id = server_info['id'].decode(vault.config.encoding)
        created_at = server_info['created_at'].decode(vault.config.encoding)
        operation = server_info['operation'].decode(vault.config.encoding)
        user_email = server_info['email'].decode(vault.config.encoding)

        entry = {
            'operation': operation,
            'user_email': user_email,
            'created_at': created_at,
            'path': bundle.relpath
        }
        history.append(entry)
        item = yield from queue.get()

    return JSONResponse({'items': history})
def main(args):
    """
    Build the datetime described by the command line arguments and show it.

    ``args.stamp`` selects a UTC epoch timestamp instead of the current
    time, ``args.delta`` shifts the result by whole days, and
    ``args.localize`` names the zone to localize to (otherwise UTC for a
    timestamp, the local zone for "now").
    """
    if args.stamp is None:
        moment = datetime.datetime.now()
        zone = None
    else:
        moment = datetime.datetime.utcfromtimestamp(args.stamp)
        zone = pytz.utc

    if args.delta is not None:
        moment = moment + datetime.timedelta(days=args.delta)

    if args.localize:
        zone = pytz.timezone(args.localize)
    elif zone is None:
        zone = tzlocal.get_localzone()

    show(zone.localize(moment), show_zones=args.zones)
def __init__(self, weeks=0, days=0, hours=0, minutes=0, seconds=0, start_date=None, end_date=None,
             timezone=None):
    """
    Set up the interval, the timezone and the start/end dates.

    A zero-length interval is replaced by one second.  The timezone is
    taken from ``timezone``, else from ``start_date``, else from
    ``end_date``, else from the local zone.  Without a ``start_date`` the
    first run is one interval from now.
    """
    interval = timedelta(weeks=weeks, days=days, hours=hours, minutes=minutes, seconds=seconds)
    length = timedelta_seconds(interval)
    if length == 0:
        interval, length = timedelta(seconds=1), 1
    self.interval = interval
    self.interval_length = length

    if timezone:
        self.timezone = astimezone(timezone)
    else:
        # First tzinfo found on the supplied dates, start date first.
        inherited = next(
            (date.tzinfo for date in (start_date, end_date) if date and date.tzinfo),
            None)
        self.timezone = get_localzone() if inherited is None else inherited

    if not start_date:
        start_date = datetime.now(self.timezone) + self.interval
    self.start_date = convert_to_datetime(start_date, self.timezone, 'start_date')
    self.end_date = convert_to_datetime(end_date, self.timezone, 'end_date')
def timestamp_to_datetime(input_timestamp, tz=None):
    """
    Converts epoch timestamp into datetime object.

    The value is first interpreted as seconds.  If that is out of range
    for a datetime, it is interpreted as milliseconds instead.

    :param input_timestamp: Epoch timestamp in seconds or milliseconds.
    :type input_timestamp: int
    :param tz: String representation of timezone accepted by pytz.
        eg. 'Asia/Hong_Kong'. If param is unfilled, system timezone is used.
    :return: timezone aware datetime object
    """
    # int() works on Python 2 and 3 alike; the Python 2-only long() does not.
    input_timestamp = int(input_timestamp)
    if tz is None:
        tz = get_localzone()
    else:
        tz = timezone(tz)
    try:
        return datetime.fromtimestamp(input_timestamp, tz=tz)
    except (ValueError, OverflowError, OSError):
        # Too large to be seconds: depending on platform and magnitude the
        # failure surfaces as any of these three exceptions.
        return datetime.fromtimestamp(float(input_timestamp) / 1000, tz=tz)
def _get_session_tz(self):
    """
    Return the session timezone, falling back when its name is unknown.

    An empty TIMEZONE parameter means UTC.  If pytz does not know the
    name, the local zone is used when tzlocal is available, and UTC
    otherwise.
    """
    try:
        tz_name = self.get_parameter(u'TIMEZONE') or 'UTC'
        return pytz.timezone(tz_name)
    except pytz.exceptions.UnknownTimeZoneError:
        logger.warning('converting to tzinfo failed')

    if tzlocal is not None:
        return tzlocal.get_localzone()
    try:
        return datetime.timezone.utc
    except AttributeError:  # py2k
        return pytz.timezone('UTC')
def process_shakemap(shake_id=None):
    """
    Process a given shake_id for realtime shake.

    Notifies the realtime REST endpoint with the current local time, then
    calls ``process_event`` repeatedly until it returns a truthy value.
    Exceptions are logged and trigger another attempt.
    """
    LOGGER.info('Inotify received new shakemap')
    notify_realtime_rest(datetime.datetime.now(tz=get_localzone()))

    while True:
        try:
            finished = process_event(
                working_dir=working_dir,
                event_id=shake_id,
                locale=locale_option)
            if finished:
                break
        except Exception as e:  # pylint: disable=W0702
            LOGGER.info('Process event failed')
            LOGGER.exception(e)
            LOGGER.info('Retrying to process event')

    LOGGER.info('Shakemap %s handled' % (shake_id, ))
def convert_to_local_from_utc(utc_time):
    """Mark ``utc_time`` as UTC and return it converted to the local timezone."""
    local_zone = tzlocal.get_localzone()
    utc_aware = utc_time.replace(tzinfo=pytz.utc)
    return utc_aware.astimezone(local_zone)
def test_use_local_timezone_if_none_given(self):
    """Empty or missing config settings must yield the local timezone."""
    config_settings = [{}, None, '']
    for config_setting in config_settings:
        # Format the whole message at once.  Calling .format() on the tail
        # of a '+' chain only touches the last literal, which left '{0}'
        # unsubstituted in the failure message.
        message = '{0} should cause {1} to be returned'.format(
            str(config_setting), str(get_localzone()))
        self.assertEqual(exp.load_setting_export_timezone(logger, config_setting),
                         str(get_localzone()),
                         msg=message)
def iso8601_as_datetime(iso,
                        localize=False  # Default into local time zone
                        ):
    """
    Parse an ISO 8601 string into a datetime.

    When ``localize`` is true and the parsed value carries no timezone,
    it is localized to the local zone.  Raises ValueError for input that
    isodate rejects.
    """
    try:
        parsed = isodate.parse_datetime(iso)
    except isodate.isoerror.ISO8601Error:
        raise ValueError("Invalid ISO8601 date")

    needs_zone = localize and parsed.tzinfo is None
    if needs_zone:
        parsed = get_localzone().localize(parsed)
    return parsed

# TODO: This function exists in datetime as .isoformat()
def converter(self, timestamp):
    """
    Return ``timestamp`` as a UTC time tuple.

    The value is handed to ``arrow.get`` together with the local
    timezone, converted to UTC, and returned as a ``timetuple()``.
    """
    local_moment = arrow.get(timestamp, tz=get_localzone())
    utc_moment = local_moment.to('UTC')
    return utc_moment.datetime.timetuple()
def modtime():
    """
    Get the current local time as a string in iso format.

    Microseconds are dropped.  Python 2 goes through pytz and the local
    zone from tzlocal; Python 3 uses the standard library alone.
    """
    if six.PY2:
        local_tz = get_localzone()
        utc_now = datetime.utcnow().replace(tzinfo=pytz.utc)
        now = utc_now.astimezone(local_tz)
    else:
        now = datetime.now(timezone.utc).astimezone()
    return now.replace(microsecond=0).isoformat()
def TimeEnd(self, timeend):
    """
    Store the request end time as a timezone-aware datetime.

    None means "now" in UTC.  A naive datetime is assumed to be in the
    system local zone (a warning is logged).  Anything that is not a
    datetime raises TypeError.
    """
    if timeend is None:
        self._timeend = datetime.now(tz=pytz.utc)
        return

    if not isinstance(timeend, datetime):
        raise TypeError("req.TimeEnd must be a datetime.datetime object.")

    # Always use timezone-aware datetime.
    if timeend.tzinfo is None:
        _logger.warning('Naive HistDataReq.TimeEnd. '
                        'Assumeing system local time zone.')
        timeend = get_localzone().localize(timeend)
    self._timeend = timeend
def startScheduler():
    """
    Create the database, ensure the default roles exist, and start the
    background scheduler that runs ``notify.task`` periodically.

    The scheduler uses the local timezone when tzlocal can determine its
    name, and UTC otherwise.
    """
    db.create_all()
    # create default roles!
    if not db.session.query(models.Role).filter(models.Role.name == "admin").first():
        admin_role = models.Role(name='admin', description='Administrator Role')
        user_role = models.Role(name='user', description='User Role')
        db.session.add(admin_role)
        db.session.add(user_role)
        db.session.commit()
    try:
        import tzlocal
        tz = tzlocal.get_localzone()
        logger.info("local timezone: %s" % tz)
    except Exception:
        # Best effort: a missing tzlocal or an undetectable zone falls back
        # to UTC below.  A bare except would also swallow SystemExit and
        # KeyboardInterrupt.
        tz = None
    if not tz or tz.zone == "local":
        # The trailing space keeps the two literals from fusing into
        # "logmessages".
        logger.error('Local timezone name could not be determined. Scheduler will display times in UTC for any log '
                     'messages. To resolve this set up /etc/timezone with correct time zone name.')
        tz = pytz.utc
    # in debug mode this is executed twice :(
    # DONT run flask in auto reload mode when testing this!
    scheduler = BackgroundScheduler(logger=sched_logger, timezone=tz)
    scheduler.add_job(notify.task, 'interval', seconds=config.SCAN_INTERVAL, max_instances=1,
                      start_date=datetime.datetime.now(tz) + datetime.timedelta(seconds=2))
    scheduler.start()
    # NOTE(review): without a `global sched` declaration this only binds a
    # local name - confirm whether a module-level `sched` was intended.
    sched = scheduler
    # notify.task()
def get_next_day_cutoff_seconds(hour):
    """Return the next cutoff for a particular hour as seconds since epoch.

    The cutoff is the next local-time occurrence of ``hour``:00:00.  That
    is today when the hour is still ahead (at 1 am, a 4 am cutoff is today)
    and tomorrow when it has already passed.
    """
    from datetime import timedelta  # local import: only the datetime class is known to be in scope

    local_tz = tzlocal.get_localzone()
    now = datetime.now()
    cutoff = now.replace(minute=0, second=0, microsecond=0, hour=hour)
    # Returning today's cutoff unconditionally yields a time in the past
    # once the hour has gone by; adding a day unconditionally skips a
    # cutoff that is still ahead today.  Roll over only when needed.
    if cutoff <= now:
        cutoff += timedelta(days=1)
    dt_cutoff = local_tz.localize(cutoff)
    epoch = datetime(1970, 1, 1, tzinfo=pytz.utc)
    return (dt_cutoff - epoch).total_seconds()
def localDatetime( datetime_or_timestamp ):
    """
    Return the argument as a datetime in the local timezone.

    A plain int or float is first turned into a datetime by
    ``utcDatetime``; anything else is used as given.
    """
    is_timestamp = type(datetime_or_timestamp) in (int, float)
    dt = utcDatetime( datetime_or_timestamp ) if is_timestamp else datetime_or_timestamp
    return dt.astimezone( tzlocal.get_localzone() )
def localZone(self):
    """Replace the contents of the text widget with the local timezone."""
    from tzlocal import get_localzone
    output = self.oop.scr
    output.delete('1.0', tk.END)
    output.insert(tk.INSERT, get_localzone())

# Format local US time with TimeZone info
def localZone(self):
    """Insert the local timezone into the text widget at the insert position."""
    from tzlocal import get_localzone
    zone = get_localzone()
    self.scr.insert(tk.INSERT, zone)

# Format local US time with TimeZone info
def show_consumer_group():
    """
    Print the consumer group checkpoint status once per second.

    Runs until interrupted from the keyboard.  Cursor and update times
    are shown in the local timezone.
    """
    client, option, project, logstore, consumer_group = prepare_test()
    local_timezone = tzlocal.get_localzone()

    def as_local(epoch_seconds):
        return datetime.fromtimestamp(epoch_seconds, local_timezone)

    try:
        while True:
            ret = client.get_check_point_fixed(project, logstore, consumer_group)

            with lock:
                # ret.log_print()
                print("***** consumer group status*****")
                checkpoints = ret.consumer_group_check_poins
                if checkpoints:
                    print("***consumer\t\t\tshard\tcursor time\t\t\t\t\tupdate time")
                    for status in checkpoints:
                        update_time = as_local(status['updateTime']/1000000)
                        cursor_time = as_local(status.get('checkpoint_previous_cursor_time', 0))
                        print("{0}\t{1}\t\t{2}\t{3}".format(status["consumer"], status['shard'],
                                                            cursor_time, update_time))
                else:
                    print("[]")

            time.sleep(1)
    except KeyboardInterrupt:
        print("***** exit *****")
def localzone(cls):
    """
    Build an instance of ``cls`` from the local timezone.

    Raises UnknownTimeZone when tzlocal cannot determine the local zone.
    """
    try:
        local = tzlocal.get_localzone()
    except pytz.exceptions.UnknownTimeZoneError:
        raise UnknownTimeZone("Failed to guess local timezone")
    else:
        return cls.from_pytz(local)
def do_download_db(database_name=None, master_pwd=None, base_url=None,
                   backup_format='zip', filename=None, api='9.0',
                   hostname=None):
    """
    Download a database backup through the downloader matching ``api``.

    Missing ``base_url`` and ``filename`` values are derived from the
    database name and the current local time.  ``api`` values starting
    with '9.0', and the exact values '8.0' and 'ssh', are supported;
    anything else raises NotImplementedError.
    """
    if base_url is None:
        base_url = 'https://%s.odoo.apertoso.net' % database_name
    if base_url.endswith('/'):
        base_url = base_url.strip('/')

    if filename is None:
        ts = datetime.datetime.now(get_localzone()).strftime(
            '%Y%m%d-%H%M%S-%Z')
        filename = "%s_%s.%s" % (database_name, ts, backup_format)

    if api == 'ssh':
        server_name = hostname or \
            'openerp.production.{}.clients.apertoso.net'.format(database_name)
        return do_download_db_ssh(database_name=database_name,
                                  server=server_name,
                                  filename=filename)

    is_v9 = api.startswith('9.0')
    if not is_v9 and api != '8.0':
        raise NotImplementedError("No support for api %s" % api)

    # Both HTTP downloaders take the same keyword arguments.
    http_download = do_download_db_v9 if is_v9 else do_download_db_v8
    return http_download(
        database_name=database_name,
        master_pwd=master_pwd,
        base_url=base_url,
        backup_format=backup_format,
        filename=filename)
def get_display_timezone():
    '''
    Return the timezone selected by the ``ckan.display_timezone`` setting.

    The value ``server`` selects the machine's local timezone; a missing
    or empty setting means UTC; any other value is looked up with pytz.

    :rtype: timezone
    '''
    timezone_name = config.get('ckan.display_timezone') or 'utc'
    if timezone_name != 'server':
        return pytz.timezone(timezone_name)
    return tzlocal.get_localzone()
def test_server_timezone(self):
    """The display timezone must equal the machine's local timezone."""
    displayed = h.get_display_timezone()
    eq_(displayed, tzlocal.get_localzone())
def print_status(i, status):
    """
    Print one status on a single line.

    The creation time is parsed from ``status['created_at']`` as UTC and
    shown in the local timezone.  The photo URL is blank when the status
    has no photo.
    """
    created_utc = datetime.strptime(
        status['created_at'], "%a %b %d %H:%M:%S +0000 %Y").replace(tzinfo=pytz.utc)
    created_local = created_utc.astimezone(tzlocal.get_localzone())

    row = {
        u'index': i,
        u'created': created_local.strftime("%Y-%m-%d %H:%M:%S"),
        u'id': status['id'],
        u'screen_name': status['user']['screen_name'],
        u'user_id': status['user']['id'],
        u'text': status['text'],
        u'photo': status['photo']['largeurl'] if status.get('photo') else "",
    }
    print(u"[%(index)s]%(created)s(%(id)s)%(screen_name)s(%(user_id)s): %(text)s %(photo)s" % row)
def ask_user_input():
    """
    Interactively collect the restore parameters and ask for confirmation.

    Answers are stored in the module-level ``restoreparams`` dict
    ('mountpath', 'timepoint', 'sid').  The time point is localized to UTC
    or to the local zone, depending on the user's answer, and passed to
    ``restore.set_restore_target_time``.  If the user does not confirm that
    the system is isolated, or rejects the summary, ``exitvalue`` is set
    to 1 and the function returns early.
    """
    global restoreparams, exitvalue
    # Refuse to continue unless the user confirms the system is isolated.
    is_safe = ui.ask_yn("Is this system isolated with no access to production database storage")
    if is_safe != "Y":
        print "Exiting. Please execute this script in an isolated environment."
        exitvalue = 1
        return
    restoreparams['mountpath'] = ui.ask_directory("Directory where to mount clone:", False)
    restoreparams['timepoint'] = ui.ask_timestamp("Restore database to time point")
    # The entered timestamp is made timezone-aware in the zone the user
    # says it was given in.
    is_utc = ui.ask_yn("Was the timestamp in UTC (answer N for local time)")
    if is_utc == "Y":
        tz = pytz.utc
    else:
        tz = get_localzone()
    restoreparams['timepoint'] = tz.localize(restoreparams['timepoint'])
    restore.set_restore_target_time(restoreparams['timepoint'])
    restoreparams['sid'] = ui.ask_string("Target instance name:", 8, True)
    # Print a summary of the collected parameters for review.
    splitter = "######################################"
    print splitter
    print ""
    print "Database unique name: %s" % configname
    print "Oracle home: %s" % Configuration.get("oraclehome", "generic")
    print "Clone mount path: %s" % restoreparams['mountpath']
    print "Target instance SID: %s" % restoreparams['sid']
    print "Restore target time UTC: %s" % restoreparams['timepoint'].astimezone(pytz.utc)
    print "Restore target time local: %s" % restoreparams['timepoint'].astimezone(get_localzone())
    print "Restored from snapshot: %s" % restore.sourcesnapid
    # Final confirmation before the caller proceeds.
    print ""
    is_ok = ui.ask_yn("Are these parameters correct")
    if is_ok != "Y":
        print "Exiting. Please execute this script again."
        exitvalue = 1
        return
    print ""
    print splitter
def _set_parameters(self):
    """
    Load ``autorestore.cfg`` from the mount destination and publish its
    values as configuration substitutions.

    The restore target is ``self.targettime`` converted to the local zone
    when set, otherwise the 'lasttime' value from the config file.  The
    'cdb' substitution defaults to 'FALSE' when the config file has no
    'enable_pluggable_database' option.
    """
    dbconfig = SafeConfigParser()
    dbconfig.read(os.path.join(self._mountdest, 'autorestore.cfg'))
    self._dbparams['dbname'] = dbconfig.get('dbparams','db_name')
    if self.targettime is None:
        self._dbparams['restoretarget'] = datetime.strptime(dbconfig.get('dbparams','lasttime'), '%Y-%m-%d %H:%M:%S')
    else:
        self._dbparams['restoretarget'] = self.targettime.astimezone(get_localzone())
    self._dbparams['bctfile'] = dbconfig.get('dbparams','bctfile')
    Configuration.substitutions.update({
        'db_name': self._dbparams['dbname'],
        'db_compatible': dbconfig.get('dbparams','compatible'),
        'db_files': dbconfig.get('dbparams','db_files'),
        'db_undotbs': dbconfig.get('dbparams','undo_tablespace'),
        'db_block_size': dbconfig.get('dbparams','db_block_size'),
        # 'lastscn': dbconfig.get('dbparams','lastscn'),
        'lasttime': self._dbparams['restoretarget'].strftime('%Y-%m-%d %H:%M:%S'),
        'dbid': Configuration.get('dbid', self._configname),
        'instancenumber': Configuration.get('autorestoreinstancenumber', self._configname),
        'thread': Configuration.get('autorestorethread', self._configname),
        'backupfinishedtime': dbconfig.get('dbparams','backup-finished'),
        'bctfile': self._dbparams['bctfile'],
        'autorestoredestination': self._restoredest,
        'mountdestination': self._mountdest,
    })
    try:
        cdb = dbconfig.get('dbparams','enable_pluggable_database')
    except Exception:
        # The option is optional.  Catch Exception rather than using a
        # bare except so SystemExit and KeyboardInterrupt still propagate.
        cdb = 'FALSE'
    Configuration.substitutions.update({'cdb': cdb})
    self._initfile = os.path.join(self._restoredest, 'init.ora')
    Configuration.substitutions.update({
        'initora': self._initfile,
    })
def _ts_to_date(self, ts):
    """Convert the epoch timestamp ``ts`` to a datetime in the local timezone."""
    local_zone = tzlocal.get_localzone()
    return datetime.fromtimestamp(ts, local_zone)
def human_date_to_datetime(time_input, tz='UTC'):
    """ Convert human readable date (e.g. 30 days ago) to datetime.datetime using
        parsedatetime module.

    Examples:

    * August 25th, 2008
    * 25 Aug 2008
    * Aug 25 5pm
    * 5pm August 25
    * next saturday
    * tomorrow
    * next thursday at 4pm
    * at 4pm
    * eod
    * tomorrow eod
    * eod tuesday
    * eoy
    * eom
    * in 5 minutes
    * 5 minutes from now
    * 5 hours before now
    * 2 hours before noon
    * 2 days from tomorrow

    The input is parsed relative to the local timezone and the result is
    converted to ``tz``.

    Args:
        time_input (string): The time input string (see formats above).
        tz (string): The time zone for the returned data.

    Returns:
        (datetime.datetime): Python datetime.datetime object, or None when
            the parser reports a status of 0.
    """
    calendar = pdt.Calendar()
    local_tz = timezone(get_localzone().zone)
    parsed, status = calendar.parseDT(time_input, tzinfo=local_tz)
    converted = parsed.astimezone(timezone(tz))
    return None if status == 0 else converted
def due_date(self, due_date):
    """
    Sets or clears the due date.

    A falsy value removes any stored date.  A datetime is converted to
    the local timezone and stored as a date.  Any other truthy value
    raises TypeError.
    """
    if not due_date:
        if 'date' in self.__task_dict:
            del self.__task_dict['date']
        return

    if not isinstance(due_date, datetime):
        raise TypeError

    local_due = due_date.astimezone(get_localzone())
    self.__task_dict['date'] = local_due.date()
def get_event_detail(self, pguid, guid):
    """
    Fetches a single event's details by specifying a pguid
    (a calendar) and a guid (an event's ID).

    The decoded response is kept on ``self.response`` and its first
    'Event' entry is returned.
    """
    query = dict(self.params)
    query['lang'] = 'en-us'
    query['usertz'] = get_localzone().zone

    url = '%s/%s/%s' % (self._calendar_event_detail_url, pguid, guid)
    self.response = self.session.get(url, params=query).json()
    return self.response['Event'][0]
def _local_tz(self):
    """Return the timezone reported by ``get_localzone()``."""
    zone = get_localzone()
    return zone
async def time(self, timezone_code):
    """Return current time and time in the timezone listed.

    **Dependencies**: pip install pytz tzlocal

    Keyword arguments:
    timezone_code -- Code for timezone (upper-cased before lookup)
    """
    # Declared with ``async def`` because the body awaits the bot's reply;
    # ``await`` inside a plain ``def`` is a SyntaxError.
    local_time = datetime.now(get_localzone())
    converted_time = datetime.now(timezone(timezone_code.upper()))
    await self.bot.say("```Local time : {} \n\n {} : {}```"
                       .format(local_time, timezone_code, converted_time))
def kw_to_sqlalchemy(sqlalchemy_table_cls, kw):
    """
    Build a ``sqlalchemy_table_cls`` instance from a dict of column values.

    Each value is coerced to its column's Python type when it is not
    already an instance of it.  Strings are compared with 'true' for bool
    columns.  Datetime values for columns whose type has a truthy
    ``timezone`` attribute are localized to the local timezone.

    :param sqlalchemy_table_cls: class instantiated without arguments
    :param kw: mapping of column name to raw value
    :return: the populated instance
    """
    sqlalchemy_table = sqlalchemy_table_cls()
    for column, value in kw.iteritems():
        try:
            column_type = _find_type(sqlalchemy_table_cls, column)
            python_type = column_type.python_type
        except NameError:
            # presumably _find_type raises NameError for keys that are not
            # columns of the table; such keys are skipped - TODO confirm
            continue
        try:
            if not issubclass(type(value), python_type):
                if issubclass(python_type, bool):
                    column_value = True if value.lower() == 'true' else False
                else:
                    column_value = python_type(value)
            else:
                column_value = value
        except UnicodeEncodeError:
            # Coercion failed on non-ASCII text; keep it as unicode.
            column_value = unicode(value)
        finally:
            # Runs on every path, including after an exception other than
            # UnicodeEncodeError (column_value may be unbound in that case).
            if issubclass(python_type, datetime):
                if column_type.timezone:
                    # Convert to local time
                    tz = get_localzone()
                    column_value = tz.localize(column_value, is_dst=None)
            setattr(sqlalchemy_table, column, column_value)
    return sqlalchemy_table
def print_log(self, verbose=False):
    """
    Print the change log of every vault, one line per revision.

    Creation times are shown in the local timezone.  With ``verbose``
    the revision id and user e-mail are printed as well.  Vaults that
    have not been initialized are reported and skipped.
    """
    local_tz = get_localzone()

    for vault in self.vaults:
        try:
            yield from vault.backend.open()
        except VaultNotInitialized:
            logger.error('%s has not been initialized. Use "syncrypt init" to register the folder as vault.' % vault)
            continue

        queue = yield from vault.backend.changes(None, None, verbose=verbose)

        # The queue signals its end with a None item.
        item = yield from queue.get()
        while item is not None:
            store_hash, metadata, server_info = item
            bundle = VirtualBundle(None, vault, store_hash=store_hash)
            yield from bundle.write_encrypted_metadata(Once(metadata))

            rev_id = server_info['id'].decode(vault.config.encoding)
            created_local = iso8601.parse_date(
                server_info['created_at'].decode()).astimezone(local_tz)
            created_at = created_local.strftime('%x %X')
            operation = server_info['operation'].decode(vault.config.encoding)

            if verbose:
                user_email = server_info['email'].decode(vault.config.encoding)
                print("%s | %s | %s | %-9s %s" % (created_at, rev_id, user_email,
                                                  operation, bundle.relpath))
            else:
                print("%s | %-9s %s" % (created_at, operation, bundle.relpath))

            item = yield from queue.get()

    yield from self.wait()
def is_uploaded_after(self, package, when):
    """
    Tell whether ``package`` was uploaded at or after ``when``.

    Returns False when the package is not uploaded.  Otherwise the
    modification time of its first uploaded file, taken in the local
    timezone, is compared with ``when``.
    """
    if not self.is_uploaded(package):
        return False

    first_file = self.uploaded_files_for(package)[0]
    modified_at = datetime.datetime.fromtimestamp(
        os.path.getmtime(first_file), tzlocal.get_localzone())
    return modified_at >= when