Python os 模块,close() 实例源码

我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 os.close()。

项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def main():
        '''
        Bootstrap a child process from data sent by the parent over a pipe.

        The last command-line argument is taken as the OS handle of the
        pipe's read end.  Two objects are loaded from it, in order: the
        preparation data (handed to prepare()) and the process object,
        whose _bootstrap() result is used as the exit code.
        '''
        assert is_forking(sys.argv)

        pipe_handle = int(sys.argv[-1])
        from_parent = os.fdopen(
            msvcrt.open_osfhandle(pipe_handle, os.O_RDONLY), 'rb')

        # The current process is flagged as "inheriting" only while the
        # two objects are being loaded from the pipe.
        process.current_process()._inheriting = True
        prepare(load(from_parent))
        proc = load(from_parent)
        process.current_process()._inheriting = False

        from_parent.close()

        exit(proc._bootstrap())
项目:imagepaste    作者:robinchenyu    | 项目源码 | 文件源码
def _dump(self, file=None, format=None):
        """Write the image to *file* and return the path that was used.

        When *file* is empty a temporary file is created (its suffix is
        '.' + *format* when a format is given).  Without a format, or with
        format "PPM", the image core's save_ppm() is used; otherwise the
        format is appended to the name as an extension when the name does
        not already end with it, and self.save() is called.
        """
        import tempfile
        extension = ('.' + format) if format else ''
        if not file:
            handle, file = tempfile.mkstemp(extension)
            os.close(handle)

        self.load()
        if format and format != "PPM":
            if not file.endswith(format):
                file = file + "." + format
            self.save(file, format)
        else:
            self.im.save_ppm(file)
        return file
项目:pycos    作者:pgiri    | 项目源码 | 文件源码
def close(self):
            """Release every handle owned by this pipe.

            Callers are advised to invoke this so that both handles of
            the pipe get closed.  AsyncFile streams are closed and reset
            to None; the stdin read handle, when set, is disconnected and
            closed.
            """
            stdin = self.stdin
            if isinstance(stdin, AsyncFile):
                stdin.close()
                self.stdin = None
            read_handle = self.stdin_rh
            if read_handle:
                win32pipe.DisconnectNamedPipe(read_handle)
                win32file.CloseHandle(read_handle)
                self.stdin_rh = None
            for attr in ('stdout', 'stderr'):
                stream = getattr(self, attr)
                if isinstance(stream, AsyncFile):
                    stream.close()
                    setattr(self, attr, None)
项目:pycos    作者:pgiri    | 项目源码 | 文件源码
def _socketpair():
                """Return a connected ``(read_sock, write_sock)`` pair.

                Uses socket.socketpair() when the platform provides it.
                Otherwise a pair is emulated with a temporary listening
                socket on 127.0.0.1: the write socket connects to it and
                the accepted connection becomes the read socket.

                The listening socket is always closed before returning,
                including when bind/listen/connect/accept fails; the
                write socket is closed if the handshake fails.
                """
                if hasattr(socket, 'socketpair'):
                    return socket.socketpair()
                srv_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                try:
                    srv_sock.bind(('127.0.0.1', 0))
                    srv_sock.listen(1)
                    write_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                    try:
                        # Connect without blocking so that accept() below
                        # can complete the handshake in this same thread.
                        write_sock.setblocking(False)
                        try:
                            write_sock.connect(srv_sock.getsockname()[:2])
                        except socket.error as e:
                            # "In progress" is the expected outcome of a
                            # non-blocking connect; anything else is fatal.
                            if e.args[0] not in (EINPROGRESS, EWOULDBLOCK):
                                raise
                        write_sock.setblocking(True)
                        read_sock = srv_sock.accept()[0]
                    except BaseException:
                        write_sock.close()
                        raise
                finally:
                    srv_sock.close()
                return (read_sock, write_sock)
项目:pycos    作者:pgiri    | 项目源码 | 文件源码
def terminate(self):
            """Shut the notifier down and drop all registered descriptors.

            Closes the command sockets (the write side only when it is a
            socket, i.e. has 'getsockname'), unregisters every tracked
            descriptor from the poller (failures are logged, not raised),
            clears the descriptor table and timeouts, terminates the
            poller when it supports that, and resets the references.
            """
            if hasattr(self.cmd_write, 'getsockname'):
                self.cmd_write.close()
            self.cmd_read.close()
            # values() works on both Python 2 and 3; itervalues() exists
            # only on Python 2 dictionaries.
            for fd in self._fds.values():
                try:
                    self._poller.unregister(fd._fileno)
                except Exception:
                    logger.warning('unregister of %s failed with %s',
                                   fd._fileno, traceback.format_exc())
                fd._notifier = None
            self._fds.clear()
            self._timeouts = []
            if hasattr(self._poller, 'terminate'):
                self._poller.terminate()
            self._poller = None
            self.cmd_read = self.cmd_write = None
项目:pycos    作者:pgiri    | 项目源码 | 文件源码
def _terminate_task(self, task):
        """Internal use only.

        Queue a GeneratorExit('close') for *task*.  Returns -1 (after
        logging a warning) when the task's id is not in the task table,
        0 otherwise.  A task that is not running is moved from the
        suspended set to the scheduled set, and the notifier is
        interrupted when polling and this is the only scheduled task.

        The scheduler lock is released in a ``finally`` clause so that an
        unexpected error (for example an object without '_id') cannot
        leave the lock held.
        """
        self._lock.acquire()
        try:
            tid = task._id
            task = self._tasks.get(tid, None)
            if task is None:
                logger.warning('invalid task %s to terminate', tid)
                return -1
            # TODO: if currently waiting I/O or holding locks, warn?
            if task._state == Pycos._Running:
                logger.warning('task to terminate %s/%s is running', task._name, tid)
            else:
                self._suspended.discard(tid)
                self._scheduled.add(tid)
                task._state = Pycos._Scheduled
                task._timeout = None
                task._callers = []
                if self._polling and len(self._scheduled) == 1:
                    self._notifier.interrupt()
            task._exceptions.append((GeneratorExit, GeneratorExit('close')))
            return 0
        finally:
            self._lock.release()
项目:pycos    作者:pgiri    | 项目源码 | 文件源码
def close(self):
            """Release every handle owned by this pipe.

            Callers are advised to invoke this so that both handles of
            the pipe get closed.  AsyncFile streams are closed and reset
            to None; the stdin read handle, when set, is disconnected and
            closed.
            """
            stdin = self.stdin
            if isinstance(stdin, AsyncFile):
                stdin.close()
                self.stdin = None
            read_handle = self.stdin_rh
            if read_handle:
                win32pipe.DisconnectNamedPipe(read_handle)
                win32file.CloseHandle(read_handle)
                self.stdin_rh = None
            for attr in ('stdout', 'stderr'):
                stream = getattr(self, attr)
                if isinstance(stream, AsyncFile):
                    stream.close()
                    setattr(self, attr, None)
项目:pycos    作者:pgiri    | 项目源码 | 文件源码
def _socketpair():
                """Return a connected ``(read_sock, write_sock)`` pair.

                Uses socket.socketpair() when the platform provides it.
                Otherwise a pair is emulated with a temporary listening
                socket on 127.0.0.1: the write socket connects to it and
                the accepted connection becomes the read socket.

                The listening socket is always closed before returning,
                including when bind/listen/connect/accept fails; the
                write socket is closed if the handshake fails.
                """
                if hasattr(socket, 'socketpair'):
                    return socket.socketpair()
                srv_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                try:
                    srv_sock.bind(('127.0.0.1', 0))
                    srv_sock.listen(1)
                    write_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                    try:
                        # Connect without blocking so that accept() below
                        # can complete the handshake in this same thread.
                        write_sock.setblocking(False)
                        try:
                            write_sock.connect(srv_sock.getsockname()[:2])
                        except socket.error as e:
                            # "In progress" is the expected outcome of a
                            # non-blocking connect; anything else is fatal.
                            if e.args[0] not in (EINPROGRESS, EWOULDBLOCK):
                                raise
                        write_sock.setblocking(True)
                        read_sock = srv_sock.accept()[0]
                    except BaseException:
                        write_sock.close()
                        raise
                finally:
                    srv_sock.close()
                return (read_sock, write_sock)
项目:pycos    作者:pgiri    | 项目源码 | 文件源码
def terminate(self):
            """Shut the notifier down and drop all registered descriptors.

            The write command channel is closed only when it is a socket
            (has 'getsockname'); the read channel is always closed.  Each
            tracked descriptor is unregistered from the poller, with
            failures logged rather than raised.
            """
            writer = self.cmd_write
            if hasattr(writer, 'getsockname'):
                writer.close()
            self.cmd_read.close()
            for entry in self._fds.values():
                try:
                    self._poller.unregister(entry._fileno)
                except:
                    logger.warning('unregister of %s failed with %s',
                                   entry._fileno, traceback.format_exc())
                entry._notifier = None
            self._fds.clear()
            self._timeouts = []
            poller = self._poller
            if hasattr(poller, 'terminate'):
                poller.terminate()
            self._poller = None
            self.cmd_read = self.cmd_write = None
项目:pycos    作者:pgiri    | 项目源码 | 文件源码
def _terminate_task(self, task):
        """Internal use only.

        Queue a GeneratorExit('close') for *task*.  Returns -1 (after
        logging a warning) when the task's id is not in the task table,
        0 otherwise.  A task that is not running is moved from the
        suspended set to the scheduled set, and the notifier is
        interrupted when polling and this is the only scheduled task.

        The scheduler lock is released in a ``finally`` clause so that an
        unexpected error (for example an object without '_id') cannot
        leave the lock held.
        """
        self._lock.acquire()
        try:
            tid = task._id
            task = self._tasks.get(tid, None)
            if task is None:
                logger.warning('invalid task %s to terminate', tid)
                return -1
            # TODO: if currently waiting I/O or holding locks, warn?
            if task._state == Pycos._Running:
                logger.warning('task to terminate %s/%s is running', task._name, tid)
            else:
                self._suspended.discard(tid)
                self._scheduled.add(tid)
                task._state = Pycos._Scheduled
                task._timeout = None
                task._callers = []
                if self._polling and len(self._scheduled) == 1:
                    self._notifier.interrupt()
            task._exceptions.append((GeneratorExit, GeneratorExit('close')))
            return 0
        finally:
            self._lock.release()
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def close(self):
        """Close the TarFile. In write-mode, two finishing zero blocks are
           appended to the archive.

           The archive is flagged as closed before the trailer is written
           and the underlying file object is closed in a ``finally``
           clause, so a failing write can neither leak the file nor leave
           the TarFile half-closed.  A file object supplied by the caller
           (``_extfileobj``) is left open.
        """
        if self.closed:
            return

        self.closed = True
        try:
            if self.mode in ("a", "w"):
                self.fileobj.write(NUL * (BLOCKSIZE * 2))
                self.offset += (BLOCKSIZE * 2)
                # fill up the end with zero-blocks
                # (like option -b20 for tar does)
                remainder = self.offset % RECORDSIZE
                if remainder > 0:
                    self.fileobj.write(NUL * (RECORDSIZE - remainder))
        finally:
            if not self._extfileobj:
                self.fileobj.close()
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def close(self):
        """Flush pending data and close the _Stream.

           The stream must not be used after this call.  In write mode
           the compressor is flushed (unless the type is "tar"), any
           buffered bytes are written out and, for "gz", followed by the
           CRC and the position masked to 32 bits.  The file object is
           closed afterwards unless it was supplied externally.
        """
        if self.closed:
            return

        self.closed = True
        try:
            writing = self.mode == "w"
            if writing and self.comptype != "tar":
                self.buf += self.cmp.flush()

            if writing and self.buf:
                self.fileobj.write(self.buf)
                self.buf = b""
                if self.comptype == "gz":
                    crc_field = struct.pack("<L", self.crc)
                    self.fileobj.write(crc_field)
                    size_field = struct.pack("<L", self.pos & 0xffffFFFF)
                    self.fileobj.write(size_field)
        finally:
            if not self._extfileobj:
                self.fileobj.close()
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def close(self):
        """Finish and close the TarFile.

           For modes "a", "w" and "x" two zero blocks are appended and the
           archive is padded with zeros up to a multiple of RECORDSIZE.
           The file object is closed afterwards unless it was supplied
           externally.
        """
        if self.closed:
            return

        self.closed = True
        try:
            if self.mode in ("a", "w", "x"):
                end_marker = NUL * (BLOCKSIZE * 2)
                self.fileobj.write(end_marker)
                self.offset += (BLOCKSIZE * 2)
                # pad the final record with zeros
                # (like option -b20 for tar does)
                leftover = self.offset % RECORDSIZE
                if leftover > 0:
                    self.fileobj.write(NUL * (RECORDSIZE - leftover))
        finally:
            if not self._extfileobj:
                self.fileobj.close()
项目:service-fabric-cli    作者:Azure    | 项目源码 | 文件源码
def upload_to_fileshare_test(self): #pylint: disable=no-self-use
        """Upload copies files to non-native store correctly with no
        progress

        A source directory holding one temporary file and an empty
        destination directory are created; shutil inside
        sfctl.custom_app is replaced by a mock, and copyfile is expected
        to be called exactly once.  The temporary file and directories
        are removed in ``finally`` clauses so that a failing assertion
        does not leave them behind.
        """
        import shutil
        import tempfile
        temp_src_dir = tempfile.mkdtemp()
        try:
            temp_dst_dir = tempfile.mkdtemp()
            try:
                # The file only has to exist while the upload runs.
                with tempfile.NamedTemporaryFile(dir=temp_src_dir):
                    shutil_mock = MagicMock()
                    shutil_mock.copyfile.return_value = None
                    with patch('sfctl.custom_app.shutil', new=shutil_mock):
                        sf_c.upload_to_fileshare(temp_src_dir, temp_dst_dir, False)
                        shutil_mock.copyfile.assert_called_once()
            finally:
                shutil.rmtree(temp_dst_dir)
        finally:
            shutil.rmtree(temp_src_dir)
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def close(self):
        """Close the TarFile. In write-mode, two finishing zero blocks are
           appended to the archive.

           The archive is flagged as closed before the trailer is written
           and the underlying file object is closed in a ``finally``
           clause, so a failing write can neither leak the file nor leave
           the TarFile half-closed.  A file object supplied by the caller
           (``_extfileobj``) is left open.
        """
        if self.closed:
            return

        self.closed = True
        try:
            if self.mode in ("a", "w"):
                self.fileobj.write(NUL * (BLOCKSIZE * 2))
                self.offset += (BLOCKSIZE * 2)
                # fill up the end with zero-blocks
                # (like option -b20 for tar does)
                remainder = self.offset % RECORDSIZE
                if remainder > 0:
                    self.fileobj.write(NUL * (RECORDSIZE - remainder))
        finally:
            if not self._extfileobj:
                self.fileobj.close()
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def close(self):
        """Close the TarFile. In write-mode, two finishing zero blocks are
           appended to the archive.

           The archive is flagged as closed before the trailer is written
           and the underlying file object is closed in a ``finally``
           clause, so a failing write can neither leak the file nor leave
           the TarFile half-closed.  A file object supplied by the caller
           (``_extfileobj``) is left open.
        """
        if self.closed:
            return

        self.closed = True
        try:
            if self.mode in ("a", "w"):
                self.fileobj.write(NUL * (BLOCKSIZE * 2))
                self.offset += (BLOCKSIZE * 2)
                # fill up the end with zero-blocks
                # (like option -b20 for tar does)
                remainder = self.offset % RECORDSIZE
                if remainder > 0:
                    self.fileobj.write(NUL * (RECORDSIZE - remainder))
        finally:
            if not self._extfileobj:
                self.fileobj.close()
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def __init__(self, cmd, bufsize=-1):
        """Fork a child for *cmd* and connect two pipes to it.

        In the parent, 'tochild' is a writable file object on the pipe
        that becomes the child's fd 0, and 'fromchild' is a readable file
        object on the pipe that receives the child's fds 1 and 2.
        *bufsize* is passed to os.fdopen() for both file objects.
        """
        _cleanup()
        self.cmd = cmd
        # p2c*: parent-to-child pipe; c2p*: child-to-parent pipe.
        p2cread, p2cwrite = os.pipe()
        c2pread, c2pwrite = os.pipe()
        self.pid = os.fork()
        if self.pid == 0:
            # Child
            # Both fd 1 and fd 2 are pointed at the same pipe.
            os.dup2(p2cread, 0)
            os.dup2(c2pwrite, 1)
            os.dup2(c2pwrite, 2)
            # NOTE(review): presumably _run_child() never returns (execs or
            # exits); otherwise the child would fall through into the
            # parent-side code below -- confirm.
            self._run_child(cmd)
        # Parent: close the pipe ends that belong to the child and wrap
        # the remaining ones in file objects.
        os.close(p2cread)
        self.tochild = os.fdopen(p2cwrite, 'w', bufsize)
        os.close(c2pwrite)
        self.fromchild = os.fdopen(c2pread, 'r', bufsize)
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def importfile(path):
    """Import a Python source file or compiled file given its path.

    The file is treated as compiled when it starts with the
    interpreter's magic number, and as source otherwise.  The module name
    is the file's base name without its extension.  Any failure during
    import is re-raised as ErrorDuringImport carrying *path* and the
    original exception info.
    """
    magic = imp.get_magic()
    # Sniff the header in binary mode: the magic number is raw bytes and
    # text mode could translate or decode them.
    with open(path, 'rb') as header:
        is_compiled = header.read(len(magic)) == magic
    kind = imp.PY_COMPILED if is_compiled else imp.PY_SOURCE
    filename = os.path.basename(path)
    name, ext = os.path.splitext(filename)
    file = open(path, 'r')
    try:
        module = imp.load_module(name, file, path, (ext, 'r', kind))
    except:
        raise ErrorDuringImport(path, sys.exc_info())
    finally:
        # Close the handle on failure too, not only after a clean import.
        file.close()
    return module
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def get_message(self, key):
        """Build and return the message stored under *key*.

        The message is created by self._factory when one is set, and by
        MaildirMessage otherwise.  Its subdir, its info (the text after
        the last colon in the file name, if any) and its date (the file's
        modification time) are filled in.  Lookup failures propagate
        from self._lookup().
        """
        subpath = self._lookup(key)
        full_path = os.path.join(self._path, subpath)
        with open(full_path, 'r') as handle:
            factory = self._factory or MaildirMessage
            msg = factory(handle)
        subdir, name = os.path.split(subpath)
        msg.set_subdir(subdir)
        if self.colon in name:
            info = name.split(self.colon)[-1]
            msg.set_info(info)
        msg.set_date(os.path.getmtime(full_path))
        return msg
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def __setitem__(self, key, message):
        """Replace the keyed message; raise KeyError if it doesn't exist.

        The existing file is opened for update (locked when the mailbox
        is locked), truncated, and rewritten with *message*.  For an
        MHMessage the sequences are dumped as well.  The file is always
        unlocked and sync-closed afterwards.
        """
        path = os.path.join(self._path, str(key))
        try:
            f = open(path, 'rb+')
        except IOError as e:
            # "except ... as" is valid on Python 2.6+ and Python 3.
            if e.errno == errno.ENOENT:
                raise KeyError('No message with key: %s' % key)
            else:
                raise
        try:
            if self._locked:
                _lock_file(f)
            try:
                # Truncate through a second descriptor; 'f' keeps its
                # position at the start of the now-empty file.
                os.close(os.open(path, os.O_WRONLY | os.O_TRUNC))
                self._dump_message(message, f)
                if isinstance(message, MHMessage):
                    self._dump_sequences(message, key)
            finally:
                if self._locked:
                    _unlock_file(f)
        finally:
            _sync_close(f)
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def get_string(self, key):
        """Return a string representation or raise a KeyError.

        The message file named str(key) is read in full.  When the
        mailbox is locked the file is opened for update and locked while
        reading.  A missing file is reported as KeyError; other I/O
        errors propagate unchanged.
        """
        path = os.path.join(self._path, str(key))
        try:
            # Locking needs a writable handle, hence 'r+' when locked.
            f = open(path, 'r+' if self._locked else 'r')
        except IOError as e:
            # "except ... as" is valid on Python 2.6+ and Python 3.
            if e.errno == errno.ENOENT:
                raise KeyError('No message with key: %s' % key)
            raise
        try:
            if self._locked:
                _lock_file(f)
            try:
                return f.read()
            finally:
                if self._locked:
                    _unlock_file(f)
        finally:
            f.close()
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def _get_soname(f):
            """Return the SONAME of the shared library file *f*, or None.

            None is returned for an empty *f* and when objdump's output
            has no SONAME entry.  OSError is raised when the objdump
            command is not available.
            """
            # assuming GNU binutils / ELF
            if not f:
                return None
            # NOTE(review): *f* is interpolated into a shell command
            # unquoted; callers must only pass trusted paths.
            cmd = 'if ! type objdump >/dev/null 2>&1; then exit 10; fi;' \
                  "objdump -p -j .dynamic 2>/dev/null " + f
            # Run the command once and keep both its output and status.
            pipe = os.popen(cmd)
            try:
                data = pipe.read()
            finally:
                rv = pipe.close()
            # close() gives None on success; depending on the platform and
            # Python version a failure is reported either as the plain
            # exit code or as a wait status (exit code shifted left by 8).
            if rv is not None and 10 in (rv, rv >> 8):
                raise OSError('objdump command not found')
            res = re.search(r'\sSONAME\s+([^\s]+)', data)
            if not res:
                return None
            return res.group(1)
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def _findLib_ldconfig(name):
            """Return the path of library *name* found via ldconfig/ldd, or None.

            The output of '/sbin/ldconfig -p' is searched first; when
            that yields nothing, the output of 'ldd' on the running
            interpreter is searched instead.
            """
            def _capture(command):
                # Run *command* through a shell pipe and return its output.
                pipe = os.popen(command)
                try:
                    return pipe.read()
                finally:
                    pipe.close()

            # XXX assuming GLIBC's ldconfig (with option -p)
            pattern = r'/[^\(\)\s]*lib%s\.[^\(\)\s]*' % re.escape(name)
            match = re.search(
                pattern,
                _capture('LC_ALL=C LANG=C /sbin/ldconfig -p 2>/dev/null'))
            if not match:
                # Hm, this works only for libs needed by the python executable.
                match = re.search(
                    pattern, _capture('ldd %s 2>/dev/null' % sys.executable))
                if not match:
                    return None
            return match.group(0)
项目:whatstyle    作者:mikr    | 项目源码 | 文件源码
def run_with_args(args, parser):
    # type: (argparse.Namespace, argparse.ArgumentParser) -> int
    """Run whatstyle (optionally under the profiler) and report the run time.

    An IOError that is a broken pipe or reads 'Stream closed' is
    tolerated; any other IOError is re-raised.
    """
    set_logging_parameters(args, parser)
    start_time = time.time()
    ret = OK
    try:
        if not args.profile:
            ret = whatstyle(args, parser)
        else:
            outline("Profiling...")
            profile("ret = whatstyle(args, parser)", locals(), globals())
    except IOError as exc:
        # A pager such as 'less' that is quit early breaks the output
        # pipe; that, like an already closed stream, is acceptable.
        if exc.errno != errno.EPIPE and str(exc) != 'Stream closed':
            raise
        if not PY2:
            sys.stderr.close()
    elapsed = time.time() - start_time
    iprint(INFO_TIME, 'Run time: %s seconds' % elapsed)
    return ret
项目:pocket-cli    作者:rakanalh    | 项目源码 | 文件源码
def get_terminal_size():
        """Return the terminal size as a ``(columns, lines)`` tuple of ints.

        The size is looked up, in order, via a TIOCGWINSZ ioctl on file
        descriptors 0, 1 and 2, via the same ioctl on the controlling
        terminal, and from the LINES and COLUMNS environment variables.
        When all of these fail, (80, 25) is returned.
        """
        def ioctl_GWINSZ(fd):
            # Window size of *fd* as reported by the ioctl, or None when
            # the modules are missing or the descriptor is not a terminal.
            try:
                import fcntl
                import termios
                import struct
                cr = struct.unpack('hh', fcntl.ioctl(fd, termios.TIOCGWINSZ,
                                                     '1234'))
            except Exception:
                return None
            return cr
        cr = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2)
        if not cr:
            try:
                fd = os.open(os.ctermid(), os.O_RDONLY)
            except (AttributeError, OSError):
                # No ctermid() on this platform, or no controlling terminal.
                pass
            else:
                try:
                    cr = ioctl_GWINSZ(fd)
                finally:
                    os.close(fd)
        if not cr:
            try:
                # os.environ is the environment mapping ('os.env' does
                # not exist).  Convert here so that non-numeric values
                # also fall back to the default.
                cr = (int(os.environ['LINES']), int(os.environ['COLUMNS']))
            except (KeyError, ValueError):
                cr = (25, 80)
        return int(cr[1]), int(cr[0])
项目:imagepaste    作者:robinchenyu    | 项目源码 | 文件源码
def grab(bbox=None):
    """Take a screenshot and return it as an image.

    On darwin the 'screencapture' tool writes a temporary PNG that is
    loaded and then deleted; elsewhere the pixel data comes from
    grabber().  When *bbox* is given, the image is cropped to it.
    """
    if sys.platform != "darwin":
        size, data = grabber()
        # RGB, 32-bit line padding, origo in lower left corner
        stride = (size[0]*3 + 3) & -4
        im = Image.frombytes("RGB", size, data, "raw", "BGR", stride, -1)
    else:
        fd, path = tempfile.mkstemp('.png')
        os.close(fd)
        subprocess.call(['screencapture', '-x', path])
        im = Image.open(path)
        im.load()
        os.unlink(path)
    return im.crop(bbox) if bbox else im
项目:imagepaste    作者:robinchenyu    | 项目源码 | 文件源码
def close(self):
        """
        Close the underlying file pointer, if that is possible.

        The image core is replaced afterwards, so the image data cannot
        be used any more.

        Calling this is only needed for images whose file has not already
        been read and closed by the
        :py:meth:`~PIL.Image.Image.load` method.
        """
        try:
            self.fp.close()
        except Exception as err:
            logger.debug("Error closing: %s", err)

        # Rather than plain None, install a deferred error so that the
        # missing core image object is explained when it is next used.
        closed_error = ValueError("Operation on closed image")
        self.im = deferred_error(closed_error)
项目:pycos    作者:pgiri    | 项目源码 | 文件源码
def terminate(self):
            """Close pipe and terminate child process.

            The pipe is closed first via self.close(); termination itself
            is delegated to the parent class's terminate().
            """
            self.close()
            super(Popen, self).terminate()
项目:pycos    作者:pgiri    | 项目源码 | 文件源码
def close(self):
            """Similar to 'close' of file descriptor.

            Does nothing when there is no handle.  Otherwise the pipe is
            disconnected if it reports being the server end, and the
            handle is closed -- directly when no callback is installed on
            the overlapped structure, or through the '_close_' callback
            after cancelling I/O on the handle.
            """
            if self._handle:
                try:
                    flags = win32pipe.GetNamedPipeInfo(self._handle)[0]
                except:
                    # Any failure to query the handle is treated as
                    # "no flags set".
                    flags = 0

                if flags & win32con.PIPE_SERVER_END:
                    win32pipe.DisconnectNamedPipe(self._handle)
                # TODO: if pipe, need to call FlushFileBuffers?

                def _close_(rc, n):
                    # Final teardown: close the handle, unregister it from
                    # the notifier (if any) and reset the per-file state.
                    # 'rc' and 'n' are not used here.
                    win32file.CloseHandle(self._handle)
                    self._overlap = None
                    if self._notifier:
                        self._notifier.unregister(self._handle)
                    self._handle = None
                    self._read_result = self._write_result = None
                    self._read_task = self._write_task = None
                    self._buflist = []

                if self._overlap.object:
                    # A callback is installed: replace it with '_close_'
                    # and cancel I/O on the handle.
                    # NOTE(review): presumably the notifier invokes
                    # '_close_' once the cancellation completes -- confirm.
                    self._overlap.object = _close_
                    win32file.CancelIo(self._handle)
                else:
                    _close_(0, 0)
项目:pycos    作者:pgiri    | 项目源码 | 文件源码
def close(self):
            """Close file descriptor.

            Unregisters this object from its notifier, closes the wrapped
            file object when there is one, and resets all state.  Calling
            it again afterwards is a no-op.
            """
            # Compare against None explicitly: descriptor 0 is a valid
            # open descriptor but is falsy.
            if self._fileno is not None:
                self._notifier.unregister(self)
                if self._fd:
                    self._fd.close()
                self._fd = self._fileno = None
                self._read_task = self._write_task = None
                self._read_fn = self._write_fn = None
                self._buflist = []
项目:pycos    作者:pgiri    | 项目源码 | 文件源码
def close(self):
        """Close the pipe by dropping its 'first' and 'last' references."""
        self.first = self.last = None
项目:pycos    作者:pgiri    | 项目源码 | 文件源码
def __del__(self):
        # Finalizer: close the object when it is being destroyed.
        self.close()
项目:pycos    作者:pgiri    | 项目源码 | 文件源码
def __exit__(self, exc_type, exc_value, trace):
        # Context-manager exit: close the object.
        # NOTE(review): returning True makes the 'with' statement suppress
        # any exception raised in its body -- confirm this is intended.
        self.close()
        return True
项目:pycos    作者:pgiri    | 项目源码 | 文件源码
def close(self):
        """Unregister and close the socket; call this when done with it.

        The underlying socket, when present, is closed and forgotten,
        and the pending read/write tasks and functions are cleared.
        """
        self._unregister()
        rsock = self._rsock
        if rsock:
            rsock.close()
            self._rsock = None
        self._read_task = self._write_task = None
        self._read_fn = self._write_fn = None
项目:pycos    作者:pgiri    | 项目源码 | 文件源码
def terminate(self):
                """Tear the notifier down; a no-op when 'iocp' is unset.

                Terminates the async poller, closes both command
                sockets, closes the completion-port handle and clears
                the class-level '_instance' reference.
                """
                if not self.iocp:
                    return
                self.async_poller.terminate()
                self.cmd_rsock.close()
                self.cmd_wsock.close()
                self.cmd_rsock_buf = None
                # Detach the handle from the object before closing it.
                iocp = self.iocp
                self.iocp = None
                win32file.CloseHandle(iocp)
                self._timeouts = []
                self.cmd_rsock = self.cmd_wsock = None
                self.__class__._instance = None
项目:pycos    作者:pgiri    | 项目源码 | 文件源码
def terminate(self):
            """Close pipe and terminate child process.

            The pipe is closed first via self.close(); termination itself
            is delegated to the parent class's terminate().
            """
            self.close()
            super(Popen, self).terminate()
项目:pycos    作者:pgiri    | 项目源码 | 文件源码
def close(self):
            """Similar to 'close' of file descriptor.

            Does nothing when there is no handle.  Otherwise the pipe is
            disconnected if it reports being the server end, and the
            handle is closed -- directly when no callback is installed on
            the overlapped structure, or through the '_close_' callback
            after cancelling I/O on the handle.
            """
            if self._handle:
                try:
                    flags = win32pipe.GetNamedPipeInfo(self._handle)[0]
                except:
                    # Any failure to query the handle is treated as
                    # "no flags set".
                    flags = 0

                if flags & win32con.PIPE_SERVER_END:
                    win32pipe.DisconnectNamedPipe(self._handle)
                # TODO: if pipe, need to call FlushFileBuffers?

                def _close_(rc, n):
                    # Final teardown: close the handle, unregister it from
                    # the notifier (if any) and reset the per-file state.
                    # 'rc' and 'n' are not used here.
                    win32file.CloseHandle(self._handle)
                    self._overlap = None
                    if self._notifier:
                        self._notifier.unregister(self._handle)
                    self._handle = None
                    self._read_result = self._write_result = None
                    self._read_task = self._write_task = None
                    self._buflist = []

                if self._overlap.object:
                    # A callback is installed: replace it with '_close_'
                    # and cancel I/O on the handle.
                    # NOTE(review): presumably the notifier invokes
                    # '_close_' once the cancellation completes -- confirm.
                    self._overlap.object = _close_
                    win32file.CancelIo(self._handle)
                else:
                    _close_(0, 0)
项目:pycos    作者:pgiri    | 项目源码 | 文件源码
def __exit__(self, exc_type, exc_value, trace):
        # Context-manager exit: close the object.
        # NOTE(review): returning True makes the 'with' statement suppress
        # any exception raised in its body -- confirm this is intended.
        self.close()
        return True
项目:pycos    作者:pgiri    | 项目源码 | 文件源码
def close(self):
        """Close the pipe by dropping its 'first' and 'last' references."""
        self.first = self.last = None
项目:pycos    作者:pgiri    | 项目源码 | 文件源码
def __del__(self):
        # Finalizer: close the object when it is being destroyed.
        self.close()
项目:pycos    作者:pgiri    | 项目源码 | 文件源码
def __exit__(self, exc_type, exc_value, trace):
        # Context-manager exit: close the object.
        # NOTE(review): returning True makes the 'with' statement suppress
        # any exception raised in its body -- confirm this is intended.
        self.close()
        return True
项目:pycos    作者:pgiri    | 项目源码 | 文件源码
def __exit__(self, exc_type, exc_value, trace):
        # Context-manager exit: close the object.  Nothing is returned, so
        # an exception raised in the 'with' body keeps propagating.
        self.close()
项目:pycos    作者:pgiri    | 项目源码 | 文件源码
def terminate(self):
                """Tear the notifier down; a no-op when 'iocp' is unset.

                Terminates the async poller, closes both command
                sockets, closes the completion-port handle and clears
                the class-level '_instance' reference.
                """
                if not self.iocp:
                    return
                self.async_poller.terminate()
                self.cmd_rsock.close()
                self.cmd_wsock.close()
                self.cmd_rsock_buf = None
                # Detach the handle from the object before closing it.
                iocp = self.iocp
                self.iocp = None
                win32file.CloseHandle(iocp)
                self._timeouts = []
                self.cmd_rsock = self.cmd_wsock = None
                self.__class__._instance = None
项目:pycos    作者:pgiri    | 项目源码 | 文件源码
def close(self):
        """Unregister the channel and remove it from its scheduler.

        Nothing happens when '_location' is set.  Otherwise the channel
        is unregistered, its subscribers are cleared and its name is
        removed from the scheduler's channel table under the scheduler
        lock.
        """
        if self._location:
            return
        self.unregister()
        self._subscribers = set()
        lock = self._scheduler._lock
        lock.acquire()
        self._scheduler._channels.pop(self._name, None)
        lock.release()
项目:django-performance-testing    作者:PaesslerAG    | 项目源码 | 文件源码
def tmpfilepath():
    """Yield the path of a fresh temporary file and remove it afterwards.

    The file is created with the prefix 'djpt-test-' and its descriptor
    is closed before the path is handed out.
    """
    handle, path = tempfile.mkstemp(prefix='djpt-test-')
    os.close(handle)
    yield path
    os.remove(path)
项目:txt2evernote    作者:Xunius    | 项目源码 | 文件源码
def __init__(self, content):
        """Store *content* and write its text form to a temporary file.

        *content* must be a ``str``; otherwise an Exception is raised.
        The result of self.ENMLtoText(content) is written to a new
        '.markdown' temporary file whose path is kept in self.tempfile.
        The descriptor is closed even when conversion or writing fails.
        """
        if not isinstance(content, str):
            raise Exception("Note content must be an instance "
                            "of string, '%s' given." % type(content))

        (tempfileHandler, tempfileName) = tempfile.mkstemp(suffix=".markdown")
        try:
            text = self.ENMLtoText(content)
            # os.write() needs bytes; encode text that is not bytes yet.
            if not isinstance(text, bytes):
                text = text.encode("utf-8")
            os.write(tempfileHandler, text)
        finally:
            os.close(tempfileHandler)

        self.content = content
        self.tempfile = tempfileName
项目:spoonybard    作者:notnownikki    | 项目源码 | 文件源码
def close(self):
        """Close any open connections; there is nothing to release here."""
项目:spoonybard    作者:notnownikki    | 项目源码 | 文件源码
def close(self):
        """Close the ssh connection

        The remote script file is removed over SFTP first.  The SFTP
        session and the ssh client are closed even when that removal (or
        opening the SFTP session) fails; the error still propagates.
        """
        try:
            ftp = self.client.open_sftp()
            try:
                ftp.remove(self.script_filename)
            finally:
                ftp.close()
        finally:
            self.client.close()
项目:spoonybard    作者:notnownikki    | 项目源码 | 文件源码
def upload_file(self, filename, content, mode=None):
        """Write *content* to the remote file *filename* over SFTP.

        The remote file handle and the SFTP session are closed even when
        writing fails.  When *mode* is truthy, the script file is made
        executable afterwards.
        """
        ftp = self.client.open_sftp()
        try:
            remote_file = ftp.file(filename, 'w', -1)
            try:
                remote_file.write(content)
                remote_file.flush()
            finally:
                remote_file.close()
            if mode:
                # NOTE(review): this changes the permissions of
                # self.script_filename to a fixed 0o744 regardless of the
                # *filename* and *mode* arguments -- confirm whether
                # ftp.chmod(filename, mode) was intended.
                ftp.chmod(self.script_filename, 0o744)
        finally:
            ftp.close()
项目:spoonybard    作者:notnownikki    | 项目源码 | 文件源码
def get_tmp_script_filename(self):
        """Run 'mktemp' on the remote side and return the path it prints.

        A new session channel with a pty is opened on the transport; the
        first output line, stripped, is the result.  The channel is
        closed even when one of the remote calls fails.
        """
        channel = self.transport.open_session()
        try:
            channel.get_pty()
            out = channel.makefile()
            channel.exec_command('mktemp')
            tmpfilename = out.readline().strip()
            # Wait for the command to finish before closing the channel.
            channel.recv_exit_status()
        finally:
            channel.close()
        return tmpfilename