Integrating Elasticsearch with Spring Boot: A Guide

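This article shows how to integrate and use Elasticsearch in a Spring Boot application: configuring the connection, creating indices automatically, and performing create, read, update and delete (CRUD) operations on documents.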

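Prerequisites

  • Java 17+
  • Spring Boot 3.x (Spring Data Elasticsearch 5.x)
  • Elasticsearch 8.x

The code relies on the ElasticsearchClient-based ElasticsearchTemplate and on NativeQuery from the org.springframework.data.elasticsearch.client.elc package, which is the default setup in Spring Data Elasticsearch 5.x. Spring Boot 3.x requires Java 17. Spring Boot 2.x defaults to the older RestHighLevelClient, so on 2.x the configuration class has to be adapted.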

Maven Dependency

Add the starter; its version is managed by Spring Boot's dependency management:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>

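Elasticsearch Configuration

Configuration File

First, add the Elasticsearch connection settings to application.yml (or bootstrap.yml). These are custom keys read by the configuration class via @Value; Spring Boot's own auto-configuration uses spring.elasticsearch.uris instead: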

elasticsearch:
  host: localhost
  port: 9200

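Configuration Class

The class below builds the low-level RestClient, wraps it in a transport and an ElasticsearchClient, exposes an ElasticsearchOperations bean named elasticsearchTemplate, and enables repository scanning in com.example.repository: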

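import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
// ElasticsearchTemplate for the new Java API client (client.elc package)
import org.springframework.data.elasticsearch.client.elc.ElasticsearchTemplate;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.data.elasticsearch.core.convert.MappingElasticsearchConverter;
import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchMappingContext;
import org.springframework.data.elasticsearch.repository.config.EnableElasticsearchRepositories;

@Configuration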
@EnableElasticsearchRepositories(
        basePackages = "com.example.repository",
        considerNestedRepositories = true
)
public class ElasticsearchConfig {

    @Value("${elasticsearch.host}")
    private String host;

    @Value("${elasticsearch.port}")
    private Integer port;

    @Bean
    public RestClient restClient() {
        return RestClient.builder(new HttpHost(host, port))
                .build();
    }

    @Bean
    public ElasticsearchTransport elasticsearchTransport() {
        return new RestClientTransport(
                restClient(),
                new JacksonJsonpMapper()
        );
    }

    @Bean
    public ElasticsearchClient elasticsearchClient() {
        return new ElasticsearchClient(elasticsearchTransport());
    }
    
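    @Bean(name = "elasticsearchTemplate")
    public ElasticsearchOperations elasticsearchTemplate() {
        MappingElasticsearchConverter elasticsearchConverter = new MappingElasticsearchConverter(
                new SimpleElasticsearchMappingContext());
        // The converter is not a Spring bean here, so Spring will not call
        // afterPropertiesSet(); call it manually to register the conversions
        elasticsearchConverter.afterPropertiesSet();

        return new ElasticsearchTemplate(
                elasticsearchClient(),
                elasticsearchConverter);
    }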
}

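Automatic Index Creation

By implementing the CommandLineRunner interface, you can create indices when the application starts. Note that indexOps(IndexCoordinates.of(...)).create() creates an index with default settings and no explicit mapping. Spring Data Elasticsearch repositories also create the index for a @Document class at startup by default (@Document(createIndex = true)), including its mapping: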

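import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.IndexOperations;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;

@Slf4j
@Configuration
@RequiredArgsConstructor
public class ElasticsearchIndexConfig implements CommandLineRunner {

    private final ElasticsearchOperations elasticsearchOperations;

    @Override
    public void run(String... args) {
        try {
            // Create the indices used by the application
            createIndex("product_index");
            createIndex("order_index");
        } catch (Exception e) {
            log.error("Failed to create ES indices", e);
        }
    }

    /**
     * Creates the index if it does not exist yet.
     * @param indexName index name
     */
    private void createIndex(String indexName) {
        try {
            IndexOperations indexOperations = elasticsearchOperations.indexOps(IndexCoordinates.of(indexName));
            
            if (!indexOperations.exists()) {
                boolean acknowledged = indexOperations.create();
                log.info("Creating index [{}] {}", indexName, acknowledged ? "succeeded" : "failed");
            } else {
                log.info("Index [{}] already exists", indexName);
            }
        } catch (Exception e) {
            // Pass the exception itself so that the stack trace is logged
            log.error("Failed to create index [{}]", indexName, e);
        }
    }
}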
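Because the check-then-create logic is idempotent, the runner is safe to execute on every startup. The plain-Java sketch below isolates that pattern so it can be tested without a cluster; IndexAdmin, InMemoryIndexAdmin and IndexBootstrap are hypothetical names, and the in-memory class merely stands in for IndexOperations.exists() and create().

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical abstraction over the index API; in the article the real
// calls are IndexOperations.exists() and IndexOperations.create().
interface IndexAdmin {
    boolean exists(String indexName);
    boolean create(String indexName);
}

// In-memory stand-in used only to demonstrate the pattern.
class InMemoryIndexAdmin implements IndexAdmin {
    private final Set<String> indices = new HashSet<>();

    @Override
    public boolean exists(String indexName) {
        return indices.contains(indexName);
    }

    @Override
    public boolean create(String indexName) {
        return indices.add(indexName);
    }
}

class IndexBootstrap {
    // Creates each missing index once and returns the names actually created.
    // A second run is a no-op, so it is safe to call on every startup.
    static List<String> ensureIndices(IndexAdmin admin, List<String> names) {
        List<String> created = new ArrayList<>();
        for (String name : names) {
            // One failing index should not prevent the others from being created
            try {
                if (!admin.exists(name) && admin.create(name)) {
                    created.add(name);
                }
            } catch (RuntimeException e) {
                System.err.println("Failed to create index " + name + ": " + e.getMessage());
            }
        }
        return created;
    }
}
```

Running ensureIndices twice with the same names creates the indices on the first call and nothing on the second.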

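Document Operations

Document Mapping

First, create a document class and map it with Spring Data Elasticsearch annotations. The ik_max_word analyzer on name is provided by the third-party IK Analysis plugin, which must be installed on the cluster; otherwise creating the index mapping fails: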

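import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.DateFormat;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;

import java.util.Date;

@Document(indexName = "product_index")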
@Data
public class ProductDocument {

    @Id
    private String id;
    
    @Field(type = FieldType.Text, analyzer = "ik_max_word")
    private String name;
    
    @Field(type = FieldType.Keyword)
    private String category;
    
    @Field(type = FieldType.Double)
    private Double price;
    
    @Field(type = FieldType.Text)
    private String description;
    
    @Field(type = FieldType.Date, format = DateFormat.date_hour_minute_second)
    private Date createTime;
}
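The date_hour_minute_second format used for createTime is Elasticsearch's built-in yyyy-MM-dd'T'HH:mm:ss pattern. The sketch below, using only java.time, shows the string shape such a field holds; EsDateFormat is a hypothetical helper. The pattern carries no time zone, so the application should agree on one (for example UTC) when writing and reading these values.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

class EsDateFormat {
    // Elasticsearch's "date_hour_minute_second" format: full date, then
    // two-digit hour, minute and second, with no fraction and no time zone.
    static final DateTimeFormatter DATE_HOUR_MINUTE_SECOND =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss");

    static String format(LocalDateTime value) {
        return DATE_HOUR_MINUTE_SECOND.format(value);
    }

    static LocalDateTime parse(String text) {
        return LocalDateTime.parse(text, DATE_HOUR_MINUTE_SECOND);
    }
}
```

For example, EsDateFormat.format(LocalDateTime.of(2024, 1, 2, 3, 4, 5)) yields 2024-01-02T03:04:05.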

Repository Interface

Create a repository interface that extends ElasticsearchRepository; Spring Data derives the queries from the method names:

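import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;

import java.util.List;

public interface ProductRepository extends ElasticsearchRepository<ProductDocument, String> {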

    List<ProductDocument> findByCategory(String category);
    
    List<ProductDocument> findByNameLike(String name);
    
    List<ProductDocument> findByPriceBetween(Double minPrice, Double maxPrice);
}

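Creating Documents

The following ElasticsearchService wraps the repository and the ElasticsearchOperations bean. The methods shown in the later sections are members of this same class.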

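import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.stereotype.Service;

import java.util.List;

@Service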
@Slf4j
public class ElasticsearchService {

    private final ProductRepository productRepository;
    private final ElasticsearchOperations elasticsearchTemplate;

    @Autowired
    public ElasticsearchService(
            ProductRepository productRepository,
            @Qualifier("elasticsearchTemplate") ElasticsearchOperations elasticsearchTemplate) {
        this.productRepository = productRepository;
        this.elasticsearchTemplate = elasticsearchTemplate;
    }

    public ProductDocument saveProduct(ProductDocument document) {
        return productRepository.save(document);
    }

    public Iterable<ProductDocument> saveAllProducts(List<ProductDocument> documents) {
        return productRepository.saveAll(documents);
    }
}

Querying Documents

// Find by ID
public Optional<ProductDocument> findProductById(String id) {
    return productRepository.findById(id);
}

// Find by field value
public List<ProductDocument> findProductsByCategory(String category) {
    return productRepository.findByCategory(category);
}

// Wildcard match on name. A derived ...Like query already appends a trailing "*"
// (prefix match); findByNameContaining would wrap the value in "*...*" by itself.
public List<ProductDocument> findProductsByNameLike(String name) {
    return productRepository.findByNameLike("*" + name + "*");
}

// Paged query
public Page<ProductDocument> findProductsPage(Pageable pageable) {
    return productRepository.findAll(pageable);
}
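Spring Data's Pageable is zero-based, and a paged search is sent to Elasticsearch as from = page × size together with size. By default Elasticsearch rejects requests where from + size exceeds index.max_result_window (10,000). The hypothetical PageWindow helper below makes that arithmetic and limit explicit; for deeper paging, search_after is the usual alternative.

```java
class PageWindow {
    // Default value of the index.max_result_window index setting
    static final int DEFAULT_MAX_RESULT_WINDOW = 10_000;

    // Translates a zero-based page number and page size into the
    // {from, size} pair that a paged search request carries.
    static int[] toFromSize(int page, int size) {
        if (page < 0 || size <= 0) {
            throw new IllegalArgumentException("page must be >= 0 and size > 0");
        }
        long from = (long) page * size;
        // from + size beyond the result window is rejected by Elasticsearch
        if (from + size > DEFAULT_MAX_RESULT_WINDOW) {
            throw new IllegalArgumentException(
                    "from + size exceeds " + DEFAULT_MAX_RESULT_WINDOW + "; use search_after for deep paging");
        }
        return new int[] {(int) from, size};
    }
}
```

With a page size of 20, page 499 (from = 9980) is the last page that stays within the default window.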

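Updating Documents

public void updateProduct(String id, ProductDocument updatedDocument) {
    try {
        // Check whether the document already exists
        Optional<ProductDocument> existingDocument = productRepository.findById(id);
        
        if (existingDocument.isPresent()) {
            // The document exists: update it
            ProductDocument document = existingDocument.get();
            // Copy the new values but keep the existing id; copying a null id would make
            // save() index a new document. Null fields in updatedDocument are copied too.
            BeanUtils.copyProperties(updatedDocument, document, "id");
            productRepository.save(document);
            log.info("Product updated, ID: {}", id);
        } else {
            // The document does not exist: insert it under the requested id
            updatedDocument.setId(id);
            productRepository.save(updatedDocument);
            log.info("Product not found, inserted a new one, ID: {}", id);
        }
    } catch (Exception e) {
        log.error("Failed to update product: {}", e.getMessage(), e);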
    }
}
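BeanUtils.copyProperties copies every property, including nulls, so a partial update payload would blank fields that already have values. If partial updates are intended, a null-aware merge is one option. The sketch below writes it out by hand for a reduced, hypothetical ProductDoc class; ProductMerge is likewise a hypothetical name.

```java
import java.util.Objects;

// Hypothetical stand-in for ProductDocument, reduced to three fields
class ProductDoc {
    String id;
    String name;
    Double price;

    ProductDoc(String id, String name, Double price) {
        this.id = id;
        this.name = name;
        this.price = price;
    }
}

class ProductMerge {
    // Applies only the non-null fields of `patch` onto `target` and never
    // touches the id, so a patch without an id cannot detach the document.
    static ProductDoc applyPatch(ProductDoc target, ProductDoc patch) {
        Objects.requireNonNull(target, "target");
        if (patch == null) {
            return target;
        }
        if (patch.name != null) {
            target.name = patch.name;
        }
        if (patch.price != null) {
            target.price = patch.price;
        }
        return target;
    }
}
```

Each field is handled explicitly here for clarity; with many fields, a reflection-based variant or a dedicated partial-update request would scale better.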

Deleting Documents

public void deleteProduct(String id) {
    try {
        if (productRepository.existsById(id)) {
            productRepository.deleteById(id);
            log.info("Product deleted, ID: {}", id);
        } else {
            log.warn("Product not found, ID: {}", id);
        }
    } catch (Exception e) {
        log.error("Failed to delete product: {}", e.getMessage(), e);
    }
}

Advanced Queries

Full-Text Search

public Page<ProductDocument> searchProducts(String keyword, Pageable pageable) {
    // Full-text search with a query_string query
    QueryStringQuery queryString = new QueryStringQuery.Builder()
            .query("*" + keyword + "*")  // wildcards on both sides for substring matching (leading wildcards are costly on large indices)
            .fields(Arrays.asList("name", "description", "category"))
            .defaultOperator(co.elastic.clients.elasticsearch._types.query_dsl.Operator.Or)
            .build();
            
    // Build the NativeQuery
    NativeQuery query = NativeQuery.builder()
            .withQuery(q -> q.queryString(queryString))
            .withPageable(pageable)
            .build();
    
    // Execute the query
    SearchHits<ProductDocument> searchHits = elasticsearchTemplate.search(query, ProductDocument.class);
    
    // Convert the hits into a Page
    List<ProductDocument> content = searchHits.getSearchHits().stream()
            .map(SearchHit::getContent)
            .collect(Collectors.toList());
    return new PageImpl<>(content, pageable, searchHits.getTotalHits());
}
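The keyword is passed to query_string unmodified, and query_string returns an error for invalid syntax, so input such as foo:(bar can make the search fail. A minimal sketch, assuming literal matching is wanted, escapes the reserved characters before the wildcards are added; QueryStringEscaper is a hypothetical helper. Elasticsearch's simple_query_string query, which ignores invalid syntax instead of failing, is an alternative.

```java
class QueryStringEscaper {
    // Characters that query_string treats as syntax; each is escaped with a backslash
    private static final String RESERVED = "+-=&|!(){}[]^\"~*?:\\/";

    // Escapes user input so query_string matches it literally.
    // '<' and '>' cannot be escaped in query_string, so they are dropped.
    static String escape(String input) {
        StringBuilder sb = new StringBuilder(input.length() * 2);
        for (char c : input.toCharArray()) {
            if (c == '<' || c == '>') {
                continue;
            }
            if (RESERVED.indexOf(c) >= 0) {
                sb.append('\\');
            }
            sb.append(c);
        }
        return sb.toString();
    }
}
```

In searchProducts this would be used as .query("*" + QueryStringEscaper.escape(keyword) + "*"), so the two wildcards added afterwards stay active while wildcards typed by the user are matched literally.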

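Criteria Queries

CriteriaQuery combines chained conditions; parameters that are null are simply left out of the query: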

public List<ProductDocument> findProductsByCriteria(String category, Double minPrice, Double maxPrice) {
    Criteria criteria = new Criteria();
    
    if (category != null && !category.isEmpty()) {
        criteria = criteria.and("category").is(category);
    }
    
    if (minPrice != null && maxPrice != null) {
        criteria = criteria.and("price").between(minPrice, maxPrice);
    } else if (minPrice != null) {
        criteria = criteria.and("price").greaterThanEqual(minPrice);
    } else if (maxPrice != null) {
        criteria = criteria.and("price").lessThanEqual(maxPrice);
    }
    
    Query query = new CriteriaQuery(criteria);
    SearchHits<ProductDocument> searchHits = elasticsearchTemplate.search(query, ProductDocument.class);
    
    return searchHits.getSearchHits().stream()
            .map(SearchHit::getContent)
            .collect(Collectors.toList());
}

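Practical Examples

Syncing Database Data to ES

The method below copies a database entity (Product, the application's own entity class) into a ProductDocument and indexes it: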

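public <T> void syncInsertToES(String index, T document) {
    try {
        if (document == null) {
            log.warn("Failed to sync document to ES: document is null");
            return;
        }
        
        if ("product_index".equals(index)) {
            if (document instanceof Product) {
                Product product = (Product) document;
                ProductDocument productDocument = new ProductDocument();
                BeanUtils.copyProperties(product, productDocument);
                // copyProperties silently skips properties whose types differ (e.g. a Long
                // entity id vs. the String document id), so set the id explicitly; otherwise
                // ES generates a new id and re-syncing creates duplicates
                if (product.getId() != null) {
                    productDocument.setId(String.valueOf(product.getId()));
                }
                productRepository.save(productDocument);
                log.info("Product synced to ES, ID: {}", product.getId());
            } else {
                log.warn("Unexpected document type for index {}: {}", index, document.getClass().getName());
            }
        } else {
            log.warn("Unsupported index: {}", index);
        }
    } catch (Exception e) {
        log.error("Failed to sync document to ES: {}", e.getMessage(), e);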
    }
}
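Each new index adds another branch to the if/else chain above. One alternative, sketched below with a hypothetical EsSyncDispatcher, registers one handler per index name; in the service, the product_index handler would perform the copyProperties and save steps shown above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

class EsSyncDispatcher {
    private final Map<String, Consumer<Object>> handlers = new HashMap<>();

    // Registers the handler that converts and saves documents for one index
    void register(String index, Consumer<Object> handler) {
        handlers.put(index, handler);
    }

    // Returns false when the document is null or no handler exists for the index
    boolean sync(String index, Object document) {
        if (document == null) {
            return false;
        }
        Consumer<Object> handler = handlers.get(index);
        if (handler == null) {
            return false;
        }
        handler.accept(document);
        return true;
    }
}
```

Supporting order_index then means registering one more handler rather than editing the sync method.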

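Combined Search Across Indices

The method below assumes an OrderDocument class and a searchOrders method analogous to searchProducts (neither is shown here). It searches both indices, merges the hits, and pages the merged list in memory: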

public Map<String, Object> searchMultipleIndices(String keyword, Pageable pageable) {
    // Query each index separately (all hits, unpaged)
    Page<ProductDocument> allProductResults = searchProducts(keyword, Pageable.unpaged());
    Page<OrderDocument> allOrderResults = searchOrders(keyword, Pageable.unpaged());
    
    // Combine the results
    List<Object> combinedResults = new ArrayList<>();
    combinedResults.addAll(allProductResults.getContent());
    combinedResults.addAll(allOrderResults.getContent());
    
    // Page the combined list manually
    int pageSize = pageable.getPageSize();
    int pageNumber = pageable.getPageNumber();
    int fromIndex = pageNumber * pageSize;
    int toIndex = Math.min(fromIndex + pageSize, combinedResults.size());
    
    List<Object> pagedResults = fromIndex < combinedResults.size() 
            ? combinedResults.subList(fromIndex, toIndex) 
            : new ArrayList<>();
    
    // Build the result map
    Map<String, Object> results = new HashMap<>();
    
    // Split the current page back into typed lists
    List<ProductDocument> pagedProducts = new ArrayList<>();
    List<OrderDocument> pagedOrders = new ArrayList<>();
    
    for (Object obj : pagedResults) {
        if (obj instanceof ProductDocument) {
            pagedProducts.add((ProductDocument) obj);
        } else if (obj instanceof OrderDocument) {
            pagedOrders.add((OrderDocument) obj);
        }
    }
    
    // Assemble the response
    Map<String, Object> productData = new HashMap<>();
    productData.put("content", pagedProducts);
    productData.put("totalElements", allProductResults.getTotalElements());
    
    Map<String, Object> orderData = new HashMap<>();
    orderData.put("content", pagedOrders);
    orderData.put("totalElements", allOrderResults.getTotalElements());
    
    results.put("products", productData);
    results.put("orders", orderData);
    results.put("currentPage", pageNumber);
    results.put("pageSize", pageSize);
    results.put("totalElements", allProductResults.getTotalElements() + allOrderResults.getTotalElements());
    
    return results;
}
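The method loads every matching hit from both indices before slicing, so its cost grows with the total number of matches rather than with the page size. The slicing step itself can be factored into a small generic helper; ListPager is a hypothetical name, and its logic mirrors the fromIndex/toIndex code above.

```java
import java.util.Collections;
import java.util.List;

class ListPager {
    // Returns zero-based page `pageNumber` of `items`, or an empty list
    // when the page starts beyond the end of the list.
    static <T> List<T> page(List<T> items, int pageNumber, int pageSize) {
        if (pageNumber < 0 || pageSize <= 0) {
            throw new IllegalArgumentException("pageNumber must be >= 0 and pageSize > 0");
        }
        long from = (long) pageNumber * pageSize;
        if (from >= items.size()) {
            return Collections.emptyList();
        }
        int to = (int) Math.min(from + pageSize, items.size());
        return items.subList((int) from, to);
    }
}
```

The returned list is a view of the input, so callers that modify the original list afterwards should copy the page first.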

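FAQ

1. How should ES connection errors be handled?

In production, ES can become unreachable because of network problems or service outages. Retry transient failures (timeouts, refused connections) with a backoff, and use a circuit breaker so that repeated failures fail fast instead of blocking request threads; together these keep the application stable.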
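A minimal retry-with-exponential-backoff sketch in plain Java follows; Retry.withBackoff is a hypothetical helper, not a library API. For simplicity it retries every RuntimeException, whereas production code should retry only transient failures and would typically use a library such as Spring Retry or Resilience4j (the latter also provides a circuit breaker).

```java
import java.util.function.Supplier;

class Retry {
    // Calls `action` up to maxAttempts times, sleeping initialDelayMillis,
    // then twice that, four times that, and so on between failed attempts.
    // The last failure is rethrown when all attempts are exhausted.
    static <T> T withBackoff(Supplier<T> action, int maxAttempts, long initialDelayMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException last = null;
        long delay = initialDelayMillis;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                last = e;
                if (attempt == maxAttempts) {
                    break;
                }
                try {
                    Thread.sleep(delay);
                } catch (InterruptedException ie) {
                    // Preserve the interrupt flag and stop retrying
                    Thread.currentThread().interrupt();
                    throw e;
                }
                delay *= 2;
            }
        }
        throw last;
    }
}
```

In the service, a call such as Retry.withBackoff(() -> productRepository.findById(id), 3, 200) would wrap a repository call; the attempt count and delay are illustrative values.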

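2. Index design considerations

  • Choose an appropriate data type for each field
  • Choose analyzers according to business requirements
  • Decide whether each field needs to be tokenized (analyzed)
  • Use the keyword type for fields frequently used in filters (exact match, sorting, aggregations)
  • Use the text type with a suitable analyzer for fields that need full-text search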

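3. How can query performance be improved?

  • Use filter context (e.g. bool filter clauses) to narrow the result set; filters are not scored and frequently used ones are cached
  • Avoid leading-wildcard queries (such as "*keyword"), which have to scan many terms
  • Make sure frequently queried fields are indexed (the default), and set index: false on fields that are never searched
  • Take advantage of the shard request cache for aggregation-heavy requests
  • Choose a sensible number of shards; many small shards add overhead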

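4. How should large volumes of data be handled?

For large batches, use the bulk API instead of indexing documents one at a time; it greatly reduces per-request overhead: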

public void bulkInsert(List<ProductDocument> documents) {
    List<IndexQuery> queries = new ArrayList<>();
    for (ProductDocument document : documents) {
        IndexQuery indexQuery = new IndexQueryBuilder()
            .withId(document.getId())
            .withObject(document)
            .build();
        queries.add(indexQuery);
    }
    
    if (!queries.isEmpty()) {
        elasticsearchTemplate.bulkIndex(queries, ProductDocument.class);
    }
}
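For very large lists, sending everything in a single bulk request can produce an oversized request. A common approach is to split the list into fixed-size batches and call bulkInsert once per batch; the hypothetical Batches.partition helper below does the splitting, and the batch size is an assumption to tune against the cluster.

```java
import java.util.ArrayList;
import java.util.List;

class Batches {
    // Splits `items` into consecutive sublists of at most batchSize elements,
    // so that each bulk request stays bounded in size.
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be > 0");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < items.size(); from += batchSize) {
            int to = Math.min(from + batchSize, items.size());
            // Copy each slice so batches do not depend on the source list
            batches.add(new ArrayList<>(items.subList(from, to)));
        }
        return batches;
    }
}
```

Usage would look like: for (List<ProductDocument> batch : Batches.partition(documents, 1000)) { bulkInsert(batch); }, where 1000 is only a starting point.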

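That covers the basic approach to integrating Elasticsearch with Spring Boot. With this configuration and these APIs, you can quickly add full-text search to a Spring Boot application and improve its search experience.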