英文:
Apache Beam Left Join in Go
问题
有没有一种简单的方法可以使用Go执行两个PCollections的左连接?我看到SQL连接只能在Java中使用。
package main
import (
"context"
"flag"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)
// customer is one row of the left-hand (customer) side of the join.
type customer struct {
	CustID int    // customer identifier; orders point at it through order.Cust_ID
	FName  string // customer name (the sample values are first names)
}
// order is one row of the right-hand (order) side of the join.
type order struct {
	OrderID int // order identifier
	Amount  int // order amount; the unit is not specified in this snippet
	Cust_ID int // CustID of the customer this order belongs to (the join key)
}
// main builds a Beam pipeline that holds a customer PCollection and an order
// PCollection (the two inputs of the desired left join) and runs it.
func main() {
	flag.Parse()
	beam.Init()

	ctx := context.Background()
	p := beam.NewPipeline()
	s := p.Root()

	custList := []customer{
		{1, "Bob"},
		{2, "Adam"},
		{3, "John"},
		{4, "Ben"},
		{5, "Jose"},
		{6, "Bryan"},
		{7, "Kim"},
		{8, "Tim"},
	}
	orderList := []order{
		{123, 100, 1},
		{125, 30, 3},
		{128, 50, 7},
	}

	custPCol := beam.CreateList(s, custList)
	orderPCol := beam.CreateList(s, orderList)

	// Left join custPCol with orderPCol.
	// NOTE: the join step is still missing here; until custPCol and orderPCol
	// are consumed by it, the Go compiler rejects them as declared and not used.
	//
	// Expected result (derived from custList and orderList above):
	// CustID | FName | OrderID | Amount
	// 1      | Bob   | 123     | 100
	// 2      | Adam  |         |
	// 3      | John  | 125     | 30
	// 4      | Ben   |         |
	// 5      | Jose  |         |
	// 6      | Bryan |         |
	// 7      | Kim   | 128     | 50
	// 8      | Tim   |         |

	if err := beamx.Run(ctx, p); err != nil {
		log.Exitf(ctx, "Failed to execute job: %v", err)
	}
}
我想要连接这两个PCollections并执行进一步的操作。我看了关于CoGroupByKey的文档,但无法将其转换为普通SQL Join的格式。
对此有什么建议吗?
英文:
Is there a simple way to perform a left join of 2 PCollections using Go?
I see that the SQL joins are available only in Java.
package main
import (
"context"
"flag"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)
// customer is one row of the left-hand (customer) side of the join.
type customer struct {
	CustID int    // customer identifier; orders point at it through order.Cust_ID
	FName  string // customer name (the sample values are first names)
}
// order is one row of the right-hand (order) side of the join.
type order struct {
	OrderID int // order identifier
	Amount  int // order amount; the unit is not specified in this snippet
	Cust_ID int // CustID of the customer this order belongs to (the join key)
}
// main builds a Beam pipeline that holds a customer PCollection and an order
// PCollection (the two inputs of the desired left join) and runs it.
func main() {
	flag.Parse()
	beam.Init()

	ctx := context.Background()
	p := beam.NewPipeline()
	s := p.Root()

	custList := []customer{
		{1, "Bob"},
		{2, "Adam"},
		{3, "John"},
		{4, "Ben"},
		{5, "Jose"},
		{6, "Bryan"},
		{7, "Kim"},
		{8, "Tim"},
	}
	orderList := []order{
		{123, 100, 1},
		{125, 30, 3},
		{128, 50, 7},
	}

	custPCol := beam.CreateList(s, custList)
	orderPCol := beam.CreateList(s, orderList)

	// Left join custPCol with orderPCol.
	// NOTE: the join step is still missing here; until custPCol and orderPCol
	// are consumed by it, the Go compiler rejects them as declared and not used.
	//
	// Expected result (derived from custList and orderList above):
	// CustID | FName | OrderID | Amount
	// 1      | Bob   | 123     | 100
	// 2      | Adam  |         |
	// 3      | John  | 125     | 30
	// 4      | Ben   |         |
	// 5      | Jose  |         |
	// 6      | Bryan |         |
	// 7      | Kim   | 128     | 50
	// 8      | Tim   |         |

	if err := beamx.Run(ctx, p); err != nil {
		log.Exitf(ctx, "Failed to execute job: %v", err)
	}
}
I want to join these 2 PCollections and perform further operations. I saw the documentation about CoGroupByKey but was unable to get it into the format that a normal SQL join would produce.
Any suggestions on this?
答案1
得分: 4
尝试这样写:
// resultType is one output row of the left join of customers with orders.
// OrderID and Amount keep their zero values when the customer has no
// matching order.
type resultType struct {
	CustID  int
	FName   string
	OrderID int
	Amount  int
}
// Left join via a side input: every customer element scans the whole order
// collection. One row is emitted per matching order, so a customer with
// several orders yields several rows; a customer without any order still
// yields one row whose OrderID and Amount are left at zero.
result := beam.ParDo(s, func(c customer, iterOrder func(*order) bool, emit func(resultType)) {
	matched := false
	var o order
	for iterOrder(&o) {
		if o.Cust_ID != c.CustID {
			continue
		}
		matched = true
		emit(resultType{
			CustID:  c.CustID,
			FName:   c.FName,
			OrderID: o.OrderID,
			Amount:  o.Amount,
		})
	}
	if !matched {
		// Left-join semantics: keep the customer even without orders.
		emit(resultType{
			CustID: c.CustID,
			FName:  c.FName,
		})
	}
}, custPCol, beam.SideInput{Input: orderPCol})
或者如果你想使用 CoGroupByKey ...
custWithKeyPCol := beam.ParDo(s, func(c customer) (int, customer) {
return c.CustID, c
}, custPCol)
orderWithKeyPCol := beam.ParDo(s, func(o order) (int, order) {
return o.Cust_ID, o
}, orderPCol)
resultPCol := beam.CoGroupByKey(s, custWithKeyPCol, orderWithKeyPCol)
beam.ParDo0(s, func(CustID int, custIter func(*customer) bool, orderIter func(*order) bool) {
c, o := customer{}, order{}
for custIter(&c) {
if ok := orderIter(&o); ok {
fmt.Println(CustID, c.FName, o.OrderID, o.Amount)
}
fmt.Println(CustID, c.FName)
}
}, resultPCol)
英文:
Try it like this:
// resultType is one output row of the left join of customers with orders.
// OrderID and Amount keep their zero values when the customer has no
// matching order.
type resultType struct {
	CustID  int
	FName   string
	OrderID int
	Amount  int
}
// Left join via a side input: every customer element scans the whole order
// collection. One row is emitted per matching order, so a customer with
// several orders yields several rows; a customer without any order still
// yields one row whose OrderID and Amount are left at zero.
result := beam.ParDo(s, func(c customer, iterOrder func(*order) bool, emit func(resultType)) {
	matched := false
	var o order
	for iterOrder(&o) {
		if o.Cust_ID != c.CustID {
			continue
		}
		matched = true
		emit(resultType{
			CustID:  c.CustID,
			FName:   c.FName,
			OrderID: o.OrderID,
			Amount:  o.Amount,
		})
	}
	if !matched {
		// Left-join semantics: keep the customer even without orders.
		emit(resultType{
			CustID: c.CustID,
			FName:  c.FName,
		})
	}
}, custPCol, beam.SideInput{Input: orderPCol})
Or, if you want to use CoGroupByKey:
custWithKeyPCol := beam.ParDo(s, func(c customer) (int, customer) {
return c.CustID, c
}, custPCol)
orderWithKeyPCol := beam.ParDo(s, func(o order) (int, order) {
return o.Cust_ID, o
}, orderPCol)
resultPCol := beam.CoGroupByKey(s, custWithKeyPCol, orderWithKeyPCol)
beam.ParDo0(s, func(CustID int, custIter func(*customer) bool, orderIter func(*order) bool) {
c, o := customer{}, order{}
for custIter(&c) {
if ok := orderIter(&o); ok {
fmt.Println(CustID, c.FName, o.OrderID, o.Amount)
}
fmt.Println(CustID, c.FName)
}
}, resultPCol)
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论