feat(stream): subproject A client AST col-pruning + subproject C reader TSDB v6 refactor & vtable mapping & dual-mode#35233
feat(stream): subproject A client AST col-pruning + subproject C reader TSDB v6 refactor & vtable mapping & dual-mode#35233wangmm0220 wants to merge 21 commits into3.0from
Conversation
…sertions - Extract scan column names from serialized scan plan JSON - Helper function walks cJSON tree collecting ScanCols/ScanPseudoCols entries - Returns union of column names normalized to lowercase - Supports subsequent assertions on scan column sets in Task 2/5/7 Co-authored-by: Copilot <[email protected]>
State_window+%%trows currently leaks calc-only column c2 into trigger scan plan. New test TestStreamScanColPruning_StateWindowTrows asserts the desired post-pruning behavior. Also fix extractScanColsFromPlanJson to walk Target -> Expr -> Column.ColName instead of looking for ColName on the Target wrapper directly.
Trigger scan plan no longer appends calc-only projection columns. This removes c1 leak for state_window/period + %%trows shapes. Updates ParserStreamTest period+%%trows snapshot: trigger now omits c1 and partition cols slot ids shift -1 accordingly.
State window + %%trows + pre_filter: calc query must independently scan the pre_filter column. Currently calc misses c2 -> red.
Task 4: When trigger uses %%trows + pre_filter, AND the pre_filter into calc's WHERE so calc independently re-applies it and the filter columns flow into pScanCols. Bypass user-facing trows+WHERE check via allowTrowsWhere flag for this system injection.
T2: state_window + pre_filter, calc reads st1 (no trows compensation) T4: event_window + %%trows T5: session window + calc reads st1 T6: count_window + %%trows + pre_filter T7: interval/sliding + %%trows + pre_filter T8: period trigger
… unblock Adds st1v virtual normal table fixture and TestStreamScanColPruning_VirtualTableUnblock that currently fails at the disable check in createStreamReqBuildCalc.
After Task 4's correct two-AST col pruning + pre_filter compensation, the original guard rail in createStreamReqBuildCalc is no longer needed. Virtual-table users can now use pre_filter together with %%trows. Move the st1v test fixture to the end of mockCatalog setup so existing snapshot expectations on table IDs remain stable.
There was a problem hiding this comment.
Code Review
This pull request implements stream client-side AST scan-column pruning and pre-filter compensation. Key changes include adding an allowTrowsWhere flag to the stream parser information, introducing a virtual table fixture for testing, and adding a suite of unit tests to verify that trigger plans scan only necessary columns while ensuring pre-filter columns are correctly compensated in calculation scans. A helper function for parsing scan columns from JSON plans was also added to support these tests. I have no feedback to provide.
There was a problem hiding this comment.
Pull request overview
This PR improves stream parsing/planning on the client translation layer by pruning trigger-side scan columns derived from AST analysis, and by compensating pre_filter into the calc query when calc reads from %%trows to keep filter semantics consistent and ensure required scan columns are present. It also unblocks the previously rejected combination of virtual tables with %%trows + pre_filter, and adds targeted tests/fixtures to validate these behaviors.
Changes:
- Update stream translation to (a) stop backfilling calc-only projection columns into trigger scans, (b) inject cloned
pre_filterinto calcWHEREwhen calc uses%%trows, and (c) remove the legacy reject for virtual table +%%trows + pre_filter. - Add
allowTrowsWhereto distinguish system-injectedWHEREfrom user-writtenWHEREduring translation. - Extend parser tests and mock catalog with a virtual normal table fixture plus new scan-col pruning test coverage.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| source/libs/parser/src/parTranslater.c | Adds allowTrowsWhere gate and injects pre_filter into calc WHERE for %%trows; removes prior trigger-scan backfill and virtual-table reject logic. |
| source/libs/parser/inc/parUtil.h | Extends SParseStreamInfo with allowTrowsWhere to control %%trows + WHERE validation behavior. |
| source/libs/parser/test/mockCatalog.cpp | Adds st1v virtual normal table fixture for stream tests without breaking existing tableId snapshots. |
| source/libs/parser/test/parStreamTest.cpp | Adds helper to extract scan cols from plan JSON and introduces multiple new tests covering scan-col pruning and pre_filter compensation scenarios. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
The qStreamRemapBlockBySlotColMap helper was a v6.1 design artifact that never matched production. v7.0 replaces it with a pre-read model via pickSchemasHistory + tsdbReaderOpen options, leaving no reader-side block-level remap to test. See DS sub-project C v7.0 §6.6.
- 新增 9 个 STRIGGER_PULL_TSDB_DATA*_NEW{,_NEXT,_CALC,_CALC_NEXT}
+ STRIGGER_PULL_SET_TABLE_HISTORY enum
- 新增请求结构 SSTriggerTsdbData{,VTable}NewRequest + 编解码
- vnodeStream.c F5/F6/F7/F8 case + 双层 vtableTaskMap 缓存
- F9 与 F4 共用 vnodeProcessStreamSetTableReq, 历史/实时三件套互不干扰
- pickSchemasHistory 读前下发 options.schemas/pSlotList/isSchema=true
- STREAM_RETURN_ROWS_TSDB_NUM=50000 阈值分离, 按 case 单点持有
vnodeProcessStreamWalCalcDataNewReq 删除, 与 vnodeProcessStreamWalDataNewReq 合并为单一函数: - 入口按 base.type 判定 isCalc - isCalc 时使用 calcBlock, 否则 triggerBlock - 非 vtable 路径仍保留 transform 到 calcBlock 的兼容 dispatcher case STRIGGER_PULL_WAL_CALC_DATA_NEW fall through 到 WAL_DATA_NEW
- SCMCreateStreamReq.isOldPlan in-memory flag (set by mnode at decode) - SStreamReaderDeployFromTrigger.isOldPlan wire field (encode/decode) - mnode VER 8 -> 9, MND_STREAM_OLD_TRIGGER_COLS=8 marks legacy plan - mndDef tDecodeSStreamObj sets isOldPlan = (sver == 8) - msmBuildReaderDeployInfo passes isOldPlan to reader
Split trigger/calc filters and reproject old-plan calc output: - streamReader.h: pFilterInfo -> pFilterInfoTrigger + pFilterInfoCalc; isNewCalc helper - streamReader.c: init both filters from triggerAst/calcAst conditions - vnodeStream.c: pick filter per isOldPlan/isCalc; transformDataToCalc for old plan
- Build new dst hash off-lock; swap pointer under short W lock - Fix sizeof(uid) on 32-bit (use sizeof(*uid)) - Cleanup old dst after unlock to minimize critical section - Inner loop var renamed j to avoid shadowing outer i
- parTranslater: fix potential UAF on pre_filter injection failure by setting pCalcSelect->pWhere = NULL after ownership transfer - streamReader.h: extract getResBlock/getScanCols/getFilterInfo helpers to remove repeated dual-mode ternary chains - vnodeStream: route TSDB_DATA_NEW / VTABLE_NEW handlers through the new helpers - streamReader.c: free TSWAP'd uid hashes and dst on qBuildVTableList failure to avoid leaks - drop unused pConditions field; drop misleading comments on TsdbDataNewRequest / VTable variant; remove allowTrowsWhere flag
- parTranslater: switch trows+WHERE rejection to PAR_ERR_JRET so the _return label still frees pDbs and pScanPlanArray. - streamReader: tighten qBuildVTableList NULL checks before TSWAP and drop the redundant cleanup branch (owners are released elsewhere). - vnodeStream: move destroySlotInfo out of the public header into the only translation unit that uses it, normalize compareBlockInfo signature, and explicitly zero pSlotList in BUILD_OPTION.
- parStreamTest.cpp: collapse 9 copy-paste cases into a parameterized
fixture covering A1-A8 invariants with a 9-row table, plus a
dedicated A8 reject case
- streamReaderTsdbV6Test.cpp: add wire codec round-trips for the 8
new ESTriggerPullType variants, an isOldPlan static-grep guard,
and STREAM_RETURN_ROWS{,_TSDB}_NUM threshold checks
- 18-StreamProcessing/90-Optimize-3.4.2/: 4 new e2e pytests covering
A2/A3 pruning, A4 trows-where rejection, A5 vtable unblock, and
C1 history-backfill wire round-trip
destroySlotInfo and compareBlockInfo are file-static helpers in vnodeStream.c so they cannot be linked from the new-stream test target. Reverse-validate the DS C invariants C5 (frees members but not container) and C6 (strict ascending compare on uid) via source-level inspection - same pattern as IsOldPlanIsNotInJsonCodec.
背景
本 PR 为 v3.4.2 流计算优化 RS 的端到端落地,包含两个子项目,需同 PR 同 release 一起合入:
子项目 A:流计算 client 侧 AST 扫描列裁剪
详见
docs/releases/TSDB-v3.4.2-[20260630]/05-设计文档/流计算优化-子项目A-实施计划.md。修复 trigger 扫描计划被 calc-only 列污染、以及
%%trows + pre_filter在 calc 侧丢列的两类问题,并解禁虚拟表 +%%trows + pre_filter组合。子项目 C:Reader 端 TSDB 接口重构 + 虚拟表映射 + dual-mode 兼容
详见
docs/releases/TSDB-v3.4.2-[20260630]/05-设计文档/流计算优化-子项目C-Reader 端 TSDB 接口重构与虚拟表映射 DS.md(v7.4)。现状问题:
tsdbReader,无法独立缓存键、独立列裁剪、独立续拉slotId → colId映射不下发,BlockData 形态与 client AST 期望不一致目标:
STRIGGER_PULL_TSDB_DATA_NEW / _CALC+Next,按(type, gid)缓存tsdbReader_VTABLE_NEW / _VTABLE_NEW_CALC+Next,按(type, uid)双层 hash 缓存STRIGGER_PULL_SET_TABLE_HISTORY,与实时 set table 完全隔离slotId → colId映射「读前下发」:trigger 端通过 set table 下发uidInfoTrigger / uidInfoCalc,reader 端在tsdbReaderOpen之前通过pickSchemasHistory取出映射,转化为cids[] + slotIdList[]注入options.schemas / pSlotList / isSchema=true,由 tsdbReader 内部按 slot 摆列输出(无任何"读后块级转换")STREAM_RETURN_ROWS_TSDB_NUM = 50000,仅作用于 TSDB 接口SCMCreateStreamReq.isOldPlan标记,老流(sver=8)经 mnode 解码后标isOldPlan=true,trigger 仍发新 type,reader 走 dual-mode 兼容路径改动点
子项目 A(client / parser)
createStreamReqBuildCalc中将pre_filter作为 AND 条件克隆注入到 calc 的WHERE(仅当 calc 使用%%trows),过滤列自然进入pScanCols%%trows + pre_filter不支持"的旧守护代码SParseStreamInfo新增allowTrowsWhere标志,区分用户书写 vs 系统注入的 WHERE,绕过translateWhere硬限制st1v虚拟普通表夹具TestStreamScanColPruning_*用例覆盖 state/event/session/count/interval/period ×%%trows/st1× pre_filter/虚拟表子项目 C(reader / streamMsg / mnode / trigger)
isOldPlan字段编解码(encode/decode 对称)SStreamTriggerReaderInfo / SStreamReaderTaskInner扩展 F5–F9 + F11 字段;pickSchemasHistory实现读前下发(type, gid|uid)的tsdbReader缓存与 lifecycle、虚拟表双层 hash + free callback 级联、dual-mode WAL handler(实时路径按isOldPlan路由)、transformDataToCalchelper(旧 plan calc 兼容搬运)tEncode/Decodesver=8/9 dual-mode(isOldPlanin-memory only,不落 SDB)streamTriggerTask.c:7438+ / 7757+ / 8056+ / 8543+ / 8682+等)验证
风险
tsdbReader个数随并发 uid 数线性增长;通过vSetTableListHistory大小约束 + session 结束统一释放控制