
Spark streaming forEachBatch giving inconsistent/unordered result while writing to database

Question

Problem:
I am receiving data for multiple tables/schemas in a single stream.
After segregating the data, I open a parallel write stream for each table.

The function I used in forEachBatch is:

def writeToAurora(df, batch_id, tableName):
    df = df.persist()
    stagingTable = f'{str(tableName.lower())}_delta'
    
    df.write \
            .mode("overwrite") \
            .format("jdbc") \
            .option("truncate", "true") \
            .option("driver", DB_conf['DRIVER']) \
            .option("batchsize", 1000) \
            .option("url", DB_conf['URL']) \
            .option("dbtable", stagingTable) \
            .option("user", DB_conf['USER_ID']) \
            .option("password", DB_conf['PASSWORD']) \
            .save() 
    df.unpersist()

The logic to open multiple write streams is:

data_df = spark.readStream.format("kinesis") \
    .option("streamName", stream_name) \
    .option("startingPosition", initial_position) \
    .load()

#Distinguishing table wise df
distinctTables = ['Table1', 'Table2', 'Table3']
tablesDF = {table: data_df.filter(f"TableName = '{table}'") for table in distinctTables}

#Processing Each Table
for table, tableDF in tablesDF.items():
    df = tableDF.withColumn('csvData', F.from_csv('finalData', schema=tableSchema[table], options={'sep': '|','quote': '"'}))\
        .select('csvData.*')

    vars()[table+'_query'] = df.writeStream\
                .trigger(processingTime='120 seconds') \
                .foreachBatch(lambda fdf, batch_id: writeToAurora(fdf, batch_id, table)) \
                .option("checkpointLocation", f"s3://{bucket}/temporary/checkpoint/{table}")\
                .start()

for table in tablesDF.keys():
    eval(table+'_query').awaitTermination()

Issue:
Now when running the above code, sometimes Table1's data gets loaded into Table2, and the order is different each time the code runs.
The mapping between a dataframe and the table it should be loaded into is not maintained.

Need help understanding why this is happening.

Answer 1

Score: 3

This is caused by late binding for your lambda function in the foreachBatch method.
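
As a quick illustration outside Spark (the t0/t1/t2 names are just placeholders), a lambda created in a loop captures the loop variable itself, not its value at creation time, so every callback ends up seeing the last value of the loop:

callbacks = []
for t in ["t0", "t1", "t2"]:
    # each lambda looks up t only when it is called, i.e. after the loop has finished
    callbacks.append(lambda: print(f"writing to {t}"))

for cb in callbacks:
    cb()  # prints "writing to t2" three times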

Here's the same effect in Spark. This will try to write all tables to "t2", and it fails (actually it writes only the "t2" table, but with the "t0" data):

from pyspark.sql.functions import *
from pyspark.sql import *

def writeToTable(df, epochId, table_name):
  df.write.mode("overwrite").saveAsTable(f"custanwo.dsci.stream_test_{table_name}")

data_df = spark.readStream.format("rate").load()
data_df = (data_df
           .selectExpr("value % 10 as key")
           .groupBy("key")
           .count()
           .withColumn("t", concat(lit("t"),(col("key")%3).astype("string")))
)

table_names = ["t0", "t1", "t2"]
table_df = {t: data_df.filter(f"t = '{t}'") for t in table_names}

for t, df in table_df.items():
    vars()[f"{t}_query"] = (df
                            .writeStream
                            # BUG: the lambda looks up t when each batch runs, so every query uses the last value of t
                            .foreachBatch(lambda df, epochId: writeToTable(df, epochId, t))
                            .outputMode("update")
                            .start()
                            )

To resolve this, there are a few options. One is using partial:

from functools import partial

def writeToTable(df, epochId, table_name):
  df.write.mode("overwrite").saveAsTable(f"custanwo.dsci.stream_test_{table_name}")

data_df = spark.readStream.format("rate").load()
data_df = (data_df
           .selectExpr("value % 10 as key")
           .groupBy("key")
           .count()
           .withColumn("t", concat(lit("t"),(col("key")%3).astype("string")))
) 

table_names = ["t0", "t1", "t2"]
table_df = {t: data_df.filter(f"t = '{t}'") for t in table_names}

for t, df in table_df.items():
    vars()[f"{t}_query"] = (df
                            .writeStream
                            # partial binds table_name to the current value of t when the query is created
                            .foreachBatch(partial(writeToTable, table_name=t))
                            .outputMode("update")
                            .start()
                            )

In your code, rewrite your writeStream to:

    vars()[table+'_query'] = df.writeStream\
                .trigger(processingTime='120 seconds') \
                .foreachBatch(partial(writeToAurora, tableName = table)) \
                .option("checkpointLocation", f"s3://{bucket}/temporary/checkpoint/{table}")\
                .start()
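
For completeness, another common way to avoid the late binding (a sketch not taken from the original answer, relying on the same surrounding loop and variables as the snippet above) is to freeze the current value with a default argument on the lambda:

    vars()[table+'_query'] = df.writeStream\
                .trigger(processingTime='120 seconds') \
                .foreachBatch(lambda fdf, batch_id, tableName=table: writeToAurora(fdf, batch_id, tableName)) \
                .option("checkpointLocation", f"s3://{bucket}/temporary/checkpoint/{table}")\
                .start()

Default arguments are evaluated when the lambda is defined, so each query keeps its own table name.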

Answer 2

Score: -1

def writeToAurora(df, batch_id, tableName):
    df = df.withColumn("TableName", F.lit(tableName))  # Add the TableName column to the DataFrame
    df = df.persist()
    stagingTable = f'{str(tableName.lower())}_delta'
    
    df.write \
        .mode("overwrite") \
        .format("jdbc") \
        .option("truncate", "true") \
        .option("driver", DB_conf['DRIVER']) \
        .option("batchsize", 1000) \
        .option("url", DB_conf['URL']) \
        .option("dbtable", stagingTable) \
        .option("user", DB_conf['USER_ID']) \
        .option("password", DB_conf['PASSWORD']) \
        .save() 
    df.unpersist()

With this change, the DataFrame df passed to the writeToAurora function will now have an additional column named "TableName" containing the name of the table to which the data belongs. The writeToAurora function will then use this information to write the data to the appropriate staging table in Aurora.
