-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathplugin.go
More file actions
218 lines (175 loc) · 4.89 KB
/
plugin.go
File metadata and controls
218 lines (175 loc) · 4.89 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
package smtp
import (
"context"
"net"
"sync"
"github.com/emersion/go-smtp"
"github.com/roadrunner-server/endure/v2/dep"
"github.com/roadrunner-server/errors"
"go.uber.org/zap"
)
// PluginName is the identifier of this plugin. Within this file it serves as
// the configuration section key, the logger name, and the value returned by
// Plugin.Name.
const PluginName = "smtp"
// Logger is the logging dependency handed to Plugin.Init. It is declared here,
// on the consumer side, so the plugin only depends on the single method it
// actually calls.
type Logger interface {
	// NamedLogger returns the *zap.Logger to use for the given name.
	// Init calls it with PluginName.
	NamedLogger(name string) *zap.Logger
}
// Configurer is the configuration dependency handed to Plugin.Init. It exposes
// only the two operations Init needs: checking that a section exists and
// decoding it.
type Configurer interface {
	// UnmarshalKey takes a single key and unmarshals its section into out.
	UnmarshalKey(name string, out any) error
	// Has reports whether a config section with the given name exists.
	Has(name string) bool
}
// Plugin is the SMTP server plugin. It owns the SMTP server and its TCP
// listener and forwards received mail to the Jobs plugin (see pushToJobs).
type Plugin struct {
	// mu is held by Serve while smtpServer and listener are created and by
	// Stop while they are closed.
	mu sync.RWMutex
	// cfg is populated by Init from the PluginName configuration section.
	cfg *Config
	// log is set by Init.
	log *zap.Logger
	// connections tracks sessions: uuid -> *Session.
	// NOTE(review): entries are added/removed outside this part of the file.
	connections sync.Map

	// jobs is the Jobs plugin reference; it is assigned by the callback
	// registered in Collects and is nil until that callback has run.
	jobs Jobs

	// SMTP server components, both created in Serve.
	smtpServer *smtp.Server
	listener   net.Listener
}
// Init loads the plugin configuration and prepares the logger.
//
// When the configuration has no PluginName section the returned error carries
// the errors.Disabled kind. Failures while decoding the section or while
// applying configuration defaults are returned wrapped with the init
// operation tag.
func (p *Plugin) Init(log Logger, cfg Configurer) error {
	const op = errors.Op("smtp_plugin_init")

	// A missing section means the plugin is not enabled.
	if !cfg.Has(PluginName) {
		return errors.E(op, errors.Disabled)
	}

	// Decode the section into the plugin's config.
	if err := cfg.UnmarshalKey(PluginName, &p.cfg); err != nil {
		return errors.E(op, err)
	}

	// Fill in defaults for anything left unset.
	if err := p.cfg.InitDefaults(); err != nil {
		return errors.E(op, err)
	}

	p.log = log.NamedLogger(PluginName)

	// Report the effective settings once at startup.
	summary := []zap.Field{
		zap.String("addr", p.cfg.Addr),
		zap.String("hostname", p.cfg.Hostname),
		zap.Int64("max_message_size", p.cfg.MaxMessageSize),
		zap.String("jobs_pipeline", p.cfg.Jobs.Pipeline),
	}
	p.log.Info("SMTP plugin initialized", summary...)

	return nil
}
// Serve configures the SMTP server, binds its TCP listener and starts the
// accept loop in a background goroutine.
//
// The returned channel carries startup failures (jobs plugin missing, listen
// error) and any error the accept loop terminates with. Every error sent on
// the channel is wrapped with an operation tag.
func (p *Plugin) Serve() chan error {
	const (
		op       = errors.Op("smtp_serve")
		opListen = errors.Op("smtp_listen")

		// maxRecipients is the per-message recipient limit given to the server.
		maxRecipients = 100
	)

	// Buffered so that sending an error never blocks, even when the caller
	// has not started receiving yet.
	errCh := make(chan error, 2)

	p.mu.Lock()
	defer p.mu.Unlock()

	// The jobs plugin is assigned by the Collects callback; without it
	// pushToJobs cannot deliver anything.
	if p.jobs == nil {
		errCh <- errors.E(op, errors.Str("jobs plugin not available - ensure jobs plugin is enabled and loaded"))
		return errCh
	}

	// 1. Create the SMTP backend and server.
	srv := smtp.NewServer(NewBackend(p))
	srv.Addr = p.cfg.Addr
	srv.Domain = p.cfg.Hostname
	srv.ReadTimeout = p.cfg.ReadTimeout
	srv.WriteTimeout = p.cfg.WriteTimeout
	srv.MaxMessageBytes = p.cfg.MaxMessageSize
	srv.MaxRecipients = maxRecipients
	// NOTE(review): this permits AUTH on connections without TLS and is not
	// configurable - confirm that this is intended for production use.
	srv.AllowInsecureAuth = true
	p.smtpServer = srv

	p.log.Info("SMTP server configured",
		zap.String("addr", srv.Addr),
		zap.String("domain", srv.Domain),
		zap.String("jobs_pipeline", p.cfg.Jobs.Pipeline),
	)

	// 2. Create the listener.
	ln, err := net.Listen("tcp", p.cfg.Addr)
	if err != nil {
		errCh <- errors.E(opListen, err)
		return errCh
	}
	p.listener = ln

	p.log.Info("SMTP listener created", zap.String("addr", p.cfg.Addr))

	// 3. Run the accept loop. The goroutine works on the captured srv/ln
	// values rather than re-reading the mutex-guarded plugin fields.
	go func() {
		p.log.Info("SMTP server starting", zap.String("addr", p.cfg.Addr))

		if err := srv.Serve(ln); err != nil {
			p.log.Error("SMTP server error", zap.Error(err))
			// Wrap like every other error path in this plugin.
			errCh <- errors.E(op, err)
		}
	}()

	// 4. Start the temp file cleanup routine.
	// NOTE(review): context.Background() is never cancelled, so Stop has no
	// way to signal the routine through this context - confirm how
	// startCleanupRoutine is expected to terminate.
	p.startCleanupRoutine(context.Background())

	return errCh
}
// Stop shuts the SMTP server down and waits for the shutdown to finish or for
// ctx to be done, whichever happens first.
//
// It returns ctx.Err() when ctx is done before the shutdown completed and nil
// otherwise. The shutdown goroutine still runs to completion in the former
// case; its completion signal lands in the buffered channel and is discarded.
func (p *Plugin) Stop(ctx context.Context) error {
	p.log.Info("stopping SMTP plugin")

	// Buffered so the goroutine can always finish, even after ctx expired.
	doneCh := make(chan struct{}, 1)

	go func() {
		p.mu.Lock()
		defer p.mu.Unlock()

		// 1. Close the SMTP server first. Server.Close marks the server as
		// closed before it closes its listeners and active connections, so
		// the accept loop started in Serve exits without reporting an error.
		// Closing the raw listener first would make that loop fail with a
		// "use of closed network connection" error on every clean shutdown.
		if p.smtpServer != nil {
			if err := p.smtpServer.Close(); err != nil {
				p.log.Warn("SMTP server close error", zap.Error(err))
			}
		}

		// 2. Close the listener as a fallback for the case where the accept
		// loop had not registered it with the server yet. The error is
		// ignored on purpose: normally the server closed it in step 1.
		if p.listener != nil {
			_ = p.listener.Close()
		}

		// Tracked connections need no extra handling here: the server closed
		// the active connections in step 1.
		// NOTE(review): presumably sessions untrack themselves in Logout() -
		// confirm against the Session implementation.

		doneCh <- struct{}{}
	}()

	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-doneCh:
		p.log.Info("SMTP plugin stopped")
		return nil
	}
}
// Name reports the plugin's identifier, PluginName, to RoadRunner.
func (p *Plugin) Name() string { return PluginName }
// Collects lists the plugins this plugin wants to receive. It registers a
// single callback for implementations of the Jobs interface; the callback
// stores the received plugin in p.jobs.
func (p *Plugin) Collects() []*dep.In {
	// Invoked with a plugin that fits the Jobs interface.
	collectJobs := func(pp any) {
		p.jobs = pp.(Jobs)
		p.log.Debug("collected jobs plugin")
	}

	return []*dep.In{
		dep.Fits(collectJobs, (*Jobs)(nil)),
	}
}
// RPC builds the RPC handler used for external management; the handler holds
// a reference back to this plugin.
func (p *Plugin) RPC() any {
	handler := new(rpc)
	handler.p = p
	return handler
}
// pushToJobs converts a received email into a job message and pushes it to
// the Jobs plugin.
//
// It returns an error when no Jobs plugin has been collected or when the push
// fails; both are wrapped with the push operation tag.
func (p *Plugin) pushToJobs(email *EmailData) error {
	const op = errors.Op("smtp_push_to_jobs")

	jobs := p.jobs
	if jobs == nil {
		return errors.E(op, errors.Str("jobs plugin not available - ensure jobs plugin is enabled and loaded before smtp plugin"))
	}

	// Build the job message from the email and the jobs section of the
	// config, then hand it straight to the Jobs plugin.
	jobsCfg := &p.cfg.Jobs
	if err := jobs.Push(context.Background(), emailToJobMessage(email, jobsCfg)); err != nil {
		return errors.E(op, err)
	}

	p.log.Debug("email pushed to jobs",
		zap.String("uuid", email.UUID),
		zap.String("pipeline", jobsCfg.Pipeline),
	)

	return nil
}