
Spark broadcast variable Map giving null value

Question


I am using Java 8 with Spark 2.4.1.

I am trying to use a broadcast variable holding a Map to look up values, as shown below:

Input data:

+-----+-----+-----+
|code1|code2|code3|
+-----+-----+-----+
|1    |7    |  5  |
|2    |7    |  4  |
|3    |7    |  3  |
|4    |7    |  2  |
|5    |7    |  1  |
+-----+-----+-----+

Expected Output:

+-----+-----+-----+
|code1|code2|code3|
+-----+-----+-----+
|1    |7    |51   |
|2    |7    |41   |
|3    |7    |31   |
|4    |7    |21   |
|5    |7    |11   |
+-----+-----+-----+

My current code, with the different approaches I have tried:

Map<Integer, Integer> lookup_map = new HashMap<>();
lookup_map.put(1, 11);
lookup_map.put(2, 21);
lookup_map.put(3, 31);
lookup_map.put(4, 41);
lookup_map.put(5, 51);

JavaSparkContext javaSparkContext = JavaSparkContext.fromSparkContext(sparkSession.sparkContext());
Broadcast<Map<Integer, Integer>> lookup_mapBcVar = javaSparkContext.broadcast(lookup_map);

Dataset<Row> resultDs = dataDs
    .withColumn("floor_code3", floor(col("code3")))
    .withColumn("floor_code3_int", floor(col("code3")).cast(DataTypes.IntegerType))
    .withColumn("map_code3", lit(((Map<Integer, Integer>) lookup_mapBcVar.getValue()).get(col("floor_code3_int"))))
    .withColumn("five", lit(((Map<Integer, Integer>) lookup_mapBcVar.getValue()).get(5)))
    .withColumn("five_lit", lit(((Map<Integer, Integer>) lookup_mapBcVar.getValue()).get(lit(5).cast(DataTypes.IntegerType))));

The output of the current code, using:

resultDs.printSchema();
resultDs.show();

root
 |-- code1: integer (nullable = true)
 |-- code2: integer (nullable = true)
 |-- code3: double (nullable = true)
 |-- floor_code3: long (nullable = true)
 |-- floor_code3_int: integer (nullable = true)
 |-- map_code3: null (nullable = true)
 |-- five: integer (nullable = false)
 |-- five_lit: null (nullable = true)

+-----+-----+-----+-----------+---------------+---------+----+--------+
|code1|code2|code3|floor_code3|floor_code3_int|map_code3|five|five_lit|
+-----+-----+-----+-----------+---------------+---------+----+--------+
|    1|    7|  5.0|          5|              5|     null|  51|    null|
|    2|    7|  4.0|          4|              4|     null|  51|    null|
|    3|    7|  3.0|          3|              3|     null|  51|    null|
|    4|    7|  2.0|          2|              2|     null|  51|    null|
|    5|    7|  1.0|          1|              1|     null|  51|    null|
+-----+-----+-----+-----------+---------------+---------+----+--------+

To recreate the input data:

List<String[]> stringAsList = new ArrayList<>();
stringAsList.add(new String[] { "1", "7", "5" });
stringAsList.add(new String[] { "2", "7", "4" });
stringAsList.add(new String[] { "3", "7", "3" });
stringAsList.add(new String[] { "4", "7", "2" });
stringAsList.add(new String[] { "5", "7", "1" });

JavaSparkContext sparkContext = new JavaSparkContext(sparkSession.sparkContext());
JavaRDD<Row> rowRDD = sparkContext.parallelize(stringAsList).map((String[] row) -> RowFactory.create(row));

StructType schema = DataTypes.createStructType(new StructField[] {
    DataTypes.createStructField("code1", DataTypes.StringType, false),
    DataTypes.createStructField("code2", DataTypes.StringType, false),
    DataTypes.createStructField("code3", DataTypes.StringType, false)
});

Dataset<Row> dataDf = sparkSession.sqlContext().createDataFrame(rowRDD, schema).toDF();

Dataset<Row> dataDs = dataDf
    .withColumn("code1", col("code1").cast(DataTypes.IntegerType))
    .withColumn("code2", col("code2").cast(DataTypes.IntegerType))
    .withColumn("code3", col("code3").cast(DataTypes.IntegerType));

What am I doing wrong here?

A Scala notebook reproducing the same issue is available here:

https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1165111237342523/3062033079132966/7035720262824085/latest.html

Answer 1

Score: 1

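`lit()` returns a `Column`, but `map.get` needs an actual integer key. Passing a `Column` still compiles, because `Map.get` accepts any `Object`. However, no key in the map equals a `Column`, so `get` returns `null` and `lit(null)` produces a null column.
You can do the lookup per row instead, like this: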

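import java.util
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.DataFrame
import spark.implicits._ // needed for toDF; assumes a SparkSession named spark

val df: DataFrame = spark.sparkContext.parallelize(Range(0, 10000), 4).toDF("sentiment")
val map = new util.HashMap[Int, Int]()
map.put(1, 1)
map.put(2, 2)
map.put(3, 3)
val bf: Broadcast[util.HashMap[Int, Int]] = spark.sparkContext.broadcast(map)
// The lookup runs inside rdd.map, i.e. on the executors, once per row,
// using the row's actual Int value as the key. Missing keys come back as 0 here,
// because the null returned by get is unboxed to a Scala Int.
df.rdd.map(x => {
  val num = x.getInt(0)
  (num, bf.value.get(num))
}).toDF("key", "add_key").show(false)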
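
The two `get` calls from the question can be reproduced in plain Java to show where `null` and `51` come from. This is a minimal sketch, not Spark code. `FakeColumn` is a hypothetical stand-in for `org.apache.spark.sql.Column`: an expression object rather than the value of any row. `ColumnKeyDemo` is an illustrative name. In the question, each `get` is evaluated once on the driver while the query is built, and `lit()` only wraps whatever value comes back.

```java
import java.util.HashMap;
import java.util.Map;

// Spark-free sketch of the two driver-side get(...) calls in the question.
final class ColumnKeyDemo {

    // Hypothetical stand-in for org.apache.spark.sql.Column: an expression object,
    // not the Integer value of any particular row.
    static final class FakeColumn {
        final String name;

        FakeColumn(String name) {
            this.name = name;
        }
    }

    // Same entries as lookup_map in the question.
    static Map<Integer, Integer> lookupMap() {
        Map<Integer, Integer> m = new HashMap<>();
        m.put(1, 11);
        m.put(2, 21);
        m.put(3, 31);
        m.put(4, 41);
        m.put(5, 51);
        return m;
    }

    public static void main(String[] args) {
        Map<Integer, Integer> lookup = lookupMap();

        // Compiles because Map.get(Object) accepts any object, but no Integer key
        // equals a column object, so the result is null (lit(null) -> null-typed column).
        Integer fromColumn = lookup.get(new FakeColumn("floor_code3_int"));

        // A real key: evaluated once, so lit(51) is the same constant on every row.
        Integer fromInt = lookup.get(5);

        System.out.println("get(column) = " + fromColumn); // get(column) = null
        System.out.println("get(5) = " + fromInt);         // get(5) = 51
    }
}
```

The two printed values match the question's output: the `map_code3` column is `null`, and the `five` column is a constant 51.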
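
The per-row lookup that the expected output describes can be checked in plain Java 8, also without Spark. Each `code3` is replaced by `lookup_map.get(code3)`; this sketch assumes those are the intended semantics. `PerRowLookupSketch`, `lookupCode`, and `mapColumn` are hypothetical names. The key point is that `get` receives each row's actual `Integer` value, which is what happens inside the answer's `rdd.map`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Spark-free sketch (hypothetical names): replace each code3 value with lookup_map.get(code3).
final class PerRowLookupSketch {

    // Same entries as lookup_map in the question.
    static Map<Integer, Integer> lookupMap() {
        Map<Integer, Integer> m = new HashMap<>();
        m.put(1, 11);
        m.put(2, 21);
        m.put(3, 31);
        m.put(4, 41);
        m.put(5, 51);
        return m;
    }

    // Per-row step: the key is the row's actual Integer value, not a Column.
    // Returns null for a null code or a code with no entry, like a null cell.
    static Integer lookupCode(Map<Integer, Integer> map, Integer code3) {
        return code3 == null ? null : map.get(code3);
    }

    // Applies lookupCode to every value of a "column", preserving row order.
    static List<Integer> mapColumn(Map<Integer, Integer> map, List<Integer> code3Values) {
        List<Integer> result = new ArrayList<>(code3Values.size());
        for (Integer code3 : code3Values) {
            result.add(lookupCode(map, code3));
        }
        return result;
    }

    public static void main(String[] args) {
        // code3 column of the question's input data, top to bottom.
        List<Integer> code3 = Arrays.asList(5, 4, 3, 2, 1);
        System.out.println(mapColumn(lookupMap(), code3)); // [51, 41, 31, 21, 11]
    }
}
```

In the Java API, the same body could probably be wrapped in a UDF so that it runs on the executors against the broadcast value. One possible form is `functions.udf((UDF1<Integer, Integer>) v -> lookup_mapBcVar.value().get(v), DataTypes.IntegerType)`, applied to `col("code3")`. That Spark wiring is an untested assumption.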



huangapple
  • Posted on 2020-09-22 13:44:16
  • When reposting, please keep the link to this article: https://go.coder-hub.com/64003697.html