Spring Batch 2.0: Repeat a tasklet

In some scenarios, such as downloading the same file by FTP from several servers, the following tasklet repeat pattern can be useful.

Job definition

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:batch="http://www.springframework.org/schema/batch"
  xmlns:aop="http://www.springframework.org/schema/aop"
  xsi:schemaLocation="http://www.springframework.org/schema/aop
    http://www.springframework.org/schema/aop/spring-aop.xsd
    http://www.springframework.org/schema/batch
    http://www.springframework.org/schema/batch/spring-batch-2.0.xsd
    http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- Repeat job: a single step that delegates to the repeatable tasklet -->
    <batch:job id="repeatJob" job-repository="jobRepository">
        <batch:step id="repeatTasklet">
            <batch:tasklet ref="taskletRepeat" />
        </batch:step>
    </batch:job>

    <!-- Repeat tasklet definition -->
    <bean id="taskletRepeat"
        class="foo.tasklet.RepeatTasklet">
        <property name="repeatPolicy" ref="repeatPolicy" />
    </bean>

    <!-- Policy shared by the tasklet (reads the current element) and by the
         repeat template below (decides when the repetition is complete) -->
    <bean id="repeatPolicy"
        class="foo.policy.RepeatPolicy">
        <property name="elements">
            <list>
                <value>element1</value>
                <value>element2</value>
                <value>element3</value>
            </list>
        </property>
    </bean>

    <!-- AOP repeat configuration: wraps RepeatTasklet.execute(..) with the repeat advice -->
    <aop:config>
        <aop:pointcut id="transactional"
            expression="execution(* foo..RepeatTasklet.execute(..))" />
        <aop:advisor pointcut-ref="transactional"
            advice-ref="repeatAdvice"
            order="-1"/>
    </aop:config>

    <bean id="repeatAdvice"
        class="org.springframework.batch.repeat.interceptor.RepeatOperationsInterceptor">
        <property name="repeatOperations">
            <bean class="org.springframework.batch.repeat.support.RepeatTemplate">
                <property name="completionPolicy" ref="repeatPolicy" />
            </bean>
        </property>
    </bean>

</beans>

Repeat Tasklet

import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;

import es.eds.springbatch.test.repeat.policy.RepeatPolicy;

/**
 * Tasklet that prints the element currently exposed by its {@link RepeatPolicy}.
 *
 * <p>The policy is supplied through {@link #setRepeatPolicy(RepeatPolicy)}; this
 * class only reads {@link RepeatPolicy#getCurrentElement()} and never modifies
 * the policy itself.
 */
public class RepeatTasklet implements Tasklet {

  /** Policy queried for the current element on every execution. */
  private RepeatPolicy repeatPolicy;

  /**
   * @return the policy this tasklet reads the current element from
   */
  public RepeatPolicy getRepeatPolicy() {
    return this.repeatPolicy;
  }

  /**
   * @param repeatPolicy the policy this tasklet reads the current element from
   */
  public void setRepeatPolicy(RepeatPolicy repeatPolicy) {
    this.repeatPolicy = repeatPolicy;
  }

  /**
   * Writes the policy's current element to standard output.
   *
   * @param stepContribution not used by this implementation
   * @param chunkContext not used by this implementation
   * @return always {@link RepeatStatus#FINISHED}
   * @throws Exception declared by the {@link Tasklet} contract
   */
  public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext)
      throws Exception {
    final String current = repeatPolicy.getCurrentElement();
    System.out.println("Current element: " + current);
    return RepeatStatus.FINISHED;
  }

}

Repeat Policy

import java.util.List;
import org.springframework.batch.repeat.CompletionPolicy;
import org.springframework.batch.repeat.RepeatContext;
import org.springframework.batch.repeat.RepeatStatus;

public class RepeatPolicy implements CompletionPolicy {

  private List<string> elements;
  private String currentElement;

  public boolean isComplete(RepeatContext repeatContext) {
    return elements.size() == 0;
  }

  public boolean isComplete(RepeatContext repeatContext,
      RepeatStatus repeatStatus) {
    return elements.size() == 0;
  }

  public RepeatContext start(RepeatContext repeatContext) {
    return repeatContext;
  }

  public void update(RepeatContext repeatContext) {
    currentElement = elements.remove(0);
    }

  public List</string><string> getElements() {
    return elements;
  }

  public void setElements(List</string><string> elements) {
    this.elements = elements;
  }

  public String getCurrentElement() {
    return currentElement;
  }

}

Spring Batch 2.0: Retry a tasklet

Retry strategies in Spring Batch 2.0 allow a chunk to be reprocessed when certain failures occur. However, there is no core support for retrying Tasklet steps, so AOP techniques must be used.

Both retry patterns (chunk and tasklet) are shown below.

Chunk retry: retry three times if some exception occurs on “chunk” execution


    
        
            
                java.lang.Exception
            
        
    

Tasklet retry: retry three times if some exception occurs on “tasklet” execution


    





    
    



  
    
      
        
          
          
	            
              java.lang.Exception
            
          
        
      
    
  


Spring Batch 2.0: Current Resource of a MultiResourceItemReader

[UPDATE. This feature is available in Spring Batch 2.0.1. Thanks for your interest, Dan. So the following code is just an exercise in reflection and dirty access to private methods and fields.]

MultiResourceItemReader class reads items from multiple resources sequentially.

A classic job configuration using this feature is shown below.




    
        
            
                
            
        
    

    

    


  

  
     <!-- Configuration omitted -->
  

  
     <!-- Configuration omitted -->
  


Note. “job”, “step”, “tasklet” and “chunk” must appear as “batch:job”, … in the listing above, but the WordPress sourcecode plugin refuses to display them that way.

In some scenarios it would be useful to know the resource file name at processing time, in order to perform mappings or writes that depend on it. However, the Spring Batch API offers no way to access this information.

So, MultiResourceItemReader must be modified (in a somewhat dirty way) to share the resource name through the Step Context.

import java.lang.reflect.Field;
import java.util.List;

import org.apache.log4j.Logger;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.file.MultiResourceItemReader;
import org.springframework.core.io.Resource;

/**
 * {@link MultiResourceItemReader} that publishes the resource being read into
 * the {@link ExecutionContext} under the key {@code "current.resource.name"}.
 *
 * <p>The parent keeps its resource array and index in private fields, so they
 * are read through reflection. Any failure while doing so is logged and
 * otherwise ignored: the reader keeps working, the key is just not refreshed.
 *
 * <p>NOTE(review): the item type argument was lost in the published listing;
 * {@code List<String>} is a reconstruction — confirm against the configured
 * delegate reader.
 */
public class CustomMultiResourceItemReader extends MultiResourceItemReader<List<String>> {

  private static final Logger log = Logger.getLogger(CustomMultiResourceItemReader.class.getName());

  /** Execution-context key under which the current resource is published. */
  private static final String CURRENT_RESOURCE_KEY = "current.resource.name";

  /**
   * Opens the reader and publishes the file name of the first resource.
   *
   * <p>NOTE(review): this stores {@code getFilename()} while
   * {@link #update(ExecutionContext)} stores {@code getFile().getPath()}, so the
   * value switches from a bare name to a path after the first update — confirm
   * which form consumers expect.
   *
   * @param executionContext context that receives the resource name
   * @throws ItemStreamException if the parent reader fails to open
   */
  @Override
  public void open(ExecutionContext executionContext) throws ItemStreamException {

    super.open(executionContext);

    try {

      Resource[] resources = (Resource[]) readPrivateField(
          MultiResourceItemReader.class, this, "resources");

      // Nothing to publish when no resources are configured; avoids logging a
      // spurious ArrayIndexOutOfBoundsException.
      if (resources != null && resources.length > 0) {
        executionContext.put(CURRENT_RESOURCE_KEY, resources[0].getFilename());
      }

    } catch (Exception e) {
      // Best effort: publishing the name must never break the read itself.
      log.error(e.getMessage(), e);
    }
  }

  /**
   * Updates the parent state and publishes the path of the resource the
   * parent's index currently points to, or {@code null} once the index has
   * moved past the last resource.
   *
   * @param executionContext context that receives the resource path
   * @throws ItemStreamException if the parent reader fails to update
   */
  @Override
  public void update(ExecutionContext executionContext) throws ItemStreamException {

    super.update(executionContext);

    try {

      Object index = readPrivateField(MultiResourceItemReader.class, this, "index");
      Integer currentResource = (Integer) readPrivateField(
          index.getClass(), index, "currentResource");
      Resource[] resources = (Resource[]) readPrivateField(
          MultiResourceItemReader.class, this, "resources");

      if (currentResource < resources.length) {
        executionContext.put(CURRENT_RESOURCE_KEY, resources[currentResource].getFile().getPath());
      } else {
        executionContext.put(CURRENT_RESOURCE_KEY, null);
      }

    } catch (Exception e) {
      // Best effort: publishing the path must never break the read itself.
      log.error(e.getMessage(), e);
    }
  }

  /**
   * Reads a private field declared on {@code owner} from {@code target}.
   *
   * @param owner class that declares the field
   * @param target instance to read the field from
   * @param fieldName name of the declared field
   * @return the field's current value
   * @throws NoSuchFieldException if {@code owner} declares no such field
   * @throws IllegalAccessException if the field cannot be read
   */
  private static Object readPrivateField(Class<?> owner, Object target, String fieldName)
      throws NoSuchFieldException, IllegalAccessException {
    Field field = owner.getDeclaredField(fieldName);
    field.setAccessible(true);
    return field.get(target);
  }

}

Note that this modification may only work with Spring Batch 2.0, because it accesses private fields and methods of core classes.