The following code examples, extracted from open-source Python projects, illustrate how to use os.symlink().
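Before the project examples, a minimal sketch of the call itself may help: os.symlink(src, dst) creates a new symbolic link at dst that points to src, and raises OSError with errno EEXIST when dst already exists. The paths below are hypothetical, and the remove-then-relink handling simply mirrors a pattern several of the examples use; on Windows, creating symlinks may additionally require elevated privileges.

import errno
import os

target_path = "/tmp/example_target.txt"   # hypothetical target
link_path = "/tmp/example_link.txt"       # hypothetical link location

try:
    # dst (link_path) becomes a symlink pointing to src (target_path)
    os.symlink(target_path, link_path)
except OSError as err:
    if err.errno == errno.EEXIST:
        # Something already lives at link_path; replace it with a fresh link.
        os.remove(link_path)
        os.symlink(target_path, link_path)
    else:
        raise

print(os.readlink(link_path))  # -> /tmp/example_target.txt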
def src_proc_dispatcher(pkg_name, src_tbl_name, src_loc):
    tobj = tempfile.mkdtemp(dir='/var/cache/acbs/build/', prefix='acbs.')
    src_tbl_loc = os.path.join(src_loc, src_tbl_name)
    shadow_ark_loc = os.path.join(tobj, src_tbl_name)
    if os.path.isdir(src_tbl_loc):
        print('[I] Making a copy of the source directory...', end='')
        try:
            shutil.copytree(src=src_tbl_loc, dst=shadow_ark_loc)
        except:
            print('Failed!')
            return False
        print('Done!')
        return True, tobj
    else:
        os.symlink(src_tbl_loc, shadow_ark_loc)
        # print('[D] Source location: {}, Shadow link: {}'.format(src_tbl_loc, shadow_ark_loc))
        return decomp_file(shadow_ark_loc, tobj), tobj
def makelink(self, tarinfo, targetpath):
    """Make a (symbolic) link called targetpath. If it cannot be created
       (platform limitation), we try to make a copy of the referenced file
       instead of a link.
    """
    try:
        # For systems that support symbolic and hard links.
        if tarinfo.issym():
            os.symlink(tarinfo.linkname, targetpath)
        else:
            # See extract().
            if os.path.exists(tarinfo._link_target):
                os.link(tarinfo._link_target, targetpath)
            else:
                self._extract_member(self._find_link_target(tarinfo),
                                     targetpath)
    except symlink_exception:
        try:
            self._extract_member(self._find_link_target(tarinfo),
                                 targetpath)
        except KeyError:
            raise ExtractError("unable to resolve link inside archive")
def makelink(self, tarinfo, targetpath):
    """Make a (symbolic) link called targetpath. If it cannot be created
       (platform limitation), we try to make a copy of the referenced file
       instead of a link.
    """
    if hasattr(os, "symlink") and hasattr(os, "link"):
        # For systems that support symbolic and hard links.
        if tarinfo.issym():
            if os.path.lexists(targetpath):
                os.unlink(targetpath)
            os.symlink(tarinfo.linkname, targetpath)
        else:
            # See extract().
            if os.path.exists(tarinfo._link_target):
                if os.path.lexists(targetpath):
                    os.unlink(targetpath)
                os.link(tarinfo._link_target, targetpath)
            else:
                self._extract_member(self._find_link_target(tarinfo),
                                     targetpath)
    else:
        try:
            self._extract_member(self._find_link_target(tarinfo),
                                 targetpath)
        except KeyError:
            raise ExtractError("unable to resolve link inside archive")
def execute(self):
    from ranger.container.file import File
    new_path = self.rest(1)
    cf = self.fm.thisfile

    if not new_path:
        return self.fm.notify('Syntax: relink <newpath>', bad=True)

    if not cf.is_link:
        return self.fm.notify('%s is not a symlink!' % cf.relative_path, bad=True)

    if new_path == os.readlink(cf.path):
        return

    try:
        os.remove(cf.path)
        os.symlink(new_path, cf.path)
    except OSError as err:
        self.fm.notify(err)

    self.fm.reset()
    self.fm.thisdir.pointed_obj = cf
    self.fm.thisfile = cf
def copytree(src, dst, symlinks=False, ignore=None):
    if not os.path.exists(dst):
        os.makedirs(dst)
        shutil.copystat(src, dst)
    lst = os.listdir(src)
    if ignore:
        excl = ignore(src, lst)
        lst = [x for x in lst if x not in excl]
    for item in lst:
        s = os.path.join(src, item)
        d = os.path.join(dst, item)
        if symlinks and os.path.islink(s):
            if os.path.lexists(d):
                os.remove(d)
            os.symlink(os.readlink(s), d)
            try:
                st = os.lstat(s)
                mode = stat.S_IMODE(st.st_mode)
                os.lchmod(d, mode)
            except:
                pass  # lchmod not available
        elif os.path.isdir(s):
            copytree(s, d, symlinks, ignore)
        else:
            shutil.copy2(s, d)
def setUp(self):
    self.template_env = jinja2.Environment(
        loader=jinja2.FileSystemLoader("config")
    )

    # create working dir
    self.working_dir = os.path.join(build_path + "run", self.id())
    if os.path.exists(self.working_dir):
        shutil.rmtree(self.working_dir)
    os.makedirs(self.working_dir)

    try:
        # update the last_run link
        if os.path.islink(build_path + "last_run"):
            os.unlink(build_path + "last_run")
        os.symlink(build_path + "run/{}".format(self.id()),
                   build_path + "last_run")
    except:
        # symlink is best effort and can fail when
        # running tests in parallel
        pass
def clone(self, name, user):
    """
    create a clone of self with a given name, owned by user
    """
    self.expiration = None
    # create the branch on the database
    new_branch = Branch(name, self.project, self, user)
    db.session.add(new_branch)
    db.session.commit()
    # clone repository in file system
    branch_path = os.path.abspath(join(os.getcwd(), 'repos',
                                       self.project.name, name, 'source'))
    self.get_repo().clone(branch_path, branch=self.name)
    os.symlink(os.path.abspath(join('repos', self.project.name,
                                    '_resources/low_resolution')),
               join(branch_path, '_resources'))
    branch_repo = git.Repo(branch_path)
    branch_repo.git.checkout('HEAD', b=name)
    config_repo(branch_repo, user.username, user.email)
    # build the source
    new_branch.build(timeout=60)
    return new_branch
def mk_tar_dir(issuer, test_profile):
    wd = os.getcwd()

    # Make sure there is a tar directory
    tardirname = wd
    for part in ["tar", issuer, test_profile]:
        tardirname = os.path.join(tardirname, part)
        if not os.path.isdir(tardirname):
            os.mkdir(tardirname)

    # Now walk through the log directory and make symlinks from
    # the log files to links in the tar directory
    logdirname = os.path.join(wd, "log", issuer, test_profile)
    for item in os.listdir(logdirname):
        if item.startswith("."):
            continue

        ln = os.path.join(logdirname, item)
        tn = os.path.join(tardirname, "{}.txt".format(item))
        if os.path.isfile(tn):
            os.unlink(tn)
        if not os.path.islink(tn):
            os.symlink(ln, tn)
def close_compressed(filename, hdf5_file, compression_type='bz2', create_link=False):
    """Closes the compressed hdf5_file that was opened with open_compressed.

    When the file was opened for writing (using the 'w' flag in open_compressed),
    the created HDF5 file is compressed into the given file name.
    To be able to read the data using the real tools, a link with the correct
    extension is created when create_link is set to True.
    """
    hdf5_file_name = hdf5_file.filename
    is_writable = hdf5_file.writable
    hdf5_file.close()

    if is_writable:
        # create compressed tar file
        tar = tarfile.open(filename, mode="w:" + compression_type)
        tar.add(hdf5_file_name, os.path.basename(filename))
        tar.close()

    if create_link:
        extension = {'': '.tar', 'bz2': '.tar.bz2', 'gz': '.tar.gz'}[compression_type]
        link_file = filename + extension
        if not os.path.exists(link_file):
            os.symlink(os.path.basename(filename), link_file)

    # clean up locally generated files
    os.remove(hdf5_file_name)
def reorganize(dataset_dir):
    dirs = {}
    dirs['trainA'] = os.path.join(dataset_dir, 'link_trainA')
    dirs['trainB'] = os.path.join(dataset_dir, 'link_trainB')
    dirs['testA'] = os.path.join(dataset_dir, 'link_testA')
    dirs['testB'] = os.path.join(dataset_dir, 'link_testB')
    mkdir(dirs.values())
    for key in dirs:
        try:
            os.remove(os.path.join(dirs[key], '0'))
        except:
            pass
        os.symlink(os.path.abspath(os.path.join(dataset_dir, key)),
                   os.path.join(dirs[key], '0'))
    return dirs
def build_same_files_and_dirs(tmpdir, source_is_symlink, dest_is_symlink, use_files):
    """Build temporary files or directories to test indication of same source and destination.

    :param bool source_is_symlink: Should the source be a symlink to the destination
        (both cannot be True)
    :param bool dest_is_symlink: Should the destination be a symlink to the source
        (both cannot be True)
    :param bool use_files: Should files be created (if False, directories are used instead)
    """
    if use_files:
        real = tmpdir.join('real')
        real.write('some data')
    else:
        real = tmpdir.mkdir('real')
    link = tmpdir.join('link')

    if source_is_symlink or dest_is_symlink:
        os.symlink(str(real), str(link))

    if source_is_symlink:
        return str(link), str(real)
    elif dest_is_symlink:
        return str(real), str(link)
    return str(real), str(real)
def test_process_single_file_destination_is_symlink_to_source(
        tmpdir, patch_process_single_operation, standard_handler
):
    source = tmpdir.join('source')
    source.write('some data')
    destination = str(tmpdir.join('destination'))
    os.symlink(str(source), destination)

    with patch('aws_encryption_sdk_cli.internal.io_handling.open', create=True) as mock_open:
        standard_handler.process_single_file(
            stream_args=sentinel.stream_args,
            source=str(source),
            destination=destination
        )

    assert not mock_open.called
    assert not patch_process_single_operation.called
def test_file_to_file_cycle_target_through_symlink(tmpdir):
    plaintext = tmpdir.join('source_plaintext')
    output_dir = tmpdir.mkdir('output')
    os.symlink(str(output_dir), str(tmpdir.join('output_link')))
    ciphertext = tmpdir.join('output_link', 'ciphertext')
    decrypted = tmpdir.join('decrypted')
    with open(str(plaintext), 'wb') as f:
        f.write(os.urandom(1024))

    encrypt_args = encrypt_args_template().format(
        source=str(plaintext),
        target=str(ciphertext)
    )
    decrypt_args = decrypt_args_template().format(
        source=str(ciphertext),
        target=str(decrypted)
    )

    aws_encryption_sdk_cli.cli(shlex.split(encrypt_args, posix=not is_windows()))
    aws_encryption_sdk_cli.cli(shlex.split(decrypt_args, posix=not is_windows()))

    assert filecmp.cmp(str(plaintext), str(decrypted))
def render(self):
    # Render all instances
    for instance in self.instances:
        instance.render()

    # Render human readable links to each instance
    for instance in self.instances:
        try:
            os.symlink(instance.node_root,
                       os.path.join(self.fixture_root, instance.instance_name))
        except Exception:
            log.exception('Failed to create symlink to %s', instance.instance_name)

    # Render the master control script
    self.write_file(self.fixture_control, self.render_control(), perm=0o755)

    # Render the fixture specification file
    self.write_file(self.fixture_spec, self.render_spec())
def run(self):
    venv_path = os.path.join(os.path.dirname(os.path.abspath(__file__)),
                             'venv', 'cthulhu')
    venv_cmd = ['virtualenv', venv_path]
    print('Creating virtual environment in ', venv_path)
    subprocess.check_call(venv_cmd)
    print('Linking `activate` to top level of project.\n')
    print('To activate, simply run `source activate`.')
    try:
        os.symlink(
            os.path.join(venv_path, 'bin', 'activate'),
            os.path.join(os.path.dirname(os.path.abspath(__file__)), 'activate')
        )
    except OSError:
        print('Unable to create symlink, you may have a stale symlink from a previous invocation.')
def symlink_ms(source, linkname):
    """Python 2 doesn't have os.symlink on Windows so we do it ourselves

    :param source: sourceFile
    :type source: str
    :param linkname: symlink path
    :type linkname: str
    :raises: WindowsError, raises when it fails to create the symlink
        if the user permissions are incorrect
    """
    import ctypes
    csl = ctypes.windll.kernel32.CreateSymbolicLinkW
    csl.argtypes = (ctypes.c_wchar_p, ctypes.c_wchar_p, ctypes.c_uint32)
    csl.restype = ctypes.c_ubyte
    flags = 1 if os.path.isdir(source) else 0
    try:
        if csl(linkname, source.replace('/', '\\'), flags) == 0:
            raise ctypes.WinError()
    except WindowsError:
        raise WindowsError("Failed to create symbolicLink due to user permissions")
def symlinks_supported():
    """
    A function to check if creating symlinks are supported in the
    host platform and/or if they are allowed to be created (e.g.
    on Windows it requires admin permissions).
    """
    tmpdir = tempfile.mkdtemp()
    original_path = os.path.join(tmpdir, 'original')
    symlink_path = os.path.join(tmpdir, 'symlink')
    os.makedirs(original_path)
    try:
        os.symlink(original_path, symlink_path)
        supported = True
    except (OSError, NotImplementedError, AttributeError):
        supported = False
    else:
        os.remove(symlink_path)
    finally:
        os.rmdir(original_path)
        os.rmdir(tmpdir)
    return supported
def _find_link_target(self, tarinfo):
    """Find the target member of a symlink or hardlink member in the
       archive.
    """
    if tarinfo.issym():
        # Always search the entire archive.
        linkname = os.path.dirname(tarinfo.name) + "/" + tarinfo.linkname
        limit = None
    else:
        # Search the archive before the link, because a hard link is
        # just a reference to an already archived file.
        linkname = tarinfo.linkname
        limit = tarinfo

    member = self._getmember(linkname, tarinfo=limit, normalize=True)
    if member is None:
        raise KeyError("linkname %r not found" % linkname)
    return member
def enable_sv(sv, sv_dir, runsvdir):
    """
    Enable the specified service, 'sv'.

    Assemble 'sv_path' and 'service_path' from 'sv', 'sv_dir', and
    'runsvdir'. Make a symlink from 'sv_path' to 'service_path' and bail
    if we get one of a couple different exceptions.
    """
    sv_path = os.path.join(sv_dir, sv)
    service_path = os.path.join(runsvdir, sv)
    try:
        check_sv_path(sv_path)
        os.symlink(sv_path, service_path)
        return True
    except NoSuchSvError:
        raise NoSuchSvError
    except PermissionError:
        raise NeedSudoError
    except FileExistsError:
        raise SvAlreadyEnabledError
def _find_link_target(self, tarinfo):
    """Find the target member of a symlink or hardlink member in the
       archive.
    """
    if tarinfo.issym():
        # Always search the entire archive.
        linkname = "/".join(filter(None, (os.path.dirname(tarinfo.name), tarinfo.linkname)))
        limit = None
    else:
        # Search the archive before the link, because a hard link is
        # just a reference to an already archived file.
        linkname = tarinfo.linkname
        limit = tarinfo

    member = self._getmember(linkname, tarinfo=limit, normalize=True)
    if member is None:
        raise KeyError("linkname %r not found" % linkname)
    return member
def link(repo):
    cheat_dir = get_cheat_path()
    sheet_dir = get_sheet_path(repo)
    if not os.path.isdir(sheet_dir):
        raise CheatExtException(
            "%s hadn't been installed yet at %s" % (repo, sheet_dir))

    sheets = get_available_sheets_at(sheet_dir)
    state_sheets = get_sheets_with_state(cheat_dir, sheet_dir, sheets)
    _check_sheets_availability(state_sheets)
    for sheet, _ in filter_by_state(STATE_UNLINK, state_sheets):
        os.symlink(
            os.path.join(sheet_dir, sheet),
            os.path.join(cheat_dir, sheet))
def gen_stream(self, i):
    cwd = os.getcwd() + '/'
    res, prob_id = self.gen_data(i)
    if res == None or prob_id == None:
        return
    path = os.path.join('stream', prob_id)
    path2 = os.path.join('json', 'todo')
    path2 = os.path.join(d[prob_id], path2)
    data = concat_data(res)
    md5 = hashlib.md5(data).hexdigest()
    outfname = os.path.join(path, self.basename + '_' + md5 + '.json')
    if (md5 + '.json') in fset[prob_id]:
        logging.info('Skip %s due to same' % md5)
        return
    logging.info('Save Packet Stream: %s' % outfname)
    fset[prob_id].add(md5 + '.json')
    f = file(outfname, 'w')
    json.dump(res, f)
    outfname2 = os.path.join(path2, self.basename + '_' + md5 + '.json')
    outfname = cwd + outfname
    outfname2 = cwd + outfname2
    os.symlink(outfname, outfname2)
def link_to_tutorials():
    # Linking to the directory does not work well with
    # nbsphinx. We link to the files themselves
    from glob import glob
    from plotnine_examples.tutorials import TUTPATH

    dest_dir = os.path.join(CUR_PATH, 'tutorials')

    # Unlink files from previous build
    for old_file in glob(dest_dir + '/*.ipynb'):
        os.unlink(old_file)

    # Link files for this build
    for file in glob(TUTPATH + '/*.ipynb'):
        basename = os.path.basename(file)
        dest = os.path.join(dest_dir, basename)
        os.symlink(file, dest)
def config_mv(args):
    """[Not Supported Yet] Relocate config DB from its current location

    :return: None for success, string for error
    """
    if not args.force:
        return err_out(DB_REF + " move to {} ".format(args.to) +
                       "requires '--force' flag to execute the request.")
    # TODO:
    # this is pure convenience code, so it is very low priority; still, here are the steps:
    # checks if target exists upfront, and fail if it does
    # cp the DB instance 'to', and flip the symlink.
    # refresh service (not really needed as next vmci_command handlers will pick it up)
    # need --dryrun or --confirm
    # issue: works really with discovery only, as others need to find it out
    printMessage(args.output_format,
                 "Sorry, configuration move ('config mv' command) is not supported yet")
    return None
def safe_make_symlink(input_file_path, output_file_path):
    output_file_dir = os.path.dirname(output_file_path)
    # Verify the input file is actually there
    if not os.path.exists(input_file_path):
        raise Exception("can't find file %s" % input_file_path)
    safe_make_dirs(output_file_dir)
    try:
        os.symlink(input_file_path, output_file_path)
    except OSError as err:
        if err.errno == errno.EEXIST:
            # link already exists, check that it is identical to the one we are trying to put down
            old = os.path.realpath(input_file_path)
            new = os.path.realpath(output_file_path)
            if old != new:
                raise Exception('Existing file is different than the new symlink')
        else:
            raise
def pipetteSynchronousRunner(args):
    full_script_path = os.path.abspath(args[0])
    scriptDir = os.path.dirname(full_script_path)
    communicationDirBase = args[1]
    fh_outdir = args[2]
    pipelineCmdStr = ' '.join(args[3:])

    if os.path.exists(communicationDirBase):
        fns = os.listdir(communicationDirBase)
        if len(fns) > 0:
            raise Exception('For running single pipelines, comm_dir must start empty')
    if communicationDirBase not in pipelineCmdStr:
        raise Exception('Pipeline script must accept the comm_dir as an argument')

    os.symlink(fh_outdir, os.path.join(communicationDirBase, 'firehose_outdir'))

    # run the pipeline, write its description to communicationDirBase/launch
    subprocess.check_call(pipelineCmdStr, shell=True)

    runMode = 'runone'
    retryMode = 'False'
    main = pipetteServer.Main()
    main.run_server(communicationDirBase, scriptDir, runMode, retryMode)
    # if one or more pipelines fail, an exception will be thrown once all that can be run has been attempted.
def add_gstreamer_packages():
    import os
    import sys
    from distutils.sysconfig import get_python_lib

    dest_dir = get_python_lib()
    packages = ['gobject', 'glib', 'pygst', 'pygst.pyc', 'pygst.pth',
                'gst-0.10', 'pygtk.pth', 'pygtk.py', 'pygtk.pyc']

    python_version = sys.version[:3]
    global_path = os.path.join('/usr/lib', 'python' + python_version)
    global_sitepackages = [os.path.join(global_path, 'dist-packages'),  # for Debian-based
                           os.path.join(global_path, 'site-packages')]  # for others

    for package in packages:
        for pack_dir in global_sitepackages:
            src = os.path.join(pack_dir, package)
            dest = os.path.join(dest_dir, package)
            if not os.path.exists(dest) and os.path.exists(src):
                os.symlink(src, dest)
def manually_disable_aa_profile(self):
    """
    Manually disable an apparmor profile.

    If aa-profile-mode is set to disabled (default) this is required as
    the template has been written but apparmor is yet unaware of the
    profile and aa-disable aa-profile fails. Without this the profile
    would kick into enforce mode on the next service restart.
    """
    profile_path = '/etc/apparmor.d'
    disable_path = '/etc/apparmor.d/disable'
    if not os.path.lexists(os.path.join(disable_path, self.aa_profile)):
        os.symlink(os.path.join(profile_path, self.aa_profile),
                   os.path.join(disable_path, self.aa_profile))
def acquire(self, timeout=None):
    # Hopefully unnecessary for symlink.
    # try:
    #     open(self.unique_name, "wb").close()
    # except IOError:
    #     raise LockFailed("failed to create %s" % self.unique_name)
    timeout = timeout if timeout is not None else self.timeout
    end_time = time.time()
    if timeout is not None and timeout > 0:
        end_time += timeout

    while True:
        # Try and create a symbolic link to it.
        try:
            os.symlink(self.unique_name, self.lock_file)
        except OSError:
            # Link creation failed.  Maybe we've double-locked?
            if self.i_am_locking():
                # Linked to our unique name. Proceed.
                return
            else:
                # Otherwise the lock creation failed.
                if timeout is not None and time.time() > end_time:
                    if timeout > 0:
                        raise LockTimeout("Timeout waiting to acquire"
                                          " lock for %s" % self.path)
                    else:
                        raise AlreadyLocked("%s is already locked" % self.path)
                time.sleep(timeout / 10 if timeout is not None else 0.1)
        else:
            # Link creation succeeded.  We're good to go.
            return
def makelink(self, tarinfo, targetpath):
    """Make a (symbolic) link called targetpath. If it cannot be created
       (platform limitation), we try to make a copy of the referenced file
       instead of a link.
    """
    try:
        # For systems that support symbolic and hard links.
        if tarinfo.issym():
            os.symlink(tarinfo.linkname, targetpath)
        else:
            # See extract().
            if os.path.exists(tarinfo._link_target):
                os.link(tarinfo._link_target, targetpath)
            else:
                self._extract_member(self._find_link_target(tarinfo),
                                     targetpath)
    except symlink_exception:
        if tarinfo.issym():
            linkpath = os.path.join(os.path.dirname(tarinfo.name),
                                    tarinfo.linkname)
        else:
            linkpath = tarinfo.linkname
    else:
        try:
            self._extract_member(self._find_link_target(tarinfo),
                                 targetpath)
        except KeyError:
            raise ExtractError("unable to resolve link inside archive")
def move(src, dst):
    """Recursively move a file or directory to another location. This is
    similar to the Unix "mv" command.

    If the destination is a directory or a symlink to a directory, the source
    is moved inside the directory. The destination path must not already
    exist.

    If the destination already exists but is not a directory, it may be
    overwritten depending on os.rename() semantics.

    If the destination is on our current filesystem, then rename() is used.
    Otherwise, src is copied to the destination and then removed.
    A lot more could be done here...  A look at a mv.c shows a lot of
    the issues this implementation glosses over.

    """
    real_dst = dst
    if os.path.isdir(dst):
        if _samefile(src, dst):
            # We might be on a case insensitive filesystem,
            # perform the rename anyway.
            os.rename(src, dst)
            return

        real_dst = os.path.join(dst, _basename(src))
        if os.path.exists(real_dst):
            raise Error("Destination path '%s' already exists" % real_dst)
    try:
        os.rename(src, real_dst)
    except OSError:
        if os.path.isdir(src):
            if _destinsrc(src, dst):
                raise Error("Cannot move a directory '%s' into itself '%s'." % (src, dst))
            copytree(src, real_dst, symlinks=True)
            rmtree(src)
        else:
            copy2(src, real_dst)
            os.unlink(src)
def copyfile(src, dst, *, follow_symlinks=True):
    """Copy data from src to dst.

    If follow_symlinks is not set and src is a symbolic link, a new
    symlink will be created instead of copying the file it points to.

    """
    if _samefile(src, dst):
        raise SameFileError("{!r} and {!r} are the same file".format(src, dst))

    for fn in [src, dst]:
        try:
            st = os.stat(fn)
        except OSError:
            # File most likely does not exist
            pass
        else:
            # XXX What about other special files? (sockets, devices...)
            if stat.S_ISFIFO(st.st_mode):
                raise SpecialFileError("`%s` is a named pipe" % fn)

    if not follow_symlinks and os.path.islink(src):
        os.symlink(os.readlink(src), dst)
    else:
        with open(src, 'rb') as fsrc:
            with open(dst, 'wb') as fdst:
                copyfileobj(fsrc, fdst)
    return dst
def test_ValgrindOption(self):
    # Make sure that valgrind exists and is in the path
    valgrind = scatest.which('valgrind')
    if not valgrind:
        raise RuntimeError('Valgrind is not installed')

    # Let the device manager find valgrind on the path
    self._test_Valgrind('')

    # Set an explicit path to valgrind, using a symbolic link to a non-path
    # location as an additional check
    altpath = os.path.join(scatest.getSdrPath(), 'valgrind')
    os.symlink(valgrind, altpath)

    # patch for ubuntu valgrind script
    ub_patch = False
    try:
        if 'UBUNTU' in platform.linux_distribution()[0].upper():
            ub_patch = True
            valgrind_bin = scatest.which('valgrind.bin')
            os.symlink(valgrind_bin, altpath + '.bin')
    except:
        pass

    try:
        self._test_Valgrind(altpath)
    finally:
        os.unlink(altpath)
        if ub_patch:
            os.unlink(altpath + '.bin')
def updateLink(source, target):
    if os.path.islink(target):
        # Remove old, possibly stale link.
        os.unlink(target)
    if not os.path.exists(target):
        # Do not replace existing files.
        os.symlink(source, target)
def symlink(self, source: str, dest: str, **kwargs):
    os.symlink(self.absolute(source), self.absolute(dest), **kwargs)
def relative_symlink(src, dst, force=False):
    """
    Create a symbolic link in any directory.

    force -- if True, then overwrite an existing file/symlink
    """
    if force:
        try:
            os.remove(dst)
        except FileNotFoundError:
            pass
    target = os.path.relpath(os.path.abspath(src), start=os.path.dirname(dst))
    os.symlink(target, dst)
def move(src, dst):
    """Recursively move a file or directory to another location. This is
    similar to the Unix "mv" command.

    If the destination is a directory or a symlink to a directory, the source
    is moved inside the directory. The destination path must not already
    exist.

    If the destination already exists but is not a directory, it may be
    overwritten depending on os.rename() semantics.

    If the destination is on our current filesystem, then rename() is used.
    Otherwise, src is copied to the destination and then removed.
    A lot more could be done here...  A look at a mv.c shows a lot of
    the issues this implementation glosses over.

    """
    real_dst = dst
    if os.path.isdir(dst):
        if _samefile(src, dst):
            # We might be on a case insensitive filesystem,
            # perform the rename anyway.
            os.rename(src, dst)
            return

        real_dst = os.path.join(dst, _basename(src))
        if os.path.exists(real_dst):
            raise Error, "Destination path '%s' already exists" % real_dst
    try:
        os.rename(src, real_dst)
    except OSError:
        if os.path.isdir(src):
            if _destinsrc(src, dst):
                raise Error, "Cannot move a directory '%s' into itself '%s'." % (src, dst)
            copytree(src, real_dst, symlinks=True)
            rmtree(src)
        else:
            copy2(src, real_dst)
            os.unlink(src)