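I am trying to use Spring Batch to implement an aggregating reader (for a batch file in which multiple records should be treated as a single record when written). Here is the code snippet for my reader: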
    import java.util.ArrayList;
    import java.util.List;

    import org.springframework.batch.item.ExecutionContext;
    import org.springframework.batch.item.ItemReader;
    import org.springframework.batch.item.ItemStream;
    import org.springframework.batch.item.ItemStreamException;
    import org.springframework.batch.item.ParseException;
    import org.springframework.batch.item.UnexpectedInputException;
    import org.springframework.batch.item.support.SingleItemPeekableItemReader;

    // LOG, ALERT, LogMessageFormatter, Severity and BATCH_FILE_READ_EXCEPTION are
    // project-specific and omitted from this snippet.
    // Assumption: T provides hasSameInvoiceNumberAndVendorNumber(...), as a type
    // such as SAPInvoicePaymentRecord would.
    public class AggregatePeekableReader<T extends SAPInvoicePaymentRecord>
            implements ItemReader<List<T>>, ItemStream {

        private SingleItemPeekableItemReader<T> reader;

        // Adds the current record to the holder and returns true while the next
        // record belongs to the same invoice.
        private boolean process(T currentRecord, InvoiceLineItemsHolder holder) throws Exception {
            // Finish processing if we hit the end of the file.
            if (currentRecord == null) {
                LOG.info("Exhausted ItemReader (END OF FILE)");
                holder.exhausted = true;
                return false;
            }
            holder.records.add(currentRecord);
            // Peek at the next record without consuming it.
            T next = peekNextInvoiceRecord();
            if (currentRecord.hasSameInvoiceNumberAndVendorNumber(next)) {
                LOG.info("Found new line item for current invoice record");
                return true;
            }
            return false;
        }

        private T getNextInvoiceRecord() {
            T record = null;
            try {
                record = reader.read();
            } catch (UnexpectedInputException | ParseException e) {
                ALERT.error(LogMessageFormatter.format(Severity.HIGH, BATCH_FILE_READ_EXCEPTION, e), e);
                throw e;
            } catch (Exception e) {
                // Logged but not rethrown: the caller sees null, i.e. end of input.
                ALERT.error(LogMessageFormatter.format(Severity.HIGH, BATCH_FILE_READ_EXCEPTION, e), e);
            }
            return record;
        }

        private T peekNextInvoiceRecord() {
            T next = null;
            try {
                next = reader.peek();
            } catch (UnexpectedInputException | ParseException e) {
                ALERT.error(LogMessageFormatter.format(Severity.HIGH, BATCH_FILE_READ_EXCEPTION, e), e);
                throw e;
            } catch (Exception e) {
                ALERT.error(LogMessageFormatter.format(Severity.HIGH, BATCH_FILE_READ_EXCEPTION, e), e);
            }
            return next;
        }

        @Override
        public void close() {
            reader.close();
        }

        public SingleItemPeekableItemReader<T> getReader() {
            return reader;
        }

        public void setReader(SingleItemPeekableItemReader<T> reader) {
            this.reader = reader;
        }

        private class InvoiceLineItemsHolder {
            List<T> records = new ArrayList<T>();
            boolean exhausted = false;
        }

        @Override
        public void open(ExecutionContext executionContext) throws ItemStreamException {
            // reader.open(executionContext);
        }

        @Override
        public void update(ExecutionContext executionContext) throws ItemStreamException {
            // TODO
        }

        @Override
        public List<T> read() throws Exception {
            InvoiceLineItemsHolder holder = new InvoiceLineItemsHolder();
            synchronized (this) {
                while (process(getNextInvoiceRecord(), holder)) {
                    continue;
                }
                if (!holder.exhausted) {
                    return holder.records;
                } else {
                    // When you hit the end of the file, close the reader.
                    close();
                    return null;
                }
            }
        }
    }
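The above is a working example of a peekable reader: it peeks at the next line (without consuming it) to determine whether the logical end of a record has been reached (sometimes several lines make up a single transaction).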
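To make the peek-and-group idea concrete without any framework, here is a minimal plain-Java sketch. It does not use Spring Batch: `Peekable`, `Line`, `readGroup` and `groupAll` are hypothetical names invented for this illustration, and `Peekable` is only a rough stand-in for what a peekable reader offers (`peek()` looks ahead, `read()` consumes).

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Plain-Java sketch of peek-based grouping; no Spring Batch types are used.
final class PeekGroupingSketch {

    // Hypothetical record type: one line item belonging to an invoice.
    static final class Line {
        final String invoiceNumber;
        final String vendorNumber;
        final String text;

        Line(String invoiceNumber, String vendorNumber, String text) {
            this.invoiceNumber = invoiceNumber;
            this.vendorNumber = vendorNumber;
            this.text = text;
        }

        // Null-safe: a null "other" (end of input) never matches.
        boolean sameInvoiceAs(Line other) {
            return other != null
                    && invoiceNumber.equals(other.invoiceNumber)
                    && vendorNumber.equals(other.vendorNumber);
        }
    }

    // Minimal peekable wrapper: peek() returns the next element without consuming it.
    static final class Peekable<T> {
        private final Iterator<T> source;
        private T buffered;
        private boolean hasBuffered;

        Peekable(Iterator<T> source) {
            this.source = source;
        }

        T peek() {
            if (!hasBuffered) {
                buffered = source.hasNext() ? source.next() : null;
                hasBuffered = true;
            }
            return buffered;
        }

        // Consumes and returns the next element, or null at end of input.
        T read() {
            T item = peek();
            buffered = null;
            hasBuffered = false;
            return item;
        }
    }

    // Returns the next run of lines sharing an invoice, or null at end of input.
    static List<Line> readGroup(Peekable<Line> reader) {
        Line current = reader.read();
        if (current == null) {
            return null;
        }
        List<Line> group = new ArrayList<>();
        group.add(current);
        // Keep consuming while the upcoming line belongs to the same invoice.
        while (current.sameInvoiceAs(reader.peek())) {
            current = reader.read();
            group.add(current);
        }
        return group;
    }

    static List<List<Line>> groupAll(List<Line> lines) {
        Peekable<Line> reader = new Peekable<>(lines.iterator());
        List<List<Line>> groups = new ArrayList<>();
        for (List<Line> g = readGroup(reader); g != null; g = readGroup(reader)) {
            groups.add(g);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Line> lines = List.of(
                new Line("INV-1", "V-9", "a"),
                new Line("INV-1", "V-9", "b"),
                new Line("INV-2", "V-9", "c"));
        for (List<Line> g : groupAll(lines)) {
            System.out.println(g.size() + " line(s) for " + g.get(0).invoiceNumber);
        }
    }
}
```

Like the reader above, this only groups adjacent lines, so it relies on the input being sorted by invoice and vendor number.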
You need to implement the ItemStream interface for your reader. This tells Spring Batch that your reader needs certain actions to open and close the stream:
    public class InvoiceLineItemAggregatePeekableReader
            extends AbstractItemStreamItemReader<List<SAPInvoicePaymentRecord>> {

        @Override
        public void close() {
            ...
        }
    }
The stream is closed on any error that occurs during step execution. For more examples, check the classes in Spring Batch itself (e.g. FlatFileItemReader).
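The open/read/close contract described here can be pictured with a small plain-Java analogy. This is not Spring Batch's actual implementation: `Stream`, `FailingStream` and `runStep` are hypothetical names made up for the sketch, and the driver simply uses `try`/`finally` to show why a registered stream still gets its `close()` call when reading fails.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java analogy of a stream lifecycle; not Spring Batch code.
final class StreamLifecycleSketch {

    // Hypothetical stand-in for a reader with an open/read/close lifecycle.
    interface Stream {
        void open();

        String read() throws Exception; // null signals end of input

        void close();
    }

    // Records every lifecycle call and fails on the second read.
    static final class FailingStream implements Stream {
        final List<String> events = new ArrayList<>();
        private int reads;

        @Override
        public void open() {
            events.add("open");
        }

        @Override
        public String read() throws Exception {
            events.add("read");
            if (++reads == 2) {
                throw new IllegalStateException("bad record");
            }
            return "item-" + reads;
        }

        @Override
        public void close() {
            events.add("close");
        }
    }

    // Simplified driver: whatever happens while reading, close() runs in finally.
    static void runStep(Stream stream) throws Exception {
        stream.open();
        try {
            while (stream.read() != null) {
                // An item would be processed and written here.
            }
        } finally {
            stream.close();
        }
    }

    public static void main(String[] args) {
        FailingStream stream = new FailingStream();
        try {
            runStep(stream);
        } catch (Exception e) {
            System.out.println("step failed: " + e.getMessage());
        }
        System.out.println(stream.events);
    }
}
```

The point of the analogy is that the framework can only call `close()` on a reader that it knows is a stream. That is the reason for implementing ItemStream on the aggregating reader and delegating to the wrapped reader.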