Skip to content

Commit 4035678

Browse files
authored
Idempotent CREATE POLICY (CAS retry on 409 version conflict) (#119)
Make CREATE POLICY idempotent so authors can use it inside [Migration(N, journal: false)] reconciliation migrations without hitting HTTP 409 every restart after the first. On 409: GET the existing policy, read _seq_no + _primary_term from response root, retry PUT with if_seq_no/if_primary_term query params. Second 409 is a hard fail (concurrent writer between GET and retry; the migration lock should make this rare). No behavior change when the policy doesn't exist. Mirrors LockHandle.RenewLockAsync's optimistic-concurrency pattern. Provider README and site doc CREATE POLICY sections document the idempotency contract. Integration test extends OpenSearchTemplatePolicyIntegrationTests with a create-twice-with-mutated-body scenario verifying the CAS retry path.
1 parent 4504c7c commit 4035678

4 files changed

Lines changed: 136 additions & 3 deletions

File tree

docs/site/opensearch.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -505,6 +505,8 @@ Uploads the policy to `_plugins/_ism/policies` (or `_opendistro/_ism/policies` o
505505
}
506506
```
507507

508+
`CREATE POLICY` is **idempotent**. ISM versions policies internally, so a plain `PUT` to an already-existing policy returns HTTP 409 `version_conflict_engine_exception`. The dispatcher handles this transparently: on 409 it issues a `GET` for the existing policy, reads the current `_seq_no` and `_primary_term` from the response, and retries the `PUT` with the `if_seq_no` / `if_primary_term` query parameters. The result is upsert semantics -- creating a policy that doesn't exist behaves exactly as before, and re-running against an existing policy overwrites it with the supplied body. This makes `CREATE POLICY` usable inside `[Migration(N, journal: false)]` reconciliation migrations that re-run on every startup. A second 409 on the retry indicates that a concurrent writer modified the policy between the `GET` and the retry `PUT`; it is surfaced as a hard failure rather than retried again (the migration lock should make this rare).
509+
508510
### APPLY POLICY (ISM)
509511

510512
```

src/Hyperbee.Migrations.Providers.OpenSearch/Internal/Dispatch/StatementDispatcher.cs

Lines changed: 78 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -803,16 +803,91 @@ private async Task<StatementResult> DispatchCreatePolicyAsync( CreatePolicyAst a
803803
}
804804

805805
var body = context.ResolvedBody.ToJsonString();
806+
var policyPath = $"{IsmPathPrefix}/policies/{ast.PolicyId}";
806807

807808
// PUT /_plugins/_ism/policies/<id> — Index State Management policy.
809+
//
810+
// ISM policies are versioned in OpenSearch: a plain PUT to an
811+
// existing policy fails with HTTP 409 `version_conflict_engine_exception`.
812+
// Authors using the [Migration(N, journal: false)] reconciliation
813+
// pattern (see provider README's "Three temporal scopes for ISM
814+
// attachment") need CREATE POLICY to be idempotent so the policy
815+
// body can be upserted from source-of-truth on every startup.
816+
//
817+
// On 409: GET the existing policy, read `_seq_no` + `_primary_term`
818+
// from the response root (ISM surfaces these at the document root,
819+
// not under _source — unusual but documented), retry PUT with the
820+
// CAS query parameters. Mirrors LockHandle.RenewLockAsync's
821+
// optimistic-concurrency pattern.
808822

809-
var response = await ll.DoRequestAsync<StringResponse>(
823+
var firstResponse = await ll.DoRequestAsync<StringResponse>(
824+
global::OpenSearch.Net.HttpMethod.PUT,
825+
policyPath,
826+
context.CancellationToken,
827+
data: PostData.String( body ) ).ConfigureAwait( false );
828+
829+
if ( firstResponse.HttpStatusCode != 409 )
830+
return BuildResult( verb, firstResponse, $"policy `{ast.PolicyId}` created/updated" );
831+
832+
// 409 — read the current version to retry with CAS.
833+
var getResponse = await ll.DoRequestAsync<StringResponse>(
834+
global::OpenSearch.Net.HttpMethod.GET,
835+
policyPath,
836+
context.CancellationToken ).ConfigureAwait( false );
837+
838+
if ( !getResponse.Success || getResponse.Body is null )
839+
{
840+
return new StatementResult( StatementOutcome.Failed, verb,
841+
Detail: $"policy `{ast.PolicyId}` returned 409 on PUT but the follow-up GET to read _seq_no/_primary_term for CAS retry failed: HTTP {getResponse.HttpStatusCode}",
842+
OpenSearchResponseStatus: getResponse.HttpStatusCode,
843+
Exception: getResponse.OriginalException );
844+
}
845+
846+
long seqNo = 0;
847+
long primaryTerm = 0;
848+
try
849+
{
850+
using var doc = JsonDocument.Parse( getResponse.Body );
851+
if ( !doc.RootElement.TryGetProperty( "_seq_no", out var seqEl )
852+
|| !doc.RootElement.TryGetProperty( "_primary_term", out var termEl )
853+
|| !seqEl.TryGetInt64( out seqNo )
854+
|| !termEl.TryGetInt64( out primaryTerm ) )
855+
{
856+
return new StatementResult( StatementOutcome.Failed, verb,
857+
Detail: $"policy `{ast.PolicyId}` 409 conflict; CAS retry could not extract _seq_no/_primary_term from GET response (response did not contain the expected fields).",
858+
OpenSearchResponseStatus: getResponse.HttpStatusCode );
859+
}
860+
}
861+
catch ( JsonException ex )
862+
{
863+
return new StatementResult( StatementOutcome.Failed, verb,
864+
Detail: $"policy `{ast.PolicyId}` 409 conflict; CAS retry could not parse GET response body as JSON: {ex.Message}",
865+
OpenSearchResponseStatus: getResponse.HttpStatusCode,
866+
Exception: ex );
867+
}
868+
869+
// Retry the PUT with CAS query params. Inline rather than via a typed
870+
// IRequestParameters because there's no ISM-specific request-parameters
871+
// type in OpenSearch.Net (the endpoint is plugin-served).
872+
var retryPath = $"{policyPath}?if_seq_no={seqNo}&if_primary_term={primaryTerm}";
873+
var retryResponse = await ll.DoRequestAsync<StringResponse>(
810874
global::OpenSearch.Net.HttpMethod.PUT,
811-
$"{IsmPathPrefix}/policies/{ast.PolicyId}",
875+
retryPath,
812876
context.CancellationToken,
813877
data: PostData.String( body ) ).ConfigureAwait( false );
814878

815-
return BuildResult( verb, response, $"policy `{ast.PolicyId}` created/updated" );
879+
if ( retryResponse.HttpStatusCode == 409 )
880+
{
881+
// Second 409 means another writer beat us between the GET and
882+
// the retry PUT. The lock should make this rare; treat as hard
883+
// failure rather than recursing.
884+
return new StatementResult( StatementOutcome.Failed, verb,
885+
Detail: $"policy `{ast.PolicyId}` CAS retry hit a second 409 — concurrent writer between GET (_seq_no={seqNo}, _primary_term={primaryTerm}) and retry PUT.",
886+
OpenSearchResponseStatus: retryResponse.HttpStatusCode,
887+
Exception: retryResponse.OriginalException ?? new InvalidOperationException( $"concurrent writer on policy {ast.PolicyId}" ) );
888+
}
889+
890+
return BuildResult( verb, retryResponse, $"policy `{ast.PolicyId}` updated via CAS retry (seq={seqNo}, term={primaryTerm})" );
816891
}
817892

818893
// --- APPLY POLICY <id> TO <pattern> ---

src/Hyperbee.Migrations.Providers.OpenSearch/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,8 @@ The wildcard form of `APPLY POLICY` is the correct expression of "apply to whate
308308

309309
Caveat: `ism_template` inside a policy body is the modern endpoint shape. Older AWS-managed clusters served by the legacy `_opendistro/_ism` endpoint may not honor it; if `IsmEndpointDetectStep` resolves to the legacy endpoint, the greenfield row falls back to runtime `APPLY POLICY` (sample 4000's pattern, run once at install time, plus sample 9001's reconciliation pattern for ongoing changes). Modern OpenSearch (2.x and the modern AWS endpoint) supports `ism_template` natively.
310310

311+
`CREATE POLICY` is **idempotent**. ISM versions policies internally, so a plain `PUT` to an already-existing policy returns HTTP 409 `version_conflict_engine_exception`. The dispatcher handles this transparently: on 409 it issues a `GET` for the existing policy, reads the current `_seq_no` and `_primary_term` from the response, and retries the `PUT` with the `if_seq_no` / `if_primary_term` query parameters. The result is upsert semantics — creating a policy that doesn't exist behaves exactly as before, and re-running against an existing policy overwrites it with the supplied body. This makes `CREATE POLICY` usable inside `[Migration(N, journal: false)]` reconciliation migrations that re-run on every startup. A second 409 on the retry indicates that a concurrent writer modified the policy between the `GET` and the retry `PUT`; it is surfaced as a hard failure rather than retried again (the migration lock should make this rare).
312+
311313
### Cluster waits
312314

313315
```

tests/Hyperbee.Migrations.Integration.Tests/OpenSearchTemplatePolicyIntegrationTests.cs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,60 @@ public async Task ApplyPolicy_ReportsAtLeastOneIndexUpdated()
256256
$"expected updated_indices >= 1; got: {result.Detail}" );
257257
}
258258

259+
/// <summary>
/// Dispatches <c>CREATE POLICY</c> twice against the same policy id, the second
/// time with a different description, and checks that the second dispatch
/// succeeds, reports the CAS retry path in its detail text, and leaves the
/// mutated description visible on a follow-up GET of the policy.
/// </summary>
[TestMethod]
[TestCategory( "OpenSearch" )]
[TestCategory( "Phase2" )]
public async Task CreatePolicy_TwiceWithMutatedBody_SecondCallSucceedsViaCasRetry()
{
    // Marker written into the second body; the final GET is searched for it.
    const string MutatedDescription = "test policy (mutated)";

    // Why this matters: [Migration(N, journal: false)] reconciliation
    // migrations (provider README, "Three temporal scopes for ISM
    // attachment") re-run on every startup. Without an idempotent
    // CREATE POLICY, every run after the first would end in HTTP 409.

    // --- First dispatch: seed the policy. ---
    var seedBody = MinimalIsmPolicyBody();
    var seedResult = await DispatchAsync(
        $"CREATE POLICY {_policyId} WITH BODY $body", seedBody );
    Assert.IsTrue( seedResult.IsSuccess, $"first create failed: {seedResult.Detail}" );

    // --- Second dispatch: same id, different description. ---
    // Only the description changes. That is enough to tell an overwrite
    // apart from a no-op while keeping the policy minimal; a different
    // default_state or an extra state would serve equally well.
    var revisedBody = JsonNode.Parse( $$"""
        {
          "policy": {
            "description": "{{MutatedDescription}}",
            "default_state": "hot",
            "states": [
              {
                "name": "hot",
                "actions": [],
                "transitions": []
              }
            ]
          }
        }
        """ );
    var rerunResult = await DispatchAsync(
        $"CREATE POLICY {_policyId} WITH BODY $body", revisedBody );

    Assert.IsTrue( rerunResult.IsSuccess,
        $"second create (CAS retry path) failed: {rerunResult.Detail}" );
    StringAssert.Contains( rerunResult.Detail!, "CAS retry",
        $"expected detail to indicate CAS retry path; got: {rerunResult.Detail}" );

    // --- Post-condition: the stored policy carries the new description, ---
    // --- so the retry really wrote the body instead of succeeding silently. ---
    var lowLevel = OpenSearchTestContainer.LowLevelClient;
    var stored = await lowLevel.DoRequestAsync<StringResponse>(
        OpenSearch.Net.HttpMethod.GET, $"_plugins/_ism/policies/{_policyId}", default );
    Assert.AreEqual( 200, stored.HttpStatusCode );
    StringAssert.Contains( stored.Body!, MutatedDescription,
        $"expected mutated description in policy body after CAS retry; got: {stored.Body}" );
}
312+
259313
[TestMethod]
260314
[TestCategory( "OpenSearch" )]
261315
[TestCategory( "Phase2" )]

0 commit comments

Comments
 (0)