How do I asynchronously call 5 random APIs in one iteration and then repeat n iterations?


Question

Basically, I have a problem with my code where I suspect that I'm running it concurrently but not in parallel. The problem with my results is that one iteration (calling 5 APIs) takes about 1 second on average, which doesn't make sense when a single API call takes only about 0.2 seconds. If the calls ran in parallel, one iteration should also take roughly 0.2 seconds. In short, it seems like I'm running them sequentially.

import concurrent.futures 
import requests 
import random
import pandas as pd
import time

# Function to determine the validity of the API response
def call_api(url):
    try:
        response = requests.get(url)
        return response.status_code == 200
    except requests.exceptions.RequestException:
        return False

# Load the DataFrame from your dataset
df = pd.read_csv(r"C:\Users\Jose.Moquiambo\Bulk Calling APIs\confidential-sales.csv")

# Number of iterations
num_iterations = 100

# Create an empty DataFrame to store the results
results_df = pd.DataFrame(columns=['Iteration', 'Pass', 'Time Taken'])

# Variables for tracking min, max, and total time
min_time = float('inf')
max_time = float('-inf')
total_time = 0

def process_iteration(iteration):
    # Get a random sample of URLs from the DataFrame
    random_urls = df['url_column'].sample(n=5).tolist()

    # Start timer
    start_time = time.time()

    # Execute API calls concurrently using ThreadPoolExecutor
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        results = executor.map(call_api, random_urls)

    # Stop timer
    end_time = time.time()

    # Calculate the time taken for this iteration
    iteration_time = end_time - start_time

    # Update min, max, and total time
    global min_time, max_time, total_time
    min_time = min(min_time, iteration_time)
    max_time = max(max_time, iteration_time)
    total_time += iteration_time

    # Check whether any API call in this iteration was unsuccessful
    passed = 'Y' if all(results) else 'N'

    # Add the iteration results to the DataFrame
    results_df.loc[iteration] = [iteration, passed, iteration_time]

# Run the iterations
for i in range(1, num_iterations + 1):
    process_iteration(i)

# Calculate the average time per iteration
avg_time = total_time / num_iterations

# Display the results DataFrame
print(results_df)

# Summary statistics
print("Minimum time taken:", min_time)
print("Maximum time taken:", max_time)
print("Average time per iteration:", avg_time)
print("Y stands for error-free response and N for invalid response")

Output:

           Iteration Pass  Time Taken
1            1    Y    1.027692
2            2    Y    1.105409
3            3    Y    0.998195
4            4    Y    1.046251
5            5    Y    1.083588
..         ...  ...         ...
96          96    Y    1.119467
97          97    Y    1.109750
98          98    Y    1.025725
99          99    Y    1.115403
100        100    Y    1.114420
[100 rows x 3 columns]
Minimum time taken: 0.9663488864898682
Maximum time taken: 1.529128074645996
Average time per iteration: 1.083704767227173
Y stands for error-free response and N for errors

The output is in the form of a DataFrame, and you can clearly see that each iteration takes about 1 second on average. That shouldn't be the case: if the calls ran in parallel, an iteration should take roughly 0.2 seconds, as I verified. So it appears to be running sequentially rather than in parallel. Here's an example of me testing the elapsed time of a single call:

response = requests.get("https://example.api.com")
print(response.elapsed.total_seconds())

Output:

0.274109
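
As a side check (not part of the original post), here is a minimal sketch that timestamps each request to see whether the five calls actually overlap; the URL list is a placeholder and should be replaced with real URLs from the dataset.

import concurrent.futures
import time
import requests

# Placeholder URLs; substitute five URLs from the real dataset.
urls = ["https://example.api.com"] * 5

t0 = time.perf_counter()

def timed_call(url):
    # Record the start and end of each request relative to t0.
    start = time.perf_counter() - t0
    try:
        ok = requests.get(url).status_code == 200
    except requests.exceptions.RequestException:
        ok = False
    return start, time.perf_counter() - t0, ok

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    spans = list(executor.map(timed_call, urls))

# If the calls overlap, all start times should be close to 0 and the last
# end time should be close to the duration of the slowest single request.
for start, end, ok in spans:
    print(f"start={start:.3f}s  end={end:.3f}s  ok={ok}")

If the start times are staggered by roughly one request's duration each, the calls are effectively running one after another rather than in parallel.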

Answer 1

Score: 2

Using a ThreadPool and its map_async functionality, you can expect good performance for your scenario.

Without access to the OP's data this answer is obviously contrived, but it serves as a valuable pattern.

This complete program uses a flat file that contains one URL per line. The test file used has 108 different URLs derived from http://www.testingmcafeesites.com/index.html.

import requests
from multiprocessing.pool import ThreadPool
from sys import stderr
from time import perf_counter

SAMPLES = '/Volumes/G-Drive/samples.txt'

def process(url: str) -> None:
    # Fetch one URL; report its elapsed time or the error it raised.
    try:
        with requests.get(url) as response:
            response.raise_for_status()
            print(url.split('/')[-1], response.elapsed.total_seconds())
    except Exception as e:
        print(e, file=stderr)

def get_urls() -> list[str]:
    # Read the sample file into a list of URLs, one per line.
    try:
        with open(SAMPLES) as samples:
            return list(map(str.rstrip, samples))
    except Exception as e:
        print(e, file=stderr)
    return []

def main(urls: list[str]) -> None:
    # Dispatch all requests to a thread pool and wait for completion.
    start = perf_counter()
    with ThreadPool() as pool:
        pool.map_async(process, urls).wait()
    print(f'Duration={perf_counter()-start:.2f}s')

if __name__ == '__main__':
    main(get_urls())

Output:

testcat_hm.html 0.300456
testcat_io.html 0.300651
testcat_cm.html 0.308631
...
testcat_we.html 0.335705
testcat_wp.html 0.335095
testrep_red.html 0.349888
Duration=2.36s

Conclusion:

The extent of parallelism is obvious when you consider that there were 108 GET requests, each taking ~0.3 s, yet the overall duration was under 2.5 s.
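
If you want to map this pattern back onto the question's setup, the following rough adaptation (mine, not part of the original answer) samples five URLs per iteration and reuses a single pool across all iterations; it assumes the question's DataFrame with a 'url_column' column.

import pandas as pd
import requests
from multiprocessing.pool import ThreadPool
from time import perf_counter

def call_api(url: str) -> bool:
    # Same validity check as in the question: True only for HTTP 200.
    try:
        return requests.get(url).status_code == 200
    except requests.exceptions.RequestException:
        return False

def run_iterations(df: pd.DataFrame, num_iterations: int = 100) -> pd.DataFrame:
    rows = []
    # One pool reused for every iteration, instead of creating a new executor each time.
    with ThreadPool(processes=5) as pool:
        for i in range(1, num_iterations + 1):
            urls = df['url_column'].sample(n=5).tolist()
            start = perf_counter()
            results = pool.map(call_api, urls)
            elapsed = perf_counter() - start
            rows.append({'Iteration': i,
                         'Pass': 'Y' if all(results) else 'N',
                         'Time Taken': elapsed})
    return pd.DataFrame(rows)

Calling run_iterations(df) with the DataFrame loaded as in the question should produce the same shape of results table.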


Answer 2

Score: 0

The requests library itself does not provide non-blocking I/O, as per its documentation.

But I suggest installing and using requests-futures; the syntax is simple:

from concurrent.futures import as_completed
from pprint import pprint
from requests_futures.sessions import FuturesSession

session = FuturesSession()

futures = [session.get(f'http://httpbin.org/get?{i}') for i in range(3)]

for future in as_completed(futures):
    resp = future.result()
    pprint({
        'url': resp.request.url,
        'content': resp.json(),
    })

That way you'll be able to gain speed on I/O-bound operations.

If you still find the execution too slow, then you may need to split your execution into multiple processes via the multiprocessing package. Its use comes with more restrictions, but it does provide parallelism outside of the GIL (Global Interpreter Lock).
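
As a rough illustration of that last suggestion (not from the original answer), a process-based version of the URL check could look like this; the URLs and the worker count are placeholders.

from multiprocessing import Pool

import requests

def check(url: str) -> bool:
    # Each worker process performs its blocking GET independently of the others.
    try:
        return requests.get(url, timeout=10).status_code == 200
    except requests.exceptions.RequestException:
        return False

if __name__ == '__main__':
    # Placeholder URLs; substitute the real ones.
    urls = [f'http://httpbin.org/get?{i}' for i in range(3)]
    with Pool(processes=4) as pool:
        results = pool.map(check, urls)
    print(results)

The if __name__ == '__main__' guard is required because multiprocessing re-imports the module in each worker process on some platforms.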

