Taking the cartesian product over multiple pairs efficiently using Apache Spark


Question

## Old context
I am doing a calculation on my dataset that requires every element to be combined with itself, i.e. by performing `mapToPair` on a `JavaPairRDD<Tuple2<String, List<Double>>, Tuple2<String, List<Double>>>`. This combination is performed using `cartesian`, as per:
```java
JavaPairRDD<String, List<Double>> keyvals;
...
JavaPairRDD<Tuple2<String, List<Double>>, Tuple2<String, List<Double>>> combined =
        keyvals.cartesian(keyvals)
               .filter(tpl -> !tpl._1._1.equals(tpl._2._1));

combined.mapToPair(tpl -> {
    Tuple2<String, String> ids = new Tuple2<>(tpl._1._1, tpl._2._1);

    double result = calculateResult(tpl._1._2, tpl._2._2);

    return new Tuple2<>(ids, result);
}).filter(tpl -> tpl._2 > threshold).saveAsTextFile("result");
```

## New context

I have now expanded the method `calculateResult` to accept three `List<Double>` arguments (instead of two, as in the example above). This requires the dataset to be combined with itself twice, and here `cartesian` seems to fall short.

My question is thus: how can I combine my data (`keyvals`) with itself twice, essentially yielding something that matches `JavaPairRDD<Tuple2<...>, Tuple2<...>, Tuple2<...>>` (pseudocode)?

My goal is to call the method `calculateResult(List<Double> s1, List<Double> s2, List<Double> s3)` on each cross-combined triple. I suspect that trying to extend the `cartesian`-based example above is not the right approach, but I don't know what the right next steps would be.
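
For illustration, here is a minimal sketch (not code I am actually running) of what naively chaining `cartesian` gives me: a nested pair rather than a flat three-way tuple, which is exactly where I get stuck.

```java
// Chaining cartesian twice yields a nested pair, not a flat triple:
JavaPairRDD<
        Tuple2<Tuple2<String, List<Double>>, Tuple2<String, List<Double>>>,
        Tuple2<String, List<Double>>> nested =
        keyvals.cartesian(keyvals).cartesian(keyvals);
// The third element ends up in nested._2, while the first two are buried inside nested._1.
```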

Unfortunately, I am constrained to using Spark's Java API, version 2.4.x.



Answer 1

Score: 1

I hope this helps.

I have added inline comments to the code to describe what I am trying to do.
I have deliberately used `List` instead of `Tuple3`, in case you need to perform more cartesian joins.
```java
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;
import scala.Tuple3;

JavaPairRDD<List<String>, List<List<Double>>> result =
        keyvals.cartesian(keyvals)
                .filter(tpl -> !tpl._1._1.equals(tpl._2._1))
                // perform the third cartesian product
                .cartesian(keyvals)
                // skip combinations where the ids from the 1st and 3rd keyvals match
                .filter(tpl -> !tpl._1._1._1.equals(tpl._2._1))
                // map the result to a pair of ids (List<String>) and values (List<List<Double>>)
                .mapToPair((PairFunction<Tuple2<Tuple2<Tuple2<String, List<Double>>, Tuple2<String, List<Double>>>, Tuple2<String, List<Double>>>, List<String>, List<List<Double>>>) tuple2Tuple2Tuple2 -> {

                    // combine the ids into a single list
                    List<String> keys = new ArrayList<>();
                    keys.add(tuple2Tuple2Tuple2._1._1._1);
                    keys.add(tuple2Tuple2Tuple2._1._2._1);
                    keys.add(tuple2Tuple2Tuple2._2._1);

                    // combine the values into a single list
                    List<List<Double>> values = new ArrayList<>();
                    values.add(tuple2Tuple2Tuple2._1._1._2);
                    values.add(tuple2Tuple2Tuple2._1._2._2);
                    values.add(tuple2Tuple2Tuple2._2._2);

                    // return a tuple of the id list and the value list, each of size 3
                    return new Tuple2<>(keys, values);
                });

result.mapToPair(tpl -> {
    Tuple3<String, String, String> ids = new Tuple3<>(tpl._1.get(0), tpl._1.get(1), tpl._1.get(2));
    double score = calculateResult(tpl._2.get(0), tpl._2.get(1), tpl._2.get(2));
    return new Tuple2<>(ids, score);
}).filter(tpl -> tpl._2 > threshold).saveAsTextFile("result");
```

**Note: Migrating this Spark Java RDD code to Spark DataFrames would shorten the code and reduce its complexity.**
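
For reference, here is a rough, untested sketch of how the same triple combination could look with the DataFrame API. It assumes `keyvals` has been converted to a `Dataset<Row>` called `df` with columns `id` and `vals`; those names, and `df` itself, are illustrative and not part of the code above.

```java
import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Assumed: df is a Dataset<Row> with columns "id" (string) and "vals" (array<double>).
Dataset<Row> a = df.select(col("id").as("id1"), col("vals").as("v1"));
Dataset<Row> b = df.select(col("id").as("id2"), col("vals").as("v2"));
Dataset<Row> c = df.select(col("id").as("id3"), col("vals").as("v3"));

Dataset<Row> triples = a.crossJoin(b).crossJoin(c)
        // mirror the RDD filters: drop rows where the first id equals the second or the third
        .filter(col("id1").notEqual(col("id2"))
                .and(col("id1").notEqual(col("id3"))));

// calculateResult would then be applied per row, for example through a registered UDF
// over v1, v2 and v3, followed by a filter on the threshold.
```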


