Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ linters:
- third_party$
- builtin$
- ^examples/
- ^sdks/go/examples/
- '(.+)_test\.go'
- "cmd/hatchet-loadtest/rampup/(.+).go"
formatters:
Expand All @@ -71,4 +72,5 @@ formatters:
paths:
- third_party$
- builtin$
- ^examples/
- ^examples/go/
- ^sdks/go/examples/
89 changes: 72 additions & 17 deletions examples/go/scheduled/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,47 +5,102 @@ import (
"log"
"time"

"github.com/google/uuid"
"github.com/oapi-codegen/runtime/types"

"github.com/hatchet-dev/hatchet/pkg/client/rest"
hatchet "github.com/hatchet-dev/hatchet/sdks/go"
"github.com/hatchet-dev/hatchet/sdks/go/features"
)

type SimpleInput struct {
Message string `json:"message"`
}

type SimpleOutput struct {
Result string `json:"result"`
}

func main() {
client, err := hatchet.NewClient()
if err != nil {
log.Fatalf("failed to create hatchet client: %v", err)
}

simple := client.NewStandaloneTask("simple", func(ctx hatchet.Context, input SimpleInput) (SimpleOutput, error) {
return SimpleOutput{
Result: "Processed: " + input.Message,
}, nil
})

// > Create
scheduledRun, err := client.Schedules().Create(
context.Background(),
"scheduled",
features.CreateScheduledRunTrigger{
TriggerAt: time.Now().Add(1 * time.Minute),
Input: map[string]interface{}{"message": "Hello, World!"},
},
)
tomorrow := time.Now().UTC().AddDate(0, 0, 1)
tomorrowNoon := time.Date(tomorrow.Year(), tomorrow.Month(), tomorrow.Day(), 12, 0, 0, 0, time.UTC)

scheduledRun, err := simple.Schedule(context.Background(), tomorrowNoon, SimpleInput{Message: "Hello, World!"})
if err != nil {
log.Fatalf("failed to create scheduled run: %v", err)
}

scheduledRunId := scheduledRun.GetScheduledWorkflows()[0].GetId()

// > Delete
err = client.Schedules().Delete(
client.Schedules().Delete(
context.Background(),
scheduledRun.Metadata.Id,
scheduledRunId,
)
if err != nil {
log.Fatalf("failed to delete scheduled run: %v", err)
}

// > List
scheduledRuns, err := client.Schedules().List(
client.Schedules().List(
context.Background(),
rest.WorkflowScheduledListParams{},
)

// > Reschedule
client.Schedules().Update(
context.Background(),
scheduledRunId,
rest.UpdateScheduledWorkflowRunRequest{
TriggerAt: time.Now().UTC().Add(10 * time.Second),
},
)

scheduledRunIds := []types.UUID{types.UUID(uuid.MustParse(scheduledRunId))}

// > Bulk Delete
client.Schedules().BulkDelete(
context.Background(),
rest.ScheduledWorkflowsBulkDeleteRequest{
ScheduledWorkflowRunIds: &scheduledRunIds,
},
)

scheduledRunIdUUID := types.UUID(uuid.MustParse(scheduledRunId))

// > Reschedule
client.Schedules().Update(
context.Background(),
scheduledRunId,
rest.UpdateScheduledWorkflowRunRequest{
TriggerAt: time.Now().UTC().Add(10 * time.Second),
},
)

// > Bulk Update
client.Schedules().BulkUpdate(
context.Background(),
rest.ScheduledWorkflowsBulkUpdateRequest{
Updates: []rest.ScheduledWorkflowsBulkUpdateItem{
{Id: scheduledRunIdUUID, TriggerAt: time.Now().UTC().Add(10 * time.Second)},
},
},
)

worker, err := client.NewWorker("scheduled-worker", hatchet.WithWorkflows(simple))
if err != nil {
log.Fatalf("failed to list scheduled runs: %v", err)
log.Fatalf("failed to create worker: %v", err)
}

_ = scheduledRuns
if err := worker.StartBlocking(context.Background()); err != nil {
log.Fatalf("failed to start worker: %v", err)
}
}
14 changes: 10 additions & 4 deletions examples/python/cron/workflow-definition.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from pydantic import BaseModel

from hatchet_sdk import Context, EmptyModel, Hatchet

hatchet = Hatchet(debug=True)
Expand All @@ -11,11 +13,15 @@
cron_workflow = hatchet.workflow(name="CronWorkflow", on_crons=["* * * * *"])


class TaskOutput(BaseModel):
message: str


@cron_workflow.task()
def step1(input: EmptyModel, ctx: Context) -> dict[str, str]:
return {
"time": "step1",
}
def step1(input: EmptyModel, ctx: Context) -> TaskOutput:
return TaskOutput(
message="Hello, world!",
)



Expand Down
25 changes: 25 additions & 0 deletions examples/python/scheduled/programatic-async.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from datetime import datetime, timedelta, timezone

from hatchet_sdk import Hatchet
from hatchet_sdk.clients.rest.models.scheduled_run_status import ScheduledRunStatus

hatchet = Hatchet()

Expand All @@ -20,6 +21,12 @@ async def create_scheduled() -> None:

scheduled_run.metadata.id # the id of the scheduled run trigger

# > Reschedule
await hatchet.scheduled.aio_update(
scheduled_id=scheduled_run.metadata.id,
trigger_at=datetime.now(tz=timezone.utc) + timedelta(hours=1),
)

# > Delete
await hatchet.scheduled.aio_delete(scheduled_id=scheduled_run.metadata.id)

Expand All @@ -30,3 +37,21 @@ async def create_scheduled() -> None:
scheduled_run = await hatchet.scheduled.aio_get(
scheduled_id=scheduled_run.metadata.id
)

id = scheduled_run.metadata.id

# > Bulk Delete
await hatchet.scheduled.aio_bulk_delete(scheduled_ids=[id])

await hatchet.scheduled.aio_bulk_delete(
workflow_id="workflow_id",
statuses=[ScheduledRunStatus.SCHEDULED],
additional_metadata={"customer_id": "customer-a"},
)

# > Bulk Reschedule
await hatchet.scheduled.aio_bulk_update(
[
(id, datetime.now(tz=timezone.utc) + timedelta(hours=2)),
]
)
12 changes: 7 additions & 5 deletions examples/python/simple/schedule.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
# > Schedule a Task
from datetime import datetime
from datetime import datetime, timedelta, timezone

from examples.simple.worker import simple

schedule = simple.schedule(datetime(2025, 3, 14, 15, 9, 26))
# > Schedule a Task

tomorrow_noon = datetime.now(tz=timezone.utc).replace(
hour=12, minute=0, second=0, microsecond=0
) + timedelta(days=1)

## 👀 do something with the id
print(schedule.id)
scheduled_run = simple.schedule(tomorrow_noon)

22 changes: 9 additions & 13 deletions examples/typescript/simple/schedule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,27 @@ import { simple } from './workflow';
async function main() {
// > Create a Scheduled Run

const runAt = new Date(new Date().setHours(12, 0, 0, 0) + 24 * 60 * 60 * 1000);
const tomorrowNoon = new Date();
tomorrowNoon.setUTCDate(tomorrowNoon.getUTCDate() + 1);
tomorrowNoon.setUTCHours(12, 0, 0, 0);

const scheduled = await simple.schedule(runAt, {
Message: 'hello',
const scheduled = await simple.schedule(tomorrowNoon, {
Message: 'Hello, World!',
});

// 👀 Get the scheduled run ID of the workflow
// it may be helpful to store the scheduled run ID of the workflow
// in a database or other persistent storage for later use

const scheduledRunId = scheduled.metadata.id;
console.log(scheduledRunId);

// > Reschedule a Scheduled Run
await hatchet.scheduled.update(scheduledRunId, {
triggerAt: new Date(Date.now() + 60 * 60 * 1000),
triggerAt: new Date(Date.now() + 24 * 60 * 60 * 1000),
});

// > Delete a Scheduled Run
await hatchet.scheduled.delete(scheduled);
await hatchet.scheduled.delete(scheduledRunId);

// > List Scheduled Runs
const scheduledRuns = await hatchet.scheduled.list({
workflow: simple,
});
console.log(scheduledRuns);
const scheduledRuns = await hatchet.scheduled.list({});

// > Bulk Delete Scheduled Runs
await hatchet.scheduled.bulkDelete({
Expand Down
2 changes: 1 addition & 1 deletion frontend/docs/pages/v1/bulk-run.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ As with the run methods, you can call bulk methods on the task fn context parame
src={snippets.typescript.simple.bulk.bulk_run_tasks_from_within_a_task}
/>

Available bulk methods on the `Context` object are: - `bulkRunChildren` - `bulkRunChildrenNoWait`
Available bulk methods on the `Context` object are: `bulkRunChildren`, `bulkRunChildrenNoWait`.

</Tabs.Tab>
<Tabs.Tab title="Go">
Expand Down
Loading
Loading