From c6e82abaf71287fb1cf80f1a046e9d3045115392 Mon Sep 17 00:00:00 2001 From: Keyur Shah Date: Tue, 9 Jun 2026 09:32:45 -0700 Subject: [PATCH 1/6] Integrate Mixer-side agent APIs under USE_MIXER_AGENT_APIS flag --- .../datacommons-mcp/datacommons_mcp/app.py | 32 ++- .../datacommons_mcp/data_models/settings.py | 26 ++- .../tools/mixer/get_observations.md | 64 ++++++ .../tools/mixer/search_indicators.md | 186 ++++++++++++++++++ .../datacommons_mcp/mixer_client.py | 70 +++++++ .../datacommons_mcp/mixer_service.py | 60 ++++++ .../datacommons-mcp/datacommons_mcp/server.py | 8 +- .../datacommons-mcp/datacommons_mcp/tools.py | 24 +++ .../tests/test_mixer_service.py | 162 +++++++++++++++ uv.lock | 4 +- 10 files changed, 617 insertions(+), 19 deletions(-) create mode 100644 packages/datacommons-mcp/datacommons_mcp/instructions/tools/mixer/get_observations.md create mode 100644 packages/datacommons-mcp/datacommons_mcp/instructions/tools/mixer/search_indicators.md create mode 100644 packages/datacommons-mcp/datacommons_mcp/mixer_client.py create mode 100644 packages/datacommons-mcp/datacommons_mcp/mixer_service.py create mode 100644 packages/datacommons-mcp/tests/test_mixer_service.py diff --git a/packages/datacommons-mcp/datacommons_mcp/app.py b/packages/datacommons-mcp/datacommons_mcp/app.py index f72a0b6..9b97c81 100644 --- a/packages/datacommons-mcp/datacommons_mcp/app.py +++ b/packages/datacommons-mcp/datacommons_mcp/app.py @@ -26,6 +26,7 @@ from datacommons_mcp import settings from datacommons_mcp.clients import create_dc_client +from datacommons_mcp.mixer_client import MixerClient from datacommons_mcp.utils import read_external_content, read_package_content from datacommons_mcp.version import __version__ @@ -54,20 +55,39 @@ def __init__(self) -> None: logger.error("Settings error: %s", e) raise - # Create client - try: - self.client = create_dc_client(self.settings) - except Exception as e: - logger.error("Failed to create DC client: %s", e) - raise + # Create client only if mixer APIs are NOT enabled (as fallback is not needed) + self.client = None + if not self.settings.use_mixer_agent_apis: + try: + self.client = create_dc_client(self.settings) + except Exception as e: + logger.error("Failed to create DC client: %s", e) + raise + + # Create mixer client only if enabled + self.mixer_client = None + if self.settings.use_mixer_agent_apis: + api_root = self.settings.api_root or "https://api.datacommons.org/v2" + api_key = getattr(self.settings, "api_key", None) + self.mixer_client = MixerClient(api_root=api_root, api_key=api_key) # Load Server Instructions server_instructions = self._load_instructions(SERVER_INSTRUCTIONS_FILE) + from contextlib import asynccontextmanager + + @asynccontextmanager + async def lifespan(server: FastMCP): + yield {} + if self.mixer_client: + logger.info("Closing Mixer client...") + await self.mixer_client.close() + self.mcp = FastMCP( MCP_SERVER_NAME, version=__version__, instructions=server_instructions, + lifespan=lifespan, ) def _load_instructions(self, filename: str) -> str: diff --git a/packages/datacommons-mcp/datacommons_mcp/data_models/settings.py b/packages/datacommons-mcp/datacommons_mcp/data_models/settings.py index 2ade789..942a398 100644 --- a/packages/datacommons-mcp/datacommons_mcp/data_models/settings.py +++ b/packages/datacommons-mcp/datacommons_mcp/data_models/settings.py @@ -35,8 +35,8 @@ class DCSettingsSelector(BaseSettings): ) -class DCSettings(BaseSettings): - """Settings for base Data Commons instance.""" +class DCSettingsBase(BaseSettings): + """Base settings class containing shared configurations.""" model_config = _MODEL_CONFIG @@ -53,8 +53,20 @@ class DCSettings(BaseSettings): description="Directory containing custom instruction files (markdown overrides)", ) + use_mixer_agent_apis: bool = Field( + default=False, + alias="USE_MIXER_AGENT_APIS", + description="Use the new Mixer-side agent endpoints instead of local processing", + ) + + api_root: str | None = Field( + default=None, + alias="DC_API_ROOT", + description="API root for Data Commons", + ) -class BaseDCSettings(DCSettings): + +class BaseDCSettings(DCSettingsBase): """Settings for base Data Commons instance.""" def __init__(self, **kwargs: dict[str, Any]) -> None: @@ -76,11 +88,7 @@ def __init__(self, **kwargs: dict[str, Any]) -> None: alias="DC_BASE_ROOT_TOPIC_DCIDS", description="List of root topic DCIDs for base DC", ) - api_root: str | None = Field( - default=None, - alias="DC_API_ROOT", - description="API root for local api instance", - ) + @field_validator("topic_cache_paths", "base_root_topic_dcids", mode="before") @classmethod @@ -88,7 +96,7 @@ def parse_list_like_parameter(cls, v: str) -> list[str] | None: return _parse_list_like_parameter(v) -class CustomDCSettings(DCSettings): +class CustomDCSettings(DCSettingsBase): """Settings for custom Data Commons instance.""" model_config = _MODEL_CONFIG diff --git a/packages/datacommons-mcp/datacommons_mcp/instructions/tools/mixer/get_observations.md b/packages/datacommons-mcp/datacommons_mcp/instructions/tools/mixer/get_observations.md new file mode 100644 index 0000000..fb9f701 --- /dev/null +++ b/packages/datacommons-mcp/datacommons_mcp/instructions/tools/mixer/get_observations.md @@ -0,0 +1,64 @@ +Fetches observations for a statistical variable from Data Commons. + +**CRITICAL: Always validate variable-place combinations first** +- You **MUST** call `search_indicators` first to verify that the variable exists for the specified place +- Only use DCIDs returned by `search_indicators` - never guess or assume variable-place combinations +- This ensures data availability and prevents errors from invalid combinations + +This tool can operate in two primary modes: +1. **Single Place Mode**: Get data for one specific place (e.g., "Population of California"). +2. **Child Places Mode**: Get data for all child places of a certain type within a parent place (e.g., "Population of all counties in California"). + +### Core Logic & Rules + +* **Variable Selection**: You **must** provide the `variable_dcid`. + * Variable DCIDs are unique identifiers for statistical variables in Data Commons and are returned by prior calls to the + `search_indicators` tool. + +* **Place Selection**: You **must** provide the `place_dcid`. + +* **Mode Selection**: + * To get data for the specified place (e.g., California), **do not** provide `child_place_type`. + * To get data for all its children (e.g., all counties in California), you **must also** provide the `child_place_type` (e.g., "County"). + **CRITICAL:** Before calling `get_observations` with `child_place_type`, you **MUST** first call `search_indicators` with child sampling to determine the correct child place type. + **Child Type Determination Logic:** + 1. Use the `dcid_place_type_mappings` or inline `typeOf` fields from the `search_indicators` response to examine the types of sampled child places + 2. Use the type that is common to ALL sampled child places + 3. If more than one type is common to all child places, use the most specific type + 4. If there is no common type across all sampled child places, use the majority type (50%+ threshold) if there's a clear majority + 5. If there is no common type and no clear majority, this tool cannot be called with child-place mode - fall back to single-place mode `get_observations` calls for each place + **Note:** If you used child sampling in `search_indicators` to validate variable existence, you should still get data for ALL children of that type, not just the sampled subset. + +* **Data Volume Constraint**: When using **Child Places Mode** (when `child_place_type` is set), you **must** be conservative with your date range to avoid requesting too much data. + * Avoid requesting `'all'` data via the `date` parameter. + * **Instead, you must either request the `'latest'` data or provide a specific, bounded date range.** + +* **Date Filtering**: The tool filters observations by date using the following priority: + 1. **`date`**: The `date` parameter is optional and can be one of the values 'all', 'latest', 'range', or a date string in the format 'YYYY', 'YYYY-MM', or 'YYYY-MM-DD'. + 2. **Date Range**: If `date` is set to 'range', you must specify a date range using `date_range_start` and/or `date_range_end`. + * If only `date_range_start` is specified, then the response will contain all observations starting at and after that date (inclusive). + * If only `date_range_end` is specified, then the response will contain all observations before and up to that date (inclusive). + * If both are specified, the response contains observations within the provided range (inclusive). + * Dates must be in `YYYY`, `YYYY-MM`, or `YYYY-MM-DD` format. + 3. **Default Behavior**: If you do not provide **any** date parameters (`date`, `date_range_start`, or `date_range_end`), the tool will automatically fetch only the `'latest'` observation. + +Args: + variable_dcid (str, required): The unique identifier (DCID) of the statistical variable. + place_dcid (str, required): The DCID of the place. + child_place_type (str, optional): The type of child places to get data for. **Use this to switch to Child Places Mode.** + source_override (str, optional): An optional source ID to force the use of a specific data source. + date (str, optional): An optional date filter. Accepts 'all', 'latest', 'range', or single date values of the format 'YYYY', 'YYYY-MM', or 'YYYY-MM-DD'. Defaults to 'latest' if no date parameters are provided. + date_range_start (str, optional): The start date for a range (inclusive). **Used only if `date` is set to'range'.** + date_range_end (str, optional): The end date for a range (inclusive). **Used only if `date` is set to'range'.** + +Returns: + The fetched observation data directly from Data Commons Mixer. + The response format contains: + - `variable`: Details about the statistical variable requested (dcid, name, typeOf). + - `resolvedParentPlace`: The resolved node information for the parent place, if one was provided. + - `childPlaceType`: The place type of the children observations if ContainedIn mode was used. + - `placeObservations`: A list of observations, one entry per place. Each entry contains: + - `place`: Details about the observed place (dcid, name, typeOf). + - `timeSeries`: A list of point objects containing `{"date": "...", "value": ...}` where value is a number. + - `sourceMetadata`: Information about the primary data source used (sourceId, importName, measurementMethod, unit, etc.). + - `alternativeSources`: Details about other available data sources. diff --git a/packages/datacommons-mcp/datacommons_mcp/instructions/tools/mixer/search_indicators.md b/packages/datacommons-mcp/datacommons_mcp/instructions/tools/mixer/search_indicators.md new file mode 100644 index 0000000..e435be1 --- /dev/null +++ b/packages/datacommons-mcp/datacommons_mcp/instructions/tools/mixer/search_indicators.md @@ -0,0 +1,186 @@ +**Purpose:** +Search for topics and variables (collectively called "indicators") available in the Data Commons Knowledge Graph. + +**Core Concept: Results are Candidates** +This tool returns *candidate* indicators that match your query. You must always filter and rank these results based on the user's context to find the most relevant one. + +**Background: Data Commons Structure** +Data Commons organizes data in two main hierarchies: + +1. **Topics:** A hierarchy of categories (e.g., `Health` -> `Clinical Data` -> `Medical Conditions`). Topics contain sub-topics and member variables. + +2. **Places:** A hierarchy of geographic containment (e.g., `World` -> `Continent` -> `Country` -> `State`). + +**CRITICAL DATA PRINCIPLE:** +The *same* statistical concept (e.g., "Population") might use *different* indicator DCIDs for different place types (e.g., one DCID for `Country` and another for `State`). This tool is essential for discovering *which* specific indicators are available for the `places` you are querying. + +**Efficiency Tips:** + +* Data coverage is generally high at the `Country` level. + +* Fetching direct children of a place (e.g., states in a country) is efficient. + +### Parameters + +**1. `query` (str, required)** + + - The search query for indicators (topics or variables). + - **Examples:** `"health grants"`, `"carbon emissions"`, `"unemployment rate"` + - **CRITICAL RULES:** + * Search for one concept at a time to get focused results. + - Instead of: "health and unemployment rate" (single search) + - Use: "health" and "unemployment rate" as separate searches + +**2. `places` (list[str], optional)** + + - A list of English, human-readable place names to filter indicators by. + - If provided, the tool will only return indicators that have data for at least one of the specified places. + - When `parent_place` is used, this parameter should **only** contain a sample of child places. + - When `parent_place` is **not** used, this can contain any place. + +**3. `parent_place` (str, optional)** + + - An English, human-readable name for a parent place. + - Use this **only** when searching for indicators about a *type* of child place (e.g., "states in India"). + - When using this parameter, you **must** also provide a sample of child places in the `places` parameter. + +**Place Name Qualification (CRITICAL):** +The following rules apply to **both** the `places` and `parent_place` parameters. + + - **ALWAYS qualify place names** with geographic context to avoid ambiguity (e.g., `"California, USA"`, `"Paris, France"`, `"Springfield, IL"`). + + - **ALWAYS specify administrative level** when ambiguous: + - For the city: `"Madrid, Spain"` + - For the autonomous community: `"Community of Madrid, Spain"` + - Similarly, differentiate between `"New York City, USA"` and `"New York State, USA"`. + + - **Common Ambiguous Cases:** + - **New York:** `"New York City, USA"` vs `"New York State, USA"` + - **Madrid:** `"Madrid, Spain"` (city) vs `"Community of Madrid, Spain"` + - **London:** `"London, UK"` (city) vs `"London, Ontario, Canada"` + - **Washington:** `"Washington, DC, USA"` vs `"Washington State, USA"` + - **Springfield:** `"Springfield, IL, USA"` vs `"Springfield, MO, USA"` (add state) + + - **NEVER** use DCIDs (e.g., `"geoId/06"`, `"country/CAN"`). + - If you get place info from another tool, extract and use *only* the readable name, but always qualify it with geographic context. + - When searching for indicators related to child places within a larger geographic entity (e.g., states within a country, or countries within a continent/the world), you MUST include the parent entity in the `parent_place` parameter and a diverse sample of 5-6 of its child places in the `places` list. + - This ensures the discovery of indicators that have data at the child place level. Refer to 'Recipe 2: Sampling Child Places' for detailed examples. + +**How to Use Place Parameters (Recipes):** + + - **Recipe 1: Data for a Specific Place** + - **Goal:** Find an indicator *about* a single place (e.g., "population of France"). + - **Call:** `query="population"`, `places=["France"]` + + - **Recipe 2: Sampling Child Places** + - **Goal:** Check data availability for a *type* of child place (e.g., "population of Indian states" or "highest GDP countries" or "top 5 US states with lowest unemployment rate"). + - **Action:** You must *proxy* this request by sampling a few children. + - **Example 1: Child places of a country** + - **Call:** + * `query="population"` + * `parent_place="India"` + * `places=["Uttar Pradesh, India", "Maharashtra, India", "Tripura, India", "Bihar, India", "Kerala, India"]` + - **Logic:** + 1. Include the parent place ("India"). The tool uses this for context and to return its DCID. + 2. Include 5-6 *diverse* child places (e.g., try to pick large/small, north/south/east/west, if known). + 3. The results for these 5-6 places are a *proxy* for all children. + 4. If a sampled child place shows data for an indicator, assume that data is available for all child places of that type for that indicator. + Conversely, if, after sampling, no child place shows data for a specific indicator, assume that data is not available for any of the child places for that indicator. + + - **Example 2: Child places of the World (Countries)** + - **Call:** + * `query="GDP"` + * `parent_place="World"` + * `places=["USA", "China", "Germany", "Nigeria", "Brazil"]` + - **Logic:** + 1. Include the parent place ("World"). + 2. Include 5-6 *diverse* child countries (e.g., from different continents, different economies). + 3. This sampling helps discover the correct indicator DCID used for the `Country` place type, which you can then use in other tools (like `get_observations` with the parent's DCID in the `place_dcid` parameter and `child_place_type='Country'`). + + - **Example 3: Administrative Level Sampling** + - **Goal:** Check data availability for different administrative levels (e.g., "population of US cities" vs "population of US states"). + - **Call:** + * **For Cities:** `query="population"`, `parent_place="Middle Earth"`, `places=["New York City, USA", "Los Angeles, USA", "Chicago, USA", "Houston, USA", "Phoenix, USA"]` + * **For States:** `query="population"`, `parent_place="USA"`, `places=["California, USA", "Texas, USA", "Florida, USA", "New York State, USA", "Pennsylvania, USA"]` + - **Logic:** Specify the exact administrative level you want to sample to avoid confusion between city and state data. + + - **Recipe 3: No Place Filtering** + - **Goal:** Find indicators for a query without checking any specific place (e.g., "what trade data do you have"). + - **Call:** `query="trade"`. Do not set `places` or `parent_place`. + - **Result:** The tool returns matching indicators, but `places_with_data` will be empty. + +**4. `per_search_limit` (int, optional, default=10, max=100)** + - Maximum results per search. + - **CRITICAL RULE:** Only set per_search_limit when explicitly requested by the user. + - Use the default value (10) unless the user specifies a different limit + - Don't assume the user wants more or fewer results + +**5. `include_topics` (bool, optional, default=True)** + - **Primary Rule:** If a user explicitly states what they want, follow their request. Otherwise, use these guidelines: + - **`include_topics = True` (Default): For Exploration & Discovery** + - **Purpose:** To explore the data hierarchy and find related variables. + - **Use when:** + - The user is exploring (e.g., "what basic health data do you have?"). + - You need to understand how data is organized to ask a better follow-up. + - **Returns:** Both topics (categories) and variables. + - **`include_topics = False`: For Specific Data** + - **Purpose:** To find a specific variable for fetching data. + - **Use when:** + - The user's goal is to get a specific number or dataset (e.g., "find unemployment rate for United States"). + - **Returns:** Variables only. + +### Special Query Scenarios + +**Scenario 1: Vague, Unqualified Queries ("what data do you have?")** + - **Action:** If a user asks a general question about available data, proactively call the tool for "World" to provide an initial overview. + - **Call:** `query=""`, `places=["World"]`, `include_topics=True` + - **Result:** This returns the top-level topics for the World. + - **Agent Follow-up:** After showing the World data, consider asking if the user would like to see data for a different, more specific place if it seems helpful for the conversation. + - **Example agent response:** "Here is a general overview of the data topics available for the World. You can also ask for this information for a specific place, like 'Africa', 'India', 'California, USA', or 'Paris, France'." + +**Scenario 2: Ambiguous Place Names** + - **Problem 1:** Geographic ambiguity - User asks for "Scotland", tool returns "Scotland County, USA". + - **Solution:** Re-run with qualified name: `places=["Scotland, UK"]` + - **Problem 2:** Administrative level ambiguity - User asks for "New York", tool returns state-level data when city-level was intended. + - **Solution:** Specify administrative level: `places=["New York City, USA"]` vs `places=["New York State, USA"]` + +### Response Structure + +Returns a dictionary containing candidate indicators directly from the Data Commons Mixer. +```json +{ + "status": "SUCCESS", + "resolvedParentPlace": { + "dcid": "geoId/06", + "name": "California", + "typeOf": ["State"] + }, + "topics": [ + { + "dcid": "dc/topic/Health", + "name": "Health", + "typeOf": ["Topic"], + "memberTopics": ["dc/topic/ClinicalData"], + "memberVariables": ["Count_Person_Obese"], + "placesWithData": ["geoId/06"] + } + ], + "variables": [ + { + "dcid": "Count_Person_Obese", + "name": "Obese population", + "typeOf": ["StatisticalVariable"], + "placesWithData": ["geoId/06"] + } + ] +} +``` + +### How to Process the Response + + - `topics`: (Only if `include_topics=True`) Collections of variables and sub-topics. Contains dcid, name, and typeOf inline. + - `variables`: Individual data indicators. Contains dcid, name, and typeOf inline. + - `placesWithData`: A list of place DCIDs for which the indicator has data. + - `resolvedParentPlace`: (Only if `parent_place` was in the request) The resolved node information for the parent place. + +**Final Reminder:** Always treat results as *candidates*. You must filter and rank them based on the user's full context. diff --git a/packages/datacommons-mcp/datacommons_mcp/mixer_client.py b/packages/datacommons-mcp/datacommons_mcp/mixer_client.py new file mode 100644 index 0000000..2e4c2f9 --- /dev/null +++ b/packages/datacommons-mcp/datacommons_mcp/mixer_client.py @@ -0,0 +1,70 @@ +# Copyright 2026 Google LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Client for interacting with the Data Commons Mixer service. +""" + +import httpx + +from datacommons_mcp.version import __version__ + + +class MixerClient: + """Async client for interacting with Mixer-side agent endpoints.""" + + def __init__(self, api_root: str, api_key: str | None = None) -> None: + """Initialize the MixerClient. + + Args: + api_root: The base API root URL (e.g. 'https://api.datacommons.org/v2'). + api_key: Optional API key for authentication. + """ + self.api_root = api_root.rstrip("/") + self.api_key = api_key + self.headers = { + "Content-Type": "application/json", + "x-surface": f"mcp-{__version__}", + } + if api_key: + self.headers["X-API-Key"] = api_key + self.timeout = 30.0 # 30 seconds default timeout + self._client = None + + @property + def client(self) -> httpx.AsyncClient: + """Lazily initialize the AsyncClient under the active event loop.""" + if self._client is None: + self._client = httpx.AsyncClient(headers=self.headers, timeout=self.timeout) + return self._client + + async def post(self, endpoint: str, payload: dict) -> dict: + """Perform an asynchronous POST request to the specified endpoint. + + Args: + endpoint: The API endpoint (e.g. 'agent/get_observations'). + payload: The dictionary to send as JSON payload. + + Returns: + The parsed JSON response as a dictionary. + """ + url = f"{self.api_root}/{endpoint}" + response = await self.client.post(url, json=payload) + response.raise_for_status() + return response.json() + + async def close(self) -> None: + """Close the underlying HTTP client if it was initialized.""" + if self._client is not None: + await self._client.aclose() + self._client = None diff --git a/packages/datacommons-mcp/datacommons_mcp/mixer_service.py b/packages/datacommons-mcp/datacommons_mcp/mixer_service.py new file mode 100644 index 0000000..9a38f99 --- /dev/null +++ b/packages/datacommons-mcp/datacommons_mcp/mixer_service.py @@ -0,0 +1,60 @@ +# Copyright 2026 Google LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Service layer for calling Mixer-side agent APIs. +""" + +from typing import Any + +from datacommons_mcp.app import app + + +async def get_observations( + variable_dcid: str, + place_dcid: str, + child_place_type: str | None = None, + source_override: str | None = None, + date: str | None = None, + date_range_start: str | None = None, + date_range_end: str | None = None, +) -> dict[str, Any]: + """Fetches observations via the Mixer-side agent/get_observations endpoint.""" + payload = { + "variable_dcid": variable_dcid, + "place_dcid": place_dcid, + "child_place_type": child_place_type, + "source_override": source_override, + "date": date, + "date_range_start": date_range_start, + "date_range_end": date_range_end, + } + return await app.mixer_client.post("agent/get_observations", payload) + + +async def search_indicators( + query: str, + places: list[str] | None = None, + parent_place: str | None = None, + per_search_limit: int = 10, + include_topics: bool = True, +) -> dict[str, Any]: + """Searches for indicators via the Mixer-side agent/search_indicators endpoint.""" + payload = { + "query": query, + "places": places or [], + "parent_place": parent_place, + "per_search_limit": per_search_limit, + "include_topics": include_topics, + } + return await app.mixer_client.post("agent/search_indicators", payload) diff --git a/packages/datacommons-mcp/datacommons_mcp/server.py b/packages/datacommons-mcp/datacommons_mcp/server.py index 785d294..37f425e 100644 --- a/packages/datacommons-mcp/datacommons_mcp/server.py +++ b/packages/datacommons-mcp/datacommons_mcp/server.py @@ -39,5 +39,9 @@ async def health_check(request: Request) -> JSONResponse: # noqa: ARG001 reques # Register tools -app.register_tool(tools.get_observations, tools.GET_OBSERVATIONS_INSTRUCTION_FILE) -app.register_tool(tools.search_indicators, tools.SEARCH_INDICATORS_INSTRUCTION_FILE) +if app.settings.use_mixer_agent_apis: + app.register_tool(tools.get_observations, "tools/mixer/get_observations.md") + app.register_tool(tools.search_indicators, "tools/mixer/search_indicators.md") +else: + app.register_tool(tools.get_observations, tools.GET_OBSERVATIONS_INSTRUCTION_FILE) + app.register_tool(tools.search_indicators, tools.SEARCH_INDICATORS_INSTRUCTION_FILE) diff --git a/packages/datacommons-mcp/datacommons_mcp/tools.py b/packages/datacommons-mcp/datacommons_mcp/tools.py index 5c763c2..fc9a207 100644 --- a/packages/datacommons-mcp/datacommons_mcp/tools.py +++ b/packages/datacommons-mcp/datacommons_mcp/tools.py @@ -28,6 +28,10 @@ from datacommons_mcp.services import ( search_indicators as search_indicators_service, ) +from datacommons_mcp.mixer_service import ( + get_observations as mixer_get_observations, + search_indicators as mixer_search_indicators, +) if TYPE_CHECKING: from datacommons_mcp.data_models.search import ( @@ -50,6 +54,17 @@ async def get_observations( date_range_end: str | None = None, ) -> dict: """Fetches observations for a statistical variable from Data Commons.""" + if app.settings.use_mixer_agent_apis: + return await mixer_get_observations( + variable_dcid=variable_dcid, + place_dcid=place_dcid, + child_place_type=child_place_type, + source_override=source_override, + date=date, + date_range_start=date_range_start, + date_range_end=date_range_end, + ) + response: ObservationToolResponse = await get_observations_service( client=app.client, variable_dcid=variable_dcid, @@ -74,6 +89,15 @@ async def search_indicators( maybe_bilateral: bool = False, ) -> dict: """Searches for indicators (topics and variables) in Data Commons.""" + if app.settings.use_mixer_agent_apis: + return await mixer_search_indicators( + query=query, + places=places, + parent_place=parent_place, + per_search_limit=per_search_limit, + include_topics=include_topics, + ) + response: SearchResponse = await search_indicators_service( client=app.client, query=query, diff --git a/packages/datacommons-mcp/tests/test_mixer_service.py b/packages/datacommons-mcp/tests/test_mixer_service.py new file mode 100644 index 0000000..599f524 --- /dev/null +++ b/packages/datacommons-mcp/tests/test_mixer_service.py @@ -0,0 +1,162 @@ +# Copyright 2026 Google LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Tests for MixerClient, mixer_service, and feature flag routing. +""" + +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from datacommons_mcp.app import app +from datacommons_mcp.mixer_client import MixerClient +from datacommons_mcp.mixer_service import get_observations, search_indicators +from datacommons_mcp.tools import ( + get_observations as tools_get_obs, +) +from datacommons_mcp.tools import ( + search_indicators as tools_search_ind, +) + + +@pytest.mark.asyncio +async def test_mixer_client_post(): + """Verify MixerClient correctly sends payload and headers to the endpoint.""" + client = MixerClient(api_root="https://api.datacommons.org/v2", api_key="test-api-key") + assert client.headers["X-API-Key"] == "test-api-key" + + mock_response = MagicMock() + mock_response.json.return_value = {"status": "SUCCESS", "data": "test"} + mock_response.raise_for_status = lambda: None + + with patch.object(client.client, "post", return_value=mock_response) as mock_post: + result = await client.post("agent/test_endpoint", {"param": "value"}) + + assert result == {"status": "SUCCESS", "data": "test"} + mock_post.assert_called_once_with( + "https://api.datacommons.org/v2/agent/test_endpoint", + json={"param": "value"} + ) + + await client.close() + + +@pytest.mark.asyncio +async def test_mixer_service_get_observations(): + """Verify get_observations builds correct payload and invokes mixer_client.""" + mock_client = AsyncMock() + mock_client.post.return_value = {"placeObservations": []} + + with patch.object(app, "mixer_client", mock_client): + result = await get_observations( + variable_dcid="Count_Person", + place_dcid="geoId/06", + child_place_type="County", + source_override="USCensus", + date="latest", + date_range_start="2020", + date_range_end="2022" + ) + assert result == {"placeObservations": []} + mock_client.post.assert_called_once_with( + "agent/get_observations", + { + "variable_dcid": "Count_Person", + "place_dcid": "geoId/06", + "child_place_type": "County", + "source_override": "USCensus", + "date": "latest", + "date_range_start": "2020", + "date_range_end": "2022", + } + ) + + +@pytest.mark.asyncio +async def test_mixer_service_search_indicators(): + """Verify search_indicators builds correct payload and invokes mixer_client.""" + mock_client = AsyncMock() + mock_client.post.return_value = {"variables": []} + + with patch.object(app, "mixer_client", mock_client): + result = await search_indicators( + query="unemployment", + places=["California"], + parent_place="USA", + per_search_limit=5, + include_topics=False + ) + assert result == {"variables": []} + mock_client.post.assert_called_once_with( + "agent/search_indicators", + { + "query": "unemployment", + "places": ["California"], + "parent_place": "USA", + "per_search_limit": 5, + "include_topics": False, + } + ) + + +@pytest.mark.asyncio +async def test_tools_routing_mixer_enabled(): + """Verify tool functions delegate to mixer_service when feature flag is enabled.""" + with patch.object(app.settings, "use_mixer_agent_apis", True): + with patch("datacommons_mcp.tools.mixer_get_observations", new_callable=AsyncMock) as mock_mixer_get_obs: + mock_mixer_get_obs.return_value = {"mixer_obs": True} + result = await tools_get_obs( + variable_dcid="Count_Person", + place_dcid="geoId/06" + ) + assert result == {"mixer_obs": True} + mock_mixer_get_obs.assert_called_once() + + with patch("datacommons_mcp.tools.mixer_search_indicators", new_callable=AsyncMock) as mock_mixer_search_ind: + mock_mixer_search_ind.return_value = {"mixer_search": True} + result = await tools_search_ind( + query="unemployment", + places=["California"] + ) + assert result == {"mixer_search": True} + mock_mixer_search_ind.assert_called_once() + + +@pytest.mark.asyncio +async def test_tools_routing_mixer_disabled(): + """Verify tool functions delegate to old local services when feature flag is disabled.""" + with patch.object(app.settings, "use_mixer_agent_apis", False): + with patch("datacommons_mcp.tools.get_observations_service", new_callable=AsyncMock) as mock_local_get_obs: + mock_response = MagicMock() + mock_response.model_dump.return_value = {"local_obs": True} + mock_local_get_obs.return_value = mock_response + + result = await tools_get_obs( + variable_dcid="Count_Person", + place_dcid="geoId/06" + ) + assert result == {"local_obs": True} + mock_local_get_obs.assert_called_once() + + with patch("datacommons_mcp.tools.search_indicators_service", new_callable=AsyncMock) as mock_local_search_ind: + mock_response = MagicMock() + mock_response.model_dump.return_value = {"local_search": True} + mock_local_search_ind.return_value = mock_response + + result = await tools_search_ind( + query="unemployment", + places=["California"] + ) + assert result == {"local_search": True} + mock_local_search_ind.assert_called_once() diff --git a/uv.lock b/uv.lock index b9f1f47..87b3c2f 100644 --- a/uv.lock +++ b/uv.lock @@ -1,5 +1,5 @@ version = 1 -revision = 3 +revision = 2 requires-python = ">=3.11, <3.14" resolution-markers = [ "python_full_version >= '3.13'", @@ -474,7 +474,7 @@ wheels = [ [[package]] name = "datacommons-mcp" -version = "1.2.0" +version = "1.2.1" source = { editable = "packages/datacommons-mcp" } dependencies = [ { name = "datacommons-client" }, From a5e94066925b0d7c099c234a98ae52e574f354f1 Mon Sep 17 00:00:00 2001 From: Keyur Shah Date: Tue, 9 Jun 2026 14:35:08 -0700 Subject: [PATCH 2/6] Refactor Mixer tools into separate file and add logging decorator --- .../datacommons-mcp/datacommons_mcp/app.py | 4 +- .../datacommons_mcp/exceptions.py | 9 ++ .../datacommons_mcp/mixer_client.py | 39 +++++- .../datacommons_mcp/mixer_service.py | 17 ++- .../datacommons_mcp/mixer_tools.py | 63 +++++++++ .../datacommons-mcp/datacommons_mcp/server.py | 5 +- .../datacommons-mcp/datacommons_mcp/tools.py | 24 ---- .../tests/test_mixer_service.py | 125 +++++++++++------- 8 files changed, 206 insertions(+), 80 deletions(-) create mode 100644 packages/datacommons-mcp/datacommons_mcp/mixer_tools.py diff --git a/packages/datacommons-mcp/datacommons_mcp/app.py b/packages/datacommons-mcp/datacommons_mcp/app.py index 9b97c81..84976bf 100644 --- a/packages/datacommons-mcp/datacommons_mcp/app.py +++ b/packages/datacommons-mcp/datacommons_mcp/app.py @@ -17,7 +17,7 @@ import json import logging -from collections.abc import Callable +from collections.abc import AsyncIterator, Callable from typing import Any from fastmcp import FastMCP @@ -77,7 +77,7 @@ def __init__(self) -> None: from contextlib import asynccontextmanager @asynccontextmanager - async def lifespan(server: FastMCP): + async def lifespan(_server: FastMCP) -> AsyncIterator[dict[str, Any]]: yield {} if self.mixer_client: logger.info("Closing Mixer client...") diff --git a/packages/datacommons-mcp/datacommons_mcp/exceptions.py b/packages/datacommons-mcp/datacommons_mcp/exceptions.py index 60bb5e3..1c4d849 100644 --- a/packages/datacommons-mcp/datacommons_mcp/exceptions.py +++ b/packages/datacommons-mcp/datacommons_mcp/exceptions.py @@ -45,3 +45,12 @@ class APIKeyValidationError(_ErrorStrMixin, Exception): class InvalidAPIKeyError(APIKeyValidationError): """Raised when the API key is determined to be invalid.""" + + +class MixerAPIError(Exception): + """Raised when the Data Commons Mixer API returns an error.""" + + def __init__(self, message: str, status_code: int, body: str | None = None) -> None: + super().__init__(message) + self.status_code = status_code + self.body = body diff --git a/packages/datacommons-mcp/datacommons_mcp/mixer_client.py b/packages/datacommons-mcp/datacommons_mcp/mixer_client.py index 2e4c2f9..f320c22 100644 --- a/packages/datacommons-mcp/datacommons_mcp/mixer_client.py +++ b/packages/datacommons-mcp/datacommons_mcp/mixer_client.py @@ -15,10 +15,38 @@ Client for interacting with the Data Commons Mixer service. """ +import logging +import time +from collections.abc import Callable +from functools import wraps +from typing import Any # noqa: ANN401 + import httpx +from datacommons_mcp.exceptions import MixerAPIError from datacommons_mcp.version import __version__ +logger = logging.getLogger(__name__) + + +def log_api_call(func: Callable[..., Any]) -> Callable[..., Any]: # noqa: ANN401 + """Decorator to log URL, request payload, execution time, and errors for Mixer API calls.""" + @wraps(func) + async def wrapper(self: Any, endpoint: str, payload: dict, *args: Any, **kwargs: Any) -> Any: # noqa: ANN401 + url = f"{self.api_root}/{endpoint}" + logger.info("MixerClient POST request URL: %s, payload: %s", url, payload) + start_time = time.perf_counter() + try: + result = await func(self, endpoint, payload, *args, **kwargs) + elapsed_time = time.perf_counter() - start_time + logger.info("MixerClient POST request to %s completed in %.3f seconds", url, elapsed_time) + return result + except Exception as e: + elapsed_time = time.perf_counter() - start_time + logger.error("MixerClient POST request to %s failed after %.3f seconds with error: %s", url, elapsed_time, e) + raise + return wrapper + class MixerClient: """Async client for interacting with Mixer-side agent endpoints.""" @@ -48,6 +76,7 @@ def client(self) -> httpx.AsyncClient: self._client = httpx.AsyncClient(headers=self.headers, timeout=self.timeout) return self._client + @log_api_call async def post(self, endpoint: str, payload: dict) -> dict: """Perform an asynchronous POST request to the specified endpoint. @@ -59,9 +88,13 @@ async def post(self, endpoint: str, payload: dict) -> dict: The parsed JSON response as a dictionary. """ url = f"{self.api_root}/{endpoint}" - response = await self.client.post(url, json=payload) - response.raise_for_status() - return response.json() + try: + response = await self.client.post(url, json=payload) + response.raise_for_status() + return response.json() + except httpx.HTTPStatusError as e: + error_msg = f"Mixer API call to {endpoint} failed with status {e.response.status_code}" + raise MixerAPIError(error_msg, e.response.status_code, e.response.text) from e async def close(self) -> None: """Close the underlying HTTP client if it was initialized.""" diff --git a/packages/datacommons-mcp/datacommons_mcp/mixer_service.py b/packages/datacommons-mcp/datacommons_mcp/mixer_service.py index 9a38f99..9f96a72 100644 --- a/packages/datacommons-mcp/datacommons_mcp/mixer_service.py +++ b/packages/datacommons-mcp/datacommons_mcp/mixer_service.py @@ -18,6 +18,16 @@ from typing import Any from datacommons_mcp.app import app +from datacommons_mcp.mixer_client import MixerClient + + +def _get_mixer_client() -> MixerClient: + """Helper to get the initialized MixerClient, raising RuntimeError if not set.""" + if app.mixer_client is None: + raise RuntimeError( + "Mixer client is not initialized. Ensure USE_MIXER_AGENT_APIS is enabled." + ) + return app.mixer_client async def get_observations( @@ -30,6 +40,7 @@ async def get_observations( date_range_end: str | None = None, ) -> dict[str, Any]: """Fetches observations via the Mixer-side agent/get_observations endpoint.""" + client = _get_mixer_client() payload = { "variable_dcid": variable_dcid, "place_dcid": place_dcid, @@ -39,7 +50,7 @@ async def get_observations( "date_range_start": date_range_start, "date_range_end": date_range_end, } - return await app.mixer_client.post("agent/get_observations", payload) + return await client.post("agent/get_observations", payload) async def search_indicators( @@ -47,9 +58,11 @@ async def search_indicators( places: list[str] | None = None, parent_place: str | None = None, per_search_limit: int = 10, + *, include_topics: bool = True, ) -> dict[str, Any]: """Searches for indicators via the Mixer-side agent/search_indicators endpoint.""" + client = _get_mixer_client() payload = { "query": query, "places": places or [], @@ -57,4 +70,4 @@ async def search_indicators( "per_search_limit": per_search_limit, "include_topics": include_topics, } - return await app.mixer_client.post("agent/search_indicators", payload) + return await client.post("agent/search_indicators", payload) diff --git a/packages/datacommons-mcp/datacommons_mcp/mixer_tools.py b/packages/datacommons-mcp/datacommons_mcp/mixer_tools.py new file mode 100644 index 0000000..457af47 --- /dev/null +++ b/packages/datacommons-mcp/datacommons_mcp/mixer_tools.py @@ -0,0 +1,63 @@ +# Copyright 2026 Google LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Tool implementations for the Mixer-based Data Commons MCP server. +""" + +from datacommons_mcp.data_models.observations import ObservationDateType +from datacommons_mcp.mixer_service import ( + get_observations as mixer_get_observations, +) +from datacommons_mcp.mixer_service import ( + search_indicators as mixer_search_indicators, +) + + +async def get_observations( + variable_dcid: str, + place_dcid: str, + child_place_type: str | None = None, + source_override: str | None = None, + date: str = ObservationDateType.LATEST.value, + date_range_start: str | None = None, + date_range_end: str | None = None, +) -> dict: + """Fetches observations for a statistical variable from Data Commons.""" + return await mixer_get_observations( + variable_dcid=variable_dcid, + place_dcid=place_dcid, + child_place_type=child_place_type, + source_override=source_override, + date=date, + date_range_start=date_range_start, + date_range_end=date_range_end, + ) + + +async def search_indicators( + query: str, + places: list[str] | None = None, + parent_place: str | None = None, + per_search_limit: int = 10, + *, + include_topics: bool = True, +) -> dict: + """Searches for indicators (topics and variables) in Data Commons.""" + return await mixer_search_indicators( + query=query, + places=places, + parent_place=parent_place, + per_search_limit=per_search_limit, + include_topics=include_topics, + ) diff --git a/packages/datacommons-mcp/datacommons_mcp/server.py b/packages/datacommons-mcp/datacommons_mcp/server.py index 37f425e..883c87a 100644 --- a/packages/datacommons-mcp/datacommons_mcp/server.py +++ b/packages/datacommons-mcp/datacommons_mcp/server.py @@ -20,6 +20,7 @@ from starlette.requests import Request from starlette.responses import JSONResponse +import datacommons_mcp.mixer_tools as mixer_tools import datacommons_mcp.tools as tools from datacommons_mcp.app import app from datacommons_mcp.version import __version__ @@ -40,8 +41,8 @@ async def health_check(request: Request) -> JSONResponse: # noqa: ARG001 reques # Register tools if app.settings.use_mixer_agent_apis: - app.register_tool(tools.get_observations, "tools/mixer/get_observations.md") - app.register_tool(tools.search_indicators, "tools/mixer/search_indicators.md") + app.register_tool(mixer_tools.get_observations, "tools/mixer/get_observations.md") + app.register_tool(mixer_tools.search_indicators, "tools/mixer/search_indicators.md") else: app.register_tool(tools.get_observations, tools.GET_OBSERVATIONS_INSTRUCTION_FILE) app.register_tool(tools.search_indicators, tools.SEARCH_INDICATORS_INSTRUCTION_FILE) diff --git a/packages/datacommons-mcp/datacommons_mcp/tools.py b/packages/datacommons-mcp/datacommons_mcp/tools.py index fc9a207..5c763c2 100644 --- a/packages/datacommons-mcp/datacommons_mcp/tools.py +++ b/packages/datacommons-mcp/datacommons_mcp/tools.py @@ -28,10 +28,6 @@ from datacommons_mcp.services import ( search_indicators as search_indicators_service, ) -from datacommons_mcp.mixer_service import ( - get_observations as mixer_get_observations, - search_indicators as mixer_search_indicators, -) if TYPE_CHECKING: from datacommons_mcp.data_models.search import ( @@ -54,17 +50,6 @@ async def get_observations( date_range_end: str | None = None, ) -> dict: """Fetches observations for a statistical variable from Data Commons.""" - if app.settings.use_mixer_agent_apis: - return await mixer_get_observations( - variable_dcid=variable_dcid, - place_dcid=place_dcid, - child_place_type=child_place_type, - source_override=source_override, - date=date, - date_range_start=date_range_start, - date_range_end=date_range_end, - ) - response: ObservationToolResponse = await get_observations_service( client=app.client, variable_dcid=variable_dcid, @@ -89,15 +74,6 @@ async def search_indicators( maybe_bilateral: bool = False, ) -> dict: """Searches for indicators (topics and variables) in Data Commons.""" - if app.settings.use_mixer_agent_apis: - return await mixer_search_indicators( - query=query, - places=places, - parent_place=parent_place, - per_search_limit=per_search_limit, - include_topics=include_topics, - ) - response: SearchResponse = await search_indicators_service( client=app.client, query=query, diff --git a/packages/datacommons-mcp/tests/test_mixer_service.py b/packages/datacommons-mcp/tests/test_mixer_service.py index 599f524..ac367ff 100644 --- a/packages/datacommons-mcp/tests/test_mixer_service.py +++ b/packages/datacommons-mcp/tests/test_mixer_service.py @@ -17,11 +17,17 @@ from unittest.mock import AsyncMock, MagicMock, patch +import httpx import pytest - from datacommons_mcp.app import app from datacommons_mcp.mixer_client import MixerClient from datacommons_mcp.mixer_service import get_observations, search_indicators +from datacommons_mcp.mixer_tools import ( + get_observations as mixer_tools_get_obs, +) +from datacommons_mcp.mixer_tools import ( + search_indicators as mixer_tools_search_ind, +) from datacommons_mcp.tools import ( get_observations as tools_get_obs, ) @@ -111,52 +117,77 @@ async def test_mixer_service_search_indicators(): @pytest.mark.asyncio -async def test_tools_routing_mixer_enabled(): - """Verify tool functions delegate to mixer_service when feature flag is enabled.""" - with patch.object(app.settings, "use_mixer_agent_apis", True): - with patch("datacommons_mcp.tools.mixer_get_observations", new_callable=AsyncMock) as mock_mixer_get_obs: - mock_mixer_get_obs.return_value = {"mixer_obs": True} - result = await tools_get_obs( - variable_dcid="Count_Person", - place_dcid="geoId/06" - ) - assert result == {"mixer_obs": True} - mock_mixer_get_obs.assert_called_once() - - with patch("datacommons_mcp.tools.mixer_search_indicators", new_callable=AsyncMock) as mock_mixer_search_ind: - mock_mixer_search_ind.return_value = {"mixer_search": True} - result = await tools_search_ind( - query="unemployment", - places=["California"] - ) - assert result == {"mixer_search": True} - mock_mixer_search_ind.assert_called_once() +async def test_mixer_tools_execution(): + """Verify mixer_tools functions delegate to mixer_service.""" + with patch("datacommons_mcp.mixer_tools.mixer_get_observations", new_callable=AsyncMock) as mock_mixer_get_obs: + mock_mixer_get_obs.return_value = {"mixer_obs": True} + result = await mixer_tools_get_obs( + variable_dcid="Count_Person", + place_dcid="geoId/06" + ) + assert result == {"mixer_obs": True} + mock_mixer_get_obs.assert_called_once() + + with patch("datacommons_mcp.mixer_tools.mixer_search_indicators", new_callable=AsyncMock) as mock_mixer_search_ind: + mock_mixer_search_ind.return_value = {"mixer_search": True} + result = await mixer_tools_search_ind( + query="unemployment", + places=["California"] + ) + assert result == {"mixer_search": True} + mock_mixer_search_ind.assert_called_once() + + +@pytest.mark.asyncio +async def test_local_tools_execution(): + """Verify tool functions delegate to old local services.""" + with patch("datacommons_mcp.tools.get_observations_service", new_callable=AsyncMock) as mock_local_get_obs: + mock_response = MagicMock() + mock_response.model_dump.return_value = {"local_obs": True} + mock_local_get_obs.return_value = mock_response + + result = await tools_get_obs( + variable_dcid="Count_Person", + place_dcid="geoId/06" + ) + assert result == {"local_obs": True} + mock_local_get_obs.assert_called_once() + + with patch("datacommons_mcp.tools.search_indicators_service", new_callable=AsyncMock) as mock_local_search_ind: + mock_response = MagicMock() + mock_response.model_dump.return_value = {"local_search": True} + mock_local_search_ind.return_value = mock_response + + result = await tools_search_ind( + query="unemployment", + places=["California"] + ) + assert result == {"local_search": True} + mock_local_search_ind.assert_called_once() @pytest.mark.asyncio -async def test_tools_routing_mixer_disabled(): - """Verify tool functions delegate to old local services when feature flag is disabled.""" - with patch.object(app.settings, "use_mixer_agent_apis", False): - with patch("datacommons_mcp.tools.get_observations_service", new_callable=AsyncMock) as mock_local_get_obs: - mock_response = MagicMock() - mock_response.model_dump.return_value = {"local_obs": True} - mock_local_get_obs.return_value = mock_response - - result = await tools_get_obs( - variable_dcid="Count_Person", - place_dcid="geoId/06" - ) - assert result == {"local_obs": True} - mock_local_get_obs.assert_called_once() - - with patch("datacommons_mcp.tools.search_indicators_service", new_callable=AsyncMock) as mock_local_search_ind: - mock_response = MagicMock() - mock_response.model_dump.return_value = {"local_search": True} - mock_local_search_ind.return_value = mock_response - - result = await tools_search_ind( - query="unemployment", - places=["California"] - ) - assert result == {"local_search": True} - mock_local_search_ind.assert_called_once() +async def test_mixer_client_post_error(): + """Verify that MixerClient.post raises MixerAPIError and extracts details on failure.""" + client = MixerClient(api_root="https://api.datacommons.org/v2") + mock_response = MagicMock() + mock_response.status_code = 500 + mock_response.text = '{"message": "Internal error"}' + mock_response.json.return_value = {"message": "Internal error"} + + # Helper function to raise the error + def raise_status_error(): + raise httpx.HTTPStatusError("Internal Server Error", request=MagicMock(), response=mock_response) + mock_response.raise_for_status = raise_status_error + + with patch.object(client.client, "post", return_value=mock_response): + from datacommons_mcp.exceptions import MixerAPIError + with pytest.raises(MixerAPIError) as exc_info: + await client.post("agent/test", {}) + assert exc_info.value.status_code == 500 + err_msg = str(exc_info.value) + assert "agent/test" in err_msg + assert "500" in err_msg + assert exc_info.value.body == '{"message": "Internal error"}' + + await client.close() From d7d95a393a1730d91166d394af7213f620edbede Mon Sep 17 00:00:00 2001 From: Keyur Shah Date: Tue, 9 Jun 2026 15:11:14 -0700 Subject: [PATCH 3/6] Format and lint --- .../datacommons_mcp/data_models/settings.py | 1 - .../datacommons_mcp/mixer_client.py | 27 ++++++++-- .../tests/test_mixer_service.py | 50 +++++++++++-------- 3 files changed, 52 insertions(+), 26 deletions(-) diff --git a/packages/datacommons-mcp/datacommons_mcp/data_models/settings.py b/packages/datacommons-mcp/datacommons_mcp/data_models/settings.py index 942a398..c6ca1cf 100644 --- a/packages/datacommons-mcp/datacommons_mcp/data_models/settings.py +++ b/packages/datacommons-mcp/datacommons_mcp/data_models/settings.py @@ -89,7 +89,6 @@ def __init__(self, **kwargs: dict[str, Any]) -> None: description="List of root topic DCIDs for base DC", ) - @field_validator("topic_cache_paths", "base_root_topic_dcids", mode="before") @classmethod def parse_list_like_parameter(cls, v: str) -> list[str] | None: diff --git a/packages/datacommons-mcp/datacommons_mcp/mixer_client.py b/packages/datacommons-mcp/datacommons_mcp/mixer_client.py index f320c22..3d6e2f3 100644 --- a/packages/datacommons-mcp/datacommons_mcp/mixer_client.py +++ b/packages/datacommons-mcp/datacommons_mcp/mixer_client.py @@ -31,20 +31,37 @@ def log_api_call(func: Callable[..., Any]) -> Callable[..., Any]: # noqa: ANN401 """Decorator to log URL, request payload, execution time, and errors for Mixer API calls.""" + @wraps(func) - async def wrapper(self: Any, endpoint: str, payload: dict, *args: Any, **kwargs: Any) -> Any: # noqa: ANN401 + async def wrapper( + self: "MixerClient", + endpoint: str, + payload: dict, + *args: object, + **kwargs: object, + ) -> Any: # noqa: ANN401 url = f"{self.api_root}/{endpoint}" logger.info("MixerClient POST request URL: %s, payload: %s", url, payload) start_time = time.perf_counter() try: result = await func(self, endpoint, payload, *args, **kwargs) elapsed_time = time.perf_counter() - start_time - logger.info("MixerClient POST request to %s completed in %.3f seconds", url, elapsed_time) + logger.info( + "MixerClient POST request to %s completed in %.3f seconds", + url, + elapsed_time, + ) return result except Exception as e: elapsed_time = time.perf_counter() - start_time - logger.error("MixerClient POST request to %s failed after %.3f seconds with error: %s", url, elapsed_time, e) + logger.error( + "MixerClient POST request to %s failed after %.3f seconds with error: %s", + url, + elapsed_time, + e, + ) raise + return wrapper @@ -94,7 +111,9 @@ async def post(self, endpoint: str, payload: dict) -> dict: return response.json() except httpx.HTTPStatusError as e: error_msg = f"Mixer API call to {endpoint} failed with status {e.response.status_code}" - raise MixerAPIError(error_msg, e.response.status_code, e.response.text) from e + raise MixerAPIError( + error_msg, e.response.status_code, e.response.text + ) from e async def close(self) -> None: """Close the underlying HTTP client if it was initialized.""" diff --git a/packages/datacommons-mcp/tests/test_mixer_service.py b/packages/datacommons-mcp/tests/test_mixer_service.py index ac367ff..61c2750 100644 --- a/packages/datacommons-mcp/tests/test_mixer_service.py +++ b/packages/datacommons-mcp/tests/test_mixer_service.py @@ -39,7 +39,9 @@ @pytest.mark.asyncio async def test_mixer_client_post(): """Verify MixerClient correctly sends payload and headers to the endpoint.""" - client = MixerClient(api_root="https://api.datacommons.org/v2", api_key="test-api-key") + client = MixerClient( + api_root="https://api.datacommons.org/v2", api_key="test-api-key" + ) assert client.headers["X-API-Key"] == "test-api-key" mock_response = MagicMock() @@ -52,7 +54,7 @@ async def test_mixer_client_post(): assert result == {"status": "SUCCESS", "data": "test"} mock_post.assert_called_once_with( "https://api.datacommons.org/v2/agent/test_endpoint", - json={"param": "value"} + json={"param": "value"}, ) await client.close() @@ -72,7 +74,7 @@ async def test_mixer_service_get_observations(): source_override="USCensus", date="latest", date_range_start="2020", - date_range_end="2022" + date_range_end="2022", ) assert result == {"placeObservations": []} mock_client.post.assert_called_once_with( @@ -85,7 +87,7 @@ async def test_mixer_service_get_observations(): "date": "latest", "date_range_start": "2020", "date_range_end": "2022", - } + }, ) @@ -101,7 +103,7 @@ async def test_mixer_service_search_indicators(): places=["California"], parent_place="USA", per_search_limit=5, - include_topics=False + include_topics=False, ) assert result == {"variables": []} mock_client.post.assert_called_once_with( @@ -112,27 +114,29 @@ async def test_mixer_service_search_indicators(): "parent_place": "USA", "per_search_limit": 5, "include_topics": False, - } + }, ) @pytest.mark.asyncio async def test_mixer_tools_execution(): """Verify mixer_tools functions delegate to mixer_service.""" - with patch("datacommons_mcp.mixer_tools.mixer_get_observations", new_callable=AsyncMock) as mock_mixer_get_obs: + with patch( + "datacommons_mcp.mixer_tools.mixer_get_observations", new_callable=AsyncMock + ) as mock_mixer_get_obs: mock_mixer_get_obs.return_value = {"mixer_obs": True} result = await mixer_tools_get_obs( - variable_dcid="Count_Person", - place_dcid="geoId/06" + variable_dcid="Count_Person", place_dcid="geoId/06" ) assert result == {"mixer_obs": True} mock_mixer_get_obs.assert_called_once() - with patch("datacommons_mcp.mixer_tools.mixer_search_indicators", new_callable=AsyncMock) as mock_mixer_search_ind: + with patch( + "datacommons_mcp.mixer_tools.mixer_search_indicators", new_callable=AsyncMock + ) as mock_mixer_search_ind: mock_mixer_search_ind.return_value = {"mixer_search": True} result = await mixer_tools_search_ind( - query="unemployment", - places=["California"] + query="unemployment", places=["California"] ) assert result == {"mixer_search": True} mock_mixer_search_ind.assert_called_once() @@ -141,27 +145,27 @@ async def test_mixer_tools_execution(): @pytest.mark.asyncio async def test_local_tools_execution(): """Verify tool functions delegate to old local services.""" - with patch("datacommons_mcp.tools.get_observations_service", new_callable=AsyncMock) as mock_local_get_obs: + with patch( + "datacommons_mcp.tools.get_observations_service", new_callable=AsyncMock + ) as mock_local_get_obs: mock_response = MagicMock() mock_response.model_dump.return_value = {"local_obs": True} mock_local_get_obs.return_value = mock_response result = await tools_get_obs( - variable_dcid="Count_Person", - place_dcid="geoId/06" + variable_dcid="Count_Person", place_dcid="geoId/06" ) assert result == {"local_obs": True} mock_local_get_obs.assert_called_once() - with patch("datacommons_mcp.tools.search_indicators_service", new_callable=AsyncMock) as mock_local_search_ind: + with patch( + "datacommons_mcp.tools.search_indicators_service", new_callable=AsyncMock + ) as mock_local_search_ind: mock_response = MagicMock() mock_response.model_dump.return_value = {"local_search": True} mock_local_search_ind.return_value = mock_response - result = await tools_search_ind( - query="unemployment", - places=["California"] - ) + result = await tools_search_ind(query="unemployment", places=["California"]) assert result == {"local_search": True} mock_local_search_ind.assert_called_once() @@ -177,11 +181,15 @@ async def test_mixer_client_post_error(): # Helper function to raise the error def raise_status_error(): - raise httpx.HTTPStatusError("Internal Server Error", request=MagicMock(), response=mock_response) + raise httpx.HTTPStatusError( + "Internal Server Error", request=MagicMock(), response=mock_response + ) + mock_response.raise_for_status = raise_status_error with patch.object(client.client, "post", return_value=mock_response): from datacommons_mcp.exceptions import MixerAPIError + with pytest.raises(MixerAPIError) as exc_info: await client.post("agent/test", {}) assert exc_info.value.status_code == 500 From 5ffb3371336e758bb48c93c3fe9c044f0eda45f2 Mon Sep 17 00:00:00 2001 From: Keyur Shah Date: Tue, 9 Jun 2026 15:14:59 -0700 Subject: [PATCH 4/6] Document USE_MIXER_AGENT_APIS in .env.sample --- packages/datacommons-mcp/.env.sample | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/packages/datacommons-mcp/.env.sample b/packages/datacommons-mcp/.env.sample index cb6f226..af6c656 100644 --- a/packages/datacommons-mcp/.env.sample +++ b/packages/datacommons-mcp/.env.sample @@ -57,6 +57,10 @@ DC_TYPE=base # - tools/{tool_name}.md # DC_INSTRUCTIONS_DIR=/path/to/custom/instructions +# Enable using Mixer-side agent endpoints instead of local processing (optional) +# Default: false +# USE_MIXER_AGENT_APIS=false + # ============================================================================= # NON-PROD ROOTS (optional, base DC only) From 83b383ecfa431bb922736f6b846fe684e37b4090 Mon Sep 17 00:00:00 2001 From: Keyur Shah Date: Tue, 9 Jun 2026 15:31:57 -0700 Subject: [PATCH 5/6] Use real place in tool instructions --- .../instructions/tools/mixer/search_indicators.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/datacommons-mcp/datacommons_mcp/instructions/tools/mixer/search_indicators.md b/packages/datacommons-mcp/datacommons_mcp/instructions/tools/mixer/search_indicators.md index e435be1..7eaf500 100644 --- a/packages/datacommons-mcp/datacommons_mcp/instructions/tools/mixer/search_indicators.md +++ b/packages/datacommons-mcp/datacommons_mcp/instructions/tools/mixer/search_indicators.md @@ -100,7 +100,7 @@ The following rules apply to **both** the `places` and `parent_place` parameters - **Example 3: Administrative Level Sampling** - **Goal:** Check data availability for different administrative levels (e.g., "population of US cities" vs "population of US states"). - **Call:** - * **For Cities:** `query="population"`, `parent_place="Middle Earth"`, `places=["New York City, USA", "Los Angeles, USA", "Chicago, USA", "Houston, USA", "Phoenix, USA"]` + * **For Cities:** `query="population"`, `parent_place="USA"`, `places=["New York City, USA", "Los Angeles, USA", "Chicago, USA", "Houston, USA", "Phoenix, USA"]` * **For States:** `query="population"`, `parent_place="USA"`, `places=["California, USA", "Texas, USA", "Florida, USA", "New York State, USA", "Pennsylvania, USA"]` - **Logic:** Specify the exact administrative level you want to sample to avoid confusion between city and state data. From 5f7165a8020b9a611d60d1c1915fc5cb79c759fb Mon Sep 17 00:00:00 2001 From: Keyur Shah Date: Thu, 11 Jun 2026 14:59:08 -0700 Subject: [PATCH 6/6] Rename Mixer to Agent API --- packages/datacommons-mcp/.env.sample | 4 +- .../{mixer_client.py => agent_api_client.py} | 24 +++--- ...{mixer_service.py => agent_api_service.py} | 22 +++--- .../{mixer_tools.py => agent_api_tools.py} | 16 ++-- .../datacommons-mcp/datacommons_mcp/app.py | 20 ++--- .../datacommons_mcp/data_models/settings.py | 6 +- .../datacommons_mcp/exceptions.py | 4 +- .../{mixer => agent_api}/get_observations.md | 2 +- .../{mixer => agent_api}/search_indicators.md | 2 +- .../datacommons-mcp/datacommons_mcp/server.py | 12 ++- ...est_mixer_service.py => test_agent_api.py} | 74 ++++++++++--------- 11 files changed, 96 insertions(+), 90 deletions(-) rename packages/datacommons-mcp/datacommons_mcp/{mixer_client.py => agent_api_client.py} (84%) rename packages/datacommons-mcp/datacommons_mcp/{mixer_service.py => agent_api_service.py} (74%) rename packages/datacommons-mcp/datacommons_mcp/{mixer_tools.py => agent_api_tools.py} (82%) rename packages/datacommons-mcp/datacommons_mcp/instructions/tools/{mixer => agent_api}/get_observations.md (98%) rename packages/datacommons-mcp/datacommons_mcp/instructions/tools/{mixer => agent_api}/search_indicators.md (99%) rename packages/datacommons-mcp/tests/{test_mixer_service.py => test_agent_api.py} (73%) diff --git a/packages/datacommons-mcp/.env.sample b/packages/datacommons-mcp/.env.sample index af6c656..18cbdfe 100644 --- a/packages/datacommons-mcp/.env.sample +++ b/packages/datacommons-mcp/.env.sample @@ -57,9 +57,9 @@ DC_TYPE=base # - tools/{tool_name}.md # DC_INSTRUCTIONS_DIR=/path/to/custom/instructions -# Enable using Mixer-side agent endpoints instead of local processing (optional) +# Enable using Agent-optimized APIs instead of local processing (optional) # Default: false -# USE_MIXER_AGENT_APIS=false +# DC_USE_AGENT_API=false # ============================================================================= diff --git a/packages/datacommons-mcp/datacommons_mcp/mixer_client.py b/packages/datacommons-mcp/datacommons_mcp/agent_api_client.py similarity index 84% rename from packages/datacommons-mcp/datacommons_mcp/mixer_client.py rename to packages/datacommons-mcp/datacommons_mcp/agent_api_client.py index 3d6e2f3..70d6ae0 100644 --- a/packages/datacommons-mcp/datacommons_mcp/mixer_client.py +++ b/packages/datacommons-mcp/datacommons_mcp/agent_api_client.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. """ -Client for interacting with the Data Commons Mixer service. +Client for interacting with the Data Commons Agent API. """ import logging @@ -23,31 +23,31 @@ import httpx -from datacommons_mcp.exceptions import MixerAPIError +from datacommons_mcp.exceptions import AgentAPIError from datacommons_mcp.version import __version__ logger = logging.getLogger(__name__) def log_api_call(func: Callable[..., Any]) -> Callable[..., Any]: # noqa: ANN401 - """Decorator to log URL, request payload, execution time, and errors for Mixer API calls.""" + """Decorator to log URL, request payload, execution time, and errors for Agent API calls.""" @wraps(func) async def wrapper( - self: "MixerClient", + self: "AgentAPIClient", endpoint: str, payload: dict, *args: object, **kwargs: object, ) -> Any: # noqa: ANN401 url = f"{self.api_root}/{endpoint}" - logger.info("MixerClient POST request URL: %s, payload: %s", url, payload) + logger.info("AgentAPIClient POST request URL: %s, payload: %s", url, payload) start_time = time.perf_counter() try: result = await func(self, endpoint, payload, *args, **kwargs) elapsed_time = time.perf_counter() - start_time logger.info( - "MixerClient POST request to %s completed in %.3f seconds", + "AgentAPIClient POST request to %s completed in %.3f seconds", url, elapsed_time, ) @@ -55,7 +55,7 @@ async def wrapper( except Exception as e: elapsed_time = time.perf_counter() - start_time logger.error( - "MixerClient POST request to %s failed after %.3f seconds with error: %s", + "AgentAPIClient POST request to %s failed after %.3f seconds with error: %s", url, elapsed_time, e, @@ -65,11 +65,11 @@ async def wrapper( return wrapper -class MixerClient: - """Async client for interacting with Mixer-side agent endpoints.""" +class AgentAPIClient: + """Async client for interacting with Data Commons agent endpoints.""" def __init__(self, api_root: str, api_key: str | None = None) -> None: - """Initialize the MixerClient. + """Initialize the AgentAPIClient. Args: api_root: The base API root URL (e.g. 'https://api.datacommons.org/v2'). @@ -110,8 +110,8 @@ async def post(self, endpoint: str, payload: dict) -> dict: response.raise_for_status() return response.json() except httpx.HTTPStatusError as e: - error_msg = f"Mixer API call to {endpoint} failed with status {e.response.status_code}" - raise MixerAPIError( + error_msg = f"Agent API call to {endpoint} failed with status {e.response.status_code}" + raise AgentAPIError( error_msg, e.response.status_code, e.response.text ) from e diff --git a/packages/datacommons-mcp/datacommons_mcp/mixer_service.py b/packages/datacommons-mcp/datacommons_mcp/agent_api_service.py similarity index 74% rename from packages/datacommons-mcp/datacommons_mcp/mixer_service.py rename to packages/datacommons-mcp/datacommons_mcp/agent_api_service.py index 9f96a72..df4f898 100644 --- a/packages/datacommons-mcp/datacommons_mcp/mixer_service.py +++ b/packages/datacommons-mcp/datacommons_mcp/agent_api_service.py @@ -12,22 +12,22 @@ # See the License for the specific language governing permissions and # limitations under the License. """ -Service layer for calling Mixer-side agent APIs. +Service layer for calling Agent APIs. """ from typing import Any +from datacommons_mcp.agent_api_client import AgentAPIClient from datacommons_mcp.app import app -from datacommons_mcp.mixer_client import MixerClient -def _get_mixer_client() -> MixerClient: - """Helper to get the initialized MixerClient, raising RuntimeError if not set.""" - if app.mixer_client is None: +def _get_agent_api_client() -> AgentAPIClient: + """Helper to get the initialized AgentAPIClient, raising RuntimeError if not set.""" + if app.agent_api_client is None: raise RuntimeError( - "Mixer client is not initialized. Ensure USE_MIXER_AGENT_APIS is enabled." + "Agent API client is not initialized. Ensure DC_USE_AGENT_API is enabled." ) - return app.mixer_client + return app.agent_api_client async def get_observations( @@ -39,8 +39,8 @@ async def get_observations( date_range_start: str | None = None, date_range_end: str | None = None, ) -> dict[str, Any]: - """Fetches observations via the Mixer-side agent/get_observations endpoint.""" - client = _get_mixer_client() + """Fetches observations via the Agent API agent/get_observations endpoint.""" + client = _get_agent_api_client() payload = { "variable_dcid": variable_dcid, "place_dcid": place_dcid, @@ -61,8 +61,8 @@ async def search_indicators( *, include_topics: bool = True, ) -> dict[str, Any]: - """Searches for indicators via the Mixer-side agent/search_indicators endpoint.""" - client = _get_mixer_client() + """Searches for indicators via the Agent API agent/search_indicators endpoint.""" + client = _get_agent_api_client() payload = { "query": query, "places": places or [], diff --git a/packages/datacommons-mcp/datacommons_mcp/mixer_tools.py b/packages/datacommons-mcp/datacommons_mcp/agent_api_tools.py similarity index 82% rename from packages/datacommons-mcp/datacommons_mcp/mixer_tools.py rename to packages/datacommons-mcp/datacommons_mcp/agent_api_tools.py index 457af47..feaaf5e 100644 --- a/packages/datacommons-mcp/datacommons_mcp/mixer_tools.py +++ b/packages/datacommons-mcp/datacommons_mcp/agent_api_tools.py @@ -12,16 +12,16 @@ # See the License for the specific language governing permissions and # limitations under the License. """ -Tool implementations for the Mixer-based Data Commons MCP server. +Tool implementations for the Agent API-based Data Commons MCP server. """ -from datacommons_mcp.data_models.observations import ObservationDateType -from datacommons_mcp.mixer_service import ( - get_observations as mixer_get_observations, +from datacommons_mcp.agent_api_service import ( + get_observations as agent_api_get_observations, ) -from datacommons_mcp.mixer_service import ( - search_indicators as mixer_search_indicators, +from datacommons_mcp.agent_api_service import ( + search_indicators as agent_api_search_indicators, ) +from datacommons_mcp.data_models.observations import ObservationDateType async def get_observations( @@ -34,7 +34,7 @@ async def get_observations( date_range_end: str | None = None, ) -> dict: """Fetches observations for a statistical variable from Data Commons.""" - return await mixer_get_observations( + return await agent_api_get_observations( variable_dcid=variable_dcid, place_dcid=place_dcid, child_place_type=child_place_type, @@ -54,7 +54,7 @@ async def search_indicators( include_topics: bool = True, ) -> dict: """Searches for indicators (topics and variables) in Data Commons.""" - return await mixer_search_indicators( + return await agent_api_search_indicators( query=query, places=places, parent_place=parent_place, diff --git a/packages/datacommons-mcp/datacommons_mcp/app.py b/packages/datacommons-mcp/datacommons_mcp/app.py index 84976bf..0b607b7 100644 --- a/packages/datacommons-mcp/datacommons_mcp/app.py +++ b/packages/datacommons-mcp/datacommons_mcp/app.py @@ -25,8 +25,8 @@ from pydantic import ValidationError from datacommons_mcp import settings +from datacommons_mcp.agent_api_client import AgentAPIClient from datacommons_mcp.clients import create_dc_client -from datacommons_mcp.mixer_client import MixerClient from datacommons_mcp.utils import read_external_content, read_package_content from datacommons_mcp.version import __version__ @@ -55,21 +55,21 @@ def __init__(self) -> None: logger.error("Settings error: %s", e) raise - # Create client only if mixer APIs are NOT enabled (as fallback is not needed) + # Create client only if agent APIs are NOT enabled (as fallback is not needed) self.client = None - if not self.settings.use_mixer_agent_apis: + if not self.settings.use_agent_api: try: self.client = create_dc_client(self.settings) except Exception as e: logger.error("Failed to create DC client: %s", e) raise - # Create mixer client only if enabled - self.mixer_client = None - if self.settings.use_mixer_agent_apis: + # Create agent API client only if enabled + self.agent_api_client = None + if self.settings.use_agent_api: api_root = self.settings.api_root or "https://api.datacommons.org/v2" api_key = getattr(self.settings, "api_key", None) - self.mixer_client = MixerClient(api_root=api_root, api_key=api_key) + self.agent_api_client = AgentAPIClient(api_root=api_root, api_key=api_key) # Load Server Instructions server_instructions = self._load_instructions(SERVER_INSTRUCTIONS_FILE) @@ -79,9 +79,9 @@ def __init__(self) -> None: @asynccontextmanager async def lifespan(_server: FastMCP) -> AsyncIterator[dict[str, Any]]: yield {} - if self.mixer_client: - logger.info("Closing Mixer client...") - await self.mixer_client.close() + if self.agent_api_client: + logger.info("Closing Agent API client...") + await self.agent_api_client.close() self.mcp = FastMCP( MCP_SERVER_NAME, diff --git a/packages/datacommons-mcp/datacommons_mcp/data_models/settings.py b/packages/datacommons-mcp/datacommons_mcp/data_models/settings.py index c6ca1cf..bd4e2d1 100644 --- a/packages/datacommons-mcp/datacommons_mcp/data_models/settings.py +++ b/packages/datacommons-mcp/datacommons_mcp/data_models/settings.py @@ -53,10 +53,10 @@ class DCSettingsBase(BaseSettings): description="Directory containing custom instruction files (markdown overrides)", ) - use_mixer_agent_apis: bool = Field( + use_agent_api: bool = Field( default=False, - alias="USE_MIXER_AGENT_APIS", - description="Use the new Mixer-side agent endpoints instead of local processing", + alias="DC_USE_AGENT_API", + description="Use the new Agent-optimized APIs instead of local processing", ) api_root: str | None = Field( diff --git a/packages/datacommons-mcp/datacommons_mcp/exceptions.py b/packages/datacommons-mcp/datacommons_mcp/exceptions.py index 1c4d849..039b922 100644 --- a/packages/datacommons-mcp/datacommons_mcp/exceptions.py +++ b/packages/datacommons-mcp/datacommons_mcp/exceptions.py @@ -47,8 +47,8 @@ class InvalidAPIKeyError(APIKeyValidationError): """Raised when the API key is determined to be invalid.""" -class MixerAPIError(Exception): - """Raised when the Data Commons Mixer API returns an error.""" +class AgentAPIError(Exception): + """Raised when the Data Commons Agent API returns an error.""" def __init__(self, message: str, status_code: int, body: str | None = None) -> None: super().__init__(message) diff --git a/packages/datacommons-mcp/datacommons_mcp/instructions/tools/mixer/get_observations.md b/packages/datacommons-mcp/datacommons_mcp/instructions/tools/agent_api/get_observations.md similarity index 98% rename from packages/datacommons-mcp/datacommons_mcp/instructions/tools/mixer/get_observations.md rename to packages/datacommons-mcp/datacommons_mcp/instructions/tools/agent_api/get_observations.md index fb9f701..8e18c38 100644 --- a/packages/datacommons-mcp/datacommons_mcp/instructions/tools/mixer/get_observations.md +++ b/packages/datacommons-mcp/datacommons_mcp/instructions/tools/agent_api/get_observations.md @@ -52,7 +52,7 @@ Args: date_range_end (str, optional): The end date for a range (inclusive). **Used only if `date` is set to'range'.** Returns: - The fetched observation data directly from Data Commons Mixer. + The fetched observation data directly from Data Commons. The response format contains: - `variable`: Details about the statistical variable requested (dcid, name, typeOf). - `resolvedParentPlace`: The resolved node information for the parent place, if one was provided. diff --git a/packages/datacommons-mcp/datacommons_mcp/instructions/tools/mixer/search_indicators.md b/packages/datacommons-mcp/datacommons_mcp/instructions/tools/agent_api/search_indicators.md similarity index 99% rename from packages/datacommons-mcp/datacommons_mcp/instructions/tools/mixer/search_indicators.md rename to packages/datacommons-mcp/datacommons_mcp/instructions/tools/agent_api/search_indicators.md index 7eaf500..9d6530d 100644 --- a/packages/datacommons-mcp/datacommons_mcp/instructions/tools/mixer/search_indicators.md +++ b/packages/datacommons-mcp/datacommons_mcp/instructions/tools/agent_api/search_indicators.md @@ -146,7 +146,7 @@ The following rules apply to **both** the `places` and `parent_place` parameters ### Response Structure -Returns a dictionary containing candidate indicators directly from the Data Commons Mixer. +Returns a dictionary containing candidate indicators directly from Data Commons. ```json { "status": "SUCCESS", diff --git a/packages/datacommons-mcp/datacommons_mcp/server.py b/packages/datacommons-mcp/datacommons_mcp/server.py index 883c87a..b4cb573 100644 --- a/packages/datacommons-mcp/datacommons_mcp/server.py +++ b/packages/datacommons-mcp/datacommons_mcp/server.py @@ -20,7 +20,7 @@ from starlette.requests import Request from starlette.responses import JSONResponse -import datacommons_mcp.mixer_tools as mixer_tools +import datacommons_mcp.agent_api_tools as agent_api_tools import datacommons_mcp.tools as tools from datacommons_mcp.app import app from datacommons_mcp.version import __version__ @@ -40,9 +40,13 @@ async def health_check(request: Request) -> JSONResponse: # noqa: ARG001 reques # Register tools -if app.settings.use_mixer_agent_apis: - app.register_tool(mixer_tools.get_observations, "tools/mixer/get_observations.md") - app.register_tool(mixer_tools.search_indicators, "tools/mixer/search_indicators.md") +if app.settings.use_agent_api: + app.register_tool( + agent_api_tools.get_observations, "tools/agent_api/get_observations.md" + ) + app.register_tool( + agent_api_tools.search_indicators, "tools/agent_api/search_indicators.md" + ) else: app.register_tool(tools.get_observations, tools.GET_OBSERVATIONS_INSTRUCTION_FILE) app.register_tool(tools.search_indicators, tools.SEARCH_INDICATORS_INSTRUCTION_FILE) diff --git a/packages/datacommons-mcp/tests/test_mixer_service.py b/packages/datacommons-mcp/tests/test_agent_api.py similarity index 73% rename from packages/datacommons-mcp/tests/test_mixer_service.py rename to packages/datacommons-mcp/tests/test_agent_api.py index 61c2750..1850f88 100644 --- a/packages/datacommons-mcp/tests/test_mixer_service.py +++ b/packages/datacommons-mcp/tests/test_agent_api.py @@ -12,22 +12,22 @@ # See the License for the specific language governing permissions and # limitations under the License. """ -Tests for MixerClient, mixer_service, and feature flag routing. +Tests for AgentAPIClient, agent_api_service, and routing. """ from unittest.mock import AsyncMock, MagicMock, patch import httpx import pytest -from datacommons_mcp.app import app -from datacommons_mcp.mixer_client import MixerClient -from datacommons_mcp.mixer_service import get_observations, search_indicators -from datacommons_mcp.mixer_tools import ( - get_observations as mixer_tools_get_obs, +from datacommons_mcp.agent_api_client import AgentAPIClient +from datacommons_mcp.agent_api_service import get_observations, search_indicators +from datacommons_mcp.agent_api_tools import ( + get_observations as agent_api_tools_get_obs, ) -from datacommons_mcp.mixer_tools import ( - search_indicators as mixer_tools_search_ind, +from datacommons_mcp.agent_api_tools import ( + search_indicators as agent_api_tools_search_ind, ) +from datacommons_mcp.app import app from datacommons_mcp.tools import ( get_observations as tools_get_obs, ) @@ -37,9 +37,9 @@ @pytest.mark.asyncio -async def test_mixer_client_post(): - """Verify MixerClient correctly sends payload and headers to the endpoint.""" - client = MixerClient( +async def test_agent_api_client_post(): + """Verify AgentAPIClient correctly sends payload and headers to the endpoint.""" + client = AgentAPIClient( api_root="https://api.datacommons.org/v2", api_key="test-api-key" ) assert client.headers["X-API-Key"] == "test-api-key" @@ -61,12 +61,12 @@ async def test_mixer_client_post(): @pytest.mark.asyncio -async def test_mixer_service_get_observations(): - """Verify get_observations builds correct payload and invokes mixer_client.""" +async def test_agent_api_service_get_observations(): + """Verify get_observations builds correct payload and invokes agent_api_client.""" mock_client = AsyncMock() mock_client.post.return_value = {"placeObservations": []} - with patch.object(app, "mixer_client", mock_client): + with patch.object(app, "agent_api_client", mock_client): result = await get_observations( variable_dcid="Count_Person", place_dcid="geoId/06", @@ -92,12 +92,12 @@ async def test_mixer_service_get_observations(): @pytest.mark.asyncio -async def test_mixer_service_search_indicators(): - """Verify search_indicators builds correct payload and invokes mixer_client.""" +async def test_agent_api_service_search_indicators(): + """Verify search_indicators builds correct payload and invokes agent_api_client.""" mock_client = AsyncMock() mock_client.post.return_value = {"variables": []} - with patch.object(app, "mixer_client", mock_client): + with patch.object(app, "agent_api_client", mock_client): result = await search_indicators( query="unemployment", places=["California"], @@ -119,27 +119,29 @@ async def test_mixer_service_search_indicators(): @pytest.mark.asyncio -async def test_mixer_tools_execution(): - """Verify mixer_tools functions delegate to mixer_service.""" +async def test_agent_api_tools_execution(): + """Verify agent_api_tools functions delegate to agent_api_service.""" with patch( - "datacommons_mcp.mixer_tools.mixer_get_observations", new_callable=AsyncMock - ) as mock_mixer_get_obs: - mock_mixer_get_obs.return_value = {"mixer_obs": True} - result = await mixer_tools_get_obs( + "datacommons_mcp.agent_api_tools.agent_api_get_observations", + new_callable=AsyncMock, + ) as mock_agent_api_get_obs: + mock_agent_api_get_obs.return_value = {"agent_api_obs": True} + result = await agent_api_tools_get_obs( variable_dcid="Count_Person", place_dcid="geoId/06" ) - assert result == {"mixer_obs": True} - mock_mixer_get_obs.assert_called_once() + assert result == {"agent_api_obs": True} + mock_agent_api_get_obs.assert_called_once() with patch( - "datacommons_mcp.mixer_tools.mixer_search_indicators", new_callable=AsyncMock - ) as mock_mixer_search_ind: - mock_mixer_search_ind.return_value = {"mixer_search": True} - result = await mixer_tools_search_ind( + "datacommons_mcp.agent_api_tools.agent_api_search_indicators", + new_callable=AsyncMock, + ) as mock_agent_api_search_ind: + mock_agent_api_search_ind.return_value = {"agent_api_search": True} + result = await agent_api_tools_search_ind( query="unemployment", places=["California"] ) - assert result == {"mixer_search": True} - mock_mixer_search_ind.assert_called_once() + assert result == {"agent_api_search": True} + mock_agent_api_search_ind.assert_called_once() @pytest.mark.asyncio @@ -171,9 +173,9 @@ async def test_local_tools_execution(): @pytest.mark.asyncio -async def test_mixer_client_post_error(): - """Verify that MixerClient.post raises MixerAPIError and extracts details on failure.""" - client = MixerClient(api_root="https://api.datacommons.org/v2") +async def test_agent_api_client_post_error(): + """Verify that AgentAPIClient.post raises AgentAPIError and extracts details on failure.""" + client = AgentAPIClient(api_root="https://api.datacommons.org/v2") mock_response = MagicMock() mock_response.status_code = 500 mock_response.text = '{"message": "Internal error"}' @@ -188,9 +190,9 @@ def raise_status_error(): mock_response.raise_for_status = raise_status_error with patch.object(client.client, "post", return_value=mock_response): - from datacommons_mcp.exceptions import MixerAPIError + from datacommons_mcp.exceptions import AgentAPIError - with pytest.raises(MixerAPIError) as exc_info: + with pytest.raises(AgentAPIError) as exc_info: await client.post("agent/test", {}) assert exc_info.value.status_code == 500 err_msg = str(exc_info.value)