Implementing mutex with Channels

My DRF application includes some entities that more than one user may attempt to modify at the same time.

To prevent that, I want to implement a mechanism that grants mutual exclusion to the first user who opens the editor for that resource. The desired UX is that any other users waiting to acquire the lock are kept on the editor page with a “Wait, another user is editing this resource” notice. As soon as the first user is done, one of the users in the queue is let in. The ordering of the waiting users doesn’t really matter: this doesn’t have to be fair.

Here’s the solution I chose to implement; I’m looking for some guidance on fixing an issue that I’ll explain in a moment.

The models that can be edited by more than one user inherit from this abstract model:

class LockableModel(models.Model):
    """
    A model that is subject to concurrent editing requests in the application, and therefore
    needs to be accessed in mutual exclusion when writing to it. This class only contains
    bookkeeping variables regarding the ownership of the lock - it's up to the REST API and WS
    to enforce the constraints
    """

    locked_by = models.ForeignKey(
        User,
        null=True,
        blank=True,
        on_delete=models.SET_NULL,
        related_name="locked_%(class)ss",
    )
    awaiting_users = models.ManyToManyField(
        User,
        blank=True,
        related_name="awaiting_on_%(class)ss",
    )

    class Meta:
        abstract = True

    def lock(self, user):
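        # Grant the lock if it's free, otherwise queue the user.
        # Returns True iff `user` now holds the lock.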
        if self.locked_by is None:
            self.locked_by = user
            self.save(update_fields=["locked_by"])
            return True

        if self.locked_by != user:
            self.awaiting_users.add(user)

        return self.locked_by == user

    def unlock(self, user):
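        # If `user` holds the lock, hand it to the next user in line (or
        # clear it); otherwise just drop `user` from the waiting queue.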
        if self.locked_by == user:
            if self.awaiting_users.exists():
                first_in_line = self.awaiting_users.first()
                self.locked_by = first_in_line
                self.awaiting_users.remove(first_in_line)
            else:
                self.locked_by = None

            self.save(update_fields=["locked_by"])
            return True

        self.awaiting_users.remove(user)

        return self.locked_by is None
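
To make the intended semantics concrete, here’s a hypothetical usage trace (Document stands in for any concrete LockableModel subclass, and alice and bob are User instances; none of these names exist in the app as shown):

doc = Document.objects.first()
doc.lock(alice)    # True: the lock was free, alice now holds it
doc.lock(bob)      # False: bob is added to awaiting_users instead
doc.unlock(alice)  # True: the lock is handed straight to bob, first in line
doc.unlock(bob)    # True: nobody else is waiting, so locked_by resets to None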

To provide the experience of waiting on the page and having the resource unlock automatically when the first user leaves the editor, I chose to use djangochannelsrestframework, which provides consumers that automatically send an update when a model instance the client has subscribed to is updated.

This way, I can notify any users in line when locked_by changes. If the new value of locked_by is the user that’s waiting on the page, the frontend enables all controls and we’re all happy.

The consumer goes something like this:

class BaseObserverConsumer(ObserverModelInstanceMixin, GenericAsyncAPIConsumer):
    """
    A consumer used to subscribe to changes to model instances.
    For models which are lockable, it allows entering mutex editing on the instance(s).
    """

    LOCK_BY_DEFAULT = True

    def __init__(self, *args, **kwargs):
        self.subscribed_instances = []
        self.locked_instances = []
        super().__init__(*args, **kwargs)

    def lock_instance(self, pk):
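        # attempts to acquire the lock for the current user, queueing
        # them on the instance if another user already holds it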
        return self.queryset.get(pk=pk).lock(self.scope["user"])

    def unlock_instance_or_give_up(self, pk):
        # unlocks an instance that the user has a lock on or removes the user
        # from its waiting queue if the lock hadn't been acquired yet
        return self.queryset.get(pk=pk).unlock(self.scope["user"])

    @action()
    async def subscribe_instance(self, request_id=None, **kwargs):
        lock = kwargs.get("lock", self.LOCK_BY_DEFAULT)
        pk = kwargs.get("pk", None)

        try:
            if lock:
                await database_sync_to_async(self.lock_instance)(pk)
                self.locked_instances.append(pk)

            response = await super().subscribe_instance(request_id, **kwargs)
            self.subscribed_instances.append(pk)
        except Exception:
            # clean up before propagating: a bare `except` that falls
            # through to `return response` would raise UnboundLocalError
            await database_sync_to_async(self.unlock_instance_or_give_up)(pk)
            if pk in self.locked_instances:
                self.locked_instances.remove(pk)
            raise

        return response

    async def websocket_disconnect(self, message):
        for pk in self.locked_instances:
            await database_sync_to_async(self.unlock_instance_or_give_up)(pk)

        return await super().websocket_disconnect(message)
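
For context, a concrete consumer built on this base is wired up roughly like so (Document and DocumentSerializer are hypothetical stand-ins; the serializer should expose locked_by so waiting clients can observe the hand-off):

class DocumentConsumer(BaseObserverConsumer):
    queryset = Document.objects.all()
    serializer_class = DocumentSerializer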

Essentially, when a client subscribes to an instance (i.e. when the user visits the editor page for a lockable resource), the ws client sends a message that triggers a call to subscribe_instance, which also tries to lock the instance.
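
For reference, the subscription message is a JSON payload along these lines, shown here as a Python dict (this reflects my understanding of djangochannelsrestframework's action dispatch; lock is this app's own kwarg and the values are made up):

subscribe_message = {
    "action": "subscribe_instance",  # dispatched to the @action handler
    "request_id": 42,                # echoed back so the client can match the reply
    "pk": 7,                         # the instance to observe and lock
    "lock": True,                    # custom kwarg, ends up in **kwargs
}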

When the client is disconnected, all instances locked by it are unlocked.

This works well most of the time, but sometimes I have found instances kept locked indefinitely. What I assume is happening (I am not able to reproduce the issue reliably) is that something goes wrong during the ws disconnection, due to the ws protocol’s inherent unreliability, but I am not sure what.
Also, the client of the user who holds the lock may simply stay connected for hours on end without the user ever leaving the page, keeping the resource locked.

  1. What do you think the cause of the first issue may be? How would you try to solve it?
  2. How would you come up with a solution for the second issue (i.e. the lock is never released unless the user quits the page)? I thought about some possible solutions like sending heartbeats, but they were all pretty complicated. Is there anything simpler that could be done to improve the user experience with regard to this feature?
  1. I think you need to store locked_instances in some other manner. From my understanding, it is tied to the process running your Django application. You may have multiple servers, and the connection may not necessarily be to the same one. You want to store these locked instances in the session or some other persistent store.

  2. I’d go with the heartbeats and an automatically expiring lock; a sketch follows this list. Unfortunately, good user experiences are rarely simple to implement.
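
A minimal sketch of the expiring-lock idea, assuming you add a locked_at timestamp to the original LockableModel and pick an illustrative timeout (the field name and numbers are placeholders, not a drop-in implementation):

from datetime import timedelta

from django.utils import timezone

# Illustrative timeout; the client's heartbeat should fire well within it
LOCK_TIMEOUT = timedelta(minutes=15)

class LockableModel(models.Model):
    # locked_by / awaiting_users declared as in the original model, plus:
    locked_at = models.DateTimeField(null=True, blank=True)

    class Meta:
        abstract = True

    def lock_is_stale(self):
        return (
            self.locked_at is not None
            and timezone.now() - self.locked_at > LOCK_TIMEOUT
        )

    def lock(self, user):
        # A repeated lock() from the current holder refreshes the timestamp,
        # so a periodic heartbeat message from the client keeps the lock
        # alive; a lock whose holder went silent eventually becomes stale
        # and can be taken over
        if self.locked_by in (None, user) or self.lock_is_stale():
            self.locked_by = user
            self.locked_at = timezone.now()
            self.save(update_fields=["locked_by", "locked_at"])
            return True
        self.awaiting_users.add(user)
        return False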


Thank you!

The locked_instances variable should keep track of the instances locked by a single user, over a single connection to a single instance of the consumer.

Does it make sense the way I implemented it, or would it be affected by the issue you described? I thought a single user would only send messages to a single process/consumer instance over the course of a connection.

I’m not sure. I can’t find anything in Channels, Daphne, or Uvicorn’s docs that would suggest that’s actually true. It feels like it should be, but I can’t say with any level of certainty.

It depends upon how the client was written and whether or not it opens up multiple sockets.

Yes, a single client operating on a single websocket is only going to be handled by a single consumer. But if the connection is interrupted and the socket needs to be re-opened, there is no intrinsic guarantee that it will be reconnected to the same instance on the server. And if the client is written such that it opens multiple sockets, each one of those sockets creates a separate instance of the consumer in any of the running processes.
