我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.rename()。
def client_proc(job_id, data_file, rtask, task=None): # send input file to rtask.location; this will be saved to dispycos process's # working directory if (yield pycos.Pycos().send_file(rtask.location, data_file, timeout=10)) < 0: print('Could not send input data to %s' % rtask.location) # terminate remote task rtask.send(None) raise StopIteration(-1) # send info about input obj = C(job_id, data_file, random.uniform(5, 8), task) if (yield rtask.deliver(obj)) != 1: print('Could not send input to %s' % rtask.location) raise StopIteration(-1) # rtask sends result to this task as message result = yield task.receive() if not result.result_file: print('Processing %s failed' % obj.i) raise StopIteration(-1) # rtask saves results file at this client, which is saved in pycos's # dest_path, not current working directory! result_file = os.path.join(pycos.Pycos().dest_path, result.result_file) # move file to cwd target = os.path.join(os.getcwd(), os.path.basename(result_file)) os.rename(result_file, target) print(' job %s output is in %s' % (obj.i, target))
def atomic_writer(file_path, mode): """Atomic file writer. :param file_path: path of file to write to. :type file_path: ``unicode`` :param mode: sames as for `func:open` :type mode: string .. versionadded:: 1.12 Context manager that ensures the file is only written if the write succeeds. The data is first written to a temporary file. """ temp_suffix = '.aw.temp' temp_file_path = file_path + temp_suffix with open(temp_file_path, mode) as file_obj: try: yield file_obj os.rename(temp_file_path, file_path) finally: try: os.remove(temp_file_path) except (OSError, IOError): pass
def _atomic_write(filename): path = os.path.dirname(filename) try: file = tempfile.NamedTemporaryFile(delete=False, dir=path, mode="w+") yield file file.flush() os.fsync(file.fileno()) os.rename(file.name, filename) finally: try: os.remove(file.name) except OSError as e: if e.errno == 2: pass else: raise e
def fetch_increment_and_clean(uuid): cpu_acct = 0.0 mem_acct = 0.0 cnt_acct = 0 try: fetch_path = '%s/%s/%f' % (system_manager.db_prefix, uuid, time.time()) os.rename('%s/%s/usage' % (system_manager.db_prefix, uuid), fetch_path) with open(fetch_path, 'r') as fp: line = fp.readline() while line != '': [cpu, mem] = line.split() line = fp.readline() cnt_acct += 1 cpu_acct += float(cpu) mem_acct += float(mem) os.remove(fetch_path) except: pass return {"cpu_acct": cpu_acct, "mem_acct": mem_acct, "cnt_acct": cnt_acct}
def zap_pyfiles(self): log.info("Removing .py files from temporary directory") for base, dirs, files in walk_egg(self.bdist_dir): for name in files: path = os.path.join(base, name) if name.endswith('.py'): log.debug("Deleting %s", path) os.unlink(path) if base.endswith('__pycache__'): path_old = path pattern = r'(?P<name>.+)\.(?P<magic>[^.]+)\.pyc' m = re.match(pattern, name) path_new = os.path.join(base, os.pardir, m.group('name') + '.pyc') log.info("Renaming file from [%s] to [%s]" % (path_old, path_new)) try: os.remove(path_new) except OSError: pass os.rename(path_old, path_new)
def trash_old_stuff(trashlist, trashpath, newpath): if isinstance(trashlist, list): for old_location in trashlist: # Get the subfolders needed to be created path_within_destination=os.path.relpath(old_location, trashpath) # Create what will be the destination path new_location=os.path.join(newpath, path_within_destination) # Make sure all the relevant subfolders exist in the destination if not os.path.exists(os.path.dirname(new_location)): os.makedirs(os.path.dirname(new_location)) # Even though we've been double-checking paths all along, let's just make one last check if os.path.exists(old_location) and os.path.isdir(newpath): os.rename(old_location, new_location) logging.info("Moving %s to %s\n" % (old_location, new_location)) else: logging.error("One of %s or %s does not exist\n" % (old_location, new_location)) else: logging.error("%s is not a valid list\n" % trashlist) # Function that checks paths are writable
def main(args): if args.set: with open(args.file) as f: config = ruamel.yaml.load(f, ruamel.yaml.RoundTripLoader) for k, v in args.set: v = ruamel.yaml.safe_load(v) # config[k] = v item = config # allow nested keys keys = k.split('.') for i in keys[:-1]: item = item[i] item[keys[-1]] = v tmpfile = args.file + '.tmp' with open(tmpfile, 'w') as f: print(ruamel.yaml.dump(config, Dumper=ruamel.yaml.RoundTripDumper), end='', file=f) os.rename(tmpfile, args.file) else: with open(args.file) as f: config = ruamel.yaml.safe_load(f) print(ruamel.yaml.dump(config), end='')
def doRollover(self): """ Do a rollover, as described in __init__(). """ if self.stream: self.stream.close() self.stream = None if self.backupCount > 0: for i in range(self.backupCount - 1, 0, -1): sfn = "%s.%d" % (self.baseFilename, i) dfn = "%s.%d" % (self.baseFilename, i + 1) if os.path.exists(sfn): #print "%s -> %s" % (sfn, dfn) if os.path.exists(dfn): os.remove(dfn) os.rename(sfn, dfn) dfn = self.baseFilename + ".1" if os.path.exists(dfn): os.remove(dfn) os.rename(self.baseFilename, dfn) #print "%s -> %s" % (self.baseFilename, dfn) self.mode = 'w' self.stream = self._open()
def write_file(self, new_text, filename, old_text, encoding): if not self.nobackups: # Make backup backup = filename + ".bak" if os.path.lexists(backup): try: os.remove(backup) except os.error, err: self.log_message("Can't remove backup %s", backup) try: os.rename(filename, backup) except os.error, err: self.log_message("Can't rename %s to %s", filename, backup) # Actually write the new file write = super(StdoutRefactoringTool, self).write_file write(new_text, filename, old_text, encoding) if not self.nobackups: shutil.copymode(backup, filename)
def mimify(infile, outfile): """Convert 8bit parts of a MIME mail message to quoted-printable.""" if type(infile) == type(''): ifile = open(infile) if type(outfile) == type('') and infile == outfile: import os d, f = os.path.split(infile) os.rename(infile, os.path.join(d, ',' + f)) else: ifile = infile if type(outfile) == type(''): ofile = open(outfile, 'w') else: ofile = outfile nifile = File(ifile, None) mimify_part(nifile, ofile, 0) ofile.flush()
def removemessages(self, list): """Remove one or more messages -- may raise os.error.""" errors = [] deleted = [] for n in list: path = self.getmessagefilename(n) commapath = self.getmessagefilename(',' + str(n)) try: os.unlink(commapath) except os.error: pass try: os.rename(path, commapath) except os.error, msg: errors.append(msg) else: deleted.append(n) if deleted: self.removefromallsequences(deleted) if errors: if len(errors) == 1: raise os.error, errors[0] else: raise os.error, ('multiple errors:', errors)
def copymessage(self, n, tofolder, ton): """Copy one message over a specific destination message, which may or may not already exist.""" path = self.getmessagefilename(n) # Open it to check that it exists f = open(path) f.close() del f topath = tofolder.getmessagefilename(ton) backuptopath = tofolder.getmessagefilename(',%d' % ton) try: os.rename(topath, backuptopath) except os.error: pass ok = 0 try: tofolder.setlast(None) shutil.copy2(path, topath) ok = 1 finally: if not ok: try: os.unlink(topath) except os.error: pass
def __setitem__(self, key, message): """Replace the keyed message; raise KeyError if it doesn't exist.""" old_subpath = self._lookup(key) temp_key = self.add(message) temp_subpath = self._lookup(temp_key) if isinstance(message, MaildirMessage): # temp's subdir and suffix were specified by message. dominant_subpath = temp_subpath else: # temp's subdir and suffix were defaults from add(). dominant_subpath = old_subpath subdir = os.path.dirname(dominant_subpath) if self.colon in dominant_subpath: suffix = self.colon + dominant_subpath.split(self.colon)[-1] else: suffix = '' self.discard(key) new_path = os.path.join(self._path, subdir, key + suffix) os.rename(os.path.join(self._path, temp_subpath), new_path) if isinstance(message, MaildirMessage): os.utime(new_path, (os.path.getatime(new_path), message.get_date()))
def pack(self): """Re-name messages to eliminate numbering gaps. Invalidates keys.""" sequences = self.get_sequences() prev = 0 changes = [] for key in self.iterkeys(): if key - 1 != prev: changes.append((key, prev + 1)) if hasattr(os, 'link'): os.link(os.path.join(self._path, str(key)), os.path.join(self._path, str(prev + 1))) os.unlink(os.path.join(self._path, str(key))) else: os.rename(os.path.join(self._path, str(key)), os.path.join(self._path, str(prev + 1))) prev += 1 self._next_key = prev + 1 if len(changes) == 0: return for name, key_list in sequences.items(): for old, new in changes: if old in key_list: key_list[key_list.index(old)] = new self.set_sequences(sequences)
def munge(src_dir): # stored as: ./MCG-COCO-val2014-boxes/COCO_val2014_000000193401.mat # want: ./MCG/mat/COCO_val2014_0/COCO_val2014_000000141/COCO_val2014_000000141334.mat files = os.listdir(src_dir) for fn in files: base, ext = os.path.splitext(fn) # first 14 chars / first 22 chars / all chars + .mat # COCO_val2014_0/COCO_val2014_000000447/COCO_val2014_000000447991.mat first = base[:14] second = base[:22] dst_dir = os.path.join('MCG', 'mat', first, second) if not os.path.exists(dst_dir): os.makedirs(dst_dir) src = os.path.join(src_dir, fn) dst = os.path.join(dst_dir, fn) print 'MV: {} -> {}'.format(src, dst) os.rename(src, dst)
def _clean_upgrade(binary_ok, binary_path, path, temp_path): if binary_ok: import stat # save the permissions from the current binary old_stat = os.stat(binary_path) # rename the current binary in order to replace it with the latest os.rename(binary_path, path + "/old") os.rename(temp_path, binary_path) # set the same permissions that had the previous binary os.chmod(binary_path, old_stat.st_mode | stat.S_IEXEC) # delete the old binary os.remove(path + "/old") print("mongoaudit updated, restarting...") os.execl(binary_path, binary_path, *sys.argv) else: os.remove(temp_path) print("couldn't download the latest binary")
def download_driver_file(whichbin, url, base_path): if url.endswith('.tar.gz'): ext = '.tar.gz' else: ext = '.zip' print("Downloading from: {}".format(url)) download_file(url, '/tmp/pwr_temp{}'.format(ext)) if ext == '.tar.gz': import tarfile tar = tarfile.open('/tmp/pwr_temp{}'.format(ext), "r:gz") tar.extractall('{}/'.format(base_path)) tar.close() else: import zipfile with zipfile.ZipFile('/tmp/pwr_temp{}'.format(ext), "r") as z: z.extractall('{}/'.format(base_path)) # if whichbin == 'wires' and '/v{}/'.format(latest_gecko_driver) in url: # os.rename('{}/geckodriver'.format(base_path), # '{}/wires'.format(base_path)) # os.chmod('{}/wires'.format(base_path), 0o775) if whichbin == 'wires': os.chmod('{}/geckodriver'.format(base_path), 0o775) else: os.chmod('{}/chromedriver'.format(base_path), 0o775)
def operate_file_style(file_style = "csv", bases_dir = "../season_1/", is_add = True): """ add the style into the none style files """ if not os.path.exists(bases_dir): raise IOError("Path is not existed!...") if not os.path.isdir(bases_dir): raise IOError("This is not a dir!...") files = os.listdir(bases_dir) for file in files: file_path = os.path.join(bases_dir, file) if os.path.isdir(file_path): operate_file_style(file_style, file_path, is_add) else: if is_required_file(file, file_style, is_add): new_file = add_style_suffix(file, file_style) \ if is_add else remove_style_suffix(file, file_style) new_file_path = os.path.join(bases_dir, new_file) os.rename(file_path, new_file_path)
def upload_file(upload_file_name, temp): # upload_file_name????? # ??? saveas??? # ?????????,??git???saveas #key = md5(str(time.time())+''.join(random.sample(string.letters, 12))).hexdigest() # key ?????? print u"??????: ", pic_name = raw_input() uuid_6 = uuid.uuid4().get_hex()[:8] #????? key = pic_name+"_"+uuid_6+".png" copyfile(upload_file_name,join(saveas,key)) mime_type = 'image/png' token = q.upload_token(bucket, key) ret, info = put_file(token, key, upload_file_name, mime_type=mime_type, check_crc=True) print 'upload qiniu result:', info assert ret['key'] == key assert ret['hash'] == etag(upload_file_name) os.rename(upload_file_name, upload_file_name+'.old') return domain+'/'+key
def download_celeb_a(base_path): data_path = os.path.join(base_path, 'celebA') images_path = os.path.join(data_path, 'images') if os.path.exists(data_path): print('[!] Found celeb-A - skip') return filename, drive_id = "img_align_celeba.zip", "0B7EVK8r0v71pZjFTYXZWM3FlRnM" save_path = os.path.join(base_path, filename) if os.path.exists(save_path): print('[*] {} already exists'.format(save_path)) else: download_file_from_google_drive(drive_id, save_path) zip_dir = '' with zipfile.ZipFile(save_path) as zf: zip_dir = zf.namelist()[0] zf.extractall(base_path) if not os.path.exists(data_path): os.mkdir(data_path) os.rename(os.path.join(base_path, "img_align_celeba"), images_path) os.remove(save_path) download_attr_file(data_path)
def _rename_atomic(src, dst): ta = _CreateTransaction(None, 0, 0, 0, 0, 1000, 'Werkzeug rename') if ta == -1: return False try: retry = 0 rv = False while not rv and retry < 100: rv = _MoveFileTransacted(src, dst, None, None, _MOVEFILE_REPLACE_EXISTING | _MOVEFILE_WRITE_THROUGH, ta) if rv: rv = _CommitTransaction(ta) break else: time.sleep(0.001) retry += 1 return rv finally: _CloseHandle(ta)
def rename(src, dst): # Try atomic or pseudo-atomic rename if _rename(src, dst): return # Fall back to "move away and replace" try: os.rename(src, dst) except OSError as e: if e.errno != errno.EEXIST: raise old = "%s-%08x" % (dst, random.randint(0, sys.maxint)) os.rename(dst, old) os.rename(src, dst) try: os.unlink(old) except Exception: pass
def _sync(self): self._sync_index() get_file = os.path.join(self.name, str(self.head)) temp_file = open(self.temp_file, 'wb') self.marshal.dump(self.get_cache, temp_file) temp_file.close() if os.path.exists(get_file): os.remove(get_file) os.rename(self.temp_file, get_file) put_file = os.path.join(self.name, str(self.tail)) temp_file = open(self.temp_file, 'wb') self.marshal.dump(self.put_cache, temp_file) temp_file.close() if os.path.exists(put_file): os.remove(put_file) os.rename(self.temp_file, put_file)
def save_pickle(self, dumpfile=DUMPFILE): if not self.changed: self.note(0, "\nNo need to save checkpoint") elif not dumpfile: self.note(0, "No dumpfile, won't save checkpoint") else: self.note(0, "\nSaving checkpoint to %s ...", dumpfile) newfile = dumpfile + ".new" f = open(newfile, "wb") pickle.dump(self, f) f.close() try: os.unlink(dumpfile) except os.error: pass os.rename(newfile, dumpfile) self.note(0, "Done.") return 1
def makedirs(dir): if not dir: return if os.path.exists(dir): if not os.path.isdir(dir): try: os.rename(dir, dir + ".bak") os.mkdir(dir) os.rename(dir + ".bak", os.path.join(dir, "index.html")) except os.error: pass return head, tail = os.path.split(dir) if not tail: print "Huh? Don't know how to make dir", dir return makedirs(head) os.mkdir(dir, 0777)
def restore_backup(self): if self._verbose:print("Restoring backup") if self._verbose:print("Backing up current addon folder") backuploc = os.path.join(self._updater_path,"backup") tempdest = os.path.join(self._addon_root, os.pardir, self._addon+"_updater_backup_temp") tempdest = os.path.abspath(tempdest) # make the copy shutil.move(backuploc,tempdest) shutil.rmtree(self._addon_root) os.rename(tempdest,self._addon_root) self._json["backup_date"] = "" self._json["just_restored"] = True self._json["just_updated"] = True self.save_updater_json() self.reload_addon()
def deepMergeDirectory(self,base,merger): if not os.path.exists(base): if self._verbose:print("Base path does not exist") return -1 elif not os.path.exists(merger): if self._verbose:print("Merger path does not exist") return -1 # this should have better error handling # and also avoid the addon dir # Could also do error handling outside this function for path, dirs, files in os.walk(merger): relPath = os.path.relpath(path, merger) destPath = os.path.join(base, relPath) if not os.path.exists(destPath): os.makedirs(destPath) for file in files: destFile = os.path.join(destPath, file) if os.path.isfile(destFile): os.remove(destFile) srcFile = os.path.join(path, file) os.rename(srcFile, destFile)
def _download(args): url, folderName, index = args session = setupSession() try: # time out is another parameter tuned # fit for the network about 10Mb image = session.get(url, timeout = 5) imageName = str(index) with open(os.path.join(folderName, imageName),'wb') as fout: fout.write(image.content) fileExtension = imghdr.what(os.path.join(folderName, imageName)) if fileExtension is None: os.remove(os.path.join(folderName, imageName)) else: newName = imageName + '.' + str(fileExtension) os.rename(os.path.join(folderName, imageName), os.path.join(folderName, newName)) except Exception as e: print ("failed to download one pages with url of " + str(url)) # basic funciton to get id list
def _download(url, imageName, folderName): session = _setupSession() try: # time out is another parameter tuned # fit for the network about 10Mb image = session.get(url, timeout = 5) with open(os.path.join(folderName, imageName),'wb') as fout: fout.write(image.content) fileExtension = imghdr.what(os.path.join(folderName, imageName)) if fileExtension is None: os.remove(os.path.join(folderName, imageName)) else: newName = imageName + '.' + str(fileExtension) os.rename(os.path.join(folderName, imageName), os.path.join(folderName, newName)) except Exception as e: print ("failed to download one pages with url of " + str(url)) # wrapper for map function
def Set(self,key,data): path = self._GetPath(key) directory = os.path.dirname(path) if not os.path.exists(directory): os.makedirs(directory) if not os.path.isdir(directory): raise _FileCacheError('%s exists but is not a directory' % directory) temp_fd, temp_path = tempfile.mkstemp() temp_fp = os.fdopen(temp_fd, 'w') temp_fp.write(data) temp_fp.close() if not path.startswith(self._root_directory): raise _FileCacheError('%s does not appear to live under %s' % (path, self._root_directory)) if os.path.exists(path): os.remove(path) os.rename(temp_path, path)
def setUp(self): test_configuration = \ '''DATA_DIRECTORY: ~/ TODO_FILE: ~/footodo.txt TODO_START_TAG: '' TODO_STOP_TAG: '' ''' self.test_conf_file = os.path.expanduser(os.path.join('~', '.letsdo')) self.user_conf_bak = os.path.expanduser(os.path.join('~', '.letsdo.bak')) if os.path.exists(self.test_conf_file): # Backup user configuration file os.rename(self.test_conf_file, self.user_conf_bak) # Create test configuration file with open(self.test_conf_file, 'w') as fconf: fconf.write(test_configuration) self.conf = Configuration() if os.path.exists(self.conf.data_fullpath): os.remove(self.conf.data_fullpath) if os.path.exists(self.conf.task_fullpath): os.remove(self.conf.task_fullpath)
def prepare_inception_data(o_dir, i_dir): if not os.path.exists(o_dir): os.makedirs(o_dir) cnt = 0 bar = progressbar.ProgressBar(redirect_stdout=True, max_value=progressbar.UnknownLength) for root, subFolders, files in os.walk(i_dir): if files: for f in files: if 'jpg' in f: f_name = str(cnt) + '_ins.' + f.split('.')[-1] cnt += 1 file_dir = os.path.join(root, f) dest_path = os.path.join(o_dir, f) dest_new_name = os.path.join(o_dir, f_name) copy(file_dir, o_dir) os.rename(dest_path, dest_new_name) bar.update(cnt) bar.finish() print('Total number of files: {}'.format(cnt))
def download(url, target, expected_hash): if filesystem.calculate_hash(target) == expected_hash: return count = 0 while filesystem.calculate_hash(TEMP_FILE) != expected_hash: count += 1 if count > 5: os.remove(TEMP_FILE) raise OSError("Aborting download of %s after 5 unsuccessful attempts" % url) try: http_client.get(url).raise_for_status().download_to(TEMP_FILE) except OSError: pass # If it already exists the rename will fail try: os.remove(target) except OSError: pass os.rename(TEMP_FILE, target)
def get_pkg_dir(self, pkgname, pkgver, subdir): pkgdir = os.path.join(get_distnet_cache(), pkgname, pkgver) if not os.path.isdir(pkgdir): os.makedirs(pkgdir) target = os.path.join(pkgdir, subdir) if os.path.exists(target): return target (fd, tmp) = tempfile.mkstemp(dir=pkgdir) try: os.close(fd) self.download_to_file(pkgname, pkgver, subdir, tmp) if subdir == REQUIRES: os.rename(tmp, target) else: self.extract_tar(subdir, pkgdir, tmp) finally: try: os.remove(tmp) except OSError: pass return target
def create_seed_and_test_random(factor, start_id): # Only use 1/factor of the crop images # for example there are 10000 crops and a factor of 100 #then only 100 of them would be the random seed and test images. # A factor of 0 would be 100% # This should be changed to percent! crops = [] image_ids = [] for filename in glob.iglob(crop_dir + '*.png'): crops.append(filename) for filename in crops: renamed = filename.replace("_", "") image_id = int(renamed.replace('.png', '').replace('/home/pkrush/cents/', '')) if image_id < start_id: continue renamed = crop_dir + str(image_id) + '.png' os.rename(filename, renamed) rand_int = random.randint(0, factor) if rand_int == 0: image_ids.append(image_id) pickle.dump(image_ids, open(data_dir + 'seed_image_ids.pickle', "wb")) pickle.dump(image_ids, open(data_dir + 'test_image_ids.pickle', "wb"))
def rander(self): writers = os.listdir(conf.train_path) for writer in writers: test_writer_path = '{:s}/{:s}'.format(conf.test_path, writer) train_writer_path = '{:s}/{:s}'.format(conf.train_path, writer) if not os.path.isdir(train_writer_path): continue # make sure path: {test_path}/{writer} exist if not os.path.exists(test_writer_path): os.mkdir(test_writer_path) # move train file as test file files = os.listdir('{:s}/{:s}'.format(conf.train_path, writer)) for file in files: # (0, split) to move train file => test file if random.random() < self.split: os.rename('{:s}/{:s}'.format(train_writer_path, file), '{:s}/{:s}'.format(test_writer_path, file)) return self
def recover(self): writers = os.listdir(conf.test_path) for writer in writers: test_writer_path = '{:s}/{:s}'.format(conf.test_path, writer) train_writer_path = '{:s}/{:s}'.format(conf.train_path, writer) if not os.path.isdir(train_writer_path): continue # make sure path: {train_path}/{writer} exist if not os.path.exists(train_writer_path): os.mkdir(train_writer_path) # move test file to train file files = os.listdir('{:s}/{:s}'.format(conf.test_path, writer)) for file in files: os.rename('{:s}/{:s}'.format(test_writer_path, file), '{:s}/{:s}'.format(train_writer_path, file)) return self
def build_identify(self, writer): X_list = [] dict_map = {} train_writer_path = '{:s}/{:s}'.format(conf.train_path, writer) files = os.listdir(train_writer_path) for file in files: tmp_path = '{:s}/{:s}'.format(train_writer_path, file) img_data = self.read(tmp_path) if img_data is None: continue X_list.append(img_data) letter = self.predict(np.array(img_data))[0] if letter not in dict_map: dict_map[letter] = 0 else: dict_map[letter] += 1 print('=> rename: {:s} => {:s}_{:d}.jpg'.format(file, letter, dict_map[letter])) os.rename(tmp_path, '{:s}/{:s}_{:d}.jpg'.format(train_writer_path, letter, dict_map[letter])) return self
def write_file(path, fcont, uid=None, gid=None, atomic=False): if not atomic: return _write_file(path, fcont, uid, gid) tmp_path = '{path}._tmp_.{pid}_{timestamp}'.format( path=path, pid=os.getpid(), timestamp=timeutil.ns(), ) _write_file(tmp_path, fcont, uid, gid) try: os.rename(tmp_path, path) except EnvironmentError: os.remove(tmp_path) raise
def removeall_clicked(self, button, store): """ @description: Same as the past function but remove all lines of the treeview """ # if there is still an entry in the model old = expanduser('~') + '/.config/mama/mama.xml' new = expanduser('~') + '/.config/mama/.mama.bak' if os.path.exists(old): os.rename(old, new) if len(store) != 0: # remove all the entries in the model self.labelState.set_text('Remove all commands') for i in range(len(store)): iter = store.get_iter(0) store.remove(iter) self.saveTree(store) print("Empty list")
def set_docker_compose_version_to(dir, repo_docker, tag): """Modifies docker-compose files in the given directory so that repo_docker image points to the given tag.""" compose_files = docker_compose_files_list(dir) for filename in compose_files: old = open(filename) new = open(filename + ".tmp", "w") for line in old: # Replace build tag with a new one. line = re.sub(r"^(\s*image:\s*mendersoftware/%s:)\S+(\s*)$" % re.escape(repo_docker), r"\g<1>%s\2" % tag, line) new.write(line) new.close() old.close() os.rename(filename + ".tmp", filename)
def save_yaml(self, filename: str=None): """ Serialize the hierarchy to a file. :param filename: Target filename, autogenerated if None """ if filename is None: filename = self.config_filename with tempfile.NamedTemporaryFile('w', dir=os.path.dirname(filename), delete=False) as temp: header = self._yaml_header() if header is not None: temp.write(header) yaml.round_trip_dump(self, stream=temp) tempname = temp.name os.rename(tempname, filename) if filename in self.__class__._yaml_cache: del self.__class__._yaml_cache[filename] # YAML library configuration
def download_celeb_a(dirpath): data_dir = 'celebA' if os.path.exists(os.path.join(dirpath, data_dir)): print('Found Celeb-A - skip') return filename, drive_id = "img_align_celeba.zip", "0B7EVK8r0v71pZjFTYXZWM3FlRnM" save_path = os.path.join(dirpath, filename) if os.path.exists(save_path): print('[*] {} already exists'.format(save_path)) else: download_file_from_google_drive(drive_id, save_path) zip_dir = '' with zipfile.ZipFile(save_path) as zf: zip_dir = zf.namelist()[0] zf.extractall(dirpath) os.remove(save_path) os.rename(os.path.join(dirpath, zip_dir), os.path.join(dirpath, data_dir))