Spring Batch partition vs parallel

Question

I need your help.
Here's the story that I want to program.

  1. Call a REST API for each of the 5 targets (stores)
    to get the total number of orderIds and the orderIds themselves, and save them to the repository.

  2. Select the total number of orderIds and the orderIds in divided portions,
    and call another REST API for each of the 5 targets (stores) to get the order list,
    using the total number of orderIds and the orderIds as parameters, and save it to the repository.

I want to know how I should do this in Spring Batch: partitioning, parallel steps, ...?
Can you help me organize the process of building it?

I am learning about partitioning and parallel steps.

Answer 1

Score: 0

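  • Define a Job with two main steps:
    Step 1: Call the first REST API for each store to get the total number of orderIds and the orderIds, and save them to the repository.
    Step 2: Call another REST API for each store, passing those orderIds, to get the order list and save it to the repository.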

  • Implement partitioning for each step to process multiple stores in parallel.
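The job shape described above (two steps that run one after another, each fanning out over the stores) can be sketched in plain Java, without Spring Batch, to make the ordering explicit. This is an illustration under assumptions only: `fetchOrderIds` and `fetchOrderList` are hypothetical stand-ins for the two REST calls, the maps stand in for the repository, and the store names are placeholders. Phase 2 starts only after every phase-1 call has returned, which is what chaining the steps with `.next(...)` achieves in Spring Batch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Plain-Java sketch (not Spring Batch) of the two-step job: each phase calls all stores
// in parallel, and phase 2 starts only after phase 1 has finished for every store.
class TwoPhaseOrderSketch {

    // Hypothetical stand-in for REST API #1: returns the orderIds of one store.
    static List<String> fetchOrderIds(String store) {
        return List.of(store + "-order1", store + "-order2", store + "-order3");
    }

    // Hypothetical stand-in for REST API #2: returns order details for the given orderIds.
    static List<String> fetchOrderList(String store, List<String> orderIds) {
        List<String> orders = new ArrayList<>();
        for (String id : orderIds) {
            orders.add("details of " + id);
        }
        return orders;
    }

    // Runs the tasks in parallel and waits until every one of them has finished.
    static void runAll(ExecutorService pool, List<Callable<Void>> tasks) {
        try {
            for (Future<Void> f : pool.invokeAll(tasks)) {
                f.get(); // rethrows a task's failure
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    static Map<String, List<String>> run(List<String> stores) {
        ExecutorService pool = Executors.newFixedThreadPool(stores.size());
        // The maps play the role of the repository.
        Map<String, List<String>> orderIdRepo = new ConcurrentHashMap<>();
        Map<String, List<String>> orderListRepo = new ConcurrentHashMap<>();
        try {
            // Phase 1 ("step 1"): one task per store.
            List<Callable<Void>> phase1 = new ArrayList<>();
            for (String store : stores) {
                phase1.add(() -> {
                    orderIdRepo.put(store, fetchOrderIds(store));
                    return null;
                });
            }
            runAll(pool, phase1);

            // Phase 2 ("step 2"): reads the orderIds saved in phase 1.
            List<Callable<Void>> phase2 = new ArrayList<>();
            for (String store : stores) {
                phase2.add(() -> {
                    orderListRepo.put(store, fetchOrderList(store, orderIdRepo.get(store)));
                    return null;
                });
            }
            runAll(pool, phase2);
        } finally {
            pool.shutdown();
        }
        return orderListRepo;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("store1", "store2", "store3", "store4", "store5")));
    }
}
```

Inside each phase the stores are independent, so they can run side by side; across phases they are not, which is why the two steps are sequential.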

Define the Job:

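import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Spring Batch 4.x style. In Spring Batch 5, JobBuilderFactory and StepBuilderFactory are
// deprecated in favor of new JobBuilder(name, jobRepository) and new StepBuilder(name, jobRepository).
@Configuration
@EnableBatchProcessing
public class OrderProcessingJobConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    // Define your reader, processor, and writer beans

    @Bean
    public Job orderProcessingJob() {
        // Step 2 needs the orderIds saved by step 1, so the steps run sequentially
        return jobBuilderFactory.get("orderProcessingJob")
                .start(fetchOrderDataStep())
                .next(getOrderListStep())
                .build();
    }

    @Bean
    public Step fetchOrderDataStep() {
        // Step 1 configuration goes here (partitioned by store)
    }

    @Bean
    public Step getOrderListStep() {
        // Step 2 configuration goes here (partitioned by store)
    }
}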

Implement partitioning for each step:

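@Bean
public Step fetchOrderDataStep() {
    return stepBuilderFactory.get("fetchOrderDataStep")
            // The first argument is the name of the worker step that runs each partition
            .partitioner("fetchOrderDataStepSlave", partitioner())
            .step(fetchOrderDataStepSlave())
            .gridSize(5) // Hint passed to Partitioner.partition(); the TaskExecutor controls the threads
            .taskExecutor(taskExecutor())
            .build();
}

@Bean
public Partitioner partitioner() {
    return new StorePartitioner(); // A custom partitioner implementation
}

// A minimal StorePartitioner (nested in the config class): one partition per store, each
// carrying the 'targetStore' key that the @StepScope reader reads. gridSize is ignored because
// the store list fixes the partition count. The store IDs are placeholders; load the real
// list from your repository or configuration.
public static class StorePartitioner implements Partitioner {

    private final List<String> stores = List.of("store1", "store2", "store3", "store4", "store5");

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Map<String, ExecutionContext> partitions = new HashMap<>();
        for (int i = 0; i < stores.size(); i++) {
            ExecutionContext context = new ExecutionContext();
            context.putString("targetStore", stores.get(i));
            partitions.put("partition" + i, context);
        }
        return partitions;
    }
}

@Bean
public Step fetchOrderDataStepSlave() {
    return stepBuilderFactory.get("fetchOrderDataStepSlave")
            .<InputType, OutputType>chunk(5) // Commit interval: write and commit every 5 items
            .reader(reader(null)) // null is replaced by late binding in the @StepScope bean
            .processor(processor())
            .writer(writer())
            .build();
}

@Bean
@StepScope
public ItemReader<InputType> reader(@Value("#{stepExecutionContext['targetStore']}") String targetStore) {
    // Implement your REST API reader for targetStore
}

@Bean
public TaskExecutor taskExecutor() {
    // Starts a new thread for every partition; no concurrency limit by default
    return new SimpleAsyncTaskExecutor();
}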
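`chunk(5)` in the worker step means the step reads items one at a time until the reader returns `null`, then hands them to the writer in lists of at most 5, one transaction per list. The plain-Java loop below imitates that contract to show what the REST reader placeholder has to do. It is a sketch, not Spring Batch's implementation (which also handles transactions, retries, and restart state), and `PagedOrderReader`, `fetchPage`, and the page size are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java imitation (not Spring Batch's implementation) of a chunk-oriented step:
// read items until the reader returns null, and write them in lists of at most chunkSize.
class ChunkSketch {

    // Hypothetical reader that pages through one store's orders.
    // read() returns one item per call and null when there is nothing left,
    // which is the end-of-data signal an ItemReader uses.
    static class PagedOrderReader {
        private final String store;
        private final int totalItems; // simulated size of the remote data set
        private final int pageSize;   // items fetched per (simulated) REST call
        private List<String> page = new ArrayList<>();
        private int indexInPage = 0;
        private int nextOffset = 0;

        PagedOrderReader(String store, int totalItems, int pageSize) {
            this.store = store;
            this.totalItems = totalItems;
            this.pageSize = pageSize;
        }

        // Stand-in for one REST call returning up to 'limit' items starting at 'offset'.
        private List<String> fetchPage(int offset, int limit) {
            List<String> result = new ArrayList<>();
            for (int i = offset; i < Math.min(offset + limit, totalItems); i++) {
                result.add(store + "-order" + i);
            }
            return result;
        }

        String read() {
            if (indexInPage == page.size()) { // current page used up: fetch the next one
                page = fetchPage(nextOffset, pageSize);
                nextOffset += page.size();
                indexInPage = 0;
                if (page.isEmpty()) {
                    return null; // end of data
                }
            }
            return page.get(indexInPage++);
        }
    }

    // Reads until null and hands items to the writer in lists of at most chunkSize.
    static void runChunkStep(PagedOrderReader reader, int chunkSize, Consumer<List<String>> writer) {
        List<String> chunk = new ArrayList<>();
        String item;
        while ((item = reader.read()) != null) {
            chunk.add(item);
            if (chunk.size() == chunkSize) {
                writer.accept(chunk);
                chunk = new ArrayList<>();
            }
        }
        if (!chunk.isEmpty()) {
            writer.accept(chunk); // final, partially filled chunk
        }
    }

    public static void main(String[] args) {
        runChunkStep(new PagedOrderReader("store1", 12, 4), 5, chunk -> System.out.println("write " + chunk));
    }
}
```

The page size of the REST calls and the chunk size are independent: here pages of 4 feed chunks of 5.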

Call another REST API to get the order list and save it to the repository.

@Bean
public Step getOrderListStep() {
    return stepBuilderFactory.get("getOrderListStep")
            // Reuses the per-store partitioner; the first argument names the worker step
            .partitioner("getOrderListStepSlave", partitioner())
            .step(getOrderListStepSlave())
            .gridSize(5) // Hint passed to Partitioner.partition(); the TaskExecutor controls the threads
            .taskExecutor(taskExecutor())
            .build();
}

@Bean
public Step getOrderListStepSlave() {
    return stepBuilderFactory.get("getOrderListStepSlave")
            .<InputType, OutputType>chunk(5)
            .reader(orderListReader(null)) // value injected from the partition's step execution context
            .processor(orderListProcessor())
            .writer(orderListWriter())
            .build();
}

@Bean
@StepScope
public ItemReader<InputType> orderListReader(@Value("#{stepExecutionContext['targetStore']}") String targetStore) {
    // Implement your REST API reader for the order list: load the orderIds saved
    // in step 1 for targetStore and pass them to the second API
}
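Both partitioned steps above reuse the same `partitioner()` bean, which (judging by its name and the `targetStore` key) produces one partition per store. If step 2 should instead divide the saved orderIds among workers (the "select ... dividedly" part of the question), the partitioner can give each partition a slice of them. The plain-Java sketch below shows only the splitting arithmetic; `OrderIdRangeSplitter` and the partition names are hypothetical, and contiguous slices assume the orderIds are sorted.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Splitting arithmetic for a hypothetical "orderId range" partitioner (plain Java).
class OrderIdRangeSplitter {

    // Returns partition name -> contiguous slice of orderIds. At most gridSize partitions,
    // never an empty one, and slice sizes differ by at most one.
    static Map<String, List<Long>> partition(List<Long> orderIds, int gridSize) {
        if (gridSize < 1) {
            throw new IllegalArgumentException("gridSize must be >= 1");
        }
        Map<String, List<Long>> partitions = new LinkedHashMap<>();
        int n = orderIds.size();
        if (n == 0) {
            return partitions; // nothing to split
        }
        int parts = Math.min(gridSize, n);
        int base = n / parts;
        int extra = n % parts; // the first 'extra' partitions get one more orderId
        int start = 0;
        for (int p = 0; p < parts; p++) {
            int size = base + (p < extra ? 1 : 0);
            partitions.put("partition" + p, new ArrayList<>(orderIds.subList(start, start + size)));
            start += size;
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<Long> ids = new ArrayList<>();
        for (long id = 1; id <= 12; id++) {
            ids.add(id);
        }
        partition(ids, 5).forEach((name, slice) -> System.out.println(name + " -> " + slice));
    }
}
```

For 12 orderIds and `gridSize(5)` this yields slices of 3, 3, 2, 2 and 2 ids. In an actual Spring Batch `Partitioner`, each slice's bounds could be stored with `ExecutionContext.putLong(...)` under keys of your choosing (for example `minId` and `maxId`) and read by a `@StepScope` reader, the same way `targetStore` is read above.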

For the new requirement mentioned in the comments:

Define a Job with two main steps (as before):
a. Step 1: Fetch order data from the REST APIs.
b. Step 2: Call another REST API to get the order list and save it to the repository.

Use a Flow to define parallel steps for each target store.

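import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.job.flow.support.SimpleFlow;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;

@Configuration
@EnableBatchProcessing
public class OrderProcessingJobConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    // Define your reader, processor, and writer beans

    @Bean
    public Job orderProcessingJob() {
        // getOrderListFlow needs the orderIds saved by fetchOrderDataFlow, so the two flows
        // run one after the other (next), not side by side; the parallelism is inside each flow.
        return jobBuilderFactory.get("orderProcessingJob")
                .start(fetchOrderDataFlow())
                .next(getOrderListFlow())
                .end()
                .build();
    }

    @Bean
    public Flow fetchOrderDataFlow() {
        // Run the five store steps in parallel; the split completes when all of them finish
        return new FlowBuilder<SimpleFlow>("fetchOrderDataFlow")
                .split(taskExecutor())
                .add(flowOf(fetchOrderDataStep1()), flowOf(fetchOrderDataStep2()), flowOf(fetchOrderDataStep3()),
                        flowOf(fetchOrderDataStep4()), flowOf(fetchOrderDataStep5()))
                .build();
    }

    @Bean
    public Flow getOrderListFlow() {
        // Run the five store steps in parallel
        return new FlowBuilder<SimpleFlow>("getOrderListFlow")
                .split(taskExecutor())
                .add(flowOf(getOrderListStep1()), flowOf(getOrderListStep2()), flowOf(getOrderListStep3()),
                        flowOf(getOrderListStep4()), flowOf(getOrderListStep5()))
                .build();
    }

    // split(...).add(...) accepts Flows, not Steps, so wrap each Step in a one-step Flow
    private Flow flowOf(Step step) {
        return new FlowBuilder<SimpleFlow>(step.getName() + "Flow").start(step).build();
    }

    @Bean
    public TaskExecutor taskExecutor() {
        return new SimpleAsyncTaskExecutor();
    }
}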
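Both versions use `SimpleAsyncTaskExecutor`, which starts a new thread for each task and, by default, places no limit on how many run at once. With five stores that is harmless, but as the store list grows every store's REST calls would run simultaneously. The plain-Java sketch below uses `Executors.newFixedThreadPool` to show what a bounded pool changes: with a pool of 2, at most two of the five simulated store calls are in flight at any moment. `BoundedPoolSketch` is a hypothetical name, and the sleep is only a stand-in for a REST call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Measures how many simulated store calls run at the same time on a fixed-size pool.
class BoundedPoolSketch {

    static int maxConcurrentCalls(int storeCount, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxSeen = new AtomicInteger();
        List<Callable<Void>> calls = new ArrayList<>();
        for (int i = 0; i < storeCount; i++) {
            calls.add(() -> {
                int now = inFlight.incrementAndGet();
                maxSeen.accumulateAndGet(now, Math::max); // remember the peak
                Thread.sleep(50); // stand-in for a REST call
                inFlight.decrementAndGet();
                return null;
            });
        }
        try {
            for (Future<Void> f : pool.invokeAll(calls)) {
                f.get(); // rethrows a task's failure
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
        return maxSeen.get();
    }

    public static void main(String[] args) {
        System.out.println("max in flight: " + maxConcurrentCalls(5, 2));
    }
}
```

In the Spring configuration, the same cap could come from a `ThreadPoolTaskExecutor` configured with `setCorePoolSize`/`setMaxPoolSize`, or from `SimpleAsyncTaskExecutor.setConcurrencyLimit(...)`; the right limit depends on how many concurrent calls the REST APIs tolerate.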

Then define a separate step bean for each target store (these methods also belong inside OrderProcessingJobConfig):

// Step 1: Fetch order data from the REST APIs
@Bean
public Step fetchOrderDataStep1() {
    return createFetchOrderDataStep("fetchOrderDataStep1", "store1");
}

@Bean
public Step fetchOrderDataStep2() {
    return createFetchOrderDataStep("fetchOrderDataStep2", "store2");
}

@Bean
public Step fetchOrderDataStep3() {
    return createFetchOrderDataStep("fetchOrderDataStep3", "store3");
}

@Bean
public Step fetchOrderDataStep4() {
    return createFetchOrderDataStep("fetchOrderDataStep4", "store4");
}

@Bean
public Step fetchOrderDataStep5() {
    return createFetchOrderDataStep("fetchOrderDataStep5", "store5");
}

private Step createFetchOrderDataStep(String stepName, String targetStore) {
    return stepBuilderFactory.get(stepName)
            .<InputType, OutputType>chunk(5)
            // Caveat: if reader(...) is the @StepScope bean from the partitioned version, it likely
            // ignores this argument and resolves targetStore from the step execution context,
            // which these non-partitioned steps never populate (so targetStore would be null).
            // Build a plain, non-step-scoped reader for targetStore here instead.
            .reader(reader(targetStore))
            .processor(processor())
            .writer(writer())
            .build();
}

// Step 2: Call another REST API to get the order list and save it to the repository

@Bean
public Step getOrderListStep1() {
    return createGetOrderListStep("getOrderListStep1", "store1");
}

@Bean
public Step getOrderListStep2() {
    return createGetOrderListStep("getOrderListStep2", "store2");
}

@Bean
public Step getOrderListStep3() {
    return createGetOrderListStep("getOrderListStep3", "store3");
}

@Bean
public Step getOrderListStep4() {
    return createGetOrderListStep("getOrderListStep4", "store4");
}

@Bean
public Step getOrderListStep5() {
    return createGetOrderListStep("getOrderListStep5", "store5");
}

private Step createGetOrderListStep(String stepName, String targetStore) {
    return stepBuilderFactory.get(stepName)
            .<InputType, OutputType>chunk(5)
            // If orderListReader(...) is the @StepScope bean, this argument is likely ignored and
            // targetStore resolves to null; use a plain, non-step-scoped reader per store here.
            .reader(orderListReader(targetStore))
            .processor(orderListProcessor())
            .writer(orderListWriter())
            .build();
}

  • Posted by huangapple on May 6, 2023, 16:34:41
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76187924.html