Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,13 @@ public void onAuthenticationFailed(String deviceId, TransportException transport
this.connectionStateCallback.onAuthenticationFailed(deviceId, transportException);
}

public void onAuthenticationTimedOut(String deviceId)
{
log.warn("Timed out waiting for CBS authentication response for device {}. Closing this connection...", deviceId);
this.connectionStateCallback.onCBSSessionClosedUnexpectedly(null);
this.close();
}

public void close()
{
log.trace("Closing this CBS session");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ class AmqpsSasTokenRenewalHandler extends BaseHandler implements AuthenticationM
private boolean isClosed;
private AmqpsSasTokenRenewalHandler nextToAuthenticate;
private Task scheduledTask;
private Task authenticationTimeoutTask;
private int currentAuthenticationRequestId;

public AmqpsSasTokenRenewalHandler(AmqpsCbsSessionHandler amqpsCbsSessionHandler, AmqpsSessionHandler amqpsSessionHandler)
{
Expand Down Expand Up @@ -70,13 +72,16 @@ public void sendAuthenticationMessage(Reactor reactor) throws TransportException
log.debug("Sending authentication message for device {}", amqpsSessionHandler.getDeviceId());
amqpsCbsSessionHandler.sendAuthenticationMessage(amqpsSessionHandler.getClientConfiguration(), this);

scheduleAuthenticationTimeout(reactor);
scheduleRenewal(reactor);
}
}

@Override
public DeliveryState handleAuthenticationResponseMessage(int status, String description, Reactor reactor)
{
cancelAuthenticationTimeout();

try
{
if (nextToAuthenticate != null)
Expand Down Expand Up @@ -104,6 +109,15 @@ public DeliveryState handleAuthenticationResponseMessage(int status, String desc
return Accepted.getInstance();
}

private void onAuthenticationTimedOut(int authenticationRequestId)
{
if (!isClosed && authenticationRequestId == currentAuthenticationRequestId)
{
log.warn("Timed out waiting for CBS authentication response for device {}", this.amqpsSessionHandler.getDeviceId());
this.amqpsCbsSessionHandler.onAuthenticationTimedOut(this.amqpsSessionHandler.getDeviceId());
}
}

// Once closed, this handler will stop sending authentication messages for its device. This object may not be re-opened.
public void close()
{
Expand All @@ -128,6 +142,36 @@ private void scheduleRenewalRetry(Reactor reactor)
this.scheduledTask = reactor.schedule(RETRY_INTERVAL_MILLISECONDS, this);
}

private void scheduleAuthenticationTimeout(Reactor reactor)
{
cancelAuthenticationTimeout();

currentAuthenticationRequestId++;
final int expectedAuthenticationRequestId = currentAuthenticationRequestId;
long authenticationTimeout = this.amqpsSessionHandler.getClientConfiguration().getOperationTimeout();

log.trace("Scheduling CBS authentication response timeout for device {} in {} milliseconds", this.amqpsSessionHandler.getDeviceId(), authenticationTimeout);

this.authenticationTimeoutTask = reactor.schedule((int) authenticationTimeout, new BaseHandler()
{
@Override
public void onTimerTask(Event event)
{
onAuthenticationTimedOut(expectedAuthenticationRequestId);
}
});
}

private void cancelAuthenticationTimeout()
{
if (this.authenticationTimeoutTask != null)
{
this.authenticationTimeoutTask.cancel();
this.authenticationTimeoutTask.attachments().clear();
this.authenticationTimeoutTask = null;
}
}

// Removes any children of this handler (such as LoggingFlowController) and disassociates this handler
// from the proton reactor. By removing the reference of the proton reactor to this handler, this handler becomes
// eligible for garbage collection by the JVM. This is important for multiplexed connections where SAS token renewal
Expand All @@ -140,6 +184,8 @@ private void clearHandlers()
this.scheduledTask.attachments().clear();
}

cancelAuthenticationTimeout();

// an instance of this class shouldn't have any children, but other handlers may be added as this SDK
// grows and this protects against potential memory leaks
Iterator<Handler> childrenIterator = this.children();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
/*
* Copyright (c) Microsoft. All rights reserved.
* Licensed under the MIT license. See LICENSE file in the project root for full license information.
*/

package com.microsoft.azure.sdk.iot.device.transport.amqps;

import com.microsoft.azure.sdk.iot.device.ClientConfiguration;
import com.microsoft.azure.sdk.iot.device.auth.IotHubSasTokenAuthenticationProvider;
import com.microsoft.azure.sdk.iot.device.transport.TransportException;
import mockit.Delegate;
import mockit.Expectations;
import mockit.Mocked;
import mockit.Verifications;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Handler;
import org.apache.qpid.proton.engine.Record;
import org.apache.qpid.proton.reactor.Reactor;
import org.apache.qpid.proton.reactor.Task;
import org.junit.Test;

/**
* Unit tests for AmqpsSasTokenRenewalHandler.
*/
@SuppressWarnings("ThrowableNotThrown")
public class AmqpsSasTokenRenewalHandlerTest
{
@Mocked
AmqpsCbsSessionHandler mockedCbsSessionHandler;

@Mocked
AmqpsSessionHandler mockedSessionHandler;

@Mocked
ClientConfiguration mockedConfig;

@Mocked
IotHubSasTokenAuthenticationProvider mockedSasTokenAuthenticationProvider;

@Mocked
Reactor mockedReactor;

@Mocked
Task mockedAuthenticationTimeoutTask;

@Mocked
Task mockedRenewalTask;

@Mocked
Event mockedEvent;

@Mocked
Record mockedRecord;

// Tests_SRS_AMQPSSASTOKENRENEWALHANDLER_34_001: [If no CBS authentication response is received before the operation timeout, this function shall notify the CBS session that authentication timed out.]
@Test
public void authenticationResponseTimeoutNotifiesCbsSession() throws TransportException
{
//arrange
final String deviceId = "someDevice";
final int authenticationTimeout = 1000;
final int renewalPeriod = 2000;
final Handler[] authenticationTimeoutHandler = new Handler[1];

new Expectations()
{
{
mockedSessionHandler.getDeviceId();
result = deviceId;

mockedSessionHandler.getClientConfiguration();
result = mockedConfig;

mockedConfig.getOperationTimeout();
result = authenticationTimeout;

mockedConfig.getSasTokenAuthentication();
result = mockedSasTokenAuthenticationProvider;

mockedSasTokenAuthenticationProvider.getMillisecondsBeforeProactiveRenewal();
result = renewalPeriod;

mockedCbsSessionHandler.sendAuthenticationMessage(mockedConfig, (AuthenticationMessageCallback) any);

mockedReactor.schedule(anyInt, (Handler) any);
result = new Delegate<Reactor>()
{
@SuppressWarnings("unused")
Task schedule(int delay, Handler handler)
{
if (delay == authenticationTimeout)
{
authenticationTimeoutHandler[0] = handler;
return mockedAuthenticationTimeoutTask;
}

return mockedRenewalTask;
}
};
}
};

AmqpsSasTokenRenewalHandler sasTokenRenewalHandler = new AmqpsSasTokenRenewalHandler(mockedCbsSessionHandler, mockedSessionHandler);
sasTokenRenewalHandler.sendAuthenticationMessage(mockedReactor);

//act
((BaseHandler) authenticationTimeoutHandler[0]).onTimerTask(mockedEvent);

//assert
new Verifications()
{
{
mockedCbsSessionHandler.onAuthenticationTimedOut(deviceId);
times = 1;
}
};
}

// Tests_SRS_AMQPSSASTOKENRENEWALHANDLER_34_002: [If a CBS authentication response is received before the operation timeout, this function shall cancel the authentication response timeout.]
@Test
public void authenticationResponseCancelsResponseTimeoutTask() throws TransportException
{
//arrange
final String deviceId = "someDevice";
final int authenticationTimeout = 1000;
final int renewalPeriod = 2000;

new Expectations()
{
{
mockedSessionHandler.getDeviceId();
result = deviceId;

mockedSessionHandler.getClientConfiguration();
result = mockedConfig;

mockedConfig.getOperationTimeout();
result = authenticationTimeout;

mockedConfig.getSasTokenAuthentication();
result = mockedSasTokenAuthenticationProvider;

mockedSasTokenAuthenticationProvider.getMillisecondsBeforeProactiveRenewal();
result = renewalPeriod;

mockedCbsSessionHandler.sendAuthenticationMessage(mockedConfig, (AuthenticationMessageCallback) any);

mockedReactor.schedule(authenticationTimeout, (Handler) any);
result = mockedAuthenticationTimeoutTask;

mockedReactor.schedule(renewalPeriod, (Handler) any);
result = mockedRenewalTask;

mockedAuthenticationTimeoutTask.attachments();
result = mockedRecord;

mockedSessionHandler.openLinks();
}
};

AmqpsSasTokenRenewalHandler sasTokenRenewalHandler = new AmqpsSasTokenRenewalHandler(mockedCbsSessionHandler, mockedSessionHandler);
sasTokenRenewalHandler.sendAuthenticationMessage(mockedReactor);

//act
sasTokenRenewalHandler.handleAuthenticationResponseMessage(200, "", mockedReactor);

//assert
new Verifications()
{
{
mockedAuthenticationTimeoutTask.cancel();
times = 1;

mockedRecord.clear();
times = 1;

mockedCbsSessionHandler.onAuthenticationTimedOut(anyString);
times = 0;
}
};
}
}