How to add multiple consumers of the same data stream in Flink

Question

I am struggling to implement a solution to the following problem.

There is a Kinesis stream named sushistream, and every event coming from it contains two fields, broadcastId and eventID. I want a sliding-window count per broadcast for each eventID, with a different time window per event type.
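
For context, AnnouncementEvent (used in the code below) is assumed to be a plain Jackson-friendly POJO roughly like the sketch below; the field names mirror the description above, but the actual class in my project may differ:

	// Illustrative sketch of the event type read from the sushistream Kinesis stream.
	// Field names (broadcastID, eventID) follow the description above; the real class may differ.
	public class AnnouncementEvent {
	    private String broadcastID;
	    private String eventID;

	    public AnnouncementEvent() {}  // no-arg constructor required by Jackson

	    public String getBroadcastID() { return broadcastID; }
	    public void setBroadcastID(String broadcastID) { this.broadcastID = broadcastID; }

	    public String getEventID() { return eventID; }
	    public void setEventID(String eventID) { this.eventID = eventID; }
	}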

I tried the following solution:

	DataStream<String> stream = createKinesisSource(env, parameter);
	log.info("Kinesis stream created.");

	ObjectMapper objectMapper = new ObjectMapper();
	DataStream<AnnouncementEvent> promoCodeStream = stream.map(record -> {
	        AnnouncementEvent event = objectMapper.readValue(record, AnnouncementEvent.class);
	        return event;
	}).filter(Objects::nonNull).filter(new PromoCodeEventFilter()).uid("promo-code-stream"); // PromoCodeEventFilter will filter the events where eventID = "PROMO_CODE_CLICK"

	DataStream<AnnouncementEvent> shareIconClickStream = stream.map(record -> {
		AnnouncementEvent event = objectMapper.readValue(record, AnnouncementEvent.class);
		return event;
	}).filter(Objects::nonNull).filter(new ShareIconEventFilter()).uid("share-icon-stream");  // ShareIconEventFilter will filter the share-icon-click events


	DataStream<AnnouncementEventTypeCount> resultForPromoCodeClick = promoCodeStream
			.keyBy(AnnouncementEvent::getBroadcastID)
			.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
			.aggregate(new AnnouncementEventCounter()).uid("promo-code-result-stream");

	DataStream<AnnouncementEventTypeCount> resultForShareIconClick = shareIconClickStream
			.keyBy(AnnouncementEvent::getBroadcastID)
			.window(SlidingEventTimeWindows.of(Time.seconds(15), Time.seconds(5)))
			.aggregate(new AnnouncementEventCounter()).uid("share-icon-result-stream");

	DataStream<AnnouncementEventTypeCount> resultStream = resultForShareIconClick.union(resultForPromoCodeClick); // union(...) returns a new stream and does not modify its operands
	resultStream.addSink(createLamdbaSinkFromStaticConfig()).uid("result-published");

	env.execute("Kinesis Data Analytics Flink Application with Session Window and Aggregate Function");

I am using Flink version 1.11 here. When I deploy this code to an S3 bucket and try to run it, I repeatedly get the following error:

	Caused by: java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint s3://90adba2e0a9002371abdf5ee58da7883e350fe4d/3b3b06e55d15e42e945da7e868ce748b-135974748953-1690290221232/savepoints/61/attempt-1/savepoint-3b3b06-c15547ca2dff. Cannot map checkpoint/savepoint state for operator cbc357ccb763df2852fee8c4fc7d55f2 to the new program, because the operator is not available in the new program. If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI.
		at org.apache.flink.runtime.checkpoint.Checkpoints.throwNonRestoredStateException(Checkpoints.java:210)
		at org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoint(Checkpoints.java:180)
		at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1354)
		at org.apache.flink.runtime.scheduler.SchedulerBase.tryRestoreExecutionGraphFromSavepoint(SchedulerBase.java:315)
		at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:268)
		at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:236)
		at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:123)
		at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:105)
		at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:297)
		at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:284)
		at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
		at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
		at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:140)
		at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
		at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:391)
		... 7 more
It would be helpful to see a similar example where someone has solved a comparable problem.

I have deployed multiple times and tried various solutions from existing answers, but none of them has helped.

Answer 1

Score: 0

The code looks fine. The reason the job doesn't run is that you are trying to run this version of your application using state saved in a checkpoint or savepoint taken with an older version, when the job graph was different. There is state in that checkpoint/savepoint that cannot be restored, because an operator that existed in the old job no longer exists in the new program.

