Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
public class SimpleParDoFn<InputT, OutputT, W extends BoundedWindow> implements ParDoFn {
private final SimpleParDoFnHelpers<InputT, OutputT, W> helpers;
private @Nullable StreamingSideInputProcessor<InputT, W> sideInputProcessor;
private boolean activeKey = false;

/** Creates a {@link SimpleParDoFn} using basic information about the step being executed. */
SimpleParDoFn(
Expand Down Expand Up @@ -91,12 +92,10 @@ public void startBundle(Receiver... receivers) throws Exception {
}

protected void onStartKey() {
// TODO(relax): This assumes single-key bundles, which will change! Refactor this to not make
// this assumption.
if (helpers.hasStreamingSideInput) {
sideInputProcessor =
new StreamingSideInputProcessor<>(
new StreamingSideInputFetcher<InputT, W>(
new StreamingSideInputFetcher<>(
helpers.fnInfo.getSideInputViews(),
helpers.fnInfo.getInputCoder(),
(WindowingStrategy<?, W>) helpers.fnInfo.getWindowingStrategy(),
Expand All @@ -118,16 +117,20 @@ protected void onStartKey() {
}
});
}
activeKey = true;
}

@Override
@SuppressWarnings("unchecked")
public void processElement(Object untypedElem) throws Exception {
if (helpers.fnRunner == null) {
boolean initialize = !activeKey || helpers.fnRunner == null;
if (initialize) {
// If we need to run reallyStartBundle in here, we need to make sure to switch the state
// sampler into the start state.
try (Closeable start = helpers.operationContext.enterStart()) {
helpers.reallyStartBundle();
if (helpers.fnRunner == null) {
helpers.reallyStartBundle();
}
onStartKey();
}
}
Expand Down Expand Up @@ -196,12 +199,15 @@ public void processTimers() throws Exception {
}

@Override
public void finishKey(Object key) throws Exception {}
public void finishKey(Object key) throws Exception {
this.activeKey = false;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC there could be keys with no elements or timers. Should we move the onStartKey call here to handle such keys?

Instead of calling onStartKey in startBundle and processElement, we could call it in processElement, processTimers and finishKey.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we call sideInputFetcher.persist() in finishKey?

}

/**
 * Finishes the current bundle: hands the current side input processor (which may be null) to
 * {@code helpers.finishBundle} and then resets this object's per-bundle and per-key state.
 *
 * <p>The resets run only after {@code helpers.finishBundle} returns; if that call throws, both
 * fields keep their previous values.
 *
 * @throws Exception if {@code helpers.finishBundle} throws
 */
@Override
public void finishBundle() throws Exception {
helpers.finishBundle(sideInputProcessor);
// Drop the processor reference; onStartKey() assigns a new one when
// helpers.hasStreamingSideInput is set.
this.sideInputProcessor = null;
// No key is active any more, so the next processElement() call runs onStartKey() again.
this.activeKey = false;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public class StreamingKeyedWorkItemSideInputParDoFn<K, InputT, OutputT, W extend
private final Coder<InputT> inputCoder;
private final SimpleParDoFnHelpers<KeyedWorkItem<K, InputT>, OutputT, W> helpers;
protected @Nullable StreamingSideInputProcessor<InputT, W> sideInputProcessor;
private boolean activeKey = false;

StreamingKeyedWorkItemSideInputParDoFn(
PipelineOptions options,
Expand Down Expand Up @@ -119,7 +120,9 @@ protected void onStartKey() {
if (sideInputProcessor != null) {
boolean hasState = helpers.hasState();

// TODO(relax): We should be able to get this without writing it to state!
// TODO(relax): We should be able to get this without writing it to state! To make this work,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

finishKey has the key. If we move onStartKey to finishKey, we should be able to make this optimization.

// we need a runner-invoked startKey that passes us the key.
@Nullable K key = keyValue().read();
if (key != null) {
sideInputProcessor.tryUnblockElementsAndTimers(
Expand All @@ -143,16 +146,20 @@ protected void onStartKey() {
});
}
}
activeKey = true;
}

@Override
@SuppressWarnings("unchecked")
public void processElement(Object untypedElem) throws Exception {
if (helpers.fnRunner == null) {
boolean initialize = !activeKey || helpers.fnRunner == null;
if (initialize) {
// If we need to run reallyStartBundle in here, we need to make sure to switch the state
// sampler into the start state.
try (Closeable start = helpers.operationContext.enterStart()) {
helpers.reallyStartBundle();
if (helpers.fnRunner == null) {
helpers.reallyStartBundle();
}
onStartKey();
}
}
Comment on lines 154 to 165

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

In StreamingKeyedWorkItemSideInputParDoFn, onStartKey() reads the current key from state using keyValue().read(). However, when onStartKey() is called during processElement initialization, the new key has not yet been written to keyValue() (which normally happens later in onProcessWindowedValue). This means keyValue().read() will either return null (if the key is being processed for the first time) or the key from a previous bundle, leading to incorrect behavior or missed unblocking of elements/timers for the new key.

To fix this, we should extract the key from untypedElem and write it to keyValue() before calling onStartKey() in processElement.

Suggested change
public void processElement(Object untypedElem) throws Exception {
if (helpers.fnRunner == null) {
boolean initialize = !activeKey || helpers.fnRunner == null;
if (initialize) {
// If we need to run reallyStartBundle in here, we need to make sure to switch the state
// sampler into the start state.
try (Closeable start = helpers.operationContext.enterStart()) {
helpers.reallyStartBundle();
if (helpers.fnRunner == null) {
helpers.reallyStartBundle();
}
onStartKey();
}
}
public void processElement(Object untypedElem) throws Exception {
boolean initialize = !activeKey || helpers.fnRunner == null;
if (initialize) {
// If we need to run reallyStartBundle in here, we need to make sure to switch the state
// sampler into the start state.
try (Closeable start = helpers.operationContext.enterStart()) {
if (helpers.fnRunner == null) {
helpers.reallyStartBundle();
}
WindowedValue<KeyedWorkItem<K, InputT>> elem = (WindowedValue<KeyedWorkItem<K, InputT>>) untypedElem;
keyValue().write(elem.getValue().key());
onStartKey();
}
}

Expand Down Expand Up @@ -194,12 +201,15 @@ public void processTimers() throws Exception {
}

@Override
public void finishKey(Object key) throws Exception {}
public void finishKey(Object key) throws Exception {
this.activeKey = false;
}

/**
 * Finishes the current bundle: passes the current side input processor (which may be null) to
 * {@code helpers.finishBundle} and then clears the processor reference and the active-key flag.
 *
 * <p>Both resets happen only after {@code helpers.finishBundle} returns; if it throws, the
 * fields are left as they were.
 *
 * @throws Exception if {@code helpers.finishBundle} throws
 */
@Override
public void finishBundle() throws Exception {
helpers.finishBundle(sideInputProcessor);
// Release the processor reference once the bundle has been finished.
this.sideInputProcessor = null;
// With no active key, the next processElement() call invokes onStartKey() again.
this.activeKey = false;
}

@Override
Expand All @@ -208,7 +218,7 @@ public void abort() throws Exception {
}

protected void onProcessWindowedValue(WindowedValue<KeyedWorkItem<K, InputT>> elem) {
// TODO: Get rid of this!
// TODO: Get rid of this once we know the current key.
final K key = elem.getValue().key();
keyValue().write(key);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public class StreamingSideInputDoFnRunner<InputT, OutputT, W extends BoundedWind
implements DoFnRunner<InputT, OutputT> {
private final DoFnRunner<InputT, OutputT> simpleDoFnRunner;
private final StreamingSideInputProcessor<InputT, W> sideInputProcessor;
boolean activeKey = false;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The activeKey field has default (package-private) visibility. It should be declared as private to maintain proper encapsulation and consistency with other classes like SimpleParDoFn and StreamingKeyedWorkItemSideInputParDoFn.

Suggested change
boolean activeKey = false;
private boolean activeKey = false;


public StreamingSideInputDoFnRunner(
DoFnRunner<InputT, OutputT> simpleDoFnRunner,
Expand All @@ -49,6 +50,11 @@ public StreamingSideInputDoFnRunner(
/**
 * Starts a bundle on the wrapped {@code simpleDoFnRunner}, then attempts to unblock elements
 * through {@code tryUnblockElements()} and marks a key as active.
 */
@Override
public void startBundle() {
simpleDoFnRunner.startBundle();
tryUnblockElements();
// processElement() repeats the unblock attempt only while activeKey is false, so setting it
// here prevents a second attempt for the first key of the bundle.
this.activeKey = true;
}

private void tryUnblockElements() {
sideInputProcessor.tryUnblockElements(
unblocked -> {
for (WindowedValue<InputT> elem : unblocked) {
Expand All @@ -59,6 +65,10 @@ public void startBundle() {

@Override
public void processElement(WindowedValue<InputT> compressedElem) {
if (!activeKey) {
tryUnblockElements();
this.activeKey = true;
}
for (Iterator<? extends WindowedValue<InputT>> it =
sideInputProcessor.handleProcessElement(compressedElem);
it.hasNext(); ) {
Expand All @@ -84,12 +94,14 @@ public <KeyT> void onTimer(
/**
 * Forwards the end-of-key notification to the wrapped {@code simpleDoFnRunner} and clears the
 * active-key flag.
 *
 * @param key the key being finished; passed through unchanged to the wrapped runner
 */
@Override
public <KeyT extends @Nullable Object> void finishKey(KeyT key) {
simpleDoFnRunner.finishKey(key);
// Clearing the flag makes the next processElement() call run tryUnblockElements() again.
this.activeKey = false;
}

/**
 * Finishes the bundle on the wrapped {@code simpleDoFnRunner}, then notifies the side input
 * processor via {@code handleFinishBundle()} and clears the active-key flag.
 */
@Override
public void finishBundle() {
simpleDoFnRunner.finishBundle();
sideInputProcessor.handleFinishBundle();
// No key is active after the bundle ends; processElement() will call tryUnblockElements()
// again if it is invoked before the next startBundle().
this.activeKey = false;
}

@Override
Expand Down
Loading