How to set schema into spark.sql.function.from_csv?
Question
I am using spark-3.4.1-hadoop3 on Windows 11, and I am trying to generate a schema to pass as the from_csv function's schema parameter. Below is my code.
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_csv;
import static org.apache.spark.sql.functions.not;
import java.util.HashMap;
import java.util.Map;
SparkSession spark = SparkSession.builder().appName("FromCsvStructExample").getOrCreate();
Dataset<Row> df = spark.read().format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("/path/to/csv/file");
Map<String, String> options = new HashMap<String, String>();
String schemaString = "name string, age int, job string";
Column schema = from_csv(col("csv"), col(schemaString), options);
Dataset<Row> parsed = df.select(schema.as("data"));
parsed.printSchema();
spark.close();
But the code throws the following exception.
Exception in thread "main" org.apache.spark.sql.AnalysisException: [INVALID_SCHEMA.NON_STRING_LITERAL] The input schema "name string, age int, job string" is not a valid schema string. The input expression must be string literal and not null.
at org.apache.spark.sql.errors.QueryCompilationErrors$.unexpectedSchemaTypeError(QueryCompilationErrors.scala:1055)
at org.apache.spark.sql.catalyst.expressions.ExprUtils$.evalTypeExpr(ExprUtils.scala:42)
at org.apache.spark.sql.catalyst.expressions.ExprUtils$.evalSchemaExpr(ExprUtils.scala:47)
at org.apache.spark.sql.catalyst.expressions.CsvToStructs.<init>(csvExpressions.scala:72)
at org.apache.spark.sql.functions$.from_csv(functions.scala:4955)
at org.apache.spark.sql.functions.from_csv(functions.scala)
at com.aaa.etl.processor.Test_CSV.main(Test_CSV.java:43)
I am afraid the schemaString is not valid input for the org.apache.spark.sql.functions.col function. Kindly inform me how to generate the schema with the org.apache.spark.sql.functions.col function. I know there is an overloaded from_csv function whose schema parameter is of type StructType, but using that overload seems to require writing Scala code, and I have no knowledge of Scala at all.
== Update
I tried to use the Java-specific from_csv method:
from_csv(Column e, Column schema, java.util.Map<String,String> options)
As you can see, the schema parameter's type is not StructType but Column. This is where I am stuck: I have no idea how to construct a Column-typed schema in Java. If you have any reference showing how to generate a Column-typed schema in Java, kindly let me know.
Answer 1
Score: 1
You are right that you can't directly generate a Column from a DDL string. One way is to use the lit or StructType.fromDDL function. As you already mentioned, one signature of from_csv accepts a StructType for the schema. The Scala code would then look like this:
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.functions.{col, from_csv}

val schema: StructType = StructType.fromDDL("name string, age int, job string")
// StructType(
//   StructField(name,StringType,true),
//   StructField(age,IntegerType,true),
//   StructField(job,StringType,true)
// )
val targetCol = from_csv(col("csv"), schema, options)
The code should be very similar for Java.
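For example, a minimal Java sketch of the StructType route (assuming the Spark 3.4 API, where this overload of from_csv takes a scala.collection.immutable.Map for the options, so an empty Scala map is built inline rather than reusing the question's java.util.HashMap):

import org.apache.spark.sql.Column;
import org.apache.spark.sql.types.StructType;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_csv;

// StructType.fromDDL is exposed to Java as a static method, so no Scala
// code is needed to build the schema from a DDL string.
StructType schema = StructType.fromDDL("name string, age int, job string");

// This overload expects a Scala Map for the options; an empty one can be
// obtained from the Map companion object.
Column targetCol = from_csv(
    col("csv"),
    schema,
    scala.collection.immutable.Map$.MODULE$.<String, String>empty());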
As for the other signature of from_csv, the one that accepts a Column instead of a StructType, it is used in combination with the lit function, as shown in the corresponding unit tests. This is for cases where you prefer passing the schema as a string. For your case that would be:
val schema = "name string, age int, job string"
val targetCol = from_csv(col("csv"), lit(schema), options)