๐Ÿ”„ Distributed Patterns: Saga Pattern

1. Overview and Problem Statement ๐ŸŽฏโ€‹


The Saga Pattern is a distributed transactions pattern that manages data consistency across multiple services in a microservices architecture. It breaks down long-running transactions into a sequence of local transactions, where each local transaction updates data within a single service.

Problems Solvedโ€‹

  • Maintaining data consistency across services
  • Managing distributed transactions
  • Handling partial failures
  • Coordinating complex business processes
  • Implementing compensating transactions

Business Valueโ€‹

  • Data Consistency: Ensures business data remains consistent
  • System Reliability: Handles failures gracefully
  • Scalability: Enables distributed transaction management
  • Visibility: Provides clear transaction state tracking
  • Maintainability: Simplifies complex workflows

2. Detailed Solution/Architecture ๐Ÿ—๏ธโ€‹

Core Componentsโ€‹

Implementation Typesโ€‹

  1. Choreography-based Saga

    • Services publish events
    • Each service listens and reacts
    • Decentralized coordination
  2. Orchestration-based Saga

    • Central coordinator
    • Explicit flow control
    • Centralized monitoring

3. Technical Implementation ๐Ÿ’ปโ€‹

1. Basic Saga Implementationโ€‹

from enum import Enum
from typing import List, Callable, Dict, Any
from dataclasses import dataclass
import logging
import uuid

class TransactionStatus(Enum):

class SagaStep:
name: str
execute: Callable
compensate: Callable
status: TransactionStatus = TransactionStatus.PENDING
data: Dict[str, Any] = None

class SagaCoordinator:
def __init__(self):
self.steps: List[SagaStep] = []
self.saga_id = str(uuid.uuid4())
self.logger = logging.getLogger(__name__)

def add_step(self, name: str, execute: Callable, compensate: Callable):
step = SagaStep(name=name, execute=execute, compensate=compensate)

async def execute(self):
current_step_index = 0

# Execute forward transactions
for step in self.steps:"Executing step: {}") = await step.execute()
step.status = TransactionStatus.COMPLETED
current_step_index += 1

except Exception as e:
self.logger.error(f"Step {} failed: {str(e)}")
step.status = TransactionStatus.FAILED

# Execute compensating transactions
await self._compensate(current_step_index)

async def _compensate(self, failed_step_index: int):"Starting compensation process")

# Execute compensating transactions in reverse order
for step in reversed(self.steps[:failed_step_index]):
try:"Compensating step: {}")
step.status = TransactionStatus.COMPENSATING
await step.compensate(
step.status = TransactionStatus.COMPENSATED
except Exception as e:
self.logger.error(f"Compensation failed for step {}: {str(e)}")
# Handle compensation failure (might need manual intervention)

# Usage Example
async def book_trip():
saga = SagaCoordinator()

# Add saga steps
name="Reserve Hotel",
name="Book Flight",
name="Charge Payment",

await saga.execute()

2. Choreography-based Saga Implementationโ€‹

from abc import ABC, abstractmethod
from typing import Dict, Any
import asyncio
import json

class EventBus:
def __init__(self):
self.subscribers: Dict[str, List[Callable]] = {}

def subscribe(self, event_type: str, callback: Callable):
if event_type not in self.subscribers:
self.subscribers[event_type] = []

async def publish(self, event_type: str, data: Any):
if event_type in self.subscribers:
for callback in self.subscribers[event_type]:
await callback(data)

class SagaParticipant(ABC):
def __init__(self, event_bus: EventBus):
self.event_bus = event_bus

def register_handlers(self):

async def execute_transaction(self, data: Dict):

async def execute_compensation(self, data: Dict):

class HotelService(SagaParticipant):
def register_handlers(self):
self.event_bus.subscribe("BookTrip", self.handle_book_trip)
self.event_bus.subscribe("CompensateHotel", self.execute_compensation)

async def handle_book_trip(self, data: Dict):
result = await self.execute_transaction(data)
await self.event_bus.publish("HotelBooked", result)
except Exception as e:
await self.event_bus.publish("HotelBookingFailed", str(e))

async def execute_transaction(self, data: Dict):
# Implement hotel booking logic
return {"reservation_id": "123"}

async def execute_compensation(self, data: Dict):
# Implement hotel cancellation logic

class ChoreographySaga:
def __init__(self, event_bus: EventBus):
self.event_bus = event_bus
self.completed_steps = []
self.failed = False

async def start(self, data: Dict):
await self.event_bus.publish("BookTrip", data)

def handle_success(self, step: str):

def handle_failure(self, step: str):
self.failed = True
# Start compensation

async def _start_compensation(self):
for step in reversed(self.completed_steps):
await self.event_bus.publish(f"Compensate{step}", {})

# Usage Example
async def main():
event_bus = EventBus()
hotel_service = HotelService(event_bus)
flight_service = FlightService(event_bus)
payment_service = PaymentService(event_bus)

saga = ChoreographySaga(event_bus)
await saga.start({"trip_id": "T123"})

3. Orchestration-based Saga Implementationโ€‹

from dataclasses import dataclass
from typing import List, Dict, Optional
import asyncio

class SagaDefinition:
steps: List[Dict[str, Callable]]
compensation_steps: List[Dict[str, Callable]]

class SagaOrchestrator:
def __init__(self):
self.sagas: Dict[str, SagaDefinition] = {}
self.active_sagas: Dict[str, Dict] = {}

def define_saga(self, saga_name: str, saga_definition: SagaDefinition):
self.sagas[saga_name] = saga_definition

async def start_saga(self, saga_name: str, initial_data: Dict) -> str:
if saga_name not in self.sagas:
raise ValueError(f"Saga {saga_name} not found")

saga_instance_id = str(uuid.uuid4())
self.active_sagas[saga_instance_id] = {
"definition": self.sagas[saga_name],
"state": initial_data,
"current_step": 0,
"status": "RUNNING"

await self._execute_saga(saga_instance_id)
return saga_instance_id

async def _execute_saga(self, saga_instance_id: str):
saga = self.active_sagas[saga_instance_id]
current_step = saga["current_step"]

while current_step < len(saga["definition"].steps):
step = saga["definition"].steps[current_step]
result = await step["execute"](saga["state"])
saga["current_step"] += 1
current_step += 1

saga["status"] = "COMPLETED"

except Exception as e:
saga["status"] = "FAILED"
await self._compensate_saga(saga_instance_id)

async def _compensate_saga(self, saga_instance_id: str):
saga = self.active_sagas[saga_instance_id]
current_step = saga["current_step"]

for compensation_step in reversed(saga["definition"].compensation_steps[:current_step]):
await compensation_step["compensate"](saga["state"])
except Exception as e:
# Log compensation failure

saga["status"] = "COMPENSATED"

# Usage Example
async def create_order_saga():
orchestrator = SagaOrchestrator()

saga_definition = SagaDefinition(
{"execute": create_order},
{"execute": reserve_inventory},
{"execute": process_payment},
{"execute": ship_order}
{"compensate": cancel_shipment},
{"compensate": refund_payment},
{"compensate": release_inventory},
{"compensate": cancel_order}

orchestrator.define_saga("CreateOrder", saga_definition)
return orchestrator

4. Best Practices and Guidelines ๐Ÿ“โ€‹

1. Idempotency Implementationโ€‹

from functools import wraps
import hashlib

def idempotent(key_generator):
def decorator(func):
async def wrapper(self, *args, **kwargs):
# Generate idempotency key
key = key_generator(*args, **kwargs)

# Check if operation was already executed
if await self.is_already_executed(key):
return await self.get_stored_result(key)

# Execute operation
result = await func(self, *args, **kwargs)

# Store result
await self.store_result(key, result)
return result
return wrapper
return decorator

class PaymentService:
@idempotent(lambda transaction_id: f"payment_{transaction_id}")
async def process_payment(self, transaction_id: str, amount: float):
# Process payment logic

2. Error Handlingโ€‹

class SagaError(Exception):
def __init__(self, step_name: str, original_error: Exception):
self.step_name = step_name
self.original_error = original_error
super().__init__(f"Saga step {step_name} failed: {str(original_error)}")

class CompensationError(Exception):
def __init__(self, step_name: str, original_error: Exception):
self.step_name = step_name
self.original_error = original_error
super().__init__(f"Compensation for step {step_name} failed: {str(original_error)}")

class SagaErrorHandler:
async def handle_error(error: Exception, saga_instance: Dict):
if isinstance(error, SagaError):
# Log error details
logging.error(f"Saga failed at step {error.step_name}: {error.original_error}")

# Start compensation
await saga_instance["orchestrator"].compensate()

elif isinstance(error, CompensationError):
# Log compensation failure
logging.error(f"Compensation failed at step {error.step_name}: {error.original_error}")

# Notify administrators
await notify_administrators(error)

5. Monitoring and Observability ๐Ÿ“Šโ€‹


from dataclasses import dataclass
from datetime import datetime
import prometheus_client as prom

class SagaMetrics:
saga_duration = prom.Histogram(
'Time spent executing saga',
saga_success = prom.Counter(
'Number of successfully completed sagas',
saga_failures = prom.Counter(
'Number of failed sagas',
compensation_success = prom.Counter(
'Number of successful compensations',
compensation_failures = prom.Counter(
'Number of failed compensations',

class MonitoredSagaOrchestrator:
def __init__(self):
self.metrics = SagaMetrics()

def monitor_saga_execution(self, saga_name: str):
start_time = time.time()
except Exception:
duration = time.time() - start_time

6. Testing Strategies ๐Ÿงชโ€‹

Integration Testingโ€‹

from typing import AsyncGenerator
import pytest
import pytest_asyncio
import asyncio
from databases import Database

class TestSagaIntegration:
async def database(self) -> AsyncGenerator[Database, None]:
db = Database("postgresql://user:pass@localhost/test")
await db.connect()
yield db
await db.disconnect()

async def saga_orchestrator(self, database):
orchestrator = SagaOrchestrator()

async def create_order(data):
await database.execute(
"INSERT INTO orders (id, status) VALUES (:id, :status)",
{"id": data["order_id"], "status": "PENDING"}
return {"order_created": True}

async def reserve_inventory(data):
await database.execute(
"UPDATE inventory SET quantity = quantity - :quantity WHERE product_id = :product_id",
{"product_id": data["product_id"], "quantity": data["quantity"]}
return {"inventory_reserved": True}

saga_definition = SagaDefinition(
{"execute": create_order},
{"execute": reserve_inventory}
{"compensate": release_inventory},
{"compensate": cancel_order}

orchestrator.define_saga("CreateOrder", saga_definition)
return orchestrator

async def test_create_order_saga(self, saga_orchestrator, database):
# Setup test data
initial_data = {
"order_id": "O123",
"product_id": "P456",
"quantity": 1

# Execute saga
saga_id = await saga_orchestrator.start_saga("CreateOrder", initial_data)

# Verify final state
order = await database.fetch_one(
"SELECT status FROM orders WHERE id = :id",
{"id": initial_data["order_id"]}
assert order["status"] == "COMPLETED"

inventory = await database.fetch_one(
"SELECT quantity FROM inventory WHERE product_id = :id",
{"id": initial_data["product_id"]}
assert inventory["quantity"] == initial_data["quantity"]

Failure Testingโ€‹

class TestSagaFailures:
def failing_saga(self):
orchestrator = SagaOrchestrator()

async def failing_step(data):
raise Exception("Simulated failure")

async def compensation_step(data):
return {"compensated": True}

saga_definition = SagaDefinition(
{"execute": Mock(return_value={"step1": "completed"})},
{"execute": failing_step}
{"compensate": compensation_step},
{"compensate": Mock()}

orchestrator.define_saga("FailingSaga", saga_definition)
return orchestrator

async def test_saga_compensation_on_failure(self, failing_saga):
with pytest.raises(Exception) as exc_info:
await failing_saga.start_saga("FailingSaga", {})

assert str(exc_info.value) == "Simulated failure"

# Verify compensation was executed
saga_state = failing_saga.active_sagas[0]
assert saga_state["status"] == "COMPENSATED"
assert saga_state["state"].get("compensated") == True

7. Performance Optimization โšกโ€‹

1. Parallel Executionโ€‹

class ParallelSagaOrchestrator:
async def _execute_parallel_steps(self, steps: List[Dict], state: Dict) -> Dict:
tasks = []
for step in steps:
if step.get("parallel", False):
task = asyncio.create_task(step["execute"](state))
# Execute non-parallel steps sequentially
result = await step["execute"](state)

if tasks:
# Wait for all parallel tasks to complete
results = await asyncio.gather(*tasks, return_exceptions=True)

# Handle any exceptions from parallel execution
for result in results:
if isinstance(result, Exception):
raise result

return state

# Usage Example
saga_definition = SagaDefinition(
{"execute": create_order},
"execute": reserve_inventory,
"parallel": True
"execute": process_payment,
"parallel": True

2. Batch Processingโ€‹

class BatchSagaOrchestrator:
def __init__(self, batch_size: int = 10):
self.batch_size = batch_size
self.pending_sagas: List[Dict] = []

async def add_to_batch(self, saga_name: str, initial_data: Dict):
"saga_name": saga_name,
"data": initial_data

if len(self.pending_sagas) >= self.batch_size:
await self.process_batch()

async def process_batch(self):
tasks = []
for saga in self.pending_sagas:
task = self.start_saga(saga["saga_name"], saga["data"])

results = await asyncio.gather(*tasks, return_exceptions=True)
self.pending_sagas = []
return results

8. Common Patterns and Use Cases ๐ŸŒŸโ€‹

1. Order Processing Sagaโ€‹

class OrderProcessingSaga:
def __init__(self):
self.orchestrator = SagaOrchestrator()

saga_definition = SagaDefinition(
"name": "Validate Order",
"execute": self.validate_order
"name": "Reserve Inventory",
"execute": self.reserve_inventory
"name": "Process Payment",
"execute": self.process_payment
"name": "Ship Order",
"execute": self.ship_order
"name": "Cancel Shipment",
"compensate": self.cancel_shipment
"name": "Refund Payment",
"compensate": self.refund_payment
"name": "Release Inventory",
"compensate": self.release_inventory
"name": "Cancel Order",
"compensate": self.cancel_order

self.orchestrator.define_saga("OrderProcessing", saga_definition)

async def process_order(self, order_data: Dict):
return await self.orchestrator.start_saga("OrderProcessing", order_data)

2. Travel Booking Sagaโ€‹

class TravelBookingSaga:
def __init__(self):
self.orchestrator = SagaOrchestrator()

saga_definition = SagaDefinition(
"name": "Book Flight",
"execute": self.book_flight,
"parallel": True
"name": "Book Hotel",
"execute": self.book_hotel,
"parallel": True
"name": "Book Car",
"execute": self.book_car,
"parallel": True
"name": "Process Payment",
"execute": self.process_payment
"name": "Refund Payment",
"compensate": self.refund_payment
"name": "Cancel Car",
"compensate": self.cancel_car
"name": "Cancel Hotel",
"compensate": self.cancel_hotel
"name": "Cancel Flight",
"compensate": self.cancel_flight

self.orchestrator.define_saga("TravelBooking", saga_definition)

9. Security Considerations ๐Ÿ”’โ€‹

1. Transaction Isolationโ€‹

class IsolatedSagaOrchestrator:
def __init__(self):
self.locked_resources: Dict[str, str] = {}

async def acquire_lock(self, resource_id: str, saga_id: str) -> bool:
if resource_id in self.locked_resources:
return False
self.locked_resources[resource_id] = saga_id
return True

async def release_lock(self, resource_id: str, saga_id: str):
if self.locked_resources.get(resource_id) == saga_id:
del self.locked_resources[resource_id]

async def execute_isolated_step(self, step: Dict, saga_id: str, data: Dict):
required_resources = step.get("required_resources", [])

# Acquire all locks
locks_acquired = all(
await self.acquire_lock(resource, saga_id)
for resource in required_resources

if not locks_acquired:
raise ResourceLockError("Failed to acquire required locks")

return await step["execute"](data)
# Release all locks
for resource in required_resources:
await self.release_lock(resource, saga_id)

10. References and Resources ๐Ÿ“šโ€‹


