BufferError: memoryview has 1 exported buffer when trying to close a shared memory where I put a dataframe of pyarrow data type values

Question

I'm trying to put dataframes of different types into multiprocessing shared memory. When I use dataframes that contain Python types I have no problem, but when I use pyarrow types I have problems closing the shared memory. I have one function that puts a dataframe into shared memory and another that receives it. I run into two problems when a dataframe uses Arrow types such as string[pyarrow]:

  • When I call close() on sm_put and, after receiving the dataframe, call close() on sm_get, I instantly get an error that stops execution: sm_get.close() -> self.close() -> self._buf.release() -> BufferError: memoryview has 1 exported buffer (a minimal, pyarrow-free reproduction of this error is sketched right after this list).
  • When I call close() on sm_put but not on sm_get, nothing goes wrong immediately, but after a while I get a warning (which surprises me, because I never call self.close() myself): self.close() -> self._buf.release() -> BufferError: memoryview has 1 exported buffer.
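
For context, the same BufferError can be reproduced with the standard library alone: SharedMemory.close() releases its internal memoryview, and that release fails while any other object still exports a buffer from it. A minimal sketch, independent of pyarrow (names chosen only for illustration):

from multiprocessing.shared_memory import SharedMemory

shm = SharedMemory(create=True, size=16)
view = memoryview(shm.buf)   # something else still references the buffer
try:
    shm.close()              # raises BufferError: memoryview has 1 exported buffer
except BufferError as e:
    print(e)
view.release()               # drop the extra reference first ...
shm.close()                  # ... then close() succeeds
shm.unlink()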

My functions to put the dataframe into shared memory and to read it back from shared memory are these:

import sys
from multiprocessing.shared_memory import SharedMemory

import pandas as pd
import pyarrow as pa


# logTime and logSize are kept from the original signatures but are unused here
def put_df(data, logTime=None, logSize=None, usingArrow=False):
    if usingArrow:
        table = pa.Table.from_pandas(data).combine_chunks()
        record_batch = table.to_batches(max_chunksize=sys.maxsize)[0]
    else:
        record_batch = pa.RecordBatch.from_pandas(data)

    # Determine the size of buffer to request from the shared memory
    mock_sink = pa.MockOutputStream()
    stream_writer = pa.RecordBatchStreamWriter(mock_sink, record_batch.schema)
    stream_writer.write_batch(record_batch)
    stream_writer.close()
    data_size = mock_sink.size()

    sm_put = SharedMemory(create=True, size=data_size)
    buffer = pa.py_buffer(sm_put.buf)

    # Write the PyArrow RecordBatch to SharedMemory
    stream = pa.FixedSizeBufferWriter(buffer)
    stream_writer = pa.RecordBatchStreamWriter(stream, record_batch.schema)
    stream_writer.write_batch(record_batch)
    stream_writer.close()

    del stream_writer
    del stream
    del buffer
    if usingArrow:
        del table
    del record_batch

    sm_put.close()
    return sm_put.name


def get_df(sm_get_name, logTime=None, logSize=None, usingArrow=False):
    sm_get = SharedMemory(name=sm_get_name, create=False)
    buffer = pa.BufferReader(sm_get.buf)

    # Convert the stream back into an Arrow RecordBatch
    reader = pa.RecordBatchStreamReader(buffer)
    record_batch = reader.read_next_batch()

    # Convert back into pandas
    if usingArrow:
        data = record_batch.to_pandas(types_mapper=pd.ArrowDtype)
    else:
        data = record_batch.to_pandas()

    del buffer
    del reader
    del record_batch

    sm_get.close()
    sm_get.unlink()

    return data


if __name__ == '__main__':
    names = ["0", "1"]
    types = {"0": "string[pyarrow]", "1": "string[pyarrow]"}
    # 'file' is the path to the input CSV (defined elsewhere)
    df = pd.read_csv(open(file, "r"), index_col=False, header=None,
                     names=names, dtype=types, engine="c")
    sm_get_name = put_df(df)
    data = get_df(sm_get_name=sm_get_name)

I've noticed that when I try to del the pyarrow objects (like table or record_batch), the memory is not released instantly (I'm checking it with the memory_profiler library). Could this be the problem? Does anyone know whether it can be, and how I can solve it?
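
For reference, this is roughly how such a check can be set up with memory_profiler (a minimal sketch; the roundtrip wrapper is mine, not part of the original code):

from memory_profiler import profile

@profile
def roundtrip(df):
    # Put the dataframe into shared memory and read it back; the line-by-line
    # report shows where memory grows and whether it is released afterwards.
    name = put_df(df, usingArrow=True)
    return get_df(sm_get_name=name, usingArrow=True)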

I'm using pyarrow 11.0.0 and pandas 1.5.3.

The errors are these:
  • Error that appears when I call both sm_put.close() and sm_get.close()
  • Error that appears when I only call sm_put.close(), or when I don't call any close() method

The memory profile when I'm using pd.ArrowDtype inside dataframes makes me think there may be a memory leak that causes the BufferError:

Memory profile of a dataframe using pd.ArrowDtype

Answer 1

Score: 1

I'm not able to completely reproduce your example. My understanding is that put_df is doing the right thing. But in get_df, the record_batch is holding on to a view of the buffer / shared memory (in an effort to avoid copying data).

It means that you should release the record batch (and the reader) before you close the shared memory.

I got it to work, but it's not very different from your solution. All I did was make sure the RecordBatchStreamReader is closed.

def get_df(sm_get_name):
    sm_get = SharedMemory(name=sm_get_name, create=False)
    buffer = pa.BufferReader(sm_get.buf)

    with pa.RecordBatchStreamReader(buffer) as reader:
        record_batch = reader.read_next_batch()
        df = record_batch.to_pandas()
        del record_batch
    del reader
    del buffer

    sm_get.close()
    sm_get.unlink()

    return df

It's also possible that using pd.ArrowDtype doesn't work because it means the shared memory data isn't copied at all, whereas the standard to_pandas implementation copies the data from Arrow to NumPy format.
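
If that zero-copy behaviour is indeed the issue, one possible workaround (a sketch only, not something tested against the original setup; copy_batch is a name chosen here for illustration) is to copy the batch out of shared memory before converting it, e.g. with an IPC round-trip into a regular in-process buffer:

def copy_batch(batch: pa.RecordBatch) -> pa.RecordBatch:
    # Serialize the batch into a fresh in-process buffer and read it back,
    # so the returned batch no longer references the shared memory segment.
    sink = pa.BufferOutputStream()
    with pa.ipc.new_stream(sink, batch.schema) as writer:
        writer.write_batch(batch)
    return pa.ipc.open_stream(sink.getvalue()).read_next_batch()

In get_df this would be used as data = copy_batch(record_batch).to_pandas(types_mapper=pd.ArrowDtype), so the dataframe should stay usable after sm_get.close() and sm_get.unlink().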

PS:

> the memory is not released instantly (I'm checking it with the memory_profiler library)

Python doesn't immediately release freed memory back to the OS, so don't expect the process memory to decrease. Instead, you should just make sure that the memory doesn't increase if you call this function several times.
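
A rough way to check that (a sketch; psutil is an assumption here, any tool that reports process RSS works):

import os
import psutil  # assumed to be installed; any RSS-reporting tool works

proc = psutil.Process(os.getpid())
for i in range(20):
    name = put_df(df)
    _ = get_df(sm_get_name=name)
    # RSS should level off after a few iterations instead of growing every call
    print(f"iteration {i}: rss = {proc.memory_info().rss / 2**20:.1f} MiB")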
