def handle_file(self, fname): with open(fname, 'rb') as fd: if fd.read(4) == b'dex\n': new_jar = self.name + '/classes-dex2jar.jar' run([dex2jar, fname, '-f', '-o', new_jar], cwd=self.name, stderr=DEVNULL) fname = new_jar with ZipFile(fname) as jar: jar.extractall(self.name) for cls in jar.namelist(): if cls.endswith('.class'): cls = cls.replace('/', '.')[:-6] self.classes.append(cls) elif cls.endswith('.dex'): self.handle_file(self.name + '/' + cls) elif cls.endswith('.proto'): self.bonus_protos[cls] = jar.read(cls).decode('utf8') elif cls.endswith('.so'): self.bonus_protos.update(walk_binary(self.name + '/' + cls))
def run_cmake(self): print("Running CMake") build_dir_cmd_out = subprocess.call( ["mkdir", "build"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) if build_dir_cmd_out != 0: print("Can\'t setup CMake build directory.") return if self.cmake_build_info["build_dir"].is_dir(): try: subprocess.check_output( self.cmake_cmd_info["cmake_cmd"], cwd=str(self.cmake_build_info["build_dir"])) except subprocess.CalledProcessError as e: print(e.output) if not self.cmake_build_info["comp_data_cmake"].is_file(): print("Couldn't setup CMake Project") return else: print("Couldn't setup CMake Project") return
def _openDownloadFile(self, buildId, suffix): (tmpFd, tmpName) = mkstemp() url = self._makeUrl(buildId, suffix) try: os.close(tmpFd) env = { k:v for (k,v) in os.environ.items() if k in self.__whiteList } env["BOB_LOCAL_ARTIFACT"] = tmpName env["BOB_REMOTE_ARTIFACT"] = url ret = subprocess.call(["/bin/bash", "-ec", self.__downloadCmd], stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, cwd="/tmp", env=env) if ret == 0: ret = tmpName tmpName = None return CustomDownloader(ret) else: raise ArtifactDownloadError("failed (exit {})".format(ret)) finally: if tmpName is not None: os.unlink(tmpName)
def callGit(self, workspacePath, *args): cmdLine = ['git'] cmdLine.extend(args) try: output = subprocess.check_output(cmdLine, cwd=os.path.join(os.getcwd(), workspacePath, self.__dir), universal_newlines=True, stderr=subprocess.DEVNULL) except subprocess.CalledProcessError as e: raise BuildError("git error:\n Directory: '{}'\n Command: '{}'\n'{}'".format( os.path.join(workspacePath, self.__dir), " ".join(cmdLine), e.output.rstrip())) return output # Get GitSCM status. The purpose of this function is to return the status of the given directory # # return values: # - error: The SCM is in a error state. Use this if git returned a error code. # - dirty: SCM is dirty. Could be: modified files, switched to another branch/tag/commit/repo, unpushed commits. # - clean: Same branch/tag/commit as specified in the recipe and no local changes. # - empty: Directory is not existing. # # This function is called when build with --clean-checkout. 'error' and 'dirty' SCMs are moved to attic, # while empty and clean directories are not.
def callSubversion(self, workspacePath, *args): cmdLine = ['svn'] cmdLine.extend(args) try: output = subprocess.check_output(cmdLine, cwd=workspacePath, universal_newlines=True, stderr=subprocess.DEVNULL) except subprocess.CalledProcessError as e: raise BuildError("svn error:\n Directory: '{}'\n Command: '{}'\n'{}'".format( os.path.join(workspacePath, self.__dir), " ".join(cmdLine), e.output.rstrip())) return output # Get SvnSCM status. The purpose of this function is to return the status of the given directory # # return values: # - error: the scm is in a error state. Use this if svn call returns a error code. # - dirty: SCM is dirty. Could be: modified files, switched to another URL or revision # - clean: same URL and revision as specified in the recipe and no local changes. # - empty: directory is not existing # # This function is called when build with --clean-checkout. 'error' and 'dirty' scm's are moved to attic, # while empty and clean directories are not.
def version_getter(config): """Get tag associated with HEAD; fall back to SHA1. If HEAD is tagged, return the tag name; otherwise fall back to HEAD's short SHA1 hash. .. note:: Only annotated tags are considered. TODO: Support non-annotated tags? """ try: check_output(['git', 'rev-parse', '--is-inside-work-tree'], stderr=DEVNULL) except CalledProcessError: return None encoding = getpreferredencoding(do_setlocale=False) try: version = check_output(['git', 'describe', '--exact-match'], stderr=DEVNULL) except CalledProcessError: version = check_output(['git', 'rev-parse', '--short', 'HEAD']) version = version.decode(encoding).strip() return version
def _check_system(self): null_commands = self.commands + ["--help"] if six.PY2: not_found_exception = IOError else: not_found_exception = FileNotFoundError try: if six.PY2: subprocess.check_output(null_commands) else: subprocess.check_call(null_commands, stdout=subprocess.DEVNULL) except not_found_exception: raise RepositorySystemError("Cannot run {}".format(self.command)) except subprocess.CalledProcessError: raise RepositorySystemError( "Error running {}".format(self.command))
def _convert(self, command_path, input_path, settings=[], prefix='', fileformat='png'): if prefix: prefix = '{}-'.format(prefix) output_path = path.join( self.dest_pages, '{}%04d.{}'.format(prefix, fileformat) ) exit_code = subprocess.Popen( (command_path, *settings, input_path, output_path), stdout=subprocess.PIPE, stderr=subprocess.DEVNULL ).wait() if exit_code != 0: raise ProcessingError( 'Was unable to convert document to {}s ({}).'.format(fileformat, exit_code) ) return glob.glob(path.join( self.dest_pages, '{}[0-9][0-9][0-9][0-9].{}'.format(prefix, fileformat) ))
def run_vis(self, config): """.""" self.poll_master_stdout(5, 'Waiting to receive', 'run_vis initialisation') pid = subprocess.Popen(['/usr/bin/python', '-m', 'sip.emulators.csp_visibility_sender', config], stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, stderr=subprocess.PIPE) pid.wait() output = pid.communicate() if DEBUG: print('--------------------------') print('nvis_sender, stdout:') print('--------------------------') print(output[0].decode("utf-8")) print('--------------------------') print('nvis_sender, stderr:') print('--------------------------') print(output[1].decode("utf-8")) self.poll_master_stdout(5, 'Received heap 9', 'run_vis termination')
def getDeployedIps(deploymentName): deploymentCount = subprocess.check_output("bosh -e lite deployments | grep {} | wc -l".format(deploymentName), shell=True, stderr=subprocess.DEVNULL) if int(deploymentCount) == 0: return None deployedManifest = yaml.load( subprocess.check_output("bosh -e lite -d {} manifest".format(deploymentName), shell=True, stderr=subprocess.DEVNULL)) deployedIpConfig = {} deployedIpConfig["global"] = getTestSubnet(deployedManifest)["static"] deployedJobs = [j for j in deployedManifest["jobs"] if j["name"] in commonUtils.POOL_TYPES] for job in deployedJobs: jobIps = job["networks"][0]["static_ips"] deployedIpConfig[job["name"]] = jobIps return deployedIpConfig
def find_class_having_main(self, classes): for file in classes: # run javap(1) with type signatures try: stdout = subprocess.check_output( [self.extra_binaries['disassembler'].cmd, '-s', str(file)], stderr=subprocess.DEVNULL, env=self.compiler.env) except subprocess.SubprocessError: # noqa continue # iterate on lines to find p s v main() signature and then # its descriptor on the line below; we don't rely on the type # from the signature, because it could be String[], String... or # some other syntax I'm not even aware of lines = iter(stdout.decode().split('\n')) for line in lines: line = line.lstrip() if line.startswith('public static') and 'void main(' in line: if next(lines).lstrip() == PSVMAIN_DESCRIPTOR: return file.stem
def runTarsnap(args, timeout = None): command = [config.tarsnap_bin] + config.tarsnap_extra_args + args proc = subprocess.Popen(command, stdout = subprocess.PIPE, stderr = subprocess.PIPE, stdin = subprocess.DEVNULL, universal_newlines = True) result = CheapoCompletedProcess() try: result.stdout, result.stderr = proc.communicate(timeout = timeout) result.returncode = proc.wait() if result.returncode: sys.exit("Error running tarsnap:\nCommand: {}\nSTDOUT:\n{}\nSTDERR:\n{}\n".format(" ".join(command),result.stdout,result.stderr)) return result except subprocess.TimeoutExpired: print("Tarsnap timed out, sending SIGQUIT...") proc.send_signal(signal.SIGQUIT) result.stdout, result.stderr = proc.communicate() result.returncode = proc.wait() print("Tarsnap finished") if result.returncode: sys.exit("Error running tarsnap:\nCommand: {}\nSTDOUT:\n{}\nSTDERR:\n{}\n".format(" ".join(command),result.stdout,result.stderr)) return result
def parse_text(self, img): filename = "obj_{}.png".format(id(self)) self.save_bounding_box(img, filename) cmd = [ 'tesseract', filename, 'stdout', '--psm', '7', '-l', 'eng+equ', '-c', 'tessedit_char_whitelist=abcdefghijklmnopqrstuvwxyz0123456789=+-*/' ] result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL) self.text = result.stdout.decode('ascii').strip().lower() or ''
def running_old_sell(manager, installer): if any(installed is False for package, installed in installer.check_dependencies()): return False if isinstance(manager.machine, Two1MachineVirtual): try: vbox_conf = subprocess.check_output(["VBoxManage", "showvminfo", "21", "--machinereadable"], stderr=subprocess.DEVNULL) if type(vbox_conf) is bytes: vbox_conf = vbox_conf.decode() if "bridgeadapter3" in vbox_conf: return True else: return False except subprocess.CalledProcessError: return False else: return False
def _start_sell_service(self, service_name, failed_to_start_hook, started_hook, failed_to_up_hook, up_hook, timeout=Two1Composer.SERVICE_START_TIMEOUT): try: subprocess.check_output(["docker-compose", "-f", Two1Composer.COMPOSE_FILE, "up", "-d", service_name], stderr=subprocess.DEVNULL, env=self.machine_env) except subprocess.CalledProcessError: failed_to_start_hook(service_name) else: started_hook(service_name) if service_name == 'router': time.sleep(5) elif service_name != 'router' and service_name != 'base': start = time.clock() exec_id = self.docker_client.exec_create('sell_router', "curl %s:5000" % service_name)['Id'] self.docker_client.exec_start(exec_id) running = True while time.clock() - start < timeout and running is True: running = self.docker_client.exec_inspect(exec_id)['Running'] if running is True: failed_to_up_hook(service_name) else: up_hook(service_name)
def start_networking(self): """ Start ZeroTier daemon. """ if not self.status_networking(): try: subprocess.Popen(['sudo', 'service', 'zerotier-one', 'start'], stderr=subprocess.DEVNULL) except subprocess.CalledProcessError: return 1 else: now = time.time() while time.time() <= now + 60: if self.status_networking(): return 0 return 1 else: return 0
def status_networking(self): """ Get status of ZeroTier One service. """ try: if "ps_zerotier.sh" not in self.docker_ssh("ls", stderr=subprocess.DEVNULL).decode().split("\n"): subprocess.check_output(["docker-machine", "scp", os.path.join( os.path.dirname(os.path.abspath(__file__)), "util", "scripts", "ps_zerotier.sh"), "21:~/ps_zerotier.sh"]) self.docker_ssh("chmod a+x ~/ps_zerotier.sh", stderr=subprocess.DEVNULL) processes = self.docker_ssh("./ps_zerotier.sh", stderr=subprocess.DEVNULL).decode().split("\n") for process in processes: if process.find("zerotier-one -d") != -1: return True return False except subprocess.CalledProcessError: return False
def connect_market(self, client, market): try: zt_device_address = json.loads(self.docker_ssh("sudo ./zerotier-cli info -j", stderr=subprocess.DEVNULL).decode())["address"] response = client.join(market, zt_device_address) if response.ok: network_id = response.json().get("networkid") self.docker_ssh("sudo ./zerotier-cli join %s" % network_id, stderr=subprocess.DEVNULL) if self.wait_for_zt_confirmation(): pass except exceptions.ServerRequestError as e: if e.status_code == 400: logger.info(uxstring.UxString.invalid_network) else: raise e except subprocess.CalledProcessError as e: logger.info(str(e)) time.sleep(10) # wait for interface to come up return
def _get_market_address(self): """ Get status of 21mkt network connection. Returns: zt_ip (str): ZeroTier IP address. """ try: zt_conf = self.docker_ssh("sudo ./zerotier-cli listnetworks -j", stderr=subprocess.DEVNULL) if type(zt_conf) == bytes: zt_conf = zt_conf.decode() zt_conf_json = json.loads(zt_conf) for net in zt_conf_json: if net["name"] == "21mkt": if net["status"] == "OK": ip_addrs = net["assignedAddresses"] for addr in ip_addrs: potential_match = Two1Machine.ZEROTIER_CLI_IPV6_PATTERN.search(addr) if potential_match is not None: return "[%s]" % potential_match.group(0) return "" else: return "" return "" except Exception: return ""
def status_machine(self): """ Get status of virtual machine. """ try: status = subprocess.check_output(["docker-machine", "status", self.name], stderr=subprocess.DEVNULL).decode().rstrip() if status.lower() == "running": return VmState.RUNNING elif status.lower() == "stopped": return VmState.STOPPED else: return VmState.UNKNOWN except: return VmState.NOEXIST # private methods
def runRedisServer(port=6379): """runs redis-server""" # waits until it can accept client connection by reading its all # startup messages until it says 'ready to accept ...', then # redirect any following output to DEVNULL. port = str(port) server = subprocess.Popen(['redis-server', '--port', port], stdout=subprocess.PIPE, universal_newlines=True) message = _REDIS_READY_MESSAGE + port while message not in server.stdout.readline(): pass dumper = subprocess.Popen(['cat'], stdin=server.stdout, stdout=subprocess.DEVNULL) return server, dumper
def devserver(port): config.rc_root = 'http://localhost:{}'.format(port) config.rc_api_root = config.rc_root + '/api/v1' proc = subprocess.Popen( ['python', 'devserver/__init__.py'], stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, env={**os.environ, 'PORT': str(port)} ) # wait for the dev server to come up time.sleep(1) try: yield finally: proc.kill() proc.wait()
def clone(self, repo, branch="master"): location_index = repo+'#'+branch if self.repos.get(location_index) is not None: logger.debug("Repo %s already exists, reusing old." % location_index) return location = os.path.join(tempfile.gettempdir(), os.path.basename(location_index)) if os.path.exists(location): logger.debug("Repo %s with branch %s already exists on disk, reusing and pulling." % (repo, branch)) self.pull(location) else: logger.debug("Cloning %s with branch %s" % (repo, branch)) result = subprocess.run( ["git", "clone", "-b", branch, repo, location], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) result.check_returncode() self.repos[location_index] = location self.copies[location_index] = [] return location
def _instancecheck_impl(self, value, info: Info = NoInfo()): if not isinstance(value, List(Str())): return info.errormsg(self) if not is_perf_available(): return info.wrap(True) assert isinstance(value, list) if "wall-clock" in value: value = value.copy() value.remove("wall-clock") cmd = "perf stat -x ';' -e {props} -- /bin/echo".format(props=",".join(value)) proc = subprocess.Popen(["/bin/sh", "-c", cmd], stdout=subprocess.DEVNULL, stderr=subprocess.PIPE, universal_newlines=True) out, err = proc.communicate() if proc.poll() > 0: return info.errormsg(self, "Not a valid properties list: " + str(err).split("\n")[0].strip()) return info.wrap(True)
def _exec(self, cmd: str, fail_on_error: bool = False, error_message: str = "Failed executing {cmd!r}: out={out!r}, err={err!r}", timeout: int = 10) -> bool: """ Execute the passed command. :param cmd: :param fail_on_error: :param error_message: error message format :param timeout: time in seconds after which the command is aborted :return: Was the command executed successfully? """ out_mode = subprocess.PIPE if fail_on_error else subprocess.DEVNULL proc = subprocess.Popen(["/bin/sh", "-c", cmd], stdout=out_mode, stderr=out_mode, universal_newlines=True) out, err = proc.communicate(timeout=timeout) if proc.wait() > 0: if fail_on_error: self._fail(error_message.format(cmd=cmd, out=out, err=err)) else: return False return True
def cpp_verify(data, user_file): answer = open(user_file).read() # write to temp file with open('answer.cpp', 'w') as temp: temp.write(answer) try: test = open('task/test.h').read() with open('test.h', 'w') as temp: temp.write(test) except IOError: # user has not requested task yet print("A task folder could not be found. Request a task\nusing python client.py request <task_id>") sys.exit(1) # terminate program proc = sp.Popen(['g++', 'answer.cpp', 'test.h', '-o', 'test'], stderr=sp.DEVNULL, stdout=sp.DEVNULL) proc.wait() if(proc.returncode): print("Answer failed to compile.") sys.exit(1) proc = sp.Popen(['./test'], stdin=sp.PIPE, stdout=sp.PIPE) test_result, _ = proc.communicate(input=data) # dispose of temporary files os.remove('answer.cpp') os.remove('test.h') os.remove('test') return test_result
def create_ssl_cert(): if PY3: from subprocess import DEVNULL else: DEVNULL = open(os.devnull, 'wb') try: ssl_exec_list = ['openssl', 'req', '-new', '-x509', '-keyout', ssl_cert_path, '-out', ssl_cert_path, '-days', '365', '-nodes', '-subj', '/CN=www.talhasch.com/O=Talhasch Inc./C=TR'] call(ssl_exec_list, stdout=DEVNULL, stderr=DEVNULL) except OpenSslExecutableNotFoundError: logging.error('openssl executable not found!') exit(1) logging.info('Self signed ssl certificate created at {}'.format(ssl_cert_path))
def _DEPREC_bash_run(cmd, suppress_err=0): if not U.f_exists('~/.bashrc'): print('WARNING: ~/.bashrc not found. ' 'bash_output() will not be able to read the aliases.', file=sys.stderr) # hack: if you don't echo something first, your alias such as `ll` will # kill the script after it's run. # https://stackoverflow.com/questions/45558993/subprocess-interactive-bash-behavior cmd = 'printf ""; ' + cmd err = pc.DEVNULL if suppress_err else pc.STDOUT proc = pc.check_output(['/bin/bash', '-i', '-c', cmd], stderr=err, ).decode() # try: # err = pc.DEVNULL if suppress_err else None # return pc.check_output(['/bin/bash', '-i', '-c', cmd], # stderr=err, # ).decode() # except pc.CalledProcessError as exc: # if suppress_err: # print('Failed bash call:\n', exc.output.decode(), file=sys.stderr) # raise
def __start(self, args): # build aireplay flags flags = args if self.DEAUTH_BSSID_FLAG not in flags: flags.extend([self.DEAUTH_BSSID_FLAG, self._bssid]) if '--ignore-negative-one' not in flags: flags.extend(["--ignore-negative-one"]) # build Popen command as list cmd = [self.AIREPLAY] + flags + [self._miff] print("\t" + " ".join(cmd)) self._proc = Popen(cmd,env={'PATH': os.environ['PATH']}, \ stderr=DEVNULL, stdin=DEVNULL, stdout=DEVNULL) return True
def cut_video(input_file: str, output_file: str, start_time: str, end_time: str, is_concat: bool = True): """ ???? :param input_file: ???? :param output_file: ???? :param start_time: ???? :param end_time: ???? :param is_concat: ?????Virtual Concatenation Script :return: """ ffmpeg_command = ['ffmpeg', '-y'] if is_concat: ffmpeg_command.extend(['-protocol_whitelist', 'file,pipe', '-safe', '0', '-f', 'concat', '-i', '-']) else: ffmpeg_command.extend(['-i', input_file]) ffmpeg_command.extend(['-ss', start_time, '-to', end_time, '-c', 'copy', output_file]) child = subprocess.Popen(ffmpeg_command, stdin=subprocess.PIPE, stderr=subprocess.DEVNULL) child.communicate(input_file if is_concat else None)
def build_docker(settings, student): """ Creates a new docker container for the student """ LOGGER.info('Beginning a Docker build for student %s', student['username']) cmd = 'docker build -t {student}_{project} --file={dockerfile} .' cmd = cmd.format(student=student['username'], project=settings['project']['name'], dockerfile=settings['build']['dockerfile']) timeout = settings['build']['timeout'] LOGGER.info(cmd) subprocess.check_call(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, shell=True, timeout=timeout, cwd=student['directory']) LOGGER.info('Completed a Docker build for student %s', student['username'])
def clean_hg(settings, student): """ cleans a Mercurial repository with the latest submission """ LOGGER.info("Using hg to clean up the directory for student %s", student['username']) #Remove all untracked files and directories #Override all options to enable the purge extension to clean the directory cmd = "hg --config='extensions.purge=' purge --all --dirs --files" timeout = int(settings['clean']['timeout']) or 5 subprocess.check_call(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, shell=True, timeout=timeout, cwd=student['directory']) #Undo any changes to tracked files made since the last commit cmd = "hg update -C" timeout = int(settings['clean']['timeout']) or 5 subprocess.check_call(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, shell=True, timeout=timeout, cwd=student['directory'])
def docker_kill_and_remove(ctr_name): try: try: is_running = check_output(['docker', 'top', ctr_name], stderr=DEVNULL) except: pass else: check_output(['docker', 'kill', ctr_name], stderr=STDOUT) try: has_image = check_output(['docker', 'images', '-q', ctr_name], stderr=DEVNULL) assert has_image except: pass else: check_output(['docker', 'rm', ctr_name], stderr=STDOUT) except: logger.error('could not stop docker container:{}'.format(ctr_name))
def Handle_SquashFS(self, Key): Path = os.path.join(self.DestDir, Key) DestDir = Path.rstrip(".raw") + ".extracted" Binary = "unsquashfs" if self.CheckDependency(Binary): return 1 # Need root to preserve permissions. if self.Debug: self.Logger.debug(' '.join(["sudo", Binary, "-d", DestDir, Path])) Result = subprocess.call(["sudo", Binary, "-d", DestDir, Path]) else: Result = subprocess.call(["sudo", Binary, "-d", DestDir, Path], stdout=subprocess.DEVNULL) return Result
def Handle_CramFS(self, Key): Path = os.path.join(self.DestDir, Key) DestDir = Path.rstrip(".raw") + ".extracted" Binary = "cramfsck" if self.CheckDependency(Binary): return 1 # Need root to preserve permissions. if self.Debug: self.Logger.debug(' '.join(["sudo", Binary, "-x", DestDir, Path])) Result = subprocess.call(["sudo", Binary, "-x", DestDir, Path]) else: Result = subprocess.call(["sudo", Binary, "-x", DestDir, Path], stdout=subprocess.DEVNULL) return Result
def Handle_CramFS(self, Key): ExtractedDir = os.path.join(self.Source, Key + ".extracted") OrigPath = os.path.join(self.Source, Key + ".raw") DestPath = os.path.join(self.BuildDir, Key + ".raw") Binary = "mkcramfs" if self.CheckDependency(Binary): return 1 # Need root to access all files. if self.Debug: self.Logger.debug(' '.join(["sudo", Binary, ExtractedDir, DestPath])) Result = subprocess.call(["sudo", Binary, ExtractedDir, DestPath]) else: Result = subprocess.call(["sudo", Binary, ExtractedDir, DestPath], stdout=subprocess.DEVNULL) return Result
def _get_gitinfo(): import pygrunt.platform as platform import subprocess from pathlib import Path git = platform.current.find_executable('git') if git is None: # No git installed; assume we're on master return ('master', '') cwd = str(Path(__file__).parent) args = [git, 'rev-parse', '--abbrev-ref', 'HEAD'] result = subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, cwd=cwd, universal_newlines=True) if result.returncode != 0: # Quietly return defaults on fail return ('master', '') branch = result.stdout args = [git, 'rev-parse', 'HEAD'] result = subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, cwd=cwd, universal_newlines=True) if result.returncode != 0: # Quietly return defaults on fail return ('master', '') commit = result.stdout return (branch.strip(), commit.strip())
def removeDirtyDir(self): if self.cmake_build_info["build_dir"].is_dir(): print("Cleaning up Build Directory") subprocess.call( ["rm", "-rf", str(self.cmake_build_info["build_dir"])], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
def removeOldCMakeFiles(self): if self.cmake_build_info["old_cmake_dir"].is_dir(): print("Cleaning up Old CMake Directory") subprocess.call( ["rm", "-rf", str(self.cmake_build_info["old_cmake_dir"])], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) for path in self.cmake_build_info["old_cmake_files"]: if path.is_file(): print("Cleaning up Old CMake Files") subprocess.call( ["rm", str(path)], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
def setup_rtags_daemon(self): print("Initializing RTags Daemon") try: subprocess.check_output( self.cmake_cmd_info["rtags_shutdwn"], stderr=subprocess.DEVNULL) except subprocess.CalledProcessError as e: print(e.output) try: subprocess.check_call( self.cmake_cmd_info["rdm_cmd"], stderr=subprocess.DEVNULL) except subprocess.CalledProcessError as e: print(e.output) raise
def connect_rtags_client(self): print("Connecting RTags Client") if self.cmake_build_info["comp_data_cmake"].is_file(): rtags_clt_cmd_out = subprocess.call( self.cmake_cmd_info["rc_cmd"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) if rtags_clt_cmd_out != 0: print("Info: RTags Daemon Not Running") return else: print("Error Generating Compilation Database With CMake") return
def rtags_set_file(self, arg): current_buffer = arg try: subprocess.call( self.cmake_cmd_info["rtags_file"] + current_buffer, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) except subprocess.CalledProcessError as e: print(e.output)
def update_rtags_buffers(self, args): buffers = args cpp_buffers = [] for buffer in buffers: if str(buffer)[-4:] in ['.cpp', '.cc', '.c', '.h', '.hpp']: cpp_buffers.append(buffer) try: subprocess.call( self.cmake_cmd_info["rtags_buffer"] + cpp_buffers, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) except subprocess.CalledProcessError as e: print(e.output) #@classmethod
def rtags_tagrun(self, cmd): try: tagrun = subprocess.check_output(cmd, stderr=subprocess.DEVNULL) except subprocess.CalledProcessError as e: print(e.output) taglines = tagrun.split("\n") tags = map(split("\t"), taglines) tags[:] = map(reverse(), tags) tags[:] = map(self.format_rtags, tags) return tags
def __exit__(self, exc_type, exc_value, traceback): try: if exc_type is None: env = { k:v for (k,v) in os.environ.items() if k in self.whiteList } env["BOB_LOCAL_ARTIFACT"] = self.name env["BOB_REMOTE_ARTIFACT"] = self.remoteName ret = subprocess.call(["/bin/bash", "-ec", self.uploadCmd], stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, cwd="/tmp", env=env) if ret != 0: raise ArtifactUploadError("command return with status {}".format(ret)) finally: os.unlink(self.name) return False
def predictLiveBuildId(self): if self.__commit: return [ bytes.fromhex(self.__commit) ] if self.__tag: # Annotated tags are objects themselves. We need the commit object! refs = ["refs/tags/" + self.__tag + '^{}', "refs/tags/" + self.__tag] else: refs = ["refs/heads/" + self.__branch] cmdLine = ['git', 'ls-remote', self.__url] + refs try: output = subprocess.check_output(cmdLine, universal_newlines=True, stderr=subprocess.DEVNULL).strip() except subprocess.CalledProcessError as e: return [None] # have we found anything at all? if not output: return [None] # See if we got one of our intended refs. Git is generating lines with # the following format: # # <sha1>\t<refname> # # Put the output into a dict with the refname as key. Be extra careful # and strip out lines not matching this pattern. output = { commitAndRef[1].strip() : bytes.fromhex(commitAndRef[0].strip()) for commitAndRef in (line.split('\t') for line in output.split('\n')) if len(commitAndRef) == 2 } for ref in refs: if ref in output: return [output[ref]] # uhh, should not happen... return [None]
def load_mirror_index(self, remote_mirror): # See if there is a mirror index available from the BASE_URL mirror_index = os.path.join(self.conf_dir, 'mirror-index') try: cmd = [self.tools['git'], 'ls-remote', remote_mirror, self.base_branch] utils_setup.run_cmd(cmd, log=2, environment=self.env, cwd=self.project_dir, stderr=subprocess.DEVNULL, stdout=subprocess.DEVNULL) except: try: remote_mirror += "/.git" cmd = [self.tools['git'], 'ls-remote', remote_mirror, self.base_branch] utils_setup.run_cmd(cmd, log=2, environment=self.env, cwd=self.project_dir, stderr=subprocess.DEVNULL, stdout=subprocess.DEVNULL) except: # No mirror, return return None logger.plain('Loading the mirror index from %s (%s)...' % (remote_mirror, self.base_branch)) # This MIGHT be a valid mirror.. if not os.path.exists(mirror_index): os.makedirs(mirror_index) cmd = [self.tools['git'], 'init' ] utils_setup.run_cmd(cmd, log=2, environment=self.env, cwd=mirror_index) try: cmd = [self.tools['git'], 'fetch', '-f', '-n', '-u', remote_mirror, self.base_branch + ':' + self.base_branch] utils_setup.run_cmd(cmd, log=2, environment=self.env, cwd=mirror_index) except: # Could not fetch, return return None logger.debug('Found mirrored index.') cmd = [self.tools['git'], 'checkout', self.base_branch ] utils_setup.run_cmd(cmd, log=2, environment=self.env, cwd=mirror_index) cmd = [self.tools['git'], 'reset', '--hard' ] utils_setup.run_cmd(cmd, log=2, environment=self.env, cwd=mirror_index) return mirror_index
def exec_cmd(bus, *cmd_line): """ Executes an external process using the given arguments. Note that the backslash (`\`) character must be escaped, because they are within a Python string, so a total of 2 backslashes to equal 1 real backslash. For example, to run a command prompt with fancy colors and start it in the root of the file system when you press <kbd>F1</kbd>, use:
'f1 => cmd cmd.exe /c start cmd.exe /E:ON /V:ON /T:17 /K cd \\' ``` :param bus: :param cmd_line: The command-line string equivalent to run. :return: """ # The cmd_line is a space-split set of arguments. Because we're running a # command line, and should allow all kinds of inputs, we'll join it back # together. full_cmd = " ".join(cmd_line) # We won't use shlex, because, for Windows, splitting is just unnecessary. print('CMD Running `' + full_cmd + '`') # TODO look at making this a bus command, or at least a bus announcement. proc = subprocess.Popen(full_cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) bus.fire(event_ids.LOG__INFO, target_ids.LOGGER, { 'message': '{0}: {1}'.format(proc.pid, full_cmd) })
def get_last_update(self, path): os.chdir(path) cmd = "stat -c %y .git/FETCH_HEAD".split() return str(subprocess.check_output(cmd, stderr=subprocess.DEVNULL))[2:21]
def git_version(): '''Return the git revision as a string Returns ------- git_version : str The current git revision ''' def _minimal_ext_cmd(cmd): # construct minimal environment env = {} for k in ['SYSTEMROOT', 'PATH']: v = os.environ.get(k) if v is not None: env[k] = v # LANGUAGE is used on win32 env['LANGUAGE'] = 'C' env['LANG'] = 'C' env['LC_ALL'] = 'C' output = subprocess.check_output(cmd, stderr=subprocess.DEVNULL, env=env) return output try: out = _minimal_ext_cmd(['git', 'rev-parse', '--verify', '--quiet', '--short', 'HEAD']) GIT_REVISION = out.strip().decode('ascii') except subprocess.CalledProcessError: GIT_REVISION = 'UNKNOWN' return GIT_REVISION