Django API with MQTT Integration - Connection Issues and Design Advice

Hi everyone,

I’m building a REST API in Django using Django REST Framework, and I need your help with two aspects of my project: resolving an issue with MQTT integration and validating the design structure.

Project Overview

The API helps users interact with their home devices:

  • Read current device settings: When a user connects to the app, the authentication process triggers an API endpoint to retrieve the device’s settings via an MQTT message.
  • Update device settings: Users can modify settings via another API call that publishes changes to the device through the MQTT broker.

How It Works

  1. When a user authenticates, the app sends a request to the API to fetch the device’s settings.
  2. The API establishes an MQTT connection with the broker, subscribes to a topic specific to the device, and waits for a message containing the device’s current settings.
  3. The received settings are returned in the API response.
  4. The API does not maintain a constant connection to the broker. It only connects when a request is made.
  5. I do not store the settings in a database or cache. The API always retrieves them in real-time from the MQTT broker.

The Issue

Sometimes, the MQTT client encounters a socket error when trying to connect to the broker. Here’s what I’ve ensured so far:

  • The client certificate, key, and CA certificate are valid.
  • The broker configuration (name, port, SSL/TLS settings) is correct.
  • Logging shows that the connection occasionally fails with a socket error.

Here’s the core API and MQTT client code:

API Endpoint Code
class ProductSettingsView(GenericAPIView):
    permission_classes = [IsAuthenticated]
    serializer_class = ProductSerializer

def post(self, request, *args, **kwargs):
    serializer = self.get_serializer(data=request.data)
    if serializer.is_valid(raise_exception=True):
        product_id = serializer.validated_data['product_id']
        product = ALL_POMP_AO_PRODUCTS.get(owner_id=self.request.user.id, id=product_id)

    user = self.request.user
    if not user:
        raise exceptions.BadRequest(
            {'detail': _("User doesn't exist"), "code": "user_do_not_exist"}
        )

    if not product:
        raise exceptions.BadRequest(
            {"detail": _("User/Product relation violation"), "code": "invalid_relation"}
        )

    try:
        certificate = Certificate.objects.get(client_id=user.id)
    except Certificate.DoesNotExist:
        raise exceptions.BadRequest(
            {"detail": _("Certificate doesn't exist"), "code": "certificate_do_not_exist"}
        )

    try:
        with MqttClient(user=str(user.uuid), certificate=certificate.certificate, key=certificate.key, product=product) as mqtt_client:
            message = mqtt_client.get_message()
            if message:
                return Response(message, status=status.HTTP_200_OK)
            else:
                return Response(
                    {"error": "Timeout or no message received"},
                    status=status.HTTP_504_GATEWAY_TIMEOUT
                )
    except exceptions.BadRequest as e:
        return Response(e.detail, status=status.HTTP_400_BAD_REQUEST)
MQTT Client Code
class MqttProduct:
    def __init__(self, user, certificate, key, product):
        self.user_name = user
        self.product = product
        self.certfile = certificate
        self.keyfile = key

        self.ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
        self.ssl_context.load_cert_chain(certfile=self.certfile, keyfile=self.keyfile)
        self.ssl_context.load_verify_locations(cafile=ENV.path("MQTT_SERVER_CA"))

        self.mqtt_client = mqtt.Client(client_id=self.user_name, clean_session=True)
        self.mqtt_client.tls_set_context(self.ssl_context)
        self.mqtt_client.on_connect = self.on_connect
        self.mqtt_client.on_message = self.on_message
        self.lock = threading.Condition()

    def on_connect(self, client, userdata, flags, rc):
        if rc == 0:
            print("Connected to broker")
            self.mqtt_client.subscribe(f"path/{self.product.name}/settings", qos=1)
        else:
            print("Failed to connect, return code %d\n", rc)

    def on_message(self, client, userdata, msg):
        with self.lock:
            # Process my message here
            self.lock.notify()

    def get_message(self):
        try:
            self.connection_ready = threading.Event()
            self.mqtt_client.connect(self.BROKER, self.BROKERPORT, 60)
            self.mqtt_client.loop_start()

            if not self.connection_ready.wait(timeout=10):  # Wait for connection
                print("Connection timeout")
                return None
        
            with self.lock:
                if not self.lock.wait(timeout=10):
                    print("Timeout waiting for message")
                    return None

            with self.lock:
                    return list(self.messages.values())
        
        except (Exception, ssl.SSLError) as e:
            logging.error(f"MQTT connection failed: {e}")
            raise exceptions.BadRequest(
                {
                    "detail": _("MQTT Connection Error"),
                   "code": "mqtt_connection_error"
                }
            )
        finally:
            self.stop()

My Question About the Design

Currently, my API fetches device settings in real-time through the MQTT broker when requested. I’m not using a database or cache to store these settings.

I’m considering an alternative approach:

  • Using InfluxDB: Store incoming MQTT payloads (settings data) in InfluxDB, and modify the API to fetch the settings from InfluxDB instead of directly querying the broker in real time.

My reasoning is that it may improve reliability and reduce the chance of connection issues with the broker during API calls.


Request for Help

  1. What could be causing the socket errors despite providing correct certificates and settings?
  2. Is my current structure (fetching settings in real-time without a database or cache) logical for my use case?
  3. Would using InfluxDB (or another database) to store MQTT payloads and querying from there be a better practice?
  4. Are there other best practices for designing such a system?

I’d love to hear your advice and suggestions. Thanks in advance for your time and help!