Hello everyone! Today, I’d like to explore how to handle graceful shutdowns in networked asynchronous applications. As always, the best way to deeply understand a concept is to articulate it clearly in writing.

Specification

Goal: Create an application to manage tasks on a remote server (via WebSockets/API) and handle their lifecycle.

1) Local task tracking: Maintain a record of all tasks within the application.
2) Remote task creation: Send tasks to the remote server via WebSockets.
3) Asynchronous updates: Handle real-time task status updates sent by the remote server at unpredictable intervals.
4) Task completion monitoring: Wait for every task to reach a final status (success, failure, etc.).
5) Graceful shutdown: Terminate the application only after all tasks have completed.

Getting Started

Let’s begin by writing a WebSocket client that connects to an echo server, sends a message, and prints the response. Since I’m new to this, we’ll start with the simplest implementation.

Step 1: Setup

We’ll use the websockets library for Python. Install it first:

pip install websockets

Step 2: Basic WebSocket Client

import asyncio

import websockets


async def main():
    url = "wss://ws.postman-echo.com/raw"

    async with websockets.connect(url) as websocket:
        message = "Testing websockets!"

        await websocket.send(message)
        print("Message was sent")

        response = await websocket.recv()
        print(f"Received response: {response}")


asyncio.run(main())

After running the script, we get the following output:

Message was sent
Received response: Testing websockets!

Progress! 🎉

Tasks

Our tasks can have the following statuses:

  • New (before being sent to the remote server)
  • Processing (after being sent to the remote server)
  • Processed (successful completion)
  • Failed (unsuccessful processing)
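One way to keep these statuses in code is an Enum with a helper that separates final states from intermediate ones. This is only an illustrative sketch: the class name `TaskStatus`, its string values and the `tasks` dictionary are my own assumptions, and the server built later in this post actually reports `success`/`fail` rather than these names.

```python
from enum import Enum


class TaskStatus(Enum):
    """Lifecycle states from the list above (string values are illustrative)."""

    NEW = "new"                # created locally, not sent yet
    PROCESSING = "processing"  # sent to the remote server, waiting for a result
    PROCESSED = "processed"    # finished successfully
    FAILED = "failed"          # remote processing failed

    @property
    def is_final(self) -> bool:
        # A task is done once it reaches either terminal state
        return self in (TaskStatus.PROCESSED, TaskStatus.FAILED)


# Hypothetical local record of tasks, keyed by task ID
tasks = {1: TaskStatus.NEW, 2: TaskStatus.PROCESSING, 3: TaskStatus.FAILED}
unfinished = [task_id for task_id, status in tasks.items() if not status.is_final]
print(unfinished)  # [1, 2]
```

With this in place, "wait until every task is done" becomes "wait until no task has a non-final status".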

Starting Simple: Building an Echo Server

We need to walk before we can run. Let’s first recreate a basic echo server to understand WebSockets fundamentals.

import asyncio
from typing import Any

import websockets


async def echo(websocket: Any) -> None:
    while True:
        message = await websocket.recv()
        await websocket.send(message)


async def main() -> None:
    async with websockets.serve(echo, "localhost", 8765):
        await asyncio.Future()


if __name__ == "__main__":
    asyncio.run(main())

This basic server simply echoes every message back to the client. If we point the client from Step 2 at ws://localhost:8765 instead of the Postman URL, it prints the same two lines as before.

Task Overview

Now it’s time to add minimal support for task handling.

Server Code

Below is the server implementation. It stores each task by ID and replies with a “processed” status the first time it sees that ID:

import asyncio
import json
from typing import Any, Dict

import websockets

task_data_storage: Dict = {}

async def handle_task(websocket: Any) -> None:
    while True:
        task_data_json = await websocket.recv()
        task: Dict = json.loads(task_data_json)

        print(f"Received task: {task}")

        if task["id"] not in task_data_storage:
            task_data_storage[task["id"]] = task

            await websocket.send(json.dumps({"id": task["id"], "status": "processed"}))


async def main() -> None:
    async with websockets.serve(handle_task, "localhost", 8765):
        await asyncio.Future()


if __name__ == "__main__":
    asyncio.run(main())

Client Code

Here’s the corresponding client implementation:

import asyncio
import json

import websockets


async def main():
    url = "ws://localhost:8765"

    async with websockets.connect(url) as websocket:
        message = json.dumps({"id": 1})

        await websocket.send(message)
        print("Task was sent")

        response = await websocket.recv()
        print(f"Received response: {response}")


asyncio.run(main())

When we run the client, the server receives the task and replies, but then prints the following error:

Received task: {'id': 1}
connection handler failed
Traceback (most recent call last):
  File "/path/graceful_stop/.venv/lib/python3.12/site-packages/websockets/asyncio/server.py", line 374, in conn_handler
    await self.handler(connection)
  File "/path/graceful_stop/server.py", line 11, in handle_task
    task_data_json = await websocket.recv()
                     ^^^^^^^^^^^^^^^^^^^^^^
  File "/path/graceful_stop/.venv/lib/python3.12/site-packages/websockets/asyncio/connection.py", line 313, in recv
    raise self.protocol.close_exc from self.recv_exc
websockets.exceptions.ConnectionClosedOK: received 1000 (OK); then sent 1000 (OK)

The server does not handle client disconnections. After replying, the handler loops back to websocket.recv(), but by then the client has closed the connection, so recv() raises ConnectionClosedOK (close code 1000, a normal closure).

Since a client disconnecting is an expected event, it should not be treated as a critical error. Instead, we handle it gracefully with a try/except block.

Below is the updated server code, which includes error handling for the ConnectionClosedOK exception.

import asyncio
import json
from typing import Any, Dict

import websockets
from websockets import ConnectionClosedOK

task_data_storage: Dict = {}

async def handle_task(websocket: Any) -> None:
    try:
        while True:
            task_data_json = await websocket.recv()
            task: Dict = json.loads(task_data_json)

            print(f"Received task: {task}")

            if task["id"] not in task_data_storage:
                task_data_storage[task["id"]] = task

                await websocket.send(json.dumps({"id": task["id"], "status": "processed"}))
    except ConnectionClosedOK:
        print("Connection closed")


async def main() -> None:
    async with websockets.serve(handle_task, "localhost", 8765):
        await asyncio.Future()


if __name__ == "__main__":
    asyncio.run(main())

New Goal: Adding Task Processing Simulation

To make the server more dynamic, we want to introduce a delay in task responses. Instead of immediately responding with a “processed” status upon receiving a message, the server will now simulate task processing. For instance, if a client sends 100 tasks, the server “processes” each one for a random amount of time, sending a few “processing” updates along the way, before it reports whether the task failed or succeeded.

Here’s how the server can look with the new behaviour:

import asyncio
import json
import random
from typing import Any, Dict

import websockets
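from websockets import ConnectionClosedOK


async def send_task_status_updates(websocket: Any, task_id: int) -> None:
    # Send between 1 and 10 "processing" updates at random intervals...
    process_status_repeat_number = random.randint(1, 10)

    for _ in range(process_status_repeat_number):
        await asyncio.sleep(random.uniform(0.5, 5))
        await websocket.send(json.dumps({
            "task_id": task_id,
            "status": "processing"
        }))

    # ...then report a randomly chosen final status
    await asyncio.sleep(random.uniform(0.5, 5))

    await websocket.send(json.dumps({
        "task_id": task_id,
        "status": random.choice(["success", "fail"])
    }))


# Keep strong references to the background tasks: the event loop only holds
# weak references, so an unreferenced task could be garbage-collected mid-run
background_tasks: set[asyncio.Task] = set()


async def handle_task(websocket: Any) -> None:
    try:
        while True:
            task_data_json = await websocket.recv()
            task: Dict = json.loads(task_data_json)

            print(f"Received task: {task}")

            # Simulate processing in the background so this loop keeps receiving
            background_task = asyncio.create_task(
                send_task_status_updates(websocket, task["task_id"])
            )
            background_tasks.add(background_task)
            background_task.add_done_callback(background_tasks.discard)
    except ConnectionClosedOK:
        print("Connection closed")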


async def main() -> None:
    async with websockets.serve(handle_task, "localhost", 8765):
        await asyncio.Future()


if __name__ == "__main__":
    asyncio.run(main())

Here’s how a simple client could look to interact with the updated server:

import asyncio
import json

import websockets


async def task_sender(websocket):
    for task_id in range(1, 100):
        await asyncio.sleep(0.1)

        message = json.dumps({"task_id": task_id})
        await websocket.send(message)
        print(f"task {task_id} sent")

    print("All tasks sent")


async def update_handler(websocket):
    while True:
        message_json = await websocket.recv()
        message = json.loads(message_json)

        print(f"Received message: {message}")


async def main():
    url = "ws://localhost:8765"

    async with websockets.connect(url) as websocket:
        sender = task_sender(websocket)
        updated = update_handler(websocket)

        await asyncio.gather(sender, updated)


asyncio.run(main())

Everything looks great! Tasks are now “processed” on the server for a while before they receive their final status. Here is an excerpt of the client output:

Received message: {'task_id': 82, 'status': 'processing'}
Received message: {'task_id': 76, 'status': 'fail'}
Received message: {'task_id': 95, 'status': 'fail'}
Received message: {'task_id': 52, 'status': 'success'}
Received message: {'task_id': 83, 'status': 'processing'}
Received message: {'task_id': 41, 'status': 'success'}
Received message: {'task_id': 83, 'status': 'processing'}
Received message: {'task_id': 82, 'status': 'fail'}
Received message: {'task_id': 83, 'status': 'processing'}
Received message: {'task_id': 83, 'status': 'success'}

And let’s add one more feature. We want our client to stop, but only after it has received the final statuses for all tasks and saved them to a CSV file. The CSV should include the task ID, status and the datetime when the final status was received.

Our server code remains unchanged, and we only need to update our client code, which now looks like this:

import asyncio
import csv
import json
from datetime import datetime
from typing import Dict, Set, List

import websockets


async def task_sender(websocket, task_ids: List[int]):
    for task_id in task_ids:
        await asyncio.sleep(0.1)

        message = json.dumps({"task_id": task_id})
        await websocket.send(message)
        print(f"task {task_id} sent")

    print("All tasks sent")


async def update_handler(websocket, results: Dict[int, dict], pending_tasks: Set[int]):
    while True:
        message_json = await websocket.recv()
        message = json.loads(message_json)
        task_id, status = message["task_id"], message["status"]

        if status in ("success", "fail") and task_id in pending_tasks:
            # Record only final statuses, stamped with the time they arrived
            results[task_id] = {
                "status": status,
                "timestamp": datetime.now().isoformat(),
            }
            pending_tasks.remove(task_id)
            print(f"Task {task_id} completed with {status}")

            # Stop listening once every task has reached a final status
            if not pending_tasks:
                print("All tasks have been completed.")
                break
        else:
            print(f"Task {task_id} got status {status}")


def save_to_csv(results: Dict[int, dict]):
    with open("task_results.csv", "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["Task ID", "Status", "Completion Time"])
        for task_id in sorted(results):
            writer.writerow([
                task_id,
                results[task_id]["status"],
                results[task_id]["timestamp"]
            ])

    print("Results saved to task_results.csv")


async def main():
    url = "ws://localhost:8765"

    # 100 tasks with IDs 1..100
    task_ids = list(range(1, 101))
    results: Dict[int, dict] = {}
    pending_tasks: Set[int] = set(task_ids)

    async with websockets.connect(url) as websocket:
        sender = task_sender(websocket, task_ids)
        updater = update_handler(websocket, results, pending_tasks)

        # gather() returns only after update_handler() has seen a final
        # status for every task, so nothing is pending at this point
        await asyncio.gather(sender, updater)

    save_to_csv(results)


asyncio.run(main())
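The client above stops because update_handler() breaks out of its loop once pending_tasks is empty. If you would rather keep the receiving loop generic, and also cap how long the client waits for a server that may never send a final status, an asyncio.Event can signal completion instead. This is a sketch under my own assumptions: the CompletionTracker class, its method names and the timeout values are hypothetical, and the updates come from a fake coroutine rather than the server.

```python
import asyncio


class CompletionTracker:
    """Hypothetical helper: records final statuses and signals when nothing is pending."""

    FINAL_STATUSES = ("success", "fail")

    def __init__(self, task_ids: list[int]) -> None:
        self.pending = set(task_ids)
        self.final: dict[int, str] = {}
        self.all_done = asyncio.Event()
        if not self.pending:
            self.all_done.set()

    def on_update(self, task_id: int, status: str) -> None:
        # Ignore intermediate statuses and tasks we are not tracking
        if status in self.FINAL_STATUSES and task_id in self.pending:
            self.pending.discard(task_id)
            self.final[task_id] = status
            if not self.pending:
                self.all_done.set()

    async def wait(self, timeout: float) -> bool:
        # True if every task finished in time, False if we gave up waiting
        try:
            await asyncio.wait_for(self.all_done.wait(), timeout)
            return True
        except asyncio.TimeoutError:
            return False


async def demo() -> tuple[bool, bool]:
    tracker = CompletionTracker([1, 2, 3])

    async def fake_updates() -> None:
        # Simulated server messages arriving over time
        for task_id, status in [(1, "processing"), (1, "success"), (2, "fail"), (3, "success")]:
            await asyncio.sleep(0.01)
            tracker.on_update(task_id, status)

    feeder = asyncio.create_task(fake_updates())
    finished = await tracker.wait(timeout=1.0)
    await feeder

    stuck = CompletionTracker([4])  # nobody ever reports task 4
    gave_up = not await stuck.wait(timeout=0.05)
    return finished, gave_up


print(asyncio.run(demo()))  # (True, True)
```

In the real client, update_handler() would call tracker.on_update() for each message, and main() would await tracker.wait() instead of relying on the handler to break.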

Now the client produces output like the following (the last lines of a run):

Task 15 completed with fail
Task 55 completed with success
Task 90 got status processing
Task 87 completed with success
Task 79 completed with fail
Task 90 got status processing
Task 49 got status processing
Task 92 completed with fail
Task 90 completed with success
Task 49 got status processing
Task 49 completed with fail
All tasks have been completed.
Results saved to task_results.csv

It also writes the final statuses to task_results.csv. Here are the first rows of the file:

Task ID,Status,Completion Time
1,fail,2025-01-27T21:15:17.695652
2,fail,2025-01-27T21:15:20.636363
3,fail,2025-01-27T21:15:06.673702
4,fail,2025-01-27T21:15:05.758094
5,success,2025-01-27T21:15:16.837756
6,success,2025-01-27T21:15:07.734332
7,fail,2025-01-27T21:15:15.439398
8,fail,2025-01-27T21:14:55.896376
9,success,2025-01-27T21:15:23.485538
10,fail,2025-01-27T21:15:22.648998
11,success,2025-01-27T21:15:23.697543
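Graceful shutdown also covers the unhappy path. In the client above, if the connection drops before every task finishes, the exception propagates out of main() before save_to_csv() runs, and the results received so far are lost. Wrapping the receiving step in try/finally is one way to keep them. Below is a self-contained sketch: collect() is a hypothetical stand-in for update_handler(), save_to_csv() is adapted to take a path, and the file goes to a temporary directory.

```python
import asyncio
import csv
import os
import tempfile


def save_to_csv(results: dict[int, dict], path: str) -> None:
    # Same layout as the client's CSV: one row per task, sorted by ID
    with open(path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["Task ID", "Status", "Completion Time"])
        for task_id in sorted(results):
            writer.writerow([task_id, results[task_id]["status"], results[task_id]["timestamp"]])


async def collect(results: dict[int, dict]) -> None:
    # Hypothetical stand-in for update_handler(): two results, then the connection drops
    results[2] = {"status": "fail", "timestamp": "2025-01-27T21:15:20"}
    results[1] = {"status": "success", "timestamp": "2025-01-27T21:15:21"}
    raise ConnectionError("connection closed before all tasks finished")


async def main(path: str) -> None:
    results: dict[int, dict] = {}
    try:
        await collect(results)
    finally:
        # Runs on success, on errors and on cancellation, so partial results survive
        save_to_csv(results, path)


path = os.path.join(tempfile.mkdtemp(), "task_results.csv")
try:
    asyncio.run(main(path))
except ConnectionError as error:
    print(f"Stopped early: {error}")

with open(path, newline="") as f:
    print(list(csv.reader(f)))
```

The error still reaches the caller, so the program can report the failure, but the CSV is written either way.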