[Store][Feature] Hot Standby and Oplog Interface for Master Service HA #1515
Libotry wants to merge 18 commits into kvcache-ai:main
Conversation
Summary of Changes (Gemini Code Assist): This pull request significantly enhances the Master Service's High Availability capabilities by introducing a hot standby replication mechanism. It establishes a robust framework for replicating metadata changes from a primary instance to standby instances using an etcd-based operation log. This ensures data consistency and enables rapid failover, thereby improving the overall fault tolerance and reliability of the system. The changes encompass the entire replication pipeline, from logging operations to their application on standby nodes, complete with state management and comprehensive monitoring.
Code Review
This pull request introduces a comprehensive high-availability (HA) mechanism for the Master Service, centered around an etcd-based operational log (oplog). The changes are extensive and well-structured, adding key components like EtcdOpLogStore for persistence, OpLogManager for creating entries, OpLogWatcher for observing changes, OpLogApplier for applying them, and a StandbyStateMachine to manage the lifecycle of a hot standby instance. The design thoughtfully includes features for robustness, such as checksums, group commit, gap detection in the oplog, and a "read-then-watch" pattern for consistency.
My review identified a significant performance issue where an EtcdOpLogStore object is inefficiently created within a loop. I also found some opportunities for code improvement by removing duplicated helper functions and simplifying some logic. Overall, this is a strong feature addition with a solid design and good test coverage.
// Note: `/latest` is batch-updated on Primary, so this is for monitoring only.
#ifdef STORE_USE_ETCD
    if (!cluster_id_.empty()) {
        EtcdOpLogStore oplog_store(cluster_id_, /*enable_latest_seq_batch_update=*/false);
The EtcdOpLogStore is created inside the ReplicationLoop's while loop. This is inefficient as it will be constructed and destructed on every iteration. The constructor for EtcdOpLogStore can be expensive as it may involve etcd operations. This object should be created once before the loop begins to avoid this repeated overhead.
For example:
void HotStandbyService::ReplicationLoop() {
LOG(INFO) << "Replication loop started (etcd-based OpLog sync)";
#ifdef STORE_USE_ETCD
std::unique_ptr<EtcdOpLogStore> oplog_store;
if (!cluster_id_.empty()) {
oplog_store = std::make_unique<EtcdOpLogStore>(cluster_id_, /*enable_latest_seq_batch_update=*/false);
}
#endif
while (IsRunning()) {
// ...
#ifdef STORE_USE_ETCD
if (oplog_store) {
uint64_t latest_seq = 0;
ErrorCode err = oplog_store->GetLatestSequenceId(latest_seq);
if (err == ErrorCode::OK) {
primary_seq_id_.store(latest_seq);
}
}
#endif
// ...
}
// ...
}

// Base64 encoding for binary payload
// JsonCpp treats strings as UTF-8, so we must encode binary data
std::string Base64Encode(const std::string& data) {
    static const char base64_chars[] =
        "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

    std::string result;
    result.reserve(((data.size() + 2) / 3) * 4);

    size_t i = 0;
    size_t data_len = data.size();

    // Process 3 bytes at a time
    while (i + 2 < data_len) {
        uint32_t octet_a = static_cast<unsigned char>(data[i++]);
        uint32_t octet_b = static_cast<unsigned char>(data[i++]);
        uint32_t octet_c = static_cast<unsigned char>(data[i++]);

        uint32_t triple = (octet_a << 16) | (octet_b << 8) | octet_c;

        result.push_back(base64_chars[(triple >> 18) & 0x3F]);
        result.push_back(base64_chars[(triple >> 12) & 0x3F]);
        result.push_back(base64_chars[(triple >> 6) & 0x3F]);
        result.push_back(base64_chars[triple & 0x3F]);
    }

    // Handle remaining bytes
    size_t remaining = data_len - i;
    if (remaining > 0) {
        uint32_t octet_a = static_cast<unsigned char>(data[i++]);
        uint32_t octet_b = (remaining > 1) ? static_cast<unsigned char>(data[i++]) : 0;
        uint32_t octet_c = 0;

        uint32_t triple = (octet_a << 16) | (octet_b << 8) | octet_c;

        result.push_back(base64_chars[(triple >> 18) & 0x3F]);
        result.push_back(base64_chars[(triple >> 12) & 0x3F]);
        result.push_back((remaining > 1) ? base64_chars[(triple >> 6) & 0x3F] : '=');
        result.push_back('=');
    }

    return result;
}

std::string Base64Decode(const std::string& encoded) {
    static const unsigned char decode_table[256] = {
        64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64,
        64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64,
        64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 62, 64, 64, 64, 63,
        52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 64, 64, 64, 64, 64, 64,
        64,  0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13, 14,
        15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 64, 64, 64, 64, 64,
        64, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40,
        41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 64, 64, 64, 64, 64,
        64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64,
        64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64,
        64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64,
        64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64,
        64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64,
        64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64,
        64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64,
        64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64
    };

    std::string result;
    result.reserve((encoded.size() * 3) / 4);

    size_t i = 0;
    while (i < encoded.size()) {
        // Skip whitespace and invalid chars
        while (i < encoded.size() && (encoded[i] == ' ' || encoded[i] == '\n' || encoded[i] == '\r' || encoded[i] == '\t')) {
            i++;
        }
        if (i >= encoded.size()) break;

        uint32_t sextet_a = decode_table[static_cast<unsigned char>(encoded[i++])];
        if (i >= encoded.size() || sextet_a == 64) break;

        uint32_t sextet_b = decode_table[static_cast<unsigned char>(encoded[i++])];
        if (sextet_b == 64) break;

        uint32_t sextet_c = (i < encoded.size()) ? decode_table[static_cast<unsigned char>(encoded[i++])] : 64;
        uint32_t sextet_d = (i < encoded.size()) ? decode_table[static_cast<unsigned char>(encoded[i++])] : 64;

        uint32_t triple = (sextet_a << 18) | (sextet_b << 12) |
                          ((sextet_c != 64) ? (sextet_c << 6) : 0) |
                          ((sextet_d != 64) ? sextet_d : 0);

        result.push_back(static_cast<char>((triple >> 16) & 0xFF));
        if (sextet_c != 64) {
            result.push_back(static_cast<char>((triple >> 8) & 0xFF));
        }
        if (sextet_d != 64) {
            result.push_back(static_cast<char>(triple & 0xFF));
        }
    }

    return result;
}

} // namespace
The Base64Encode and Base64Decode helper functions are also implemented in mooncake-store/src/oplog_watcher.cpp. This code duplication should be avoided. Consider moving these functions to a common utility file (e.g., utils.h/.cpp) and calling them from both places to improve maintainability. Using a well-tested third-party library for Base64 encoding/decoding would be even more robust if one is available in the project's dependencies.
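If the helpers are consolidated, one possible shape for the shared utility is sketched below. The `mooncake::base64` namespace and header location are assumptions for illustration, not the PR's actual layout; the logic mirrors the table-driven encoder above but uses a compact bit-accumulator decoder.

```cpp
#include <cstddef>
#include <cstdint>
#include <string>

namespace mooncake {
namespace base64 {

// Standard base64 with '=' padding; 3 input bytes -> 4 output chars.
inline std::string Encode(const std::string& data) {
    static const char tbl[] =
        "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
    std::string out;
    out.reserve(((data.size() + 2) / 3) * 4);
    size_t i = 0;
    while (i + 3 <= data.size()) {
        uint32_t t = (static_cast<unsigned char>(data[i]) << 16) |
                     (static_cast<unsigned char>(data[i + 1]) << 8) |
                     static_cast<unsigned char>(data[i + 2]);
        i += 3;
        out.push_back(tbl[(t >> 18) & 0x3F]);
        out.push_back(tbl[(t >> 12) & 0x3F]);
        out.push_back(tbl[(t >> 6) & 0x3F]);
        out.push_back(tbl[t & 0x3F]);
    }
    size_t rem = data.size() - i;
    if (rem > 0) {
        uint32_t a = static_cast<unsigned char>(data[i]);
        uint32_t b = rem > 1 ? static_cast<unsigned char>(data[i + 1]) : 0;
        uint32_t t = (a << 16) | (b << 8);
        out.push_back(tbl[(t >> 18) & 0x3F]);
        out.push_back(tbl[(t >> 12) & 0x3F]);
        out.push_back(rem > 1 ? tbl[(t >> 6) & 0x3F] : '=');
        out.push_back('=');
    }
    return out;
}

// Accumulates 6-bit groups and emits a byte once 8 bits are available;
// padding, whitespace, and invalid characters are skipped.
inline std::string Decode(const std::string& enc) {
    auto val = [](unsigned char c) -> int {
        if (c >= 'A' && c <= 'Z') return c - 'A';
        if (c >= 'a' && c <= 'z') return c - 'a' + 26;
        if (c >= '0' && c <= '9') return c - '0' + 52;
        if (c == '+') return 62;
        if (c == '/') return 63;
        return -1;
    };
    std::string out;
    out.reserve((enc.size() * 3) / 4);
    uint32_t buf = 0;
    int bits = 0;
    for (unsigned char c : enc) {
        int v = val(c);
        if (v < 0) continue;
        buf = (buf << 6) | static_cast<uint32_t>(v);
        bits += 6;
        if (bits >= 8) {
            bits -= 8;
            out.push_back(static_cast<char>((buf >> bits) & 0xFF));
        }
    }
    return out;
}

}  // namespace base64
}  // namespace mooncake
```

Both call sites could then include the shared header and drop their local copies; the round-trip property (`Decode(Encode(x)) == x` for arbitrary binary strings) is the key invariant worth unit-testing once, centrally.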
MetadataPayload payload;
bool parse_success = false;
auto result = struct_pack::deserialize_to(payload, entry.payload);
if (result == struct_pack::errc::ok) {
    parse_success = true;
} else {
    LOG(ERROR) << "OpLogApplier: failed to deserialize payload for key=" << entry.object_key
               << ", sequence_id=" << entry.sequence_id
               << ", payload_size=" << entry.payload.size()
               << ", error_code=" << static_cast<int>(result);
}

if (!parse_success) {
    // Fallback to empty metadata if parsing fails
    StandbyObjectMetadata empty_metadata;
    empty_metadata.last_sequence_id = entry.sequence_id;
    metadata_store_->PutMetadata(entry.object_key, empty_metadata);
    return;
}
The logic for deserializing the payload and handling failure can be simplified. The parse_success boolean variable is redundant. You can directly check the result of struct_pack::deserialize_to and handle the failure case within the same if block.
MetadataPayload payload;
auto result = struct_pack::deserialize_to(payload, entry.payload);
if (result != struct_pack::errc::ok) {
LOG(ERROR) << "OpLogApplier: failed to deserialize payload for key=" << entry.object_key
<< ", sequence_id=" << entry.sequence_id
<< ", payload_size=" << entry.payload.size()
<< ", error_code=" << static_cast<int>(result);
// Fallback to empty metadata if parsing fails
StandbyObjectMetadata empty_metadata;
empty_metadata.last_sequence_id = entry.sequence_id;
metadata_store_->PutMetadata(entry.object_key, empty_metadata);
return;
}
mooncake-store/src/oplog_watcher.cpp (outdated)
                          std::vector<OpLogEntry>& entries,
                          EtcdRevisionId& revision_id) {
#ifdef STORE_USE_ETCD
    EtcdOpLogStore oplog_store(cluster_id_,
The EtcdOpLogStore constructor performs etcd I/O (checks the /latest key) and starts the BatchWriteThread background thread. ReadOpLogSince is called multiple times (within a loop) in StartFromSequenceId and SyncMissedEntries, and creating and destroying it each time incurs significant overhead. It is recommended to lazily initialize it as a class member variable, similar to the approach of OpLogApplier::GetEtcdOpLogStore().
Cool! I believe this is another step toward our HA capabilities, which many users have been eagerly expecting. I'll take a close look together with @00fish0.
Thanks for your great work! I'm currently reviewing these changes and expect to finish the review in 2-3 days. Will get back to you soon.
00fish0 left a comment:
Thanks for your contribution! I've left some initial comments. Looking forward to your thoughts.
    return oss.str();
}

std::string EtcdOpLogStore::SerializeOpLogEntry(const OpLogEntry& entry) const {
Should we introduce compression (e.g., zstd, already applied in the Snapshot PR #1431) for the OpLog to reduce upload payload size?
- For large payloads (>1KB): zstd can potentially reduce etcd upload time greatly and improve QPS.
- Trade-off: compression adds CPU overhead, but this is often negligible compared to the network I/O savings.
A great suggestion, but considering the workload and schedule, we won't address it this time. It can be resolved in the next PR.
    }
}

ErrorCode EtcdOpLogStore::WriteOpLog(const OpLogEntry& entry, bool sync) {
OpLog write performance is primarily bottlenecked by the etcd upload phase, with throughput heavily influenced by Oplog payload size. The current serialization chain for an OpLogEntry written to etcd is:
- Inner payload → struct_pack::serialize → binary bytes
- Binary payload → base64::Encode → ASCII string (+33% size inflation)
- Outer OpLogEntry → Json::StreamWriterBuilder → JSON string
This results in significant size overhead.
It might be good to introduce these changes:
- Use struct_pack for the outer OpLogEntry serialization (instead of JSON). Using JSON is what forces the base64 encoding step, and struct_pack also keeps the outer format consistent with the inner payload serialization method.
- Eliminate base64 encoding. With struct_pack as the outer format, the payload field (a std::string) is serialized as length-prefixed raw bytes, which is natively binary-safe. The base64::Encode/base64::Decode steps (and the utils/base64.h dependency) become unnecessary, removing the 33% size inflation.
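The 33% figure follows directly from base64's 3-bytes-in, 4-chars-out mapping; a one-line helper (hypothetical name, for illustration only) makes the overhead concrete:

```cpp
#include <cstddef>

// Encoded size of a raw payload under standard base64 with '=' padding:
// every group of up to 3 input bytes becomes exactly 4 output characters,
// hence the ~33% inflation the comment refers to.
inline std::size_t Base64EncodedSize(std::size_t raw_bytes) {
    return ((raw_bytes + 2) / 3) * 4;
}
```

For a 1 KiB payload this yields 1368 encoded bytes, so dropping base64 in favor of length-prefixed raw bytes removes roughly a third of the etcd upload volume before any compression is even considered.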
A great suggestion, but considering the workload and schedule, we won't address it this time. It can be resolved in the next PR.
// Basic DoS protection: validate key/payload sizes before further
// processing.
std::string size_reason;
if (!OpLogManager::ValidateEntrySize(entry, &size_reason)) {
    LOG(ERROR) << "OpLog entry size rejected: sequence_id="
               << entry.sequence_id << ", key=" << entry.object_key
               << ", reason=" << size_reason;
    consecutive_errors_.fetch_add(1);
    return;
}

// Verify checksum to detect data corruption or tampering.
if (!OpLogManager::VerifyChecksum(entry)) {
    LOG(ERROR)
        << "OpLog entry checksum mismatch: sequence_id="
        << entry.sequence_id << ", key=" << entry.object_key
        << ". Possible data corruption or tampering. Discarding entry.";
    consecutive_errors_.fetch_add(1);
    HAMetricManager::instance().inc_oplog_checksum_failures();
    return;
}
HandleWatchEvent here executes ValidateEntrySize and VerifyChecksum, but these are also performed inside ApplyOpLogEntry. Could we perform these checks once?
Good observation — these checks are indeed duplicated between HandleWatchEvent and ApplyOpLogEntry.
However, I'd prefer to keep both layers for now:
ApplyOpLogEntry is the last line of defense. It's not only called from HandleWatchEvent — it's also called from SyncMissedEntries (reconnect path) and internally from RequestMissingOpLog (gap-fill path). Removing the checks there would leave those callers unprotected.
HandleWatchEvent benefits from early rejection. The pre-checks here let us accurately attribute the failure to the watch path (via consecutive_errors_ and specific log context), which would be lost if we only relied on ApplyOpLogEntry's false return — that return value is shared with out-of-order/pending cases.
To properly deduplicate, we'd need ApplyOpLogEntry to return a fine-grained result enum (e.g., kApplied, kOutOfOrder, kInvalidSize, kChecksumMismatch, ...) so callers can distinguish validation failures from ordering issues. That's a worthwhile refactor but probably out of scope for this PR.
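The fine-grained result enum floated in that reply could be sketched as follows. All names here are hypothetical (taken from the reply's examples, not from the PR's code); the point is that callers can then distinguish validation failures from ordering issues, which a shared bool cannot express.

```cpp
#include <cstdint>

// Possible outcomes of applying one OpLog entry (hypothetical names).
enum class ApplyResult : uint8_t {
    kApplied,           // entry applied and cursor advanced
    kOutOfOrder,        // buffered as pending due to a sequence gap; not an error
    kInvalidSize,       // rejected by ValidateEntrySize
    kChecksumMismatch,  // rejected by VerifyChecksum
};

// Lets the watch path attribute consecutive_errors_ increments only to
// genuine validation failures, not to buffered out-of-order entries.
inline bool IsValidationFailure(ApplyResult r) {
    return r == ApplyResult::kInvalidSize ||
           r == ApplyResult::kChecksumMismatch;
}
```

With such an enum, HandleWatchEvent could drop its duplicate pre-checks and still keep accurate per-path error accounting, which is exactly the refactor the reply defers to a later PR.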
mooncake-store/src/oplog_watcher.cpp (outdated)

std::vector<OpLogEntry> entries;
EtcdRevisionId rev = 0;
if (!ReadOpLogSince(last_seq, entries, rev)) {
It seems that SyncMissedEntries currently calls ReadOpLogSince only once, which might miss data if the gap exceeds 1000 entries.
Maybe we can align its logic with StartFromSequenceId by using a for loop to ensure all missed entries are synced.
for (;;) {
ReadOpLogSince(read_seq_id, batch, rev);
...
if (batch.size() < kSyncBatchSize) break;
}
if (waited.count() >= kMissingEntrySkipSeconds) {
    skipped_sequence_ids_[missing_seq] = now;
    missing_sequence_ids_.erase(missing_seq);
    expected_sequence_id_.store(missing_seq + 1);
Nit:
expected_sequence_id_ is updated with unconditional store in ApplyOpLogEntry (~line 171) and ProcessPendingEntries (~line 318). Since both the Go callback thread and watch_thread_ can reach these paths concurrently, there's a theoretical window where a lower-sequence store overwrites a higher-sequence one, causing a momentary regression.
Suggest replacing with a monotonic CAS loop (same pattern as last_processed_sequence_id_ in OpLogWatcher ~line 480):
uint64_t new_val = entry.sequence_id + 1;
uint64_t cur = expected_sequence_id_.load();
while (new_val > cur &&
!expected_sequence_id_.compare_exchange_weak(cur, new_val)) {}
Not blocking.
Thanks for the careful analysis! I agree this is theoretically sound — unconditional store is less safe than a monotonic CAS in a multi-threaded context.
However, after tracing the actual control flow, I believe there is no real regression window in practice:
- ApplyOpLogEntry (line 171): the store is only reached when entry.sequence_id == expected (guarded by the ordering checks at line 90). Two threads cannot simultaneously pass this guard for different sequence IDs, since expected is monotonically increasing and only one entry can match at a time.
- ProcessPendingEntries, skip path (line 247): this store is inside the pending_mutex_ lock, which serializes access.
- ProcessPendingEntries, pending apply path (line 318): although the store is outside the lock, the preceding IsSequenceEqual(it->first, expected) check plus the erase inside the lock ensure only one thread can claim a given pending entry and advance expected.
In terms of concurrency, the Go watch callback delivers events sequentially (one at a time), so concurrent ApplyOpLogEntry calls don't happen. The only real concurrency is between the Go callback's ApplyOpLogEntry (which internally calls ProcessPendingEntries) and the watch_thread_'s periodic ProcessPendingEntries poll — and both are serialized by the guards described above.
That said, replacing with a monotonic CAS loop is a reasonable defensive hardening — it makes the invariant (never regress) self-evident in the code rather than relying on external control-flow reasoning. Happy to apply the change if we want the extra safety margin. What do you think?
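For illustration, the monotonic-CAS pattern from the suggestion can be exercised under real thread contention in a standalone sketch (hypothetical names, not PR code): no matter how the advances interleave, the counter never regresses.

```cpp
#include <atomic>
#include <cstdint>
#include <thread>
#include <vector>

// Install new_val only if it is strictly larger than the current value;
// compare_exchange_weak reloads `cur` on failure, so the loop re-checks
// the monotonicity condition after every contended attempt.
inline void AdvanceMonotonic(std::atomic<uint64_t>& seq, uint64_t new_val) {
    uint64_t cur = seq.load();
    while (new_val > cur && !seq.compare_exchange_weak(cur, new_val)) {
    }
}

// Four threads each push the counter through 1..10000 concurrently.
// With a plain store(), a stale lower value could land last; with the
// monotonic CAS, the final value is always the maximum.
inline uint64_t RaceAdvances() {
    std::atomic<uint64_t> expected{0};
    std::vector<std::thread> threads;
    for (int t = 0; t < 4; ++t) {
        threads.emplace_back([&expected] {
            for (uint64_t v = 1; v <= 10000; ++v) AdvanceMonotonic(expected, v);
        });
    }
    for (auto& th : threads) th.join();
    return expected.load();
}
```

This is the "self-evident invariant" trade-off the reply describes: the code itself guarantees no regression, instead of relying on external control-flow reasoning about who can reach each store.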
if (!oplog_watcher_->StartFromSequenceId(last_applied_seq_id)) {
    LOG(WARNING) << "Failed to start OpLogWatcher from sequence_id="
                 << last_applied_seq_id << ", continuing anyway";
    state_machine_.ProcessEvent(StandbyEvent::SYNC_FAILED);
When StartFromSequenceId fails, the state machine transitions to RECONNECTING, but nothing in ReplicationLoop or elsewhere drives the recovery — no CONNECTED event is ever fired, and ReplicationLoop just spins on the !IsConnected() sleep branch. The service is stuck.
Could we either return an error from Start() to fail fast, or add a retry/reconnect path that re-invokes StartFromSequenceId?
Good catch — this is a real bug. I traced the full state flow:
When StartFromSequenceId fails, SYNC_FAILED transitions to RECONNECTING. However, RECONNECTING can only exit via CONNECTED, MAX_ERRORS_REACHED, FATAL_ERROR, or STOP. Nothing in ReplicationLoop or OpLogWatcher ever fires a CONNECTED event in this path — ReplicationLoop just spins on !IsConnected() sleep, and OpLogWatcher never emits CONNECTED. So the service is permanently stuck.
I think the cleanest fix is: if StartFromSequenceId fails, we should not start the background threads at all and return an error from Start(), letting the caller decide whether to retry. The watcher already has its own internal reconnection logic once running, so the initial StartFromSequenceId failure is the only case that falls through the cracks.
Alternatively, we could add a retry loop around StartFromSequenceId in Start() with bounded attempts, similar to how OpLogWatcher::TryReconnect works internally. Will fix this.
Shall we add some docs for this feature?
if (!enable_latest_seq_batch_update_) {
    // Direct update (may be slow, but it's what config asked for)
    // Warning: This is now done AFTER op log write, which is correct order.
    return UpdateLatestSequenceId(entry.sequence_id);
}
Could we append the /latest key-value pair to the same batch in FlushBatch, so both the OpLog entries and the pointer update land in a single Raft proposal?
// In FlushBatch(), after preparing keys/values from current_batch:
keys.push_back(BuildLatestKey());
values.push_back(std::to_string(max_seq_id));
EtcdHelper::BatchPut(keys, values); // single Raft proposal
It seems that the implementation now roughly doubles the etcd write amplification for every batch.
Good point. We've already moved the /latest update into FlushBatch() (line 286), so it's now 2 Raft proposals per batch (1 BatchCreate + 1 Put for /latest) instead of N+1.
Merging into a single proposal isn't feasible with the current Go wrapper — EtcdStoreBatchCreateWrapper uses Txn(If CreateRevision==0), which would fail on the pre-existing /latest key. This would require a new Go-level mixed-Txn wrapper.
The practical impact of 2 vs 1 proposal is minimal. If a crash occurs between them, GetMaxSequenceId() (key-scan fallback) ensures no data loss during failover. Will track the single-proposal optimization as a follow-up once we add a mixed-Txn Go wrapper.
void EtcdOpLogStore::BatchWriteThread() {
    while (batch_write_running_.load()) {
        {
            std::unique_lock<std::mutex> lock(batch_mutex_);
            if (pending_batch_.empty()) {
                // Wait for signal or timeout (Group Commit time window)
                cv_batch_updated_.wait_for(
                    lock, std::chrono::milliseconds(kOpLogBatchTimeoutMs));
            }

            if (!batch_write_running_.load() && pending_batch_.empty()) {
                break;
            }
        }

        FlushBatch();
    }
}
Now the pipeline is like:
Time: |--accumulate--|---etcd IO---|--accumulate--|---etcd IO---|
batch_1 batch_1 batch_2 batch_2
Ideally, we want accumulation and IO to overlap:
Time: |--accumulate--|--accumulate--|--accumulate--|
batch_1 batch_2 batch_3
|---etcd IO---|---etcd IO---|
batch_1 batch_2
Suggestion (for future optimization): Consider a double-buffering or async-submit approach:
Double-buffer: Swap pending_batch_ into a local buffer (already done), then submit the IO on a separate thread or via async etcd client, allowing new entries to accumulate into pending_batch_ concurrently.
Async etcd client: Use the gRPC async API or a thread pool to submit BatchPut without blocking the accumulation loop.
This is a larger refactor. Not blocking this PR.
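The swap-under-lock half of that double-buffering idea can be sketched in isolation (hypothetical class, not PR code). The flusher swaps the pending batch out in O(1) while holding the lock, then performs the slow etcd IO on the returned batch without holding it, so producers keep accumulating into a fresh buffer concurrently.

```cpp
#include <mutex>
#include <string>
#include <utility>
#include <vector>

class DoubleBufferedBatch {
 public:
    // Producer side: append one serialized entry to the pending batch.
    void Append(std::string entry) {
        std::lock_guard<std::mutex> lock(mutex_);
        pending_.push_back(std::move(entry));
    }

    // Flusher side: take ownership of the whole batch. The swap is O(1),
    // so the lock is held only briefly; the caller then does the slow
    // IO (e.g., BatchPut to etcd) entirely outside the lock.
    std::vector<std::string> TakeBatch() {
        std::vector<std::string> batch;
        std::lock_guard<std::mutex> lock(mutex_);
        batch.swap(pending_);
        return batch;
    }

 private:
    std::mutex mutex_;
    std::vector<std::string> pending_;
};
```

A full pipeline would additionally overlap successive flushes (second thread or async etcd client, as the comment suggests), but even this minimal form keeps accumulation from stalling behind IO.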
auto now = std::chrono::steady_clock::now();
std::lock_guard<std::mutex> lock(mutex_);
return std::chrono::duration_cast<std::chrono::milliseconds>(
    now - state_enter_time_);
}
GetTimeInCurrentState() captures now before acquiring mutex_, but state_enter_time_ is updated inside ProcessEvent() while holding the same mutex. If a state transition happens between the now capture and the lock acquisition, state_enter_time_ can be newer than now, producing a negative duration that wraps to a huge value (unsigned).
Consider moving the now capture inside the lock.
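A minimal sketch of the corrected ordering (hypothetical StateClock class, assuming the same steady_clock and mutex pattern as the snippet above): with `now` read under the lock, it can never precede a concurrent update to `state_enter_time_`, so the subtraction can never go negative.

```cpp
#include <chrono>
#include <mutex>

class StateClock {
 public:
    void EnterState() {
        std::lock_guard<std::mutex> lock(mutex_);
        state_enter_time_ = std::chrono::steady_clock::now();
    }

    std::chrono::milliseconds GetTimeInCurrentState() {
        std::lock_guard<std::mutex> lock(mutex_);
        // Capture `now` AFTER acquiring the mutex: state_enter_time_ cannot
        // change between the read of the clock and the subtraction below.
        auto now = std::chrono::steady_clock::now();
        return std::chrono::duration_cast<std::chrono::milliseconds>(
            now - state_enter_time_);
    }

 private:
    std::mutex mutex_;
    std::chrono::steady_clock::time_point state_enter_time_ =
        std::chrono::steady_clock::now();
};
```

The original ordering only misbehaves under a racing transition, which makes the bug rare and hard to reproduce; moving one line removes the race entirely at no cost.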
if (transition_history_.size() > kMaxHistorySize) {
    transition_history_.erase(transition_history_.begin());
}
When the history exceeds kMaxHistorySize (1000), erasing the first element copies all remaining elements. Under rapid state transitions (e.g., RECONNECTING flapping), this becomes O(n) per transition.
Should we use std::deque with pop_front() (O(1) amortized), or a fixed-size ring buffer?
Technically correct — vector::erase(begin()) is O(n). However, state transitions are inherently low-frequency events (seconds to minutes apart, even during RECONNECTING flapping). With kMaxHistorySize = 1000 and TransitionRecord being a small struct (timestamp + enums), moving 999 elements costs single-digit microseconds — negligible compared to the etcd I/O and LOG() calls in the same code path. Switching to std::deque would also lose contiguous memory layout, which benefits GetTransitionHistory()'s vector copy on return. Will keep as-is for simplicity; happy to revisit if profiling ever shows this as a hotspot.
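For reference, the fixed-size ring buffer alternative mentioned in the review could look like the following sketch (hypothetical template, not adopted in the PR). Push is O(1) and overwrites the oldest record once capacity is reached; a snapshot preserves oldest-to-newest order, mirroring GetTransitionHistory()'s return-by-value.

```cpp
#include <cstddef>
#include <utility>
#include <vector>

template <typename T>
class RingHistory {
 public:
    explicit RingHistory(std::size_t capacity) : buf_(capacity) {}

    // O(1): overwrite the slot at next_ and advance; once full, this
    // silently drops the oldest record instead of shifting elements.
    void Push(T record) {
        buf_[next_] = std::move(record);
        next_ = (next_ + 1) % buf_.size();
        if (size_ < buf_.size()) ++size_;
    }

    // Oldest-to-newest copy of the retained records.
    std::vector<T> Snapshot() const {
        std::vector<T> out;
        out.reserve(size_);
        std::size_t start = (next_ + buf_.size() - size_) % buf_.size();
        for (std::size_t i = 0; i < size_; ++i)
            out.push_back(buf_[(start + i) % buf_.size()]);
        return out;
    }

    std::size_t size() const { return size_; }

 private:
    std::vector<T> buf_;
    std::size_t next_ = 0;
    std::size_t size_ = 0;
};
```

Given the author's measurement that the vector erase costs microseconds at this transition rate, keeping the simpler vector is a defensible call; this sketch is only what the swap would look like if profiling ever changes that.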
 * ┌─────────────┐                                   │
 * │  PROMOTED   │───────────────────────────────────┘
 * └─────────────┘
 */
The ASCII state diagram in the header shows:
│ SYNCING │─── Error/Gap ───►│RECOVERING│
But the actual transition table sends SYNC_FAILED and DISCONNECTED from SYNCING to RECONNECTING, not RECOVERING. Either the diagram or the code is wrong — please reconcile.
The issue with the diagram has been fixed.
config_.primary_address = primary_address;
etcd_endpoints_ = etcd_endpoints;
cluster_id_ = cluster_id;
Should we defer field assignment until after a successful connection (i.e., after ConnectToEtcdStoreClient)?
If ConnectToEtcdStoreClient fails, CONNECTION_FAILED is sent and the method returns an error — but those fields retain values from this failed attempt.
These fields are unconditionally overwritten on every Start() call (L147-149), so stale values from a failed attempt are always replaced by the next Start(). Additionally, no code path reads these fields in FAILED state for any meaningful operation. Keeping them set is actually useful for debugging (logging which endpoint/cluster failed) and avoids complicating a potential retry flow.
    etcd_oplog_store_ = etcd_oplog_store;
}

uint64_t OpLogManager::Append(OpType type, const std::string& key,
Could you clarify the intended semantics between Append() and AppendAndPersist() for REMOVE/PUT_REVOKE?
In Append(), non-PUT_END entries use sync=true, but if WriteOpLog fails, it only logs a warning and still returns a sequence_id.
AppendAndPersist(), on the other hand, surfaces the error to the caller and enables retry.
Do we guarantee that all REMOVE/PUT_REVOKE paths use AppendAndPersist() only?
If any path still uses Append(), is there a risk of “local delete succeeded but delete OpLog was not durably persisted”?
Would it make sense to either:
- Disallow REMOVE/PUT_REVOKE in Append() to prevent misuse, or
- Make Append() fail/propagate the error for those operation types when persistence fails?
Thanks for the observation. The upper-layer callers (not part of this PR) already ensure that REMOVE/PUT_REVOKE operations go through AppendAndPersist() rather than Append(), so there is no risk of silent persistence failure in practice.
(void)EtcdHelper::WaitWatchWithPrefixStopped(watch_prefix.c_str(),
                                             watch_prefix.size(),
                                             /*timeout_ms=*/5000);
NotifyStateEvent(StandbyEvent::WATCH_BROKEN);
I think there is a state-machine / watcher event mismatch here.
OpLogWatcher emits WATCH_BROKEN when the watch fails, then emits RECOVERY_SUCCESS and eventually WATCH_HEALTHY after reconnect.
However, in StandbyStateMachine, RECONNECTING only accepts CONNECTED (plus STOP/FATAL/MAX_ERRORS), not RECOVERY_SUCCESS or WATCH_HEALTHY.
So after a normal watch break, the service can enter RECONNECTING and never transition back to WATCHING, even though watch/apply has actually recovered.
Could we align this contract by either:
- making RECONNECTING accept RECOVERY_SUCCESS/WATCH_HEALTHY, or
- having the watcher emit CONNECTED on successful reconnect?
Updated as per your suggestion.
if (!ReadOpLogSince(read_seq_id, batch, rev)) {
    last_read_rev = 0;
    break;
}
If ReadOpLogSince(read_seq_id, batch, rev) fails, the code sets last_read_rev = 0 and breaks, but the function still continues to start the watch thread and returns true.
This means startup can be treated as successful (HotStandbyService moves to SYNC_COMPLETE/WATCHING) even though historical replay was incomplete. With last_read_rev == 0, next_watch_revision_ falls back to 0, so watch may start “from now” and silently miss older OpLog entries.
    (void)StartFromSequenceId(last_processed_sequence_id_.load());
}

bool OpLogWatcher::StartFromSequenceId(uint64_t start_seq_id) {
Could you clarify the intended logic in StartFromSequenceId() here?
read_seq_id and last_processed_sequence_id_ are only advanced when applier_->ApplyOpLogEntry(e) returns true. From the current OpLogApplier implementation, false can also mean “buffered for later because of a gap”, not only a hard failure.
Can you explain the expected behavior for these cases during initial sync?
- Why is the read cursor only advanced on successful apply?
- If an entry is buffered in pending_entries_ (or otherwise not applied immediately), how do we avoid re-reading the same range on the next ReadOpLogSince() call?
- What invariant is last_processed_sequence_id_ supposed to represent here before we transition to watch mode?
I may be missing an assumption, but as written it looks possible to either keep re-reading the same entries or carry a gap into the watch phase.
Good observation, but no fix needed in this PR. read_seq_id is only advanced on successful apply, which means if there is a sequence gap in etcd, the same range could be re-read. However this does not cause an infinite loop because the batch will be smaller than kSyncBatchSize when etcd has no more data, so the loop exits normally. The redundant reads are idempotent (pending_entries_ map overwrites with same key). Any remaining gaps are resolved during the watch phase via ProcessPendingEntries. We can optimize the cursor advancement as a follow-up.
Great work on the fixes! I think we are almost there. Could you please rebase onto the latest main to resolve the conflict in CMakeLists.txt?
Description
Type of Change
How Has This Been Tested?
Checklist
Run ./scripts/code_format.sh before submitting.