Python gi.repository.GLib 模块,io_add_watch() 实例源码

我们从Python开源项目中,提取了以下13个代码示例,用于说明如何使用gi.repository.GLib.io_add_watch()

项目:scarlett_os    作者:bossjones    | 项目源码 | 文件源码
def attach_readfd(iochannel, context):
    # This does not work currently :-(
    # Also, things are mixed up a bit with iochannel vs. FDs apparently
    # as I am getting an int back in the handler
    # if isinstance(iochannel, int):
    #    iochannel = GLib.IOChannel(iochannel)
    #source = GLib.io_create_watch(iochannel, GLib.IOCondition(GLib.IO_IN | GLib.IO_PRI))
    #source.set_callback(_glib_signal_cb, context)
    # source.set_priority(GLib.PRIORITY_HIGH)
    # source.attach(context)
    GLib.io_add_watch(iochannel, GLib.IO_IN | GLib.IO_PRI, _glib_signal_cb, context)


# Now, this is our magic exception hook. It is *magic*
# it detects whether the exception happened in our signal handler.
# The fun part is that the handler does not run properly. The mainloop
# will actually try to run it again as soon as it can (if it manages).
# We use that to clear the exception option again.
项目:cavalcade    作者:worron    | 项目源码 | 文件源码
def color_update(self, bytedata):
        """Launch new calculation process with given image bytedata"""
        if bytedata is None:
            if self.default is None:
                if not self.config["image"]["default"].endswith(".svg"):  # fix this
                    file_ = self.config["image"]["default"]
                    self.handler_unblock(self.catcher)
                else:
                    return
            else:
                self.emit("ac-update", self.default)
                return
        else:
            file_ = io.BytesIO(bytedata)

        if self.process is None or not self.process.is_alive():
            if self.watcher is None:
                self.watcher = GLib.io_add_watch(self.pc, GLib.IO_IN | GLib.IO_HUP, self.color_setup)
            self.process = multiprocessing.Process(
                target=self.calculate, args=(file_, self.config["autocolor"], self.cc)
            )
            self.process.start()
        else:
            logger.error("Autocolor threading error: previus process still running, refusing to start new one")
项目:gdebi    作者:linuxmint    | 项目源码 | 文件源码
def _run_lintian(self, filename):
        buf = self.textview_lintian_output.get_buffer()
        if not os.path.exists("/usr/bin/lintian"):
            buf.set_text(
                _("No lintian available.\n"
                  "Please install using sudo apt-get install lintian"))
            return
        buf.set_text(_("Running lintian..."))
        self._lintian_output = ""
        self._lintian_exit_status = None
        self._lintian_exit_status_gathered = None
        cmd = ["/usr/bin/lintian", filename]
        (pid, stdin, stdout, stderr) = GLib.spawn_async(
            cmd, flags=GObject.SPAWN_DO_NOT_REAP_CHILD,
            standard_output=True, standard_error=True)
        for fd in [stdout, stderr]:
            channel = GLib.IOChannel(filedes=fd)
            channel.set_flags(GLib.IOFlags.NONBLOCK)
            GLib.io_add_watch(channel, GLib.PRIORITY_DEFAULT,
                              GLib.IOCondition.IN | GLib.IO_ERR | GLib.IO_HUP,
                              self._on_lintian_output)
        GLib.child_watch_add(GLib.PRIORITY_DEFAULT, pid,
                             self._on_lintian_finished)
项目:leetcode    作者:thomasyimgit    | 项目源码 | 文件源码
def inputhook_gtk3():
    GLib.io_add_watch(sys.stdin, GLib.IO_IN, _main_quit)
    Gtk.main()
    return 0
项目:leetcode    作者:thomasyimgit    | 项目源码 | 文件源码
def inputhook(context):
    GLib.io_add_watch(context.fileno(), GLib.IO_IN, _main_quit)
    Gtk.main()
项目:eos-data-distribution    作者:endlessm    | 项目源码 | 文件源码
def connect(self, connectionInfo, elementListener, onConnected):
        super(GLibUnixTransport, self).connect(
            connectionInfo, elementListener, onConnected)

        fd = self._socket.fileno()
        io_channel = GLib.IOChannel.unix_new(fd)
        self._watch_id = GLib.io_add_watch(
            io_channel, GLib.PRIORITY_DEFAULT, GLib.IO_IN, self._socket_ready)
项目:Repobot    作者:Desgard    | 项目源码 | 文件源码
def inputhook_gtk3():
    GLib.io_add_watch(sys.stdin, GLib.IO_IN, _main_quit)
    Gtk.main()
    return 0
项目:Repobot    作者:Desgard    | 项目源码 | 文件源码
def inputhook(context):
    GLib.io_add_watch(context.fileno(), GLib.IO_IN, _main_quit)
    Gtk.main()
项目:blender    作者:gastrodia    | 项目源码 | 文件源码
def inputhook_gtk3():
    GLib.io_add_watch(sys.stdin, GLib.IO_IN, _main_quit)
    Gtk.main()
    return 0
项目:blender    作者:gastrodia    | 项目源码 | 文件源码
def inputhook(context):
    GLib.io_add_watch(context.fileno(), GLib.IO_IN, _main_quit)
    Gtk.main()
项目:yatta_reader    作者:sound88    | 项目源码 | 文件源码
def inputhook_gtk3():
    GLib.io_add_watch(sys.stdin, GLib.IO_IN, _main_quit)
    Gtk.main()
    return 0
项目:yatta_reader    作者:sound88    | 项目源码 | 文件源码
def inputhook(context):
    GLib.io_add_watch(context.fileno(), GLib.IO_IN, _main_quit)
    Gtk.main()
项目:x112v4l2    作者:romlok    | 项目源码 | 文件源码
def start_process(self):
        """
            Start the ffmpeg subprocess
        """
        if self.process and self.process.poll() is None:
            raise RuntimeError('Refusing to start process when already running')

        cmd = self.get_process_command()
        self.process = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            stdin=subprocess.DEVNULL,
        )
        # Make the pipes non-blocking
        flags = fcntl.fcntl(self.process.stdout, fcntl.F_GETFL)
        fcntl.fcntl(self.process.stdout, fcntl.F_SETFL, flags | os.O_NONBLOCK)
        flags = fcntl.fcntl(self.process.stderr, fcntl.F_GETFL)
        fcntl.fcntl(self.process.stderr, fcntl.F_SETFL, flags | os.O_NONBLOCK)
        # Clear any output from previous incarnations
        self.clear_process_stdout()
        self.clear_process_stderr()

        # Update the UI when there's output from the process
        def output_callback(fd, condition, pipe, func):
            """ Read from pipe, and pass to the given func """
            output = pipe.read()
            if not output:
                return False
            func(output.decode('utf-8'))
            return condition != GLib.IO_HUP

        stdout_read_cb = GLib.io_add_watch(
            self.process.stdout,
            GLib.PRIORITY_DEFAULT,
            GLib.IO_IN | GLib.IO_HUP,
            output_callback,
            self.process.stdout,
            self.append_process_stdout,
        )
        GLib.io_add_watch(
            self.process.stderr,
            GLib.PRIORITY_DEFAULT,
            GLib.IO_IN | GLib.IO_HUP,
            output_callback,
            self.process.stderr,
            self.append_process_stderr,
        )

        # Update the UI
        self.show_process_state()