Python errno 模块,ENODEV 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用errno.ENODEV

项目:enigma2    作者:OpenLD    | 项目源码 | 文件源码
def queryWirelessDevice(self,iface):
        """Report whether *iface* answers a wireless access-point query.

        Returns False when ``pythonwifi`` cannot be imported, or when the
        query fails with EOPNOTSUPP, ENODEV or EPERM.  Returns True when the
        query succeeds, and also when it fails with any other IOError (the
        error is printed in that case).
        """
        try:
            from pythonwifi.iwlibs import Wireless
            import errno
        except ImportError:
            return False

        try:
            # Only success or failure matters; the returned address is unused.
            Wireless(iface).getAPaddr()
        except IOError as err:
            # Read errno/strerror from the exception instead of unpacking it,
            # so an IOError that does not carry exactly two args is handled
            # here rather than raising ValueError.
            if err.errno in (errno.EOPNOTSUPP, errno.ENODEV, errno.EPERM):
                return False
            print("error:  %s %s" % (err.errno, err.strerror))
            return True
        return True
项目:enigma2    作者:OpenLD    | 项目源码 | 文件源码
def queryWirelessDevice(self,iface):
        """Tell whether *iface* responds to a wireless access-point query.

        False is returned if ``pythonwifi`` is not importable or if the
        query raises IOError with EOPNOTSUPP, ENODEV or EPERM.  Any other
        IOError is printed and True is returned, as it is on success.
        """
        try:
            from pythonwifi.iwlibs import Wireless
            import errno
        except ImportError:
            return False

        try:
            # The address itself is not needed, only whether the call works.
            Wireless(iface).getAPaddr()
        except IOError as err:
            # Attribute access replaces tuple-unpacking of the exception,
            # which raised ValueError unless args held exactly two items.
            if err.errno in (errno.EOPNOTSUPP, errno.ENODEV, errno.EPERM):
                return False
            print("error:  %s %s" % (err.errno, err.strerror))
            return True
        return True
项目:nfcpy    作者:nfcpy    | 项目源码 | 文件源码
def test_init_linux_version_ack_err(self, mocker, transport):  # noqa: F811
        """init() must fail with ENODEV when the first read returns ERR()."""
        for target, result in (
                ('nfc.clf.pn532.Device.__init__', None),
                ('nfc.clf.pn532.open', ["cpuinfo"]),
                ('os.system', -1)):
            mocker.patch(target).return_value = result
        # NOTE(review): this assignment is not undone inside the test.
        sys.platform = "linux"

        transport.write.return_value = None
        transport.read.side_effect = [ERR()]              # GetFirmwareVersion

        with pytest.raises(IOError) as excinfo:
            nfc.clf.pn532.init(transport)
        assert excinfo.value.errno == errno.ENODEV

        expected_writes = [
            call(HEX(10 * '00') + CMD('02')),             # GetFirmwareVersion
        ]
        assert transport.write.mock_calls == expected_writes
项目:nfcpy    作者:nfcpy    | 项目源码 | 文件源码
def test_init_linux_version_rsp_err(self, mocker, transport):  # noqa: F811
        """init() must fail with ENODEV when ACK() is followed by ERR()."""
        for target, result in (
                ('nfc.clf.pn532.Device.__init__', None),
                ('nfc.clf.pn532.open', ["cpuinfo"]),
                ('os.system', -1)):
            mocker.patch(target).return_value = result
        # NOTE(review): this assignment is not undone inside the test.
        sys.platform = "linux"

        transport.write.return_value = None
        transport.read.side_effect = [ACK(), ERR()]       # GetFirmwareVersion

        with pytest.raises(IOError) as excinfo:
            nfc.clf.pn532.init(transport)
        assert excinfo.value.errno == errno.ENODEV

        expected_writes = [
            call(HEX(10 * '00') + CMD('02')),             # GetFirmwareVersion
        ]
        assert transport.write.mock_calls == expected_writes
项目:nfcpy    作者:nfcpy    | 项目源码 | 文件源码
def test_init_linux_sam_cfg_ack_err(self, mocker, transport):  # noqa: F811
        """init() must fail with ENODEV when the third read returns ERR()."""
        for target, result in (
                ('nfc.clf.pn532.Device.__init__', None),
                ('nfc.clf.pn532.open', ["cpuinfo"]),
                ('os.system', -1)):
            mocker.patch(target).return_value = result
        # NOTE(review): this assignment is not undone inside the test.
        sys.platform = "linux"

        transport.write.return_value = None
        replies = [ACK(), RSP('03 32010607')]             # GetFirmwareVersion
        replies.append(ERR())                             # SAMConfiguration
        transport.read.side_effect = replies

        with pytest.raises(IOError) as excinfo:
            nfc.clf.pn532.init(transport)
        assert excinfo.value.errno == errno.ENODEV

        expected_writes = [
            call(HEX(10 * '00') + CMD('02')),             # GetFirmwareVersion
            call(HEX(10 * '00') + CMD('14 010000')),      # SAMConfiguration
        ]
        assert transport.write.mock_calls == expected_writes
项目:enigma2    作者:Openeight    | 项目源码 | 文件源码
def queryWirelessDevice(self,iface):
        """Check whether *iface* can be queried as a wireless interface.

        Returns False if ``pythonwifi`` is missing or the query fails with
        EOPNOTSUPP, ENODEV or EPERM; returns True on success and on every
        other IOError, which is printed before returning.
        """
        try:
            from pythonwifi.iwlibs import Wireless
            import errno
        except ImportError:
            return False

        try:
            # The result is discarded; the call is made only to see if it
            # raises.
            Wireless(iface).getAPaddr()
        except IOError as err:
            # err.errno / err.strerror are used so that an IOError whose
            # args are not an (errno, message) pair does not raise
            # ValueError here.
            if err.errno in (errno.EOPNOTSUPP, errno.ENODEV, errno.EPERM):
                return False
            print("error:  %s %s" % (err.errno, err.strerror))
            return True
        return True
项目:PyRIC    作者:wraith-wireless    | 项目源码 | 文件源码
def rfkill_block(idx):
    """
     blocks the device at index
     :param idx: rfkill index
     :raises pyric.error: ENODEV when no rfkill entry exists for idx, EUNDEF
      when the rfkill event cannot be packed, or the errno of a failed
      open/write
    """
    if not os.path.exists(os.path.join(spath,"rfkill{0}".format(idx))):
        raise pyric.error(errno.ENODEV,"No device at {0}".format(idx))
    try:
        rfke = rfkh.rfkill_event(idx,rfkh.RFKILL_TYPE_ALL,rfkh.RFKILL_OP_CHANGE,1,0)
        if _PY3_: rfke = rfke.decode('ascii')
        # the with block closes the file inside the try, so an error that
        # only surfaces when buffered data is flushed at close is wrapped too
        with open(dpath, 'w') as fout:
            fout.write(rfke)
    except struct.error as e:
        raise pyric.error(pyric.EUNDEF,"Error packing rfkill event {0}".format(e))
    except IOError as e:
        # strerror holds the OS message; exceptions have no .message
        # attribute on Python 3
        raise pyric.error(e.errno,e.strerror)
项目:PyRIC    作者:wraith-wireless    | 项目源码 | 文件源码
def rfkill_unblock(idx):
    """
     unblocks the device at index
     :param idx: rfkill index
     :raises pyric.error: ENODEV when no rfkill entry exists for idx, EUNDEF
      when the rfkill event cannot be packed, or the errno of a failed
      open/write
    """
    if not os.path.exists(os.path.join(spath, "rfkill{0}".format(idx))):
        raise pyric.error(errno.ENODEV, "No device at {0}".format(idx))
    try:
        # NOTE(review): unlike rfkill_block, the packed event is not decoded
        # before being written in text mode -- confirm this is intended
        rfke = rfkh.rfkill_event(idx,rfkh.RFKILL_TYPE_ALL,rfkh.RFKILL_OP_CHANGE,0,0)
        # the with block closes the file inside the try, so an error that
        # only surfaces when buffered data is flushed at close is wrapped too
        with open(dpath, 'w') as fout:
            fout.write(rfke)
    except struct.error as e:
        raise pyric.error(pyric.EUNDEF,"Error packing rfkill event {0}".format(e))
    except IOError as e:
        # strerror holds the OS message; exceptions have no .message
        # attribute on Python 3
        raise pyric.error(e.errno,e.strerror)
项目:PyRIC    作者:wraith-wireless    | 项目源码 | 文件源码
def soft_blocked(idx):
    """
     reports the soft block state of an rfkill device
     :param idx: rfkill index
     :returns: True if the device's 'soft' file holds 1, False otherwise
     :raises pyric.error: ENODEV if the device directory is missing or the
      file cannot be opened/read, EUNDEF if its content is not an integer
    """
    devdir = os.path.join(spath,"rfkill{0}".format(idx))
    if not os.path.exists(devdir):
        raise pyric.error(errno.ENODEV,"No device at {0}".format(idx))
    handle = None
    try:
        handle = open(os.path.join(devdir,'soft'),'r')
        state = int(handle.read().strip())
        return state == 1
    except IOError:
        raise pyric.error(errno.ENODEV,"No device at {0}".format(idx))
    except ValueError:
        raise pyric.error(pyric.EUNDEF,"Unexpected error")
    finally:
        if handle is not None:
            handle.close()
项目:PyRIC    作者:wraith-wireless    | 项目源码 | 文件源码
def hard_blocked(idx):
    """
     reports the hard block state of an rfkill device
     :param idx: rfkill index
     :returns: True if the device's 'hard' file holds 1, False otherwise
     :raises pyric.error: ENODEV if the device directory is missing or the
      file cannot be opened/read, EUNDEF if its content is not an integer
    """
    devdir = os.path.join(spath,"rfkill{0}".format(idx))
    if not os.path.exists(devdir):
        raise pyric.error(errno.ENODEV,"No device at {0}".format(idx))
    handle = None
    try:
        handle = open(os.path.join(devdir,'hard'),'r')
        state = int(handle.read().strip())
        return state == 1
    except IOError:
        raise pyric.error(errno.ENODEV,"No device at {0}".format(idx))
    except ValueError:
        raise pyric.error(pyric.EUNDEF,"Unexpected error")
    finally:
        if handle is not None:
            handle.close()
项目:bitpay-brick    作者:javgh    | 项目源码 | 文件源码
def sense(self, targets, **kwargs):
        """Run discovery and activation requests to find a target.

        *targets* is a list of target specifications (TTA, TTB, TTF);
        a reader may not support every target type. The result is an
        activated target, possibly with an updated specification
        (bitrate), or None. Extra keyword arguments are driver specific.

        Raises IOError with ENODEV when no device is open.

        .. note:: This is a direct interface to the
           driver and not needed if :meth:`connect` is used.
        """
        if self.dev is None:
            code = errno.ENODEV
            raise IOError(code, os.strerror(code))

        with self.lock:
            found = self.dev.sense(targets, **kwargs)
        return found
项目:bitpay-brick    作者:javgh    | 项目源码 | 文件源码
def exchange(self, send_data, timeout):
        """Exchange one frame with, or as, an activated target.

        When talking to an activated target, *send_data* is a command
        frame and the target's response frame is returned. When acting as
        an activated target, *send_data* is a response frame and the next
        command frame is returned, or None if the communication link
        broke during the exchange. *timeout* is the number of seconds to
        wait for data; when it expires an nfc.clf.TimeoutException is
        raised. Other nfc.clf.DigitalProtocolExceptions may be raised if
        an error is detected during communication.

        Raises IOError with ENODEV when no device is open.

        .. note:: This is a direct interface to the
           driver and not needed if :meth:`connect` is used.
        """
        if self.dev is None:
            code = errno.ENODEV
            raise IOError(code, os.strerror(code))

        with self.lock:
            received = self.dev.exchange(send_data, timeout)
        return received
项目:bitpay-brick    作者:javgh    | 项目源码 | 文件源码
def set_communication_mode(self, brm, **kwargs):
        """Set the hardware communication mode.

        What this call does depends on hardware support; some drivers may
        purposely ignore it. Where supported, *brm* names the mode as a
        string made of bitrate and modulation type, for example '212F'
        switches to 212 kbps Type F communication. Further communication
        parameters may be changed through optional keyword arguments;
        the RC-S380 driver currently implements 'add-crc' and 'check-crc'
        when running as initiator. *brm* may be an empty string to leave
        bitrate and modulation unchanged and apply only the optional
        parameters.

        Raises IOError with ENODEV when no device is open.

        .. note:: This is a direct interface to the
           driver and not needed if :meth:`connect` is used.
        """
        if self.dev is None:
            code = errno.ENODEV
            raise IOError(code, os.strerror(code))

        with self.lock:
            self.dev.set_communication_mode(brm, **kwargs)
项目:enigma2    作者:BlackHole    | 项目源码 | 文件源码
def queryWirelessDevice(self,iface):
        """Report whether *iface* answers a wireless access-point query.

        Returns False when ``pythonwifi`` cannot be imported, or when the
        query fails with EOPNOTSUPP, ENODEV or EPERM.  Returns True when the
        query succeeds, and also when it fails with any other IOError (the
        error is printed in that case).
        """
        try:
            from pythonwifi.iwlibs import Wireless
            import errno
        except ImportError:
            return False

        try:
            # Only success or failure matters; the returned address is unused.
            Wireless(iface).getAPaddr()
        except IOError as err:
            # Read errno/strerror from the exception instead of unpacking it,
            # so an IOError that does not carry exactly two args is handled
            # here rather than raising ValueError.
            if err.errno in (errno.EOPNOTSUPP, errno.ENODEV, errno.EPERM):
                return False
            print("[AdapterSetupConfiguration] error:  %s %s"
                  % (err.errno, err.strerror))
            return True
        return True
项目:maas    作者:maas    | 项目源码 | 文件源码
def get_interface_mac(sock: socket.socket, ifname: str) -> str:
    """Return the MAC address of network interface *ifname* as a string.

    Raises InterfaceNotFound when the ioctl fails with ENODEV and
    MACAddressNotAvailable for any other OSError.
    """
    # The UTF-8 name is cut to 15 bytes and packed into a 256-byte buffer.
    request = struct.pack(b'256s', ifname.encode('utf-8')[:15])
    try:
        reply = fcntl.ioctl(sock.fileno(), SIOCGIFHWADDR, request)
    except OSError as e:
        if e.errno is None or e.errno != errno.ENODEV:
            raise MACAddressNotAvailable(
                "Failed to get MAC address for '%s': %s." % (
                    ifname, strerror(e.errno)))
        raise InterfaceNotFound(
            "Interface not found: '%s'." % ifname)
    # Bytes 18..23 of the reply are rendered as colon-separated hex pairs.
    return ':'.join('%02x' % octet for octet in reply[18:24])
项目:enigma2-openpli-fulan    作者:Taapat    | 项目源码 | 文件源码
def queryWirelessDevice(self,iface):
        """Tell whether *iface* responds to a wireless access-point query.

        False is returned if ``pythonwifi`` is not importable or if the
        query raises IOError with EOPNOTSUPP, ENODEV or EPERM.  Any other
        IOError is printed and True is returned, as it is on success.
        """
        try:
            from pythonwifi.iwlibs import Wireless
            import errno
        except ImportError:
            return False

        try:
            # The address itself is not needed, only whether the call works.
            Wireless(iface).getAPaddr()
        except IOError as err:
            # Attribute access replaces tuple-unpacking of the exception,
            # which raised ValueError unless args held exactly two items.
            if err.errno in (errno.EOPNOTSUPP, errno.ENODEV, errno.EPERM):
                return False
            print("error:  %s %s" % (err.errno, err.strerror))
            return True
        return True
项目:nfcpy    作者:nfcpy    | 项目源码 | 文件源码
def test_read(self, usb):
        """read() returns bulkRead data and maps USB errors to IOError."""
        usb.usb_dev.bulkRead.side_effect = [
            b'12',
            b'34',
            nfc.clf.transport.libusb.USBErrorTimeout,
            nfc.clf.transport.libusb.USBErrorNoDevice,
            nfc.clf.transport.libusb.USBError,
            b'',
        ]

        # The first two reads hand back the queued data unchanged.
        assert usb.read() == b'12'
        usb.usb_dev.bulkRead.assert_called_with(0x84, 300, 0)

        assert usb.read(100) == b'34'
        usb.usb_dev.bulkRead.assert_called_with(0x84, 300, 100)

        # The remaining four queued results must each surface as IOError
        # with the listed errno, in this order.
        expected_codes = (errno.ETIMEDOUT, errno.ENODEV, errno.EIO, errno.EIO)
        for expected in expected_codes:
            with pytest.raises(IOError) as excinfo:
                usb.read()
            assert excinfo.value.errno == expected

        usb.usb_inp = None
        assert usb.read() is None
项目:nfcpy    作者:nfcpy    | 项目源码 | 文件源码
def test_write(self, usb):
        """write() forwards to bulkWrite and maps USB errors to IOError."""
        usb.write(b'12')
        usb.usb_dev.bulkWrite.assert_called_with(0x04, b'12', 0)

        usb.write(b'12', 100)
        usb.usb_dev.bulkWrite.assert_called_with(0x04, b'12', 100)

        # A 64-byte frame is expected to be followed by an empty write.
        usb.write(64 * b'1', 100)
        usb.usb_dev.bulkWrite.assert_has_calls([
            call(0x04, 64 * b'1', 100),
            call(0x04, b'', 100),
        ])

        usb.usb_dev.bulkWrite.side_effect = [
            nfc.clf.transport.libusb.USBErrorTimeout,
            nfc.clf.transport.libusb.USBErrorNoDevice,
            nfc.clf.transport.libusb.USBError,
        ]
        # Each queued USB error must surface as IOError with this errno.
        for expected in (errno.ETIMEDOUT, errno.ENODEV, errno.EIO):
            with pytest.raises(IOError) as excinfo:
                usb.write(b'12')
            assert excinfo.value.errno == expected

        usb.usb_out = None
        assert usb.write(b'12') is None
项目:nfcpy    作者:nfcpy    | 项目源码 | 文件源码
def test_init_linux_setbaud_ack_err(self, mocker, transport):  # noqa: F811
        """init() must fail with ENODEV when the fifth read returns ERR()."""
        mocker.patch('nfc.clf.pn532.Device.__init__').return_value = None
        mocker.patch('nfc.clf.pn532.open').return_value = ["cpuinfo"]
        stty = mocker.patch('os.system')
        stty.side_effect = [-1, 0, None]
        # NOTE(review): this assignment is not undone inside the test.
        sys.platform = "linux"

        transport.write.return_value = None
        replies = [ACK(), RSP('03 32010607')]             # GetFirmwareVersion
        replies += [ACK(), RSP('15')]                     # SAMConfiguration
        replies += [ERR()]                                # SetSerialBaudrate
        transport.read.side_effect = replies

        with pytest.raises(IOError) as excinfo:
            nfc.clf.pn532.init(transport)
        assert excinfo.value.errno == errno.ENODEV

        expected_stty = [
            call('stty -F /dev/ttyS0 921600 2> /dev/null'),
            call('stty -F /dev/ttyS0 460800 2> /dev/null'),
            call('stty -F /dev/ttyS0 115200 2> /dev/null'),
        ]
        assert stty.mock_calls == expected_stty

        expected_writes = [
            call(HEX(10 * '00') + CMD('02')),             # GetFirmwareVersion
            call(HEX(10 * '00') + CMD('14 010000')),      # SAMConfiguration
            call(HEX(10 * '00') + CMD('10 06')),          # SetSerialBaudrate
        ]
        assert transport.write.mock_calls == expected_writes
项目:nfcpy    作者:nfcpy    | 项目源码 | 文件源码
def test_init_linux_setbaud_rsp_err(self, mocker, transport):  # noqa: F811
        """init() must fail with ENODEV when the sixth read returns ERR()."""
        mocker.patch('nfc.clf.pn532.Device.__init__').return_value = None
        mocker.patch('nfc.clf.pn532.open').return_value = ["cpuinfo"]
        stty = mocker.patch('os.system')
        stty.side_effect = [-1, 0, None]
        # NOTE(review): this assignment is not undone inside the test.
        sys.platform = "linux"

        transport.write.return_value = None
        replies = [ACK(), RSP('03 32010607')]             # GetFirmwareVersion
        replies += [ACK(), RSP('15')]                     # SAMConfiguration
        replies += [ACK(), ERR()]                         # SetSerialBaudrate
        transport.read.side_effect = replies

        with pytest.raises(IOError) as excinfo:
            nfc.clf.pn532.init(transport)
        assert excinfo.value.errno == errno.ENODEV

        expected_stty = [
            call('stty -F /dev/ttyS0 921600 2> /dev/null'),
            call('stty -F /dev/ttyS0 460800 2> /dev/null'),
            call('stty -F /dev/ttyS0 115200 2> /dev/null'),
        ]
        assert stty.mock_calls == expected_stty

        expected_writes = [
            call(HEX(10 * '00') + CMD('02')),             # GetFirmwareVersion
            call(HEX(10 * '00') + CMD('14 010000')),      # SAMConfiguration
            call(HEX(10 * '00') + CMD('10 06')),          # SetSerialBaudrate
        ]
        assert transport.write.mock_calls == expected_writes
项目:nfcpy    作者:nfcpy    | 项目源码 | 文件源码
def __init__(self, transport):
        """Verify the reader is an ACR122U with firmware 2.x and set it up.

        Reads the firmware version string, powers on the ICC, sets the PICC
        operating parameters, resets buzzer and LED to their defaults and
        then runs the base-class initialiser.

        Raises IOError with ENODEV when the version string does not start
        with "ACR122U" or when the digit that follows it is below 2.
        """
        self.transport = transport

        # read ACR122U firmware version string
        reader_version = self.ccid_xfr_block(bytearray.fromhex("FF00480000"))
        # The version is indexed as a byte sequence below, so the prefix must
        # be a bytes literal; a text prefix raises TypeError on Python 3.
        if not reader_version.startswith(b"ACR122U"):
            log.error("failed to retrieve ACR122U version string")
            raise IOError(errno.ENODEV, os.strerror(errno.ENODEV))

        # byte 7 is the first character after the "ACR122U" prefix
        if int(chr(reader_version[7])) < 2:
            log.error("{0} not supported, need 2.x".format(reader_version[7:]))
            raise IOError(errno.ENODEV, os.strerror(errno.ENODEV))

        log.debug("initialize " + str(reader_version))

        # set icc power on
        log.debug("CCID ICC-POWER-ON")
        frame = bytearray.fromhex("62000000000000000000")
        transport.write(frame)
        transport.read(100)

        # disable autodetection
        log.debug("Set PICC Operating Parameters")
        self.ccid_xfr_block(bytearray.fromhex("FF00517F00"))

        # switch red/green led off/on
        log.debug("Configure Buzzer and LED")
        self.set_buzzer_and_led_to_default()

        super(Chipset, self).__init__(transport, logger=log)
项目:nfcpy    作者:nfcpy    | 项目源码 | 文件源码
def write(self, frame, timeout=0):
        """Send *frame* through the bulk OUT endpoint.

        Nothing happens when ``self.usb_out`` is None. When the frame
        length is a multiple of the endpoint's maximum packet size, an
        empty write follows the data. libusb errors are re-raised as
        IOError with ETIMEDOUT, ENODEV or EIO.
        """
        if self.usb_out is None:
            return
        log.log(logging.DEBUG-1, ">>> %s", hexlify(frame))
        try:
            endpoint = self.usb_out.getAddress()
            self.usb_dev.bulkWrite(endpoint, bytes(frame), timeout)
            remainder = len(frame) % self.usb_out.getMaxPacketSize()
            if remainder == 0:
                self.usb_dev.bulkWrite(endpoint, b'', timeout)
        except libusb.USBErrorTimeout:
            code = errno.ETIMEDOUT
            raise IOError(code, os.strerror(code))
        except libusb.USBErrorNoDevice:
            code = errno.ENODEV
            raise IOError(code, os.strerror(code))
        except libusb.USBError as error:
            log.error("%r", error)
            code = errno.EIO
            raise IOError(code, os.strerror(code))
项目:nfcpy    作者:nfcpy    | 项目源码 | 文件源码
def __init__(self, path=None):
        """Start with no device or target and optionally open *path*.

        Raises IOError with ENODEV when *path* is given and ``open(path)``
        returns a false value.
        """
        self.device = None
        self.target = None
        self.lock = threading.Lock()
        if not path:
            return
        if not self.open(path):
            code = errno.ENODEV
            raise IOError(code, os.strerror(code))
项目:nfcpy    作者:nfcpy    | 项目源码 | 文件源码
def exchange(self, send_data, timeout):
        """Exchange one frame with, or as, an activated target.

        When talking to an activated target, *send_data* is a command
        frame and the target's response frame is returned. When acting as
        an activated target, *send_data* is a response frame and the next
        command frame is returned, or None if the communication link
        broke during the exchange. *timeout* is the number of seconds to
        wait for data; when it expires an nfc.clf.TimeoutException is
        raised. Other nfc.clf.CommunicationError exceptions may be raised
        if an error is detected during communication.

        Raises IOError with ENODEV when no device is open. Returns None,
        after logging an error, when the current target is neither a
        RemoteTarget nor a LocalTarget.

        """
        with self.lock:
            if self.device is None:
                raise IOError(errno.ENODEV, os.strerror(errno.ENODEV))

            log.debug(">>> %s timeout=%s", print_data(send_data), str(timeout))

            if not isinstance(self.target, (RemoteTarget, LocalTarget)):
                log.error("no target for data exchange")
                return None

            # RemoteTarget takes precedence, as in the original elif chain.
            if isinstance(self.target, RemoteTarget):
                transfer = self.device.send_cmd_recv_rsp
            else:
                transfer = self.device.send_rsp_recv_cmd

            started = time.time()
            reply = transfer(self.target, send_data, timeout)
            elapsed = time.time() - started

            log.debug("<<< %s %.3fs", print_data(reply), elapsed)
            return reply
项目:nfcpy    作者:nfcpy    | 项目源码 | 文件源码
def max_send_data_size(self):
        """Largest number of octets :meth:`exchange` can send in the
        established operating mode, as reported by the device.

        Raises IOError with ENODEV when no device is open.

        """
        with self.lock:
            device = self.device
            if device is None:
                code = errno.ENODEV
                raise IOError(code, os.strerror(code))
            return device.get_max_send_data_size(self.target)
项目:nfcpy    作者:nfcpy    | 项目源码 | 文件源码
def max_recv_data_size(self):
        """Largest number of octets :meth:`exchange` can receive in the
        established operating mode, as reported by the device.

        Raises IOError with ENODEV when no device is open.

        """
        with self.lock:
            device = self.device
            if device is None:
                code = errno.ENODEV
                raise IOError(code, os.strerror(code))
            return device.get_max_recv_data_size(self.target)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_main():
    """Run this module's tests unless the audio device cannot be opened.

    Raises unittest.SkipTest when opening fails with EACCES, ENOENT,
    ENODEV or EBUSY; any other failure propagates.
    """
    skip_codes = (errno.EACCES, errno.ENOENT,
                  errno.ENODEV, errno.EBUSY)
    try:
        dsp = ossaudiodev.open('w')
    except (ossaudiodev.error, IOError) as exc:
        if exc.args[0] not in skip_codes:
            raise
        raise unittest.SkipTest(exc)
    dsp.close()
    support.run_unittest(__name__)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_main():
    """Run LinuxAudioDevTests unless the audio device cannot be opened.

    Raises unittest.SkipTest when opening fails with EACCES, ENOENT,
    ENODEV or EBUSY; any other failure propagates.
    """
    skip_codes = (errno.EACCES, errno.ENOENT, errno.ENODEV, errno.EBUSY)
    try:
        dsp = linuxaudiodev.open('w')
    except linuxaudiodev.error as exc:
        if exc.args[0] not in skip_codes:
            raise
        raise unittest.SkipTest(exc)
    dsp.close()
    run_unittest(LinuxAudioDevTests)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_main():
    """Run this module's tests unless the audio device cannot be opened.

    Raises unittest.SkipTest when opening fails with EACCES, ENOENT,
    ENODEV or EBUSY; any other failure propagates.
    """
    skip_codes = (errno.EACCES, errno.ENOENT,
                  errno.ENODEV, errno.EBUSY)
    try:
        dsp = ossaudiodev.open('w')
    except (ossaudiodev.error, IOError) as exc:
        if exc.args[0] not in skip_codes:
            raise
        raise unittest.SkipTest(exc)
    dsp.close()
    test_support.run_unittest(__name__)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_main():
    """Run LinuxAudioDevTests unless the audio device cannot be opened.

    Raises unittest.SkipTest when opening fails with EACCES, ENOENT,
    ENODEV or EBUSY; any other failure propagates.
    """
    skip_codes = (errno.EACCES, errno.ENOENT, errno.ENODEV, errno.EBUSY)
    try:
        dsp = linuxaudiodev.open('w')
    except linuxaudiodev.error as exc:
        if exc.args[0] not in skip_codes:
            raise
        raise unittest.SkipTest(exc)
    dsp.close()
    run_unittest(LinuxAudioDevTests)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_main():
    """Run this module's tests unless the audio device cannot be opened.

    Raises unittest.SkipTest when opening fails with EACCES, ENOENT,
    ENODEV or EBUSY; any other failure propagates.
    """
    skip_codes = (errno.EACCES, errno.ENOENT,
                  errno.ENODEV, errno.EBUSY)
    try:
        dsp = ossaudiodev.open('w')
    except (ossaudiodev.error, IOError) as exc:
        if exc.args[0] not in skip_codes:
            raise
        raise unittest.SkipTest(exc)
    dsp.close()
    test_support.run_unittest(__name__)
项目:PyRIC    作者:wraith-wireless    | 项目源码 | 文件源码
def gettype(idx):
    """
     reads the type of an rfkill device
     :param idx: rfkill index
     :returns: stripped content of the device's 'type' file
     :raises pyric.error: ENODEV if the file cannot be opened or read
    """
    typefile = os.path.join(spath,"rfkill{0}".format(idx),'type')
    handle = None
    try:
        handle = open(typefile,'r')
        content = handle.read()
        return content.strip()
    except IOError:
        raise pyric.error(errno.ENODEV,"No device at {0}".format(idx))
    finally:
        if handle is not None:
            handle.close()
项目:bitpay-brick    作者:javgh    | 项目源码 | 文件源码
def __init__(self, path=None):
        """Start with no device and optionally open *path*.

        Raises IOError with ENODEV when *path* is given and ``open(path)``
        returns a false value.
        """
        self.dev = None
        self.lock = threading.Lock()
        if not path:
            return
        if not self.open(path):
            code = errno.ENODEV
            raise IOError(code, os.strerror(code))
项目:bitpay-brick    作者:javgh    | 项目源码 | 文件源码
def listen(self, target, timeout):
        """Wait up to *timeout* seconds to be initialized as *target*.

        *target* must be one of :class:`nfc.clf.TTA`,
        :class:`nfc.clf.TTB`, :class:`nfc.clf.TTF`, or
        :class:`nfc.clf.DEP` (which target types are supported depends
        on the hardware capabilities). The result is :const:`None` if
        *timeout* elapsed without activation, otherwise a tuple (target,
        command) where target is the activated target and command is the
        first command received from the initiator.

        The activated target's type and attributes may differ from the
        *target* requested. This is especially true when activation as a
        :class:`nfc.clf.DEP` target is requested but the contactless
        frontend has no hardware implementation of the data exchange
        protocol and returns a :class:`nfc.clf.TTA` or
        :class:`nfc.clf.TTF` target instead.

        Raises IOError with ENODEV when no device is open. A *target*
        whose exact type is none of the four classes yields None.

        .. note:: This is a direct interface to the
           driver and not needed if :meth:`connect` is used.
        """
        if self.dev is None:
            code = errno.ENODEV
            raise IOError(code, os.strerror(code))

        with self.lock:
            log.debug("listen for {0:.3f} sec as target {1}"
                      .format(timeout, target))

            # Exact type match, subclasses are deliberately not accepted.
            kind = type(target)
            if kind is TTA:
                return self.dev.listen_tta(target, timeout)
            elif kind is TTB:
                return self.dev.listen_ttb(target, timeout)
            elif kind is TTF:
                return self.dev.listen_ttf(target, timeout)
            elif kind is DEP:
                return self.dev.listen_dep(target, timeout)
项目:bitpay-brick    作者:javgh    | 项目源码 | 文件源码
def __init__(self, transport):
        """Probe the serial port for an Arygon reader and initialise it.

        For each baud rate in turn a new Serial object is created on the
        same port, "0av" is written and one line is read. On the first
        reply starting with "FF0000" the two speed commands are sent and
        the base-class initialiser is called.

        Raises IOError with ENODEV when no baud rate yields the expected
        reply, or when a speed command is not answered with "FF000000".
        """
        self.transport = transport
        for speed in (230400, 9600, 19200, 38400, 57600, 115200):
            log.debug("try serial baud rate {0} kbps".format(speed))
            self.transport.tty = self.transport.serial.Serial(
                self.transport.tty.port, baudrate=speed, timeout=0.1)
            log.debug("read arygon firmware version")
            self.transport.tty.write("0av")
            version = self.transport.tty.readline()
            if version.startswith("FF0000"):
                # The last four characters of the stripped reply are logged
                # as the reader version.
                log.debug("Arygon Reader {0}".format(version.strip()[-4:]))
                self.transport.tty.timeout = 1.0
                self.transport.tty.writeTimeout = 1.0
                log.debug("set mcu-tama speed to 230.4 kbps")
                self.transport.tty.write("0at05")
                if self.transport.tty.readline().strip() != "FF000000":
                    log.debug("failed to set mcu-tama speed")
                    # Leaving the loop falls through to the IOError below.
                    break
                if self.transport.tty.baudrate != 230400:
                    log.debug("set mcu-host speed to 230.4 kbps")
                    self.transport.tty.write("0ah05")
                    if self.transport.tty.readline().strip() != "FF000000":
                        log.debug("failed to set mcu-host speed")
                        # Leaving the loop falls through to the IOError below.
                        break
                    time.sleep(0.5)
                    # Reopen the port at 230400 after the speed change.
                    self.transport.tty.close()
                    self.transport.tty = self.transport.serial.Serial(
                        self.transport.tty.port, baudrate=230400,
                        timeout=1.0, writeTimeout=1.0)
                return super(Chipset, self).__init__(transport)
        # Reached when every baud rate failed or a speed command failed.
        raise IOError(errno.ENODEV, os.strerror(errno.ENODEV))
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_main():
    """Run this module's tests unless the audio device cannot be opened.

    Raises unittest.SkipTest when opening fails with EACCES, ENOENT,
    ENODEV or EBUSY; any other failure propagates.
    """
    skip_codes = (errno.EACCES, errno.ENOENT,
                  errno.ENODEV, errno.EBUSY)
    try:
        dsp = ossaudiodev.open('w')
    except (ossaudiodev.error, IOError) as exc:
        if exc.args[0] not in skip_codes:
            raise
        raise unittest.SkipTest(exc)
    dsp.close()
    support.run_unittest(__name__)
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_main():
    """Run LinuxAudioDevTests unless the audio device cannot be opened.

    Raises unittest.SkipTest when opening fails with EACCES, ENOENT,
    ENODEV or EBUSY; any other failure propagates.
    """
    skip_codes = (errno.EACCES, errno.ENOENT, errno.ENODEV, errno.EBUSY)
    try:
        dsp = linuxaudiodev.open('w')
    except linuxaudiodev.error as exc:
        if exc.args[0] not in skip_codes:
            raise
        raise unittest.SkipTest(exc)
    dsp.close()
    run_unittest(LinuxAudioDevTests)
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_main():
    """Run this module's tests unless the audio device cannot be opened.

    Raises unittest.SkipTest when opening fails with EACCES, ENOENT,
    ENODEV or EBUSY; any other failure propagates.
    """
    skip_codes = (errno.EACCES, errno.ENOENT,
                  errno.ENODEV, errno.EBUSY)
    try:
        dsp = ossaudiodev.open('w')
    except (ossaudiodev.error, IOError) as exc:
        if exc.args[0] not in skip_codes:
            raise
        raise unittest.SkipTest(exc)
    dsp.close()
    test_support.run_unittest(__name__)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_main():
    """Run this module's tests unless the audio device cannot be opened.

    Raises unittest.SkipTest when opening fails with EACCES, ENOENT,
    ENODEV or EBUSY; any other failure propagates.
    """
    skip_codes = (errno.EACCES, errno.ENOENT,
                  errno.ENODEV, errno.EBUSY)
    try:
        dsp = ossaudiodev.open('w')
    except (ossaudiodev.error, OSError) as exc:
        if exc.args[0] not in skip_codes:
            raise
        raise unittest.SkipTest(exc)
    dsp.close()
    support.run_unittest(__name__)
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_main():
    """Run LinuxAudioDevTests unless the audio device cannot be opened.

    Raises unittest.SkipTest when opening fails with EACCES, ENOENT,
    ENODEV or EBUSY; any other failure propagates.
    """
    skip_codes = (errno.EACCES, errno.ENOENT, errno.ENODEV, errno.EBUSY)
    try:
        dsp = linuxaudiodev.open('w')
    except linuxaudiodev.error as exc:
        if exc.args[0] not in skip_codes:
            raise
        raise unittest.SkipTest(exc)
    dsp.close()
    run_unittest(LinuxAudioDevTests)
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_main():
    """Run this module's tests unless the audio device cannot be opened.

    Raises unittest.SkipTest when opening fails with EACCES, ENOENT,
    ENODEV or EBUSY; any other failure propagates.
    """
    skip_codes = (errno.EACCES, errno.ENOENT,
                  errno.ENODEV, errno.EBUSY)
    try:
        dsp = ossaudiodev.open('w')
    except (ossaudiodev.error, IOError) as exc:
        if exc.args[0] not in skip_codes:
            raise
        raise unittest.SkipTest(exc)
    dsp.close()
    test_support.run_unittest(__name__)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def open(self, filename, *args, **kwargs):
        """
        Delegate to the parent class's open.  If that fails with ENOENT or
        ENODEV for /dev/net/tun, raise L{SkipTest} instead so that tests
        needing platform tuntap support are skipped rather than failed.
        Every other OSError is re-raised unchanged.
        """
        parent = super(TestRealSystem, self)
        try:
            return parent.open(filename, *args, **kwargs)
        except OSError as e:
            # Either the device file is absent, or it exists but the kernel
            # does not support it.
            unsupported = e.errno in (ENOENT, ENODEV)
            if unsupported and filename == b"/dev/net/tun":
                raise SkipTest("Platform lacks /dev/net/tun")
            raise
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_main():
    """Run this module's tests unless the audio device cannot be opened.

    Raises unittest.SkipTest when opening fails with EACCES, ENOENT,
    ENODEV or EBUSY; any other failure propagates.
    """
    skip_codes = (errno.EACCES, errno.ENOENT,
                  errno.ENODEV, errno.EBUSY)
    try:
        dsp = ossaudiodev.open('w')
    except (ossaudiodev.error, OSError) as exc:
        if exc.args[0] not in skip_codes:
            raise
        raise unittest.SkipTest(exc)
    dsp.close()
    support.run_unittest(__name__)
项目:maas    作者:maas    | 项目源码 | 文件源码
def get_interface_ip(sock: socket.socket, ifname: str) -> str:
    """Return the IP address of network interface `ifname` as a string.

    The address is fetched with a SIOCGIFADDR ioctl issued on `sock`.

    :raises InterfaceNotFound: the ioctl failed with ENODEV.
    :raises IPAddressNotAvailable: the ioctl failed with EADDRNOTAVAIL or
        with any other OSError.
    """
    # Request layout: 16-byte name field, address family, 14 bytes of
    # zero padding.  The name is cut to 15 bytes -- presumably to leave room
    # for a terminating NUL in the 16-byte field.
    name_field = ifname.encode('utf-8')[:15]
    request = struct.pack(
        b'16sH14s', name_field, socket.AF_INET, b'\x00' * 14)
    try:
        reply = fcntl.ioctl(sock, SIOCGIFADDR, request)
    except OSError as e:
        if e.errno == errno.ENODEV:
            raise InterfaceNotFound(
                "Interface not found: '%s'." % ifname)
        if e.errno == errno.EADDRNOTAVAIL:
            raise IPAddressNotAvailable(
                "No IP address found on interface '%s'." % ifname)
        raise IPAddressNotAvailable(
            "Failed to get IP address for '%s': %s." % (
                ifname, strerror(e.errno)))
    # Decode the `struct ifreq` handed back by ioctl():
    #     16x --> char ifr_name[IFNAMSIZ];
    # followed by a union, of which this ioctl() fills in a `sockaddr_in`:
    #     2x  --> short sin_family;
    #     2x  --> unsigned short sin_port;
    #     4s  --> struct in_addr sin_addr;
    #     8x  --> char sin_zero[8];
    (packed_addr,) = struct.unpack(b'16x2x2x4s8x', reply)
    return socket.inet_ntoa(packed_addr)
项目:nfcpy    作者:nfcpy    | 项目源码 | 文件源码
def init(transport):
    """Probe for an Arygon reader on `transport` and return its device object.

    The port is tried at 115200 baud (AxxB variant) and then at 9600 baud
    (AxxA variant).  For each attempt the version is queried; on a matching
    reply both links are switched to 230400 bps and the matching chipset and
    device objects are built.  Raises IOError(ENODEV) when neither attempt
    completes.
    """
    # (baudrate, version log format, device name, lazy device factory).
    # The factories defer the class lookups until a reader has answered.
    variants = (
        (115200, "Arygon Reader AxxB Version %s", "ADRB",
         lambda: DeviceB(ChipsetB(transport, logger=log), logger=log)),
        (9600, "Arygon Reader AxxA Version %s", "ADRA",
         lambda: DeviceA(ChipsetA(transport, logger=log), logger=log)),
    )

    for baudrate, version_format, device_name, make_device in variants:
        transport.open(transport.port, baudrate)
        transport.tty.write("0av")  # read version
        reply = transport.tty.readline()
        if not reply.startswith("FF00000600V"):
            continue
        log.debug(version_format, reply[11:].strip())

        transport.tty.timeout = 0.5
        transport.tty.write("0at05")
        if not transport.tty.readline().startswith("FF0000"):
            continue
        log.debug("MCU/TAMA communication set to 230400 bps")

        transport.tty.write("0ah05")
        if not transport.tty.readline().startswith("FF0000"):
            continue
        log.debug("MCU/HOST communication set to 230400 bps")

        # Follow the reader to the new host link speed before using it.
        transport.tty.baudrate = 230400
        transport.tty.timeout = 0.1
        time.sleep(0.1)

        device = make_device()
        device._vendor_name = "Arygon"
        device._device_name = device_name
        return device

    raise IOError(errno.ENODEV, os.strerror(errno.ENODEV))
项目:nfcpy    作者:nfcpy    | 项目源码 | 文件源码
def open(self, usb_bus, dev_adr):
        """Find, open and claim the USB device `dev_adr` on bus `usb_bus`.

        Searches ``self.context.getDeviceList(skip_on_error=True)`` for the
        device whose bus number and device address match, takes the first
        item produced by its ``iterSettings()``, and stores the first bulk IN
        and first bulk OUT endpoint in ``self.usb_inp`` and ``self.usb_out``.
        The opened handle is stored in ``self.usb_dev`` and interface 0 is
        claimed on it.

        Raises:
            IOError: with ENODEV when no device matches, no setting exists,
                a bulk IN/OUT endpoint pair is missing, or libusb reports
                ``USBErrorNoDevice``; with EACCES on ``USBErrorAccess``; with
                EBUSY on ``USBErrorBusy``.
        """
        self.usb_dev = None
        self.usb_out = None
        self.usb_inp = None

        # for/else: the else branch runs only when no device matched.
        for dev in self.context.getDeviceList(skip_on_error=True):
            if ((dev.getBusNumber() == usb_bus and
                 dev.getDeviceAddress() == dev_adr)):
                break
        else:
            log.error("no device {0} on bus {1}".format(dev_adr, usb_bus))
            raise IOError(errno.ENODEV, os.strerror(errno.ENODEV))

        try:
            # The builtin next() works on Python 2.6+ and Python 3, whereas
            # the iterator's .next() method exists on Python 2 only.
            first_setting = next(dev.iterSettings())
        except StopIteration:
            log.error("no usb configuration settings, please replug device")
            raise IOError(errno.ENODEV, os.strerror(errno.ENODEV))

        def transfer_type(x):
            return x & libusb.TRANSFER_TYPE_MASK

        def endpoint_dir(x):
            return x & libusb.ENDPOINT_DIR_MASK

        # Keep only the first bulk endpoint seen for each direction.
        for endpoint in first_setting.iterEndpoints():
            ep_addr = endpoint.getAddress()
            ep_attr = endpoint.getAttributes()
            if transfer_type(ep_attr) == libusb.TRANSFER_TYPE_BULK:
                if endpoint_dir(ep_addr) == libusb.ENDPOINT_IN:
                    if not self.usb_inp:
                        self.usb_inp = endpoint
                if endpoint_dir(ep_addr) == libusb.ENDPOINT_OUT:
                    if not self.usb_out:
                        self.usb_out = endpoint

        if not (self.usb_inp and self.usb_out):
            log.error("no bulk endpoints for read and write")
            raise IOError(errno.ENODEV, os.strerror(errno.ENODEV))

        try:
            # workaround the PN533's buggy USB implementation
            self._manufacturer_name = dev.getManufacturer()
            self._product_name = dev.getProduct()
        except libusb.USBErrorIO:
            self._manufacturer_name = None
            self._product_name = None

        # Translate libusb failures into IOError with the matching errno.
        try:
            self.usb_dev = dev.open()
            self.usb_dev.claimInterface(0)
        except libusb.USBErrorAccess:
            raise IOError(errno.EACCES, os.strerror(errno.EACCES))
        except libusb.USBErrorBusy:
            raise IOError(errno.EBUSY, os.strerror(errno.EBUSY))
        except libusb.USBErrorNoDevice:
            raise IOError(errno.ENODEV, os.strerror(errno.ENODEV))
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def play_sound_file(self, data, rate, ssize, nchannels):
        """Play `data` on the OSS device and check how long the write took.

        `rate`, `ssize` and `nchannels` describe the sample data and are used
        to work out the expected playing time.  The test is skipped when the
        device cannot be opened because of EACCES, ENOENT, ENODEV or EBUSY.
        """
        unavailable = (errno.EACCES, errno.ENOENT, errno.ENODEV, errno.EBUSY)
        try:
            dsp = ossaudiodev.open('w')
        except IOError as msg:
            if msg.args[0] not in unavailable:
                raise
            raise unittest.SkipTest(msg)

        # Smoke test: each of these must at least be callable.
        for method_name in ('bufsize', 'obufcount', 'obuffree',
                            'getptr', 'fileno'):
            getattr(dsp, method_name)()

        # The informational attributes must carry the expected values...
        self.assertFalse(dsp.closed)
        self.assertEqual(dsp.name, "/dev/dsp")
        self.assertEqual(dsp.mode, "w", "bad dsp.mode: %r" % dsp.mode)

        # ...and must reject assignment.
        for attr in ('closed', 'name', 'mode'):
            try:
                setattr(dsp, attr, 42)
            except (TypeError, AttributeError):
                continue
            self.fail("dsp.%s not read-only" % attr)

        # Expected playing time of the sample, in seconds.
        bytes_per_sample = ssize/8
        expected_time = float(len(data)) / bytes_per_sample / nchannels / rate

        # Configure the device from the .au header values.
        dsp.setparameters(AFMT_S16_NE, nchannels, rate)
        self.assertTrue(abs(expected_time - 3.51) < 1e-2, expected_time)
        started = time.time()
        dsp.write(data)
        dsp.close()
        finished = time.time()
        elapsed_time = finished - started

        percent_diff = (abs(elapsed_time - expected_time) / expected_time) * 100
        self.assertTrue(percent_diff <= 10.0,
                        "elapsed time (%s) > 10%% off of expected time (%s)" %
                        (elapsed_time, expected_time))
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def play_sound_file(self, data, rate, ssize, nchannels):
        """Play `data` on the OSS device and check how long the write took.

        `rate`, `ssize` and `nchannels` describe the sample data and are used
        to work out the expected playing time.  The test is skipped when the
        device cannot be opened because of EACCES, ENOENT, ENODEV or EBUSY;
        any other open failure is re-raised.
        """
        try:
            dsp = ossaudiodev.open('w')
        # ``except X as name`` is accepted by Python 2.6+ and Python 3; the
        # older ``except X, name`` spelling is a SyntaxError on Python 3.
        except IOError as msg:
            if msg.args[0] in (errno.EACCES, errno.ENOENT,
                               errno.ENODEV, errno.EBUSY):
                raise unittest.SkipTest(msg)
            raise

        # at least check that these methods can be invoked
        dsp.bufsize()
        dsp.obufcount()
        dsp.obuffree()
        dsp.getptr()
        dsp.fileno()

        # Make sure the read-only attributes work.
        self.assertFalse(dsp.closed)
        self.assertEqual(dsp.name, "/dev/dsp")
        self.assertEqual(dsp.mode, "w", "bad dsp.mode: %r" % dsp.mode)

        # And make sure they're really read-only.
        for attr in ('closed', 'name', 'mode'):
            try:
                setattr(dsp, attr, 42)
            except TypeError:
                pass
            else:
                self.fail("dsp.%s not read-only" % attr)

        # Compute expected running time of sound sample (in seconds).
        expected_time = float(len(data)) / (ssize//8) / nchannels / rate

        # set parameters based on .au file headers
        dsp.setparameters(AFMT_S16_NE, nchannels, rate)
        self.assertTrue(abs(expected_time - 3.51) < 1e-2, expected_time)
        # Time the write plus close together.
        t1 = time.time()
        dsp.write(data)
        dsp.close()
        t2 = time.time()
        elapsed_time = t2 - t1

        percent_diff = (abs(elapsed_time - expected_time) / expected_time) * 100
        self.assertTrue(percent_diff <= 10.0,
                        "elapsed time > 10% off of expected time")
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def play_sound_file(self, data, rate, ssize, nchannels):
        """Write `data` to the OSS device and verify the playback duration.

        The expected duration is derived from the data length, the sample
        size `ssize` (in bits), `nchannels` and `rate`.  If the device cannot
        be opened because of EACCES, ENOENT, ENODEV or EBUSY the test is
        skipped; other open errors propagate.
        """
        try:
            dsp = ossaudiodev.open('w')
        # The ``as`` form replaces the Python 2-only ``except IOError, msg``
        # spelling, which does not parse on Python 3.
        except IOError as msg:
            if msg.args[0] in (errno.EACCES, errno.ENOENT,
                               errno.ENODEV, errno.EBUSY):
                raise unittest.SkipTest(msg)
            raise

        # at least check that these methods can be invoked
        dsp.bufsize()
        dsp.obufcount()
        dsp.obuffree()
        dsp.getptr()
        dsp.fileno()

        # Make sure the read-only attributes work.
        self.assertFalse(dsp.closed)
        self.assertEqual(dsp.name, "/dev/dsp")
        self.assertEqual(dsp.mode, "w", "bad dsp.mode: %r" % dsp.mode)

        # And make sure they're really read-only.
        for attr in ('closed', 'name', 'mode'):
            try:
                setattr(dsp, attr, 42)
            except TypeError:
                pass
            else:
                self.fail("dsp.%s not read-only" % attr)

        # Compute expected running time of sound sample (in seconds).
        expected_time = float(len(data)) / (ssize//8) / nchannels / rate

        # set parameters based on .au file headers
        dsp.setparameters(AFMT_S16_NE, nchannels, rate)
        self.assertTrue(abs(expected_time - 3.51) < 1e-2, expected_time)
        # The measured interval covers both the write and the close.
        t1 = time.time()
        dsp.write(data)
        dsp.close()
        t2 = time.time()
        elapsed_time = t2 - t1

        percent_diff = (abs(elapsed_time - expected_time) / expected_time) * 100
        self.assertTrue(percent_diff <= 10.0,
                        "elapsed time > 10% off of expected time")
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def play_sound_file(self, data, rate, ssize, nchannels):
        """Write `data` to the OSS device and verify the playback duration.

        The expected duration is derived from the data length, the sample
        size `ssize` (in bits), `nchannels` and `rate`.  If the device cannot
        be opened because of EACCES, ENOENT, ENODEV or EBUSY the test is
        skipped; other open errors propagate.
        """
        skip_errnos = (errno.EACCES, errno.ENOENT, errno.ENODEV, errno.EBUSY)
        try:
            dsp = ossaudiodev.open('w')
        except IOError as msg:
            if msg.args[0] not in skip_errnos:
                raise
            raise unittest.SkipTest(msg)

        # Each of these only needs to be invocable without error.
        for name in ('bufsize', 'obufcount', 'obuffree', 'getptr', 'fileno'):
            getattr(dsp, name)()

        # Check the values of the informational attributes.
        self.assertFalse(dsp.closed)
        self.assertEqual(dsp.name, "/dev/dsp")
        self.assertEqual(dsp.mode, "w", "bad dsp.mode: %r" % dsp.mode)

        # Assigning to any of them has to be refused.
        for attr in ('closed', 'name', 'mode'):
            try:
                setattr(dsp, attr, 42)
            except (TypeError, AttributeError):
                continue
            self.fail("dsp.%s not read-only" % attr)

        # How long the sample should take to play, in seconds.
        sample_bytes = ssize/8
        expected_time = float(len(data)) / sample_bytes / nchannels / rate

        # Apply the parameters taken from the .au file headers.
        dsp.setparameters(AFMT_S16_NE, nchannels, rate)
        self.assertTrue(abs(expected_time - 3.51) < 1e-2, expected_time)
        start = time.time()
        dsp.write(data)
        dsp.close()
        stop = time.time()
        elapsed_time = stop - start

        percent_diff = (abs(elapsed_time - expected_time) / expected_time) * 100
        self.assertTrue(percent_diff <= 10.0,
                        "elapsed time (%s) > 10%% off of expected time (%s)" %
                        (elapsed_time, expected_time))