OpenSearchIndexImpl.java

package fr.avenirsesr.portfolio.additionalskill.infrastructure.adapter.opensearch;

import fr.avenirsesr.portfolio.additionalskill.domain.model.AdditionalSkill;
import fr.avenirsesr.portfolio.additionalskill.domain.model.AdditionalSkillPagedResult;
import fr.avenirsesr.portfolio.additionalskill.domain.model.PathSegments;
import fr.avenirsesr.portfolio.additionalskill.domain.model.SegmentDetail;
import fr.avenirsesr.portfolio.additionalskill.domain.model.enums.EAdditionalSkillType;
import fr.avenirsesr.portfolio.additionalskill.domain.port.output.OpenSearchIndex;
import fr.avenirsesr.portfolio.additionalskill.infrastructure.adapter.utils.AdditionalSkillConstants;
import fr.avenirsesr.portfolio.shared.domain.model.PageCriteria;
import fr.avenirsesr.portfolio.shared.domain.model.PageInfo;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.client.indices.CreateIndexRequest;
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Component;

@Slf4j
@RequiredArgsConstructor
@Component
public class OpenSearchIndexImpl implements OpenSearchIndex {
  private final RestHighLevelClient client;

  @Override
  @CacheEvict(value = AdditionalSkillConstants.INDEX)
  public void cleanAndCreateAdditionalSkillIndex() {
    try {
      if (client
          .indices()
          .exists(new GetIndexRequest(AdditionalSkillConstants.INDEX), RequestOptions.DEFAULT)) {
        client
            .indices()
            .delete(new DeleteIndexRequest(AdditionalSkillConstants.INDEX), RequestOptions.DEFAULT);
        log.info("Index '{}' deleted.", AdditionalSkillConstants.INDEX);
      }

      client
          .indices()
          .create(new CreateIndexRequest(AdditionalSkillConstants.INDEX), RequestOptions.DEFAULT);
      log.info("Index '{}' created.", AdditionalSkillConstants.INDEX);
    } catch (IOException e) {
      throw new RuntimeException("Failed to recreate index: " + AdditionalSkillConstants.INDEX, e);
    }
  }

  @Override
  public void indexAll(List<AdditionalSkill> additionalSkillList) {
    BulkRequest bulkRequest = new BulkRequest();

    for (AdditionalSkill additionalSkill : additionalSkillList) {
      try {
        Map<String, Object> source = getAdditionalSkillSourceMap(additionalSkill);
        bulkRequest.add(
            new IndexRequest(AdditionalSkillConstants.INDEX)
                .id(additionalSkill.getId().toString())
                .source(source));
      } catch (Exception e) {
        throw new RuntimeException("Failed additionalSkill: " + additionalSkill.getId(), e);
      }
    }

    try {
      BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT);
      if (response.hasFailures()) {
        log.error("Bulk indexing errors: {}", response.buildFailureMessage());
      } else {
        log.info("Bulk indexing succeeded for {} documents", additionalSkillList.size());
      }
    } catch (IOException e) {
      throw new RuntimeException("OpenSearchIndex bulk indexing failed", e);
    }
  }

  @Override
  @Cacheable(
      value = AdditionalSkillConstants.INDEX,
      key = "#keyword + '_' + #pageCriteria.page() + '_' + #pageCriteria.pageSize()")
  public AdditionalSkillPagedResult search(String keyword, PageCriteria pageCriteria) {
    SearchRequest searchRequest = new SearchRequest(AdditionalSkillConstants.INDEX);

    SearchSourceBuilder sourceBuilder =
        new SearchSourceBuilder()
            .query(
                QueryBuilders.matchPhrasePrefixQuery(
                    AdditionalSkillConstants.FIELD_SKILL_LIBELLE, keyword))
            .from(pageCriteria.page() * pageCriteria.pageSize())
            .size(pageCriteria.pageSize())
            .trackTotalHits(true);

    searchRequest.source(sourceBuilder);

    try {
      SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
      long totalHits = response.getHits().getTotalHits().value;

      List<AdditionalSkill> additionalSkillList = getAdditionalSkillList(response);

      return new AdditionalSkillPagedResult(
          additionalSkillList,
          new PageInfo(pageCriteria.page(), pageCriteria.pageSize(), totalHits));
    } catch (IOException e) {
      throw new RuntimeException(
          "OpenSearchIndex search failed with param keyword="
              + keyword
              + ", page="
              + pageCriteria.page()
              + ", size="
              + pageCriteria.pageSize(),
          e);
    }
  }

  private Map<String, Object> getAdditionalSkillSourceMap(AdditionalSkill additionalSkill) {
    return Map.ofEntries(
        Map.entry(AdditionalSkillConstants.FIELD_ID, additionalSkill.getId().toString()),
        Map.entry(
            AdditionalSkillConstants.FIELD_SKILL_CODE,
            additionalSkill.getPathSegments().getSkill().getCode()),
        Map.entry(
            AdditionalSkillConstants.FIELD_SKILL_LIBELLE,
            additionalSkill.getPathSegments().getSkill().getLibelle()),
        Map.entry(
            AdditionalSkillConstants.FIELD_MACRO_SKILL_CODE,
            additionalSkill.getPathSegments().getMacroSkill().getCode()),
        Map.entry(
            AdditionalSkillConstants.FIELD_MACRO_SKILL_LIBELLE,
            additionalSkill.getPathSegments().getMacroSkill().getLibelle()),
        Map.entry(
            AdditionalSkillConstants.FIELD_TARGET_CODE,
            additionalSkill.getPathSegments().getTarget().getCode()),
        Map.entry(
            AdditionalSkillConstants.FIELD_TARGET_LIBELLE,
            additionalSkill.getPathSegments().getTarget().getLibelle()),
        Map.entry(
            AdditionalSkillConstants.FIELD_ISSUE_CODE,
            additionalSkill.getPathSegments().getIssue().getCode()),
        Map.entry(
            AdditionalSkillConstants.FIELD_ISSUE_LIBELLE,
            additionalSkill.getPathSegments().getIssue().getLibelle()),
        Map.entry(
            AdditionalSkillConstants.FIELD_DOMAIN_CODE,
            additionalSkill.getPathSegments().getDomain().getCode()),
        Map.entry(
            AdditionalSkillConstants.FIELD_DOMAIN_LIBELLE,
            additionalSkill.getPathSegments().getDomain().getLibelle()),
        Map.entry(AdditionalSkillConstants.FIELD_TYPE, additionalSkill.getType()));
  }

  private List<AdditionalSkill> getAdditionalSkillList(SearchResponse response) {
    return Arrays.stream(response.getHits().getHits())
        .map(
            hit -> {
              Map<String, Object> src = hit.getSourceAsMap();
              return AdditionalSkill.toDomain(
                  UUID.fromString((String) src.get(AdditionalSkillConstants.FIELD_ID)),
                  PathSegments.toDomain(
                      SegmentDetail.toDomain(
                          (String) src.get(AdditionalSkillConstants.FIELD_SKILL_CODE),
                          (String) src.get(AdditionalSkillConstants.FIELD_SKILL_LIBELLE)),
                      SegmentDetail.toDomain(
                          (String) src.get(AdditionalSkillConstants.FIELD_MACRO_SKILL_CODE),
                          (String) src.get(AdditionalSkillConstants.FIELD_MACRO_SKILL_LIBELLE)),
                      SegmentDetail.toDomain(
                          (String) src.get(AdditionalSkillConstants.FIELD_TARGET_CODE),
                          (String) src.get(AdditionalSkillConstants.FIELD_TARGET_LIBELLE)),
                      SegmentDetail.toDomain(
                          (String) src.get(AdditionalSkillConstants.FIELD_ISSUE_CODE),
                          (String) src.get(AdditionalSkillConstants.FIELD_ISSUE_LIBELLE)),
                      SegmentDetail.toDomain(
                          (String) src.get(AdditionalSkillConstants.FIELD_DOMAIN_CODE),
                          (String) src.get(AdditionalSkillConstants.FIELD_DOMAIN_LIBELLE))),
                  EAdditionalSkillType.valueOf(
                      (String) src.get(AdditionalSkillConstants.FIELD_TYPE)));
            })
        .toList();
  }
}