Force queries in a context to execute in the named database

I have this code block that executes multiple separate queries:

Model1.objects.filter(...)
Model2.objects.filter(...)

I want all queries in this block to use a specific database. One way to do it is to explicitly specify the database name for each queryset:

Model1.objects.using('replica').filter(...)
Model2.objects.using('replica').filter(...)

I was wondering what would be the best way to achieve this API instead:

with using_database('replica'):
    Model1.objects.filter(...)
    Model2.objects.filter(...)

All queries inside the context using_database should be directed to the named connection, in this case ‘replica’. Ideally, the context would route to the provided database only if using= was not explicitly provided on the queryset.

I’ve tried using connection.execute_wrapper:

# NOTE: NOT WORKING AND PROBABLY WRONG!
def use_database(connection_name: str) -> AbstractContextManager[None]:
    """Force all queries in the context to execute in the named database.

    Usage:

        with use_database('replica'):
            # ... queries will use the provided connection 
            # ... unless `using` is explicitly provided.
    """
    def _handler(execute, sql, params, many, context):
        connection = db_connections[connection_name]
        return execute(sql, params, many, {
            **context,
            'connection': connection,
            'cursor': connection.cursor(),
        })
    return db_connection.execute_wrapper(_handler)

That didn’t work, I assume because the wrapper is attached to the connection, which is exactly what we are trying to control.
Another approach would be to register a database router while inside the context. Is that doable?

I would appreciate any ideas.

Welcome @hakib !

That’s an interesting idea, but I can’t think of any solution that isn’t going to be subject to a race condition or other thread-related issues.

I like the thought and concept, but I don’t believe it’s practical with the Django ORM.

<conjecture>

I think one of the issues with trying to do something like this is going to be that the queryset statement alone does not cause the queryset to execute. In other words:
qs1 = Model1.objects.filter(id=1)
does not result in a query being executed.

So, that raises the issue of what would the semantics be of:

with using_database('replica'):
    qs1 = Model1.objects.filter(a_field="something")

result1 = list(qs1)

vs, say:

with using_database('replica'):
    qs1 = Model1.objects.filter(a_field="something")
    result1 = list(qs1)

This implies to me that to be consistent, this would be something needing to be handled by the model Manager - that the queryset being created is created with the using clause applied.

That just creates other issues, because the model manager is effectively a singleton. The manager reference in the class definition of the model is a class variable and not an instance variable - which means that making any change to the manager is going to affect all queries being performed at that time. This leads me to conclude that you would need to pass that context variable to the manager in the method being called - which is what the using method does anyway - so you would just be reinventing the wheel.

</conjecture>
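
The two snippets above can be compared with a toy model that involves no Django at all. This is only a sketch of the semantic question: `BoundAtCreation`, `BoundAtEvaluation`, and this `using_database` are hypothetical stand-ins built on `contextvars`, not ORM classes, and they return the alias they would have used instead of running a query.

```python
import contextvars
from contextlib import contextmanager

# Hypothetical flag holding the alias requested by the surrounding block.
_active_db = contextvars.ContextVar("active_db", default="default")


@contextmanager
def using_database(alias):
    token = _active_db.set(alias)
    try:
        yield
    finally:
        _active_db.reset(token)


class BoundAtCreation:
    """Toy query that captures the alias at the moment it is built."""

    def __init__(self):
        self.alias = _active_db.get()

    def run(self):
        return self.alias


class BoundAtEvaluation:
    """Toy query that looks the alias up only when it is evaluated."""

    def run(self):
        return _active_db.get()


def build_inside_run_outside(query_cls):
    # Mirrors the first snippet: built in the block, evaluated after it.
    with using_database("replica"):
        query = query_cls()
    return query.run()


def build_and_run_inside(query_cls):
    # Mirrors the second snippet: built and evaluated in the block.
    with using_database("replica"):
        query = query_cls()
        return query.run()
```

Only the first snippet tells the two designs apart: a query built inside the block but evaluated outside it reports 'replica' when bound at creation and 'default' when bound at evaluation. When everything happens inside the block, both report 'replica'.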

Standard disclaimer - I’m far from being an expert in the ORM - I may be missing some option entirely - maybe one of our more knowledgeable members may chime in here with a more informed opinion.

Hi Ken,
Thank you for the detailed answer. I suspect you are right! I don’t think there is an elegant way of doing this. I also agree that lazy evaluation makes the behaviour of such a function a bit confusing.

Another way to approach this is to block every database that is not in an allow list:

from contextlib import AbstractContextManager, ExitStack
from django.db import connections as db_connections


def allow_databases(allow_names: set[str]) -> AbstractContextManager[None]:
    """Allow only queries against the provided named databases.

    Usage:
        with allow_databases({'replica'}):
            # ... queries that use a database other than "replica" will be blocked.
    """
    def _block(alias: str) -> None:
        raise Exception("Access to database '{}' blocked here".format(alias))

    stack = ExitStack()
    for alias in db_connections:
        if alias in allow_names: 
            continue
        stack.enter_context(db_connections[alias].execute_wrapper(lambda *args, alias=alias: _block(alias)))
    return stack

Using this context we can prevent use of any other database in a specific code block. You would still have to explicitly provide using(...) to all the querysets, but at least you’ll get an exception when you don’t.
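
One detail of the helper above that may be worth knowing: because `enter_context` is called in the function body, the blocking wrappers are attached when `allow_databases(...)` is called, not when the `with` block is entered. The stdlib-only sketch below illustrates the difference. `fake_execute_wrapper`, `block_on_call`, and `block_on_enter` are hypothetical names, and a plain list stands in for the wrappers attached to real connections.

```python
from contextlib import ExitStack, contextmanager

installed = []  # stand-in for wrappers currently attached to connections


@contextmanager
def fake_execute_wrapper(alias):
    """Hypothetical stand-in for connection.execute_wrapper(...)."""
    installed.append(alias)
    try:
        yield
    finally:
        installed.remove(alias)


def block_on_call(aliases):
    """Same shape as allow_databases: wrappers attach as soon as this is called."""
    stack = ExitStack()
    for alias in aliases:
        stack.enter_context(fake_execute_wrapper(alias))
    return stack


@contextmanager
def block_on_enter(aliases):
    """Alternative shape: nothing is attached until the with block is entered."""
    with ExitStack() as stack:
        for alias in aliases:
            stack.enter_context(fake_execute_wrapper(alias))
        yield


def demo():
    snapshots = {}
    eager = block_on_call(["default"])
    snapshots["on_call, before with"] = list(installed)
    with eager:
        snapshots["on_call, inside with"] = list(installed)
    snapshots["on_call, after with"] = list(installed)

    deferred = block_on_enter(["default"])
    snapshots["on_enter, before with"] = list(installed)
    with deferred:
        snapshots["on_enter, inside with"] = list(installed)
    snapshots["on_enter, after with"] = list(installed)
    return snapshots
```

With the first shape, calling the function without a `with` leaves the wrappers attached until the returned stack is closed. Whether that matters depends on how the helper is used.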

For example, to make sure a block only executes queries against the replica:

>>> from django.contrib.auth.models import Group
>>> Group.objects.get(pk=1)
<Group: Support>

>>> with allow_databases({'replica'}):
...     Group.objects.get(pk=1)
Exception: Access to database 'default' blocked here

>>> with allow_databases({'replica'}):
...     Group.objects.using('replica').get(pk=1)
<Group: Support>

You’re looking at this in the shell, which forces the evaluation of the query in order to generate the output.

What happens if you were to try:

Hi @hakib — welcome! Nice question!

Automatic routing says router to me… — reading the discussion so far, that seems right.

The only half-thought I have is to register a default router that looks for a (Local? :thinking:) flag controlled by a context manager to switch to the replica DB. (This wouldn’t solve Ken’s concern about executing the query after leaving the context manager, but maybe you could live with that in your case.)

(I’m not sure I like this solution. It’s kind of action at a distance.)

This specific example uses get() so not lazy.

But you’re right, if this was something like this:

>>> with allow_databases({'replica'}):
...     qs1 = Group.objects.filter(pk=1)
...
>>> print(qs1)

Then yes, it won’t get blocked. Understanding how querysets are evaluated, I wouldn’t expect it to either.

I also thought about having a router looking at some global set by a context, but like you, I felt like it might be a step too far :wink:

Hey, I actually opened a ticket about this a while ago:

We actually recently deployed a ‘monkey-patch’ to do this kind of stuff in our project:

If you’re only interested in overriding ORM queries, you could probably distill it down to overriding the database router part:

import contextvars
from contextlib import contextmanager


_dbname_var = contextvars.ContextVar("dbname")
_readonly_var = contextvars.ContextVar("readonly")


@contextmanager
def db_override(dbname, readonly=False):
    dbname_token = _dbname_var.set(dbname)
    readonly_token = _readonly_var.set(readonly)
    try:
        yield
    finally:
        _dbname_var.reset(dbname_token)
        _readonly_var.reset(readonly_token)


class OverrideRouter:
    def db_for_read(self, *args, **kwargs):
        if dbname := _dbname_var.get(None):
            return dbname

    def db_for_write(self, *args, **kwargs):
        if dbname := _dbname_var.get(None):
            if _readonly_var.get(False):
                from django.db import InterfaceError

                raise InterfaceError(f"current db context does not allow writes to '{dbname}'")

            return dbname

However, anything that uses transaction.atomic or connection.cursor directly, as well as many Django extensions, might still end up using the default DB (which is why I think routers are not that useful right now).
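
On the earlier worry about race conditions: a `ContextVar` value set in one thread is not seen by other threads, which is what makes a flag like `_dbname_var` safer than a plain module-level global. Below is a minimal stdlib sketch with no Django involved. `current_db` and `run_isolation_demo` are hypothetical helpers, and `db_override` is reduced to the alias only.

```python
import contextvars
import threading
from contextlib import contextmanager, nullcontext

_dbname_var = contextvars.ContextVar("dbname", default=None)


@contextmanager
def db_override(dbname):
    token = _dbname_var.set(dbname)
    try:
        yield
    finally:
        _dbname_var.reset(token)


def current_db():
    """Hypothetical helper: the alias a router based on the variable would pick."""
    return _dbname_var.get() or "default"


def run_isolation_demo():
    seen = {}
    barrier = threading.Barrier(2)

    def worker(name, alias):
        block = db_override(alias) if alias else nullcontext()
        with block:
            barrier.wait()  # both threads are now inside their block
            seen[name] = current_db()
            barrier.wait()  # neither leaves before the other has read

    threads = [
        threading.Thread(target=worker, args=("overridden", "replica")),
        threading.Thread(target=worker, args=("untouched", None)),
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    seen["main"] = current_db()
    return seen
```

The override in one thread does not leak into the other thread or into the main thread. This says nothing about the other caveat above: code that goes straight to `transaction.atomic` or `connection.cursor` never consults the variable.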

Hi,
I can’t speak to any side effects, but for the simple use case what you propose works.

Added this router and context manager:

# common/db.py
from contextlib import contextmanager
import contextvars


_dbname_var = contextvars.ContextVar('dbname')


@contextmanager
def db_override(dbname):
    dbname_token = _dbname_var.set(dbname)
    try:
        yield
    finally:
        _dbname_var.reset(dbname_token)


class OverrideRouter:
    def db_for_read(self, *args, **kwargs):
        if dbname := _dbname_var.get(None):
            print('using ', dbname)
            return dbname

    def db_for_write(self, *args, **kwargs):
        if dbname := _dbname_var.get(None):
            print('using ', dbname)
            return dbname

    def allow_relation(self, obj1, obj2, **hints):
        return None

    def allow_migrate(self, db, app_label, **hints):
        return None

    def allow_migrate_model(self, db, model):
        return None

Register the router in settings.py:

DATABASE_ROUTERS = [
    'common.db.OverrideRouter',
]

Now test the 3 cases:

In [1]: from common.db import db_override

# Not using context, default router is used
In [2]: User.objects.count()
Out[2]: 376

# Using context w/o explicitly specifying the dbname to the queryset, used replica
In [3]: with db_override('read_replica'):
   ...:     User.objects.count()
   ...: 
using  read_replica

# Using context and explicitly specifying a dbname that differs from the context's; the explicitly stated db is used
In [4]: with db_override('read_replica'):
   ...:     User.objects.using('default').count()
   ...: 

# Combine both
In [5]: with db_override('read_replica'):
   ...:     User.objects.using('default').count()
   ...:     User.objects.count()
   ...: 
using  read_replica
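
The three cases can be explained by the order in which an alias gets picked. The following is a simplified, Django-free model of that order as I understand it: an explicit `using()` alias first, then the first router that returns a truthy value, then the default. `resolve_read_alias` and `three_cases` are hypothetical functions for illustration, not Django APIs.

```python
import contextvars
from contextlib import contextmanager

_dbname_var = contextvars.ContextVar("dbname", default=None)


@contextmanager
def db_override(dbname):
    token = _dbname_var.set(dbname)
    try:
        yield
    finally:
        _dbname_var.reset(token)


class OverrideRouter:
    def db_for_read(self, *args, **kwargs):
        # Returning None means "no opinion", so the next candidate is tried.
        return _dbname_var.get()


def resolve_read_alias(explicit_alias=None, routers=(), default="default"):
    """Hypothetical, simplified model of how a read alias gets chosen."""
    if explicit_alias:  # .using('...') was called on the queryset
        return explicit_alias
    for router in routers:  # otherwise ask each router in order
        chosen = router.db_for_read()
        if chosen:
            return chosen
    return default  # no router had an opinion


def three_cases():
    routers = [OverrideRouter()]
    results = [resolve_read_alias(routers=routers)]  # no context
    with db_override("read_replica"):
        results.append(resolve_read_alias(routers=routers))  # context only
        results.append(resolve_read_alias("default", routers))  # explicit using()
    return results
```

In this model the override only fills in when nothing more specific was requested, which matches the output of the session above.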

Thanks!