Spring Batch: DB to JSON files

Question

This question might seem to be a duplicate of this one, but it is not.

My requirement is to read data from the database using JdbcPagingItemReader, apply some additional processing to each record, and then, in the writer, create a separate JSON file for each processed item, named id_of_record_json_fie.txt.

For example, if the reader reads 100 records, then 100 JSON files have to be created.

What is the best way to do this? Can we use Spring Batch for this?

Update 1:

As per @Mahmoud's answer, a tasklet can be used. I have also tried implementing a custom ItemWriter in a chunk-oriented step, and this also seems to work:

    @Override
    public void write(final List<? extends Person> persons) throws Exception {
        // Note: every item is written to the same path, so each write overwrites the previous one.
        for (Person person : persons) {
            objectMapper.writeValue(new File("D:/cp/dataTwo.json"), person);
        }
    }
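
The writer above always targets one fixed path, whereas the requirement is one file per record. A minimal sketch of deriving the path from the record id is shown here, using only the JDK so it runs on its own. The class name PerRecordFileWriter, the stand-in Person class, the "<id>.json" naming pattern, and the hand-rolled simpleJson helper are all assumptions made for illustration; in the real job the serialization would presumably stay with the objectMapper already used in the question.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Sketch only: this class, the Person stand-in and the file-name pattern are hypothetical.
class PerRecordFileWriter {

    // Minimal stand-in for the Person item used in the question.
    static class Person {
        final int id;
        final String name;

        Person(int id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    private final Path outputDir;
    private final Function<Person, String> toJson;

    PerRecordFileWriter(Path outputDir, Function<Person, String> toJson) {
        this.outputDir = outputDir;
        this.toJson = toJson;
    }

    // Derives the target path from the record id, so no two records share a file.
    Path fileFor(Person person) {
        return outputDir.resolve(person.id + ".json");
    }

    // Same shape as the write(List) method in the question: one file per item in the chunk.
    void write(List<? extends Person> persons) throws IOException {
        for (Person person : persons) {
            byte[] bytes = toJson.apply(person).getBytes(StandardCharsets.UTF_8);
            Files.write(fileFor(person), bytes);
        }
    }

    // Hand-rolled JSON for the two known fields; it escapes only backslashes and double quotes.
    static String simpleJson(Person p) {
        String escaped = p.name.replace("\\", "\\\\").replace("\"", "\\\"");
        return "{\"id\":" + p.id + ",\"name\":\"" + escaped + "\"}";
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("persons");
        PerRecordFileWriter writer = new PerRecordFileWriter(dir, PerRecordFileWriter::simpleJson);
        writer.write(Arrays.asList(new Person(1, "foo1"), new Person(2, "foo2")));
        System.out.println(new String(Files.readAllBytes(dir.resolve("1.json")), StandardCharsets.UTF_8));
    }
}
```

The only change that matters for the requirement is fileFor: because the path is computed inside the loop, reading 100 records yields 100 files regardless of the chunk size.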

Answer 1

Score: 4

Using a chunk-oriented tasklet won't work, because there will be a single item writer whose resource is set upfront and stays fixed for the entire step. A composite item writer might work, but you would need to know upfront how many distinct writers to create and configure.

The most straightforward option I see is to use a tasklet, something like:

import java.util.Collections;
import java.util.HashMap;

import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.builder.JdbcPagingItemReaderBuilder;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.builder.FlatFileItemWriterBuilder;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;

@Configuration
@EnableBatchProcessing
public class MyJob {

    @Bean
    public JdbcPagingItemReader<Person> itemReader() {
        return new JdbcPagingItemReaderBuilder<Person>()
                .name("personItemReader")
                .dataSource(dataSource())
                .beanRowMapper(Person.class)
                .selectClause("select *")
                .fromClause("from person")
                .sortKeys(new HashMap<String, Order>() {{ put("id", Order.DESCENDING);}})
                .build();
    }

    @Bean
    public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) {
        return jobs.get("job")
                .start(steps.get("step")
                        .tasklet(new MyTasklet(itemReader()))
                        .build())
                .build();
    }

    private static class MyTasklet implements Tasklet {

        private boolean readerInitialized;
        private JdbcPagingItemReader<Person> itemReader;

        public MyTasklet(JdbcPagingItemReader<Person> itemReader) {
            this.itemReader = itemReader;
        }

        @Override
        public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
            ExecutionContext executionContext = chunkContext.getStepContext().getStepExecution().getExecutionContext();
            if (!readerInitialized) {
                itemReader.open(executionContext);
                readerInitialized = true;
            }
            Person person = itemReader.read();
            if (person == null) {
                itemReader.close();
                return RepeatStatus.FINISHED;
            }
            // process the item
            process(person);
            // write the item in its own file (dynamically generated at runtime)
            write(person, executionContext);
            // save current state in execution context: in case of restart after failure, the job would resume where it left off.
            itemReader.update(executionContext);
            return RepeatStatus.CONTINUABLE;
        }

        private void process(Person person) {
            // do something with the item
        }

        private void write(Person person, ExecutionContext executionContext) throws Exception {
            FlatFileItemWriter<Person> itemWriter = new FlatFileItemWriterBuilder<Person>()
                    .resource(new FileSystemResource("person" + person.getId() + ".csv"))
                    .name("personItemWriter")
                    .delimited()
                    .names("id", "name")
                    .build();
            itemWriter.open(executionContext);
            itemWriter.write(Collections.singletonList(person));
            itemWriter.close();
        }

    }

    public static void main(String[] args) throws Exception {
        ApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class);
        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        Job job = context.getBean(Job.class);
        jobLauncher.run(job, new JobParameters());
    }

    @Bean
    public DataSource dataSource() {
        EmbeddedDatabase embeddedDatabase = new EmbeddedDatabaseBuilder()
                .setType(EmbeddedDatabaseType.H2)
                .addScript("/org/springframework/batch/core/schema-drop-h2.sql")
                .addScript("/org/springframework/batch/core/schema-h2.sql")
                .build();
        JdbcTemplate jdbcTemplate = new JdbcTemplate(embeddedDatabase);
        jdbcTemplate.execute("create table person (id int primary key, name varchar(20));");
        for (int i = 1; i <= 10; i++) {
            jdbcTemplate.execute(String.format("insert into person values (%s, 'foo%s');", i, i));
        }
        return embeddedDatabase;
    }

    static class Person {
        private int id;
        private String name;

        public Person() {
        }

        public int getId() {
            return id;
        }

        public void setId(int id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String toString() {
            return "Person{id=" + id + ", name='" + name + '\'' + '}';
        }
    }

}

This example reads 10 persons from a database table and generates 10 CSV files (person1.csv, person2.csv, etc.).
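
The tasklet handles one item per call to execute because a tasklet step keeps invoking execute until it returns RepeatStatus.FINISHED. The following is a simplified, JDK-only model of that control flow, not Spring Batch code: Status, OneItemTasklet, and runStep are hypothetical stand-ins for RepeatStatus, the tasklet, and the step, and a string list stands in for the reader.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Simplified model only: none of these types are Spring Batch classes.
class TaskletLoopSketch {

    enum Status { CONTINUABLE, FINISHED }

    // Handles at most one item per call, like the execute method of the tasklet in the answer.
    static class OneItemTasklet {
        private final Iterator<String> reader;
        final List<String> written = new ArrayList<>();

        OneItemTasklet(List<String> items) {
            this.reader = items.iterator();
        }

        Status execute() {
            if (!reader.hasNext()) {
                return Status.FINISHED;       // nothing left: tell the caller to stop
            }
            String item = reader.next();
            written.add(item + ".json");      // stands in for writing the item to its own file
            return Status.CONTINUABLE;        // ask to be called again for the next item
        }
    }

    // Models the step: keep calling execute() until it reports FINISHED.
    static int runStep(OneItemTasklet tasklet) {
        int calls = 0;
        Status status;
        do {
            status = tasklet.execute();
            calls++;
        } while (status == Status.CONTINUABLE);
        return calls;
    }

    public static void main(String[] args) {
        OneItemTasklet tasklet = new OneItemTasklet(Arrays.asList("person1", "person2", "person3"));
        int calls = runStep(tasklet);
        // prints: 4 calls, wrote [person1.json, person2.json, person3.json]
        System.out.println(calls + " calls, wrote " + tasklet.written);
    }
}
```

Three items lead to four calls: one per item plus a final call that finds the reader exhausted and returns FINISHED.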

huangapple
  • Posted on July 23, 2020, 17:04:53
  • When reposting, please keep the link to this article: https://go.coder-hub.com/63050678.html