spark broadcast variable Map giving null value
Question
I am using Java 8 with Spark v2.4.1.
I am trying to use a broadcast variable Map for a lookup, as shown below:
Input data:
+-----+-----+-----+
|code1|code2|code3|
+-----+-----+-----+
|1 |7 | 5 |
|2 |7 | 4 |
|3 |7 | 3 |
|4 |7 | 2 |
|5 |7 | 1 |
+-----+-----+-----+
Expected Output:
+-----+-----+-----+
|code1|code2|code3|
+-----+-----+-----+
|1 |7 |51 |
|2 |7 |41 |
|3 |7 |31 |
|4 |7 |21 |
|5 |7 |11 |
+-----+-----+-----+
My current code, with the different solutions that I have tried:
Map<Integer,Integer> lookup_map= new HashMap<>();
lookup_map.put(1,11);
lookup_map.put(2,21);
lookup_map.put(3,31);
lookup_map.put(4,41);
lookup_map.put(5,51);
JavaSparkContext javaSparkContext = JavaSparkContext.fromSparkContext(sparkSession.sparkContext());
Broadcast<Map<Integer,Integer>> lookup_mapBcVar = javaSparkContext.broadcast(lookup_map);
Dataset<Row> resultDs= dataDs
.withColumn("floor_code3", floor(col("code3")))
.withColumn("floor_code3_int", floor(col("code3")).cast(DataTypes.IntegerType))
.withColumn("map_code3", lit(((Map<Integer, Integer>)lookup_mapBcVar.getValue()).get(col("floor_code3_int"))))
.withColumn("five", lit(((Map<Integer, Integer>)lookup_mapBcVar.getValue()).get(5)))
.withColumn("five_lit", lit(((Map<Integer, Integer>)lookup_mapBcVar.getValue()).get(lit(5).cast(DataTypes.IntegerType))));
The output of the current code, using:
resultDs.printSchema();
resultDs.show();
root
|-- code1: integer (nullable = true)
|-- code2: integer (nullable = true)
|-- code3: double (nullable = true)
|-- floor_code3: long (nullable = true)
|-- floor_code3_int: integer (nullable = true)
|-- map_code3: null (nullable = true)
|-- five: integer (nullable = false)
|-- five_lit: null (nullable = true)
+-----+-----+-----+-----------+---------------+---------+----+--------+
|code1|code2|code3|floor_code3|floor_code3_int|map_code3|five|five_lit|
+-----+-----+-----+-----------+---------------+---------+----+--------+
| 1| 7| 5.0| 5| 5| null| 51| null|
| 2| 7| 4.0| 4| 4| null| 51| null|
| 3| 7| 3.0| 3| 3| null| 51| null|
| 4| 7| 2.0| 2| 2| null| 51| null|
| 5| 7| 1.0| 1| 1| null| 51| null|
+-----+-----+-----+-----------+---------------+---------+----+--------+
To recreate the input data:
List<String[]> stringAsList = new ArrayList<>();
stringAsList.add(new String[] { "1","7","5" });
stringAsList.add(new String[] { "2","7","4" });
stringAsList.add(new String[] { "3","7","3" });
stringAsList.add(new String[] { "4","7","2" });
stringAsList.add(new String[] { "5","7","1" });
JavaSparkContext sparkContext = new JavaSparkContext(sparkSession.sparkContext());
JavaRDD<Row> rowRDD = sparkContext.parallelize(stringAsList).map((String[] row) -> RowFactory.create(row));
StructType schema = DataTypes
.createStructType(new StructField[] {
DataTypes.createStructField("code1", DataTypes.StringType, false),
DataTypes.createStructField("code2", DataTypes.StringType, false),
DataTypes.createStructField("code3", DataTypes.StringType, false)
});
Dataset<Row> dataDf= sparkSession.sqlContext().createDataFrame(rowRDD, schema).toDF();
Dataset<Row> dataDs = dataDf
.withColumn("code1", col("code1").cast(DataTypes.IntegerType))
.withColumn("code2", col("code2").cast(DataTypes.IntegerType))
.withColumn("code3", col("code3").cast(DataTypes.IntegerType));
What am I doing wrong here?
Scala Notebook for the same here
Answer 1
Score: 1
`lit()` returns a Column, but `Map.get` needs the map's key type (an `Integer` here). Because Java's `Map.get(Object)` accepts any object, passing a `Column` compiles, yet no key ever equals a `Column`, so `get` returns null and `lit(null)` becomes a null column. These `get` calls also run once on the driver while the plan is built, not per row, which is why `five` is 51 in every row.
You can do it this way instead:
import java.util
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.DataFrame
import spark.implicits._  // assumes a SparkSession named `spark`, as in spark-shell

val df: DataFrame = spark.sparkContext.parallelize(Range(0, 10000), 4).toDF("sentiment")

val map = new util.HashMap[Int, Int]()
map.put(1, 1)
map.put(2, 2)
map.put(3, 3)
val bf: Broadcast[util.HashMap[Int, Int]] = spark.sparkContext.broadcast(map)

// Do the lookup per row on the executors, where the key is a plain Int.
df.rdd.map(x => {
  val num = x.getInt(0)
  (num, bf.value.get(num))
}).toDF("key", "add_key").show(false)
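
The same per-row fix can be written in the question's Java setup with a UDF. The following is a minimal sketch, assuming the `sparkSession`, `dataDs`, and `lookup_mapBcVar` objects defined in the question; the UDF name `lookup_code3` is just an illustrative label:

import static org.apache.spark.sql.functions.callUDF;
import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;

// Register a UDF so the lookup runs per row on the executors,
// where each key is a plain Integer rather than a Column.
sparkSession.udf().register("lookup_code3",
    (UDF1<Integer, Integer>) key -> lookup_mapBcVar.getValue().get(key),
    DataTypes.IntegerType);

// 5 -> 51, 4 -> 41, ... as in the expected output.
Dataset<Row> resultDs = dataDs.withColumn("code3",
    callUDF("lookup_code3", col("code3").cast(DataTypes.IntegerType)));

In both versions the key point is the same: the map lookup has to happen per row with a real integer key, instead of once on the driver with a `Column` argument.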
</details>