Python tzlocal 模块,get_localzone() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用tzlocal.get_localzone()

项目:safetyculture-sdk-python    作者:SafetyCulture    | 项目源码 | 文件源码
def load_setting_export_timezone(logger, config_settings):
    """
    Resolve the timezone name used for dates in the generated audit report.

    The value at ``config_settings['export_options']['timezone']`` is used
    when it is listed in ``pytz.all_timezones``; in every other case
    (missing, ``None``, unknown name, or any error while reading the
    settings) the local timezone of this machine is used instead.

    :param logger:           the logger
    :param config_settings:  config settings loaded from config file
    :return:                 the chosen timezone as a string
    """
    try:
        configured = config_settings['export_options']['timezone']
        if configured is not None and configured in pytz.all_timezones:
            return str(configured)
        fallback = get_localzone()
        logger.info('No valid timezone found in config file, defaulting to local timezone')
        return str(fallback)
    except Exception as ex:
        # Any failure while reading the settings is logged and answered
        # with the local timezone rather than propagated.
        log_critical_error(logger, ex, 'Exception parsing timezone from config file')
        return str(get_localzone())
项目:deployfish    作者:caltechads    | 项目源码 | 文件源码
def wait_until_stable(self):
        """
        Wait until AWS reports the service as "stable".

        Stores the timezone-aware start time in ``self.its_run_start_time``
        and then checks ``self._show_current_status()`` every 15 seconds,
        at most 40 times.

        :return: True as soon as a status check succeeds, False when all
                 40 checks have failed
        """
        self.its_run_start_time = datetime.now(tzlocal.get_localzone())

        checks_left = 40
        while checks_left > 0:
            checks_left -= 1
            time.sleep(15)
            if self._show_current_status():
                print("\nDeployment successful.\n")
                return True
            print("\nDeployment unready\n")

        print('Deployment failed...')
        return False
项目:slackbot    作者:Parsely    | 项目源码 | 文件源码
def from_str(cls, time):
        ''' Build a time period object from a string such as "1h", "30m" or "today".

        "today" sets ``hours`` to the current local hour. "<n>h" sets
        ``hours`` and ``time_str``; "<n>m" sets ``minutes`` (never less
        than 5) and ``time_str``. Any other input yields None.
        '''
        period = cls()
        if time == 'today':
            period.hours = int(dt.now(tzlocal.get_localzone()).hour)
            return period

        lowered = time.lower()
        amount_text = time[:-1]
        if not (lowered.endswith(('h', 'm')) and amount_text.isdigit()):
            return None

        unit = time[-1].lower()
        amount = int(amount_text)
        if unit == 'h':
            period.time_str = 'Hours' if amount != 1 else 'Hour'
            period.hours = amount
        elif unit == 'm':
            period.time_str = 'Minutes'
            # Minute periods are clamped to a minimum of five minutes.
            period.minutes = max(amount, 5)
        return period
项目:sample-platform    作者:CCExtractor    | 项目源码 | 文件源码
def __init__(self, test_id, status, message, timestamp=None):
        """
        Parametrized constructor for the TestProgress model

        :param test_id: The value of the 'test_id' field of TestProgress model
        :type test_id: int
        :param status: The value of the 'status' field of TestProgress model
        :type status: TestStatus
        :param message: The value of the 'message' field of TestProgress model
        :type message: str
        :param timestamp: The value of the 'timestamp' field of TestProgress
         model (None by default, meaning "now"). A naive value is treated
         as UTC.
        :type timestamp: datetime
        """
        self.test_id = test_id
        self.status = status
        if timestamp is None:
            # Take the current instant directly in UTC. Localizing a naive
            # local "now" with is_dst=None raises AmbiguousTimeError /
            # NonExistentTimeError around DST transitions.
            timestamp = datetime.datetime.now(pytz.UTC)
        elif timestamp.tzinfo is None:
            timestamp = pytz.utc.localize(timestamp, is_dst=None)
        self.timestamp = timestamp
        self.message = message
项目:py-restfmclient    作者:pcdummy    | 项目源码 | 文件源码
def __init__(self, loop, base_url,
                 username=None, password=None, verify_ssl=True,
                 tz=None, logger=None):
        """
        Create the underlying RESTfm client and configure it.

        ``tz`` may be a timezone name (converted with ``timezone()``), a
        timezone object (used as given) or None (the local zone is used).
        Basic auth is configured only when both ``username`` and
        ``password`` are given.
        """
        if logger is None:  # pragma: no coverage
            logger = mylogger

        self._client = rest = RESTfm(loop, logger)
        rest.verify_ssl = verify_ssl
        rest.base_url = base_url

        self._closed = False

        if tz is None:  # pragma: no coverage
            rest.timezone = get_localzone()
        elif isinstance(tz, str):
            rest.timezone = timezone(tz)  # pragma: no coverage
        else:
            rest.timezone = tz

        credentials_given = username is not None and password is not None
        if credentials_given:
            rest.basic_auth(username, password)  # pragma: no cover
项目:py-restfmclient    作者:pcdummy    | 项目源码 | 文件源码
def setUp(self):
        """
        Prepare a client whose timezone differs from the local one, plus a
        fixed date (expressed in the local zone) and a UUID for the tests.
        """
        super(TestCase, self).setUp()

        yakutsk = pytz.timezone('Asia/Yakutsk')
        vienna = pytz.timezone('Europe/Vienna')

        # Yakutsk, Russia is used to check conversions against the local
        # zone; fall back to Vienna when the machine itself is in Yakutsk.
        self.other_timezone = vienna if get_localzone() == yakutsk else yakutsk

        self.client = Client(None, 'http://localhost').rest_client
        self.client.timezone = self.other_timezone

        utc_moment = datetime.datetime(1986, 3, 6, 10, 28, 47, tzinfo=pytz.UTC)
        self.a_date = utc_moment.astimezone(vienna).astimezone(get_localzone())

        self.a_uuid = uuid.uuid1()
项目:tcex    作者:ThreatConnect-Inc    | 项目源码 | 文件源码
def date_to_datetime(time_input, tz=None):
        """ Convert ISO 8601 and other date strings to datetime.datetime type.

        A parsed value without timezone information is interpreted as
        local time.

        Args:
            time_input (string): The time input string.
            tz (string): The time zone for the returned data.

        Returns:
            (datetime.datetime): Python datetime.datetime object, or None
                when parsing raises ValueError.
        """
        dt = None
        if tz is not None:
            tz = timezone(tz)
        try:
            dt = parser.parse(time_input)
            if dt.tzinfo is None:
                # pytz zones must be attached with localize(); passing them
                # to replace(tzinfo=...) applies the zone's LMT offset
                # instead of the offset valid at that date.
                dt = timezone(get_localzone().zone).localize(dt)
            if tz is not None:
                dt = dt.astimezone(tz)
        except ValueError:
            # Unparseable input is reported through the return value.
            pass

        return dt
项目:intel-manager-for-lustre    作者:intel-hpdd    | 项目源码 | 文件源码
def parse_journal(data):
    """
    Parse a systemd journal entry into a plain dict.

    We do this on the agent rather than the manager because:
     * It allows us to distribute this bit of load
     * It localizes the means of log acquisition entirely to the agent,
       the manager never has any idea that the journal is in use or what
       forwarding protocol is being used.

    The entry's ``__REALTIME_TIMESTAMP`` is localized to the local zone
    and converted to UTC before being rendered in ISO format.
    """
    local_stamp = get_localzone().localize(data['__REALTIME_TIMESTAMP'], is_dst=None)
    utc_stamp = local_stamp.astimezone(pytz.utc)

    entry = {
        'datetime': datetime.datetime.isoformat(utc_stamp),
        'severity': data['PRIORITY'],
        'facility': data['SYSLOG_FACILITY'],
        'source': data.get('SYSLOG_IDENTIFIER', "unknown"),
        'message': data['MESSAGE']
    }
    return entry
项目:scriptabit    作者:DC23    | 项目源码 | 文件源码
def parse_date_local(date, milliseconds=True):
    """Parse a date with ``parse_date_utc`` and convert it to the local timezone.

    **Note that this function should not be used in time calculations.**
    **It is primarily intended for displaying dates and times to the user.**

    Args:
        date (str): A date string in either iso8601 or Epoch format.
        milliseconds (bool): If True, then epoch times are treated as
            millisecond values, otherwise they are evaluated as seconds.

    Returns:
        datetime: The parsed date time in local time.
    """
    utc_value = parse_date_utc(date, milliseconds)
    local_zone = get_localzone()
    return utc_value.astimezone(local_zone)
项目:Habitica-todo    作者:eringiglio    | 项目源码 | 文件源码
def parse_date_local(date, milliseconds=True):
    """Parse a date with ``parse_date_utc`` and convert it to the local timezone.

    **Note that this function should not be used in time calculations.**
    **It is primarily intended for displaying dates and times to the user.**

    Args:
        date (str): A date string in either iso8601 or Epoch format.
        milliseconds (bool): If True, then epoch times are treated as
            millisecond values, otherwise they are evaluated as seconds.

    Returns:
        datetime: The parsed date time in local time.
    """
    utc_value = parse_date_utc(date, milliseconds)
    local_zone = get_localzone()
    return utc_value.astimezone(local_zone)
项目:alexa-apple-calendar    作者:zanderxyz    | 项目源码 | 文件源码
def refresh_client(self, from_dt=None, to_dt=None):
        """
        Refreshes the CalendarService endpoint, ensuring that the
        event data is up-to-date. If no 'from_dt' or 'to_dt' datetimes
        have been given, the range becomes this month (first to last day).

        The JSON reply is stored in ``self.response``.
        """
        today = datetime.today()
        if not from_dt:
            # The month always starts on day 1.
            from_dt = datetime(today.year, today.month, 1)
        if not to_dt:
            # monthrange() returns (weekday of the 1st, number of days in
            # the month); only the second value is a day of the month.
            _, days_in_month = monthrange(today.year, today.month)
            to_dt = datetime(today.year, today.month, days_in_month)
        params = dict(self.params)
        params.update({
            'lang': 'en-us',
            'usertz': get_localzone().zone,
            'startDate': from_dt.strftime('%Y-%m-%d'),
            'endDate': to_dt.strftime('%Y-%m-%d')
        })
        req = self.session.get(self._calendar_refresh_url, params=params)
        self.response = req.json()
项目:pypi-download-stats    作者:jantman    | 项目源码 | 文件源码
def __init__(self, project_name, cache_instance):
        """
        Initialize a ProjectStats class for the specified project.

        Reads the ``data_ts`` timestamp from the cache metadata of the last
        entry in the cache dates and stores it both raw and as a datetime
        in the local timezone.

        :param project_name: project name to calculate stats for
        :type project_name: str
        :param cache_instance: DataCache instance
        :type cache_instance: :py:class:`~.DiskDataCache`
        """
        logger.debug('Initializing ProjectStats for project: %s', project_name)
        self.project_name = project_name
        self.cache = cache_instance
        self.cache_data = {}
        self.cache_dates = self._get_cache_dates()

        newest_entry = self._cache_get(self.cache_dates[-1])
        self.as_of_timestamp = newest_entry['cache_metadata']['data_ts']

        utc_moment = datetime.fromtimestamp(self.as_of_timestamp, utc)
        self.as_of_datetime = utc_moment.astimezone(get_localzone())
项目:client    作者:syncrypt    | 项目源码 | 文件源码
def dispatch_history(self, request):
        """
        Return a JSON response listing the changes of the vault named by
        the ``id`` entry of ``request.match_info``.

        Items are read from the backend change queue until a ``None`` item
        is received; each contributes its operation, user email, creation
        time and bundle path to the ``items`` list of the response.
        """
        vault_id = request.match_info['id']
        vault = self.find_vault_by_id(vault_id)
        # NOTE(review): an unused get_localzone() lookup was removed here;
        # 'created_at' is passed through as received - confirm that no
        # conversion to local time was intended.
        queue = yield from vault.backend.changes(None, None, verbose=True)
        log_items = []
        while True:
            item = yield from queue.get()
            if item is None:
                # A None item marks the end of the change stream.
                break
            store_hash, metadata, server_info = item
            bundle = VirtualBundle(None, vault, store_hash=store_hash)
            yield from bundle.write_encrypted_metadata(Once(metadata))
            # NOTE(review): rev_id is decoded but not used below - confirm
            # whether it should be part of the log item.
            rev_id = server_info['id'].decode(vault.config.encoding)
            created_at = server_info['created_at'].decode(vault.config.encoding)
            operation = server_info['operation'].decode(vault.config.encoding)
            user_email = server_info['email'].decode(vault.config.encoding)
            log_items.append({
                'operation': operation,
                'user_email': user_email,
                'created_at': created_at,
                'path': bundle.relpath
            })
        return JSONResponse({'items': log_items})
项目:cligraphy    作者:Netflix-Skunkworks    | 项目源码 | 文件源码
def main(args):
    """
    Build a datetime from ``args`` and pass it, localized, to ``show``.

    The base time is ``args.stamp`` (as a UTC timestamp) or the current
    time, shifted by ``args.delta`` days when given. It is localized with
    ``args.localize`` if set, otherwise with UTC (when a stamp was given)
    or the local zone.
    """
    if args.stamp is None:
        dt = datetime.datetime.now()
        tz = None
    else:
        dt = datetime.datetime.utcfromtimestamp(args.stamp)
        tz = pytz.utc

    if args.delta is not None:
        dt += datetime.timedelta(days=args.delta)

    if args.localize:
        tz = pytz.timezone(args.localize)
    elif tz is None:
        tz = tzlocal.get_localzone()

    localized = tz.localize(dt)
    show(localized, show_zones=args.zones)
项目:metrics    作者:Jeremy-Friedman    | 项目源码 | 文件源码
def __init__(self, weeks=0, days=0, hours=0, minutes=0, seconds=0, start_date=None,
                 end_date=None, timezone=None):
        """
        Set up the interval, timezone and start/end dates.

        A zero-length interval is replaced by one second. The timezone is
        taken from ``timezone``, else from the tzinfo of ``start_date``,
        else from the tzinfo of ``end_date``, else from the local zone.
        Without a ``start_date`` the start is "now" plus one interval.
        """
        self.interval = timedelta(weeks=weeks, days=days, hours=hours,
                                  minutes=minutes, seconds=seconds)
        self.interval_length = timedelta_seconds(self.interval)
        if self.interval_length == 0:
            self.interval = timedelta(seconds=1)
            self.interval_length = 1

        if timezone:
            chosen_tz = astimezone(timezone)
        else:
            chosen_tz = None
            # First date that carries tzinfo wins, start before end.
            for candidate in (start_date, end_date):
                if candidate and candidate.tzinfo:
                    chosen_tz = candidate.tzinfo
                    break
            if chosen_tz is None:
                chosen_tz = get_localzone()
        self.timezone = chosen_tz

        if not start_date:
            start_date = datetime.now(self.timezone) + self.interval
        self.start_date = convert_to_datetime(start_date, self.timezone, 'start_date')
        self.end_date = convert_to_datetime(end_date, self.timezone, 'end_date')
项目:metrics    作者:Jeremy-Friedman    | 项目源码 | 文件源码
def __init__(self, weeks=0, days=0, hours=0, minutes=0, seconds=0, start_date=None,
                 end_date=None, timezone=None):
        """
        Set up the interval, timezone and start/end dates.

        A zero-length interval is replaced by one second. The timezone is
        taken from ``timezone``, else from the tzinfo of ``start_date``,
        else from the tzinfo of ``end_date``, else from the local zone.
        Without a ``start_date`` the start is "now" plus one interval.
        """
        self.interval = timedelta(weeks=weeks, days=days, hours=hours,
                                  minutes=minutes, seconds=seconds)
        self.interval_length = timedelta_seconds(self.interval)
        if self.interval_length == 0:
            self.interval = timedelta(seconds=1)
            self.interval_length = 1

        if timezone:
            chosen_tz = astimezone(timezone)
        else:
            chosen_tz = None
            # First date that carries tzinfo wins, start before end.
            for candidate in (start_date, end_date):
                if candidate and candidate.tzinfo:
                    chosen_tz = candidate.tzinfo
                    break
            if chosen_tz is None:
                chosen_tz = get_localzone()
        self.timezone = chosen_tz

        if not start_date:
            start_date = datetime.now(self.timezone) + self.interval
        self.start_date = convert_to_datetime(start_date, self.timezone, 'start_date')
        self.end_date = convert_to_datetime(end_date, self.timezone, 'end_date')
项目:gwrappy    作者:danielpoonwj    | 项目源码 | 文件源码
def timestamp_to_datetime(input_timestamp, tz=None):
    """
    Converts epoch timestamp into datetime object.

    The value is first tried as seconds; if that is out of range it is
    retried as milliseconds and then as microseconds.

    :param input_timestamp: Epoch timestamp. Second, millisecond or microsecond inputs accepted.
    :type input_timestamp: long
    :param tz: String representation of timezone accepted by pytz. eg. 'Asia/Hong_Kong'. If param is unfilled, system timezone is used.
    :return: timezone aware datetime object
    :raises ValueError, OverflowError, OSError: when the value is out of range at every granularity
    """
    # int() promotes to long automatically on Python 2 and is the only
    # integer type on Python 3.
    input_timestamp = int(input_timestamp)

    tz = get_localzone() if tz is None else timezone(tz)

    last_error = None
    for divisor in (1, 1000, 1000000):
        if divisor == 1:
            candidate = input_timestamp
        else:
            candidate = float(input_timestamp) / divisor
        try:
            return datetime.fromtimestamp(candidate, tz=tz)
        except (ValueError, OverflowError, OSError) as exc:
            # Out of range at this granularity; try the next finer one.
            last_error = exc

    raise last_error
项目:snowflake-connector-python    作者:snowflakedb    | 项目源码 | 文件源码
def _get_session_tz(self):
        """ Return the tzinfo for the session's TIMEZONE parameter.

        An empty parameter means UTC. When the name is unknown to pytz a
        warning is logged and the local zone is returned if tzlocal is
        available, otherwise UTC.
        """
        try:
            zone_name = self.get_parameter(u'TIMEZONE') or 'UTC'
            return pytz.timezone(zone_name)
        except pytz.exceptions.UnknownTimeZoneError:
            logger.warning('converting to tzinfo failed')
            if tzlocal is None:
                try:
                    return datetime.timezone.utc
                except AttributeError:  # py2k
                    return pytz.timezone('UTC')
            return tzlocal.get_localzone()
项目:inasafe-realtime    作者:inasafe    | 项目源码 | 文件源码
def process_shakemap(shake_id=None):
        """Process a given shake_id for realtime shake.

        Notifies the realtime REST endpoint with the current local time,
        then calls ``process_event`` repeatedly until it returns a truthy
        value; exceptions are logged and the call is retried.
        """
        LOGGER.info('Inotify received new shakemap')
        notify_realtime_rest(datetime.datetime.now(tz=get_localzone()))
        while True:
            try:
                handled = process_event(
                    working_dir=working_dir,
                    event_id=shake_id,
                    locale=locale_option)
                if handled:
                    break
            except Exception as e:  # pylint: disable=W0702
                LOGGER.info('Process event failed')
                LOGGER.exception(e)
                LOGGER.info('Retrying to process event')
        LOGGER.info('Shakemap %s handled' % (shake_id, ))
项目:Alfred    作者:jkachhadia    | 项目源码 | 文件源码
def __init__(self, weeks=0, days=0, hours=0, minutes=0, seconds=0, start_date=None,
                 end_date=None, timezone=None):
        """
        Set up the interval, timezone and start/end dates.

        A zero-length interval is replaced by one second. The timezone is
        taken from ``timezone``, else from the tzinfo of ``start_date``,
        else from the tzinfo of ``end_date``, else from the local zone.
        Without a ``start_date`` the start is "now" plus one interval.
        """
        self.interval = timedelta(weeks=weeks, days=days, hours=hours,
                                  minutes=minutes, seconds=seconds)
        self.interval_length = timedelta_seconds(self.interval)
        if self.interval_length == 0:
            self.interval = timedelta(seconds=1)
            self.interval_length = 1

        if timezone:
            chosen_tz = astimezone(timezone)
        else:
            chosen_tz = None
            # First date that carries tzinfo wins, start before end.
            for candidate in (start_date, end_date):
                if candidate and candidate.tzinfo:
                    chosen_tz = candidate.tzinfo
                    break
            if chosen_tz is None:
                chosen_tz = get_localzone()
        self.timezone = chosen_tz

        if not start_date:
            start_date = datetime.now(self.timezone) + self.interval
        self.start_date = convert_to_datetime(start_date, self.timezone, 'start_date')
        self.end_date = convert_to_datetime(end_date, self.timezone, 'end_date')
项目:threatconnect-developer-docs    作者:ThreatConnect-Inc    | 项目源码 | 文件源码
def date_to_datetime(time_input, tz=None):
        """ Convert ISO 8601 and other date strings to datetime.datetime type.

        A parsed value without timezone information is interpreted as
        local time.

        Args:
            time_input (string): The time input string.
            tz (string): The time zone for the returned data.

        Returns:
            (datetime.datetime): Python datetime.datetime object, or None
                when parsing raises ValueError.
        """
        dt = None
        if tz is not None:
            tz = timezone(tz)
        try:
            dt = parser.parse(time_input)
            if dt.tzinfo is None:
                # pytz zones must be attached with localize(); passing them
                # to replace(tzinfo=...) applies the zone's LMT offset
                # instead of the offset valid at that date.
                dt = timezone(get_localzone().zone).localize(dt)
            if tz is not None:
                dt = dt.astimezone(tz)
        except ValueError:
            # Unparseable input is reported through the return value.
            pass

        return dt
项目:ELO-Darts    作者:pwgraham91    | 项目源码 | 文件源码
def convert_to_local_from_utc(utc_time):
    """Tag ``utc_time`` as UTC and return it converted to the local timezone."""
    target_zone = tzlocal.get_localzone()
    aware_utc = utc_time.replace(tzinfo=pytz.utc)
    return aware_utc.astimezone(target_zone)
项目:safetyculture-sdk-python    作者:SafetyCulture    | 项目源码 | 文件源码
def test_use_local_timezone_if_none_given(self):
        """Settings without a usable timezone must yield the local timezone name."""
        for config_setting in ({}, None, ''):
            expected = str(get_localzone())
            # Format the whole message at once; calling .format() on the
            # last concatenated literal alone left '{0}' unsubstituted.
            message = '{0} should cause {1} to be returned'.format(
                str(config_setting), expected)
            self.assertEqual(exp.load_setting_export_timezone(logger, config_setting), expected, msg=message)
项目:pscheduler    作者:perfsonar    | 项目源码 | 文件源码
def iso8601_as_datetime(iso,
                        localize=False  # Default into local time zone
                        ):
    """
    Parse an ISO 8601 string into a datetime.

    When ``localize`` is true and the parsed value carries no timezone,
    it is localized to the local zone. Raises ValueError for input that
    isodate rejects.
    """
    try:
        result = isodate.parse_datetime(iso)
    except isodate.isoerror.ISO8601Error:
        raise ValueError("Invalid ISO8601 date")

    needs_zone = localize and result.tzinfo is None
    if needs_zone:
        result = get_localzone().localize(result)
    return result

# TODO: This function exists in datetime as .isoformat()
项目:defplorex    作者:trendmicro    | 项目源码 | 文件源码
def converter(self, timestamp):
        """Read ``timestamp`` with the local timezone and return it as a UTC time tuple."""
        local_moment = arrow.get(timestamp, tz=get_localzone())
        utc_moment = local_moment.to('UTC')
        return utc_moment.datetime.timetuple()
项目:HugoPhotoSwipe    作者:GjjvdBurg    | 项目源码 | 文件源码
def modtime():
    """ Return the current local time, without microseconds, as an ISO format string """
    if not six.PY2:
        now = datetime.now(timezone.utc).astimezone()
    else:
        # On Python 2 the conversion goes through tzlocal/pytz.
        local_tz = get_localzone()
        utc_now = datetime.utcnow().replace(tzinfo=pytz.utc)
        now = utc_now.astimezone(local_tz)
    return now.replace(microsecond=0).isoformat()
项目:ibstract    作者:jesseliu0    | 项目源码 | 文件源码
def TimeEnd(self, timeend):
        """
        Store the end time of the request as a timezone-aware datetime.

        None means "now" in UTC. A naive datetime is localized to the
        system timezone (with a warning). Anything that is not a datetime
        raises TypeError.
        """
        if timeend is None:
            self._timeend = datetime.now(tz=pytz.utc)
            return
        if not isinstance(timeend, datetime):
            raise TypeError("req.TimeEnd must be a datetime.datetime object.")

        # Always use timezone-aware datetime.
        if timeend.tzinfo is None:
            _logger.warning('Naive HistDataReq.TimeEnd. '
                            'Assumeing system local time zone.')
            timeend = get_localzone().localize(timeend)
        self._timeend = timeend
项目:plexivity    作者:mutschler    | 项目源码 | 文件源码
def startScheduler():
    """
    Create the database tables and default roles, then start a background
    scheduler that runs ``notify.task`` every ``config.SCAN_INTERVAL``
    seconds, first run about two seconds from now.

    The scheduler uses the local timezone when tzlocal can determine a
    named one, otherwise UTC.
    """
    db.create_all()
    # create default roles!
    if not db.session.query(models.Role).filter(models.Role.name == "admin").first():
        admin_role = models.Role(name='admin', description='Administrator Role')
        user_role = models.Role(name='user', description='User Role')
        db.session.add(admin_role)
        db.session.add(user_role)
        db.session.commit()

    try:
        import tzlocal

        tz = tzlocal.get_localzone()
        logger.info("local timezone: %s" % tz)
    except Exception:
        # Best effort: a missing tzlocal or an undeterminable zone falls
        # back to UTC below. KeyboardInterrupt/SystemExit are not swallowed.
        tz = None

    if not tz or tz.zone == "local":
        logger.error('Local timezone name could not be determined. Scheduler will display times in UTC for any log '
                     'messages. To resolve this set up /etc/timezone with correct time zone name.')
        tz = pytz.utc
    # in debug mode this is executed twice :(
    # DONT run flask in auto reload mode when testing this!
    scheduler = BackgroundScheduler(logger=sched_logger, timezone=tz)
    scheduler.add_job(notify.task, 'interval', seconds=config.SCAN_INTERVAL, max_instances=1,
                      start_date=datetime.datetime.now(tz) + datetime.timedelta(seconds=2))
    scheduler.start()
    # NOTE(review): this binds a local name only; if a module-level
    # `sched` was meant to be updated, a `global sched` is missing - confirm.
    sched = scheduler
    #notify.task()
项目:anki_progress_stats    作者:matthayes    | 项目源码 | 文件源码
def get_next_day_cutoff_seconds(hour):
  "Return today's local-time cutoff at the given hour as seconds since epoch."
  zone = tzlocal.get_localzone()
  naive_cutoff = datetime.now().replace(hour=hour, minute=0, second=0, microsecond=0)
  local_cutoff = zone.localize(naive_cutoff)

  # TODO doesn't actually work.  if it is 1 am then it doens't chose 4 am of the same day.
  # local_cutoff += timedelta(days=1)

  unix_epoch = datetime(1970, 1, 1, tzinfo=pytz.utc)
  elapsed = local_cutoff - unix_epoch
  return elapsed.total_seconds()
项目:scm-workbench    作者:barry-scott    | 项目源码 | 文件源码
def localDatetime( datetime_or_timestamp ):
    """Return the given datetime, or int/float timestamp, converted to the local timezone."""
    dt = datetime_or_timestamp
    # Exact type check on purpose: only plain int/float count as timestamps.
    if type( dt ) in (int, float):
        dt = utcDatetime( dt )

    return dt.astimezone( tzlocal.get_localzone() )
项目:Python-GUI-Programming-Cookbook-Second-Edition    作者:PacktPublishing    | 项目源码 | 文件源码
def localZone(self):
        """Replace the content of the ``self.oop.scr`` text widget with the local timezone."""
        from tzlocal import get_localzone
        text_widget = self.oop.scr
        text_widget.delete('1.0', tk.END)
        text_widget.insert(tk.INSERT, get_localzone())

    # Format local US time with TimeZone info
项目:Python-GUI-Programming-Cookbook-Second-Edition    作者:PacktPublishing    | 项目源码 | 文件源码
def localZone(self):
        """Insert the local timezone into the ``self.scr`` text widget."""
        from tzlocal import get_localzone
        text_widget = self.scr
        text_widget.insert(tk.INSERT, get_localzone())


    # Format local US time with TimeZone info
项目:aliyun-log-python-sdk    作者:aliyun    | 项目源码 | 文件源码
def show_consumer_group():
    """
    Print the checkpoint status of the consumer group once per second
    until interrupted with Ctrl-C. Times are shown in the local timezone.
    """
    client, option, project, logstore, consumer_group = prepare_test()

    local_timezone = tzlocal.get_localzone()

    def _as_local(epoch_seconds):
        # Epoch seconds -> aware datetime in the local zone.
        return datetime.fromtimestamp(epoch_seconds, local_timezone)

    try:
        while True:
            ret = client.get_check_point_fixed(project, logstore, consumer_group)

            with lock:
                print("***** consumer group status*****")
                checkpoints = ret.consumer_group_check_poins
                if checkpoints:
                    print("***consumer\t\t\tshard\tcursor time\t\t\t\t\tupdate time")
                    for status in checkpoints:
                        update_time = _as_local(status['updateTime']/1000000)
                        cursor_time = _as_local(status.get('checkpoint_previous_cursor_time', 0))
                        print("{0}\t{1}\t\t{2}\t{3}".format(status["consumer"], status['shard'],
                                                          cursor_time, update_time))
                else:
                    print("[]")

            time.sleep(1)
    except KeyboardInterrupt:
        print("***** exit *****")
项目:exchangelib    作者:ecederstrand    | 项目源码 | 文件源码
def localzone(cls):
        """
        Build an instance from the local timezone via ``cls.from_pytz``.

        Raises UnknownTimeZone when tzlocal cannot determine the zone.
        """
        try:
            local = tzlocal.get_localzone()
        except pytz.exceptions.UnknownTimeZoneError:
            raise UnknownTimeZone("Failed to guess local timezone")
        else:
            return cls.from_pytz(local)
项目:tools    作者:apertoso    | 项目源码 | 文件源码
def do_download_db(database_name=None, master_pwd=None, base_url=None,
                   backup_format='zip', filename=None, api='9.0',
                   hostname=None):
    """
    Download a database backup through the backend selected by ``api``.

    ``base_url`` defaults to a URL built from ``database_name`` and
    ``filename`` to ``<database>_<local timestamp>.<backup_format>``.
    ``api`` values starting with '9.0' and the value '8.0' use the HTTP
    downloaders, 'ssh' uses the SSH downloader (server taken from
    ``hostname`` or derived from the database name). Any other value
    raises NotImplementedError.
    """
    if base_url is None:
        base_url = 'https://%s.odoo.apertoso.net' % database_name
    if base_url.endswith('/'):
        base_url = base_url.strip('/')
    if filename is None:
        stamp = datetime.datetime.now(get_localzone()).strftime(
            '%Y%m%d-%H%M%S-%Z')
        filename = "%s_%s.%s" % (database_name, stamp, backup_format)

    # Both HTTP back ends take the same keyword arguments.
    http_kwargs = dict(
        database_name=database_name,
        master_pwd=master_pwd, base_url=base_url,
        backup_format=backup_format,
        filename=filename)

    if api.startswith('9.0'):
        return do_download_db_v9(**http_kwargs)
    if api == '8.0':
        return do_download_db_v8(**http_kwargs)
    if api == 'ssh':
        server_name = hostname or \
            'openerp.production.{}.clients.apertoso.net'.format(database_name)
        return do_download_db_ssh(database_name=database_name,
                                  server=server_name,
                                  filename=filename)
    raise NotImplementedError("No support for api %s" % api)
项目:dati-ckan-docker    作者:italia    | 项目源码 | 文件源码
def get_display_timezone():
    ''' Returns the timezone for the ``ckan.display_timezone`` setting in
    the configuration file: the server's local zone for the value
    'server', UTC if the setting is not specified.
    :rtype: timezone
    '''
    timezone_name = config.get('ckan.display_timezone') or 'utc'

    if timezone_name != 'server':
        return pytz.timezone(timezone_name)
    return tzlocal.get_localzone()
项目:dati-ckan-docker    作者:italia    | 项目源码 | 文件源码
def test_server_timezone(self):
        """The display timezone helper must return the machine's local zone.

        NOTE(review): presumably the config is set to 'server' by a
        decorator or fixture outside this snippet - confirm.
        """
        actual = h.get_display_timezone()
        expected = tzlocal.get_localzone()
        eq_(actual, expected)
项目:pyfan    作者:raptorz    | 项目源码 | 文件源码
def print_status(i, status):
    """Print one status entry on a single line, with its creation time in local time."""
    created_utc = datetime.strptime(
        status['created_at'], "%a %b %d %H:%M:%S +0000 %Y").replace(tzinfo=pytz.utc)
    created_local = created_utc.astimezone(tzlocal.get_localzone())

    row = {
        u'index': i,
        u'created': created_local.strftime("%Y-%m-%d %H:%M:%S"),
        u'id': status['id'],
        u'screen_name': status['user']['screen_name'],
        u'user_id': status['user']['id'],
        u'text': status['text'],
        # The photo URL is optional.
        u'photo': status['photo']['largeurl'] if status.get('photo') else "",
    }
    print(u"[%(index)s]%(created)s(%(id)s)%(screen_name)s(%(user_id)s): %(text)s %(photo)s" % row)
项目:oracle-imagecopy-backup    作者:unibet    | 项目源码 | 文件源码
def ask_user_input():
    """
    Interactively collect the restore parameters and ask for confirmation.

    Fills the module-level ``restoreparams`` dict (mount path, restore
    time point, target SID) and passes the time point to ``restore``.
    Sets the module-level ``exitvalue`` to 1 and returns early when the
    user does not confirm the isolation question or the final summary.
    """
    global restoreparams, exitvalue

    # Refuse to continue unless the user confirms the system is isolated.
    is_safe = ui.ask_yn("Is this system isolated with no access to production database storage")
    if is_safe != "Y":
        print "Exiting. Please execute this script in an isolated environment."
        exitvalue = 1
        return
    restoreparams['mountpath'] = ui.ask_directory("Directory where to mount clone:", False)
    restoreparams['timepoint'] = ui.ask_timestamp("Restore database to time point")
    # The entered timestamp is localized either as UTC or as local time.
    is_utc = ui.ask_yn("Was the timestamp in UTC (answer N for local time)")
    if is_utc == "Y":
        tz = pytz.utc
    else:
        tz = get_localzone()
    restoreparams['timepoint'] = tz.localize(restoreparams['timepoint'])
    restore.set_restore_target_time(restoreparams['timepoint'])
    restoreparams['sid'] = ui.ask_string("Target instance name:", 8, True)
    # Print a summary of the collected parameters.
    splitter = "######################################"
    print splitter
    print ""
    print "Database unique name: %s" % configname
    print "Oracle home: %s" % Configuration.get("oraclehome", "generic")
    print "Clone mount path: %s" % restoreparams['mountpath']
    print "Target instance SID: %s" % restoreparams['sid']
    print "Restore target time UTC: %s" % restoreparams['timepoint'].astimezone(pytz.utc)
    print "Restore target time local: %s" % restoreparams['timepoint'].astimezone(get_localzone())
    print "Restored from snapshot: %s" % restore.sourcesnapid
    # Ask for a final confirmation of the summary above.
    print ""
    is_ok = ui.ask_yn("Are these parameters correct")
    if is_ok != "Y":
        print "Exiting. Please execute this script again."
        exitvalue = 1
        return
    print ""
    print splitter
项目:oracle-imagecopy-backup    作者:unibet    | 项目源码 | 文件源码
def _set_parameters(self):
        """
        Load ``autorestore.cfg`` from the mount destination and publish its
        values as configuration substitutions.

        The restore target is ``self.targettime`` converted to the local
        zone, or the config file's ``lasttime`` when no target time is
        set. Also sets ``self._initfile`` to ``init.ora`` inside the
        restore destination.
        """
        dbconfig = SafeConfigParser()
        dbconfig.read(os.path.join(self._mountdest, 'autorestore.cfg'))
        self._dbparams['dbname'] = dbconfig.get('dbparams','db_name')
        if self.targettime is None:
            self._dbparams['restoretarget'] = datetime.strptime(dbconfig.get('dbparams','lasttime'), '%Y-%m-%d %H:%M:%S')
        else:
            self._dbparams['restoretarget'] = self.targettime.astimezone(get_localzone())
        self._dbparams['bctfile'] = dbconfig.get('dbparams','bctfile')
        Configuration.substitutions.update({
            'db_name': self._dbparams['dbname'],
            'db_compatible': dbconfig.get('dbparams','compatible'),
            'db_files': dbconfig.get('dbparams','db_files'),
            'db_undotbs': dbconfig.get('dbparams','undo_tablespace'),
            'db_block_size': dbconfig.get('dbparams','db_block_size'),
#            'lastscn': dbconfig.get('dbparams','lastscn'),
            'lasttime': self._dbparams['restoretarget'].strftime('%Y-%m-%d %H:%M:%S'),
            'dbid': Configuration.get('dbid', self._configname),
            'instancenumber': Configuration.get('autorestoreinstancenumber', self._configname),
            'thread': Configuration.get('autorestorethread', self._configname),
            'backupfinishedtime': dbconfig.get('dbparams','backup-finished'),
            'bctfile': self._dbparams['bctfile'],
            'autorestoredestination': self._restoredest,
            'mountdestination': self._mountdest,
        })
        try:
            Configuration.substitutions.update({'cdb': dbconfig.get('dbparams','enable_pluggable_database')})
        except Exception:
            # The option is optional; default to FALSE when it cannot be
            # read. Narrowed from a bare except so KeyboardInterrupt and
            # SystemExit are no longer swallowed.
            Configuration.substitutions.update({'cdb': 'FALSE'})
        self._initfile = os.path.join(self._restoredest, 'init.ora')
        Configuration.substitutions.update({
            'initora': self._initfile,
        })
项目:qiime2    作者:qiime2    | 项目源码 | 文件源码
def _ts_to_date(self, ts):
        """Convert the timestamp ``ts`` to an aware datetime in the local timezone."""
        local_zone = tzlocal.get_localzone()
        return datetime.fromtimestamp(ts, local_zone)
项目:tcex    作者:ThreatConnect-Inc    | 项目源码 | 文件源码
def human_date_to_datetime(time_input, tz='UTC'):
        """ Convert human readable date (e.g. 30 days ago) to datetime.datetime using
            parsedatetime module.

        The input is parsed relative to the local timezone and the result
        is converted to ``tz``.

        Examples:

        * August 25th, 2008
        * 25 Aug 2008
        * Aug 25 5pm
        * 5pm August 25
        * next saturday
        * tomorrow
        * next thursday at 4pm
        * at 4pm
        * eod
        * tomorrow eod
        * eod tuesday
        * eoy
        * eom
        * in 5 minutes
        * 5 minutes from now
        * 5 hours before now
        * 2 hours before noon
        * 2 days from tomorrow

        Args:
            time_input (string): The time input string (see formats above).
            tz (string): The time zone for the returned data.

        Returns:
            (datetime.datetime): Python datetime.datetime object, or None
                when the parser reports status 0.
        """
        calendar = pdt.Calendar()
        local_tz = timezone(get_localzone().zone)
        parsed, status = calendar.parseDT(time_input, tzinfo=local_tz)
        converted = parsed.astimezone(timezone(tz))
        # Status 0 is the parser's "nothing recognised" result.
        return None if status == 0 else converted
项目:scriptabit    作者:DC23    | 项目源码 | 文件源码
def due_date(self, due_date):
        """ Store the due date in the task dictionary, or remove it.

        A truthy ``due_date`` must be a ``datetime``; it is converted to the
        local timezone and only its calendar date is kept under ``'date'``.
        A falsy ``due_date`` deletes any existing ``'date'`` entry.

        Raises:
            TypeError: if ``due_date`` is truthy but not a ``datetime``.
        """
        if not due_date:
            # Nothing to set: drop a previously stored date, if any.
            if 'date' in self.__task_dict:
                del self.__task_dict['date']
            return

        if not isinstance(due_date, datetime):
            raise TypeError

        local_due = due_date.astimezone(get_localzone())
        self.__task_dict['date'] = local_due.date()
项目:Habitica-todo    作者:eringiglio    | 项目源码 | 文件源码
def due_date(self, due_date):
        """ Store the due date in the task dictionary, or remove it.

        A truthy ``due_date`` must be a ``datetime``; it is converted to the
        local timezone and only its calendar date is kept under ``'date'``.
        A falsy ``due_date`` deletes any existing ``'date'`` entry.

        Raises:
            TypeError: if ``due_date`` is truthy but not a ``datetime``.
        """
        if not due_date:
            # Nothing to set: drop a previously stored date, if any.
            if 'date' in self.__task_dict:
                del self.__task_dict['date']
            return

        if not isinstance(due_date, datetime):
            raise TypeError

        local_due = due_date.astimezone(get_localzone())
        self.__task_dict['date'] = local_due.date()
项目:alexa-apple-calendar    作者:zanderxyz    | 项目源码 | 文件源码
def get_event_detail(self, pguid, guid):
        """
        Fetch the details of one event.

        ``pguid`` identifies the calendar and ``guid`` the event. The request
        is sent with a copy of ``self.params`` extended by the language and
        the local timezone name. The decoded JSON reply is kept on
        ``self.response`` and the first item of its ``'Event'`` list is
        returned.
        """
        # Work on a copy so the shared default parameters stay untouched.
        query = dict(self.params)
        query['lang'] = 'en-us'
        query['usertz'] = get_localzone().zone

        endpoint = '%s/%s/%s' % (self._calendar_event_detail_url, pguid, guid)
        reply = self.session.get(endpoint, params=query)
        self.response = reply.json()

        events = self.response['Event']
        return events[0]
项目:maya    作者:kennethreitz    | 项目源码 | 文件源码
def _local_tz(self):
        """Return the timezone object reported by ``get_localzone()``."""
        local_zone = get_localzone()
        return local_zone
项目:main_bot    作者:PythonBuddies    | 项目源码 | 文件源码
async def time(self, timezone_code):
        """Return current time and time in the timezone listed.

        Declared with ``async def`` because the body awaits ``self.bot.say``;
        ``await`` inside a plain ``def`` is a syntax error.

        **Dependencies**: pip install pytz tzlocal

        Keyword arguments:
        timezone_code  -- Code for timezone (upper-cased before lookup)
        """
        local_time = datetime.now(get_localzone())
        # Derive the second value from the same instant so both printed
        # times describe exactly the same moment.
        converted_time = local_time.astimezone(timezone(timezone_code.upper()))
        await self.bot.say("```Local time : {} \n\n   {}     : {}```"
                           .format(local_time, timezone_code, converted_time))
项目:web    作者:pyjobs    | 项目源码 | 文件源码
def kw_to_sqlalchemy(sqlalchemy_table_cls, kw):
    """Build a ``sqlalchemy_table_cls`` instance from a keyword mapping.

    For every ``column``/``value`` pair in ``kw`` the column type is looked
    up with ``_find_type`` and the value is coerced to the column's
    ``python_type`` when it is not already an instance of it:

    * boolean columns compare the lower-cased value against ``'true'``;
    * other columns call ``python_type(value)``;
    * a ``UnicodeEncodeError`` during coercion falls back to
      ``unicode(value)``.

    Datetime columns flagged with ``timezone`` are localized to the machine's
    local timezone before being assigned.

    :param sqlalchemy_table_cls: class instantiated (without arguments) to
                                 receive the values
    :param kw: mapping of column name to raw value
    :return: the populated instance
    """
    sqlalchemy_table = sqlalchemy_table_cls()

    for column, value in kw.iteritems():
        try:
            column_type = _find_type(sqlalchemy_table_cls, column)
            python_type = column_type.python_type
        except NameError:
            # NOTE(review): presumably _find_type raises NameError for keys
            # that are not columns of the table -- confirm against _find_type.
            continue

        try:
            if issubclass(type(value), python_type):
                column_value = value
            elif issubclass(python_type, bool):
                column_value = value.lower() == 'true'
            else:
                column_value = python_type(value)
        except UnicodeEncodeError:
            column_value = unicode(value)

        # Deliberately NOT in a ``finally`` clause: if the coercion raised any
        # other exception, ``column_value`` would be unbound (or still hold
        # the previous column's value), so assigning it would either mask the
        # real error or write a stale value to the wrong column.
        if issubclass(python_type, datetime) and column_type.timezone:
            # Convert to local time
            tz = get_localzone()
            column_value = tz.localize(column_value, is_dst=None)
        setattr(sqlalchemy_table, column, column_value)

    return sqlalchemy_table
项目:client    作者:syncrypt    | 项目源码 | 文件源码
def print_log(self, verbose=False):
        """Print the change log of every vault in ``self.vaults``.

        This is a generator-based coroutine. For each vault the backend is
        opened and its change queue is consumed until a ``None`` item is
        received. Every change is printed as one line with its creation time
        (converted to the local timezone), the operation and the bundle's
        relative path; with ``verbose`` the revision id and the user's email
        are printed as well. Vaults that raise ``VaultNotInitialized`` on
        open are reported through the logger and skipped. Finally the
        coroutine delegates to ``self.wait()``.
        """
        local_zone = get_localzone()
        for vault in self.vaults:
            try:
                yield from vault.backend.open()
            except VaultNotInitialized:
                logger.error('%s has not been initialized. Use "syncrypt init" to register the folder as vault.' % vault)
                continue

            changes = yield from vault.backend.changes(None, None, verbose=verbose)
            entry = yield from changes.get()
            # A ``None`` entry marks the end of the queue.
            while entry is not None:
                store_hash, metadata, server_info = entry
                bundle = VirtualBundle(None, vault, store_hash=store_hash)
                yield from bundle.write_encrypted_metadata(Once(metadata))

                encoding = vault.config.encoding
                rev_id = server_info['id'].decode(encoding)
                created = iso8601.parse_date(server_info['created_at'].decode())
                created_at = created.astimezone(local_zone).strftime('%x %X')
                operation = server_info['operation'].decode(encoding)

                if not verbose:
                    print("%s | %-9s %s" % (created_at, operation, bundle.relpath))
                else:
                    user_email = server_info['email'].decode(encoding)
                    print("%s | %s | %s | %-9s %s" % (created_at, rev_id, user_email,
                        operation, bundle.relpath))

                entry = yield from changes.get()

        yield from self.wait()
项目:reahl    作者:reahl    | 项目源码 | 文件源码
def is_uploaded_after(self, package, when):
        """Tell whether ``package`` was uploaded at or after ``when``.

        Returns False when ``self.is_uploaded(package)`` is false. Otherwise
        the modification time of the first file returned by
        ``self.uploaded_files_for(package)`` is read as a datetime in the
        local timezone and compared against ``when`` with ``>=``.
        """
        if self.is_uploaded(package):
            first_file = self.uploaded_files_for(package)[0]
            modified = datetime.datetime.fromtimestamp(
                os.path.getmtime(first_file), tzlocal.get_localzone())
            return modified >= when
        return False