How to exit an Akka stream after n elements are received?


Question

I'm brand new to Akka and I'm just trying to get the hang of it.

As an experiment, I want to read from a Kinesis stream and collect n messages and stop.

The only sink I found that stops reading records is Sink.head(), but that returns just one record, and I'd like to get more than that.

I can't quite figure out how to stop reading from the stream after receiving n messages, though.

Here's the code I have tried so far:

    @Test
    public void testReadingFromKinesisNRecords() throws ExecutionException, InterruptedException {
        final ActorSystem system = ActorSystem.create("foo");
        final Materializer materializer = ActorMaterializer.create(system);

        ProfileCredentialsProvider profileCredentialsProvider = ProfileCredentialsProvider.create();

        final KinesisAsyncClient kinesisClient = KinesisAsyncClient.builder()
                .credentialsProvider(profileCredentialsProvider)
                .region(Region.US_WEST_2)
                .httpClient(AkkaHttpClient.builder()
                        .withActorSystem(system).build())
                .build();

        system.registerOnTermination(kinesisClient::close);

        String streamName = "akka-test-stream";
        String shardId = "shardId-000000000000";
        int numberOfRecordsToRead = 3;

        final ShardSettings settings = ShardSettings.create(streamName, shardId)
                .withRefreshInterval(Duration.ofSeconds(1))
                .withLimit(numberOfRecordsToRead) // return a maximum of n records (and quit?!)
                .withShardIterator(ShardIterators.latest());

        final Source<Record, NotUsed> sourceKinesisBasic = KinesisSource.basic(settings, kinesisClient);

        Flow<Record, String, NotUsed> flowMapRecordToString = Flow.of(Record.class).map(record -> extractDataFromRecord(record));
        Flow<String, String, NotUsed> flowPrinter = Flow.of(String.class).map(s -> debugPrint(s));
        // Flow<String, List<String>, NotUsed> flowGroupedWithinMinute =
        //     Flow.of(String.class).groupedWithin(
        //         numberOfRecordsToRead, // group size
        //         Duration.ofSeconds(60) // group time
        //     );

        Source<String, NotUsed> sourceStringsFromKinesisRecords = sourceKinesisBasic
                .via(flowMapRecordToString)
                .via(flowPrinter);
                // .via(flowGroupedWithinMinute); // nope

        // sink to list of strings
        // Sink<String, CompletionStage<List<String>>> sinkToList = Sink.seq();
        Sink<String, CompletionStage<List<String>>> sink10 = Sink.takeLast(10);
        // Sink<String, CompletionStage<String>> sinkHead = Sink.head(); // only gives you one message

        CompletionStage<List<String>> streamCompletion = sourceStringsFromKinesisRecords
                .runWith(sink10, materializer);
        CompletableFuture<List<String>> completableFuture = streamCompletion.toCompletableFuture();
        completableFuture.join(); // never stops running...
        List<String> result = completableFuture.get();
        int foo = 1;
    }

    private String extractDataFromRecord(Record record) {
        String encType = record.encryptionTypeAsString();
        Instant arrivalTimestamp = record.approximateArrivalTimestamp();
        String data = record.data().asString(StandardCharsets.UTF_8);
        return data;
    }

    private String debugPrint(String s) {
        System.out.println(s);
        return s;
    }

Thank you for any clues

Answer 1

Score: 1

I found that the answer is to use take(n) at the flow level:

    ...
    Flow<String, String, NotUsed> flowTakeN = Flow.of(String.class).take(numberOfRecordsToRead);

    Source<String, NotUsed> sourceStringsFromKinesisRecords = sourceKinesisBasic
            .via(flowMapRecordToString)
            .via(flowPrinter)
            .via(flowTakeN);
    ...
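
As far as I understand it, this works because Sink.takeLast(10) only produces its list once the upstream completes, and the Kinesis shard source keeps polling and never completes by itself. withLimit appears to cap how many records each fetch returns rather than end the stream. take(n) completes the stream after n elements and cancels the upstream. The sketch below is a JDK-only model of that behaviour built on java.util.concurrent.Flow. It is not Akka's implementation: TakeNSketch, TakeN, naturals and firstN are hypothetical names made up for illustration, and requesting all n elements up front is a simplification.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;

// Hypothetical JDK-only model of "take n, then stop"; this is not Akka code.
class TakeNSketch {

    // Collects the first n items, then cancels upstream and completes its future.
    static final class TakeN<T> implements Flow.Subscriber<T> {
        private final int n;
        private final List<T> buffer = new ArrayList<>();
        private final CompletableFuture<List<T>> result = new CompletableFuture<>();
        private Flow.Subscription subscription;

        TakeN(int n) { this.n = n; }

        CompletableFuture<List<T>> result() { return result; }

        @Override public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            if (n <= 0) {            // nothing to take: stop immediately
                s.cancel();
                result.complete(List.of());
            } else {
                s.request(n);        // simplification: ask for all n at once
            }
        }

        @Override public void onNext(T item) {
            buffer.add(item);
            if (buffer.size() == n) {
                subscription.cancel();                 // tell the source to stop
                result.complete(List.copyOf(buffer));  // downstream is now done
            }
        }

        @Override public void onError(Throwable t) { result.completeExceptionally(t); }

        @Override public void onComplete() { result.complete(List.copyOf(buffer)); }
    }

    // An unbounded source of 0, 1, 2, ... that never completes on its own.
    static Flow.Publisher<Integer> naturals() {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int next = 0;
            private boolean cancelled = false;

            @Override public void request(long k) {
                for (long i = 0; i < k && !cancelled; i++) {
                    subscriber.onNext(next++);
                }
            }

            @Override public void cancel() { cancelled = true; }
        });
    }

    static List<Integer> firstN(int n) throws Exception {
        TakeN<Integer> takeN = new TakeN<>(n);
        naturals().subscribe(takeN);
        // Bounded wait, so a stream that never completes fails instead of hanging.
        return takeN.result().get(1, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(firstN(3));
    }
}
```

The bounded wait carries over to the real test: calling get(timeout, unit) on the CompletableFuture instead of join() makes a stream that never completes fail with a TimeoutException rather than hang.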

Answer 2

Score: 1

Just to add on to the answer you found, it is also possible to express things more directly without via:

    Source<String, NotUsed> sourceStringsFromKinesisRecords = sourceKinesisBasic
            .map(record -> extractDataFromRecord(record))
            .map(s -> debugPrint(s))
            .take(10);

huangapple
  • Published on September 26, 2020 at 09:01:13
  • When reposting, please keep the link to this article: https://go.coder-hub.com/64072936.html