Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions internal/shutdown/shutdown.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package shutdown

import (
"context"
"sync"
)

// Signaller is to signal from outside that any goroutines should begin to close.
//
// NOTE(gregfurman): This approach is a simplified adaptation from github.com/Jeffail/shutdown.
// If we want more complicated shutdown handling, i.e the ability to distinguish between
// a graceful shutdown vs a forced, we can use the package directly,
type Signaller struct {
stopChan chan struct{}
stopOnce sync.Once
}

// NewSignaller creates a new signaller.
func NewSignaller() *Signaller {
return &Signaller{
stopChan: make(chan struct{}),
}
}

// TriggerShutdown signals to the owner of this Signaller that it should terminate.
func (s *Signaller) TriggerShutdown() {
s.stopOnce.Do(func() {
close(s.stopChan)
})
}

// WithShutdown derives a context.Context that will be terminated when either the
// parent context is cancelled or the signal to stop has been made.
func (s *Signaller) WithShutdown(parent context.Context) (context.Context, context.CancelFunc) {
var cancel context.CancelFunc
parent, cancel = context.WithCancel(parent)
go func() {
select {
case <-parent.Done():
case <-s.stopChan:
}
cancel()
}()
return parent, cancel
}
20 changes: 17 additions & 3 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/status"

"github.com/hatchet-dev/hatchet/internal/shutdown"
"github.com/hatchet-dev/hatchet/pkg/client/loader"
"github.com/hatchet-dev/hatchet/pkg/client/rest"

Expand Down Expand Up @@ -46,6 +47,7 @@ type Client interface {
Namespace() string
CloudRegisterID() *string
RunnableActions() []string
Close() error
}

type clientImpl struct {
Expand All @@ -71,6 +73,8 @@ type clientImpl struct {
l *zerolog.Logger

v validator.Validator

shutSig *shutdown.Signaller
}

// Deprecated: ClientOpt is an internal type used by the new Go SDK.
Expand Down Expand Up @@ -282,6 +286,8 @@ func NewFromConfigFile(cf *client.ClientConfigFile, fs ...ClientOpt) (Client, er
}

func newFromOpts(opts *ClientOpts) (Client, error) {
shutSig := shutdown.NewSignaller()

if opts.token == "" {
return nil, fmt.Errorf("token is required")
}
Expand Down Expand Up @@ -355,16 +361,17 @@ func newFromOpts(opts *ClientOpts) (Client, error) {
namespace: opts.namespace,
l: opts.l,
v: opts.v,
ctxLoader: newContextLoader(opts.token, opts.grpcHeaders),
ctxLoader: newContextLoader(opts.token, opts.grpcHeaders, shutSig),
sharedMeta: opts.sharedMeta,
}

subscribe := newSubscribe(conn, shared)
admin := newAdmin(conn, shared, subscribe)
dispatcher := newDispatcher(conn, shared, opts.presetWorkerLabels)
event := newEvent(conn, shared)
httpClient := newClient(shutSig)

rest, err := rest.NewClientWithResponses(opts.serverURL, rest.WithRequestEditorFn(func(ctx context.Context, req *http.Request) error {
rest, err := rest.NewClientWithResponses(opts.serverURL, rest.WithHTTPClient(httpClient), rest.WithRequestEditorFn(func(ctx context.Context, req *http.Request) error {
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", opts.token))
return nil
}))
Expand All @@ -373,7 +380,7 @@ func newFromOpts(opts *ClientOpts) (Client, error) {
return nil, fmt.Errorf("could not create rest client: %w", err)
}

cloudrest, err := cloudrest.NewClientWithResponses(opts.serverURL, cloudrest.WithRequestEditorFn(func(ctx context.Context, req *http.Request) error {
cloudrest, err := cloudrest.NewClientWithResponses(opts.serverURL, cloudrest.WithHTTPClient(httpClient), cloudrest.WithRequestEditorFn(func(ctx context.Context, req *http.Request) error {
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", opts.token))
return nil
}))
Expand Down Expand Up @@ -417,6 +424,7 @@ func newFromOpts(opts *ClientOpts) (Client, error) {
namespace: opts.namespace,
cloudRegisterID: opts.cloudRegisterID,
runnableActions: opts.runnableActions,
shutSig: shutSig,
}, nil
}

Expand Down Expand Up @@ -472,6 +480,12 @@ func (c *clientImpl) RunnableActions() []string {
return c.runnableActions
}

func (c *clientImpl) Close() error {
// This should allow us to ungracefully cancel all currently executing requests.
c.shutSig.TriggerShutdown()
return c.conn.Close()
}

func initWorkflows(fl filesLoaderFunc, adminClient AdminClient) error {
files := fl()

Expand Down
8 changes: 7 additions & 1 deletion pkg/client/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,28 @@ package client
import (
grpcMetadata "google.golang.org/grpc/metadata"

"github.com/hatchet-dev/hatchet/internal/shutdown"

"context"
)

type contextLoader struct {
Token string
extraMD map[string]string
shutSig *shutdown.Signaller
}

func newContextLoader(token string, extraMD map[string]string) *contextLoader {
func newContextLoader(token string, extraMD map[string]string, shutSig *shutdown.Signaller) *contextLoader {
return &contextLoader{
Token: token,
extraMD: extraMD,
shutSig: shutSig,
}
}

func (c *contextLoader) newContext(ctx context.Context) context.Context {
ctx, _ = c.shutSig.WithShutdown(ctx)

pairs := map[string]string{
"authorization": "Bearer " + c.Token,
}
Expand Down
33 changes: 33 additions & 0 deletions pkg/client/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package client

import (
"net/http"

"github.com/hatchet-dev/hatchet/internal/shutdown"
"github.com/hatchet-dev/hatchet/pkg/client/rest"
)

var _ rest.HttpRequestDoer = new(httpClient)

type httpClient struct {
wrapped rest.HttpRequestDoer
shutSig *shutdown.Signaller
}

func newClient(shutsig *shutdown.Signaller) rest.HttpRequestDoer {
return &httpClient{
wrapped: http.DefaultClient,
shutSig: shutsig,
}
}

func (c *httpClient) Do(req *http.Request) (*http.Response, error) {
ctx := req.Context()

// Here we derive a new context that is cancelled when a shutdown
// is signalled.
ctx, cancel := c.shutSig.WithShutdown(ctx)
defer cancel()

return c.wrapped.Do(req.WithContext(ctx))
}
5 changes: 5 additions & 0 deletions sdks/go/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,11 @@ func (c *Client) NewWorkflow(name string, options ...WorkflowOption) *Workflow {
return newWorkflow(name, c.legacyClient, options...)
}

// Close will ungracefully cancel all in-flight HTTP and gRPC requests.
func (c *Client) Close() error {
return c.legacyClient.Close()
}

// StandaloneTask represents a single task that runs independently without a workflow wrapper.
// It's essentially a specialized workflow containing only one task.
type StandaloneTask struct {
Expand Down
Loading