Skip to content

Commit a041ff6

Browse files
committed
fix(directory): rollback late register success after timeout
1 parent 860efc6 commit a041ff6

File tree

2 files changed

+44
-4
lines changed

2 files changed

+44
-4
lines changed

registry/directory/directory.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,8 +222,8 @@ func (dir *RegistryDirectory) Subscribe(url *common.URL) error {
222222

223223
func (dir *RegistryDirectory) registerConsumerWithTimeout(url *common.URL, timeout time.Duration, serviceKey string) error {
224224
registerErrCh := make(chan error, 1)
225+
urlToReg := getConsumerUrlToRegistry(url.Clone())
225226
go func() {
226-
urlToReg := getConsumerUrlToRegistry(url.Clone())
227227
registerErrCh <- dir.registry.Register(urlToReg)
228228
}()
229229

@@ -245,6 +245,15 @@ func (dir *RegistryDirectory) registerConsumerWithTimeout(url *common.URL, timeo
245245
return nil
246246
case <-timer.C:
247247
logger.Errorf("register timed out for service: %s", serviceKey)
248+
go func() {
249+
err := <-registerErrCh
250+
if err != nil {
251+
return
252+
}
253+
if unRegErr := dir.registry.UnRegister(urlToReg.Clone()); unRegErr != nil {
254+
logger.Warnf("register timed out for service %s, but late unregister failed: %v", serviceKey, unRegErr)
255+
}
256+
}()
248257
return fmt.Errorf("register timed out for service: %s", serviceKey)
249258
}
250259
}

registry/directory/directory_test.go

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -446,10 +446,40 @@ func TestRegistryDirectorySubscribeTimeoutSkipsSubscribeStart(t *testing.T) {
446446
assert.Equal(t, int32(0), blockingRegistry.subscribeCalls.Load())
447447
}
448448

449+
func TestRegistryDirectorySubscribeTimeoutLateRegisterRollback(t *testing.T) {
450+
registryURL, err := common.NewURL(
451+
"registry://127.0.0.1:20000",
452+
common.WithParamsValue(constant.RegistryTimeoutKey, "100ms"),
453+
)
454+
require.NoError(t, err)
455+
subscribeURL, err := common.NewURL("consumer://127.0.0.1:20000/org.apache.dubbo-go.mockService")
456+
require.NoError(t, err)
457+
458+
blockingRegistry := &blockingRegistryForSubscribeTest{
459+
url: registryURL,
460+
registerBlock: make(chan struct{}),
461+
}
462+
463+
dir := &RegistryDirectory{
464+
registry: blockingRegistry,
465+
}
466+
467+
err = dir.Subscribe(subscribeURL)
468+
require.Error(t, err)
469+
require.Contains(t, err.Error(), "timed out")
470+
assert.Equal(t, int32(0), blockingRegistry.subscribeCalls.Load())
471+
472+
close(blockingRegistry.registerBlock)
473+
assert.Eventually(t, func() bool {
474+
return blockingRegistry.unregisterCalls.Load() == 1
475+
}, time.Second, 10*time.Millisecond)
476+
}
477+
449478
type blockingRegistryForSubscribeTest struct {
450-
url *common.URL
451-
registerBlock chan struct{}
452-
subscribeCalls atomic.Int32
479+
url *common.URL
480+
registerBlock chan struct{}
481+
subscribeCalls atomic.Int32
482+
unregisterCalls atomic.Int32
453483
}
454484

455485
func (r *blockingRegistryForSubscribeTest) Register(_ *common.URL) error {
@@ -458,6 +488,7 @@ func (r *blockingRegistryForSubscribeTest) Register(_ *common.URL) error {
458488
}
459489

460490
func (r *blockingRegistryForSubscribeTest) UnRegister(_ *common.URL) error {
491+
r.unregisterCalls.Add(1)
461492
return nil
462493
}
463494

0 commit comments

Comments
 (0)