Apache Beam在Go语言中的左连接

huangapple go评论81阅读模式
英文:

Apache Beam Left Join in Go

问题

有没有一种简单的方法可以使用Go执行两个PCollections的左连接?我看到SQL连接只能在Java中使用。

package main

import (
	"context"
	"flag"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

// customer is one record of the customer data set, the left-hand side of the
// requested join.
type customer struct {
	// CustID identifies the customer; the order records refer to it through
	// order.Cust_ID.
	CustID int
	// FName is the customer's first name.
	FName  string
}

// order is one record of the order data set, the right-hand side of the
// requested join. The sample data in main uses positional literals, so the
// field order here (OrderID, Amount, Cust_ID) is significant.
type order struct {
	// OrderID identifies the order.
	OrderID int
	// Amount is the order amount.
	Amount  int
	// Cust_ID is the CustID of the customer the order belongs to.
	Cust_ID int
}

// main builds a Beam pipeline holding two in-memory collections (customers
// and orders) and runs it. The left join itself is not implemented here; the
// comment block further down describes the output the question is asking for.
func main() {

	flag.Parse()
	beam.Init()

	ctx := context.Background()

	p := beam.NewPipeline()
	s := p.Root()

	// Sample customers as positional literals: {CustID, FName}.
	var custList = []customer{
		{1, "Bob"},
		{2, "Adam"},
		{3, "John"},
		{4, "Ben"},
		{5, "Jose"},
		{6, "Bryan"},
		{7, "Kim"},
		{8, "Tim"},
	}

	// Sample orders as positional literals: {OrderID, Amount, Cust_ID}.
	// Only customers 1, 3 and 7 have an order.
	var orderList = []order{
		{123, 100, 1},
		{125, 30, 3},
		{128, 50, 7},
	}

	custPCol := beam.CreateList(s, custList)

	orderPCol := beam.CreateList(s, orderList)

	// Left join custPCol with orderPCol.
	// Expected result (derived from custList and orderList above):
	// CustID | FName | OrderID | Amount
	//     1  | Bob   |   123   | 100
	//     2  | Adam  |         |
	//     3  | John  |   125   | 30
	//     4  | Ben   |         |
	//     5  | Jose  |         |
	//     6  | Bryan |         |
	//     7  | Kim   |   128   | 50
	//     8  | Tim   |         |
	//
	// NOTE(review): custPCol and orderPCol are not used yet. The Go compiler
	// rejects unused local variables, so this skeleton only builds once the
	// join step is added.

	if err := beamx.Run(ctx, p); err != nil {
		log.Exitf(ctx, "Failed to execute job: %v", err)
	}

}

我想要连接这两个PCollections并执行进一步的操作。我看了关于CoGroupByKey的文档,但无法将其转换为普通SQL Join的格式。

对此有什么建议吗?

英文:

Is there a simple way to perform a left join of two PCollections using Go?
I see that the SQL joins are available only in Java.

package main
import (
"context"
"flag"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)
// customer is one record of the customer data set, the left-hand side of the
// requested join.
type customer struct {
// CustID identifies the customer; the order records refer to it through
// order.Cust_ID.
CustID int
// FName is the customer's first name.
FName  string
}
// order is one record of the order data set, the right-hand side of the
// requested join. The sample data in main uses positional literals, so the
// field order here (OrderID, Amount, Cust_ID) is significant.
type order struct {
// OrderID identifies the order.
OrderID int
// Amount is the order amount.
Amount  int
// Cust_ID is the CustID of the customer the order belongs to.
Cust_ID int
}
// main builds a Beam pipeline holding two in-memory collections (customers
// and orders) and runs it. The left join itself is not implemented here; the
// comment block further down describes the output the question is asking for.
func main() {
flag.Parse()
beam.Init()
ctx := context.Background()
p := beam.NewPipeline()
s := p.Root()
// Sample customers as positional literals: {CustID, FName}.
var custList = []customer{
{1, "Bob"},
{2, "Adam"},
{3, "John"},
{4, "Ben"},
{5, "Jose"},
{6, "Bryan"},
{7, "Kim"},
{8, "Tim"},
}
// Sample orders as positional literals: {OrderID, Amount, Cust_ID}.
// Only customers 1, 3 and 7 have an order.
var orderList = []order{
{123, 100, 1},
{125, 30, 3},
{128, 50, 7},
}
custPCol := beam.CreateList(s, custList)
orderPCol := beam.CreateList(s, orderList)
// Left join custPCol with orderPCol.
// Expected result (derived from custList and orderList above):
// CustID | FName | OrderID | Amount
//     1  | Bob   |   123   | 100
//     2  | Adam  |         |
//     3  | John  |   125   | 30
//     4  | Ben   |         |
//     5  | Jose  |         |
//     6  | Bryan |         |
//     7  | Kim   |   128   | 50
//     8  | Tim   |         |
//
// NOTE(review): custPCol and orderPCol are not used yet. The Go compiler
// rejects unused local variables, so this skeleton only builds once the
// join step is added.
if err := beamx.Run(ctx, p); err != nil {
log.Exitf(ctx, "Failed to execute job: %v", err)
}
}

I want to join these two PCollections and perform further operations on the result. I read the documentation for CoGroupByKey, but I was unable to get its output into the shape that a normal SQL join would produce.

Any suggestions on this?

答案1

得分: 4

尝试这样写:

// resultType is one row of the left-join output: the customer columns plus
// the columns of a matching order. For a customer without an order, OrderID
// and Amount are left at their zero values.
type resultType struct {
	CustID  int
	FName   string
	OrderID int
	Amount  int
}

// Left join implemented with a side input: each customer element is compared
// against the whole order collection, which is handed to the function as an
// iterator.
//
// One resultType is emitted per matching order, so a customer with several
// orders yields several rows (returning on the first match would silently
// drop the others). A customer with no matching order yields exactly one row
// whose OrderID and Amount stay at their zero values.
result := beam.ParDo(s, func(c customer, iterOrder func(*order) bool, emit func(resultType)) {
	matched := false

	var o order
	for iterOrder(&o) {
		if c.CustID != o.Cust_ID {
			continue
		}
		matched = true
		emit(resultType{
			CustID:  c.CustID,
			FName:   c.FName,
			OrderID: o.OrderID,
			Amount:  o.Amount,
		})
	}

	// Left-join semantics: keep the customer even when no order matched.
	if !matched {
		emit(resultType{
			CustID: c.CustID,
			FName:  c.FName,
		})
	}
}, custPCol, beam.SideInput{Input: orderPCol})


或者如果你想使用 CoGroupByKey ...

custWithKeyPCol := beam.ParDo(s, func(c customer) (int, customer) {
	return c.CustID, c
}, custPCol)

orderWithKeyPCol := beam.ParDo(s, func(o order) (int, order) {
	return o.Cust_ID, o
}, orderPCol)

resultPCol := beam.CoGroupByKey(s, custWithKeyPCol, orderWithKeyPCol)

beam.ParDo0(s, func(CustID int, custIter func(*customer) bool, orderIter func(*order) bool) {
	c, o := customer{}, order{}
	for custIter(&c) {
		if ok := orderIter(&o); ok {
			fmt.Println(CustID, c.FName, o.OrderID, o.Amount)
		}
		fmt.Println(CustID, c.FName)
	}
}, resultPCol)
英文:

Try it like this:

// resultType is one row of the left-join output: the customer columns plus
// the columns of a matching order. For a customer without an order, OrderID
// and Amount are left at their zero values.
type resultType struct {
CustID  int
FName   string
OrderID int
Amount  int
}
// Left join implemented with a side input: each customer element is compared
// against the whole order collection, which is handed to the function as an
// iterator.
//
// One resultType is emitted per matching order, so a customer with several
// orders yields several rows (returning on the first match would silently
// drop the others). A customer with no matching order yields exactly one row
// whose OrderID and Amount stay at their zero values.
result := beam.ParDo(s, func(c customer, iterOrder func(*order) bool, emit func(resultType)) {
	matched := false

	var o order
	for iterOrder(&o) {
		if c.CustID != o.Cust_ID {
			continue
		}
		matched = true
		emit(resultType{
			CustID:  c.CustID,
			FName:   c.FName,
			OrderID: o.OrderID,
			Amount:  o.Amount,
		})
	}

	// Left-join semantics: keep the customer even when no order matched.
	if !matched {
		emit(resultType{
			CustID: c.CustID,
			FName:  c.FName,
		})
	}
}, custPCol, beam.SideInput{Input: orderPCol})

Or, if you want to use CoGroupByKey:

custWithKeyPCol := beam.ParDo(s, func(c customer) (int, customer) {
return c.CustID, c
}, custPCol)
orderWithKeyPCol := beam.ParDo(s, func(o order) (int, order) {
return o.Cust_ID, o
}, orderPCol)
resultPCol := beam.CoGroupByKey(s, custWithKeyPCol, orderWithKeyPCol)
beam.ParDo0(s, func(CustID int, custIter func(*customer) bool, orderIter func(*order) bool) {
c, o := customer{}, order{}
for custIter(&c) {
if ok := orderIter(&o); ok {
fmt.Println(CustID, c.FName, o.OrderID, o.Amount)
}
fmt.Println(CustID, c.FName)
}
}, resultPCol)

huangapple
  • 本文由 发表于 2023年2月2日 13:49:10
  • 转载请务必保留本文链接:https://go.coder-hub.com/75319138.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定