如何使 Spring Batch 步骤执行并行,可配置线程数?

huangapple go评论78阅读模式
英文:

How to make spring batch step execution parallel with configurable thread count?

问题

以下是您提供的Spring Batch应用程序的翻译部分:

SpringBatchApplication.java

package com.spbt.job.sample;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringBatchApplication {
    public static void main(String[] args) {
        SpringApplication.run(SpringBatchApplication.class, args);
    }
}

TraverseJob.java

package com.spbt.job.sample;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class TraverseJob {

    @Autowired
    protected JobBuilderFactory jobBuilderFactory;

    @Autowired
    protected StepBuilderFactory stepBuilderFactory;

    private String inputFolderPath = "/tmp/inputFolder";

    @Bean("TraverseJob")
    public Job job() {
        return jobBuilderFactory.get("TraverseJob")
                .incrementer(new RunIdIncrementer())
                .start(traverseStep())
                .build();
    }

    @Bean("TraverseStep")
    public Step traverseStep() {
        return stepBuilderFactory.get("TraverseStep")
                .tasklet(traverseJobTasklet(null))
                .build();
    }

    @Bean("TraverseJobTasklet")
    @StepScope
    public TraverseJobTasklet traverseJobTasklet(@Value("#{jobParameters[date]}") String date) {
        TraverseJobTasklet tasklet = new TraverseJobTasklet();

        tasklet.setJobDate(date);
        tasklet.setJobDirPath(inputFolderPath);

        return tasklet;
    }
}

TraverseJobTasklet.java

package com.spbt.job.sample;

import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import java.io.File;

public class TraverseJobTasklet implements Tasklet {

    private String jobDirPath;
    private String jobDate;

    @Autowired
    private RemoteFilePush remoteFilePush;

    @Override
    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
        try {
            traverseDir(new File(jobDirPath));
        } catch (Exception ex) {
            throw ex;
        }
        return RepeatStatus.FINISHED;
    }

    private void traverseDir(File filePath) throws Exception {
        try {
            File[] files = filePath.listFiles();
            if (files != null) {
                for (File file : files) {
                    String name = file.getName();
                    if (file.isDirectory()) {
                        if (remoteFilePush.isRemoteDirExist(name)) {
                            continue;
                        } else {
                            remoteFilePush.createRemoteDir(name);
                            traverseDir(file);
                        }
                    } else {
                        remoteFilePush.pushFile(file.getPath());
                    }
                }
            } else {
                throw new Exception("empty/null dir -> " + filePath.getName());
            }
        } catch (Exception ex) {
            throw ex;
        }
    }

    public String getJobDirPath() {
        return jobDirPath;
    }

    public void setJobDirPath(String jobDirPath) {
        this.jobDirPath = jobDirPath;
    }

    public String getJobDate() {
        return jobDate;
    }

    public void setJobDate(String jobDate) {
        this.jobDate = jobDate;
    }
}

RemoteFilePushLogic.java

package com.spbt.job.sample;

import org.springframework.stereotype.Component;

@Component
public class RemoteFilePush {

    public boolean isRemoteDirExist(String name) throws InterruptedException {
        boolean isRemoteDirExist = false;
        // code to check dir on remote server
        return isRemoteDirExist;
    }

    public void createRemoteDir(String name) throws InterruptedException {
        // code to create dir on remote server
    }

    public void pushFile(String path) throws InterruptedException {
        // code to push file on remote server
        System.out.println("Pushed");
    }
}

如果您需要继续翻译后续部分,请继续提供相应内容。

英文:

I am having following spring-batch application

SpringBatchApplication.java

package com.spbt.job.sample;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class SpringBatchApplication {
public static void main(String[] args) {
SpringApplication.run(SpringBatchApplication.class, args);
}
}

TraverseJob.java

package com.spbt.job.sample;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@EnableBatchProcessing
public class TraverseJob {
@Autowired
protected JobBuilderFactory jobBuilderFactory;
@Autowired
protected StepBuilderFactory stepBuilderFactory;
private String inputFolderPath = "/tmp/inputFolder";
@Bean("TraverseJob")
public Job job() {
return jobBuilderFactory.get("TraverseJob")
.incrementer(new RunIdIncrementer())
.start(traverseStep())
.build();
}
@Bean("TraverseStep")
public Step traverseStep() {
return stepBuilderFactory.get("TraverseStep")
.tasklet(traverseJobTasklet(null))
.build();
}
@Bean("TraverseJobTasklet")
@StepScope
public TraverseJobTasklet traverseJobTasklet(@Value("#{jobParameters[date]}") String date) {
TraverseJobTasklet tasklet = new TraverseJobTasklet();
tasklet.setJobDate(date);
tasklet.setJobDirPath(inputFolderPath);
return tasklet;
}
}

TraverseJobTasklet.java

package com.spbt.job.sample;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import java.io.File;
public class TraverseJobTasklet implements Tasklet {
private String jobDirPath;
private String jobDate;
@Autowired
private RemoteFilePush remoteFilePush;
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
try {
traverseDir(new File(jobDirPath));
} catch (Exception ex) {
throw ex;
}
return RepeatStatus.FINISHED;
}
private void traverseDir(File filePath) throws Exception {
try {
File[] files = filePath.listFiles();
if (files != null) {
for (File file : files) {
String name = file.getName();
if (file.isDirectory()) {
if (remoteFilePush.isRemoteDirExist(name)) {
continue;
} else {
remoteFilePush.createRemoteDir(name);
traverseDir(file);
}
} else {
remoteFilePush.pushFile(file.getPath());
}
}
} else {
throw new Exception("empty/null dir -> " + filePath.getName());
}
} catch (Exception ex) {
throw ex;
}
}
public String getJobDirPath() {
return jobDirPath;
}
public void setJobDirPath(String jobDirPath) {
this.jobDirPath = jobDirPath;
}
public String getJobDate() {
return jobDate;
}
public void setJobDate(String jobDate) {
this.jobDate = jobDate;
}
}

RemoteFilePushLogic.java

package com.spbt.job.sample;
import org.springframework.stereotype.Component;
@Component
public class RemoteFilePush {
public boolean isRemoteDirExist(String name) throws InterruptedException {
boolean isRemoteDirExist = false;
// code to check dir on remote server
return isRemoteDirExist;
}
public void createRemoteDir(String name) throws InterruptedException {
// code to create dir on remote server
}
public void pushFile(String path) throws InterruptedException {
// code to push file on remote server
System.out.println("Pushed");
}
}

I want to do parallel traversal and execution in traverseDir method in TraverseJobTasklet, by keeping my RemoteFilePush Logic intact, my inputFolderPath can contain multiple child directories each of which contains some files in it.

I have tried to follow link for spring-batch version which I am using, But its xml based and I do not seem to get how can I create multiple step out of single traverseStep I have?

答案1

得分: 1

以下是翻译好的代码部分:

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

@Configuration
@EnableBatchProcessing
public class PartitionJobSample {

    private final JobBuilderFactory jobs;
    private final StepBuilderFactory steps;

    public PartitionJobSample(JobBuilderFactory jobs, StepBuilderFactory steps) {
        this.jobs = jobs;
        this.steps = steps;
    }

    @Bean
    public Step managerStep() {
        return steps.get("masterStep")
                .partitioner(workerStep().getName(), partitioner(null))
                .step(workerStep())
                .gridSize(4)
                .taskExecutor(taskExecutor())
                .build();
    }

    @Bean
    public SimpleAsyncTaskExecutor taskExecutor() {
        return new SimpleAsyncTaskExecutor();// TODO useful for testing, use a more robust task executor in production
    }

    @Bean
    @StepScope
    public Partitioner partitioner(@Value("#{jobParameters['rootFolder']}") String rootFolder) {
        List<String> subFolders = getSubFolders(rootFolder);
        return new Partitioner() {
            @Override
            public Map<String, ExecutionContext> partition(int gridSize) {
                Map<String, ExecutionContext> map = new HashMap<>(gridSize);
                for (String folder : subFolders) {
                    ExecutionContext executionContext = new ExecutionContext();
                    executionContext.put("filePath", folder);
                    map.put("partition-for-" + folder, executionContext);
                }
                return map;
            }
        };
    }

    private List<String> getSubFolders(String rootFolder) {
        // TODO implement this
        return Arrays.asList("/data/folder1", "/data/folder2");
    }

    @Bean
    public Step workerStep() {
        return steps.get("workerStep")
                .tasklet(getTasklet(null))
                .build();
    }

    @Bean
    @StepScope
    public Tasklet getTasklet(@Value("#{stepExecutionContext['filePath']}") String filePath) {
        return new TraverseJobTasklet(filePath);
    }

    @Bean
    public Job job() {
        return jobs.get("job")
                .start(managerStep())
                .build();
    }

    public static void main(String[] args) throws Exception {
        ApplicationContext context = new AnnotationConfigApplicationContext(PartitionJobSample.class);
        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        Job job = context.getBean(Job.class);
        JobParameters jobParameters = new JobParametersBuilder()
                .addString("rootFolder", "/data")
                .toJobParameters();
        jobLauncher.run(job, jobParameters);
    }

    class TraverseJobTasklet implements Tasklet {

        private String filePath;

        public TraverseJobTasklet(String filePath) {
            this.filePath = filePath;
        }

        @Override
        public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
            // TODO call traversePath for filePath which is a sub-folder here
            System.out.println(Thread.currentThread().getName() + " processing sub-folder " + filePath);
            return RepeatStatus.FINISHED;
        }
    }

}

如果您运行这个代码,您应该会看到类似以下的输出:

SimpleAsyncTaskExecutor-2 processing sub-folder /data/folder1
SimpleAsyncTaskExecutor-1 processing sub-folder /data/folder2

根据您的情况进行适当的调整。

英文:

> input a sub-folder string path per worker step is where i am hitting wall with spring code, if you can point me to some ref. it will be helpful, most of the example on net is xml based.

Here is a quick self-contained example with Java config:

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
@Configuration
@EnableBatchProcessing
public class PartitionJobSample {
private final JobBuilderFactory jobs;
private final StepBuilderFactory steps;
public PartitionJobSample(JobBuilderFactory jobs, StepBuilderFactory steps) {
this.jobs = jobs;
this.steps = steps;
}
@Bean
public Step managerStep() {
return steps.get(&quot;masterStep&quot;)
.partitioner(workerStep().getName(), partitioner(null))
.step(workerStep())
.gridSize(4)
.taskExecutor(taskExecutor())
.build();
}
@Bean
public SimpleAsyncTaskExecutor taskExecutor() {
return new SimpleAsyncTaskExecutor();// TODO useful for testing, use a more robust task executor in production
}
@Bean
@StepScope
public Partitioner partitioner(@Value(&quot;#{jobParameters[&#39;rootFolder&#39;]}&quot;) String rootFolder) {
List&lt;String&gt; subFolders = getSubFolders(rootFolder);
return new Partitioner() {
@Override
public Map&lt;String, ExecutionContext&gt; partition(int gridSize) {
Map&lt;String, ExecutionContext&gt; map = new HashMap&lt;&gt;(gridSize);
for (String folder : subFolders) {
ExecutionContext executionContext = new ExecutionContext();
executionContext.put(&quot;filePath&quot;, folder);
map.put(&quot;partition-for-&quot; + folder, executionContext);
}
return map;
}
};
}
private List&lt;String&gt; getSubFolders(String rootFolder) {
// TODO implement this
return Arrays.asList(&quot;/data/folder1&quot;, &quot;/data/folder2&quot;);
}
@Bean
public Step workerStep() {
return steps.get(&quot;workerStep&quot;)
.tasklet(getTasklet(null))
.build();
}
@Bean
@StepScope
public Tasklet getTasklet(@Value(&quot;#{stepExecutionContext[&#39;filePath&#39;]}&quot;) String filePath) {
return new TraverseJobTasklet(filePath);
}
@Bean
public Job job() {
return jobs.get(&quot;job&quot;)
.start(managerStep())
.build();
}
public static void main(String[] args) throws Exception {
ApplicationContext context = new AnnotationConfigApplicationContext(PartitionJobSample.class);
JobLauncher jobLauncher = context.getBean(JobLauncher.class);
Job job = context.getBean(Job.class);
JobParameters jobParameters = new JobParametersBuilder()
.addString(&quot;rootFolder&quot;, &quot;/data&quot;)
.toJobParameters();
jobLauncher.run(job, jobParameters);
}
class TraverseJobTasklet implements Tasklet {
private String filePath;
public TraverseJobTasklet(String filePath) {
this.filePath = filePath;
}
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
// TODO call traversePath for filePath which is a sub-folder here
System.out.println(Thread.currentThread().getName() + &quot; processing sub-folder &quot; + filePath);
return RepeatStatus.FINISHED;
}
}
}

It passes the root directory as a job parameter and executes a partitioned step where each worker processes a sub-folder (calling your tasklet).

If you run it, you should see something like:

SimpleAsyncTaskExecutor-2 processing sub-folder /data/folder1
SimpleAsyncTaskExecutor-1 processing sub-folder /data/folder2

I will let you adapt it to your situation accordingly.

huangapple
  • 本文由 发表于 2020年10月8日 10:34:44
  • 转载请务必保留本文链接:https://go.coder-hub.com/64254978.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定