Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -222,40 +222,40 @@ public static <T> Consumer<FuncEmitTaskBuilder> event(
*
* @param type CloudEvent type
* @param function function that maps workflow input to {@link CloudEventData}
* @param clazz expected input class for conversion
* @param inputClass expected input class for conversion
* @param <T> input type
* @return a consumer to configure {@link FuncEmitTaskBuilder}
*/
public static <T> Consumer<FuncEmitTaskBuilder> event(
String type, Function<T, CloudEventData> function, Class<T> clazz) {
return OPS.event(type, function, clazz);
String type, Function<T, CloudEventData> function, Class<T> inputClass) {
return OPS.event(type, function, inputClass);
}

/**
* Emit a JSON CloudEvent for a POJO input type. Sets {@code contentType=application/json} and
* serializes the input to bytes using the configured JSON mapper.
*
* @param type CloudEvent type
* @param clazz input POJO class (used for typing and conversion)
* @param inputClass input POJO class (used for typing and conversion)
* @param <T> input type
* @return a consumer to configure {@link FuncEmitTaskBuilder}
*/
public static <T> Consumer<FuncEmitTaskBuilder> eventJson(String type, Class<T> clazz) {
return b -> new FuncEmitSpec().type(type).jsonData(clazz).accept(b);
public static <T> Consumer<FuncEmitTaskBuilder> eventJson(String type, Class<T> inputClass) {
return b -> new FuncEmitSpec().type(type).jsonData(inputClass).accept(b);
}

/**
* Emit a CloudEvent with arbitrary bytes payload generated by a custom serializer.
*
* @param type CloudEvent type
* @param serializer function producing bytes from the input
* @param clazz expected input class for conversion
* @param inputClass expected input class for conversion
* @param <T> input type
* @return a consumer to configure {@link FuncEmitTaskBuilder}
*/
public static <T> Consumer<FuncEmitTaskBuilder> eventBytes(
String type, Function<T, byte[]> serializer, Class<T> clazz) {
return b -> new FuncEmitSpec().type(type).bytesData(serializer, clazz).accept(b);
String type, Function<T, byte[]> serializer, Class<T> inputClass) {
return b -> new FuncEmitSpec().type(type).bytesData(serializer, inputClass).accept(b);
}

/**
Expand Down Expand Up @@ -284,13 +284,13 @@ public static FuncPredicateEventConfigurer event(String type) {
* type.
*
* @param fn the function to execute at runtime
* @param clazz expected input class for model conversion
* @param inputClass expected input class for model conversion
* @param <T> input type
* @param <R> result type
* @return a call step which supports chaining (e.g., {@code .exportAs(...).when(...)})
*/
public static <T, R> FuncCallStep<T, R> function(Function<T, R> fn, Class<T> clazz) {
return new FuncCallStep<>(fn, clazz);
public static <T, R> FuncCallStep<T, R> function(Function<T, R> fn, Class<T> inputClass) {
return new FuncCallStep<>(fn, inputClass);
}

/**
Expand Down Expand Up @@ -440,26 +440,26 @@ public static <T, R> FuncCallStep<T, R> withUniqueId(UniqueIdBiFunction<T, R> fn
* Create a fire-and-forget side-effect step (unnamed). The consumer receives the typed input.
*
* @param consumer side-effect function
* @param clazz expected input class for conversion
* @param inputClass expected input class for conversion
* @param <T> input type
* @return a {@link ConsumeStep} which can be chained and added via {@link
* #tasks(FuncTaskConfigurer...)}
*/
public static <T> ConsumeStep<T> consume(Consumer<T> consumer, Class<T> clazz) {
return new ConsumeStep<>(consumer, clazz);
public static <T> ConsumeStep<T> consume(Consumer<T> consumer, Class<T> inputClass) {
return new ConsumeStep<>(consumer, inputClass);
}

/**
* Named variant of {@link #consume(Consumer, Class)}.
*
* @param name task name
* @param consumer side-effect function
* @param clazz expected input class
* @param inputClass expected input class
* @param <T> input type
* @return a named {@link ConsumeStep}
*/
public static <T> ConsumeStep<T> consume(String name, Consumer<T> consumer, Class<T> clazz) {
return new ConsumeStep<>(name, consumer, clazz);
public static <T> ConsumeStep<T> consume(String name, Consumer<T> consumer, Class<T> inputClass) {
return new ConsumeStep<>(name, consumer, inputClass);
}

/**
Expand Down Expand Up @@ -506,8 +506,8 @@ public static <T, R> FuncCallStep<T, R> agent(
* @return a call step
*/
public static <T, R> FuncCallStep<T, R> function(Function<T, R> fn) {
Class<T> clazz = ReflectionUtils.inferInputType(fn);
return new FuncCallStep<>(fn, clazz);
Class<T> inputClass = ReflectionUtils.inferInputType(fn);
return new FuncCallStep<>(fn, inputClass);
}

/**
Expand All @@ -520,22 +520,23 @@ public static <T, R> FuncCallStep<T, R> function(Function<T, R> fn) {
* @return a named call step
*/
public static <T, R> FuncCallStep<T, R> function(String name, Function<T, R> fn) {
Class<T> clazz = ReflectionUtils.inferInputType(fn);
return new FuncCallStep<>(name, fn, clazz);
Class<T> inputClass = ReflectionUtils.inferInputType(fn);
return new FuncCallStep<>(name, fn, inputClass);
}

/**
* Named variant of {@link #function(Function, Class)} with explicit input type.
*
* @param name task name
* @param fn the function to execute
* @param clazz expected input class
* @param inputClass expected input class
* @param <T> input type
* @param <R> output type
* @return a named call step
*/
public static <T, R> FuncCallStep<T, R> function(String name, Function<T, R> fn, Class<T> clazz) {
return new FuncCallStep<>(name, fn, clazz);
public static <T, R> FuncCallStep<T, R> function(
String name, Function<T, R> fn, Class<T> inputClass) {
return new FuncCallStep<>(name, fn, inputClass);
}

// ------------------ tasks ---------------- //
Expand Down Expand Up @@ -606,51 +607,52 @@ public static <T> EmitStep emit(String name, String type, Function<T, CloudEvent
* @param name task name
* @param type CloudEvent type
* @param serializer function producing bytes
* @param clazz expected input class
* @param inputClass expected input class
* @param <T> input type
* @return a named {@link EmitStep}
*/
public static <T> EmitStep emit(
String name, String type, Function<T, byte[]> serializer, Class<T> clazz) {
return new EmitStep(name, eventBytes(type, serializer, clazz));
String name, String type, Function<T, byte[]> serializer, Class<T> inputClass) {
return new EmitStep(name, eventBytes(type, serializer, inputClass));
}

/**
* Unnamed variant of {@link #emit(String, String, Function, Class)}.
*
* @param type CloudEvent type
* @param serializer function producing bytes
* @param clazz expected input class
* @param inputClass expected input class
* @param <T> input type
* @return an {@link EmitStep}
*/
public static <T> EmitStep emit(String type, Function<T, byte[]> serializer, Class<T> clazz) {
return new EmitStep(null, eventBytes(type, serializer, clazz));
public static <T> EmitStep emit(
String type, Function<T, byte[]> serializer, Class<T> inputClass) {
return new EmitStep(null, eventBytes(type, serializer, inputClass));
}

/**
* Emit a JSON CloudEvent from a POJO input class (unnamed).
*
* @param type CloudEvent type
* @param clazz input POJO class
* @param inputClass input POJO class
* @param <T> input type
* @return an {@link EmitStep}
*/
public static <T> EmitStep emitJson(String type, Class<T> clazz) {
return new EmitStep(null, eventJson(type, clazz));
public static <T> EmitStep emitJson(String type, Class<T> inputClass) {
return new EmitStep(null, eventJson(type, inputClass));
}

/**
* Emit a JSON CloudEvent from a POJO input class (named).
*
* @param name task name
* @param type CloudEvent type
* @param clazz input POJO class
* @param inputClass input POJO class
* @param <T> input type
* @return a named {@link EmitStep}
*/
public static <T> EmitStep emitJson(String name, String type, Class<T> clazz) {
return new EmitStep(name, eventJson(type, clazz));
public static <T> EmitStep emitJson(String name, String type, Class<T> inputClass) {
return new EmitStep(name, eventJson(type, inputClass));
}

/**
Expand Down