Apache Beam ParDo Filter in Go

Question

I'm a Python developer, but I need to build a Dataflow pipeline in Go.
I couldn't find nearly as many Apache Beam examples for Go as for Python or Java.

I have the code below, which defines a user struct with a name and an age. The task is to increment the age and then filter on Age. I found a way to increment the age, but I'm stuck on the filtering part.

package main

import (
	"context"
	"flag"
	"fmt"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

func init() {
	beam.RegisterFunction(incrementAge)
}

type user struct {
	Name string
	Age  int
}

func printRow(ctx context.Context, list user) {
	fmt.Println(list)
}

func incrementAge(list user) user {
	list.Age++
	return list
}

func main() {

	flag.Parse()
	beam.Init()

	ctx := context.Background()

	p := beam.NewPipeline()
	s := p.Root()

	var userList = []user{
		{"Bob", 40},
		{"Adam", 50},
		{"John", 35},
		{"Ben", 8},
	}
	initial := beam.CreateList(s, userList)

	pc := beam.ParDo(s, incrementAge, initial)

	pc1 := beam.ParDo(s, func(row user, emit func(user)) {
		emit(row)
	}, pc)

	beam.ParDo0(s, printRow, pc1)

	if err := beamx.Run(ctx, p); err != nil {
		log.Exitf(ctx, "Failed to execute job: %v", err)
	}

}

I tried writing a function like the one below, but it returns a bool rather than a user object. I know I'm missing something simple, but I can't figure it out.

func filterAge(list user) user {
	return list.Age > 40	
}

In Python I could write a function like the one below.

beam.Filter(lambda line: line["Age"] >= 40)

Answer 1

Score: 2

You need to add an emitter parameter to the function and use it to emit the user:

func filterAge(list user, emit func(user)) {
    if list.Age > 40 {
        emit(list)
    }
}

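In your current code, the statement return list.Age > 40 first evaluates list.Age > 40 to a boolean (true or false) and then tries to return that boolean. Because filterAge is declared to return a user, the types do not match and the Go compiler rejects the function.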
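
To make the emitter idea concrete, here is a minimal sketch in plain Go with no Beam dependency. It reuses user, incrementAge, and filterAge from the question and the answer. The runLocally helper is a hypothetical stand-in for the two ParDo steps, not a Beam API: it calls the functions once per element and collects whatever filterAge emits.

```go
package main

import "fmt"

// user mirrors the struct from the question.
type user struct {
	Name string
	Age  int
}

// incrementAge is the mapping step from the question: one element in, one out.
func incrementAge(u user) user {
	u.Age++
	return u
}

// filterAge is the emitter-style function from the answer: it calls emit
// zero times (element dropped) or once (element kept) per input element.
func filterAge(u user, emit func(user)) {
	if u.Age > 40 {
		emit(u)
	}
}

// runLocally is a hypothetical helper, not part of Beam. It imitates what a
// runner does for the two ParDo steps by invoking the functions per element
// and gathering the emitted values into a slice.
func runLocally(in []user) []user {
	var out []user
	emit := func(u user) { out = append(out, u) }
	for _, u := range in {
		filterAge(incrementAge(u), emit)
	}
	return out
}

func main() {
	users := []user{{"Bob", 40}, {"Adam", 50}, {"John", 35}, {"Ben", 8}}
	for _, u := range runLocally(users) {
		fmt.Println(u) // prints {Bob 41} and then {Adam 51}
	}
}
```

In the pipeline from the question, the same filterAge would presumably replace the pass-through step, as in beam.ParDo(s, filterAge, pc), and be registered with beam.RegisterFunction(filterAge) next to incrementAge. Note that the answer keeps the > 40 comparison from the Go attempt, while the Python snippet uses >= 40. The Go SDK also ships a filter package whose filter.Include takes a predicate returning bool, which is likely the closest match to Python's beam.Filter; check the SDK documentation for its exact usage.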

huangapple
  • Posted on February 1, 2023 at 08:49:03
  • When reposting, please keep the link to this article: https://go.coder-hub.com/75304601.html