The following 49 code examples, extracted from open-source Python projects, illustrate how to use pipes.quote().
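Before the project examples, here is a minimal sketch of what pipes.quote() does: it shell-escapes a string so it can be embedded in a command line as a single literal argument. On Python 3 the documented spelling is shlex.quote(); the pipes module is deprecated. The filename below is only an illustrative value.

import pipes
import subprocess

# A value containing spaces and shell metacharacters (illustrative only).
unsafe_name = "my report; rm -rf notes.txt"

# pipes.quote() wraps the string in single quotes (escaping as needed) so the
# shell sees it as one argument instead of splitting or interpreting it.
cmd = "ls -l %s" % pipes.quote(unsafe_name)
print(cmd)  # ls -l 'my report; rm -rf notes.txt'

# Typical use: building a command string for shell=True without word-splitting
# or injection from untrusted input.
subprocess.call(cmd, shell=True)
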
def get_speech(self, phrase):
    getinfo_url = 'http://www.peiyinge.com/make/getSynthSign'
    voice_baseurl = 'http://proxy.peiyinge.com:17063/synth?ts='
    data = {
        'content': phrase.encode('utf8')
    }
    result_info = requests.post(getinfo_url, data=data).json()
    content = urllib.quote(phrase.encode('utf8'))
    ts = result_info['ts']
    sign = result_info['sign']
    voice_url = voice_baseurl + ts + '&sign=' + sign + \
        '&vid=' + self.vid + '&volume=&speed=0&content=' + content
    r = requests.get(voice_url)
    with tempfile.NamedTemporaryFile(suffix='.mp3', delete=False) as f:
        f.write(r.content)
        tmpfile = f.name
    return tmpfile

def _RunGdb(device, package_name, output_directory, target_cpu, extra_args,
            verbose):
    gdb_script_path = os.path.dirname(__file__) + '/adb_gdb'
    cmd = [
        gdb_script_path,
        '--package-name=%s' % package_name,
        '--output-directory=%s' % output_directory,
        '--adb=%s' % adb_wrapper.AdbWrapper.GetAdbPath(),
        '--device=%s' % device.serial,
        # Use one lib dir per device so that changing between devices does require
        # refetching the device libs.
        '--pull-libs-dir=/tmp/adb-gdb-libs-%s' % device.serial,
    ]
    # Enable verbose output of adb_gdb if it's set for this script.
    if verbose:
        cmd.append('--verbose')
    if target_cpu:
        cmd.append('--target-arch=%s' % _TargetCpuToTargetArch(target_cpu))
    cmd.extend(extra_args)
    logging.warning('Running: %s', ' '.join(pipes.quote(x) for x in cmd))
    print _Colorize('YELLOW', 'All subsequent output is from adb_gdb script.')
    os.execv(gdb_script_path, cmd)

def upload(self, step, buildIdFile, tgzFile):
    # only upload if requested
    if not self.canUploadJenkins():
        return ""

    # upload with curl if file does not exist yet on server
    return "\n" + textwrap.dedent("""\
        # upload artifact
        cd $WORKSPACE
        BOB_UPLOAD_BID="$(hexdump -ve '/1 "%02x"' {BUILDID}){GEN}"
        BOB_UPLOAD_URL="{URL}/${{BOB_UPLOAD_BID:0:2}}/${{BOB_UPLOAD_BID:2:2}}/${{BOB_UPLOAD_BID:4}}{SUFFIX}"
        if ! curl --output /dev/null --silent --head --fail "$BOB_UPLOAD_URL" ; then
            BOB_UPLOAD_RSP=$(curl -sSgf -w '%{{http_code}}' -H 'If-None-Match: *' -T {RESULT} "$BOB_UPLOAD_URL" || true)
            if [[ $BOB_UPLOAD_RSP != 2?? && $BOB_UPLOAD_RSP != 412 ]]; then
                echo "Upload failed with code $BOB_UPLOAD_RSP"{FAIL}
            fi
        fi""".format(URL=self.__url.geturl(), BUILDID=quote(buildIdFile),
                     RESULT=quote(tgzFile),
                     FAIL="" if self._ignoreErrors() else "; exit 1",
                     GEN=ARCHIVE_GENERATION, SUFFIX=ARTIFACT_SUFFIX))

def uploadJenkinsLiveBuildId(self, step, liveBuildId, buildId):
    # only upload if requested
    if not self.canUploadJenkins():
        return ""

    # upload with curl if file does not exist yet on server
    return "\n" + textwrap.dedent("""\
        # upload live build-id
        cd $WORKSPACE
        BOB_UPLOAD_BID="$(hexdump -ve '/1 "%02x"' {LIVEBUILDID}){GEN}"
        BOB_UPLOAD_URL="{URL}/${{BOB_UPLOAD_BID:0:2}}/${{BOB_UPLOAD_BID:2:2}}/${{BOB_UPLOAD_BID:4}}{SUFFIX}"
        BOB_UPLOAD_RSP=$(curl -sSgf -w '%{{http_code}}' -H 'If-None-Match: *' -T {BUILDID} "$BOB_UPLOAD_URL" || true)
        if [[ $BOB_UPLOAD_RSP != 2?? && $BOB_UPLOAD_RSP != 412 ]]; then
            echo "Upload failed with code $BOB_UPLOAD_RSP"{FAIL}
        fi
        """.format(URL=self.__url.geturl(), LIVEBUILDID=quote(liveBuildId),
                   BUILDID=quote(buildId),
                   FAIL="" if self._ignoreErrors() else "; exit 1",
                   GEN=ARCHIVE_GENERATION, SUFFIX=BUILDID_SUFFIX))

def jenkinsNamePersister(jenkins, wrapFmt, uuid):

    def persist(step, props):
        ret = BobState().getJenkinsByNameDirectory(
            jenkins, wrapFmt(step, props), step.getVariantId())
        if uuid:
            ret = ret + "-" + uuid
        return ret

    def fmt(step, mode, props):
        if mode == 'workspace':
            return persist(step, props)
        else:
            assert mode == 'exec'
            if step.getSandbox() is None:
                return os.path.join("$PWD", quote(persist(step, props)))
            else:
                return os.path.join("/bob", asHexStr(step.getVariantId()), "workspace")

    return fmt

def do_osx_install(srcdir, targetdir):
    if os.path.exists(targetdir):
        print 'Target dir %s already exists! Removing...'
        shutil.rmtree(targetdir)
    install_script = os.popen('find ' + srcdir + ' -iname install.sh').read().strip()
    print 'DBG install_script:', install_script
    os.popen('chmod +x "%s"' % install_script)
    cmd_install = '%s %s %s' % (pipes.quote(install_script), srcdir, targetdir)
    print 'DBG cmd: "%s"' % cmd_install
    cmd_chmod_chromium = 'find %s -name Chromium -exec chmod +x {} \;' % (targetdir)
    cmd_chmod_chromium_helper = 'find %s -name Chromium\ Helper -exec chmod +x {} \;' % (targetdir)
    for cmd in [cmd_install, cmd_chmod_chromium, cmd_chmod_chromium_helper]:
        proc = subprocess.Popen(cmd, shell=True)
        proc.wait()
        if proc.returncode:
            print "returncode " + str(proc.returncode)

def xcheck_envar(conf, name, wafname=None, cross=False):
    wafname = wafname or name
    envar = os.environ.get(name, None)

    if envar is None:
        return
    value = Utils.to_list(envar) if envar != '' else [envar]

    conf.env[wafname] = value
    if cross:
        pretty = 'cross-compilation %s' % wafname
    else:
        pretty = wafname
    conf.msg('Will use %s' % pretty, " ".join(quote(x) for x in value))

def run_optical_flow(vid_item, dev_id=0):
    vid_path = vid_item[0]
    vid_id = vid_item[1]
    vid_name = vid_path.split('/')[-1].split('.')[0]
    out_full_path = os.path.join(out_path, vid_name)
    try:
        os.mkdir(out_full_path)
    except OSError:
        pass

    current = current_process()
    dev_id = (int(current._identity[0]) - 1) % NUM_GPU
    image_path = '{}/img'.format(out_full_path)
    flow_x_path = '{}/flow_x'.format(out_full_path)
    flow_y_path = '{}/flow_y'.format(out_full_path)

    cmd = os.path.join(df_path + 'build/extract_gpu') + ' -f {} -x {} -y {} -i {} -b 20 -t 1 -d {} -s 1 -o {} -w {} -h {}'.format(
        quote(vid_path), quote(flow_x_path), quote(flow_y_path), quote(image_path),
        dev_id, out_format, new_size[0], new_size[1])

    os.system(cmd)
    print '{} {} done'.format(vid_id, vid_name)
    sys.stdout.flush()
    return True

def main():
    temp = mkdtemp(prefix='pipstrap-')
    try:
        downloads = [hashed_download(url, temp, digest)
                     for url, digest in PACKAGES]
        check_output('pip install --no-index --no-deps -U ' +
                     ' '.join(quote(d) for d in downloads),
                     shell=True)
    except HashError as exc:
        print(exc)
    except Exception:
        rmtree(temp)
        raise
    else:
        rmtree(temp)
        return 0
    return 1

def _debug_cmd(self, args, exe=None):
    if not self.params.get('verbose', False):
        return

    str_args = [decodeArgument(a) for a in args]

    if exe is None:
        exe = os.path.basename(str_args[0])

    try:
        import pipes
        shell_quote = lambda args: ' '.join(map(pipes.quote, str_args))
    except ImportError:
        shell_quote = repr
    self.to_screen('[debug] %s command line: %s' % (
        exe, shell_quote(str_args)))

def _set_delayed(self):
    """Delayed change of iptables rules (postprocess method).

    After deleting/creation of pod we should change iptables rules, but
    we don't know if the operation actually have been performed. So, wait
    for 2 minutes and call postprocess method as superuser (via suid
    binary 'suidwrap').

    """
    token = getattr(self, 'token', None)
    if not token or token == 'None':
        data = self.query.get(AUTH_TOKEN_PATH)
        token = data['token']
    try:
        fmt = 'echo /usr/libexec/suidwrap "{0}" {1} ' \
              '|at now + 2 minute > /dev/null 2>&1'
        subprocess.check_call([fmt.format(token, quote(self.name))],
                              shell=True)
    except (KeyError, TypeError, subprocess.CalledProcessError):
        return

def search(self, query):
    if self.grep_command is not None:
        command = self.grep_command.format(pipes.quote(query))
    elif self.show_in_view:
        command = grepFormatStr().format(
            grepPath(self.window), pipes.quote(query)
        )
    else:
        # we need quick results
        command = quickGrepFormatStr().format(
            grepPath(self.window), pipes.quote(query)
        )
    sublime.status_message("grepping {0} ...".format(pipes.quote(query)))
    output, _ = run_bash_for_output(command)
    lines = output.split('\n')
    self.show_results(query, lines)

def files_in_archive(self, force_refresh=False):
    if self._files_in_archive and not force_refresh:
        return self._files_in_archive

    cmd = [ self.cmd_path, '--list', '-C', self.dest ]
    if self.zipflag:
        cmd.append(self.zipflag)
    if self.opts:
        cmd.extend([ '--show-transformed-names' ] + self.opts)
    if self.excludes:
        cmd.extend([ '--exclude=' + quote(f) for f in self.excludes ])
    cmd.extend([ '-f', self.src ])
    rc, out, err = self.module.run_command(cmd, cwd=self.dest, environ_update=dict(LANG='C', LC_ALL='C', LC_MESSAGES='C'))
    if rc != 0:
        raise UnarchiveError('Unable to list files in the archive')

    for filename in out.splitlines():
        # Compensate for locale-related problems in gtar output (octal unicode representation) #11348
        # filename = filename.decode('string_escape')
        filename = codecs.escape_decode(filename)[0]
        if filename and filename not in self.excludes:
            self._files_in_archive.append(to_native(filename))
    return self._files_in_archive

def unarchive(self):
    cmd = [ self.cmd_path, '--extract', '-C', self.dest ]
    if self.zipflag:
        cmd.append(self.zipflag)
    if self.opts:
        cmd.extend([ '--show-transformed-names' ] + self.opts)
    if self.file_args['owner']:
        cmd.append('--owner=' + quote(self.file_args['owner']))
    if self.file_args['group']:
        cmd.append('--group=' + quote(self.file_args['group']))
    if self.module.params['keep_newer']:
        cmd.append('--keep-newer-files')
    if self.excludes:
        cmd.extend([ '--exclude=' + quote(f) for f in self.excludes ])
    cmd.extend([ '-f', self.src ])
    rc, out, err = self.module.run_command(cmd, cwd=self.dest, environ_update=dict(LANG='C', LC_ALL='C', LC_MESSAGES='C'))
    return dict(cmd=cmd, rc=rc, out=out, err=err)

def _read_user_execute(self):
    """
    Returns the command line for reading a crontab
    """
    user = ''
    if self.user:
        if platform.system() == 'SunOS':
            return "su %s -c '%s -l'" % (pipes.quote(self.user), pipes.quote(CRONCMD))
        elif platform.system() == 'AIX':
            return "%s -l %s" % (pipes.quote(CRONCMD), pipes.quote(self.user))
        elif platform.system() == 'HP-UX':
            return "%s %s %s" % (CRONCMD, '-l', pipes.quote(self.user))
        else:
            user = '-u %s' % pipes.quote(self.user)
    return "%s %s %s" % (CRONCMD, user, '-l')

def query_package(module, port_path, name, state="present"):
    """ Returns whether a package is installed or not. """

    if state == "present":
        rc, out, err = module.run_command("%s installed | grep -q ^.*%s" % (pipes.quote(port_path), pipes.quote(name)), use_unsafe_shell=True)
        if rc == 0:
            return True

        return False

    elif state == "active":
        rc, out, err = module.run_command("%s installed %s | grep -q active" % (pipes.quote(port_path), pipes.quote(name)), use_unsafe_shell=True)
        if rc == 0:
            return True

        return False

def build_module_command(self, env_string, shebang, cmd, arg_path=None, rm_tmp=None):
    # don't quote the cmd if it's an empty string, because this will break pipelining mode
    if cmd.strip() != '':
        cmd = pipes.quote(cmd)
    cmd_parts = []
    if shebang:
        shebang = shebang.replace("#!", "").strip()
    else:
        shebang = ""
    cmd_parts.extend([env_string.strip(), shebang, cmd])
    if arg_path is not None:
        cmd_parts.append(arg_path)
    new_cmd = " ".join(cmd_parts)
    if rm_tmp:
        new_cmd = '%s; rm -rf "%s" %s' % (new_cmd, rm_tmp, self._SHELL_REDIRECT_ALLNULL)
    return new_cmd

def put_file(self, in_path, out_path):
    ''' transfer a file from local to lxc '''
    super(Connection, self).put_file(in_path, out_path)
    display.vvv("PUT %s TO %s" % (in_path, out_path), host=self.lxc)

    out_path = pipes.quote(self._prefix_login_path(out_path))
    try:
        with open(to_bytes(in_path, errors='surrogate_or_strict'), 'rb') as in_file:
            try:
                p = self._buffered_exec_command('dd of=%s bs=%s' % (out_path, BUFSIZE), stdin=in_file)
            except OSError:
                raise AnsibleError("chroot connection requires dd command in the chroot")
            try:
                stdout, stderr = p.communicate()
            except:
                traceback.print_exc()
                raise AnsibleError("failed to transfer file %s to %s" % (in_path, out_path))
            if p.returncode != 0:
                raise AnsibleError("failed to transfer file %s to %s:\n%s\n%s" % (in_path, out_path, stdout, stderr))
    except IOError:
        raise AnsibleError("file or module does not exist at: %s" % in_path)

def fetch_file(self, in_path, out_path):
    ''' fetch a file from lxc to local '''
    super(Connection, self).fetch_file(in_path, out_path)
    display.vvv("FETCH %s TO %s" % (in_path, out_path), host=self.lxc)

    in_path = pipes.quote(self._prefix_login_path(in_path))
    try:
        p = self._buffered_exec_command('dd if=%s bs=%s' % (in_path, BUFSIZE))
    except OSError:
        raise AnsibleError("chroot connection requires dd command in the chroot")

    with open(to_bytes(out_path, errors='surrogate_or_strict'), 'wb+') as out_file:
        try:
            chunk = p.stdout.read(BUFSIZE)
            while chunk:
                out_file.write(chunk)
                chunk = p.stdout.read(BUFSIZE)
        except:
            traceback.print_exc()
            raise AnsibleError("failed to transfer file %s to %s" % (in_path, out_path))
        stdout, stderr = p.communicate()
        if p.returncode != 0:
            raise AnsibleError("failed to transfer file %s to %s:\n%s\n%s" % (in_path, out_path, stdout, stderr))

def put_file(self, in_path, out_path):
    ''' transfer a file from local to chroot '''
    super(Connection, self).put_file(in_path, out_path)
    display.vvv("PUT %s TO %s" % (in_path, out_path), host=self.chroot)

    out_path = pipes.quote(self._prefix_login_path(out_path))
    try:
        with open(to_bytes(in_path, errors='surrogate_or_strict'), 'rb') as in_file:
            try:
                p = self._buffered_exec_command('dd of=%s bs=%s' % (out_path, BUFSIZE), stdin=in_file)
            except OSError:
                raise AnsibleError("chroot connection requires dd command in the chroot")
            try:
                stdout, stderr = p.communicate()
            except:
                traceback.print_exc()
                raise AnsibleError("failed to transfer file %s to %s" % (in_path, out_path))
            if p.returncode != 0:
                raise AnsibleError("failed to transfer file %s to %s:\n%s\n%s" % (in_path, out_path, stdout, stderr))
    except IOError:
        raise AnsibleError("file or module does not exist at: %s" % in_path)

def put_file(self, in_path, out_path):
    ''' transfer a file from local to jail '''
    super(Connection, self).put_file(in_path, out_path)
    display.vvv("PUT %s TO %s" % (in_path, out_path), host=self.jail)

    out_path = pipes.quote(self._prefix_login_path(out_path))
    try:
        with open(to_bytes(in_path, errors='surrogate_or_strict'), 'rb') as in_file:
            try:
                p = self._buffered_exec_command('dd of=%s bs=%s' % (out_path, BUFSIZE), stdin=in_file)
            except OSError:
                raise AnsibleError("jail connection requires dd command in the jail")
            try:
                stdout, stderr = p.communicate()
            except:
                traceback.print_exc()
                raise AnsibleError("failed to transfer file %s to %s" % (in_path, out_path))
            if p.returncode != 0:
                raise AnsibleError("failed to transfer file %s to %s:\n%s\n%s" % (in_path, out_path, stdout, stderr))
    except IOError:
        raise AnsibleError("file or module does not exist at: %s" % in_path)

def fetch_file(self, in_path, out_path):
    ''' fetch a file from jail to local '''
    super(Connection, self).fetch_file(in_path, out_path)
    display.vvv("FETCH %s TO %s" % (in_path, out_path), host=self.jail)

    in_path = pipes.quote(self._prefix_login_path(in_path))
    try:
        p = self._buffered_exec_command('dd if=%s bs=%s' % (in_path, BUFSIZE))
    except OSError:
        raise AnsibleError("jail connection requires dd command in the jail")

    with open(to_bytes(out_path, errors='surrogate_or_strict'), 'wb+') as out_file:
        try:
            chunk = p.stdout.read(BUFSIZE)
            while chunk:
                out_file.write(chunk)
                chunk = p.stdout.read(BUFSIZE)
        except:
            traceback.print_exc()
            raise AnsibleError("failed to transfer file %s to %s" % (in_path, out_path))
        stdout, stderr = p.communicate()
        if p.returncode != 0:
            raise AnsibleError("failed to transfer file %s to %s:\n%s\n%s" % (in_path, out_path, stdout, stderr))

def put_file(self, in_path, out_path):
    ''' transfer a file from local to zone '''
    super(Connection, self).put_file(in_path, out_path)
    display.vvv("PUT %s TO %s" % (in_path, out_path), host=self.zone)

    out_path = pipes.quote(self._prefix_login_path(out_path))
    try:
        with open(in_path, 'rb') as in_file:
            try:
                p = self._buffered_exec_command('dd of=%s bs=%s' % (out_path, BUFSIZE), stdin=in_file)
            except OSError:
                raise AnsibleError("jail connection requires dd command in the jail")
            try:
                stdout, stderr = p.communicate()
            except:
                traceback.print_exc()
                raise AnsibleError("failed to transfer file %s to %s" % (in_path, out_path))
            if p.returncode != 0:
                raise AnsibleError("failed to transfer file %s to %s:\n%s\n%s" % (in_path, out_path, stdout, stderr))
    except IOError:
        raise AnsibleError("file or module does not exist at: %s" % in_path)

def run(*args, **kw):
    if len(args) == 1 and isinstance(args[0], type('')):
        cmd = split(args[0])
    else:
        cmd = args
    print(' '.join(pipes.quote(x) for x in cmd))
    sys.stdout.flush()
    env = current_env(library_path=kw.get('library_path'))
    try:
        p = subprocess.Popen(cmd, env=env, cwd=kw.get('cwd'))
    except EnvironmentError as err:
        if err.errno == errno.ENOENT:
            raise SystemExit('Could not find the program: %s' % cmd[0])
        raise
    rc = p.wait()
    if kw.get('no_check'):
        return rc
    if rc != 0:
        print('The following command failed, with return code:', rc)
        print(' '.join(pipes.quote(x) for x in cmd))
        print('Dropping you into a shell')
        sys.stdout.flush()
        run_shell(library_path=kw.get('library_path'))
        raise SystemExit(1)

def gitpkgv_revision(self, ud, d, name):
    """
    Return a sortable revision number by counting commits in the history
    Based on gitpkgv.bblass in meta-openembedded
    """
    rev = self._build_revision(ud, d, name)
    localpath = ud.localpath
    rev_file = os.path.join(localpath, "oe-gitpkgv_" + rev)
    if not os.path.exists(localpath):
        commits = None
    else:
        if not os.path.exists(rev_file) or not os.path.getsize(rev_file):
            from pipes import quote
            commits = bb.fetch2.runfetchcmd(
                "git rev-list %s -- | wc -l" % (quote(rev)),
                d, quiet=True).strip().lstrip('0')
            if commits:
                open(rev_file, "w").write("%d\n" % int(commits))
        else:
            commits = open(rev_file, "r").readline(128).strip()
    if commits:
        return False, "%s+%s" % (commits, rev[:7])
    else:
        return True, str(rev)

def opened_files(path, excludes):
    files = []

    try:
        process = os.popen('lsof -wFn +D %s | tail -n +2 | cut -c2-' % cmd_quote(path))
        data = process.read()
        process.close()
        for item in data.split('\n'):
            if not item or len(item) <= 2 or os.path.isdir(item) or item.isdigit() or file_excluded(item, excludes):
                continue
            files.append(item)
        return files

    except Exception as ex:
        logger.exception("Exception checking %r: ", path)
        return None

def rclone_move_command(local, remote, transfers, checkers, bwlimit, excludes, chunk_size, dry_run):
    upload_cmd = 'rclone move %s %s' \
                 ' --delete-after' \
                 ' --no-traverse' \
                 ' --stats=60s' \
                 ' -v' \
                 ' --transfers=%d' \
                 ' --checkers=%d' \
                 ' --drive-chunk-size=%s' % \
                 (cmd_quote(local), cmd_quote(remote), transfers, checkers, chunk_size)

    if bwlimit and len(bwlimit):
        upload_cmd += ' --bwlimit="%s"' % bwlimit

    for item in excludes:
        upload_cmd += ' --exclude="%s"' % item

    if dry_run:
        upload_cmd += ' --dry-run'

    return upload_cmd

def remove_empty_directories(config, force_dry_run=False):
    open_files = opened_files(config['local_folder'], config['lsof_excludes'])
    if not len(open_files):
        clearing = False
        for dir, depth in config['rclone_remove_empty_on_upload'].items():
            if os.path.exists(dir):
                clearing = True
                logger.debug("Removing empty directories from %r with mindepth %r", dir, depth)
                cmd = 'find %s -mindepth %d -type d -empty' % (cmd_quote(dir), depth)
                if not config['dry_run'] and not force_dry_run:
                    cmd += ' -delete'
                run_command(cmd)
        if clearing:
            logger.debug("Finished clearing empty directories")
    else:
        logger.debug("Skipped removing empty directories because %d files are currently open: %r",
                     len(open_files), open_files)


############################################################
# CONFIG STUFF
############################################################

def test_activate(monkeypatch):
    can_connect_args = _monkeypatch_can_connect_to_socket_to_succeed(monkeypatch)

    def activate_redis_url(dirname):
        project_dir_disable_dedicated_env(dirname)
        result = activate(dirname, UI_MODE_TEXT_ASSUME_YES_DEVELOPMENT, conda_environment=None, command_name=None)
        assert can_connect_args['port'] == 6379
        assert result is not None
        if platform.system() == 'Windows':
            result = [line for line in result if not line.startswith("export PATH")]
            print("activate changed PATH on Windows and ideally it would not.")
        if len(result) > 2:
            import os
            print("os.environ=" + repr(os.environ))
            print("result=" + repr(result))
        assert ['export PROJECT_DIR=' + quote(dirname), 'export REDIS_URL=redis://localhost:6379'] == result

    with_directory_contents_completing_project_file(
        {DEFAULT_PROJECT_FILENAME: """
services:
  REDIS_URL: redis
"""}, activate_redis_url)

def test_activate_quoting(monkeypatch):
    def activate_foo(dirname):
        project_dir_disable_dedicated_env(dirname)
        result = activate(dirname, UI_MODE_TEXT_ASSUME_YES_DEVELOPMENT, conda_environment=None, command_name=None)
        assert result is not None
        if platform.system() == 'Windows':
            result = [line for line in result if not line.startswith("export PATH")]
            print("activate changed PATH on Windows and ideally it would not.")
        assert ["export FOO='$! boo'", 'export PROJECT_DIR=' + quote(dirname)] == result

    with_directory_contents_completing_project_file(
        {
            DEFAULT_PROJECT_FILENAME: """
variables:
  FOO: {}
""",
            DEFAULT_LOCAL_STATE_FILENAME: """
variables:
  FOO: $! boo
"""
        }, activate_foo)

def activate(dirname, ui_mode, conda_environment, command_name):
    """Prepare project and return lines to be sourced.

    Future direction: should also activate the proper conda env.

    Returns:
        None on failure or a list of lines to print.
    """
    project = load_project(dirname)
    result = prepare_with_ui_mode_printing_errors(project,
                                                  ui_mode=ui_mode,
                                                  env_spec_name=conda_environment,
                                                  command_name=command_name)
    if result.failed:
        return None

    exports = []
    # sort so we have deterministic output order for tests
    sorted_keys = list(result.environ.keys())
    sorted_keys.sort()
    for key in sorted_keys:
        value = result.environ[key]
        if key not in os.environ or os.environ[key] != value:
            exports.append("export {key}={value}".format(key=key, value=quote(value)))
    return exports

def is_available(cls):
    if (super(cls, cls).is_available() and
       diagnose.check_executable('text2wave') and
       diagnose.check_executable('festival')):

        logger = logging.getLogger(__name__)
        cmd = ['festival', '--pipe']
        with tempfile.SpooledTemporaryFile() as out_f:
            with tempfile.SpooledTemporaryFile() as in_f:
                logger.debug('Executing %s', ' '.join([pipes.quote(arg)
                                                       for arg in cmd]))
                subprocess.call(cmd, stdin=in_f, stdout=out_f, stderr=out_f)
                out_f.seek(0)
                output = out_f.read().strip()
                if output:
                    logger.debug("Output was: '%s'", output)
                return ('No default voice found' not in output)
    return False

def say(self, phrase, *args): self._logger.debug("Saying '%s' with '%s'", phrase, self.SLUG) cmd = ['text2wave'] with tempfile.NamedTemporaryFile(suffix='.wav') as out_f: with tempfile.SpooledTemporaryFile() as in_f: in_f.write(phrase) in_f.seek(0) with tempfile.SpooledTemporaryFile() as err_f: self._logger.debug('Executing %s', ' '.join([pipes.quote(arg) for arg in cmd])) subprocess.call(cmd, stdin=in_f, stdout=out_f, stderr=err_f) err_f.seek(0) output = err_f.read() if output: self._logger.debug("Output was: '%s'", output) self.play(out_f.name)
def say(self, phrase, *args): self._logger.debug("Saying '%s' with '%s'", phrase, self.SLUG) cmd = ['flite'] if self.voice: cmd.extend(['-voice', self.voice]) cmd.extend(['-t', phrase]) with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as f: fname = f.name cmd.append(fname) with tempfile.SpooledTemporaryFile() as out_f: self._logger.debug('Executing %s', ' '.join([pipes.quote(arg) for arg in cmd])) subprocess.call(cmd, stdout=out_f, stderr=out_f) out_f.seek(0) output = out_f.read().strip() if output: self._logger.debug("Output was: '%s'", output) self.play(fname) os.remove(fname)
def say(self, phrase, *args): self._logger.debug("Saying '%s' with '%s'", phrase, self.SLUG) with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as f: fname = f.name cmd = ['espeak', '-v', self.voice, '-p', self.pitch_adjustment, '-s', self.words_per_minute, '-w', fname, phrase] cmd = [str(x) for x in cmd] self._logger.debug('Executing %s', ' '.join([pipes.quote(arg) for arg in cmd])) with tempfile.TemporaryFile() as f: subprocess.call(cmd, stdout=f, stderr=f) f.seek(0) output = f.read() if output: self._logger.debug("Output was: '%s'", output) self.play(fname) os.remove(fname)
def say(self, phrase, *args): self._logger.debug("Saying '%s' with '%s'", phrase, self.SLUG) with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as f: fname = f.name cmd = ['pico2wave', '--wave', fname] if self.language not in self.languages: raise ValueError("Language '%s' not supported by '%s'", self.language, self.SLUG) cmd.extend(['-l', self.language]) cmd.append(phrase) self._logger.debug('Executing %s', ' '.join([pipes.quote(arg) for arg in cmd])) with tempfile.TemporaryFile() as f: subprocess.call(cmd, stdout=f, stderr=f) f.seek(0) output = f.read() if output: self._logger.debug("Output was: '%s'", output) self.play(fname) os.remove(fname)
def get_command(self, file, **options):
    # on darwin open returns immediately resulting in the temp
    # file removal while app is opening
    command = "open -a /Applications/Preview.app"
    command = "(%s %s; sleep 20; rm -f %s)&" % (command, quote(file),
                                                quote(file))
    return command

def show_file(self, file, **options):
    command, executable = self.get_command_ex(file, **options)
    command = "(%s %s; rm -f %s)&" % (command, quote(file), quote(file))
    os.system(command)
    return 1

# implementations

def get_command_ex(self, file, title=None, **options):
    # note: xv is pretty outdated. most modern systems have
    # imagemagick's display command instead.
    command = executable = "xv"
    if title:
        command += " -name %s" % quote(title)
    return command, executable

def __uploadJenkins(self, step, buildIdFile, resultFile, suffix):
    """Generate upload shell script.

    We cannot simply copy the artifact to the final location as this is not
    atomic. Instead we create a temporary file at the repository root, copy
    the artifact there and hard-link the temporary file at the final
    location. If the link fails it is usually caused by a concurrent upload.
    Test that the artifact is readable in this case to distinguish it from
    other fatal errors.
    """
    if not self.canUploadJenkins():
        return ""

    return "\n" + textwrap.dedent("""\
        # upload artifact
        cd $WORKSPACE
        BOB_UPLOAD_BID="$(hexdump -ve '/1 "%02x"' {BUILDID}){GEN}"
        BOB_UPLOAD_FILE="{DIR}/${{BOB_UPLOAD_BID:0:2}}/${{BOB_UPLOAD_BID:2:2}}/${{BOB_UPLOAD_BID:4}}{SUFFIX}"
        if [[ ! -e ${{BOB_UPLOAD_FILE}} ]] ; then
            (
                set -eE
                T="$(mktemp -p {DIR})"
                trap 'rm -f $T' EXIT
                cp {RESULT} "$T"
                mkdir -p "${{BOB_UPLOAD_FILE%/*}}"
                if ! ln -T "$T" "$BOB_UPLOAD_FILE" ; then
                    [[ -r "$BOB_UPLOAD_FILE" ]] || exit 2
                fi
            ){FIXUP}
        fi""".format(DIR=self.__basePath, BUILDID=quote(buildIdFile),
                     RESULT=quote(resultFile),
                     FIXUP=" || echo Upload failed: $?" if self._ignoreErrors() else "",
                     GEN=ARCHIVE_GENERATION, SUFFIX=suffix))

def download(self, step, buildIdFile, tgzFile):
    if not self.canDownloadJenkins():
        return ""

    return "\n" + textwrap.dedent("""\
        if [[ ! -e {RESULT} ]] ; then
            BOB_DOWNLOAD_BID="$(hexdump -ve '/1 "%02x"' {BUILDID}){GEN}"
            BOB_DOWNLOAD_FILE="{DIR}/${{BOB_DOWNLOAD_BID:0:2}}/${{BOB_DOWNLOAD_BID:2:2}}/${{BOB_DOWNLOAD_BID:4}}{SUFFIX}"
            cp "$BOB_DOWNLOAD_FILE" {RESULT} || echo Download failed: $?
        fi
        """.format(DIR=self.__basePath, BUILDID=quote(buildIdFile),
                   RESULT=quote(tgzFile), GEN=ARCHIVE_GENERATION,
                   SUFFIX=ARTIFACT_SUFFIX))

def download(self, step, buildIdFile, tgzFile):
    # only download if requested
    if not self.canDownloadJenkins():
        return ""

    return "\n" + textwrap.dedent("""\
        if [[ ! -e {RESULT} ]] ; then
            BOB_DOWNLOAD_BID="$(hexdump -ve '/1 "%02x"' {BUILDID}){GEN}"
            BOB_DOWNLOAD_URL="{URL}/${{BOB_DOWNLOAD_BID:0:2}}/${{BOB_DOWNLOAD_BID:2:2}}/${{BOB_DOWNLOAD_BID:4}}{SUFFIX}"
            curl -sSg --fail -o {RESULT} "$BOB_DOWNLOAD_URL" || echo Download failed: $?
        fi
        """.format(URL=self.__url.geturl(), BUILDID=quote(buildIdFile),
                   RESULT=quote(tgzFile), GEN=ARCHIVE_GENERATION,
                   SUFFIX=ARTIFACT_SUFFIX))

def download(self, step, buildIdFile, tgzFile):
    # only download if requested
    if not self.canDownloadJenkins():
        return ""

    return """
if [[ ! -e {RESULT} ]] ; then
    BOB_DOWNLOAD_BID="$(hexdump -ve '/1 "%02x"' {BUILDID}){GEN}"
    BOB_LOCAL_ARTIFACT={RESULT}
    BOB_REMOTE_ARTIFACT="${{BOB_DOWNLOAD_BID:0:2}}/${{BOB_DOWNLOAD_BID:2:2}}/${{BOB_DOWNLOAD_BID:4}}{SUFFIX}"
    {CMD}
fi
""".format(CMD=self.__downloadCmd, BUILDID=quote(buildIdFile),
           RESULT=quote(tgzFile), GEN=ARCHIVE_GENERATION,
           SUFFIX=ARTIFACT_SUFFIX)

def __getitem__(self, item):
    mode = item[0]
    item = item[1:]
    content = []
    try:
        paths = sorted(glob(os.path.join(self.baseDir, item)))
        if not paths:
            raise ParseError("No files matched in include pattern '{}'!"
                             .format(item))
        for path in paths:
            content.append(self.fileLoader(path))
    except OSError as e:
        raise ParseError("Error including '" + item + "': " + str(e))
    content = b''.join(content)

    self.incDigests.append(asHexStr(hashlib.sha1(content).digest()))
    if mode == '<':
        var = "_{}{}".format(self.varBase, self.count)
        self.count += 1
        self.prolog.extend([
            "{VAR}=$(mktemp)".format(VAR=var),
            "_BOB_TMP_CLEANUP+=( ${VAR} )".format(VAR=var),
            "base64 -d > ${VAR} <<EOF".format(VAR=var)])
        self.prolog.extend(sliceString(b64encode(content).decode("ascii"), 76))
        self.prolog.append("EOF")
        ret = "${" + var + "}"
    else:
        assert mode == "'"
        ret = quote(content.decode('utf8'))

    return ret