In Python: How can I merge these two dataframes without running into Memory Error?
Question
I have two dataframes that I want to combine in a specific way (see below). Both dataframes are fairly large, and I have run into MemoryError as well as very long run times.
The first dataframe contains two IDs (ID_1 and ID_2), a timestamp, and an event. Multiple ID_2 values can map to one ID_1, and multiple ID_1/ID_2 pairs can map to the same timestamp, so ID_1, ID_2 and the timestamp all contain duplicates in their respective columns. The original dataframe has 8,732,787 rows.
import numpy as np
import pandas as pd

event_df = {'ID_1': [1, 1, 1, 2, 3],
            'ID_2': [1, 1, 2, 1, 1],
            'Timestamp': ['1984-05-11 14:30:00',
                          '1984-05-11 15:30:00',
                          '1990-12-11 09:10:00',
                          '1975-01-08 23:23:00',
                          '1984-05-11 14:30:00'],
            'Event': [0, 1, 0, 1, 1]
            }
event_df = pd.DataFrame(event_df)
event_df['Timestamp'] = event_df['Timestamp'].astype('datetime64[ns]')
The second dataframe contains the same IDs and its own timestamp, different from the one in the first dataframe, which indicates when a blood sample was taken. The analysed blood parameters are columns as well (22 parameters in the original dataframe, only 3 here for example purposes). Each sample has some missing values. The original dataframe has 328,340 rows.
lab_df = {'ID_1': [1, 1, 1, 2, 3],
          'ID_2': [1, 1, 1, 1, 1],
          'Timestamp_Lab': ['1984-05-11 14:00:00',
                            '1984-05-11 14:15:00',
                            '1984-05-11 15:00:00',
                            '1975-01-08 20:00:00',
                            '1984-05-11 14:00:00'],
          'Hemoglobin': [np.nan, 14, 13, 10, 11],
          'Leukocytes': [123, np.nan, 123, 50, 110],
          'Platelets': [50, 50, 50, 110, 50]
          }
lab_df = pd.DataFrame(lab_df)
lab_df['Timestamp_Lab'] = lab_df['Timestamp_Lab'].astype('datetime64[ns]')
My goal is to add the parameters of dataframe 2 to dataframe 1 whenever the timestamp of the blood sample (dataframe 2) is earlier than the timestamp in dataframe 1. If there are missing values between multiple parameters of the same type, they should move up one position. If a column ends up completely empty it is dropped, and empty rows can be filtered out as well. The result should look something like this:
result = {'ID_1': [1, 1, 1, 2, 3],
          'ID_2': [1, 1, 2, 1, 1],
          'Timestamp': ['1984-05-11 14:30:00',
                        '1984-05-11 15:30:00',
                        '1990-12-11 09:10:00',
                        '1975-01-08 23:23:00',
                        '1984-05-11 14:30:00'],
          'Event': [0, 1, 0, 1, 1],
          'Hemoglobin_1': [14, 14, np.nan, 10, 11],
          'Hemoglobin_2': [np.nan, 13, np.nan, np.nan, np.nan],
          'Leukocytes_1': [123, 123, np.nan, 50, 110],
          'Leukocytes_2': [np.nan, 123, np.nan, np.nan, np.nan],
          'Platelets_1': [50, 50, np.nan, 110, 50],
          'Platelets_2': [50, 50, np.nan, np.nan, np.nan],
          'Platelets_3': [np.nan, 50, np.nan, np.nan, np.nan]
          }
result = pd.DataFrame(result)
Do you have any suggestions on how I could add the parameters of dataframe 2 to dataframe 1 using an approach that doesn't lead to a memory error?
What I tried: I merged the two dataframes with an outer merge and then used boolean indexing to filter out the laboratory rows that lie after the timestamp of the event_df. After that, I pivoted the parameters wider so that each row corresponds to a row of the event_df again. This works for smaller dataframes, but produces a memory error on the original data. I then tried reading the event_df in chunks, merging each chunk, and writing the result chunkwise to a CSV. That process took about 8 hours and produced a 130 GB CSV file, and I also ran into trouble when pivoting the merged dataframe wider afterwards. I was wondering if there is a more elegant or efficient way to do this.
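For reference, a minimal sketch of that merge-filter-pivot workflow on the example frames above (simplified for illustration; the intermediate many-to-many merge is the step that blows up at the original size):
# Sketch only: merge, keep lab rows taken before the event, then pivot wider.
vals = ['Hemoglobin', 'Leukocytes', 'Platelets']

merged = event_df.reset_index().merge(lab_df, on=['ID_1', 'ID_2'], how='left')
merged = merged[merged['Timestamp_Lab'] < merged['Timestamp']]

# melt and drop NaNs so earlier missing values "move up", then number per parameter
long_form = merged.melt(id_vars=['index'], value_vars=vals).dropna(subset=['value'])
long_form['n'] = long_form.groupby(['index', 'variable']).cumcount() + 1

wide = long_form.pivot(index='index', columns=['variable', 'n'], values='value')
wide.columns = [f'{name}_{n}' for name, n in wide.columns]

result = event_df.join(wide)  # rows without an earlier lab sample stay all-NaN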
Answer 1
Score: 1
This is the main issue when using pandas on large datasets: OOM (out of memory) errors are very common, especially in a cross-join-like scenario such as the outer merge in your case.
The best option would be to move away from Python + pandas to a distributed computing framework like Databricks/Spark, where you could leverage cluster-parallel computing, memory management, Delta Lake optimizations and a columnar data format, all of which would certainly speed up your query. However, if you can't use any of those, I'd suggest the following:
- Try using datetime instead of datetime64, or convert to smaller data types: by default, pandas doesn't allocate the most memory-efficient dtypes, and casting to smaller types will free up more memory for the merge (see the downcasting sketch after this list).
- Process your data in mini-batches and checkpoint the results after each merge: you can split the workload by month to reduce the amount of data handled at once, and checkpoint the intermediate results to free up memory (see the chunked-merge sketch after this list). For processing data I usually say "a machine gun is better than a bazooka".
- Use a ThreadPoolExecutor for parallel execution: you can read your data per month and trigger the merge operations per week in parallel (a sketch follows this list).
- Reduce the logic into small steps: by doing the outer merge you are doing all the logic at once. You can instead create an index in each dataframe that maps each blood measurement (e.g. 1 = hemoglobin, 2 = platelets, etc.), perform the outer merge only on ID_1, ID_2, the datetime and those indexes, apply the timestamp filter as you did, checkpoint the result, and finally join back to pick up all the columns you need from dataframe 1 or dataframe 2 according to the index created at the beginning. This is an extra step, but it can lead to a leaner workflow in the end.
- Avoid CSV for checkpointing your intermediary data: CSV is not a compressed, columnar format, so try using Parquet for all intermediate results if possible (see the checkpointing sketch after this list).
- Try Polars for more efficient data processing: Pola.rs makes these operations more efficient given that it leverages PyArrow: https://www.pola.rs/
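To make the downcasting suggestion concrete, here is a minimal sketch, assuming the event_df/lab_df from the question; the shrink helper is just an illustrative name, and which dtypes are safe depends on your actual value ranges:
import pandas as pd

def shrink(df: pd.DataFrame) -> pd.DataFrame:
    # Cast numeric columns to the smallest dtype that still fits the values.
    df = df.copy()
    for col in df.columns:
        if pd.api.types.is_integer_dtype(df[col]):
            df[col] = pd.to_numeric(df[col], downcast='integer')  # e.g. int64 -> int8/int16
        elif pd.api.types.is_float_dtype(df[col]):
            df[col] = pd.to_numeric(df[col], downcast='float')    # float64 -> float32
    return df

event_df = shrink(event_df)
lab_df = shrink(lab_df)
print(event_df.memory_usage(deep=True))  # compare before and after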
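A rough sketch of the mini-batch/checkpoint idea combined with the Parquet point (assumes pyarrow or fastparquet is installed; the checkpoint directory and the monthly split are only examples):
from pathlib import Path

checkpoint_dir = Path('checkpoints')  # example location
checkpoint_dir.mkdir(exist_ok=True)

# Split the event data by month, merge one chunk at a time,
# and checkpoint each intermediate result as columnar Parquet.
for period, chunk in event_df.groupby(event_df['Timestamp'].dt.to_period('M')):
    merged = chunk.merge(lab_df, on=['ID_1', 'ID_2'], how='left')
    merged = merged[merged['Timestamp_Lab'] < merged['Timestamp']]
    merged.to_parquet(checkpoint_dir / f'merged_{period}.parquet')

# The pivot/reshape step can then be run per checkpoint file instead of on the full merge.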
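And the ThreadPoolExecutor idea might look roughly like this; note that pandas merges are largely CPU-bound, so a ProcessPoolExecutor may be worth benchmarking as well:
from concurrent.futures import ThreadPoolExecutor

def merge_chunk(chunk):
    merged = chunk.merge(lab_df, on=['ID_1', 'ID_2'], how='left')
    return merged[merged['Timestamp_Lab'] < merged['Timestamp']]

# one chunk per month, merged in parallel
chunks = [c for _, c in event_df.groupby(event_df['Timestamp'].dt.to_period('M'))]

with ThreadPoolExecutor(max_workers=4) as pool:
    partial_results = list(pool.map(merge_chunk, chunks))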
Other nice links:
- https://www.geeksforgeeks.org/pandas-memory-management/
- https://medium.com/bigdatarepublic/advanced-pandas-optimize-speed-and-memory-a654b53be6c2
Answer 2
Score: 1
Here's a possible approach using python-polars, as mentioned in another answer.
It uses .from_pandas() to load the data from your existing pandas dataframes; ideally, however, you'd use the appropriate lazy-loading method for wherever your data actually lives, e.g. pl.scan_csv() for CSV files.
If CSV is not a requirement for the output, consider using Parquet instead: out.sink_parquet()
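For example, the lazy-loading end of the pipeline could look like this (the file names are hypothetical; the query below would then start from these lazy frames instead of pl.from_pandas):
import polars as pl

# nothing is read into memory until .collect() / .sink_parquet() is called
event_lf = pl.scan_csv('event_df.csv', try_parse_dates=True)
lab_lf = pl.scan_csv('lab_df.csv', try_parse_dates=True)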
import polars as pl
df = (
pl.from_pandas(event_df).lazy().with_row_count()
.join(
pl.from_pandas(lab_df).lazy(),
on = ['ID_1', 'ID_2'],
how = 'left',
)
)
cols = ['Hemoglobin', 'Leukocytes', 'Platelets']
out = (
df.with_columns(
pl.when(pl.col('Timestamp') > pl.col('Timestamp_Lab')).then(
pl.col('Timestamp_Lab', *cols)
)
)
.with_columns(
pl.when(pl.when(pl.all() == None).then(True).forward_fill().over('row_nr')) # shift everything after first null
.then(pl.all().shift(-1).over('row_nr'))
.otherwise(pl.all())
)
.groupby('row_nr', maintain_order=True)
.agg(
pl.col('ID_1', 'ID_2', 'Timestamp', 'Event').first(),
*cols
)
.with_columns(
pl.col(col).list.to_struct(
n_field_strategy = 'max_width',
fields = lambda idx, col=col: f'{col}_{idx + 1}',
)
for col in cols
)
.unnest(*cols)
.drop('row_nr')
)
# out.sink_parquet('output.parquet')
print(out.collect())
shape: (5, 13)
┌──────┬──────┬─────────────────────┬───────┬──────────────┬──────────────┬──────────────┬──────────────┬──────────────┬──────────────┬─────────────┬─────────────┬─────────────┐
│ ID_1 ┆ ID_2 ┆ Timestamp ┆ Event ┆ Hemoglobin_1 ┆ Hemoglobin_2 ┆ Hemoglobin_3 ┆ Leukocytes_1 ┆ Leukocytes_2 ┆ Leukocytes_3 ┆ Platelets_1 ┆ Platelets_2 ┆ Platelets_3 │
│ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- │
│ i64 ┆ i64 ┆ datetime[ns] ┆ i64 ┆ f64 ┆ f64 ┆ f64 ┆ f64 ┆ f64 ┆ f64 ┆ i64 ┆ i64 ┆ i64 │
╞══════╪══════╪═════════════════════╪═══════╪══════════════╪══════════════╪══════════════╪══════════════╪══════════════╪══════════════╪═════════════╪═════════════╪═════════════╡
│ 1 ┆ 1 ┆ 1984-05-11 14:30:00 ┆ 0 ┆ 14.0 ┆ null ┆ null ┆ 123.0 ┆ null ┆ null ┆ 50 ┆ 50 ┆ null │
│ 1 ┆ 1 ┆ 1984-05-11 15:30:00 ┆ 1 ┆ 14.0 ┆ 13.0 ┆ null ┆ 123.0 ┆ 123.0 ┆ null ┆ 50 ┆ 50 ┆ 50 │
│ 1 ┆ 2 ┆ 1990-12-11 09:10:00 ┆ 0 ┆ null ┆ null ┆ null ┆ null ┆ null ┆ null ┆ null ┆ null ┆ null │
│ 2 ┆ 1 ┆ 1975-01-08 23:23:00 ┆ 1 ┆ 10.0 ┆ null ┆ null ┆ 50.0 ┆ null ┆ null ┆ 110 ┆ null ┆ null │
│ 3 ┆ 1 ┆ 1984-05-11 14:30:00 ┆ 1 ┆ 11.0 ┆ null ┆ null ┆ 110.0 ┆ null ┆ null ┆ 50 ┆ null ┆ null │
└──────┴──────┴─────────────────────┴───────┴──────────────┴──────────────┴──────────────┴──────────────┴──────────────┴──────────────┴─────────────┴─────────────┴─────────────┘
Answer 3
Score: 0
I would use janitor's conditional_join, then reshape:
import janitor
cols = ['ID_1', 'ID_2', 'Timestamp', 'Event']
vals = ['Hemoglobin', 'Leukocytes', 'Platelets']
out = (event_df.reset_index()
.conditional_join(lab_df,
('ID_1', 'ID_1', '=='),
('ID_2', 'ID_2', '=='),
('Timestamp', 'Timestamp_Lab', '>=')
)
# remove unneeded level and duplicated columns
.droplevel(0, axis=1).loc[:,lambda d: ~d.columns.duplicated()]
# reshape to long form to push the non-NA at the beginning
.melt(['index']+cols, value_vars=vals).dropna(subset=['value'])
.assign(n=lambda d: d.groupby(['index', 'variable']).cumcount().add(1))
# reshape to wide form and flatten the MultiIndex
.pivot(index=['index']+cols, columns=['variable', 'n'], values='value')
.pipe(lambda d: d.set_axis(d.columns.map(lambda x: f'{x[0]}_{x[1]}'), axis=1))
.reset_index(cols)
# restore rows without a match
.pipe(lambda d: d.combine_first(event_df)[list(d)])
)
Output:
ID_1 ID_2 Timestamp Event Hemoglobin_1 Hemoglobin_2 Leukocytes_1 Leukocytes_2 Platelets_1 Platelets_2 Platelets_3
0 1 1 1984-05-11 14:30:00 0 14.0 NaN 123.0 NaN 50.0 50.0 NaN
1 1 1 1984-05-11 15:30:00 1 14.0 13.0 123.0 123.0 50.0 50.0 50.0
2 1 2 1990-12-11 09:10:00 0 NaN NaN NaN NaN NaN NaN NaN
3 2 1 1975-01-08 23:23:00 1 10.0 NaN 50.0 NaN 110.0 NaN NaN
4 3 1 1984-05-11 14:30:00 1 11.0 NaN 110.0 NaN 50.0 NaN NaN