Python fnmatch 模块,fnmatch() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用fnmatch.fnmatch()

项目:pep8speaks    作者:OrkoHunter    | 项目源码 | 文件源码
def filename_match(filename, patterns):
    """
    Check if patterns contains a pattern that matches filename.
    """

    # `dir/*` works but `dir/` does not
    for index in range(len(patterns)):
        if patterns[index][-1] == '/':
            patterns[index] += '*'

    # filename has a leading `/` which confuses fnmatch
    filename = filename.lstrip('/')

    # Pattern is a fnmatch compatible regex
    if any(fnmatch.fnmatch(filename, pattern) for pattern in patterns):
        return True

    # Pattern is a simple name of file or directory (not caught by fnmatch)
    for pattern in patterns:
        if '/' not in pattern and pattern in filename.split('/'):
            return True

    return False
项目:Starfish    作者:BillWang139967    | 项目源码 | 文件源码
def clearpyc(root, patterns='*',single_level=False, yield_folders=False):
    """
    root: ??¼
    patterns ???????
    single_level: ???¼??
    yield_folders: ??¼??
    """
    patterns = patterns.split(';')
    for path, subdirs, files in os.walk(root):
        if yield_folders:
            files.extend(subdirs)
            files.sort()
        for name in files:
            for pattern in patterns:
                if fnmatch.fnmatch(name, pattern.strip()):# ?pattern???
                    yield os.path.join(path, name)
        if single_level:
            break
项目:HandDetection    作者:YunqiuXu    | 项目源码 | 文件源码
def load_all_url_files(_dir, file_name_prefix):
    url_list = []

    for file_name in os.listdir(_dir):
        if fnmatch.fnmatch(file_name, file_name_prefix +'*.txt'):
            file_name = osp.join(_dir, file_name)
            fp_urls = open(file_name, 'r')        #Open the text file called database.txt
            print 'load URLs from file: ' + file_name

            i = 0
            for line in fp_urls:
                line = line.strip()
                if len(line)>0:
                    splits = line.split('\t')
                    url_list.append(splits[0].strip())
                    i=i+1
            print str(i) + ' URLs loaded'
            fp_urls.close()

    return url_list         
########### End of Functions to Load downloaded urls ###########

############## Functions to get date/time strings ############
项目:caduc    作者:tjamet    | 项目源码 | 文件源码
def get_grace_times(self, names):
        labels = self.details['Config']['Labels']
        if labels and labels.get("com.caduc.image.grace_time"):
            return set([labels.get('com.caduc.image.grace_time', None)])
        grace_config = self.config.get("images")
        grace_times = set()
        if grace_config:
            for name in names:
                for pattern, kv in six.iteritems(grace_config):
                    if fnmatch.fnmatch(name, pattern):
                        grace_time = kv['grace_time']
                        if grace_time is None or grace_time==-1:
                            grace_times.add(float('inf'))
                        else:
                            grace_times.add(kv['grace_time'])
        if grace_times:
            return grace_times
        return set([self.grace_time])
项目:tfplus    作者:renmengye    | 项目源码 | 文件源码
def get_latest_ckpt(self):
        """Get the latest checkpoint filename in a folder."""

        ckpt_fname_pattern = os.path.join(self.folder, self.fname + '.ckpt*')
        ckpt_fname_list = []
        for fn in os.listdir(self.folder):
            fullname = os.path.join(self.folder, fn)
            if fnmatch.fnmatch(fullname, ckpt_fname_pattern):
                if not fullname.endswith('.meta'):
                    ckpt_fname_list.append(fullname)
        if len(ckpt_fname_list) == 0:
            raise Exception(
                'No checkpoint file found {}'.format(ckpt_fname_pattern))
        ckpt_fname_step = [int(fn.split('-')[-1]) for fn in ckpt_fname_list]
        latest_step = max(ckpt_fname_step)

        latest_ckpt = os.path.join(self.folder,
                                   self.fname + '.ckpt-{}'.format(latest_step))
        latest_graph = os.path.join(self.folder,
                                    self.fname + '.ckpt-{}.meta'.format(latest_step))
        return (latest_ckpt, latest_graph, latest_step)
项目:python-powerdns-management    作者:mrlesmithjr    | 项目源码 | 文件源码
def zone_list(module, base_url, zone=None):
    ''' Return list of existing zones '''

    list = []
    url = "{0}".format(base_url)

    response, info = fetch_url(module, url, headers=headers)
    if info['status'] != 200:
        module.fail_json(msg="failed to enumerate zones at %s: %s" % (url, info['msg']))

    content = response.read()
    data = json.loads(content)
    for z in data:
        if zone is None or fnmatch.fnmatch(z['name'], zone):
            list.append({
                'name'      : z['name'],
                'kind'      : z['kind'].lower(),
                'serial'    : z['serial'],
            })
    return list
项目:TACTIC-Handler    作者:listyque    | 项目源码 | 文件源码
def convert_all_files_in_path(self, path):
        if not os.path.exists(path):
            print("'%s': Path doesn't exists. Skipping" % path)
            return
        count = 0
        for filename in os.listdir(path):
            full_path = os.path.join(path, filename)
            only_name, ext = os.path.splitext(full_path)
            cmd = None
            pyfile = None
            if fnmatch.fnmatch(filename, '*.ui'):
                pyfile = '%s.py' % only_name
                cmd = self.PYUIC
            elif fnmatch.fnmatch(filename, '*.qrc'):
                pyfile = '%s_rc.py' % only_name
                cmd = self.PYRCC
            if cmd and modified(full_path, pyfile):
                cmd_string = '%s -o "%s" "%s"' % (cmd, pyfile, full_path)
                os.system(cmd_string)
                count += 1
        print("'%s': %s converted %s files" % (path, time.ctime(time.time()), count))
项目:specton    作者:somesortoferror    | 项目源码 | 文件源码
def getScannerThread(i, filenameStr, mp3guessenc_bin, mediainfo_bin, fileinfo_dialog_update=None, cmd_timeout=300,debug_enabled=False,main_q=None,info_q=None):
    threads = set()
    if fnmatch.fnmatch(filenameStr, "*.mp3"):
        # use mp3guessenc if available
        if not mp3guessenc_bin == "":
            threads.add(scanner_Thread(i, filenameStr, mp3guessenc_bin, "mp3guessenc", "-e", debug_enabled, info_q,
                                    main_q, fileinfo_dialog_update, cmd_timeout))
        elif not mediainfo_bin == "":  # always use mediainfo
            threads.add(scanner_Thread(i, filenameStr, mediainfo_bin, "mediainfo", "-", debug_enabled, info_q, main_q,
                                    fileinfo_dialog_update, cmd_timeout))

    elif fnmatch.fnmatch(filenameStr, "*.flac") and not mediainfo_bin == "":
        threads.add(scanner_Thread(i, filenameStr, mediainfo_bin, "mediainfo", "-", debug_enabled, info_q, main_q,
                                fileinfo_dialog_update, cmd_timeout))
    elif not mediainfo_bin == "":  # default for all files is mediainfo
        threads.add(scanner_Thread(i, filenameStr, mediainfo_bin, "mediainfo", "-", debug_enabled, info_q, main_q,
                                fileinfo_dialog_update, cmd_timeout))
    return threads
项目:RSPET    作者:panagiks    | 项目源码 | 文件源码
def installed_plugins(self):
        """List all plugins installed."""
        from os import listdir
        from fnmatch import fnmatch
        import compiler
        import inspect
        files = listdir('Plugins')
        try:
            files.remove('mount.py')
            files.remove('template.py')
        except ValueError:
            pass
        plugins = {}
        for element in files:
            if fnmatch(element, '*.py') and not fnmatch(element, '_*'):
                plug_doc = compiler.parseFile('Plugins/' + element).doc
                plug_doc = inspect.cleandoc(plug_doc)
                plugins[element[:-3]] = plug_doc # Remove .py)
        return plugins
项目:jenkins-epo    作者:peopledoc    | 项目源码 | 文件源码
def match(item, patterns):
    matched = not patterns
    for pattern in filter(None, patterns):
        negate = False
        if pattern.startswith('-') or pattern.startswith('!'):
            negate = True
            pattern = pattern[1:]
        if pattern.startswith('+'):
            pattern = pattern[1:]

        local_matched = fnmatch.fnmatch(item, pattern)
        if negate:
            matched = matched and not local_matched
        else:
            matched = matched or local_matched

    return matched
项目:gitorg    作者:mariocj89    | 项目源码 | 文件源码
def list(self, pattern):
        res = self._EXTRACT_PATTERN.match(pattern)
        if not res:
            raise URIException(f"Unable to match {pattern},"
                               " please use 'organization[/repo_pattern]'")
        org_name = res.group("org")
        repo_matcher = res.group("repo") or "*"

        try:
            repos = self._gh.get_organization(org_name).get_repos()
        except github.GithubException:
            repos = self._gh.get_user(org_name).get_repos()

        for repo in repos:
            if not fnmatch.fnmatch(repo.name, repo_matcher):
                continue
            if self._clone_protocol == self.CloneProtocol.ssh:
                yield Repo(name=repo.name, url=repo.ssh_url)
            elif self._clone_protocol == self.CloneProtocol.https:
                yield Repo(name=repo.name, url=repo.clone_url)
            else:
                raise RuntimeError(f"Invalid protocol selected: {self._clone_protocol}")
项目:pyrate-build    作者:pyrate-build    | 项目源码 | 文件源码
def match(value, dn, recurse):
    import fnmatch
    result = []
    if recurse:
        walk_entries = os.walk(dn)
    else:
        walk_entries = [(dn, [], os.listdir(dn))]
    for walk_result in walk_entries:
        for fn in walk_result[2]:
            fn = os.path.relpath(os.path.join(walk_result[0], fn), dn)
            accept = False
            for token in value.split():
                negate = token.startswith('-')
                if negate:
                    token = token[1:]
                if not fnmatch.fnmatch(fn, token):
                    continue
                accept = not negate
            if accept:
                result.append(fn)
    result.sort()
    return result
项目:atoolbox    作者:liweitianux    | 项目源码 | 文件源码
def _build(inputs):
        paths = []
        for f in inputs:
            if os.path.isdir(f):
                # Walk through the directory and get all PDF files
                # Credit: https://stackoverflow.com/a/2186565/4856091
                for root, dirnames, filenames in os.walk(f):
                    for filename in fnmatch.filter(filenames, "*.pdf"):
                        paths.append(os.path.join(root, filename))
            elif f.endswith(".pdf"):
                paths.append(f)
            else:
                # Get the contents as list of files
                _files = [line.strip() for line in open(f).readlines()]
                _files = [_f for _f in _files
                          if (not _f.startswith("#")) and (_f != "")]
                paths += _files
        return paths
项目:atoolbox    作者:liweitianux    | 项目源码 | 文件源码
def copy_other(opts, flacdir, outdir):
    if opts.verbose:
        print('COPYING other files')
    for dirpath, dirs, files in os.walk(flacdir, topdown=False):
        for name in files:
            if opts.nolog and fnmatch(name.lower(), '*.log'):
                continue
            if opts.nocue and fnmatch(name.lower(), '*.cue'):
                continue
            if opts.nodots and fnmatch(name.lower(), '^.'):
                continue
            if (not fnmatch(name.lower(), '*.flac')
               and not fnmatch(name.lower(), '*.m3u')):
                d = re.sub(re.escape(flacdir), outdir, dirpath)
                if (os.path.exists(os.path.join(d, name))
                   and not opts.overwrite):
                    continue
                if not os.path.exists(d):
                    os.makedirs(d)
                shutil.copy(os.path.join(dirpath, name), d)
项目:atoolbox    作者:liweitianux    | 项目源码 | 文件源码
def remove(path, dest_root, dryrun=False, debug=False):
        """
        Remove the specified file/directory using `rm -rf`, to clean
        up the destination backup.

        The specified path must locate under the `dest_root` for safety.
        """
        if not fnmatch(path, dest_root+"/*"):
            raise ValueError("Not allowed to remove file/directory "
                             "outside destination: %s" % path)
        if not os.path.exists(path):
            return
        logger.info("Remove: %s" % path)
        args = ["-r", "-f"]
        if debug:
            args += ["-v"]
        cmd = ["rm"] + args + [path]
        if not dryrun:
            subprocess.check_call(cmd)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def fnmatch_lines_random(self, lines2):
        """Check lines exist in the output.

        The argument is a list of lines which have to occur in the
        output, in any order.  Each line can contain glob whildcards.

        """
        lines2 = self._getlines(lines2)
        for line in lines2:
            for x in self.lines:
                if line == x or fnmatch(x, line):
                    self._log("matched: ", repr(line))
                    break
            else:
                self._log("line %r not found in output" % line)
                raise ValueError(self._log_text)
项目:pythonVSCode    作者:DonJayamanne    | 项目源码 | 文件源码
def should_skip(filename, config, path='/'):
    """Returns True if the file should be skipped based on the passed in settings."""
    for skip_path in config['skip']:
        if posixpath.abspath(posixpath.join(path, filename)) == posixpath.abspath(skip_path.replace('\\', '/')):
            return True

    position = os.path.split(filename)
    while position[1]:
        if position[1] in config['skip']:
            return True
        position = os.path.split(position[0])

    for glob in config['skip_glob']:
        if fnmatch.fnmatch(filename, glob):
            return True

    return False
项目:tecken    作者:mozilla-services    | 项目源码 | 文件源码
def get_bucket_info(user):
    """return an object that has 'bucket', 'endpoint_url',
    'region'.
    Only 'bucket' is mandatory in the response object.
    """
    url = settings.UPLOAD_DEFAULT_URL
    exceptions = settings.UPLOAD_URL_EXCEPTIONS
    if user.email.lower() in exceptions:
        # easy
        exception = exceptions[user.email.lower()]
    else:
        # match against every possible wildcard
        exception = None  # assume no match
        for email_or_wildcard in settings.UPLOAD_URL_EXCEPTIONS:
            if fnmatch.fnmatch(user.email.lower(), email_or_wildcard.lower()):
                # a match!
                exception = settings.UPLOAD_URL_EXCEPTIONS[
                    email_or_wildcard
                ]
                break

    if exception:
        url = exception

    return S3Bucket(url)
项目:munkisuit    作者:velotraveler    | 项目源码 | 文件源码
def list(self, labelmatch="*"):
        '''
        run "launchctl list" and return list of jobs that match the shell-
        style wildcard <labelmatch>

        since all jobs have our app-specific prefix, and have their
        run time appended to their names, caller should terminate
        <labelmatch> with ".*" unless they already have an exact name.
        '''
        results = []
        try:
            output = subprocess.check_output(["sudo", "launchctl", "list"])
            for line in output.split("\n"):
                if len(line) == 0:
                    break
                fields= line.split(None,3)
                if len(fields) < 3:
                    self.bomb("unexpected output from 'sudo launchctl list: %s'" % line)
                job= fields[2]
                if fnmatch(job, self.prefix + labelmatch):
                    results.append(job[len(self.prefix):])
        except (subprocess.CalledProcessError) as error:
            self.bomb("running 'sudo launchctl list'", error)
        return results
项目:hlama    作者:bihealth    | 项目源码 | 文件源码
def get_mode(self, paths):
        """Return ``SINGLE_END`` or ``PAIRED_END`` from list of file paths

        Raise InputDataException if non-existing.
        """
        seen = {0: 0, 1: 0}  # counters
        for path in paths:
            filename = os.path.basename(path)
            for i, patterns in enumerate([PATTERNS_R1, PATTERNS_R2]):
                for pattern in patterns:
                    if fnmatch.fnmatch(filename, pattern):
                        seen[i] += 1
                        break
        if not seen[0] and seen[1]:
            raise InvalidDataException('Have seen only R2 in {}!'.format(
                ','.join(paths)))
        if seen[1] and seen[0] != seen[1]:
            raise InvalidDataException(
                'Have seen different number of R1 and R2 reads in {}'.format(
                    ','.join(paths)))
        return (PAIRED_END if seen[1] else SINGLE_END)
项目:piqueserver    作者:piqueserver    | 项目源码 | 文件源码
def commands(connection, value=None):
    names = []
    for command in cmdlist:
        command_func = cmdlist[command]
        if (hasattr(command_func, 'user_types')
                and command not in connection.rights):
            continue
        include = False
        if (value is None or fnmatch.fnmatch(command, value)):
            include = True
        aliases = []
        for a in aliaslist:
            if aliaslist[a] == command:
                if (value is None or fnmatch.fnmatch(a, value)):
                    include = True
                aliases.append(a)
        cmd = command if len(aliases) == 0 else (
            '%s (%s)' % (command, ', '.join(aliases)))
        if include:
            names.append(cmd)
    return 'Commands: %s' % (', '.join(names))
项目:touch-pay-client    作者:HackPucBemobi    | 项目源码 | 文件源码
def select_host(self, host, host_names=None):
        """
        checks that host is valid, i.e. in the list of glob host_names
        if the host is missing, then is it selects the first entry from host_names
        read more here: https://github.com/web2py/web2py/issues/1196
        """
        if host:
            if host_names:
                for item in host_names:
                    if fnmatch.fnmatch(host, item):
                        break
                else:
                    raise HTTP(403, "Invalid Hostname")
        elif host_names:
            host = host_names[0]
        else:
            host = 'localhost'
        return host
项目:touch-pay-client    作者:HackPucBemobi    | 项目源码 | 文件源码
def __getitem__(self, index):
        while 1:
            try:
                file = self.files[self.index]
                self.index = self.index + 1
            except IndexError:
                self.index = 0
                self.directory = self.stack.pop()
                self.files = os.listdir(self.directory)
            else:
                fullname = os.path.join(self.directory, file)
                if os.path.isdir(fullname) and not os.path.islink(fullname):
                    self.stack.append(fullname)
                if not (file.startswith('.') or file.startswith('#') or file.endswith('~')) \
                        and fnmatch.fnmatch(file, self.pattern):
                    return fullname
项目:reportIT    作者:stevekm    | 项目源码 | 文件源码
def find_analysis_barcode_file(parent_dir):
    '''
    Find the barcodes file for the analysis
    # analysis_barcode_file="$(find "$analysis_outdir"  -path "*variantCaller_out*" -name "sample_barcode_IDs.tsv" | head -1)"
    '''
    import os
    import fnmatch
    analysis_barcode_file = None
    for root, dirs, files in os.walk(parent_dir):
        if fnmatch.fnmatch(root, "*variantCaller_out*"):
            for file in files:
                if fnmatch.fnmatch(file, "sample_barcode_IDs.tsv"):
                    analysis_barcode_file = os.path.join(root, file)
                    break
    # make sure file exists
    if analysis_barcode_file == None:
        print("ERROR: Analysis barcode file not found")
    pl.file_exists(analysis_barcode_file, kill = True)
    return(analysis_barcode_file)
项目:reportIT    作者:stevekm    | 项目源码 | 文件源码
def find_analysis_barcode_file(parent_dir):
    '''
    Find the barcodes file for the analysis
    # analysis_barcode_file="$(find "$analysis_outdir"  -path "*variantCaller_out*" -name "sample_barcode_IDs.tsv" | head -1)"
    '''
    import os
    import fnmatch
    analysis_barcode_file = None
    for root, dirs, files in os.walk(parent_dir):
        if fnmatch.fnmatch(root, "*variantCaller_out*"):
            for file in files:
                if fnmatch.fnmatch(file, "sample_barcode_IDs.tsv"):
                    analysis_barcode_file = os.path.join(root, file)
                    break
    # make sure file exists
    if analysis_barcode_file == None:
        print("ERROR: Analysis barcode file not found")
    pl.file_exists(analysis_barcode_file, kill = True)
    return(analysis_barcode_file)
项目:TCP-IP    作者:JackZ0    | 项目源码 | 文件源码
def included_in_wildcard(self, names, target_name):
        """Is target_name covered by a wildcard?

        :param names: server aliases
        :type names: `collections.Iterable` of `str`
        :param str target_name: name to compare with wildcards

        :returns: True if target_name is covered by a wildcard,
            otherwise, False
        :rtype: bool

        """
        # use lowercase strings because fnmatch can be case sensitive
        target_name = target_name.lower()
        for name in names:
            name = name.lower()
            # fnmatch treats "[seq]" specially and [ or ] characters aren't
            # valid in Apache but Apache doesn't error out if they are present
            if "[" not in name and fnmatch.fnmatch(target_name, name):
                return True
        return False
项目:image_recognition    作者:tue-robotics    | 项目源码 | 文件源码
def read_annotated(dir_path, patterns=["*.jpg", "*.png", "*.jpeg"]):
    """
    Read annotated images from a directory. This reader assumes that the images in this directory are separated in
    different directories with the label name as directory name. The method returns a generator of the label (string)
    , the opencv image and the filename.
    :param dir_path: The base directory we are going to read
    :param patterns: Patterns of the images the reader should match
    """
    for label in os.listdir(dir_path):
        for root, dirs, files in os.walk(os.path.join(dir_path, label)):
            for basename in files:
                for pattern in patterns:
                    if fnmatch.fnmatch(basename, pattern):
                        filename = os.path.join(root, basename)
                        image = cv2.imread(filename)
                        if image is None:
                            print ">> Ignore empty image {f}".format(f=filename)
                        else:
                            yield label, image, filename
项目:live-serial    作者:rosenbrockc    | 项目源码 | 文件源码
def _load_sensors(config):
    """Loads all the sensors from the specified config file.

    Args:
        config (ConfigParser): instance from which to extract the sensor list.
          `str` is also allowed, in which case it
          should be the path to the config file to load.
    """
    global _sensors, _sensors_parsed
    if not _sensors_parsed:
        parser = _get_parser(config)
        if parser is not None:
            #Now that we have the thread, we can add configuration for each of the
            #sensors in the config file.
            from fnmatch import fnmatch
            for section in parser.sections():
                if fnmatch(section, "sensor.*"):
                    name = section[len("sensor."):]
                    _sensors[name] = Sensor(None,name,**dict(parser.items(section)))

        _sensors_parsed = True
项目:sentence-classification    作者:jind11    | 项目源码 | 文件源码
def preprocess_data_and_labels_AAP(data_file_path, save_path):
    def merge_folds(data_file_path, save_path):
        # merge all the separated folds into one file
        train = []
        val = []
        test = []
        for file in os.listdir(data_file_path):
            if fnmatch.fnmatch(file, '*train.txt'):
                train += (open(data_file_path + '/' + file, 'r').readlines())
            elif fnmatch.fnmatch(file, '*validation.txt'):
                val += (open(data_file_path + '/' + file, 'r').readlines())
            else:
                test += (open(data_file_path + '/' + file, 'r').readlines())

        open(save_path + '/train.txt', 'w').write(''.join(train))
        open(save_path + '/val.txt', 'w').write(''.join(val))
        open(save_path + '/test.txt', 'w').write(''.join(test))
        print len(train+val+test)

    merge_folds(data_file_path, save_path)
项目:AshsSDK    作者:thehappydinoa    | 项目源码 | 文件源码
def _match_pattern(self, pattern, file_info):
        file_status = None
        file_path = file_info.src
        pattern_type = pattern[0]
        if file_info.src_type == 'local':
            path_pattern = pattern[1].replace('/', os.sep)
        else:
            path_pattern = pattern[1].replace(os.sep, '/')
        is_match = fnmatch.fnmatch(file_path, path_pattern)
        if is_match and pattern_type == 'include':
            file_status = (file_info, True)
            LOG.debug("%s matched include filter: %s",
                        file_path, path_pattern)
        elif is_match and pattern_type == 'exclude':
            file_status = (file_info, False)
            LOG.debug("%s matched exclude filter: %s",
                        file_path, path_pattern)
        else:
            LOG.debug("%s did not match %s filter: %s",
                        file_path, pattern_type[2:], path_pattern)
        return file_status
项目:inline-plz    作者:guykisel    | 项目源码 | 文件源码
def run_per_file(config, ignore_paths=None, path=None, config_dir=None):
    ignore_paths = ignore_paths or []
    path = path or os.getcwd()
    cmd = run_config(config, config_dir)
    print(cmd)
    run_cmds = []
    patterns = PATTERNS.get(config.get('language'))
    paths = all_filenames_in_dir(path=path, ignore_paths=ignore_paths)
    for pattern in patterns:
        for filepath in fnmatch.filter(paths, pattern):
            run_cmds.append(cmd + [filepath])
    pool = Pool()

    def result(run_cmd):
        _, out = run_command(run_cmd)
        return run_cmd[-1], out

    output = pool.map(result, run_cmds)
    return output
项目:jawaf    作者:danpozmanter    | 项目源码 | 文件源码
def _render(self, target_name, read_path, write_path):
        """Render a given template or directory for the target.
        :param target_name: String. Project or App name to render.
        :param read_path: String. Path to template or directory to render.
        :param write_path: String. Path to write to (or create directory).
        """
        if os.path.isdir(read_path):
            if os.path.split(read_path)[1] == 'project_name':
                write_path = os.path.join(os.path.split(write_path)[0], self.variables['project_name'])
            os.mkdir(write_path)
            for filename in os.listdir(read_path):
                if fnmatch(filename, 'test_*'):
                    write_filename = filename.replace('test_', f'test_{target_name}_')
                else:
                    write_filename = filename
                self._render(target_name, os.path.join(read_path, filename), os.path.join(write_path, write_filename))
        else:
            tpl = Template(filename=read_path)
            with open(os.path.splitext(write_path)[0], 'w') as f:
                f.write(tpl.render(**self.variables))
项目:badwolf    作者:bosondata    | 项目源码 | 文件源码
def match_file(self, filename):
        """Used to check if files can be handled by this linter,
        Often this will just file extension checks."""
        pattern = self.options.get('pattern') or self.default_pattern
        if not pattern:
            return True

        globs = pattern.split()
        for glob in globs:
            if fnmatch.fnmatch(filename, glob):
                # ??? glob ??
                return True
        try:
            if re.match(pattern, filename, re.I):
                # ???????????
                return True
        except re.error:
            pass
        return False
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def get_filter(opts=None):
    opts = opts or []
    if 'inc=*' in opts:
        # do not filter any files, include everything
        return None

    def _filter(dir, ls):
        incs = [opt.split('=').pop() for opt in opts if 'inc=' in opt]
        _filter = []
        for f in ls:
            _f = os.path.join(dir, f)

            if not os.path.isdir(_f) and not _f.endswith('.py') and incs:
                if True not in [fnmatch(_f, inc) for inc in incs]:
                    logging.debug('Not syncing %s, does not match include '
                                  'filters (%s)' % (_f, incs))
                    _filter.append(f)
                else:
                    logging.debug('Including file, which matches include '
                                  'filters (%s): %s' % (incs, _f))
            elif (os.path.isfile(_f) and not _f.endswith('.py')):
                logging.debug('Not syncing file: %s' % f)
                _filter.append(f)
            elif (os.path.isdir(_f) and not
                  os.path.isfile(os.path.join(_f, '__init__.py'))):
                logging.debug('Not syncing directory: %s' % f)
                _filter.append(f)
        return _filter
    return _filter
项目:nova    作者:hubblestack    | 项目源码 | 文件源码
def _get_tags(data):
    '''
    Retrieve all the tags for this distro from the yaml
    '''
    ret = {}
    distro = __grains__.get('osfinger')
    for audit_dict in data.get('command', []):
        # command:0
        for audit_id, audit_data in audit_dict.iteritems():
            # command:0:nodev
            tags_dict = audit_data.get('data', {})
            # command:0:nodev:data
            tags = None
            for osfinger in tags_dict:
                if osfinger == '*':
                    continue
                osfinger_list = [finger.strip() for finger in osfinger.split(',')]
                for osfinger_glob in osfinger_list:
                    if fnmatch.fnmatch(distro, osfinger_glob):
                        tags = tags_dict.get(osfinger)
                        break
                if tags is not None:
                    break
            # If we didn't find a match, check for a '*'
            if tags is None:
                tags = tags_dict.get('*', {})
            # command:0:nodev:data:Debian-8
            if 'tag' not in tags:
                tags['tag'] = ''
            tag = tags['tag']
            if tag not in ret:
                ret[tag] = []
            formatted_data = {'tag': tag,
                              'module': 'command'}
            formatted_data.update(audit_data)
            formatted_data.update(tags)
            formatted_data.pop('data')
            ret[tag].append(formatted_data)
    return ret
项目:cpe_utils    作者:ExodusIntelligence    | 项目源码 | 文件源码
def matches(self, cpe):
        """Return true or false if this CPE object matches
        the provided cpe_str, using wildcards.

        :param cpe: The cpe to compare against
        """
        # TODO see issue #3

        if self.vendor and not fnmatch.fnmatch(cpe.vendor, self.vendor):
            print ("vendor was false")
            return False
        elif self.product and not fnmatch.fnmatch(cpe.product, self.product):
            print ("product was false")      
            return False
        elif self.version and not fnmatch.fnmatch(cpe.version, self.version):
            print ("version was false")
            return False
        elif self.update and not fnmatch.fnmatch(cpe.update, self.update):
            print ("update was false")
            return False
        elif self.edition and not fnmatch.fnmatch(cpe.edition, self.edition):
            print ("edition was false")
            return False
        elif self.part and not fnmatch.fnmatch(cpe.part, self.part):
            print ("part was false")
            return False
        else:
            return True
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def _match_path(self, path, full_path, pattern):
        # override this method to use alternative matching strategy
        return fnmatch(path, pattern)
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def is_skipped_module(self, module_name):
        for pattern in self.skip:
            if fnmatch.fnmatch(module_name, pattern):
                return True
        return False
项目:git-of-theseus    作者:erikbern    | 项目源码 | 文件源码
def get_entries(commit):
    return [entry for entry in commit.tree.traverse()
            if entry.type == 'blob'
            and all([fnmatch.fnmatch(entry.path, pattern) for pattern in args.only])
            and not any([fnmatch.fnmatch(entry.path, pattern) for pattern in args.ignore])]
项目:eventdriventalk    作者:cachedout    | 项目源码 | 文件源码
def process_event(self, event):
        '''
        Take an event and attempt to match it
        in the list of keys.

        If found, schedule the requested action.
        '''
        if not self.opts:
            return
        for tag in self.opts:
            if fnmatch.fnmatch(event['tag'], tag):
                for action in self.opts[tag]['reactions']:
                    # Super-simple non-blocking appraoch
                    # Threading won't scale as much as a true event loop
                    # would. It will, however, handle cases where single-threaded
                    # loop would be blocked. Do you trust your reactions to be co-op?!
                    # Of course, the other side of this is thread-safety. Either way, be smart!
                    t = threading.Thread(target=self.react, args=(action, event))
                    t.start()
                if 'rules' in self.opts[tag]:
                    rule_actions = []
                for rule in self.opts[tag]['rules']:
                    rule_actions = process_rule(rule, event, tracking_id)
                    if rule_actions:
                        for action in rule_actions:
                            self.react(action.keys()[0], action.values())
                    else:
                        # Rule chaining ends when a rule does not match
                        break
项目:DeepSea    作者:SUSE    | 项目源码 | 文件源码
def _process(self, event):
        """Processes a raw event

        Creates the proper salt event class wrapper and notifies listeners

        Args:
            event (dict): the raw event data
        """
        logger.debug("Process event -> %s", event)
        wrapper = None
        if fnmatch.fnmatch(event['tag'], 'salt/job/*/new'):
            wrapper = NewJobEvent(event)
            for listener in self.listeners:
                listener.handle_salt_event(wrapper)
                listener.handle_new_job_event(wrapper)
        elif fnmatch.fnmatch(event['tag'], 'salt/run/*/new'):
            wrapper = NewRunnerEvent(event)
            for listener in self.listeners:
                listener.handle_salt_event(wrapper)
                listener.handle_new_runner_event(wrapper)
        elif fnmatch.fnmatch(event['tag'], 'salt/job/*/ret/*'):
            wrapper = RetJobEvent(event)
            for listener in self.listeners:
                listener.handle_salt_event(wrapper)
                listener.handle_ret_job_event(wrapper)
        elif fnmatch.fnmatch(event['tag'], 'salt/run/*/ret'):
            wrapper = RetRunnerEvent(event)
            for listener in self.listeners:
                listener.handle_salt_event(wrapper)
                listener.handle_ret_runner_event(wrapper)
        elif fnmatch.fnmatch(event['tag'], 'salt/state_result/*'):
            wrapper = StateResultEvent(event)
            for listener in self.listeners:
                listener.handle_salt_event(wrapper)
                listener.handle_state_result_event(wrapper)
项目:Proxy46    作者:Macronut    | 项目源码 | 文件源码
def matchGlob(self,pattern):
        if type(pattern) != DNSLabel:
            pattern = DNSLabel(pattern)
        return fnmatch.fnmatch(str(self).lower(),str(pattern).lower())
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def Walk(root='.', recurse=True, pattern='*'):
    """
        Generator for walking a directory tree.
        Starts at specified root folder, returning files
        that match our pattern. Optionally will also
        recurse through sub-folders.
    """
    for path, subdirs, files in os.walk(root):
        for name in files:
            if fnmatch.fnmatch(name, pattern):
                yield os.path.join(path, name)
        if not recurse:
            break
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def Walk( root, recurse=0, pattern='*', return_folders=0 ):
    import fnmatch, os, string

    # initialize
    result = []

    # must have at least root folder
    try:
        names = os.listdir(root)
    except os.error:
        return result

    # expand pattern
    pattern = pattern or '*'
    pat_list = string.splitfields( pattern , ';' )

    # check each file
    for name in names:
        fullname = os.path.normpath(os.path.join(root, name))

        # grab if it matches our pattern and entry type
        for pat in pat_list:
            if fnmatch.fnmatch(name, pat):
                if os.path.isfile(fullname) or (return_folders and os.path.isdir(fullname)):
                    result.append(fullname)
                continue

        # recursively scan other folders, appending results
        if recurse:
            if os.path.isdir(fullname) and not os.path.islink(fullname):
                result = result + Walk( fullname, recurse, pattern, return_folders )

    return result
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def callback(arg, directory, files):
    for file in files:
        if fnmatch.fnmatch(file,arg):
            for line in fileinput.input(os.path.abspath(os.path.join(directory, file)),inplace=1):
                if re.search('.*theunderdogs.*', line): # I changed * to .* but it would probably work without this if
                    line = string.replace(line,'theunderdogs','the-underdogs') # old string , new string
                print line,
项目:automatron    作者:madflojo    | 项目源码 | 文件源码
def apply_to_targets(runbooks, config, dbc):
    ''' Match hosts with runbooks '''
    targets = dbc.get_target()
    logger.debug("Found targets: {0}".format(json.dumps(targets)))
    for target in targets.keys():
        # Create runbook dictionary if it doesn't exist
        if "runbooks" not in targets[target].keys():
            logger.debug("Creating runbook dictionary in target config")
            targets[target]['runbooks'] = {}
        logger.debug("Identifying runbooks for target {0}".format(target))
        for matcher in runbooks.keys():
            if fnmatch.fnmatch(targets[target]['hostname'], matcher):
                for runbook in runbooks[matcher].keys():
                    logger.debug("Checking if {0} is already applied".format(runbook))
                    if runbook not in targets[target]['runbooks'].keys():
                        try:
                            targets[target]['runbooks'][runbook] = render_runbooks(
                                runbooks[matcher][runbook],
                                targets[target]['facts'])
                        except Exception as e:
                            logger.warn("Could not apply runbook {0} to target {1}: {2}".format(
                                runbook,
                                targets[target]['hostname'],
                                e.message
                            ))
                        dbc.save_target(target=targets[target])
                        msg = {
                            'msg_type' : 'runbook_add',
                            'runbook' : runbook,
                            'target' : target}
                        logger.debug("Adding runbook policy {0} to target {1}".format(
                            runbook, target))
                        count = dbc.notify("monitors", msg)
                        logger.info("Notified {0} of runbook changes to target {1}".format(
                            count, target))
                    else:
                        logger.debug("{0} is already applied to target {1}".format(runbook, target))
    return True
项目:zmirror    作者:aploium    | 项目源码 | 文件源码
def is_domain_match_glob_whitelist(domain):
    """
    ?????? `domains_whitelist_auto_add_glob_list` ???????
    :type domain: str
    :rtype: bool
    """
    for domain_glob in domains_whitelist_auto_add_glob_list:
        if fnmatch(domain, domain_glob):
            return True
    return False
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def filename_match(filename, patterns, default=True):
    """Check if patterns contains a pattern that matches filename.

    If patterns is unspecified, this always returns True.
    """
    if not patterns:
        return default
    return any(fnmatch(filename, pattern) for pattern in patterns)
项目:dottorrent-gui    作者:kz26    | 项目源码 | 文件源码
def run(self):
        def callback(*args):
            return self.isInterruptionRequested()

        entries = os.listdir(self.path)
        for i, p in enumerate(entries):
            if any(fnmatch(p, ex) for ex in self.exclude):
                continue
            p = os.path.join(self.path, p)
            if not dottorrent.is_hidden_file(p):
                sfn = os.path.split(p)[1] + '.torrent'
                self.progress_update.emit(sfn, i, len(entries))
                t = dottorrent.Torrent(
                    p,
                    exclude=self.exclude,
                    trackers=self.trackers,
                    web_seeds=self.web_seeds,
                    private=self.private,
                    source=self.source,
                    comment=self.comment,
                    include_md5=self.include_md5,
                    creation_date=datetime.now(),
                    created_by=CREATOR
                )
                try:
                    self.success = t.generate(callback=callback)
                # ignore empty inputs
                except dottorrent.exceptions.EmptyInputException:
                    continue
                except Exception as exc:
                    self.onError.emit(str(exc))
                    return
                if self.isInterruptionRequested():
                    return
                if self.success:
                    with open(os.path.join(self.save_dir, sfn), 'wb') as f:
                        t.save(f)
项目:hubble-salt    作者:hubblestack    | 项目源码 | 文件源码
def _get_tags(data):
    '''
    Retrieve all the tags for this distro from the yaml
    '''
    ret = {}
    distro = __grains__.get('osfinger')
    for audit_dict in data.get('command', []):
        # command:0
        for audit_id, audit_data in audit_dict.iteritems():
            # command:0:nodev
            tags_dict = audit_data.get('data', {})
            # command:0:nodev:data
            tags = None
            for osfinger in tags_dict:
                if osfinger == '*':
                    continue
                osfinger_list = [finger.strip() for finger in osfinger.split(',')]
                for osfinger_glob in osfinger_list:
                    if fnmatch.fnmatch(distro, osfinger_glob):
                        tags = tags_dict.get(osfinger)
                        break
                if tags is not None:
                    break
            # If we didn't find a match, check for a '*'
            if tags is None:
                tags = tags_dict.get('*', {})
            # command:0:nodev:data:Debian-8
            if 'tag' not in tags:
                tags['tag'] = ''
            tag = tags['tag']
            if tag not in ret:
                ret[tag] = []
            formatted_data = {'tag': tag,
                              'module': 'command'}
            formatted_data.update(audit_data)
            formatted_data.update(tags)
            formatted_data.pop('data')
            ret[tag].append(formatted_data)
    return ret