I want to add RabbitMQ as an input to my Django app

My Django application is working just fine. I want to add RabbitMQ so the app can receive external events published to a queue.

I started by importing all the required libraries (pika, etc.).
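As a quick sanity check outside Django, a small standalone script like this can confirm that pika can reach the broker (the host and queue name are the same values I use below):

import pika

# Standalone check: connect to the broker and declare the queue.
# 'localhost' and 'correlator2' match the values used in the rest of this post.
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='correlator2')
print("Connected and queue declared")
connection.close()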

Then I modified the apps.py in my main app as follows.

from django.apps import AppConfig
import logging

logger = logging.getLogger(__name__)
logger.critical("Now logging correlator apps")

class CorrelatorConfig(AppConfig):
    name = 'correlator'

    def ready(self):
        
        logger.critical("CorrelaotrConfig.ready VVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVV")
        import pika, sys, os
        
        import concurrent.futures

        # RabbitMQ connection parameters
        rabbitmq_host = 'localhost'
        queue_name = 'correlator2'
        
        from correlator.api.methods.mqmethods.mqrecievecallback import MqReceiveCallback

        def callback(ch, method, properties, body):
            logger.critical(" Received body = %s ", body)
            logger.critical(" [x] Received properties = %s ", properties)
            logger.critical(" [x] Received method = %s ", method)
            MqReceiveCallback.callback(ch, method, properties, body)


        # Function to consume messages from the RabbitMQ queue
        def consume():
            connection = pika.BlockingConnection(pika.ConnectionParameters(host=rabbitmq_host))
            channel = connection.channel()
            channel.queue_declare(queue=queue_name)

            channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

            logger.critical(f"Waiting for messages in {queue_name}. To exit press CTRL+C")
            channel.start_consuming()

        # Set up a multithreaded daemon to monitor the RabbitMQ queue
        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
            future = executor.submit(consume)
            try:
                future.result()  # This will wait for the consume function to finish
                logger.critical("OOOOOOOOOOOOOOOOOOOOOOOOO thread completed! OOOOOOOOOOOOOOOOOOOO")
            except Exception as e:
                logger.error("OOOOOOOOOOOOOOOOOOOOOO error %s OOOOOOOOOOOOOOOOOOOO", e, exc_info=1)

Pardon the logging for debugging. I got that working, only to find that my app’s web pages and admin pages no longer worked.

When I comment out the ‘with…’ block of code, my web pages and admin pages return to normal.

I suspect I need to set up the RabbitMQ daemon somewhere else.

Correct. You do not want to start threads or processes within your Django process. In your code, future.result() blocks inside ready(), so the process that is supposed to be serving requests never finishes app initialization.

When you have a long-running task that needs to run concurrently with Django, you should run it as an external process of some kind. You could set this up as a custom management command, for example.
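As a rough sketch (the file and command names here are just placeholders), a management command lives under your app's management/commands/ directory and looks something like this:

# correlator/management/commands/consume_events.py  -- name is illustrative
from django.core.management.base import BaseCommand


class Command(BaseCommand):
    help = "Runs the RabbitMQ consumer outside the web-serving processes"

    def handle(self, *args, **options):
        # Open the RabbitMQ connection and block here consuming messages;
        # this runs alongside, not inside, your web server.
        ...

You would then start it as its own process with python manage.py consume_events, alongside your web server.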

Yes, thanks @KenWhitesell, that was the answer. Here is my consumer.py code:

#correlator/management/commands/consumer.py
from django.core.management.base import BaseCommand, CommandError
import logging

logger = logging.getLogger(__name__)
logger.critical("Now logging consumer command")


class Command(BaseCommand):
    help = "Closes the specified poll for voting"

    def add_arguments(self, parser):
        # Number of worker threads for the consumer pool
        parser.add_argument("numthreads", nargs="+", type=int)

    def handle(self, *args, **options):
        self.stdout.write("consumer command")
        logger.critical("CorrelaotrConfig.ready VVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVV")
        import pika, sys, os
        
        import concurrent.futures

        # RabbitMQ connection parameters
        rabbitmq_host = 'localhost'
        queue_name = 'correlator2'
        
        from correlator.api.methods.mqmethods.mqrecievecallback import MqReceiveCallback

        def callback(ch, method, properties, body):
            logger.critical(" Received body = %s ", body)
            logger.critical(" [x] Received properties = %s ", properties)
            logger.critical(" [x] Received method = %s ", method)
            MqReceiveCallback.callback(ch, method, properties, body)


        # Function to consume messages from the RabbitMQ queue
        def consume():
            connection = pika.BlockingConnection(pika.ConnectionParameters(host=rabbitmq_host))
            channel = connection.channel()
            channel.queue_declare(queue=queue_name)

            channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

            logger.critical(f"Waiting for messages in {queue_name}. To exit press CTRL+C")
            channel.start_consuming()

        # Set up a multithreaded daemon to monitor the RabbitMQ queue
        with concurrent.futures.ThreadPoolExecutor(max_workers=options["numthreads"][0]) as executor:
            future = executor.submit(consume)
            self.stdout.write(self.style.SUCCESS("Successfully started daemon"))
            try:
                future.result()  # This will wait for the consume function to finish
                logger.critical("OOOOOOOOOOOOOOOOOOOOOOOOO thread completed! OOOOOOOOOOOOOOOOOOOO")
            except Exception as e:
                logger.error("OOOOOOOOOOOOOOOOOOOOOO error %s OOOOOOOOOOOOOOOOOOOO", e, exc_info=1)

        
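I run the consumer in its own process with python manage.py consumer 1 (numthreads is the positional argument defined in add_arguments). To verify the wiring end to end, a small standalone publisher along these lines can push a test event onto the queue (the message body is just a placeholder):

# test_publish.py -- standalone script, not part of the Django app
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='correlator2')

# Publish to the default exchange; the routing key is the queue name
channel.basic_publish(exchange='', routing_key='correlator2', body='{"event": "test"}')
print(" [x] Sent test event")
connection.close()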