Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 103 additions & 61 deletions packages/alchemy/src/AWS/Scheduler/ScheduleGroup.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
import * as scheduler from "@distilled.cloud/aws/scheduler";
import * as Effect from "effect/Effect";
import * as Schedule_ from "effect/Schedule";
import { Unowned } from "../../AdoptPolicy.ts";
import { isResolved } from "../../Diff.ts";
import { createPhysicalName } from "../../PhysicalName.ts";
import * as Provider from "../../Provider.ts";
import { Resource } from "../../Resource.ts";
import type { Providers } from "../Providers.ts";
import {
createInternalTags,
createTagsList,
diffTags,
hasTags,
hasAlchemyTags,
} from "../../Tags.ts";
import type { Providers } from "../Providers.ts";

export interface ScheduleGroupProps {
/**
Expand All @@ -29,6 +31,10 @@ export interface ScheduleGroupProps {
* Schedule groups provide a namespace for schedules so higher-level helpers can
* organize recurring jobs separately from one-shot or operational schedules.
*
* Unlike individual schedules, schedule groups DO support tagging, so alchemy
* brands its groups with internal tags and uses tag presence to decide whether
* a foreign group can be adopted (with `--adopt`/`adopt(true)`).
*
* @section Creating Schedule Groups
* @example Basic Group
* ```typescript
Expand All @@ -55,6 +61,26 @@ export const ScheduleGroup = Resource<ScheduleGroup>(
"AWS.Scheduler.ScheduleGroup",
);

/**
* Bounded-retry on `ConflictException`. EventBridge Scheduler returns
* `ConflictException` when a group is in the transient `DELETING` /
* `CREATING` state or when a concurrent mutation is in flight; the API
* itself does not classify the error as retryable, so the provider retries
* locally rather than tagging the distilled error as `RetryableError`
* (which would also cover genuine name-collision conflicts).
*/
const conflictRetry = <A, E extends { _tag: string }, R>(
eff: Effect.Effect<A, E, R>,
) =>
eff.pipe(
Effect.retry({
while: (e) => e._tag === "ConflictException",
schedule: Schedule_.spaced("2 seconds").pipe(
Schedule_.both(Schedule_.recurs(15)),
),
}),
);

export const ScheduleGroupProvider = () =>
Provider.effect(
ScheduleGroup,
Expand All @@ -79,9 +105,7 @@ export const ScheduleGroupProvider = () =>
const scheduleGroupName =
output?.scheduleGroupName ?? (yield* toName(id, olds));
const described = yield* scheduler
.getScheduleGroup({
Name: scheduleGroupName,
})
.getScheduleGroup({ Name: scheduleGroupName })
.pipe(
Effect.catchTag("ResourceNotFoundException", () =>
Effect.succeed(undefined),
Expand All @@ -92,11 +116,32 @@ export const ScheduleGroupProvider = () =>
return undefined;
}

return {
const attrs = {
scheduleGroupArn: described.Arn,
scheduleGroupName: described.Name,
state: described.State,
};

// Probe tags. Schedule groups support tagging, so a foreign
// group can be detected by absence of our internal brand. If
// tagging is unavailable (rare — `ResourceNotFoundException`
// race), conservatively treat as unowned.
const tagsResp = yield* scheduler
.listTagsForResource({ ResourceArn: described.Arn })
.pipe(
Effect.map((r) =>
Object.fromEntries(
(r.Tags ?? []).map((t) => [t.Key, t.Value]),
),
),
Effect.catchTag("ResourceNotFoundException", () =>
Effect.succeed({} as Record<string, string>),
),
);

return (yield* hasAlchemyTags(id, tagsResp))
? attrs
: Unowned(attrs);
}),
reconcile: Effect.fn(function* ({ id, news, output, session }) {
const scheduleGroupName =
Expand All @@ -105,70 +150,53 @@ export const ScheduleGroupProvider = () =>
const desiredTags = { ...internalTags, ...news.tags };

// Observe — fetch live group; gracefully handle missing.
let observed = yield* scheduler
const observed = yield* scheduler
.getScheduleGroup({ Name: scheduleGroupName })
.pipe(
Effect.catchTag("ResourceNotFoundException", () =>
Effect.succeed(undefined),
),
);

// Ensure — create if missing. Tolerate `ConflictException` as a
// race or adoption case: verify ownership via tags before
// continuing.
if (!observed?.Arn) {
yield* scheduler
// Ensure — create if missing. Bounded-retry `ConflictException`
// because a concurrent destroy may have left the group in
// `DELETING` and AWS rejects re-creation until the deletion
// completes. After a `ConflictException` window expires, the
// create attempt itself succeeds.
let groupArn: string | undefined = observed?.Arn;
if (!groupArn) {
groupArn = yield* scheduler
.createScheduleGroup({
Name: scheduleGroupName,
Tags: createTagsList(desiredTags),
})
.pipe(
Effect.catchTag("ConflictException", () =>
scheduler.getScheduleGroup({ Name: scheduleGroupName }).pipe(
Effect.flatMap((existing) =>
existing.Arn
? scheduler
.listTagsForResource({ ResourceArn: existing.Arn })
.pipe(
Effect.filterOrFail(
({ Tags }) => hasTags(internalTags, Tags),
() =>
new Error(
`ScheduleGroup '${scheduleGroupName}' already exists and is not managed by alchemy`,
),
),
Effect.asVoid,
)
: Effect.fail(
new Error(
`ScheduleGroup '${scheduleGroupName}' already exists but could not be described`,
),
),
),
),
),
conflictRetry,
Effect.map((r) => r.ScheduleGroupArn),
);
observed = yield* scheduler
}

if (!groupArn) {
// Re-read in case the create response did not echo the ARN.
const reread = yield* scheduler
.getScheduleGroup({ Name: scheduleGroupName })
.pipe(
Effect.catchTag("ResourceNotFoundException", () =>
Effect.succeed(undefined),
),
);
if (!reread?.Arn) {
return yield* Effect.fail(
new Error(
`Failed to read created ScheduleGroup '${scheduleGroupName}'`,
),
);
}
groupArn = reread.Arn;
}

if (!observed?.Arn) {
return yield* Effect.fail(
new Error(
`Failed to read created ScheduleGroup '${scheduleGroupName}'`,
),
);
}

const groupArn = observed.Arn;

// Sync tags — diff observed cloud tags against desired so
// adoption rewrites ownership tags.
// Sync tags — diff against observed cloud tags so adoption
// rewrites ownership tags and out-of-band tag drift converges.
const observedTagsResp = yield* scheduler
.listTagsForResource({ ResourceArn: groupArn })
.pipe(
Expand All @@ -182,32 +210,46 @@ export const ScheduleGroupProvider = () =>
const { removed, upsert } = diffTags(observedTags, desiredTags);

if (removed.length > 0) {
yield* scheduler.untagResource({
ResourceArn: groupArn,
TagKeys: removed,
});
yield* scheduler
.untagResource({
ResourceArn: groupArn,
TagKeys: removed,
})
.pipe(conflictRetry);
}
if (upsert.length > 0) {
yield* scheduler.tagResource({
ResourceArn: groupArn,
Tags: upsert,
});
yield* scheduler
.tagResource({
ResourceArn: groupArn,
Tags: upsert,
})
.pipe(conflictRetry);
}

// Re-read final state so we return the cloud's authoritative
// `State` (e.g. `ACTIVE`) rather than guess.
const finalState = yield* scheduler
.getScheduleGroup({ Name: scheduleGroupName })
.pipe(
Effect.map((r) => r.State),
Effect.catchTag("ResourceNotFoundException", () =>
Effect.succeed(undefined),
),
);

yield* session.note(groupArn);

return {
scheduleGroupArn: groupArn,
scheduleGroupName,
state: observed?.State,
state: finalState,
};
}),
delete: Effect.fn(function* ({ output }) {
yield* scheduler
.deleteScheduleGroup({
Name: output.scheduleGroupName,
})
.deleteScheduleGroup({ Name: output.scheduleGroupName })
.pipe(
conflictRetry,
Effect.catchTag("ResourceNotFoundException", () => Effect.void),
);
}),
Expand Down
Loading
Loading