
huangapple go评论70阅读模式

Kafka retries many times when I download a large file



  • Kafka将接收到发送邮件的消息
  • 获取文件的函数将从URL下载文件,缩放图像并保存文件
  • 发送邮件时,我将从文件夹中获取文件并附加到表单中
  • 当我多次发送带有大文件的邮件时,Kafka会多次重试,我会收到很多邮件

Kafka错误:"kafka server: The provided member is not known in the current generation"



// SendPODMail downloads the POD files for an order with DownloadFiles and
// passes the resulting file paths to s.sendMailService.Send.
//
// NOTE(review): this snippet is truncated. The closing braces, the update of
// countRetry, and the final return are not shown, and podURLs, orderInfo, ctx,
// tmsPod and content are defined outside the visible code.
func (s *customerMailService) SendPODMail() error {
    filePaths, err := DownloadFiles(podURLs, orderInfo.OrderCode)

    // If the first download attempt failed, call DownloadFiles again while
    // countRetry <= NUM_OF_RETRY.
    if err != nil {
        countRetry := 0
        for countRetry <= NUM_OF_RETRY {
            filePaths, err = DownloadFiles(podURLs, orderInfo.OrderCode)
            if err == nil {

    // Send the mail with the downloaded files attached.
    err = s.sendMailService.Send(ctx, orderInfo.CustomerEmail, tmsPod, content, filePaths)


func DownloadFiles(files []string, orderCode string) ([]string, error) {
    var filePaths []string

    err := os.Mkdir(tempDir, 0750)
    if err != nil && !os.IsExist(err) {
        return nil, err

    tempDirPath := tempDir + "/" + orderCode
    err = os.Mkdir(tempDirPath, 0750)
    if err != nil && !os.IsExist(err) {
        return nil, err

    for _, fileUrl := range files {
        fileUrlParsed, err := url.ParseRequestURI(fileUrl)
        if err != nil {
            logrus.WithError(err).Infof("Pod url is invalid %s", orderCode)
            return nil, err

        extFile := filepath.Ext(fileUrlParsed.Path)
        dir, err := os.MkdirTemp(tempDirPath, "tempDir")

        if err != nil {
            return nil, err

        f, err := os.CreateTemp(dir, "tmpfile-*"+extFile)
        if err != nil {
            return nil, err
        defer f.Close()

        response, err := http.Get(fileUrl)
        if err != nil {
            return nil, err
        defer response.Body.Close()

        contentTypes := response.Header["Content-Type"]
        isTypeAllow := false
        for _, contentType := range contentTypes {
            if contentType == "image/png" || contentType == "image/jpeg" {
                isTypeAllow = true

        if !isTypeAllow {
            logrus.WithError(err).Infof("Pod image type is invalid %s", orderCode)
            return nil, errors.New("Pod image type is invalid")

        decodedImg, err := imaging.Decode(response.Body)
        if err != nil {
            return nil, err

        resizedImg := imaging.Resize(decodedImg, 1024, 0, imaging.Lanczos)

        imaging.Save(resizedImg, f.Name())

        filePaths = append(filePaths, f.Name())
    return filePaths, nil


func (s *tikiMailService) SendFile(ctx context.Context, receiver string, templateCode string, data interface{}, filePaths []string) error {
    path := "/v1/emails"
    fullPath := fmt.Sprintf("%s%s", s.host, path)

    formValue := &bytes.Buffer{}
    writer := multipart.NewWriter(formValue)
    _ = writer.WriteField("template", templateCode)
    _ = writer.WriteField("to", receiver)

    if data != nil {
        b, err := json.Marshal(data)
        if err != nil {
            return errors.Wrapf(err, "Cannot marshal mail data to json with object %+v", data)

        _ = writer.WriteField("params", string(b))

    for _, filePath := range filePaths {
        part, err := writer.CreateFormFile(filePath, filepath.Base(filePath))

        if err != nil {
            return err

        pipeReader, pipeWriter := io.Pipe()

        go func() {
            defer pipeWriter.Close()

            file, err := os.Open(filePath)
            if err != nil {
            defer file.Close()

            io.Copy(pipeWriter, file)

        io.Copy(part, pipeReader)

    err := writer.Close()
    if err != nil {
        return err

    request, err := http.NewRequest("POST", fullPath, formValue)
    if err != nil {
        return err
    request.Header.Set("Content-Type", writer.FormDataContentType())

    resp, err := s.doer.Do(request)
    if err != nil {
        return errors.Wrap(err, "Cannot send request to send email")
    defer resp.Body.Close()

    b, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        return err

    if resp.StatusCode != http.StatusOK {
        return errors.New(fmt.Sprintf("Send email with code %s error: status code %d, response %s",
            templateCode, resp.StatusCode, string(b)))
    } else {
        logrus.Infof("Send email with attachment ,code %s success with response %s , box-code", templateCode, string(b), filePaths)
    return nil



I am a newbie with Kafka. I am trying to build a service that sends mail with attached files.
Execution flow:

  • Kafka receives a message to send a mail
  • the get-file function downloads the file from a URL, scales the image, and saves the file
  • when sending the mail, I read the files from the folder and attach them to the form
  • when I send mails with large files many times, Kafka retries many times and I receive many mails

kafka error: "kafka server: The provided member is not known in the current generation"

I looked into MaxProcessingTime, but when I tested sending a single mail with a large file, it still worked fine.

Kafka setup: 1 broker, 3 consumers

// SendPODMail downloads the POD files for an order with DownloadFiles and
// passes the resulting file paths to s.sendMailService.Send.
//
// NOTE(review): this snippet is truncated. The update of countRetry, the
// remaining closing braces, and the final return are not shown, and podURLs,
// orderInfo, ctx, tmsPod and content are defined outside the visible code.
func (s *customerMailService) SendPODMail() error { filePaths, err := DownloadFiles(podURLs, orderInfo.OrderCode)
// If the first download attempt failed, call DownloadFiles again in a loop
// bounded by NUM_OF_RETRY.
if err != nil{
countRetry := 0
for countRetry &lt;= NUM_OF_RETRY{
filePaths, err = DownloadFiles(podURLs, orderInfo.OrderCode)
if err == nil{
// Send the mail with the downloaded files attached.
err = s.sendMailService.Send(ctx, orderInfo.CustomerEmail, tmsPod, content,filePaths)}

Function to download the files:

func DownloadFiles(files []string, orderCode string) ([]string, error) {
var filePaths []string
err := os.Mkdir(tempDir, 0750)
if err != nil &amp;&amp; !os.IsExist(err) {
return nil, err
tempDirPath := tempDir + &quot;/&quot; + orderCode
err = os.Mkdir(tempDirPath, 0750)
if err != nil &amp;&amp; !os.IsExist(err) {
return nil, err
for _, fileUrl := range files {
fileUrlParsed, err := url.ParseRequestURI(fileUrl)
if err != nil {
logrus.WithError(err).Infof(&quot;Pod url is invalid %s&quot;, orderCode)
return nil, err
extFile := filepath.Ext(fileUrlParsed.Path)
dir, err := os.MkdirTemp(tempDirPath, &quot;tempDir&quot;)
if err != nil {
return nil, err
f, err := os.CreateTemp(dir, &quot;tmpfile-*&quot;+extFile)
if err != nil {
return nil, err
defer f.Close()
response, err := http.Get(fileUrl)
if err != nil {
return nil, err
defer response.Body.Close()
contentTypes := response.Header[&quot;Content-Type&quot;]
isTypeAllow := false
for _, contentType := range contentTypes {
if contentType == &quot;image/png&quot; || contentType == &quot;image/jpeg&quot; {
isTypeAllow = true
if !isTypeAllow {
logrus.WithError(err).Infof(&quot;Pod image type is invalid %s&quot;, orderCode)
return nil, errors.New(&quot;Pod image type is invalid&quot;)
decodedImg, err := imaging.Decode(response.Body)
if err != nil {
return nil, err
resizedImg := imaging.Resize(decodedImg, 1024, 0, imaging.Lanczos)
imaging.Save(resizedImg, f.Name())
filePaths = append(filePaths, f.Name())
return filePaths, nil}

Function to send the mail:

func (s *tikiMailService) SendFile(ctx context.Context, receiver string, templateCode string, data interface{}, filePaths []string) error {
path := &quot;/v1/emails&quot;
fullPath := fmt.Sprintf(&quot;%s%s&quot;, s.host, path)
formValue := &amp;bytes.Buffer{}
writer := multipart.NewWriter(formValue)
_ = writer.WriteField(&quot;template&quot;, templateCode)
_ = writer.WriteField(&quot;to&quot;, receiver)
if data != nil {
b, err := json.Marshal(data)
if err != nil {
return errors.Wrapf(err, &quot;Cannot marshal mail data to json with object %+v&quot;, data)
_ = writer.WriteField(&quot;params&quot;, string(b))
for _, filePath := range filePaths {
part, err := writer.CreateFormFile(filePath, filepath.Base(filePath))
if err != nil {
return err
pipeReader, pipeWriter := io.Pipe()
go func() {
defer pipeWriter.Close()
file, err := os.Open(filePath)
if err != nil {
defer file.Close()
io.Copy(pipeWriter, file)
io.Copy(part, pipeReader)
err := writer.Close()
if err != nil {
return err
request, err := http.NewRequest(&quot;POST&quot;, fullPath, formValue)
if err != nil {
return err
request.Header.Set(&quot;Content-Type&quot;, writer.FormDataContentType())
resp, err := s.doer.Do(request)
if err != nil {
return errors.Wrap(err, &quot;Cannot send request to send email&quot;)
defer resp.Body.Close()
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
if resp.StatusCode != http.StatusOK {
return errors.New(fmt.Sprintf(&quot;Send email with code %s error: status code %d, response %s&quot;,
templateCode, resp.StatusCode, string(b)))
} else {
logrus.Infof(&quot;Send email with attachment ,code %s success with response %s , box-code&quot;, templateCode, string(b),filePaths)
return nil



得分: 2

我的团队在重新部署k8s pods时发现了我的问题,这导致了冲突的领导者分区,从而引起了重新平衡。它将尝试再次处理pods缓冲区中的剩余消息。


ChannelBufferSize = 0



My team found the cause of my problem: redeploying the k8s pods led to conflicting partition leaders, which triggered a rebalance. After the rebalance, the consumer tries to process the messages remaining in each pod's buffer again.

Solution: instead of prefetching many messages into the buffer, I fetch one message at a time and process it, using this config:

ChannelBufferSize = 0

Example of a partition leader conflict:

consumer A and B start up at the same time
consumer A registers itself as leader and owns the topic with all partitions
consumer B registers itself as leader, then begins to rebalance and owns all partitions
consumer A rebalances and obtains all partitions, but cannot consume because its memberId is old and it needs a new one
consumer B rebalances again and owns the topic with all partitions, but they have already been obtained by consumer A


得分: 0





My two cents: in case of very big attachments, the consumer takes quite a lot of time to read the file and to send it as an attachment.

This increases the amount of time between two poll() calls. If that time is greater than max.poll.interval.ms, the consumer is thought to be failed and the partition offset is not committed. As a result, the message is processed again and eventually, if by chance the execution time stays below the poll interval, the offset is committed. The effect is a multiple email send.

Try increasing the max.poll.interval.ms on the consumer side.

  • 本文由 发表于 2022年4月4日 12:53:03
  • 转载请务必保留本文链接:https://go.coder-hub.com/71732182.html



:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:
