我们从 Python 开源项目中提取了以下 6 个代码示例,用于说明如何使用 asyncio.DatagramTransport()。
def test_unicast():
    """Verify that ``unicast`` hands the encoded command to ``transport.sendto``.

    The transport's ``sendto`` is replaced with a ``MagicMock`` so the call
    can be inspected. The test asserts that the most recent ``sendto`` call
    received the UTF-8 encoded JSON form of the command together with the
    ``(address, GATEWAY_PORT)`` tuple.
    """
    transport = DatagramTransport()
    transport.sendto = MagicMock()

    aqara = protocol.AqaraProtocol()
    aqara.transport = transport

    command = {"cmd": "read"}
    address = "10.10.10.10"
    aqara.unicast(address, command)

    # Encode after the call, exactly as the protocol's input stands at that point.
    expected_datagram = json.dumps(command).encode('utf-8')
    transport.sendto.assert_called_with(
        expected_datagram,
        (address, GATEWAY_PORT),
    )
def test_broadcast():
    """Verify that ``broadcast`` hands the encoded command to ``transport.sendto``.

    The transport's ``sendto`` is replaced with a ``MagicMock`` so the call
    can be inspected. The test asserts that the most recent ``sendto`` call
    received the UTF-8 encoded JSON form of the command together with the
    ``(MCAST_ADDR, MCAST_PORT)`` tuple.
    """
    transport = DatagramTransport()
    transport.sendto = MagicMock()

    aqara = protocol.AqaraProtocol()
    aqara.transport = transport

    command = {"cmd": "read"}
    aqara.broadcast(command)

    # Encode after the call, exactly as the protocol's input stands at that point.
    expected_datagram = json.dumps(command).encode('utf-8')
    transport.sendto.assert_called_with(
        expected_datagram,
        (MCAST_ADDR, MCAST_PORT),
    )
def test_dgram_not_implemented(self): transport = asyncio.DatagramTransport() self.assertRaises(NotImplementedError, transport.sendto, 'data') self.assertRaises(NotImplementedError, transport.abort)
def connection_made(self, transport: asyncio.DatagramTransport):
    """Accept the datagram transport and do nothing with it.

    Args:
        transport: The datagram transport passed in by the caller; it is
            neither stored nor used by this implementation.
    """
    return None