Spring Batch 2.0: Repeat a tasklet
In some scenarios, such as downloading the same file by FTP from several different servers, the following tasklet repeat pattern can be useful.
Job definition
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xsi:schemaLocation="http://www.springframework.org/schema/aop
           http://www.springframework.org/schema/aop/spring-aop.xsd
           http://www.springframework.org/schema/batch
           http://www.springframework.org/schema/batch/spring-batch-2.0.xsd
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- Repeat job: one tasklet step. The job references a "jobRepository"
         bean that is not defined in this listing. -->
    <batch:job id="repeatJob" job-repository="jobRepository">
        <batch:step id="repeatTasklet">
            <batch:tasklet ref="taskletRepeat" />
        </batch:step>
    </batch:job>

    <!-- Repeat tasklet definition: the tasklet receives the repeat policy so
         that it can ask it for the current element. -->
    <bean id="taskletRepeat"
          class="foo.tasklet.RepeatTasklet">
        <property name="repeatPolicy" ref="repeatPolicy" />
    </bean>

    <!-- The policy holds the list of elements to iterate over. The same bean
         is injected into the tasklet (above) and into the RepeatTemplate (below). -->
    <bean id="repeatPolicy"
          class="foo.policy.RepeatPolicy">
        <property name="elements">
            <list>
                <value>element1</value>
                <value>element2</value>
                <value>element3</value>
            </list>
        </property>
    </bean>

    <!-- AOP repeat configuration: every call to RepeatTasklet.execute(..) in
         the foo package tree is advised by repeatAdvice. -->
    <aop:config>
        <aop:pointcut id="transactional"
                      expression="execution(* foo..RepeatTasklet.execute(..))" />
        <aop:advisor pointcut-ref="transactional"
                     advice-ref="repeatAdvice"
                     order="-1"/>
    </aop:config>

    <!-- The advice delegates to a RepeatTemplate whose completion policy is
         the repeatPolicy bean. -->
    <bean id="repeatAdvice"
          class="org.springframework.batch.repeat.interceptor.RepeatOperationsInterceptor">
        <property name="repeatOperations">
            <bean class="org.springframework.batch.repeat.support.RepeatTemplate">
                <property name="completionPolicy" ref="repeatPolicy" />
            </bean>
        </property>
    </bean>

</beans>
Repeat Tasklet
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import es.eds.springbatch.test.repeat.policy.RepeatPolicy;
/**
 * Tasklet that prints the element currently exposed by its {@link RepeatPolicy}.
 */
public class RepeatTasklet implements Tasklet {

    /** Policy that is asked for the current element on each execution. */
    private RepeatPolicy repeatPolicy;

    /**
     * @param repeatPolicy the policy this tasklet reads the current element from
     */
    public void setRepeatPolicy(RepeatPolicy repeatPolicy) {
        this.repeatPolicy = repeatPolicy;
    }

    /**
     * @return the policy this tasklet reads the current element from
     */
    public RepeatPolicy getRepeatPolicy() {
        return this.repeatPolicy;
    }

    /**
     * Writes the policy's current element to standard output.
     *
     * @param stepContribution not used by this implementation
     * @param chunkContext not used by this implementation
     * @return always {@link RepeatStatus#FINISHED}
     * @throws Exception declared by the {@code Tasklet} contract
     */
    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext)
            throws Exception {
        final String element = this.repeatPolicy.getCurrentElement();
        final String message = "Current element: " + element;
        System.out.println(message);
        return RepeatStatus.FINISHED;
    }
}
Repeat Policy
import java.util.List;
import org.springframework.batch.repeat.CompletionPolicy;
import org.springframework.batch.repeat.RepeatContext;
import org.springframework.batch.repeat.RepeatStatus;
public class RepeatPolicy implements CompletionPolicy {
private List<string> elements;
private String currentElement;
public boolean isComplete(RepeatContext repeatContext) {
return elements.size() == 0;
}
public boolean isComplete(RepeatContext repeatContext,
RepeatStatus repeatStatus) {
return elements.size() == 0;
}
public RepeatContext start(RepeatContext repeatContext) {
return repeatContext;
}
public void update(RepeatContext repeatContext) {
currentElement = elements.remove(0);
}
public List</string><string> getElements() {
return elements;
}
public void setElements(List</string><string> elements) {
this.elements = elements;
}
public String getCurrentElement() {
return currentElement;
}
}
Spring Batch 2.0: Retry a tasklet
Retry strategies in Spring Batch 2.0 allow a chunk to be reprocessed after a failure. However, there is no core support for retrying Tasklet steps, so AOP techniques must be used.
Both retry patterns (chunk and tasklet) are shown below.
Chunk retry: retry three times if some exception occurs on “chunk” execution
<!-- Chunk-oriented step with retry-limit="3"; java.lang.Exception is listed
     as the retryable exception class. Elements carry the batch: prefix,
     assuming beans is the default namespace as in the other listings. -->
<batch:step id="step">
    <batch:tasklet>
        <batch:chunk reader="itemReader"
                     writer="itemWriter"
                     commit-interval="1"
                     retry-limit="3">
            <batch:retryable-exception-classes>
                java.lang.Exception
            </batch:retryable-exception-classes>
        </batch:chunk>
    </batch:tasklet>
</batch:step>
Tasklet retry: retry three times if some exception occurs on “tasklet” execution
<!-- Tasklet step: the retry behaviour is not configured on the step itself
     but added through the AOP advice below. Batch elements carry the batch:
     prefix because they appear next to beans-namespace <bean> elements. -->
<batch:step id="step">
    <batch:tasklet ref="retryableStep" />
</batch:step>

<bean id="retryableStep" class="foo.bar.RetryableStep" />

<!-- Every call to RetryableStep.execute(..) in the foo package tree is
     advised by retryAdvice. -->
<aop:config>
    <aop:pointcut id="transactional" expression="execution(* foo..RetryableStep.execute(..))" />
    <aop:advisor pointcut-ref="transactional" advice-ref="retryAdvice" order="-1"/>
</aop:config>

<!-- The advice delegates to a RetryTemplate configured with a
     SimpleRetryPolicy: maxAttempts is 3 and java.lang.Exception is retryable. -->
<bean id="retryAdvice"
      class="org.springframework.batch.retry.interceptor.RetryOperationsInterceptor">
    <property name="retryOperations">
        <bean class="org.springframework.batch.retry.support.RetryTemplate">
            <property name="retryPolicy">
                <bean class="org.springframework.batch.retry.policy.SimpleRetryPolicy">
                    <property name="maxAttempts" value="3"/>
                    <property name="retryableExceptionClasses">
                        <list>
                            <value>java.lang.Exception</value>
                        </list>
                    </property>
                </bean>
            </property>
        </bean>
    </property>
</bean>
Spring Batch 2.0: Current Resource of a MultiResourceItemReader
[UPDATE: This feature is available in Spring Batch 2.0.1. Thanks for your interest, Dan. The following code is therefore just an exercise in reflection and dirty access to private methods and fields.]
MultiResourceItemReader class reads items from multiple resources sequentially.
A classic job configuration using this feature is shown below.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/batch
           http://www.springframework.org/schema/batch/spring-batch-2.0.xsd">

    <!-- Job with a single chunk-oriented step that reads through the
         multi-resource reader. The "jobRepository" bean is not defined in
         this listing. -->
    <batch:job id="sampleJob" job-repository="jobRepository">
        <batch:step id="stepWSDB">
            <batch:tasklet>
                <batch:chunk processor="processor"
                             reader="multifileReader"
                             writer="itemWriter"
                             commit-interval="100" />
            </batch:tasklet>
        </batch:step>
    </batch:job>

    <bean id="processor" class="foo.bar.CustomProcessor" />

    <!-- Step-scoped reader: the resource pattern combines a placeholder for
         the working directory with the "day" job parameter, and each matched
         resource is read through the delegate itemReader. -->
    <bean id="multifileReader"
          class="org.springframework.batch.item.file.MultiResourceItemReader"
          scope="step"
          lazy-init="true">
        <property name="resources" value="file://${loc.working.directory}/*.#{jobParameters[day]}" />
        <property name="delegate" ref="itemReader" />
    </bean>

    <bean id="itemReader" class="org.springframework.batch.item.file.FlatFileItemReader">
        <!-- Configuration omitted -->
    </bean>

    <bean id="itemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
        <!-- Configuration omitted -->
    </bean>

</beans>
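Note: “job”, “step”, “tasklet” and “chunk” belong to the batch namespace, so in the listing above they must be written as “batch:job”, “batch:step”, “batch:tasklet” and “batch:chunk”; the WordPress sourcecode plugin may drop the prefix when rendering the listing.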
In some scenarios it is useful to know the name of the resource file at processing time, in order to perform mappings or writes that depend on this file name. However, there is no way to access this information through the Spring Batch API.
So, MultiResourceItemReader must be modified (in a somewhat dirty way) to share the resource name through the step's execution context.
import java.lang.reflect.Field;
import java.util.List;
import org.apache.log4j.Logger;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.file.MultiResourceItemReader;
import org.springframework.core.io.Resource;
/**
 * {@link MultiResourceItemReader} that publishes the resource being read in
 * the {@link ExecutionContext} under the key {@code "current.resource.name"}.
 *
 * <p>The superclass state is obtained by reflection on its private fields
 * {@code resources} and {@code index} (and {@code index.currentResource}), so
 * this class depends on the internal layout of the superclass. Any failure of
 * that reflective access is logged and otherwise ignored; reading continues
 * without the entry being updated.
 *
 * <p>NOTE(review): {@link #open(ExecutionContext)} stores the bare file name
 * ({@code Resource.getFilename()}) while {@link #update(ExecutionContext)}
 * stores the full path ({@code Resource.getFile().getPath()}). Consumers see
 * two different formats under the same key; confirm which one is intended
 * before unifying them.
 */
public class CustomMultiResourceItemReader extends MultiResourceItemReader<List<String>> {

    private static final Logger log = Logger.getLogger(CustomMultiResourceItemReader.class.getName());

    /** Execution-context key under which the current resource is published. */
    private static final String CURRENT_RESOURCE_KEY = "current.resource.name";

    /**
     * Opens the reader and publishes the file name of the first resource.
     *
     * @param executionContext context passed to the superclass and updated
     *        with the resource name
     * @throws ItemStreamException if the superclass fails to open
     */
    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        super.open(executionContext);
        try {
            Resource[] resources = readResources();
            executionContext.put(CURRENT_RESOURCE_KEY, resources[0].getFilename());
        } catch (Exception e) {
            // Best effort: covers reflection failures as well as a null or empty resource array.
            log.error(e.getMessage(), e);
        }
    }

    /**
     * Updates the superclass state and publishes the path of the resource at
     * the superclass's current index, or {@code null} once that index is past
     * the last resource.
     *
     * @param executionContext context passed to the superclass and updated
     *        with the resource path
     * @throws ItemStreamException if the superclass fails to update
     */
    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        super.update(executionContext);
        try {
            int currentResource = readCurrentResourceIndex();
            Resource[] resources = readResources();
            if (currentResource < resources.length) {
                executionContext.put(CURRENT_RESOURCE_KEY, resources[currentResource].getFile().getPath());
            } else {
                executionContext.put(CURRENT_RESOURCE_KEY, null);
            }
        } catch (Exception e) {
            // Best effort: a failure here must not interrupt the step.
            log.error(e.getMessage(), e);
        }
    }

    /** Reads the superclass's private {@code resources} array. */
    private Resource[] readResources() throws NoSuchFieldException, IllegalAccessException {
        return (Resource[]) readPrivateField(MultiResourceItemReader.class, this, "resources");
    }

    /** Reads {@code currentResource} from the superclass's private {@code index} object. */
    private int readCurrentResourceIndex() throws NoSuchFieldException, IllegalAccessException {
        Object index = readPrivateField(MultiResourceItemReader.class, this, "index");
        return (Integer) readPrivateField(index.getClass(), index, "currentResource");
    }

    /** Returns the value of the field {@code name} declared by {@code owner}, read from {@code target}. */
    private static Object readPrivateField(Class<?> owner, Object target, String name)
            throws NoSuchFieldException, IllegalAccessException {
        Field field = owner.getDeclaredField(name);
        field.setAccessible(true);
        return field.get(target);
    }
}
Note that this modification may only work with Spring Batch 2.0 because it’s accessing private fields and methods within core classes.