Skip to content

Commit 1244798

Browse files
authored
Merge branch 'master' into iceberg_stats_orders
2 parents 0526fc7 + 5a37369 commit 1244798

File tree

91 files changed

+8059
-361
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

91 files changed

+8059
-361
lines changed

.licenserc.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ header:
4747
- "fe/fe-core/src/main/java/software/amazon/awssdk/core/client/builder/SdkDefaultClientBuilder.java"
4848
- "fe/fe-core/src/main/antlr4/org/apache/doris/nereids/JavaLexer.g4"
4949
- "fe/fe-core/src/main/antlr4/org/apache/doris/nereids/JavaParser.g4"
50+
- "fe/.idea/vcs.xml"
5051
- "be/dict/ik/*"
5152
- "be/dict/pinyin/*"
5253
- "be/src/common/signal_handler.h"
Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include "cloud/cloud_cluster_info.h"
19+
20+
#include <glog/logging.h>
21+
22+
#include "cloud/cloud_meta_mgr.h"
23+
#include "cloud/cloud_storage_engine.h"
24+
#include "cloud/cloud_tablet.h"
25+
#include "cloud/config.h"
26+
#include "gen_cpp/cloud.pb.h"
27+
#include "runtime/exec_env.h"
28+
#include "util/time.h"
29+
30+
namespace doris {
31+
32+
// Stops the background refresh worker (if it is running) before the members
// it uses are destroyed.
CloudClusterInfo::~CloudClusterInfo() {
    this->stop_bg_worker();
}
35+
36+
void CloudClusterInfo::start_bg_worker() {
37+
bool expected = true;
38+
if (!_bg_worker_stopped.compare_exchange_strong(expected, false)) {
39+
// Already running
40+
return;
41+
}
42+
43+
_bg_worker = std::thread(&CloudClusterInfo::_bg_worker_func, this);
44+
LOG(INFO) << "CloudClusterInfo background worker started, "
45+
<< "refresh_interval=" << config::cluster_status_cache_refresh_interval_sec << "s";
46+
}
47+
48+
void CloudClusterInfo::stop_bg_worker() {
49+
bool expected = false;
50+
if (!_bg_worker_stopped.compare_exchange_strong(expected, true)) {
51+
// Already stopped
52+
return;
53+
}
54+
55+
{
56+
std::lock_guard lock(_bg_worker_mutex);
57+
_bg_worker_cv.notify_all();
58+
}
59+
60+
if (_bg_worker.joinable()) {
61+
_bg_worker.join();
62+
}
63+
64+
LOG(INFO) << "CloudClusterInfo background worker stopped";
65+
}
66+
67+
void CloudClusterInfo::_bg_worker_func() {
68+
LOG(INFO) << "CloudClusterInfo background worker thread running";
69+
70+
while (!_bg_worker_stopped.load()) {
71+
_refresh_cluster_status();
72+
73+
std::unique_lock lock(_bg_worker_mutex);
74+
_bg_worker_cv.wait_for(
75+
lock, std::chrono::seconds(config::cluster_status_cache_refresh_interval_sec),
76+
[this] { return _bg_worker_stopped.load(); });
77+
}
78+
}
79+
80+
void CloudClusterInfo::_refresh_cluster_status() {
81+
if (!config::is_cloud_mode()) {
82+
return;
83+
}
84+
auto* cloud_engine =
85+
dynamic_cast<CloudStorageEngine*>(&ExecEnv::GetInstance()->storage_engine());
86+
if (!cloud_engine) {
87+
return;
88+
}
89+
90+
std::unordered_map<std::string, std::pair<int32_t, int64_t>> cluster_status;
91+
std::string resolved_cluster_id;
92+
Status st = cloud_engine->meta_mgr().get_cluster_status(
93+
&cluster_status, my_cluster_id().empty() ? &resolved_cluster_id : nullptr);
94+
if (!st.ok()) {
95+
LOG(WARNING) << "Failed to refresh cluster status: " << st;
96+
return;
97+
}
98+
99+
// Update cache
100+
{
101+
std::unique_lock lock(_mutex);
102+
_cluster_status_cache.clear();
103+
for (const auto& [cluster_id, status_pair] : cluster_status) {
104+
_cluster_status_cache[cluster_id] = {status_pair.first, status_pair.second};
105+
}
106+
}
107+
108+
VLOG_DEBUG << "Refreshed cluster status cache, " << cluster_status.size() << " clusters";
109+
110+
// Set our own cluster_id if resolved from the response
111+
if (my_cluster_id().empty() && !resolved_cluster_id.empty()) {
112+
set_my_cluster_id(resolved_cluster_id);
113+
LOG(INFO) << "Resolved my cluster_id: " << resolved_cluster_id;
114+
}
115+
}
116+
117+
// Decides whether this cluster should leave compaction of `tablet` to another
// cluster (compaction read-write separation).
//
// @param tablet  tablet being considered for compaction; must not be null.
// @return true if compaction should be skipped on this cluster, false if this
//         cluster may compact the tablet.
//
// Always returns false when config::enable_compaction_rw_separation is off.
bool CloudClusterInfo::should_skip_compaction(CloudTablet* tablet) const {
    if (!config::enable_compaction_rw_separation) {
        return false;
    }

    const std::string last_active_cluster = tablet->last_active_cluster_id();
    const std::string my_cluster = my_cluster_id();
    const int64_t tablet_id = tablet->tablet_id();

    // Case 1: No active cluster record, any cluster can compact
    if (last_active_cluster.empty()) {
        VLOG_DEBUG << "tablet " << tablet_id << " has no last_active_cluster record, "
                   << "my_cluster=" << my_cluster << ", allow compaction";
        return false;
    }

    // Case 2: This is the active cluster, allow compaction
    if (last_active_cluster == my_cluster) {
        VLOG_DEBUG << "tablet " << tablet_id << " last_active_cluster=" << last_active_cluster
                   << " equals my_cluster=" << my_cluster << ", allow compaction";
        return false;
    }

    // Case 3: Check if the last active cluster is available
    ClusterStatusCache cache;
    if (!get_cluster_status(last_active_cluster, &cache)) {
        // Cluster not found in cache, might be deleted, allow takeover
        LOG(INFO) << "compaction_rw_separation: tablet " << tablet_id
                  << " last_active_cluster=" << last_active_cluster
                  << " not found in cache (maybe deleted), my_cluster=" << my_cluster
                  << ", allow takeover";
        return false;
    }

    // Force compaction if the tablet has too many rowsets (more than the
    // configured fraction of its max version count), even on read clusters,
    // to prevent the version count from growing unbounded when the write
    // cluster can't keep up or has compaction disabled.
    const int64_t num_rowsets = tablet->fetch_add_approximate_num_rowsets(0);
    const auto max_versions = tablet->max_version_config();
    const auto threshold_ratio = config::compaction_rw_separation_version_threshold_ratio;
    const auto threshold = static_cast<int64_t>(max_versions * threshold_ratio);
    if (num_rowsets > threshold) {
        // Report the ratio actually in effect instead of a hard-coded figure.
        LOG(INFO) << "compaction_rw_separation: force compaction on tablet " << tablet_id
                  << ", num_rowsets=" << num_rowsets << " > threshold=" << threshold
                  << " (ratio=" << threshold_ratio << " of " << max_versions << ")"
                  << ", my_cluster=" << my_cluster;
        return false;
    }

    const auto status = static_cast<cloud::ClusterStatus>(cache.status);
    const int64_t status_mtime = cache.mtime_ms;
    const int64_t now = UnixMillis();
    const int64_t elapsed = now - status_mtime;
    const int64_t timeout = config::compaction_cluster_takeover_timeout_ms;

    // Case 4: Original cluster is NORMAL (still active), cannot takeover
    if (status == cloud::ClusterStatus::NORMAL) {
        LOG_EVERY_N(INFO, 100) << "compaction_rw_separation: skip tablet " << tablet_id
                               << ", last_active_cluster=" << last_active_cluster
                               << " is NORMAL (active), my_cluster=" << my_cluster;
        return true;
    }

    // Case 5: Original cluster is known but not NORMAL (e.g. SUSPENDED or
    // MANUAL_SHUTDOWN). Take over only once the status has been unchanged for
    // longer than the takeover timeout.
    if (elapsed > timeout) {
        // Takeover successful
        LOG(INFO) << "compaction_rw_separation: takeover tablet " << tablet_id
                  << ", last_active_cluster=" << last_active_cluster
                  << " status=" << cloud::ClusterStatus_Name(status)
                  << " status_mtime=" << status_mtime << " elapsed=" << elapsed
                  << "ms > timeout=" << timeout << "ms"
                  << ", my_cluster=" << my_cluster;
        return false;
    }

    // Timeout not reached yet, waiting
    LOG_EVERY_N(INFO, 100) << "compaction_rw_separation: skip tablet " << tablet_id
                           << ", last_active_cluster=" << last_active_cluster
                           << " status=" << cloud::ClusterStatus_Name(status)
                           << " status_mtime=" << status_mtime << " elapsed=" << elapsed
                           << "ms <= timeout=" << timeout << "ms"
                           << ", my_cluster=" << my_cluster << ", waiting for takeover";
    return true;
}
200+
201+
} // namespace doris

be/src/cloud/cloud_cluster_info.h

Lines changed: 76 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,92 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
#pragma once
19+
20+
#include <atomic>
21+
#include <condition_variable>
22+
#include <mutex>
23+
#include <shared_mutex>
24+
#include <string>
25+
#include <thread>
26+
#include <unordered_map>
27+
1828
#include "runtime/cluster_info.h"
1929

2030
namespace doris {
2131

32+
class CloudTablet;
33+
34+
// One cluster's status as held in the local cache.
struct ClusterStatusCache {
    // Numeric value of the ClusterStatus enum.
    int32_t status = 0;
    // Time at which the status last changed, in milliseconds.
    int64_t mtime_ms = 0;
};
39+
2240
class CloudClusterInfo : public ClusterInfo {
2341
public:
24-
bool is_in_standby() const { return _is_in_standby; }
42+
~CloudClusterInfo();
2543

44+
bool is_in_standby() const { return _is_in_standby; }
2645
void set_is_in_standby(bool flag) { _is_in_standby = flag; }
2746

47+
// Get this BE's cluster ID
48+
std::string my_cluster_id() const {
49+
std::shared_lock lock(_mutex);
50+
return _my_cluster_id;
51+
}
52+
void set_my_cluster_id(const std::string& id) {
53+
std::unique_lock lock(_mutex);
54+
_my_cluster_id = id;
55+
}
56+
57+
// Get cached cluster status, returns false if not found
58+
bool get_cluster_status(const std::string& id, ClusterStatusCache* cache) const {
59+
std::shared_lock lock(_mutex);
60+
auto it = _cluster_status_cache.find(id);
61+
if (it != _cluster_status_cache.end()) {
62+
*cache = it->second;
63+
return true;
64+
}
65+
return false;
66+
}
67+
68+
// Update cluster status cache
69+
void set_cluster_status(const std::string& id, int32_t status, int64_t mtime_ms) {
70+
std::unique_lock lock(_mutex);
71+
_cluster_status_cache[id] = {status, mtime_ms};
72+
}
73+
74+
// Clear all cached cluster status
75+
void clear_cluster_status_cache() {
76+
std::unique_lock lock(_mutex);
77+
_cluster_status_cache.clear();
78+
}
79+
80+
// Start background refresh thread
81+
void start_bg_worker();
82+
// Stop background refresh thread
83+
void stop_bg_worker();
84+
85+
// Check if this cluster should skip compaction for the given tablet
86+
// Returns true if should skip (i.e., another cluster should do the compaction)
87+
bool should_skip_compaction(CloudTablet* tablet) const;
88+
2889
private:
90+
void _bg_worker_func();
91+
void _refresh_cluster_status();
92+
2993
bool _is_in_standby = false;
94+
95+
mutable std::shared_mutex _mutex;
96+
std::string _my_cluster_id;
97+
std::unordered_map<std::string, ClusterStatusCache> _cluster_status_cache;
98+
99+
// Background worker
100+
std::thread _bg_worker;
101+
std::atomic<bool> _bg_worker_stopped {true};
102+
std::mutex _bg_worker_mutex;
103+
std::condition_variable _bg_worker_cv;
30104
};
31105

32-
} // namespace doris
106+
} // namespace doris

be/src/cloud/cloud_meta_mgr.cpp

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -352,7 +352,7 @@ static std::string debug_info(const Request& req) {
352352
} else if constexpr (is_any_v<Request, GetTabletRequest>) {
353353
return fmt::format(" tablet_id={}", req.tablet_id());
354354
} else if constexpr (is_any_v<Request, GetObjStoreInfoRequest, ListSnapshotRequest,
355-
GetInstanceRequest>) {
355+
GetInstanceRequest, GetClusterStatusRequest>) {
356356
return "";
357357
} else if constexpr (is_any_v<Request, CreateRowsetRequest>) {
358358
return fmt::format(" tablet_id={}", req.rowset_meta().tablet_id());
@@ -775,6 +775,12 @@ Status CloudMetaMgr::sync_tablet_rowsets_unlocked(CloudTablet* tablet,
775775
tablet->set_cumulative_layer_point(stats.cumulative_point());
776776
tablet->reset_approximate_stats(stats.num_rowsets(), stats.num_segments(),
777777
stats.num_rows(), stats.data_size());
778+
779+
// Sync last active cluster info for compaction read-write separation
780+
if (config::enable_compaction_rw_separation && stats.has_last_active_cluster_id()) {
781+
tablet->set_last_active_cluster_info(stats.last_active_cluster_id(),
782+
stats.last_active_time_ms());
783+
}
778784
}
779785
return Status::OK();
780786
}
@@ -2260,5 +2266,36 @@ Status CloudMetaMgr::update_packed_file_info(const std::string& packed_file_path
22602266
&cloud::MetaService_Stub::update_packed_file_info);
22612267
}
22622268

2269+
// Queries the meta service for the status of the clusters visible to this
// node's cloud_unique_id.
//
// @param result         cleared and then filled with
//                       cluster_id -> (status, mtime_ms) on success.
// @param my_cluster_id  if non-null and the response carries the requester's
//                       cluster id, receives that id; otherwise left untouched.
// @return the RPC status; on failure `result` is not modified.
Status CloudMetaMgr::get_cluster_status(
        std::unordered_map<std::string, std::pair<int32_t, int64_t>>* result,
        std::string* my_cluster_id) {
    GetClusterStatusRequest req;
    GetClusterStatusResponse resp;
    req.add_cloud_unique_ids(config::cloud_unique_id);

    if (Status s = retry_rpc("get cluster status", req, &resp,
                             &MetaService_Stub::get_cluster_status);
        !s.ok()) {
        return s;
    }

    result->clear();
    for (const auto& detail : resp.details()) {
        for (const auto& cluster : detail.clusters()) {
            // The meta service reports mtime in seconds; convert to ms. When
            // mtime is absent, fall back to the current time so that the
            // elapsed time seen by callers is not huge, which would trigger an
            // immediate takeover.
            const int64_t mtime_ms =
                    !cluster.has_mtime() ? UnixMillis() : cluster.mtime() * 1000;
            const auto status_value = static_cast<int32_t>(cluster.cluster_status());
            result->insert_or_assign(cluster.cluster_id(),
                                     std::pair<int32_t, int64_t> {status_value, mtime_ms});
        }
    }

    const bool want_requester_id = my_cluster_id != nullptr;
    if (want_requester_id && resp.has_requester_cluster_id()) {
        *my_cluster_id = resp.requester_cluster_id();
    }

    return Status::OK();
}
2299+
22632300
#include "common/compile_check_end.h"
22642301
} // namespace doris::cloud

be/src/cloud/cloud_meta_mgr.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,12 @@ class CloudMetaMgr {
169169
int64_t& max_reserved_snapshots,
170170
int64_t& snapshot_interval_seconds);
171171

172+
// Get all cluster status for the instance
173+
// Returns cluster_id -> (status, mtime_ms)
174+
// If my_cluster_id is not null, also returns the requesting node's cluster_id
175+
Status get_cluster_status(std::unordered_map<std::string, std::pair<int32_t, int64_t>>* result,
176+
std::string* my_cluster_id = nullptr);
177+
172178
private:
173179
bool sync_tablet_delete_bitmap_by_cache(CloudTablet* tablet, int64_t old_max_version,
174180
std::ranges::range auto&& rs_metas,

0 commit comments

Comments
 (0)