Spark: connect two database tables to produce a third dataset
Question
DataFrameLoadedFromLeftDatabase = data loaded using DataFrameReader from the first database, say LeftDB.
I need to
- iterate through each row in this dataframe,
- connect to a second database, say RightDB,
- find some matching record from RightDB,
- and do some business logic.
This is an iterative operation, so it is not simply doable with a JOIN between LeftDB and RightDB to find some new fields, create a new DataFrame targetDF, and write it into a third database, say ThirdDB, using DataFrameWriter.
I know that I can use
<!-- language: scala -->
val targetDF = DataFrameLoadedFromLeftDatabase.mapPartitions(
  partition => {
    val rightDBconnection = new DbConnection // establish a connection to RightDB
    val result = partition.map(record => {
      readMatchingFromRightDBandDoBusinessLogicTransformationAndReturnAList(record, rightDBconnection)
    }).toList
    rightDBconnection.close()
    result.iterator
  }
).toDF()

targetDF.write
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "table3")
  .option("user", "username")
  .option("password", "password")
  .save()
- I am wondering whether Apache Spark is suitable for this type of chatty data processing application.
- I am wondering whether iterating through each record in RightDB will be too chatty in this approach.
- I am looking for advice on improving this design to make better use of Spark's capabilities. I also want to make sure the processing does not cause too many shuffle operations, for performance reasons.
Ref: Related SO Post
Answer 1
Score: 1
In this kind of situation we usually prefer spark.sql. Basically, define two different DataFrames (DFs), join them based on a query, and then apply your business logic afterwards.
For example:
import org.apache.spark.sql.{DataFrame, SparkSession}

// Add your columns here
case class MyResult(ID: String, NAME: String)

// Create a SparkSession
val spark = SparkSession.builder()
  .appName("Join Tables and Add Prefix to ID Column")
  .config("spark.master", "local[*]")
  .getOrCreate()

import spark.implicits._ // needed for .as[MyResult] and the typed map below

// Read the first table from DB1
val firstTable: DataFrame = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost/DB1")
  .option("dbtable", "FIRST_TABLE")
  .option("user", "your_username")
  .option("password", "your_password")
  .load()
firstTable.createOrReplaceTempView("firstTable")

// Read the second table from DB2
val secondTable: DataFrame = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost/DB2")
  .option("dbtable", "SECOND_TABLE")
  .option("user", "your_username")
  .option("password", "your_password")
  .load()
secondTable.createOrReplaceTempView("secondTable")

// Apply your filtering here
val result: DataFrame = spark.sql("SELECT f.*, s.* FROM firstTable as f left join secondTable as s on f.ID = s.ID")

val finalData = result.as[MyResult]
  .map { record =>
    // Do your business logic
    businessLogic(record)
  }

// Write the result to the third table in DB3
finalData.write
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost/DB3")
  .option("dbtable", "THIRD_TABLE")
  .option("user", "your_username")
  .option("password", "your_password")
  .save()
If your tables are big, you can execute a query and read its result directly. Doing this lets you reduce the input size by filtering by dates, etc.:
val myQuery = """
  (select * from table
   where -- do your filtering here
  ) foo
"""

val df = spark.read.format("jdbc")
  .option("url", "jdbc:postgresql://localhost/DB")
  .option("user", "your_username")
  .option("password", "your_password")
  .option("dbtable", myQuery)
  .load()
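Relatedly, when the source table is large and has a numeric key, the JDBC source can parallelize the read itself via its partitioning options. A minimal sketch, assuming FIRST_TABLE has a numeric ID column roughly in the range shown (the bounds and partition count here are placeholders to adjust for your data):

val partitionedDf = spark.read.format("jdbc")
  .option("url", "jdbc:postgresql://localhost/DB1")
  .option("user", "your_username")
  .option("password", "your_password")
  .option("dbtable", "FIRST_TABLE")
  .option("partitionColumn", "ID")  // must be a numeric, date, or timestamp column
  .option("lowerBound", "1")        // assumed minimum ID
  .option("upperBound", "1000000")  // assumed maximum ID
  .option("numPartitions", "8")     // number of parallel JDBC reads/connections
  .load()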
Other than this, it is hard to do record-specific operations directly via Spark. You have to maintain your client connections etc. as custom logic. Spark is designed to read and write huge amounts of data and creates pipelines for that purpose, so simple per-record operations become an overhead for it. Always do your API calls (or single DB calls) inside your map functions; if you use a cache layer there, it can be a life saver in terms of performance. Always try to use a connection pool in your custom database calls, otherwise Spark will try to execute all of the mapping operations with different connections, which may put pressure on your database and cause connection failures.
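For illustration, here is a minimal sketch of that per-partition connection-pool pattern, building on the result Dataset from the join example above. It assumes a HikariCP-backed pool is available on the classpath; RightDbPool and lookupAndTransform are hypothetical names, and the pool settings and lookup logic are placeholders to replace with your own:

import java.sql.Connection
import com.zaxxer.hikari.{HikariConfig, HikariDataSource}
import org.apache.spark.sql.Dataset

// Hypothetical pool, created lazily once per executor JVM (assumed HikariCP dependency).
object RightDbPool {
  lazy val dataSource: HikariDataSource = {
    val config = new HikariConfig()
    config.setJdbcUrl("jdbc:postgresql://localhost/DB2")
    config.setUsername("your_username")
    config.setPassword("your_password")
    config.setMaximumPoolSize(5) // cap the number of connections each executor opens
    new HikariDataSource(config)
  }
}

// Hypothetical per-record lookup plus business logic; replace with your own implementation.
def lookupAndTransform(record: MyResult, conn: Connection): MyResult = record

val enriched: Dataset[MyResult] = result.as[MyResult].mapPartitions { partition =>
  // One pooled connection per partition instead of one connection per record.
  val conn = RightDbPool.dataSource.getConnection()
  try {
    // Materialize before the connection is closed, because the iterator is lazy.
    partition.map(record => lookupAndTransform(record, conn)).toList.iterator
  } finally {
    conn.close() // returns the connection to the pool
  }
}

The point is simply that every record in a partition reuses the same pooled connection, so the database sees a bounded number of connections instead of one per record.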
Answer 2
Score: 0
I can think of a lot of improvements, but in general all of them are going to depend on having the data pre-distributed in HDFS, HBase, a Hive database, MongoDB, ...
I mean: you are thinking "relational data" with a distributed-processing mindset ... I thought we were already beyond that XD
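One way to read this suggestion in practice is to land the RightDB table once into distributed storage and let later jobs join against that copy instead of querying RightDB per record. A sketch, assuming a periodic batch snapshot is acceptable, that the HDFS path below is a placeholder, and that firstTable is the LeftDB DataFrame from the first answer:

// One-off (or scheduled) snapshot of the RightDB table into distributed storage.
spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost/DB2")
  .option("dbtable", "SECOND_TABLE")
  .option("user", "your_username")
  .option("password", "your_password")
  .load()
  .write
  .mode("overwrite")
  .parquet("hdfs:///snapshots/second_table") // assumed target path

// Later jobs join against the snapshot instead of hitting RightDB record by record.
val rightSnapshot = spark.read.parquet("hdfs:///snapshots/second_table")
val joined = firstTable.join(rightSnapshot, Seq("ID"), "left")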