如何在Angular RxJS中将subscribe替换为subscrib?

huangapple go评论86阅读模式
英文:

How do I replace subscrib in a subscribe in Angular RxJS?

问题

我有一个关于RxJS的问题。
我正在创建一个用于管理协会成员的Web应用程序。
我想创建一个按钮来“重置”网站的数据库。
步骤如下:

  • 发送电子邮件给所有成员重新注册
  • 删除数据
  • 刷新页面

这是我编写的代码,它可以工作,但我知道有一些问题。我对RxJS还不太了解所有的原理...

newYear(){
    this.amicalisteService.getAmicalistesValides("").subscribe({
      // 发送电子邮件给所有有效的成员
      next: amicalistes => {
        for(const amicaliste of amicalistes) {
          const to = amicaliste.email;
          const cc = null;
          const subject = "Adhère à l'AEIR";
          const body = ""
          this.amicalisteService.sendMail(null, to, cc, subject, body).subscribe({
            next: response => {
              console.log('E-mail envoyé avec succès !', response);
            },
            error: error => {
              console.error('Erreur lors de l\'envoi de l\'e-mail :', error);
            }
          });
        }
      },
      complete: () => {
        // 删除成员、照片和成员卡
        this.amicalisteService.getAmicalistes().subscribe(amicalistes  => {
          for(const amicaliste of amicalistes){
            this.amicalisteService.deleteAmicalisteById(amicaliste.id).subscribe();
          }
        });
        this.imageService.getImages("amicaliste").subscribe(images => {
          for(const image of images){
            this.imageService.deleteImageByName("amicaliste", this.imageService.getImageName(image.toString())).subscribe();
          }
        });
        this.imageService.getImages("pdf").subscribe(pdfs => {
          for(const pdf of pdfs){
            this.imageService.deleteImageByName("pdf", this.imageService.getImageName(pdf.toString())).subscribe();
          }
        })
      }
      // 刷新页面...
    })
  }

我听说在subscribe()内部使用subscribe()不是一个好的做法,但我无法找到其他的方法。
然而,我希望在这段代码中保留几个方面。在complete中,这3个subscribe()是并行执行的,如果我没有弄错的话。我希望保持这一点。
另外,我知道使用switchMap可能会对我有所帮助,但我似乎无法实现它。有人可以给我一些建议吗?

非常感谢!

英文:

I have a question about RxJS.
I am creating a web app to manage the members of an association.
I want to create a button to "reset" the database of a site.
The steps are as follows:

  • send an e-mail to all members to re-register
  • Delete data
  • Refresh page

Here's the code I've made, which works, but I know there are a few things wrong. I'm new to RxJS, so I don't quite understand all the principles...

newYear(){
    this.amicalisteService.getAmicalistesValides("").subscribe({
      // Envoyer un mail à l'ensemble des amicalistes valides
      next: amicalistes => {
        for(const amicaliste of amicalistes) {
          const to = amicaliste.email;
          const cc = null;
          const subject = "Adhère à l'AEIR";
          const body = ""
          this.amicalisteService.sendMail(null, to, cc, subject, body).subscribe({
            next: response => {
              console.log('E-mail envoyé avec succès !', response);
            },
            error: error => {
              console.error('Erreur lors de l\'envoi de l\'e-mail :', error);
            }
          });
        }
      },
      complete: () => {
        // Supprimer amicalistes, photos et cartes amicalistes
        this.amicalisteService.getAmicalistes().subscribe(amicalistes  => {
          for(const amicaliste of amicalistes){
            this.amicalisteService.deleteAmicalisteById(amicaliste.id).subscribe();
          }
        });
        this.imageService.getImages("amicaliste").subscribe(images => {
          for(const image of images){
            this.imageService.deleteImageByName("amicaliste", this.imageService.getImageName(image.toString())).subscribe();
          }
        });
        this.imageService.getImages("pdf").subscribe(pdfs => {
          for(const pdf of pdfs){
            this.imageService.deleteImageByName("pdf", this.imageService.getImageName(pdf.toString())).subscribe();
          }
        })
      }
      //Refresh...
    })
  }

I've heard it's not good practice to use subscribe() inside subscribe(), but I can't figure out how to do it differently.
There are several things I'd like to keep in this code, however. In the complete, the 3 subscribe() run in parallel, if I'm not mistaken. I'd like to keep that.
Otherwise, I understand that using a switchMap could help me, but I can't seem to implement it. Can anyone give me some advice?

Thanks you very much !

答案1

得分: 2

可观察对象是事件流。

远程调用(例如调用服务器发送邮件或调用数据库清理数据)被实现为只通知一个事件(即远程调用的响应)然后完成或出错的事件流。

使用 RxJs 运算符,你可以组合这些事件流。例如,你可以执行以下操作:

  • 从一个流开始,Stream_A,它发出 event_A,然后完成
  • 你可以有第二个流,Stream_B,它发出 event_B,然后完成
  • 然后,你可以将 Stream_AStream_B 组合起来创建第三个流,Stream_A_B,它首先触发 Stream_A 的执行并发出 event_A,一旦 event_A 被通知,就触发 Stream_B 的执行并发出 Stream_B 通知的所有事件,这种情况下只有 event_B
  • 为了在 RxJs 中创建这个组合流,我们使用 concatMap 运算符(注意:人们经常使用 switchMap 来连接流,结果通常是相同的,但含义和潜在行为略有不同,对于必须按顺序发生的远程服务调用序列,通常首选 concatMap 方法)

另一个将多个流组合成新流的示例如下:

  • 有 2 个流,Stream_1Stream_2Stream_3。每个流都发出一个值,然后完成。
  • 我们可以组合这 3 个流,等待所有 3 个流发出并完成,然后只发出一个值,即所有流发出的值的数组,然后完成。
  • 使用 RxJs,可以使用 forkJoin 函数获得这种新的组合流。

说了这么多,希望能对 RxJs 和可观察对象有所了解,下面是我在你的情况下会做的事情:

newYear(){
    // 假设 getAmicalistesValides 返回一个发出远程调用结果的可观察对象
    this.amicalisteService.getAmicalistesValides("")
    // 要组合可观察对象,我们需要使用 "pipe" 运算符,即按顺序执行运算符
    .pipe(
      // 这里首先要做的事情似乎是为每个 amicaliste 发送一封电子邮件
      // 假设我们想要并行发送所有电子邮件,我们可以先为每个要发送的邮件创建一个可观察对象,然后使用 forkJoin 并行执行它们
      // 但是所有这些都必须在 getAmicalistesValides 返回的可观察对象发出其值之后发生,因此我们使用 concatMap
      concatMap(amicalistes => {
        for(const amicaliste of amicalistes) {
          const to = amicaliste.email;
          const cc = null;
          const subject = "Adhère à l'AEIR";
          const body = ""
          // 这里我们创建可观察对象数组
          const sendMailObs = this.amicalisteService.sendMail(null, to, cc, subject, body)
          // 每个这些可观察对象都可以打印一些内容或对错误做出反应
          .pipe(tap({
            next: response => {
              console.log('E-mail envoyé avec succès !', response);
            },
            error: error => {
              console.error('Erreur lors de l\'envoi de l\'e-mail :', error);
            }}))
          });
          // 现在我们触发所有 sendMail 可观察对象的并发执行
          return forkJoin(sendMailObs)
      }),
      // 在发送邮件之后,你想要做更多的事情:删除数据、图片等等 - 假设每个操作都是一个可观察对象
      // 你将需要使用 concatMap,并在其中创建新的可观察对象,并使用 forkJoin 并行触发它们,如上所述
      concatMap(mailSentResults => {
         const deleteDataObs = ....
         const deleteImagesObs = ...
         ...
         return forkJoin([deleteDataObs, deleteImagesObs, // 可能还有其他可观察对象])
      })
    )
    // 到目前为止,你已经创建了一个新的流,组合了其他各种流
    // 现在是订阅这个新流的时候了,这是你唯一想要显式订阅的流
    .subscribe({
      next: res => ... // 处理上游通知的值
      error: err => ... // 处理错误
      complete: () => ... // 在全部完成时执行某些操作(如果需要的话)
    })
  }

希望我理解了你的情况,并且以上内容有一定的意义。

英文:

Observables are streams of events.

Remote calls (e.g. calling a server to send mails or calling a db to clean up some data) are implemented as streams of events that notify just one event (i.e. the response of the remote call) and then complete or just `error○s.

With RxJs operators you can combine such streams. For instance you do the following:

  • you start with one stream, Stream_A, that emits event_A and then completes
  • you can have a second stream, Stream_B, that emits event_B and then completes
  • and then you combine Stream_A and Stream_B to create a third stream, Stream_A_B that first triggers the execution of Stream_A and emits event_A and, as soon as event_A has been notified, triggers the execution of Stream_B and emits all the events notified by Stream_B, which in this case is just event_B
  • In order to create this combined stream in RxJs we use the oprator concatMap (note: often people use switchMap to concatenate streams - often the result is the same but the meaning and the potential behaviors are slightly different - with sequences of calls to remote services which have to occur one after the other, concatMap is usually the preferred approach)

Another example of combination of more streams to obtain a new stream is the following:

  • There are 2 streams, Stream_1 Stream_2 and Stream_3. Each of these streams emits one value and then completes.
  • We can combine these 3 streams that waits for all 3 streams to emit and complete and then emits only one value, which is the array of all values emitted by the streams, and then complete.
  • With RxJs such new combined stream is obtained with the function forkJoin

Havin said that, with the hope to cast some clarity on RxJs and Observables, here is what I would do in your case

newYear(){
// assume getAmicalistesValides returns an Observable that emits the result
// of a remote call
this.amicalisteService.getAmicalistesValides("")
// to combine Observables we need to "pipe" operators, i.e. to execute
// operators one after the other
.pipe(
// first thing to do here seems to send an email for each amicaliste
// assuming we want to send the emails all in parallel, we can first
// create one Observable for each mail to be sent and then use forkJoin
// to execute them all in parallel
// But all this has to happen after the Observable returned by getAmicalistesValides
// has emitted its value, hence we use concatMap
concatMap(amicalistes => {
for(const amicaliste of amicalistes) {
const to = amicaliste.email;
const cc = null;
const subject = "Adhère à l'AEIR";
const body = ""
// here we create the array of Observables
const sendMailObs = this.amicalisteService.sendMail(null, to, cc, subject, body)
// each of these Observables can print something or react to errors
.pipe(tap({
next: response => {
console.log('E-mail envoyé avec succès !', response);
},
error: error => {
console.error('Erreur lors de l\'envoi de l\'e-mail :', error);
}))
});
// now we trigger the concurrent execution of all sendMail observables
return forkJoin(sendMailObs)
}),
// after having sent the mails you want to do more stuff: delete data, images
// and so on - assume each of these operations is an Observable
// you will have to use concatMap and within it create the new Observables
// and trigger them in parallel using forkJoin, as above
concatMap(mailSentResults => {
const deleteDataObs = ....
const deleteImagesObs = ...
...
return forkJoin([deleteDataObs, deleteImagesObs, // maybe other Obsevables])
})
)
// up to here you have created a new stream, composing various other streams
// and now is the time to subscribe to this new stream, which is the only stream 
// you want to explicitely subscribe
.subscribe({
next: res => ... // manage the value notified by upstream
error: err => ... // manage error
complete: () => ... // do something when all is completed, if required
})
}

I hope I have understood your case and all this makes some sense

答案2

得分: 1

这里我基本上将许多可观察对象转换为一个可观察对象,主要依靠flatMap(在rxjs中是concatMap)和forkJoin将多个可观察对象组合成一个可观察对象,类似于Promise.all用于 promises。

不幸的是,我没有找到“单子”式的 flat map,所以我们必须将值数组映射为可观察对象数组,并使用forkJoin来处理它们。

this.amicalisteService.getAmicalistesValides("").pipe(
  concatMap(amicalistes => { // Observable<Amicaliste[]> -> Observable<void[]>
    return forkJoin( // Observable<void>[] -> Observable<void[]>
      amicalistes.map(amicaliste => { // Amicaliste -> Observable<void>
        const to = amicaliste.email;
        const cc = null;
        const subject = "Adhère à l'AEIR";
        const body = "";
        return this.amicalisteService.sendMail(null, to, cc, subject, body)
          .pipe(tap({
            next: response => {
              console.log('E-mail envoyé avec succès !', response);
            },
            error: error => {
              console.error('Erreur lors de l\'envoi de l\'e-mail :', error);
            }
          }));
      })
    );
  }),

  // 然后,在上述完成后执行剩下的部分
  concatMap(() => forkJoin([
    this.amicalisteService.getAmicalistes()
      .pipe(
        concatMap(amicalistes => {
          return forkJoin(amicalistes.map(amicaliste => this.amicalisteService.deleteAmicalisteById(amicaliste.id)));
        })
      ),
    this.imageService.getImages("amicaliste")
      .pipe(
        concatMap(images => {
          return forkJoin(
            images.map(image => this.imageService.deleteImageByName("amicaliste", this.imageService.getImageName(image.toString())))
          )
        })
      ),
    this.imageService.getImages("pdf")
      .pipe(
        concatMap(pdfs => {
          return forkJoin(
            pdfs.map(pdf => this.imageService.deleteImageByName("pdf", this.imageService.getImageName(pdf.toString())))
          )
        })
      )
  ])), 
);

希望对你有帮助!

英文:

Here I basically turn a lot of your observables into one, relying mostly on flatMap (concatMap when it comes to rxjs) and forkJoin to combine multiple observables into one which is analogous to Promise.all for promises.

There's no "monadic" flat map (that I could find) sadly, so we have to map arrays of values to arrays of observable and forkJoin them.

this.amicalisteService.getAmicalistesValides(&quot;&quot;).pipe(
concatMap(amicalistes =&gt; { // Observable&lt;Amicaliste[]&gt; -&gt; Observable&lt;void[]&gt;
return forkJoin( // Observable&lt;void&gt;[] -&gt; Observable&lt;void[]&gt;
amicalistes.map(amicaliste =&gt; { // Amicaliste -&gt; Observable&lt;void&gt;
const to = amicaliste.email;
const cc = null;
const subject = &quot;Adh&#232;re &#224; l&#39;AEIR&quot;;
const body = &quot;&quot;
return this.amicalisteService.sendMail(null, to, cc, subject, body)
.pipe(tap({
next: response =&gt; {
console.log(&#39;E-mail envoy&#233; avec succ&#232;s !&#39;, response);
},
error: error =&gt; {
console.error(&#39;Erreur lors de l\&#39;envoi de l\&#39;e-mail :&#39;, error);
}
}));
})
);
}),
// Then, when the above is completed, execute the rest
concatMap(() =&gt; forkJoin([
this.amicalisteService.getAmicalistes()
.pipe(
concatMap(amicalistes =&gt; {
return forkJoin(amicalistes.map(amicaliste =&gt; this.amicalisteService.deleteAmicalisteById(amicaliste.id)));
})
),
this.imageService.getImages(&quot;amicaliste&quot;)
.pipe(
concatMap(images =&gt; {
return forkJoin(
images.map(image =&gt; this.imageService.deleteImageByName(&quot;amicaliste&quot;, this.imageService.getImageName(image.toString())))
)
})
),
this.imageService.getImages(&quot;pdf&quot;)
.pipe(
concatMap(pdfs =&gt; {
return forkJoin(
pdfs.map(pdf =&gt; this.imageService.deleteImageByName(&quot;pdf&quot;, this.imageService.getImageName(pdf.toString())))
)
})
)
])), 
);

huangapple
  • 本文由 发表于 2023年8月8日 23:14:43
  • 转载请务必保留本文链接:https://go.coder-hub.com/76860928.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定