
Key Components: KafkaTemplate | CompletableFuture | SendResult Metadata
Reliable event publishing is crucial in event-driven architectures. Spring Boot's KafkaTemplate simplifies Kafka producer operations while providing flexibility for both synchronous and asynchronous publishing patterns. This guide covers both approaches with proper error handling.
Service Implementation with KafkaTemplate
package com.example.productservice.service;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import java.util.concurrent.CompletableFuture;
import java.util.UUID;
@Service
public class ProductServiceImpl implements ProductService {
private final KafkaTemplate<String, ProductCreatedEvent> kafkaTemplate;
public ProductServiceImpl(
KafkaTemplate<String, ProductCreatedEvent> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
@Override
public String createProduct(CreateProductRequestModel requestModel) {
// 1. Generate product ID
String productId = UUID.randomUUID().toString();
// 2. Create event object
ProductCreatedEvent event = new ProductCreatedEvent(
productId,
requestModel.getTitle(),
requestModel.getPrice(),
requestModel.getQuantity()
);
// 3. Publish event (asynchronous example)
CompletableFuture<SendResult<String, ProductCreatedEvent>> future =
kafkaTemplate.send("product-created-events", productId, event);
// 4. Handle completion (optional)
future.whenComplete((result, ex) -> {
if (ex != null) {
logger.error("Failed to send message: " + ex.getMessage());
} else {
logger.info("Message sent successfully: " +
result.getRecordMetadata().toString());
}
});
return productId;
}
}
Parameter | Type | Purpose |
---|---|---|
Topic Name | String | Target Kafka topic |
Message Key | String | Partition routing key (productId) |
Event Object | ProductCreatedEvent | Payload to serialize |
Synchronous vs Asynchronous Publishing
// Synchronous publishing (blocks until ack)
SendResult<String, ProductCreatedEvent> result =
kafkaTemplate.send("topic", key, event).get();
// Asynchronous with callback
CompletableFuture<SendResult<String, ProductCreatedEvent>> future =
kafkaTemplate.send("topic", key, event);
future.whenComplete((res, ex) -> {
if (ex != null) {
// Handle failure
} else {
// Handle success
}
});
// Fire-and-forget (no ack)
kafkaTemplate.send("topic", key, event);
Production Tip: For critical operations, use synchronous publishing or proper error handling with asynchronous approaches to ensure message delivery.
Understanding SendResult Metadata
future.whenComplete((result, ex) -> {
if (result != null) {
RecordMetadata metadata = result.getRecordMetadata();
logger.info("""
Message persisted to:
Topic: {}
Partition: {}
Offset: {}
Timestamp: {}
""",
metadata.topic(),
metadata.partition(),
metadata.offset(),
metadata.timestamp());
}
});
Important: Always consider transaction boundaries when publishing events after database operations. Use @Transactional or Kafka transactions for consistency.
Complete Publishing Flow
- Generate unique product ID
- Create event object with product data
- Configure KafkaTemplate with proper serializers
- Publish event with chosen approach (sync/async)
- Handle success/failure scenarios
- Return product ID to client
Next Steps: Learn about Kafka transactions | Error handling patterns
Frequently Asked Questions
Q1. When to use synchronous vs asynchronous publishing?
A: Use synchronous when you need immediate acknowledgment (like in this product creation flow). Use asynchronous for better throughput in non-critical paths.
Q2. How to ensure message ordering?
A: Use the same message key for related messages and configure appropriate partitioning. The example uses productId as key for this purpose.
Q3. What serializers should I use with KafkaTemplate?
A: Configure JsonSerializer for development or AvroSerializer for production. Ensure your consumer uses matching deserializers.