Apache Beam ParDo Filter in Go
Question
I'm a Python developer, but I need to build a Dataflow pipeline in Go.
I couldn't find as many Apache Beam examples for Go as there are for Python or Java.
I have the code below, which defines a struct holding a user's name and age. The task is to increment the age and then filter on Age. I found a way to increment the age, but I'm stuck on the filtering part.
package main

import (
	"context"
	"flag"
	"fmt"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

func init() {
	beam.RegisterFunction(incrementAge)
}

type user struct {
	Name string
	Age  int
}

func printRow(ctx context.Context, list user) {
	fmt.Println(list)
}

func incrementAge(list user) user {
	list.Age++
	return list
}

func main() {
	flag.Parse()
	beam.Init()
	ctx := context.Background()
	p := beam.NewPipeline()
	s := p.Root()
	var userList = []user{
		{"Bob", 40},
		{"Adam", 50},
		{"John", 35},
		{"Ben", 8},
	}
	initial := beam.CreateList(s, userList)
	pc := beam.ParDo(s, incrementAge, initial)
	pc1 := beam.ParDo(s, func(row user, emit func(user)) {
		emit(row)
	}, pc)
	beam.ParDo0(s, printRow, pc1)
	if err := beamx.Run(ctx, p); err != nil {
		log.Exitf(ctx, "Failed to execute job: %v", err)
	}
}
I tried writing a function like the one below, but the expression evaluates to a bool, not a user object. I know I'm missing something simple, but I can't figure it out.
func filterAge(list user) user {
	return list.Age > 40
}
In Python I could write something like this:
beam.Filter(lambda line: line["Age"] >= 40)
Answer 1
Score: 2
You need to add an emitter parameter to the function and use it to emit the user:
func filterAge(list user, emit func(user)) {
	if list.Age > 40 {
		emit(list)
	}
}
In your current code, the statement return list.Age > 40 first evaluates list.Age > 40 to a bool, and that bool is what gets returned, not the user.
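To make the emitter idea concrete, here is a minimal sketch that uses only the Go standard library and does not depend on Beam. The user type, incrementAge, and filterAge come from the question and answer; runLocally is a hypothetical helper introduced only for illustration. It imitates what the pipeline does by calling the functions once per element and collecting whatever gets emitted.

```go
package main

import "fmt"

// user mirrors the struct from the question.
type user struct {
	Name string
	Age  int
}

// incrementAge is the one-in, one-out step from the question.
func incrementAge(u user) user {
	u.Age++
	return u
}

// filterAge has the emitter shape from the answer: it returns nothing
// and calls emit only for the elements that should be kept.
func filterAge(u user, emit func(user)) {
	if u.Age > 40 {
		emit(u)
	}
}

// runLocally is a hypothetical stand-in for the pipeline (not a Beam API):
// it applies incrementAge and then filterAge to each element and
// collects the emitted users in order.
func runLocally(in []user) []user {
	var out []user
	emit := func(u user) { out = append(out, u) }
	for _, u := range in {
		filterAge(incrementAge(u), emit)
	}
	return out
}

func main() {
	users := []user{{"Bob", 40}, {"Adam", 50}, {"John", 35}, {"Ben", 8}}
	for _, u := range runLocally(users) {
		fmt.Println(u.Name, u.Age)
	}
}
```

With the sample data from the question this prints Bob 41 and Adam 51, since John (36) and Ben (9) are never emitted. In the actual pipeline, filterAge would be passed to beam.ParDo(s, filterAge, pc) in place of the anonymous pass-through function, and it can be registered in init with beam.RegisterFunction like incrementAge. If a predicate that returns a bool is preferred, closer to Python's beam.Filter, the Go SDK's transforms/filter package offers filter.Include; check its documentation for the exact usage in your SDK version.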