Skip to content

Commit 90f2cb5

Browse files
committed
WIP: trace harvester
1 parent a68f93c commit 90f2cb5

File tree

11 files changed

+293
-68
lines changed

11 files changed

+293
-68
lines changed

distsys/mpcalctx.go

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,13 @@ import (
44
"errors"
55
"fmt"
66
"log"
7+
"math"
8+
"math/rand"
79
"os"
810
"reflect"
911
"strings"
1012
"sync"
13+
"time"
1114

1215
"github.com/UBC-NSS/pgo/distsys/trace"
1316

@@ -100,6 +103,9 @@ type MPCalContext struct {
100103
jumpTable MPCalJumpTable
101104
procTable MPCalProcTable
102105

106+
concurrencyRand *rand.Rand
107+
disruptConcurrencyDuration time.Duration
108+
103109
eventState trace.EventState
104110
apparentResourceNames map[ArchetypeResourceHandle]string
105111

@@ -170,8 +176,7 @@ func NewMPCalContext(self tla.Value, archetype MPCalArchetype, configFns ...MPCa
170176
}
171177
ctx.iface = ArchetypeInterface{ctx: ctx}
172178

173-
traceDir, hasTraceDir := os.LookupEnv("PGO_TRACE_DIR")
174-
if hasTraceDir {
179+
if traceDir, ok := os.LookupEnv("PGO_TRACE_DIR"); ok {
175180
err := os.MkdirAll(traceDir, 0750)
176181
if err != nil {
177182
log.Fatalf("Could not ensure PGO_TRACE_DIR (%s): %v", traceDir, err)
@@ -185,6 +190,16 @@ func NewMPCalContext(self tla.Value, archetype MPCalArchetype, configFns ...MPCa
185190
ctx.eventState.Recorder = trace.MakeLocalFileRecorder(traceFile)
186191
}
187192

193+
pgoDisruptConcurrency := "PGO_DISRUPT_CONCURRENCY"
194+
if disruptConcurrency, ok := os.LookupEnv(pgoDisruptConcurrency); ok {
195+
duration, err := time.ParseDuration(disruptConcurrency)
196+
if err != nil {
197+
log.Fatalf("could not parse duration %s: %v", disruptConcurrency, err)
198+
}
199+
ctx.disruptConcurrencyDuration = duration
200+
ctx.concurrencyRand = rand.New(rand.NewSource(time.Now().UnixNano()))
201+
}
202+
188203
ctx.ensureArchetypeResource(".pc", NewLocalArchetypeResource(tla.MakeString(archetype.Label)))
189204
ctx.ensureArchetypeResource(".stack", NewLocalArchetypeResource(tla.MakeTuple()))
190205
for _, configFn := range configFns {
@@ -574,6 +589,14 @@ func (ctx *MPCalContext) Run() (err error) {
574589
default: // pass
575590
}
576591

592+
if ctx.concurrencyRand != nil {
593+
// Sleep randomly for an exponentially distributed duration, whose mean is equal to the configured duration.
594+
// This is helpful, because then we get to see extremely long sleeps occasionally, but many sleeps will be short.
595+
// If we had a more even distribution and allowed long sleeps, we would reliably fail timeouts and never make progress.
596+
sleepDuration := int(math.Round(ctx.concurrencyRand.ExpFloat64() * float64(ctx.disruptConcurrencyDuration)))
597+
time.Sleep(time.Duration(sleepDuration))
598+
}
599+
577600
ctx.eventState.BeginEvent()
578601
ctx.vclockSink.InitCriticalSection(ctx.archetype.Name, ctx.self)
579602

distsys/resources/tcpmailboxes.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -457,11 +457,6 @@ func (res *tcpMailboxesRemote) Commit() chan struct{} {
457457
if err != nil {
458458
continue
459459
}
460-
vclock := res.iface.GetVClockSink().GetVClock()
461-
err = res.connEncoder.Encode(&vclock)
462-
if err != nil {
463-
continue
464-
}
465460
var shouldResend bool
466461
err = res.connDecoder.Decode(&shouldResend)
467462
if err != nil {

src/pgo/PGo.scala

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import scala.collection.mutable
3939
import scala.concurrent.duration.{Duration, MILLISECONDS}
4040
import scala.util.Using
4141
import scala.util.parsing.combinator.RegexParsers
42+
import scala.concurrent.duration
4243

4344
object PGo {
4445
given pathConverter: ValueConverter[os.Path] =
@@ -47,6 +48,10 @@ object PGo {
4748
scallop.listArgConverter(os.Path(_, os.pwd))
4849
given tlaValueConverter: ValueConverter[TLAValue] =
4950
scallop.singleArgConverter(TLAValue.parseFromString)
51+
given durationConverter: ValueConverter[duration.Duration] =
52+
scallop.singleArgConverter(duration.Duration.apply)
53+
given listOfDurationConverter: ValueConverter[List[duration.Duration]] =
54+
scallop.listArgConverter(duration.Duration.apply)
5055
given tlaValuePropsConverter: ValueConverter[Map[String, TLAValue]] =
5156
scallop.propsConverter(tlaValueConverter)
5257
given mpcalVariablesConverter
@@ -127,6 +132,24 @@ object PGo {
127132
)
128133
}
129134
addSubcommand(TraceGenCmd)
135+
object HarvestTracesCmd extends Subcommand("harvest-traces"):
136+
val folder =
137+
trailArg[os.Path](descr = "folder where the trace files live")
138+
val tracesSubFolder = opt[String](
139+
descr = "subfolder to store generated traces",
140+
default = Some("traces_found")
141+
)
142+
val rediscoveryThreshold = opt[Int](
143+
descr =
144+
"how many traces may be rediscovered before assuming saturation",
145+
default = Some(5)
146+
)
147+
val disruptionRanges = opt[List[duration.Duration]]("disruption-ranges")
148+
val victimCmd = trailArg[List[String]](descr =
149+
"command to launch the implementation code, specify after -- (will be launched repeatedly)"
150+
)
151+
end HarvestTracesCmd
152+
addSubcommand(HarvestTracesCmd)
130153

131154
// one of the subcommands must be passed
132155
addValidation {
@@ -405,6 +428,23 @@ object PGo {
405428
config.TraceGenCmd.modelValues().foldLeft(builder)(_.modelValue(_))
406429

407430
builder.generate(config.TraceGenCmd.traceFiles())
431+
case config.HarvestTracesCmd =>
432+
val folder = config.HarvestTracesCmd.folder()
433+
val disruptionRanges = config.HarvestTracesCmd.disruptionRanges()
434+
val victimCmd = config.HarvestTracesCmd.victimCmd()
435+
436+
println(folder)
437+
println(disruptionRanges)
438+
println(victimCmd)
439+
440+
tracing.HarvestTraces(
441+
folder = folder,
442+
durations = disruptionRanges,
443+
tracesSubFolderName = config.HarvestTracesCmd.tracesSubFolder(),
444+
rediscoveryThreshold =
445+
config.HarvestTracesCmd.rediscoveryThreshold(),
446+
victimCmd = victimCmd
447+
)
408448
}
409449
Nil
410450
} catch {
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
package pgo.tracing
2+
3+
import scala.collection.mutable
4+
import scala.concurrent.duration
5+
import java.util.concurrent.TimeUnit
6+
7+
object HarvestTraces:
8+
private def toGoTimeStr(dur: duration.Duration): String =
9+
dur.unit match
10+
case TimeUnit.NANOSECONDS => s"${dur.length}ns"
11+
case TimeUnit.HOURS => s"${dur.length}h"
12+
case TimeUnit.MILLISECONDS => s"${dur.length}ms"
13+
case TimeUnit.MINUTES => s"${dur.length}m"
14+
case TimeUnit.SECONDS => s"${dur.length}s"
15+
case TimeUnit.MICROSECONDS | TimeUnit.DAYS =>
16+
throw IllegalArgumentException(s"unit ${dur.unit} not supported")
17+
18+
private def readTraceCollection(folder: os.Path): Set[String] =
19+
os.list(folder)
20+
.filter(_.last.endsWith(".log"))
21+
.map(os.read)
22+
.toSet
23+
24+
def apply(
25+
folder: os.Path,
26+
tracesSubFolderName: String,
27+
durations: List[duration.Duration],
28+
rediscoveryThreshold: Int,
29+
victimCmd: List[String]
30+
): Unit =
31+
require(durations.nonEmpty)
32+
val tmpDir = os.temp.dir()
33+
val proc = os.proc(victimCmd)
34+
35+
os.copy(
36+
from = folder,
37+
to = tmpDir,
38+
replaceExisting = true,
39+
mergeFolders = true
40+
)
41+
42+
// This is a hack that makes the problem of running this with local PGo build "go away"
43+
if os.exists(tmpDir / "go.mod")
44+
then
45+
val modContents = os.read(tmpDir / "go.mod")
46+
val specialReplace =
47+
s"replace github.com/UBC-NSS/pgo/distsys => ${os.pwd / "distsys"}"
48+
val fixedModContents = modContents.replace(
49+
"replace github.com/UBC-NSS/pgo/distsys => ../../distsys",
50+
specialReplace
51+
)
52+
os.write.over(target = tmpDir / "go.mod", data = fixedModContents)
53+
end if
54+
55+
val tracesSubFolder = folder / tracesSubFolderName
56+
os.makeDir.all(folder / tracesSubFolderName)
57+
val tracesSeen = mutable.HashMap[Set[String], os.Path]()
58+
os.list(tracesSubFolder)
59+
.filter(os.isDir)
60+
.foreach: dir =>
61+
val coll = readTraceCollection(dir)
62+
assert(
63+
!tracesSeen.contains(coll),
64+
s"$dir and ${tracesSeen(coll)} should not have the same contents"
65+
)
66+
tracesSeen.update(coll, dir)
67+
68+
durations.foreach: disruptionRange =>
69+
println(
70+
s"looking for traces using max timeout $disruptionRange, rediscovery threshold = $rediscoveryThreshold..."
71+
)
72+
var uninterruptedUniqueTracesFound = 0
73+
while uninterruptedUniqueTracesFound < rediscoveryThreshold do
74+
val traceDir = os.temp.dir(dir = tmpDir)
75+
val result = proc.call(
76+
cwd = tmpDir,
77+
env = Map(
78+
"PGO_DISRUPT_CONCURRENCY" -> toGoTimeStr(disruptionRange),
79+
"PGO_TRACE_DIR" -> traceDir.toString
80+
),
81+
mergeErrIntoOut = true,
82+
stdout = os.Inherit
83+
)
84+
val traces = readTraceCollection(traceDir)
85+
86+
tracesSeen.get(traces) match
87+
case None =>
88+
val keepDir = os.temp.dir(tracesSubFolder, deleteOnExit = false)
89+
os.copy(
90+
from = traceDir,
91+
to = keepDir,
92+
replaceExisting = true,
93+
mergeFolders = true
94+
)
95+
tracesSeen.update(traces, keepDir)
96+
uninterruptedUniqueTracesFound = 0
97+
println(s"found new trace: $keepDir")
98+
case Some(existingDir) =>
99+
uninterruptedUniqueTracesFound += 1
100+
println(s"rediscovered existing trace: $existingDir")
101+
end while
102+
println(s"reached rediscovery threshold of $rediscoveryThreshold traces.")
103+
println(s"search finished for all max timeouts given.")
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package pgo.tracing
2+
3+
import pgo.model.mpcal.*
4+
import pgo.model.tla.*
5+
6+
object InferFromMPCal:
7+
def apply(mpcalBlock: MPCalBlock, tlaModule: TLAModule): JSONToTLA =
8+
???

src/pgo/tracing/JSONToTLA.scala

Lines changed: 26 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -23,92 +23,60 @@ final class JSONToTLA private (
2323
val constantDefns: Map[String, String],
2424
val modelValues: Set[String]
2525
):
26-
def renameLabel(mpcalName: String, labelName: String): JSONToTLA =
26+
def copy(
27+
modelName: String = modelName,
28+
destDir: os.Path = destDir,
29+
tlaExtends: List[String] = tlaExtends,
30+
actionRenamings: Map[String, String] = actionRenamings,
31+
mpcalVariableDefns: Map[String, MPCalVariable] = mpcalVariableDefns,
32+
modelVariableDefns: Set[String] = modelVariableDefns,
33+
constantDefns: Map[String, String] = constantDefns,
34+
modelValues: Set[String] = modelValues
35+
): JSONToTLA =
2736
new JSONToTLA(
2837
modelName = modelName,
2938
destDir = destDir,
3039
tlaExtends = tlaExtends,
31-
actionRenamings = actionRenamings.updated(mpcalName, labelName),
40+
actionRenamings = actionRenamings,
3241
mpcalVariableDefns = mpcalVariableDefns,
3342
modelVariableDefns = modelVariableDefns,
3443
constantDefns = constantDefns,
3544
modelValues = modelValues
3645
)
3746

47+
def renameLabel(mpcalName: String, labelName: String): JSONToTLA =
48+
copy(actionRenamings = actionRenamings.updated(modelName, labelName))
49+
3850
def modelVariable(name: String): JSONToTLA =
39-
new JSONToTLA(
40-
modelName = modelName,
41-
destDir = destDir,
42-
tlaExtends = tlaExtends,
43-
actionRenamings = actionRenamings,
44-
mpcalVariableDefns = mpcalVariableDefns,
45-
modelVariableDefns = modelVariableDefns + name,
46-
constantDefns = constantDefns,
47-
modelValues = modelValues
48-
)
51+
copy(modelVariableDefns = modelVariableDefns + name)
4952

5053
def mpcalLocal(mpcalName: String, tlaName: String): JSONToTLA =
51-
new JSONToTLA(
52-
modelName = modelName,
53-
destDir = destDir,
54-
tlaExtends = tlaExtends,
55-
actionRenamings = actionRenamings,
54+
copy(
5655
mpcalVariableDefns =
5756
mpcalVariableDefns.updated(mpcalName, MPCalVariable.Local(tlaName)),
58-
modelVariableDefns = modelVariableDefns + tlaName,
59-
constantDefns = constantDefns,
60-
modelValues = modelValues
57+
modelVariableDefns = modelVariableDefns + tlaName
6158
)
6259

6360
def mpcalGlobal(mpcalName: String, tlaName: String): JSONToTLA =
64-
new JSONToTLA(
65-
modelName = modelName,
66-
destDir = destDir,
67-
tlaExtends = tlaExtends,
68-
actionRenamings = actionRenamings,
61+
copy(
6962
mpcalVariableDefns =
7063
mpcalVariableDefns.updated(mpcalName, MPCalVariable.Global(tlaName)),
71-
modelVariableDefns = modelVariableDefns + tlaName,
72-
constantDefns = constantDefns,
73-
modelValues = modelValues
64+
modelVariableDefns = modelVariableDefns + tlaName
7465
)
7566

7667
def mpcalMacro(mpcalName: String, tlaOperatorPrefix: String): JSONToTLA =
77-
new JSONToTLA(
78-
modelName = modelName,
79-
destDir = destDir,
80-
tlaExtends = tlaExtends,
81-
actionRenamings = actionRenamings,
82-
mpcalVariableDefns = mpcalVariableDefns
83-
.updated(mpcalName, MPCalVariable.Mapping(tlaOperatorPrefix)),
84-
modelVariableDefns = modelVariableDefns,
85-
constantDefns = constantDefns,
86-
modelValues = modelValues
68+
copy(mpcalVariableDefns =
69+
mpcalVariableDefns.updated(
70+
mpcalName,
71+
MPCalVariable.Mapping(tlaOperatorPrefix)
72+
)
8773
)
8874

8975
def tlaConstant(name: String, value: String): JSONToTLA =
90-
new JSONToTLA(
91-
modelName = modelName,
92-
destDir = destDir,
93-
tlaExtends = tlaExtends,
94-
actionRenamings = actionRenamings,
95-
mpcalVariableDefns = mpcalVariableDefns,
96-
modelVariableDefns = modelVariableDefns,
97-
constantDefns = constantDefns.updated(name, value),
98-
modelValues = modelValues
99-
)
76+
copy(constantDefns = constantDefns.updated(name, value))
10077

10178
def modelValue(name: String): JSONToTLA =
102-
new JSONToTLA(
103-
modelName = modelName,
104-
destDir = destDir,
105-
tlaExtends = tlaExtends,
106-
actionRenamings = actionRenamings,
107-
mpcalVariableDefns = mpcalVariableDefns,
108-
modelVariableDefns = modelVariableDefns,
109-
constantDefns = constantDefns,
110-
modelValues = modelValues + name
111-
)
79+
copy(modelValues = modelValues + name)
11280

11381
private def getLabelNameFromValue(value: String): String =
11482
val mpcalLabelName = value.stripPrefix("\"").stripSuffix("\"")
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
---- MODULE dqueueValidateDefns ----
2+
EXTENDS Sequences, Integers
3+
4+
CONSTANT BUFFER_SIZE
5+
6+
network_read(state, self, idx, value, __rest(_)) ==
7+
/\ state.network[idx] # <<>>
8+
/\ Head(state.network[idx]) = value
9+
/\ __rest([state EXCEPT !.network[idx] = Tail(@)])
10+
11+
network_write(state, self, idx, value, __rest(_)) ==
12+
__rest([state EXCEPT !.network[idx] = Append(@, value)])
13+
14+
stream_read(state, self, value, __rest(_)) ==
15+
LET state2 == [state EXCEPT !.stream = (@ + 1) % BUFFER_SIZE]
16+
IN /\ value = state2.stream
17+
/\ __rest(state2)
18+
19+
====

0 commit comments

Comments
 (0)