@@ -38,7 +38,7 @@ import (
3838// Client of gnet.
3939type Client struct {
4040 opts * Options
41- el * eventloop
41+ eng * engine
4242}
4343
4444// NewClient creates an instance of Client.
@@ -59,28 +59,19 @@ func NewClient(eh EventHandler, opts ...Option) (cli *Client, err error) {
5959 }
6060 logging .SetDefaultLoggerAndFlusher (logger , logFlusher )
6161
62- var p * netpoll.Poller
63- if p , err = netpoll .OpenPoller (); err != nil {
64- return
65- }
66-
6762 rootCtx , shutdown := context .WithCancel (context .Background ())
6863 eg , ctx := errgroup .WithContext (rootCtx )
6964 eng := engine {
7065 listeners : make (map [int ]* listener ),
7166 opts : options ,
7267 turnOff : shutdown ,
7368 eventHandler : eh ,
69+ eventLoops : new (leastConnectionsLoadBalancer ),
7470 concurrency : struct {
7571 * errgroup.Group
7672 ctx context.Context
7773 }{eg , ctx },
7874 }
79- el := eventloop {
80- listeners : eng .listeners ,
81- engine : & eng ,
82- poller : p ,
83- }
8475
8576 if options .EdgeTriggeredIOChunk > 0 {
8677 options .EdgeTriggeredIO = true
@@ -107,39 +98,82 @@ func NewClient(eh EventHandler, opts ...Option) (cli *Client, err error) {
10798 default :
10899 options .WriteBufferCap = math .CeilToPowerOfTwo (wbc )
109100 }
110-
111- el .buffer = make ([]byte , options .ReadBufferCap )
112- el .connections .init ()
113- el .eventHandler = eh
114- cli .el = & el
101+ cli .eng = & eng
115102 return
116103}
117104
118105// Start starts the client event-loop, handing IO events.
119106func (cli * Client ) Start () error {
120- logging .Infof ("Starting gnet client with 1 event-loop" )
121- cli .el .eventHandler .OnBoot (Engine {cli .el .engine })
122- cli .el .engine .concurrency .Go (cli .el .run )
107+ numEventLoop := determineEventLoops (cli .opts )
108+ logging .Infof ("Starting gnet client with %d event loops" , numEventLoop )
109+
110+ cli .eng .eventHandler .OnBoot (Engine {cli .eng })
111+
112+ var el0 * eventloop
113+ for i := 0 ; i < numEventLoop ; i ++ {
114+ p , err := netpoll .OpenPoller ()
115+ if err != nil {
116+ cli .eng .closeEventLoops ()
117+ return err
118+ }
119+ el := eventloop {
120+ listeners : cli .eng .listeners ,
121+ engine : cli .eng ,
122+ poller : p ,
123+ buffer : make ([]byte , cli .opts .ReadBufferCap ),
124+ eventHandler : cli .eng .eventHandler ,
125+ }
126+ el .connections .init ()
127+ cli .eng .eventLoops .register (& el )
128+ if cli .opts .Ticker && el .idx == 0 {
129+ el0 = & el
130+ }
131+ }
132+
133+ cli .eng .eventLoops .iterate (func (_ int , el * eventloop ) bool {
134+ cli .eng .concurrency .Go (el .run )
135+ return true
136+ })
137+
123138 // Start the ticker.
124- if cli . opts . Ticker {
125- ctx := cli .el . engine .concurrency .ctx
126- cli .el . engine .concurrency .Go (func () error {
127- cli . el .ticker (ctx )
139+ if el0 != nil {
140+ ctx := cli .eng .concurrency .ctx
141+ cli .eng .concurrency .Go (func () error {
142+ el0 .ticker (ctx )
128143 return nil
129144 })
130145 }
146+
131147 logging .Debugf ("default logging level is %s" , logging .LogLevel ())
148+
132149 return nil
133150}
134151
135152// Stop stops the client event-loop.
136- func (cli * Client ) Stop () (err error ) {
137- logging .Error (cli .el .poller .Trigger (queue .HighPriority , func (_ any ) error { return errorx .ErrEngineShutdown }, nil ))
138- err = cli .el .engine .concurrency .Wait ()
139- logging .Error (cli .el .poller .Close ())
140- cli .el .eventHandler .OnShutdown (Engine {cli .el .engine })
153+ func (cli * Client ) Stop () error {
154+ cli .eng .shutdown (nil )
155+
156+ cli .eng .eventHandler .OnShutdown (Engine {cli .eng })
157+
158+ // Notify all event-loops to exit.
159+ cli .eng .eventLoops .iterate (func (_ int , el * eventloop ) bool {
160+ logging .Error (el .poller .Trigger (queue .HighPriority ,
161+ func (_ any ) error { return errorx .ErrEngineShutdown }, nil ))
162+ return true
163+ })
164+
165+ // Wait for all event-loops to exit.
166+ err := cli .eng .concurrency .Wait ()
167+
168+ cli .eng .closeEventLoops ()
169+
170+ // Put the engine into the shutdown state.
171+ cli .eng .inShutdown .Store (true )
172+
173+ // Flush the logger.
141174 logging .Cleanup ()
142- return
175+
176+ return err
143177}
144178
145179// Dial is like net.Dial().
@@ -156,7 +190,7 @@ func (cli *Client) DialContext(network, address string, ctx any) (Conn, error) {
156190 return cli .EnrollContext (c , ctx )
157191}
158192
159- // Enroll converts a net.Conn to gnet.Conn and then adds it into Client.
193+ // Enroll converts a net.Conn to gnet.Conn and then adds it into the Client.
160194func (cli * Client ) Enroll (c net.Conn ) (Conn , error ) {
161195 return cli .EnrollContext (c , nil )
162196}
@@ -196,6 +230,7 @@ func (cli *Client) EnrollContext(c net.Conn, ctx any) (Conn, error) {
196230 }
197231 }
198232
233+ el := cli .eng .eventLoops .next (nil )
199234 var (
200235 sockAddr unix.Sockaddr
201236 gc * conn
@@ -208,29 +243,34 @@ func (cli *Client) EnrollContext(c net.Conn, ctx any) (Conn, error) {
208243 }
209244 ua := c .LocalAddr ().(* net.UnixAddr )
210245 ua .Name = c .RemoteAddr ().String () + "." + strconv .Itoa (dupFD )
211- gc = newTCPConn ( dupFD , cli . el , sockAddr , c .LocalAddr (), c .RemoteAddr ())
246+ gc = newStreamConn ( "unix" , dupFD , el , sockAddr , c .LocalAddr (), c .RemoteAddr ())
212247 case * net.TCPConn :
213248 if cli .opts .TCPNoDelay == TCPNoDelay {
214249 if err = socket .SetNoDelay (dupFD , 1 ); err != nil {
215250 return nil , err
216251 }
217252 }
218253 if cli .opts .TCPKeepAlive > 0 {
219- if err = socket .SetKeepAlivePeriod (dupFD , int (cli .opts .TCPKeepAlive .Seconds ())); err != nil {
254+ if err = setKeepAlive (
255+ dupFD ,
256+ true ,
257+ cli .opts .TCPKeepAlive ,
258+ cli .opts .TCPKeepInterval ,
259+ cli .opts .TCPKeepCount ); err != nil {
220260 return nil , err
221261 }
222262 }
223263 sockAddr , _ , _ , _ , err = socket .GetTCPSockAddr (c .RemoteAddr ().Network (), c .RemoteAddr ().String ())
224264 if err != nil {
225265 return nil , err
226266 }
227- gc = newTCPConn ( dupFD , cli . el , sockAddr , c .LocalAddr (), c .RemoteAddr ())
267+ gc = newStreamConn ( "tcp" , dupFD , el , sockAddr , c .LocalAddr (), c .RemoteAddr ())
228268 case * net.UDPConn :
229269 sockAddr , _ , _ , _ , err = socket .GetUDPSockAddr (c .RemoteAddr ().Network (), c .RemoteAddr ().String ())
230270 if err != nil {
231271 return nil , err
232272 }
233- gc = newUDPConn (dupFD , cli . el , c .LocalAddr (), sockAddr , true )
273+ gc = newUDPConn (dupFD , el , c .LocalAddr (), sockAddr , true )
234274 default :
235275 return nil , errorx .ErrUnsupportedProtocol
236276 }
@@ -240,12 +280,12 @@ func (cli *Client) EnrollContext(c net.Conn, ctx any) (Conn, error) {
240280 ccb := & connWithCallback {c : gc , cb : func () {
241281 close (connOpened )
242282 }}
243- err = cli . el .poller .Trigger (queue .HighPriority , cli . el .register , ccb )
283+ err = el .poller .Trigger (queue .HighPriority , el .register , ccb )
244284 if err != nil {
245285 gc .Close () //nolint:errcheck
246286 return nil , err
247287 }
248-
249288 <- connOpened
289+
250290 return gc , nil
251291}
0 commit comments