/**
 * Maps a single {@link DocumentContent} record.
 *
 * <p>Excluded ids are skipped. Records without PDF bytes are only logged. Payloads rejected by
 * {@code contentApprover}, or held in a buffer without an accessible backing array, are reported
 * through {@code handleException} as an {@link InvalidPdfException}. All other payloads are
 * streamed into {@code processStream}.
 *
 * @param key wrapper holding the document record
 * @param ignore unused value
 * @param context mapper context
 */
@Override
public void map(AvroKey<DocumentContent> key, NullWritable ignore, Context context)
        throws IOException, InterruptedException {
    DocumentContent content = key.datum();
    String documentId = content.getId().toString();

    if (excludedIds.contains(documentId)) {
        log.info("skipping processing for excluded id " + documentId);
        return;
    }

    ByteBuffer pdfBuffer = content.getPdf();
    if (pdfBuffer == null) {
        log.warn("no byte data found for id: " + documentId);
        return;
    }

    // NOTE(review): approve() receives the entire backing array and ignores
    // arrayOffset()/position()/limit(); confirm buffers here always start at index 0.
    boolean approved = pdfBuffer.hasArray() && contentApprover.approve(pdfBuffer.array());
    if (!approved) {
        log.info(invalidPdfHeaderMsg);
        handleException(new InvalidPdfException(invalidPdfHeaderMsg), documentId);
        return;
    }

    try (InputStream pdfStream = new ByteBufferInputStream(pdfBuffer)) {
        processStream(documentId, pdfStream);
    }
}
/**
 * Reads the next PDF object via the superclass, substituting {@code PdfNull.PDFNULL} when the
 * superclass throws an {@link InvalidPdfException}.
 *
 * <p>The swallowed exception is logged at error level together with its stack trace.
 *
 * @return the parsed object, or {@code PdfNull.PDFNULL} if the object was invalid
 * @throws IOException for read failures other than {@link InvalidPdfException}
 */
@Override
protected PdfObject readPRObject() throws IOException {
    PdfObject result;
    try {
        result = super.readPRObject();
    } catch (InvalidPdfException invalidPdf) {
        LOGGER.error("While reading a PdfObject ignored an InvalidPdfException ("
                + invalidPdf.getMessage() + "); returning PdfNull.", invalidPdf);
        result = PdfNull.PDFNULL;
    }
    return result;
}
/**
 * Processes content input stream. Does not close contentStream.
 *
 * <p>Increments {@code currentProgress} and, every {@code PROGRESS_LOG_INTERVAL} documents,
 * logs how long the last interval took. The stream is handed to a {@code ContentExtractor}
 * (created with {@code interruptionTimeoutSecs} when that is set), and the result goes to
 * {@code handleContent}.
 *
 * <p>Any exception raised while extracting or handling content is logged and passed to
 * {@code handleException}. In that case {@code handleProcessingTime} is not called. It is
 * called only after successful processing.
 *
 * @param documentId document identifier
 * @param contentStream stream to be processed
 * @throws IOException if raised by {@code handleException} or {@code handleProcessingTime}
 * @throws InterruptedException if raised by {@code handleException} or {@code handleProcessingTime}
 */
protected void processStream(String documentId, InputStream contentStream)
        throws IOException, InterruptedException {
    currentProgress++;
    if (currentProgress % PROGRESS_LOG_INTERVAL == 0) {
        log.info("metadata extraction progress: " + currentProgress + ", time taken to process "
                + PROGRESS_LOG_INTERVAL + " elements: "
                + ((System.currentTimeMillis() - intervalTime) / 1000) + " secs");
        intervalTime = System.currentTimeMillis();
    }
    log.info("starting processing for id: " + documentId);
    long startTime = System.currentTimeMillis();
    try {
        ContentExtractor extractor = interruptionTimeoutSecs != null
                ? new ContentExtractor(interruptionTimeoutSecs)
                : new ContentExtractor();
        extractor.setPDF(contentStream);
        handleContent(extractor, documentId);
    } catch (Exception e) {
        // NOTE(review): this broad catch also captures any InterruptedException raised inside
        // the try block. It is routed to handleException and the thread's interrupt status is
        // not restored. Confirm that this is intended.
        // Recognise an invalid PDF whether it is thrown directly or wrapped one level deep.
        boolean invalidPdf = e instanceof InvalidPdfException
                || e.getCause() instanceof InvalidPdfException;
        log.error(invalidPdf ? "Invalid PDF file" : "got unexpected exception, just logging", e);
        handleException(e, documentId);
        return;
    }
    handleProcessingTime(System.currentTimeMillis() - startTime, documentId);
}
@Test public void testMapWithIvalidPdf() throws Exception { // given Configuration conf = new Configuration(); conf.set(NAMED_OUTPUT_META, "meta"); conf.set(NAMED_OUTPUT_FAULT, "fault"); doReturn(conf).when(context).getConfiguration(); doReturn(invalidPdfCounter).when(context).getCounter(INVALID_PDF_HEADER); mapper.setup(context); String id = "id"; DocumentContent.Builder docContentBuilder = DocumentContent.newBuilder(); docContentBuilder.setId(id); docContentBuilder.setPdf(ByteBuffer.wrap(getContent(NON_PDF_FILE))); // execute mapper.map(new AvroKey<>(docContentBuilder.build()), null, context); // assert verify(context, never()).write(any(), any()); verify(multipleOutputs, times(2)).write(mosKeyCaptor.capture(), mosValueCaptor.capture()); // doc meta assertEquals(conf.get(NAMED_OUTPUT_META), mosKeyCaptor.getAllValues().get(0)); ExtractedDocumentMetadata docMeta = (ExtractedDocumentMetadata) mosValueCaptor.getAllValues().get(0).datum(); assertNotNull(docMeta); assertEquals(id, docMeta.getId()); assertEquals("", docMeta.getText()); assertEquals(EMPTY_META, docMeta.getPublicationTypeName()); // fault assertEquals(conf.get(NAMED_OUTPUT_FAULT), mosKeyCaptor.getAllValues().get(1)); Fault fault = (Fault) mosValueCaptor.getAllValues().get(1).datum(); assertNotNull(fault); assertEquals(id, fault.getInputObjectId()); assertEquals(InvalidPdfException.class.getName(), fault.getCode()); assertTrue(fault.getTimestamp() > 0); verify(invalidPdfCounter, times(1)).increment(1); }