Python socket 模块,BTPROTO_RFCOMM 实例源码

我们从 Python 开源项目中提取了以下 6 个代码示例，用于说明如何使用 socket.BTPROTO_RFCOMM。

项目:racercat-logger    作者:theduckylittle    | 项目源码 | 文件源码
def __init__(self, addr):
        """Connect to the device at ``addr`` over RFCOMM, retrying until it works.

        Blocks until a connection on RFCOMM channel 1 succeeds. After every
        failed attempt the error is logged, the failed socket is closed and
        the attempt is repeated 10 seconds later. Once connected, the socket
        is stored in ``self.sock`` with a 5 second timeout.

        A ``KeyboardInterrupt`` raised while connecting terminates the
        process with exit status 0.

        :param addr: Bluetooth address of the remote device; used as the
            first element of the ``(addr, channel)`` tuple given to
            ``connect``.
        """
        while True:
            sock = None
            try:
                sock = socket.socket(socket.AF_BLUETOOTH,
                                     socket.SOCK_STREAM, socket.BTPROTO_RFCOMM)
                sock.connect((addr, 1))
            except KeyboardInterrupt:
                # Interrupted while connecting: release the socket, then exit.
                if sock is not None:
                    sock.close()
                sys.exit(0)
            except Exception as e:
                # Close the failed socket before retrying; otherwise every
                # retry of this unbounded loop leaks one file descriptor.
                if sock is not None:
                    sock.close()
                logging.error("Exception: "+str(e))
                logging.info("Could not connect. Waiting 10s")
                time.sleep(10)
            else:
                self.sock = sock
                break

        # set the timeout to 5 seconds.
        self.sock.settimeout(5)

    ## Read data from the GPS feed
    #
    #  @returns Data read from the GPS
项目:nOBEX    作者:nccgroup    | 项目源码 | 文件源码
def BluetoothSocket():
    """Return a new, unconnected Bluetooth RFCOMM stream socket."""
    rfcomm_socket = socket.socket(
        family=socket.AF_BLUETOOTH,
        type=socket.SOCK_STREAM,
        proto=socket.BTPROTO_RFCOMM,
    )
    return rfcomm_socket
项目:ev3-python3    作者:ChristophGaukel    | 项目源码 | 文件源码
def _connect_bluetooth(self, host: str) -> None:
        """
        Create a socket, that holds a bluetooth-connection to an EV3

        The socket is stored in ``self._socket`` and connected to RFCOMM
        channel 1 of ``host``. If connecting fails, the socket is closed
        and the original exception is re-raised.

        :param host: Bluetooth address of the EV3, used as the first
            element of the ``(host, channel)`` tuple given to ``connect``.
        """
        self._socket = socket.socket(socket.AF_BLUETOOTH,
                                     socket.SOCK_STREAM,
                                     socket.BTPROTO_RFCOMM)
        try:
            self._socket.connect((host, 1))
        except BaseException:
            # Do not leave an open, unconnected socket behind on failure
            # (BaseException so that Ctrl-C during connect also cleans up).
            self._socket.close()
            raise
项目:BlueDot    作者:martinohanlon    | 项目源码 | 文件源码
def start(self):
        """
        Starts the Bluetooth server if it is not already running. The server
        needs to be started before connections can be made.

        Does nothing when the server is already running. Otherwise the
        adapter is optionally powered up, the serial port profile is
        registered, a listening RFCOMM socket is opened and a background
        thread is started that waits for a client connection.

        Raises an ``Exception`` if the adapter is powered off, and re-raises
        the socket error if the server address cannot be bound (the socket
        is closed first).
        """
        if self._running:
            return

        if self._power_up_device:
            self.adapter.powered = True

        if not self.adapter.powered:
            raise Exception("Bluetooth device {} is turned off".format(self.adapter.device))

        # register the serial port profile with Bluetooth
        register_spp(self._port)

        # start Bluetooth server
        # open the Bluetooth socket
        self._server_sock = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM)
        self._server_sock.settimeout(BLUETOOTH_TIMEOUT)
        try:
            self._server_sock.bind((self.server_address, self.port))
        except (socket.error, OSError) as e:
            if e.errno == errno.EADDRINUSE:
                print("Bluetooth address {} is already in use - is the server already running?".format(self.server_address))
            # Release the socket that could not be bound, then propagate the
            # original error with its traceback intact.
            self._server_sock.close()
            raise
        self._server_sock.listen(1)

        # wait for client connection
        self._conn_thread = WrapThread(target=self._wait_for_connection)
        self._conn_thread.start()

        self._running = True
项目:BlueDot    作者:martinohanlon    | 项目源码 | 文件源码
def connect(self):
        """
        Connect to a Bluetooth server.

        Does nothing when already connected. Otherwise the adapter is
        optionally powered up, the server is looked up (by either of the
        first two fields of each entry) in the adapter's paired devices, an
        RFCOMM socket is bound to the local adapter and connected to the
        server, and a background thread running ``self._read`` is started.

        Raises an ``Exception`` if the adapter is powered off or the server
        is not among the paired devices. If binding or connecting fails, the
        socket is closed and the original error is re-raised.
        """
        if self._connected:
            return

        if self._power_up_device:
            self.adapter.powered = True

        if not self.adapter.powered:
            raise Exception("Bluetooth device {} is turned off".format(self.adapter.device))

        # try and find the server name or MAC address in the paired devices list
        server_mac = next(
            (device[0] for device in self.adapter.paired_devices
             if self._server == device[0] or self._server == device[1]),
            None)
        if server_mac is None:
            raise Exception("Server {} not found in paired devices".format(self._server))

        # create a socket
        self._client_sock = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM)
        try:
            self._client_sock.bind((self.adapter.address, self._port))
            self._client_sock.connect((server_mac, self._port))
        except BaseException:
            # Do not leak the socket when bind/connect fails (BaseException
            # so that Ctrl-C during a blocking connect also cleans up).
            self._client_sock.close()
            raise

        self._connected = True

        self._conn_thread = WrapThread(target=self._read)
        self._conn_thread.start()
项目:openadms-node    作者:dabamos    | 项目源码 | 文件源码
def _open(self) -> None:
        """Open an RFCOMM socket to the sensor and keep it in ``self._sock``.

        If no server MAC address is configured, an error is logged and no
        socket is created. An ``OSError`` raised while creating or
        connecting the socket is logged rather than propagated.
        """
        if self._server_mac_address:
            try:
                rfcomm = socket.socket(family=socket.AF_BLUETOOTH,
                                       type=socket.SOCK_STREAM,
                                       proto=socket.BTPROTO_RFCOMM)
                self._sock = rfcomm
                endpoint = (self._server_mac_address, self._port)
                rfcomm.connect(endpoint)
            except OSError as err:
                self.logger.error(err)
        else:
            self.logger.error('MAC address of server not set')