Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,8 @@ public Task ReleaseTaskOrchestrationWorkItemAsync(TaskOrchestrationWorkItem work

public BehaviorOnContinueAsNew EventBehaviourForContinueAsNew { get; } = BehaviorOnContinueAsNew.Ignore;

public int MaxDispatchCount => int.MaxValue;

// Note: Do not rely on cancellationToken parameter to this method because the top layer does not yet implement any cancellation.
public async Task<TaskActivityWorkItem> LockNextTaskActivityWorkItem(TimeSpan receiveTimeout, CancellationToken cancellationToken)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2169,6 +2169,14 @@ public bool UseSeparateQueuesForEntityWorkItems
set => this.settings.UseSeparateQueueForEntityWorkItems = value;
}

/// <summary>
/// The maximum amount of times a message can be dispatched before it is considered "poisoned" and
/// moved to poison storage.
/// Currently this storage provider does not have poison message handling so this value is set to the
/// maximum integer value.
Comment thread
sophiatev marked this conversation as resolved.
Outdated
/// </summary>
public int MaxDispatchCount => int.MaxValue;

/// <summary>
/// Disposes of the current object.
/// </summary>
Expand Down
5 changes: 5 additions & 0 deletions src/DurableTask.Core/Entities/EventFormat/RequestMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,11 @@ internal class RequestMessage : EntityMessage
/// </summary>
public string? ClientSpanId { get; set; }

/// <summary>
/// The dispatch count of this request message.
/// </summary>
public int DispatchCount { get; set; }

/// <inheritdoc/>
public override string GetShortDescription()
{
Expand Down
10 changes: 10 additions & 0 deletions src/DurableTask.Core/Entities/OrchestrationEntityContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,16 @@ public void CompleteAcquire(OperationResult result, Guid criticalSectionId)
this.lockAcquisitionPending = false;
}

/// <summary>
/// Called when the entity lock acquisition fails.
/// </summary>
public void AbandonAcquire()
{
this.criticalSectionLocks = null;
this.criticalSectionId = null;
this.lockAcquisitionPending = false;
}

internal void AdjustOutgoingMessage(string instanceId, RequestMessage requestMessage, DateTime? cappedTime, out string eventName)
{
if (cappedTime.HasValue)
Expand Down
6 changes: 6 additions & 0 deletions src/DurableTask.Core/History/HistoryEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ protected HistoryEvent(int eventId)
[DataMember]
public virtual EventType EventType { get; private set; }

/// <summary>
/// Gets or sets the number of times this event has been dispatched.
/// </summary>
[DataMember]
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest we have these new properties be omitted by default if not populated, especially IsPoisoned. This will reduce the chance that there's some compatibility issue with older versions of the SDK that try to deserialize the history.

Suggested change
[DataMember]
[DataMember(EmitDefaultValue = false)]

public int DispatchCount { get; set; }
Comment thread
sophiatev marked this conversation as resolved.

/// <summary>
/// Implementation for <see cref="IExtensibleDataObject.ExtensionData"/>.
/// </summary>
Expand Down
6 changes: 6 additions & 0 deletions src/DurableTask.Core/IOrchestrationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,12 @@ public interface IOrchestrationService
/// </summary>
BehaviorOnContinueAsNew EventBehaviourForContinueAsNew { get; }

/// <summary>
/// Gets the maximum amount of times the same event can be dispatched before it is considered "poisonous"
/// and the corresponding operation is failed.
/// </summary>
int MaxDispatchCount { get; }
Comment thread
sophiatev marked this conversation as resolved.
Outdated
Comment thread
sophiatev marked this conversation as resolved.
Outdated

/// <summary>
/// Wait for the next orchestration work item and return the orchestration work item
/// </summary>
Expand Down
1 change: 1 addition & 0 deletions src/DurableTask.Core/Logging/EventIds.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,5 +71,6 @@ static class EventIds
public const int OrchestrationDebugTrace = 73;

public const int OrchestrationCompletedWithWarning = 74;
public const int PoisonMessageDetected = 75;
}
}
49 changes: 49 additions & 0 deletions src/DurableTask.Core/Logging/LogEvents.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1945,5 +1945,54 @@ void IEventSourceEvent.WriteEventSource() =>
Utils.PackageVersion);
}

/// <summary>
/// Log event representing the discarding of a "poison" message.
/// </summary>
internal class PoisonMessageDetected : StructuredLogEvent, IEventSourceEvent
{
public PoisonMessageDetected(OrchestrationInstance orchestrationInstance, string eventType, int taskEventId, string details)
{
this.InstanceId = orchestrationInstance?.InstanceId ?? string.Empty;
this.ExecutionId = orchestrationInstance?.ExecutionId ?? string.Empty;
this.EventType = eventType;
this.TaskEventId = taskEventId;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's also include DispatchCount in this log message.

this.Details = details;
}
Comment thread
sophiatev marked this conversation as resolved.

[StructuredLogField]
public string InstanceId { get; }

[StructuredLogField]
public string ExecutionId { get; }

[StructuredLogField]
public string EventType { get; }

[StructuredLogField]
public int TaskEventId { get; }

[StructuredLogField]
public string Details { get; }

public override EventId EventId => new EventId(
EventIds.PoisonMessageDetected,
nameof(EventIds.PoisonMessageDetected));

public override LogLevel Level => LogLevel.Error;

protected override string CreateLogMessage() =>
$"{this.InstanceId}: Poison message detected for {GetEventDescription(this.EventType, this.TaskEventId)}: {this.Details}";

void IEventSourceEvent.WriteEventSource() =>
StructuredEventSource.Log.DiscardingMessage(
this.InstanceId,
this.ExecutionId,
this.EventType,
this.TaskEventId,
this.Details,
Utils.AppName,
Utils.PackageVersion);
}

}
}
44 changes: 41 additions & 3 deletions src/DurableTask.Core/Logging/LogHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@
#nullable enable
namespace DurableTask.Core.Logging
{
using System;
using System.Collections.Generic;
using System.Text;
using DurableTask.Core.Command;
using DurableTask.Core.Common;
using DurableTask.Core.Entities.EventFormat;
using DurableTask.Core.Entities.OperationFormat;
using DurableTask.Core.History;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Text;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we revert the sort order change to this file? Normally we keep System using statements on top. Visual Studio has a setting for this, if it's VS that's changing it automatically.


class LogHelper
{
Expand Down Expand Up @@ -745,6 +747,42 @@ internal void RenewActivityMessageFailed(TaskActivityWorkItem workItem, Exceptio
this.WriteStructuredLog(new LogEvents.RenewActivityMessageFailed(workItem, exception), exception);
}
}

/// <summary>
/// Logs that a "poison message" has been detected and is being dropped.
/// </summary>
/// <param name="orchestrationInstance">The orchestration instance this event was sent to.</param>
/// <param name="historyEvent">The "poisoned" event.</param>
/// <param name="details">Extra details related to the processing of this poison message.</param>
internal void PoisonMessageDetected(OrchestrationInstance? orchestrationInstance, HistoryEvent historyEvent, string details)
{
if (this.IsStructuredLoggingEnabled)
{
this.WriteStructuredLog(new LogEvents.PoisonMessageDetected(
orchestrationInstance,
Comment thread
sophiatev marked this conversation as resolved.
historyEvent.EventType.ToString(),
Utils.GetTaskEventId(historyEvent),
details));
}
Comment thread
sophiatev marked this conversation as resolved.
}

/// <summary>
/// Logs that a "poison" entity request message has been detected and is being dropped.
/// </summary>
/// <param name="orchestrationInstance">The orchestration instance this event was sent to.</param>
/// <param name="requestMessage">The "poisoned" reuest message.</param>
Comment thread
sophiatev marked this conversation as resolved.
Outdated
/// <param name="details">Extra details related to the processing of this poison message.</param>
internal void PoisonMessageDetected(OrchestrationInstance orchestrationInstance, RequestMessage requestMessage, string details)
{
if (this.IsStructuredLoggingEnabled)
{
this.WriteStructuredLog(new LogEvents.PoisonMessageDetected(
orchestrationInstance,
requestMessage.IsLockRequest ? "LockRequest" : "OperationRequest",
taskEventId: -1,
details));
}
}
#endregion

internal void OrchestrationDebugTrace(string instanceId, string executionId, string details)
Expand Down
Loading
Loading