合并来自事件桥的多个事件

huangapple go评论57阅读模式
英文:

Combine several events from event-bridge

问题

我有一些工作需要两个S3对象。对象A由另一个系统上传;我需要生成对象B。

实际上,并不只有一个对象A,而是有若干个(A1,A2,A3)。每个对象由外部系统随时上传。对于每个对象A,都需要启动一个工作实例。

另一方面,对象B在指定的一段时间内保持不变,之后我需要重新生成它。生成对象需要时间。

我可以使用“EventBridge Scheduler”来生成对象B,也可以使用事件桥来触发每个被上传的对象Ax的事件。

我的问题是如何组合这两个事件,以便只有在对象B生成后,并且对象Ax被上传后才启动作业,确保每次上传对象Ax时都会启动一个作业。

(类似于JavaScript中的Promise.all

英文:

I have some work that needs two s3 objects. Object A is uploaded by another system; I have to generate the Object B.

In fact, there is not one Object A, but several (A1, A2, A3). Each one is uploaded by an external system at any time. For each object A, another instance of the work has to be launched.

On the other hand, Object B remains the same for a specified period time, after which I have to regenerate it. Generating the object takes time.

I can use EventBridge Scheduler to generate the object B, and I can also use event bridge to fire events for each Object Ax that gets uploaded.

My question is how do combine these two events, so that I can launch a job only after both Object B is generated, and Object Ax is uploaded, ensuring that for every object Ax that gets uploaded, exactly one job is launched.

(Something similar to Promise.all in javascript)

答案1

得分: 3

The code you provided appears to be in JavaScript and discusses a solution involving Lambda functions, S3 notifications, and orchestration scenarios. If you need any specific translations or clarifications about this code, please let me know.

英文:

While the step functions approach above seems like the right solution generally for these kind of orchestration scenarios, I wonder if in this simple case it would be sufficient to have a single lambda listen to both S3 notifications and simply check if the other file already exists?

Something like:

exports.handler = async function (event, context) {
  const s3Object = extractS3Object(event);
  if (isSecondFile(s3Object) && firstFileExists() 
    || isFirstFile(s3Object) && secondFileExists()) {
    // Do stuff
  } else {
    // Don't do stuff
  }
};

Update: This doesn't work as great if you need a strong guarantee that job is only executed once, but it's still possible with a bit of a hack. You can:

  • Set your lambda concurrency limit to 1
  • Store IDs of successfully processed jobs in a DynamoDB table
  • Check ID in the table before processing a job, and if found, skip.

However, I would first think if it's possible to make your job idempotent, so that you don't have to enforce "only-once" policy

答案2

得分: 2

我建议您使用Step Functions工作流,由EventBridge Scheduler在指定时间触发,来实现此逻辑。它可以生成第二步所需的文件。然后,它可以使用Step Functions AWS SDK服务集成(例如 arn:aws:states:::aws-sdk:s3:listObjectsV2)来确定是否已经上传了来自其他系统的对象。如果已经上传,那么继续执行。如果没有,那么您可以等待直到它上传(例如,使用作业轮询模式来检查是否存在,如果找不到则等待),并实施您可能需要的任何补偿逻辑(例如,处理如果文件没有在您期望的时间内由其他系统上传)。这将是最简单的方法(最少的移动部件),并且可能适合您的需求。

如果以后有其他事情需要等待(例如,您需要等待来自不同外部系统的两个对象),您可以使用Step Functions的并行状态映射状态将其构建到您的工作流中。

如果其他系统通常在定时运行之前上传文件,那么这将是低延迟的。在其他系统之后上传文件的情况下,您的Step Functions工作流中的轮询调用之间将有一些额外的延迟。在轮询频率(降低对象创建和检测之间的延迟)和状态转换的成本之间存在权衡(因为工作流每次都需要执行工作)。如果容量足够大且延迟对您很重要,您可以进一步增强此功能。例如,当来自其他系统的对象不存在时,您可以使用Step Functions的.waitForTaskToken服务集成模式将任务令牌写入DynamoDB表以等待预期的对象。然后,由EventBridge S3通知触发的另一个Step Functions Express工作流查找DynamoDB表中的预期对象,如果找到它,就回调到Step Functions(SendTaskSuccess)以完成工作流执行。如果您的场景是高容量的,那么这将是首选,但正如您所见,这将需要更多的开发工作,并且您需要更仔细地管理竞争条件。

英文:

I suggest you use a Step Functions workflow, triggered at the specified time by EventBridge Scheduler, to implement this logic. It can do the work to generate the file required for your second step. It can then use Step Functions AWS SDK Service Integrations (e.g., arn:aws:states:::aws-sdk:s3:listObjectsV2) to determine if the object from the other system has been uploaded yet. If it has, then carry on. If not, then you can wait until it does (e.g., using a Job Poller pattern to check for existence and wait if not found) and implement any compensating logic you might want (e.g., to handle if the file is not uploaded by the other system within the timeline you expect). This would be the simplest approach (fewest moving parts) and would likely work well for you.

If you later had additional things you wanted to wait for (e.g., you had two objects from different external systems to wait for), you could build that into your workflow using Step Functions Parallel State or Map State.

If the other system typically uploads the files before your timed run, then this will be low latency. In scenarios where the other system uploads after, there will be some additional latency between the polling calls in your Step Functions workflow. You also have a trade-off there between frequency of polling (lowers latency between object creation and detection) and cost for state transitions (as the workflow needs to do work each time). If the volume is high enough and the latency important enough to you, you could enhance this further. For example, when the object from the other system isn't there, you could use the .waitForTaskToken Service Integration Pattern from Step Functions to write a Task Token to a DynamoDB table for the expected object and wait. Then have another Step Functions Express Workflow triggered by EventBridge S3 Notifications that looked up the expected object in the DynamoDB table and, if it found it, call back to Step Functions (SendTaskSuccess) to complete the workflow execution. This would be preferrable if your scenario is high volume, but as you can see it woud be more development effort and you'd need to be more careful to manage race-conditions.

huangapple
  • 本文由 发表于 2023年6月5日 09:34:51
  • 转载请务必保留本文链接:https://go.coder-hub.com/76403083.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定