Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ protected void initChannel(Channel ch) {
ByteBufFormat activityLogging = logConfig != null && logConfig.isEnabled()? logConfig.getDataFormat() : null;
QuicConnectionHandler handler = new QuicConnectionHandler(context, metrics, config.getIdleTimeout(),
config.getReadIdleTimeout(), config.getWriteIdleTimeout(), activityLogging, config.getMaxStreamBidiRequests(),
config.getMaxStreamUniRequests(), remoteAddress, promise::tryComplete);
config.getMaxStreamUniRequests(), remoteAddress, false, promise::tryComplete);
ch.pipeline().addLast("handler", handler);
}
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,16 @@ private static long timeoutMillis(Duration timeout) {
private final ByteBufFormat activityLogging;
private final int maxStreamBidiRequests;
private final int maxStreamUniRequests;
private final boolean server;
private Handler<QuicConnection> handler;
private QuicChannel channel;
private QuicConnectionImpl connection;
private SocketAddress remoteAddress;

public QuicConnectionHandler(ContextInternal context, TransportMetrics<?> metrics, Duration idleTimeout,
Duration readIdleTimeout, Duration writeIdleTimeout, ByteBufFormat activityLogging,
int maxStreamBidiRequests, int maxStreamUniRequests, SocketAddress remoteAddress, Handler<QuicConnection> handler) {
int maxStreamBidiRequests, int maxStreamUniRequests, SocketAddress remoteAddress,
boolean server, Handler<QuicConnection> handler) {
this.context = context;
this.metrics = metrics;
this.idleTimeout = timeoutMillis(idleTimeout);
Expand All @@ -68,14 +70,15 @@ public QuicConnectionHandler(ContextInternal context, TransportMetrics<?> metric
this.maxStreamUniRequests = maxStreamUniRequests;
this.handler = handler;
this.remoteAddress = remoteAddress;
this.server = server;
}

@Override
public void handlerAdded(ChannelHandlerContext ctx) {
QuicChannel ch = (QuicChannel) ctx.channel();
channel = ch;
connection = new QuicConnectionImpl(context, metrics, idleTimeout, readIdleTimeout, writeIdleTimeout, activityLogging,
maxStreamBidiRequests, maxStreamUniRequests, ch, remoteAddress, ctx);
maxStreamBidiRequests, maxStreamUniRequests, ch, remoteAddress, ctx, server);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public class QuicConnectionImpl extends ConnectionBase implements QuicConnection
private final long writeIdleTimeout;
private final ByteBufFormat activityLogging;
private final ConnectionGroup streamGroup;
private final boolean server;
private Function<ContextInternal, ContextInternal> streamContextProvider;
private Handler<QuicStream> handler;
private Handler<Duration> shutdownHandler;
Expand All @@ -77,7 +78,8 @@ public class QuicConnectionImpl extends ConnectionBase implements QuicConnection

public QuicConnectionImpl(ContextInternal context, TransportMetrics metrics, long idleTimeout,
long readIdleTimeout, long writeIdleTimeout, ByteBufFormat activityLogging, int maxStreamBidiRequests,
int maxStreamUniRequests, QuicChannel channel, SocketAddress remoteAddress, ChannelHandlerContext chctx) {
int maxStreamUniRequests, QuicChannel channel, SocketAddress remoteAddress, ChannelHandlerContext chctx,
boolean server) {
super(context, chctx);

Map<QuicStreamType, StreamOpenRequestQueue> pendingStreamRequestsMap = new EnumMap<>(QuicStreamType.class);
Expand All @@ -92,6 +94,7 @@ public QuicConnectionImpl(ContextInternal context, TransportMetrics metrics, lon
this.activityLogging = activityLogging;
this.context = context;
this.remoteAddress = remoteAddress;
this.server = server;
this.pendingStreamOpenRequestsMap = pendingStreamRequestsMap;
this.maxDatagramLength = 0;
this.streamGroup = new ConnectionGroup(context.nettyEventLoop()) {
Expand Down Expand Up @@ -144,6 +147,22 @@ public TransportMetrics<?> metrics() {
return metrics;
}

@Override
public SocketAddress remoteAddress() {
if (server) {
// Avoid caching remote address in case of connection migration
return super.channelRemoteAddress();
} else {
return super.remoteAddress();
}
}

@Override
public SocketAddress remoteAddress(boolean real) {
// No proxy
return remoteAddress();
}

void handleStream(QuicStreamChannel streamChannel) {
if (streamChannel.type() == QuicStreamType.BIDIRECTIONAL || streamChannel.isLocalCreated()) {
// Only consider stream we can end for shutdown, e.g. this excludes remote opened HTTP/3 control stream
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,8 @@ protected void initChannel(Channel ch) {
ByteBufFormat activityLogging = logConfig != null && logConfig.isEnabled() ? logConfig.getDataFormat() : null;
QuicConnectionHandler handler = new QuicConnectionHandler(context, metrics, config.getIdleTimeout(),
config.getReadIdleTimeout(), config.getWriteIdleTimeout(), activityLogging, config.getMaxStreamBidiRequests(),
config.getMaxStreamUniRequests(), vertx.transport().convert(channel.remoteSocketAddress()), QuicServerImpl.this.handler);
config.getMaxStreamUniRequests(), vertx.transport().convert(channel.remoteSocketAddress()), true,
QuicServerImpl.this.handler);
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast("handler", handler);
}
Expand Down
Loading