diff --git a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/AbstractJavaCallExecutor.java b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/AbstractJavaCallExecutor.java new file mode 100644 index 000000000..99e464d72 --- /dev/null +++ b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/AbstractJavaCallExecutor.java @@ -0,0 +1,65 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.func; + +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowModelFactory; +import io.serverlessworkflow.impl.executors.CallableTask; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +public abstract class AbstractJavaCallExecutor implements CallableTask { + + protected final Optional> inputClass; + + protected AbstractJavaCallExecutor() { + this(Optional.empty()); + } + + protected AbstractJavaCallExecutor(Optional> inputClass) { + this.inputClass = inputClass; + } + + @Override + public CompletableFuture apply( + WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) { + Object result = callJavaFunction(workflowContext, taskContext, model2Input(input)); + WorkflowModelFactory modelFactory = workflowContext.definition().application().modelFactory(); + return result instanceof CompletableFuture future + ? future.thenApply(v -> output2Model(modelFactory, input, v)) + : CompletableFuture.completedFuture(output2Model(modelFactory, input, result)); + } + + protected abstract Object callJavaFunction( + WorkflowContext workflowContext, TaskContext taskContext, T input); + + protected T model2Input(WorkflowModel model) { + return JavaFuncUtils.convertT(model, inputClass); + } + + protected Object convertResponse(Object obj) { + return obj == null + ? null + : DataTypeConverterRegistry.get().find(obj.getClass()).map(c -> c.apply(obj)).orElse(obj); + } + + protected WorkflowModel output2Model( + WorkflowModelFactory modelFactory, WorkflowModel input, Object result) { + return modelFactory.fromAny(input, convertResponse(result)); + } +} diff --git a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/DataTypeConverter.java b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/DataTypeConverter.java new file mode 100644 index 000000000..0e322c020 --- /dev/null +++ b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/DataTypeConverter.java @@ -0,0 +1,23 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.func; + +import io.serverlessworkflow.impl.ServicePriority; +import java.util.function.Function; + +public interface DataTypeConverter extends Function, ServicePriority { + Class sourceType(); +} diff --git a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/DataTypeConverterRegistry.java b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/DataTypeConverterRegistry.java new file mode 100644 index 000000000..ea97e89e7 --- /dev/null +++ b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/DataTypeConverterRegistry.java @@ -0,0 +1,75 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.func; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.ServiceLoader; +import java.util.concurrent.ConcurrentHashMap; + +public class DataTypeConverterRegistry { + + private static final DataTypeConverterRegistry instance = new DataTypeConverterRegistry(); + + public static DataTypeConverterRegistry get() { + return instance; + } + + @SuppressWarnings("rawtypes") + private final Iterable converters; + + @SuppressWarnings("rawtypes") + private final Map, Optional> convertersMap; + + private DataTypeConverterRegistry() { + this.converters = ServiceLoader.load(DataTypeConverter.class); + this.convertersMap = new ConcurrentHashMap<>(); + } + + @SuppressWarnings("rawtypes") + public Optional find(Class clazz) { + return convertersMap.computeIfAbsent(clazz, this::searchConverter); + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + private Optional searchConverter(Class clazz) { + List candidates = new ArrayList<>(); + for (DataTypeConverter converter : converters) { + if (converter.sourceType().equals(clazz)) { + candidates.add(converter); + } + } + if (!candidates.isEmpty()) { + return first(candidates); + } + + for (DataTypeConverter converter : converters) { + if (converter.sourceType().isAssignableFrom(clazz)) { + candidates.add(converter); + } + } + return candidates.isEmpty() ? Optional.empty() : first(candidates); + } + + @SuppressWarnings("rawtypes") + private Optional first(List candidates) { + Collections.sort(candidates); + return Optional.of(candidates.get(0)); + } +} diff --git a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaConsumerCallExecutor.java b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaConsumerCallExecutor.java index 91b912ff5..0d28cd392 100644 --- a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaConsumerCallExecutor.java +++ b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaConsumerCallExecutor.java @@ -15,47 +15,29 @@ */ package io.serverlessworkflow.impl.executors.func; -import io.serverlessworkflow.api.types.TaskBase; -import io.serverlessworkflow.api.types.func.CallJava; import io.serverlessworkflow.impl.TaskContext; import io.serverlessworkflow.impl.WorkflowContext; -import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowModel; -import io.serverlessworkflow.impl.WorkflowMutablePosition; import io.serverlessworkflow.impl.executors.CallableTask; -import io.serverlessworkflow.impl.executors.CallableTaskBuilder; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; -public class JavaConsumerCallExecutor - implements CallableTaskBuilder> { +public class JavaConsumerCallExecutor implements CallableTask { - private Consumer consumer; - private Optional> inputClass; + private final Optional> inputClass; + private final Consumer consumer; - public void init( - CallJava.CallJavaConsumer task, - WorkflowDefinition definition, - WorkflowMutablePosition position) { - consumer = task.consumer(); - inputClass = task.inputClass(); + public JavaConsumerCallExecutor(Optional> inputClass, Consumer consumer) { + this.inputClass = inputClass; + this.consumer = consumer; } - private CompletableFuture apply( + @Override + public CompletableFuture apply( WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) { T typed = JavaFuncUtils.convertT(input, inputClass); consumer.accept(typed); return CompletableFuture.completedFuture(input); } - - @Override - public boolean accept(Class clazz) { - return CallJava.CallJavaConsumer.class.isAssignableFrom(clazz); - } - - @Override - public CallableTask build() { - return this::apply; - } } diff --git a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaConsumerCallExecutorBuilder.java b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaConsumerCallExecutorBuilder.java new file mode 100644 index 000000000..379f5127a --- /dev/null +++ b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaConsumerCallExecutorBuilder.java @@ -0,0 +1,50 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.func; + +import io.serverlessworkflow.api.types.TaskBase; +import io.serverlessworkflow.api.types.func.CallJava; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowMutablePosition; +import io.serverlessworkflow.impl.executors.CallableTask; +import io.serverlessworkflow.impl.executors.CallableTaskBuilder; +import java.util.Optional; +import java.util.function.Consumer; + +public class JavaConsumerCallExecutorBuilder + implements CallableTaskBuilder> { + + private Consumer consumer; + private Optional> inputClass; + + public void init( + CallJava.CallJavaConsumer task, + WorkflowDefinition definition, + WorkflowMutablePosition position) { + consumer = task.consumer(); + inputClass = task.inputClass(); + } + + @Override + public boolean accept(Class clazz) { + return CallJava.CallJavaConsumer.class.isAssignableFrom(clazz); + } + + @Override + public CallableTask build() { + return new JavaConsumerCallExecutor(inputClass, consumer); + } +} diff --git a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaContextFunctionCallExecutor.java b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaContextFunctionCallExecutor.java index e59d9def9..beeb94b16 100644 --- a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaContextFunctionCallExecutor.java +++ b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaContextFunctionCallExecutor.java @@ -15,53 +15,24 @@ */ package io.serverlessworkflow.impl.executors.func; -import io.serverlessworkflow.api.types.TaskBase; -import io.serverlessworkflow.api.types.func.CallJava; -import io.serverlessworkflow.api.types.func.CallJava.CallJavaContextFunction; import io.serverlessworkflow.api.types.func.JavaContextFunction; import io.serverlessworkflow.impl.TaskContext; import io.serverlessworkflow.impl.WorkflowContext; -import io.serverlessworkflow.impl.WorkflowDefinition; -import io.serverlessworkflow.impl.WorkflowModel; -import io.serverlessworkflow.impl.WorkflowMutablePosition; -import io.serverlessworkflow.impl.executors.CallableTask; -import io.serverlessworkflow.impl.executors.CallableTaskBuilder; import java.util.Optional; -import java.util.concurrent.CompletableFuture; -public class JavaContextFunctionCallExecutor - implements CallableTaskBuilder> { +public class JavaContextFunctionCallExecutor extends AbstractJavaCallExecutor { - private JavaContextFunction function; - private Optional> inputClass; + private final JavaContextFunction function; - @Override - public boolean accept(Class clazz) { - return CallJava.CallJavaContextFunction.class.isAssignableFrom(clazz); - } - - @Override - public void init( - CallJavaContextFunction task, - WorkflowDefinition definition, - WorkflowMutablePosition position) { - this.function = task.function(); - this.inputClass = task.inputClass(); + public JavaContextFunctionCallExecutor( + Optional> inputClass, JavaContextFunction function) { + super(inputClass); + this.function = function; } @Override - public CallableTask build() { - return this::apply; - } - - private CompletableFuture apply( - WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) { - return CompletableFuture.completedFuture( - workflowContext - .definition() - .application() - .modelFactory() - .fromAny( - input, function.apply(JavaFuncUtils.convertT(input, inputClass), workflowContext))); + protected Object callJavaFunction( + WorkflowContext workflowContext, TaskContext taskContext, T input) { + return function.apply(input, workflowContext); } } diff --git a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaContextFunctionCallExecutorBuilder.java b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaContextFunctionCallExecutorBuilder.java new file mode 100644 index 000000000..e115141b3 --- /dev/null +++ b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaContextFunctionCallExecutorBuilder.java @@ -0,0 +1,52 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.func; + +import io.serverlessworkflow.api.types.TaskBase; +import io.serverlessworkflow.api.types.func.CallJava; +import io.serverlessworkflow.api.types.func.CallJava.CallJavaContextFunction; +import io.serverlessworkflow.api.types.func.JavaContextFunction; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowMutablePosition; +import io.serverlessworkflow.impl.executors.CallableTask; +import io.serverlessworkflow.impl.executors.CallableTaskBuilder; +import java.util.Optional; + +public class JavaContextFunctionCallExecutorBuilder + implements CallableTaskBuilder> { + + protected JavaContextFunction function; + protected Optional> inputClass; + + @Override + public boolean accept(Class clazz) { + return CallJava.CallJavaContextFunction.class.isAssignableFrom(clazz); + } + + @Override + public void init( + CallJavaContextFunction task, + WorkflowDefinition definition, + WorkflowMutablePosition position) { + this.function = task.function(); + this.inputClass = task.inputClass(); + } + + @Override + public CallableTask build() { + return new JavaContextFunctionCallExecutor(inputClass, function); + } +} diff --git a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaFilterFunctionCallExecutor.java b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaFilterFunctionCallExecutor.java index c60f6cb68..e0272c5c3 100644 --- a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaFilterFunctionCallExecutor.java +++ b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaFilterFunctionCallExecutor.java @@ -15,55 +15,24 @@ */ package io.serverlessworkflow.impl.executors.func; -import io.serverlessworkflow.api.types.TaskBase; -import io.serverlessworkflow.api.types.func.CallJava; -import io.serverlessworkflow.api.types.func.CallJava.CallJavaFilterFunction; import io.serverlessworkflow.api.types.func.JavaFilterFunction; import io.serverlessworkflow.impl.TaskContext; import io.serverlessworkflow.impl.WorkflowContext; -import io.serverlessworkflow.impl.WorkflowDefinition; -import io.serverlessworkflow.impl.WorkflowModel; -import io.serverlessworkflow.impl.WorkflowMutablePosition; -import io.serverlessworkflow.impl.executors.CallableTask; -import io.serverlessworkflow.impl.executors.CallableTaskBuilder; import java.util.Optional; -import java.util.concurrent.CompletableFuture; -public class JavaFilterFunctionCallExecutor - implements CallableTaskBuilder> { +public class JavaFilterFunctionCallExecutor extends AbstractJavaCallExecutor { - private JavaFilterFunction function; - private Optional> inputClass; + private final JavaFilterFunction function; - private CompletableFuture apply( - WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) { - return CompletableFuture.completedFuture( - workflowContext - .definition() - .application() - .modelFactory() - .fromAny( - input, - function.apply( - JavaFuncUtils.convertT(input, inputClass), workflowContext, taskContext))); + public JavaFilterFunctionCallExecutor( + Optional> inputClass, JavaFilterFunction function) { + super(inputClass); + this.function = function; } @Override - public boolean accept(Class clazz) { - return CallJava.CallJavaFilterFunction.class.isAssignableFrom(clazz); - } - - @Override - public void init( - CallJavaFilterFunction task, - WorkflowDefinition definition, - WorkflowMutablePosition position) { - this.function = task.function(); - this.inputClass = task.inputClass(); - } - - @Override - public CallableTask build() { - return this::apply; + protected Object callJavaFunction( + WorkflowContext workflowContext, TaskContext taskContext, T input) { + return function.apply(input, workflowContext, taskContext); } } diff --git a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaFilterFunctionCallExecutorBuilder.java b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaFilterFunctionCallExecutorBuilder.java new file mode 100644 index 000000000..1b19677e6 --- /dev/null +++ b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaFilterFunctionCallExecutorBuilder.java @@ -0,0 +1,52 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.func; + +import io.serverlessworkflow.api.types.TaskBase; +import io.serverlessworkflow.api.types.func.CallJava; +import io.serverlessworkflow.api.types.func.CallJava.CallJavaFilterFunction; +import io.serverlessworkflow.api.types.func.JavaFilterFunction; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowMutablePosition; +import io.serverlessworkflow.impl.executors.CallableTask; +import io.serverlessworkflow.impl.executors.CallableTaskBuilder; +import java.util.Optional; + +public class JavaFilterFunctionCallExecutorBuilder + implements CallableTaskBuilder> { + + private JavaFilterFunction function; + private Optional> inputClass; + + @Override + public boolean accept(Class clazz) { + return CallJava.CallJavaFilterFunction.class.isAssignableFrom(clazz); + } + + @Override + public void init( + CallJavaFilterFunction task, + WorkflowDefinition definition, + WorkflowMutablePosition position) { + this.function = task.function(); + this.inputClass = task.inputClass(); + } + + @Override + public CallableTask build() { + return new JavaFilterFunctionCallExecutor<>(inputClass, function); + } +} diff --git a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaFunctionCallExecutor.java b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaFunctionCallExecutor.java index 734ece909..33ac6c6e4 100644 --- a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaFunctionCallExecutor.java +++ b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaFunctionCallExecutor.java @@ -15,52 +15,23 @@ */ package io.serverlessworkflow.impl.executors.func; -import io.serverlessworkflow.api.types.TaskBase; -import io.serverlessworkflow.api.types.func.CallJava; -import io.serverlessworkflow.api.types.func.CallJava.CallJavaFunction; import io.serverlessworkflow.impl.TaskContext; import io.serverlessworkflow.impl.WorkflowContext; -import io.serverlessworkflow.impl.WorkflowDefinition; -import io.serverlessworkflow.impl.WorkflowModel; -import io.serverlessworkflow.impl.WorkflowMutablePosition; -import io.serverlessworkflow.impl.executors.CallableTask; -import io.serverlessworkflow.impl.executors.CallableTaskBuilder; import java.util.Optional; -import java.util.concurrent.CompletableFuture; import java.util.function.Function; -public class JavaFunctionCallExecutor - implements CallableTaskBuilder> { +public class JavaFunctionCallExecutor extends AbstractJavaCallExecutor { - private Function function; - private Optional> inputClass; + private final Function function; - private CompletableFuture apply( - WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) { - return CompletableFuture.completedFuture( - workflowContext - .definition() - .application() - .modelFactory() - .fromAny(input, function.apply(JavaFuncUtils.convertT(input, inputClass)))); + public JavaFunctionCallExecutor(Optional> inputClass, Function function) { + super(inputClass); + this.function = function; } @Override - public boolean accept(Class clazz) { - return CallJava.CallJavaFunction.class.isAssignableFrom(clazz); - } - - @Override - public void init( - CallJavaFunction task, - WorkflowDefinition definition, - WorkflowMutablePosition position) { - function = task.function(); - inputClass = task.inputClass(); - } - - @Override - public CallableTask build() { - return this::apply; + protected Object callJavaFunction( + WorkflowContext workflowContext, TaskContext taskContext, T input) { + return function.apply(input); } } diff --git a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaFunctionCallExecutorBuilder.java b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaFunctionCallExecutorBuilder.java new file mode 100644 index 000000000..dd4ec9a79 --- /dev/null +++ b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaFunctionCallExecutorBuilder.java @@ -0,0 +1,52 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.func; + +import io.serverlessworkflow.api.types.TaskBase; +import io.serverlessworkflow.api.types.func.CallJava; +import io.serverlessworkflow.api.types.func.CallJava.CallJavaFunction; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowMutablePosition; +import io.serverlessworkflow.impl.executors.CallableTask; +import io.serverlessworkflow.impl.executors.CallableTaskBuilder; +import java.util.Optional; +import java.util.function.Function; + +public class JavaFunctionCallExecutorBuilder + implements CallableTaskBuilder> { + + protected Function function; + protected Optional> inputClass; + + @Override + public boolean accept(Class clazz) { + return CallJava.CallJavaFunction.class.isAssignableFrom(clazz); + } + + @Override + public void init( + CallJavaFunction task, + WorkflowDefinition definition, + WorkflowMutablePosition position) { + function = task.function(); + inputClass = task.inputClass(); + } + + @Override + public CallableTask build() { + return new JavaFunctionCallExecutor<>(inputClass, function); + } +} diff --git a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaLoopFunctionCallExecutor.java b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaLoopFunctionCallExecutor.java index 263c7e037..0f9bae72d 100644 --- a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaLoopFunctionCallExecutor.java +++ b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaLoopFunctionCallExecutor.java @@ -17,50 +17,23 @@ import static io.serverlessworkflow.impl.executors.func.JavaFuncUtils.safeObject; -import io.serverlessworkflow.api.types.TaskBase; -import io.serverlessworkflow.api.types.func.CallJava; -import io.serverlessworkflow.api.types.func.CallJava.CallJavaLoopFunction; import io.serverlessworkflow.api.types.func.LoopFunction; import io.serverlessworkflow.impl.TaskContext; import io.serverlessworkflow.impl.WorkflowContext; -import io.serverlessworkflow.impl.WorkflowDefinition; -import io.serverlessworkflow.impl.WorkflowModel; -import io.serverlessworkflow.impl.WorkflowModelFactory; -import io.serverlessworkflow.impl.WorkflowMutablePosition; -import io.serverlessworkflow.impl.executors.CallableTask; -import io.serverlessworkflow.impl.executors.CallableTaskBuilder; -import java.util.concurrent.CompletableFuture; -public class JavaLoopFunctionCallExecutor - implements CallableTaskBuilder { +public class JavaLoopFunctionCallExecutor extends AbstractJavaCallExecutor { - private LoopFunction function; - private String varName; + private final LoopFunction function; + private final String varName; - private CompletableFuture apply( - WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) { - WorkflowModelFactory modelFactory = workflowContext.definition().application().modelFactory(); - return CompletableFuture.completedFuture( - modelFactory.fromAny( - input, - function.apply( - input.asJavaObject(), safeObject(taskContext.variables().get(varName))))); + public JavaLoopFunctionCallExecutor(LoopFunction function, String varName) { + this.function = function; + this.varName = varName; } @Override - public boolean accept(Class clazz) { - return CallJava.CallJavaLoopFunction.class.isAssignableFrom(clazz); - } - - @Override - public void init( - CallJavaLoopFunction task, WorkflowDefinition definition, WorkflowMutablePosition position) { - function = task.function(); - varName = task.varName(); - } - - @Override - public CallableTask build() { - return this::apply; + protected Object callJavaFunction( + WorkflowContext workflowContext, TaskContext taskContext, T input) { + return function.apply(input, (V) safeObject(taskContext.variables().get(varName))); } } diff --git a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaLoopFunctionCallExecutorBuilder.java b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaLoopFunctionCallExecutorBuilder.java new file mode 100644 index 000000000..9ca65180d --- /dev/null +++ b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaLoopFunctionCallExecutorBuilder.java @@ -0,0 +1,49 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.func; + +import io.serverlessworkflow.api.types.TaskBase; +import io.serverlessworkflow.api.types.func.CallJava; +import io.serverlessworkflow.api.types.func.CallJava.CallJavaLoopFunction; +import io.serverlessworkflow.api.types.func.LoopFunction; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowMutablePosition; +import io.serverlessworkflow.impl.executors.CallableTask; +import io.serverlessworkflow.impl.executors.CallableTaskBuilder; + +public class JavaLoopFunctionCallExecutorBuilder + implements CallableTaskBuilder { + + private LoopFunction function; + private String varName; + + @Override + public boolean accept(Class clazz) { + return CallJava.CallJavaLoopFunction.class.isAssignableFrom(clazz); + } + + @Override + public void init( + CallJavaLoopFunction task, WorkflowDefinition definition, WorkflowMutablePosition position) { + function = task.function(); + varName = task.varName(); + } + + @Override + public CallableTask build() { + return new JavaLoopFunctionCallExecutor<>(function, varName); + } +} diff --git a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaLoopFunctionIndexCallExecutor.java b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaLoopFunctionIndexCallExecutor.java index 797ad0671..9aa3e6c65 100644 --- a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaLoopFunctionIndexCallExecutor.java +++ b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaLoopFunctionIndexCallExecutor.java @@ -17,57 +17,29 @@ import static io.serverlessworkflow.impl.executors.func.JavaFuncUtils.safeObject; -import io.serverlessworkflow.api.types.TaskBase; -import io.serverlessworkflow.api.types.func.CallJava; -import io.serverlessworkflow.api.types.func.CallJava.CallJavaLoopFunctionIndex; import io.serverlessworkflow.api.types.func.LoopFunctionIndex; import io.serverlessworkflow.impl.TaskContext; import io.serverlessworkflow.impl.WorkflowContext; -import io.serverlessworkflow.impl.WorkflowDefinition; -import io.serverlessworkflow.impl.WorkflowModel; -import io.serverlessworkflow.impl.WorkflowModelFactory; -import io.serverlessworkflow.impl.WorkflowMutablePosition; -import io.serverlessworkflow.impl.executors.CallableTask; -import io.serverlessworkflow.impl.executors.CallableTaskBuilder; -import java.util.concurrent.CompletableFuture; -public class JavaLoopFunctionIndexCallExecutor - implements CallableTaskBuilder { +public class JavaLoopFunctionIndexCallExecutor extends AbstractJavaCallExecutor { - private LoopFunctionIndex function; - private String varName; - private String indexName; + private final LoopFunctionIndex function; + private final String varName; + private final String indexName; - private CompletableFuture apply( - WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) { - WorkflowModelFactory modelFactory = workflowContext.definition().application().modelFactory(); - - return CompletableFuture.completedFuture( - modelFactory.fromAny( - input, - function.apply( - input.asJavaObject(), - safeObject(taskContext.variables().get(varName)), - (Integer) safeObject(taskContext.variables().get(indexName))))); - } - - @Override - public boolean accept(Class clazz) { - return CallJava.CallJavaLoopFunctionIndex.class.isAssignableFrom(clazz); - } - - @Override - public void init( - CallJavaLoopFunctionIndex task, - WorkflowDefinition definition, - WorkflowMutablePosition position) { - function = task.function(); - varName = task.varName(); - indexName = task.indexName(); + public JavaLoopFunctionIndexCallExecutor( + LoopFunctionIndex function, String varName, String indexName) { + this.function = function; + this.varName = varName; + this.indexName = indexName; } @Override - public CallableTask build() { - return this::apply; + protected Object callJavaFunction( + WorkflowContext workflowContext, TaskContext taskContext, T input) { + return function.apply( + input, + (V) safeObject(taskContext.variables().get(varName)), + (Integer) safeObject(taskContext.variables().get(indexName))); } } diff --git a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaLoopFunctionIndexCallExecutorBuilder.java b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaLoopFunctionIndexCallExecutorBuilder.java new file mode 100644 index 000000000..a59006df2 --- /dev/null +++ b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaLoopFunctionIndexCallExecutorBuilder.java @@ -0,0 +1,53 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.func; + +import io.serverlessworkflow.api.types.TaskBase; +import io.serverlessworkflow.api.types.func.CallJava; +import io.serverlessworkflow.api.types.func.CallJava.CallJavaLoopFunctionIndex; +import io.serverlessworkflow.api.types.func.LoopFunctionIndex; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowMutablePosition; +import io.serverlessworkflow.impl.executors.CallableTask; +import io.serverlessworkflow.impl.executors.CallableTaskBuilder; + +public class JavaLoopFunctionIndexCallExecutorBuilder + implements CallableTaskBuilder { + + private LoopFunctionIndex function; + private String varName; + private String indexName; + + @Override + public boolean accept(Class clazz) { + return CallJava.CallJavaLoopFunctionIndex.class.isAssignableFrom(clazz); + } + + @Override + public void init( + CallJavaLoopFunctionIndex task, + WorkflowDefinition definition, + WorkflowMutablePosition position) { + function = task.function(); + varName = task.varName(); + indexName = task.indexName(); + } + + @Override + public CallableTask build() { + return new JavaLoopFunctionIndexCallExecutor<>(function, varName, indexName); + } +} diff --git a/experimental/lambda/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.CallableTaskBuilder b/experimental/lambda/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.CallableTaskBuilder index a22a15533..01f6607e8 100644 --- a/experimental/lambda/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.CallableTaskBuilder +++ b/experimental/lambda/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.CallableTaskBuilder @@ -1,6 +1,6 @@ -io.serverlessworkflow.impl.executors.func.JavaLoopFunctionIndexCallExecutor -io.serverlessworkflow.impl.executors.func.JavaLoopFunctionCallExecutor -io.serverlessworkflow.impl.executors.func.JavaFunctionCallExecutor -io.serverlessworkflow.impl.executors.func.JavaConsumerCallExecutor -io.serverlessworkflow.impl.executors.func.JavaContextFunctionCallExecutor -io.serverlessworkflow.impl.executors.func.JavaFilterFunctionCallExecutor +io.serverlessworkflow.impl.executors.func.JavaLoopFunctionIndexCallExecutorBuilder +io.serverlessworkflow.impl.executors.func.JavaLoopFunctionCallExecutorBuilder +io.serverlessworkflow.impl.executors.func.JavaFunctionCallExecutorBuilder +io.serverlessworkflow.impl.executors.func.JavaConsumerCallExecutorBuilder +io.serverlessworkflow.impl.executors.func.JavaContextFunctionCallExecutorBuilder +io.serverlessworkflow.impl.executors.func.JavaFilterFunctionCallExecutorBuilder diff --git a/experimental/lambda/src/test/java/io/serverless/workflow/impl/CallJavaContextFunctionTest.java b/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/CallJavaContextFunctionTest.java similarity index 98% rename from experimental/lambda/src/test/java/io/serverless/workflow/impl/CallJavaContextFunctionTest.java rename to experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/CallJavaContextFunctionTest.java index 71fcbaf08..2520354d7 100644 --- a/experimental/lambda/src/test/java/io/serverless/workflow/impl/CallJavaContextFunctionTest.java +++ b/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/CallJavaContextFunctionTest.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.serverless.workflow.impl; +package io.serverless.workflow.impl.executors.func; import static org.assertj.core.api.Assertions.assertThat; diff --git a/experimental/lambda/src/test/java/io/serverless/workflow/impl/CallTest.java b/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/CallTest.java similarity index 92% rename from experimental/lambda/src/test/java/io/serverless/workflow/impl/CallTest.java rename to experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/CallTest.java index eff5891bb..5b0f7f6c8 100644 --- a/experimental/lambda/src/test/java/io/serverless/workflow/impl/CallTest.java +++ b/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/CallTest.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.serverless.workflow.impl; +package io.serverless.workflow.impl.executors.func; import static org.assertj.core.api.Assertions.assertThat; @@ -38,6 +38,7 @@ import java.util.Collection; import java.util.List; import java.util.concurrent.ExecutionException; +import java.util.function.Function; import java.util.function.Predicate; import org.junit.jupiter.api.Test; @@ -45,6 +46,21 @@ class CallTest { @Test void testJavaFunction() throws InterruptedException, ExecutionException { + internalJavaFunctionTest(JavaFunctions::getName, Person.class); + } + + @Test + void testJavaFunctionFuture() throws InterruptedException, ExecutionException { + internalJavaFunctionTest(JavaFunctions::getNameFuture, Person.class); + } + + @Test + void testJavaFunctionConverter() throws InterruptedException, ExecutionException { + internalJavaFunctionTest(JavaFunctions::getNameStringBuilder, Person.class); + } + + private void internalJavaFunctionTest(Function function, Class clazz) + throws InterruptedException, ExecutionException { try (WorkflowApplication app = WorkflowApplication.builder().build()) { Workflow workflow = new Workflow() @@ -56,8 +72,7 @@ void testJavaFunction() throws InterruptedException, ExecutionException { "javaCall", new Task() .withCallTask( - new CallTaskJava( - CallJava.function(JavaFunctions::getName, Person.class)))))); + new CallTaskJava(CallJava.function(function, clazz)))))); assertThat( app.workflowDefinition(workflow) diff --git a/experimental/lambda/src/test/java/io/serverless/workflow/impl/FluentDSLCallTest.java b/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/FluentDSLCallTest.java similarity index 98% rename from experimental/lambda/src/test/java/io/serverless/workflow/impl/FluentDSLCallTest.java rename to experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/FluentDSLCallTest.java index 0686ca823..fb8c8d940 100644 --- a/experimental/lambda/src/test/java/io/serverless/workflow/impl/FluentDSLCallTest.java +++ b/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/FluentDSLCallTest.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.serverless.workflow.impl; +package io.serverless.workflow.impl.executors.func; import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.function; import static org.assertj.core.api.Assertions.assertThat; diff --git a/experimental/lambda/src/test/java/io/serverless/workflow/impl/JavaFunctions.java b/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/JavaFunctions.java similarity index 77% rename from experimental/lambda/src/test/java/io/serverless/workflow/impl/JavaFunctions.java rename to experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/JavaFunctions.java index 455a1d975..17bf9dfd5 100644 --- a/experimental/lambda/src/test/java/io/serverless/workflow/impl/JavaFunctions.java +++ b/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/JavaFunctions.java @@ -13,11 +13,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.serverless.workflow.impl; +package io.serverless.workflow.impl.executors.func; import io.serverlessworkflow.impl.TaskContextData; import io.serverlessworkflow.impl.WorkflowContextData; import java.util.Map; +import java.util.concurrent.CompletableFuture; public class JavaFunctions { @@ -29,6 +30,16 @@ static String getName(Person person) { return person.name() + " Javierito"; } + static CompletableFuture getNameFuture(Person person) { + return CompletableFuture.completedFuture(getName(person)); + } + + static CompletableFuture getNameStringBuilder(Person person) { + StringBuilder sb = new StringBuilder(person.name()); + sb.append(" Javierito"); + return CompletableFuture.completedFuture(sb); + } + static String getFilterName( Person person, WorkflowContextData workflowContext, TaskContextData taskContext) { return person.name() + "_" + workflowContext.instanceData().id() + "_" + taskContext.taskName(); diff --git a/experimental/lambda/src/test/java/io/serverless/workflow/impl/ModelTest.java b/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/ModelTest.java similarity index 99% rename from experimental/lambda/src/test/java/io/serverless/workflow/impl/ModelTest.java rename to experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/ModelTest.java index 2a7c26f52..aef280b93 100644 --- a/experimental/lambda/src/test/java/io/serverless/workflow/impl/ModelTest.java +++ b/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/ModelTest.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.serverless.workflow.impl; +package io.serverless.workflow.impl.executors.func; import static org.assertj.core.api.Assertions.assertThat; diff --git a/experimental/lambda/src/test/java/io/serverless/workflow/impl/Person.java b/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/Person.java similarity index 92% rename from experimental/lambda/src/test/java/io/serverless/workflow/impl/Person.java rename to experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/Person.java index 9594c285d..b0b891fe3 100644 --- a/experimental/lambda/src/test/java/io/serverless/workflow/impl/Person.java +++ b/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/Person.java @@ -13,6 +13,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.serverless.workflow.impl; +package io.serverless.workflow.impl.executors.func; record Person(String name, int age) {} diff --git a/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/StringBuilder2String.java b/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/StringBuilder2String.java new file mode 100644 index 000000000..a64f0d57d --- /dev/null +++ b/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/StringBuilder2String.java @@ -0,0 +1,31 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverless.workflow.impl.executors.func; + +import io.serverlessworkflow.impl.executors.func.DataTypeConverter; + +public class StringBuilder2String implements DataTypeConverter { + + @Override + public String apply(StringBuilder t) { + return t.toString(); + } + + @Override + public Class sourceType() { + return StringBuilder.class; + } +} diff --git a/experimental/lambda/src/test/resources/META-INF/services/io.serverlessworkflow.impl.executors.func.DataTypeConverter b/experimental/lambda/src/test/resources/META-INF/services/io.serverlessworkflow.impl.executors.func.DataTypeConverter new file mode 100644 index 000000000..c3cb695ad --- /dev/null +++ b/experimental/lambda/src/test/resources/META-INF/services/io.serverlessworkflow.impl.executors.func.DataTypeConverter @@ -0,0 +1 @@ +io.serverless.workflow.impl.executors.func.StringBuilder2String \ No newline at end of file