当我下载大文件时,Kafka会重试多次。

huangapple go评论88阅读模式
英文:

kafka retry many times when i download large file

问题

我是Kafka的新手,我尝试构建一个发送带附件的邮件的服务。
执行流程:

  • Kafka将接收到发送邮件的消息
  • 获取文件的函数将从URL下载文件,缩放图像并保存文件
  • 发送邮件时,我将从文件夹中获取文件并附加到表单中
    问题:
  • 当我多次发送带有大文件的邮件时,Kafka会多次重试,我会收到很多邮件

Kafka错误:"kafka server: The provided member is not known in the current generation"

我监听了MaxProcessingTime,但我尝试测试一个带有大文件的邮件,它仍然正常工作

Kafka信息:1个代理,3个消费者

func (s *customerMailService) SendPODMail() error {
    filePaths, err := DownloadFiles(podURLs, orderInfo.OrderCode)

    if err != nil {
        countRetry := 0
        for countRetry <= NUM_OF_RETRY {
            filePaths, err = DownloadFiles(podURLs, orderInfo.OrderCode)
            if err == nil {
                break
            }
            countRetry++
        }
    }

    err = s.sendMailService.Send(ctx, orderInfo.CustomerEmail, tmsPod, content, filePaths)
}

下载文件的函数:

func DownloadFiles(files []string, orderCode string) ([]string, error) {
    var filePaths []string

    err := os.Mkdir(tempDir, 0750)
    if err != nil && !os.IsExist(err) {
        return nil, err
    }

    tempDirPath := tempDir + "/" + orderCode
    err = os.Mkdir(tempDirPath, 0750)
    if err != nil && !os.IsExist(err) {
        return nil, err
    }

    for _, fileUrl := range files {
        fileUrlParsed, err := url.ParseRequestURI(fileUrl)
        if err != nil {
            logrus.WithError(err).Infof("Pod url is invalid %s", orderCode)
            return nil, err
        }

        extFile := filepath.Ext(fileUrlParsed.Path)
        dir, err := os.MkdirTemp(tempDirPath, "tempDir")

        if err != nil {
            return nil, err
        }

        f, err := os.CreateTemp(dir, "tmpfile-*"+extFile)
        if err != nil {
            return nil, err
        }
        defer f.Close()

        response, err := http.Get(fileUrl)
        if err != nil {
            return nil, err
        }
        defer response.Body.Close()

        contentTypes := response.Header["Content-Type"]
        isTypeAllow := false
        for _, contentType := range contentTypes {
            if contentType == "image/png" || contentType == "image/jpeg" {
                isTypeAllow = true
            }
        }

        if !isTypeAllow {
            logrus.WithError(err).Infof("Pod image type is invalid %s", orderCode)
            return nil, errors.New("Pod image type is invalid")
        }

        decodedImg, err := imaging.Decode(response.Body)
        if err != nil {
            return nil, err
        }

        resizedImg := imaging.Resize(decodedImg, 1024, 0, imaging.Lanczos)

        imaging.Save(resizedImg, f.Name())

        filePaths = append(filePaths, f.Name())
    }
    return filePaths, nil
}

发送邮件的函数:

func (s *tikiMailService) SendFile(ctx context.Context, receiver string, templateCode string, data interface{}, filePaths []string) error {
    path := "/v1/emails"
    fullPath := fmt.Sprintf("%s%s", s.host, path)

    formValue := &bytes.Buffer{}
    writer := multipart.NewWriter(formValue)
    _ = writer.WriteField("template", templateCode)
    _ = writer.WriteField("to", receiver)

    if data != nil {
        b, err := json.Marshal(data)
        if err != nil {
            return errors.Wrapf(err, "Cannot marshal mail data to json with object %+v", data)
        }

        _ = writer.WriteField("params", string(b))
    }

    for _, filePath := range filePaths {
        part, err := writer.CreateFormFile(filePath, filepath.Base(filePath))

        if err != nil {
            return err
        }

        pipeReader, pipeWriter := io.Pipe()

        go func() {
            defer pipeWriter.Close()

            file, err := os.Open(filePath)
            if err != nil {
                return
            }
            defer file.Close()

            io.Copy(pipeWriter, file)
        }()

        io.Copy(part, pipeReader)
    }

    err := writer.Close()
    if err != nil {
        return err
    }

    request, err := http.NewRequest("POST", fullPath, formValue)
    if err != nil {
        return err
    }
    request.Header.Set("Content-Type", writer.FormDataContentType())

    resp, err := s.doer.Do(request)
    if err != nil {
        return errors.Wrap(err, "Cannot send request to send email")
    }
    defer resp.Body.Close()

    b, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        return err
    }

    if resp.StatusCode != http.StatusOK {
        return errors.New(fmt.Sprintf("Send email with code %s error: status code %d, response %s",
            templateCode, resp.StatusCode, string(b)))
    } else {
        logrus.Infof("Send email with attachment ,code %s success with response %s , box-code", templateCode, string(b), filePaths)
    }
    return nil
}

希望对你有帮助!

英文:

I am newbie in kafka, i try build a service send mail with attach files.
Execution flow:

  • Kafka will receive a message to send mail
  • function get file will download file from url , scale image, and save file
  • when send mail i will get files from folder and attach to form
    Issues:
  • when i send mail with large files many times , kafka retry many times, i will receive many mail

kafka error: "kafka server: The provided member is not known in the current generation"

I listened MaxProcessingTime , but i try to test a mail with large file, it still work fine

Kafka info : 1 broker , 3 consumer

func (s *customerMailService) SendPODMail() error { filePaths, err := DownloadFiles(podURLs, orderInfo.OrderCode)
if err != nil{
countRetry := 0
for countRetry &lt;= NUM_OF_RETRY{
filePaths, err = DownloadFiles(podURLs, orderInfo.OrderCode)
if err == nil{
break
}
countRetry++
}
}
err = s.sendMailService.Send(ctx, orderInfo.CustomerEmail, tmsPod, content,filePaths)}

function download file :

func DownloadFiles(files []string, orderCode string) ([]string, error) {
var filePaths []string
err := os.Mkdir(tempDir, 0750)
if err != nil &amp;&amp; !os.IsExist(err) {
return nil, err
}
tempDirPath := tempDir + &quot;/&quot; + orderCode
err = os.Mkdir(tempDirPath, 0750)
if err != nil &amp;&amp; !os.IsExist(err) {
return nil, err
}
for _, fileUrl := range files {
fileUrlParsed, err := url.ParseRequestURI(fileUrl)
if err != nil {
logrus.WithError(err).Infof(&quot;Pod url is invalid %s&quot;, orderCode)
return nil, err
}
extFile := filepath.Ext(fileUrlParsed.Path)
dir, err := os.MkdirTemp(tempDirPath, &quot;tempDir&quot;)
if err != nil {
return nil, err
}
f, err := os.CreateTemp(dir, &quot;tmpfile-*&quot;+extFile)
if err != nil {
return nil, err
}
defer f.Close()
response, err := http.Get(fileUrl)
if err != nil {
return nil, err
}
defer response.Body.Close()
contentTypes := response.Header[&quot;Content-Type&quot;]
isTypeAllow := false
for _, contentType := range contentTypes {
if contentType == &quot;image/png&quot; || contentType == &quot;image/jpeg&quot; {
isTypeAllow = true
}
}
if !isTypeAllow {
logrus.WithError(err).Infof(&quot;Pod image type is invalid %s&quot;, orderCode)
return nil, errors.New(&quot;Pod image type is invalid&quot;)
}
decodedImg, err := imaging.Decode(response.Body)
if err != nil {
return nil, err
}
resizedImg := imaging.Resize(decodedImg, 1024, 0, imaging.Lanczos)
imaging.Save(resizedImg, f.Name())
filePaths = append(filePaths, f.Name())
}
return filePaths, nil}

function send mail

func (s *tikiMailService) SendFile(ctx context.Context, receiver string, templateCode string, data interface{}, filePaths []string) error {
path := &quot;/v1/emails&quot;
fullPath := fmt.Sprintf(&quot;%s%s&quot;, s.host, path)
formValue := &amp;bytes.Buffer{}
writer := multipart.NewWriter(formValue)
_ = writer.WriteField(&quot;template&quot;, templateCode)
_ = writer.WriteField(&quot;to&quot;, receiver)
if data != nil {
b, err := json.Marshal(data)
if err != nil {
return errors.Wrapf(err, &quot;Cannot marshal mail data to json with object %+v&quot;, data)
}
_ = writer.WriteField(&quot;params&quot;, string(b))
}
for _, filePath := range filePaths {
part, err := writer.CreateFormFile(filePath, filepath.Base(filePath))
if err != nil {
return err
}
pipeReader, pipeWriter := io.Pipe()
go func() {
defer pipeWriter.Close()
file, err := os.Open(filePath)
if err != nil {
return 
}
defer file.Close()
io.Copy(pipeWriter, file)
}()
io.Copy(part, pipeReader)
}
err := writer.Close()
if err != nil {
return err
}
request, err := http.NewRequest(&quot;POST&quot;, fullPath, formValue)
if err != nil {
return err
}
request.Header.Set(&quot;Content-Type&quot;, writer.FormDataContentType())
resp, err := s.doer.Do(request)
if err != nil {
return errors.Wrap(err, &quot;Cannot send request to send email&quot;)
}
defer resp.Body.Close()
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
if resp.StatusCode != http.StatusOK {
return errors.New(fmt.Sprintf(&quot;Send email with code %s error: status code %d, response %s&quot;,
templateCode, resp.StatusCode, string(b)))
} else {
logrus.Infof(&quot;Send email with attachment ,code %s success with response %s , box-code&quot;, templateCode, string(b),filePaths)
}
return nil
}

Thank

答案1

得分: 2

我的团队在重新部署k8s pods时发现了我的问题,这导致了冲突的领导者分区,从而引起了重新平衡。它将尝试再次处理pods缓冲区中的剩余消息。

解决方案:我不会获取保存在缓冲区中的许多消息,我只获取一条消息并按照配置进行处理:

ChannelBufferSize = 0

示例冲突的领导者分区:

消费者A和B同时启动
消费者A将自己注册为领导者,并拥有所有分区的主题
消费者B将自己注册为领导者,然后开始重新平衡并拥有所有分区
消费者A重新平衡并获取所有分区,但无法消费,因为成员ID已过时,需要一个新的成员ID
消费者B再次重新平衡并拥有所有分区,但这已经被消费者A获取了
英文:

My team found my problem when I redeploy k8s pods, which lead to conflict leader partition causing rebalance. It will try to process the remaining messages in buffer of pods again.

Solution: I don't fetch many messages saved in buffer , I just get a message and process it by config :

ChannelBufferSize = 0

Example conflict leader parition:

consumer A and B startup in the same time
consumer A registers itself as leader, and owns the topic with all partitions
consumer B registers itself as leader, and then begins to rebalance and owns all partitions
consumer A rebalance and obtains all partitions, but can not consume because the memberId is old and need a new one
consumer B rebalance again and owns the topic with all partitions, but it&#39;s already obtained by consumer A

答案2

得分: 0

我的建议是:对于非常大的附件,消费者需要花费相当长的时间来读取文件并将其作为附件发送。

这会增加两个poll()调用之间的时间间隔。如果该时间大于max.poll.interval.ms,则认为消费者失败,并且不会提交分区偏移量。结果是消息会再次被处理,如果执行时间恰好低于轮询间隔,则最终会提交偏移量。这会导致多次发送邮件。

尝试增加消费者端的max.poll.interval.ms

英文:

My two cents: in case of very big attachments, the consumer takes quite a lot of time to read the file and to send it as an attachment.

This increases the amount of time between two poll() calls. If that time is greater than max.poll.interval.ms, the consumer is thought to be failed and the partition offset is not committed. As a result, the message is processed again and eventually, if by chance the execution time stays below the poll interval, the offset is committed. The effect is a multiple email send.

Try increasing the max.poll.interval.ms on the consumer side.

huangapple
  • 本文由 发表于 2022年4月4日 12:53:03
  • 转载请务必保留本文链接:https://go.coder-hub.com/71732182.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定