PythonException: AssertionError raised within the Spark worker

Question

For every row in input_table, X rows should be created in output_table, where X is the number of days in the year of StartDate.

The Info field should contain Y characters, where Y = X * 2. If it contains fewer, it should be padded with additional # characters.

In output_table, the AM and PM columns are filled with the Info characters in order, so that each AM and PM field holds exactly one character.
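To make the requirement concrete, the padding and pairing rule can be sketched in plain Python. This is only an illustration under assumptions: the helper names `days_in_year`, `pad_info`, and `split_am_pm` are hypothetical, "days in year" is read as the calendar year of StartDate, and over-long input is cut to Y characters (as the `[:days]` slice in the code below suggests).

```python
import calendar


def days_in_year(year):
    """Return 366 for leap years, 365 otherwise."""
    return 366 if calendar.isleap(year) else 365


def pad_info(info, year):
    """Right-pad info with '#' to two characters per day, then cut to that width."""
    width = 2 * days_in_year(year)
    return info.ljust(width, "#")[:width]


def split_am_pm(padded):
    """Pair consecutive characters: (AM, PM) for day 0, day 1, and so on."""
    return [(padded[i], padded[i + 1]) for i in range(0, len(padded), 2)]


padded = pad_info("ABCD", 2023)
pairs = split_am_pm(padded)
print(len(padded), len(pairs), pairs[:3])
# prints: 730 365 [('A', 'B'), ('C', 'D'), ('#', '#')]
```

Using `calendar.isleap` instead of a bare `year % 4 == 0` check also handles century years such as 1900 and 2100 correctly.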

Here is the code:

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, StringType, DateType, StructField, StructType, TimestampType, ArrayType

# Connection details for input table
url = "..."
user = "..."
password = "..."
input_table = "..."
output_table = "..."

# Define schema for input table
input_schema = StructType([
    StructField("ID1", IntegerType(), True),
    StructField("ID2", IntegerType(), True),
    StructField("StartDate", TimestampType(), True),
    StructField("Info", StringType(), True),
    StructField("Extracted", TimestampType(), True)
])

# Define schema for output table
output_schema = StructType([
    StructField("ID1", IntegerType(), True),
    StructField("ID2", IntegerType(), True),
    StructField("Date", DateType(), True),
    StructField("AM", StringType(), True),
    StructField("PM", StringType(), True),
    StructField("CurrentYear", StringType(), True)
])

# Initialize SparkSession
spark = SparkSession.builder.getOrCreate()

# Register UDF for padding marks
pad_marks_udf = udf(lambda info, days: marks.ljust(days, '#')[:days], StringType())

# Register UDF for creating rows
create_rows_udf = udf(lambda start_date, marks, days: [(start_date + i, info[i], info[i + 1]) for i in range(0, days, 2)],
                      ArrayType(StructType([
                          StructField("Date", DateType(), True),
                          StructField("AM", StringType(), True),
                          StructField("PM", StringType(), True),
                      ])))

# Define function to pad marks and create rows
def process_row(row):
    id1 = row["ID1"]
    id2 = row["ID2"]
    start_date = row["StartDate"]
    info = row["info"]
    extracted = row["Extracted"]

    # Calculate number of days * 2
    days = (start_date.year % 4 == 0 and 366 or 365) * 2

    # Pad info
    padded_info = pad_info_udf(info, days)

    # Create rows
    rows = create_rows_udf(start_date, padded_info, days)

    # Prepare output rows
    output_rows = []
    for r in rows:
        date = r["Date"]
        am = r["AM"]
        pm = r["PM"]
        current_year = f"{current_year.year}/{current_year.year + 1}"
        output_rows.append((id1, id2, date, am, pm, current_year))

    return output_rows

# Load input table as DataFrame
df_input = spark.read \
    .format("jdbc") \
    .option("url", url) \
    .option("dbtable", input_table) \
    .option("user", user) \
    .option("password", password) \
    .schema(input_schema) \
    .load()

# Apply processing to input table
output_rows = df_input.rdd.flatMap(process_row)

# Create DataFrame from output rows
df_output = spark.createDataFrame(output_rows, output_schema)

# Write DataFrame to output table
df_output.write \
    .format("jdbc") \
    .option("url", url) \
    .option("user", user) \
    .option("password", password) \
    .option("dbtable", output_table) \
    .mode("append") \
    .save()

Similar code works in plain Python without problems, but the PySpark version throws an AssertionError. The job must not modify input_table; it should only append the rows generated from input_table to output_table.

Answer 1

Score: 0

The cause is that the code calls Spark UDFs inside a function that runs on an RDD. A Spark UDF can only be used in Spark SQL (DataFrame) expressions. Inside process_row, which is passed to rdd.flatMap, plain Python functions should be used instead.

The code worked on the local machine because in local mode the executor runs in the same JVM as the driver.
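A minimal sketch of what "plain functions" could look like for this job, not the asker's final code. Several points are assumptions rather than facts from the question: the helper `pad_info` is a hypothetical name, CurrentYear is assumed to be derived from StartDate as "YYYY/YYYY+1", the date is assumed to advance by one day per AM/PM pair, and the column is read as `row["Info"]` to match the declared schema. The sample at the end uses a dict as a stand-in for a Spark Row so the logic can be checked without a cluster.

```python
import calendar
from datetime import datetime, timedelta


def pad_info(info, width):
    """Right-pad with '#' and cut to exactly `width` characters (None counts as empty)."""
    return (info or "").ljust(width, "#")[:width]


def process_row(row):
    """Expand one input row into one output tuple per day of StartDate's year.

    Only plain Python is used here, so nothing in this function needs a
    SparkContext on the worker.
    """
    start = row["StartDate"]
    n_days = 366 if calendar.isleap(start.year) else 365
    info = pad_info(row["Info"], 2 * n_days)
    # Assumption: CurrentYear is built from StartDate, e.g. "2023/2024".
    current_year = f"{start.year}/{start.year + 1}"
    return [
        (
            row["ID1"],
            row["ID2"],
            (start + timedelta(days=d)).date(),  # one calendar day per pair
            info[2 * d],       # AM character
            info[2 * d + 1],   # PM character
            current_year,
        )
        for d in range(n_days)
    ]


# Stand-in for a Spark Row: a dict supports the same row["column"] lookups.
sample = {"ID1": 1, "ID2": 2, "StartDate": datetime(2023, 1, 1),
          "Info": "ABCD", "Extracted": None}
rows = process_row(sample)
print(len(rows), rows[0], rows[-1])
```

In the Spark job itself, this function would be passed unchanged to `df_input.rdd.flatMap(process_row)`, and the two `udf(...)` registrations from the question would no longer be needed. The tuple order matches the fields of `output_schema`.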

huangapple
  • Posted on May 24, 2023 at 21:19:57
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76324006.html