Skip to content

feat: Add Microsoft Dataverse source and bootstrap provider#304

Open
ruokun-niu wants to merge 22 commits intodrasi-project:mainfrom
ruokun-niu:dataverse-source
Open

feat: Add Microsoft Dataverse source and bootstrap provider#304
ruokun-niu wants to merge 22 commits intodrasi-project:mainfrom
ruokun-niu:dataverse-source

Conversation

@ruokun-niu
Copy link
Contributor

Description

Adds a new Dataverse source plugin and bootstrap provider that enable Drasi to continuously monitor Microsoft Dataverse (Power Platform) tables for data changes using the OData Web API change tracking mechanism — the REST equivalent of the platform's RetrieveEntityChangesRequest.

New Crates

  • drasi-source-dataverse (components/sources/dataverse/) — Polling-based change detection source using OData delta links
  • drasi-bootstrap-dataverse (components/bootstrappers/dataverse/) — Initial data loader that fetches all existing records from configured entities

Key Features

Change Tracking

  • Uses Prefer: odata.track-changes headers and @odata.deltaLink tokens (Web API equivalent of RetrieveEntityChangesRequest with DataVersion)
  • Per-entity worker tasks with independent delta tokens
  • Delta tokens persisted to StateStore for crash recovery

Adaptive Backoff

  • Matches the platform's SyncWorker adaptive polling pattern
  • 500ms minimum → 30s maximum interval
  • 1.2× multiplier under 5s, 1.5× above 5s
  • Resets to minimum on detected changes

Authentication

  • Client credentials (OAuth2) — tenant_id + client_id + client_secret
  • Azure CLIaz account get-access-token for local development without secrets

Platform YAML Compatibility

  • Serde aliases for camelCase field names (endpoint, clientId, clientSecret, tenantId, useAzureCli)
  • entities field accepts both a JSON array and a comma-separated string

Configuration Example

apiVersion: v1
kind: Source
name: leads
spec:
  kind: Dataverse
  properties:
    endpoint: https://orgXXXXX.crm.dynamics.com/
    entities: lead
    clientId: <app-client-id>
    clientSecret: <app-client-secret>
    tenantId: <azure-ad-tenant-id>

Or with Azure CLI auth (no secrets):

apiVersion: v1
kind: Source
name: leads
spec:
  kind: Dataverse
  properties:
    endpoint: https://orgXXXXX.crm.dynamics.com/
    entities: lead
    useAzureCli: true

Files Added/Modified

New Files

Path Purpose
components/sources/dataverse/ Source crate (config, auth, types, client, lib)
components/bootstrappers/dataverse/ Bootstrap crate (config, lib)
components/sources/dataverse/AUTHENTICATION.md Auth method documentation
examples/lib/dataverse-getting-started/ Standalone example with auto-detected auth

Modified Files

Path Change
Cargo.toml Added both crates to workspace members

Testing

  • 43 unit tests across both crates covering config validation, serde aliases, entity set naming, change parsing, auth modes, builder patterns, and value conversion
  • 1 integration test (ignored, requires live Dataverse environment or wiremock)
  • All tests pass, build clean

Prerequisites for Dataverse Environment

  1. Change tracking must be enabled on target entities (Settings → Customization → Entity → Enable Change Tracking)
  2. For client credentials: Azure AD app registration with Dataverse API permissions + application user in the environment
  3. For Azure CLI: az login with a user that has Dataverse access

Type of change

  • This pull request fixes a bug in Drasi and has an approved issue (issue link required).
  • This pull request adds or changes features of Drasi and has an approved issue (issue link required).
  • This pull request is a minor refactor, code cleanup, test improvement, or other maintenance task and doesn't change the functionality of Drasi (issue link optional).

Fixes: #issue_number

Signed-off-by: ruokun-niu <ruokunniu@gmail.com>
Signed-off-by: ruokun-niu <ruokunniu@gmail.com>
@ruokun-niu ruokun-niu requested a review from a team as a code owner March 3, 2026 22:46
@ruokun-niu ruokun-niu marked this pull request as draft March 3, 2026 22:46
"c",
"s",
"https://myorg.crm.dynamics.com",
"http://localhost:8080/token",

Check notice

Code scanning / devskim

Accessing localhost could indicate debug code, or could hinder scaling. Note

Do not leave debug code in production
"https://myorg.crm.dynamics.com",
"http://localhost:8080/token",
);
assert_eq!(tm.token_url, "http://localhost:8080/token");

Check notice

Code scanning / devskim

Accessing localhost could indicate debug code, or could hinder scaling. Note

Do not leave debug code in production
Signed-off-by: ruokun-niu <ruokunniu@gmail.com>
Signed-off-by: ruokun-niu <ruokunniu@gmail.com>
@ruokun-niu ruokun-niu marked this pull request as ready for review March 4, 2026 00:29
ruokun-niu and others added 6 commits March 3, 2026 16:29
Signed-off-by: ruokun-niu <ruokunniu@gmail.com>
Signed-off-by: ruokun-niu <ruokunniu@gmail.com>
Signed-off-by: ruokun-niu <ruokunniu@gmail.com>
Signed-off-by: ruokun-niu <ruokunniu@gmail.com>
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR adds a new Microsoft Dataverse source plugin and bootstrap provider for Drasi, enabling continuous monitoring of Dataverse tables for data changes using OData Web API change tracking (delta links). It also extends AzureIdentityProvider with a with_client_secret constructor and updates the source planner agent docs.

Changes:

  • New drasi-source-dataverse crate with polling-based change detection using OData delta links, adaptive backoff, per-entity workers, and OAuth2/Azure CLI authentication
  • New drasi-bootstrap-dataverse crate for initial data loading from Dataverse entities
  • Added AzureIdentityProvider::with_client_secret() to the shared identity library

Reviewed changes

Copilot reviewed 19 out of 19 changed files in this pull request and generated 10 comments.

Show a summary per file
File Description
Cargo.toml Added both new crates to workspace members
lib/src/identity/azure.rs Added with_client_secret constructor and tests
components/sources/dataverse/Cargo.toml New crate manifest
components/sources/dataverse/src/config.rs Source configuration with serde aliases and validation
components/sources/dataverse/src/auth.rs Token manager with client credentials and Azure CLI support
components/sources/dataverse/src/types.rs OData delta response types and change parsing
components/sources/dataverse/src/client.rs HTTP client for Dataverse OData Web API
components/sources/dataverse/src/lib.rs Main source plugin with polling loop and builder
components/sources/dataverse/src/descriptor.rs Plugin descriptor for dynamic loading
components/sources/dataverse/tests/integration_test.rs Wiremock-based integration test (ignored)
components/sources/dataverse/README.md Source documentation
components/bootstrappers/dataverse/Cargo.toml New crate manifest
components/bootstrappers/dataverse/src/config.rs Bootstrap configuration with validation
components/bootstrappers/dataverse/src/lib.rs Bootstrap provider with pagination
components/bootstrappers/dataverse/src/descriptor.rs Plugin descriptor for dynamic loading
components/bootstrappers/dataverse/README.md Bootstrap documentation
.github/agents/source-planner.md Added authentication guidance for source plugins

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

You can also share your feedback on Copilot code review. Take the survey.

@@ -0,0 +1,38 @@
[package]
@@ -0,0 +1,37 @@
[package]
Comment on lines +402 to +447
fn convert_json_value(value: &serde_json::Value) -> ElementValue {
match value {
serde_json::Value::Null => ElementValue::Null,
serde_json::Value::Bool(b) => ElementValue::Bool(*b),
serde_json::Value::Number(n) => {
if let Some(i) = n.as_i64() {
ElementValue::Integer(i)
} else if let Some(f) = n.as_f64() {
ElementValue::Float(ordered_float::OrderedFloat(f))
} else {
ElementValue::Null
}
}
serde_json::Value::String(s) => ElementValue::String(Arc::from(s.as_str())),
serde_json::Value::Array(arr) => {
if !arr.is_empty() {
if let Some(first_obj) = arr[0].as_object() {
if first_obj.contains_key("Value") {
let values: Vec<ElementValue> = arr
.iter()
.filter_map(|item| {
item.as_object()
.and_then(|obj| obj.get("Value"))
.map(convert_json_value)
})
.collect();
return ElementValue::List(values);
}
}
}
ElementValue::List(arr.iter().map(convert_json_value).collect())
}
serde_json::Value::Object(obj) => {
if obj.contains_key("Value") && obj.len() <= 2 {
if let Some(val) = obj.get("Value") {
return convert_json_value(val);
}
}
let mut map = ElementPropertyMap::new();
for (k, v) in obj {
map.insert(k, convert_json_value(v));
}
ElementValue::Object(map)
}
}
}
Comment on lines +132 to +135
// Track how many times the delta endpoint is called to sequence responses
let delta_call_count = Arc::new(AtomicUsize::new(0));
let delta_count_clone = delta_call_count.clone();

//! | `entities` | Vec\<String\> | required | Entity logical names to monitor |
//! | `entity_set_overrides` | HashMap\<String, String\> | `{}` | Override entity set name mapping |
//! | `entity_columns` | HashMap\<String, Vec...\> | `{}` | Per-entity column selection |
//! | `polling_interval_ms` | u64 | `5000` | Base polling interval |
Comment on lines +678 to +709
let shutdown_tx_clone = shutdown_tx.clone();
let source_id = self.base.id.clone();
let combined_handle = tokio::spawn(async move {
for (i, handle) in task_handles.into_iter().enumerate() {
if let Err(e) = handle.await {
log::error!("[{source_id}] Entity worker {i} terminated with error: {e}");
}
}
log::info!("[{source_id}] All entity workers stopped");
});

*self.base.task_handle.write().await = Some(combined_handle);

// Store shutdown_tx in a custom slot
// We abuse the oneshot field by creating a bridge
let (oneshot_tx, _oneshot_rx) = tokio::sync::oneshot::channel::<()>();
// When oneshot_tx is dropped (in stop()), we trigger shutdown
// Actually, let's use a different approach: store shutdown_tx in context
// For now, store it directly - we need the watch::Sender
{
// We'll trigger shutdown via the watch channel
// Store the sender so stop() can use it
let mut lock = self.base.shutdown_tx.write().await;
// Create a bridge: when the oneshot is sent, signal the watch
let shutdown_tx_for_stop = shutdown_tx.clone();
let (bridge_tx, bridge_rx) = tokio::sync::oneshot::channel::<()>();
tokio::spawn(async move {
let _ = bridge_rx.await;
let _ = shutdown_tx_for_stop.send(true);
});
*lock = Some(bridge_tx);
}

/// Dynamic plugin entry point.
///
/// Dynamic plugin entry point.
Comment on lines +805 to +806
///
/// Dynamic plugin entry point.
Comment on lines +166 to +177
let source = crate::DataverseSourceBuilder::new(id)
.with_environment_url(config.environment_url.clone())
.with_tenant_id(config.tenant_id.clone())
.with_client_id(config.client_id.clone())
.with_client_secret(config.client_secret.clone())
.with_entities(config.entities.clone())
.with_polling_interval_ms(config.polling_interval_ms)
.with_min_interval_ms(config.min_interval_ms)
.with_max_interval_seconds(config.max_interval_seconds)
.with_api_version(config.api_version.clone())
.with_auto_start(auto_start)
.build()?;
/// Azure AD client secret.
/// Required for client credentials flow, ignored when `use_azure_cli` is true.
#[serde(default, alias = "clientSecret")]
pub client_secret: String,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should wrap these in ConfigValue so that they can be mapped to env variables or secrets.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These fields are already wrapped in ConfigValue in descriptor.rs, which is the platform/dynamic plugin entry point. DtoMapper resolves env vars and secrets into plain strings, which are then stored here in config.rs. This matches the pattern used by the postgres source and bootstrap components.

/// - `min_interval_ms`: Minimum adaptive polling interval (default: 500)
/// - `max_interval_seconds`: Maximum adaptive polling interval (default: 30)
/// - `api_version`: Dataverse Web API version (default: `v9.2`)
#[derive(Debug, Clone, Serialize, Deserialize)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are internal domain objects, when do they need to be serialized / deserialized?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. they dont need ot be serialized

api_version,
};

let source = crate::DataverseSourceBuilder::new(id)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

entity_set_overrides and entity_columns are being dropped here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. I have added them back

entities: dto.entities,
entity_set_overrides: dto.entity_set_overrides,
entity_columns: dto.entity_columns,
polling_interval_ms,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

polling interval is never actually used in the polling loop?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

ruokun-niu and others added 5 commits March 13, 2026 09:23
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Signed-off-by: ruokun-niu <ruokunniu@gmail.com>
Signed-off-by: ruokun-niu <ruokunniu@gmail.com>
Signed-off-by: ruokun-niu <ruokunniu@gmail.com>
Signed-off-by: ruokun-niu <ruokunniu@gmail.com>
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR adds a new Microsoft Dataverse source plugin and bootstrap provider for Drasi, enabling continuous monitoring of Dataverse tables via OData Web API change tracking (the REST equivalent of RetrieveEntityChangesRequest). It also extends AzureIdentityProvider with a with_client_secret constructor and updates the source-planner agent documentation with authentication guidance.

Changes:

  • Two new crates: drasi-source-dataverse (polling-based CDC with adaptive backoff and per-entity workers) and drasi-bootstrap-dataverse (initial data loader with pagination)
  • New AzureIdentityProvider::with_client_secret() method in lib/src/identity/azure.rs for service-to-service OAuth2 authentication
  • Updated workspace Cargo.toml and source-planner agent docs

Reviewed changes

Copilot reviewed 19 out of 19 changed files in this pull request and generated 9 comments.

Show a summary per file
File Description
Cargo.toml Added both new crates to workspace members
lib/src/identity/azure.rs Added with_client_secret() constructor and tests
components/sources/dataverse/Cargo.toml New source crate definition
components/sources/dataverse/src/config.rs Source configuration with validation
components/sources/dataverse/src/types.rs OData delta response types and parsing
components/sources/dataverse/src/auth.rs TokenManager with client credentials and Azure CLI auth
components/sources/dataverse/src/client.rs HTTP client for OData change tracking API
components/sources/dataverse/src/lib.rs Main source impl with per-entity polling workers
components/sources/dataverse/src/descriptor.rs Dynamic plugin descriptor for the source
components/sources/dataverse/tests/integration_test.rs Wiremock-based integration test (ignored)
components/sources/dataverse/README.md Source documentation
components/sources/dataverse/Makefile Build/test/lint targets
components/bootstrappers/dataverse/Cargo.toml New bootstrap crate definition
components/bootstrappers/dataverse/src/config.rs Bootstrap configuration with validation
components/bootstrappers/dataverse/src/lib.rs Bootstrap provider with pagination and label filtering
components/bootstrappers/dataverse/src/descriptor.rs Dynamic plugin descriptor for bootstrap
components/bootstrappers/dataverse/README.md Bootstrap documentation
components/bootstrappers/dataverse/Makefile Build/test/lint targets
.github/agents/source-planner.md Auth guidance for source development

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +133 to +159
let delta_call_count = Arc::new(AtomicUsize::new(0));
let delta_count_clone = delta_call_count.clone();

// 2. Mount OAuth2 token mock (always available)
Mock::given(method("POST"))
.and(path_regex(r".*/oauth2/v2.0/token"))
.respond_with(token_response())
.expect(1..)
.mount(&mock_server)
.await;

// 3. Mount initial change tracking request (GET /api/data/v9.2/accounts with Prefer header)
let mock_uri_for_initial = mock_uri.clone();
Mock::given(method("GET"))
.and(path("/api/data/v9.2/accounts"))
.and(header("Prefer", "odata.track-changes"))
.respond_with(empty_delta_response(&mock_uri_for_initial))
.expect(1)
.mount(&mock_server)
.await;

// 4. Mount delta link endpoint - returns sequenced responses
// We use a custom responder that returns different responses based on call count
let mock_uri_for_delta = mock_uri.clone();
let mock_uri_for_delta2 = mock_uri.clone();
let mock_uri_for_delta3 = mock_uri.clone();
let mock_uri_for_delta4 = mock_uri.clone();
let mock_uri_for_initial = mock_uri.clone();
Mock::given(method("GET"))
.and(path("/api/data/v9.2/accounts"))
.and(header("Prefer", "odata.track-changes"))
Comment on lines +677 to +695
let shutdown_tx_clone = shutdown_tx.clone();
let source_id = self.base.id.clone();
let combined_handle = tokio::spawn(async move {
for (i, handle) in task_handles.into_iter().enumerate() {
if let Err(e) = handle.await {
log::error!("[{source_id}] Entity worker {i} terminated with error: {e}");
}
}
log::info!("[{source_id}] All entity workers stopped");
});

*self.base.task_handle.write().await = Some(combined_handle);

// Store shutdown_tx in a custom slot
// We abuse the oneshot field by creating a bridge
let (oneshot_tx, _oneshot_rx) = tokio::sync::oneshot::channel::<()>();
// When oneshot_tx is dropped (in stop()), we trigger shutdown
// Actually, let's use a different approach: store shutdown_tx in context
// For now, store it directly - we need the watch::Sender
@@ -0,0 +1,37 @@
[package]

/// Dynamic plugin entry point.
///
/// Dynamic plugin entry point.
Comment on lines +805 to +806
///
/// Dynamic plugin entry point.
Comment on lines +156 to +165
let source = crate::DataverseSourceBuilder::new(id)
.with_environment_url(config.environment_url.clone())
.with_tenant_id(config.tenant_id.clone())
.with_client_id(config.client_id.clone())
.with_client_secret(config.client_secret.clone())
.with_entities(config.entities.clone())
.with_min_interval_ms(config.min_interval_ms)
.with_max_interval_seconds(config.max_interval_seconds)
.with_api_version(config.api_version.clone())
.with_auto_start(auto_start);
@@ -0,0 +1,38 @@
[package]
ruokun-niu and others added 5 commits March 13, 2026 11:22
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Signed-off-by: ruokun-niu <ruokunniu@gmail.com>
Signed-off-by: ruokun-niu <ruokunniu@gmail.com>
Signed-off-by: ruokun-niu <ruokunniu@gmail.com>
@ruokun-niu ruokun-niu requested a review from danielgerlag March 17, 2026 16:48
Signed-off-by: ruokun-niu <ruokunniu@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants