-
Notifications
You must be signed in to change notification settings - Fork 3.2k
firestore session service added for session management #3540
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
Hariharan0309
wants to merge
28
commits into
google:main
Choose a base branch
from
Hariharan0309:firestore-session-service
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+411
−0
Open
Changes from 1 commit
Commits
Show all changes
28 commits
Select commit
Hold shift + click to select a range
7b99e0b
firestore session service added for session management
Hariharan0309 6239843
Merge branch 'main' into firestore-session-service
hangfei d2081f3
Update src/google/adk/sessions/firestore_session_service.py
Hariharan0309 cdd2b18
Update src/google/adk/sessions/firestore_session_service.py
Hariharan0309 a2c2234
Update src/google/adk/sessions/firestore_session_service.py
Hariharan0309 a48ba7e
Merge branch 'main' into firestore-session-service
Hariharan0309 5e84dde
camel-case to snake_case in FirestoreSessionService.py
Hariharan0309 6096927
grounding metadata uncommented
Hariharan0309 042d47c
query added
Hariharan0309 8f4744a
pyink ran locally
Hariharan0309 84780eb
Merge branch 'main' into firestore-session-service
Hariharan0309 7c57c45
Merge branch 'main' into firestore-session-service
Hariharan0309 ca4e415
Merge branch 'main' into firestore-session-service
Hariharan0309 1a8e4c7
isort ran locally
Hariharan0309 1d67f27
Merge branch 'main' into firestore-session-service
Hariharan0309 ef82e50
firestore_session lazy loading done
Hariharan0309 1c048de
added the dependency in the pyproject.toml
Hariharan0309 107ef89
Merge branch 'main' into firestore-session-service
Hariharan0309 4e46847
Merge branch 'main' into firestore-session-service
Hariharan0309 9b63501
Merge branch 'main' into firestore-session-service
Hariharan0309 72d8718
Merge branch 'main' into firestore-session-service
Hariharan0309 db950b7
Merge branch 'main' into firestore-session-service
Hariharan0309 e58c37f
Merge branch 'main' into firestore-session-service
Hariharan0309 686f4e9
Merge branch 'main' into firestore-session-service
Hariharan0309 fa55a2d
Merge branch 'main' into firestore-session-service
rohityan f2ef02b
Merge branch 'main' into firestore-session-service
Hariharan0309 9f0f09f
Merge branch 'main' into firestore-session-service
Hariharan0309 84ef843
Merge branch 'main' into firestore-session-service
Hariharan0309 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,344 @@ | ||
| # Copyright 2025 Google LLC | ||
| # | ||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||
| # you may not use this file except in compliance with the License. | ||
| # You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
|
|
||
| """ | ||
| Implements a session service using Google Cloud Firestore for storage. | ||
| This version uses the synchronous Firestore client to prevent event loop errors. | ||
| """ | ||
| from __future__ import annotations | ||
|
|
||
| import asyncio | ||
| import logging | ||
| from typing import Any, Dict, Optional | ||
|
|
||
| from google.cloud import firestore | ||
| from google.cloud.firestore_v1.base_query import FieldFilter | ||
| from typing_extensions import override | ||
|
|
||
| from google.genai.types import Content | ||
|
|
||
| from ..events.event import Event | ||
| from ..events.event_actions import EventActions | ||
| from .base_session_service import BaseSessionService | ||
| from .base_session_service import GetSessionConfig | ||
| from .base_session_service import ListSessionsResponse | ||
| from .session import Session | ||
| from .state import State | ||
|
|
||
| logger = logging.getLogger("google_adk." + __name__) | ||
| # Set the level to INFO to make sure our logs are captured. | ||
| logger.setLevel(logging.INFO) | ||
|
|
||
| SESSIONS_COLLECTION = "adk_sessions" | ||
| EVENTS_SUBCOLLECTION = "events" | ||
|
|
||
|
|
||
| class FirestoreSessionService(BaseSessionService): | ||
| def __init__(self, project: Optional[str] = None, database: Optional[str] = None): | ||
| """Initializes the FirestoreSessionService with the synchronous client.""" | ||
| # Use the standard synchronous client instead of the AsyncClient | ||
| self._db = firestore.Client(project=project, database=database) | ||
|
|
||
| @override | ||
| async def create_session( | ||
| self, | ||
| *, | ||
| app_name: str, | ||
| user_id: str, | ||
| state: Optional[dict[str, Any]] = None, | ||
| session_id: Optional[str] = None, | ||
| ) -> Session: | ||
| """Creates a new session document in Firestore using a thread.""" | ||
| if session_id: | ||
| raise ValueError("User-provided session ID is not supported.") | ||
|
|
||
| def _create_in_firestore(): | ||
| session_data = { | ||
| "app_name": app_name, | ||
| "user_id": user_id, | ||
| "state": state or {}, | ||
| "createTime": firestore.SERVER_TIMESTAMP, | ||
| "updateTime": firestore.SERVER_TIMESTAMP, | ||
| } | ||
| # These are now synchronous calls | ||
| _, doc_ref = self._db.collection(SESSIONS_COLLECTION).add(session_data) | ||
| doc = doc_ref.get() | ||
| doc_dict = doc.to_dict() | ||
| return Session( | ||
| app_name=doc_dict["app_name"], | ||
| user_id=doc_dict["user_id"], | ||
| id=doc.id, | ||
| state=doc_dict.get("state", {}), | ||
| last_update_time=doc_dict["updateTime"].timestamp(), | ||
| ) | ||
|
|
||
| # Run the synchronous DB calls in a separate thread to not block the async server | ||
| return await asyncio.to_thread(_create_in_firestore) | ||
|
|
||
| @override | ||
| async def get_session( | ||
| self, | ||
| *, | ||
| app_name: str, | ||
| user_id: str, | ||
| session_id: str, | ||
| config: Optional[GetSessionConfig] = None, | ||
| ) -> Optional[Session]: | ||
| """Retrieves a session and its events from Firestore using a thread.""" | ||
|
|
||
| def _get_from_firestore(): | ||
| session_ref = self._db.collection(SESSIONS_COLLECTION).document(session_id) | ||
| session_doc = session_ref.get() | ||
|
|
||
| if not session_doc.exists: | ||
| return None | ||
|
|
||
| session_dict = session_doc.to_dict() | ||
| if ( | ||
| session_dict.get("app_name") != app_name | ||
| or session_dict.get("user_id") != user_id | ||
| ): | ||
| return None | ||
|
|
||
| update_timestamp = session_dict["updateTime"].timestamp() | ||
| session = Session( | ||
| app_name=session_dict["app_name"], | ||
| user_id=session_dict["user_id"], | ||
| id=session_doc.id, | ||
| state=session_dict.get("state", {}), | ||
| last_update_time=update_timestamp, | ||
| ) | ||
|
|
||
| # Fetch events without ordering from the database to avoid index requirements. | ||
| events_ref = session_ref.collection(EVENTS_SUBCOLLECTION) | ||
| event_docs = events_ref.stream() | ||
| events_list = [_from_firestore_doc_to_event(doc) for doc in event_docs] | ||
| # Sort the events in the application code instead. | ||
| events_list.sort(key=lambda e: e.timestamp) | ||
Hariharan0309 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| session.events = events_list | ||
|
|
||
| if config: | ||
| if config.num_recent_events: | ||
| session.events = session.events[-config.num_recent_events :] | ||
| elif config.after_timestamp: | ||
| session.events = [e for e in session.events if e.timestamp > config.after_timestamp] | ||
|
|
||
| return session | ||
|
|
||
| return await asyncio.to_thread(_get_from_firestore) | ||
|
|
||
| @override | ||
| async def list_sessions(self, *, app_name: str, user_id: str) -> ListSessionsResponse: | ||
| """Lists all sessions for a given user and app from Firestore using a thread.""" | ||
| def _list_from_firestore(): | ||
| query = self._db.collection(SESSIONS_COLLECTION).where( | ||
| filter=FieldFilter("app_name", "==", app_name) | ||
| ).where(filter=FieldFilter("user_id", "==", user_id)) | ||
|
|
||
| sessions = [] | ||
| for doc in query.stream(): | ||
| session_dict = doc.to_dict() | ||
| session = Session( | ||
| app_name=session_dict["app_name"], | ||
| user_id=session_dict["user_id"], | ||
| id=doc.id, | ||
| state=session_dict.get("state", {}), | ||
| last_update_time=session_dict["updateTime"].timestamp(), | ||
| ) | ||
| sessions.append(session) | ||
| return ListSessionsResponse(sessions=sessions) | ||
|
|
||
| return await asyncio.to_thread(_list_from_firestore) | ||
|
|
||
| @override | ||
| async def delete_session(self, *, app_name: str, user_id: str, session_id: str) -> None: | ||
| """Deletes a session and all its events from Firestore using a thread.""" | ||
| def _delete_in_firestore(): | ||
| session_ref = self._db.collection(SESSIONS_COLLECTION).document(session_id) | ||
| session_doc = session_ref.get(field_paths=["app_name", "user_id"]) | ||
| if not session_doc.exists or session_doc.to_dict().get("user_id") != user_id: | ||
| return | ||
Hariharan0309 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| events_ref = session_ref.collection(EVENTS_SUBCOLLECTION) | ||
| event_docs = events_ref.list_documents() | ||
|
|
||
| batch = self._db.batch() | ||
| for doc in event_docs: | ||
| batch.delete(doc) | ||
| batch.delete(session_ref) | ||
| batch.commit() | ||
|
|
||
| await asyncio.to_thread(_delete_in_firestore) | ||
|
|
||
| async def update_session_state(self, session_id: str, state_delta: Dict[str, Any]): | ||
| """Updates the state of a session document in Firestore.""" | ||
| def _update_in_firestore(): | ||
| session_ref = self._db.collection(SESSIONS_COLLECTION).document(session_id) | ||
| # Firestore's update with dot notation is perfect for this | ||
| update_data = {f"state.{key}": value for key, value in state_delta.items()} | ||
| update_data["updateTime"] = firestore.SERVER_TIMESTAMP | ||
| session_ref.update(update_data) | ||
|
|
||
| await asyncio.to_thread(_update_in_firestore) | ||
|
|
||
| @override | ||
| async def append_event(self, session: Session, event: Event) -> Event: | ||
| """Appends an event to the session's event subcollection in Firestore using a thread.""" | ||
| await super().append_event(session=session, event=event) | ||
Hariharan0309 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| def _append_in_firestore(): | ||
| logger.info("Starting _append_in_firestore for session '%s'", session.id) | ||
| try: | ||
| # Use a batch for atomic writes. | ||
| batch = self._db.batch() | ||
|
|
||
| session_ref = self._db.collection(SESSIONS_COLLECTION).document(session.id) | ||
|
|
||
| # Create a new document for the event in the subcollection. | ||
| event_doc_ref = session_ref.collection(EVENTS_SUBCOLLECTION).document() | ||
| event.id = event_doc_ref.id # Assign the new ID to the event object | ||
|
|
||
| event_data_dict = _convert_event_to_json(event) | ||
| logger.info("Appending event data: %s", event_data_dict) | ||
|
|
||
| # Add the event creation to the batch. | ||
| batch.set(event_doc_ref, event_data_dict) | ||
|
|
||
| # Update the session document's timestamp. State is no longer saved here. | ||
| batch.update(session_ref, {"updateTime": firestore.SERVER_TIMESTAMP}) | ||
| # Commit the batch. | ||
| logger.info("Committing batch to Firestore for session '%s'...", session.id) | ||
| batch.commit() | ||
| logger.info("Batch committed successfully for session '%s'.", session.id) | ||
| except Exception as e: | ||
| # Log any exception that occurs during the process. | ||
| logger.error( | ||
| "!!! Exception in _append_in_firestore for session '%s': %s", | ||
| session.id, | ||
| e, | ||
| exc_info=True | ||
| ) | ||
|
|
||
| await asyncio.to_thread(_append_in_firestore) | ||
| return event | ||
|
|
||
| def _convert_event_to_json(event: Event) -> Dict[str, Any]: | ||
| """Serializes an Event object into a JSON-compatible dictionary.""" | ||
| metadata_json = { | ||
| 'partial': event.partial, | ||
| 'turn_complete': event.turn_complete, | ||
| 'interrupted': event.interrupted, | ||
| 'branch': event.branch, | ||
| 'long_running_tool_ids': ( | ||
| list(event.long_running_tool_ids) | ||
| if event.long_running_tool_ids | ||
| else None | ||
| ), | ||
| } | ||
| if event.grounding_metadata: | ||
| metadata_json['grounding_metadata'] = event.grounding_metadata.model_dump( | ||
| exclude_none=True, mode='json' | ||
| ) | ||
|
|
||
| event_json = { | ||
| 'author': event.author, | ||
| 'invocation_id': event.invocation_id, | ||
| 'timestamp': { | ||
| 'seconds': int(event.timestamp), | ||
| 'nanos': int( | ||
| (event.timestamp - int(event.timestamp)) * 1_000_000_000 | ||
| ), | ||
| }, | ||
| 'error_code': event.error_code, | ||
| 'error_message': event.error_message, | ||
| 'event_metadata': metadata_json, | ||
| } | ||
|
|
||
| if event.actions: | ||
| actions_json = { | ||
| 'skip_summarization': event.actions.skip_summarization, | ||
| 'state_delta': event.actions.state_delta, | ||
| 'artifact_delta': event.actions.artifact_delta, | ||
| 'transfer_agent': event.actions.transfer_to_agent, | ||
| 'escalate': event.actions.escalate, | ||
| 'requested_auth_configs': event.actions.requested_auth_configs, | ||
| } | ||
| event_json['actions'] = actions_json | ||
| if event.content: | ||
| content_dict = event.content.model_dump(exclude_none=True, mode='json') | ||
| if event.author == "user" and 'parts' in content_dict: | ||
| new_parts = [] | ||
| for part in content_dict['parts']: | ||
| # Check for inline_data which is how `Part.from_data` stores the PDF | ||
| if 'inline_data' in part and part['inline_data'].get('mime_type') == 'application/pdf': | ||
| # Replace the large PDF data with a placeholder text | ||
| new_parts.append({'text': '[PDF content omitted from history]'}) | ||
| else: | ||
| new_parts.append(part) | ||
| content_dict['parts'] = new_parts | ||
| event_json['content'] = content_dict | ||
| if event.error_code: | ||
| event_json['error_code'] = event.error_code | ||
| if event.error_message: | ||
| event_json['error_message'] = event.error_message | ||
| return event_json | ||
|
|
||
|
|
||
| def _from_firestore_doc_to_event(doc: firestore.DocumentSnapshot) -> Event: | ||
| """Deserializes a Firestore document into an Event object.""" | ||
| event_dict = doc.to_dict() | ||
| event_actions = EventActions() | ||
| if event_dict.get("actions", None): | ||
| actions_data = event_dict["actions"] | ||
| event_actions = EventActions( | ||
| skip_summarization=actions_data.get("skipSummarization", None), | ||
| state_delta=actions_data.get("stateDelta", {}), | ||
| artifact_delta=actions_data.get("artifactDelta", {}), | ||
| transfer_to_agent=actions_data.get("transferAgent", None), | ||
| escalate=actions_data.get("escalate", None), | ||
| requested_auth_configs=actions_data.get("requestedAuthConfigs", {}), | ||
| ) | ||
Hariharan0309 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| ts_map = event_dict["timestamp"] | ||
| timestamp_float = ts_map["seconds"] + ts_map.get("nanos", 0) / 1_000_000_000 | ||
|
|
||
| content_dict = event_dict.get("content", None) | ||
| content = Content(**content_dict) if content_dict else None | ||
|
|
||
| event = Event( | ||
| id=doc.id, | ||
| invocation_id=event_dict["invocation_id"], | ||
| author=event_dict["author"], | ||
| actions=event_actions, | ||
| content=content, | ||
| timestamp=timestamp_float, | ||
| error_code=event_dict.get("error_code", None), | ||
| error_message=event_dict.get("error_message", None), | ||
| ) | ||
|
|
||
| if event_dict.get("event_metadata", None): | ||
| metadata = event_dict["event_metadata"] | ||
| long_running_tool_ids_list = metadata.get("long_running_tool_ids", None) | ||
| event.partial = metadata.get("partial", None) | ||
| event.turn_complete = metadata.get("turn_complete", None) | ||
| event.interrupted = metadata.get("interrupted", None) | ||
| event.branch = metadata.get("branch", None) | ||
| # grounding_metadata_dict = metadata.get("grounding_metadata", None) | ||
| # if grounding_metadata_dict: | ||
| # event.grounding_metadata = GroundingMetadata(**grounding_metadata_dict) | ||
Hariharan0309 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| event.long_running_tool_ids = ( | ||
| set(long_running_tool_ids_list) if long_running_tool_ids_list else None | ||
| ) | ||
|
|
||
| return event | ||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.