Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.executors.func;

import io.serverlessworkflow.impl.TaskContext;
import io.serverlessworkflow.impl.WorkflowContext;
import io.serverlessworkflow.impl.WorkflowModel;
import io.serverlessworkflow.impl.WorkflowModelFactory;
import io.serverlessworkflow.impl.executors.CallableTask;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

public abstract class AbstractJavaCallExecutor<T> implements CallableTask {

protected final Optional<Class<T>> inputClass;

protected AbstractJavaCallExecutor() {
this(Optional.empty());
}

protected AbstractJavaCallExecutor(Optional<Class<T>> inputClass) {
this.inputClass = inputClass;
}

@Override
public CompletableFuture<WorkflowModel> apply(
WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) {
Object result = callJavaFunction(workflowContext, taskContext, model2Input(input));
WorkflowModelFactory modelFactory = workflowContext.definition().application().modelFactory();
return result instanceof CompletableFuture future
? future.thenApply(v -> output2Model(modelFactory, input, v))
: CompletableFuture.completedFuture(output2Model(modelFactory, input, result));
}

protected abstract Object callJavaFunction(
WorkflowContext workflowContext, TaskContext taskContext, T input);

protected T model2Input(WorkflowModel model) {
return JavaFuncUtils.convertT(model, inputClass);
}

protected Object convertResponse(Object obj) {
return obj == null
? null
: DataTypeConverterRegistry.get().find(obj.getClass()).map(c -> c.apply(obj)).orElse(obj);
}

protected WorkflowModel output2Model(
WorkflowModelFactory modelFactory, WorkflowModel input, Object result) {
return modelFactory.fromAny(input, convertResponse(result));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.executors.func;

import io.serverlessworkflow.impl.ServicePriority;
import java.util.function.Function;

public interface DataTypeConverter<T, V> extends Function<T, V>, ServicePriority {
Class<T> sourceType();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.executors.func;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.concurrent.ConcurrentHashMap;

public class DataTypeConverterRegistry {

private static final DataTypeConverterRegistry instance = new DataTypeConverterRegistry();

public static DataTypeConverterRegistry get() {
return instance;
}

@SuppressWarnings("rawtypes")
private final Iterable<DataTypeConverter> converters;

@SuppressWarnings("rawtypes")
private final Map<Class<?>, Optional<DataTypeConverter>> convertersMap;

private DataTypeConverterRegistry() {
this.converters = ServiceLoader.load(DataTypeConverter.class);
this.convertersMap = new ConcurrentHashMap<>();
}

@SuppressWarnings("rawtypes")
public Optional<DataTypeConverter> find(Class clazz) {
return convertersMap.computeIfAbsent(clazz, this::searchConverter);
}

@SuppressWarnings({"rawtypes", "unchecked"})
private Optional<DataTypeConverter> searchConverter(Class<?> clazz) {
List<DataTypeConverter> candidates = new ArrayList<>();
for (DataTypeConverter converter : converters) {
if (converter.sourceType().equals(clazz)) {
candidates.add(converter);
}
}
if (!candidates.isEmpty()) {
return first(candidates);
}

for (DataTypeConverter converter : converters) {
if (converter.sourceType().isAssignableFrom(clazz)) {
candidates.add(converter);
}
}
return candidates.isEmpty() ? Optional.empty() : first(candidates);
}

@SuppressWarnings("rawtypes")
private Optional<DataTypeConverter> first(List<DataTypeConverter> candidates) {
Collections.sort(candidates);
return Optional.of(candidates.get(0));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,47 +15,29 @@
*/
package io.serverlessworkflow.impl.executors.func;

import io.serverlessworkflow.api.types.TaskBase;
import io.serverlessworkflow.api.types.func.CallJava;
import io.serverlessworkflow.impl.TaskContext;
import io.serverlessworkflow.impl.WorkflowContext;
import io.serverlessworkflow.impl.WorkflowDefinition;
import io.serverlessworkflow.impl.WorkflowModel;
import io.serverlessworkflow.impl.WorkflowMutablePosition;
import io.serverlessworkflow.impl.executors.CallableTask;
import io.serverlessworkflow.impl.executors.CallableTaskBuilder;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public class JavaConsumerCallExecutor<T>
implements CallableTaskBuilder<CallJava.CallJavaConsumer<T>> {
public class JavaConsumerCallExecutor<T> implements CallableTask {

private Consumer<T> consumer;
private Optional<Class<T>> inputClass;
private final Optional<Class<T>> inputClass;
private final Consumer<T> consumer;

public void init(
CallJava.CallJavaConsumer<T> task,
WorkflowDefinition definition,
WorkflowMutablePosition position) {
consumer = task.consumer();
inputClass = task.inputClass();
public JavaConsumerCallExecutor(Optional<Class<T>> inputClass, Consumer<T> consumer) {
this.inputClass = inputClass;
this.consumer = consumer;
}

private CompletableFuture<WorkflowModel> apply(
@Override
public CompletableFuture<WorkflowModel> apply(
WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) {
T typed = JavaFuncUtils.convertT(input, inputClass);
consumer.accept(typed);
return CompletableFuture.completedFuture(input);
}

@Override
public boolean accept(Class<? extends TaskBase> clazz) {
return CallJava.CallJavaConsumer.class.isAssignableFrom(clazz);
}

@Override
public CallableTask build() {
return this::apply;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.executors.func;

import io.serverlessworkflow.api.types.TaskBase;
import io.serverlessworkflow.api.types.func.CallJava;
import io.serverlessworkflow.impl.WorkflowDefinition;
import io.serverlessworkflow.impl.WorkflowMutablePosition;
import io.serverlessworkflow.impl.executors.CallableTask;
import io.serverlessworkflow.impl.executors.CallableTaskBuilder;
import java.util.Optional;
import java.util.function.Consumer;

public class JavaConsumerCallExecutorBuilder<T>
implements CallableTaskBuilder<CallJava.CallJavaConsumer<T>> {

private Consumer<T> consumer;
private Optional<Class<T>> inputClass;

public void init(
CallJava.CallJavaConsumer<T> task,
WorkflowDefinition definition,
WorkflowMutablePosition position) {
consumer = task.consumer();
inputClass = task.inputClass();
}

@Override
public boolean accept(Class<? extends TaskBase> clazz) {
return CallJava.CallJavaConsumer.class.isAssignableFrom(clazz);
}

@Override
public CallableTask build() {
return new JavaConsumerCallExecutor<T>(inputClass, consumer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,53 +15,24 @@
*/
package io.serverlessworkflow.impl.executors.func;

import io.serverlessworkflow.api.types.TaskBase;
import io.serverlessworkflow.api.types.func.CallJava;
import io.serverlessworkflow.api.types.func.CallJava.CallJavaContextFunction;
import io.serverlessworkflow.api.types.func.JavaContextFunction;
import io.serverlessworkflow.impl.TaskContext;
import io.serverlessworkflow.impl.WorkflowContext;
import io.serverlessworkflow.impl.WorkflowDefinition;
import io.serverlessworkflow.impl.WorkflowModel;
import io.serverlessworkflow.impl.WorkflowMutablePosition;
import io.serverlessworkflow.impl.executors.CallableTask;
import io.serverlessworkflow.impl.executors.CallableTaskBuilder;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

public class JavaContextFunctionCallExecutor<T, V>
implements CallableTaskBuilder<CallJava.CallJavaContextFunction<T, V>> {
public class JavaContextFunctionCallExecutor<T, V> extends AbstractJavaCallExecutor<T> {

private JavaContextFunction<T, V> function;
private Optional<Class<T>> inputClass;
private final JavaContextFunction<T, V> function;

@Override
public boolean accept(Class<? extends TaskBase> clazz) {
return CallJava.CallJavaContextFunction.class.isAssignableFrom(clazz);
}

@Override
public void init(
CallJavaContextFunction<T, V> task,
WorkflowDefinition definition,
WorkflowMutablePosition position) {
this.function = task.function();
this.inputClass = task.inputClass();
public JavaContextFunctionCallExecutor(
Optional<Class<T>> inputClass, JavaContextFunction<T, V> function) {
super(inputClass);
this.function = function;
}

@Override
public CallableTask build() {
return this::apply;
}

private CompletableFuture<WorkflowModel> apply(
WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) {
return CompletableFuture.completedFuture(
workflowContext
.definition()
.application()
.modelFactory()
.fromAny(
input, function.apply(JavaFuncUtils.convertT(input, inputClass), workflowContext)));
protected Object callJavaFunction(
WorkflowContext workflowContext, TaskContext taskContext, T input) {
return function.apply(input, workflowContext);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.executors.func;

import io.serverlessworkflow.api.types.TaskBase;
import io.serverlessworkflow.api.types.func.CallJava;
import io.serverlessworkflow.api.types.func.CallJava.CallJavaContextFunction;
import io.serverlessworkflow.api.types.func.JavaContextFunction;
import io.serverlessworkflow.impl.WorkflowDefinition;
import io.serverlessworkflow.impl.WorkflowMutablePosition;
import io.serverlessworkflow.impl.executors.CallableTask;
import io.serverlessworkflow.impl.executors.CallableTaskBuilder;
import java.util.Optional;

public class JavaContextFunctionCallExecutorBuilder<T, V>
implements CallableTaskBuilder<CallJava.CallJavaContextFunction<T, V>> {

protected JavaContextFunction<T, V> function;
protected Optional<Class<T>> inputClass;

@Override
public boolean accept(Class<? extends TaskBase> clazz) {
return CallJava.CallJavaContextFunction.class.isAssignableFrom(clazz);
}

@Override
public void init(
CallJavaContextFunction<T, V> task,
WorkflowDefinition definition,
WorkflowMutablePosition position) {
this.function = task.function();
this.inputClass = task.inputClass();
}

@Override
public CallableTask build() {
return new JavaContextFunctionCallExecutor<T, V>(inputClass, function);
}
}
Loading