Iterating rows of a Spark Dataset and applying operations in Java API

Question


New to Spark (2.4.x) and using the Java API (not Scala!!!)

I have a Dataset that I've read in from a CSV file. It has a schema (named columns) like so:

id (integer)  |  name (string)  |  color (string)  |  price (double)  |  enabled (boolean)

An example row:

23 | "hotmeatballsoup" | "blue" | 3.95 | true

There are many (tens of thousands of) rows in the dataset. I would like to write an expression, using the proper Java/Spark API, that goes through each row and applies the following two operations to it:

  1. If the price is null, default it to 0.00; and then
  2. If the color column value is "red", add 2.55 to the price

Since I'm so new to Spark I'm not sure even where to begin! My best attempt thus far is definitely wrong, but it's at least a starting point, I guess:

Dataset<Row> csvData = sparkSession.read()
    .format("csv")
    .load(fileToLoad.getAbsolutePath());

// ??? get rows somehow
Seq<Seq<String>> csvRows = csvData.getRows(???, ???);

// now how to loop through rows???
for (Seq<String> row : csvRows) {
    // how apply two operations specified above???
    if (row["price"] == null) {
        row["price"] = 0.00;
    }

    if (row["color"].equals("red")) {
        row["price"] = row["price"] + 2.55;
    }
}

Can someone help nudge me in the right direction here?

Answer 1

Score: 1



You can use the Spark SQL API to achieve this. Null values can also be replaced with specific values using .fill() from DataFrameNaFunctions. Alternatively, you could convert the DataFrame to a Dataset and perform these steps in .map, but the SQL API is cleaner and more efficient in this case.

+---+---------------+-----+-----+-------+
| id|           name|color|price|enabled|
+---+---------------+-----+-----+-------+
| 23|hotmeatballsoup| blue| 3.95|   true|
| 24|            abc|  red|  1.0|   true|
| 24|            abc|  red| null|   true|
+---+---------------+-----+-----+-------+

Import the SQL functions statically before the class declaration:

import static org.apache.spark.sql.functions.*;

Using the SQL API:

df.select(
    col("id"), col("name"), col("color"),
    when(col("color").equalTo("red").and(col("price").isNotNull()), col("price").plus(2.55))
        .when(col("color").equalTo("red").and(col("price").isNull()), 2.55)
        .otherwise(col("price")).as("price"),
    col("enabled")
).show();
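The .fill() approach mentioned at the top also shortens the when chain, because the null case is gone before the surcharge is applied. A minimal, self-contained sketch (the local session setup and sample rows here are illustrative, not from the original post):

```java
import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.functions.*;

SparkSession spark = SparkSession.builder()
    .master("local[1]")
    .appName("FillExample")
    .getOrCreate();

// Schema matching the question; sample rows are illustrative.
StructType schema = new StructType()
    .add("id", DataTypes.IntegerType)
    .add("name", DataTypes.StringType)
    .add("color", DataTypes.StringType)
    .add("price", DataTypes.DoubleType)
    .add("enabled", DataTypes.BooleanType);

List<Row> rows = Arrays.asList(
    RowFactory.create(23, "hotmeatballsoup", "blue", 3.95, true),
    RowFactory.create(24, "abc", "red", 1.0, true),
    RowFactory.create(25, "abc", "red", null, true));

Dataset<Row> df = spark.createDataFrame(rows, schema);

// Step 1: default null prices to 0.00 via DataFrameNaFunctions.fill().
Dataset<Row> filled = df.na().fill(0.00, new String[] {"price"});

// Step 2: with nulls gone, the red surcharge needs only one when/otherwise.
Dataset<Row> result = filled.withColumn("price",
    when(col("color").equalTo("red"), col("price").plus(2.55))
        .otherwise(col("price")));

result.show();
```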

Or using a temp view and a SQL query:

df.createOrReplaceTempView("df");
spark.sql("select id, name, color, case when color = 'red' and price is not null then (price + 2.55) when color = 'red' and price is null then 2.55 else price end as price, enabled from df").show();

Output:

+---+---------------+-----+-----+-------+
| id|           name|color|price|enabled|
+---+---------------+-----+-----+-------+
| 23|hotmeatballsoup| blue| 3.95|   true|
| 24|            abc|  red| 3.55|   true|
| 24|            abc|  red| 2.55|   true|
+---+---------------+-----+-----+-------+
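For contrast, the .map route the answer mentions looks roughly like this in the Java API. This is a sketch assuming Spark 2.4 and illustrative sample data; `RowEncoder.apply` exists there but has changed in newer Spark versions. Note how much more manual the row handling is than with the column expressions above:

```java
import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

SparkSession spark = SparkSession.builder()
    .master("local[1]")
    .appName("MapExample")
    .getOrCreate();

// Schema matching the question; sample rows are illustrative.
StructType schema = new StructType()
    .add("id", DataTypes.IntegerType)
    .add("name", DataTypes.StringType)
    .add("color", DataTypes.StringType)
    .add("price", DataTypes.DoubleType)
    .add("enabled", DataTypes.BooleanType);

List<Row> rows = Arrays.asList(
    RowFactory.create(23, "hotmeatballsoup", "blue", 3.95, true),
    RowFactory.create(24, "abc", "red", 1.0, true),
    RowFactory.create(25, "abc", "red", null, true));

Dataset<Row> df = spark.createDataFrame(rows, schema);

// Rebuild each row by hand: default a null price to 0.00, then add 2.55 for red.
Dataset<Row> result = df.map((MapFunction<Row, Row>) row -> {
    double price = row.isNullAt(3) ? 0.00 : row.getDouble(3);
    if ("red".equals(row.getString(2))) {
        price += 2.55;
    }
    return RowFactory.create(row.get(0), row.get(1), row.get(2), price, row.get(4));
}, RowEncoder.apply(df.schema()));

result.show();
```

The SQL-expression versions are generally preferable: Catalyst can inspect and optimize column expressions, while a lambda passed to .map is opaque to the optimizer.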

huangapple
  • Published 2020-04-09 01:05:23
  • Please keep this link when reposting: https://go.coder-hub.com/61106112.html