-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathprocessor_reply.go
More file actions
66 lines (59 loc) · 1.84 KB
/
processor_reply.go
File metadata and controls
66 lines (59 loc) · 1.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package grain
import (
"context"
"errors"
"fmt"
"strconv"
"time"
"github.com/chenxyzl/grain/message"
"github.com/chenxyzl/grain/uuid"
"google.golang.org/protobuf/proto"
)
// Compile-time assertion that *processorReply implements iProcess.
var _ iProcess = (*processorReply[proto.Message])(nil)
// processorReply is a one-shot process that waits for a single reply message
// of type T. The reply is handed over by send and consumed by Result.
type processorReply[T proto.Message] struct {
// registry is the registry this processor removes itself from when Result returns.
registry iRegistry
// _self is the actor reference created for this processor; exposed through self().
_self ActorRef
// result carries the reply from send to Result (created with capacity 1 by newProcessorReplay).
result chan proto.Message
// timeout bounds how long Result waits for a reply.
timeout time.Duration
}
// newProcessorReplay creates a reply processor expecting a message of type T
// and adds it to the system registry.
//
// The processor gets a fresh direct ActorRef of kind defaultReplyKind whose id
// is a newly generated uuid rendered as a decimal string. timeout is the
// maximum time Result will wait for the reply.
func newProcessorReplay[T proto.Message](system ISystem, timeout time.Duration) *processorReply[T] {
	id := strconv.Itoa(int(uuid.Generate()))
	ref := newDirectActorRef(defaultReplyKind, id, system.getAddr(), system)

	// Factory handed to the registry; it builds the processor on demand.
	build := func() iProcess {
		proc := new(processorReply[T])
		proc.registry = system.getRegistry()
		proc._self = ref
		proc.result = make(chan proto.Message, 1)
		proc.timeout = timeout
		return proc
	}

	added := system.getRegistry().add(build)
	return added.(*processorReply[T])
}
// self returns the actor reference stored in this reply processor.
func (x *processorReply[T]) self() ActorRef {
	ref := x._self
	return ref
}
// opts always returns nil: a reply processor carries no options.
func (x *processorReply[T]) opts() *tOpts {
	var none *tOpts
	return none
}
// init is a no-op; a reply processor needs no initialization.
func (x *processorReply[T]) init() {
	// Intentionally empty.
}
// send hands the message carried by ctx to the goroutine waiting in Result.
//
// The hand-off never blocks. Result consumes at most one message from the
// result channel, so when a reply is already pending any additional message
// would never be read; it is dropped instead of stalling the caller of send.
func (x *processorReply[T]) send(ctx Context) {
	select {
	case x.result <- ctx.Message():
	default:
		// A reply is already pending; discard the surplus message.
	}
}
// Result waits for the reply delivered through send, or for the configured
// timeout to elapse, whichever happens first. Before returning it always
// removes this processor from the registry.
//
// Returns:
//   - (reply, nil) when the received message has type T;
//   - (zero T, err) when the reply is nil, is a *message.Poison, is a
//     *message.ErrCode (returned unchanged), implements error, has any other
//     type, or when the timeout expires before a reply arrives.
func (x *processorReply[T]) Result() (T, *message.ErrCode) {
	ctx, cancel := context.WithTimeout(context.Background(), x.timeout)
	defer func() {
		cancel()
		x.registry.remove(x._self)
	}()

	var null T
	select {
	case resp := <-x.result:
		switch msg := resp.(type) {
		case nil:
			// A nil reply has no descriptor to report; calling ProtoReflect on
			// it in the default branch would panic, so reject it explicitly.
			return null, message.WithErr("reply message is nil")
		case T:
			return msg, nil
		case *message.Poison:
			return null, message.WithErr("reply processor poisoned")
		case *message.ErrCode:
			return null, msg
		case error:
			return null, message.WithErr(msg.Error())
		default:
			// When T is instantiated with an interface type its zero value is a
			// nil interface, on which ProtoReflect cannot be called.
			need := "<nil>"
			if any(null) != nil {
				need = string(null.ProtoReflect().Descriptor().FullName())
			}
			return null, message.WithErr(fmt.Sprintf("msg type errr, need:%v, now:%v", need, msg.ProtoReflect().Descriptor().FullName()))
		}
	case <-ctx.Done():
		return null, message.WithErr(errors.Join(ctx.Err(), fmt.Errorf("reply result timeout, id:%v", x.self())).Error())
	}
}