How to exit an Akka stream after n elements are received?

Question

I'm brand new to Akka and I'm just trying to get the hang of it.

As an experiment, I want to read from a Kinesis stream and collect n messages and stop.

The only sink I found that stops reading records is Sink.head(), but it returns only one record, and I'd like to get more than that.

I can't quite figure out how to stop reading from the stream after receiving n messages, though.

Here's the code I have tried so far:

@Test
public void testReadingFromKinesisNRecords() throws ExecutionException, InterruptedException {
    final ActorSystem system = ActorSystem.create("foo");
    final Materializer materializer = ActorMaterializer.create(system);

    ProfileCredentialsProvider profileCredentialsProvider = ProfileCredentialsProvider.create();

    final KinesisAsyncClient kinesisClient = KinesisAsyncClient.builder()
        .credentialsProvider(profileCredentialsProvider)
        .region(Region.US_WEST_2)
        .httpClient(AkkaHttpClient.builder()
            .withActorSystem(system).build())
        .build();

    system.registerOnTermination(kinesisClient::close);

    String streamName = "akka-test-stream";
    String shardId = "shardId-000000000000";

    int numberOfRecordsToRead = 3;

    final ShardSettings settings = ShardSettings.create(streamName, shardId)
        .withRefreshInterval(Duration.ofSeconds(1))
        .withLimit(numberOfRecordsToRead) // return a maximum of n records (and quit?!)
        .withShardIterator(ShardIterators.latest());

    final Source<Record, NotUsed> sourceKinesisBasic = KinesisSource.basic(settings, kinesisClient);

    Flow<Record, String, NotUsed> flowMapRecordToString = Flow.of(Record.class).map(record -> extractDataFromRecord(record));
    Flow<String, String, NotUsed> flowPrinter = Flow.of(String.class).map(s -> debugPrint(s));

    //    Flow<String, List<String>, NotUsed> flowGroupedWithinMinute =
    //        Flow.of(String.class).groupedWithin(
    //            numberOfRecordsToRead, // group size
    //            Duration.ofSeconds(60) // group time
    //        );

    Source<String, NotUsed> sourceStringsFromKinesisRecords = sourceKinesisBasic
        .via(flowMapRecordToString)
        .via(flowPrinter);
    //        .via(flowGroupedWithinMinute); // nope

    // sink to list of strings
    //    Sink<String, CompletionStage<List<String>>> sinkToList = Sink.seq();
    Sink<String, CompletionStage<List<String>>> sink10 = Sink.takeLast(10);
    //    Sink<String, CompletionStage<String>> sinkHead = Sink.head(); // only gives you one message

    CompletionStage<List<String>> streamCompletion = sourceStringsFromKinesisRecords
        .runWith(sink10, materializer);
    CompletableFuture<List<String>> completableFuture = streamCompletion.toCompletableFuture();
    completableFuture.join(); // never stops running...
    List<String> result = completableFuture.get();
    int foo = 1;
}

private String extractDataFromRecord(Record record) {
    String encType = record.encryptionTypeAsString();
    Instant arrivalTimestamp = record.approximateArrivalTimestamp();
    String data = record.data().asString(StandardCharsets.UTF_8);
    return data;
}

private String debugPrint(String s) {
    System.out.println(s);
    return s;
}

Thank you for any clues

Answer 1

Score: 1

I found that the answer is to use take(n) at the flow level:

...
Flow<String, String, NotUsed> flowTakeN = Flow.of(String.class).take(numberOfRecordsToRead);

Source<String, NotUsed> sourceStringsFromKinesisRecords = sourceKinesisBasic
    .via(flowMapRecordToString)
    .via(flowPrinter)
    .via(flowTakeN);
...
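
A note on why this helps: take(n) completes the downstream after n elements and cancels the upstream, so the source stops being polled. Sink.takeLast(10), by contrast, can only produce its result once the upstream completes, and a continuously polling Kinesis source does not complete on its own. As far as I can tell, withLimit only caps how many records a single GetRecords request returns, so it would not end the stream either; treat that as an assumption to check against the Alpakka documentation. The cancel-after-n idea can be sketched without Akka, using only the JDK's java.util.concurrent.Flow API (Java 11+ assumed). TakeNSketch, TakeN, and firstN are hypothetical names for illustration, and this is not how Akka implements take internally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical illustration class; not part of Akka or the original code.
final class TakeNSketch {

    /** Subscriber that keeps the first n elements, then cancels its upstream. */
    static final class TakeN<T> implements Flow.Subscriber<T> {
        private final int n;
        private final List<T> buffer = new ArrayList<>();
        private final CompletableFuture<List<T>> result = new CompletableFuture<>();
        private Flow.Subscription subscription;

        TakeN(int n) { this.n = n; }

        CompletableFuture<List<T>> result() { return result; }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            if (n <= 0) {
                s.cancel();                 // nothing wanted: stop immediately
                result.complete(List.of());
            } else {
                s.request(1);               // ask for one element at a time
            }
        }

        @Override
        public void onNext(T item) {
            if (result.isDone()) return;    // ignore anything delivered after we finished
            buffer.add(item);
            if (buffer.size() == n) {
                subscription.cancel();      // tell the upstream we are done
                result.complete(List.copyOf(buffer));
            } else {
                subscription.request(1);
            }
        }

        @Override
        public void onError(Throwable t) { result.completeExceptionally(t); }

        @Override
        public void onComplete() { result.complete(List.copyOf(buffer)); }
    }

    /** Feeds an endless sequence 1, 2, 3, ... and returns the first n elements. */
    static List<Integer> firstN(int n) {
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            TakeN<Integer> takeN = new TakeN<>(n);
            publisher.subscribe(takeN);
            // Stands in for a source that never completes by itself.
            for (int i = 1; !takeN.result().isDone(); i++) {
                publisher.submit(i);
            }
            return takeN.result().join();
        }
    }

    public static void main(String[] args) {
        System.out.println(firstN(3));
    }
}
```

The future completes only because the subscriber cancels after the third element; the producer loop itself would otherwise run forever, which mirrors the hanging join() in the question.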

Answer 2

Score: 1

Just to add on to the answer you found, it is also possible to express things more directly without via:

Source<String, NotUsed> sourceStringsFromKinesisRecords = sourceKinesisBasic
    .map(record -> extractDataFromRecord(record))
    .map(s -> debugPrint(s))
    .take(10);
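
Separately from where take is applied, the test in the question waits on completableFuture.join() with no upper bound, so a stream that never completes hangs the whole test run. A minimal sketch of bounding that wait with the JDK's CompletableFuture.get(timeout, unit) follows; BoundedWaitSketch and completesWithin are hypothetical names, not part of Akka or the original code.

```java
import java.time.Duration;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper for illustration only.
final class BoundedWaitSketch {

    /**
     * Returns true if the future finished (normally, exceptionally, or by
     * cancellation) within the timeout, and false if it was still pending.
     */
    static boolean completesWithin(CompletableFuture<?> future, Duration timeout) {
        try {
            future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false;                        // still running after the timeout
        } catch (ExecutionException | CancellationException e) {
            return true;                         // finished, although not successfully
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // preserve the interrupt flag
            return false;
        }
    }

    public static void main(String[] args) {
        // A future that nobody completes stands in for a stream that never ends.
        CompletableFuture<String> neverDone = new CompletableFuture<>();
        System.out.println(completesWithin(neverDone, Duration.ofMillis(100)));
    }
}
```

In the test, the CompletionStage returned by runWith could be converted with toCompletableFuture() and checked this way with a generous timeout, so a missing take fails the test instead of blocking it indefinitely.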

huangapple
  • Published on September 26, 2020, 09:01:13
  • When reposting, please keep the link to this article: https://go.coder-hub.com/64072936.html