AdditionalSkillBatchLoader.java

package fr.avenirsesr.portfolio.additionalskill.infrastructure.batch;

import fr.avenirsesr.portfolio.additionalskill.domain.model.AdditionalSkill;
import fr.avenirsesr.portfolio.additionalskill.domain.port.input.RomeAdditionalSkillService;
import fr.avenirsesr.portfolio.additionalskill.domain.port.output.RomeAdditionalSkillApi;
import fr.avenirsesr.portfolio.additionalskill.infrastructure.adapter.mapper.AdditionalSkillMapper;
import fr.avenirsesr.portfolio.additionalskill.infrastructure.adapter.model.Competence;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.SkipListener;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.job.flow.support.SimpleFlow;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.transaction.PlatformTransactionManager;

@Slf4j
@Configuration
@Profile("!test")
@RequiredArgsConstructor
public class AdditionalSkillBatchLoader {
  private final RomeAdditionalSkillApi romeAdditionalSkillApi;
  private final RomeAdditionalSkillService romeAdditionalSkillService;

  @Bean
  public Job importROME4SkillJob(JobRepository jobRepository, Flow importROME4SkillFlow) {
    return new JobBuilder("importROME4SkillJob", jobRepository)
        .incrementer(new RunIdIncrementer())
        .start(importROME4SkillFlow)
        .end()
        .listener(jobResultListener())
        .build();
  }

  @Bean
  public Flow importROME4SkillFlow(
      Step checkROME4VersionUpdateStep, Step cleanROME4SkillStep, Step importROME4SkillStep) {
    return new FlowBuilder<SimpleFlow>("importROME4SkillFlow")
        .start(checkROME4VersionUpdateStep)
        .on("NOOP")
        .end()
        .from(checkROME4VersionUpdateStep)
        .on("*")
        .to(cleanROME4SkillStep)
        .next(importROME4SkillStep)
        .build();
  }

  @Bean
  public Step checkROME4VersionUpdateStep(
      JobRepository jobRepository, PlatformTransactionManager transactionManager) {
    return new StepBuilder("checkROME4VersionUpdateStep", jobRepository)
        .tasklet(
            (contribution, chunkContext) -> {
              boolean isNewVersion = romeAdditionalSkillService.checkRomeVersionUpdated();

              if (!isNewVersion) {
                log.info(
                    "checkROME4VersionUpdateStep (NOOP) because there are no updates to ROME 4.0");
                contribution.setExitStatus(ExitStatus.NOOP);
              } else {
                log.info(
                    "checkROME4VersionUpdateStep (COMPLETED) because there are updates to ROME"
                        + " 4.0");
              }

              return RepeatStatus.FINISHED;
            },
            transactionManager)
        .build();
  }

  @Bean
  public Step cleanROME4SkillStep(
      JobRepository jobRepository, PlatformTransactionManager transactionManager) {
    return new StepBuilder("cleanROME4SkillStep", jobRepository)
        .tasklet(
            (contribution, chunkContext) -> {
              romeAdditionalSkillService.cleanAndCreateAdditionalSkillIndex();
              return RepeatStatus.FINISHED;
            },
            transactionManager)
        .build();
  }

  @Bean
  public Step importROME4SkillStep(
      JobRepository jobRepository, PlatformTransactionManager transactionManager) {
    return new StepBuilder("importROME4SkillStep", jobRepository)
        .<Competence, AdditionalSkill>chunk(100, transactionManager)
        .reader(itemReader())
        .processor(itemProcessor())
        .writer(itemWriter())
        .faultTolerant()
        .skipPolicy(
            (throwable, skipCount) -> {
              log.error("Error while importing skills", throwable);
              return throwable instanceof RuntimeException;
            })
        .listener(skipListener())
        .build();
  }

  @Bean
  public ItemReader<Competence> itemReader() {
    return new ItemReader<>() {
      private Iterator<Competence> iterator;

      @Override
      public Competence read() {
        if (iterator == null) {
          try {
            List<Competence> data = romeAdditionalSkillApi.fetchAdditionalSkills();
            iterator = data.iterator();
          } catch (Exception e) {
            log.error("Error ROME4.0 API : {}", e.getMessage());
            iterator = Collections.emptyIterator();
          }
        }
        return iterator.hasNext() ? iterator.next() : null;
      }
    };
  }

  @Bean
  public ItemProcessor<Competence, AdditionalSkill> itemProcessor() {
    return AdditionalSkillMapper::createToDomain;
  }

  @Bean
  public ItemWriter<AdditionalSkill> itemWriter() {
    return additionalSkills -> {
      List<AdditionalSkill> additionalSkillList = new ArrayList<>(additionalSkills.getItems());
      romeAdditionalSkillService.synchronizeAndIndexAdditionalSkills(additionalSkillList);
    };
  }

  @Bean
  public SkipListener<Competence, AdditionalSkill> skipListener() {
    return new SkipListener<>() {
      @Override
      public void onSkipInRead(Throwable t) {
        log.error("Skip in reading (API) : {}", t.getMessage());
      }

      @Override
      public void onSkipInProcess(Competence item, Throwable t) {
        log.error("Skip in processing for {} : {}", item, t.getMessage());
      }

      @Override
      public void onSkipInWrite(AdditionalSkill item, Throwable t) {
        log.error("Skip in writing for {} : {}", item, t.getMessage());
      }
    };
  }

  @Bean
  public JobExecutionListener jobResultListener() {
    return new JobExecutionListener() {
      @Override
      public void afterJob(JobExecution jobExecution) {
        Predicate<StepExecution> condition =
            stepExec ->
                stepExec.getReadCount() == 0
                    && stepExec.getWriteCount() == 0
                    && stepExec.getSkipCount() >= 0;

        List<StepExecution> filtered =
            jobExecution.getStepExecutions().stream()
                .filter(
                    stepExec ->
                        !"checkROME4VersionUpdateStep".equals(stepExec.getStepName())
                            || stepExec.getExitStatus().compareTo(ExitStatus.NOOP) != 0)
                .toList();

        boolean allSkippedOrEmpty = !filtered.isEmpty() && filtered.stream().allMatch(condition);

        if (allSkippedOrEmpty) {
          jobExecution.setStatus(BatchStatus.FAILED);
          jobExecution.setExitStatus(
              new ExitStatus("FAILED", "All steps skipped, no data processed"));
          log.error("Job completed as FAILED because all steps were skipped/empty.");
        }
      }
    };
  }
}