メインコンテンツにスキップ

[Spring Batch] KafkaItemReader

· 3分の読み時間
Haril Song
Owner, Software Engineer at 42dot
情報

この記事を書く前にDockerを使ってKafkaをインストールしましたが、その内容はここでは扱いません。

KafkaItemReaderとは..?

Spring Batchでは、Kafkaトピックからデータを処理するためにKafkaItemReaderが提供されています。

簡単なバッチジョブを作成してみましょう。

まず、必要な依存関係を追加します。

dependencies {
...
implementation 'org.springframework.boot:spring-boot-starter-batch'
implementation 'org.springframework.kafka:spring-kafka'
...
}

application.ymlにKafkaの設定を行います。

spring:
kafka:
bootstrap-servers:
- localhost:9092
consumer:
group-id: batch
@Slf4j
@Configuration
@RequiredArgsConstructor
public class KafkaSubscribeJobConfig {

private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final KafkaProperties kafkaProperties;

@Bean
Job kafkaJob() {
return jobBuilderFactory.get("kafkaJob")
.incrementer(new RunIdIncrementer())
.start(step1())
.build();
}

@Bean
Step step1() {
return stepBuilderFactory.get("step1")
.<String, String>chunk(5)
.reader(kafkaItemReader())
.writer(items -> log.info("items: {}", items))
.build();
}

@Bean
KafkaItemReader<String, String> kafkaItemReader() {
Properties properties = new Properties();
properties.putAll(kafkaProperties.buildConsumerProperties());

return new KafkaItemReaderBuilder<String, String>()
.name("kafkaItemReader")
.topic("test") // 1.
.partitions(0) // 2.
.partitionOffsets(new HashMap<>()) // 3.
.consumerProperties(properties) // 4.
.build();
}
}
  1. データを読み取るトピックを指定します。
  2. トピックのパーティションを指定します。複数のパーティションを指定することも可能です。
  3. KafkaItemReaderでオフセットを指定しない場合、オフセット0から読み取ります。空のマップを提供すると、最後のオフセットから読み取ります。
  4. 実行に必要なプロパティを設定します。
ヒント

KafkaPropertiesは、SpringでKafkaを便利に使用するためのさまざまな公開インターフェースを提供します。

試してみる

さて、バッチジョブを実行すると、application.ymlの情報に基づいてconsumer groupsが自動的に作成され、ジョブがトピックの購読を開始します。

kafka console producerを使って、testトピックに1から10までのデータを追加してみましょう。

kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test

produce-topic

バッチジョブがトピックを正常に購読していることがわかります。

subscribe-batch

chunkSizeを5に設定したので、データは5件ずつバッチ処理されます。

ここまで、Spring BatchでのKafkaItemReaderの基本的な使い方を見てきました。次に、テストコードの書き方を見てみましょう。

Qodanaで簡単に静的コード解析を行う方法

· 3分の読み時間
Haril Song
Owner, Software Engineer at 42dot

Qodanaとは?

Qodanaは、JetBrainsが提供するコード品質向上ツールです。非常に使いやすいので、簡単に紹介したいと思います。

まず、Dockerがインストールされた環境が必要です。

docker run --rm -it -p 8080:8080 \
-v <source-directory>/:/data/project/ \
-v <output-directory>/:/data/results/ \
jetbrains/qodana-jvm --show-report

私はJavaアプリケーションを解析しているので、jvmイメージを使用しました。別の言語を使用している場合は、Qodanaのウェブサイトで適切なイメージを見つけることができます。

  • <source-directory>には解析したいプロジェクトのパスを置き換えてください。
  • <output-directory>には解析結果を保存するパスを入力してください。これについては後ほど説明します。

解析結果を保存するために、ルートディレクトリにqodanaというフォルダを作成しました。

mkdir ~/qodana
# そして<output-directory>を~/qodanaに置き換えます。

次に、上記のdocker run ~コマンドを実行し、しばらく待つと以下のような結果が表示されます。

私はテスト用にシンプルなJavaアプリケーションを使用しました。

image

今、http://localhost:8080 にアクセスすると、コード解析の結果を見ることができます。

image1

Dockerがインストールされていれば、現在のプロジェクトのコード解析結果を簡単に取得できます。

このような解析ツールはコードレビューの一形態として機能し、レビュアーの疲労を軽減し、より詳細なレビューに集中できるようにします。このようなコード品質管理ツールを積極的に活用することで、非常に便利な開発体験を得ることができます。

[Spring Batch] カスタム制約ライターの実装

· 6分の読み時間
Haril Song
Owner, Software Engineer at 42dot

状況 🧐

最近、特定のロジックのために PostgreSQLUpsert を使用するバッチプロセスを設計しました。実装中に、ビジネス要件の変更により、複合一意条件に特定のカラムを追加する必要がありました。

問題は、複合一意カラムの一意制約が、特定のカラムに null 値が含まれている場合に重複を防止しないことから発生しました。

問題の状況を例で見てみましょう。

create table student
(
id integer not null
constraint student_pk
primary key,
name varchar,
major varchar,
constraint student_unique
unique (name, major)
);
idnamemajor
1songkorean
2kimenglish
3parkmath
4kimNULL
5kimNULL

null の重複を避けるために、ダミーデータを挿入するというアイデアが自然に浮かびましたが、データベースに意味のないデータを保存するのは気が進みませんでした。特に、null が発生するカラムが UUID のような複雑なデータを保存する場合、他の値の中に埋もれた意味のない値を識別するのは非常に困難です。

少し面倒ではありますが、unique partial index を使用することで、ダミーデータを挿入せずに null 値を許可しないようにすることができます。私は、たとえ挑戦的であっても、最も理想的な解決策を追求することにしました。

解決策

部分インデックス

CREATE UNIQUE INDEX stu_2col_uni_idx ON student (name, major)
WHERE major IS NOT NULL;

CREATE UNIQUE INDEX stu_1col_uni_idx ON student (name)
WHERE major IS NULL;

PostgreSQL は部分インデックスの機能を提供しています。

部分インデックス : 特定の条件が満たされた場合にのみインデックスを作成する機能。インデックスの範囲を絞ることで、効率的なインデックス作成とメンテナンスが可能になります。

name のみの値が挿入される場合、stu_1col_uni_idxmajornull の同じ name を持つ行を1行のみ許可します。2つの補完的なインデックスを作成することで、特定のカラムに null 値が含まれる重複を巧妙に防ぐことができます。

duplicate error major がない値を保存しようとするとエラーが発生します

しかし、このように2つの一意制約がある場合、Upsert 実行中に1つの制約チェックしか許可されないため、バッチは意図した通りに実行されませんでした。

多くの検討の末、SQLを実行する前に特定の値が欠落しているかどうかを確認し、条件を満たすSQLを実行することにしました。

SelectConstraintWriter の実装

public class SelectConstraintWriter extends JdbcBatchItemWriter<Student> {

@Setter
private String anotherSql;

@Override
public void write(List<? extends Student> items) {
if (items.isEmpty()) {
return;
}

List<? extends Student> existMajorStudents = items.stream()
.filter(student -> student.getMajor() != null)
.collect(toList());

List<? extends Student> nullMajorStudents = items.stream()
.filter(student -> student.getMajor() == null)
.collect(toList());

executeSql(existMajorStudents, sql);
executeSql(nullMajorStudents, anotherSql);
}

private void executeSql(List<? extends student> students, String sql) {
if (logger.isDebugEnabled()) {
logger.debug("Executing batch with " + students.size() + " items.");
}

int[] updateCounts;

if (usingNamedParameters) {
if (this.itemSqlParameterSourceProvider == null) {
updateCounts = namedParameterJdbcTemplate.batchUpdate(sql, students.toArray(new Map[students.size()]));
} else {
SqlParameterSource[] batchArgs = new SqlParameterSource[students.size()];
int i = 0;
for (student item : students) {
batchArgs[i++] = itemSqlParameterSourceProvider.createSqlParameterSource(item);
}
updateCounts = namedParameterJdbcTemplate.batchUpdate(sql, batchArgs);
}
} else {
updateCounts = namedParameterJdbcTemplate.getJdbcOperations().execute(sql,
(PreparedStatementCallback<int[]>) ps -> {
for (student item : students) {
itemPreparedStatementSetter.setValues(item, ps);
ps.addBatch();
}
return ps.executeBatch();
});
}

if (assertUpdates) {
for (int i = 0; i < updateCounts.length; i++) {
int value = updateCounts[i];
if (value == 0) {
throw new EmptyResultDataAccessException("Item " + i + " of " + updateCounts.length
+ " did not update any rows: [" + students.get(i) + "]", 1);
}
}
}
}
}

以前使用していた JdbcBatchItemWriterwrite メソッドをオーバーライドすることでこれを実装しました。コード内で major の存在を確認し、適切なSQLを選択して実行することで、duplicateKeyException に遭遇することなく Upsert ステートメントが正しく動作するようにします。

使用例は以下の通りです:

@Bean
SelectConstraintWriter studentItemWriter() {
String sql1 =
"INSERT INTO student(id, name, major) "
+ "VALUES (nextval('hibernate_sequence'), :name, :major) "
+ "ON CONFLICT (name, major) WHERE major IS NOT NULL "
+ "DO UPDATE "
+ "SET name = :name, "
+ " major = :major";

String sql2 =
"INSERT INTO student(id, name, major) "
+ "VALUES (nextval('hibernate_sequence'), :name, :major) "
+ "ON CONFLICT (name) WHERE major IS NULL "
+ "DO UPDATE "
+ "SET name = :name, "
+ " major = :major";

SelectConstraintWriter writer = new SelectConstraintWriter();
writer.setSql(sql1);
writer.setAnotherSql(sql2);
writer.setDataSource(dataSource);
writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
writer.afterPropertiesSet();
return writer;
}

結論

PostgreSQLUpsert 実行中に複数の制約チェックを許可していれば、ここまでの手間をかける必要はなかったのは残念です。将来のバージョンでの更新を期待しています。


参考

create unique constraint with null columns

[Kotlin] 中置関数

· 2分の読み時間
Haril Song
Owner, Software Engineer at 42dot

Kotlinでは、中置関数と呼ばれる関数の定義方法があります。これは、Javaを主要な言語として使用していた時には想像もできなかった構文です。Kotlinを始めたばかりの方に向けて、これを紹介しましょう。

単一のパラメータを持つメンバー関数は、中置関数に変換することができます。

中置関数の代表的な例の一つに、標準ライブラリに含まれている to 関数があります。

val pair = "Ferrari" to "Katrina"
println(pair)
// (Ferrari, Katrina)

必要に応じて、to のような新しい中置関数を定義することもできます。例えば、Int を次のように拡張することができます:

infix fun Int.times(str: String) = str.repeat(this)
println(2 times "Hello ")
// Hello Hello

to を新しい中置関数 onto として再定義したい場合は、次のように書くことができます:

infix fun String.onto(other: String) = Pair(this, other)
val myPair = "McLaren" onto "Lucas"
println(myPair)
// (McLaren, Lucas)

このようなKotlinの構文により、非常に独特なコーディング方法が可能になります。

class Person(val name: String) {
val likedPeople = mutableListOf<Person>()

infix fun likes(other: Person) {
likedPeople.add(other)
}
}

fun main() {
val sophia = Person("Sophia")
val claudia = Person("Claudia")

sophia likes claudia // !!
}

参考

Kotlin Docs

[Kotlin] 拡張されたループ

· 3分の読み時間
Haril Song
Owner, Software Engineer at 42dot

Kotlinでは、Javaに比べてはるかにシンプルで便利なループを書くことができます。どのように使うか見てみましょう。

1. .. 演算子

val fruits = listOf("Apple", "Banana", "Cherry", "Durian")

fun main() {
for (index in 0..fruits.size - 1) {
val fruit = fruits[index]
println("$index: $fruit")
}
}

.. を使うと、1ずつインクリメントする従来のループが作成されます。

2. downTo

val fruits = listOf("Apple", "Banana", "Cherry", "Durian")

fun main() {
for (index in fruits.size - 1 downTo 0) {
val fruit = fruits[index]
println("$index: $fruit")
}
}

downTo を使うと、期待通りにデクリメントするループが作成されます。

3. step

val fruits = listOf("Apple", "Banana", "Cherry", "Durian")

fun main() {
for (index in 0..fruits.size - 1 step 2) {
val fruit = fruits[index]
println("$index: $fruit")
}
}

step キーワードを使うと、特定の数の要素をスキップするループを実装できます。これは downTo にも適用されます。

4. until

val fruits = listOf("Apple", "Banana", "Cherry", "Durian")

fun main() {
for (index in 0 until fruits.size) {
val fruit = fruits[index]
println("$index: $fruit")
}
}

until を使うと、最後の数を含まないループが作成され、-1 を使う必要がなくなります。

5. lastIndex

val fruits = listOf("Apple", "Banana", "Cherry", "Durian")

fun main() {
for (index in 0 .. fruits.lastIndex) {
val fruit = fruits[index]
println("$index: $fruit")
}
}

lastIndex プロパティを使うと、ループが読みやすくなります。しかし、もちろんまだ他にも方法があります。

6. indices

val fruits = listOf("Apple", "Banana", "Cherry", "Durian")

fun main() {
for (index in fruits.indices) {
val fruit = fruits[index]
println("$index: $fruit")
}
}

indices はコレクションのインデックス範囲を返します。

7. withIndex()

val fruits = listOf("Apple", "Banana", "Cherry", "Durian")

fun main() {
for ((index, fruit) in fruits.withIndex()) {
println("$index: $fruit")
}
}

withIndex() を使うと、インデックスと値を同時に抽出でき、コードがシンプルになります。これはPythonのシンプルさに似ています。これでほとんどのループシナリオに対応できますが、もう一つ方法があります。

8. forEachIndexed

val fruits = listOf("Apple", "Banana", "Cherry", "Durian")

fun main() {
fruits.forEachIndexed { index, fruit ->
println("$index: $fruit")
}
}

forEachIndexed にラムダ関数を使うと、コードがより簡潔で直感的になります。ニーズに合った適切な方法を選びましょう。


参考

Kotlin Tips: Loops

Kotlinで地球の楕円体を利用する

· 4分の読み時間
Haril Song
Owner, Software Engineer at 42dot

背景

earth 画像の参照1

地球が平らでも完全な球体でもなく、不規則な楕円体であることを考えると、異なる経度と緯度の2点間の距離を迅速かつ正確に計算するための完璧な公式は存在しません。

しかし、geotoolsライブラリを使用することで、数学的に補正された近似値を簡単に取得することができます。

依存関係の追加

geotoolsで地球の楕円体を使用するには、関連するライブラリの依存関係を追加する必要があります。

repositories {
maven { url "https://repo.osgeo.org/repository/release/" }
maven { url "https://download.osgeo.org/webdav/geotools/" }
mavenCentral()
}

dependencies {
...
implementation 'org.geotools:gt-referencing:26.2'
...
}

コードの記述

まず、ソウルと釜山の座標をenumクラスとして定義します。

enum class City(val latitude: Double, val longitude: Double) {
SEOUL(37.5642135, 127.0016985),
BUSAN(35.1104, 129.0431);
}

次に、テストコードを通じて簡単な使用例を見てみましょう。

class EllipsoidTest {

@Test
internal fun createEllipsoid() {
val ellipsoid = DefaultEllipsoid.WGS84 // GPSで使用されるWGS84測地系を使用して、地球に最も近い楕円体を作成

val isSphere = ellipsoid.isSphere // 球体か楕円体かを判定
val semiMajorAxis = ellipsoid.semiMajorAxis // 赤道半径、楕円体の長い半径
val semiMinorAxis = ellipsoid.semiMinorAxis // 極半径、楕円体の短い半径
val eccentricity = ellipsoid.eccentricity // 離心率、楕円体が球体にどれだけ近いかを示す
val inverseFlattening = ellipsoid.inverseFlattening // 逆扁平率の値
val ivfDefinitive = ellipsoid.isIvfDefinitive // この楕円体に対して逆扁平率が決定的かどうかを示す

// 大円距離
val orthodromicDistance = ellipsoid.orthodromicDistance(
City.SEOUL.longitude,
City.SEOUL.latitude,
City.BUSAN.longitude,
City.BUSAN.latitude
)

println("isSphere = $isSphere")
println("semiMajorAxis = $semiMajorAxis")
println("semiMinorAxis = $semiMinorAxis")
println("eccentricity = $eccentricity")
println("inverseFlattening = $inverseFlattening")
println("ivfDefinitive = $ivfDefinitive")
println("orthodromicDistance = $orthodromicDistance")
}
}
isSphere = false
semiMajorAxis = 6378137.0
semiMinorAxis = 6356752.314245179
eccentricity = 0.08181919084262128
inverseFlattening = 298.257223563
ivfDefinitive = true
orthodromicDistance = 328199.9794919944

DefaultEllipsoid.WGS84を使用して地球の楕円体を作成できます。WGS84の代わりにSPHEREを使用すると、半径6371kmの球体が作成されます。

距離の結果はメートル(m)で表示されるため、キロメートルに変換すると約328kmになります。Googleで検索すると325kmと表示されるかもしれませんが、私が選んだ座標とGoogleが選んだ座標の違いを考慮すると、これは悪くない数字です。

他にも多くの機能がありますが、すべてをこの投稿でカバーするのは難しいため、必要に応じて別の投稿で取り上げます。

情報

ビジネス要件によっては誤差が満足できない場合があるため、実際の実装前にgeotoolsの他の方法を十分にテストしてください。


Footnotes

  1. SRIDと座標系の概要

@JsonNamingの使い方

· 3分の読み時間
Haril Song
Owner, Software Engineer at 42dot

APIで使用されるJSONの命名規則が、アプリケーション内の命名戦略と異なる場合があります。

{
"Title": "Frozen",
"Year": "2013",
"Type": "movie",
"Poster": "https://m.media-amazon.com/images/M/MV5BMTQ1MjQwMTE5OF5BMl5BanBnXkFtZTgwNjk3MTcyMDE@._V1_SX300.jpg",
"imdbID": "tt2294629"
}
private String title;
private String year;
private String imdbId;
private String type;
private String poster;

変数名がJSONのキーと一致しない場合、データは正しくマッピングされません。

このような場合、プロジェクト内の変数名を変更せずにデータをマッピングするために@JsonProperty(value)を使用できます。しかし、多くのフィールドが異なる命名戦略を持つ場合、各フィールドに@JsonProperty(value)を使用すると、コードが多くのアノテーションで乱雑になります。

ここで@JsonNamingアノテーションが役立ちます。クラス全体の命名戦略を一度に変更することができます。

@JsonNaming

v2.12以前

以下のようにエレガントに解決できます:

@Data
@JsonNaming(value = PropertyNamingStrategy.UpperCamelCaseStrategy.class)
public class Movie {

private String title;
private String year;

@JsonProperty("imdbID") // 必要な場合のみ!
private String imdbId;
private String type;
private String poster;

}

image この方法は非推奨で、取り消し線が引かれています。

しかし、この方法はJackson 2.12以降非推奨となったため、新しいアプローチを見てみましょう。

v2.12以降

バージョン2.12以降では、PropertyNamingStrategiesを使用する必要があります。

@JsonNaming(value = PropertyNamingStrategies.UpperCamelCaseStrategy.class)

内部実装の詳細な説明は長くなりすぎるため割愛しますが、非常に興味深い実装となっているので、一度見てみることをお勧めします!

情報

簡単に言うと、更新された内部実装にはNamingBaseという抽象クラスが含まれており、これは元のPropertyNamingStrategyを継承し、その後命名戦略がNamingBaseを継承します。NamingBaseは中間実装クラスとして使用されます。