Publish messages to MQTT using Celery-Beat scheduled tasks

Hello everyone,

I am developing a Django+Vue web application along with an MQTT server. I want to implement Celery Beat to execute scheduled tasks, which I define in the “calendarios” models where I input the tasks according to the date, interval, repetition, and others. In “eventosCalendarios,” I save each task with the time and date to execute. What I aim for is that upon executing each task, I can publish to a specific topic on the MQTT server.

Models.py

class calendarios(models.Model):
    nombre=models.CharField(max_length= 100, verbose_name='Nombre', unique=True)
    acciones=models.ForeignKey(acciones, null=True,blank=True,on_delete=models.CASCADE,verbose_name='Acciones')
    fecha_inicio = models.DateField(null=True, blank=True)
    fecha_fin = models.DateField(null=True, blank=True)
    repeticion= models.CharField(max_length=1, choices=repeticionChoices, default='D') # Ej: 'diaria', 'semanal', 'mensual'
    intervalo = models.IntegerField(null=True, blank=True)  # Número de días, semanas o meses dependiendo de la repeticion
    todoCultivo= models.CharField(max_length=1, choices=SioNO, default='S')
    hora_repeticion_1 = models.TimeField(null=True, blank=True)  # Hora de la primera repetición diaria
    hora_repeticion_2 = models.TimeField(null=True, blank=True)  # Hora de la segunda repetición diaria
    hora_repeticion_3 = models.TimeField(null=True, blank=True)
    cultivo=models.ManyToManyField(cultivo, verbose_name="Cultivos",blank=True)
    plantas=models.ManyToManyField(plantas, verbose_name="Plantas",blank=True)
    


    def __str__(self):
        return self.nombre

    class Meta:
        verbose_name = 'Calendario'
        verbose_name_plural = 'Calendarios'
        db_table = 'calendarios'
        ordering = ['id']

class eventosCalendarios(models.Model):
    
    title = models.CharField(max_length=100)
    start = models.DateTimeField()
    calendario = models.ForeignKey(calendarios, on_delete=models.CASCADE)
    allDay = models.BooleanField(default=False)
    
    def __str__(self):
        return self.title
 
    class Meta:
        verbose_name = 'Evento calendario'
        verbose_name_plural = 'Eventos calendarios'
        db_table = 'eventos_calendarios'
        ordering = ['id']

I have already implemented a method in views.py that allows me to publish when I access the following URL from the browser: “http://localhost:8000/publish/”.

View.py

from django.views.decorators.csrf import csrf_exempt
import paho.mqtt.publish as publish
from django.http import HttpResponse
import os 
import json

@csrf_exempt
def publish_message(request):
    message = {
            'interface': 'send_aio'
        }
    # Definir las credenciales del cliente MQTT
    auth = {
        'username': '***',
        'password': '***'
    }
    publish.single("comandos", json.dumps(message), hostname=os.environ['MQTT_SERVER'], auth=auth)
    return HttpResponse("Published")

Urls.py

path('publish/', publish_message, name= 'publish' )

Now, I want to implement it with Celery-Beat in tasks.py. At the moment, I have managed that when there is a task, it returns a message at the scheduled date and time. I need to execute publish_message each time a task is executed. I appreciate your help.

Tasks.py

from django.utils import timezone
from .models import eventosCalendarios
from celery import shared_task

@shared_task
def ejecutar_evento_programado():
    ahora = timezone.localtime(timezone.now())  # Convertir la hora actual a la zona horaria local

    # Obtener el evento más cercano a la hora actual que aún no ha ocurrido
    evento_mas_cercano = eventosCalendarios.objects.filter(start__gte=ahora).order_by('start').first()
    

    if evento_mas_cercano:
        # Si la fecha y hora del evento coinciden con la fecha y hora actual, ejecutar el evento
        if evento_mas_cercano.start == ahora:
            ejecutar_evento.delay(evento_mas_cercano.id)  # Usamos .delay() para programar la tarea
            
            evento_mas_cercano.delete()  # Eliminar el evento después de ejecutarlo
            
        # Calcular el tiempo de espera hasta la hora de inicio del evento
        tiempo_espera = evento_mas_cercano.start - ahora

        # Programar la ejecución de la tarea para la hora de inicio del evento
        ejecutar_evento.apply_async(args=[evento_mas_cercano.id], countdown=tiempo_espera.total_seconds())
        # Convertir la hora del evento a la zona horaria local
        hora_evento_local = evento_mas_cercano.start.astimezone(timezone.get_current_timezone())

        return f"La proxima tarea a ejecutar es '{evento_mas_cercano.title}' a las {hora_evento_local}"

    else:
        return "No hay eventos pendientes en este momento"

@shared_task
def ejecutar_evento(evento_id):
    evento = eventosCalendarios.objects.get(id=evento_id)
    # Aquí ejecuta la acción asociada al evento
    return f"Ejecutando evento '{evento.title}'"

Call your publish.single method within your task - or create a function similar to publish_message that isn’t a view, and call it from your task. (A Celery task doesn’t get a request and doesn’t return a response, so it doesn’t make sense to call a view from a task.)