Iterating rows of a Spark Dataset and applying operations in Java API
Question
New to Spark (2.4.x) and using the Java API (not Scala!!!)
I have a Dataset<Row> that I've read in from a CSV file. It has a schema (named columns) like so:
id (integer) | name (string) | color (string) | price (double) | enabled (boolean)
An example row:
23 | "hotmeatballsoup" | "blue" | 3.95 | true
There are many (tens of thousands) rows in the dataset. I would like to write an expression using the proper Java/Spark API that scrolls through each row and applies the following two operations on each row:
- If the price is null, default it to 0.00; and then
- If the color column value is "red", add 2.55 to the price
Since I'm so new to Spark I'm not sure even where to begin! My best attempt thus far is definitely wrong, but it's at least a starting point I guess:
Dataset<Row> csvData = sparkSession.read()
    .format("csv")
    .load(fileToLoad.getAbsolutePath());

// ??? get rows somehow
Seq<Seq<String>> csvRows = csvData.getRows(???, ???);

// now how to loop through rows???
for (Seq<String> row : csvRows) {
    // how apply two operations specified above???
    if (row["price"] == null) {
        row["price"] = 0.00;
    }
    if (row["color"].equals("red")) {
        row["price"] = row["price"] + 2.55;
    }
}
Can someone help nudge me in the right direction here?
Answer 1
Score: 1
You could use the Spark SQL API to achieve this. Null values could also be replaced with specific values using .fill() from DataFrameNaFunctions. Otherwise you could convert the DataFrame to a Dataset and do these steps in .map, but the SQL API is better and more efficient in this case.

Given input data like:
+---+---------------+-----+-----+-------+
| id| name|color|price|enabled|
+---+---------------+-----+-----+-------+
| 23|hotmeatballsoup| blue| 3.95| true|
| 24| abc| red| 1.0| true|
| 24| abc| red| null| true|
+---+---------------+-----+-----+-------+
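(If you want to reproduce that input without reading a CSV, one way to build it for a quick test is sketched below; spark here is assumed to be an existing SparkSession, and the schema is declared explicitly so price is a double and nulls survive.)

import java.util.Arrays;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Schema matching the question: id, name, color, price, enabled.
StructType schema = new StructType()
    .add("id", DataTypes.IntegerType)
    .add("name", DataTypes.StringType)
    .add("color", DataTypes.StringType)
    .add("price", DataTypes.DoubleType)
    .add("enabled", DataTypes.BooleanType);

Dataset<Row> df = spark.createDataFrame(Arrays.asList(
    RowFactory.create(23, "hotmeatballsoup", "blue", 3.95, true),
    RowFactory.create(24, "abc", "red", 1.0, true),
    RowFactory.create(24, "abc", "red", null, true)
), schema);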
Import the SQL functions before the class declaration:
import static org.apache.spark.sql.functions.*;
Using the SQL API:
df.select(
    col("id"), col("name"), col("color"),
    when(col("color").equalTo("red").and(col("price").isNotNull()), col("price").plus(2.55))
        .when(col("color").equalTo("red").and(col("price").isNull()), 2.55)
        .otherwise(col("price")).as("price"),
    col("enabled")
).show();
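As an aside, the .fill() route mentioned at the top could look roughly like this; a minimal sketch, assuming price is already typed as a double column (in the Java API, DataFrameNaFunctions is reached via df.na()). It produces the same result as the select version:

// Replace nulls in "price" with 0.00 first, then add 2.55 where color is "red".
Dataset<Row> result = df.na().fill(0.00, new String[] {"price"})
    .withColumn("price",
        when(col("color").equalTo("red"), col("price").plus(2.55))
            .otherwise(col("price")));
result.show();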
Or, using a temp view and a SQL query:
df.createOrReplaceTempView("df");
spark.sql(
    "select id, name, color, " +
    "case when color = 'red' and price is not null then (price + 2.55) " +
    "when color = 'red' and price is null then 2.55 " +
    "else price end as price, enabled " +
    "from df").show();
Output:
+---+---------------+-----+-----+-------+
| id| name|color|price|enabled|
+---+---------------+-----+-----+-------+
| 23|hotmeatballsoup| blue| 3.95| true|
| 24| abc| red| 3.55| true|
| 24| abc| red| 2.55| true|
+---+---------------+-----+-----+-------+
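For completeness, the .map route mentioned at the top would look roughly like this in the Java API; a sketch only, assuming the columns are already typed as in the question, and using RowEncoder to supply the Encoder<Row> that map needs. The SQL API versions above should still be preferred:

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;

Dataset<Row> mapped = df.map((MapFunction<Row, Row>) row -> {
    // Default a null price to 0.00, then add 2.55 for red rows.
    double price = row.isNullAt(row.fieldIndex("price"))
        ? 0.00 : row.getDouble(row.fieldIndex("price"));
    if ("red".equals(row.getString(row.fieldIndex("color")))) {
        price += 2.55;
    }
    return RowFactory.create(
        row.get(row.fieldIndex("id")),
        row.getString(row.fieldIndex("name")),
        row.getString(row.fieldIndex("color")),
        price,
        row.get(row.fieldIndex("enabled")));
}, RowEncoder.apply(df.schema()));
mapped.show();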
Comments