diff --git a/frontend/src2/data_store/DataStoreList.vue b/frontend/src2/data_store/DataStoreList.vue index 9de418927..aac6941cb 100644 --- a/frontend/src2/data_store/DataStoreList.vue +++ b/frontend/src2/data_store/DataStoreList.vue @@ -1,73 +1,68 @@ diff --git a/frontend/src2/data_store/SyncDialog.vue b/frontend/src2/data_store/SyncDialog.vue new file mode 100644 index 000000000..1e746ac37 --- /dev/null +++ b/frontend/src2/data_store/SyncDialog.vue @@ -0,0 +1,109 @@ + + + diff --git a/frontend/src2/data_store/SyncLogSummary.vue b/frontend/src2/data_store/SyncLogSummary.vue new file mode 100644 index 000000000..291bc9720 --- /dev/null +++ b/frontend/src2/data_store/SyncLogSummary.vue @@ -0,0 +1,80 @@ + + + diff --git a/frontend/src2/data_store/data_store.ts b/frontend/src2/data_store/data_store.ts index e5bf235fd..d0dfae9aa 100644 --- a/frontend/src2/data_store/data_store.ts +++ b/frontend/src2/data_store/data_store.ts @@ -4,6 +4,16 @@ import { reactive, ref } from 'vue' import { showErrorToast } from '../helpers' import { DatabaseType } from '../data_source/data_source.types' +export type SyncLog = { + status: string + rows_imported: number + time_taken: number + started_at: string + ended_at: string + error: string + output: string +} + export type DataStoreTable = { name?: string data_source: string @@ -11,6 +21,7 @@ export type DataStoreTable = { database_type: DatabaseType last_synced_on: string last_synced_from_now: string + sync_mode: string row_limit?: number } const storedTables = ref>({}) @@ -44,12 +55,13 @@ async function getTables(data_source?: string, search_term?: string, limit: numb } const importingTable = ref(false) -async function importTable(data_source: string, table_name: string, row_limit?: number) { +async function importTable(data_source: string, table_name: string, row_limit?: number, sync_mode?: string) { importingTable.value = true return call('insights.api.data_store.import_table', { data_source, table_name, row_limit, + sync_mode: 
sync_mode || '', }) .then(() => { getTables(data_source) @@ -60,6 +72,29 @@ async function importTable(data_source: string, table_name: string, row_limit?: }) } +async function syncTable(data_source: string, table_name: string) { + return call('insights.api.data_store.import_table', { + data_source, + table_name, + sync_mode: 'Incremental Sync', + }).catch(showErrorToast) +} + +async function fullRefreshTable(data_source: string, table_name: string) { + return call('insights.api.data_store.import_table', { + data_source, + table_name, + sync_mode: 'Full Refresh', + }).catch(showErrorToast) +} + +async function getLastSyncLog(data_source: string, table_name: string): Promise { + return call('insights.api.data_store.get_last_sync_log', { + data_source, + table_name, + }).catch(showErrorToast) +} + export default function useDataStore() { return reactive({ tables: storedTables, @@ -68,5 +103,8 @@ export default function useDataStore() { importingTable, importTable, + syncTable, + fullRefreshTable, + getLastSyncLog, }) } diff --git a/insights/api/data_store.py b/insights/api/data_store.py index b0dde3188..a1124282d 100644 --- a/insights/api/data_store.py +++ b/insights/api/data_store.py @@ -20,6 +20,7 @@ def get_data_store_tables(data_source: str | None = None, search_term: str | Non Table.label, Table.data_source, Table.last_synced_on, + Table.sync_mode, DataSource.database_type, ) .where( @@ -30,6 +31,7 @@ def get_data_store_tables(data_source: str | None = None, search_term: str | Non | (Table.table == search_term if search_term else Table.table.like("%")) ) ) + .orderby(Table.last_synced_on, order=frappe.qb.desc) .limit(limit) .run(as_dict=True) ) @@ -45,18 +47,38 @@ def get_data_store_tables(data_source: str | None = None, search_term: str | Non "data_source": table.data_source, "database_type": table.database_type, "last_synced_on": table.last_synced_on, + "sync_mode": table.sync_mode or "Incremental Sync", } ) ) return ret +@insights_whitelist() +@validate_type 
+def get_last_sync_log(data_source: str, table_name: str): + log = frappe.get_all( + "Insights Table Import Log", + filters={"data_source": data_source, "table_name": table_name}, + fields=["status", "rows_imported", "time_taken", "started_at", "ended_at", "error", "output"], + order_by="creation desc", + limit=1, + ) + return log[0] if log else None + + @insights_whitelist(role="Insights Admin") @validate_type -def import_table(data_source: str, table_name: str): - name = get_table_name(data_source, table_name) - table_doc = frappe.get_doc("Insights Table v3", name) - table_doc.import_to_warehouse() +def import_table(data_source: str, table_name: str, sync_mode: str = ""): + from insights.insights.doctype.insights_data_source_v3.data_warehouse import WarehouseTable + + if sync_mode: + name = get_table_name(data_source, table_name) + if frappe.db.exists("Insights Table v3", name): + frappe.db.set_value("Insights Table v3", name, "sync_mode", sync_mode) + + wt = WarehouseTable(data_source, table_name) + wt.enqueue_import(sync_mode=sync_mode) def sync_tables(): @@ -64,11 +86,32 @@ def sync_tables(): tables = frappe.get_all( "Insights Table v3", filters={"stored": 1}, - fields=["name", "data_source", "table"], + fields=["name", "data_source", "table", "sync_mode"], ) for table in tables: - import_table(table.data_source, table.table) + import_table(table.data_source, table.table, sync_mode=table.sync_mode or "Incremental Sync") + + +def handle_warehouse_deletes(): + from insights.insights.doctype.insights_data_source_v3.data_warehouse import ( + WarehouseTable, + ) + from insights.insights.doctype.insights_data_source_v3.delete_tracker import handle_deletes + + tables = frappe.get_all( + "Insights Table v3", + filters={"stored": 1, "sync_mode": "Incremental Sync"}, + fields=["data_source", "table"], + ) + + for t in tables: + try: + wt = WarehouseTable(t.data_source, t.table) + doctype = t.table.replace("tab", "", 1) if t.table.startswith("tab") else t.table + 
handle_deletes(doctype=doctype, warehouse_table=wt.warehouse_table_name, schema=wt.schema) + except Exception: + frappe.log_error(title=f"Delete failed for {t.data_source}/{t.table}") def update_failed_sync_status(): diff --git a/insights/hooks.py b/insights/hooks.py index aa212b84a..5b55da4ba 100644 --- a/insights/hooks.py +++ b/insights/hooks.py @@ -148,7 +148,7 @@ doc_events = { "User": { "on_change": "insights.insights.doctype.insights_team.insights_team.update_admin_team", - } + }, } # Scheduled Tasks @@ -160,6 +160,7 @@ ], "daily": [ "insights.api.data_store.sync_tables", + "insights.api.data_store.handle_warehouse_deletes", ], "hourly": [ "insights.api.data_store.update_failed_sync_status", diff --git a/insights/insights/doctype/insights_data_source_v3/data_warehouse.py b/insights/insights/doctype/insights_data_source_v3/data_warehouse.py index fc32cbc99..3ee57238a 100644 --- a/insights/insights/doctype/insights_data_source_v3/data_warehouse.py +++ b/insights/insights/doctype/insights_data_source_v3/data_warehouse.py @@ -94,7 +94,13 @@ def get_table(self, data_source: str, table_name: str) -> "WarehouseTable": return WarehouseTable(data_source, table_name) def get_table_writer( - self, table_name: str, schema: ibis.Schema, database: str = "main", mode: str = "replace", log_fn=None + self, + table_name: str, + schema: ibis.Schema, + database: str = "main", + mode: str = "replace", + upsert_key: str = "", + log_fn=None, ) -> "WarehouseTableWriter": """Create a table writer for batch inserts with automatic cleanup. 
@@ -106,7 +112,12 @@ def get_table_writer( # On exception, temp files are cleaned up automatically """ return WarehouseTableWriter( - table_name, table_schema=schema, database=database, mode=mode, log_fn=log_fn + table_name, + table_schema=schema, + database=database, + mode=mode, + upsert_key=upsert_key, + log_fn=log_fn, ) @@ -128,12 +139,14 @@ def __init__( table_schema: ibis.Schema, database: str = "main", mode: str = "replace", + upsert_key: str = "", log_fn=None, ): self.database = database self.table_name = table_name self.table_schema = table_schema - self.mode = mode # 'replace' or 'append' + self.mode = mode # 'replace', 'append' or 'upsert' + self.upsert_key = upsert_key self._log = log_fn or (lambda *args, **kwargs: None) self._temp_dir: Path | None = None @@ -194,9 +207,20 @@ def commit(self) -> int: self._log(f"Switched to '{self.database}' database") parquet_glob = str(self._temp_dir / "*.parquet") - merged = db.read_parquet(parquet_glob) - - if self.mode == "append" and self._table_exists(db): + merged = db.read_parquet(parquet_glob, union_by_name=True) + + # for upsert we have to create a temp table and merge + if self.mode == "upsert" and self._table_exists(db) and self.upsert_key: + temp_name = f"_tmp_{self.table_name}" + db.create_table(temp_name, merged, overwrite=True) + key = self.upsert_key + db.raw_sql( + f'DELETE FROM "{self.table_name}" WHERE "{key}" IN ' + f'(SELECT "{key}" FROM "{temp_name}")' + ) + db.raw_sql(f'INSERT INTO "{self.table_name}" SELECT * FROM "{temp_name}"') + db.raw_sql(f'DROP TABLE "{temp_name}"') + elif self.mode == "append" and self._table_exists(db): db.insert(self.table_name, merged) else: db.create_table(self.table_name, merged, schema=self.table_schema, overwrite=True) @@ -282,12 +306,24 @@ def get_remote_table(self) -> Expr: ds = InsightsDataSourcev3.get_doc(self.data_source) return ds.get_ibis_table(self.table_name) - def enqueue_import(self): + def enqueue_import(self, sync_mode: str = ""): if 
frappe.db.get_value("Insights Data Source v3", self.data_source, "type") == "REST API": frappe.throw("Import not supported for API data sources") - importer = WarehouseTableImporter(self) - importer.enqueue_import() + if not sync_mode: + sync_mode = ( + frappe.db.get_value("Insights Table v3", self.table_doc_name, "sync_mode") or "Full Refresh" + ) + + if sync_mode == "Incremental Sync": + from insights.insights.doctype.insights_data_source_v3.incremental_sync import ( + enqueue_incremental_sync, + ) + + enqueue_incremental_sync(data_source=self.data_source, table_name=self.table_name) + else: + importer = WarehouseTableImporter(self) + importer.enqueue_import() class WarehouseTableImporter: @@ -451,6 +487,7 @@ def start_batch_import(self): total_rows = self.process_batches(batch_size, writer) self.log.rows_imported = total_rows self.update_insights_table() + self._set_incremental_cursor() self.log.status = "Completed" self._log("Import completed successfully.") except Exception as e: @@ -524,6 +561,27 @@ def update_insights_table(self): t.last_synced_on = frappe.utils.now() t.save() + def _set_incremental_cursor(self): + """After full refresh, set the cursor to the latest row so incremental picks up from here""" + from insights.insights.doctype.insights_data_source_v3.incremental_sync import SyncCursorManager + + manager = SyncCursorManager() + try: + with insights.warehouse.get_write_connection() as db: + db.raw_sql(f"USE '{self.table.schema}'") + result = db.raw_sql( + f'SELECT "modified", "name" FROM "{self.warehouse_table_name}" ' + f'ORDER BY "modified" DESC, "name" DESC LIMIT 1' + ).fetchone() + if result: + manager.update_cursor( + self.table.data_source, self.table.table_name, str(result[0]), str(result[1]) + ) + return + except Exception: + pass + manager.delete_cursor(self.table.data_source, self.table.table_name) + def _log(self, message: str, commit: bool = True): if self.last_log_time is None: self.last_log_time = time.monotonic() diff --git 
a/insights/insights/doctype/insights_data_source_v3/delete_tracker.py b/insights/insights/doctype/insights_data_source_v3/delete_tracker.py new file mode 100644 index 000000000..c5263532b --- /dev/null +++ b/insights/insights/doctype/insights_data_source_v3/delete_tracker.py @@ -0,0 +1,39 @@ +import frappe + +import insights + + +def _esc(value: str) -> str: + return value.replace("'", "''") + + +def handle_deletes(doctype: str, warehouse_table: str, schema: str, log_fn=None): + _log = log_fn or (lambda msg: None) + batch_size = 10_000 + + with insights.warehouse.get_write_connection() as db: + db.raw_sql(f"USE '{_esc(schema)}'") + + if warehouse_table not in db.list_tables(): + _log("warehouse table not found, skipping") + return 0 + + wh_names = db.table(warehouse_table).select("name").execute()["name"].tolist() + + stale = [] + for i in range(0, len(wh_names), batch_size): + batch = wh_names[i : i + batch_size] + existing = set(frappe.get_all(doctype, filters={"name": ("in", batch)}, pluck="name")) + stale.extend(n for n in batch if n not in existing) + + if not stale: + _log("no stale rows found") + return 0 + + for i in range(0, len(stale), batch_size): + batch = stale[i : i + batch_size] + values = ", ".join(f"'{_esc(n)}'" for n in batch) + db.raw_sql(f'DELETE FROM "{_esc(warehouse_table)}" WHERE "name" IN ({values})') + + _log(f"deleted {len(stale)} stale rows") + return len(stale) diff --git a/insights/insights/doctype/insights_data_source_v3/incremental_sync.py b/insights/insights/doctype/insights_data_source_v3/incremental_sync.py new file mode 100644 index 000000000..2e16a570c --- /dev/null +++ b/insights/insights/doctype/insights_data_source_v3/incremental_sync.py @@ -0,0 +1,278 @@ +import time + +import frappe +import frappe.utils +import ibis +import pandas as pd +from ibis.backends.sql.datatypes import DuckDBType + +import insights +from insights.insights.doctype.insights_data_source_v3.data_warehouse import ( + WAREHOUSE_DB_NAME, + WarehouseTable, + 
get_warehouse_schema_name,
+)
+
+
+def _ibis_to_duckdb(dtype_str: str) -> str:
+    return DuckDBType.to_string(ibis.dtype(dtype_str))
+
+
+class SyncCursorManager:
+    def _get_table_doc_name(self, data_source: str, table_name: str) -> str:
+        from insights.insights.doctype.insights_table_v3.insights_table_v3 import get_table_name
+
+        return get_table_name(data_source, table_name)
+
+    def get_cursor(self, data_source: str, table_name: str):
+        doc_name = self._get_table_doc_name(data_source, table_name)
+        cursor_value = frappe.db.get_value("Insights Table v3", doc_name, "cursor")
+        if not cursor_value:
+            return None
+        parts = cursor_value.split("|||")
+        if len(parts) != 2:
+            return None
+        return {"last_modified": parts[0], "last_name": parts[1]}
+
+    def update_cursor(self, data_source: str, table_name: str, last_modified: str, last_name: str):
+        doc_name = self._get_table_doc_name(data_source, table_name)
+        cursor_value = f"{last_modified}{'|||'}{last_name}"
+        frappe.db.set_value("Insights Table v3", doc_name, "cursor", cursor_value)
+
+    def delete_cursor(self, data_source: str, table_name: str):
+        doc_name = self._get_table_doc_name(data_source, table_name)
+        frappe.db.set_value("Insights Table v3", doc_name, "cursor", "")
+
+
+class IncrementalImporter:
+    # batch size kept small (default 2000) to avoid memory issues
+    def __init__(self, table: WarehouseTable, batch_size: int = 2000):
+        self.table = table
+        self.doctype = (
+            table.table_name.replace("tab", "", 1) if table.table_name.startswith("tab") else table.table_name
+        )
+        self.batch_size = batch_size
+        self.schema = get_warehouse_schema_name(table.data_source)
+        self.cursor_mgr = SyncCursorManager()
+
+        self.log = None
+        self.last_log_time = None
+
+    def start_sync(self):
+        from insights.insights.doctype.insights_data_source_v3.insights_data_source_v3 import (
+            db_connections,
+        )
+
+        self._sync_summary = {"upserted": 0, "deleted": 0, "batches": 0}
+
+        with db_connections():
+            self._prepare_log()
+            self._run_sync_loop()
+            
self._finalize_log()
+
+        summary = self._sync_summary
+        insights.create_toast(
+            f"Synced {frappe.bold(self.table.table_name)} — {summary['upserted']} rows upserted",
+            title="Incremental Sync Completed",
+            type="success",
+            duration=5,
+        )
+
+    def _prepare_log(self):
+        self.log = frappe.new_doc("Insights Table Import Log")
+        self.log.db_insert()
+        self.log.db_set(
+            {
+                "data_source": self.table.data_source,
+                "table_name": self.table.table_name,
+                "started_at": frappe.utils.now(),
+                "status": "In Progress",
+                "batch_size": self.batch_size,
+            },
+            commit=True,
+        )
+
+        insights.create_toast(
+            f"Syncing {frappe.bold(self.table.table_name)} to the data store. "
+            "You may not see the results till the sync is completed",
+            title="Incremental Sync Started",
+            duration=5,
+        )
+
+    def _run_sync_loop(self):
+        cursor = self.cursor_mgr.get_cursor(self.table.data_source, self.table.table_name)
+        if not cursor:
+            self._log("No cursor found, Running full refresh to establish baseline")
+            self.log.db_set({"status": "Skipped"}, commit=True)
+            self.table.enqueue_import(sync_mode="Full Refresh")
+            return
+
+        last_modified = cursor["last_modified"]
+        last_name = cursor["last_name"]
+
+        try:
+            # stage 1: fetch all batches from source
+            batches = self._fetch_all_batches(last_modified, last_name)
+            if not batches:
+                self._log("No new rows to sync")
+                self.log.db_set({"status": "Completed"}, commit=True)
+                self._sync_summary = {"upserted": 0, "deleted": 0, "batches": 0}
+                return
+
+            # stage 2: write all batches to warehouse
+            total_upserted = self._write_batches(batches)
+
+            # stage 3: update cursor to last row
+            last_df = batches[-1]
+            last_modified, last_name = self.get_cursor(last_df)
+            self.cursor_mgr.update_cursor(
+                self.table.data_source, self.table.table_name, last_modified, last_name
+            )
+
+            self.log.db_set({"rows_imported": total_upserted, "status": "Completed"}, commit=True)
+            self._log(f"Sync complete. 
{total_upserted} rows upserted in {len(batches)} batches.") + self._update_insights_table() + self._sync_summary = {"upserted": total_upserted, "deleted": 0, "batches": len(batches)} + + except Exception as e: + self.log.db_set({"status": "Failed"}, commit=True) + self._log(f"Error: {e}") + raise + + def _fetch_all_batches(self, last_modified, last_name): + remote = self.table.get_remote_table() + batches = [] + while True: + self._log(f"Fetching batch {len(batches) + 1}: cursor ({last_modified}, {last_name})") + df = self._fetch_batch(remote, last_modified, last_name) + if df.empty: + break + self._log(f"Fetched {len(df)} rows") + batches.append(df) + last_modified, last_name = self.get_cursor(df) + if len(df) < self.batch_size: + break + return batches + + def _write_batches(self, batches: list[pd.DataFrame]): + schema = ibis.memtable(batches[0]).schema() + self._evolve_schema(schema) + # close cached read-only connection so write connection can open + if WAREHOUSE_DB_NAME in insights.db_connections: + insights.db_connections[WAREHOUSE_DB_NAME].disconnect() + del insights.db_connections[WAREHOUSE_DB_NAME] + total = 0 + with insights.warehouse.get_table_writer( + self.table.warehouse_table_name, + schema, + database=self.schema, + mode="upsert", + upsert_key="name", + log_fn=self._log, + ) as writer: + for df in batches: + writer.insert(df) + total += len(df) + return total + + def _fetch_batch(self, remote, last_modified: str | None, last_name: str | None): + query = remote + if last_modified and last_name: + query = query.filter( + (query.modified > last_modified) + | ((query.modified == last_modified) & (query.name > last_name)) + ) + return query.order_by(["modified", "name"]).limit(self.batch_size).execute() + + def get_cursor(self, df: pd.DataFrame): + last_row = df.iloc[-1] + return str(last_row["modified"]), str(last_row["name"]) + + def detect_schema_changes(self, current: dict[str, str], incoming: dict[str, str]): + current_cols = set(current.keys()) + 
incoming_cols = set(incoming.keys()) + added = {col: incoming[col] for col in incoming_cols - current_cols} + removed = current_cols - incoming_cols + return added, removed + + def _evolve_schema(self, incoming_ibis_schema: ibis.Schema): + incoming_schema = {col: str(dtype) for col, dtype in incoming_ibis_schema.items()} + + try: + current_table = insights.warehouse.db.table(self.table.warehouse_table_name, database=self.schema) + current_schema = {col: str(dtype) for col, dtype in current_table.schema().items()} + except Exception: + return # Table doesn't exist yet, will be created on first upsert + + added, removed = self.detect_schema_changes(current_schema, incoming_schema) + + if not added and not removed: + return + + with insights.warehouse.get_write_connection() as db: + db.raw_sql(f"USE '{self.schema}'") + for col, dtype in added.items(): + duckdb_type = _ibis_to_duckdb(dtype) + self._log(f"Adding column: {col} ({duckdb_type})") + db.raw_sql( + f'ALTER TABLE "{self.table.warehouse_table_name}" ADD COLUMN "{col}" {duckdb_type}' + ) + + for col in removed: + self._log(f"Dropping column: {col}") + db.raw_sql(f'ALTER TABLE "{self.table.warehouse_table_name}" DROP COLUMN "{col}"') + + def _update_insights_table(self): + from insights.utils import InsightsTablev3 + + t = InsightsTablev3.get_doc( + { + "data_source": self.table.data_source, + "table": self.table.table_name, + } + ) + t.stored = 1 + t.last_synced_on = frappe.utils.now() + t.save() + + def _finalize_log(self): + ended_at = frappe.utils.now() + self.log.db_set( + { + "ended_at": ended_at, + "time_taken": frappe.utils.time_diff_in_seconds(ended_at, self.log.started_at), + }, + commit=True, + ) + + def _log(self, message: str, commit: bool = True): + if not self.log: + return + if self.last_log_time is None: + self.last_log_time = time.monotonic() + elapsed = 0.0 + else: + current_time = time.monotonic() + elapsed = current_time - self.last_log_time + self.last_log_time = current_time + + 
self.log.log_output(f"[{frappe.utils.now()}] [{elapsed:.1f}s] {message}", commit=commit) + + +def enqueue_incremental_sync(data_source: str, table_name: str): + job_id = f"inc_sync_{frappe.scrub(data_source)}_{frappe.scrub(table_name)}" + frappe.enqueue( + "insights.insights.doctype.insights_data_source_v3.incremental_sync.execute_incremental_sync", + data_source=data_source, + table_name=table_name, + queue="long", + timeout=30 * 60, + job_id=job_id, + deduplicate=True, + ) + + +def execute_incremental_sync(data_source: str, table_name: str): + table = WarehouseTable(data_source, table_name) + importer = IncrementalImporter(table) + importer.start_sync() diff --git a/insights/insights/doctype/insights_table_v3/insights_table_v3.json b/insights/insights/doctype/insights_table_v3/insights_table_v3.json index a0dfcb9f6..2b2be4c27 100644 --- a/insights/insights/doctype/insights_table_v3/insights_table_v3.json +++ b/insights/insights/doctype/insights_table_v3/insights_table_v3.json @@ -12,6 +12,8 @@ "last_synced_on", "row_limit", "stored", + "sync_mode", + "cursor", "before_import_script" ], "fields": [ @@ -61,16 +63,28 @@ "fieldtype": "Int", "label": "Row Limit" }, + { + "default": "Incremental Sync", + "fieldname": "sync_mode", + "fieldtype": "Select", + "label": "Sync Mode", + "options": "Full Refresh\nIncremental Sync" + }, { "fieldname": "before_import_script", "fieldtype": "Code", "label": "Before Import Script", "options": "Python" + }, + { + "fieldname": "cursor", + "fieldtype": "Data", + "label": "Cursor" } ], "index_web_pages_for_search": 1, "links": [], - "modified": "2025-10-01 12:49:40.113177", + "modified": "2026-03-31 11:34:47.368303", "modified_by": "Administrator", "module": "Insights", "name": "Insights Table v3", diff --git a/insights/insights/doctype/insights_table_v3/insights_table_v3.py b/insights/insights/doctype/insights_table_v3/insights_table_v3.py index fb015d862..78e0c4adf 100644 --- 
a/insights/insights/doctype/insights_table_v3/insights_table_v3.py +++ b/insights/insights/doctype/insights_table_v3/insights_table_v3.py @@ -25,11 +25,13 @@ class InsightsTablev3(Document): from frappe.types import DF before_import_script: DF.Code | None + cursor: DF.Data | None data_source: DF.Link label: DF.Data last_synced_on: DF.Datetime | None row_limit: DF.Int stored: DF.Check + sync_mode: DF.Literal["Full Refresh", "Incremental Sync"] table: DF.Data # end: auto-generated types