Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/iceberg_table_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ Status IcebergTableSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo&
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_init_timer);
auto& p = _parent->cast<Parent>();
RETURN_IF_ERROR(_writer->init_properties(p._pool));
RETURN_IF_ERROR(_writer->init_properties(p._pool, p._row_desc));
return Status::OK();
}

Expand Down
4 changes: 4 additions & 0 deletions be/src/pipeline/exec/operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
#include "pipeline/exec/set_source_operator.h"
#include "pipeline/exec/sort_sink_operator.h"
#include "pipeline/exec/sort_source_operator.h"
#include "pipeline/exec/spill_iceberg_table_sink_operator.h"
#include "pipeline/exec/spill_sort_sink_operator.h"
#include "pipeline/exec/spill_sort_source_operator.h"
#include "pipeline/exec/streaming_aggregation_operator.h"
Expand Down Expand Up @@ -820,6 +821,7 @@ DECLARE_OPERATOR(OlapTableSinkV2LocalState)
DECLARE_OPERATOR(HiveTableSinkLocalState)
DECLARE_OPERATOR(TVFTableSinkLocalState)
DECLARE_OPERATOR(IcebergTableSinkLocalState)
DECLARE_OPERATOR(SpillIcebergTableSinkLocalState)
DECLARE_OPERATOR(AnalyticSinkLocalState)
DECLARE_OPERATOR(BlackholeSinkLocalState)
DECLARE_OPERATOR(SortSinkLocalState)
Expand Down Expand Up @@ -938,6 +940,8 @@ template class AsyncWriterSink<doris::vectorized::VTabletWriter, OlapTableSinkOp
template class AsyncWriterSink<doris::vectorized::VTabletWriterV2, OlapTableSinkV2OperatorX>;
template class AsyncWriterSink<doris::vectorized::VHiveTableWriter, HiveTableSinkOperatorX>;
template class AsyncWriterSink<doris::vectorized::VIcebergTableWriter, IcebergTableSinkOperatorX>;
template class AsyncWriterSink<doris::vectorized::VIcebergTableWriter,
SpillIcebergTableSinkOperatorX>;
template class AsyncWriterSink<doris::vectorized::VTVFTableWriter, TVFTableSinkOperatorX>;

#ifdef BE_TEST
Expand Down
196 changes: 196 additions & 0 deletions be/src/pipeline/exec/spill_iceberg_table_sink_operator.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "spill_iceberg_table_sink_operator.h"

#include "common/status.h"
#include "pipeline/exec/iceberg_table_sink_operator.h"
#include "pipeline/exec/spill_utils.h"
#include "vec/sink/writer/iceberg/viceberg_sort_writer.h"
#include "vec/sink/writer/iceberg/viceberg_table_writer.h"

namespace doris::pipeline {
#include "common/compile_check_begin.h"

SpillIcebergTableSinkLocalState::SpillIcebergTableSinkLocalState(DataSinkOperatorXBase* parent,
RuntimeState* state)
: Base(parent, state) {}

Status SpillIcebergTableSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
RETURN_IF_ERROR(Base::init(state, info));
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_init_timer);

_init_spill_counters();

auto& p = _parent->cast<Parent>();
RETURN_IF_ERROR(_writer->init_properties(p._pool, p._row_desc));
return Status::OK();
}

Status SpillIcebergTableSinkLocalState::open(RuntimeState* state) {
SCOPED_TIMER(Base::exec_time_counter());
SCOPED_TIMER(Base::_open_timer);
RETURN_IF_ERROR(Base::open(state));
return Status::OK();
}

bool SpillIcebergTableSinkLocalState::is_blockable() const {
return true;
}

size_t SpillIcebergTableSinkLocalState::get_reserve_mem_size(RuntimeState* state, bool eos) {
if (!_writer || !_writer->_current_writer) {
return 0;
}

auto* sort_writer =
dynamic_cast<vectorized::VIcebergSortWriter*>(_writer->_current_writer.get());
if (!sort_writer || !sort_writer->sorter()) {
return 0;
}

return sort_writer->sorter()->get_reserve_mem_size(state, eos);
}

size_t SpillIcebergTableSinkLocalState::get_revocable_mem_size(RuntimeState* state) const {
if (!_writer || !_writer->_current_writer) {
return 0;
}

auto* sort_writer =
dynamic_cast<vectorized::VIcebergSortWriter*>(_writer->_current_writer.get());
if (!sort_writer || !sort_writer->sorter()) {
return 0;
}

return sort_writer->sorter()->data_size();
}

Status SpillIcebergTableSinkLocalState::revoke_memory(
RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context) {
if (!_writer || !_writer->_current_writer) {
if (spill_context) {
spill_context->on_task_finished();
}
return Status::OK();
}

auto* sort_writer =
dynamic_cast<vectorized::VIcebergSortWriter*>(_writer->_current_writer.get());

if (!sort_writer || !sort_writer->sorter()) {
if (spill_context) {
spill_context->on_task_finished();
}
return Status::OK();
}

auto exception_catch_func = [sort_writer]() {
auto status = [&]() {
RETURN_IF_CATCH_EXCEPTION({ return sort_writer->trigger_spill(); });
}();
return status;
};

state->get_query_ctx()->resource_ctx()->task_controller()->increase_revoking_tasks_count();
auto status =
SpillSinkRunnable(state, spill_context, operator_profile(), exception_catch_func).run();
if (!status.ok()) {
state->get_query_ctx()->resource_ctx()->task_controller()->decrease_revoking_tasks_count();
}
return status;
}

SpillIcebergTableSinkOperatorX::SpillIcebergTableSinkOperatorX(
ObjectPool* pool, int operator_id, const RowDescriptor& row_desc,
const std::vector<TExpr>& t_output_expr)
: Base(operator_id, 0, 0), _row_desc(row_desc), _t_output_expr(t_output_expr), _pool(pool) {
_spillable = true;
}

Status SpillIcebergTableSinkOperatorX::init(const TDataSink& thrift_sink) {
RETURN_IF_ERROR(Base::init(thrift_sink));
_name = "SPILL_ICEBERG_TABLE_SINK_OPERATOR";
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_output_expr, _output_vexpr_ctxs));
return Status::OK();
}

Status SpillIcebergTableSinkOperatorX::prepare(RuntimeState* state) {
RETURN_IF_ERROR(Base::prepare(state));
RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc));
return vectorized::VExpr::open(_output_vexpr_ctxs, state);
}

Status SpillIcebergTableSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block,
bool eos) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
return local_state.sink(state, in_block, eos);
}

size_t SpillIcebergTableSinkOperatorX::get_reserve_mem_size(RuntimeState* state, bool eos) {
auto& local_state = get_local_state(state);
return local_state.get_reserve_mem_size(state, eos);
}

size_t SpillIcebergTableSinkOperatorX::revocable_mem_size(RuntimeState* state) const {
auto& local_state = get_local_state(state);
return local_state.get_revocable_mem_size(state);
}

Status SpillIcebergTableSinkOperatorX::revoke_memory(
RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context) {
auto& local_state = get_local_state(state);
return local_state.revoke_memory(state, spill_context);
}

void SpillIcebergTableSinkLocalState::_init_spill_counters() {
auto* profile = custom_profile();
//seems init_spill_write_counters()
ADD_TIMER_WITH_LEVEL(profile, "SpillWriteTime", 1);
ADD_COUNTER_WITH_LEVEL(profile, "SpillWriteTaskWaitInQueueCount", TUnit::UNIT, 1);
ADD_COUNTER_WITH_LEVEL(profile, "SpillWriteTaskCount", TUnit::UNIT, 1);
ADD_TIMER_WITH_LEVEL(profile, "SpillWriteTaskWaitInQueueTime", 1);
ADD_TIMER_WITH_LEVEL(profile, "SpillWriteFileTime", 1);
ADD_TIMER_WITH_LEVEL(profile, "SpillWriteSerializeBlockTime", 1);
ADD_COUNTER_WITH_LEVEL(profile, "SpillWriteBlockCount", TUnit::UNIT, 1);
ADD_COUNTER_WITH_LEVEL(profile, "SpillWriteBlockBytes", TUnit::BYTES, 1);
ADD_COUNTER_WITH_LEVEL(profile, "SpillWriteFileBytes", TUnit::BYTES, 1);
ADD_COUNTER_WITH_LEVEL(profile, "SpillWriteRows", TUnit::UNIT, 1);
ADD_COUNTER_WITH_LEVEL(profile, "SpillWriteFileTotalCount", TUnit::UNIT, 1);

//seems init_spill_read_counters()
ADD_TIMER_WITH_LEVEL(profile, "SpillTotalTime", 1);
ADD_TIMER_WITH_LEVEL(profile, "SpillRecoverTime", 1);
ADD_COUNTER_WITH_LEVEL(profile, "SpillReadTaskWaitInQueueCount", TUnit::UNIT, 1);
ADD_COUNTER_WITH_LEVEL(profile, "SpillReadTaskCount", TUnit::UNIT, 1);
ADD_TIMER_WITH_LEVEL(profile, "SpillReadTaskWaitInQueueTime", 1);
ADD_TIMER_WITH_LEVEL(profile, "SpillReadFileTime", 1);
ADD_TIMER_WITH_LEVEL(profile, "SpillReadDerializeBlockTime", 1);
ADD_COUNTER_WITH_LEVEL(profile, "SpillReadBlockCount", TUnit::UNIT, 1);
ADD_COUNTER_WITH_LEVEL(profile, "SpillReadBlockBytes", TUnit::BYTES, 1);
ADD_COUNTER_WITH_LEVEL(profile, "SpillReadFileBytes", TUnit::BYTES, 1);
ADD_COUNTER_WITH_LEVEL(profile, "SpillReadRows", TUnit::UNIT, 1);
ADD_COUNTER_WITH_LEVEL(profile, "SpillReadFileCount", TUnit::UNIT, 1);
ADD_COUNTER_WITH_LEVEL(profile, "SpillWriteFileCurrentBytes", TUnit::BYTES, 1);
ADD_COUNTER_WITH_LEVEL(profile, "SpillWriteFileCurrentCount", TUnit::UNIT, 1);
}

#include "common/compile_check_end.h"
} // namespace doris::pipeline
93 changes: 93 additions & 0 deletions be/src/pipeline/exec/spill_iceberg_table_sink_operator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <memory>

#include "operator.h"
#include "vec/sink/writer/iceberg/viceberg_table_writer.h"

namespace doris::pipeline {
#include "common/compile_check_begin.h"

class SpillIcebergTableSinkLocalState;
class SpillIcebergTableSinkOperatorX;

class SpillIcebergTableSinkLocalState final
: public AsyncWriterSink<vectorized::VIcebergTableWriter, SpillIcebergTableSinkOperatorX> {
public:
using Base = AsyncWriterSink<vectorized::VIcebergTableWriter, SpillIcebergTableSinkOperatorX>;
using Parent = SpillIcebergTableSinkOperatorX;
ENABLE_FACTORY_CREATOR(SpillIcebergTableSinkLocalState);

SpillIcebergTableSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state);
~SpillIcebergTableSinkLocalState() override = default;

Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
Status open(RuntimeState* state) override;

bool is_blockable() const override;
[[nodiscard]] size_t get_reserve_mem_size(RuntimeState* state, bool eos);
Status revoke_memory(RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context);
size_t get_revocable_mem_size(RuntimeState* state) const;

private:
void _init_spill_counters();
friend class SpillIcebergTableSinkOperatorX;
};

class SpillIcebergTableSinkOperatorX final
: public DataSinkOperatorX<SpillIcebergTableSinkLocalState> {
public:
using Base = DataSinkOperatorX<SpillIcebergTableSinkLocalState>;
using LocalStateType = SpillIcebergTableSinkLocalState;

SpillIcebergTableSinkOperatorX(ObjectPool* pool, int operator_id, const RowDescriptor& row_desc,
const std::vector<TExpr>& t_output_expr);

Status init(const TDataSink& thrift_sink) override;

Status prepare(RuntimeState* state) override;

Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override;

size_t get_reserve_mem_size(RuntimeState* state, bool eos) override;

size_t revocable_mem_size(RuntimeState* state) const override;

Status revoke_memory(RuntimeState* state,
const std::shared_ptr<SpillContext>& spill_context) override;

using DataSinkOperatorX<LocalStateType>::node_id;
using DataSinkOperatorX<LocalStateType>::operator_id;
using DataSinkOperatorX<LocalStateType>::get_local_state;

private:
friend class SpillIcebergTableSinkLocalState;
template <typename Writer, typename Parent>
requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>)
friend class AsyncWriterSink;

const RowDescriptor& _row_desc;
vectorized::VExprContextSPtrs _output_vexpr_ctxs;
const std::vector<TExpr>& _t_output_expr;
ObjectPool* _pool = nullptr;
};

#include "common/compile_check_end.h"
} // namespace doris::pipeline
12 changes: 9 additions & 3 deletions be/src/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
#include "pipeline/exec/set_source_operator.h"
#include "pipeline/exec/sort_sink_operator.h"
#include "pipeline/exec/sort_source_operator.h"
#include "pipeline/exec/spill_iceberg_table_sink_operator.h"
#include "pipeline/exec/spill_sort_sink_operator.h"
#include "pipeline/exec/spill_sort_source_operator.h"
#include "pipeline/exec/streaming_aggregation_operator.h"
Expand Down Expand Up @@ -1074,10 +1075,15 @@ Status PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataS
}
case TDataSinkType::ICEBERG_TABLE_SINK: {
if (!thrift_sink.__isset.iceberg_table_sink) {
return Status::InternalError("Missing hive table sink.");
return Status::InternalError("Missing iceberg table sink.");
}
if (thrift_sink.iceberg_table_sink.__isset.sort_info) {
_sink = std::make_shared<SpillIcebergTableSinkOperatorX>(pool, next_sink_operator_id(),
row_desc, output_exprs);
} else {
_sink = std::make_shared<IcebergTableSinkOperatorX>(pool, next_sink_operator_id(),
row_desc, output_exprs);
}
_sink = std::make_shared<IcebergTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
output_exprs);
break;
}
case TDataSinkType::JDBC_TABLE_SINK: {
Expand Down
6 changes: 3 additions & 3 deletions be/src/vec/common/sort/sorter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ Status FullSorter::append_block(Block* block) {

// iff have reach limit and the unsorted block capacity can't hold the block data size
if (_reach_limit() && !has_enough_capacity(block, _state->unsorted_block().get())) {
RETURN_IF_ERROR(_do_sort());
RETURN_IF_ERROR(do_sort());
}

{
Expand Down Expand Up @@ -268,7 +268,7 @@ Status FullSorter::prepare_for_read(bool is_spill) {
_state->ignore_offset();
}
if (_state->unsorted_block()->rows() > 0) {
RETURN_IF_ERROR(_do_sort());
RETURN_IF_ERROR(do_sort());
}
return _state->build_merge_tree(_sort_description);
}
Expand All @@ -282,7 +282,7 @@ Status FullSorter::merge_sort_read_for_spill(RuntimeState* state, doris::vectori
return _state->merge_sort_read(block, batch_size, eos);
}

Status FullSorter::_do_sort() {
Status FullSorter::do_sort() {
Block* src_block = _state->unsorted_block().get();
Block desc_block = src_block->clone_without_columns();
COUNTER_UPDATE(_partial_sort_counter, 1);
Expand Down
Loading
Loading