NestJs - Worker to consume GCP Pubsub

Question

I need to develop a worker to consume GCP Pub/Sub messages.

Here is the code that I'm using:

main.ts

import { INestApplication, INestApplicationContext } from '@nestjs/common';
import { CancellationTokenService } from '@/services/cancellationToken';
import { NestFactory } from '@nestjs/core';
import { AppModule } from '@/app.module';
import { PubSubService } from '@/services/pubSub';
async function main() {
  const app = await NestFactory.createApplicationContext(AppModule);
  const cancellationToken = app.get(CancellationTokenService);
  configureApplicationStop(cancellationToken);
  await run(app, cancellationToken);
  while (!cancellationToken.isCancelled) {
    console.log('Microservice is running...');
    await new Promise((resolve) => setTimeout(resolve, 1000));
  }
  await app.close();
}

async function run(
  app: INestApplicationContext,
  cancellationToken: CancellationTokenService,
) {
  try {
    const pubSub = app.get(PubSubService);
    await pubSub.consumeMessages();
  } catch (error) {
    console.error(error);
    cancellationToken.cancel();
  }
}

function configureApplicationStop(cancellationToken: CancellationTokenService) {
  process.on('SIGINT', () => {
    cancellationToken.cancel();
    console.log('Stopping application...');
  });
}
main();

app.module.ts

import { Module } from '@nestjs/common';
import { PubSubService } from '@/services/pubSub';
import { CancellationTokenService } from '@/services/cancellationToken';
@Module({
  imports: [],
  providers: [CancellationTokenService, PubSubService],
})
export class AppModule {}

pubsub.service.ts

import { Injectable } from '@nestjs/common';
import { PubSub, Subscription } from '@google-cloud/pubsub';
import { CancellationTokenService } from '@/services/cancellationToken';
@Injectable()
export class PubSubService {
  pubsub: PubSub;
  subscription: Subscription;
  constructor(private readonly cancellationToken: CancellationTokenService) {
    this.pubsub = new PubSub();
    this.initialize();
    console.log('PubsubService');
  }
  async initialize(): Promise<void> {
    this.subscription = this.initializeSubscription();
  }
  initializeSubscription(): Subscription {
    return this.pubsub.subscription('my-subsription');
  }
  async consumeMessages(): Promise<void> {
    if (this.cancellationToken.isCancelled) {
      console.log('cancellationToken.isCancelled');
      this.subscription.close();
    }
    console.log('cancellationToken.isCancelled');
    if (!this.subscription.isOpen) this.subscription.open();
    this.subscription.on('message', (message) => {
      if (this.cancellationToken.isCancelled)
        console.log('cancellationToken.isCancelled consuming messages');
      console.log(`Received message ${message.id}:`);
      console.log(`\tData: ${message.data}`);
      console.log(`\tAttributes: ${message.attributes}`);
      message.nack();
      //message.ack();
    });
  }
}

cancellationToken.service.ts

import { Injectable, Scope } from '@nestjs/common';
@Injectable({ scope: Scope.DEFAULT })
export class CancellationTokenService {
  private _isCancelled: boolean;
  constructor() {
    this._isCancelled = false;
  }
  get isCancelled(): boolean {
    return this._isCancelled;
  }
  cancel(): void {
    this._isCancelled = true;
  }
  reset(): void {
    this._isCancelled = false;
  }
}

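I need to stop consuming messages when SIGTERM is triggered.

I don't know why it isn't working, because the application apparently does log "Stopping application...".

I'm new to NestJS, so I'm not sure whether this is the right way to do it.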

Answer 1

Score: 0

This may not be the best approach, but it solves the issue for now.

I created a single instance of CancellationTokenService and a custom provider for it in my app.module.ts:

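import { Module, Provider } from '@nestjs/common';
import { CancellationTokenService } from '@/services/cancellationToken';
import { PubSubService } from '@/services/pubSub';

// One module-level instance, so every consumer receives the same token.
const cancellationTokenService = new CancellationTokenService();
//...
// Custom provider: resolves CancellationTokenService to the instance above.
const cancellationTokenProvider: Provider = {
  provide: CancellationTokenService,
  useFactory: () => cancellationTokenService,
};
@Module({
  imports: [],
  providers: [cancellationTokenProvider, PubSubService],
})
export class AppModule {}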
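
A complementary sketch, not part of the original answer. Two details in the question's code may matter here: `consumeMessages()` checks `isCancelled` only once, when it is first called, so a later `cancel()` does not by itself close the subscription, and the handler is registered for SIGINT while the goal mentions SIGTERM. The sketch below assumes a token with an `onCancel()` hook and uses a plain `EventEmitter` as a stand-in for the Pub/Sub subscription. `CancellationToken`, `FakeSubscription`, `consume`, and `cancelOnSignals` are hypothetical names introduced for illustration only.

```typescript
import { EventEmitter } from 'events';

// Hypothetical token: same isCancelled/cancel() surface as the question's
// CancellationTokenService, plus an onCancel() hook added for this sketch.
class CancellationToken {
  private cancelled = false;
  private readonly listeners: Array<() => void> = [];

  get isCancelled(): boolean {
    return this.cancelled;
  }

  // Runs the listener on cancellation (immediately if already cancelled).
  onCancel(listener: () => void): void {
    if (this.cancelled) {
      listener();
    } else {
      this.listeners.push(listener);
    }
  }

  cancel(): void {
    if (this.cancelled) return; // idempotent: a second signal is a no-op
    this.cancelled = true;
    for (const listener of this.listeners) listener();
  }
}

// Stand-in for a Pub/Sub subscription: delivers 'message' events until closed.
class FakeSubscription extends EventEmitter {
  isOpen = true;

  close(): void {
    this.isOpen = false;
    this.removeAllListeners('message');
  }
}

// Starts consuming and arranges for the subscription to close on cancellation.
function consume(
  subscription: FakeSubscription,
  token: CancellationToken,
  handle: (data: string) => void,
): void {
  subscription.on('message', handle);
  token.onCancel(() => subscription.close());
}

// Registers the same cancellation for both signals.
function cancelOnSignals(token: CancellationToken): void {
  for (const signal of ['SIGINT', 'SIGTERM'] as const) {
    process.once(signal, () => token.cancel());
  }
}
```

With the real `@google-cloud/pubsub` client, `close()` is asynchronous, so the cancellation callback would presumably need to await it before the process exits. That part is left out of the sketch.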

huangapple
  • Published on April 7, 2023, 04:12:29
  • When reposting, please keep the link to this article: https://go.coder-hub.com/75953406.html