Python shlex 模块,quote() 实例源码

我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用shlex.quote()

项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def xcheck_envar(conf, name, wafname=None, cross=False):
    """Copy environment variable ``name`` into ``conf.env`` and report it.

    Nothing happens when the variable is not set. Otherwise its value is
    stored under ``wafname`` (``name`` when no ``wafname`` is given) as a
    list: ``Utils.to_list`` of the raw string, or a single empty entry when
    the variable is set but empty. The stored words are then reported through
    ``conf.msg``, shell-quoted, with a "cross-compilation" label when
    ``cross`` is true.
    """
    target = wafname or name
    raw = os.environ.get(name)
    if raw is None:
        return

    if raw == '':
        # An explicitly empty variable is kept as one empty word.
        words = [raw]
    else:
        words = Utils.to_list(raw)
    conf.env[target] = words

    label = ('cross-compilation %s' % target) if cross else target
    conf.msg('Will use %s' % label, " ".join(map(quote, words)))
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def xcheck_envar(conf, name, wafname=None, cross=False):
    """Import the environment variable ``name`` into the configuration.

    Returns immediately if ``name`` is absent from ``os.environ``. When it
    is present, the value (split with ``Utils.to_list``, or ``['']`` for an
    empty string) is assigned to ``conf.env[wafname or name]`` and announced
    via ``conf.msg`` with every word shell-quoted.
    """
    key = wafname or name
    env_value = os.environ.get(name)
    if env_value is None:
        return

    # Preserve "set but empty" as a one-element list holding ''.
    items = [env_value] if env_value == '' else Utils.to_list(env_value)
    conf.env[key] = items

    if cross:
        description = 'cross-compilation %s' % key
    else:
        description = key
    quoted_items = [quote(item) for item in items]
    conf.msg('Will use %s' % description, " ".join(quoted_items))
项目:cheribuild    作者:CTSRD-CHERI    | 项目源码 | 文件源码
def printCommand(arg1: "typing.Union[str, typing.Sequence[typing.Any]]", *remainingArgs, outputFile=None,
                 colour=AnsiColour.yellow, cwd=None, env=None, sep=" ", printVerboseOnly=False, **kwargs):
    """Print a shell-quoted rendering of a command line.

    The command is given either as separate positional arguments or as a
    single sequence in ``arg1`` (in which case ``remainingArgs`` is replaced
    by the tail of that sequence). Nothing is printed when there is no
    configuration, when the configuration is quiet, or when
    ``printVerboseOnly`` is set and the configuration is not verbose.

    :param outputFile: if set, ``> <outputFile>`` is appended to the output
    :param colour: colour passed to ``coloured``
    :param cwd: if set, the output is prefixed with ``cd <cwd> &&``
    :param env: if set, the entries kept by ``__filterEnv`` are shown as
        ``env KEY=VALUE ...``
    :param sep: separator passed to ``coloured``
    :param kwargs: forwarded to ``print``
    """
    if not _cheriConfig or (_cheriConfig.quiet or (printVerboseOnly and not _cheriConfig.verbose)):
        return
    # also allow passing a single string
    if not isinstance(arg1, str):
        allArgs = arg1
        arg1 = allArgs[0]
        remainingArgs = allArgs[1:]
    newArgs = ("cd", shlex.quote(str(cwd)), "&&") if cwd else tuple()
    if env:
        # only print the changed environment entries
        filteredEnv = __filterEnv(env)
        if filteredEnv:
            newArgs += ("env",) + tuple(map(shlex.quote, (k + "=" + str(v) for k, v in filteredEnv.items())))
    # comma in tuple is required otherwise it creates a tuple of string chars
    newArgs += (shlex.quote(str(arg1)),) + tuple(map(shlex.quote, map(str, remainingArgs)))
    if outputFile:
        # Quote the redirect target like every other word, so a path with
        # spaces or shell metacharacters is still shown as a single word.
        newArgs += (">", shlex.quote(str(outputFile)))
    print(coloured(colour, newArgs, sep=sep), flush=True, **kwargs)
项目:bpy_lambda    作者:bcongdon    | 项目源码 | 文件源码
def process_pre(self):
        """Prepare the ``bam.pack`` command line and its working state.

        Creates a temporary directory, records the environment and the
        ``.zip`` output file name, stores the full command tuple on
        ``self.command`` and, when INFO logging is enabled, logs a
        shell-quoted rendering of that command.
        """
        import tempfile

        self.temp_dir = tempfile.TemporaryDirectory()

        self.environ = {'PYTHONPATH': pythonpath()}
        self.outfname = bpy.path.ensure_ext(self.filepath, ".zip")
        self.command = tuple([
            bpy.app.binary_path_python,
            '-m', 'bam.pack',
            "--input", bpy.data.filepath,    # file to pack
            "--output", self.outfname,       # file to write
            "--temp", self.temp_dir.name,
        ])

        if not self.log.isEnabledFor(logging.INFO):
            return

        import shlex
        self.log.info('Executing %s', ' '.join(map(shlex.quote, self.command)))
项目:badwolf    作者:bosondata    | 项目源码 | 文件源码
def shell_script(self):
        """Render the ``script.sh`` template for this build and return it.

        Every command is expanded into an ``echo + <quoted command>`` line
        followed by the command itself. Service start-up commands and the
        build scripts are joined, base64-encoded and shell-quoted into the
        ``command`` template variable; the ``after_success`` and
        ``after_failure`` hooks are passed as joined text.
        """
        def _echo_then_run(command):
            # First line echoes the (quoted) command, second line runs it.
            return 'echo + {}\n{} '.format(shlex.quote(command), command)

        success_hooks = [_echo_then_run(hook) for hook in self.after_success]
        failure_hooks = [_echo_then_run(hook) for hook in self.after_failure]

        steps = ['service {} start'.format(service) for service in self.services]
        steps.extend(self.scripts)
        traced_steps = '\n'.join(_echo_then_run(step) for step in steps)

        encoded_steps = to_text(base64.b64encode(to_binary(traced_steps)))
        script = render_template(
            'script.sh',
            command=shlex.quote(encoded_steps),
            after_success='    \n'.join(success_hooks),
            after_failure='    \n'.join(failure_hooks),
        )
        logger.debug('Build script: \n%s', script)
        return script
项目:DevOps    作者:YoLoveLife    | 项目源码 | 文件源码
def files_in_archive(self, force_refresh=False):
        """Return the list of file names contained in the archive.

        The listing is cached on ``self._files_in_archive``; a non-empty
        cache is returned as-is unless ``force_refresh`` is true. Otherwise
        the archive tool is run with ``--list`` and every output line that is
        non-empty and not in ``self.excludes`` is recorded.

        Raises ``UnarchiveError`` when the listing command exits non-zero.
        """
        if self._files_in_archive and not force_refresh:
            return self._files_in_archive

        cmd = [ self.cmd_path, '--list', '-C', self.dest ]
        if self.zipflag:
            cmd.append(self.zipflag)
        if self.opts:
            cmd.extend([ '--show-transformed-names' ] + self.opts)
        if self.excludes:
            cmd.extend([ '--exclude=' + quote(f) for f in self.excludes ])
        cmd.extend([ '-f', self.src ])
        rc, out, err = self.module.run_command(cmd, cwd=self.dest, environ_update=dict(LANG='C', LC_ALL='C', LC_MESSAGES='C'))
        if rc != 0:
            raise UnarchiveError('Unable to list files in the archive')

        listing = []
        for filename in out.splitlines():
            # Compensate for locale-related problems in gtar output (octal unicode representation) #11348
            filename = codecs.escape_decode(filename)[0]
            if filename and filename not in self.excludes:
                listing.append(to_native(filename))

        # Replace the cached contents in place: a forced refresh must not
        # append a second copy of every name to the previous listing.
        self._files_in_archive[:] = listing
        return self._files_in_archive
项目:DevOps    作者:YoLoveLife    | 项目源码 | 文件源码
def unarchive(self):
        """Run the archive tool in ``--extract`` mode and report the result.

        The command line is assembled from the instance's settings (zip
        flag, extra options, owner/group, keep-newer, excludes, source) and
        executed through ``self.module.run_command`` in ``self.dest`` with a
        ``C`` locale. Returns a dict with the keys ``cmd``, ``rc``, ``out``
        and ``err``.
        """
        cmd = [ self.cmd_path, '--extract', '-C', self.dest ]
        if self.zipflag:
            cmd += [ self.zipflag ]
        if self.opts:
            cmd += [ '--show-transformed-names' ] + self.opts
        for key in ('owner', 'group'):
            value = self.file_args[key]
            if value:
                cmd.append('--%s=%s' % (key, quote(value)))
        if self.module.params['keep_newer']:
            cmd.append('--keep-newer-files')
        if self.excludes:
            for excluded in self.excludes:
                cmd.append('--exclude=' + quote(excluded))
        cmd += [ '-f', self.src ]

        c_locale = {'LANG': 'C', 'LC_ALL': 'C', 'LC_MESSAGES': 'C'}
        rc, out, err = self.module.run_command(cmd, cwd=self.dest, environ_update=c_locale)
        return {'cmd': cmd, 'rc': rc, 'out': out, 'err': err}
项目:leetcode    作者:thomasyimgit    | 项目源码 | 文件源码
def test_alias_magic():
    """Exercise ``%alias_magic`` for line, cell and parameterised aliases."""
    shell = get_ipython()
    manager = shell.magics_manager

    # Without flags the alias is registered as both a line and a cell magic.
    shell.run_line_magic('alias_magic', 'timeit_alias timeit')
    nt.assert_in('timeit_alias', manager.magics['line'])
    nt.assert_in('timeit_alias', manager.magics['cell'])

    # With --cell only the cell magic is registered.
    shell.run_line_magic('alias_magic', '--cell timeit_cell_alias timeit')
    nt.assert_not_in('timeit_cell_alias', manager.magics['line'])
    nt.assert_in('timeit_cell_alias', manager.magics['cell'])

    # A --line alias must produce the same result as the magic it aliases.
    shell.run_line_magic('alias_magic', '--line env_alias env')
    from_original = shell.run_line_magic('env', '')
    from_alias = shell.run_line_magic('env_alias', '')
    nt.assert_equal(from_original, from_alias)

    # A --line alias can carry pre-set parameters.
    params_line = '--line history_alias history --params ' + shlex.quote('3')
    shell.run_line_magic('alias_magic', params_line)
    nt.assert_in('history_alias', manager.magics['line'])
项目:unionfs_cleaner    作者:l3uddz    | 项目源码 | 文件源码
def opened_files(path, excludes):
    """Return the files currently open below ``path`` according to ``lsof``.

    Output lines that are empty, at most two characters long, purely
    numeric, directories, or rejected by ``file_excluded(item, excludes)``
    are dropped.

    Returns a list of paths, or ``None`` if anything goes wrong (the
    exception is logged, not raised).
    """
    try:
        # The context manager closes the pipe even when reading it fails.
        with os.popen('lsof -wFn +D %s | tail -n +2 | cut -c2-' % cmd_quote(path)) as process:
            data = process.read()

        return [
            item for item in data.split('\n')
            if item
            and len(item) > 2
            and not os.path.isdir(item)
            and not item.isdigit()
            and not file_excluded(item, excludes)
        ]

    except Exception:
        logger.exception("Exception checking %r: ", path)
        return None
项目:unionfs_cleaner    作者:l3uddz    | 项目源码 | 文件源码
def rclone_move_command(local, remote, transfers, checkers, bwlimit, excludes, chunk_size, dry_run):
    """Build the ``rclone move`` shell command line as a single string.

    Every value taken from the arguments is shell-quoted, so paths,
    bandwidth limits and exclude patterns containing spaces, quotes, ``$``
    or glob characters reach rclone unchanged.

    :param local: local source path
    :param remote: rclone destination
    :param transfers: integer for ``--transfers``
    :param checkers: integer for ``--checkers``
    :param bwlimit: value for ``--bwlimit``; omitted when falsy
    :param excludes: iterable of patterns, one ``--exclude`` each
    :param chunk_size: value for ``--drive-chunk-size``
    :param dry_run: append ``--dry-run`` when true
    :return: the command line string
    """
    parts = [
        'rclone move %s %s' % (cmd_quote(local), cmd_quote(remote)),
        '--delete-after',
        '--no-traverse',
        '--stats=60s',
        '-v',
        '--transfers=%d' % transfers,
        '--checkers=%d' % checkers,
        cmd_quote('--drive-chunk-size=%s' % chunk_size),
    ]
    if bwlimit:
        parts.append(cmd_quote('--bwlimit=%s' % bwlimit))
    # Quote the whole argument: hand-written double quotes would still let
    # the shell expand $ and backticks, and would break on an embedded ".
    parts.extend(cmd_quote('--exclude=%s' % item) for item in excludes)
    if dry_run:
        parts.append('--dry-run')
    return ' '.join(parts)
项目:unionfs_cleaner    作者:l3uddz    | 项目源码 | 文件源码
def remove_empty_directories(config, force_dry_run=False):
    """Remove empty directories listed in ``config['rclone_remove_empty_on_upload']``.

    Nothing is removed while files are open below ``config['local_folder']``,
    or when the open-file check itself failed. For every configured
    directory that exists, a ``find ... -type d -empty`` command is run with
    the configured ``-mindepth``; ``-delete`` is only added when neither
    ``config['dry_run']`` nor ``force_dry_run`` is set.
    """
    open_files = opened_files(config['local_folder'], config['lsof_excludes'])
    if open_files is None:
        # opened_files() returns None when the lsof check failed; without a
        # reliable answer it is not safe to delete anything.
        logger.debug("Skipped removing empty directories because open files could not be determined")
        return
    if open_files:
        logger.debug("Skipped removing empty directories because %d files are currently open: %r", len(open_files),
                     open_files)
        return

    clearing = False
    for directory, depth in config['rclone_remove_empty_on_upload'].items():
        if not os.path.exists(directory):
            continue
        clearing = True
        logger.debug("Removing empty directories from %r with mindepth %r", directory, depth)
        cmd = 'find %s -mindepth %d -type d -empty' % (cmd_quote(directory), depth)
        if not config['dry_run'] and not force_dry_run:
            cmd += ' -delete'
        run_command(cmd)
    if clearing:
        logger.debug("Finished clearing empty directories")


############################################################
# CONFIG STUFF
############################################################
项目:kapsel    作者:conda    | 项目源码 | 文件源码
def test_activate(monkeypatch):
    """``activate`` exports PROJECT_DIR and REDIS_URL for a redis service."""
    can_connect_args = _monkeypatch_can_connect_to_socket_to_succeed(monkeypatch)

    def activate_redis_url(dirname):
        project_dir_disable_dedicated_env(dirname)
        exports = activate(dirname, UI_MODE_TEXT_ASSUME_YES_DEVELOPMENT, conda_environment=None, command_name=None)
        assert can_connect_args['port'] == 6379
        assert exports is not None
        if platform.system() == 'Windows':
            # PATH changes on Windows are tolerated, so drop that line.
            exports = [line for line in exports if not line.startswith("export PATH")]
            print("activate changed PATH on Windows and ideally it would not.")
        if len(exports) > 2:
            # Dump diagnostics before the comparison below fails.
            import os
            print("os.environ=" + repr(os.environ))
            print("result=" + repr(exports))
        expected = ['export PROJECT_DIR=' + quote(dirname), 'export REDIS_URL=redis://localhost:6379']
        assert expected == exports

    project_files = {DEFAULT_PROJECT_FILENAME: """
services:
  REDIS_URL: redis
    """}
    with_directory_contents_completing_project_file(project_files, activate_redis_url)
项目:kapsel    作者:conda    | 项目源码 | 文件源码
def test_activate_quoting(monkeypatch):
    """``activate`` shell-quotes variable values containing special characters."""
    def activate_foo(dirname):
        project_dir_disable_dedicated_env(dirname)
        exports = activate(dirname, UI_MODE_TEXT_ASSUME_YES_DEVELOPMENT, conda_environment=None, command_name=None)
        assert exports is not None
        if platform.system() == 'Windows':
            # PATH changes on Windows are tolerated, so drop that line.
            exports = [line for line in exports if not line.startswith("export PATH")]
            print("activate changed PATH on Windows and ideally it would not.")
        expected = ["export FOO='$! boo'", 'export PROJECT_DIR=' + quote(dirname)]
        assert expected == exports

    project_file = """
variables:
  FOO: {}
    """
    local_state_file = """
variables:
  FOO: $! boo
"""
    with_directory_contents_completing_project_file(
        {
            DEFAULT_PROJECT_FILENAME: project_file,
            DEFAULT_LOCAL_STATE_FILENAME: local_state_file,
        }, activate_foo)
项目:ml-utils    作者:LinxiFan    | 项目源码 | 文件源码
def _shell_replace_vars(cmd, local_vars):
    """Substitute the ``$``-variables found in ``cmd`` and return the result.

    Each match of ``_dollar_var_re`` is handled as follows:

    * ``$<digits>`` -- the corresponding ``sys.argv`` entry, shell-quoted
    * ``$#`` -- the number of command line arguments (``len(sys.argv) - 1``)
    * ``$@`` -- all command line arguments, each shell-quoted separately
    * any other name -- looked up in ``local_vars``; left untouched when the
      name is not present. String values are shell-quoted, other values are
      converted with ``str``.

    The collected spans and replacement strings are handed to ``_replace_n``.
    """
    spans = []
    replacements = []
    for match in _dollar_var_re.finditer(cmd):
        varname = match.group(1)
        if varname == '@':
            # Every argument is quoted on its own. Quoting the joined string
            # a second time would merge all arguments into one shell word.
            value = ' '.join(map(shlex.quote, sys.argv[1:]))
        else:
            if varname.isdigit():
                # $1, $2 for sys.argv, just like bash
                value = sys.argv[int(varname)]
            elif varname == '#':
                value = len(sys.argv) - 1
            else:
                assert is_variable_name(varname), 'not a valid var name: ' + varname
                if varname not in local_vars:
                    continue
                value = local_vars[varname]

            if isinstance(value, str):
                value = shlex.quote(value)
            else:
                value = str(value)
        spans.append(match.span())
        replacements.append(value)
    return _replace_n(cmd, spans, replacements)
项目:stig    作者:rndusr    | 项目源码 | 文件源码
def run(self, context, KEY, ACTION):
        """Bind ``KEY`` to ``ACTION`` in the given keymap context.

        ``ACTION`` is either a single ``<...>`` token, which is turned into
        a key with ``keymap.mkkey``, or a list of words that is joined into
        one shell-quoted command string. A ``context`` of ``None`` selects
        ``DEFAULT_CONTEXT``.

        Returns ``True`` on success, ``False`` (after logging an error) for
        an invalid context or when ``keymap.bind`` raises ``ValueError``.
        """
        keymap = self.tui.keymap

        is_key_alias = (len(ACTION) == 1
                        and ACTION[0][0] == '<'
                        and ACTION[0][-1] == '>')
        if is_key_alias:
            # ACTION is another key (e.g. 'j' -> 'down')
            action = keymap.mkkey(ACTION[0])
        else:
            action = ' '.join(map(shlex.quote, ACTION))

        if context is None:
            from ...tui.keymap import DEFAULT_CONTEXT
            context = DEFAULT_CONTEXT
        elif context not in _get_KEYMAP_CONTEXTS():
            log.error('Invalid context: {!r}'.format(context))
            return False

        try:
            keymap.bind(KEY, action, context=context)
        except ValueError as e:
            log.error(e)
            return False
        return True
项目:performance    作者:python    | 项目源码 | 文件源码
def get_output_nocheck(self, *cmd, **kwargs):
        """Run ``cmd`` and return ``(exitcode, stdout)`` without raising.

        The process is created through ``self.create_subprocess`` with its
        stdout captured as text; trailing whitespace is stripped from the
        captured output. A non-zero exit code is reported through
        ``self.logger.error`` together with the shell-quoted command.
        """
        proc = self.create_subprocess(
            cmd, stdout=subprocess.PIPE, universal_newlines=True, **kwargs)
        # FIXME: support Python 2?
        with proc:
            captured = proc.communicate()[0]

        output = captured.rstrip()
        exitcode = proc.wait()

        if exitcode:
            quoted_cmd = ' '.join(shlex.quote(word) for word in cmd)
            self.logger.error("Command %s failed with exit code %s"
                              % (quoted_cmd, exitcode))

        return (exitcode, output)
项目:ddmbot    作者:Budovi    | 项目源码 | 文件源码
def _spawn_ffmpeg(self):
        if self.streaming:
            url = self._stream_url
        elif self.playing:
            url = self._song_context.song_url
        else:
            raise RuntimeError('Player is in an invalid state')

        args = shlex.split(self._ffmpeg_command.format(shlex.quote(url)))
        try:
            self._ffmpeg = subprocess.Popen(args)
        except FileNotFoundError as e:
            raise RuntimeError('ffmpeg executable was not found') from e
        except subprocess.SubprocessError as e:
            raise RuntimeError('Popen failed: {0.__name__} {1}'.format(type(e), str(e))) from e

    #
    # Player FSM
    #
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def testQuote(self):
        safeunquoted = string.ascii_letters + string.digits + '@%_-+=:,./'
        unicode_sample = '\xe9\xe0\xdf'  # e + acute accent, a + grave, sharp s
        unsafe = '"`$\\!' + unicode_sample

        self.assertEqual(shlex.quote(''), "''")
        self.assertEqual(shlex.quote(safeunquoted), safeunquoted)
        self.assertEqual(shlex.quote('test file name'), "'test file name'")
        for u in unsafe:
            self.assertEqual(shlex.quote('test%sname' % u),
                             "'test%sname'" % u)
        for u in unsafe:
            self.assertEqual(shlex.quote("test%s'name'" % u),
                             "'test%s'\"'\"'name'\"'\"''" % u)

# Allow this test to be used with old shlex.py
项目:qubes-incremental-backup-poc    作者:v6ak    | 项目源码 | 文件源码
def restore_vm(self, new_vm, new_name, size, qvm_create_args, vm_keys, backup_storage_vm):
        """Create a VM named ``new_name`` and restore its private volume.

        The VM is created with ``qvm-create`` (``qvm_create_args`` is appended
        to the shell command verbatim, unquoted), its netvm is set to
        ``none`` and, when ``size`` is given, its private volume is grown.
        The private volume of ``new_vm`` is then attached to a ``Dvm`` as
        ``xvdz``, mounted on ``/mnt/clone`` and filled by running
        ``/tmp/restore-agent`` with ``vm_keys.key`` as input.

        The volume is unmounted and all devices are detached from the
        ``Dvm`` even when a step fails.
        """
        # new_name is shell-quoted; qvm_create_args is inserted as given.
        subprocess.check_call("qvm-create "+shlex.quote(new_name)+" "+qvm_create_args, shell=True)
        subprocess.check_call(["qvm-prefs", "-s", new_name, "netvm", "none"]) # Safe approach: no network access
        if size is not None:
            subprocess.check_call(["qvm-grow-private", new_name, size])
        with Dvm() as dvm:
            dvm.attach("xvdz", new_vm.private_volume())
            try:
                if size is not None:
                    # Check the filesystem, then resize it (only done when a size was given).
                    dvm.check_call("sudo e2fsck -f -p /dev/xvdz")
                    dvm.check_call("sudo resize2fs /dev/xvdz")
                dvm.check_call("sudo mkdir /mnt/clone")
                dvm.check_call("sudo mount /dev/xvdz /mnt/clone")
                try:
                    self.upload_agent(dvm)
                    # add_permissions is used as a context manager around the agent run only.
                    with self.add_permissions(backup_storage_vm, dvm, vm_keys.encrypted_name):
                        dvm.check_call("/tmp/restore-agent "+shlex.quote(backup_storage_vm.get_name())+" "+shlex.quote(vm_keys.encrypted_name), input = vm_keys.key, stdout = None, stderr = None)
                # Always unmount, even when the upload or the agent run failed.
                finally: dvm.check_call("sudo umount /mnt/clone")
            # Always detach, even when preparing or mounting the volume failed.
            finally: dvm.detach_all()

    # abstract def upload_agent(self, dvm)
项目:borg-import    作者:borgbackup    | 项目源码 | 文件源码
def main():
    """Command line entry point; returns the value to use as exit status.

    Returns 1 when the ``borg`` executable is not on the PATH. Otherwise the
    arguments are parsed and the selected ``function`` is called with them;
    without a selected function the help text is printed. A failing
    subprocess is reported with its quoted command line and its return code
    is returned.
    """
    borg_path = shutil.which('borg')
    if not borg_path:
        print("The 'borg' command can't be found in the PATH. Please correctly install borgbackup first.")
        print('See instructions at https://borgbackup.readthedocs.io/en/stable/installation.html')
        return 1

    parser = build_parser()
    args = parser.parse_args()
    logging.basicConfig(level=args.log_level, format='%(message)s')

    if 'function' not in args:
        return parser.print_help()

    try:
        return args.function(args)
    except subprocess.CalledProcessError as error:
        print('{} invocation failed with status {}'.format(error.cmd[0], error.returncode))
        quoted_words = [shlex.quote(word) for word in error.cmd]
        print('Command line was:', *quoted_words)
        return error.returncode
项目:ceph-lcm    作者:Mirantis    | 项目源码 | 文件源码
async def get_package_version(prefix, connection, package_name):
    """Query the installed version of a Debian package and echo the result.

    Declared ``async`` because the body awaits ``connection.run``; ``await``
    inside a plain ``def`` does not compile.

    :param prefix: text printed in front of every output line
    :param connection: object whose awaitable ``run(command)`` returns a
        result with ``exit_status``, ``stdout`` and ``stderr``
    :param package_name: package to query; shell-quoted before use
    """
    command = "dpkg-query --showformat='${Version}' --show %s" % shlex.quote(
        package_name)
    result = await connection.run(command)

    if result.exit_status != os.EX_OK:
        click.echo(
            "{0}package (failed {1}): {2} - {3}".format(
                prefix, result.exit_status, package_name, result.stderr.strip()
            )
        )
    else:
        click.echo(
            "{0}package (ok): {1}=={2}".format(
                prefix, package_name, result.stdout.strip()
            )
        )
项目:ceph-lcm    作者:Mirantis    | 项目源码 | 文件源码
async def run(self):
        """Run ``ceph health`` on one randomly chosen monitor and process it.

        Declared ``async`` because the body awaits ``self.execute_cmd``;
        ``await`` inside a plain ``def`` does not compile.

        The cluster name comes from the ``cluster`` global variable of the
        stored cluster data, falling back to ``self.cluster.name``, and is
        shell-quoted. Returns without doing anything when the configuration
        contains no server with the ``mons`` role.
        """
        data = cluster_data.ClusterData.find_one(self.cluster.model_id)
        cluster_name = data.global_vars.get("cluster", self.cluster.name)
        cluster_name = shlex.quote(cluster_name)

        cluster_servers = {item._id: item for item in self.cluster.server_list}
        mons = [
            cluster_servers[item["server_id"]]
            for item in self.cluster.configuration.state
            if item["role"] == "mons"]
        if not mons:
            return

        version_result = await self.execute_cmd(
            "ceph --cluster {0} health --format json".format(cluster_name),
            random.choice(mons))

        self.manage_errors(
            "Cannot execute ceph health command on %s (%s): %s",
            "Not all hosts have working ceph command",
            version_result.errors
        )
        self.manage_health(version_result)
项目:ceph-lcm    作者:Mirantis    | 项目源码 | 文件源码
async def run(self):
        """Run ``ceph version`` on every server of the cluster and process it.

        Declared ``async`` because the body awaits ``self.execute_cmd``;
        ``await`` inside a plain ``def`` does not compile.

        The cluster name comes from the ``cluster`` global variable of the
        stored cluster data, falling back to ``self.cluster.name``, and is
        shell-quoted. Errors are passed to ``manage_errors``; the parsed
        successful results go to ``manage_versions`` and ``manage_commits``.
        """
        data = cluster_data.ClusterData.find_one(self.cluster.model_id)
        cluster_name = data.global_vars.get("cluster", self.cluster.name)
        cluster_name = shlex.quote(cluster_name)

        version_result = await self.execute_cmd(
            "ceph --cluster {0} version".format(cluster_name),
            *self.cluster.server_list)

        self.manage_errors(
            "Cannot execute ceph version command on %s (%s): %s",
            "Not all hosts have working ceph command",
            version_result.errors
        )

        results = list(parse_results(version_result.ok))
        self.manage_versions(results)
        self.manage_commits(results)
项目:SLAC    作者:bencbartlett    | 项目源码 | 文件源码
def _write_command_to_file(self, env, arglist):
        envvar_settings_list = []

        if "DIALOGRC" in env:
            envvar_settings_list.append(
                "DIALOGRC={0}".format(_shell_quote(env["DIALOGRC"])))

        for var in self._lowlevel_exit_code_varnames:
            varname = "DIALOG_" + var
            envvar_settings_list.append(
                "{0}={1}".format(varname, _shell_quote(env[varname])))

        command_str = ' '.join(envvar_settings_list +
                               list(imap(_shell_quote, arglist)))
        s = "{separator}{cmd}\n\nArgs: {args!r}\n".format(
            separator="" if self._debug_first_output else ("-" * 79) + "\n",
            cmd=command_str, args=arglist)

        self._debug_logfile.write(s)
        if self._debug_always_flush:
            self._debug_logfile.flush()

        self._debug_first_output = False
项目:plash    作者:ihucos    | 项目源码 | 文件源码
def action(action_name=None, keep_comments=False, escape=True):
    """Return a decorator that registers a function in ``state['actions']``.

    :param action_name: name to register under; defaults to the function's
        name with underscores replaced by dashes
    :param keep_comments: when false, positional arguments starting with
        ``#`` are dropped before the function is called
    :param escape: when true, every positional argument is shell-quoted
        before the function is called

    The registered wrapper returns the function's result; a result that is
    neither a string nor ``None`` is joined with newlines, so actions may
    yield their output line by line.
    """
    import functools

    def decorator(func):

        registered_name = action_name or func.__name__.replace('_', '-')

        # wraps() keeps the original name and docstring on the wrapper.
        @functools.wraps(func)
        def function_wrapper(*args, **kw):

            if not keep_comments:
                args = [i for i in args if not i.startswith('#')]

            if escape:
                args = [shlex.quote(i) for i in args]

            res = func(*args, **kw)

            # allow actions to yield each line
            if not isinstance(res, str) and res is not None:
                res = '\n'.join(res)

            return res

        state['actions'][registered_name] = function_wrapper
        return function_wrapper

    return decorator
项目:ctf-write-ups    作者:mvisat    | 项目源码 | 文件源码
def brute(i):
  """Try every character of ``charset`` at position ``i`` of ``flag``.

  For each candidate the program is re-run under gdb with the current flag
  on stdin, continuing past any SIGFPE stop. The loop ends at the first
  candidate whose final program counter lies beyond ``last_breakpoint``,
  which is then updated. The current flag is printed afterwards.
  """
  global flag
  global last_breakpoint
  for candidate in charset:
    flag[i] = candidate
    attempt = shlex.quote(''.join(flag))
    output = gdb.execute('r < <(echo {})'.format(attempt), True, True)
    # skip floating point exception
    while "SIGFPE" in output:
      output = gdb.execute('c', True, True)

    pc_dump = gdb.execute('x $pc', True, True)
    address = pc_dump.split(":")[0]
    pc = int(address, 16)
    if pc > last_breakpoint:
      last_breakpoint = pc
      break
  print(''.join(flag))
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def testQuote(self):
        safeunquoted = string.ascii_letters + string.digits + '@%_-+=:,./'
        unicode_sample = '\xe9\xe0\xdf'  # e + acute accent, a + grave, sharp s
        unsafe = '"`$\\!' + unicode_sample

        self.assertEqual(shlex.quote(''), "''")
        self.assertEqual(shlex.quote(safeunquoted), safeunquoted)
        self.assertEqual(shlex.quote('test file name'), "'test file name'")
        for u in unsafe:
            self.assertEqual(shlex.quote('test%sname' % u),
                             "'test%sname'" % u)
        for u in unsafe:
            self.assertEqual(shlex.quote("test%s'name'" % u),
                             "'test%s'\"'\"'name'\"'\"''" % u)

# Allow this test to be used with old shlex.py
项目:Slivka    作者:warownia1    | 项目源码 | 文件源码
def submit(self, values, cwd):
        """Submit the command to the queue with ``qsub`` and return the job id.

        The job script (a ``started`` marker, the shell-quoted command built
        from ``self.bin`` and ``self.get_options(values)``, and a
        ``finished`` marker) is fed to ``qsub`` on stdin. The job id is the
        first group of ``self.job_submission_regex`` matched against qsub's
        stdout.

        :param values: passed to ``self.get_options``
        :param cwd: working directory for the ``qsub`` process
        :raises AttributeError: when qsub's stdout does not match the regex
            (presumably a compiled pattern whose ``match`` returns ``None``)
        """
        # Build the job script before spawning qsub, so that a failure while
        # assembling the options cannot leave a qsub process waiting on stdin.
        command_chunks = self.bin + self.get_options(values)
        command = ' '.join(shlex.quote(s) for s in command_chunks)
        script = ("echo > started;\n"
                  "%s;\n"
                  "echo > finished;") % command

        queue_command = [
            'qsub', '-cwd', '-e', 'stderr.txt', '-o', 'stdout.txt'
        ] + self.qargs
        process = subprocess.Popen(
            queue_command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            stdin=subprocess.PIPE,
            cwd=cwd,
            env=self.env,
            universal_newlines=True
        )
        stdout, stderr = process.communicate(script)
        match = self.job_submission_regex.match(stdout)
        return match.group(1)
项目:alicloud-duplicity    作者:aliyun    | 项目源码 | 文件源码
def _list(self):
        """List the remote directory with ``lftp`` and return the file names.

        The lftp script first tries ``cd <remote_path> && ls``; if that
        fails (the folder may not exist yet) it creates the folder and lists
        it. The name of each entry is taken as the last whitespace-separated
        field of its output line.
        """
        # Do a long listing to avoid connection reset
        quoted_path = cmd_quote(self.remote_path)
        # failing to cd into the folder might be because it was not created already
        commandline = "lftp -c \"source %s; ( cd %s && ls ) || ( mkdir -p %s && cd %s && ls )\"" % (
            cmd_quote(self.tempname),
            quoted_path, quoted_path, quoted_path
        )
        log.Debug("CMD: %s" % commandline)
        _, stdout, stderr = self.subprocess_popen(commandline)
        log.Debug("STDERR:\n"
                  "%s" % (stderr))
        log.Debug("STDOUT:\n"
                  "%s" % (stdout))

        # Look for our files as the last element of a long list line.
        # Whitespace-only lines have no fields and must be skipped, otherwise
        # the [-1] lookup raises IndexError.
        return [x.split()[-1] for x in stdout.split('\n') if x.strip()]
项目:tools    作者:freedict    | 项目源码 | 文件源码
def rm_doubled_senses(entry):
    """Remove senses of ``entry`` that exactly duplicate another sense.

    Two senses count as identical when the stripped texts of their
    ``quote`` elements are equal, in the same order.

    Returns ``True`` if at least one sense was removed, ``False`` otherwise.
    """
    senses = list(findall(entry, 'sense'))
    if len(senses) <= 1:
        return False
    # obtain a mapping from XML node -> tuple of words within `<quote>...</quote>`
    senses = {sense: tuple(q.text.strip() for q in tei_iter(sense, 'quote')
            if q.text) for sense in senses}
    changed = False
    # pair each sense with another and compare their content
    for (_, words1), (node2, words2) in itertools.combinations(senses.items(), 2):
        # tuple equality compares both the length and every element
        if words1 == words2:
            try:
                entry.remove(node2) # sense node object
                changed = True
            except ValueError: # already removed?
                pass
    return changed
项目:tools    作者:freedict    | 项目源码 | 文件源码
def rm_doubled_quotes(entry):
    """Remove quotes (translations) that occur more than once in ``entry``.

    All ``quote`` elements found in the ``cit`` elements of the entry's
    senses are compared pairwise by text; of two quotes with the same text
    the later one is removed from its ``cit`` parent.

    Returns ``True`` if the entry has been modified, ``False`` otherwise.
    """
    # collect (parent cit, quote) pairs across all senses
    quotes = [(cit, q) for s in findall(entry, 'sense')
            for cit in findall(s, 'cit')
            for q in findall(cit, 'quote')]
    if len(quotes) <= 1:
        return False
    changed = False
    # id()s of quote nodes already detached; `quotes` keeps every node alive,
    # so the ids stay unique for the duration of the loop
    removed = set()
    # pair each quote with another and compare their content
    for (_, quote1), (cit2, quote2) in itertools.combinations(quotes, 2):
        # Skip pairs involving a quote that is gone already: removing the
        # same node twice would raise, and anything equal to a removed quote
        # was already compared against the quote that was kept.
        if id(quote1) in removed or id(quote2) in removed:
            continue
        # text of both quotes match, remove second quote
        if quote1.text == quote2.text:
            cit2.remove(quote2)
            removed.add(id(quote2))
            changed = True
    return changed
项目:yawn    作者:aclowes    | 项目源码 | 文件源码
def delay(func, *args, timeout=None, max_retries=0, queue=None):
    """Create and enqueue a task that runs ``func`` through ``yawn exec``.

    The positional arguments are shell-quoted and become part of both the
    command line and the template name. The template is looked up or created
    for the given queue (the default queue when ``queue`` is falsy),
    ``max_retries`` and ``timeout``.

    :return: the enqueued ``Task``
    """
    quoted_args = list(map(shlex.quote, args))
    command = 'yawn exec {0.__module__} {0.__name__} {1}'.format(
        func, ' '.join(quoted_args)).strip()
    task_name = '{0.__module__}.{0.__name__}({1})'.format(
        func, ', '.join(quoted_args))

    if not queue:
        queue_obj = Queue.get_default_queue()
    else:
        queue_obj = Queue.objects.get_or_create(name=queue)[0]

    template = Template.objects.get_or_create(
        name=task_name,
        command=command,
        queue=queue_obj,
        max_retries=max_retries,
        timeout=timeout
    )[0]
    task = Task.objects.create(template=template)
    task.enqueue()
    return task
项目:raptiformica    作者:vdloo    | 项目源码 | 文件源码
def pull_origin_master(directory, host=None, port=22):
    """
    Pull origin master in a directory on the remote machine
    :param str directory: directory on the remote machine to pull origin master in
    :param str host: hostname or ip of the remote machine, None for the local machine
    :param int port: port to use to connect to the remote machine over ssh
    :return int exit_code: exit code of the remote command
    """
    log.info("Pulling origin master in {}".format(directory))
    # '&&' instead of ';': when the cd fails, git must not run in whatever
    # directory the shell happens to be in.
    pull_origin_master_command = 'cd {} && git pull origin master'.format(
        quote(directory)
    )
    exit_code, _, _ = run_command_print_ready(
        pull_origin_master_command, host=host, port=port,
        failure_callback=log_failure_factory(
            "Failed to pull origin master"
        ),
        buffered=False,
        shell=True
    )
    return exit_code
项目:raptiformica    作者:vdloo    | 项目源码 | 文件源码
def reset_hard_head(directory, host=None, port=22):
    """
    Reset a checkout to the HEAD of the branch
    :param str directory: directory on the remote machine to reset to HEAD in
    :param str host: hostname or ip of the remote machine, or None for local
    :param int port: port to use to connect to the remote machine over ssh
    :return int exit_code: exit code of the remote command
    """
    log.info("Resetting to HEAD in {}".format(directory))
    # '&&' instead of ';': when the cd fails, the destructive reset must not
    # run in whatever directory the shell happens to be in.
    reset_hard_command = 'cd {} && git reset --hard ' \
                         'HEAD'.format(quote(directory))
    exit_code, _, _ = run_command_print_ready(
        reset_hard_command, host=host, port=port,
        failure_callback=log_failure_factory(
            "Failed to reset to HEAD"
        ),
        buffered=False,
        shell=True
    )
    return exit_code
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def testQuote(self):
        safeunquoted = string.ascii_letters + string.digits + '@%_-+=:,./'
        unicode_sample = '\xe9\xe0\xdf'  # e + acute accent, a + grave, sharp s
        unsafe = '"`$\\!' + unicode_sample

        self.assertEqual(shlex.quote(''), "''")
        self.assertEqual(shlex.quote(safeunquoted), safeunquoted)
        self.assertEqual(shlex.quote('test file name'), "'test file name'")
        for u in unsafe:
            self.assertEqual(shlex.quote('test%sname' % u),
                             "'test%sname'" % u)
        for u in unsafe:
            self.assertEqual(shlex.quote("test%s'name'" % u),
                             "'test%s'\"'\"'name'\"'\"''" % u)

# Allow this test to be used with old shlex.py
项目:imagepaste    作者:robinchenyu    | 项目源码 | 文件源码
def get_command(self, file, **options):
            """Return the shell command that shows ``file`` in Preview.app.

            The command runs in the background, waits 20 seconds and then
            deletes the file.
            """
            # on darwin open returns immediately resulting in the temp
            # file removal while app is opening
            viewer = "open -a /Applications/Preview.app"
            target = quote(file)
            return "(%s %s; sleep 20; rm -f %s)&" % (viewer, target, target)
项目:imagepaste    作者:robinchenyu    | 项目源码 | 文件源码
def show_file(self, file, **options):
            """Display ``file`` with the viewer command, then delete it.

            The viewer command comes from ``get_command_ex``; the whole
            pipeline is started in the background through ``os.system``.
            Always returns 1.
            """
            viewer_command, _executable = self.get_command_ex(file, **options)
            target = quote(file)
            os.system("(%s %s; rm -f %s)&" % (viewer_command, target, target))
            return 1

    # implementations
项目:imagepaste    作者:robinchenyu    | 项目源码 | 文件源码
def get_command_ex(self, file, title=None, **options):
            """Return ``(command, executable)`` for the ``xv`` viewer.

            A non-empty ``title`` is appended to the command as a
            shell-quoted ``-name`` argument.
            """
            # note: xv is pretty outdated.  most modern systems have
            # imagemagick's display command instead.
            executable = "xv"
            if not title:
                return executable, executable
            return executable + " -name %s" % quote(title), executable
项目:Projects    作者:it2school    | 项目源码 | 文件源码
def get_command(self, file, **options):
            """Build the background shell command that opens ``file`` in Preview.

            After launching the viewer the command sleeps for 20 seconds and
            removes the file.
            """
            # on darwin open returns immediately resulting in the temp
            # file removal while app is opening
            quoted_file = quote(file)
            pieces = (
                "open -a /Applications/Preview.app",
                quoted_file,
                quoted_file,
            )
            return "(%s %s; sleep 20; rm -f %s)&" % pieces
项目:Projects    作者:it2school    | 项目源码 | 文件源码
def show_file(self, file, **options):
            """Show ``file`` with the configured viewer and remove it afterwards.

            Uses the command returned by ``get_command_ex`` and runs it, plus
            the clean-up, as one background shell job. Always returns 1.
            """
            command = self.get_command_ex(file, **options)[0]
            quoted_file = quote(file)
            shell_line = "(%s %s; rm -f %s)&" % (command, quoted_file, quoted_file)
            os.system(shell_line)
            return 1

    # implementations
项目:Projects    作者:it2school    | 项目源码 | 文件源码
def get_command_ex(self, file, title=None, **options):
    """Build the ``xv`` viewer command and return it with the executable name.

    Returns a ``(command, executable)`` tuple. A truthy ``title`` is added
    to the command as a shell-quoted ``-name`` argument. ``file`` and
    ``options`` are unused.
    """
    # note: xv is pretty outdated.  most modern systems have
    # imagemagick's display command instead.
    executable = "xv"
    pieces = [executable]
    if title:
        pieces.append(" -name %s" % quote(title))
    return "".join(pieces), executable
项目:doctr    作者:drdoctr    | 项目源码 | 文件源码
def run(args, shell=False, exit=True):
    """
    Echo and run the command ``args``.

    The command line is printed before it runs, with any occurrence of the
    GitHub token replaced by ``~`` characters. The token is looked up only
    when ``GH_TOKEN`` is present in the environment.

    With shell=False (recommended for most commands) ``args`` should be a
    list of strings; with shell=True it should be a single command string.

    With exit=True a nonzero returncode terminates the process via
    ``sys.exit``; otherwise the returncode is returned to the caller.
    """
    token = get_token() if "GH_TOKEN" in os.environ else b''

    if shell:
        command = args
    else:
        # Quote each argument so the echoed line is copy-pasteable.
        command = ' '.join(shlex.quote(arg) for arg in args)
    mask = '~' * len(token)
    command = command.replace(token.decode('utf-8'), mask)
    print(blue(command))
    sys.stdout.flush()

    returncode = run_command_hiding_token(args, token, shell=shell)

    if returncode != 0 and exit:
        sys.exit(red("%s failed: %s" % (command, returncode)))
    return returncode
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def xcheck_host_prog(conf, name, tool, wafname=None):
    """Choose the cross-compilation program for ``name`` and record it in ``conf.env``.

    ``wafname`` (defaulting to ``name``) is the ``conf.env`` key that is
    updated. Sources are tried in this order, stopping at the first hit:

    1. the environment variable ``<chost_envar>_<name>`` (only looked up
       when ``get_chost_stuff`` reports a chost); its value, converted by
       ``Utils.to_list``, is appended to ``conf.env[wafname]``;
    2. the environment variable ``HOST_<name>``; its value, converted by
       ``Utils.to_list``, replaces ``conf.env[wafname]``;
    3. if ``conf.env[wafname]`` is still empty and a chost is known, the
       string ``<chost>-<tool>``.

    Each successful choice is reported through ``conf.msg``.
    """
    wafname = wafname or name
    chost, chost_envar = get_chost_stuff(conf)

    if chost:
        specific = os.environ.get('%s_%s' % (chost_envar, name))
    else:
        specific = None

    if specific:
        value = Utils.to_list(specific)
        conf.env[wafname] += value
        headline = 'Will use cross-compilation %s from %s_%s' % (name, chost_envar, name)
        conf.msg(headline, " ".join(map(quote, value)))
        return

    envar = os.environ.get('HOST_%s' % name)
    if envar is not None:
        value = Utils.to_list(envar)
        conf.env[wafname] = value
        headline = 'Will use cross-compilation %s from HOST_%s' % (name, name)
        conf.msg(headline, " ".join(map(quote, value)))
        return

    # Keep whatever is already configured; without a chost there is
    # nothing to derive a default from.
    if conf.env[wafname] or not chost:
        return

    value = '%s-%s' % (chost, tool)
    conf.env[wafname] = value
    conf.msg('Will use cross-compilation %s from CHOST' % wafname, value)
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def xcheck_host_envar(conf, name, wafname=None):
    """Load cross-compilation settings for ``name`` from the environment into ``conf.env``.

    ``wafname`` (defaulting to ``name``) is the ``conf.env`` key that is
    updated. If ``get_chost_stuff`` reports a chost and the variable
    ``<chost_envar>_<name>`` is set to a non-empty value, that value is
    converted by ``Utils.to_list`` and appended to ``conf.env[wafname]``.
    Otherwise ``HOST_<name>`` is used when it is set: a non-empty value is
    converted by ``Utils.to_list``, an empty one is stored as ``['']``, and
    the result replaces ``conf.env[wafname]``. The choice is reported
    through ``conf.msg``.
    """
    wafname = wafname or name
    chost, chost_envar = get_chost_stuff(conf)

    specific = os.environ.get('%s_%s' % (chost_envar, name)) if chost else None
    if specific:
        value = Utils.to_list(specific)
        conf.env[wafname] += value
        headline = 'Will use cross-compilation %s from %s_%s' % (name, chost_envar, name)
        conf.msg(headline, " ".join(map(quote, value)))
        return

    envar = os.environ.get('HOST_%s' % name)
    if envar is None:
        return

    # An explicitly empty variable is kept as a single empty entry.
    if envar != '':
        value = Utils.to_list(envar)
    else:
        value = [envar]

    conf.env[wafname] = value
    headline = 'Will use cross-compilation %s from HOST_%s' % (name, name)
    conf.msg(headline, " ".join(map(quote, value)))
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def write_compilation_database(ctx):
    """Write the clang compilation database as JSON.

    Entries already present in ``compile_commands.json`` (under
    ``ctx.bldnode``) are loaded and indexed by their ``"file"`` key; an
    ``IOError`` while loading is treated as an empty database. One entry
    is then added or overwritten per task in
    ``ctx.clang_compilation_database_tasks`` that has a ``last_cmd``, and
    the merged list is written back to the same node.
    """
    database_file = ctx.bldnode.make_node('compile_commands.json')
    Logs.info("Build commands will be stored in %s" % database_file.path_from(ctx.path))
    try:
        existing = json.load(database_file)
    except IOError:
        existing = []

    clang_db = {}
    for item in existing:
        clang_db[item["file"]] = item

    for task in getattr(ctx, 'clang_compilation_database_tasks', []):
        try:
            argv = task.last_cmd
        except AttributeError:
            # Tasks with no recorded command are skipped.
            continue
        directory = getattr(task, 'cwd', ctx.variant_dir)
        source_node = task.inputs[0]
        filename = os.path.relpath(source_node.abspath(), directory)
        clang_db[filename] = {
            "directory": directory,
            "command": " ".join(quote(arg) for arg in argv),
            "file": filename,
        }

    database_file.write(json.dumps(list(clang_db.values()), indent=2))
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def xcheck_host_envar(conf, name, wafname=None):
    """Populate ``conf.env`` for ``name`` from cross-compilation environment variables.

    The ``conf.env`` key is ``wafname`` if given, else ``name``. A
    non-empty ``<chost_envar>_<name>`` variable (consulted only when
    ``get_chost_stuff`` returns a chost) is converted by ``Utils.to_list``
    and appended to the existing entry. Failing that, ``HOST_<name>``, if
    set, replaces the entry: non-empty values go through
    ``Utils.to_list`` and an empty value becomes ``['']``. ``conf.msg``
    reports which variable was used.
    """
    wafname = wafname or name
    chost, chost_envar = get_chost_stuff(conf)

    if chost:
        specific = os.environ.get('%s_%s' % (chost_envar, name))
        if specific:
            value = Utils.to_list(specific)
            conf.env[wafname] += value
            shown = " ".join(quote(item) for item in value)
            conf.msg('Will use cross-compilation %s from %s_%s'
                     % (name, chost_envar, name), shown)
            return

    envar = os.environ.get('HOST_%s' % name)
    if envar is None:
        return

    # Preserve an explicitly empty variable as one empty entry.
    value = [envar] if envar == '' else Utils.to_list(envar)

    conf.env[wafname] = value
    shown = " ".join(quote(item) for item in value)
    conf.msg('Will use cross-compilation %s from HOST_%s' % (name, name), shown)
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def write_compilation_database(ctx):
    """Merge this build's compile commands into ``compile_commands.json``.

    The JSON file lives under ``ctx.bldnode``. Existing entries are read
    (an ``IOError`` means "start empty") and keyed by their ``"file"``
    value. Each task in ``ctx.clang_compilation_database_tasks`` that has
    a ``last_cmd`` contributes one entry, replacing any earlier entry for
    the same file. The merged entries are written back as indented JSON.
    """
    database_file = ctx.bldnode.make_node('compile_commands.json')
    Logs.info("Build commands will be stored in %s" % database_file.path_from(ctx.path))
    try:
        root = json.load(database_file)
    except IOError:
        root = []
    by_file = {record["file"]: record for record in root}

    tasks = getattr(ctx, 'clang_compilation_database_tasks', [])
    for task in tasks:
        try:
            last_cmd = task.last_cmd
        except AttributeError:
            # No command was recorded for this task.
            continue
        directory = getattr(task, 'cwd', ctx.variant_dir)
        first_input = task.inputs[0]
        filename = os.path.relpath(first_input.abspath(), directory)
        command = " ".join([quote(part) for part in last_cmd])
        by_file[filename] = {
            "directory": directory,
            "command": command,
            "file": filename,
        }

    serialized = json.dumps(list(by_file.values()), indent=2)
    database_file.write(serialized)
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def xcheck_prog(conf, var, tool, cross=False):
    """Copy the environment variable ``var`` into ``conf.env[var]`` if it is set.

    The variable's value (default ``''``) is converted by
    ``Utils.to_list``; when the result is empty nothing happens.
    Otherwise it replaces ``conf.env[var]`` and is reported through
    ``conf.msg``, labelled as a cross-compilation setting when ``cross``
    is true. ``tool`` is not used here.
    """
    value = Utils.to_list(os.environ.get(var, ''))
    if not value:
        return

    conf.env[var] = value
    pretty = ('cross-compilation %s' % var) if cross else var
    conf.msg('Will use %s' % pretty, " ".join(map(quote, value)))
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def xcheck_envar(conf, name, wafname=None, cross=False):
    """Append the environment variable ``name`` to ``conf.env[wafname]`` if it is set.

    ``wafname`` defaults to ``name``. The variable's value is passed
    through ``Utils.to_list``; a falsy result (including an unset
    variable) leaves ``conf.env`` untouched. Otherwise the result is added
    to ``conf.env[wafname]`` with ``+=`` and reported through
    ``conf.msg``, labelled as a cross-compilation setting when ``cross``
    is true.
    """
    wafname = wafname or name
    value = Utils.to_list(os.environ.get(name))
    if not value:
        return

    conf.env[wafname] += value
    pretty = ('cross-compilation %s' % wafname) if cross else wafname
    conf.msg('Will use %s' % pretty, " ".join(map(quote, value)))