我们从 Python 开源项目中,提取了以下 33 个代码示例,用于说明如何使用 subprocess.SubprocessError()。
def kill(self, app_id):
    """
    Ask YARN to kill an application.

    Parameters
    ----------
    app_id: str
        YARN application id

    Returns
    -------
    bool:
        True if the command output contains "Killed application",
        False otherwise (including when the command raises
        SubprocessError).
    """
    command = ["yarn", "application", "-kill", app_id]
    try:
        output = shell_out(command, stderr=STDOUT)
    except SubprocessError:
        return False
    return "Killed application" in output
def find_class_having_main(self, classes):
    """Return the stem of the first class file that declares a main method.

    Each entry of *classes* is passed to the configured disassembler with
    the ``-s`` flag. Files for which the disassembler fails are skipped.

    Args:
        classes: iterable of path-like objects; a matching entry must
            expose a ``stem`` attribute.

    Returns:
        The ``stem`` of the first matching file, or None if no file matches.
    """
    for class_file in classes:
        # run javap(1) with type signatures
        try:
            stdout = subprocess.check_output(
                [self.extra_binaries['disassembler'].cmd, '-s',
                 str(class_file)],
                stderr=subprocess.DEVNULL,
                env=self.compiler.env)
        except subprocess.SubprocessError:  # noqa
            continue
        # iterate on lines to find p s v main() signature and then
        # its descriptor on the line below; we don't rely on the type
        # from the signature, because it could be String[], String... or
        # some other syntax I'm not even aware of
        lines = iter(stdout.decode().split('\n'))
        for line in lines:
            line = line.lstrip()
            if line.startswith('public static') and 'void main(' in line:
                # A signature on the very last line has no descriptor below
                # it; use a default so StopIteration cannot escape.
                descriptor = next(lines, None)
                if (descriptor is not None
                        and descriptor.lstrip() == PSVMAIN_DESCRIPTOR):
                    return class_file.stem
    return None
def adb_start_server_safe():
    '''
    checks if `adb server` is running. if not, starts it.

    Returns True when adb is already running or was started successfully,
    False when starting it failed (including when the adb executable
    cannot be launched at all).
    '''
    try:
        status = subprocess.check_output(['pidof', ADB])
        util.debug_print('adb already running in PID: ' +
                         status.decode(), flag=PRINT_FLAG)
        return True
    except subprocess.CalledProcessError as exception:
        print('adb is not running, returned status: ' +
              str(exception.returncode))
        print('adb was not started. starting...')

    try:
        subprocess.check_output([ADB, 'start-server'])
        return True
    except (OSError, subprocess.SubprocessError):
        # A missing or non-executable adb raises OSError (for example
        # FileNotFoundError), which is not a SubprocessError.
        print(
            'something disastrous happened. maybe ' + ADB + ' was not found')
        return False
def _parse_rtf_protocol(self, committee_id, meeting_id, bucket, protocol_object_name, parts_object_name, text_object_name):
    """Parse an RTF protocol file -- currently disabled.

    The function returns False unconditionally and none of its arguments
    are used; the previous implementation is kept below, commented out.

    Returns:
        bool: always False.
    """
    # currently with the new API - we don't seem to get rtf files anymore
    # it looks like files which used to be rtf are actually doc
    # need to investigate further
    return False
    # rtf_extractor = os.environ.get("RTF_EXTRACTOR_BIN")
    # if rtf_extractor:
    #     with object_storage.temp_download(protocol_object_name) as protocol_filename:
    #         with tempfile.NamedTemporaryFile() as text_filename:
    #             cmd = rtf_extractor + ' ' + protocol_filename + ' ' + text_filename
    #             try:
    #                 subprocess.check_output(cmd, stderr=subprocess.STDOUT, shell=True)
    #                 protocol_text = fs.read(text_filename)
    #                 with CommitteeMeetingProtocol.get_from_text(protocol_text) as protocol:
    #                     self._parse_protocol_parts(parts_filename, protocol)
    #             except subprocess.SubprocessError:
    #                 logging.exception("committee {} meeting {}: failed to parse rtf file, skipping".format(committee_id,
    #                                                                                                        meeting_id))
    #                 return False
    #     return True
    # else:
    #     logging.warning("missing RTF_EXTRACTOR_BIN environment variable, skipping rtf parsing")
    #     return False
def _spawn_ffmpeg(self):
    """Start an ffmpeg process for the current stream or song.

    The URL is taken from ``_stream_url`` when streaming, or from the
    current song context when playing. On success the new process is
    stored in ``self._ffmpeg``.

    Raises:
        RuntimeError: if the player is neither streaming nor playing, if
            the executable cannot be found, or if Popen fails with a
            SubprocessError.
    """
    if self.streaming:
        source_url = self._stream_url
    elif self.playing:
        source_url = self._song_context.song_url
    else:
        raise RuntimeError('Player is in an invalid state')

    command_line = self._ffmpeg_command.format(shlex.quote(source_url))
    argv = shlex.split(command_line)

    try:
        process = subprocess.Popen(argv)
    except FileNotFoundError as exc:
        raise RuntimeError('ffmpeg executable was not found') from exc
    except subprocess.SubprocessError as exc:
        raise RuntimeError('Popen failed: {0.__name__} {1}'.format(type(exc), str(exc))) from exc
    self._ffmpeg = process

#
# Player FSM
#
def _terminate_ffmpeg(self):
    """Stop the tracked ffmpeg process, if any, and forget it.

    Sends terminate and waits up to 5 seconds; if that raises OSError or
    SubprocessError the failure is logged and a kill is attempted, with
    errors from the kill ignored. Does nothing when no process is tracked.
    """
    process = self._current_proc
    if process:
        _logger.info('Terminating ffmpeg...')
        try:
            process.terminate()
            process.wait(5)
        except (OSError, subprocess.SubprocessError):
            _logger.exception('Terminate ffmpeg')
            try:
                process.kill()
            except (OSError, subprocess.SubprocessError):
                # best effort: nothing more can be done here
                pass
        self._current_proc = None
def verify_s3_bucket_exists(s3_bucket_name):
    """
    Check whether S3 bucket exists

    Runs ``aws s3api head-bucket`` for the bucket. Returns normally when the
    command succeeds; otherwise logs the failure and exits the process.

    Args:
      s3_bucket_name (str): The s3 bucket name

    Returns:
      None

    Raises:
      SystemExit: if the command fails or cannot be started.
    """
    # Argument list (no shell): a single command string without shell=True
    # is treated on POSIX as the name of the program to execute.
    head_bucket_cmd = ['aws', 's3api', 'head-bucket',
                       '--bucket', s3_bucket_name]
    try:
        subprocess.run(head_bucket_cmd,
                       check=True,
                       stdout=subprocess.PIPE,
                       stderr=subprocess.PIPE)
    except (OSError, subprocess.SubprocessError):
        # OSError covers a missing `aws` executable.
        logger.exception("Failed to list specified s3 bucket: {}", s3_bucket_name)
        sys.exit("[-] Failed to list specified s3 bucket")
def test_surrogates_error_message(self):
    """Check the error raised when preexec_fn fails with a surrogate message.

    Either a ValueError carrying the original message (expected when
    ``subprocess._posixsubprocess`` is None) or a SubprocessError with the
    default message (expected when it is not None) is accepted.
    """
    def prepare():
        raise ValueError("surrogate:\uDCff")

    raised = None
    try:
        subprocess.call(
            [sys.executable, "-c", "pass"],
            preexec_fn=prepare)
    except (ValueError, subprocess.SubprocessError) as exc:
        raised = exc

    if isinstance(raised, ValueError):
        # Pure Python implementations keeps the message
        self.assertIsNone(subprocess._posixsubprocess)
        self.assertEqual(str(raised), "surrogate:\uDCff")
    elif isinstance(raised, subprocess.SubprocessError):
        # _posixsubprocess uses a default message
        self.assertIsNotNone(subprocess._posixsubprocess)
        self.assertEqual(str(raised), "Exception occurred in preexec_fn.")
    else:
        self.fail("Expected ValueError or subprocess.SubprocessError")
def install_package(package, upgrade=True, target=None):
    """Install a package on PyPi. Accepts pip compatible package strings.

    Runs ``pip install`` in a subprocess while holding INSTALL_LOCK; the
    install is skipped when check_package_exists() reports the package as
    already present.

    Args:
        package: pip compatible requirement string.
        upgrade: pass ``--upgrade`` to pip when true.
        target: optional directory passed to pip as ``--target``.

    Return boolean if install successful.
    """
    # Not using 'import pip; pip.main([])' because it breaks the logger
    with INSTALL_LOCK:
        if check_package_exists(package, target):
            return True

        _LOGGER.info('Attempting install of %s', package)
        args = [sys.executable, '-m', 'pip', 'install', '--quiet', package]
        if upgrade:
            args.append('--upgrade')
        if target:
            args += ['--target', os.path.abspath(target)]

        try:
            return subprocess.call(args) == 0
        except subprocess.SubprocessError:
            # Typo fixed in the log message ("pacakge" -> "package").
            _LOGGER.exception('Unable to install package %s', package)
            return False
def install_package(package: str, upgrade: bool=True,
                    target: Optional[str]=None) -> bool:
    """Install a package on PyPi. Accepts pip compatible package strings.

    Runs ``pip install`` in a subprocess while holding INSTALL_LOCK; the
    install is skipped when check_package_exists() reports the package as
    already present.

    Args:
        package: pip compatible requirement string.
        upgrade: pass ``--upgrade`` to pip when true.
        target: optional directory passed to pip as ``--target``.

    Return boolean if install successful.
    """
    # Not using 'import pip; pip.main([])' because it breaks the logger
    with INSTALL_LOCK:
        if check_package_exists(package, target):
            return True

        _LOGGER.info('Attempting install of %s', package)
        args = [sys.executable, '-m', 'pip', 'install', '--quiet', package]
        if upgrade:
            args.append('--upgrade')
        if target:
            args += ['--target', os.path.abspath(target)]

        try:
            return subprocess.call(args) == 0
        except subprocess.SubprocessError:
            # Typo fixed in the log message ("pacakge" -> "package").
            _LOGGER.exception('Unable to install package %s', package)
            return False
def makeblastdb(fasta, database_name):
    """Build a nucleotide BLAST database from a FASTA file.

    Args:
        fasta: path of the input FASTA file.
        database_name: value passed to ``makeblastdb -out``.

    Raises:
        ValueError: if the FASTA file yields no sequences.
        subprocess.CalledProcessError: if makeblastdb exits non-zero.
        subprocess.SubprocessError: if makeblastdb exits with status 0 but
            its combined output contains ``Error: ``.
    """
    with SequenceReader(fasta) as fr:
        sequences = list(fr)
    if not sequences:
        raise ValueError("FASTA file {} is empty".format(fasta))
    process_output = subprocess.check_output(
        ['makeblastdb', '-parse_seqids', '-dbtype', 'nucl', '-in', fasta,
         '-out', database_name],
        stderr=subprocess.STDOUT
    )
    if b'Error: ' in process_output:
        # Include the tool's output so the failure can be diagnosed.
        raise subprocess.SubprocessError(
            "makeblastdb reported an error for {}:\n{}".format(
                fasta, process_output.decode(errors='replace')))
def is_cpython_repo():
    """Return True if ``git log`` can resolve a specific commit hash here.

    The command runs in the current working directory. The hash is
    presumably a commit from CPython's history (going by the function
    name) -- confirm if it matters.

    Returns:
        bool: False when git exits non-zero or cannot be executed at all,
        True otherwise.
    """
    cmd = "git log -r 7f777ed95a19224294949e1b4ce56bbffcb1fe9f"
    try:
        subprocess.check_output(cmd.split(), stderr=subprocess.STDOUT)
    except (OSError, subprocess.SubprocessError):
        # OSError: git is not installed or not executable; this is not a
        # SubprocessError and would otherwise propagate to the caller.
        return False
    return True
def adb_install_apk(emulator: Emulator, apk: Apk):
    '''
    installs provided apk to specified emulator

    Raises ValueError (chained to the underlying SubprocessError) when the
    adb install command fails.
    '''
    util.check_file_directory_exists(apk.apk_path, True)
    try:
        result = subprocess.check_output(
            [config.adb, '-s', 'emulator-' + emulator.port, 'install',
             apk.apk_path]).decode()
        util.debug_print(result, flag=PRINT_FLAG)
    except subprocess.SubprocessError as error:
        print(error)
        # Chain explicitly so the original failure is reported as the cause.
        raise ValueError("error installing.") from error
def adb_uninstall_package(emulator: Emulator, package: str):
    '''
    uninstalls the provided package from the specified emulator.

    The "exactly one matching package" pre-check is currently disabled
    (see the commented-out lines below), so the uninstall is always
    attempted. A failing adb command is reported on stdout, not raised.
    '''
    # if adb_is_package_present(emulator, package) is not 1:
    #     raise ValueError("Package not found / Too generic.")
    try:
        subprocess.check_output(
            [config.adb, '-s', 'emulator-' + emulator.port, 'uninstall',
             package])
        print("uninstalled " + package)
    except subprocess.SubprocessError:
        print("maybe not found/uninstalled already")
def adb_pidof_app(emulator: Emulator, apk: Apk):
    '''
    returns PID of running apk

    Runs `pidof <package>` in the emulator's shell through adb and returns
    the first line of its output. Returns None (after printing a note)
    when the command raises SubprocessError.
    '''
    try:
        raw_output = subprocess.check_output([
            config.adb,
            '-s', 'emulator-' + emulator.port,
            'shell', 'pidof', apk.package_name,
        ])
        first_line = raw_output.decode().partition('\n')[0]
        util.debug_print(first_line, flag=PRINT_FLAG)
        return first_line
    except subprocess.SubprocessError:
        print("maybe not found/uninstalled already")
        return None
def reset_all(self):
    '''
    defaults to settings.
    accelerometer_rotation on
    airplane_mode_off
    user_rotation POTRAIT

    NOTE(review): no airplane-mode call is made in this method, although
    the list above mentions it -- confirm whether one is missing.
    '''
    # Accelerometer rotation is switched off, the user rotation is set,
    # then accelerometer rotation is switched back on.
    self.set_accelerometer_rotation(False)
    self.set_user_rotation(UserRotation.ROTATION_POTRAIT)
    self.set_accelerometer_rotation(True)

# def adb_start_server_safe(self):
#     '''
#     checks if `adb server` is running. if not, starts it.
#     '''
#     try:
#         status = subprocess.check_output(['pidof', ADB])
#         print('adb already running in PID: ' + status.decode())
#         return True
#     except subprocess.CalledProcessError as exception:
#         print('adb is not running, returned status: ' +
#               str(exception.returncode))
#         print('adb was not started. starting...')
#         try:
#             subprocess.check_output([ADB, 'start-server'])
#             return True
#         except subprocess.SubprocessError as exception:
#             print('something disastrous happened. maybe ' +
#                   ADB + ' was not found')
#             return False
def create_ffmpeg_player(self, filename, *, use_avconv=False, pipe=False,
                         options=None, before_options=None, headers=None,
                         after=None):
    """
    Stolen from Rapptz/Danny, thanks!

    Builds an ffmpeg (or avconv) command line that decodes *filename* to
    s16le PCM on stdout, using the encoder's sampling rate and channel
    count, and wraps the spawned process in a ProcessPlayer.

    When *pipe* is true, *filename* is handed to Popen as stdin and the
    command reads from ``-``. *headers* (a dict) becomes a ``-headers``
    argument; *before_options* and *options* (strings) are inserted before
    the input and after the output settings respectively.

    Raises ClientException if the executable is missing or Popen fails
    with a SubprocessError.
    """
    command = 'avconv' if use_avconv else 'ffmpeg'
    input_name = '-' if pipe else shlex.quote(filename)

    before_args = ""
    if isinstance(headers, dict):
        header_blob = "".join(
            "{}: {}\r\n".format(key, value) for key, value in headers.items())
        before_args = ' -headers ' + shlex.quote(header_blob)
    if isinstance(before_options, str):
        before_args += ' ' + before_options

    pattern = command + '{} -i {} -f s16le -ar {} -ac {} -loglevel warning'
    cmd = pattern.format(before_args, input_name,
                         self.encoder.sampling_rate, self.encoder.channels)
    if isinstance(options, str):
        cmd = cmd + ' ' + options
    cmd += ' pipe:1'

    stdin = filename if pipe else None
    args = shlex.split(cmd)
    try:
        process = subprocess.Popen(args, stdin=stdin, stdout=subprocess.PIPE)
        return ProcessPlayer(process, self, after)
    except FileNotFoundError as exc:
        raise ClientException('ffmpeg/avconv was not found in your PATH'
                              ' environment variable') from exc
    except subprocess.SubprocessError as exc:
        raise ClientException(
            'Popen failed: {0.__name__} {1}'.format(type(exc), str(exc))) \
            from exc
def lint(context):
    """Looks for errors in source code of your blog

    Runs flake8 on the configured ``CWD`` directory with the EXCLUDE
    entries excluded, and exits the context with status 1 when the run
    raises SubprocessError.
    """
    settings = context.obj
    command = 'flake8 {dir} --exclude={exclude}'.format(
        dir=settings['CWD'],
        exclude=','.join(EXCLUDE),
    )
    try:
        run(command)
    except SubprocessError:
        context.exit(1)
def _parse_doc_protocol(self, committee_id, meeting_id, bucket, protocol_object_name, parts_object_name,
                        text_object_name):
    """Parse a .doc protocol object into its text and parts objects.

    The protocol object is downloaded to a temporary file, its text is
    written to *text_object_name*, and its parts are handed to
    ``_parse_protocol_parts``.

    Returns:
        bool: True on success; False (after logging the exception) when
        parsing raises AntiwordException, SubprocessError or an XML
        ParseError.
    """
    start_message = "parsing doc protocol {} --> {}, {}".format(protocol_object_name, parts_object_name,
                                                                text_object_name)
    logging.info(start_message)
    with object_storage.temp_download(self.s3, bucket, protocol_object_name) as protocol_filename:
        try:
            with CommitteeMeetingProtocol.get_from_filename(protocol_filename) as protocol:
                object_storage.write(self.s3, bucket, text_object_name, protocol.text, public_bucket=True)
                self._parse_protocol_parts(bucket, parts_object_name, protocol)
        except (
            AntiwordException,  # see https://github.com/hasadna/knesset-data-pipelines/issues/15
            subprocess.SubprocessError,
            xml.etree.ElementTree.ParseError  # see https://github.com/hasadna/knesset-data-pipelines/issues/32
        ):
            failure_message = "committee {} meeting {}: failed to parse doc file, skipping".format(committee_id,
                                                                                                   meeting_id)
            logging.exception(failure_message)
            return False
        else:
            return True
def _subprocess_called_process_error(already_exist, subprocess_lib):
    """Create the CalledProcessError class used by this compatibility layer.

    The resulting class is stored in
    ``_InternalReferences.UsedCalledProcessError``; nothing is returned.

    Args:
        already_exist: true when *subprocess_lib* already provides
            ``CalledProcessError``; the new class then subclasses it.
            Otherwise a replacement is derived from
            ``subprocess_lib.SubprocessError``.
        subprocess_lib: the subprocess module to build on.
    """
    # NOTE(review): the nesting below was reconstructed from a flattened
    # source; in particular, confirm that the three attribute assignments
    # at the end of ExtCalledProcessError.__init__ are meant to run
    # unconditionally.
    if already_exist:
        class ExtCalledProcessError(subprocess_lib.CalledProcessError):
            """Raised when a process run by check_call() or check_output()
            returns a non-zero exit status."""

            def __init__(self, returncode, cmd, output=None, stderr=None):
                # Try the richest base signature first and fall back to
                # smaller ones when the base __init__ rejects a keyword.
                try:
                    super(ExtCalledProcessError, self).__init__(returncode=returncode,
                                                                cmd=cmd, output=output, stderr=stderr)
                except TypeError:
                    try:
                        super(ExtCalledProcessError, self).__init__(returncode=returncode,
                                                                    cmd=cmd, output=output)
                    except TypeError:
                        super(ExtCalledProcessError, self).__init__(returncode=returncode,
                                                                    cmd=cmd)
                self.output = output
                self.stdout = output
                self.stderr = stderr

        _InternalReferences.UsedCalledProcessError = ExtCalledProcessError
    else:
        class CalledProcessError(subprocess_lib.SubprocessError):
            """Raised when a process run by check_call() or check_output()
            returns a non-zero exit status."""

            def __init__(self, returncode, cmd, output=None, stderr=None):
                subprocess_lib.SubprocessError.__init__(self, "Command '" + str(cmd) + "' returned non-zero exit status " + str(returncode))
                self.returncode = returncode
                self.cmd = cmd
                self.output = output
                self.stdout = output
                self.stderr = stderr

        _InternalReferences.UsedCalledProcessError = CalledProcessError

# API
def fix_subprocess(override_debug=False, override_exception=False):
    """Activate the subprocess compatibility.

    Patches the ``subprocess`` module in place: adds ``SubprocessError``
    when it is missing, prepares the CalledProcessError class used by this
    layer, and adds ``check_output`` when it is missing.

    Args:
        override_debug: not used by this function body.
        override_exception: not used by this function body.
    """
    import subprocess

    # Exceptions
    if subprocess.__dict__.get("SubprocessError") is None:
        subprocess.SubprocessError = _Internal.SubprocessError
    if _InternalReferences.UsedCalledProcessError is None:
        if "CalledProcessError" in subprocess.__dict__:
            _subprocess_called_process_error(True, subprocess)
        else:
            _subprocess_called_process_error(False, subprocess)
            # NOTE(review): nesting reconstructed from a flattened source;
            # confirm this assignment belongs to the `else` branch only.
            subprocess.CalledProcessError = _InternalReferences.UsedCalledProcessError

    def _check_output(*args, **kwargs):
        # Fallback for check_output: run the command, capture stdout, and
        # raise the layer's CalledProcessError on a non-zero exit status.
        if "stdout" in kwargs:
            raise ValueError("stdout argument not allowed, "
                             "it will be overridden.")
        process = subprocess.Popen(stdout=subprocess.PIPE, *args, **kwargs)
        stdout_data, __ = process.communicate()
        ret_code = process.poll()
        if ret_code is None:
            raise RuntimeWarning("The process is not yet terminated.")
        if ret_code:
            # The command may have been given as the `args` keyword or as
            # the first positional argument.
            cmd = kwargs.get("args")
            if cmd is None:
                cmd = args[0]
            raise _InternalReferences.UsedCalledProcessError(returncode=ret_code, cmd=cmd, output=stdout_data)
        return stdout_data

    # Install the fallback only when subprocess has no check_output.
    try:
        subprocess.check_output
    except AttributeError:
        subprocess.check_output = _check_output
def __init__(self, filename, max_events=0):
    """Register *filename* with iotracer and open its log.

    Args:
        filename: path of the file to monitor.
        max_events: when non-zero, appended to the ``add`` command written
            to the iotracer control file.

    Raises:
        AssertionError: if ``/proc/iotracer`` does not exist.
        subprocess.SubprocessError: if the block device lookup fails.
        OSError: if the file cannot be stat'ed or the control file cannot
            be written.
    """
    import shlex

    if not os.path.exists("/proc/iotracer"):
        raise AssertionError('iotracer kernel module is not loaded')
    self._filename = filename
    # Quote the path: it is interpolated into a shell command line.
    cmd = ('readlink -e $(df --output=source ' + shlex.quote(filename)
           + ' | tail -1)')
    cmd += ' | cut -d/ -f3'
    try:
        out = subprocess.check_output(cmd, stderr=subprocess.DEVNULL,
                                      universal_newlines=True, shell=True)
    except subprocess.SubprocessError:
        print('fail to get block device for %s' % filename)
        raise
    # universal_newlines=True makes `out` a str, so strip with a str
    # argument (stripping with bytes raises TypeError).
    bdev = out.rstrip('\n')
    try:
        ino = os.stat(filename).st_ino
        with open('/proc/iotracer/control', 'w') as fctl:
            cmd = 'add %s' % self._filename
            if max_events:
                cmd += ' %s' % max_events
            print(cmd, file=fctl)
    except OSError:
        print('fail to add %s to iotracer monitoring' % filename)
        raise
    self._procdir = '/proc/iotracer/%s_%s' % (bdev, ino)
    IoTracerLog.__init__(self, self._procdir + '/log')
def sync_local_to_s3(local_video_records_done_folder, s3_bucket_name, s3_sync_result_file):
    """
    Sync local files to specified S3 bucket

    Runs ``aws s3 sync`` through the shell with its standard output
    redirected to *s3_sync_result_file*, then reports the outcome to the
    log and to Slack. Exits the process when the command fails.

    Args:
      local_video_records_done_folder (str): local folder containing video
        files ready to be copied to S3.
      s3_bucket_name (str): s3 bucket name
      s3_sync_result_file (str): file that receives the sync command's
        standard output.

    Raises:
      SystemExit: if the sync command fails.
    """
    s3_sync_cmd = 'aws s3 sync {} s3://{} > "{}"'.format(local_video_records_done_folder,
                                                         s3_bucket_name,
                                                         s3_sync_result_file)
    try:
        cmd_output = subprocess.run(s3_sync_cmd,
                                    check=True,
                                    shell=True,
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE)
    except subprocess.SubprocessError as err:
        logger.exception("Failed to sync local files to s3 bucket")
        notify_slack_channel(f"Sync failed: *{computer_name}* \n `{err}`")
        sys.exit("[-] Failed to sync local files to s3 bucket")
    logger.info("S3 sync successfully ran: {}", cmd_output)
    # The f-string formats the value itself; the previous text emitted a
    # literal "str(...)" wrapper in the Slack message.
    notify_slack_channel(f"Sync succeeded on: *{computer_name}* \n `{cmd_output}`")
    logger.info("Syncing complete")
def git_cmd(self, args):
    """Run ``git`` with *args* and return its stripped, decoded stdout.

    A non-zero exit status, a missing git executable and a
    SubprocessError are each reported through ``err``.
    """
    git_args = ['git'] + args
    try:
        completed = subprocess.run(git_args, stdout=subprocess.PIPE)
        if completed.returncode != 0:
            err('%s command exited with error: %s',
                ' '.join(git_args), completed.returncode)
        decoded = str(completed.stdout, 'utf-8')
        return decoded.strip()
    except FileNotFoundError as exc:
        err('Cannot find git command: %s', exc)
    except subprocess.SubprocessError as exc:
        err('Error running git command: %s', exc)
    return None
def test_preexec_exception(self):
    """An exception raised by preexec_fn must surface in the parent.

    A SubprocessError is accepted only when ``subprocess._posixsubprocess``
    is truthy; a ValueError must carry the original message.
    """
    def raise_it():
        raise ValueError("What if two swallows carried a coconut?")

    caught = None
    try:
        p = subprocess.Popen([sys.executable, "-c", ""],
                             preexec_fn=raise_it)
    except (subprocess.SubprocessError, ValueError) as exc:
        caught = exc

    if isinstance(caught, subprocess.SubprocessError):
        self.assertTrue(
            subprocess._posixsubprocess,
            "Expected a ValueError from the preexec_fn")
    elif isinstance(caught, ValueError):
        self.assertIn("coconut", caught.args[0])
    else:
        self.fail("Exception raised by preexec_fn did not make it "
                  "to the parent process.")
def test_preexec_errpipe_does_not_double_close_pipes(self):
    """Issue16140: Don't double close pipes on preexec error."""

    def raise_it():
        raise subprocess.SubprocessError(
            "force the _execute_child() errpipe_data path.")

    child_argv = [sys.executable, "-c", "pass"]
    # All three standard streams are piped.
    pipes = dict(stdin=subprocess.PIPE,
                 stdout=subprocess.PIPE,
                 stderr=subprocess.PIPE)
    with self.assertRaises(subprocess.SubprocessError):
        self._TestExecuteChildPopen(
            self, child_argv, preexec_fn=raise_it, **pipes)
def test_install_error(self, mock_sys, mock_logger, mock_exists,
                       mock_subprocess):
    """Test an install whose subprocess call raises SubprocessError.

    install_package must return a falsy value and log exactly one
    exception.
    """
    mock_exists.return_value = False
    mock_subprocess.side_effect = [subprocess.SubprocessError]

    installed = package.install_package(TEST_NEW_REQ)

    self.assertFalse(installed)
    self.assertEqual(mock_logger.exception.call_count, 1)
def send_message(self, message="", **kwargs):
    """Send a message to a command line.

    The configured command is run through the shell with *message*
    written to its standard input. A non-zero exit status or a
    SubprocessError is logged as an error; nothing is returned.
    """
    popen_options = {
        'universal_newlines': True,
        'stdin': subprocess.PIPE,
        'shell': True,
    }
    try:
        process = subprocess.Popen(self.command, **popen_options)
        process.communicate(input=message)
        succeeded = process.returncode == 0
        if not succeeded:
            _LOGGER.error('Command failed: %s', self.command)
    except subprocess.SubprocessError:
        _LOGGER.error('Error trying to exec Command: %s', self.command)
def run(self):
    """Run ``run.py`` of every test suite and record failures.

    For each suite directory the manifest is loaded, the working directory
    is changed to the suite, and the script is run with the venv's Python.
    A suite error is generated when the process cannot be started, exceeds
    the manifest's ``timeout``, or exits with a non-zero status (in which
    case its output is also printed).
    """
    for s in self.test_suites:
        script = os.path.join(s, 'run.py')
        start_time = time.time()
        manifest = self.load_manifest(s)
        os.chdir(s)

        info("Running tests from {0}".format(s))

        args = [e('${venvdir}/bin/python'), script]
        results_path = os.path.join(s, 'results.xml')
        test = None
        out = b''
        timed_out = False
        try:
            test = subprocess.Popen(
                args,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                close_fds=True
            )
            # communicate() drains the pipe while waiting; wait() could
            # deadlock once the child fills the stdout pipe buffer.
            out, _ = test.communicate(timeout=manifest['timeout'])
        except subprocess.TimeoutExpired as err:
            # The child is still running: kill it and collect its output,
            # otherwise the timeout would not actually stop the suite.
            timed_out = True
            test.kill()
            out, _ = test.communicate()
            self.generate_suite_error(
                results_path,
                manifest['name'],
                time.time() - start_time,
                'Test timeout reached',
                err
            )
        except (OSError, subprocess.SubprocessError) as err:
            # A failed launch raises OSError (for example a missing
            # interpreter), which is not a SubprocessError.
            self.generate_suite_error(
                results_path,
                manifest['name'],
                time.time() - start_time,
                'Test could not be started',
                err
            )

        if test is None or timed_out:
            # Nothing was started, or the non-zero status only reflects
            # the kill above and the timeout is already reported.
            continue

        if test.returncode:
            self.generate_suite_error(
                results_path,
                manifest['name'],
                time.time() - start_time,
                'Test process has returned an error',
                out
            )
            info("{0} error:".format(script))
            print(out.decode('utf-8'))
def setup(hass, config):
    """Setup the shell_command component.

    Registers one service per configured command name. Each command is
    split once into program and argument template, and the result is
    cached per command string.
    """
    conf = config.get(DOMAIN, {})
    cache = {}

    def service_handler(call):
        """Execute a shell command service."""
        cmd = conf[call.service]

        cached = cache.get(cmd)
        if cached is None:
            if ' ' in cmd:
                prog, args = cmd.split(' ', 1)
                args_compiled = template.Template(args, hass)
            else:
                prog, args, args_compiled = cmd, None, None
            cached = cache[cmd] = (prog, args, args_compiled)
        prog, args, args_compiled = cached

        rendered_args = None
        if args_compiled:
            try:
                rendered_args = args_compiled.render(call.data)
            except TemplateError as ex:
                _LOGGER.exception('Error rendering command template: %s', ex)
                return

        # no template used -> default behavior (run through the shell)
        shell = rendered_args == args
        if not shell:
            # template used. Break into list and use shell=False for security
            cmd = [prog] + shlex.split(rendered_args)

        try:
            subprocess.call(cmd, shell=shell,
                            stdout=subprocess.DEVNULL,
                            stderr=subprocess.DEVNULL)
        except subprocess.SubprocessError:
            _LOGGER.exception('Error running command: %s', cmd)

    for name in conf:
        hass.services.register(DOMAIN, name, service_handler)
    return True