Python fcntl 模块,ioctl() 实例源码

我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 fcntl.ioctl()。

项目:Adafruit_Python_PureIO    作者:adafruit    | 项目源码 | 文件源码
def read_word_data(self, addr, cmd):
        """Return the 16-bit word read from register ``cmd`` of device ``addr``.

        The two received bytes are stored directly in a ``c_uint16``, so the
        value is interpreted with the byte order of the processor running
        Python (typically little endian).
        """
        assert self._device is not None, 'Bus must be opened before operations are made against it!'
        # ctypes objects that the ioctl request points at.
        register = c_uint8(cmd)
        word = c_uint16()
        # Message 1 writes the register number; message 2 reads two bytes back.
        select_register = (addr, 0, 1, pointer(register))
        fetch_word = (addr, I2C_M_RD, 2, cast(pointer(word), POINTER(c_uint8)))
        transaction = make_i2c_rdwr_data([select_register, fetch_word])
        # Both messages go out through a single I2C_RDWR ioctl.
        ioctl(self._device.fileno(), I2C_RDWR, transaction)
        return word.value
项目:Adafruit_Python_PureIO    作者:adafruit    | 项目源码 | 文件源码
def process_call(self, addr, cmd, val):
        """Write a word to register ``cmd`` of ``addr`` and return the word read back.

        This is the SMBus "process call": the register byte and a 2-byte
        value are written, then a 2-byte response is read and returned.
        """
        assert self._device is not None, 'Bus must be opened before operations are made against it!'
        # Register byte followed by the 16-bit value, packed without padding.
        payload = create_string_buffer(struct.pack('=BH', cmd, val))
        reply = c_uint16()
        # Message 1 writes the three payload bytes; message 2 reads the reply.
        write_msg = (addr, 0, 3, cast(pointer(payload), POINTER(c_uint8)))
        read_msg = (addr, I2C_M_RD, 2, cast(pointer(reply), POINTER(c_uint8)))
        transaction = make_i2c_rdwr_data([write_msg, read_msg])
        ioctl(self._device.fileno(), I2C_RDWR, transaction)
        # python-smbus appears not to return the response; this version does.
        return reply.value
项目:Starfish    作者:BillWang139967    | 项目源码 | 文件源码
def get_iphostname():
    """Return the host name and an IPv4 address of this Linux machine.

    The address is queried on interface ``eth0`` first and on ``eno1`` when
    that lookup fails with ``IOError``.

    :return: dict with the keys ``'hostname'`` and ``'ip'``.
    :raises IOError: if the lookup fails on ``eno1`` as well.
    """
    def get_ip(ifname):
        # struct.pack('256s', ...) only accepts bytes on Python 3.
        if not isinstance(ifname, bytes):
            ifname = ifname.encode('utf-8')
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            ifreq = fcntl.ioctl(
                sock.fileno(),
                0x8915,  # SIOCGIFADDR
                struct.pack('256s', ifname[:15]),
            )
        finally:
            # Close the socket even when the ioctl fails.
            sock.close()
        # The address is taken from bytes 20..23 of the returned buffer.
        return socket.inet_ntoa(ifreq[20:24])
    try:
        ip = get_ip('eth0')
    except IOError:
        ip = get_ip('eno1')
    hostname = socket.gethostname()
    return {'hostname': hostname, 'ip': ip}
项目:Adafruit_Python_PureIO    作者:adafruit    | 项目源码 | 文件源码
def read_i2c_block_data(self, addr, cmd, length=32):
        """Read ``length`` bytes (32 by default) from register ``cmd`` of ``addr``.

        The bytes read are returned as a bytearray.
        """
        assert self._device is not None, 'Bus must be opened before operations are made against it!'
        # ctypes objects that the ioctl request points at.
        register = c_uint8(cmd)
        received = create_string_buffer(length)
        # Message 1 writes the register number; message 2 reads the data.
        write_msg = (addr, 0, 1, pointer(register))
        read_msg = (addr, I2C_M_RD, length, cast(received, POINTER(c_uint8)))
        transaction = make_i2c_rdwr_data([write_msg, read_msg])
        ioctl(self._device.fileno(), I2C_RDWR, transaction)
        # .raw keeps embedded NUL bytes; .value would stop at the first one.
        return bytearray(received.raw)
项目:Starfish    作者:BillWang139967    | 项目源码 | 文件源码
def get_iphostname():
    """Return the host name and an IPv4 address of this Linux machine.

    The address is queried on interface ``eth0`` first and on ``eno1`` when
    that lookup fails with ``IOError``.

    :return: dict with the keys ``'hostname'`` and ``'ip'``.
    :raises IOError: if the lookup fails on ``eno1`` as well.
    """
    def get_ip(ifname):
        # struct.pack('256s', ...) only accepts bytes on Python 3.
        if not isinstance(ifname, bytes):
            ifname = ifname.encode('utf-8')
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            ifreq = fcntl.ioctl(
                sock.fileno(),
                0x8915,  # SIOCGIFADDR
                struct.pack('256s', ifname[:15]),
            )
        finally:
            # Close the socket even when the ioctl fails.
            sock.close()
        # The address is taken from bytes 20..23 of the returned buffer.
        return socket.inet_ntoa(ifreq[20:24])
    try:
        ip = get_ip('eth0')
    except IOError:
        ip = get_ip('eno1')
    hostname = socket.gethostname()
    return {'hostname': hostname, 'ip': ip}
项目:pocket-cli    作者:rakanalh    | 项目源码 | 文件源码
def get_terminal_size():
        """Return the terminal size as a ``(columns, lines)`` tuple of ints.

        The TIOCGWINSZ ioctl is tried on file descriptors 0, 1 and 2, then on
        the controlling terminal.  If none of them answers, the LINES and
        COLUMNS environment variables are used, and finally 80 columns by
        25 lines.
        """
        def ioctl_GWINSZ(fd):
            # Returns the two shorts reported for fd, or None on any failure.
            try:
                import fcntl
                import termios
                import struct
                cr = struct.unpack('hh', fcntl.ioctl(fd, termios.TIOCGWINSZ,
                                                     '1234'))
            except Exception:
                return None
            return cr
        cr = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2)
        if not cr:
            try:
                fd = os.open(os.ctermid(), os.O_RDONLY)
                try:
                    cr = ioctl_GWINSZ(fd)
                finally:
                    # Always release the descriptor opened above.
                    os.close(fd)
            except Exception:
                pass
        if not cr:
            try:
                # Environment fallback; missing or malformed values use 25x80.
                cr = (int(os.environ['LINES']), int(os.environ['COLUMNS']))
            except (KeyError, ValueError):
                cr = (25, 80)
        return int(cr[1]), int(cr[0])
项目:kAFL    作者:RUB-SysSec    | 项目源码 | 文件源码
def check_vmx_pt():
    """Check through /dev/kvm whether the KVM_VMX_PT_SUPPORTED ioctl succeeds.

    An error message is printed for whichever check fails.

    :return: True when the ioctl succeeds with a non-zero result, False when
        /dev/kvm cannot be opened, the ioctl fails, or it returns 0.
    """
    from fcntl import ioctl

    KVMIO = 0xAE
    KVM_VMX_PT_SUPPORTED = KVMIO << (8) | 0xe4

    try:
        fd = open("/dev/kvm", "wb")
    except (IOError, OSError):
        print('\033[91m' + error_prefix + "KVM is not loaded!" + '\033[0m')
        return False

    try:
        ret = ioctl(fd, KVM_VMX_PT_SUPPORTED, 0)
    except IOError:
        print('\033[91m' + error_prefix + "VMX_PT is not loaded!" + '\033[0m')
        return False
    finally:
        # Release the device handle on the error path as well.
        fd.close()

    if ret == 0:
        print('\033[91m' + error_prefix + "Intel PT is not supported on this CPU!" + '\033[0m')
        return False

    return True
项目:shadowsocksR-b    作者:hao35954514    | 项目源码 | 文件源码
def getipaddr(self, ifname='eth0'):
        """Return an IPv4 address for this host, defaulting to ``'127.0.0.1'``.

        The host name is resolved first.  If that fails or yields
        ``'127.0.0.1'``, the address of interface ``ifname`` is queried with
        the SIOCGIFADDR ioctl.  Every failure is swallowed, so the loopback
        address is returned when both lookups fail.

        :param ifname: interface name used for the ioctl fallback.
        """
        import socket
        import struct
        ret = '127.0.0.1'
        try:
            ret = socket.gethostbyname(socket.getfqdn(socket.gethostname()))
        except Exception:
            pass
        if ret == '127.0.0.1':
            try:
                import fcntl
                # struct.pack('256s', ...) only accepts bytes on Python 3.
                name = ifname if isinstance(ifname, bytes) else ifname.encode('utf-8')
                s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
                try:
                    ret = socket.inet_ntoa(fcntl.ioctl(s.fileno(), 0x8915, struct.pack('256s', name[:15]))[20:24])
                finally:
                    # Close the query socket whether or not the ioctl worked.
                    s.close()
            except Exception:
                pass
        return ret
项目:zabbix_manager    作者:BillWang139967    | 项目源码 | 文件源码
def terminal_size():
    """Return the terminal's width and height.

    http://code.activestate.com/recipes/440694-determine-size-of-console-window-on-windows/
    http://stackoverflow.com/questions/17993814/why-the-irrelevant-code-made-a-difference

    :return: Width (number of characters) and height (number of lines) of the terminal.
    :rtype: tuple
    """
    if not hasattr(ctypes, 'windll'):
        # No windll: query file descriptor 0 with the TIOCGWINSZ ioctl.
        try:
            packed = fcntl.ioctl(0, termios.TIOCGWINSZ, '\0' * 8)
        except IOError:
            return DEFAULT_WIDTH, DEFAULT_HEIGHT
        rows, cols = struct.unpack('hhhh', packed)[:2]
        return cols, rows

    # Only works on Microsoft Windows platforms.
    kernel32 = ctypes.windll.kernel32
    info = ctypes.create_string_buffer(22)  # Filled in by GetConsoleScreenBufferInfo.
    kernel32.GetConsoleScreenBufferInfo(kernel32.GetStdHandle(-11), info)
    left, top, right, bottom = struct.unpack('hhhhHhhhhhh', info.raw)[5:-2]
    width = right - left
    height = bottom - top
    if width < 1 or height < 1:
        return DEFAULT_WIDTH, DEFAULT_HEIGHT
    return width, height
项目:WeatherPi    作者:LoveBootCaptain    | 项目源码 | 文件源码
def read_word_data(self, addr, cmd):
        """Return the 16-bit word read from register ``cmd`` of device ``addr``.

        The two received bytes are stored directly in a ``c_uint16``, so the
        value is interpreted with the byte order of the processor running
        Python (typically little endian).
        """
        assert self._device is not None, 'Bus must be opened before operations are made against it!'
        # ctypes objects that the ioctl request points at.
        register = c_uint8(cmd)
        word = c_uint16()
        # Message 1 writes the register number; message 2 reads two bytes back.
        select_register = (addr, 0, 1, pointer(register))
        fetch_word = (addr, I2C_M_RD, 2, cast(pointer(word), POINTER(c_uint8)))
        transaction = make_i2c_rdwr_data([select_register, fetch_word])
        # Both messages go out through a single I2C_RDWR ioctl.
        ioctl(self._device.fileno(), I2C_RDWR, transaction)
        return word.value
项目:WeatherPi    作者:LoveBootCaptain    | 项目源码 | 文件源码
def read_i2c_block_data(self, addr, cmd, length=32):
        """Read ``length`` bytes (32 by default) from register ``cmd`` of ``addr``.

        The bytes read are returned as a bytearray.
        """
        assert self._device is not None, 'Bus must be opened before operations are made against it!'
        # ctypes objects that the ioctl request points at.
        register = c_uint8(cmd)
        received = create_string_buffer(length)
        # Message 1 writes the register number; message 2 reads the data.
        write_msg = (addr, 0, 1, pointer(register))
        read_msg = (addr, I2C_M_RD, length, cast(received, POINTER(c_uint8)))
        transaction = make_i2c_rdwr_data([write_msg, read_msg])
        ioctl(self._device.fileno(), I2C_RDWR, transaction)
        # .raw keeps embedded NUL bytes; .value would stop at the first one.
        return bytearray(received.raw)
项目:WeatherPi    作者:LoveBootCaptain    | 项目源码 | 文件源码
def process_call(self, addr, cmd, val):
        """Write a word to register ``cmd`` of ``addr`` and return the word read back.

        This is the SMBus "process call": the register byte and a 2-byte
        value are written, then a 2-byte response is read and returned.
        """
        assert self._device is not None, 'Bus must be opened before operations are made against it!'
        # Register byte followed by the 16-bit value, packed without padding.
        payload = create_string_buffer(struct.pack('=BH', cmd, val))
        reply = c_uint16()
        # Message 1 writes the three payload bytes; message 2 reads the reply.
        write_msg = (addr, 0, 3, cast(pointer(payload), POINTER(c_uint8)))
        read_msg = (addr, I2C_M_RD, 2, cast(pointer(reply), POINTER(c_uint8)))
        transaction = make_i2c_rdwr_data([write_msg, read_msg])
        ioctl(self._device.fileno(), I2C_RDWR, transaction)
        # python-smbus appears not to return the response; this version does.
        return reply.value
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def _printProgessBar(self, f, startTime):
        """Write a one-line progress report for transfer ``f`` to the transport.

        The line starts with a carriage return and the file name; the
        percentage, transferred size, rate and remaining time are pushed to
        the right edge using the terminal width (80 columns when the width
        cannot be queried).

        :param f: object exposing ``name``, ``size`` and ``total``.
        :param startTime: ``time.time()`` value the elapsed time is measured from.
        """
        diff = time.time() - startTime
        total = f.total
        try:
            winSize = struct.unpack('4H',
                fcntl.ioctl(0, tty.TIOCGWINSZ, '12345679'))
        except IOError:
            winSize = [None, 80]
        # No time elapsed yet: report a zero rate instead of dividing by zero.
        speed = total / diff if diff else 0
        if speed:
            timeLeft = (f.size - total) / speed
        else:
            timeLeft = 0
        # A zero-sized file is reported as complete rather than dividing by 0.
        percent = (total / f.size) * 100 if f.size else 100
        front = f.name
        back = '%3i%% %s %sps %s ' % (percent, self._abbrevSize(total),
                self._abbrevSize(speed), self._abbrevTime(timeLeft))
        spaces = (winSize[1] - (len(front) + len(back) + 1)) * ' '
        self.transport.write('\r%s%s%s' % (front, spaces, back))
项目:enigma2    作者:OpenLD    | 项目源码 | 文件源码
def getFPVersion():
    """Return the front-processor version, or None when no source is readable.

    Sources are tried in order: a ``/proc/stb`` entry (which one depends on
    the brand and box type), an ioctl on ``/dev/dbox/fp0``, and finally the
    device-tree ``bolt/tag`` file.  The type of the result depends on the
    source that answered (stripped line, raw file text, integer, or the
    ioctl's return value).
    """
    ret = None
    try:
        if getBrandOEM() == "blackbox":
            with open("/proc/stb/info/micomver", "r") as micom:
                ret = micom.readline().strip()
        elif getBoxType() in ('dm7080','dm820','dm520','dm525','dm900'):
            with open("/proc/stb/fp/version", "r") as version:
                ret = version.read()
        else:
            with open("/proc/stb/fp/version", "r") as version:
                ret = long(version.read())
    except IOError:
        try:
            # The with-block closes the device handle once the ioctl is done.
            with open("/dev/dbox/fp0") as fp:
                ret = ioctl(fp.fileno(),0)
        except IOError:
            try:
                with open("/sys/firmware/devicetree/base/bolt/tag", "r") as tag:
                    ret = tag.read().rstrip("\0")
            except Exception:
                print("getFPVersion failed!")
    return ret
项目:pydrm    作者:notro    | 项目源码 | 文件源码
def get(self):
        """Return the value of the property whose id equals ``self.id``.

        DRM_IOCTL_MODE_OBJ_GETPROPERTIES is issued twice: the first call
        fills in ``count_props``, the second runs with id and value arrays
        of that length attached.  Raises IndexError if no id matches.
        """
        request = DrmModeObjGetPropertiesC()
        request.obj_id = self.obj_id
        request.obj_type = self.obj_type

        # First call: only count_props is used from the result.
        fcntl.ioctl(self._drm.fd, DRM_IOCTL_MODE_OBJ_GETPROPERTIES, request)

        # Attach buffers sized from the reported count.
        ids = (ctypes.c_uint32*request.count_props)()
        request.props_ptr = ctypes.cast(ctypes.pointer(ids), ctypes.c_void_p).value

        values = (ctypes.c_uint64*request.count_props)()
        request.prop_values_ptr = ctypes.cast(ctypes.pointer(values), ctypes.c_void_p).value

        fcntl.ioctl(self._drm.fd, DRM_IOCTL_MODE_OBJ_GETPROPERTIES, request)
        self._arg = request

        matches = []
        for prop_id, value in zip(ids, values):
            if prop_id == self.id:
                matches.append(value)

        return matches[0]
项目:pydrm    作者:notro    | 项目源码 | 文件源码
def fetch(self):
        """Reload this object's attributes from a DRM_IOCTL_MODE_GETCRTC call.

        Sets ``fb``, ``x``, ``y``, ``gamma_size``, ``mode_valid``, ``mode``,
        ``width`` and ``height``; without a valid mode, ``mode`` is None and
        the size is 0x0.
        """
        crtc = DrmModeCrtcC()
        crtc.crtc_id = self.id

        fcntl.ioctl(self._drm.fd, DRM_IOCTL_MODE_GETCRTC, crtc)

        self._arg = crtc
        # A zero fb_id is treated as "no framebuffer".
        self.fb = self._drm.get_framebuffer(crtc.fb_id) if crtc.fb_id else None
        self.x = int(crtc.x)
        self.y = int(crtc.y)
        self.gamma_size = int(crtc.gamma_size)
        self.mode_valid = bool(crtc.mode_valid)
        if not self.mode_valid:
            self.mode = None
            self.width = 0
            self.height = 0
            return
        self.mode = DrmMode(crtc.mode)
        self.width = self.mode.hdisplay
        self.height = self.mode.vdisplay
项目:pydrm    作者:notro    | 项目源码 | 文件源码
def set(self, fb, x, y, mode, *conns):
        """Issue DRM_IOCTL_MODE_SETCRTC for this object, then call ``fetch()``.

        :param fb: object whose ``id`` is used as the framebuffer id.
        :param x: value stored in the request's ``x`` field.
        :param y: value stored in the request's ``y`` field.
        :param mode: object whose ``_arg`` is used as the mode structure.
        :param conns: connectors whose ``id`` values are passed along.
        """
        request = DrmModeCrtcC()
        request.crtc_id = self.id
        request.fb_id = fb.id
        request.x = x
        request.y = y
        request.mode_valid = 1
        request.mode = mode._arg

        # Connector ids are handed over as a C array referenced by address.
        ids = [conn.id for conn in conns]
        id_array = (ctypes.c_uint32 * len(conns))(*ids)
        request.set_connectors_ptr = ctypes.cast(ctypes.pointer(id_array), ctypes.c_void_p).value
        request.count_connectors = len(conns)

        fcntl.ioctl(self._drm.fd, DRM_IOCTL_MODE_SETCRTC, request)

        self.fetch()
项目:pydrm    作者:notro    | 项目源码 | 文件源码
def __init__(self, drm):
        """Query DRM_IOCTL_VERSION on ``drm.fd`` and store the result.

        Sets ``major``, ``minor`` and ``patchlevel`` as ints and ``name``,
        ``date`` and ``desc`` as strings cut to the lengths left in the
        request after the ioctl.
        """
        self._drm = drm
        text_fields = ('name', 'date', 'desc')
        version = DrmVersionC()
        # Each text field gets a 256-byte buffer and a declared length of 255.
        for field in text_fields:
            setattr(version, field, create_string_buffer(256))
            setattr(version, field + '_len', 255)

        fcntl.ioctl(self._drm.fd, DRM_IOCTL_VERSION, version)

        self._arg = version
        self.major = int(version.major)
        self.minor = int(version.minor)
        self.patchlevel = int(version.patchlevel)
        for field in text_fields:
            text = getattr(version, field)
            setattr(self, field, str(text[:getattr(version, field + '_len')]))
项目:pydrm    作者:notro    | 项目源码 | 文件源码
def prime_import(cls, drm, fd, format_, width, height):
        """Build an instance of ``cls`` around the handle obtained for ``fd``.

        DRM_IOCTL_PRIME_FD_TO_HANDLE is issued for ``fd``; the returned
        handle becomes the instance ``id``.  Pitch and length are derived
        from ``width``, ``height`` and the format's first ``cpp`` entry.

        :raises NotImplementedError: when the format has more than one plane.
        """
        prime = DrmPrimeHandleC()
        prime.fd = fd
        fcntl.ioctl(drm.fd, DRM_IOCTL_PRIME_FD_TO_HANDLE, prime)

        buf = cls(drm, format_, width, height)
        buf._arg = prime
        buf.fd = fd
        buf.id = int(prime.handle)
        buf.pitch = buf.format.cpp[0] * width
        buf.len = height * buf.pitch

        # Only plane 0 is filled in.
        buf.handles[0] = buf.id
        buf.pitches[0] = buf.pitch
        buf.offsets[0] = 0

        if buf.format.planes > 1:
            raise NotImplementedError("support for format %s is not implemented yet\n" % buf.format.name)

        return buf
项目:pydrm    作者:notro    | 项目源码 | 文件源码
def __init__(self, drm, format_, width, height):
        """Create a buffer with DRM_IOCTL_MODE_CREATE_DUMB and record the result.

        The request uses this object's width and height and a bits-per-pixel
        value of eight times the format's first ``cpp`` entry.  The handle,
        size and pitch written back by the ioctl are stored as ``id``,
        ``len`` and ``pitch``.

        :raises NotImplementedError: when the format has more than one plane.
        """
        super(DrmDumbBuffer, self).__init__(drm, format_, width, height)

        request = DrmModeCreateDumbC()
        request.bpp = self.format.cpp[0] * 8
        request.width = self.width
        request.height = self.height

        fcntl.ioctl(self._drm.fd, DRM_IOCTL_MODE_CREATE_DUMB, request)

        self.id = int(request.handle)
        self.len = int(request.size)
        self.pitch = int(request.pitch)
        self._arg = request

        # Only plane 0 is filled in.
        self.handles[0] = self.id
        self.pitches[0] = self.pitch
        self.offsets[0] = 0

        if self.format.planes > 1:
            raise NotImplementedError("support for format %s is not implemented yet\n" % self.format.name)
项目:smbus2    作者:kplindegaard    | 项目源码 | 文件源码
def read_i2c_block_data(self, i2c_addr, register, length):
        # type: (int, int, int) -> list
        """
        Read ``length`` bytes starting at ``register`` of device ``i2c_addr``.

        :rtype: list
        :param i2c_addr: i2c address
        :param register: Start register
        :param length: Desired block length
        :return: List of bytes
        :raises ValueError: if ``length`` exceeds I2C_SMBUS_BLOCK_MAX
        """
        if length > I2C_SMBUS_BLOCK_MAX:
            raise ValueError("Desired block length over %d bytes" % I2C_SMBUS_BLOCK_MAX)
        self._set_address(i2c_addr)
        request = i2c_smbus_ioctl_data.create(
            read_write=I2C_SMBUS_READ,
            command=register,
            size=I2C_SMBUS_I2C_BLOCK_DATA,
        )
        # The requested length is placed in the data union before the call.
        request.data.contents.byte = length
        ioctl(self.fd, I2C_SMBUS, request)
        # Entry 0 of the block is skipped; the payload follows it.
        first, last = 1, length + 1
        return request.data.contents.block[first:last]
项目:smbus2    作者:kplindegaard    | 项目源码 | 文件源码
def write_i2c_block_data(self, i2c_addr, register, data):
        # type: (int, int, list) -> None
        """
        Write the bytes in ``data`` starting at ``register`` of device ``i2c_addr``.

        :param i2c_addr: i2c address
        :param register: Start register
        :param data: List of bytes
        :raises ValueError: if ``data`` is longer than I2C_SMBUS_BLOCK_MAX
        """
        count = len(data)
        if count > I2C_SMBUS_BLOCK_MAX:
            raise ValueError("Data length cannot exceed %d bytes" % I2C_SMBUS_BLOCK_MAX)
        self._set_address(i2c_addr)
        request = i2c_smbus_ioctl_data.create(
            read_write=I2C_SMBUS_WRITE,
            command=register,
            size=I2C_SMBUS_I2C_BLOCK_DATA,
        )
        # Entry 0 carries the byte count; the payload follows it.
        request.data.contents.block[0] = count
        request.data.contents.block[1:count + 1] = data
        ioctl(self.fd, I2C_SMBUS, request)
项目:android3dblendermouse    作者:sketchpunk    | 项目源码 | 文件源码
def _set_rs485_mode(self, rs485_settings):
            """Apply ``rs485_settings`` to the port, or clear the flags when it is None.

            The current settings are read with TIOCGRS485, modified in place
            and written back with TIOCSRS485.

            :raises ValueError: if either ioctl fails with IOError.
            """
            # Layout: flags, delaytx, delayrx, padding.
            rs485 = array.array('i', [0] * 8)
            try:
                fcntl.ioctl(self.fd, TIOCGRS485, rs485)
                if rs485_settings is None:
                    rs485[0] = 0  # clear SER_RS485_ENABLED
                else:
                    # Each setting switches exactly one flag bit on or off.
                    flag_switches = (
                        (rs485_settings.loopback, SER_RS485_RX_DURING_TX),
                        (rs485_settings.rts_level_for_tx, SER_RS485_RTS_ON_SEND),
                        (rs485_settings.rts_level_for_rx, SER_RS485_RTS_AFTER_SEND),
                    )
                    for wanted, bit in flag_switches:
                        if wanted:
                            rs485[0] |= bit
                        else:
                            rs485[0] &= ~bit
                    # Delays are multiplied by 1000 and truncated to int.
                    rs485[1] = int(rs485_settings.delay_rts_before_send * 1000)
                    rs485[2] = int(rs485_settings.delay_rts_after_send * 1000)
                fcntl.ioctl(self.fd, TIOCSRS485, rs485)
            except IOError as e:
                raise ValueError('Failed to set RS485 mode: %s' % (e,))
项目:patiencebar    作者:ceyzeriat    | 项目源码 | 文件源码
def _get_terminal_size():
    """Return the terminal size as the list ``[lines, columns]`` of ints.

    The TIOCGWINSZ ioctl is tried on file descriptors 0, 1 and 2, then on
    the controlling terminal.  If none of them answers, the LINES and
    COLUMNS environment variables are used, defaulting to 25 and 80.
    """
    def ioctl_GWINSZ(fd):
        # Returns the two shorts reported for fd, or None on any failure.
        try:
            import fcntl
            import struct
            import termios
            cr = struct.unpack('hh', fcntl.ioctl(fd, termios.TIOCGWINSZ,'1234'))
        except Exception:
            return
        return cr
    cr = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2)
    if not cr:
        try:
            fd = _os.open(_os.ctermid(), _os.O_RDONLY)
            try:
                cr = ioctl_GWINSZ(fd)
            finally:
                # Always release the descriptor opened above.
                _os.close(fd)
        except Exception:
            pass
    if not cr:
        cr = (_os.environ.get('LINES', 25), _os.environ.get('COLUMNS', 80))
    return list(map(int, cr))
项目:mouse    作者:boppreh    | 项目源码 | 文件源码
def make_uinput():
    """Open /dev/uinput, register a "Virtual Keyboard" device and return the file.

    The EV_KEY event type and key codes 0..255 are enabled before the
    device description is written and UI_DEV_CREATE is issued.  The device
    is not destroyed here; the open file object is returned to the caller.
    """
    import fcntl
    import struct

    UI_SET_EVBIT = 0x40045564
    UI_SET_KEYBIT = 0x40045565
    UI_DEV_CREATE = 0x5501
    BUS_USB = 0x03

    # Requires uinput driver, but it's usually available.
    device = open("/dev/uinput", 'wb')
    fcntl.ioctl(device, UI_SET_EVBIT, EV_KEY)
    for key_code in range(256):
        fcntl.ioctl(device, UI_SET_KEYBIT, key_code)

    # Description record: an 80-byte name, four unsigned shorts, one int and
    # four blocks of 64 ints, the latter all left at zero.
    layout = "80sHHHHi64i64i64i64i"
    zero_tables = [0] * 64 * 4
    description = struct.pack(layout, b"Virtual Keyboard", BUS_USB, 1, 1, 1, 0, *zero_tables)
    device.write(description)
    device.flush() # Without this you may get Errno 22: Invalid argument.

    fcntl.ioctl(device, UI_DEV_CREATE)

    return device
项目:PGPool    作者:sLoPPydrive    | 项目源码 | 文件源码
def _get_terminal_size_linux():
    """Return ``(columns, lines)`` of the terminal, or None if it cannot be found.

    Sources, in order: the TIOCGWINSZ ioctl on file descriptors 0, 1 and 2,
    the same ioctl on the controlling terminal, and the LINES and COLUMNS
    environment variables.
    """
    def ioctl_GWINSZ(fd):
        # Returns the two shorts reported for fd, or None on any failure.
        try:
            import fcntl
            import termios
            packed = fcntl.ioctl(fd, termios.TIOCGWINSZ, '1234')
            return struct.unpack('hh', packed)
        except:
            return None

    size = None
    for std_fd in (0, 1, 2):
        size = ioctl_GWINSZ(std_fd)
        if size:
            break
    if not size:
        try:
            tty_fd = os.open(os.ctermid(), os.O_RDONLY)
            size = ioctl_GWINSZ(tty_fd)
            os.close(tty_fd)
        except:
            pass
    if not size:
        try:
            size = (os.environ['LINES'], os.environ['COLUMNS'])
        except:
            return None
    return int(size[1]), int(size[0])
项目:pyrepl    作者:dajose    | 项目源码 | 文件源码
def getpending(self):
            """Return one 'key' Event combining queued events and unread input.

            The ``data`` and ``raw`` fields of every event in
            ``self.event_queue`` are concatenated first.  Then the number of
            bytes reported by FIONREAD is read from ``self.input_fd``,
            decoded with ``self.encoding`` and appended to both fields.
            """
            e = Event('key', '', '')

            while not self.event_queue.empty():
                e2 = self.event_queue.get()
                e.data += e2.data
                # Append the queued event's raw text; appending e.raw to
                # itself would drop it.
                e.raw += e2.raw

            amount = struct.unpack(
                "i", ioctl(self.input_fd, FIONREAD, "\0\0\0\0"))[0]
            data = os.read(self.input_fd, amount)
            raw = unicode(data, self.encoding, 'replace')
            #XXX: something is wrong here
            e.data += raw
            e.raw += raw
            return e
项目:office-interoperability-tools    作者:milossramek    | 项目源码 | 文件源码
def GetHelpWidth():
  """Returns: an integer, the width of help lines that is used in TextWrap."""
  # Without a tty on stdout, or without termios/fcntl, use the default width.
  if not (sys.stdout.isatty() and termios is not None and fcntl is not None):
    return _help_width
  try:
    winsize = fcntl.ioctl(sys.stdout, termios.TIOCGWINSZ, '1234')
    _, columns = struct.unpack('hh', winsize)
    # Emacs mode returns 0.
    # Any value below 40 is treated as unreasonable; fall back to COLUMNS.
    if columns < 40:
      # int() leaves the integer default unchanged.
      return int(os.getenv('COLUMNS', _help_width))
    return columns

  except (TypeError, IOError, struct.error):
    return _help_width
项目:microperi    作者:c0d3st0rm    | 项目源码 | 文件源码
def _set_rs485_mode(self, rs485_settings):
            """Apply ``rs485_settings`` to the port, or clear the flags when it is None.

            The current settings are read with TIOCGRS485, modified in place
            and written back with TIOCSRS485.

            :raises ValueError: if either ioctl fails with IOError.
            """
            # Layout: flags, delaytx, delayrx, padding.
            rs485 = array.array('i', [0] * 8)
            try:
                fcntl.ioctl(self.fd, TIOCGRS485, rs485)
                if rs485_settings is None:
                    rs485[0] = 0  # clear SER_RS485_ENABLED
                else:
                    # Each setting switches exactly one flag bit on or off.
                    flag_switches = (
                        (rs485_settings.loopback, SER_RS485_RX_DURING_TX),
                        (rs485_settings.rts_level_for_tx, SER_RS485_RTS_ON_SEND),
                        (rs485_settings.rts_level_for_rx, SER_RS485_RTS_AFTER_SEND),
                    )
                    for wanted, bit in flag_switches:
                        if wanted:
                            rs485[0] |= bit
                        else:
                            rs485[0] &= ~bit
                    # Delays are multiplied by 1000 and truncated to int.
                    rs485[1] = int(rs485_settings.delay_rts_before_send * 1000)
                    rs485[2] = int(rs485_settings.delay_rts_after_send * 1000)
                fcntl.ioctl(self.fd, TIOCSRS485, rs485)
            except IOError as e:
                raise ValueError('Failed to set RS485 mode: %s' % (e,))
项目:style50    作者:cs50    | 项目源码 | 文件源码
def get_terminal_size(fallback=(80, 24)):
    """
    Return a ``(columns, lines)`` tuple for the terminal, or ``fallback``.

    The original stdout, stderr and stdin streams are probed in that order
    and the first one that answers the window-size ioctl wins.  The three
    streams could in theory be different ttys and yield a misleading size,
    but the far more common case is simply that IO is piped.
    """
    measured = None
    for stream in (sys.__stdout__, sys.__stderr__, sys.__stdin__):
        try:
            # Make WINSIZE call to terminal
            raw = fcntl.ioctl(stream.fileno(), TIOCGWINSZ, b"\x00\x00\00\x00")
        except (IOError, OSError):
            continue
        # Unpack two shorts from ioctl call
        measured = struct.unpack("hh", raw)
        break

    if measured is None:
        columns, lines = fallback
    else:
        lines, columns = measured

    return columns, lines
项目:openwrt-scripts    作者:nijel    | 项目源码 | 文件源码
def readMetadata(self):
        """Read the device name and absolute-axis info via ioctl().

        Sets ``self.name`` to the returned name cut at the first NUL and
        ``self.absAxisInfo`` to a dict mapping each EV_ABS axis name to a
        dict with the keys value/min/max/fuzz/flat.  Axes whose ioctl fails
        with IOError are left out.
        """
        # Read the name
        raw_name = ioctl(self.fd, EVIOCGNAME_512, "\0"*512)
        self.name = raw_name[:raw_name.find("\0")]

        # Read info on each absolute axis
        absmap = Event.codeMaps['EV_ABS']
        abs_format = "iiiii"
        field_names = ( 'value', 'min', 'max', 'fuzz', 'flat' )
        blank = "\0" * struct.calcsize(abs_format)
        self.absAxisInfo = {}
        for name, number in absmap.nameMap.iteritems():
            try:
                packed = ioctl(self.fd, EVIOCGABS_512 + number, blank)
            except IOError:
                continue
            self.absAxisInfo[name] = dict(zip(field_names, struct.unpack(abs_format, packed)))
项目:microbit-gateway    作者:whaleygeek    | 项目源码 | 文件源码
def set_special_baudrate(port, baudrate):
        """Set a custom ``baudrate`` on ``port`` using TCGETS2/TCSETS2.

        :param port: object whose ``fd`` attribute is the descriptor to configure.
        :param baudrate: speed written to both speed slots of the buffer.
        :raises ValueError: if either ioctl fails with IOError.
        """
        # right size is 44 on x86_64, allow for some growth
        import array
        buf = array.array('i', [0] * 64)

        try:
            # read the current settings into buf
            FCNTL.ioctl(port.fd, TCGETS2, buf)
            # set custom speed
            # NOTE(review): indices 2, 9 and 10 presumably map to c_cflag,
            # c_ispeed and c_ospeed of struct termios2 -- confirm.
            buf[2] &= ~TERMIOS.CBAUD
            buf[2] |= BOTHER
            buf[9] = buf[10] = baudrate

            # write the modified settings back
            FCNTL.ioctl(port.fd, TCSETS2, buf)
        except IOError as e:
            raise ValueError('Failed to set custom baud rate (%s): %s' % (baudrate, e))
项目:rpi3-webiopi    作者:thortex    | 项目源码 | 文件源码
def xfer(self, txbuff=None):
        """Run one SPI transfer sending ``txbuff`` and return the received bytes.

        A receive buffer as long as ``txbuff`` is allocated, a transfer
        record is packed from both buffer addresses plus ``self.speed`` and
        ``self.bits``, and the record is passed to the SPI_IOC_MESSAGE ioctl.

        :return: bytearray with ``len(txbuff)`` received bytes.
        """
        length = len(txbuff)
        # Convert the payload to the byte-string type of the running Python.
        if PYTHON_MAJOR >= 3:
            payload = bytes(txbuff)
        else:
            payload = str(bytearray(txbuff))
        tx_buffer = ctypes.create_string_buffer(payload)
        rx_buffer = ctypes.create_string_buffer(length)

        # Field order: tx address, rx address, length, speed, delay,
        # bits per word, cs_change, pad.
        transfer = struct.pack(
            "QQLLHBBL",  #64 64 32 32 16 8 8 32 b = 32B
            ctypes.addressof(tx_buffer),
            ctypes.addressof(rx_buffer),
            length,
            self.speed,
            0,
            self.bits,
            0,
            0,
        )

        fcntl.ioctl(self.fd, SPI_IOC_MESSAGE(len(transfer)), transfer)
        received = ctypes.string_at(rx_buffer, length)
        return bytearray(received)
项目:Adafruit_Python_PureIO    作者:adafruit    | 项目源码 | 文件源码
def _select_device(self, addr):
        """Select the I2C device that later bus operations will talk to.

        Only the low 7 bits of ``addr`` are passed to the I2C_SLAVE ioctl.
        """
        seven_bit_addr = addr & 0x7F
        ioctl(self._device.fileno(), I2C_SLAVE, seven_bit_addr)
项目:Adafruit_Python_PureIO    作者:adafruit    | 项目源码 | 文件源码
def read_byte_data(self, addr, cmd):
        """Return the single byte read from register ``cmd`` of device ``addr``."""
        assert self._device is not None, 'Bus must be opened before operations are made against it!'
        # ctypes objects that the ioctl request points at.
        register = c_uint8(cmd)
        value = c_uint8()
        # Message 1 writes the register number; message 2 reads one byte back.
        select_register = (addr, 0, 1, pointer(register))
        fetch_byte = (addr, I2C_M_RD, 1, pointer(value))
        transaction = make_i2c_rdwr_data([select_register, fetch_byte])
        # Both messages go out through a single I2C_RDWR ioctl.
        ioctl(self._device.fileno(), I2C_RDWR, transaction)
        return value.value
项目:Adafruit_Python_PureIO    作者:adafruit    | 项目源码 | 文件源码
def read_block_data(self, addr, cmd):
        """Block read from register ``cmd`` of the device -- not implemented.

        In a block read the device's first response byte determines how much
        data follows, and the result would be returned as a bytearray.
        Always raises NotImplementedError.
        """
        # TODO: The length is only known once the device starts responding,
        # so a fixed-size I2C_RDWR ioctl won't work; a proper read_block_data
        # will need the low level I2C access ioctl instead.
        raise NotImplementedError
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def get_terminal_size():
    """Return the terminal size in characters as a tuple ``(width, height)``.

    Sources, in order: the TIOCGWINSZ ioctl on file descriptors 0, 1 and 2,
    the same ioctl on the controlling terminal, and the LINES and COLUMNS
    environment variables (defaulting to 25 and 80).
    """
    def ioctl_GWINSZ(fd):
        # Returns the two shorts reported for fd; None on failure or (0, 0).
        try:
            import fcntl
            import termios
            import struct
            packed = fcntl.ioctl(fd, termios.TIOCGWINSZ, '1234')
            reported = struct.unpack('hh', packed)
        except:
            return None
        return None if reported == (0, 0) else reported

    size = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2)
    if not size:
        try:
            tty_fd = os.open(os.ctermid(), os.O_RDONLY)
            size = ioctl_GWINSZ(tty_fd)
            os.close(tty_fd)
        except:
            pass
    if not size:
        env = os.environ
        size = (env.get('LINES', 25), env.get('COLUMNS', 80))
    return int(size[1]), int(size[0])
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def get_terminal_size():
    """Return the terminal size in characters as a tuple ``(width, height)``.

    Sources, in order: the TIOCGWINSZ ioctl on file descriptors 0, 1 and 2,
    the same ioctl on the controlling terminal, and the LINES and COLUMNS
    environment variables (defaulting to 25 and 80).
    """
    def ioctl_GWINSZ(fd):
        # Returns the two shorts reported for fd; None on failure or (0, 0).
        try:
            import fcntl
            import termios
            import struct
            packed = fcntl.ioctl(fd, termios.TIOCGWINSZ, '1234')
            reported = struct.unpack('hh', packed)
        except:
            return None
        return None if reported == (0, 0) else reported

    size = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2)
    if not size:
        try:
            tty_fd = os.open(os.ctermid(), os.O_RDONLY)
            size = ioctl_GWINSZ(tty_fd)
            os.close(tty_fd)
        except:
            pass
    if not size:
        env = os.environ
        size = (env.get('LINES', 25), env.get('COLUMNS', 80))
    return int(size[1]), int(size[0])
项目:python-rst2ansi    作者:Snaipe    | 项目源码 | 文件源码
def _get_terminal_size(fd):
        """Return ``terminal_size(columns, lines)`` for the terminal on ``fd``.

        :raises OSError: wrapping the IOError when the TIOCGWINSZ ioctl fails.
        """
        try:
            winsize = fcntl.ioctl(fd, termios.TIOCGWINSZ, b"\x00" * 4)
        except IOError as e:
            raise OSError(e)
        else:
            # The reply holds two shorts: lines first, then columns.
            rows, cols = struct.unpack("hh", winsize)

        return terminal_size(cols, rows)
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def get_terminal_size():
    """Return the terminal size in characters as a tuple ``(width, height)``.

    Sources, in order: the TIOCGWINSZ ioctl on file descriptors 0, 1 and 2,
    the same ioctl on the controlling terminal, and the LINES and COLUMNS
    environment variables (defaulting to 25 and 80).
    """
    def ioctl_GWINSZ(fd):
        # Returns the two shorts reported for fd; None on failure or (0, 0).
        try:
            import fcntl
            import termios
            import struct
            packed = fcntl.ioctl(fd, termios.TIOCGWINSZ, '1234')
            reported = struct.unpack('hh', packed)
        except:
            return None
        return None if reported == (0, 0) else reported

    size = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2)
    if not size:
        try:
            tty_fd = os.open(os.ctermid(), os.O_RDONLY)
            size = ioctl_GWINSZ(tty_fd)
            os.close(tty_fd)
        except:
            pass
    if not size:
        env = os.environ
        size = (env.get('LINES', 25), env.get('COLUMNS', 80))
    return int(size[1]), int(size[0])
项目:pip-update-requirements    作者:alanhamlett    | 项目源码 | 文件源码
def get_terminal_size():
    """Return the terminal size in characters as a tuple ``(width, height)``.

    Sources, in order: the TIOCGWINSZ ioctl on file descriptors 0, 1 and 2,
    the same ioctl on the controlling terminal, and the LINES and COLUMNS
    environment variables (defaulting to 25 and 80).
    """
    def ioctl_GWINSZ(fd):
        # Returns the two shorts reported for fd; None on failure or (0, 0).
        try:
            import fcntl
            import termios
            import struct
            packed = fcntl.ioctl(fd, termios.TIOCGWINSZ, '1234')
            reported = struct.unpack('hh', packed)
        except:
            return None
        return None if reported == (0, 0) else reported

    size = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2)
    if not size:
        try:
            tty_fd = os.open(os.ctermid(), os.O_RDONLY)
            size = ioctl_GWINSZ(tty_fd)
            os.close(tty_fd)
        except:
            pass
    if not size:
        env = os.environ
        size = (env.get('LINES', 25), env.get('COLUMNS', 80))
    return int(size[1]), int(size[0])
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def ioctl(self, fd, ioctlno):
    """Run ioctl ``ioctlno`` on ``fd`` with ``self._buffer`` as its argument.

    ``self.pack()`` is called before the ioctl and ``self.unpack()`` after
    it; the buffer is passed with the mutate flag set to True.

    :return: the value returned by ``fcntl.ioctl``.
    """
    self.pack()
    result = fcntl.ioctl(fd, ioctlno, self._buffer, True)
    self.unpack()
    return result
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def get_version(self, fd):
    """Issue the HIDIOCGVERSION ioctl on ``fd`` and return its result."""
    return self.ioctl(fd, HIDIOCGVERSION)
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def get_flags(self, fd):
    """Issue the HIDIOCGFLAG ioctl on ``fd`` and return its result."""
    return self.ioctl(fd, HIDIOCGFLAG)
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def set_flags(self, fd):
    """Issue the HIDIOCSFLAG ioctl on ``fd`` and return its result."""
    return self.ioctl(fd, HIDIOCSFLAG)
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def get(self, fd):
    """Issue the HIDIOCGDEVINFO ioctl on ``fd`` and return its result."""
    return self.ioctl(fd, HIDIOCGDEVINFO)
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def get_info(self, fd):
    """Issue the HIDIOCGREPORTINFO ioctl on ``fd`` and return its result."""
    return self.ioctl(fd, HIDIOCGREPORTINFO)
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def get_info(self, fd):
    """Issue the HIDIOCGFIELDINFO ioctl on ``fd`` and return its result."""
    return self.ioctl(fd, HIDIOCGFIELDINFO)
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def get_info(self, fd, index):
    """Store ``index`` on the object, then issue HIDIOCGCOLLECTIONINFO on ``fd``.

    :return: the result of ``self.ioctl``.
    """
    self.index = index
    result = self.ioctl(fd, HIDIOCGCOLLECTIONINFO)
    return result
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def get_device_name(f):
  """Run the HIDIOCGNAME(1024) ioctl on ``f`` and print the resulting buffer.

  The ioctl writes into a 1024-byte unsigned-char array (mutate flag set),
  and that array is printed as-is.
  """
  a = array.array('B', [0]*1024)
  fcntl.ioctl(f, HIDIOCGNAME(1024), a, True)
  # Function-call form of print works on both Python 2 and Python 3.
  print(a)