Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ All notable changes to this project will be documented in this file.

## [Unreleased]

### Features
- New `ReSettings.EnableParallelRuleCompilation` (default `false`). When `true`, workflow rules are compiled in parallel during registration, materially reducing warmup time for workflows with many thousands of rules. Silently falls back to serial compilation when combined with `UseFastExpressionCompiler = true` (which regresses ~3× under contention) or for workflows below an internal scheduling-cost threshold. Builds on the warmup work in #740 (#741).

## [6.0.1-preview.2]

### Features
Expand Down
17 changes: 17 additions & 0 deletions src/RulesEngine/Models/ReSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ internal ReSettings(ReSettings reSettings)
UseFastExpressionCompiler = reSettings.UseFastExpressionCompiler;
EnableExceptionAsErrorMessageForRuleExpressionParsing = reSettings.EnableExceptionAsErrorMessageForRuleExpressionParsing;
AutoExecuteActions = reSettings.AutoExecuteActions;
EnableParallelRuleCompilation = reSettings.EnableParallelRuleCompilation;
}


Expand Down Expand Up @@ -98,6 +99,22 @@ internal ReSettings(ReSettings reSettings)
/// run actions yourself (e.g. via ExecuteActionWorkflowAsync) for selective control. See #596.
/// </summary>
public bool AutoExecuteActions { get; set; } = true;

/// <summary>
/// When true, rules within a workflow are compiled in parallel during registration.
/// Significantly reduces warmup time for workflows with many thousands of rules.
/// </summary>
/// <remarks>
/// Silently falls back to serial compilation when:
/// <list type="bullet">
/// <item><see cref="UseFastExpressionCompiler"/> is also <c>true</c> — FastExpressionCompiler
/// regresses ~3× under parallel contention, so the engine declines to parallelize that mix.</item>
/// <item>The workflow has fewer enabled rules than an internal threshold (32) where
/// the dispatch cost outweighs the speedup.</item>
/// </list>
/// Default: <c>false</c>.
/// </remarks>
public bool EnableParallelRuleCompilation { get; set; } = false;
}

public enum NestedRuleExecutionMode
Expand Down
42 changes: 40 additions & 2 deletions src/RulesEngine/RulesEngine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ public class RulesEngine : IRulesEngine
private readonly RuleCompiler _ruleCompiler;
private readonly ActionFactory _actionFactory;
private const string ParamParseRegex = "(\\$\\(.*?\\))";

// Below this rule count, Parallel.For's scheduling cost exceeds the speedup from
// distributing CompileRule across threads. See ReSettings.EnableParallelRuleCompilation.
private const int MinRulesForParallelCompilation = 32;
#endregion

#region Constructor
Expand Down Expand Up @@ -399,9 +403,43 @@ private bool RegisterRule(string workflowName, params RuleParameter[] ruleParams
_rulesCache.AddOrUpdateGlobalParamsDelegate(compileRulesKey, globalParamsDelegate);
}

foreach (var rule in workflow.Rules.Where(c => c.Enabled))
var enabledRules = workflow.Rules.Where(c => c.Enabled).ToArray();
var compiledFuncs = new RuleFunc<RuleResultTree>[enabledRules.Length];

// Parallel compilation helps only when:
// - the user opted in,
// - they're not also on UseFastExpressionCompiler (which regresses ~3× under
// parallel contention; FEC's internal locking serializes effort), and
// - there are enough rules to amortize Parallel.For's scheduling cost.
var shouldParallelize = _reSettings.EnableParallelRuleCompilation
&& !_reSettings.UseFastExpressionCompiler
&& enabledRules.Length >= MinRulesForParallelCompilation;

if (shouldParallelize)
{
try
{
Parallel.For(0, enabledRules.Length, i => {
compiledFuncs[i] = CompileRule(enabledRules[i], workflow.RuleExpressionType, ruleParams, globalParamExp);
});
}
catch (AggregateException ae) when (ae.InnerExceptions.Count > 0)
{
// Preserve the serial-compilation contract: the first rule that fails
// to compile surfaces its own exception, not an AggregateException.
System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(ae.InnerExceptions[0]).Throw();
}
}
else
{
for (var i = 0; i < enabledRules.Length; i++)
{
compiledFuncs[i] = CompileRule(enabledRules[i], workflow.RuleExpressionType, ruleParams, globalParamExp);
}
}
for (var i = 0; i < enabledRules.Length; i++)
{
dictFunc.Add(rule.RuleName, CompileRule(rule,workflow.RuleExpressionType, ruleParams, globalParamExp));
dictFunc.Add(enabledRules[i].RuleName, compiledFuncs[i]);
}

_rulesCache.AddOrUpdateCompiledRule(compileRulesKey, dictFunc);
Expand Down
117 changes: 117 additions & 0 deletions test/RulesEngine.UnitTest/ParallelRuleCompilationTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using RulesEngine.Models;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Threading.Tasks;
using Xunit;

namespace RulesEngine.UnitTest
{
[ExcludeFromCodeCoverage]
public class ParallelRuleCompilationTest
{
// Larger than the internal MinRulesForParallelCompilation threshold so parallel mode actually engages.
private const int RuleCount = 64;

private static Workflow BuildLargeWorkflow() => new Workflow
{
WorkflowName = "wf",
Rules = Enumerable.Range(0, RuleCount)
.Select(i => new Rule { RuleName = $"R{i}", Expression = $"input1 >= {i}" })
.ToArray()
};

[Fact]
public async Task ParallelCompilation_ProducesIdenticalResultsAsSerial()
{
var workflow = BuildLargeWorkflow();

var serialEngine = new RulesEngine(new[] { workflow },
new ReSettings { EnableParallelRuleCompilation = false });
var parallelEngine = new RulesEngine(new[] { workflow },
new ReSettings { EnableParallelRuleCompilation = true });

var serial = await serialEngine.ExecuteAllRulesAsync(
"wf", new[] { RuleParameter.Create("input1", 32) });
var parallel = await parallelEngine.ExecuteAllRulesAsync(
"wf", new[] { RuleParameter.Create("input1", 32) });

Assert.Equal(serial.Count, parallel.Count);
for (var i = 0; i < serial.Count; i++)
{
Assert.Equal(serial[i].Rule.RuleName, parallel[i].Rule.RuleName);
Assert.Equal(serial[i].IsSuccess, parallel[i].IsSuccess);
}
}

[Fact]
public async Task ParallelCompilation_PreservesExceptionMessage_WhenRuleExpressionThrows()
{
// Inject a deliberately broken rule into a large-enough workflow that the parallel
// path engages, then assert the per-rule ExceptionMessage explains the underlying
// failure rather than leaking an AggregateException.
var rules = Enumerable.Range(0, RuleCount)
.Select(i => new Rule { RuleName = $"R{i}", Expression = "input1 >= 0" })
.ToList();
rules[5].Expression = "input1.NoSuchMember.Foo()";

var workflow = new Workflow { WorkflowName = "wf", Rules = rules };
var engine = new RulesEngine(new[] { workflow },
new ReSettings { EnableParallelRuleCompilation = true });

var results = await engine.ExecuteAllRulesAsync(
"wf", new[] { RuleParameter.Create("input1", 1) });

var broken = results.Single(r => r.Rule.RuleName == "R5");
Assert.False(broken.IsSuccess);
Assert.False(string.IsNullOrEmpty(broken.ExceptionMessage));
Assert.DoesNotContain("AggregateException", broken.ExceptionMessage);
}

[Fact]
public async Task ParallelCompilation_FallsBackToSerial_WhenFastExpressionCompilerEnabled()
{
// The flag combination is permitted at construction time (back-compat), but the
// engine declines to parallelize and silently uses the serial path. We can only
// assert correctness here (results match serial). The fallback itself is
// observable in benchmarks, not in functional tests.
var engine = new RulesEngine(new[] { BuildLargeWorkflow() },
new ReSettings
{
EnableParallelRuleCompilation = true,
UseFastExpressionCompiler = true
});

var results = await engine.ExecuteAllRulesAsync(
"wf", new[] { RuleParameter.Create("input1", 100) });

Assert.Equal(RuleCount, results.Count);
Assert.All(results, r => Assert.True(r.IsSuccess));
}

[Fact]
public async Task ParallelCompilation_FallsBackToSerial_ForSmallWorkflowsBelowThreshold()
{
// Below the minimum threshold, the engine declines to parallelize. Verify a
// 5-rule workflow still works correctly with the flag enabled.
var workflow = new Workflow
{
WorkflowName = "wf",
Rules = Enumerable.Range(0, 5)
.Select(i => new Rule { RuleName = $"R{i}", Expression = $"input1 >= {i}" })
.ToArray()
};
var engine = new RulesEngine(new[] { workflow },
new ReSettings { EnableParallelRuleCompilation = true });

var results = await engine.ExecuteAllRulesAsync(
"wf", new[] { RuleParameter.Create("input1", 3) });

Assert.Equal(5, results.Count);
// input1=3 means R0..R3 succeed, R4 fails. Same outcome whether serial or parallel.
Assert.Equal(4, results.Count(r => r.IsSuccess));
}
}
}
Loading