How to optimize n async concurrent calls made in Kotlin

Question

I have the following code in Kotlin and am curious whether there is a way to optimize it further, compared to the Python implementation below that I think is roughly equivalent.

Kotlin version 1.8.20-release-327 (JRE 17.0.7+7)

import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse
import java.net.URI
import kotlin.system.measureTimeMillis

import kotlinx.coroutines.*

fun main() = runBlocking {
    val timeElapsed = measureTimeMillis {
        val deferreds: List<Deferred<String>> = (1..7).map {
            async {
                val client = HttpClient.newBuilder().build();
                val request = HttpRequest.newBuilder()
                    .uri(URI.create("URL_WITH_INCREASING_PATH_PARAMETER/$it"))
                    .build();

                val response = client.send(request, HttpResponse.BodyHandlers.ofString());
                response.body()
            }
        }

        val data = deferreds.awaitAll()
        println("$data")
    }

    println("time elapsed $timeElapsed")
}

// time elapsed 2126

Python version 3.11.1

import time

import aiohttp
import asyncio

start_time = time.time()

async def get_data(session, url):
    async with session.get(url) as response:
        data = await response.json()
        return data

async def main():
    async with aiohttp.ClientSession() as session:
        tasks = []
        for number in range(1, 7):
            url = f'URL_WITH_INCREASING_PATH_PARAMETER/{number}'
            tasks.append(asyncio.ensure_future(get_data(session, url)))
        data_list = await asyncio.gather(*tasks)
        print(data_list)

asyncio.run(main())
print('time elapsed %s' % (time.time() - start_time))

# time elapsed 0.7383

Answer 1

Score: 3

This is slow for multiple reasons:

  1. You're using JDK 11's HttpClient.send() method, which is blocking. You're also using runBlocking without overriding its dispatcher, so you get a single-threaded event loop as the dispatcher. This means nothing runs in parallel here: the requests run sequentially, each one blocking in turn the single thread you're using.

  2. You're creating an entirely new HttpClient instance for every request. This is usually a bad idea, because these clients manage a bunch of resources (like thread pools), and it's better to reuse one rather than create many.
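The first point can be checked without any network access. The sketch below is only an illustration, not the asker's code: `blockingCall` and `timeOnEventLoop` are hypothetical names, `Thread.sleep` stands in for `HttpClient.send()`, and the 100 ms delay is arbitrary. It assumes kotlinx-coroutines is on the classpath.

```kotlin
import kotlin.system.measureTimeMillis
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for a blocking call such as HttpClient.send().
fun blockingCall(id: Int): String {
    Thread.sleep(100) // blocks the calling thread, just like send()
    return "response $id on ${Thread.currentThread().name}"
}

// Starts `count` coroutines inside a plain runBlocking and returns the elapsed time in ms.
fun timeOnEventLoop(count: Int): Long = measureTimeMillis {
    runBlocking {
        // async inherits runBlocking's single-threaded event loop,
        // so each blocking call has to finish before the next one starts.
        val results = (1..count).map { async { blockingCall(it) } }.awaitAll()
        results.forEach(::println)
    }
}

fun main() {
    println("elapsed: ${timeOnEventLoop(3)} ms")
}
```

Every result should report the same thread name, and the elapsed time should be no less than the sum of the individual delays, which is the sequential behaviour described in point 1.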

The first thing you can do is move the client creation out of the loop, so you reuse the same client.

Then you could use Dispatchers.IO to run these blocking IO calls like this:

fun main() = runBlocking(Dispatchers.IO) {
 ...
}

But that uses more threads than necessary, because the HTTP client most likely already has its own thread pool.

A better option is to use the actual asynchronous variant of the send method: sendAsync. It returns a CompletableFuture, which you can await in a suspending way by using CompletableFuture.await()*:
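For reference, a fuller sketch of the Dispatchers.IO approach with a single shared client could look like the following. The function name `fetchAllBlocking` and the idea of passing the base URL as a command-line argument are assumptions made for this example (the question only gives a placeholder URL), and `withContext(Dispatchers.IO)` is used here in place of `runBlocking(Dispatchers.IO)` so that the logic sits in a reusable suspending function.

```kotlin
import java.net.URI
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical helper: fetches all URLs concurrently with the blocking send().
// Each in-flight request occupies one Dispatchers.IO thread while it waits.
suspend fun fetchAllBlocking(client: HttpClient, urls: List<String>): List<String> =
    withContext(Dispatchers.IO) {
        urls.map { url ->
            async { // child coroutines inherit Dispatchers.IO from withContext
                val request = HttpRequest.newBuilder().uri(URI.create(url)).build()
                client.send(request, HttpResponse.BodyHandlers.ofString()).body()
            }
        }.awaitAll()
    }

fun main(args: Array<String>) {
    // Assumption: the base URL is supplied by the caller.
    val baseUrl = args.firstOrNull() ?: error("pass a base URL as the first argument")
    val client = HttpClient.newHttpClient() // created once and reused for every request
    val bodies = runBlocking { fetchAllBlocking(client, (1..7).map { "$baseUrl/$it" }) }
    println(bodies)
}
```

This keeps the blocking calls off the caller's thread, at the cost of one parked IO thread per request in flight.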

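import java.net.URI
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse
import kotlin.system.measureTimeMillis

import kotlinx.coroutines.*
import kotlinx.coroutines.future.await

fun main() = runBlocking {
    // One shared client, reused for every request
    val client = HttpClient.newHttpClient()
    val timeElapsed = measureTimeMillis {
        // (1..7) keeps the same path parameters as the question's code
        val deferreds = (1..7).map {
            async {
                val request = HttpRequest.newBuilder()
                    .uri(URI.create("URL_WITH_INCREASING_PATH_PARAMETER/$it"))
                    .build()

                // sendAsync returns a CompletableFuture; await() suspends instead of blocking the thread
                val response = client.sendAsync(request, HttpResponse.BodyHandlers.ofString()).await()
                response.body()
            }
        }

        val data = deferreds.awaitAll()
        println("$data")
    }

    println("time elapsed $timeElapsed")
}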

*Note: in kotlinx.coroutines versions before 1.7.0, the CompletableFuture.await() extension function lived in the kotlinx-coroutines-jdk8 module. Since 1.7.0, it is available in the core coroutines module.
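As a rough illustration of what the note means for a build, a Gradle Kotlin DSL fragment might look like this. The version numbers are only examples chosen for this sketch; in both setups the extension is imported as kotlinx.coroutines.future.await.

```kotlin
// build.gradle.kts (fragment; version numbers are illustrative)
dependencies {
    // Coroutines 1.7.0 or newer: CompletableFuture.await() ships in the core module.
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.0")

    // Coroutines older than 1.7.0: the jdk8 integration module is needed as well, e.g.
    // implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.4")
    // implementation("org.jetbrains.kotlinx:kotlinx-coroutines-jdk8:1.6.4")
}
```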

Posted by huangapple on 2023-05-07 06:27:09
When reposting, please keep the link to this article: https://go.coder-hub.com/76191444.html