Skip to content

Commit 3e5fc71

Browse files
committed
Add PaginatedKVStore support to VssStore
1 parent 1b2de7c commit 3e5fc71

1 file changed

Lines changed: 164 additions & 4 deletions

File tree

src/io/vss_store.rs

Lines changed: 164 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ use bitcoin::Network;
2424
use lightning::impl_writeable_tlv_based_enum;
2525
use lightning::io::{self, Error, ErrorKind};
2626
use lightning::sign::{EntropySource as LdkEntropySource, RandomBytes};
27-
use lightning::util::persist::{KVStore, KVStoreSync};
27+
use lightning::util::persist::{
28+
KVStore, KVStoreSync, PageToken, PaginatedKVStore, PaginatedKVStoreSync, PaginatedListResponse,
29+
};
2830
use lightning::util::ser::{Readable, Writeable};
2931
use prost::Message;
3032
use vss_client::client::VssClient;
@@ -377,6 +379,52 @@ impl KVStore for VssStore {
377379
}
378380
}
379381

382+
impl PaginatedKVStoreSync for VssStore {
383+
fn list_paginated(
384+
&self, primary_namespace: &str, secondary_namespace: &str, page_token: Option<PageToken>,
385+
) -> io::Result<PaginatedListResponse> {
386+
let internal_runtime = self.internal_runtime.as_ref().ok_or_else(|| {
387+
debug_assert!(false, "Failed to access internal runtime");
388+
let msg = format!("Failed to access internal runtime");
389+
Error::new(ErrorKind::Other, msg)
390+
})?;
391+
let primary_namespace = primary_namespace.to_string();
392+
let secondary_namespace = secondary_namespace.to_string();
393+
let inner = Arc::clone(&self.inner);
394+
let fut = async move {
395+
inner
396+
.list_paginated_internal(
397+
&inner.blocking_client,
398+
primary_namespace,
399+
secondary_namespace,
400+
page_token,
401+
)
402+
.await
403+
};
404+
tokio::task::block_in_place(move || internal_runtime.block_on(fut))
405+
}
406+
}
407+
408+
impl PaginatedKVStore for VssStore {
409+
fn list_paginated(
410+
&self, primary_namespace: &str, secondary_namespace: &str, page_token: Option<PageToken>,
411+
) -> impl Future<Output = Result<PaginatedListResponse, io::Error>> + 'static + Send {
412+
let primary_namespace = primary_namespace.to_string();
413+
let secondary_namespace = secondary_namespace.to_string();
414+
let inner = Arc::clone(&self.inner);
415+
async move {
416+
inner
417+
.list_paginated_internal(
418+
&inner.async_client,
419+
primary_namespace,
420+
secondary_namespace,
421+
page_token,
422+
)
423+
.await
424+
}
425+
}
426+
}
427+
380428
impl Drop for VssStore {
381429
fn drop(&mut self) {
382430
let internal_runtime = self.internal_runtime.take();
@@ -499,7 +547,9 @@ impl VssStoreInner {
499547
keys.push(self.extract_key(&kv.key)?);
500548
}
501549

502-
Ok((keys, response.next_page_token))
550+
// VSS may return an empty string instead of None to signal the last page.
551+
let next_page_token = response.next_page_token.filter(|t| !t.is_empty());
552+
Ok((keys, next_page_token))
503553
}
504554

505555
async fn read_internal(
@@ -629,13 +679,42 @@ impl VssStoreInner {
629679
.await?;
630680
keys.extend(page_keys);
631681
match next_page_token {
632-
Some(t) if !t.is_empty() => page_token = Some(t),
633-
_ => break,
682+
Some(t) => page_token = Some(t),
683+
None => break,
634684
}
635685
}
636686
Ok(keys)
637687
}
638688

689+
async fn list_paginated_internal(
690+
&self, client: &VssClient<CustomRetryPolicy>, primary_namespace: String,
691+
secondary_namespace: String, page_token: Option<PageToken>,
692+
) -> io::Result<PaginatedListResponse> {
693+
check_namespace_key_validity(
694+
&primary_namespace,
695+
&secondary_namespace,
696+
None,
697+
"list_paginated",
698+
)?;
699+
700+
const PAGE_SIZE: i32 = 50;
701+
702+
let vss_page_token = page_token.map(|t| t.to_string());
703+
let (keys, next_page_token) = self
704+
.list_keys(
705+
client,
706+
&primary_namespace,
707+
&secondary_namespace,
708+
vss_page_token,
709+
Some(PAGE_SIZE),
710+
)
711+
.await?;
712+
713+
let next_page_token = next_page_token.map(PageToken::new);
714+
715+
Ok(PaginatedListResponse { keys, next_page_token })
716+
}
717+
639718
async fn execute_locked_write<
640719
F: Future<Output = Result<(), lightning::io::Error>>,
641720
FN: FnOnce() -> F,
@@ -1042,4 +1121,85 @@ mod tests {
10421121
do_read_write_remove_list_persist(&vss_store);
10431122
drop(vss_store)
10441123
}
1124+
1125+
#[test]
1126+
fn vss_paginated_listing() {
1127+
let store = build_vss_store();
1128+
let ns = "test_paginated";
1129+
let sub = "listing";
1130+
let num_entries = 5;
1131+
1132+
for i in 0..num_entries {
1133+
let key = format!("key_{:04}", i);
1134+
let data = vec![i as u8; 32];
1135+
KVStoreSync::write(&store, ns, sub, &key, data).unwrap();
1136+
}
1137+
1138+
let mut all_keys = Vec::new();
1139+
let mut page_token = None;
1140+
1141+
loop {
1142+
let response =
1143+
PaginatedKVStoreSync::list_paginated(&store, ns, sub, page_token).unwrap();
1144+
all_keys.extend(response.keys);
1145+
match response.next_page_token {
1146+
Some(token) => page_token = Some(token),
1147+
_ => break,
1148+
}
1149+
}
1150+
1151+
assert_eq!(all_keys.len(), num_entries);
1152+
1153+
// Verify no duplicates
1154+
let mut unique = all_keys.clone();
1155+
unique.sort();
1156+
unique.dedup();
1157+
assert_eq!(unique.len(), num_entries);
1158+
}
1159+
1160+
#[test]
1161+
fn vss_paginated_empty_namespace() {
1162+
let store = build_vss_store();
1163+
let response =
1164+
PaginatedKVStoreSync::list_paginated(&store, "nonexistent", "ns", None).unwrap();
1165+
assert!(response.keys.is_empty());
1166+
assert!(response.next_page_token.is_none());
1167+
}
1168+
1169+
#[test]
1170+
fn vss_paginated_removal() {
1171+
let store = build_vss_store();
1172+
let ns = "test_paginated";
1173+
let sub = "removal";
1174+
1175+
KVStoreSync::write(&store, ns, sub, "a", vec![1u8; 8]).unwrap();
1176+
KVStoreSync::write(&store, ns, sub, "b", vec![2u8; 8]).unwrap();
1177+
KVStoreSync::write(&store, ns, sub, "c", vec![3u8; 8]).unwrap();
1178+
1179+
KVStoreSync::remove(&store, ns, sub, "b", false).unwrap();
1180+
1181+
let response = PaginatedKVStoreSync::list_paginated(&store, ns, sub, None).unwrap();
1182+
assert_eq!(response.keys.len(), 2);
1183+
assert!(response.keys.contains(&"a".to_string()));
1184+
assert!(!response.keys.contains(&"b".to_string()));
1185+
assert!(response.keys.contains(&"c".to_string()));
1186+
}
1187+
1188+
#[test]
1189+
fn vss_paginated_namespace_isolation() {
1190+
let store = build_vss_store();
1191+
1192+
KVStoreSync::write(&store, "ns_a", "sub", "key_1", vec![1u8; 8]).unwrap();
1193+
KVStoreSync::write(&store, "ns_a", "sub", "key_2", vec![2u8; 8]).unwrap();
1194+
KVStoreSync::write(&store, "ns_b", "sub", "key_3", vec![3u8; 8]).unwrap();
1195+
1196+
let response = PaginatedKVStoreSync::list_paginated(&store, "ns_a", "sub", None).unwrap();
1197+
assert_eq!(response.keys.len(), 2);
1198+
assert!(response.keys.contains(&"key_1".to_string()));
1199+
assert!(response.keys.contains(&"key_2".to_string()));
1200+
1201+
let response = PaginatedKVStoreSync::list_paginated(&store, "ns_b", "sub", None).unwrap();
1202+
assert_eq!(response.keys.len(), 1);
1203+
assert!(response.keys.contains(&"key_3".to_string()));
1204+
}
10451205
}

0 commit comments

Comments
 (0)