Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1389,6 +1389,12 @@ Status CloudTablet::sync_meta() {
_tablet_meta->set_time_series_compaction_level_threshold(
new_time_series_compaction_level_threshold);
}
// Sync disable_auto_compaction (stored in tablet_schema)
auto new_disable_auto_compaction = tablet_meta->tablet_schema()->disable_auto_compaction();
if (_tablet_meta->tablet_schema()->disable_auto_compaction() != new_disable_auto_compaction) {
_tablet_meta->mutable_tablet_schema()->set_disable_auto_compaction(
new_disable_auto_compaction);
}

return Status::OK();
}
Expand Down
267 changes: 267 additions & 0 deletions be/test/cloud/cloud_tablet_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@

#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_warm_up_manager.h"
#include "common/config.h"
#include "cpp/sync_point.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_factory.h"
#include "olap/rowset/rowset_meta.h"
Expand Down Expand Up @@ -673,4 +675,269 @@ TEST_F(CloudTabletWarmUpStateTest, TestJobCanOverrideDoneStateFromJob) {
expected_state = WarmUpState {WarmUpTriggerSource::JOB, WarmUpProgress::DOING};
EXPECT_EQ(state, expected_state);
}

// Test class for sync_meta functionality
class CloudTabletSyncMetaTest : public testing::Test {
public:
CloudTabletSyncMetaTest() : _engine(CloudStorageEngine(EngineOptions {})) {}

void SetUp() override {
config::enable_file_cache = true;

// Use incrementing tablet_id to create unique schema cache keys for each test
// This avoids cache pollution between tests
int64_t unique_tablet_id = 15673 + _test_counter++;

// Create tablet meta with a schema that has disable_auto_compaction = false
TTabletSchema tablet_schema;
tablet_schema.__set_disable_auto_compaction(false);
// Add a unique column to ensure unique cache key
TColumn col;
col.__set_column_name("test_col_" + std::to_string(unique_tablet_id));
col.__set_column_type(TColumnType());
col.column_type.__set_type(TPrimitiveType::INT);
col.__set_is_key(true);
col.__set_aggregation_type(TAggregationType::NONE);
col.__set_col_unique_id(0);
tablet_schema.__set_columns({col});
tablet_schema.__set_keys_type(TKeysType::DUP_KEYS);

// Use column ordinal 0 -> unique_id 0 mapping
_tablet_meta.reset(new TabletMeta(1, 2, unique_tablet_id, 15674, 4, 5, tablet_schema, 1,
{{0, 0}}, UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK,
TCompressionType::LZ4F));

_tablet =
std::make_shared<CloudTablet>(_engine, std::make_shared<TabletMeta>(*_tablet_meta));

_current_tablet_id = unique_tablet_id;
}

void TearDown() override { config::enable_file_cache = false; }

protected:
// Helper to create a unique TabletMeta for mock responses
TabletMetaSharedPtr createMockTabletMeta(bool disable_auto_compaction,
const std::string& compaction_policy = "size_based") {
TTabletSchema new_schema;
new_schema.__set_disable_auto_compaction(disable_auto_compaction);
TColumn col;
col.__set_column_name("test_col_" + std::to_string(_current_tablet_id));
col.__set_column_type(TColumnType());
col.column_type.__set_type(TPrimitiveType::INT);
col.__set_is_key(true);
col.__set_aggregation_type(TAggregationType::NONE);
col.__set_col_unique_id(0);
new_schema.__set_columns({col});
new_schema.__set_keys_type(TKeysType::DUP_KEYS);

TabletMetaSharedPtr meta;
meta.reset(new TabletMeta(1, 2, _current_tablet_id, 15674, 4, 5, new_schema, 1, {{0, 0}},
UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK,
TCompressionType::LZ4F, 0, false, std::nullopt,
compaction_policy));
return meta;
}

TabletMetaSharedPtr _tablet_meta;
std::shared_ptr<CloudTablet> _tablet;
CloudStorageEngine _engine;
int64_t _current_tablet_id;
static inline int _test_counter = 0;
};

// Test sync_meta syncs disable_auto_compaction from false to true
TEST_F(CloudTabletSyncMetaTest, TestSyncMetaDisableAutoCompactionFalseToTrue) {
// Verify initial state: disable_auto_compaction = false
EXPECT_FALSE(_tablet->tablet_meta()->tablet_schema()->disable_auto_compaction());

auto sp = SyncPoint::get_instance();
sp->clear_all_call_backs();
sp->enable_processing();

// Create mock tablet meta with disable_auto_compaction = true
auto mock_tablet_meta = createMockTabletMeta(true);

// Mock get_tablet_meta to return tablet_meta with disable_auto_compaction = true
sp->set_call_back("CloudMetaMgr::get_tablet_meta", [mock_tablet_meta](auto&& args) {
auto* tablet_meta_ptr = try_any_cast<TabletMetaSharedPtr*>(args[1]);
*tablet_meta_ptr = mock_tablet_meta;

// Tell the sync point to return with Status::OK()
try_any_cast_ret<Status>(args)->second = true;
});

// Call sync_meta
Status st = _tablet->sync_meta();
EXPECT_TRUE(st.ok());

// Verify disable_auto_compaction has been synced to true
EXPECT_TRUE(_tablet->tablet_meta()->tablet_schema()->disable_auto_compaction());

sp->disable_processing();
sp->clear_all_call_backs();
}

// Test sync_meta syncs disable_auto_compaction from true to false
TEST_F(CloudTabletSyncMetaTest, TestSyncMetaDisableAutoCompactionTrueToFalse) {
// Create a tablet with disable_auto_compaction = true from the start
// We need to create a fresh tablet with true to avoid cache pollution issues
TTabletSchema initial_schema;
initial_schema.__set_disable_auto_compaction(true);
TColumn col;
col.__set_column_name("test_col_true_to_false_" + std::to_string(_current_tablet_id));
col.__set_column_type(TColumnType());
col.column_type.__set_type(TPrimitiveType::INT);
col.__set_is_key(true);
col.__set_aggregation_type(TAggregationType::NONE);
col.__set_col_unique_id(0);
initial_schema.__set_columns({col});
initial_schema.__set_keys_type(TKeysType::DUP_KEYS);

TabletMetaSharedPtr tablet_meta_true;
tablet_meta_true.reset(new TabletMeta(1, 2, _current_tablet_id + 1000, 15674, 4, 5,
initial_schema, 1, {{0, 0}}, UniqueId(9, 10),
TTabletType::TABLET_TYPE_DISK, TCompressionType::LZ4F));
_tablet =
std::make_shared<CloudTablet>(_engine, std::make_shared<TabletMeta>(*tablet_meta_true));

// Verify initial state: disable_auto_compaction = true
EXPECT_TRUE(_tablet->tablet_meta()->tablet_schema()->disable_auto_compaction());

auto sp = SyncPoint::get_instance();
sp->clear_all_call_backs();
sp->enable_processing();

// Create mock tablet meta with disable_auto_compaction = false (with matching column name)
TTabletSchema mock_schema;
mock_schema.__set_disable_auto_compaction(false);
TColumn mock_col;
mock_col.__set_column_name("test_col_true_to_false_" + std::to_string(_current_tablet_id));
mock_col.__set_column_type(TColumnType());
mock_col.column_type.__set_type(TPrimitiveType::INT);
mock_col.__set_is_key(true);
mock_col.__set_aggregation_type(TAggregationType::NONE);
mock_col.__set_col_unique_id(0);
mock_schema.__set_columns({mock_col});
mock_schema.__set_keys_type(TKeysType::DUP_KEYS);

TabletMetaSharedPtr mock_tablet_meta;
mock_tablet_meta.reset(new TabletMeta(1, 2, _current_tablet_id + 1000, 15674, 4, 5, mock_schema,
1, {{0, 0}}, UniqueId(9, 10),
TTabletType::TABLET_TYPE_DISK, TCompressionType::LZ4F));

// Mock get_tablet_meta to return tablet_meta with disable_auto_compaction = false
sp->set_call_back("CloudMetaMgr::get_tablet_meta", [mock_tablet_meta](auto&& args) {
auto* tablet_meta_ptr = try_any_cast<TabletMetaSharedPtr*>(args[1]);
*tablet_meta_ptr = mock_tablet_meta;

// Tell the sync point to return with Status::OK()
try_any_cast_ret<Status>(args)->second = true;
});

// Call sync_meta
Status st = _tablet->sync_meta();
EXPECT_TRUE(st.ok());

// Verify disable_auto_compaction has been synced to false
EXPECT_FALSE(_tablet->tablet_meta()->tablet_schema()->disable_auto_compaction());

sp->disable_processing();
sp->clear_all_call_backs();
}

// Test sync_meta when disable_auto_compaction is unchanged
TEST_F(CloudTabletSyncMetaTest, TestSyncMetaDisableAutoCompactionUnchanged) {
// Verify initial state: disable_auto_compaction = false
EXPECT_FALSE(_tablet->tablet_meta()->tablet_schema()->disable_auto_compaction());

auto sp = SyncPoint::get_instance();
sp->clear_all_call_backs();
sp->enable_processing();

// Create mock tablet meta with disable_auto_compaction = false (same as current)
auto mock_tablet_meta = createMockTabletMeta(false);

// Mock get_tablet_meta to return tablet_meta with same disable_auto_compaction = false
sp->set_call_back("CloudMetaMgr::get_tablet_meta", [mock_tablet_meta](auto&& args) {
auto* tablet_meta_ptr = try_any_cast<TabletMetaSharedPtr*>(args[1]);
*tablet_meta_ptr = mock_tablet_meta;

// Tell the sync point to return with Status::OK()
try_any_cast_ret<Status>(args)->second = true;
});

// Call sync_meta
Status st = _tablet->sync_meta();
EXPECT_TRUE(st.ok());

// Verify disable_auto_compaction remains false
EXPECT_FALSE(_tablet->tablet_meta()->tablet_schema()->disable_auto_compaction());

sp->disable_processing();
sp->clear_all_call_backs();
}

// Test sync_meta is skipped when enable_file_cache is false
TEST_F(CloudTabletSyncMetaTest, TestSyncMetaSkippedWhenFileCacheDisabled) {
// Disable file cache
config::enable_file_cache = false;

// Set initial state: disable_auto_compaction = false
EXPECT_FALSE(_tablet->tablet_meta()->tablet_schema()->disable_auto_compaction());

auto sp = SyncPoint::get_instance();
sp->clear_all_call_backs();
sp->enable_processing();

bool callback_called = false;
sp->set_call_back("CloudMetaMgr::get_tablet_meta",
[&callback_called](auto&& args) { callback_called = true; });

// Call sync_meta - should return early without calling get_tablet_meta
Status st = _tablet->sync_meta();
EXPECT_TRUE(st.ok());
EXPECT_FALSE(callback_called);

// Verify disable_auto_compaction is not changed
EXPECT_FALSE(_tablet->tablet_meta()->tablet_schema()->disable_auto_compaction());

sp->disable_processing();
sp->clear_all_call_backs();
}

// Test sync_meta syncs compaction_policy together with disable_auto_compaction
TEST_F(CloudTabletSyncMetaTest, TestSyncMetaMultipleProperties) {
// Verify initial states
EXPECT_FALSE(_tablet->tablet_meta()->tablet_schema()->disable_auto_compaction());
// Default compaction_policy is "size_based"
EXPECT_EQ(_tablet->tablet_meta()->compaction_policy(), "size_based");

auto sp = SyncPoint::get_instance();
sp->clear_all_call_backs();
sp->enable_processing();

// Create mock tablet meta with updated properties
auto mock_tablet_meta = createMockTabletMeta(true, "time_series");

// Mock get_tablet_meta to return tablet_meta with updated properties
sp->set_call_back("CloudMetaMgr::get_tablet_meta", [mock_tablet_meta](auto&& args) {
auto* tablet_meta_ptr = try_any_cast<TabletMetaSharedPtr*>(args[1]);
*tablet_meta_ptr = mock_tablet_meta;

// Tell the sync point to return with Status::OK()
try_any_cast_ret<Status>(args)->second = true;
});

// Call sync_meta
Status st = _tablet->sync_meta();
EXPECT_TRUE(st.ok());

// Verify both properties are synced
EXPECT_TRUE(_tablet->tablet_meta()->tablet_schema()->disable_auto_compaction());
EXPECT_EQ(_tablet->tablet_meta()->compaction_policy(), "time_series");

sp->disable_processing();
sp->clear_all_call_backs();
}
} // namespace doris
33 changes: 29 additions & 4 deletions cloud/src/meta-service/meta_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1209,15 +1209,40 @@ void MetaServiceImpl::update_tablet(::google::protobuf::RpcController* controlle
}

schema_pb.set_disable_auto_compaction(tablet_meta_info.disable_auto_compaction());
// Directly overwrite schema KV instead of using put_schema_kv,
// because put_schema_kv skips writing when the key already exists.
auto key = meta_schema_key(
{instance_id, tablet_meta.index_id(), tablet_meta.schema_version()});
put_schema_kv(code, msg, txn.get(), key, schema_pb);
if (code != MetaServiceCode::OK) return;
LOG_INFO("overwrite schema kv for disable_auto_compaction update")
.tag("key", hex(key))
.tag("disable_auto_compaction", tablet_meta_info.disable_auto_compaction());
// Remove old blob chunks first to avoid orphaned KVs when
// the value format or size changes across versions.
ValueBuf old_val;
auto get_err = cloud::blob_get(txn.get(), key, &old_val);
if (get_err == TxnErrorCode::TXN_OK) {
bool skip_remove = false;
TEST_SYNC_POINT_CALLBACK("update_tablet::skip_schema_remove", &skip_remove);
if (!skip_remove) {
old_val.remove(txn.get());
}
}
uint8_t ver = config::meta_schema_value_version;
if (ver > 0) {
cloud::blob_put(txn.get(), key, schema_pb, ver);
} else {
auto schema_value = schema_pb.SerializeAsString();
txn->put(key, schema_value);
}
if (is_versioned_write) {
auto key = versioned::meta_schema_key(
{instance_id, tablet_meta.index_id(), tablet_meta.schema_version()});
put_versioned_schema_kv(code, msg, txn.get(), key, schema_pb);
if (code != MetaServiceCode::OK) return;
doris::TabletSchemaCloudPB tablet_schema(schema_pb);
if (!document_put(txn.get(), key, std::move(tablet_schema))) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
msg = "failed to serialize versioned tablet schema";
return;
}
}
}
}
Expand Down
Loading
Loading