Python sh 模块,rm() 实例源码

我们从Python开源项目中,提取了以下19个代码示例,用于说明如何使用sh.rm()

项目:tools    作者:apertoso    | 项目源码 | 文件源码
def restore_attachments(self, zipfile, docker=False):
        """Unpack the ``filestore/`` entries of a backup archive.

        The files end up in ``<data_dir>/filestore/<target_db>``.

        :param zipfile: path of the archive handed to ``unzip``
        :param docker: when true, recursively chown ``self.data_dir`` to
            ``999:999`` after the files have been restored
        """
        unzip = sh.unzip.bake('-x', '-qq', '-n')
        restore_folder = os.path.join(self.data_dir,
                                      'filestore',
                                      self.target_db)
        sh.mkdir('-p', restore_folder)
        # unzip places the files in <datadir>/filestore/<dbname>/filestore;
        # a temporary symlink from that path back to
        # <datadir>/filestore/<dbname> makes them wind up in the right spot.
        restore_folder_faulty = os.path.join(restore_folder, 'filestore')
        sh.ln('-s', restore_folder, restore_folder_faulty)
        try:
            unzip(zipfile, 'filestore/*', '-d', restore_folder)
        finally:
            # Remove the symlink even when unzip fails: a stale link would
            # make the `ln -s` of the next restore attempt fail.
            sh.rm(restore_folder_faulty)
        # When running in docker mode, change permissions
        if docker:
            sh.chown('-R', '999:999', self.data_dir)
项目:lbry-android    作者:lbryio    | 项目源码 | 文件源码
def prepare_build_dir(self):
        '''Populate the recipe's build dir from the bootstrap's ``build`` tree.

        The same directory is stored in ``self.build_dir`` and reused for
        every arch. When ``self.ctx.symlink_java_src`` is set, the copied
        ``src`` folder is replaced by one holding symlinks to the bootstrap
        sources. A ``project.properties`` file naming the Android API
        target is written last.'''
        self.build_dir = self.get_build_dir()
        bootstrap_build = join(self.bootstrap_dir, 'build')
        shprint(sh.cp, '-r', bootstrap_build, self.build_dir)

        if self.ctx.symlink_java_src:
            info('Symlinking java src instead of copying')
            copied_src = join(self.build_dir, 'src')
            bootstrap_src = join(self.bootstrap_dir, 'build', 'src')
            # Swap the copied sources for an empty dir that only holds links.
            shprint(sh.rm, '-r', copied_src)
            shprint(sh.mkdir, copied_src)
            for entry in listdir(bootstrap_src):
                shprint(sh.ln, '-s', join(bootstrap_src, entry), copied_src)

        with current_directory(self.build_dir), \
                open('project.properties', 'w') as props:
            props.write('target=android-{}'.format(self.ctx.android_api))
项目:ipbb    作者:ipbus    | 项目源码 | 文件源码
def cleanup(env):
    """Delete everything in the project area except its configuration file.

    The top-level sub-directories and files of ``env.projectPath`` are
    listed, the user is asked for confirmation, and the entries are then
    removed with ``rm``. Nothing is deleted when confirmation is refused.

    :param env: environment object exposing ``projectPath``
    """
    _, lSubdirs, lFiles = next(os.walk(env.projectPath))
    # The project configuration file must survive the clean-up; tolerate
    # its absence instead of failing with ValueError.
    if kProjAreaCfgFile in lFiles:
        lFiles.remove(kProjAreaCfgFile)

    if not click.confirm("All files in {} will be deleted. Do you want to continue?".format( env.projectPath )):
        return

    print (lSubdirs, lFiles)
    # The names returned by os.walk are relative to env.projectPath, so rm
    # has to run from there; otherwise entries of the current working
    # directory would be targeted. '--' guards names starting with '-'.
    if lSubdirs:
        sh.rm('-rv', '--', *lSubdirs, _out=sys.stdout, _cwd=env.projectPath)

    if lFiles:
        sh.rm('-v', '--', *lFiles, _out=sys.stdout, _cwd=env.projectPath)
# ------------------------------------------------------------------------------


# ------------------------------------------------------------------------------
项目:tools    作者:apertoso    | 项目源码 | 文件源码
def restore_db_docker(self, zipfile):
        """Pipe ``dump.sql`` from the archive into psql inside a container.

        A single shell pipeline is assembled: ``7z`` streams ``dump.sql``
        to stdout, and a throw-away ``postgres:<pgversion>`` container
        linked to ``self.db_host`` writes a ``~/.pgpass`` entry and feeds
        the stream to ``psql`` against ``self.target_db``.

        :param zipfile: path of the archive containing ``dump.sql``
        :raises subprocess.CalledProcessError: if the pipeline exits non-zero
        """
        host = self.db_host
        user = self.db_user
        version = self.pgversion
        template = (
            ''' /usr/bin/7z x -so {} dump.sql | '''
            ''' docker run -i --rm --link {}:{} postgres:{} '''
            ''' /bin/bash -c 'echo "{}:5432:*:{}:{}" '''
            ''' > ~/.pgpass; chmod 600 ~/.pgpass; '''
            ''' /usr/lib/postgresql/{}/bin/psql '''
            ''' -q -h {} -U {} {} > /dev/null' '''
        )
        # Values in the order the placeholders appear in the template.
        values = (
            zipfile,
            host, host, version,
            host, user, self.db_password,
            version, host, user, self.target_db,
        )
        # NOTE(review): the values are interpolated unquoted into a command
        # run with shell=True; paths or credentials containing shell
        # metacharacters would break (or alter) the pipeline.
        subprocess.check_call(template.format(*values), shell=True)
项目:rpl-attacks    作者:dhondta    | 项目源码 | 文件源码
def remove_files(path, *files):
    """
    Remove the given files from a folder using `rm`. An `rm` failure with
    exit status 1 (e.g. the file does not exist) is silently ignored.

    :param path: absolute or relative source path
    :param files: filenames of files to be removed
    """
    base = __expand_folders(path)
    for name in files:
        target = join(base, name)
        try:
            sh.rm(target)
        except sh.ErrorReturnCode_1:
            # best-effort removal: move on to the next file
            continue
项目:rpl-attacks    作者:dhondta    | 项目源码 | 文件源码
def remove_folder(path):
    """
    Recursively remove a folder using `rm -r`. An `rm` failure with exit
    status 1 (e.g. the folder does not exist) is silently ignored.

    :param path: absolute or relative source path
    """
    target = __expand_folders(path)
    try:
        sh.rm('-r', target)
    except sh.ErrorReturnCode_1:
        # best-effort removal: nothing else to do
        return
项目:qsctl    作者:yunify    | 项目源码 | 文件源码
def step_impl(context):
    """Assert each input object is gone from the bucket (HTTP 404) but
    present as a file under ``tmp/``, then remove ``tmp``."""
    for row in context.input:
        key = row["name"]
        remote_status = bucket.head_object(key).status_code
        assert_that(remote_status).is_equal_to(404)
        local_present = os.path.isfile("tmp/" + key)
        assert_that(local_present).is_equal_to(True)

    # wait for the removal to finish before the step returns
    cleanup = sh.rm("-rf", "tmp")
    cleanup.wait()
项目:qsctl    作者:yunify    | 项目源码 | 文件源码
def step_impl(context):
    """Assert the bucket's object keys equal the names in the step table
    (order-insensitive), then remove ``tmp``."""
    listing = bucket.list_objects()
    actual_keys = sorted(entry["key"] for entry in listing["keys"])
    expected_keys = sorted(row["name"] for row in context.table)
    assert_that(actual_keys).is_equal_to(expected_keys)

    sh.rm("-rf", "tmp")
项目:qsctl    作者:yunify    | 项目源码 | 文件源码
def step_impl(context):
    """Assert every name in the step table occurs in the output of
    ``ls tmp`` (substring match), then remove ``tmp``."""
    listing = sh.ls("tmp").stdout.decode("utf-8")
    # all() stops at the first missing name, like an explicit break
    everything_listed = all(row["name"] in listing for row in context.table)
    assert_that(everything_listed).is_equal_to(True)

    sh.rm("-rf", "tmp")
项目:qsctl    作者:yunify    | 项目源码 | 文件源码
def step_impl(context):
    """Assert every name in the step table occurs in the output of
    ``ls tmp`` (substring match), remove ``tmp``, then delete the same
    objects from the bucket."""
    listing = sh.ls("tmp").stdout.decode("utf-8")
    # all() stops at the first missing name, like an explicit break
    everything_listed = all(row["name"] in listing for row in context.table)
    assert_that(everything_listed).is_equal_to(True)

    sh.rm("-rf", "tmp")

    for row in context.table:
        key = row["name"]
        bucket.delete_object(key)
项目:qsctl    作者:yunify    | 项目源码 | 文件源码
def step_impl(context):
    """Assert each object named in the step table exists in the bucket
    (HTTP 200), then remove ``tmp``."""
    for row in context.table:
        remote_status = bucket.head_object(row["name"]).status_code
        assert_that(remote_status).is_equal_to(200)

    # wait for the removal to finish before the step returns
    cleanup = sh.rm("-rf", "tmp")
    cleanup.wait()
项目:lbry-android    作者:lbryio    | 项目源码 | 文件源码
def build_arch(self, arch):
        """Configure and build the library with CMake, then install it.

        Runs inside ``<build_dir>/libmysqlclient``: the recipe's
        ``p4a.cmake`` toolchain file is copied in, any cached CMake
        configuration is dropped, ``cmake`` and ``make`` are run with the
        recipe environment, and ``libmysql/libmysql.so`` is installed.

        :param arch: target architecture object (``arch.arch`` is its name)
        """
        env = self.get_recipe_env(arch)
        source_dir = join(self.get_build_dir(arch.arch), 'libmysqlclient')
        with current_directory(source_dir):
            toolchain_file = join(self.get_recipe_dir(), 'p4a.cmake')
            shprint(sh.cp, '-t', '.', toolchain_file)
            # shprint(sh.mkdir, 'Platform')
            # shprint(sh.cp, '-t', 'Platform', join(self.get_recipe_dir(), 'Linux.cmake'))

            # Force a fresh configure run.
            shprint(sh.rm, '-f', 'CMakeCache.txt')
            cmake_args = [
                '-G', 'Unix Makefiles',
                # '-DCMAKE_MODULE_PATH=' + join(self.get_build_dir(arch.arch), 'libmysqlclient'),
                '-DCMAKE_INSTALL_PREFIX=./install',
                '-DCMAKE_TOOLCHAIN_FILE=p4a.cmake',
            ]
            shprint(sh.cmake, *cmake_args, _env=env)
            shprint(sh.make, _env=env)

            built_lib = join('libmysql', 'libmysql.so')
            self.install_libs(arch, built_lib)

    # def get_recipe_env(self, arch=None):
    #   env = super(LibmysqlclientRecipe, self).get_recipe_env(arch)
    #   env['WITHOUT_SERVER'] = 'ON'
    #   ncurses = self.get_recipe('ncurses', self)
    #   # env['CFLAGS'] += ' -I' + join(ncurses.get_build_dir(arch.arch),
    #   #                               'include')
    #   env['CURSES_LIBRARY'] = join(self.ctx.get_libs_dir(arch.arch), 'libncurses.so')
    #   env['CURSES_INCLUDE_PATH'] = join(ncurses.get_build_dir(arch.arch),
    #                                     'include')
    #   return env
    # 
    # def build_arch(self, arch):
    #   env = self.get_recipe_env(arch)
    #   with current_directory(self.get_build_dir(arch.arch)):
    #       # configure = sh.Command('./configure')
    #       # TODO: should add openssl as an optional dep and compile support
    #       # shprint(configure, '--enable-shared', '--enable-assembler',
    #       #         '--enable-thread-safe-client', '--with-innodb',
    #       #         '--without-server', _env=env)
    #       # shprint(sh.make, _env=env)
    #       shprint(sh.cmake, '.', '-DCURSES_LIBRARY=' + env['CURSES_LIBRARY'],
    #               '-DCURSES_INCLUDE_PATH=' + env['CURSES_INCLUDE_PATH'], _env=env)
    #       shprint(sh.make, _env=env)
    # 
    #       self.install_libs(arch, 'libmysqlclient.so')
项目:lbry-android    作者:lbryio    | 项目源码 | 文件源码
def prebuild_arch(self, arch):
        """Run the parent prebuild step, then delete the bundled test
        packages from the source tree in the arch's build dir."""
        super(ZopeInterfaceRecipe, self).prebuild_arch(arch)
        test_dirs = (
            'src/zope/interface/tests',
            'src/zope/interface/common/tests',
        )
        build_dir = self.get_build_dir(arch.arch)
        with current_directory(build_dir):
            sh.rm('-rf', *test_dirs)
项目:lbry-android    作者:lbryio    | 项目源码 | 文件源码
def build_arch(self, arch):
        """Swap the bundled leveldb sources for a link to the leveldb
        recipe's build dir, then delegate to the parent build."""
        # Result is unused here; the call is kept exactly as in the
        # original in case it has side effects.
        env = self.get_recipe_env(arch)
        bundled_sources = ('leveldb', 'leveldb.egg-info', 'snappy')
        with current_directory(self.get_build_dir(arch.arch)):
            # Drop the sources shipped inside the pypi package ...
            sh.rm('-rf', *bundled_sources)
            # ... and point 'leveldb' at the output of the leveldb recipe.
            leveldb_recipe = self.get_recipe('leveldb', self.ctx)
            sh.ln('-s', leveldb_recipe.get_build_dir(arch.arch), 'leveldb')
        # Build and install python bindings
        super(PyLevelDBRecipe, self).build_arch(arch)
项目:lbry-android    作者:lbryio    | 项目源码 | 文件源码
def prebuild_arch(self, arch):
        """Move the unpacked JNI components into the build container dir.

        Every entry of ``<build_dir>/pygame_bootstrap_jni`` is moved into
        the build container dir, after which the original build dir is
        deleted. If an ``sdl`` entry already exists in the container dir
        the components are taken to be unpacked and nothing is done.

        :param arch: target architecture, passed to the dir getters
        """
        # NOTE(review): this prebuild step chains to the parent's
        # *postbuild_arch*; that looks like a copy/paste slip but is left
        # unchanged here - confirm against the base class before fixing.
        super(PygameJNIComponentsRecipe, self).postbuild_arch(arch)

        info('Unpacking pygame bootstrap JNI dir components')
        with current_directory(self.get_build_container_dir(arch)):
            if exists('sdl'):
                # The two literals used to be joined without a separating
                # space ("componentsare").
                info('sdl dir exists, so it looks like the JNI components '
                     'are already unpacked. Skipping.')
                return
            for dirn in glob.glob(join(self.get_build_dir(arch),
                                       'pygame_bootstrap_jni', '*')):
                shprint(sh.mv, dirn, './')
        info('Unpacking was successful, deleting original container dir')
        shprint(sh.rm, '-rf', self.get_build_dir(arch))
项目:lbry-android    作者:lbryio    | 项目源码 | 文件源码
def fry_eggs(self, sitepackages):
        """Flatten ``*.egg`` directories found in a site-packages folder.

        For each directory whose name ends in ``.egg``, everything except
        ``EGG-INFO`` is moved up into ``sitepackages`` and the egg
        directory itself is then deleted.

        :param sitepackages: path of the site-packages directory to process
        """
        info('Frying eggs in {}'.format(sitepackages))
        for d in listdir(sitepackages):
            rd = join(sitepackages, d)
            if isdir(rd) and d.endswith('.egg'):
                info('  ' + d)
                files = [join(rd, f) for f in listdir(rd) if f != 'EGG-INFO']
                if files:
                    shprint(sh.mv, '-t', sitepackages, *files)
                # Remove the egg via its full path: the bare name `d` would
                # be resolved against the current working directory rather
                # than `sitepackages`.
                shprint(sh.rm, '-rf', rd)
项目:rpl-attacks    作者:dhondta    | 项目源码 | 文件源码
def replace_in_file(path, replacements):
    """
    This helper function performs a line replacement in the file located at 'path'.

    Each line is checked against the replacements in order and only the first
    one that applies is used. A pattern is first tried as a plain substring,
    then (if it compiles) as a regular expression. For a regex hit, the text
    of the first capture group is replaced when the pattern has one that
    matched non-empty text, otherwise the whole match is replaced. A
    replacement value of None or '' drops the matching line altogether.

    The result is written to '<path>.tmp', which then takes the place of the
    original file.

    :param path: path to the file to be altered
    :param replacements: list of string pairs formatted as [old_line_pattern, new_line_replacement]
                         (a single pair is accepted as well)
    """
    tmp = path + '.tmp'
    # Accept a bare [old, new] pair; an empty list simply copies the file.
    if replacements and isinstance(replacements[0], string_types):
        replacements = [replacements]
    # Pre-compute, once per rule: (pattern, replacement, drop-line?, regex).
    rules = []
    for replacement in replacements:
        old, new = replacement[0], replacement[1]
        try:
            regex = re.compile(old)
        except re.error:
            regex = None
        rules.append((old, new, new in (None, ''), regex))
    with open(tmp, 'w+') as nf:
        with open(path) as of:
            for line in of:
                skip = False
                for old, new, drop, regex in rules:
                    # try a simple string match
                    if old in line:
                        if drop:
                            skip = True
                        else:
                            line = line.replace(old, new)
                        break
                    # then try a regex match
                    if regex is None:
                        continue
                    match = regex.search(line)
                    if match is None:
                        continue
                    if drop:
                        # Never call str.replace() with a None replacement.
                        skip = True
                    else:
                        # Prefer the first capture group; fall back to the
                        # whole match when there is no group or it did not
                        # capture any text.
                        target = match.group(1) if regex.groups else None
                        if not target:
                            target = match.group()
                        line = line.replace(target, new)
                    break
                if not skip:
                    nf.write(line)
    sh.rm(path)
    sh.mv(tmp, path)


# **************************************** JSON-RELATED HELPER *****************************************
项目:lbry-android    作者:lbryio    | 项目源码 | 文件源码
def download(self):
        """Fetch the recipe's source archive into the packages cache.

        Nothing happens when ``self.url`` is None. Otherwise the file named
        after the URL's basename is stored in
        ``<packages_path>/<name>``; a ``.mark-<filename>`` marker is
        created after a download. An existing file is reused when its
        marker exists and, if ``self.md5sum`` is set, its checksum agrees.
        A checksum mismatch right after downloading terminates the process
        via ``exit(1)``.
        """
        if self.url is None:
            info('Skipping {} download as no URL is set'.format(self.name))
            return

        url = self.versioned_url
        package_dir = join(self.ctx.packages_path, self.name)
        shprint(sh.mkdir, '-p', package_dir)

        def checksum_matches(target):
            # Hash `target`, log the outcome, report whether it is expected.
            current_md5 = md5sum(target)
            if current_md5 == self.md5sum:
                debug('Checked md5sum: downloaded expected content!')
                return True
            info('Downloaded unexpected content...')
            debug('* Generated md5sum: {}'.format(current_md5))
            debug('* Expected md5sum: {}'.format(self.md5sum))
            return False

        with current_directory(package_dir):
            filename = shprint(sh.basename, url).stdout[:-1].decode('utf-8')
            marker_filename = '.mark-{}'.format(filename)

            do_download = True
            if exists(filename) and isfile(filename):
                if not exists(marker_filename):
                    # File without marker: discard it and fetch again.
                    shprint(sh.rm, filename)
                elif self.md5sum:
                    do_download = not checksum_matches(filename)
                else:
                    do_download = False
                    info('{} download already cached, skipping'
                         .format(self.name))

            if not do_download:
                return

            debug('Downloading {} from {}'.format(self.name, url))

            shprint(sh.rm, '-f', marker_filename)
            self.download_file(url, filename)
            shprint(sh.touch, marker_filename)

            if exists(filename) and isfile(filename) and self.md5sum:
                if not checksum_matches(filename):
                    exit(1)
项目:ipbb    作者:ipbus    | 项目源码 | 文件源码
def fli(env, dev, ipbuspkg):
    """Build the ``modelsim_fli`` sources and link ``mac_fli.so`` here.

    After checking the preconditions, ``MODELSIM_ROOT`` is derived from
    the location of ``vsim``, the ``modelsim_fli`` directory of the given
    package is copied into the current directory, built with ``make`` and
    its ``mac_fli.so`` symlinked into the current directory.

    :param env: environment object (project, projectConfig, src, getSources)
    :param dev: value passed to make as ``TAP_DEV``
    :param ipbuspkg: name of the source package holding the FLI code
    :raises click.ClickException: when not in a project area, the toolset
        is not 'sim', ``vsim`` is not found, or the package is missing
    """
    # --- Preconditions -------------------------------------------------------
    # Must be in a build area
    if env.project is None:
        raise click.ClickException(
            'Project area not defined. Move into a project area and try again')

    lToolset = env.projectConfig['toolset']
    if lToolset != 'sim':
        raise click.ClickException(
            "Work area toolset mismatch. Expected 'sim', found '%s'" % lToolset)

    if not which('vsim'):
        raise click.ClickException(
            "ModelSim is not available. Have you sourced the environment script?")

    if ipbuspkg not in env.getSources():
        raise click.ClickException(
            "Package %s not found in source/. The FLI cannot be built." % ipbuspkg)

    # --- Environment ---------------------------------------------------------
    # ModelSim root is two levels above the vsim executable
    lVsimDir = dirname(which('vsim'))
    os.environ['MODELSIM_ROOT'] = dirname(lVsimDir)
    # os.environ['MTI_VCO_MODE']='64'

    lFliSrc = join(
        env.src, ipbuspkg,
        'components', 'ipbus_eth', 'firmware', 'sim', 'modelsim_fli',
    )

    # --- Build ---------------------------------------------------------------
    import sh
    lMakeVar = 'TAP_DEV={0}'.format(dev)
    # Remove leftovers of a previous build
    sh.rm('-rf', 'modelsim_fli', 'mac_fli.so', _out=sys.stdout)
    # Bring the sources into the current directory
    sh.cp('-a', lFliSrc, './', _out=sys.stdout)
    # Compile
    sh.make('-C', 'modelsim_fli', lMakeVar, _out=sys.stdout)
    # Expose the library next to the build directory
    sh.ln('-s', 'modelsim_fli/mac_fli.so', '.', _out=sys.stdout)

# ------------------------------------------------------------------------------


# ------------------------------------------------------------------------------