Read data saved by spark-redis using Java

Question

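I use [spark-redis](https://github.com/RedisLabs/spark-redis/blob/master/doc/java.md) to save a Dataset to Redis, and then I read the data back with [Spring Data Redis](https://www.baeldung.com/spring-data-redis-tutorial).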

This is the object I save to Redis:

```java
import java.io.Serializable;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.apache.spark.sql.Row;
import org.springframework.data.annotation.Id;
import org.springframework.data.redis.core.RedisHash;
import org.springframework.data.redis.core.index.Indexed;

@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@Builder
@RedisHash("collaborative_filtering")
public class RatingResult implements Serializable {
    private static final long serialVersionUID = 8755574422193819444L;

    @Id
    private String id;
    @Indexed
    private int user;
    @Indexed
    private String product;
    private double productN;
    private double rating;
    private float prediction;

    // Builds a RatingResult from a Spark Row; the id is user + product.
    public static RatingResult convert(Row row) {
        int user = row.getAs("user");
        String product = row.getAs("product");
        double productN = row.getAs("productN");
        double rating = row.getAs("rating");
        float prediction = row.getAs("prediction");
        String id = user + product;
        return RatingResult.builder().id(id).user(user).product(product).productN(productN).rating(rating)
                .prediction(prediction).build();
    }
}
```

Saving the objects with spark-redis:

```java
JavaRDD<RatingResult> result = ...
...
sparkSession.createDataFrame(result, RatingResult.class)
        .write()
        .format("org.apache.spark.sql.redis")
        .option("table", "collaborative_filtering")
        .mode(SaveMode.Overwrite)
        .save();
```

Repository:

```java
@Repository
public interface RatingResultRepository extends JpaRepository<RatingResult, String> {
}
```

I can't read this data with Spring Data Redis, because spark-redis and Spring Data Redis store it with different structures. I compared the keys and values created by each library using `redis-cli -p 6379 keys \*` and `redis-cli hgetall $key`, and they are different.

So how can I read this data from Java, either with Spring Data Redis or with any other Java library?

Answer 1

Score: 1

The following works for me.

Writing the data with spark-redis:

I use Scala here, but it is essentially the same as your Java code. The only change is that I added `.option("key.column", "id")` to specify the column used as the hash id.

```scala
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.sql.SaveMode

val ratingResult = new RatingResult("1", 1, "product1", 2.0, 3.0, 4)
val result: JavaRDD[RatingResult] = spark.sparkContext.parallelize(Seq(ratingResult)).toJavaRDD()
spark
  .createDataFrame(result, classOf[RatingResult])
  .write
  .format("org.apache.spark.sql.redis")
  .option("key.column", "id") // use the "id" column as the Redis key suffix
  .option("table", "collaborative_filtering")
  .mode(SaveMode.Overwrite)
  .save()
```
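With `key.column` set to `id` as above, each row should end up in a hash whose key is `<table>:<id value>`, for example `collaborative_filtering:1`. That is the same `keyspace:id` pattern a `@RedisHash("collaborative_filtering")` entity uses, so `findById("1")` looks in the right place. Without `key.column`, spark-redis generates its own identifier for each row (per its DataFrame documentation, as I understand it), so the keys do not match the ids Spring Data Redis looks up. The sketch below only illustrates the assumed key layout; `keyFor` is a hypothetical helper, not part of either library.

```java
import java.util.List;

public class KeyLayoutSketch {

    // Hypothetical helper: "<table>:<id>" is the key layout assumed for both
    // spark-redis (with key.column set) and a @RedisHash("<table>") entity.
    static String keyFor(String table, String id) {
        return table + ":" + id;
    }

    public static void main(String[] args) {
        // Ids as they would appear in the column that key.column points at.
        for (String id : List.of("1", "2")) {
            // Prints collaborative_filtering:1, then collaborative_filtering:2
            System.out.println(keyFor("collaborative_filtering", id));
        }
    }
}
```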

On the Spring Data Redis side, I have the following entity:

```java
import java.io.Serializable;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.springframework.data.annotation.Id;
import org.springframework.data.redis.core.RedisHash;
import org.springframework.data.redis.core.index.Indexed;

@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@Builder
@RedisHash("collaborative_filtering")
public class RatingResult implements Serializable {
    private static final long serialVersionUID = 8755574422193819444L;

    @Id
    private String id;
    @Indexed
    private int user;
    @Indexed
    private String product;
    private double productN;
    private double rating;
    private float prediction;

    @Override
    public String toString() {
        return "RatingResult{" +
                "id='" + id + '\'' +
                ", user=" + user +
                ", product='" + product + '\'' +
                ", productN=" + productN +
                ", rating=" + rating +
                ", prediction=" + prediction +
                '}';
    }
}
```

I use `CrudRepository` instead of `JpaRepository`:

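```java
import org.springframework.data.repository.CrudRepository;
import org.springframework.stereotype.Repository;

@Repository
public interface RatingResultRepository extends CrudRepository<RatingResult, String> {
}
```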

Querying:

```java
RatingResult found = ratingResultRepository.findById("1").get();
System.out.println("found = " + found);
```

The output:

```text
found = RatingResult{id='null', user=1, product='product1', productN=2.0, rating=3.0, prediction=4.0}
```

Note that the `id` field is not populated, because spark-redis stores the id as part of the hash key rather than as a field inside the hash.
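If you also need the `id`, one option is to read the raw hash with a plain Redis client (for example Jedis's `hgetAll(key)`, which returns a `Map<String, String>`) and map it yourself, taking the id from the key suffix. The sketch below covers only the mapping step. It assumes the key layout `collaborative_filtering:<id>` and that spark-redis stores each value in its string form (such as `"2.0"`); `RatingView` and `fromHash` are hypothetical names, and the class is a plain POJO so it runs without Lombok or Spring.

```java
import java.util.HashMap;
import java.util.Map;

public class RatingHashMapper {

    // Hypothetical read-side view of one row; mirrors the fields of RatingResult.
    static final class RatingView {
        final String id;
        final int user;
        final String product;
        final double productN;
        final double rating;
        final float prediction;

        RatingView(String id, int user, String product,
                   double productN, double rating, float prediction) {
            this.id = id;
            this.user = user;
            this.product = product;
            this.productN = productN;
            this.rating = rating;
            this.prediction = prediction;
        }

        @Override
        public String toString() {
            return "RatingView{id='" + id + "', user=" + user + ", product='" + product
                    + "', productN=" + productN + ", rating=" + rating
                    + ", prediction=" + prediction + '}';
        }
    }

    // Assumes keys look like "<table>:<id>" with no ':' in the table name,
    // so the id is everything after the first ':'.
    static RatingView fromHash(String key, Map<String, String> hash) {
        int sep = key.indexOf(':');
        if (sep < 0) {
            throw new IllegalArgumentException("Unexpected key format: " + key);
        }
        String id = key.substring(sep + 1);
        // Hash values arrive as strings (as from HGETALL) and are parsed here.
        return new RatingView(
                id,
                Integer.parseInt(hash.get("user")),
                hash.get("product"),
                Double.parseDouble(hash.get("productN")),
                Double.parseDouble(hash.get("rating")),
                Float.parseFloat(hash.get("prediction")));
    }

    public static void main(String[] args) {
        // Sample data standing in for the result of HGETALL collaborative_filtering:1
        Map<String, String> hash = new HashMap<>();
        hash.put("user", "1");
        hash.put("product", "product1");
        hash.put("productN", "2.0");
        hash.put("rating", "3.0");
        hash.put("prediction", "4.0");
        // Prints RatingView{id='1', user=1, product='product1', productN=2.0, rating=3.0, prediction=4.0}
        System.out.println(fromHash("collaborative_filtering:1", hash));
    }
}
```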

huangapple
  • Posted on September 1, 2020 18:39:53
  • If you repost this article, please keep the link: https://go.coder-hub.com/63686047.html