英文:
python recursive fx cross rates lookup?
问题
给定一个字典中的汇率,例如:
rates = {
'BTC/USDT' : 30000, <-- 路径 A, 步骤 1
'ETH/USDT' : 1875,
'BTC/ETH' : 16, <-- 路径 B, 步骤 1
'DOGE/ETH' : 0.0000342, <-- 路径 B, 步骤 2
'DOGE/USDC' : 0.06267, <-- 路径 B, 步骤 3
'USDT/USDC' : 1.0005 <-- 路径 A, 步骤 2
}
如何递归地计算外汇交叉汇率?
def calc_fx(base_ccy: str, quote_ccy: str, rates: Dict[str, float]) -> float:
if not any([ x for x in rates if x.split('/')[0]==base_ccy or x.split('/')[-1]==base_ccy or x.split('/')[0]==quote_ccy or x.split('/')[-1]==quote_ccy]):
return None
if [ x for x in rates if x.split('/')[0]==base_ccy and x.split('/')[-1]==quote_ccy]:
return rates[f"{base_ccy}/{quote_ccy}"]
elif [ x for x in rates if x.split('/')[-1]==base_ccy and x.split('/')[0]==quote_ccy]:
return 1 / rates[f"{quote_ccy}/{base_ccy}"]
else:
# 如何递归查找汇率?
return None
base_ccy : str = 'BTC'
quote_ccy : str = 'USDC'
rate = calc_fx(base_ccy, quote_ccy, rates)
print(rate)
例如,BTC/USDC = ?,在字典中没有直接查找的方法。你需要通过三个步骤递归地进行计算。并找到具有最少步骤的路径?例如,路径 A 选择了两个步骤。如果没有找到路径,则返回 None。
路径 A
步骤 1 BTC/USDT 30000
步骤 2 USDT/USDC 1.0005
BTC/USDC = 30000 * 1.0005 = 30015
路径 B
步骤 1 BTC/ETH = 16
步骤 2 DOGE/ETH = 0.0000342
步骤 3 DOGE/USDC = 0.06267
BTC/USDC = 16 * (1/0.0000342) * 0.06267 = 29319.30
在上例中,选择了路径 A。
另一个对于这个广度优先搜索/深度优先搜索问题的调整是**返回所有可能的路径**。
**我在下面发布了解决方案。**
英文:
Given rates in a dict, for example:
rates = {
'BTC/USDT' : 30000, <-- path A, hop 1
'ETH/USDT' : 1875,
'BTC/ETH' : 16, <-- path B, hop 1
'DOGE/ETH' : 0.0000342, <-- path B, hop 2
'DOGE/USDC' : 0.06267, <-- path B, hop 3
'USDT/USDC' : 1.0005 <-- path A, hop 2
}
How do I calculate fx cross rates recursive?
def calc_fx(base_ccy: str, quote_ccy: str, rates: Dict[str, float]) -> float:
if not any([ x for x in rates if x.split('/')[0]==base_ccy or x.split('/')[-1]==base_ccy or x.split('/')[0]==quote_ccy or x.split('/')[-1]==quote_ccy]):
return None
if [ x for x in rates if x.split('/')[0]==base_ccy and x.split('/')[-1]==quote_ccy]:
return rates[f"{base_ccy}/{quote_ccy}"]
elif [ x for x in rates if x.split('/')[-1]==base_ccy and x.split('/')[0]==quote_ccy]:
return 1 / rates[f"{quote_ccy}/{base_ccy}"]
else:
# How to look up rates recursively?
return None
base_ccy : str = 'BTC'
quote_ccy : str = 'USDC'
rate = calc_fx(base_ccy, quote_ccy, rates)
print(rate)
For example, BTC/USDC = ?, There's no direct lookup from dictionary. You need to do this via three hops, recursively. And find the path with minumum number of hops? For example Path A two hops will be choosen. Return None if no path found.
Path A
hop 1 BTC/USDT 30000
hop 2 USDT/USDC 1.0005
BTC/USDC = 30000 * 1.0005 = 30015
Path B
hop 1 BTC/ETH = 16
hop 2 DOGE/ETH = 0.0000342
hop 3 DOGE/USDC = 0.06267
BTC/USDC = 16 * (1/0.0000342) * 0.06267 = 29319.30
In above, path A will be choosen.
Another tweak of this BFS/DFS problem is return all possible paths.
I posted Solutions below.
答案1
得分: 1
您可以使用igraph
来获取最短路径,然后计算货币:
import pandas as pd, igraph as ig, numpy as np
class Convert:
def __init__(self, rates):
df = pd.DataFrame(rates, index=[0]).melt()
d = df.variable.str.split('/', expand=True)
e = pd.concat([d, d.rename(columns={0:1, 1:0})])
self.rates = rates
self.f = e.assign(value=np.r_[df.value, 1/df.value])
self.a = pd.Categorical(e.to_numpy().flatten())
self.graph = ig.Graph(self.a.codes.reshape(e.shape))
def calc_fx(self, base_ccy, quote_ccy):
nodes = np.where(np.in1d(self.a.categories, [base_ccy, quote_ccy]))[0]
path = self.graph.get_shortest_paths(*nodes)
vals = self.a.from_codes(path, dtype=self.a.dtype).to_numpy().flatten()
return self.f.merge(pd.DataFrame(np.c_[vals[:-1], vals[1:]])).value.prod()
rates1 = {
'BTC/USDT': 30000,
'ETH/USDT': 1875,
'BTC/ETH': 16, # hop 1
'DOGE/ETH': 0.0000342, # hop 2
'DOGE/USDC': 0.06267 # hop 3
}
rates2 = {
'BTC/USDT': 30000, # <-- path A, hop 1
'ETH/USDT': 1875,
'BTC/ETH': 16, ## <-- path B, hop 1
'DOGE/ETH': 0.0000342, # <-- path B, hop 2
'DOGE/USDC': 0.06267, # <-- path B, hop 3
'USDT/USDC': 1.0005 # <-- path A, hop 2
}
Convert(rates1).calc_fx('BTC', 'USDC')
# Out[308]: 29319.29824561404
Convert(rates2).calc_fx('BTC', 'USDC')
# Out[309]: 30015.0
以上的原因是为了能够重用类中存储的值:
A = Convert(rates1)
A.calc_fx('BTC', 'USDC')
# Out[315]: 29319.29824561404
A.calc_fx('DOGE', 'USDT')
# Out[316]: 0.064125
B = Convert(rates2)
B.calc_fx('BTC', 'USDC')
# Out[317]: 30015.0
B.calc_fx('DOGE', 'USDT')
# Out[318]: 0.064125
英文:
You can use igraph
to obtain the shortest paths, and then compute the currency:
import pandas as pd, igraph as ig, numpy as np
class Convert:
def __init__(self, rates):
df = pd.DataFrame(rates, index = [0]).melt()
d = df.variable.str.split('/', expand = True)
e = pd.concat([d, d.rename(columns={0:1,1:0})])
self.rates = rates
self.f = e.assign(value = np.r_[df.value, 1/df.value])
self.a = pd.Categorical(e.to_numpy().flatten())
self.graph = ig.Graph(self.a.codes.reshape(e.shape))
def calc_fx(self,base_ccy, quote_ccy):
nodes = np.where(np.in1d(self.a.categories, [base_ccy, quote_ccy]))[0]
path = self.graph.get_shortest_paths(*nodes)
vals = self.a.from_codes(path, dtype = self.a.dtype).to_numpy().flatten()
return self.f.merge(pd.DataFrame(np.c_[vals[:-1], vals[1:]])).value.prod()
rates1 = {
'BTC/USDT' : 30000,
'ETH/USDT' : 1875,
'BTC/ETH' : 16, # hop 1
'DOGE/ETH' : 0.0000342, # hop 2
'DOGE/USDC' : 0.06267 # hop 3
}
rates2 = {
'BTC/USDT' : 30000, #<-- path A, hop 1
'ETH/USDT' : 1875,
'BTC/ETH' : 16, ##<-- path B, hop 1
'DOGE/ETH' : 0.0000342, #<-- path B, hop 2
'DOGE/USDC' : 0.06267, #<-- path B, hop 3
'USDT/USDC' : 1.0005 #<-- path A, hop 2
}
Convert(rates1).calc_fx('BTC', 'USDC')
Out[308]: 29319.29824561404
Convert(rates2).calc_fx('BTC', 'USDC')
Out[309]: 30015.0
The reason for above it to be able to resuse the values stored within the class:
A = Convert(rates1)
A.calc_fx('BTC', 'USDC')
Out[315]: 29319.29824561404
A.calc_fx('DOGE', 'USDT')
Out[316]: 0.064125
B = Convert(rates2)
B.calc_fx('BTC', 'USDC')
Out[317]: 30015.0
B.calc_fx('DOGE', 'USDT')
Out[318]: 0.064125
答案2
得分: 0
这是您提供的代码的翻译部分:
from typing import List, Dict
rates = {
'BTC/USDT': 30000,
'ETH/USDT': 1875,
'BTC/ETH': 16,
'DOGE/ETH': 0.0000342,
'DOGE/USDC': 0.06267,
'USDT/USDC': 1.0005
}
class Node():
def __init__(self, ticker: str, rate: float, parent) -> None:
self.ticker = ticker
self.rate = rate
self.children = []
if parent:
self.parent = parent
parent.children.append(self)
else:
self.parent = None
def find_root(node):
target = None
while node:
target = node
node = node.parent
return target
def trace_to_root(node):
trace = []
while node:
if node.rate != 0:
trace.append(node)
node = node.parent
return trace
def flip(ticker):
return f"{ticker.split('/')[-1]}/{ticker.split('/')[0]}"
def compute_paths(base_ccy: str, quote_ccy: str, rates: Dict[str, float], parent, solutions: List, visited: List) -> Node:
ticker = f"{base_ccy}/{quote_ccy}"
if ticker in visited or flip(ticker) in visited:
return
visited.append(f"{base_ccy}/{quote_ccy}")
visited.append(flip(f"{base_ccy}/{quote_ccy}"))
inscope_rates = [x for x in rates if x.split('/')[0] == base_ccy or x.split('/')[-1] == base_ccy or x.split('/')[0] == quote_ccy or x.split('/')[-1] == quote_ccy]
if not any(inscope_rates):
return
root = find_root(parent) if parent.parent else parent
if [x for x in inscope_rates if x.split('/')[0] == base_ccy and x.split('/')[-1] == quote_ccy]:
node = Node(f"{base_ccy}/{quote_ccy}", rates[f"{base_ccy}/{quote_ccy}"], parent)
is_final = (node.ticker.split('/')[-1] == root.ticker.split('/')[-1] or node.ticker.split('/')[-1] == root.ticker.split('/')[-1])
if is_final:
solutions.append(trace_to_root(node))
elif [x for x in inscope_rates if x.split('/')[-1] == base_ccy and x.split('/')[0] == quote_ccy]:
node = Node(f"{quote_ccy}/{base_ccy}", 1 / rates[f"{quote_ccy}/{base_ccy}"], parent)
is_final = (node.ticker.split('/')[0] == root.ticker.split('/')[-1] or node.ticker.split('/')[-1] == root.ticker.split('/')[-1])
if is_final:
solutions.append(trace_to_root(node))
else:
for pair in inscope_rates:
pair_base, pair_quote = pair.split('/')
if base_ccy == pair_base and pair not in visited:
node = Node(pair, rates[pair], parent)
visited.append(node.ticker)
visited.append(flip(node.ticker))
is_final = (node.ticker.split('/')[0] == root.ticker.split('/')[-1] or node.ticker.split('/')[-1] == root.ticker.split('/')[-1])
if not is_final:
compute_paths(pair_quote, quote_ccy, rates, node, solutions, visited)
else:
solutions.append(trace_to_root(node))
elif base_ccy == pair_quote and pair not in visited:
node = Node(pair, rates[pair], parent)
visited.append(node.ticker)
visited.append(flip(node.ticker))
is_final = (node.ticker.split('/')[0] == root.ticker.split('/')[-1] or node.ticker.split('/')[-1] == root.ticker.split('/')[-1])
if not is_final:
compute_paths(pair_base, quote_ccy, rates, node, solutions, visited)
else:
solutions.append(trace_to_root(node))
def calc_fx(path: List[Node]):
last_ticker = None
overall_fx = None
for node in reversed(path):
ticker = node.ticker
fx = node.rate
if last_ticker:
fx = fx if ticker.split('/')[0] == last_ticker.split('/')[0] else 1 / fx
overall_fx = fx if not overall_fx else fx * overall_fx
last_ticker = ticker
return overall_fx
base_ccy: str = 'BTC' # Try BTC or ETH for multi hops
quote_ccy: str = 'USDC'
root = Node(f"{base_ccy}/{quote_ccy}", 0, None)
visited = []
solutions = []
compute_paths(base_ccy, quote_ccy, rates, root, solutions, visited)
for path in solutions:
fx = calc_fx(path)
print(f"fx: {fx}, #hops: {len(path)}")
如果您需要更多帮助,请告诉我。
英文:
Here it is:
from typing import List, Dict
rates = {
'BTC/USDT' : 30000,
'ETH/USDT' : 1875,
'BTC/ETH' : 16,
'DOGE/ETH' : 0.0000342,
'DOGE/USDC' : 0.06267,
'USDT/USDC' : 1.0005
}
class Node():
def __init__(self, ticker : str, rate : float, parent) -> None:
self.ticker = ticker
self.rate = rate
self.children = []
if parent:
self.parent = parent
parent.children.append(self)
else:
self.parent = None
def find_root(node):
target = None
while node:
target = node
node = node.parent
return target
def trace_to_root(node):
trace = []
while node:
if node.rate!=0:
trace.append(node)
node = node.parent
return trace
def flip(ticker):
return f"{ticker.split('/')[-1]}/{ticker.split('/')[0]}"
def compute_paths(base_ccy: str, quote_ccy: str, rates: Dict[str, float], parent, solutions : List, visited : List) -> Node:
ticker = f"{base_ccy}/{quote_ccy}"
if ticker in visited or flip(ticker) in visited:
return
visited.append(f"{base_ccy}/{quote_ccy}")
visited.append(flip(f"{base_ccy}/{quote_ccy}"))
inscope_rates = [ x for x in rates if x.split('/')[0]==base_ccy or x.split('/')[-1]==base_ccy or x.split('/')[0]==quote_ccy or x.split('/')[-1]==quote_ccy]
if not any(inscope_rates):
return
root = find_root(parent) if parent.parent else parent
if [ x for x in inscope_rates if x.split('/')[0]==base_ccy and x.split('/')[-1]==quote_ccy]:
node = Node(f"{base_ccy}/{quote_ccy}", rates[f"{base_ccy}/{quote_ccy}"], parent)
is_final = (node.ticker.split('/')[0]==root.ticker.split('/')[-1] or node.ticker.split('/')[-1]==root.ticker.split('/')[-1])
if is_final:
solutions.append(trace_to_root(node))
elif [ x for x in inscope_rates if x.split('/')[-1]==base_ccy and x.split('/')[0]==quote_ccy]:
node = Node(f"{quote_ccy}/{base_ccy}", 1 / rates[f"{quote_ccy}/{base_ccy}"], parent)
is_final = (node.ticker.split('/')[0]==root.ticker.split('/')[-1] or node.ticker.split('/')[-1]==root.ticker.split('/')[-1])
if is_final:
solutions.append(trace_to_root(node))
else:
for pair in inscope_rates:
pair_base, pair_quote = pair.split('/')[0], pair.split('/')[-1]
if base_ccy==pair_base and pair not in visited:
node = Node(pair, rates[pair], parent)
visited.append(node.ticker)
visited.append(flip(node.ticker))
is_final = (node.ticker.split('/')[0]==root.ticker.split('/')[-1] or node.ticker.split('/')[-1]==root.ticker.split('/')[-1])
if not is_final:
compute_paths(pair_quote, quote_ccy, rates, node, solutions, visited)
else:
solutions.append(trace_to_root(node))
elif base_ccy==pair_quote and pair not in visited:
node = Node(pair, rates[pair], parent)
visited.append(node.ticker)
visited.append(flip(node.ticker))
is_final = (node.ticker.split('/')[0]==root.ticker.split('/')[-1] or node.ticker.split('/')[-1]==root.ticker.split('/')[-1])
if not is_final:
compute_paths(pair_base, quote_ccy, rates, node, solutions, visited)
else:
solutions.append(trace_to_root(node))
def calc_fx(path : List[Node]):
last_ticker = None
overall_fx = None
for node in reversed(path):
ticker = node.ticker
fx = node.rate
if last_ticker:
fx = fx if ticker.split('/')[0]==last_ticker.split('/')[0] else 1/fx
overall_fx = fx if not overall_fx else fx * overall_fx
last_ticker = ticker
return overall_fx
base_ccy : str = 'BTC' # Try BTC or ETH for multi hops
quote_ccy : str = 'USDC'
root = Node(f"{base_ccy}/{quote_ccy}", 0, None)
visited = []
solutions = []
compute_paths(base_ccy, quote_ccy, rates, root, solutions, visited)
for path in solutions:
fx = calc_fx(path)
print(f"fx: {fx}, #hops: {len(path)}")
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论