Testing Channels Worker processes

Is there a clean & easy way to set up a test for a Channels Worker process?

In the ideal world, I could set up a test and run the (async) worker processes in the same event loop that pytest uses, communicating among those workers using the InMemoryChannelLayer.

I’m trying to use pytest. That may or may not be the wrong choice here.

Side note 1: This is the first time I’ve ever tried writing real tests for a real system. This will become more apparent if I need to ask follow-up questions on any information supplied.

I’ve tried configuring a fixture to start the worker process, but (appropriately enough) it doesn’t return so the tests don’t proceed.

I tried launching the test in a separate thread, but that throws an error about not having an event loop.

I tried creating a custom version of the runworker command’s handle method so I could pass the main thread’s event loop to it. At the time the fixture runs, there is no event loop.

I’ve backed off this idea for the sake of making progress with the tests themselves.
What I’m currently doing is running the tests against a separate runworker process through the redis channel layer. It works, and it’s actually quite satisfactory for what I am really trying to accomplish with this.

(That also means that I don’t have the code handy to show what I’ve tried. I can pull a copy for anyone wanting a good laugh.)

Side note 2: I’m not sending messages to the workers directly. The workers are driven by an AsyncWebsocketConsumer. In the real application, the browser sends the message to the consumer. The consumer decides which worker will handle each message, and effectively forwards the message to the appropriate worker. So my test creates the instance of the WebsocketCommunicator and sends the messages to it, allowing it to forward the messages as appropriate.
In the ideal world, I would have tests that put messages directly in the channel layer for each worker, testing them independently of the consumer, but that’s a different issue.

All thoughts, ideas, comments, suggestions, recommendations, conjectures, wild guesses, etc are appreciated. Given the current state of the project this is as much an academic exercise as anything else. I can continue with the process I have now and not lose any functionality that I’m concerned about.

Hey Ken. Interesting… — and still in “folks are working it out” territory, so … :stuck_out_tongue_winking_eye:

My first approach is to break it up. There’s a chain between the websocket consumer, the channel layer, and the backend worker. Checking that each one passes the baton along correctly is often enough here.

Your WebsocketCommunicator calls your consumer. You can mock the channel layer and check that the consumer calls it with the right message.
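Something roughly like this (a sketch only: the consumer name, path, target channel, and payload are all made up, and it assumes the InMemoryChannelLayer is configured in the test settings):

from unittest.mock import AsyncMock, patch

import pytest
from channels.layers import get_channel_layer
from channels.testing import WebsocketCommunicator

from myapp.consumers import GameConsumer  # hypothetical consumer


@pytest.mark.asyncio
async def test_consumer_forwards_message_to_worker_channel():
    layer = get_channel_layer()
    # Patch only send() so the consumer can still use the rest of the layer
    # normally, while we capture what it forwards.
    with patch.object(layer, "send", new_callable=AsyncMock) as mock_send:
        communicator = WebsocketCommunicator(GameConsumer.as_asgi(), "/ws/game/")
        connected, _ = await communicator.connect()
        assert connected

        await communicator.send_json_to({"command": "ping"})
        await communicator.disconnect()  # lets the consumer finish processing

        # The channel name and payload here are assumptions about your app.
        mock_send.assert_awaited_once_with(
            "channel-1",
            {"type": "channel.ping", "command": "ping"},
        )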

Let’s assume the channel layer itself is trustworthy.

Then in the worker we can give it mocked input and check that the behaviour is correct.
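For example, since the worker side is just another consumer, you can instantiate it and call a handler directly (the consumer, handler, and message fields here are invented, so swap in your own):

from unittest.mock import AsyncMock

import pytest

from myapp.consumers import WorkerConsumer  # hypothetical worker-side consumer


@pytest.mark.asyncio
async def test_worker_handler_in_isolation():
    consumer = WorkerConsumer()
    consumer.channel_layer = AsyncMock()  # mock the layer the worker replies on

    # Hand the handler a message built by hand.
    await consumer.channel_ping(
        {"type": "channel.ping", "channel": "reply-to", "text": "hello"}
    )

    # The worker should reply to the channel named in the message
    # (assuming it calls send() with positional arguments).
    consumer.channel_layer.send.assert_awaited_once()
    reply_channel, reply = consumer.channel_layer.send.await_args.args
    assert reply_channel == "reply-to"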

So far, so good. I expect you’ve done some/all of this.

Then more end-to-end, you want to spin up an actual worker. In channels.testing we have a ChannelsLiveServerTestCase (which is like Django’s own LiveServerTestCase, but using Daphne). There’s not yet a live worker version, but why not? (Such a thing would spin up the worker in setUp and shut it down in tearDown.)

So you’ve already tried this. Not sure what’s going on there, but pytest-django has a live_server fixture that basically does this, so you could hunt down the pattern to use there, I’d think :thinking:

I’d say for E2E tests the separate process route is the way to go…

Happy to try and input on something more concrete if desired.

HTH C.


That says more to me than you may realize…

e.g. There’s probably no “@adamchainz-quality” blog post describing what I’m looking to do. That’s ok - sometimes it’s fun to be in uncharted waters.

Thanks. I had looked at this, got the impression it didn’t really apply to what I was trying to do and moved on. (Good information, but I still don’t think it’s what I want.)

Also saw this, also made the spot assessment that it wasn’t the droid I was looking for and moved on. A second look today after what I’ve muddled through shows me that I was wrong. It appears it’s a lot closer to what I want.

I can appreciate that. I do understand the logic and reasoning behind it. It’s certainly also the easiest solution based on what I know at this point. However, I’m also stubborn enough to want to leave head-sized wall-dents in topics that intrigue me. I’m just curious enough about this to consider pursuing it further.

So there may be more to come…

If that’s “Hope …”, then yes - more than you can know.

Many thanks.


Cool. Have fun! As I say, happy to try and input. Would be interested to see how you get on.


Just a quick follow-up - I am still messing around with this, without too much real progress.

First, I’ve got the test client working well, sending messages to, and reading the responses from, the channel layer directly, without going through a WebSocket. (That means these tests don’t involve consumers at all.)

I can also start the Worker from a fixture. The problem here is that I’ve not yet figured out how to shut it down cleanly. I understand the principles behind the problems I’m encountering, but I’m not quite seeing how to resolve them yet. I’m also not happy with what I’m having to do to run this. I’ve effectively recreated the live_server pytest fixture, with lots of changes to have it start a worker rather than a WSGI server.

This results in me starting up a thread, getting an event loop for that thread, and then starting the worker in that event loop.

This has a very bad smell to me.
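For reference, the shape of what I’m doing is roughly this (a sketch; the channel names and settings handling are elided, and the cleanup is exactly the part I haven’t solved):

import asyncio
import threading

from channels.layers import get_channel_layer
from channels.routing import get_default_application
from channels.worker import Worker


def start_worker_thread(channels):
    """Run a Channels Worker in its own thread, on its own event loop."""
    loop = asyncio.new_event_loop()
    worker = Worker(
        application=get_default_application(),
        channels=channels,
        channel_layer=get_channel_layer(),
    )

    def run():
        asyncio.set_event_loop(loop)
        loop.create_task(worker.application_checker())
        loop.run_until_complete(worker.handle())

    thread = threading.Thread(target=run, daemon=True)
    thread.start()
    return thread, loop  # ...and the caller still has to stop these cleanly

(Sharing one channel layer instance across two event loops, especially the InMemoryChannelLayer, is part of why it feels off.)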

For some reason I’ve got it in my head that, in theory, I should be able to run the worker in the same event loop that pytest uses, giving me more direct control over that task. That’s the direction I’m going to head next.

If nothing else, at least I’m learning a lot more about asyncio and pytest while I’m muddling my way through this, so it’s not a complete waste of time.

More to come…

Hey Ken.

Glad you’re making progress! By the end of this, we’ll be able to get you onboard with the Channels backlog :stuck_out_tongue_winking_eye:

OK… Perhaps I’m a bit lost by exactly what you mean by “run the worker” :thinking:

As I’d describe it, the Worker is a process listening to a particular channel (name) (or set of those) and then routing any messages to a particular consumer.

So testing, I can either run the worker process, pop messages into the channel layer, and assert they’re correctly handled — this being more E2E style…

OR I can get the consumer and send messages to it directly, using ApplicationCommunicator — this letting me avoid running the worker while testing just my consumer logic. (I might prefer this way for development, then add an E2E smoke-test to make sure it was wired up correctly…)
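A rough sketch of that second route, assuming the worker-side consumer replies over the channel layer (the consumer name, channel names, and message shape are placeholders, and the InMemoryChannelLayer is assumed in the test settings):

import asyncio

import pytest
from channels.layers import get_channel_layer
from channels.testing import ApplicationCommunicator

from myapp.consumers import WorkerConsumer  # hypothetical worker-side consumer


@pytest.mark.asyncio
async def test_worker_consumer_without_a_worker_process():
    channel_layer = get_channel_layer()
    reply_channel = await channel_layer.new_channel()

    # Give the consumer the same kind of scope a Worker would build for it.
    communicator = ApplicationCommunicator(
        WorkerConsumer.as_asgi(),
        {"type": "channel", "channel": "channel-1"},
    )
    await communicator.send_input(
        {"type": "channel.ping", "channel": reply_channel, "text": "hello"}
    )

    # In this design the consumer answers via the channel layer, so read the
    # reply from there rather than from receive_output().
    response = await asyncio.wait_for(channel_layer.receive(reply_channel), 1)
    assert response["type"] == "channel.pong"  # assumed reply type

    communicator.stop()  # cancel the still-running consumer task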

:thinking:

Ok, there may be some terminology mismatch here - let me see if I can clarify without making things worse.

The overall architecture, as I’m using it:
[diagram: Channels 1]

So I’m using the term “Worker” in the context as described at Worker and Background Tasks — Channels 4.0.0 documentation

The consumers:

  • Receive messages from the browser(s),
  • Figure out which worker needs to process that message,
  • Forward the message to the right worker, via the channel layer
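In code, that consumer looks roughly like this (a sketch with made-up field, handler, and channel names, just to anchor the terminology):

from channels.generic.websocket import AsyncJsonWebsocketConsumer


class RouterConsumer(AsyncJsonWebsocketConsumer):  # hypothetical name
    async def receive_json(self, content, **kwargs):
        # Decide which worker channel should handle this browser message...
        target = "channel-1" if content.get("kind") == "move" else "channel-2"
        # ...and forward it, tagging our own channel name so a worker can
        # route its reply back to this consumer.
        await self.channel_layer.send(
            target,
            {"type": "channel.message", "channel": self.channel_name, "content": content},
        )

    async def worker_reply(self, message):
        # Handler for "worker.reply" messages coming back from a worker;
        # relay them to the browser.
        await self.send_json(message["content"])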

The guts of the work are being done in the workers.

What’s not shown in the diagram is that a worker, in the process of handling a message, could post additional messages to also be processed by the same worker, or by a different worker. So “running the worker” in this context means that the worker process is listening on 4 channels.

That’s what I’m wanting to test. I need to send a message to worker “A”, and (potentially) wait for responses from workers “A” and “B”.

One final aspect of this is that the logic for worker “A” is really quite trivial.

  • Read the message,
  • Get some state information from a Model,
  • Call a function as identified by the combination of message and state
    • Determine the new state,
  • Prepare a response by calling a different function

So what I’m really trying to test here is a state machine.
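Put concretely, worker “A” boils down to something like this (the model, transition table, and response builder are all placeholders):

from channels.consumer import AsyncConsumer
from channels.db import database_sync_to_async

from myapp.models import GameState                     # hypothetical state model
from myapp.machine import TRANSITIONS, build_response  # hypothetical lookup/helpers


class WorkerA(AsyncConsumer):
    async def channel_message(self, message):
        # Read the message and fetch the current state from the Model.
        game = await database_sync_to_async(GameState.objects.get)(pk=message["game_id"])
        # The (state, message) pair identifies the transition function...
        transition = TRANSITIONS[(game.state, message["event"])]
        # ...which determines the new state.
        game.state = transition(game, message)
        await database_sync_to_async(game.save)()
        # A separate function prepares the response, sent back via the layer.
        reply = build_response(game, message)
        await self.channel_layer.send(message["channel"], reply)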

Testing the logic isn’t the issue. These tests are being created to test all the state transitions and verify that the state machine is accurate.

What I have working:
[diagram: Current pytest]
This works, but every time I make a change to the worker, I need to restart it before restarting the tests.

I wouldn’t really call this E2E, because I’m completely bypassing the Consumer layer. But I acknowledge that it’s not truly a “unit test” either because of the dispatch from worker “A” to worker “B”.
In theory, I should test the message to worker “B” separately from the message to worker “A”, and I’m not ruling that out as an option.

Where I’m trying to get to:
[diagram: Desired]

If I start a separate thread within a fixture, I can (kinda, sorta, effectively) do this now. The problem is that I’m not cleanly stopping that thread after a test.

What I’d really like to get to:
[diagram: Ultimate]

And that’s where I have some ideas.

Also, in thinking more about it based on what you’ve said, I may try the even more simplified version:
[diagram: Ultimate 1]

Like I said - I ain’t done yet…


And I’ve got it working!

Not sure I understand exactly why - or if there are better ways of doing this - but I’ve got a fixture working that starts a Worker task in the current event loop, communicating with my tests through the InMemoryChannelLayer. Success!

(Yea, I’m happy.)

I’m quite impressed by the performance, too. A simple “ping” / “pong” exchange from the test to/from a Worker completes in sub-millisecond times.

Thanks again for your thoughts, ideas, and wisdom. Now all I have to do is code up a 20-state, 400-transition state machine and ensure it all works…


This is the fixture I’m currently using. It works for me; I don’t know where the dark corners and scary monsters may be residing. What I did was follow the chain of events from manage.py, through runworker.py, worker.py, and into the run method of server.py. I got rid of all the configuration handling code, since I decided it wasn’t relevant in a “test” environment, and reduced everything else down to the minimum amount of code that was absolutely required for it to work.

What I ended up with is this:

# Among other imports
import asyncio
import os

import pytest
import pytest_asyncio
from channels.layers import get_channel_layer
from channels.routing import get_default_application
from channels.worker import Worker

@pytest.fixture(scope="session")
def event_loop():
    loop = asyncio.get_event_loop_policy().new_event_loop()
    return loop

@pytest_asyncio.fixture(scope="session", autouse=True)
async def my_live_server(event_loop):
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mpge_channels.settings')
    server = Worker(
        application=get_default_application(),
        channels=['channel-1', 'channel-2'], # As defined in asgi.py
        channel_layer=get_channel_layer(),
    )
    # Run the Worker's internals as tasks on the test session's event loop.
    checker_task = asyncio.create_task(server.application_checker())
    handle_task = asyncio.create_task(server.handle())

    yield handle_task

    checker_task.cancel()
    handle_task.cancel()
    # Await the cancelled tasks so they are cleaned up before the loop closes.
    await asyncio.gather(checker_task, handle_task, return_exceptions=True)

A test can then look something like this:

@pytest.mark.django_db(databases=['default'])
@pytest.mark.asyncio
async def test_channel_layer():
    channel_layer = get_channel_layer()
    channel_name = await channel_layer.new_channel()
    await channel_layer.send(
        'channel-1',
        {
            "type": "channel.ping",
            "channel": channel_name,
            "text": "This is a test"
        }
    )
    try:
        response = await asyncio.wait_for(channel_layer.receive(channel_name), 1)
        # verify the response
    except asyncio.exceptions.TimeoutError:
        pytest.fail("No response received in time")
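For completeness, this all assumes the in-memory channel layer in the test settings, along the lines of:

# Test settings (sketch): the in-memory layer keeps everything inside the
# pytest process, so no Redis is needed for these tests.
CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels.layers.InMemoryChannelLayer",
    },
}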

Hey @KenWhitesell — this is good stuff!

From your long post the other day, I was going to suggest both these points — but you made it first :smiley:

I shall be bookmarking this topic, and sending folks this way. It would make a super talk/post/…