Python fabric.api 模块,quiet() 实例源码

我们从Python开源项目中,提取了以下29个代码示例,用于说明如何使用fabric.api.quiet()

项目:ansible-role    作者:mattvonrocketstein    | 项目源码 | 文件源码
def vulture():
    """ try to find dead code paths """
    with api.quiet():
        if not api.local('which vulture').succeeded:
            print 'vulture not found, installing it'
            api.local('pip install vulture')
    ignore_functions_grep = 'egrep -v "{0}"'.format(
        '|'.join(VULTURE_IGNORE_FUNCTIONS))
    excluded = ",".join(VULTURE_EXCLUDE_PATHS)
    excluded_paths = (' --exclude ' + excluded) if excluded else ''
    vulture_cmd = '\n  vulture {pkg_name}{exclude}{pipes}'
    vulture_cmd = vulture_cmd.format(
        pkg_name=PKG_NAME,
        exclude=excluded_paths,
        pipes='|'.join(['', ignore_functions_grep]))
    changedir = api.lcd(os.path.dirname(__file__))
    warn_only = api.settings(warn_only=True)
    be_quit = api.hide('warnings')
    with contextlib.nested(changedir, warn_only, be_quit):
        result = api.local(vulture_cmd, capture=True)
        exit_code = result.return_code
    print result.strip()
    raise SystemExit(exit_code)
项目:astoptool    作者:zouliuyun    | 项目源码 | 文件源码
def list_platforms(root_dir):
    """
    ???????????????
    """
    def is_platform(dir):
        """
        ????version.lua???????????????????
        """
        with quiet():
            return run('test -f "{}/{}/version.lua"'.format(root_dir, dir)).succeeded

    with cd(root_dir), hide('stdout'):
        result = run('''find ./ -mindepth 1 -maxdepth 1 -type d -print |grep --color=never -vE '([0-9]+(\.[0-9]+){3}\\b)|(lyServers)' ''')  
    dirs = [each.lstrip('./') for each in result.splitlines()]

    return [each for each in dirs if is_platform(each)]
项目:astoptool    作者:zouliuyun    | 项目源码 | 文件源码
def fetch_and_inflat_package(template_matchServer_ip, remote_dir, matchServer):
    mk_remote_dir(remote_dir)
    with cd(remote_dir):
        #wget = 'wget -c -t 10 -T 10 -q'
        #server_name = 'match_download_{}'.format(TIME)
        #run('''{} --header="Host:{}" http://{}/package.tgz'''.format(wget, server_name, template_matchServer_ip))
        run('tar zxf package.tgz')
        with quiet():
            nginx_conf_exsits = run('test -f /app/nginx/conf/vhost/{}.conf'.format(matchServer)).succeeded
        if nginx_conf_exsits:
            run('mkdir backup')
            run('mv /app/nginx/conf/vhost/{}.conf backup/'.format(matchServer))
        run('cp nginx_matchServer.conf /app/nginx/conf/vhost/{}.conf'.format(matchServer))
        run('''pandora --update -e 'create database {}' '''.format(matchServer))
        run('''pandora --update {} <matchServer_init.sql'''.format(matchServer))
        run('''mkdir -p /app/{}'''.format(matchServer))
        run('''tar zxf matchServer.tgz -C /app/{}'''.format(matchServer))
项目:astoptool    作者:zouliuyun    | 项目源码 | 文件源码
def update_backend(gameServer, version, mainland=True):
    backup_dir = '/app/opbak/{}/{}'.format(TIME, gameServer)
    run(''' [ -d {0} ] || mkdir -p {0} '''.format(backup_dir))
    with cd('/app/{}/backend/apps'.format(gameServer)):
        for conf_file in ['app.properties', 'plugins.xml']:
            #check if the config exists
            with quiet():
                conf_exists = run('test -f {}'.format(conf_file)).succeeded

            if conf_exists:
                run('cp {} {}/'.format(conf_file, backup_dir))
                if mainland:
                    cmd = ''' sed -i '/http:\/\/.*\/%s/s/%stest_[0-9]\{1,3\}-[0-9]\{1,3\}-[0-9]\{1,3\}/%s/g' %s ''' % (GAME, GAME, version, conf_file)
                else:
                    cmd = ''' sed -i '/http:\/\/.*\/%s/s/%stest_[a-z]\{2,5\}_[0-9]\{1,3\}-[0-9]\{1,3\}-[0-9]\{1,3\}/%s/g' %s ''' % (GAME, GAME, version, conf_file)

                run(cmd)
项目:djangocon-2016-demo    作者:hadjango    | 项目源码 | 文件源码
def ip():
    """The ip where the addons site can be accessed"""
    docker_host = os.environ.get("DOCKER_HOST") or ""
    if not docker_host:
        with quiet():
            docker_env = local("docker-machine env addons", capture=True)
        if docker_env:
            match = re.search(r'DOCKER_HOST="(tcp://[^"]+?)"', docker_env)
            if match:
                docker_host = match.group(1)

    match = re.search(r'tcp://([^:]+):', docker_host)
    if match:
        print(match.group(1))
    else:
        try:
            # host used by dlite
            _, _, ips = socket.gethostbyname_ex("local.docker")
        except:
            abort("Could not determine docker-machine host; perhaps localhost?")
        else:
            print(ips[0])
项目:dprr-django    作者:kingsdigitallab    | 项目源码 | 文件源码
def create_virtualenv():
    require('srvr', 'path', 'within_virtualenv', provided_by=env.servers)
    with quiet():
        env_vpath = os.path.join(env.envs_path, 'dprr-' + env.srvr)
        if run('ls {}'.format(env_vpath)).succeeded:
            print(
                green('virtual environment at [{}] exists'.format(env_vpath)))
            return

    print(yellow('setting up virtual environment in [{}]'.format(env_vpath)))
    run('virtualenv {}'.format(env_vpath))
项目:dprr-django    作者:kingsdigitallab    | 项目源码 | 文件源码
def clone_repo():
    require('srvr', 'path', 'within_virtualenv', provided_by=env.servers)
    with quiet():
        if run('ls {}'.format(os.path.join(env.path, '.git'))).succeeded:
            print(green(('repository at'
                         ' [{}] exists').format(env.path)))
            return

    print(yellow('cloning repository to [{}]'.format(env.path)))
    run('git clone --recursive {} {}'.format(REPOSITORY, env.path))
项目:ottertune    作者:cmu-db    | 项目源码 | 文件源码
def status_rabbitmq():
    with settings(warn_only=True), quiet():
        res = local(RABBITMQ_CMD(action='status'), capture=True)
    if res.return_code == 2 or res.return_code == 69:
        status = STATUS.STOPPED
    elif res.return_code == 0:
        status = STATUS.RUNNING
    else:
        raise Exception("Rabbitmq: unknown status " + str(res.return_code))
    print status
    print_status(status, 'rabbitmq')
    return status
项目:ansible-role    作者:mattvonrocketstein    | 项目源码 | 文件源码
def git_branch():
    """ returns (branch-name, hash) """
    with api.quiet():
        cmd = 'git rev-parse --abbrev-ref HEAD'
        current_branch = api.local(cmd, capture=True)
        cmd = 'git rev-parse HEAD'
        current_hash = api.local(cmd, capture=True)
    return current_hash, current_branch
项目:astoptool    作者:zouliuyun    | 项目源码 | 文件源码
def _local_resolver(domain):
    with quiet():
        ret_value = local('''nslookup %s |grep "^Address" |grep -v '#'|awk '{print $2}' ''' % domain, capture=True)
    if ret_value:
        return ret_value
    else:
        raise Exception('Fail to resolve {} to ip address'.format(domain))
项目:astoptool    作者:zouliuyun    | 项目源码 | 文件源码
def remote_dir_exists(dir):
    with quiet():
        dir_exists = run('test -d {}'.format(dir)).succeeded
    return dir_exists
项目:astoptool    作者:zouliuyun    | 项目源码 | 文件源码
def file_exists_check(file):
    with quiet():
        ret = run('test -f {}'.format(file)).succeeded
    return ret
项目:astoptool    作者:zouliuyun    | 项目源码 | 文件源码
def __init__(self, filename, remote_dir=REMOTE_DIR):
        self.filename = filename

        with quiet():
            has_the_file = run('test -f {}'.format(filename)).succeeded

        if not has_the_file:
            raise Exception('File {} NOT exists under backend/apps/'.format(filename))

        self.dir = run('pwd')
        gameServer = self.dir.split('/')[2]
        backup_dir = '{}/{}'.format(remote_dir, gameServer)
        run('[ -d {0} ] || mkdir -p {0}'.format(backup_dir))
        run('cp {} {}/'.format(self.filename, backup_dir))
项目:astoptool    作者:zouliuyun    | 项目源码 | 文件源码
def has_the_key(self, key):
        with quiet():
            _has_the_key = run('grep --color=never "^{} *=" {}'.format(key, self.filename)).succeeded
        return _has_the_key
项目:astoptool    作者:zouliuyun    | 项目源码 | 文件源码
def remote_file_exists(file):
    with quiet():
        return run('test -f "{}"'.format(file)).succeeded
项目:astoptool    作者:zouliuyun    | 项目源码 | 文件源码
def remote_dir_exists(file):
    with quiet():
        return run('test -d "{}"'.format(file)).succeeded
项目:astoptool    作者:zouliuyun    | 项目源码 | 文件源码
def __init__(self, filename, remote_dir=REMOTE_DIR):
        self.filename = filename
        self.dir = run('pwd')

        with quiet():
            has_the_file = run('test -f {}'.format(filename)).succeeded

        if not has_the_file:
            raise Exception('File {}/{} NOT exists'.format(self.dir, filename))

        tmp_tag = self.filename.split('/')[0]
        backup_dir = '{}/{}'.format(remote_dir, tmp_tag)
        run('[ -d {0} ] || mkdir -p {0}'.format(backup_dir))
        run('cp {} {}/'.format(self.filename, backup_dir))
项目:astoptool    作者:zouliuyun    | 项目源码 | 文件源码
def check_file(file):
    pattern = r'^/.*/hotswap.zip$'
    file_with_full_path = '/app/online/{}{}'.format(GAME, file)
    file_path = os.path.dirname(file_with_full_path)

    pattern_matched = (re.match(pattern, file), 'Hotswap filename should be hotswap.zip')
    with quiet():
        file_exists = (local('test -f {}'.format(file_with_full_path)).succeeded, '{} does NOT exists on FTP, please check'.format(file))
        md5_exists = (local('test -f {}/md5.txt'.format(file_path)).succeeded, 'md5.txt does NOT exists on FTP, please check')

    for each_check in [pattern_matched, file_exists, md5_exists]:
        if each_check[0]:
            pass
        else:
            raise Exception(each_check[1])
项目:astoptool    作者:zouliuyun    | 项目源码 | 文件源码
def remote_file_exists(file):
    with quiet():
        return run('test -f "{}"'.format(file)).succeeded
项目:astoptool    作者:zouliuyun    | 项目源码 | 文件源码
def remote_dir_exists(file):
    with quiet():
        return run('test -d "{}"'.format(file)).succeeded
项目:astoptool    作者:zouliuyun    | 项目源码 | 文件源码
def print_help():

    with quiet():

        @hosts(DNS_SERVER)
        def _dnsapi():
            result = run('/app/opbin/dns/dnsapi -h')
            print(result)

        execute(_dnsapi)
项目:astoptool    作者:zouliuyun    | 项目源码 | 文件源码
def remote_file_exists(file):
    with quiet():
        return run('test -f "{}"'.format(file)).succeeded
项目:astoptool    作者:zouliuyun    | 项目源码 | 文件源码
def remote_dir_exists(file):
    with quiet():
        return run('test -d "{}"'.format(file)).succeeded
项目:astoptool    作者:zouliuyun    | 项目源码 | 文件源码
def remote_dir_exists(dir):
    with quiet():
        dir_exists = run('test -d {}'.format(dir)).succeeded
    return dir_exists
项目:astoptool    作者:zouliuyun    | 项目源码 | 文件源码
def matchServer_exists(matchServer, ip):
    with quiet():
        exists = local('''grep "\\b{}\\b" /etc/hosts '''.format(matchServer)).succeeded
    if exists:
        raise Exception('''The match server {} already exists in /etc/hosts'''.format(matchServer))
    else:
        matchServer_dir_exists = execute(remote_dir_exists, '/app/{}'.format(matchServer), hosts=[ip])[ip]
        if matchServer_dir_exists:
            raise Exception('''The match dir: /app/{} already exists on {}'''.format(matchServer, ip))

#def create_nginx_conf(remote_dir):
#    server_name = 'match_download_{}'.format(TIME)
#    conf_name = 'download_{}.conf'.format(TIME)
#    with cd('/app/nginx/conf/vhost'):
#        run('''echo -e "server {\\n    listen       80;\\n    server_name  %s;\\n    root %s;\\n    index Main.html;\\n    access_log  logs/default.access.log  main;\\n    location / {\\n        expires 0;\\n    }\\n\\n    error_page  404 500 502 503 504  /404.html;\\n}" >%s''' % (server_name, remote_dir, conf_name))
项目:astoptool    作者:zouliuyun    | 项目源码 | 文件源码
def load_file(game_servers, local_file, remote_file, load_type='upload'):
    test_server_info = get_test_server_info(GAME)
    check_game_servers(game_servers, test_server_info)

    locate_game_servers = transform_gameservers(game_servers, test_server_info)

    ips = locate_game_servers.keys()

    @hosts(ips)
    def _upload_file():
        upload(local_file, REMOTE_DIR, env.host_string)
        for game_server in locate_game_servers[env.host_string]:
            replace_file(game_server, remote_file)

    @hosts(ips)
    def _download_file():
        for game_server in locate_game_servers[env.host_string]:
            local_path = '{}/{}/'.format(local_root_path, game_server)
            local('su - astd -c "mkdir -p {}"'.format(local_path))
            target_file = '/app/{}/{}'.format(game_server, remote_file)
            with quiet():
                target_file_exists = run('test -f {}'.format(target_file)).succeeded

            if target_file_exists:
                get(target_file, local_path)
            else:
                raise Exception('File {} NOT exists on {}'.format(target_file, game_server))

        local('chown -R astd.astd {}'.format(local_root_path))

    if load_type == 'upload':
        ftp_file_check(local_file)
        file_name_consistence_check(local_file, remote_file)
        execute(_upload_file)
        print('{} was uploaded to {} successfully.'.format(local_file, game_servers))

    elif load_type == 'download':
        ftp_path = 'download/{}/{}'.format(GAME, TIMESTAMP)
        local_root_path = '/app/online/{}'.format(ftp_path)
        execute(_download_file)
        print('Downloaded remote file: {} to FTP: {}/'.format(remote_file, ftp_path))
项目:clusterdock    作者:cloudera    | 项目源码 | 文件源码
def get_clusterdock_container_id():
    """Returns the container ID of the Docker container running clusterdock.
    """
    with quiet():
        for cgroup in local('cat /proc/self/cgroup', capture=True).stdout.split('\n'):
            if 'docker' in cgroup:
                return cgroup.rsplit('/')[-1]

        # If we get through the loop and never find the cgroup, something has gone very wrong.
        raise ContainerNotFoundException('Could not find container name from /proc/self/cgroup.')
项目:clusterdock    作者:cloudera    | 项目源码 | 文件源码
def _get_images():
    return client.images(quiet=True)
项目:clusterdock    作者:cloudera    | 项目源码 | 文件源码
def _get_running_containers():
    return client.containers(all=False, quiet=True)