
Apache Beam Left Join in Go

Question

Is there a simple way to perform a left join of two PCollections using Go?
I see that SQL joins are available only in Java.

    package main

    import (
        "context"
        "flag"

        "github.com/apache/beam/sdks/v2/go/pkg/beam"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
    )

    type customer struct {
        CustID int
        FName  string
    }

    type order struct {
        OrderID int
        Amount  int
        Cust_ID int
    }

    func main() {
        flag.Parse()
        beam.Init()

        ctx := context.Background()
        p := beam.NewPipeline()
        s := p.Root()

        var custList = []customer{
            {1, "Bob"},
            {2, "Adam"},
            {3, "John"},
            {4, "Ben"},
            {5, "Jose"},
            {6, "Bryan"},
            {7, "Kim"},
            {8, "Tim"},
        }
        var orderList = []order{
            {123, 100, 1},
            {125, 30, 3},
            {128, 50, 7},
        }

        custPCol := beam.CreateList(s, custList)
        orderPCol := beam.CreateList(s, orderList)

        // Left join custPCol with orderPCol.
        // Expected result:
        // CustID | FName | OrderID | Amount
        // 1      | Bob   | 123     | 100
        // 2      | Adam  |         |
        // 3      | John  | 125     | 30
        // 4      | Ben   |         |
        // 5      | Jose  |         |
        // 6      | Bryan |         |
        // 7      | Kim   | 128     | 50
        // 8      | Tim   |         |
        _, _ = custPCol, orderPCol // keeps the stub compiling until the join is added

        if err := beamx.Run(ctx, p); err != nil {
            log.Exitf(ctx, "Failed to execute job: %v", err)
        }
    }

I want to join these two PCollections and then perform further operations on the result. I read the documentation on CoGroupByKey, but couldn't get it to produce the output a normal SQL join would.

Any suggestions on this?

Answer 1

Score: 4

Try something like this, passing the orders to the DoFn as a side input:

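    // resultType holds one row of the joined output.
    type resultType struct {
        CustID  int
        FName   string
        OrderID int // zero if the customer has no orders
        Amount  int // zero if the customer has no orders
    }

    // orderPCol is a side input, so every customer scans the full list of
    // orders (fine while the orders collection is small). Emitting instead
    // of returning lets a customer with several orders produce several rows;
    // a customer with no orders produces one row with zero-valued order fields.
    result := beam.ParDo(s, func(c customer, iterOrder func(*order) bool, emit func(resultType)) {
        matched := false
        var o order
        for iterOrder(&o) {
            if c.CustID == o.Cust_ID {
                matched = true
                emit(resultType{
                    CustID:  c.CustID,
                    FName:   c.FName,
                    OrderID: o.OrderID,
                    Amount:  o.Amount,
                })
            }
        }
        if !matched {
            emit(resultType{CustID: c.CustID, FName: c.FName})
        }
    }, custPCol, beam.SideInput{Input: orderPCol})
    // result is a PCollection of resultType that can be processed further.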
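Because the DoFn body is plain Go, its left-join behaviour can be checked without a Beam runner. This is a minimal stdlib-only sketch, assuming a `[]order` slice stands in for the side-input iterator and an `emit` callback stands in for Beam's emitter parameter; `joinCustomer` is a hypothetical helper name, not part of the Beam API.

```go
package main

import "fmt"

type customer struct {
	CustID int
	FName  string
}

type order struct {
	OrderID int
	Amount  int
	Cust_ID int
}

type resultType struct {
	CustID  int
	FName   string
	OrderID int
	Amount  int
}

// joinCustomer (hypothetical) mirrors the side-input DoFn: scan every order
// and emit one row per match, or a single row with zero-valued order fields
// when the customer has no orders.
func joinCustomer(c customer, orders []order, emit func(resultType)) {
	matched := false
	for _, o := range orders {
		if o.Cust_ID == c.CustID {
			matched = true
			emit(resultType{CustID: c.CustID, FName: c.FName, OrderID: o.OrderID, Amount: o.Amount})
		}
	}
	if !matched {
		emit(resultType{CustID: c.CustID, FName: c.FName})
	}
}

func main() {
	custs := []customer{{1, "Bob"}, {2, "Adam"}, {3, "John"}, {7, "Kim"}}
	orders := []order{{123, 100, 1}, {125, 30, 3}, {128, 50, 7}}
	for _, c := range custs {
		joinCustomer(c, orders, func(r resultType) { fmt.Printf("%+v\n", r) })
	}
}
```

The cases worth trying are a customer with one order, one with none, and one with several orders; only the last distinguishes the emitter version from one that returns after the first match.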

Or, if you want to use CoGroupByKey:

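    // Key both collections by customer ID.
    custWithKeyPCol := beam.ParDo(s, func(c customer) (int, customer) {
        return c.CustID, c
    }, custPCol)
    orderWithKeyPCol := beam.ParDo(s, func(o order) (int, order) {
        return o.Cust_ID, o
    }, orderPCol)
    resultPCol := beam.CoGroupByKey(s, custWithKeyPCol, orderWithKeyPCol)
    // Left join per key (needs "fmt" in the imports): buffer this key's orders,
    // then print one line per (customer, order) pair, or the customer alone if
    // it has no orders. Keys with orders but no customer print nothing.
    beam.ParDo0(s, func(custID int, custIter func(*customer) bool, orderIter func(*order) bool) {
        var orders []order
        var o order
        for orderIter(&o) {
            orders = append(orders, o)
        }
        var c customer
        for custIter(&c) {
            if len(orders) == 0 {
                fmt.Println(custID, c.FName)
                continue
            }
            for _, ord := range orders {
                fmt.Println(custID, c.FName, ord.OrderID, ord.Amount)
            }
        }
    }, resultPCol)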
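CoGroupByKey does not join anything by itself: for each key it hands the DoFn every customer and every order sharing that key, and the DoFn decides how to combine them. The stdlib-only sketch below models that in memory so the left-join logic can be checked against the question's expected table. `coGroup` and `leftJoin` are hypothetical helpers, not Beam APIs, and the key sort exists only to make the output deterministic, since CoGroupByKey guarantees no ordering.

```go
package main

import (
	"fmt"
	"sort"
)

type customer struct {
	CustID int
	FName  string
}

type order struct {
	OrderID int
	Amount  int
	Cust_ID int
}

// group models what the DoFn receives for one key after CoGroupByKey.
type group struct {
	custs  []customer
	orders []order
}

// coGroup (hypothetical) is an in-memory stand-in for beam.CoGroupByKey
// over the two collections keyed by customer ID.
func coGroup(custs []customer, orders []order) map[int]*group {
	groups := map[int]*group{}
	get := func(k int) *group {
		if groups[k] == nil {
			groups[k] = &group{}
		}
		return groups[k]
	}
	for _, c := range custs {
		g := get(c.CustID)
		g.custs = append(g.custs, c)
	}
	for _, o := range orders {
		g := get(o.Cust_ID)
		g.orders = append(g.orders, o)
	}
	return groups
}

// leftJoin (hypothetical) flattens each group like the ParDo0 body: a key
// with no customers yields nothing, a customer with no orders yields one row.
func leftJoin(groups map[int]*group) []string {
	keys := make([]int, 0, len(groups))
	for k := range groups {
		keys = append(keys, k)
	}
	sort.Ints(keys) // for stable output only
	var rows []string
	for _, k := range keys {
		g := groups[k]
		for _, c := range g.custs {
			if len(g.orders) == 0 {
				rows = append(rows, fmt.Sprintf("%d | %s | |", k, c.FName))
				continue
			}
			for _, o := range g.orders {
				rows = append(rows, fmt.Sprintf("%d | %s | %d | %d", k, c.FName, o.OrderID, o.Amount))
			}
		}
	}
	return rows
}

func main() {
	custs := []customer{{1, "Bob"}, {2, "Adam"}, {3, "John"}, {4, "Ben"},
		{5, "Jose"}, {6, "Bryan"}, {7, "Kim"}, {8, "Tim"}}
	orders := []order{{123, 100, 1}, {125, 30, 3}, {128, 50, 7}}
	for _, r := range leftJoin(coGroup(custs, orders)) {
		fmt.Println(r)
	}
}
```

Running `main` prints one line per customer in the question's expected layout, with John's and Kim's amounts taken from `orderList` (30 and 50).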

huangapple
  • This article was published by huangapple on February 2, 2023 at 13:49:10.
  • When reposting, please keep the link to this article: https://go.coder-hub.com/75319138.html