Python shlex 模块,split() 实例源码


项目:heerkat    作者:Roche    | 项目源码 | 文件源码
def cmd(command):
    """Run ``command`` without a shell and capture its outcome.

    The command string is tokenised with ``shlex.split`` and executed via
    ``Popen`` with stdin, stdout and stderr all attached to pipes.

    :param command: the command line to execute, as a single string
    :returns: a ``Result`` whose ``exit_code``, ``stdout``, ``stderr`` and
        ``command`` attributes describe the finished process
    """
    result = Result()

    p = Popen(shlex.split(command), stdin=PIPE, stdout=PIPE, stderr=PIPE)
    stdout, stderr = p.communicate()

    result.exit_code = p.returncode
    result.stdout = stdout
    result.stderr = stderr
    result.command = command

    if p.returncode != 0:
        # A single parenthesised argument prints the same text whether
        # ``print`` is the Python 2 statement or the Python 3 function.
        print('Error executing command [%s]' % command)
        print('stderr: [%s]' % stderr)
        print('stdout: [%s]' % stdout)

    return result
项目:Starfish    作者:BillWang139967    | 项目源码 | 文件源码
def mounts(self, detectdev=False):
        """List mounted filesystems of selected types with their usage.

        Reads ``/proc/mounts`` and keeps entries whose type is one of the
        filesystems listed below and whose mount point is a directory.

        :param detectdev: when true, also record the ``major`` and ``minor``
            numbers of the device the mount point lives on
        :returns: list of dicts with ``dev``, ``path``, ``fstype``, ``total``,
            ``free``, ``used`` and ``used_rate`` keys
        """
        wanted_types = ('ext2', 'ext3', 'ext4', 'xfs',
                        'jfs', 'reiserfs', 'btrfs',
                        'simfs')  # simfs: filesystem in OpenVZ
        found = []
        with open('/proc/mounts', 'r') as proc_mounts:
            for entry in proc_mounts:
                dev, path, fstype = entry.split()[0:3]
                if fstype in wanted_types and os.path.isdir(path):
                    found.append({'dev': dev, 'path': path, 'fstype': fstype})

        for item in found:
            vfs = os.statvfs(item['path'])
            block_size = vfs.f_bsize
            total = vfs.f_blocks*block_size
            free = vfs.f_bfree*block_size
            used = (vfs.f_blocks-vfs.f_bfree)*block_size
            item['total'] = b2h(total)
            item['free'] = b2h(free)
            item['used'] = b2h(used)
            item['used_rate'] = div_percent(used, total)
            if detectdev:
                device = os.stat(item['path']).st_dev
                item['major'] = os.major(device)
                item['minor'] = os.minor(device)
        return found
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def _pythonpath():
    """Return the non-empty entries of the ``PYTHONPATH`` environment variable.

    The value is split on ``os.pathsep``; an unset variable is treated as an
    empty string. The result is whatever ``filter`` returns for the running
    interpreter.
    """
    raw_value = os.environ.get('PYTHONPATH', '')
    return filter(None, raw_value.split(os.pathsep))
项目:Starfish    作者:BillWang139967    | 项目源码 | 文件源码
def uptime(self):
        """Report system uptime and idle time read from ``/proc/uptime``.

        :returns: dict with ``up`` and ``idle`` (both formatted by
            ``strfdelta``) and ``idle_rate`` (``div_percent`` of idle seconds
            over up seconds)
        """
        with open('/proc/uptime', 'r') as f:
            uptime, idletime = f.readline().split()
        up_seconds = int(float(uptime))
        idle_seconds = int(float(idletime))
        # in some machine like Linode VPS, idle time may bigger than up time
        if idle_seconds > up_seconds:
            cpu_count = multiprocessing.cpu_count()
            # Floor division keeps whole seconds on Python 2 and 3 alike.
            idle_seconds = idle_seconds // cpu_count
            # in some VPS, this value may still bigger than up time
            # may be the domain 0 machine has more cores
            # we calculate approximately for it
            if idle_seconds > up_seconds:
                for n in range(2, 10):
                    if idle_seconds // n < up_seconds:
                        idle_seconds = idle_seconds // n
                        # Stop at the first divisor that fits; continuing
                        # would keep shrinking the value on every iteration.
                        break
        # NOTE(review): the '?' characters look like text lost to an encoding
        # problem; the literal is kept as found - confirm the intended labels.
        fmt = '{days} ? {hours} ?? {minutes} ? {seconds} ?'
        uptime_string = strfdelta(datetime.timedelta(seconds = up_seconds), fmt)
        idletime_string = strfdelta(datetime.timedelta(seconds = idle_seconds), fmt)
        return {
            'up': uptime_string,
            'idle': idletime_string,
            'idle_rate': div_percent(idle_seconds, up_seconds),
        }
项目:Aigis    作者:Joaquin-V    | 项目源码 | 文件源码
def parse_command(self, prefixes=('!',)):
        """Detect whether the message text is a command and split it up.

        A message counts as a command when ``self.text_cont`` starts with one
        of ``prefixes`` (and is longer than that prefix), or when
        ``self.chat_query`` is true. On a match ``self.comm_is`` is set to
        True, ``self.command`` to the first word (minus the prefix, if any)
        and, when there is more than one word, ``self.comm_params`` to the
        remaining arguments.

        :param prefixes: iterable of command prefixes to recognise
        """
        def split_params():
            # Prefer shell-style quoting; fall back to the plain word list
            # when shlex rejects the text (e.g. an unbalanced quote).
            try:
                return shlex.split(self.text_cont)[1:]
            except ValueError:
                return self.text_words[1:]

        for prefix in prefixes:
            if len(prefix) < len(self.text_cont) and self.text_cont.startswith(prefix):
                print("Found a command with the {} prefix.".format(prefix))
                self.comm_is = True
                self.command = self.text_words[0][len(prefix):]

                if len(self.text_words) > 1:
                    self.comm_params = split_params()

        # In a chat query every message is a command, even without a prefix.
        if self.chat_query and self.comm_is is False:
            self.comm_is = True
            self.command = self.text_words[0]

            if len(self.text_words) > 1:
                self.comm_params = split_params()
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def _parse_command_opts(self, parser, args):
        """Parse one command's options after expanding any alias for it.

        Resets ``global_options`` / ``negative_opt`` to the class-level
        values, replaces a leading alias in ``args`` (in place) with its
        shlex-split expansion, then defers to
        ``_Distribution._parse_command_opts``.

        :returns: the base implementation's result, except that an empty
            list is returned when the command class sets
            ``command_consumes_arguments`` and that result is not ``None``
        """
        # Remove --with-X/--without-X options when processing command args
        klass = self.__class__
        self.global_options = klass.global_options
        self.negative_opt = klass.negative_opt

        # First, expand any aliases
        command = args[0]
        alias_table = self.get_option_dict('aliases')
        while command in alias_table:
            _source, expansion = alias_table[command]
            # Dropping the entry ensures each alias can expand only once.
            del alias_table[command]
            import shlex
            args[:1] = shlex.split(expansion, True)
            command = args[0]

        remaining = _Distribution._parse_command_opts(self, parser, args)

        # Handle commands that want to consume all remaining arguments
        command_cls = self.get_command_class(command)
        if not getattr(command_cls, 'command_consumes_arguments', None):
            return remaining
        self.get_option_dict(command)['args'] = ("command line", remaining)
        return remaining if remaining is None else []
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def _parse_command_opts(self, parser, args):
        """Parse one command's options after expanding any alias for it.

        Resets ``global_options`` / ``negative_opt`` to the class-level
        values, replaces a leading alias in ``args`` (in place) with its
        shlex-split expansion, then defers to
        ``_Distribution._parse_command_opts``.

        :returns: the base implementation's result, except that an empty
            list is returned when the command class sets
            ``command_consumes_arguments`` and that result is not ``None``
        """
        # Remove --with-X/--without-X options when processing command args
        klass = self.__class__
        self.global_options = klass.global_options
        self.negative_opt = klass.negative_opt

        # First, expand any aliases
        command = args[0]
        alias_table = self.get_option_dict('aliases')
        while command in alias_table:
            _source, expansion = alias_table[command]
            # Dropping the entry ensures each alias can expand only once.
            del alias_table[command]
            import shlex
            args[:1] = shlex.split(expansion, True)
            command = args[0]

        remaining = _Distribution._parse_command_opts(self, parser, args)

        # Handle commands that want to consume all remaining arguments
        command_cls = self.get_command_class(command)
        if not getattr(command_cls, 'command_consumes_arguments', None):
            return remaining
        self.get_option_dict(command)['args'] = ("command line", remaining)
        return remaining if remaining is None else []
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
# NOTE(review): this excerpt is not runnable as shown. The ``except`` at the
# end has no matching ``try:``, ``pid =`` has no right-hand side, and the
# command string names no executable after "/bin/". Text appears to have been
# lost during extraction - restore it from the upstream project before use.
# The ``print`` and ``raise X, msg`` forms are Python 2 syntax.
def plot(self):
        if _domainless._DEBUG == True:
            print "Plot:plot()"

        # Error checking before launching plot
        if self.usesPortIORString == None:
            raise AssertionError, "Plot:plot() ERROR - usesPortIORString not set ... must call connect() on this object from another component"
        if self._usesPortName == None:
            raise AssertionError, "Plot:plot() ERROR - usesPortName not set ... must call connect() on this object from another component"
        if self._dataType == None:
            raise AssertionError, "Plot:plot() ERROR - dataType not set ... must call connect() on this object from another component"

        # Build the command line as one string, then tokenise it with shlex.
        plotCommand = str(self._eclipsePath) + "/bin/ -portname " + str(self._usesPortName) + " -repid " + str(self._dataType) + " -ior " + str(self.usesPortIORString)
        if _domainless._DEBUG == True:
            print "Plot:plotCommand " + str(plotCommand)
        args = _shlex.split(plotCommand)
        if _domainless._DEBUG == True:
            print "Plot:args " + str(args)
            # The child's stdout is sent to /dev/null and it is placed in its
            # own process group (preexec_fn=_os.setpgrp).
            dev_null = open('/dev/null','w')
            sub_process = _subprocess.Popen(args,stdout=dev_null,preexec_fn=_os.setpgrp)
            pid =
            # The launched process is remembered in self._processes under ``pid``.
            self._processes[pid] = sub_process
        except Exception, e:
            raise AssertionError, "Plot:plot() Failed to launch plotting due to %s" % ( e)
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def precmd(self, line):
        """Handle alias expansion and ';;' separator.

        While the first word of ``line`` is a key of ``self.aliases`` the
        line is replaced by the alias text, with ``%1``, ``%2``, ... replaced
        by the matching positional argument and ``%*`` by all arguments.
        Unless the resulting command is ``alias``, everything after the first
        ``;;`` is appended to ``self.cmdqueue`` and only the part before it
        is returned.

        :param line: the raw command line
        :returns: the command line to execute now
        """
        if not line.strip():
            return line
        args = line.split()
        while args[0] in self.aliases:
            line = self.aliases[args[0]]
            for position, value in enumerate(args[1:], 1):
                line = line.replace("%" + str(position), value)
            line = line.replace("%*", ' '.join(args[1:]))
            args = line.split()
        # split into ';;' separated commands
        # unless it's an alias command
        if args[0] != 'alias':
            marker = line.find(';;')
            if marker >= 0:
                # queue up everything after marker
                remainder = line[marker+2:].lstrip()
                self.cmdqueue.append(remainder)
                line = line[:marker].rstrip()
        return line
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def do_enable(self, arg):
        """Enable every breakpoint whose number is listed in ``arg``.

        ``arg`` is a whitespace-separated list of breakpoint numbers. A
        token that is not an integer, or a number outside
        ``bdb.Breakpoint.bpbynumber``, is reported on ``self.stdout`` and
        skipped; the remaining numbers are still processed.
        """
        for token in arg.split():
            try:
                number = int(token)
            except ValueError:
                self.stdout.write('Breakpoint index %r is not a number\n' % token)
                continue

            if not (0 <= number < len(bdb.Breakpoint.bpbynumber)):
                self.stdout.write('No breakpoint numbered %s\n' % number)
                continue

            # Slots of deleted breakpoints hold a false value; skip those.
            bp = bdb.Breakpoint.bpbynumber[number]
            if bp:
                bp.enable()
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def do_disable(self, arg):
        """Disable every breakpoint whose number is listed in ``arg``.

        ``arg`` is a whitespace-separated list of breakpoint numbers. A
        token that is not an integer, or a number outside
        ``bdb.Breakpoint.bpbynumber``, is reported on ``self.stdout`` and
        skipped; the remaining numbers are still processed.
        """
        for token in arg.split():
            try:
                number = int(token)
            except ValueError:
                self.stdout.write('Breakpoint index %r is not a number\n' % token)
                continue

            if not (0 <= number < len(bdb.Breakpoint.bpbynumber)):
                self.stdout.write('No breakpoint numbered %s\n' % number)
                continue

            # Slots of deleted breakpoints hold a false value; skip those.
            bp = bdb.Breakpoint.bpbynumber[number]
            if bp:
                bp.disable()
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def do_condition(self, arg):
        """Set or clear the condition attached to a breakpoint.

        ``arg`` is the breakpoint number, optionally followed by a space and
        the condition text. Without condition text the breakpoint becomes
        unconditional and a message saying so is written to ``self.stdout``.
        A non-numeric or out-of-range breakpoint number is reported on
        ``self.stdout`` and nothing is changed.
        """
        # arg is breakpoint number and condition
        args = arg.split(' ', 1)
        try:
            bpnum = int(args[0].strip())
        except ValueError:
            # something went wrong
            self.stdout.write('Breakpoint index %r is not a number\n' % args[0])
            return
        cond = args[1] if len(args) > 1 else None
        try:
            bp = bdb.Breakpoint.bpbynumber[bpnum]
        except IndexError:
            self.stdout.write('Breakpoint index %r is not valid\n' % args[0])
            return
        if bp:
            bp.cond = cond
            if not cond:
                self.stdout.write('Breakpoint %s is now unconditional.\n' % bpnum)
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def _iscommand(cmd):
    """Return True if cmd is executable or can be found on the executable
    search path."""
    if _isexecutable(cmd):
        return True
    search_path = os.environ.get("PATH")
    if not search_path:
        return False
    # Probe each PATH directory in order, stopping at the first hit.
    return any(_isexecutable(os.path.join(directory, cmd))
               for directory in search_path.split(os.pathsep))

# General parent classes
项目:PyJFuzz    作者:mseclab    | 项目源码 | 文件源码
# NOTE(review): this excerpt is not valid Python as shown. The line after the
# ``def`` is bare prose (its docstring quotes are missing), both ``except``
# clauses lack a ``try:``, ``except OSError:`` has no body, and the ``with``
# block has no body. Lines appear to have been lost during extraction -
# restore them from the upstream project before use.
def save_testcase(self, testcase):
        Save all testcases collected during monitoring
            if self.config.debug:
                print("[\033[92mINFO\033[0m] Saving testcase...")
            # Directory name is derived from the basename of the first token
            # of the monitored command line.
            dir_name = "testcase_{0}".format(os.path.basename(shlex.split(self.config.process_to_monitor)[0]))
            except OSError:
            # One JSON file per test case, numbered by self.testcase_count.
            for test in testcase:
                with open("{0}/testcase_{1}.json".format(dir_name, self.testcase_count), "wb") as t:
                self.testcase_count += 1
        # Any failure is re-raised as PJFBaseException carrying the message.
        except Exception as e:
            raise PJFBaseException(e.message if hasattr(e, "message") else str(e))
项目:deployfish    作者:caltechads    | 项目源码 | 文件源码
def get_helper_tasks(self):
        """
        Return information about our helper tasks for this task definition.
        This is in the form of a dictionary like so:

            {`<helper_task_family>`: `<helper_task_family>:<revision>`, ...}

        If our service has helper tasks (as defined in the `tasks:` section of
        the deployfish.yml file), we've recorded the appropriate
        `<family>:<revision>` of each them as docker labels in the container
        definition of the first container in the task definition.

        Those docker labels will be in this form:

            edu.caltech.tasks.<task name>.id=<family>:<revision>

        :rtype: dict of strings
        """
        # Only labels whose key starts with 'edu.caltech.task' are considered;
        # the dict key is the part of the label value before the first ':'.
        return {
            value.split(':')[0]: value
            for key, value in self.dockerLabels.items()
            if key.startswith('edu.caltech.task')
        }
项目:deployfish    作者:caltechads    | 项目源码 | 文件源码
def __get_volumes(self):
        """
        Generate the "volumes" argument for the ``register_task_definition()`` call.

        The host side (text before the first ``:``) of every container volume
        is mapped to a name with ``self.mount_from_host_path()``; each
        distinct name yields one entry, in first-seen order.

        :rtype: list of dict
        """
        volume_names = set()
        volumes = []
        for c in self.containers:
            for v in c.volumes:
                host_path = v.split(':')[0]
                name = self.mount_from_host_path(host_path)
                if name not in volume_names:
                    volumes.append({
                        'name': name,
                        'host': {'sourcePath': host_path}
                    })
                    # Remember the name so a host path shared by several
                    # containers is only declared once.
                    volume_names.add(name)
        return volumes
项目:messier    作者:conorsch    | 项目源码 | 文件源码
# NOTE(review): this excerpt is not valid Python as shown. The lines after
# each ``def`` are bare prose (docstring quotes missing), the ``.format(``
# call is never closed, and nothing between the two uptime readings reboots
# the VMs or sleeps. The ``{ ... for ... }`` expressions build sets, yet the
# final loop calls ``iteritems()`` and indexes ``new_uptimes[k]`` as if they
# were dicts. Restore the lost lines from the upstream project before use.
def test_reload_vms(self):
        Reboot VMs and check that uptime decreased.
        m = messier.Messier()

        def get_uptime(vm):
            Return uptime in seconds for VM.
            # Hideous one-liner, but it works.
            cmd = """vagrant ssh {} --command \"cut -d' ' -f 1 /proc/uptime\" """.format(
            cmd = shlex.split(cmd)
            return subprocess.check_output(cmd, stderr=open('/dev/null', 'w'))

        # Sleep to make sure the original boot has a higher uptime
        original_uptimes = { get_uptime(vm) for vm in m.vms }
        new_uptimes = { get_uptime(vm) for vm in m.vms }

        for k, v in original_uptimes.iteritems():
            assert new_uptimes[k] < v
项目:DeepSea    作者:SUSE    | 项目源码 | 文件源码
def _process_map():
    """
    Create a map of processes that have deleted files.

    Runs ``lsof`` and pipes its output through ``awk`` to keep the lines
    mentioning "deleted", reduced to the first, second and fourth columns.

    :returns: list of dicts with ``name``, ``pid`` and ``user`` keys
    """
    procs = []
    proc1 = Popen(shlex.split('lsof '), stdout=PIPE)
    # pylint: disable=line-too-long
    proc2 = Popen(shlex.split("awk 'BEGIN {IGNORECASE = 1} /deleted/ {print $1 \" \" $2 \" \" $4}'"),
                  stdin=proc1.stdout, stdout=PIPE, stderr=PIPE,
                  universal_newlines=True)  # text output so split('\n') works on Python 3 too
    # Close our copy of the pipe so lsof receives SIGPIPE if awk exits first.
    proc1.stdout.close()
    stdout, _ = proc2.communicate()
    proc1.wait()
    for proc_l in stdout.split('\n'):
        proc = proc_l.split(' ')
        # Skip blank or short lines instead of indexing past the end.
        if len(proc) < 3 or not (proc[0] and proc[1] and proc[2]):
            continue
        proc_info = {'name': proc[0]}
        if proc_info['name'] == 'httpd-pre':
            # lsof 'nicely' abbreviates httpd-prefork to httpd-pre
            proc_info['name'] = 'httpd-prefork'
        proc_info['pid'] = proc[1]
        proc_info['user'] = proc[2]
        procs.append(proc_info)
    return procs
项目:DeepSea    作者:SUSE    | 项目源码 | 文件源码
def zypper_ps(role, lsof_map):
    """
    Gets services that need a restart from zypper

    Every line of ``zypper ps -sss`` output (with anything from the first
    ``@`` onwards removed) that matches one of the names registered for
    ``role`` is appended to ``lsof_map`` as ``{'name': <line>}``.

    :param role: key into the ``processes`` table; must be truthy
    :param lsof_map: list to extend in place
    :returns: the same ``lsof_map`` list
    """
    assert role
    proc1 = Popen(shlex.split('zypper ps -sss'), stdout=PIPE,
                  universal_newlines=True)  # text output so split('\n') works on Python 3 too
    stdout, _ = proc1.communicate()
    # NOTE: this is the same dict object as ``processes``, so the two
    # assignments below are visible to every other user of that table.
    processes_ = processes
    # adding instead of overwriting, eh?
    # radosgw is ceph-radosgw in zypper ps.
    processes_['rgw'] = ['ceph-radosgw', 'radosgw', 'rgw']
    # ganesha is called nfs-ganesha
    processes_['ganesha'] = ['ganesha.nfsd', 'rpcbind', 'rpc.statd', 'nfs-ganesha']
    for proc_l in stdout.split('\n'):
        if '@' in proc_l:
            proc_l = proc_l.split('@')[0]
        if proc_l in processes_[role]:
            lsof_map.append({'name': proc_l})
    return lsof_map
项目:git-stacktrace    作者:pinterest    | 项目源码 | 文件源码
def files_touched(git_range):
    """Run git log --pretty="%H" --raw  git_range.

    Generate a dictionary of files modified by the commits in range

    :param git_range: revision range passed through to ``git log``
    :returns: ``defaultdict(list)`` mapping each line that matches
        ``SHA1_REGEX`` to the ``GitFile`` entries listed beneath it
    """
    cmd = 'git', 'log', '--pretty=%H', '--raw', git_range
    data = run_command(*cmd)
    commits = collections.defaultdict(list)
    commit = None
    for line in data.splitlines():
        if SHA1_REGEX.match(line):
            commit = line
        elif line.strip():
            # File name is the last tab-separated field; the state is the
            # first character of the last space-separated word before it.
            split_line = line.split('\t')
            filename = split_line[-1]
            state = split_line[0].split(' ')[-1][0]
            commits[commit].append(GitFile(filename, state))
    return commits
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
# NOTE(review): this excerpt is not valid Python as shown. The
# ``raise DistutilsSetupError(`` call is never closed, and the two
# warning-message lines at the end have no enclosing call. Lines appear to
# have been lost during extraction - restore them from the upstream project.
def check_nsp(dist, attr, value):
    """Verify that namespace packages are valid"""
    for nsp in value:
        # Every declared namespace package must have contents in the distribution.
        if not dist.has_contents_for(nsp):
            raise DistutilsSetupError(
                "Distribution contains no modules or packages for " +
                "namespace package %r" % nsp
        # A dotted namespace is expected to have its parent declared as well.
        if '.' in nsp:
            parent = '.'.join(nsp.split('.')[:-1])
            if parent not in value:
                    "WARNING: %r is declared as a package namespace, but %r"
                    " is not: please correct this in", nsp, parent
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def _parse_command_opts(self, parser, args):
        """Parse one command's options after expanding any alias for it.

        Resets ``global_options`` / ``negative_opt`` to the class-level
        values, replaces a leading alias in ``args`` (in place) with its
        shlex-split expansion, then defers to
        ``_Distribution._parse_command_opts``.

        :returns: the base implementation's result, except that an empty
            list is returned when the command class sets
            ``command_consumes_arguments`` and that result is not ``None``
        """
        # Remove --with-X/--without-X options when processing command args
        klass = self.__class__
        self.global_options = klass.global_options
        self.negative_opt = klass.negative_opt

        # First, expand any aliases
        command = args[0]
        alias_table = self.get_option_dict('aliases')
        while command in alias_table:
            _source, expansion = alias_table[command]
            # Dropping the entry ensures each alias can expand only once.
            del alias_table[command]
            import shlex
            args[:1] = shlex.split(expansion, True)
            command = args[0]

        remaining = _Distribution._parse_command_opts(self, parser, args)

        # Handle commands that want to consume all remaining arguments
        command_cls = self.get_command_class(command)
        if not getattr(command_cls, 'command_consumes_arguments', None):
            return remaining
        self.get_option_dict(command)['args'] = ("command line", remaining)
        return remaining if remaining is None else []
项目:ssbio    作者:SBRG    | 项目源码 | 文件源码
def _tokenize(self, handle):
        """Yield tokens from an iterator of text lines.

        * Lines starting with ``#`` are skipped.
        * A line starting with ``;`` opens a multi-line value: its own text
          and the stripped text of the following lines are concatenated
          until a line consisting of ``;`` alone, and yielded as one token.
        * Any other line is split with ``shlex.split``. If that fails,
          single quotes are turned into double quotes and a closing quote is
          appended when their number is odd, then the split is retried.

        :param handle: iterator over lines (e.g. an open file); it is also
            consumed by the inner loop, so it must not be a plain list
        """
        for line in handle:
            if line.startswith("#"):
                continue
            elif line.startswith(";"):
                token = line[1:].strip()
                for continuation in handle:
                    continuation = continuation.strip()
                    if continuation == ';':
                        break
                    token += continuation
                yield token
            else:
                try:
                    tokens = shlex.split(line)
                except ValueError:
                    # error "No closing quotation"
                    line = line.replace("'",'"')
                    # if odd - add a closing " to that line
                    if not line.count('"') % 2 == 0:
                        line = '{}"'.format(line)
                    tokens = shlex.split(line)
                for token in tokens:
                    yield token
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def update(ctx):
    """Re-download tools from the remote repository.

    The tools are those named in the comma-separated ``files`` option, or,
    when that option is empty, every ``.py`` file found in
    ``<waf_dir>/waflib/extras``. Each success or failure is logged.

    :param ctx: context object; used to report a fatal error and passed
        through to the download function
    """
    lst = Options.options.files
    if lst:
        lst = lst.split(',')
    else:
        # No explicit list: refresh everything present in waflib/extras.
        path = os.path.join(Context.waf_dir, 'waflib', 'extras')
        lst = [x for x in Utils.listdir(path) if x.endswith('.py')]
    for x in lst:
        tool = x.replace('.py', '')
        if not tool:
            continue
        try:
            dl = Configure.download_tool
        except AttributeError:
            # presumably ctx.fatal raises and never returns - TODO confirm
            ctx.fatal('The command "update" is dangerous; include the tool "use_config" in your project!')
        try:
            dl(tool, force=True, ctx=ctx)
        except Errors.WafError:
            Logs.error('Could not find the tool %r in the remote repository' % x)
        else:
            Logs.warn('Updated %r' % tool)
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def update(ctx):
    """Re-download tools from the remote repository.

    The tools are those named in the comma-separated ``files`` option, or,
    when that option is empty, every ``.py`` file found in
    ``<waf_dir>/waflib/extras``. Each success or failure is logged.

    :param ctx: context object; used to report a fatal error and passed
        through to the download function
    """
    lst = Options.options.files
    if lst:
        lst = lst.split(',')
    else:
        # No explicit list: refresh everything present in waflib/extras.
        path = os.path.join(Context.waf_dir, 'waflib', 'extras')
        lst = [x for x in Utils.listdir(path) if x.endswith('.py')]
    for x in lst:
        tool = x.replace('.py', '')
        if not tool:
            continue
        try:
            dl = Configure.download_tool
        except AttributeError:
            # presumably ctx.fatal raises and never returns - TODO confirm
            ctx.fatal('The command "update" is dangerous; include the tool "use_config" in your project!')
        try:
            dl(tool, force=True, ctx=ctx)
        except Errors.WafError:
            Logs.error('Could not find the tool %r in the remote repository' % x)
        else:
            Logs.warn('Updated %r' % tool)
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def add_os_flags(self, var, dest=None, dup=True):
    """
    Import operating system environment values into ``conf.env`` dict::

        def configure(conf):
            conf.add_os_flags('CFLAGS')

    The value of ``self.environ[var]`` is split with ``shlex.split`` and
    appended to ``self.env``; nothing happens when the variable is not set.

    :param var: variable to use
    :type var: string
    :param dest: destination variable, by default the same as var
    :type dest: string
    :param dup: add the same set of flags again
    :type dup: bool
    """
    try:
        flags = shlex.split(self.environ[var])
    except KeyError:
        return
    # TODO: in waf 1.9, make dup=False the default
    if dup or ''.join(flags) not in ''.join(Utils.to_list(self.env[dest or var])):
        self.env.append_value(dest or var, flags)
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def update(ctx):
    """Re-download tools from the remote repository.

    The tools are those named in the comma-separated ``files`` option, or,
    when that option is empty, every ``.py`` file found in
    ``<waf_dir>/waflib/extras``. Each success or failure is logged.

    :param ctx: context object; used to report a fatal error and passed
        through to the download function
    """
    lst = Options.options.files
    if lst:
        lst = lst.split(',')
    else:
        # No explicit list: refresh everything present in waflib/extras.
        path = os.path.join(Context.waf_dir, 'waflib', 'extras')
        lst = [x for x in Utils.listdir(path) if x.endswith('.py')]
    for x in lst:
        tool = x.replace('.py', '')
        if not tool:
            continue
        try:
            dl = Configure.download_tool
        except AttributeError:
            # presumably ctx.fatal raises and never returns - TODO confirm
            ctx.fatal('The command "update" is dangerous; include the tool "use_config" in your project!')
        try:
            dl(tool, force=True, ctx=ctx)
        except Errors.WafError:
            Logs.error('Could not find the tool %r in the remote repository' % x)
        else:
            Logs.warn('Updated %r' % tool)
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def add_os_flags(self, var, dest=None, dup=True):
    """
    Import operating system environment values into ``conf.env`` dict::

        def configure(conf):
            conf.add_os_flags('CFLAGS')

    The value of ``self.environ[var]`` is split with ``shlex.split`` and
    appended to ``self.env``; nothing happens when the variable is not set.

    :param var: variable to use
    :type var: string
    :param dest: destination variable, by default the same as var
    :type dest: string
    :param dup: add the same set of flags again
    :type dup: bool
    """
    try:
        flags = shlex.split(self.environ[var])
    except KeyError:
        return
    # TODO: in waf 1.9, make dup=False the default
    if dup or ''.join(flags) not in ''.join(Utils.to_list(self.env[dest or var])):
        self.env.append_value(dest or var, flags)
项目:react-tornado-graphql-example    作者:yatsu    | 项目源码 | 文件源码
# NOTE(review): this excerpt is not valid Python as shown. The ``except`` has
# no matching ``try:``, both ``timestamp =`` assignments have no right-hand
# side, the two message lines ('command read...' / 'command closed') have
# lost the call they belonged to, and the ``send_multipart([...`` calls are
# never closed. Restore the lost text from the upstream project before use.
def countdown_handler(self, interval, count):
        # Runs the ``countdown`` program found in the current working directory.
        command = '{0}/countdown -i {1} {2}'.format(os.getcwd(), interval, count)
        proc = Subprocess(shlex.split(command), stdout=Subprocess.STREAM)
            while True:
                # Read one line at a time and drop the trailing newline.
                line_bytes = yield proc.stdout.read_until(b'\n')
                line = to_unicode(line_bytes)[:-1]
      'command read: %s', line)
                timestamp =
                # Each line is forwarded as a JSON message with finished=False.
                self.zmq_stream.send_multipart([b'0', utf8(json_encode({
                    'stdout': line,
                    'finished': False,
                    'timestamp': timestamp
        # When the stream closes, a final message with finished=True is sent.
        except StreamClosedError:
  'command closed')
            timestamp =
            self.zmq_stream.send_multipart([b'0', utf8(json_encode({
                'stdout': None,
                'finished': True,
                'timestamp': timestamp
项目:python-everywhere    作者:wdv4758h    | 项目源码 | 文件源码
# NOTE(review): this excerpt is not valid Python as shown. The first ``if``
# line has lost the call that its message was passed to, and the two
# ``if ... and not quiet:`` statements have no body. Restore the lost text
# from the upstream project before use.
def run_cmd(cmd, quiet=False):
    if not quiet:'command: {}'.format(cmd))

    # use shlex to keep quoted substrings
    result = run(shlex.split(cmd), stdout=PIPE, stderr=PIPE)
    # Decoded copies of both streams; only used by the (lost) reporting below.
    stdout = result.stdout.strip().decode()
    stderr = result.stderr.strip().decode()

    if stdout and not quiet:

    if stderr and not quiet:

    # The return value is the stripped, still undecoded, standard output.
    return result.stdout.strip()
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def _parse_command_opts(self, parser, args):
        """Parse one command's options after expanding any alias for it.

        Resets ``global_options`` / ``negative_opt`` to the class-level
        values, replaces a leading alias in ``args`` (in place) with its
        shlex-split expansion, then defers to
        ``_Distribution._parse_command_opts``.

        :returns: the base implementation's result, except that an empty
            list is returned when the command class sets
            ``command_consumes_arguments`` and that result is not ``None``
        """
        # Remove --with-X/--without-X options when processing command args
        klass = self.__class__
        self.global_options = klass.global_options
        self.negative_opt = klass.negative_opt

        # First, expand any aliases
        command = args[0]
        alias_table = self.get_option_dict('aliases')
        while command in alias_table:
            _source, expansion = alias_table[command]
            # Dropping the entry ensures each alias can expand only once.
            del alias_table[command]
            import shlex
            args[:1] = shlex.split(expansion, True)
            command = args[0]

        remaining = _Distribution._parse_command_opts(self, parser, args)

        # Handle commands that want to consume all remaining arguments
        command_cls = self.get_command_class(command)
        if not getattr(command_cls, 'command_consumes_arguments', None):
            return remaining
        self.get_option_dict(command)['args'] = ("command line", remaining)
        return remaining if remaining is None else []
项目:quartz-browser    作者:ksharindam    | 项目源码 | 文件源码
def addbookmark(self):
        """ Opens add bookmark dialog and gets url from url box"""
        dialog = QDialog(self)
        addbmkdialog = Add_Bookmark_Dialog()
        if dialog.exec_() != QDialog.Accepted:
            return
        url = unicode(addbmkdialog.addressEdit.text())
        title = unicode(addbmkdialog.titleEdit.text())
        bookmarks_file = configdir+"bookmarks.txt"
        # Re-read the saved list, put the new entry first, write it back.
        self.bookmarks = importBookmarks(bookmarks_file)
        self.bookmarks.insert(0, [title, url])
        exportBookmarks(bookmarks_file, self.bookmarks)
        icon = self.tabWidget.currentWidget().icon()
        if icon.isNull():
            return
        # The icon file is named after the third '/'-separated piece of the url.
        icon.pixmap(16, 16).save(icon_dir + url.split('/')[2] + '.png')
项目:quartz-browser    作者:ksharindam    | 项目源码 | 文件源码
# NOTE(review): this excerpt is not valid Python as shown. The
# ``if confirm == 1 :`` and ``if fav[2] in thumbnails:`` statements have no
# body, and nothing visible saves settings or cookies although the docstring
# mentions them. Lines appear to have been lost during extraction - restore
# them from the upstream project before use.
def closeEvent(self, event):
        """This saves all settings, bookmarks, cookies etc. during window close"""
        if self.confirm_before_quit:
            confirm = QMessageBox.warning(self, 'Quit Browser ?', 'Are you sure to close the Browser',
                                                'Quit', 'Cancel')
            if confirm == 1 :
        # Delete excess thumbnails
        thumbnails = [ x for x in os.listdir(thumbnails_dir) ]
        for fav in self.favourites:
            if fav[2] in thumbnails:
        # Whatever is still in ``thumbnails`` at this point is removed from disk.
        for f in thumbnails: os.remove(thumbnails_dir + f)
        # Delete excess icons
        icons = [ x for x in os.listdir(icon_dir) if x.endswith('.png') ]
        for bmk in self.bookmarks:
            # Icons belonging to a bookmark are taken off the deletion list.
            if bmk[1].split('/')[2] + '.png' in icons:
                icons.remove(bmk[1].split('/')[2] + '.png')
        for f in icons: os.remove( icon_dir + f )
        super(Main, self).closeEvent(event)
项目:weechat-pyrnotify    作者:arnottcr    | 项目源码 | 文件源码
# NOTE(review): this excerpt is not valid Python as shown. The read loop is
# indented without an enclosing statement, the two ``except`` clauses have no
# matching ``try:``, and the ``["notify-send", ...]`` list has lost the call
# it was passed to. Restore the lost lines from the upstream project.
# ``print e`` is Python 2 syntax.
def accept_connections(s):
    conn, addr = s.accept()
        # Read in 1024-byte chunks until recv() returns an empty value.
        data = ""
        d = conn.recv(1024)
        while d:
            data += d
            d = conn.recv(1024)
    if data:
            # The payload must shlex-split into exactly four fields.
            urgency, host, title, body = shlex.split(data)
  ["notify-send", "-u", urgency, "-a", "IRC %s" % host, escape(title), escape(body)])

        except ValueError as e:
            print e
        except OSError as e:
            print e
项目:iot    作者:akademikbilisim    | 项目源码 | 文件源码
def expand_file_arguments():
    """ Any argument starting with "@" gets replaced with all values read from a text file.
    Text file arguments can be split by newline or by space.
    Values are added "as-is", as if they were specified in this order on the command line.

    ``sys.argv`` is only replaced (and the expanded command line printed)
    when at least one "@" argument was found.
    """
    new_args = []
    expanded = False
    for arg in sys.argv:
        if arg.startswith("@"):
            expanded = True
            with open(arg[1:],"r") as f:
                for line in f:
                    new_args += shlex.split(line)
        else:
            # Ordinary arguments (including argv[0]) are carried over as-is.
            new_args.append(arg)
    if expanded:
        # NOTE(review): the message starts with a blank where a program name
        # may have been lost; the literal is kept as found.
        print(" %s" % (" ".join(new_args[1:])))
        sys.argv = new_args
项目:satellite6_automation    作者:hambuergaer    | 项目源码 | 文件源码
# NOTE(review): this excerpt is not valid Python as shown. The indentation
# jumps without an enclosing statement and ``puppet_env_id =`` has no
# right-hand side, so lines appear to have been lost during extraction.
# Restore them from the upstream project before use. ``string.maketrans``
# and the ``print`` statement are Python 2 only. The ``default_ccv``
# parameter is not referenced in the visible code.
def get_environment_id(default_ccv):
    # '-' is mapped to '_' in the content view and organization names.
    translation_table = string.maketrans('-','_')
    CONVERT_CCV = DEFAULT_CONTENT_VIEW.translate(translation_table)
    CONVERT_ORGANIZATION = ORGANIZATION.translate(translation_table)

        cmd_get_environment_id = hammer_cmd + " --csv environment list"
                perform_cmd = subprocess.Popen(cmd_get_environment_id, shell=True, stdout=subprocess.PIPE)
                puppet_env_id =
                # The first comma-separated field of the first matching line is returned.
                for line in  islice(puppet_env_id.strip().split("\n"), 1, None):        # print output without CSV header
                        if PUPPET_ENV in line:
                                return line.split(",")[0]

                print log.ERROR + "ERROR: Puppet environment id not found. Please ensure that the Puppet environment " + PUPPET_ENV + " is configured properly in Satellite." + log.END
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
# NOTE(review): this excerpt is not valid Python as shown. The
# ``raise DistutilsSetupError(`` call is never closed, and the two
# warning-message lines at the end have no enclosing call. Lines appear to
# have been lost during extraction - restore them from the upstream project.
def check_nsp(dist, attr, value):
    """Verify that namespace packages are valid"""
    for nsp in value:
        # Every declared namespace package must have contents in the distribution.
        if not dist.has_contents_for(nsp):
            raise DistutilsSetupError(
                "Distribution contains no modules or packages for " +
                "namespace package %r" % nsp
        # A dotted namespace is expected to have its parent declared as well.
        if '.' in nsp:
            parent = '.'.join(nsp.split('.')[:-1])
            if parent not in value:
                    "WARNING: %r is declared as a package namespace, but %r"
                    " is not: please correct this in", nsp, parent
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def _parse_command_opts(self, parser, args):
        """Parse one command's options after expanding any alias for it.

        Resets ``global_options`` / ``negative_opt`` to the class-level
        values, replaces a leading alias in ``args`` (in place) with its
        shlex-split expansion, then defers to
        ``_Distribution._parse_command_opts``.

        :returns: the base implementation's result, except that an empty
            list is returned when the command class sets
            ``command_consumes_arguments`` and that result is not ``None``
        """
        # Remove --with-X/--without-X options when processing command args
        klass = self.__class__
        self.global_options = klass.global_options
        self.negative_opt = klass.negative_opt

        # First, expand any aliases
        command = args[0]
        alias_table = self.get_option_dict('aliases')
        while command in alias_table:
            _source, expansion = alias_table[command]
            # Dropping the entry ensures each alias can expand only once.
            del alias_table[command]
            import shlex
            args[:1] = shlex.split(expansion, True)
            command = args[0]

        remaining = _Distribution._parse_command_opts(self, parser, args)

        # Handle commands that want to consume all remaining arguments
        command_cls = self.get_command_class(command)
        if not getattr(command_cls, 'command_consumes_arguments', None):
            return remaining
        self.get_option_dict(command)['args'] = ("command line", remaining)
        return remaining if remaining is None else []
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
# NOTE(review): this excerpt is not valid Python as shown. The
# ``raise DistutilsSetupError(`` call is never closed, and the two
# warning-message lines at the end have no enclosing call. Lines appear to
# have been lost during extraction - restore them from the upstream project.
def check_nsp(dist, attr, value):
    """Verify that namespace packages are valid"""
    for nsp in value:
        # Every declared namespace package must have contents in the distribution.
        if not dist.has_contents_for(nsp):
            raise DistutilsSetupError(
                "Distribution contains no modules or packages for " +
                "namespace package %r" % nsp
        # A dotted namespace is expected to have its parent declared as well.
        if '.' in nsp:
            parent = '.'.join(nsp.split('.')[:-1])
            if parent not in value:
                    "WARNING: %r is declared as a package namespace, but %r"
                    " is not: please correct this in", nsp, parent
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def _parse_command_opts(self, parser, args):
        """Parse one command's options after expanding any alias for it.

        Resets ``global_options`` / ``negative_opt`` to the class-level
        values, replaces a leading alias in ``args`` (in place) with its
        shlex-split expansion, then defers to
        ``_Distribution._parse_command_opts``.

        :returns: the base implementation's result, except that an empty
            list is returned when the command class sets
            ``command_consumes_arguments`` and that result is not ``None``
        """
        # Remove --with-X/--without-X options when processing command args
        klass = self.__class__
        self.global_options = klass.global_options
        self.negative_opt = klass.negative_opt

        # First, expand any aliases
        command = args[0]
        alias_table = self.get_option_dict('aliases')
        while command in alias_table:
            _source, expansion = alias_table[command]
            # Dropping the entry ensures each alias can expand only once.
            del alias_table[command]
            import shlex
            args[:1] = shlex.split(expansion, True)
            command = args[0]

        remaining = _Distribution._parse_command_opts(self, parser, args)

        # Handle commands that want to consume all remaining arguments
        command_cls = self.get_command_class(command)
        if not getattr(command_cls, 'command_consumes_arguments', None):
            return remaining
        self.get_option_dict(command)['args'] = ("command line", remaining)
        return remaining if remaining is None else []
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
# NOTE(review): this excerpt is not valid Python as shown. The
# ``raise DistutilsSetupError(`` call is never closed, and the two
# warning-message lines at the end have no enclosing call. Lines appear to
# have been lost during extraction - restore them from the upstream project.
def check_nsp(dist, attr, value):
    """Verify that namespace packages are valid"""
    for nsp in value:
        # Every declared namespace package must have contents in the distribution.
        if not dist.has_contents_for(nsp):
            raise DistutilsSetupError(
                "Distribution contains no modules or packages for " +
                "namespace package %r" % nsp
        # A dotted namespace is expected to have its parent declared as well.
        if '.' in nsp:
            parent = '.'.join(nsp.split('.')[:-1])
            if parent not in value:
                    "WARNING: %r is declared as a package namespace, but %r"
                    " is not: please correct this in", nsp, parent
项目:spoonybard    作者:notnownikki    | 项目源码 | 文件源码
def parse_args(self, arg_string):
        """Split ``arg_string`` shell-style and parse it with ``self.parser``.

        On success the parsed result is stored in ``self.args`` and True is
        returned.  If the parser raises BadArgumentsException, ``self.args``
        is left untouched and False is returned.
        """
        try:
            self.args = self.parser.parse_args(shlex.split(arg_string))
            return True
        except BadArgumentsException:
            return False
项目:Starfish    作者:BillWang139967    | 项目源码 | 文件源码
def loadavg(self):
        """Return the first three fields of /proc/loadavg.

        The result is a dict with the keys '1min', '5min' and '15min'; the
        values are the raw strings read from the file (not converted to
        float).
        """
        with open('/proc/loadavg', 'r') as f:
            load_1min, load_5min, load_15min = f.readline().split()[0:3]
        return {
            '1min': load_1min,
            '5min': load_5min,
            '15min': load_15min,
        }
项目:Starfish    作者:BillWang139967    | 项目源码 | 文件源码
def cpustat(self, fullstat=False):
        """Read CPU time counters and the boot time from /proc/stat.

        fullstat -- when False (default) only the aggregate 'cpu' line is
                    reported, reduced to 'used' and 'idle'.  When True every
                    'cpuN' line is reported as well, with one entry per
                    column named in ``full_fname``.

        Returns a dict with:
          'total' -- counters of the aggregate 'cpu' line
          'cpus'  -- list of per-CPU counters (empty unless ``fullstat``)
          'btime' -- boot time formatted as local time, when a 'btime' line
                     is present
        Every counter dict also carries 'all', the sum of the columns read.
        """
        # REF: proc(5), section /proc/stat
        fname = ('used', 'idle')
        full_fname = ('user', 'nice', 'system', 'idle', 'iowait', 'irq',
                'softirq', 'steal', 'guest', 'guest_nice')
        cpustat = {'cpus': []}
        with open('/proc/stat', 'r') as f:
            for line in f:
                if line.startswith('cpu'):
                    fields = line.split()
                    name = fields[0]
                    if not fullstat and name != 'cpu':
                        continue
                    stat = [int(i) for i in fields[1:]]
                    statall = sum(stat)
                    if fullstat:
                        # Lines with fewer columns than full_fname are
                        # padded with zeros so every key is present.
                        stat.extend([0] * (len(full_fname) - len(stat)))
                        stat = dict(zip(full_fname, stat))
                    else:
                        # Column 3 is 'idle'; everything else counts as used.
                        stat = dict(zip(fname, (statall - stat[3], stat[3])))
                    stat['all'] = statall
                    if name == 'cpu':
                        cpustat['total'] = stat
                    else:
                        cpustat['cpus'].append(stat)
                elif line.startswith('btime'):
                    btime = int(line.split()[1])
                    cpustat['btime'] = time.strftime('%Y-%m-%d %X %Z',
                                                    time.localtime(btime))
        return cpustat
项目:Starfish    作者:BillWang139967    | 项目源码 | 文件源码
def meminfo(self):
        """Summarise memory and swap usage from /proc/meminfo.

        Reads MemTotal, MemFree, Buffers, Cached, SwapTotal and SwapFree,
        multiplies each value by 1024 (assumes the file reports kB -- see
        proc(5)), and derives the used amounts as total minus free.

        Returns a dict whose size entries are passed through ``b2h`` and
        whose '*_rate' entries come from ``div_percent``; both helpers are
        defined elsewhere in the project.
        """
        # OpenVZ may not expose every field, so start all of them at zero.
        stats = dict.fromkeys(('MemTotal', 'MemFree', 'Buffers', 'Cached',
                               'SwapTotal', 'SwapFree'), 0)

        with open('/proc/meminfo', 'r') as f:
            for line in f:
                item, sep, value = line.partition(':')
                # Only the fields of interest are parsed, so unrelated or
                # unusually formatted lines cannot break the conversion.
                if not sep or item not in stats:
                    continue
                stats[item] = int(value.split()[0]) * 1024

        mem_total = stats['MemTotal']
        mem_free = stats['MemFree']
        mem_buffers = stats['Buffers']
        mem_cached = stats['Cached']
        swap_total = stats['SwapTotal']
        swap_free = stats['SwapFree']

        mem_used = mem_total - mem_free
        swap_used = swap_total - swap_free
        return {
            'mem_total': b2h(mem_total),
            'mem_used': b2h(mem_used),
            'mem_free': b2h(mem_free),
            'mem_buffers': b2h(mem_buffers),
            'mem_cached': b2h(mem_cached),
            'swap_total': b2h(swap_total),
            'swap_used': b2h(swap_used),
            'swap_free': b2h(swap_free),
            'mem_used_rate': div_percent(mem_used, mem_total),
            'mem_free_rate': div_percent(mem_free, mem_total),
            'swap_used_rate': div_percent(swap_used, swap_total),
            'swap_free_rate': div_percent(swap_free, swap_total),
        }
项目:Starfish    作者:BillWang139967    | 项目源码 | 文件源码
def uname(self):
        """Return system identification as a dict.

        Combines the fields of ``platform.uname()`` with the output of the
        external command ``uname -i`` (stored under 'platform').

        NOTE: the command output is returned as read from the pipe; under
        Python 3 that is ``bytes``, because the pipe is not opened in text
        mode.
        """
        p = subprocess.Popen(shlex.split('uname -i'), stdout=subprocess.PIPE, close_fds=True)
        # communicate() reads stdout to EOF and waits for the child, so the
        # process is reaped and the pipe is closed.
        hwplatform = p.communicate()[0].strip()

        uname = platform.uname()
        return {
            'kernel_name': uname[0],
            'node': uname[1],
            'kernel_release': uname[2],
            'kernel_version': uname[3],
            'machine': uname[4],
            'processor': uname[5],
            'platform': hwplatform,
        }
项目:Starfish    作者:BillWang139967    | 项目源码 | 文件源码
def cpuinfo(self):
        """Summarise the processors listed in /proc/cpuinfo.

        Returns a dict with:
          'cores'      -- one {'model', 'bits'} dict per 'model name' /
                          'flags' pair, in file order
          'cpu_count'  -- number of distinct 'physical id' values, or 1 when
                          the file lists none
          'core_count' -- len(cores)

        'bits' is 64 when the 'lm' flag is present in the 'flags' line,
        otherwise 32.
        """
        models = []
        bitss = []
        cpuids = []
        with open('/proc/cpuinfo', 'r') as f:
            for line in f:
                if 'model name' in line or 'physical id' in line or 'flags' in line:
                    # partition() splits on the first colon only, so a value
                    # that itself contains ':' is kept intact.
                    item, _, value = line.partition(':')
                    item = item.strip()
                    value = value.strip()
                    if item == 'model name':
                        models.append(re.sub(r'\s+', ' ', value))
                    elif item == 'physical id':
                        cpuids.append(value)
                    elif item == 'flags':
                        # Compare whole flag names so 'lm' is also found at
                        # the start or end of the list.
                        bitss.append(64 if 'lm' in value.split() else 32)
        cores = [{'model': x, 'bits': y} for x, y in zip(models, bitss)]
        cpu_count = len(set(cpuids))
        if cpu_count == 0:
            cpu_count = 1
        return {
            'cores': cores,
            'cpu_count': cpu_count,
            'core_count': len(cores),
        }
项目:Starfish    作者:BillWang139967    | 项目源码 | 文件源码
def partinfo(self, uuid=None, devname=None):
        """Read partition info including uuid and filesystem.

        You can specify uuid or devname to get the identified partition info.
        If no argument is provided, all partitions are returned as a dict
        keyed by device name (without the '/dev/' prefix).

        The info is read from the blkid cache file /etc/blkid/blkid.tab
        instead of parsing the output of the blkid command.

        Returns None when the cache file does not exist, or when uuid/devname
        was given but no entry matches.
        """
        blks = {}
        # NOTE(review): blkid is presumably run so that it refreshes its
        # cache file; its output is discarded.  communicate() drains the pipe
        # and reaps the child.
        p = subprocess.Popen(shlex.split('/sbin/blkid'), stdout=subprocess.PIPE, close_fds=True)
        p.communicate()

        # NOTE(review): the file name was missing from the path in the
        # source ('/etc/blkid/' is a directory and cannot be opened);
        # 'blkid.tab' is the legacy blkid cache name -- confirm.
        tab_path = '/etc/blkid/blkid.tab'

        # OpenVZ may not have this file
        if not os.path.exists(tab_path):
            return None

        with open(tab_path) as f:
            for line in f:
                # Each line is parsed as a stand-alone XML element.
                dom = parseString(line).documentElement
                _fstype = dom.getAttribute('TYPE')
                _uuid = dom.getAttribute('UUID')
                _devname = dom.firstChild.nodeValue.replace('/dev/', '')
                info = {
                    'name': _devname,
                    'fstype': _fstype,
                    'uuid': _uuid,
                }
                if uuid and uuid == _uuid:
                    return info
                elif devname and devname == _devname:
                    return info
                else:
                    blks[_devname] = info
        if uuid or devname:
            # A specific partition was requested but not found.
            return None
        return blks
项目:Starfish    作者:BillWang139967    | 项目源码 | 文件源码
def autostart_list(self):
        """Return a list of the autostart service name.

        The default runlevel is taken from the 'id:' line of /etc/inittab;
        the services are the 'S*' entries of /etc/rc.d/rc<level>.d/.
        Returns an empty list when /etc/inittab has no 'id:' line.
        """
        startlevel = None
        with open('/etc/inittab') as f:
            for line in f:
                if line.startswith('id:'):
                    startlevel = line.split(':')[1]
        if startlevel is None:
            # No default runlevel declared, so no rc directory to inspect.
            return []
        rcpath = '/etc/rc.d/rc%s.d/' % startlevel
        # NOTE(review): the element expression was missing in the source.
        # Start links are presumably symlinks to the init scripts, so the
        # service name is taken as the basename of the resolved target --
        # confirm.
        services = [
            os.path.basename(os.path.realpath(filepath))
            for filepath in glob.glob('%s/S*' % rcpath)]
        return services
项目:Aigis    作者:Joaquin-V    | 项目源码 | 文件源码
def set_params(self, message):
        """Store ``message`` and its single-space-separated words on the instance.

        ``text_cont`` receives the message unchanged; ``text_words`` receives
        the result of splitting it on a single space character, so
        consecutive spaces produce empty strings in the list.
        """
        self.text_cont = message
        words = message.split(' ')
        self.text_words = words

# Define useful functions outside class.