メインコンテンツにスキップ

Kafkaスキーマレジストリ

· 8分の読み時間
Haril Song
Owner, Software Engineer at 42dot

問題点

  • メッセージ仕様が変更された場合、依存するモジュールやGitリポジトリごとにDTOの更新が必要です。
    • 下位互換性や上位互換性が頻繁に壊れます。
    • DTO管理の複雑さが線形的に増加します。
    • Javaは特にJSONベースのメッセージを扱うのに不便な点が多いです。
  • KafkaはByteArray形式でメッセージを送信しますが、アプリケーションレベルではこれをデシリアライズして管理することが推奨されます。
    • ペイロードにデータを格納するたびにByteArrayにシリアライズするプロセス、およびその逆のプロセスが毎回行われます。
    • コードの複雑さが増加します。
    • ByteArray - JSON - Object

ByteArray + DTO + ObjectMapper方式

KafkaメッセージがJSON形式であると仮定します:

data class User(
val id: String,
val name: String,
val email: String?,
val age: Int?,
val createdAt: Long
)

val rawBytes: ByteArray = record.value()
val user = objectMapper.readValue(rawBytes, User::class.java)
  • スキーマ情報をコードで直接管理(例:DTOクラス)
  • Kafkaメッセージの構造がJSON形式である必要があります
  • スキーマレジストリは不要
  • メッセージ構造が変わるとDTOも変更する必要があり、互換性チェックは手動

GenericRecord (Avro + スキーマレジストリ)

val record = consumerRecord.value() as GenericRecord
val name = record.get("name").toString()
  • DTOなしでも動作可能(GenericRecord)、または生成されたクラスを使用可能
  • メッセージ構造が変更された場合、レジストリの互換性ポリシーにより安全に進化可能

SpecificRecord (Avro + スキーマレジストリ)

// user.avsc
{
"type": "record",
"name": "User",
"fields": [...]
}
// 自動生成
public class User extends SpecificRecordBase implements SpecificRecord {
private String id;
private String name;
...
}
@KafkaListener(topics = ["\${kafka.topic.user}"], groupId = "\${spring.kafka.consumer.group-id}")
fun consume(user: User) {
val userId = user.getId()
logger.info("Received user with id: {}, name: {}", userId, user.getName())

users[userId] = user
}

コードが生成されているため、直接参照可能です。

  • 静的型サポート
    • シリアライズ/デシリアライズ時の安定性を保証
    • IDEのサポートが優れている
  • Kafkaスキーマレジストリと完全互換
  • 高性能
    • GenericRecordはリフレクションを利用するため比較的に遅い

スキーマの定義と使用

  • IntelliJ Junieを使用してサンプルを作成
plugins {
id("com.github.davidmc24.gradle.plugin.avro") version "1.9.1"
}

repositories {
mavenCentral()
maven {
url = uri("https://packages.confluent.io/maven/")
}
}

dependencies {
// Avro and Schema Registry
implementation("org.apache.avro:avro:1.11.3")
implementation("io.confluent:kafka-avro-serializer:7.5.1")
implementation("io.confluent:kafka-schema-registry-client:7.5.1")
}

avro {
isCreateSetters.set(true)
isCreateOptionalGetters.set(false)
isGettersReturnOptional.set(false)
fieldVisibility.set("PRIVATE")
outputCharacterEncoding.set("UTF-8")
stringType.set("String")
templateDirectory.set(null as String?)
isEnableDecimalLogicalType.set(true)
}

Userスキーマの定義

{
"namespace": "com.haril.kafkaschemaregistrydemo.schema",
"type": "record",
"name": "User",
"fields": [
{
"name": "id",
"type": "string"
},
{
"name": "name",
"type": "string"
},
{
"name": "email",
"type": ["null", "string"],
"default": null
},
{
"name": "age",
"type": ["null", "int"],
"default": null
},
{
"name": "createdAt",
"type": {
"type": "long",
"logicalType": "timestamp-millis"
}
}
]
}

Userクラスが自動的に生成されたことが確認でき、

他のモジュールから参照して使用できます。

スキーマの更新

  • レジストリにスキーマ情報がない場合、Kafkaはメッセージが発行される際に接続されたスキーマレジストリにスキーマをアップロードします。
  • Web UIを使用して更新することもできます。

スキーマ互換性ポリシー

代表的なポリシーは以下の通りです:

モード説明
BACKWARD旧バージョンのコンシューマは新しいメッセージを理解可能フィールド追加可能、削除は不可
FORWARD新バージョンのコンシューマは古いメッセージを理解可能フィールド削除可能、追加は不可
FULL双方向で互換性あり限定的な変更のみ許可
NONEどの変更も互換性を保証しない変更時にコンシューマがクラッシュする危険性↑

BACKWARDポリシーを使用する場合、スキーマにフィールドを追加する際にデフォルト値を指定する必要があります。これにより、旧バージョンのスキーマを使用するコンシューマも安全にスキーマをデシリアライズできます。nullをデフォルトとして指定することも可能で、この場合はオプションフィールドであることを意味します。

Kafka StreamsはBACKWARDのみをサポートします。

FULLポリシーは双方向の互換性が維持されるため便利ですが、Kafka StreamsはBACKWARDポリシーのみをサポートするため、選択肢が限られます。

GenericRecord方式を使用する場合、スキーマは動的にロードされます。この場合、スキーマが変更されてもサービスの再デプロイは不要です。

ConsumerRecord<String, GenericRecord> record = ...
GenericRecord value = record.value();

Integer age = (Integer) value.get("age");
String name = value.get("name").toString();

props.put("specific.avro.reader", false)設定で有効にでき、Mapを使用するのと同様の体験を提供できます。

項目SpecificRecordGenericRecord
使用方法AvroスキーマからJava/Kotlinクラスを事前に生成実行時にスキーマを解析して動的に使用
パフォーマンス高速で型安全やや遅く、型安全性が低い
柔軟性スキーマ変更時にコードの再生成が必要スキーマ変更に柔軟に対応可能
推奨状況スキーマが固定されたサービススキーマが頻繁に変わるか多様な場合

以下のような使用状況では、GenericRecordの使用を検討できます:

  • 様々なスキーマを処理する必要があるKafkaコンシューマプラットフォーム
  • スキーマレジストリベースのマルチチーム環境(スキーマバージョンが頻繁に変わる場合)
  • Avroスキーマが外部で管理されており、内部でクラスを作成するのが困難な場合

プロデューサーでは明確なデータスキーマが必要なため、.avscファイルを通じてオブジェクトを生成し、コンシューマー側ではGenericRecordを使用して動的に対応する方法も有用です。

スキーマ管理とモニタリング

Landoop UI

スキーマの変更履歴を継続的に記録します。

Kafka UIでは、値がスキーマレジストリに変更されたことを確認できます。

結論

利点

  • 複数のネストされたDTOを扱う代わりに、1つのavscファイルだけを管理すればよいため、管理負担が比較的軽減されます。
  • すべてのサービスはスキーマレジストリからリアルタイムでスキーマを照会
    • メッセージにスキーマ情報が含まれないため、ネットワーク帯域幅を効率的に使用できます。
  • Kafkaメッセージには**スキーマID(マジックバイト + スキーマID)**が含まれるため、コンシューマはローカルに.avscがなくても自動的にデシリアライズできます。
  • 複数のチームが1つのストリームパイプラインやトピックにメッセージを発行する場合に特に有用です。
    • パイプラインに不正なデータが入るのを防ぎます。

欠点

  • 別のAPIサーバーを通じてデプロイする必要があります。
  • インフラチームとの協力が必要です。
  • スキーマレジストリがダウンした場合、パイプラインが停止する可能性があり、管理ポイントが増える可能性があります。

いつ使うのが良いか?

  • プロジェクトの初期段階で設定から始める場合
  • 複数のチームが1つのパイプラインを共有して使用する場合
  • 会社にKafkaを専門的に扱う別のチームがある場合
  • protobufに慣れている場合

参考文献