
Commit ca62681

fix index.clear()
Before: the `.clear()` algorithm is wrong.

1. Suppose there are 2200 documents. The first 500 are deleted:

       0         1000      2000
       |         |         |
       xxxxx-----------------
       ^    ^
       index  index + limit

2. There are now 1700 documents, and the next pagination query wrongly skips the first 500 documents and removes documents 500 to 1000:

       0         1000
       |         |
       -----xxxxx-------
            ^    ^
            index  index + limit

3. There are now 1200 documents, and the next pagination query wrongly skips the first 1000 documents and removes documents 1000 to 1200:

       0         1000
       |         |
       ----------xx

=> Only 1200 of the 2200 documents are removed, and 1000 documents remain.

After: documents are deleted 500 at a time in a loop until there is nothing left to delete, with a safety condition based on the initial document count in case of concurrent insertions.
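As an illustration (a hypothetical sketch, not code from this repository), the same off-by-offset pattern can be reproduced on a plain Python list: each deletion shifts the survivors down, so a pagination offset that keeps advancing skips the documents that slid into the already-visited range.

    # Hypothetical sketch of the bug: paginating with a growing offset
    # while deleting from the same collection skips surviving documents.
    def buggy_clear(docs: list, page_size: int = 500) -> int:
        deleted = 0
        offset = 0
        while True:
            page = docs[offset : offset + page_size]  # pagination over live data
            if not page:
                break
            for doc in page:
                docs.remove(doc)  # every delete shifts later documents down
            deleted += len(page)
            offset += page_size  # the advanced offset now skips survivors
        return deleted

    docs = list(range(2200))
    print(buggy_clear(docs))  # 1200: only 1200 of the 2200 documents are removed
    print(len(docs))          # 1000 documents remain, matching the diagrams above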
1 parent 242bee2 commit ca62681

redisvl/index/index.py

Lines changed: 26 additions & 11 deletions
@@ -1,6 +1,7 @@
 import asyncio
 import json
 import threading
+from math import ceil
 import time
 import warnings
 import weakref
@@ -717,7 +718,7 @@ def _delete_batch(self, batch_keys: List[str]) -> int:
             batch_keys (List[str]): List of Redis keys to delete.
 
         Returns:
-            int: Count of record deleted from Redis.
+            int: Count of records deleted from Redis.
         """
         client = cast(SyncRedisClient, self._redis_client)
         is_cluster = isinstance(client, RedisCluster)
@@ -745,14 +746,21 @@ def clear(self) -> int:
         Returns:
             int: Count of records deleted from Redis.
         """
+        batch_size = 500
+        max_ratio = 1.01
+
+        info = self.info()
+        max_records_deleted = ceil(info["num_records"] * max_ratio)  # Allow removing some additional concurrent inserts
         total_records_deleted: int = 0
+        query = FilterQuery(FilterExpression("*"), return_fields=["id"]).paging(0, batch_size)
 
-        for batch in self.paginate(
-            FilterQuery(FilterExpression("*"), return_fields=["id"]), page_size=500
-        ):
-            batch_keys = [record["id"] for record in batch]
-            if batch_keys:
+        while True:
+            batch = self._query(query)
+            if batch and total_records_deleted <= max_records_deleted:
+                batch_keys = [record["id"] for record in batch]
                 total_records_deleted += self._delete_batch(batch_keys)
+            else:
+                break
 
         return total_records_deleted
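For contrast, a sketch of the corrected loop on the same toy model (again hypothetical, mirroring the structure of the diff above rather than the real Redis calls): every pass re-reads the first page, so deletions naturally shift the next batch into the window, and the ceil(initial_count * max_ratio) cap bounds the loop against concurrent writers.

    from math import ceil

    # Hypothetical stand-in for the fixed clear(): always read the *first*
    # page (the equivalent of .paging(0, batch_size)), delete it, and stop
    # when a page comes back empty or the safety cap is exceeded.
    def fixed_clear(docs: list, batch_size: int = 500, max_ratio: float = 1.01) -> int:
        max_records_deleted = ceil(len(docs) * max_ratio)  # headroom for concurrent inserts
        total_records_deleted = 0
        while True:
            batch = docs[:batch_size]
            if batch and total_records_deleted <= max_records_deleted:
                for doc in batch:
                    docs.remove(doc)
                total_records_deleted += len(batch)
            else:
                break
        return total_records_deleted

    docs = list(range(2200))
    print(fixed_clear(docs))  # 2200: every document is removed
    print(len(docs))          # 0

Because the cap is checked before each batch is deleted, a clear() racing against heavy insertion still terminates after roughly 1% more deletions than the initial document count instead of looping forever.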

@@ -1615,14 +1623,21 @@ async def clear(self) -> int:
         Returns:
             int: Count of records deleted from Redis.
         """
+        batch_size = 500
+        max_ratio = 1.01
+
+        info = await self.info()
+        max_records_deleted = ceil(info["num_records"] * max_ratio)  # Allow removing some additional concurrent inserts
         total_records_deleted: int = 0
+        query = FilterQuery(FilterExpression("*"), return_fields=["id"]).paging(0, batch_size)
 
-        async for batch in self.paginate(
-            FilterQuery(FilterExpression("*"), return_fields=["id"]), page_size=500
-        ):
-            batch_keys = [record["id"] for record in batch]
-            if batch_keys:
+        while True:
+            batch = await self._query(query)
+            if batch and total_records_deleted <= max_records_deleted:
+                batch_keys = [record["id"] for record in batch]
                 total_records_deleted += await self._delete_batch(batch_keys)
+            else:
+                break
 
         return total_records_deleted
