Python递归外汇交叉汇率查找?

huangapple go评论69阅读模式
英文:

python recursive fx cross rates lookup?

问题

给定一个字典中的汇率,例如:

    # Quoted rates keyed by "BASE/QUOTE" ticker.
    # The trailing comments mark the hops of the two example paths from BTC to USDC.
    rates = {
        'BTC/USDT': 30000,       # <-- path A, hop 1
        'ETH/USDT': 1875,
        'BTC/ETH': 16,           # <-- path B, hop 1
        'DOGE/ETH': 0.0000342,   # <-- path B, hop 2
        'DOGE/USDC': 0.06267,    # <-- path B, hop 3
        'USDT/USDC': 1.0005,     # <-- path A, hop 2
    }

如何递归地计算外汇交叉汇率?

    def calc_fx(base_ccy: str, quote_ccy: str, rates: Dict[str, float]) -> float:
        """Return the base_ccy/quote_ccy rate if a single quoted pair covers it.

        A pair quoted as ``base_ccy/quote_ccy`` is returned as is; a pair quoted
        the other way round is returned as its reciprocal.  In every other case
        (including cross rates that would need several hops) the result is None.
        """
        # (ticker, first currency, last currency) for every quoted pair.
        legs = [(t, t.split('/')[0], t.split('/')[-1]) for t in rates]
        wanted = (base_ccy, quote_ccy)

        # Bail out early when neither currency is quoted anywhere.
        if not any(t for t, first, last in legs if first in wanted or last in wanted):
            return None

        ends = [(first, last) for _, first, last in legs]
        if (base_ccy, quote_ccy) in ends:
            return rates[f"{base_ccy}/{quote_ccy}"]
        if (quote_ccy, base_ccy) in ends:
            return 1 / rates[f"{quote_ccy}/{base_ccy}"]

        # How to look up rates recursively?
        return None
    
    # Example query: `rates` quotes neither BTC/USDC nor USDC/BTC directly.
    base_ccy : str = 'BTC'
    quote_ccy : str = 'USDC'
    rate = calc_fx(base_ccy, quote_ccy, rates)
    print(rate)  # None for now: calc_fx only resolves direct and inverse pairs

例如,BTC/USDC = ? 在字典中没有直接可查的汇率,你需要通过多个步骤(最多三步)递归地进行计算,并找到步骤最少的路径。例如,这里会选择只有两个步骤的路径 A。如果没有找到路径,则返回 None。

    路径 A
        步骤 1 BTC/USDT 30000
        步骤 2 USDT/USDC 1.0005
    
        BTC/USDC = 30000 * 1.0005 = 30015
    
    路径 B
        步骤 1 BTC/ETH = 16
            步骤 2 DOGE/ETH = 0.0000342
                步骤 3 DOGE/USDC = 0.06267
    
        BTC/USDC = 16 * (1/0.0000342) * 0.06267 = 29319.30

在上例中,将选择路径 A。

这个广度优先搜索/深度优先搜索问题的另一个变体是**返回所有可能的路径**。

**我在下面发布了解决方案**
英文:

Given rates in a dict, for example:

# Quoted rates keyed by "BASE/QUOTE" ticker.
# The trailing comments mark the hops of the two example paths from BTC to USDC.
rates = {
    'BTC/USDT': 30000,       # <-- path A, hop 1
    'ETH/USDT': 1875,
    'BTC/ETH': 16,           # <-- path B, hop 1
    'DOGE/ETH': 0.0000342,   # <-- path B, hop 2
    'DOGE/USDC': 0.06267,    # <-- path B, hop 3
    'USDT/USDC': 1.0005,     # <-- path A, hop 2
}

How do I calculate FX cross rates recursively?

def calc_fx(base_ccy: str, quote_ccy: str, rates: Dict[str, float]) -> float:
    """Return the base_ccy/quote_ccy rate if a single quoted pair covers it.

    A pair quoted as ``base_ccy/quote_ccy`` is returned as is; a pair quoted
    the other way round is returned as its reciprocal.  In every other case
    (including cross rates that would need several hops) the result is None.
    """
    # (ticker, first currency, last currency) for every quoted pair.
    legs = [(t, t.split('/')[0], t.split('/')[-1]) for t in rates]
    wanted = (base_ccy, quote_ccy)

    # Bail out early when neither currency is quoted anywhere.
    if not any(t for t, first, last in legs if first in wanted or last in wanted):
        return None

    ends = [(first, last) for _, first, last in legs]
    if (base_ccy, quote_ccy) in ends:
        return rates[f"{base_ccy}/{quote_ccy}"]
    if (quote_ccy, base_ccy) in ends:
        return 1 / rates[f"{quote_ccy}/{base_ccy}"]

    # How to look up rates recursively?
    return None
# Example query: `rates` quotes neither BTC/USDC nor USDC/BTC directly.
base_ccy : str = 'BTC'
quote_ccy : str = 'USDC'
rate = calc_fx(base_ccy, quote_ccy, rates)
print(rate)  # None for now: calc_fx only resolves direct and inverse pairs

For example, BTC/USDC = ? There's no direct lookup from the dictionary. You need to do this via multiple hops (up to three here), recursively, and find the path with the minimum number of hops. For example, path A, with two hops, will be chosen. Return None if no path is found.

Path A
hop 1 BTC/USDT 30000
hop 2 USDT/USDC 1.0005
BTC/USDC = 30000 * 1.0005 = 30015
Path B
hop 1 BTC/ETH = 16
hop 2 DOGE/ETH = 0.0000342
hop 3 DOGE/USDC = 0.06267
BTC/USDC = 16 * (1/0.0000342) * 0.06267 = 29319.30

In the example above, path A will be chosen.

Another tweak of this BFS/DFS problem is to return all possible paths.

I posted Solutions below.

答案1

得分: 1

您可以使用igraph来获取最短路径,然后计算货币:

import pandas as pd, igraph as ig, numpy as np
class Convert:
    """Cross-rate calculator that chains quoted pairs along the fewest-hop route.

    ``rates`` maps ``"BASE/QUOTE"`` tickers to numbers.  The constructor
    precomputes a per-direction rate table and a graph of currencies so that
    one instance can answer many ``calc_fx`` queries.
    """

    def __init__(self, rates):
        # Long format: one row per ticker, columns `variable` (ticker) and `value` (rate).
        df = pd.DataFrame(rates, index=[0]).melt()
        # Column 0 holds the base currency, column 1 the quote currency.
        d = df.variable.str.split('/', expand=True)
        # Stack a second copy with the two columns swapped, so that every
        # pair is listed once per direction.
        e = pd.concat([d, d.rename(columns={0: 1, 1: 0})])
        self.rates = rates
        # Rate per direction: the quoted rate for the original rows and its
        # reciprocal for the swapped rows.
        self.f = e.assign(value=np.r_[df.value, 1/df.value])
        # All currency names as a categorical; its integer codes are used as
        # the graph's vertex ids.
        self.a = pd.Categorical(e.to_numpy().flatten())
        self.graph = ig.Graph(self.a.codes.reshape(e.shape))

    def calc_fx(self, base_ccy, quote_ccy):
        """Return the ``base_ccy``/``quote_ccy`` rate along the fewest-hop path.

        The result is the product of the per-direction rates of every hop on
        the shortest path from ``base_ccy`` to ``quote_ccy``.

        Returns ``None`` when either currency is not quoted at all or when no
        path connects the two, and ``1.0`` when both arguments name the same
        known currency.
        """
        currencies = self.a.categories
        if base_ccy not in currencies or quote_ccy not in currencies:
            return None

        # Resolve the vertex ids in argument order.  Selecting them with a
        # membership mask would yield them in sorted category order, so a
        # query such as ('USDC', 'BTC') would be answered as BTC -> USDC.
        source = currencies.get_loc(base_ccy)
        target = currencies.get_loc(quote_ccy)
        if source == target:
            return 1.0

        # One source and one target give exactly one path (a list of vertex ids).
        path = self.graph.get_shortest_paths(source, to=target)[0]
        if not path:
            # An empty path is taken to mean the target is unreachable.
            return None

        vals = currencies.take(path).to_numpy()
        # Consecutive currencies on the path form the (from, to) hops; the
        # merge on columns 0 and 1 picks the matching directed rate for each.
        hops = pd.DataFrame(np.c_[vals[:-1], vals[1:]])
        return self.f.merge(hops).value.prod()


# rates1 has no 'USDT/USDC' quote: USDC is only quoted against DOGE, and DOGE
# only against ETH, so BTC -> USDC has to go through ETH and DOGE.
rates1 = {
    'BTC/USDT': 30000,
    'ETH/USDT': 1875,
    'BTC/ETH': 16,  # hop 1
    'DOGE/ETH': 0.0000342,  # hop 2
    'DOGE/USDC': 0.06267  # hop 3
}
# rates2 adds 'USDT/USDC', which opens the two-hop route BTC -> USDT -> USDC.
rates2 = {
        'BTC/USDT': 30000,         # <-- path A, hop 1
        'ETH/USDT': 1875,
        'BTC/ETH': 16,            ## <-- path B, hop 1
        'DOGE/ETH': 0.0000342,     # <-- path B, hop 2
        'DOGE/USDC': 0.06267,      # <-- path B, hop 3
        'USDT/USDC': 1.0005        # <-- path A, hop 2
    }

# Three-hop result (path B): 16 * (1 / 0.0000342) * 0.06267
Convert(rates1).calc_fx('BTC', 'USDC')
# Out[308]: 29319.29824561404

# Two-hop result (path A): 30000 * 1.0005
Convert(rates2).calc_fx('BTC', 'USDC')
# Out[309]: 30015.0

之所以写成类,是为了能够重用类中存储的值:

# Build each converter once, then reuse it for several currency pairs.
A = Convert(rates1)
A.calc_fx('BTC', 'USDC')
# Out[315]: 29319.29824561404

A.calc_fx('DOGE', 'USDT')
# Out[316]: 0.064125

B = Convert(rates2)
B.calc_fx('BTC', 'USDC')
# Out[317]: 30015.0

B.calc_fx('DOGE', 'USDT')
# Out[318]: 0.064125
英文:

You can use igraph to obtain the shortest paths, and then compute the currency:

import pandas as pd, igraph as ig, numpy as np
class Convert:
    """Cross-rate calculator that chains quoted pairs along the fewest-hop route.

    ``rates`` maps ``"BASE/QUOTE"`` tickers to numbers.  The constructor
    precomputes a per-direction rate table and a graph of currencies so that
    one instance can answer many ``calc_fx`` queries.
    """

    def __init__(self, rates):
        # Long format: one row per ticker, columns `variable` (ticker) and `value` (rate).
        df = pd.DataFrame(rates, index=[0]).melt()
        # Column 0 holds the base currency, column 1 the quote currency.
        d = df.variable.str.split('/', expand=True)
        # Stack a second copy with the two columns swapped, so that every
        # pair is listed once per direction.
        e = pd.concat([d, d.rename(columns={0: 1, 1: 0})])
        self.rates = rates
        # Rate per direction: the quoted rate for the original rows and its
        # reciprocal for the swapped rows.
        self.f = e.assign(value=np.r_[df.value, 1/df.value])
        # All currency names as a categorical; its integer codes are used as
        # the graph's vertex ids.
        self.a = pd.Categorical(e.to_numpy().flatten())
        self.graph = ig.Graph(self.a.codes.reshape(e.shape))

    def calc_fx(self, base_ccy, quote_ccy):
        """Return the ``base_ccy``/``quote_ccy`` rate along the fewest-hop path.

        The result is the product of the per-direction rates of every hop on
        the shortest path from ``base_ccy`` to ``quote_ccy``.

        Returns ``None`` when either currency is not quoted at all or when no
        path connects the two, and ``1.0`` when both arguments name the same
        known currency.
        """
        currencies = self.a.categories
        if base_ccy not in currencies or quote_ccy not in currencies:
            return None

        # Resolve the vertex ids in argument order.  Selecting them with a
        # membership mask would yield them in sorted category order, so a
        # query such as ('USDC', 'BTC') would be answered as BTC -> USDC.
        source = currencies.get_loc(base_ccy)
        target = currencies.get_loc(quote_ccy)
        if source == target:
            return 1.0

        # One source and one target give exactly one path (a list of vertex ids).
        path = self.graph.get_shortest_paths(source, to=target)[0]
        if not path:
            # An empty path is taken to mean the target is unreachable.
            return None

        vals = currencies.take(path).to_numpy()
        # Consecutive currencies on the path form the (from, to) hops; the
        # merge on columns 0 and 1 picks the matching directed rate for each.
        hops = pd.DataFrame(np.c_[vals[:-1], vals[1:]])
        return self.f.merge(hops).value.prod()
# rates1 has no 'USDT/USDC' quote: USDC is only quoted against DOGE, and DOGE
# only against ETH, so BTC -> USDC has to go through ETH and DOGE.
rates1 = {
    'BTC/USDT': 30000,
    'ETH/USDT': 1875,
    'BTC/ETH': 16,          # hop 1
    'DOGE/ETH': 0.0000342,  # hop 2
    'DOGE/USDC': 0.06267,   # hop 3
}
# rates2 adds 'USDT/USDC', which opens the two-hop route BTC -> USDT -> USDC.
rates2 = {
    'BTC/USDT': 30000,       # <-- path A, hop 1
    'ETH/USDT': 1875,
    'BTC/ETH': 16,           # <-- path B, hop 1
    'DOGE/ETH': 0.0000342,   # <-- path B, hop 2
    'DOGE/USDC': 0.06267,    # <-- path B, hop 3
    'USDT/USDC': 1.0005,     # <-- path A, hop 2
}

# Three-hop result (path B): 16 * (1 / 0.0000342) * 0.06267
Convert(rates1).calc_fx('BTC', 'USDC')
# Out[308]: 29319.29824561404

# Two-hop result (path A): 30000 * 1.0005
Convert(rates2).calc_fx('BTC', 'USDC')
# Out[309]: 30015.0

The reason for using a class above is to be able to reuse the values stored within it:

# Build each converter once, then reuse it for several currency pairs.
A = Convert(rates1)
A.calc_fx('BTC', 'USDC')
# Out[315]: 29319.29824561404

A.calc_fx('DOGE', 'USDT')
# Out[316]: 0.064125

B = Convert(rates2)
B.calc_fx('BTC', 'USDC')
# Out[317]: 30015.0

B.calc_fx('DOGE', 'USDT')
# Out[318]: 0.064125

答案2

得分: 0

代码如下:

from typing import List, Dict

# Quoted rates keyed by "BASE/QUOTE" ticker; the search below walks these pairs.
rates = {
    'BTC/USDT': 30000,
    'ETH/USDT': 1875,
    'BTC/ETH': 16,
    'DOGE/ETH': 0.0000342,
    'DOGE/USDC': 0.06267,
    'USDT/USDC': 1.0005
}

class Node:
    """One hop of a conversion path, linked to the hop it was reached from.

    ``ticker`` and ``rate`` are stored as given.  ``children`` lists the
    nodes that were created with this node as their parent.
    """

    def __init__(self, ticker: str, rate: float, parent) -> None:
        self.ticker = ticker
        self.rate = rate
        self.children = []
        # A falsy parent (e.g. None) marks a root node.
        self.parent = parent or None
        if self.parent is not None:
            # Register with the parent so the tree can be walked downwards too.
            self.parent.children.append(self)

def find_root(node):
    """Follow ``parent`` links upwards and return the topmost node.

    Returns ``None`` when ``node`` itself is falsy.
    """
    if not node:
        return None
    while node.parent:
        node = node.parent
    return node

def trace_to_root(node):
    """Return the nodes from ``node`` up to the root, leaf first.

    Nodes whose ``rate`` is 0 are left out.
    """
    chain = []
    while node:
        chain.append(node)
        node = node.parent
    return [hop for hop in chain if hop.rate != 0]

def flip(ticker):
    """Return ``ticker`` with its first and last '/'-separated parts swapped."""
    parts = ticker.split('/')
    first, last = parts[0], parts[-1]
    return f"{last}/{first}"

def compute_paths(base_ccy: str, quote_ccy: str, rates: Dict[str, float], parent, solutions: List, visited: List) -> None:
    """Depth-first search for conversion paths from ``base_ccy`` to ``quote_ccy``.

    Each hop taken becomes a ``Node`` under ``parent``.  Whenever a hop touches
    the quote currency of the root node's ticker, the hops from that node back
    to the root (``trace_to_root``) are appended to ``solutions``.

    Args:
        base_ccy: Currency held at this point of the search.
        quote_ccy: Currency to reach; passed on unchanged to recursive calls.
        rates: ``{"BASE/QUOTE": rate}`` quotes.
        parent: Node that new hops are attached to; must not be ``None``
            (the caller passes a placeholder root).
        solutions: Output list, extended in place with one leaf-first list of
            nodes per path found.
        visited: Tickers already tried, in both orientations; extended in
            place.  It is shared by all branches and never unwound, so a pair
            used on one branch is not tried again on another, and the result
            is not guaranteed to contain every possible path.

    Returns:
        None; results are reported through ``solutions``.
    """
    ticker = f"{base_ccy}/{quote_ccy}"
    inverse = flip(ticker)
    if ticker in visited or inverse in visited:
        return

    visited.append(ticker)
    visited.append(inverse)

    # Only pairs that involve the currency held or the currency wanted matter here.
    inscope_rates = [
        x for x in rates
        if base_ccy in (x.split('/')[0], x.split('/')[-1])
        or quote_ccy in (x.split('/')[0], x.split('/')[-1])
    ]
    if not inscope_rates:
        return

    # A hop completes a path when it touches the quote currency of the root ticker.
    target_ccy = find_root(parent).ticker.split('/')[-1]

    def reaches_target(node):
        return target_ccy in (node.ticker.split('/')[0], node.ticker.split('/')[-1])

    if ticker in rates:
        # Directly quoted pair.
        node = Node(ticker, rates[ticker], parent)
        if reaches_target(node):
            solutions.append(trace_to_root(node))

    elif inverse in rates:
        # Pair quoted the other way round.  The node keeps the quoted ticker
        # but stores the reciprocal, i.e. its rate is already oriented
        # base_ccy -> quote_ccy.
        node = Node(inverse, 1 / rates[inverse], parent)
        if reaches_target(node):
            solutions.append(trace_to_root(node))

    else:
        for pair in inscope_rates:
            # `visited` grows during the recursive calls below, so it is
            # re-checked on every iteration.
            if pair in visited:
                continue
            parts = pair.split('/')
            pair_base, pair_quote = parts[0], parts[-1]

            # Step across the pair to whichever side is not currently held.
            if base_ccy == pair_base:
                next_ccy = pair_quote
            elif base_ccy == pair_quote:
                next_ccy = pair_base
            else:
                continue  # the pair only involves quote_ccy; not usable from here

            node = Node(pair, rates[pair], parent)
            visited.append(node.ticker)
            visited.append(flip(node.ticker))
            if reaches_target(node):
                solutions.append(trace_to_root(node))
            else:
                compute_paths(next_ccy, quote_ccy, rates, node, solutions, visited)

def calc_fx(path: List["Node"]):
    """Combine the hop rates along ``path`` into one overall cross rate.

    Args:
        path: Nodes of one solution in leaf-first order, as produced by
            ``trace_to_root`` (the placeholder root is not part of it).

    Returns:
        The product of the correctly oriented hop rates, or ``None`` for an
        empty path.

    Raises:
        ValueError: If an intermediate hop's ticker does not contain the
            currency held at that point, i.e. the hops do not form a chain.

    Every hop except the last is oriented by tracking the currency currently
    held: crossing a pair from its base to its quote multiplies by the rate,
    crossing it from quote to base divides.  The last hop is multiplied as
    stored, because ``compute_paths`` creates it either from the directly
    quoted pair or with the reciprocal already applied.
    """
    if not path:
        return None

    def split(ticker):
        parts = ticker.split('/')
        return parts[0], parts[-1]

    # `path` is leaf-first; walk it in conversion order instead.
    *hops, leaf = reversed(path)

    overall_fx = 1
    if hops:
        first_base, first_quote = split(hops[0].ticker)
        following = hops[1] if len(hops) > 1 else leaf
        # The start currency is the side of the first pair that is NOT shared
        # with the hop that follows it.
        held = first_base if first_quote in split(following.ticker) else first_quote

        for node in hops:
            pair_base, pair_quote = split(node.ticker)
            if held == pair_base:
                overall_fx *= node.rate
                held = pair_quote
            elif held == pair_quote:
                overall_fx /= node.rate
                held = pair_base
            else:
                raise ValueError(f"{node.ticker} does not continue the path from {held}")

    return overall_fx * leaf.rate

base_ccy: str = 'BTC' # Try BTC or ETH for multi hops
quote_ccy: str = 'USDC'
# Placeholder root: its rate of 0 makes trace_to_root leave it out of every path.
root = Node(f"{base_ccy}/{quote_ccy}", 0, None)
visited = []
solutions = []
# Fills `solutions` (and `visited`) in place.
compute_paths(base_ccy, quote_ccy, rates, root, solutions, visited)

# Report the overall rate and hop count of every path found.
for path in solutions:
    fx = calc_fx(path)
    print(f"fx: {fx}, #hops: {len(path)}")

如果您需要更多帮助,请告诉我。

英文:

Here it is:

from typing import List, Dict
# Quoted rates keyed by "BASE/QUOTE" ticker; the search below walks these pairs.
rates = {
    'BTC/USDT': 30000,
    'ETH/USDT': 1875,
    'BTC/ETH': 16,
    'DOGE/ETH': 0.0000342,
    'DOGE/USDC': 0.06267,
    'USDT/USDC': 1.0005,
}
class Node:
    """One hop of a conversion path, linked to the hop it was reached from.

    ``ticker`` and ``rate`` are stored as given.  ``children`` lists the
    nodes that were created with this node as their parent.
    """

    def __init__(self, ticker: str, rate: float, parent) -> None:
        self.ticker = ticker
        self.rate = rate
        self.children = []
        # A falsy parent (e.g. None) marks a root node.
        self.parent = parent or None
        if self.parent is not None:
            # Register with the parent so the tree can be walked downwards too.
            self.parent.children.append(self)
def find_root(node):
    """Follow ``parent`` links upwards and return the topmost node.

    Returns ``None`` when ``node`` itself is falsy.
    """
    if not node:
        return None
    while node.parent:
        node = node.parent
    return node
def trace_to_root(node):
    """Return the nodes from ``node`` up to the root, leaf first.

    Nodes whose ``rate`` is 0 are left out.
    """
    chain = []
    while node:
        chain.append(node)
        node = node.parent
    return [hop for hop in chain if hop.rate != 0]
def flip(ticker):
    """Return ``ticker`` with its first and last '/'-separated parts swapped."""
    parts = ticker.split('/')
    first, last = parts[0], parts[-1]
    return f"{last}/{first}"
def compute_paths(base_ccy: str, quote_ccy: str, rates: Dict[str, float], parent, solutions: List, visited: List) -> None:
    """Depth-first search for conversion paths from ``base_ccy`` to ``quote_ccy``.

    Each hop taken becomes a ``Node`` under ``parent``.  Whenever a hop touches
    the quote currency of the root node's ticker, the hops from that node back
    to the root (``trace_to_root``) are appended to ``solutions``.

    Args:
        base_ccy: Currency held at this point of the search.
        quote_ccy: Currency to reach; passed on unchanged to recursive calls.
        rates: ``{"BASE/QUOTE": rate}`` quotes.
        parent: Node that new hops are attached to; must not be ``None``
            (the caller passes a placeholder root).
        solutions: Output list, extended in place with one leaf-first list of
            nodes per path found.
        visited: Tickers already tried, in both orientations; extended in
            place.  It is shared by all branches and never unwound, so a pair
            used on one branch is not tried again on another, and the result
            is not guaranteed to contain every possible path.

    Returns:
        None; results are reported through ``solutions``.
    """
    ticker = f"{base_ccy}/{quote_ccy}"
    inverse = flip(ticker)
    if ticker in visited or inverse in visited:
        return

    visited.append(ticker)
    visited.append(inverse)

    # Only pairs that involve the currency held or the currency wanted matter here.
    inscope_rates = [
        x for x in rates
        if base_ccy in (x.split('/')[0], x.split('/')[-1])
        or quote_ccy in (x.split('/')[0], x.split('/')[-1])
    ]
    if not inscope_rates:
        return

    # A hop completes a path when it touches the quote currency of the root ticker.
    target_ccy = find_root(parent).ticker.split('/')[-1]

    def reaches_target(node):
        return target_ccy in (node.ticker.split('/')[0], node.ticker.split('/')[-1])

    if ticker in rates:
        # Directly quoted pair.
        node = Node(ticker, rates[ticker], parent)
        if reaches_target(node):
            solutions.append(trace_to_root(node))

    elif inverse in rates:
        # Pair quoted the other way round.  The node keeps the quoted ticker
        # but stores the reciprocal, i.e. its rate is already oriented
        # base_ccy -> quote_ccy.
        node = Node(inverse, 1 / rates[inverse], parent)
        if reaches_target(node):
            solutions.append(trace_to_root(node))

    else:
        for pair in inscope_rates:
            # `visited` grows during the recursive calls below, so it is
            # re-checked on every iteration.
            if pair in visited:
                continue
            parts = pair.split('/')
            pair_base, pair_quote = parts[0], parts[-1]

            # Step across the pair to whichever side is not currently held.
            if base_ccy == pair_base:
                next_ccy = pair_quote
            elif base_ccy == pair_quote:
                next_ccy = pair_base
            else:
                continue  # the pair only involves quote_ccy; not usable from here

            node = Node(pair, rates[pair], parent)
            visited.append(node.ticker)
            visited.append(flip(node.ticker))
            if reaches_target(node):
                solutions.append(trace_to_root(node))
            else:
                compute_paths(next_ccy, quote_ccy, rates, node, solutions, visited)
def calc_fx(path: List["Node"]):
    """Combine the hop rates along ``path`` into one overall cross rate.

    Args:
        path: Nodes of one solution in leaf-first order, as produced by
            ``trace_to_root`` (the placeholder root is not part of it).

    Returns:
        The product of the correctly oriented hop rates, or ``None`` for an
        empty path.

    Raises:
        ValueError: If an intermediate hop's ticker does not contain the
            currency held at that point, i.e. the hops do not form a chain.

    Every hop except the last is oriented by tracking the currency currently
    held: crossing a pair from its base to its quote multiplies by the rate,
    crossing it from quote to base divides.  The last hop is multiplied as
    stored, because ``compute_paths`` creates it either from the directly
    quoted pair or with the reciprocal already applied.
    """
    if not path:
        return None

    def split(ticker):
        parts = ticker.split('/')
        return parts[0], parts[-1]

    # `path` is leaf-first; walk it in conversion order instead.
    *hops, leaf = reversed(path)

    overall_fx = 1
    if hops:
        first_base, first_quote = split(hops[0].ticker)
        following = hops[1] if len(hops) > 1 else leaf
        # The start currency is the side of the first pair that is NOT shared
        # with the hop that follows it.
        held = first_base if first_quote in split(following.ticker) else first_quote

        for node in hops:
            pair_base, pair_quote = split(node.ticker)
            if held == pair_base:
                overall_fx *= node.rate
                held = pair_quote
            elif held == pair_quote:
                overall_fx /= node.rate
                held = pair_base
            else:
                raise ValueError(f"{node.ticker} does not continue the path from {held}")

    return overall_fx * leaf.rate
base_ccy: str = 'BTC'  # Try BTC or ETH for multi hops
quote_ccy: str = 'USDC'
# Placeholder root: its rate of 0 makes trace_to_root leave it out of every path.
root = Node(f"{base_ccy}/{quote_ccy}", 0, None)
visited = []
solutions = []
# Fills `solutions` (and `visited`) in place.
compute_paths(base_ccy, quote_ccy, rates, root, solutions, visited)

# Report the overall rate and hop count of every path found.
for path in solutions:
    fx = calc_fx(path)
    print(f"fx: {fx}, #hops: {len(path)}")

huangapple
  • 本文由 发表于 2023年6月29日 07:35:43
  • 转载请务必保留本文链接:https://go.coder-hub.com/76577279.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定