Apache Beam: Select Top N Rows from a PCollection in Go

Question


I have a PCollection from which I need to select the n largest rows. I'm building a Dataflow pipeline in Go and am stuck on this step.

package main

import (
	"context"
	"flag"
	"fmt"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

type User struct {
	Name string
	Age  int
}

func printRow(ctx context.Context, list User) {
	fmt.Println(list)
}

func main() {

	flag.Parse()
	beam.Init()

	ctx := context.Background()

	p := beam.NewPipeline()
	s := p.Root()

	var userList = []User{
		{"Bob", 5},
		{"Adam", 8},
		{"John", 3},
		{"Ben", 1},
		{"Jose", 1},
		{"Bryan", 1},
		{"Kim", 1},
		{"Tim", 1},
	}
	initial := beam.CreateList(s, userList)

	pc2 := beam.ParDo(s, func(row User, emit func(User)) {
		emit(row)
	}, initial)

	beam.ParDo0(s, printRow, pc2)

	if err := beamx.Run(ctx, p); err != nil {
		log.Exitf(ctx, "Failed to execute job: %v", err)
	}

}

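From the code above, I need to select the top 5 rows based on User.Age. I found the top package, which has a function that does this, but its documentation says it returns a single-element PCollection. How is that different?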

package main

import (
	"context"
	"flag"
	"fmt"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/top"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

func init() {
	beam.RegisterFunction(less)
}

type User struct {
	Name string
	Age  int
}

func printRow(ctx context.Context, list User) {
	fmt.Println(list)
}

func less(a, b User) bool {
	return a.Age < b.Age
}

func main() {

	flag.Parse()
	beam.Init()

	ctx := context.Background()

	p := beam.NewPipeline()
	s := p.Root()

	var userList = []User{
		{"Bob", 5},
		{"Adam", 8},
		{"John", 3},
		{"Ben", 1},
		{"Jose", 1},
		{"Bryan", 1},
		{"Kim", 1},
		{"Tim", 1},
	}
	initial := beam.CreateList(s, userList)

	best := top.Largest(s, initial, 5, less)

	pc2 := beam.ParDo(s, func(row User, emit func(User)) {
		emit(row)
	}, best)

	beam.ParDo0(s, printRow, pc2)

	if err := beamx.Run(ctx, p); err != nil {
		log.Exitf(ctx, "Failed to execute job: %v", err)
	}

}

I added the function above to select the top 5 rows, but I get the error: []main.User is not assignable to main.User

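I need the PCollection in the same format as before, because I have further processing to do. I suspect this is because top.Largest returns a single-element PCollection. Any ideas on how I can convert it back?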


Answer 1

Score: 2

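The best PCollection has element type []User, not User: top.Largest produces a single element, a slice holding the N largest values.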

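So try:

pc2 := beam.ParDo(s, func(rows []User, emit func(User)) {
	// best contains one []User element; emit each User individually
	// so pc2 is a PCollection<User> again.
	for _, row := range rows {
		emit(row)
	}
}, best)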
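The shape change can be illustrated without Beam at all. The program below is a hypothetical, dependency-free sketch: largestN is an in-memory stand-in for top.Largest (not its real implementation), and flattenUsers has the same signature as the DoFn in the answer. It shows that the top-N step yields one value holding a slice, and the flattening step turns that back into one value per User. With ties (several users aged 1), top.Largest in a real pipeline may keep different tied elements; the stable sort here simply keeps input order.

```go
package main

import (
	"fmt"
	"sort"
)

// User mirrors the struct from the question.
type User struct {
	Name string
	Age  int
}

// less orders users by age, as in the question.
func less(a, b User) bool { return a.Age < b.Age }

// largestN is a hypothetical in-memory stand-in for top.Largest: it
// consumes all input elements and returns ONE value, a slice holding
// the n largest elements (sorted largest first in this sketch).
func largestN(in []User, n int, less func(a, b User) bool) []User {
	cp := append([]User(nil), in...) // copy so the input is not reordered
	sort.SliceStable(cp, func(i, j int) bool { return less(cp[j], cp[i]) })
	if n > len(cp) {
		n = len(cp)
	}
	return cp[:n]
}

// flattenUsers has the same shape as the DoFn in the answer: it
// receives one []User element and emits each User separately.
func flattenUsers(rows []User, emit func(User)) {
	for _, row := range rows {
		emit(row)
	}
}

func main() {
	users := []User{{"Bob", 5}, {"Adam", 8}, {"John", 3}, {"Ben", 1},
		{"Jose", 1}, {"Bryan", 1}, {"Kim", 1}, {"Tim", 1}}

	// Analogous to PCollection<[]User>: a single element containing a slice.
	best := [][]User{largestN(users, 5, less)}
	fmt.Println("elements in best:", len(best))

	// Analogous to PCollection<User> after the flattening ParDo.
	var flat []User
	for _, rows := range best {
		flattenUsers(rows, func(u User) { flat = append(flat, u) })
	}
	fmt.Println("elements after flatten:", len(flat))
	for _, u := range flat {
		fmt.Println(u)
	}
}
```

In an actual pipeline, a named function like flattenUsers could be passed to beam.ParDo instead of the anonymous function and registered with beam.RegisterFunction, the same way less is registered in the question.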

huangapple
  • Published 2023-02-02 07:16:12
  • Original link: https://go.coder-hub.com/75317186.html