Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion backend/ee-repo-ref.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
f9549c813b3dba5324ea9d1edacc8756a6d699bf
c3c543f4c60a8c4dfe0d912c79a051376fb091a9
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE worker_ping DROP COLUMN IF EXISTS uses_batch_http_pull;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE worker_ping ADD COLUMN IF NOT EXISTS uses_batch_http_pull BOOLEAN NOT NULL DEFAULT false;
82 changes: 80 additions & 2 deletions backend/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@ use windmill_common::{
MODE_AND_ADDONS,
},
worker::{
is_native_mode_from_env, reload_custom_tags_setting, Connection, HUB_CACHE_DIR,
HUB_RT_CACHE_DIR, NATIVE_MODE_RESOLVED, TMP_LOGS_DIR, WINDMILL_DIR, WORKER_GROUP,
is_native_mode_from_env, reload_custom_tags_setting, Connection, HttpClient, HUB_CACHE_DIR,
HUB_RT_CACHE_DIR, NATIVE_MODE_RESOLVED, TMP_LOGS_DIR, USES_BATCH_HTTP_PULL, WINDMILL_DIR,
WORKER_GROUP,
},
KillpillSender, DEFAULT_HUB_BASE_URL, METRICS_ENABLED,
};
Expand Down Expand Up @@ -920,6 +921,20 @@ Windmill Community Edition {GIT_VERSION}
default_base_internal_url.clone()
};

// BATCH_PULL_URL: explicit URL for native workers to pull jobs via HTTP.
// In standalone mode (server_mode=true), defaults to the local server.
let batch_pull_url: Option<String> = if is_native_mode_from_env() {
if let Ok(url) = std::env::var("BATCH_PULL_URL") {
Some(url)
} else if server_mode {
Some(default_base_internal_url.clone())
} else {
None
}
} else {
None
};

initial_load(
&conn,
killpill_tx.clone(),
Expand Down Expand Up @@ -1130,6 +1145,30 @@ Windmill Community Edition {GIT_VERSION}
)?;
let mut workers = vec![];

// For native workers, create a self-signed JWT for batch pulling via HTTP.
// Enabled when BATCH_PULL_URL is set (explicitly or auto-detected in standalone mode).
let batch_pull_client = if let Some(ref pull_url) = batch_pull_url {
match create_native_batch_pull_client(pull_url).await {
Ok(client) => {
tracing::info!(
"Native batch pull client created for HTTP pull at {}",
pull_url
);
USES_BATCH_HTTP_PULL
.store(true, std::sync::atomic::Ordering::Relaxed);
Some(client)
}
Err(e) => {
tracing::warn!(
"Failed to create native batch pull client, falling back to SQL pull: {e:#}"
);
None
}
}
} else {
None
};

for i in 0..num_workers {
let suffix = if i == 0 && first_suffix.is_some() {
first_suffix.as_ref().unwrap().clone()
Expand All @@ -1153,6 +1192,7 @@ Windmill Community Edition {GIT_VERSION}
WORKER_GROUP.as_str(),
&suffix,
),
batch_pull_client: batch_pull_client.clone(),
};
workers.push(worker_conn);
}
Expand Down Expand Up @@ -1766,6 +1806,7 @@ fn display_config(envs: &[&str]) {
pub struct WorkerConn {
conn: Connection,
worker_name: String,
batch_pull_client: Option<HttpClient>,
}

pub async fn run_workers(
Expand Down Expand Up @@ -1836,6 +1877,7 @@ pub async fn run_workers(
let wk_conf = &workers[i as usize - 1];
let conn1 = wk_conf.conn.clone();
let worker_name = wk_conf.worker_name.clone();
let batch_pull_client = wk_conf.batch_pull_client.clone();
WORKERS_NAMES.write().await.push(worker_name.clone());
let ip = ip.clone();
let rx = killpill_rxs.pop().unwrap();
Expand All @@ -1858,6 +1900,7 @@ pub async fn run_workers(
rx,
tx,
&base_internal_url,
batch_pull_client.as_ref(),
);

// #[cfg(tokio_unstable)]
Expand All @@ -1876,6 +1919,41 @@ pub async fn run_workers(
Ok(())
}

/// Create an HTTP client for native workers to pull jobs from the local server's batch buffer.
/// Self-signs a JWT with native_mode=true using the same JWT secret the server uses.
async fn create_native_batch_pull_client(base_internal_url: &str) -> anyhow::Result<HttpClient> {
use windmill_common::agent_workers::{build_agent_http_client, AGENT_JWT_PREFIX};
use windmill_common::jwt::encode_with_internal_secret;

#[derive(serde::Serialize)]
struct NativeAgentAuth {
worker_group: String,
tags: Vec<String>,
native_mode: Option<bool>,
exp: usize,
}

let worker_config = windmill_common::worker::WORKER_CONFIG.read().await;
let tags = worker_config.worker_tags.clone();
drop(worker_config);

// Token expires in 30 days — renewed on restart
let exp = (chrono::Utc::now() + chrono::Duration::days(30)).timestamp() as usize;

let claims = NativeAgentAuth {
worker_group: WORKER_GROUP.to_string(),
tags,
native_mode: Some(true),
exp,
};

let jwt = encode_with_internal_secret(claims).await?;
let token = format!("{}{}", AGENT_JWT_PREFIX, jwt);

let suffix = create_default_worker_suffix(&HOSTNAME);
Ok(build_agent_http_client(&suffix, &token, base_internal_url))
}

async fn send_delayed_killpill(tx: &KillpillSender, mut max_delay_secs: u64, context: &str) {
if max_delay_secs == 0 {
max_delay_secs = 1;
Expand Down
2 changes: 1 addition & 1 deletion backend/summarized_schema.txt
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ websocket_trigger: path(char), url(char), script_path(char), is_flow(bool), work
windmill_migrations: name(text), created_at(ts)
worker_group_job_stats: hour(bigint), worker_group(text), script_lang(char), workspace_id(char), job_count(int), total_duration_ms(bigint)
FK: (workspace_id) -> workspace(id)
worker_ping: worker(char), worker_instance(char), ping_at(ts), started_at(ts), ip(char), jobs_executed(int), custom_tags(text[]), worker_group(char), dedicated_worker(char), wm_version(char), current_job_id(uuid), current_job_workspace_id(char), vcpus(bigint), memory(bigint), occupancy_rate(float), memory_usage(bigint), wm_memory_usage(bigint), occupancy_rate_15s(float), occupancy_rate_5m(float), occupancy_rate_30m(float), job_isolation(text), dedicated_workers(text[])
worker_ping: worker(char), worker_instance(char), ping_at(ts), started_at(ts), ip(char), jobs_executed(int), custom_tags(text[]), worker_group(char), dedicated_worker(char), wm_version(char), current_job_id(uuid), current_job_workspace_id(char), vcpus(bigint), memory(bigint), occupancy_rate(float), memory_usage(bigint), wm_memory_usage(bigint), occupancy_rate_15s(float), occupancy_rate_5m(float), occupancy_rate_30m(float), job_isolation(text), dedicated_workers(text[]), native_mode(bool), uses_batch_http_pull(bool)
workspace: id(char), name(char), owner(char), deleted(bool), premium(bool), parent_workspace_id(char)
FK: (parent_workspace_id) -> workspace(id)
workspace_dependencies: id(bigint), name(char), content(text), language(script_lang), description(text), archived(bool), workspace_id(char), created_at(ts)
Expand Down
1 change: 1 addition & 0 deletions backend/tests/nativets_stress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ fn spawn_workers(
rx,
tx2,
&base_internal_url,
None,
)
.await;
};
Expand Down
8 changes: 6 additions & 2 deletions backend/windmill-api-agent-workers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ use windmill_common::DB;
use axum::Router;

#[cfg(not(feature = "private"))]
pub fn global_service(_job_completed_tx: windmill_worker::JobCompletedSender) -> Router {
pub fn global_service(
_job_completed_tx: windmill_worker::JobCompletedSender,
_batch_buffer: Option<()>,
) -> Router {
Router::new()
}

Expand All @@ -31,6 +34,7 @@ pub fn workspaced_service(
Router,
Vec<tokio::task::JoinHandle<()>>,
Option<windmill_worker::JobCompletedSender>,
Option<()>,
) {
use windmill_common::worker::Connection;
use windmill_worker::JobCompletedSender;
Expand All @@ -40,7 +44,7 @@ pub fn workspaced_service(

let router = Router::new();

(router, vec![], Some(job_completed_tx))
(router, vec![], Some(job_completed_tx), None)
}

#[cfg(not(feature = "private"))]
Expand Down
4 changes: 2 additions & 2 deletions backend/windmill-api/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5416,12 +5416,12 @@ async fn add_batch_jobs(
if dedicated_worker && path.is_some() {
windmill_common::worker::dedicated_worker_tag(&w_id, &path.clone().unwrap())
} else {
format!("{}", language.as_str())
language.as_worker_tag(false).to_string()
}
} else if let Some(tag) = batch_info.tag {
tag
} else {
format!("{}", language.as_str())
language.as_worker_tag(false).to_string()
};

let mut tx = user_db.begin(&authed).await?;
Expand Down
17 changes: 11 additions & 6 deletions backend/windmill-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -493,12 +493,16 @@ pub async fn run_server(
};

#[cfg(feature = "agent_worker_server")]
let (agent_workers_router, agent_workers_bg_processor, agent_workers_job_completed_tx) =
if server_mode {
windmill_api_agent_workers::workspaced_service(db.clone(), _base_internal_url.clone())
} else {
(Router::new(), vec![], None)
};
let (
agent_workers_router,
agent_workers_bg_processor,
agent_workers_job_completed_tx,
batch_buffer,
) = if server_mode {
windmill_api_agent_workers::workspaced_service(db.clone(), _base_internal_url.clone())
} else {
(Router::new(), vec![], None, None)
};

#[cfg(feature = "agent_worker_server")]
let agent_cache = Arc::new(AgentCache::new());
Expand Down Expand Up @@ -684,6 +688,7 @@ pub async fn run_server(
{
windmill_api_agent_workers::global_service(
agent_workers_job_completed_tx,
batch_buffer.clone(),
)
.layer(Extension(agent_cache.clone()))
} else {
Expand Down
Loading
Loading