Kafka Schema Registry
Problems
- When the message specification changes, DTO updates are required for each dependent module or Git Repository.
- Backward or forward compatibility is often broken.
- The complexity of DTO management increases linearly.
- Java has many inconveniences, especially when dealing with JSON-based messages.
- Kafka transmits messages as a `ByteArray`, but it is recommended to deserialize and manage them at the application level.
  - Serializing data into a `ByteArray` every time it is put into a payload, and the reverse process on consumption, happens for every message.
  - Increased code complexity: `ByteArray` → JSON → Object
ByteArray + DTO + ObjectMapper Approach
Assuming the Kafka message is in JSON format:
```kotlin
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper

data class User(
    val id: String,
    val name: String,
    val email: String?,
    val age: Int?,
    val createdAt: Long
)

// jackson-module-kotlin is needed to bind Kotlin data classes without a default constructor
val objectMapper = jacksonObjectMapper()

// Deserialize the raw payload into the DTO on every consumed record
val rawBytes: ByteArray = record.value()
val user = objectMapper.readValue(rawBytes, User::class.java)
```
- Schema information is managed directly in the code (e.g., DTO class)
- The structure of the Kafka message must be in JSON format
- Schema Registry is not required
- If the message structure changes, the DTO must also change, and compatibility checks are manual
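For contrast with the Avro-based approaches below, here is a minimal sketch of the producing side under this approach. The topic name and the `KafkaTemplate` bean are assumptions rather than part of the original sample; the point is that every publish pays the Object → JSON → `ByteArray` conversion cost by hand.

```kotlin
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import org.springframework.kafka.core.KafkaTemplate

private val objectMapper = jacksonObjectMapper()

// Hypothetical producer: the DTO is serialized manually on every send.
fun publish(kafkaTemplate: KafkaTemplate<String, ByteArray>, user: User) {
    val payload: ByteArray = objectMapper.writeValueAsBytes(user) // Object -> JSON -> ByteArray
    kafkaTemplate.send("user-topic", user.id, payload)            // "user-topic" is a placeholder
}
```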
GenericRecord (Avro + Schema Registry)
```kotlin
val record = consumerRecord.value() as GenericRecord
val name = record.get("name").toString()
```
- Can operate without a DTO (`GenericRecord`), or use a generated class
- When the message structure changes, it can evolve safely under the Registry's compatibility policy
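To make the "no DTO" case concrete, here is an illustrative sketch of building a record directly from a schema parsed at runtime. The inline schema is a trimmed-down, hypothetical version of the `User` schema defined later; in practice the schema usually comes from a file or the Registry.

```kotlin
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.avro.generic.GenericRecordBuilder

// Parse the schema at runtime (inlined here for brevity)
val schema: Schema = Schema.Parser().parse(
    """{"type":"record","name":"User","fields":[
        {"name":"id","type":"string"},
        {"name":"name","type":"string"}]}"""
)

// Build the record by field name; no generated class involved
val genericUser: GenericRecord = GenericRecordBuilder(schema)
    .set("id", "u-1")
    .set("name", "alice")
    .build()
```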
SpecificRecord (Avro + Schema Registry)
`user.avsc`:

```json
{
  "type": "record",
  "name": "User",
  "fields": [...]
}
```
```java
// Auto-generated
public class User extends SpecificRecordBase implements SpecificRecord {
    private String id;
    private String name;
    ...
}
```
```kotlin
@KafkaListener(topics = ["\${kafka.topic.user}"], groupId = "\${spring.kafka.consumer.group-id}")
fun consume(user: User) {
    val userId = user.getId()
    logger.info("Received user with id: {}, name: {}", userId, user.getName())
    users[userId] = user
}
```
The code is generated, so it can be referenced directly.
- Static type support
- Ensures stability during serialization/deserialization
- Excellent IDE support
- Fully compatible with Kafka Schema Registry
- High performance (GenericRecord is relatively slow due to reflection)
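The listener above assumes a consumer configured for SpecificRecord deserialization. A minimal sketch of that configuration, with placeholder addresses, might look like this:

```kotlin
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.springframework.kafka.core.DefaultKafkaConsumerFactory

// Placeholder addresses; the essentials are the Avro deserializer, the registry URL,
// and specific.avro.reader=true so records are bound to the generated User class.
val consumerProps: Map<String, Any> = mapOf(
    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092",
    ConsumerConfig.GROUP_ID_CONFIG to "user-consumer",
    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to "org.apache.kafka.common.serialization.StringDeserializer",
    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to "io.confluent.kafka.serializers.KafkaAvroDeserializer",
    "schema.registry.url" to "http://localhost:8081",
    "specific.avro.reader" to true
)
val consumerFactory = DefaultKafkaConsumerFactory<String, User>(consumerProps)
```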
Schema Definition and Usage
- Sample created using IntelliJ Junie
```kotlin
plugins {
    id("com.github.davidmc24.gradle.plugin.avro") version "1.9.1"
}

repositories {
    mavenCentral()
    maven {
        url = uri("https://packages.confluent.io/maven/")
    }
}

dependencies {
    // Avro and Schema Registry
    implementation("org.apache.avro:avro:1.11.3")
    implementation("io.confluent:kafka-avro-serializer:7.5.1")
    implementation("io.confluent:kafka-schema-registry-client:7.5.1")
}

avro {
    isCreateSetters.set(true)
    isCreateOptionalGetters.set(false)
    isGettersReturnOptional.set(false)
    fieldVisibility.set("PRIVATE")
    outputCharacterEncoding.set("UTF-8")
    stringType.set("String")
    templateDirectory.set(null as String?)
    isEnableDecimalLogicalType.set(true)
}
```
User schema definition
```json
{
  "namespace": "com.haril.kafkaschemaregistrydemo.schema",
  "type": "record",
  "name": "User",
  "fields": [
    { "name": "id", "type": "string" },
    { "name": "name", "type": "string" },
    { "name": "email", "type": ["null", "string"], "default": null },
    { "name": "age", "type": ["null", "int"], "default": null },
    { "name": "createdAt", "type": { "type": "long", "logicalType": "timestamp-millis" } }
  ]
}
```
You can see that the `User` class is automatically generated and can be referenced and used in other modules.
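As a usage sketch, the producing side can work directly with the generated class. The topic name and the `KafkaTemplate` bean below are assumptions, and the value serializer is expected to be `KafkaAvroSerializer` pointing at the registry:

```kotlin
import java.time.Instant
import org.springframework.kafka.core.KafkaTemplate

// Hypothetical producer using the generated User class.
// createdAt is an Instant here because recent Avro versions map timestamp-millis to java.time by default.
fun publishUser(kafkaTemplate: KafkaTemplate<String, User>) {
    val user = User.newBuilder()
        .setId("u-1")
        .setName("alice")
        .setCreatedAt(Instant.now())
        .build()
    kafkaTemplate.send("user-topic", user.id, user) // "user-topic" is a placeholder
}
```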
Schema Updates
- If the schema is not yet in the registry, the serializer registers it with the connected Schema Registry when a message is published (see the sketch below).
- It can also be updated using the Web UI.
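A sketch of the producer-side settings that enable this behaviour. The addresses are placeholders; `auto.register.schemas` already defaults to true in the Confluent serializer:

```kotlin
import org.apache.kafka.clients.producer.ProducerConfig

val producerProps: Map<String, Any> = mapOf(
    ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092",
    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to "org.apache.kafka.common.serialization.StringSerializer",
    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to "io.confluent.kafka.serializers.KafkaAvroSerializer",
    "schema.registry.url" to "http://localhost:8081",
    // When the subject is missing from the registry, the serializer uploads the schema on first publish.
    "auto.register.schemas" to true
)
```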
Schema Compatibility Policies
The main policies are as follows:
| Mode | Description | Example |
|---|---|---|
| BACKWARD | Consumers using the new schema can read messages written with the old schema | Fields can be deleted; added fields must have defaults |
| FORWARD | Consumers using the old schema can read messages written with the new schema | Fields can be added; only fields with defaults can be deleted |
| FULL | Compatible in both directions | Only limited changes (optional fields with defaults) are allowed |
| NONE | No compatibility guarantees at all | High risk of consumer crashes when the schema changes |
When using the BACKWARD policy, you must specify a default value when adding a field to the schema so that consumers on the new schema can still deserialize messages written with the old schema (the default fills in the missing field). Specifying `null` as the default is also possible, which makes the field optional.
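As an illustration (not part of the original sample), Avro can check this locally. The trimmed-down, hypothetical v1/v2 pair below shows why the added field needs a default:

```kotlin
import org.apache.avro.Schema
import org.apache.avro.SchemaCompatibility

val v1: Schema = Schema.Parser().parse(
    """{"type":"record","name":"User","fields":[{"name":"id","type":"string"}]}"""
)
val v2: Schema = Schema.Parser().parse(
    """{"type":"record","name":"User","fields":[
        {"name":"id","type":"string"},
        {"name":"nickname","type":["null","string"],"default":null}]}"""
)

// BACKWARD question: can a reader on v2 decode data written with v1?
// The default fills in "nickname" for old records, so this reports COMPATIBLE.
val result = SchemaCompatibility.checkReaderWriterCompatibility(v2, v1)
println(result.type)
```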
The FULL policy is convenient because it maintains bidirectional compatibility, but Kafka Streams only supports the BACKWARD policy, which limits the options.
If you use the `GenericRecord` approach, the schema is loaded dynamically. In this case, even if the schema changes, the service does not need to be redeployed.
```java
ConsumerRecord<String, GenericRecord> record = ...
GenericRecord value = record.value();
Integer age = (Integer) value.get("age");
String name = value.get("name").toString();
```
It can be enabled with the `props.put("specific.avro.reader", false)` setting and provides an experience similar to working with a `Map`.
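A small sketch of that Map-like experience: iterate over whatever fields the runtime schema exposes instead of binding to a class (the helper name is made up):

```kotlin
import org.apache.avro.generic.GenericRecord

// Turn any GenericRecord into a field-name -> value map, regardless of which schema version produced it.
fun toFieldMap(value: GenericRecord): Map<String, Any?> =
    value.schema.fields.associate { field -> field.name() to value.get(field.name()) }
```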
| Item | SpecificRecord | GenericRecord |
|---|---|---|
| Usage | Pre-generate Java/Kotlin classes from the Avro schema | Parse the schema at runtime and use it dynamically |
| Performance | Fast and type-safe | Slightly slower and less type-safe |
| Flexibility | Requires code regeneration on schema changes | Responds flexibly to schema changes |
| Recommended for | Services with a fixed schema | When schemas change frequently or are diverse |
The following use cases are good candidates for `GenericRecord`:
- Kafka consumer platform that needs to handle various schemas
- Schema registry-based multi-team environment (if schema versions change frequently)
- When the Avro schema is managed externally, making it difficult to create classes internally
A useful approach is to generate objects from `.avsc` files on the Producer side, where a clear data schema is needed, and to use `GenericRecord` on the Consumer side for a flexible, dynamic response.
Schema Management and Monitoring
Landoop UI
It continuously records the schema change history.
In the Kafka UI, you can see that the message value is now resolved through the Schema Registry.
Conclusion
Pros
- Instead of dealing with multiple nested DTOs, you only need to manage one avsc file, which reduces the management burden.
- All services query the schema in real-time from the Schema Registry
- Since the message does not contain schema information, network bandwidth is used efficiently.
- Kafka messages include a schema ID (magic byte + schema ID), so consumers can deserialize automatically without having the `.avsc` file locally.
- Particularly useful when multiple teams publish messages to a single stream pipeline or topic.
- Prevents malformed data from entering the pipeline.
Cons
- The Schema Registry must be deployed and operated as a separate API server.
- Collaboration with the infrastructure team is required.
- An infrastructure layer like AWS Glue Schema Registry needs to be set up.
- If the Schema Registry goes down, the pipeline may stop, which adds another component to manage and monitor.
So when is it good to use?
- When setting up a project from the beginning
- When multiple teams share a single pipeline
- When there is a dedicated team in the company that specializes in Kafka
- When you are already familiar with schema-based serialization such as Protobuf