tangguo

线程中断未结束对输入流的读取的阻塞调用

java

我正在使用RXTX从串行端口读取数据。读取是在以下列方式产生的线程中完成的:

CommPortIdentifier portIdentifier = CommPortIdentifier.getPortIdentifier(port);
CommPort comm = portIdentifier.open("Whatever", 2000);
SerialPort serial = (SerialPort)comm;
...settings
Thread t = new Thread(new SerialReader(serial.getInputStream()));
t.start();

SerialReader类实现Runnable并无限期地循环,从端口读取并将数据构造为有用的程序包,然后再将其发送给其他应用程序。但是,我将其简化为以下简单性:

public void run() {
  ReadableByteChannel byteChan = Channels.newChannel(in); //in = InputStream passed to SerialReader
  ByteBuffer buffer = ByteBuffer.allocate(100);
  while (true) {
    try {
      byteChan.read(buffer);
    } catch (Exception e) {
      System.out.println(e);
    }
  }
}

当用户单击停止按钮时,将触发以下功能,从理论上讲,应关闭输入流并突破阻塞的byteChan.read(buffer)调用。代码如下:

public void stop() {
  t.interrupt();
  serial.close();
}

但是,当我运行这段代码时,我永远不会收到ClosedByInterruptException,一旦输入流关闭,就应该触发该异常。此外,执行会阻塞对serial.close()的调用-因为基础输入流仍在读取调用上受阻。我试过用byteChan.close()替换中断调用,这会导致AsynchronousCloseException,但是,我得到了相同的结果。

我所缺少的任何帮助将不胜感激。


阅读 481

收藏
2020-10-16

共2个答案

小编典典

RXTX SerialInputStream(serial.getInputStream()调用返回的结果)支持一种超时方案,该方案最终解决了我所有的问题。在创建新的SerialReader对象之前添加以下内容将导致读取不再无限期地阻塞:

serial.enableReceiveTimeout(1000);

在SerialReader对象中,我不得不更改一些内容以直接从InputStream读取,而不是创建ReadableByteChannel,但是现在,我可以停止并重新启动读取器而不会出现问题。

2020-10-16
小编典典

您不能InterruptibleChannel仅通过包装就将不支持可中断I / O的流打包(并且无论如何ReadableByteChannel都不会扩展InterruptibleChannel)

您必须查看基础合约InputStream。对于SerialPort.getInputStream()结果的可中断性怎么说?如果没有说什么,则应假定它忽略了中断。

对于任何不明确支持可中断性的I / O,唯一的选择通常是关闭来自另一个线程的流。这可能会立即在调用流时阻塞的线程中引发IOException(尽管可能不是AsynchronousCloseException)。

但是,即使这也非常依赖于-的实现,InputStream并且底层OS也可能是一个因素。

注意对以下代码ReadableByteChannelImpl返回的类的源代码注释newChannel()

  private static class ReadableByteChannelImpl
    extends AbstractInterruptibleChannel       // Not really interruptible
    implements ReadableByteChannel
  {
    InputStream in;
    ⋮
2020-10-16