
[flink] CDC Ingestion supported metadata columns #2077

Merged

JingsongLi merged 3 commits into apache:master from MonsterChenzhuo:meta
Oct 8, 2023

Conversation

@MonsterChenzhuo (Contributor) commented Sep 26, 2023

Purpose

close #1985

Tests

MySqlSyncDatabaseActionITCase#testMetadataColumns()
MySqlSyncTableActionITCase#testMetadataColumns()

API and Format

Documentation

@MonsterChenzhuo changed the title [WIP][flink] CDC Ingestion supported metadata columns → [flink] CDC Ingestion supported metadata columns Sep 27, 2023
@MonsterChenzhuo changed the title [flink] CDC Ingestion supported metadata columns → [WIP][flink] CDC Ingestion supported metadata columns Sep 27, 2023
@MonsterChenzhuo force-pushed the meta branch 2 times, most recently from c197aa7 to 92c26ea on September 27, 2023 07:10
@MonsterChenzhuo changed the title [WIP][flink] CDC Ingestion supported metadata columns → [flink] CDC Ingestion supported metadata columns Sep 27, 2023
@JingsongLi (Contributor)

Please rebase master

@MonsterChenzhuo (Contributor, Author)

@JingsongLi PTAL, thanks.

@JingsongLi (Contributor) left a comment


Thanks @MonsterChenzhuo , left some minor comments.


@Override
public Map<String, String> read(JsonNode record) {
String dbName =
Contributor:

dbName?

Contributor Author:

done.


@Override
public Map<String, String> read(JsonNode record) {
String dbName =
Contributor:

dbName?

Contributor Author:

done.

*/
public interface CdcMetadataConverter extends Serializable {

Map<String, String> read(JsonNode var1);
Contributor:

var1 to a meaningful name

Contributor Author:

done.
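As a sketch of the suggested rename (a standalone illustration, not the actual Paimon code: the real `read` method takes a Jackson `JsonNode`, which is replaced here by a plain `String` payload so the snippet compiles without dependencies):

```java
import java.io.Serializable;
import java.util.Collections;
import java.util.Map;

public class ConverterRenameSketch {
    // Sketch of the reviewed interface with var1 renamed to a meaningful
    // name. The real method takes a Jackson JsonNode; a plain String
    // payload stands in here so the example is self-contained.
    interface CdcMetadataConverter extends Serializable {
        Map<String, String> read(String record);
    }

    public static void main(String[] args) {
        // An implementation reads the payload and returns one metadata entry.
        CdcMetadataConverter converter =
                payload -> Collections.singletonMap("database_name", payload);
        System.out.println(converter.read("mydb"));
    }
}
```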

computedColumn.eval(resultMap.get(computedColumn.fieldReference())));
}

for (CdcMetadataConverter metadataConverters : metadataConverters) {
Contributor:

metadataConverter

Contributor Author:

done.

record.get("source").get(AbstractSourceInfo.TABLE_NAME_KEY).asText();
Map<String, String> resultMap = new HashMap<>();
resultMap.put("table_name", dbName);
return resultMap;
Contributor:

Collections.singletonMap

Contributor Author:

done.
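To illustrate the `Collections.singletonMap` suggestion (a hedged sketch with a hypothetical helper, not the actual converter code):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class SingletonMapSketch {
    // Before: a mutable HashMap is allocated for a single entry.
    static Map<String, String> readWithHashMap(String tableName) {
        Map<String, String> resultMap = new HashMap<>();
        resultMap.put("table_name", tableName);
        return resultMap;
    }

    // After: Collections.singletonMap returns an immutable one-entry map
    // in a single expression, with less allocation and clearer intent.
    static Map<String, String> readWithSingletonMap(String tableName) {
        return Collections.singletonMap("table_name", tableName);
    }

    public static void main(String[] args) {
        System.out.println(readWithSingletonMap("orders"));
    }
}
```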

record.get("source").get(AbstractSourceInfo.DATABASE_NAME_KEY).asText();
Map<String, String> resultMap = new HashMap<>();
resultMap.put("database_name", dbName);
return resultMap;
Contributor:

Collections.singletonMap

Contributor Author:

done.

record.get("source").get(AbstractSourceInfo.TIMESTAMP_KEY).asText();
Map<String, String> resultMap = new HashMap<>();
resultMap.put("op_ts", dbName);
return resultMap;
Contributor:

Collections.singletonMap

Contributor Author:

done.

@JingsongLi (Contributor) left a comment

+1

@JingsongLi JingsongLi merged commit 04179e3 into apache:master Oct 8, 2023
pongandnoon added a commit to tongcheng-elong/incubator-paimon that referenced this pull request Oct 25, 2023
(cherry picked from commit 04179e3)

# Conflicts:
#	paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java
#	paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
#	paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
pongandnoon pushed a commit to tongcheng-elong/incubator-paimon that referenced this pull request Oct 25, 2023
JingsongLi pushed a commit that referenced this pull request Mar 5, 2026
This PR supersedes #6353. I took over this work from my colleague @gmdfalk.

## Description
Add --metadata_column support to Paimon Kafka CDC connector, similar to
the already existing options added for MySQL and Postgres:
#2077
Also add optional --metadata_column_prefix to avoid conflicts with
existing Paimon fields like topic, timestamp etc.

Supported metadata columns are those on
org.apache.kafka.clients.consumer.ConsumerRecord, i.e.:

- topic
- partition
- offset
- timestamp
- timestampType: the name of the TimestampType enum value, i.e. NoTimestampType,
CreateTime or LogAppendTime
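
A minimal sketch of how such metadata could be collected (hypothetical names throughout; the real connector reads these fields from org.apache.kafka.clients.consumer.ConsumerRecord, which is stubbed here with a plain record so the example has no Kafka dependency):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class KafkaMetadataSketch {
    // Stand-in for the ConsumerRecord fields listed above.
    record IncomingRecord(String topic, int partition, long offset,
                          long timestamp, String timestampType) {}

    // Hypothetical extraction helper: builds the metadata map, applying an
    // optional prefix the way --metadata_column_prefix is described to, so
    // metadata keys don't collide with existing Paimon field names.
    static Map<String, String> extractMetadata(IncomingRecord r, String prefix) {
        Map<String, String> meta = new LinkedHashMap<>();
        meta.put(prefix + "topic", r.topic());
        meta.put(prefix + "partition", String.valueOf(r.partition()));
        meta.put(prefix + "offset", String.valueOf(r.offset()));
        meta.put(prefix + "timestamp", String.valueOf(r.timestamp()));
        meta.put(prefix + "timestampType", r.timestampType());
        return meta;
    }

    public static void main(String[] args) {
        IncomingRecord rec =
                new IncomingRecord("orders", 3, 42L, 1700000000000L, "CreateTime");
        System.out.println(extractMetadata(rec, "kafka_"));
    }
}
```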

The feature is backwards compatible. It is only active when
--metadata_column is supplied or
SynchronizationActionBase.withMetadataColumns is used.

## Motivation
This is a requested feature:
#3210

We primarily use this feature for two purposes:

1. Troubleshooting and data lineage (e.g. where in our Kafka
infrastructure does this Paimon row come from?)
2. Mapping large Kafka topic partitions 1:1 to Paimon buckets to avoid
reshuffling (see this issue it would solve:
#3249)

## Tests
Unit and Integration Tests

## API and Format
No changes to public apis or storage format.

The changes here are contained to the flink cdc package but I did have
to update CdcSourceRecord since it previously didn't provide a way to
surface arbitrary metadata for a record.

The metadata attribute on CdcSourceRecord is intentionally a generic Map
so that it can potentially be used to add metadata support for other
connectors like Pulsar or Mongo that are not yet implemented.

## Documentation
Added the new --metadata_column and --metadata_column_prefix parameters
to the Kafka CDC docs.

## Dev notes
For running integration tests on macOS with Rancher Desktop, I had to
properly expose the Docker socket to Testcontainers, e.g. system-wide via
sudo ln -sf "$HOME/.rd/docker.sock" /var/run/docker.sock.

---------

Signed-off-by: Max Falk <gfalk@yelp.com>
Co-authored-by: Max Falk <gfalk@yelp.com>
Co-authored-by: Max Falk <279131+gmdfalk@users.noreply.github.com>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>


Successfully merging this pull request may close these issues:

[Feature] CDC Ingestion supported metadata columns
