我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用shlex.split()。
def cmd(command): result = Result() p = Popen(shlex.split(command), stdin=PIPE, stdout=PIPE, stderr=PIPE) (stdout, stderr) = p.communicate() result.exit_code = p.returncode result.stdout = stdout result.stderr = stderr result.command = command if p.returncode != 0: print 'Error executing command [%s]' % command print 'stderr: [%s]' % stderr print 'stdout: [%s]' % stdout return result
def mounts(self, detectdev=False): mounts = [] with open('/proc/mounts', 'r') as f: for line in f: dev, path, fstype = line.split()[0:3] if fstype in ('ext2', 'ext3', 'ext4', 'xfs', 'jfs', 'reiserfs', 'btrfs', 'simfs'): # simfs: filesystem in OpenVZ if not os.path.isdir(path): continue mounts.append({'dev': dev, 'path': path, 'fstype': fstype}) for mount in mounts: stat = os.statvfs(mount['path']) total = stat.f_blocks*stat.f_bsize free = stat.f_bfree*stat.f_bsize used = (stat.f_blocks-stat.f_bfree)*stat.f_bsize mount['total'] = b2h(total) mount['free'] = b2h(free) mount['used'] = b2h(used) mount['used_rate'] = div_percent(used, total) if detectdev: dev = os.stat(mount['path']).st_dev mount['major'], mount['minor'] = os.major(dev), os.minor(dev) return mounts
def _pythonpath(): items = os.environ.get('PYTHONPATH', '').split(os.pathsep) return filter(None, items)
def uptime(self): with open('/proc/uptime', 'r') as f: uptime, idletime = f.readline().split() up_seconds = int(float(uptime)) idle_seconds = int(float(idletime)) # in some machine like Linode VPS, idle time may bigger than up time if idle_seconds > up_seconds: cpu_count = multiprocessing.cpu_count() idle_seconds = idle_seconds/cpu_count # in some VPS, this value may still bigger than up time # may be the domain 0 machine has more cores # we calclate approximately for it if idle_seconds > up_seconds: for n in range(2,10): if idle_seconds/n < up_seconds: idle_seconds = idle_seconds/n break fmt = '{days} ? {hours} ?? {minutes} ? {seconds} ?' uptime_string = strfdelta(datetime.timedelta(seconds = up_seconds), fmt) idletime_string = strfdelta(datetime.timedelta(seconds = idle_seconds), fmt) return { 'up': uptime_string, 'idle': idletime_string, 'idle_rate': div_percent(idle_seconds, up_seconds), }
def parse_command(self, prefixes = ['!']): for prefix in prefixes: if len(prefix) < len(self.text_cont) and self.text_cont.startswith(prefix): print("Found a command with the {} prefix.".format(prefix)) self.comm_is = True self.command = self.text_words[0][len(prefix):] if len(self.text_words) > 1: try: self.comm_params = shlex.split(self.text_cont)[1:] except ValueError: self.comm_params = self.text_words[1:] if self.chat_query and self.comm_is is False: self.comm_is = True self.command = self.text_words[0] if len(self.text_words) > 1: try: self.comm_params = shlex.split(self.text_cont)[1:] except ValueError: self.comm_params = self.text_words[1:]
def _parse_command_opts(self, parser, args): # Remove --with-X/--without-X options when processing command args self.global_options = self.__class__.global_options self.negative_opt = self.__class__.negative_opt # First, expand any aliases command = args[0] aliases = self.get_option_dict('aliases') while command in aliases: src, alias = aliases[command] del aliases[command] # ensure each alias can expand only once! import shlex args[:1] = shlex.split(alias, True) command = args[0] nargs = _Distribution._parse_command_opts(self, parser, args) # Handle commands that want to consume all remaining arguments cmd_class = self.get_command_class(command) if getattr(cmd_class, 'command_consumes_arguments', None): self.get_option_dict(command)['args'] = ("command line", nargs) if nargs is not None: return [] return nargs
def plot(self): if _domainless._DEBUG == True: print "Plot:plot()" # Error checking before launching plot if self.usesPortIORString == None: raise AssertionError, "Plot:plot() ERROR - usesPortIORString not set ... must call connect() on this object from another component" if self._usesPortName == None: raise AssertionError, "Plot:plot() ERROR - usesPortName not set ... must call connect() on this object from another component" if self._dataType == None: raise AssertionError, "Plot:plot() ERROR - dataType not set ... must call connect() on this object from another component" plotCommand = str(self._eclipsePath) + "/bin/plotter.sh -portname " + str(self._usesPortName) + " -repid " + str(self._dataType) + " -ior " + str(self.usesPortIORString) if _domainless._DEBUG == True: print "Plot:plotCommand " + str(plotCommand) args = _shlex.split(plotCommand) if _domainless._DEBUG == True: print "Plot:args " + str(args) try: dev_null = open('/dev/null','w') sub_process = _subprocess.Popen(args,stdout=dev_null,preexec_fn=_os.setpgrp) pid = sub_process.pid self._processes[pid] = sub_process except Exception, e: raise AssertionError, "Plot:plot() Failed to launch plotting due to %s" % ( e)
def precmd(self, line): """Handle alias expansion and ';;' separator.""" if not line.strip(): return line args = line.split() while args[0] in self.aliases: line = self.aliases[args[0]] ii = 1 for tmpArg in args[1:]: line = line.replace("%" + str(ii), tmpArg) ii = ii + 1 line = line.replace("%*", ' '.join(args[1:])) args = line.split() # split into ';;' separated commands # unless it's an alias command if args[0] != 'alias': marker = line.find(';;') if marker >= 0: # queue up everything after marker next = line[marker+2:].lstrip() self.cmdqueue.append(next) line = line[:marker].rstrip() return line
def do_enable(self, arg): args = arg.split() for i in args: try: i = int(i) except ValueError: print >>self.stdout, 'Breakpoint index %r is not a number' % i continue if not (0 <= i < len(bdb.Breakpoint.bpbynumber)): print >>self.stdout, 'No breakpoint numbered', i continue bp = bdb.Breakpoint.bpbynumber[i] if bp: bp.enable()
def do_disable(self, arg): args = arg.split() for i in args: try: i = int(i) except ValueError: print >>self.stdout, 'Breakpoint index %r is not a number' % i continue if not (0 <= i < len(bdb.Breakpoint.bpbynumber)): print >>self.stdout, 'No breakpoint numbered', i continue bp = bdb.Breakpoint.bpbynumber[i] if bp: bp.disable()
def do_condition(self, arg): # arg is breakpoint number and condition args = arg.split(' ', 1) try: bpnum = int(args[0].strip()) except ValueError: # something went wrong print >>self.stdout, \ 'Breakpoint index %r is not a number' % args[0] return try: cond = args[1] except: cond = None try: bp = bdb.Breakpoint.bpbynumber[bpnum] except IndexError: print >>self.stdout, 'Breakpoint index %r is not valid' % args[0] return if bp: bp.cond = cond if not cond: print >>self.stdout, 'Breakpoint', bpnum, print >>self.stdout, 'is now unconditional.'
def _iscommand(cmd): """Return True if cmd is executable or can be found on the executable search path.""" if _isexecutable(cmd): return True path = os.environ.get("PATH") if not path: return False for d in path.split(os.pathsep): exe = os.path.join(d, cmd) if _isexecutable(exe): return True return False # General parent classes
def save_testcase(self, testcase): """ Save all testcases collected during monitoring """ try: if self.config.debug: print("[\033[92mINFO\033[0m] Saving testcase...") dir_name = "testcase_{0}".format(os.path.basename(shlex.split(self.config.process_to_monitor)[0])) try: os.mkdir(dir_name) except OSError: pass for test in testcase: with open("{0}/testcase_{1}.json".format(dir_name, self.testcase_count), "wb") as t: t.write(test) t.close() self.testcase_count += 1 except Exception as e: raise PJFBaseException(e.message if hasattr(e, "message") else str(e))
def get_helper_tasks(self): """ Return a information about our helper tasks for this task definition. This is in the form of a dictionary like so: {`<helper_task_family>`: `<helper_task_family>:<revision>`, ...} If our service has helper tasks (as defined in the `tasks:` section of the deployfish.yml file), we've recorded the appropriate `<family>:<revision>` of each them as docker labels in the container definition of the first container in the task definition. Those docker labels will be in this form: edu.caltech.tasks.<task name>.id=<family>:<revision> :rtype: dict of strings """ l = {} for key, value in self.dockerLabels.items(): if key.startswith('edu.caltech.task'): l[value.split(':')[0]] = value return l
def __get_volumes(self): """ Generate the "volumes" argument for the ``register_task_definition()`` call. :rtype: dict """ volume_names = set() volumes = [] for c in self.containers: for v in c.volumes: host_path = v.split(':')[0] name = self.mount_from_host_path(host_path) if name not in volume_names: volumes.append({ 'name': name, 'host': {'sourcePath': host_path} }) volume_names.add(name) return volumes
def test_reload_vms(self): """ Reboot VMs and check that uptime decreased. """ m = messier.Messier() m.create_vms() def get_uptime(vm): """ Return uptime in seconds for VM. """ # Hideous one-liner, but it works. cmd = """vagrant ssh {} --command \"cut -d' ' -f 1 /proc/uptime\" """.format(vm.name) cmd = shlex.split(cmd) return subprocess.check_output(cmd, stderr=open('/dev/null', 'w')) # Sleep to make sure the original boot has a higher uptime time.sleep(10) original_uptimes = { vm.name: get_uptime(vm) for vm in m.vms } m.reload_vms() new_uptimes = { vm.name: get_uptime(vm) for vm in m.vms } for k, v in original_uptimes.iteritems(): assert new_uptimes[k] < v
def _process_map(): """ Create a map of processes that have deleted files. """ procs = [] proc1 = Popen(shlex.split('lsof '), stdout=PIPE) # pylint: disable=line-too-long proc2 = Popen(shlex.split("awk 'BEGIN {IGNORECASE = 1} /deleted/ {print $1 \" \" $2 \" \" $4}'"), stdin=proc1.stdout, stdout=PIPE, stderr=PIPE) proc1.stdout.close() stdout, _ = proc2.communicate() for proc_l in stdout.split('\n'): proc = proc_l.split(' ') proc_info = {} if proc[0] and proc[1] and proc[2]: proc_info['name'] = proc[0] if proc_info['name'] == 'httpd-pre': # lsof 'nicely' abbreviates httpd-prefork to httpd-pre proc_info['name'] = 'httpd-prefork' proc_info['pid'] = proc[1] proc_info['user'] = proc[2] procs.append(proc_info) else: continue return procs
def zypper_ps(role, lsof_map): """ Gets services that need a restart from zypper """ assert role proc1 = Popen(shlex.split('zypper ps -sss'), stdout=PIPE) stdout, _ = proc1.communicate() processes_ = processes # adding instead of overwriting, eh? # radosgw is ceph-radosgw in zypper ps. processes_['rgw'] = ['ceph-radosgw', 'radosgw', 'rgw'] # ganesha is called nfs-ganesha processes_['ganesha'] = ['ganesha.nfsd', 'rpcbind', 'rpc.statd', 'nfs-ganesha'] for proc_l in stdout.split('\n'): if '@' in proc_l: proc_l = proc_l.split('@')[0] if proc_l in processes_[role]: lsof_map.append({'name': proc_l}) return lsof_map
def files_touched(git_range): """Run git log --pretty="%H" --raw git_range. Generate a dictionary of files modified by the commits in range """ cmd = 'git', 'log', '--pretty=%H', '--raw', git_range data = run_command(*cmd) commits = collections.defaultdict(list) commit = None for line in data.splitlines(): if SHA1_REGEX.match(line): commit = line elif line.strip(): split_line = line.split('\t') filename = split_line[-1] state = split_line[0].split(' ')[-1][0] commits[commit].append(GitFile(filename, state)) return commits
def check_nsp(dist, attr, value): """Verify that namespace packages are valid""" assert_string_list(dist,attr,value) for nsp in value: if not dist.has_contents_for(nsp): raise DistutilsSetupError( "Distribution contains no modules or packages for " + "namespace package %r" % nsp ) if '.' in nsp: parent = '.'.join(nsp.split('.')[:-1]) if parent not in value: distutils.log.warn( "WARNING: %r is declared as a package namespace, but %r" " is not: please correct this in setup.py", nsp, parent )
def _parse_command_opts(self, parser, args): # Remove --with-X/--without-X options when processing command args self.global_options = self.__class__.global_options self.negative_opt = self.__class__.negative_opt # First, expand any aliases command = args[0] aliases = self.get_option_dict('aliases') while command in aliases: src,alias = aliases[command] del aliases[command] # ensure each alias can expand only once! import shlex args[:1] = shlex.split(alias,True) command = args[0] nargs = _Distribution._parse_command_opts(self, parser, args) # Handle commands that want to consume all remaining arguments cmd_class = self.get_command_class(command) if getattr(cmd_class,'command_consumes_arguments',None): self.get_option_dict(command)['args'] = ("command line", nargs) if nargs is not None: return [] return nargs
def _tokenize(self, handle): for line in handle: if line.startswith("#"): continue elif line.startswith(";"): token = line[1:].strip() for line in handle: line = line.strip() if line == ';': break token += line yield token else: try: tokens = shlex.split(line) except ValueError: # error "No closing quotation" line = line.replace("'",'"') # if odd - add a closing " to that line if not line.count('"') % 2 == 0: line = '{}"'.format(line) tokens = shlex.split(line) for token in tokens: yield token
def update(ctx): lst = Options.options.files if lst: lst = lst.split(',') else: path = os.path.join(Context.waf_dir, 'waflib', 'extras') lst = [x for x in Utils.listdir(path) if x.endswith('.py')] for x in lst: tool = x.replace('.py', '') if not tool: continue try: dl = Configure.download_tool except AttributeError: ctx.fatal('The command "update" is dangerous; include the tool "use_config" in your project!') try: dl(tool, force=True, ctx=ctx) except Errors.WafError: Logs.error('Could not find the tool %r in the remote repository' % x) else: Logs.warn('Updated %r' % tool)
def add_os_flags(self, var, dest=None, dup=True): """ Import operating system environment values into ``conf.env`` dict:: def configure(conf): conf.add_os_flags('CFLAGS') :param var: variable to use :type var: string :param dest: destination variable, by default the same as var :type dest: string :param dup: add the same set of flags again :type dup: bool """ try: flags = shlex.split(self.environ[var]) except KeyError: return # TODO: in waf 1.9, make dup=False the default if dup or ''.join(flags) not in ''.join(Utils.to_list(self.env[dest or var])): self.env.append_value(dest or var, flags)
def countdown_handler(self, interval, count): command = '{0}/countdown -i {1} {2}'.format(os.getcwd(), interval, count) proc = Subprocess(shlex.split(command), stdout=Subprocess.STREAM) try: while True: line_bytes = yield proc.stdout.read_until(b'\n') line = to_unicode(line_bytes)[:-1] self.log.info('command read: %s', line) timestamp = datetime.now().timestamp() self.zmq_stream.send_multipart([b'0', utf8(json_encode({ 'stdout': line, 'finished': False, 'timestamp': timestamp }))]) except StreamClosedError: self.log.info('command closed') timestamp = datetime.now().timestamp() self.zmq_stream.send_multipart([b'0', utf8(json_encode({ 'stdout': None, 'finished': True, 'timestamp': timestamp }))])
def run_cmd(cmd, quiet=False): if not quiet: logging.info('command: {}'.format(cmd)) # use shlex to keep quoted substrings result = run(shlex.split(cmd), stdout=PIPE, stderr=PIPE) stdout = result.stdout.strip().decode() stderr = result.stderr.strip().decode() if stdout and not quiet: logging.debug(stdout) if stderr and not quiet: logging.warning(stderr) return result.stdout.strip()
def addbookmark(self): """ Opens add bookmark dialog and gets url from url box""" dialog = QDialog(self) addbmkdialog = Add_Bookmark_Dialog() addbmkdialog.setupUi(dialog) addbmkdialog.titleEdit.setText(self.tabWidget.currentWidget().page().mainFrame().title()) addbmkdialog.addressEdit.setText(self.line.text()) if (dialog.exec_() == QDialog.Accepted): url = unicode(addbmkdialog.addressEdit.text()) bmk = [unicode(addbmkdialog.titleEdit.text()), url] self.bookmarks = importBookmarks(configdir+"bookmarks.txt") self.bookmarks.insert(0, bmk) exportBookmarks(configdir+"bookmarks.txt", self.bookmarks) icon = self.tabWidget.currentWidget().icon() if not icon.isNull(): icon.pixmap(16, 16).save(icon_dir + url.split('/')[2] + '.png')
def closeEvent(self, event): """This saves all settings, bookmarks, cookies etc. during window close""" if self.confirm_before_quit: confirm = QMessageBox.warning(self, 'Quit Browser ?', 'Are you sure to close the Browser', 'Quit', 'Cancel') if confirm == 1 : event.ignore() return self.savesettings() cookiejar.exportCookies() # Delete excess thumbnails thumbnails = [ x for x in os.listdir(thumbnails_dir) ] for fav in self.favourites: if fav[2] in thumbnails: thumbnails.remove(fav[2]) for f in thumbnails: os.remove(thumbnails_dir + f) # Delete excess icons icons = [ x for x in os.listdir(icon_dir) if x.endswith('.png') ] for bmk in self.bookmarks: if bmk[1].split('/')[2] + '.png' in icons: icons.remove(bmk[1].split('/')[2] + '.png') for f in icons: os.remove( icon_dir + f ) super(Main, self).closeEvent(event)
def accept_connections(s): conn, addr = s.accept() try: data = "" d = conn.recv(1024) while d: data += d d = conn.recv(1024) finally: conn.close() if data: try: urgency, host, title, body = shlex.split(data) subprocess.call(["notify-send", "-u", urgency, "-a", "IRC %s" % host, escape(title), escape(body)]) except ValueError as e: print e except OSError as e: print e accept_connections(s)
def expand_file_arguments(): """ Any argument starting with "@" gets replaced with all values read from a text file. Text file arguments can be split by newline or by space. Values are added "as-is", as if they were specified in this order on the command line. """ new_args = [] expanded = False for arg in sys.argv: if arg.startswith("@"): expanded = True with open(arg[1:],"r") as f: for line in f.readlines(): new_args += shlex.split(line) else: new_args.append(arg) if expanded: print("esptool.py %s" % (" ".join(new_args[1:]))) sys.argv = new_args
def get_environment_id(default_ccv): translation_table = string.maketrans('-','_') CONVERT_CCV = DEFAULT_CONTENT_VIEW.translate(translation_table) CONVERT_ORGANIZATION = ORGANIZATION.translate(translation_table) PUPPET_ENV = str("KT_" + CONVERT_ORGANIZATION + "_" + ENVIRONMENT + "_" + CONVERT_CCV) cmd_get_environment_id = hammer_cmd + " --csv environment list" try: perform_cmd = subprocess.Popen(cmd_get_environment_id, shell=True, stdout=subprocess.PIPE) puppet_env_id = perform_cmd.stdout.read() for line in islice(puppet_env_id.strip().split("\n"), 1, None): # print output without CSV header if PUPPET_ENV in line: return line.split(",")[0] break except: print log.ERROR + "ERROR: Puppet environment id not found. Please ensure that the Puppet environment " + PUPPET_ENV + " is configured properly in Satellite." + log.END sys.exit(1)
def parse_args(self, arg_string): try: self.args = self.parser.parse_args(shlex.split(arg_string)) return True except BadArgumentsException: return False
def loadavg(self): with open('/proc/loadavg', 'r') as f: load_1min, load_5min, load_15min = f.readline().split()[0:3] return { '1min': load_1min, '5min': load_5min, '15min': load_15min, }
def cpustat(self, fullstat=False): cpustat = {} # REF: http://www.kernel.org/doc/Documentation/filesystems/proc.txt fname = ('used', 'idle') full_fname = ('user', 'nice', 'system', 'idle', 'iowait', 'irq', 'softirq', 'steal', 'guest', 'guest_nice') cpustat['cpus'] = [] with open('/proc/stat', 'r') as f: for line in f: if line.startswith('cpu'): fields = line.strip().split() name = fields[0] if not fullstat and name != 'cpu': continue; stat = fields[1:] stat = [int(i) for i in stat] statall = sum(stat) if fullstat: while len(stat) < 10: stat.append(0) stat = dict(zip(full_fname, stat)) else: stat = [statall-stat[3], stat[3]] stat = dict(zip(fname, stat)) stat['all'] = statall if name == 'cpu': cpustat['total'] = stat else: cpustat['cpus'].append(stat) elif line.startswith('btime'): btime = int(line.strip().split()[1]) cpustat['btime'] = time.strftime('%Y-%m-%d %X %Z', time.localtime(btime)) return cpustat
def meminfo(self): # OpenVZ may not have some varirables # so init them first mem_total = mem_free = mem_buffers = mem_cached = swap_total = swap_free = 0 with open('/proc/meminfo', 'r') as f: for line in f: if ':' not in line: continue item, value = line.split(':') value = int(value.split()[0]) * 1024; if item == 'MemTotal': mem_total = value elif item == 'MemFree': mem_free = value elif item == 'Buffers': mem_buffers = value elif item == 'Cached': mem_cached = value elif item == 'SwapTotal': swap_total = value elif item == 'SwapFree': swap_free = value mem_used = mem_total - mem_free swap_used = swap_total - swap_free return { 'mem_total': b2h(mem_total), 'mem_used': b2h(mem_used), 'mem_free': b2h(mem_free), 'mem_buffers': b2h(mem_buffers), 'mem_cached': b2h(mem_cached), 'swap_total': b2h(swap_total), 'swap_used': b2h(swap_used), 'swap_free': b2h(swap_free), 'mem_used_rate': div_percent(mem_used, mem_total), 'mem_free_rate': div_percent(mem_free, mem_total), 'swap_used_rate': div_percent(swap_used, swap_total), 'swap_free_rate': div_percent(swap_free, swap_total), }
def uname(self): p = subprocess.Popen(shlex.split('uname -i'), stdout=subprocess.PIPE, close_fds=True) hwplatform = p.stdout.read().strip() p.wait() uname = platform.uname() return { 'kernel_name': uname[0], 'node': uname[1], 'kernel_release': uname[2], 'kernel_version': uname[3], 'machine': uname[4], 'processor': uname[5], 'platform': hwplatform, }
def cpuinfo(self): models = [] bitss = [] cpuids = [] with open('/proc/cpuinfo', 'r') as f: for line in f: if 'model name' in line or 'physical id' in line or 'flags' in line: item, value = line.strip().split(':') item = item.strip() value = value.strip() if item == 'model name': models.append(re.sub('\s+', ' ', value)) elif item == 'physical id': cpuids.append(value) elif item == 'flags': if ' lm ' in value: bitss.append('64bit') else: bitss.append('32bit') cores = [{'model': x, 'bits': y} for x, y in zip(models, bitss)] cpu_count = len(set(cpuids)) if cpu_count == 0: cpu_count = 1 return { 'cores': cores, 'cpu_count': cpu_count, 'core_count': len(cores), }
def partinfo(self, uuid=None, devname=None): """Read partition info including uuid and filesystem. You can specify uuid or devname to get the identified partition info. If no argument provided, all partitions will return. We read info from /etc/blkid/blkid.tab instead of call blkid command. REF: http://linuxconfig.org/how-to-retrieve-and-change-partitions-universally-unique-identifier-uuid-on-linux """ blks = {} p = subprocess.Popen(shlex.split('/sbin/blkid'), stdout=subprocess.PIPE, close_fds=True) p.stdout.read() p.wait() # OpenVZ may not have this file if not os.path.exists('/etc/blkid/blkid.tab'): return None with open('/etc/blkid/blkid.tab') as f: for line in f: dom = parseString(line).documentElement _fstype = dom.getAttribute('TYPE') _uuid = dom.getAttribute('UUID') _devname = dom.firstChild.nodeValue.replace('/dev/', '') partinfo = { 'name': _devname, 'fstype': _fstype, 'uuid': _uuid, } if uuid and uuid == _uuid: return partinfo elif devname and devname == _devname: return partinfo else: blks[_devname] = partinfo if uuid or devname: return None else: return blks
def autostart_list(self): """Return a list of the autostart service name. """ with open('/etc/inittab') as f: for line in f: if line.startswith('id:'): startlevel = line.split(':')[1] break rcpath = '/etc/rc.d/rc%s.d/' % startlevel services = [ os.path.basename(os.readlink(filepath)) for filepath in glob.glob('%s/S*' % rcpath)] return services #}}} #{{{Tool
def set_params(self, message): self.text_cont = message self.text_words = message.split(' ') # # Define useful functions outside class. #