英文:
NestJs - Worker to consume GCP Pubsub
问题
我需要开发一个 worker 来消费 GCP Pub/Sub 消息。
以下是我使用的代码:
main.ts
import { INestApplication, INestApplicationContext } from '@nestjs/common';
import { CancellationTokenService } from '@/services/cancellationToken';
import { NestFactory } from '@nestjs/core';
import { AppModule } from '@/app.module';
import { PubSubService } from '@/services/pubSub';
async function main() {
const app = await NestFactory.createApplicationContext(AppModule);
const cancellationToken = app.get(CancellationTokenService);
configureApplicationStop(cancellationToken);
await run(app, cancellationToken);
while (!cancellationToken.isCancelled) {
console.log('Microservice is running...');
await new Promise((resolve) => setTimeout(resolve, 1000));
}
await app.close();
}
async function run(
app: INestApplicationContext,
cancellationToken: CancellationTokenService,
) {
try {
const pubSub = app.get(PubSubService);
await pubSub.consumeMessages();
} catch (error) {
console.error(error);
cancellationToken.cancel();
}
}
function configureApplicationStop(cancellationToken: CancellationTokenService) {
process.on('SIGINT', () => {
cancellationToken.cancel();
console.log('Stopping application...');
});
}
main();
app.module.ts
import { Module } from '@nestjs/common';
import { PubSubService } from '@/services/pubSub';
import { CancellationTokenService } from '@/services/cancellationToken';
@Module({
imports: [],
providers: [CancellationTokenService, PubSubService],
})
export class AppModule {}
pubsub.service.ts
import { Injectable } from '@nestjs/common';
import { PubSub, Subscription } from '@google-cloud/pubsub';
import { CancellationTokenService } from '@/services/cancellationToken';
@Injectable()
export class PubSubService {
pubsub: PubSub;
subscription: Subscription;
constructor(private readonly cancellationToken: CancellationTokenService) {
this.pubsub = new PubSub();
this.initialize();
console.log('PubsubService');
}
async initialize(): Promise<void> {
this.subscription = this.initializeSubscription();
}
initializeSubscription(): Subscription {
return this.pubsub.subscription('my-subsription');
}
async consumeMessages(): Promise<void> {
if (this.cancellationToken.isCancelled) {
console.log('cancellationToken.isCancelled');
this.subscription.close();
}
console.log('cancellationToken.isCancelled');
if (!this.subscription.isOpen) this.subscription.open();
this.subscription.on('message', (message) => {
if (this.cancellationToken.isCancelled)
console.log('cancellationToken.isCancelled consuming messages');
console.log(`Received message ${message.id}:`);
console.log(`\tData: ${message.data}`);
console.log(`\tAttributes: ${message.attributes}`);
message.nack();
//message.ack();
});
}
}
cancellationToken.service.ts
import { Injectable, Scope } from '@nestjs/common';
@Injectable({ scope: Scope.DEFAULT })
export class CancellationTokenService {
private _isCancelled: boolean;
constructor() {
this._isCancelled = false;
}
get isCancelled(): boolean {
return this._isCancelled;
}
cancel(): void {
this._isCancelled = true;
}
reset(): void {
this._isCancelled = false;
}
}
我需要在触发 SIGTERM 时停止消费消息。
不清楚为什么它不起作用,因为应用明显记录了 "Stopping application..."。
我对 Nest.js 相当陌生,所以我有点困惑,不确定这是否是正确的做法。
英文:
I need to develop an worker to consume GCP Pub/Sub messages.
Here is the code that I'm using:
main.ts
import { INestApplication, INestApplicationContext } from '@nestjs/common';
import { CancellationTokenService } from '@/services/cancellationToken';
import { NestFactory } from '@nestjs/core';
import { AppModule } from '@/app.module';
import { PubSubService } from '@/services/pubSub';
async function main() {
const app = await NestFactory.createApplicationContext(AppModule);
const cancellationToken = app.get(CancellationTokenService);
configureApplicationStop(cancellationToken);
await run(app, cancellationToken);
while (!cancellationToken.isCancelled) {
console.log('Microservice is running...');
await new Promise((resolve) => setTimeout(resolve, 1000));
}
await app.close();
}
async function run(
app: INestApplicationContext,
cancellationToken: CancellationTokenService,
) {
try {
const pubSub = app.get(PubSubService);
await pubSub.consumeMessages();
} catch (error) {
console.error(error);
cancellationToken.cancel();
}
}
function configureApplicationStop(cancellationToken: CancellationTokenService) {
process.on('SIGINT', () => {
cancellationToken.cancel();
console.log('Stopping application...');
});
}
main();
app.module.ts
import { Module } from '@nestjs/common';
import { PubSubService } from '@/services/pubSub';
import { CancellationTokenService } from '@/services/cancellationToken';
@Module({
imports: [],
providers: [CancellationTokenService, PubSubService],
})
export class AppModule {}
pubsub.service.ts
import { Injectable } from '@nestjs/common';
import { PubSub, Subscription } from '@google-cloud/pubsub';
import { CancellationTokenService } from '@/services/cancellationToken';
@Injectable()
export class PubSubService {
pubsub: PubSub;
subscription: Subscription;
constructor(private readonly cancellationToken: CancellationTokenService) {
this.pubsub = new PubSub();
this.initialize();
console.log('PubsubService');
}
async initialize(): Promise<void> {
this.subscription = this.initializeSubscription();
}
initializeSubscription(): Subscription {
return this.pubsub.subscription('my-subsription');
}
async consumeMessages(): Promise<void> {
if (this.cancellationToken.isCancelled) {
console.log('cancellationToken.isCancelled');
this.subscription.close();
}
console.log('cancellationToken.isCancelled');
if (!this.subscription.isOpen) this.subscription.open();
this.subscription.on('message', (message) => {
if (this.cancellationToken.isCancelled)
console.log('cancellationToken.isCancelled consuming messages');
console.log(`Received message ${message.id}:`);
console.log(`\tData: ${message.data}`);
console.log(`\tAttributes: ${message.attributes}`);
message.nack();
//message.ack();
});
}
}
cancellationToken.service.ts
import { Injectable, Scope } from '@nestjs/common';
@Injectable({ scope: Scope.DEFAULT })
export class CancellationTokenService {
private _isCancelled: boolean;
constructor() {
this._isCancelled = false;
}
get isCancelled(): boolean {
return this._isCancelled;
}
cancel(): void {
this._isCancelled = true;
}
reset(): void {
this._isCancelled = false;
}
}
I need to stop consuming messages when SIGTERM is triggered.
I don't know why it isn't working, because apparenlty the application is logging Stopping application...
I'm currently new with nest js so I'm kind of lost if this is the right way to do it.
答案1
得分: 0
这可能不是最佳的方法,但目前可以解决问题。
我已经在我的app.module.ts中创建了一个cancellationTokenService
的实例和自定义提供程序。
import { CancellationTokenService } from '@/services/cancellationToken';
const cancellationTokenService = new CancellationTokenService();
//...
const cancellationTokenProvider: Provider = {
provide: CancellationTokenService,
useFactory: () => cancellationTokenService,
};
@Module({
imports: [],
providers: [cancellationTokenProvider, PubSubService],
})
英文:
Maybe this is not the best aproach, but for now this solve the issue.
I've created an instance of cancellationTokenService and custom provider in my app.module.ts
import { CancellationTokenService } from '@/services/cancellationToken';
const cancellationTokenService = new CancellationTokenService();
//...
const cancellationTokenProvider: Provider = {
provide: CancellationTokenService,
useFactory: () => cancellationTokenService,
};
@Module({
imports: [],
providers: [cancellationTokenProvider, PubSubService],
})
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论