Skip to content

Commit 9a2b790

Browse files
committed
feat: import firework.bootstrap
1 parent ee698de commit 9a2b790

10 files changed

Lines changed: 677 additions & 14 deletions

File tree

_bootstrap/_resolve.py

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
from __future__ import annotations
2+
3+
from typing import TYPE_CHECKING, Iterable
4+
5+
if TYPE_CHECKING:
6+
from .service import Service
7+
8+
9+
class RequirementResolveFailed(Exception):
10+
pass
11+
12+
13+
class DependencyBrokenError(Exception):
14+
pass
15+
16+
17+
def _build_dependencies_map(services: Iterable[Service]) -> dict[str, set[str]]:
18+
dependencies_map: dict[str, set[str]] = {}
19+
20+
for service in services:
21+
dependencies_map[service.id] = set(service.dependencies) | set(service.after)
22+
23+
for before in service.before:
24+
dependencies_map.setdefault(before, set()).add(service.id)
25+
26+
return dependencies_map
27+
28+
29+
def resolve_dependencies(
30+
services: Iterable[Service],
31+
exclude: Iterable[Service] = (),
32+
*,
33+
reverse: bool = False,
34+
) -> list[list[str]]:
35+
services = list(services)
36+
37+
dependencies_map = _build_dependencies_map(services)
38+
39+
unresolved = {s.id: s for s in services}
40+
resolved_id = {i.id for i in exclude}
41+
result: list[list[str]] = []
42+
43+
while unresolved:
44+
layer_candidates = [service for service in unresolved.values() if resolved_id.issuperset(dependencies_map[service.id])]
45+
46+
if not layer_candidates:
47+
raise TypeError("Failed to resolve requirements due to cyclic dependencies or unmet constraints.")
48+
49+
# 根据是否有 before 约束进行分类
50+
befores = []
51+
no_befores = []
52+
53+
for service in layer_candidates:
54+
if service.before:
55+
befores.append(service)
56+
else:
57+
no_befores.append(service)
58+
59+
# 优先无 before 的服务,一旦无 before 的服务存在,就先放这一层
60+
current_layer = no_befores or befores
61+
62+
# 从未解决中移除当前层的服务
63+
for cid in current_layer:
64+
del unresolved[cid]
65+
66+
resolved_id.update(current_layer)
67+
result.append(current_layer)
68+
69+
if reverse:
70+
result.reverse()
71+
72+
return result
73+
74+
75+
def validate_services_removal(existed: Iterable[Service], services_to_remove: Iterable[Service]):
76+
graph = {service.id: set() for service in existed}
77+
78+
for service, deps in _build_dependencies_map(existed).items():
79+
for dep in deps:
80+
if dep in graph:
81+
graph[dep].add(service)
82+
83+
to_remove = {service.id for service in services_to_remove}
84+
85+
for node in to_remove:
86+
for dependent in graph.get(node, ()):
87+
if dependent not in to_remove:
88+
raise DependencyBrokenError(f"Cannot remove node '{node}' because node '{dependent}' depends on it.")

_bootstrap/context.py

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
from __future__ import annotations
2+
3+
import asyncio
4+
from contextlib import asynccontextmanager
5+
from dataclasses import dataclass
6+
from typing import TYPE_CHECKING
7+
8+
from .status import Phase, ServiceStatusValue, Stage
9+
10+
if TYPE_CHECKING:
11+
from .core import Bootstrap
12+
13+
14+
@dataclass
15+
class ServiceContext:
16+
bootstrap: Bootstrap
17+
18+
def __post_init__(self):
19+
self._status: ServiceStatusValue = (Stage.EXIT, Phase.WAITING)
20+
self._sigexit = asyncio.Event()
21+
self._notify = asyncio.Event()
22+
23+
def _forward(self, stage: Stage, phase: Phase):
24+
prev_stage, prev_phase = self._status
25+
26+
if stage < prev_stage and prev_stage != Stage.EXIT:
27+
raise ValueError(f"Cannot update stage from {prev_stage} to {stage}")
28+
29+
if stage == prev_stage:
30+
if phase <= prev_phase:
31+
raise ValueError(f"Cannot update phase from {prev_phase} to {phase}")
32+
else:
33+
phase = Phase.WAITING
34+
35+
self._status = (stage, phase)
36+
37+
self._notify.set()
38+
self._notify.clear()
39+
40+
@property
41+
def should_exit(self):
42+
return self._sigexit.is_set()
43+
44+
async def wait_for(self, stage: Stage, phase: Phase):
45+
val = (stage, phase)
46+
47+
while val > self._status:
48+
await self._notify.wait()
49+
50+
async def wait_for_sigexit(self):
51+
await self._sigexit.wait()
52+
53+
@asynccontextmanager
54+
async def prepare(self):
55+
self._forward(Stage.PREPARE, Phase.WAITING)
56+
await self.wait_for(Stage.PREPARE, Phase.PENDING)
57+
yield
58+
self._forward(Stage.PREPARE, Phase.COMPLETED)
59+
60+
@asynccontextmanager
61+
async def online(self):
62+
self._forward(Stage.ONLINE, Phase.WAITING)
63+
await self.wait_for(Stage.ONLINE, Phase.PENDING)
64+
yield
65+
self._forward(Stage.ONLINE, Phase.COMPLETED)
66+
67+
@asynccontextmanager
68+
async def cleanup(self):
69+
self._forward(Stage.CLEANUP, Phase.WAITING)
70+
await self.wait_for(Stage.CLEANUP, Phase.PENDING)
71+
yield
72+
self._forward(Stage.CLEANUP, Phase.COMPLETED)
73+
74+
def dispatch_prepare(self):
75+
self._forward(Stage.PREPARE, Phase.PENDING)
76+
77+
def dispatch_online(self):
78+
self._forward(Stage.ONLINE, Phase.PENDING)
79+
80+
def dispatch_cleanup(self):
81+
self._forward(Stage.CLEANUP, Phase.PENDING)
82+
83+
def exit(self):
84+
"""Call by the manager"""
85+
self._sigexit.set()
86+
87+
def exit_complete(self):
88+
"""Call by the manager"""
89+
self._status = (Stage.EXIT, Phase.COMPLETED)

0 commit comments

Comments
 (0)