Taking the cartesian product over multiple pairs efficiently using Apache Spark
Question
## Old context
I am doing a calculation on my dataset that requires every element to be combined with itself, i.e. by performing `mapToPair` on a `JavaPairRDD<Tuple2<String, List<Double>>, Tuple2<String, List<Double>>>` type. This combination is performed using `cartesian` as per:
```java
JavaPairRDD<String, List<Double>> keyvals;
...
JavaPairRDD<Tuple2<String, List<Double>>, Tuple2<String, List<Double>>> combined = keyvals.cartesian(keyvals).filter(tpl -> !tpl._1._1.equals(tpl._2._1));
combined.mapToPair(tpl -> {
    Tuple2<String, String> ids = new Tuple2<>(tpl._1._1, tpl._2._1);
    double result = calculateResult(tpl._1._2, tpl._2._2);
    return new Tuple2<>(ids, result);
}).filter(tpl -> tpl._2 > threshold).saveAsTextFile("result");
```
## New context
I have now expanded the method `calculateResult` to accept three `List<Double>` arguments (instead of two, as in the example above). This requires the dataset to be combined with itself twice. Here, though, `cartesian` seems to fall short.

My question is thus: how can I combine my data (`keyvals`) with itself twice, essentially yielding something that matches `JavaPairRDD<Tuple2<...>, Tuple2<...>, Tuple2<...>>` (pseudocode)?

My goal is to call the method `calculateResult(List<Double> s1, List<Double> s2, List<Double> s3)` on each cross-combined triple. I think I may not be taking the right approach by trying to expand the example that I have given above using `cartesian`, but I don't know what the right steps forward would be.
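For completeness: chaining `cartesian` a second time does compile, but it nests the pairs rather than producing a flat triple. A sketch of the resulting types, given `keyvals` as declared above:

```java
// Chaining cartesian nests the element types instead of flattening them
JavaPairRDD<Tuple2<String, List<Double>>, Tuple2<String, List<Double>>> pairs =
        keyvals.cartesian(keyvals);
JavaPairRDD<Tuple2<Tuple2<String, List<Double>>, Tuple2<String, List<Double>>>,
        Tuple2<String, List<Double>>> nested = pairs.cartesian(keyvals);
```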
Unfortunately, I am constrained to using only Spark Java 2.4.x.
Answer 1

Score: 1
I hope it helps. I have added inline comments to the code to describe what I am trying to do. I have deliberately used `List` instead of `Tuple3` in case you need to perform more cartesian joins.
```java
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;
import scala.Tuple3;

JavaPairRDD<List<String>, List<List<Double>>> result =
        keyvals.cartesian(keyvals)
                .filter(tpl -> !tpl._1._1.equals(tpl._2._1))
                // Perform the third cartesian
                .cartesian(keyvals)
                // Skip the common ids from the 1st and 3rd keyvals
                .filter(tpl -> !tpl._1._1._1.equals(tpl._2._1))
                // Map the result to a pair of ids (List<String>) and values (List<List<Double>>)
                .mapToPair((PairFunction<Tuple2<Tuple2<Tuple2<String, List<Double>>, Tuple2<String, List<Double>>>, Tuple2<String, List<Double>>>, List<String>, List<List<Double>>>) tuple2Tuple2Tuple2 -> {
                    // Combine the ids into a single list
                    List<String> keys = new ArrayList<>();
                    keys.add(tuple2Tuple2Tuple2._1._1._1);
                    keys.add(tuple2Tuple2Tuple2._1._2._1);
                    keys.add(tuple2Tuple2Tuple2._2._1);
                    // Combine the values into a single list
                    List<List<Double>> values = new ArrayList<>();
                    values.add(tuple2Tuple2Tuple2._1._1._2);
                    values.add(tuple2Tuple2Tuple2._1._2._2);
                    values.add(tuple2Tuple2Tuple2._2._2);
                    // Return a tuple of the id list and the value list, both of fixed size 3
                    return new Tuple2<>(keys, values);
                });

result.mapToPair(tpl -> {
    Tuple3<String, String, String> ids = new Tuple3<>(tpl._1.get(0), tpl._1.get(1), tpl._1.get(2));
    // Named "score" rather than "result" to avoid shadowing the outer variable, which would not compile
    double score = calculateResult(tpl._2.get(0), tpl._2.get(1), tpl._2.get(2));
    return new Tuple2<>(ids, score);
}).filter(tpl -> tpl._2 > threshold).saveAsTextFile("result");
```
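One caveat: the second `filter` only compares the ids of the first and third elements, so the second and third elements can still share an id. If all three ids must be pairwise distinct, that filter would also need a check along the lines of `!tpl._1._2._1.equals(tpl._2._1)`.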
**Note: Migrating this Spark Java RDD code to Spark Java DataFrames would shorten your code and reduce complexity.**
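To make that note concrete, here is a minimal sketch of the same triple combination using `crossJoin` (available since Spark 2.1, so it fits the Spark 2.4.x constraint). The column names `id` and `vals`, and the `spark` session, `rows`, and `schema` variables are assumptions for illustration:

```java
import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Hypothetical conversion of keyvals into a DataFrame with columns "id" and "vals"
Dataset<Row> df = spark.createDataFrame(rows, schema);

// Alias the columns so the three copies can be told apart after joining
Dataset<Row> first = df.select(col("id").as("id1"), col("vals").as("vals1"));
Dataset<Row> second = df.select(col("id").as("id2"), col("vals").as("vals2"));
Dataset<Row> third = df.select(col("id").as("id3"), col("vals").as("vals3"));

// Cross-join twice and drop rows with colliding ids, mirroring the RDD filters
Dataset<Row> triples = first.crossJoin(second)
        .filter(col("id1").notEqual(col("id2")))
        .crossJoin(third)
        .filter(col("id1").notEqual(col("id3")));
```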