我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.close()。
def main():
    """Run the code described by the data the parent sends over a pipe.

    The last command-line argument is the inherited OS handle.  The
    preparation data is loaded from it first, then the object whose
    ``_bootstrap()`` result becomes the exit code.
    """
    assert is_forking(sys.argv)

    pipe_handle = int(sys.argv[-1])
    pipe_fd = msvcrt.open_osfhandle(pipe_handle, os.O_RDONLY)
    from_parent = os.fdopen(pipe_fd, 'rb')

    # Both loads happen while the current process is flagged as inheriting.
    process.current_process()._inheriting = True
    prep_data = load(from_parent)
    prepare(prep_data)
    process_obj = load(from_parent)
    process.current_process()._inheriting = False

    from_parent.close()

    exit(process_obj._bootstrap())
def _dump(self, file=None, format=None):
    """Write the image to *file* and return the file name used.

    When *file* is not given a temporary file is created (with a
    ``.<format>`` suffix if *format* is set).  With no format, or "PPM",
    the image core's ``save_ppm`` is used; otherwise ``self.save`` is
    called, after appending ``.<format>`` to a name that lacks it.
    """
    import tempfile

    suffix = '.' + format if format else ''

    if not file:
        handle, file = tempfile.mkstemp(suffix)
        # Only the name is needed; release the descriptor right away.
        os.close(handle)

    self.load()

    if format and format != "PPM":
        if not file.endswith(format):
            file = file + "." + format
        self.save(file, format)
    else:
        self.im.save_ppm(file)

    return file
def close(self):
    """Close both ends of the pipe.

    Calling this is advised so that every handle of the pipe is released.
    Streams are closed (and reset to None) only when they are
    ``AsyncFile`` instances.
    """
    stdin = self.stdin
    if isinstance(stdin, AsyncFile):
        stdin.close()
        self.stdin = None

    read_handle = self.stdin_rh
    if read_handle:
        win32pipe.DisconnectNamedPipe(read_handle)
        win32file.CloseHandle(read_handle)
        self.stdin_rh = None

    for attr in ('stdout', 'stderr'):
        stream = getattr(self, attr)
        if isinstance(stream, AsyncFile):
            stream.close()
            setattr(self, attr, None)
def _socketpair():
    """Return a connected ``(read_sock, write_sock)`` pair.

    Uses ``socket.socketpair()`` when the platform provides it.  Otherwise
    a pair is emulated with a temporary listening socket bound to an
    ephemeral port on 127.0.0.1.  The listening socket is always closed,
    including when binding, listening or connecting fails.
    """
    if hasattr(socket, 'socketpair'):
        return socket.socketpair()

    srv_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        srv_sock.bind(('127.0.0.1', 0))
        srv_sock.listen(1)

        write_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            # Connect without blocking so accept() below can complete the
            # handshake from this same thread.
            write_sock.setblocking(False)
            try:
                write_sock.connect(srv_sock.getsockname()[:2])
            except socket.error as e:
                # "In progress" is the expected outcome of a non-blocking
                # connect; anything else is a real failure.
                if e.args[0] not in (EINPROGRESS, EWOULDBLOCK):
                    raise
            write_sock.setblocking(True)
            read_sock = srv_sock.accept()[0]
        except BaseException:
            write_sock.close()
            raise
    finally:
        srv_sock.close()

    return (read_sock, write_sock)
def terminate(self):
    """Shut the notifier down and drop every registered descriptor.

    The command channel is closed only when it is socket based (i.e. the
    write end has ``getsockname``).  A failure to unregister a descriptor
    is logged and does not stop the teardown.
    """
    if hasattr(self.cmd_write, 'getsockname'):
        self.cmd_write.close()
        self.cmd_read.close()

    for entry in self._fds.itervalues():
        try:
            self._poller.unregister(entry._fileno)
        except:
            logger.warning('unregister of %s failed with %s',
                           entry._fileno, traceback.format_exc())
        entry._notifier = None

    self._fds.clear()
    self._timeouts = []

    if hasattr(self._poller, 'terminate'):
        self._poller.terminate()
    self._poller = None

    self.cmd_read = self.cmd_write = None
def _terminate_task(self, task):
    """Internal use only.

    Queue a ``GeneratorExit('close')`` for *task* so that it terminates.

    Returns 0 when the exception was queued and -1 when the task's id is
    not found in ``self._tasks``.  The scheduler lock is held for the
    whole operation and is released on every exit path, including when
    an unexpected exception is raised.
    """
    self._lock.acquire()
    try:
        tid = task._id
        # Re-resolve through the table so a stale reference is rejected.
        task = self._tasks.get(tid, None)
        if task is None:
            logger.warning('invalid task %s to terminate', tid)
            return -1
        # TODO: if currently waiting I/O or holding locks, warn?
        if task._state == Pycos._Running:
            logger.warning('task to terminate %s/%s is running', task._name, tid)
        else:
            self._suspended.discard(tid)
            self._scheduled.add(tid)
            task._state = Pycos._Scheduled
            task._timeout = None
            task._callers = []
            # Interrupt the notifier only when this task is the sole
            # scheduled one while polling is in progress.
            if self._polling and len(self._scheduled) == 1:
                self._notifier.interrupt()
        task._exceptions.append((GeneratorExit, GeneratorExit('close')))
        return 0
    finally:
        self._lock.release()
def terminate(self):
    """Tear down the notifier: close the command channel and forget all fds.

    The command channel is closed only if its write end looks like a
    socket (has ``getsockname``).  Unregister failures are logged with
    the traceback and otherwise ignored.
    """
    if hasattr(self.cmd_write, 'getsockname'):
        self.cmd_write.close()
        self.cmd_read.close()

    for registered in self._fds.values():
        try:
            self._poller.unregister(registered._fileno)
        except:
            logger.warning('unregister of %s failed with %s',
                           registered._fileno, traceback.format_exc())
        registered._notifier = None

    self._fds.clear()
    self._timeouts = []

    if hasattr(self._poller, 'terminate'):
        self._poller.terminate()
    self._poller = None

    self.cmd_read = self.cmd_write = None
def close(self):
    """Close the TarFile. In write-mode, two finishing zero blocks are
       appended to the archive.

    The archive is marked closed before the trailer is written, and the
    underlying file object (when owned by this instance) is closed even
    if writing the trailer fails.
    """
    if self.closed:
        return

    self.closed = True
    try:
        # Exact match on the mode; a substring test would also accept "".
        if self.mode in ("a", "w"):
            self.fileobj.write(NUL * (BLOCKSIZE * 2))
            self.offset += (BLOCKSIZE * 2)
            # fill up the end with zero-blocks
            # (like option -b20 for tar does)
            blocks, remainder = divmod(self.offset, RECORDSIZE)
            if remainder > 0:
                self.fileobj.write(NUL * (RECORDSIZE - remainder))
    finally:
        if not self._extfileobj:
            self.fileobj.close()
def close(self):
    """Close the _Stream object. No operation should be done on it
       afterwards.

    In write mode the compressor is flushed (unless the type is "tar"),
    the pending buffer is written out and, for "gz", the CRC and the
    length modulo 2**32 are appended.  The file object is closed unless
    it was supplied externally.
    """
    if self.closed:
        return

    self.closed = True
    try:
        writing = self.mode == "w"

        if writing and self.comptype != "tar":
            self.buf += self.cmp.flush()

        if writing and self.buf:
            out = self.fileobj
            out.write(self.buf)
            self.buf = b""
            if self.comptype == "gz":
                out.write(struct.pack("<L", self.crc))
                out.write(struct.pack("<L", self.pos & 0xffffFFFF))
    finally:
        if not self._extfileobj:
            self.fileobj.close()
def close(self):
    """Close the TarFile. In write-mode, two finishing zero blocks are
       appended to the archive.

    The file object is closed afterwards unless it was supplied
    externally; this happens even if writing the trailer raises.
    """
    if self.closed:
        return

    self.closed = True
    try:
        if self.mode in ("a", "w", "x"):
            end_marker = NUL * (BLOCKSIZE * 2)
            self.fileobj.write(end_marker)
            self.offset += (BLOCKSIZE * 2)
            # fill up the end with zero-blocks
            # (like option -b20 for tar does)
            remainder = self.offset % RECORDSIZE
            if remainder > 0:
                self.fileobj.write(NUL * (RECORDSIZE - remainder))
    finally:
        if not self._extfileobj:
            self.fileobj.close()
def upload_to_fileshare_test(self):  #pylint: disable=no-self-use
    """Upload copies files to non-native store correctly with no progress"""
    import shutil
    import tempfile

    temp_src_dir = tempfile.mkdtemp()
    temp_dst_dir = tempfile.mkdtemp()
    try:
        # One file in the source directory means exactly one copy is expected.
        temp_file = tempfile.NamedTemporaryFile(dir=temp_src_dir)
        try:
            shutil_mock = MagicMock()
            shutil_mock.copyfile.return_value = None
            with patch('sfctl.custom_app.shutil', new=shutil_mock):
                sf_c.upload_to_fileshare(temp_src_dir, temp_dst_dir, False)
                shutil_mock.copyfile.assert_called_once()
        finally:
            temp_file.close()
    finally:
        # Remove both directories even when the call or the assertion fails.
        try:
            shutil.rmtree(temp_src_dir)
        finally:
            shutil.rmtree(temp_dst_dir)
def __init__(self, cmd, bufsize=-1):
    """Fork a child running *cmd* with its stdio wired to two pipes.

    In the child, fd 0 reads from the first pipe and fds 1 and 2 both
    write to the second one.  In the parent, ``tochild`` writes to the
    child's stdin and ``fromchild`` reads its combined stdout/stderr;
    *bufsize* is passed to ``os.fdopen`` for both.
    """
    _cleanup()
    self.cmd = cmd

    child_in_r, child_in_w = os.pipe()
    child_out_r, child_out_w = os.pipe()

    self.pid = os.fork()
    if self.pid == 0:
        # Child
        for source_fd, target_fd in ((child_in_r, 0),
                                     (child_out_w, 1),
                                     (child_out_w, 2)):
            os.dup2(source_fd, target_fd)
        self._run_child(cmd)

    # Parent: drop the ends that belong to the child.
    os.close(child_in_r)
    self.tochild = os.fdopen(child_in_w, 'w', bufsize)
    os.close(child_out_w)
    self.fromchild = os.fdopen(child_out_r, 'r', bufsize)
def importfile(path):
    """Import a Python source file or compiled file given its path.

    The file is treated as compiled when it starts with the interpreter's
    magic number, otherwise as source.  Any failure while loading is
    re-raised as ``ErrorDuringImport``; the file handle is closed in
    either case.
    """
    magic = imp.get_magic()
    # Binary mode so the magic number is compared byte-for-byte.
    probe = open(path, 'rb')
    try:
        is_compiled = probe.read(len(magic)) == magic
    finally:
        probe.close()
    kind = imp.PY_COMPILED if is_compiled else imp.PY_SOURCE

    filename = os.path.basename(path)
    name, ext = os.path.splitext(filename)
    source = open(path, 'r')
    try:
        module = imp.load_module(name, source, path, (ext, 'r', kind))
    except:
        # Deliberately broad: every failure is reported through
        # ErrorDuringImport together with the original exc_info.
        raise ErrorDuringImport(path, sys.exc_info())
    finally:
        source.close()
    return module
def get_message(self, key):
    """Return a Message representation or raise a KeyError.

    The message is built with ``self._factory`` when one is set and with
    ``MaildirMessage`` otherwise.  Its subdir, info (the part of the file
    name after the last ``self.colon``) and date (the file's mtime) are
    filled in from the file's location.
    """
    subpath = self._lookup(key)
    full_path = os.path.join(self._path, subpath)

    with open(full_path, 'r') as f:
        build = self._factory if self._factory else MaildirMessage
        msg = build(f)

    subdir, name = os.path.split(subpath)
    msg.set_subdir(subdir)
    if self.colon in name:
        info = name.split(self.colon)[-1]
        msg.set_info(info)
    msg.set_date(os.path.getmtime(full_path))
    return msg
def __setitem__(self, key, message):
    """Replace the keyed message; raise KeyError if it doesn't exist.

    The existing file is opened first (so a missing key is detected),
    optionally locked, truncated, and then rewritten with *message*.
    Sequences are also dumped when *message* is an ``MHMessage``.
    """
    path = os.path.join(self._path, str(key))
    try:
        f = open(path, 'rb+')
    except IOError as e:
        if e.errno == errno.ENOENT:
            raise KeyError('No message with key: %s' % key)
        raise
    try:
        if self._locked:
            _lock_file(f)
        try:
            # Truncate through a second descriptor; the handle opened
            # above stays in use for writing.
            os.close(os.open(path, os.O_WRONLY | os.O_TRUNC))
            self._dump_message(message, f)
            if isinstance(message, MHMessage):
                self._dump_sequences(message, key)
        finally:
            if self._locked:
                _unlock_file(f)
    finally:
        _sync_close(f)
def get_string(self, key):
    """Return a string representation or raise a KeyError.

    The file is opened read-write when the mailbox is locked (the file
    is then locked for the duration of the read) and read-only otherwise.
    """
    path = os.path.join(self._path, str(key))
    mode = 'r+' if self._locked else 'r'
    try:
        f = open(path, mode)
    except IOError as e:
        if e.errno == errno.ENOENT:
            raise KeyError('No message with key: %s' % key)
        raise
    try:
        if self._locked:
            _lock_file(f)
        try:
            return f.read()
        finally:
            if self._locked:
                _unlock_file(f)
    finally:
        f.close()
def _get_soname(f):
    """Return the SONAME recorded in shared library *f*, or None.

    Runs ``objdump -p -j .dynamic`` on the file through the shell and
    searches its output for the SONAME entry.  Returns None for a falsy
    *f* or when no SONAME line is found.
    """
    # assuming GNU binutils / ELF
    if not f:
        return None
    cmd = 'if ! type objdump >/dev/null 2>&1; then exit 10; fi;' \
          "objdump -p -j .dynamic 2>/dev/null " + f
    # Run the command once; the same output serves the status check and
    # the SONAME search.
    pipe = os.popen(cmd)
    try:
        data = pipe.read()
    finally:
        rv = pipe.close()
    # NOTE(review): on POSIX, close() reports an encoded wait status
    # rather than the bare exit code -- confirm that a plain 10 is what
    # the target platform returns when objdump is missing.
    if rv == 10:
        raise OSError('objdump command not found')
    res = re.search(r'\sSONAME\s+([^\s]+)', data)
    if not res:
        return None
    return res.group(1)
def _findLib_ldconfig(name):
    """Return the path of library *name*, or None if it is not found.

    The ``ldconfig -p`` listing is searched first; if that yields
    nothing, the ``ldd`` output for the running interpreter is searched.
    """
    # XXX assuming GLIBC's ldconfig (with option -p)
    def _output_of(command):
        pipe = os.popen(command)
        try:
            return pipe.read()
        finally:
            pipe.close()

    pattern = r'/[^\(\)\s]*lib%s\.[^\(\)\s]*' % re.escape(name)

    match = re.search(pattern, _output_of('LC_ALL=C LANG=C /sbin/ldconfig -p 2>/dev/null'))
    if match:
        return match.group(0)

    # Hm, this works only for libs needed by the python executable.
    match = re.search(pattern, _output_of('ldd %s 2>/dev/null' % sys.executable))
    if not match:
        return None
    return match.group(0)
def run_with_args(args, parser):
    # type: (argparse.Namespace, argparse.ArgumentParser) -> int
    """Run whatstyle (optionally under the profiler) and report the run time.

    An IOError caused by a broken pipe or a closed stream is tolerated;
    any other IOError propagates.  Returns the result of ``whatstyle``,
    or ``OK`` if it was never assigned.
    """
    set_logging_parameters(args, parser)
    start_time = time.time()
    ret = OK
    try:
        if not args.profile:
            ret = whatstyle(args, parser)
        else:
            outline("Profiling...")
            profile("ret = whatstyle(args, parser)", locals(), globals())
    except IOError as exc:
        # If the output is piped into a pager like 'less' we get a broken pipe when
        # the pager is quit early and that is ok.
        if exc.errno != errno.EPIPE and str(exc) != 'Stream closed':
            raise
        if not PY2:
            sys.stderr.close()

    elapsed = time.time() - start_time
    iprint(INFO_TIME, 'Run time: %s seconds' % elapsed)
    return ret
def get_terminal_size():
    """Return the terminal size as a ``(columns, lines)`` tuple of ints.

    The size is looked up, in order, from: a window-size ioctl on fds 0,
    1 and 2; the same ioctl on the controlling terminal; the ``LINES``
    and ``COLUMNS`` environment variables; and finally the default of 25
    lines by 80 columns.
    """
    def ioctl_GWINSZ(fd):
        # Best effort: the modules may be missing on this platform and
        # the ioctl fails on anything that is not a terminal.
        try:
            import fcntl
            import termios
            import struct
            return struct.unpack('hh', fcntl.ioctl(fd, termios.TIOCGWINSZ, '1234'))
        except Exception:
            return None

    cr = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2)
    if not cr:
        try:
            fd = os.open(os.ctermid(), os.O_RDONLY)
        except (AttributeError, OSError):
            # No ctermid() on this platform, or no controlling terminal.
            pass
        else:
            try:
                cr = ioctl_GWINSZ(fd)
            finally:
                os.close(fd)
    if not cr:
        try:
            cr = (int(os.environ['LINES']), int(os.environ['COLUMNS']))
        except (KeyError, ValueError):
            cr = (25, 80)
    return int(cr[1]), int(cr[0])
def grab(bbox=None):
    """Take a screenshot and return it as an image.

    On macOS the ``screencapture`` tool writes to a temporary PNG, which
    is loaded and then removed (also when loading fails).  On other
    platforms the pixels come from ``grabber()``.  When *bbox* is given
    the result is cropped to it.
    """
    if sys.platform == "darwin":
        fd, path = tempfile.mkstemp('.png')
        os.close(fd)
        try:
            subprocess.call(['screencapture', '-x', path])
            im = Image.open(path)
            # Read the pixels now, before the file is deleted.
            im.load()
        finally:
            os.unlink(path)
    else:
        size, data = grabber()
        im = Image.frombytes(
            "RGB", size, data,
            # RGB, 32-bit line padding, origo in lower left corner
            "raw", "BGR", (size[0]*3 + 3) & -4, -1
            )
    if bbox:
        im = im.crop(bbox)
    return im
def close(self):
    """
    Closes the file pointer, if possible.

    This operation will destroy the image core and release its memory.
    The image data will be unusable afterward.

    This function is only required to close images that have not
    had their file read and closed by the
    :py:meth:`~PIL.Image.Image.load` method.
    """
    try:
        self.fp.close()
    except Exception as exc:
        # Failing to close is not fatal; record it and carry on.
        logger.debug("Error closing: %s", exc)

    # Instead of simply setting to None, we're setting up a
    # deferred error that will better explain that the core image
    # object is gone.
    closed_error = ValueError("Operation on closed image")
    self.im = deferred_error(closed_error)
def terminate(self):
    """Close pipe and terminate child process.

    The pipe is closed first; termination itself is delegated to the
    parent class.
    """
    self.close()
    parent = super(Popen, self)
    parent.terminate()
def close(self):
    """Similar to 'close' of file descriptor.

    Does nothing when there is no handle.  Otherwise the pipe is
    disconnected if it reports the server-end flag, and the handle is
    closed -- either immediately, or through the overlapped callback
    after pending I/O has been cancelled.
    """
    if self._handle:
        try:
            flags = win32pipe.GetNamedPipeInfo(self._handle)[0]
        except:
            # The query failed: use 0 so the server-end test below is false.
            flags = 0

        if flags & win32con.PIPE_SERVER_END:
            win32pipe.DisconnectNamedPipe(self._handle)
        # TODO: if pipe, need to call FlushFileBuffers?

        # Final teardown: closes the handle, unregisters it from the
        # notifier (if any) and clears per-handle state.  The two
        # arguments are accepted but not used.
        def _close_(rc, n):
            win32file.CloseHandle(self._handle)
            self._overlap = None
            if self._notifier:
                self._notifier.unregister(self._handle)
            self._handle = None
            self._read_result = self._write_result = None
            self._read_task = self._write_task = None
            self._buflist = []

        if self._overlap.object:
            # Something is installed on the overlapped structure: replace
            # it with the teardown and cancel the outstanding I/O.
            self._overlap.object = _close_
            win32file.CancelIo(self._handle)
        else:
            _close_(0, 0)
def close(self):
    """Close file descriptor.

    Nothing happens when ``_fileno`` is falsy.  Otherwise the object is
    unregistered from its notifier, the wrapped file (if any) is closed
    and all per-descriptor state is reset.
    """
    if not self._fileno:
        return

    self._notifier.unregister(self)

    wrapped = self._fd
    if wrapped:
        wrapped.close()

    self._fd = self._fileno = None
    self._read_task = self._write_task = None
    self._read_fn = self._write_fn = None
    self._buflist = []
def close(self):
    """Close pipe.

    Drops the references held in ``first`` and ``last``.
    """
    self.first = self.last = None
def __del__(self):
    """Close the object when it is garbage collected."""
    self.close()
def __exit__(self, exc_type, exc_value, trace):
    """Close on leaving the ``with`` block.

    Always returns True, so an exception raised inside the ``with``
    block is suppressed.
    """
    # NOTE(review): confirm that swallowing every exception is intended.
    self.close()
    suppress = True
    return suppress
def close(self):
    """'close' must be called when done with socket.

    Unregisters the socket, closes the underlying socket object if there
    is one and clears the pending read/write callbacks and tasks.
    """
    self._unregister()

    sock = self._rsock
    if sock:
        sock.close()
        self._rsock = None

    self._read_fn = None
    self._write_fn = None
    self._read_task = None
    self._write_task = None
def terminate(self):
    """Shut down the IOCP notifier; a no-op when it is already terminated.

    Stops the async poller, closes the command sockets, closes the
    completion-port handle and clears the class-level ``_instance``.
    """
    if not self.iocp:
        return

    self.async_poller.terminate()
    self.cmd_rsock.close()
    self.cmd_wsock.close()
    self.cmd_rsock_buf = None

    # Clear the attribute before closing the handle it referred to.
    port = self.iocp
    self.iocp = None
    win32file.CloseHandle(port)

    self._timeouts = []
    self.cmd_rsock = self.cmd_wsock = None
    self.__class__._instance = None
def __exit__(self, exc_type, exc_value, trace):
    """Close on leaving the ``with`` block.

    Returns None, so an exception raised in the block propagates.
    """
    self.close()
def close(self):
    """Close the channel.

    When ``_location`` is not set, the channel is unregistered, its
    subscriber set is replaced with an empty one and its name is removed
    from the scheduler's channel table while the scheduler lock is held.
    """
    # NOTE(review): presumably a set ``_location`` marks a channel owned
    # elsewhere, for which nothing is done here -- confirm.
    if not self._location:
        self.unregister()
        self._subscribers = set()
        self._scheduler._lock.acquire()
        # The default of None makes removing an unknown name a no-op.
        self._scheduler._channels.pop(self._name, None)
        self._scheduler._lock.release()
def tmpfilepath():
    """Yield the path of a fresh temporary file and remove it afterwards.

    The file is created with the ``djpt-test-`` prefix and its descriptor
    is closed before the path is handed out.  The file is removed when
    the generator is resumed, closed, or has an exception thrown into it.
    """
    fd, fname = tempfile.mkstemp(prefix='djpt-test-')
    os.close(fd)
    try:
        yield fname
    finally:
        os.remove(fname)
def __init__(self, content):
    """Store *content* and write its text form to a temporary file.

    The text produced by ``self.ENMLtoText(content)`` is written to a new
    ``.markdown`` temporary file whose path is kept in ``self.tempfile``.
    The descriptor is always closed; if conversion or writing fails the
    temporary file is removed and the error propagates.

    Raises:
        Exception: if *content* is not a ``str``.
    """
    if not isinstance(content, str):
        raise Exception("Note content must be an instance "
                        "of string, '%s' given." % type(content))

    (tempfileHandler, tempfileName) = tempfile.mkstemp(suffix=".markdown")
    try:
        try:
            os.write(tempfileHandler, self.ENMLtoText(content))
        finally:
            os.close(tempfileHandler)
    except Exception:
        # Nothing will reference the file, so do not leave it behind.
        os.remove(tempfileName)
        raise

    self.content = content
    self.tempfile = tempfileName
def close(self):
    """Closes any open connections.

    This implementation has nothing to close and does nothing.
    """
    return None
def close(self):
    """Close the ssh connection.

    The remote script file is removed over SFTP first.  The SFTP session
    and the client are closed even when opening the session or removing
    the file fails; the original error still propagates.
    """
    try:
        ftp = self.client.open_sftp()
        try:
            ftp.remove(self.script_filename)
        finally:
            ftp.close()
    finally:
        self.client.close()
def upload_file(self, filename, content, mode=None):
    """Write *content* to the remote file *filename* over SFTP.

    The remote file handle and the SFTP session are closed on every
    path, including when writing fails.  When *mode* is truthy the
    script file is made executable (0o744).
    """
    ftp = self.client.open_sftp()
    try:
        remote = ftp.file(filename, 'w', -1)
        try:
            remote.write(content)
            remote.flush()
        finally:
            remote.close()
        if mode:
            # NOTE(review): this changes self.script_filename to a fixed
            # 0o744 rather than applying `mode` to `filename` -- confirm
            # against the callers before changing it.
            ftp.chmod(self.script_filename, 0o744)
    finally:
        ftp.close()
def get_tmp_script_filename(self):
    """Run ``mktemp`` on the remote side and return the name it prints.

    A new session channel with a pty is used for the command.  The first
    output line, stripped, is returned.  The channel is closed even when
    one of the steps raises.
    """
    channel = self.transport.open_session()
    try:
        channel.get_pty()
        out = channel.makefile()
        channel.exec_command('mktemp')
        tmpfilename = out.readline().strip()
        # Wait for the command to finish; the status itself is not used.
        channel.recv_exit_status()
    finally:
        channel.close()
    return tmpfilename