メインコンテンツにスキップ

Kafka schema registry

· 6分の読み時間
Haril Song
Owner, Software Engineer at 42dot

Problems

  • 메세지 스펙이 변경될 경우, 의존하고 있는 모듈 or Git Repository 마다 DTO 업데이트가 필요하다.
    • 하위호환성이나 상위호환성이 자주 깨진다.
    • DTO 관리의 복잡도가 선형적으로 증가한다.
    • Java 는 특히 Json 기반의 메세지를 다루기에 불편한 점이 많다.
  • 카프카는 ByteArray 형태로 메세지를 전송하나, 애플리케이션 레벨에서는 이를 역직렬화하여 관리하는 것이 권장된다.
    • payload 에 데이터를 담을 때마다 ByteArray 로 직렬화하는 과정, 그리고 이 반대 과정이 매번 이루어진다.
    • 코드 복잡도 상승
    • ByteArray - JSON - Object

ByteArray + DTO + ObjectMapper 방식

Kafka 메시지가 JSON 형식이라고 가정:

data class User(
val id: String,
val name: String,
val email: String?,
val age: Int?,
val createdAt: Long
)

val rawBytes: ByteArray = record.value()
val user = objectMapper.readValue(rawBytes, User::class.java)
  • 스키마 정보를 코드에서 직접 관리 (e.g. DTO 클래스)
  • Kafka 메시지의 구조가 JSON 포맷으로 되어 있어야 가능
  • Schema Registry 불필요
  • 메시지 구조가 바뀌면 DTO도 바뀌어야 하고, 호환성 검사 수동

GenericRecord (Avro + Schema Registry)

val record = consumerRecord.value() as GenericRecord
val name = record.get("name").toString()
  • DTO 없이도 동작 가능 (GenericRecord), 또는 generated class 사용 가능
  • 메시지 구조 변경 시 Registry의 호환성 정책으로 안전하게 진화 가능

SpecificRecord (Avro + Schema Registry)

// user.avsc
{
"type": "record",
"name": "User",
"fields": [...]
}
// 자동 생성
public class User extends SpecificRecordBase implements SpecificRecord {
private String id;
private String name;
...
}
@KafkaListener(topics = ["\${kafka.topic.user}"], groupId = "\${spring.kafka.consumer.group-id}")
fun consume(user: User) {
val userId = user.getId()
logger.info("Received user with id: {}, name: {}", userId, user.getName())

users[userId] = user
}

코드가 생성되어있기 때문에 직접 참조 가능

  • 정적 타입 지원
    • 직렬화/역직렬화 시 안정성 보장
    • IDE 지원 우수
  • Kafka Schema Registry와 완전 호환
  • 성능 우수
    • GenericRecord 는 리플렉션을 활용하여 비교적 느림

Schema 정의 및 사용

  • IntelliJ Junie 를 사용해서 샘플 작성
plugins {
id("com.github.davidmc24.gradle.plugin.avro") version "1.9.1"
}

repositories {
mavenCentral()
maven {
url = uri("https://packages.confluent.io/maven/")
}
}

dependencies {
// Avro and Schema Registry
implementation("org.apache.avro:avro:1.11.3")
implementation("io.confluent:kafka-avro-serializer:7.5.1")
implementation("io.confluent:kafka-schema-registry-client:7.5.1")
}

avro {
isCreateSetters.set(true)
isCreateOptionalGetters.set(false)
isGettersReturnOptional.set(false)
fieldVisibility.set("PRIVATE")
outputCharacterEncoding.set("UTF-8")
stringType.set("String")
templateDirectory.set(null as String?)
isEnableDecimalLogicalType.set(true)
}

User schema 정의

{
"namespace": "com.haril.kafkaschemaregistrydemo.schema",
"type": "record",
"name": "User",
"fields": [
{
"name": "id",
"type": "string"
},
{
"name": "name",
"type": "string"
},
{
"name": "email",
"type": ["null", "string"],
"default": null
},
{
"name": "age",
"type": ["null", "int"],
"default": null
},
{
"name": "createdAt",
"type": {
"type": "long",
"logicalType": "timestamp-millis"
}
}
]
}

자동으로 User 클래스가 생성된 것을 확인할 수 있고,

다른 모듈에서 참조하여 사용할 수 있다.

Schema 의 업데이트

  • 레지스트리에 스키마 정보가 없을 경우, kafka 는 메세지가 발행될 때 연결된 schema registry 에 스키마를 업로드한다.
  • Web UI 를 사용하여 업데이트할수도 있다.

Schema 호환성 정책

대표적으로는 아래와 같다.

모드설명예시
BACKWARD이전 버전의 Consumer는 새 메시지를 이해 가능필드 추가 가능, 제거는 불가
FORWARD새 버전의 Consumer는 이전 메시지를 이해 가능필드 제거 가능, 추가는 불가
FULL양방향 모두 호환제한적 변경만 허용
NONE어떤 변경도 호환성 보장 안 함변경 시 consumer crash 위험 ↑

BACKWARD 정책을 사용할 때는 스키마에 필드를 추가할 때 default 값을 지정해줘야 예전 버전의 스키마를 사용하는 Consumer 도 안전하게 스키마를 역직렬화할 수 있다. null 을 default 로 지정하는 것도 가능하며 이 경우 optional 필드임을 의미하게 된다.

Kafka Streams 는 BACKWARD 만 지원한다.

FULL 정책은 양방향 호환성이 유지되므로 편리하지만 Kafka Streams 는 BACKWARD 정책만 지원하므로 선택지가 제한된다.

만약 GenericRecord 방식으로 사용할 경우, 스키마를 동적으로 로드한다. 이 경우 스키마가 변경되더라도 서비스의 재배포가 필요없다.

ConsumerRecord<String, GenericRecord> record = ...
GenericRecord value = record.value();

Integer age = (Integer) value.get("age");
String name = value.get("name").toString();

props.put("specific.avro.reader", false) 설정으로 활성화할 수 있으며, Map 으로 사용하는 것과 비슷한 경험을 제공할 수도 있다.

항목SpecificRecordGenericRecord
사용 방식Avro 스키마로 Java/Kotlin 클래스 미리 생성런타임에 스키마 파싱 후 동적으로 사용
성능빠르고 타입 안전약간 느리고 타입 안정성 떨어짐
유연성스키마 변경 시 코드 재생성 필요스키마 변경에도 유연하게 대응 가능
권장 상황스키마가 고정된 서비스스키마가 자주 바뀌거나 다양할 때

다음과 같은 사용이라면 GenericRecord 사용을 고려할 수 있다.

  • 다양한 스키마를 처리해야 하는 Kafka consumer 플랫폼
  • 스키마 registry 기반 멀티팀 환경 (스키마 버전이 자주 바뀌는 경우)
  • Avro 스키마가 외부에 의해 관리되고 있어 내부에서 클래스를 만들기 곤란할 때

Producer 에서는 명확한 데이터 스키마가 있어야하므로 .avsc 파일을 통해 객체를 생성하고, Consumer 쪽에서는 GenericRecord 를 사용하여 동적으로 대응하는 방법도 유용하다.

Schema management and Monitoring

Landoop UI

스키마 변경 내역을 계속 기록한다.

Kafka UI 에서는 value 가 schema registry 로 변경된 것을 확인할 수 있다.

Conclusion

Pros

  • 여러개의 중첩된 DTO 를 다루는 대신, 하나의 avsc 파일만 관리하면 되서 비교적 관리 부담이 줄어든다.
  • 모든 서비스는 Schema Registry에서 실시간으로 스키마 조회
    • 메시지에 스키마 정보가 포함되지 않으므로, 네트워크 대역폭을 효율적으로 사용하게 된다
  • Kafka 메시지는 스키마 ID (magic byte + schema ID) 를 포함하므로, 컨슈머는 로컬에 .avsc가 없어도 자동 역직렬화 가능
  • 여러 팀에서 하나의 스트림 파이프라인 or 토픽에 메세지를 발행하는 경우 특히 유용
    • 파이프라인에 이상한 데이터가 들어오지 않게 된다

Cons

  • 별도 API 서버를 통해 배포해야 한다.
  • 인프라 팀과 협업이 필요하다.
  • Schema registry 가 다운될 경우 파이프라인이 멈출 수 있기 때문에, 관리 포인트가 오히려 증가할 수 도 있다.

그래서 언제 쓰면 좋을까?

  • 프로젝트 초기여서 설정부터 하는 경우
  • 하나의 파이프라인을 여러 팀이 공유하여 사용하는 경우
  • 회사에 카프카를 전문적으로 다루는 별도의 팀이 있을 경우
  • protobuf 에 익숙한 경우

Reference