Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions packages/datacommons-mcp/.env.sample
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ DC_TYPE=base
# - tools/{tool_name}.md
# DC_INSTRUCTIONS_DIR=/path/to/custom/instructions

# Use the Agent-optimized APIs instead of local processing (optional)
# Default: false
# DC_USE_AGENT_API=false


# =============================================================================
# NON-PROD ROOTS (optional, base DC only)
Expand Down
122 changes: 122 additions & 0 deletions packages/datacommons-mcp/datacommons_mcp/agent_api_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
# Copyright 2026 Google LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Client for interacting with the Data Commons Agent API.
"""

import logging
import time
from collections.abc import Callable
from functools import wraps
from typing import Any # noqa: ANN401

import httpx

from datacommons_mcp.exceptions import AgentAPIError
from datacommons_mcp.version import __version__

logger = logging.getLogger(__name__)


def log_api_call(func: Callable[..., Any]) -> Callable[..., Any]:  # noqa: ANN401
    """Wrap an Agent API coroutine method with request logging.

    The returned coroutine logs the target URL and payload before delegating
    to ``func``. Afterwards it logs the elapsed time on success, or the
    elapsed time together with the error on failure. Exceptions raised by
    ``func`` are re-raised unchanged.

    Args:
        func: The coroutine method to wrap. It is called as
            ``func(self, endpoint, payload, *args, **kwargs)``.

    Returns:
        The logging wrapper, carrying ``func``'s metadata via ``functools.wraps``.
    """

    @wraps(func)
    async def wrapper(
        self: "AgentAPIClient",
        endpoint: str,
        payload: dict,
        *args: object,
        **kwargs: object,
    ) -> Any:  # noqa: ANN401
        # Built only for the log messages; the wrapped call receives the
        # original endpoint and payload.
        target = f"{self.api_root}/{endpoint}"
        logger.info("AgentAPIClient POST request URL: %s, payload: %s", target, payload)
        began = time.perf_counter()
        try:
            outcome = await func(self, endpoint, payload, *args, **kwargs)
            logger.info(
                "AgentAPIClient POST request to %s completed in %.3f seconds",
                target,
                time.perf_counter() - began,
            )
        except Exception as exc:
            logger.error(
                "AgentAPIClient POST request to %s failed after %.3f seconds with error: %s",
                target,
                time.perf_counter() - began,
                exc,
            )
            raise
        return outcome

    return wrapper


class AgentAPIClient:
    """Async client for interacting with Data Commons agent endpoints.

    The underlying ``httpx.AsyncClient`` is created lazily on first access of
    :attr:`client` and released by :meth:`close`.
    """

    def __init__(self, api_root: str, api_key: str | None = None) -> None:
        """Initialize the AgentAPIClient.

        Args:
            api_root: The base API root URL (e.g. 'https://api.datacommons.org/v2').
                Trailing slashes are stripped.
            api_key: Optional API key for authentication. When truthy it is
                sent in the ``X-API-Key`` header.
        """
        self.api_root = api_root.rstrip("/")
        self.api_key = api_key
        self.headers = {
            "Content-Type": "application/json",
            "x-surface": f"mcp-{__version__}",
        }
        if api_key:
            self.headers["X-API-Key"] = api_key
        self.timeout = 30.0  # 30 seconds default timeout
        # Created on demand by the `client` property; reset to None by `close`.
        self._client: httpx.AsyncClient | None = None

    @property
    def client(self) -> httpx.AsyncClient:
        """Lazily initialize the AsyncClient under the active event loop."""
        if self._client is None:
            self._client = httpx.AsyncClient(headers=self.headers, timeout=self.timeout)
        return self._client

    @log_api_call
    async def post(self, endpoint: str, payload: dict) -> dict:
        """Perform an asynchronous POST request to the specified endpoint.

        Args:
            endpoint: The API endpoint (e.g. 'agent/get_observations').
            payload: The dictionary to send as JSON payload.

        Returns:
            The parsed JSON response as a dictionary.

        Raises:
            AgentAPIError: If the response carries an HTTP error status. The
                status code and response text are attached.

        Any other exception raised while sending the request or decoding the
        response body propagates unchanged.
        """
        url = f"{self.api_root}/{endpoint}"
        try:
            response = await self.client.post(url, json=payload)
            response.raise_for_status()
        except httpx.HTTPStatusError as e:
            error_msg = f"Agent API call to {endpoint} failed with status {e.response.status_code}"
            raise AgentAPIError(
                error_msg, e.response.status_code, e.response.text
            ) from e
        return response.json()

    async def close(self) -> None:
        """Close the underlying HTTP client if it was initialized.

        The cached client reference is cleared even when closing fails or is
        cancelled, so a later access of :attr:`client` builds a fresh client
        instead of returning one that is already shut down.
        """
        if self._client is None:
            return
        try:
            await self._client.aclose()
        finally:
            self._client = None
73 changes: 73 additions & 0 deletions packages/datacommons-mcp/datacommons_mcp/agent_api_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# Copyright 2026 Google LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Service layer for calling Agent APIs.
"""

from typing import Any

from datacommons_mcp.agent_api_client import AgentAPIClient
from datacommons_mcp.app import app


def _get_agent_api_client() -> AgentAPIClient:
    """Return the application's Agent API client.

    Raises:
        RuntimeError: If ``app.agent_api_client`` is ``None``.
    """
    configured_client = app.agent_api_client
    if configured_client is not None:
        return configured_client
    raise RuntimeError(
        "Agent API client is not initialized. Ensure DC_USE_AGENT_API is enabled."
    )


async def get_observations(
    variable_dcid: str,
    place_dcid: str,
    child_place_type: str | None = None,
    source_override: str | None = None,
    date: str | None = None,
    date_range_start: str | None = None,
    date_range_end: str | None = None,
) -> dict[str, Any]:
    """POST an observations request to the ``agent/get_observations`` endpoint.

    Every argument is forwarded in the JSON payload under its own name,
    including arguments whose value is ``None``.

    Returns:
        The value returned by the Agent API client's ``post`` call.

    Raises:
        RuntimeError: If the Agent API client has not been initialized.
    """
    date_filters = {
        "date": date,
        "date_range_start": date_range_start,
        "date_range_end": date_range_end,
    }
    request_body = {
        "variable_dcid": variable_dcid,
        "place_dcid": place_dcid,
        "child_place_type": child_place_type,
        "source_override": source_override,
        **date_filters,
    }
    agent_client = _get_agent_api_client()
    return await agent_client.post("agent/get_observations", request_body)


async def search_indicators(
    query: str,
    places: list[str] | None = None,
    parent_place: str | None = None,
    per_search_limit: int = 10,
    *,
    include_topics: bool = True,
) -> dict[str, Any]:
    """POST an indicator search to the ``agent/search_indicators`` endpoint.

    A missing or empty ``places`` argument is sent as an empty list; all other
    arguments are forwarded in the JSON payload as given.

    Returns:
        The value returned by the Agent API client's ``post`` call.

    Raises:
        RuntimeError: If the Agent API client has not been initialized.
    """
    place_list = places if places else []
    request_body = {
        "query": query,
        "places": place_list,
        "parent_place": parent_place,
        "per_search_limit": per_search_limit,
        "include_topics": include_topics,
    }
    agent_client = _get_agent_api_client()
    return await agent_client.post("agent/search_indicators", request_body)
63 changes: 63 additions & 0 deletions packages/datacommons-mcp/datacommons_mcp/agent_api_tools.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Copyright 2026 Google LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Tool implementations for the Agent API-based Data Commons MCP server.
"""

from datacommons_mcp.agent_api_service import (
get_observations as agent_api_get_observations,
)
from datacommons_mcp.agent_api_service import (
search_indicators as agent_api_search_indicators,
)
from datacommons_mcp.data_models.observations import ObservationDateType


async def get_observations(
    variable_dcid: str,
    place_dcid: str,
    child_place_type: str | None = None,
    source_override: str | None = None,
    date: str = ObservationDateType.LATEST.value,
    date_range_start: str | None = None,
    date_range_end: str | None = None,
) -> dict:
    """Fetch observations for a statistical variable from Data Commons.

    Delegates to the Agent API service's ``get_observations``, passing every
    argument through by keyword. ``date`` defaults to
    ``ObservationDateType.LATEST.value``.
    """
    forwarded = {
        "variable_dcid": variable_dcid,
        "place_dcid": place_dcid,
        "child_place_type": child_place_type,
        "source_override": source_override,
        "date": date,
        "date_range_start": date_range_start,
        "date_range_end": date_range_end,
    }
    return await agent_api_get_observations(**forwarded)


async def search_indicators(
    query: str,
    places: list[str] | None = None,
    parent_place: str | None = None,
    per_search_limit: int = 10,
    *,
    include_topics: bool = True,
) -> dict:
    """Search for indicators (topics and variables) in Data Commons.

    Delegates to the Agent API service's ``search_indicators``, passing every
    argument through by keyword.
    """
    forwarded = {
        "query": query,
        "places": places,
        "parent_place": parent_place,
        "per_search_limit": per_search_limit,
        "include_topics": include_topics,
    }
    return await agent_api_search_indicators(**forwarded)
34 changes: 27 additions & 7 deletions packages/datacommons-mcp/datacommons_mcp/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@

import json
import logging
from collections.abc import Callable
from collections.abc import AsyncIterator, Callable
from typing import Any

from fastmcp import FastMCP
from fastmcp.tools.tool import Tool
from pydantic import ValidationError

from datacommons_mcp import settings
from datacommons_mcp.agent_api_client import AgentAPIClient
from datacommons_mcp.clients import create_dc_client
from datacommons_mcp.utils import read_external_content, read_package_content
from datacommons_mcp.version import __version__
Expand Down Expand Up @@ -54,20 +55,39 @@ def __init__(self) -> None:
logger.error("Settings error: %s", e)
raise

# Create client
try:
self.client = create_dc_client(self.settings)
except Exception as e:
logger.error("Failed to create DC client: %s", e)
raise
# Create client only if agent APIs are NOT enabled (as fallback is not needed)
self.client = None
if not self.settings.use_agent_api:
try:
self.client = create_dc_client(self.settings)
except Exception as e:
logger.error("Failed to create DC client: %s", e)
raise

# Create agent API client only if enabled
self.agent_api_client = None
if self.settings.use_agent_api:
api_root = self.settings.api_root or "https://api.datacommons.org/v2"
api_key = getattr(self.settings, "api_key", None)
self.agent_api_client = AgentAPIClient(api_root=api_root, api_key=api_key)

# Load Server Instructions
server_instructions = self._load_instructions(SERVER_INSTRUCTIONS_FILE)

from contextlib import asynccontextmanager

@asynccontextmanager
async def lifespan(_server: FastMCP) -> AsyncIterator[dict[str, Any]]:
yield {}
if self.agent_api_client:
logger.info("Closing Agent API client...")
await self.agent_api_client.close()

self.mcp = FastMCP(
MCP_SERVER_NAME,
version=__version__,
instructions=server_instructions,
lifespan=lifespan,
)

def _load_instructions(self, filename: str) -> str:
Expand Down
25 changes: 16 additions & 9 deletions packages/datacommons-mcp/datacommons_mcp/data_models/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ class DCSettingsSelector(BaseSettings):
)


class DCSettings(BaseSettings):
"""Settings for base Data Commons instance."""
class DCSettingsBase(BaseSettings):
"""Base settings class containing shared configurations."""

model_config = _MODEL_CONFIG

Expand All @@ -53,8 +53,20 @@ class DCSettings(BaseSettings):
description="Directory containing custom instruction files (markdown overrides)",
)

use_agent_api: bool = Field(
default=False,
alias="DC_USE_AGENT_API",
description="Use the new Agent-optimized APIs instead of local processing",
)

api_root: str | None = Field(
default=None,
alias="DC_API_ROOT",
description="API root for Data Commons",
)


class BaseDCSettings(DCSettings):
class BaseDCSettings(DCSettingsBase):
"""Settings for base Data Commons instance."""

def __init__(self, **kwargs: dict[str, Any]) -> None:
Expand All @@ -76,19 +88,14 @@ def __init__(self, **kwargs: dict[str, Any]) -> None:
alias="DC_BASE_ROOT_TOPIC_DCIDS",
description="List of root topic DCIDs for base DC",
)
api_root: str | None = Field(
default=None,
alias="DC_API_ROOT",
description="API root for local api instance",
)

@field_validator("topic_cache_paths", "base_root_topic_dcids", mode="before")
@classmethod
def parse_list_like_parameter(cls, v: str) -> list[str] | None:
return _parse_list_like_parameter(v)


class CustomDCSettings(DCSettings):
class CustomDCSettings(DCSettingsBase):
"""Settings for custom Data Commons instance."""

model_config = _MODEL_CONFIG
Expand Down
Loading
Loading