The following 50 code examples, extracted from Python open-source projects, illustrate how to use glob.glob().
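Before the project examples, here is a minimal, self-contained sketch of the basic calls the snippets below build on. The directory and file names used here are made up for illustration only.

import glob
import os

# glob.glob() returns a list of matching paths in arbitrary order,
# so sort the result when a deterministic order matters.
scripts = sorted(glob.glob(os.path.join('src', '*.py')))

# With recursive=True (Python 3.5+), '**' matches any number of subdirectories.
configs = glob.glob('config/**/*.yaml', recursive=True)

# glob.iglob() yields matches lazily instead of building a list, and
# glob.escape() neutralises wildcard characters inside a literal path.
for path in glob.iglob(glob.escape('data [raw]') + '/*.csv'):
    print(path)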
def delete_previous_checkpoints(self, num_previous=5):
    """
    Deletes all previous checkpoints that are <num_previous> before the present checkpoint.
    This is done to prevent blowing out of memory due to too many checkpoints

    :param num_previous:
    :return:
    """
    self.present_checkpoints = glob.glob(self.get_checkpoint_location() + '/*.ckpt')
    if len(self.present_checkpoints) > num_previous:
        present_ids = [self.__get_id(ckpt) for ckpt in self.present_checkpoints]
        present_ids.sort()
        ids_2_delete = present_ids[0:len(present_ids) - num_previous]
        for ckpt_id in ids_2_delete:
            ckpt_file_nm = self.get_checkpoint_location() + '/model_' + str(ckpt_id) + '.ckpt'
            os.remove(ckpt_file_nm)
def directory_has_smart_contract(location):
    # returns bool if there is a tsol contract in said directory
    # probably makes more sense to put this inside of the tsol package
    code_path = glob.glob(os.path.join(location, '*.tsol'))
    example = glob.glob(os.path.join(location, '*.json'))

    assert len(code_path) > 0 and len(example) > 0, 'Could not find *.tsol and *.json files in provided directory.'

    # pop off the first file name and turn the code into a file object
    code = open(code_path[0])

    # turn the example into a dict
    with open(example[0]) as e:
        example = json.load(e)

    try:
        tsol.compile(code, example)
    except Exception as e:
        print(e)
        return False
    return True
def loadImgs(imgsfolder, rows, cols):
    myfiles = glob.glob(imgsfolder + '*.jpg')
    nPics = len(myfiles)
    X = np.zeros((nPics, rows, cols), dtype='uint8')
    i = 0
    imgNames = []

    for filepath in myfiles:
        sd = filepath.rfind('/')
        ed = filepath.find('.')
        filename = filepath[int(sd + 1):int(ed)]
        imgNames.append(filename)

        temp = cv2.imread(filepath, 0)  # read as grayscale
        if temp is None:
            continue
        elif temp.size < 1000:
            continue
        elif temp.shape == (rows, cols):
            X[i, :, :] = temp
        else:
            X[i, :, :] = cv2.resize(temp, (cols, rows), interpolation=cv2.INTER_CUBIC)
        i += 1
    return X, imgNames
def gen_pruned_features(name):
    print name
    feature_dir = 'data/feature_' + args.domain + \
        '_' + str(args.n_boxes) + 'boxes/' + name + '/'
    n_clips = len(glob.glob(feature_dir + BOX_FEATURE + '*.npy'))
    for clip in xrange(1, n_clips + 1):
        pruned_boxes = np.load(feature_dir + BOX_FEATURE + '{:04d}.npy'.format(clip))  # (50, args.n_boxes, 4)
        roisavg = np.load(feature_dir + 'roisavg{:04d}.npy'.format(clip))  # (50, args.n_boxes, 512)

        pruned_roisavg = np.zeros((50, args.n_boxes, 512))
        for frame in xrange(50):
            for box_id in xrange(args.n_boxes):
                if not np.array_equal(pruned_boxes[frame][box_id], np.zeros((4))):
                    pruned_roisavg[frame][box_id] = roisavg[frame][box_id]

        np.save('{}pruned_roisavg{:04d}'.format(feature_dir, clip), pruned_roisavg)
def __call__(self):
    ctxt = {}
    mappings = super(PhyNICMTUContext, self).__call__()
    if mappings and mappings.keys():
        ports = sorted(mappings.keys())
        napi_settings = NeutronAPIContext()()
        mtu = napi_settings.get('network_device_mtu')
        all_ports = set()
        # If any of ports is a vlan device, its underlying device must have
        # mtu applied first.
        for port in ports:
            for lport in glob.glob("/sys/class/net/%s/lower_*" % port):
                lport = os.path.basename(lport)
                all_ports.add(lport.split('_')[1])

        all_ports = list(all_ports)
        all_ports.extend(ports)
        if mtu:
            ctxt["devs"] = '\\n'.join(all_ports)
            ctxt['mtu'] = mtu

    return ctxt
def copy_results(datanames, result_dir, output_dir, verbose):
    ''' This function copies all the [dataname.predict] results from result_dir to output_dir'''
    for basename in datanames:
        try:
            test_files = ls(result_dir + "/" + basename + "*_test*.predict")
            if len(test_files) == 0:
                vprint(verbose, "[-] Missing 'test' result files for " + basename)
                return 0
            for f in test_files:
                copy2(f, output_dir)
            valid_files = ls(result_dir + "/" + basename + "*_valid*.predict")
            if len(valid_files) == 0:
                vprint(verbose, "[-] Missing 'valid' result files for " + basename)
                return 0
            for f in valid_files:
                copy2(f, output_dir)
            vprint(verbose, "[+] " + basename.capitalize() + " copied")
        except:
            vprint(verbose, "[-] Missing result files")
            return 0
    return 1

# ================ Display directory structure and code version (for debug purposes) =================
def get_last_checkpoint(self):
    """
    Assumes that the last checkpoint has a higher checkpoint id.
    Checkpoint will be saved in this exact format: model_<checkpoint_id>.ckpt
    Eg - model_100.ckpt

    :return:
    """
    self.present_checkpoints = glob.glob(self.get_checkpoint_location() + '/*.ckpt')
    if len(self.present_checkpoints) != 0:
        present_ids = [self.__get_id(ckpt) for ckpt in self.present_checkpoints]
        # sort the ID's and return the model for the last ID
        present_ids.sort()
        self.last_id = present_ids[-1]
        self.last_ckpt = self.get_checkpoint_location() + '/model_' + \
            str(self.last_id) + '.ckpt'
    return self.last_ckpt
def _get_files(self):
    """ Get files by self.mask from self.path dir. """
    file_paths = glob.glob(os.path.join(self.path, self.mask))

    files = []
    for f in file_paths:
        if os.path.isfile(f):
            file_name = os.path.basename(f)
            file_name = os.path.splitext(file_name)[0]
            mtime = int(os.path.getmtime(f) * 1000)
            files.append({'path': f, 'name': file_name, 'mtime': mtime})
    return files
def run(self):
    self.run_command("egg_info")
    from glob import glob
    for pattern in self.match:
        pattern = self.distribution.get_name() + '*' + pattern
        files = glob(os.path.join(self.dist_dir, pattern))
        files = [(os.path.getmtime(f), f) for f in files]
        files.sort()
        files.reverse()

        log.info("%d file(s) matching %s", len(files), pattern)
        files = files[self.keep:]
        for (t, f) in files:
            log.info("Deleting %s", f)
            if not self.dry_run:
                if os.path.isdir(f):
                    shutil.rmtree(f)
                else:
                    os.unlink(f)
def find_data_files(self, package, src_dir):
    """Return filenames for package's data files in 'src_dir'"""
    patterns = self._get_platform_patterns(
        self.package_data,
        package,
        src_dir,
    )
    globs_expanded = map(glob, patterns)
    # flatten the expanded globs into an iterable of matches
    globs_matches = itertools.chain.from_iterable(globs_expanded)
    glob_files = filter(os.path.isfile, globs_matches)
    files = itertools.chain(
        self.manifest_files.get(package, []),
        glob_files,
    )
    return self.exclude_data_files(package, src_dir, files)
def _get_platform_patterns(spec, package, src_dir):
    """
    yield platform-specific path patterns (suitable for glob
    or fn_match) from a glob-based spec (such as
    self.package_data or self.exclude_package_data)
    matching package in src_dir.
    """
    raw_patterns = itertools.chain(
        spec.get('', []),
        spec.get(package, []),
    )
    return (
        # Each pattern has to be converted to a platform-specific path
        os.path.join(src_dir, convert_path(pattern))
        for pattern in raw_patterns
    )

# from Python docs
def prepare_data(video_dir, output_dir, max_video_limit=1, screen_display=False):
    """
    Args:
        1. video_dir:        Directory storing all videos to be processed.
        2. output_dir:       Directory where all mouth region images are to be stored.
        3. max_video_limit:  Puts a limit on number of videos to be used for processing.
        4. screen_display:   Decides whether to use screen (to display video being processed).
    """
    video_file_paths = sorted(glob.glob(video_dir + "*.mp4"))[:max_video_limit]

    load_trained_models()

    if not FACE_DETECTOR_MODEL:
        print "[ERROR]: Please ensure that you have dlib's landmarks predictor file " + \
              "at data/dlib_data/. You can download it here: " + \
              "http://dlib.net/files/shape_predictor_68_face_landmarks.dat.bz2"
        return False

    for path in video_file_paths:
        extract_mouth_regions(path, output_dir, screen_display)

    return True
def convert_mp4(video_dir, audio_dir):
    '''
    Args:
        1. video_dir:  Directory for all video files
        2. audio_dir:  Directory where all converted files will be stored.
    '''
    # Get all file names
    video_file_names = sorted(glob.glob(video_dir + "*.mp4"))
    # Extract actual names of file, also remove any extensions
    video_names = map(lambda x: x.split('/')[-1].split(".")[0], video_file_names)

    # Command for converting video to audio
    command = "ffmpeg -i " + video_dir + "{0}.mp4 -ab 96k -ar 44100 -vn " + audio_dir + "{0}.wav"

    for name in video_names:
        subprocess.call(command.format(name), shell=True)
def prompt_pick_backup(message):
    """Prompts the user to pick an existing database, and returns the
       selected choice database ID and its metadata"""
    # First load all the saved databases (splitting extension and path)
    saved_db = [path.splitext(path.split(f)[1])[0] for f in glob('backups/*.tlo')]

    # Then prompt the user
    print('Available backups databases:')
    for i, db_id in enumerate(saved_db):
        metadata = get_metadata(db_id)
        print('{}. {}, ID: {}'.format(i + 1,
                                      metadata.get('peer_name', '???'),
                                      db_id))

    db_id = saved_db[get_integer(message, 1, len(saved_db)) - 1]
    return db_id, get_metadata(db_id)
def create_nodule_mask_subset(luna_subset):
    LUNA_DIR = LUNA_BASE_DIR % luna_subset
    files = glob.glob(''.join([LUNA_DIR, '*.mhd']))
    annotations = pd.read_csv(LUNA_ANNOTATIONS)
    annotations.head()

    file = "../luna/original_lungs/subset0/1.3.6.1.4.1.14519.5.2.1.6279.6001.564534197011295112247542153557.mhd"
    for file in files:
        imagePath = file
        seriesuid = file[file.rindex('/')+1:]  # everything after the last slash
        seriesuid = seriesuid[:len(seriesuid)-len(".mhd")]  # cut out the suffix to get the uid
        cands = annotations[seriesuid == annotations.seriesuid]  # select the annotations for the current series
        #print (cands)
        create_nodule_mask(imagePath, cands)
def setSdrRoot(self, sdrroot):
    self.nodeTreeWidget.clear()
    nodepath = os.path.join(sdrroot, 'dev/nodes/*/DeviceManager.dcd.xml')
    for dcdfile in glob.glob(nodepath):
        try:
            node = dcd.parse(dcdfile)
            name = node.get_name()
            domain = node.get_domainmanager().get_namingservice().get_name()
            domain = domain.split('/')[-1]
            dcdfile = dcdfile.replace(os.path.join(sdrroot, 'dev'), '')
            # Add the node to the tree widget, including the default domain
            # as a hidden column
            QtGui.QTreeWidgetItem(self.nodeTreeWidget, [name, dcdfile, domain])
        except:
            pass

    # Readjust the column widths to ensure that the entire name is shown
    # and that the scrollbar allows viewing the entire DCD filename
    self.nodeTreeWidget.resizeColumnToContents(0)
    self.nodeTreeWidget.resizeColumnToContents(1)

    # Sort alphabetically by name
    self.nodeTreeWidget.sortByColumn(0, 0)
def main():
    parser = argparse.ArgumentParser(description='FractalNet on CIFAR-100')
    parser.add_argument('--load', nargs=1,
                        help='Test network with weights file')
    parser.add_argument('--deepest', action='store_true',
                        help='Build with only deepest column activated')
    parser.add_argument('--test-all', nargs=1,
                        help='Test all the weights from a folder')
    parser.add_argument('--summary', action='store_true',
                        help='Print a summary of the network and exit')
    args = parser.parse_args()

    net = build_network(deepest=args.deepest)
    if args.load:
        weights = args.load[0]
        test_network(net, weights)
    elif args.test_all:
        folder = args.test_all[0]
        for weights in glob.glob(os.path.join(folder, 'weigh*')):
            test_network(net, weights)
    elif args.summary:
        net.summary()
    else:
        train_network(net)
def main():
    parser = argparse.ArgumentParser(description='FractalNet on CIFAR-10')
    parser.add_argument('--load', nargs=1,
                        help='Test network with weights file')
    parser.add_argument('--deepest', action='store_true',
                        help='Build with only deepest column activated')
    parser.add_argument('--test-all', nargs=1,
                        help='Test all the weights from a folder')
    parser.add_argument('--summary', action='store_true',
                        help='Print a summary of the network and exit')
    args = parser.parse_args()

    net = build_network(deepest=args.deepest)
    if args.load:
        weights = args.load[0]
        test_network(net, weights)
    elif args.test_all:
        folder = args.test_all[0]
        for weights in glob.glob(os.path.join(folder, 'weigh*')):
            test_network(net, weights)
    elif args.summary:
        net.summary()
    else:
        train_network(net)
def guess_paired_path(path):
    """
    Given the path to a file that contains the sequences for the first read in a
    pair, return the file that contains the sequences for the second read in a
    pair. Both files must have identical names, except that the first must have
    a '1' in its name, and the second must have a '2' at the same position.

    Return None if no second file was found or if there are too many candidates.

    >>> guess_paired_path('file.1.fastq.gz')  # doctest: +SKIP
    'file.2.fastq.gz'  # if that file exists
    """
    base, name = os.path.split(path)
    glob_pattern = os.path.join(base, name.replace('1', '?'))
    paths = [p for p in glob.glob(glob_pattern) if is_1_2(p, path) and '_R1_' not in p]
    if len(paths) != 1:
        return None
    return paths[0]
def slot_autoload_victim_clients(self):
    # clear
    self.combo_wep_mac_cfrag.clear()
    self.combo_wpa_mac_hand.clear()

    # check *.csv files
    if not glob.glob(config_dir + "*.csv"):
        self.output("no csv files in " + config_dir, 1)
        return

    # open dump file
    dump_file = commands.getoutput("cat " + config_dir + "*.csv | egrep -e '^[0-9a-fA-F]{2,2}:[0-9a-fA-F]{2,2}:[0-9a-fA-F]{2,2}:[0-9a-fA-F]{2,2}:[0-9a-fA-F]{2,2}:[0-9a-fA-F]{2,2}.+[0-9a-fA-F]{2,2}:[0-9a-fA-F]{2,2}:[0-9a-fA-F]{2,2}:[0-9a-fA-F]{2,2}:[0-9a-fA-F]{2,2}:[0-9a-fA-F]{2,2},' | grep " + self.ac + " | tr ',' ' ' | awk ' { print $1 } '")
    dump_file = dump_file.split('\n')

    for mac in dump_file:
        self.combo_wep_mac_cfrag.insertItem(0, mac)
        self.combo_wpa_mac_hand.insertItem(0, mac)

#
# Add cracked key to database
#
def _find_grail_rc(self):
    import glob
    import pwd
    import socket
    import tempfile
    tempdir = os.path.join(tempfile.gettempdir(), ".grail-unix")
    user = pwd.getpwuid(os.getuid())[0]
    filename = os.path.join(tempdir, user + "-*")
    maybes = glob.glob(filename)
    if not maybes:
        return None
    s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    for fn in maybes:
        # need to PING each one until we find one that's live
        try:
            s.connect(fn)
        except socket.error:
            # no good; attempt to clean it out, but don't fail:
            try:
                os.unlink(fn)
            except IOError:
                pass
        else:
            return s
def testall(list, recursive, toplevel):
    import sys
    import os
    for filename in list:
        if os.path.isdir(filename):
            print filename + '/:',
            if recursive or toplevel:
                print 'recursing down:'
                import glob
                names = glob.glob(os.path.join(filename, '*'))
                testall(names, recursive, 0)
            else:
                print '*** directory (use -r) ***'
        else:
            print filename + ':',
            sys.stdout.flush()
            try:
                print what(filename)
            except IOError:
                print '*** not found ***'
def _make_writer(self):
    """
    :return:
    """
    self._buffer = StringIO()
    self._bytes_written = 0
    now = datetime.now()
    self.fname = self.log_folder + '/' + now.strftime('%Y%m%d_%H%M%S_{}.json'.format(self.make_random(6)))
    self.fname = str(pathlib.Path(self.fname))
    self._out_fh = open(self.fname, 'w')
    self.write_pid()
    logging.warning("Writing to {} ({} bytes)".format(self._out_fh.name, self.max_bytes))

    # compress any old files still lying around
    for fname in glob(self.log_folder + "/*.json"):
        if fname != self.fname:
            self._compress(fname)
def __init__(self, graph, start):
    for f in glob.glob(self.WSDIR + '/*.png'):
        os.remove(f)
    for f in glob.glob(self.WSDIR + '/*.dot'):
        os.remove(f)
    self._n = 0

    ranks = {start: 0}
    for s, d in bfs_edges(graph, start):
        ranks[d] = ranks[s] + 1
    _ranks = {}
    for n, r in ranks.items():
        _ranks.setdefault(r, set()).add(n)
    ranks = _ranks

    self._ranks = ''
    fmt = ' "{n}" [pos="{x},{y}!"] ;\n'
    for r, ns in sorted(ranks.items()):
        ns = sorted([str(n) for n in ns])
        for i, n in enumerate(ns):
            x = i * 2 + r % 2
            y = -r * 2
            self._ranks += fmt.format(n=n, x=x, y=y)
def find_previous(self):
    sfiles = os.path.join(self.output_dir, cfg.TRAIN.SNAPSHOT_PREFIX + '_iter_*.ckpt.meta')
    sfiles = glob.glob(sfiles)
    sfiles.sort(key=os.path.getmtime)
    # Get the snapshot name in TensorFlow
    redfiles = []
    for stepsize in cfg.TRAIN.STEPSIZE:
        redfiles.append(os.path.join(self.output_dir,
                        cfg.TRAIN.SNAPSHOT_PREFIX + '_iter_{:d}.ckpt.meta'.format(stepsize + 1)))
    sfiles = [ss.replace('.meta', '') for ss in sfiles if ss not in redfiles]

    nfiles = os.path.join(self.output_dir, cfg.TRAIN.SNAPSHOT_PREFIX + '_iter_*.pkl')
    nfiles = glob.glob(nfiles)
    nfiles.sort(key=os.path.getmtime)
    redfiles = [redfile.replace('.ckpt.meta', '.pkl') for redfile in redfiles]
    nfiles = [nn for nn in nfiles if nn not in redfiles]

    lsf = len(sfiles)
    assert len(nfiles) == lsf

    return lsf, nfiles, sfiles
def clean_project_files(path_or_glob, logger):
    """ Resolve file name references and ensure they are properly deleted """
    if "*" in path_or_glob:
        files_to_clean = glob.glob(path_or_glob)
    else:
        files_to_clean = [os.path.expanduser(path_or_glob)]

    for file_to_clean in files_to_clean:
        if not os.path.exists(file_to_clean):
            continue
        if os.path.isdir(file_to_clean):
            logger.info("Removing directory {}".format(file_to_clean))
            shutil.rmtree(file_to_clean)
        else:
            logger.info("Removing file {}".format(file_to_clean))
            os.remove(file_to_clean)
def get_data(datadir):
    #datadir = args.data
    # assume each image is 512x256 split to left and right
    imgs = glob.glob(os.path.join(datadir, '*.jpg'))
    data_X = np.zeros((len(imgs), 3, img_cols, img_rows))
    data_Y = np.zeros((len(imgs), 3, img_cols, img_rows))
    i = 0
    for file in imgs:
        img = cv2.imread(file, cv2.IMREAD_COLOR)
        img = cv2.resize(img, (img_cols * 2, img_rows))
        #print('{} {},{}'.format(i, np.shape(img)[0], np.shape(img)[1]))
        img = np.swapaxes(img, 0, 2)
        X, Y = split_input(img)
        data_X[i, :, :, :] = X
        data_Y[i, :, :, :] = Y
        i = i + 1
    return data_X, data_Y
def find_data_files(source, target, patterns):
    """Locates the specified data-files and returns the matches
    in a data_files compatible format.

    source is the root of the source data tree.
        Use '' or '.' for current directory.
    target is the root of the target data tree.
        Use '' or '.' for the distribution directory.
    patterns is a sequence of glob-patterns for the
        files you want to copy.
    """
    if glob.has_magic(source) or glob.has_magic(target):
        raise ValueError("Magic not allowed in src, target")
    ret = {}
    for pattern in patterns:
        pattern = os.path.join(source, pattern)
        for filename in glob.glob(pattern):
            if os.path.isfile(filename):
                targetpath = os.path.join(
                    target, os.path.relpath(filename, source)
                )
                path = os.path.dirname(targetpath)
                ret.setdefault(path, []).append(filename)
    return sorted(ret.items())
def load_all(self, config):
    """
    Load all existing data.

    :param config: Configuration object.
    :type config: ``dict``
    """
    self.buckets = {}
    for path in glob.glob(os.path.join(
            config[helper.DATA_ROOT],
            '%s_buckets-*.pickle' % self.NAME)):
        with open(path, 'rb') as inp:
            try:
                for key, value in pickle.load(inp).items():
                    if key in self.buckets:
                        self.buckets[key]['bins'].update(value['bins'])
                    else:
                        self.buckets[key] = value
            except:
                logging.warning('could not load related_%s data', self.NAME)
def execute_recreate(self):
    repairedfiles = []
    recreatedfiles = []
    if self.len_verified_actions > 0:
        for f, retcode in self.verifiedfiles_repairable + self.verifiedfiles_err:
            yield 1
            pars = glob.glob(glob.escape(f) + '*.par2')
            for p in pars:
                os.remove(p)
            recreatedfiles.append([f, self.runpar([self.par_cmd, "c", "-r" + self.percentage, "-n" + self.nr_parfiles, f])])

    self.recreate = sorted(recreatedfiles)
    self.recreate_err = sorted([f for f, err in recreatedfiles if err != 0])
    self.fixes = sorted([f for f, err in repairedfiles if err == 0])
    self.fixes_err = sorted([f for f, err in repairedfiles if err != 0])

    self.len_all_err = self.len_all_err + len(self.recreate_err) + len(self.fixes_err)
    return
def stream_reuters_documents(reuters_dir):
    """ Iterate over documents of the Reuters dataset.

    The Reuters archive will automatically be downloaded and uncompressed if
    the `data_path` directory does not exist.

    Documents are represented as dictionaries with 'body' (str),
    'title' (str), 'topics' (list(str)) keys.
    """
    parser = ReutersParser()
    for filename in glob(os.path.join(reuters_dir, "*.sgm")):
        for doc in parser.parse(open(filename, 'rb')):
            yield doc

##################### main ######################
def _admin(self, filename="user.admin.json"):
    """
    Expect admin user file; otherwise, search for first system user.

    Update access_key, secret_key
    """
    filepath = "{}/{}".format(self.pathname, filename)
    if os.path.exists(filepath):
        user = json.loads(open(filepath).read())
    else:
        user = None
        for user_file in glob.glob("{}/user.*".format(self.pathname)):
            user = json.loads(open(user_file).read())
            if 'system' in user and user['system'] == "true":
                break
            user = None
    if not user:
        # No system user
        log.error("No system user for radosgw found")
        return

    self.credentials['access_key'] = user['keys'][0]['access_key']
    self.credentials['secret_key'] = user['keys'][0]['secret_key']
    self.credentials['user_id'] = user['keys'][0]['user']
    self.credentials['success'] = True
def _parse(line):
    """
    Return globbed files constrained by optional slices or regexes.
    """
    if " " in line:
        parts = re.split('\s+', line)
        files = sorted(glob.glob(parts[0]))
        for optional in parts[1:]:
            filter_type, value = optional.split('=')
            if filter_type == "re":
                regex = re.compile(value)
                files = [m.group(0) for l in files for m in [regex.search(l)] if m]
            elif filter_type == "slice":
                # pylint: disable=eval-used
                files = eval("files{}".format(value))
            else:
                log.warning("keyword {} unsupported".format(filter_type))
    else:
        files = glob.glob(line)
    return files
def master_minion(self):
    """
    Verify that the master minion setting is a minion
    """
    data = None
    node = None
    local = salt.client.LocalClient()
    for node in self.data.keys():
        data = local.cmd(self.data[node]['master_minion'],
                         'pillar.get', ['master_minion'], expr_form="glob")
        break
    if data:
        self.passed['master_minion'] = "valid"
    else:
        if node:
            msg = "Could not find minion {}.".format(self.data[node]['master_minion'])
            msg += " Check /srv/pillar/ceph/master_minion.sls"
        else:
            msg = "Missing pillar data"
        self.errors['master_minion'] = [msg]
def _parse(self, line):
    """
    Return globbed files constrained by optional slices or regexes.
    """
    if " " in line:
        parts = re.split(r'\s+', line)
        files = sorted(glob.glob(parts[0]))
        for keyvalue in parts[1:]:
            key, value = keyvalue.split('=')
            if key == "re":
                regex = re.compile(value)
                files = [match.group(0) for _file in files for match in [regex.search(_file)] if match]
            elif key == "slice":
                # pylint: disable=eval-used
                files = eval("files{}".format(value))
            else:
                log.warning("keyword {} unsupported".format(key))
    else:
        files = glob.glob(line)
    return files
def pairs():
    """
    Return an array of devices and paths
    """
    _paths = [pathname for pathname in glob.glob("/var/lib/ceph/osd/*")]
    _pairs = []
    with open('/proc/mounts') as mounts:
        for line in mounts:
            _partition, path = line.split()[:2]
            if path in _paths:
                match = re.match(r'^(.+)\d+$', _partition)
                device = match.group(1)
                if 'nvme' in device:
                    device = device[:-1]
                _pairs.append([device, path])
    return _pairs
def do_egg_install(self):
    easy_install = self.distribution.get_command_class('easy_install')

    cmd = easy_install(
        self.distribution, args="x", root=self.root, record=self.record,
    )
    cmd.ensure_finalized()  # finalize before bdist_egg munges install cmd
    cmd.always_copy_from = '.'  # make sure local-dir eggs get installed

    # pick up setup-dir .egg files only: no .egg-info
    cmd.package_index.scan(glob.glob('*.egg'))

    self.run_command('bdist_egg')
    args = [self.distribution.get_command_obj('bdist_egg').egg_output]

    if setuptools.bootstrap_install_from:
        # Bootstrap self-installation of setuptools
        args.insert(0, setuptools.bootstrap_install_from)

    cmd.args = args
    cmd.run()
    setuptools.bootstrap_install_from = None

# XXX Python 3.1 doesn't see _nc if this is inside the class
def run(self):
    self.run_command("egg_info")
    from glob import glob
    for pattern in self.match:
        pattern = self.distribution.get_name() + '*' + pattern
        files = glob(os.path.join(self.dist_dir, pattern))
        files = [(os.path.getmtime(f), f) for f in files]
        files.sort()
        files.reverse()

        log.info("%d file(s) matching %s", len(files), pattern)
        files = files[self.keep:]
        for (t, f) in files:
            log.info("Deleting %s", f)
            if not self.dry_run:
                os.unlink(f)
def path_hash(path):
    """Generate a hash checksum of all files matching 'path'. Standard
    wildcards like '*' and '?' are supported, see documentation for the 'glob'
    module for more information.

    :return: dict: A { filename: hash } dictionary for all matched files.
        Empty if none found.
    """
    return {
        filename: file_hash(filename)
        for filename in glob.iglob(path)
    }
def restart_on_change(restart_map, stopstart=False, restart_functions=None):
    """Restart services based on configuration files changing

    This function is used as a decorator, for example::

        @restart_on_change({
            '/etc/ceph/ceph.conf': [ 'cinder-api', 'cinder-volume' ],
            '/etc/apache/sites-enabled/*': [ 'apache2' ]
            })
        def config_changed():
            pass  # your code here

    In this example, the cinder-api and cinder-volume services
    would be restarted if /etc/ceph/ceph.conf is changed by the
    ceph_client_changed function. The apache2 service would be
    restarted if any file matching the pattern got changed, created
    or removed. Standard wildcards are supported, see documentation
    for the 'glob' module for more information.

    @param restart_map: {path_file_name: [service_name, ...]}
    @param stopstart: DEFAULT false; whether to stop, start OR restart
    @param restart_functions: nonstandard functions to use to restart services
                              {svc: func, ...}
    @returns result from decorated function
    """
    def wrap(f):
        @functools.wraps(f)
        def wrapped_f(*args, **kwargs):
            return restart_on_change_helper(
                (lambda: f(*args, **kwargs)), restart_map, stopstart,
                restart_functions)
        return wrapped_f
    return wrap
def is_phy_iface(interface):
    """Returns True if interface is not virtual, otherwise False."""
    if interface:
        sys_net = '/sys/class/net'
        if os.path.isdir(sys_net):
            for iface in glob.glob(os.path.join(sys_net, '*')):
                if '/virtual/' in os.path.realpath(iface):
                    continue

                if interface == os.path.basename(iface):
                    return True

    return False
def juju_version():
    """Full version string (eg. '1.23.3.1-trusty-amd64')"""
    # Per https://bugs.launchpad.net/juju-core/+bug/1455368/comments/1
    jujud = glob.glob('/var/lib/juju/tools/machine-*/jujud')[0]
    return subprocess.check_output([jujud, 'version'],
                                   universal_newlines=True).strip()
def get_bridges(vnic_dir='/sys/devices/virtual/net'):
    """Return a list of bridges on the system."""
    b_regex = "%s/*/bridge" % vnic_dir
    return [x.replace(vnic_dir, '').split('/')[1] for x in glob.glob(b_regex)]
def ls(filename):
    return sorted(glob(filename))