我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.popen2()。
def __StartViews(self, aListOfView): """ Start all the view listed in <aListOfView>. """ for myViewName in aListOfView: myClearCaseCommand = 'cleartool endview ' + myViewName os.popen2(myClearCaseCommand) myClearCaseCommand = 'cleartool startview ' + myViewName os.popen2(myClearCaseCommand) """ (mystdIn, myStdOut) = popen2.popen2(myClearCaseCommand) for myLine in mystdIn: # Consume di output pass """ return
def __PrivateFile(self, aLocalView): """ Return a list with all private and cheked out files in <aLocalView> view. """ myClearCaseCommand = 'cleartool lsprivate -tag ' + aLocalView myPrivateFileList = [] (mystdIn, myStdOut) = popen2.popen2(myClearCaseCommand) for myLine in mystdIn: myFilter = '[checkedout]' myLine = string.rstrip(string.lstrip(myLine)) if myLine[-len(myFilter):] == myFilter: myLine = string.rstrip(string.lstrip(myLine[:-len(myFilter)])) myPrivateFileList.append(myLine) return myPrivateFileList
def popen2(cmd, mode="t", bufsize=-1): """Execute the shell command 'cmd' in a sub-process. On UNIX, 'cmd' may be a sequence, in which case arguments will be passed directly to the program without shell intervention (as with os.spawnv()). If 'cmd' is a string it will be passed to the shell (as with os.system()). If 'bufsize' is specified, it sets the buffer size for the I/O pipes. The file objects (child_stdin, child_stdout) are returned.""" import warnings msg = "os.popen2 is deprecated. Use the subprocess module." warnings.warn(msg, DeprecationWarning, stacklevel=2) import subprocess PIPE = subprocess.PIPE p = subprocess.Popen(cmd, shell=isinstance(cmd, basestring), bufsize=bufsize, stdin=PIPE, stdout=PIPE, close_fds=True) return p.stdin, p.stdout
def _getNetworksNetstatProto(self, proto) : if(proto == "tcp") : current_list = self.networks_netstat.list_tcp argproto = "-t" elif(proto == "udp") : current_list = self.networks_netstat.list_udp argproto = "-u" i, o = os.popen2("/bin/netstat -an " + argproto) j = o.readline() j = o.readline() j = o.readline() while(j != ""): liste = j.split() if(proto == "tcp") : current_list.append([liste[3], liste[4], liste[5]]) elif(proto == "udp") : current_list.append([liste[3], liste[4], None]) j = o.readline() o.close() i.close()
def voip_play1(s1,list=None,**kargs): dsp,rd = os.popen2("sox -t .ul - -t ossdsp /dev/dsp") def play(pkt): if not pkt: return if not pkt.haslayer(UDP): return ip=pkt.getlayer(IP) if s1 in [ip.src, ip.dst]: dsp.write(pkt.getlayer(Raw).load[12:]) try: if list is None: sniff(store=0, prn=play, **kargs) else: for p in list: play(p) finally: dsp.close() rd.close()
def voip_play2(s1,**kargs): dsp,rd = os.popen2("sox -t .ul -c 2 - -t ossdsp /dev/dsp") def play(pkt,last=[]): if not pkt: return if not pkt.haslayer(UDP): return ip=pkt.getlayer(IP) if s1 in [ip.src, ip.dst]: if not last: last.append(pkt) return load=last.pop() x1 = load.load[12:] # c1.write(load.load[12:]) if load.getlayer(IP).src == ip.src: x2 = "" # c2.write("\x00"*len(load.load[12:])) last.append(pkt) else: x2 = pkt.load[:12] # c2.write(pkt.load[12:]) dsp.write(merge(x1,x2)) sniff(store=0, prn=play, **kargs)
def voip_play3(lst=None,**kargs): dsp,rd = os.popen2("sox -t .ul - -t ossdsp /dev/dsp") try: def play(pkt, dsp=dsp): if pkt and pkt.haslayer(UDP) and pkt.haslayer(Raw): dsp.write(pkt.getlayer(RTP).load) if lst is None: sniff(store=0, prn=play, **kargs) else: for p in lst: play(p) finally: try: dsp.close() rd.close() except: pass
def _detect_ncpus(): """Detect the number of effective CPUs in the system""" # Snippet taken from ParallelPython # For Linux, Unix and MacOS if hasattr(os, "sysconf"): if "SC_NPROCESSORS_ONLN" in os.sysconf_names: #Linux and Unix ncpus = os.sysconf("SC_NPROCESSORS_ONLN") if isinstance(ncpus, int) and ncpus > 0: return ncpus else: #MacOS X return int(os.popen2("sysctl -n hw.ncpu")[1].read()) #for Windows if "NUMBER_OF_PROCESSORS" in os.environ: ncpus = int(os.environ["NUMBER_OF_PROCESSORS"]) if ncpus > 0: return ncpus #return the default value return 1
def voip_play1(s1,list=None,**kargs): dsp,rd = os.popen2("sox -t .ul - -t ossdsp /dev/dsp") def play(pkt): if not pkt: return if not pkt.haslayer(UDP): return ip=pkt.getlayer(IP) if s1 in [ip.src, ip.dst]: dsp.write(pkt.getlayer(conf.raw_layer).load[12:]) try: if list is None: sniff(store=0, prn=play, **kargs) else: for p in list: play(p) finally: dsp.close() rd.close()
def voip_play3(lst=None,**kargs): dsp,rd = os.popen2("sox -t .ul - -t ossdsp /dev/dsp") try: def play(pkt, dsp=dsp): if pkt and pkt.haslayer(UDP) and pkt.haslayer(conf.raw_layer): dsp.write(pkt.getlayer(RTP).load) if lst is None: sniff(store=0, prn=play, **kargs) else: for p in lst: play(p) finally: try: dsp.close() rd.close() except: pass