GroupByKey always holds everything in RAM, causing OOM
Question
I'm writing pipeline code that will be used in both batch and streaming mode with Dataflow, and I'm hitting OOM issues when using GroupByKey in batch mode. The code below shows the issue: when I have a large file, GroupByKey appears to hold everything in memory, only emitting values after the input finishes. I tried to use triggers to force earlier emission, but failed. I can't find any way of using this transform on big files.
How can I implement a pipeline in Beam Go that includes grouping and that works efficiently on large files?
package sisubqio_test

import (
	"context"
	"flag"
	"fmt"
	"io"
	"os"
	"strings"
	"sync/atomic"
	"testing"
	"time"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

func TestWriter(t *testing.T) {
	mustNotFail := func(err error) {
		if err != nil {
			t.Fatal(err)
		}
	}

	// test file with a few lines of text
	fName := "in.tmp.txt"
	f, err := os.OpenFile(fName, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
	mustNotFail(err)
	defer func() {
		mustNotFail(f.Close())
		mustNotFail(os.Remove(fName))
	}()
	for i := 0; i < 10; i++ {
		_, err = fmt.Fprintf(f, "line %d\n", i)
		mustNotFail(err)
	}
	_, err = f.Seek(0, io.SeekStart)
	mustNotFail(err)

	flag.Parse()
	beam.Init()
	pipeline, s := beam.NewPipelineWithRoot()
	col := textio.Read(s, fName)

	// add a timestamp to each message: every message is stamped 20s after
	// the previous one
	now := time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)
	var counter int32
	col = beam.ParDo(s, func(line beam.X) (typex.EventTime, beam.X) {
		i := atomic.AddInt32(&counter, 1) - 1
		evTime := mtime.Time(mtime.FromTime(now.Add(20 * time.Duration(i) * time.Second)).Milliseconds())
		t.Logf("[0] input event, time=%v", evTime)
		return evTime, line
	}, col)

	// add a window and inspect events as they are emitted
	col = beam.WindowInto(s,
		window.NewFixedWindows(time.Minute),
		col,
		beam.Trigger(window.TriggerAlways()), // I tried all triggers here; makes no difference
	)
	col = beam.ParDo(s, func(w typex.Window, e string) string {
		t.Logf("[1] window: %v", w)
		return e
	}, col)

	// add a key and group by it; inspect events as they are emitted
	col = beam.AddFixedKey(s, col)
	col = beam.ParDo(s, func(et typex.EventTime, group int, x beam.X) (int, beam.X) {
		t.Logf("[2] at %v got (group %d)",
			time.UnixMilli(int64(et)),
			group)
		return group, x
	}, col)

	// ISSUE IS HERE
	// No matter which trigger I use, GroupByKey appears to hold
	// everything in memory and only then emit its outputs.
	// With large files it always OOMs.
	col = beam.GroupByKey(s, col)
	beam.ParDo0(s, func(w typex.Window, group int, valIter func(*string) bool) {
		sb := strings.Builder{}
		fmt.Fprintf(&sb, "[3] win=%v out group=%d", w, group)
		var elm string
		for valIter(&elm) {
			fmt.Fprintf(&sb, " %s;", elm)
		}
		t.Log(sb.String())
	}, col)

	mustNotFail(beamx.Run(context.Background(), pipeline))
}
Output:
writer_test.go:58: [0] input event, time=1577836800000
writer_test.go:69: [1] window: [1577836800000:1577836860000)
writer_test.go:79: [2] at 2020-01-01 01:00:00 +0100 CET got (group 0)
writer_test.go:58: [0] input event, time=1577836820000
writer_test.go:69: [1] window: [1577836800000:1577836860000)
writer_test.go:79: [2] at 2020-01-01 01:00:20 +0100 CET got (group 0)
writer_test.go:58: [0] input event, time=1577836840000
writer_test.go:69: [1] window: [1577836800000:1577836860000)
writer_test.go:79: [2] at 2020-01-01 01:00:40 +0100 CET got (group 0)
writer_test.go:58: [0] input event, time=1577836860000
writer_test.go:69: [1] window: [1577836860000:1577836920000)
writer_test.go:79: [2] at 2020-01-01 01:01:00 +0100 CET got (group 0)
writer_test.go:58: [0] input event, time=1577836880000
writer_test.go:69: [1] window: [1577836860000:1577836920000)
writer_test.go:79: [2] at 2020-01-01 01:01:20 +0100 CET got (group 0)
writer_test.go:58: [0] input event, time=1577836900000
writer_test.go:69: [1] window: [1577836860000:1577836920000)
writer_test.go:79: [2] at 2020-01-01 01:01:40 +0100 CET got (group 0)
writer_test.go:58: [0] input event, time=1577836920000
writer_test.go:69: [1] window: [1577836920000:1577836980000)
writer_test.go:79: [2] at 2020-01-01 01:02:00 +0100 CET got (group 0)
writer_test.go:58: [0] input event, time=1577836940000
writer_test.go:69: [1] window: [1577836920000:1577836980000)
writer_test.go:79: [2] at 2020-01-01 01:02:20 +0100 CET got (group 0)
writer_test.go:58: [0] input event, time=1577836960000
writer_test.go:69: [1] window: [1577836920000:1577836980000)
writer_test.go:79: [2] at 2020-01-01 01:02:40 +0100 CET got (group 0)
writer_test.go:58: [0] input event, time=1577836980000
writer_test.go:69: [1] window: [1577836980000:1577837040000)
writer_test.go:79: [2] at 2020-01-01 01:03:00 +0100 CET got (group 0)
writer_test.go:95: [3] win=[1577836920000:1577836980000) out group=0 line 6; line 7; line 8;
writer_test.go:95: [3] win=[1577836980000:1577837040000) out group=0 line 9;
writer_test.go:95: [3] win=[1577836800000:1577836860000) out group=0 line 0; line 1; line 2;
writer_test.go:95: [3] win=[1577836860000:1577836920000) out group=0 line 3; line 4; line 5;
EDIT: I found Jira tickets related to triggers and windows that, at the time of writing, make me believe that triggers, and especially trigger propagation, are a work in progress.
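The window bounds in the log output can be reproduced with plain arithmetic, which may help when checking which lines end up in which group. The sketch below is not Beam code: `fixedWindow` is a hypothetical helper, and it assumes windows aligned to the Unix epoch and non-negative millisecond timestamps, which is consistent with the one-minute windows and 20-second spacing used in the test.

```go
package main

import "fmt"

// fixedWindow returns the [start, end) bounds, in milliseconds since the
// Unix epoch, of the fixed window of the given size that contains ts.
// Hypothetical helper: assumes ts >= 0 and windows aligned to the epoch.
func fixedWindow(tsMillis, sizeMillis int64) (start, end int64) {
	start = tsMillis - tsMillis%sizeMillis
	return start, start + sizeMillis
}

func main() {
	const size = int64(60000)    // one-minute windows, as in the test
	base := int64(1577836800000) // 2020-01-01T00:00:00Z in milliseconds
	for i := int64(0); i < 10; i++ {
		ts := base + i*20000 // each line is stamped 20s after the previous one
		start, end := fixedWindow(ts, size)
		fmt.Printf("line %d -> [%d:%d)\n", i, start, end)
	}
}
```

Three consecutive lines share each one-minute window, and the tenth line falls alone into the last one, which matches the four `[3]` groups in the output.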
Answer 1
Score: 1
Beam uses map and reduce operations. Map steps (transforms) can run in parallel on different workers/VMs. A reduce step needs to see all the elements it operates on, so it loads them all into memory and then performs the reduce (the groupBy operation).
You have 2 solutions:
- You can create windows so that only chunks of your large file are processed at a time. However, your groupBy won't be global, but per window.
- You can also try the new Dataflow Prime option. It's serverless and fully scalable. The promise is to remove all OOM errors (which I only got in Java; I have never used the Beam Go SDK).
You can also increase the memory of your workers, but it's not a scalable solution (and it costs more!). The Prime option is the good one (but it is still in preview).
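The memory argument above can be illustrated in plain Go, independent of Beam. This is only a conceptual sketch: `kv`, `groupThenCount` and `combineCount` are hypothetical names, and counting stands in for whatever the per-group logic is. It contrasts collecting every value per key before reducing with folding each element into a per-key accumulator as it arrives. In the Beam Go SDK the accumulator style corresponds to `beam.CombinePerKey`; whether that applies here depends on the per-group logic, which the question does not specify.

```go
package main

import "fmt"

// kv is a hypothetical key/value pair standing in for a keyed element.
type kv struct {
	key   int
	value string
}

// groupThenCount materializes every value per key before reducing,
// so its memory use grows with the number of elements.
func groupThenCount(in []kv) map[int]int {
	groups := make(map[int][]string)
	for _, e := range in {
		groups[e.key] = append(groups[e.key], e.value)
	}
	counts := make(map[int]int, len(groups))
	for k, vs := range groups {
		counts[k] = len(vs)
	}
	return counts
}

// combineCount folds each element into a per-key accumulator as it
// arrives, so its memory use grows with the number of keys only.
func combineCount(in []kv) map[int]int {
	counts := make(map[int]int)
	for _, e := range in {
		counts[e.key]++
	}
	return counts
}

func main() {
	var in []kv
	for i := 0; i < 10; i++ {
		in = append(in, kv{key: i % 2, value: fmt.Sprintf("line %d", i)})
	}
	fmt.Println(groupThenCount(in))
	fmt.Println(combineCount(in))
}
```

Both functions return the same counts; the difference is only in how much state is held while the input is consumed.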