Python utils 模块,get_config() 实例源码

我们从Python开源项目中,提取了以下11个代码示例,用于说明如何使用utils.get_config()

项目:deepdrive    作者:vdt    | 项目源码 | 文件源码
def main():
    """Run the DeepDrive installer, creating the config file when missing.

    Parses the command line, loads the current configuration through
    ``utils.get_config()`` and, when that comes back empty, asks the user
    for an install directory and stores the answer as JSON at
    ``DEEP_DRIVE_CONFIG_LOCATION``. The configuration and the parsed
    arguments are then handed to ``setup()``.
    """
    parser = argparse.ArgumentParser(description=None)
    parser.add_argument('-g', '--gtav-dir-only', action='store_true', help='Only install things into the gtav directory')
    args = parser.parse_args()
    logging.basicConfig()
    logger.setLevel(logging.INFO)

    config = utils.get_config()

    if not config:
        # Raw string: the previous literal depended on the invalid escape
        # sequence "\D", which newer interpreters warn about. The resulting
        # path is unchanged.
        default_install_dir = r'C:\Program Files\DeepDrive'
        install_dir = input('Where would you like to install DeepDrive? (press enter for %s): ' % default_install_dir)
        install_dir = install_dir or default_install_dir
        logger.info('Installing to %s', install_dir)
        config = {'install_dir': install_dir}
        config_dir = os.path.dirname(DEEP_DRIVE_CONFIG_LOCATION)
        # dirname() is '' for a bare file name and os.makedirs('') raises,
        # so only create a directory when there is one to create.
        if config_dir and not os.path.exists(config_dir):
            os.makedirs(config_dir)
        # The file is only written here, never read back, so plain 'w'.
        with open(DEEP_DRIVE_CONFIG_LOCATION, 'w') as outfile:
            json.dump(config, outfile, indent=4, sort_keys=True)

    setup(config, args)
项目:ISS    作者:RyanJenkins    | 项目源码 | 文件源码
def clean(self, value):
        """Clean the value via the parent field, then enforce BBCode tag limits.

        A non-string result from the parent ``clean`` is returned untouched.
        Raises ``ValidationError`` with code ``TOO_MANY_EMBEDS`` when the
        combined number of ``video``, ``img`` and ``bc`` tags exceeds the
        configured ``max_embedded_items``, and with code ``TOO_MUCH_COOL``
        when the novelty tag is used more than 10 times.
        """
        cleaned = super(BBCodeField, self).clean(value)

        # Not text (probably None because the field is optional): there is
        # nothing to parse, so hand it straight back.
        if not isinstance(cleaned, basestring):
            return cleaned

        tag_counts = utils.get_tag_distribution(cleaned)
        embed_total = tag_counts['video'] + tag_counts['img'] + tag_counts['bc']
        cool_total = tag_counts['byusingthistagiaffirmlannyissupercool']
        embed_limit = utils.get_config('max_embedded_items')

        if embed_total > embed_limit:
            message = ('BBCode must contain %d or fewer embedded items. '
                       'Contained %d.') % (embed_limit, embed_total)
            raise ValidationError(message, code='TOO_MANY_EMBEDS')

        if cool_total > 10:
            raise ValidationError(
                'Cool tag bro, but don\'t overuse it.',
                code='TOO_MUCH_COOL')

        return cleaned
项目:ISS    作者:RyanJenkins    | 项目源码 | 文件源码
def clean(self, *args, **kwargs):
        """Apply flood control to authors still in their initial period.

        After the parent ``clean`` runs, authors whose total post count is
        below ``initial_account_period_total`` are limited: if the number of
        their posts created within the last ``initial_account_period_width``
        reaches ``initial_account_period_limit``, a ``ValidationError`` with
        code ``FLOOD_CONTROL`` is raised. Returns ``None`` otherwise.
        """
        super(InitialPeriodLimitingForm, self).clean(*args, **kwargs)

        total_posts = self._author.post_set.count()
        if total_posts >= utils.get_config('initial_account_period_total'):
            # Author has enough posts; the limit no longer applies.
            return

        now = timezone.now()
        window_start = now - utils.get_config('initial_account_period_width')

        recent_posts = (self._author
                            .post_set
                            .order_by('-created')
                            .filter(created__gte=window_start))
        recent_count = recent_posts.count()

        window_limit = utils.get_config('initial_account_period_limit')
        if recent_count >= window_limit:
            raise ValidationError(
                ('You\'ve made too many posts on a new account. This '
                 'control will be removed once your account is better '
                 'established.'),
                code='FLOOD_CONTROL')
项目:ISS    作者:RyanJenkins    | 项目源码 | 文件源码
def clean_username(self):
        """Validate the submitted username and return it unchanged.

        The name is normalized with ``Poster.normalize_username`` and then
        rejected with a ``ValidationError`` when:

        * it normalizes to the same value as the configured junk or system
          user name (code ``FORBIDDEN_USERNAME``),
        * its normalized form is empty (code ``INVALID_GENERAL``), or
        * a ``Poster`` with the same normalized name already exists
          (code ``TOO_SIMILAR``).
        """
        username = self.cleaned_data['username']
        norm_username = Poster.normalize_username(username)
        forbidden_names = {
            Poster.normalize_username(utils.get_config('junk_user_username')),
            Poster.normalize_username(utils.get_config('system_user_username'))
        }

        if norm_username in forbidden_names:
            raise ValidationError('You may not register that username.',
                                  code='FORBIDDEN_USERNAME')

        if len(norm_username) < 1:
            raise ValidationError('Invalid username', code='INVALID_GENERAL')

        # Only presence matters here, so ask the database for existence
        # instead of counting every matching row.
        if Poster.objects.filter(normalized_username=norm_username).exists():
            raise ValidationError(
                'User with a similar username already exists',
                code='TOO_SIMILAR')

        return username
项目:ISS    作者:RyanJenkins    | 项目源码 | 文件源码
def clean(self, *args, **kwargs):
        """Block a private message sent too soon after the author's last one.

        After the parent ``clean`` runs, the author's most recently created
        private message is looked up. If there is none, or if at least
        ``private_message_flood_control`` seconds have passed since it was
        created, ``self.cleaned_data`` is returned. Otherwise a
        ``ValidationError`` with code ``FLOOD_CONTROL`` is raised, reporting
        the remaining wait in seconds.
        """
        super(NewPrivateMessageForm, self).clean(*args, **kwargs)

        sent_newest_first = (PrivateMessage.objects
            .filter(sender=self._author)
            .order_by('-created'))

        try:
            latest = sent_newest_first[0]
        except IndexError:
            # The author has never sent a PM, so nothing to throttle.
            return self.cleaned_data

        elapsed = (timezone.now() - latest.created).total_seconds()
        cooldown = utils.get_config('private_message_flood_control')

        if elapsed < cooldown:
            remaining = cooldown - elapsed
            raise ValidationError(
                ('Flood control has blocked this message from being sent, '
                 'you can send another PM in %(ttp)d seconds.'),
                params={'ttp': remaining},
                code='FLOOD_CONTROL')

        return self.cleaned_data
项目:fingerprint-securedrop    作者:freedomofpress    | 项目源码 | 文件源码
def __init__(self, database_config=None):
        """Create the engine for the configured Postgres database.

        ``database_config`` is a mapping supplying ``pguser``, ``pghost``,
        ``pgport`` and ``pgdatabase``; when it is missing or empty the
        ``'database'`` entry of ``get_config()`` is used instead. The
        connection URL is built with an empty password field. An
        ``OperationalError`` from ``create_engine`` is reported through
        ``panic`` with a hint about the PGPASSFILE.
        """
        database_config = database_config or get_config()['database']
        url_template = 'postgresql://{pguser}:@{pghost}:{pgport}/{pgdatabase}'

        try:
            self.engine = create_engine(url_template.format(**database_config))
        except OperationalError as exc:
            panic("fingerprint-securedrop Postgres support relies on use of a "
                  "PGPASSFILE. Make sure this file exists in your homedir with "
                  "0600 permissions:\n{}.".format(exc))
项目:fingerprint-securedrop    作者:freedomofpress    | 项目源码 | 文件源码
def test_read_config(self):
        """Check that the loaded config has the expected sections and values.

        Verifies that the ``sorter`` and ``crawler`` sections exist, that
        ``sorter.page_load_timeout`` reads as an int, and that the first
        comma-separated ``crawler.entry_nodes`` item contains 40 characters
        from ``0-9A-F``.
        """
        config = get_config()

        for section in ('sorter', 'crawler'):
            self.assertTrue(config.has_section(section))

        timeout = config.getint('sorter', 'page_load_timeout')
        self.assertIsInstance(timeout, int)

        first_entry_node = config['crawler']['entry_nodes'].split(',')[0]
        self.assertRegex(first_entry_node, "[0-9A-F]{40}")
项目:fingerprint-securedrop    作者:freedomofpress    | 项目源码 | 文件源码
def test_coalesce_ordered_dict(self):
        """Check that ``sorter.class_tests`` coalesces into an OrderedDict."""
        raw_class_tests = get_config()['sorter']['class_tests']
        self.assertIsInstance(coalesce_ordered_dict(raw_class_tests),
                              OrderedDict)
项目:deepdrive    作者:vdt    | 项目源码 | 文件源码
def main():
    """Start the GTAV runner and keep sending heartbeats while it runs.

    Parses the verbosity and weights-file options, configures logging,
    kills competing processes, enforces the GTAV version, and launches a
    ``GTAVRunner`` from the configured install directory. It then loops
    forever: every 15 minutes, if the monitored processes are running and
    ``GTAV_DEAD_MANS_SNITCH_URL`` is set in the environment, that URL is
    fetched as a heartbeat. Heartbeat failures are logged and never abort
    the loop.
    """
    default_weights_path = 'caffe_deep_drive_train_iter_35352.caffemodel'
    parser = argparse.ArgumentParser(description=None)
    parser.add_argument('-v', '--verbose', action='count', dest='verbosity', default=0, help='Set verbosity.')
    parser.add_argument('-w', '--weights', default=default_weights_path, help='Path to caffemodel weights file - default is ' + default_weights_path)
    args = parser.parse_args()

    logging.basicConfig()

    if args.verbosity == 0:
        logger.setLevel(logging.INFO)
    elif args.verbosity >= 1:
        logger.setLevel(logging.DEBUG)

    GTAVRunner._kill_competing_procs()

    enforce_version(GTAV_DIR)

    install_dir = utils.get_config()['install_dir']

    runner = GTAVRunner(install_dir, args.weights)
    runner.popen()
    time.sleep(10)  # Give some time for caffe to create a new log file
    tail_caffe_logs()
    while True:
        if utils.processes_are_running(ALL_PROCESS_NAMES):
            if 'GTAV_DEAD_MANS_SNITCH_URL' in os.environ:  # e.g. https://nosnch.in/a69389848a
                # Send heartbeat (for monitoring long running environments).
                # Log through the module logger configured above: the root
                # logger stays at WARNING after basicConfig(), so the
                # previous logging.info() call was silently dropped.
                logger.info('Sending heartbeat')
                try:
                    urllib2.urlopen(os.environ['GTAV_DEAD_MANS_SNITCH_URL']).read()
                except Exception:
                    # Best effort: a failed heartbeat must not stop the
                    # monitoring loop. exception() appends the traceback.
                    logger.exception('Error sending heartbeat')
        time.sleep(15 * 60)
项目:ISS    作者:RyanJenkins    | 项目源码 | 文件源码
def _add_shortcode_preprocessor(parser):
    """Register a preprocessor that wraps ``:name:`` shortcodes in BBCode.

    On first use the module-level ``shortcode_map`` is loaded from the
    ``shortcode_map`` config entry and ``shortcode_pat`` is compiled from
    its keys; both are reused on later calls. The registered preprocessor
    rewrites every ``:name:`` whose ``name`` is a key of ``shortcode_map``
    into ``[shortcode]name[/shortcode]`` and leaves all other text alone.

    Args:
        parser: object exposing ``add_preprocessor(callable)``.

    Returns:
        The same ``parser``, to allow chaining.
    """
    global shortcode_pat
    global shortcode_map

    if not shortcode_map:
        shortcode_map = utils.get_config('shortcode_map')

    if not shortcode_pat:
        # Escape each name so regex metacharacters in a shortcode (e.g. '+'
        # or ')') are matched literally instead of corrupting the pattern.
        alternatives = '|'.join(re.escape(name) for name in shortcode_map)
        shortcode_pat = re.compile(':(%s):' % alternatives, flags=re.IGNORECASE)

    def _preprocess_shortcode(text):
        def _repl(match):
            name = match.group(1)
            # The pattern is case-insensitive but this lookup is not, so a
            # differently-cased match is left exactly as written.
            if name in shortcode_map:
                return '[shortcode]%s[/shortcode]' % name
            return match.group(0)

        return shortcode_pat.sub(_repl, text)

    parser.add_preprocessor(_preprocess_shortcode)
    return parser
项目:ISS    作者:RyanJenkins    | 项目源码 | 文件源码
def save(self, editor=None):
        """Store the edited content on the post and save it.

        When the post is older than ``ninja_edit_grace_time`` seconds, an
        italic footer naming the editor (``'unknown'`` when no editor is
        given) and the current time in ISO format is appended to the
        content before saving.
        """
        post = self.cleaned_data['post']
        post.content = self.cleaned_data['content']

        age_seconds = (timezone.now() - post.created).total_seconds()
        grace_seconds = utils.get_config('ninja_edit_grace_time')

        if age_seconds > grace_seconds:
            if editor:
                editor_name = editor.username
            else:
                editor_name = 'unknown'
            edited_at = timezone.now().isoformat()
            post.content += '\n\n[i]Post last edited by %s at %s[/i]' % (
                    editor_name, edited_at)

        post.save()