Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 113 additions & 16 deletions src/WorkflowCore.DSL/Services/DefinitionLoader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class DefinitionLoader : IDefinitionLoader
// ParsingConfig to allow access to commonly used .NET methods like object.Equals
private static readonly ParsingConfig ParsingConfig = new ParsingConfig
{
AllowNewToEvaluateAnyType = true,
AllowNewToEvaluateAnyType = false,
AreContextKeywordsEnabled = true
};

Expand Down Expand Up @@ -62,7 +62,15 @@ public DefinitionLoader(IWorkflowRegistry registry, ITypeResolver typeResolver)

public WorkflowDefinition LoadDefinition(string source, Func<string, DefinitionSourceV1> deserializer)
{
if (string.IsNullOrWhiteSpace(source))
throw new ArgumentNullException(nameof(source));
if (deserializer == null)
throw new ArgumentNullException(nameof(deserializer));

var sourceObj = deserializer(source);
if (sourceObj == null)
throw new InvalidOperationException("Deserialization returned null.");

var def = Convert(sourceObj);
_registry.RegisterWorkflow(def);
return def;
Expand Down Expand Up @@ -110,27 +118,43 @@ private WorkflowStepCollection ConvertSteps(ICollection<StepSourceV1> source, Ty
{
containerType = typeof(WorkflowStep<>).MakeGenericType(stepType);

targetStep = (containerType.GetConstructor(new Type[] { }).Invoke(null) as WorkflowStep);
var ctor = containerType.GetConstructor(new Type[] { });
if (ctor == null)
throw new InvalidOperationException($"Type '{containerType.FullName}' does not have a parameterless constructor.");
targetStep = ctor.Invoke(null) as WorkflowStep;
}
else
{
targetStep = stepType.GetConstructor(new Type[] { }).Invoke(null) as WorkflowStep;
var ctor = stepType.GetConstructor(new Type[] { });
if (ctor == null)
throw new InvalidOperationException($"Type '{stepType.FullName}' does not have a parameterless constructor.");
targetStep = ctor.Invoke(null) as WorkflowStep;
if (targetStep != null)
stepType = targetStep.BodyType;
}

if (nextStep.Saga)
{
containerType = typeof(SagaContainer<>).MakeGenericType(stepType);
targetStep = (containerType.GetConstructor(new Type[] { }).Invoke(null) as WorkflowStep);
var sagaCtor = containerType.GetConstructor(new Type[] { });
if (sagaCtor == null)
throw new InvalidOperationException($"Type '{containerType.FullName}' does not have a parameterless constructor.");
targetStep = sagaCtor.Invoke(null) as WorkflowStep;
}

if (!string.IsNullOrEmpty(nextStep.CancelCondition))
{
var cancelExprType = typeof(Expression<>).MakeGenericType(typeof(Func<,>).MakeGenericType(dataType, typeof(bool)));
var dataParameter = Expression.Parameter(dataType, "data");
var cancelExpr = DynamicExpressionParser.ParseLambda(ParsingConfig, false, new[] { dataParameter }, typeof(bool), TransformExpression(nextStep.CancelCondition));
targetStep.CancelCondition = cancelExpr;
try
{
var cancelExpr = DynamicExpressionParser.ParseLambda(ParsingConfig, false, new[] { dataParameter }, typeof(bool), TransformExpression(nextStep.CancelCondition));
targetStep.CancelCondition = cancelExpr;
}
catch (Exception ex) when (ex is System.Linq.Dynamic.Core.Exceptions.ParseException || ex is InvalidOperationException)
{
throw new WorkflowDefinitionLoadException($"Error parsing cancel condition expression '{nextStep.CancelCondition}' for step '{nextStep.Id}': {ex.Message}", ex);
}
}

targetStep.Id = i;
Expand Down Expand Up @@ -252,7 +276,15 @@ private void AttachOutputs(StepSourceV1 source, Type dataType, Type stepType, Wo
foreach (var output in source.Outputs)
{
var stepParameter = Expression.Parameter(stepType, "step");
var sourceExpr = DynamicExpressionParser.ParseLambda(ParsingConfig, false, new[] { stepParameter }, typeof(object), TransformExpression(output.Value));
LambdaExpression sourceExpr;
try
{
sourceExpr = DynamicExpressionParser.ParseLambda(ParsingConfig, false, new[] { stepParameter }, typeof(object), TransformExpression(output.Value));
}
catch (Exception ex) when (ex is System.Linq.Dynamic.Core.Exceptions.ParseException || ex is InvalidOperationException)
{
throw new WorkflowDefinitionLoadException($"Error parsing output expression '{output.Value}': {ex.Message}", ex);
}

var dataParameter = Expression.Parameter(dataType, "data");

Expand Down Expand Up @@ -288,7 +320,15 @@ private void AttachDirectlyOutput(KeyValuePair<string, string> output, WorkflowS

Action<IStepBody, object> acn = (pStep, pData) =>
{
object resolvedValue = sourceExpr.Compile().DynamicInvoke(pStep); ;
object resolvedValue;
try
{
resolvedValue = sourceExpr.Compile().DynamicInvoke(pStep);
}
catch (TargetInvocationException ex)
{
throw new InvalidOperationException($"Error evaluating expression for output property.", ex.InnerException ?? ex);
}
propertyInfo.SetValue(pData, resolvedValue, new object[] { output.Key });
};

Expand Down Expand Up @@ -340,8 +380,24 @@ private void AttachNestedOutput(KeyValuePair<string, string> output, WorkflowSte
Action<IStepBody, object> acn = (pStep, pData) =>
{
var targetExpr = Expression.Lambda(memberExpression, dataParameter);
object data = targetExpr.Compile().DynamicInvoke(pData);
object resolvedValue = sourceExpr.Compile().DynamicInvoke(pStep); ;
object data;
try
{
data = targetExpr.Compile().DynamicInvoke(pData);
}
catch (TargetInvocationException ex)
{
throw new InvalidOperationException($"Error evaluating expression for nested output property.", ex.InnerException ?? ex);
}
object resolvedValue;
try
{
resolvedValue = sourceExpr.Compile().DynamicInvoke(pStep);
}
catch (TargetInvocationException ex)
{
throw new InvalidOperationException($"Error evaluating expression for nested output property.", ex.InnerException ?? ex);
}
propertyInfo.SetValue(data, resolvedValue, new object[] { items[1] });
};

Expand All @@ -354,7 +410,7 @@ private void AttachNestedOutput(KeyValuePair<string, string> output, WorkflowSte
{
targetProperty = Expression.Property(targetProperty, propertyName);
}
catch
catch (ArgumentException)
{
targetProperty = null;
break;
Expand All @@ -379,7 +435,15 @@ private void AttachOutcomes(StepSourceV1 source, Type dataType, WorkflowStep ste

foreach (var nextStep in source.SelectNextStep)
{
var sourceDelegate = DynamicExpressionParser.ParseLambda(ParsingConfig, false, new[] { dataParameter, outcomeParameter }, typeof(object), TransformExpression(nextStep.Value)).Compile();
Delegate sourceDelegate;
try
{
sourceDelegate = DynamicExpressionParser.ParseLambda(ParsingConfig, false, new[] { dataParameter, outcomeParameter }, typeof(object), TransformExpression(nextStep.Value)).Compile();
}
catch (Exception ex) when (ex is System.Linq.Dynamic.Core.Exceptions.ParseException || ex is InvalidOperationException)
{
throw new WorkflowDefinitionLoadException($"Error parsing select next step expression '{nextStep.Value}': {ex.Message}", ex);
}
Expression<Func<object, object, bool>> sourceExpr = (data, outcome) => System.Convert.ToBoolean(sourceDelegate.DynamicInvoke(data, outcome));
step.Outcomes.Add(new ExpressionOutcome<object>(sourceExpr)
{
Expand All @@ -396,13 +460,38 @@ private Type FindType(string name)
private static Action<IStepBody, object, IStepExecutionContext> BuildScalarInputAction(KeyValuePair<string, object> input, ParameterExpression dataParameter, ParameterExpression contextParameter, ParameterExpression environmentVarsParameter, PropertyInfo stepProperty)
{
var expr = System.Convert.ToString(input.Value);
var sourceExpr = DynamicExpressionParser.ParseLambda(ParsingConfig, false, new[] { dataParameter, contextParameter, environmentVarsParameter }, typeof(object), TransformExpression(expr));
LambdaExpression sourceExpr;
try
{
sourceExpr = DynamicExpressionParser.ParseLambda(ParsingConfig, false, new[] { dataParameter, contextParameter, environmentVarsParameter }, typeof(object), TransformExpression(expr));
}
catch (Exception ex) when (ex is System.Linq.Dynamic.Core.Exceptions.ParseException || ex is InvalidOperationException)
{
throw new WorkflowDefinitionLoadException($"Error parsing input expression '{expr}' for property '{input.Key}': {ex.Message}", ex);
}

void acn(IStepBody pStep, object pData, IStepExecutionContext pContext)
{
object resolvedValue = sourceExpr.Compile().DynamicInvoke(pData, pContext, Environment.GetEnvironmentVariables());
object resolvedValue;
try
{
resolvedValue = sourceExpr.Compile().DynamicInvoke(pData, pContext, Environment.GetEnvironmentVariables());
}
catch (TargetInvocationException ex)
{
throw new InvalidOperationException($"Error evaluating expression for step property.", ex.InnerException ?? ex);
}
if (stepProperty.PropertyType.IsEnum)
stepProperty.SetValue(pStep, Enum.Parse(stepProperty.PropertyType, (string)resolvedValue, true));
{
try
{
stepProperty.SetValue(pStep, Enum.Parse(stepProperty.PropertyType, resolvedValue?.ToString() ?? string.Empty, true));
}
catch (ArgumentException ex)
{
throw new InvalidOperationException($"Invalid enum value '{resolvedValue}' for property '{stepProperty.Name}' of type '{stepProperty.PropertyType.Name}'.", ex);
}
}
else
{
if ((resolvedValue != null) && (stepProperty.PropertyType.IsAssignableFrom(resolvedValue.GetType())))
Expand Down Expand Up @@ -430,7 +519,15 @@ void acn(IStepBody pStep, object pData, IStepExecutionContext pContext)
if (prop.Name.StartsWith("@"))
{
var sourceExpr = DynamicExpressionParser.ParseLambda(ParsingConfig, false, new[] { dataParameter, contextParameter, environmentVarsParameter }, typeof(object), TransformExpression(prop.Value.ToString()));
object resolvedValue = sourceExpr.Compile().DynamicInvoke(pData, pContext, Environment.GetEnvironmentVariables());
object resolvedValue;
try
{
resolvedValue = sourceExpr.Compile().DynamicInvoke(pData, pContext, Environment.GetEnvironmentVariables());
}
catch (TargetInvocationException ex)
{
throw new InvalidOperationException($"Error evaluating expression for step property.", ex.InnerException ?? ex);
}
subobj.Remove(prop.Name);
subobj.Add(prop.Name.TrimStart('@'), JToken.FromObject(resolvedValue));
}
Expand Down
3 changes: 2 additions & 1 deletion src/WorkflowCore.DSL/Services/Deserializers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ public static class Deserializers
{
private static Serializer yamlSerializer = new Serializer();

public static Func<string, DefinitionSourceV1> Json = (source) => JsonConvert.DeserializeObject<DefinitionSourceV1>(source);
public static Func<string, DefinitionSourceV1> Json = (source) =>
JsonConvert.DeserializeObject<DefinitionSourceV1>(source, new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.None });

public static Func<string, DefinitionSourceV1> Yaml = (source) => yamlSerializer.DeserializeInto(source, new DefinitionSourceV1());
}
Expand Down
9 changes: 8 additions & 1 deletion src/WorkflowCore.DSL/Services/TypeResolver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,14 @@ public class TypeResolver : ITypeResolver
{
public Type FindType(string name)
{
return Type.GetType(name, true, true);
try
{
return Type.GetType(name, true, true);
}
catch (TypeLoadException ex)
{
throw new InvalidOperationException($"Could not resolve type '{name}'. Ensure the type exists and the assembly is referenced.", ex);
}
}
}
}
14 changes: 8 additions & 6 deletions src/WorkflowCore.Testing/JsonWorkflowTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public abstract class JsonWorkflowTest : IDisposable
protected IDefinitionLoader DefinitionLoader;
protected IWorkflowRegistry Registry;
protected List<StepError> UnhandledStepErrors = new List<StepError>();
private ServiceProvider _serviceProvider;

protected virtual void Setup()
{
Expand All @@ -25,16 +26,16 @@ protected virtual void Setup()
services.AddLogging();
ConfigureServices(services);

var serviceProvider = services.BuildServiceProvider();
_serviceProvider = services.BuildServiceProvider();

//config logging
var loggerFactory = serviceProvider.GetService<ILoggerFactory>();
var loggerFactory = _serviceProvider.GetService<ILoggerFactory>();
//loggerFactory.AddConsole(LogLevel.Debug);

PersistenceProvider = serviceProvider.GetService<IPersistenceProvider>();
DefinitionLoader = serviceProvider.GetService<IDefinitionLoader>();
Registry = serviceProvider.GetService<IWorkflowRegistry>();
Host = serviceProvider.GetService<IWorkflowHost>();
PersistenceProvider = _serviceProvider.GetService<IPersistenceProvider>();
DefinitionLoader = _serviceProvider.GetService<IDefinitionLoader>();
Registry = _serviceProvider.GetService<IWorkflowRegistry>();
Host = _serviceProvider.GetService<IWorkflowHost>();
Host.OnStepError += Host_OnStepError;
Host.Start();
}
Expand Down Expand Up @@ -104,6 +105,7 @@ protected TData GetData<TData>(string workflowId)
public void Dispose()
{
Host.Stop();
_serviceProvider?.Dispose();
}
}

Expand Down
8 changes: 5 additions & 3 deletions src/WorkflowCore.Testing/WorkflowTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public abstract class WorkflowTest<TWorkflow, TData> : IDisposable
protected IWorkflowHost Host;
protected IPersistenceProvider PersistenceProvider;
protected List<StepError> UnhandledStepErrors = new List<StepError>();
private ServiceProvider _serviceProvider;

protected virtual void Setup()
{
Expand All @@ -25,10 +26,10 @@ protected virtual void Setup()
services.AddLogging();
ConfigureServices(services);

var serviceProvider = services.BuildServiceProvider();
_serviceProvider = services.BuildServiceProvider();

PersistenceProvider = serviceProvider.GetService<IPersistenceProvider>();
Host = serviceProvider.GetService<IWorkflowHost>();
PersistenceProvider = _serviceProvider.GetService<IPersistenceProvider>();
Host = _serviceProvider.GetService<IWorkflowHost>();
Host.RegisterWorkflow<TWorkflow, TData>();
Host.OnStepError += Host_OnStepError;
Host.Start();
Expand Down Expand Up @@ -119,6 +120,7 @@ protected TData GetData(string workflowId)
public void Dispose()
{
Host.Stop();
_serviceProvider?.Dispose();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,10 @@ public WorkflowDefinitionLoadException(string message)
: base (message)
{
}

public WorkflowDefinitionLoadException(string message, Exception innerException)
: base(message, innerException)
{
}
}
}
11 changes: 8 additions & 3 deletions src/WorkflowCore/Services/ActivityController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,18 @@ public async Task<PendingActivity> GetPendingActivity(string activityName, strin
var endTime = _dateTimeProvider.UtcNow.Add(timeout ?? TimeSpan.Zero);
var firstPass = true;
EventSubscription subscription = null;
bool lockAcquired = false;
while ((subscription == null && _dateTimeProvider.UtcNow < endTime) || firstPass)
{
if (!firstPass)
await Task.Delay(100);
subscription = await _subscriptionRepository.GetFirstOpenSubscription(Event.EventTypeActivity, activityName, _dateTimeProvider.UtcNow);
if (subscription != null)
if (!await _lockProvider.AcquireLock($"sub:{subscription.Id}", CancellationToken.None))
{
lockAcquired = await _lockProvider.AcquireLock($"sub:{subscription.Id}", CancellationToken.None);
if (!lockAcquired)
subscription = null;
}
firstPass = false;
}
if (subscription == null)
Expand All @@ -51,7 +55,7 @@ public async Task<PendingActivity> GetPendingActivity(string activityName, strin
Token = token.Encode(),
ActivityName = subscription.EventKey,
Parameters = subscription.SubscriptionData,
TokenExpiry = new DateTime(DateTime.MaxValue.Ticks, DateTimeKind.Utc)
TokenExpiry = DateTime.SpecifyKind(DateTime.MaxValue, DateTimeKind.Utc)
};

if (!await _subscriptionRepository.SetSubscriptionToken(subscription.Id, result.Token, workerId, result.TokenExpiry))
Expand All @@ -61,7 +65,8 @@ public async Task<PendingActivity> GetPendingActivity(string activityName, strin
}
finally
{
await _lockProvider.ReleaseLock($"sub:{subscription.Id}");
if (lockAcquired)
await _lockProvider.ReleaseLock($"sub:{subscription.Id}");
}

}
Expand Down
Loading