我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 errno.ENODEV。
def queryWirelessDevice(self, iface):
    """Report whether *iface* responds to a wireless access-point query.

    The interface is probed by asking ``pythonwifi`` for its access point
    address; only the success or failure of that call is used.

    :param iface: interface name handed to ``pythonwifi.iwlibs.Wireless``.
    :returns: False when ``pythonwifi`` cannot be imported or when the
        query fails with EOPNOTSUPP, ENODEV or EPERM. True when the query
        succeeds, and also when it fails with any other errno (that error
        is printed first).
    """
    try:
        from pythonwifi.iwlibs import Wireless
        import errno
    except ImportError:
        return False

    try:
        # The returned address itself is not needed, only whether the
        # call raises.
        Wireless(iface).getAPaddr()
    except IOError as err:
        # Read the attributes instead of tuple-unpacking the exception:
        # that works on Python 2 and 3 and does not blow up when the
        # exception carries something other than exactly two arguments.
        error_no, error_str = err.errno, err.strerror
        if error_no in (errno.EOPNOTSUPP, errno.ENODEV, errno.EPERM):
            return False
        # Same text as the former Python 2 print statement produced.
        print("error:  %s %s" % (error_no, error_str))
        return True
    return True
def test_init_linux_version_ack_err(self, mocker, transport):  # noqa: F811
    """init() must fail with ENODEV when the first read returns ERR()."""
    for target, result in (
            ('nfc.clf.pn532.Device.__init__', None),
            ('nfc.clf.pn532.open', ["cpuinfo"]),
            ('os.system', -1)):
        mocker.patch(target).return_value = result
    # NOTE(review): assigned directly, so it is not restored after the test.
    sys.platform = "linux"
    transport.write.return_value = None
    transport.read.side_effect = [
        ERR(),                                    # GetFirmwareVersion
    ]

    with pytest.raises(IOError) as excinfo:
        nfc.clf.pn532.init(transport)
    assert excinfo.value.errno == errno.ENODEV

    expected_frames = [
        HEX(10 * '00') + CMD('02'),               # GetFirmwareVersion
    ]
    assert transport.write.mock_calls == [
        call(frame) for frame in expected_frames]
def test_init_linux_version_rsp_err(self, mocker, transport):  # noqa: F811
    """init() must fail with ENODEV when ERR() follows the first ACK()."""
    for target, result in (
            ('nfc.clf.pn532.Device.__init__', None),
            ('nfc.clf.pn532.open', ["cpuinfo"]),
            ('os.system', -1)):
        mocker.patch(target).return_value = result
    # NOTE(review): assigned directly, so it is not restored after the test.
    sys.platform = "linux"
    transport.write.return_value = None
    transport.read.side_effect = [
        ACK(), ERR(),                             # GetFirmwareVersion
    ]

    with pytest.raises(IOError) as excinfo:
        nfc.clf.pn532.init(transport)
    assert excinfo.value.errno == errno.ENODEV

    expected_frames = [
        HEX(10 * '00') + CMD('02'),               # GetFirmwareVersion
    ]
    assert transport.write.mock_calls == [
        call(frame) for frame in expected_frames]
def test_init_linux_sam_cfg_ack_err(self, mocker, transport):  # noqa: F811
    """init() must fail with ENODEV when SAMConfiguration gets ERR()."""
    for target, result in (
            ('nfc.clf.pn532.Device.__init__', None),
            ('nfc.clf.pn532.open', ["cpuinfo"]),
            ('os.system', -1)):
        mocker.patch(target).return_value = result
    # NOTE(review): assigned directly, so it is not restored after the test.
    sys.platform = "linux"
    transport.write.return_value = None
    transport.read.side_effect = [
        ACK(), RSP('03 32010607'),                # GetFirmwareVersion
        ERR(),                                    # SAMConfiguration
    ]

    with pytest.raises(IOError) as excinfo:
        nfc.clf.pn532.init(transport)
    assert excinfo.value.errno == errno.ENODEV

    expected_frames = [
        HEX(10 * '00') + CMD('02'),               # GetFirmwareVersion
        HEX(10 * '00') + CMD('14 010000'),        # SAMConfiguration
    ]
    assert transport.write.mock_calls == [
        call(frame) for frame in expected_frames]
def rfkill_block(idx):
    """
     blocks the device at index
     :param idx: rkill index
     :raises pyric.error: ENODEV when no rfkill directory exists for idx,
      EUNDEF when the event cannot be packed, or the errno of the I/O
      failure when the event cannot be written
    """
    if not os.path.exists(os.path.join(spath, "rfkill{0}".format(idx))):
        raise pyric.error(errno.ENODEV, "No device at {0}".format(idx))
    try:
        rfke = rfkh.rfkill_event(
            idx, rfkh.RFKILL_TYPE_ALL, rfkh.RFKILL_OP_CHANGE, 1, 0)
        if _PY3_:
            rfke = rfke.decode('ascii')
        # The with-block closes (and therefore flushes) the file inside the
        # try, so a failure of the deferred write is also reported as
        # pyric.error rather than escaping as a raw IOError.
        with open(dpath, 'w') as fout:
            fout.write(rfke)
    except struct.error as e:
        raise pyric.error(
            pyric.EUNDEF, "Error packing rfkill event {0}".format(e))
    except IOError as e:
        # IOError has no .message on Python 3 (and it is empty on Python 2
        # for errno-style errors); strerror carries the description.
        raise pyric.error(e.errno, e.strerror)
def rfkill_unblock(idx):
    """
     unblocks the device at index
     :param idx: rkill index
     :raises pyric.error: ENODEV when no rfkill directory exists for idx,
      EUNDEF when the event cannot be packed, or the errno of the I/O
      failure when the event cannot be written
    """
    if not os.path.exists(os.path.join(spath, "rfkill{0}".format(idx))):
        raise pyric.error(errno.ENODEV, "No device at {0}".format(idx))
    try:
        rfke = rfkh.rfkill_event(
            idx, rfkh.RFKILL_TYPE_ALL, rfkh.RFKILL_OP_CHANGE, 0, 0)
        # The file is opened in text mode, so a bytes event (Python 3) has
        # to be converted first, matching what rfkill_block does. On
        # Python 2 the event is already a str and is left untouched.
        if not isinstance(rfke, str):
            rfke = rfke.decode('ascii')
        # The with-block closes (and therefore flushes) the file inside the
        # try, so a failure of the deferred write is also reported as
        # pyric.error rather than escaping as a raw IOError.
        with open(dpath, 'w') as fout:
            fout.write(rfke)
    except struct.error as e:
        raise pyric.error(
            pyric.EUNDEF, "Error packing rfkill event {0}".format(e))
    except IOError as e:
        # IOError has no .message on Python 3 (and it is empty on Python 2
        # for errno-style errors); strerror carries the description.
        raise pyric.error(e.errno, e.strerror)
def soft_blocked(idx):
    """
     reports whether the device is soft blocked
     :param idx: rkill index
     :returns: True if the 'soft' file of the device reads as 1,
      False otherwise
     :raises pyric.error: ENODEV when the device directory or its 'soft'
      file cannot be opened, EUNDEF when the content is not an integer
    """
    devdir = os.path.join(spath, "rfkill{0}".format(idx))
    if not os.path.exists(devdir):
        raise pyric.error(errno.ENODEV, "No device at {0}".format(idx))
    try:
        with open(os.path.join(devdir, 'soft'), 'r') as fin:
            state = fin.read().strip()
        return int(state) == 1
    except IOError:
        raise pyric.error(errno.ENODEV, "No device at {0}".format(idx))
    except ValueError:
        raise pyric.error(pyric.EUNDEF, "Unexpected error")
def hard_blocked(idx):
    """
     reports whether the device is hard blocked
     :param idx: rkill index
     :returns: True if the 'hard' file of the device reads as 1,
      False otherwise
     :raises pyric.error: ENODEV when the device directory or its 'hard'
      file cannot be opened, EUNDEF when the content is not an integer
    """
    devdir = os.path.join(spath, "rfkill{0}".format(idx))
    if not os.path.exists(devdir):
        raise pyric.error(errno.ENODEV, "No device at {0}".format(idx))
    try:
        with open(os.path.join(devdir, 'hard'), 'r') as fin:
            state = fin.read().strip()
        return int(state) == 1
    except IOError:
        raise pyric.error(errno.ENODEV, "No device at {0}".format(idx))
    except ValueError:
        raise pyric.error(pyric.EUNDEF, "Unexpected error")
def sense(self, targets, **kwargs):
    """Run discovery and activation for the given targets.

    *targets* is a list of target specifications (TTA, TTB, TTF); a
    reader may not support every target type. Extra keyword arguments
    are handed to the driver unchanged and are driver specific.

    Returns the driver's result: an activated target, possibly with an
    updated specification (bitrate), or None.

    Raises IOError with errno.ENODEV when no device is open.

    .. note:: This is a direct interface to the driver and not needed
       if :meth:`connect` is used.

    """
    if self.dev is None:
        code = errno.ENODEV
        raise IOError(code, os.strerror(code))
    with self.lock:
        activated = self.dev.sense(targets, **kwargs)
    return activated
def exchange(self, send_data, timeout):
    """Exchange one frame with the peer through the driver.

    When acting on an activated target, *send_data* is a command frame
    and the return value is the target's response frame. When acting as
    an activated target, *send_data* is a response frame and the return
    value is the next command frame, or None if the communication link
    broke during the exchange.

    *timeout* is the number of seconds to wait for data to return; on
    expiry an nfc.clf.TimeoutException is raised. Other
    nfc.clf.DigitalProtocolExceptions may be raised if an error is
    detected during communication.

    Raises IOError with errno.ENODEV when no device is open.

    .. note:: This is a direct interface to the driver and not needed
       if :meth:`connect` is used.

    """
    if self.dev is None:
        code = errno.ENODEV
        raise IOError(code, os.strerror(code))
    with self.lock:
        received = self.dev.exchange(send_data, timeout)
    return received
def set_communication_mode(self, brm, **kwargs):
    """Ask the driver to change the hardware communication mode.

    What this does depends on hardware support; some drivers may
    deliberately ignore the request. Where supported, *brm* is a string
    made of bitrate and modulation type, for example '212F' to switch
    to 212 kbps Type F communication. Further communication parameters
    may be passed as keyword arguments; the RC-S380 driver currently
    implements 'add-crc' and 'check-crc' when running as initiator. An
    empty *brm* leaves bitrate and modulation untouched and applies
    only the optional parameters.

    Raises IOError with errno.ENODEV when no device is open.

    .. note:: This is a direct interface to the driver and not needed
       if :meth:`connect` is used.

    """
    if self.dev is None:
        code = errno.ENODEV
        raise IOError(code, os.strerror(code))
    with self.lock:
        self.dev.set_communication_mode(brm, **kwargs)
def queryWirelessDevice(self, iface):
    """Report whether *iface* responds to a wireless access-point query.

    The interface is probed by asking ``pythonwifi`` for its access point
    address; only the success or failure of that call is used.

    :param iface: interface name handed to ``pythonwifi.iwlibs.Wireless``.
    :returns: False when ``pythonwifi`` cannot be imported or when the
        query fails with EOPNOTSUPP, ENODEV or EPERM. True when the query
        succeeds, and also when it fails with any other errno (that error
        is printed first).
    """
    try:
        from pythonwifi.iwlibs import Wireless
        import errno
    except ImportError:
        return False

    try:
        # The returned address itself is not needed, only whether the
        # call raises.
        Wireless(iface).getAPaddr()
    except IOError as err:
        # Read the attributes instead of tuple-unpacking the exception:
        # that works on Python 2 and 3 and does not blow up when the
        # exception carries something other than exactly two arguments.
        error_no, error_str = err.errno, err.strerror
        if error_no in (errno.EOPNOTSUPP, errno.ENODEV, errno.EPERM):
            return False
        # Same text as the former Python 2 print statement produced.
        print("[AdapterSetupConfiguration] error:  %s %s"
              % (error_no, error_str))
        return True
    return True
def get_interface_mac(sock: socket.socket, ifname: str) -> str:
    """Return the MAC address of a network interface as a string.

    Issues the SIOCGIFHWADDR ioctl on *sock* for *ifname* and formats six
    bytes of the reply as lowercase, colon-separated hex pairs.

    :raises InterfaceNotFound: when the ioctl fails with ENODEV.
    :raises MACAddressNotAvailable: when the ioctl fails for any other
        reason.
    """
    request = struct.pack(b'256s', ifname.encode('utf-8')[:15])
    try:
        reply = fcntl.ioctl(sock.fileno(), SIOCGIFHWADDR, request)
    except OSError as e:
        if e.errno is None or e.errno != errno.ENODEV:
            raise MACAddressNotAvailable(
                "Failed to get MAC address for '%s': %s." % (
                    ifname, strerror(e.errno)))
        raise InterfaceNotFound(
            "Interface not found: '%s'." % ifname)
    # Bytes 18..23 of the reply are taken as the hardware address --
    # presumably past the interface name and address family; confirm
    # against the platform's `struct ifreq`.
    hw_octets = reply[18:24]
    return ':'.join('%02x' % octet for octet in hw_octets)
def test_read(self, usb):
    """read() returns bulk data and maps USB errors to IOError errnos."""
    libusb = nfc.clf.transport.libusb
    usb.usb_dev.bulkRead.side_effect = [
        b'12', b'34',
        libusb.USBErrorTimeout,
        libusb.USBErrorNoDevice,
        libusb.USBError,
        b'',
    ]

    # Successful reads, without and with an explicit timeout.
    assert usb.read() == b'12'
    usb.usb_dev.bulkRead.assert_called_with(0x84, 300, 0)
    assert usb.read(100) == b'34'
    usb.usb_dev.bulkRead.assert_called_with(0x84, 300, 100)

    # The four remaining side effects, in order, and the errno each one
    # is expected to surface as.
    for expected_errno in (errno.ETIMEDOUT, errno.ENODEV,
                           errno.EIO, errno.EIO):
        with pytest.raises(IOError) as excinfo:
            usb.read()
        assert excinfo.value.errno == expected_errno

    # Without an input endpoint read() yields None.
    usb.usb_inp = None
    assert usb.read() is None
def test_write(self, usb):
    """write() forwards frames and maps USB errors to IOError errnos."""
    bulk_write = usb.usb_dev.bulkWrite

    usb.write(b'12')
    bulk_write.assert_called_with(0x04, b'12', 0)
    usb.write(b'12', 100)
    bulk_write.assert_called_with(0x04, b'12', 100)

    # A 64 byte frame is expected to be followed by an empty write.
    usb.write(64 * b'1', 100)
    bulk_write.assert_has_calls([
        call(0x04, 64 * b'1', 100),
        call(0x04, b'', 100),
    ])

    libusb = nfc.clf.transport.libusb
    bulk_write.side_effect = [
        libusb.USBErrorTimeout,
        libusb.USBErrorNoDevice,
        libusb.USBError,
    ]
    for expected_errno in (errno.ETIMEDOUT, errno.ENODEV, errno.EIO):
        with pytest.raises(IOError) as excinfo:
            usb.write(b'12')
        assert excinfo.value.errno == expected_errno

    # Without an output endpoint write() yields None.
    usb.usb_out = None
    assert usb.write(b'12') is None
def test_init_linux_setbaud_ack_err(self, mocker, transport):  # noqa: F811
    """init() must fail with ENODEV when SetSerialBaudrate gets ERR()."""
    mocker.patch('nfc.clf.pn532.Device.__init__').return_value = None
    mocker.patch('nfc.clf.pn532.open').return_value = ["cpuinfo"]
    stty = mocker.patch('os.system')
    stty.side_effect = [-1, 0, None]
    # NOTE(review): assigned directly, so it is not restored after the test.
    sys.platform = "linux"
    transport.write.return_value = None
    transport.read.side_effect = [
        ACK(), RSP('03 32010607'),                # GetFirmwareVersion
        ACK(), RSP('15'),                         # SAMConfiguration
        ERR(),                                    # SetSerialBaudrate
    ]

    with pytest.raises(IOError) as excinfo:
        nfc.clf.pn532.init(transport)
    assert excinfo.value.errno == errno.ENODEV

    expected_commands = [
        'stty -F /dev/ttyS0 921600 2> /dev/null',
        'stty -F /dev/ttyS0 460800 2> /dev/null',
        'stty -F /dev/ttyS0 115200 2> /dev/null',
    ]
    assert stty.mock_calls == [call(cmd) for cmd in expected_commands]

    expected_frames = [
        HEX(10 * '00') + CMD('02'),               # GetFirmwareVersion
        HEX(10 * '00') + CMD('14 010000'),        # SAMConfiguration
        HEX(10 * '00') + CMD('10 06'),            # SetSerialBaudrate
    ]
    assert transport.write.mock_calls == [
        call(frame) for frame in expected_frames]
def test_init_linux_setbaud_rsp_err(self, mocker, transport):  # noqa: F811
    """init() must fail with ENODEV when ERR() follows the baudrate ACK()."""
    mocker.patch('nfc.clf.pn532.Device.__init__').return_value = None
    mocker.patch('nfc.clf.pn532.open').return_value = ["cpuinfo"]
    stty = mocker.patch('os.system')
    stty.side_effect = [-1, 0, None]
    # NOTE(review): assigned directly, so it is not restored after the test.
    sys.platform = "linux"
    transport.write.return_value = None
    transport.read.side_effect = [
        ACK(), RSP('03 32010607'),                # GetFirmwareVersion
        ACK(), RSP('15'),                         # SAMConfiguration
        ACK(), ERR(),                             # SetSerialBaudrate
    ]

    with pytest.raises(IOError) as excinfo:
        nfc.clf.pn532.init(transport)
    assert excinfo.value.errno == errno.ENODEV

    expected_commands = [
        'stty -F /dev/ttyS0 921600 2> /dev/null',
        'stty -F /dev/ttyS0 460800 2> /dev/null',
        'stty -F /dev/ttyS0 115200 2> /dev/null',
    ]
    assert stty.mock_calls == [call(cmd) for cmd in expected_commands]

    expected_frames = [
        HEX(10 * '00') + CMD('02'),               # GetFirmwareVersion
        HEX(10 * '00') + CMD('14 010000'),        # SAMConfiguration
        HEX(10 * '00') + CMD('10 06'),            # SetSerialBaudrate
    ]
    assert transport.write.mock_calls == [
        call(frame) for frame in expected_frames]
def __init__(self, transport):
    """Verify the reader is a supported ACR122U and bring it up.

    Reads the version string, rejects readers whose string does not
    start with "ACR122U" or whose eighth character is a digit below 2,
    powers on the ICC, sets the PICC operating parameters, resets buzzer
    and LED, and finally runs the parent initialiser.

    Raises IOError with errno.ENODEV for an unrecognised or unsupported
    reader.
    """
    self.transport = transport

    # read ACR122U firmware version string
    version_query = bytearray.fromhex("FF00480000")
    reader_version = self.ccid_xfr_block(version_query)
    recognised = reader_version.startswith("ACR122U")
    if not recognised:
        log.error("failed to retrieve ACR122U version string")
        raise IOError(errno.ENODEV, os.strerror(errno.ENODEV))

    major_version = int(chr(reader_version[7]))
    if major_version < 2:
        log.error("{0} not supported, need 2.x".format(reader_version[7:]))
        raise IOError(errno.ENODEV, os.strerror(errno.ENODEV))

    log.debug("initialize " + str(reader_version))

    # set icc power on
    log.debug("CCID ICC-POWER-ON")
    power_on_frame = bytearray.fromhex("62000000000000000000")
    transport.write(power_on_frame)
    transport.read(100)  # the reply is read and discarded

    # disable autodetection
    log.debug("Set PICC Operating Parameters")
    self.ccid_xfr_block(bytearray.fromhex("FF00517F00"))

    # switch red/green led off/on
    log.debug("Configure Buzzer and LED")
    self.set_buzzer_and_led_to_default()

    super(Chipset, self).__init__(transport, logger=log)
def write(self, frame, timeout=0):
    """Send *frame* on the bulk output endpoint.

    Does nothing (and returns None) when no output endpoint is set.
    When the frame length is an exact multiple of the endpoint's maximum
    packet size an additional empty write follows.

    Raises IOError with ETIMEDOUT, ENODEV or EIO for a USB timeout, a
    vanished device, or any other USB error respectively.
    """
    if self.usb_out is None:
        return

    log.log(logging.DEBUG-1, ">>> %s", hexlify(frame))
    try:
        ep_addr = self.usb_out.getAddress()
        self.usb_dev.bulkWrite(ep_addr, bytes(frame), timeout)
        packet_size = self.usb_out.getMaxPacketSize()
        if len(frame) % packet_size == 0:
            # presumably a zero-length packet to mark the end of the
            # transfer -- confirm against the USB bulk transfer rules
            self.usb_dev.bulkWrite(ep_addr, b'', timeout)
    except libusb.USBErrorTimeout:
        raise IOError(errno.ETIMEDOUT, os.strerror(errno.ETIMEDOUT))
    except libusb.USBErrorNoDevice:
        raise IOError(errno.ENODEV, os.strerror(errno.ENODEV))
    except libusb.USBError as error:
        log.error("%r", error)
        raise IOError(errno.EIO, os.strerror(errno.EIO))
def __init__(self, path=None):
    """Set up an unopened frontend and optionally open *path*.

    ``device`` and ``target`` start as None and a fresh lock is created.
    When *path* is truthy, ``self.open(path)`` is called; a falsy result
    raises IOError with errno.ENODEV.
    """
    self.device = self.target = None
    self.lock = threading.Lock()
    if not path:
        return
    if not self.open(path):
        raise IOError(errno.ENODEV, os.strerror(errno.ENODEV))
def exchange(self, send_data, timeout):
    """Exchange one frame with the current target.

    With a remote target, *send_data* is a command frame and the return
    value is the target's response frame. With a local target,
    *send_data* is a response frame and the return value is the next
    command frame, or None if the communication link broke during the
    exchange. None is also returned (after logging an error) when there
    is neither a remote nor a local target.

    *timeout* is the number of seconds to wait for data to return; on
    expiry an nfc.clf.TimeoutException is raised. Other
    nfc.clf.CommunicationError exceptions may be raised if an error is
    detected during communication.

    Raises IOError with errno.ENODEV when no device is open.

    """
    with self.lock:
        if self.device is None:
            raise IOError(errno.ENODEV, os.strerror(errno.ENODEV))

        log.debug(">>> %s timeout=%s", print_data(send_data), str(timeout))

        # Pick the driver call that matches our role in the exchange.
        if isinstance(self.target, RemoteTarget):
            transfer = self.device.send_cmd_recv_rsp
        elif isinstance(self.target, LocalTarget):
            transfer = self.device.send_rsp_recv_cmd
        else:
            log.error("no target for data exchange")
            return None

        started = time.time()
        reply = transfer(self.target, send_data, timeout)
        elapsed = time.time() - started

        log.debug("<<< %s %.3fs", print_data(reply), elapsed)
        return reply
def max_send_data_size(self):
    """Return the driver's send size limit for the current target.

    This is the maximum number of octets that can be sent with the
    :meth:`exchange` method in the established operating mode.

    Raises IOError with errno.ENODEV when no device is open.

    """
    with self.lock:
        if self.device is not None:
            return self.device.get_max_send_data_size(self.target)
        raise IOError(errno.ENODEV, os.strerror(errno.ENODEV))
def max_recv_data_size(self):
    """Return the driver's receive size limit for the current target.

    This is the maximum number of octets that can be received with the
    :meth:`exchange` method in the established operating mode.

    Raises IOError with errno.ENODEV when no device is open.

    """
    with self.lock:
        if self.device is not None:
            return self.device.get_max_recv_data_size(self.target)
        raise IOError(errno.ENODEV, os.strerror(errno.ENODEV))
def test_main():
    """Run this module's tests, skipping when the audio device is unusable.

    The device is opened for writing once up front. EACCES, ENOENT,
    ENODEV and EBUSY become a SkipTest; any other failure is re-raised.
    """
    try:
        dsp = ossaudiodev.open('w')
    except (ossaudiodev.error, IOError) as msg:
        unusable = (errno.EACCES, errno.ENOENT,
                    errno.ENODEV, errno.EBUSY)
        if msg.args[0] not in unusable:
            raise
        raise unittest.SkipTest(msg)
    dsp.close()
    support.run_unittest(__name__)
def test_main():
    """Run LinuxAudioDevTests, skipping when the audio device is unusable.

    The device is opened for writing once up front. EACCES, ENOENT,
    ENODEV and EBUSY become a SkipTest; any other failure is re-raised.
    """
    try:
        dsp = linuxaudiodev.open('w')
    except linuxaudiodev.error as msg:
        unusable = (errno.EACCES, errno.ENOENT,
                    errno.ENODEV, errno.EBUSY)
        if msg.args[0] not in unusable:
            raise
        raise unittest.SkipTest(msg)
    dsp.close()
    run_unittest(LinuxAudioDevTests)
def test_main():
    """Run this module's tests, skipping when the audio device is unusable.

    The device is opened for writing once up front. EACCES, ENOENT,
    ENODEV and EBUSY become a SkipTest; any other failure is re-raised.
    """
    try:
        dsp = ossaudiodev.open('w')
    except (ossaudiodev.error, IOError) as msg:
        unusable = (errno.EACCES, errno.ENOENT,
                    errno.ENODEV, errno.EBUSY)
        if msg.args[0] not in unusable:
            raise
        raise unittest.SkipTest(msg)
    dsp.close()
    test_support.run_unittest(__name__)
def gettype(idx):
    """
     reads the type of the device
     :param idx: rfkill index
     :returns: the stripped content of the device's 'type' file
     :raises pyric.error: ENODEV when that file cannot be opened or read
    """
    type_path = os.path.join(spath, "rfkill{0}".format(idx), 'type')
    try:
        with open(type_path, 'r') as fin:
            return fin.read().strip()
    except IOError:
        raise pyric.error(errno.ENODEV, "No device at {0}".format(idx))
def __init__(self, path=None):
    """Set up an unopened frontend and optionally open *path*.

    ``dev`` starts as None and a fresh lock is created. When *path* is
    truthy, ``self.open(path)`` is called; a falsy result raises IOError
    with errno.ENODEV.
    """
    self.dev = None
    self.lock = threading.Lock()
    if not path:
        return
    if not self.open(path):
        raise IOError(errno.ENODEV, os.strerror(errno.ENODEV))
def listen(self, target, timeout):
    """Wait up to *timeout* seconds to be activated as *target*.

    *target* must be one of :class:`nfc.clf.TTA`, :class:`nfc.clf.TTB`,
    :class:`nfc.clf.TTF`, or :class:`nfc.clf.DEP`; which of these work
    depends on the hardware capabilities. The request is dispatched on
    the exact type of *target*, and any other type yields None.

    The return value is :const:`None` if *timeout* elapsed without
    activation, or a tuple (target, command) where target is the
    activated target and command is the first command received from
    the initiator.

    The activated target may differ in type and attributes from the
    one requested. This is especially true when activation as a
    :class:`nfc.clf.DEP` target is requested but the contactless
    frontend has no hardware implementation of the data exchange
    protocol and returns a :class:`nfc.clf.TTA` or :class:`nfc.clf.TTF`
    target instead.

    Raises IOError with errno.ENODEV when no device is open.

    .. note:: This is a direct interface to the driver and not needed
       if :meth:`connect` is used.

    """
    if self.dev is None:
        code = errno.ENODEV
        raise IOError(code, os.strerror(code))

    with self.lock:
        log.debug("listen for {0:.3f} sec as target {1}"
                  .format(timeout, target))
        requested = type(target)
        if requested is TTA:
            return self.dev.listen_tta(target, timeout)
        if requested is TTB:
            return self.dev.listen_ttb(target, timeout)
        if requested is TTF:
            return self.dev.listen_ttf(target, timeout)
        if requested is DEP:
            return self.dev.listen_dep(target, timeout)
def __init__(self, transport):
    """Find the reader's serial speed and switch the link to 230400.

    Each candidate baud rate is tried in turn by reopening the serial
    port and sending the version query "0av". On the first answer that
    starts with "FF0000" the MCU/TAMA speed is raised, the MCU/host speed
    is raised too if the port is not already at 230400, and the parent
    initialiser is run.

    Raises IOError with errno.ENODEV when no baud rate produces an
    answer or when one of the speed changes is not confirmed.

    NOTE(review): the nesting was reconstructed from a source whose
    indentation was lost; confirm against the original module.
    """
    self.transport = transport
    link = self.transport
    for speed in (230400, 9600, 19200, 38400, 57600, 115200):
        log.debug("try serial baud rate {0} kbps".format(speed))
        link.tty = link.serial.Serial(
            link.tty.port, baudrate=speed, timeout=0.1)

        log.debug("read arygon firmware version")
        link.tty.write("0av")
        version = link.tty.readline()
        if not version.startswith("FF0000"):
            # nothing recognisable at this speed, try the next one
            continue

        log.debug("Arygon Reader {0}".format(version.strip()[-4:]))
        link.tty.timeout = 1.0
        link.tty.writeTimeout = 1.0

        log.debug("set mcu-tama speed to 230.4 kbps")
        link.tty.write("0at05")
        if link.tty.readline().strip() != "FF000000":
            log.debug("failed to set mcu-tama speed")
            break

        if link.tty.baudrate != 230400:
            log.debug("set mcu-host speed to 230.4 kbps")
            link.tty.write("0ah05")
            if link.tty.readline().strip() != "FF000000":
                log.debug("failed to set mcu-host speed")
                break
            time.sleep(0.5)
            # reopen the port at the newly negotiated host speed
            link.tty.close()
            link.tty = link.serial.Serial(
                link.tty.port, baudrate=230400,
                timeout=1.0, writeTimeout=1.0)

        return super(Chipset, self).__init__(transport)

    raise IOError(errno.ENODEV, os.strerror(errno.ENODEV))
def test_main():
    """Run this module's tests, skipping when the audio device is unusable.

    The device is opened for writing once up front. EACCES, ENOENT,
    ENODEV and EBUSY become a SkipTest; any other failure is re-raised.
    """
    try:
        dsp = ossaudiodev.open('w')
    except (ossaudiodev.error, OSError) as msg:
        unusable = (errno.EACCES, errno.ENOENT,
                    errno.ENODEV, errno.EBUSY)
        if msg.args[0] not in unusable:
            raise
        raise unittest.SkipTest(msg)
    dsp.close()
    support.run_unittest(__name__)
def open(self, filename, *args, **kwargs):
    """
    Open C{filename} through the parent implementation, turning a missing
    or unsupported C{/dev/net/tun} into L{SkipTest} so that tests which
    need platform support for tuntap devices are skipped rather than
    failed.  Every other error is re-raised unchanged.
    """
    try:
        return super(TestRealSystem, self).open(filename, *args, **kwargs)
    except OSError as e:
        # ENOENT: the device file is simply missing.  ENODEV: the file
        # exists but the kernel does not support the device.
        tunUnavailable = (
            e.errno in (ENOENT, ENODEV) and filename == b"/dev/net/tun")
        if not tunUnavailable:
            raise
        raise SkipTest("Platform lacks /dev/net/tun")
def get_interface_ip(sock: socket.socket, ifname: str) -> str:
    """Return an IPv4 address of a network interface as a string.

    Issues the SIOCGIFADDR ioctl on *sock* for *ifname* and converts the
    four address bytes of the reply to dotted-quad notation.

    :raises InterfaceNotFound: when the ioctl fails with ENODEV.
    :raises IPAddressNotAvailable: when the ioctl fails with
        EADDRNOTAVAIL or for any other reason.
    """
    name_field = ifname.encode('utf-8')[:15]
    request = struct.pack(
        b'16sH14s', name_field, socket.AF_INET, b'\x00' * 14)
    try:
        reply = fcntl.ioctl(sock, SIOCGIFADDR, request)
    except OSError as e:
        if e.errno == errno.ENODEV:
            raise InterfaceNotFound(
                "Interface not found: '%s'." % ifname)
        if e.errno == errno.EADDRNOTAVAIL:
            raise IPAddressNotAvailable(
                "No IP address found on interface '%s'." % ifname)
        raise IPAddressNotAvailable(
            "Failed to get IP address for '%s': %s." % (
                ifname, strerror(e.errno)))
    # Layout of the returned `struct ifreq`, as consumed below:
    #   16x --> char ifr_name[IFNAMSIZ];
    # followed by a union of which the `sockaddr_in` member applies here:
    #   2x  --> short sin_family;
    #   2x  --> unsigned short sin_port;
    #   4s  --> struct in_addr sin_addr;
    #   8x  --> char sin_zero[8];
    (packed_addr,) = struct.unpack(b'16x2x2x4s8x', reply)
    return socket.inet_ntoa(packed_addr)
def _arygon_read_version(transport, baudrate):
    """Open the port at *baudrate*, send "0av" and return the reply line."""
    transport.open(transport.port, baudrate)
    transport.tty.write("0av")  # read version
    return transport.tty.readline()


def _arygon_switch_to_230400(transport):
    """Raise MCU/TAMA and MCU/HOST speed; True only if both are confirmed."""
    transport.tty.timeout = 0.5
    transport.tty.write("0at05")
    if not transport.tty.readline().startswith("FF0000"):
        return False
    log.debug("MCU/TAMA communication set to 230400 bps")
    transport.tty.write("0ah05")
    if not transport.tty.readline().startswith("FF0000"):
        return False
    log.debug("MCU/HOST communication set to 230400 bps")
    transport.tty.baudrate = 230400
    transport.tty.timeout = 0.1
    time.sleep(0.1)
    return True


def init(transport):
    """Detect an Arygon reader on *transport* and return its device.

    The reader is first probed at 115200 baud (handled as the "B"
    variant) and then at 9600 baud (the "A" variant). A probe succeeds
    when the version reply starts with "FF00000600V" and both speed
    changes to 230400 are confirmed; the matching chipset and device
    objects are then created and the device is returned.

    Raises IOError with errno.ENODEV when neither probe succeeds.

    NOTE(review): the nesting was reconstructed from a source whose
    indentation was lost; confirm against the original module.
    """
    response = _arygon_read_version(transport, 115200)
    if response.startswith("FF00000600V"):
        log.debug("Arygon Reader AxxB Version %s", response[11:].strip())
        if _arygon_switch_to_230400(transport):
            chipset = ChipsetB(transport, logger=log)
            device = DeviceB(chipset, logger=log)
            device._vendor_name = "Arygon"
            device._device_name = "ADRB"
            return device

    response = _arygon_read_version(transport, 9600)
    if response.startswith("FF00000600V"):
        log.debug("Arygon Reader AxxA Version %s", response[11:].strip())
        if _arygon_switch_to_230400(transport):
            chipset = ChipsetA(transport, logger=log)
            device = DeviceA(chipset, logger=log)
            device._vendor_name = "Arygon"
            device._device_name = "ADRA"
            return device

    raise IOError(errno.ENODEV, os.strerror(errno.ENODEV))
def open(self, usb_bus, dev_adr):
    """Open the USB device at *dev_adr* on *usb_bus* and claim interface 0.

    The first bulk IN and the first bulk OUT endpoint of the device's
    first setting are stored in ``usb_inp`` and ``usb_out``; the opened
    handle is stored in ``usb_dev``. Manufacturer and product names are
    recorded when they can be read, otherwise set to None.

    Raises IOError with
      * ENODEV when the device is not found, has no settings, lacks a
        bulk IN/OUT endpoint pair, or disappears while being opened,
      * EACCES when access to the device is denied,
      * EBUSY when the device is busy.

    NOTE(review): the nesting was reconstructed from a source whose
    indentation was lost; confirm against the original module.
    """
    self.usb_dev = None
    self.usb_out = None
    self.usb_inp = None

    for dev in self.context.getDeviceList(skip_on_error=True):
        if ((dev.getBusNumber() == usb_bus and
             dev.getDeviceAddress() == dev_adr)):
            break
    else:
        log.error("no device {0} on bus {1}".format(dev_adr, usb_bus))
        raise IOError(errno.ENODEV, os.strerror(errno.ENODEV))

    try:
        # The next() builtin works on Python 2.6+ and 3; the former
        # `.next()` method call only exists on Python 2 iterators, so on
        # Python 3 it raised AttributeError instead of StopIteration.
        first_setting = next(dev.iterSettings())
    except StopIteration:
        log.error("no usb configuration settings, please replug device")
        raise IOError(errno.ENODEV, os.strerror(errno.ENODEV))

    def transfer_type(x):
        return x & libusb.TRANSFER_TYPE_MASK

    def endpoint_dir(x):
        return x & libusb.ENDPOINT_DIR_MASK

    # Remember the first bulk endpoint found for each direction.
    for endpoint in first_setting.iterEndpoints():
        ep_addr = endpoint.getAddress()
        ep_attr = endpoint.getAttributes()
        if transfer_type(ep_attr) == libusb.TRANSFER_TYPE_BULK:
            if endpoint_dir(ep_addr) == libusb.ENDPOINT_IN:
                if not self.usb_inp:
                    self.usb_inp = endpoint
            if endpoint_dir(ep_addr) == libusb.ENDPOINT_OUT:
                if not self.usb_out:
                    self.usb_out = endpoint

    if not (self.usb_inp and self.usb_out):
        log.error("no bulk endpoints for read and write")
        raise IOError(errno.ENODEV, os.strerror(errno.ENODEV))

    try:
        # workaround the PN533's buggy USB implementation
        self._manufacturer_name = dev.getManufacturer()
        self._product_name = dev.getProduct()
    except libusb.USBErrorIO:
        self._manufacturer_name = None
        self._product_name = None

    try:
        self.usb_dev = dev.open()
        self.usb_dev.claimInterface(0)
    except libusb.USBErrorAccess:
        raise IOError(errno.EACCES, os.strerror(errno.EACCES))
    except libusb.USBErrorBusy:
        raise IOError(errno.EBUSY, os.strerror(errno.EBUSY))
    except libusb.USBErrorNoDevice:
        raise IOError(errno.ENODEV, os.strerror(errno.ENODEV))
def play_sound_file(self, data, rate, ssize, nchannels):
    """Play *data* on the audio device and check how long it takes.

    Skips the test when the device cannot be opened because of EACCES,
    ENOENT, ENODEV or EBUSY. Otherwise it exercises the query methods,
    checks that the ``closed``, ``name`` and ``mode`` attributes are
    read-only, and asserts that writing the sample takes within 10% of
    the running time computed from *rate*, *ssize* and *nchannels*.
    """
    try:
        dsp = ossaudiodev.open('w')
    except IOError as msg:
        unusable = (errno.EACCES, errno.ENOENT,
                    errno.ENODEV, errno.EBUSY)
        if msg.args[0] not in unusable:
            raise
        raise unittest.SkipTest(msg)

    # at least check that these methods can be invoked
    for method_name in ('bufsize', 'obufcount', 'obuffree',
                        'getptr', 'fileno'):
        getattr(dsp, method_name)()

    # Make sure the read-only attributes work.
    self.assertFalse(dsp.closed)
    self.assertEqual(dsp.name, "/dev/dsp")
    self.assertEqual(dsp.mode, "w", "bad dsp.mode: %r" % dsp.mode)

    # And make sure they're really read-only.
    for attr in ('closed', 'name', 'mode'):
        try:
            setattr(dsp, attr, 42)
        except (TypeError, AttributeError):
            continue
        self.fail("dsp.%s not read-only" % attr)

    # Compute expected running time of sound sample (in seconds).
    expected_time = float(len(data)) / (ssize/8) / nchannels / rate

    # set parameters based on .au file headers
    dsp.setparameters(AFMT_S16_NE, nchannels, rate)
    self.assertTrue(abs(expected_time - 3.51) < 1e-2, expected_time)

    started = time.time()
    dsp.write(data)
    dsp.close()
    finished = time.time()
    elapsed_time = finished - started

    percent_diff = (abs(elapsed_time - expected_time) / expected_time) * 100
    self.assertTrue(percent_diff <= 10.0,
                    "elapsed time (%s) > 10%% off of expected time (%s)" %
                    (elapsed_time, expected_time))
def play_sound_file(self, data, rate, ssize, nchannels):
    """Play *data* on the audio device and check how long it takes.

    Skips the test when the device cannot be opened because of EACCES,
    ENOENT, ENODEV or EBUSY. Otherwise it exercises the query methods,
    checks that the ``closed``, ``name`` and ``mode`` attributes are
    read-only, and asserts that writing the sample takes within 10% of
    the running time computed from *rate*, *ssize* and *nchannels*.
    """
    try:
        dsp = ossaudiodev.open('w')
    except IOError as msg:
        unusable = (errno.EACCES, errno.ENOENT,
                    errno.ENODEV, errno.EBUSY)
        if msg.args[0] not in unusable:
            raise
        raise unittest.SkipTest(msg)

    # at least check that these methods can be invoked
    for method_name in ('bufsize', 'obufcount', 'obuffree',
                        'getptr', 'fileno'):
        getattr(dsp, method_name)()

    # Make sure the read-only attributes work.
    self.assertFalse(dsp.closed)
    self.assertEqual(dsp.name, "/dev/dsp")
    self.assertEqual(dsp.mode, "w", "bad dsp.mode: %r" % dsp.mode)

    # And make sure they're really read-only.
    for attr in ('closed', 'name', 'mode'):
        try:
            setattr(dsp, attr, 42)
        except TypeError:
            continue
        self.fail("dsp.%s not read-only" % attr)

    # Compute expected running time of sound sample (in seconds).
    expected_time = float(len(data)) / (ssize//8) / nchannels / rate

    # set parameters based on .au file headers
    dsp.setparameters(AFMT_S16_NE, nchannels, rate)
    self.assertTrue(abs(expected_time - 3.51) < 1e-2, expected_time)

    started = time.time()
    dsp.write(data)
    dsp.close()
    finished = time.time()
    elapsed_time = finished - started

    percent_diff = (abs(elapsed_time - expected_time) / expected_time) * 100
    self.assertTrue(percent_diff <= 10.0,
                    "elapsed time > 10% off of expected time")