Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9340,6 +9340,13 @@
"PluginListResponse": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"featuredPluginIds": {
"default": [],
"items": {
"type": "string"
},
"type": "array"
},
"marketplaces": {
"items": {
"$ref": "#/definitions/v2/PluginMarketplaceEntry"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6128,6 +6128,13 @@
"PluginListResponse": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"featuredPluginIds": {
"default": [],
"items": {
"type": "string"
},
"type": "array"
},
"marketplaces": {
"items": {
"$ref": "#/definitions/PluginMarketplaceEntry"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,13 @@
}
},
"properties": {
"featuredPluginIds": {
"default": [],
"items": {
"type": "string"
},
"type": "array"
},
"marketplaces": {
"items": {
"$ref": "#/definitions/PluginMarketplaceEntry"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { PluginMarketplaceEntry } from "./PluginMarketplaceEntry";

export type PluginListResponse = { marketplaces: Array<PluginMarketplaceEntry>, remoteSyncError: string | null, };
export type PluginListResponse = { marketplaces: Array<PluginMarketplaceEntry>, remoteSyncError: string | null, featuredPluginIds: Array<string>, };
2 changes: 2 additions & 0 deletions codex-rs/app-server-protocol/src/protocol/v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3093,6 +3093,8 @@ pub struct PluginListParams {
pub struct PluginListResponse {
pub marketplaces: Vec<PluginMarketplaceEntry>,
pub remote_sync_error: Option<String>,
#[serde(default)]
pub featured_plugin_ids: Vec<String>,
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
Expand Down
2 changes: 1 addition & 1 deletion codex-rs/app-server/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ Example with notification opt-out:
- `experimentalFeature/list` — list feature flags with stage metadata (`beta`, `underDevelopment`, `stable`, etc.), enabled/default-enabled state, and cursor pagination. For non-beta flags, `displayName`/`description`/`announcement` are `null`.
- `collaborationMode/list` — list available collaboration mode presets (experimental, no pagination). This response omits built-in developer instructions; clients should either pass `settings.developer_instructions: null` when setting a mode to use Codex's built-in instructions, or provide their own instructions explicitly.
- `skills/list` — list skills for one or more `cwd` values (optional `forceReload`).
- `plugin/list` — list discovered plugin marketplaces and plugin state, including effective marketplace install/auth policy metadata. `interface.category` uses the marketplace category when present; otherwise it falls back to the plugin manifest category. Pass `forceRemoteSync: true` to refresh curated plugin state before listing (**under development; do not call from production clients yet**).
- `plugin/list` — list discovered plugin marketplaces and plugin state, including effective marketplace install/auth policy metadata and best-effort `featuredPluginIds` for the official curated marketplace. `interface.category` uses the marketplace category when present; otherwise it falls back to the plugin manifest category. Pass `forceRemoteSync: true` to refresh curated plugin state before listing (**under development; do not call from production clients yet**).
- `plugin/read` — read one plugin by `marketplacePath` plus `pluginName`, returning marketplace info, a list-style `summary`, manifest descriptions/interface metadata, and bundled skills/apps/MCP server names (**under development; do not call from production clients yet**).
- `skills/changed` — notification emitted when watched local skill files change.
- `app/list` — list available apps.
Expand Down
35 changes: 32 additions & 3 deletions codex-rs/app-server/src/codex_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ use codex_core::models_manager::collaboration_mode_presets::CollaborationModesCo
use codex_core::parse_cursor;
use codex_core::plugins::MarketplaceError;
use codex_core::plugins::MarketplacePluginSource;
use codex_core::plugins::OPENAI_CURATED_MARKETPLACE_NAME;
use codex_core::plugins::PluginInstallError as CorePluginInstallError;
use codex_core::plugins::PluginInstallRequest;
use codex_core::plugins::PluginReadRequest;
Expand Down Expand Up @@ -424,7 +425,10 @@ impl CodexMessageProcessor {
Ok(config) => self
.thread_manager
.plugins_manager()
.maybe_start_curated_repo_sync_for_config(&config),
.maybe_start_curated_repo_sync_for_config(
&config,
self.thread_manager.auth_manager(),
),
Err(err) => warn!("failed to load latest config for curated plugin sync: {err:?}"),
}
}
Expand Down Expand Up @@ -5409,9 +5413,9 @@ impl CodexMessageProcessor {
}
};
let mut remote_sync_error = None;
let auth = self.auth_manager.auth().await;

if force_remote_sync {
let auth = self.auth_manager.auth().await;
match plugins_manager
.sync_plugins_from_remote(&config, auth.as_ref())
.await
Expand Down Expand Up @@ -5443,8 +5447,11 @@ impl CodexMessageProcessor {
};
}

let config_for_marketplace_listing = config.clone();
let plugins_manager_for_marketplace_listing = plugins_manager.clone();
let data = match tokio::task::spawn_blocking(move || {
let marketplaces = plugins_manager.list_marketplaces_for_config(&config, &roots)?;
let marketplaces = plugins_manager_for_marketplace_listing
.list_marketplaces_for_config(&config_for_marketplace_listing, &roots)?;
Ok::<Vec<PluginMarketplaceEntry>, MarketplaceError>(
marketplaces
.into_iter()
Expand Down Expand Up @@ -5490,12 +5497,34 @@ impl CodexMessageProcessor {
}
};

let featured_plugin_ids = if data
.iter()
.any(|marketplace| marketplace.name == OPENAI_CURATED_MARKETPLACE_NAME)
{
match plugins_manager
.featured_plugin_ids_for_config(&config, auth.as_ref())
.await
{
Ok(featured_plugin_ids) => featured_plugin_ids,
Err(err) => {
warn!(
error = %err,
"plugin/list featured plugin fetch failed; returning empty featured ids"
);
Vec::new()
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: emit failure metric

}
}
} else {
Vec::new()
};

self.outgoing
.send_response(
request_id,
PluginListResponse {
marketplaces: data,
remote_sync_error,
featured_plugin_ids,
},
)
.await;
Expand Down
10 changes: 6 additions & 4 deletions codex-rs/app-server/src/message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,10 +228,7 @@ impl MessageProcessor {
thread_manager
.plugins_manager()
.set_analytics_events_client(analytics_events_client.clone());
// TODO(xl): Move into PluginManager once this no longer depends on config feature gating.
thread_manager
.plugins_manager()
.maybe_start_curated_repo_sync_for_config(&config);

let cloud_requirements = Arc::new(RwLock::new(cloud_requirements));
let codex_message_processor = CodexMessageProcessor::new(CodexMessageProcessorArgs {
auth_manager: auth_manager.clone(),
Expand All @@ -244,6 +241,11 @@ impl MessageProcessor {
feedback,
log_db,
});
// Keep plugin startup warmups aligned at app-server startup.
// TODO(xl): Move into PluginManager once this no longer depends on config feature gating.
thread_manager
.plugins_manager()
.maybe_start_curated_repo_sync_for_config(&config, auth_manager.clone());
let config_api = ConfigApi::new(
config.codex_home.clone(),
cli_overrides,
Expand Down
11 changes: 6 additions & 5 deletions codex-rs/app-server/tests/suite/v2/plugin_install.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,21 +261,22 @@ async fn plugin_install_tracks_analytics_event() -> Result<()> {
let response: PluginInstallResponse = to_response(response)?;
assert_eq!(response.apps_needing_auth, Vec::<AppSummary>::new());

let payloads = timeout(DEFAULT_TIMEOUT, async {
let payload = timeout(DEFAULT_TIMEOUT, async {
loop {
let Some(requests) = analytics_server.received_requests().await else {
tokio::time::sleep(Duration::from_millis(25)).await;
continue;
};
if !requests.is_empty() {
break requests;
if let Some(request) = requests.iter().find(|request| {
request.method == "POST" && request.url.path() == "/codex/analytics-events/events"
}) {
break request.body.clone();
}
tokio::time::sleep(Duration::from_millis(25)).await;
}
})
.await?;
let payload: serde_json::Value =
serde_json::from_slice(&payloads[0].body).expect("analytics payload");
let payload: serde_json::Value = serde_json::from_slice(&payload).expect("analytics payload");
assert_eq!(
payload,
json!({
Expand Down
126 changes: 126 additions & 0 deletions codex-rs/app-server/tests/suite/v2/plugin_list.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::time::Duration;

use anyhow::Result;
use anyhow::bail;
use app_test_support::ChatGptAuthFixture;
use app_test_support::McpProcess;
use app_test_support::to_response;
Expand Down Expand Up @@ -674,6 +675,16 @@ async fn plugin_list_force_remote_sync_reconciles_curated_plugin_state() -> Resu
))
.mount(&server)
.await;
Mock::given(method("GET"))
.and(path("/backend-api/plugins/featured"))
.and(header("authorization", "Bearer chatgpt-token"))
.and(header("chatgpt-account-id", "account-123"))
.respond_with(
ResponseTemplate::new(200)
.set_body_string(r#"["linear@openai-curated","calendar@openai-curated"]"#),
)
.mount(&server)
.await;

let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
Expand All @@ -692,6 +703,13 @@ async fn plugin_list_force_remote_sync_reconciles_curated_plugin_state() -> Resu
.await??;
let response: PluginListResponse = to_response(response)?;
assert_eq!(response.remote_sync_error, None);
assert_eq!(
response.featured_plugin_ids,
vec![
"linear@openai-curated".to_string(),
"calendar@openai-curated".to_string(),
]
);

let curated_marketplace = response
.marketplaces
Expand Down Expand Up @@ -737,6 +755,114 @@ async fn plugin_list_force_remote_sync_reconciles_curated_plugin_state() -> Resu
Ok(())
}

#[tokio::test]
async fn plugin_list_fetches_featured_plugin_ids_without_chatgpt_auth() -> Result<()> {
let codex_home = TempDir::new()?;
let server = MockServer::start().await;
write_plugin_sync_config(codex_home.path(), &format!("{}/backend-api/", server.uri()))?;
write_openai_curated_marketplace(codex_home.path(), &["linear", "gmail"])?;

Mock::given(method("GET"))
.and(path("/backend-api/plugins/featured"))
.respond_with(ResponseTemplate::new(200).set_body_string(r#"["linear@openai-curated"]"#))
.mount(&server)
.await;

let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;

let request_id = mcp
.send_plugin_list_request(PluginListParams {
cwds: None,
force_remote_sync: false,
})
.await?;

let response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let response: PluginListResponse = to_response(response)?;

assert_eq!(
response.featured_plugin_ids,
vec!["linear@openai-curated".to_string()]
);
assert_eq!(response.remote_sync_error, None);
Ok(())
}

#[tokio::test]
async fn plugin_list_uses_warmed_featured_plugin_ids_cache_on_first_request() -> Result<()> {
let codex_home = TempDir::new()?;
let server = MockServer::start().await;
write_plugin_sync_config(codex_home.path(), &format!("{}/backend-api/", server.uri()))?;
write_openai_curated_marketplace(codex_home.path(), &["linear", "gmail"])?;

Mock::given(method("GET"))
.and(path("/backend-api/plugins/featured"))
.respond_with(ResponseTemplate::new(200).set_body_string(r#"["linear@openai-curated"]"#))
.expect(1)
.mount(&server)
.await;

let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
wait_for_featured_plugin_request_count(&server, 1).await?;

let request_id = mcp
.send_plugin_list_request(PluginListParams {
cwds: None,
force_remote_sync: false,
})
.await?;

let response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let response: PluginListResponse = to_response(response)?;

assert_eq!(
response.featured_plugin_ids,
vec!["linear@openai-curated".to_string()]
);
assert_eq!(response.remote_sync_error, None);
Ok(())
}

async fn wait_for_featured_plugin_request_count(
server: &MockServer,
expected_count: usize,
) -> Result<()> {
timeout(DEFAULT_TIMEOUT, async {
loop {
let Some(requests) = server.received_requests().await else {
bail!("wiremock did not record requests");
};
let featured_request_count = requests
.iter()
.filter(|request| {
request.method == "GET" && request.url.path().ends_with("/plugins/featured")
})
.count();
if featured_request_count == expected_count {
return Ok::<(), anyhow::Error>(());
}
if featured_request_count > expected_count {
bail!(
"expected exactly {expected_count} /plugins/featured requests, got {featured_request_count}"
);
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
})
.await??;
Ok(())
}

fn write_installed_plugin(
codex_home: &TempDir,
marketplace_name: &str,
Expand Down
11 changes: 6 additions & 5 deletions codex-rs/app-server/tests/suite/v2/plugin_uninstall.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,21 +183,22 @@ async fn plugin_uninstall_tracks_analytics_event() -> Result<()> {
let response: PluginUninstallResponse = to_response(response)?;
assert_eq!(response, PluginUninstallResponse {});

let payloads = timeout(DEFAULT_TIMEOUT, async {
let payload = timeout(DEFAULT_TIMEOUT, async {
loop {
let Some(requests) = analytics_server.received_requests().await else {
tokio::time::sleep(Duration::from_millis(25)).await;
continue;
};
if !requests.is_empty() {
break requests;
if let Some(request) = requests.iter().find(|request| {
request.method == "POST" && request.url.path() == "/codex/analytics-events/events"
}) {
break request.body.clone();
}
tokio::time::sleep(Duration::from_millis(25)).await;
}
})
.await?;
let payload: serde_json::Value =
serde_json::from_slice(&payloads[0].body).expect("analytics payload");
let payload: serde_json::Value = serde_json::from_slice(&payload).expect("analytics payload");
assert_eq!(
payload,
json!({
Expand Down
Loading
Loading