I have a Pandas timeseries dataframe that I would like to filter by a specific time on the 'timestamp' column
Question
I have a few classes for backtesting trading timeseries data in DataFrame format and I would like to filter the data frame by the 'timestamp' column.
I tried this if statement and expected it to print "worked" to the terminal:
if df.set_index(date_col).at_time('09:00').reset_index():
    print("worked")
And received the following KeyError:
Traceback (most recent call last):
  File "C:/Users/danra/dev/pycharm_projects/Deribit/backtesting/strategies/HigherLower.py", line 86, in <module>
    HL.run_backtest()
  File "C:/Users/danra/dev/pycharm_projects/Deribit/backtesting/strategies/HigherLower.py", line 34, in run_backtest
    self.generate_signals()
  File "C:/Users/danra/dev/pycharm_projects/Deribit/backtesting/strategies/HigherLower.py", line 14, in generate_signals
    if df.set_index(date_col).at_time('09:00').reset_index():
  File "C:\Users\danra\anaconda3\lib\site-packages\pandas\util\_decorators.py", line 311, in wrapper
    return func(*args, **kwargs)
  File "C:\Users\danra\anaconda3\lib\site-packages\pandas\core\frame.py", line 5446, in set_index
    raise KeyError(f"None of {missing} are in the columns")
KeyError: "None of ['timestamp'] are in the columns"

Process finished with exit code 1
I have tried changing the dictionary key/values in the datamanager.py file but received the same KeyError.
These are the three modules in use.
backtest.py
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from backtesting.datamanager import DataManager


class BackTestSA:
    """
    backtesting class for all single asset strategies,
    columns must include the following :
    close: float
    timestamp: date
    """

    def __init__(self, csv_path, date_col, max_holding):
        self.dmgt = DataManager(csv_path, date_col)
        # trade variables
        self.open_pos = False
        self.entry_price = None
        self.direction = None
        self.target_price = None
        self.stop_price = None
        # vertical barrier variable
        self.max_holding = max_holding
        self.max_holding_limit = max_holding
        # barrier multipliers
        self.ub_mult = 1.005
        self.lb_mult = 0.995
        # special case of vertical barrier
        self.end_date = self.dmgt.df.index.values[-1]
        self.returns_series = []
        self.holding_series = []
        self.direction_series = []

    def open_long(self, price):
        """
        :param price: price we open long at
        :return: populates trade variables from constructor with relevant
        variables
        """
        self.open_pos = True
        self.direction = 1
        self.entry_price = price
        self.target_price = price * self.ub_mult
        self.stop_price = price * self.lb_mult
        self.add_zeros()

    def open_short(self, price):
        """
        :param price: price we open short at
        :return: populates trade variables from constructor with relevant
        variables
        """
        self.open_pos = True
        self.direction = -1
        self.entry_price = price
        self.target_price = price * self.lb_mult
        self.stop_price = price * self.ub_mult
        self.add_zeros()

    def reset_variables(self):
        """
        resets the variables after we close a trade
        """
        self.open_pos = False
        self.entry_price = None
        self.direction = None
        self.target_price = None
        self.stop_price = None
        self.max_holding = self.max_holding_limit

    def add_zeros(self):
        self.returns_series.append(0)
        self.holding_series.append(0)
        self.direction_series.append(0)

    def close_position(self, price):
        """
        :param price: price we are exiting trade at
        :return: appends the trade pnl to the returns series
        and resets variables
        """
        pnl = (price / self.entry_price - 1) * self.direction
        self.process_close_var(pnl)
        self.reset_variables()

    def process_close_var(self, pnl):
        self.returns_series.append(pnl)
        self.direction_series.append(self.direction)
        holding = self.max_holding_limit - self.max_holding
        self.holding_series.append(holding)

    def generate_signals(self):
        """
        use this function to make sure generate signals has been included in the child class
        """
        if 'entry' not in self.dmgt.df.columns:
            raise Exception('You have not created signals yet')

    def monitor_open_positions(self, price, timestamp):
        # check upper horizontal barrier for long positions
        if price >= self.target_price and self.direction == 1:
            self.close_position(price)
        # check lower horizontal barrier for long positions
        elif price <= self.stop_price and self.direction == 1:
            self.close_position(price)
        # check lower horizontal barrier for short positions
        elif price <= self.target_price and self.direction == -1:
            self.close_position(price)
        # check upper horizontal barrier for short positions
        elif price >= self.stop_price and self.direction == -1:
            self.close_position(price)
        # check special case of vertical barrier
        elif timestamp == self.end_date:
            self.close_position(price)
        # check vertical barrier
        elif self.max_holding <= 0:
            self.close_position(price)
        # if all above conditions not true, decrement max holding by 1 and
        # append a zero to returns column
        else:
            self.max_holding = self.max_holding - 1
            self.add_zeros()

    def add_trade_cols(self):
        """
        merges the new columns we created for our backtest into our dataframe,
        also resets the returns series to empty lists, in case we want to change
        the strategy heartbeat.
        """
        self.dmgt.df['returns'] = self.returns_series
        self.dmgt.df['holding'] = self.holding_series
        self.dmgt.df['direction'] = self.direction_series
        self.returns_series = []
        self.holding_series = []
        self.direction_series = []

    def run_backtest(self):
        # signals generated from child class
        self.generate_signals()
        # loop over dataframe
        for row in self.dmgt.df.itertuples():
            # if we get a long signal and do not have open position open a long
            if row.entry == 1 and self.open_pos is False:
                self.open_long(row.t_plus)
            # if we get a short signal and do not have open position open a short
            elif row.entry == -1 and self.open_pos is False:
                self.open_short(row.t_plus)
            # monitor open positions to see if any of the barriers have been touched, see function above
            elif self.open_pos:
                self.monitor_open_positions(row.close, row.Index)
            else:
                self.add_zeros()
        self.add_trade_cols()

    def show_performace(self):
        plt.style.use('ggplot')
        self.dmgt.df.returns.cumsum().plot()
        plt.title(f"Strategy results for {self.dmgt.timeframe} timeframe")
        plt.show()

    def save_backtest(self, instrument):
        """
        :param instrument: ETH, BTC for Ethereum and Bitcoin
        saves backtest to our backtests folder
        """
        strat_name = self.__class__.__name__
        tf = self.dmgt.timeframe
        self.dmgt.df.to_csv(f"../data/backtests/{strat_name}_{tf}-{instrument}.csv")
datamanager.py
import pandas as pd


class DataManager:
    def __init__(self, csv_path, date_col):
        self.data = pd.read_csv(csv_path, parse_dates=[date_col],
                                index_col=date_col)
        # Can use uniform to change this.
        self.data['t_plus'] = self.data.open.shift(-1)
        self.data.dropna(inplace=True)
        self.df = self.data.copy()
        self.timeframe = '1min'

    def change_resolution(self, new_timeframe):
        resample_dict = {'volume': 'sum', 'open': 'first',
                         'low': 'min', 'high': 'max',
                         'close': 'last',
                         't_plus': 'last'}
        self.df = self.data.resample(new_timeframe).agg(resample_dict)
        self.timeframe = new_timeframe
Higherlower.py
from backtesting.backtest import BackTestSA
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt


class HigherLower(BackTestSA):
    def __init__(self, csv_path, date_col, max_holding):
        super().__init__(csv_path, date_col, max_holding)

    def generate_signals(self):
        df = self.dmgt.df
        if df.set_index(date_col).at_time('09:00').reset_index():
            print("worked")
        df['longs'] = ((df.high > df.high.shift(1)) & (
            df.high.shift(1) > df.high.shift(2))
            & (df.close.shift(2) > df.high.shift(3))) * 1
        df['shorts'] = ((df.low < df.low.shift(1)) & (
            df.low.shift(1) < df.low.shift(2))
            & (df.close.shift(2) < df.low.shift(3))) * -1
        df['entry'] = df.shorts + df.longs
        df.dropna(inplace=True)

    def show_performace(self):
        plt.style.use('ggplot')
        self.dmgt.df.returns.cumsum().plot()
        plt.title(f"Strategy results for {self.dmgt.timeframe} timeframe")
        plt.show()

    def run_backtest(self):
        self.generate_signals()
        for row in self.dmgt.df.itertuples():
            if row.entry == 1:
                # adding logic for dynamic barriers
                if self.open_pos is False:
                    self.open_long(row.t_plus)
                else:
                    self.target_price = self.target_price * self.ub_mult
                    self.max_holding = self.max_holding + int(
                        (self.max_holding_limit / 3))
                    self.add_zeros()
            elif row.entry == -1:
                # adding logic for dynamic barriers
                if self.open_pos is False:
                    self.open_short(row.t_plus)
                else:
                    self.target_price = self.target_price * self.lb_mult
                    self.max_holding = self.max_holding + int(
                        (self.max_holding_limit / 3))
                    self.add_zeros()
            elif self.open_pos:
                self.monitor_open_positions(row.close, row.Index)
            else:
                self.add_zeros()
        self.add_trade_cols()

    def show_performace(self):
        plt.style.use('ggplot')
        self.dmgt.df.returns.cumsum().plot()
        plt.title(f"Strategy results for {self.dmgt.timeframe} timeframe")
        plt.show()

    def save_backtest(self):
        '''
        saves backtest to csv for further inspection
        '''
        strat_name = self.__class__.__name__
        tf = self.dmgt.timeframe
        self.dmgt.df.to_csv(f"../data/backtests/{strat_name}_{tf}.csv")


if __name__ == '__main__':
    csv_path = "../data/cleaned_btc.csv"
    date_col = 'timestamp'
    max_holding = 8
    HL = HigherLower(csv_path, date_col, max_holding)
    HL.dmgt.change_resolution("120min")
    HL.run_backtest()
    HL.show_performace()
    # 1049 trades with fixed stops and targets 981
    print(abs(HL.dmgt.df.direction).sum())
    # uncomment if you wish to save the backtest to folder
    # HL.save_backtest()
df
timestamp,volume,open,low,high,close,t_plus
2018-08-13 12:00:00,1.24830802,6401.41,6390.0,6410.0,6390.0,6410.0
2018-08-13 14:00:00,325.93829282,6410.0,6215.82,6410.0,6283.5,6283.5
2018-08-13 16:00:00,16.79421917,6283.5,6102.5,6283.5,6102.5,6102.5
2018-08-13 18:00:00,247.51226331,6102.5,6102.5,6256.5,6218.5,6218.5
2018-08-13 20:00:00,0.01279864,6218.5,6218.5,6274.0,6235.0,6235.0
2018-08-13 22:00:00,0.07697618,6235.0,6200.0,6245.0,6245.0,6245.0
2018-08-14 00:00:00,9.44041963,6245.0,6000.0,6245.0,6000.0,6000.0
2018-08-14 02:00:00,3.52885021,6000.0,5878.5,6000.0,5948.0,5949.0
2018-08-14 04:00:00,33.85111565,5949.0,5934.0,5994.0,5952.0,5949.5
2018-08-14 06:00:00,1.23511251,5949.5,5912.0,6040.0,6013.5,6009.0
2018-08-14 08:00:00,0.50387477,6009.0,5981.0,6046.0,6030.0,6030.0
Answer 1
Score: 0
Looking at the traceback, the problem is not in the at_time() call but in the .set_index() call.
This happens because, when you load the data, you already set the timestamp column as the index:
self.data = pd.read_csv(csv_path, parse_dates=[date_col], index_col=date_col)
So when you call df.set_index(date_col) you get an error, since timestamp is the index and no longer exists as a column.
The at_time() function itself works properly on the loaded frame:
df = pd.read_csv(csv_path, parse_dates=[date_col], index_col=date_col)
df.at_time("08:00:00")
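A minimal sketch of the point above, assuming a small hand-built frame in place of the CSV (the three rows are copied from the sample data in the question; the variable names are illustrative). It also covers a second issue the original if statement would hit once the KeyError is gone: a DataFrame has no single truth value, so `if some_dataframe:` raises a ValueError, and checking `.empty` is one way around that.

```python
import pandas as pd

# Three rows taken from the question's sample data.
raw = pd.DataFrame({
    "timestamp": ["2018-08-14 04:00:00", "2018-08-14 06:00:00", "2018-08-14 08:00:00"],
    "close": [5952.0, 6013.5, 6030.0],
})
raw["timestamp"] = pd.to_datetime(raw["timestamp"])

# Same state as after read_csv(..., index_col=date_col): timestamp is the index.
df = raw.set_index("timestamp")

# Calling set_index again fails because 'timestamp' is no longer a column.
try:
    df.set_index("timestamp")
    raised = False
except KeyError:
    raised = True

# at_time works directly on the DatetimeIndex, no set_index/reset_index needed.
at_eight = df.at_time("08:00")
at_nine = df.at_time("09:00")

# Test emptiness explicitly instead of using the DataFrame as a condition.
if not at_eight.empty:
    print("worked")
```

Note that the sample rows in the question fall on even hours (120-minute bars), so at_time('09:00') would select nothing from them even after the KeyError is fixed.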
Answer 2
Score: 0
Let's say we have a dataframe with the data you provided:
import pandas as pd
df = pd.DataFrame({'timestamp':['2018-08-13 12:00:00', '2018-08-13 14:00:00', '2018-08-13 16:00:00', '2018-08-13 18:00:00', '2018-08-13 20:00:00', '2018-08-13 22:00:00', '2018-08-14 00:00:00', '2018-08-14 02:00:00', '2018-08-14 04:00:00', '2018-08-14 06:00:00', '2018-08-14 08:00:00'],
'volume': [1.24830802, 325.938292, 16.7942191, 247.512263, 0.01279864, 0.07697618, 9.44041963, 3.52885021, 33.8511156, 1.23511251, 0.50387477],
'open':[6401.4, 6410.0, 6283.5, 6102.5, 6218.5, 6235.0, 6245.0, 6000.0, 5949.0, 5949.5, 6009.0],
'low':[6390.0, 6215.82, 6102.5, 6102.5, 6218.5, 6200.0, 6000.0, 5878.5, 5934.0, 5912.0, 5981.0],
'high':[6410.0, 6410.0, 6283.5, 6256.5, 6274.0, 6245.0, 6245.0, 6000.0, 5994.0, 6040.0, 6046.0],
'close':[6390.0, 6283.5, 6102.5, 6218.5, 6235.0, 6245.0, 6000.0, 5948.0, 5952.0, 6013.5, 6030.0],
't_plus':[6410.0, 6283.5, 6102.5, 6218.5, 6235.0, 6245.0, 6000.0, 5949.0, 5949.5, 6009.0, 6030.0]})
As you can see, the values in the timestamp column are written as strings (a bare date literal would be a SyntaxError). That's the first thing we have to deal with: the timestamp column does not hold proper timestamps, it holds strings.
Next, we want to filter the rows of the dataframe based on the value of the timestamp column. We can try
df['timestamp'] == '12:00'
which returns a boolean Series with the result of that comparison for each row. In this case, the result is:
0 False
1 False
2 False
3 False
4 False
5 False
6 False
7 False
8 False
9 False
10 False
Name: timestamp, dtype: bool
As we can see, it is False in every row, because no string in the column is exactly '12:00'. To get True in some rows, we have to compare against the full string:
df['timestamp'] == '2018-08-13 12:00:00'
which returns:
0 True
1 False
2 False
3 False
4 False
5 False
6 False
7 False
8 False
9 False
10 False
Name: timestamp, dtype: bool
Since I just copied the first row's value into the equality, this makes sense.
If we want to check only the hour, we can do one of two things:
- Use regular expressions to match the time part of the string.
- Split the timestamp column into year, month, day, hour, and so on. For that, we need to convert the strings to datetimes, and then we can use the attributes and methods Python provides for datetime objects. Pandas has pd.to_datetime() to help with that.
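Both options can be sketched roughly as follows. This is an illustration under assumptions, not the answer author's code: the frame is a four-row subset of the data above, the regular expression is one possible pattern, and the variable names are made up for the example.

```python
import pandas as pd

# Four rows from the sample data, with timestamp still stored as strings.
df = pd.DataFrame({
    "timestamp": ["2018-08-13 12:00:00", "2018-08-13 14:00:00",
                  "2018-08-14 00:00:00", "2018-08-14 02:00:00"],
    "close": [6390.0, 6283.5, 6000.0, 5948.0],
})

# Option 1: regular expression on the raw strings.
# The pattern matches rows whose string ends with the time 12:00:00.
by_regex = df[df["timestamp"].str.contains(r" 12:00:00$")]

# Option 2: convert to datetimes, then use the .dt accessor.
ts = pd.to_datetime(df["timestamp"])
by_hour = df[ts.dt.hour == 12]

# With a DatetimeIndex, at_time gives the same selection.
by_at_time = df.assign(timestamp=ts).set_index("timestamp").at_time("12:00")
```

The datetime route is usually the easier one to extend, since the same `ts` Series also exposes `.dt.day`, `.dt.month`, `.dt.time`, and so on.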