Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion cmd/cloudflared/tunnel/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,15 @@ func StartServer(
ctx, cancel := context.WithCancel(c.Context)
defer cancel()

go waitForSignal(graceShutdownC, log)
// reloadC is used to trigger configuration reloads via SIGHUP.
// Channel is created here but waitForSignal is started later, after localWatcher
// is ready to consume from reloadC (to avoid race condition).
var reloadC chan struct{}
configPath := c.String("config")
if configPath != "" && c.String(TunnelTokenFlag) == "" {
// Only enable hot reload for locally configured tunnels (not token-based)
reloadC = make(chan struct{}, 1)
}

if c.IsSet(cfdflags.ProxyDns) {
dnsReadySignal := make(chan struct{})
Expand Down Expand Up @@ -489,6 +497,20 @@ func StartServer(
return err
}

// Start local config watcher for hot reload if enabled
if reloadC != nil {
localWatcher := orchestration.NewLocalConfigWatcher(orchestrator, configPath, log)
readyC := localWatcher.Run(ctx, reloadC)
<-readyC // Wait until watcher is ready to receive signals
} else if configPath == "" {
log.Debug().Msg("Configuration hot reload disabled: no config file specified")
} else {
log.Debug().Msg("Configuration hot reload disabled: token-based tunnel")
}

// Start signal handler after localWatcher is ready to avoid race condition
go waitForSignal(graceShutdownC, reloadC, log)

metricsListener, err := metrics.CreateMetricsListener(&listeners, c.String("metrics"))
if err != nil {
log.Err(err).Msg("Error opening metrics server listener")
Expand Down
36 changes: 28 additions & 8 deletions cmd/cloudflared/tunnel/signal.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,36 @@ import (
"github.com/rs/zerolog"
)

// waitForSignal closes graceShutdownC to indicate that we should start graceful shutdown sequence
func waitForSignal(graceShutdownC chan struct{}, logger *zerolog.Logger) {
// waitForSignal handles OS signals for graceful shutdown and configuration reload.
// It closes graceShutdownC on SIGTERM/SIGINT to trigger graceful shutdown.
// If reloadC is provided, SIGHUP will send a reload signal instead of being ignored.
func waitForSignal(graceShutdownC chan struct{}, reloadC chan<- struct{}, logger *zerolog.Logger) {
signals := make(chan os.Signal, 10)
signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT)
signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT, syscall.SIGHUP)
defer signal.Stop(signals)

select {
case s := <-signals:
logger.Info().Msgf("Initiating graceful shutdown due to signal %s ...", s)
close(graceShutdownC)
case <-graceShutdownC:
for {
select {
case s := <-signals:
switch s {
case syscall.SIGHUP:
if reloadC != nil {
logger.Info().Msg("Received SIGHUP, triggering configuration reload")
select {
case reloadC <- struct{}{}:
default:
logger.Warn().Msg("Configuration reload already in progress, skipping")
}
} else {
logger.Info().Msg("Received SIGHUP but hot reload is not enabled for this tunnel")
}
case syscall.SIGTERM, syscall.SIGINT:
logger.Info().Msgf("Initiating graceful shutdown due to signal %s ...", s)
close(graceShutdownC)
return
}
case <-graceShutdownC:
return
}
}
}
114 changes: 113 additions & 1 deletion cmd/cloudflared/tunnel/signal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,123 @@ func TestSignalShutdown(t *testing.T) {
}
})

waitForSignal(graceShutdownC, &log)
waitForSignal(graceShutdownC, nil, &log)
assert.True(t, channelClosed(graceShutdownC))
}
}

func TestSignalSIGHUP_WithReloadChannel(t *testing.T) {
log := zerolog.Nop()

graceShutdownC := make(chan struct{})
reloadC := make(chan struct{}, 1)

go func() {
// sleep for a tick to prevent sending signal before calling waitForSignal
time.Sleep(tick)
_ = syscall.Kill(syscall.Getpid(), syscall.SIGHUP)
// Give time for signal to be processed
time.Sleep(tick)
// Send SIGTERM to exit waitForSignal
_ = syscall.Kill(syscall.Getpid(), syscall.SIGTERM)
}()

time.AfterFunc(time.Second, func() {
select {
case <-graceShutdownC:
default:
close(graceShutdownC)
t.Fatal("waitForSignal timed out")
}
})

waitForSignal(graceShutdownC, reloadC, &log)

// Check that reload signal was received
select {
case <-reloadC:
// Expected - SIGHUP should trigger reload
default:
t.Fatal("Expected reload channel to receive signal from SIGHUP")
}
}

func TestSignalSIGHUP_WithoutReloadChannel(t *testing.T) {
log := zerolog.Nop()

graceShutdownC := make(chan struct{})

go func() {
// sleep for a tick to prevent sending signal before calling waitForSignal
time.Sleep(tick)
// Send SIGHUP without reload channel - should be ignored
_ = syscall.Kill(syscall.Getpid(), syscall.SIGHUP)
time.Sleep(tick)
// Send SIGTERM to exit waitForSignal
_ = syscall.Kill(syscall.Getpid(), syscall.SIGTERM)
}()

time.AfterFunc(time.Second, func() {
select {
case <-graceShutdownC:
default:
close(graceShutdownC)
t.Fatal("waitForSignal timed out")
}
})

// Should complete without panic or deadlock
waitForSignal(graceShutdownC, nil, &log)
assert.True(t, channelClosed(graceShutdownC))
}

func TestSignalSIGHUP_ReloadInProgress(t *testing.T) {
log := zerolog.Nop()

graceShutdownC := make(chan struct{})
// Create buffered channel and fill it
reloadC := make(chan struct{}, 1)
reloadC <- struct{}{} // Pre-fill to simulate reload in progress

go func() {
// sleep for a tick to prevent sending signal before calling waitForSignal
time.Sleep(tick)
// Send SIGHUP while reload is "in progress"
_ = syscall.Kill(syscall.Getpid(), syscall.SIGHUP)
time.Sleep(tick)
// Send SIGTERM to exit waitForSignal
_ = syscall.Kill(syscall.Getpid(), syscall.SIGTERM)
}()

time.AfterFunc(time.Second, func() {
select {
case <-graceShutdownC:
default:
close(graceShutdownC)
t.Fatal("waitForSignal timed out")
}
})

// Should complete without blocking (non-blocking send)
waitForSignal(graceShutdownC, reloadC, &log)

// Channel should still have exactly one signal (the pre-filled one)
select {
case <-reloadC:
// Expected - drain the one signal
default:
t.Fatal("Expected reload channel to have signal")
}

// Should be empty now
select {
case <-reloadC:
t.Fatal("Expected reload channel to be empty after draining")
default:
// Expected - channel is empty
}
}

func TestWaitForShutdown(t *testing.T) {
log := zerolog.Nop()

Expand Down
92 changes: 92 additions & 0 deletions orchestration/local_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package orchestration

import (
"encoding/json"
"os"

"github.com/pkg/errors"
"gopkg.in/yaml.v3"

"github.com/cloudflare/cloudflared/config"
"github.com/cloudflare/cloudflared/ingress"
)

// LocalConfigJSON represents the JSON format expected by Orchestrator.UpdateConfig.
// It mirrors ingress.RemoteConfigJSON structure.
type LocalConfigJSON struct {
GlobalOriginRequest *config.OriginRequestConfig `json:"originRequest,omitempty"`
IngressRules []config.UnvalidatedIngressRule `json:"ingress"`
WarpRouting config.WarpRoutingConfig `json:"warp-routing"`
}

// ReadLocalConfig reads and parses the local YAML configuration file.
func ReadLocalConfig(configPath string) (*config.Configuration, error) {
file, err := os.Open(configPath)
if err != nil {
return nil, errors.Wrapf(err, "failed to open config file %s", configPath)
}
defer file.Close()

var cfg config.Configuration
if err := yaml.NewDecoder(file).Decode(&cfg); err != nil {
return nil, errors.Wrapf(err, "failed to parse YAML config file %s", configPath)
}

return &cfg, nil
}

// ConvertLocalConfigToJSON converts local YAML configuration to JSON format
// expected by Orchestrator.UpdateConfig.
func ConvertLocalConfigToJSON(cfg *config.Configuration) ([]byte, error) {
if cfg == nil {
return nil, errors.New("config cannot be nil")
}

localJSON := LocalConfigJSON{
GlobalOriginRequest: &cfg.OriginRequest,
IngressRules: cfg.Ingress,
WarpRouting: cfg.WarpRouting,
}

data, err := json.Marshal(localJSON)
if err != nil {
return nil, errors.Wrap(err, "failed to marshal config to JSON")
}

return data, nil
}

// ValidateLocalConfig validates the local configuration by attempting to parse
// ingress rules. Returns nil if valid.
func ValidateLocalConfig(cfg *config.Configuration) error {
_, err := ConvertAndValidateLocalConfig(cfg)
return err
}

// ConvertAndValidateLocalConfig converts local config to JSON and validates it
// in a single pass. Returns JSON bytes if valid, error otherwise.
func ConvertAndValidateLocalConfig(cfg *config.Configuration) ([]byte, error) {
data, err := ConvertLocalConfigToJSON(cfg)
if err != nil {
return nil, err
}

// Skip validation if no ingress rules
if len(cfg.Ingress) == 0 {
return data, nil
}

// Validate catch-all rule exists (last rule must have empty hostname or "*")
lastRule := cfg.Ingress[len(cfg.Ingress)-1]
if lastRule.Hostname != "" && lastRule.Hostname != "*" {
return nil, errors.New("ingress rules must end with a catch-all rule (empty hostname or '*')")
}

// Validate by attempting to parse as RemoteConfig
var remoteConfig ingress.RemoteConfig
if err := json.Unmarshal(data, &remoteConfig); err != nil {
return nil, errors.Wrap(err, "invalid ingress configuration")
}

return data, nil
}
Loading