-
Notifications
You must be signed in to change notification settings - Fork 4.6k
Handle multi-key bundles in side-input runner #38988
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -52,6 +52,7 @@ | |
| public class SimpleParDoFn<InputT, OutputT, W extends BoundedWindow> implements ParDoFn { | ||
| private final SimpleParDoFnHelpers<InputT, OutputT, W> helpers; | ||
| private @Nullable StreamingSideInputProcessor<InputT, W> sideInputProcessor; | ||
| private boolean activeKey = false; | ||
|
|
||
| /** Creates a {@link SimpleParDoFn} using basic information about the step being executed. */ | ||
| SimpleParDoFn( | ||
|
|
@@ -91,12 +92,10 @@ public void startBundle(Receiver... receivers) throws Exception { | |
| } | ||
|
|
||
| protected void onStartKey() { | ||
| // TODO(relax): This assumes single-key bundles, which will change! Refactor this to not make | ||
| // this assumption. | ||
| if (helpers.hasStreamingSideInput) { | ||
| sideInputProcessor = | ||
| new StreamingSideInputProcessor<>( | ||
| new StreamingSideInputFetcher<InputT, W>( | ||
| new StreamingSideInputFetcher<>( | ||
| helpers.fnInfo.getSideInputViews(), | ||
| helpers.fnInfo.getInputCoder(), | ||
| (WindowingStrategy<?, W>) helpers.fnInfo.getWindowingStrategy(), | ||
|
|
@@ -118,16 +117,20 @@ protected void onStartKey() { | |
| } | ||
| }); | ||
| } | ||
| activeKey = true; | ||
| } | ||
|
|
||
| @Override | ||
| @SuppressWarnings("unchecked") | ||
| public void processElement(Object untypedElem) throws Exception { | ||
| if (helpers.fnRunner == null) { | ||
| boolean initialize = !activeKey || helpers.fnRunner == null; | ||
| if (initialize) { | ||
| // If we need to run reallyStartBundle in here, we need to make sure to switch the state | ||
| // sampler into the start state. | ||
| try (Closeable start = helpers.operationContext.enterStart()) { | ||
| helpers.reallyStartBundle(); | ||
| if (helpers.fnRunner == null) { | ||
| helpers.reallyStartBundle(); | ||
| } | ||
| onStartKey(); | ||
| } | ||
| } | ||
|
|
@@ -196,12 +199,15 @@ public void processTimers() throws Exception { | |
| } | ||
|
|
||
| @Override | ||
| public void finishKey(Object key) throws Exception {} | ||
| public void finishKey(Object key) throws Exception { | ||
| this.activeKey = false; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should we call |
||
| } | ||
|
|
||
| @Override | ||
| public void finishBundle() throws Exception { | ||
| helpers.finishBundle(sideInputProcessor); | ||
| this.sideInputProcessor = null; | ||
| this.activeKey = false; | ||
| } | ||
|
|
||
| @Override | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -59,6 +59,7 @@ public class StreamingKeyedWorkItemSideInputParDoFn<K, InputT, OutputT, W extend | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private final Coder<InputT> inputCoder; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private final SimpleParDoFnHelpers<KeyedWorkItem<K, InputT>, OutputT, W> helpers; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| protected @Nullable StreamingSideInputProcessor<InputT, W> sideInputProcessor; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private boolean activeKey = false; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| StreamingKeyedWorkItemSideInputParDoFn( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| PipelineOptions options, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -119,7 +120,9 @@ protected void onStartKey() { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (sideInputProcessor != null) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| boolean hasState = helpers.hasState(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // TODO(relax): We should be able to get this without writing it to state! | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // TODO(relax): We should be able to get this without writing it to state! To make this work, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. finishKey has the key. If we move onStartKey to finishKey, we should be able to make this optimization. |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // we need a | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // runner-invoked startKey that passes us the key. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| @Nullable K key = keyValue().read(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (key != null) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| sideInputProcessor.tryUnblockElementsAndTimers( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -143,16 +146,20 @@ protected void onStartKey() { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| activeKey = true; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| @Override | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| @SuppressWarnings("unchecked") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| public void processElement(Object untypedElem) throws Exception { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (helpers.fnRunner == null) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| boolean initialize = !activeKey || helpers.fnRunner == null; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (initialize) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // If we need to run reallyStartBundle in here, we need to make sure to switch the state | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // sampler into the start state. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try (Closeable start = helpers.operationContext.enterStart()) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| helpers.reallyStartBundle(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (helpers.fnRunner == null) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| helpers.reallyStartBundle(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| onStartKey(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
154
to
165
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In To fix this, we should extract the key from
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -194,12 +201,15 @@ public void processTimers() throws Exception { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| @Override | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| public void finishKey(Object key) throws Exception {} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| public void finishKey(Object key) throws Exception { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| this.activeKey = false; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| @Override | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| public void finishBundle() throws Exception { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| helpers.finishBundle(sideInputProcessor); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| this.sideInputProcessor = null; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| this.activeKey = false; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| @Override | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -208,7 +218,7 @@ public void abort() throws Exception { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| protected void onProcessWindowedValue(WindowedValue<KeyedWorkItem<K, InputT>> elem) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // TODO: Get rid of this! | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // TODO: Get rid of this once we know the current key. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| final K key = elem.getValue().key(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| keyValue().write(key); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -38,6 +38,7 @@ public class StreamingSideInputDoFnRunner<InputT, OutputT, W extends BoundedWind | |||||
| implements DoFnRunner<InputT, OutputT> { | ||||||
| private final DoFnRunner<InputT, OutputT> simpleDoFnRunner; | ||||||
| private final StreamingSideInputProcessor<InputT, W> sideInputProcessor; | ||||||
| boolean activeKey = false; | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The
Suggested change
|
||||||
|
|
||||||
| public StreamingSideInputDoFnRunner( | ||||||
| DoFnRunner<InputT, OutputT> simpleDoFnRunner, | ||||||
|
|
@@ -49,6 +50,11 @@ public StreamingSideInputDoFnRunner( | |||||
| @Override | ||||||
| public void startBundle() { | ||||||
| simpleDoFnRunner.startBundle(); | ||||||
| tryUnblockElements(); | ||||||
| this.activeKey = true; | ||||||
| } | ||||||
|
|
||||||
| private void tryUnblockElements() { | ||||||
| sideInputProcessor.tryUnblockElements( | ||||||
| unblocked -> { | ||||||
| for (WindowedValue<InputT> elem : unblocked) { | ||||||
|
|
@@ -59,6 +65,10 @@ public void startBundle() { | |||||
|
|
||||||
| @Override | ||||||
| public void processElement(WindowedValue<InputT> compressedElem) { | ||||||
| if (!activeKey) { | ||||||
| tryUnblockElements(); | ||||||
| this.activeKey = true; | ||||||
| } | ||||||
| for (Iterator<? extends WindowedValue<InputT>> it = | ||||||
| sideInputProcessor.handleProcessElement(compressedElem); | ||||||
| it.hasNext(); ) { | ||||||
|
|
@@ -84,12 +94,14 @@ public <KeyT> void onTimer( | |||||
| @Override | ||||||
| public <KeyT extends @Nullable Object> void finishKey(KeyT key) { | ||||||
| simpleDoFnRunner.finishKey(key); | ||||||
| this.activeKey = false; | ||||||
| } | ||||||
|
|
||||||
| @Override | ||||||
| public void finishBundle() { | ||||||
| simpleDoFnRunner.finishBundle(); | ||||||
| sideInputProcessor.handleFinishBundle(); | ||||||
| this.activeKey = false; | ||||||
| } | ||||||
|
|
||||||
| @Override | ||||||
|
|
||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIUC there could be keys with no elements or timers. Should we move the
onStartKeycall here to handle such keys?Instead of calling
onStartKeyin startBundle and processElements, we could call it in processElements, processTimers and finishKey.