Timestamp of current time changes to 1970-01-20 after serialisation by SerializingProducer

Question

I've been using the confluent-kafka[avro] (2.1.1) library to send the result of an AWS Lambda to our Kafka cluster with an Avro schema.

The serialisation of the schema is fine, except for the first key, "creation_time", which is invariably set to something like 1970-01-20T13:34:12.378Z when received by Kafka (as visualised in AKHQ).
The timestamp of the message itself is fine as long as I leave it at the default, but if I try to use the same timestamp as in the schema, the message is shown in AKHQ as sent 58 years ago.

I have the problem in all our Kafka environments and I can reproduce it on my local dev environment.

I tried to debug the code, but I can't get any info after the serialisation. Here's what I'm sure of:

Timestamp variable content just before serialisation (float): 1690451888.45323
Time received in the AKHQ message: 1970-01-20T13:34:11.888Z

Converting that received time back gives 1690451 as a timestamp.
I initially thought it was somehow truncated before serialisation, but it doesn't look like it.
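To double-check that conversion, here's a quick sketch using only the standard library (the received time is hard-coded from what AKHQ shows):

from datetime import datetime, timezone

# The time displayed in AKHQ, parsed as a UTC datetime.
received = datetime(1970, 1, 20, 13, 34, 11, 888000, tzinfo=timezone.utc)

# Prints 1690451.888 -- the leading digits of the original float.
print(received.timestamp())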

Here's how I get my timestamp in the values:

from datetime import datetime
from dateutil import tz  # python-dateutil, used for the timezone lookup

timestamp = datetime.now(tz=tz.gettz(self.config.timezone)).timestamp()
values = {
    "creationTime": timestamp,
    "description": message,
    "eventTypeId": self.config.metric_name,
    "pgd": [],
    "contracts": [],
    "points": [],
    "objects": [],
}

My Kafka code:

"""This module contains everything necessary to send messages to kafka"""
import logging

from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import StringSerializer

from param_store_models import KafkaInfo

LOGGER = logging.getLogger(__name__)


class KafkaProducer:
    """Class used to send messages to kafka"""

    def __init__(self, schema: str, kafka_info: KafkaInfo):
        producer_ssm_conf = {
            "bootstrap.servers": kafka_info.bootstrap_servers,
            "security.protocol": kafka_info.security_protocol,
            "sasl.mechanism": kafka_info.sasl_mecanism,
            "sasl.username": kafka_info.sasl_username,
            "sasl.password": kafka_info.sasl_password,
        }
        registry_ssm_conf = {"url": kafka_info.schema_registry_url}

        serializer = AvroSerializer(
            SchemaRegistryClient(registry_ssm_conf), schema, conf={"auto.register.schemas": False}
        )

        producer_default_conf = {
            "value.serializer": serializer,
            "key.serializer": StringSerializer(),
            "enable.idempotence": "true",
            "max.in.flight.requests.per.connection": 1,
            "retries": 5,
            "acks": "all",
            "retry.backoff.ms": 500,
            "queue.buffering.max.ms": 500,
            "error_cb": self.delivery_report,
        }

        self.__serializing_producer = SerializingProducer({**producer_default_conf, **producer_ssm_conf})

    def produce(self, topic: str, key=None, value=None, timestamp=0):
        """Asynchronously produce message to a topic"""
        LOGGER.info(f"Produce message {value} to topic {topic}")
        self.__serializing_producer.produce(topic, key, value, on_delivery=self.delivery_report, timestamp=timestamp)

    def flush(self):
        """
        Flush messages and trigger callbacks
        :return: Number of messages still in queue.
        """
        LOGGER.debug("Flushing messages to kafka")
        return self.__serializing_producer.flush()

    @staticmethod
    def delivery_report(err, msg):
        """
        Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush().
        """
        if err:
            LOGGER.error(f"Kafka message delivery failed: {err}")
        else:
            LOGGER.info(f"Kafka message delivered to {msg.topic()} [{msg.partition()}]")

My Avro schema (partially redacted to hide customer info):

{
  "type": "record",
  "name": "EventRecord",
  "namespace": "com.event",
  "doc": "Schéma d'un évènement de supervision brut",
  "fields": [
    {
      "name": "creationTime",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }
    },
    {
      "name": "eventTypeId",
      "type": "string"
    },
    {
      "name": "internalProductId",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "description",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "contracts",
      "type": {
        "type": "array",
        "items": "string"
      }
    },
    {
      "name": "points",
      "type": {
        "type": "array",
        "items": "string"
      }
    },
    {
      "name": "objects",
      "type": {
        "type": "array",
        "items": "string"
      }
    }
  ]
}

Received message on the AKHQ topic:

{
  "creationTime": "1970-01-20T13:34:12.378Z",
  "eventTypeId": "test",
  "description": "Test",
  "pgd": [],
  "contracts": [],
  "points": [],
  "objects": []
}

Versions:

confluent-kafka[avro]==2.1.1

Kafka broker = latest

Operating system: Windows (Kafka running in Docker) / AWS ECS (also Docker containers)

Answer 1

Score: 1

Your timestamp variable is created like so:

timestamp = datetime.now(tz=tz.gettz(self.config.timezone)).timestamp()

The timestamp() function you call returns the number of seconds since epoch. However, you are using the timestamp-millis logical type which expects milliseconds since epoch.

So the easiest solution is just to multiply your timestamp by 1000.
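A minimal sketch of that fix, reusing the timestamp and values from the question:

# timestamp() gave seconds since epoch as a float;
# the timestamp-millis logical type expects integer milliseconds.
timestamp_ms = int(timestamp * 1000)  # 1690451888.45323 -> 1690451888453
values["creationTime"] = timestamp_ms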

Another option: most libraries support logical types, so you should be able to set creationTime to the datetime object itself and the library will do the conversion to milliseconds during serialization.
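For example, a minimal sketch of that approach (this relies on the serializer's underlying Avro library, fastavro, handling the logical-type conversion):

from datetime import datetime, timezone

# A timezone-aware datetime; the Avro library converts it to
# milliseconds since epoch for the timestamp-millis logical type.
values["creationTime"] = datetime.now(timezone.utc)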
