Kafka schema registry

· 6 min read
Haril Song
Owner, Software Engineer at 42dot

Problems

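  • When a message spec changes, the DTOs must be updated in every dependent module or Git repository.
    • Backward or forward compatibility breaks frequently.
    • The complexity of managing DTOs grows linearly.
    • Java in particular is awkward for handling JSON-based messages.
  • Kafka transmits messages as byte arrays, but at the application level it is recommended to deserialize them and work with them as objects.
    • Every time data is put into the payload it is serialized to a byte array, and the reverse conversion happens on every read.
    • Code complexity increases.
    • ByteArray - JSON - Object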
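
To make the ByteArray - JSON - Object chain concrete, here is a minimal sketch using only the JDK. The `OrderCreated` message and the `ManualJsonCodec` class are hypothetical names invented for illustration, and the hand-rolled JSON handling (no escaping, fixed field order) stands in for whatever JSON library a real project would use.

```java
import java.nio.charset.StandardCharsets;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative only: a hand-written codec showing both directions of the
// Object <-> JSON <-> byte[] conversion that each producer and consumer repeats.
class ManualJsonCodec {

    // Hypothetical message DTO; every dependent module would carry its own copy.
    record OrderCreated(long orderId, String status) {}

    // Matches exactly the JSON shape produced by serialize() below.
    private static final Pattern ORDER_JSON =
            Pattern.compile("\\{\"orderId\":(\\d+),\"status\":\"([^\"]*)\"\\}");

    // Object -> JSON -> byte[]: done by the producer before sending.
    static byte[] serialize(OrderCreated event) {
        String json = "{\"orderId\":" + event.orderId()
                + ",\"status\":\"" + event.status() + "\"}";
        return json.getBytes(StandardCharsets.UTF_8);
    }

    // byte[] -> JSON -> Object: done by every consumer on every message.
    static OrderCreated deserialize(byte[] payload) {
        String json = new String(payload, StandardCharsets.UTF_8);
        Matcher matcher = ORDER_JSON.matcher(json);
        if (!matcher.matches()) {
            // A changed message spec ends up here until this DTO is updated too.
            throw new IllegalArgumentException("Unexpected message format: " + json);
        }
        return new OrderCreated(Long.parseLong(matcher.group(1)), matcher.group(2));
    }

    public static void main(String[] args) {
        byte[] payload = serialize(new OrderCreated(42L, "PAID"));
        System.out.println(deserialize(payload));
    }
}
```

Even in this tiny case, the message format is encoded twice (once in `serialize`, once in the pattern used by `deserialize`), which is the kind of duplication that multiplies across modules.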

KafkaKRU Meetup Review

· 2 min read
Haril Song
Owner, Software Engineer at 42dot

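KafkaKRU Meetup Review: From Event Sourcing to Leader Partition Balancing

On November 21, 2024, I attended the KafkaKRU meetup held at Samhwa Tower in Jung-gu, Seoul. I was actually on the waiting list, so strictly speaking I probably should not have been able to attend, but the organizers kindly took it as enthusiasm and I was not turned away. In the end, it was far more valuable than I had expected.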

[Spring Batch] KafkaItemReader

· 2 min read
Haril Song
Owner, Software Engineer at 42dot

Info: I installed Kafka with Docker before writing this post, but that setup is not covered here.

What is KafkaItemReader?

Spring Batch provides KafkaItemReader for reading data from Kafka topics.

Let's create a simple batch job.

Example

First, add the necessary dependencies.

dependencies {
    ...
    implementation 'org.springframework.boot:spring-boot-starter-batch'
    implementation 'org.springframework.kafka:spring-kafka'
    ...
}

Configure Kafka settings in application.yml.

spring:
  kafka:
    bootstrap-servers:
      - localhost:9092
    consumer:
      group-id: batch
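
Next, define the batch job configuration.

import java.util.HashMap;
import java.util.Properties;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.kafka.KafkaItemReader;
import org.springframework.batch.item.kafka.builder.KafkaItemReaderBuilder;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
@RequiredArgsConstructor
public class KafkaSubscribeJobConfig {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final KafkaProperties kafkaProperties;

    @Bean
    Job kafkaJob() {
        return jobBuilderFactory.get("kafkaJob")
                .incrementer(new RunIdIncrementer())
                .start(step1())
                .build();
    }

    @Bean
    Step step1() {
        return stepBuilderFactory.get("step1")
                .<String, String>chunk(5)
                .reader(kafkaItemReader())
                .writer(items -> log.info("items: {}", items))
                .build();
    }

    @Bean
    KafkaItemReader<String, String> kafkaItemReader() {
        // Reuse the spring.kafka.* settings from application.yml.
        Properties properties = new Properties();
        properties.putAll(kafkaProperties.buildConsumerProperties());

        return new KafkaItemReaderBuilder<String, String>()
                .name("kafkaItemReader")
                .topic("test") // 1.
                .partitions(0) // 2.
                .partitionOffsets(new HashMap<>()) // 3.
                .consumerProperties(properties) // 4.
                .build();
    }
}
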
  1. Specify the topic to read data from.
  2. Specify the partition of the topic to read; multiple partitions can be specified.
  3. If no offsets are specified, KafkaItemReader reads each partition from offset 0. Providing an empty map makes the reader start from the offset stored in Kafka for the consumer group.
  4. Pass the consumer properties required to run: bootstrap servers, group ID, and key/value deserializers.
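
The offset rule in item 3 is easy to get backwards, so here is a small plain-Java model of it. This is a simplified sketch and not the library's code: `StartOffsetSketch` and `seekTargets` are hypothetical names, and partitions are keyed by plain integers here, whereas the real `partitionOffsets` map is keyed by Kafka's `TopicPartition`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of how the starting position depends on partitionOffsets.
class StartOffsetSketch {

    // Returns the partition -> offset pairs the reader would explicitly seek to.
    // An empty result means no explicit seek, so the consumer resumes from the
    // offset stored in Kafka for its consumer group.
    static Map<Integer, Long> seekTargets(List<Integer> partitions,
                                          Map<Integer, Long> partitionOffsets) {
        if (partitionOffsets == null) {
            // Nothing configured: start every assigned partition at offset 0.
            Map<Integer, Long> fromBeginning = new HashMap<>();
            for (Integer partition : partitions) {
                fromBeginning.put(partition, 0L);
            }
            return fromBeginning;
        }
        // Configured (possibly empty): seek only to what was explicitly given.
        return new HashMap<>(partitionOffsets);
    }

    public static void main(String[] args) {
        System.out.println("not set:   " + seekTargets(List.of(0), null));
        System.out.println("empty map: " + seekTargets(List.of(0), new HashMap<>()));
        System.out.println("explicit:  " + seekTargets(List.of(0), Map.of(0, 7L)));
    }
}
```

In the job above, the empty `HashMap` corresponds to the second case, which is why a rerun continues from where the `batch` group left off instead of replaying the topic.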

Tip: KafkaProperties exposes the spring.kafka.* settings and provides public methods, such as buildConsumerProperties(), that make Kafka convenient to use in Spring.

Try it out

Now, when you run the batch job, the consumer group is created automatically from the settings in application.yml, and the job starts reading from the topic.

Let's use the Kafka console producer to add the values 1 to 10 to the test topic.

kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test

[Image: produce-topic]

You can see that the batch job is successfully subscribing to the topic.

[Image: subscribe-batch]

Since we set the chunkSize to 5, the data is processed in batches of 5.
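
The grouping itself can be pictured with a few lines of plain Java. This is only a sketch of the batching arithmetic under the assumption that all ten messages are already available; `ChunkSketch` and `chunk` are hypothetical names, and the real step reads items one at a time from the reader rather than slicing a list.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: split a list of items into consecutive chunks.
class ChunkSketch {

    static <T> List<List<T>> chunk(List<T> items, int chunkSize) {
        List<List<T>> chunks = new ArrayList<>();
        for (int start = 0; start < items.size(); start += chunkSize) {
            int end = Math.min(start + chunkSize, items.size());
            // Copy the slice so each chunk is independent of the source list.
            chunks.add(new ArrayList<>(items.subList(start, end)));
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<String> messages = List.of("1", "2", "3", "4", "5", "6", "7", "8", "9", "10");
        // Mirrors the writer in step1, which receives one chunk per call.
        for (List<String> items : chunk(messages, 5)) {
            System.out.println("items: " + items);
        }
    }
}
```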

So far, we have looked at the basic usage of KafkaItemReader in Spring Batch. Next, let's see how to write test code.