The following 49 code examples, extracted from open-source Python projects, illustrate how to use shlex.quote().
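Before the project examples, here is a minimal sketch of the basic pattern. shlex.quote() returns a shell-escaped version of a single string so it can be safely embedded in a command line that is passed to a shell; the file name and command used below are purely illustrative.

import shlex
import subprocess

# An argument containing spaces and shell metacharacters (illustrative only).
filename = "my file; rm -rf ~.txt"

# shlex.quote() wraps the value so the shell treats it as one literal token.
command = "ls -l {}".format(shlex.quote(filename))
print(command)  # ls -l 'my file; rm -rf ~.txt'

# Quoting is only needed when the command goes through a shell (shell=True);
# when passing a list of arguments, no quoting is required.
subprocess.run(command, shell=True)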
def xcheck_envar(conf, name, wafname=None, cross=False):
    wafname = wafname or name
    envar = os.environ.get(name, None)
    if envar is None:
        return
    value = Utils.to_list(envar) if envar != '' else [envar]
    conf.env[wafname] = value
    if cross:
        pretty = 'cross-compilation %s' % wafname
    else:
        pretty = wafname
    conf.msg('Will use %s' % pretty, " ".join(quote(x) for x in value))
def printCommand(arg1: "typing.Union[str, typing.Sequence[typing.Any]]", *remainingArgs, outputFile=None,
                 colour=AnsiColour.yellow, cwd=None, env=None, sep=" ", printVerboseOnly=False, **kwargs):
    if not _cheriConfig or (_cheriConfig.quiet or (printVerboseOnly and not _cheriConfig.verbose)):
        return
    # also allow passing a single string
    if not type(arg1) is str:
        allArgs = arg1
        arg1 = allArgs[0]
        remainingArgs = allArgs[1:]
    newArgs = ("cd", shlex.quote(str(cwd)), "&&") if cwd else tuple()
    if env:
        # only print the changed environment entries
        filteredEnv = __filterEnv(env)
        if filteredEnv:
            newArgs += ("env",) + tuple(map(shlex.quote, (k + "=" + str(v) for k, v in filteredEnv.items())))
    # comma in tuple is required otherwise it creates a tuple of string chars
    newArgs += (shlex.quote(str(arg1)),) + tuple(map(shlex.quote, map(str, remainingArgs)))
    if outputFile:
        newArgs += (">", str(outputFile))
    print(coloured(colour, newArgs, sep=sep), flush=True, **kwargs)
def process_pre(self):
    import tempfile

    self.temp_dir = tempfile.TemporaryDirectory()

    self.environ = {'PYTHONPATH': pythonpath()}
    self.outfname = bpy.path.ensure_ext(self.filepath, ".zip")

    self.command = (
        bpy.app.binary_path_python,
        '-m', 'bam.pack',
        # file to pack
        "--input", bpy.data.filepath,
        # file to write
        "--output", self.outfname,
        "--temp", self.temp_dir.name,
    )

    if self.log.isEnabledFor(logging.INFO):
        import shlex
        cmd_to_log = ' '.join(shlex.quote(s) for s in self.command)
        self.log.info('Executing %s', cmd_to_log)
def shell_script(self):
    def _trace(command):
        return 'echo + {}\n{} '.format(
            shlex.quote(command),
            command
        )

    commands = []
    after_success = [_trace(cmd) for cmd in self.after_success]
    after_failure = [_trace(cmd) for cmd in self.after_failure]

    for service in self.services:
        commands.append(_trace('service {} start'.format(service)))

    for script in self.scripts:
        commands.append(_trace(script))

    command_encoded = shlex.quote(to_text(base64.b64encode(to_binary('\n'.join(commands)))))
    context = {
        'command': command_encoded,
        'after_success': ' \n'.join(after_success),
        'after_failure': ' \n'.join(after_failure),
    }
    script = render_template('script.sh', **context)
    logger.debug('Build script: \n%s', script)
    return script
def files_in_archive(self, force_refresh=False):
    if self._files_in_archive and not force_refresh:
        return self._files_in_archive

    cmd = [self.cmd_path, '--list', '-C', self.dest]
    if self.zipflag:
        cmd.append(self.zipflag)
    if self.opts:
        cmd.extend(['--show-transformed-names'] + self.opts)
    if self.excludes:
        cmd.extend(['--exclude=' + quote(f) for f in self.excludes])
    cmd.extend(['-f', self.src])
    rc, out, err = self.module.run_command(cmd, cwd=self.dest, environ_update=dict(LANG='C', LC_ALL='C', LC_MESSAGES='C'))
    if rc != 0:
        raise UnarchiveError('Unable to list files in the archive')

    for filename in out.splitlines():
        # Compensate for locale-related problems in gtar output (octal unicode representation) #11348
        # filename = filename.decode('string_escape')
        filename = codecs.escape_decode(filename)[0]
        if filename and filename not in self.excludes:
            self._files_in_archive.append(to_native(filename))
    return self._files_in_archive
def unarchive(self):
    cmd = [self.cmd_path, '--extract', '-C', self.dest]
    if self.zipflag:
        cmd.append(self.zipflag)
    if self.opts:
        cmd.extend(['--show-transformed-names'] + self.opts)
    if self.file_args['owner']:
        cmd.append('--owner=' + quote(self.file_args['owner']))
    if self.file_args['group']:
        cmd.append('--group=' + quote(self.file_args['group']))
    if self.module.params['keep_newer']:
        cmd.append('--keep-newer-files')
    if self.excludes:
        cmd.extend(['--exclude=' + quote(f) for f in self.excludes])
    cmd.extend(['-f', self.src])
    rc, out, err = self.module.run_command(cmd, cwd=self.dest, environ_update=dict(LANG='C', LC_ALL='C', LC_MESSAGES='C'))
    return dict(cmd=cmd, rc=rc, out=out, err=err)
def test_alias_magic():
    """Test %alias_magic."""
    ip = get_ipython()
    mm = ip.magics_manager

    # Basic operation: both cell and line magics are created, if possible.
    ip.run_line_magic('alias_magic', 'timeit_alias timeit')
    nt.assert_in('timeit_alias', mm.magics['line'])
    nt.assert_in('timeit_alias', mm.magics['cell'])

    # --cell is specified, line magic not created.
    ip.run_line_magic('alias_magic', '--cell timeit_cell_alias timeit')
    nt.assert_not_in('timeit_cell_alias', mm.magics['line'])
    nt.assert_in('timeit_cell_alias', mm.magics['cell'])

    # Test that line alias is created successfully.
    ip.run_line_magic('alias_magic', '--line env_alias env')
    nt.assert_equal(ip.run_line_magic('env', ''),
                    ip.run_line_magic('env_alias', ''))

    # Test that line alias with parameters passed in is created successfully.
    ip.run_line_magic('alias_magic',
                      '--line history_alias history --params ' + shlex.quote('3'))
    nt.assert_in('history_alias', mm.magics['line'])
def opened_files(path, excludes):
    files = []

    try:
        process = os.popen('lsof -wFn +D %s | tail -n +2 | cut -c2-' % cmd_quote(path))
        data = process.read()
        process.close()

        for item in data.split('\n'):
            if not item or len(item) <= 2 or os.path.isdir(item) or item.isdigit() \
                    or file_excluded(item, excludes):
                continue
            files.append(item)

        return files

    except Exception as ex:
        logger.exception("Exception checking %r: ", path)
        return None
def rclone_move_command(local, remote, transfers, checkers, bwlimit, excludes, chunk_size, dry_run):
    upload_cmd = 'rclone move %s %s' \
                 ' --delete-after' \
                 ' --no-traverse' \
                 ' --stats=60s' \
                 ' -v' \
                 ' --transfers=%d' \
                 ' --checkers=%d' \
                 ' --drive-chunk-size=%s' % \
                 (cmd_quote(local), cmd_quote(remote), transfers, checkers, chunk_size)

    if bwlimit and len(bwlimit):
        upload_cmd += ' --bwlimit="%s"' % bwlimit

    for item in excludes:
        upload_cmd += ' --exclude="%s"' % item

    if dry_run:
        upload_cmd += ' --dry-run'

    return upload_cmd
def remove_empty_directories(config, force_dry_run=False):
    open_files = opened_files(config['local_folder'], config['lsof_excludes'])
    if not len(open_files):
        clearing = False
        for dir, depth in config['rclone_remove_empty_on_upload'].items():
            if os.path.exists(dir):
                clearing = True
                logger.debug("Removing empty directories from %r with mindepth %r", dir, depth)
                cmd = 'find %s -mindepth %d -type d -empty' % (cmd_quote(dir), depth)
                if not config['dry_run'] and not force_dry_run:
                    cmd += ' -delete'
                run_command(cmd)
        if clearing:
            logger.debug("Finished clearing empty directories")
    else:
        logger.debug("Skipped removing empty directories because %d files are currently open: %r",
                     len(open_files), open_files)


############################################################
# CONFIG STUFF
############################################################
def test_activate(monkeypatch):
    can_connect_args = _monkeypatch_can_connect_to_socket_to_succeed(monkeypatch)

    def activate_redis_url(dirname):
        project_dir_disable_dedicated_env(dirname)
        result = activate(dirname, UI_MODE_TEXT_ASSUME_YES_DEVELOPMENT, conda_environment=None, command_name=None)
        assert can_connect_args['port'] == 6379
        assert result is not None
        if platform.system() == 'Windows':
            result = [line for line in result if not line.startswith("export PATH")]
            print("activate changed PATH on Windows and ideally it would not.")
        if len(result) > 2:
            import os
            print("os.environ=" + repr(os.environ))
            print("result=" + repr(result))
        assert ['export PROJECT_DIR=' + quote(dirname), 'export REDIS_URL=redis://localhost:6379'] == result

    with_directory_contents_completing_project_file(
        {DEFAULT_PROJECT_FILENAME: """
services:
  REDIS_URL: redis
"""}, activate_redis_url)
def test_activate_quoting(monkeypatch):
    def activate_foo(dirname):
        project_dir_disable_dedicated_env(dirname)
        result = activate(dirname, UI_MODE_TEXT_ASSUME_YES_DEVELOPMENT, conda_environment=None, command_name=None)
        assert result is not None
        if platform.system() == 'Windows':
            result = [line for line in result if not line.startswith("export PATH")]
            print("activate changed PATH on Windows and ideally it would not.")
        assert ["export FOO='$! boo'", 'export PROJECT_DIR=' + quote(dirname)] == result

    with_directory_contents_completing_project_file(
        {
            DEFAULT_PROJECT_FILENAME: """
variables:
  FOO: {}
""",
            DEFAULT_LOCAL_STATE_FILENAME: """
variables:
  FOO: $! boo
"""
        }, activate_foo)
def _shell_replace_vars(cmd, local_vars):
    spans = []
    replacements = []
    for match in _dollar_var_re.finditer(cmd):
        varname = match.group(1)
        if varname.isdigit():
            # $1, $2 for sys.argv, just like bash
            value = sys.argv[int(varname)]
        elif varname == '#':
            value = len(sys.argv) - 1
        elif varname == '@':
            value = ' '.join(map(shlex.quote, sys.argv[1:]))
        else:
            assert is_variable_name(varname), 'not a valid var name: ' + varname
            if not varname in local_vars:
                continue
            value = local_vars[varname]

        if isinstance(value, str):
            value = shlex.quote(value)
        else:
            value = str(value)
        spans.append(match.span())
        replacements.append(value)

    return _replace_n(cmd, spans, replacements)
def run(self, context, KEY, ACTION):
    keymap = self.tui.keymap
    key = KEY
    if len(ACTION) == 1 and ACTION[0][0] == '<' and ACTION[0][-1] == '>':
        # ACTION is another key (e.g. 'j' -> 'down')
        action = keymap.mkkey(ACTION[0])
    else:
        action = ' '.join(shlex.quote(x) for x in ACTION)

    if context is None:
        from ...tui.keymap import DEFAULT_CONTEXT
        context = DEFAULT_CONTEXT
    elif context not in _get_KEYMAP_CONTEXTS():
        log.error('Invalid context: {!r}'.format(context))
        return False

    try:
        keymap.bind(key, action, context=context)
    except ValueError as e:
        log.error(e)
        return False
    else:
        return True
def get_output_nocheck(self, *cmd, **kwargs):
    proc = self.create_subprocess(cmd,
                                  stdout=subprocess.PIPE,
                                  universal_newlines=True,
                                  **kwargs)
    # FIXME: support Python 2?
    with proc:
        stdout = proc.communicate()[0]

    stdout = stdout.rstrip()

    exitcode = proc.wait()
    if exitcode:
        cmd_str = ' '.join(map(shlex.quote, cmd))
        self.logger.error("Command %s failed with exit code %s"
                          % (cmd_str, exitcode))

    return (exitcode, stdout)
def _spawn_ffmpeg(self):
    if self.streaming:
        url = self._stream_url
    elif self.playing:
        url = self._song_context.song_url
    else:
        raise RuntimeError('Player is in an invalid state')

    args = shlex.split(self._ffmpeg_command.format(shlex.quote(url)))
    try:
        self._ffmpeg = subprocess.Popen(args)
    except FileNotFoundError as e:
        raise RuntimeError('ffmpeg executable was not found') from e
    except subprocess.SubprocessError as e:
        raise RuntimeError('Popen failed: {0.__name__} {1}'.format(type(e), str(e))) from e

    #
    # Player FSM
    #
def testQuote(self):
    safeunquoted = string.ascii_letters + string.digits + '@%_-+=:,./'
    unicode_sample = '\xe9\xe0\xdf'  # e + acute accent, a + grave, sharp s
    unsafe = '"`$\\!' + unicode_sample

    self.assertEqual(shlex.quote(''), "''")
    self.assertEqual(shlex.quote(safeunquoted), safeunquoted)
    self.assertEqual(shlex.quote('test file name'), "'test file name'")
    for u in unsafe:
        self.assertEqual(shlex.quote('test%sname' % u),
                         "'test%sname'" % u)
    for u in unsafe:
        self.assertEqual(shlex.quote("test%s'name'" % u),
                         "'test%s'\"'\"'name'\"'\"''" % u)


# Allow this test to be used with old shlex.py
def restore_vm(self, new_vm, new_name, size, qvm_create_args, vm_keys, backup_storage_vm):
    subprocess.check_call("qvm-create " + shlex.quote(new_name) + " " + qvm_create_args, shell=True)
    subprocess.check_call(["qvm-prefs", "-s", new_name, "netvm", "none"])  # Safe approach…
    if size is not None:
        subprocess.check_call(["qvm-grow-private", new_name, size])
    with Dvm() as dvm:
        dvm.attach("xvdz", new_vm.private_volume())
        try:
            if size is not None:
                dvm.check_call("sudo e2fsck -f -p /dev/xvdz")
                dvm.check_call("sudo resize2fs /dev/xvdz")
            dvm.check_call("sudo mkdir /mnt/clone")
            dvm.check_call("sudo mount /dev/xvdz /mnt/clone")
            try:
                self.upload_agent(dvm)
                with self.add_permissions(backup_storage_vm, dvm, vm_keys.encrypted_name):
                    dvm.check_call("/tmp/restore-agent " + shlex.quote(backup_storage_vm.get_name()) +
                                   " " + shlex.quote(vm_keys.encrypted_name),
                                   input=vm_keys.key, stdout=None, stderr=None)
            finally:
                dvm.check_call("sudo umount /mnt/clone")
        finally:
            dvm.detach_all()

# abstract def upload_agent(self, dvm)
def main():
    if not shutil.which('borg'):
        print('The \'borg\' command can\'t be found in the PATH. Please correctly install borgbackup first.')
        print('See instructions at https://borgbackup.readthedocs.io/en/stable/installation.html')
        return 1

    parser = build_parser()
    args = parser.parse_args()
    logging.basicConfig(level=args.log_level, format='%(message)s')
    if 'function' not in args:
        return parser.print_help()
    try:
        return args.function(args)
    except subprocess.CalledProcessError as cpe:
        print('{} invocation failed with status {}'.format(cpe.cmd[0], cpe.returncode))
        print('Command line was:', *[shlex.quote(s) for s in cpe.cmd])
        return cpe.returncode
async def get_package_version(prefix, connection, package_name):
    command = "dpkg-query --showformat='${Version}' --show %s" % shlex.quote(
        package_name)
    result = await connection.run(command)

    if result.exit_status != os.EX_OK:
        click.echo(
            "{0}package (failed {1}): {2} - {3}".format(
                prefix, result.exit_status, package_name, result.stderr.strip()
            )
        )
    else:
        click.echo(
            "{0}package (ok): {1}=={2}".format(
                prefix, package_name, result.stdout.strip()
            )
        )
async def run(self):
    data = cluster_data.ClusterData.find_one(self.cluster.model_id)
    cluster_name = data.global_vars.get("cluster", self.cluster.name)
    cluster_name = shlex.quote(cluster_name)

    cluster_servers = {item._id: item for item in self.cluster.server_list}
    mons = [
        cluster_servers[item["server_id"]]
        for item in self.cluster.configuration.state
        if item["role"] == "mons"
    ]
    if not mons:
        return

    version_result = await self.execute_cmd(
        "ceph --cluster {0} health --format json".format(cluster_name),
        random.choice(mons))

    self.manage_errors(
        "Cannot execute ceph health command on %s (%s): %s",
        "Not all hosts have working ceph command",
        version_result.errors
    )
    self.manage_health(version_result)
async def run(self):
    data = cluster_data.ClusterData.find_one(self.cluster.model_id)
    cluster_name = data.global_vars.get("cluster", self.cluster.name)
    cluster_name = shlex.quote(cluster_name)

    version_result = await self.execute_cmd(
        "ceph --cluster {0} version".format(cluster_name),
        *self.cluster.server_list)

    self.manage_errors(
        "Cannot execute ceph version command on %s (%s): %s",
        "Not all hosts have working ceph command",
        version_result.errors
    )

    results = list(parse_results(version_result.ok))
    self.manage_versions(results)
    self.manage_commits(results)
def _write_command_to_file(self, env, arglist):
    envvar_settings_list = []

    if "DIALOGRC" in env:
        envvar_settings_list.append(
            "DIALOGRC={0}".format(_shell_quote(env["DIALOGRC"])))

    for var in self._lowlevel_exit_code_varnames:
        varname = "DIALOG_" + var
        envvar_settings_list.append(
            "{0}={1}".format(varname, _shell_quote(env[varname])))

    command_str = ' '.join(envvar_settings_list +
                           list(imap(_shell_quote, arglist)))
    s = "{separator}{cmd}\n\nArgs: {args!r}\n".format(
        separator="" if self._debug_first_output else ("-" * 79) + "\n",
        cmd=command_str, args=arglist)

    self._debug_logfile.write(s)
    if self._debug_always_flush:
        self._debug_logfile.flush()

    self._debug_first_output = False
def action(action_name=None, keep_comments=False, escape=True):
    def decorator(func):
        action = action_name or func.__name__.replace('_', '-')

        def function_wrapper(*args, **kw):
            if not keep_comments:
                args = [i for i in args if not i.startswith('#')]
            if escape:
                args = [shlex.quote(i) for i in args]
            res = func(*args, **kw)
            # allow actions to yield each line
            if not isinstance(res, str) and res is not None:
                res = '\n'.join(res)
            return res

        state['actions'][action] = function_wrapper
        return function_wrapper
    return decorator
def brute(i):
    global flag
    global last_breakpoint
    for c in charset:
        flag[i] = c
        output = gdb.execute('r < <(echo {})'.format(shlex.quote(''.join(flag))), True, True)

        # skip floating point exception
        while "SIGFPE" in output:
            output = gdb.execute('c', True, True)

        output = gdb.execute('x $pc', True, True)
        pc = output.split(":")[0]
        pc = int(pc, 16)

        if pc > last_breakpoint:
            last_breakpoint = pc
            break

    print(''.join(flag))
def submit(self, values, cwd):
    queue_command = [
        'qsub', '-cwd', '-e', 'stderr.txt', '-o', 'stdout.txt'
    ] + self.qargs
    process = subprocess.Popen(
        queue_command,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        stdin=subprocess.PIPE,
        cwd=cwd,
        env=self.env,
        universal_newlines=True
    )
    command_chunks = self.bin + self.get_options(values)
    command = ' '.join(shlex.quote(s) for s in command_chunks)
    stdin = ("echo > started;\n"
             "%s;\n"
             "echo > finished;") % command
    stdout, stderr = process.communicate(stdin)
    match = self.job_submission_regex.match(stdout)
    return match.group(1)
def _list(self):
    # Do a long listing to avoid connection reset
    # remote_dir = urllib.unquote(self.parsed_url.path.lstrip('/')).rstrip()
    remote_dir = urllib.unquote(self.parsed_url.path)
    # print remote_dir
    quoted_path = cmd_quote(self.remote_path)
    # failing to cd into the folder might be because it was not created already
    commandline = "lftp -c \"source %s; ( cd %s && ls ) || ( mkdir -p %s && cd %s && ls )\"" % (
        cmd_quote(self.tempname), quoted_path, quoted_path, quoted_path
    )
    log.Debug("CMD: %s" % commandline)
    _, l, e = self.subprocess_popen(commandline)
    log.Debug("STDERR:\n"
              "%s" % (e))
    log.Debug("STDOUT:\n"
              "%s" % (l))

    # Look for our files as the last element of a long list line
    return [x.split()[-1] for x in l.split('\n') if x]
def rm_doubled_senses(entry):
    """Some entries have multiple senses. A few of them are exactly the same,
    remove these. This function returns True if an element has been altered."""
    senses = list(findall(entry, 'sense'))
    if len(senses) == 1:
        return
    # obtain a mapping from XML node -> list of words within `<quote>…</quote>`
    senses = {sense: tuple(q.text.strip() for q in tei_iter(sense, 'quote')
                           if q.text)
              for sense in senses}
    changed = False
    # pair each sense with another and compare their content
    for s1, s2 in itertools.combinations(senses.items(), 2):
        if len(s1[1]) == len(s2[1]):  # if two senses are *exactly* identical
            if all(e1 == e2 for e1, e2 in zip(s1[1], s2[1])):
                try:
                    entry.remove(s2[0])  # sense node object
                    changed = True
                except ValueError:  # already removed?
                    pass
    return changed
def rm_doubled_quotes(entry):
    """Some entries have doubled quotes (translations) within different senses.
    Remove the doubled quotes. This function returns True if the entry has been
    modified."""
    senses = list(findall(entry, 'sense'))
    # add quote elements
    senses = [(cit, q) for s in senses
              for cit in findall(s, 'cit')
              for q in findall(cit, 'quote')]
    if len(senses) <= 1:
        return
    changed = False
    # pair each sense with another and compare their content
    for trans1, trans2 in itertools.combinations(senses, 2):
        # could have been removed already, so check:
        cit1, quote1 = trans1
        cit2, quote2 = trans2
        if not cit1.findall(quote1.tag) or not cit2.findall(quote2.tag) \
                and cit1 is not cit2:
            continue  # one of them has been removed already
        # text of both quotes match, remove second quote
        if quote1.text == quote2.text:
            cit2.remove(quote2)
            changed = True
    return changed
def delay(func, *args, timeout=None, max_retries=0, queue=None):
    arguments = [shlex.quote(arg) for arg in args]
    command = 'yawn exec {0.__module__} {0.__name__} {1}'.format(
        func, ' '.join(arguments)).strip()
    task_name = '{0.__module__}.{0.__name__}({1})'.format(
        func, ', '.join(arguments))

    if queue:
        queue_obj, _ = Queue.objects.get_or_create(name=queue)
    else:
        queue_obj = Queue.get_default_queue()

    template, _ = Template.objects.get_or_create(
        name=task_name,
        command=command,
        queue=queue_obj,
        max_retries=max_retries,
        timeout=timeout
    )
    task = Task.objects.create(
        template=template
    )
    task.enqueue()
    return task
def pull_origin_master(directory, host=None, port=22):
    """
    Pull origin master in a directory on the remote machine

    :param str directory: directory on the remote machine to pull origin master in
    :param str host: hostname or ip of the remote machine, None for the local machine
    :param int port: port to use to connect to the remote machine over ssh
    :return int exit_code: exit code of the remote command
    """
    log.info("Pulling origin master in {}".format(directory))

    pull_origin_master_command = 'cd {}; git pull origin master'.format(
        quote(directory)
    )

    exit_code, _, _ = run_command_print_ready(
        pull_origin_master_command,
        host=host,
        port=port,
        failure_callback=log_failure_factory(
            "Failed to pull origin master"
        ),
        buffered=False,
        shell=True
    )
    return exit_code
def reset_hard_head(directory, host=None, port=22):
    """
    Reset a checkout to the HEAD of the branch

    :param str directory: directory on the remote machine to reset to HEAD in
    :param str host: hostname or ip of the remote machine, or None for local
    :param int port: port to use to connect to the remote machine over ssh
    :return int exit_code: exit code of the remote command
    """
    log.info("Resetting to HEAD in {}".format(directory))

    reset_hard_command = 'cd {}; git reset --hard ' \
                         'HEAD'.format(quote(directory))

    exit_code, _, _ = run_command_print_ready(
        reset_hard_command,
        host=host,
        port=port,
        failure_callback=log_failure_factory(
            "Failed to reset to HEAD"
        ),
        buffered=False,
        shell=True
    )
    return exit_code
def get_command(self, file, **options):
    # on darwin open returns immediately resulting in the temp
    # file removal while app is opening
    command = "open -a /Applications/Preview.app"
    command = "(%s %s; sleep 20; rm -f %s)&" % (command, quote(file), quote(file))
    return command
def show_file(self, file, **options):
    command, executable = self.get_command_ex(file, **options)
    command = "(%s %s; rm -f %s)&" % (command, quote(file), quote(file))
    os.system(command)
    return 1

# implementations
def get_command_ex(self, file, title=None, **options):
    # note: xv is pretty outdated. most modern systems have
    # imagemagick's display command instead.
    command = executable = "xv"
    if title:
        command += " -name %s" % quote(title)
    return command, executable
def run(args, shell=False, exit=True):
    """
    Run the command ``args``.

    Automatically hides the secret GitHub token from the output. If
    shell=False (recommended for most commands), args should be a list of
    strings. If shell=True, args should be a string of the command to run.

    If exit=True, it exits on nonzero returncode. Otherwise it returns the
    returncode.
    """
    if "GH_TOKEN" in os.environ:
        token = get_token()
    else:
        token = b''
    if not shell:
        command = ' '.join(map(shlex.quote, args))
    else:
        command = args
    command = command.replace(token.decode('utf-8'), '~' * len(token))
    print(blue(command))
    sys.stdout.flush()

    returncode = run_command_hiding_token(args, token, shell=shell)

    if exit and returncode != 0:
        sys.exit(red("%s failed: %s" % (command, returncode)))
    return returncode
def xcheck_host_prog(conf, name, tool, wafname=None):
    wafname = wafname or name

    chost, chost_envar = get_chost_stuff(conf)

    specific = None
    if chost:
        specific = os.environ.get('%s_%s' % (chost_envar, name), None)

    if specific:
        value = Utils.to_list(specific)
        conf.env[wafname] += value
        conf.msg('Will use cross-compilation %s from %s_%s' \
                 % (name, chost_envar, name),
                 " ".join(quote(x) for x in value))
        return
    else:
        envar = os.environ.get('HOST_%s' % name, None)
        if envar is not None:
            value = Utils.to_list(envar)
            conf.env[wafname] = value
            conf.msg('Will use cross-compilation %s from HOST_%s' \
                     % (name, name),
                     " ".join(quote(x) for x in value))
            return

    if conf.env[wafname]:
        return

    value = None
    if chost:
        value = '%s-%s' % (chost, tool)

    if value:
        conf.env[wafname] = value
        conf.msg('Will use cross-compilation %s from CHOST' \
                 % wafname, value)
def xcheck_host_envar(conf, name, wafname=None):
    wafname = wafname or name

    chost, chost_envar = get_chost_stuff(conf)

    specific = None
    if chost:
        specific = os.environ.get('%s_%s' % (chost_envar, name), None)

    if specific:
        value = Utils.to_list(specific)
        conf.env[wafname] += value
        conf.msg('Will use cross-compilation %s from %s_%s' \
                 % (name, chost_envar, name),
                 " ".join(quote(x) for x in value))
        return

    envar = os.environ.get('HOST_%s' % name, None)
    if envar is None:
        return

    value = Utils.to_list(envar) if envar != '' else [envar]

    conf.env[wafname] = value
    conf.msg('Will use cross-compilation %s from HOST_%s' \
             % (name, name),
             " ".join(quote(x) for x in value))
def write_compilation_database(ctx):
    "Write the clang compilation database as JSON"
    database_file = ctx.bldnode.make_node('compile_commands.json')
    Logs.info("Build commands will be stored in %s" % database_file.path_from(ctx.path))
    try:
        root = json.load(database_file)
    except IOError:
        root = []
    clang_db = dict((x["file"], x) for x in root)
    for task in getattr(ctx, 'clang_compilation_database_tasks', []):
        try:
            cmd = task.last_cmd
        except AttributeError:
            continue
        directory = getattr(task, 'cwd', ctx.variant_dir)
        f_node = task.inputs[0]
        filename = os.path.relpath(f_node.abspath(), directory)
        cmd = " ".join(map(quote, cmd))
        entry = {
            "directory": directory,
            "command": cmd,
            "file": filename,
        }
        clang_db[filename] = entry
    root = list(clang_db.values())
    database_file.write(json.dumps(root, indent=2))
def xcheck_prog(conf, var, tool, cross=False):
    value = os.environ.get(var, '')
    value = Utils.to_list(value)

    if not value:
        return

    conf.env[var] = value
    if cross:
        pretty = 'cross-compilation %s' % var
    else:
        pretty = var
    conf.msg('Will use %s' % pretty,
             " ".join(quote(x) for x in value))
def xcheck_envar(conf, name, wafname=None, cross=False):
    wafname = wafname or name
    value = os.environ.get(name, None)
    value = Utils.to_list(value)

    if not value:
        return

    conf.env[wafname] += value
    if cross:
        pretty = 'cross-compilation %s' % wafname
    else:
        pretty = wafname

    conf.msg('Will use %s' % pretty,
             " ".join(quote(x) for x in value))