Apache Beam ParDo Filter in Go
Question
I'm a Python developer, but I need to build a Dataflow pipeline in Go.
I couldn't find nearly as many Apache Beam examples for Go as for Python or Java.
I have the code below, which defines a struct holding a user's name and age. The task is to increment the age and then filter on Age. I found a way to increment the age but am stuck on the filtering part.
package main
import (
	"context"
	"flag"
	"fmt"
	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)
func init() {
	beam.RegisterFunction(incrementAge)
}
type user struct {
	Name string
	Age  int
}
func printRow(ctx context.Context, list user) {
	fmt.Println(list)
}
func incrementAge(list user) user {
	list.Age++
	return list
}
func main() {
	flag.Parse()
	beam.Init()
	ctx := context.Background()
	p := beam.NewPipeline()
	s := p.Root()
	var userList = []user{
		{"Bob", 40},
		{"Adam", 50},
		{"John", 35},
		{"Ben", 8},
	}
	initial := beam.CreateList(s, userList)
	pc := beam.ParDo(s, incrementAge, initial)
	pc1 := beam.ParDo(s, func(row user, emit func(user)) {
		emit(row)
	}, pc)
	beam.ParDo0(s, printRow, pc1)
	if err := beamx.Run(ctx, p); err != nil {
		log.Exitf(ctx, "Failed to execute job: %v", err)
	}
}
I tried writing a function like the one below, but the expression it returns is a bool, not a user object. I know I'm missing something simple, but I can't figure out what.
func filterAge(list user) user {
	return list.Age > 40
}
In Python I could write a filter like this:
beam.Filter(lambda line: line["Age"] >= 40)
Answer 1
Score: 2
You need to add an emitter parameter to the function and call it to emit the user:
func filterAge(list user, emit func(user)) {
    if list.Age > 40 {
        emit(list)
    }
}
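As written in your current code, return list.Age > 40 evaluates the comparison list.Age > 40 first, which produces a bool (true or false), and that bool is what gets returned. A bool does not match the declared return type user, so the function does not compile.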