feat: Add Microsoft Dataverse source and bootstrap provider #304
ruokun-niu wants to merge 22 commits into drasi-project:main
Signed-off-by: ruokun-niu <ruokunniu@gmail.com>
    "c",
    "s",
    "https://myorg.crm.dynamics.com",
    "http://localhost:8080/token",

Check notice: Code scanning / devskim
Accessing localhost could indicate debug code, or could hinder scaling.
    "https://myorg.crm.dynamics.com",
    "http://localhost:8080/token",
    );
    assert_eq!(tm.token_url, "http://localhost:8080/token");

Check notice: Code scanning / devskim
Accessing localhost could indicate debug code, or could hinder scaling.
Pull request overview
This PR adds a new Microsoft Dataverse source plugin and bootstrap provider for Drasi, enabling continuous monitoring of Dataverse tables for data changes using OData Web API change tracking (delta links). It also extends AzureIdentityProvider with a with_client_secret constructor and updates the source planner agent docs.
Changes:
- New `drasi-source-dataverse` crate with polling-based change detection using OData delta links, adaptive backoff, per-entity workers, and OAuth2/Azure CLI authentication
- New `drasi-bootstrap-dataverse` crate for initial data loading from Dataverse entities
- Added `AzureIdentityProvider::with_client_secret()` to the shared identity library
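The delta-link polling mentioned in the list above can be sketched roughly as follows. This is a minimal illustration, not the PR's client code: `DeltaPage`, `drain_changes`, and the `fetch` closure are hypothetical names, and the HTTP call is abstracted away. It relies only on the OData convention that intermediate pages carry `@odata.nextLink` and the final page carries `@odata.deltaLink`.

```rust
/// One page of a change-tracking response (hypothetical shape for this sketch).
#[derive(Debug, Clone, Default)]
pub struct DeltaPage {
    /// Identifiers of changed or deleted records on this page.
    pub changes: Vec<String>,
    /// Set when more pages of the same change set remain (`@odata.nextLink`).
    pub next_link: Option<String>,
    /// Set on the last page; the starting point of the next poll (`@odata.deltaLink`).
    pub delta_link: Option<String>,
}

/// Follows `next_link` until a page carries a `delta_link`, collecting all changes.
/// Returns the collected changes and the link to store for the next poll.
pub fn drain_changes<F>(start_link: &str, mut fetch: F) -> Result<(Vec<String>, String), String>
where
    F: FnMut(&str) -> Result<DeltaPage, String>,
{
    let mut changes = Vec::new();
    let mut link = start_link.to_string();
    loop {
        let page = fetch(&link)?;
        changes.extend(page.changes);
        if let Some(delta) = page.delta_link {
            // Last page of this change set: hand back the new delta link.
            return Ok((changes, delta));
        }
        match page.next_link {
            Some(next) => link = next,
            None => {
                return Err(format!(
                    "response for {link} had neither a next link nor a delta link"
                ))
            }
        }
    }
}
```

A worker would persist the returned delta link and pass it as `start_link` on the next poll, so only changes since the previous call are returned.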
Reviewed changes
Copilot reviewed 19 out of 19 changed files in this pull request and generated 10 comments.
| File | Description |
|---|---|
| `Cargo.toml` | Added both new crates to workspace members |
| `lib/src/identity/azure.rs` | Added `with_client_secret` constructor and tests |
| `components/sources/dataverse/Cargo.toml` | New crate manifest |
| `components/sources/dataverse/src/config.rs` | Source configuration with serde aliases and validation |
| `components/sources/dataverse/src/auth.rs` | Token manager with client credentials and Azure CLI support |
| `components/sources/dataverse/src/types.rs` | OData delta response types and change parsing |
| `components/sources/dataverse/src/client.rs` | HTTP client for Dataverse OData Web API |
| `components/sources/dataverse/src/lib.rs` | Main source plugin with polling loop and builder |
| `components/sources/dataverse/src/descriptor.rs` | Plugin descriptor for dynamic loading |
| `components/sources/dataverse/tests/integration_test.rs` | Wiremock-based integration test (ignored) |
| `components/sources/dataverse/README.md` | Source documentation |
| `components/bootstrappers/dataverse/Cargo.toml` | New crate manifest |
| `components/bootstrappers/dataverse/src/config.rs` | Bootstrap configuration with validation |
| `components/bootstrappers/dataverse/src/lib.rs` | Bootstrap provider with pagination |
| `components/bootstrappers/dataverse/src/descriptor.rs` | Plugin descriptor for dynamic loading |
| `components/bootstrappers/dataverse/README.md` | Bootstrap documentation |
| `.github/agents/source-planner.md` | Added authentication guidance for source plugins |
    @@ -0,0 +1,38 @@
    [package]

    @@ -0,0 +1,37 @@
    [package]
    fn convert_json_value(value: &serde_json::Value) -> ElementValue {
        match value {
            serde_json::Value::Null => ElementValue::Null,
            serde_json::Value::Bool(b) => ElementValue::Bool(*b),
            serde_json::Value::Number(n) => {
                if let Some(i) = n.as_i64() {
                    ElementValue::Integer(i)
                } else if let Some(f) = n.as_f64() {
                    ElementValue::Float(ordered_float::OrderedFloat(f))
                } else {
                    ElementValue::Null
                }
            }
            serde_json::Value::String(s) => ElementValue::String(Arc::from(s.as_str())),
            serde_json::Value::Array(arr) => {
                if !arr.is_empty() {
                    if let Some(first_obj) = arr[0].as_object() {
                        if first_obj.contains_key("Value") {
                            let values: Vec<ElementValue> = arr
                                .iter()
                                .filter_map(|item| {
                                    item.as_object()
                                        .and_then(|obj| obj.get("Value"))
                                        .map(convert_json_value)
                                })
                                .collect();
                            return ElementValue::List(values);
                        }
                    }
                }
                ElementValue::List(arr.iter().map(convert_json_value).collect())
            }
            serde_json::Value::Object(obj) => {
                if obj.contains_key("Value") && obj.len() <= 2 {
                    if let Some(val) = obj.get("Value") {
                        return convert_json_value(val);
                    }
                }
                let mut map = ElementPropertyMap::new();
                for (k, v) in obj {
                    map.insert(k, convert_json_value(v));
                }
                ElementValue::Object(map)
            }
        }
    }
    // Track how many times the delta endpoint is called to sequence responses
    let delta_call_count = Arc::new(AtomicUsize::new(0));
    let delta_count_clone = delta_call_count.clone();

    //! | `entities` | Vec\<String\> | required | Entity logical names to monitor |
    //! | `entity_set_overrides` | HashMap\<String, String\> | `{}` | Override entity set name mapping |
    //! | `entity_columns` | HashMap\<String, Vec...\> | `{}` | Per-entity column selection |
    //! | `polling_interval_ms` | u64 | `5000` | Base polling interval |
    let shutdown_tx_clone = shutdown_tx.clone();
    let source_id = self.base.id.clone();
    let combined_handle = tokio::spawn(async move {
        for (i, handle) in task_handles.into_iter().enumerate() {
            if let Err(e) = handle.await {
                log::error!("[{source_id}] Entity worker {i} terminated with error: {e}");
            }
        }
        log::info!("[{source_id}] All entity workers stopped");
    });

    *self.base.task_handle.write().await = Some(combined_handle);

    // Store shutdown_tx in a custom slot
    // We abuse the oneshot field by creating a bridge
    let (oneshot_tx, _oneshot_rx) = tokio::sync::oneshot::channel::<()>();
    // When oneshot_tx is dropped (in stop()), we trigger shutdown
    // Actually, let's use a different approach: store shutdown_tx in context
    // For now, store it directly - we need the watch::Sender
    {
        // We'll trigger shutdown via the watch channel
        // Store the sender so stop() can use it
        let mut lock = self.base.shutdown_tx.write().await;
        // Create a bridge: when the oneshot is sent, signal the watch
        let shutdown_tx_for_stop = shutdown_tx.clone();
        let (bridge_tx, bridge_rx) = tokio::sync::oneshot::channel::<()>();
        tokio::spawn(async move {
            let _ = bridge_rx.await;
            let _ = shutdown_tx_for_stop.send(true);
        });
        *lock = Some(bridge_tx);
    }

    /// Dynamic plugin entry point.
    ///
    /// Dynamic plugin entry point.

    ///
    /// Dynamic plugin entry point.

    let source = crate::DataverseSourceBuilder::new(id)
        .with_environment_url(config.environment_url.clone())
        .with_tenant_id(config.tenant_id.clone())
        .with_client_id(config.client_id.clone())
        .with_client_secret(config.client_secret.clone())
        .with_entities(config.entities.clone())
        .with_polling_interval_ms(config.polling_interval_ms)
        .with_min_interval_ms(config.min_interval_ms)
        .with_max_interval_seconds(config.max_interval_seconds)
        .with_api_version(config.api_version.clone())
        .with_auto_start(auto_start)
        .build()?;

    /// Azure AD client secret.
    /// Required for client credentials flow, ignored when `use_azure_cli` is true.
    #[serde(default, alias = "clientSecret")]
    pub client_secret: String,

We should wrap these in ConfigValue so that they can be mapped to env variables or secrets.
These fields are already wrapped in ConfigValue in descriptor.rs, which is the platform/dynamic plugin entry point. DtoMapper resolves env vars and secrets into plain strings, which are then stored here in config.rs. This matches the pattern used by the postgres source and bootstrap components.
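The split described in this reply, where a wrapper type lives at the plugin boundary and plain strings live in the internal config, might look roughly like the sketch below. Everything here is a hypothetical stand-in: this `ConfigValue` enum, `SourceDto`, `SourceConfig`, and `map_dto` are illustrative and are not Drasi's actual `ConfigValue` or `DtoMapper` definitions. The environment is passed in as a map so the example stays deterministic.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a `ConfigValue`-style wrapper (not Drasi's real type).
#[derive(Debug, Clone, PartialEq)]
pub enum ConfigValue {
    /// Value written directly in the configuration.
    Literal(String),
    /// Value looked up from the environment, with an optional fallback.
    EnvVar { name: String, default: Option<String> },
}

impl ConfigValue {
    /// Resolves the wrapper to a plain string using the supplied environment.
    pub fn resolve(&self, env: &HashMap<String, String>) -> Result<String, String> {
        match self {
            ConfigValue::Literal(s) => Ok(s.clone()),
            ConfigValue::EnvVar { name, default } => env
                .get(name)
                .cloned()
                .or_else(|| default.clone())
                .ok_or_else(|| format!("environment variable {name} is not set")),
        }
    }
}

/// DTO as it might appear at the plugin boundary (hypothetical).
pub struct SourceDto {
    pub client_id: ConfigValue,
    pub client_secret: ConfigValue,
}

/// Internal config holding already-resolved plain strings (hypothetical).
#[derive(Debug, PartialEq)]
pub struct SourceConfig {
    pub client_id: String,
    pub client_secret: String,
}

/// Resolves every wrapped field once, so the rest of the crate sees plain strings.
pub fn map_dto(dto: &SourceDto, env: &HashMap<String, String>) -> Result<SourceConfig, String> {
    Ok(SourceConfig {
        client_id: dto.client_id.resolve(env)?,
        client_secret: dto.client_secret.resolve(env)?,
    })
}
```

Resolving at the boundary keeps secret lookup in one place; the trade-off is that the internal config cannot re-resolve a value that changes after startup.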

    /// - `min_interval_ms`: Minimum adaptive polling interval (default: 500)
    /// - `max_interval_seconds`: Maximum adaptive polling interval (default: 30)
    /// - `api_version`: Dataverse Web API version (default: `v9.2`)
    #[derive(Debug, Clone, Serialize, Deserialize)]

These are internal domain objects; when do they need to be serialized or deserialized?
Good catch. They don't need to be serialized.

        api_version,
    };

    let source = crate::DataverseSourceBuilder::new(id)

entity_set_overrides and entity_columns are being dropped here
Good catch. I have added them back
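One way to make this kind of omission harder to repeat is to destructure the DTO exhaustively before feeding the builder. The sketch below is illustrative only: `SourceDto`, `SourceBuilder`, and the `with_entity_set_overrides` / `with_entity_columns` methods are assumed names for this example and are not confirmed by the PR.

```rust
use std::collections::HashMap;

/// Hypothetical DTO with the three entity-related fields from the review thread.
pub struct SourceDto {
    pub entities: Vec<String>,
    pub entity_set_overrides: HashMap<String, String>,
    pub entity_columns: HashMap<String, Vec<String>>,
}

/// Hypothetical builder; the real builder in the PR has more fields.
#[derive(Debug, Default, PartialEq)]
pub struct SourceBuilder {
    pub entities: Vec<String>,
    pub entity_set_overrides: HashMap<String, String>,
    pub entity_columns: HashMap<String, Vec<String>>,
}

impl SourceBuilder {
    pub fn with_entities(mut self, entities: Vec<String>) -> Self {
        self.entities = entities;
        self
    }

    pub fn with_entity_set_overrides(mut self, overrides: HashMap<String, String>) -> Self {
        self.entity_set_overrides = overrides;
        self
    }

    pub fn with_entity_columns(mut self, columns: HashMap<String, Vec<String>>) -> Self {
        self.entity_columns = columns;
        self
    }
}

pub fn builder_from_dto(dto: SourceDto) -> SourceBuilder {
    // Exhaustive pattern with no `..`: a field added to `SourceDto` later fails to
    // compile here until it is named, and a named-but-unused field triggers a warning.
    let SourceDto {
        entities,
        entity_set_overrides,
        entity_columns,
    } = dto;

    SourceBuilder::default()
        .with_entities(entities)
        .with_entity_set_overrides(entity_set_overrides)
        .with_entity_columns(entity_columns)
}
```

The design choice is to let the compiler, rather than review, catch a field that never reaches the builder.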

    entities: dto.entities,
    entity_set_overrides: dto.entity_set_overrides,
    entity_columns: dto.entity_columns,
    polling_interval_ms,

polling interval is never actually used in the polling loop?
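To illustrate how a base polling interval could feed an adaptive loop, here is a small sketch. The policy is an assumption made for this example (reset to the minimum after a poll that returned changes, double up to the maximum after an empty poll) and is not taken from the PR; `AdaptiveInterval` is a hypothetical type. The parameter names mirror the documented settings `polling_interval_ms`, `min_interval_ms`, and `max_interval_seconds`.

```rust
use std::time::Duration;

/// Hypothetical adaptive polling interval; the policy is assumed, not the PR's.
pub struct AdaptiveInterval {
    min: Duration,
    max: Duration,
    current: Duration,
}

impl AdaptiveInterval {
    /// Starts at the base interval, clamped into `[min, max]`.
    pub fn new(polling_interval_ms: u64, min_interval_ms: u64, max_interval_seconds: u64) -> Self {
        let min = Duration::from_millis(min_interval_ms);
        // Guard against a misconfigured max below min so `clamp` cannot panic.
        let max = Duration::from_secs(max_interval_seconds).max(min);
        let current = Duration::from_millis(polling_interval_ms).clamp(min, max);
        Self { min, max, current }
    }

    /// The delay to wait before the next poll.
    pub fn current(&self) -> Duration {
        self.current
    }

    /// Updates the interval after a poll and returns the new delay.
    pub fn next(&mut self, had_changes: bool) -> Duration {
        self.current = if had_changes {
            // Activity seen: poll again as soon as allowed.
            self.min
        } else {
            // Nothing new: back off, capped at the maximum.
            self.current.saturating_mul(2).min(self.max)
        };
        self.current
    }
}
```

With this shape the base interval is only the starting point; the first sleep uses it, and later sleeps drift between the minimum and maximum depending on activity.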
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Pull request overview
This PR adds a new Microsoft Dataverse source plugin and bootstrap provider for Drasi, enabling continuous monitoring of Dataverse tables via OData Web API change tracking (the REST equivalent of RetrieveEntityChangesRequest). It also extends AzureIdentityProvider with a with_client_secret constructor and updates the source-planner agent documentation with authentication guidance.
Changes:
- Two new crates: `drasi-source-dataverse` (polling-based CDC with adaptive backoff and per-entity workers) and `drasi-bootstrap-dataverse` (initial data loader with pagination)
- New `AzureIdentityProvider::with_client_secret()` method in `lib/src/identity/azure.rs` for service-to-service OAuth2 authentication
- Updated workspace `Cargo.toml` and source-planner agent docs
Reviewed changes
Copilot reviewed 19 out of 19 changed files in this pull request and generated 9 comments.
| File | Description |
|---|---|
| `Cargo.toml` | Added both new crates to workspace members |
| `lib/src/identity/azure.rs` | Added `with_client_secret()` constructor and tests |
| `components/sources/dataverse/Cargo.toml` | New source crate definition |
| `components/sources/dataverse/src/config.rs` | Source configuration with validation |
| `components/sources/dataverse/src/types.rs` | OData delta response types and parsing |
| `components/sources/dataverse/src/auth.rs` | TokenManager with client credentials and Azure CLI auth |
| `components/sources/dataverse/src/client.rs` | HTTP client for OData change tracking API |
| `components/sources/dataverse/src/lib.rs` | Main source impl with per-entity polling workers |
| `components/sources/dataverse/src/descriptor.rs` | Dynamic plugin descriptor for the source |
| `components/sources/dataverse/tests/integration_test.rs` | Wiremock-based integration test (ignored) |
| `components/sources/dataverse/README.md` | Source documentation |
| `components/sources/dataverse/Makefile` | Build/test/lint targets |
| `components/bootstrappers/dataverse/Cargo.toml` | New bootstrap crate definition |
| `components/bootstrappers/dataverse/src/config.rs` | Bootstrap configuration with validation |
| `components/bootstrappers/dataverse/src/lib.rs` | Bootstrap provider with pagination and label filtering |
| `components/bootstrappers/dataverse/src/descriptor.rs` | Dynamic plugin descriptor for bootstrap |
| `components/bootstrappers/dataverse/README.md` | Bootstrap documentation |
| `components/bootstrappers/dataverse/Makefile` | Build/test/lint targets |
| `.github/agents/source-planner.md` | Auth guidance for source development |
    let delta_call_count = Arc::new(AtomicUsize::new(0));
    let delta_count_clone = delta_call_count.clone();

    // 2. Mount OAuth2 token mock (always available)
    Mock::given(method("POST"))
        .and(path_regex(r".*/oauth2/v2.0/token"))
        .respond_with(token_response())
        .expect(1..)
        .mount(&mock_server)
        .await;

    // 3. Mount initial change tracking request (GET /api/data/v9.2/accounts with Prefer header)
    let mock_uri_for_initial = mock_uri.clone();
    Mock::given(method("GET"))
        .and(path("/api/data/v9.2/accounts"))
        .and(header("Prefer", "odata.track-changes"))
        .respond_with(empty_delta_response(&mock_uri_for_initial))
        .expect(1)
        .mount(&mock_server)
        .await;

    // 4. Mount delta link endpoint - returns sequenced responses
    // We use a custom responder that returns different responses based on call count
    let mock_uri_for_delta = mock_uri.clone();
    let mock_uri_for_delta2 = mock_uri.clone();
    let mock_uri_for_delta3 = mock_uri.clone();
    let mock_uri_for_delta4 = mock_uri.clone();

    @@ -0,0 +1,37 @@
    [package]

    let source = crate::DataverseSourceBuilder::new(id)
        .with_environment_url(config.environment_url.clone())
        .with_tenant_id(config.tenant_id.clone())
        .with_client_id(config.client_id.clone())
        .with_client_secret(config.client_secret.clone())
        .with_entities(config.entities.clone())
        .with_min_interval_ms(config.min_interval_ms)
        .with_max_interval_seconds(config.max_interval_seconds)
        .with_api_version(config.api_version.clone())
        .with_auto_start(auto_start);

    @@ -0,0 +1,38 @@
    [package]

Description
Adds a new Dataverse source plugin and bootstrap provider that enable Drasi to continuously monitor Microsoft Dataverse (Power Platform) tables for data changes using the OData Web API change tracking mechanism, the REST equivalent of the platform's `RetrieveEntityChangesRequest`.

New Crates
- `drasi-source-dataverse` (`components/sources/dataverse/`): Polling-based change detection source using OData delta links
- `drasi-bootstrap-dataverse` (`components/bootstrappers/dataverse/`): Initial data loader that fetches all existing records from configured entities

Key Features

Change Tracking
- `Prefer: odata.track-changes` headers and `@odata.deltaLink` tokens (Web API equivalent of `RetrieveEntityChangesRequest` with `DataVersion`)

Adaptive Backoff
- `SyncWorker` adaptive polling pattern

Authentication
- `tenant_id` + `client_id` + `client_secret`
- `az account get-access-token` for local development without secrets

Platform YAML Compatibility
- `endpoint`, `clientId`, `clientSecret`, `tenantId`, `useAzureCli`
- `entities` field accepts both a JSON array and a comma-separated string

Configuration Example
Or with Azure CLI auth (no secrets):

Files Added/Modified

New Files
- `components/sources/dataverse/`
- `components/bootstrappers/dataverse/`
- `components/sources/dataverse/AUTHENTICATION.md`
- `examples/lib/dataverse-getting-started/`

Modified Files
- `Cargo.toml`

Testing

Prerequisites for Dataverse Environment
- `az login` with a user that has Dataverse access

Type of change
Fixes: #issue_number