-
Notifications
You must be signed in to change notification settings - Fork 508
Description
Search before asking
- I searched in the issues and found nothing similar.
Motivation
Although the implementation is there, we need to provide a cleaner source/sink API as it is currently somewhat messy and has a lot of arguments.
Solution
- Introduce Deserialization Schemas #625
- Introduce FlussSource for Flink DataStream API #626
- Introduce FlussSink for Flink DataStream API #627
- Add Flink DataStream Source/Sink Documentation #628
Source Features
- Add deserialization schema
- Make FlinkSource and related classes Generic
- Introduce FlussSource
- Support projection pushdown via field names
Fluss Source Usage:
FlussSource<Order> flussSource =
FlussSource.<Order>builder()
.setBootstrapServers(bootstrapServers)
.setDatabase(DEFAULT_DB)
.setTable(pkTableName)
.setStartingOffsets(OffsetsInitializer.earliest())
.setScanPartitionDiscoveryIntervalMs(1000L)
.setDeserializationSchema(new OrderDeserializationSchema())
.build();
DataStreamSource<Order> stream =
env.fromSource(flussSource, WatermarkStrategy.noWatermarks(), "Fluss Source");
Currently, it supports setting some key properties via the builder pattern, but to avoid overwhelming users they can set the rest with Configuration via the setFlussConfig(). We can always add more if required and after feedback, I just thought it's better like this for now.
Sink Features
Fluss Sink Usage:
FlinkSink<Order> flussSink =
FlussSink.<Order>builder()
.setBootstrapServers(bootstrapServers)
.setDatabase(DEFAULT_DB)
.setTable(pkTableName)
.useUpsert()
.setRowType(rowType)
.setInputType(Order.class) // Signals that no conversion is needed
.build();
stream.sinkTo(flussSink).name("Fluss Sink");
Anything else?
No response
Willingness to contribute
- I'm willing to submit a PR!
Reactions are currently unavailable