Multiple Arrow CSV Readers on same file returns null
Question
I'm trying to read the same file using multiple Goroutines, where each Goroutine is assigned a byte offset to start reading from and a number of lines to read, linesLimit.
I was successful in doing so when the file fits in memory, by setting the csv.ChunkSize option to the chunkSize variable. However, when the file is larger than memory, I need to reduce the csv.ChunkSize option. I was attempting something like this:
package main

import (
	"io"
	"log"
	"os"
	"sync"

	"github.com/apache/arrow/go/v11/arrow"
	"github.com/apache/arrow/go/v11/arrow/csv"
)

// A reader to read lines from the file starting from the byteOffset. The number
// of lines is specified by linesLimit.
func produce(
	id int,
	ch chan<- arrow.Record,
	byteOffset int64,
	linesLimit int64,
	filename string,
	wg *sync.WaitGroup,
) {
	defer wg.Done()

	fd, _ := os.Open(filename)
	fd.Seek(byteOffset, io.SeekStart)

	var remainder int64 = linesLimit % 10
	limit := linesLimit - remainder
	chunkSize := limit / 10

	reader := csv.NewInferringReader(fd,
		csv.WithChunk(int(chunkSize)),
		csv.WithNullReader(true, ""),
		csv.WithComma(','),
		csv.WithHeader(true),
		csv.WithColumnTypes(map[string]arrow.DataType{
			"Start_Time":        arrow.FixedWidthTypes.Timestamp_ns,
			"End_Time":          arrow.FixedWidthTypes.Timestamp_ns,
			"Weather_Timestamp": arrow.FixedWidthTypes.Timestamp_ns,
		}))
	reader.Retain()
	defer reader.Release()

	var count int64
	for reader.Next() {
		rec := reader.Record()
		rec.Retain() // released at the other end of the channel
		ch <- rec
		count += rec.NumRows()
		if count == limit {
			if remainder != 0 {
				flush(id, ch, fd, remainder)
			}
			break
		} else if count > limit {
			log.Panicf("Reader %d read more than it should, expected=%d, read=%d", id, linesLimit, count)
		}
	}
	if reader.Err() != nil {
		log.Panicf("error: %s in line %d,%d", reader.Err().Error(), count, id)
	}
}

func flush(id int,
	ch chan<- arrow.Record,
	fd *os.File,
	limit int64,
) {
	reader := csv.NewInferringReader(fd,
		csv.WithChunk(int(limit)),
		csv.WithNullReader(true, ""),
		csv.WithComma(','),
		csv.WithHeader(false),
	)
	reader.Retain()
	defer reader.Release()

	record := reader.Record()
	record.Retain() // nil pointer dereference error here
	ch <- record
}
I tried multiple versions of this code, including:
- Copying the file descriptor.
- Copying the offset of the file descriptor, opening the same file, and seeking to that offset.
- Closing the first reader before calling flush, or closing the first fd.
The error seems to be the same no matter how I change the code. Note that any call to flush's reader raises an error, including reader.Next and reader.Err().
Am I using the csv readers wrong? Is this a problem with reusing the same file?
EDIT: I don't know if this helps, but opening a new fd in flush without any Seek avoids the error (somehow, any Seek causes the original error to appear). However, the code is not correct without a Seek (i.e. removing the Seek causes a part of the file to not be read at all by any Goroutine).
Answer 1
Score: 1
The main issue is that the csv reader uses a bufio.Reader underneath, which has a default buffer size of 4096 bytes. That means reader.Next() will read more bytes than needed and cache the extra bytes. If you read directly from the file after reader.Next(), you will miss the cached bytes.
The demo below shows this behavior:
package main

import (
	"bytes"
	"fmt"
	"io"
	"os"

	"github.com/apache/arrow/go/v11/arrow"
	"github.com/apache/arrow/go/v11/arrow/csv"
)

func main() {
	// Create a two-column csv file with this content (the second column has 1024 bytes):
	// 0,000000....
	// 1,111111....
	// 2,222222....
	// 3,333333....
	temp := createTempFile()

	schema := arrow.NewSchema(
		[]arrow.Field{
			{Name: "i64", Type: arrow.PrimitiveTypes.Int64},
			{Name: "str", Type: arrow.BinaryTypes.String},
		},
		nil,
	)
	r := csv.NewReader(
		temp, schema,
		csv.WithComma(','),
		csv.WithChunk(3),
	)
	defer r.Release()

	r.Next()

	// Check what's left in the file after the first chunk is read.
	// If the reader stopped at the end of the chunk, the content left would be:
	// 3,333333....
	// But in fact, the content left is:
	// 33333333333
	buf, err := io.ReadAll(temp)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s\n", buf)
}

func createTempFile() *os.File {
	temp, err := os.CreateTemp("", "test*.csv")
	if err != nil {
		panic(err)
	}
	for i := 0; i < 4; i++ {
		fmt.Fprintf(temp, "%d,", i)
		if _, err := temp.Write(bytes.Repeat([]byte{byte('0' + i)}, 1024)); err != nil {
			panic(err)
		}
		if _, err := temp.Write([]byte("\n")); err != nil {
			panic(err)
		}
	}
	if _, err := temp.Seek(0, io.SeekStart); err != nil {
		panic(err)
	}
	return temp
}
It seems that the purpose of the second reader is to prevent it from reading into another block of csv data. If you know the offset of the next block of csv data in advance, you can wrap the file in an io.SectionReader to make it read only the current block. The question does not provide enough information about this part; maybe we should leave it for another question.
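For illustration only, here is a minimal sketch of that idea. It assumes the caller already knows both the start of its own block and the start of the next block; the helper name and how those offsets are computed are hypothetical, not from the original post:

package main

import (
	"io"
	"os"

	"github.com/apache/arrow/go/v11/arrow/csv"
)

// newBlockReader is a hypothetical helper: it restricts the csv reader to the
// bytes in [start, end), so the buffered reads of the underlying bufio.Reader
// can never spill into the next goroutine's block.
func newBlockReader(f *os.File, start, end int64, chunk int) *csv.Reader {
	// *os.File implements io.ReaderAt, so the section reader serves the
	// range [start, end) without moving the file's own offset.
	section := io.NewSectionReader(f, start, end-start)
	return csv.NewInferringReader(section,
		csv.WithChunk(chunk),
		csv.WithNullReader(true, ""),
		csv.WithComma(','),
		csv.WithHeader(false),
	)
}

Because the section reader tracks its own position via ReadAt, the flush step could reuse the same *os.File without any Seek, which sidesteps the buffered-bytes problem demonstrated above.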
Notes:
- fd, _ := os.Open(filename): Never ignore errors. At least log them.
- fd means file descriptor most of the time. Don't use it for a variable of type *os.File, especially when *os.File has a method Fd.
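For the first note, a minimal sketch of what that could look like, following the log.Panicf style the question already uses (the helper name is illustrative, not from the original post):

// openOrPanic opens the file and fails loudly instead of ignoring the error.
// Assumes "os" and "log" are imported.
func openOrPanic(id int, filename string) *os.File {
	f, err := os.Open(filename)
	if err != nil {
		log.Panicf("reader %d: opening %s: %v", id, filename, err)
	}
	return f
}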