
feat(risingwave): support tumble and hop window agg#11970

Open
litaxc wants to merge 4 commits into ibis-project:main from litaxc:main

Conversation


@litaxc litaxc commented Mar 13, 2026

Description of changes

Implement the visit_WindowAggregate function as it is done in Flink.

Issues closed

Resolves #11969
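For context, RisingWave exposes windowing as tumble()/hop() table functions that add window_start/window_end columns to the source relation. A minimal stdlib-only sketch of the general shape of SQL such a window aggregation compiles to (the helper name and exact SQL text are illustrative assumptions, not this PR's compiler code):

```python
def tumble_agg_sql(table: str, time_col: str, size: str) -> str:
    # Illustrative RisingWave-style tumble aggregation; see the RisingWave
    # docs for the exact tumble()/hop() signatures.
    return (
        f"SELECT window_start, window_end, COUNT(*) AS n "
        f"FROM tumble({table}, {time_col}, INTERVAL '{size}') "
        f"GROUP BY window_start, window_end"
    )

print(tumble_agg_sql("t", "ts", "1 day"))
```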

@github-actions github-actions bot added the sql Backends that generate SQL label Mar 13, 2026
@deepyaman
Collaborator

Can you add tests for this similar to ibis/backends/flink/tests/test_window.py and ibis/backends/pyspark/tests/test_window.py? TBH I'm not sure why there aren't also tests for hopping windows in these.

Alternatively, it could also be an option to centralize the window tests in ibis/backends/tests/test_window.py, and to only test some of these options for streaming backends, but I think it's probably easiest to take the first route for now.

@github-actions github-actions bot added tests Issues or PRs related to tests risingwave The RisingWave backend labels Mar 17, 2026
@litaxc
Author

litaxc commented Mar 17, 2026

I added tests for the tumble and hop functions.
I increased the window size to one day (it was 30 seconds in both Flink and PySpark's tests) to make the engine actually aggregate something.
Let me know if you prefer the original setting.

Collaborator

@deepyaman deepyaman left a comment


Looks good overall, and closely aligned with the Flink implementation (which is great). @cpcloud / @gforsyth / @NickCrews could one of you kick off the CI workflow?

t = alltypes
expr = (
    t.window_by(t.timestamp_col)
    .hop(size=ibis.interval(days=10), slide=ibis.interval(days=10))
Collaborator


Maybe we should use a different slide so that it's not essentially a tumbling window? :) I'm sure it works, but even reviewing at a glance I just noticed the results are the same.
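For intuition: assuming windows aligned to multiples of the slide (engines may align differently), a timestamp falls into exactly one window when slide == size, which is just a tumble, and into overlapping windows otherwise. A stdlib-only sketch in integer seconds:

```python
def windows_containing(ts: int, size: int, slide: int) -> list[tuple[int, int]]:
    # All hop windows [s, s + size) with s a multiple of `slide` that contain `ts`.
    first = ((ts - size) // slide + 1) * slide
    return [(s, s + size) for s in range(first, ts + 1, slide)]

print(windows_containing(25, 10, 10))  # slide == size: one window, i.e. a tumble
print(windows_containing(25, 10, 5))   # smaller slide: overlapping windows
```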

Author


I tried using size=20 days, but I couldn't quite understand the resulting window_start/end produced by RisingWave, so I changed it back to the same size and slide to ensure I wouldn't introduce wrong results.

@NickCrews
Contributor

Thanks for your work here @litaxc! Looks like a good implementation.

I would prefer these tests to be backend-agnostic so they run on all backends and we can ensure that all backends produce the same result. I see we already have tests in the flink and pyspark backend tests (grep for window_by in files matching *test*). So I don't blame you for adding a risingwave test file. But unlucky you! You are going to have to clean up the mess that others left ;)

Can you please:

  • add a new ibis/backends/tests/test_temporal_window.py test file that tests all backends. (I'm naming this to follow the naming convention of ibis/tests/expr/test_temporal_window.py which is the equivalent of this, but doesn't require the backend to execute)
  • migrate the existing flink and pyspark tests there, and add this new risingwave test there? All the other backends will need to be pytest.mark.never() or pytest.mark.notimpl() depending on whether the backend will sometime/ever support streaming.

I would also like to have better visibility that the result is actually correct. Just testing the shape feels both inadequate and hard to grok. Either I would like to

  1. see the actual generated SQL, e.g. use the snapshot fixture (similar to how it is used in ibis/backends/tests/test_sql.py) and commit the SQL snapshots to git. If you do this and then just check the output shape (which is a smoke test that the backend can actually execute the expression), that feels like enough
  2. Or, inline some handcrafted dummy input data that is easy to reason about (you know, maybe like 6-8 rows and a few columns) and then check the output results actually match the expected data. In this case, we wouldn't need to check the output shape or the SQL

I think either of these should be enough to give us enough visibility and confidence around #11970 (comment)
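Option 2 might look something like this hand-computable sketch: a pure-Python stand-in for the backend's tumble, over hypothetical data (the real test would instead run the ibis expression against the backend and compare to these expected counts):

```python
from collections import Counter
from datetime import datetime, timedelta

# Handcrafted input: six timestamps spanning three 30-second windows.
rows = [
    datetime(2024, 1, 1, 0, 0, 5),
    datetime(2024, 1, 1, 0, 0, 10),
    datetime(2024, 1, 1, 0, 0, 25),
    datetime(2024, 1, 1, 0, 0, 35),
    datetime(2024, 1, 1, 0, 0, 40),
    datetime(2024, 1, 1, 0, 1, 5),
]

def tumble_start(ts: datetime, size: timedelta) -> datetime:
    # Floor the timestamp to the start of its tumbling window.
    epoch = datetime(1970, 1, 1)
    return epoch + ((ts - epoch) // size) * size

counts = Counter(tumble_start(ts, timedelta(seconds=30)) for ts in rows)
# Expected: 00:00:00 -> 3 rows, 00:00:30 -> 2 rows, 00:01:00 -> 1 row
print(counts)
```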

@deepyaman
Collaborator

@NickCrews I agree, but I'm happy to take this myself in a follow-up (unless you particularly want to @litaxc):

> But unlucky you! You are going to have to clean up the mess that others left ;)
>
> Can you please:
>
> add a new ibis/backends/tests/test_temporal_window.py test file that tests all backends. (I'm naming this to follow the naming convention of ibis/tests/expr/test_temporal_window.py which is the equivalent of this, but doesn't require the backend to execute)
> migrate the existing flink and pyspark tests there, and add this new risingwave test there? All the other backends will need to be pytest.mark.never() or pytest.mark.notimpl() depending on whether the backend will sometime/ever support streaming.

I was planning to take a look at the result shape when I get a chance, since that's a bit more concerning; sorry for the delay on that.

@NickCrews
Contributor

I'm fine with the cleanup being in a follow-up as long as it's not me that has to do it :) It would be awesome if you'd be willing to take it on, @deepyaman!!

If we get some more visibility/confidence around the pending inline comment, so we are confident it is correct, then I am happy to merge. Thanks!

@litaxc
Author

litaxc commented Mar 25, 2026

One problem with merging all the temporal window tests into one file is that the same query produces different results for Flink, PySpark, and RisingWave.
For example, they all have the test test_tumble_window_by_grouped_agg; when aggregating with a 30-second interval, the output shapes are:

| backend | output shape |
| --- | --- |
| Flink | (610, 4) |
| PySpark | (7299, 4) |
| RisingWave | (7300, 4) |
| DuckDB (w/ date_trunc) | (7300, 4) |

(BTW, the test data has 7300 rows.)

(EDIT: I verified the results with DuckDB using date_trunc() and updated the table.)

@NickCrews
Contributor

> the same query produces different results for Flink, PySpark, and RisingWave

I agree this isn't ideal. If possible, it would be great to eventually smooth over these inconsistencies so that ibis's promise of portability actually is true. But for this first step, I just want the tests to be co-located so that it is obvious that they give different results. Later we can get around to making them consistent in a followup PR.

In the meantime we can parameterize the expected rows so that the tests pass, e.g.

_nrows_expected_by_backend = {
    "duckdb": 7300,
    # etc
}

def test_tumble_window_by_grouped_agg(con):
    expected_shape = (_nrows_expected_by_backend[con.name], 4)
    # etc

PS: I'm not familiar at all with these streaming backends, so if you have any insight into WHY they give different results, and ideas on how to normalize them, it would be awesome to jot down notes here for someone in the future.
