diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp index 2fc8fcef44a9..c00e338fd389 100644 --- a/ydb/core/tx/columnshard/columnshard.cpp +++ b/ydb/core/tx/columnshard/columnshard.cpp @@ -102,7 +102,7 @@ void TColumnShard::TrySwitchToWork(const TActorContext& ctx) { void TColumnShard::OnActivateExecutor(const TActorContext& ctx) { using namespace NOlap::NReader; - NLwTraceMonPage::ProbeRegistry().AddProbesList(LWTRACE_GET_PROBES(YDB_CS_READER)); + NLwTraceMonPage::ProbeRegistry().AddProbesList(LWTRACE_GET_PROBES(YDB_CS_SCAN)); NLwTraceMonPage::ProbeRegistry().AddProbesList(LWTRACE_GET_PROBES(YDB_CS_DATA_SOURCE)); NLwTraceMonPage::ProbeRegistry().AddProbesList(LWTRACE_GET_PROBES(YDB_CS)); StartInstant = TMonotonic::Now(); diff --git a/ydb/core/tx/columnshard/columnshard_private_events.h b/ydb/core/tx/columnshard/columnshard_private_events.h index 2649257639cd..c24902f45822 100644 --- a/ydb/core/tx/columnshard/columnshard_private_events.h +++ b/ydb/core/tx/columnshard/columnshard_private_events.h @@ -199,16 +199,54 @@ struct TEvPrivate { private: TConclusion> Result; TCounterGuard ScanCounter; + ui64 SourceId = 0; + ui64 BlobBytes = 0; + ui64 RawBytes = 0; + ui32 FilteredRows = 0; + ui32 TotalRows = 0; + ui64 TotalReservedBytes = 0; public: TConclusion>& MutableResult() { return Result; } + ui64 GetSourceId() const { + return SourceId; + } + + ui64 GetBlobBytes() const { + return BlobBytes; + } + + ui64 GetRawBytes() const { + return RawBytes; + } + + ui32 GetFilteredRows() const { + return FilteredRows; + } + + ui32 GetTotalRows() const { + return TotalRows; + } + + ui64 GetTotalReservedBytes() const { + return TotalReservedBytes; + } + TEvTaskProcessedResult( - TConclusion>&& result, TCounterGuard&& scanCounters) + TConclusion>&& result, TCounterGuard&& scanCounters, ui64 sourceId = 0, + ui64 blobBytes = 0, ui64 rawBytes = 0, ui32 filteredRows = 0, ui32 totalRows = 0, + ui64 totalReservedBytes = 0) : Result(std::move(result)) - , ScanCounter(std::move(scanCounters)) { + , ScanCounter(std::move(scanCounters)) + , SourceId(sourceId) + , BlobBytes(blobBytes) + , RawBytes(rawBytes) + , FilteredRows(filteredRows) + , TotalRows(totalRows) + , TotalReservedBytes(totalReservedBytes) { } }; diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h index 4289e5a1eba6..91bfffe2482f 100644 --- a/ydb/core/tx/columnshard/engines/column_engine.h +++ b/ydb/core/tx/columnshard/engines/column_engine.h @@ -15,6 +15,11 @@ #include #include #include + +namespace NLWTrace { +class TOrbit; +} + namespace NKikimr::NColumnShard { class TTiersManager; } // namespace NKikimr::NColumnShard @@ -160,7 +165,7 @@ class IColumnEngine { } virtual bool IsOverloadedByMetadata(const ui64 limit) const = 0; virtual std::vector Select( - TInternalPathId pathId, TSnapshot snapshot, const TPKRangesFilter& pkRangesFilter, const bool withNonconflicting, const bool withConflicting, const std::optional>& ownPortions) const = 0; + TInternalPathId pathId, TSnapshot snapshot, const TPKRangesFilter& pkRangesFilter, const bool withNonconflicting, const bool withConflicting, const std::optional>& ownPortions, const std::shared_ptr& orbit, ui64 txId = 0, ui64 scanId = 0) const = 0; virtual std::vector> StartCompaction(const std::shared_ptr& dataLocksManager) noexcept = 0; virtual ui64 GetCompactionPriority(const std::set& pathIds, const std::optional waitingPriority) const noexcept = 0; diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index f1b0e3fe55d8..4c4dbdd410f1 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include @@ -34,7 +35,7 @@ namespace { // Windows build workaround #ifndef LWTRACE_DISABLE -using namespace NKikimr::NColumnShard::LWTRACE_GET_NAMESPACE(YDB_CS); +using namespace NKikimr::NOlap::NReader::LWTRACE_GET_NAMESPACE(YDB_CS_SCAN); #endif class TPortionsSelector { @@ -46,6 +47,10 @@ class TPortionsSelector { const bool WithConflicting; const std::optional>& OwnPortions; const bool CalculateProbe; + std::shared_ptr Orbit; + const ui64 TabletId; + const ui64 TxId; + const ui64 ScanId; std::vector Result; ui64 TotalPortionsCount = 0; @@ -54,7 +59,8 @@ class TPortionsSelector { public: TPortionsSelector(std::shared_ptr granuleMeta, TInternalPathId pathId, TSnapshot snapshot, const TPKRangesFilter& pkRangesFilter, const bool withNonconflicting, const bool withConflicting, - const std::optional>& ownPortions) + const std::optional>& ownPortions, const std::shared_ptr& orbit, + ui64 tabletId, ui64 txId, ui64 scanId) : GranuleMeta(std::move(granuleMeta)) , PathId(pathId) , Snapshot(snapshot) @@ -62,7 +68,11 @@ class TPortionsSelector { , WithNonconflicting(withNonconflicting) , WithConflicting(withConflicting) , OwnPortions(ownPortions) - , CalculateProbe(LWPROBE_ENABLED(ColumnEngineForLogsSelect)) + , CalculateProbe(LWPROBE_ENABLED(ColumnEngineForLogsSelect) || (orbit && orbit->HasShuttles())) + , Orbit(orbit) + , TabletId(tabletId) + , TxId(txId) + , ScanId(scanId) { } @@ -84,7 +94,8 @@ class TPortionsSelector { TDuration timeOfCommitted = TAppData::TimeProvider->Now() - start; if (CalculateProbe) { - LWPROBE(ColumnEngineForLogsSelect, PathId.DebugString(), timeOfInserted.MilliSeconds(), timeOfCommitted.MilliSeconds(), + AFL_VERIFY(Orbit); + LWTRACK(ColumnEngineForLogsSelect, *Orbit, PathId.GetRawValue(), TabletId, TxId, ScanId, timeOfInserted.MilliSeconds(), timeOfCommitted.MilliSeconds(), TotalPortionsCount, TotalFilteredPortionsCount, Result.size()); } @@ -652,7 +663,7 @@ bool TColumnEngineForLogs::ErasePortion(const TPortionInfo& portionInfo, bool up std::vector TColumnEngineForLogs::Select(TInternalPathId pathId, TSnapshot snapshot, const TPKRangesFilter& pkRangesFilter, const bool withNonconflicting, const bool withConflicting, - const std::optional>& ownPortions) const { + const std::optional>& ownPortions, const std::shared_ptr& orbit, ui64 txId, ui64 scanId) const { std::vector out; auto granuleMeta = GranulesStorage->GetGranuleOptional(pathId); @@ -660,7 +671,7 @@ std::vector TColumnEngineForLogs::Se return {}; } - return TPortionsSelector(granuleMeta, pathId, snapshot, pkRangesFilter, withNonconflicting, withConflicting, ownPortions).Select(); + return TPortionsSelector(granuleMeta, pathId, snapshot, pkRangesFilter, withNonconflicting, withConflicting, ownPortions, orbit, TabletId, txId, scanId).Select(); } bool TColumnEngineForLogs::StartActualization(const THashMap& specialPathEviction) { diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h index 01d52d08c8ee..84cc1a3b862e 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.h +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h @@ -170,7 +170,7 @@ class TColumnEngineForLogs: public IColumnEngine { void RegisterOldSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, const TSchemaInitializationData& schema) override; std::vector Select( - TInternalPathId pathId, TSnapshot snapshot, const TPKRangesFilter& pkRangesFilter, const bool withNonconflicting, const bool withConflicting, const std::optional>& withUncommittedOnlyForTheseWrites) const override; + TInternalPathId pathId, TSnapshot snapshot, const TPKRangesFilter& pkRangesFilter, const bool withNonconflicting, const bool withConflicting, const std::optional>& withUncommittedOnlyForTheseWrites, const std::shared_ptr& orbit, ui64 txId = 0, ui64 scanId = 0) const override; bool IsPortionExists(const TInternalPathId pathId, const ui64 portionId) const { return !!GranulesStorage->GetPortionOptional(pathId, portionId); diff --git a/ydb/core/tx/columnshard/engines/metadata_accessor.cpp b/ydb/core/tx/columnshard/engines/metadata_accessor.cpp index e3581d803557..6d810d41b01d 100644 --- a/ydb/core/tx/columnshard/engines/metadata_accessor.cpp +++ b/ydb/core/tx/columnshard/engines/metadata_accessor.cpp @@ -44,7 +44,7 @@ std::unique_ptr TUserTableAccessor::Selec // here we select portions for a read std::vector portions = context.GetEngine().Select(PathId.InternalPathId, readDescription.GetSnapshot(), *readDescription.PKRangesFilter, - readDescription.readNonconflictingPortions, readDescription.readConflictingPortions, readDescription.ownPortions); + readDescription.readNonconflictingPortions, readDescription.readConflictingPortions, readDescription.ownPortions, context.GetOrbit(), readDescription.TxId, readDescription.ScanId); if (!isPlain) { std::deque sources; for (auto&& i : portions) { diff --git a/ydb/core/tx/columnshard/engines/metadata_accessor.h b/ydb/core/tx/columnshard/engines/metadata_accessor.h index fc2eb96cf571..56b1a2783db4 100644 --- a/ydb/core/tx/columnshard/engines/metadata_accessor.h +++ b/ydb/core/tx/columnshard/engines/metadata_accessor.h @@ -8,6 +8,12 @@ #include +#include + +namespace NLWTrace { +class TOrbit; +} + namespace NKikimr::NOlap::NReader { class TReadDescription; } @@ -67,6 +73,7 @@ class ITableMetadataAccessor { private: const NOlap::IPathIdTranslator& PathIdTranslator; const IColumnEngine& Engine; + std::shared_ptr Orbit; public: const NOlap::IPathIdTranslator& GetPathIdTranslator() const { @@ -75,10 +82,14 @@ class ITableMetadataAccessor { const IColumnEngine& GetEngine() const { return Engine; } + const std::shared_ptr& GetOrbit() const { + return Orbit; + } - TSelectMetadataContext(const NOlap::IPathIdTranslator& pathIdTranslator, const IColumnEngine& engine) + TSelectMetadataContext(const NOlap::IPathIdTranslator& pathIdTranslator, const IColumnEngine& engine, const std::shared_ptr& orbit) : PathIdTranslator(pathIdTranslator) - , Engine(engine) { + , Engine(engine) + , Orbit(orbit) { } }; diff --git a/ydb/core/tx/columnshard/engines/portions/data_accessor.cpp b/ydb/core/tx/columnshard/engines/portions/data_accessor.cpp index 3b2965cfd9b7..488459acbd85 100644 --- a/ydb/core/tx/columnshard/engines/portions/data_accessor.cpp +++ b/ydb/core/tx/columnshard/engines/portions/data_accessor.cpp @@ -358,6 +358,24 @@ ui64 TPortionDataAccessor::GetIndexRawBytes(const std::set& entityIds, con return sum; } +ui64 TPortionDataAccessor::GetIndexBlobBytes(const std::set& entityIds, const bool validation /*= true*/) const { + ui64 sum = 0; + const auto aggr = [&](const TIndexChunk& r) { + sum += r.GetDataSize(); + }; + AggregateIndexChunksData(aggr, GetIndexesVerified(), &entityIds, validation); + return sum; +} + +ui64 TPortionDataAccessor::GetIndexBlobBytes(const bool validation /*= true*/) const { + ui64 sum = 0; + const auto aggr = [&](const TIndexChunk& r) { + sum += r.GetDataSize(); + }; + AggregateIndexChunksData(aggr, GetIndexesVerified(), nullptr, validation); + return sum; +} + ui64 TPortionDataAccessor::GetIndexRawBytes(const bool validation /*= true*/) const { ui64 sum = 0; const auto aggr = [&](const TIndexChunk& r) { diff --git a/ydb/core/tx/columnshard/engines/portions/data_accessor.h b/ydb/core/tx/columnshard/engines/portions/data_accessor.h index 60f3c56850fc..d5d3358d7bb3 100644 --- a/ydb/core/tx/columnshard/engines/portions/data_accessor.h +++ b/ydb/core/tx/columnshard/engines/portions/data_accessor.h @@ -468,6 +468,8 @@ class TPortionDataAccessor: public TPortionMetaBase { ui64 GetColumnRawBytes(const std::set& entityIds, const bool validation = true) const; ui64 GetColumnBlobBytes(const std::set& entityIds, const bool validation = true) const; ui64 GetIndexRawBytes(const std::set& entityIds, const bool validation = true) const; + ui64 GetIndexBlobBytes(const std::set& entityIds, const bool validation = true) const; + ui64 GetIndexBlobBytes(const bool validation = true) const; ui64 GetIndexRawBytes(const bool validation = true) const; void FillBlobRangesByStorage( diff --git a/ydb/core/tx/columnshard/engines/reader/abstract/read_context.cpp b/ydb/core/tx/columnshard/engines/reader/abstract/read_context.cpp index e7709990f620..c1b6bd788206 100644 --- a/ydb/core/tx/columnshard/engines/reader/abstract/read_context.cpp +++ b/ydb/core/tx/columnshard/engines/reader/abstract/read_context.cpp @@ -14,7 +14,7 @@ TReadContext::TReadContext(const std::shared_ptr& storagesMana const std::shared_ptr& dataAccessorsManager, const std::shared_ptr& columnDataManager, const NColumnShard::TConcreteScanCounters& counters, const TReadMetadataBase::TConstPtr& readMetadata, const TActorId& scanActorId, const TActorId& resourceSubscribeActorId, const TActorId& readCoordinatorActorId, - const TComputeShardingPolicy& computeShardingPolicy, const ui64 scanId, const NConveyorComposite::TCPULimitsConfig& cpuLimits) + const TComputeShardingPolicy& computeShardingPolicy, const ui64 scanId, const NConveyorComposite::TCPULimitsConfig& cpuLimits, const std::shared_ptr& scanOrbit) : StoragesManager(storagesManager) , DataAccessorsManager(dataAccessorsManager) , ColumnDataManager(columnDataManager) @@ -27,6 +27,7 @@ TReadContext::TReadContext(const std::shared_ptr& storagesMana , ReadCoordinatorActorId(readCoordinatorActorId) , ComputeShardingPolicy(computeShardingPolicy) , ConveyorProcessGuard(NConveyorComposite::TScanServiceOperator::StartProcess(ScanId, cpuLimits.GetCPUGroupNameDef(NResourcePool::DEFAULT_POOL_ID), cpuLimits)) + , ScanOrbit(scanOrbit) { Y_ABORT_UNLESS(ReadMetadata); if (ReadMetadata->HasResultSchema()) { diff --git a/ydb/core/tx/columnshard/engines/reader/abstract/read_context.h b/ydb/core/tx/columnshard/engines/reader/abstract/read_context.h index 60a4741e898f..f6247176ef2e 100644 --- a/ydb/core/tx/columnshard/engines/reader/abstract/read_context.h +++ b/ydb/core/tx/columnshard/engines/reader/abstract/read_context.h @@ -16,6 +16,8 @@ #include +#include + namespace NKikimr::NOlap::NReader { class TPartialSourceAddress; @@ -65,6 +67,7 @@ class TReadContext { std::shared_ptr ConstAbortionFlag = AbortionFlag; const NConveyorComposite::TProcessGuard ConveyorProcessGuard; std::shared_ptr Resolver; + std::shared_ptr ScanOrbit; public: const NArrow::NSSA::IColumnResolver* GetResolver() const { @@ -150,12 +153,16 @@ class TReadContext { return ResourcesTaskContext; } + const std::shared_ptr& GetScanOrbit() const { + return ScanOrbit; + } + TReadContext(const std::shared_ptr& storagesManager, const std::shared_ptr& dataAccessorsManager, const std::shared_ptr& columnDataManager, const NColumnShard::TConcreteScanCounters& counters, const TReadMetadataBase::TConstPtr& readMetadata, const TActorId& scanActorId, const TActorId& resourceSubscribeActorId, const TActorId& readCoordinatorActorId, const TComputeShardingPolicy& computeShardingPolicy, const ui64 scanId, - const NConveyorComposite::TCPULimitsConfig& cpuLimits); + const NConveyorComposite::TCPULimitsConfig& cpuLimits, const std::shared_ptr& scanOrbit); }; class IDataReader { diff --git a/ydb/core/tx/columnshard/engines/reader/actor/actor.cpp b/ydb/core/tx/columnshard/engines/reader/actor/actor.cpp index 7b2697cdc06b..f8d47eb82bbb 100644 --- a/ydb/core/tx/columnshard/engines/reader/actor/actor.cpp +++ b/ydb/core/tx/columnshard/engines/reader/actor/actor.cpp @@ -38,12 +38,14 @@ NKqp::TScanStatistics TColumnShardScan::GetScanStats() { return stats; } -LWTRACE_USING(YDB_CS_READER); +LWTRACE_USING(YDB_CS_SCAN); constexpr TDuration SCAN_HARD_TIMEOUT = TDuration::Minutes(60); constexpr TDuration COMPUTE_HARD_TIMEOUT = TDuration::Minutes(10); void TColumnShardScan::PassAway() { + TDuration duration = StartInstant ? TDuration::MilliSeconds((TMonotonic::Now() - *StartInstant).MilliSeconds()) : TDuration::Zero(); + LWTRACK(ScanFinished, *ScanOrbit, PathId, TabletId, TxId, ScanId, duration, TotalRowsCount, TotalPartialSourcesCount, TotalBlobBytes, TotalRawBytes); Send(ResourceSubscribeActorId, new TEvents::TEvPoisonPill); Send(ReadCoordinatorActorId, new TEvents::TEvPoisonPill); IActor::PassAway(); @@ -54,10 +56,13 @@ TColumnShardScan::TColumnShardScan(const TActorId& columnShardActorId, const TAc const std::shared_ptr& columnDataManager, const TComputeShardingPolicy& computeShardingPolicy, ui32 scanId, ui64 txId, ui32 scanGen, ui64 requestCookie, ui64 tabletId, TDuration timeout, const TReadMetadataBase::TConstPtr& readMetadataRange, NKikimrDataEvents::EDataFormat dataFormat, - const NColumnShard::TScanCounters& scanCountersPool, const NConveyorComposite::TCPULimitsConfig& cpuLimits) + const NColumnShard::TScanCounters& scanCountersPool, const NConveyorComposite::TCPULimitsConfig& cpuLimits, + std::shared_ptr orbit, ui64 pathId) : StoragesManager(storagesManager) , DataAccessorsManager(dataAccessorsManager) , ColumnDataManager(columnDataManager) + , ScanOrbit(std::move(orbit)) + , PathId(pathId) , ColumnShardActorId(columnShardActorId) , ScanComputeActorId(scanComputeActorId) , ScanDiagnosticsActorId(scanDiagnosticsActorId) @@ -91,7 +96,7 @@ void TColumnShardScan::Bootstrap(const TActorContext& ctx) { std::shared_ptr context = std::make_shared(StoragesManager, DataAccessorsManager, ColumnDataManager, ScanCountersPool, ReadMetadataRange, SelfId(), - ResourceSubscribeActorId, ReadCoordinatorActorId, ComputeShardingPolicy, ScanId, CPULimits); + ResourceSubscribeActorId, ReadCoordinatorActorId, ComputeShardingPolicy, ScanId, CPULimits, ScanOrbit); ScanIterator = ReadMetadataRange->StartScan(context); auto startResult = ScanIterator->Start(); StartInstant = TMonotonic::Now(); @@ -118,7 +123,15 @@ void TColumnShardScan::HandleScan(NColumnShard::TEvPrivate::TEvTaskProcessedResu WaitTime += delta; } StartWaitTime = TInstant::Now(); - LWPROBE(TaskProcessed, TabletId, ScanId, TxId, delta); + TotalBlobBytes += ev->Get()->GetBlobBytes(); + TotalRawBytes += ev->Get()->GetRawBytes(); + TotalRowsCount += ev->Get()->GetFilteredRows(); + if (ev->Get()->GetSourceId() > 0) { + ++TotalPartialSourcesCount; + LWTRACK(ScanFinishSource, *ScanOrbit, PathId, TabletId, TxId, ScanId, (ui64)ev->Get()->GetSourceId(), + ev->Get()->GetBlobBytes(), ev->Get()->GetRawBytes(), ev->Get()->GetFilteredRows(), ev->Get()->GetTotalRows(), + ev->Get()->GetTotalReservedBytes()); + } auto g = Stats->MakeGuard("task_result", IS_INFO_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD_SCAN)); auto& result = ev->Get()->MutableResult(); if (result.IsFail()) { @@ -139,7 +152,7 @@ void TColumnShardScan::HandleScan(NKqp::TEvKqpCompute::TEvScanDataAck::TPtr& ev) StartWaitTime = TInstant::Now(); auto g = Stats->MakeGuard("ack", IS_INFO_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD_SCAN)); - LWPROBE(AckReceived, TabletId, ScanId, TxId, LastResultInstant ? TDuration::MilliSeconds((TMonotonic::Now() - *LastResultInstant).MilliSeconds()) : TDuration::Zero()); + LWTRACK(AckReceived, *ScanOrbit, PathId, TabletId, TxId, ScanId, LastResultInstant ? TDuration::MilliSeconds((TMonotonic::Now() - *LastResultInstant).MilliSeconds()) : TDuration::Zero()); AFL_VERIFY(!AckReceivedInstant); AckReceivedInstant = TMonotonic::Now(); @@ -322,7 +335,7 @@ bool TColumnShardScan::ProduceResults() noexcept { Result->LastKey = ConvertLastKey(CurrentLastReadKey->GetPKCursor()->ToBatch()); } Result->LastCursorProto = CurrentLastReadKey->SerializeToProto(); - SendResult(false, false); + SendResult(false, false, result.GetSourceId()); ScanIterator->OnSentDataFromInterval(result.GetNotFinishedInterval()); AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("stage", "finished")("iterator", ScanIterator->DebugString()); return true; @@ -406,7 +419,7 @@ NKikimr::TOwnedCellVec TColumnShardScan::ConvertLastKey(const std::shared_ptrArrowBatch = NArrow::ClaimMemoryOwnership(Result->ArrowBatch); - LWPROBE(SendResult, TabletId, ScanId, TxId, Result->GetRowsCount(), (Result->ArrowBatch ? NArrow::GetTableDataSize(Result->ArrowBatch) : 0), Result->CpuTime, Result->WaitTime, TInstant::Now() - LastSend, Result->Finished); + LWTRACK(SendResult, *ScanOrbit, PathId, TabletId, TxId, ScanId, sourceId, Result->GetRowsCount(), (Result->ArrowBatch ? NArrow::GetTableDataSize(Result->ArrowBatch) : 0), Result->CpuTime, Result->WaitTime, TInstant::Now() - LastSend, Result->Finished); Send(ScanComputeActorId, Result.Release(), IEventHandle::FlagTrackDelivery); // TODO: FlagSubscribeOnSession ? LastSend = TInstant::Now(); diff --git a/ydb/core/tx/columnshard/engines/reader/actor/actor.h b/ydb/core/tx/columnshard/engines/reader/actor/actor.h index 98dfca483ac5..6c3c8f07238b 100644 --- a/ydb/core/tx/columnshard/engines/reader/actor/actor.h +++ b/ydb/core/tx/columnshard/engines/reader/actor/actor.h @@ -14,6 +14,7 @@ #include #include #include +#include namespace NKikimr::NOlap::NReader { @@ -28,17 +29,24 @@ class TColumnShardScan: public TActorBootstrapped, const std::shared_ptr ColumnDataManager; std::optional StartInstant; std::optional FinishInstant; + std::shared_ptr ScanOrbit; + const ui64 PathId; public: virtual void PassAway() override; + const std::shared_ptr& GetScanOrbit() const { + return ScanOrbit; + } + TColumnShardScan(const TActorId& columnShardActorId, const TActorId& scanComputeActorId, const TActorId& scanDiagnosticsActorId, const std::shared_ptr& storagesManager, const std::shared_ptr& dataAccessorsManager, const std::shared_ptr& columnDataManager, const TComputeShardingPolicy& computeShardingPolicy, ui32 scanId, ui64 txId, ui32 scanGen, ui64 requestCookie, ui64 tabletId, TDuration timeout, const TReadMetadataBase::TConstPtr& readMetadataRange, NKikimrDataEvents::EDataFormat dataFormat, - const NColumnShard::TScanCounters& scanCountersPool, const NConveyorComposite::TCPULimitsConfig& cpuLimits); + const NColumnShard::TScanCounters& scanCountersPool, const NConveyorComposite::TCPULimitsConfig& cpuLimits, + std::shared_ptr orbit, ui64 pathId = 0); void Bootstrap(const TActorContext& ctx); @@ -107,7 +115,7 @@ class TColumnShardScan: public TActorBootstrapped, } }; - bool SendResult(bool pageFault, bool lastBatch); + bool SendResult(bool pageFault, bool lastBatch, ui64 sourceId = 0); void SendScanError(const TString& reason); @@ -200,6 +208,10 @@ class TColumnShardScan: public TActorBootstrapped, ui64 RowsSum = 0; ui64 PacksSum = 0; ui64 Bytes = 0; + ui64 TotalPartialSourcesCount = 0; + ui64 TotalBlobBytes = 0; + ui64 TotalRawBytes = 0; + ui64 TotalRowsCount = 0; ui32 PageFaults = 0; TInstant StartWaitTime; TDuration WaitTime; diff --git a/ydb/core/tx/columnshard/engines/reader/common/conveyor_task.cpp b/ydb/core/tx/columnshard/engines/reader/common/conveyor_task.cpp index b4248d58de5f..0247eed5ff86 100644 --- a/ydb/core/tx/columnshard/engines/reader/common/conveyor_task.cpp +++ b/ydb/core/tx/columnshard/engines/reader/common/conveyor_task.cpp @@ -8,10 +8,12 @@ void IDataTasksProcessor::ITask::DoExecute(const std::shared_ptr(taskPtr), std::move(Guard))); + new NColumnShard::TEvPrivate::TEvTaskProcessedResult(static_pointer_cast(taskPtr), std::move(Guard), GetSourceId(), + GetBlobBytes(), GetRawBytes(), GetFilteredRows(), GetTotalRows(), + GetTotalReservedBytes())); } } diff --git a/ydb/core/tx/columnshard/engines/reader/common/conveyor_task.h b/ydb/core/tx/columnshard/engines/reader/common/conveyor_task.h index dd768cdeefc6..becfcf64d822 100644 --- a/ydb/core/tx/columnshard/engines/reader/common/conveyor_task.h +++ b/ydb/core/tx/columnshard/engines/reader/common/conveyor_task.h @@ -23,6 +23,24 @@ class IApplyAction { AppliedFlag = true; return DoApply(indexedDataRead); } + virtual ui64 GetSourceId() const { + return 0; + } + virtual ui64 GetBlobBytes() const { + return 0; + } + virtual ui64 GetRawBytes() const { + return 0; + } + virtual ui32 GetFilteredRows() const { + return 0; + } + virtual ui32 GetTotalRows() const { + return 0; + } + virtual ui64 GetTotalReservedBytes() const { + return 0; + } virtual ~IApplyAction() = default; }; diff --git a/ydb/core/tx/columnshard/engines/reader/common/description.h b/ydb/core/tx/columnshard/engines/reader/common/description.h index 0acbcb95d778..d50517d13650 100644 --- a/ydb/core/tx/columnshard/engines/reader/common/description.h +++ b/ydb/core/tx/columnshard/engines/reader/common/description.h @@ -7,6 +7,9 @@ #include #include + +namespace NLWTrace { class TOrbit; } + namespace NKikimr::NOlap::NReader { enum class ERequestSorting { @@ -33,6 +36,7 @@ class TReadDescription { public: // Table ui64 TxId = 0; + ui64 ScanId = 0; std::optional LockId; std::optional LockNodeId; std::optional LockMode; @@ -40,6 +44,7 @@ class TReadDescription { std::shared_ptr PKRangesFilter; NYql::NDqProto::EDqStatsMode StatsMode = NYql::NDqProto::EDqStatsMode::DQ_STATS_MODE_NONE; EDeduplicationPolicy DeduplicationPolicy = EDeduplicationPolicy::ALLOW_DUPLICATES; + std::shared_ptr Orbit; bool readNonconflictingPortions; bool readConflictingPortions; // portions that the current tx has written diff --git a/ydb/core/tx/columnshard/engines/reader/common/result.cpp b/ydb/core/tx/columnshard/engines/reader/common/result.cpp index 7c4209bc1c7f..7728c32edb8e 100644 --- a/ydb/core/tx/columnshard/engines/reader/common/result.cpp +++ b/ydb/core/tx/columnshard/engines/reader/common/result.cpp @@ -55,12 +55,13 @@ std::vector> TPartialReadResult::SplitResult TPartialReadResult::TPartialReadResult(const std::vector>& resourceGuards, const std::shared_ptr& gGuard, NArrow::TShardedRecordBatch&& batch, std::shared_ptr&& scanCursor, const std::shared_ptr& context, - const std::optional notFinishedInterval) + const std::optional notFinishedInterval, const ui64 sourceId) : ResourceGuards(resourceGuards) , GroupGuard(gGuard) , ResultBatch(std::move(batch)) , ScanCursor(std::move(scanCursor)) , NotFinishedInterval(notFinishedInterval) + , SourceId(sourceId) , Guard(TValidator::CheckNotNull(context)->GetCounters().GetResultsForReplyGuard()) { Y_ABORT_UNLESS(ResultBatch.GetRecordsCount()); Y_ABORT_UNLESS(ScanCursor); diff --git a/ydb/core/tx/columnshard/engines/reader/common/result.h b/ydb/core/tx/columnshard/engines/reader/common/result.h index a940eda8f310..c15f6d4d6fed 100644 --- a/ydb/core/tx/columnshard/engines/reader/common/result.h +++ b/ydb/core/tx/columnshard/engines/reader/common/result.h @@ -35,6 +35,7 @@ class TPartialReadResult: public TNonCopyable { // NOTE: it might be different from the Key of last row in ResulBatch in case of filtering/aggregation/limit std::shared_ptr ScanCursor; YDB_READONLY_DEF(std::optional, NotFinishedInterval); + YDB_READONLY(ui64, SourceId, 0); const NColumnShard::TCounterGuard Guard; bool Extracted = false; @@ -75,11 +76,11 @@ class TPartialReadResult: public TNonCopyable { explicit TPartialReadResult(const std::vector>& resourceGuards, const std::shared_ptr& gGuard, NArrow::TShardedRecordBatch&& batch, std::shared_ptr&& scanCursor, const std::shared_ptr& context, - const std::optional notFinishedInterval); + const std::optional notFinishedInterval, const ui64 sourceId = 0); explicit TPartialReadResult(NArrow::TShardedRecordBatch&& batch, std::shared_ptr&& scanCursor, - const std::shared_ptr& context, const std::optional notFinishedInterval) - : TPartialReadResult({}, nullptr, std::move(batch), std::move(scanCursor), context, notFinishedInterval) { + const std::shared_ptr& context, const std::optional notFinishedInterval, const ui64 sourceId = 0) + : TPartialReadResult({}, nullptr, std::move(batch), std::move(scanCursor), context, notFinishedInterval, sourceId) { } }; diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.cpp b/ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.cpp index cd3faf5538ec..09ef95ab2877 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.cpp +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.cpp @@ -29,7 +29,7 @@ TConclusionStatus TReadMetadata::Init(const NColumnShard::TColumnShard* owner, c return TConclusionStatus::Success(); } - ITableMetadataAccessor::TSelectMetadataContext context(owner->GetTablesManager(), owner->GetIndexVerified()); + ITableMetadataAccessor::TSelectMetadataContext context(owner->GetTablesManager(), owner->GetIndexVerified(), readDescription.Orbit); SourcesConstructor = readDescription.TableMetadataAccessor->SelectMetadata(context, readDescription, isPlain); if (!SourcesConstructor) { diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.cpp b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.cpp index c1b65f6f4446..17e4fbd1035b 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.cpp +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.cpp @@ -13,26 +13,24 @@ namespace NKikimr::NOlap::NReader::NCommon { LWTRACE_USING(YDB_CS_DATA_SOURCE); +void TColumnBlobsFetchingStep::ReportTracing(const std::shared_ptr& source, const TFetchingScriptCursor& step, const TDuration executionDurationMs, const ui64 blobBytes, const ui64 rawBytes) const { + LWTRACK(ColumnBlobsFetching, source->GetDataSourceOrbit(), source->GetRawPathId(), source->GetTabletId(), + source->GetTxId(), source->GetDeprecatedPortionId(), step.GetStepIndex(), + step.GetTracingName(), source->GetAndResetWaitDuration(), executionDurationMs, Columns.GetColumnsCount(), blobBytes, rawBytes, source->GetRecordsCount(), source->GetReservedMemory()); +} + TConclusion TColumnBlobsFetchingStep::DoExecuteInplace( const std::shared_ptr& source, const TFetchingScriptCursor& step) const { - const TDuration durationMs = source->GetAndResetWaitDuration(); - - LWTRACK(ColumnBlobsFetchingStart, source->GetDataSourceOrbit(), source->GetRawPathId(), source->GetTabletId(), - source->GetTxId(), source->GetSourceIdx(), step.GetStepIndex(), - step.GetTracingName(), durationMs, Columns.GetColumnsCount(), source->GetRecordsCount()); const TMonotonic start = TMonotonic::Now(); auto result = !source->StartFetchingColumns(source, step, Columns); const TDuration executionDurationMs = TMonotonic::Now() - start; source->AddExecutionDuration(executionDurationMs); - ui64 bytesRead = source->GetColumnBlobBytes(Columns.GetColumnIds()); - source->AddBytesRead(bytesRead); - - const TDuration finishDurationMs = source->GetAndResetWaitDuration(); - LWTRACK(ColumnBlobsFetchingFinish, source->GetDataSourceOrbit(), source->GetRawPathId(), source->GetTabletId(), - source->GetTxId(), source->GetSourceIdx(), step.GetStepIndex(), - step.GetTracingName(), finishDurationMs, executionDurationMs, Columns.GetColumnsCount(), bytesRead, source->GetRecordsCount()); + ui64 blobBytes = source->GetColumnBlobBytes(Columns.GetColumnIds()); + ui64 rawBytes = source->GetColumnRawBytes(Columns.GetColumnIds()); + source->AddBytesRead(blobBytes); + ReportTracing(source, step, executionDurationMs, blobBytes, rawBytes); return result; } @@ -41,24 +39,21 @@ ui64 TColumnBlobsFetchingStep::GetProcessingDataSize(const std::shared_ptrGetColumnBlobBytes(Columns.GetColumnIds()); } +void TAssemblerStep::ReportTracing(const std::shared_ptr& source, const TFetchingScriptCursor& step, const TDuration executionDurationMs, const ui64 bytesAssembled) const { + const TDuration finishDurationMs = source->GetAndResetWaitDuration(); + LWTRACK(AssemblerStep, source->GetDataSourceOrbit(), source->GetRawPathId(), source->GetTabletId(), + source->GetTxId(), source->GetDeprecatedPortionId(), step.GetStepIndex(), + step.GetTracingName(), finishDurationMs, executionDurationMs, Columns->GetColumnsCount(), bytesAssembled, source->GetRecordsCount(), source->GetReservedMemory()); +} + TConclusion TAssemblerStep::DoExecuteInplace(const std::shared_ptr& source, const TFetchingScriptCursor& step) const { - const TDuration durationMs = source->GetAndResetWaitDuration(); - - LWTRACK(AssemblerStepStart, source->GetDataSourceOrbit(), source->GetRawPathId(), source->GetTabletId(), - source->GetTxId(), source->GetSourceIdx(), step.GetStepIndex(), - step.GetTracingName(), durationMs, Columns->GetColumnsCount(), source->GetRecordsCount()); - const TMonotonic start = TMonotonic::Now(); source->AssembleColumns(Columns); const TDuration executionDurationMs = TMonotonic::Now() - start; source->AddExecutionDuration(executionDurationMs); ui64 bytesAssembled = source->GetColumnRawBytes(Columns->GetColumnIds()); - - const TDuration finishDurationMs = source->GetAndResetWaitDuration(); - LWTRACK(AssemblerStepFinish, source->GetDataSourceOrbit(), source->GetRawPathId(), source->GetTabletId(), - source->GetTxId(), source->GetSourceIdx(), step.GetStepIndex(), - step.GetTracingName(), finishDurationMs, executionDurationMs, Columns->GetColumnsCount(), bytesAssembled, source->GetRecordsCount()); + ReportTracing(source, step, executionDurationMs, bytesAssembled); return true; } @@ -122,6 +117,12 @@ void TAllocateMemoryStep::TFetchingStepAllocation::DoOnAllocationImpossible(cons } } +void TAllocateMemoryStep::ReportTracing(const std::shared_ptr& source, const TFetchingScriptCursor& step, const TDuration executionDurationMs, const ui64 size) const { + LWTRACK(MemoryAllocation, source->GetDataSourceOrbit(), source->GetRawPathId(), source->GetTabletId(), + source->GetTxId(), source->GetDeprecatedPortionId(), step.GetStepIndex(), + step.GetTracingName(), source->GetAndResetWaitDuration(), executionDurationMs, size, true, source->GetReservedMemory()); +} + TConclusion TAllocateMemoryStep::DoExecuteInplace(const std::shared_ptr& source, const TFetchingScriptCursor& step) const { ui64 size = PredefinedSize.value_or(0); for (auto&& i : Packs) { @@ -136,19 +137,10 @@ TConclusion TAllocateMemoryStep::DoExecuteInplace(const std::shared_ptrGetAndResetWaitDuration(); - LWTRACK(MemoryAllocationStart, source->GetDataSourceOrbit(), source->GetRawPathId(), source->GetTabletId(), - source->GetTxId(), source->GetSourceIdx(), step.GetStepIndex(), step.GetTracingName(), durationMs, size); - const TMonotonic start = TMonotonic::Now(); auto allocation = std::make_shared(source, size, step, StageIndex); const TDuration executionDurationMs = TMonotonic::Now() - start; - - const TDuration finishDurationMs = source->GetAndResetWaitDuration(); - LWTRACK(MemoryAllocationFinish, source->GetDataSourceOrbit(), source->GetRawPathId(), source->GetTabletId(), - source->GetTxId(), source->GetSourceIdx(), step.GetStepIndex(), - step.GetTracingName(), finishDurationMs, executionDurationMs, size, true); + ReportTracing(source, step, executionDurationMs, size); FOR_DEBUG_LOG(NKikimrServices::COLUMNSHARD_SCAN_EVLOG, source->AddEvent("smalloc")); NGroupedMemoryManager::TScanMemoryLimiterOperator::SendToAllocation(source->GetContext()->GetProcessMemoryControlId(), source->GetContext()->GetCommonContext()->GetScanId(), source->GetMemoryGroupId(), { allocation }, (ui32)StageIndex); diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.h b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.h index 3b8abd40ea48..670a19aae352 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.h +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.h @@ -24,6 +24,8 @@ class TAllocateMemoryStep: public IFetchingStep { const NArrow::NSSA::IMemoryCalculationPolicy::EStage StageIndex; const std::optional PredefinedSize; + void ReportTracing(const std::shared_ptr& source, const TFetchingScriptCursor& step, const TDuration executionDurationMs, const ui64 size) const; + protected: virtual TConclusion DoExecuteInplace(const std::shared_ptr& source, const TFetchingScriptCursor& step) const override; virtual ui64 GetProcessingDataSize(const std::shared_ptr& source) const override; @@ -87,6 +89,7 @@ class TAssemblerStep: public IFetchingStep { virtual TString DoDebugString() const override { return TStringBuilder() << "columns=" << Columns->DebugString() << ";"; } + void ReportTracing(const std::shared_ptr& source, const TFetchingScriptCursor& step, const TDuration executionDurationMs, const ui64 bytesAssembled) const; public: virtual ui64 GetProcessingDataSize(const std::shared_ptr& source) const override; @@ -134,6 +137,7 @@ class TColumnBlobsFetchingStep: public IFetchingStep { private: using TBase = IFetchingStep; YDB_READONLY_DEF(TColumnsSetIds, Columns); + void ReportTracing(const std::shared_ptr& source, const TFetchingScriptCursor& step, const TDuration executionDurationMs, const ui64 blobBytes, const ui64 rawBytes) const; protected: virtual TConclusion DoExecuteInplace(const std::shared_ptr& source, const TFetchingScriptCursor& step) const override; diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetched_data.h b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetched_data.h index 0a17b6b8acb9..247c44c5fd7f 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetched_data.h +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetched_data.h @@ -275,6 +275,13 @@ class TFetchedResult { return !!ChunkToReply; } + ui32 GetResultChunkRowsCount() const { + if (!ChunkToReply || !ChunkToReply->HasData()) { + return 0; + } + return ChunkToReply->GetTable()->num_rows(); + } + std::optional ExtractResultChunk() { AFL_VERIFY(!!ChunkToReply); auto result = std::move(*ChunkToReply); diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp index dafb21f029a0..536a3e9f5834 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp @@ -6,7 +6,12 @@ #include "sub_columns_fetching.h" #include +#include +#include +#include #include +#include +#include #include #include @@ -28,26 +33,38 @@ TConclusion TStepAction::DoExecuteImpl() { if (Source->GetContext()->IsAborted()) { AFL_VERIFY(!FinishedFlag); FinishedFlag = true; + CacheSourceStats(); return true; } auto executeResult = Cursor.Execute(Source); if (executeResult.IsFail()) { AFL_VERIFY(!FinishedFlag); FinishedFlag = true; + CacheSourceStats(); return executeResult; } if (*executeResult) { AFL_VERIFY(!FinishedFlag); FinishedFlag = true; + CacheSourceStats(); } return FinishedFlag; } +void TStepAction::CacheSourceStats() { + CachedBlobBytes = Source->ExtractTotalBytesRead(); + CachedRawBytes = Source->GetUsedRawBytesOptional(); + CachedFilteredRows = Source->GetFilteredRowsCount(); + CachedTotalRows = Source->GetRecordsCount(); + CachedTotalReservedBytes = Source->GetReservedMemory(); +} + TStepAction::TStepAction( std::shared_ptr&& source, TFetchingScriptCursor&& cursor, const NActors::TActorId& ownerActorId, const bool changeSyncSection) : TBase(ownerActorId, source->GetContext()->GetCommonContext()->GetCounters().GetAssembleTasksGuard()) , Source(std::move(source)) - , Cursor(std::move(cursor)) { + , Cursor(std::move(cursor)) + , CachedSourceId(Source->GetDeprecatedPortionId()) { if (changeSyncSection) { Source->StartAsyncSection(); } else { @@ -55,6 +72,173 @@ TStepAction::TStepAction( } } +void TProgramStep::ReportTracing(const std::shared_ptr& source, const TDuration executionDurationMs, const TString& currentExecutionResult) const { + auto iterator = source->GetExecutionContext().GetProgramIteratorVerified(); + if (!iterator->IsValid()) { + return; + } + const auto& currentCategoryName = iterator->GetCurrentNode().GetSignalCategoryName(); + const auto& scanOrbit = source->GetContext()->GetCommonContext()->GetScanOrbit(); + if (!NLWTrace::HasShuttles(source->GetDataSourceOrbit()) + && !(scanOrbit && NLWTrace::HasShuttles(*scanOrbit)) + && !LWPROBE_ENABLED(ProgramConst) + && !LWPROBE_ENABLED(ProgramCalculation) + && !LWPROBE_ENABLED(ProgramProjection) + && !LWPROBE_ENABLED(ProgramFilter) + && !LWPROBE_ENABLED(ProgramAggregation) + && !LWPROBE_ENABLED(ProgramFetchOriginalData) + && !LWPROBE_ENABLED(ProgramAssembleOriginalData) + && !LWPROBE_ENABLED(ProgramCheckIndexData) + && !LWPROBE_ENABLED(ProgramCheckHeaderData) + && !LWPROBE_ENABLED(ProgramStreamLogic) + && !LWPROBE_ENABLED(ProgramReserveMemory)) { + source->MutableExecutionContext().SetPrevCategoryName(currentCategoryName); + source->MutableExecutionContext().SetPrevExecutionResult(currentExecutionResult); + return; + } + const auto& step = source->GetExecutionContext().GetCursorStep(); + const TString tracingName = source->GetExecutionContext().GetPrevCategoryName() + " - " + currentCategoryName; + const TString tracingExecutionResult = source->GetExecutionContext().GetPrevExecutionResult() + " - " + currentExecutionResult; + const TDuration finishDurationMs = source->GetAndResetWaitDuration(); + const auto& processor = iterator->GetProcessorVerified(); + const auto processorType = processor->GetProcessorType(); + const TString details = processor->DebugJson().GetStringRobust(); + const auto& resources = source->GetExecutionContext().GetExecutionVisitorVerified()->MutableContext().GetResources(); + const ui32 filteredRows = resources.GetRecordsCountActualOptional().value_or(source->GetRecordsCount()); +#define PROGRAM_PROBE_ARGS source->GetDataSourceOrbit(), source->GetRawPathId(), source->GetTabletId(), \ + source->GetTxId(), source->GetDeprecatedPortionId(), step.GetStepIndex(), \ + tracingName, iterator->GetCurrentNodeId(), finishDurationMs, \ + executionDurationMs, filteredRows +#define PROGRAM_PROBE_RESERVED source->GetReservedMemory() +#define PROGRAM_PROBE_TAIL tracingExecutionResult, details + switch (processorType) { + case NArrow::NSSA::EProcessorType::Const: + LWTRACK(ProgramConst, PROGRAM_PROBE_ARGS, PROGRAM_PROBE_RESERVED, PROGRAM_PROBE_TAIL); + break; + case NArrow::NSSA::EProcessorType::Calculation: + LWTRACK(ProgramCalculation, PROGRAM_PROBE_ARGS, PROGRAM_PROBE_RESERVED, PROGRAM_PROBE_TAIL); + break; + case NArrow::NSSA::EProcessorType::Projection: + LWTRACK(ProgramProjection, PROGRAM_PROBE_ARGS, PROGRAM_PROBE_RESERVED, PROGRAM_PROBE_TAIL); + break; + case NArrow::NSSA::EProcessorType::Filter: + LWTRACK(ProgramFilter, PROGRAM_PROBE_ARGS, PROGRAM_PROBE_RESERVED, PROGRAM_PROBE_TAIL); + break; + case NArrow::NSSA::EProcessorType::Aggregation: + LWTRACK(ProgramAggregation, PROGRAM_PROBE_ARGS, PROGRAM_PROBE_RESERVED, PROGRAM_PROBE_TAIL); + break; + case NArrow::NSSA::EProcessorType::FetchOriginalData: + { + ui64 blobBytes = 0; + ui64 rawBytes = 0; + auto* fetchProcessor = dynamic_cast(processor.get()); + if (fetchProcessor) { + std::set dataColumnIds; + for (auto&& [colId, addr] : fetchProcessor->GetDataAddresses()) { + dataColumnIds.insert(colId); + } + if (!dataColumnIds.empty()) { + blobBytes += source->GetColumnBlobBytes(dataColumnIds); + rawBytes += source->GetColumnRawBytes(dataColumnIds); + } + if (!fetchProcessor->GetIndexContext().empty() && source->HasPortionAccessor() && source->GetSourceSchemaOptional()) { + const auto& accessor = source->GetPortionAccessor(); + std::set indexEntityIds; + const auto& indexInfo = source->GetSourceSchemaOptional()->GetIndexInfo(); + for (auto&& [colId, idxCtx] : fetchProcessor->GetIndexContext()) { + for (auto&& [subCol, ops] : idxCtx.GetOperationsBySubColumn().GetData()) { + NIndexes::NRequest::TOriginalDataAddress addr(colId, subCol); + for (auto&& op : ops) { + for (auto&& skipIdx : indexInfo.FindSkipIndexes(addr, op)) { + indexEntityIds.insert(skipIdx->GetIndexId()); + } + } + } + } + if (!indexEntityIds.empty()) { + blobBytes += accessor.GetIndexBlobBytes(indexEntityIds, false); + rawBytes += accessor.GetIndexRawBytes(indexEntityIds, false); + } + } + } + source->AddBytesRead(blobBytes); + LWTRACK(ProgramFetchOriginalData, PROGRAM_PROBE_ARGS, blobBytes, rawBytes, PROGRAM_PROBE_RESERVED, PROGRAM_PROBE_TAIL); + } + break; + case NArrow::NSSA::EProcessorType::AssembleOriginalData: + LWTRACK(ProgramAssembleOriginalData, PROGRAM_PROBE_ARGS, PROGRAM_PROBE_RESERVED, PROGRAM_PROBE_TAIL); + break; + case NArrow::NSSA::EProcessorType::CheckIndexData: + { + TString indexStatus = "Unknown"; + ui32 indexFilteredRows = source->GetRecordsCount(); + auto* indexProcessor = dynamic_cast(processor.get()); + if (indexProcessor && source->GetSourceSchemaOptional()) { + const auto& idxCtx = indexProcessor->GetIndexContext(); + NIndexes::NRequest::TOriginalDataAddress addr(idxCtx.GetColumnId(), idxCtx.GetSubColumnName()); + auto skipIndexes = source->GetSourceSchemaOptional()->GetIndexInfo().FindSkipIndexes(addr, idxCtx.GetOperation()); + bool hasActualIndexData = false; + if (!skipIndexes.empty() && source->HasPortionAccessor()) { + std::set indexEntityIds; + for (auto&& skipIdx : skipIndexes) { + indexEntityIds.insert(skipIdx->GetIndexId()); + } + hasActualIndexData = source->GetPortionAccessor().GetIndexBlobBytes(indexEntityIds, false) > 0; + } + if (skipIndexes.empty() || !hasActualIndexData) { + indexStatus = "NoIndex"; + indexFilteredRows = source->GetRecordsCount(); + } else { + // After DoExecute in index.cpp: + // - AllDenied/AllAccepted: output column stored, filter NOT modified + // - Partial: no output column, filter modified via AddFilter + const ui32 outputColumnId = indexProcessor->GetOutputColumnIdOnce(); + const auto& outputAccessor = resources.GetAccessorOptional(outputColumnId); + if (outputAccessor) { + // Output column exists → AllDenied or AllAccepted + auto* sparsed = dynamic_cast(outputAccessor.get()); + if (sparsed && sparsed->GetDefaultValue() && sparsed->GetDefaultValue()->is_valid) { + auto* uint8Scalar = dynamic_cast(sparsed->GetDefaultValue().get()); + if (uint8Scalar && uint8Scalar->value == 0) { + indexStatus = "AllDenied"; + indexFilteredRows = 0; + } else { + indexStatus = "AllAccepted"; + indexFilteredRows = source->GetRecordsCount(); + } + } else { + indexStatus = "AllAccepted"; + indexFilteredRows = source->GetRecordsCount(); + } + } else { + // No output column → Partial (filter was applied via AddFilter) + indexStatus = "Partial"; + indexFilteredRows = resources.GetFilter().GetFilteredCount().value_or(source->GetRecordsCount()); + } + } + } + LWTRACK(ProgramCheckIndexData, PROGRAM_PROBE_ARGS, indexFilteredRows, indexStatus, PROGRAM_PROBE_RESERVED, PROGRAM_PROBE_TAIL); + } + break; + case NArrow::NSSA::EProcessorType::CheckHeaderData: + LWTRACK(ProgramCheckHeaderData, PROGRAM_PROBE_ARGS, PROGRAM_PROBE_RESERVED, PROGRAM_PROBE_TAIL); + break; + case NArrow::NSSA::EProcessorType::StreamLogic: + LWTRACK(ProgramStreamLogic, PROGRAM_PROBE_ARGS, PROGRAM_PROBE_RESERVED, PROGRAM_PROBE_TAIL); + break; + case NArrow::NSSA::EProcessorType::ReserveMemory: + LWTRACK(ProgramReserveMemory, PROGRAM_PROBE_ARGS, source->GetReservedMemory(), PROGRAM_PROBE_RESERVED, PROGRAM_PROBE_TAIL); + break; + case NArrow::NSSA::EProcessorType::Unknown: + break; + } +#undef PROGRAM_PROBE_ARGS +#undef PROGRAM_PROBE_RESERVED +#undef PROGRAM_PROBE_TAIL + source->MutableExecutionContext().SetPrevCategoryName(currentCategoryName); + source->MutableExecutionContext().SetPrevExecutionResult(currentExecutionResult); +} + TConclusion TProgramStep::DoExecuteInplace(const std::shared_ptr& source, const TFetchingScriptCursor& step) const { const bool started = !source->GetExecutionContext().HasProgramIterator(); if (!source->GetExecutionContext().HasProgramIterator()) { @@ -83,35 +267,15 @@ TConclusion TProgramStep::DoExecuteInplace(const std::shared_ptrMutableExecutionContext().OnStartProgramStepExecution(iterator->GetCurrentNodeId(), GetSignals(iterator->GetCurrentNodeId())); auto signals = GetSignals(iterator->GetCurrentNodeId()); - const auto& currentCategoryName = iterator->GetCurrentNode().GetSignalCategoryName(); - if (LWPROBE_ENABLED(ProgramChainStart) || source->GetDataSourceOrbit().HasShuttles()) { - const TString tracingName = source->GetExecutionContext().GetPrevCategoryName() + " - " + currentCategoryName; - const TDuration durationMs = source->GetAndResetWaitDuration(); - LWTRACK(ProgramChainStart, source->GetDataSourceOrbit(), source->GetRawPathId(), source->GetTabletId(), - source->GetTxId(), source->GetSourceIdx(), step.GetStepIndex(), - tracingName, iterator->GetCurrentNodeId(), durationMs, source->GetRecordsCount()); - } - const TMonotonic start = TMonotonic::Now(); auto conclusion = source->GetExecutionContext().GetExecutionVisitorVerified()->Execute(); const TDuration executionDurationMs = TMonotonic::Now() - start; source->GetContext()->GetCommonContext()->GetCounters().AddExecutionDuration(executionDurationMs); signals->AddExecutionDuration(executionDurationMs); source->AddExecutionDuration(executionDurationMs); - - if (LWPROBE_ENABLED(ProgramChainFinish) || source->GetDataSourceOrbit().HasShuttles()) { - const TString tracingName = source->GetExecutionContext().GetPrevCategoryName() + " - " + currentCategoryName; - TString currentExecutionResult = conclusion.IsFail() ? "Fail" : ToString(*conclusion); - const TString tracingExecutionResult = source->GetExecutionContext().GetPrevExecutionResult() + " - " + currentExecutionResult; - const TDuration finishDurationMs = source->GetAndResetWaitDuration(); - LWTRACK(ProgramChainFinish, source->GetDataSourceOrbit(), source->GetRawPathId(), source->GetTabletId(), - source->GetTxId(), source->GetSourceIdx(), step.GetStepIndex(), - tracingName, iterator->GetCurrentNodeId(), finishDurationMs, - executionDurationMs, source->GetRecordsCount(), tracingExecutionResult); - source->MutableExecutionContext().SetPrevCategoryName(currentCategoryName); - source->MutableExecutionContext().SetPrevExecutionResult(currentExecutionResult); - } - + + const TString currentExecutionResult = conclusion.IsFail() ? "Fail" : ToString(*conclusion); + ReportTracing(source, executionDurationMs, currentExecutionResult); if (conclusion.IsFail()) { source->MutableExecutionContext().OnFailedProgramStepExecution(); return conclusion; diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h index 8c24a0e40c6c..36383f464f52 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h @@ -21,6 +21,14 @@ class TStepAction: public IDataTasksProcessor::ITask { std::shared_ptr Source; TFetchingScriptCursor Cursor; bool FinishedFlag = false; + ui64 CachedSourceId = 0; + ui64 CachedBlobBytes = 0; + ui64 CachedRawBytes = 0; + ui32 CachedFilteredRows = 0; + ui32 CachedTotalRows = 0; + ui64 CachedTotalReservedBytes = 0; + + void CacheSourceStats(); protected: virtual bool DoApply(IDataReader& owner) override; @@ -31,6 +39,30 @@ class TStepAction: public IDataTasksProcessor::ITask { return "STEP_ACTION"; } + virtual ui64 GetSourceId() const override { + return CachedSourceId; + } + + virtual ui64 GetBlobBytes() const override { + return CachedBlobBytes; + } + + virtual ui64 GetRawBytes() const override { + return CachedRawBytes; + } + + virtual ui32 GetFilteredRows() const override { + return CachedFilteredRows; + } + + virtual ui32 GetTotalRows() const override { + return CachedTotalRows; + } + + virtual ui64 GetTotalReservedBytes() const override { + return CachedTotalReservedBytes; + } + template TStepAction(std::shared_ptr&& source, TFetchingScriptCursor&& cursor, const NActors::TActorId& ownerActorId, const bool changeSyncSection) : TStepAction(std::static_pointer_cast(source), std::move(cursor), ownerActorId, changeSyncSection) { @@ -45,6 +77,7 @@ class TProgramStep: public IFetchingStep { const std::shared_ptr Program; THashMap> Signals; const std::shared_ptr& GetSignals(const ui32 nodeId) const; + void ReportTracing(const std::shared_ptr& source, const TDuration executionDurationMs, const TString& currentExecutionResult) const; public: virtual TConclusion DoExecuteInplace(const std::shared_ptr& source, const TFetchingScriptCursor& step) const override; diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/source.cpp b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/source.cpp index 9a98e342602a..4883d0008a77 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/source.cpp +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/source.cpp @@ -2,10 +2,12 @@ #include "source.h" #include +#include namespace NKikimr::NOlap::NReader::NCommon { LWTRACE_USING(YDB_CS_DATA_SOURCE); +LWTRACE_USING(YDB_CS_SCAN); void TExecutionContext::OnStartProgramStepExecution(const ui32 nodeId, const std::shared_ptr& signals) { if (!CurrentProgramNodeId) { @@ -153,6 +155,28 @@ ui32 IDataSource::GetRecordsCount() const { } } +void IDataSource::OnStartProcessing() { + AFL_VERIFY(!SourceCreatedTimestamp); + SourceCreatedTimestamp = TMonotonic::Now(); + if (!NLWTrace::HasShuttles(DataSourceOrbit) + && !NLWTrace::HasShuttles(*GetContext()->GetCommonContext()->GetScanOrbit()) + && !LWPROBE_ENABLED(StartSourceProcessing) + && !LWPROBE_ENABLED(ScanStartSource)) { + return; + } + const ui64 portionBlobBytes = HasPortionAccessor() ? GetPortionAccessor().GetPortionInfo().GetTotalBlobBytes() : 0; + const ui64 portionRawBytes = HasPortionAccessor() ? GetPortionAccessor().GetPortionInfo().GetTotalRawBytes() : 0; + TString minPk = HasPortionAccessor() ? GetPortionAccessor().GetPortionInfo().IndexKeyStart().DebugString() : TString{}; + TString maxPk = HasPortionAccessor() ? GetPortionAccessor().GetPortionInfo().IndexKeyEnd().DebugString() : TString{}; + const TString minSnapshot = TStringBuilder() << GetRecordSnapshotMin(); + const TString maxSnapshot = TStringBuilder() << GetRecordSnapshotMax(); + LWTRACK(StartSourceProcessing, DataSourceOrbit, GetRawPathId(), GetTabletId(), GetTxId(), GetDeprecatedPortionId(), + portionBlobBytes, portionRawBytes, GetReservedMemory(), minPk, maxPk, minSnapshot, maxSnapshot); + LWTRACK(ScanStartSource, *GetContext()->GetCommonContext()->GetScanOrbit(), GetRawPathId(), GetTabletId(), + GetTxId(), GetContext()->GetCommonContext()->GetScanId(), GetDeprecatedPortionId(), + portionBlobBytes, portionRawBytes, minPk, maxPk, minSnapshot, maxSnapshot); +} + void IDataSource::StartAsyncSection() { AFL_VERIFY(AtomicCas(&SyncSectionFlag, 0, 1)); } @@ -207,7 +231,6 @@ IDataSource::IDataSource(const EType type, const ui32 sourceIdx, const std::shar , ShardingVersionOptional(shardingVersion) , HasDeletions(hasDeletions) { - SourceCreatedTimestamp = TMonotonic::Now(); FOR_DEBUG_LOG(NKikimrServices::COLUMNSHARD_SCAN_EVLOG, Events.emplace(NEvLog::TLogsThread())); FOR_DEBUG_LOG(NKikimrServices::COLUMNSHARD_SCAN_EVLOG, AddEvent("c")); } @@ -275,6 +298,11 @@ void IDataSource::OnEmptyStageData(const std::shared_ptr& DoOnEmptyStageData(sourcePtr); AFL_VERIFY(StageResult); AFL_VERIFY(!StageData); + + const TDuration durationMs = GetAndResetWaitDuration(); + LWTRACK(SourceFinished, DataSourceOrbit, GetRawPathId(), GetTabletId(), + GetTxId(), GetDeprecatedPortionId(), 0, + ExecutionContext.GetPrevCategoryName() + " - " + "SourceFinished(Empty)", durationMs, GetTotalDuration(), GetTotalBytesRead(), GetTotalExecutionDuration(), GetReservedMemory()); } void IDataSource::BuildStageResult(const std::shared_ptr& sourcePtr) { @@ -289,8 +317,8 @@ void IDataSource::BuildStageResult(const std::shared_ptr& sourcePtr const TDuration durationMs = GetAndResetWaitDuration(); LWTRACK(SourceFinished, DataSourceOrbit, GetRawPathId(), GetTabletId(), - GetTxId(), GetSourceIdx(), 0, - ExecutionContext.GetPrevCategoryName() + " - " + "SourceFinished", durationMs, GetTotalDuration(), GetTotalBytesRead(), GetTotalExecutionDuration()); + GetTxId(), GetDeprecatedPortionId(), 0, + ExecutionContext.GetPrevCategoryName() + " - " + "SourceFinished", durationMs, GetTotalDuration(), GetTotalBytesRead(), GetTotalExecutionDuration(), GetReservedMemory()); } bool IDataSource::AddTxConflict() { diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/source.h b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/source.h index a3d75afd4dcb..9f609321f577 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/source.h +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/source.h @@ -139,6 +139,8 @@ class IDataSource: public ICursorEntity, public NArrow::NSSA::IDataSource { std::vector> ResourceGuards; NLWTrace::TOrbit DataSourceOrbit; TMonotonic LastProbeTimestamp; + TMonotonic SourcesAheadQueueEnterTime; + ui32 SourcesAhead = 0; TMonotonic SourceCreatedTimestamp; TDuration TotalExecutionDuration; ui64 TotalBytesRead = 0; @@ -156,6 +158,34 @@ class IDataSource: public ICursorEntity, public NArrow::NSSA::IDataSource { return result; } + void SetSourcesAheadQueueEnterTime(const TMonotonic t) { + SourcesAheadQueueEnterTime = t; + } + + TDuration GetSourcesAheadQueueWaitDuration() const { + if (!SourcesAhead || !SourcesAheadQueueEnterTime) { + return TDuration::Zero(); + } + return TMonotonic::Now() - SourcesAheadQueueEnterTime; + } + + void SetSourcesAhead(const ui32 count) { + SourcesAhead = count; + } + + ui32 GetSourcesAhead() const { + return SourcesAhead; + } + + ui32 GetFilteredRowsCount() const { + if (!HasStageResult() || GetStageResult().IsEmpty()) { + return 0; + } + const auto& notAppliedFilter = GetStageResult().GetNotAppliedFilter(); + return notAppliedFilter ? notAppliedFilter->GetFilteredCount().value_or(GetStageResult().GetBatch()->num_rows()) + : GetStageResult().GetBatch()->num_rows(); + } + void AddExecutionDuration(const TDuration d) { TotalExecutionDuration += d; } @@ -164,6 +194,8 @@ class IDataSource: public ICursorEntity, public NArrow::NSSA::IDataSource { TotalBytesRead += bytes; } + void OnStartProcessing(); + TDuration GetTotalDuration() const { return SourceCreatedTimestamp ? (TMonotonic::Now() - SourceCreatedTimestamp) : TDuration::Zero(); } @@ -176,6 +208,12 @@ class IDataSource: public ICursorEntity, public NArrow::NSSA::IDataSource { return TotalBytesRead; } + ui64 ExtractTotalBytesRead() { + const ui64 result = TotalBytesRead; + TotalBytesRead = 0; + return result; + } + NLWTrace::TOrbit& GetDataSourceOrbit() { return DataSourceOrbit; } @@ -258,6 +296,15 @@ class IDataSource: public ICursorEntity, public NArrow::NSSA::IDataSource { virtual const std::shared_ptr& GetSourceSchema() const; + virtual const std::shared_ptr& GetSourceSchemaOptional() const { + static std::shared_ptr defaultValue; + return defaultValue; + } + + virtual ui64 GetUsedRawBytesOptional() const { + return 0; + } + virtual TString GetColumnStorageId(const ui32 /*columnId*/) const; virtual TString GetEntityStorageId(const ui32 /*entityId*/) const; diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/ya.make b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/ya.make index 3cb9684c09f6..e914b19c0d64 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/ya.make +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/ya.make @@ -14,14 +14,17 @@ SRCS( ) PEERDIR( - ydb/core/tx/columnshard/engines/scheme ydb/core/formats/arrow/accessor/dictionary ydb/core/formats/arrow/accessor/plain ydb/core/formats/arrow/accessor/sub_columns - yql/essentials/minikql + ydb/core/tx/columnshard/engines/reader/tracing + ydb/core/tx/columnshard/engines/scheme + ydb/core/tx/columnshard/engines/storage/indexes/skip_index ydb/core/util/evlog + yql/essentials/minikql ) GENERATE_ENUM_SERIALIZATION(source.h) +YQL_LAST_ABI_VERSION() END() diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp index fdfb72112d08..6eed19a10feb 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp @@ -30,6 +30,7 @@ void IDataSource::RegisterInterval(TFetchingInterval& interval, const std::share AFL_VERIFY(Intervals.emplace(interval.GetIntervalIdx(), &interval).second); } if (AtomicCas(&SourceStartedFlag, 1, 0)) { + OnStartProcessing(); SetMemoryGroupId(interval.GetIntervalId()); AFL_VERIFY(FetchingPlan); InitStageData(std::make_unique(GetExclusiveIntervalOnly(), GetRecordsCount())); diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.cpp index dda1dce2966a..b7c2a29501fe 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.cpp +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include @@ -12,13 +13,24 @@ namespace NKikimr::NOlap::NReader::NSimple { -TConclusion TPredicateFilter::DoExecuteInplace(const std::shared_ptr& source, const TFetchingScriptCursor& /*step*/) const { +LWTRACE_USING(YDB_CS_DATA_SOURCE); + +void TPredicateFilter::ReportTracing(const std::shared_ptr& source, const TFetchingScriptCursor& step, const ui32 filteredRows) const { + const TDuration durationMs = source->GetAndResetWaitDuration(); + LWTRACK(PredicateFilter, source->GetDataSourceOrbit(), source->GetRawPathId(), source->GetTabletId(), + source->GetTxId(), source->GetDeprecatedPortionId(), step.GetStepIndex(), + step.GetTracingName(), durationMs, source->GetRecordsCount(), filteredRows, source->GetReservedMemory()); +} + +TConclusion TPredicateFilter::DoExecuteInplace(const std::shared_ptr& source, const TFetchingScriptCursor& step) const { auto filter = source->GetContext()->GetReadMetadata()->GetPKRangesFilter().BuildFilter( source->GetStageData().GetTable().ToGeneralContainer(source->GetContext()->GetCommonContext()->GetResolver(), source->GetContext()->GetReadMetadata()->GetPKRangesFilter().GetColumnIds( source->GetContext()->GetReadMetadata()->GetResultSchema()->GetIndexInfo()), true)); + const ui32 filteredRows = filter.GetFilteredCount().value_or(source->GetRecordsCount()); source->MutableStageData().AddFilter(filter); + ReportTracing(source, step, filteredRows); return true; } @@ -56,16 +68,31 @@ void VerifyConflictingPortion(const std::shared_ptr& sourc AFL_VERIFY(source->GetRecordsCount() > 0)("error", "source has no records"); } +void TConflictDetector::ReportTracing(const std::shared_ptr& source, const TFetchingScriptCursor& step) const { + const TDuration durationMs = source->GetAndResetWaitDuration(); + LWTRACK(ConflictDetector, source->GetDataSourceOrbit(), source->GetRawPathId(), source->GetTabletId(), + source->GetTxId(), source->GetDeprecatedPortionId(), step.GetStepIndex(), + step.GetTracingName(), durationMs, source->GetRecordsCount(), source->GetReservedMemory()); +} + TConclusion TConflictDetector::DoExecuteInplace( - const std::shared_ptr& source, const TFetchingScriptCursor& /*step*/) const { + const std::shared_ptr& source, const TFetchingScriptCursor& step) const { VerifyConflictingPortion(source); // it is not empty (not filtered everything out by other filters) and conflicting, so we must mark the conflict here AFL_VERIFY(source->AddTxConflict()); + ReportTracing(source, step); return true; } +void TSnapshotFilter::ReportTracing(const std::shared_ptr& source, const TFetchingScriptCursor& step) const { + const TDuration durationMs = source->GetAndResetWaitDuration(); + LWTRACK(SnapshotFilter, source->GetDataSourceOrbit(), source->GetRawPathId(), source->GetTabletId(), + source->GetTxId(), source->GetDeprecatedPortionId(), step.GetStepIndex(), + step.GetTracingName(), durationMs, source->GetRecordsCount(), source->GetReservedMemory()); +} + TConclusion TSnapshotFilter::DoExecuteInplace( - const std::shared_ptr& source, const TFetchingScriptCursor& /*step*/) const { + const std::shared_ptr& source, const TFetchingScriptCursor& step) const { auto filter = MakeSnapshotFilter(source->GetStageData().GetTable().ToTable( std::set({ (ui32)IIndexInfo::ESpecialColumn::PLAN_STEP, (ui32)IIndexInfo::ESpecialColumn::TX_ID }), @@ -73,20 +100,31 @@ TConclusion TSnapshotFilter::DoExecuteInplace( source->GetContext()->GetReadMetadata()->GetRequestSnapshot()); if (filter.GetFilteredCount().value_or(source->GetRecordsCount()) != source->GetRecordsCount()) { if (source->AddTxConflict()) { + ReportTracing(source, step); return true; } } source->MutableStageData().AddFilter(filter); + ReportTracing(source, step); return true; } +void TDeletionFilter::ReportTracing(const std::shared_ptr& source, const TFetchingScriptCursor& step) const { + const TDuration durationMs = source->GetAndResetWaitDuration(); + LWTRACK(DeletionFilter, source->GetDataSourceOrbit(), source->GetRawPathId(), source->GetTabletId(), + source->GetTxId(), source->GetDeprecatedPortionId(), step.GetStepIndex(), + step.GetTracingName(), durationMs, source->GetRecordsCount(), source->GetReservedMemory()); +} + TConclusion TDeletionFilter::DoExecuteInplace( - const std::shared_ptr& source, const TFetchingScriptCursor& /*step*/) const { + const std::shared_ptr& source, const TFetchingScriptCursor& step) const { if (!source->GetStageData().GetTable().HasColumn((ui32)IIndexInfo::ESpecialColumn::DELETE_FLAG)) { + ReportTracing(source, step); return true; } auto filterTable = source->GetStageData().GetTable().ToTable(std::set({ (ui32)IIndexInfo::ESpecialColumn::DELETE_FLAG })); if (!filterTable) { + ReportTracing(source, step); return true; } AFL_VERIFY(filterTable->column(0)->type()->id() == arrow::boolean()->id()); @@ -98,51 +136,72 @@ TConclusion TDeletionFilter::DoExecuteInplace( } } source->MutableStageData().AddFilter(filter); + ReportTracing(source, step); return true; } +void TShardingFilter::ReportTracing(const std::shared_ptr& source, const TFetchingScriptCursor& step) const { + const TDuration durationMs = source->GetAndResetWaitDuration(); + LWTRACK(ShardingFilter, source->GetDataSourceOrbit(), source->GetRawPathId(), source->GetTabletId(), + source->GetTxId(), source->GetDeprecatedPortionId(), step.GetStepIndex(), + step.GetTracingName(), durationMs, source->GetRecordsCount(), source->GetReservedMemory()); +} + TConclusion TShardingFilter::DoExecuteInplace( - const std::shared_ptr& source, const TFetchingScriptCursor& /*step*/) const { + const std::shared_ptr& source, const TFetchingScriptCursor& step) const { NYDBTest::TControllers::GetColumnShardController()->OnSelectShardingFilter(); const auto& shardingInfo = source->GetContext()->GetReadMetadata()->GetRequestShardingInfo()->GetShardingInfo(); const std::set ids = source->GetContext()->GetCommonContext()->GetResolver()->GetColumnIdsSetVerified(shardingInfo->GetColumnNames()); auto filter = shardingInfo->GetFilter(source->GetStageData().GetTable().ToTable(ids, source->GetContext()->GetCommonContext()->GetResolver())); source->MutableStageData().AddFilter(filter); + ReportTracing(source, step); return true; } +void TFilterCutLimit::ReportTracing(const std::shared_ptr& source, const TFetchingScriptCursor& step) const { + const TDuration durationMs = source->GetAndResetWaitDuration(); + LWTRACK(FilterCutLimit, source->GetDataSourceOrbit(), source->GetRawPathId(), source->GetTabletId(), + source->GetTxId(), source->GetDeprecatedPortionId(), step.GetStepIndex(), + step.GetTracingName(), durationMs, source->GetRecordsCount(), source->GetReservedMemory()); +} + NKikimr::TConclusion TFilterCutLimit::DoExecuteInplace( - const std::shared_ptr& source, const TFetchingScriptCursor& /*step*/) const { + const std::shared_ptr& source, const TFetchingScriptCursor& step) const { source->MutableStageData().CutFilter(source->GetRecordsCount(), Limit, Reverse); + ReportTracing(source, step); return true; } -TConclusion TStartPortionAccessorFetchingStep::DoExecuteInplace( - const std::shared_ptr& source, const TFetchingScriptCursor& step) const { - FOR_DEBUG_LOG(NKikimrServices::COLUMNSHARD_SCAN_EVLOG, source->AddEvent("sacc")); - if (source->HasPortionAccessor()) { - return true; - } - return !source->MutableAs()->StartFetchingAccessor(source, step); +void TDetectInMemFlag::ReportTracing(const std::shared_ptr& source, const TFetchingScriptCursor& step, const ui64 columnRawBytes, const ui64 columnBlobBytes) const { + const TDuration durationMs = source->GetAndResetWaitDuration(); + LWTRACK(DetectInMemFlag, source->GetDataSourceOrbit(), source->GetRawPathId(), source->GetTabletId(), + source->GetTxId(), source->GetDeprecatedPortionId(), step.GetStepIndex(), + step.GetTracingName(), durationMs, columnBlobBytes, columnRawBytes, source->IsSourceInMemory(), source->GetRecordsCount(), source->GetReservedMemory()); } TConclusion TDetectInMemFlag::DoExecuteInplace( - const std::shared_ptr& source, const TFetchingScriptCursor& /*step*/) const { + const std::shared_ptr& source, const TFetchingScriptCursor& step) const { if (!source->NeedPortionData()) { source->SetSourceInMemory(true); source->MutableAs()->InitUsedRawBytes(); } if (source->HasSourceInMemoryFlag()) { + ReportTracing(source, step, 0UL, 0UL); return true; } + ui64 columnRawBytes = 0; + ui64 columnBlobBytes = 0; if (Columns.GetColumnsCount() && source->GetContext()->GetReadMetadata()->GetProgram().GetGraphOptional() && !source->GetContext()->GetReadMetadata()->GetProgram().GetChainVerified()->HasAggregations()) { + columnRawBytes = source->GetColumnRawBytes(Columns.GetColumnIds()); + columnBlobBytes = source->GetColumnBlobBytes(Columns.GetColumnIds()); source->SetSourceInMemory( - source->GetColumnRawBytes(Columns.GetColumnIds()) < NYDBTest::TControllers::GetColumnShardController()->GetMemoryLimitScanPortion()); + columnRawBytes < NYDBTest::TControllers::GetColumnShardController()->GetMemoryLimitScanPortion()); } else { source->SetSourceInMemory(true); } + ReportTracing(source, step, columnRawBytes, columnBlobBytes); return true; } @@ -159,6 +218,10 @@ class TApplySourceResult: public IApplyAction { , Step(step) { } + virtual ui64 GetSourceId() const override { + return Source ? Source->GetDeprecatedPortionId() : 0; + } + virtual bool DoApply(IDataReader& indexedDataRead) override { auto* plainReader = static_cast(&indexedDataRead); Source->MutableAs()->SetCursor(std::move(Step)); @@ -171,29 +234,60 @@ class TApplySourceResult: public IApplyAction { } // namespace +void TUpdateAggregatedMemoryStep::ReportTracing(const std::shared_ptr& source, const TFetchingScriptCursor& step) const { + const TDuration durationMs = source->GetAndResetWaitDuration(); + LWTRACK(UpdateAggregatedMemory, source->GetDataSourceOrbit(), source->GetRawPathId(), source->GetTabletId(), + source->GetTxId(), source->GetDeprecatedPortionId(), step.GetStepIndex(), + step.GetTracingName(), durationMs, source->GetRecordsCount(), source->GetReservedMemory()); +} + TConclusion TUpdateAggregatedMemoryStep::DoExecuteInplace( - const std::shared_ptr& source, const TFetchingScriptCursor& /*step*/) const { + const std::shared_ptr& source, const TFetchingScriptCursor& step) const { if (auto* portionSource = source->MutableOptionalAs()) { portionSource->ActualizeAggregatedMemoryGuards(); } + ReportTracing(source, step); return true; } +void TInitializeSourceStep::ReportTracing(const std::shared_ptr& source, const TFetchingScriptCursor& step) const { + const TDuration durationMs = source->GetAndResetWaitDuration(); + LWTRACK(InitializeSource, source->GetDataSourceOrbit(), source->GetRawPathId(), source->GetTabletId(), + source->GetTxId(), source->GetDeprecatedPortionId(), step.GetStepIndex(), + step.GetTracingName(), durationMs, source->GetRecordsCount(), source->GetReservedMemory()); +} + TConclusion TInitializeSourceStep::DoExecuteInplace( - const std::shared_ptr& source, const TFetchingScriptCursor& /*step*/) const { + const std::shared_ptr& source, const TFetchingScriptCursor& step) const { auto* simpleSource = source->MutableAs(); simpleSource->InitializeProcessing(source); + ReportTracing(source, step); return true; } +void TPortionAccessorFetchedStep::ReportTracing(const std::shared_ptr& source, const TFetchingScriptCursor& step) const { + const TDuration durationMs = source->GetAndResetWaitDuration(); + LWTRACK(PortionAccessorFetched, source->GetDataSourceOrbit(), source->GetRawPathId(), source->GetTabletId(), + source->GetTxId(), source->GetDeprecatedPortionId(), step.GetStepIndex(), + step.GetTracingName(), durationMs, source->GetRecordsCount(), source->GetReservedMemory()); +} + TConclusion TPortionAccessorFetchedStep::DoExecuteInplace( - const std::shared_ptr& source, const TFetchingScriptCursor& /*step*/) const { + const std::shared_ptr& source, const TFetchingScriptCursor& step) const { source->MutableAs()->InitUsedRawBytes(); + ReportTracing(source, step); return true; } +void TStepAggregationSources::ReportTracing(const std::shared_ptr& source, const TFetchingScriptCursor& step) const { + const TDuration durationMs = source->GetAndResetWaitDuration(); + LWTRACK(AggregationSources, source->GetDataSourceOrbit(), source->GetRawPathId(), source->GetTabletId(), + source->GetTxId(), source->GetDeprecatedPortionId(), step.GetStepIndex(), + step.GetTracingName(), durationMs, source->GetRecordsCount(), source->GetReservedMemory()); +} + TConclusion TStepAggregationSources::DoExecuteInplace( - const std::shared_ptr& source, const TFetchingScriptCursor& /*step*/) const { + const std::shared_ptr& source, const TFetchingScriptCursor& step) const { AFL_VERIFY(source->GetType() == IDataSource::EType::SimpleAggregation); auto* aggrSource = static_cast(source.get()); std::vector> collections; @@ -205,16 +299,25 @@ TConclusion TStepAggregationSources::DoExecuteInplace( return conclusion; } source->BuildStageResult(source); + ReportTracing(source, step); return true; } +void TCleanAggregationSources::ReportTracing(const std::shared_ptr& source, const TFetchingScriptCursor& step) const { + const TDuration durationMs = source->GetAndResetWaitDuration(); + LWTRACK(CleanAggregationSources, source->GetDataSourceOrbit(), source->GetRawPathId(), source->GetTabletId(), + source->GetTxId(), source->GetDeprecatedPortionId(), step.GetStepIndex(), + step.GetTracingName(), durationMs, source->GetRecordsCount(), source->GetReservedMemory()); +} + TConclusion TCleanAggregationSources::DoExecuteInplace( - const std::shared_ptr& source, const TFetchingScriptCursor& /*step*/) const { + const std::shared_ptr& source, const TFetchingScriptCursor& step) const { AFL_VERIFY(source->GetType() == IDataSource::EType::SimpleAggregation); auto* aggrSource = static_cast(source.get()); for (auto&& i : aggrSource->GetSources()) { i->MutableAs()->ClearResult(); } + ReportTracing(source, step); return true; } @@ -227,10 +330,29 @@ bool TBuildResultStep::IsPageSkippedByFilter(const std::shared_ptr& source, const TFetchingScriptCursor& step, + const TDuration executionDurationMs) const { + if (!LWPROBE_ENABLED(BuildResult) && !NLWTrace::HasShuttles(source->GetDataSourceOrbit())) { + return; + } + const TDuration durationMs = source->GetAndResetWaitDuration(); + ui32 pageFilteredRowsCount = RecordsCount; + const auto& notAppliedFilter = source->GetStageResult().GetNotAppliedFilter(); + if (notAppliedFilter && !notAppliedFilter->IsTotalAllowFilter()) { + const auto pageFilter = notAppliedFilter->Slice(StartIndex, RecordsCount); + pageFilteredRowsCount = pageFilter.GetFilteredCount().value_or(RecordsCount); + } + LWTRACK(BuildResult, source->GetDataSourceOrbit(), source->GetRawPathId(), source->GetTabletId(), + source->GetTxId(), source->GetDeprecatedPortionId(), step.GetStepIndex(), + step.GetTracingName(), durationMs, executionDurationMs, pageFilteredRowsCount, RecordsCount, source->GetReservedMemory(), + source->GetSourcesAheadQueueWaitDuration(), source->GetSourcesAhead()); +} + std::shared_ptr TBuildResultStep::BuildPageResultBatch(const std::shared_ptr& source) const { if (IsPageSkippedByFilter(source)) { return nullptr; } + auto context = source->GetContext(); NArrow::TGeneralContainer::TTableConstructionContext contextTableConstruct; if (!source->IsSourceInMemory()) { contextTableConstruct.SetStartIndex(StartIndex).SetRecordsCount(RecordsCount); @@ -248,6 +370,7 @@ std::shared_ptr TBuildResultStep::BuildPageResultBatch(const std:: TConclusion TBuildResultStep::DoExecuteInplace( const std::shared_ptr& source, const TFetchingScriptCursor& step) const { + const TMonotonic startExecution = TMonotonic::Now(); auto context = source->GetContext(); auto resultBatch = BuildPageResultBatch(source); auto* sSource = source->MutableAs(); @@ -259,14 +382,28 @@ TConclusion TBuildResultStep::DoExecuteInplace( AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("empty_source", sSource->DebugJson().GetStringRobust()); } source->MutableStageResult().SetResultChunk(std::move(resultBatch), StartIndex, RecordsCount); + ReportTracing(source, step, TMonotonic::Now() - startExecution); + const ui64 blobBytes = source->GetTotalBytesRead(); NActors::TActivationContext::AsActorContext().Send(context->GetCommonContext()->GetScanActorId(), new NColumnShard::TEvPrivate::TEvTaskProcessedResult(std::make_shared(source, step), - source->GetContext()->GetCommonContext()->GetCounters().GetResultsForSourceGuard())); + source->GetContext()->GetCommonContext()->GetCounters().GetResultsForSourceGuard(), source->GetDeprecatedPortionId(), + blobBytes, sSource->GetUsedRawBytes(), recordsCount, source->GetRecordsCount(), + source->GetReservedMemory())); return false; } +void TPrepareResultStep::ReportTracing(const std::shared_ptr& source, const TFetchingScriptCursor& step, + const TDuration executionDurationMs) const { + const TDuration durationMs = source->GetAndResetWaitDuration(); + LWTRACK(PrepareResult, source->GetDataSourceOrbit(), source->GetRawPathId(), source->GetTabletId(), + source->GetTxId(), source->GetDeprecatedPortionId(), step.GetStepIndex(), + step.GetTracingName(), durationMs, executionDurationMs, source->GetFilteredRowsCount(), source->GetReservedMemory(), + source->GetSourcesAheadQueueWaitDuration(), source->GetSourcesAhead()); +} + TConclusion TPrepareResultStep::DoExecuteInplace( - const std::shared_ptr& source, const TFetchingScriptCursor& /*step*/) const { + const std::shared_ptr& source, const TFetchingScriptCursor& step) const { + const TMonotonic startExecution = TMonotonic::Now(); const auto context = source->GetContext(); NCommon::TFetchingScriptBuilder acc(*context); if (source->IsSourceInMemory()) { @@ -289,6 +426,7 @@ TConclusion TPrepareResultStep::DoExecuteInplace( auto plan = std::move(acc).Build(); AFL_VERIFY(!plan->IsFinished(0)); source->MutableAs()->InitFetchingPlan(plan); + ReportTracing(source, step, TMonotonic::Now() - startExecution); if (StartResultBuildingInplace) { TFetchingScriptCursor cursor(plan, 0); return cursor.Execute(source); @@ -297,6 +435,13 @@ TConclusion TPrepareResultStep::DoExecuteInplace( } } +void TDuplicateFilter::TFilterSubscriber::ReportTracing(const std::shared_ptr& source) const { + const TDuration durationMs = source->GetAndResetWaitDuration(); + LWTRACK(Deduplication, source->GetDataSourceOrbit(), source->GetRawPathId(), source->GetTabletId(), + source->GetTxId(), source->GetDeprecatedPortionId(), Step.GetStepIndex(), + Step.GetTracingName(), durationMs, source->GetRecordsCount(), source->GetReservedMemory()); +} + void TDuplicateFilter::TFilterSubscriber::OnFilterReady(NArrow::TColumnFilter&& filter) { if (auto source = Source.lock()) { AFL_TRACE(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "fetch_filter")("source", source->GetSourceIdx())( @@ -306,6 +451,9 @@ void TDuplicateFilter::TFilterSubscriber::OnFilterReady(NArrow::TColumnFilter&& } AFL_VERIFY(filter.GetRecordsCountVerified() == source->GetRecordsCount())("filter", filter.GetRecordsCountVerified())( "source", source->GetRecordsCount()); + + ReportTracing(source); + if (const std::shared_ptr appliedFilter = source->GetStageData().GetAppliedFilter()) { filter = filter.ApplyFilterFrom(*appliedFilter); } diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.h index db3a9415c422..4fc3bc186243 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.h +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.h @@ -34,6 +34,7 @@ class TStepAggregationSources: public IFetchingStep { private: using TBase = IFetchingStep; const std::shared_ptr Aggregator; + void ReportTracing(const std::shared_ptr& source, const TFetchingScriptCursor& step) const; public: TStepAggregationSources(const std::shared_ptr& proc) @@ -49,6 +50,7 @@ class TCleanAggregationSources: public IFetchingStep { private: using TBase = IFetchingStep; const std::shared_ptr Aggregator; + void ReportTracing(const std::shared_ptr& source, const TFetchingScriptCursor& step) const; public: TCleanAggregationSources(const std::shared_ptr& proc) @@ -64,6 +66,7 @@ class TDetectInMemStep: public IFetchingStep { private: using TBase = IFetchingStep; const TColumnsSetIds Columns; + void ReportTracing(const std::shared_ptr& source, const TFetchingScriptCursor& step) const; protected: virtual TConclusion DoExecuteInplace( @@ -85,6 +88,7 @@ class TPrepareResultStep: public IFetchingStep { private: using TBase = IFetchingStep; const bool StartResultBuildingInplace; + void ReportTracing(const std::shared_ptr& source, const TFetchingScriptCursor& step, const TDuration executionDurationMs) const; protected: virtual TConclusion DoExecuteInplace( @@ -111,6 +115,7 @@ class TBuildResultStep: public IFetchingStep { const ui32 RecordsCount; bool IsPageSkippedByFilter(const std::shared_ptr& source) const; std::shared_ptr BuildPageResultBatch(const std::shared_ptr& source) const; + void ReportTracing(const std::shared_ptr& source, const TFetchingScriptCursor& step, const TDuration executionDurationMs) const; protected: virtual TConclusion DoExecuteInplace( @@ -134,6 +139,7 @@ class TColumnBlobsFetchingStep: public IFetchingStep { private: using TBase = IFetchingStep; TColumnsSetIds Columns; + void ReportTracing(const std::shared_ptr& source, const TFetchingScriptCursor& step) const; protected: virtual TConclusion DoExecuteInplace( @@ -151,30 +157,14 @@ class TColumnBlobsFetchingStep: public IFetchingStep { } }; -class TStartPortionAccessorFetchingStep: public IFetchingStep { -private: - using TBase = IFetchingStep; - -protected: - virtual TConclusion DoExecuteInplace( - const std::shared_ptr& source, const TFetchingScriptCursor& step) const override; - virtual TString DoDebugString() const override { - return TStringBuilder(); - } - -public: - TStartPortionAccessorFetchingStep() - : TBase("START_FETCHING_ACCESSOR") { - } -}; - class TPortionAccessorFetchedStep: public IFetchingStep { private: using TBase = IFetchingStep; + void ReportTracing(const std::shared_ptr& source, const TFetchingScriptCursor& step) const; protected: virtual TConclusion DoExecuteInplace( - const std::shared_ptr& source, const TFetchingScriptCursor& /*step*/) const override; + const std::shared_ptr& source, const TFetchingScriptCursor& step) const override; virtual TString DoDebugString() const override { return TStringBuilder(); } @@ -190,6 +180,7 @@ class TFilterCutLimit: public IFetchingStep { using TBase = IFetchingStep; const ui32 Limit; const bool Reverse; + void ReportTracing(const std::shared_ptr& source, const TFetchingScriptCursor& step) const; public: virtual TConclusion DoExecuteInplace( @@ -205,6 +196,7 @@ class TFilterCutLimit: public IFetchingStep { class TPredicateFilter: public IFetchingStep { private: using TBase = IFetchingStep; + void ReportTracing(const std::shared_ptr& source, const TFetchingScriptCursor& step, const ui32 filteredRows) const; public: virtual TConclusion DoExecuteInplace( @@ -217,6 +209,7 @@ class TPredicateFilter: public IFetchingStep { class TConflictDetector: public IFetchingStep { private: using TBase = IFetchingStep; + void ReportTracing(const std::shared_ptr& source, const TFetchingScriptCursor& step) const; public: virtual TConclusion DoExecuteInplace( @@ -229,6 +222,7 @@ class TConflictDetector: public IFetchingStep { class TSnapshotFilter: public IFetchingStep { private: using TBase = IFetchingStep; + void ReportTracing(const std::shared_ptr& source, const TFetchingScriptCursor& step) const; public: virtual TConclusion DoExecuteInplace( @@ -241,10 +235,11 @@ class TSnapshotFilter: public IFetchingStep { class TInitializeSourceStep: public IFetchingStep { private: using TBase = IFetchingStep; + void ReportTracing(const std::shared_ptr& source, const TFetchingScriptCursor& step) const; public: virtual TConclusion DoExecuteInplace( - const std::shared_ptr& source, const TFetchingScriptCursor& /*step*/) const override; + const std::shared_ptr& source, const TFetchingScriptCursor& step) const override; TInitializeSourceStep() : TBase("INITIALIZE_SOURCE") { } @@ -254,6 +249,7 @@ class TDetectInMemFlag: public IFetchingStep { private: using TBase = IFetchingStep; TColumnsSetIds Columns; + void ReportTracing(const std::shared_ptr& source, const TFetchingScriptCursor& step, const ui64 columnRawBytes, const ui64 columnBlobBytes) const; public: virtual TConclusion DoExecuteInplace( @@ -267,6 +263,7 @@ class TDetectInMemFlag: public IFetchingStep { class TDeletionFilter: public IFetchingStep { private: using TBase = IFetchingStep; + void ReportTracing(const std::shared_ptr& source, const TFetchingScriptCursor& step) const; public: virtual TConclusion DoExecuteInplace( @@ -279,6 +276,7 @@ class TDeletionFilter: public IFetchingStep { class TShardingFilter: public IFetchingStep { private: using TBase = IFetchingStep; + void ReportTracing(const std::shared_ptr& source, const TFetchingScriptCursor& step) const; public: virtual TConclusion DoExecuteInplace( @@ -298,6 +296,7 @@ class TDuplicateFilter: public IFetchingStep { TFetchingScriptCursor Step; NColumnShard::TCounterGuard TaskGuard; + void ReportTracing(const std::shared_ptr& source) const; virtual void OnFilterReady(NArrow::TColumnFilter&& filter) override; virtual void OnFailure(const TString& reason) override; @@ -317,6 +316,7 @@ class TDuplicateFilter: public IFetchingStep { class TUpdateAggregatedMemoryStep: public IFetchingStep { private: using TBase = IFetchingStep; + void ReportTracing(const std::shared_ptr& source, const TFetchingScriptCursor& step) const; public: virtual TConclusion DoExecuteInplace( diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp index d8aa655c3179..c63549ab4e93 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp @@ -10,11 +10,14 @@ #include #include +#include #include namespace NKikimr::NOlap::NReader::NSimple { +LWTRACE_USING(YDB_CS_DATA_SOURCE); + TConclusionStatus TScanHead::Start() { return TConclusionStatus::Success(); } @@ -57,6 +60,7 @@ TConclusion TScanHead::BuildNextInterval() { if (!source) { return changed; } + source->OnStartProcessing(); SyncPoints.front()->AddSource(std::move(source)); changed = true; } diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp index 183395873079..317c1f3d7e8d 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include @@ -23,6 +24,8 @@ namespace NKikimr::NOlap::NReader::NSimple { +LWTRACE_USING(YDB_CS_DATA_SOURCE); + void IDataSource::InitFetchingPlan(const std::shared_ptr& fetching) { AFL_VERIFY(fetching); // AFL_VERIFY(!FetchingPlan); @@ -79,12 +82,19 @@ void IDataSource::DoOnSourceFetchingFinishedSafe(IDataReader& owner, const std:: void IDataSource::DoOnEmptyStageData(const std::shared_ptr& /*sourcePtr*/) { TMemoryProfileGuard mpg("SCAN_PROFILE::STAGE_RESULT_EMPTY", IS_DEBUG_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD_SCAN_MEMORY)); - ResourceGuards.clear(); + ClearMemoryGuards(); StageResult = TFetchedResult::BuildEmpty(); StageResult->SetPages({}); ClearStageData(); } +void IDataSource::ClearMemoryGuards() { + const ui64 freedBytes = GetResourceGuardsMemory(); + LWTRACK(MemoryFree, DataSourceOrbit, GetRawPathId(), GetTabletId(), GetTxId(), (ui64)GetSourceIdx(), freedBytes); + ResourceGuards.clear(); + SourceGroupGuard.reset(); +} + void IDataSource::DoBuildStageResult(const std::shared_ptr& /*sourcePtr*/) { Finalize(NYDBTest::TControllers::GetColumnShardController()->GetMemoryLimitScanPortion()); } diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.h index 8a2da34f1b73..a15f8b2e6ed7 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.h +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.h @@ -105,10 +105,7 @@ class IDataSource: public NCommon::IDataSource { } } - void ClearMemoryGuards() { - ResourceGuards.clear(); - SourceGroupGuard.reset(); - } + void ClearMemoryGuards(); ui32 GetPurposeSyncPointIndex() const { AFL_VERIFY(PurposeSyncPointIndex); @@ -141,6 +138,10 @@ class IDataSource: public NCommon::IDataSource { return *UsedRawBytes; } + virtual ui64 GetUsedRawBytesOptional() const override { + return UsedRawBytes.value_or(0); + } + void SetUsedRawBytes(const ui64 value) { AFL_VERIFY(!UsedRawBytes); UsedRawBytes = value; @@ -171,8 +172,7 @@ class IDataSource: public NCommon::IDataSource { ClearStageData(); MutableExecutionContext().Stop(); StageResult.reset(); - ResourceGuards.clear(); - SourceGroupGuard = nullptr; + ClearMemoryGuards(); } void SetIsStartedByCursor() { @@ -331,6 +331,10 @@ class TPortionDataSource: public IDataSource { return Schema; } + virtual const std::shared_ptr& GetSourceSchemaOptional() const override { + return Schema; + } + const TReplaceKeyAdapter& GetStart() const { return Start; } diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/abstract.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/abstract.cpp index d01f28ba5aba..d6c199b267d9 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/abstract.cpp +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/abstract.cpp @@ -2,6 +2,7 @@ #include +#include #include namespace NKikimr::NOlap::NReader::NSimple { @@ -19,6 +20,7 @@ void ISyncPoint::OnSourcePrepared(std::shared_ptr&& source AFL_DEBUG(NKikimrServices::COLUMNSHARD_SCAN_EVLOG)("event_log", sourceInput->GetEventsReport())("count", SourcesSequentially.size())( "source_idx", sourceInput->GetSourceIdx()); AFL_VERIFY(sourceInput->IsSyncSection())("source_idx", sourceInput->GetSourceIdx()); + InitSourceTracingMetrics(sourceInput); AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "OnSourcePrepared")("source_idx", sourceInput->GetSourceIdx())( "prepared", IsSourcePrepared(sourceInput)); AFL_VERIFY(SourcesSequentially.size()); @@ -109,6 +111,21 @@ void ISyncPoint::AddSource(std::shared_ptr&& source) { } } +void ISyncPoint::InitSourceTracingMetrics(const std::shared_ptr& source) const { + if (!NLWTrace::HasShuttles(source->GetDataSourceOrbit())) { + return; + } + source->SetSourcesAheadQueueEnterTime(TMonotonic::Now()); + ui32 sourcesAhead = 0; + for (const auto& s : SourcesSequentially) { + if (s->GetSourceIdx() == source->GetSourceIdx()) { + break; + } + ++sourcesAhead; + } + source->SetSourcesAhead(sourcesAhead); +} + void ISyncPoint::OnSourceFinished() { if (Next) { Next->OnSourceFinished(); diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/abstract.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/abstract.h index 39811ebe989b..a2a09aa5fbce 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/abstract.h +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/abstract.h @@ -27,6 +27,8 @@ class ISyncPoint { virtual void DoAbort() = 0; bool AbortFlag = false; + void InitSourceTracingMetrics(const std::shared_ptr& source) const; + protected: const std::shared_ptr Context; const std::shared_ptr Collection; diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/aggr.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/aggr.h index ebc85f4ff73f..75aa6989cadc 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/aggr.h +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/aggr.h @@ -2,9 +2,12 @@ #include "abstract.h" #include +#include namespace NKikimr::NOlap::NReader::NSimple { +LWTRACE_USING(YDB_CS_DATA_SOURCE); + class TScanWithLimitCollection; class TSyncPointResultsAggregationControl: public ISyncPoint { @@ -127,6 +130,9 @@ class TSyncPointResultsAggregationControl: public ISyncPoint { } virtual ESourceAction OnSourceReady(const std::shared_ptr& source, TPlainReadData& reader) override { + LWTRACK(SyncAggrSyncPoint, source->GetDataSourceOrbit(), source->GetRawPathId(), source->GetTabletId(), + source->GetTxId(), source->GetDeprecatedPortionId(), GetPointName(), source->GetFilteredRowsCount(), source->GetReservedMemory(), + source->GetSourcesAheadQueueWaitDuration(), source->GetSourcesAhead(), DebugString()); --InFlightControl; if (InFlightControl.Val() == 0) { for (auto&& i : SourcesToAggregate) { @@ -177,7 +183,7 @@ class TSyncPointResultsAggregationControl: public ISyncPoint { "table", resultChunk->GetTable()->num_rows())("original_count", source->GetRecordsCount())("activity", AggregationActivity); reader.OnIntervalResult( std::make_unique(source->ExtractResourceGuards(), source->MutableAs()->ExtractGroupGuard(), - resultChunk->ExtractTable(), std::move(cursor), Context->GetCommonContext(), std::nullopt)); + resultChunk->ExtractTable(), std::move(cursor), Context->GetCommonContext(), std::nullopt, source->GetDeprecatedPortionId())); source->MutableAs()->ClearResult(); return ESourceAction::Finish; } diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/limit.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/limit.cpp index 59cdd95bf984..bdda3a4cd778 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/limit.cpp +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/limit.cpp @@ -1,9 +1,12 @@ #include "limit.h" #include +#include namespace NKikimr::NOlap::NReader::NSimple { +LWTRACE_USING(YDB_CS_DATA_SOURCE); + TSyncPointLimitControl::TSyncPointLimitControl(const ui32 limit, const ui32 pointIndex, const std::shared_ptr& context, const std::shared_ptr& collection) : TBase(pointIndex, "SYNC_LIMIT", context, collection) @@ -45,6 +48,9 @@ std::shared_ptr TSyncPointLimitControl::OnAddSource(const ISyncPoint::ESourceAction TSyncPointLimitControl::OnSourceReady( const std::shared_ptr& source, TPlainReadData& /*reader*/) { + LWTRACK(LimitSyncPoint, source->GetDataSourceOrbit(), source->GetRawPathId(), source->GetTabletId(), + source->GetTxId(), source->GetDeprecatedPortionId(), GetPointName(), source->GetFilteredRowsCount(), source->GetReservedMemory(), + source->GetSourcesAheadQueueWaitDuration(), source->GetSourcesAhead(), DebugString()); if (FetchedCount >= Limit) { return ESourceAction::Finish; } diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/result.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/result.cpp index 97dc20aea59b..f3e9925241da 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/result.cpp +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points/result.cpp @@ -1,9 +1,12 @@ #include "result.h" #include +#include namespace NKikimr::NOlap::NReader::NSimple { +LWTRACE_USING(YDB_CS_DATA_SOURCE); + bool TSyncPointResult::IsSourcePrepared(const std::shared_ptr& source) const { if (!Next) { return source->IsSyncSection() && source->HasStageResult() && @@ -17,6 +20,12 @@ bool TSyncPointResult::IsSourcePrepared(const std::shared_ptr& source, TPlainReadData& reader) { + const ui32 resultChunkRowsCount = (source->HasStageResult() && !source->GetStageResult().IsEmpty()) + ? source->GetStageResult().GetResultChunkRowsCount() + : 0; + LWTRACK(ResultSyncPoint, source->GetDataSourceOrbit(), source->GetRawPathId(), source->GetTabletId(), + source->GetTxId(), source->GetDeprecatedPortionId(), GetPointName(), source->GetFilteredRowsCount(), resultChunkRowsCount, + source->GetReservedMemory(), source->GetSourcesAheadQueueWaitDuration(), source->GetSourcesAhead(), DebugString()); if (Next) { if (source->HasStageResult() && source->GetStageResult().IsEmpty()) { return ESourceAction::Finish; @@ -42,7 +51,7 @@ ISyncPoint::ESourceAction TSyncPointResult::OnSourceReady(const std::shared_ptr< Context->GetCommonContext()->GetReadMetadata()->GetTabletId()); reader.OnIntervalResult( std::make_unique(source->GetResourceGuards(), source->MutableAs()->GetGroupGuard(), - resultChunk->ExtractTable(), std::move(cursor), Context->GetCommonContext(), partialSourceAddress)); + resultChunk->ExtractTable(), std::move(cursor), Context->GetCommonContext(), partialSourceAddress, source->GetDeprecatedPortionId())); } else if (!isFinished) { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "continue_source")("source_idx", source->GetSourceIdx())( "source_idx", source->GetSourceIdx()); diff --git a/ydb/core/tx/columnshard/engines/reader/tracing/data_source_probes.h b/ydb/core/tx/columnshard/engines/reader/tracing/data_source_probes.h index d8b7ed6f03bb..352e58f8aa6e 100644 --- a/ydb/core/tx/columnshard/engines/reader/tracing/data_source_probes.h +++ b/ydb/core/tx/columnshard/engines/reader/tracing/data_source_probes.h @@ -5,50 +5,154 @@ namespace NKikimr::NOlap::NReader { #define YDB_CS_DATA_SOURCE(PROBE, EVENT, GROUPS, TYPES, NAMES) \ - PROBE(DataSourceStepStart, \ + PROBE(ProgramConst, \ GROUPS("DataSource"), \ - TYPES(ui64, ui64, ui64, ui64, ui32, TString, TDuration, ui32, ui32), \ - NAMES("pathId", "tabletId", "txId", "sourceId", "stepIndex", "name", "durationMs", "columnsCount", "recordsCount")) \ - PROBE(DataSourceStepFinish, \ + TYPES(ui64, ui64, ui64, ui64, ui32, TString, ui32, TDuration, TDuration, ui32, ui64, TString, TString), \ + NAMES("pathId", "tabletId", "txId", "sourceId", "stepIndex", "name", "programNodeId", "durationMs", "executionDurationMs", "rowsCount", "totalReservedBytes", "executionResult", "details")) \ + PROBE(ProgramCalculation, \ GROUPS("DataSource"), \ - TYPES(ui64, ui64, ui64, ui64, ui32, TString, TDuration, TDuration, ui32, ui32, ui64, ui64), \ - NAMES("pathId", "tabletId", "txId", "sourceId", "stepIndex", "name", "durationMs", "executionDurationMs", "columnsCount", "recordsCount", "bytesRead", "indexUsage")) \ - PROBE(ProgramChainStart, \ + TYPES(ui64, ui64, ui64, ui64, ui32, TString, ui32, TDuration, TDuration, ui32, ui64, TString, TString), \ + NAMES("pathId", "tabletId", "txId", "sourceId", "stepIndex", "name", "programNodeId", "durationMs", "executionDurationMs", "rowsCount", "totalReservedBytes", "executionResult", "details")) \ + PROBE(ProgramProjection, \ GROUPS("DataSource"), \ - TYPES(ui64, ui64, ui64, ui64, ui32, TString, ui32, TDuration, ui32), \ - NAMES("pathId", "tabletId", "txId", "sourceId", "stepIndex", "name", "programNodeId", "durationMs", "recordsCount")) \ - PROBE(ProgramChainFinish, \ + TYPES(ui64, ui64, ui64, ui64, ui32, TString, ui32, TDuration, TDuration, ui32, ui64, TString, TString), \ + NAMES("pathId", "tabletId", "txId", "sourceId", "stepIndex", "name", "programNodeId", "durationMs", "executionDurationMs", "rowsCount", "totalReservedBytes", "executionResult", "details")) \ + PROBE(ProgramFilter, \ GROUPS("DataSource"), \ - TYPES(ui64, ui64, ui64, ui64, ui32, TString, ui32, TDuration, TDuration, ui32, TString), \ - NAMES("pathId", "tabletId", "txId", "sourceId", "stepIndex", "name", "programNodeId", "durationMs", "executionDurationMs", "recordsCount", "executionResult")) \ - PROBE(AssemblerStepStart, \ + TYPES(ui64, ui64, ui64, ui64, ui32, TString, ui32, TDuration, TDuration, ui32, ui64, TString, TString), \ + NAMES("pathId", "tabletId", "txId", "sourceId", "stepIndex", "name", "programNodeId", "durationMs", "executionDurationMs", "rowsCount", "totalReservedBytes", "executionResult", "details")) \ + PROBE(ProgramAggregation, \ GROUPS("DataSource"), \ - TYPES(ui64, ui64, ui64, ui64, ui32, TString, TDuration, ui32, ui32), \ - NAMES("pathId", "tabletId", "txId", "sourceId", "stepIndex", "name", "durationMs", "columnsCount", "recordsCount")) \ - PROBE(AssemblerStepFinish, \ + TYPES(ui64, ui64, ui64, ui64, ui32, TString, ui32, TDuration, TDuration, ui32, ui64, TString, TString), \ + NAMES("pathId", "tabletId", "txId", "sourceId", "stepIndex", "name", "programNodeId", "durationMs", "executionDurationMs", "rowsCount", "totalReservedBytes", "executionResult", "details")) \ + PROBE(ProgramFetchOriginalData, \ GROUPS("DataSource"), \ - TYPES(ui64, ui64, ui64, ui64, ui32, TString, TDuration, TDuration, ui32, ui64, ui32), \ - NAMES("pathId", "tabletId", "txId", "sourceId", "stepIndex", "name", "durationMs", "executionDurationMs", "columnsCount", "bytesAssembled", "recordsCount")) \ - PROBE(ColumnBlobsFetchingStart, \ + TYPES(ui64, ui64, ui64, ui64, ui32, TString, ui32, TDuration, TDuration, ui32, ui64, ui64, ui64, TString, TString), \ + NAMES("pathId", "tabletId", "txId", "sourceId", "stepIndex", "name", "programNodeId", "durationMs", "executionDurationMs", "rowsCount", "blobBytes", "rawBytes", "totalReservedBytes", "executionResult", "details")) \ + PROBE(ProgramAssembleOriginalData, \ GROUPS("DataSource"), \ - TYPES(ui64, ui64, ui64, ui64, ui32, TString, TDuration, ui32, ui32), \ - NAMES("pathId", "tabletId", "txId", "sourceId", "stepIndex", "name", "durationMs", "columnsCount", "recordsCount")) \ - PROBE(ColumnBlobsFetchingFinish, \ + TYPES(ui64, ui64, ui64, ui64, ui32, TString, ui32, TDuration, TDuration, ui32, ui64, TString, TString), \ + NAMES("pathId", "tabletId", "txId", "sourceId", "stepIndex", "name", "programNodeId", "durationMs", "executionDurationMs", "rowsCount", "totalReservedBytes", "executionResult", "details")) \ + PROBE(ProgramCheckIndexData, \ GROUPS("DataSource"), \ - TYPES(ui64, ui64, ui64, ui64, ui32, TString, TDuration, TDuration, ui32, ui64, ui32), \ - NAMES("pathId", "tabletId", "txId", "sourceId", "stepIndex", "name", "durationMs", "executionDurationMs", "columnsCount", "bytesRead", "recordsCount")) \ - PROBE(MemoryAllocationStart, \ + TYPES(ui64, ui64, ui64, ui64, ui32, TString, ui32, TDuration, TDuration, ui32, ui32, TString, ui64, TString, TString), \ + NAMES("pathId", "tabletId", "txId", "sourceId", "stepIndex", "name", "programNodeId", "durationMs", "executionDurationMs", "rowsCount", "filteredRows", "indexStatus", "totalReservedBytes", "executionResult", "details")) \ + PROBE(ProgramCheckHeaderData, \ GROUPS("DataSource"), \ - TYPES(ui64, ui64, ui64, ui64, ui32, TString, TDuration, ui64), \ - NAMES("pathId", "tabletId", "txId", "sourceId", "stepIndex", "name", "durationMs", "requestedBytes")) \ - PROBE(MemoryAllocationFinish, \ + TYPES(ui64, ui64, ui64, ui64, ui32, TString, ui32, TDuration, TDuration, ui32, ui64, TString, TString), \ + NAMES("pathId", "tabletId", "txId", "sourceId", "stepIndex", "name", "programNodeId", "durationMs", "executionDurationMs", "rowsCount", "totalReservedBytes", "executionResult", "details")) \ + PROBE(ProgramStreamLogic, \ GROUPS("DataSource"), \ - TYPES(ui64, ui64, ui64, ui64, ui32, TString, TDuration, TDuration, ui64, bool), \ - NAMES("pathId", "tabletId", "txId", "sourceId", "stepIndex", "name", "durationMs", "executionDurationMs", "allocatedBytes", "success")) \ + TYPES(ui64, ui64, ui64, ui64, ui32, TString, ui32, TDuration, TDuration, ui32, ui64, TString, TString), \ + NAMES("pathId", "tabletId", "txId", "sourceId", "stepIndex", "name", "programNodeId", "durationMs", "executionDurationMs", "rowsCount", "totalReservedBytes", "executionResult", "details")) \ + PROBE(ProgramReserveMemory, \ + GROUPS("DataSource"), \ + TYPES(ui64, ui64, ui64, ui64, ui32, TString, ui32, TDuration, TDuration, ui32, ui64, ui64, TString, TString), \ + NAMES("pathId", "tabletId", "txId", "sourceId", "stepIndex", "name", "programNodeId", "durationMs", "executionDurationMs", "rowsCount", "reservedBytes", "totalReservedBytes", "executionResult", "details")) \ + PROBE(AssemblerStep, \ + GROUPS("DataSource"), \ + TYPES(ui64, ui64, ui64, ui64, ui32, TString, TDuration, TDuration, ui32, ui64, ui32, ui64), \ + NAMES("pathId", "tabletId", "txId", "sourceId", "stepIndex", "name", "durationMs", "executionDurationMs", "columnsCount", "bytesAssembled", "rowsCount", "totalReservedBytes")) \ + PROBE(ColumnBlobsFetching, \ + GROUPS("DataSource"), \ + TYPES(ui64, ui64, ui64, ui64, ui32, TString, TDuration, TDuration, ui32, ui64, ui64, ui32, ui64), \ + NAMES("pathId", "tabletId", "txId", "sourceId", "stepIndex", "name", "durationMs", "executionDurationMs", "columnsCount", "blobBytes", "rawBytes", "rowsCount", "totalReservedBytes")) \ + PROBE(MemoryAllocation, \ + GROUPS("DataSource"), \ + TYPES(ui64, ui64, ui64, ui64, ui32, TString, TDuration, TDuration, ui64, bool, ui64), \ + NAMES("pathId", "tabletId", "txId", "sourceId", "stepIndex", "name", "durationMs", "executionDurationMs", "reservedBytes", "success", "totalReservedBytes")) \ + PROBE(StartSourceProcessing, \ + GROUPS("Orbit", "DataSource"), \ + TYPES(ui64, ui64, ui64, ui64, ui64, ui64, ui64, TString, TString, TString, TString), \ + NAMES("pathId", "tabletId", "txId", "sourceId", "blobBytes", "rawBytes", "totalReservedBytes", "minPk", "maxPk", "minSnapshot", "maxSnapshot")) \ + PROBE(Deduplication, \ + GROUPS("DataSource"), \ + TYPES(ui64, ui64, ui64, ui64, ui32, TString, TDuration, ui32, ui64), \ + NAMES("pathId", "tabletId", "txId", "sourceId", "stepIndex", "name", "durationMs", "rowsCount", "totalReservedBytes")) \ + PROBE(DetectInMemFlag, \ + GROUPS("DataSource"), \ + TYPES(ui64, ui64, ui64, ui64, ui32, TString, TDuration, ui64, ui64, bool, ui32, ui64), \ + NAMES("pathId", "tabletId", "txId", "sourceId", "stepIndex", "name", "durationMs", "totalColumnBlobBytes", "totalColumnRawBytes", "sourceInMemory", "rowsCount", "totalReservedBytes")) \ + PROBE(InitializeSource, \ + GROUPS("DataSource"), \ + TYPES(ui64, ui64, ui64, ui64, ui32, TString, TDuration, ui32, ui64), \ + NAMES("pathId", "tabletId", "txId", "sourceId", "stepIndex", "name", "durationMs", "rowsCount", "totalReservedBytes")) \ + PROBE(PredicateFilter, \ + GROUPS("DataSource"), \ + TYPES(ui64, ui64, ui64, ui64, ui32, TString, TDuration, ui32, ui32, ui64), \ + NAMES("pathId", "tabletId", "txId", "sourceId", "stepIndex", "name", "durationMs", "rowsCount", "filteredRows", "totalReservedBytes")) \ + PROBE(SnapshotFilter, \ + GROUPS("DataSource"), \ + TYPES(ui64, ui64, ui64, ui64, ui32, TString, TDuration, ui32, ui64), \ + NAMES("pathId", "tabletId", "txId", "sourceId", "stepIndex", "name", "durationMs", "rowsCount", "totalReservedBytes")) \ + PROBE(DeletionFilter, \ + GROUPS("DataSource"), \ + TYPES(ui64, ui64, ui64, ui64, ui32, TString, TDuration, ui32, ui64), \ + NAMES("pathId", "tabletId", "txId", "sourceId", "stepIndex", "name", "durationMs", "rowsCount", "totalReservedBytes")) \ + PROBE(ShardingFilter, \ + GROUPS("DataSource"), \ + TYPES(ui64, ui64, ui64, ui64, ui32, TString, TDuration, ui32, ui64), \ + NAMES("pathId", "tabletId", "txId", "sourceId", "stepIndex", "name", "durationMs", "rowsCount", "totalReservedBytes")) \ + PROBE(FilterCutLimit, \ + GROUPS("DataSource"), \ + TYPES(ui64, ui64, ui64, ui64, ui32, TString, TDuration, ui32, ui64), \ + NAMES("pathId", "tabletId", "txId", "sourceId", "stepIndex", "name", "durationMs", "rowsCount", "totalReservedBytes")) \ + PROBE(ConflictDetector, \ + GROUPS("DataSource"), \ + TYPES(ui64, ui64, ui64, ui64, ui32, TString, TDuration, ui32, ui64), \ + NAMES("pathId", "tabletId", "txId", "sourceId", "stepIndex", "name", "durationMs", "rowsCount", "totalReservedBytes")) \ + PROBE(BuildResult, \ + GROUPS("DataSource"), \ + TYPES(ui64, ui64, ui64, ui64, ui32, TString, TDuration, TDuration, ui32, ui32, ui64, TDuration, ui32), \ + NAMES("pathId", "tabletId", "txId", "sourceId", "stepIndex", "name", "durationMs", "executionDurationMs", "rowsCount", "pageRecordsCount", "totalReservedBytes", "sourcesAheadQueueWaitMs", "sourcesAhead")) \ + PROBE(PrepareResult, \ + GROUPS("DataSource"), \ + TYPES(ui64, ui64, ui64, ui64, ui32, TString, TDuration, TDuration, ui32, ui64, TDuration, ui32), \ + NAMES("pathId", "tabletId", "txId", "sourceId", "stepIndex", "name", "durationMs", "executionDurationMs", "rowsCount", "totalReservedBytes", "sourcesAheadQueueWaitMs", "sourcesAhead")) \ + PROBE(StartPortionAccessorFetching, \ + GROUPS("DataSource"), \ + TYPES(ui64, ui64, ui64, ui64, ui32, TString, TDuration, ui32, ui64), \ + NAMES("pathId", "tabletId", "txId", "sourceId", "stepIndex", "name", "durationMs", "rowsCount", "totalReservedBytes")) \ + PROBE(PortionAccessorFetched, \ + GROUPS("DataSource"), \ + TYPES(ui64, ui64, ui64, ui64, ui32, TString, TDuration, ui32, ui64), \ + NAMES("pathId", "tabletId", "txId", "sourceId", "stepIndex", "name", "durationMs", "rowsCount", "totalReservedBytes")) \ + PROBE(UpdateAggregatedMemory, \ + GROUPS("DataSource"), \ + TYPES(ui64, ui64, ui64, ui64, ui32, TString, TDuration, ui32, ui64), \ + NAMES("pathId", "tabletId", "txId", "sourceId", "stepIndex", "name", "durationMs", "rowsCount", "totalReservedBytes")) \ + PROBE(AggregationSources, \ + GROUPS("DataSource"), \ + TYPES(ui64, ui64, ui64, ui64, ui32, TString, TDuration, ui32, ui64), \ + NAMES("pathId", "tabletId", "txId", "sourceId", "stepIndex", "name", "durationMs", "rowsCount", "totalReservedBytes")) \ + PROBE(CleanAggregationSources, \ + GROUPS("DataSource"), \ + TYPES(ui64, ui64, ui64, ui64, ui32, TString, TDuration, ui32, ui64), \ + NAMES("pathId", "tabletId", "txId", "sourceId", "stepIndex", "name", "durationMs", "rowsCount", "totalReservedBytes")) \ + PROBE(DetectInMem, \ + GROUPS("DataSource"), \ + TYPES(ui64, ui64, ui64, ui64, ui32, TString, TDuration, ui32, ui64), \ + NAMES("pathId", "tabletId", "txId", "sourceId", "stepIndex", "name", "durationMs", "rowsCount", "totalReservedBytes")) \ PROBE(SourceFinished, \ GROUPS("DataSource"), \ - TYPES(ui64, ui64, ui64, ui64, ui32, TString, TDuration, TDuration, ui64, TDuration), \ - NAMES("pathId", "tabletId", "txId", "sourceId", "stepIndex", "name", "durationMs", "totalDurationMs", "totalBytesRead", "totalExecutionTimeMs")) \ + TYPES(ui64, ui64, ui64, ui64, ui32, TString, TDuration, TDuration, ui64, TDuration, ui64), \ + NAMES("pathId", "tabletId", "txId", "sourceId", "stepIndex", "name", "durationMs", "totalDurationMs", "totalBytesRead", "totalExecutionTimeMs", "totalReservedBytes")) \ + PROBE(ResultSyncPoint, \ + GROUPS("DataSource"), \ + TYPES(ui64, ui64, ui64, ui64, TString, ui32, ui32, ui64, TDuration, ui32, TString), \ + NAMES("pathId", "tabletId", "txId", "sourceId", "name", "rowsCount", "resultChunkRowsCount", "totalReservedBytes", "sourcesAheadQueueWaitMs", "sourcesAhead", "details")) \ + PROBE(LimitSyncPoint, \ + GROUPS("DataSource"), \ + TYPES(ui64, ui64, ui64, ui64, TString, ui32, ui64, TDuration, ui32, TString), \ + NAMES("pathId", "tabletId", "txId", "sourceId", "name", "rowsCount", "totalReservedBytes", "sourcesAheadQueueWaitMs", "sourcesAhead", "details")) \ + PROBE(SyncAggrSyncPoint, \ + GROUPS("DataSource"), \ + TYPES(ui64, ui64, ui64, ui64, TString, ui32, ui64, TDuration, ui32, TString), \ + NAMES("pathId", "tabletId", "txId", "sourceId", "name", "rowsCount", "totalReservedBytes", "sourcesAheadQueueWaitMs", "sourcesAhead", "details")) \ + PROBE(MemoryFree, \ + GROUPS("DataSource"), \ + TYPES(ui64, ui64, ui64, ui64, ui64), \ + NAMES("pathId", "tabletId", "txId", "sourceId", "freedBytes")) \ LWTRACE_DECLARE_PROVIDER(YDB_CS_DATA_SOURCE) diff --git a/ydb/core/tx/columnshard/engines/reader/tracing/probes.cpp b/ydb/core/tx/columnshard/engines/reader/tracing/probes.cpp index 7c7dbf053229..ae1797e8558a 100644 --- a/ydb/core/tx/columnshard/engines/reader/tracing/probes.cpp +++ b/ydb/core/tx/columnshard/engines/reader/tracing/probes.cpp @@ -2,7 +2,7 @@ namespace NKikimr::NOlap::NReader { -LWTRACE_DEFINE_PROVIDER(YDB_CS_READER); +LWTRACE_DEFINE_PROVIDER(YDB_CS_SCAN); } diff --git a/ydb/core/tx/columnshard/engines/reader/tracing/probes.h b/ydb/core/tx/columnshard/engines/reader/tracing/probes.h index e196f0247dfb..8ca77062e762 100644 --- a/ydb/core/tx/columnshard/engines/reader/tracing/probes.h +++ b/ydb/core/tx/columnshard/engines/reader/tracing/probes.h @@ -4,20 +4,36 @@ namespace NKikimr::NOlap::NReader { -#define YDB_CS_READER(PROBE, EVENT, GROUPS, TYPES, NAMES) \ +#define YDB_CS_SCAN(PROBE, EVENT, GROUPS, TYPES, NAMES) \ + PROBE(StartScan, \ + GROUPS("Orbit", "Scan"), \ + TYPES(ui64, ui64, ui64, ui64), \ + NAMES("pathId", "tabletId", "txId", "scanId")) \ + PROBE(ScanFinished, \ + GROUPS("Scan"), \ + TYPES(ui64, ui64, ui64, ui64, TDuration, ui64, ui64, ui64, ui64), \ + NAMES("pathId", "tabletId", "txId", "scanId", "totalDurationMs", "totalRowsCount", "totalPartialSourcesCount", "totalBlobBytes", "totalRawBytes")) \ PROBE(SendResult, \ - GROUPS("Reader"), \ - TYPES(ui64, ui64, ui64, ui64, ui64, TDuration, TDuration, TDuration, bool), \ - NAMES("tabletId", "txId", "scanId", "rows", "bytes", "cpuTimeMs", "waitTimeMs", "elapsedMs", "finished")) \ + GROUPS("Scan"), \ + TYPES(ui64, ui64, ui64, ui64, ui64, ui64, ui64, TDuration, TDuration, TDuration, bool), \ + NAMES("pathId", "tabletId", "txId", "scanId", "sourceId", "rows", "bytes", "cpuTimeMs", "waitTimeMs", "elapsedMs", "finished")) \ PROBE(AckReceived, \ - GROUPS("Reader"), \ - TYPES(ui64, ui64, ui64, TDuration), \ - NAMES("tabletId", "txId", "scanId", "elapsedMs")) \ - PROBE(TaskProcessed, \ - GROUPS("Reader"), \ - TYPES(ui64, ui64, ui64, TDuration), \ - NAMES("tabletId", "txId", "scanId", "elapsedMs")) \ + GROUPS("Scan"), \ + TYPES(ui64, ui64, ui64, ui64, TDuration), \ + NAMES("pathId", "tabletId", "txId", "scanId", "elapsedMs")) \ + PROBE(ScanStartSource, \ + GROUPS("Scan"), \ + TYPES(ui64, ui64, ui64, ui64, ui64, ui64, ui64, TString, TString, TString, TString), \ + NAMES("pathId", "tabletId", "txId", "scanId", "sourceId", "blobBytes", "rawBytes", "minPk", "maxPk", "minSnapshot", "maxSnapshot")) \ + PROBE(ScanFinishSource, \ + GROUPS("Scan"), \ + TYPES(ui64, ui64, ui64, ui64, ui64, ui64, ui64, ui32, ui32, ui64), \ + NAMES("pathId", "tabletId", "txId", "scanId", "sourceId", "blobBytes", "rawBytes", "filteredRows", "totalRows", "totalReservedBytes")) \ + PROBE(ColumnEngineForLogsSelect, \ + GROUPS("Scan"), \ + TYPES(ui64, ui64, ui64, ui64, ui64, ui64, ui64, ui64, ui64), \ + NAMES("pathId", "tabletId", "txId", "scanId", "timeOfInsertedSelectMs", "timeOfCommittedSelectMs", "totalPortionsCount", "totalFilteredPortionsCount", "totalResultSize")) \ -LWTRACE_DECLARE_PROVIDER(YDB_CS_READER) +LWTRACE_DECLARE_PROVIDER(YDB_CS_SCAN) } \ No newline at end of file diff --git a/ydb/core/tx/columnshard/engines/reader/transaction/tx_internal_scan.cpp b/ydb/core/tx/columnshard/engines/reader/transaction/tx_internal_scan.cpp index c7191aee5f6e..24feda213479 100644 --- a/ydb/core/tx/columnshard/engines/reader/transaction/tx_internal_scan.cpp +++ b/ydb/core/tx/columnshard/engines/reader/transaction/tx_internal_scan.cpp @@ -3,10 +3,13 @@ #include #include #include +#include #include namespace NKikimr::NOlap::NReader { +LWTRACE_USING(YDB_CS_SCAN); + void TTxInternalScan::SendError(const TString& problem, const TString& details, const TActorContext& ctx) const { AFL_WARN(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "TTxScan failed")("problem", problem)("details", details); auto& request = *InternalScanEvent->Get(); @@ -116,9 +119,11 @@ void TTxInternalScan::Complete(const TActorContext& ctx) { scanDiagnosticsEvent->RequestId = requestCookie; ctx.Send(Self->ScanDiagnosticsActorId, std::move(scanDiagnosticsEvent)); } + auto orbit = std::make_shared(); + LWTRACK(StartScan, *orbit, request.GetPathId().GetInternalPathId().GetRawValue(), Self->TabletID(), request.GetLockId().value_or(0), ScanId); auto scanActorId = ctx.Register(new TColumnShardScan(Self->SelfId(), scanComputeActor, Self->ScanDiagnosticsActorId, Self->GetStoragesManager(), Self->DataAccessorsManager.GetObjectPtrVerified(), Self->ColumnDataManager.GetObjectPtrVerified(), TComputeShardingPolicy(), ScanId, request.GetLockId().value_or(0), ScanGen, requestCookie, - Self->TabletID(), TDuration::Max(), readMetadataRange, NKikimrDataEvents::FORMAT_ARROW, Self->Counters.GetScanCounters(), {})); + Self->TabletID(), TDuration::Max(), readMetadataRange, NKikimrDataEvents::FORMAT_ARROW, Self->Counters.GetScanCounters(), {}, std::move(orbit))); Self->InFlightReadsTracker.AddScanActorId(requestCookie, scanActorId); AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "TTxInternalScan started")("actor_id", scanActorId)("trace_detailed", detailedInfo); diff --git a/ydb/core/tx/columnshard/engines/reader/transaction/tx_scan.cpp b/ydb/core/tx/columnshard/engines/reader/transaction/tx_scan.cpp index 54eafa4a3ad7..3b435da29407 100644 --- a/ydb/core/tx/columnshard/engines/reader/transaction/tx_scan.cpp +++ b/ydb/core/tx/columnshard/engines/reader/transaction/tx_scan.cpp @@ -4,10 +4,13 @@ #include #include #include +#include #include namespace NKikimr::NOlap::NReader { +LWTRACE_USING(YDB_CS_SCAN); + void TTxScan::SendError(const TString& problem, const TString& details, const TActorContext& ctx) const { AFL_WARN(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "TTxScan failed")("problem", problem)("details", details); const auto& request = Ev->Get()->Record; @@ -31,6 +34,7 @@ bool TTxScan::Execute(TTransactionContext& /*txc*/, const TActorContext& /*ctx*/ void TTxScan::Complete(const TActorContext& ctx) { TMemoryProfileGuard mpg("TTxScan::Complete"); auto& request = Ev->Get()->Record; + auto orbit = std::make_shared(); auto scanComputeActor = Ev->Sender; TSnapshot snapshot = TSnapshot(request.GetSnapshot().GetStep(), request.GetSnapshot().GetTxId()); if (snapshot.IsZero()) { @@ -62,6 +66,7 @@ void TTxScan::Complete(const TActorContext& ctx) { const NActors::TLogContextGuard gLogging = NActors::TLogContextBuilder::Build() ("tx_id", txId)("scan_id", scanId)("gen", scanGen)( "table", table)("snapshot", snapshot)("tablet", Self->TabletID())("timeout", timeout)("cpu_limits", cpuLimits.DebugString()); + ui64 rawPathId = 0; TReadMetadataPtr readMetadataRange; std::unique_ptr scanDiagnosticsEvent; { @@ -69,7 +74,9 @@ void TTxScan::Complete(const TActorContext& ctx) { TReadDescription read(Self->TabletID(), snapshot, sorting); read.DeduplicationPolicy = deduplicationEnabled ? EDeduplicationPolicy::PREVENT_DUPLICATES : EDeduplicationPolicy::ALLOW_DUPLICATES; + read.Orbit = orbit; read.TxId = txId; + read.ScanId = scanId; read.SetLock( request.HasLockTxId() ? std::make_optional(request.GetLockTxId()) : std::nullopt, request.HasLockNodeId() ? std::make_optional(request.GetLockNodeId()) : std::nullopt, @@ -88,10 +95,13 @@ void TTxScan::Complete(const TActorContext& ctx) { read.TableMetadataAccessor = accConclusion.DetachResult(); } if (auto pathId = read.TableMetadataAccessor->GetPathId()) { + auto internalPathId = pathId->GetInternalPathIdOptional().value_or(TInternalPathId::FromRawValue(0)); + rawPathId = internalPathId.GetRawValue(); Self->Counters.GetColumnTablesCounters() - ->GetPathIdCounter(pathId->GetInternalPathIdOptional().value_or(TInternalPathId::FromRawValue(0))) + ->GetPathIdCounter(internalPathId) ->OnReadEvent(); } + LWTRACK(StartScan, *orbit, rawPathId, Self->TabletID(), request.GetTxId(), request.GetScanId()); } const TString defaultReader = [&]() { @@ -211,7 +221,8 @@ void TTxScan::Complete(const TActorContext& ctx) { auto scanActorId = ctx.Register(new TColumnShardScan(Self->SelfId(), scanComputeActor, Self->ScanDiagnosticsActorId, Self->GetStoragesManager(), Self->DataAccessorsManager.GetObjectPtrVerified(), Self->ColumnDataManager.GetObjectPtrVerified(), shardingPolicy, scanId, txId, - scanGen, requestCookie, Self->TabletID(), timeout, readMetadataRange, dataFormat, Self->Counters.GetScanCounters(), cpuLimits)); + scanGen, requestCookie, Self->TabletID(), timeout, readMetadataRange, dataFormat, Self->Counters.GetScanCounters(), cpuLimits, + std::move(orbit), rawPathId)); Self->InFlightReadsTracker.AddScanActorId(requestCookie, scanActorId); AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "TTxScan started")("actor_id", scanActorId)("trace_detailed", detailedInfo); diff --git a/ydb/core/tx/columnshard/tracing/probes.h b/ydb/core/tx/columnshard/tracing/probes.h index 1b91d1256886..b3e6fc65ca3b 100644 --- a/ydb/core/tx/columnshard/tracing/probes.h +++ b/ydb/core/tx/columnshard/tracing/probes.h @@ -27,10 +27,6 @@ namespace NKikimr::NColumnShard { GROUPS("Cleanup"), \ TYPES(ui64, ui64, ui64, ui64, ui64, ui64, ui64, ui64, bool, ui64, ui64), \ NAMES("tabletId", "totalPortionsCount", "totalPortions", "portionsPrepared", "drop", "skip", "portionsBatchCount", "chunksBatchCount", "limitExceeded", "maxPortionsBatchLimit", "maxChunksBatchLimit")) \ - PROBE(ColumnEngineForLogsSelect, \ - GROUPS("Read"), \ - TYPES(TString, ui64, ui64, ui64, ui64, ui64), \ - NAMES("pathId", "timeOfInsertedSelectMs", "timeOfCommittedSelectMs", "totalPortionsCount", "totalFilteredPortionsCount", "totalResultSize")) \ LWTRACE_DECLARE_PROVIDER(YDB_CS)