聊聊Spring AI Alibaba的ElasticsearchDocumentReader

序：本文主要研究一下Spring AI Alibaba的ElasticsearchDocumentReader（位于community/document-readers/spring-ai-alibaba-starter-document-reader-elasticsearch模块）。

聊聊Spring AI Alibaba的ElasticsearchDocumentReader

本文主要研究一下Spring AI Alibaba的ElasticsearchDocumentReader

ElasticsearchDocumentReader

community/document-readers/spring-ai-alibaba-starter-document-reader-elasticsearch/src/main/java/com/alibaba/cloud/ai/document/reader/es/ElasticsearchDocumentReader.java

代码语言：java
/**
 * {@link DocumentReader} that loads documents from an Elasticsearch index.
 *
 * <p>
 * Each hit's {@code _source} becomes one {@link Document}. The value of the configured
 * query field ({@code config.getQueryField()}) is used as the document text. The text is
 * an empty string when the field is absent or {@code null}. The non-null entries of the
 * source map are used as metadata.
 *
 * <p>
 * The reader creates and owns its own {@link ElasticsearchClient}. Call {@link #close()}
 * when the reader is no longer needed to release the underlying HTTP client.
 */
public class ElasticsearchDocumentReader implements DocumentReader, AutoCloseable {

	private final ElasticsearchConfig config;

	private final ElasticsearchClient client;

	/**
	 * Constructor that initializes the Elasticsearch client with the provided
	 * configuration.
	 * @param config The Elasticsearch configuration
	 * @throws RuntimeException if the client cannot be created (e.g. SSL setup failure or
	 * a malformed node address)
	 */
	public ElasticsearchDocumentReader(ElasticsearchConfig config) {
		this.config = config;
		try {
			this.client = createClient();
		}
		catch (Exception e) {
			throw new RuntimeException("Failed to create Elasticsearch client", e);
		}
	}

	/**
	 * Reads documents from the configured index using a {@code match_all} query.
	 *
	 * <p>
	 * At most {@code config.getMaxResults()} hits are returned.
	 * @return the documents built from the returned hits (never {@code null})
	 * @throws RuntimeException wrapping the {@link IOException} if the search fails
	 */
	@Override
	public List<Document> get() {
		try {
			// Get all documents
			SearchResponse<Map> response = client.search(
					s -> s.index(config.getIndex()).query(q -> q.matchAll(m -> m)).size(config.getMaxResults()),
					Map.class);

			return getDocuments(response);
		}
		catch (IOException e) {
			throw new RuntimeException("Failed to get documents from Elasticsearch", e);
		}
	}

	/**
	 * Converts every search hit that carries a {@code _source} into a {@link Document}.
	 *
	 * <p>
	 * Hits without a source are skipped.
	 */
	@NotNull
	private List<Document> getDocuments(SearchResponse<Map> response) {
		List<Document> documents = new ArrayList<>();
		response.hits().hits().forEach(hit -> {
			Map<String, Object> source = hit.source();
			if (source != null) {
				documents.add(toDocument(source));
			}
		});
		return documents;
	}

	/**
	 * Get a document by its ID.
	 * @param id The document ID
	 * @return The document if found, null otherwise
	 * @throws RuntimeException wrapping the {@link IOException} if the request fails
	 */
	public Document getById(String id) {
		try {
			var response = client.get(g -> g.index(config.getIndex()).id(id), Map.class);

			Map<String, Object> source = response.source();
			if (!response.found() || source == null) {
				return null;
			}

			return toDocument(source);
		}
		catch (IOException e) {
			throw new RuntimeException("Failed to get document from Elasticsearch with id: " + id, e);
		}
	}

	/**
	 * Read documents matching the specified query.
	 *
	 * <p>
	 * Runs a {@code match} query against the configured query field. At most
	 * {@code config.getMaxResults()} hits are returned.
	 * @param query The search query
	 * @return List of matching documents
	 * @throws RuntimeException wrapping the {@link IOException} if the search fails
	 */
	public List<Document> readWithQuery(String query) {
		try {
			// Build the search request with query
			SearchResponse<Map> response = client.search(s -> s.index(config.getIndex())
				.query(q -> q.match(new MatchQuery.Builder().field(config.getQueryField()).query(query).build()))
				.size(config.getMaxResults()), Map.class);

			return getDocuments(response);
		}
		catch (IOException e) {
			throw new RuntimeException("Failed to read documents from Elasticsearch with query: " + query, e);
		}
	}

	/**
	 * Closes the underlying Elasticsearch transport and its low-level REST client.
	 * @throws IOException if closing the transport fails
	 */
	@Override
	public void close() throws IOException {
		client._transport().close();
	}

	/**
	 * Builds a {@link Document} from an Elasticsearch {@code _source} map.
	 * @param source the document source; must not be {@code null}
	 * @return a document whose text is the query field's value and whose metadata is the
	 * non-null entries of {@code source}
	 */
	private Document toDocument(Map<String, Object> source) {
		// Map.getOrDefault returns null (not the default) when the key is present with a
		// JSON null value, so read the value explicitly to avoid a NullPointerException.
		Object rawText = source.get(config.getQueryField());
		String text = rawText != null ? rawText.toString() : "";

		// Spring AI's Document constructor asserts that metadata contains no null keys or
		// values (Spring AI 1.x; confirm for the version in use). Drop JSON nulls so one
		// sparse document does not fail the whole read.
		Map<String, Object> metadata = new java.util.LinkedHashMap<>();
		source.forEach((key, value) -> {
			if (key != null && value != null) {
				metadata.put(key, value);
			}
		});
		return new Document(text, metadata);
	}

	/**
	 * Parses a {@code host[:port]} node address into an {@link HttpHost}.
	 *
	 * <p>
	 * The scheme comes from {@code config.getScheme()}. When no port is given, the
	 * single-node {@code config.getPort()} setting is used.
	 */
	private HttpHost toHttpHost(String node) {
		String trimmed = node.trim();
		int colon = trimmed.lastIndexOf(':');
		if (colon < 0) {
			return new HttpHost(trimmed, config.getPort(), config.getScheme());
		}
		String host = trimmed.substring(0, colon);
		try {
			int port = Integer.parseInt(trimmed.substring(colon + 1).trim());
			return new HttpHost(host, port, config.getScheme());
		}
		catch (NumberFormatException e) {
			throw new IllegalArgumentException("Invalid Elasticsearch node (expected host[:port]): " + node, e);
		}
	}

	/**
	 * Creates the Elasticsearch client from the configuration.
	 *
	 * <p>
	 * Uses {@code config.getNodes()} when it is non-empty. Otherwise it falls back to the
	 * single {@code host}/{@code port} pair.
	 */
	private ElasticsearchClient createClient()
			throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
		// Create HttpHosts for all nodes
		HttpHost[] httpHosts;
		if (!CollectionUtils.isEmpty(config.getNodes())) {
			httpHosts = config.getNodes().stream().map(this::toHttpHost).toArray(HttpHost[]::new);
		}
		else {
			// Fallback to single node configuration
			httpHosts = new HttpHost[] { new HttpHost(config.getHost(), config.getPort(), config.getScheme()) };
		}

		var restClientBuilder = RestClient.builder(httpHosts);

		// Add authentication if credentials are provided. Note that the custom SSL setup
		// below is only applied in this branch: HTTPS without credentials uses the HTTP
		// client's default SSL configuration.
		if (StringUtils.hasText(config.getUsername()) && StringUtils.hasText(config.getPassword())) {
			CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
			credentialsProvider.setCredentials(AuthScope.ANY,
					new UsernamePasswordCredentials(config.getUsername(), config.getPassword()));

			// Create SSL context if using HTTPS
			if ("https".equalsIgnoreCase(config.getScheme())) {
				// NOTE(security): this trusts every server certificate and disables
				// hostname verification. That leaves HTTPS connections (and the credentials
				// sent over them) open to man-in-the-middle attacks. It is only appropriate
				// for local/self-signed clusters; consider a configurable truststore/CA
				// before production use.
				SSLContext sslContext = SSLContextBuilder.create()
					.loadTrustMaterial(null, (chains, authType) -> true)
					.build();

				restClientBuilder.setHttpClientConfigCallback(
						httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
							.setSSLContext(sslContext)
							.setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE));
			}
			else {
				restClientBuilder.setHttpClientConfigCallback(
						httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
			}
		}

		// Create the transport and client
		ElasticsearchTransport transport = new RestClientTransport(restClientBuilder.build(), new JacksonJsonpMapper());
		return new ElasticsearchClient(transport);
	}

}

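ElasticsearchDocumentReader使用Elasticsearch官方的ElasticsearchClient读取数据。get()方法通过match_all查询读取指定索引的记录（条数由maxResults控制，默认10条）。readWithQuery()则对queryField执行match查询，getById()按文档ID获取。每条记录会将queryField指定字段的值作为Document的内容，并将整个_source作为metadata。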

示例

代码语言：java
/**
 * Integration tests for {@link ElasticsearchDocumentReader}.
 *
 * <p>
 * These tests need a reachable Elasticsearch server. The class is skipped unless
 * {@code ES_HOST} is set. Individual tests are skipped when a TCP connection to
 * {@code ES_HOST:ES_PORT} cannot be opened or when the test setup fails.
 */
@EnabledIfEnvironmentVariable(named = "ES_HOST", matches = ".+")
public class ElasticsearchDocumentReaderTest {

	private static final String TEST_INDEX = "spring-ai-test";

	private static final String TEST_DOC_ID = "1";

	// Get ES configuration from environment variables, use defaults if not set
	private static final String ES_HOST = envOrDefault("ES_HOST", "localhost");

	private static final int ES_PORT = Integer.parseInt(envOrDefault("ES_PORT", "9200"));

	private static final String ES_USERNAME = envOrDefault("ES_USERNAME", "elastic");

	// NOTE(security): a real-looking password is committed as the fallback. Prefer
	// supplying ES_PASSWORD via the environment, and rotate this credential if it was ever
	// used outside a throwaway local cluster.
	private static final String ES_PASSWORD = envOrDefault("ES_PASSWORD", "r-tooRd7RgrX_uZV0klZ");

	private static final String ES_SCHEME = envOrDefault("ES_SCHEME", "https");

	private static ElasticsearchClient client;

	private static ElasticsearchDocumentReader reader;

	private static ElasticsearchDocumentReader clusterReader;

	// Flag to indicate if ES is available
	private static boolean esAvailable = false;

	static {
		if (System.getenv("ES_HOST") == null) {
			System.out.println("ES_HOST environment variable is not set. Tests will be skipped.");
		}
	}

	/**
	 * Returns the value of an environment variable, or a fallback when it is not set.
	 */
	private static String envOrDefault(String name, String fallback) {
		String value = System.getenv(name);
		return value != null ? value : fallback;
	}

	/**
	 * Check if Elasticsearch is available
	 * @return true if ES is available, false otherwise
	 */
	public static boolean isElasticsearchAvailable() {
		return esAvailable;
	}

	/**
	 * Try to connect to Elasticsearch
	 * @return true if connection successful, false otherwise
	 */
	private static boolean canConnectToElasticsearch() {
		try (Socket socket = new Socket()) {
			socket.connect(new InetSocketAddress(ES_HOST, ES_PORT), 1000);
			return true;
		}
		catch (Exception e) {
			System.out.println("Cannot connect to Elasticsearch: " + e.getMessage());
			return false;
		}
	}

	/**
	 * Recreates the test index, builds the single-node and cluster readers, and indexes
	 * the fixture documents.
	 *
	 * <p>
	 * Any failure marks ES as unavailable, so that the tests are skipped rather than
	 * failed.
	 */
	@BeforeAll
	static void setUp() throws IOException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
		// Check if ES_HOST environment variable is set
		String esHost = System.getenv("ES_HOST");
		assumeTrue(esHost != null && !esHost.isEmpty(),
				"Skipping test because ES_HOST environment variable is not set");

		// Check if we can connect to ES
		esAvailable = canConnectToElasticsearch();

		// Skip setup if ES is not available
		if (!esAvailable) {
			System.out
				.println("Skipping Elasticsearch tests because ES server is not available: " + ES_HOST + ":" + ES_PORT);
			return;
		}

		try {
			// Create SSL context that trusts all certificates (test cluster uses a
			// self-signed certificate)
			SSLContext sslContext = SSLContextBuilder.create()
				.loadTrustMaterial(null, (chains, authType) -> true)
				.build();

			// Create client with authentication and SSL
			CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
			credentialsProvider.setCredentials(AuthScope.ANY,
					new UsernamePasswordCredentials(ES_USERNAME, ES_PASSWORD));

			RestClient restClient = RestClient.builder(new HttpHost(ES_HOST, ES_PORT, ES_SCHEME))
				.setHttpClientConfigCallback(
						httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
							.setSSLContext(sslContext)
							.setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE))
				.build();

			client = new ElasticsearchClient(new RestClientTransport(restClient, new JacksonJsonpMapper()));

			// Delete index if exists
			boolean indexExists = client.indices().exists(e -> e.index(TEST_INDEX)).value();
			if (indexExists) {
				DeleteIndexResponse deleteResponse = client.indices().delete(c -> c.index(TEST_INDEX));
				assertThat(deleteResponse.acknowledged()).isTrue();
			}

			// Create test index with mapping
			CreateIndexResponse createResponse = client.indices()
				.create(c -> c.index(TEST_INDEX)
					.mappings(m -> m.properties("content", p -> p.text(t -> t.analyzer("standard")))
						.properties("title", p -> p.keyword(k -> k))));
			assertThat(createResponse.acknowledged()).isTrue();

			// Configure and create single node reader
			ElasticsearchConfig config = new ElasticsearchConfig();
			config.setHost(ES_HOST);
			config.setPort(ES_PORT);
			config.setIndex(TEST_INDEX);
			config.setQueryField("content");
			config.setUsername(ES_USERNAME);
			config.setPassword(ES_PASSWORD);
			config.setScheme(ES_SCHEME);
			reader = new ElasticsearchDocumentReader(config);

			// Configure and create cluster reader
			ElasticsearchConfig clusterConfig = new ElasticsearchConfig();
			clusterConfig.setNodes(Arrays.asList(ES_HOST + ":" + ES_PORT, ES_HOST + ":9201", ES_HOST + ":9202"));
			clusterConfig.setIndex(TEST_INDEX);
			clusterConfig.setQueryField("content");
			clusterConfig.setUsername(ES_USERNAME);
			clusterConfig.setPassword(ES_PASSWORD);
			clusterConfig.setScheme(ES_SCHEME);
			clusterReader = new ElasticsearchDocumentReader(clusterConfig);

			// Index test documents
			indexTestDocuments();
		}
		catch (Exception e) {
			// Best-effort setup: report and skip the tests instead of failing them
			System.out.println("Failed to set up Elasticsearch test environment: " + e.getMessage());
			esAvailable = false;
		}
	}

	/**
	 * Deletes the test index when the setup succeeded.
	 */
	@AfterAll
	static void tearDown() throws IOException {
		// Skip cleanup if ES is not available or client is null
		if (!esAvailable || client == null) {
			return;
		}

		try {
			DeleteIndexResponse deleteResponse = client.indices().delete(c -> c.index(TEST_INDEX));
			assertThat(deleteResponse.acknowledged()).isTrue();
		}
		catch (Exception e) {
			System.out.println("Failed to clean up Elasticsearch test environment: " + e.getMessage());
		}
	}

	@Test
	@EnabledIf("isElasticsearchAvailable")
	void testGet() {
		List<Document> documents = reader.get();
		assertThat(documents).hasSize(3);
		// A match_all search without an explicit sort is not guaranteed to return hits in
		// indexing order, so do not assume which document comes first.
		assertThat(documents).anyMatch(document -> document.getText().contains("Spring Framework"));
		assertThat(documents).allMatch(document -> document.getMetadata().containsKey("title"));
	}

	//......

	/**
	 * Indexes the three fixture documents and refreshes the test index so that they are
	 * immediately searchable.
	 *
	 * <p>
	 * Only the first document gets a fixed ID ({@link #TEST_DOC_ID}).
	 */
	private static void indexTestDocuments() throws IOException {
		// First document
		Map<String, Object> doc1 = new HashMap<>();
		doc1.put("content", "Spring Framework is the most popular application development framework for Java.");
		doc1.put("title", "Spring Introduction");
		client.index(i -> i.index(TEST_INDEX).id(TEST_DOC_ID).document(doc1));

		// Second document
		Map<String, Object> doc2 = new HashMap<>();
		doc2.put("content",
				"Spring Boot makes it easy to create stand-alone, production-grade Spring based Applications.");
		doc2.put("title", "Spring Boot Guide");
		client.index(i -> i.index(TEST_INDEX).document(doc2));

		// Third document
		Map<String, Object> doc3 = new HashMap<>();
		doc3.put("content", "Java is a popular programming language and platform.");
		doc3.put("title", "Java Programming");
		client.index(i -> i.index(TEST_INDEX).document(doc3));

		// Refresh only the test index (an unscoped refresh hits every index in the cluster)
		client.indices().refresh(r -> r.index(TEST_INDEX));
	}

}

这里通过ElasticsearchConfig的queryField指定读取content字段作为Document的内容。

小结

spring-ai-alibaba-starter-document-reader-elasticsearch提供了ElasticsearchDocumentReader，用于读取Elasticsearch中的文档，并将指定字段的值作为Document的内容。

doc

  • java2ai
本文参与腾讯云自媒体同步曝光计划，分享自微信公众号。原始发表：2025-04-19，如有侵权请联系 cloudcommunity@tencent.com 删除。标签：es、client、config、document、spring

发布者：admin，转载请注明出处：http://www.yc00.com/web/1747602415a4668937.html

相关推荐

  • 聊聊Spring AI Alibaba的ElasticsearchDocumentReader

    序本文主要研究一下Spring AI Alibaba的ElasticsearchDocumentReaderElasticsearchDocumentReadercommunitydocument-readersspring-ai-al

    16小时前
    20

发表回复

评论列表(0条)

  • 暂无评论

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信