refactor(event cache): have the caches own the pagination task#6304
refactor(event cache): have the caches own the pagination task#6304
Conversation
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #6304 +/- ##
==========================================
- Coverage 89.83% 89.82% -0.02%
==========================================
Files 376 376
Lines 102447 102508 +61
Branches 102447 102508 +61
==========================================
+ Hits 92033 92077 +44
- Misses 6836 6858 +22
+ Partials 3578 3573 -5 ☔ View full report in Codecov by Sentry. |
This makes it possible to share futures which output is a result that has the error type set to `EventCacheError`. See also #6304 for usage.
This makes it possible to share futures which output is a result that has the error type set to `EventCacheError`. See also #6304 for usage.
fea48b0 to
9e8ad72
Compare
|
|
||
| fn load_more_events_backwards( | ||
| &self, | ||
| ) -> impl Future<Output = Result<LoadMoreEventsBackwardsOutcome>> + Send; |
There was a problem hiding this comment.
Why don't you write async fn load_more_events_backwards instead of return an impl Future?
There was a problem hiding this comment.
async is only syntactic sugar for returns a Future which output is the return type; to imply that the returned future is Send, you need to have it appear in the function's signature explicitly. (In a subsequent commit, it even becomes SendOutsideWasm, fwiw.)
This makes it possible to share futures which output is a result that has the error type set to `EventCacheError`. See also #6304 for usage.
This makes it possible to share futures which output is a result that has the error type set to `EventCacheError`. See also #6304 for usage.
78ff319 to
e77b0b9
Compare
Hywan
left a comment
There was a problem hiding this comment.
We are progressing! I made a couple of suggestions, I believe we can simplify some parts.
I would love to see a bit more tests around this new shared mechanism. It works with the existing tests, but we are not testing concurrent pagination runs as far as I know.
| pin_project! { | ||
| /// A subscriber to a [`PaginationStatus`]. | ||
| /// | ||
| /// This is a manual implementation of a map function on top of an internal type |
There was a problem hiding this comment.
I'm struggling to understand why we need a custom map implementation:
SubscriberimplementsStream- We can use
futures_utils::StreamExt::mapto have our mapping. - It might be useful to clone the
Subscriberto dealing with lifetime issues.
Did you consider this choice?
There was a problem hiding this comment.
See comment below; the subscriber is used in more ways than just a stream.
| pub fn status(&self) -> PaginationStatusSubscriber { | ||
| PaginationStatusSubscriber { subscriber: self.0.cache.status().subscribe() } |
There was a problem hiding this comment.
With StreamExt::map, it might look like:
pub fn status(&self) -> impl Stream<Item = PaginationStatus> {
self.0.cache.status().subscribe().map(From::from)
}with a custom:
impl From<SharedPaginationStatus> for PaginationStatus { … }There was a problem hiding this comment.
The thing is that status() is used in more ways: some callers will make use of get(), next() and next_now(), so we can't just replace with a stream and be done with it, unfortunately.
We could plain expose the raw Subscriber<SharedPaginationStatus>, but it seems wrong to have the callers having to call the From/Into implementations themselves?
|
Thanks for the review! A general comment, before I dive into the detailed comments:
Have you seen |
Even though the task which started the back-pagination is aborted, since it's now the event cache owning it, it keeps on running in the background.
- use the modern MatrixMockServer facilities - provide a previous-batch token so as not to wait for the initial batch token - lower the delay for the /messages responses to 1 sec
…meline updates My only guess for this semantic change, is that the pagination status update and the end of the pagination now happen at different times, or close enough that they're regrouped in the same stream update. This doesn't fundamentally change the semantics, so we'll see if this holds on slower machines (e.g. CI).
For some reason, the previous test didn't take the initial skip count value into account, and now it does. Oh well.
e77b0b9 to
6159151
Compare
|
@Hywan All the things for which you've requested tests are already tested, be it with tests that predated the PR, some of which have been tweaked, or new tests ( |
This opens the door for running paginations as background tasks. As such, it's a prerequisite for #6014.
The implementation follows the following scheme:
shared()future, that can be polled from several tasks (and it will always return the same results for all the tasks awaiting upon it).This changes a few test expectations in subtle ways, but that end up in the same timelines, so I didn't pay too much attention.
CHANGELOG.mdfiles.