将RDD根据某个值拆分成不同的RDD,并将每个部分传递给一个函数。

huangapple go评论56阅读模式
英文:

How to split an RDD into different RDD's based on a value and give every part to a function

问题

我有一个RDD,其中每个元素都是一个案例类,就像这样:
case class Element(target: Boolean, data: String)
现在我需要根据String数据的内容(它是一个离散变量)拆分RDD。
然后在每个拆分上执行一个函数def f(elements: RDD[Element]): Double

我尝试创建一个类似这样的pairRDD:val test = elementsRDD.map(E => (E.data, E)),这样我就有了(key, value)对,但我不知道接下来该怎么做(因为groupBy返回了Iteravle(V),而不是所有值的RDD)。

我还可以过滤data: String的每个可能值,并在结果上执行函数f。但我事先不知道data: String可能取的所有值。而且似乎先遍历所有数据以检查不同的可能性,然后多次对其进行过滤也不高效。

那么有没有一种高效的方法来实现这个目标?

英文:

I have an RDD in which every element is a case class, like this:
case class Element(target: Boolean, data: String)
Now I need to split the RDD based on what the String data is (it is a discrete variable).
And then execute a function def f(elements: RDD[Element]): Double on every split.

I have tried to make a pairRDD like this: val test = elementsRDD.map(E => (E.data, E)) so I have (key, value) pairs but I don't know what to do after this (how to split them because groupBy gives back Iteravle(V) and not an RDD of all the values).

I could also filter on each possible value of data: String and execute function f on the results. But I don't know all the possible values that ´´´data: String´´´ can take in advance. And it doesn't seem efficient to first go over all the data to check the different possibilities and then also filter over it multiple times.

So is there a way it can be done efficiently?

答案1

得分: 1

以下是翻译好的部分:

你真正需要做的是根据 data 进行汇总计数,具体取决于布尔值可以采用的2个值。其余部分是仅依赖于这2个值的简单计算。

val rdd = sc.parallelize(
  Seq(Element(true, "a"), Element(false, "a"), Element(true, "a"),
    Element(false, "b"), Element(false, "b"), Element(true, "b")))

val log2 = math.log(2)

// 计算一个RDD[(String, (Int, Int))],元组的第一个元素是“true”的数量,第二个元素是“false”的数量
val entropy = rdd.map(e => (e.data, e.target)).aggregateByKey((0, 0))({
  case ((t, f), target) => if (target) (t + 1, f) else (t, f + 1)
}, {
  case ((t1, f1), (t2, f2)) => (t1 + t2, f1 + f2)
}).mapValues {
  case (t, f) =>
    val total = (t + f).toDouble
    val trueRatio = t.toDouble / total
    val falseRatio = f.toDouble / total
    -trueRatio * math.log(trueRatio) / log2 + falseRatio * math.log(falseRatio) / log2
}

// entropy 是一个RDD[(String, Double)]
entropy foreach println
// (a,-0.1383458330929479)
// (b,0.1383458330929479)
英文:

All you really need to do is count by aggregating by data, depending on the 2 values that the boolean can take. The rest is a simple computation that only depends on these 2 values.

<!-- language: scala -->

val rdd = sc.parallelize(
  Seq(Element(true,&quot;a&quot;),Element(false,&quot;a&quot;),Element(true,&quot;a&quot;),
    Element(false,&quot;b&quot;),Element(false,&quot;b&quot;),Element(true,&quot;b&quot;)))

val log2 = math.log(2)

// calculate an RDD[(String, (Int, Int))], first element of the tuple is the number of &quot;true&quot;s, and the second the number of &quot;false&quot;s
val entropy = rdd.map(e =&gt; (e.data, e.target)).aggregateByKey((0, 0))({
  case ((t, f), target) =&gt; if (target) (t + 1, f) else (t, f + 1)
}, {
  case ((t1, f1), (t2, f2)) =&gt; (t1 + t2, f1 + f2)
}).mapValues {
  case (t, f) =&gt;
    val total = (t + f).toDouble
    val trueRatio = t.toDouble / total
    val falseRatio = f.toDouble / total
    -trueRatio * math.log(trueRatio) / log2 + falseRatio * math.log(falseRatio) / log2
}

// entropy is an RDD[(String, Double)]
entropy foreach println
// (a,-0.1383458330929479)
// (b,0.1383458330929479)

<!-- end snippet -->

答案2

得分: 0

以下是代码部分的中文翻译:

使用DataFrame进行答案

import spark.implicits._

val rdd = sc.parallelize(
      Seq(Element(true,"a"),Element(false,"a"),Element(true,"a"),
        Element(false,"b"),Element(false,"b"),Element(true,"b")))

val log2 = math.log(2)

var df = rdd.toDF()

val groupedData = df.groupBy($"data")
  .agg(count(when($"target" === true, 1)).alias("true"),count(when($"target" === false, 1)).alias("false"))
  .withColumn("total", $"true" + $"false")
  .withColumn("true ratio", $"true" / $"total")
  .withColumn("false ratio", $"false" / $"total")
  .withColumn("entropy", -$"true ratio" * log($"true ratio") / log2 + $"false ratio" * log($"false ratio") / log2)
  .show()

输出

+----+----+-----+-----+------------------+------------------+-------------------+
|data|true|false|total|        true ratio|       false ratio|            entropy|
+----+----+-----+-----+------------------+------------------+-------------------+
|   b|   1|    2|    3|0.3333333333333333|0.6666666666666666| 0.1383458330929479|
|   a|   2|    1|    3|0.6666666666666666|0.3333333333333333|-0.1383458330929479|
+----+----+-----+-----+------------------+------------------+-------------------+

希望这对您有所帮助。如果您需要进一步的翻译或解释,请随时告诉我。

英文:

An answer using a DataFrame:

import spark.implicits._
val rdd = sc.parallelize(
Seq(Element(true,&quot;a&quot;),Element(false,&quot;a&quot;),Element(true,&quot;a&quot;),
Element(false,&quot;b&quot;),Element(false,&quot;b&quot;),Element(true,&quot;b&quot;)))
val log2 = math.log(2)
var df = rdd.toDF()
val groupedData = df.groupBy($&quot;data&quot;)
.agg(count(when($&quot;target&quot; === true, 1)).alias(&quot;true&quot;),count(when($&quot;target&quot; === false, 1)).alias(&quot;false&quot;))
.withColumn(&quot;total&quot;, $&quot;true&quot; + $&quot;false&quot;)
.withColumn(&quot;true ratio&quot;, $&quot;true&quot; / $&quot;total&quot;)
.withColumn(&quot;false ratio&quot;, $&quot;false&quot; / $&quot;total&quot;)
.withColumn(&quot;entropy&quot;, -$&quot;true ratio&quot; * log($&quot;true ratio&quot;) / log2 + $&quot;false ratio&quot; * log($&quot;false ratio&quot;) / log2)
.show()

Output:

+----+----+-----+-----+------------------+------------------+-------------------+
|data|true|false|total|        true ratio|       false ratio|            entropy|
+----+----+-----+-----+------------------+------------------+-------------------+
|   b|   1|    2|    3|0.3333333333333333|0.6666666666666666| 0.1383458330929479|
|   a|   2|    1|    3|0.6666666666666666|0.3333333333333333|-0.1383458330929479|
+----+----+-----+-----+------------------+------------------+-------------------+

huangapple
  • 本文由 发表于 2020年1月6日 21:33:21
  • 转载请务必保留本文链接:https://go.coder-hub.com/59613055.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定