GroupByKey总是将所有内容保存在RAM中,导致内存溢出(OOM)。

huangapple go评论80阅读模式
英文:

GroupByKey always holds holds everything in RAM, causing OOM

问题

我正在编写一个在批处理和流处理模式下使用DataFlow的管道代码,并且在批处理模式下使用GroupByKey时遇到了OOM问题。下面的代码显示了问题所在:当我有一个大文件时,GroupByKey似乎会将所有内容都保存在内存中,只有在输入完成后才会发出值。我尝试使用触发器来强制触发事件,但失败了。我找不到在大文件上使用此转换的任何方法。

如何在Beam Go中实现一个包含分组的管道,可以高效地处理大文件?

package sisubqio_test

import (
	"context"
	"flag"
	"fmt"
	"io"
	"os"
	"strings"
	"sync/atomic"
	"testing"
	"time"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

func TestWriter(t *testing.T) {
	mustNotFail := func(err error) {
		if err != nil {
			t.Fatal(err)
		}
	}

	// test file with a few lines of text
	fName := "in.tmp.txt"
	f, err := os.OpenFile(fName, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
	mustNotFail(err)
	defer func() {
		mustNotFail(f.Close())
		mustNotFail(os.Remove(fName))
	}()
	for i := 0; i < 10; i++ {
		_, err = fmt.Fprintf(f, "line %d\n", i)
		mustNotFail(err)
	}

	_, err = f.Seek(0, io.SeekStart)
	mustNotFail(err)

	flag.Parse()
	beam.Init()

	pipeline, s := beam.NewPipelineWithRoot()
	col := textio.Read(s, fName)

	// add timestamp to messages: each message has a timestamp 20s after
	// the previous one
	now := time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)
	var counter int32
	col = beam.ParDo(s, func(line beam.X) (typex.EventTime, beam.X) {
		i := atomic.AddInt32(&counter, 1) - 1
		evTime := mtime.Time(mtime.FromTime(now.Add(20 * time.Duration(i) * time.Second)).Milliseconds())
		t.Logf("[0] input event, time=%v", evTime)
		return evTime, line
	}, col)

	// add a window and inspect events, when emitted
	col = beam.WindowInto(s,
		window.NewFixedWindows(time.Minute),
		col,
		beam.Trigger(window.TriggerAlways()), // I tried all triggers here; makes no difference
	)
	col = beam.ParDo(s, func(w typex.Window, e string) string {
		t.Logf("[1] window: %v", w)
		return e
	}, col)

	// add a key and group by it; inspect events, when emitted
	col = beam.AddFixedKey(s, col)
	col = beam.ParDo(s, func(et typex.EventTime, group int, x beam.X) (int, beam.X) {
		t.Logf("[2] at %v got (group %d)",
			time.UnixMilli(int64(et)),
			group)
		return group, x
	}, col)

	// ISSUE IS HERE
	// It doesn't matter the trigger I use, it looks like GroupByKey
	// always wants to hold everything into memory and only then
	// emit it's outputs. With large files is always OOMs.
	col = beam.GroupByKey(s, col)
	beam.ParDo0(s, func(w typex.Window, group int, valIter func(*string) bool) {
		sb := strings.Builder{}
		fmt.Fprintf(&sb, "[3] win=%v out group=%d", w, group)
		var elm string
		for valIter(&elm) {
			fmt.Fprintf(&sb, " %s;", elm)
		}
		t.Log(sb.String())
	}, col)

	mustNotFail(beamx.Run(context.Background(), pipeline))
}

输出:

    writer_test.go:58: [0] input event, time=1577836800000
writer_test.go:69: [1] window: [1577836800000:1577836860000)
writer_test.go:79: [2] at 2020-01-01 01:00:00 +0100 CET got (group 0)
writer_test.go:58: [0] input event, time=1577836820000
writer_test.go:69: [1] window: [1577836800000:1577836860000)
writer_test.go:79: [2] at 2020-01-01 01:00:20 +0100 CET got (group 0)
writer_test.go:58: [0] input event, time=1577836840000
writer_test.go:69: [1] window: [1577836800000:1577836860000)
writer_test.go:79: [2] at 2020-01-01 01:00:40 +0100 CET got (group 0)
writer_test.go:58: [0] input event, time=1577836860000
writer_test.go:69: [1] window: [1577836860000:1577836920000)
writer_test.go:79: [2] at 2020-01-01 01:01:00 +0100 CET got (group 0)
writer_test.go:58: [0] input event, time=1577836880000
writer_test.go:69: [1] window: [1577836860000:1577836920000)
writer_test.go:79: [2] at 2020-01-01 01:01:20 +0100 CET got (group 0)
writer_test.go:58: [0] input event, time=1577836900000
writer_test.go:69: [1] window: [1577836860000:1577836920000)
writer_test.go:79: [2] at 2020-01-01 01:01:40 +0100 CET got (group 0)
writer_test.go:58: [0] input event, time=1577836920000
writer_test.go:69: [1] window: [1577836920000:1577836980000)
writer_test.go:79: [2] at 2020-01-01 01:02:00 +0100 CET got (group 0)
writer_test.go:58: [0] input event, time=1577836940000
writer_test.go:69: [1] window: [1577836920000:1577836980000)
writer_test.go:79: [2] at 2020-01-01 01:02:20 +0100 CET got (group 0)
writer_test.go:58: [0] input event, time=1577836960000
writer_test.go:69: [1] window: [1577836920000:1577836980000)
writer_test.go:79: [2] at 2020-01-01 01:02:40 +0100 CET got (group 0)
writer_test.go:58: [0] input event, time=1577836980000
writer_test.go:69: [1] window: [1577836980000:1577837040000)
writer_test.go:79: [2] at 2020-01-01 01:03:00 +0100 CET got (group 0)
writer_test.go:95: [3] win=[1577836920000:1577836980000) out group=0 line 6; line 7; line 8;
writer_test.go:95: [3] win=[1577836980000:1577837040000) out group=0 line 9;
writer_test.go:95: [3] win=[1577836800000:1577836860000) out group=0 line 0; line 1; line 2;
writer_test.go:95: [3] win=[1577836860000:1577836920000) out group=0 line 3; line 4; line 5;

编辑:我找到了与触发器窗口相关的Jira票据,目前来看,触发器和特别是触发器传播是一个正在进行中的工作。

英文:

I'm writing a pipeline code that will be used in both batch and streaming mode with DataFlow, and I'm having OOM issues when using GroupByKey when operating in batch mode. The code bellow shows the issue: when I have a large file, GroupByKey appears to hold everything in memory, only emitting values after the input finishes. I tried to use triggers to force events to be triggered, but failed. I can't find any way of using this transform on big files.

How can implement a pipeline in beam go that includes grouping and that can efficiently work on large files?

package sisubqio_test

import (
	&quot;context&quot;
	&quot;flag&quot;
	&quot;fmt&quot;
	&quot;io&quot;
	&quot;os&quot;
	&quot;strings&quot;
	&quot;sync/atomic&quot;
	&quot;testing&quot;
	&quot;time&quot;

	&quot;github.com/apache/beam/sdks/v2/go/pkg/beam&quot;
	&quot;github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime&quot;
	&quot;github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window&quot;
	&quot;github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex&quot;
	&quot;github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio&quot;
	&quot;github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx&quot;
)

func TestWriter(t *testing.T) {
	mustNotFail := func(err error) {
		if err != nil {
			t.Fatal(err)
		}
	}

	// test file with a few lines of text
	fName := &quot;in.tmp.txt&quot;
	f, err := os.OpenFile(fName, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
	mustNotFail(err)
	defer func() {
		mustNotFail(f.Close())
		mustNotFail(os.Remove(fName))
	}()
	for i := 0; i &lt; 10; i++ {
		_, err = fmt.Fprintf(f, &quot;line %d\n&quot;, i)
		mustNotFail(err)
	}

	_, err = f.Seek(0, io.SeekStart)
	mustNotFail(err)

	flag.Parse()
	beam.Init()

	pipeline, s := beam.NewPipelineWithRoot()
	col := textio.Read(s, fName)

	// add timestamp to messages: each message has a timestamp 20s after
	// the previous one
	now := time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)
	var counter int32
	col = beam.ParDo(s, func(line beam.X) (typex.EventTime, beam.X) {
		i := atomic.AddInt32(&amp;counter, 1) - 1
		evTime := mtime.Time(mtime.FromTime(now.Add(20 * time.Duration(i) * time.Second)).Milliseconds())
		t.Logf(&quot;[0] input event, time=%v&quot;, evTime)
		return evTime, line
	}, col)

	// add a window and inspect events, when emitted
	col = beam.WindowInto(s,
		window.NewFixedWindows(time.Minute),
		col,
		beam.Trigger(window.TriggerAlways()), // I tried all triggers here; makes no difference
	)
	col = beam.ParDo(s, func(w typex.Window, e string) string {
		t.Logf(&quot;[1] window: %v&quot;, w)
		return e
	}, col)

	// add a key and group by it; inspect events, when emitted
	col = beam.AddFixedKey(s, col)
	col = beam.ParDo(s, func(et typex.EventTime, group int, x beam.X) (int, beam.X) {
		t.Logf(&quot;[2] at %v got (group %d)&quot;,
			time.UnixMilli(int64(et)),
			group)
		return group, x
	}, col)

	// ISSUE IS HERE
	// It doesn&#39;t matter the trigger I use, it looks like GroupByKey
	// always wants to hold everything into memory and only then
	// emit it&#39;s outputs. With large files is always OOMs.
	col = beam.GroupByKey(s, col)
	beam.ParDo0(s, func(w typex.Window, group int, valIter func(*string) bool) {
		sb := strings.Builder{}
		fmt.Fprintf(&amp;sb, &quot;[3] win=%v out group=%d&quot;, w, group)
		var elm string
		for valIter(&amp;elm) {
			fmt.Fprintf(&amp;sb, &quot; %s;&quot;, elm)
		}
		t.Log(sb.String())
	}, col)

	mustNotFail(beamx.Run(context.Background(), pipeline))
}

Output:

    writer_test.go:58: [0] input event, time=1577836800000
writer_test.go:69: [1] window: [1577836800000:1577836860000)
writer_test.go:79: [2] at 2020-01-01 01:00:00 +0100 CET got (group 0)
writer_test.go:58: [0] input event, time=1577836820000
writer_test.go:69: [1] window: [1577836800000:1577836860000)
writer_test.go:79: [2] at 2020-01-01 01:00:20 +0100 CET got (group 0)
writer_test.go:58: [0] input event, time=1577836840000
writer_test.go:69: [1] window: [1577836800000:1577836860000)
writer_test.go:79: [2] at 2020-01-01 01:00:40 +0100 CET got (group 0)
writer_test.go:58: [0] input event, time=1577836860000
writer_test.go:69: [1] window: [1577836860000:1577836920000)
writer_test.go:79: [2] at 2020-01-01 01:01:00 +0100 CET got (group 0)
writer_test.go:58: [0] input event, time=1577836880000
writer_test.go:69: [1] window: [1577836860000:1577836920000)
writer_test.go:79: [2] at 2020-01-01 01:01:20 +0100 CET got (group 0)
writer_test.go:58: [0] input event, time=1577836900000
writer_test.go:69: [1] window: [1577836860000:1577836920000)
writer_test.go:79: [2] at 2020-01-01 01:01:40 +0100 CET got (group 0)
writer_test.go:58: [0] input event, time=1577836920000
writer_test.go:69: [1] window: [1577836920000:1577836980000)
writer_test.go:79: [2] at 2020-01-01 01:02:00 +0100 CET got (group 0)
writer_test.go:58: [0] input event, time=1577836940000
writer_test.go:69: [1] window: [1577836920000:1577836980000)
writer_test.go:79: [2] at 2020-01-01 01:02:20 +0100 CET got (group 0)
writer_test.go:58: [0] input event, time=1577836960000
writer_test.go:69: [1] window: [1577836920000:1577836980000)
writer_test.go:79: [2] at 2020-01-01 01:02:40 +0100 CET got (group 0)
writer_test.go:58: [0] input event, time=1577836980000
writer_test.go:69: [1] window: [1577836980000:1577837040000)
writer_test.go:79: [2] at 2020-01-01 01:03:00 +0100 CET got (group 0)
writer_test.go:95: [3] win=[1577836920000:1577836980000) out group=0 line 6; line 7; line 8;
writer_test.go:95: [3] win=[1577836980000:1577837040000) out group=0 line 9;
writer_test.go:95: [3] win=[1577836800000:1577836860000) out group=0 line 0; line 1; line 2;
writer_test.go:95: [3] win=[1577836860000:1577836920000) out group=0 line 3; line 4; line 5;

EDIT: I found Jira tickets related to triggers and windows that, at the time of writing this, make be believe that the trigger, and specially trigger propagation is a WIP.

答案1

得分: 1

Beam使用映射和归约操作。映射(转换)可以并行在不同的工作节点/虚拟机上进行。归约需要知道所有要执行的元素,因此它会将所有元素加载到内存中,然后执行归约groupBy操作。

你有两个解决方案:

  • 你可以创建窗口来仅处理大文件的部分数据块。然而,你的groupBy操作将不是全局的,而是基于窗口的。

  • 你也可以尝试新的Dataflow Prime选项。它是无服务器且可完全扩展的。承诺解决所有OOM错误(我只在Java中遇到过,我从未使用过Beam Go SDK)。

你还可以增加工作节点的内存,但这不是可扩展的解决方案(而且成本更高!)。Prime选项是一个好选择(但仍处于预览阶段)。

英文:

Beam uses map and reduce operations. Map (transforms) can be done in parallel and on different workers/VM. Reduce needs to know all the elements to be performed, thus it loads all the elements in memory and then it performs the reduce groupBy operation.

You have 2 solutions:

  • You can create windows to process only chunks of your large file. However, your groupBy won't be global, but per window.

  • You can also try the new Dataflow prime option. It's serverless and fully scalable. The promise is to remove all the OOM error (that I got only in Java, I never use the Beam Go SDK)

You can also increase the memory of your worker, but it's not a scalable solution (and it costs more!). The prime option is the good one (but still in preview)

huangapple
  • 本文由 发表于 2021年9月24日 01:07:45
  • 转载请务必保留本文链接:https://go.coder-hub.com/69304255.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定