

How to pass a schema to spark.sql.functions.from_csv?


I am using spark-3.4.1-hadoop3 on Windows 11, and I am trying to build a schema to pass to the from_csv function.
My code is below.

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_csv;
import static org.apache.spark.sql.functions.not;

import java.util.HashMap;
import java.util.Map;

SparkSession spark = SparkSession.builder().appName("FromCsvStructExample").getOrCreate();

Dataset<Row> df = spark.read().format("csv")
      .option("header", "true")
      .option("inferSchema", "true")

Map<String, String> options = new HashMap<String, String>();

String schemaString = "name string, age int, job string";

Column schema = from_csv(col("csv"), col(schemaString), options);
Dataset<Row> parsed = df.select(schema.as("data"));

But the code throws the following exception.

Exception in thread "main" org.apache.spark.sql.AnalysisException: [INVALID_SCHEMA.NON_STRING_LITERAL] The input schema "name string, age int, job string" is not a valid schema string. The input expression must be string literal and not null.
        at org.apache.spark.sql.errors.QueryCompilationErrors$.unexpectedSchemaTypeError(QueryCompilationErrors.scala:1055)
        at org.apache.spark.sql.catalyst.expressions.ExprUtils$.evalTypeExpr(ExprUtils.scala:42)
        at org.apache.spark.sql.catalyst.expressions.ExprUtils$.evalSchemaExpr(ExprUtils.scala:47)
        at org.apache.spark.sql.catalyst.expressions.CsvToStructs.<init>(csvExpressions.scala:72)
        at org.apache.spark.sql.functions$.from_csv(functions.scala:4955)
        at org.apache.spark.sql.functions.from_csv(functions.scala)
        at com.aaa.etl.processor.Test_CSV.main(Test_CSV.java:43)

I suspect that schemaString is not valid input for the org.apache.spark.sql.functions.col function. Could you tell me how to build the schema with org.apache.spark.sql.functions.col? I know there is an overloaded from_csv function whose schema parameter is a StructType, but to use it I would have to write a Scala function, and I have no basic knowledge of Scala.

== Updated Part

I tried to use the Java-specific from_csv method.

from_csv(Column e, Column schema, java.util.Map<String,String> options)

As you can see, the type of the schema parameter is Column, not StructType. This is where I am stuck: I have no idea how to build a schema of type Column in Java.
If you know of any reference that shows how to build a Column-typed schema in Java, please point me to it.


Score: 1


You are right that you can't directly generate a Column from a DDL string. Two ways around this are the lit function and StructType.fromDDL. As you already mentioned, one signature of from_csv accepts a StructType for the schema. The Scala code would then look like this:

import org.apache.spark.sql.functions.{col, from_csv}
import org.apache.spark.sql.types.StructType

// Parse the DDL string into a StructType
val schema: StructType = StructType.fromDDL("name string, age int, job string")

// StructType(
//   StructField(name,StringType,true),
//   StructField(age,IntegerType,true),
//   StructField(job,StringType,true)
// )

// options is the (Scala) Map[String, String] of CSV options
val targetCol = from_csv(col("csv"), schema, options)

The code should be very similar for Java.
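For Java specifically, here is a minimal sketch of the StructType route. It assumes that the StructType overload of from_csv expects a Scala Map for its options, so it instead builds the StructType in Java and hands its DDL form to the Java-specific overload via lit. The class name, the local master setting, and the sample rows are hypothetical and only there to make the example runnable.

```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_csv;
import static org.apache.spark.sql.functions.lit;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class FromCsvStructTypeExample {

    // Builds the schema programmatically instead of writing the DDL text by hand.
    static StructType buildSchema() {
        return new StructType()
                .add("name", DataTypes.StringType)
                .add("age", DataTypes.IntegerType)
                .add("job", DataTypes.StringType);
    }

    // Parses the string column "csv" into a struct column named "data".
    static Dataset<Row> parseCsvColumn(Dataset<Row> df) {
        Map<String, String> options = new HashMap<>();
        // toDDL() renders the StructType as a DDL string; lit(...) turns that
        // string into the literal Column the Java-specific overload expects.
        Column parsedCol = from_csv(col("csv"), lit(buildSchema().toDDL()), options);
        return df.select(parsedCol.as("data"));
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("FromCsvStructTypeExample")
                .master("local[1]") // assumption: local run for illustration
                .getOrCreate();

        // Hypothetical sample data: one CSV line per row in a column named "csv".
        Dataset<Row> df = spark
                .createDataset(Arrays.asList("Alice,30,engineer", "Bob,25,teacher"), Encoders.STRING())
                .toDF("csv");

        parseCsvColumn(df).select("data.name", "data.age", "data.job").show();
        spark.stop();
    }
}
```

Building the StructType in code can be handy when the column list is assembled at runtime; for a fixed schema, passing the DDL string directly is shorter.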

As for the other signature of from_csv, which accepts a Column instead of a StructType, it is meant to be used together with the lit function, as shown in the corresponding unit test. This is for cases where you prefer to pass the schema as a string.

For your case that would have been:

val schema = "name string, age int, job string"

val targetCol = from_csv(col("csv"), lit(schema), options)
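Since the question is about Java, the same lit-based call might look like the sketch below. It reuses the schema string and the java.util.Map options from the question; the class name, the local master setting, and the sample rows are hypothetical and only there to make the example runnable.

```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_csv;
import static org.apache.spark.sql.functions.lit;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class FromCsvLitExample {

    // Parses the string column "csv" into a struct column named "data".
    static Dataset<Row> parseCsvColumn(Dataset<Row> df) {
        Map<String, String> options = new HashMap<>();
        String schemaString = "name string, age int, job string";
        // lit(...) wraps the DDL text as a string literal, whereas col(...)
        // would refer to a column whose name is that text.
        Column parsedCol = from_csv(col("csv"), lit(schemaString), options);
        return df.select(parsedCol.as("data"));
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("FromCsvLitExample")
                .master("local[1]") // assumption: local run for illustration
                .getOrCreate();

        // Hypothetical sample data: one CSV line per row in a column named "csv".
        Dataset<Row> df = spark
                .createDataset(Arrays.asList("Alice,30,engineer", "Bob,25,teacher"), Encoders.STRING())
                .toDF("csv");

        Dataset<Row> parsed = parseCsvColumn(df);
        // The struct fields can be addressed with dot notation.
        parsed.select("data.name", "data.age", "data.job").show();
        spark.stop();
    }
}
```

The only change from the code in the question is lit(schemaString) in place of col(schemaString), which matches the error message's requirement that the schema expression be a string literal.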

  • Posted on July 24, 2023, 19:10:18
  • Source link: https://go.coder-hub.com/76753880.html



