From Control to Action Part 4: Building Dynamic Rule Execution APIs
Continuing our journey from flexible validation to agile, on-demand rule execution, powered by API endpoints
Table of Contents
- Introduction: From Pipeline to Precision
- The Challenge: Dynamic, Ad Hoc Rule Execution
- Design Patterns: Ad Hoc Rule Execution as a First-Class Feature
- API Architecture: Professional-Grade Dynamic Endpoints
- Implementation: The Dependency Resolution Engine
- Advanced Features: Rule Grouping and Batch Operations
- Security and Authorization: Enterprise-Grade Access Control
- Real-World Implementation: Multi-Tenant Rule Execution
- Performance Optimization: Caching and Parallel Execution
- Production Monitoring: Observability and Metrics
- Conclusion: From Static Pipelines to Dynamic Platforms
Introduction: From Pipeline to Precision
Our previous articles transformed chaotic validation logic into maintainable, scalable, and configuration-driven rule engines[1][2][3]. We mastered dependency resolution, staged orchestration, and conditional execution management. Yet production environments revealed a new frontier: users and systems don’t always want to run the entire pipeline.
Consider these real-world scenarios that traditional rule engines struggle to address:
- UI Validation: A registration form needs to validate just email format and phone number—not the full customer onboarding pipeline
- Debugging Workflows: Engineers need to re-run specific compliance rules on previously processed data to investigate failures
- A/B Testing: Product teams want to test new business rules on a subset of data without deploying entire pipeline changes
- External Integration: Partner systems need targeted validation capabilities exposed through clean API contracts
- Incident Response: Operations teams require surgical rule execution during production incidents
Today, we’ll architect a solution that transforms our rule engine from a monolithic processor into a dynamic, API-driven validation platform—preserving all architectural benefits while enabling unprecedented operational flexibility.
The Challenge: Dynamic, Ad Hoc Rule Execution
Traditional rule engines excel at predetermined workflows but fail when flexibility becomes paramount. Consider what happens when you try to execute specific rules from our established pipeline:
# This is what we DON'T want - rigid, all-or-nothing execution
def validate_customer_subset(customer_data, specific_rules):
    executor = RuleExecutor()
    # Problem 1: Must register ALL rules even if we only want a few
    executor.register_rule(EmailRequiredRule("email_required", "email"))
    executor.register_rule(EmailFormatRule("email_format", "email"))
    executor.register_rule(PhoneRequiredRule("phone_required", "phone"))
    executor.register_rule(PhoneFormatRule("phone_format", "phone"))
    executor.register_rule(AddressValidationRule("address_validation", "address"))
    # Problem 2: No way to execute only specific rules
    # Problem 3: Dependencies aren't automatically resolved
    # Problem 4: No audit trail for partial execution
    return executor.execute_all(customer_data)  # Runs everything!
The Fundamental Problems
Dependency Blindness: Requesting email_format without email_required produces confusing failures, but manually tracking dependencies is error-prone.
All-or-Nothing Execution: Existing patterns force complete pipeline execution even when only specific validations are needed.
No Execution Context: Traditional engines can’t distinguish between full pipeline runs and targeted validation requests.
Limited Observability: Partial executions lack proper audit trails, making debugging and compliance tracking impossible.
Integration Complexity: External systems can’t easily consume specific validation logic without understanding internal pipeline structure.
The solution requires dependency-aware, granular execution with comprehensive observability—transforming our rule engine into a true validation platform.
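To make the goal concrete, here is the call shape we are working toward. This is an illustrative sketch only: execute_selected is a hypothetical entry point, not yet part of our engine, but it captures the contract the rest of this article builds.

# Illustrative target interface (hypothetical method name): the caller names
# only the rules it cares about; the engine resolves prerequisites, executes
# in dependency order, and reports what actually ran.
result = executor.execute_selected(
    rule_ids=["email_format"],            # caller requests one rule
    data={"email": "alice@example.com"},
)
assert "email_required" in result.resolved_rules  # prerequisite auto-included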
Design Patterns: Ad Hoc Rule Execution as a First-Class Feature
The breakthrough insight is treating selective rule execution as a primary use case, not an afterthought. This requires several architectural enhancements:
Pattern 1: Recursive Dependency Resolution
When users request specific rules, the system must automatically discover and include all prerequisites:
class DependencyResolver:
    """Advanced dependency resolution with transitive closure"""

    def resolve_execution_graph(self, requested_rules: List[str],
                                rule_registry: Dict[str, Rule]) -> ExecutionGraph:
        """Build complete execution graph from partial rule requests"""
        # Phase 1: Recursive dependency discovery
        required_rules = set()
        self._discover_dependencies(requested_rules, rule_registry, required_rules)
        # Phase 2: Topological sorting with cycle detection
        execution_order = self._topological_sort(required_rules, rule_registry)
        # Phase 3: Execution graph construction
        return ExecutionGraph(
            requested=requested_rules,
            resolved=list(required_rules),
            execution_order=execution_order,
            dependency_map=self._build_dependency_map(required_rules, rule_registry)
        )

    def _discover_dependencies(self, rule_ids: List[str],
                               registry: Dict[str, Rule],
                               discovered: Set[str]):
        """Recursively discover all rule dependencies"""
        for rule_id in rule_ids:
            if rule_id in discovered:
                continue
            if rule_id not in registry:
                raise UnknownRuleError(f"Rule '{rule_id}' not found in registry")
            discovered.add(rule_id)
            rule = registry[rule_id]
            # Handle both direct and conditional dependencies
            # (copy the list so we never mutate the rule's own dependency list)
            all_deps = list(getattr(rule, 'dependencies', []))
            conditional_deps = getattr(rule, 'conditional_dependencies', {})
            all_deps.extend(conditional_deps.keys())
            if all_deps:
                self._discover_dependencies(all_deps, registry, discovered)
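The discovery phase alone is easy to exercise. A minimal sketch, assuming the Rule and ExecutionGraph types from the series are importable and using a hypothetical SimpleRule stand-in (the _topological_sort and _build_dependency_map helpers are fleshed out in the implementation section below):

from dataclasses import dataclass, field
from typing import List

@dataclass
class SimpleRule:  # hypothetical stand-in for the series' Rule base class
    rule_id: str
    dependencies: List[str] = field(default_factory=list)

registry = {
    "email_required": SimpleRule("email_required"),
    "email_format": SimpleRule("email_format", ["email_required"]),
}
discovered: set = set()
DependencyResolver()._discover_dependencies(["email_format"], registry, discovered)
print(discovered)  # {'email_format', 'email_required'}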
Pattern 2: Execution Context Preservation
Each API request creates an isolated execution context that preserves request metadata throughout rule execution:
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, List
import time

@dataclass
class ApiExecutionContext:
    """Enhanced execution context for API-driven rule execution"""
    # Request identification
    request_id: str
    execution_timestamp: datetime
    # Rule selection
    requested_rules: List[str]
    resolved_rules: List[str]
    execution_order: List[str]
    # Input data and options
    input_data: Dict[str, Any]
    execution_options: ExecutionOptions
    # Runtime state
    rule_results: Dict[str, RuleResult] = field(default_factory=dict)
    execution_metadata: Dict[str, Any] = field(default_factory=dict)
    # Performance tracking
    start_time: float = field(default_factory=time.time)
    rule_timings: Dict[str, float] = field(default_factory=dict)
Pattern 3: Smart Execution Control
The API supports sophisticated execution control that maintains dependency safety:
class ExecutionOptions:
    """Configuration options for API-driven rule execution"""

    def __init__(self):
        self.fail_fast: bool = True
        self.collect_failure_details: bool = True
        self.skip_optional_rules: bool = False
        self.parallel_execution: bool = False
        self.timeout_seconds: Optional[int] = None
        self.audit_level: AuditLevel = AuditLevel.STANDARD
        self.custom_metadata: Dict[str, Any] = {}

    @classmethod
    def from_dict(cls, raw: Dict[str, Any]) -> "ExecutionOptions":
        """Build options from an API payload, keeping defaults for unset keys"""
        options = cls()
        for key, value in (raw or {}).items():
            if hasattr(options, key):
                setattr(options, key, value)
        return options
These patterns ensure that API-driven execution maintains all the safety and observability characteristics of full pipeline execution while enabling unprecedented flexibility.
API Architecture: Professional-Grade Dynamic Endpoints
A production-ready API requires careful interface design that balances simplicity with powerful capabilities:
Core Endpoint Design
POST /api/v1/rules/execute
Content-Type: application/json
Authorization: Bearer <token>

{
  "rule_ids": ["email_format", "phone_format", "address_validation"],
  "data": {
    "email": "alice@example.com",
    "phone": "+1-555-123-4567",
    "address": "123 Main St, Anytown USA"
  },
  "execution_options": {
    "fail_fast": false,
    "collect_failure_details": true,
    "parallel_execution": true,
    "timeout_seconds": 30,
    "audit_level": "detailed"
  },
  "metadata": {
    "request_source": "registration_form",
    "user_id": "user_12345",
    "session_id": "sess_abcdef"
  }
}
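For instance, invoking the endpoint from Python with the requests library looks like this; the base URL and bearer token are placeholders for your own deployment:

import requests

# Hypothetical base URL and token - substitute your deployment's values
response = requests.post(
    "https://rules.example.com/api/v1/rules/execute",
    headers={"Authorization": "Bearer <token>"},
    json={
        "rule_ids": ["email_format", "phone_format"],
        "data": {"email": "alice@example.com", "phone": "+1-555-123-4567"},
        "execution_options": {"fail_fast": False},
    },
    timeout=30,
)
result = response.json()
print(result["execution_summary"]["success"])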
Comprehensive Response Structure
{
  "request_id": "req_20250727_143052_7af8",
  "execution_summary": {
    "success": false,
    "total_rules_executed": 5,
    "requested_rules_count": 3,
    "resolved_rules_count": 6,
    "execution_time_ms": 247
  },
  "rule_execution": {
    "requested": ["email_format", "phone_format", "address_validation"],
    "resolved": ["email_required", "phone_required", "email_format", "phone_format", "address_required", "address_validation"],
    "execution_order": ["email_required", "phone_required", "address_required", "email_format", "phone_format", "address_validation"],
    "results": {
      "email_required": "pass",
      "phone_required": "pass",
      "address_required": "pass",
      "email_format": "pass",
      "phone_format": "fail",
      "address_validation": "skip"
    }
  },
  "validation_details": {
    "messages": {
      "phone_format": "Phone number format invalid for US numbers"
    },
    "failure_details": {
      "phone_format": {
        "error_code": "INVALID_FORMAT",
        "field": "phone",
        "provided_value": "+1-555-123-4567",
        "expected_format": "E.164 international format",
        "suggestions": ["+15551234567"]
      }
    },
    "skip_reasons": {
      "address_validation": "Skipped due to phone_format failure (fail_fast enabled)"
    }
  },
  "performance_metrics": {
    "rule_timings": {
      "email_required": 12,
      "phone_required": 8,
      "address_required": 15,
      "email_format": 45,
      "phone_format": 156,
      "address_validation": 0
    },
    "dependency_resolution_ms": 11,
    "total_execution_ms": 247
  },
  "audit_trail": {
    "execution_timestamp": "2025-07-27T14:30:52.123Z",
    "request_metadata": {
      "request_source": "registration_form",
      "user_id": "user_12345",
      "session_id": "sess_abcdef"
    },
    "rule_registry_version": "v2.1.3",
    "configuration_hash": "sha256:a7b8c9d..."
  }
}
FastAPI Implementation
Here’s a production-oriented FastAPI implementation; the supporting helpers it calls are sketched after the listing:
from datetime import datetime
from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel, Field
from typing import List, Dict, Any, Optional
from uuid import uuid4
import time

app = FastAPI(
    title="Dynamic Rule Engine API",
    description="Enterprise-grade rule execution with dependency resolution",
    version="1.0.0"
)
security = HTTPBearer()

class RuleExecutionRequest(BaseModel):
    """Request model for dynamic rule execution"""
    rule_ids: List[str] = Field(..., description="List of rule IDs to execute")
    data: Dict[str, Any] = Field(..., description="Input data for rule execution")
    execution_options: Optional[Dict[str, Any]] = Field(
        default_factory=dict,
        description="Execution configuration options"
    )
    metadata: Optional[Dict[str, Any]] = Field(
        default_factory=dict,
        description="Request metadata for audit and tracking"
    )

class RuleExecutionResponse(BaseModel):
    """Comprehensive response for rule execution"""
    request_id: str
    execution_summary: Dict[str, Any]
    rule_execution: Dict[str, Any]
    validation_details: Dict[str, Any]
    performance_metrics: Dict[str, Any]
    audit_trail: Dict[str, Any]

@app.post("/api/v1/rules/execute", response_model=RuleExecutionResponse)
async def execute_rules(
    request: RuleExecutionRequest,
    background_tasks: BackgroundTasks,
    credentials: HTTPAuthorizationCredentials = Depends(security)
) -> RuleExecutionResponse:
    """Execute specified rules with full dependency resolution"""
    # Generate unique request ID
    request_id = f"req_{int(time.time())}_{uuid4().hex[:8]}"
    start_time = time.time()
    try:
        # Authenticate and authorize request
        # (HTTPBearer injects credentials; the raw token is .credentials)
        user_context = await authenticate_request(credentials.credentials)
        authorized_rules = await get_user_authorized_rules(user_context)
        # Validate requested rules are authorized
        unauthorized_rules = set(request.rule_ids) - set(authorized_rules)
        if unauthorized_rules:
            raise HTTPException(
                status_code=403,
                detail={
                    "error": "unauthorized_rules",
                    "unauthorized": list(unauthorized_rules),
                    "message": "Access denied to specified rules"
                }
            )
        # Load rule registry and resolve dependencies
        rule_registry = await load_rule_registry()
        resolver = DependencyResolver()
        execution_graph = resolver.resolve_execution_graph(
            request.rule_ids,
            rule_registry
        )
        # Create execution context
        context = ApiExecutionContext(
            request_id=request_id,
            execution_timestamp=datetime.utcnow(),
            requested_rules=request.rule_ids,
            resolved_rules=execution_graph.resolved,
            execution_order=execution_graph.execution_order,
            input_data=request.data,
            execution_options=ExecutionOptions.from_dict(request.execution_options)
        )
        # Execute rules with monitoring
        executor = ApiRuleExecutor()
        execution_result = await executor.execute_with_context(context, rule_registry)
        # Build comprehensive response
        response = RuleExecutionResponse(
            request_id=request_id,
            execution_summary=execution_result.summary,
            rule_execution=execution_result.rule_details,
            validation_details=execution_result.validation_details,
            performance_metrics=execution_result.performance_metrics,
            audit_trail=execution_result.audit_trail
        )
        # Schedule background audit logging
        background_tasks.add_task(
            log_rule_execution_audit,
            request_id,
            user_context,
            request,
            response
        )
        return response
    except HTTPException:
        # Preserve deliberate HTTP errors (401/403) instead of masking as 500
        raise
    except Exception as e:
        # Comprehensive error handling
        error_response = await handle_execution_error(
            request_id,
            request,
            e,
            start_time
        )
        raise HTTPException(status_code=500, detail=error_response)
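The endpoint leans on helpers that every deployment wires up differently: authenticate_request appears in the security section below, while load_rule_registry, log_rule_execution_audit, and handle_execution_error are left to the integrator. Minimal, illustrative stand-ins for local experimentation might look like this (RULE_REGISTRY is an assumed module-level dict):

# Minimal stand-ins - real implementations would hit your rule store,
# logging pipeline, and error-reporting service.
async def load_rule_registry() -> Dict[str, Rule]:
    return RULE_REGISTRY  # e.g., built once at startup from configuration

async def log_rule_execution_audit(request_id, user_context, request, response):
    print(f"[audit] {request_id} rules={request.rule_ids}")

async def handle_execution_error(request_id, request, error, start_time):
    return {
        "request_id": request_id,
        "error": type(error).__name__,
        "message": str(error),
        "elapsed_ms": int((time.time() - start_time) * 1000),
    }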
Implementation: The Dependency Resolution Engine
The heart of dynamic rule execution lies in sophisticated dependency resolution that handles complex scenarios while maintaining performance:
Advanced Dependency Resolution
from collections import defaultdict, deque
from typing import Set, List, Dict
from dataclasses import dataclass

@dataclass
class ExecutionGraph:
    """Complete execution plan for rule dependencies"""
    requested: List[str]
    resolved: List[str]
    execution_order: List[str]
    dependency_map: Dict[str, List[str]]
    conditional_dependencies: Dict[str, Dict[str, str]]

class AdvancedDependencyResolver:
    """Production-grade dependency resolver with conditional logic support"""

    def __init__(self):
        self.resolution_cache = {}  # Cache for performance optimization

    def resolve_execution_order(self, requested_rules: List[str],
                                all_rules: Dict[str, Rule]) -> List[str]:
        """
        Resolve complete execution order with transitive dependency closure

        This method implements a sophisticated algorithm that:
        1. Discovers all transitive dependencies
        2. Handles conditional dependencies
        3. Performs topological sorting
        4. Detects and reports circular dependencies
        5. Optimizes for repeated requests via caching
        """
        # Check cache for performance optimization
        cache_key = tuple(sorted(requested_rules))
        if cache_key in self.resolution_cache:
            return self.resolution_cache[cache_key]
        # Phase 1: Transitive dependency discovery
        required_rules = set()
        self._discover_transitive_dependencies(
            requested_rules,
            all_rules,
            required_rules
        )
        # Phase 2: Build dependency graph
        graph = defaultdict(list)
        in_degree = defaultdict(int)
        self._build_dependency_graph(
            required_rules,
            all_rules,
            graph,
            in_degree
        )
        # Phase 3: Topological sort with cycle detection
        execution_order = self._topological_sort_with_validation(
            required_rules,
            graph,
            in_degree
        )
        # Cache result for future requests
        self.resolution_cache[cache_key] = execution_order
        return execution_order

    def _discover_transitive_dependencies(self, rule_ids: List[str],
                                          registry: Dict[str, Rule],
                                          discovered: Set[str]):
        """Recursively discover all rule dependencies"""
        for rule_id in rule_ids:
            if rule_id in discovered:
                continue
            if rule_id not in registry:
                raise UnknownRuleError(
                    f"Rule '{rule_id}' not found in registry",
                    available_rules=list(registry.keys())
                )
            discovered.add(rule_id)
            rule = registry[rule_id]
            # Collect all dependency types
            dependencies = []
            # Direct dependencies
            if hasattr(rule, 'dependencies'):
                dependencies.extend(rule.dependencies)
            # Conditional dependencies (from our conditional execution article)
            if hasattr(rule, 'conditional_dependencies'):
                dependencies.extend(rule.conditional_dependencies.keys())
            # Cross-stage dependencies (from our configuration article)
            if hasattr(rule, 'cross_stage_dependencies'):
                dependencies.extend(rule.cross_stage_dependencies)
            # Recurse for transitive closure
            if dependencies:
                self._discover_transitive_dependencies(
                    dependencies,
                    registry,
                    discovered
                )

    def _build_dependency_graph(self, required_rules: Set[str],
                                registry: Dict[str, Rule],
                                graph: Dict[str, List[str]],
                                in_degree: Dict[str, int]):
        """Build directed acyclic graph for topological sorting"""
        # Initialize in-degree for all rules
        for rule_id in required_rules:
            in_degree[rule_id] = 0
        # Build edges and calculate in-degrees
        for rule_id in required_rules:
            rule = registry[rule_id]
            dependencies = []
            # Collect all dependencies
            if hasattr(rule, 'dependencies'):
                dependencies.extend(rule.dependencies)
            if hasattr(rule, 'conditional_dependencies'):
                dependencies.extend(rule.conditional_dependencies.keys())
            if hasattr(rule, 'cross_stage_dependencies'):
                dependencies.extend(rule.cross_stage_dependencies)
            for dep in dependencies:
                if dep in required_rules:  # Only include resolved dependencies
                    graph[dep].append(rule_id)
                    in_degree[rule_id] += 1

    def _topological_sort_with_validation(self, required_rules: Set[str],
                                          graph: Dict[str, List[str]],
                                          in_degree: Dict[str, int]) -> List[str]:
        """Perform topological sort with comprehensive validation"""
        # Initialize queue with rules having no dependencies
        queue = deque([
            rule_id for rule_id in in_degree
            if in_degree[rule_id] == 0
        ])
        execution_order = []
        processed_count = 0
        while queue:
            current_rule = queue.popleft()
            # Only include rules that were actually requested (transitively)
            if current_rule in required_rules:
                execution_order.append(current_rule)
                processed_count += 1
            # Update dependent rules
            for dependent_rule in graph[current_rule]:
                in_degree[dependent_rule] -= 1
                if in_degree[dependent_rule] == 0:
                    queue.append(dependent_rule)
        # Detect circular dependencies: a cycle leaves some rules unprocessed
        if processed_count < len(required_rules):
            unprocessed = sorted(required_rules - set(execution_order))
            raise CircularDependencyError(
                f"Circular dependency detected among rules: {unprocessed}",
                affected_rules=unprocessed
            )
        return execution_order

    def build_execution_graph(self, requested_rules: List[str],
                              registry: Dict[str, Rule]) -> ExecutionGraph:
        """Build complete execution graph with metadata"""
        execution_order = self.resolve_execution_order(requested_rules, registry)
        resolved_rules = list(set(execution_order))
        # Build dependency map for response
        dependency_map = {}
        conditional_deps = {}
        for rule_id in resolved_rules:
            rule = registry[rule_id]
            dependency_map[rule_id] = getattr(rule, 'dependencies', [])
            if hasattr(rule, 'conditional_dependencies'):
                conditional_deps[rule_id] = rule.conditional_dependencies
        return ExecutionGraph(
            requested=requested_rules,
            resolved=resolved_rules,
            execution_order=execution_order,
            dependency_map=dependency_map,
            conditional_dependencies=conditional_deps
        )

# Custom exceptions for better error handling
class UnknownRuleError(Exception):
    """Raised when requested rule is not found in registry"""
    def __init__(self, message: str, available_rules: List[str] = None):
        super().__init__(message)
        self.available_rules = available_rules or []

class CircularDependencyError(Exception):
    """Raised when circular dependencies are detected"""
    def __init__(self, message: str, affected_rules: List[str] = None):
        super().__init__(message)
        self.affected_rules = affected_rules or []
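A quick smoke test, reusing the hypothetical SimpleRule stand-in from earlier, shows both the happy path and cycle detection:

registry = {
    "email_required": SimpleRule("email_required"),
    "email_format": SimpleRule("email_format", ["email_required"]),
    "phone_required": SimpleRule("phone_required"),
    "phone_format": SimpleRule("phone_format", ["phone_required"]),
}
resolver = AdvancedDependencyResolver()
graph = resolver.build_execution_graph(["email_format", "phone_format"], registry)
print(graph.execution_order)
# e.g. ['email_required', 'phone_required', 'email_format', 'phone_format']

# A cycle is reported instead of looping forever
registry["a"] = SimpleRule("a", ["b"])
registry["b"] = SimpleRule("b", ["a"])
try:
    resolver.build_execution_graph(["a"], registry)
except CircularDependencyError as e:
    print(e.affected_rules)  # ['a', 'b']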
API Rule Executor
The executor manages rule execution within the API context while preserving all safety guarantees:
class ApiRuleExecutor:
    """Specialized executor for API-driven rule execution"""

    def __init__(self):
        self.metrics_collector = ExecutionMetricsCollector()

    async def execute_with_context(self, context: ApiExecutionContext,
                                   rule_registry: Dict[str, Rule]) -> ExecutionResult:
        """Execute rules within API context with comprehensive monitoring"""
        execution_start = time.time()
        results = {}
        messages = {}
        failure_details = {}
        skip_reasons = {}
        rule_timings = {}
        try:
            for rule_id in context.execution_order:
                rule_start = time.time()
                rule = rule_registry[rule_id]
                # Check if rule can execute based on dependencies
                if self._can_execute_rule(rule, results, context):
                    try:
                        # Execute with timeout if specified
                        if context.execution_options.timeout_seconds:
                            result = await self._execute_with_timeout(
                                rule,
                                context.input_data,
                                context.execution_options.timeout_seconds
                            )
                        else:
                            result = rule.execute(context.input_data)
                        # Process result
                        results[rule_id] = result.result.value
                        if result.message:
                            messages[rule_id] = result.message
                        if (result.result.value == "fail" and
                                hasattr(result, 'failure_data') and
                                result.failure_data):
                            failure_details[rule_id] = result.failure_data
                        # Handle fail-fast behavior
                        if (context.execution_options.fail_fast and
                                result.result.value == "fail"):
                            # Mark remaining rules as skipped
                            remaining_rules = context.execution_order[
                                context.execution_order.index(rule_id) + 1:
                            ]
                            for remaining_rule in remaining_rules:
                                results[remaining_rule] = "skip"
                                skip_reasons[remaining_rule] = (
                                    f"Skipped due to {rule_id} failure (fail_fast enabled)"
                                )
                            break
                    except Exception as e:
                        results[rule_id] = "fail"
                        messages[rule_id] = f"Execution error: {str(e)}"
                        if context.execution_options.collect_failure_details:
                            failure_details[rule_id] = {
                                "error": "execution_exception",
                                "exception_type": type(e).__name__,
                                "exception_message": str(e)
                            }
                else:
                    results[rule_id] = "skip"
                    skip_reasons[rule_id] = "Dependencies not satisfied"
                # Record timing
                rule_timings[rule_id] = int((time.time() - rule_start) * 1000)
            # Build execution result
            total_time = int((time.time() - execution_start) * 1000)
            return ExecutionResult(
                success=all(r in ["pass", "skip"] for r in results.values()),
                summary={
                    "success": all(r in ["pass", "skip"] for r in results.values()),
                    "total_rules_executed": len([r for r in results.values() if r != "skip"]),
                    "requested_rules_count": len(context.requested_rules),
                    "resolved_rules_count": len(context.resolved_rules),
                    "execution_time_ms": total_time
                },
                rule_details={
                    "requested": context.requested_rules,
                    "resolved": context.resolved_rules,
                    "execution_order": context.execution_order,
                    "results": results
                },
                validation_details={
                    "messages": messages,
                    "failure_details": failure_details,
                    "skip_reasons": skip_reasons
                },
                performance_metrics={
                    "rule_timings": rule_timings,
                    "dependency_resolution_ms": 0,  # Would be calculated
                    "total_execution_ms": total_time
                },
                audit_trail={
                    "execution_timestamp": context.execution_timestamp.isoformat(),
                    "request_metadata": context.execution_options.custom_metadata,
                    "rule_registry_version": "v2.1.3",  # Would be dynamic
                    "configuration_hash": "sha256:..."  # Would be calculated
                }
            )
        except Exception as e:
            # Handle unexpected execution errors
            return self._create_error_result(context, e, execution_start)
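The executor defers to a _can_execute_rule helper that is not shown above. A minimal sketch, assuming the dependency convention from earlier in the series (a rule may run only once every declared dependency has passed):

    # Sketch of the dependency gate used above (method on ApiRuleExecutor).
    # Assumed semantics: skip unless every dependency already passed.
    def _can_execute_rule(self, rule, results, context) -> bool:
        for dep in getattr(rule, 'dependencies', []):
            if results.get(dep) != "pass":
                return False
        return True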
Advanced Features: Rule Grouping and Batch Operations
Production systems need capabilities beyond individual rule execution. Let’s implement rule grouping and batch processing:
Rule Group Management
class RuleGroupManager:
    """Manages logical groupings of rules for easier API consumption"""

    def __init__(self):
        self.groups = {}
        self.group_metadata = {}

    def register_group(self, group_name: str, rule_ids: List[str],
                       description: str = "", metadata: Dict[str, Any] = None):
        """Register a named group of rules"""
        self.groups[group_name] = rule_ids
        self.group_metadata[group_name] = {
            "description": description,
            "rule_count": len(rule_ids),
            "created_at": datetime.utcnow().isoformat(),
            "metadata": metadata or {}
        }

    def resolve_group_rules(self, group_names: List[str]) -> List[str]:
        """Resolve group names to individual rule IDs"""
        rule_ids = []
        for group_name in group_names:
            if group_name not in self.groups:
                raise UnknownRuleGroupError(
                    f"Rule group '{group_name}' not found",
                    available_groups=list(self.groups.keys())
                )
            rule_ids.extend(self.groups[group_name])
        # Remove duplicates while preserving order
        return list(dict.fromkeys(rule_ids))

# Enhanced API endpoint for group execution
@app.post("/api/v1/rules/execute-groups")
async def execute_rule_groups(
    request: RuleGroupExecutionRequest,
    background_tasks: BackgroundTasks,
    credentials: HTTPAuthorizationCredentials = Depends(security)
) -> RuleExecutionResponse:
    """Execute predefined rule groups"""
    group_manager = await get_rule_group_manager()
    # Resolve groups to individual rules
    resolved_rule_ids = group_manager.resolve_group_rules(request.group_names)
    # Create standard execution request
    execution_request = RuleExecutionRequest(
        rule_ids=resolved_rule_ids,
        data=request.data,
        execution_options=request.execution_options,
        metadata={
            **request.metadata,
            "execution_type": "group_execution",
            "requested_groups": request.group_names
        }
    )
    # Delegate to standard execution logic
    return await execute_rules(execution_request, background_tasks, credentials)
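Registering groups is a one-time setup step. For the registration-form scenario it might look like this (group names are illustrative):

group_manager = RuleGroupManager()
group_manager.register_group(
    "contact_info",
    ["email_required", "email_format", "phone_required", "phone_format"],
    description="Contact fields for the registration form"
)
group_manager.register_group(
    "address_checks",
    ["address_required", "address_validation"],
    description="Postal address validation"
)
print(group_manager.resolve_group_rules(["contact_info", "address_checks"]))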
Batch Processing Capabilities
import asyncio

class BatchRuleExecutor:
    """Handles batch execution of rules across multiple datasets"""

    def __init__(self):
        self.max_batch_size = 100
        self.parallel_limit = 10

    async def execute_batch(self, batch_request: BatchExecutionRequest) -> BatchExecutionResponse:
        """Execute rules across multiple datasets efficiently"""
        if len(batch_request.datasets) > self.max_batch_size:
            raise HTTPException(
                status_code=400,
                detail=f"Batch size exceeds maximum of {self.max_batch_size}"
            )
        # Process datasets in parallel with concurrency limit
        semaphore = asyncio.Semaphore(self.parallel_limit)
        tasks = []
        for i, dataset in enumerate(batch_request.datasets):
            task = self._execute_single_dataset(
                semaphore,
                batch_request.rule_ids,
                dataset,
                batch_request.execution_options,
                f"batch_item_{i}"
            )
            tasks.append(task)
        # Wait for all executions to complete
        results = await asyncio.gather(*tasks, return_exceptions=True)
        # Process results and build response
        successful_results = []
        failed_results = []
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                failed_results.append({
                    "dataset_index": i,
                    "error": str(result),
                    "error_type": type(result).__name__
                })
            else:
                successful_results.append({
                    "dataset_index": i,
                    "result": result
                })
        return BatchExecutionResponse(
            batch_id=f"batch_{int(time.time())}_{uuid4().hex[:8]}",
            total_datasets=len(batch_request.datasets),
            successful_count=len(successful_results),
            failed_count=len(failed_results),
            results=successful_results,
            failures=failed_results
        )

# Batch execution endpoint
@app.post("/api/v1/rules/execute-batch")
async def execute_batch_rules(
    request: BatchExecutionRequest,
    background_tasks: BackgroundTasks,
    credentials: HTTPAuthorizationCredentials = Depends(security)
) -> BatchExecutionResponse:
    """Execute rules across multiple datasets"""
    user_context = await authenticate_request(credentials.credentials)
    batch_executor = BatchRuleExecutor()
    result = await batch_executor.execute_batch(request)
    # Log batch execution for monitoring
    background_tasks.add_task(
        log_batch_execution,
        result.batch_id,
        user_context,
        request,
        result
    )
    return result
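The _execute_single_dataset worker is omitted above. A plausible sketch, assuming it reuses the resolver, context, and executor from earlier sections:

    # Sketch of the per-dataset worker (method on BatchRuleExecutor).
    async def _execute_single_dataset(self, semaphore, rule_ids, dataset,
                                      execution_options, item_id):
        async with semaphore:  # enforce the parallel_limit
            registry = await load_rule_registry()
            graph = AdvancedDependencyResolver().build_execution_graph(rule_ids, registry)
            context = ApiExecutionContext(
                request_id=item_id,
                execution_timestamp=datetime.utcnow(),
                requested_rules=rule_ids,
                resolved_rules=graph.resolved,
                execution_order=graph.execution_order,
                input_data=dataset,
                execution_options=ExecutionOptions.from_dict(execution_options),
            )
            return await ApiRuleExecutor().execute_with_context(context, registry)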
Security and Authorization: Enterprise-Grade Access Control
Production APIs require comprehensive security measures to protect business logic and sensitive data:
Role-Based Rule Access Control
from enum import Enum
from typing import Set
import jwt  # PyJWT, used below for token validation

class RuleAccessLevel(Enum):
    PUBLIC = "public"          # Available to all authenticated users
    INTERNAL = "internal"      # Available to internal users only
    RESTRICTED = "restricted"  # Requires special permissions
    ADMIN = "admin"            # Admin-only rules

class RuleSecurityManager:
    """Manages rule-level security and access control"""

    def __init__(self):
        self.rule_permissions = {}
        self.user_roles = {}
        self.role_permissions = {
            "guest": {RuleAccessLevel.PUBLIC},
            "user": {RuleAccessLevel.PUBLIC, RuleAccessLevel.INTERNAL},
            "admin": {RuleAccessLevel.PUBLIC, RuleAccessLevel.INTERNAL,
                      RuleAccessLevel.RESTRICTED, RuleAccessLevel.ADMIN}
        }

    def register_rule_security(self, rule_id: str, access_level: RuleAccessLevel,
                               required_permissions: Set[str] = None):
        """Register security settings for a rule"""
        self.rule_permissions[rule_id] = {
            "access_level": access_level,
            "required_permissions": required_permissions or set()
        }

    def check_rule_access(self, user_context: UserContext,
                          rule_ids: List[str]) -> Dict[str, bool]:
        """Check if user has access to specified rules"""
        access_results = {}
        user_role = self.user_roles.get(user_context.user_id, "guest")
        user_permissions = self.role_permissions.get(user_role, set())
        for rule_id in rule_ids:
            if rule_id not in self.rule_permissions:
                # Default to internal access level
                access_results[rule_id] = RuleAccessLevel.INTERNAL in user_permissions
                continue
            rule_security = self.rule_permissions[rule_id]
            access_level = rule_security["access_level"]
            required_perms = rule_security["required_permissions"]
            # Check access level
            has_level_access = access_level in user_permissions
            # Check specific permissions
            has_specific_perms = required_perms.issubset(user_context.permissions)
            access_results[rule_id] = has_level_access and has_specific_perms
        return access_results

    def filter_authorized_rules(self, user_context: UserContext,
                                rule_ids: List[str]) -> List[str]:
        """Filter rule list to only include authorized rules"""
        access_results = self.check_rule_access(user_context, rule_ids)
        return [rule_id for rule_id, has_access in access_results.items()
                if has_access]

# Enhanced authentication and authorization
async def authenticate_request(token: str) -> UserContext:
    """Authenticate API request and return user context"""
    try:
        # Decode and validate JWT token
        payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
        user_id = payload.get("sub")
        if not user_id:
            raise HTTPException(status_code=401, detail="Invalid token")
        # Load user context from database/cache
        user_context = await load_user_context(user_id)
        if not user_context:
            raise HTTPException(status_code=401, detail="User not found")
        return user_context
    except jwt.InvalidTokenError:
        raise HTTPException(status_code=401, detail="Invalid token")

async def get_user_authorized_rules(user_context: UserContext) -> List[str]:
    """Get list of rules user is authorized to execute"""
    security_manager = await get_security_manager()
    rule_registry = await load_rule_registry()
    all_rule_ids = list(rule_registry.keys())
    authorized_rules = security_manager.filter_authorized_rules(
        user_context,
        all_rule_ids
    )
    return authorized_rules
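Declaring a rule's sensitivity is then a single registration call. For example, reusing rule names from earlier sections (the permission string is illustrative):

security_manager = RuleSecurityManager()
security_manager.register_rule_security("email_format", RuleAccessLevel.PUBLIC)
security_manager.register_rule_security(
    "sanctions_list_screening",
    RuleAccessLevel.RESTRICTED,
    required_permissions={"kyc:screen"}  # hypothetical permission name
)
# A guest may now execute email_format but not sanctions_list_screening;
# check_rule_access returns {rule_id: bool} given a UserContext.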
API Rate Limiting and Quotas
from collections import defaultdict
from fastapi import Request
from fastapi.responses import JSONResponse
import time

class RateLimiter:
    """Token bucket rate limiter for API endpoints"""

    def __init__(self):
        self.buckets = defaultdict(lambda: {
            "tokens": 100,  # Default tokens
            "last_refill": time.time()
        })
        self.user_limits = {}  # user_id -> (requests_per_minute, burst_limit)

    async def check_rate_limit(self, user_id: str) -> bool:
        """Check if request is within rate limits"""
        limits = self.user_limits.get(user_id, (60, 100))  # Default: 60 req/min, burst 100
        requests_per_minute, burst_limit = limits
        bucket = self.buckets[user_id]
        current_time = time.time()
        # Refill tokens based on elapsed time
        time_elapsed = current_time - bucket["last_refill"]
        tokens_to_add = time_elapsed * (requests_per_minute / 60.0)
        bucket["tokens"] = min(burst_limit, bucket["tokens"] + tokens_to_add)
        bucket["last_refill"] = current_time
        # Check if request can be processed
        if bucket["tokens"] >= 1:
            bucket["tokens"] -= 1
            return True
        return False

# Rate limiting middleware
@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    """Apply rate limiting to API requests"""
    if request.url.path.startswith("/api/v1/rules"):
        # Extract user ID from token
        auth_header = request.headers.get("authorization")
        if auth_header:
            try:
                token = auth_header.split(" ")[1]
                payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
                user_id = payload.get("sub")
                if user_id:
                    rate_limiter = get_rate_limiter()
                    if not await rate_limiter.check_rate_limit(user_id):
                        return JSONResponse(
                            status_code=429,
                            content={
                                "error": "rate_limit_exceeded",
                                "message": "Too many requests. Please try again later."
                            }
                        )
            except Exception:
                pass  # Continue without rate limiting if token invalid
    response = await call_next(request)
    return response
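To see the refill math concretely: at 60 requests per minute the bucket gains one token per second, so a client that has drained its burst of 100 can then issue roughly one request per second. Per-user overrides are plain assignments (values below are illustrative):

import asyncio

rate_limiter = RateLimiter()
# Hypothetical override: 120 req/min with a burst ceiling of 200
rate_limiter.user_limits["user_12345"] = (120, 200)
allowed = asyncio.run(rate_limiter.check_rate_limit("user_12345"))
print(allowed)  # True until the bucket is exhausted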
Real-World Implementation: Multi-Tenant Rule Execution
Let’s implement a complete multi-tenant system that demonstrates the API’s power in production scenarios:
Multi-Tenant Architecture
class TenantRuleManager:
    """Manages rule configurations across multiple tenants"""

    def __init__(self):
        self.tenant_configs = {}
        self.tenant_rule_registries = {}

    async def load_tenant_configuration(self, tenant_id: str) -> Dict[str, Any]:
        """Load tenant-specific rule configuration"""
        if tenant_id not in self.tenant_configs:
            # Load from database or configuration service
            config = await load_tenant_config_from_storage(tenant_id)
            self.tenant_configs[tenant_id] = config
        return self.tenant_configs[tenant_id]

    async def get_tenant_rule_registry(self, tenant_id: str) -> Dict[str, Rule]:
        """Get tenant-specific rule registry"""
        if tenant_id not in self.tenant_rule_registries:
            config = await self.load_tenant_configuration(tenant_id)
            registry = await build_rule_registry_from_config(config)
            self.tenant_rule_registries[tenant_id] = registry
        return self.tenant_rule_registries[tenant_id]

# Tenant-aware execution endpoint
@app.post("/api/v1/tenants/{tenant_id}/rules/execute")
async def execute_tenant_rules(
    tenant_id: str,
    request: RuleExecutionRequest,
    background_tasks: BackgroundTasks,
    credentials: HTTPAuthorizationCredentials = Depends(security)
) -> RuleExecutionResponse:
    """Execute rules within a specific tenant context"""
    # Authenticate and authorize tenant access
    user_context = await authenticate_request(credentials.credentials)
    await verify_tenant_access(user_context, tenant_id)
    # Load tenant-specific rule registry
    tenant_manager = get_tenant_manager()
    rule_registry = await tenant_manager.get_tenant_rule_registry(tenant_id)
    # Execute with tenant context
    request_id = f"tenant_{tenant_id}_req_{int(time.time())}_{uuid4().hex[:8]}"
    resolver = AdvancedDependencyResolver()
    execution_graph = resolver.build_execution_graph(
        request.rule_ids,
        rule_registry
    )
    context = ApiExecutionContext(
        request_id=request_id,
        execution_timestamp=datetime.utcnow(),
        requested_rules=request.rule_ids,
        resolved_rules=execution_graph.resolved,
        execution_order=execution_graph.execution_order,
        input_data=request.data,
        execution_options=ExecutionOptions.from_dict(request.execution_options)
    )
    # Add tenant metadata
    context.execution_metadata["tenant_id"] = tenant_id
    context.execution_metadata["tenant_config_version"] = "v1.2.0"  # Dynamic
    executor = ApiRuleExecutor()
    result = await executor.execute_with_context(context, rule_registry)
    # Build tenant-aware response
    response = RuleExecutionResponse(
        request_id=request_id,
        execution_summary=result.summary,
        rule_execution=result.rule_details,
        validation_details=result.validation_details,
        performance_metrics=result.performance_metrics,
        audit_trail={
            **result.audit_trail,
            "tenant_id": tenant_id,
            "tenant_context": True
        }
    )
    # Schedule tenant-specific audit logging
    background_tasks.add_task(
        log_tenant_rule_execution,
        tenant_id,
        user_context,
        request,
        response
    )
    return response
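The verify_tenant_access guard is deployment-specific. A minimal sketch, assuming the user context carries a tenant_ids collection (an attribute we are inventing for illustration):

# Minimal sketch: reject any request whose user is not a member of the tenant.
async def verify_tenant_access(user_context: UserContext, tenant_id: str):
    # tenant_ids is an assumed attribute of UserContext
    if tenant_id not in getattr(user_context, "tenant_ids", set()):
        raise HTTPException(
            status_code=403,
            detail=f"User does not have access to tenant '{tenant_id}'"
        )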
Real-World Usage Examples
# Example 1: E-commerce Product Validation
class EcommerceRuleConfiguration:
    """Real-world configuration for e-commerce product validation"""

    @staticmethod
    def get_product_validation_groups():
        return {
            "basic_info": [
                "product_name_required",
                "product_description_required",
                "product_category_valid",
                "price_format_valid"
            ],
            "inventory": [
                "sku_unique",
                "inventory_positive",
                "supplier_valid"
            ],
            "compliance": [
                "tax_code_valid",
                "shipping_restrictions",
                "regulatory_compliance"
            ],
            "seo_optimization": [
                "meta_title_length",
                "meta_description_length",
                "image_alt_text_present"
            ]
        }

# Example 2: Financial Services KYC
class FinancialServicesRules:
    """Real-world financial services rule configuration"""

    @staticmethod
    def get_kyc_validation_pipeline():
        return {
            "identity_verification": [
                "full_name_required",
                "date_of_birth_valid",
                "government_id_present",
                "address_verification"
            ],
            "risk_assessment": [
                "politically_exposed_person_check",
                "sanctions_list_screening",
                "adverse_media_screening"
            ],
            "compliance": [
                "age_verification",
                "jurisdiction_compliance",
                "source_of_funds_verification"
            ]
        }

# Example 3: Healthcare Data Validation
class HealthcareRuleConfiguration:
    """HIPAA-compliant healthcare data validation rules"""

    @staticmethod
    def get_patient_data_validation():
        return {
            "patient_identity": [
                "patient_id_format",
                "medical_record_number_valid",
                "insurance_number_format"
            ],
            "hipaa_compliance": [
                "phi_encryption_verified",
                "access_audit_trail",
                "minimum_necessary_standard"
            ],
            "clinical_data": [
                "diagnosis_code_valid",
                "medication_interaction_check",
                "allergy_contraindication_check"
            ]
        }
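These domain catalogs plug directly into the RuleGroupManager from earlier, so each vertical exposes its groups through the same execute-groups endpoint:

group_manager = RuleGroupManager()
for name, rule_ids in EcommerceRuleConfiguration.get_product_validation_groups().items():
    group_manager.register_group(name, rule_ids, description=f"E-commerce: {name}")

# Partners can now request e.g. ["basic_info", "compliance"] without knowing
# the individual rule IDs behind them.
print(group_manager.resolve_group_rules(["basic_info", "compliance"]))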
Performance Optimization: Caching and Parallel Execution
Production APIs require sophisticated performance optimization to handle enterprise scale:
Intelligent Caching System
from typing import Optional
import asyncio
import hashlib
import json
import time

class RuleExecutionCache:
    """Multi-level caching for rule execution optimization"""

    def __init__(self):
        self.dependency_cache = {}  # rule_id -> cache-key mapping for invalidation
        self.result_cache = {}      # Cache rule execution results
        self.config_cache = {}      # Cache rule configurations
        self.cache_stats = {
            "hits": 0,
            "misses": 0,
            "invalidations": 0
        }

    def get_cache_key(self, rule_ids: List[str], input_data: Dict[str, Any],
                      options: ExecutionOptions) -> str:
        """Generate deterministic cache key for execution request"""
        # Create normalized representation
        cache_input = {
            "rules": sorted(rule_ids),
            "data": self._normalize_dict(input_data),
            "options": {
                "fail_fast": options.fail_fast,
                "parallel_execution": options.parallel_execution,
                "timeout_seconds": options.timeout_seconds
            }
        }
        # Generate hash
        cache_str = json.dumps(cache_input, sort_keys=True)
        return hashlib.sha256(cache_str.encode()).hexdigest()[:16]

    def get_cached_execution(self, cache_key: str) -> Optional[ExecutionResult]:
        """Retrieve cached execution result if available"""
        if cache_key in self.result_cache:
            cached_result, timestamp = self.result_cache[cache_key]
            # Check if cache entry is still valid (e.g., within 5 minutes)
            if time.time() - timestamp < 300:
                self.cache_stats["hits"] += 1
                return cached_result
        self.cache_stats["misses"] += 1
        return None

    def invalidate_rule(self, rule_id: str):
        """Invalidate cached results that involve a rule"""
        keys_to_remove = []
        # Walk the rule_id -> cache mapping to find affected entries
        for cache_key in self.dependency_cache.get(rule_id, []):
            keys_to_remove.append(cache_key)
        for key in keys_to_remove:
            if key in self.result_cache:
                del self.result_cache[key]
                self.cache_stats["invalidations"] += 1

class ParallelRuleExecutor:
    """Executes independent rules in parallel for improved performance"""

    def __init__(self, max_workers: int = 10):
        self.max_workers = max_workers
        self.execution_semaphore = asyncio.Semaphore(max_workers)

    async def execute_parallel_rules(self, rules: List[Rule],
                                     input_data: Dict[str, Any],
                                     dependency_graph: Dict[str, List[str]]) -> Dict[str, RuleResult]:
        """Execute rules in parallel while respecting dependencies"""
        results = {}
        pending_rules = {rule.rule_id: rule for rule in rules}
        running: Dict[str, asyncio.Task] = {}
        while pending_rules or running:
            # Start every pending rule whose dependencies are satisfied
            for rule_id, rule in list(pending_rules.items()):
                if self._dependencies_satisfied(rule, results):
                    running[rule_id] = asyncio.create_task(
                        self._execute_rule_with_semaphore(rule, input_data)
                    )
                    del pending_rules[rule_id]
            if not running:
                # No rules ready and none executing - possible circular dependency
                raise CircularDependencyError(
                    f"Cannot proceed with rules: {sorted(pending_rules)}"
                )
            # Wait for at least one running rule to complete
            done, _ = await asyncio.wait(
                running.values(),
                return_when=asyncio.FIRST_COMPLETED
            )
            # Process completed rules
            for task in done:
                completed_id = next(
                    rule_id for rule_id, rule_task in running.items()
                    if rule_task is task
                )
                try:
                    results[completed_id] = task.result()
                except Exception:
                    results[completed_id] = RuleResult.FAIL
                finally:
                    del running[completed_id]
        return results

    async def _execute_rule_with_semaphore(self, rule: Rule,
                                           input_data: Dict[str, Any]) -> RuleResult:
        """Execute single rule with concurrency control"""
        async with self.execution_semaphore:
            # Convert synchronous rule execution to async
            loop = asyncio.get_event_loop()
            return await loop.run_in_executor(None, rule.execute, input_data)
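Putting the cache in front of the executor is then a simple read-through pattern. A sketch, reusing the executor and context from earlier sections:

async def execute_with_cache(cache, executor, context, rule_registry,
                             request, options):
    # Read-through caching around the executor from earlier sections
    key = cache.get_cache_key(request.rule_ids, request.data, options)
    cached = cache.get_cached_execution(key)
    if cached is not None:
        return cached  # served from cache, no rule execution at all
    result = await executor.execute_with_context(context, rule_registry)
    cache.result_cache[key] = (result, time.time())
    return result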
Performance Monitoring and Optimization
class PerformanceMonitor:
    """Monitors and optimizes rule execution performance"""

    def __init__(self):
        self.execution_metrics = []
        self.slow_rules = defaultdict(list)
        self.performance_alerts = []

    def record_execution(self, context: ApiExecutionContext,
                         result: ExecutionResult):
        """Record execution metrics for analysis"""
        metrics = {
            "timestamp": context.execution_timestamp,
            "request_id": context.request_id,
            "total_rules": len(context.resolved_rules),
            "execution_time_ms": result.performance_metrics["total_execution_ms"],
            "success": result.success,
            "rule_timings": result.performance_metrics["rule_timings"]
        }
        self.execution_metrics.append(metrics)
        # Identify slow rules
        for rule_id, timing in metrics["rule_timings"].items():
            if timing > 1000:  # Rules taking more than 1 second
                self.slow_rules[rule_id].append({
                    "request_id": context.request_id,
                    "timing_ms": timing,
                    "timestamp": context.execution_timestamp
                })

    def get_performance_insights(self) -> Dict[str, Any]:
        """Generate performance insights and recommendations"""
        if not self.execution_metrics:
            return {"message": "No performance data available"}
        # Calculate averages
        avg_execution_time = sum(
            m["execution_time_ms"] for m in self.execution_metrics
        ) / len(self.execution_metrics)
        success_rate = sum(
            1 for m in self.execution_metrics if m["success"]
        ) / len(self.execution_metrics)
        # Identify optimization opportunities
        recommendations = []
        if avg_execution_time > 500:
            recommendations.append(
                "Consider enabling parallel execution for independent rules"
            )
        if self.slow_rules:
            recommendations.append(
                f"Optimize slow rules: {list(self.slow_rules.keys())}"
            )
        if success_rate < 0.95:
            recommendations.append(
                "Investigate recurring rule failures to improve success rate"
            )
        return {
            "average_execution_time_ms": avg_execution_time,
            "success_rate": success_rate,
            "slow_rules": dict(self.slow_rules),
            "recommendations": recommendations
        }
Production Monitoring: Observability and Metrics
Observability turns the API from a black box into a system operators can trust. Health checks report the status of every critical dependency, while a metrics endpoint feeds external monitoring:
Health Checks and Status Endpoints
class HealthCheckManager:
    """Performs dependency-aware health checks for the API"""

    def __init__(self):
        self.last_health_check = 0.0
        self.health_status = "unknown"

    async def check_health(self) -> Dict[str, Any]:
        """Comprehensive health check"""
        health_data = {
            "status": "healthy",
            "timestamp": datetime.utcnow().isoformat(),
            "version": "1.0.0",
            "dependencies": {}
        }
        # Check rule registry
        try:
            registry = await load_rule_registry()
            health_data["dependencies"]["rule_registry"] = {
                "status": "healthy",
                "rule_count": len(registry)
            }
        except Exception as e:
            health_data["dependencies"]["rule_registry"] = {
                "status": "unhealthy",
                "error": str(e)
            }
            health_data["status"] = "degraded"
        # Check database connectivity
        try:
            await check_database_connection()
            health_data["dependencies"]["database"] = {"status": "healthy"}
        except Exception as e:
            health_data["dependencies"]["database"] = {
                "status": "unhealthy",
                "error": str(e)
            }
            health_data["status"] = "degraded"
        # Check cache connectivity
        try:
            await check_cache_connection()
            health_data["dependencies"]["cache"] = {"status": "healthy"}
        except Exception as e:
            health_data["dependencies"]["cache"] = {
                "status": "unhealthy",
                "error": str(e)
            }
            # Cache is optional - don't mark as degraded
        self.last_health_check = time.time()
        self.health_status = health_data["status"]
        return health_data

# Health check endpoints
@app.get("/health")
async def health_check():
    """Basic health check endpoint"""
    health_manager = get_health_manager()
    health_data = await health_manager.check_health()
    status_code = 200
    if health_data["status"] in ("degraded", "unhealthy"):
        status_code = 503
    return JSONResponse(content=health_data, status_code=status_code)

@app.get("/metrics")
async def get_metrics():
    """Prometheus metrics endpoint"""
    from prometheus_client import generate_latest, CONTENT_TYPE_LATEST
    return Response(
        content=generate_latest(),
        media_type=CONTENT_TYPE_LATEST
    )
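The /metrics endpoint only exposes what you instrument. A small set of prometheus_client collectors wired into the executor might look like this (metric names and labels are illustrative):

from prometheus_client import Counter, Histogram

# Illustrative collectors - names and labels follow your own conventions
RULE_EXECUTIONS = Counter(
    "rule_executions_total",
    "Rule executions by rule ID and outcome",
    ["rule_id", "result"],
)
REQUEST_LATENCY = Histogram(
    "rule_request_duration_seconds",
    "End-to-end latency of /rules/execute requests",
)

# Inside the executor loop, after each rule completes:
# RULE_EXECUTIONS.labels(rule_id=rule_id, result=results[rule_id]).inc()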
Advanced Alerting and Monitoring
class AlertManager:
    """Manages alerts for rule engine API"""

    def __init__(self):
        self.alert_thresholds = {
            "error_rate": 0.05,         # 5% error rate
            "avg_response_time": 2000,  # 2 seconds
            "slow_rules": 5000,         # 5 seconds
            "dependency_failures": 3    # 3 consecutive failures
        }
        self.alert_history = []

    async def check_alerts(self, metrics: Dict[str, Any]):
        """Check metrics against alert thresholds"""
        alerts_triggered = []
        # Check error rate
        if "success_rate" in metrics:
            error_rate = 1 - metrics["success_rate"]
            if error_rate > self.alert_thresholds["error_rate"]:
                alerts_triggered.append({
                    "type": "high_error_rate",
                    "severity": "warning",
                    "message": f"Error rate {error_rate:.1%} exceeds threshold {self.alert_thresholds['error_rate']:.1%}",
                    "metric_value": error_rate,
                    "threshold": self.alert_thresholds["error_rate"]
                })
        # Check response time
        if "average_execution_time_ms" in metrics:
            avg_time = metrics["average_execution_time_ms"]
            if avg_time > self.alert_thresholds["avg_response_time"]:
                alerts_triggered.append({
                    "type": "slow_response_time",
                    "severity": "warning",
                    "message": f"Average response time {avg_time}ms exceeds threshold {self.alert_thresholds['avg_response_time']}ms",
                    "metric_value": avg_time,
                    "threshold": self.alert_thresholds["avg_response_time"]
                })
        # Check for slow rules
        if "slow_rules" in metrics:
            for rule_id, timings in metrics["slow_rules"].items():
                if timings and max(t["timing_ms"] for t in timings) > self.alert_thresholds["slow_rules"]:
                    alerts_triggered.append({
                        "type": "slow_rule",
                        "severity": "info",
                        "message": f"Rule {rule_id} consistently slow",
                        "rule_id": rule_id,
                        "recent_timings": [t["timing_ms"] for t in timings[-5:]]
                    })
        # Send alerts if any triggered
        for alert in alerts_triggered:
            await self.send_alert(alert)
            self.alert_history.append({
                **alert,
                "timestamp": datetime.utcnow().isoformat()
            })

    async def send_alert(self, alert: Dict[str, Any]):
        """Send alert to configured channels"""
        # Implementation would integrate with Slack, PagerDuty, etc.
        print(f"ALERT: {alert['message']}")
Conclusion: From Static Pipelines to Dynamic Platforms
The transformation from static rule pipelines to dynamic, API-driven execution represents a fundamental shift in how we architect business logic systems. Through this journey, we’ve evolved our rule engine from a monolithic processor into a flexible, observable, and enterprise-ready platform.
Architectural Evolution Summary
From Rigid to Flexible: Traditional rule engines forced all-or-nothing execution. Our API-driven approach enables surgical rule execution while maintaining dependency safety through automatic resolution and topological sorting.
From Hidden to Observable: Static pipelines provided limited visibility into business logic execution. Our comprehensive monitoring, structured logging, and performance tracking transform rule engines into fully observable systems.
From Monolithic to Composable: Previous architectures coupled rule definition with execution strategy. Our separation of concerns enables rule reuse across different execution contexts—full pipelines, API requests, batch processing, and debugging workflows.
From Internal to Integrated: Traditional rule engines served single applications. Our RESTful API design enables rule logic to become a shared organizational asset, accessible to UIs, external systems, and operational tools.
Production Benefits Realized
Operational Agility: Operations teams can execute specific rules for troubleshooting without running entire pipelines, reducing incident response time and system impact.
Development Velocity: UI teams can directly consume validation logic through clean APIs, eliminating duplication and ensuring consistency across all touchpoints.
Business Flexibility: Product teams can test new business rules through targeted API calls before committing to full pipeline integration, enabling safer experimentation.
Compliance Excellence: Complete audit trails, granular access control, and dependency tracking provide the transparency required for regulatory compliance.
Scalability Architecture: Multi-tenancy, caching, parallel execution, and performance monitoring ensure the platform scales with organizational growth.
Key Technical Innovations
Recursive Dependency Resolution: Our algorithm automatically discovers transitive dependencies, ensuring users can request any rule subset without understanding internal relationships.
Smart Execution Control: Context-aware execution preserves all safety guarantees while enabling unprecedented flexibility in how rules are triggered and orchestrated.
Enterprise Security: Role-based access control, rate limiting, and audit logging provide production-grade security without sacrificing usability.
Performance Optimization: Intelligent caching, parallel execution, and comprehensive monitoring ensure enterprise-scale performance characteristics.
Future Evolution Pathways
The architecture we’ve built enables several advanced capabilities:
Machine Learning Integration: Rule parameters can be automatically tuned based on execution history and business outcomes, creating self-optimizing validation systems.
Event-Driven Rule Execution: Rules can be triggered by business events in real-time, enabling reactive validation and business process automation.
Rule Composition APIs: Higher-level APIs can compose multiple rule execution requests into complex business workflows, creating a rule orchestration platform.
Federated Rule Registries: Organizations can share and consume rule libraries across teams and business units, creating enterprise-wide business logic assets.
Predictive Rule Analytics: Analysis of rule execution patterns can predict business outcomes and recommend optimization strategies.
Implementation Recommendations
For organizations adopting this pattern:
Start with High-Value Use Cases: Begin with validation scenarios that are frequently requested in isolation—form validation, data quality checks, or compliance verification.
Invest in Observability Early: Comprehensive monitoring and alerting are essential for production success. Design observability alongside functional requirements.
Plan for Multi-Tenancy: Even single-tenant organizations benefit from tenant-like isolation for different environments, business units, or customer segments.
Prioritize Security: Business rules often encode sensitive business logic and process sensitive data. Implement comprehensive security measures from the beginning.
Build Progressive Enhancement: Start with basic API functionality and add advanced features like caching, parallel execution, and batch processing as demand grows.
The Broader Impact
This architectural pattern extends beyond validation systems. The same principles apply to:
Business Process Automation: API-driven execution of business workflow steps with dependency resolution and audit trails.
Configuration Management: Dynamic, dependency-aware configuration validation and deployment across complex enterprise systems.
Data Pipeline Orchestration: Selective execution of data processing steps based on business requirements and operational constraints.
Microservice Coordination: API-driven orchestration of microservice interactions with comprehensive monitoring and error handling.
The investment in dynamic rule execution architecture pays dividends across your entire technology landscape, enabling agility, reliability, and operational excellence at enterprise scale.
Building dynamic rule execution APIs represents the maturation of business logic architecture—transforming scattered validation code into strategic organizational assets. The journey from chaos to control culminates in platforms that empower every stakeholder: developers build faster, operations teams troubleshoot effectively, product teams experiment safely, and business users access logic directly.
Ready to transform your business logic into a dynamic platform? Start with the dependency resolution algorithm, implement basic API endpoints, and evolve incrementally toward the comprehensive solution. The modular approach demonstrated here works in production environments and provides immediate value while building toward enterprise-grade capabilities.
This concludes our series on enterprise rule engine architecture, demonstrating how thoughtful application of software engineering principles transforms business-critical systems from maintenance burdens into competitive advantages.
[1] https://ppl-ai-file-upload.s3.amazonaws.com/web/direct-files/attachments/87122568/5b81f412-2767-43be-8b67-07cf7338bc4a/rule-engine2-conditional.md
[2] https://ppl-ai-file-upload.s3.amazonaws.com/web/direct-files/attachments/87122568/58c8be8e-72e8-42c8-8984-28f3e74e38df/rule-engine2-configuration.md
[3] https://ppl-ai-file-upload.s3.amazonaws.com/web/direct-files/attachments/87122568/f7d8b19f-69bd-4995-b6d7-d3c5bee59140/rule-engine.md