PythonException - AssertionError raised within the Spark worker
Question
For every row in input_table, X rows should be created in output_table, where X is the number of days in the year of StartDate.
The Info field should contain Y characters, where Y = X * 2; if it contains fewer, it should be padded with additional # characters.
In output_table, the AM and PM columns are filled with the Info characters in order, so that each AM and PM field holds exactly one character.
Here is the code:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, StringType, DateType, StructField, StructType, TimestampType, ArrayType
# Connection details for input table
url="..."
user="..."
password="..."
input_table="..."
output_table="..."
# Define schema for input table
input_schema = StructType([
    StructField("ID1", IntegerType(), True),
    StructField("ID2", IntegerType(), True),
    StructField("StartDate", TimestampType(), True),
    StructField("Info", StringType(), True),
    StructField("Extracted", TimestampType(), True)
])
# Define schema for output table
output_schema = StructType([
    StructField("ID1", IntegerType(), True),
    StructField("ID2", IntegerType(), True),
    StructField("Date", DateType(), True),
    StructField("AM", StringType(), True),
    StructField("PM", StringType(), True),
    StructField("CurrentYear", StringType(), True)
])
# Initialize SparkSession
spark = SparkSession.builder.getOrCreate()
# Register UDF for padding marks
pad_marks_udf = udf(lambda info, days: marks.ljust(days, '#')[:days], StringType())
# Register UDF for creating rows
create_rows_udf = udf(lambda start_date, marks, days: [(start_date + i, info[i], info[i + 1]) for i in range(0, days, 2)],
    ArrayType(StructType([
        StructField("Date", DateType(), True),
        StructField("AM", StringType(), True),
        StructField("PM", StringType(), True),
    ])))
# Define function to pad marks and create rows
def process_row(row):
    id1 = row["ID1"]
    id2 = row["ID2"]
    start_date = row["StartDate"]
    info = row["info"]
    extracted = row["Extracted"]
    # Calculate number of days * 2
    days = (start_date.year % 4 == 0 and 366 or 365) * 2
    # Pad info
    padded_info = pad_info_udf(info, days)
    # Create rows
    rows = create_rows_udf(start_date, padded_info, days)
    # Prepare output rows
    output_rows = []
    for r in rows:
        date = r["Date"]
        am = r["AM"]
        pm = r["PM"]
        current_year = f"{current_year.year}/{current_year.year + 1}"
        output_rows.append((id1, id2, date, am, pm, current_year))
    return output_rows
# Load input table as DataFrame
df_input = spark.read \
    .format("jdbc") \
    .option("url", url) \
    .option("dbtable", input_table) \
    .option("user", user) \
    .option("password", password) \
    .schema(input_schema) \
    .load()
# Apply processing to input table
output_rows = df_input.rdd.flatMap(process_row)
# Create DataFrame from output rows
df_output = spark.createDataFrame(output_rows, output_schema)
# Write DataFrame to output table
df_output.write \
    .format("jdbc") \
    .option("url", url) \
    .option("user", user) \
    .option("password", password) \
    .option("dbtable", output_table) \
    .mode("append") \
    .save()
Similar code works in plain Python with no problems, but when translated to PySpark it throws an AssertionError. The job must not modify input_table; it should only append the modified rows from input_table to output_table.
Answer 1
Score: 0
The code should not call Spark UDFs inside functions that are applied to RDDs; plain Python functions should be used there instead. Spark UDFs can only be used in Spark SQL.
The code worked on the local machine because in local mode the executor runs in the same JVM as the driver.
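To illustrate the point about plain functions, here is a minimal sketch of process_row written without any UDF. It is not the asker's final code. The helper name pad_info is hypothetical, the year label is assumed to be derived from StartDate (the question's f-string refers to an undefined current_year), and calendar.isleap is swapped in for the year % 4 check. The example at the bottom uses a plain dict in place of a Spark Row so that it runs without a cluster.

```python
import calendar
from datetime import date, datetime, timedelta


def pad_info(info, length):
    """Pad info with '#' up to length characters and cut off any excess."""
    return (info or "").ljust(length, "#")[:length]


def process_row(row):
    """Expand one input row into one output tuple per day of the year.

    Only plain Python is used here, so the function can run on a Spark
    worker, for example through df_input.rdd.flatMap(process_row).
    """
    start = row["StartDate"]
    # Assumption: StartDate may arrive as a datetime; keep only the date part.
    start_date = start.date() if isinstance(start, datetime) else start
    # Assumption: X is the length of the calendar year containing StartDate.
    days = 366 if calendar.isleap(start_date.year) else 365
    padded = pad_info(row["Info"], days * 2)
    # Assumption: label format copied from the f-string in the question.
    current_year = f"{start_date.year}/{start_date.year + 1}"
    return [
        (
            row["ID1"],
            row["ID2"],
            start_date + timedelta(days=i),
            padded[2 * i],      # AM character of day i
            padded[2 * i + 1],  # PM character of day i
            current_year,
        )
        for i in range(days)
    ]


# A plain dict stands in for a Spark Row; both support row["name"] lookups.
example = {
    "ID1": 1,
    "ID2": 2,
    "StartDate": datetime(2023, 9, 1),
    "Info": "ABCD",
    "Extracted": None,
}
rows = process_row(example)
print(len(rows), rows[0], rows[2])
```

With this version, the flatMap and createDataFrame lines from the question should be usable unchanged, since the tuples follow the column order of output_schema.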