Spark SQL registered temp table cannot be used in sqlContext.read()?

Question
I have the following code:

Map<String, String> props = getDbConnectionProps();
String sql = "...";
props.put("dbtable", sql);
props.put("fetchSize", "100000");
props.put("partitionColumn", "col1");
props.put("lowerBound", "25");
props.put("upperBound", "100");
props.put("numPartitions", "10");

DataFrame df = sqlContext.read().format("jdbc").options(props).load();
df.registerTempTable("myTable");
df.cache();

Map<String, String> props2 = getDbConnectionProps();
String sql2 = "... inner join myTable on ...."; // Note: sql2 uses the temp table
props2.put("dbtable", sql2);
props2.put("fetchSize", "100000");
props2.put("partitionColumn", "col1");
props2.put("lowerBound", "25");
props2.put("upperBound", "100");
props2.put("numPartitions", "10");

DataFrame df2 = sqlContext.read().format("jdbc").options(props2).load();
However, I get the following error:

java.sql.SQLSyntaxErrorException: Table 'myDbSchema.myTable' doesn't exist

So a registered temp table cannot be used in sqlContext.read()? I understand I can use sqlContext.sql(sql2) to get the result using the temp table. However, how can I set properties such as the partition information when going through sqlContext.sql()?

Thanks.
Answer 1
Score: 0
I am not sure how to do it without SQL, but I think the error may be showing up because you are trying to read the table "myTable" with format("jdbc"), i.e. straight from the database, instead of from however your temporary storage is set up.
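A minimal sketch of that distinction, assuming the myTable temp table from the question has already been registered: names registered with registerTempTable() live only in Spark's in-memory catalog, while anything passed through the JDBC options is resolved by the database.

// Works: "myTable" is resolved in Spark's catalog.
DataFrame fromTempTable = sqlContext.sql("SELECT * FROM myTable");
DataFrame sameThing = sqlContext.table("myTable");

// Fails: this string is shipped to the database, which has no table "myTable".
Map<String, String> props = getDbConnectionProps(); // helper from the question
props.put("dbtable", "(SELECT * FROM myTable) t");
DataFrame fromDb = sqlContext.read().format("jdbc").options(props).load(); // SQLSyntaxErrorException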
Answer 2
Score: 0
"It is clearly evident that you are reading from Database using .format("jdbc")
, while df.registerTempTable("myTable");
is an spark entity/data present in memory after loading data."
"很明显,你正在使用 .format("jdbc")
从数据库中读取数据,而 df.registerTempTable("myTable");
是在加载数据后在内存中存在的 Spark 实体/数据。"
"DataFrame df2 = sqlContext.read().format("jdbc").options(props).load();"
"DataFrame df2 = sqlContext.read().format("jdbc").options(props).load();"
"And error states that myDbSchema.myTable doesn't exist, because the query string which you have passed is operating on Database."
"错误提示称 myDbSchema.myTable 不存在,因为你传递的查询字符串是在数据库上执行的。"
"sql2 = "... inner join myTable on ....""
"sql2 = "... inner join myTable on ....""
"java.sql.SQLSyntaxErrorException: Table 'myDbSchema.myTable' doesn't exist"
"java.sql.SQLSyntaxErrorException: 表 'myDbSchema.myTable' 不存在"
"For your Question: I understand I can use sqlContext.sql(sql2) to get the result using temp table. However, how can I set those properties such as partition information in the sqlContext.sql() way?"
"对于你的问题:我理解我可以使用 sqlContext.sql(sql2) 来使用临时表获取结果。但是,我该如何在 sqlContext.sql() 方式中设置分区等属性呢?"
"Solution 1 is optimal when two datasets are too huge to be joined/handled by database and vice versa. please find below pseudo code."
"解决方案 1 在两个数据集过大以至于无法由数据库连接/处理时是最佳选择,反之亦然。请查看下面的伪代码。"
"Solution 1: Load data of 2nd table in DF2 then perform join in spark."
"解决方案 1: 在 DF2 中加载第二个表的数据,然后在 Spark 中执行连接。"
"DataFrame df = sqlContext.read().format("jdbc").options(props).load();"
"DataFrame df = sqlContext.read().format("jdbc").options(props).load();"
"DataFrame df2 = sqlContext.read().format("jdbc").options(props2).load();"
"DataFrame df2 = sqlContext.read().format("jdbc").options(props2).load();"
"spark.conf.set("spark.sql.shuffle.partitions",10)"
"spark.conf.set("spark.sql.shuffle.partitions",10)"
"DataFrame joindf = df.join(df2, joinCondition, "inner")"
"DataFrame joindf = df.join(df2, joinCondition, "inner")"
"Solution 2: Create a view/table in database by joining two tables, for example joinedview
here, and load data through read parallelism => Partitions into spark."
"解决方案 2: 通过连接两个表在数据库中创建一个视图/表,例如这里的 joinedview
,然后通过读取并行处理 => 划分分区加载数据到 Spark 中。"
"In Database:"
"在数据库中:"
"create view joinedview as select * from table inner join myTable on (joincondition)"
"create view joinedview as select * from table inner join myTable on (joincondition)"
"In Spark:"
"在 Spark 中:"
"Map<String, String> props = getDbConnectionProps();"
"Map<String, String> props = getDbConnectionProps();"
"props.put("dbtable", joinedview);"
"props.put("dbtable", joinedview);"
"props.put("fetchSize", "100000")"
"props.put("fetchSize", "100000")"
"props.put("partitionColumn", "col1")"
"props.put("partitionColumn", "col1")"
"props.put("lowerBound", "25")"
"props.put("lowerBound", "25")"
"props.put("upperBound", "100")"
"props.put("upperBound", "100")"
"props.put("numPartitions", "10")"
"props.put("numPartitions", "10")"
"DataFrame df2 = sqlContext.read().format("jdbc").options(props).load();"
"DataFrame df2 = sqlContext.read().format("jdbc").options(props).load();"
英文:
It is clearly evident that you are reading from the database using .format("jdbc"), while df.registerTempTable("myTable") registers a Spark entity that lives in memory after the data is loaded.
DataFrame df2 = sqlContext.read().format("jdbc").options(props).load();
The error states that myDbSchema.myTable doesn't exist because the query string you passed is executed on the database:

sql2 = "... inner join myTable on ...."
java.sql.SQLSyntaxErrorException: Table 'myDbSchema.myTable' doesn't exist
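To make the mechanics concrete: Spark's JDBC source splices the dbtable value into the queries it sends to the database (roughly SELECT ... FROM <dbtable> WHERE <partition predicate>), so a query string has to be wrapped as a parenthesized subquery with an alias, and it can only reference objects that exist in the database. A hedged sketch; the table names below are made up for illustration:

// Every identifier here must exist in the database; Spark temp tables are invisible to it.
Map<String, String> props = getDbConnectionProps(); // helper from the question
props.put("dbtable", "(SELECT a.col1, b.col2 FROM realTableA a INNER JOIN realTableB b ON a.id = b.id) q");
DataFrame df2 = sqlContext.read().format("jdbc").options(props).load();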
For your question: "I understand I can use sqlContext.sql(sql2) to get the result using the temp table. However, how can I set those properties such as partition information in the sqlContext.sql() way?"
Solution 1 is optimal when the two datasets are too huge to be joined/handled by the database, and vice versa. Please find pseudo code below.

Solution 1: Load the data of the 2nd table into DF2, then perform the join in Spark.
DataFrame df = sqlContext.read().format("jdbc").options(props).load();
DataFrame df2 = sqlContext.read().format("jdbc").options(props2).load();

// Control the parallelism of the shuffle triggered by the join.
sqlContext.setConf("spark.sql.shuffle.partitions", "10");

// joinCondition is a Column expression, e.g. df.col("col1").equalTo(df2.col("col1"))
DataFrame joindf = df.join(df2, joinCondition, "inner");
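A more complete Java sketch of the same approach, assuming the getDbConnectionProps() helper and the col1 join column from the question (the table names are placeholders):

// Read both tables through JDBC, each with its own partitioned-read options.
Map<String, String> props = getDbConnectionProps();
props.put("dbtable", "tableA"); // placeholder table name
props.put("partitionColumn", "col1");
props.put("lowerBound", "25");
props.put("upperBound", "100");
props.put("numPartitions", "10");
DataFrame df = sqlContext.read().format("jdbc").options(props).load();

Map<String, String> props2 = getDbConnectionProps();
props2.put("dbtable", "tableB"); // placeholder table name
DataFrame df2 = sqlContext.read().format("jdbc").options(props2).load();

sqlContext.setConf("spark.sql.shuffle.partitions", "10");

// Join in Spark instead of in the database, with an explicit Column condition.
DataFrame joindf = df.join(df2, df.col("col1").equalTo(df2.col("col1")), "inner");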
Solution 2: Create a view/table in the database by joining the two tables (for example, joinedview here), and load the data into Spark with read parallelism => partitions.
In Database:
create view joinedview as
select * from table inner join myTable
on (joincondition)
In Spark:
Map<String, String> props = getDbConnectionProps();
props.put("dbtable", joinedview);
props.put("fetchSize", "100000");
props.put("partitionColumn", "col1");
props.put("lowerBound", "25");
props.put("upperBound", "100");
props.put("numPartitions", "10");
DataFrame df2 = sqlContext.read().format("jdbc").options(props).load();
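As a quick sanity check (an illustrative addition, not part of the original answer), you can confirm that the partitioned read took effect; with numPartitions set to 10, the loaded DataFrame should have 10 partitions:

System.out.println(df2.javaRDD().partitions().size()); // expect 10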