@Bean public Job job() { Step step = stepBuilders.get("step") .<String, String>chunk(5) .reader(reader()) .processor(processor()) .writer(writer()) .faultTolerant().retryLimit(3).skipLimit(3) .retry(OptimisticLockingFailureException.class) .retry(DeadlockLoserDataAccessException.class) .skip(DeadlockLoserDataAccessException.class) .listener(mockRetryListener()) .listener(retryListener()) .build(); return jobBuilders.get("job").start(step).build(); }
@Test public void errorCodeTranslation() { SQLExceptionTranslator sext = new SQLErrorCodeSQLExceptionTranslator(ERROR_CODES); SQLException badSqlEx = new SQLException("", "", 1); BadSqlGrammarException bsgex = (BadSqlGrammarException) sext.translate("task", "SQL", badSqlEx); assertEquals("SQL", bsgex.getSql()); assertEquals(badSqlEx, bsgex.getSQLException()); SQLException invResEx = new SQLException("", "", 4); InvalidResultSetAccessException irsex = (InvalidResultSetAccessException) sext.translate("task", "SQL", invResEx); assertEquals("SQL", irsex.getSql()); assertEquals(invResEx, irsex.getSQLException()); checkTranslation(sext, 5, DataAccessResourceFailureException.class); checkTranslation(sext, 6, DataIntegrityViolationException.class); checkTranslation(sext, 7, CannotAcquireLockException.class); checkTranslation(sext, 8, DeadlockLoserDataAccessException.class); checkTranslation(sext, 9, CannotSerializeTransactionException.class); checkTranslation(sext, 10, DuplicateKeyException.class); SQLException dupKeyEx = new SQLException("", "", 10); DataAccessException dksex = sext.translate("task", "SQL", dupKeyEx); assertTrue("Not instance of DataIntegrityViolationException", DataIntegrityViolationException.class.isAssignableFrom(dksex.getClass())); // Test fallback. We assume that no database will ever return this error code, // but 07xxx will be bad grammar picked up by the fallback SQLState translator SQLException sex = new SQLException("", "07xxx", 666666666); BadSqlGrammarException bsgex2 = (BadSqlGrammarException) sext.translate("task", "SQL2", sex); assertEquals("SQL2", bsgex2.getSql()); assertEquals(sex, bsgex2.getSQLException()); }
public void testErrorCodeTranslation() { SQLExceptionTranslator sext = new SQLErrorCodeSQLExceptionTranslator(ERROR_CODES); SQLException badSqlEx = new SQLException("", "", 1); BadSqlGrammarException bsgex = (BadSqlGrammarException) sext.translate("task", "SQL", badSqlEx); assertEquals("SQL", bsgex.getSql()); assertEquals(badSqlEx, bsgex.getSQLException()); SQLException invResEx = new SQLException("", "", 4); InvalidResultSetAccessException irsex = (InvalidResultSetAccessException) sext.translate("task", "SQL", invResEx); assertEquals("SQL", irsex.getSql()); assertEquals(invResEx, irsex.getSQLException()); checkTranslation(sext, 5, DataAccessResourceFailureException.class); checkTranslation(sext, 6, DataIntegrityViolationException.class); checkTranslation(sext, 7, CannotAcquireLockException.class); checkTranslation(sext, 8, DeadlockLoserDataAccessException.class); checkTranslation(sext, 9, CannotSerializeTransactionException.class); checkTranslation(sext, 10, DuplicateKeyException.class); SQLException dupKeyEx = new SQLException("", "", 10); DataAccessException dksex = sext.translate("task", "SQL", dupKeyEx); assertTrue("Not instance of DataIntegrityViolationException", DataIntegrityViolationException.class.isAssignableFrom(dksex.getClass())); // Test fallback. We assume that no database will ever return this error code, // but 07xxx will be bad grammar picked up by the fallback SQLState translator SQLException sex = new SQLException("", "07xxx", 666666666); BadSqlGrammarException bsgex2 = (BadSqlGrammarException) sext.translate("task", "SQL2", sex); assertEquals("SQL2", bsgex2.getSql()); assertEquals(sex, bsgex2.getSQLException()); }
@Test public void transactionalReader() throws Exception { while (jmsTemplate.receive() != null) {} int read = 12; for (int i = 1; i <= read; i++) { jmsTemplate.convertAndSend(String.valueOf(i)); } assertThat(productQueueView.getQueueSize()).isEqualTo(read); final String toFailWriting = "7"; doNothing().when(service).writing(argThat(new BaseMatcher<String>() { @Override public boolean matches(Object input) { return !toFailWriting.equals(input); } @Override public void describeTo(Description description) {} })); doThrow(new DeadlockLoserDataAccessException("", null)).when(service).writing(toFailWriting); JobExecution exec = jobLauncher.run( transactionalReaderJob, new JobParametersBuilder().addLong("time", System.currentTimeMillis()).toJobParameters() ); assertThat(exec.getExitStatus().getExitCode()).isEqualTo(ExitStatus.COMPLETED.getExitCode()); int expectedWritten = 5; int stillOnQueue = (int) productQueueView.getQueueSize(); assertRead(10, exec); assertThat(stillOnQueue).isEqualTo(read - expectedWritten); assertWrite(expectedWritten, exec); }
@Bean public Job transactionalReaderJob() { // Tx를 위해 reader 정보를 Queue에 넣습니다. Step step = stepBuilders.get("transactionalReaderStep") .<String, String>chunk(5) .readerIsTransactionalQueue() .faultTolerant().skipLimit(5).skip(DeadlockLoserDataAccessException.class) .reader(jmsReader()) .processor(processor()) .writer(writer()) .build(); return jobBuilders.get("transactionalReaderJob").start(step).build(); }
@Test public void exceptionInWritingSkippable() throws Exception { int read = 12; configureServiceForRead(service, read); final String toFailWriting = "7"; doNothing().when(service).writing(argThat(new BaseMatcher<String>() { @Override public boolean matches(Object input) { return !toFailWriting.equals(input); } @Override public void describeTo(Description desc) { } })); doThrow(new DeadlockLoserDataAccessException("", null)).when(service).writing(toFailWriting); JobExecution exec = jobLauncher.run( job, new JobParametersBuilder().addLong("time", System.currentTimeMillis()).toJobParameters() ); assertThat(exec.getExitStatus().getExitCode()).isEqualToIgnoringCase(ExitStatus.COMPLETED.getExitCode()); verify(service, times(5 + 5 + 2 + 1)).reading(); // verify(service, times(5 + 5 + (1 + 1) + 5 + 2)).processing(anyString()); verify(service, times(5 + 5 + 5 + 5 + 5 + 2)).processing(anyString()); verify(service, times(5 + 2 + 2 + 2 + 5 + 2)).writing(anyString()); assertRead(read, exec); assertWrite(read - 1, exec); assertReadSkip(0, exec); assertProcessSkip(0, exec); assertWriteSkip(1, exec); assertCommit(1 + 4 + 1, exec); assertRollback(3 + 1, exec); }
@Test public void exceptionInProcessingSkippable() throws Exception { int read = 12; configureServiceForRead(service, read); final String toFailProcessing = "7"; doNothing().when(service).processing(argThat(new BaseMatcher<String>() { @Override public boolean matches(Object input) { return !toFailProcessing.equals(input); } @Override public void describeTo(Description desc) { } })); doThrow(new DeadlockLoserDataAccessException("", null)).when(service).processing(toFailProcessing); doNothing().when(service).writing(anyString()); JobExecution exec = jobLauncher.run( job, new JobParametersBuilder().addLong("time", System.currentTimeMillis()).toJobParameters() ); assertThat(exec.getExitStatus().getExitCode()).isEqualToIgnoringCase(ExitStatus.COMPLETED.getExitCode()); verify(service, times(5 + 5 + 2 + 1)).reading(); verify(service, times(5 + 2 + 2 + 2 + 4 + 2)).processing(anyString()); verify(service, times(5 + 4 + 2)).writing(anyString()); assertRead(read, exec); assertWrite(read - 1, exec); assertReadSkip(0, exec); assertProcessSkip(1, exec); assertWriteSkip(0, exec); assertCommit(3, exec); assertRollback(3, exec); // rollback on each retry }
@Test public void exceptionInWritingNotExhausted() throws Exception { int read = 12; configureServiceForRead(service, read); final String toFailWriting = "7"; doNothing().when(service).writing(argThat(new BaseMatcher<String>() { @Override public boolean matches(Object input) { return !toFailWriting.equals(input); } @Override public void describeTo(Description desc) { } })); doThrow(new DeadlockLoserDataAccessException("", null)) .doThrow(new DeadlockLoserDataAccessException("", null)) .doNothing() .when(service).writing(toFailWriting); JobExecution exec = jobLauncher.run( job, new JobParametersBuilder().addLong("time", System.currentTimeMillis()).toJobParameters() ); assertThat(exec.getExitStatus().getExitCode()).isEqualToIgnoringCase(ExitStatus.COMPLETED.getExitCode()); verify(service, times(5 + 5 + 2 + 1)).reading(); verify(service, times(5 + 5 + 5 + 5 + 2)).processing(anyString()); verify(service, times(5 + 2 + 2 + 5 + 2)).writing(anyString()); verify(mockRetryListener, times(2)).onError(any(RetryContext.class), any(RetryCallback.class), any(Throwable.class)); assertRead(read, exec); assertWrite(read, exec); assertReadSkip(0, exec); assertProcessSkip(0, exec); assertWriteSkip(0, exec); assertCommit(3, exec); assertRollback(2, exec); }
@Test public void retryPolicy() throws Exception { int read = 12; configureServiceForRead(service, read); final String toFailProcessingConcurrency = "7"; final String toFailProcessingDeadlock = "11"; final int maxAttemptsConcurrency = 2; final int maxAttemptsDeadlock = 4; doAnswer(new Answer<Void>() { private int countConcurrency = 0; private int countDeadlock = 0; @Override public Void answer(InvocationOnMock invocation) throws Throwable { String item = (String) invocation.getArguments()[0]; if (toFailProcessingConcurrency.equals(item) && countConcurrency < maxAttemptsConcurrency ) { countConcurrency++; throw new ConcurrencyFailureException(""); } else if (toFailProcessingDeadlock.equals(item) && countDeadlock < maxAttemptsDeadlock ) { countDeadlock++; throw new DeadlockLoserDataAccessException("", null); } return null; } }).when(service).writing(anyString()); JobExecution exec = jobLauncher.run( retryPolicyJob, new JobParametersBuilder().addLong("time", System.currentTimeMillis()).toJobParameters() ); assertThat(exec.getExitStatus().getExitCode()).isEqualToIgnoringCase(ExitStatus.COMPLETED.getExitCode()); assertRead(read, exec); assertWrite(read, exec); }