Skip to content

Commit 3cc4781

Browse files
jnthntatumcopybara-github
authored andcommitted
Add reference implementation for async attribute resolution.
Wraps the basic evaluator to coordinate resolving relevant unknowns as they are identified then advancing evaluation. PiperOrigin-RevId: 510456989
1 parent 3651b3d commit 3cc4781

8 files changed

Lines changed: 595 additions & 0 deletions

runtime/src/main/java/dev/cel/runtime/CelVariableResolver.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,11 @@
1414

1515
package dev.cel.runtime;
1616

17+
import javax.annotation.concurrent.ThreadSafe;
1718
import java.util.Optional;
1819

1920
/** Functional interface that exposes a method to find a CEL variable value by name. */
21+
@ThreadSafe
2022
@FunctionalInterface
2123
public interface CelVariableResolver {
2224

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
// Copyright 2022 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package dev.cel.runtime.async;
16+
17+
import static com.google.common.collect.ImmutableMap.toImmutableMap;
18+
import static com.google.common.util.concurrent.Futures.immediateFailedFuture;
19+
import static com.google.common.util.concurrent.Futures.immediateFuture;
20+
import static com.google.common.util.concurrent.Futures.transformAsync;
21+
import static com.google.common.util.concurrent.Futures.whenAllSucceed;
22+
23+
import javax.annotation.concurrent.ThreadSafe;
24+
import com.google.common.collect.ImmutableMap;
25+
import com.google.common.collect.ImmutableSet;
26+
import com.google.common.util.concurrent.Futures;
27+
import com.google.common.util.concurrent.ListenableFuture;
28+
import com.google.common.util.concurrent.ListeningExecutorService;
29+
import dev.cel.runtime.CelAttribute;
30+
import dev.cel.runtime.CelAttributePattern;
31+
import dev.cel.runtime.CelEvaluationException;
32+
import dev.cel.runtime.CelRuntime.Program;
33+
import dev.cel.runtime.CelUnknownSet;
34+
import dev.cel.runtime.UnknownContext;
35+
import java.util.LinkedHashMap;
36+
import java.util.Map;
37+
import java.util.Optional;
38+
import java.util.concurrent.ExecutionException;
39+
40+
/**
41+
* Default runtime implementation for {@link CelAsyncRuntime.AsyncProgram}.
42+
*
43+
* <p>Control flow is:
44+
*
45+
* <ol>
46+
* <li>Evaluate the expression (synchronously), identifying unknowns or final result
47+
* <li>If unknowns identified, schedule lookups (calls to *Resolver.resolve) on the executor
48+
* <li>On any failure, return failed future. CEL errors can be introduced by the return value of
49+
* resolver.
50+
* <li>On all successful, generate new context for next round of evaluation on the executor
51+
* <li>repeat until a final (non-unknown) result, no progress is made, or iteration limit reached.
52+
* </ol>
53+
*/
54+
@ThreadSafe
55+
final class AsyncProgramImpl implements CelAsyncRuntime.AsyncProgram {
56+
// Safety limit for resolution rounds.
57+
private final int maxEvaluateIterations;
58+
private final Program program;
59+
private final ListeningExecutorService executor;
60+
private final ImmutableMap<CelAttributePattern, CelUnknownAttributeValueResolver> resolvers;
61+
62+
AsyncProgramImpl(
63+
Program program,
64+
ListeningExecutorService executor,
65+
ImmutableMap<CelAttributePattern, CelUnknownAttributeValueResolver> resolvers,
66+
int maxEvaluateIterations) {
67+
this.program = program;
68+
this.executor = executor;
69+
this.resolvers = resolvers;
70+
this.maxEvaluateIterations = maxEvaluateIterations;
71+
}
72+
73+
private Optional<CelUnknownAttributeValueResolver> lookupResolver(CelAttribute attribute) {
74+
// TODO: may need to handle multiple resolvers for partial case.
75+
for (Map.Entry<CelAttributePattern, CelUnknownAttributeValueResolver> entry :
76+
resolvers.entrySet()) {
77+
if (entry.getKey().isPartialMatch(attribute)) {
78+
return Optional.of(entry.getValue());
79+
}
80+
}
81+
return Optional.empty();
82+
}
83+
84+
private ListenableFuture<ImmutableMap<CelAttribute, Object>> allAsMapOnSuccess(
85+
Map<CelAttribute, ListenableFuture<Object>> futureMap) {
86+
return whenAllSucceed(futureMap.values())
87+
.call(
88+
() ->
89+
futureMap.entrySet().stream()
90+
.collect(
91+
toImmutableMap(
92+
Map.Entry::getKey,
93+
entry -> {
94+
try {
95+
return Futures.getDone(entry.getValue());
96+
} catch (ExecutionException e) {
97+
throw new AssertionError(
98+
"Futures.whenAllSucceed forwarded failed future", e);
99+
}
100+
})),
101+
executor);
102+
}
103+
104+
private ListenableFuture<Object> resolveAndReevaluate(
105+
CelUnknownSet unknowns, UnknownContext ctx, int iteration) {
106+
Map<CelAttribute, ListenableFuture<Object>> futureMap = new LinkedHashMap<>();
107+
for (CelAttribute attr : unknowns.attributes()) {
108+
Optional<CelUnknownAttributeValueResolver> maybeResolver = lookupResolver(attr);
109+
110+
maybeResolver.ifPresent((resolver) -> futureMap.put(attr, resolver.resolve(executor, attr)));
111+
}
112+
113+
if (futureMap.isEmpty()) {
114+
return immediateFailedFuture(
115+
new CelEvaluationException(
116+
String.format("Unknown resolution failed -- no resolvers for: %s", unknowns)));
117+
}
118+
119+
// TODO: lookup fails on any failure. Fine for prototyping, but this would likely
120+
// need to be configurable in the future.
121+
return transformAsync(
122+
allAsMapOnSuccess(futureMap),
123+
(result) -> evalPass(ctx.withResolvedAttributes(result), unknowns, iteration),
124+
executor);
125+
}
126+
127+
private ListenableFuture<Object> evalPass(
128+
UnknownContext ctx, CelUnknownSet lastSet, int iteration) {
129+
Object result = null;
130+
try {
131+
result = program.advanceEvaluation(ctx);
132+
} catch (CelEvaluationException e) {
133+
return immediateFailedFuture(e);
134+
}
135+
if (result instanceof CelUnknownSet) {
136+
if (result.equals(lastSet)) {
137+
return immediateFailedFuture(
138+
new CelEvaluationException("No progress in iterative eval. Last result: " + result));
139+
}
140+
// Don't handle unknowns if next evaluation would exceed eval limit.
141+
iteration++;
142+
if (iteration >= maxEvaluateIterations) {
143+
return immediateFailedFuture(
144+
new CelEvaluationException("Max Evaluation iterations exceeded: " + iteration));
145+
}
146+
return resolveAndReevaluate((CelUnknownSet) result, ctx, iteration);
147+
}
148+
149+
return immediateFuture(result);
150+
}
151+
152+
@Override
153+
public ListenableFuture<Object> evaluateToCompletion(UnknownContext ctx) {
154+
return evalPass(ctx, CelUnknownSet.create(ImmutableSet.of()), 0);
155+
}
156+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# Reference implementation for an Async evaluator for the CEL runtime.
2+
3+
package(default_visibility = [
4+
"//runtime/src/main/java/dev/cel/runtime/async:__pkg__",
5+
])
6+
7+
ASYNC_RUNTIME_SOURCES = [
8+
"AsyncProgramImpl.java",
9+
"CelAsyncRuntimeImpl.java",
10+
"CelAsyncRuntime.java",
11+
"CelAsyncRuntimeBuilder.java",
12+
"CelAsyncRuntimeFactory.java",
13+
"CelUnknownAttributeValueResolver.java",
14+
]
15+
16+
java_library(
17+
name = "async",
18+
srcs = ASYNC_RUNTIME_SOURCES,
19+
deps = [
20+
"//:auto_value",
21+
"//common/src/main/java/dev/cel/common",
22+
"//runtime/src/main/java/dev/cel/runtime",
23+
"//runtime/src/main/java/dev/cel/runtime:unknown_attributes",
24+
"@maven//:com_google_code_findbugs_annotations",
25+
"@maven//:com_google_errorprone_error_prone_annotations",
26+
"@maven//:com_google_guava_guava",
27+
],
28+
)
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
// Copyright 2022 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package dev.cel.runtime.async;
16+
17+
import javax.annotation.concurrent.ThreadSafe;
18+
import com.google.common.util.concurrent.ListenableFuture;
19+
import dev.cel.common.CelAbstractSyntaxTree;
20+
import dev.cel.runtime.CelEvaluationException;
21+
import dev.cel.runtime.UnknownContext;
22+
23+
/**
24+
* CelAsyncRuntime provides configuration for an async evaluation context.
25+
*
26+
* <p>This is a factory for two primitives:
27+
*
28+
* <ul>
29+
* <li>{@link UnknownContext} manages the state necessary for the intial round of iterative
30+
* evaluation. The AsyncProgram implemenation may use its .withX methods to generate an
31+
* updated context for later rounds.
32+
* <li>{@link AsyncProgram} provides an evaluation manager to automate evaluating and resolving
33+
* unknown data
34+
* </ul>
35+
*/
36+
@ThreadSafe
37+
public interface CelAsyncRuntime {
38+
39+
/**
40+
* Initialize a new async context for iterative evaluation.
41+
*
42+
* <p>This maintains the state related to tracking which parts of the environment are unknown or
43+
* have been resolved.
44+
*/
45+
UnknownContext newAsyncContext();
46+
47+
/** AsyncProgram wraps a CEL Program with a driver to resolve unknowns as they are encountered. */
48+
interface AsyncProgram {
49+
ListenableFuture<Object> evaluateToCompletion(UnknownContext ctx);
50+
}
51+
52+
/**
53+
* Creates an {@link AsyncProgram} for the given AST.
54+
*
55+
* @param ast the input CEL expression. must be type checked.
56+
*/
57+
AsyncProgram createProgram(CelAbstractSyntaxTree ast) throws CelEvaluationException;
58+
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
// Copyright 2022 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package dev.cel.runtime.async;
16+
17+
import com.google.errorprone.annotations.CanIgnoreReturnValue;
18+
import dev.cel.runtime.CelAttributePattern;
19+
import dev.cel.runtime.CelRuntime;
20+
import dev.cel.runtime.CelVariableResolver;
21+
import java.util.concurrent.ExecutorService;
22+
23+
/** Builder interface for {@link CelAsyncRuntime}. */
24+
public interface CelAsyncRuntimeBuilder {
25+
public static final int DEFAULT_MAX_EVALUATE_ITERATIONS = 10;
26+
27+
/** Set the CEL runtime for running incremental evaluation. */
28+
@CanIgnoreReturnValue
29+
public CelAsyncRuntimeBuilder setRuntime(CelRuntime runtime);
30+
31+
/** Add attributes that are declared as Unknown, without any resolver. */
32+
@CanIgnoreReturnValue
33+
public CelAsyncRuntimeBuilder addUnknownAttributePatterns(CelAttributePattern... attributes);
34+
35+
/** Marks an attribute pattern as unknown and associates a resolver with it. */
36+
@CanIgnoreReturnValue
37+
public CelAsyncRuntimeBuilder addResolvableAttributePattern(
38+
CelAttributePattern attribute, CelUnknownAttributeValueResolver resolver);
39+
40+
/**
41+
* Set the maximum number of allowed evaluation passes.
42+
*
43+
* <p>This is a safety mechanism for expressions that chain dependent unknowns (e.g. via the
44+
* conditional operator or nested function calls).
45+
*
46+
* <p>Implementations should default to {@value DEFAULT_MAX_EVALUATION_ITERATIONS}.
47+
*/
48+
@CanIgnoreReturnValue
49+
public CelAsyncRuntimeBuilder setMaxEvaluateIterations(int n);
50+
51+
/**
52+
* Sets the variable resolver for simple CelVariable names (e.g. 'x' or 'com.google.x').
53+
*
54+
* <p>This is consulted after checking for unknown or resolved attributes. It represents any data
55+
* about the environment that does not need any special resolution.
56+
*/
57+
@CanIgnoreReturnValue
58+
CelAsyncRuntimeBuilder setVariableResolver(CelVariableResolver variableResolver);
59+
60+
/**
61+
* Sets the executorService for generated AsyncPrograms.
62+
*
63+
* <p>The executor is used for handling simple transformations of the resolved data and for
64+
* scheduling subsequent rounds of evaluation. If synchronous style resolvers are provided, they
65+
* will be run on the same executor.
66+
*
67+
* <p>Explicitly setting an executor is mandatory. Pick an appropriate executor for the configured
68+
* resolvers.
69+
*/
70+
@CanIgnoreReturnValue
71+
public CelAsyncRuntimeBuilder setExecutorService(ExecutorService executorService);
72+
73+
public CelAsyncRuntime build();
74+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
// Copyright 2023 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package dev.cel.runtime.async;
16+
17+
/** Factory for default {@link CelAsyncRuntime}. */
18+
public final class CelAsyncRuntimeFactory {
19+
public static CelAsyncRuntimeBuilder defaultAsyncRuntime() {
20+
return CelAsyncRuntimeImpl.newBuilder();
21+
}
22+
23+
private CelAsyncRuntimeFactory() {}
24+
}

0 commit comments

Comments
 (0)