Downloading multiple files from S3 concurrently and consolidating them
Question
I'm trying to download multiple files from S3 concurrently and consolidate their contents into a single bytes buffer. The files are CSV formatted. My code seems to work most of the time (8 out of 10 tries), but there are instances where, after inspecting the consolidated buffer, I get less than I should be getting (usually no more than 100 rows missing). The total number of records expected is 4802.
If I run my code sequentially this problem does not appear, but I need to use goroutines for speed; this is a major requirement of what I'm trying to do. I have run the Go race detector and no data races were reported, and the error statements that I print never print out.
This is the code I use:
var pingsBuffer = aws.NewWriteAtBuffer([]byte{})
var wg sync.WaitGroup

// range over the contents of the index file
for _, file := range indexList {
    wg.Add(1)
    go download(key+string(file), pingsBuffer, &wg)
}
wg.Wait()
And the download function (which also consolidates the downloaded files):
func download(key string, buffer *aws.WriteAtBuffer, wg *sync.WaitGroup) {
    defer wg.Done()

    awsBuffer := aws.NewWriteAtBuffer([]byte{})
    input := &s3.GetObjectInput{
        Bucket: aws.String(defaultLocationRootBucket),
        Key:    aws.String(key),
    }

    // Download the object into this goroutine's private buffer.
    _, downloadError := downloader.Download(awsBuffer, input)
    if downloadError != nil {
        loglib.Log(loglib.LevelError, applicationType, fmt.Sprintf("Failed to download from S3, file(%v) with error : %v.", key, downloadError))
        return
    }

    // Append the downloaded bytes at the current end of the shared buffer.
    lenghts3 := int64(len(buffer.Bytes()))
    _, bufferError := buffer.WriteAt(awsBuffer.Bytes(), lenghts3)
    if bufferError != nil {
        loglib.Log(loglib.LevelError, applicationType, fmt.Sprintf("Failed to write to buffer, the file(%v) downloaded from S3 with error : %v.", key, bufferError))
    }
}
Answer 1
Score: 2
This code:
lenghts3 := int64(len(buffer.Bytes()))
is a concurrency problem: two goroutines may read the length at the same time, get the same start position, and both proceed to write to the buffer at that same offset, stepping on each other's toes. For example, if two goroutines both read len(buffer.Bytes()) == 0 and then both call WriteAt(..., 0), whichever write lands second overwrites the first file's bytes entirely, which matches the silent loss of rows described in the question.
Since you're already retrieving whole objects in memory and not streaming to the combined buffer, you may as well just send the full contents of each file on a channel, and have a receiver on that channel append each result to a shared byte buffer as they come in, synchronously.
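Here is a minimal sketch of that approach, assuming the same aws-sdk-go s3manager downloader the question uses; the consolidate function, its parameters, and the log-based error handling are illustrative names, not part of the original code:

import (
    "bytes"
    "log"
    "sync"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/service/s3"
    "github.com/aws/aws-sdk-go/service/s3/s3manager"
)

// consolidate downloads every key concurrently, but funnels the results
// through a channel so that all appends to the combined buffer happen in
// one goroutine and can never race for the same offset. Illustrative
// sketch only; names here are not from the original post.
func consolidate(downloader *s3manager.Downloader, bucket string, keys []string) *bytes.Buffer {
    results := make(chan []byte)
    var wg sync.WaitGroup

    // Fan out: one goroutine per file, each sending its complete
    // contents on the channel instead of writing to a shared buffer.
    for _, key := range keys {
        wg.Add(1)
        go func(k string) {
            defer wg.Done()
            awsBuffer := aws.NewWriteAtBuffer([]byte{})
            input := &s3.GetObjectInput{
                Bucket: aws.String(bucket),
                Key:    aws.String(k),
            }
            if _, err := downloader.Download(awsBuffer, input); err != nil {
                log.Printf("failed to download %v: %v", k, err)
                return
            }
            results <- awsBuffer.Bytes()
        }(key)
    }

    // Close the channel once every download has finished so the
    // receive loop below terminates.
    go func() {
        wg.Wait()
        close(results)
    }()

    // Single receiver: the only writer to the combined buffer.
    var combined bytes.Buffer
    for contents := range results {
        combined.Write(contents)
    }
    return &combined
}

Because all writes happen in the single receive loop, no offset arithmetic on a shared buffer is needed. Note that results arrive in completion order, not in indexList order; if ordering matters, send an index along with each payload and reassemble at the end.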