Publishing Events to Kafka in Spring Boot Microservices

Admin, Student's Library
0
Kafka Event Publishing

Key Components: KafkaTemplate | CompletableFuture | SendResult Metadata

Reliable event publishing is crucial in event-driven architectures. Spring Boot's KafkaTemplate simplifies Kafka producer operations while providing flexibility for both synchronous and asynchronous publishing patterns. This guide covers both approaches with proper error handling.

Service Implementation with KafkaTemplate

ProductServiceImpl.java
package com.example.productservice.service;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import java.util.concurrent.CompletableFuture;
import java.util.UUID;

@Service
public class ProductServiceImpl implements ProductService {

    private final KafkaTemplate<String, ProductCreatedEvent> kafkaTemplate;
    
    public ProductServiceImpl(
            KafkaTemplate<String, ProductCreatedEvent> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @Override
    public String createProduct(CreateProductRequestModel requestModel) {
        // 1. Generate product ID
        String productId = UUID.randomUUID().toString();
        
        // 2. Create event object
        ProductCreatedEvent event = new ProductCreatedEvent(
            productId,
            requestModel.getTitle(),
            requestModel.getPrice(),
            requestModel.getQuantity()
        );
        
        // 3. Publish event (asynchronous example)
        CompletableFuture<SendResult<String, ProductCreatedEvent>> future = 
            kafkaTemplate.send("product-created-events", productId, event);
        
        // 4. Handle completion (optional)
        future.whenComplete((result, ex) -> {
            if (ex != null) {
                logger.error("Failed to send message: " + ex.getMessage());
            } else {
                logger.info("Message sent successfully: " + 
                    result.getRecordMetadata().toString());
            }
        });
        
        return productId;
    }
}
Parameter Type Purpose
Topic Name String Target Kafka topic
Message Key String Partition routing key (productId)
Event Object ProductCreatedEvent Payload to serialize

Synchronous vs Asynchronous Publishing

Publishing Approaches
// Synchronous publishing (blocks until ack)
SendResult<String, ProductCreatedEvent> result = 
    kafkaTemplate.send("topic", key, event).get();

// Asynchronous with callback
CompletableFuture<SendResult<String, ProductCreatedEvent>> future = 
    kafkaTemplate.send("topic", key, event);

future.whenComplete((res, ex) -> {
    if (ex != null) {
        // Handle failure
    } else {
        // Handle success
    }
});

// Fire-and-forget (no ack)
kafkaTemplate.send("topic", key, event);

Production Tip: For critical operations, use synchronous publishing or proper error handling with asynchronous approaches to ensure message delivery.

Understanding SendResult Metadata

Metadata Access
future.whenComplete((result, ex) -> {
    if (result != null) {
        RecordMetadata metadata = result.getRecordMetadata();
        logger.info("""
            Message persisted to:
            Topic: {}
            Partition: {}
            Offset: {}
            Timestamp: {}
            """, 
            metadata.topic(),
            metadata.partition(),
            metadata.offset(),
            metadata.timestamp());
    }
});

Important: Always consider transaction boundaries when publishing events after database operations. Use @Transactional or Kafka transactions for consistency.

Complete Publishing Flow

  1. Generate unique product ID
  2. Create event object with product data
  3. Configure KafkaTemplate with proper serializers
  4. Publish event with chosen approach (sync/async)
  5. Handle success/failure scenarios
  6. Return product ID to client

Frequently Asked Questions

Q1. When to use synchronous vs asynchronous publishing?
A: Use synchronous when you need immediate acknowledgment (like in this product creation flow). Use asynchronous for better throughput in non-critical paths.

Q2. How to ensure message ordering?
A: Use the same message key for related messages and configure appropriate partitioning. The example uses productId as key for this purpose.

Q3. What serializers should I use with KafkaTemplate?
A: Configure JsonSerializer for development or AvroSerializer for production. Ensure your consumer uses matching deserializers.

Post a Comment

0 Comments
* Please Don't Spam Here. All the Comments are Reviewed by Admin.
Post a Comment (0)
Our website uses cookies to enhance your experience. Learn More
Accept !