Python gi.repository.GLib 模块,Error() 实例源码

我们从 Python 开源项目中提取了以下 18 个代码示例，用于说明如何使用 gi.repository.GLib.Error()。

项目:python-eduvpn-client    作者:eduvpn    | 项目源码 | 文件源码
def bytes2pixbuf(data, width=icon_size['width'], height=icon_size['height'], display_name=None):
    """
    Decode raw image bytes into a GdkPixbuf using a PixbufLoader.

    args:
        data (bytes): raw bytes of the encoded image
        width (int): width requested from the loader
        height (int): height requested from the loader
        display_name (str): name used in the log message when decoding fails

    returns:
        the pixbuf produced by the loader, or None when writing to or
        closing the loader raises GLib.Error (the error is logged)

    """
    pixbuf_loader = GdkPixbuf.PixbufLoader()
    pixbuf_loader.set_size(width, height)
    try:
        pixbuf_loader.write(data)
        pixbuf_loader.close()
    except GLib.Error as err:
        message = "can't process icon for {}: {}".format(display_name, str(err))
        logger.error(message)
        return None
    return pixbuf_loader.get_pixbuf()
项目:btle-sniffer    作者:scipag    | 项目源码 | 文件源码
def _connect(self, device):
        def cb_connect():
            try:
                bus = pydbus.SystemBus()
                proxy = bus.get(SERVICE_NAME, device.path)
                proxy.Connect()
            except KeyError:
                self._log.debug("The device has likely disappeared.", exc_info=True)
            except GLib.Error:
                self._log.debug("Connect() failed:", exc_info=True)
            else:
                self._log.info("Connection successful.")

            self.queued_connections -= 1

        if self.queued_connections == 0:
            print_device(device, "Connecting")
            GLib.idle_add(cb_connect)
            device.connected = True
            self.queued_connections += 1
项目:coiotd    作者:coiot-ble    | 项目源码 | 文件源码
def refresh_devices(self):
        """
        Bring ``self.ble_devices`` in line with the adapter's device list.

        For every device of the adapter:
        - a known device that is still connected is left untouched;
        - a known device that is no longer connected is forgotten;
        - a connected device that is not known is probed by every driver
          and the results are merged into a CompositeBleDevice per index.

        A GLib.Error whose message starts with ``GDBus.Error:org.bluez.Error``
        is logged and the loop moves on; any other GLib.Error is re-raised.
        """
        for address, device in self.adapter.devices.items():
            try:
                if address in self.ble_devices:
                    if device.proxy.Connected:
                        continue
                    del self.ble_devices[address]

                if not device.proxy.Connected:
                    continue

                for driver in self.drivers:
                    for index, parts in driver.probe(device).items():
                        per_address = self.ble_devices.setdefault(address, {})
                        composite = per_address.setdefault(index, CompositeBleDevice())
                        composite.extend(parts)
            except GLib.Error as err:
                fields = err.message.split(':')
                if fields[0] != "GDBus.Error" or not fields[1].startswith("org.bluez.Error"):
                    raise
                bluez_message = ':'.join(fields[1:])
                log.error("{}: {}".format(device, bluez_message))
项目:Icon-Requests    作者:bil-elmoussaoui    | 项目源码 | 文件源码
def get_icon(icon_path):
    """
        Build the icon representation for an icon path
        :param icon_path: icon name or image path
        :return: the tuple ``(None, icon_path)`` when the path contains
                 "symbolic"; otherwise a GdkPixbuf image of 48x48 pixels
                 (rescaled with bilinear interpolation when the file has a
                 different size); None when GLib.Error is raised
    """
    if "symbolic" in icon_path:
        return (None, icon_path)

    try:
        pixbuf = GdkPixbuf.Pixbuf.new_from_file(icon_path)
        if not (pixbuf.get_width() == 48 and pixbuf.get_height() == 48):
            pixbuf = pixbuf.scale_simple(48, 48, GdkPixbuf.InterpType.BILINEAR)
    except GLib.Error:
        return None
    return pixbuf
项目:lightdm-settings    作者:linuxmint    | 项目源码 | 文件源码
def update_icon_preview_cb(self, dialog, preview):
        """
        Refresh the preview image of a file chooser dialog.

        When the dialog's preview filename points at a regular file that can
        be loaded as an image, the image is shown in *preview* and
        ``self.frame`` is made visible. In every other case the preview is
        cleared and the frame is hidden.

        :param dialog: object providing ``get_preview_filename()``
        :param preview: image widget that receives the pixbuf
        """
        # Different widths make the dialog look really crappy as it resizes -
        # constrain the width and adjust the height to keep perspective.
        filename = dialog.get_preview_filename()
        if filename is not None and os.path.isfile(filename):
            try:
                pixbuf = GdkPixbuf.Pixbuf.new_from_file_at_size(filename, 320, -1)
            except GLib.Error as e:
                # The values must be interpolated with '%'; passing them as
                # extra print() arguments printed the raw format string.
                print("Unable to generate preview for file '%s' - %s\n" % (filename, e.message))
            else:
                if pixbuf is not None:
                    preview.set_from_pixbuf(pixbuf)
                    self.frame.show()
                    return

        preview.clear()
        self.frame.hide()
项目:sc-controller    作者:kozec    | 项目源码 | 文件源码
def _apply_css(config):
        """
        Install the OSD stylesheet described by *config* on the default screen.

        A previously installed provider (``OSDWindow.css_provider``) is
        removed first. The stylesheet named by ``config["osd_style"]`` is read
        from ``<share path>/osd_styles``, filled in with the configured colors
        and registered with user priority. If GTK rejects it with GLib.Error,
        the error is logged and "Classic.gtkstyle.css" is loaded instead;
        errors raised by that fallback are not caught.

        :param config: mapping with the keys 'osk_colors', 'osd_colors' and
                       'osd_style'
        """
        if OSDWindow.css_provider:
            Gtk.StyleContext.remove_provider_for_screen(
                Gdk.Screen.get_default(), OSDWindow.css_provider)

        # On-screen-keyboard colors are prefixed with "osk_"; OSD colors keep
        # their names and win if the same key appears in both.
        colors = {}
        for x in config['osk_colors']:
            colors["osk_%s" % (x,)] = config['osk_colors'][x]
        for x in config['osd_colors']:
            colors[x] = config['osd_colors'][x]
        colors = OSDCssMagic(colors)

        def load_style(style_name):
            # Read one stylesheet, fill in the colors and register it.
            css_file = os.path.join(get_share_path(), "osd_styles", style_name)
            # open() in a with-block works on Python 2 and 3 and closes the
            # handle, unlike the Python 2 only file(...).read().
            with open(css_file, "r") as css_source:
                css = css_source.read()
            if (Gtk.get_major_version(), Gtk.get_minor_version()) > (3, 20):
                css += OSDWindow.CSS_3_20
            OSDWindow.css_provider = Gtk.CssProvider()
            OSDWindow.css_provider.load_from_data((css % colors).encode("utf-8"))
            Gtk.StyleContext.add_provider_for_screen(
                    Gdk.Screen.get_default(),
                    OSDWindow.css_provider,
                    Gtk.STYLE_PROVIDER_PRIORITY_USER)

        try:
            load_style(config["osd_style"])
        except GLib.Error as e:
            log.error("Failed to apply css with user settings:")
            log.error(e)
            log.error("Retrying with default values")
            load_style("Classic.gtkstyle.css")
项目:bracer    作者:deikatsuo    | 项目源码 | 文件源码
def search(self, iterc, mode):
        """
        Run racer in *mode* on the buffer contents at the position of *iterc*.

        The whole buffer is written to a temporary file inside the Bracer
        temp directory, and racer is started with the mode, the 1-based line,
        the line offset and the temporary file name as arguments.

        :param iterc: text iterator; its buffer provides the text and its
                      position provides line and column
        :param mode: racer sub-command, passed through unchanged
        :return: racer's standard output, or None when there was no output
                 or GLib.Error was raised while running racer
        """
        buff = iterc.get_buffer()
        begin, end = buff.get_bounds()
        doc_text = buff.get_text(begin, end, True)

        line = iterc.get_line() + 1
        column = iterc.get_line_offset()

        result = None
        # The context manager removes the temporary file on every exit path,
        # including exceptions other than GLib.Error.
        with tempfile.NamedTemporaryFile(dir=Bracer.get_tmp_dir()) as temp_file:
            temp_file.write(doc_text.encode('utf-8'))
            # racer opens the file by name, so the data must be on disk.
            temp_file.flush()

            try:
                launcher = Ide.SubprocessLauncher.new(Gio.SubprocessFlags.STDOUT_PIPE)
                launcher.push_argv(self.get_racer_path())
                launcher.push_argv(mode)
                launcher.push_argv(str(line))
                launcher.push_argv(str(column))
                launcher.push_argv(temp_file.name)
                launcher.set_run_on_host(True)
                sub_process = launcher.spawn()
                _, stdout, _ = sub_process.communicate_utf8(None, None)

                if stdout:
                    result = stdout
            except GLib.Error:
                # Best effort: a racer failure simply yields no result.
                pass

        return result
项目:bracer    作者:deikatsuo    | 项目源码 | 文件源码
def version(self):
        """
        Ask racer for its version by running it with ``-V``.

        :return: racer's standard output with every occurrence of 'racer'
                 removed and surrounding whitespace stripped, or None when
                 there was no output or GLib.Error was raised
        """
        try:
            launcher = Ide.SubprocessLauncher.new(Gio.SubprocessFlags.STDOUT_PIPE)
            for argument in (self.get_racer_path(), '-V'):
                launcher.push_argv(argument)
            launcher.set_run_on_host(True)
            _, output, _ = launcher.spawn().communicate_utf8(None, None)
        except GLib.Error:
            return None

        if not output:
            return None
        return output.replace('racer', '').strip()
项目:btle-sniffer    作者:scipag    | 项目源码 | 文件源码
def __enter__(self):
        """
        Pick an adapter via ``find_adapter()`` and start LE-only discovery.

        The adapter is stored in ``self.adapter``, its discovery filter is
        set to transport "le" and discovery is started. A GLib.Error is
        logged with a hint about powering on the controller and re-raised.

        :return: this object
        """
        self._log.debug(
            "Choosing the first available Bluetooth adapter and starting device discovery.")
        self._log.debug("The discovery filter is set to Bluetooth LE only.")
        try:
            self.adapter = find_adapter()
            le_only = {"Transport": pydbus.Variant("s", "le")}
            self.adapter.SetDiscoveryFilter(le_only)
            self.adapter.StartDiscovery()
        except GLib.Error as err:
            self._log.exception(
                "Is the bluetooth controller powered on? Use `bluetoothctl`, `power on` otherwise.")
            raise err
        else:
            return self
项目:eos-data-distribution    作者:endlessm    | 项目源码 | 文件源码
def _on_request_interest_complete(self, interface, res, interest):
        """
        Completion callback for an asynchronous request-interest call.

        Finishes the call on *interface* and records the final segment from
        its reply. When finishing raises GLib.Error, the interest is
        expressed again and the result of that call is returned.
        """
        logger.info('Consumer: request interest complete: %s, %s, %s', interface, res, interest)

        try:
            reply = interface.call_request_interest_finish(res)
            _, last_segment, _ = reply
            self._set_final_segment(last_segment)
        except GLib.Error:
            # XXX: no real error handling yet - every failure is treated as
            # "try again".
            return self.expressInterest(interest)
项目:eos-data-distribution    作者:endlessm    | 项目源码 | 文件源码
def _on_request_interest_complete(self, interface, res, interest):
        """
        Completion callback for an asynchronous request-interest call.

        Finishes the call on *interface*, then emits 'data' with the interest
        and the received data, followed by 'complete'. When finishing raises
        GLib.Error, the interest is expressed again and the result of that
        call is returned instead.
        """
        logger.info('call complete, %s', res)
        try:
            _, payload = interface.call_request_interest_finish(res)
        except GLib.Error:
            # XXX: no real error handling yet - every failure is treated as
            # "try again".
            return self.expressInterest(interest)
        else:
            self.emit('data', interest, payload)
            self.emit('complete')
项目:draobpilc    作者:awamper    | 项目源码 | 文件源码
def _get_thumb_path(self):
        """
        Find an image file that can serve as thumbnail for this item.

        Only FILE and IMAGE items whose (user-expanded) path exists are
        considered. The file's content type is stored in
        ``self._content_type`` as a side effect.

        :return: the thumbnail path reported by GIO if there is one;
                 otherwise the file's own path when its content type starts
                 with 'image' and GdkPixbuf can load it; otherwise None
        """
        if self.kind not in (HistoryItemKind.FILE, HistoryItemKind.IMAGE):
            return None

        filename = os.path.expanduser(self._raw)
        if not os.path.exists(filename):
            return None

        # Build the GFile from the path itself. A hand-formatted
        # 'file://<path>' string is not a valid URI for names containing
        # spaces, '#' or '%', nor for relative paths.
        file_ = Gio.File.new_for_path(filename)

        try:
            info = file_.query_info(
                'standard::content-type,thumbnail::path',
                Gio.FileQueryInfoFlags.NONE
            )
        except GLib.Error:
            return None

        thumb_path = info.get_attribute_byte_string('thumbnail::path')
        self._content_type = info.get_content_type()
        if thumb_path:
            return thumb_path

        # The content type is assumed to be possibly missing (None); treat
        # that as "not an image" instead of failing on the attribute access.
        content_type = self._content_type or ''
        if not content_type.startswith('image'):
            return None

        try:
            # Loading is only a validity check; the pixbuf itself is unused.
            GdkPixbuf.Pixbuf.new_from_file_at_scale(filename, 80, 80, False)
        except GLib.Error:
            return None
        return filename
项目:Icon-Requests    作者:bil-elmoussaoui    | 项目源码 | 文件源码
def __init__(self):
        """
        Load the repository database from the bundled resource file.

        The JSON document at
        ``resource:///org/gnome/IconRequests/repos.json`` is parsed into
        ``self._repositories`` and ``_get_supported_themes()`` is called.

        :raises SystemExit: with status 1 when GLib.Error is raised while
                            loading (the failure is logged first)
        """
        # Imported locally so the block does not depend on a file-level import.
        import sys

        try:
            repos_obj = Gio.File.new_for_uri('resource:///org/gnome/IconRequests/repos.json')
            raw_json = repos_obj.load_contents(None)[1]
            self._repositories = loads(raw_json.decode("utf-8"))
            self._get_supported_themes()
        except GLib.Error:
            logging.error("Error loading repository database file. Exiting")
            # sys.exit is always available, unlike the interactive exit()
            # helper, and a non-zero status reports the failure to the caller.
            sys.exit(1)
项目:labnote    作者:phragment    | 项目源码 | 文件源码
def uri_scheme_deny(self, request):
        """
        Refuse a URI scheme request by finishing it with a GLib.Error.

        :param request: request object providing ``get_uri()`` and
                        ``finish_error()``
        """
        # FIXME
        # .. image:: http://example.org/foo.jpg
        #
        # Loading such an image produces these warnings:
        # GLib-GObject-WARNING **: invalid cast from 'WebKitSoupRequestGeneric' to 'SoupRequestHTTP'
        # libsoup-CRITICAL **: soup_request_http_get_message: assertion 'SOUP_IS_REQUEST_HTTP (http)' failed
        #
        # this is not called!
        denied_uri = request.get_uri()
        log.debug("uri scheme denied " + denied_uri)
        request.finish_error(GLib.Error("load cancelled: extern"))
项目:labnote    作者:phragment    | 项目源码 | 文件源码
def load_img(self, uri, request):
        """
        Answer *request* with the contents of the file at path *uri*.

        When reading the file or finishing the request raises GLib.Error,
        the request is finished with the text "file not found" instead.
        """
        log.debug("load image " + uri)
        try:
            image_file = Gio.file_new_for_path(uri)
            request.finish(image_file.read(), -1, None)
        except GLib.Error:
            placeholder = "file not found".encode()
            fallback_stream = Gio.MemoryInputStream.new_from_data(placeholder)
            request.finish(fallback_stream, -1, None)
项目:kickoff-player    作者:jonian    | 项目源码 | 文件源码
def image_from_path(path, size=48, image=None):
  """
  Load the file at *path* into a Gtk.Image, scaled to fit size x size.

  :param path: path of the image file
  :param size: width and height passed to the loader (aspect ratio is kept)
  :param image: existing image widget to fill; a new one is created if None
  :return: the image widget; it is returned unchanged when loading raises
           GLib.Error
  """
  if image is None:
    image = Gtk.Image()

  try:
    scaled = GdkPixbuf.Pixbuf.new_from_file_at_scale(path, size, size, True)
    image.set_from_pixbuf(scaled)
  except GLib.Error:
    pass

  return image
项目:rauc-hawkbit    作者:rauc    | 项目源码 | 文件源码
async def process_deployment(self, base):
        """
        Check for deployments, download them, verify checksum and trigger
        RAUC install operation.

        Does nothing when ``self.action_id`` is already set. Otherwise the
        action id and resource are taken from the deployment URL in *base*,
        the deployment information is fetched, its first artifact is
        downloaded and the installation is started.

        :param base: base resource; ``base['_links']['deploymentBase']['href']``
                     must hold the deployment URL
        :raises APIError: when the deployment URL cannot be parsed, when the
                          deployment has no chunks or no artifacts, or when
                          the installation raises GLib.Error (negative
                          feedback is sent first in the last three cases)
        """
        if self.action_id is not None:
            self.logger.info('Deployment is already in progress')
            return

        # retrieve action id and resource parameter from URL
        deployment = base['_links']['deploymentBase']['href']
        match = re.search(r'/deploymentBase/(.+)\?c=(.+)$', deployment)
        if match is None:
            # No action id is known yet, so no feedback can be addressed.
            raise APIError('Unable to parse deployment URL: {}'.format(deployment))
        action_id, resource = match.groups()
        self.logger.info('Deployment found for this target')

        async def send_failure(msg):
            # send negative feedback to HawkBit
            await self.ddi.deploymentBase[action_id].feedback(
                    DeploymentStatusExecution.closed,
                    DeploymentStatusResult.failure, [msg])

        # fetch deployment information
        deploy_info = await self.ddi.deploymentBase[action_id](resource)
        try:
            chunk = deploy_info['deployment']['chunks'][0]
        except IndexError:
            msg = 'Deployment without chunks found. Ignoring'
            await send_failure(msg)
            raise APIError(msg)

        try:
            artifact = chunk['artifacts'][0]
        except IndexError:
            msg = 'Deployment without artifacts found. Ignoring'
            await send_failure(msg)
            raise APIError(msg)

        # download artifact, check md5 and report feedback
        download_url = artifact['_links']['download-http']['href']
        md5_hash = artifact['hashes']['md5']
        self.logger.info('Starting bundle download')
        await self.download_artifact(action_id, download_url, md5_hash)

        # download successful, start install
        self.logger.info('Starting installation')
        try:
            self.action_id = action_id
            # do not interrupt install call
            await asyncio.shield(self.install())
        except GLib.Error as e:
            await send_failure(str(e))
            raise APIError(str(e)) from e
项目:labnote    作者:phragment    | 项目源码 | 文件源码
def uri_scheme_file(self, request):
        """
        Handle a request for the file URI scheme.

        The request URI is unquoted, has ``rechar`` replaced by spaces and is
        resolved with ``uri2path`` relative to the current file's directory.
        What happens next depends on ``self.load_state``:

        - state 0: a ".rst" target for which ``uri2path`` reports no ``ext``
          flag is loaded with ``load_rst`` - unless the text buffer has
          unsaved modifications, in which case the load is cancelled, the
          request URI is remembered and the info bar is revealed. Any other
          target is passed to ``open_uri`` and the request is cancelled.
        - state 1: targets whose guessed MIME type starts with "image" are
          served with ``load_img``; other requests are left untouched.
        """
        if log.isEnabledFor(logging.INFO):
            self.time_start = datetime.datetime.now()

        requested = request.get_uri()
        log.debug("----------")
        log.debug("loading")
        log.debug("URI " + requested)
        log.debug("state " + str(self.load_state))

        # handle non ascii first, then spaces in links
        decoded = urllib.request.unquote(requested).replace(rechar, " ")

        uri, ext = uri2path(decoded, os.path.dirname(self.current_file), startdir)
        log.debug("URI " + uri)

        if self.load_state == 0:
            if not uri.endswith(".rst") or ext:
                self.open_uri(uri)
                request.finish_error(GLib.Error("load cancelled: file opened externally"))
                return

            if not self.tvbuffer.get_modified():
                self.load_rst(uri, request)

                self.state_file.set_label("")
                self.state.set_label("")
                return

            log.debug("cancel due to modified")
            request.finish_error(GLib.Error("load cancelled: open file modified"))

            # Remember the original (unresolved) URI of the cancelled request.
            self.saved_request = request.get_uri()

            self.info.set_reveal_child(True)
            self.info_box_button_ok.grab_focus()
            return

        if self.load_state == 1:
            guessed_type = mimetypes.guess_type(uri)[0]
            log.debug(guessed_type)
            if guessed_type and guessed_type.startswith("image"):
                self.load_img(uri, request)