How to apply asynchronous calls to API with Pandas apply() function

Question
I have a ~14,000 row dataframe and attempting to fill in some data into a new column by calling an API. The code below retrieves the expected response, however, it seems each iteration waits for a response to go to the next row.
Here is the function:
```python
def market_sector_des(isin):
    isin = '/isin/' + isin
    return blp.bdp(tickers=isin, flds=['market_sector_des']).iloc[0]
```
I am using xbbg to call the Bloomberg API.
The `.apply()` function returns the expected response,

```python
df['new_column'] = df['ISIN'].apply(market_sector_des)
```
but each response takes around 2 seconds, which at 14,000 lines is roughly 8 hours.
Is there any way to make this apply function asynchronous so that all requests are sent in parallel? I have seen dask as an alternative, however, I am running into issues using that as well.
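Since the per-row call spends most of its time waiting on the network, one way to overlap the requests without restructuring the data is a thread pool. A minimal sketch, assuming the per-ISIN call is thread-safe; `fetch_one` and the worker count are stand-ins, not part of the original code:

```python
from concurrent.futures import ThreadPoolExecutor

import pandas as pd

def fetch_one(isin):
    # stand-in for market_sector_des; replace with the real API call
    return 'sector_for_' + isin

df = pd.DataFrame({'ISIN': ['US0001', 'US0002', 'US0003']})

# dispatch requests concurrently; executor.map preserves input order
with ThreadPoolExecutor(max_workers=8) as executor:
    results = list(executor.map(fetch_one, df['ISIN']))

df['new_column'] = results
```

Whether this actually helps depends on whether the underlying Bloomberg session allows concurrent requests; the batch approach in the answers below sidesteps that question entirely.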
Answer 1 (Score: 1)
If the above is exactly what you want to do, then it can be achieved by creating a column containing the ticker syntax to be sent, and then passing that column as a series through blpapi:

```python
df['ISIN_NEW'] = '/isin/' + df['ISIN']
isin_new = pd.unique(df['ISIN_NEW'].dropna())
mktsec_df = blp.bdp(tickers=isin_new, flds=['market_sector_des'])
```
You can then left-join the newly created DataFrame onto your existing one, so that each row picks up its figure in the new column:

```python
newdf = pd.merge(df, mktsec_df, how='left', left_on='ISIN_NEW', right_index=True)
```
This should result in a single call, which would ideally drop the speed to less than a minute. Do let me know if this works out.
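The batch-then-merge pattern can be illustrated with stand-in data (the tickers and sector values here are made up; `mktsec_df` mimics the index that a batch `blp.bdp` call would return):

```python
import pandas as pd

df = pd.DataFrame({'ISIN': ['AA1', 'BB2', 'AA1']})
df['ISIN_NEW'] = '/isin/' + df['ISIN']

# stand-in for the single blp.bdp batch call: one row per unique ticker,
# indexed by ticker
mktsec_df = pd.DataFrame(
    {'market_sector_des': ['Govt', 'Corp']},
    index=['/isin/AA1', '/isin/BB2'],
)

# left merge of the ticker column against the result's index;
# duplicate ISINs in df each pick up the same sector
newdf = pd.merge(df, mktsec_df, how='left', left_on='ISIN_NEW', right_index=True)
```

The `how='left'` keeps every row of the original frame, so ISINs that the API did not return simply end up with `NaN` in the new column.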
Answer 2 (Score: 0)

You can use `multiprocessing` to parallelize API calls. Divide your Series into `THREADS` chunks, then run one process per chunk:

`main.py`

```python
import multiprocessing as mp
import numpy as np
import pandas as pd
from parallel_tickers import proxy_func

THREADS = mp.cpu_count() - 1

if __name__ == '__main__':
    # df = your dataframe here
    split = np.array_split(df['ISIN'], THREADS)
    with mp.Pool(THREADS) as pool:
        data = pool.map(proxy_func, split)
    df['new_column'] = pd.concat(data)
```

`parallel_tickers.py`

```python
import pandas as pd
from xbbg import blp

def market_sector_des(isin):
    isin = '/isin/' + isin
    return blp.bdp(tickers=isin, flds=['market_sector_des']).iloc[0]

def proxy_func(sr):
    # call the API for each ISIN in the chunk, keeping the chunk's index
    return pd.Series([market_sector_des(isin) for isin in sr], index=sr.index)
```

EDIT: keep the worker functions in a separate module so that `proxy_func` can be pickled and imported by the child processes.
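The chunking half of this recipe can be checked without the API: `np.array_split` keeps each chunk's original index, so `pd.concat` reassembles the results in the original row order. A small sketch with a stand-in for `proxy_func` (run sequentially here; the answer feeds the same chunks to `pool.map`):

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({'ISIN': ['A', 'B', 'C', 'D', 'E']})

def proxy_func(sr):
    # stand-in for the per-chunk API worker; keeps the chunk's index
    return pd.Series(['des_' + isin for isin in sr], index=sr.index)

split = np.array_split(df['ISIN'], 3)          # 3 chunks instead of THREADS
data = [proxy_func(chunk) for chunk in split]  # sequential stand-in for pool.map
df['new_column'] = pd.concat(data)
```

Because the concatenated result carries the original index, the assignment back into `df` aligns by label rather than by position.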