How to generate Pyspark dynamic frame name dynamically


Question


I have a table containing the data shown in the diagram. I want to store query results in DataFrames whose names are generated dynamically.

For example, in the code below I want to create two DataFrames named dnb_df and es_df, store the read results in them, and print the structure of each DataFrame.

When I run the code below, I get this error:

> SyntaxError: can't assign to operator (TestGlue2.py, line 66)


import sys
import boto3
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import regexp_replace, col


args = getResolvedOptions(sys.argv, ['JOB_NAME'])





sc = SparkContext()
#sc.setLogLevel('DEBUG')
glueContext = GlueContext(sc)
spark = glueContext.spark_session

#logger = glueContext.get_logger()
#logger.DEBUG('Hello Glue')
job = Job(glueContext)
job.init(args["JOB_NAME"], args)



client = boto3.client('glue', region_name='XXXXXX')
response = client.get_connection(Name='XXXXXX')
connection_properties = response['Connection']['ConnectionProperties']
URL = connection_properties['JDBC_CONNECTION_URL']
url_list = URL.split("/")
host = "{}".format(url_list[-2][:-5])
new_host=host.split('@',1)[1]
port = url_list[-2][-4:]
database = "{}".format(url_list[-1])
Oracle_Username = "{}".format(connection_properties['USERNAME'])
Oracle_Password = "{}".format(connection_properties['PASSWORD'])

#print("Oracle_Username:",Oracle_Username)
#print("Oracle_Password:",Oracle_Password)
print("Host:",host)
print("New Host:",new_host)
print("Port:",port)
print("Database:",database)
Oracle_jdbc_url="jdbc:oracle:thin:@//"+new_host+":"+port+"/"+database
print("Oracle_jdbc_url:",Oracle_jdbc_url)
source_df = spark.read.format("jdbc").option("url", Oracle_jdbc_url).option("dbtable", "(select * from schema.table order by VENDOR_EXECUTION_ORDER) ").option("user", Oracle_Username).option("password", Oracle_Password).load()
vendor_data=source_df.collect()
for row  in vendor_data :
    vendor_query=row.SRC_QUERY
   row.VENDOR_NAME+'_df'= spark.read.format("jdbc").option("url", 
               Oracle_jdbc_url).option("dbtable", vendor_query).option("user", 
            Oracle_Username).option("password", Oracle_Password).load()
    print(row.VENDOR_NAME+'_df')


Added use case in picture

Answer 1

Score: 1


Update: As discussed in the comments, you also need to join each vendor DataFrame with another DataFrame:

for row in vendor_data:
    rowAsDict = row.asDict()
    # Any variable name works here, since rowAsDict is not used anywhere else
    rowAsDict[rowAsDict["VENDOR_NAME"] + "_df"] = spark.sql(rowAsDict["SOURCE_QUERY"])
    main_dataframe = main_dataframe.join(rowAsDict[rowAsDict["VENDOR_NAME"] + "_df"], "acc_id")

[Images in the original: input main_dataframe; source_df; View1 and View2; output main_dataframe]
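The join loop above uses each vendor DataFrame only once, so the generated name is not strictly needed for the join itself. A minimal sketch of the same fold, assuming every vendor DataFrame shares the `acc_id` column; `join_all` is a hypothetical helper name, not part of PySpark:

```python
from functools import reduce


def join_all(main_df, vendor_dfs, key="acc_id"):
    """Join main_df with each DataFrame in vendor_dfs on `key`, in order.

    Works with any object exposing DataFrame-style join(other, on).
    The helper name and the default key are illustrative assumptions.
    """
    return reduce(lambda acc, df: acc.join(df, key), vendor_dfs, main_df)
```

With the names used in this answer, it could be called as `main_dataframe = join_all(main_dataframe, [spark.sql(r["SOURCE_QUERY"]) for r in vendor_data])`.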

If I understood correctly, you need to generate the VENDOR_NAME_DF name dynamically.

You can't assign to a Row object, and storing a DataFrame in a Row wouldn't be useful anyway, because you can't create a DataFrame with a column of type DataFrame.

You can, however, convert the row to a dict with asDict and use that instead.

This would work:

vendor_data = source_df.collect()

for row in vendor_data:
    rowAsDict = row.asDict()
    # Replace spark.sql(...) with a spark.read...load() call or any other way to create a DataFrame
    rowAsDict[rowAsDict["VENDOR_NAME"] + "_df"] = spark.sql(rowAsDict["SOURCE_QUERY"])
    rowAsDict[rowAsDict["VENDOR_NAME"] + "_df"].show()

[Images in the original: input Source_DF; result of SOURCE_QUERY; output of rowAsDict[rowAsDict["VENDOR_NAME"]+"_df"].show()]

Final rowAsDict:

{'VENDOR_NAME': 'Name1', 'SOURCE_QUERY': 'select * from view1', 'Name1_df': DataFrame[id: string, date: string, Code: string]}
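Python rejects an expression such as `row.VENDOR_NAME+'_df'` on the left of `=`, which is what the SyntaxError in the question reports; a dict key is the usual substitute for a dynamically named variable. Note that rowAsDict is rebuilt on every iteration, so the DataFrame stored in it is no longer reachable once the loop moves on. One option for keeping every vendor's DataFrame available by name is a single dict created outside the loop. The sketch below is hedged: `load_vendor_frames` and its `load` parameter are hypothetical names, and `load` stands for whatever creates a DataFrame from a query (for example `spark.sql` or a wrapper around the JDBC reader).

```python
def load_vendor_frames(rows, load, name_col="VENDOR_NAME", query_col="SRC_QUERY"):
    """Return a dict mapping "<vendor>_df" to the DataFrame loaded for that vendor.

    rows : iterable of dicts or pyspark Rows (both support row[column_name]).
    load : callable that takes a query string and returns a DataFrame
           (an assumption; pass spark.sql or your own JDBC wrapper).
    """
    frames = {}
    for row in rows:
        # The dict key plays the role of the dynamic variable name
        frames[row[name_col] + "_df"] = load(row[query_col])
    return frames
```

Assuming VENDOR_NAME holds values such as dnb and es, `frames = load_vendor_frames(vendor_data, spark.sql, query_col="SOURCE_QUERY")` followed by `frames["dnb_df"].printSchema()` would print the structure of one vendor's DataFrame.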

Answer 2

Score: 1


Add the last two statements below to your for loop and you should get the results.
The first creates a temp view named with the dynamic DataFrame name.
The second shows the data in that temp view.

for row in vendor_data:
    vendor_query = row.SRC_QUERY
    # Register the query result as a temp view named <VENDOR_NAME>_df
    (spark.read.format("jdbc")
        .option("url", Oracle_jdbc_url)
        .option("dbtable", vendor_query)
        .option("user", Oracle_Username)
        .option("password", Oracle_Password)
        .load()
        .createOrReplaceTempView(row.VENDOR_NAME + "_df"))
    # Query the temp view to display its rows
    spark.sql("select * from " + row.VENDOR_NAME + "_df").show()
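Because the view name is concatenated into a SQL string, a vendor name containing spaces or punctuation may be rejected as an identifier. A minimal sketch of normalising the name first; `view_name_for` is a hypothetical helper, and the replacement rule is an illustrative assumption rather than a Spark requirement:

```python
import re


def view_name_for(vendor_name, suffix="_df"):
    """Build a temp-view name from a vendor name.

    Replaces every character outside [A-Za-z0-9_] with "_" and prefixes
    "v_" when the result would start with a digit.
    """
    cleaned = re.sub(r"\W", "_", vendor_name, flags=re.ASCII)
    if cleaned[:1].isdigit():
        cleaned = "v_" + cleaned
    return cleaned + suffix
```

The same value would then be passed to both `createOrReplaceTempView(...)` and the `select * from ...` string, so the two always agree.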
    

huangapple
  • Posted on 2023-02-19 09:44:14
  • When reposting, please keep the link to this article: https://go.coder-hub.com/75497503.html