Can I stream data from a writer to a reader in golang?

Question
I want to process a number of files whose contents don't fit in the memory of my worker. The solution I found so far involves saving the results of the processing to the /tmp directory before uploading them to S3.
import (
	"bufio"
	"bytes"
	"context"
	"fmt"
	"log"
	"os"
	"runtime"
	"strings"
	"sync"

	"github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/korovkin/limiter"
	"github.com/xitongsys/parquet-go/parquet"
	"github.com/xitongsys/parquet-go/writer"
)

func DownloadWarc(
	ctx context.Context,
	s3Client *s3.Client,
	warcs []*types.Warc,
	path string,
) error {
	key := fmt.Sprintf("parsed_warc/%s.parquet", path)
	filename := fmt.Sprintf("/tmp/%s", path)

	file, err := os.Create(filename)
	if err != nil {
		return fmt.Errorf("error creating file: %s", err)
	}
	defer file.Close()

	bytesWriter := bufio.NewWriter(file)

	pw, err := writer.NewParquetWriterFromWriter(bytesWriter, new(Page), 4)
	if err != nil {
		return fmt.Errorf("Can't create parquet writer: %s", err)
	}

	pw.RowGroupSize = 128 * 1024 * 1024 //128M
	pw.CompressionType = parquet.CompressionCodec_SNAPPY
	mutex := sync.Mutex{}

	numWorkers := runtime.NumCPU() * 2
	fmt.Printf("Using %d workers\n", numWorkers)
	limit := limiter.NewConcurrencyLimiter(numWorkers)

	for i, warc := range warcs {
		limit.Execute(func() {
			log.Printf("%d: %+v", i, warc)

			body, err := GetWarc(ctx, s3Client, warc)
			if err != nil {
				fmt.Printf("error getting warc: %s", err)
				return
			}

			page, err := Parse(body)
			if err != nil {
				key := fmt.Sprintf("unparsed_warc/%s.warc", path)
				s3Client.PutObject(
					ctx,
					&s3.PutObjectInput{
						Body:   bytes.NewReader(body),
						Bucket: &s3Record.Bucket.Name,
						Key:    &key,
					},
				)
				fmt.Printf("error getting page %s: %s", key, err)
				return
			}

			mutex.Lock()
			err = pw.Write(page)
			pw.Flush(true)
			mutex.Unlock()
			if err != nil {
				fmt.Printf("error writing page: %s", err)
				return
			}
		})
	}

	limit.WaitAndClose()

	err = pw.WriteStop()
	if err != nil {
		return fmt.Errorf("error writing stop: %s", err)
	}

	bytesWriter.Flush()

	file.Seek(0, 0)
	_, err = s3Client.PutObject(
		ctx,
		&s3.PutObjectInput{
			Body:   file,
			Bucket: &s3Record.Bucket.Name,
			Key:    &key,
		},
	)
	if err != nil {
		return fmt.Errorf("error uploading warc: %s", err)
	}
	return nil
}
Is there a way to avoid saving the contents to a temp file and instead use only a limited-size byte buffer between the writer and the upload function?

In other words, can I begin streaming data to a reader while still writing to the same buffer?
Answer 1

Score: 1
Yes, there is a way to write the same content to multiple writers. Using io.MultiWriter might allow you to avoid the temp file, although it might still be a good idea to use one.

I often use io.MultiWriter to write to a list of checksum (sha256, ...) calculators. In fact, the last time I read the S3 client code, I noticed it does this under the hood to calculate the checksum. MultiWriter is pretty useful for piping big files between cloud services.
Also, if you do end up using temp files, you may want to create them with os.CreateTemp. If you don't, you may run into file-name collisions when your code runs in two processes or when your files share the same name.
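As a rough sketch of that (the name pattern here is just an illustration), os.CreateTemp picks a unique path for you, so concurrent workers don't clobber each other's files:

package main

import (
	"fmt"
	"log"
	"os"
)

func main() {
	// An empty dir means os.TempDir(); the "*" in the pattern is replaced
	// with a random string, so concurrent runs get distinct paths.
	tmp, err := os.CreateTemp("", "parsed_warc_*.parquet")
	if err != nil {
		log.Fatal(err)
	}
	defer os.Remove(tmp.Name()) // clean up once the upload has finished
	defer tmp.Close()

	fmt.Println("writing to", tmp.Name())
	// ... write the parquet output here, then hand tmp to the S3 upload ...
}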
Feel free to clarify your question, and I can try to answer again.