英文:
How to generate Pyspark dynamic frame name dynamically
问题
我有一个表,其中的数据如图所示。我想创建动态生成的数据框名称来存储结果。
例如,在下面的示例中,我想创建两个不同的数据框名称 dnb_df 和 es_df,并将读取结果存储在这两个框架中,并打印每个数据框的结构。
当我运行下面的代码时出现错误
> SyntaxError: can't assign to operator (TestGlue2.py, line 66)
英文:
I have a table which has data as shown in the diagram . I want to create store results in dynamically generated data frame names.
For eg here in the below example I want to create two different data frame name
dnb_df and es_df and store the read result in these two frames and print structure of each data frame
When I am running the below code getting the error
> SyntaxError: can't assign to operator (TestGlue2.py, line 66)
import sys
import boto3
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import regexp_replace, col
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
#sc.setLogLevel('DEBUG')
glueContext = GlueContext(sc)
spark = glueContext.spark_session
#logger = glueContext.get_logger()
#logger.DEBUG('Hello Glue')
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
client = boto3.client('glue', region_name='XXXXXX')
response = client.get_connection(Name='XXXXXX')
connection_properties = response['Connection']['ConnectionProperties']
URL = connection_properties['JDBC_CONNECTION_URL']
url_list = URL.split("/")
host = "{}".format(url_list[-2][:-5])
new_host=host.split('@',1)[1]
port = url_list[-2][-4:]
database = "{}".format(url_list[-1])
Oracle_Username = "{}".format(connection_properties['USERNAME'])
Oracle_Password = "{}".format(connection_properties['PASSWORD'])
#print("Oracle_Username:",Oracle_Username)
#print("Oracle_Password:",Oracle_Password)
print("Host:",host)
print("New Host:",new_host)
print("Port:",port)
print("Database:",database)
Oracle_jdbc_url="jdbc:oracle:thin:@//"+new_host+":"+port+"/"+database
print("Oracle_jdbc_url:",Oracle_jdbc_url)
source_df = spark.read.format("jdbc").option("url", Oracle_jdbc_url).option("dbtable", "(select * from schema.table order by VENDOR_EXECUTION_ORDER) ").option("user", Oracle_Username).option("password", Oracle_Password).load()
vendor_data=source_df.collect()
for row in vendor_data :
vendor_query=row.SRC_QUERY
row.VENDOR_NAME+'_df'= spark.read.format("jdbc").option("url",
Oracle_jdbc_url).option("dbtable", vendor_query).option("user",
Oracle_Username).option("password", Oracle_Password).load()
print(row.VENDOR_NAME+'_df')
答案1
得分: 1
更新: 根据评论讨论,您的要求是进一步将所有内容与另一个数据框联接。
for row in vendor_data:
rowAsDict=row.asDict()
# 这里你可以使用任何变量,因为rowAsDict 不会在其他地方使用
rowAsDict[rowAsDict["VENDOR_NAME"]+"_df"] = spark.sql(rowAsDict["SOURCE_QUERY"])
main_dataframe=main_dataframe.join(rowAsDict[rowAsDict["VENDOR_NAME"]+"_df"], "acc_id")
输入 main_dataframe
:
source_df
:
View1
和 View2
:
输出 main_dataframe:
如果我理解正确,您需要动态生成 VENDOR_NAME_DF
。
你无法给 Row 对象赋值,也不能将 dataframe 赋值给 Row,因为你无法创建一个列类型为 Dataframe 的数据框。
不过,你可以使用 asDict
将行转换为字典并代替。
这样会起作用:
vendor_data=source_df.collect()
for row in vendor_data:
rowAsDict=row.asDict()
# 用 spark.read() 或其他方法替换这里以创建一个数据框
rowAsDict[rowAsDict["VENDOR_NAME"]+"_df"] = spark.sql(rowAsDict["SOURCE_QUERY"])
rowAsDict[rowAsDict["VENDOR_NAME"]+"_df"].show()
输入 Source_DF:
SOURCE_QUERY 的结果:
输出 (rowAsDict[rowAsDict["VENDOR_NAME"]+"_df"].show()
):
最终的 rowAsDict:
{'VENDOR_NAME': 'Name1', 'SOURCE_QUERY': 'select * from view1', 'Name1_df': DataFrame[id: string, date: string, Code: string]}
英文:
Update: As discussed in the comments, your requirement is to further join all with another dataframe
for row in vendor_data:
rowAsDict=row.asDict()
# Here you can use any variable as rowAsDict is not going to be used anywhere else anyway
rowAsDict[rowAsDict["VENDOR_NAME"]+"_df"] = spark.sql(rowAsDict["SOURCE_QUERY"])
main_dataframe=main_dataframe.join(rowAsDict[rowAsDict["VENDOR_NAME"]+"_df"], "acc_id")
Input main_dataframe
:
source_df
:
View1
and View2
:
Output main_dataframe
If I understood correctly, you need to generate the VENDOR_NAME_DF
dynamically.
You won't be able to assign to the Row Object, neither it'll be useful to assign dataframe to a Row as you can't create a Dataframe with a column of type Dataframe.
Though, you can convert a row to a dict using asDict
and use that instead.
This would work:
vendor_data=source_df.collect()
for row in vendor_data:
rowAsDict=row.asDict()
# Replace this with spark.read() or any way to create a Dataframe
rowAsDict[rowAsDict["VENDOR_NAME"]+"_df"] = spark.sql(rowAsDict["SOURCE_QUERY"])
rowAsDict[rowAsDict["VENDOR_NAME"]+"_df"].show()
Input Source_DF:
Result of SOURCE_QUERY:
Output (of rowAsDict[rowAsDict["VENDOR_NAME"]+"_df"].show()
):
Final rowAsDict:
{'VENDOR_NAME': 'Name1', 'SOURCE_QUERY': 'select * from view1', 'Name1_df': DataFrame[id: string, date: string, Code: string]}
答案2
得分: 1
将以下两行添加到你的for循环中,你应该能够获得结果。
第一行是使用动态的df名称创建临时表。
第二行是显示该临时表中的数据。
for row in vendor_data:
vendor_query = row.SRC_QUERY
spark.read.format("jdbc").option("url", Oracle_jdbc_url).option("dbtable", vendor_query).option("user", Oracle_Username).option("password", Oracle_Password).load().createOrReplaceTempView(row.VENDOR_NAME + '_df')
spark.sql("select * from " + row.VENDOR_NAME + "_df").show()
英文:
Add the last two lines in your for loop, you should be able to get the results.
First one is creating a temp table using the dynamic df name
Second is to show the data in that temp table.
for row in vendor_data :
vendor_query=row.SRC_QUERY
spark.read.format("jdbc").option("url",
Oracle_jdbc_url).option("dbtable", vendor_query).option("user",
Oracle_Username).option("password", Oracle_Password).load().createOrReplaceTempView(row.VENDOR_NAME+'_df')
spark.sql("select * from "+row.VENDOR_NAME+"_df").show()
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论