Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ void TColumnShard::TrySwitchToWork(const TActorContext& ctx) {

void TColumnShard::OnActivateExecutor(const TActorContext& ctx) {
using namespace NOlap::NReader;
NLwTraceMonPage::ProbeRegistry().AddProbesList(LWTRACE_GET_PROBES(YDB_CS_READER));
NLwTraceMonPage::ProbeRegistry().AddProbesList(LWTRACE_GET_PROBES(YDB_CS_SCAN));
NLwTraceMonPage::ProbeRegistry().AddProbesList(LWTRACE_GET_PROBES(YDB_CS_DATA_SOURCE));
NLwTraceMonPage::ProbeRegistry().AddProbesList(LWTRACE_GET_PROBES(YDB_CS));
StartInstant = TMonotonic::Now();
Expand Down
41 changes: 39 additions & 2 deletions ydb/core/tx/columnshard/columnshard_private_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,16 +199,53 @@ struct TEvPrivate {
private:
TConclusion<std::shared_ptr<NOlap::NReader::IApplyAction>> Result;
TCounterGuard ScanCounter;
ui64 SourceId = 0;
ui64 BlobBytes = 0;
ui64 RawBytes = 0;
ui32 ColumnsCount = 0;
ui32 FilteredRows = 0;
ui32 TotalRows = 0;

public:
TConclusion<std::shared_ptr<NOlap::NReader::IApplyAction>>& MutableResult() {
return Result;
}

ui64 GetSourceId() const {
return SourceId;
}

ui64 GetBlobBytes() const {
return BlobBytes;
}

ui64 GetRawBytes() const {
return RawBytes;
}

ui32 GetColumnsCount() const {
return ColumnsCount;
}

ui32 GetFilteredRows() const {
return FilteredRows;
}

ui32 GetTotalRows() const {
return TotalRows;
}

TEvTaskProcessedResult(
TConclusion<std::shared_ptr<NOlap::NReader::IApplyAction>>&& result, TCounterGuard&& scanCounters)
TConclusion<std::shared_ptr<NOlap::NReader::IApplyAction>>&& result, TCounterGuard&& scanCounters, ui64 sourceId = 0,
ui64 blobBytes = 0, ui64 rawBytes = 0, ui32 columnsCount = 0, ui32 filteredRows = 0, ui32 totalRows = 0)
: Result(std::move(result))
, ScanCounter(std::move(scanCounters)) {
, ScanCounter(std::move(scanCounters))
, SourceId(sourceId)
, BlobBytes(blobBytes)
, RawBytes(rawBytes)
, ColumnsCount(columnsCount)
, FilteredRows(filteredRows)
, TotalRows(totalRows) {
}
};

Expand Down
7 changes: 6 additions & 1 deletion ydb/core/tx/columnshard/engines/column_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@
#include <ydb/core/tx/columnshard/engines/scheme/tiering/tier_info.h>
#include <ydb/core/tx/columnshard/resource_subscriber/container.h>
#include <ydb/core/tx/columnshard/tx_reader/abstract.h>

namespace NLWTrace {
class TOrbit;
}

namespace NKikimr::NColumnShard {
class TTiersManager;
} // namespace NKikimr::NColumnShard
Expand Down Expand Up @@ -160,7 +165,7 @@ class IColumnEngine {
}
virtual bool IsOverloadedByMetadata(const ui64 limit) const = 0;
virtual std::vector<TSelectedPortionInfo> Select(
TInternalPathId pathId, TSnapshot snapshot, const TPKRangesFilter& pkRangesFilter, const bool withNonconflicting, const bool withConflicting, const std::optional<THashSet<TInsertWriteId>>& ownPortions) const = 0;
TInternalPathId pathId, TSnapshot snapshot, const TPKRangesFilter& pkRangesFilter, const bool withNonconflicting, const bool withConflicting, const std::optional<THashSet<TInsertWriteId>>& ownPortions, const std::shared_ptr<NLWTrace::TOrbit>& orbit, ui64 txId = 0, ui64 scanId = 0) const = 0;
virtual std::vector<std::shared_ptr<TColumnEngineChanges>> StartCompaction(const std::shared_ptr<NDataLocks::TManager>& dataLocksManager) noexcept = 0;
virtual ui64 GetCompactionPriority(const std::set<TInternalPathId>& pathIds,
const std::optional<ui64> waitingPriority) const noexcept = 0;
Expand Down
23 changes: 17 additions & 6 deletions ydb/core/tx/columnshard/engines/column_engine_logs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <ydb/core/tx/columnshard/common/path_id.h>
#include <ydb/core/tx/columnshard/data_locks/manager/manager.h>
#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>
#include <ydb/core/tx/columnshard/engines/reader/tracing/probes.h>
#include <ydb/core/tx/columnshard/tracing/probes.h>
#include <ydb/core/tx/columnshard/tx_reader/composite.h>
#include <ydb/core/tx/tiering/manager.h>
Expand All @@ -34,7 +35,7 @@ namespace {

// Windows build workaround
#ifndef LWTRACE_DISABLE
using namespace NKikimr::NColumnShard::LWTRACE_GET_NAMESPACE(YDB_CS);
using namespace NKikimr::NOlap::NReader::LWTRACE_GET_NAMESPACE(YDB_CS_SCAN);
#endif

class TPortionsSelector {
Expand All @@ -46,6 +47,10 @@ class TPortionsSelector {
const bool WithConflicting;
const std::optional<THashSet<TInsertWriteId>>& OwnPortions;
const bool CalculateProbe;
std::shared_ptr<NLWTrace::TOrbit> Orbit;
const ui64 TabletId;
const ui64 TxId;
const ui64 ScanId;

std::vector<TColumnEngineForLogs::TSelectedPortionInfo> Result;
ui64 TotalPortionsCount = 0;
Expand All @@ -54,15 +59,20 @@ class TPortionsSelector {
public:
TPortionsSelector(std::shared_ptr<TGranuleMeta> granuleMeta, TInternalPathId pathId, TSnapshot snapshot,
const TPKRangesFilter& pkRangesFilter, const bool withNonconflicting, const bool withConflicting,
const std::optional<THashSet<TInsertWriteId>>& ownPortions)
const std::optional<THashSet<TInsertWriteId>>& ownPortions, const std::shared_ptr<NLWTrace::TOrbit>& orbit,
ui64 tabletId, ui64 txId, ui64 scanId)
: GranuleMeta(std::move(granuleMeta))
, PathId(pathId)
, Snapshot(snapshot)
, PkRangesFilter(pkRangesFilter)
, WithNonconflicting(withNonconflicting)
, WithConflicting(withConflicting)
, OwnPortions(ownPortions)
, CalculateProbe(LWPROBE_ENABLED(ColumnEngineForLogsSelect))
, CalculateProbe(LWPROBE_ENABLED(ColumnEngineForLogsSelect) || (orbit && orbit->HasShuttles()))
, Orbit(orbit)
, TabletId(tabletId)
, TxId(txId)
, ScanId(scanId)
{ }


Expand All @@ -84,7 +94,8 @@ class TPortionsSelector {
TDuration timeOfCommitted = TAppData::TimeProvider->Now() - start;

if (CalculateProbe) {
LWPROBE(ColumnEngineForLogsSelect, PathId.DebugString(), timeOfInserted.MilliSeconds(), timeOfCommitted.MilliSeconds(),
AFL_VERIFY(Orbit);
LWTRACK(ColumnEngineForLogsSelect, *Orbit, PathId.GetRawValue(), TabletId, TxId, ScanId, timeOfInserted.MilliSeconds(), timeOfCommitted.MilliSeconds(),
TotalPortionsCount, TotalFilteredPortionsCount, Result.size());
Comment on lines +97 to 99
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TPortionsSelector can abort when ColumnEngineForLogsSelect is enabled but orbit is null: CalculateProbe becomes true via LWPROBE_ENABLED(...), then AFL_VERIFY(Orbit) triggers. This is currently reachable (e.g., internal scans build read metadata without setting TReadDescription::Orbit). Consider either requiring/passing a non-null orbit on all Select() call paths, or making probe emission conditional on Orbit (and using LWPROBE when Orbit is null) so tracing can't crash production code.

Suggested change
AFL_VERIFY(Orbit);
LWTRACK(ColumnEngineForLogsSelect, *Orbit, PathId.GetRawValue(), TabletId, TxId, ScanId, timeOfInserted.MilliSeconds(), timeOfCommitted.MilliSeconds(),
TotalPortionsCount, TotalFilteredPortionsCount, Result.size());
if (Orbit) {
LWTRACK(ColumnEngineForLogsSelect, *Orbit, PathId.GetRawValue(), TabletId, TxId, ScanId, timeOfInserted.MilliSeconds(), timeOfCommitted.MilliSeconds(),
TotalPortionsCount, TotalFilteredPortionsCount, Result.size());
} else {
LWPROBE(ColumnEngineForLogsSelect, PathId.GetRawValue(), TabletId, TxId, ScanId, timeOfInserted.MilliSeconds(), timeOfCommitted.MilliSeconds(),
TotalPortionsCount, TotalFilteredPortionsCount, Result.size());
}

Copilot uses AI. Check for mistakes.
}

Expand Down Expand Up @@ -652,15 +663,15 @@ bool TColumnEngineForLogs::ErasePortion(const TPortionInfo& portionInfo, bool up

std::vector<TColumnEngineForLogs::TSelectedPortionInfo> TColumnEngineForLogs::Select(TInternalPathId pathId, TSnapshot snapshot,
const TPKRangesFilter& pkRangesFilter, const bool withNonconflicting, const bool withConflicting,
const std::optional<THashSet<TInsertWriteId>>& ownPortions) const {
const std::optional<THashSet<TInsertWriteId>>& ownPortions, const std::shared_ptr<NLWTrace::TOrbit>& orbit, ui64 txId, ui64 scanId) const {
std::vector<TSelectedPortionInfo> out;

auto granuleMeta = GranulesStorage->GetGranuleOptional(pathId);
if (!granuleMeta) {
return {};
}

return TPortionsSelector(granuleMeta, pathId, snapshot, pkRangesFilter, withNonconflicting, withConflicting, ownPortions).Select();
return TPortionsSelector(granuleMeta, pathId, snapshot, pkRangesFilter, withNonconflicting, withConflicting, ownPortions, orbit, TabletId, txId, scanId).Select();
}

bool TColumnEngineForLogs::StartActualization(const THashMap<TInternalPathId, TTiering>& specialPathEviction) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/engines/column_engine_logs.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ class TColumnEngineForLogs: public IColumnEngine {
void RegisterOldSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, const TSchemaInitializationData& schema) override;

std::vector<TSelectedPortionInfo> Select(
TInternalPathId pathId, TSnapshot snapshot, const TPKRangesFilter& pkRangesFilter, const bool withNonconflicting, const bool withConflicting, const std::optional<THashSet<TInsertWriteId>>& withUncommittedOnlyForTheseWrites) const override;
TInternalPathId pathId, TSnapshot snapshot, const TPKRangesFilter& pkRangesFilter, const bool withNonconflicting, const bool withConflicting, const std::optional<THashSet<TInsertWriteId>>& withUncommittedOnlyForTheseWrites, const std::shared_ptr<NLWTrace::TOrbit>& orbit, ui64 txId = 0, ui64 scanId = 0) const override;

bool IsPortionExists(const TInternalPathId pathId, const ui64 portionId) const {
return !!GranulesStorage->GetPortionOptional(pathId, portionId);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/engines/metadata_accessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ std::unique_ptr<NReader::NCommon::ISourcesConstructor> TUserTableAccessor::Selec
// here we select portions for a read
std::vector<IColumnEngine::TSelectedPortionInfo> portions =
context.GetEngine().Select(PathId.InternalPathId, readDescription.GetSnapshot(), *readDescription.PKRangesFilter,
readDescription.readNonconflictingPortions, readDescription.readConflictingPortions, readDescription.ownPortions);
readDescription.readNonconflictingPortions, readDescription.readConflictingPortions, readDescription.ownPortions, context.GetOrbit(), readDescription.TxId, readDescription.ScanId);
if (!isPlain) {
std::deque<NReader::NSimple::TSourceConstructor> sources;
for (auto&& i : portions) {
Expand Down
15 changes: 13 additions & 2 deletions ydb/core/tx/columnshard/engines/metadata_accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@

#include <ydb/library/accessor/accessor.h>

#include <memory>

namespace NLWTrace {
class TOrbit;
}

namespace NKikimr::NOlap::NReader {
class TReadDescription;
}
Expand Down Expand Up @@ -67,6 +73,7 @@ class ITableMetadataAccessor {
private:
const NOlap::IPathIdTranslator& PathIdTranslator;
const IColumnEngine& Engine;
std::shared_ptr<NLWTrace::TOrbit> Orbit;

public:
const NOlap::IPathIdTranslator& GetPathIdTranslator() const {
Expand All @@ -75,10 +82,14 @@ class ITableMetadataAccessor {
const IColumnEngine& GetEngine() const {
return Engine;
}
const std::shared_ptr<NLWTrace::TOrbit>& GetOrbit() const {
return Orbit;
}

TSelectMetadataContext(const NOlap::IPathIdTranslator& pathIdTranslator, const IColumnEngine& engine)
TSelectMetadataContext(const NOlap::IPathIdTranslator& pathIdTranslator, const IColumnEngine& engine, const std::shared_ptr<NLWTrace::TOrbit>& orbit)
: PathIdTranslator(pathIdTranslator)
, Engine(engine) {
, Engine(engine)
, Orbit(orbit) {
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ TReadContext::TReadContext(const std::shared_ptr<IStoragesManager>& storagesMana
const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
const std::shared_ptr<NColumnFetching::TColumnDataManager>& columnDataManager,
const NColumnShard::TConcreteScanCounters& counters, const TReadMetadataBase::TConstPtr& readMetadata, const TActorId& scanActorId, const TActorId& resourceSubscribeActorId, const TActorId& readCoordinatorActorId,
const TComputeShardingPolicy& computeShardingPolicy, const ui64 scanId, const NConveyorComposite::TCPULimitsConfig& cpuLimits)
const TComputeShardingPolicy& computeShardingPolicy, const ui64 scanId, const NConveyorComposite::TCPULimitsConfig& cpuLimits, const std::shared_ptr<NLWTrace::TOrbit>& scanOrbit)
: StoragesManager(storagesManager)
, DataAccessorsManager(dataAccessorsManager)
, ColumnDataManager(columnDataManager)
Expand All @@ -27,6 +27,7 @@ TReadContext::TReadContext(const std::shared_ptr<IStoragesManager>& storagesMana
, ReadCoordinatorActorId(readCoordinatorActorId)
, ComputeShardingPolicy(computeShardingPolicy)
, ConveyorProcessGuard(NConveyorComposite::TScanServiceOperator::StartProcess(ScanId, cpuLimits.GetCPUGroupNameDef(NResourcePool::DEFAULT_POOL_ID), cpuLimits))
, ScanOrbit(scanOrbit)
{
Y_ABORT_UNLESS(ReadMetadata);
if (ReadMetadata->HasResultSchema()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

#include <ydb/library/accessor/accessor.h>

#include <library/cpp/lwtrace/shuttle.h>

namespace NKikimr::NOlap::NReader {

class TPartialSourceAddress;
Expand Down Expand Up @@ -65,6 +67,7 @@ class TReadContext {
std::shared_ptr<const TAtomicCounter> ConstAbortionFlag = AbortionFlag;
const NConveyorComposite::TProcessGuard ConveyorProcessGuard;
std::shared_ptr<NArrow::NSSA::IColumnResolver> Resolver;
std::shared_ptr<NLWTrace::TOrbit> ScanOrbit;

public:
const NArrow::NSSA::IColumnResolver* GetResolver() const {
Expand Down Expand Up @@ -150,12 +153,16 @@ class TReadContext {
return ResourcesTaskContext;
}

const std::shared_ptr<NLWTrace::TOrbit>& GetScanOrbit() const {
return ScanOrbit;
}

TReadContext(const std::shared_ptr<IStoragesManager>& storagesManager,
const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
const std::shared_ptr<NColumnFetching::TColumnDataManager>& columnDataManager, const NColumnShard::TConcreteScanCounters& counters,
const TReadMetadataBase::TConstPtr& readMetadata, const TActorId& scanActorId, const TActorId& resourceSubscribeActorId,
const TActorId& readCoordinatorActorId, const TComputeShardingPolicy& computeShardingPolicy, const ui64 scanId,
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TReadContext ctor signature now requires scanOrbit, but at least one call site still uses the old signature (e.g. ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/ut/ut_manager.cpp:291 constructs TReadContext without the new argument). This will break the build/tests; please update remaining call sites (and decide what orbit to pass in unit tests, e.g. std::make_shared<NLWTrace::TOrbit>() or nullptr if the code tolerates it).

Suggested change
const TActorId& readCoordinatorActorId, const TComputeShardingPolicy& computeShardingPolicy, const ui64 scanId,
const TActorId& readCoordinatorActorId, const TComputeShardingPolicy& computeShardingPolicy, const ui64 scanId,
const NConveyorComposite::TCPULimitsConfig& cpuLimits)
: TReadContext(storagesManager, dataAccessorsManager, columnDataManager, counters, readMetadata, scanActorId,
resourceSubscribeActorId, readCoordinatorActorId, computeShardingPolicy, scanId, cpuLimits, nullptr) {
}
TReadContext(const std::shared_ptr<IStoragesManager>& storagesManager,
const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
const std::shared_ptr<NColumnFetching::TColumnDataManager>& columnDataManager, const NColumnShard::TConcreteScanCounters& counters,
const TReadMetadataBase::TConstPtr& readMetadata, const TActorId& scanActorId, const TActorId& resourceSubscribeActorId,
const TActorId& readCoordinatorActorId, const TComputeShardingPolicy& computeShardingPolicy, const ui64 scanId,

Copilot uses AI. Check for mistakes.
const NConveyorComposite::TCPULimitsConfig& cpuLimits);
const NConveyorComposite::TCPULimitsConfig& cpuLimits, const std::shared_ptr<NLWTrace::TOrbit>& scanOrbit);
};

class IDataReader {
Expand Down
28 changes: 20 additions & 8 deletions ydb/core/tx/columnshard/engines/reader/actor/actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,14 @@ NKqp::TScanStatistics TColumnShardScan::GetScanStats() {
return stats;
}

LWTRACE_USING(YDB_CS_READER);
LWTRACE_USING(YDB_CS_SCAN);

constexpr TDuration SCAN_HARD_TIMEOUT = TDuration::Minutes(60);
constexpr TDuration COMPUTE_HARD_TIMEOUT = TDuration::Minutes(10);

void TColumnShardScan::PassAway() {
TDuration duration = StartInstant ? TDuration::MilliSeconds((TMonotonic::Now() - *StartInstant).MilliSeconds()) : TDuration::Zero();
LWTRACK(ScanFinished, *ScanOrbit, PathId, TabletId, TxId, ScanId, duration, TotalRowsCount, TotalSourcesCount, TotalBlobBytes, TotalRawBytes);
Send(ResourceSubscribeActorId, new TEvents::TEvPoisonPill);
Send(ReadCoordinatorActorId, new TEvents::TEvPoisonPill);
IActor::PassAway();
Expand All @@ -54,10 +56,13 @@ TColumnShardScan::TColumnShardScan(const TActorId& columnShardActorId, const TAc
const std::shared_ptr<NColumnFetching::TColumnDataManager>& columnDataManager, const TComputeShardingPolicy& computeShardingPolicy,
ui32 scanId, ui64 txId, ui32 scanGen, ui64 requestCookie, ui64 tabletId, TDuration timeout,
const TReadMetadataBase::TConstPtr& readMetadataRange, NKikimrDataEvents::EDataFormat dataFormat,
const NColumnShard::TScanCounters& scanCountersPool, const NConveyorComposite::TCPULimitsConfig& cpuLimits)
const NColumnShard::TScanCounters& scanCountersPool, const NConveyorComposite::TCPULimitsConfig& cpuLimits,
std::shared_ptr<NLWTrace::TOrbit> orbit, ui64 pathId)
: StoragesManager(storagesManager)
, DataAccessorsManager(dataAccessorsManager)
, ColumnDataManager(columnDataManager)
, ScanOrbit(std::move(orbit))
, PathId(pathId)
, ColumnShardActorId(columnShardActorId)
, ScanComputeActorId(scanComputeActorId)
, ScanDiagnosticsActorId(scanDiagnosticsActorId)
Expand Down Expand Up @@ -91,7 +96,7 @@ void TColumnShardScan::Bootstrap(const TActorContext& ctx) {

std::shared_ptr<TReadContext> context =
std::make_shared<TReadContext>(StoragesManager, DataAccessorsManager, ColumnDataManager, ScanCountersPool, ReadMetadataRange, SelfId(),
ResourceSubscribeActorId, ReadCoordinatorActorId, ComputeShardingPolicy, ScanId, CPULimits);
ResourceSubscribeActorId, ReadCoordinatorActorId, ComputeShardingPolicy, ScanId, CPULimits, ScanOrbit);
ScanIterator = ReadMetadataRange->StartScan(context);
auto startResult = ScanIterator->Start();
StartInstant = TMonotonic::Now();
Expand All @@ -118,7 +123,14 @@ void TColumnShardScan::HandleScan(NColumnShard::TEvPrivate::TEvTaskProcessedResu
WaitTime += delta;
}
StartWaitTime = TInstant::Now();
LWPROBE(TaskProcessed, TabletId, ScanId, TxId, delta);
++TotalSourcesCount;
TotalBlobBytes += ev->Get()->GetBlobBytes();
TotalRawBytes += ev->Get()->GetRawBytes();
TotalRowsCount += ev->Get()->GetFilteredRows();
if (ev->Get()->GetSourceId() > 0) {
LWTRACK(ScanFinishSource, *ScanOrbit, PathId, TabletId, TxId, ScanId, (ui64)ev->Get()->GetSourceId(),
ev->Get()->GetBlobBytes(), ev->Get()->GetRawBytes(), ev->Get()->GetColumnsCount(), ev->Get()->GetFilteredRows(), ev->Get()->GetTotalRows());
}
auto g = Stats->MakeGuard("task_result", IS_INFO_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD_SCAN));
auto& result = ev->Get()->MutableResult();
if (result.IsFail()) {
Expand All @@ -139,7 +151,7 @@ void TColumnShardScan::HandleScan(NKqp::TEvKqpCompute::TEvScanDataAck::TPtr& ev)
StartWaitTime = TInstant::Now();
auto g = Stats->MakeGuard("ack", IS_INFO_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD_SCAN));

LWPROBE(AckReceived, TabletId, ScanId, TxId, LastResultInstant ? TDuration::MilliSeconds((TMonotonic::Now() - *LastResultInstant).MilliSeconds()) : TDuration::Zero());
LWTRACK(AckReceived, *ScanOrbit, PathId, TabletId, TxId, ScanId, LastResultInstant ? TDuration::MilliSeconds((TMonotonic::Now() - *LastResultInstant).MilliSeconds()) : TDuration::Zero());

AFL_VERIFY(!AckReceivedInstant);
AckReceivedInstant = TMonotonic::Now();
Expand Down Expand Up @@ -322,7 +334,7 @@ bool TColumnShardScan::ProduceResults() noexcept {
Result->LastKey = ConvertLastKey(CurrentLastReadKey->GetPKCursor()->ToBatch());
}
Result->LastCursorProto = CurrentLastReadKey->SerializeToProto();
SendResult(false, false);
SendResult(false, false, result.GetSourceId());
ScanIterator->OnSentDataFromInterval(result.GetNotFinishedInterval());
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("stage", "finished")("iterator", ScanIterator->DebugString());
return true;
Expand Down Expand Up @@ -406,7 +418,7 @@ NKikimr::TOwnedCellVec TColumnShardScan::ConvertLastKey(const std::shared_ptr<ar
return singleRowWriter.Row;
}

bool TColumnShardScan::SendResult(bool pageFault, bool lastBatch) {
bool TColumnShardScan::SendResult(bool pageFault, bool lastBatch, ui64 sourceId) {
if (Finished) {
return true;
}
Expand Down Expand Up @@ -456,7 +468,7 @@ bool TColumnShardScan::SendResult(bool pageFault, bool lastBatch) {

Result->ArrowBatch = NArrow::ClaimMemoryOwnership(Result->ArrowBatch);

LWPROBE(SendResult, TabletId, ScanId, TxId, Result->GetRowsCount(), (Result->ArrowBatch ? NArrow::GetTableDataSize(Result->ArrowBatch) : 0), Result->CpuTime, Result->WaitTime, TInstant::Now() - LastSend, Result->Finished);
LWTRACK(SendResult, *ScanOrbit, PathId, TabletId, TxId, ScanId, sourceId, Result->GetRowsCount(), (Result->ArrowBatch ? NArrow::GetTableDataSize(Result->ArrowBatch) : 0), Result->CpuTime, Result->WaitTime, TInstant::Now() - LastSend, Result->Finished);
Send(ScanComputeActorId, Result.Release(), IEventHandle::FlagTrackDelivery); // TODO: FlagSubscribeOnSession ?
LastSend = TInstant::Now();

Expand Down
Loading
Loading