// OpenSearchIndexImpl.java

package fr.avenirsesr.portfolio.additionalskill.infrastructure.adapter.opensearch;

import fr.avenirsesr.portfolio.additionalskill.domain.model.AdditionalSkill;
import fr.avenirsesr.portfolio.additionalskill.domain.model.AdditionalSkillCategory;
import fr.avenirsesr.portfolio.additionalskill.domain.model.AdditionalSkillPagedResult;
import fr.avenirsesr.portfolio.additionalskill.domain.model.enums.EAdditionalSkillCategoryType;
import fr.avenirsesr.portfolio.additionalskill.domain.model.enums.EAdditionalSkillType;
import fr.avenirsesr.portfolio.additionalskill.domain.port.output.OpenSearchIndex;
import fr.avenirsesr.portfolio.common.data.domain.model.PageCriteria;
import fr.avenirsesr.portfolio.common.data.domain.model.PageInfo;
import java.io.IOException;
import java.time.Instant;
import java.util.*;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.client.indices.CreateIndexRequest;
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;

@Slf4j
@RequiredArgsConstructor
@Component
@Profile("!test")
public class OpenSearchIndexImpl implements OpenSearchIndex {
  private final RestHighLevelClient client;

  @Override
  @CacheEvict(value = AdditionalSkillConstants.INDEX)
  public void cleanAndCreateAdditionalSkillIndex() {
    try {
      if (client
          .indices()
          .exists(new GetIndexRequest(AdditionalSkillConstants.INDEX), RequestOptions.DEFAULT)) {
        client
            .indices()
            .delete(new DeleteIndexRequest(AdditionalSkillConstants.INDEX), RequestOptions.DEFAULT);
        log.info("Index '{}' deleted.", AdditionalSkillConstants.INDEX);
      }

      client
          .indices()
          .create(new CreateIndexRequest(AdditionalSkillConstants.INDEX), RequestOptions.DEFAULT);
      log.info("Index '{}' created.", AdditionalSkillConstants.INDEX);
    } catch (IOException e) {
      throw new RuntimeException("Failed to recreate index: " + AdditionalSkillConstants.INDEX, e);
    }
  }

  @Override
  public void indexAll(List<AdditionalSkill> additionalSkillList) {
    BulkRequest bulkRequest = new BulkRequest();

    for (AdditionalSkill additionalSkill : additionalSkillList) {
      try {
        Map<String, Object> source = getAdditionalSkillSourceMap(additionalSkill);
        bulkRequest.add(
            new IndexRequest(AdditionalSkillConstants.INDEX)
                .id(additionalSkill.getId().toString())
                .source(source));
      } catch (Exception e) {
        throw new RuntimeException("Failed additionalSkill: " + additionalSkill.getId(), e);
      }
    }

    try {
      BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT);
      if (response.hasFailures()) {
        log.error("Bulk indexing errors: {}", response.buildFailureMessage());
      }
    } catch (IOException e) {
      throw new RuntimeException("OpenSearchIndex bulk indexing failed", e);
    } finally {
      log.info("Bulk indexing succeeded for {} documents", additionalSkillList.size());
    }
  }

  @Override
  @Cacheable(
      value = AdditionalSkillConstants.INDEX,
      key = "#keyword + '_' + #pageCriteria.page() + '_' + #pageCriteria.pageSize()")
  public AdditionalSkillPagedResult search(String keyword, PageCriteria pageCriteria) {
    SearchRequest searchRequest = new SearchRequest(AdditionalSkillConstants.INDEX);

    SearchSourceBuilder sourceBuilder =
        new SearchSourceBuilder()
            .query(
                QueryBuilders.matchPhrasePrefixQuery(
                    AdditionalSkillConstants.FIELD_SKILL_LIBELLE, keyword))
            .from(pageCriteria.page() * pageCriteria.pageSize())
            .size(pageCriteria.pageSize())
            .trackTotalHits(true);

    searchRequest.source(sourceBuilder);

    try {
      SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
      long totalHits = response.getHits().getTotalHits().value;

      List<AdditionalSkill> additionalSkillList = getAdditionalSkillList(response);

      return new AdditionalSkillPagedResult(
          additionalSkillList,
          new PageInfo(pageCriteria.page(), pageCriteria.pageSize(), totalHits));
    } catch (IOException e) {
      throw new RuntimeException(
          "OpenSearchIndex search failed with param keyword="
              + keyword
              + ", page="
              + pageCriteria.page()
              + ", size="
              + pageCriteria.pageSize(),
          e);
    }
  }

  private Map<String, Object> getAdditionalSkillSourceMap(AdditionalSkill additionalSkill) {
    var map = new HashMap<String, Object>();
    map.put(AdditionalSkillConstants.FIELD_ID, additionalSkill.getId().toString());
    map.put(AdditionalSkillConstants.FIELD_SKILL_LIBELLE, additionalSkill.getLibelle());
    map.put(AdditionalSkillConstants.FIELD_TYPE, additionalSkill.getType());
    map.put(
        AdditionalSkillConstants.FIELD_SKILL_CATEGORIES,
        additionalSkill.getAdditionalSkillCategory().map(this::getCategorySource).orElse(null));

    return map;
  }

  private List<AdditionalSkill> getAdditionalSkillList(SearchResponse response) {
    return Arrays.stream(response.getHits().getHits())
        .map(
            hit -> {
              Map<String, Object> src = hit.getSourceAsMap();
              return AdditionalSkill.toDomain(
                  UUID.fromString((String) src.get(AdditionalSkillConstants.FIELD_ID)),
                  (String) src.get(AdditionalSkillConstants.FIELD_SKILL_LIBELLE),
                  (String) src.get(AdditionalSkillConstants.FIELD_SKILL_CODE),
                  getCategoryFromSource(src.get(AdditionalSkillConstants.FIELD_SKILL_CATEGORIES)),
                  EAdditionalSkillType.valueOf(
                      (String) src.get(AdditionalSkillConstants.FIELD_TYPE)),
                  Instant.now(),
                  Instant.now());
            })
        .toList();
  }

  private Map<String, Object> getCategorySource(AdditionalSkillCategory additionalSkillCategory) {
    if (additionalSkillCategory == null) return null;

    Map<String, Object> map = new HashMap<>();
    map.put("id", additionalSkillCategory.getId().toString());
    map.put("libelle", additionalSkillCategory.getLibelle());
    map.put("type", additionalSkillCategory.getType());
    if (additionalSkillCategory.getParent().isPresent()) {
      map.put("parent", getCategorySource(additionalSkillCategory.getParent().get()));
    }
    return map;
  }

  @SuppressWarnings("unchecked")
  private AdditionalSkillCategory getCategoryFromSource(Object source) {
    if (source == null) return null;

    Map<String, Object> map = (Map<String, Object>) source;
    UUID id = map.containsKey("id") ? UUID.fromString((String) map.get("id")) : null;
    String libelle = (String) map.get("libelle");
    EAdditionalSkillCategoryType type =
        EAdditionalSkillCategoryType.valueOf(map.get("type").toString());

    AdditionalSkillCategory parent = null;
    if (map.containsKey("parent")) {
      parent = getCategoryFromSource(map.get("parent"));
    }

    return AdditionalSkillCategory.toDomain(id, libelle, parent, type, null, null);
  }
}