Code: github.com/punitarani/message-queue
Building a scalable and fault-tolerant order processing system is crucial for maintaining a smooth and responsive user experience. This system will handle high traffic, ensure data integrity, and provide real-time order status updates. In this post, we'll show you how to create this system using FastAPI, PostgreSQL, RabbitMQ, Docker, and Nginx.
Table of Contents
- System Overview
- Setting Up the Database
- Building the API with FastAPI
- Implementing the Consumer with RabbitMQ
- Configuring Nginx as a Reverse Proxy
- Application
- Ensuring Fault Tolerance and Data Integrity
- Conclusion
System Overview
The system we're building allows clients to place orders and retrieve their status in real-time. Here's a high-level view of how the components interact:
- Client: Sends HTTP requests to place orders and check their status.
- API (FastAPI): Receives client requests, interacts with the database, and publishes messages to RabbitMQ.
- Database (PostgreSQL): Stores order data and maintains data integrity.
- Message Queue (RabbitMQ): Queues order processing tasks for asynchronous handling.
- Consumer: Listens to RabbitMQ, processes orders, and updates the database.
- Nginx: Acts as a reverse proxy, managing incoming traffic and distributing it across API replicas.
This architecture ensures that the system can handle high traffic, recover gracefully from failures, and maintain accurate order records.
Architecture Diagram
- Client sends a request to Nginx.
- Nginx proxies the request to one of the API instances.
- The API interacts with PostgreSQL to store order details and publishes a message to RabbitMQ.
- Consumer services listen to RabbitMQ, process the order, and update the PostgreSQL database.
- Clients can query the API to check the status of their orders.
Results
We tested the limits of the system and achieved over 1600 requests per second with a 100% success rate, using 3 API replicas and 1 consumer with only 100 concurrent database connections, all running on a laptop. Median latency was under 100ms and P99 latency was under 1s.
The main bottleneck was the database connection limit, since we write the order to the database before it is even published to RabbitMQ for processing.
Setting Up the Database
We create a simple database with a single table called `orders`.
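The exact schema lives in the repo's `init.sql`; a minimal sketch of what such a table might look like (the column names here are assumptions, not the repo's exact schema):

```sql
-- init.sql (illustrative sketch; the repo's actual schema may differ)
CREATE TABLE IF NOT EXISTS orders (
    id         SERIAL PRIMARY KEY,
    item       VARCHAR(255) NOT NULL,
    quantity   INTEGER NOT NULL DEFAULT 1,
    status     VARCHAR(32) NOT NULL DEFAULT 'pending',
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
```

The `status` column is what the consumer later flips from `'pending'` to `'processing'` to `'done'`.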
Database Interaction
To interact with the database, we use `asyncpg` with the `sqlalchemy` ORM to keep the code cleaner and more maintainable.
Connection pooling lets us reuse existing database connections instead of creating new ones, which saves time and resources.
- `pool_size` is the maximum number of permanent connections.
- `max_overflow` is the maximum number of temporary connections allowed beyond `pool_size`.
- `pool_timeout` is the maximum time to wait for a connection; if none becomes available within this time, an error is raised.
- `pool_recycle` is the number of seconds after which a connection is recycled. This prevents stale connections and keeps the pool from exhausting the database's connection limit.
The `Order` model implements the ORM mapping for the `orders` table, letting us interact with the database in Python in a much simpler and cleaner way without writing raw SQL.
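A sketch of how the pooled engine and the model might be wired up with SQLAlchemy's async support (the names, columns, and pool numbers here are illustrative, not the repo's exact code):

```python
from sqlalchemy import Column, DateTime, Integer, String, func
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class Order(Base):
    """ORM mapping for the `orders` table (columns are illustrative)."""

    __tablename__ = "orders"

    id = Column(Integer, primary_key=True)
    item = Column(String(255), nullable=False)
    status = Column(String(32), nullable=False, default="pending")
    created_at = Column(DateTime(timezone=True), server_default=func.now())


def make_engine(url: str) -> AsyncEngine:
    """Create an async engine backed by asyncpg with a bounded connection pool."""
    return create_async_engine(
        url,                # e.g. "postgresql+asyncpg://user:pass@localhost/orders"
        pool_size=20,       # permanent connections kept open
        max_overflow=10,    # temporary connections beyond pool_size
        pool_timeout=30,    # seconds to wait for a free connection before erroring
        pool_recycle=1800,  # recycle connections after 30 minutes
    )
```

With this in place, queries become ordinary Python expressions like `select(Order).where(Order.id == order_id)` instead of raw SQL strings.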
Building the API with FastAPI
We implement the API endpoints with FastAPI to handle order placement and status retrieval.
- Endpoints:
  - `POST /order/place`: Creates a new order, stores it in the database, and publishes a message to RabbitMQ for processing.
  - `GET /order/get/{order_id}`: Retrieves the status of a specific order.
- RabbitMQ Integration:
  - Startup Event: Establishes a robust connection to RabbitMQ and declares the `order_queue`.
  - Shutdown Event: Gracefully closes the RabbitMQ connection.
- Message Publishing:
  - Orders are published to RabbitMQ with `delivery_mode=Persistent` to ensure messages aren't lost if RabbitMQ restarts.
Implementing the Consumer with RabbitMQ
The consumer listens to the `order_queue` and processes incoming orders asynchronously.
Explanation:
- Processing Logic:
  - Fetch Order: Retrieves the order from the database using the `order_id`.
  - Update Status: Changes the order status to `'processing'`, simulates processing time, and finally updates the status to `'done'`.
- Concurrency Control:
  - Uses an `asyncio.Semaphore` to limit the number of concurrent order processing tasks to 20.
  - This prevents resource exhaustion and simulates a constrained resource in a real-world scenario.
- Error Handling:
  - Rolls back the transaction in case of any database errors to maintain data integrity.
- Persistent Connections:
  - Utilizes `connect_robust` from `aio_pika` for resilient RabbitMQ connections that automatically handle reconnections.
Configuring Nginx as a Reverse Proxy
nginx.conf
Nginx serves as a reverse proxy, distributing incoming HTTP requests to multiple API instances.
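A minimal version of such a config might look like this (a sketch, not the repo's exact file):

```nginx
events {
    worker_connections 1024;  # max simultaneous connections per worker
}

http {
    # Docker's DNS resolves "api" to the replicas of the API service.
    upstream api_servers {
        server api:3000;
    }

    server {
        listen 80;  # accept HTTP traffic on port 80

        location / {
            proxy_pass http://api_servers;
            # Preserve client information for logging and security.
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        }
    }
}
```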
Explanation:
- `worker_connections 1024` limits each worker to 1024 simultaneous connections.
- `upstream api_servers { server api:3000; }` forwards client requests to the API running on port `3000`.
  - This is the port the FastAPI application listens on.
- `listen 80` tells Nginx to listen on port `80` for incoming HTTP requests.
  - You can reach the Nginx server at `http://localhost:80`.
- `proxy_pass` passes the request to the upstream API servers.
- `proxy_set_header` preserves important headers for logging and security.
Benefits:
- Horizontal Scaling: Easy to scale out by adding more API instances, ensuring high availability for clients.
- Load Balancing: Nginx distributes requests across the API replicas for even load distribution.
Application
We use Docker Compose to define and run the database, message broker, API, and consumer services.
Postgres
- Runs PostgreSQL database using the official Alpine-based image
- Initializes with a custom `init.sql` script
- Implements a health check to ensure readiness
RabbitMQ
- Runs RabbitMQ message broker with management UI
- Exposes ports 5672 (AMQP) and 15672 (Management UI)
- Includes a health check for operational verification
API
- Builds from the project's Dockerfile
- Runs multiple replicas for load balancing
- Depends on Postgres and RabbitMQ being healthy
- Exposes port 3000 internally
Consumer
- Builds from the same Dockerfile as the API
- Runs the message consumer script
- Depends on Postgres and RabbitMQ being healthy
Nginx
- Uses the official Nginx image
- Acts as a reverse proxy, exposing port 80 to the host
- Loads custom nginx.conf for routing
- Depends on the API service
Network Configuration
This connects all services to a shared network, enabling them to communicate with each other.
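A condensed `docker-compose.yml` matching this layout might look like the following (a sketch with assumed image tags, credentials, and entrypoint names, not the repo's exact file):

```yaml
services:
  postgres:
    image: postgres:16-alpine
    environment:
      POSTGRES_PASSWORD: postgres
    volumes:
      - ./init.sql:/docker-entrypoint-initdb.d/init.sql
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres"]
      interval: 5s
    restart: always
    networks: [app-network]

  rabbitmq:
    image: rabbitmq:3-management
    ports:
      - "5672:5672"    # AMQP
      - "15672:15672"  # management UI
    healthcheck:
      test: ["CMD", "rabbitmq-diagnostics", "ping"]
      interval: 5s
    restart: always
    networks: [app-network]

  api:
    build: .
    deploy:
      replicas: 3
    expose:
      - "3000"
    depends_on:
      postgres: {condition: service_healthy}
      rabbitmq: {condition: service_healthy}
    restart: always
    networks: [app-network]

  consumer:
    build: .
    command: python consumer.py   # assumed entrypoint name
    depends_on:
      postgres: {condition: service_healthy}
      rabbitmq: {condition: service_healthy}
    restart: always
    networks: [app-network]

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf:ro
    depends_on: [api]
    networks: [app-network]

networks:
  app-network:
    driver: bridge
```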
Explanation:
- Services:
  - Postgres: Runs the PostgreSQL database with initial setup from `init.sql`. Includes a health check to ensure readiness.
  - RabbitMQ: Hosts RabbitMQ with the management UI accessible on port `15672`. Also includes a health check.
  - API: Builds from the provided `Dockerfile`, connects to Postgres and RabbitMQ, and scales to 3 replicas for load balancing.
  - Consumer: Processes messages from RabbitMQ, ensuring orders are handled asynchronously.
  - Nginx: Acts as a reverse proxy, forwarding incoming traffic to the API service.
- Networks: All services are connected through the `app-network` bridge network for seamless inter-service communication.
Benefits:
- Isolation: Each service runs in its own container, ensuring environment consistency.
- Scalability: Easily scale the API service by adjusting the `replicas` count.
- Resilience: Health checks and `restart: always` ensure services recover from failures automatically.
Ensuring Fault Tolerance and Data Integrity
Building a robust system requires addressing potential points of failure and ensuring data remains consistent. Here's how our architecture achieves this:
1. Durable Message Queues
RabbitMQ queues are declared with `durable=True` and messages are published with `delivery_mode=Persistent`. This ensures that messages are not lost even if RabbitMQ restarts.
2. Database Transactions
All database operations are performed within transactions. If an error occurs during order processing, changes are rolled back to maintain data integrity.
3. Message Acknowledgment
Messages are acknowledged only after successful processing. If the consumer crashes before acknowledgment, RabbitMQ requeues the message for another attempt, preventing message loss.
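This ack-after-success behavior can be sketched with a plain `asyncio.Queue` standing in for RabbitMQ (in the real consumer, `message.ack()` and redelivery are handled by `aio_pika` and the broker):

```python
import asyncio

processed: list[int] = []
attempts: dict[int, int] = {}


async def handle(order_id: int) -> None:
    """Fail on the first attempt for order 2 to force a redelivery."""
    attempts[order_id] = attempts.get(order_id, 0) + 1
    if order_id == 2 and attempts[order_id] == 1:
        raise RuntimeError("simulated crash before ack")
    processed.append(order_id)


async def consume(queue: asyncio.Queue) -> None:
    while not queue.empty():
        order_id = await queue.get()
        try:
            await handle(order_id)
            # Success: this is where the real consumer would ack the message.
        except RuntimeError:
            # No ack was sent, so the broker would redeliver: requeue it.
            await queue.put(order_id)
        finally:
            queue.task_done()


async def main() -> None:
    queue: asyncio.Queue = asyncio.Queue()
    for order_id in (1, 2, 3):
        await queue.put(order_id)
    await consume(queue)


asyncio.run(main())
```

Order 2 fails once, goes back on the queue, and is processed on the second delivery, so nothing is lost even though a handler crashed mid-flight.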
4. Graceful Shutdowns
Both the API and consumer handle shutdown events gracefully, ensuring connections to RabbitMQ are closed properly to prevent message loss or corruption.
5. Health Checks and Dependencies
Docker Compose uses health checks and `depends_on` with `service_healthy` conditions to ensure services only start when their dependencies are ready, reducing the risk of connection failures.
Conclusion
This project demonstrates building a scalable message queue system using FastAPI, PostgreSQL, RabbitMQ, Docker, and Nginx. It handles thousands of requests per second and showcases a basic microservice architecture.
The guide covers setup, integration, and basic fault tolerance. While not production-ready, it's a solid starting point for understanding scalable system concepts.