Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/lk/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,7 @@ var (
ArgsUsage: "[working-dir]",
},
privateLinkCommands,
simulateCommand,
},
},
}
Expand Down
99 changes: 99 additions & 0 deletions cmd/lk/agent_reload.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package main

import (
"fmt"
"net"
"sync"
"time"

agent "github.com/livekit/protocol/livekit/agent"

"github.com/livekit/livekit-cli/v2/pkg/ipc"
)

// reloadServer manages the dev-mode reload protocol between the Go CLI and the
// Python agent processes.
//
// Flow:
//  1. Go → old Python: Go sends a GetRunningJobsRequest and stores the
//     GetRunningJobsResponse it receives (capture, see captureJobs).
//  2. New Python → Go: the new process sends a GetRunningJobsRequest and Go
//     replies with the stored GetRunningJobsResponse (restore, see
//     serveNewProcess).
type reloadServer struct {
	// listener is the IPC listener created by newReloadServer; its address is
	// exposed through addr and it is released by close.
	listener *ipc.Listener
	// mu guards savedJobs.
	mu sync.Mutex
	// savedJobs is the response stored by captureJobs. serveNewProcess takes
	// it and resets the field to nil when it answers a request.
	savedJobs *agent.GetRunningAgentJobsResponse
}

// newReloadServer opens an IPC listener on 127.0.0.1:0 (port 0, presumably so
// the OS picks a free port — confirm against ipc.Listen) and wraps it in a
// reloadServer. A listen failure is returned wrapped with a "reload server"
// prefix.
func newReloadServer() (*reloadServer, error) {
	listener, listenErr := ipc.Listen("127.0.0.1:0")
	if listenErr != nil {
		return nil, fmt.Errorf("reload server: %w", listenErr)
	}

	server := &reloadServer{listener: listener}
	return server, nil
}

// addr reports the address the reload listener is bound to, in string form.
func (rs *reloadServer) addr() string {
	bound := rs.listener.Addr()
	return bound.String()
}

// captureJobs sends a GetRunningJobsRequest to the old Python process over conn
// and stores the response so that serveNewProcess can hand it to the next
// process.
//
// The exchange is best-effort: every failure is logged and rs.savedJobs is left
// untouched. The request/response round trip is bounded by a 1.5s deadline on
// conn, which is cleared again before returning.
func (rs *reloadServer) captureJobs(conn net.Conn) {
	if err := conn.SetDeadline(time.Now().Add(1500 * time.Millisecond)); err != nil {
		// Without a deadline the read below could block indefinitely and stall
		// the reload, so give up on capturing instead.
		fmt.Printf("reload: failed to set capture deadline: %v\n", err)
		return
	}
	defer func() {
		// Clearing the deadline is best-effort; there is nothing useful to do
		// here if it fails.
		_ = conn.SetDeadline(time.Time{})
	}()

	req := &agent.AgentDevMessage{
		Message: &agent.AgentDevMessage_GetRunningJobsRequest{
			GetRunningJobsRequest: &agent.GetRunningAgentJobsRequest{},
		},
	}
	if err := ipc.WriteProto(conn, req); err != nil {
		fmt.Printf("reload: failed to send capture request: %v\n", err)
		return
	}

	resp := &agent.AgentDevMessage{}
	if err := ipc.ReadProto(conn, resp); err != nil {
		fmt.Printf("reload: failed to read capture response: %v\n", err)
		return
	}

	jobs := resp.GetGetRunningJobsResponse()
	if jobs == nil {
		// The peer answered with a different message type; surface it rather
		// than silently dropping the reply.
		fmt.Printf("reload: capture response did not contain running jobs\n")
		return
	}

	rs.mu.Lock()
	rs.savedJobs = jobs
	rs.mu.Unlock()
	fmt.Printf("reload: captured %d running job(s)\n", len(jobs.Jobs))
}

// serveNewProcess handles a GetRunningJobsRequest from the new Python process
// on conn, replying with the jobs previously stored by captureJobs.
//
// If the first message cannot be read, or is not a GetRunningJobsRequest, the
// function returns without replying. When nothing has been captured an empty
// GetRunningJobsResponse is sent. The stored jobs are consumed by a successful
// reply; if the reply cannot be written they are put back (unless a newer
// capture has been stored in the meantime) so that a later request can still
// receive them.
func (rs *reloadServer) serveNewProcess(conn net.Conn) {
	req := &agent.AgentDevMessage{}
	if err := ipc.ReadProto(conn, req); err != nil {
		return
	}
	if req.GetGetRunningJobsRequest() == nil {
		return
	}

	// Take ownership of the captured jobs under the lock.
	rs.mu.Lock()
	captured := rs.savedJobs
	rs.savedJobs = nil
	rs.mu.Unlock()

	saved := captured
	if saved == nil {
		saved = &agent.GetRunningAgentJobsResponse{}
	}

	resp := &agent.AgentDevMessage{
		Message: &agent.AgentDevMessage_GetRunningJobsResponse{
			GetRunningJobsResponse: saved,
		},
	}
	if err := ipc.WriteProto(conn, resp); err != nil {
		fmt.Printf("reload: failed to send restore response: %v\n", err)
		if captured != nil {
			// The new process never received the jobs; keep them available
			// for the next request instead of dropping them.
			rs.mu.Lock()
			if rs.savedJobs == nil {
				rs.savedJobs = captured
			}
			rs.mu.Unlock()
		}
		return
	}

	if len(saved.Jobs) > 0 {
		fmt.Printf("reload: restored %d job(s) to new process\n", len(saved.Jobs))
	}
}

// close shuts down the reload listener and returns whatever error the listener
// reports.
func (rs *reloadServer) close() error {
	closeErr := rs.listener.Close()
	return closeErr
}
254 changes: 254 additions & 0 deletions cmd/lk/agent_run.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
// Copyright 2025 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"context"
"fmt"
"os"
"os/signal"
"sync"
"syscall"

"github.com/urfave/cli/v3"

"github.com/livekit/livekit-cli/v2/pkg/agentfs"
)

func init() {
AgentCommands[0].Commands = append(AgentCommands[0].Commands, startCommand, devCommand)
}

// agentRunFlags are the flags shared by the "start" and "dev" agent commands.
var agentRunFlags = []cli.Flag{
	// Path of the agent entrypoint file; read by detectProject.
	&cli.StringFlag{
		Name:  "entrypoint",
		Usage: "Agent entrypoint `FILE` (default: auto-detect)",
	},
	// Server URL; LIVEKIT_URL is declared as an environment source.
	&cli.StringFlag{
		Name:    "url",
		Usage:   "LiveKit server `URL`",
		Sources: cli.EnvVars("LIVEKIT_URL"),
	},
	// API key; LIVEKIT_API_KEY is declared as an environment source.
	&cli.StringFlag{
		Name:    "api-key",
		Usage:   "LiveKit API `KEY`",
		Sources: cli.EnvVars("LIVEKIT_API_KEY"),
	},
	// API secret; LIVEKIT_API_SECRET is declared as an environment source.
	&cli.StringFlag{
		Name:    "api-secret",
		Usage:   "LiveKit API `SECRET`",
		Sources: cli.EnvVars("LIVEKIT_API_SECRET"),
	},
	// Log level passed to the agent subprocess as --log-level by buildCLIArgs.
	&cli.StringFlag{
		Name:  "log-level",
		Usage: "Log level (TRACE, DEBUG, INFO, WARN, ERROR)",
	},
}

// startCommand is the "start" subcommand: it runs the agent through
// runAgentStart using the shared agentRunFlags.
var startCommand = &cli.Command{
	Name:   "start",
	Usage:  "Run an agent in production mode",
	Flags:  agentRunFlags,
	Action: runAgentStart,
}

// devCommand is the "dev" subcommand: it runs the agent through runAgentDev
// using the shared agentRunFlags plus a --no-reload switch.
var devCommand = &cli.Command{
	Name:  "dev",
	Usage: "Run an agent in development mode with auto-reload",
	// The full slice expression caps the capacity at the current length, so
	// append always copies into a fresh backing array. This keeps the extra
	// flag from ever being written into storage shared with agentRunFlags
	// (and therefore with startCommand), even if agentRunFlags later gains
	// spare capacity.
	Flags: append(agentRunFlags[:len(agentRunFlags):len(agentRunFlags)], &cli.BoolFlag{
		Name:  "no-reload",
		Usage: "Disable auto-reload on file changes",
	}),
	Action: runAgentDev,
}

// resolveCredentials builds the credential arguments (--url, --api-key,
// --api-secret) that are passed to the agent subprocess.
//
// Values come from the command's flags first. If any of the three is empty,
// the project details are loaded (with ignoreURL prepended to loadOpts) and
// used to fill only the missing values. Settings that are still empty are
// omitted from the result. An error from loadProjectDetails is returned as-is.
func resolveCredentials(cmd *cli.Command, loadOpts ...loadOption) ([]string, error) {
	serverURL := cmd.String("url")
	key := cmd.String("api-key")
	secret := cmd.String("api-secret")

	// Only consult the project config when the flags leave a gap.
	if allSet := serverURL != "" && key != "" && secret != ""; !allSet {
		opts := append([]loadOption{ignoreURL}, loadOpts...)
		project, err := loadProjectDetails(cmd, opts...)
		if err != nil {
			return nil, err
		}
		if project != nil {
			if serverURL == "" {
				serverURL = project.URL
			}
			if key == "" {
				key = project.APIKey
			}
			if secret == "" {
				secret = project.APISecret
			}
		}
	}

	var args []string
	pairs := [...][2]string{
		{"--url", serverURL},
		{"--api-key", key},
		{"--api-secret", secret},
	}
	for _, pair := range pairs {
		if flag, value := pair[0], pair[1]; value != "" {
			args = append(args, flag, value)
		}
	}
	return args, nil
}

// detectProject locates the agent project starting from the current directory
// and resolves its entrypoint.
//
// It returns the project directory, the detected project type and the
// entrypoint found by findEntrypoint (which receives the --entrypoint flag
// value). Non-Python project types are rejected with an error.
func detectProject(cmd *cli.Command) (string, agentfs.ProjectType, string, error) {
	root, kind, err := agentfs.DetectProjectRoot(".")
	switch {
	case err != nil:
		return "", "", "", err
	case !kind.IsPython():
		return "", "", "", fmt.Errorf("currently only supports Python agents (detected: %s)", kind)
	}

	entry, err := findEntrypoint(root, cmd.String("entrypoint"), kind)
	if err != nil {
		return "", "", "", err
	}
	return root, kind, entry, nil
}

// buildCLIArgs assembles the argument list for the agent subprocess: subcmd
// first, then --log-level when the flag is set, then the credential arguments
// produced by resolveCredentials. An error from resolveCredentials is
// returned unchanged.
func buildCLIArgs(subcmd string, cmd *cli.Command, loadOpts ...loadOption) ([]string, error) {
	level := cmd.String("log-level")

	credArgs, err := resolveCredentials(cmd, loadOpts...)
	if err != nil {
		return nil, err
	}

	out := make([]string, 0, 3+len(credArgs))
	out = append(out, subcmd)
	if level != "" {
		out = append(out, "--log-level", level)
	}
	return append(out, credArgs...), nil
}

// runAgentStart is the action of the "start" command. It detects the project,
// launches the agent subprocess with the "start" subcommand, forwards
// SIGINT/SIGTERM to it and blocks until the agent exits.
//
// Errors from project detection, argument building or agent startup are
// returned; once the agent is running the function returns nil when it exits.
func runAgentStart(ctx context.Context, cmd *cli.Command) error {
	projectDir, projectType, entrypoint, err := detectProject(cmd)
	if err != nil {
		return err
	}

	cliArgs, err := buildCLIArgs("start", cmd, quietOutput)
	if err != nil {
		return err
	}

	agent, err := startAgent(AgentStartConfig{
		Dir:           projectDir,
		Entrypoint:    entrypoint,
		ProjectType:   projectType,
		CLIArgs:       cliArgs,
		ForwardOutput: os.Stdout,
	})
	if err != nil {
		return err
	}

	// Take over signal handling from the global NotifyContext.
	signal.Reset(syscall.SIGINT, syscall.SIGTERM)
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	defer signal.Stop(sigCh)

	// stopForwarding ends the forwarding goroutine once the agent has exited;
	// sigCh is never closed, so ranging over it would block forever.
	stopForwarding := make(chan struct{})
	defer close(stopForwarding)

	// Forward every signal to the agent — Python decides
	// first = graceful shutdown, second = force exit.
	go func() {
		for {
			select {
			case <-sigCh:
				agent.Shutdown()
			case <-stopForwarding:
				return
			}
		}
	}()

	// Wait for agent to exit
	<-agent.exitCh
	return nil
}

// runAgentDev is the action of the "dev" command. It detects the project and
// launches the agent subprocess with the "start" subcommand, defaulting the
// log level to DEBUG when --log-level is not given.
//
// With --no-reload the agent is started once and the function blocks until it
// exits. Otherwise an agent watcher is created and run until it returns; the
// first SIGINT/SIGTERM closes the watcher's done channel, and every signal is
// forwarded to the watcher's current agent.
func runAgentDev(ctx context.Context, cmd *cli.Command) error {
	projectDir, projectType, entrypoint, err := detectProject(cmd)
	if err != nil {
		return err
	}

	cliArgs, err := buildCLIArgs("start", cmd, outputToStderr)
	if err != nil {
		return err
	}
	if cmd.String("log-level") == "" {
		cliArgs = append(cliArgs, "--log-level", "DEBUG")
	}

	cfg := AgentStartConfig{
		Dir:           projectDir,
		Entrypoint:    entrypoint,
		ProjectType:   projectType,
		CLIArgs:       cliArgs,
		ForwardOutput: os.Stdout,
	}

	fmt.Fprintf(os.Stderr, "Starting agent in dev mode (%s in %s)...\n", entrypoint, projectDir)

	// Take over signal handling from the global NotifyContext.
	signal.Reset(syscall.SIGINT, syscall.SIGTERM)
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	// Deferred so the error returns below also stop delivering signals into
	// sigCh, which nobody would be reading any more.
	defer signal.Stop(sigCh)

	// forwardSignals runs onSignal for every received signal until the
	// returned stop function is called. sigCh is never closed, so the
	// goroutine needs its own stop channel to terminate.
	forwardSignals := func(onSignal func()) (stop func()) {
		quit := make(chan struct{})
		go func() {
			for {
				select {
				case <-sigCh:
					onSignal()
				case <-quit:
					return
				}
			}
		}()
		return func() { close(quit) }
	}

	if cmd.Bool("no-reload") {
		// No reload — just run like start
		agent, err := startAgent(cfg)
		if err != nil {
			return err
		}

		stop := forwardSignals(func() { agent.Shutdown() })
		defer stop()

		<-agent.exitCh
		return nil
	}

	// Dev mode with file watching
	watcher, err := newAgentWatcher(cfg)
	if err != nil {
		return err
	}

	done := make(chan struct{})
	var doneOnce sync.Once

	// Forward signals to the current agent, and stop the watcher on first signal.
	stop := forwardSignals(func() {
		doneOnce.Do(func() { close(done) })
		// Read the field once so the nil check and the call see the same
		// value.
		// NOTE(review): watcher.agent is read here without synchronization;
		// if watcher.Run reassigns it concurrently this is a data race —
		// confirm against the watcher implementation.
		if current := watcher.agent; current != nil {
			current.Shutdown()
		}
	})
	defer stop()

	return watcher.Run(done)
}
Loading