Realtime GeoIP locate with Django, websockets, Channels and Redis

Hello everyone. I am developing a geolocation application that receives connection accesses to a server in real time. A constant stream of Apache access-log IPs is generated, and from this stream I use GeoIP2 to process the data and convert it into JSON that is sent to my front end through a websocket using Channels, which shows a real-time world map similar to the Bitdefender threat map: https://threatmap.bitdefender.com.

I created a function that simulates a tail, but I realized that when I call it from the view the front end never loads, because the view blocks on the loop.


import re
import time

def tail_apache_log(file_path, namefiles):
    print("STARTING TAIL")

    time_pattern = re.compile(r'\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2}')
    ip_pattern = re.compile(r'\d+\.\d+\.\d+\.\d+')
    result = f"{file_path}{namefiles}.log"

    with open(result, 'r', encoding='utf-8') as log_file:
        log_file.seek(0, 2)  # go to the end of the file (tail mode)

        while True:
            line = log_file.readline()
            if not line:
                time.sleep(1)  # asyncio.sleep() would do nothing in a sync function
                continue

            time_match = time_pattern.search(line)
            ip_match = ip_pattern.search(line)

            if not (time_match and ip_match):
                continue

            log_time = time_match.group(0)
            log_ip = ip_match.group(0)
            data_hora = converter_string_para_data(log_time)

            if log_ip.startswith("10.") or log_ip.startswith("172."):
                # internal IP, skip it
                continue

            lat, long, loc_country, location_city = getGeoIpData(log_ip)

            data_to_send = {
                'ip': log_ip,
                'lat': lat,
                'long': long,
                'data': str(data_hora),
                'country': loc_country,
                'city': location_city,
            }

            # SEND THE DATA AND THEN GO BACK AROUND THE LOOP
            # ws_map.enviar_dados(data_to_send)
            print(data_to_send)

I also thought about calling the function from the consumer itself, but I read that this is not good practice, and it was also consuming a lot of CPU:


import threading

from channels.generic.websocket import AsyncWebsocketConsumer


class MapaConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        self.groupname = 'Pontos_Mapa'

        await self.channel_layer.group_add(
            self.groupname,
            self.channel_name,
        )
        log_thread = threading.Thread(target=self.tail_apache_log_thread)
        log_thread.daemon = True
        log_thread.start()

        await self.accept()
                               
    async def receive(self, text_data):
        await self.channel_layer.group_send(
            self.groupname,
            {
                'type': 'json_objects_tunel',
                'value': text_data,
            }
        )

    async def json_objects_tunel(self, event):
        print(event['value'])
        await self.send(event['value'])

    async def disconnect(self, close_code):
        print("\n ===== CONSUMER DISCONNECTED ======")
        print(f"Connection closed: {close_code}\n")
        await self.channel_layer.group_discard(
            self.groupname,
            self.channel_name,
        )

I thought about making the function asynchronous, but I’m not sure whether there is a better alternative with Kafka (I’ve never used it) or Celery. Can anyone shed some light? Or, if I run the tail asynchronously, would that work?

My view and the class that creates the websocket connection:

def home2(request):
    # tail_apache_log(path, nome_arquivo)
    criando_classe_WS()
    context = {}
    return render(request, "site/index.html", context)

The class for the connection:


import websocket
import json
conections_list = []

def criando_classe_WS():
    ws1 = Conexao()
    ws1.criarConexao_Mapa()
    return ws1

class Conexao():
    def __init__(self):
        self.ws_obj = websocket.WebSocket()

    def criarConexao_Mapa(self):
        self.ws_obj.connect('ws://localhost:8000/ws/tables/')
        return self.ws_obj
        
    def send_data(self, data_to_send):
        #print(data_to_send)
        self.ws_obj.send(json.dumps(data_to_send))

If I needed to do something like this, I would set up the log monitor as an external worker process. This decouples the handling of the log file from everything else in your system, while still allowing it to issue messages through the channel layer. It also gives you a lot more flexibility in how you handle the log file.
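A rough sketch of what such a worker process could look like, assuming the project settings module, group name and message type shown elsewhere in this thread (the log path and payload are placeholders):

# tail_worker.py - run as its own process, e.g. "python tail_worker.py"
import json
import os
import time

import django

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mapa.settings')
django.setup()

from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer

channel_layer = get_channel_layer()


def follow(path):
    """Yield new lines appended to the file, like `tail -f`."""
    with open(path, 'r', encoding='utf-8') as log_file:
        log_file.seek(0, 2)  # start at the end of the file
        while True:
            line = log_file.readline()
            if not line:
                time.sleep(1)
                continue
            yield line


for line in follow('/var/log/apache2/access.log'):  # adjust to your log path
    # ...parse the line and build the GeoIP payload as in the view code above...
    data_to_send = {'line': line.strip()}  # placeholder payload
    async_to_sync(channel_layer.group_send)(
        'Pontos_Mapa',
        {'type': 'json_objects_tunel', 'value': json.dumps(data_to_send)},
    )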

Note: You may also need to handle those situations where the log file is rotated or otherwise reinitialized.

If you have access to the administration of the server (either you control the server or can request configuration changes from those who do), then what I would actually suggest is that you configure the web server to send the log data to you directly (in addition to, not instead of writing the log file), rather than you trying to continuously read the file.

Your log handler then becomes a UDP listener for that data. (This is the way we do something very similar to this.)
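A minimal sketch of such a listener, assuming the web server forwards each log line as one UDP datagram to 127.0.0.1:9999 (address, port and payload handling are placeholders):

# udp_log_listener.py - run as its own process
import json
import os
import socket

import django

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mapa.settings')
django.setup()

from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer

channel_layer = get_channel_layer()

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(('127.0.0.1', 9999))

while True:
    datagram, _addr = sock.recvfrom(65535)
    line = datagram.decode('utf-8', errors='replace')
    # ...parse the line and build the GeoIP payload as elsewhere in this thread...
    payload = {'line': line.strip()}  # placeholder payload
    async_to_sync(channel_layer.group_send)(
        'Pontos_Mapa',
        {'type': 'json_objects_tunel', 'value': json.dumps(payload)},
    )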


OK, got it. I’ll take a look at workers and background tasks; I think that was exactly what was missing. I’ll implement it soon and report back. 🙂

Regarding sending the data to the server: my application will run on the server itself and read the logs directly in real time. Is there a problem with that?

Not really a problem, but you do need to ensure your code can handle log rotations and log-file resets. You may also want to verify that the web server isn’t using log-file buffering - that could create other potential issues.
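One common way to cope with rotation is to re-open the file whenever its inode changes or its size shrinks. A rough sketch of a tail loop with that check (error handling kept minimal):

import os
import time


def follow_with_rotation(path):
    """Yield appended lines, re-opening the file if it is rotated or truncated."""
    log_file = open(path, 'r', encoding='utf-8')
    log_file.seek(0, 2)
    inode = os.fstat(log_file.fileno()).st_ino
    while True:
        line = log_file.readline()
        if line:
            yield line
            continue
        time.sleep(1)
        try:
            stat = os.stat(path)
        except FileNotFoundError:
            continue  # file is mid-rotation; keep waiting
        if stat.st_ino != inode or stat.st_size < log_file.tell():
            # Rotated (new inode) or truncated: re-open and read from the start.
            log_file.close()
            log_file = open(path, 'r', encoding='utf-8')
            inode = os.fstat(log_file.fileno()).st_ino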


I couldn’t get the worker approach from the documentation to work, so instead I used Celery with Redis as the broker to do exactly what you said, and it worked :).
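For reference, the broker side of this is just a couple of settings, assuming Redis is running locally on the default port (the URLs are placeholders for your own setup):

# settings.py
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'

The worker and the beat scheduler are then started as separate processes, e.g. "celery -A mapa worker -l info" and "celery -A mapa beat -l info".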

In the celery.py file:

from __future__ import absolute_import, unicode_literals

import os

from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mapa.settings')

app = Celery('mapa')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

app.conf.beat_schedule = {
    'tail_log_3s': {
        'task': 'app_geoip.tasks.tail_apache_log',
        'schedule': 2.0,
    },
}

app.autodiscover_tasks()

In the tasks.py file:

import os
import re
import time

from asgiref.sync import async_to_sync
from celery import shared_task
from channels.layers import get_channel_layer
from django.contrib.gis.geoip2 import GeoIP2

# converter_string_para_data, criar_topIp_connections_TESTE and
# criar_top_country are project helpers defined elsewhere in the app.

BASE_DIR = os.path.dirname(os.path.dirname(__file__))
path = f'{BASE_DIR}/mapa/logs'
host_list = []
channel_layer = get_channel_layer()

count_list_country_hosts = []


@shared_task
def tail_apache_log():

    time_pattern = re.compile(r'\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2}')
    ip_pattern = re.compile(r'\d+\.\d+\.\d+\.\d+')
    result = f"{path}/livelogs.log"

    with open(result, 'r', encoding='utf-8') as log_file:
        log_file.seek(0, 2)  # go to the end of the file (tail mode)

        for i in range(55):  # process at most 55 lines per task run
            line = log_file.readline()
            if not line:
                time.sleep(1)
                continue

            time_match = time_pattern.search(line)
            ip_match = ip_pattern.search(line)

            if not (time_match and ip_match):
                continue

            log_time = time_match.group(0)
            log_ip = ip_match.group(0)
            data_hora = converter_string_para_data(log_time)

            if log_ip.startswith("10.") or log_ip.startswith("172."):
                # internal IP, skip it
                continue

            g = GeoIP2()
            location = g.city(log_ip)
            lat = location["latitude"]
            long = location["longitude"]
            loc_country = location["country_name"]
            if loc_country is None:
                loc_country = ""
            location_city = location["city"]

            data_to_send = {
                'ip': log_ip,
                'lat': lat,
                'long': long,
                'data': str(data_hora),
                'country': loc_country,
                'city': location_city,
            }
            criar_topIp_connections_TESTE(
                log_ip, loc_country, location_city,
                f'{BASE_DIR}/app_geoip/templates/site/top_10.txt',
            )
            criar_top_country(loc_country, f'{BASE_DIR}/app_geoip/templates/site/country')
            async_to_sync(channel_layer.group_send)(
                'Pontos_Mapa', {'type': 'send_json', 'text': data_to_send},
            )

            time.sleep(0.1)
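Since the group_send above uses the message type 'send_json', the consumer needs a handler method with that name (my earlier consumer used json_objects_tunel). A minimal sketch of such a handler, assuming the payload dict is serialized to JSON before being sent to the browser:

import json

from channels.generic.websocket import AsyncWebsocketConsumer


class MapaConsumer(AsyncWebsocketConsumer):
    # ...connect, receive and disconnect as shown earlier in the thread...

    async def send_json(self, event):
        # 'text' is the dict built in the Celery task; serialize it for the browser.
        await self.send(text_data=json.dumps(event['text']))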