メインコンテンツにスキップ

2件の投稿件の投稿が「spring-batch」タグ付き

すべてのタグを見る

[Spring Batch] KafkaItemReader

· 3分の読み時間
Haril Song
Owner, Software Engineer at 42dot
情報

この記事を書く前にDockerを使ってKafkaをインストールしましたが、その内容はここでは扱いません。

KafkaItemReaderとは..?

Spring Batchでは、Kafkaトピックからデータを処理するためにKafkaItemReaderが提供されています。

簡単なバッチジョブを作成してみましょう。

まず、必要な依存関係を追加します。

dependencies {
...
implementation 'org.springframework.boot:spring-boot-starter-batch'
implementation 'org.springframework.kafka:spring-kafka'
...
}

application.ymlにKafkaの設定を行います。

spring:
kafka:
bootstrap-servers:
- localhost:9092
consumer:
group-id: batch
@Slf4j
@Configuration
@RequiredArgsConstructor
public class KafkaSubscribeJobConfig {

private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final KafkaProperties kafkaProperties;

@Bean
Job kafkaJob() {
return jobBuilderFactory.get("kafkaJob")
.incrementer(new RunIdIncrementer())
.start(step1())
.build();
}

@Bean
Step step1() {
return stepBuilderFactory.get("step1")
.<String, String>chunk(5)
.reader(kafkaItemReader())
.writer(items -> log.info("items: {}", items))
.build();
}

@Bean
KafkaItemReader<String, String> kafkaItemReader() {
Properties properties = new Properties();
properties.putAll(kafkaProperties.buildConsumerProperties());

return new KafkaItemReaderBuilder<String, String>()
.name("kafkaItemReader")
.topic("test") // 1.
.partitions(0) // 2.
.partitionOffsets(new HashMap<>()) // 3.
.consumerProperties(properties) // 4.
.build();
}
}
  1. データを読み取るトピックを指定します。
  2. トピックのパーティションを指定します。複数のパーティションを指定することも可能です。
  3. KafkaItemReaderでオフセットを指定しない場合、オフセット0から読み取ります。空のマップを提供すると、最後のオフセットから読み取ります。
  4. 実行に必要なプロパティを設定します。
ヒント

KafkaPropertiesは、SpringでKafkaを便利に使用するためのさまざまな公開インターフェースを提供します。

試してみる

さて、バッチジョブを実行すると、application.ymlの情報に基づいてconsumer groupsが自動的に作成され、ジョブがトピックの購読を開始します。

kafka console producerを使って、testトピックに1から10までのデータを追加してみましょう。

kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test

produce-topic

バッチジョブがトピックを正常に購読していることがわかります。

subscribe-batch

chunkSizeを5に設定したので、データは5件ずつバッチ処理されます。

ここまで、Spring BatchでのKafkaItemReaderの基本的な使い方を見てきました。次に、テストコードの書き方を見てみましょう。

[Spring Batch] カスタム制約ライターの実装

· 6分の読み時間
Haril Song
Owner, Software Engineer at 42dot

状況 🧐

最近、特定のロジックのために PostgreSQLUpsert を使用するバッチプロセスを設計しました。実装中に、ビジネス要件の変更により、複合一意条件に特定のカラムを追加する必要がありました。

問題は、複合一意カラムの一意制約が、特定のカラムに null 値が含まれている場合に重複を防止しないことから発生しました。

問題の状況を例で見てみましょう。

create table student
(
id integer not null
constraint student_pk
primary key,
name varchar,
major varchar,
constraint student_unique
unique (name, major)
);
idnamemajor
1songkorean
2kimenglish
3parkmath
4kimNULL
5kimNULL

null の重複を避けるために、ダミーデータを挿入するというアイデアが自然に浮かびましたが、データベースに意味のないデータを保存するのは気が進みませんでした。特に、null が発生するカラムが UUID のような複雑なデータを保存する場合、他の値の中に埋もれた意味のない値を識別するのは非常に困難です。

少し面倒ではありますが、unique partial index を使用することで、ダミーデータを挿入せずに null 値を許可しないようにすることができます。私は、たとえ挑戦的であっても、最も理想的な解決策を追求することにしました。

解決策

部分インデックス

CREATE UNIQUE INDEX stu_2col_uni_idx ON student (name, major)
WHERE major IS NOT NULL;

CREATE UNIQUE INDEX stu_1col_uni_idx ON student (name)
WHERE major IS NULL;

PostgreSQL は部分インデックスの機能を提供しています。

部分インデックス : 特定の条件が満たされた場合にのみインデックスを作成する機能。インデックスの範囲を絞ることで、効率的なインデックス作成とメンテナンスが可能になります。

name のみの値が挿入される場合、stu_1col_uni_idxmajornull の同じ name を持つ行を1行のみ許可します。2つの補完的なインデックスを作成することで、特定のカラムに null 値が含まれる重複を巧妙に防ぐことができます。

duplicate error major がない値を保存しようとするとエラーが発生します

しかし、このように2つの一意制約がある場合、Upsert 実行中に1つの制約チェックしか許可されないため、バッチは意図した通りに実行されませんでした。

多くの検討の末、SQLを実行する前に特定の値が欠落しているかどうかを確認し、条件を満たすSQLを実行することにしました。

SelectConstraintWriter の実装

public class SelectConstraintWriter extends JdbcBatchItemWriter<Student> {

@Setter
private String anotherSql;

@Override
public void write(List<? extends Student> items) {
if (items.isEmpty()) {
return;
}

List<? extends Student> existMajorStudents = items.stream()
.filter(student -> student.getMajor() != null)
.collect(toList());

List<? extends Student> nullMajorStudents = items.stream()
.filter(student -> student.getMajor() == null)
.collect(toList());

executeSql(existMajorStudents, sql);
executeSql(nullMajorStudents, anotherSql);
}

private void executeSql(List<? extends student> students, String sql) {
if (logger.isDebugEnabled()) {
logger.debug("Executing batch with " + students.size() + " items.");
}

int[] updateCounts;

if (usingNamedParameters) {
if (this.itemSqlParameterSourceProvider == null) {
updateCounts = namedParameterJdbcTemplate.batchUpdate(sql, students.toArray(new Map[students.size()]));
} else {
SqlParameterSource[] batchArgs = new SqlParameterSource[students.size()];
int i = 0;
for (student item : students) {
batchArgs[i++] = itemSqlParameterSourceProvider.createSqlParameterSource(item);
}
updateCounts = namedParameterJdbcTemplate.batchUpdate(sql, batchArgs);
}
} else {
updateCounts = namedParameterJdbcTemplate.getJdbcOperations().execute(sql,
(PreparedStatementCallback<int[]>) ps -> {
for (student item : students) {
itemPreparedStatementSetter.setValues(item, ps);
ps.addBatch();
}
return ps.executeBatch();
});
}

if (assertUpdates) {
for (int i = 0; i < updateCounts.length; i++) {
int value = updateCounts[i];
if (value == 0) {
throw new EmptyResultDataAccessException("Item " + i + " of " + updateCounts.length
+ " did not update any rows: [" + students.get(i) + "]", 1);
}
}
}
}
}

以前使用していた JdbcBatchItemWriterwrite メソッドをオーバーライドすることでこれを実装しました。コード内で major の存在を確認し、適切なSQLを選択して実行することで、duplicateKeyException に遭遇することなく Upsert ステートメントが正しく動作するようにします。

使用例は以下の通りです:

@Bean
SelectConstraintWriter studentItemWriter() {
String sql1 =
"INSERT INTO student(id, name, major) "
+ "VALUES (nextval('hibernate_sequence'), :name, :major) "
+ "ON CONFLICT (name, major) WHERE major IS NOT NULL "
+ "DO UPDATE "
+ "SET name = :name, "
+ " major = :major";

String sql2 =
"INSERT INTO student(id, name, major) "
+ "VALUES (nextval('hibernate_sequence'), :name, :major) "
+ "ON CONFLICT (name) WHERE major IS NULL "
+ "DO UPDATE "
+ "SET name = :name, "
+ " major = :major";

SelectConstraintWriter writer = new SelectConstraintWriter();
writer.setSql(sql1);
writer.setAnotherSql(sql2);
writer.setDataSource(dataSource);
writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
writer.afterPropertiesSet();
return writer;
}

結論

PostgreSQLUpsert 実行中に複数の制約チェックを許可していれば、ここまでの手間をかける必要はなかったのは残念です。将来のバージョンでの更新を期待しています。


参考

create unique constraint with null columns