我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.write()。
def saveCode(self, title, filter, run_assembler): qFile, qFilter = QFileDialog().getSaveFileName(self, title, HOME, filter=filter) if qFile is None or len(qFile)==0 or qFile=="": return if run_assembler: asm = self.canvas.codeWidget.parser.getCleanCodeAsByte(as_string=True) txt, cnt = assemble(asm, self.arch) if cnt < 0: self.canvas.logWidget.editor.append("Failed to compile code") return else: txt = self.canvas.codeWidget.parser.getCleanCodeAsByte(as_string=True) with open(qFile, "wb") as f: f.write(txt) self.canvas.logWidget.editor.append("Saved as '%s'" % qFile) return
def close(self): """Close the TarFile. In write-mode, two finishing zero blocks are appended to the archive. """ if self.closed: return if self.mode in "aw": self.fileobj.write(NUL * (BLOCKSIZE * 2)) self.offset += (BLOCKSIZE * 2) # fill up the end with zero-blocks # (like option -b20 for tar does) blocks, remainder = divmod(self.offset, RECORDSIZE) if remainder > 0: self.fileobj.write(NUL * (RECORDSIZE - remainder)) if not self._extfileobj: self.fileobj.close() self.closed = True
def addfile(self, tarinfo, fileobj=None): """Add the TarInfo object `tarinfo' to the archive. If `fileobj' is given, tarinfo.size bytes are read from it and added to the archive. You can create TarInfo objects using gettarinfo(). On Windows platforms, `fileobj' should always be opened with mode 'rb' to avoid irritation about the file size. """ self._check("aw") tarinfo = copy.copy(tarinfo) buf = tarinfo.tobuf(self.format, self.encoding, self.errors) self.fileobj.write(buf) self.offset += len(buf) # If there's data to follow, append it. if fileobj is not None: copyfileobj(fileobj, self.fileobj, tarinfo.size) blocks, remainder = divmod(tarinfo.size, BLOCKSIZE) if remainder > 0: self.fileobj.write(NUL * (BLOCKSIZE - remainder)) blocks += 1 self.offset += blocks * BLOCKSIZE self.members.append(tarinfo)
def copyfileobj(src, dst, length=None, exception=OSError, bufsize=None): """Copy length bytes from fileobj src to fileobj dst. If length is None, copy the entire content. """ bufsize = bufsize or 16 * 1024 if length == 0: return if length is None: shutil.copyfileobj(src, dst, bufsize) return blocks, remainder = divmod(length, bufsize) for b in range(blocks): buf = src.read(bufsize) if len(buf) < bufsize: raise exception("unexpected end of data") dst.write(buf) if remainder != 0: buf = src.read(remainder) if len(buf) < remainder: raise exception("unexpected end of data") dst.write(buf) return
def close(self): """Close the _Stream object. No operation should be done on it afterwards. """ if self.closed: return self.closed = True try: if self.mode == "w" and self.comptype != "tar": self.buf += self.cmp.flush() if self.mode == "w" and self.buf: self.fileobj.write(self.buf) self.buf = b"" if self.comptype == "gz": self.fileobj.write(struct.pack("<L", self.crc)) self.fileobj.write(struct.pack("<L", self.pos & 0xffffFFFF)) finally: if not self._extfileobj: self.fileobj.close()
def close(self): """Close the TarFile. In write-mode, two finishing zero blocks are appended to the archive. """ if self.closed: return self.closed = True try: if self.mode in ("a", "w", "x"): self.fileobj.write(NUL * (BLOCKSIZE * 2)) self.offset += (BLOCKSIZE * 2) # fill up the end with zero-blocks # (like option -b20 for tar does) blocks, remainder = divmod(self.offset, RECORDSIZE) if remainder > 0: self.fileobj.write(NUL * (RECORDSIZE - remainder)) finally: if not self._extfileobj: self.fileobj.close()
def addfile(self, tarinfo, fileobj=None): """Add the TarInfo object `tarinfo' to the archive. If `fileobj' is given, it should be a binary file, and tarinfo.size bytes are read from it and added to the archive. You can create TarInfo objects directly, or by using gettarinfo(). """ self._check("awx") tarinfo = copy.copy(tarinfo) buf = tarinfo.tobuf(self.format, self.encoding, self.errors) self.fileobj.write(buf) self.offset += len(buf) bufsize=self.copybufsize # If there's data to follow, append it. if fileobj is not None: copyfileobj(fileobj, self.fileobj, tarinfo.size, bufsize=bufsize) blocks, remainder = divmod(tarinfo.size, BLOCKSIZE) if remainder > 0: self.fileobj.write(NUL * (BLOCKSIZE - remainder)) blocks += 1 self.offset += blocks * BLOCKSIZE self.members.append(tarinfo)
def copyfileobj(src, dst, length=None): """Copy length bytes from fileobj src to fileobj dst. If length is None, copy the entire content. """ if length == 0: return if length is None: shutil.copyfileobj(src, dst) return BUFSIZE = 16 * 1024 blocks, remainder = divmod(length, BUFSIZE) for b in xrange(blocks): buf = src.read(BUFSIZE) if len(buf) < BUFSIZE: raise IOError("end of file reached") dst.write(buf) if remainder != 0: buf = src.read(remainder) if len(buf) < remainder: raise IOError("end of file reached") dst.write(buf) return
def unified_diff(filename, content2=None): # type: (str, Optional[bytes]) -> Tuple[int, Iterable[str]] """This function prints a unified diff of the contents of filename and the standard input, when used from the command line as follows: echo 123 > d.txt ; echo 456 | ./whatstyle.py --stdindiff d.txt We get this result: --- +++ @@ -1 +1 @@ -123 +456 """ use_stdin = content2 is None if content2 is None: # Read binary input stream stdin = rawstream(sys.stdin) econtent2 = bytestr(stdin.read()) else: econtent2 = content2 exit_code, diff = compute_unified_diff(filename, econtent2, lineterm='') if use_stdin: write('\n'.join(diff)) return exit_code, diff
def handle_read(self): try: while True: buf = self.recv(self.in_buffer_size) self._iobuf.write(buf) if len(buf) < self.in_buffer_size: break except socket.error as err: if ssl and isinstance(err, ssl.SSLError): if err.args[0] not in (ssl.SSL_ERROR_WANT_READ, ssl.SSL_ERROR_WANT_WRITE): self.defunct(err) return elif err.args[0] not in NONBLOCKING: self.defunct(err) return if self._iobuf.tell(): self.process_io_buffer() if not self._requests and not self.is_control_connection: self._readable = False
def _example(): if sys.platform == 'win32': shell, commands, tail = ('cmd', ('echo "hello"', 'echo "HELLO WORLD"'), '\r\n') else: shell, commands, tail = ('sh', ('ls', 'echo HELLO WORLD'), '\n') a = Popen(shell, stdin=PIPE, stdout=PIPE) sys.stdout.write(a.read_async()) sys.stdout.write(" ") for cmd in commands: a.send_all(cmd + tail) sys.stdout.write(a.read_async()) sys.stdout.write(" ") a.send_all('exit' + tail) print (a.read_async(e=0)) a.wait() ################################################################################
def _compile_module_file(template, text, filename, outputpath, module_writer): source, lexer = _compile(template, text, filename, generate_magic_comment=True) if isinstance(source, compat.text_type): source = source.encode(lexer.encoding or 'ascii') if module_writer: module_writer(source, outputpath) else: # make tempfiles in the same location as the ultimate # location. this ensures they're on the same filesystem, # avoiding synchronization issues. (dest, name) = tempfile.mkstemp(dir=os.path.dirname(outputpath)) os.write(dest, source) os.close(dest) shutil.move(name, outputpath)
def writeKeyToFile(key, filename): """Write **key** to **filename**, with ``0400`` permissions. If **filename** doesn't exist, it will be created. If it does exist already, and is writable by the owner of the current process, then it will be truncated to zero-length and overwritten. :param bytes key: A key (or some other private data) to write to **filename**. :param str filename: The path of the file to write to. :raises: Any exceptions which may occur. """ logging.info("Writing key to file: %r", filename) flags = os.O_WRONLY | os.O_TRUNC | os.O_CREAT | getattr(os, "O_BIN", 0) fd = os.open(filename, flags, 0400) os.write(fd, key) os.fsync(fd) os.close(fd)
def _open_ipipes(wds, fifos, input_pipes): """ This will attempt to open the named pipes in the set of ``fifos`` for writing, which will only succeed if the subprocess has opened them for reading already. This modifies and returns the list of write descriptors, the list of waiting fifo names, and the mapping back to input adapters. """ for fifo in fifos.copy(): try: fd = os.open(fifo, os.O_WRONLY | os.O_NONBLOCK) input_pipes[fd] = fifos.pop(fifo) wds.append(fd) except OSError as e: if e.errno != errno.ENXIO: raise e return wds, fifos, input_pipes
def __init__(self, output_spec, key, dictionary=None): """ Appends all data from a stream under a key inside a dict. Can be used to bind traditional (non-streaming) outputs to pipes when using ``run_process``. :param output_spec: The output specification. :type output_spec: dict :param key: The key to accumulate the data under. :type key: hashable :param dictionary: Dictionary to write into. If not specified, uses the output_spec. :type dictionary: dict """ super(AccumulateDictAdapter, self).__init__(output_spec) if dictionary is None: dictionary = output_spec if key not in dictionary: dictionary[key] = '' self.dictionary = dictionary self.key = key
def write(self, n=65536): """ Called when it is detected the output side of this connector is ready to write. Reads (potentially blocks) at most n bytes and writes them to the output ends of this connector. If no bytes could be read, the connector is closed. :param n The maximum number of bytes to write. :type n int :returns: The actual number of bytes written. """ buf = self.input.read(n) if buf: return self.output.write(buf) else: self.close() return 0
def read(self, n=65536): """ Called when it is detected the input side of this connector is ready to read. Reads at most n bytes and writes them to the output ends of this connector. If no bytes could be read, the connector is closed. :param n The maximum number of bytes to read. :type n int :returns: The actual number of bytes read. """ buf = self.input.read(n) if buf: self.output.write(buf) # TODO PushAdapter/Writers should return number of bytes actually # written. return len(buf) else: self.close() return 0
def write(self, buf): """ Write a chunk of data to the output stream in accordance with the chunked transfer encoding protocol. """ try: self.conn.send(hex(len(buf))[2:].encode('utf-8')) self.conn.send(b'\r\n') self.conn.send(buf) self.conn.send(b'\r\n') except Exception: resp = self.conn.getresponse() sys.stderr.write( 'Exception while sending HTTP chunk to %s, status was %s, ' 'message was:\n%s\n' % (self.output_spec['url'], resp.status, resp.read())) self.conn.close() self._closed = True raise
def exec_response_command(self, cmd, **kw): # not public yet try: tmp = None if sys.platform.startswith('win') and isinstance(cmd, list) and len(' '.join(cmd)) >= 8192: program = cmd[0] #unquoted program name, otherwise exec_command will fail cmd = [self.quote_response_command(x) for x in cmd] (fd, tmp) = tempfile.mkstemp() os.write(fd, '\r\n'.join(i.replace('\\', '\\\\') for i in cmd[1:]).encode()) os.close(fd) cmd = [program, '@' + tmp] # no return here, that's on purpose ret = self.generator.bld.exec_command(cmd, **kw) finally: if tmp: try: os.remove(tmp) except OSError: pass # anti-virus and indexers can keep the files open -_- return ret ########## stupid evil command modification: concatenate the tokens /Fx, /doc, and /x: with the next token
def exec_response_command(self, cmd, **kw): # not public yet try: tmp = None if sys.platform.startswith('win') and isinstance(cmd, list) and len(' '.join(cmd)) >= 8192: program = cmd[0] #unquoted program name, otherwise exec_command will fail cmd = [self.quote_response_command(x) for x in cmd] (fd, tmp) = tempfile.mkstemp() os.write(fd, '\r\n'.join(i.replace('\\', '\\\\') for i in cmd[1:]).encode()) os.close(fd) cmd = [program, '@' + tmp] # no return here, that's on purpose ret = super(self.__class__, self).exec_command(cmd, **kw) finally: if tmp: try: os.remove(tmp) except OSError: pass # anti-virus and indexers can keep the files open -_- return ret
def run(self): while True : writefd = [] if not self.messages.empty(): # Expects a message to contain either the string 'exit' # or a line of input in a tuple: ('input', None) message = self.messages.get() if message == 'exit': self.messages.task_done() break else: message, _encoding = message writefd = [self.master] r,w,_ = select.select([self.master], writefd, [], 0) if r: # Read when the binary has new output for us (sometimes this came from us writing) line = os.read(self.master, 1024) # Reads up to a kilobyte at once. Should this be higher/lower? self.RECV_LINE.emit(line) if w: os.write(self.master, message + "\n") self.messages.task_done()
def generate_adhoc_ssl_context(): """Generates an adhoc SSL context for the development server.""" crypto = _get_openssl_crypto_module() import tempfile import atexit cert, pkey = generate_adhoc_ssl_pair() cert_handle, cert_file = tempfile.mkstemp() pkey_handle, pkey_file = tempfile.mkstemp() atexit.register(os.remove, pkey_file) atexit.register(os.remove, cert_file) os.write(cert_handle, crypto.dump_certificate(crypto.FILETYPE_PEM, cert)) os.write(pkey_handle, crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey)) os.close(cert_handle) os.close(pkey_handle) ctx = load_ssl_context(cert_file, pkey_file) return ctx
def __init__(self, first, last=None): """'first' is a Popen object. 'last', if given, is another Popen object that is the end of the joints to 'first'. 'write' operations send data to first's stdin and 'read' operations get data from last's stdout/stderr. """ if not last: last = first self.first = first self.last = last if platform.system() == 'Windows': if not isinstance(first, Popen) or not isinstance(last, Popen): raise ValueError('argument must be asyncfile.Popen object') if first.stdin: self.stdin = first.stdin else: self.stdin = None if last.stdout: self.stdout = last.stdout else: self.stdout = None if last.stderr: self.stderr = last.stderr else: self.stderr = None else: if not isinstance(first, subprocess.Popen) or not isinstance(last, subprocess.Popen): raise ValueError('argument must be subprocess.Popen object') if first.stdin: self.stdin = AsyncFile(first.stdin) else: self.stdin = None if last.stdout: self.stdout = AsyncFile(last.stdout) else: self.stdout = None if last.stderr: self.stderr = AsyncFile(last.stderr) else: self.stderr = None
def write(self, buf, full=False, timeout=None): """Write data in buf to stdin of pipe. See 'write' method of AsyncFile for details. """ yield self.stdin.write(buf, full=full, timeout=timeout)
def log(self, message, *args): now = time.time() if args: message = message % args if self.log_ms: self.stream.write('%s.%03d %s - %s\n' % (time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(now)), 1000 * (now - int(now)), self.name, message)) else: self.stream.write('%s %s - %s\n' % (time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(now)), self.name, message))
def __init__(self, content): if not isinstance(content, str): raise Exception("Note content must be an instance " "of string, '%s' given." % type(content)) (tempfileHandler, tempfileName) = tempfile.mkstemp(suffix=".markdown") os.write(tempfileHandler, self.ENMLtoText(content)) os.close(tempfileHandler) self.content = content self.tempfile = tempfileName
def saveAsCFile(self): template = open(TEMPLATES_PATH+"/template.c", "rb").read() insns = self.canvas.codeWidget.parser.getCleanCodeAsByte(as_string=False) if sys.version_info.major == 2: title = bytes(self.arch.name) else: title = bytes(self.arch.name, encoding="utf-8") sc = b'""\n' i = 0 for insn in insns: txt, cnt = assemble(insn, self.arch) if cnt < 0: self.canvas.logWidget.editor.append("Failed to compile code") return c = b'"' + b''.join([ b'\\x%.2x'%txt[i] for i in range(len(txt)) ]) + b'"' c = c.ljust(60, b' ') c+= b'// ' + insn + b'\n' sc += b'\t' + c i += len(txt) sc += b'\t""' body = template % (title, i, sc) fd, fpath = tempfile.mkstemp(suffix=".c") os.write(fd, body) os.close(fd) self.canvas.logWidget.editor.append("Saved as '%s'" % fpath) return
def saveAsAsmFile(self): asm_fmt = open(TEMPLATES_PATH + "/template.asm", "rb").read() txt = self.canvas.codeWidget.parser.getCleanCodeAsByte(as_string=True) if sys.version_info.major == 2: title = bytes(self.arch.name) else: title = bytes(self.arch.name, encoding="utf-8") asm = asm_fmt % (title, b'\n'.join([b"\t%s"%x for x in txt.split(b'\n')])) fd, fpath = tempfile.mkstemp(suffix=".asm") os.write(fd, asm) os.close(fd) self.canvas.logWidget.editor.append("Saved as '%s'" % fpath)
def copyfileobj(src, dst, length=None): """Copy length bytes from fileobj src to fileobj dst. If length is None, copy the entire content. """ if length == 0: return if length is None: while True: buf = src.read(16*1024) if not buf: break dst.write(buf) return BUFSIZE = 16 * 1024 blocks, remainder = divmod(length, BUFSIZE) for b in range(blocks): buf = src.read(BUFSIZE) if len(buf) < BUFSIZE: raise IOError("end of file reached") dst.write(buf) if remainder != 0: buf = src.read(remainder) if len(buf) < remainder: raise IOError("end of file reached") dst.write(buf) return