The following 48 code examples, extracted from open-source Python projects, illustrate how to use fabric.api.env.host_string().
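All of the examples share one pattern: point Fabric 1.x at a host by assigning env.host_string (or by passing host_string to settings()), then call run()/sudo()/put() against that host. As a minimal sketch of that pattern (the host name and user below are hypothetical placeholders, not taken from any of the projects):

# Minimal Fabric 1.x sketch: env.host_string selects the target host.
# 'deploy' and 'deploy.example.com' are placeholder values.
from fabric.api import env, run, settings

env.user = 'deploy'                      # SSH user for subsequent operations
env.host_string = 'deploy.example.com'   # global: every run()/sudo() now targets this host
run('uname -a')

# Scoped alternative: override the host for a single block only.
with settings(host_string='10.0.0.5'):
    run('hostname')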
def get_node_state(node):
    status = {}
    if node.get('status') == NODE_STATUSES.pending:
        status[NODE_STATUSES.pending] = False
        return status
    hostname = node['hostname']
    status[NODE_STATUSES.running] = node.get('status') == NODE_STATUSES.running
    env.host_string = hostname
    try:
        status['ntp'] = False
        status['services'] = False
        # AC-3105 Fix. Check if master can connect to node via ssh.
        if can_ssh_to_host(hostname):
            rv = run('ntpstat', quiet=True, timeout=SSH_TIMEOUT)
            if rv.succeeded:
                status['ntp'] = True
            status['ssh'] = True
            stopped = get_stopped_services(node_services, local=False)
            status['services'] = stopped if stopped else True
            status['disk'] = check_disk_space(local=False)
        else:
            status['ssh'] = False
    except (NetworkError, CommandTimeout):
        status['ssh'] = False
    return status
def set_env(config, version_tag=None):
    """ Fabric environmental variable setup """
    # Bug: env.hosts is not picked up when set inside a function;
    # setting env.host_string as a workaround.
    config_dict = get_config(config)
    env.hosts = [config_dict['HOST_NAME'], ]
    env.host_string = config_dict['HOST_NAME']
    env.project_name = config_dict['PROJECT_NAME']
    env.project_dir = posixpath.join('/srv/images/', env.project_name)
    env.use_ssh_config = True
    env.image_name = config_dict['IMAGE'].split(':')[0]
    env.base_image_name = env.image_name + '_base'
    env.version_tag = version_tag
    env.build_dir = '/srv/build'
    env.local_path = os.path.dirname(__file__)
def runs_on(target):
    """
    A decorator that picks the correct target server from the inventory file.
    Can be called with either target = 'hadoop_master' or 'spark_master'
    (which can be different machines).
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            if target == 'hadoop_master':
                env.host_string = hadoop_master
            elif target == 'spark_master':
                env.host_string = spark_master
            else:
                raise ValueError('Unhandled target %s' % target)
            func(*args, **kwargs)
        return wrapper
    return decorator
def upgrade_kernel_and_grub(do_reboot=False, log=True):
    """ updates the kernel and the grub config """
    print(env.host_string)
    if log:
        log_yellow('upgrading kernel')
    with settings(hide('running', 'stdout')):
        sudo('unset UCF_FORCE_CONFFOLD; '
             'export UCF_FORCE_CONFFNEW=YES; '
             'ucf --purge /boot/grub/menu.lst; '
             'export DEBIAN_FRONTEND=noninteractive ; '
             'apt-get update; '
             'apt-get -o Dpkg::Options::="--force-confnew" --force-yes -fuy '
             'dist-upgrade')
    with settings(warn_only=True):
        if do_reboot:
            if log:
                log_yellow('rebooting host')
            reboot()
def tcp_port(self):
    """
    The tcp port used for the game server.
    Will try to get only once and save to self._tcp_port for later use.
    """
    def get_tcp_port():
        cmd = '''grep 'name="port" type="int"' conf.xml |awk -F[\<\>] '{print $3}' '''
        with settings(host_string=self.int_ip), cd('/app/{}/backend/apps'.format(self.name)):
            result = run(cmd)
        lines = result.splitlines()
        if len(lines) == 1:
            return int(lines[0])
        else:
            raise Exception("Can't get tcp port using cmd: {}".format(cmd))

    if not self._tcp_port:
        self._tcp_port = get_tcp_port()
    return self._tcp_port
def dns(self):
    """
    The dns for the game server.
    Will try to get only once and save to self._dns for later use.
    """
    def get_dns():
        cmd = '''grep server_name %s.conf | awk '{print $2}' | tr -d ";" ''' % self.name
        with settings(host_string=self.int_ip), cd('/app/nginx/conf/vhost'):
            result = run(cmd)
        lines = result.splitlines()
        if len(lines) == 1:
            return lines[0]
        else:
            raise Exception("Can't get dns using cmd: {}".format(cmd))

    if not self._dns:
        self._dns = get_dns()
    return self._dns
def bundle_merge(file):
    merge_details = parse_merge_list(file)
    print(merge_details)
    target_servers = merge_details.keys()
    print(target_servers)

    @parallel(pool_size=2)  # limit merge concurrency across target servers
    def _bundle_merge():
        target_server = GameServer(env.host_string)
        source_server_info_list = merge_details[target_server]
        for each_server, each_sequence in source_server_info_list[:-1]:
            each_source_server = GameServer(each_server)
            single_merge(each_source_server, target_server, each_sequence)
        # Only the last merge restarts the target server, once every source
        # has been merged in. e.g. merging 37wan_8, 37wan_9 and 37wan_10 into
        # 37wan_7: the restart happens on the final merge, 37wan_10 -> 37wan_7.
        last_source_server, last_sequence = source_server_info_list[-1]
        single_merge(last_source_server, target_server, last_sequence, restart='yes')

    execute(_bundle_merge, hosts=target_servers)
    print('Done!')
def install(host_config):
    env.host_string = helper.get_env_host_string(host_config)
    env.user = helper.get_env_user(host_config)
    env.key_filename = helper.get_env_key_filename(host_config)

    software_config = helper.get_software_config(host_config, 'redis')
    redis_version = software_config.get('version', '3.2.6')
    redis_port = software_config.get('port', '6379')
    redis_data_dir = software_config.get('data-directory', '/var/lib/redis')

    machine.disable_transparent_huge_pages(env.host_string)
    machine.set_overcommit_memory(env.host_string, 1)

    put('{}/software/scripts/redis.sh'.format(os.getcwd()), '~/', use_sudo=True)
    sudo("chmod +x redis.sh")
    sudo(". ~/redis.sh {} {} {}".format(redis_version, redis_port, redis_data_dir))
def install(host_config):
    env.host_string = helper.get_env_host_string(host_config)
    env.user = helper.get_env_user(host_config)
    env.key_filename = helper.get_env_key_filename(host_config)

    software_config = helper.get_software_config(host_config, 'zookeeper')
    java.v8_install(host_config)

    port = software_config.get('port', '2181')
    zk_server_id = software_config.get('id', '0')
    zk_nodes = ",".join(software_config.get('nodes'))

    put('{}/software/scripts/zookeeper.sh'.format(os.getcwd()), '~/', use_sudo=True)
    sudo("chmod +x zookeeper.sh")
    sudo(". ~/zookeeper.sh {} {} {}".format(port, zk_server_id, zk_nodes))
def mount_ebs_volumes(host_config):
    env.host_string = helper.get_env_host_string(host_config)
    env.user = helper.get_env_user(host_config)
    env.key_filename = helper.get_env_key_filename(host_config)

    sudo("apt-get -y install xfsprogs")
    for ebs in host_config['ec2-mounts']:
        device = ebs['device']
        mount = ebs['mount']
        sudo("mkdir -p {}".format(mount))
        sudo("mv /etc/fstab /etc/fstab.old")
        sudo("touch /etc/fstab")
        if sudo('mkfs.xfs -f {0}'.format(device), warn_only=True):
            run("echo '{0}\t{1}\txfs\tdefaults\t0\t0' | sudo tee -a /etc/fstab".format(device, mount))
            sudo('mount -a')
            logger.info("EBS volume {} : {} mounted.".format(device, mount))
def connect_to_instance_in_ssh(address, keypair_path, user='root'):
    """
    Run the command LS on a given instance

    :param address: ip or dns name of a machine
    :type address: str
    :param keypair_path: keypair path
    :type keypair_path: str
    """
    env.host_string = address
    env.user = user
    env.parallel = False
    env.key_filename = keypair_path
    env.disable_known_hosts = True
    env.connection_attempts = 10
    env.timeout = 120
    ocb.log(run('ls -la /root'), level='INFO')
def demo():
    answer = prompt(
        'Are you sure you want to DELETE ALL DATA on "{0}" and replace it '
        'with demo data? (type "I am sure" to continue):'.format(env.host_string))
    if answer != 'I am sure':
        abort('Aborted!')
    password = None
    while not password:
        password = prompt('Choose a password:')
    with temp():
        put('radar.sql', 'radar.sql')
        run_db('drop')
        run_db('create')
        run_db('restore radar.sql')
        # Note: user must be a PostgreSQL superuser to run this
        run_fixtures('users --password {0}'.format(password))
        run_fixtures('patients --patients 95 --no-data')
        run_fixtures('patients --patients 5 --data')
def run_download_db(filename=None):
    """
    Downloads the database from the server into your local machine.

    In order to import the downloaded database, run ``fab import_db``.

    Usage::

        fab prod run_download_db
        fab prod run_download_db:filename=foobar.dump
    """
    if not filename:
        filename = settings.DB_DUMP_FILENAME
    if env.key_filename:
        ssh = settings.PROJECT_NAME
    else:
        ssh = '{0}@{1}'.format(env.user, env.host_string)
    local('scp {0}:{1}{2} .'.format(
        ssh, settings.FAB_SETTING('SERVER_DB_BACKUP_DIR'), filename))
def run_download_media(filename=None):
    """
    Downloads the media dump from the server into your local machine.

    In order to import the downloaded media dump, run ``fab import_media``.

    Usage::

        fab prod run_download_media
        fab prod run_download_media:filename=foobar.tar.gz
    """
    if not filename:
        filename = settings.MEDIA_DUMP_FILENAME
    if env.key_filename:
        ssh = settings.PROJECT_NAME
    else:
        ssh = '{0}@{1}'.format(env.user, env.host_string)
    local('scp {0}:{1}{2} .'.format(
        ssh, settings.FAB_SETTING('SERVER_MEDIA_BACKUP_DIR'), filename))
def run_upload_db(filename=None):
    """
    Uploads your local database to the server.

    You can create a local dump with ``fab export_db`` first. In order to
    import the database on the server you still need to SSH into the server.

    Usage::

        fab prod run_upload_db
        fab prod run_upload_db:filename=foobar.dump
    """
    if not filename:
        filename = settings.DB_DUMP_FILENAME
    if env.key_filename:
        ssh = settings.PROJECT_NAME
    else:
        ssh = '{0}@{1}'.format(env.user, env.host_string)
    local('scp {0} {1}:{2}'.format(
        filename, ssh, settings.FAB_SETTING('SERVER_DB_BACKUP_DIR')))
def _config_fabric(self):
    env.host_string = 'localhost'
    env.warn_only = True
    for c in output.keys():
        output[c] = False
def tsudo(target, cmd):
    env.host_string = target
    return sudo(cmd)
def upload_keys():
    """
    Upload the SSH public/private keys to the remote server via scp
    """
    scp_command = 'scp {} {}/authorized_keys {}@{}:~/.ssh'.format(
        env.ssh_keys_name + '.pub',
        env.ssh_keys_dir,
        env.user_name,
        env.host_string
    )
    local(scp_command)
def pre_start_hook(app):
    from ..nodes.models import Node
    # env.warn_only = True
    env.user = 'root'
    env.key_filename = SSH_KEY_FILENAME
    output.stdout = False
    output.running = False
    PLUGIN_DIR = '/usr/libexec/kubernetes/kubelet-plugins/net/exec/kuberdock/'
    with app.app_context():
        for node in Node.query.all():
            env.host_string = node.hostname
            put('./node_network_plugin.sh', PLUGIN_DIR + 'kuberdock')
            put('./node_network_plugin.py', PLUGIN_DIR + 'kuberdock.py')
            run('systemctl restart kuberdock-watcher')
            print 'Kuberdock node parts are updated'
def upload():
    """Upload entire project to server"""
    # Bug: env.hosts is not picked up when set inside a function;
    # using env.host_string as a workaround.
    run('mkdir -p /srv/images/' + env.project_name + '/')
    rsync_project(
        env.project_dir, './',
        exclude=(
            '.git', '.gitignore', '__pycache__', '*.pyc', '.DS_Store',
            'environment.yml', 'fabfile.py', 'Makefile', '.idea',
            'bower_components', 'node_modules', '.env.example', 'README.md', 'var'
        ),
        delete=True)

# Wrapper Functions:
def set_server_config(json_data):
    env.host_string = json_data.get('server_ip', '127.0.0.1')
    env.user = json_data.get('user', getuser())
    env.password = json_data.get('password', '')
def backup_db(db_name, db_user, password, db_server=env.host_string):
    with cd('/tmp'):
        local('pg_dump -U {user} -h {ip} {db} > {db}-bak.sql'.format(
            user=db_user, ip=db_server, pwd=password, db=db_name
        ))
def restore_db(db_name, db_user, password, db_server=env.host_string):
    local('psql -U {user} -h {ip} {db} < /tmp/{db}-bak.sql'.format(
        user=db_user, ip=db_server, pwd=password, db=db_name
    ))
def retrieve_file_names(files):
    """
    Return the base names of the given files for the Uploader, after checking
    that every file exists on the remote host and that the base names contain
    no duplicates.
    """
    filenames = [os.path.basename(each) for each in files]
    for each_file in files:
        if not file_exists_check(each_file):
            raise Exception("File {} doesn't exist on {}".format(each_file, env.host_string))
    if len(set(filenames)) != len(files):
        raise Exception('Duplicate file names in the files: {}'.format(files))
    return filenames
def mkdir(self, remote_dir):
    with settings(host_string=self.int_ip):
        run(' [ -d {0} ] || mkdir -p {0} '.format(remote_dir))
def ext_ip(self):
    """
    The external ip for the game server.
    Will try to get only once and save to self._ext_ip for later use.
    """
    if not self._ext_ip:
        with settings(host_string=self.int_ip):
            self._ext_ip = run('''curl -s ip.cn |awk '{split($2,x,"?");print x[2]}' ''')
    return self._ext_ip
def _operation(self, action):
    with settings(host_string=self.int_ip):
        run('set -m; /app/{}/backend/bin/startup.sh {} && sleep 0.2'.format(self.name, action),
            warn_only=True)
def sql_exec(self, sql_file):
    with settings(host_string=self.int_ip):
        run('pandora --update {} <{}'.format(self.name, sql_file))
def remove(self, remote_dir=None):
    """Take the game server offline and archive its files, database and related configs (nginx, backstage)."""
    if remote_dir is None:
        release_type = 'game_backup'
        remote_dir = '/app/opbak/{}_{}'.format(release_type, TIMESTAMP)
    self.stop()
    self.mkdir(remote_dir)
    with settings(host_string=self.int_ip):
        with cd('/app'):
            run('mv {} {}/'.format(self.name, remote_dir))
        with cd(remote_dir):
            run('pandora --dump --opt -R {0}>{0}.sql.rb{1}'.format(self.name, TIMESTAMP))
            run('mv /app/nginx/conf/vhost/{}.conf ./'.format(self.name))
            reload_nginx()
            run('pandora --update -e "DROP DATABASE {}"'.format(self.name))
        with cd('/app/{}_backstage'.format(self.game)):
            run('cp socket_gameserver.ini {}/'.format(remote_dir))
            run("sudo -u agent sed -i '/\\b{}\\b/d' socket_gameserver.ini".format(self.name))
            run("set -m; sudo -u agent /bin/bash start.sh restart")
def upload_log(self, logtype=None, date=None, logfile=None, ftp_ip=None):
    """
    :: Example

        pandora --ftp -r 30 -t 1200 -z -m 42.62.119.164 /tjmob_log/tjmob_37wan_1 /app/tjmob_37wan_1/backend/logs/game/dayreport/dayreport_2015-05-03.log.bz2*
    """
    from bible.utils import BZIP2
    ftp_log_path = '/{}_log/{}'.format(self.game, self.name)
    logtypes = ['dayreport', 'rtreport']
    date = date if date else time.strftime('%Y-%m-%d')
    ftp_ip = ftp_ip if ftp_ip else '42.62.119.164'
    if logfile:
        logfiles = [logfile]
    else:
        if logtype:
            logfiles = ['/app/{0}/backend/logs/game/{1}/{1}_{2}.log'.format(self.name, each_type, date)
                        for each_type in logtype.split(',')]
        else:
            logfiles = ['/app/{0}/backend/logs/game/{1}/{1}_{2}.log'.format(self.name, each_logtype, date)
                        for each_logtype in logtypes]
    for each_log in logfiles:
        dir, filename = os.path.split(each_log)
        file_bz2 = '{}.bz2'.format(filename)
        file_md5 = '{}.MD5'.format(file_bz2)
        with settings(host_string=self.int_ip), cd(dir):
            run('[ -f {0} ] && echo "{0} already exists" || {1} {2}'.format(file_bz2, BZIP2, filename))
            run('[ -f {0} ] && echo "{0} already exists" || md5sum {1} >{0}'.format(file_md5, file_bz2))
        with settings(host_string=self.int_ip):
            run('''pandora --ftp -r 30 -t 1200 -z -m {} {} {}.bz2*'''.format(ftp_ip, ftp_log_path, each_log))
def transform(self, gameServers, all_gameServer_info=None):
    """
    Transform function. e.g. it will transform

        ['astd_37wan_2', 'astd_51wan_99', 'astd_uoyoo_90']

    to

        {
            '10.6.20.1': ['astd_37wan_2', 'astd_51wan_99'],
            '10.6.20.2': ['astd_uoyoo_90']
        }
    """
    if not all_gameServer_info:
        all_gameServer_info = self.all_gameServer_info
    IPS = list(set([all_gameServer_info[each] for each in gameServers]))
    locate_game_servers = {each: [] for each in IPS}
    for each in gameServers:
        locate_game_servers[all_gameServer_info[each]].append(each)
    return locate_game_servers

# def sql_content_exec(self, gameServers, sql_content, backup='Yes', remote_dir=REMOTE_DIR):
#     locate_game_servers = self.transform(gameServers)
#     ips = locate_game_servers.keys()
#
#     def _sql_content_exec(sql_content, locate_game_servers, backup):
#         for gameServer in locate_game_servers[env.host_string]:
#             backup_dir = '{}/{}'.format(remote_dir, gameServer)
#             run('[ -d {0} ] || mkdir -p {0}'.format(backup_dir))
#             if backup.lower() == 'yes':
#                 run('pandora --dump --opt -R {0} >{1}/rollback_{0}.sql'.format(gameServer, backup_dir))
#             run('''pandora --update {} -e '{}' '''.format(gameServer, sql_content))
#
#     execute(_sql_content_exec, sql_content, locate_game_servers, backup=backup, hosts=ips)
def modify_file(filename, operation, key, value, comment='null'):
    for gameServer in LOCATE_GAME_SRVS[env.host_string]:
        with cd('/app/{}/backend/apps'.format(gameServer)):
            conf = Config(filename)
            operate_method = getattr(conf, operation)
            operate_method(key, value, comment)
def load_file(game_servers, local_file, remote_file, load_type='upload'):
    test_server_info = get_test_server_info(GAME)
    check_game_servers(game_servers, test_server_info)
    locate_game_servers = transform_gameservers(game_servers, test_server_info)
    ips = locate_game_servers.keys()

    @hosts(ips)
    def _upload_file():
        upload(local_file, REMOTE_DIR, env.host_string)
        for game_server in locate_game_servers[env.host_string]:
            replace_file(game_server, remote_file)

    @hosts(ips)
    def _download_file():
        for game_server in locate_game_servers[env.host_string]:
            local_path = '{}/{}/'.format(local_root_path, game_server)
            local('su - astd -c "mkdir -p {}"'.format(local_path))
            target_file = '/app/{}/{}'.format(game_server, remote_file)
            with quiet():
                target_file_exists = run('test -f {}'.format(target_file)).succeeded
            if target_file_exists:
                get(target_file, local_path)
            else:
                raise Exception('File {} does NOT exist on {}'.format(target_file, game_server))
        local('chown -R astd.astd {}'.format(local_root_path))

    if load_type == 'upload':
        ftp_file_check(local_file)
        file_name_consistence_check(local_file, remote_file)
        execute(_upload_file)
        print('{} was uploaded to {} successfully.'.format(local_file, game_servers))
    elif load_type == 'download':
        ftp_path = 'download/{}/{}'.format(GAME, TIMESTAMP)
        local_root_path = '/app/online/{}'.format(ftp_path)
        execute(_download_file)
        print('Downloaded remote file: {} to FTP: {}/'.format(remote_file, ftp_path))
def export_db(game_servers, export_type='data'):
    test_server_info = get_test_server_info(GAME)
    check_game_servers(game_servers, test_server_info)
    locate_game_servers = transform_gameservers(game_servers, test_server_info)
    ips = locate_game_servers.keys()
    ftp_path = 'download/{}/{}'.format(GAME, TIMESTAMP)
    local_root_path = '/app/online/{}'.format(ftp_path)

    @hosts(ips)
    def _export_db():
        for game_server in locate_game_servers[env.host_string]:
            local_path = '{}/{}/'.format(local_root_path, game_server)
            local('su - astd -c "mkdir -p {}"'.format(local_path))
            run('mkdir -p {}'.format(REMOTE_DIR))
            sql_name = '{}.sql.rb{}'.format(game_server, TIMESTAMP)
            if export_type == 'no-data':
                run('pandora --dump -R --opt -d {} >{}/{}'.format(game_server, REMOTE_DIR, sql_name))
            elif export_type == 'data':
                run('pandora --dump -R --opt {} >{}/{}'.format(game_server, REMOTE_DIR, sql_name))
            with cd(REMOTE_DIR):
                run('tar zcf {0}.tgz {0}'.format(sql_name))
            target_file = '{}/{}.tgz'.format(REMOTE_DIR, sql_name)
            get(target_file, local_path)
        local('chown -R astd.astd {}'.format(local_root_path))

    print('Start dumping db...')
    sys.stdout.flush()
    execute(_export_db)
    print('Downloaded db to FTP: {}/'.format(ftp_path))
def upload_to_resource_server(game, file):
    dir, filename = os.path.split(file)
    resource_dir = '/app/www/{}/{}/{}'.format(game, RELEASE_TYPE, TIMESTAMP)
    resource_ip = gameOption('www_ssh_ip')
    execute(mk_remote_dir, resource_dir, hosts=[resource_ip])
    with lcd(dir), settings(host_string=resource_ip):
        put(filename, resource_dir)
        put('md5.txt', resource_dir)
    #local('{} {}/{{{},md5.txt}} {}:{}/'.format(RSYNC, dir, filename, resource_ip, resource_dir))
def update(restart='No'):
    for gameServer in LOCATE_GAME_SRVS[env.host_string]:
        if restart == 'Yes':
            stop_gameServer(gameServer)
        if restart == 'Yes':
            start_gameServer(gameServer)
def transfer(file):
    from bible.utils import RSYNC
    file_dir, filename = os.path.split(file)
    remote_dir = '''/app/opbak/{}'''.format(TIME)
    run(''' [ -d {0} ] || mkdir -p {0} '''.format(remote_dir))
    #cmd = ''' {rsync} {file_dir}/{{{filename},md5.txt}} {ssh_user}@{target_host}:{remote_dir}/ '''.format(
    #    file_dir=file_dir, target_host=env.host_string, remote_dir=remote_dir,
    #    ssh_user=env.user, filename=filename, rsync=RSYNC)
    #local(cmd)
    with lcd(file_dir):
        put(filename, remote_dir)
        put('md5.txt', remote_dir)
    with cd(remote_dir):
        run('dos2unix md5.txt && md5sum -c md5.txt')
def update(fVersion, bVersion, sql_file, maindland, restart='No'):
    if sql_file.lower() != '/app/online/{}/no'.format(GAME):
        transfer(sql_file)
    for gameServer in LOCATE_GAME_SRVS[env.host_string]:
        if restart in ['Yes', 'Restart', 'Stop']:
            stop_gameServer(gameServer)
        if bVersion.lower() != 'no':
            update_backend(gameServer, bVersion, maindland)
        if fVersion.lower() != 'no':
            update_frontend(gameServer, fVersion, maindland)
        if sql_file.lower() != '/app/online/{}/no'.format(GAME):
            sql_exec(gameServer, sql_file)
        if restart in ['Yes', 'Restart', 'Start']:
            start_gameServer(gameServer)
def disable_transparent_huge_pages(_host):
    env.host_string = _host
    run('echo "never" | sudo tee /sys/kernel/mm/transparent_hugepage/enabled')
    put('{}/bootstrap/scripts/disable-thp.sh'.format(os.getcwd()), '~/', use_sudo=True)
    sudo("chmod +x disable-thp.sh")
    sudo(". ~/disable-thp.sh")
def set_swapiness(_host, _swap_percent):
    env.host_string = _host
    sudo('sysctl vm.swappiness={}'.format(_swap_percent))
    run('echo "vm.swappiness = {}" | sudo tee -a /etc/sysctl.conf'.format(_swap_percent))
def set_overcommit_memory(_host, _value):
    env.host_string = _host
    sudo('sysctl vm.overcommit_memory={}'.format(_value))
    run('echo "vm.overcommit_memory = {}" | sudo tee -a /etc/sysctl.conf'.format(_value))
def restart_all(config_file):
    """Restarts crate service on all hosts"""
    cfg = helper.get_config(config_file)
    for host_config in cfg['hosts']:
        env.host_string = helper.get_env_host_string(host_config)
        env.user = helper.get_env_user(host_config)
        env.key_filename = helper.get_env_key_filename(host_config)
        sudo('service crate restart')
def broker_install(host_config):
    env.host_string = helper.get_env_host_string(host_config)
    env.user = helper.get_env_user(host_config)
    env.key_filename = helper.get_env_key_filename(host_config)

    java.v8_install(host_config)

    software_config = helper.get_software_config(host_config, 'kafka-broker')
    version = software_config.get('version', '0.10.0.1')
    put('{}/software/scripts/kafka-broker.sh'.format(os.getcwd()), '~/', use_sudo=True)
    sudo("chmod +x kafka-broker.sh")
    sudo(". ~/kafka-broker.sh {}".format(version))

    broker_id = software_config.get('broker-id', '0')
    zk_hosts = software_config.get('zookeeper-hosts', 'localhost:2181')
    log_directories = software_config.get('log-directories', '/var/lib/kafka-logs')

    tag = '## ---- CUSTOM CONFIGURATION ---'
    sudo('echo "{}" | sudo tee -a /srv/kafka/config/server.properties'.format(tag))
    sudo('echo "delete.topic.enable = true" | sudo tee -a /srv/kafka/config/server.properties')
    sudo('echo "broker.id={}" | sudo tee -a /srv/kafka/config/server.properties'.format(broker_id))
    sudo('echo "zookeeper.connect={}" | sudo tee -a /srv/kafka/config/server.properties'.format(zk_hosts))
    sudo('echo "log.dirs={}" | sudo tee -a /srv/kafka/config/server.properties'.format(log_directories))
    sudo('echo "listeners=PLAINTEXT://{}:9093" | sudo tee -a /srv/kafka/config/server.properties'.format(host_config['private-ip']))
    sudo('echo "{}" | sudo tee -a /srv/kafka/config/server.properties'.format(tag))
    sudo("service kafka restart")
def manager_install(host_config):
    env.host_string = helper.get_env_host_string(host_config)
    env.user = helper.get_env_user(host_config)
    env.key_filename = helper.get_env_key_filename(host_config)

    software_config = helper.get_software_config(host_config, 'kafka-manager')
    zk_hosts = software_config.get('zookeeper-hosts', 'localhost:2181')

    put('{}/software/scripts/kafka-manager.sh'.format(os.getcwd()), '~/', use_sudo=True)
    sudo("chmod +x kafka-manager.sh")
    sudo(". ~/kafka-manager.sh {}".format(zk_hosts))
def create_topic(config_file, topic, replication_factor=1, partitions=1):
    """Creates a Kafka topic | args: config_file, topic name, replication factor, partitions"""
    cfg = helper.get_config(config_file)
    host_config = get_kafka_host_cfg(cfg)
    cmd = "/srv/kafka/bin/kafka-topics.sh --create --zookeeper {} " \
          "--replication-factor {} --partitions {} --topic {}".format(
              get_zk_host(cfg), replication_factor, partitions, topic)
    env.host_string = helper.get_env_host_string(host_config)
    env.user = helper.get_env_user(host_config)
    env.key_filename = helper.get_env_key_filename(host_config)
    sudo(cmd)
def delete_topic(config_file, topic):
    """Deletes a Kafka topic | args: config_file, topic name"""
    cfg = helper.get_config(config_file)
    host_config = get_kafka_host_cfg(cfg)
    cmd = "/srv/kafka/bin/kafka-topics.sh --delete --zookeeper {} --topic {}".format(get_zk_host(cfg), topic)
    env.host_string = helper.get_env_host_string(host_config)
    env.user = helper.get_env_user(host_config)
    env.key_filename = helper.get_env_key_filename(host_config)
    sudo(cmd)