Skip to content

Commit 0c9771a

Browse files
committed
try to improve machine utilization for batch runs
1 parent 0b9d569 commit 0c9771a

File tree

2 files changed

+77
-61
lines changed

2 files changed

+77
-61
lines changed

omnilink/.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
/*.duckdb
22
/*.wal
3-
/*.sqlite
3+
/*.sqlite
4+
/*.sqlite-journal

omnilink/package.mill

Lines changed: 75 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,23 @@ def rr: T[PathRef] = Task:
127127
)()
128128
end rr
129129

130+
// Weird trick: because this is all macro expansion, if someone uses Mill ops inside
131+
// fn, they get expanded after us, so they can't tell we skipped using Task.Anon
132+
private inline def inDBTransaction[T](
133+
inline fn: scalasql.core.DbApi.Txn => T,
134+
): T =
135+
// Using.resource(DBConcurrencyLimiter(concurrencyLimit = 16)): _ =>
136+
Using.resource(EvalDB.dbClient()): db =>
137+
db.transaction(fn)
138+
end inDBTransaction
139+
140+
def vacuumDB() = Task.Command(exclusive = true):
141+
Using.resource(ConcurrencyLimiter(concurrencyLimit = 1)): _ =>
142+
Using.resource(EvalDB.dbClient()): client =>
143+
val db = client.getAutoCommitClientConnection
144+
db.updateRaw("VACUUM;")
145+
end vacuumDB
146+
130147
trait EvalDBTrait:
131148
def dbPath: os.SubPath
132149
def createTablesSQL: os.SubPath
@@ -235,32 +252,28 @@ private trait ConcurrencyLimiter:
235252
protected def silent = false
236253
private var usageCount = 0
237254

238-
def apply(concurrencyLimit: Int): Handle =
255+
def apply(concurrencyLimit: Int, weight: Int = 1): Handle =
239256
concurrencyLimiter.synchronized:
240-
while usageCount >= concurrencyLimit
257+
while (usageCount + weight) > concurrencyLimit
241258
do concurrencyLimiter.wait()
242-
usageCount += 1
259+
usageCount += weight
243260
if !silent
244261
then println("got concurrency token")
245-
Handle()
262+
Handle(weight)
246263
end apply
247264

248-
final class Handle private[ConcurrencyLimiter] () extends AutoCloseable:
265+
final class Handle private[ConcurrencyLimiter] (weight: Int)
266+
extends AutoCloseable:
249267
def close(): Unit =
250268
concurrencyLimiter.synchronized:
251269
if !silent
252270
then println("giving up concurrency token")
253271
concurrencyLimiter.notifyAll()
254-
usageCount -= 1
272+
usageCount -= weight
255273
end Handle
256274
end ConcurrencyLimiter
257275

258-
private object ValidateConcurrencyLimiter extends ConcurrencyLimiter
259-
private object EnvConcurrencyLimiter extends ConcurrencyLimiter
260-
private object GatherTraceConcurrencyLimiter extends ConcurrencyLimiter
261-
private object DBConcurrencyLimiter extends ConcurrencyLimiter:
262-
override protected def silent = true
263-
end DBConcurrencyLimiter
276+
private object ConcurrencyLimiter extends ConcurrencyLimiter
264277

265278
trait TracingConfigModule extends Module:
266279
tracingConfig =>
@@ -289,16 +302,6 @@ trait TracingConfigModule extends Module:
289302
then os.copy.over(cmdsFile, dir / "compile_commands.json")
290303
end installCompileCommands
291304

292-
// Weird trick: because this is all macro expansion, if someone uses Mill ops inside
293-
// fn, they get expanded after us, so they can't tell we skipped using Task.Anon
294-
private inline def inDBTransaction[T](
295-
inline fn: scalasql.core.DbApi.Txn => T,
296-
): T =
297-
Using.resource(DBConcurrencyLimiter(concurrencyLimit = 16)): _ =>
298-
Using.resource(EvalDB.dbClient()): db =>
299-
db.transaction(fn)
300-
end inDBTransaction
301-
302305
trait ConfigModule extends Module:
303306
configModule =>
304307
def threadCount: Int
@@ -446,39 +449,40 @@ trait TracingConfigModule extends Module:
446449
end unpackTrace
447450

448451
def unpackValidationEnv = Task[PathRef]:
449-
Using.resource(EnvConcurrencyLimiter(concurrencyLimit = 4)): _ =>
450-
val tracesFolder = unpackTrace().path
451-
val spec = specToValidate().path
452-
val mcSpec = specToValidateMC().path
453-
val mcConfig = specToValidateMCConfig().path
454-
455-
build.omnilink.tool
456-
.runner()
457-
.run(
458-
List[os.Shellable](
459-
"gen-tla",
460-
spec,
461-
tracesFolder,
462-
"--dest-dir",
463-
Task.dest,
464-
),
465-
)
452+
Using.resource(ConcurrencyLimiter(concurrencyLimit = 12, weight = 1)):
453+
_ =>
454+
val tracesFolder = unpackTrace().path
455+
val spec = specToValidate().path
456+
val mcSpec = specToValidateMC().path
457+
val mcConfig = specToValidateMCConfig().path
458+
459+
build.omnilink.tool
460+
.runner()
461+
.run(
462+
List[os.Shellable](
463+
"gen-tla",
464+
spec,
465+
tracesFolder,
466+
"--dest-dir",
467+
Task.dest,
468+
),
469+
)
466470

467-
os.copy(from = mcSpec, to = Task.dest / mcSpec.last)
468-
os.copy(from = mcConfig, to = Task.dest / mcConfig.last)
469-
470-
build.omnilink.tool
471-
.runner()
472-
.run(
473-
List[os.Shellable](
474-
"showlog",
475-
"porcupine",
476-
tracesFolder,
477-
Task.dest / "porcupine_ops.json",
478-
),
479-
)
471+
os.copy(from = mcSpec, to = Task.dest / mcSpec.last)
472+
os.copy(from = mcConfig, to = Task.dest / mcConfig.last)
473+
474+
build.omnilink.tool
475+
.runner()
476+
.run(
477+
List[os.Shellable](
478+
"showlog",
479+
"porcupine",
480+
tracesFolder,
481+
Task.dest / "porcupine_ops.json",
482+
),
483+
)
480484

481-
PathRef(Task.dest)
485+
PathRef(Task.dest)
482486
end unpackValidationEnv
483487

484488
def discardTrace() = Task.Command(exclusive = true)[Unit]:
@@ -543,7 +547,7 @@ trait TracingConfigModule extends Module:
543547
end discardNegativeValidation
544548

545549
def gatherTrace() = Task.Command[Unit]:
546-
Using.resource(GatherTraceConcurrencyLimiter(concurrencyLimit = 6)):
550+
Using.resource(ConcurrencyLimiter(concurrencyLimit = 24, weight = 1)):
547551
_ =>
548552
val exe = tracingExecutable()
549553

@@ -646,10 +650,15 @@ trait TracingConfigModule extends Module:
646650
()
647651
end gatherTrace
648652

649-
def concurrencyLimit = Task.Input:
650-
Task.ctx().env.get("TLC_CONCURRENCY").fold(1)(_.toInt)
653+
def validateTrace() = Task.Command(exclusive = true):
654+
validateTraceImpl(concurrencyLimit = 1)()
655+
end validateTrace
656+
657+
def validateTracePar(concurrencyLimit: Int) = Task.Command:
658+
validateTraceImpl(concurrencyLimit)()
659+
end validateTracePar
651660

652-
def validateTrace() = Task.Command(exclusive = true)[Unit]:
661+
private def validateTraceImpl(concurrencyLimit: Int) = Task.Anon[Unit]:
653662
if inDBTransaction: db =>
654663
db.run:
655664
EvalDB.Validation.select
@@ -665,7 +674,13 @@ trait TracingConfigModule extends Module:
665674
.size === 0
666675
then println(s"no traces to validate for $traceId")
667676
else
668-
Using.resource(ValidateConcurrencyLimiter(concurrencyLimit())): _ =>
677+
val weight = 16
678+
Using.resource(
679+
ConcurrencyLimiter(
680+
concurrencyLimit = concurrencyLimit * weight,
681+
weight = weight,
682+
),
683+
): _ =>
669684
val validationDir = unpackValidationEnv().path
670685
val timeFile = Task.dest / "timeinfo.txt"
671686
os.copy.over(
@@ -758,7 +773,7 @@ trait TracingConfigModule extends Module:
758773
_.peakMemory := peakMemory,
759774
)
760775
()
761-
end validateTrace
776+
end validateTraceImpl
762777

763778
def porcupineValidateTrace() = Task.Command(exclusive = true)[Unit]:
764779
require(
@@ -773,7 +788,7 @@ trait TracingConfigModule extends Module:
773788
.size > 0
774789
then println(s"already validated trace $traceId")
775790
else
776-
Using.resource(ValidateConcurrencyLimiter(concurrencyLimit())): _ =>
791+
Using.resource(ConcurrencyLimiter(concurrencyLimit = 1)): _ =>
777792
val validationDir = unpackValidationEnv().path
778793
val timeFile = Task.dest / "timeinfo.txt"
779794
val startTime = LocalDateTime.now()

0 commit comments

Comments
 (0)