The following 50 code examples, extracted from open-source Python projects, illustrate how to use fnmatch.fnmatch().
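Before diving into the project examples, here is a minimal sketch of the call itself; the filenames are made up for illustration:

import fnmatch

# fnmatch.fnmatch(name, pattern) tests a single name against a shell-style
# glob: * matches everything, ? a single character, [seq] a character class.
# Matching follows the OS's case conventions; use fnmatch.fnmatchcase() to
# force case-sensitive comparison.
print(fnmatch.fnmatch('report.txt', '*.txt'))      # True
print(fnmatch.fnmatch('report.txt', 'r?port.*'))   # True
print(fnmatch.fnmatch('report.txt', '*.csv'))      # False
print(fnmatch.fnmatchcase('README.TXT', '*.txt'))  # False: case differs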
def filename_match(filename, patterns):
    """Check if patterns contains a pattern that matches filename."""
    # `dir/*` works but `dir/` does not
    for index in range(len(patterns)):
        if patterns[index][-1] == '/':
            patterns[index] += '*'

    # filename has a leading `/` which confuses fnmatch
    filename = filename.lstrip('/')

    # Pattern is a fnmatch compatible regex
    if any(fnmatch.fnmatch(filename, pattern) for pattern in patterns):
        return True

    # Pattern is a simple name of file or directory (not caught by fnmatch)
    for pattern in patterns:
        if '/' not in pattern and pattern in filename.split('/'):
            return True

    return False
def clearpyc(root, patterns='*', single_level=False, yield_folders=False):
    """
    root: directory to walk
    patterns: semicolon-separated glob patterns to match
    single_level: only scan the top-level directory
    yield_folders: also match and yield sub-directories
    """
    patterns = patterns.split(';')
    for path, subdirs, files in os.walk(root):
        if yield_folders:
            files.extend(subdirs)
        files.sort()
        for name in files:
            for pattern in patterns:
                if fnmatch.fnmatch(name, pattern.strip()):  # match against each pattern
                    yield os.path.join(path, name)
        if single_level:
            break
def load_all_url_files(_dir, file_name_prefix):
    url_list = []
    for file_name in os.listdir(_dir):
        if fnmatch.fnmatch(file_name, file_name_prefix + '*.txt'):
            file_name = osp.join(_dir, file_name)
            fp_urls = open(file_name, 'r')  # Open the text file called database.txt
            print('load URLs from file: ' + file_name)
            i = 0
            for line in fp_urls:
                line = line.strip()
                if len(line) > 0:
                    splits = line.split('\t')
                    url_list.append(splits[0].strip())
                    i = i + 1
            print(str(i) + ' URLs loaded')
            fp_urls.close()
    return url_list

########### End of Functions to Load downloaded urls ###########

############## Functions to get date/time strings ############
def get_grace_times(self, names):
    labels = self.details['Config']['Labels']
    if labels and labels.get("com.caduc.image.grace_time"):
        return set([labels.get('com.caduc.image.grace_time', None)])
    grace_config = self.config.get("images")
    grace_times = set()
    if grace_config:
        for name in names:
            for pattern, kv in six.iteritems(grace_config):
                if fnmatch.fnmatch(name, pattern):
                    grace_time = kv['grace_time']
                    if grace_time is None or grace_time == -1:
                        grace_times.add(float('inf'))
                    else:
                        grace_times.add(kv['grace_time'])
    if grace_times:
        return grace_times
    return set([self.grace_time])
def get_latest_ckpt(self):
    """Get the latest checkpoint filename in a folder."""
    ckpt_fname_pattern = os.path.join(self.folder, self.fname + '.ckpt*')
    ckpt_fname_list = []
    for fn in os.listdir(self.folder):
        fullname = os.path.join(self.folder, fn)
        if fnmatch.fnmatch(fullname, ckpt_fname_pattern):
            if not fullname.endswith('.meta'):
                ckpt_fname_list.append(fullname)
    if len(ckpt_fname_list) == 0:
        raise Exception(
            'No checkpoint file found {}'.format(ckpt_fname_pattern))
    ckpt_fname_step = [int(fn.split('-')[-1]) for fn in ckpt_fname_list]
    latest_step = max(ckpt_fname_step)
    latest_ckpt = os.path.join(self.folder,
                               self.fname + '.ckpt-{}'.format(latest_step))
    latest_graph = os.path.join(self.folder,
                                self.fname + '.ckpt-{}.meta'.format(latest_step))
    return (latest_ckpt, latest_graph, latest_step)
def zone_list(module, base_url, zone=None):
    '''Return list of existing zones'''
    list = []
    url = "{0}".format(base_url)
    response, info = fetch_url(module, url, headers=headers)
    if info['status'] != 200:
        module.fail_json(msg="failed to enumerate zones at %s: %s" % (url, info['msg']))
    content = response.read()
    data = json.loads(content)
    for z in data:
        if zone is None or fnmatch.fnmatch(z['name'], zone):
            list.append({
                'name': z['name'],
                'kind': z['kind'].lower(),
                'serial': z['serial'],
            })
    return list
def convert_all_files_in_path(self, path):
    if not os.path.exists(path):
        print("'%s': Path doesn't exist. Skipping" % path)
        return
    count = 0
    for filename in os.listdir(path):
        full_path = os.path.join(path, filename)
        only_name, ext = os.path.splitext(full_path)
        cmd = None
        pyfile = None
        if fnmatch.fnmatch(filename, '*.ui'):
            pyfile = '%s.py' % only_name
            cmd = self.PYUIC
        elif fnmatch.fnmatch(filename, '*.qrc'):
            pyfile = '%s_rc.py' % only_name
            cmd = self.PYRCC
        if cmd and modified(full_path, pyfile):
            cmd_string = '%s -o "%s" "%s"' % (cmd, pyfile, full_path)
            os.system(cmd_string)
            count += 1
    print("'%s': %s converted %s files" % (path, time.ctime(time.time()), count))
def getScannerThread(i, filenameStr, mp3guessenc_bin, mediainfo_bin,
                     fileinfo_dialog_update=None, cmd_timeout=300,
                     debug_enabled=False, main_q=None, info_q=None):
    threads = set()
    if fnmatch.fnmatch(filenameStr, "*.mp3"):
        # use mp3guessenc if available
        if not mp3guessenc_bin == "":
            threads.add(scanner_Thread(i, filenameStr, mp3guessenc_bin, "mp3guessenc", "-e",
                                       debug_enabled, info_q, main_q,
                                       fileinfo_dialog_update, cmd_timeout))
        elif not mediainfo_bin == "":  # always use mediainfo
            threads.add(scanner_Thread(i, filenameStr, mediainfo_bin, "mediainfo", "-",
                                       debug_enabled, info_q, main_q,
                                       fileinfo_dialog_update, cmd_timeout))
    elif fnmatch.fnmatch(filenameStr, "*.flac") and not mediainfo_bin == "":
        threads.add(scanner_Thread(i, filenameStr, mediainfo_bin, "mediainfo", "-",
                                   debug_enabled, info_q, main_q,
                                   fileinfo_dialog_update, cmd_timeout))
    elif not mediainfo_bin == "":  # default for all files is mediainfo
        threads.add(scanner_Thread(i, filenameStr, mediainfo_bin, "mediainfo", "-",
                                   debug_enabled, info_q, main_q,
                                   fileinfo_dialog_update, cmd_timeout))
    return threads
def installed_plugins(self):
    """List all plugins installed."""
    from os import listdir
    from fnmatch import fnmatch
    import compiler
    import inspect
    files = listdir('Plugins')
    try:
        files.remove('mount.py')
        files.remove('template.py')
    except ValueError:
        pass
    plugins = {}
    for element in files:
        if fnmatch(element, '*.py') and not fnmatch(element, '_*'):
            plug_doc = compiler.parseFile('Plugins/' + element).doc
            plug_doc = inspect.cleandoc(plug_doc)
            plugins[element[:-3]] = plug_doc  # Remove .py
    return plugins
def match(item, patterns):
    matched = not patterns
    for pattern in filter(None, patterns):
        negate = False
        if pattern.startswith('-') or pattern.startswith('!'):
            negate = True
            pattern = pattern[1:]
        if pattern.startswith('+'):
            pattern = pattern[1:]
        local_matched = fnmatch.fnmatch(item, pattern)
        if negate:
            matched = matched and not local_matched
        else:
            matched = matched or local_matched
    return matched
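A few illustrative calls for the include/exclude matcher above (the pattern lists are invented for the demo):

print(match('app.log', ['*.log']))                # True: plain include
print(match('debug.log', ['*.log', '!debug.*']))  # False: matched, then negated
print(match('app.log', ['*.log', '!debug.*']))    # True: negation doesn't apply
print(match('anything', []))                      # True: empty list matches all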
def list(self, pattern):
    res = self._EXTRACT_PATTERN.match(pattern)
    if not res:
        raise URIException(f"Unable to match {pattern},"
                           " please use 'organization[/repo_pattern]'")
    org_name = res.group("org")
    repo_matcher = res.group("repo") or "*"
    try:
        repos = self._gh.get_organization(org_name).get_repos()
    except github.GithubException:
        repos = self._gh.get_user(org_name).get_repos()
    for repo in repos:
        if not fnmatch.fnmatch(repo.name, repo_matcher):
            continue
        if self._clone_protocol == self.CloneProtocol.ssh:
            yield Repo(name=repo.name, url=repo.ssh_url)
        elif self._clone_protocol == self.CloneProtocol.https:
            yield Repo(name=repo.name, url=repo.clone_url)
        else:
            raise RuntimeError(f"Invalid protocol selected: {self._clone_protocol}")
def match(value, dn, recurse):
    import fnmatch
    result = []
    if recurse:
        walk_entries = os.walk(dn)
    else:
        walk_entries = [(dn, [], os.listdir(dn))]
    for walk_result in walk_entries:
        for fn in walk_result[2]:
            fn = os.path.relpath(os.path.join(walk_result[0], fn), dn)
            accept = False
            for token in value.split():
                negate = token.startswith('-')
                if negate:
                    token = token[1:]
                if not fnmatch.fnmatch(fn, token):
                    continue
                accept = not negate
            if accept:
                result.append(fn)
    result.sort()
    return result
def _build(inputs):
    paths = []
    for f in inputs:
        if os.path.isdir(f):
            # Walk through the directory and get all PDF files
            # Credit: https://stackoverflow.com/a/2186565/4856091
            for root, dirnames, filenames in os.walk(f):
                for filename in fnmatch.filter(filenames, "*.pdf"):
                    paths.append(os.path.join(root, filename))
        elif f.endswith(".pdf"):
            paths.append(f)
        else:
            # Get the contents as list of files
            _files = [line.strip() for line in open(f).readlines()]
            _files = [_f for _f in _files
                      if (not _f.startswith("#")) and (_f != "")]
            paths += _files
    return paths
def copy_other(opts, flacdir, outdir):
    if opts.verbose:
        print('COPYING other files')
    for dirpath, dirs, files in os.walk(flacdir, topdown=False):
        for name in files:
            if opts.nolog and fnmatch(name.lower(), '*.log'):
                continue
            if opts.nocue and fnmatch(name.lower(), '*.cue'):
                continue
            if opts.nodots and fnmatch(name.lower(), '.*'):  # skip dotfiles
                continue
            if (not fnmatch(name.lower(), '*.flac')
                    and not fnmatch(name.lower(), '*.m3u')):
                d = re.sub(re.escape(flacdir), outdir, dirpath)
                if (os.path.exists(os.path.join(d, name))
                        and not opts.overwrite):
                    continue
                if not os.path.exists(d):
                    os.makedirs(d)
                shutil.copy(os.path.join(dirpath, name), d)
def remove(path, dest_root, dryrun=False, debug=False):
    """
    Remove the specified file/directory using `rm -rf`, to clean up
    the destination backup.

    The specified path must be located under `dest_root` for safety.
    """
    if not fnmatch(path, dest_root + "/*"):
        raise ValueError("Not allowed to remove file/directory "
                         "outside destination: %s" % path)
    if not os.path.exists(path):
        return
    logger.info("Remove: %s" % path)
    args = ["-r", "-f"]
    if debug:
        args += ["-v"]
    cmd = ["rm"] + args + [path]
    if not dryrun:
        subprocess.check_call(cmd)
def fnmatch_lines_random(self, lines2):
    """Check lines exist in the output.

    The argument is a list of lines which have to occur in the
    output, in any order. Each line can contain glob wildcards.
    """
    lines2 = self._getlines(lines2)
    for line in lines2:
        for x in self.lines:
            if line == x or fnmatch(x, line):
                self._log("matched: ", repr(line))
                break
        else:
            self._log("line %r not found in output" % line)
            raise ValueError(self._log_text)
def should_skip(filename, config, path='/'):
    """Returns True if the file should be skipped based on the passed in settings."""
    for skip_path in config['skip']:
        if posixpath.abspath(posixpath.join(path, filename)) == \
                posixpath.abspath(skip_path.replace('\\', '/')):
            return True

    position = os.path.split(filename)
    while position[1]:
        if position[1] in config['skip']:
            return True
        position = os.path.split(position[0])

    for glob in config['skip_glob']:
        if fnmatch.fnmatch(filename, glob):
            return True

    return False
def get_bucket_info(user):
    """return an object that has 'bucket', 'endpoint_url', 'region'.
    Only 'bucket' is mandatory in the response object.
    """
    url = settings.UPLOAD_DEFAULT_URL
    exceptions = settings.UPLOAD_URL_EXCEPTIONS
    if user.email.lower() in exceptions:
        # easy
        exception = exceptions[user.email.lower()]
    else:
        # match against every possible wildcard
        exception = None  # assume no match
        for email_or_wildcard in settings.UPLOAD_URL_EXCEPTIONS:
            if fnmatch.fnmatch(user.email.lower(), email_or_wildcard.lower()):
                # a match!
                exception = settings.UPLOAD_URL_EXCEPTIONS[email_or_wildcard]
                break
    if exception:
        url = exception
    return S3Bucket(url)
def list(self, labelmatch="*"):
    '''
    run "launchctl list" and return list of jobs that match the
    shell-style wildcard <labelmatch>

    since all jobs have our app-specific prefix, and have their run
    time appended to their names, caller should terminate <labelmatch>
    with ".*" unless they already have an exact name.
    '''
    results = []
    try:
        output = subprocess.check_output(["sudo", "launchctl", "list"])
        for line in output.split("\n"):
            if len(line) == 0:
                break
            fields = line.split(None, 3)
            if len(fields) < 3:
                self.bomb("unexpected output from 'sudo launchctl list: %s'" % line)
            job = fields[2]
            if fnmatch(job, self.prefix + labelmatch):
                results.append(job[len(self.prefix):])
    except (subprocess.CalledProcessError) as error:
        self.bomb("running 'sudo launchctl list'", error)
    return results
def get_mode(self, paths):
    """Return ``SINGLE_END`` or ``PAIRED_END`` from list of file paths

    Raise InvalidDataException if the data is inconsistent.
    """
    seen = {0: 0, 1: 0}  # counters
    for path in paths:
        filename = os.path.basename(path)
        for i, patterns in enumerate([PATTERNS_R1, PATTERNS_R2]):
            for pattern in patterns:
                if fnmatch.fnmatch(filename, pattern):
                    seen[i] += 1
                    break
    if not seen[0] and seen[1]:
        raise InvalidDataException('Have seen only R2 in {}!'.format(
            ','.join(paths)))
    if seen[1] and seen[0] != seen[1]:
        raise InvalidDataException(
            'Have seen different number of R1 and R2 reads in {}'.format(
                ','.join(paths)))
    return (PAIRED_END if seen[1] else SINGLE_END)
def commands(connection, value=None):
    names = []
    for command in cmdlist:
        command_func = cmdlist[command]
        if (hasattr(command_func, 'user_types')
                and command not in connection.rights):
            continue
        include = False
        if (value is None or fnmatch.fnmatch(command, value)):
            include = True
        aliases = []
        for a in aliaslist:
            if aliaslist[a] == command:
                if (value is None or fnmatch.fnmatch(a, value)):
                    include = True
                aliases.append(a)
        cmd = command if len(aliases) == 0 else (
            '%s (%s)' % (command, ', '.join(aliases)))
        if include:
            names.append(cmd)
    return 'Commands: %s' % (', '.join(names))
def select_host(self, host, host_names=None):
    """
    checks that host is valid, i.e. in the list of glob host_names;
    if the host is missing, then it selects the first entry from host_names
    read more here: https://github.com/web2py/web2py/issues/1196
    """
    if host:
        if host_names:
            for item in host_names:
                if fnmatch.fnmatch(host, item):
                    break
            else:
                raise HTTP(403, "Invalid Hostname")
    elif host_names:
        host = host_names[0]
    else:
        host = 'localhost'
    return host
def __getitem__(self, index):
    while 1:
        try:
            file = self.files[self.index]
            self.index = self.index + 1
        except IndexError:
            self.index = 0
            self.directory = self.stack.pop()
            self.files = os.listdir(self.directory)
        else:
            fullname = os.path.join(self.directory, file)
            if os.path.isdir(fullname) and not os.path.islink(fullname):
                self.stack.append(fullname)
            if not (file.startswith('.') or file.startswith('#')
                    or file.endswith('~')) \
                    and fnmatch.fnmatch(file, self.pattern):
                return fullname
def find_analysis_barcode_file(parent_dir):
    '''
    Find the barcodes file for the analysis
    # analysis_barcode_file="$(find "$analysis_outdir" -path "*variantCaller_out*" -name "sample_barcode_IDs.tsv" | head -1)"
    '''
    import os
    import fnmatch
    analysis_barcode_file = None
    for root, dirs, files in os.walk(parent_dir):
        if fnmatch.fnmatch(root, "*variantCaller_out*"):
            for file in files:
                if fnmatch.fnmatch(file, "sample_barcode_IDs.tsv"):
                    analysis_barcode_file = os.path.join(root, file)
                    break
    # make sure file exists
    if analysis_barcode_file is None:
        print("ERROR: Analysis barcode file not found")
    pl.file_exists(analysis_barcode_file, kill=True)
    return(analysis_barcode_file)
def included_in_wildcard(self, names, target_name):
    """Is target_name covered by a wildcard?

    :param names: server aliases
    :type names: `collections.Iterable` of `str`
    :param str target_name: name to compare with wildcards

    :returns: True if target_name is covered by a wildcard,
        otherwise, False
    :rtype: bool
    """
    # use lowercase strings because fnmatch can be case sensitive
    target_name = target_name.lower()
    for name in names:
        name = name.lower()
        # fnmatch treats "[seq]" specially and [ or ] characters aren't
        # valid in Apache but Apache doesn't error out if they are present
        if "[" not in name and fnmatch.fnmatch(target_name, name):
            return True
    return False
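The `[seq]` caveat mentioned in the comment above is easy to demonstrate with plain fnmatch:

import fnmatch

# fnmatch gives "[...]" character-class semantics, as in shell globs:
print(fnmatch.fnmatch('a', '[ab]'))     # True: 'a' is in the class [ab]
print(fnmatch.fnmatch('[ab]', '[ab]'))  # False: the pattern does not match
                                        # its own literal four-character text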
def read_annotated(dir_path, patterns=["*.jpg", "*.png", "*.jpeg"]):
    """
    Read annotated images from a directory. This reader assumes that the
    images in this directory are separated in different directories with the
    label name as directory name. The method returns a generator of the label
    (string), the opencv image and the filename.

    :param dir_path: The base directory we are going to read
    :param patterns: Patterns of the images the reader should match
    """
    for label in os.listdir(dir_path):
        for root, dirs, files in os.walk(os.path.join(dir_path, label)):
            for basename in files:
                for pattern in patterns:
                    if fnmatch.fnmatch(basename, pattern):
                        filename = os.path.join(root, basename)
                        image = cv2.imread(filename)
                        if image is None:
                            print(">> Ignore empty image {f}".format(f=filename))
                        else:
                            yield label, image, filename
def _load_sensors(config):
    """Loads all the sensors from the specified config file.

    Args:
        config (ConfigParser): instance from which to extract the sensor
            list. `str` is also allowed, in which case it should be the
            path to the config file to load.
    """
    global _sensors, _sensors_parsed
    if not _sensors_parsed:
        parser = _get_parser(config)
        if parser is not None:
            # Now that we have the thread, we can add configuration for
            # each of the sensors in the config file.
            from fnmatch import fnmatch
            for section in parser.sections():
                if fnmatch(section, "sensor.*"):
                    name = section[len("sensor."):]
                    _sensors[name] = Sensor(None, name, **dict(parser.items(section)))
        _sensors_parsed = True
def preprocess_data_and_labels_AAP(data_file_path, save_path):
    def merge_folds(data_file_path, save_path):
        # merge all the separated folds into one file
        train = []
        val = []
        test = []
        for file in os.listdir(data_file_path):
            if fnmatch.fnmatch(file, '*train.txt'):
                train += (open(data_file_path + '/' + file, 'r').readlines())
            elif fnmatch.fnmatch(file, '*validation.txt'):
                val += (open(data_file_path + '/' + file, 'r').readlines())
            else:
                test += (open(data_file_path + '/' + file, 'r').readlines())
        open(save_path + '/train.txt', 'w').write(''.join(train))
        open(save_path + '/val.txt', 'w').write(''.join(val))
        open(save_path + '/test.txt', 'w').write(''.join(test))
        print(len(train + val + test))

    merge_folds(data_file_path, save_path)
def _match_pattern(self, pattern, file_info):
    file_status = None
    file_path = file_info.src
    pattern_type = pattern[0]
    if file_info.src_type == 'local':
        path_pattern = pattern[1].replace('/', os.sep)
    else:
        path_pattern = pattern[1].replace(os.sep, '/')
    is_match = fnmatch.fnmatch(file_path, path_pattern)
    if is_match and pattern_type == 'include':
        file_status = (file_info, True)
        LOG.debug("%s matched include filter: %s", file_path, path_pattern)
    elif is_match and pattern_type == 'exclude':
        file_status = (file_info, False)
        LOG.debug("%s matched exclude filter: %s", file_path, path_pattern)
    else:
        LOG.debug("%s did not match %s filter: %s",
                  file_path, pattern_type[2:], path_pattern)
    return file_status
def run_per_file(config, ignore_paths=None, path=None, config_dir=None):
    ignore_paths = ignore_paths or []
    path = path or os.getcwd()
    cmd = run_config(config, config_dir)
    print(cmd)
    run_cmds = []
    patterns = PATTERNS.get(config.get('language'))
    paths = all_filenames_in_dir(path=path, ignore_paths=ignore_paths)
    for pattern in patterns:
        for filepath in fnmatch.filter(paths, pattern):
            run_cmds.append(cmd + [filepath])
    pool = Pool()

    def result(run_cmd):
        _, out = run_command(run_cmd)
        return run_cmd[-1], out

    output = pool.map(result, run_cmds)
    return output
def _render(self, target_name, read_path, write_path):
    """Render a given template or directory for the target.

    :param target_name: String. Project or App name to render.
    :param read_path: String. Path to template or directory to render.
    :param write_path: String. Path to write to (or create directory).
    """
    if os.path.isdir(read_path):
        if os.path.split(read_path)[1] == 'project_name':
            write_path = os.path.join(os.path.split(write_path)[0],
                                      self.variables['project_name'])
        os.mkdir(write_path)
        for filename in os.listdir(read_path):
            if fnmatch(filename, 'test_*'):
                write_filename = filename.replace('test_', f'test_{target_name}_')
            else:
                write_filename = filename
            self._render(target_name,
                         os.path.join(read_path, filename),
                         os.path.join(write_path, write_filename))
    else:
        tpl = Template(filename=read_path)
        with open(os.path.splitext(write_path)[0], 'w') as f:
            f.write(tpl.render(**self.variables))
def match_file(self, filename):
    """Used to check if files can be handled by this linter;
    often this will just be file extension checks."""
    pattern = self.options.get('pattern') or self.default_pattern
    if not pattern:
        return True
    globs = pattern.split()
    for glob in globs:
        if fnmatch.fnmatch(filename, glob):  # try each glob pattern
            return True
    try:
        if re.match(pattern, filename, re.I):  # fall back to treating the pattern as a regex
            return True
    except re.error:
        pass
    return False
def get_filter(opts=None):
    opts = opts or []
    if 'inc=*' in opts:
        # do not filter any files, include everything
        return None

    def _filter(dir, ls):
        incs = [opt.split('=').pop() for opt in opts if 'inc=' in opt]
        _filter = []
        for f in ls:
            _f = os.path.join(dir, f)
            if not os.path.isdir(_f) and not _f.endswith('.py') and incs:
                if True not in [fnmatch(_f, inc) for inc in incs]:
                    logging.debug('Not syncing %s, does not match include '
                                  'filters (%s)' % (_f, incs))
                    _filter.append(f)
                else:
                    logging.debug('Including file, which matches include '
                                  'filters (%s): %s' % (incs, _f))
            elif (os.path.isfile(_f) and not _f.endswith('.py')):
                logging.debug('Not syncing file: %s' % f)
                _filter.append(f)
            elif (os.path.isdir(_f) and
                  not os.path.isfile(os.path.join(_f, '__init__.py'))):
                logging.debug('Not syncing directory: %s' % f)
                _filter.append(f)
        return _filter
    return _filter
def _get_tags(data):
    '''
    Retrieve all the tags for this distro from the yaml
    '''
    ret = {}
    distro = __grains__.get('osfinger')
    for audit_dict in data.get('command', []):  # command:0
        for audit_id, audit_data in audit_dict.items():  # command:0:nodev
            tags_dict = audit_data.get('data', {})  # command:0:nodev:data
            tags = None
            for osfinger in tags_dict:
                if osfinger == '*':
                    continue
                osfinger_list = [finger.strip() for finger in osfinger.split(',')]
                for osfinger_glob in osfinger_list:
                    if fnmatch.fnmatch(distro, osfinger_glob):
                        tags = tags_dict.get(osfinger)
                        break
                if tags is not None:
                    break
            # If we didn't find a match, check for a '*'
            if tags is None:
                tags = tags_dict.get('*', {})  # command:0:nodev:data:Debian-8
            if 'tag' not in tags:
                tags['tag'] = ''
            tag = tags['tag']
            if tag not in ret:
                ret[tag] = []
            formatted_data = {'tag': tag, 'module': 'command'}
            formatted_data.update(audit_data)
            formatted_data.update(tags)
            formatted_data.pop('data')
            ret[tag].append(formatted_data)
    return ret
def matches(self, cpe):
    """Return true or false if this CPE object matches the provided
    cpe_str, using wildcards.

    :param cpe: The cpe to compare against
    """
    # TODO see issue #3
    if self.vendor and not fnmatch.fnmatch(cpe.vendor, self.vendor):
        print("vendor was false")
        return False
    elif self.product and not fnmatch.fnmatch(cpe.product, self.product):
        print("product was false")
        return False
    elif self.version and not fnmatch.fnmatch(cpe.version, self.version):
        print("version was false")
        return False
    elif self.update and not fnmatch.fnmatch(cpe.update, self.update):
        print("update was false")
        return False
    elif self.edition and not fnmatch.fnmatch(cpe.edition, self.edition):
        print("edition was false")
        return False
    elif self.part and not fnmatch.fnmatch(cpe.part, self.part):
        print("part was false")
        return False
    else:
        return True
def _match_path(self, path, full_path, pattern):
    # override this method to use alternative matching strategy
    return fnmatch(path, pattern)
def is_skipped_module(self, module_name):
    for pattern in self.skip:
        if fnmatch.fnmatch(module_name, pattern):
            return True
    return False
def get_entries(commit):
    return [entry for entry in commit.tree.traverse()
            if entry.type == 'blob'
            and all([fnmatch.fnmatch(entry.path, pattern)
                     for pattern in args.only])
            and not any([fnmatch.fnmatch(entry.path, pattern)
                         for pattern in args.ignore])]
def process_event(self, event):
    '''
    Take an event and attempt to match it in the list of keys.
    If found, schedule the requested action.
    '''
    if not self.opts:
        return
    for tag in self.opts:
        if fnmatch.fnmatch(event['tag'], tag):
            for action in self.opts[tag]['reactions']:
                # Super-simple non-blocking approach.
                # Threading won't scale as much as a true event loop would.
                # It will, however, handle cases where a single-threaded loop
                # would be blocked. Do you trust your reactions to be co-op?!
                # Of course, the other side of this is thread-safety.
                # Either way, be smart!
                t = threading.Thread(target=self.react, args=(action, event))
                t.start()
            if 'rules' in self.opts[tag]:
                rule_actions = []
                for rule in self.opts[tag]['rules']:
                    rule_actions = process_rule(rule, event, tracking_id)
                    if rule_actions:
                        for action in rule_actions:
                            self.react(list(action.keys())[0], list(action.values()))
                    else:
                        # Rule chaining ends when a rule does not match
                        break
def _process(self, event):
    """Processes a raw event

    Creates the proper salt event class wrapper and notifies listeners

    Args:
        event (dict): the raw event data
    """
    logger.debug("Process event -> %s", event)
    wrapper = None
    if fnmatch.fnmatch(event['tag'], 'salt/job/*/new'):
        wrapper = NewJobEvent(event)
        for listener in self.listeners:
            listener.handle_salt_event(wrapper)
            listener.handle_new_job_event(wrapper)
    elif fnmatch.fnmatch(event['tag'], 'salt/run/*/new'):
        wrapper = NewRunnerEvent(event)
        for listener in self.listeners:
            listener.handle_salt_event(wrapper)
            listener.handle_new_runner_event(wrapper)
    elif fnmatch.fnmatch(event['tag'], 'salt/job/*/ret/*'):
        wrapper = RetJobEvent(event)
        for listener in self.listeners:
            listener.handle_salt_event(wrapper)
            listener.handle_ret_job_event(wrapper)
    elif fnmatch.fnmatch(event['tag'], 'salt/run/*/ret'):
        wrapper = RetRunnerEvent(event)
        for listener in self.listeners:
            listener.handle_salt_event(wrapper)
            listener.handle_ret_runner_event(wrapper)
    elif fnmatch.fnmatch(event['tag'], 'salt/state_result/*'):
        wrapper = StateResultEvent(event)
        for listener in self.listeners:
            listener.handle_salt_event(wrapper)
            listener.handle_state_result_event(wrapper)
def matchGlob(self, pattern):
    if type(pattern) != DNSLabel:
        pattern = DNSLabel(pattern)
    return fnmatch.fnmatch(str(self).lower(), str(pattern).lower())
def Walk(root='.', recurse=True, pattern='*'):
    """
    Generator for walking a directory tree.
    Starts at specified root folder, returning files that match our pattern.
    Optionally will also recurse through sub-folders.
    """
    for path, subdirs, files in os.walk(root):
        for name in files:
            if fnmatch.fnmatch(name, pattern):
                yield os.path.join(path, name)
        if not recurse:
            break
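A quick usage sketch for this generator (the root path and pattern are illustrative):

# Print every Python file in the current directory only (no recursion):
for filepath in Walk(root='.', recurse=False, pattern='*.py'):
    print(filepath)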
def Walk(root, recurse=0, pattern='*', return_folders=0):
    import fnmatch, os

    # initialize
    result = []

    # must have at least root folder
    try:
        names = os.listdir(root)
    except os.error:
        return result

    # expand pattern
    pattern = pattern or '*'
    pat_list = pattern.split(';')

    # check each file
    for name in names:
        fullname = os.path.normpath(os.path.join(root, name))

        # grab if it matches our pattern and entry type
        for pat in pat_list:
            if fnmatch.fnmatch(name, pat):
                if os.path.isfile(fullname) or (return_folders and os.path.isdir(fullname)):
                    result.append(fullname)
                continue

        # recursively scan other folders, appending results
        if recurse:
            if os.path.isdir(fullname) and not os.path.islink(fullname):
                result = result + Walk(fullname, recurse, pattern, return_folders)

    return result
def callback(arg, directory, files):
    for file in files:
        if fnmatch.fnmatch(file, arg):
            for line in fileinput.input(os.path.abspath(os.path.join(directory, file)), inplace=1):
                if re.search('.*theunderdogs.*', line):  # I changed * to .* but it would probably work without this if
                    line = line.replace('theunderdogs', 'the-underdogs')  # old string, new string
                print(line, end='')
def apply_to_targets(runbooks, config, dbc):
    ''' Match hosts with runbooks '''
    targets = dbc.get_target()
    logger.debug("Found targets: {0}".format(json.dumps(targets)))
    for target in targets.keys():
        # Create runbook dictionary if it doesn't exist
        if "runbooks" not in targets[target].keys():
            logger.debug("Creating runbook dictionary in target config")
            targets[target]['runbooks'] = {}
        logger.debug("Identifying runbooks for target {0}".format(target))
        for matcher in runbooks.keys():
            if fnmatch.fnmatch(targets[target]['hostname'], matcher):
                for runbook in runbooks[matcher].keys():
                    logger.debug("Checking if {0} is already applied".format(runbook))
                    if runbook not in targets[target]['runbooks'].keys():
                        try:
                            targets[target]['runbooks'][runbook] = render_runbooks(
                                runbooks[matcher][runbook],
                                targets[target]['facts'])
                        except Exception as e:
                            logger.warn("Could not apply runbook {0} to target {1}: {2}".format(
                                runbook, targets[target]['hostname'], e.message))
                        dbc.save_target(target=targets[target])
                        msg = {
                            'msg_type': 'runbook_add',
                            'runbook': runbook,
                            'target': target}
                        logger.debug("Adding runbook policy {0} to target {1}".format(
                            runbook, target))
                        count = dbc.notify("monitors", msg)
                        logger.info("Notified {0} of runbook changes to target {1}".format(
                            count, target))
                    else:
                        logger.debug("{0} is already applied to target {1}".format(
                            runbook, target))
    return True
def is_domain_match_glob_whitelist(domain):
    """
    Check whether the domain matches any pattern in
    `domains_whitelist_auto_add_glob_list`.

    :type domain: str
    :rtype: bool
    """
    for domain_glob in domains_whitelist_auto_add_glob_list:
        if fnmatch(domain, domain_glob):
            return True
    return False
def filename_match(filename, patterns, default=True):
    """Check if patterns contains a pattern that matches filename.

    If patterns is unspecified, this always returns `default`.
    """
    if not patterns:
        return default
    return any(fnmatch(filename, pattern) for pattern in patterns)
def run(self):
    def callback(*args):
        return self.isInterruptionRequested()

    entries = os.listdir(self.path)
    for i, p in enumerate(entries):
        if any(fnmatch(p, ex) for ex in self.exclude):
            continue
        p = os.path.join(self.path, p)
        if not dottorrent.is_hidden_file(p):
            sfn = os.path.split(p)[1] + '.torrent'
            self.progress_update.emit(sfn, i, len(entries))
            t = dottorrent.Torrent(
                p,
                exclude=self.exclude,
                trackers=self.trackers,
                web_seeds=self.web_seeds,
                private=self.private,
                source=self.source,
                comment=self.comment,
                include_md5=self.include_md5,
                creation_date=datetime.now(),
                created_by=CREATOR
            )
            try:
                self.success = t.generate(callback=callback)
            # ignore empty inputs
            except dottorrent.exceptions.EmptyInputException:
                continue
            except Exception as exc:
                self.onError.emit(str(exc))
                return
            if self.isInterruptionRequested():
                return
            if self.success:
                with open(os.path.join(self.save_dir, sfn), 'wb') as f:
                    t.save(f)