/**
 * Spring Batch configuration for a single-step job that reads {@code String}
 * key/value records from a Kafka topic and logs every chunk it receives.
 *
 * <p>The Kafka consumer settings are taken from the injected
 * {@link KafkaProperties}; the topic, partition and chunk size are fixed
 * by the constants declared below.
 */
@Slf4j
@Configuration
@RequiredArgsConstructor
public class KafkaSubscribeJobConfig {

    /** Name under which the job is registered with the job builder factory. */
    private static final String JOB_NAME = "kafkaJob";

    /** Name under which the only step is registered with the step builder factory. */
    private static final String STEP_NAME = "step1";

    /** Name given to the Kafka item reader. */
    private static final String READER_NAME = "kafkaItemReader";

    /** Kafka topic the reader consumes from. */
    private static final String TOPIC = "test";

    /** The single partition of {@link #TOPIC} assigned to the reader. */
    private static final int PARTITION = 0;

    /** Number of items handed to the writer per chunk. */
    private static final int CHUNK_SIZE = 5;

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final KafkaProperties kafkaProperties;

    /**
     * Defines the job: a {@link RunIdIncrementer} is attached and the job
     * consists solely of {@link #step1()}.
     *
     * @return the assembled job
     */
    @Bean
    Job kafkaJob() {
        Step onlyStep = step1();
        return jobBuilderFactory.get(JOB_NAME)
                .incrementer(new RunIdIncrementer())
                .start(onlyStep)
                .build();
    }

    /**
     * Defines the chunk-oriented step: items come from
     * {@link #kafkaItemReader()} in chunks of {@link #CHUNK_SIZE} and each
     * chunk is written to the log at INFO level. No processor is configured.
     *
     * @return the assembled step
     */
    @Bean
    Step step1() {
        KafkaItemReader<String, String> reader = kafkaItemReader();
        return stepBuilderFactory.get(STEP_NAME)
                .<String, String>chunk(CHUNK_SIZE)
                .reader(reader)
                .writer(chunk -> log.info("items: {}", chunk))
                .build();
    }

    /**
     * Builds the Kafka reader for partition {@link #PARTITION} of topic
     * {@link #TOPIC}, using the consumer properties produced by
     * {@link KafkaProperties#buildConsumerProperties()}.
     *
     * @return the configured reader
     */
    @Bean
    KafkaItemReader<String, String> kafkaItemReader() {
        // Copy the consumer settings into a java.util.Properties instance,
        // which is the type handed to the reader builder below.
        Properties consumerProps = new Properties();
        consumerProps.putAll(kafkaProperties.buildConsumerProperties());

        KafkaItemReaderBuilder<String, String> builder = new KafkaItemReaderBuilder<>();
        builder.name(READER_NAME);
        builder.consumerProperties(consumerProps);
        builder.topic(TOPIC);
        builder.partitions(PARTITION);
        // NOTE(review): an empty offsets map is passed on purpose rather than
        // omitted; presumably this makes the reader resume from the offsets
        // committed for the consumer group instead of a fixed start offset.
        // Confirm against the Spring Batch version in use.
        builder.partitionOffsets(new HashMap<>());
        return builder.build();
    }
}