Skip to main content

4 posts tagged with "kafka"

View All Tags

Kafka schema registry

· 6 min read
Haril Song
Owner, Software Engineer at 42dot

Problems

  • 메세지 스펙이 변경될 경우, 의존하고 있는 모듈 or Git Repository 마다 DTO 업데이트가 필요하다.
    • 하위호환성이나 상위호환성이 자주 깨진다.
    • DTO 관리의 복잡도가 선형적으로 증가한다.
    • Java 는 특히 Json 기반의 메세지를 다루기에 불편한 점이 많다.
  • 카프카는 ByteArray 형태로 메세지를 전송하나, 애플리케이션 레벨에서는 이를 역직렬화하여 관리하는 것이 권장된다.
    • payload 에 데이터를 담을 때마다 ByteArray 로 직렬화하는 과정, 그리고 이 반대 과정이 매번 이루어진다.
    • 코드 복잡도 상승
    • ByteArray - JSON - Object

ByteArray + DTO + ObjectMapper 방식

Kafka 메시지가 JSON 형식이라고 가정:

data class User(
val id: String,
val name: String,
val email: String?,
val age: Int?,
val createdAt: Long
)

val rawBytes: ByteArray = record.value()
val user = objectMapper.readValue(rawBytes, User::class.java)
  • 스키마 정보를 코드에서 직접 관리 (e.g. DTO 클래스)
  • Kafka 메시지의 구조가 JSON 포맷으로 되어 있어야 가능
  • Schema Registry 불필요
  • 메시지 구조가 바뀌면 DTO도 바뀌어야 하고, 호환성 검사 수동

GenericRecord (Avro + Schema Registry)

val record = consumerRecord.value() as GenericRecord
val name = record.get("name").toString()
  • DTO 없이도 동작 가능 (GenericRecord), 또는 generated class 사용 가능
  • 메시지 구조 변경 시 Registry의 호환성 정책으로 안전하게 진화 가능

SpecificRecord (Avro + Schema Registry)

// user.avsc
{
"type": "record",
"name": "User",
"fields": [...]
}
// 자동 생성
public class User extends SpecificRecordBase implements SpecificRecord {
private String id;
private String name;
...
}
@KafkaListener(topics = ["\${kafka.topic.user}"], groupId = "\${spring.kafka.consumer.group-id}")
fun consume(user: User) {
val userId = user.getId()
logger.info("Received user with id: {}, name: {}", userId, user.getName())

users[userId] = user
}

코드가 생성되어있기 때문에 직접 참조 가능

  • 정적 타입 지원
    • 직렬화/역직렬화 시 안정성 보장
    • IDE 지원 우수
  • Kafka Schema Registry와 완전 호환
  • 성능 우수
    • GenericRecord 는 리플렉션을 활용하여 비교적 느림

Schema 정의 및 사용

  • IntelliJ Junie 를 사용해서 샘플 작성
plugins {
id("com.github.davidmc24.gradle.plugin.avro") version "1.9.1"
}

repositories {
mavenCentral()
maven {
url = uri("https://packages.confluent.io/maven/")
}
}

dependencies {
// Avro and Schema Registry
implementation("org.apache.avro:avro:1.11.3")
implementation("io.confluent:kafka-avro-serializer:7.5.1")
implementation("io.confluent:kafka-schema-registry-client:7.5.1")
}

avro {
isCreateSetters.set(true)
isCreateOptionalGetters.set(false)
isGettersReturnOptional.set(false)
fieldVisibility.set("PRIVATE")
outputCharacterEncoding.set("UTF-8")
stringType.set("String")
templateDirectory.set(null as String?)
isEnableDecimalLogicalType.set(true)
}

User schema 정의

{
"namespace": "com.haril.kafkaschemaregistrydemo.schema",
"type": "record",
"name": "User",
"fields": [
{
"name": "id",
"type": "string"
},
{
"name": "name",
"type": "string"
},
{
"name": "email",
"type": ["null", "string"],
"default": null
},
{
"name": "age",
"type": ["null", "int"],
"default": null
},
{
"name": "createdAt",
"type": {
"type": "long",
"logicalType": "timestamp-millis"
}
}
]
}

자동으로 User 클래스가 생성된 것을 확인할 수 있고,

다른 모듈에서 참조하여 사용할 수 있다.

Schema 의 업데이트

  • 레지스트리에 스키마 정보가 없을 경우, kafka 는 메세지가 발행될 때 연결된 schema registry 에 스키마를 업로드한다.
  • Web UI 를 사용하여 업데이트할수도 있다.

Schema 호환성 정책

대표적으로는 아래와 같다.

모드설명예시
BACKWARD이전 버전의 Consumer는 새 메시지를 이해 가능필드 추가 가능, 제거는 불가
FORWARD새 버전의 Consumer는 이전 메시지를 이해 가능필드 제거 가능, 추가는 불가
FULL양방향 모두 호환제한적 변경만 허용
NONE어떤 변경도 호환성 보장 안 함변경 시 consumer crash 위험 ↑

BACKWARD 정책을 사용할 때는 스키마에 필드를 추가할 때 default 값을 지정해줘야 예전 버전의 스키마를 사용하는 Consumer 도 안전하게 스키마를 역직렬화할 수 있다. null 을 default 로 지정하는 것도 가능하며 이 경우 optional 필드임을 의미하게 된다.

Kafka Streams 는 BACKWARD 만 지원한다.

FULL 정책은 양방향 호환성이 유지되므로 편리하지만 Kafka Streams 는 BACKWARD 정책만 지원하므로 선택지가 제한된다.

만약 GenericRecord 방식으로 사용할 경우, 스키마를 동적으로 로드한다. 이 경우 스키마가 변경되더라도 서비스의 재배포가 필요없다.

ConsumerRecord<String, GenericRecord> record = ...
GenericRecord value = record.value();

Integer age = (Integer) value.get("age");
String name = value.get("name").toString();

props.put("specific.avro.reader", false) 설정으로 활성화할 수 있으며, Map 으로 사용하는 것과 비슷한 경험을 제공할 수도 있다.

항목SpecificRecordGenericRecord
사용 방식Avro 스키마로 Java/Kotlin 클래스 미리 생성런타임에 스키마 파싱 후 동적으로 사용
성능빠르고 타입 안전약간 느리고 타입 안정성 떨어짐
유연성스키마 변경 시 코드 재생성 필요스키마 변경에도 유연하게 대응 가능
권장 상황스키마가 고정된 서비스스키마가 자주 바뀌거나 다양할 때

다음과 같은 사용이라면 GenericRecord 사용을 고려할 수 있다.

  • 다양한 스키마를 처리해야 하는 Kafka consumer 플랫폼
  • 스키마 registry 기반 멀티팀 환경 (스키마 버전이 자주 바뀌는 경우)
  • Avro 스키마가 외부에 의해 관리되고 있어 내부에서 클래스를 만들기 곤란할 때

Producer 에서는 명확한 데이터 스키마가 있어야하므로 .avsc 파일을 통해 객체를 생성하고, Consumer 쪽에서는 GenericRecord 를 사용하여 동적으로 대응하는 방법도 유용하다.

Schema management and Monitoring

Landoop UI

스키마 변경 내역을 계속 기록한다.

Kafka UI 에서는 value 가 schema registry 로 변경된 것을 확인할 수 있다.

Conclusion

Pros

  • 여러개의 중첩된 DTO 를 다루는 대신, 하나의 avsc 파일만 관리하면 되서 비교적 관리 부담이 줄어든다.
  • 모든 서비스는 Schema Registry에서 실시간으로 스키마 조회
    • 메시지에 스키마 정보가 포함되지 않으므로, 네트워크 대역폭을 효율적으로 사용하게 된다
  • Kafka 메시지는 스키마 ID (magic byte + schema ID) 를 포함하므로, 컨슈머는 로컬에 .avsc가 없어도 자동 역직렬화 가능
  • 여러 팀에서 하나의 스트림 파이프라인 or 토픽에 메세지를 발행하는 경우 특히 유용
    • 파이프라인에 이상한 데이터가 들어오지 않게 된다

Cons

  • 별도 API 서버를 통해 배포해야 한다.
  • 인프라 팀과 협업이 필요하다.
  • Schema registry 가 다운될 경우 파이프라인이 멈출 수 있기 때문에, 관리 포인트가 오히려 증가할 수 도 있다.

그래서 언제 쓰면 좋을까?

  • 프로젝트 초기여서 설정부터 하는 경우
  • 하나의 파이프라인을 여러 팀이 공유하여 사용하는 경우
  • 회사에 카프카를 전문적으로 다루는 별도의 팀이 있을 경우
  • protobuf 에 익숙한 경우

Reference

KafkaKRU Meetup Review

· 2 min read
Haril Song
Owner, Software Engineer at 42dot

KafkaKRU 밋업 리뷰: Event Sourcing부터 리더 파티션 밸런싱까지

2024년 11월 21일, 서울 중구 삼화타워에서 열린 KafkaKRU 밋업에 참석했습니다. 사실 대기자 명단에 있었어서 참석이 어려운 상태였던 것 같지만, 열정으로 봐주셔서 다행히 쫓겨나지는 않았습니다. 결과적으로는 예상을 훨씬 뛰어넘는 값진 시간이었어요.

[Spring Batch] KafkaItemReader

· 2 min read
Haril Song
Owner, Software Engineer at 42dot
info

I used Docker to install Kafka before writing this post, but that content is not covered here.

What is KafkaItemReader..?

In Spring Batch, the KafkaItemReader is provided for processing data from Kafka topics.

Let's create a simple batch job.

Example

First, add the necessary dependencies.

dependencies {
...
implementation 'org.springframework.boot:spring-boot-starter-batch'
implementation 'org.springframework.kafka:spring-kafka'
...
}

Configure Kafka settings in application.yml.

spring:
kafka:
bootstrap-servers:
- localhost:9092
consumer:
group-id: batch
@Slf4j
@Configuration
@RequiredArgsConstructor
public class KafkaSubscribeJobConfig {

private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final KafkaProperties kafkaProperties;

@Bean
Job kafkaJob() {
return jobBuilderFactory.get("kafkaJob")
.incrementer(new RunIdIncrementer())
.start(step1())
.build();
}

@Bean
Step step1() {
return stepBuilderFactory.get("step1")
.<String, String>chunk(5)
.reader(kafkaItemReader())
.writer(items -> log.info("items: {}", items))
.build();
}

@Bean
KafkaItemReader<String, String> kafkaItemReader() {
Properties properties = new Properties();
properties.putAll(kafkaProperties.buildConsumerProperties());

return new KafkaItemReaderBuilder<String, String>()
.name("kafkaItemReader")
.topic("test") // 1.
.partitions(0) // 2.
.partitionOffsets(new HashMap<>()) // 3.
.consumerProperties(properties) // 4.
.build();
}
}
  1. Specify the topic from which to read the data.
  2. Specify the partition of the topic; multiple partitions can be specified.
  3. If no offset is specified in KafkaItemReader, it reads from offset 0. Providing an empty map reads from the last offset.
  4. Set the essential properties for execution.
tip

KafkaProperties provides various public interfaces to conveniently use Kafka in Spring.

Try it out

Now, when you run the batch job, consumer groups are automatically created based on the information in application.yml, and the job starts subscribing to the topic.

Let's use the kafka console producer to add data from 1 to 10 to the test topic.

kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test

produce-topic

You can see that the batch job is successfully subscribing to the topic.

subscribe-batch

Since we set the chunkSize to 5, the data is processed in batches of 5.

So far, we have looked at the basic usage of KafkaItemReader in Spring Batch. Next, let's see how to write test code.