The following 50 code examples, extracted from open-source Python projects, illustrate how to use subprocess.run().
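Before the examples, here is a minimal sketch of the two call patterns that recur throughout them: capturing output and failing loudly with check=True (our own illustration, not taken from any of the projects below):

import subprocess

# Capture stdout as text; inspect returncode yourself.
result = subprocess.run(['echo', 'hello'], stdout=subprocess.PIPE,
                        universal_newlines=True)
print(result.returncode, result.stdout)

# With check=True, a non-zero exit status raises CalledProcessError instead.
try:
    subprocess.run(['false'], check=True)
except subprocess.CalledProcessError as err:
    print('command failed with exit code', err.returncode)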
def preprocess_source(self, in_file, additional_args=[]):
    import subprocess

    self._args.extend(self._build_compiler_flags())
    self._args.extend(additional_args)

    result = subprocess.run(self._args, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                            universal_newlines=True)
    if result.returncode == 0:
        return result.stdout
    else:
        if result.stderr:
            Style.error('Preprocess failed: ')
            print(result.stderr)
        return ''
def del_addr(linkname, address):
    try:
        subprocess.run(['ip', 'address', 'del', address, 'dev', str(linkname)],
                       stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                       shell=False, check=True)
        return [True, str(linkname)]
    except subprocess.CalledProcessError as suberror:
        return [False, "delete address failed : %s" % suberror.stdout.decode('utf-8')]

# ovs-vsctl list-br
# ovs-vsctl br-exists <Bridge>
# ovs-vsctl add-br <Bridge>
# ovs-vsctl del-br <Bridge>
# ovs-vsctl list-ports <Bridge>
# ovs-vsctl del-port <Bridge> <Port>
# ovs-vsctl add-port <Bridge> <Port> -- set interface <Port> type=gre options:remote_ip=<RemoteIP>
# ovs-vsctl add-port <Bridge> <Port> tag=<ID> -- set interface <Port> type=internal
# ovs-vsctl port-to-br <Port>
# ovs-vsctl set Port <Port> tag=<ID>
# ovs-vsctl clear Port <Port> tag
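The comment block above catalogues ovs-vsctl invocations, and not all of them have a matching helper among these examples. As a hedged sketch, the GRE add-port line could be wrapped in the same style as the surrounding helpers (the name add_gre_port is ours, not from the original project):

def add_gre_port(bridge, port, remote_ip):
    try:
        # ovs-vsctl add-port <Bridge> <Port> -- set interface <Port> type=gre options:remote_ip=<RemoteIP>
        subprocess.run(['ovs-vsctl', 'add-port', str(bridge), str(port),
                        '--', 'set', 'interface', str(port),
                        'type=gre', 'options:remote_ip=' + str(remote_ip)],
                       stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                       shell=False, check=True)
        return [True, str(port)]
    except subprocess.CalledProcessError as suberror:
        return [False, "add gre port failed : %s" % suberror.stdout.decode('utf-8')]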
def build_package(builder_image, package_type, version, out_dir, dependencies):
    """
    Build a deb or RPM package using a fpm-within-docker Docker image.

    :param package_type str: "rpm" or "deb".
    :param version str: The package version.
    :param out_dir Path: Directory where package will be output.
    :param dependencies list: package names the resulting package should depend on.
    """
    run([
        "docker", "run", "--rm",
        "-e", "PACKAGE_VERSION=" + version,
        "-e", "PACKAGE_TYPE=" + package_type,
        "-v", "{}:/build-inside:rw".format(THIS_DIRECTORY),
        "-v", "{}:/source:rw".format(THIS_DIRECTORY.parent),
        "-v", str(out_dir) + ":/out",
        "-w", "/build-inside",
        builder_image,
        "/build-inside/build-package.sh",
        *dependencies
    ], check=True)
def execute_task(host, tasks):
    """Call Fabric to execute tasks against a host

    Return:
        CompletedProcess instance
    """
    # TODO: add support for groups, multiple hosts
    # TODO: add support for providing input data from team files
    return subprocess.run(
        [
            PYTHON2_PATH,
            FABRIC_PATH,
            '--abort-on-prompts',
            '--hosts=%s' % host,
            '--fabfile=%s' % FABFILE_PATH,
            *tasks,
        ],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # Combine out/err into stdout; stderr will be None
        universal_newlines=True,
        check=True,
    )
def _upload_artifacts_to_version(self):
    """Recursively upload directory contents to S3."""
    if not self.artifact_path or not os.listdir(self.artifact_path):
        raise S3ArtifactNotFound

    uploaded = False
    if self.s3props.get("content_metadata"):
        LOG.info("Uploading in multiple parts to set metadata")
        uploaded = self.content_metadata_uploads()

    if not uploaded:
        cmd = 'aws s3 sync {} {} --delete --exact-timestamps --profile {}'.format(
            self.artifact_path, self.s3_version_uri, self.env)
        result = subprocess.run(cmd, check=True, shell=True, stdout=subprocess.PIPE)
        LOG.debug("Upload Command Output: %s", result.stdout)

    LOG.info("Uploaded artifacts to %s bucket", self.bucket)
def _sync_to_uri(self, uri):
    """Copy and sync versioned directory to uri in S3.

    Args:
        uri (str): S3 URI to sync version to.
    """
    cmd_cp = 'aws s3 cp {} {} --recursive --profile {}'.format(
        self.s3_version_uri, uri, self.env)
    # AWS CLI sync does not work as expected bucket to bucket with exact timestamp sync.
    cmd_sync = 'aws s3 sync {} {} --delete --exact-timestamps --profile {}'.format(
        self.s3_version_uri, uri, self.env)

    cp_result = subprocess.run(cmd_cp, check=True, shell=True, stdout=subprocess.PIPE)
    LOG.debug("Copy to %s before sync output: %s", uri, cp_result.stdout)
    LOG.info("Copied version %s to %s", self.version, uri)
    sync_result = subprocess.run(cmd_sync, check=True, shell=True, stdout=subprocess.PIPE)
    LOG.debug("Sync to %s command output: %s", uri, sync_result.stdout)
    LOG.info("Synced version %s to %s", self.version, uri)
def test_example_manuscript(manuscript):
    """
    Test command line execution of manubot to build an example manuscript.
    """
    manuscript_dir = directory.joinpath('manuscripts', manuscript)
    args = [
        'manubot',
        '--log-level', 'INFO',
        '--content-directory', manuscript_dir.joinpath('content'),
        '--output-directory', manuscript_dir.joinpath('output'),
    ]
    if manuscript == 'variables':
        args.extend([
            '--template-variables-path',
            manuscript_dir.joinpath('content/template-variables.json'),
        ])
    process = subprocess.run(
        args,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    print(process.args)
    print(process.stderr.decode())
    assert process.returncode == 0
def do_start_build_stuff(ctx):
    config = ctx.obj.config
    solution_num = ctx.obj.solution_num
    try:
        file = click.open_file("run_hls.tcl", "w")
        file.write("open_project " + config["project_name"] + "\n")
        file.write("set_top " + config["top_level_function_name"] + "\n")
        for src_file in config["src_files"]:
            file.write("add_files " + config["src_dir_name"] + "/" + src_file + "\n")
        for tb_file in config["tb_files"]:
            file.write("add_files -tb " + config["tb_dir_name"] + "/" + tb_file + "\n")
        if ctx.params['keep']:
            file.write("open_solution -reset \"solution" + str(solution_num) + "\"" + "\n")
        else:
            file.write("open_solution \"solution" + str(solution_num) + "\"" + "\n")
        file.write("set_part \\{" + config["part_name"] + "\\}" + "\n")
        file.write("create_clock -period " + config["clock_period"] + " -name default" + "\n")
        return file
    except OSError:
        click.echo("Woah! Couldn't create a Tcl run file in the current folder!")
        raise click.Abort()

# Function to write a default build into the HLS Tcl build script.
def build_end_callback(ctx, sub_command_returns, keep, report):
    # Catch the case where no subcommands have been issued and offer a default build
    if not sub_command_returns:
        if click.confirm("No build stages specified, would you like to run a default sequence using all the build stages?", abort=True):
            do_default_build(ctx)

    ctx.obj.file.write("exit" + "\n")
    ctx.obj.file.close()

    # Call the Vivado HLS process
    hls_process = subprocess.run(["vivado_hls", "-f", "run_hls.tcl"])

    # Check the return status of the HLS process.
    if hls_process.returncode < 0:
        raise click.Abort()
    elif hls_process.returncode > 0:
        click.echo("Warning: HLS Process returned an error, skipping report opening!")
        raise click.Abort()
    else:
        do_end_build_stuff(ctx, sub_command_returns, report)

# csim subcommand
def volume_plugin(colors):
    path = os.path.realpath(__file__)
    path = os.path.dirname(path)

    p = subprocess.run(['sh', path + "/volume_level.sh"], stdout=subprocess.PIPE)
    string = ''
    for c in str(p.stdout):
        if c.isnumeric():
            string += c
    level = int(string)

    p = subprocess.run(['sh', path + "/volume_muted.sh"], stdout=subprocess.PIPE)
    if str(p.stdout)[2] == 'y':
        muted = True
    else:
        muted = False

    string = '%{F' + colors["lwhite"] + '}'
    if level > 0 and not muted:
        if level >= 50:
            string += '\uf028'
        else:
            string += '\uf027'
    else:
        string += '\uf026'
    string += " %{F" + colors["lwhite"] + "}%3.0f" % level + '%'
    return string
def generate_ssh_key(note, keypath='github_deploy_key'):
    """
    Generates an SSH deploy public and private key.

    Returns the public key as a str.
    """
    p = subprocess.run(['ssh-keygen', '-t', 'rsa', '-b', '4096', '-C', note,
                        '-f', keypath, '-N', ''])
    if p.returncode:
        raise RuntimeError("SSH key generation failed")

    with open(keypath + ".pub") as f:
        key = f.read()

    os.remove(keypath + ".pub")
    return key
def guess_github_repo():
    """
    Guesses the github repo for the current directory

    Returns False if no guess can be made.
    """
    p = subprocess.run(['git', 'ls-remote', '--get-url', 'origin'],
                       stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=False)
    if p.stderr or p.returncode:
        return False

    url = p.stdout.decode('utf-8').strip()
    m = GIT_URL.fullmatch(url)
    if not m:
        return False
    return m.group(1)
def checkout_deploy_branch(deploy_branch, canpush=True):
    """
    Checkout the deploy branch, creating it if it doesn't exist.
    """
    # Create an empty branch with .nojekyll if it doesn't already exist
    create_deploy_branch(deploy_branch, push=canpush)
    remote_branch = "doctr_remote/{}".format(deploy_branch)
    print("Checking out doctr working branch tracking", remote_branch)
    clear_working_branch()
    # If the remote branch doesn't exist, create_deploy_branch() above will have
    # created it, provided we can push; if we can't push, it won't exist and
    # --track would fail.
    if run(['git', 'rev-parse', '--verify', remote_branch], exit=False) == 0:
        extra_args = ['--track', remote_branch]
    else:
        extra_args = []
    run(['git', 'checkout', '-b', DOCTR_WORKING_BRANCH] + extra_args)
    print("Done")
    return canpush
def push_docs(deploy_branch='gh-pages', retries=3):
    """
    Push the changes to the branch named ``deploy_branch``.

    Assumes that :func:`setup_GitHub_push` has been run and returned True, and
    that :func:`commit_docs` has been run. Does not push anything if no changes
    were made.
    """
    code = 1
    while code and retries:
        print("Pulling")
        code = run(['git', 'pull', '-s', 'recursive', '-X', 'ours', 'doctr_remote',
                    deploy_branch], exit=False)
        print("Pushing commit")
        code = run(['git', 'push', '-q', 'doctr_remote',
                    '{}:{}'.format(DOCTR_WORKING_BRANCH, deploy_branch)], exit=False)
        if code:
            retries -= 1
            print("Push failed, retrying")
            time.sleep(1)
        else:
            return
    sys.exit("Giving up...")
def run_cmd(cmd, quiet=False):
    if not quiet:
        logging.info('command: {}'.format(cmd))
    # use shlex to keep quoted substrings
    result = run(shlex.split(cmd), stdout=PIPE, stderr=PIPE)
    stdout = result.stdout.strip().decode()
    stderr = result.stderr.strip().decode()
    if stdout and not quiet:
        logging.debug(stdout)
    if stderr and not quiet:
        logging.warning(stderr)
    return result.stdout.strip()
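The "use shlex to keep quoted substrings" comment is why run_cmd splits the command with shlex.split rather than str.split; a quick illustration of the difference (our own example, not from the original project):

import shlex

cmd = 'grep -r "hello world" .'
shlex.split(cmd)   # ['grep', '-r', 'hello world', '.'] -- quoted substring kept whole
cmd.split()        # ['grep', '-r', '"hello', 'world"', '.'] -- naive split breaks it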
def meld(got, expected):
    if got == expected:
        return

    import inspect
    call_frame = inspect.getouterframes(inspect.currentframe(), 2)
    test_name = call_frame[1][3]

    from pprint import pformat
    import os
    from os import path
    os.makedirs(test_name, exist_ok=True)
    got_fn = path.join(test_name, 'got')
    expected_fn = path.join(test_name, 'expected')
    with open(got_fn, 'w') as got_f, open(expected_fn, 'w') as expected_f:
        got_f.write(pformat(got))
        expected_f.write(pformat(expected))

    import subprocess
    subprocess.run(['meld', got_fn, expected_fn])
def call_say(self, txt: str, speed=None, pitch=None, literal=False):
    if self.get_option(self.Options.USE_ESPEAK):
        args = ["espeak"]
        if pitch:
            args += ["-p", str(pitch)]
        if speed:
            args += ["-s", str(speed)]
        if literal:
            txt = " ".join(txt)
        args.append(txt)
    else:
        args = ["say"]
        if pitch:
            txt = f"[[ pbas +{pitch}]] {txt}"
        if speed:
            args += ["-r", str(speed)]
        if literal:
            txt = f"[[ char LTRL ]] {txt}"
        args.append(txt)

    if self.enabled:
        logger.debug(f"Saying '{txt}'")
        subprocess.run(args)
def main():
    parser = argparse.ArgumentParser(
        description="Backup system/data using dar and par2")
    parser.add_argument("-c", "--config", dest="config", required=True,
                        help="configuration file for dar and archive. " +
                             "NOTE: the backup archive will be placed under " +
                             "the same directory as this configuration file")
    parser.add_argument("-n", "--dry-run", dest="dry_run", action="store_true",
                        help="dry run, do not perform any action")
    parser.add_argument("-v", "--verbose", dest="verbose", action="store_true",
                        help="show verbose information")
    args = parser.parse_args()
    if args.verbose:
        logging.basicConfig(level=logging.INFO)

    settings = DarSettings(args.config, verbose=args.verbose,
                           dry_run=args.dry_run)
    dar = DarBackup(settings)
    dar.run(dry_run=args.dry_run)
async def run(self):
    '''
    Restarts the RabbitMQ broker using a method derived from the
    TEST_DISTRIBUTION environmental variable.

    If TEST_DISTRIBUTION=="arch", will try to restart rabbitmq using the linux
    ``systemctl`` command. If TEST_DISTRIBUTION=="ubuntu", will try to restart
    rabbitmq using the linux ``service`` command. Will wait for 20 seconds
    after restarting before returning.

    :raises ValueError: if TEST_DISTRIBUTION environmental variable not found

    .. note::
        the user who invokes this method will likely require sudo access to
        the linux commands. This can be provided by editing the sudoers file.
    '''
    await self.loop.run_in_executor(None, self._run)
    await asyncio.sleep(20)
def get_vpn_connections():
    try:
        output = subprocess.run(['nmcli', '--mode', 'tabular', '--terse', '--fields',
                                 'TYPE,NAME', 'connection', 'show'],
                                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        output.check_returncode()
        lines = output.stdout.decode('utf-8').split('\n')
        vpn_connections = []
        for line in lines:
            if line:
                elements = line.strip().split(':')
                if elements[0] == 'vpn':
                    vpn_connections.append(elements[1])
        return vpn_connections
    except subprocess.CalledProcessError:
        error = utils.format_std_string(output.stderr)
        logger.error(error)
        return False
def get_interfaces(wifi=True, ethernet=True):
    try:
        output = subprocess.run(['nmcli', '--mode', 'tabular', '--terse', '--fields',
                                 'TYPE,DEVICE', 'device', 'status'],
                                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        output.check_returncode()
        lines = output.stdout.decode('utf-8').split('\n')
        interfaces = []
        for line in lines:
            if line:
                elements = line.strip().split(':')
                if (wifi and elements[0] == 'wifi') or (ethernet and elements[0] == 'ethernet'):
                    interfaces.append(elements[1])
        return interfaces
    except subprocess.CalledProcessError:
        error = utils.format_std_string(output.stderr)
        logger.error(error)
        return False
    except Exception as ex:
        logger.error(ex)
        return False
def get_num_processes(num_servers):
    # Since each process is not resource heavy and simply spends its time waiting
    # for pings, maximise the number of processes (within the constraints of the
    # current configuration).

    # Maximum open file descriptors of current configuration
    soft_limit, _ = resource.getrlimit(resource.RLIMIT_NOFILE)

    # Find how many file descriptors are already in use by the parent process
    ppid = os.getppid()
    used_file_descriptors = int(subprocess.run('ls -l /proc/' + str(ppid) + '/fd | wc -l',
                                               shell=True,
                                               stdout=subprocess.PIPE).stdout.decode('utf-8'))

    # Max processes is the number of file descriptors left before the soft limit
    # (configuration maximum) is reached
    max_processes = int((soft_limit - used_file_descriptors) / 2)

    if num_servers > max_processes:
        return max_processes
    else:
        return num_servers
def extract_all_features(save_dir, data_dir=DATA_DIR, extension=".cell"):
    from naive_bayes import extract_nb_features
    from random_forest import extract_rf_features
    from svc1 import extract_svc1_features
    from svc2 import extract_svc2_features
    import subprocess

    create_dir_if_not_exists(save_dir + '/knn_cells/')
    subprocess.run([
        'go', 'run', dirname + '/kNN.go',
        '-folder', data_dir + '/',
        '-new_path', save_dir + '/knn_cells/',
        '-extension', extension])

    # extract_features(extract_nb_features, save_dir + '/nb_cells', data_dir=data_dir, extension=extension, model_name="naive bayes")
    extract_features(extract_rf_features, save_dir + '/rf_cells', data_dir=data_dir, extension=extension, model_name="random forest")
    extract_features(extract_svc1_features, save_dir + '/svc1_cells', data_dir=data_dir, extension=extension, model_name="svc1")
    extract_features(extract_svc2_features, save_dir + '/svc2_cells', data_dir=data_dir, extension=extension, model_name="svc2")

    stdout.write("Finished extracting features\n")
def fixRPath(self, filename):
    all_rpaths = self.getRPaths(filename)
    all_replacements = []
    print('Checking RPATH for %s' % (filename,))
    for rpath in all_rpaths:
        print('    Looking at RPATH %s' % (rpath,))
        for all_orig_rpaths, replacement in self.all_rpath_replacements:
            all_replacements.append(replacement)
            for orig in all_orig_rpaths:
                if rpath == orig:
                    print('        Need to replace %s with %s' % (rpath, replacement))
                    subprocess.run(['install_name_tool', '-rpath', rpath, replacement, filename],
                                   check=True)

    all_rpaths = self.getRPaths(filename)
    for rpath in all_rpaths:
        if rpath not in all_replacements:
            print('        Delete unused rpath %s' % (rpath,))
            subprocess.run(['install_name_tool', '-delete_rpath', rpath, filename],
                           check=True)
def getRPaths(self, filename):
    all_rpaths = []
    res = subprocess.run(['otool', '-l', filename],
                         stdout=subprocess.PIPE, universal_newlines=True)
    state = self.ST_IDLE
    for line in res.stdout.split('\n'):
        words = line.strip().split()
        if state == self.ST_IDLE:
            if words == ['cmd', 'LC_RPATH']:
                state = self.ST_RPATH
        elif state == self.ST_RPATH:
            if words[0:1] == ['path']:
                path = words[1]
                all_rpaths.append(path)
                state = self.ST_IDLE
    return all_rpaths
def getDylibs(self, filename):
    all_dylibs = []
    res = subprocess.run(['otool', '-l', filename],
                         stdout=subprocess.PIPE, universal_newlines=True)
    state = self.ST_IDLE
    for line in res.stdout.split('\n'):
        words = line.strip().split()
        if state == self.ST_IDLE:
            if words == ['cmd', 'LC_LOAD_DYLIB']:
                state = self.ST_DYNLIB
        elif state == self.ST_DYNLIB:
            if words[0:1] == ['name']:
                path = words[1]
                all_dylibs.append(path)
                state = self.ST_IDLE
    return all_dylibs
def write_raspa_file(filename, uuid, simulation_config): """Writes RASPA input file for calculating helium void fraction. Args: filename (str): path to input file. run_id (str): identification string for run. material_id (str): uuid for material. Writes RASPA input-file. """ # Load simulation parameters from config values = { 'NumberOfCycles' : simulation_config['simulation_cycles'], 'FrameworkName' : uuid} # Load template and replace values input_data = load_and_subs_template('helium_void_fraction.input', values) # Write simulation input-file with open(filename, "w") as raspa_input_file: raspa_input_file.write(input_data)
def write_raspa_file(filename, uuid, simulation_config): """Writes RASPA input file for calculating surface area. Args: filename (str): path to input file. run_id (str): identification string for run. material_id (str): uuid for material. Writes RASPA input-file. """ # Load simulation parameters from config values = { 'NumberOfCycles' : simulation_config['simulation_cycles'], 'FrameworkName' : uuid} # Load template and replace values input_data = load_and_subs_template('surface_area.input', values) # Write simulation input-file with open(filename, "w") as raspa_input_file: raspa_input_file.write(input_data)
def flush_git_rm_files():
    if git_rm_files:
        try:
            subprocess.run(["git", "rm", "-f", *git_rm_files],
                           stdout=subprocess.PIPE, stderr=subprocess.PIPE).check_returncode()
        except subprocess.CalledProcessError:
            pass

        # clean up
        for path in git_rm_files:
            try:
                os.unlink(path)
            except FileNotFoundError:
                pass

        git_rm_files.clear()

# @subcommand
# def noop():
#     "Do-nothing command. Used for blurb smoke-testing."
#     pass
def run_chef_knife(host):
    knife = "knife bootstrap --no-host-key-verify " \
            "--ssh-user root --ssh-identity-file %s/.ssh/id_rsa_prox " \
            "--environment=scicomp-env-compute " \
            '--server-url "https://chef.fhcrc.org/organizations/cit" ' \
            "--run-list 'role[cit-base]','role[scicomp-base]' " \
            "--node-name %s " \
            "%s" % (homedir, host, host)
    if host == 'hostname':
        print('you can also execute this knife command manually:')
        print('************************************')
        print(knife)
        print('************************************')
    else:
        if os.path.exists('%s/.chef' % homedir):
            print('*** executing knife command:')
            print(knife)
            ret = subprocess.run(knife, shell=True)
        else:
            print('chef/knife config dir %s/.chef does not exist.' % homedir)
def load_into_pgsql(self, capture_stderr=True):
    if not os.path.exists(self.overpass_filename):
        return 'no data from overpass to load with osm2pgsql'

    if os.stat(self.overpass_filename).st_size == 0:
        return 'no data from overpass to load with osm2pgsql'

    cmd = self.osm2pgsql_cmd()

    if not capture_stderr:
        p = subprocess.run(cmd, env={'PGPASSWORD': current_app.config['DB_PASS']})
        return

    p = subprocess.run(cmd, stderr=subprocess.PIPE,
                       env={'PGPASSWORD': current_app.config['DB_PASS']})

    if p.returncode != 0:
        if b'Out of memory' in p.stderr:
            return 'out of memory'
        else:
            return p.stderr.decode('utf-8')
def chunk(self):
    chunk_size = utils.calc_chunk_size(self.area_in_sq_km)
    chunks = self.chunk_n(chunk_size)

    print('chunk size:', chunk_size)

    files = []
    for num, chunk in enumerate(chunks):
        filename = self.chunk_filename(num, len(chunks))
        # print(num, q.count(), len(tags), filename, list(tags))
        full = os.path.join('overpass', filename)
        files.append(full)
        if os.path.exists(full):
            continue
        oql = self.oql_for_chunk(chunk, include_self=(num == 0))

        r = overpass.run_query_persistent(oql)
        if not r:
            print(oql)
        assert r
        with open(full, 'wb') as f:
            f.write(r.content)

    cmd = ['osmium', 'merge'] + files + ['-o', self.overpass_filename]
    print(' '.join(cmd))
    subprocess.run(cmd)
def run(self):
    tstart = datetime.now()
    msg = "{} Starting validator: {}".format(tstart, self.vcf_file)
    self.log(msg)

    std = self.validate()

    tend = datetime.now()
    annotation_time = tend - tstart
    msg = "{} Finished validator, it took: {}".format(tend, annotation_time)
    self.log(msg)
    # print(tend, 'Finished validator, it took: ', annotation_time)

    return std

# Validate vcf file with Vcftools
def _get_gitinfo():
    import pygrunt.platform as platform
    import subprocess
    from pathlib import Path

    git = platform.current.find_executable('git')
    if git is None:
        # No git installed; assume we're on master
        return ('master', '')

    cwd = str(Path(__file__).parent)

    args = [git, 'rev-parse', '--abbrev-ref', 'HEAD']
    result = subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL,
                            cwd=cwd, universal_newlines=True)
    if result.returncode != 0:
        # Quietly return defaults on fail
        return ('master', '')
    branch = result.stdout

    args = [git, 'rev-parse', 'HEAD']
    result = subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL,
                            cwd=cwd, universal_newlines=True)
    if result.returncode != 0:
        # Quietly return defaults on fail
        return ('master', '')
    commit = result.stdout

    return (branch.strip(), commit.strip())
def compile_object(self, in_file, out_file):
    import subprocess

    in_file = Path(in_file)
    out_file = Path(out_file)

    # Skip compile if RecompileStrategy says so.
    # Since preprocess_source (possibly used by recompile) also modifies
    # self._args, we have to back it up; note the list() copy, since assigning
    # the list itself would only alias it.
    # TODO: Maybe use something more elegant than self._args?
    old_args = list(self._args)
    if out_file.is_file():
        if not self.recompile.should_recompile(str(in_file)):
            # Style.info('Nothing to do with', in_file)
            return True
    self._args = old_args

    Path(out_file).parent.mkdir(parents=True, exist_ok=True)
    self._args.extend(self._build_compiler_flags())
    result = subprocess.run(self._args, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                            universal_newlines=True)

    # TODO: do something useful with output
    if result.stdout:
        print(result.stdout)
    if result.stderr:
        print(result.stderr)

    return result.returncode == 0
def link_executable(self, in_files, out_file):
    import subprocess

    Path(out_file).parent.mkdir(parents=True, exist_ok=True)
    self._args.extend(self._build_linker_flags())
    result = subprocess.run(self._args)
    return result.returncode == 0
def link_library(self, in_files, out_file):
    import subprocess

    Path(out_file).parent.mkdir(parents=True, exist_ok=True)
    self._args.extend(self._build_linker_flags())
    result = subprocess.run(self._args)
    return result.returncode == 0
def clear():
    subprocess.run(["clear"])
def sleep(_time):
    """Pauses for the number of seconds given by _time.

    Args:
        _time (int): duration of the pause, in seconds
    """
    subprocess.run(["sleep", str(_time) + "s"])
def sys_run(command, check=False):
    ret = subprocess.run(command, stdout=subprocess.PIPE,
                         stderr=subprocess.STDOUT, shell=True, check=check)
    return ret
def list_links():
    try:
        ret = subprocess.run(['ip', 'link', 'show'],
                             stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                             shell=False, check=True)
        links = ipcontrol.parse(ret.stdout.decode('utf-8'))
        return [True, list(links.keys())]
    except subprocess.CalledProcessError as suberror:
        return [False, "list links failed : %s" % suberror.stdout.decode('utf-8')]
def link_exist(linkname):
    try:
        subprocess.run(['ip', 'link', 'show', 'dev', str(linkname)],
                       stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                       shell=False, check=True)
        return True
    except subprocess.CalledProcessError:
        return False
def link_info(linkname):
    try:
        ret = subprocess.run(['ip', 'address', 'show', 'dev', str(linkname)],
                             stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                             shell=False, check=True)
        return [True, ipcontrol.parse(ret.stdout.decode('utf-8'))[str(linkname)]]
    except subprocess.CalledProcessError as suberror:
        return [False, "get link info failed : %s" % suberror.stdout.decode('utf-8')]
def link_state(linkname):
    try:
        ret = subprocess.run(['ip', 'link', 'show', 'dev', str(linkname)],
                             stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                             shell=False, check=True)
        return [True, ipcontrol.parse(ret.stdout.decode('utf-8'))[str(linkname)]['state']]
    except subprocess.CalledProcessError as suberror:
        return [False, "get link state failed : %s" % suberror.stdout.decode('utf-8')]
def down_link(linkname):
    try:
        subprocess.run(['ip', 'link', 'set', 'dev', str(linkname), 'down'],
                       stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                       shell=False, check=True)
        return [True, str(linkname)]
    except subprocess.CalledProcessError as suberror:
        return [False, "set link down failed : %s" % suberror.stdout.decode('utf-8')]
def add_addr(linkname, address):
    try:
        subprocess.run(['ip', 'address', 'add', address, 'dev', str(linkname)],
                       stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                       shell=False, check=True)
        return [True, str(linkname)]
    except subprocess.CalledProcessError as suberror:
        return [False, "add address failed : %s" % suberror.stdout.decode('utf-8')]
def list_bridges():
    try:
        ret = subprocess.run(['ovs-vsctl', 'list-br'],
                             stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                             shell=False, check=True)
        return [True, ret.stdout.decode('utf-8').split()]
    except subprocess.CalledProcessError as suberror:
        return [False, "list bridges failed : %s" % suberror.stdout.decode('utf-8')]
def bridge_exist(bridge):
    try:
        subprocess.run(['ovs-vsctl', 'br-exists', str(bridge)],
                       stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                       shell=False, check=True)
        return True
    except subprocess.CalledProcessError:
        return False
def add_bridge(bridge):
    try:
        subprocess.run(['ovs-vsctl', '--may-exist', 'add-br', str(bridge)],
                       stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                       shell=False, check=True)
        return [True, str(bridge)]
    except subprocess.CalledProcessError as suberror:
        return [False, "add bridge failed : %s" % suberror.stdout.decode('utf-8')]
def del_bridge(bridge):
    try:
        subprocess.run(['ovs-vsctl', 'del-br', str(bridge)],
                       stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                       shell=False, check=True)
        return [True, str(bridge)]
    except subprocess.CalledProcessError as suberror:
        return [False, "del bridge failed : %s" % suberror.stdout.decode('utf-8')]