我们从Python开源项目中,提取了以下46个代码示例,用于说明如何使用os.devnull(注意:os.devnull是一个字符串常量,指向系统的空设备路径,而不是可调用的函数)。
def convert_image(inpath, outpath, size):
    """Convert an image file using `sips`.

    Args:
        inpath (str): Path of source file.
        outpath (str): Path to destination file.
        size (int): Width and height of destination image in pixels.

    Raises:
        RuntimeError: Raised if `sips` exits with non-zero status.
    """
    # `bytes.format()` does not exist on Python 3, so b'{0}'.format(size)
    # would crash there; build plain str arguments instead, matching the
    # other convert_image variant in this file.
    cmd = ['sips', '-z', str(size), str(size), inpath, '--out', outpath]
    # log().debug(cmd)
    with open(os.devnull, 'w') as pipe:
        retcode = subprocess.call(cmd, stdout=pipe, stderr=subprocess.STDOUT)
    if retcode != 0:
        raise RuntimeError('sips exited with {0}'.format(retcode))
def test_err_in_fun(self):
    # Test that the original signal this process was hit with
    # is not returned in case fun raise an exception. Instead,
    # we're supposed to see retsig = 1.
    # NOTE(review): .format() receives TESTFN too, but the template only
    # uses one {} placeholder — the extra argument is silently ignored.
    ret = pyrun(textwrap.dedent(
        """
        import os, signal, imp, sys
        mod = imp.load_source("mod", r"{}")
        def foo():
            sys.stderr = os.devnull
            1 / 0
        sig = signal.SIGTERM if os.name == 'posix' else \
            signal.CTRL_C_EVENT
        mod.register_exit_fun(foo)
        os.kill(os.getpid(), sig)
        """.format(os.path.abspath(__file__), TESTFN)
    ))
    if POSIX:
        # Exit code must reflect the unhandled exception (1), not the
        # signal the subprocess was originally hit with.
        self.assertEqual(ret, 1)
        assert ret != signal.SIGTERM, strfsig(ret)
def epubcheck_help():
    """Return epubcheck.jar commandline help text.

    :return unicode: helptext from epubcheck.jar
    """
    # tc = locale.getdefaultlocale()[1]
    args = [c.JAVA, '-Duser.language=en', '-jar', c.EPUBCHECK, '-h']
    with open(os.devnull, "w") as devnull:
        # Capture stdout; discard anything epubcheck writes to stderr.
        proc = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=devnull)
        stdout_data = proc.communicate()[0]
    return stdout_data.decode()
def pipe_weighted_edgelist_to_convert(matrix, bin_filename, weight_filename):
    """ Pipe a weighted edgelist (COO sparse matrix) to Louvain's convert utility """
    # Deliberately disabled: everything below the raise is currently dead
    # code, kept around for when this path is re-enabled.
    raise ValueError('Unsupported method at the moment')
    devnull = open(os.devnull, 'w')
    proc = subprocess.Popen([LOUVAIN_CONVERT_BINPATH,
                             '-i', '/dev/stdin',
                             '-o', bin_filename,
                             '-w', weight_filename,
                             ],
                            stdin=subprocess.PIPE,
                            stdout=devnull,
                            stderr=devnull)
    # Stream text triplets to 'convert'
    for ijx in itertools.izip(matrix.row, matrix.col, matrix.data):
        proc.stdin.write('%d\t%d\t%f\n' % ijx)
    proc.stdin.close()
    proc.wait()
    devnull.close()
def pipe_unweighted_edgelist_to_convert(matrix, bin_filename):
    """ Pipe an unweighted edgelist (COO sparse matrix) to Louvain's convert utility """
    devnull = open(os.devnull, 'w')
    cmd = [LOUVAIN_CONVERT_BINPATH, '-i', '/dev/stdin', '-o', bin_filename]
    proc = subprocess.Popen(cmd,
                            stdin=subprocess.PIPE,
                            stdout=devnull,
                            stderr=devnull)
    # Stream "row<TAB>col" pairs to 'convert' on its stdin.
    for ij in itertools.izip(matrix.row, matrix.col):
        proc.stdin.write('%d\t%d\n' % ij)
    proc.stdin.close()
    proc.wait()
    devnull.close()
def get_audio_streams(self):
    """Probe ``self.input_video`` with ffprobe and return its audio streams.

    Exits the program when the ffprobe binary is not installed.
    """
    # os.errno is not a public/stable attribute (removed in Python 3.x);
    # use the errno module directly.
    import errno

    with open(os.devnull, 'w') as DEV_NULL:
        # Get file info and parse it
        try:
            proc = subprocess.Popen(
                [FFPROBE, '-i', self.input_video, '-of', 'json', '-show_streams'],
                stdout=subprocess.PIPE, stderr=DEV_NULL)
        except OSError as e:
            if e.errno == errno.ENOENT:
                Logger.error("FFPROBE not found, install on your system to use this script")
                sys.exit(0)
        output = proc.stdout.read()
    # Delegates to the module-level get_audio_streams() helper (same name).
    return get_audio_streams(json.loads(output))
def launch(self, cfg, path, flags):
    """Validate the host OS/architecture and launch the rpc-agent binary."""
    logging.debug("Determine the OS and Architecture this application is currently running on")
    hostOS = platform.system().lower()
    logging.debug("hostOS: " + str(hostOS))
    is_64bits = sys.maxsize > 2 ** 32
    hostArchitecture = 'x64' if is_64bits else 'ia32'
    logging.debug("hostArchitecture: " + str(hostArchitecture))
    if not self.validateConfig(cfg, hostOS, hostArchitecture):
        logging.debug("Invalid OS/Architecture combination detected")
        return
    # Discard the agent's stdout/stderr entirely.
    fnull = open(os.devnull, 'w')
    agent = 'rpc-agent-' + platform.system().lower() + '-' + self.detectHostArchitecture()
    if os.environ.get("WPW_HOME") is not None:
        cmd = [os.environ["WPW_HOME"] + '/bin/' + agent]
    else:
        cmd = [path + '/wpwithinpy/iot-core-component/bin/' + agent]
    cmd.extend(flags)
    proc = subprocess.Popen(cmd, stdin=None, stdout=fnull,
                            stderr=subprocess.STDOUT)
    return proc
def win64_available(self):
    """Probe for a wine64 binary and toggle the winearch combobox to match.

    Returns True when wine64 responds (returncode 1 from a bare
    invocation), False otherwise.
    """
    wine_bin, \
    wineserver_bin, \
    wine_lib = self.get_wine_bin_path()
    dev_null = open(os.devnull, 'w')
    try:
        proc = subprocess.Popen([wine_bin + '64'], stdout=dev_null,
                                stdin=subprocess.PIPE, stderr=subprocess.STDOUT)
        dev_null.close()
        stdoutdata, stderrdata = proc.communicate()
        if proc.returncode == 1:
            self.combobox_winearch.set_visible(True)
            return True
        else:
            self.combobox_winearch.set_visible(False)
            self.winearch = 'win32'
            return False
    except Exception:
        # Was a bare 'except:'; narrowed so SystemExit/KeyboardInterrupt
        # are no longer swallowed. Any failure means no win64 support.
        self.combobox_winearch.set_visible(False)
        self.winearch = 'win32'
        return False
def isDaemonAlive(hostAndPort="{0}:{1}".format(IPFSAPI_IP, IPFSAPI_PORT)):
    """Ensure that the IPFS daemon is running via HTTP before proceeding"""
    client = ipfsapi.Client(IPFSAPI_IP, IPFSAPI_PORT)
    try:
        # OSError if ipfs not installed, redundant of below
        # subprocess.call(['ipfs', '--version'], stdout=open(devnull, 'wb'))
        # ConnectionError/AttributeError if IPFS daemon not running
        client.id()
        return True
    except (ConnectionError, exceptions.AttributeError):
        logError("Daemon is not running at http://" + hostAndPort)
        return False
    except OSError:
        logError("IPFS is likely not installed. "
                 "See https://ipfs.io/docs/install/")
        sys.exit()
    except Exception:
        # Was a bare 'except:'; narrowed so SystemExit/KeyboardInterrupt
        # propagate. Still falls through returning None after logging,
        # as before.
        logError('Unknown error in retrieving daemon status')
        logError(sys.exc_info()[0])
def _query(self):
    """
    Returns the value of deepsea_minions
    """
    # When search matches no minions, salt prints to stdout.
    # Suppress stdout — and, unlike the original, restore it (and close
    # the devnull handle) even when a salt call raises.
    _stdout = sys.stdout
    sys.stdout = open(os.devnull, 'w')
    try:
        # Relying on side effect - pylint: disable=unused-variable
        ret = self.local.cmd('*', 'saltutil.pillar_refresh')
        minions = self.local.cmd('*', 'pillar.get', ['deepsea_minions'],
                                 expr_form="compound")
    finally:
        sys.stdout.close()
        sys.stdout = _stdout
    for minion in minions:
        if minions[minion]:
            return minions[minion]
    log.error("deepsea_minions is not set")
    return []
def _matches(self):
    """
    Returns the list of matched minions
    """
    if self.deepsea_minions:
        # When search matches no minions, salt prints to stdout.
        # Suppress stdout — and restore it (closing the devnull handle)
        # even if the salt call raises, which the original did not.
        _stdout = sys.stdout
        sys.stdout = open(os.devnull, 'w')
        try:
            result = self.local.cmd(self.deepsea_minions, 'pillar.get',
                                    ['id'], expr_form="compound")
        finally:
            sys.stdout.close()
            sys.stdout = _stdout
        return result.keys()
    return []
def test_exception(self):
    # Make sure handler fun is called on exception.
    ret = pyrun(textwrap.dedent(
        """
        import os, imp, sys
        mod = imp.load_source("mod", r"{}")
        def foo():
            with open(r"{}", "ab") as f:
                f.write(b"1")
        mod.register_exit_fun(foo)
        sys.stderr = os.devnull
        1 / 0
        """.format(os.path.abspath(__file__), TESTFN)
    ))
    # Unhandled ZeroDivisionError -> exit code 1.
    self.assertEqual(ret, 1)
    # The exit handler must have run exactly once, appending b"1".
    with open(TESTFN, "rb") as f:
        self.assertEqual(f.read(), b"1")
def test_kinterrupt(self):
    # Simulates CTRL + C and make sure the exit function is called.
    ret = pyrun(textwrap.dedent(
        """
        import os, imp, sys
        mod = imp.load_source("mod", r"{}")
        def foo():
            with open(r"{}", "ab") as f:
                f.write(b"1")
        mod.register_exit_fun(foo)
        sys.stderr = os.devnull
        raise KeyboardInterrupt
        """.format(os.path.abspath(__file__), TESTFN)
    ))
    # Unhandled KeyboardInterrupt in the child -> exit code 1 here.
    self.assertEqual(ret, 1)
    # The exit handler must have run, appending b"1" to the sentinel file.
    with open(TESTFN, "rb") as f:
        self.assertEqual(f.read(), b"1")
def __init__(self, raw_email, debug=False):
    ''' Setup the base options of the copy/convert setup '''
    self.raw_email = raw_email
    # In-memory logs: one stream for processing steps, one for content.
    self.log_processing = StringIO()
    self.log_content = StringIO()
    self.tree(self.raw_email)
    # Route twiggy log output into the in-memory processing log.
    twiggy_out = outputs.StreamOutput(formats.shell_format,
                                      stream=self.log_processing)
    emitters['*'] = filters.Emitter(levels.DEBUG, True, twiggy_out)
    self.log_name = log.name('files')
    self.cur_attachment = None
    self.debug = debug
    if self.debug:
        # Debug mode: persist subprocess stderr/stdout to real log files.
        if not os.path.exists('debug_logs'):
            os.makedirs('debug_logs')
        self.log_debug_err = os.path.join('debug_logs', 'debug_stderr.log')
        self.log_debug_out = os.path.join('debug_logs', 'debug_stdout.log')
    else:
        # Otherwise discard subprocess output via the null device.
        self.log_debug_err = os.devnull
        self.log_debug_out = os.devnull
def call(self, args, devnull=False):
    """Call other processes.

    args - list of command args
    devnull - whether to pipe stdout to /dev/null (or equivalent)
    """
    if self.debug:
        click.echo(subprocess.list2cmdline(args))
        click.confirm('Continue?', default=True, abort=True)
    try:
        kwargs = {}
        if devnull:
            # Discard both streams: stdout goes to the shared null-device
            # handle, stderr is folded into stdout.
            kwargs['stderr'] = subprocess.STDOUT
            kwargs['stdout'] = self.FNULL
        ret_code = subprocess.call(args, **kwargs)
    except subprocess.CalledProcessError:
        # NOTE(review): subprocess.call never raises CalledProcessError
        # (that is check_call), so this handler is effectively dead code;
        # kept to preserve behavior exactly.
        return False
    return ret_code
def _check_ssl_cert(self, cert, key):
    """Validate the certificate with openssl (when available) and wrap
    the HTTP server socket with SSL.

    Raises TelegramError when the certificate is invalid or the SSL
    socket cannot be initialized.
    """
    # Check SSL-Certificate with openssl, if possible.
    # 'with' closes the devnull handle (the original leaked it).
    try:
        with open(os.devnull, 'wb') as devnull:
            exit_code = subprocess.call(
                ["openssl", "x509", "-text", "-noout", "-in", cert],
                stdout=devnull,
                stderr=subprocess.STDOUT)
    except OSError:
        # openssl not installed — skip the external check.
        exit_code = 0
    # '== 0', not 'is 0': identity comparison with int literals is an
    # implementation detail and a SyntaxWarning on modern Python.
    if exit_code == 0:
        try:
            self.httpd.socket = ssl.wrap_socket(
                self.httpd.socket, certfile=cert, keyfile=key,
                server_side=True)
        except ssl.SSLError as error:
            self.logger.exception('Failed to init SSL socket')
            raise TelegramError(str(error))
    else:
        raise TelegramError('SSL Certificate invalid')
def run_script(script):
    """Run *script* under ``bash`` with strict-mode flags and return its
    decoded combined output.

    Raises PheWebError when the script exits non-zero.
    """
    script = 'set -euo pipefail\n' + script
    try:
        # Open the null device read-only as stdin to block interactive
        # reads. (is this the right way to block stdin?)
        with open(os.devnull) as devnull:
            data = subprocess.check_output(
                ['bash', '-c', script],
                stderr=subprocess.STDOUT,
                stdin=devnull)
        status = 0
    except subprocess.CalledProcessError as ex:
        data, status = ex.output, ex.returncode
    data = data.decode('utf8')
    if status != 0:
        raise PheWebError(
            'FAILED with status {}\n'.format(status) + 'output was:\n' + data)
    return data
def __init__(self, targetfd, tmpfile=None):
    """Capture writes to OS-level file descriptor *targetfd*.

    targetfd: the fd to capture (0=stdin, 1=stdout, 2=stderr).
    tmpfile: optional file object the fd is redirected into.
    """
    self.targetfd = targetfd
    try:
        # Save a duplicate so the fd can be restored later.
        self.targetfd_save = os.dup(self.targetfd)
    except OSError:
        # fd is invalid (e.g. already closed): make start/done no-ops.
        self.start = lambda: None
        self.done = lambda: None
    else:
        if targetfd == 0:
            assert not tmpfile, "cannot set tmpfile with stdin"
            # stdin is replaced with the null device so reads see EOF.
            tmpfile = open(os.devnull, "r")
            self.syscapture = SysCapture(targetfd)
        else:
            if tmpfile is None:
                f = TemporaryFile()
                with f:
                    # Duplicate onto a text-safe buffered file object;
                    # the original temp handle is closed by the 'with'.
                    tmpfile = safe_text_dupfile(f, mode="wb+")
            if targetfd in patchsysdict:
                # Also patch the matching sys.* attribute.
                self.syscapture = SysCapture(targetfd, tmpfile)
            else:
                self.syscapture = NoCapture()
        self.tmpfile = tmpfile
        self.tmpfile_fd = tmpfile.fileno()
def analyze_structure(seq, filename, ensemble=False):
    """Fold a sequence with UNAFold (or hybrid-ss-min) and stash the
    resulting .ct structure files under tmp/structures/."""
    chdir(project_dir)
    # NOTE(review): seq/filename are interpolated into shell commands;
    # fine for trusted inputs, unsafe for arbitrary strings.
    system("echo '" + str(seq) + "' > " + filename + ".seq")
    # This handle is necessary to omit output generated by UNAFOLD.
    fnull = open(devnull, 'w')
    if ensemble:
        tool = "./3rdParty/unafold/UNAFold.pl"
    else:
        tool = "./3rdParty/unafold/hybrid-ss-min"
    call(tool + " -n RNA " + filename + ".seq",
         shell=True, stdout=fnull, stderr=fnull)
    if os.path.isfile(filename + ".ct"):
        system("mv %s*.ct tmp/structures/" % filename)
    # remove tmp files
    system("rm %s*" % filename)
    # system("mv " + filename + "* tmp/unafold_files/")
    fnull.close()
    return 1
def _run_cdhit(self, fasta_path, identity=0.95, word_size=5,
               description_length=0, cdhit_path=None):
    """ Run CDHIT-EST for sequences, install with sudo apt install cd-hit on Ubuntu """
    self.messages.get_cdhit_message(identity)
    if cdhit_path is None:
        cdhit_path = "cd-hit-est"
    file_name = self.project + "_IdentityClusters_" + str(identity)
    out_file = os.path.join(self.tmp_path, file_name)
    cluster_path = os.path.join(self.tmp_path, file_name + '.clstr')
    cmd = [cdhit_path,
           "-i", fasta_path,
           "-o", out_file,
           "-c", str(identity),
           "-n", str(word_size),
           "-d", str(description_length)]
    # Silence cd-hit's verbose progress output.
    with open(os.devnull, "w") as devnull:
        call(cmd, stdout=devnull)
    return cluster_path
def disassemble_it(filename, address=None):
    """
    Wrapper for the ruby disassembler script.
    """
    # Output file name encodes the target address (or "None").
    suffix = hex(address) if address is not None else "None"
    outfile = filename + "_disass_" + suffix
    args = ['ruby', 'analysis_tools/disassfunc.rb',
            "-graph", "-svg", "-o", outfile, filename]
    if address is not None:
        args.append(hex(address))
    # Run the script fully detached from our stdio.
    FNULL = open(os.devnull, 'w')
    proc = Popen(args, stdin=FNULL, stdout=FNULL, stderr=FNULL)
    proc.wait()
    FNULL.close()
    app.logger.debug("Disassembly just finished!")
    return outfile
def analyze_it(self):
    """
    Wrapper for the ruby analysis script.
    Executes and get results from files.
    """
    args = ['ruby', 'analysis_tools/AnalyzeIt.rb', self.storage_file]
    # Run fully detached from our stdio; close the null handle even if
    # Popen raises (the original leaked it in that case).
    FNULL = open(os.devnull, 'w')
    try:
        proc = Popen(args, stdin=FNULL, stdout=FNULL, stderr=FNULL)
        proc.wait()
    finally:
        FNULL.close()
    # TEXT report, just UTF-8 decode/parsing
    fname = self.storage_file + '.txt'
    if os.path.exists(fname):
        # 'with' closes the report handle (the original leaked it).
        with open(fname, 'rb') as report:
            data = report.read()
        self.txt_report = re.sub(r'[^\x00-\x7F]', '', data).decode('utf-8')
        return True
def test_no_clustering(self):
    """At 100% identity the output table should equal the input table."""
    with redirected_stdio(stderr=os.devnull):
        obs_table, obs_sequences = cluster_features_de_novo(
            sequences=self.input_sequences, table=self.input_table,
            perc_identity=1.0)

    # order of identifiers is important for biom.Table equality
    obs_table = obs_table.sort_order(
        self.input_table.ids(axis='observation'), axis='observation')
    self.assertEqual(obs_table, self.input_table)

    # sequences are reverse-sorted by abundance in output
    exp_seqs = [self.input_sequences_list[i] for i in (0, 3, 2, 1)]
    self.assertEqual(_read_seqs(obs_sequences), exp_seqs)
def test_99_percent_clustering(self):
    """Expected table and sequences after clustering at 99% identity."""
    exp_table = biom.Table(
        np.array([[104, 106, 109], [1, 1, 2], [7, 8, 9]]),
        ['feature1', 'feature2', 'feature4'],
        ['sample1', 'sample2', 'sample3'])

    with redirected_stdio(stderr=os.devnull):
        obs_table, obs_sequences = cluster_features_de_novo(
            sequences=self.input_sequences, table=self.input_table,
            perc_identity=0.99)

    # order of identifiers is important for biom.Table equality
    obs_table = obs_table.sort_order(
        exp_table.ids(axis='observation'), axis='observation')
    self.assertEqual(obs_table, exp_table)

    # sequences are reverse-sorted by abundance in output
    exp_seqs = [self.input_sequences_list[i] for i in (0, 3, 1)]
    self.assertEqual(_read_seqs(obs_sequences), exp_seqs)
def test_97_percent_clustering_feature1_most_abundant(self):
    """Expected table and sequences after clustering at 97% identity."""
    exp_table = biom.Table(
        np.array([[111, 114, 118], [1, 1, 2]]),
        ['feature1', 'feature2'],
        ['sample1', 'sample2', 'sample3'])

    with redirected_stdio(stderr=os.devnull):
        obs_table, obs_sequences = cluster_features_de_novo(
            sequences=self.input_sequences, table=self.input_table,
            perc_identity=0.97)

    # order of identifiers is important for biom.Table equality
    obs_table = obs_table.sort_order(
        exp_table.ids(axis='observation'), axis='observation')
    self.assertEqual(obs_table, exp_table)

    # sequences are reverse-sorted by abundance in output
    exp_seqs = [self.input_sequences_list[i] for i in (0, 1)]
    self.assertEqual(_read_seqs(obs_sequences), exp_seqs)
def test_uchime_denovo(self):
    """De novo chimera detection splits input into chimeric/non-chimeric."""
    with redirected_stdio(stderr=os.devnull):
        chime, nonchime, stats = uchime_denovo(
            sequences=self.input_sequences, table=self.input_table)

    # Only the fourth input sequence is flagged as chimeric.
    self.assertEqual(_read_seqs(chime), [self.input_sequences_list[3]])

    # sequences are reverse-sorted by abundance in output
    exp_nonchime = [self.input_sequences_list[i] for i in (0, 1, 2)]
    self.assertEqual(_read_seqs(nonchime), exp_nonchime)

    # Stats file mentions every feature and has one non-empty line apiece.
    with stats.open() as stats_fh:
        stats_text = stats_fh.read()
    for feature in ('feature1', 'feature2', 'feature3', 'feature4'):
        self.assertTrue(feature in stats_text)
    stats_lines = [e for e in stats_text.split('\n') if len(e) > 0]
    self.assertEqual(len(stats_lines), 4)
def test_join_pairs_all_samples_w_no_joined_seqs(self):
    # minmergelen is set very high here, resulting in no sequences
    # being joined across the three samples.
    with redirected_stdio(stderr=os.devnull):
        obs = join_pairs(self.input_seqs, minmergelen=500)

    # manifest is as expected
    self._test_manifest(obs)

    # expected number of fastq files are created, each empty
    output_fastqs = list(obs.sequences.iter_views(FastqGzFormat))
    self.assertEqual(len(output_fastqs), 3)
    for _, fastq_path in output_fastqs:
        with redirected_stdio(stderr=os.devnull):
            reads = list(skbio.io.read(str(fastq_path), format='fastq',
                                       compression='gzip',
                                       constructor=skbio.DNA))
        self.assertEqual(len(reads), 0)
def get_parsed_args(self, comp_words):
    """ gets the parsed args from a patched parser """
    active_parsers = self._patch_argument_parser()
    parsed_args = argparse.Namespace()
    self.completing = True
    if USING_PYTHON2:
        # Python 2 argparse only properly works with byte strings.
        comp_words = [ensure_bytes(word) for word in comp_words]
    # Silence argparse's error output while completing. Restore stderr in
    # a finally block: the original only restored it on the success path,
    # leaving sys.stderr pointed at an open devnull stream if
    # parse_known_args raised.
    stderr = sys.stderr
    sys.stderr = io.open(os.devnull, "w")
    try:
        active_parsers[0].parse_known_args(comp_words, namespace=parsed_args)
    except BaseException:
        # Completion must never propagate parser errors.
        pass
    finally:
        sys.stderr.close()
        sys.stderr = stderr
    self.completing = False
    return parsed_args
def get_cloud_project():
    """Return the configured gcloud project, raising with guidance when
    gcloud is missing or no project is set."""
    cmd = ['gcloud', '-q', 'config', 'list', 'project',
           '--format=value(core.project)']
    with open(os.devnull, 'w') as dev_null:
        try:
            res = subprocess.check_output(cmd, stderr=dev_null).strip()
        except OSError as e:
            if e.errno == errno.ENOENT:
                raise Exception('gcloud is not installed. The Google Cloud SDK is '
                                'necessary to communicate with the Cloud ML service. '
                                'Please install and set up gcloud.')
            raise
    if not res:
        raise Exception('--cloud specified but no Google Cloud Platform '
                        'project found.\n'
                        'Please specify your project name with the --project '
                        'flag or set a default project: '
                        'gcloud config set project YOUR_PROJECT_NAME')
    return res
def convert_image(inpath, outpath, size):
    """Convert an image file using ``sips``.

    Args:
        inpath (str): Path of source file.
        outpath (str): Path to destination file.
        size (int): Width and height of destination image in pixels.

    Raises:
        RuntimeError: Raised if ``sips`` exits with non-zero status.
    """
    dim = str(size)
    cmd = [b'sips', b'-z', dim, dim, inpath, b'--out', outpath]
    # log().debug(cmd)
    with open(os.devnull, 'w') as pipe:
        retcode = subprocess.call(cmd, stdout=pipe, stderr=subprocess.STDOUT)
    if retcode != 0:
        raise RuntimeError('sips exited with %d' % retcode)
def validate(opt, agent):
    """Run *agent* over the validation set and return the world's report.

    Temporarily switches the agent/options to the 'valid' datatype and
    restores the agent's datatype afterwards.
    """
    old_datatype = agent.opt['datatype']
    agent.opt['datatype'] = 'valid'
    opt = deepcopy(opt)
    opt['datatype'] = 'valid'
    opt['terminate'] = True
    opt['batchsize'] = 1
    # Suppress world-creation chatter on stdout. Unlike the original,
    # restore stdout (and close the devnull handle, which was leaked)
    # even if create_task raises.
    old_stdout = sys.stdout
    sys.stdout = open(os.devnull, 'w')
    try:
        valid_world = create_task(opt, agent)
    finally:
        sys.stdout.close()
        sys.stdout = old_stdout
    for _ in valid_world:
        valid_world.parley()
    stats = valid_world.report()
    agent.opt['datatype'] = old_datatype
    return stats
def build(self):
    """Build the tzdata archive via make, reporting the built version."""
    self.line('[<comment>Building tzdata</>]')
    dest_path = os.path.join(self.path, 'tz')

    # Getting VERSION
    with open(os.path.join(dest_path, 'version')) as f:
        version = f.read().strip()

    self.write('<comment>Building</> version <fg=cyan>{}</>'.format(version))
    os.chdir(dest_path)
    # Discard make's output entirely.
    with open(os.devnull, 'w') as temp:
        subprocess.call(
            ['make', 'TOPDIR={}'.format(dest_path), 'install'],
            stdout=temp, stderr=temp
        )
    self.overwrite('<info>Built</> version <fg=cyan>{}</>'.format(version))
    self.line('')
def TeeCmd(cmd, logfile, fail_hard=True): """Runs cmd and writes the output to both stdout and logfile.""" # Reading from PIPE can deadlock if one buffer is full but we wait on a # different one. To work around this, pipe the subprocess's stderr to # its stdout buffer and don't give it a stdin. # shell=True is required in cmd.exe since depot_tools has an svn.bat, and # bat files only work with shell=True set. proc = subprocess.Popen(cmd, bufsize=1, shell=sys.platform == 'win32', stdin=open(os.devnull), stdout=subprocess.PIPE, stderr=subprocess.STDOUT) for line in iter(proc.stdout.readline,''): Tee(line, logfile) if proc.poll() is not None: break exit_code = proc.wait() if exit_code != 0 and fail_hard: print 'Failed:', cmd sys.exit(1)
def run(self, with_intermediate_file=False, cwd=None):
    """Method to run the backup command where it applies.

    with_intermediate_file: when True, write the dump to a local file;
    otherwise discard output via the null device.
    """
    command = self.build_dump_command()
    if with_intermediate_file:
        # 'with' closes the dump file (the original leaked the handle);
        # any IOError from open() still propagates to the caller.
        with open('%s/%s' % (self.output_directory, self.backup_file),
                  'w') as backup_file_f:
            p = subprocess.Popen(command.split(), stdout=backup_file_f,
                                 env=self.env, cwd=cwd)
            p.wait()
            backup_file_f.flush()
    else:
        # The child keeps its own duplicate of the fd, so closing our
        # handle here is safe even though we do not wait (the original
        # leaked FNULL).
        with open(os.devnull, 'w') as FNULL:
            p = subprocess.Popen(command.split(), env=self.env, cwd=cwd,
                                 stdout=FNULL, stderr=subprocess.STDOUT)
def restore(self, backup_filename, with_intermediate_file=False):
    """Method to restore the backup.

    Downloads the dump from swift, replays it through the restore
    command, and optionally removes the local copy.
    """
    self.store.get(self.swift_container, backup_filename,
                   self.output_directory)
    command = self.build_restore_command(backup_filename)
    if with_intermediate_file:
        file_path = '%s/%s' % (self.output_directory, backup_filename)
        # 'with' closes the dump file (the original leaked the handle).
        with open(file_path, 'r') as dump:
            backup_file_content = dump.read()
        p = subprocess.Popen(command.split(), stdin=subprocess.PIPE)
        p.communicate(backup_file_content)
    else:
        # Close the devnull handle after the process finishes (the
        # original leaked FNULL).
        with open(os.devnull, 'w') as FNULL:
            p = subprocess.Popen(command.split(), stdout=FNULL,
                                 stderr=subprocess.STDOUT)
            p.wait()
    if self.clean_local_copy:
        self._clean_local_copy(backup_filename)
def dump_database(id):
    """Dump the database to a temporary directory.

    Returns the path to the renamed "database.dump" file inside the
    temporary directory.
    """
    tmp_dir = tempfile.mkdtemp()
    current_dir = os.getcwd()
    os.chdir(tmp_dir)
    try:
        # Close the devnull handle when done (the original leaked FNULL).
        with open(os.devnull, 'w') as FNULL:
            heroku_app = HerokuApp(dallinger_uid=id, output=FNULL)
            heroku_app.backup_capture()
            heroku_app.backup_download()
        for filename in os.listdir(tmp_dir):
            if filename.startswith("latest.dump"):
                os.rename(filename, "database.dump")
    finally:
        # Restore the working directory even if the heroku calls raise
        # (the original left the process chdir'd into tmp_dir on error).
        os.chdir(current_dir)
    return os.path.join(tmp_dir, "database.dump")
def setUp(self):
    # Baseline CLI config for the logging tests.
    super(PostArgParseSetupTest, self).setUp()
    self.config.debug = False
    self.config.max_log_backups = 1000
    self.config.quiet = False
    self.config.verbose_count = constants.CLI_DEFAULTS['verbose_count']
    # Sink for output the tests want discarded.
    # NOTE(review): this handle is never closed here — presumably a
    # tearDown elsewhere closes it; verify.
    self.devnull = open(os.devnull, 'w')
    # Imported lazily so certbot.log module state is set up per-test.
    from certbot.log import ColoredStreamHandler
    self.stream_handler = ColoredStreamHandler(six.StringIO())
    from certbot.log import MemoryHandler, TempHandler
    self.temp_handler = TempHandler()
    self.temp_path = self.temp_handler.path
    self.memory_handler = MemoryHandler(self.temp_handler)
    # Fake root logger carrying exactly the two handlers under test.
    self.root_logger = mock.MagicMock(
        handlers=[self.memory_handler, self.stream_handler])