Recently, I was experimenting with writing asynchronous Python scripts, and one case I was particularly interested in was emitting events and processing them with listeners.

My objective was to investigate how event listeners would behave when the emitter is faster than the listeners. Would they queue the events and process them one by one, FIFO style? Would they handle events in parallel? Or is there another possibility? But first, let’s examine how I arrived at a specific library and workflow.

Initially, I considered using the standard library’s asyncio module, particularly its Event class. However, I realized that it is focused on coroutine synchronization (such as signaling that a resource has become available) rather than on implementing a true publish-subscribe model.

Below is an example of using standard Python events:

import asyncio
import random

async def waiter1(event):
    while True:
        print('waiting for it 1 ...')
        await event.wait()
        print('... got it 1!')
        event.clear()

async def waiter2(event):
    while True:
        print('waiting for it 2 ...')
        await event.wait()
        print('... got it 2!')
        event.clear()

async def emitter(event):
    while True:
        await asyncio.sleep(random.uniform(1, 3))
        print("Event emitted!")
        event.set()

async def main():
    event = asyncio.Event()

    waiter_task1 = asyncio.create_task(waiter1(event))
    waiter_task2 = asyncio.create_task(waiter2(event))

    emitter_task = asyncio.create_task(emitter(event))

    await asyncio.gather(waiter_task1, waiter_task2, emitter_task)

asyncio.run(main())

This produces fairly straightforward output:

waiting for it 1 ...
waiting for it 2 ...
Event emitted!
... got it 1!
waiting for it 1 ...
... got it 2!
waiting for it 2 ...

Obviously, in real life, we would have different types of events handled by different listeners. We can achieve this by creating a second event instance and passing it to the emitters and listeners that depend on it. It might look something like this:

import asyncio
import random

async def waiter1(event1):
    while True:
        print('waiting for event1 ...')
        await event1.wait()
        print('... got event1!')
        event1.clear()

async def waiter2(event1):
    while True:
        print('waiting for event1 ...')
        await event1.wait()
        print('... got event1!')
        event1.clear()

async def waiter3(event2):
    while True:
        print('waiting for event2 ...')
        await event2.wait()
        print('... got event2!')
        event2.clear()

async def emitter1(event1):
    while True:
        await asyncio.sleep(random.uniform(1, 3))
        print("========================== Event1 emitted! ==========================")
        event1.set()

async def emitter2(event2):
    while True:
        await asyncio.sleep(random.uniform(2, 4))
        print("========================== Event2 emitted! ==========================")
        event2.set()

async def main():
    event1 = asyncio.Event()
    event2 = asyncio.Event()

    waiter_task1 = asyncio.create_task(waiter1(event1))
    waiter_task2 = asyncio.create_task(waiter2(event1))
    waiter_task3 = asyncio.create_task(waiter3(event2))

    emitter_task1 = asyncio.create_task(emitter1(event1))
    emitter_task2 = asyncio.create_task(emitter2(event2))

    await asyncio.gather(waiter_task1, waiter_task2, waiter_task3, emitter_task1, emitter_task2)

asyncio.run(main())

This doesn’t look bad, although we need to pass event instances back and forth, which can be error-prone. The real problem, however, is that asyncio.Event carries no payload, so we cannot pass any data along with an event. To solve this, we can use queues instead of events, since queue items can carry arbitrary data. An example is shown below:

import asyncio
import random

async def waiter1(queue1):
    while True:
        data = await queue1.get()
        print(f'... got event1 with data: {data}!')

async def waiter2(queue2):
    while True:
        data = await queue2.get()
        print(f'... got event1 with data: {data}!')

async def waiter3(queue3):
    while True:
        data = await queue3.get()
        print(f'... got event2 with data: {data}!')

async def emitter1(queue1, queue2):
    while True:
        await asyncio.sleep(random.uniform(1, 3))
        data = random.randint(1, 100)
        print(f"=================== Event1 emitted with data: {data} ===================")
        await queue1.put(data)
        await queue2.put(data)

async def emitter2(queue3):
    while True:
        await asyncio.sleep(random.uniform(2, 4))
        data = random.randint(100, 200)
        print(f"=================== Event2 emitted with data: {data} ===================")
        await queue3.put(data)

async def main():
    queue1 = asyncio.Queue()
    queue2 = asyncio.Queue()
    queue3 = asyncio.Queue()

    waiter_task1 = asyncio.create_task(waiter1(queue1))
    waiter_task2 = asyncio.create_task(waiter2(queue2))
    waiter_task3 = asyncio.create_task(waiter3(queue3))

    emitter_task1 = asyncio.create_task(emitter1(queue1, queue2))
    emitter_task2 = asyncio.create_task(emitter2(queue3))

    await asyncio.gather(waiter_task1, waiter_task2, waiter_task3, emitter_task1, emitter_task2)

asyncio.run(main())

This approach works, but notice that we were forced to create three queues for two event types, because once waiter1 received a message from the queue, there was nothing left for waiter2. And what if we have five listeners for event1 and three listeners for event2? In that case, we need to create eight (!) queues and pass them to the waiters and emitters. Each new listener or event type only makes it worse, as it becomes difficult to keep track of what is emitting what and who is listening to what.
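
Just to illustrate the bookkeeping, here is a rough sketch of what we would end up maintaining by hand if we stayed with queues (the subscribe/publish helpers here are hypothetical, not something from the scripts above): a registry mapping event names to the queues of their subscribers.

import asyncio

# Hypothetical registry: event name -> the queues of every listener subscribed to it
subscribers = {}

def subscribe(event_name):
    # Each listener gets its own queue, so one listener cannot "steal" a message from another
    queue = asyncio.Queue()
    subscribers.setdefault(event_name, []).append(queue)
    return queue

async def publish(event_name, data):
    # Fan the same payload out to every subscriber of this event
    for queue in subscribers.get(event_name, []):
        await queue.put(data)

At that point we are essentially writing our own little event emitter.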

So, it seems like queues are not the right choice, and we need a more robust event handling solution. After some Googling, I discovered that something like pyee is likely what we need.

If we rewrite this using pyee events, it might look something like this:

import asyncio
import random
from pyee.asyncio import AsyncIOEventEmitter

emitter = AsyncIOEventEmitter()


async def waiter1():
    emitter.on('event1', lambda data: print(f'... got event1 with data: {data}!'))


async def waiter2():
    emitter.on('event1', lambda data: print(f'... got event1 with data: {data}!'))


async def waiter3():
    emitter.on('event2', lambda data: print(f'... got event2 with data: {data}!'))


async def emitter1():
    while True:
        await asyncio.sleep(random.uniform(1, 3))
        data = random.randint(1, 100)
        print(f"=================== Event1 emitted with data: {data} ===================")
        emitter.emit('event1', data)


async def emitter2():
    while True:
        await asyncio.sleep(random.uniform(2, 4))
        data = random.randint(100, 200)
        print(f"=================== Event2 emitted with data: {data} ===================")
        emitter.emit('event2', data)


async def main():
    waiter_task1 = asyncio.create_task(waiter1())
    waiter_task2 = asyncio.create_task(waiter2())
    waiter_task3 = asyncio.create_task(waiter3())

    emitter_task1 = asyncio.create_task(emitter1())
    emitter_task2 = asyncio.create_task(emitter2())

    await asyncio.gather(waiter_task1, waiter_task2, waiter_task3, emitter_task1, emitter_task2)


asyncio.run(main())

It doesn’t seem like the emitters or waiters were simplified much, but that wasn’t our goal. Our aim was to avoid multiplying queues with every additional waiter or emitter. Now that we have a simple working script that we are satisfied with, we can run some experiments to address the question we started with.
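
As a small aside, if I read the pyee docs correctly, handlers can also be registered with a decorator instead of calling emitter.on() inside the waiter coroutines; for waiter1 that would look something like this:

@emitter.on('event1')
def on_event1(data):
    print(f'... got event1 with data: {data}!')

I’ll keep the waiter tasks anyway, so the structure stays close to the earlier versions.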

Let me remind you that I was particularly interested in answering the question: “How does this work with fast emitters and slow handlers?” As a newbie to asynchronous programming, and someone who last touched Python years ago for a now-defunct Django project, I created something similar to the script above, emulating “slow listeners” with sleep(), but it didn’t work at all. It just emitted the events, and that was it.

import asyncio
import random
from pyee.asyncio import AsyncIOEventEmitter

emitter = AsyncIOEventEmitter()


async def waiter1():
    async def handler(data):
        await asyncio.sleep(1)
        print(f'... got event1 with data: {data}!')
    emitter.on('event1', handler)

async def waiter2():
    async def handler(data):
        await asyncio.sleep(1)
        print(f'... got event1 with data: {data}!')
    emitter.on('event1', handler)

async def waiter3():
    async def handler(data):
        await asyncio.sleep(1)
        print(f'... got event2 with data: {data}!')
    emitter.on('event2', handler)


async def emitter1():
    for i in range(10):
        data = random.randint(1, 100)
        print(f"=================== Event1 emitted with data: {data} ===================")
        emitter.emit('event1', data)


async def emitter2():
    for i in range(10):
        data = random.randint(1, 100)
        print(f"=================== Event2 emitted with data: {data} ===================")
        emitter.emit('event2', data)


async def main():
    waiter_task1 = asyncio.create_task(waiter1())
    waiter_task2 = asyncio.create_task(waiter2())
    waiter_task3 = asyncio.create_task(waiter3())

    emitter_task1 = asyncio.create_task(emitter1())
    emitter_task2 = asyncio.create_task(emitter2())

    await asyncio.gather(waiter_task1, waiter_task2, waiter_task3, emitter_task1, emitter_task2)


asyncio.run(main())

It turns out that emit() only schedules the handler coroutines as tasks on the event loop, while asyncio.gather() waits only for the tasks we passed to it. The waiter coroutines just register their handlers and return, and the emitters finish their loops almost instantly, so the script exits before the scheduled handlers ever get to run. We didn’t have this problem with the queue-based solution because the waiters there ran infinite loops, which they don’t here. So, for simplicity’s sake, I decided to add a “keep-alive task” with an infinite loop to prevent the script from stopping and killing the handlers prematurely.
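
As an aside, instead of a keep-alive hack we could probably also wait explicitly for whatever handler tasks the emitter has scheduled. A rough sketch of what the end of main() could look like, assuming pyee schedules async handlers as ordinary asyncio tasks:

    # After gathering the waiters and emitters, wait for any handler tasks
    # that emit() scheduled but that nobody is awaiting.
    pending = [task for task in asyncio.all_tasks() if task is not asyncio.current_task()]
    if pending:
        await asyncio.gather(*pending)

But for a quick experiment, the keep-alive task is simpler.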

So let’s do that, add some verbose timing prints, and see what happens.

import asyncio
from datetime import datetime

from pyee.asyncio import AsyncIOEventEmitter

emitter = AsyncIOEventEmitter()


async def waiter1():
    async def handler(data):
        current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
        print(f'{current_time} waiter1 started processing event1 with data: {data}!')
        await asyncio.sleep(1)
        current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
        print(f'{current_time} waiter1 ended processing event1 with data: {data}!')
    emitter.on('event1', handler)


async def waiter2():
    async def handler(data):
        current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
        print(f'{current_time} waiter2 started processing event1 with data: {data}!')
        await asyncio.sleep(1)
        current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
        print(f'{current_time} waiter2 ended processing event1 with data: {data}!')
    emitter.on('event1', handler)


async def waiter3():
    async def handler(data):
        current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
        print(f'{current_time} waiter3 started processing event2 with data: {data}!')
        await asyncio.sleep(1)
        current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
        print(f'{current_time} waiter3 ended processing event2 with data: {data}!')
    emitter.on('event2', handler)


async def emitter1():
    for i in range(10):
        print(f"=================== Event1 emitted with data: {i} ===================")
        emitter.emit('event1', i)


async def emitter2():
    for i in range(10):
        print(f"=================== Event2 emitted with data: {i} ===================")
        emitter.emit('event2', i)

async def infinite_sleep():
    # Keep-alive task: keeps the event loop running so the scheduled handlers get a chance to finish
    while True:
        await asyncio.sleep(3600)


async def main():
    keep_alive_task = asyncio.create_task(infinite_sleep())

    waiter_task1 = asyncio.create_task(waiter1())
    waiter_task2 = asyncio.create_task(waiter2())
    waiter_task3 = asyncio.create_task(waiter3())

    emitter_task1 = asyncio.create_task(emitter1())
    emitter_task2 = asyncio.create_task(emitter2())

    await asyncio.gather(keep_alive_task, waiter_task1, waiter_task2, waiter_task3, emitter_task1, emitter_task2)


asyncio.run(main())

And we got this output:

=================== Event1 emitted with data: 0 ===================
=================== Event1 emitted with data: 1 ===================
=================== Event1 emitted with data: 2 ===================
...........................
=================== Event2 emitted with data: 0 ===================
=================== Event2 emitted with data: 1 ===================
=================== Event2 emitted with data: 2 ===================
...........................
2024-09-21 15:38:23.635265 waiter1 started processing event1 with data: 0!
2024-09-21 15:38:23.635292 waiter2 started processing event1 with data: 0!
2024-09-21 15:38:23.635303 waiter1 started processing event1 with data: 1!
2024-09-21 15:38:23.635312 waiter2 started processing event1 with data: 1!
...........................
2024-09-21 15:38:23.635474 waiter3 started processing event2 with data: 8!
2024-09-21 15:38:23.635484 waiter3 started processing event2 with data: 9!
2024-09-21 15:38:24.636900 waiter1 ended processing event1 with data: 0!
2024-09-21 15:38:24.637172 waiter2 ended processing event1 with data: 0!
...........................

Well, it looks like the handlers actually work in parallel. Or do they? Perhaps it’s just because we used async sleep, and the event loop decided it could run other handlers in the meantime? Let’s find out by trying something that isn’t asynchronous.

import asyncio
from datetime import datetime

from pyee.asyncio import AsyncIOEventEmitter

emitter = AsyncIOEventEmitter()


async def waiter1():
    async def handler(data):
        current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
        print(f'{current_time} waiter1 started processing event1 with data: {data}!')
        total = 0
        # Synchronous, CPU-bound busy work that never yields back to the event loop
        for i in range(5_000_000):
            total += i * i
        current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
        print(f'{current_time} waiter1 ended processing event1 with data: {data}!')
    emitter.on('event1', handler)


async def waiter2():
    async def handler(data):
        current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
        print(f'{current_time} waiter2 started processing event1 with data: {data}!')
        total = 0
        for i in range(5_000_000):
            total += i * i
        current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
        print(f'{current_time} waiter2 ended processing event1 with data: {data}!')
    emitter.on('event1', handler)


async def waiter3():
    async def handler(data):
        current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
        print(f'{current_time} waiter3 started processing event2 with data: {data}!')
        total = 0
        for i in range(5_000_000):
            total += i * i
        current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
        print(f'{current_time} waiter3 ended processing event2 with data: {data}!')
    emitter.on('event2', handler)


async def emitter1():
    for i in range(10):
        print(f"=================== Event1 emitted with data: {i} ===================")
        emitter.emit('event1', i)


async def emitter2():
    for i in range(10):
        print(f"=================== Event2 emitted with data: {i} ===================")
        emitter.emit('event2', i)

async def infinite_sleep():
    while True:
        await asyncio.sleep(3600)


async def main():
    keep_alive_task = asyncio.create_task(infinite_sleep())

    waiter_task1 = asyncio.create_task(waiter1())
    waiter_task2 = asyncio.create_task(waiter2())
    waiter_task3 = asyncio.create_task(waiter3())

    emitter_task1 = asyncio.create_task(emitter1())
    emitter_task2 = asyncio.create_task(emitter2())

    await asyncio.gather(keep_alive_task, waiter_task1, waiter_task2, waiter_task3, emitter_task1, emitter_task2)


asyncio.run(main())

The code above returned this output:

2024-09-21 16:23:37.229215 waiter1 started processing event1 with data: 0!
2024-09-21 16:23:37.436909 waiter1 ended processing event1 with data: 0!
2024-09-21 16:23:37.436951 waiter2 started processing event1 with data: 0!
2024-09-21 16:23:37.641579 waiter2 ended processing event1 with data: 0!
2024-09-21 16:23:37.641636 waiter1 started processing event1 with data: 1!
2024-09-21 16:23:37.848059 waiter1 ended processing event1 with data: 1!
2024-09-21 16:23:37.848091 waiter2 started processing event1 with data: 1!
2024-09-21 16:23:38.051849 waiter2 ended processing event1 with data: 1!
...........................

And my suspicion that the execution only looked parallel because of the async sleep (don’t forget, I’m an async newbie) turned out to be correct. What’s more interesting to me is that waiter2’s handler, despite being scheduled as a separate task, still waited for waiter1’s handler to finish. So asynchronous execution is indeed not the same as parallel execution. We also need to remember that if one of the listeners makes an asynchronous network request, that’s fine - the other listeners will still be able to do their jobs. However, if a handler performs some synchronous, CPU-heavy pandas manipulation, we could run into trouble and fail to handle a critical event in time.
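
If a handler really does need to do heavy synchronous work, one option (a rough sketch, not something from the script above) is to push that work off the event loop, for example with asyncio.to_thread (Python 3.9+), so the other listeners keep running:

import asyncio

def cpu_heavy(data):
    # Stand-in for synchronous, CPU-bound work (e.g. pandas crunching)
    total = 0
    for i in range(5_000_000):
        total += i * i
    return total

async def handler(data):
    # Run the blocking function in a worker thread so the event loop stays responsive
    result = await asyncio.to_thread(cpu_heavy, data)
    print(f'... got event1 with data: {data}, result: {result}!')

Keep in mind that, because of the GIL, threads mostly help with blocking I/O; for truly CPU-bound work, a process pool (run_in_executor with a ProcessPoolExecutor) would be the next step.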

So, that’s it - lots of seemingly obvious “discoveries”, but I believe the best way to master something is to experiment with it. The only better way is to write it down, which I’m obviously doing 😁