How to exit an Akka stream after n elements are received?
Question
I'm brand new to Akka and I'm just trying to get the hang of it.
As an experiment, I want to read from a Kinesis stream, collect n messages, and then stop.
The only sink I found that stops reading records is Sink.head(), but that returns only one record, and I'd like to get more than that.
I can't quite figure out how to stop reading from the stream after receiving n messages, though.
Here's the code I have tried so far:
@Test
public void testReadingFromKinesisNRecords() throws ExecutionException, InterruptedException {
    final ActorSystem system = ActorSystem.create("foo");
    final Materializer materializer = ActorMaterializer.create(system);
    ProfileCredentialsProvider profileCredentialsProvider = ProfileCredentialsProvider.create();
    final KinesisAsyncClient kinesisClient = KinesisAsyncClient.builder()
            .credentialsProvider(profileCredentialsProvider)
            .region(Region.US_WEST_2)
            .httpClient(AkkaHttpClient.builder()
                    .withActorSystem(system).build())
            .build();
    system.registerOnTermination(kinesisClient::close);
    String streamName = "akka-test-stream";
    String shardId = "shardId-000000000000";
    int numberOfRecordsToRead = 3;
    final ShardSettings settings = ShardSettings.create(streamName, shardId)
            .withRefreshInterval(Duration.ofSeconds(1))
            .withLimit(numberOfRecordsToRead) // return a maximum of n records (and quit?!)
            .withShardIterator(ShardIterators.latest());
    final Source<Record, NotUsed> sourceKinesisBasic = KinesisSource.basic(settings, kinesisClient);
    Flow<Record, String, NotUsed> flowMapRecordToString = Flow.of(Record.class).map(record -> extractDataFromRecord(record));
    Flow<String, String, NotUsed> flowPrinter = Flow.of(String.class).map(s -> debugPrint(s));
    // Flow<String, List<String>, NotUsed> flowGroupedWithinMinute =
    //         Flow.of(String.class).groupedWithin(
    //                 numberOfRecordsToRead, // group size
    //                 Duration.ofSeconds(60) // group time
    //         );
    Source<String, NotUsed> sourceStringsFromKinesisRecords = sourceKinesisBasic
            .via(flowMapRecordToString)
            .via(flowPrinter);
            // .via(flowGroupedWithinMinute); // nope
    // sink to list of strings
    // Sink<String, CompletionStage<List<String>>> sinkToList = Sink.seq();
    Sink<String, CompletionStage<List<String>>> sink10 = Sink.takeLast(10);
    // Sink<String, CompletionStage<String>> sinkHead = Sink.head(); // only gives you one message
    CompletionStage<List<String>> streamCompletion = sourceStringsFromKinesisRecords
            .runWith(sink10, materializer);
    CompletableFuture<List<String>> completableFuture = streamCompletion.toCompletableFuture();
    completableFuture.join(); // never stops running...
    List<String> result = completableFuture.get();
    int foo = 1;
}

private String extractDataFromRecord(Record record) {
    String encType = record.encryptionTypeAsString();
    Instant arrivalTimestamp = record.approximateArrivalTimestamp();
    String data = record.data().asString(StandardCharsets.UTF_8);
    return data;
}

private String debugPrint(String s) {
    System.out.println(s);
    return s;
}
Thank you for any clues
Answer 1
Score: 1
I found that the answer is to add a take(n) stage at the flow level:
...
Flow<String, String, NotUsed> flowTakeN = Flow.of(String.class).take(numberOfRecordsToRead);
Source<String, NotUsed> sourceStringsFromKinesisRecords = sourceKinesisBasic
        .via(flowMapRecordToString)
        .via(flowPrinter)
        .via(flowTakeN);
...
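For anyone who wants to try this without a Kinesis stream, here is a minimal, self-contained sketch of the same idea. It is not the code from the question: Source.repeat is a hypothetical stand-in for the Kinesis source, the class and method names are made up for illustration, and passing the ActorSystem directly to runWith assumes Akka 2.6 or later.

```java
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

public class TakeNSketch {

    // Collects the first n elements of a source that never completes by itself.
    static List<String> firstN(ActorSystem system, int n) throws Exception {
        // Hypothetical stand-in for the Kinesis source: emits "record" forever.
        Source<String, NotUsed> endless = Source.repeat("record");

        // take(n) completes the stream after n elements and cancels upstream.
        Flow<String, String, NotUsed> flowTakeN = Flow.of(String.class).take(n);

        // Sink.seq() completes its CompletionStage only when the stream completes.
        Sink<String, CompletionStage<List<String>>> sinkToList = Sink.seq();

        return endless
                .via(flowTakeN)
                .runWith(sinkToList, system)
                .toCompletableFuture()
                .get(10, TimeUnit.SECONDS); // bounded wait instead of an open-ended join()
    }

    public static void main(String[] args) throws Exception {
        ActorSystem system = ActorSystem.create("take-n-sketch");
        try {
            System.out.println(firstN(system, 3));
        } finally {
            system.terminate();
        }
    }
}
```

Sinks such as Sink.seq() and Sink.takeLast(10) only produce their result once the upstream completes, so with an endless source something upstream (here, take) has to end the stream.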
Answer 2
Score: 1
Just to add to the answer you found: you can also express this more directly, without via:
Source<String, NotUsed> sourceStringsFromKinesisRecords = sourceKinesisBasic
        .map(record -> extractDataFromRecord(record))
        .map(s -> debugPrint(s))
        .take(10);
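One caveat that may be worth guarding against in a test: take(n) only completes once n elements have actually arrived, so if the shard is quiet the future can still block. A possible sketch, assuming Akka 2.6 or later, combines take with takeWithin so the stream ends on whichever comes first. Source.tick here is a hypothetical slow source standing in for Kinesis, and the class and method names are made up for illustration.

```java
import akka.actor.ActorSystem;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class TakeWithTimeoutSketch {

    // Collects up to n elements, but stops early once maxWait has elapsed.
    static List<String> takeUpTo(ActorSystem system, int n, Duration maxWait) throws Exception {
        return Source
                // Hypothetical slow source: one element every 200 ms, forever.
                .tick(Duration.ZERO, Duration.ofMillis(200), "record")
                .take(n)             // complete after n elements...
                .takeWithin(maxWait) // ...or when the time budget runs out
                .runWith(Sink.<String>seq(), system)
                .toCompletableFuture()
                .get(30, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        ActorSystem system = ActorSystem.create("take-with-timeout-sketch");
        try {
            // Asks for 100 elements but allows only one second, so fewer come back.
            System.out.println(takeUpTo(system, 100, Duration.ofSeconds(1)).size());
        } finally {
            system.terminate();
        }
    }
}
```

With this shape the result list may hold fewer than n elements, so a test should assert on an upper bound rather than an exact count.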