
2 posts tagged with "spring-batch"


[Spring Batch] KafkaItemReader

· 2 min read
info

I used Docker to install Kafka before writing this post, but that content is not covered here.

What is KafkaItemReader?

In Spring Batch, the KafkaItemReader is provided for processing data from Kafka topics.

Let's create a simple batch job.

Example

First, add the necessary dependencies.

dependencies {
    ...
    implementation 'org.springframework.boot:spring-boot-starter-batch'
    implementation 'org.springframework.kafka:spring-kafka'
    ...
}

Configure Kafka settings in application.yml.

spring:
  kafka:
    bootstrap-servers:
      - localhost:9092
    consumer:
      group-id: batch
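
Next, write the job configuration. The step below reads from the Kafka topic with a KafkaItemReader and simply logs each chunk.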
@Slf4j
@Configuration
@RequiredArgsConstructor
public class KafkaSubscribeJobConfig {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final KafkaProperties kafkaProperties;

    @Bean
    Job kafkaJob() {
        return jobBuilderFactory.get("kafkaJob")
                .incrementer(new RunIdIncrementer())
                .start(step1())
                .build();
    }

    @Bean
    Step step1() {
        return stepBuilderFactory.get("step1")
                .<String, String>chunk(5)
                .reader(kafkaItemReader())
                .writer(items -> log.info("items: {}", items))
                .build();
    }

    @Bean
    KafkaItemReader<String, String> kafkaItemReader() {
        Properties properties = new Properties();
        properties.putAll(kafkaProperties.buildConsumerProperties());

        return new KafkaItemReaderBuilder<String, String>()
                .name("kafkaItemReader")
                .topic("test")                     // 1.
                .partitions(0)                     // 2.
                .partitionOffsets(new HashMap<>()) // 3.
                .consumerProperties(properties)    // 4.
                .build();
    }
}
  1. Specify the topic from which to read the data.
  2. Specify the partition(s) of the topic; multiple partitions can be specified.
  3. If no partition offsets are specified, the reader starts from offset 0. Providing an empty map makes it start from the last offset committed by the consumer group (see the sketch after this list).
  4. Set the consumer properties required for execution.
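
For illustration, the reader can also start from an explicit offset. Here is a minimal sketch, assuming a hypothetical starting offset of 42 on partition 0 of the test topic:

Map<TopicPartition, Long> offsets = new HashMap<>();
offsets.put(new TopicPartition("test", 0), 42L); // hypothetical starting offset

KafkaItemReader<String, String> reader = new KafkaItemReaderBuilder<String, String>()
        .name("offsetKafkaItemReader")
        .topic("test")
        .partitions(0)
        .partitionOffsets(offsets) // explicit offsets instead of an empty map
        .consumerProperties(properties)
        .build();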
tip

Spring Boot's KafkaProperties exposes convenient public methods, such as buildConsumerProperties(), for reusing the Kafka settings defined in application.yml.

Try it out

Now, when you run the batch job, the consumer group is created automatically from the settings in application.yml, and the job starts subscribing to the topic.

Let's use the Kafka console producer to add the values 1 through 10 to the test topic.

kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
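
Each line typed at the producer prompt becomes one message, so the input looks roughly like this:

>1
>2
...
>10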

(Screenshot: produce-topic)

You can see that the batch job is successfully subscribing to the topic.

(Screenshot: subscribe-batch)

Since we set the chunkSize to 5, the data is processed in batches of 5.
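
With the writer above logging each chunk, the output should contain two lines roughly like the following (illustrative; the exact ordering depends on Kafka):

items: [1, 2, 3, 4, 5]
items: [6, 7, 8, 9, 10]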

So far, we have looked at the basic usage of KafkaItemReader in Spring Batch. Next, let's see how to write test code.

[Spring Batch] Implementing Custom Constraint Writer

· 4 min read

Situation 🧐

Recently, I designed a batch process that uses PostgreSQL's Upsert for certain logic. During implementation, a change in business requirements meant I had to add another column to a composite unique constraint.

The issue was that the composite unique constraint does not prevent duplicates when the added column contains NULL: in SQL, NULL is never equal to NULL, so rows that differ only in having NULL still pass the uniqueness check.

Let's take a look at an example of the problematic situation.

create table student
(
    id    integer not null
        constraint student_pk
            primary key,
    name  varchar,
    major varchar,
    constraint student_unique
        unique (name, major)
);
id | name | major
---|------|--------
 1 | song | korean
 2 | kim  | english
 3 | park | math
 4 | kim  | NULL
 5 | kim  | NULL
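
To reproduce rows 4 and 5 above, note that both of the following inserts succeed even though (name, major) is declared unique:

-- Both succeed: the NULL majors never compare equal, so student_unique does not fire.
INSERT INTO student (id, name, major) VALUES (4, 'kim', NULL);
INSERT INTO student (id, name, major) VALUES (5, 'kim', NULL);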

To prevent these NULL duplicates, the idea of inserting dummy data instead of NULL naturally came to mind, but I was reluctant to store meaningless values in the database. In particular, if the nullable column stores complex data such as UUIDs, meaningless placeholder values would be very hard to pick out from the real ones.

Although it is a bit more cumbersome, a unique partial index lets us disallow these duplicates without inserting dummy data. I decided to pursue the most ideal solution, even if it is challenging.

Solution

Partial Index

CREATE UNIQUE INDEX stu_2col_uni_idx ON student (name, major)
WHERE major IS NOT NULL;

CREATE UNIQUE INDEX stu_1col_uni_idx ON student (name)
WHERE major IS NULL;

PostgreSQL provides the functionality of partial indexes.

Partial Index: an index built only over the rows that satisfy a given condition. Narrowing the scope of the index in this way makes it more efficient to create and maintain.

When a row is inserted with only a name, stu_1col_uni_idx allows only one row with that name and a NULL major. Together, the two complementary indexes neatly prevent duplicates involving NULL in a specific column.
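
For instance, with both partial indexes in place, a second name-only row is now rejected. A minimal demonstration (the names here are made up):

INSERT INTO student (id, name) VALUES (6, 'lee'); -- succeeds
INSERT INTO student (id, name) VALUES (7, 'lee'); -- fails: duplicate key value violates unique constraint "stu_1col_uni_idx"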

(Screenshot: duplicate error. An error occurs when trying to store a value without major.)

However, with two unique indexes like this, an Upsert can check only one constraint per statement (PostgreSQL's ON CONFLICT accepts a single conflict target), so the batch did not run as intended.

After much deliberation, I decided to check whether major is present before executing the SQL, and then run the statement that matches each case.

Implementing SelectConstraintWriter

public class SelectConstraintWriter extends JdbcBatchItemWriter<Student> {

    @Setter
    private String anotherSql;

    @Override
    public void write(List<? extends Student> items) {
        if (items.isEmpty()) {
            return;
        }

        List<? extends Student> existMajorStudents = items.stream()
                .filter(student -> student.getMajor() != null)
                .collect(toList());

        List<? extends Student> nullMajorStudents = items.stream()
                .filter(student -> student.getMajor() == null)
                .collect(toList());

        executeSql(existMajorStudents, sql);
        executeSql(nullMajorStudents, anotherSql);
    }

    private void executeSql(List<? extends Student> students, String sql) {
        if (logger.isDebugEnabled()) {
            logger.debug("Executing batch with " + students.size() + " items.");
        }

        int[] updateCounts;

        if (usingNamedParameters) {
            if (this.itemSqlParameterSourceProvider == null) {
                updateCounts = namedParameterJdbcTemplate.batchUpdate(sql, students.toArray(new Map[students.size()]));
            } else {
                SqlParameterSource[] batchArgs = new SqlParameterSource[students.size()];
                int i = 0;
                for (Student item : students) {
                    batchArgs[i++] = itemSqlParameterSourceProvider.createSqlParameterSource(item);
                }
                updateCounts = namedParameterJdbcTemplate.batchUpdate(sql, batchArgs);
            }
        } else {
            updateCounts = namedParameterJdbcTemplate.getJdbcOperations().execute(sql,
                    (PreparedStatementCallback<int[]>) ps -> {
                        for (Student item : students) {
                            itemPreparedStatementSetter.setValues(item, ps);
                            ps.addBatch();
                        }
                        return ps.executeBatch();
                    });
        }

        if (assertUpdates) {
            for (int i = 0; i < updateCounts.length; i++) {
                int value = updateCounts[i];
                if (value == 0) {
                    throw new EmptyResultDataAccessException("Item " + i + " of " + updateCounts.length
                            + " did not update any rows: [" + students.get(i) + "]", 1);
                }
            }
        }
    }
}

I implemented this by overriding the write method of the JdbcBatchItemWriter we were already using. By checking whether major is present and selecting the matching SQL, the Upsert statement works correctly instead of failing with a DuplicateKeyException.

Here is an example of usage:

@Bean
SelectConstraintWriter studentItemWriter() {
    String sql1 =
            "INSERT INTO student(id, name, major) "
                    + "VALUES (nextval('hibernate_sequence'), :name, :major) "
                    + "ON CONFLICT (name, major) WHERE major IS NOT NULL "
                    + "DO UPDATE "
                    + "SET name = :name, "
                    + "    major = :major";

    String sql2 =
            "INSERT INTO student(id, name, major) "
                    + "VALUES (nextval('hibernate_sequence'), :name, :major) "
                    + "ON CONFLICT (name) WHERE major IS NULL "
                    + "DO UPDATE "
                    + "SET name = :name, "
                    + "    major = :major";

    SelectConstraintWriter writer = new SelectConstraintWriter();
    writer.setSql(sql1);
    writer.setAnotherSql(sql2);
    writer.setDataSource(dataSource);
    writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
    writer.afterPropertiesSet();
    return writer;
}
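
For completeness, here is a minimal sketch of wiring this writer into a chunk-oriented step; the step name, chunk size, and studentItemReader() bean are assumptions for illustration:

@Bean
Step studentStep() {
    return stepBuilderFactory.get("studentStep")
            .<Student, Student>chunk(100) // chunk size is arbitrary here
            .reader(studentItemReader())  // hypothetical reader bean
            .writer(studentItemWriter())
            .build();
}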

Conclusion

It is a pity that PostgreSQL allows only a single constraint check per Upsert; if it allowed more, none of this effort would have been necessary. I hope for improvements in future versions.


Reference

create unique constraint with null columns