PySpark - Converting a StringType nested JSON to columns in a DataFrame
Question
I am processing CDC data received via Kafka tables and loading it into Databricks Delta tables. Everything works except for a nested JSON string, which is not parsed when I use from_json or spark.read.json.
When I infer the schema from the top level with "spark.read.json(df.rdd.map(lambda row: row.value)).schema", the column INPUT_DATA is treated as a plain string rather than as a nested structure. Below are a sample JSON string, the code I tried, and the expected results.
I have many topics to process, and each topic has a different schema, so I would like to handle them dynamically. I would rather not store the schemas, since they may change over time and I want my code to handle those changes automatically.
I would appreciate any help; I have spent a whole day on this and am still trying. Thanks in advance.
Sample JSON with a nested tree:
after = {
"id_transaction": "121",
"product_id": 25,
"transaction_dt": 1662076800000000,
"creation_date": 1662112153959000,
"product_account": "40012",
"input_data": "{\"amount\":[{\"type\":\"CASH\",\"amount\":1000.00}],\"currency\":\"USD\",\"coreData\":{\"CustId\":11021,\"Cust_Currency\":\"USD\",\"custCategory\":\"Premium\"},\"context\":{\"authRequired\":false,\"waitForConfirmation\":false,\"productAccount\":\"CA12001\"},\"brandId\":\"TOYO-2201\",\"dealerId\":\"1\",\"operationInfo\":{\"trans_Id\":\"3ED23-89DKS-001AA-2321\",\"transactionDate\":1613420860087},\"ip_address\":null,\"last_executed_step\":\"PURCHASE_ORDER_CREATED\",\"last_result\":\"OK\",\"output_dataholder\":{\"DISCOUNT_AMOUNT\":\"0\",\"BONUS_AMOUNT_APPLIED\":\"10000\"}}",
"dealer_id": 1,
"dealer_currency": "USD",
"Cust_id": 11021,
"process_status": "IN_PROGRESS",
"tot_amount": 10000,
"validation_result_code": "OK_SAVE_AND_PROCESS",
"operation": "Create",
"timestamp_ms": 1675673484042
}
I created the following script to get all the columns of the JSON structure:
import json

json_keys = {}
child_members = []
table_column_schema = {}
column_schema = []
dbname = "mydb"
tbl_name = "tbl_name"

def get_table_keys(dbname):
    # Read a single raw JSON string from the bronze table
    table_values_extracted = f"select value from {dbname}.{tbl_name} limit 1"
    cmd_key_pair_data = spark.sql(table_values_extracted)
    jsonkeys = cmd_key_pair_data.collect()[0][0]
    json_keys = json.loads(jsonkeys)
    column_names_as_keys = json_keys["after"].keys()
    value_column_data = json_keys["after"].values()
    column_schema = list(column_names_as_keys)
    for i in value_column_data:
        # Values that look like JSON objects are parsed a second time
        if ("{" in str(i) and "}" in str(i)):
            a = json.loads(i)
            for i2 in a.values():
                if (str(i2).startswith("{") and str(i2).endswith('}')):
                    column_schema = column_schema + list(i2.keys())
    # Store under the table name so the lookup in the next script finds it
    table_column_schema[tbl_name] = column_schema
    return 0

get_table_keys(dbname)
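The script above reads a single row and detects nested JSON with substring checks, so it can miss keys that appear only in other rows or at deeper levels. A minimal sketch of a more defensive variant follows, in plain Python without Spark. The helper names (parse_if_json_object, collect_keys, collect_keys_from_samples) and the two sample records are hypothetical, and the dotted key paths are an assumption made for illustration; the original script produces flat names. Arrays are treated as leaf values here.

```python
import json


def parse_if_json_object(value):
    """Return a dict if value is a string holding a JSON object, else None."""
    if not isinstance(value, str):
        return None
    try:
        parsed = json.loads(value)
    except ValueError:  # json.JSONDecodeError is a subclass of ValueError
        return None
    return parsed if isinstance(parsed, dict) else None


def collect_keys(record, prefix=""):
    """List dotted key paths, descending into dicts and JSON-encoded strings."""
    keys = []
    for name, value in record.items():
        path = f"{prefix}{name}"
        nested = value if isinstance(value, dict) else parse_if_json_object(value)
        if nested is None:
            keys.append(path)  # plain value (or array): treat as a leaf column
        else:
            keys.extend(collect_keys(nested, prefix=f"{path}."))
    return keys


def collect_keys_from_samples(raw_values, root="after"):
    """Union the key paths seen across several raw JSON strings, in first-seen order."""
    seen = {}
    for raw in raw_values:
        for key in collect_keys(json.loads(raw)[root]):
            seen.setdefault(key, None)
    return list(seen)


# Hypothetical sample rows: the second row carries a key the first one lacks.
samples = [
    json.dumps({"after": {"id": "1", "input_data": json.dumps({"currency": "USD"})}}),
    json.dumps({"after": {"id": "2", "input_data": json.dumps({"coreData": {"CustId": 11021}})}}),
]
columns = collect_keys_from_samples(samples)
print(columns)
```

Sampling several rows rather than one matters when fields are optional: a key that is absent from the first record would otherwise never make it into the column list.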
The following code processes the JSON and is meant to create a DataFrame in which all nested JSON fields become columns:
from pyspark.sql.functions import from_json, to_json, col
from pyspark.sql.types import StructType, StructField, StringType, LongType, MapType
import time

dbname = "mydb"
tbl_name = "tbl_name"
start = time.time()
df = spark.sql(f'select value from {dbname}.{tbl_name} limit 2')
tbl_columns = table_column_schema[tbl_name]
data = []
for i in tbl_columns:
    if i == 'input_data':
        # print('FOUND !!!!')
        data.append(StructField(f'{i}', MapType(StringType(), StringType()), True))
    else:
        data.append(StructField(f'{i}', StringType(), True))
# Schema inferred from the raw value strings; input_data comes back as a string here
schema2 = spark.read.json(df.rdd.map(lambda row: row.value)).schema
print(type(schema2))
df2 = df.withColumn("value", from_json("value", schema2)).select(col('value.after.*'), col('value.op'))
Note: The VALUE is a column in my delta table (bronze layer)
Answer 1
Score: 1
You can use rdd to infer the schema from the input_data strings, and then from_json to parse that column as JSON.
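from pyspark.sql import functions as f
# Assumes df already has the value.after.* fields as columns, with input_data still a string.
# Infer the nested schema from the input_data strings themselves.
schema = spark.read.json(df.rdd.map(lambda r: r.input_data)).schema
# Parse the string column into a struct using the inferred schema.
df = df.withColumn('input_data', f.from_json('input_data', schema))
# Promote the struct's fields to top-level columns, then drop the struct.
new_cols = df.columns + df.select('input_data.*').columns
df = df.select('*', 'input_data.*').toDF(*new_cols).drop('input_data')
df.show(truncate=False)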
+-------+----------------+---------------+---------+--------------+---------+--------------+---------------+----------+-------------+----------+----------------+----------------------+----------------+---------+-----------------------+---------------------+--------+--------+----------+----------------------+-----------+---------------------------------------+-----------------+
|Cust_id|creation_date |dealer_currency|dealer_id|id_transaction|operation|process_status|product_account|product_id|timestamp_ms |tot_amount|transaction_dt |validation_result_code|amount |brandId |context |coreData |currency|dealerId|ip_address|last_executed_step |last_result|operationInfo |output_dataholder|
+-------+----------------+---------------+---------+--------------+---------+--------------+---------------+----------+-------------+----------+----------------+----------------------+----------------+---------+-----------------------+---------------------+--------+--------+----------+----------------------+-----------+---------------------------------------+-----------------+
|11021 |1662112153959000|USD |1 |121 |Create |IN_PROGRESS |40012 |25 |1675673484042|10000 |1662076800000000|OK_SAVE_AND_PROCESS |[{1000.0, CASH}]|TOYO-2201|{false, CA12001, false}|{11021, USD, Premium}|USD |1 |null |PURCHASE_ORDER_CREATED|OK |{3ED23-89DKS-001AA-2321, 1613420860087}|{10000, 0} |
+-------+----------------+---------------+---------+--------------+---------+--------------+---------------+----------+-------------+----------+----------------+----------------------+----------------+---------+-----------------------+---------------------+--------+--------+----------+----------------------+-----------+---------------------------------------+-----------------+
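The reason a second parsing step is needed can be seen without Spark: input_data is JSON text stored inside a JSON string, so a single parse of the outer record yields a string, not an object. The sketch below uses a hypothetical, trimmed-down record (the variable names and values are made up for illustration) and only the standard json module.

```python
import json

# Hypothetical, trimmed-down CDC record: input_data holds JSON *text*, not an object.
inner = {"currency": "USD", "coreData": {"CustId": 11021, "custCategory": "Premium"}}
record = {"after": {"id_transaction": "121", "product_id": 25, "input_data": json.dumps(inner)}}
raw_value = json.dumps(record)

# First parse: the outer record becomes a dict, but input_data is still a str.
first_pass = json.loads(raw_value)
input_data = first_pass["after"]["input_data"]
print(type(input_data).__name__)

# Second parse: only now do the nested fields become addressable.
second_pass = json.loads(input_data)
print(second_pass["coreData"]["CustId"])
```

Schema inference over the outer value sees the same thing the first parse does, which is why the inferred type for input_data is a string until that column is parsed on its own.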