在并行迭代Parquet文件时高效地填充并发映射

huangapple go评论76阅读模式
英文:

Populate concurrent map while iterating parquet files in parallel efficiently

问题

我正在使用parquet文件进行工作,我正在并行读取所有这些文件,并在每个文件的所有行中遍历后填充并发映射。总共有50个文件,每个文件的大小最大为60MB。

我需要并行化我的for循环,以并行读取所有这些parquet文件,并通过并行读取所有这些parquet文件来填充映射。data结构中的这些并发映射将由多个读取器线程并发地读取,并且在for循环内部由多个写入器并行地写入。我希望确保它们是安全的,并且操作是原子的。我还有一个getter方法来访问这些并发映射。

以下是我得到的代码,但我不确定这是否是正确的并行化方式,或者我是否遗漏了一些非常基本的东西?

import (
//....
pars3 "github.com/xitongsys/parquet-go-source/s3"
"github.com/xitongsys/parquet-go/reader"
cmap "github.com/orcaman/concurrent-map"
//....
)

type Data interface {
	GetCustomerMap() *cmap.ConcurrentMap
	GetCustomerMetricsMap() *cmap.ConcurrentMap
}

type data struct {
    // 将有getter方法来访问下面的映射
    customers          *cmap.ConcurrentMap
	customerMetrics    *cmap.ConcurrentMap
}

//loadParquet.. 这将由后台线程定期调用
func (r *data) loadParquet(path string, bucket string, key string) error {
    var err error
    var files []string
    files, err = r.s3Client.ListObjects(bucket, key, ".parquet")
    if err != nil {
        return err
    }

    var waitGroup sync.WaitGroup 
    // 设置我们要等待的有效goroutine的数量
    waitGroup.Add(len(files)) 

    // 并行化下面的for循环,以便以线程安全的方式填充映射?
    // 同一个映射也将被多个读取器线程访问。
    // 这些映射的写入发生在后台线程中,但有很多读取器线程从映射中读取。
    for i, file := range files {
        err = func() error {
            fr, err := pars3.NewS3FileReader(context.Background(), bucket, file, r.s3Client.GetSession().Config)
            if err != nil {
                return errs.Wrap(err)
            }
            defer xio.CloseIgnoringErrors(fr)

            pr, err := reader.NewParquetReader(fr, nil, 4)
            if err != nil {
                return errs.Wrap(err)
            }

            // 对这个for循环感到困惑?
            // 我们需要在这里并行化吗?
            for {
                rows, err := pr.ReadByNumber(100)
                if err != nil {
                    return errs.Wrap(err)
                }

                if len(rows) <= 0 {
                    break
                }

                byteSlice, err := json.Marshal(rows)
                if err != nil {
                    return errs.Wrap(err)
                }

                var rows []ParquetProduct
                err = json.Unmarshal(byteSlice, &rows)
                if err != nil {
                    return errs.Wrap(err)
                }

                // 读取行结构并放入并发映射中。
                // 需要以原子和线程安全的方式填充映射
                // 在这个for循环内部进行多个并行写入
        		for i := range rows {
                  // ...
                  // ...
                  r.customers.Set(.....)
                  r.customerMetrics.Set(....)
                }
            }
            return nil
        }()

        if err != nil {
            return err
        }

        go task(&waitGroup) // 实现最大并发性
    }
    // 等待所有goroutine完成执行。
    waitGroup.Wait()  
    return nil
}

//GetCustomerMap.. 这些将被多个读取器线程访问以从映射中获取数据。
func (r *data) GetCustomerMap() *cmap.ConcurrentMap {
	return r.customers
}

//GetCustomerMetricsMap.. 这些将被多个读取器线程访问以从映射中获取数据。
func (r *data) GetCustomerMetricsMap() *cmap.ConcurrentMap {
	return r.customerMetrics
}

我正在使用这个parquet库在Go中读取文件。

英文:

I am working with parquet file where I am reading all those files in parallel and populating concurrent map after going through all those rows inside each file. Total number of files are 50 and each file size is around 60MB max.

I need to parallelize my for loop which read all these parquet files in parallel and also populate map by reading all these parquet files in parallel. These concurrent maps inside data struct will be read by multiple reader threads concurrently and also written by multiple writers in parallel inside for loop. I want to make sure they are safe and operation is atomic. I also got getter method to access those concurrent map.

Below is the code I got but I am not sure if this is the right way to parallelize it or I missing something very basics?

import (
//....
pars3 &quot;github.com/xitongsys/parquet-go-source/s3&quot;
&quot;github.com/xitongsys/parquet-go/reader&quot;
cmap &quot;github.com/orcaman/concurrent-map&quot;
//....
)
type Data interface {
GetCustomerMap() *cmap.ConcurrentMap
GetCustomerMetricsMap() *cmap.ConcurrentMap
}
type data struct {
// will have getter methods to access these below map
customers          *cmap.ConcurrentMap
customerMetrics    *cmap.ConcurrentMap
}
//loadParquet.. This will be called by background thread periodically
func (r *data) loadParquet(path string, bucket string, key string) error {
var err error
var files []string
files, err = r.s3Client.ListObjects(bucket, key, &quot;.parquet&quot;)
if err != nil {
return err
}
var waitGroup sync.WaitGroup 
// Set number of effective goroutines we want to wait upon 
waitGroup.Add(len(files)) 
// parallelize below for loop in such a way so that I can populate my map in thread safe way?
// And same map will be accessed by multiple reader threads too.
// This writes to our map happens from background thread but there are lot of reader threads reading from the map.
for i, file := range files {
err = func() error {
fr, err := pars3.NewS3FileReader(context.Background(), bucket, file, r.s3Client.GetSession().Config)
if err != nil {
return errs.Wrap(err)
}
defer xio.CloseIgnoringErrors(fr)
pr, err := reader.NewParquetReader(fr, nil, 4)
if err != nil {
return errs.Wrap(err)
}
// confuse on this for loop?
// do we need to parallelize here too?
for {
rows, err := pr.ReadByNumber(100)
if err != nil {
return errs.Wrap(err)
}
if len(rows) &lt;= 0 {
break
}
byteSlice, err := json.Marshal(rows)
if err != nil {
return errs.Wrap(err)
}
var rows []ParquetProduct
err = json.Unmarshal(byteSlice, &amp;rows)
if err != nil {
return errs.Wrap(err)
}
// read rows struct and put inside concurrent map.
// Need to populate map in such a way so that it is atomic and thread safe
// from multiple parallel writes inside this for loop 
// and multiple reads will happen from reader threads on these maps
for i := range rows {
// ...
// ...
r.customers.Set(.....)
r.customerMetrics.Set(....)
}
}
return nil
}()
if err != nil {
return err
}
go task(&amp;waitGroup) // Achieving maximum concurrency 
}
// Wait until all goroutines have completed execution. 
waitGroup.Wait()  
return nil
}
//GetCustomerMap.. These will be accessed by multiple reader threads to get data out of map.
func (r *data) GetCustomerMap() *cmap.ConcurrentMap {
return r.customers
}
//GetCustomerMetricsMap.. These will be accessed by multiple reader threads to get data out of map.
func (r *data) GetCustomerMetricsMap() *cmap.ConcurrentMap {
return r.customerMetrics
}

I am using this parquet library in go to read files.

答案1

得分: 2

同时访问地图在Golang中往往是一种反模式,并且通常需要使用锁来防止恐慌,例如:Golang fatal error: concurrent map writes

相反,您可以使用chan(通道)类型,让每个读取文件的go例程将其数据写入该通道,然后有一个单独的go例程监听该通道并将数据添加到地图中。在这种方法中,地图只有一个读写器,没有锁,并且每个go例程在写入其结果时不会被阻塞(如果通道是缓冲的)。

可以在此处看到此模式的示例:https://play.golang.com/p/u-uYDaWiQiD

doWork()替换为您的读取文件的函数,并监听输出通道以获取字节、文件或任何您希望的类型,然后将它们放入地图中。

英文:

Concurrently accessing a map tends to be an anti-pattern in Golang, and will generally require the usage of locks to prevent panics, eg: Golang fatal error: concurrent map writes.

Instead, you could use a chan (channel) type, have each go routine that is reading a file write its data to that channel, then have a single go-routine that listens to that channel and adds the data to the map. In this approach, there is only a single reader/writer to the map, no locks, and each go-routine is not blocked when writing its results (if the channel is buffered).

An example of this pattern can be seen here: https://play.golang.com/p/u-uYDaWiQiD

Replace doWork() with the function of yours that reads files, and listen to the output channel for bytes, files, whatever type you wish, so you can then place them into your map.

huangapple
  • 本文由 发表于 2022年2月23日 06:26:38
  • 转载请务必保留本文链接:https://go.coder-hub.com/71229088.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定