From Chaos to Control Part 2: Configuration-Driven Rule Orchestration

How we evolved our rule engine to support dynamic staging, configuration-driven execution, and enterprise data quality pipelines


Table of Contents

  1. Introduction: Beyond Basic Validation
  2. The Challenge: Multi-Stage Data Processing
  3. Foundation: Rule Staging Architecture
  4. Evolution: Configuration-Driven Rule Loading
  5. Intelligence: Dynamic Stage Management
  6. Architecture: The Stage-Aware Execution Engine
  7. Real-World Implementation: Data Quality Pipeline
  8. Enterprise Features: Advanced Stage Orchestration
  9. Production Ready: Complete Configuration System
  10. Conclusion: Lessons from Production

Introduction: Beyond Basic Validation

In Part 1, we built a robust rule engine that solved the fundamental problems of scattered business logic. We achieved automatic dependency resolution, clean abstractions, and enterprise-grade dependency injection. But production environments revealed a new challenge: not all rules should execute at the same time.

Consider a typical data processing pipeline in a financial services company:

  • Preprocessing Stage: Clean and normalize incoming data
  • Data Quality Stage: Validate completeness, accuracy, and consistency
  • Compliance Stage: Ensure regulatory requirements are met
  • Business Rules Stage: Apply domain-specific validation logic
  • Post-Processing Stage: Generate reports and audit trails

Each stage has distinct requirements, performance characteristics, and failure handling strategies. Running all rules together creates several problems:

  • Performance Degradation: Expensive compliance checks run on data that hasn’t passed basic quality validation
  • Error Cascade: Business rule failures mask fundamental data quality issues
  • Resource Waste: Complex validation logic executes on data that will be rejected by preprocessing
  • Operational Complexity: All-or-nothing execution makes it difficult to isolate and debug specific pipeline stages

Today, we’ll evolve our rule engine to support configuration-driven stage management that solves these production challenges.

The Challenge: Multi-Stage Data Processing

Let’s examine what happens when we try to force all rules into a single execution context:

# This is what we DON'T want - monolithic rule execution
def validate_customer_data(customer_data):
    executor = RuleExecutor()
    
    # Preprocessing rules
    executor.register_rule(TrimWhitespaceRule("trim_name", "name"))
    executor.register_rule(NormalizePhoneRule("normalize_phone", "phone"))
    
    # Data quality rules  
    executor.register_rule(RequiredFieldRule("name_required", "name"))
    executor.register_rule(EmailFormatRule("email_format", "email"))
    
    # Compliance rules (expensive!)
    executor.register_rule(KYCValidationRule("kyc_check", ["name", "address"]))
    executor.register_rule(SanctionsListRule("sanctions_check", "name"))
    
    # Business rules
    executor.register_rule(CreditScoreRule("credit_check", "ssn"))
    executor.register_rule(RiskAssessmentRule("risk_assessment", ["income", "debt"]))
    
    # Execute everything at once - problematic!
    return executor.execute_all(customer_data)
graph TD
    A[Customer Data] --> B[Monolithic Rule Executor]
    B --> C[All Rules Execute Together]
    C --> D[Mixed Success/Failure Results]
    
    E[Problems] --> F[Resource Waste]
    E --> G[Error Cascade]
    E --> H[Poor Debugging]
    E --> I[No Isolation]
    
    style B fill:#ffcccc
    style E fill:#ff6666

The Problems This Creates

Resource Inefficiency: A customer record with a missing required field still triggers expensive KYC and credit checks before failing basic validation.

Poor User Experience: Users receive a mix of preprocessing errors, data quality failures, and business rule violations simultaneously, making it difficult to understand what needs to be fixed first.

Debugging Nightmares: When the pipeline fails, engineers must sift through results from all stages to identify the root cause.

Operational Inflexibility: You can’t run just the compliance checks on previously validated data, or reprocess only the business rules after a requirement change.

The solution is to organize rules into logical execution stages that can be independently configured, executed, and monitored.
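
Before formalizing this, here is a deliberately tiny sketch of the idea using plain callables rather than the real rule classes introduced below (all names here are illustrative): stages run in order, and a failure in a cheap early stage prevents the expensive later stages from running at all.

def run_stage(rules, data):
    """Run one stage's rules; a stage passes only if every rule passes."""
    return all(rule(data) for rule in rules)

def expensive_kyc_check(data):
    """Stand-in for a slow external compliance call (illustrative only)."""
    return True

def validate_customer_data_staged(data, stages):
    """stages: ordered list of (stage_name, [rule callables])."""
    for stage_name, rules in stages:
        if not run_stage(rules, data):
            return f"stopped at stage: {stage_name}"   # later stages never run
    return "all stages passed"

stages = [
    ("data_quality", [lambda d: bool(d.get("name")), lambda d: "@" in d.get("email", "")]),
    ("compliance",   [expensive_kyc_check]),  # only reached if data quality passes
]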

Foundation: Rule Staging Architecture

The key insight is that rules naturally group into processing stages, and each stage represents a distinct business concern. Let’s formalize this concept:

from enum import Enum
from dataclasses import dataclass
from typing import List, Dict, Set

class ExecutionStage(Enum):
    """Define the stages of our data processing pipeline"""
    PREPROCESSING = "preprocessing"
    DATA_QUALITY = "data_quality" 
    COMPLIANCE = "compliance"
    BUSINESS_RULES = "business_rules"
    POST_PROCESSING = "post_processing"

@dataclass
class StageConfig:
    """Configuration for a single execution stage"""
    stage: ExecutionStage
    enabled: bool = True
    fail_fast: bool = True  # Stop on first failure
    parallel_execution: bool = False
    timeout_seconds: int = 30
    retry_attempts: int = 0

class StagedRule(Rule):
    """Enhanced rule that belongs to specific execution stages"""
    
    def __init__(self, rule_id: str, stage: ExecutionStage, 
                 dependencies: List[str] = None, 
                 cross_stage_dependencies: List[str] = None):
        super().__init__(rule_id, dependencies or [])
        self.stage = stage
        self.cross_stage_dependencies = cross_stage_dependencies or []
    
    def can_execute_in_stage(self, stage: ExecutionStage, 
                           completed_stages: Set[ExecutionStage],
                           stage_results: Dict[str, RuleResult]) -> bool:
        """Check if rule can execute in the given stage context"""
        # Must be the correct stage
        if self.stage != stage:
            return False
        
        # Check cross-stage dependencies
        for cross_dep in self.cross_stage_dependencies:
            if stage_results.get(cross_dep) != RuleResult.PASS:
                return False
        
        # Check same-stage dependencies
        return self.can_execute(stage_results)
graph LR
    A[Preprocessing] --> B[Data Quality]
    B --> C[Compliance]
    B --> D[Business Rules]
    C --> E[Post Processing]
    D --> E
    
    A1[Trim Whitespace<br/>Normalize Phone<br/>Parse Names] -.-> A
    B1[Required Fields<br/>Format Validation<br/>Consistency Checks] -.-> B
    C1[KYC Verification<br/>Sanctions Screening<br/>AML Checks] -.-> C
    D1[Credit Score<br/>Income Verification<br/>Risk Assessment] -.-> D
    E1[Generate Reports<br/>Audit Trails<br/>Notifications] -.-> E
    
    style A fill:#e1f5fe
    style B fill:#fff3e0
    style C fill:#fce4ec
    style D fill:#f3e5f5
    style E fill:#e8f5e8

This foundation introduces several important concepts:

Stage-Aware Rules

Rules now explicitly declare which stage they belong to, eliminating ambiguity about execution order and enabling stage-specific optimizations.

Cross-Stage Dependencies

Rules can depend on results from previous stages, enabling sophisticated pipeline logic while maintaining stage separation.

Stage Configuration

Each stage can be independently configured for performance characteristics, error handling, and execution strategy.
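
For example, the StageConfig dataclass defined above lets each stage carry its own execution policy (the specific values here are illustrative):

# Illustrative per-stage policies using the StageConfig dataclass above.
stage_configs = {
    ExecutionStage.PREPROCESSING: StageConfig(
        stage=ExecutionStage.PREPROCESSING,
        fail_fast=False,          # keep normalizing even if one field is odd
        parallel_execution=True,
        timeout_seconds=10,
    ),
    ExecutionStage.COMPLIANCE: StageConfig(
        stage=ExecutionStage.COMPLIANCE,
        fail_fast=True,           # stop on the first compliance failure
        timeout_seconds=60,       # external checks are slower
        retry_attempts=1,
    ),
}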

Let’s see this in action with concrete implementations:

class TrimWhitespaceRule(StagedRule):
    """Preprocessing rule that normalizes string data"""
    
    def __init__(self, rule_id: str, field_name: str):
        super().__init__(rule_id, ExecutionStage.PREPROCESSING)
        self.field_name = field_name
    
    def execute(self, data: dict) -> RuleResult:
        value = data.get(self.field_name)
        if isinstance(value, str):
            # Modify data in place during preprocessing
            data[self.field_name] = value.strip()
            return RuleResult.PASS
        return RuleResult.SKIP

class DataCompletenessRule(StagedRule):
    """Data quality rule that checks for required fields"""
    
    def __init__(self, rule_id: str, required_fields: List[str]):
        super().__init__(rule_id, ExecutionStage.DATA_QUALITY)
        self.required_fields = required_fields
    
    def execute(self, data: dict) -> RuleResult:
        missing_fields = [
            field for field in self.required_fields 
            if not data.get(field)
        ]
        return RuleResult.PASS if not missing_fields else RuleResult.FAIL

class KYCComplianceRule(StagedRule):
    """Compliance rule that performs expensive external validation"""
    
    def __init__(self, rule_id: str, kyc_service: KYCService):
        # Depends on data quality passing first
        super().__init__(
            rule_id, 
            ExecutionStage.COMPLIANCE,
            cross_stage_dependencies=["data_completeness_check"]
        )
        self.kyc_service = kyc_service
    
    def execute(self, data: dict) -> RuleResult:
        # This is expensive - only run after data quality validation
        try:
            is_valid = self.kyc_service.validate_identity(
                data["name"], data["address"], data["ssn"]
            )
            return RuleResult.PASS if is_valid else RuleResult.FAIL
        except Exception:
            return RuleResult.FAIL

The transformation is immediate: rules become self-documenting about their place in the processing pipeline, and expensive operations are protected by appropriate prerequisites.

Evolution: Configuration-Driven Rule Loading

Hard-coding rule registration quickly becomes unwieldy in enterprise environments. Different environments need different rule sets, business requirements change frequently, and operations teams need control over pipeline configuration without code changes.

The solution is configuration-driven rule loading using Python dictionaries that can be externalized to JSON, YAML, or database storage:

# Pipeline configuration as a Python dictionary
PIPELINE_CONFIG = {
    "stages": {
        "preprocessing": {
            "enabled": True,
            "fail_fast": False,  # Continue processing even if some preprocessing fails
            "parallel_execution": True,
            "timeout_seconds": 10,
            "rules": {
                "trim_name": {
                    "class": "TrimWhitespaceRule",
                    "params": {"field_name": "name"}
                },
                "trim_email": {
                    "class": "TrimWhitespaceRule", 
                    "params": {"field_name": "email"}
                },
                "normalize_phone": {
                    "class": "PhoneNormalizationRule",
                    "params": {"field_name": "phone", "country_code": "US"}
                }
            }
        },
        "data_quality": {
            "enabled": True,
            "fail_fast": True,  # Stop immediately on data quality failures
            "parallel_execution": False,
            "timeout_seconds": 15,
            "rules": {
                "required_fields": {
                    "class": "DataCompletenessRule",
                    "params": {"required_fields": ["name", "email", "phone"]}
                },
                "email_format": {
                    "class": "EmailFormatValidationRule",
                    "params": {"field_name": "email"},
                    "dependencies": ["required_fields"]
                },
                "phone_format": {
                    "class": "PhoneFormatValidationRule", 
                    "params": {"field_name": "phone"},
                    "dependencies": ["required_fields"]
                }
            }
        },
        "compliance": {
            "enabled": True,
            "fail_fast": True,
            "parallel_execution": True,  # Independent compliance checks can run in parallel
            "timeout_seconds": 60,  # Compliance checks are slower
            "rules": {
                "kyc_validation": {
                    "class": "KYCComplianceRule",
                    "params": {"service_name": "primary_kyc"},
                    "cross_stage_dependencies": ["required_fields", "email_format"]
                },
                "sanctions_screening": {
                    "class": "SanctionsScreeningRule",
                    "params": {"screening_level": "enhanced"},
                    "cross_stage_dependencies": ["required_fields"]
                },
                "pep_screening": {
                    "class": "PEPScreeningRule",
                    "params": {"risk_threshold": "medium"},
                    "cross_stage_dependencies": ["kyc_validation"]
                }
            }
        },
        "business_rules": {
            "enabled": True,
            "fail_fast": False,  # Collect all business rule violations
            "parallel_execution": True,
            "timeout_seconds": 30,
            "rules": {
                "credit_score_check": {
                    "class": "CreditScoreValidationRule",
                    "params": {"minimum_score": 650, "bureau": "experian"},
                    "cross_stage_dependencies": ["kyc_validation"]
                },
                "income_verification": {
                    "class": "IncomeVerificationRule",
                    "params": {"verification_method": "bank_statements"},
                    "cross_stage_dependencies": ["kyc_validation"]
                },
                "debt_to_income_ratio": {
                    "class": "DebtToIncomeRule",
                    "params": {"max_ratio": 0.43},
                    "dependencies": ["income_verification"]
                }
            }
        }
    },
    "execution_settings": {
        "stop_on_stage_failure": True,  # Stop pipeline if any stage fails
        "enable_audit_logging": True,
        "max_parallel_rules": 4,
        "default_timeout": 30
    }
}
graph TB
    A[Configuration Dictionary] --> B[Rule Configuration Loader]
    B --> C[Dynamic Rule Instantiation]
    C --> D[Service Dependency Injection]
    D --> E[Staged Rule Registry]
    
    F[Environment Overrides] --> A
    G[Feature Flags] --> A
    H[Business Parameters] --> A
    
    E --> I[Preprocessing Rules]
    E --> J[Data Quality Rules]
    E --> K[Compliance Rules]
    E --> L[Business Rules]
    
    style A fill:#e3f2fd
    style E fill:#f1f8e9

Configuration Structure Benefits

This structure provides several enterprise advantages:

Environment-Specific Rules: Different configurations for development, staging, and production environments.

Feature Flags: Enable/disable entire stages or individual rules without code deployment.

Performance Tuning: Adjust timeouts, parallelism, and failure handling per environment.

Business Flexibility: Non-technical users can modify business rule parameters.

Operational Control: Operations teams can disable problematic rules during incidents.
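
As an example of environment-specific rules and feature flags, an override dictionary can be deep-merged over the base configuration before the pipeline starts. The merge helper below is a sketch of one way to do this, not part of the engine itself:

import copy

def merge_config(base: dict, override: dict) -> dict:
    """Deep-merge an environment override onto a base pipeline configuration."""
    merged = copy.deepcopy(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = merge_config(merged[key], value)
        else:
            merged[key] = value
    return merged

# Development override: disable expensive compliance checks, relax a timeout.
DEV_OVERRIDES = {
    "stages": {
        "compliance": {"enabled": False},
        "data_quality": {"timeout_seconds": 60},
    }
}

dev_config = merge_config(PIPELINE_CONFIG, DEV_OVERRIDES)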

Now we need a configuration loader that transforms this dictionary into executable rule instances:

import importlib
from typing import Type, Any

class RuleConfigurationLoader:
    """Loads rules from configuration and instantiates them with dependency injection"""
    
    def __init__(self, service_container: ServiceContainer):
        self.container = service_container
        self.rule_registry = {}  # rule_class_name -> Rule class
        self._register_built_in_rules()
    
    def _register_built_in_rules(self):
        """Register all available rule classes"""
        self.rule_registry.update({
            "TrimWhitespaceRule": TrimWhitespaceRule,
            "DataCompletenessRule": DataCompletenessRule,
            "EmailFormatValidationRule": EmailFormatValidationRule,
            "PhoneFormatValidationRule": PhoneFormatValidationRule,
            "KYCComplianceRule": KYCComplianceRule,
            "SanctionsScreeningRule": SanctionsScreeningRule,
            "PEPScreeningRule": PEPScreeningRule,
            "CreditScoreValidationRule": CreditScoreValidationRule,
            "IncomeVerificationRule": IncomeVerificationRule,
            "DebtToIncomeRule": DebtToIncomeRule,
        })
    
    def register_rule_class(self, name: str, rule_class: Type[StagedRule]):
        """Register custom rule classes for dynamic loading"""
        self.rule_registry[name] = rule_class
    
    def load_rules_for_stage(self, stage: ExecutionStage, 
                           config: Dict[str, Any]) -> List[StagedRule]:
        """Load all rules for a specific stage from configuration"""
        stage_name = stage.value
        
        if stage_name not in config["stages"]:
            return []
        
        stage_config = config["stages"][stage_name]
        
        if not stage_config.get("enabled", True):
            return []
        
        rules = []
        
        for rule_id, rule_config in stage_config["rules"].items():
            rule = self._create_rule_from_config(rule_id, rule_config, stage)
            if rule:
                rules.append(rule)
        
        return rules
    
    def _create_rule_from_config(self, rule_id: str, 
                               rule_config: Dict[str, Any],
                               stage: ExecutionStage) -> StagedRule:
        """Create a rule instance from configuration"""
        rule_class_name = rule_config["class"]
        
        if rule_class_name not in self.rule_registry:
            raise ValueError(f"Unknown rule class: {rule_class_name}")
        
        rule_class = self.rule_registry[rule_class_name]
        params = rule_config.get("params", {})
        dependencies = rule_config.get("dependencies", [])
        cross_stage_deps = rule_config.get("cross_stage_dependencies", [])
        
        # Inject services into rule parameters
        resolved_params = self._resolve_service_dependencies(params)
        resolved_params.update({
            "rule_id": rule_id,
            "stage": stage,
            "dependencies": dependencies,
            "cross_stage_dependencies": cross_stage_deps
        })
        
        try:
            return rule_class(**resolved_params)
        except Exception as e:
            print(f"Failed to create rule {rule_id}: {e}")
            return None
    
    def _resolve_service_dependencies(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Resolve service dependencies in rule parameters"""
        resolved = {}
        
        for key, value in params.items():
            if isinstance(value, str) and value.startswith("service:"):
                # Format: "service:ServiceInterface:optional_name"
                service_parts = value.split(":")
                service_name = service_parts[1]
                optional_name = service_parts[2] if len(service_parts) > 2 else None
                
                # Dynamically resolve service interface
                service_interface = globals().get(service_name)
                if service_interface:
                    resolved[key] = self.container.resolve(service_interface, optional_name)
                else:
                    raise ValueError(f"Unknown service interface: {service_name}")
            else:
                resolved[key] = value
        
        return resolved

Configuration Loading Benefits

This loader provides several powerful capabilities:

Dynamic Rule Instantiation: Rules are created from configuration at runtime, enabling flexible pipeline composition.

Service Injection: External dependencies are automatically injected based on configuration parameters.

Error Isolation: Rule creation failures don’t crash the entire pipeline.

Extensibility: New rule classes can be registered without modifying the loader.

Validation: Configuration structure is validated during rule creation.
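
A short usage sketch: registering a custom rule class and loading one stage's rules. Note that a custom rule's constructor must accept the rule_id, stage, dependencies, and cross_stage_dependencies keywords the loader injects; the ZipCodeFormatRule below is hypothetical, and the ServiceContainer comes from Part 1.

class ZipCodeFormatRule(StagedRule):
    """Hypothetical custom rule: validates a 5-digit US zip code."""

    def __init__(self, rule_id, stage, field_name,
                 dependencies=None, cross_stage_dependencies=None):
        super().__init__(rule_id, stage, dependencies, cross_stage_dependencies)
        self.field_name = field_name

    def execute(self, data: dict) -> RuleResult:
        value = str(data.get(self.field_name, ""))
        return RuleResult.PASS if value.isdigit() and len(value) == 5 else RuleResult.FAIL

loader = RuleConfigurationLoader(ServiceContainer())
loader.register_rule_class("ZipCodeFormatRule", ZipCodeFormatRule)

# Instantiate all data-quality rules declared in the configuration.
quality_rules = loader.load_rules_for_stage(ExecutionStage.DATA_QUALITY, PIPELINE_CONFIG)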

Intelligence: Dynamic Stage Management

With configuration-driven rule loading in place, we need intelligent stage management that can execute specific stages, handle cross-stage dependencies, and provide detailed execution feedback:

from concurrent.futures import ThreadPoolExecutor, as_completed, TimeoutError  # futures' TimeoutError, raised by as_completed
import time
from datetime import datetime

@dataclass 
class StageExecutionResult:
    """Results from executing a single stage"""
    stage: ExecutionStage
    success: bool
    rule_results: Dict[str, RuleResult]
    execution_time_ms: int
    failed_rules: List[str]
    skipped_rules: List[str]
    error_messages: Dict[str, str]

class StageManager:
    """Manages the execution of rules within individual stages"""
    
    def __init__(self, config_loader: RuleConfigurationLoader):
        self.config_loader = config_loader
        self.global_results = {}  # Accumulates results across all stages
    
    def execute_stage(self, stage: ExecutionStage, 
                     config: Dict[str, Any], 
                     context: ExecutionContext) -> StageExecutionResult:
        """Execute all rules for a specific stage"""
        start_time = time.time()
        
        # Load rules for this stage
        rules = self.config_loader.load_rules_for_stage(stage, config)
        
        if not rules:
            return self._create_empty_result(stage, start_time)
        
        stage_config = config["stages"][stage.value]
        
        # Execute rules based on stage configuration
        if stage_config.get("parallel_execution", False):
            stage_results = self._execute_rules_parallel(
                rules, context, stage_config
            )
        else:
            stage_results = self._execute_rules_sequential(
                rules, context, stage_config
            )
        
        # Update global results for cross-stage dependencies
        self.global_results.update(stage_results)
        
        execution_time = int((time.time() - start_time) * 1000)
        
        return StageExecutionResult(
            stage=stage,
            success=self._is_stage_successful(stage_results),
            rule_results=stage_results,
            execution_time_ms=execution_time,
            failed_rules=[r for r, result in stage_results.items() 
                         if result == RuleResult.FAIL],
            skipped_rules=[r for r, result in stage_results.items() 
                          if result == RuleResult.SKIP],
            error_messages={}  # Would be populated from exception handling
        )
    
    def _execute_rules_sequential(self, rules: List[StagedRule], 
                                context: ExecutionContext,
                                stage_config: Dict[str, Any]) -> Dict[str, RuleResult]:
        """Execute rules sequentially with dependency resolution"""
        resolver = DependencyResolver()
        
        # Build dependency graph for this stage
        for rule in rules:
            resolver.add_rule(rule)
        
        execution_order = resolver.get_execution_order()
        results = {}
        fail_fast = stage_config.get("fail_fast", True)
        
        for rule_id in execution_order:
            rule = next(r for r in rules if r.rule_id == rule_id)
            
            # Check if rule can execute given current results
            if rule.can_execute_in_stage(rule.stage, set(), self.global_results):
                try:
                    result = rule.execute(context.data)
                    results[rule_id] = result
                    
                    if fail_fast and result == RuleResult.FAIL:
                        # Mark remaining rules as skipped
                        remaining_rules = execution_order[execution_order.index(rule_id) + 1:]
                        for remaining_rule_id in remaining_rules:
                            results[remaining_rule_id] = RuleResult.SKIP
                        break
                        
                except Exception as e:
                    results[rule_id] = RuleResult.FAIL
                    if fail_fast:
                        break
            else:
                results[rule_id] = RuleResult.SKIP
        
        return results
    
    def _execute_rules_parallel(self, rules: List[StagedRule],
                              context: ExecutionContext,
                              stage_config: Dict[str, Any]) -> Dict[str, RuleResult]:
        """Execute independent rules in parallel for performance"""
        max_workers = min(len(rules), stage_config.get("max_parallel_rules", 4))
        timeout = stage_config.get("timeout_seconds", 30)
        results = {}
        
        # Identify rules that can run in parallel (no dependencies within stage)
        independent_rules = [r for r in rules if not r.dependencies]
        dependent_rules = [r for r in rules if r.dependencies]
        
        # Execute independent rules in parallel
        if independent_rules:
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                future_to_rule = {
                    executor.submit(self._execute_single_rule, rule, context): rule
                    for rule in independent_rules
                    if rule.can_execute_in_stage(rule.stage, set(), self.global_results)
                }
                
                try:
                    for future in as_completed(future_to_rule, timeout=timeout):
                        rule = future_to_rule[future]
                        try:
                            result = future.result()
                            results[rule.rule_id] = result
                        except Exception as e:
                            results[rule.rule_id] = RuleResult.FAIL
                except TimeoutError:
                    # Handle timeout by marking incomplete rules as failed
                    for future, rule in future_to_rule.items():
                        if not future.done():
                            future.cancel()
                            results[rule.rule_id] = RuleResult.FAIL
        
        # Execute dependent rules sequentially after parallel execution
        if dependent_rules:
            sequential_results = self._execute_rules_sequential(
                dependent_rules, context, stage_config
            )
            results.update(sequential_results)
        
        return results
    
    def _execute_single_rule(self, rule: StagedRule, 
                           context: ExecutionContext) -> RuleResult:
        """Execute a single rule with error handling"""
        try:
            return rule.execute(context.data)
        except Exception:
            return RuleResult.FAIL
    
    def _is_stage_successful(self, results: Dict[str, RuleResult]) -> bool:
        """Determine if a stage execution was successful"""
        return all(result in [RuleResult.PASS, RuleResult.SKIP] 
                  for result in results.values())
    
    def _create_empty_result(self, stage: ExecutionStage, 
                           start_time: float) -> StageExecutionResult:
        """Create result for stages with no rules"""
        execution_time = int((time.time() - start_time) * 1000)
        return StageExecutionResult(
            stage=stage,
            success=True,
            rule_results={},
            execution_time_ms=execution_time,
            failed_rules=[],
            skipped_rules=[],
            error_messages={}
        )
sequenceDiagram
    participant PM as Pipeline Manager
    participant SM as Stage Manager
    participant CL as Config Loader
    participant RS as Rule Service
    
    PM->>SM: execute_stage(DATA_QUALITY)
    SM->>CL: load_rules_for_stage()
    CL-->>SM: [RequiredFields, EmailFormat, PhoneFormat]
    
    alt Parallel Execution
        SM->>RS: execute(RequiredFields)
        SM->>RS: execute(EmailFormat)
        SM->>RS: execute(PhoneFormat)
        par
            RS-->>SM: PASS
        and
            RS-->>SM: PASS
        and
            RS-->>SM: FAIL
        end
    else Sequential Execution
        SM->>RS: execute(RequiredFields)
        RS-->>SM: PASS
        SM->>RS: execute(EmailFormat)
        RS-->>SM: PASS
        SM->>RS: execute(PhoneFormat)
        RS-->>SM: FAIL
    end
    
    SM-->>PM: StageExecutionResult

Stage Management Benefits

This stage manager provides enterprise-grade execution capabilities:

Performance Optimization: Parallel execution of independent rules within stages reduces total processing time.

Intelligent Dependency Handling: Cross-stage dependencies are resolved correctly while maintaining stage isolation.

Failure Isolation: Stage failures don’t automatically cascade to other stages.

Comprehensive Reporting: Detailed execution metrics enable performance monitoring and troubleshooting.

Timeout Protection: Long-running rules can’t block the entire pipeline indefinitely.
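
Putting the stage manager to work directly looks roughly like this (ExecutionContext is the context object from Part 1, used here with the data/metadata/results fields the executor relies on later):

# Sketch: execute a single stage with the StageManager defined above.
context = ExecutionContext(
    data={"name": "Jane Doe", "email": "jane@example.com", "phone": "555-123-4567"},
    metadata={},
    results={},
)

manager = StageManager(RuleConfigurationLoader(ServiceContainer()))
dq_result = manager.execute_stage(ExecutionStage.DATA_QUALITY, PIPELINE_CONFIG, context)

print(dq_result.success, f"{dq_result.execution_time_ms}ms")
print("failed:", dq_result.failed_rules, "skipped:", dq_result.skipped_rules)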

Architecture: The Stage-Aware Execution Engine

Now we need to orchestrate the entire pipeline, providing a clean API for executing specific stages or complete processing workflows:

@dataclass
class PipelineExecutionResult:
    """Complete results from pipeline execution"""
    success: bool
    executed_stages: List[ExecutionStage]
    stage_results: Dict[ExecutionStage, StageExecutionResult]
    total_execution_time_ms: int
    pipeline_metadata: Dict[str, Any]
    
    def get_failed_stages(self) -> List[ExecutionStage]:
        """Get stages that failed during execution"""
        return [stage for stage, result in self.stage_results.items() 
                if not result.success]
    
    def get_all_failed_rules(self) -> Dict[ExecutionStage, List[str]]:
        """Get all failed rules organized by stage"""
        return {stage: result.failed_rules 
                for stage, result in self.stage_results.items()
                if result.failed_rules}

class ConfigurablePipelineExecutor:
    """Main orchestration engine for stage-aware rule execution"""
    
    def __init__(self, config: Dict[str, Any], 
                 service_container: ServiceContainer):
        self.config = config
        self.config_loader = RuleConfigurationLoader(service_container)
        self.stage_manager = StageManager(self.config_loader)
    
    def execute_pipeline(self, data: Dict[str, Any], 
                        stages: List[ExecutionStage] = None,
                        metadata: Dict[str, Any] = None) -> PipelineExecutionResult:
        """Execute complete pipeline or specific stages"""
        start_time = time.time()
        
        # Default to all configured stages
        if stages is None:
            stages = [ExecutionStage(stage_name) 
                     for stage_name in self.config["stages"].keys()]
        
        context = ExecutionContext(
            data=data,
            metadata=metadata or {},
            results={}
        )
        
        executed_stages = []
        stage_results = {}
        stop_on_failure = self.config["execution_settings"].get(
            "stop_on_stage_failure", True
        )
        
        for stage in stages:
            if stage.value not in self.config["stages"]:
                continue  # Skip unconfigured stages
            
            stage_config = self.config["stages"][stage.value]
            if not stage_config.get("enabled", True):
                continue  # Skip disabled stages
            
            # Execute the stage
            result = self.stage_manager.execute_stage(stage, self.config, context)
            stage_results[stage] = result
            executed_stages.append(stage)
            
            # Check if we should stop on failure
            if stop_on_failure and not result.success:
                break
        
        total_time = int((time.time() - start_time) * 1000)
        
        return PipelineExecutionResult(
            success=all(result.success for result in stage_results.values()),
            executed_stages=executed_stages,
            stage_results=stage_results,
            total_execution_time_ms=total_time,
            pipeline_metadata={
                "execution_timestamp": datetime.utcnow().isoformat(),
                "configuration_version": self.config.get("version", "unknown"),
                "total_rules_executed": sum(
                    len(result.rule_results) for result in stage_results.values()
                )
            }
        )
    
    def execute_single_stage(self, stage: ExecutionStage, 
                           data: Dict[str, Any],
                           metadata: Dict[str, Any] = None) -> StageExecutionResult:
        """Execute only a specific stage - useful for debugging and testing"""
        context = ExecutionContext(
            data=data,
            metadata=metadata or {},
            results={}
        )
        
        return self.stage_manager.execute_stage(stage, self.config, context)
    
    def validate_configuration(self) -> List[str]:
        """Validate the pipeline configuration for errors"""
        errors = []
        
        # Check for required configuration sections
        if "stages" not in self.config:
            errors.append("Missing 'stages' configuration section")
            return errors
        
        if "execution_settings" not in self.config:
            errors.append("Missing 'execution_settings' configuration section")
        
        # Validate each stage configuration
        for stage_name, stage_config in self.config["stages"].items():
            stage_errors = self._validate_stage_config(stage_name, stage_config)
            errors.extend(stage_errors)
        
        return errors
    
    def _validate_stage_config(self, stage_name: str, 
                             stage_config: Dict[str, Any]) -> List[str]:
        """Validate configuration for a single stage"""
        errors = []
        
        # Check for required stage fields
        if "rules" not in stage_config:
            errors.append(f"Stage '{stage_name}' missing 'rules' section")
            return errors
        
        # Validate each rule configuration
        for rule_id, rule_config in stage_config["rules"].items():
            if "class" not in rule_config:
                errors.append(f"Rule '{rule_id}' in stage '{stage_name}' missing 'class'")
            
            rule_class_name = rule_config.get("class")
            if rule_class_name not in self.config_loader.rule_registry:
                errors.append(f"Unknown rule class '{rule_class_name}' in rule '{rule_id}'")
        
        return errors
    
    def get_stage_summary(self) -> Dict[str, Any]:
        """Get summary information about configured stages"""
        summary = {}
        
        for stage_name, stage_config in self.config["stages"].items():
            summary[stage_name] = {
                "enabled": stage_config.get("enabled", True),
                "rule_count": len(stage_config.get("rules", {})),
                "parallel_execution": stage_config.get("parallel_execution", False),
                "fail_fast": stage_config.get("fail_fast", True),
                "timeout_seconds": stage_config.get("timeout_seconds", 30)
            }
        
        return summary
graph TD
    A[Pipeline Executor] --> B[Configuration Loader]
    A --> C[Stage Manager]
    
    B --> D[Rule Registry]
    B --> E[Service Container]
    
    C --> F[Sequential Execution]
    C --> G[Parallel Execution]
    C --> H[Dependency Resolution]
    
    F --> I[Stage Results]
    G --> I
    H --> I
    
    I --> J[Pipeline Results]
    
    K[Monitoring & Metrics] --> A
    L[Error Handling] --> A
    M[Configuration Validation] --> B
    
    style A fill:#e1f5fe
    style I fill:#f3e5f5
    style J fill:#e8f5e8

Architectural Benefits

This orchestration engine provides several key architectural advantages:

Selective Execution: Execute only the stages you need, enabling efficient debugging and testing workflows.

Configuration Validation: Catch configuration errors before runtime, preventing production failures.

Comprehensive Monitoring: Detailed metrics at both stage and pipeline levels support operational excellence.

Flexible Orchestration: Support for both complete pipeline execution and individual stage processing.

Error Isolation: Stage failures are contained and don’t corrupt the overall execution context.
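
Selective execution in practice (sample data abbreviated; the ServiceContainer setup is assumed from Part 1):

customer_record = {"name": "Jane Doe", "email": "jane@example.com", "phone": "555-123-4567"}

executor = ConfigurablePipelineExecutor(PIPELINE_CONFIG, ServiceContainer())

# Run the complete pipeline.
full_result = executor.execute_pipeline(data=customer_record)

# Re-run only the compliance stage, e.g. after a sanctions list update.
compliance_only = executor.execute_pipeline(
    data=customer_record,
    stages=[ExecutionStage.COMPLIANCE],
)

# Debug a single stage in isolation.
dq_only = executor.execute_single_stage(ExecutionStage.DATA_QUALITY, customer_record)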

Real-World Implementation: Data Quality Pipeline

Let’s implement a complete data quality pipeline that demonstrates the power of configuration-driven stage management in a realistic scenario:

# Real-world pipeline configuration for customer onboarding
CUSTOMER_ONBOARDING_CONFIG = {
    "version": "2.1.0",
    "stages": {
        "preprocessing": {
            "enabled": True,
            "fail_fast": False,
            "parallel_execution": True,
            "timeout_seconds": 5,
            "max_parallel_rules": 6,
            "rules": {
                "trim_whitespace": {
                    "class": "MultiFieldTrimRule",
                    "params": {
                        "fields": ["name", "email", "address", "city", "state"]
                    }
                },
                "normalize_phone": {
                    "class": "PhoneNormalizationRule",
                    "params": {
                        "field_name": "phone",
                        "country_code": "US",
                        "format": "E164"
                    }
                },
                "normalize_address": {
                    "class": "AddressNormalizationRule",
                    "params": {
                        "address_service": "service:AddressService:google_maps"
                    }
                },
                "extract_name_components": {
                    "class": "NameParsingRule",
                    "params": {
                        "field_name": "name",
                        "extract_fields": ["first_name", "last_name", "middle_name"]
                    }
                },
                "standardize_email": {
                    "class": "EmailNormalizationRule",
                    "params": {
                        "field_name": "email",
                        "lowercase": True,
                        "remove_dots": False
                    }
                }
            }
        },
        
        "data_quality": {
            "enabled": True,
            "fail_fast": True,
            "parallel_execution": False,
            "timeout_seconds": 10,
            "rules": {
                "required_personal_info": {
                    "class": "RequiredFieldsRule",
                    "params": {
                        "required_fields": ["first_name", "last_name", "email", "phone"],
                        "field_descriptions": {
                            "first_name": "Customer first name",
                            "last_name": "Customer last name", 
                            "email": "Valid email address",
                            "phone": "US phone number"
                        }
                    }
                },
                "email_format_validation": {
                    "class": "EmailFormatRule",
                    "params": {
                        "field_name": "email",
                        "allow_disposable": False,
                        "check_mx_record": True
                    },
                    "dependencies": ["required_personal_info"]
                },
                "phone_format_validation": {
                    "class": "PhoneFormatRule",
                    "params": {
                        "field_name": "phone",
                        "country_code": "US",
                        "allow_extensions": False
                    },
                    "dependencies": ["required_personal_info"]
                },
                "data_consistency_check": {
                    "class": "DataConsistencyRule",
                    "params": {
                        "validation_rules": [
                            {
                                "fields": ["state", "zip_code"],
                                "validator": "state_zip_consistency"
                            },
                            {
                                "fields": ["area_code", "phone"],
                                "validator": "area_code_consistency"
                            }
                        ]
                    },
                    "dependencies": ["email_format_validation", "phone_format_validation"]
                }
            }
        },
        
        "compliance": {
            "enabled": True,
            "fail_fast": False,  # Collect all compliance violations
            "parallel_execution": True,
            "timeout_seconds": 45,
            "max_parallel_rules": 3,
            "rules": {
                "kyc_identity_verification": {
                    "class": "KYCIdentityRule",
                    "params": {
                        "kyc_service": "service:KYCService:jumio",
                        "verification_level": "enhanced",
                        "document_types": ["drivers_license", "passport", "state_id"]
                    },
                    "cross_stage_dependencies": ["required_personal_info", "data_consistency_check"]
                },
                "ofac_sanctions_screening": {
                    "class": "OFACSanctionsRule",
                    "params": {
                        "screening_service": "service:SanctionsService:worldcheck",
                        "match_threshold": 0.85,
                        "include_historical": True
                    },
                    "cross_stage_dependencies": ["required_personal_info"]
                },
                "pep_screening": {
                    "class": "PEPScreeningRule",
                    "params": {
                        "pep_service": "service:PEPService:dowjones",
                        "risk_categories": ["high", "medium"],
                        "include_associates": True
                    },
                    "cross_stage_dependencies": ["required_personal_info"],
                    "dependencies": ["ofac_sanctions_screening"]
                },
                "aml_risk_assessment": {
                    "class": "AMLRiskRule",
                    "params": {
                        "risk_engine": "service:RiskEngine:quantexa",
                        "risk_factors": [
                            "geographic_risk", 
                            "occupation_risk", 
                            "transaction_patterns"
                        ]
                    },
                    "cross_stage_dependencies": ["kyc_identity_verification"],
                    "dependencies": ["pep_screening"]
                }
            }
        },
        
        "business_rules": {
            "enabled": True,
            "fail_fast": False,
            "parallel_execution": True,
            "timeout_seconds": 30,
            "max_parallel_rules": 4,
            "rules": {
                "age_verification": {
                    "class": "AgeVerificationRule",
                    "params": {
                        "minimum_age": 18,
                        "date_field": "date_of_birth",
                        "verification_strict": True
                    },
                    "cross_stage_dependencies": ["kyc_identity_verification"]
                },
                "credit_score_check": {
                    "class": "CreditScoreRule",
                    "params": {
                        "credit_bureau": "service:CreditBureau:experian",
                        "minimum_score": 650,
                        "score_model": "FICO_8"
                    },
                    "cross_stage_dependencies": ["kyc_identity_verification"],
                    "dependencies": ["age_verification"]
                },
                "income_verification": {
                    "class": "IncomeVerificationRule",
                    "params": {
                        "verification_service": "service:IncomeService:argyle",
                        "minimum_monthly_income": 3000,
                        "verification_methods": ["bank_statements", "pay_stubs", "tax_returns"]
                    },
                    "cross_stage_dependencies": ["kyc_identity_verification"]
                },
                "debt_to_income_calculation": {
                    "class": "DebtToIncomeRule",
                    "params": {
                        "maximum_ratio": 0.43,
                        "include_mortgage": True,
                        "debt_service": "service:DebtService:yodlee"
                    },
                    "dependencies": ["income_verification", "credit_score_check"]
                },
                "geographic_eligibility": {
                    "class": "GeographicEligibilityRule",
                    "params": {
                        "allowed_states": [
                            "CA", "NY", "TX", "FL", "IL", "PA", "OH", "GA", "NC", "MI"
                        ],
                        "restricted_countries": ["sanctioned_countries"],
                        "address_field": "address"
                    },
                    "cross_stage_dependencies": ["data_consistency_check"]
                }
            }
        },
        
        "post_processing": {
            "enabled": True,
            "fail_fast": False,
            "parallel_execution": True,
            "timeout_seconds": 15,
            "rules": {
                "generate_customer_id": {
                    "class": "CustomerIDGenerationRule",
                    "params": {
                        "id_format": "CUST-{timestamp}-{hash}",
                        "hash_fields": ["email", "phone", "ssn"]
                    }
                },
                "create_audit_trail": {
                    "class": "AuditTrailRule",
                    "params": {
                        "audit_service": "service:AuditService:primary",
                        "include_pii": False,
                        "retention_days": 2555  # 7 years
                    }
                },
                "risk_score_calculation": {
                    "class": "RiskScoreAggregationRule",
                    "params": {
                        "scoring_model": "composite_v2",
                        "weight_factors": {
                            "credit_score": 0.4,
                            "aml_risk": 0.3,
                            "geographic_risk": 0.2,
                            "identity_confidence": 0.1
                        }
                    }
                },
                "notification_dispatch": {
                    "class": "NotificationRule",
                    "params": {
                        "notification_service": "service:NotificationService:sendgrid",
                        "templates": {
                            "success": "customer_onboarding_success",
                            "failure": "customer_onboarding_failure",
                            "manual_review": "customer_manual_review_required"
                        }
                    }
                }
            }
        }
    },
    
    "execution_settings": {
        "stop_on_stage_failure": True,
        "enable_audit_logging": True,
        "max_parallel_rules": 8,
        "default_timeout": 30,
        "retry_failed_rules": False,
        "performance_monitoring": {
            "enabled": True,
            "slow_rule_threshold_ms": 5000,
            "alert_on_timeout": True
        }
    }
}

# Concrete rule implementations for the data quality pipeline
class MultiFieldTrimRule(StagedRule):
    """Preprocessing rule that trims whitespace from multiple fields"""
    
    def __init__(self, rule_id: str, fields: List[str]):
        super().__init__(rule_id, ExecutionStage.PREPROCESSING)
        self.fields = fields
    
    def execute(self, data: dict) -> RuleResult:
        modified = False
        for field in self.fields:
            if field in data and isinstance(data[field], str):
                original_value = data[field]
                trimmed_value = original_value.strip()
                if original_value != trimmed_value:
                    data[field] = trimmed_value
                    modified = True
        
        return RuleResult.PASS if modified else RuleResult.SKIP

class RequiredFieldsRule(StagedRule):
    """Data quality rule with detailed error reporting"""
    
    def __init__(self, rule_id: str, required_fields: List[str], 
                 field_descriptions: Dict[str, str] = None):
        super().__init__(rule_id, ExecutionStage.DATA_QUALITY)
        self.required_fields = required_fields
        self.field_descriptions = field_descriptions or {}
    
    def execute(self, data: dict) -> RuleResult:
        missing_fields = []
        
        for field in self.required_fields:
            value = data.get(field)
            if value is None or (isinstance(value, str) and not value.strip()):
                missing_fields.append(field)
        
        if missing_fields:
            # Store detailed error information for reporting
            error_details = {
                "missing_fields": missing_fields,
                "field_descriptions": {
                    field: self.field_descriptions.get(field, f"Field '{field}' is required")
                    for field in missing_fields
                }
            }
            # In a real implementation, you'd store this in the execution context
            return RuleResult.FAIL
        
        return RuleResult.PASS

class KYCIdentityRule(StagedRule):
    """Compliance rule that performs identity verification"""
    
    def __init__(self, rule_id: str, kyc_service: 'KYCService', 
                 verification_level: str = "standard",
                 document_types: List[str] = None):
        super().__init__(rule_id, ExecutionStage.COMPLIANCE)
        self.kyc_service = kyc_service
        self.verification_level = verification_level
        self.document_types = document_types or ["drivers_license"]
    
    def execute(self, data: dict) -> RuleResult:
        try:
            verification_result = self.kyc_service.verify_identity(
                first_name=data.get("first_name"),
                last_name=data.get("last_name"),
                date_of_birth=data.get("date_of_birth"),
                address=data.get("address"),
                verification_level=self.verification_level,
                accepted_documents=self.document_types
            )
            
            # Store verification details for downstream rules
            data["kyc_verification_id"] = verification_result.verification_id
            data["identity_confidence_score"] = verification_result.confidence_score
            
            return RuleResult.PASS if verification_result.verified else RuleResult.FAIL
            
        except Exception as e:
            # Log the error and fail the rule
            print(f"KYC verification failed: {e}")
            return RuleResult.FAIL

class RiskScoreAggregationRule(StagedRule):
    """Post-processing rule that calculates composite risk scores"""
    
    def __init__(self, rule_id: str, scoring_model: str, 
                 weight_factors: Dict[str, float]):
        super().__init__(rule_id, ExecutionStage.POST_PROCESSING)
        self.scoring_model = scoring_model
        self.weight_factors = weight_factors
    
    def execute(self, data: dict) -> RuleResult:
        risk_components = {}
        
        # Extract risk scores from previous rule executions
        if "credit_score" in data:
            # Convert credit score to risk score (higher credit = lower risk)
            risk_components["credit_score"] = max(0, (850 - data["credit_score"]) / 850)
        
        if "aml_risk_score" in data:
            risk_components["aml_risk"] = data["aml_risk_score"]
        
        if "geographic_risk_score" in data:
            risk_components["geographic_risk"] = data["geographic_risk_score"]
        
        if "identity_confidence_score" in data:
            # Convert confidence to risk (higher confidence = lower risk)
            risk_components["identity_confidence"] = 1.0 - data["identity_confidence_score"]
        
        # Calculate weighted composite score
        composite_score = 0.0
        total_weight = 0.0
        
        for component, score in risk_components.items():
            if component in self.weight_factors:
                weight = self.weight_factors[component]
                composite_score += score * weight
                total_weight += weight
        
        if total_weight > 0:
            final_risk_score = composite_score / total_weight
            data["composite_risk_score"] = final_risk_score
            
            # Determine risk category
            if final_risk_score < 0.3:
                data["risk_category"] = "low"
            elif final_risk_score < 0.7:
                data["risk_category"] = "medium"
            else:
                data["risk_category"] = "high"
            
            return RuleResult.PASS
        
        return RuleResult.FAIL
flowchart TD
    A[Customer Application] --> B[Preprocessing Stage]
    
    B --> B1[Trim Whitespace]
    B --> B2[Normalize Phone]
    B --> B3[Standardize Email]
    
    B1 --> C[Data Quality Stage]
    B2 --> C
    B3 --> C
    
    C --> C1[Required Fields Check]
    C1 --> C2[Email Format Validation]
    C2 --> C3[Data Consistency Check]
    
    C3 --> D[Compliance Stage]
    
    D --> D1[KYC Identity Verification]
    D --> D2[OFAC Sanctions Screening]
    D1 --> D3[AML Risk Assessment]
    
    D1 --> E[Business Rules Stage]
    
    E --> E1[Age Verification]
    E1 --> E2[Credit Score Check]
    E --> E3[Income Verification]
    
    E2 --> F[Application Decision]
    E3 --> F
    
    style A fill:#e3f2fd
    style B fill:#f3e5f5
    style C fill:#fff3e0
    style D fill:#fce4ec
    style E fill:#f1f8e9
    style F fill:#e8f5e8

Let’s see this pipeline in action:

def process_customer_application():
    """Demonstrate the complete customer onboarding pipeline"""
    
    # Setup service container with mock services
    container = ServiceContainer()
    setup_customer_services(container)
    
    # Create pipeline executor
    executor = ConfigurablePipelineExecutor(CUSTOMER_ONBOARDING_CONFIG, container)
    
    # Validate configuration before execution
    config_errors = executor.validate_configuration()
    if config_errors:
        print("Configuration errors found:")
        for error in config_errors:
            print(f"  - {error}")
        return
    
    # Sample customer data
    customer_data = {
        "name": "  John Michael Smith  ",  # Has whitespace to be trimmed
        "email": "JOHN.SMITH@EXAMPLE.COM",  # Needs normalization
        "phone": "(555) 123-4567",  # Needs normalization
        "address": "123 Main St, Apt 2B",
        "city": "New York",
        "state": "NY",
        "zip_code": "10001",
        "date_of_birth": "1985-03-15",
        "ssn": "123-45-6789"
    }
    
    print("🚀 Starting customer onboarding pipeline...")
    print(f"📋 Pipeline configuration version: {CUSTOMER_ONBOARDING_CONFIG['version']}")
    print()
    
    # Execute the complete pipeline
    result = executor.execute_pipeline(
        data=customer_data,
        metadata={
            "application_id": "APP-2024-001234",
            "channel": "web_application",
            "referral_source": "organic_search"
        }
    )
    
    # Display comprehensive results
    print("📊 PIPELINE EXECUTION RESULTS")
    print("=" * 50)
    print(f"Overall Success: {'✅ PASS' if result.success else '❌ FAIL'}")
    print(f"Total Execution Time: {result.total_execution_time_ms}ms")
    print(f"Stages Executed: {len(result.executed_stages)}")
    print()
    
    # Stage-by-stage breakdown
    for stage in result.executed_stages:
        stage_result = result.stage_results[stage]
        status_icon = "✅" if stage_result.success else "❌"
        
        print(f"{status_icon} {stage.value.upper()} ({stage_result.execution_time_ms}ms)")
        print(f"   Rules Executed: {len(stage_result.rule_results)}")
        
        if stage_result.failed_rules:
            print(f"   Failed Rules: {', '.join(stage_result.failed_rules)}")
        
        if stage_result.skipped_rules:
            print(f"   Skipped Rules: {', '.join(stage_result.skipped_rules)}")
        
        print()
    
    # Failed rules summary
    failed_rules = result.get_all_failed_rules()
    if failed_rules:
        print("❌ FAILED RULES BY STAGE")
        print("-" * 30)
        for stage, rules in failed_rules.items():
            print(f"{stage.value}: {', '.join(rules)}")
        print()
    
    # Final customer data (after all transformations)
    print("📋 FINAL CUSTOMER DATA")
    print("-" * 25)
    for key, value in customer_data.items():
        print(f"{key}: {value}")
    
    return result

# Example of executing only specific stages
def process_data_quality_only():
    """Demonstrate executing only the data quality stage"""
    
    container = ServiceContainer()
    setup_customer_services(container)
    executor = ConfigurablePipelineExecutor(CUSTOMER_ONBOARDING_CONFIG, container)
    
    customer_data = {
        "first_name": "John",
        "last_name": "Smith", 
        "email": "john.smith@example.com",
        "phone": "+15551234567"
    }
    
    print("🔍 Running DATA QUALITY stage only...")
    
    # Execute only the data quality stage
    result = executor.execute_single_stage(
        ExecutionStage.DATA_QUALITY,
        customer_data,
        metadata={"stage_test": True}
    )
    
    print(f"Data Quality Result: {'✅ PASS' if result.success else '❌ FAIL'}")
    print(f"Execution Time: {result.execution_time_ms}ms")
    
    if result.failed_rules:
        print(f"Failed Rules: {', '.join(result.failed_rules)}")
    
    return result

Real-World Benefits Demonstrated

This implementation showcases several production-ready capabilities:

Comprehensive Data Processing: The pipeline handles everything from basic data cleaning to complex compliance validation.

Performance Optimization: Rules are executed in parallel where possible, and expensive operations are protected by prerequisites.

Business Flexibility: Different stages can be enabled/disabled based on business requirements or regulatory changes; a short configuration sketch follows below.

Operational Excellence: Detailed reporting and metrics support monitoring and troubleshooting in production.

Regulatory Compliance: Compliance rules are isolated and can be audited independently of business logic.
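
As a sketch of that flexibility, the compliance stage can be switched off through configuration alone, assuming each stage block carries an enabled flag (as the environment overrides later in this post do) and reusing the container and customer data from the demo above.

import copy

# Derive a variant of the pipeline configuration with compliance switched off.
# Assumes the stage-level "enabled" flag used by the environment overrides shown later.
relaxed_config = copy.deepcopy(CUSTOMER_ONBOARDING_CONFIG)
relaxed_config["stages"]["compliance"]["enabled"] = False

relaxed_executor = ConfigurablePipelineExecutor(relaxed_config, container)
relaxed_result = relaxed_executor.execute_pipeline(data=customer_data)
print(relaxed_result.executed_stages)   # compliance should no longer appear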

Enterprise Features: Advanced Stage Orchestration

Production environments require sophisticated orchestration capabilities beyond basic stage execution. Let’s implement advanced features that enable enterprise-scale operations:

from typing import Callable
from enum import Enum
import asyncio
import time

class StageExecutionMode(Enum):
    """Different execution modes for pipeline stages"""
    SEQUENTIAL = "sequential"      # Execute stages one by one
    PARALLEL = "parallel"          # Execute independent stages in parallel
    CONDITIONAL = "conditional"    # Execute stages based on conditions
    RETRY_ON_FAILURE = "retry"     # Retry failed stages with backoff

class ConditionalStageConfig:
    """Configuration for conditional stage execution"""
    
    def __init__(self, condition: Callable[[Dict], bool], 
                 reason: str = "Condition not met"):
        self.condition = condition
        self.reason = reason
    
    def should_execute(self, data: Dict[str, Any], 
                      previous_results: Dict[ExecutionStage, StageExecutionResult]) -> bool:
        """Determine if stage should execute based on current state"""
        return self.condition(data)

class RetryConfig:
    """Configuration for stage retry behavior"""
    
    def __init__(self, max_attempts: int = 3,
                 backoff_seconds: float = 1.0,
                 backoff_multiplier: float = 2.0,
                 retryable_failures: List[str] = None):
        self.max_attempts = max_attempts
        self.backoff_seconds = backoff_seconds
        self.backoff_multiplier = backoff_multiplier
        self.retryable_failures = retryable_failures or []

class AdvancedPipelineOrchestrator:
    """Enterprise-grade pipeline orchestrator with advanced execution modes"""
    
    def __init__(self, config: Dict[str, Any], 
                 service_container: ServiceContainer):
        self.config = config
        self.executor = ConfigurablePipelineExecutor(config, service_container)
        self.conditional_configs = {}
        self.retry_configs = {}
        self.stage_callbacks = {}
        
    def add_conditional_stage(self, stage: ExecutionStage, 
                            condition_config: ConditionalStageConfig):
        """Add conditional execution logic for a stage"""
        self.conditional_configs[stage] = condition_config
    
    def add_retry_config(self, stage: ExecutionStage, retry_config: RetryConfig):
        """Add retry configuration for a stage"""
        self.retry_configs[stage] = retry_config
    
    def add_stage_callback(self, stage: ExecutionStage, 
                          callback: Callable[[StageExecutionResult], None]):
        """Add callback to be executed after stage completion"""
        if stage not in self.stage_callbacks:
            self.stage_callbacks[stage] = []
        self.stage_callbacks[stage].append(callback)
    
    async def execute_pipeline_advanced(self, 
                                      data: Dict[str, Any],
                                      execution_mode: StageExecutionMode = StageExecutionMode.SEQUENTIAL,
                                      metadata: Dict[str, Any] = None) -> PipelineExecutionResult:
        """Execute pipeline with advanced orchestration features"""
        
        if execution_mode == StageExecutionMode.SEQUENTIAL:
            return await self._execute_sequential_with_advanced_features(data, metadata)
        elif execution_mode == StageExecutionMode.PARALLEL:
            return await self._execute_parallel_stages(data, metadata)
        elif execution_mode == StageExecutionMode.CONDITIONAL:
            return await self._execute_conditional_pipeline(data, metadata)
        elif execution_mode == StageExecutionMode.RETRY_ON_FAILURE:
            return await self._execute_with_retry_logic(data, metadata)
        else:
            # Fallback to standard execution
            return self.executor.execute_pipeline(data, metadata=metadata)
    
    async def _execute_sequential_with_advanced_features(self, 
                                                       data: Dict[str, Any],
                                                       metadata: Dict[str, Any]) -> PipelineExecutionResult:
        """Sequential execution with conditional logic and callbacks"""
        start_time = time.time()
        executed_stages = []
        stage_results = {}
        
        all_stages = [ExecutionStage(stage_name) 
                     for stage_name in self.config["stages"].keys()]
        
        for stage in all_stages:
            # Check if stage should be executed conditionally
            if stage in self.conditional_configs:
                condition_config = self.conditional_configs[stage]
                if not condition_config.should_execute(data, stage_results):
                    print(f"⏭️  Skipping {stage.value}: {condition_config.reason}")
                    continue
            
            # Execute the stage with potential retry logic
            if stage in self.retry_configs:
                result = await self._execute_stage_with_retry(stage, data, metadata)
            else:
                result = self.executor.execute_single_stage(stage, data, metadata)
            
            stage_results[stage] = result
            executed_stages.append(stage)
            
            # Execute callbacks
            if stage in self.stage_callbacks:
                for callback in self.stage_callbacks[stage]:
                    try:
                        callback(result)
                    except Exception as e:
                        print(f"⚠️  Callback error for {stage.value}: {e}")
            
            # Check if we should stop on failure
            if not result.success and self.config["execution_settings"].get("stop_on_stage_failure", True):
                break
        
        total_time = int((time.time() - start_time) * 1000)
        
        return PipelineExecutionResult(
            success=all(result.success for result in stage_results.values()),
            executed_stages=executed_stages,
            stage_results=stage_results,
            total_execution_time_ms=total_time,
            pipeline_metadata={
                "execution_mode": "sequential_advanced",
                "conditional_stages": list(self.conditional_configs.keys()),
                "retry_enabled_stages": list(self.retry_configs.keys()),
                **(metadata or {})
            }
        )
    
    async def _execute_stage_with_retry(self, stage: ExecutionStage, 
                                      data: Dict[str, Any],
                                      metadata: Dict[str, Any]) -> StageExecutionResult:
        """Execute stage with retry logic on failure"""
        retry_config = self.retry_configs[stage]
        
        for attempt in range(retry_config.max_attempts):
            try:
                result = self.executor.execute_single_stage(stage, data, metadata)
                
                if result.success:
                    return result
                
                # Check if failures are retryable
                if retry_config.retryable_failures:
                    retryable = any(failed_rule in retry_config.retryable_failures 
                                  for failed_rule in result.failed_rules)
                    if not retryable:
                        print(f"{stage.value}: Non-retryable failure, aborting retries")
                        return result
                
                if attempt < retry_config.max_attempts - 1:
                    backoff_time = (retry_config.backoff_seconds * 
                                  (retry_config.backoff_multiplier ** attempt))
                    print(f"🔄 {stage.value}: Attempt {attempt + 1} failed, retrying in {backoff_time}s")
                    await asyncio.sleep(backoff_time)
                
            except Exception as e:
                print(f"{stage.value}: Exception on attempt {attempt + 1}: {e}")
                if attempt == retry_config.max_attempts - 1:
                    # Create a failed result for the final attempt
                    return StageExecutionResult(
                        stage=stage,
                        success=False,
                        rule_results={},
                        execution_time_ms=0,
                        failed_rules=["stage_execution_error"],
                        skipped_rules=[],
                        error_messages={"stage_execution_error": str(e)}
                    )
        
        # All retry attempts exhausted; return the result of the final attempt
        return result
    
    async def _execute_parallel_stages(self, data: Dict[str, Any],
                                     metadata: Dict[str, Any]) -> PipelineExecutionResult:
        """Execute independent stages in parallel for maximum performance"""
        start_time = time.time()
        
        # Analyze stage dependencies to determine parallel execution groups
        dependency_groups = self._analyze_stage_dependencies()
        
        stage_results = {}
        executed_stages = []
        
        for group in dependency_groups:
            # Execute all stages in this group in parallel
            if len(group) == 1:
                # Single stage - execute normally
                stage = group[0]
                result = self.executor.execute_single_stage(stage, data, metadata)
                stage_results[stage] = result
                executed_stages.append(stage)
            else:
                # Multiple stages - execute in parallel
                tasks = []
                for stage in group:
                    task = asyncio.create_task(
                        self._execute_stage_async(stage, data, metadata)
                    )
                    tasks.append((stage, task))
                
                # Wait for all stages in this group to complete
                for stage, task in tasks:
                    result = await task
                    stage_results[stage] = result
                    executed_stages.append(stage)
            
            # Check if any stage in this group failed and we should stop
            group_success = all(stage_results[stage].success for stage in group)
            if not group_success and self.config["execution_settings"].get("stop_on_stage_failure", True):
                break
        
        total_time = int((time.time() - start_time) * 1000)
        
        return PipelineExecutionResult(
            success=all(result.success for result in stage_results.values()),
            executed_stages=executed_stages,
            stage_results=stage_results,
            total_execution_time_ms=total_time,
            pipeline_metadata={
                "execution_mode": "parallel",
                "dependency_groups": len(dependency_groups),
                **(metadata or {})
            }
        )
    
    async def _execute_stage_async(self, stage: ExecutionStage, 
                                 data: Dict[str, Any],
                                 metadata: Dict[str, Any]) -> StageExecutionResult:
        """Async wrapper for stage execution"""
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(
            None, 
            self.executor.execute_single_stage, 
            stage, data, metadata
        )
    
    def _analyze_stage_dependencies(self) -> List[List[ExecutionStage]]:
        """Analyze stage dependencies to create parallel execution groups"""
        # This is a simplified implementation - in practice, you'd analyze
        # cross-stage dependencies to determine which stages can run in parallel
        
        # For demonstration, we'll create groups based on typical pipeline flow
        return [
            [ExecutionStage.PREPROCESSING],
            [ExecutionStage.DATA_QUALITY],
            [ExecutionStage.COMPLIANCE, ExecutionStage.BUSINESS_RULES],  # These can run in parallel
            [ExecutionStage.POST_PROCESSING]
        ]
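
A fuller version would derive these groups from declared cross-stage dependencies. A minimal sketch, assuming a hypothetical mapping from each stage to its prerequisite stages, is a Kahn-style levelling:

def build_parallel_groups(
    dependencies: Dict[ExecutionStage, List[ExecutionStage]]
) -> List[List[ExecutionStage]]:
    """Group stages into waves so that every stage only depends on earlier waves.
    `dependencies` maps each stage to the stages it must wait for (hypothetical format)."""
    remaining = dict(dependencies)
    resolved = set()
    groups: List[List[ExecutionStage]] = []

    while remaining:
        # Every stage whose prerequisites are already resolved can run in the same wave.
        ready = [stage for stage, deps in remaining.items()
                 if all(dep in resolved for dep in deps)]
        if not ready:
            raise ValueError("Circular dependency between stages")
        groups.append(ready)
        resolved.update(ready)
        for stage in ready:
            del remaining[stage]
    return groups

# Compliance and business rules both wait only on data quality,
# so they end up in the same wave and can run in parallel.
parallel_groups = build_parallel_groups({
    ExecutionStage.PREPROCESSING: [],
    ExecutionStage.DATA_QUALITY: [ExecutionStage.PREPROCESSING],
    ExecutionStage.COMPLIANCE: [ExecutionStage.DATA_QUALITY],
    ExecutionStage.BUSINESS_RULES: [ExecutionStage.DATA_QUALITY],
    ExecutionStage.POST_PROCESSING: [ExecutionStage.COMPLIANCE, ExecutionStage.BUSINESS_RULES],
})
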
graph TB
    A[Advanced Pipeline Orchestrator] --> B[Conditional Execution]
    A --> C[Retry Logic]
    A --> D[Stage Callbacks]
    A --> E[Parallel Execution]
    
    B --> B1[Risk-Based Compliance]
    B --> B2[Business Hours Logic]
    B --> B3[Feature Flags]
    
    C --> C1[Exponential Backoff]
    C --> C2[Retryable Failures]
    C --> C3[Circuit Breaker]
    
    D --> D1[Monitoring Hooks]
    D --> D2[Audit Logging]
    D --> D3[Metrics Collection]
    
    E --> E1[Independent Stages]
    E --> E2[Dependency Analysis]
    E --> E3[Resource Optimization]
    
    style A fill:#e1f5fe
    style B fill:#fff3e0
    style C fill:#fce4ec
    style D fill:#f3e5f5
    style E fill:#e8f5e8

Concrete Implementations for Advanced Features

def create_enterprise_orchestrator() -> AdvancedPipelineOrchestrator:
    """Create a fully configured enterprise orchestrator"""
    
    container = ServiceContainer()
    setup_enterprise_services(container)
    
    orchestrator = AdvancedPipelineOrchestrator(CUSTOMER_ONBOARDING_CONFIG, container)
    
    # Add conditional execution for high-risk customers
    orchestrator.add_conditional_stage(
        ExecutionStage.COMPLIANCE,
        ConditionalStageConfig(
            condition=lambda data: data.get("risk_category", "medium") in ["medium", "high"],
            reason="Low-risk customers skip enhanced compliance checks"
        )
    )
    
    # Add retry configuration for flaky external services
    orchestrator.add_retry_config(
        ExecutionStage.COMPLIANCE,
        RetryConfig(
            max_attempts=3,
            backoff_seconds=2.0,
            backoff_multiplier=2.0,
            retryable_failures=["kyc_identity_verification", "ofac_sanctions_screening"]
        )
    )
    
    # Add monitoring callbacks
    orchestrator.add_stage_callback(
        ExecutionStage.COMPLIANCE,
        lambda result: log_compliance_metrics(result)
    )
    
    orchestrator.add_stage_callback(
        ExecutionStage.BUSINESS_RULES,
        lambda result: update_business_metrics(result)
    )
    
    return orchestrator

def log_compliance_metrics(result: StageExecutionResult):
    """Callback to log compliance stage metrics"""
    print(f"📊 Compliance Metrics:")
    print(f"   Execution Time: {result.execution_time_ms}ms")
    print(f"   Success Rate: {len([r for r in result.rule_results.values() if r == RuleResult.PASS])}/{len(result.rule_results)}")
    
    if result.failed_rules:
        print(f"   Failed Compliance Rules: {', '.join(result.failed_rules)}")

def update_business_metrics(result: StageExecutionResult):
    """Callback to update business rule metrics"""
    print(f"💼 Business Rules Metrics:")
    print(f"   Rules Evaluated: {len(result.rule_results)}")
    if result.rule_results:
        approval_rate = len([r for r in result.rule_results.values() if r == RuleResult.PASS]) / len(result.rule_results)
        print(f"   Approval Rate: {approval_rate * 100:.1f}%")
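
Tying it together, a worker or API handler would drive the orchestrator from an async entry point; this sketch assumes the sample data shape from the earlier demos.

async def run_enterprise_onboarding(customer_data: Dict[str, Any]) -> PipelineExecutionResult:
    """Minimal sketch of driving the enterprise orchestrator end to end."""
    orchestrator = create_enterprise_orchestrator()
    return await orchestrator.execute_pipeline_advanced(
        data=customer_data,
        execution_mode=StageExecutionMode.SEQUENTIAL,   # conditional stages and retries still apply
        metadata={"application_id": "APP-2024-005678"}  # illustrative identifier
    )

# From synchronous code:
# result = asyncio.run(run_enterprise_onboarding(sample_customer_data))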

Production Ready: Complete Configuration System

Let’s implement a complete configuration management system that handles environment-specific configurations, dynamic updates, and operational monitoring:

import json
import yaml
from pathlib import Path
from typing import Union
from datetime import datetime
import hashlib

class ConfigurationManager:
    """Enterprise configuration manager with versioning and validation"""
    
    def __init__(self, config_path: Union[str, Path] = None):
        self.config_path = Path(config_path) if config_path else None
        self.current_config = None
        self.config_version = None
        self.config_hash = None
        self.validation_schema = self._load_validation_schema()
    
    def load_configuration(self, environment: str = "production") -> Dict[str, Any]:
        """Load configuration for specific environment"""
        
        if self.config_path and self.config_path.exists():
            # Load from file
            if self.config_path.suffix.lower() == '.json':
                with open(self.config_path, 'r') as f:
                    base_config = json.load(f)
            elif self.config_path.suffix.lower() in ['.yml', '.yaml']:
                with open(self.config_path, 'r') as f:
                    base_config = yaml.safe_load(f)
            else:
                raise ValueError(f"Unsupported configuration file format: {self.config_path.suffix}")
        else:
            # Use embedded configuration
            base_config = CUSTOMER_ONBOARDING_CONFIG.copy()
        
        # Apply environment-specific overrides
        config = self._apply_environment_overrides(base_config, environment)
        
        # Validate configuration
        validation_errors = self._validate_configuration(config)
        if validation_errors:
            raise ValueError(f"Configuration validation failed: {validation_errors}")
        
        # Update tracking information
        self.current_config = config
        self.config_version = config.get("version", "unknown")
        self.config_hash = self._calculate_config_hash(config)
        
        return config
    
    def _apply_environment_overrides(self, base_config: Dict[str, Any], 
                                   environment: str) -> Dict[str, Any]:
        """Apply environment-specific configuration overrides"""
        
        # Environment-specific overrides
        env_overrides = {
            "development": {
                "execution_settings": {
                    "enable_audit_logging": False,
                    "performance_monitoring": {"enabled": False}
                },
                "stages": {
                    "compliance": {
                        "enabled": False,  # Skip compliance in dev
                        "rules": {}
                    },
                    "business_rules": {
                        "timeout_seconds": 5  # Faster timeouts in dev
                    }
                }
            },
            "staging": {
                "execution_settings": {
                    "stop_on_stage_failure": False,  # Continue processing in staging
                },
                "stages": {
                    "compliance": {
                        "timeout_seconds": 15,  # Shorter timeouts for testing
                        "rules": {
                            "kyc_identity_verification": {
                                "params": {
                                    "verification_level": "basic"  # Faster verification
                                }
                            }
                        }
                    }
                }
            },
            "production": {
                # Production uses base configuration with enhanced monitoring
                "execution_settings": {
                    "performance_monitoring": {
                        "enabled": True,
                        "slow_rule_threshold_ms": 3000,
                        "alert_on_timeout": True,
                        "detailed_metrics": True
                    }
                }
            }
        }
        
        if environment in env_overrides:
            return self._deep_merge_configs(base_config, env_overrides[environment])
        
        return base_config
    
    def _deep_merge_configs(self, base: Dict[str, Any], 
                          override: Dict[str, Any]) -> Dict[str, Any]:
        """Deep merge configuration dictionaries"""
        result = base.copy()
        
        for key, value in override.items():
            if key in result and isinstance(result[key], dict) and isinstance(value, dict):
                result[key] = self._deep_merge_configs(result[key], value)
            else:
                result[key] = value
        
        return result
    
    def _validate_configuration(self, config: Dict[str, Any]) -> List[str]:
        """Validate configuration against schema"""
        errors = []
        
        # Required top-level sections
        required_sections = ["stages", "execution_settings"]
        for section in required_sections:
            if section not in config:
                errors.append(f"Missing required section: {section}")
        
        # Validate stages
        if "stages" in config:
            for stage_name, stage_config in config["stages"].items():
                stage_errors = self._validate_stage_configuration(stage_name, stage_config)
                errors.extend(stage_errors)
        
        return errors
    
    def _validate_stage_configuration(self, stage_name: str, 
                                    stage_config: Dict[str, Any]) -> List[str]:
        """Validate individual stage configuration"""
        errors = []
        
        # Check for required stage fields
        if "rules" not in stage_config:
            errors.append(f"Stage '{stage_name}' missing 'rules' section")
            return errors
        
        # Validate rule configurations
        for rule_id, rule_config in stage_config["rules"].items():
            if "class" not in rule_config:
                errors.append(f"Rule '{rule_id}' in stage '{stage_name}' missing 'class' field")
            
            # Validate timeout values
            if "timeout_seconds" in stage_config:
                timeout = stage_config["timeout_seconds"]
                if not isinstance(timeout, (int, float)) or timeout <= 0:
                    errors.append(f"Invalid timeout for stage '{stage_name}': {timeout}")
        
        return errors
    
    def _calculate_config_hash(self, config: Dict[str, Any]) -> str:
        """Calculate hash of configuration for change detection"""
        config_str = json.dumps(config, sort_keys=True)
        return hashlib.sha256(config_str.encode()).hexdigest()[:12]
    
    def _load_validation_schema(self) -> Dict[str, Any]:
        """Load configuration validation schema"""
        # In a real implementation, this would load from a JSON Schema file
        return {
            "type": "object",
            "required": ["stages", "execution_settings"],
            "properties": {
                "stages": {"type": "object"},
                "execution_settings": {"type": "object"}
            }
        }

graph TD
    A[Configuration Manager] --> B[Environment Detection]
    B --> C[Base Configuration]
    B --> D[Environment Overrides]
    
    C --> E[Configuration Merger]
    D --> E
    
    E --> F[Configuration Validation]
    F --> G[Version Tracking]
    G --> H[Hash Calculation]
    
    H --> I[Production Config]
    
    J[Development Overrides] -.-> D
    K[Staging Overrides] -.-> D
    L[Production Overrides] -.-> D
    
    M[JSON/YAML Files] --> C
    N[Database Config] --> C
    O[Environment Variables] --> D
    
    style A fill:#e1f5fe
    style I fill:#e8f5e8
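
Loading a configuration is then a one-liner per environment; the file path below is hypothetical, and if the file is absent the manager falls back to the embedded configuration.

# Hypothetical path - any JSON or YAML file with the same layout works.
config_manager = ConfigurationManager("config/pipeline.yaml")

dev_config = config_manager.load_configuration(environment="development")
print(dev_config["stages"]["compliance"]["enabled"])      # False - disabled by the dev override
print(config_manager.config_version, config_manager.config_hash)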

Production Pipeline Manager

class ProductionPipelineManager:
    """Production-ready pipeline manager with full operational support"""
    
    def __init__(self, config_manager: ConfigurationManager,
                 environment: str = "production"):
        self.config_manager = config_manager
        self.environment = environment
        self.orchestrator = None
        self.metrics_collector = MetricsCollector()
        self._initialize_pipeline()
    
    def _initialize_pipeline(self):
        """Initialize the pipeline with current configuration"""
        config = self.config_manager.load_configuration(self.environment)
        
        container = ServiceContainer()
        self._setup_production_services(container)
        
        self.orchestrator = AdvancedPipelineOrchestrator(config, container)
        self._configure_production_features()
    
    def _setup_production_services(self, container: ServiceContainer):
        """Setup all production service dependencies"""
        # Database services
        container.register(DatabaseClient, ProductionDatabaseClient, singleton=True)
        container.register(CacheClient, ProductionRedisClient, singleton=True)
        
        # External API services
        container.register(KYCService, JumioKYCService, singleton=True)
        container.register(SanctionsService, WorldCheckService, singleton=True)
        container.register(CreditBureau, ExperianCreditService, singleton=True)
        
        # Monitoring and logging
        container.register(Logger, StructuredLogger, name="application")
        container.register(Logger, MetricsLogger, name="metrics")
        container.register(MetricsCollector, self.metrics_collector, singleton=True)
    
    def _configure_production_features(self):
        """Configure production-specific features"""
        # Add comprehensive monitoring callbacks
        for stage in ExecutionStage:
            self.orchestrator.add_stage_callback(
                stage, 
                lambda result, s=stage: self._record_stage_metrics(s, result)
            )
        
        # Configure retry logic for external services
        self.orchestrator.add_retry_config(
            ExecutionStage.COMPLIANCE,
            RetryConfig(max_attempts=3, backoff_seconds=1.0, backoff_multiplier=2.0)
        )
        
        # Add conditional execution based on business hours
        self.orchestrator.add_conditional_stage(
            ExecutionStage.BUSINESS_RULES,
            ConditionalStageConfig(
                condition=lambda data: self._is_business_hours(),
                reason="Business rules only execute during business hours"
            )
        )
    
    async def process_application(self, application_data: Dict[str, Any],
                                application_id: str = None) -> Dict[str, Any]:
        """Process customer application with full production monitoring"""
        
        processing_start = datetime.utcnow()
        application_id = application_id or f"APP-{int(time.time())}"
        
        # Add processing metadata
        metadata = {
            "application_id": application_id,
            "processing_start": processing_start.isoformat(),
            "environment": self.environment,
            "config_version": self.config_manager.config_version,
            "config_hash": self.config_manager.config_hash
        }
        
        try:
            # Execute the pipeline
            result = await self.orchestrator.execute_pipeline_advanced(
                data=application_data,
                execution_mode=StageExecutionMode.SEQUENTIAL,
                metadata=metadata
            )
            
            # Record overall metrics
            self.metrics_collector.record_pipeline_execution(
                application_id=application_id,
                success=result.success,
                execution_time_ms=result.total_execution_time_ms,
                stages_executed=len(result.executed_stages),
                failed_stages=len(result.get_failed_stages())
            )
            
            # Generate comprehensive response
            response = {
                "application_id": application_id,
                "processing_result": "approved" if result.success else "rejected",
                "processing_time_ms": result.total_execution_time_ms,
                "stages_executed": [stage.value for stage in result.executed_stages],
                "pipeline_metadata": result.pipeline_metadata,
                "customer_data": application_data,  # Including any transformations
                "audit_trail": self._generate_audit_trail(result),
                "next_steps": self._determine_next_steps(result)
            }
            
            # Add failure details if applicable
            if not result.success:
                response["rejection_reasons"] = self._generate_rejection_reasons(result)
                response["failed_rules"] = result.get_all_failed_rules()
            
            return response
            
        except Exception as e:
            # Record error metrics
            self.metrics_collector.record_pipeline_error(
                application_id=application_id,
                error_type=type(e).__name__,
                error_message=str(e)
            )
            
            return {
                "application_id": application_id,
                "processing_result": "error",
                "error_message": str(e),
                "processing_time_ms": int((datetime.utcnow() - processing_start).total_seconds() * 1000)
            }
    
    def _record_stage_metrics(self, stage: ExecutionStage, result: StageExecutionResult):
        """Record detailed metrics for each stage"""
        self.metrics_collector.record_stage_execution(
            stage=stage.value,
            success=result.success,
            execution_time_ms=result.execution_time_ms,
            rules_executed=len(result.rule_results),
            rules_failed=len(result.failed_rules),
            rules_skipped=len(result.skipped_rules)
        )
    
    def _is_business_hours(self) -> bool:
        """Check if current time is within business hours"""
        current_hour = datetime.utcnow().hour
        return 9 <= current_hour <= 17  # 9 AM to 5 PM UTC
    
    def _generate_audit_trail(self, result: PipelineExecutionResult) -> List[Dict[str, Any]]:
        """Generate detailed audit trail for compliance"""
        audit_events = []
        
        for stage in result.executed_stages:
            stage_result = result.stage_results[stage]
            
            audit_events.append({
                "timestamp": datetime.utcnow().isoformat(),
                "event_type": "stage_execution",
                "stage": stage.value,
                "result": "success" if stage_result.success else "failure",
                "execution_time_ms": stage_result.execution_time_ms,
                "rules_details": {
                    rule_id: rule_result.value
                    for rule_id, rule_result in stage_result.rule_results.items()
                }
            })
        
        return audit_events
    
    def _determine_next_steps(self, result: PipelineExecutionResult) -> List[str]:
        """Determine recommended next steps based on pipeline results"""
        if result.success:
            return [
                "Customer application approved",
                "Proceed with account setup",
                "Send welcome email to customer"
            ]
        
        failed_stages = result.get_failed_stages()
        next_steps = []
        
        if ExecutionStage.DATA_QUALITY in failed_stages:
            next_steps.append("Request additional information from customer")
        
        if ExecutionStage.COMPLIANCE in failed_stages:
            next_steps.append("Escalate to compliance team for manual review")
        
        if ExecutionStage.BUSINESS_RULES in failed_stages:
            next_steps.append("Consider alternative product offerings")
        
        return next_steps or ["Manual review required"]
    
    def _generate_rejection_reasons(self, result: PipelineExecutionResult) -> List[str]:
        """Generate customer-friendly rejection reasons"""
        failed_rules = result.get_all_failed_rules()
        reasons = []
        
        for stage, rules in failed_rules.items():
            if stage == ExecutionStage.DATA_QUALITY:
                reasons.append("Please verify and complete all required information")
            elif stage == ExecutionStage.COMPLIANCE:
                reasons.append("Additional identity verification required")
            elif stage == ExecutionStage.BUSINESS_RULES:
                reasons.append("Application does not meet current approval criteria")
        
        return reasons or ["Application cannot be processed at this time"]

class MetricsCollector:
    """Collects and aggregates pipeline metrics for monitoring"""
    
    def __init__(self):
        self.pipeline_executions = []
        self.stage_executions = []
        self.errors = []
    
    def record_pipeline_execution(self, application_id: str, success: bool,
                                execution_time_ms: int, stages_executed: int,
                                failed_stages: int):
        """Record pipeline-level execution metrics"""
        self.pipeline_executions.append({
            "timestamp": datetime.utcnow().isoformat(),
            "application_id": application_id,
            "success": success,
            "execution_time_ms": execution_time_ms,
            "stages_executed": stages_executed,
            "failed_stages": failed_stages
        })
    
    def record_stage_execution(self, stage: str, success: bool,
                             execution_time_ms: int, rules_executed: int,
                             rules_failed: int, rules_skipped: int):
        """Record stage-level execution metrics"""
        self.stage_executions.append({
            "timestamp": datetime.utcnow().isoformat(),
            "stage": stage,
            "success": success,
            "execution_time_ms": execution_time_ms,
            "rules_executed": rules_executed,
            "rules_failed": rules_failed,
            "rules_skipped": rules_skipped
        })
    
    def record_pipeline_error(self, application_id: str, 
                            error_type: str, error_message: str):
        """Record pipeline execution errors"""
        self.errors.append({
            "timestamp": datetime.utcnow().isoformat(),
            "application_id": application_id,
            "error_type": error_type,
            "error_message": error_message
        })
    
    def get_performance_summary(self) -> Dict[str, Any]:
        """Generate performance summary for monitoring dashboards"""
        if not self.pipeline_executions:
            return {"message": "No execution data available"}
        
        successful_executions = [e for e in self.pipeline_executions if e["success"]]
        total_executions = len(self.pipeline_executions)
        
        return {
            "total_executions": total_executions,
            "success_rate": len(successful_executions) / total_executions if total_executions > 0 else 0,
            "average_execution_time_ms": sum(e["execution_time_ms"] for e in self.pipeline_executions) / total_executions,
            "error_count": len(self.errors),
            "stage_performance": self._calculate_stage_performance()
        }
    
    def _calculate_stage_performance(self) -> Dict[str, Any]:
        """Calculate performance metrics by stage"""
        stage_stats = {}
        
        for execution in self.stage_executions:
            stage = execution["stage"]
            if stage not in stage_stats:
                stage_stats[stage] = {
                    "executions": 0,
                    "successes": 0,
                    "total_time_ms": 0,
                    "failures": 0
                }
            
            stats = stage_stats[stage]
            stats["executions"] += 1
            stats["total_time_ms"] += execution["execution_time_ms"]
            
            if execution["success"]:
                stats["successes"] += 1
            else:
                stats["failures"] += 1
        
        # Calculate rates and averages
        for stage, stats in stage_stats.items():
            stats["success_rate"] = stats["successes"] / stats["executions"]
            stats["average_time_ms"] = stats["total_time_ms"] / stats["executions"]
        
        return stage_stats

# Complete production example
async def run_production_pipeline():
    """Demonstrate complete production pipeline with monitoring"""
    
    print("🏭 PRODUCTION PIPELINE DEMO")
    print("=" * 50)
    
    # Initialize configuration manager
    config_manager = ConfigurationManager()
    
    # Create production pipeline manager
    pipeline_manager = ProductionPipelineManager(config_manager, environment="production")
    
    # Sample applications to process
    applications = [
        {
            "name": "John Smith",
            "email": "john.smith@example.com",
            "phone": "555-123-4567",
            "address": "123 Main St",
            "city": "New York", 
            "state": "NY",
            "zip_code": "10001",
            "date_of_birth": "1985-03-15",
            "ssn": "123-45-6789",
            "annual_income": 75000
        },
        {
            "name": "Jane Doe",
            "email": "jane.doe@example.com", 
            "phone": "555-987-6543",
            # Missing required fields to test failure handling
            "city": "Los Angeles",
            "state": "CA"
        }
    ]
    
    # Process each application
    for i, app_data in enumerate(applications, 1):
        print(f"\n📋 Processing Application {i}")
        print("-" * 30)
        
        result = await pipeline_manager.process_application(
            application_data=app_data,
            application_id=f"DEMO-APP-{i:03d}"
        )
        
        print(f"Application ID: {result['application_id']}")
        print(f"Result: {result['processing_result'].upper()}")
        print(f"Processing Time: {result['processing_time_ms']}ms")
        
        if result['processing_result'] == 'rejected':
            print("Rejection Reasons:")
            for reason in result.get('rejection_reasons', []):
                print(f"  - {reason}")
        
        print("Next Steps:")
        for step in result.get('next_steps', []):
            print(f"  - {step}")
    
    # Display performance metrics
    print(f"\n📊 PERFORMANCE METRICS")
    print("=" * 30)
    
    metrics = pipeline_manager.metrics_collector.get_performance_summary()
    print(f"Total Executions: {metrics['total_executions']}")
    print(f"Success Rate: {metrics['success_rate']:.1%}")
    print(f"Average Execution Time: {metrics['average_execution_time_ms']:.0f}ms")
    print(f"Error Count: {metrics['error_count']}")
    
    print(f"\n📈 STAGE PERFORMANCE")
    print("-" * 25)
    for stage, stats in metrics['stage_performance'].items():
        print(f"{stage.upper()}:")
        print(f"  Success Rate: {stats['success_rate']:.1%}")
        print(f"  Avg Time: {stats['average_time_ms']:.0f}ms")
        print(f"  Executions: {stats['executions']}")

# Mock service implementations for demonstration
class ProductionDatabaseClient:
    def query(self, sql: str): return []
    def execute(self, sql: str): return True

class ProductionRedisClient:
    def get(self, key: str): return None
    def set(self, key: str, value: str): return True

class JumioKYCService:
    def verify_identity(self, **kwargs): 
        return type('VerificationResult', (), {
            'verified': True, 
            'verification_id': 'KYC-12345',
            'confidence_score': 0.95
        })()

class WorldCheckService:
    def screen_sanctions(self, **kwargs): return {"match": False, "confidence": 0.0}

Conclusion: Lessons from Production

Building this configuration-driven rule engine taught us several crucial lessons about enterprise software architecture:

mindmap
  root((Production Lessons))
    Configuration as Code
      Versioning Support
      Environment Flexibility
      Business User Access
      Deployment Independence
    
    Stage Isolation
      Performance Optimization
      Error Containment
      Independent Development
      Operational Flexibility
    
    Monitoring Built-In
      Stage-Level Metrics
      Rule Performance
      Business Outcomes
      Operational Excellence
    
    Architecture Principles
      Progressive Enhancement
      Configuration Validation
      Error Isolation
      Business Transparency

Key Insights

Configuration as Code Works: Treating pipeline configurations as first-class code artifacts enabled versioning, testing, and environment-specific deployments. The ability to modify business logic through configuration changes rather than code deployments proved invaluable in regulated industries.

Stage Isolation Enables Scalability: Organizing rules into logical stages solved multiple problems simultaneously - performance optimization, error isolation, debugging simplicity, and operational flexibility. Teams could work on different stages independently without conflicts.

Dependency Management is Critical: Automatic dependency resolution within stages and explicit cross-stage dependencies eliminated entire classes of bugs while making business rule relationships explicit and maintainable.

Monitoring Must Be Built-In: Production systems require comprehensive observability from day one. Stage-level metrics, rule-level performance tracking, and business outcome monitoring proved essential for operational excellence.

Flexibility vs. Complexity Trade-offs: Advanced features like conditional execution, retry logic, and parallel processing add significant value but also increase system complexity. The key is providing sensible defaults while enabling power users to access advanced capabilities.

Architecture Principles That Emerged

Progressive Enhancement: Start with simple sequential execution and add advanced features as business needs evolve. Each enhancement should be opt-in and backward compatible.

Configuration Validation: Invalid configurations cause production outages. Comprehensive validation with clear error messages prevents most configuration-related incidents.

Error Isolation: Failures in one stage or rule should not cascade to unrelated parts of the system. Bulkhead patterns and circuit breakers are essential for resilience.
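
A minimal circuit-breaker sketch, with illustrative thresholds and not part of the engine above, could sit alongside the orchestrator's callback and conditional-stage hooks:

import time
from typing import Optional

class StageCircuitBreaker:
    """Illustrative circuit breaker: after enough consecutive stage failures,
    short-circuit further attempts until a cool-down period has passed."""

    def __init__(self, failure_threshold: int = 5, reset_after_seconds: float = 60.0):
        self.failure_threshold = failure_threshold
        self.reset_after_seconds = reset_after_seconds
        self.consecutive_failures = 0
        self.opened_at: Optional[float] = None

    def allow_execution(self) -> bool:
        if self.opened_at is None:
            return True
        if time.time() - self.opened_at >= self.reset_after_seconds:
            # Half-open: let a single attempt through after the cool-down.
            self.opened_at = None
            self.consecutive_failures = 0
            return True
        return False

    def record_result(self, success: bool) -> None:
        if success:
            self.consecutive_failures = 0
            self.opened_at = None
        else:
            self.consecutive_failures += 1
            if self.consecutive_failures >= self.failure_threshold:
                self.opened_at = time.time()   # open the circuit

Wired into the orchestrator, record_result becomes a stage callback and allow_execution the condition of a ConditionalStageConfig, so failing dependencies stop being hammered without any change to the rules themselves.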

Business Logic Transparency: Non-technical stakeholders must be able to understand and verify business rules. Configuration-driven approaches make business logic accessible to domain experts.

Operational Excellence: Production systems need more than just functional correctness - they need comprehensive monitoring, debugging capabilities, and operational tooling.

Future Evolution

The architecture we’ve built provides a foundation for advanced capabilities:

Machine Learning Integration: Rule parameters can be learned from historical data rather than manually configured. Success rates, timeout values, and risk thresholds can be continuously optimized.

Real-Time Configuration Updates: Hot-swapping rule configurations without system restarts enables rapid response to changing business conditions or regulatory requirements.
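
A rough sketch of hash-based hot reloading on top of the ConfigurationManager above (the polling interval is arbitrary, and a production system would also drain in-flight work before swapping):

import threading
import time

def watch_configuration(config_manager: ConfigurationManager,
                        pipeline_manager: ProductionPipelineManager,
                        poll_seconds: float = 30.0) -> threading.Thread:
    """Poll for configuration changes and rebuild the pipeline when the hash changes."""

    def _watch():
        last_hash = config_manager.config_hash
        while True:
            time.sleep(poll_seconds)
            config_manager.load_configuration(pipeline_manager.environment)
            if config_manager.config_hash != last_hash:
                last_hash = config_manager.config_hash
                pipeline_manager._initialize_pipeline()   # swap in the new orchestrator

    watcher = threading.Thread(target=_watch, daemon=True)
    watcher.start()
    return watcher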

A/B Testing Framework: Different customer segments can be processed with different rule configurations to optimize business outcomes while maintaining compliance.
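
For example, applications could be routed deterministically to a configuration variant; the 10% split and variant names below are hypothetical:

import hashlib

def select_config_variant(application_id: str,
                          control_config: Dict[str, Any],
                          experiment_config: Dict[str, Any],
                          experiment_share: float = 0.10) -> Dict[str, Any]:
    """Assign an application to a configuration variant deterministically,
    so the same application always lands in the same bucket and stays auditable."""
    bucket = int(hashlib.sha256(application_id.encode()).hexdigest(), 16) % 100
    return experiment_config if bucket < experiment_share * 100 else control_config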

Distributed Execution: Rules can be distributed across multiple services or cloud regions for improved performance and resilience.

Rule Recommendation Engine: Analysis of rule execution patterns can suggest configuration optimizations or identify redundant rules.

Broader Applications

This pattern extends far beyond data validation:

Content Moderation Pipelines: Social media platforms can use staged rule engines to detect and handle problematic content with appropriate escalation paths.

Fraud Detection Systems: Financial institutions can implement sophisticated fraud detection with preprocessing (data enrichment), detection (rule evaluation), and response (account actions) stages.

Supply Chain Automation: Manufacturing companies can automate quality control, compliance checking, and logistics optimization through configurable rule pipelines.

Healthcare Workflows: Medical systems can implement patient care pathways with condition-specific rules, treatment protocols, and outcome monitoring.

Regulatory Compliance: Any regulated industry can benefit from explicit, auditable, and configurable compliance rule engines.

Implementation Recommendations

For organizations adopting this pattern:

Start Small: Begin with a single complex validation function and extract rules incrementally. Don’t attempt to build the complete system at once.

Invest in Configuration Tooling: Business users need intuitive interfaces for managing rule configurations. Command-line tools and JSON files are sufficient for technical teams but limit business involvement.

Plan for Scale: Design your rule registry and dependency resolution systems to handle hundreds or thousands of rules. Performance characteristics change significantly at scale.

Embrace Testing: Configuration-driven systems require comprehensive test coverage of both individual rules and complete pipeline scenarios. Invest in testing infrastructure early.
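
A couple of pytest-style checks over the configuration and a single stage go a long way; the sketch below assumes the classes and helpers defined earlier in this post.

def test_every_environment_produces_a_valid_configuration():
    manager = ConfigurationManager()
    for environment in ("development", "staging", "production"):
        config = manager.load_configuration(environment)   # raises on validation errors
        assert "stages" in config and "execution_settings" in config

def test_data_quality_stage_rejects_incomplete_records():
    container = ServiceContainer()
    setup_customer_services(container)
    executor = ConfigurablePipelineExecutor(CUSTOMER_ONBOARDING_CONFIG, container)

    result = executor.execute_single_stage(
        ExecutionStage.DATA_QUALITY,
        {"email": "not-an-email"},          # deliberately incomplete record
        metadata={"test": True}
    )
    assert not result.success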

Monitor Everything: Production systems need metrics at multiple levels - individual rules, stages, complete pipelines, and business outcomes. Design monitoring requirements alongside functional requirements.


The journey from scattered validation logic to enterprise-grade rule orchestration demonstrates how thoughtful application of software architecture principles can transform business-critical systems. The key insight is that business rules deserve the same engineering rigor we apply to other system components - proper abstraction, dependency management, configuration management, and operational excellence.

Ready to implement configuration-driven rule orchestration in your organization? Start with the stage abstraction pattern and build incrementally. The modular approach we’ve demonstrated works in production environments and provides immediate value while building toward comprehensive rule management capabilities.

Code file

Download the complete code for this rule engine implementation

The next evolution in this series will explore Conditional Rule Orchestration - stay tuned for Part 3!

This post is licensed under CC BY 4.0 by the author.