-
Notifications
You must be signed in to change notification settings - Fork 29
Expand file tree
/
Copy pathcron.go
More file actions
149 lines (125 loc) · 4.87 KB
/
cron.go
File metadata and controls
149 lines (125 loc) · 4.87 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package main
import (
"fmt"
"time"
restate "github.com/restatedev/sdk-go"
"github.com/robfig/cron/v3"
)
// JobRequest is the request body for creating a cron job: it carries the
// schedule and identifies the handler to invoke on that schedule.
//
// CronExpression is parsed in scheduleNextExecution with a five-field parser
// (minute, hour, day of month, month, day of week).
type JobRequest struct {
CronExpression string `json:"cronExpression"` // The cron expression e.g. "0 0 * * *" (every day at midnight)
// Name of the target service. Execute sends to it as a Virtual Object when
// Key is non-empty, and as a plain service otherwise.
Service string `json:"service"`
Method string `json:"method"` // Handler to execute with this schedule
Key string `json:"key,omitempty"` // Optional: Virtual Object key of task to call
Payload string `json:"payload,omitempty"` // Optional payload to pass to the handler
}
// JobInfo is the per-job record that scheduleNextExecution stores in the job
// object's state under the JOB_STATE key.
type JobInfo struct {
// Request is the original creation request; Execute reuses it to send the
// call and to schedule the following run.
Request JobRequest `json:"request"`
// NextExecutionTime is the time computed for the next run.
NextExecutionTime time.Time `json:"next_execution_time"`
// NextExecutionID is the invocation ID of the delayed Execute call; Cancel
// passes it to restate.CancelInvocation.
NextExecutionID string `json:"next_execution_id"`
}
// JOB_STATE is the state key under which a job's JobInfo is stored.
const JOB_STATE = "job-state" // K/V state key for storing job info in Restate
// CronJobInitiator is the service used to create new cron jobs.
//
// Together with the CronJob virtual object it forms a distributed cron
// service built with Restate that schedules tasks based on cron expressions.
//
// Features:
//   - Create cron jobs with standard cron expressions (e.g., "0 0 * * *" for daily at midnight)
//   - Schedule any Restate service handler or virtual object method
//   - Guaranteed execution with Restate's durability
//   - Cancel and inspect running jobs
//
// Usage:
//  1. Send requests to CronJobInitiator.Create() to start new jobs
//  2. Each job gets a unique ID and runs as a CronJob virtual object
//  3. Jobs automatically reschedule themselves after each execution
type CronJobInitiator struct{}
// Create registers a new cron job and returns a human-readable confirmation.
//
// It generates a fresh job ID, calls the Initiate handler of the CronJob
// virtual object keyed by that ID, and waits for the resulting JobInfo. The
// returned message contains the job ID and the first execution time formatted
// as RFC 3339. Any error returned by the Initiate call (for example an invalid
// cron expression) is returned unchanged together with an empty string.
func (CronJobInitiator) Create(ctx restate.Context, req JobRequest) (string, error) {
	// The job ID doubles as the key of the CronJob object, so the job can be
	// addressed by this ID afterwards.
	jobID := restate.UUID(ctx).String()

	// The trailing newline keeps consecutive log messages from running
	// together on stdout.
	fmt.Printf("Creating new cron job with ID %s for service %s and method %s\n", jobID, req.Service, req.Method)

	job, err := restate.Object[*JobInfo](ctx, "CronJob", jobID, "Initiate").Request(req)
	if err != nil {
		return "", err
	}

	return fmt.Sprintf("Job created with ID %s and next execution time %s",
		jobID, job.NextExecutionTime.Format(time.RFC3339)), nil
}
// CronJob is the virtual object representing a single cron job. Each instance
// is keyed by the job ID generated in CronJobInitiator.Create and keeps its
// JobInfo in state under JOB_STATE. Its handlers are Initiate, Execute,
// Cancel and GetInfo.
type CronJob struct{}
// Initiate sets up a new job under this object's key and schedules its first
// execution.
//
// It fails with a terminal error when job state is already stored for this
// key; errors from reading the state are returned as-is. On success it returns
// the JobInfo produced by scheduleNextExecution.
func (CronJob) Initiate(ctx restate.ObjectContext, req JobRequest) (*JobInfo, error) {
	existing, err := restate.Get[*JobInfo](ctx, JOB_STATE)
	switch {
	case err != nil:
		return nil, err
	case existing != nil:
		// A job was already initiated for this ID; refuse to overwrite it.
		return nil, restate.TerminalErrorf("jobState already exists for this ID")
	}

	return scheduleNextExecution(ctx, req)
}
// Execute performs one run of the job: it sends the configured call to the
// target handler and then schedules the following run.
//
// It returns a terminal error when no job state is stored for this key.
// Errors from reading the state or from scheduling the next run are returned
// as-is.
func (CronJob) Execute(ctx restate.ObjectContext) error {
	// Load the stored job description.
	jobState, err := restate.Get[*JobInfo](ctx, JOB_STATE)
	if err != nil {
		return err
	}
	if jobState == nil {
		return restate.TerminalErrorf("job not found")
	}

	req := jobState.Request
	// The trailing newline keeps consecutive log messages from running
	// together on stdout.
	fmt.Printf("Executing job with ID: %s for service %s for method %s\n", restate.Key(ctx), req.Service, req.Method)

	// A non-empty key means the target is a virtual object; otherwise the
	// target is addressed as a plain service. Both are one-way sends, so the
	// job does not wait for the target handler to finish.
	if req.Key != "" {
		restate.ObjectSend(ctx, req.Service, req.Key, req.Method).Send(req.Payload)
	} else {
		restate.ServiceSend(ctx, req.Service, req.Method).Send(req.Payload)
	}

	// Schedule the next execution; only the error is of interest here.
	_, err = scheduleNextExecution(ctx, req)
	return err
}
// Cancel stops the job: it cancels the pending Execute invocation recorded in
// the job state and then clears all state stored for this key.
//
// When no job state exists it returns a terminal error with code 404; errors
// from reading the state are returned as-is.
func (CronJob) Cancel(ctx restate.ObjectContext) error {
	info, err := restate.Get[*JobInfo](ctx, JOB_STATE)
	if err == nil && info == nil {
		err = restate.TerminalError(fmt.Errorf("job not found for cancellation"), 404)
	}
	if err != nil {
		return err
	}

	// Cancel the scheduled run first, then drop the stored job information.
	pendingID := info.NextExecutionID
	restate.CancelInvocation(ctx, pendingID)
	restate.ClearAll(ctx)

	return nil
}
// GetInfo returns the JobInfo stored for this job, read through a shared
// object context. The result of the state lookup is passed through unchanged.
// NOTE(review): the other handlers treat a nil result as "no job stored", so
// callers should presumably expect nil for unknown or cancelled jobs — confirm.
func (CronJob) GetInfo(ctx restate.ObjectSharedContext) (*JobInfo, error) {
	info, err := restate.Get[*JobInfo](ctx, JOB_STATE)
	return info, err
}
// scheduleNextExecution calculates the next activation time of the job's cron
// expression, sends a delayed Execute call to this same CronJob object, and
// stores the resulting JobInfo under JOB_STATE.
//
// The expression is parsed with a five-field parser (minute, hour, day of
// month, month, day of week). It returns a terminal error when the expression
// is invalid or when it has no upcoming activation time; an error from
// recording the current time is returned as-is. On success it returns the
// JobInfo that was stored.
func scheduleNextExecution(ctx restate.ObjectContext, req JobRequest) (*JobInfo, error) {
	// Parse cron expression
	parser := cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow)
	schedule, err := parser.Parse(req.CronExpression)
	if err != nil {
		return nil, restate.TerminalErrorf("invalid cron expression: %v", err)
	}

	// Read the wall clock inside restate.Run so the value is obtained through
	// Restate rather than directly in the handler body.
	currentTime, err := restate.Run(ctx, func(ctx restate.RunContext) (time.Time, error) {
		return time.Now(), nil
	})
	if err != nil {
		return nil, err
	}

	// Calculate next execution time. The schedule reports the zero time when
	// the expression can never be satisfied (e.g. February 31st); scheduling
	// with it would produce a negative delay and the job would fire at once,
	// over and over.
	nextTime := schedule.Next(currentTime)
	if nextTime.IsZero() {
		return nil, restate.TerminalErrorf("cron expression %q has no upcoming execution time", req.CronExpression)
	}
	delay := nextTime.Sub(currentTime)

	// Schedule next execution for this job
	thisJobID := restate.Key(ctx) // This got generated by the CronJobInitiator
	handle := restate.ObjectSend(ctx, "CronJob", thisJobID, "Execute").Send(nil, restate.WithDelay(delay))

	// Store the job information, including the invocation ID of the delayed
	// call so that it can be cancelled later.
	jobState := &JobInfo{
		Request:           req,
		NextExecutionTime: nextTime,
		NextExecutionID:   handle.GetInvocationId(),
	}
	restate.Set(ctx, JOB_STATE, jobState)
	return jobState, nil
}