// OpenSearchIndexImpl.java
package fr.avenirsesr.portfolio.additionalskill.infrastructure.adapter.opensearch;
import fr.avenirsesr.portfolio.additionalskill.domain.model.AdditionalSkill;
import fr.avenirsesr.portfolio.additionalskill.domain.model.AdditionalSkillCategory;
import fr.avenirsesr.portfolio.additionalskill.domain.model.AdditionalSkillPagedResult;
import fr.avenirsesr.portfolio.additionalskill.domain.model.enums.EAdditionalSkillCategoryType;
import fr.avenirsesr.portfolio.additionalskill.domain.model.enums.EAdditionalSkillType;
import fr.avenirsesr.portfolio.additionalskill.domain.port.output.OpenSearchIndex;
import fr.avenirsesr.portfolio.common.data.domain.model.PageCriteria;
import fr.avenirsesr.portfolio.common.data.domain.model.PageInfo;
import java.io.IOException;
import java.time.Instant;
import java.util.*;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.client.indices.CreateIndexRequest;
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;
@Slf4j
@RequiredArgsConstructor
@Component
@Profile("!test")
public class OpenSearchIndexImpl implements OpenSearchIndex {
private final RestHighLevelClient client;
@Override
@CacheEvict(value = AdditionalSkillConstants.INDEX)
public void cleanAndCreateAdditionalSkillIndex() {
try {
if (client
.indices()
.exists(new GetIndexRequest(AdditionalSkillConstants.INDEX), RequestOptions.DEFAULT)) {
client
.indices()
.delete(new DeleteIndexRequest(AdditionalSkillConstants.INDEX), RequestOptions.DEFAULT);
log.info("Index '{}' deleted.", AdditionalSkillConstants.INDEX);
}
client
.indices()
.create(new CreateIndexRequest(AdditionalSkillConstants.INDEX), RequestOptions.DEFAULT);
log.info("Index '{}' created.", AdditionalSkillConstants.INDEX);
} catch (IOException e) {
throw new RuntimeException("Failed to recreate index: " + AdditionalSkillConstants.INDEX, e);
}
}
@Override
public void indexAll(List<AdditionalSkill> additionalSkillList) {
BulkRequest bulkRequest = new BulkRequest();
for (AdditionalSkill additionalSkill : additionalSkillList) {
try {
Map<String, Object> source = getAdditionalSkillSourceMap(additionalSkill);
bulkRequest.add(
new IndexRequest(AdditionalSkillConstants.INDEX)
.id(additionalSkill.getId().toString())
.source(source));
} catch (Exception e) {
throw new RuntimeException("Failed additionalSkill: " + additionalSkill.getId(), e);
}
}
try {
BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT);
if (response.hasFailures()) {
log.error("Bulk indexing errors: {}", response.buildFailureMessage());
}
} catch (IOException e) {
throw new RuntimeException("OpenSearchIndex bulk indexing failed", e);
} finally {
log.info("Bulk indexing succeeded for {} documents", additionalSkillList.size());
}
}
@Override
@Cacheable(
value = AdditionalSkillConstants.INDEX,
key = "#keyword + '_' + #pageCriteria.page() + '_' + #pageCriteria.pageSize()")
public AdditionalSkillPagedResult search(String keyword, PageCriteria pageCriteria) {
SearchRequest searchRequest = new SearchRequest(AdditionalSkillConstants.INDEX);
SearchSourceBuilder sourceBuilder =
new SearchSourceBuilder()
.query(
QueryBuilders.matchPhrasePrefixQuery(
AdditionalSkillConstants.FIELD_SKILL_LIBELLE, keyword))
.from(pageCriteria.page() * pageCriteria.pageSize())
.size(pageCriteria.pageSize())
.trackTotalHits(true);
searchRequest.source(sourceBuilder);
try {
SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
long totalHits = response.getHits().getTotalHits().value;
List<AdditionalSkill> additionalSkillList = getAdditionalSkillList(response);
return new AdditionalSkillPagedResult(
additionalSkillList,
new PageInfo(pageCriteria.page(), pageCriteria.pageSize(), totalHits));
} catch (IOException e) {
throw new RuntimeException(
"OpenSearchIndex search failed with param keyword="
+ keyword
+ ", page="
+ pageCriteria.page()
+ ", size="
+ pageCriteria.pageSize(),
e);
}
}
private Map<String, Object> getAdditionalSkillSourceMap(AdditionalSkill additionalSkill) {
var map = new HashMap<String, Object>();
map.put(AdditionalSkillConstants.FIELD_ID, additionalSkill.getId().toString());
map.put(AdditionalSkillConstants.FIELD_SKILL_LIBELLE, additionalSkill.getLibelle());
map.put(AdditionalSkillConstants.FIELD_TYPE, additionalSkill.getType());
map.put(
AdditionalSkillConstants.FIELD_SKILL_CATEGORIES,
additionalSkill.getAdditionalSkillCategory().map(this::getCategorySource).orElse(null));
return map;
}
private List<AdditionalSkill> getAdditionalSkillList(SearchResponse response) {
return Arrays.stream(response.getHits().getHits())
.map(
hit -> {
Map<String, Object> src = hit.getSourceAsMap();
return AdditionalSkill.toDomain(
UUID.fromString((String) src.get(AdditionalSkillConstants.FIELD_ID)),
(String) src.get(AdditionalSkillConstants.FIELD_SKILL_LIBELLE),
(String) src.get(AdditionalSkillConstants.FIELD_SKILL_CODE),
getCategoryFromSource(src.get(AdditionalSkillConstants.FIELD_SKILL_CATEGORIES)),
EAdditionalSkillType.valueOf(
(String) src.get(AdditionalSkillConstants.FIELD_TYPE)),
Instant.now(),
Instant.now());
})
.toList();
}
private Map<String, Object> getCategorySource(AdditionalSkillCategory additionalSkillCategory) {
if (additionalSkillCategory == null) return null;
Map<String, Object> map = new HashMap<>();
map.put("id", additionalSkillCategory.getId().toString());
map.put("libelle", additionalSkillCategory.getLibelle());
map.put("type", additionalSkillCategory.getType());
if (additionalSkillCategory.getParent().isPresent()) {
map.put("parent", getCategorySource(additionalSkillCategory.getParent().get()));
}
return map;
}
@SuppressWarnings("unchecked")
private AdditionalSkillCategory getCategoryFromSource(Object source) {
if (source == null) return null;
Map<String, Object> map = (Map<String, Object>) source;
UUID id = map.containsKey("id") ? UUID.fromString((String) map.get("id")) : null;
String libelle = (String) map.get("libelle");
EAdditionalSkillCategoryType type =
EAdditionalSkillCategoryType.valueOf(map.get("type").toString());
AdditionalSkillCategory parent = null;
if (map.containsKey("parent")) {
parent = getCategoryFromSource(map.get("parent"));
}
return AdditionalSkillCategory.toDomain(id, libelle, parent, type, null, null);
}
}