我们从Python开源项目中,提取了以下23个代码示例,用于说明如何使用gi.repository.GLib.IO_HUP。
def copy_files(self, file_name, action): if action == 'make_backup': command = ['cp', '-R', self.winetricks_cache + '/' + file_name, self.winetricks_cache_backup] elif action == 'restore_backup': command = ['cp', '-R', self.winetricks_cache_backup + '/' + file_name, self.winetricks_cache] self.pid, stdin, stdout, stderr = GLib.spawn_async(command, flags=GLib.SpawnFlags.SEARCH_PATH|GLib.SpawnFlags.DO_NOT_REAP_CHILD, standard_output=True, standard_error=True) io = GLib.IOChannel(stdout) self.source_id_out = io.add_watch(GLib.IO_IN|GLib.IO_HUP, self.watch_process, 'copy_files', priority=GLib.PRIORITY_HIGH)
def check_for_new_games(self): self.goglib_new_games_list = [] self.goglib_updated_games_list = [] command = ['lgogdownloader', '--exclude', '1,2,4,8,16,32','--list'] pid, stdin, stdout, stderr = GLib.spawn_async( command, flags=GLib.SpawnFlags.SEARCH_PATH|GLib.SpawnFlags.DO_NOT_REAP_CHILD, standard_output=True, standard_error=True ) io = GLib.IOChannel(stdout) self.source_id_out = io.add_watch( GLib.IO_IN|GLib.IO_HUP, self.watch_process, 'check_for_new_games', priority=GLib.PRIORITY_HIGH )
def run_goglib_setup_script(self, game_name): self.progressbar_goglib.set_text(_("Executing script...")) self.set_environ(game_name, self.goglib_download_dir, self.goglib_install_dir) command = [data_dir + '/scripts/goglib/' + game_name + '/setup'] goglib_name_to_pid_install_dict[game_name], stdin, stdout, stderr = GLib.spawn_async(command, flags=GLib.SpawnFlags.SEARCH_PATH|GLib.SpawnFlags.DO_NOT_REAP_CHILD, standard_output=True, standard_error=True) io = GLib.IOChannel(stdout) self.source_id_out = io.add_watch(GLib.IO_IN|GLib.IO_HUP, self.watch_process, 'run_goglib_setup_script', priority=GLib.PRIORITY_HIGH)
def color_update(self, bytedata): """Launch new calculation process with given image bytedata""" if bytedata is None: if self.default is None: if not self.config["image"]["default"].endswith(".svg"): # fix this file_ = self.config["image"]["default"] self.handler_unblock(self.catcher) else: return else: self.emit("ac-update", self.default) return else: file_ = io.BytesIO(bytedata) if self.process is None or not self.process.is_alive(): if self.watcher is None: self.watcher = GLib.io_add_watch(self.pc, GLib.IO_IN | GLib.IO_HUP, self.color_setup) self.process = multiprocessing.Process( target=self.calculate, args=(file_, self.config["autocolor"], self.cc) ) self.process.start() else: logger.error("Autocolor threading error: previus process still running, refusing to start new one")
def crop_file(self, file_name, crop): if crop == 1: region = '640:360:0:60' elif crop == 2: region = '640:400:0:40' command = ['ffmpeg', '-i', video_dir_bak + '/' + file_name, '-filter:v', 'crop=' + region, '-q', '1', video_dir + '/' + file_name] self.pid, stdin, stdout, stderr = GLib.spawn_async(command, flags=GLib.SpawnFlags.SEARCH_PATH|GLib.SpawnFlags.DO_NOT_REAP_CHILD, standard_output=True, standard_error=True) io = GLib.IOChannel(stdout) self.source_id_out = io.add_watch(GLib.IO_IN|GLib.IO_HUP, self.watch_process, 'cropping_video', priority=GLib.PRIORITY_HIGH)
def watch_process(self, io, condition, process_name): if condition is GLib.IO_HUP: self.progressbar.pulse() self.n_files -= 1 if self.n_files == 0: self.config_save_crop(self.crop) return False while Gtk.events_pending(): Gtk.main_iteration_do(False) return True
def watch_process(self, io, condition, process_name): if condition is GLib.IO_HUP: os.system('rm ' + current_dir + '/game/cache') os.system('mkdir -p ' + current_dir + '/game/cache') self.config_save() Gtk.main_quit() return False print io.readline().strip('\n') self.progressbar.pulse() return True
def crop_file_480(self, file_name, crop): if crop == 1: region = '640:360:0:60' elif crop == 2: region = '640:400:0:40' command = ['ffmpeg', '-i', video_dir_bak + '/' + file_name, '-filter:v', 'crop=' + region, '-vcodec', 'wmv2', '-q', '3', video_dir + '/' + file_name] self.pid, stdin, stdout, stderr = GLib.spawn_async(command, flags=GLib.SpawnFlags.SEARCH_PATH|GLib.SpawnFlags.DO_NOT_REAP_CHILD, standard_output=True, standard_error=True) io = GLib.IOChannel(stdout) self.source_id_out = io.add_watch(GLib.IO_IN|GLib.IO_HUP, self.watch_process, 'cropping_video_480', priority=GLib.PRIORITY_HIGH)
def crop_file_600(self, file_name, crop): if crop == 1: region = '800:450:0:75' elif crop == 2: region = '800:500:0:50' command = ['ffmpeg', '-i', video_dir_bak + '/' + file_name, '-filter:v', 'crop=' + region, '-vcodec', 'wmv2', '-q', '5', video_dir + '/' + file_name] self.pid, stdin, stdout, stderr = GLib.spawn_async(command, flags=GLib.SpawnFlags.SEARCH_PATH|GLib.SpawnFlags.DO_NOT_REAP_CHILD, standard_output=True, standard_error=True) io = GLib.IOChannel(stdout) self.source_id_out = io.add_watch(GLib.IO_IN|GLib.IO_HUP, self.watch_process, 'cropping_video_600', priority=GLib.PRIORITY_HIGH)
def _run_lintian(self, filename): buf = self.textview_lintian_output.get_buffer() if not os.path.exists("/usr/bin/lintian"): buf.set_text( _("No lintian available.\n" "Please install using sudo apt-get install lintian")) return buf.set_text(_("Running lintian...")) self._lintian_output = "" self._lintian_exit_status = None self._lintian_exit_status_gathered = None cmd = ["/usr/bin/lintian", filename] (pid, stdin, stdout, stderr) = GLib.spawn_async( cmd, flags=GObject.SPAWN_DO_NOT_REAP_CHILD, standard_output=True, standard_error=True) for fd in [stdout, stderr]: channel = GLib.IOChannel(filedes=fd) channel.set_flags(GLib.IOFlags.NONBLOCK) GLib.io_add_watch(channel, GLib.PRIORITY_DEFAULT, GLib.IOCondition.IN | GLib.IO_ERR | GLib.IO_HUP, self._on_lintian_output) GLib.child_watch_add(GLib.PRIORITY_DEFAULT, pid, self._on_lintian_finished)
def watch_process(self, io, condition, process_name): if condition is GLib.IO_HUP: self.progressbar.pulse() self.n_files_to_copy -= 1 if self.n_files_to_copy == 0: message_dialog = Gtk.MessageDialog( self.main_window, 0, Gtk.MessageType.ERROR, Gtk.ButtonsType.OK, _("Done!"), width_request = 360 ) content_area = message_dialog.get_content_area() content_area.set_property('margin-left', 10) content_area.set_property('margin-right', 10) content_area.set_property('margin-top', 10) content_area.set_property('margin-bottom', 10) action_area = message_dialog.get_action_area() action_area.set_property('spacing', 10) self.main_window.hide() message_dialog.run() message_dialog.destroy() Gtk.main_quit() return False while Gtk.events_pending(): Gtk.main_iteration_do(False) return True
def goglib_download_game(self, game_name): if self.goglib_offline_mode: self.goglib_unpack_game(goglib_installation_queue[0]) else: if not self.queue_tab_exists(): self.append_queue_tab() self.progressbar_goglib.set_text(_("Downloading...")) if not os.path.exists(self.goglib_download_dir + '/' + game_name): os.makedirs(self.goglib_download_dir + '/' + game_name) self.preferred_language = self.lang_index[self.goglib_lang.lower()] if self.goglib_download_extras == False: command = ['lgogdownloader', '--download', '--ignore-dlc-count', '--platform', '4,1', \ '--language', self.preferred_language + ',1,', '--game', game_name + '$', \ '--directory=' + self.goglib_download_dir + '/', '--exclude', '2,4,16'] elif self.goglib_download_extras == True: command = ['lgogdownloader', '--download', '--ignore-dlc-count', '--platform', '4,1', \ '--language', self.preferred_language + ',1,', '--game', game_name + '$', \ '--directory=' + self.goglib_download_dir + '/', '--exclude', '4,16'] goglib_name_to_pid_download_dict[game_name], stdin, stdout, stderr = GLib.spawn_async(command, flags=GLib.SpawnFlags.SEARCH_PATH|GLib.SpawnFlags.DO_NOT_REAP_CHILD, standard_output=True, standard_error=True) io = GLib.IOChannel(stdout) self.source_id_out = io.add_watch(GLib.IO_IN|GLib.IO_HUP, self.watch_process, 'goglib_download_game', priority=GLib.PRIORITY_HIGH)
def execute_command(self, command, process_name): pid, stdin, stdout, stderr = GLib.spawn_async(command, flags=GLib.SpawnFlags.SEARCH_PATH|GLib.SpawnFlags.DO_NOT_REAP_CHILD, standard_output=True, standard_error=True) io = GLib.IOChannel(stdout) self.source_id_out = io.add_watch(GLib.IO_IN|GLib.IO_HUP, self.watch_process, process_name, priority=GLib.PRIORITY_HIGH)
def watch_process(self, io, condition, process_name): if condition is GLib.IO_HUP: if process_name == 'cropping_video_480': self.progressbar.pulse() self.n_files -= 1 if self.n_files == 0: self.config_save_crop(self.crop) Gtk.main_quit() return False elif process_name == 'cropping_video_600': self.progressbar.pulse() self.n_files -= 1 if self.n_files == 0: self.config_save_crop(self.crop) Gtk.main_quit() return False print io.readline().strip('\n') return True
def update_goglib(self): self.window_update_message = Gtk.Window( title = _("Changes in library"), type = Gtk.WindowType.POPUP, window_position = Gtk.WindowPosition.CENTER_ALWAYS, resizable = False, icon = app_icon, ) self.box_update_message = Gtk.Box( orientation = Gtk.Orientation.HORIZONTAL ) self.label_update_message = Gtk.Label( label = _("Updating GOG library..."), margin_right = 10, margin_top = 20, margin_bottom = 20, ) self.spinner_update_message = Gtk.Spinner( active = True, visible = True, margin_left = 10, width_request = 48, height_request = 48 ) self.box_update_message.pack_start(self.spinner_update_message, True, True, 0) self.box_update_message.pack_start(self.label_update_message, True, True, 0) self.window_update_message.add(self.box_update_message) self.main_window.hide() if len(self.additional_windows_list) != 0: for window in self.additional_windows_list: window.hide() while Gtk.events_pending(): Gtk.main_iteration() self.window_update_message.show_all() command = ['lgogdownloader', '--exclude', '1,2,4,8,16,32','--list-details'] pid, stdin, stdout, stderr = GLib.spawn_async(command, flags=GLib.SpawnFlags.SEARCH_PATH|GLib.SpawnFlags.DO_NOT_REAP_CHILD, standard_output=True, standard_error=True) io = GLib.IOChannel(stdout) self.source_id_out = io.add_watch(GLib.IO_IN|GLib.IO_HUP, self.watch_process, 'update_goglib', priority=GLib.PRIORITY_HIGH)
def goglib_unpack_game(self, game_name): if not self.queue_tab_exists(): self.append_queue_tab() if not os.path.exists(self.goglib_install_dir): os.makedirs(self.goglib_install_dir) if not os.path.exists(self.goglib_install_dir + '/' + game_name): os.makedirs(self.goglib_install_dir + '/' + game_name) self.progressbar_goglib.set_text(_("Installing...")) files_list = os.listdir(self.goglib_download_dir + '/' + game_name) files_list = sorted(files_list) self.installer_type = 'none' number_of_installers = 0 versions = [] for line in files_list: if '.sh' in line: self.installer_type = 'sh' versions.append(line) number_of_installers += 1 elif '.exe' in line: self.installer_type = 'exe' versions.append(line) number_of_installers += 1 versions = sorted(versions) if number_of_installers > 0: if self.installer_type == 'sh': command = ['unzip', '-o', \ self.goglib_download_dir + '/' + game_name + '/' + versions[-1], \ '-d', self.goglib_install_dir + '/' + game_name + '/tmp'] elif self.installer_type == 'exe': command = ['innoextract', '--gog', \ self.goglib_download_dir + '/' + game_name + '/' + versions[-1], \ '-d', self.goglib_install_dir + '/' + game_name + '/tmp'] elif number_of_installers == 0: self.goglib_install_game(goglib_installation_queue[0]) return goglib_name_to_pid_unpack_dict[game_name], stdin, stdout, stderr = GLib.spawn_async(command, flags=GLib.SpawnFlags.SEARCH_PATH|GLib.SpawnFlags.DO_NOT_REAP_CHILD, standard_output=True, standard_error=True) io = GLib.IOChannel(stdout) self.source_id_out = io.add_watch(GLib.IO_IN|GLib.IO_HUP, self.watch_process, 'goglib_unpack_game', priority=GLib.PRIORITY_HIGH)
def start_process(self): """ Start the ffmpeg subprocess """ if self.process and self.process.poll() is None: raise RuntimeError('Refusing to start process when already running') cmd = self.get_process_command() self.process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.DEVNULL, ) # Make the pipes non-blocking flags = fcntl.fcntl(self.process.stdout, fcntl.F_GETFL) fcntl.fcntl(self.process.stdout, fcntl.F_SETFL, flags | os.O_NONBLOCK) flags = fcntl.fcntl(self.process.stderr, fcntl.F_GETFL) fcntl.fcntl(self.process.stderr, fcntl.F_SETFL, flags | os.O_NONBLOCK) # Clear any output from previous incarnations self.clear_process_stdout() self.clear_process_stderr() # Update the UI when there's output from the process def output_callback(fd, condition, pipe, func): """ Read from pipe, and pass to the given func """ output = pipe.read() if not output: return False func(output.decode('utf-8')) return condition != GLib.IO_HUP stdout_read_cb = GLib.io_add_watch( self.process.stdout, GLib.PRIORITY_DEFAULT, GLib.IO_IN | GLib.IO_HUP, output_callback, self.process.stdout, self.append_process_stdout, ) GLib.io_add_watch( self.process.stderr, GLib.PRIORITY_DEFAULT, GLib.IO_IN | GLib.IO_HUP, output_callback, self.process.stderr, self.append_process_stderr, ) # Update the UI self.show_process_state()
def cb_button_patch(self, button): if os.path.exists(game_dir + '/' + 'br2fsaaConfig.exe'): self.launch_fsaa_settings() else: patch_path = download_dir + '/_distr/bloodrayne_2/BR2_FSAA_Patch_1.666.rar' if not os.path.exists(patch_path): message_dialog = Gtk.MessageDialog( self.main_window, 0, Gtk.MessageType.ERROR, Gtk.ButtonsType.OK, _("Patch not found in download directory.") ) content_area = message_dialog.get_content_area() content_area.set_property('margin-left', 10) content_area.set_property('margin-right', 10) content_area.set_property('margin-top', 10) content_area.set_property('margin-bottom', 10) self.main_window.hide() message_dialog.run() message_dialog.destroy() self.main_window.show() else: self.button_patch.set_sensitive(False) while Gtk.events_pending(): Gtk.main_iteration_do(False) command = ['7z', 'e', '-aoa', '-o' + game_dir, patch_path] self.pid, stdin, stdout, stderr = GLib.spawn_async(command, flags=GLib.SpawnFlags.SEARCH_PATH|GLib.SpawnFlags.DO_NOT_REAP_CHILD, standard_output=True, standard_error=True) io = GLib.IOChannel(stdout) self.source_id_out = io.add_watch(GLib.IO_IN|GLib.IO_HUP, self.watch_process, 'extracting', priority=GLib.PRIORITY_HIGH)
def cb_button_install_std(self, button): modpacks_path = download_dir + '/_distr/the_temple_of_elemental_evil/' + exe_co8_modpack_std if not os.path.exists(modpacks_path): message_dialog = Gtk.MessageDialog( self.co8_std_window, 0, Gtk.MessageType.ERROR, Gtk.ButtonsType.OK, _("Modpack not found in download directory") ) content_area = message_dialog.get_content_area() content_area.set_property('margin-left', 10) content_area.set_property('margin-right', 10) content_area.set_property('margin-top', 10) content_area.set_property('margin-bottom', 10) self.co8_std_window.hide() message_dialog.run() message_dialog.destroy() self.co8_std_window.show() else: self.box_std.set_visible(False) self.progressbar_std.set_visible(True) command = ['innoextract', modpacks_path, '-d', game_dir + '/tmp'] self.pid_std, stdin, stdout, stderr = GLib.spawn_async(command, flags=GLib.SpawnFlags.SEARCH_PATH|GLib.SpawnFlags.DO_NOT_REAP_CHILD, standard_output=True, standard_error=True) io = GLib.IOChannel(stdout) self.source_id_out = io.add_watch(GLib.IO_IN|GLib.IO_HUP, self.watch_process, 'extracting std', priority=GLib.PRIORITY_HIGH)
def cb_button_install_nc(self, button): modpacks_path = download_dir + '/_distr/the_temple_of_elemental_evil/' + exe_co8_modpack_nc if not os.path.exists(modpacks_path): message_dialog = Gtk.MessageDialog( self.co8_nc_window, 0, Gtk.MessageType.ERROR, Gtk.ButtonsType.OK, _("Modpack not found in download directory") ) content_area = message_dialog.get_content_area() content_area.set_property('margin-left', 10) content_area.set_property('margin-right', 10) content_area.set_property('margin-top', 10) content_area.set_property('margin-bottom', 10) self.co8_nc_window.hide() message_dialog.run() message_dialog.destroy() self.co8_nc_window.show() else: self.box_nc.set_visible(False) self.progressbar_nc.set_visible(True) command = ['innoextract', modpacks_path, '-d', game_dir + '/tmp'] self.pid_nc, stdin, stdout, stderr = GLib.spawn_async(command, flags=GLib.SpawnFlags.SEARCH_PATH|GLib.SpawnFlags.DO_NOT_REAP_CHILD, standard_output=True, standard_error=True) io = GLib.IOChannel(stdout) self.source_id_out = io.add_watch(GLib.IO_IN|GLib.IO_HUP, self.watch_process, 'extracting nc', priority=GLib.PRIORITY_HIGH)
def cb_button_install_kob(self, button): modpacks_path = download_dir + '/_distr/the_temple_of_elemental_evil/' + exe_co8_kob if not os.path.exists(modpacks_path): message_dialog = Gtk.MessageDialog( self.co8_kob_window, 0, Gtk.MessageType.ERROR, Gtk.ButtonsType.OK, _("Mod not found in download directory") ) content_area = message_dialog.get_content_area() content_area.set_property('margin-left', 10) content_area.set_property('margin-right', 10) content_area.set_property('margin-top', 10) content_area.set_property('margin-bottom', 10) self.co8_kob_window.hide() message_dialog.run() message_dialog.destroy() self.co8_kob_window.show() else: self.box_kob.set_visible(False) self.progressbar_kob.set_visible(True) command = ['innoextract', modpacks_path, '-d', game_dir + '/tmp'] self.pid_kob, stdin, stdout, stderr = GLib.spawn_async(command, flags=GLib.SpawnFlags.SEARCH_PATH|GLib.SpawnFlags.DO_NOT_REAP_CHILD, standard_output=True, standard_error=True) io = GLib.IOChannel(stdout) self.source_id_out = io.add_watch(GLib.IO_IN|GLib.IO_HUP, self.watch_process, 'extracting kob', priority=GLib.PRIORITY_HIGH)
def cb_button_patch(self, button): patch_path = download_dir + '/_distr/pathologic/Pathologic_Widescreen_Addon.exe' if not os.path.exists(patch_path): message_dialog = Gtk.MessageDialog( self.main_window, 0, Gtk.MessageType.ERROR, Gtk.ButtonsType.OK, _("Addon not found in download directory.") ) content_area = message_dialog.get_content_area() content_area.set_property('margin-left', 10) content_area.set_property('margin-right', 10) content_area.set_property('margin-top', 10) content_area.set_property('margin-bottom', 10) self.main_window.hide() message_dialog.run() message_dialog.destroy() self.main_window.show() else: self.button_config.set_sensitive(False) self.button_patch.set_visible(False) self.progressbar.set_visible(True) while Gtk.events_pending(): Gtk.main_iteration_do(False) command = ['innoextract', patch_path, '-d', game_dir + '/tmp'] self.pid, stdin, stdout, stderr = GLib.spawn_async(command, flags=GLib.SpawnFlags.SEARCH_PATH|GLib.SpawnFlags.DO_NOT_REAP_CHILD, standard_output=True, standard_error=True) io = GLib.IOChannel(stdout) self.source_id_out = io.add_watch(GLib.IO_IN|GLib.IO_HUP, self.watch_process, 'extracting', priority=GLib.PRIORITY_HIGH)