[Umbrella][Feature] Datastream API Support #569

@polyzos

Search before asking

  • I searched in the issues and found nothing similar.

Motivation

The DataStream source and sink implementation already exists, but its API is somewhat messy and takes too many arguments. We need to provide a cleaner source/sink API.

Solution

Source Features

Fluss Source Usage:

    FlussSource<Order> flussSource =
            FlussSource.<Order>builder()
                    .setBootstrapServers(bootstrapServers)
                    .setDatabase(DEFAULT_DB)
                    .setTable(pkTableName)
                    .setStartingOffsets(OffsetsInitializer.earliest())
                    .setScanPartitionDiscoveryIntervalMs(1000L)
                    .setDeserializationSchema(new OrderDeserializationSchema())
                    .build();

    DataStreamSource<Order> stream =
            env.fromSource(flussSource, WatermarkStrategy.noWatermarks(), "Fluss Source");

The builder currently exposes setters for only a few key properties. To avoid overwhelming users, the remaining options can be passed as a Configuration via setFlussConfig(). We can always add more setters if required and after feedback; this seemed like the better starting point for now.
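To make the "a few explicit setters plus one catch-all configuration" idea concrete, here is a minimal, self-contained sketch. It is not the Fluss API: `SketchSourceBuilder`, `setExtraConfig` (standing in for the role of `setFlussConfig()`), the option keys, and the rule that explicit setters override the catch-all map are all assumptions made for illustration.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a source builder; this is NOT the Fluss API.
final class SketchSourceBuilder {
    private final Map<String, String> extraConfig = new LinkedHashMap<>();
    private String bootstrapServers;
    private String database;
    private String table;

    SketchSourceBuilder setBootstrapServers(String bootstrapServers) {
        this.bootstrapServers = bootstrapServers;
        return this;
    }

    SketchSourceBuilder setDatabase(String database) {
        this.database = database;
        return this;
    }

    SketchSourceBuilder setTable(String table) {
        this.table = table;
        return this;
    }

    // Catch-all for less common options (the role setFlussConfig() plays in the proposal).
    SketchSourceBuilder setExtraConfig(Map<String, String> config) {
        this.extraConfig.putAll(config);
        return this;
    }

    // Validates the required fields and returns the merged, read-only options.
    Map<String, String> build() {
        require(bootstrapServers, "bootstrapServers");
        require(database, "database");
        require(table, "table");
        Map<String, String> merged = new LinkedHashMap<>(extraConfig);
        // Assumption of this sketch: explicit setters win over the catch-all map.
        merged.put("bootstrap.servers", bootstrapServers);
        merged.put("database", database);
        merged.put("table", table);
        return Collections.unmodifiableMap(merged);
    }

    private static void require(String value, String name) {
        if (value == null || value.isEmpty()) {
            throw new IllegalStateException(name + " must be set");
        }
    }
}

class SourceOptionsDemo {
    public static void main(String[] args) {
        Map<String, String> extra = new LinkedHashMap<>();
        extra.put("some.extra.option", "42"); // hypothetical option key

        Map<String, String> options =
                new SketchSourceBuilder()
                        .setBootstrapServers("localhost:9123")
                        .setDatabase("demo_db")
                        .setTable("orders")
                        .setExtraConfig(extra)
                        .build();

        System.out.println(options);
    }
}
```

Failing fast in `build()` keeps the common path short while still catching a missing required field before the job is submitted. Whether explicit setters or the catch-all configuration should take precedence is a design choice the actual implementation would need to settle.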

Sink Features

Fluss Sink Usage:

    FlussSink<Order> flussSink =
            FlussSink.<Order>builder()
                    .setBootstrapServers(bootstrapServers)
                    .setDatabase(DEFAULT_DB)
                    .setTable(pkTableName)
                    .useUpsert()
                    .setRowType(rowType)
                    .setInputType(Order.class) // Signals that no conversion is needed
                    .build();

    stream.sinkTo(flussSink).name("Fluss Sink");

Anything else?

No response

Willingness to contribute

  • I'm willing to submit a PR!
