Conversation

@gianm gianm commented Dec 29, 2025

This patch integrates MSQ with virtual storage. It also refactors how MSQ reads inputs to give stages more control over how inputs are read and merged. In particular, stages are now able to fully control merging logic.

The main changes:

  1. Integrate with virtual storage. Removed DataSegmentProvider, replaced it with direct usage of SegmentManager by SegmentsInputSliceReader. The SegmentManager reference ends up wrapped into RegularLoadableSegment, which provides methods acquire() and acquireIfCached() to the query logic.

  2. Give stages control over input merging: rework InputSliceReader to return ReadablePartitions directly, without embedding any merging logic. Break out StandardPartitionReader as a separate class.
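The acquire pattern in item 1 can be sketched minimally. The method names acquire() and acquireIfCached() come from the description above, but the class name, the cache map, and the simplified bodies here are illustrative stand-ins, not Druid's actual RegularLoadableSegment/SegmentManager code:

```java
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Illustrative sketch: a loadable segment source that either returns an
// already-local copy immediately, or loads it on demand. In Druid, the real
// RegularLoadableSegment wraps SegmentManager; the map here is a stand-in.
public class LoadableSegmentSketch
{
  private final ConcurrentMap<String, String> cache = new ConcurrentHashMap<>();

  // acquireIfCached(): non-blocking, returns the segment only if already local.
  public Optional<String> acquireIfCached(String segmentId)
  {
    return Optional.ofNullable(cache.get(segmentId));
  }

  // acquire(): loads the segment if necessary, then returns it.
  public String acquire(String segmentId)
  {
    return cache.computeIfAbsent(segmentId, id -> "segment-data:" + id);
  }
}
```

The split lets query logic avoid blocking on a download when a cached copy is good enough, falling back to a full acquire() otherwise.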

Other changes:

  1. Move ReadableInput to the querykit package. It is no longer specific to the MSQ framework.

  2. Remove StandardStageProcessor, refactoring dependent code to not require it.

  3. Remove ExternalColumnSelectorFactory wrapper. Type casting is now handled directly by RowBasedColumnSelectorFactory.

  4. Include full query context in worker context, rather than just a subset.

The purpose of the new evictImmediately config is to enable using
SegmentLocalCacheManager for loading segments on MSQ worker tasks, where
segments are not assigned by load/drop rules, and where there is generally
no specific maxSize configured for the local cache. We need to evict
segments immediately so local disks don't fill up.

The main changes:

1) In StorageLocation, update the releaseHold runnable to check for
   evictImmediately. If it is set, unmount the cache entry if all holds
   have been released.

2) In SegmentLocalCacheManager, when evictImmediately is set, "mount"
   sets an onUnmount handler to delete the info file.
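The hold-release behavior in item 1 can be sketched as a simple hold counter. The names evictImmediately and releaseHold mirror the description above, but this class is an illustration, not Druid's StorageLocation implementation:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch of the releaseHold logic described above: when
// evictImmediately is set and the last hold is released, the entry is
// unmounted (at which point an onUnmount handler could delete the info file).
public class EvictionSketch
{
  private final AtomicInteger holds = new AtomicInteger();
  private final boolean evictImmediately;
  private volatile boolean mounted = true;

  public EvictionSketch(boolean evictImmediately)
  {
    this.evictImmediately = evictImmediately;
  }

  public void addHold()
  {
    holds.incrementAndGet();
  }

  public void releaseHold()
  {
    if (holds.decrementAndGet() == 0 && evictImmediately) {
      // Unmount: the real code would also run onUnmount handlers here.
      mounted = false;
    }
  }

  public boolean isMounted()
  {
    return mounted;
  }
}
```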
Includes apache#18871.
@github-actions bot added labels on Dec 29, 2025: Area - Batch Ingestion, Area - Segment Format and Ser/De, Area - Ingestion, Area - MSQ (for multi stage queries - https://github.com/apache/druid/issues/12262)
for (AcquireSegmentAction acquireSegmentAction : loadingSegments) {
  CloseableUtils.closeAndSuppressExceptions(
      acquireSegmentAction,
      e -> log.warn(e, "Failed to close loadingSegment[%s]", acquireSegmentAction)

Check notice — Code scanning / CodeQL: Use of default toString() (Note)

AcquireSegmentAction inherits toString() from Object, and so is not suitable for printing.
Member:

this looks legit, maybe we should decorate AcquireSegmentAction to have a segmentId or something and implement toString

Contributor Author (gianm):

I thought about that but didn't want to add a field to AcquireSegmentAction just for this. I was going to leave it the way it is.

Contributor Author (gianm):

I'll add a comment about this though.

final int partitionNum = i % 2;

segments.add(
    DataSegment.builder()

Check notice — Code scanning / CodeQL: Deprecated method or constructor invocation (Note, test)

Invoking DataSegment.builder should be avoided because it has been deprecated.
 */
public static DataSegment createDataSegmentForTest(final SegmentId segmentId)
{
  return DataSegment.builder()

Check notice — Code scanning / CodeQL: Deprecated method or constructor invocation (Note, test)

Invoking DataSegment.builder should be avoided because it has been deprecated.
@clintropolis clintropolis (Member) left a comment:

overall lgtm 🤘

);
final SegmentLoaderConfig loaderConfig =
new SegmentLoaderConfig()
.setLocations(Collections.singletonList(new StorageLocationConfig(storageDir, null, null)))
Member:

i know this isn't new, but should this have a size, so there is a guard rail that keeps things from getting wrecked if something tries to load too much? There is tmpStorageBytesPerTask already on the task config, but it looks like that is used for tmp sorter output, so maybe those files should be managed with StorageLocation too. This doesn't need to be done in this PR, just thinking.

Contributor Author (gianm):

I was hoping to manage more stuff through the storage locations as time goes on and that would be a good time to unify things. The other thing that isn't currently tracked is temporary storage of external data that has been downloaded.

Comment on lines +95 to +97
// The LoadableSegment generated here does not acquire a real hold, and ends up loading the external data in a
// processing thread (when the cursor is created). Ideally, this would be better integrated with the virtual
// storage system, giving us storage holds and the ability to load data outside of a processing thread.
Member:

👍 it feels like this should be possible, StorageLocation is pretty generic, but agree we can worry about this later

* throws an exception. Regardless of success or failure of this method, the old reference should be discarded.
* Do not call {@link #close()} on the old reference, only call it on the new one.
*/
public SegmentReference map(final SegmentMapFunction segmentMapFn)
Member:

could you update the javadoc at the top of this class to indicate that callers can call this if they don't have a mapFn to apply to the segment at the time of creation of the SegmentReference?

Contributor Author (gianm):

Updated to:

/**
 * Wrapper for a {@link SegmentDescriptor} and {@link Optional<Segment>}. For regular segments, the latter is typically
 * created by a {@link ReferenceCountedSegmentProvider} that may have a {@link SegmentMapFunction} applied to it.
 *
 * If the {@link SegmentMapFunction} you want to apply is not available at the time the {@link SegmentReference}
 * is created, use {@link #map(SegmentMapFunction)} to apply it.
 *
 * Closing this object closes both the {@link #getSegmentReference()} and any closeables attached from the process of
 * creating this object, such as from {@link AcquireSegmentAction}. The object from {@link #getSegmentReference()}
 * should not be closed directly by callers.
 */
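The contract in that javadoc — mapping produces a new reference, the old one is discarded without being closed, and close() is called only on the result — can be sketched with stand-in types. RefSketch below is hypothetical, not Druid's SegmentReference/SegmentMapFunction:

```java
import java.util.function.UnaryOperator;

// Illustrative sketch of the map() contract described above. Mapping returns
// a new reference wrapping the mapped segment; callers discard the old
// reference afterwards and must not call close() on it.
public class RefSketch implements AutoCloseable
{
  private final String segment;
  private boolean closed;

  public RefSketch(String segment)
  {
    this.segment = segment;
  }

  // Apply a mapping function, yielding a new reference. Per the contract,
  // discard `this` after calling map(); only close the returned reference.
  public RefSketch map(UnaryOperator<String> mapFn)
  {
    return new RefSketch(mapFn.apply(segment));
  }

  public String get()
  {
    return segment;
  }

  public boolean isClosed()
  {
    return closed;
  }

  @Override
  public void close()
  {
    closed = true;
  }
}
```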


* An {@link InputSlice} that has been prepared for reading by an {@link InputSliceReader}. Nothing contained in
* this class references open resources, so this class is not closeable and does not need to be closed.
*/
public class PhysicalInputSlice
Member:

what does 'physical' mean here? obligatory 'naming is hard' comment. Is it basically 'slice that gets processed to kick everything off by a leaf processor'? If so, should it be LeafInputSlice to match other terminology? though i guess i could see how that could be confusing too

Contributor Author (gianm):

It's definitely a "naming is hard" thing.

It's a somewhat-more-physical version of InputSlice. InputSlice is always purely logical (it just has the identifiers / paths of things that should be read, as strings and POJOs) but PhysicalInputSlice may contain the ability to actually read external data or actually load segments.

It's not just "leaf" since it includes partition inputs, which would be used by non-leaf stages.
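The logical/physical distinction described above can be sketched roughly as follows. The class and field names are stand-ins, not Druid's InputSlice/PhysicalInputSlice:

```java
import java.util.List;
import java.util.function.Supplier;

// Illustrative contrast: a purely logical slice carries only identifiers
// (serializable POJOs, no open resources), while a "physical" slice has been
// prepared for reading and can actually open the underlying data.
public class SliceSketch
{
  // Logical: just identifiers / paths of things to read.
  public record LogicalSlice(List<String> segmentIds) {}

  // Physical: able to produce an actual reader on demand.
  public record PhysicalSlice(LogicalSlice logical, Supplier<String> openReader)
  {
    public String read()
    {
      return openReader.get();
    }
  }

  // Preparing a logical slice attaches the ability to read it.
  public static PhysicalSlice prepare(LogicalSlice logical)
  {
    return new PhysicalSlice(logical, () -> "reader-for:" + logical.segmentIds());
  }
}
```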

if (cachedDataSegment != null) {
  return Futures.immediateFuture(cachedDataSegment);
} else if (coordinatorClient != null) {
  return coordinatorClient.fetchSegment(
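The cache-then-fetch pattern in the excerpt above can be sketched with java.util.concurrent.CompletableFuture in place of Guava futures; the fetcher function stands in for coordinatorClient.fetchSegment, and the class is hypothetical:

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Illustrative sketch: return a completed future for cached segment metadata,
// otherwise fall back to a remote fetch.
public class FetchSketch
{
  private final Map<String, String> cache;
  private final Function<String, CompletableFuture<String>> fetcher;

  public FetchSketch(
      Map<String, String> cache,
      Function<String, CompletableFuture<String>> fetcher
  )
  {
    this.cache = cache;
    this.fetcher = fetcher;
  }

  public CompletableFuture<String> fetchSegment(String segmentId)
  {
    final String cached = cache.get(segmentId);
    if (cached != null) {
      // Cached locally: no remote call needed.
      return CompletableFuture.completedFuture(cached);
    }
    // Fall back to the remote fetcher.
    return fetcher.apply(segmentId);
  }
}
```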
Member:

i find myself wondering if we could make SegmentManager more flexible (say, backed with a coordinator client to download DataSegments on the fly) so that we can do all of this with a SegmentManager. I haven't really thought this through and maybe it's too weird; either way, i think this is fine for this PR.

Contributor Author (gianm):

That would be nice.

Contributor Author (gianm):

Along with the ability to manage non-segment data, like external files from cloud storage.
