In our last post, we used the Mapper Pattern to clean up our data. But what happens when you need to run that data through a long series of transformations, validations, and external checks? Usually, this ends up as a massive, 200-line function. The Pipeline Pattern solves this by breaking the process into a series of "pipes" or stages that a data object flows through.
The Business Use Case: Order Processing Workflow
Imagine a complex e-commerce checkout. When an order is placed, you must:
1. Validate inventory.
2. Apply seasonal discounts.
3. Calculate international shipping taxes.
4. Flag the order for fraud review if the amount is high.
Doing this in one function makes it impossible to test individual steps or change the order of operations later.
The Problem: The Sequential "God Function"
# The "Bad" Way: A rigid, massive function
def process_order(order):
# Step 1: Inventory
if not check_inventory(order):
return False
# Step 2: Taxes
order.tax = calculate_tax(order.country)
# Step 3: Fraud
if order.total > 5000:
flag_fraud(order)
# ... and so on for 10 more steps
order.save()
This is brittle. If you want to add an "Apply Coupon" step, you risk breaking the logic in the middle of this giant function.
The Solution: The Pipeline Pattern
We break the logic into independent Stages. Each stage takes the data, performs one specific task, and passes it to the next.
Step 1: Define the Pipeline Stages
Each stage is a simple class or function with a single responsibility.
# stages.py
class InventoryStage:
    def process(self, order):
        if not check_inventory(order):
            raise Exception("Out of Stock")
        return order


class TaxCalculationStage:
    def process(self, order):
        order.tax = calculate_tax(order.country)
        return order


class FraudCheckStage:
    def process(self, order):
        if order.total > 5000:
            order.is_flagged = True
        return order
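All three stages share the same process(order) signature. If you want to make that contract explicit for type checkers, a minimal sketch using typing.Protocol could look like the following (the PipelineStage name is an assumption for illustration, not part of the original code):

# stages.py (optional addition)
from typing import Protocol

class PipelineStage(Protocol):
    # Anything with a process(order) method that returns the order qualifies.
    def process(self, order): ...

Because Protocol relies on structural typing, InventoryStage, TaxCalculationStage, and FraudCheckStage already satisfy this contract without inheriting from anything.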
Step 2: Create the Pipeline Runner
This "Orchestrator" handles the flow of data through the list of stages.
# engine.py
class OrderPipeline:
    def __init__(self):
        self.stages = [
            InventoryStage(),
            TaxCalculationStage(),
            FraudCheckStage(),
        ]

    def run(self, order):
        for stage in self.stages:
            order = stage.process(order)
        return order
Step 3: The New Workflow
The service layer now simply initiates the pipeline. The steps are explicit and easy to read.
# services.py
def checkout_service(order_id):
    order = OrderRepository.get_by_id(order_id)
    pipeline = OrderPipeline()
    try:
        processed_order = pipeline.run(order)
        processed_order.save()
    except Exception as e:
        handle_failure(e)
Why it's Better for Big Projects
- Unmatched Reusability: Need to calculate taxes for a different type of order? Just reuse the TaxCalculationStage in a different pipeline (see the composition sketch after this list).
- Isolated Testing: You can write unit tests for each stage individually without running the entire checkout process (see the test sketch after this list).
- Dynamic Workflows: You can easily swap, add, or remove stages (e.g., adding a BlackFridayDiscountStage) without touching the rest of the logic.
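For example, FraudCheckStage can be tested with nothing more than a stub order object. Here is a minimal pytest-style sketch; the SimpleNamespace stand-in for an order is an assumption for illustration:

# test_stages.py
from types import SimpleNamespace
from stages import FraudCheckStage

def test_flags_large_orders():
    # A bare object with only the attributes the stage touches
    order = SimpleNamespace(total=9000, is_flagged=False)
    assert FraudCheckStage().process(order).is_flagged is True

def test_ignores_small_orders():
    order = SimpleNamespace(total=50, is_flagged=False)
    assert FraudCheckStage().process(order).is_flagged is False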
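And here is a minimal sketch of composing a different workflow from the same stages, assuming a variation of the runner that accepts its stage list (the Pipeline class and the BlackFridayDiscountStage implementation below are illustrative, not part of the original engine.py):

# pipelines.py (hypothetical variation with injectable stages)
from stages import InventoryStage, TaxCalculationStage, FraudCheckStage

class Pipeline:
    def __init__(self, stages):
        self.stages = stages

    def run(self, order):
        for stage in self.stages:
            order = stage.process(order)
        return order

class BlackFridayDiscountStage:
    def process(self, order):
        order.total = order.total * 0.8  # illustrative 20% discount
        return order

# Compose a seasonal checkout without touching the default pipeline
black_friday_pipeline = Pipeline([
    InventoryStage(),
    BlackFridayDiscountStage(),
    TaxCalculationStage(),
    FraudCheckStage(),
])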
The Pipeline Pattern converts "spaghetti logic" into a clean, modular assembly line. It is the gold standard for any process in your Django app that involves more than three distinct steps.
Architecture Note: To keep your architecture clean and avoid circular import errors, keep your stages in a separate stages.py file and your runner in its own module (pipelines.py, or engine.py as in the example above). This keeps the "God Function" from creeping back in and makes it easy for your Service Layer to manage complex workflows while keeping the code readable and testable.
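A minimal sketch of that layout, where the app name "orders" is an assumption for illustration:

# Suggested layout (app name "orders" is an assumption):
#
#   orders/stages.py     InventoryStage, TaxCalculationStage, FraudCheckStage
#   orders/pipelines.py  OrderPipeline  (imports from stages.py)
#   orders/services.py   checkout_service  (imports from pipelines.py)
#
# Imports only ever point one way (services -> pipelines -> stages),
# so no module needs to import back up the chain.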