How thread lock should be handled to keep other threads waiting until downloading a file and then allow all threads to read the file in one go


Question


I am using an ExecutorService created with Executors.newFixedThreadPool() to run a TASK.

A TASK here is defined as downloading a file from a specific URL and saving it to the database if it doesn't exist there yet, or otherwise just reading the file from the database. So it is essentially a reader-writer problem, where any one thread of the executor's pool acts as the writer once and the others act as readers for subsequent requests.

I am using a Semaphore for this, but the issue with this approach is that the subsequent read requests happen sequentially.

If 4 TASKs hit the same URL, I need synchronization until the file is downloaded and the semaphore is released, i.e. any one of the 4 threads can acquire the lock while the other 3 wait. After the download completes, the remaining 3 threads should read the downloaded file simultaneously. In my case this last step happens sequentially, which hurts the project's performance.

With that use case in mind, the following is my sample code.

The following Runnable is passed to the ExecutorService to execute the task on the SharedObjectMT class.

class DownloadRunnable implements Runnable {
    private final SharedObjectMT sharedObject;
    private final String url;

    DownloadRunnable(SharedObjectMT sharedObject, String url) {
        this.sharedObject = sharedObject;
        this.url = url;
    }

    @Override
    public void run() {
        sharedObject.loadFile(url);
    }
}

class SharedObjectMT {
    // This HashMap acts like a ConcurrentHashMap, mapping each URL to a Semaphore.
    // Multiple threads requesting the same URL are synchronized only on their
    // corresponding Semaphore, while threads requesting different URLs run concurrently.
    private static HashMap<String, Semaphore> syncMap = new HashMap<>();
    .....

    void loadFile(String url) {

        // Let all threads enter sequentially and try to assign a Semaphore to their URL in the
        // map. If the URL has never been requested, a single Semaphore (say S1) is assigned to
        // it. All other threads with the *same request URL* use this Semaphore (S1) to
        // handle concurrency.
        synchronized (syncMap) {
            if (syncMap.get(url) == null) {
                syncMap.put(url, new Semaphore(1));
            }
        }

        Semaphore semaphore = syncMap.get(url);

        synchronized (semaphore) {
            ............
            ............
            semaphore.acquire();
            String filePath = findInDatabase(url);
            if (filePath != null) {
                semaphore.release(); // No need to hold the semaphore since the file is already downloaded.
                printStatus("Already downloaded file from url = " + url);
            } else {
                // This DownloadThread is a mock of my real project, where a third-party
                // library uses a thread to download the file.
                new DownloadThread(() -> {
                    printStatus("Download completed for url = " + url + ". Releasing semaphore.");
                    semaphore.release();
                }).start();
                .............
                .............
            }
        }
    }
}

I know that a single Semaphore can't solve this. Maybe one more Semaphore could be used to distinguish between a read lock and a write lock, or some other locking mechanism would fit better. I need some help on what to use for this kind of one-time synchronization.

Note: Please ignore any syntax errors in the code above. The actual project is in Kotlin, but since this is a basic Java multithreading problem I posted it as Java code.

Answer 1

Score: 0


I am not sure about Kotlin, but I can demonstrate in Java:

import java.io.IOException;
import java.util.HashMap;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class DownloadOrRead {
        
    //Utility method, which just generates a random String instance...
    private static String randomString(final int length) {
        String alphabet = "abcdefghijklmnopqrstuvwxyz";
        alphabet += alphabet.toUpperCase();
        alphabet += "0123456789";
        final int alphabetSize = alphabet.length();
        final char[] chars = new char[length];
        final Random rand = new Random();
        for (int i = 0; i < chars.length; ++i)
            chars[i] = alphabet.charAt(rand.nextInt(alphabetSize));
        return String.valueOf(chars);
    }
    
    public static class DownLoadCallable implements Callable<String> {
        private final String url;
        
        public DownLoadCallable(final String url) {
            this.url = Objects.requireNonNull(url);
        }
        
        @Override
        public String call() throws IOException, InterruptedException {
            
            /*Utilize url property here to download the file...
            In our case, just simulate a download delay supposedly...*/
            Thread.sleep(5000L + (long) (Math.random() * 10000L));
            
            //Return the file's local path...
            return randomString(20); //In our case, a random String of 20 characters.
        }
    }
    
    //This is the method you are looking for:
    public static String loadPath(final ExecutorService executorService, //Can be shared between calls of loadPath...
                                  final HashMap<String, Future<String>> urlToFuture, //MUST be shared between calls of loadPath!
                                  final String url) //The URL. Can be the same as a URL in a previous call of loadPath.
            throws InterruptedException, ExecutionException {
        final Future<String> future;
        synchronized (urlToFuture) {
            if (!urlToFuture.containsKey(url)) //If nowhere to be seen...
                urlToFuture.put(url, executorService.submit(new DownLoadCallable(url))); //Create a Future...
            future = urlToFuture.get(url); //Obtain the Future (new or old).
        }
        return future.get(); //Outside the synchronized block!
    }
    
    public static void main(final String[] args) {
        
        System.out.println("Creating ExecutorService...");
        final ExecutorService executorService = Executors.newFixedThreadPool(10);
        
        System.out.println("Creating shared map...");
        final HashMap<String, Future<String>> urlToFuture = new HashMap<>();
        
        System.out.println("Creating random URLs...");
        final String[] urls = new String[]{randomString(10), randomString(20), randomString(15)};
        
        try {
            System.out.println("Downloading files sequentially...");
            final Random rand = new Random();
            for (int i = 0; i < 100; ++i)
                System.out.println(loadPath(executorService, urlToFuture, urls[rand.nextInt(urls.length)]));
            
            executorService.shutdown();
            executorService.awaitTermination(10, TimeUnit.MINUTES);
        }
        catch (final InterruptedException | ExecutionException x) {
            System.err.println(x);
        }
    }
}

The whole idea is to submit Callables that handle the downloading to the ExecutorService. We then use the Futures returned by the submit method to obtain the desired result (path, file, or anything else): just call get on the corresponding Future. The only thing you need to synchronize on is the Map of URLs to Futures.

When running this test program, you will notice that the first call for each URL blocks until the file is downloaded, and subsequent calls for the same URL finish immediately (because that URL is already downloaded). We only block on each new URL that has not been downloaded yet. In this case I am using only 3 random URLs, each needing 5 to 15 seconds to complete, which gives a total running time of about 15 to 45 seconds, because we download them sequentially.

That concludes the loadPath method. In the sample code above, however, the files are downloaded sequentially. If you also need multiple Threads for downloading, you may call loadPath from many Threads, with no need for synchronization anywhere other than on the shared Map.
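
To illustrate calling loadPath from many Threads, here is a minimal sketch in which 4 tasks request the same URL at once. The class name ConcurrentCallersDemo, the downloads counter, the example URL, and the fake "/tmp/..." path are all made up for illustration. It also assumes two separate pools (one for the callers, one for the downloads), so that callers blocked in get() cannot occupy every thread the download itself needs; that split is my assumption, not something the question requires.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

class ConcurrentCallersDemo {

    // Counts how many simulated downloads actually ran (illustration only).
    static final AtomicInteger downloads = new AtomicInteger();

    // Same shape as loadPath above: only the map access is synchronized.
    static String loadPath(ExecutorService downloadPool,
                           Map<String, Future<String>> urlToFuture,
                           String url) throws InterruptedException, ExecutionException {
        final Future<String> future;
        synchronized (urlToFuture) {
            if (!urlToFuture.containsKey(url)) {
                urlToFuture.put(url, downloadPool.submit(() -> {
                    downloads.incrementAndGet();
                    Thread.sleep(200L); // Simulated download delay.
                    return "/tmp/" + url.hashCode(); // Hypothetical local path.
                }));
            }
            future = urlToFuture.get(url);
        }
        return future.get(); // Every caller for this URL waits here, outside the lock.
    }

    // Runs `callers` tasks that all request the same URL and returns the paths they got.
    static List<String> run(int callers, String url) {
        ExecutorService callerPool = Executors.newFixedThreadPool(callers);
        ExecutorService downloadPool = Executors.newFixedThreadPool(2);
        Map<String, Future<String>> urlToFuture = new HashMap<>();
        try {
            List<Future<String>> results = new ArrayList<>();
            for (int i = 0; i < callers; i++) {
                Callable<String> task = () -> loadPath(downloadPool, urlToFuture, url);
                results.add(callerPool.submit(task));
            }
            List<String> paths = new ArrayList<>();
            for (Future<String> result : results)
                paths.add(result.get());
            return paths;
        } catch (InterruptedException | ExecutionException x) {
            throw new IllegalStateException(x);
        } finally {
            callerPool.shutdown();
            downloadPool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> paths = run(4, "http://example.com/file");
        System.out.println(paths.size() + " callers, " + downloads.get() + " download(s)");
    }
}
```

All 4 callers receive the same path while only one download is submitted. Once the Future completes, the waiting callers are released together rather than one after another, which is the behaviour the question asks for.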

As one can read in another answer, invoking the get method of the same Future after the operation is complete will always yield the same object, or throw the same Exception if the operation failed. The code in this post takes advantage of that.
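
The repeated-get behaviour can be checked with a small sketch like the one below. RepeatedGetDemo and its helper methods are hypothetical names. As far as I know, with the Futures returned by the standard executors a failed task produces a fresh ExecutionException on each get call, wrapping the same underlying cause, so the sketch compares the causes rather than the wrappers.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class RepeatedGetDemo {

    // Returns true if two get() calls on one successful Future yield the very same object.
    static boolean sameResultTwice() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<String> task = () -> new String("path");
            Future<String> future = pool.submit(task);
            return future.get() == future.get(); // Reference comparison on purpose.
        } catch (InterruptedException | ExecutionException x) {
            throw new IllegalStateException(x);
        } finally {
            pool.shutdown();
        }
    }

    // Returns true if two get() calls on one failed Future report the same cause object.
    static boolean sameFailureTwice() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<String> failing = () -> {
                throw new IOException("simulated download failure");
            };
            Future<String> future = pool.submit(failing);
            Throwable first = causeOf(future);
            Throwable second = causeOf(future);
            return first != null && first == second;
        } finally {
            pool.shutdown();
        }
    }

    // Calls get() and returns the cause of the failure, or null if the task succeeded.
    static Throwable causeOf(Future<?> future) {
        try {
            future.get();
            return null;
        } catch (ExecutionException x) {
            return x.getCause();
        } catch (InterruptedException x) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(x);
        }
    }

    public static void main(String[] args) {
        System.out.println("Same result twice: " + sameResultTwice());
        System.out.println("Same failure twice: " + sameFailureTwice());
    }
}
```

One consequence worth keeping in mind: a failed Future stays in the shared Map, so later calls for that URL keep failing in the same way until the entry is removed.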

Edit 1:

Or even better, as pointed out by @drekbour in the comments, make use of computeIfAbsent and a ConcurrentHashMap for the job, like so:

import java.util.Objects;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class DownloadOrRead1 {
    
    //Utility method, which just generates a random String instance...
    private static String randomString(final int length) {
        String alphabet = "abcdefghijklmnopqrstuvwxyz";
        alphabet += alphabet.toUpperCase();
        alphabet += "0123456789";
        final int alphabetSize = alphabet.length();
        final char[] chars = new char[length];
        final Random rand = new Random();
        for (int i = 0; i < chars.length; ++i)
            chars[i] = alphabet.charAt(rand.nextInt(alphabetSize));
        return String.valueOf(chars);
    }
    
    public static class DownLoadCallable implements Callable<String> {
        private final String url;
        
        public DownLoadCallable(final String url) {
            this.url = Objects.requireNonNull(url);
        }
        
        @Override
        public String call() throws InterruptedException {
            
            System.out.println("Downloading " + url + "...");
            
            /*Utilize url property here to download the file...
            In our case, just simulate a download delay supposedly...*/
            Thread.sleep(5000L + (long) (Math.random() * 10000L));
            
            System.out.println("Downloaded " + url + '.');
            
            //Return the file's local path...
            return randomString(20); //In our case, a random String of 20 characters.
        }
    }
    
    //This is the method you are looking for:
    public static String loadPath(final ExecutorService executorService, //Can be shared between calls of loadPath...
                                  final ConcurrentHashMap<String, Future<String>> urlToFuture, //MUST be shared between calls of loadPath!
                                  final String url) //The URL. Can be the same as a URL in a previous call of loadPath.
            throws InterruptedException, ExecutionException {
        return urlToFuture.computeIfAbsent(url, url2 -> executorService.submit(new DownLoadCallable(url2))).get();
    }
    
    public static void main(final String[] args) {
        
        System.out.println("Creating ExecutorService...");
        final ExecutorService executorService = Executors.newFixedThreadPool(10);
        
        System.out.println("Creating shared Map...");
        final ConcurrentHashMap<String, Future<String>> urlToFuture = new ConcurrentHashMap<>();
        
        System.out.println("Creating random URLs...");
        final String[] urls = new String[]{randomString(10), randomString(10), randomString(10)};
        
        try {
            System.out.println("Downloading files sequentially...");
            final Random rand = new Random();
            for (int i = 0; i < 100; ++i) {
                final String url = urls[rand.nextInt(urls.length)];
                System.out.println("Path for " + url + ": " + loadPath(executorService, urlToFuture, url));
            }
            }
            
            executorService.shutdown();
            executorService.awaitTermination(10, TimeUnit.MINUTES);
        }
        catch (final InterruptedException | ExecutionException x) {
            System.err.println(x);
        }
    }
}
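
The reason computeIfAbsent is enough here is that ConcurrentHashMap performs the whole call atomically, so the mapping function runs at most once per absent key even when several threads race on it. A minimal sketch of that property follows; ComputeIfAbsentDemo, racingCalls and the "url" key are hypothetical names used only for this illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ComputeIfAbsentDemo {

    // Lets `threads` workers race on computeIfAbsent for one key and
    // returns how many times the mapping function was invoked.
    static int racingCalls(int threads) {
        ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
        AtomicInteger invocations = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            for (int i = 0; i < threads; i++) {
                pool.execute(() -> {
                    try {
                        start.await(); // Hold each worker until all tasks have been submitted.
                    } catch (InterruptedException x) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    map.computeIfAbsent("url", key -> {
                        invocations.incrementAndGet();
                        return "value-for-" + key;
                    });
                });
            }
            start.countDown(); // Release all workers at once.
            pool.shutdown();
            if (!pool.awaitTermination(10, TimeUnit.SECONDS))
                throw new IllegalStateException("workers did not finish in time");
            return invocations.get();
        } catch (InterruptedException x) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(x);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("Mapping function invocations: " + racingCalls(8));
    }
}
```

Note that in loadPath the mapping function only submits the Callable and returns its Future, so the map is blocked for that key only briefly. The slow part (the download) runs in the pool, and get is called after computeIfAbsent has returned.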

huangapple
  • Posted on 2020-08-14 03:10:34
  • When reposting, please keep the link to this article: https://go.coder-hub.com/63401768.html