33from __future__ import annotations
44
55from asyncio import (
6- CancelledError ,
76 Future ,
87 Task ,
98 ensure_future ,
2524)
2625
2726if TYPE_CHECKING :
28- from collections .abc import AsyncGenerator , Awaitable , Generator , Iterable , Sequence
27+ from collections .abc import Awaitable , Generator , Iterable , Sequence
2928
3029 from ..error .graphql_error import GraphQLError
3130 from .types import (
@@ -48,7 +47,7 @@ class IncrementalGraph:
4847
4948 _root_nodes : dict [SubsequentResultRecord , None ]
5049 _completed_queue : list [IncrementalDataRecordResult ]
51- _next_queue : list [Future [Iterable [IncrementalDataRecordResult ]]]
50+ _next_queue : list [Future [Iterable [IncrementalDataRecordResult ] | None ]]
5251
5352 _tasks : set [Task [Any ]]
5453
@@ -87,24 +86,31 @@ def add_completed_reconcilable_deferred_grouped_field_set(
8786 incremental_data_records , deferred_records
8887 )
8988
90- async def completed_incremental_data (
89+ def current_completed_batch (
9190 self ,
92- ) -> AsyncGenerator [Iterable [IncrementalDataRecordResult ], None ]:
93- """Asynchronously yield completed incremental data record results."""
91+ ) -> Generator [IncrementalDataRecordResult , None , None ]:
92+ """Yield the current completed batch of incremental data record results."""
93+ queue = self ._completed_queue
94+ while queue :
95+ yield queue .pop (0 )
96+ if not self ._root_nodes :
97+ self .abort ()
98+
99+ def next_completed_batch (
100+ self ,
101+ ) -> Future [Iterable [IncrementalDataRecordResult ] | None ]:
102+ """Return a future that resolves to the next completed batch."""
94103 loop = get_running_loop ()
95- while True :
96- if self ._completed_queue :
97- first_result = self ._completed_queue .pop (0 )
98- yield self ._yield_current_completed_incremental_data (first_result )
99- else :
100- future : Future [Iterable [IncrementalDataRecordResult ]] = (
101- loop .create_future ()
102- )
103- self ._next_queue .append (future )
104- try :
105- yield await future
106- except CancelledError :
107- break # pragma: no cover
104+ future : Future [Iterable [IncrementalDataRecordResult ] | None ] = (
105+ loop .create_future ()
106+ )
107+ self ._next_queue .append (future )
108+ return future
109+
110+ def abort (self ) -> None :
111+ """Abort the incremental graph execution."""
112+ for resolve in self ._next_queue :
113+ resolve .set_result (None )
108114
109115 def has_next (self ) -> bool :
110116 """Check if there are more results to process."""
@@ -332,11 +338,7 @@ def _yield_current_completed_incremental_data(
332338 ) -> Generator [IncrementalDataRecordResult , None , None ]:
333339 """Yield the current completed incremental data."""
334340 yield first_result
335- queue = self ._completed_queue
336- while queue :
337- yield queue .pop (0 )
338- if not self ._root_nodes :
339- self .stop_incremental_data ()
341+ yield from self .current_completed_batch ()
340342
341343 def _enqueue (self , completed : IncrementalDataRecordResult ) -> None :
342344 """Enqueue completed incremental data record result."""
0 commit comments