Skip to content

Commit 25a56d9

Browse files
committed
[Core] Fixed HighwayContext Sockets Disposing pattern
1 parent 6cdc134 commit 25a56d9

File tree

1 file changed

+6
-7
lines changed

1 file changed

+6
-7
lines changed

Lagrange.Core/Internal/Context/HighwayContext.cs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,6 @@ public async Task<bool> UploadFile(Stream stream, int commandId, ReadOnlyMemory<
9595

9696
ulong fileSize = (ulong)stream.Length;
9797
ulong offset = 0;
98-
int sequence = GetNewSequence();
9998
var fileMd5 = stream.Md5();
10099
while (offset < fileSize)
101100
{
@@ -105,6 +104,8 @@ public async Task<bool> UploadFile(Stream stream, int commandId, ReadOnlyMemory<
105104
ulong currentBlockOffset = offset;
106105
var task = Task.Run(async () => // closure
107106
{
107+
int sequence = GetNewSequence();
108+
108109
var head = new DataHighwayHead
109110
{
110111
Version = 1,
@@ -224,15 +225,13 @@ public async Task<bool> UploadFile(Stream stream, int commandId, ReadOnlyMemory<
224225
finally
225226
{
226227
ArrayPool<byte>.Shared.Return(buffer);
227-
228-
client.Disconnect();
229228
_sockets.Return(client);
230229
}
231230
});
232231

233232

234233
tasks.Add(task);
235-
if (tasks.Count == (commandId == 95 ? 1 : _concurrent))
234+
if (tasks.Count == (_concurrent))
236235
{
237236
var successBlocks = await Task.WhenAll(tasks);
238237
foreach (bool t in successBlocks) result &= t;
@@ -280,11 +279,11 @@ private sealed class HighwayValueTaskSource : IValueTaskSource<(RespDataHighwayH
280279

281280
private sealed class ObjectPool<T>(Func<T> objectGenerator, Action<T> onDispose) : IDisposable
282281
{
283-
private readonly ConcurrentBag<T> _objects = [];
282+
private readonly ConcurrentQueue<T> _objects = [];
284283

285-
public T Get() => _objects.TryTake(out var item) ? item : objectGenerator();
284+
public T Get() => _objects.TryDequeue(out var item) ? item : objectGenerator();
286285

287-
public void Return(T item) => _objects.Add(item);
286+
public void Return(T item) => _objects.Enqueue(item);
288287

289288
public void Dispose()
290289
{

0 commit comments

Comments
 (0)