Table of Contents
- Introduction to Apache Camel
- Understanding Camel Routes
- Essential Integration Patterns
- Real-World Integration Examples
- Advanced System Management
- Health Check
Introduction
Apache Camel is like a universal translator and message router for different software systems. Think of it as a postal service for your applications - it picks up messages from one place, maybe transforms them, and delivers them to another place.
flowchart LR
subgraph "Sources"
Files[Files in Folder]
REST[Web Service]
Email[Email]
SQueue[Message Queue]
end
subgraph "Apache Camel"
Router[Message Router]
end
subgraph "Destinations"
DB[(Database)]
Queue[Message Queue]
API[Another Web Service]
end
Files --> Router
REST --> Router
Email --> Router
SQueue --> Router
Router --> DB
Router --> Queue
Router --> API
style Router fill:#f96,stroke:#333
Understanding Camel Routes
A Camel route is like a set of instructions telling Camel how to move messages from point A to point B. Here’s a simple example:
// This route moves files from an 'orders' folder to a 'processed' folder
from("file:orders") // 1. Watch the 'orders' folder for new files
.log("Found file: ${file:name}") // 2. Log the filename
.to("file:processed"); // 3. Move the file to 'processed' folder
// This route reads files and sends important ones to a different place
from("file:inbox") // 1. Watch the 'inbox' folder
.choice() // 2. Make a decision based on file content
.when(simple("${file:name} contains 'urgent'")) // 3. If filename has 'urgent'
.to("direct:urgent") // 4. Send to urgent processor
.otherwise() // 5. If not urgent
.to("direct:normal"); // 6. Send to normal processor
Let’s break down what this code does:
- The first route is like having someone watch a folder called “orders” for new files, and when they find one, they log its name and move it to a “processed” folder.
- The second route shows how to make decisions - it’s like a mail sorter who looks at each envelope and sends urgent mail to one place and regular mail to another.
Essential Integration Patterns
1. Content-Based Router
Routes messages based on their content:
from("file:incoming-orders")
.choice()
.when(simple("${body.orderType} == 'RUSH'"))
.to("direct:rush-orders")
.otherwise()
.to("direct:regular-orders")
.end();
This pattern is like a traffic controller that:
- Examines the content of each message (the orderType)
- Routes rush orders one way
- Routes regular orders another way
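Stripped of the Camel DSL, the routing decision is just a conditional on the message content. Here is a minimal plain-Java sketch of that logic; the `OrderRouter` class and its Map-based "message" are illustrative, not Camel API:

```java
import java.util.HashMap;
import java.util.Map;

public class OrderRouter {
    // Mirrors the .choice()/.when() above: pick a destination endpoint
    // from the orderType field of the message body.
    public static String route(Map<String, Object> order) {
        return "RUSH".equals(order.get("orderType"))
                ? "direct:rush-orders"
                : "direct:regular-orders";
    }

    public static void main(String[] args) {
        Map<String, Object> rush = new HashMap<>();
        rush.put("orderType", "RUSH");
        System.out.println(route(rush));            // direct:rush-orders
        System.out.println(route(new HashMap<>())); // direct:regular-orders
    }
}
```

Camel's `choice()` builds exactly this kind of if/else chain, but expressed declaratively so the routing rules stay visible in the route definition.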
2. Filter Pattern
Filters messages based on criteria:
flowchart LR
Input[Input Messages] --> Filter{Contains 'important'?}
Filter -->|Yes| Output[Important Queue]
Filter -->|No| Discard[Discarded]
style Filter fill:#f96,stroke:#333
from("direct:start")
.filter(simple("${body} contains 'important'"))
.to("direct:important-messages");
This pattern works like a sieve that:
- Examines each message
- Only lets through messages matching certain criteria
- Discards or ignores non-matching messages
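The sieve behaviour corresponds to a plain stream filter. A small sketch of the same predicate in ordinary Java (class name is illustrative):

```java
import java.util.List;
import java.util.stream.Collectors;

public class MessageFilter {
    // The filter predicate from the route, applied with Java streams:
    // only bodies containing 'important' pass through; the rest are dropped.
    public static List<String> filter(List<String> bodies) {
        return bodies.stream()
                .filter(b -> b.contains("important"))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> in = List.of("important: pay invoice", "newsletter", "important: renew cert");
        System.out.println(filter(in)); // only the two 'important' messages survive
    }
}
```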
3. Transformer Pattern
Converts messages between formats:
flowchart LR
Input[JSON Input] --> Transform[Transform Process]
Transform --> Output[XML Output]
subgraph "Transform Process"
U[Unmarshal JSON] --> M[Marshal XML]
end
style Transform fill:#69f,stroke:#333
from("direct:start")
.unmarshal().json() // parse the incoming JSON body into an object
.marshal().jacksonXml() // serialize that object as XML (requires camel-jacksonxml)
.to("direct:xml-processor");
This pattern acts like a format converter that:
- Takes input in one format (like JSON)
- Transforms it to another format (like XML)
- Useful when connecting systems that speak different “languages”
4. Dead Letter Channel Pattern
Handles failed messages by moving them to a Dead Letter Queue (DLQ).
flowchart TD
Start[Message] --> Process{Process Message}
Process -->|Success| Success[Success Queue]
Process -->|Failure| Retry{Retry?}
Retry -->|Yes| Process
Retry -->|No| Dead[Dead Letter Queue]
style Process fill:#f69,stroke:#333
style Retry fill:#f69,stroke:#333
errorHandler(deadLetterChannel("direct:dead-letter")
.maximumRedeliveries(3)
.redeliveryDelay(1000)
.backOffMultiplier(2));
from("direct:start")
.to("direct:possibly-failing-service")
.log("Message processed successfully");
This pattern works like a safety net that:
- Attempts to process a message.
- If processing fails, retries up to 3 times.
- Increases the delay between retries.
- If all retries fail, moves the message to a “dead letter” queue for further investigation.
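The delay schedule produced by `redeliveryDelay(1000)` with `backOffMultiplier(2)` is simple arithmetic: each retry waits twice as long as the previous one. A small sketch of that computation (the `RedeliveryDelays` class is illustrative, not Camel API):

```java
import java.util.ArrayList;
import java.util.List;

public class RedeliveryDelays {
    // Compute the wait before each redelivery attempt:
    // initialDelay, then initialDelay * multiplier, and so on.
    public static List<Long> delays(int maxRedeliveries, long initialDelay, double multiplier) {
        List<Long> out = new ArrayList<>();
        double delay = initialDelay;
        for (int attempt = 1; attempt <= maxRedeliveries; attempt++) {
            out.add((long) delay);
            delay *= multiplier;
        }
        return out;
    }

    public static void main(String[] args) {
        // Three redeliveries: wait 1s, then 2s, then 4s before giving up.
        System.out.println(delays(3, 1000L, 2.0)); // [1000, 2000, 4000]
    }
}
```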
5. Splitter Pattern
Splits a single message into multiple messages:
from("direct:start")
.split(body().tokenize(","))
.to("direct:process-split-message");
This pattern works like a paper cutter that:
- Takes a single message containing multiple items (e.g., a comma-separated list)
- Splits it into individual messages for each item
- Processes each item separately
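What `tokenize(",")` does to the body can be shown with a plain `String.split`: one outgoing message per token. A sketch (class name is illustrative):

```java
import java.util.Arrays;
import java.util.List;

public class Splitter {
    // Split a comma-separated body into one piece per item,
    // as the splitter does with one outgoing exchange per token.
    public static List<String> split(String body) {
        return Arrays.asList(body.split(","));
    }

    public static void main(String[] args) {
        for (String part : split("order-1,order-2,order-3")) {
            System.out.println("processing " + part); // one message per item
        }
    }
}
```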
6. Aggregator Pattern
Combines multiple messages into a single message:
from("direct:start")
.aggregate(header("orderId"), new GroupedExchangeAggregationStrategy())
.completionSize(5)
.to("direct:process-aggregated-message");
This pattern works like a collector that:
- Groups messages based on a common attribute (e.g., orderId)
- Aggregates them into a single message once a condition is met (e.g., 5 messages received)
- Processes the aggregated message
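The aggregation logic, reduced to plain Java: collect messages per correlation key and release the batch once the completion condition is met. This sketch models only the `completionSize` condition; real Camel aggregation also supports timeouts and custom strategies:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BatchAggregator {
    private final int completionSize;
    private final Map<String, List<String>> pending = new HashMap<>();

    public BatchAggregator(int completionSize) {
        this.completionSize = completionSize;
    }

    // Add a message under its correlation key (the orderId header in the route).
    // Returns the aggregated batch once completionSize messages arrive, else null.
    public List<String> add(String orderId, String body) {
        List<String> group = pending.computeIfAbsent(orderId, k -> new ArrayList<>());
        group.add(body);
        if (group.size() >= completionSize) {
            return pending.remove(orderId);
        }
        return null;
    }

    public static void main(String[] args) {
        BatchAggregator agg = new BatchAggregator(5);
        for (int i = 1; i <= 4; i++) {
            System.out.println(agg.add("order-42", "item-" + i)); // null until complete
        }
        System.out.println(agg.add("order-42", "item-5")); // the batch of 5
    }
}
```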
7. Wire Tap Pattern (Logging Implementation)
from("file:incoming-orders")
.wireTap("direct:audit-log")
.log("Received order: ${file:name}")
.to("direct:process-order");
from("direct:audit-log")
.log("Audit: ${file:name} received at ${date:now:yyyy-MM-dd HH:mm:ss}")
.to("file:audit-logs");
This pattern works like a phone tap that:
- Creates a copy of each message
- Sends the copy to a logging/monitoring system
- Allows the original message to continue its journey
- Useful for audit trails and monitoring
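The essence of the wire tap is copy-then-continue: the audit channel receives a copy while the original proceeds untouched. A minimal sketch with in-memory channels (the `WireTapDemo` class and its lists stand in for the audit endpoint and the main route):

```java
import java.util.ArrayList;
import java.util.List;

public class WireTapDemo {
    final List<String> auditLog = new ArrayList<>();  // stands in for file:audit-logs
    final List<String> delivered = new ArrayList<>(); // stands in for direct:process-order

    // Copy the message to the audit channel, then let the original continue.
    public void send(String message) {
        auditLog.add("Audit: " + message); // the tapped copy
        delivered.add(message);            // the original, unchanged
    }

    public static void main(String[] args) {
        WireTapDemo tap = new WireTapDemo();
        tap.send("order-001");
        System.out.println(tap.auditLog);
        System.out.println(tap.delivered);
    }
}
```

In Camel the tapped copy is sent asynchronously, so the audit route cannot slow down or fail the main flow; this sketch keeps it synchronous for clarity.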
8. Error handler
from("direct:start")
.errorHandler(defaultErrorHandler()
.maximumRedeliveries(3)
.redeliveryDelay(1000))
.to("direct:possibly-failing-service")
.log("Message processed successfully");
This pattern helps implement simple route-level error handling:
- Like a persistent delivery person who tries multiple times if nobody answers the door
- If sending a message fails, it will retry up to 3 times
- Between each retry, it waits for 1 second
- If all retries fail, it gives up and reports an error
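The attempt counting behind `maximumRedeliveries(3)` is worth spelling out: the original delivery plus up to three redeliveries gives at most four attempts. A plain-Java sketch of that loop (class and method names are illustrative):

```java
public class RetryingSender {
    // Simulate delivery that fails the first `failuresBeforeSuccess` times.
    // Returns the attempt number on which delivery succeeded,
    // or -1 if all attempts (1 original + maxRedeliveries retries) failed.
    public static int attemptsUntilSuccess(int failuresBeforeSuccess, int maxRedeliveries) {
        int attempts = 0;
        for (int i = 0; i <= maxRedeliveries; i++) {
            attempts++;
            if (attempts > failuresBeforeSuccess) {
                return attempts; // delivery succeeded on this attempt
            }
        }
        return -1; // retries exhausted; Camel would now surface the error
    }

    public static void main(String[] args) {
        System.out.println(attemptsUntilSuccess(2, 3)); // 3: succeeds on the third try
        System.out.println(attemptsUntilSuccess(9, 3)); // -1: gives up after 4 attempts
    }
}
```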
Real-World Integration Examples
Here’s an example that combines several of these patterns in one system.
// Order Processing Route
from("file:incoming-orders") // 1. Watch 'incoming-orders' folder for new files
.log("Received order: ${file:name}") // 2. Log when we find a new order
// 3. Convert the file content to JSON
.unmarshal().json()
// 4. Add current timestamp to the order
.setHeader("ProcessingTime", simple("${date:now:yyyy-MM-dd HH:mm:ss}"))
// 5. Make decisions based on order type
.choice()
.when(simple("${body.orderType} == 'RUSH'"))
.log("Rush order detected!")
.to("direct:rush-orders")
.otherwise()
.log("Regular order received")
.to("direct:regular-orders")
.end()
// 6. Move processed file to archive
.to("file:processed-orders");
// Rush Order Handler
from("direct:rush-orders")
.log("Processing rush order")
.to("direct:notify-shipping")
.to("direct:update-inventory");
// Regular Order Handler
from("direct:regular-orders")
.log("Processing regular order")
.delay(simple("${random(1000,5000)}")) // Simulate processing time
.to("direct:update-inventory");
Let’s break down what this order processing system does:
- File Monitoring:
- Watches a folder called “incoming-orders” for new order files
- When a new file appears, it starts processing it
- Initial Processing:
- Logs the filename so we know which order is being processed
- Converts the file content from JSON format (like a digital order form)
- Adding Information:
- Adds a timestamp to track when the order was processed
- Decision Making:
- Checks if it’s a rush order or regular order
- Rush orders get special handling
- Regular orders follow the normal process
- Different Handling:
- Rush orders: Immediately notify shipping and update inventory
- Regular orders: Process with some delay and update inventory
- File Management:
- Moves the processed order file to a different folder for record-keeping
This example shows how Camel can:
- Monitor folders for new files
- Make decisions based on content
- Process different types of orders differently
- Keep track of what’s happening (logging)
- Archive processed files
In the previous example, different patterns work together to create a robust system:
- Content-Based Router sorts orders into rush and regular
- Wire Tap logs all activities for monitoring
- Transformer converts file contents into processable formats
- Dead Letter Channel handles any processing failures
- Filter could be used to only process orders meeting certain criteria
These patterns are like building blocks that can be combined in different ways to handle various integration scenarios. They help solve common problems in a standardized way, making the system more maintainable and reliable.
Kafka to ActiveMQ Integration
This example demonstrates consuming messages from Kafka and pushing them to ActiveMQ:
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.kafka.KafkaConstants;
import org.apache.camel.LoggingLevel;
public class KafkaToMQRoute extends RouteBuilder {
@Override
public void configure() throws Exception {
// Error Handler Configuration
errorHandler(deadLetterChannel("activemq:queue:dead.letter")
.maximumRedeliveries(3)
.redeliveryDelay(1000)
.backOffMultiplier(2)
.useExponentialBackOff()
.logRetryAttempted(true));
// Main route: Kafka → ActiveMQ
from("kafka:orders-topic?" +
"brokers=localhost:9092" +
"&groupId=orders-group" +
"&autoOffsetReset=earliest" +
"&consumersCount=3" +
"&maxPollRecords=100")
// Add correlation ID for tracking
.setHeader("correlationId", simple("${random(1000,9999)}"))
.log(LoggingLevel.INFO, "Received from Kafka: ${body} with key: ${headers[kafka.KEY]}")
.process(exchange -> {
// Example message transformation
String kafkaMessage = exchange.getIn().getBody(String.class);
OrderMessage order = convertToOrderMessage(kafkaMessage);
exchange.getIn().setBody(order);
})
.setHeader("JMSPriority", simple("${headers[kafka.PRIORITY]}"))
// Send to ActiveMQ queue
.to("activemq:queue:orders?" +
"timeToLive=86400000" +
"&deliveryPersistent=true")
.log(LoggingLevel.INFO, "Successfully sent to ActiveMQ: ${header.correlationId}");
// Dead Letter Queue Handler
from("activemq:queue:dead.letter")
.log(LoggingLevel.ERROR,
"Failed to process message: ${body}. Error: ${exception.message}")
.process(exchange -> { /* custom handling of the failed message goes here */ })
.to("activemq:queue:failed.orders");
}
private OrderMessage convertToOrderMessage(String kafkaMessage) {
//conversion logic here
return new OrderMessage(/* converted message */);
}
}
@Getter
@Setter
@Builder
@AllArgsConstructor // needed so new OrderMessage(...) above compiles; @Builder alone keeps the constructor private
class OrderMessage {
private String orderId;
private String customerInfo;
private Double amount;
}
Let’s break down this example in detail:
- Main Components:
- Kafka Consumer: Reads messages from a Kafka topic named “orders-topic”
- Message Transformer: Converts Kafka messages to ActiveMQ format
- ActiveMQ Producer: Sends messages to ActiveMQ queue named “orders”
- Kafka Consumer Configuration:
"kafka:orders-topic?brokers=localhost:9092" +
"&groupId=orders-group" +
"&autoOffsetReset=earliest" +
"&consumersCount=3" +
"&maxPollRecords=100"
- brokers: Kafka server address
- groupId: Consumer group for load balancing
- autoOffsetReset: Start reading position
- consumersCount: Number of parallel consumers
- maxPollRecords: Batch size for each poll
- Error Handling:
errorHandler(deadLetterChannel("activemq:queue:dead.letter")
- Retries failed messages 3 times
- Uses exponential backoff (delays increase between retries)
- Failed messages go to a dead letter queue
- Message Processing:
- Adds correlation ID for tracking
- Logs incoming messages
- Transforms message format
- Sets JMS properties
- ActiveMQ Configuration:
activemq:queue:orders?timeToLive=86400000
- Sets message expiry time (24 hours)
- Uses persistent delivery for reliability
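The `timeToLive` value is just 24 hours expressed in milliseconds, which is easy to verify:

```java
public class TimeToLive {
    public static void main(String[] args) {
        // 24 hours in milliseconds, as used in timeToLive=86400000
        long oneDayMs = 24L * 60 * 60 * 1000;
        System.out.println(oneDayMs); // 86400000
    }
}
```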
Here’s a visualization of the message flow:
flowchart LR
subgraph Kafka
KT[Orders Topic]
end
subgraph "Camel Route"
C[Consumer]
T[Transform]
P[Producer]
end
subgraph ActiveMQ
Q[Orders Queue]
DL[Dead Letter Queue]
end
KT --> C
C --> T
T --> P
P --> Q
P -.-> DL
style C fill:#f96,stroke:#333
style T fill:#69f,stroke:#333
style P fill:#9f6,stroke:#333
To use this route, you would need:
- Dependencies in your
pom.xml
:
<dependencies>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-activemq</artifactId>
</dependency>
</dependencies>
- Configure connection properties in your application.properties:
kafka.bootstrap.servers=localhost:9092
activemq.broker.url=tcp://localhost:61616
Some key features of this implementation:
- Reliable message delivery with retry mechanism
- Message tracking with correlation IDs
- Comprehensive logging
- Dead letter queue for failed messages
- Scalable with multiple consumers
- Message transformation capability
Advanced System Management
System outages are common and can impact your services. One common approach is to use circuit breakers and route-level control to manage system outages.
Circuit Breaker
// Order Processing Route with Circuit Breaker
from("kafka:orders-topic")
.routeId("order-processing-route")
.circuitBreaker()
.resilience4jConfiguration()
.failureRateThreshold(50)
// 50% failure rate to open circuit
.waitDurationInOpenState(10000)
// Wait 10 seconds in open state
.end()
.to("http://order-service/api/orders")
.onFallback()
.to("kafka:failed-orders")
.end();
This code sets up a route in Apache Camel that reads messages from a Kafka topic named orders-topic. It uses a circuit breaker pattern to handle failures gracefully. If the failure rate exceeds 50%, the circuit breaker will open and stop sending requests to the primary endpoint (http://order-service/api/orders) for 10 seconds. During this time, any incoming messages will be sent to a fallback Kafka topic named failed-orders. This helps to prevent overwhelming the order service with requests when it is experiencing issues.
Implementing the circuit breaker pattern in a route is quite straightforward as it is provided out of the box by Camel.
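Under the hood, a circuit breaker is a small state machine over call statistics. The following is a simplified plain-Java sketch of the CLOSED/OPEN transition driven by `failureRateThreshold` and `waitDurationInOpenState`; real Resilience4j also has a HALF_OPEN state and sliding windows, which are omitted here:

```java
public class CircuitBreakerSketch {
    enum State { CLOSED, OPEN }

    private final double failureRateThreshold;      // e.g. 50.0 (%)
    private final long waitDurationInOpenStateMs;   // e.g. 10000
    private State state = State.CLOSED;
    private int calls = 0, failures = 0;
    private long openedAt = -1;

    CircuitBreakerSketch(double threshold, long waitMs) {
        this.failureRateThreshold = threshold;
        this.waitDurationInOpenStateMs = waitMs;
    }

    // Record a call result; open the circuit once the failure rate crosses the threshold.
    void record(boolean success, long nowMs) {
        calls++;
        if (!success) failures++;
        if (state == State.CLOSED && 100.0 * failures / calls >= failureRateThreshold) {
            state = State.OPEN;
            openedAt = nowMs;
        }
    }

    // While open, calls are refused (they go to the fallback) until the wait elapses.
    boolean allowsCall(long nowMs) {
        if (state == State.OPEN && nowMs - openedAt < waitDurationInOpenStateMs) {
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        CircuitBreakerSketch cb = new CircuitBreakerSketch(50.0, 10_000);
        cb.record(true, 0);
        cb.record(false, 1);                       // 50% failure rate: circuit opens
        System.out.println(cb.allowsCall(5_000));  // false: still in open state
        System.out.println(cb.allowsCall(12_000)); // true: wait duration elapsed
    }
}
```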
Let’s take another example to understand how we can apply this in a Camel route.
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.LoggingLevel;
public class MultiRouteWithCircuitBreaker extends RouteBuilder {
private static final int THRESHOLD = 5;
private static final int HALF_OPEN_AFTER = 10000;
@Override
public void configure() throws Exception {
// Global error handler
errorHandler(deadLetterChannel("direct:errorHandler")
.maximumRedeliveries(3)
.redeliveryDelay(1000)
.useExponentialBackOff()
.logRetryAttempted(true));
// Route Control and Monitoring
from("timer:routeMonitor?period=60000")
.routeId("monitor-route")
.process(exchange -> {
checkDestinationHealth(exchange);
})
.choice()
.when(header("systemStatus").isEqualTo("DOWN"))
.process(exchange -> {
// Stop affected routes
getContext().getRouteController().stopRoute("order-processing-route");
getContext().getRouteController().stopRoute("payment-processing-route");
})
.otherwise()
.process(exchange -> {
// Start routes if they're stopped
if (!getContext().getRouteController().getRouteStatus("order-processing-route").isStarted()) {
getContext().getRouteController().startRoute("order-processing-route");
}
if (!getContext().getRouteController().getRouteStatus("payment-processing-route").isStarted()) {
getContext().getRouteController().startRoute("payment-processing-route");
}
});
// Order Processing Route with Circuit Breaker
from("kafka:orders-topic")
.routeId("order-processing-route")
.circuitBreaker()
.resilience4jConfiguration()
.failureRateThreshold(50)
.waitDurationInOpenState(10000)
.minimumNumberOfCalls(10) // Minimum calls before calculating failure rate
.end()
.log("Processing order: ${body}")
.to("http://order-service/api/orders")
.to("direct:payment-processor")
.onFallback()
.log(LoggingLevel.ERROR, "Circuit Breaker triggered for order processing")
.to("kafka:failed-orders")
.end();
// Payment Processing Route with Circuit Breaker
from("direct:payment-processor")
.routeId("payment-processing-route")
.circuitBreaker()
.resilience4jConfiguration()
.failureRateThreshold(50)
.waitDurationInOpenState(10000)
.end()
.log("Processing payment for order")
.to("http://payment-service/api/process")
.onFallback()
.log(LoggingLevel.ERROR, "Circuit Breaker triggered for payment processing")
.to("kafka:failed-payments")
.end();
// Error Handler Route
from("direct:errorHandler")
.routeId("error-handler-route")
.log(LoggingLevel.ERROR, "Error processing message: ${exception.message}")
.choice()
.when(header("routeId").isEqualTo("order-processing-route"))
.to("kafka:failed-orders")
.when(header("routeId").isEqualTo("payment-processing-route"))
.to("kafka:failed-payments")
.otherwise()
.to("kafka:dead-letter-queue");
// Recovery Route - Processes failed messages when system recovers
from("kafka:failed-orders")
.routeId("recovery-route")
.circuitBreaker()
.resilience4jConfiguration()
.failureRateThreshold(50)
.waitDurationInOpenState(30000)
.end()
.log("Attempting to recover failed order")
.to("http://order-service/api/orders")
.onFallback()
.log(LoggingLevel.ERROR, "Recovery still not possible")
.end();
}
}
Let’s visualize the route structure and circuit breaker flow:
flowchart TD
subgraph "Route Monitor"
M[Monitor Timer] --> HC{Health Check}
HC -->|Healthy| SR[Start Routes]
HC -->|Unhealthy| SP[Stop Routes]
end
subgraph "Order Processing"
KO[Kafka Orders] --> CB1{Circuit Breaker}
CB1 -->|Closed| OP[Process Order]
CB1 -->|Open| FB1[Fallback - Failed Orders]
OP --> PP[Payment Processing]
end
subgraph "Payment Processing"
PP --> CB2{Circuit Breaker}
CB2 -->|Closed| PS[Process Payment]
CB2 -->|Open| FB2[Fallback - Failed Payments]
end
subgraph "Recovery"
FB1 --> RR[Recovery Route]
FB2 --> RR
RR --> CB3{Circuit Breaker}
CB3 -->|Closed| RPR[Reprocess]
CB3 -->|Open| FB3[Delay Recovery]
end
style CB1 fill:#f96,stroke:#333
style CB2 fill:#f96,stroke:#333
style CB3 fill:#f96,stroke:#333
style M fill:#69f,stroke:#333
Key features of this implementation:
- Route Monitoring:
- Periodic health checks of destination systems
- Automatic route stopping/starting based on system health
- Configurable monitoring intervals
- Circuit Breaker Pattern:
- Uses Resilience4j for circuit breaker implementation
- Configurable failure thresholds and recovery times
- Separate circuit breakers for different routes
- Fallback mechanisms for handling failures
- Multiple Routes:
- Order processing route
- Payment processing route
- Error handling route
- Recovery route
- Error Handling:
- Dead letter channel for failed messages
- Different error queues for different types of failures
- Retry mechanism with exponential backoff
- Recovery Mechanism:
- Separate route for processing failed messages
- Circuit breaker protection for recovery attempts
- Longer wait times during recovery
To use this in your application:
- Add dependencies to pom.xml:
<dependencies>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-resilience4j</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-kafka</artifactId>
</dependency>
</dependencies>
- Configure in application.properties:
# Circuit Breaker Configuration
camel.circuitbreaker.threshold=5
camel.circuitbreaker.half-open-after=10000
# Kafka Configuration
kafka.bootstrap.servers=localhost:9092
# Health Check Configuration
health.check.interval=60000
Health Check (Optional)
In the previous example, we saw how routes can be stopped and started based on a health check, with a timer invoking the health check route at fixed intervals. Let's zoom into the health check itself and see how it can be implemented to cover several different kinds of systems.
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.client.methods.HttpGet;
import javax.sql.DataSource;
import java.sql.Connection;
import java.util.HashMap;
import java.util.Map;
import java.net.Socket;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class HealthCheckRoute extends RouteBuilder {
private static final int TIMEOUT = 5000;
private static final String ORDER_SERVICE_URL = "http://order-service/health";
private static final String PAYMENT_SERVICE_URL = "http://payment-service/health";
private static final String MQ_HOST = "localhost";
private static final int MQ_PORT = 61616;
private final DataSource dataSource;
public HealthCheckRoute(DataSource dataSource) {
this.dataSource = dataSource;
}
@Override
public void configure() throws Exception {
from("timer:routeMonitor?period=60000")
.routeId("health-check-route")
.process(this::checkDestinationHealth)
.choice()
.when(header("systemStatus").isEqualTo("DOWN"))
.log("System status is DOWN. Stopping routes...")
.process(exchange -> stopAffectedRoutes(exchange))
.otherwise()
.log("System status is UP. Starting routes if needed...")
.process(exchange -> startRoutes(exchange));
}
private void checkDestinationHealth(Exchange exchange) {
Map<String, ServiceHealth> healthResults = new HashMap<>();
// Check each service and store results
healthResults.put("orderService", checkHttpService(ORDER_SERVICE_URL));
healthResults.put("paymentService", checkHttpService(PAYMENT_SERVICE_URL));
healthResults.put("database", checkDatabase());
healthResults.put("messageQueue", checkMessageQueue());
// Determine overall system status
boolean isSystemHealthy = healthResults.values().stream()
.allMatch(ServiceHealth::isHealthy);
// Set headers with detailed health information
exchange.getIn().setHeader("systemStatus", isSystemHealthy ? "UP" : "DOWN");
exchange.getIn().setHeader("healthResults", healthResults);
// Store specific service statuses for granular route control
exchange.getIn().setHeader("orderServiceStatus",
healthResults.get("orderService").isHealthy() ? "UP" : "DOWN");
exchange.getIn().setHeader("paymentServiceStatus",
healthResults.get("paymentService").isHealthy() ? "UP" : "DOWN");
}
private ServiceHealth checkHttpService(String url) {
try (CloseableHttpClient client = createHttpClient()) {
HttpGet request = new HttpGet(url);
int responseCode = client.execute(request)
.getStatusLine()
.getStatusCode();
boolean isHealthy = responseCode >= 200 && responseCode < 300;
return new ServiceHealth(
isHealthy,
isHealthy ? "Service responding normally" :
"Service returned status code: " + responseCode
);
} catch (Exception e) {
log.error("Error checking HTTP service {}: {}", url, e.getMessage());
return new ServiceHealth(false, "Service check failed: " + e.getMessage());
}
}
private ServiceHealth checkDatabase() {
try (Connection conn = dataSource.getConnection()) {
boolean isValid = conn.isValid(TIMEOUT);
return new ServiceHealth(
isValid,
isValid ? "Database connection is valid" :
"Database connection test failed"
);
} catch (Exception e) {
log.error("Database health check failed: {}", e.getMessage());
return new ServiceHealth(false, "Database check failed: " + e.getMessage());
}
}
private ServiceHealth checkMessageQueue() {
try (Socket socket = new Socket(MQ_HOST, MQ_PORT)) {
boolean isConnected = socket.isConnected();
return new ServiceHealth(
isConnected,
isConnected ? "Message queue is accessible" :
"Could not connect to message queue"
);
} catch (Exception e) {
log.error("Message queue health check failed: {}", e.getMessage());
return new ServiceHealth(false, "Message queue check failed: " + e.getMessage());
}
}
private CloseableHttpClient createHttpClient() {
RequestConfig config = RequestConfig.custom()
.setConnectTimeout(TIMEOUT)
.setConnectionRequestTimeout(TIMEOUT)
.setSocketTimeout(TIMEOUT)
.build();
return HttpClientBuilder.create()
.setDefaultRequestConfig(config)
.build();
}
private void stopAffectedRoutes(Exchange exchange) {
Map<String, ServiceHealth> healthResults =
exchange.getIn().getHeader("healthResults", Map.class);
try {
if (!healthResults.get("orderService").isHealthy()) {
getContext().getRouteController().stopRoute("order-processing-route");
log.info("Stopped order-processing-route due to health check failure");
}
if (!healthResults.get("paymentService").isHealthy()) {
getContext().getRouteController().stopRoute("payment-processing-route");
log.info("Stopped payment-processing-route due to health check failure");
}
} catch (Exception e) {
log.error("Error stopping routes: {}", e.getMessage());
}
}
private void startRoutes(Exchange exchange) {
Map<String, ServiceHealth> healthResults =
exchange.getIn().getHeader("healthResults", Map.class);
try {
if (healthResults.get("orderService").isHealthy() &&
!getContext().getRouteController()
.getRouteStatus("order-processing-route").isStarted()) {
getContext().getRouteController().startRoute("order-processing-route");
log.info("Started order-processing-route after health check recovery");
}
if (healthResults.get("paymentService").isHealthy() &&
!getContext().getRouteController()
.getRouteStatus("payment-processing-route").isStarted()) {
getContext().getRouteController().startRoute("payment-processing-route");
log.info("Started payment-processing-route after health check recovery");
}
} catch (Exception e) {
log.error("Error starting routes: {}", e.getMessage());
}
}
}
@lombok.Value
class ServiceHealth {
boolean healthy;
String message;
}
Health Check Visualization
flowchart TD
Timer[Timer Trigger] --> HC[Health Check Process]
subgraph "Health Checks"
HC --> |Check| HTTP[HTTP Services]
HC --> |Check| DB[Database]
HC --> |Check| MQ[Message Queue]
end
subgraph "Status Evaluation"
HTTP --> Eval[Evaluate Results]
DB --> Eval
MQ --> Eval
Eval --> Status{System Status}
end
Status -->|DOWN| Stop[Stop Affected Routes]
Status -->|UP| Start[Start Routes if Stopped]
style HC fill:#f96,stroke:#333
style Status fill:#69f,stroke:#333
Key points about the implementation:
- Health Check Process:
- Checks multiple services: HTTP endpoints, database, message queue
- Each check has a timeout (5 seconds)
- Results are stored in a ServiceHealth object containing:
- healthy: boolean status
- message: detailed status message
- Status Headers:
- systemStatus: Overall system status ("UP" or "DOWN")
- healthResults: Map containing detailed health check results
- orderServiceStatus: Specific status for order service
- paymentServiceStatus: Specific status for payment service
- Route Control:
- Routes are stopped/started based on their specific service health
- Order processing route depends on order service health
- Payment processing route depends on payment service health
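The overall-status rule in `checkDestinationHealth` is an all-of reduction: the system is UP only if every individual check passes. In isolation (with the health results reduced to booleans for brevity), that rule looks like this:

```java
import java.util.Map;

public class HealthAggregator {
    // The system is UP only if every individual check is healthy,
    // mirroring the allMatch(ServiceHealth::isHealthy) call in the route.
    public static String systemStatus(Map<String, Boolean> checks) {
        boolean allHealthy = checks.values().stream().allMatch(Boolean::booleanValue);
        return allHealthy ? "UP" : "DOWN";
    }

    public static void main(String[] args) {
        System.out.println(systemStatus(Map.of("orderService", true, "database", true)));   // UP
        System.out.println(systemStatus(Map.of("orderService", true, "database", false))); // DOWN
    }
}
```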
To use this implementation:
- Add required dependencies:
<dependencies>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
- Configure in application.properties:
# Health Check Configuration
health.check.timeout=5000
health.check.order.service.url=http://order-service/health
health.check.payment.service.url=http://payment-service/health
health.check.mq.host=localhost
health.check.mq.port=61616
Conclusion
Apache Camel provides a powerful framework for implementing enterprise integration patterns. By combining these patterns with robust system management features like circuit breakers and health checks, you can build reliable, scalable integration solutions.
Remember to always:
- Implement proper error handling
- Monitor system health
- Use circuit breakers to prevent cascade failures
- Maintain clear logging and monitoring
- Consider message persistence and reliability requirements
For more information, visit the Apache Camel documentation.