Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/check-operator/upgrade/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func isReconcileError(conditions []metav1.Condition) error {
// Run use to run the command.
func (o *Options) Run() error {
o.printOutf("Start checking rolling-update status")
checkFunc := func() (bool, error) {
checkFunc := func(ctx context.Context) (bool, error) {
var agentDone, dcaDone, ccrDone, reconcileError bool
var status common.StatusWrapper
o.printOutf("v2alpha1 is available")
Expand Down Expand Up @@ -213,7 +213,7 @@ func (o *Options) Run() error {
return false, nil
}

return wait.Poll(o.checkPeriod, o.checkTimeout, checkFunc)
return wait.PollUntilContextTimeout(context.Background(), o.checkPeriod, o.checkTimeout, false, checkFunc)
}

func (o *Options) isAgentDone(status *v2alpha1.DaemonSetStatus) bool {
Expand Down
5 changes: 2 additions & 3 deletions cmd/helpers/secrets/secrets.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"errors"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -44,7 +43,7 @@ type secretsRequest struct {

// ReadSecrets implements a secrets reader from a directory/mount
func readSecrets(r io.Reader, w io.Writer, dir string) error {
in, err := ioutil.ReadAll(r)
in, err := io.ReadAll(r)
if err != nil {
return err
}
Expand Down Expand Up @@ -142,7 +141,7 @@ func readSecretFile(path string) (string, error) {
}

var bytes []byte
bytes, err = ioutil.ReadAll(file)
bytes, err = io.ReadAll(file)
if err != nil {
return "", err
}
Expand Down
9 changes: 5 additions & 4 deletions cmd/kubectl-datadog/agent/check/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ func (o *options) validate(cmd *cobra.Command) error {

// run runs the check command
func (o *options) run(cmd *cobra.Command) error {
ctx := cmd.Context()
var goRoutinesCount int
var pods []corev1.Pod

Expand Down Expand Up @@ -180,7 +181,7 @@ func (o *options) run(cmd *cobra.Command) error {
if isCLCRunner(pod) {
container = "cluster-checks-runner"
}
stdOut, stdErr, err := o.execInPod(&pod, statusCmd, container)
stdOut, stdErr, err := o.execInPod(ctx, &pod, statusCmd, container)
if err != nil {
cmd.Println(fmt.Sprintf("Ignoring pod: %s, error: %v", pod.Name, err))
continue
Expand Down Expand Up @@ -220,7 +221,7 @@ func (o *options) run(cmd *cobra.Command) error {
}

// execInPod exec a command in an Agent pod
func (o *options) execInPod(pod *corev1.Pod, cmd []string, container string) (string, string, error) {
func (o *options) execInPod(ctx context.Context, pod *corev1.Pod, command []string, container string) (string, string, error) {
req := o.Clientset.CoreV1().RESTClient().Post().
Resource("pods").
Name(pod.Name).
Expand All @@ -234,7 +235,7 @@ func (o *options) execInPod(pod *corev1.Pod, cmd []string, container string) (st

parameterCodec := runtime.NewParameterCodec(scheme)
req.VersionedParams(&corev1.PodExecOptions{
Command: cmd,
Command: command,
Container: container,
Stdin: false,
Stdout: true,
Expand All @@ -248,7 +249,7 @@ func (o *options) execInPod(pod *corev1.Pod, cmd []string, container string) (st
}

var stdout, stderr bytes.Buffer
err = exec.Stream(remotecommand.StreamOptions{
err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{
Stdin: nil,
Stdout: &stdout,
Stderr: &stderr,
Expand Down
20 changes: 10 additions & 10 deletions cmd/kubectl-datadog/flare/flare.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"errors"
"fmt"
"io"
"io/ioutil"
"mime/multipart"
"net/http"
"net/url"
Expand Down Expand Up @@ -169,6 +168,7 @@ func (o *options) validate() error {

// run runs the flare command
func (o *options) run(cmd *cobra.Command) error {
ctx := cmd.Context()
// Prepare base directory
baseDir := filepath.Join(os.TempDir(), "datadog-operator")
if err := os.MkdirAll(baseDir, os.ModePerm); err != nil {
Expand Down Expand Up @@ -207,7 +207,7 @@ func (o *options) run(cmd *cobra.Command) error {
}

// Collect operator version
if err = o.createVersionFile(leaderPod, baseDir, cmd); err != nil {
if err = o.createVersionFile(ctx, leaderPod, baseDir, cmd); err != nil {
cmd.Println(fmt.Sprintf("Couldn't collect operator version: %v", err))
}

Expand All @@ -218,7 +218,7 @@ func (o *options) run(cmd *cobra.Command) error {
}

// Get the operator version
version, err := o.getVersion(leaderPod)
version, err := o.getVersion(ctx, leaderPod)
if err != nil {
cmd.Println(fmt.Sprintf("Couldn't get operator version: %v", err))

Expand Down Expand Up @@ -359,14 +359,14 @@ func (o *options) createStatusFile(pod *corev1.Pod, dir string, cmd *cobra.Comma
}

// createVersionFile gets the version from the operator pod and stores it in a file
func (o *options) createVersionFile(pod *corev1.Pod, dir string, cmd *cobra.Command) error {
func (o *options) createVersionFile(ctx context.Context, pod *corev1.Pod, dir string, cmd *cobra.Command) error {
if pod == nil {
return errors.New("nil leader pod")
}

// Prepare command and execute it
versionCmd := []string{"bash", "-c", "/usr/local/bin/datadog-operator --version --version-format text"}
version, err := o.execInPod(versionCmd, pod)
version, err := o.execInPod(ctx, versionCmd, pod)
if err != nil {
return err
}
Expand All @@ -375,14 +375,14 @@ func (o *options) createVersionFile(pod *corev1.Pod, dir string, cmd *cobra.Comm
}

// getOperatorVersion gets the version from the operator pod
func (o *options) getVersion(pod *corev1.Pod) (string, error) {
func (o *options) getVersion(ctx context.Context, pod *corev1.Pod) (string, error) {
if pod == nil {
return "", errors.New("nil leader pod")
}

// Prepare command and execute it
versionCmd := []string{"bash", "-c", "/usr/local/bin/datadog-operator --version --version-format json"}
versionJSON, err := o.execInPod(versionCmd, pod)
versionJSON, err := o.execInPod(ctx, versionCmd, pod)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -426,7 +426,7 @@ func (o *options) getLeader() (*corev1.Pod, error) {
}

// execInPod execs a given command in a given pod
func (o *options) execInPod(command []string, pod *corev1.Pod) ([]byte, error) {
func (o *options) execInPod(ctx context.Context, command []string, pod *corev1.Pod) ([]byte, error) {
req := o.Clientset.CoreV1().RESTClient().Post().
Resource("pods").
Name(pod.Name).
Expand Down Expand Up @@ -459,7 +459,7 @@ func (o *options) execInPod(command []string, pod *corev1.Pod) ([]byte, error) {
}

var stdout bytes.Buffer
if err := exec.Stream(remotecommand.StreamOptions{Stdout: &stdout}); err != nil {
if err := exec.StreamWithContext(ctx, remotecommand.StreamOptions{Stdout: &stdout}); err != nil {
return []byte{}, err
}

Expand Down Expand Up @@ -524,7 +524,7 @@ func (o *options) sendFlare(archivePath, version string, cmd *cobra.Command) (st
}
}()

body, err := ioutil.ReadAll(r.Body)
body, err := io.ReadAll(r.Body)
if err != nil {
return "", err
}
Expand Down
3 changes: 1 addition & 2 deletions hack/generate-docs/generate-docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"os"
"sort"
"strings"
Expand Down Expand Up @@ -217,7 +216,7 @@ func mustReadFile(path string) []byte {
}
}()

b, err := ioutil.ReadAll(f)
b, err := io.ReadAll(f)
if err != nil {
panic(fmt.Sprintf("cannot read file %q: %s", path, err))
}
Expand Down
15 changes: 9 additions & 6 deletions pkg/controller/utils/datadog/metrics_forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,10 +181,13 @@ func (mf *metricsForwarder) start(wg *sync.WaitGroup) {

mf.logger.Info("Starting Datadog metrics forwarder")

// wait.PollImmediateUntil is blocking until mf.connectToDatadogAPI returns true or stopChan is closed
// wait.PollImmediateUntil keeps retrying to connect to the Datadog API without returning an error
// wait.PollImmediateUntil returns an error only when stopChan is closed
if err := wait.PollImmediateUntil(mf.retryInterval, mf.connectToDatadogAPI, mf.stopChan); errors.Is(err, wait.ErrWaitTimeout) {
// Create a context that gets cancelled when stopChan is closed
ctx := wait.ContextForChannel(mf.stopChan)

// wait.PollUntilContextCancel is blocking until mf.connectToDatadogAPI returns true or context is cancelled
// wait.PollUntilContextCancel keeps retrying to connect to the Datadog API without returning an error
// wait.PollUntilContextCancel returns an error only when context is cancelled
if err := wait.PollUntilContextCancel(ctx, mf.retryInterval, true, mf.connectToDatadogAPI); errors.Is(err, context.Canceled) {
// stopChan was closed while trying to connect to Datadog API
// The metrics forwarder stopped by the ForwardersManager
mf.logger.Info("Shutting down Datadog metrics forwarder")
Expand Down Expand Up @@ -372,8 +375,8 @@ func (mf *metricsForwarder) setupFromDDAI(ddai *v1alpha1.DatadogAgentInternal) e
}

// connectToDatadogAPI ensures the connection to the Datadog API is valid
// implements wait.ConditionFunc and never returns error to keep retrying
func (mf *metricsForwarder) connectToDatadogAPI() (bool, error) {
// implements wait.ConditionWithContextFunc and never returns error to keep retrying
func (mf *metricsForwarder) connectToDatadogAPI(ctx context.Context) (bool, error) {
var err error
err = mf.setup()

Expand Down
Loading