同一文件上的多个箭头CSV读取器返回null。

huangapple go评论79阅读模式
英文:

Multiple Arrow CSV Readers on same file returns null

问题

我正在尝试使用多个Goroutine读取同一个文件,每个Goroutine被分配一个起始字节和要读取的行数lineLimit

当文件大小适合内存时,通过将csv.ChunkSize选项设置为chunkSize变量,我成功地做到了这一点。然而,当文件大于内存时,我需要减小csv.ChunkSize选项。我尝试了类似下面的代码:

  1. package main
  2. import (
  3. "io"
  4. "log"
  5. "os"
  6. "sync"
  7. "github.com/apache/arrow/go/v11/arrow"
  8. "github.com/apache/arrow/go/v11/arrow/csv"
  9. )
  10. // 从byteOffset开始读取文件中的行,行数由linesLimit指定。
  11. func produce(
  12. id int,
  13. ch chan<- arrow.Record,
  14. byteOffset int64,
  15. linesLimit int64,
  16. filename string,
  17. wg *sync.WaitGroup,
  18. ) {
  19. defer wg.Done()
  20. fd, _ := os.Open(filename)
  21. fd.Seek(byteOffset, io.SeekStart)
  22. var remainder int64 = linesLimit % 10
  23. limit := linesLimit - remainder
  24. chunkSize := limit / 10
  25. reader := csv.NewInferringReader(fd,
  26. csv.WithChunk(int(chunkSize)),
  27. csv.WithNullReader(true, ""),
  28. csv.WithComma(','),
  29. csv.WithHeader(true),
  30. csv.WithColumnTypes(map[string]arrow.DataType{
  31. "Start_Time": arrow.FixedWidthTypes.Timestamp_ns,
  32. "End_Time": arrow.FixedWidthTypes.Timestamp_ns,
  33. "Weather_Timestamp": arrow.FixedWidthTypes.Timestamp_ns,
  34. }))
  35. reader.Retain()
  36. defer reader.Release()
  37. var count int64
  38. for reader.Next() {
  39. rec := reader.Record()
  40. rec.Retain() // 在通道的另一端释放
  41. ch <- rec
  42. count += rec.NumRows()
  43. if count == limit {
  44. if remainder != 0 {
  45. flush(id, ch, fd, remainder)
  46. }
  47. break
  48. } else if count > limit {
  49. log.Panicf("Reader %d read more than it should, expected=%d, read=%d", id, linesLimit, count)
  50. }
  51. }
  52. if reader.Err() != nil {
  53. log.Panicf("error: %s in line %d,%d", reader.Err().Error(), count, id)
  54. }
  55. }
  56. func flush(id int,
  57. ch chan<- arrow.Record,
  58. fd *os.File,
  59. limit int64,
  60. ) {
  61. reader := csv.NewInferringReader(fd,
  62. csv.WithChunk(int(limit)),
  63. csv.WithNullReader(true, ""),
  64. csv.WithComma(','),
  65. csv.WithHeader(false),
  66. )
  67. reader.Retain()
  68. defer reader.Release()
  69. record := reader.Record()
  70. record.Retain() // 这里会出现空指针解引用错误
  71. ch <- record
  72. }

我尝试了多个版本的上述代码,包括:

  1. 复制文件描述符
  2. 复制文件描述符的偏移量,打开相同的文件并将其定位到该偏移量。
  3. 在调用flush或关闭第一个fd之前关闭第一个读取器。

无论我如何更改代码,错误似乎都是相同的。请注意,任何对flush的读取器的调用都会引发错误,包括reader.Nextreader.Err()

我是否错误地使用了csv读取器?这是重复使用同一个文件的问题吗?

编辑:我不知道这是否有帮助,但在flush中打开一个新的fd而没有任何Seek可以避免错误(不知何故,任何Seek都会导致原始错误出现)。然而,没有Seek的代码是不正确的(即删除Seek会导致文件的一部分根本不被任何Goroutine读取)。

英文:

I'm trying to read a the same file using multiple Goroutines, where each Goroutine is assigned a byte to start reading from and a number of lines to read lineLimit.

I was successful in doing so when the file fits in memory by setting the csv.ChunkSize option to the chunkSize variable. However, when the file is larger than memory, I need to reduce the csv.ChunkSize option. I was attempting something like this

  1. package main
  2. import (
  3. &quot;io&quot;
  4. &quot;log&quot;
  5. &quot;os&quot;
  6. &quot;sync&quot;
  7. &quot;github.com/apache/arrow/go/v11/arrow&quot;
  8. &quot;github.com/apache/arrow/go/v11/arrow/csv&quot;
  9. )
  10. // A reader to read lines from the file starting from the byteOffset. The number
  11. // of lines is specified by linesLimit.
  12. func produce(
  13. id int,
  14. ch chan&lt;- arrow.Record,
  15. byteOffset int64,
  16. linesLimit int64,
  17. filename string,
  18. wg *sync.WaitGroup,
  19. ) {
  20. defer wg.Done()
  21. fd, _ := os.Open(filename)
  22. fd.Seek(byteOffset, io.SeekStart)
  23. var remainder int64 = linesLimit % 10
  24. limit := linesLimit - remainder
  25. chunkSize := limit / 10
  26. reader := csv.NewInferringReader(fd,
  27. csv.WithChunk(int(chunkSize)),
  28. csv.WithNullReader(true, &quot;&quot;),
  29. csv.WithComma(&#39;,&#39;),
  30. csv.WithHeader(true),
  31. csv.WithColumnTypes(map[string]arrow.DataType{
  32. &quot;Start_Time&quot;: arrow.FixedWidthTypes.Timestamp_ns,
  33. &quot;End_Time&quot;: arrow.FixedWidthTypes.Timestamp_ns,
  34. &quot;Weather_Timestamp&quot;: arrow.FixedWidthTypes.Timestamp_ns,
  35. }))
  36. reader.Retain()
  37. defer reader.Release()
  38. var count int64
  39. for reader.Next() {
  40. rec := reader.Record()
  41. rec.Retain() // released at the other end of the channel
  42. ch &lt;- rec
  43. count += rec.NumRows()
  44. if count == limit {
  45. if remainder != 0 {
  46. flush(id, ch, fd, remainder)
  47. }
  48. break
  49. } else if count &gt; limit {
  50. log.Panicf(&quot;Reader %d read more than it should, expected=%d, read=%d&quot;, id, linesLimit, count)
  51. }
  52. }
  53. if reader.Err() != nil {
  54. log.Panicf(&quot;error: %s in line %d,%d&quot;, reader.Err().Error(), count, id)
  55. }
  56. }
  57. func flush(id int,
  58. ch chan&lt;- arrow.Record,
  59. fd *os.File,
  60. limit int64,
  61. ) {
  62. reader := csv.NewInferringReader(fd,
  63. csv.WithChunk(int(limit)),
  64. csv.WithNullReader(true, &quot;&quot;),
  65. csv.WithComma(&#39;,&#39;),
  66. csv.WithHeader(false),
  67. )
  68. reader.Retain()
  69. defer reader.Release()
  70. record := reader.Record()
  71. record.Retain() // nil pointer dereference error here
  72. ch &lt;- record
  73. }

I tried multiple versions of this previous code, including:

  1. Copying the file descriptor
  2. Copying the offset of the file descriptor, opening the same file
    and seeking to that offset.
  3. Closing the first reader before calling flush or closing the first fd.

The error seems to be the same no matter how I change the code. Note that any call to flush's reader raises an error. Includingreader.Next, and reader.Err().

Am I using the csv readers wrong? Is this a problem with reusing the same file?

EDIT: I don't know if this helps, but opening a new fd in flush without any Seek avoids the error (Somehow any Seek causes the original error to appear). However, the code is not correct without a Seek (i.e. removing Seek causes a part of the file to not be read at all by any Goroutine).

答案1

得分: 1

主要问题是,csv读取器使用了bufio.Reader作为底层,它的默认缓冲区大小为4096字节。这意味着reader.Next()会读取比所需更多的字节,并缓存额外的字节。如果在reader.Next()之后直接从文件中读取,你将会错过缓存的字节。

下面的示例演示了这种行为:

  1. package main
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "os"
  7. "github.com/apache/arrow/go/v11/arrow"
  8. "github.com/apache/arrow/go/v11/arrow/csv"
  9. )
  10. func main() {
  11. // 创建一个包含以下内容的两列csv文件(第二列有1024字节):
  12. // 0,000000....
  13. // 1,111111....
  14. // 2,222222....
  15. // 3,333333....
  16. temp := createTempFile()
  17. schema := arrow.NewSchema(
  18. []arrow.Field{
  19. {Name: "i64", Type: arrow.PrimitiveTypes.Int64},
  20. {Name: "str", Type: arrow.BinaryTypes.String},
  21. },
  22. nil,
  23. )
  24. r := csv.NewReader(
  25. temp, schema,
  26. csv.WithComma(','),
  27. csv.WithChunk(3),
  28. )
  29. defer r.Release()
  30. r.Next()
  31. // 检查第一个块读取后剩下的内容。
  32. // 如果读取器停在块的末尾,剩下的内容应该是:
  33. // 3,333333....
  34. // 但实际上,剩下的内容是:
  35. // 33333333333
  36. buf, err := io.ReadAll(temp)
  37. if err != nil {
  38. panic(err)
  39. }
  40. fmt.Printf("%s\n", buf)
  41. }
  42. func createTempFile() *os.File {
  43. temp, err := os.CreateTemp("", "test*.csv")
  44. if err != nil {
  45. panic(err)
  46. }
  47. for i := 0; i < 4; i++ {
  48. fmt.Fprintf(temp, "%d,", i)
  49. if _, err := temp.Write(bytes.Repeat([]byte{byte('0' + i)}, 1024)); err != nil {
  50. panic(err)
  51. }
  52. if _, err := temp.Write([]byte("\n")); err != nil {
  53. panic(err)
  54. }
  55. }
  56. if _, err := temp.Seek(0, io.SeekStart); err != nil {
  57. panic(err)
  58. }
  59. return temp
  60. }

似乎第二个读取器的目的是防止它读取到另一个块的csv数据。如果你事先知道下一个csv数据块的偏移量,你可以将文件包装在io.SectionReader中,以使其只读取当前的csv数据块。当前的问题没有提供关于这部分的足够信息,也许我们应该在另一个问题中讨论。

注意事项

  1. fd, _ := os.Open(filename):永远不要忽略错误。至少记录下来。
  2. fd通常表示文件描述符。不要将其用于类型为*os.File的变量,特别是当*os.File有一个名为Fd的方法时。
英文:

The main issue is that, the csv reader uses a bufio.Reader underneath, which has a default buffer size 4096. That means reader.Next() will read more bytes than needed, and cache the extra bytes. If you read directly from the file after reader.Next(), you will miss the cached bytes.

The demo below shows this behavior:

  1. package main
  2. import (
  3. &quot;bytes&quot;
  4. &quot;fmt&quot;
  5. &quot;io&quot;
  6. &quot;os&quot;
  7. &quot;github.com/apache/arrow/go/v11/arrow&quot;
  8. &quot;github.com/apache/arrow/go/v11/arrow/csv&quot;
  9. )
  10. func main() {
  11. // Create a two-column csv file with this content (the second column has 1024 bytes):
  12. // 0,000000....
  13. // 1,111111....
  14. // 2,222222....
  15. // 3,333333....
  16. temp := createTempFile()
  17. schema := arrow.NewSchema(
  18. []arrow.Field{
  19. {Name: &quot;i64&quot;, Type: arrow.PrimitiveTypes.Int64},
  20. {Name: &quot;str&quot;, Type: arrow.BinaryTypes.String},
  21. },
  22. nil,
  23. )
  24. r := csv.NewReader(
  25. temp, schema,
  26. csv.WithComma(&#39;,&#39;),
  27. csv.WithChunk(3),
  28. )
  29. defer r.Release()
  30. r.Next()
  31. // To check what&#39;s left after the first chunk is read.
  32. // If the reader stop at the end of the chunk, the content left will be:
  33. // 3,333333....
  34. // But in fact, the content left is:
  35. // 33333333333
  36. buf, err := io.ReadAll(temp)
  37. if err != nil {
  38. panic(err)
  39. }
  40. fmt.Printf(&quot;%s\n&quot;, buf)
  41. }
  42. func createTempFile() *os.File {
  43. temp, err := os.CreateTemp(&quot;&quot;, &quot;test*.csv&quot;)
  44. if err != nil {
  45. panic(err)
  46. }
  47. for i := 0; i &lt; 4; i++ {
  48. fmt.Fprintf(temp, &quot;%d,&quot;, i)
  49. if _, err := temp.Write(bytes.Repeat([]byte{byte(&#39;0&#39; + i)}, 1024)); err != nil {
  50. panic(err)
  51. }
  52. if _, err := temp.Write([]byte(&quot;\n&quot;)); err != nil {
  53. panic(err)
  54. }
  55. }
  56. if _, err := temp.Seek(0, io.SeekStart); err != nil {
  57. panic(err)
  58. }
  59. return temp
  60. }

It seems that the purpose of the second reader is to prevent it from reading into another block of csv data. If you know the offset of the next block of csv data in advance, you can wrap the file in an io.SectionReader to make it read only the current block of csv data. The current question does not provide enough information about this part, maybe we should leave it for another question.

Notes:

  1. fd, _ := os.Open(filename): Never ignore errors. At least log them.
  2. fd means file descriptor most of the time. Don't use it for a variable of type *os.File, especially when *os.File has a method Fd.

huangapple
  • 本文由 发表于 2023年5月2日 04:31:35
  • 转载请务必保留本文链接:https://go.coder-hub.com/76149992.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定