Multithread fast BufferedReader Java

huangapple go评论72阅读模式
英文:

Multithread fast BufferedReader Java

问题

你好,遇到了一个问题。
我将尝试以一般性的术语来描述它。
我需要尽快从文件中读取数据,这个文件是 .csv 格式的,并对数据进行处理。
在处理方面没有问题。但如何在读取时实现多线程呢?
我看到有几个选项。
数据的顺序并不重要。

  1. 将文件分成几个部分,同时读取。
  2. 我听说可以使用 BufferReader 并进行同步,但我找不到一个可用的示例。

我的代码

Instant start = Instant.now();
int i = 0;
try (BufferedReader reader = new BufferedReader(new FileReader("lng.csv"))) {
    String line = reader.readLine();
    while (line != null && i < 150000) {
        System.out.println(i + ") " + line);
        // 读取下一行
        line = reader.readLine();

        // 我的数据处理
        if (verifyLine(line)) {
            groupAdder(line);
        }
        else{
            System.out.println("错误的行: "+ line);
        }
        i++;               
    }
} catch (IOException e) {
    e.printStackTrace();
}

Instant end = Instant.now();
System.out.println(Duration.between(start, end));

我会很高兴为您提供解决此问题的方法。
我也非常乐意为您展示代码示例。

英文:

Hello encountered a problem.
I will try to describe it in General terms.
I need to read data FROM the file as quickly as possible .csv and process them.
There are no problems with processing. But how to make multithreading in reading.
I see several options.
The order of data is not important.

  1. Split the file into several parts and read it at the same time.
  2. I've heard that you can make a BufferReader and sync it, but I couldn't find a working example.

My code

Instant start = Instant.now();
int i = 0;
try (BufferedReader reader = new BufferedReader(new FileReader(&quot;lng.csv&quot;))) {
    String line = reader.readLine();
    while (line != null &amp;&amp; i &lt; 150000) {
        System.out.println(i + &quot;) &quot; + line);
        // read next line
        line = reader.readLine();

        //my data processing
        if (verifyLine(line)) {
            groupAdder(line);
        }
        else{
            System.out.println(&quot;Wrong line: &quot;+ line);
        }
        i++;               
    }
} catch (IOException e) {
    e.printStackTrace();
}

Instant end = Instant.now();
System.out.println(Duration.between(start, end));

I will be happy with your solutions to this problem.
I will also be very happy to look at the code examples

答案1

得分: 1

# 总体思路

一个线程读取整个文件并将处理任务提交给线程池每个提交到线程池的任务都会独立且并行地进行处理

# 可能的实现

`LineProcessingTask` 类负责处理一行

```java
public class LineProcessingTask implements Runnable {

	public static boolean verifyLine(String line) {
		return false; // 使用你的实现
	}
	
	public static void groupAdder(String s) { 
        // 使用你的实现
	}
	
	String s;
	
	public LineProcessingTask(String line) {
		s = line;
	}
	
	@Override
	public void run() {
		if (verifyLine(s)) {
			groupAdder(s);
		}
	}
}

主要方法:

public static void main(String [] args) {
	// 创建一个执行器服务,用于提交任务
	final int PARALLELISM = 4;
	ExecutorService pool = Executors.newFixedThreadPool(PARALLELISM);
	
	// 将每一行作为一个处理任务提交
	int i = 0;
	try (BufferedReader reader = new BufferedReader(new FileReader("lng.csv"))) {
	    String line = reader.readLine();
	    while (line != null && i < 150000) {
	        System.out.println(i + ") " + line);
	        pool.execute(new LineProcessingTask(line));
	        line = reader.readLine(); // 用于下一次迭代
	    }
	} catch (IOException e) {
		e.printStackTrace();
	}
		
	// 等待所有任务完成
	try {
		pool.awaitTermination(60l, TimeUnit.SECONDS);
	} catch (InterruptedException e) {
		e.printStackTrace();
		System.err.println("在分配的时间内并未完成所有任务");
		return;
	}
	
	// 程序的其余部分(处理行之后)
}

注意

根据处理的计算量大小,您可能不会体验到显著的加速。如果单个任务非常小,它们可能会被迅速完成,以至于读取文件的线程无法快速提交足够多的任务来保持 ExecutorService 中的所有线程都保持繁忙状态。这完全取决于瓶颈首先出现在哪里:是在从磁盘读取文件还是在处理读取的数据上?

您需要确保 groupAdder(String) 方法中的操作可以被多个线程并发地执行。要注意不要在该方法中创建任何瓶颈。


<details>
<summary>英文:</summary>

# General idea

One thread reads the entire file and submits &quot;processing tasks&quot; to a thread pool. Each task submitted to the thread pool is processed independently and in parallel. 

# A possible implementation

Class `LineProcessingTask` is in charge of processing one line. 

```java
public class LineProcessingTask implements Runnable {

	public static boolean verifyLine(String line) {
		return false; // Use your implementation
	}
	
	public static void groupAdder(String s) { 
        //Use your implementation
	}
	
	String s;
	
	public LineProcessingTask(String line) {
		s = line;
	}
	
	@Override
	public void run() {
		if (verifyLine(s)) {
			groupAdder(s);
		}
	}
}

Main method:

public static void main(String [] args) {
	// Create an executor service to which tasks will be submitted
	final int PARALLELISM = 4;
	ExecutorService pool = Executors.newFixedThreadPool(PARALLELISM);
	
	// Submit each line as a processing task
	int i = 0;
	try (BufferedReader reader = new BufferedReader(new FileReader(&quot;lng.csv&quot;))) {
	    String line = reader.readLine();
	    while (line != null &amp;&amp; i &lt; 150000) {
	        System.out.println(i + &quot;) &quot; + line);
	        pool.execute(new LineProcessingTask(line));
	        line = reader.readLine(); //For the next iteration
	    }
	} catch (IOException e) {
		e.printStackTrace();
	}
		
	// Wait for all the tasks to be finished
	try {
		pool.awaitTermination(60l, TimeUnit.SECONDS);
	} catch (InterruptedException e) {
		e.printStackTrace();
		System.err.println(&quot;All tasks did not complete in the allocated time&quot;);
		return;
	}
	
	//Rest of your program (after line processing)
}

Nota Bene

Depending on how computation-heavy your processing is, you may not experience significant speedups. If individual tasks are very small, they may get completed so fast that the thread that reads the file cannot submit tasks fast enough to keep all threads in the ExecutorService busy. It all depends on where your bottleneck is in the first place: was it reading the file from disk or processing the read data?

You need to make sure that what you do in method groupAdder(String) can be done by multiple threads concurrently. Be wary not to create any bottlenecks in that method.

huangapple
  • 本文由 发表于 2020年7月27日 12:10:41
  • 转载请务必保留本文链接:https://go.coder-hub.com/63108592.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定