In our last post, we used the Mapper Pattern to clean up our data. But what happens when you need to run that data through a long series of transformations, validations, and external checks? Usually, this ends up as a massive, 200-line function. The Pipeline Pattern solves this by breaking the process into a series of "pipes" or stages that a data object flows through.
The Business Use Case: Order Processing Workflow
Imagine a complex e-commerce checkout. When an order is placed, you must:
- Validate inventory.
- Apply seasonal discounts.
- Calculate international shipping taxes.
- Flag for fraud if the amount is high.
Doing this in one function makes it impossible to test individual steps or change the order of operations later.
The Problem: The Sequential "God Function"
```python
# The "Bad" Way: a rigid, massive function
def process_order(order):
    # Step 1: Inventory
    if not check_inventory(order):
        return False
    # Step 2: Taxes
    order.tax = calculate_tax(order.country)
    # Step 3: Fraud
    if order.total > 5000:
        flag_fraud(order)
    # ... and so on for 10 more steps
    order.save()
```
This is brittle. If you want to add an "Apply Coupon" step, you risk breaking logic buried in the middle of this giant function.
The Solution: The Pipeline Pattern
We break the logic into independent Stages. Each stage takes the data, performs one specific task, and passes it to the next.
Step 1: Define the Pipeline Stages
Each stage is a simple class or function with a single responsibility.
```python
# stages.py
class InventoryStage:
    def process(self, order):
        if not check_inventory(order):
            raise Exception("Out of Stock")
        return order


class TaxCalculationStage:
    def process(self, order):
        order.tax = calculate_tax(order.country)
        return order


class FraudCheckStage:
    def process(self, order):
        if order.total > 5000:
            order.is_flagged = True
        return order
```
Step 2: Create the Pipeline Runner
This "Orchestrator" handles the flow of data through the list of stages.
```python
# engine.py
class OrderPipeline:
    def __init__(self):
        self.stages = [
            InventoryStage(),
            TaxCalculationStage(),
            FraudCheckStage(),
        ]

    def run(self, order):
        for stage in self.stages:
            order = stage.process(order)
        return order
```
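To see the whole flow execute end to end, here is a minimal, self-contained sketch. The `Order` dataclass and the `check_inventory`/`calculate_tax` stubs are stand-ins I've invented for the real model and helpers, which the post assumes exist elsewhere:

```python
from dataclasses import dataclass


# Stand-in for the real Order model (assumption for this sketch).
@dataclass
class Order:
    country: str
    total: float
    in_stock: bool = True
    tax: float = 0.0
    is_flagged: bool = False


def check_inventory(order):
    # Stub: the real helper would query stock levels.
    return order.in_stock


def calculate_tax(country):
    # Stub with hypothetical flat rates, for illustration only.
    return {"US": 0.07, "DE": 0.19}.get(country, 0.10)


class InventoryStage:
    def process(self, order):
        if not check_inventory(order):
            raise Exception("Out of Stock")
        return order


class TaxCalculationStage:
    def process(self, order):
        order.tax = calculate_tax(order.country)
        return order


class FraudCheckStage:
    def process(self, order):
        if order.total > 5000:
            order.is_flagged = True
        return order


class OrderPipeline:
    def __init__(self):
        self.stages = [InventoryStage(), TaxCalculationStage(), FraudCheckStage()]

    def run(self, order):
        for stage in self.stages:
            order = stage.process(order)
        return order


order = OrderPipeline().run(Order(country="DE", total=7500))
print(order.tax, order.is_flagged)  # 0.19 True
```

Each stage touches the order and hands it to the next one; a high-value German order comes out taxed and flagged.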
Step 3: The New Workflow
The service layer now simply initiates the pipeline. The steps are explicit and easy to read.
```python
# services.py
def checkout_service(order_id):
    order = OrderRepository.get_by_id(order_id)
    pipeline = OrderPipeline()
    try:
        processed_order = pipeline.run(order)
        processed_order.save()
    except Exception as e:
        handle_failure(e)
```
Why it's Better for Big Projects
- Unmatched Reusability: Need to calculate taxes for a different type of order? Just reuse the TaxCalculationStage in a different pipeline.
- Isolated Testing: You can write unit tests for each stage individually without running the entire checkout process.
- Dynamic Workflows: You can easily swap, add, or remove stages (e.g., adding a BlackFridayDiscountStage) without touching the rest of the logic.
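To illustrate the isolated-testing point: the fraud stage can be exercised with nothing but a stand-in order object. `SimpleNamespace` is used here as a hypothetical stand-in for the real model, and the stage is repeated so the snippet runs on its own:

```python
from types import SimpleNamespace


class FraudCheckStage:
    def process(self, order):
        if order.total > 5000:
            order.is_flagged = True
        return order


def test_flags_large_orders():
    order = SimpleNamespace(total=6000, is_flagged=False)
    assert FraudCheckStage().process(order).is_flagged


def test_ignores_small_orders():
    order = SimpleNamespace(total=100, is_flagged=False)
    assert not FraudCheckStage().process(order).is_flagged


test_flags_large_orders()
test_ignores_small_orders()
```

No database, no inventory service, no checkout flow: the stage's single responsibility is the only thing under test.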
The Pipeline Pattern converts "spaghetti logic" into a clean, modular assembly line. It is the gold standard for any process in your Django app that involves more than three distinct steps.
Architecture Note: To keep your architecture clean and avoid circular import errors, keep your stages in a separate stages.py file and your runner in its own module (engine.py above, though pipelines.py works just as well). This keeps the "God Function" from creeping back in, and lets your Service Layer manage complex workflows while the code stays readable and testable.
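One small extension worth considering for the "Dynamic Workflows" benefit (my own suggestion, not part of the post's code): inject the stage list through the constructor, so one generic runner can serve many different pipelines. A toy sketch with made-up stages:

```python
class Pipeline:
    """Generic runner: stages are injected, so each workflow is just a list."""

    def __init__(self, stages):
        self.stages = list(stages)

    def run(self, data):
        for stage in self.stages:
            data = stage.process(data)
        return data


# Hypothetical toy stages, purely to demonstrate composition.
class UppercaseStage:
    def process(self, data):
        return data.upper()


class ExclaimStage:
    def process(self, data):
        return data + "!"


result = Pipeline([UppercaseStage(), ExclaimStage()]).run("hello")
print(result)  # HELLO!
```

Swapping in a BlackFridayDiscountStage then means editing a list, not a class.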