A short introduction to Spring Batch 2.0
Spring Batch provides a framework in order to develop batch processes in Java. Recently version 2.0 has been released. However it’s still hard to find good samples and resources in the web, so old-fashioned development (based on RTFM) must be performed.
The main idea is to build Jobs from sequential steps. Each step can be a single task or an input/output process. Following Spring philosophy, configuration can be done using XML.
At least, two XML files must be defined:
- Launch context. Job launcher, datasources and other usual Spring resources.
- Job definition. Jobs and Spring beans implementing tasks.
A batch can be launched from command line using CommandLineJobLauncher class provided by the framework. The argument “job” indicates job name and some other job parameters can be added using “key=value” pattern.
java CommandLineJobRunner job-definition.xml job date=2008/01/01
Below a basic sample (read web server log file and insert processed lines to database) is exposed.
Launch context (job-definition.xml)
< ?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:p="http://www.springframework.org/schema/p" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.5.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.5.xsd">
<import resource="classpath:module-context.xml"/>
<!-- In memory JOB Management -->
<bean id="jobLauncher" class="es.lacaixa.premia.batch.launcher.StatisticsBatchLauncher">
<property name="jobRepository" ref="jobRepository" />
</bean>
<bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
<property name="transactionManager" ref="transactionManager"/>
</bean>
<bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />
<!-- Datasource -->
<bean id="dataSource" class="org.springframework.jdbc.datasource.SingleConnectionDataSource">
<property name="driverClassName" value="${jdbc.driver}" />
<property name="url" value="${jdbc.url}" />
<property name="username" value="${jdbc.username}" />
<property name="password" value="${jdbc.password}" />
</bean>
<bean id="transactionManagerAccess" class="org.springframework.jdbc.datasource.DataSourceTransactionManager" lazy-init="true">
<property name="dataSource" ref="dataSource" />
</bean>
</beans>
Job definition (module-context.xml)
< ?xml version="1.0" encoding="UTF-8"?>
<beans :beans xmlns="http://www.springframework.org/schema/batch"
xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:p="http://www.springframework.org/schema/p"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://www.springframework.org/schema/batch
http://www.springframework.org/schema/batch/spring-batch-2.0.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-2.0.xsd">
<job id="job" job-repository="jobRepository">
<step id="stepWSLogFiles">
<tasklet>
<chunk processor="wsLogToDbTransform"
reader="flatFileWSItemReader"
writer="localDBItemWriter" commit-interval="100" />
</tasklet>
</step>
</job>
</beans><beans :bean id="localDbItemWriter" class="com.foo.PeticionDao">
<beans :property name="dataSource" ref="dataSource" />
</beans>
<beans :bean id="flatFileWSItemReader" class="org.springframework.batch.item.file.FlatFileItemReader">
<beans :property name="resource" value="${webserver.file}*" />
</beans><beans :property name="lineMapper">
</beans><beans :bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
</beans><beans :property name="lineTokenizer">
</beans><beans :bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
<beans :property name="delimiter" value=";" />
<beans :property name="names" value="date,hora,tm" />
</beans>
<beans :property name="fieldSetMapper">
</beans><beans :bean class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
<beans :property name="targetType" value="com.foo.FileLogWS" />
</beans>
<beans :bean id="wsLogToDbTransform" class="com.foo.WSLogToDbTransform" />
Oracle JDBC Driver and Spring 2.0.X Timestamp issue
Oracle JDBC Driver (ojdbc14.jar)
import org.springframework.jdbc.support.rowset.SqlRowSet;
public class QueryDAO extends JdbcDaoSupport {
public SqlRowSet loadSqlRowSet(String sSQL) {
return getJdbcTemplate().queryForRowSet(sSQL);
}
}
Using Spring JDBC in such way Timestamp fields are recovered without time information (hours, minutes and seconds set to 0). It seems some issue relative to Sun or Oracle, but there is a workaround in order to achieve desired result.
SqlRowSet rset =
jdbcTemplate.queryForRowSet("select cast(sysdate as timestamp) from dual");
while (rset.next()) {
System.out.println("-> " +
((oracle.sql.TIMESTAMP)rset.getObject(1)).timestampValue());
}
Our batch process failed last night because of this…
Spring WS 1.0 using Castor 1.1.1 marshalling (force UTF-8 encoding)
While XMLBeans performs marshalling operations using UTF-8 encoding by default, Castor XML relies on implicit Stream encoding. So, it’s mandatory to set UTF-8 encoding when using Castor marshalling on a Web Services client.
Below a Spring WS web service client sample using XWSS 3.0 is exposed. Spring’s application context (applicationContext.xml) and XWSS security policy file (wss-client-config.xml) are required (Spring WS Client).
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import javax.xml.soap.SOAPMessage;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.oxm.castor.CastorMarshaller;
import org.springframework.ws.WebServiceMessage;
import org.springframework.ws.client.core.WebServiceMessageCallback;
import org.springframework.ws.client.core.WebServiceTemplate;
import org.springframework.ws.client.core.support.WebServiceGatewaySupport;
import org.springframework.ws.soap.saaj.SaajSoapMessage;
import org.springframework.ws.soap.security.xwss.XwsSecuritySecurementException;
import org.springframework.ws.soap.security.xwss.callback.KeyStoreCallbackHandler;
import com.sun.xml.wss.ProcessingContext;
import com.sun.xml.wss.XWSSProcessor;
import com.sun.xml.wss.XWSSProcessorFactory;
import com.sun.xml.wss.XWSSecurityException;
public class WSCastorClient extends WebServiceGatewaySupport {
// XWSS supporting objects
private static XWSSProcessor cprocessor;
private KeyStoreCallbackHandler ksch;
/**
* Get the keystore
* @return
*/
public KeyStoreCallbackHandler getKsch() {
return ksch;
}
/**
* Set the keystore
* @param ksch
*/
public void setKsch(KeyStoreCallbackHandler ksch) {
this.ksch = ksch;
}
/**
* Default constructor.
*
* @param ksch Keystore
* @param castorMarshaller Castor marshaller
* @param castorUnmarshaller Castor unmarshaller
*/
public WSCastorClient(KeyStoreCallbackHandler ksch, CastorMarshaller castorMarshaller, CastorMarshaller castorUnmarshaller) {
this.ksch = ksch;
this.setMarshaller(castorMarshaller);
this.setUnmarshaller(castorUnmarshaller);
}
/**
* Invokes a web service using defaultUri
*
* @param the unnmarshalled message payload as object
*
* @return the object to be marshalled as response
*/
private Object callWs(Object input) {
WebServiceTemplate wst = getWebServiceTemplate();
wst.setMarshaller(getMarshaller());
wst.setUnmarshaller(getUnmarshaller());
Object result = wst.marshalSendAndReceive(input,
new WebServiceMessageCallback() {
public void doWithMessage(WebServiceMessage message) throws IOException {
SaajSoapMessage saajSoapMessage = (SaajSoapMessage) message;
// UTF-8 encoding
try {
ByteArrayOutputStream os = new ByteArrayOutputStream();
message.writeTo(os);
StringBuilder sm = new StringBuilder(os.toString());
os.close();
ByteArrayInputStream is = new ByteArrayInputStream(sm.toString().getBytes("UTF-8"));
SaajSoapMessage wsm = (SaajSoapMessage)getWebServiceTemplate().getMessageFactory().createWebServiceMessage(is);
saajSoapMessage.setSaajMessage(wsm.getSaajMessage());
is.close();
} catch (Exception e) {
e.printStackTrace();
}
// Do WSS signature
SOAPMessage saajMessage = saajSoapMessage.getSaajMessage();
try {
ProcessingContext context = new ProcessingContext();
context.setSOAPMessage(saajMessage);
SOAPMessage securedMessage = cprocessor.secureOutboundMessage(context);
saajSoapMessage.setSaajMessage(securedMessage);
} catch (XWSSecurityException e) {
throw new XwsSecuritySecurementException(e.getMessage());
}
}
});
return result;
}
/**
* Invoke web service
* @param opeRequest
* @param invoker
* @return
*/
public static Object invoke(Object opeRequest, Class invoker) {
// Application Context load
ApplicationContext applicationContext = new ClassPathXmlApplicationContext("applicationContext.xml", invoker);
// XWSS parameters
Resource xwssConfig = new ClassPathResource("wss-client-config.xml", invoker);
// XWSS client
WSCastorClient xwssClient = (WSCastorClient) applicationContext.getBean("wsCastorClient");
Object ret = null;
try {
// XWSS initializacion
XWSSProcessorFactory factory = XWSSProcessorFactory.newInstance();
cprocessor = factory.createProcessorForSecurityConfiguration(xwssConfig.getInputStream(), xwssClient.getKsch());
// Web service invocation
ret = xwssClient.callWs(opeRequest);
} catch (Exception e) {
e.printStackTrace();
}
// response
return ret;
}
}