Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 20 additions & 37 deletions fdbclient/admin_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (command cliCommand) getTimeout() time.Duration {
return command.timeout
}

return DefaultCLITimeout
return DefaultTimeout
}

// getVersion returns the versions defined in the command or if not present returns the running version of the
Expand Down Expand Up @@ -307,10 +307,10 @@ func (client *cliAdminClient) runCommand(command cliCommand) (string, error) {
func (client *cliAdminClient) getStatusFromCli(
checkForProcesses bool,
) (*fdbv1beta2.FoundationDBStatus, error) {
// Always use the max timeout here. Otherwise we will retry multiple times with an increasing timeout. As the
// Always use the max timeout here. Otherwise, we will retry multiple times with an increasing timeout. As the
// timeout is only the upper bound using directly the max timeout reduces the calls to a single call.
output, err := client.runCommand(
cliCommand{command: "status json", timeout: client.getTimeout()},
cliCommand{command: "status json", timeout: getMaxTimeout(client.timeout)},
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -350,7 +350,7 @@ func (client *cliAdminClient) getStatus() (*fdbv1beta2.FoundationDBStatus, error
func (client *cliAdminClient) GetStatus() (*fdbv1beta2.FoundationDBStatus, error) {
startTime := time.Now()
// This will call directly the database and fetch the status information from the system key space.
status, err := getStatusFromDB(client.fdbLibClient, client.log, client.getTimeout())
status, err := getStatusFromDB(client.fdbLibClient, client.log, getMaxTimeout(client.timeout))
// There is a limitation in the multi version client if the cluster is only partially upgraded e.g. because not
// all fdbserver processes are restarted, then the multi version client sometimes picks the wrong version
// to connect to the cluster. This will result in an empty status only reporting the unreachable coordinators.
Expand Down Expand Up @@ -504,7 +504,7 @@ func (client *cliAdminClient) ExcludeProcessesWithNoWait(
}

// Ensure we are stopping the check after the timeout time.
timeout := time.Now().Add(client.timeout)
timeout := time.Now().Add(getMaxTimeout(client.timeout))
for {
err = client.executeTransactionForManagementAPI(func(tr fdb.Transaction) error {
inProgressKeyRange, err := fdb.PrefixRange(
Expand Down Expand Up @@ -570,7 +570,7 @@ func (client *cliAdminClient) ExcludeProcessesWithNoWait(
excludeCommand.WriteString(getAddressStringsWithoutPorts(addresses))

_, err := client.runCommand(
cliCommand{command: excludeCommand.String(), timeout: client.getTimeout()},
cliCommand{command: excludeCommand.String(), timeout: getMaxTimeout(client.timeout)},
)

return err
Expand Down Expand Up @@ -724,7 +724,10 @@ func (client *cliAdminClient) KillProcesses(addresses []fdbv1beta2.ProcessAddres

// Run the kill command once with the max timeout to reduce the risk of multiple recoveries happening.
_, err := client.runCommand(
cliCommand{command: getKillCommand(addresses, false), timeout: client.getTimeout()},
cliCommand{
command: getKillCommand(addresses, false),
timeout: getMaxTimeout(client.timeout),
},
)

return err
Expand All @@ -743,7 +746,10 @@ func (client *cliAdminClient) KillProcessesForUpgrade(addresses []fdbv1beta2.Pro

// Run the kill command once with the max timeout to reduce the risk of multiple recoveries happening.
_, err := client.runCommand(
cliCommand{command: getKillCommand(addresses, true), timeout: client.getTimeout()},
cliCommand{
command: getKillCommand(addresses, true),
timeout: getMaxTimeout(client.timeout),
},
)

return err
Expand Down Expand Up @@ -784,7 +790,7 @@ func (client *cliAdminClient) ChangeCoordinators(
// Increment the coordinator changes counter for this cluster
metrics.CoordinatorChangesCounter.WithLabelValues(client.Cluster.Namespace, client.Cluster.Name).
Inc()
return getConnectionStringFromDB(client.fdbLibClient, client.getTimeout())
return getConnectionStringFromDB(client.fdbLibClient, getDefaultTimeout(client.timeout))
}

// VersionSupported reports whether we can support a cluster with a given
Expand Down Expand Up @@ -1071,15 +1077,6 @@ func (client *cliAdminClient) SetTimeout(timeout time.Duration) {
client.timeout = timeout
}

// getTimeout will return the timeout that is specified for the admin client or otherwise the MaxCliTimeout.
func (client *cliAdminClient) getTimeout() time.Duration {
if client.timeout == 0 {
return MaxCliTimeout
}

return client.timeout
}

// GetProcessesUnderMaintenance will return all process groups that are currently stored to be under maintenance.
// The result is a map with the process group ID as key and the start of the maintenance as value.
func (client *cliAdminClient) GetProcessesUnderMaintenance() (map[fdbv1beta2.ProcessGroupID]int64, error) {
Expand Down Expand Up @@ -1221,7 +1218,6 @@ func (client *cliAdminClient) UpdatePendingForRemoval(
return client.fdbLibClient.updateGlobalCoordinationKeys(
pendingForRemoval,
updates,
client.timeout,
)
}

Expand All @@ -1232,7 +1228,6 @@ func (client *cliAdminClient) UpdatePendingForExclusion(
return client.fdbLibClient.updateGlobalCoordinationKeys(
pendingForExclusion,
updates,
client.timeout,
)
}

Expand All @@ -1243,7 +1238,6 @@ func (client *cliAdminClient) UpdatePendingForInclusion(
return client.fdbLibClient.updateGlobalCoordinationKeys(
pendingForInclusion,
updates,
client.timeout,
)
}

Expand All @@ -1254,7 +1248,6 @@ func (client *cliAdminClient) UpdatePendingForRestart(
return client.fdbLibClient.updateGlobalCoordinationKeys(
pendingForRestart,
updates,
client.timeout,
)
}

Expand All @@ -1265,7 +1258,6 @@ func (client *cliAdminClient) UpdateReadyForExclusion(
return client.fdbLibClient.updateGlobalCoordinationKeys(
readyForExclusion,
updates,
client.timeout,
)
}

Expand All @@ -1276,7 +1268,6 @@ func (client *cliAdminClient) UpdateReadyForInclusion(
return client.fdbLibClient.updateGlobalCoordinationKeys(
readyForInclusion,
updates,
client.timeout,
)
}

Expand All @@ -1287,7 +1278,6 @@ func (client *cliAdminClient) UpdateReadyForRestart(
return client.fdbLibClient.updateGlobalCoordinationKeys(
readyForRestart,
updates,
client.timeout,
)
}

Expand All @@ -1297,7 +1287,6 @@ func (client *cliAdminClient) GetPendingForRemoval(
) (map[fdbv1beta2.ProcessGroupID]time.Time, error) {
return client.fdbLibClient.getGlobalCoordinationKeys(
path.Join(pendingForRemoval, prefix),
client.timeout,
)
}

Expand All @@ -1307,7 +1296,6 @@ func (client *cliAdminClient) GetPendingForExclusion(
) (map[fdbv1beta2.ProcessGroupID]time.Time, error) {
return client.fdbLibClient.getGlobalCoordinationKeys(
path.Join(pendingForExclusion, prefix),
client.timeout,
)
}

Expand All @@ -1317,7 +1305,6 @@ func (client *cliAdminClient) GetPendingForInclusion(
) (map[fdbv1beta2.ProcessGroupID]time.Time, error) {
return client.fdbLibClient.getGlobalCoordinationKeys(
path.Join(pendingForInclusion, prefix),
client.timeout,
)
}

Expand All @@ -1327,7 +1314,6 @@ func (client *cliAdminClient) GetPendingForRestart(
) (map[fdbv1beta2.ProcessGroupID]time.Time, error) {
return client.fdbLibClient.getGlobalCoordinationKeys(
path.Join(pendingForRestart, prefix),
client.timeout,
)
}

Expand All @@ -1337,7 +1323,6 @@ func (client *cliAdminClient) GetReadyForExclusion(
) (map[fdbv1beta2.ProcessGroupID]time.Time, error) {
return client.fdbLibClient.getGlobalCoordinationKeys(
path.Join(readyForExclusion, prefix),
client.timeout,
)
}

Expand All @@ -1347,7 +1332,6 @@ func (client *cliAdminClient) GetReadyForInclusion(
) (map[fdbv1beta2.ProcessGroupID]time.Time, error) {
return client.fdbLibClient.getGlobalCoordinationKeys(
path.Join(readyForInclusion, prefix),
client.timeout,
)
}

Expand All @@ -1357,37 +1341,36 @@ func (client *cliAdminClient) GetReadyForRestart(
) (map[fdbv1beta2.ProcessGroupID]time.Time, error) {
return client.fdbLibClient.getGlobalCoordinationKeys(
path.Join(readyForRestart, prefix),
client.timeout,
)
}

// ClearReadyForRestart removes all the process group IDs for all the process groups that are ready to be restarted.
func (client *cliAdminClient) ClearReadyForRestart() error {
return client.fdbLibClient.clearGlobalCoordinationKeys(readyForRestart, client.timeout)
return client.fdbLibClient.clearGlobalCoordinationKeys(readyForRestart)
}

func (client *cliAdminClient) UpdateProcessAddresses(
updates map[fdbv1beta2.ProcessGroupID][]string,
) error {
return client.fdbLibClient.updateProcessAddresses(updates, client.timeout)
return client.fdbLibClient.updateProcessAddresses(updates)
}

func (client *cliAdminClient) GetProcessAddresses(
prefix string,
) (map[fdbv1beta2.ProcessGroupID][]string, error) {
return client.fdbLibClient.getProcessAddresses(prefix, client.timeout)
return client.fdbLibClient.getProcessAddresses(prefix)
}

// executeTransactionForManagementAPI will run an operation for the management API. This method handles all the common options.
func (client *cliAdminClient) executeTransactionForManagementAPI(
operation func(transaction fdb.Transaction) error,
) error {
return client.fdbLibClient.executeTransactionForManagementAPI(operation, client.timeout)
return client.fdbLibClient.executeTransactionForManagementAPI(operation)
}

// executeTransaction will run a transaction for the target cluster. This method will handle all the common options.
func (client *cliAdminClient) executeTransaction(
operation func(transaction fdb.Transaction) error,
) error {
return client.fdbLibClient.executeTransaction(operation, client.timeout)
return client.fdbLibClient.executeTransaction(operation)
}
42 changes: 33 additions & 9 deletions fdbclient/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,11 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

// DefaultCLITimeout is the default timeout for CLI commands.
var DefaultCLITimeout = 10 * time.Second
// DefaultTimeout is the default timeout for any FDB interactions.
var DefaultTimeout = 10 * time.Second

// MaxCliTimeout is the maximum CLI timeout that will be used for requests that might be slower to respond.
var MaxCliTimeout = 40 * time.Second

const (
defaultTransactionTimeout = 5 * time.Second
)
// MaxTimeout is the maximum timeout that will be used for requests that might be slower to respond.
var MaxTimeout = 40 * time.Second

func parseMachineReadableStatus(
logger logr.Logger,
Expand Down Expand Up @@ -116,7 +112,14 @@ func getFDBDatabase(cluster *fdbv1beta2.FoundationDBCluster) (fdb.Database, erro
return fdb.Database{}, err
}

err = database.Options().SetTransactionTimeout(defaultTransactionTimeout.Milliseconds())
// Sets the default timeout for transaction to the default timeout provided byt the user.
err = database.Options().SetTransactionTimeout(getDefaultTimeout(0).Milliseconds())
if err != nil {
return fdb.Database{}, err
}

// We set a low retry limit as the operator will retry the reconciliation process anyway.
err = database.Options().SetTransactionRetryLimit(1)
if err != nil {
return fdb.Database{}, err
}
Expand Down Expand Up @@ -227,6 +230,27 @@ func cleanConnectionStringOutput(input string) string {
return input[startIdx+1 : endIdx]
}

// getDefaultTimeout will return the timeout that is specified for the client or otherwise the maximum of the DefaultTimeout and MaxTimeout.
// If the provided timeout is higher than MaxTimeout, MaxTimeout will be returned.
func getDefaultTimeout(timeout time.Duration) time.Duration {
result := timeout
if result == 0 {
result = DefaultTimeout
}

return min(result, MaxTimeout)
}

// getMaxTimeout will return the timeout that is specified for the client or otherwise the MaxTimeout.
// If the provided timeout is higher than MaxTimeout, MaxTimeout will be returned.
func getMaxTimeout(timeout time.Duration) time.Duration {
if timeout == 0 {
return MaxTimeout
}

return min(timeout, MaxTimeout)
}

type realDatabaseClientProvider struct {
// log implementation for logging output
log logr.Logger
Expand Down
Loading
Loading