How to create ghost agents based on agents from another rank in repast4py?

Question

I am trying to create an OrderBook agent that is shared across all ranks. I can share the agent using mpi4py send and receive operations, but based on the documentation I assumed that self.context.synchronize(restore_orderbook) would create ghost agents on each rank. Instead, the program fails with KeyError: 0. The agent created on rank 0 is not copied to the other ranks as a ghost, even after calling request_agents.

Expected output:

The OrderBook is created on rank 0, and ghost copies of it are made on ranks 1, 2, and 3. Orders are then printed according to the if/else logic in the step function.

test2.py

from typing import Dict, Tuple
from mpi4py import MPI
import numpy as np
from dataclasses import dataclass

from repast4py import core, random, space, schedule, logging, parameters
from repast4py import context as ctx
import repast4py
from queue import Queue
import pandas as pd
from datetime import datetime, timedelta
import random as rnd


order_count = 1000
items = np.arange(1, 101)
quantities = np.arange(1, 11)
start_date = datetime(2023, 1, 1)
end_date = datetime(2023, 1, 3)

item_values = np.random.choice(items, size=order_count)
qty_values = np.random.choice(quantities, size=order_count)
time_deltas = np.random.randint(0, int((end_date - start_date).total_seconds()), size=order_count)
date_values = [start_date + timedelta(seconds=int(delta)) for delta in time_deltas]

data = {
    'item': item_values,
    'qty': qty_values,
    'order_datetime': date_values
}
df = pd.DataFrame(data).sort_values(by=['order_datetime'], ascending=True).reset_index(drop=True)
df['order_id'] = np.arange(1, order_count+1)
df['order_datetime'] = pd.to_datetime(df['order_datetime'])
df['tick'] = df.apply(lambda x: 1 + int((x['order_datetime'] - df['order_datetime'].min()).total_seconds()/173), axis=1)
df.to_csv('test_data.csv', index=False)


class OrderBook(core.Agent): 
    TYPE = 0
    def __init__(self, a_id: int, rank: int):
        super().__init__(id=a_id, type=OrderBook.TYPE, rank=rank)
        self.df = list(pd.read_csv('test_data.csv').to_records())
        # self.df['order_datetime'] = pd.to_datetime(self.df['order_datetime'])

    def save(self) -> Tuple:
        return (self.uid, self.df)

    def get_order(self, tick):
        return rnd.choice(self.df)

    def update(self, data):
        self.df = data 

orderbook_cache = {}


def restore_orderbook(orderbook_data: Tuple):

    uid = orderbook_data[0]
    if uid[1] == OrderBook.TYPE:
        if uid in orderbook_cache:
            ob = orderbook_cache[uid]
        else:
            ob = OrderBook(uid[0], uid[1], uid[2])
            orderbook_cache[uid] = ob

        ob.df = orderbook_data[1]
        return ob

def create_agent(agent_data):
    uid = agent_data[0]
    book = OrderBook(uid[0], uid[2])
    book.df = agent_data[1]
    return book
            
class Model:

    def __init__(self, comm: MPI.Intracomm, params: Dict):
        self.context = ctx.SharedContext(comm)
        self.rank = comm.Get_rank()
        if self.rank == 0:
            book = OrderBook(1, 0)
            self.context.add(book)
            requests = []
            print(book.uid)

        else:
            requests = [((1,0,0), 0)]

        self.context.request_agents(requests, create_agent)

        
        self.runner = schedule.init_schedule_runner(comm)
        self.runner.schedule_repeating_event(1, 1, self.step)
        self.runner.schedule_stop(params['stop.at'])
        self.runner.schedule_end_event(self.at_end)
                
        
    def step(self):
        tick = self.runner.schedule.tick
        self.context.synchronize(restore_orderbook)
        
        if self.rank == 0 and tick < 25:
            for b in self.context.agents(OrderBook.TYPE):
                order = b.get_order(tick)
                print(self.rank, order)
        elif self.rank == 1 and 25 <= tick < 50:
            for b in self.context.agents(OrderBook.TYPE):
                order = b.get_order(tick)
                print(self.rank, order)
        elif self.rank == 2 and 50 <= tick < 75:
            for b in self.context.agents(OrderBook.TYPE):
                order = b.get_order(tick)
                print(self.rank, order)
        elif self.rank == 3 and 75 <= tick < 100:
            for b in self.context.agents(OrderBook.TYPE):
                order = b.get_order(tick)
                print(self.rank, order)

    def at_end(self):
        print('simulation complete')

    def start(self):
        self.runner.execute()    
         
def run(params: Dict):
    global model
    model = Model(MPI.COMM_WORLD, params)
    model.start()


if __name__ == "__main__":
    parser = parameters.create_args_parser()
    args = parser.parse_args()
    params = parameters.init_params(args.parameters_file, args.parameters)
    run(params)

test.yaml

random.seed: 42
stop.at: 100
orders.count: 1000

Command to run

mpirun -n 4 python test2.py test.yaml

I think the ghost agents are not getting created with self.context.request_agents(requests, create_agent). Is this usage right?

Error:

Traceback (most recent call last):
  File "test2.py", line 136, in <module>
    run(params)
  File "test2.py", line 129, in run
    model.start()
  File "test2.py", line 124, in start
    self.runner.execute()
  File "/home/vinay/miniconda3/lib/python3.8/site-packages/repast4py/schedule.py", line 584, in execute
    self.schedule.execute()
  File "/home/vinay/miniconda3/lib/python3.8/site-packages/repast4py/schedule.py", line 406, in execute
    self.executing_group.execute(self.queue)
  File "/home/vinay/miniconda3/lib/python3.8/site-packages/repast4py/schedule.py", line 243, in execute
    interrupted = self.execute_evts(self.prioritized, queue)
  File "/home/vinay/miniconda3/lib/python3.8/site-packages/repast4py/schedule.py", line 223, in execute_evts
    evt()
  File "/home/vinay/miniconda3/lib/python3.8/site-packages/repast4py/schedule.py", line 60, in __call__
    self.evt()
  File "test2.py", line 108, in step
    for b in self.context.agents(OrderBook.TYPE):
  File "/home/vinay/miniconda3/lib/python3.8/site-packages/repast4py/context.py", line 358, in agents
    return self._agents_by_type[agent_type].values().__iter__()
KeyError: 0

Edit: Working solution

I replaced

for b in self.context.agents(OrderBook.TYPE):
    order = b.get_order(tick)
    print(self.rank, order)

with

b  = self.context.ghost_agent((1,0,0))
order = b.get_order(tick)
print(self.rank, order)
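
The traceback hints at why the loop fails on ranks other than 0: agents(OrderBook.TYPE) looks the type up in a dictionary of agents that are local to the rank, and a rank that only holds a ghost copy has no entry for that type. The sketch below is a toy model of that behaviour, not repast4py's implementation; ToyContext, add_ghost and the _ghosts attribute are hypothetical names used only for illustration.

```python
class ToyContext:
    """Toy stand-in for a shared context (hypothetical, not repast4py code).

    Local agents and ghost copies are stored in separate containers.
    """

    def __init__(self):
        self._agents_by_type = {}  # type id -> {uid: agent}, local agents only
        self._ghosts = {}          # uid -> agent copied from another rank

    def add(self, uid, agent):
        # uid follows the (id, type, rank) layout used in the question.
        self._agents_by_type.setdefault(uid[1], {})[uid] = agent

    def add_ghost(self, uid, agent):
        self._ghosts[uid] = agent

    def agents(self, agent_type):
        # Raises KeyError when no local agent of this type exists.
        return iter(self._agents_by_type[agent_type].values())

    def ghost_agent(self, uid):
        # Returns the ghost copy, or None if this rank has no such ghost.
        return self._ghosts.get(uid)


if __name__ == "__main__":
    rank1 = ToyContext()
    rank1.add_ghost((1, 0, 0), "order book copy")
    try:
        list(rank1.agents(0))
    except KeyError as err:
        print("KeyError:", err)
    print(rank1.ghost_agent((1, 0, 0)))
```

Under this model, iterating by type only works on the rank that owns the agent, while a lookup by uid through ghost_agent works on the ranks that requested it.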

Answer 1

Score: 1

The problem here is that all ranks must call request_agents. In your code, rank 0 is not calling it, so the code hangs waiting for rank 0 to complete the call. In MPI, this is called a collective operation. This is also mentioned in the docs for request_agents:

"This is a collective operation and all ranks must call it, regardless of whether agents are being requested by that rank. The requested agents will be automatically added as ghosts to this rank"

This is easy to overlook, though. When an MPI program hangs, it's usually something like this.
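
To make the hang concrete without needing an MPI installation, here is a small analogy that uses threads in place of ranks and threading.Barrier in place of the collective call. It is only an illustration of the "everyone must call it" rule, not MPI or repast4py code; run_ranks and skipping_rank are hypothetical names, and the timeout exists only so the demo terminates instead of hanging.

```python
import threading


def run_ranks(n_ranks, skipping_rank=None, timeout=0.5):
    """Simulate n_ranks participants that each make one collective call.

    A participant equal to skipping_rank never makes the call, which leaves
    the others waiting until the timeout breaks the barrier.
    """
    barrier = threading.Barrier(n_ranks)
    results = {}

    def rank_main(rank):
        if rank == skipping_rank:
            results[rank] = "skipped the call"
            return
        try:
            barrier.wait(timeout=timeout)  # stands in for the collective call
            results[rank] = "completed"
        except threading.BrokenBarrierError:
            # In a real MPI program there is no timeout: this is the hang.
            results[rank] = "stuck"

    threads = [threading.Thread(target=rank_main, args=(r,)) for r in range(n_ranks)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


if __name__ == "__main__":
    print(run_ranks(4))                   # every participant calls
    print(run_ranks(4, skipping_rank=0))  # participant 0 skips the call
```

When every participant makes the call, all of them complete; when one skips it, the remaining ones are left waiting on a call that can never finish.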

I tried the code with this update,

    if self.rank == 0:
        book = OrderBook(1, 0)
        self.context.add(book)
        requests = []
        # print(book.uid)

    else:
        requests = [((1,0,0), 0)]

    self.context.request_agents(requests, create_agent)

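and the request_agents call works. However, there's an argument-count issue in the posted code: the OrderBook constructor takes only 3 arguments (self, a_id, rank), but restore_orderbook calls it with 4 (self plus all three elements of the uid), whereas create_agent passes only uid[0] and uid[2].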
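
The argument-count mismatch can be reproduced without repast4py. In the sketch below, Agent is a stand-in for core.Agent that only builds the (id, type, rank) uid tuple, which is an assumption made so the example runs on its own; OrderBook mirrors the constructor signature from the question, and restore_orderbook shows one possible way to unpack the uid so that only the id and rank are passed on.

```python
class Agent:
    """Stand-in for repast4py's core.Agent (assumption: only the uid matters here)."""

    def __init__(self, id, type, rank):
        self.uid = (id, type, rank)


class OrderBook(Agent):
    TYPE = 0

    def __init__(self, a_id, rank):
        # Same signature as in the question: the type is fixed by the class.
        super().__init__(id=a_id, type=OrderBook.TYPE, rank=rank)
        self.df = []


def restore_orderbook(orderbook_data):
    """Rebuild an OrderBook from the (uid, rows) tuple returned by save()."""
    uid, rows = orderbook_data
    a_id, _agent_type, rank = uid  # the type element is not a constructor argument
    book = OrderBook(a_id, rank)
    book.df = rows
    return book


if __name__ == "__main__":
    try:
        OrderBook(1, 0, 0)  # passing all three uid elements, as the posted restore function does
    except TypeError as err:
        print("TypeError:", err)
    print(restore_orderbook(((1, 0, 0), [[1, 5]])).uid)
```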

Lastly, I'm not sure how well the pandas data frame will pickle and transfer across processes. That might work, but if you do get errors, try passing the data as a list of lists or something similar.
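
A minimal sketch of the "list of lists" idea, assuming the data crosses ranks by being pickled (as mpi4py's lowercase send and recv methods do). The helper names to_plain, to_plain_rows and round_trip are hypothetical, and the sample rows are made up for illustration.

```python
import pickle


def to_plain(value):
    """Return a built-in Python value where possible.

    NumPy scalars expose .item(), which returns the equivalent built-in value;
    anything without .item() is passed through unchanged.
    """
    return value.item() if hasattr(value, "item") else value


def to_plain_rows(records):
    """Convert row-like records (tuples, NumPy records, ...) to a list of lists."""
    return [[to_plain(v) for v in row] for row in records]


def round_trip(obj):
    """Serialize and deserialize obj the way a pickle-based transfer would."""
    return pickle.loads(pickle.dumps(obj))


if __name__ == "__main__":
    rows = to_plain_rows([(1, 5, "2023-01-01 00:00:10"), (2, 7, "2023-01-01 00:03:00")])
    print(round_trip(rows) == rows)
```

With this approach, save() would return something like (self.uid, to_plain_rows(self.df)), so only tuples, lists, numbers and strings have to be serialized.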

huangapple
  • Published on June 19, 2023, 21:11:42
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76506976.html