
[Store][Feature] Hot Standby and Oplog Interface for Master Service HA #1515

Open
Libotry wants to merge 18 commits into kvcache-ai:main from Libotry:br_main_0207_standby_oplog_interface

Conversation

@Libotry
Collaborator

@Libotry Libotry commented Feb 7, 2026

Description

Type of Change

  • Types
    • Bug fix
    • New feature
      • Transfer Engine
      • Mooncake Store
      • Mooncake EP
      • Integration
      • P2P Store
      • Python Wheel
    • Breaking change
    • CI/CD
    • Documentation update
    • Other

How Has This Been Tested?

Checklist

  • I have performed a self-review of my own code.
  • I have formatted my own code using ./scripts/code_format.sh before submitting.
  • I have updated the documentation.
  • I have added tests to prove my changes are effective.

@gemini-code-assist
Contributor

Summary of Changes

Hello @Libotry, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the Master Service's High Availability capabilities by introducing a hot standby replication mechanism. It establishes a robust framework for replicating metadata changes from a primary instance to standby instances using an etcd-based operation log. This ensures data consistency and enables rapid failover, thereby improving the overall fault tolerance and reliability of the system. The changes encompass the entire replication pipeline, from logging operations to their application on standby nodes, complete with state management and comprehensive monitoring.

Highlights

  • Hot Standby Service Introduction: Introduces a comprehensive Hot Standby service for the Master Service, enabling High Availability (HA) through etcd-backed OpLog replication and state management.
  • OpLog Management and Persistence: Adds OpLogManager for in-memory operation logging and EtcdOpLogStore for durable persistence of OpLog entries to etcd, supporting batch writes (group commit) and fixed-width sequence ID keys.
  • OpLog Replication and Application: Implements OpLogWatcher to monitor etcd for new OpLog entries and OpLogApplier to apply these entries to the Standby's local metadata store, ensuring global ordering, handling out-of-order entries, and resolving missing entries with retries and timeouts.
  • State Management and Metrics: Integrates a StandbyStateMachine for robust lifecycle management of the Hot Standby service and HAMetricManager for collecting and exposing Prometheus-compatible metrics on replication status, lag, and errors.
  • Data Integrity and Validation: Incorporates xxHash for fast checksum computation on OpLog entry payloads, includes size validation for keys and payloads, and adds uint64_t sequence ID comparison utilities to handle wrap-around scenarios safely.
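The wrap-around-safe sequence comparison from the last highlight is typically implemented with serial-number arithmetic: `a` precedes `b` when the unsigned distance from `a` to `b` is less than half the ID space. A minimal sketch (function names are illustrative, not necessarily the PR's actual API):

```cpp
#include <cassert>
#include <cstdint>

// Serial-number-style comparison: `a` precedes `b` if the unsigned
// distance from a to b is less than half the uint64_t space. This keeps
// the ordering correct even if sequence IDs ever wrap past UINT64_MAX.
// (Names are illustrative, not the PR's actual API.)
inline bool SeqLess(uint64_t a, uint64_t b) {
    return a != b && (b - a) < (uint64_t{1} << 63);
}

inline bool SeqLessOrEqual(uint64_t a, uint64_t b) {
    return a == b || SeqLess(a, b);
}
```

With this definition, SeqLess(UINT64_MAX, 0) is true, so a comparison across a wrap does not misorder entries the way a plain `<` would.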


Changelog
  • dependencies.sh
    • Added libxxhash-dev to system packages, providing a fast hashing library for checksum computations.
  • mooncake-store/include/etcd_oplog_store.h
    • Added EtcdOpLogStore class for storing and retrieving OpLog entries in etcd, including batch update mechanisms for the latest sequence ID and group commit for OpLog writes.
  • mooncake-store/include/ha_metric_manager.h
    • Added HAMetricManager singleton for collecting and exposing High Availability (HA) related metrics, such as OpLog sequence tracking, replication lag, error counters, and performance histograms.
  • mooncake-store/include/hot_standby_service.h
    • Added HotStandbyService class to manage standby replication, including connecting to a primary, applying OpLog entries, verifying data consistency, and promoting to primary status.
  • mooncake-store/include/metadata_store.h
    • Added MetadataStore abstract interface and StandbyObjectMetadata struct to define how metadata is stored and managed on standby nodes.
  • mooncake-store/include/oplog_applier.h
    • Added OpLogApplier class responsible for applying OpLog entries to the standby metadata store, ensuring correct ordering, handling missing entries, and performing data validation.
  • mooncake-store/include/oplog_manager.h
    • Added OpLogManager class for managing in-memory operation logs, including sequence ID allocation, checksum computation, and size validation for OpLog entries.
  • mooncake-store/include/oplog_watcher.h
    • Added OpLogWatcher class to watch etcd for OpLog changes, perform initial synchronization, and forward events to the OpLogApplier with robust reconnection logic.
  • mooncake-store/include/snapshot_provider.h
    • Added SnapshotProvider abstract interface for loading metadata snapshots, facilitating faster bootstrap of standby nodes.
  • mooncake-store/include/standby_state_machine.h
    • Added StandbyStateMachine class to manage the lifecycle and state transitions of the Hot Standby service, providing a clear state model for operational clarity.
  • mooncake-store/include/types.h
    • Added utility functions for uint64_t sequence ID comparison to safely handle wrap-around scenarios.
    • Introduced IsValidClusterIdComponent for validating etcd cluster IDs to prevent key-prefix injection.
    • Included ylt/struct_pack.hpp for efficient binary serialization of metadata payloads.
  • mooncake-store/src/CMakeLists.txt
    • Updated source file list to include new HA-related C++ files.
    • Configured CMake to find and link the xxHash library.
  • mooncake-store/src/etcd_oplog_store.cpp
    • Implemented EtcdOpLogStore functionalities, including fixed-width key generation, Base64 encoding/decoding for binary payloads, and background threads for batch updates and group commits.
  • mooncake-store/src/ha_metric_manager.cpp
    • Implemented HAMetricManager to initialize and manage Prometheus-style metrics for HA components.
  • mooncake-store/src/hot_standby_service.cpp
    • Implemented HotStandbyService logic, including StandbyMetadataStore, snapshot bootstrap, and the main replication and verification loops.
  • mooncake-store/src/oplog_applier.cpp
    • Implemented OpLogApplier logic, focusing on ordered application of OpLog entries, handling pending entries, and resolving gaps by requesting missing data from etcd.
  • mooncake-store/src/oplog_manager.cpp
    • Implemented OpLogManager with xxHash for checksums and prefix hashes, and logic for persisting entries to EtcdOpLogStore.
  • mooncake-store/src/oplog_watcher.cpp
    • Implemented OpLogWatcher with initial sync from etcd, event handling, and reconnection strategies, ensuring consistent watch resumption.
  • mooncake-store/src/standby_state_machine.cpp
    • Implemented StandbyStateMachine with defined state transitions, history tracking, and callback mechanisms for state changes.
  • mooncake-store/tests/hot_standby_ut/etcd_oplog_store_test.cpp
    • Added unit tests for EtcdOpLogStore covering CRUD operations, serialization, fencing, sequence ID management, batch updates, cleanup, and cluster ID validation.
  • mooncake-store/tests/hot_standby_ut/ha_metric_manager_test.cpp
    • Added unit tests for HAMetricManager to verify metric updates, serialization, singleton behavior, and concurrent access.
  • mooncake-store/tests/hot_standby_ut/hot_standby_service_test.cpp
    • Added unit tests for HotStandbyService covering start/stop, state transitions, sync status, promotion, warm start, and metadata operations.
  • mooncake-store/tests/hot_standby_ut/oplog_applier_test.cpp
    • Added unit tests for OpLogApplier covering basic apply, sequence ordering, gap resolution, checksums, size validation, recovery, pending entries, and JSON parsing.
  • mooncake-store/tests/hot_standby_ut/oplog_manager_test.cpp
    • Added unit tests for OpLogManager covering append, sequence ID allocation, persistence, checksums, size validation, and concurrent operations.
  • mooncake-store/tests/hot_standby_ut/oplog_watcher_test.cpp
    • Added unit tests for OpLogWatcher covering start/stop, event handling, reconnection, checksum/size validation, and state callbacks.

Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request introduces a comprehensive high-availability (HA) mechanism for the Master Service, centered around an etcd-based operational log (oplog). The changes are extensive and well-structured, adding key components like EtcdOpLogStore for persistence, OpLogManager for creating entries, OpLogWatcher for observing changes, OpLogApplier for applying them, and a StandbyStateMachine to manage the lifecycle of a hot standby instance. The design thoughtfully includes features for robustness, such as checksums, group commit, gap detection in the oplog, and a "read-then-watch" pattern for consistency.

My review identified a significant performance issue where an EtcdOpLogStore object is inefficiently created within a loop. I also found some opportunities for code improvement by removing duplicated helper functions and simplifying some logic. Overall, this is a strong feature addition with a solid design and good test coverage.

// Note: `/latest` is batch-updated on Primary, so this is for monitoring only.
#ifdef STORE_USE_ETCD
if (!cluster_id_.empty()) {
    EtcdOpLogStore oplog_store(cluster_id_, /*enable_latest_seq_batch_update=*/false);
Contributor

high

The EtcdOpLogStore is created inside the ReplicationLoop's while loop. This is inefficient as it will be constructed and destructed on every iteration. The constructor for EtcdOpLogStore can be expensive as it may involve etcd operations. This object should be created once before the loop begins to avoid this repeated overhead.

For example:

void HotStandbyService::ReplicationLoop() {
    LOG(INFO) << "Replication loop started (etcd-based OpLog sync)";

#ifdef STORE_USE_ETCD
    std::unique_ptr<EtcdOpLogStore> oplog_store;
    if (!cluster_id_.empty()) {
        oplog_store = std::make_unique<EtcdOpLogStore>(cluster_id_, /*enable_latest_seq_batch_update=*/false);
    }
#endif

    while (IsRunning()) {
        // ...
#ifdef STORE_USE_ETCD
        if (oplog_store) {
            uint64_t latest_seq = 0;
            ErrorCode err = oplog_store->GetLatestSequenceId(latest_seq);
            if (err == ErrorCode::OK) {
                primary_seq_id_.store(latest_seq);
            }
        }
#endif
        // ...
    }
    // ...
}

Comment on lines +567 to +667
// Base64 encoding for binary payload
// JsonCpp treats strings as UTF-8, so we must encode binary data
std::string Base64Encode(const std::string& data) {
    static const char base64_chars[] =
        "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

    std::string result;
    result.reserve(((data.size() + 2) / 3) * 4);

    size_t i = 0;
    size_t data_len = data.size();

    // Process 3 bytes at a time
    while (i + 2 < data_len) {
        uint32_t octet_a = static_cast<unsigned char>(data[i++]);
        uint32_t octet_b = static_cast<unsigned char>(data[i++]);
        uint32_t octet_c = static_cast<unsigned char>(data[i++]);

        uint32_t triple = (octet_a << 16) | (octet_b << 8) | octet_c;

        result.push_back(base64_chars[(triple >> 18) & 0x3F]);
        result.push_back(base64_chars[(triple >> 12) & 0x3F]);
        result.push_back(base64_chars[(triple >> 6) & 0x3F]);
        result.push_back(base64_chars[triple & 0x3F]);
    }

    // Handle remaining bytes
    size_t remaining = data_len - i;
    if (remaining > 0) {
        uint32_t octet_a = static_cast<unsigned char>(data[i++]);
        uint32_t octet_b = (remaining > 1) ? static_cast<unsigned char>(data[i++]) : 0;
        uint32_t octet_c = 0;

        uint32_t triple = (octet_a << 16) | (octet_b << 8) | octet_c;

        result.push_back(base64_chars[(triple >> 18) & 0x3F]);
        result.push_back(base64_chars[(triple >> 12) & 0x3F]);
        result.push_back((remaining > 1) ? base64_chars[(triple >> 6) & 0x3F] : '=');
        result.push_back('=');
    }

    return result;
}

std::string Base64Decode(const std::string& encoded) {
    static const unsigned char decode_table[256] = {
        64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64,
        64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64,
        64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 62, 64, 64, 64, 63,
        52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 64, 64, 64, 64, 64, 64,
        64,  0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13, 14,
        15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 64, 64, 64, 64, 64,
        64, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40,
        41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 64, 64, 64, 64, 64,
        64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64,
        64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64,
        64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64,
        64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64,
        64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64,
        64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64,
        64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64,
        64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64
    };

    std::string result;
    result.reserve((encoded.size() * 3) / 4);

    size_t i = 0;
    while (i < encoded.size()) {
        // Skip whitespace and invalid chars
        while (i < encoded.size() && (encoded[i] == ' ' || encoded[i] == '\n' ||
                                      encoded[i] == '\r' || encoded[i] == '\t')) {
            i++;
        }
        if (i >= encoded.size()) break;

        uint32_t sextet_a = decode_table[static_cast<unsigned char>(encoded[i++])];
        if (i >= encoded.size() || sextet_a == 64) break;

        uint32_t sextet_b = decode_table[static_cast<unsigned char>(encoded[i++])];
        if (sextet_b == 64) break;

        uint32_t sextet_c = (i < encoded.size()) ? decode_table[static_cast<unsigned char>(encoded[i++])] : 64;
        uint32_t sextet_d = (i < encoded.size()) ? decode_table[static_cast<unsigned char>(encoded[i++])] : 64;

        uint32_t triple = (sextet_a << 18) | (sextet_b << 12) |
                          ((sextet_c != 64) ? (sextet_c << 6) : 0) |
                          ((sextet_d != 64) ? sextet_d : 0);

        result.push_back(static_cast<char>((triple >> 16) & 0xFF));
        if (sextet_c != 64) {
            result.push_back(static_cast<char>((triple >> 8) & 0xFF));
        }
        if (sextet_d != 64) {
            result.push_back(static_cast<char>(triple & 0xFF));
        }
    }

    return result;
}

}  // namespace
Contributor

medium

The Base64Encode and Base64Decode helper functions are also implemented in mooncake-store/src/oplog_watcher.cpp. This code duplication should be avoided. Consider moving these functions to a common utility file (e.g., utils.h/.cpp) and calling them from both places to improve maintainability. Using a well-tested third-party library for Base64 encoding/decoding would be even more robust if one is available in the project's dependencies.

Comment on lines +454 to +472
MetadataPayload payload;
bool parse_success = false;
auto result = struct_pack::deserialize_to(payload, entry.payload);
if (result == struct_pack::errc::ok) {
    parse_success = true;
} else {
    LOG(ERROR) << "OpLogApplier: failed to deserialize payload for key=" << entry.object_key
               << ", sequence_id=" << entry.sequence_id
               << ", payload_size=" << entry.payload.size()
               << ", error_code=" << static_cast<int>(result);
}

if (!parse_success) {
    // Fallback to empty metadata if parsing fails
    StandbyObjectMetadata empty_metadata;
    empty_metadata.last_sequence_id = entry.sequence_id;
    metadata_store_->PutMetadata(entry.object_key, empty_metadata);
    return;
}
Contributor

medium

The logic for deserializing the payload and handling failure can be simplified. The parse_success boolean variable is redundant. You can directly check the result of struct_pack::deserialize_to and handle the failure case within the same if block.

    MetadataPayload payload;
    auto result = struct_pack::deserialize_to(payload, entry.payload);
    if (result != struct_pack::errc::ok) {
        LOG(ERROR) << "OpLogApplier: failed to deserialize payload for key=" << entry.object_key
                   << ", sequence_id=" << entry.sequence_id
                   << ", payload_size=" << entry.payload.size()
                   << ", error_code=" << static_cast<int>(result);
        // Fallback to empty metadata if parsing fails
        StandbyObjectMetadata empty_metadata;
        empty_metadata.last_sequence_id = entry.sequence_id;
        metadata_store_->PutMetadata(entry.object_key, empty_metadata);
        return;
    }

@Libotry Libotry force-pushed the br_main_0207_standby_oplog_interface branch from 6021d94 to 7a63de5 on February 7, 2026 09:43
@codecov-commenter

codecov-commenter commented Feb 7, 2026

@00fish0 00fish0 self-assigned this Feb 8, 2026
@Libotry Libotry force-pushed the br_main_0207_standby_oplog_interface branch from 5d23699 to dc40185 on February 9, 2026 02:39
    std::vector<OpLogEntry>& entries,
    EtcdRevisionId& revision_id) {
#ifdef STORE_USE_ETCD
    EtcdOpLogStore oplog_store(cluster_id_,
Collaborator Author

The EtcdOpLogStore constructor performs etcd I/O (checks the /latest key) and starts the BatchWriteThread background thread. ReadOpLogSince is called multiple times (within a loop) in StartFromSequenceId and SyncMissedEntries, and creating and destroying it each time incurs significant overhead. It is recommended to lazily initialize it as a class member variable, similar to the approach of OpLogApplier::GetEtcdOpLogStore().

@ykwd
Collaborator

ykwd commented Feb 11, 2026

Cool! I believe this is another step toward our HA capabilities, which many users have been eagerly expecting. I’ll take a close look together with @00fish0.

@Libotry Libotry force-pushed the br_main_0207_standby_oplog_interface branch from 0994319 to 4480702 on February 12, 2026 03:32
@00fish0
Collaborator

00fish0 commented Feb 13, 2026

Thanks for your great work! I'm currently reviewing these changes and expect to finish the review in 2-3 days. Will get back to you soon.

Collaborator

@00fish0 00fish0 left a comment

Thanks for your contribution! I've left some initial comments. Looking forward to your thoughts.

    return oss.str();
}

std::string EtcdOpLogStore::SerializeOpLogEntry(const OpLogEntry& entry) const {
Collaborator

Should we introduce compression (e.g., zstd, already applied in the Snapshot PR #1431) for the OpLog to reduce upload payload size?

  1. For large payloads (>1KB): zstd can potentially reduce etcd upload time greatly and improve QPS.
  2. Trade-off: compression adds CPU overhead, but I guess this is often negligible compared to the network I/O savings.

Collaborator Author

A great suggestion, but considering the workload and schedule, we won't address it this time. It can be resolved in a follow-up PR.

}
}

ErrorCode EtcdOpLogStore::WriteOpLog(const OpLogEntry& entry, bool sync) {
Collaborator

OpLog write performance is primarily bottlenecked by the etcd upload phase, with throughput heavily influenced by OpLog payload size. The current serialization chain for an OpLogEntry written to etcd is:

  1. Inner payload → struct_pack::serialize → binary bytes
  2. Binary payload → base64::Encode → ASCII string (+33% size inflation)
  3. Outer OpLogEntry → Json::StreamWriterBuilder → JSON string

This results in significant size overhead.

It might be good to introduce these changes:

  1. Use struct_pack for the outer OpLogEntry serialization (instead of JSON)

  Using JSON is the only reason the base64 encoding step is needed. Using struct_pack also keeps the outer format consistent with the inner payload serialization method (struct_pack).

  2. Eliminate base64 encoding

  With struct_pack as the outer format, the payload field (a std::string) is serialized as length-prefixed raw bytes, which is natively binary-safe. The base64::Encode/base64::Decode steps (and the utils/base64.h dependency) become unnecessary, removing the 33% size inflation.
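The length-prefixed framing described above can be sketched as follows (helper names are hypothetical; struct_pack performs equivalent framing internally for std::string fields). Unlike base64, the frame carries arbitrary bytes, including NUL and non-UTF-8 data, with 4 bytes of overhead instead of ~33% inflation:

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>

// Hypothetical sketch of length-prefixed binary framing: a 4-byte
// length header (host byte order, for simplicity) followed by the raw
// payload bytes. Binary-safe with constant overhead.
std::string EncodeFrame(const std::string& payload) {
    uint32_t len = static_cast<uint32_t>(payload.size());
    std::string out(4, '\0');
    std::memcpy(&out[0], &len, 4);
    return out + payload;
}

bool DecodeFrame(const std::string& frame, std::string& payload) {
    if (frame.size() < 4) return false;
    uint32_t len = 0;
    std::memcpy(&len, frame.data(), 4);
    if (frame.size() < 4 + static_cast<size_t>(len)) return false;
    payload = frame.substr(4, len);
    return true;
}
```

A real wire format would fix the byte order (e.g., little-endian) so primary and standby agree across platforms; struct_pack handles this for you.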

Collaborator Author

A great suggestion, but considering the workload and schedule, we won't address it this time. It can be resolved in a follow-up PR.

Comment on lines +453 to +473
// Basic DoS protection: validate key/payload sizes before further
// processing.
std::string size_reason;
if (!OpLogManager::ValidateEntrySize(entry, &size_reason)) {
    LOG(ERROR) << "OpLog entry size rejected: sequence_id="
               << entry.sequence_id << ", key=" << entry.object_key
               << ", reason=" << size_reason;
    consecutive_errors_.fetch_add(1);
    return;
}

// Verify checksum to detect data corruption or tampering.
if (!OpLogManager::VerifyChecksum(entry)) {
    LOG(ERROR)
        << "OpLog entry checksum mismatch: sequence_id="
        << entry.sequence_id << ", key=" << entry.object_key
        << ". Possible data corruption or tampering. Discarding entry.";
    consecutive_errors_.fetch_add(1);
    HAMetricManager::instance().inc_oplog_checksum_failures();
    return;
}
Collaborator

HandleWatchEvent here executes ValidateEntrySize and VerifyChecksum, but these are also performed inside ApplyOpLogEntry. Could we perform these checks once?

Collaborator Author

Good observation — these checks are indeed duplicated between HandleWatchEvent and ApplyOpLogEntry.

However, I'd prefer to keep both layers for now:

ApplyOpLogEntry is the last line of defense. It's not only called from HandleWatchEvent — it's also called from SyncMissedEntries (reconnect path) and internally from RequestMissingOpLog (gap-fill path). Removing the checks there would leave those callers unprotected.

HandleWatchEvent benefits from early rejection. The pre-checks here let us accurately attribute the failure to the watch path (via consecutive_errors_ and specific log context), which would be lost if we only relied on ApplyOpLogEntry's false return — that return value is shared with out-of-order/pending cases.

To properly deduplicate, we'd need ApplyOpLogEntry to return a fine-grained result enum (e.g., kApplied, kOutOfOrder, kInvalidSize, kChecksumMismatch, ...) so callers can distinguish validation failures from ordering issues. That's a worthwhile refactor but probably out of scope for this PR.
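The fine-grained result enum the author proposes could look like this sketch (only the enumerator names come from the comment; the helper is an assumption). Callers could then tell validation failures apart from ordering issues instead of sharing a single boolean:

```cpp
#include <cassert>

// Sketch of the proposed fine-grained apply result. Enumerator names
// are from the comment above; everything else is hypothetical.
enum class ApplyResult {
    kApplied,
    kOutOfOrder,
    kInvalidSize,
    kChecksumMismatch,
};

// A watch-path caller could attribute errors precisely, e.g. bump
// consecutive_errors_ only for genuine validation failures.
bool IsValidationFailure(ApplyResult r) {
    return r == ApplyResult::kInvalidSize ||
           r == ApplyResult::kChecksumMismatch;
}
```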


    std::vector<OpLogEntry> entries;
    EtcdRevisionId rev = 0;
    if (!ReadOpLogSince(last_seq, entries, rev)) {
Collaborator

It seems that SyncMissedEntries currently calls ReadOpLogSince only once, which might miss data if the gap exceeds 1000 entries.

Maybe we can align its logic with StartFromSequenceId by using a for loop to ensure all missed entries are synced.

for (;;) {
    ReadOpLogSince(read_seq_id, batch, rev);
    ...
    if (batch.size() < kSyncBatchSize) break;
}

Collaborator Author

fixed

if (waited.count() >= kMissingEntrySkipSeconds) {
    skipped_sequence_ids_[missing_seq] = now;
    missing_sequence_ids_.erase(missing_seq);
    expected_sequence_id_.store(missing_seq + 1);
Collaborator

Nit:

expected_sequence_id_ is updated with unconditional store in ApplyOpLogEntry (~line 171) and ProcessPendingEntries (~line 318). Since both the Go callback thread and watch_thread_ can reach these paths concurrently, there's a theoretical window where a lower-sequence store overwrites a higher-sequence one, causing a momentary regression.

Suggest replacing with a monotonic CAS loop (same pattern as last_processed_sequence_id_ in OpLogWatcher ~line 480):

uint64_t new_val = entry.sequence_id + 1;
uint64_t cur = expected_sequence_id_.load();
while (new_val > cur &&
       !expected_sequence_id_.compare_exchange_weak(cur, new_val)) {}

Not blocking.

Collaborator Author

Thanks for the careful analysis! I agree this is theoretically sound — unconditional store is less safe than a monotonic CAS in a multi-threaded context.

However, after tracing the actual control flow, I believe there is no real regression window in practice:

  1. ApplyOpLogEntry (line 171): The store is only reached when entry.sequence_id == expected (guarded by the ordering checks at line 90). Two threads cannot simultaneously pass this guard for different sequence IDs, since expected is monotonically increasing and only one entry can match at a time.

  2. ProcessPendingEntries — skip path (line 247): This store is inside the pending_mutex_ lock, which serializes access.

  3. ProcessPendingEntries — pending apply path (line 318): Although the store is outside the lock, the preceding IsSequenceEqual(it->first, expected) check + erase inside the lock ensures only one thread can claim a given pending entry and advance expected.

In terms of concurrency, the Go watch callback delivers events sequentially (one at a time), so concurrent ApplyOpLogEntry calls don't happen. The only real concurrency is between the Go callback's ApplyOpLogEntry (which internally calls ProcessPendingEntries) and the watch_thread_'s periodic ProcessPendingEntries poll — and both are serialized by the guards described above.

That said, replacing with a monotonic CAS loop is a reasonable defensive hardening — it makes the invariant (never regress) self-evident in the code rather than relying on external control-flow reasoning. Happy to apply the change if we want the extra safety margin. What do you think?

Comment on lines +229 to +232
if (!oplog_watcher_->StartFromSequenceId(last_applied_seq_id)) {
    LOG(WARNING) << "Failed to start OpLogWatcher from sequence_id="
                 << last_applied_seq_id << ", continuing anyway";
    state_machine_.ProcessEvent(StandbyEvent::SYNC_FAILED);
Collaborator

When StartFromSequenceId fails, the state machine transitions to RECONNECTING, but nothing in ReplicationLoop or elsewhere drives the recovery — no CONNECTED event is ever fired, and ReplicationLoop just spins on the !IsConnected() sleep branch. The service is stuck.

Could we either return an error from Start() to fail fast, or add a retry/reconnect path that re-invokes StartFromSequenceId?

Collaborator Author

Good catch — this is a real bug. I traced the full state flow:
When StartFromSequenceId fails, SYNC_FAILED transitions to RECONNECTING. However, RECONNECTING can only exit via CONNECTED, MAX_ERRORS_REACHED, FATAL_ERROR, or STOP. Nothing in ReplicationLoop or OpLogWatcher ever fires a CONNECTED event in this path — ReplicationLoop just spins on !IsConnected() sleep, and OpLogWatcher never emits CONNECTED. So the service is permanently stuck.
I think the cleanest fix is: if StartFromSequenceId fails, we should not start the background threads at all and return an error from Start(), letting the caller decide whether to retry. The watcher already has its own internal reconnection logic once running, so the initial StartFromSequenceId failure is the only case that falls through the cracks.
Alternatively, we could add a retry loop around StartFromSequenceId in Start() with bounded attempts, similar to how OpLogWatcher::TryReconnect works internally. Will fix this.
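The bounded-retry alternative mentioned at the end could be sketched as follows (all names hypothetical; the real backoff and logging are omitted). Start() would call this and surface the failure instead of starting the background threads:

```cpp
#include <cassert>
#include <functional>

// Hypothetical sketch: retry the initial watch start a bounded number
// of times before giving up, so Start() can fail fast rather than
// leaving the state machine stuck in RECONNECTING.
bool StartWithRetry(const std::function<bool()>& start_watcher,
                    int max_attempts) {
    for (int attempt = 1; attempt <= max_attempts; ++attempt) {
        if (start_watcher()) return true;
        // In the real service, a backoff sleep would go here.
    }
    return false;  // caller decides whether to retry or abort
}
```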

@whybeyoung
Collaborator

Shall we add some docs for this feature?

@Libotry Libotry force-pushed the br_main_0207_standby_oplog_interface branch from c966d90 to 1db2055 on March 3, 2026 09:24
@Libotry Libotry force-pushed the br_main_0207_standby_oplog_interface branch from 1db2055 to b66079b on March 3, 2026 11:43
Comment on lines +185 to +189
if (!enable_latest_seq_batch_update_) {
    // Direct update (may be slow, but it's what config asked for)
    // Warning: This is now done AFTER op log write, which is correct order.
    return UpdateLatestSequenceId(entry.sequence_id);
}
Collaborator

Could we append the /latest key-value pair to the same batch in FlushBatch, so both the OpLog entries and the pointer update land in a single Raft proposal?

// In FlushBatch(), after preparing keys/values from current_batch:
keys.push_back(BuildLatestKey());
values.push_back(std::to_string(max_seq_id));
EtcdHelper::BatchPut(keys, values);  // single Raft proposal

It seems that the implementation now roughly doubles the etcd write amplification for every batch.

Collaborator Author

Good point. We've already moved the /latest update into FlushBatch() (line 286), so it's now 2 Raft proposals per batch (1 BatchCreate + 1 Put for /latest) instead of N+1.

Merging into a single proposal isn't feasible with the current Go wrapper — EtcdStoreBatchCreateWrapper uses Txn(If CreateRevision==0), which would fail on the pre-existing /latest key. This would require a new Go-level mixed-Txn wrapper.

The practical impact of 2 vs 1 proposal is minimal. If a crash occurs between them, GetMaxSequenceId() (key-scan fallback) ensures no data loss during failover. Will track the single-proposal optimization as a follow-up once we add a mixed-Txn Go wrapper.

Comment on lines +201 to +218
void EtcdOpLogStore::BatchWriteThread() {
while (batch_write_running_.load()) {
{
std::unique_lock<std::mutex> lock(batch_mutex_);
if (pending_batch_.empty()) {
// Wait for signal or timeout (Group Commit time window)
cv_batch_updated_.wait_for(
lock, std::chrono::milliseconds(kOpLogBatchTimeoutMs));
}

if (!batch_write_running_.load() && pending_batch_.empty()) {
break;
}
}

FlushBatch();
}
}
Collaborator

Now the pipeline is like:

Time: |--accumulate--|---etcd IO---|--accumulate--|---etcd IO---|
      batch_1        batch_1        batch_2        batch_2

Ideally, we want accumulation and IO to overlap:

Time: |--accumulate--|--accumulate--|--accumulate--|
      batch_1        batch_2        batch_3
                     |---etcd IO---|---etcd IO---|
                     batch_1        batch_2

Suggestion (for future optimization): Consider a double-buffering or async-submit approach:

Double-buffer: Swap pending_batch_ into a local buffer (already done), then submit the IO on a separate thread or via async etcd client, allowing new entries to accumulate into pending_batch_ concurrently.
Async etcd client: Use the gRPC async API or a thread pool to submit BatchPut without blocking the accumulation loop.

This is a larger refactor. Not blocking this PR.

Comment on lines +271 to +275
auto now = std::chrono::steady_clock::now();
std::lock_guard<std::mutex> lock(mutex_);
return std::chrono::duration_cast<std::chrono::milliseconds>(
now - state_enter_time_);
}
Collaborator

GetTimeInCurrentState() captures now before acquiring mutex_, but state_enter_time_ is updated inside ProcessEvent() while holding the same mutex. If a state transition happens between the now capture and the lock acquisition, state_enter_time_ can be newer than now, producing a negative duration that wraps to a huge value (unsigned).

Consider move now inside the lock.

Collaborator Author

fixed

Comment on lines +222 to +224
if (transition_history_.size() > kMaxHistorySize) {
transition_history_.erase(transition_history_.begin());
}
Collaborator

When the history exceeds kMaxHistorySize (1000), erasing the first element copies all remaining elements. Under rapid state transitions (e.g., RECONNECTING flapping), this becomes O(n) per transition.

Should we use std::deque with pop_front() (O(1) amortized), or a fixed-size ring buffer?

Collaborator Author

Technically correct — vector::erase(begin()) is O(n). However, state transitions are inherently low-frequency events (seconds to minutes apart, even during RECONNECTING flapping). With kMaxHistorySize = 1000 and TransitionRecord being a small struct (timestamp + enums), moving 999 elements costs single-digit microseconds — negligible compared to the etcd I/O and LOG() calls in the same code path. Switching to std::deque would also lose contiguous memory layout, which benefits GetTransitionHistory()'s vector copy on return. Will keep as-is for simplicity; happy to revisit if profiling ever shows this as a hotspot.

* ┌─────────────┐ │
* │ PROMOTED │───────────────────────────────────┘
* └─────────────┘
*/
Collaborator

The ASCII state diagram in the header shows:

│   SYNCING   │─── Error/Gap ───►│RECOVERING│

But the actual transition table sends SYNC_FAILED and DISCONNECTED from SYNCING to RECONNECTING, not RECOVERING. Either the diagram or the code is wrong — please reconcile.

Collaborator Author

The diagram has been fixed to match the transition table.

Comment on lines +147 to +149
config_.primary_address = primary_address;
etcd_endpoints_ = etcd_endpoints;
cluster_id_ = cluster_id;
Collaborator

Should we defer field assignment until after successful connection(after ConnectToEtcdStoreClient)?

If ConnectToEtcdStoreClient fails, CONNECTION_FAILED is sent and the method returns an error — but those fields retain values from this failed attempt.

Collaborator Author

These fields are unconditionally overwritten on every Start() call (L147-149), so stale values from a failed attempt are always replaced by the next Start(). Additionally, no code path reads these fields in FAILED state for any meaningful operation. Keeping them set is actually useful for debugging (logging which endpoint/cluster failed) and avoids complicating a potential retry flow.

etcd_oplog_store_ = etcd_oplog_store;
}

uint64_t OpLogManager::Append(OpType type, const std::string& key,
Collaborator

Could you clarify the intended semantics between Append() and AppendAndPersist() for REMOVE/PUT_REVOKE?

In Append(), non-PUT_END entries use sync=true, but if WriteOpLog fails, it only logs a warning and still returns a sequence_id.
AppendAndPersist(), on the other hand, surfaces the error to the caller and enables retry.

Do we guarantee that all REMOVE/PUT_REVOKE paths use AppendAndPersist() only?
If any path still uses Append(), is there a risk of “local delete succeeded but delete OpLog was not durably persisted”?

Would it make sense to either:

  1. Disallow REMOVE/PUT_REVOKE in Append() to prevent misuse, or
  2. Make Append() fail/propagate error for those operation types when persistence fails?

Collaborator Author

Thanks for the observation. The upper-layer callers (not part of this PR) already ensure that REMOVE/PUT_REVOKE operations go through AppendAndPersist() rather than Append(), so there is no risk of silent persistence failure in practice.

(void)EtcdHelper::WaitWatchWithPrefixStopped(watch_prefix.c_str(),
watch_prefix.size(),
/*timeout_ms=*/5000);
NotifyStateEvent(StandbyEvent::WATCH_BROKEN);
Collaborator

I think there is a state-machine / watcher event mismatch here.

OpLogWatcher emits WATCH_BROKEN when the watch fails, then emits RECOVERY_SUCCESS and eventually WATCH_HEALTHY after reconnect.
However, in StandbyStateMachine, RECONNECTING only accepts CONNECTED (plus STOP/FATAL/MAX_ERRORS), not RECOVERY_SUCCESS or WATCH_HEALTHY.

So after a normal watch break, the service can enter RECONNECTING and never transition back to WATCHING, even though watch/apply has actually recovered.

Could we align this contract by either:

  1. making RECONNECTING accept RECOVERY_SUCCESS / WATCH_HEALTHY, or
  2. having watcher emit CONNECTED on successful reconnect?

Collaborator Author

Updated as per your suggestion.

Comment on lines +87 to +90
if (!ReadOpLogSince(read_seq_id, batch, rev)) {
last_read_rev = 0;
break;
}
Collaborator

If ReadOpLogSince(read_seq_id, batch, rev) fails, the code sets last_read_rev = 0 and breaks, but the function still continues to start the watch thread and returns true.

This means startup can be treated as successful (HotStandbyService moves to SYNC_COMPLETE/WATCHING) even though historical replay was incomplete. With last_read_rev == 0, next_watch_revision_ falls back to 0, so watch may start “from now” and silently miss older OpLog entries.

Collaborator Author

fixed

(void)StartFromSequenceId(last_processed_sequence_id_.load());
}

bool OpLogWatcher::StartFromSequenceId(uint64_t start_seq_id) {
Collaborator

Could you clarify the intended logic in StartFromSequenceId() here?

read_seq_id and last_processed_sequence_id_ are only advanced when applier_->ApplyOpLogEntry(e) returns true. From the current OpLogApplier implementation, false can also mean “buffered for later because of a gap”, not only a hard failure.

Can you explain the expected behavior for these cases during initial sync?

  • Why is the read cursor only advanced on successful apply?
  • If an entry is buffered in pending_entries_ (or otherwise not applied immediately), how do we avoid re-reading the same range on the next ReadOpLogSince() call?
  • What invariant is last_processed_sequence_id_ supposed to represent here before we transition to watch mode?

I may be missing an assumption, but as written it looks possible to either keep re-reading the same entries or carry a gap into the watch phase.

Collaborator Author

Good observation, but no fix needed in this PR. read_seq_id is only advanced on successful apply, which means if there is a sequence gap in etcd, the same range could be re-read. However this does not cause an infinite loop because the batch will be smaller than kSyncBatchSize when etcd has no more data, so the loop exits normally. The redundant reads are idempotent (pending_entries_ map overwrites with same key). Any remaining gaps are resolved during the watch phase via ProcessPendingEntries. We can optimize the cursor advancement as a follow-up.

@00fish0
Collaborator

00fish0 commented Mar 13, 2026

Great work on the fixes! I think we are almost there. Could you please rebase onto the latest main to resolve the conflict in CMakeLists.txt?
