Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

spring - How should I use .tasklet() / .chunk() to finish a job successfully?

I use Spring Batch to clone a table from a source database to a target database. The job is started manually from the service layer through jobLauncher, with the job parameters passed in.

With the current configuration (below), which uses .chunk(10) in the step definition, only 10 rows are cloned and the job then fails with: Caused by: java.sql.SQLException: Result set already closed.

How should I define the step so that it reads and writes the whole table from the source to the target DB?

@Configuration
@EnableBatchProcessing
public class DatasetProcessingContext {

    private static final String OVERRIDEN_BY_JOB_PARAMETER = null;
    private static final String DATASET_PROCESSING_STEP = "datasetProcessingStep";
    private static final String DATASET_PROCESSING_JOB = "datasetProcessingJob";

    public static final String SUBSYSTEM = "subsystem";
    public static final String SQL = "sql";
    public static final String SOURCE_DATASOURCE = "sourceDatasource";
    public static final String INSERT_QUERY = "insertQuery";
    public static final String TARGET_DATASOURCE = "targetDatasource";

    @Autowired
    @Qualifier(DEV_DATA_SOURCE)
    private DataSource devDataSource;

    //set of datasources

    @Autowired
    private PlatformTransactionManager transactionManager;

    @SuppressWarnings("MismatchedQueryAndUpdateOfCollection")
    @Autowired
    private Map<String, TableMessageDataRowMapper> tableMessageDataRowMappers;

    @SuppressWarnings("MismatchedQueryAndUpdateOfCollection")
    @Autowired
    private Map<String, TableMessageDataPreparedStatementSetter> messageDataPreparedStatementSetters;

    @Autowired
    private JobBuilderFactory jobsFactory;

    @Autowired
    private StepBuilderFactory stepsFactory;

    @Bean
    public JobRepository jobRepository() throws Exception {
        return new MapJobRepositoryFactoryBean(transactionManager).getObject();
    }

    @Bean
    public JobRegistry jobRegistry() {
        return new MapJobRegistry();
    }

    @Bean
    public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor() {
        JobRegistryBeanPostProcessor postProcessor = new JobRegistryBeanPostProcessor();
        postProcessor.setJobRegistry(jobRegistry());
        return postProcessor;
    }

    @Bean
    public JobLauncher jobLauncher() throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository());
        return jobLauncher;
    }

    @Bean
    public static StepScope stepScope() {
        return new StepScope();
    }

    @Bean
    @SuppressWarnings("unchecked")
    @Scope(value = "step", proxyMode = ScopedProxyMode.INTERFACES)
    public ItemStreamReader jdbcReader(@Value("#{jobParameters['" + SUBSYSTEM + "']}") String subsystem,
                                       @Value("#{jobParameters['" + SQL + "']}") String sql,
                                       @Value("#{jobParameters['" + SOURCE_DATASOURCE + "']}") String sourceDatasource) {

        JdbcCursorItemReader jdbcCursorItemReader = new JdbcCursorItemReader();
        jdbcCursorItemReader.setDataSource(getDataSourceFromEnum(TargetDataSource.valueOf(sourceDatasource)));
        jdbcCursorItemReader.setSql(sql);
        jdbcCursorItemReader.setRowMapper((RowMapper) tableMessageDataRowMappers
                .get(subsystem + TABLE_MESSAGE_DATA_ROW_MAPPER));

        return jdbcCursorItemReader;
    }

    @Bean
    @SuppressWarnings("unchecked")
    @Scope(value = "step", proxyMode = ScopedProxyMode.INTERFACES)
    public ItemWriter jdbcWriter(@Value("#{jobParameters['" + SUBSYSTEM + "']}") String subsystem,
                                 @Value("#{jobParameters['" + INSERT_QUERY + "']}") String insertQuery,
                                 @Value("#{jobParameters['" + TARGET_DATASOURCE + "']}") String targetDatasource) {

        JdbcBatchItemWriter jdbcWriter = new JdbcBatchItemWriter();
        jdbcWriter.setDataSource(getDataSourceFromEnum(TargetDataSource.valueOf(targetDatasource)));
        jdbcWriter.setSql(insertQuery);
        jdbcWriter.setItemPreparedStatementSetter(messageDataPreparedStatementSetters
                .get(subsystem + TABLE_MESSAGE_DATA_PREPARED_STATEMENT_SETTER));

        return jdbcWriter;
    }

    @Bean
    @SuppressWarnings("unchecked")
    public Step datasetProcessingStep() {

        return stepsFactory.get(DATASET_PROCESSING_STEP)
                // should I create Tasklet or chunk with some CompletionPolicy?
                .chunk(10)
                .reader(jdbcReader(OVERRIDEN_BY_JOB_PARAMETER, OVERRIDEN_BY_JOB_PARAMETER, OVERRIDEN_BY_JOB_PARAMETER))
                .writer(jdbcWriter(OVERRIDEN_BY_JOB_PARAMETER, OVERRIDEN_BY_JOB_PARAMETER, OVERRIDEN_BY_JOB_PARAMETER))
                .allowStartIfComplete(true)
                .build();
    }

    @Bean
    public Job datasetProcessingJob() {

        return jobsFactory.get(DATASET_PROCESSING_JOB).start(datasetProcessingStep()).build();
    }
}


1 Reply


Using .chunk(new DefaultResultCompletionPolicy()) in the step definition works for my case. This policy returns true from isComplete(RepeatContext context, RepeatStatus result) when the result is null (or not continuable), that is, once the ResultSet is exhausted.
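
In the step from the question, this amounts to replacing .chunk(10) with .chunk(new DefaultResultCompletionPolicy()). To illustrate why that changes where chunks end, here is a minimal plain-Java model of a chunk loop. It is only a sketch and not Spring Batch code: ChunkPolicyModel, Policy, fixedSize, untilExhausted, rows and chunkSizes are all hypothetical names invented for this example, and the real framework does considerably more (transactions, restart state, and so on).

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * Plain-Java model of a chunk-oriented loop. NOT Spring Batch code:
 * every name in this class is hypothetical and only mimics the idea
 * of a completion policy deciding where a chunk ends.
 */
class ChunkPolicyModel {

    /** Decides whether the current chunk is finished (hypothetical interface). */
    interface Policy {
        // item is the row just read, or null when the reader is exhausted;
        // sizeSoFar is the number of rows collected in the current chunk.
        boolean isComplete(Object item, int sizeSoFar);
    }

    /** Models a fixed commit interval: close the chunk after n rows or at end of input. */
    static Policy fixedSize(int n) {
        return (item, sizeSoFar) -> item == null || sizeSoFar >= n;
    }

    /** Models a result-based policy: close the chunk only when the reader returns null. */
    static Policy untilExhausted() {
        return (item, sizeSoFar) -> item == null;
    }

    /** Builds n dummy rows to stand in for a table. */
    static List<String> rows(int n) {
        List<String> result = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            result.add("row-" + i);
        }
        return result;
    }

    /** Reads all rows and returns the size of every non-empty chunk that would be written. */
    static List<Integer> chunkSizes(List<String> rows, Policy policy) {
        Iterator<String> reader = rows.iterator();
        List<Integer> sizes = new ArrayList<>();
        boolean exhausted = false;
        while (!exhausted) {
            List<String> chunk = new ArrayList<>();
            while (true) {
                String item = reader.hasNext() ? reader.next() : null;
                if (item == null) {
                    exhausted = true;      // the reader has nothing left
                } else {
                    chunk.add(item);
                }
                // Stop at end of input even if the policy would keep going.
                if (exhausted || policy.isComplete(item, chunk.size())) {
                    break;
                }
            }
            if (!chunk.isEmpty()) {
                sizes.add(chunk.size());   // one "write" per chunk
            }
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(chunkSizes(rows(25), fixedSize(10)));   // prints [10, 10, 5]
        System.out.println(chunkSizes(rows(25), untilExhausted())); // prints [25]
    }
}
```

In this model the result-based policy produces a single chunk holding every row, so the reader is never touched again after a chunk boundary. If the real step behaves the same way, the whole table is read and written as one chunk, which is probably acceptable for small tables but may need revisiting for large ones.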

