Skip to content

Commit beedbf8

Browse files
committed
Reuse TantivySearcher across queries via searcher pool
1 parent 1aeaee6 commit beedbf8

File tree

4 files changed

+85
-21
lines changed

4 files changed

+85
-21
lines changed

paimon-tantivy/paimon-tantivy-index/src/main/java/org/apache/paimon/tantivy/index/TantivyFullTextGlobalIndexer.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,9 @@ public class TantivyFullTextGlobalIndexer implements GlobalIndexer {
3636
private final TantivySearcherPool searcherPool;
3737

3838
public TantivyFullTextGlobalIndexer() {
39-
this(new TantivySearcherPool(Runtime.getRuntime().availableProcessors() * 2));
39+
this(
40+
new TantivySearcherPool(
41+
TantivyFullTextIndexOptions.SEARCHER_POOL_MAX_SIZE.defaultValue()));
4042
}
4143

4244
public TantivyFullTextGlobalIndexer(TantivySearcherPool searcherPool) {

paimon-tantivy/paimon-tantivy-index/src/main/java/org/apache/paimon/tantivy/index/TantivyFullTextGlobalIndexerFactory.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,11 @@ public class TantivyFullTextGlobalIndexerFactory implements GlobalIndexerFactory
3232
* Shared pool across all indexers created by this factory. This factory instance is a JVM-level
3333
* singleton (loaded once via {@link java.util.ServiceLoader}), so the pool naturally survives
3434
* across queries and scanners.
35+
*
36+
* <p>The pool is initialized lazily on the first {@link #create} call so that the pool size can
37+
* be read from user-supplied options.
3538
*/
36-
private final TantivySearcherPool searcherPool =
37-
new TantivySearcherPool(Runtime.getRuntime().availableProcessors() * 2);
39+
private volatile TantivySearcherPool searcherPool;
3840

3941
@Override
4042
public String identifier() {
@@ -43,6 +45,14 @@ public String identifier() {
4345

4446
@Override
4547
public GlobalIndexer create(DataField field, Options options) {
48+
if (searcherPool == null) {
49+
synchronized (this) {
50+
if (searcherPool == null) {
51+
int maxSize = options.get(TantivyFullTextIndexOptions.SEARCHER_POOL_MAX_SIZE);
52+
searcherPool = new TantivySearcherPool(maxSize);
53+
}
54+
}
55+
}
4656
return new TantivyFullTextGlobalIndexer(searcherPool);
4757
}
4858
}
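paimon-tantivy/paimon-tantivy-index/src/main/java/org/apache/paimon/tantivy/index/TantivyFullTextIndexOptions.java

Lines changed: 37 additions & 0 deletions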
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.tantivy.index;
20+
21+
import org.apache.paimon.options.ConfigOption;
22+
import org.apache.paimon.options.ConfigOptions;
23+
24+
/** Options for the Tantivy full-text index. */
25+
public class TantivyFullTextIndexOptions {
26+
27+
public static final ConfigOption<Integer> SEARCHER_POOL_MAX_SIZE =
28+
ConfigOptions.key("tantivy.searcher-pool.max-size")
29+
.intType()
30+
.defaultValue(32)
31+
.withDescription(
32+
"Maximum number of idle TantivySearcher instances kept in the pool "
33+
+ "across all index shards. Each entry holds the index open in "
34+
+ "Rust memory (including the FST term dictionary), so memory "
35+
+ "usage scales with this value times the index size per shard. "
36+
+ "Set to 0 to disable pooling.");
37+
}

paimon-tantivy/paimon-tantivy-index/src/main/java/org/apache/paimon/tantivy/index/TantivySearcherPool.java

Lines changed: 33 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -22,24 +22,33 @@
2222
import org.apache.paimon.tantivy.TantivySearcher;
2323
import org.apache.paimon.utils.IOUtils;
2424

25+
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
26+
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
27+
2528
import javax.annotation.Nullable;
2629

2730
import java.io.Closeable;
2831
import java.io.IOException;
29-
import java.util.concurrent.ConcurrentHashMap;
30-
import java.util.concurrent.LinkedBlockingDeque;
32+
import java.util.concurrent.TimeUnit;
3133

3234
/**
3335
* Pool of {@link TantivySearcher} instances keyed by index file identity ({@code filePath@size}).
3436
*
3537
* <p>Each searcher holds the Tantivy index open in Rust memory (including the FST term dictionary).
3638
* Pooling avoids the repeated cost of loading the index on every query.
3739
*
40+
* <p>At most one idle searcher is kept per key. Under concurrent queries on the same shard, the
41+
* last entry to be returned wins; the others are closed immediately. Entries idle for more than
42+
* {@link #EXPIRE_AFTER_ACCESS_MINUTES} minutes are evicted and closed automatically. The total
43+
* number of idle entries across all keys is bounded by {@code maxSize}.
44+
*
3845
* <p>Thread-safe. Borrow/return semantics guarantee at most one thread uses a given entry at a
3946
* time.
4047
*/
4148
public class TantivySearcherPool {
4249

50+
static final long EXPIRE_AFTER_ACCESS_MINUTES = 30;
51+
4352
/** A borrowed searcher + its backing stream. Both are returned together. */
4453
static final class PooledEntry implements Closeable {
4554
final TantivySearcher searcher;
@@ -57,12 +66,21 @@ public void close() throws IOException {
5766
}
5867
}
5968

60-
private final int maxSizePerKey;
61-
private final ConcurrentHashMap<String, LinkedBlockingDeque<PooledEntry>> pool =
62-
new ConcurrentHashMap<>();
69+
@Nullable private final Cache<String, PooledEntry> idleCache;
6370

64-
public TantivySearcherPool(int maxSizePerKey) {
65-
this.maxSizePerKey = maxSizePerKey;
71+
/**
 * Creates a pool that keeps at most {@code maxSize} idle entries across all keys.
 *
 * <p>A non-positive {@code maxSize} disables pooling: no cache is created and {@code idleCache}
 * stays {@code null}.
 *
 * @param maxSize maximum number of idle entries retained; {@code <= 0} disables pooling
 */
public TantivySearcherPool(int maxSize) {
    if (maxSize <= 0) {
        this.idleCache = null;
    } else {
        Cache<String, PooledEntry> cache =
                Caffeine.newBuilder()
                        .maximumSize(maxSize)
                        .expireAfterAccess(EXPIRE_AFTER_ACCESS_MINUTES, TimeUnit.MINUTES)
                        // Run listener callbacks on the calling thread.
                        .executor(Runnable::run)
                        .removalListener(
                                (k, v, cause) -> {
                                    // borrow() takes entries out via asMap().remove(), which
                                    // Caffeine reports as an EXPLICIT removal. Such an entry
                                    // is being handed to a caller and must stay open; close
                                    // only entries dropped by size eviction, expiry or
                                    // replacement.
                                    if (cause
                                            != org.apache.paimon.shade.caffeine2.com.github
                                                    .benmanes.caffeine.cache.RemovalCause
                                                    .EXPLICIT) {
                                        IOUtils.closeQuietly((PooledEntry) v);
                                    }
                                })
                        .build();
        this.idleCache = cache;
    }
}
6785

6886
/**
@@ -73,24 +91,21 @@ public TantivySearcherPool(int maxSizePerKey) {
7391
*/
7492
@Nullable
7593
public PooledEntry borrow(String key) {
76-
LinkedBlockingDeque<PooledEntry> deque = pool.get(key);
77-
return deque == null ? null : deque.pollFirst();
94+
if (idleCache == null) {
95+
return null;
96+
}
97+
return idleCache.asMap().remove(key);
7898
}
7999

80100
/**
81-
* Return a previously borrowed entry. If the pool is full, the entry is closed immediately.
82-
*
83-
* <p>The stream position after use is irrelevant — Rust always seeks before reading.
101+
* Return a previously borrowed entry to the pool. Any entry displaced by size eviction, TTL
102+
* expiry, or key replacement is closed automatically via the removal listener.
84103
*/
85104
public void returnEntry(String key, PooledEntry entry) {
86-
if (maxSizePerKey <= 0) {
105+
if (idleCache == null) {
87106
IOUtils.closeQuietly(entry);
88107
return;
89108
}
90-
LinkedBlockingDeque<PooledEntry> deque =
91-
pool.computeIfAbsent(key, k -> new LinkedBlockingDeque<>(maxSizePerKey));
92-
if (!deque.offerFirst(entry)) {
93-
IOUtils.closeQuietly(entry);
94-
}
109+
idleCache.put(key, entry);
95110
}
96111
}

0 commit comments

Comments
 (0)