PySpark - Converting a string-type nested JSON to columns in a DataFrame

Question

I am processing CDC data received via Kafka tables and loading it into Databricks Delta tables. Everything works except for a nested JSON string, which is not parsed when I use from_json or spark.read.json.

When I try to fetch the schema of the JSON from level 1 using "spark.read.json(df.rdd.map(lambda row: row.value)).schema", the column INPUT_DATA is inferred as a string and loaded as a string object. Below are a sample JSON string, the code I tried, and the expected results.

I have many topics to process, and each topic has a different schema, so I would like to process them dynamically. I would prefer not to store the schemas, since a schema may change over time and I would like my code to handle the changes automatically.

I would appreciate any help, as I have spent a whole day trying to figure this out and am still trying. Thanks in advance.

Sample JSON with a nested tree:

after = {
    "id_transaction": "121",
    "product_id": 25,
    "transaction_dt": 1662076800000000,
    "creation_date": 1662112153959000,
    "product_account": "40012",
    "input_data": "{\"amount\":[{\"type\":\"CASH\",\"amount\":1000.00}],\"currency\":\"USD\",\"coreData\":{\"CustId\":11021,\"Cust_Currency\":\"USD\",\"custCategory\":\"Premium\"},\"context\":{\"authRequired\":false,\"waitForConfirmation\":false,\"productAccount\":\"CA12001\"},\"brandId\":\"TOYO-2201\",\"dealerId\":\"1\",\"operationInfo\":{\"trans_Id\":\"3ED23-89DKS-001AA-2321\",\"transactionDate\":1613420860087},\"ip_address\":null,\"last_executed_step\":\"PURCHASE_ORDER_CREATED\",\"last_result\":\"OK\",\"output_dataholder\":{\"DISCOUNT_AMOUNT\":\"0\",\"BONUS_AMOUNT_APPLIED\":\"10000\"}}",
    "dealer_id": 1,
    "dealer_currency": "USD",
    "Cust_id": 11021,
    "process_status": "IN_PROGRESS",
    "tot_amount": 10000,
    "validation_result_code": "OK_SAVE_AND_PROCESS",
    "operation": "Create",
    "timestamp_ms": 1675673484042
}

I have created the following script to get all the columns of the JSON structure:

import json

# `spark` is the SparkSession that Databricks provides in the notebook.
table_column_schema = {}
dbname = "mydb"
tbl_name = "tbl_name"

def get_table_keys(dbname, tbl_name):
    # Read one raw CDC message; the f-string substitutes the database and table names.
    query = f"select value from {dbname}.{tbl_name} limit 1"
    raw_value = spark.sql(query).collect()[0][0]
    after = json.loads(raw_value)["after"]
    column_schema = list(after.keys())
    for value in after.values():
        # Only string values that look like a JSON object are decoded.
        if isinstance(value, str) and "{" in value and "}" in value:
            nested = json.loads(value)
            for child in nested.values():
                # Add the keys of objects nested one level further down.
                if isinstance(child, dict):
                    column_schema = column_schema + list(child.keys())
    # Store under the table name so the lookup by tbl_name in the next script works.
    table_column_schema[tbl_name] = column_schema
    return 0

get_table_keys(dbname, tbl_name)
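
The script above looks only one level into JSON-encoded strings. As a rough sketch of how the key discovery could be made recursive, the following standard-library-only function walks dicts, lists, and strings that decode as JSON. The name `collect_keys` and the shortened `record` are hypothetical, made up for illustration and not part of the original post.

```python
import json


def collect_keys(value, found=None):
    """Recursively collect key names from dicts, lists, and JSON-encoded strings."""
    if found is None:
        found = []
    if isinstance(value, str):
        text = value.strip()
        # Only try to decode strings that look like a JSON object or array.
        if text[:1] not in ("{", "["):
            return found
        try:
            value = json.loads(text)
        except ValueError:
            return found  # not valid JSON, so treat it as a plain string
    if isinstance(value, dict):
        for key, child in value.items():
            if key not in found:
                found.append(key)
            collect_keys(child, found)
    elif isinstance(value, list):
        for item in value:
            collect_keys(item, found)
    return found


# Hypothetical record, shortened from the sample in the question.
record = {
    "id_transaction": "121",
    "input_data": '{"currency": "USD", "coreData": {"CustId": 11021}, "amount": [{"type": "CASH"}]}',
}
keys = collect_keys(record)
print(keys)
```

Keys are returned in first-seen order without duplicates, so a name that appears at several nesting levels is listed once. Whether that is acceptable depends on how the column names are used afterwards.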

The following code processes the JSON and is meant to create a dataframe with all nested JSON fields as columns:

from pyspark.sql.functions import from_json, to_json, col
from pyspark.sql.types import StructType, StructField, StringType, LongType, MapType
import time

dbname = "mydb"
tbl_name = "tbl_name"
start = time.time()

# The f-string must reference dbname; mydb is not a defined variable.
df = spark.sql(f'select value from {dbname}.{tbl_name} limit 2')
tbl_columns = table_column_schema[tbl_name]

# Hand-built field list with input_data as a string-to-string map (not used below).
data = []
for i in tbl_columns:
    if i == 'input_data':
        data.append(StructField(i, MapType(StringType(), StringType()), True))
    else:
        data.append(StructField(i, StringType(), True))

# Infer the schema from the raw messages; input_data comes back as a plain string.
schema2 = spark.read.json(df.rdd.map(lambda row: row.value)).schema
print(type(schema2))

df2 = df.withColumn("value", from_json("value", schema2)).select(col('value.after.*'), col('value.op'))

Note: VALUE is a column in my Delta table (bronze layer).

Current dataframe output:
[image]

Expected dataframe output:
[image]

Answer 1

Score: 1

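You can use the RDD to infer the schema of the nested string, then use from_json to parse the value as JSON.

from pyspark.sql import functions as f

# Infer the nested schema from the input_data strings, then parse the column with it.
schema = spark.read.json(df.rdd.map(lambda r: r.input_data)).schema
df = df.withColumn('input_data', f.from_json('input_data', schema))
# Expand the parsed struct into top-level columns and drop the original column.
new_cols = df.columns + df.select('input_data.*').columns
df = df.select('*', 'input_data.*').toDF(*new_cols).drop('input_data')
df.show(truncate=False)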
+-------+----------------+---------------+---------+--------------+---------+--------------+---------------+----------+-------------+----------+----------------+----------------------+----------------+---------+-----------------------+---------------------+--------+--------+----------+----------------------+-----------+---------------------------------------+-----------------+
|Cust_id|creation_date   |dealer_currency|dealer_id|id_transaction|operation|process_status|product_account|product_id|timestamp_ms |tot_amount|transaction_dt  |validation_result_code|amount          |brandId  |context                |coreData             |currency|dealerId|ip_address|last_executed_step    |last_result|operationInfo                          |output_dataholder|
+-------+----------------+---------------+---------+--------------+---------+--------------+---------------+----------+-------------+----------+----------------+----------------------+----------------+---------+-----------------------+---------------------+--------+--------+----------+----------------------+-----------+---------------------------------------+-----------------+
|11021  |1662112153959000|USD            |1        |121           |Create   |IN_PROGRESS   |40012          |25        |1675673484042|10000     |1662076800000000|OK_SAVE_AND_PROCESS   |[{1000.0, CASH}]|TOYO-2201|{false, CA12001, false}|{11021, USD, Premium}|USD     |1       |null      |PURCHASE_ORDER_CREATED|OK         |{3ED23-89DKS-001AA-2321, 1613420860087}|{10000, 0}       |
+-------+----------------+---------------+---------+--------------+---------+--------------+---------------+----------+-------------+----------+----------------+----------------------+----------------+---------+-----------------------+---------------------+--------+--------+----------+----------------------+-----------+---------------------------------------+-----------------+
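
One thing to watch when expanding a struct next to the existing columns is a nested field that shares its name with an outer column. The sample has no such clash, but other topics might. A minimal sketch of one way to build collision-safe names before calling `toDF` follows. The helper `merged_column_names` and the example column lists are hypothetical, and prefixing with the struct's column name is just one possible convention.

```python
def merged_column_names(outer_cols, nested_cols, prefix):
    """Return outer names followed by nested names, prefixing nested names that collide."""
    taken = set(outer_cols)
    result = list(outer_cols)
    for name in nested_cols:
        candidate = name
        # Keep prefixing until the name is unique among the columns chosen so far.
        while candidate in taken:
            candidate = f"{prefix}_{candidate}"
        taken.add(candidate)
        result.append(candidate)
    return result


# Hypothetical column lists in which "currency" exists at both levels.
outer = ["id_transaction", "currency", "input_data"]
nested = ["amount", "currency"]
names = merged_column_names(outer, nested, "input_data")
print(names)
```

Assuming the same `df` as in the answer, the resulting list could be passed in place of `new_cols`, for example `df.select('*', 'input_data.*').toDF(*names).drop('input_data')`, since `toDF` renames columns by position.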

huangapple
  • Posted on February 10, 2023, 14:25:56
  • When reposting, please keep the link to this article: https://go.coder-hub.com/75407605.html