diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractPersistenceInstanceReader.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractPersistenceInstanceReader.java new file mode 100644 index 000000000..586809bbc --- /dev/null +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractPersistenceInstanceReader.java @@ -0,0 +1,43 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.persistence; + +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowInstance; +import java.util.Optional; +import java.util.stream.Stream; + +public abstract class AbstractPersistenceInstanceReader implements PersistenceInstanceReader { + + protected final Stream scanAll( + PersistenceInstanceOperations operations, + WorkflowDefinition definition, + String applicationId) { + return operations + .scanAll(applicationId, definition) + .map(v -> new WorkflowPersistenceInstance(definition, v)); + } + + protected final Optional find( + PersistenceInstanceOperations operations, WorkflowDefinition definition, String instanceId) { + return operations + .readWorkflowInfo(definition, instanceId) + .map(i -> new WorkflowPersistenceInstance(definition, i)); + } + + @Override + public void close() {} +} diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractPersistenceInstanceWriter.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractPersistenceInstanceWriter.java new file mode 100644 index 000000000..aa8cd0298 --- /dev/null +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractPersistenceInstanceWriter.java @@ -0,0 +1,84 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.persistence; + +import io.serverlessworkflow.impl.TaskContextData; +import io.serverlessworkflow.impl.WorkflowContextData; +import io.serverlessworkflow.impl.WorkflowStatus; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; + +public abstract class AbstractPersistenceInstanceWriter implements PersistenceInstanceWriter { + + @Override + public CompletableFuture started(WorkflowContextData workflowContext) { + return doTransaction(t -> t.writeInstanceData(workflowContext), workflowContext); + } + + @Override + public CompletableFuture completed(WorkflowContextData workflowContext) { + return removeProcessInstance(workflowContext); + } + + @Override + public CompletableFuture failed(WorkflowContextData workflowContext, Throwable ex) { + return removeProcessInstance(workflowContext); + } + + @Override + public CompletableFuture aborted(WorkflowContextData workflowContext) { + return removeProcessInstance(workflowContext); + } + + protected CompletableFuture removeProcessInstance(WorkflowContextData workflowContext) { + return doTransaction(t -> t.removeProcessInstance(workflowContext), workflowContext); + } + + @Override + public CompletableFuture taskStarted( + WorkflowContextData workflowContext, TaskContextData taskContext) { + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture taskRetried( + WorkflowContextData workflowContext, TaskContextData taskContext) { + return doTransaction(t -> t.writeRetryTask(workflowContext, taskContext), workflowContext); + } + + @Override + public CompletableFuture taskCompleted( + WorkflowContextData workflowContext, TaskContextData taskContext) { + return doTransaction(t -> t.writeCompletedTask(workflowContext, taskContext), workflowContext); + } + + @Override + public CompletableFuture suspended(WorkflowContextData workflowContext) { + return doTransaction( + t -> t.writeStatus(workflowContext, WorkflowStatus.SUSPENDED), workflowContext); + } + + @Override + public CompletableFuture resumed(WorkflowContextData workflowContext) { + return doTransaction(t -> t.clearStatus(workflowContext), workflowContext); + } + + @Override + public void close() throws Exception {} + + protected abstract CompletableFuture doTransaction( + Consumer operation, WorkflowContextData context); +} diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceReader.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceReader.java index 2c1473d13..1420b1917 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceReader.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceReader.java @@ -20,7 +20,7 @@ import java.util.Optional; import java.util.stream.Stream; -public class DefaultPersistenceInstanceReader implements PersistenceInstanceReader { +public class DefaultPersistenceInstanceReader extends AbstractPersistenceInstanceReader { private final PersistenceInstanceStore store; @@ -32,7 +32,7 @@ protected DefaultPersistenceInstanceReader(PersistenceInstanceStore store) { public Optional find(WorkflowDefinition definition, String instanceId) { PersistenceInstanceTransaction transaction = store.begin(); try { - Optional instance = read(transaction, definition, instanceId); + Optional instance = find(transaction, definition, instanceId); transaction.commit(definition); return instance; } catch (Exception ex) { @@ -41,21 +41,10 @@ public Optional find(WorkflowDefinition definition, String ins } } - private Optional read( - PersistenceInstanceTransaction t, WorkflowDefinition definition, String instanceId) { - return t.readWorkflowInfo(definition, instanceId) - .map(i -> new WorkflowPersistenceInstance(definition, i)); - } - @Override public Stream scanAll(WorkflowDefinition definition, String applicationId) { PersistenceInstanceTransaction transaction = store.begin(); - return transaction - .scanAll(applicationId, definition) - .onClose(() -> transaction.commit(definition)) - .map(v -> new WorkflowPersistenceInstance(definition, v)); + return super.scanAll(transaction, definition, applicationId) + .onClose(() -> transaction.commit(definition)); } - - @Override - public void close() {} } diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceWriter.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceWriter.java index f599a743b..1dd946fde 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceWriter.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceWriter.java @@ -15,26 +15,28 @@ */ package io.serverlessworkflow.impl.persistence; -import io.serverlessworkflow.impl.TaskContextData; import io.serverlessworkflow.impl.WorkflowContextData; import io.serverlessworkflow.impl.WorkflowDefinitionData; -import io.serverlessworkflow.impl.WorkflowStatus; import java.time.Duration; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -public class DefaultPersistenceInstanceWriter implements PersistenceInstanceWriter { +public class DefaultPersistenceInstanceWriter extends AbstractPersistenceInstanceWriter { private final PersistenceInstanceStore store; private final Map> futuresMap = new ConcurrentHashMap<>(); private final Optional executorService; private final Duration closeTimeout; + private static final Logger logger = + LoggerFactory.getLogger(DefaultPersistenceInstanceWriter.class); + protected DefaultPersistenceInstanceWriter( PersistenceInstanceStore store, Optional executorService, @@ -45,61 +47,14 @@ protected DefaultPersistenceInstanceWriter( } @Override - public CompletableFuture started(WorkflowContextData workflowContext) { - return doTransaction(t -> t.writeInstanceData(workflowContext), workflowContext); - } - - @Override - public CompletableFuture completed(WorkflowContextData workflowContext) { - return removeProcessInstance(workflowContext); - } - - @Override - public CompletableFuture failed(WorkflowContextData workflowContext, Throwable ex) { - return removeProcessInstance(workflowContext); - } - - @Override - public CompletableFuture aborted(WorkflowContextData workflowContext) { - return removeProcessInstance(workflowContext); - } - protected CompletableFuture removeProcessInstance(WorkflowContextData workflowContext) { - return doTransaction(t -> t.removeProcessInstance(workflowContext), workflowContext) + return super.removeProcessInstance(workflowContext) .thenRun(() -> futuresMap.remove(workflowContext.instanceData().id())); } @Override - public CompletableFuture taskStarted( - WorkflowContextData workflowContext, TaskContextData taskContext) { - return CompletableFuture.completedFuture(null); - } - - @Override - public CompletableFuture taskRetried( - WorkflowContextData workflowContext, TaskContextData taskContext) { - return doTransaction(t -> t.writeRetryTask(workflowContext, taskContext), workflowContext); - } - - @Override - public CompletableFuture taskCompleted( - WorkflowContextData workflowContext, TaskContextData taskContext) { - return doTransaction(t -> t.writeCompletedTask(workflowContext, taskContext), workflowContext); - } - - @Override - public CompletableFuture suspended(WorkflowContextData workflowContext) { - return doTransaction( - t -> t.writeStatus(workflowContext, WorkflowStatus.SUSPENDED), workflowContext); - } - - @Override - public CompletableFuture resumed(WorkflowContextData workflowContext) { - return doTransaction(t -> t.clearStatus(workflowContext), workflowContext); - } - - private CompletableFuture doTransaction( - Consumer operation, WorkflowContextData context) { + protected CompletableFuture doTransaction( + Consumer operation, WorkflowContextData context) { final ExecutorService service = this.executorService.orElse(context.definition().application().executorService()); final Runnable runnable = () -> executeTransaction(operation, context.definition()); @@ -112,13 +67,17 @@ private CompletableFuture doTransaction( } private void executeTransaction( - Consumer operation, WorkflowDefinitionData definition) { + Consumer operation, WorkflowDefinitionData definition) { PersistenceInstanceTransaction transaction = store.begin(); try { operation.accept(transaction); transaction.commit(definition); } catch (Exception ex) { - transaction.rollback(definition); + try { + transaction.rollback(definition); + } catch (Exception rollEx) { + logger.warn("Exception during rollback. Ignoring it", ex); + } throw ex; } } @@ -126,14 +85,5 @@ private void executeTransaction( @Override public void close() { futuresMap.clear(); - executorService.ifPresent( - e -> { - try { - e.awaitTermination(closeTimeout.toMillis(), TimeUnit.MILLISECONDS); - e.shutdown(); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } - }); } } diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceOperations.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceOperations.java new file mode 100644 index 000000000..264831b62 --- /dev/null +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceOperations.java @@ -0,0 +1,42 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.persistence; + +import io.serverlessworkflow.impl.TaskContextData; +import io.serverlessworkflow.impl.WorkflowContextData; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowStatus; +import java.util.Optional; +import java.util.stream.Stream; + +public interface PersistenceInstanceOperations { + void writeInstanceData(WorkflowContextData workflowContext); + + void writeRetryTask(WorkflowContextData workflowContext, TaskContextData taskContext); + + void writeCompletedTask(WorkflowContextData workflowContext, TaskContextData taskContext); + + void writeStatus(WorkflowContextData workflowContext, WorkflowStatus suspended); + + void removeProcessInstance(WorkflowContextData workflowContext); + + void clearStatus(WorkflowContextData workflowContext); + + Stream scanAll(String applicationId, WorkflowDefinition definition); + + Optional readWorkflowInfo( + WorkflowDefinition definition, String instanceId); +} diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceTransaction.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceTransaction.java index 7606f99f2..c8a2a2ab1 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceTransaction.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceTransaction.java @@ -15,34 +15,11 @@ */ package io.serverlessworkflow.impl.persistence; -import io.serverlessworkflow.impl.TaskContextData; -import io.serverlessworkflow.impl.WorkflowContextData; -import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowDefinitionData; -import io.serverlessworkflow.impl.WorkflowStatus; -import java.util.Optional; -import java.util.stream.Stream; -public interface PersistenceInstanceTransaction { +public interface PersistenceInstanceTransaction extends PersistenceInstanceOperations { void commit(WorkflowDefinitionData definition); void rollback(WorkflowDefinitionData definition); - - void writeInstanceData(WorkflowContextData workflowContext); - - void writeRetryTask(WorkflowContextData workflowContext, TaskContextData taskContext); - - void writeCompletedTask(WorkflowContextData workflowContext, TaskContextData taskContext); - - void writeStatus(WorkflowContextData workflowContext, WorkflowStatus suspended); - - void removeProcessInstance(WorkflowContextData workflowContext); - - void clearStatus(WorkflowContextData workflowContext); - - Stream scanAll(String applicationId, WorkflowDefinition definition); - - Optional readWorkflowInfo( - WorkflowDefinition definition, String instanceId); } diff --git a/impl/persistence/tests/src/main/java/io/serverlessworkflow/impl/persistence/test/AbstractHandlerPersistenceTest.java b/impl/persistence/tests/src/main/java/io/serverlessworkflow/impl/persistence/test/AbstractHandlerPersistenceTest.java new file mode 100644 index 000000000..bd57fc59e --- /dev/null +++ b/impl/persistence/tests/src/main/java/io/serverlessworkflow/impl/persistence/test/AbstractHandlerPersistenceTest.java @@ -0,0 +1,169 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.persistence.test; + +import static io.serverlessworkflow.api.WorkflowReader.readWorkflowFromClasspath; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.TaskContextData; +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowContextData; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowInstance; +import io.serverlessworkflow.impl.WorkflowInstanceData; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowMutablePosition; +import io.serverlessworkflow.impl.WorkflowPosition; +import io.serverlessworkflow.impl.executors.TransitionInfo; +import io.serverlessworkflow.impl.persistence.PersistenceInstanceHandlers; +import io.serverlessworkflow.impl.persistence.WorkflowPersistenceInstance; +import java.io.IOException; +import java.time.Instant; +import java.util.Map; +import java.util.Optional; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; + +public abstract class AbstractHandlerPersistenceTest { + + private PersistenceInstanceHandlers handlers; + private static WorkflowApplication app; + private static WorkflowDefinition definition; + protected WorkflowModel context; + protected WorkflowInstanceData workflowInstance; + protected WorkflowContextData workflowContext; + + @BeforeAll() + static void init() throws IOException { + app = WorkflowApplication.builder().build(); + definition = app.workflowDefinition(readWorkflowFromClasspath("simple-expression.yaml")); + } + + @BeforeEach + void setup() { + handlers = getPersistenceHandlers(); + context = app.modelFactory().fromNull(); + workflowContext = mock(WorkflowContext.class); + workflowInstance = mock(WorkflowInstance.class); + when(workflowContext.context()).thenReturn(context); + when(workflowContext.definition()).thenReturn(definition); + when(workflowContext.instanceData()).thenReturn(workflowInstance); + when(workflowInstance.startedAt()).thenReturn(Instant.now()); + when(workflowInstance.context()).thenReturn(context); + when(workflowInstance.id()).thenReturn(app.idFactory().get()); + when(workflowInstance.input()).thenReturn(app.modelFactory().from(Map.of("name", "Javierito"))); + } + + protected abstract PersistenceInstanceHandlers getPersistenceHandlers(); + + protected TaskContextData completedTaskContext( + WorkflowPosition position, Map model) { + TaskContext taskContext = mock(TaskContext.class); + when(taskContext.position()).thenReturn(position); + when(taskContext.completedAt()).thenReturn(Instant.now()); + when(taskContext.output()).thenReturn(app.modelFactory().from(model)); + when(taskContext.transition()).thenReturn(new TransitionInfo(null, true)); + return taskContext; + } + + protected TaskContextData retriedTaskContext(WorkflowPosition position, short retryAttempt) { + TaskContext taskContext = mock(TaskContext.class); + when(taskContext.position()).thenReturn(position); + when(taskContext.retryAttempt()).thenReturn(retryAttempt); + return taskContext; + } + + @AfterEach + void close() { + handlers.close(); + } + + @AfterAll + static void cleanup() { + if (app != null) { + app.close(); + } + } + + @Test + void testWorkflowInstance() throws InterruptedException { + final WorkflowMutablePosition position1 = + app.positionFactory().get().addProperty("do").addIndex(0).addProperty("useExpression"); + final WorkflowMutablePosition position2 = + app.positionFactory().get().addProperty("do").addIndex(1).addProperty("useExpression"); + final short numRetries = 1; + + final Map completedMap = Map.of("name", "fulanito"); + + handlers.writer().started(workflowContext).join(); + handlers + .writer() + .taskRetried(workflowContext, retriedTaskContext(position1, numRetries)) + .join(); + Optional optional = handlers.reader().find(definition, workflowInstance.id()); + assertThat(optional).isPresent(); + WorkflowPersistenceInstance instance = (WorkflowPersistenceInstance) optional.orElseThrow(); + assertThat(instance.input().asMap().orElseThrow()).isEqualTo(Map.of("name", "Javierito")); + assertThat(instance.startedAt()).isNotNull().isBefore(Instant.now()); + + // task retry + WorkflowContext updateWContext = mock(WorkflowContext.class); + TaskContext updateTContext = mock(TaskContext.class); + when(updateTContext.position()).thenReturn(position1); + instance.restoreContext(updateWContext, updateTContext); + ArgumentCaptor retryAttempt = ArgumentCaptor.forClass(Short.class); + verify(updateTContext).retryAttempt(retryAttempt.capture()); + assertThat(retryAttempt.getValue()).isEqualTo(numRetries); + + // task completed + handlers + .writer() + .taskCompleted(workflowContext, completedTaskContext(position2, completedMap)) + .join(); + instance = + (WorkflowPersistenceInstance) + handlers.reader().find(definition, workflowInstance.id()).orElseThrow(); + updateWContext = mock(WorkflowContext.class); + updateTContext = mock(TaskContext.class); + when(updateTContext.position()).thenReturn(position2); + instance.restoreContext(updateWContext, updateTContext); + ArgumentCaptor context = ArgumentCaptor.forClass(WorkflowModel.class); + verify(updateWContext).context(context.capture()); + assertThat(context.getValue()).isEqualTo(app.modelFactory().fromNull()); + ArgumentCaptor model = ArgumentCaptor.forClass(WorkflowModel.class); + verify(updateTContext).output(model.capture()); + assertThat(model.getValue().asMap().orElseThrow()).isEqualTo(completedMap); + ArgumentCaptor instant = ArgumentCaptor.forClass(Instant.class); + verify(updateTContext).completedAt(instant.capture()); + assertThat(instant.getValue()).isNotNull().isAfterOrEqualTo(instance.startedAt()); + ArgumentCaptor transition = ArgumentCaptor.forClass(TransitionInfo.class); + verify(updateTContext).transition(transition.capture()); + assertThat(transition.getValue().isEndNode()).isTrue(); + + // workflow completed + handlers.writer().completed(workflowContext).join(); + assertThat(handlers.reader().find(definition, workflowInstance.id())).isEmpty(); + } +} diff --git a/impl/persistence/tests/src/main/java/io/serverlessworkflow/impl/persistence/test/AbstractPersistenceTest.java b/impl/persistence/tests/src/main/java/io/serverlessworkflow/impl/persistence/test/AbstractPersistenceTest.java index b0994ebac..f791c3174 100644 --- a/impl/persistence/tests/src/main/java/io/serverlessworkflow/impl/persistence/test/AbstractPersistenceTest.java +++ b/impl/persistence/tests/src/main/java/io/serverlessworkflow/impl/persistence/test/AbstractPersistenceTest.java @@ -15,156 +15,15 @@ */ package io.serverlessworkflow.impl.persistence.test; -import static io.serverlessworkflow.api.WorkflowReader.readWorkflowFromClasspath; -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import io.serverlessworkflow.impl.TaskContext; -import io.serverlessworkflow.impl.TaskContextData; -import io.serverlessworkflow.impl.WorkflowApplication; -import io.serverlessworkflow.impl.WorkflowContext; -import io.serverlessworkflow.impl.WorkflowContextData; -import io.serverlessworkflow.impl.WorkflowDefinition; -import io.serverlessworkflow.impl.WorkflowInstance; -import io.serverlessworkflow.impl.WorkflowInstanceData; -import io.serverlessworkflow.impl.WorkflowModel; -import io.serverlessworkflow.impl.WorkflowMutablePosition; -import io.serverlessworkflow.impl.WorkflowPosition; -import io.serverlessworkflow.impl.executors.TransitionInfo; import io.serverlessworkflow.impl.persistence.DefaultPersistenceInstanceHandlers; import io.serverlessworkflow.impl.persistence.PersistenceInstanceHandlers; import io.serverlessworkflow.impl.persistence.PersistenceInstanceStore; -import io.serverlessworkflow.impl.persistence.WorkflowPersistenceInstance; -import java.io.IOException; -import java.time.Instant; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.Executors; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.mockito.ArgumentCaptor; - -public abstract class AbstractPersistenceTest { - - protected abstract PersistenceInstanceStore persistenceStore(); - - private PersistenceInstanceHandlers handlers; - private static WorkflowApplication app; - private static WorkflowDefinition definition; - protected WorkflowModel context; - protected WorkflowInstanceData workflowInstance; - protected WorkflowContextData workflowContext; - @BeforeAll() - static void init() throws IOException { - app = WorkflowApplication.builder().build(); - definition = app.workflowDefinition(readWorkflowFromClasspath("simple-expression.yaml")); - } +public abstract class AbstractPersistenceTest extends AbstractHandlerPersistenceTest { - @BeforeEach - void setup() { - handlers = - DefaultPersistenceInstanceHandlers.builder(persistenceStore()) - .withExecutorService(Executors.newSingleThreadExecutor()) - .build(); - context = app.modelFactory().fromNull(); - workflowContext = mock(WorkflowContext.class); - workflowInstance = mock(WorkflowInstance.class); - when(workflowContext.context()).thenReturn(context); - when(workflowContext.definition()).thenReturn(definition); - when(workflowContext.instanceData()).thenReturn(workflowInstance); - when(workflowInstance.startedAt()).thenReturn(Instant.now()); - when(workflowInstance.context()).thenReturn(context); - when(workflowInstance.id()).thenReturn(app.idFactory().get()); - when(workflowInstance.input()).thenReturn(app.modelFactory().from(Map.of("name", "Javierito"))); + protected PersistenceInstanceHandlers getPersistenceHandlers() { + return DefaultPersistenceInstanceHandlers.builder(persistenceStore()).build(); } - protected TaskContextData completedTaskContext( - WorkflowPosition position, Map model) { - TaskContext taskContext = mock(TaskContext.class); - when(taskContext.position()).thenReturn(position); - when(taskContext.completedAt()).thenReturn(Instant.now()); - when(taskContext.output()).thenReturn(app.modelFactory().from(model)); - when(taskContext.transition()).thenReturn(new TransitionInfo(null, true)); - return taskContext; - } - - protected TaskContextData retriedTaskContext(WorkflowPosition position, short retryAttempt) { - TaskContext taskContext = mock(TaskContext.class); - when(taskContext.position()).thenReturn(position); - when(taskContext.retryAttempt()).thenReturn(retryAttempt); - return taskContext; - } - - @AfterEach - void close() { - handlers.close(); - } - - @AfterAll - static void cleanup() { - if (app != null) { - app.close(); - } - } - - @Test - void testWorkflowInstance() throws InterruptedException { - final WorkflowMutablePosition position = - app.positionFactory().get().addProperty("do").addIndex(0).addProperty("useExpression"); - final short numRetries = 1; - - final Map completedMap = Map.of("name", "fulanito"); - - handlers.writer().started(workflowContext).join(); - handlers.writer().taskRetried(workflowContext, retriedTaskContext(position, numRetries)).join(); - Optional optional = handlers.reader().find(definition, workflowInstance.id()); - assertThat(optional).isPresent(); - WorkflowPersistenceInstance instance = (WorkflowPersistenceInstance) optional.orElseThrow(); - assertThat(instance.input().asMap().orElseThrow()).isEqualTo(Map.of("name", "Javierito")); - assertThat(instance.startedAt()).isNotNull().isBefore(Instant.now()); - - // task retry - WorkflowContext updateWContext = mock(WorkflowContext.class); - TaskContext updateTContext = mock(TaskContext.class); - when(updateTContext.position()).thenReturn(position); - instance.restoreContext(updateWContext, updateTContext); - ArgumentCaptor retryAttempt = ArgumentCaptor.forClass(Short.class); - verify(updateTContext).retryAttempt(retryAttempt.capture()); - assertThat(retryAttempt.getValue()).isEqualTo(numRetries); - - // task completed - handlers - .writer() - .taskCompleted(workflowContext, completedTaskContext(position, completedMap)) - .join(); - instance = - (WorkflowPersistenceInstance) - handlers.reader().find(definition, workflowInstance.id()).orElseThrow(); - updateWContext = mock(WorkflowContext.class); - updateTContext = mock(TaskContext.class); - when(updateTContext.position()).thenReturn(position); - instance.restoreContext(updateWContext, updateTContext); - ArgumentCaptor context = ArgumentCaptor.forClass(WorkflowModel.class); - verify(updateWContext).context(context.capture()); - assertThat(context.getValue()).isEqualTo(app.modelFactory().fromNull()); - ArgumentCaptor model = ArgumentCaptor.forClass(WorkflowModel.class); - verify(updateTContext).output(model.capture()); - assertThat(model.getValue().asMap().orElseThrow()).isEqualTo(completedMap); - ArgumentCaptor instant = ArgumentCaptor.forClass(Instant.class); - verify(updateTContext).completedAt(instant.capture()); - assertThat(instant.getValue()).isNotNull().isAfterOrEqualTo(instance.startedAt()); - ArgumentCaptor transition = ArgumentCaptor.forClass(TransitionInfo.class); - verify(updateTContext).transition(transition.capture()); - assertThat(transition.getValue().isEndNode()).isTrue(); - - // workflow completed - handlers.writer().completed(workflowContext).join(); - assertThat(handlers.reader().find(definition, workflowInstance.id())).isEmpty(); - } + protected abstract PersistenceInstanceStore persistenceStore(); }