Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 115 additions & 45 deletions packages/alchemy/src/AWS/AutoScaling/AutoScalingGroup.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import * as autoscaling from "@distilled.cloud/aws/auto-scaling";
import * as Data from "effect/Data";
import * as Effect from "effect/Effect";
import * as Schedule from "effect/Schedule";
import { Unowned } from "../../AdoptPolicy.ts";
import { deepEqual, isResolved } from "../../Diff.ts";
import type { Input } from "../../Input.ts";
import { createPhysicalName } from "../../PhysicalName.ts";
import * as Provider from "../../Provider.ts";
import { Resource } from "../../Resource.ts";
import type { Providers } from "../Providers.ts";
import { createInternalTags, diffTags } from "../../Tags.ts";
import { createInternalTags, diffTags, hasAlchemyTags } from "../../Tags.ts";
import type { SubnetId } from "../EC2/Subnet.ts";
import type {
LaunchTemplateId,
Expand All @@ -17,6 +19,14 @@ import type {

export type AutoScalingGroupName = string;

class AutoScalingGroupNotReadyAfterCreate extends Data.TaggedError(
"AutoScalingGroupNotReadyAfterCreate",
) {}

class AutoScalingGroupStillExists extends Data.TaggedError(
"AutoScalingGroupStillExists",
) {}

export interface LaunchTemplateReference {
launchTemplateId?: Input<LaunchTemplateId>;
launchTemplateName?: Input<LaunchTemplateName>;
Expand Down Expand Up @@ -183,17 +193,37 @@ export const AutoScalingGroupProvider = () =>
const attached = newTargetGroupArns.filter((arn) => !oldSet.has(arn));

if (detached.length > 0) {
yield* autoscaling.detachLoadBalancerTargetGroups({
AutoScalingGroupName: autoScalingGroupName,
TargetGroupARNs: detached,
} as any);
yield* autoscaling
.detachLoadBalancerTargetGroups({
AutoScalingGroupName: autoScalingGroupName,
TargetGroupARNs: detached,
} as any)
.pipe(
Effect.retry({
while: (e) => e._tag === "ResourceContentionFault",
schedule: Schedule.fixed("2 seconds").pipe(
Schedule.both(Schedule.recurs(15)),
),
}),
);
}

if (attached.length > 0) {
yield* autoscaling.attachLoadBalancerTargetGroups({
AutoScalingGroupName: autoScalingGroupName,
TargetGroupARNs: attached,
} as any);
yield* autoscaling
.attachLoadBalancerTargetGroups({
AutoScalingGroupName: autoScalingGroupName,
TargetGroupARNs: attached,
} as any)
.pipe(
Effect.retry({
while: (e) =>
e._tag === "ResourceContentionFault" ||
e._tag === "InstanceRefreshInProgressFault",
schedule: Schedule.fixed("2 seconds").pipe(
Schedule.both(Schedule.recurs(15)),
),
}),
);
}
});

Expand Down Expand Up @@ -281,7 +311,13 @@ export const AutoScalingGroupProvider = () =>
const name =
output?.autoScalingGroupName ?? (yield* toName(id, olds ?? {}));
const group = yield* describeGroup(name);
return group ? toAttributes(group) : undefined;
if (!group) return undefined;
const attrs = toAttributes(group);
// Mark the resource as Unowned when alchemy tags are missing so
// the engine can gate adoption behind `adopt(true)`.
return (yield* hasAlchemyTags(id, attrs.tags))
? attrs
: Unowned(attrs);
}),
reconcile: Effect.fn(function* ({ id, news, output, session }) {
const autoScalingGroupName =
Expand Down Expand Up @@ -323,23 +359,24 @@ export const AutoScalingGroupProvider = () =>
Tags: toTags(autoScalingGroupName, desiredTags),
} as any)
.pipe(
Effect.catch((error: any) =>
error?._tag === "AlreadyExistsFault"
? Effect.void
: Effect.fail(error),
),
// Race: a peer reconciler created the ASG concurrently, or
// a previous reconciler crashed after Create succeeded but
// before persisting state. Fall through to the sync path.
Effect.catchTag("AlreadyExistsFault", () => Effect.void),
);

// Wait for `describeAutoScalingGroups` to return the new ASG.
// The ASG control plane is eventually consistent — only retry
// on the explicit "still missing" case. Other errors propagate.
existing = yield* describeGroup(autoScalingGroupName).pipe(
Effect.filterOrFail(
Boolean,
() =>
new Error(
`Auto Scaling Group '${autoScalingGroupName}' was not readable after create`,
),
Effect.flatMap((group) =>
group
? Effect.succeed(group)
: Effect.fail(new AutoScalingGroupNotReadyAfterCreate()),
),
Effect.retry({
while: () => true,
while: (e) =>
e._tag === "AutoScalingGroupNotReadyAfterCreate",
schedule: Schedule.recurs(8).pipe(
Schedule.both(Schedule.exponential("250 millis")),
),
Expand All @@ -349,20 +386,36 @@ export const AutoScalingGroupProvider = () =>

// Sync core ASG configuration — `updateAutoScalingGroup`
// overwrites min/max/desired/template/subnets/health-check
// settings in one call, so we issue it unconditionally
// (idempotent for matching values).
yield* autoscaling.updateAutoScalingGroup({
AutoScalingGroupName: autoScalingGroupName,
MinSize: news.minSize,
MaxSize: news.maxSize,
DesiredCapacity: news.desiredCapacity ?? news.minSize,
LaunchTemplate: launchTemplate,
VPCZoneIdentifier: (news.subnetIds as string[]).join(","),
HealthCheckType: healthCheckType,
HealthCheckGracePeriod: news.healthCheckGracePeriod,
DefaultCooldown: news.defaultCooldown,
TerminationPolicies: news.terminationPolicies,
} as any);
// settings in one call. AWS rejects concurrent updates with
// `ScalingActivityInProgressFault` (transient — a scale-in or
// scale-out is mid-flight) so we retry it. We never call this
// unconditionally on a freshly-created ASG that already lands
// with the right config.
if (existing) {
yield* autoscaling
.updateAutoScalingGroup({
AutoScalingGroupName: autoScalingGroupName,
MinSize: news.minSize,
MaxSize: news.maxSize,
DesiredCapacity: news.desiredCapacity ?? news.minSize,
LaunchTemplate: launchTemplate,
VPCZoneIdentifier: (news.subnetIds as string[]).join(","),
HealthCheckType: healthCheckType,
HealthCheckGracePeriod: news.healthCheckGracePeriod,
DefaultCooldown: news.defaultCooldown,
TerminationPolicies: news.terminationPolicies,
} as any)
.pipe(
Effect.retry({
while: (e) =>
e._tag === "ScalingActivityInProgressFault" ||
e._tag === "ResourceContentionFault",
schedule: Schedule.fixed("2 seconds").pipe(
Schedule.both(Schedule.recurs(15)),
),
}),
);
}

// Sync target groups — observed cloud attachments vs desired.
const observedAttrs = toAttributes(existing);
Expand Down Expand Up @@ -401,22 +454,39 @@ export const AutoScalingGroupProvider = () =>
return;
}

yield* autoscaling.deleteAutoScalingGroup({
AutoScalingGroupName: output.autoScalingGroupName,
ForceDelete: true,
} as any);
// `ForceDelete=true` lets AWS terminate any in-flight instances
// instead of refusing the call. `ScalingActivityInProgressFault`
// is transient — a scale activity raced our delete; retry.
yield* autoscaling
.deleteAutoScalingGroup({
AutoScalingGroupName: output.autoScalingGroupName,
ForceDelete: true,
} as any)
.pipe(
Effect.retry({
while: (e) =>
e._tag === "ScalingActivityInProgressFault" ||
e._tag === "ResourceInUseFault" ||
e._tag === "ResourceContentionFault",
schedule: Schedule.fixed("2 seconds").pipe(
Schedule.both(Schedule.recurs(30)),
),
}),
);

// Wait for the ASG to disappear from `describeAutoScalingGroups`
// — deletion is asynchronous (instances drain before the group
// record is removed). Bound the wait so we don't loop forever.
yield* describeGroup(output.autoScalingGroupName).pipe(
Effect.flatMap((group) =>
group
? Effect.fail(new Error("AutoScalingGroupStillExists"))
? Effect.fail(new AutoScalingGroupStillExists())
: Effect.void,
),
Effect.retry({
while: (error) =>
(error as Error).message === "AutoScalingGroupStillExists",
schedule: Schedule.recurs(12).pipe(
Schedule.both(Schedule.exponential("250 millis")),
while: (e) => e._tag === "AutoScalingGroupStillExists",
schedule: Schedule.fixed("2 seconds").pipe(
Schedule.both(Schedule.recurs(60)),
),
}),
);
Expand Down
Loading
Loading