telegraf: AWS IoT-Core to InfluxDB: i/o timeout


Question

I understand that you need assistance with a technical issue related to configuring Telegraf and connecting to AWS IoT-Core. It appears that you are experiencing connectivity issues when using Telegraf to subscribe to an MQTT topic on AWS IoT-Core.

Based on the provided information, you've tried various configurations and versions of Telegraf, but the issue persists. To further troubleshoot and resolve this issue, here are some steps you can take:

  1. Check Security Policies in AWS IoT-Core:
    Ensure that the security policy associated with your AWS IoT Thing allows the necessary permissions for the MQTT action, especially if you've made changes to the security policy.

  2. Double-Check Certificate and Key Files:
    Confirm that the certificate (cert.pem), private key (key.pem), and root certificate (ca.pem) files are correctly placed in the container and have the appropriate permissions. Any issues with these files can lead to TLS handshake failures.

  3. Verify Network Connectivity:
    Ensure that your container has outbound network connectivity to the AWS IoT-Core endpoint on port 8883 or 443 (depending on your configuration). You mentioned that the container can ping the AWS server, but a successful ping only shows that the host is reachable via ICMP; it does not show that the TCP port is open. Double-check that there are no network-level restrictions or firewalls blocking outgoing traffic on the specific port.

  4. Review AWS IoT-Core Endpoint:
    Verify that the AWS IoT-Core endpoint (<AWS-ID>.iot.<LOCATION>.amazonaws.com) is correctly configured in your code. Make sure there are no typos or issues with the endpoint URL.

  5. TLS Version and Cipher Suites:
    You mentioned that Telegraf sometimes uses TLS 1.2 or TLS 1.3. Ensure that the TLS version and cipher suites configured in Telegraf are compatible with AWS IoT-Core's requirements. You may need to experiment with different TLS configurations to find one that works.

  6. Debugging Telegraf:
    You can enable debugging in Telegraf by setting debug = true in the [agent] section of your telegraf.conf file. This will provide more detailed logs that may help diagnose the issue. Monitor the logs for any specific error messages that could pinpoint the problem.

  7. Review AWS IoT Logs:
    Check the AWS IoT-Core logs to see if there are any error messages or indications of why the connection is failing. AWS CloudWatch Logs or other AWS IoT-Core logging mechanisms can be helpful for this.

  8. Packet Capture Analysis:
    Continue using packet capture tools like Wireshark to analyze the network traffic. Look for any anomalies or errors in the TLS handshake process. Pay attention to any TLS alerts or renegotiations.

  9. AWS Support:
    If the issue persists after trying these steps, consider reaching out to AWS Support for assistance. They can provide more detailed insights into the AWS IoT-Core side of the configuration.

  10. Community Forums:
    Consider posting your issue on relevant technical forums or communities where others with experience in AWS IoT-Core and Telegraf may be able to offer insights and solutions.

Remember to make one change at a time and carefully monitor the results to isolate the source of the problem. Troubleshooting network and TLS issues can be complex, so a systematic approach is essential.
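Step 3 above can be sketched as a small reachability check. This is a minimal sketch, not a definitive tool: it uses only the Python standard library, the function name check_tcp_port is made up for illustration, and it tests nothing but the TCP connect (no TLS, no MQTT). A "timeout" result is what you would typically expect when a firewall silently drops the outgoing SYN, whereas "refused" means the remote side actively rejected the connection.

```python
import socket


def check_tcp_port(host: str, port: int, timeout: float = 5.0) -> str:
    """Attempt a plain TCP connect and classify the outcome.

    Hypothetical helper for illustration; it does not speak TLS or MQTT.
    """
    try:
        # create_connection resolves the host name and performs the TCP handshake
        with socket.create_connection((host, port), timeout=timeout):
            return "open"
    except socket.timeout:
        # no answer at all within the timeout, e.g. SYN dropped by a firewall
        return "timeout"
    except ConnectionRefusedError:
        # the peer answered with a reset: nothing is listening on that port
        return "refused"
    except OSError as exc:
        # DNS failures, unreachable networks, and similar errors
        return f"error: {exc}"
```

Run from inside the container, a call such as check_tcp_port("<AWS-ID>.iot.<LOCATION>.amazonaws.com", 8883) for each candidate port (8883, 443, 8443) shows which ports are reachable at the TCP level, which ping cannot tell you.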


I have an existing, working setup that uses InfluxDB's Native Subscriptions to transfer data from AWS IoT-Core (as the MQTT broker) to our InfluxDB Cloud instance. Native Subscriptions are being removed as a feature in InfluxDB, and while I can still use them at the moment, there's no guarantee in the long run. The setup used port 8883.

Solution: use InfluxData's Telegraf to achieve the same functionality. The data is already sent in 'influx' line protocol, so no additional parsing is necessary. My setup uses the telegraf:1.27.1 Docker image to deploy Telegraf in a container. I'm volume-mapping the certificates and the Telegraf configuration into the container.
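For readers unfamiliar with the 'influx' line protocol mentioned above, here is a rough sketch of what such a payload looks like and how one could be built. The helper names and the example measurement, tag, and field names are hypothetical, not taken from the actual devices; the escaping covers only the common cases (commas, spaces, equals signs, quotes).

```python
from typing import Dict, Optional, Union

FieldValue = Union[bool, int, float, str]


def _escape(text: str, chars: str) -> str:
    """Backslash-escape every character of `chars` that occurs in `text`."""
    for ch in chars:
        text = text.replace(ch, "\\" + ch)
    return text


def _format_field(value: FieldValue) -> str:
    # bool is checked before int because bool is a subclass of int
    if isinstance(value, bool):
        return "true" if value else "false"
    if isinstance(value, int):
        return f"{value}i"  # integers carry an 'i' suffix
    if isinstance(value, float):
        return repr(value)
    # strings are double-quoted; backslashes and quotes are escaped
    return '"' + _escape(value, '\\"') + '"'


def to_line_protocol(
    measurement: str,
    tags: Dict[str, str],
    fields: Dict[str, FieldValue],
    timestamp_ns: Optional[int] = None,
) -> str:
    """Build one line: measurement,tag=value field=value [timestamp]."""
    if not fields:
        raise ValueError("line protocol requires at least one field")
    head = _escape(measurement, ", ")
    for key in sorted(tags):
        head += f",{_escape(key, ',= ')}={_escape(tags[key], ',= ')}"
    body = ",".join(
        f"{_escape(key, ',= ')}={_format_field(value)}"
        for key, value in fields.items()
    )
    line = f"{head} {body}"
    if timestamp_ns is not None:
        line += f" {timestamp_ns}"
    return line
```

A payload in this shape is what data_format = "influx" in the mqtt_consumer input expects to find in each MQTT message.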

Here's the telegraf.conf file:

# Global settings
[agent]
  interval = "5s"
  round_interval = true
  metric_batch_size = 1000
  metric_buffer_limit = 10000
  collection_jitter = "0s"
  flush_interval = "10s"
  flush_jitter = "0s"
  precision = "1ms"
  debug = true
  quiet = false

[[inputs.mqtt_consumer]]
  servers = ["ssl://<AWS-ID>.iot.<LOCATION>.amazonaws.com:8883"]
  qos = 0
  connection_timeout = "30s"
  topics = ["374174.016/data"]
  client_id = "27-telegraf-e9cabc28-6a2b-4423-be74-31fb3dd231cd"
  data_format = "influx"

  # SSL configuration
  tls_ca = "/etc/telegraf/ca.pem"
  tls_cert = "/etc/telegraf/cert.pem"
  tls_key = "/etc/telegraf/key.pem"
  # insecure_skip_verify = false

[[outputs.influxdb_v2]]
  urls = ["https://<LOCATION>.aws.cloud2.influxdata.com/"]
  token = "[REDACTED]"
  organization = "[REDACTED]"
  bucket = "TELEGRAF_DEV"

Here are the startup (and failure) logs for telegraf:

2023-07-06T12:54:11Z I! Loading config: /etc/telegraf/telegraf.conf
2023-07-06T12:54:11Z I! Starting Telegraf 1.27.1
2023-07-06T12:54:11Z I! Available plugins: 237 inputs, 9 aggregators, 28 processors, 23 parsers, 59 outputs, 4 secret-stores
2023-07-06T12:54:11Z I! Loaded inputs: mqtt_consumer
2023-07-06T12:54:11Z I! Loaded aggregators: 
2023-07-06T12:54:11Z I! Loaded processors: 
2023-07-06T12:54:11Z I! Loaded secretstores: 
2023-07-06T12:54:11Z I! Loaded outputs: influxdb_v2
2023-07-06T12:54:11Z I! Tags enabled: host=d847c648021c
2023-07-06T12:54:11Z I! [agent] Config: Interval:5s, Quiet:false, Hostname:"d847c648021c", Flush Interval:10s
2023-07-06T12:54:11Z D! [agent] Initializing plugins
2023-07-06T12:54:11Z D! [agent] Connecting outputs
2023-07-06T12:54:11Z D! [agent] Attempting connection to [outputs.influxdb_v2]
2023-07-06T12:54:11Z D! [agent] Successfully connected to outputs.influxdb_v2
2023-07-06T12:54:11Z D! [agent] Starting service inputs
2023-07-06T12:55:11Z E! [telegraf] Error running agent: starting input inputs.mqtt_consumer: network Error : read tcp 172.17.0.2:51268->52.29.126.248:8443: i/o timeout
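The last log line follows the usual layout of Go network errors: operation, local address, remote address, reason. The sketch below pulls those parts out with a regular expression. It handles IPv4 addresses only, and the names NET_ERROR and parse_net_error are made up for illustration. Two details are worth noticing in this particular line. The operation is read rather than dial, which in Go normally means the TCP connection itself was opened and the timeout hit while waiting for data. The remote port is 8443, so this log presumably comes from one of the runs with a port other than the 8883 shown in the config above.

```python
import re
from typing import Dict, Optional

# Matches Go network errors such as:
#   read tcp 172.17.0.2:51268->52.29.126.248:8443: i/o timeout
# IPv4 only; this is an illustrative pattern, not part of Telegraf.
NET_ERROR = re.compile(
    r"(?P<op>\w+) tcp (?P<local_ip>[\d.]+):(?P<local_port>\d+)"
    r"->(?P<remote_ip>[\d.]+):(?P<remote_port>\d+): (?P<reason>.+)$"
)


def parse_net_error(line: str) -> Optional[Dict[str, str]]:
    """Return the parts of a Go network error found in `line`, or None."""
    match = NET_ERROR.search(line)
    return match.groupdict() if match else None
```

Comparing remote_port against the port in telegraf.conf is a quick way to confirm which configuration a given log actually belongs to.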

I've also written a Python script that just reads a couple of messages off the IoT-Core from the same topic using the same certificate files (Python 3.9.6, awscrt==0.14.7, awsiot==0.1.3):

import json
import os.path
import time
import uuid
from dataclasses import dataclass
from typing import Callable, List, Any, Dict

from awscrt import io
from awscrt.mqtt import QoS
from awsiot import mqtt_connection_builder  # type: ignore


class AWSMQTTClient:

    def __init__(
            self,
            host_name: str,
            client_id: str,
            cert_dir: str = "."
    ):
        # paths to the certificate files (the AWS library expects file paths)
        self.cert_file = f"{cert_dir}/cert.pem"
        self.priv_file = f"{cert_dir}/key.pem"
        self.root_file = f"{cert_dir}/ca.pem"

        # check if files exist
        if not os.path.exists(self.cert_file):
            raise FileNotFoundError(f"cert file not found: {self.cert_file}")
        if not os.path.exists(self.priv_file):
            raise FileNotFoundError(f"priv file not found: {self.priv_file}")
        if not os.path.exists(self.root_file):
            raise FileNotFoundError(f"root file not found: {self.root_file}")

        # set up connection object
        event_loop_group = io.EventLoopGroup(1)
        host_resolver = io.DefaultHostResolver(event_loop_group)
        client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)
        self.mqtt_connection = mqtt_connection_builder.mtls_from_path(
            endpoint=host_name,
            cert_filepath=self.cert_file,
            pri_key_filepath=self.priv_file,
            client_bootstrap=client_bootstrap,
            ca_filepath=self.root_file,
            client_id=f"{client_id}_{uuid.uuid4()}",
            clean_session=False,
            keep_alive_secs=6,
        )

    def subscribe_to_topic(
            self,
            topic_name: str,
            duration: float,
            on_receive: Callable,
    ) -> None:
        # wait until the connection is established before subscribing
        _connect_future = self.mqtt_connection.connect()
        _connect_future.result()

        sub = self.mqtt_connection.subscribe(topic=topic_name, qos=QoS.AT_LEAST_ONCE, callback=on_receive)
        time.sleep(duration)
        _disconnect_future = self.mqtt_connection.disconnect()
        _disconnect_future.result()
        if sub[0].running():
            raise ConnectionError("connection not closed")


@dataclass
class TopicInfo:
    topic: str
    message: str


@dataclass
class TopicResponse:
    topic_infos: List[TopicInfo]

    def add_topic(self, topic_info: TopicInfo):
        self.topic_infos.append(topic_info)

    def on_topic_received(
            self,
            topic: str,
            payload: bytes,
            dup: Any, qos: Any, retain: Any,
            **kwargs: Dict[Any, Any],
    ) -> None:
        try:
            message_json = json.loads(payload)
            if "message" in message_json.keys():
                message = message_json["message"]
            else:
                message = f"{message_json}"
        except json.decoder.JSONDecodeError:
            message = f"{payload.decode('ascii')}"
        topic_info = TopicInfo(
            topic=topic,
            message=message,
        )
        if topic_info not in self.topic_infos:
            self.add_topic(topic_info=topic_info)


if __name__ == '__main__':

    # settings
    _host_name = "<AWS ID>.iot.<LOCATION>.amazonaws.com"
    _client_id = f"{uuid.uuid4()}"
    _topic = "374174.016/data"

    aws_mqtt_client = AWSMQTTClient(
        host_name=_host_name,
        client_id=_client_id,
    )

    response = TopicResponse(topic_infos=[])
    aws_mqtt_client.subscribe_to_topic(
        topic_name=_topic,
        on_receive=response.on_topic_received,
        duration=7.0,
    )
    print("")
    print("----RECEIVED:----")
    print(response.topic_infos)
    print("-----------------")
    print("")
    assert len(response.topic_infos) != 0

The Python script works as expected: it reads (and logs) a couple of messages. From what I can tell, though, it uses port 443 instead of 8883.

Here's the Makefile I'm using to deploy both telegraf and the Python script listen_to_mqtt.py via docker:

###########################
# Telegraf test setup	  #
###########################

TELEGRAF_IMAGE=telegraf-test-image
TELEGRAF_CONTAINER=telegraf-test-container
TELEGRAF_LOCAL_DIR := $(shell pwd)
TELEGRAF_INTERNAL_CONFIG_DIR = /etc/telegraf

deploy_telegraf:
	docker run \
		--detach \
		--name $(TELEGRAF_CONTAINER) \
		-v $(TELEGRAF_LOCAL_DIR)/ca.pem:$(TELEGRAF_INTERNAL_CONFIG_DIR)/ca.pem \
		-v $(TELEGRAF_LOCAL_DIR)/cert.pem:$(TELEGRAF_INTERNAL_CONFIG_DIR)/cert.pem \
		-v $(TELEGRAF_LOCAL_DIR)/key.pem:$(TELEGRAF_INTERNAL_CONFIG_DIR)/key.pem \
		-v $(TELEGRAF_LOCAL_DIR)/telegraf.conf:$(TELEGRAF_INTERNAL_CONFIG_DIR)/telegraf.conf:ro \
		telegraf:1.27.1

remove_telegraf:
	-docker rm -f $(TELEGRAF_CONTAINER)

watch_logs_telegraf:
	docker logs -f $(TELEGRAF_CONTAINER)

redeploy_telegraf:
	$(MAKE) remove_telegraf
	$(MAKE) deploy_telegraf
	$(MAKE) watch_logs_telegraf

access_container:
	docker exec -it $(TELEGRAF_CONTAINER) bash

# replace this with your Docker network interface
DOCKER_NETWORK_INTERFACE = docker0

analyze_traffic:
	sudo tcpdump -i $(DOCKER_NETWORK_INTERFACE) -U -w ./tcpdump.pcap   #'port 8883'

analyze_traffic_file:
	wireshark ./tcpdump.pcap

analyze_traffic_file_python:
	wireshark ./tcpdump-py.pcap

perform_connection_check:
	docker exec -it $(TELEGRAF_CONTAINER) ping a1q5qvt4b4ldkh-ats.iot.eu-central-1.amazonaws.com

###########################
# Python equivalent       #
###########################

TELEGRAF_PY_IMAGE=telegraf-test-image-py
TELEGRAF_PY_CONTAINER=telegraf-test-container-py
TELEGRAF_PY_DOCKERFILE=telegraf-py.Dockerfile

build_python:
	docker build -t $(TELEGRAF_PY_IMAGE) -f $(TELEGRAF_PY_DOCKERFILE) .

deploy_python:
	docker run \
		--detach --rm \
		--name $(TELEGRAF_PY_CONTAINER) \
		$(TELEGRAF_PY_IMAGE)

watch_logs_python:
	docker logs -f $(TELEGRAF_PY_CONTAINER)

remove_python:
	-docker rm -f $(TELEGRAF_PY_CONTAINER)

redeploy_python:
	$(MAKE) remove_python
	$(MAKE) build_python
	$(MAKE) deploy_python
	$(MAKE) watch_logs_python

And here's the Dockerfile for the Python script:

FROM python:3.9.6
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1

WORKDIR /app

# copy requirement files
COPY ./requirements.txt ./requirements.txt

# install awscli
RUN curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
RUN unzip awscliv2.zip
RUN ./aws/install

# install general dependencies
RUN apt-get update && apt-get install -y --no-install-recommends gcc
RUN python -m pip install --upgrade pip && pip install --no-cache-dir -r requirements.txt

# copy script & certificates
COPY ./listen_to_mqtt.py ./listen_to_mqtt.py
COPY ./ca.pem ./ca.pem
COPY ./cert.pem ./cert.pem
COPY ./key.pem ./key.pem

# run the script
CMD python listen_to_mqtt.py

I used Wireshark and tcpdump to analyze the network traffic from the container, though I'm admittedly new to this. With port 8883, Telegraf fails within ~30 s: I don't see any key exchange, and the [SYN] packet never receives a [SYN, ACK]. With port 443, I do see certificate exchanges, and it takes ~60 s for essentially the same error message to appear: i/o timeout.

So far I've tried:

  • changing the Telegraf version (with 1.18.3 the container doesn't crash but fails to receive messages; later versions crash after the delay mentioned above)
  • changing the port (443, 8883, 8443)
  • pinging the AWS server from inside the Telegraf container before it crashes (works!)
  • setting insecure_skip_verify = false or insecure_skip_verify = true, or leaving it out entirely
  • changing the AWS IoT-Core security policy (e.g. TLS13_1_3_2022_10, TLS12_1_0_2015_01): I can see in Wireshark that Telegraf then uses TLS 1.3 or TLS 1.2, but apart from that there is no major change in behavior

I'm expecting this to be about some tiny setting or configuration in either telegraf or AWS that I just don't know about. I'm not sure how I can proceed after spending close to a day on this already.

Answer 1

Score: 0


It turns out the solution was quite simple, though a bit confusing because of the Python script: the outgoing port was blocked in our network. The Python script (as mentioned) used port 443, so it got through. Port 443 wouldn't work for Telegraf, though, and port 8883 was blocked, which led to the timeout error.
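One likely reason port 443 works for the Python script but not for Telegraf, offered here as an assumption rather than something confirmed in this thread: as far as I understand the AWS IoT documentation, MQTT with X.509 client certificates on port 443 requires the client to send the ALPN protocol name x-amzn-mqtt-ca in the TLS handshake. The AWS SDK appears to do this by default, while a generic MQTT client may not. The sketch below uses only the Python standard library to build such a TLS context and to ask a server which ALPN protocol it selected; the function names are made up for illustration.

```python
import socket
import ssl
from typing import Optional

# ALPN protocol name documented by AWS for MQTT with client certificates on 443
AWS_IOT_MQTT_ALPN = "x-amzn-mqtt-ca"


def build_alpn_context(
    ca_file: Optional[str] = None,
    cert_file: Optional[str] = None,
    key_file: Optional[str] = None,
) -> ssl.SSLContext:
    """Create a verifying TLS client context that offers the AWS IoT ALPN name."""
    # with ca_file=None the system trust store is used
    context = ssl.create_default_context(cafile=ca_file)
    if cert_file and key_file:
        # client certificate for mutual TLS (e.g. cert.pem / key.pem)
        context.load_cert_chain(certfile=cert_file, keyfile=key_file)
    context.set_alpn_protocols([AWS_IOT_MQTT_ALPN])
    return context


def negotiated_alpn(
    host: str,
    context: ssl.SSLContext,
    port: int = 443,
    timeout: float = 10.0,
) -> Optional[str]:
    """Complete a TLS handshake and return the ALPN protocol the server chose."""
    with socket.create_connection((host, port), timeout=timeout) as raw:
        with context.wrap_socket(raw, server_hostname=host) as tls:
            return tls.selected_alpn_protocol()
```

If negotiated_alpn returns "x-amzn-mqtt-ca" for the IoT endpoint when called with a context built from ca.pem, cert.pem, and key.pem, the endpoint accepts MQTT over 443 for clients that offer ALPN. That would be consistent with the behavior described in the answer. Unblocking port 8883, as the answer did, avoids the ALPN question entirely.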

huangapple
  • Published on July 6, 2023, at 21:14:13
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76629239.html