Skip to content

KingbaseV9 通过 Flink CDC 实时同步到 Doris,全量同步阶段出现日期转换错误 java.lang.UnsupportedOperationException: null #474

@ykmx

Description

@ykmx

建表语句

-- Reproduction table for this report. The two `datetime` columns
-- (`create_date`, `update_date`) are the ones the report describes as failing
-- to sync during the snapshot phase; keep their type exactly as written.
CREATE TABLE `flow_node_expand` (
	-- Primary-key column (see the constraint at the end of the definition).
	`id` character varying(64) COLLATE `ci_x_icu` NOT NULL,
	-- Nullable; defaults to the string '0' cast to varchar.
	`default_assignee` character varying(255) COLLATE `ci_x_icu` NULL DEFAULT '0'::varchar,
	`create_by` character varying(255) COLLATE `ci_x_icu` NULL,
	-- Nullable `datetime` column, no default.
	`create_date` datetime NULL,
	`update_by` character varying(255) COLLATE `ci_x_icu` NULL,
	-- Nullable `datetime` column, no default; this is the column named in the logged conversion error.
	`update_date` datetime NULL,
	-- Nullable; defaults to the string '0' cast to varchar.
	-- NOTE(review): presumably a soft-delete marker ('0' / '1' appear in the sample rows) — confirm.
	`del_flag` character varying(64) COLLATE `ci_x_icu` NULL DEFAULT '0'::varchar,
	-- Explicitly named primary-key constraint on `id`.
	CONSTRAINT `PRIMARY_8F87C9E1` PRIMARY KEY (id)
)TABLESPACE sys_default;

-- Matching INSERT statements for the table above.
-- Variant 1: multi-row insert (three rows in a single statement).
-- Both date columns are filled from NOW(); the third row back-dates
-- `create_date` by one day and sets `del_flag` to '1'.
INSERT INTO `flow_node_expand`
    (`id`, `default_assignee`, `create_by`, `create_date`, `update_by`, `update_date`, `del_flag`)
VALUES
    ('test001', 'admin', 'system', NOW(), 'system', NOW(), '0'),
    ('test002', 'user1', 'admin', NOW(), 'admin', NOW(), '0'),
    ('test003', 'user2', 'admin', NOW() - INTERVAL '1 DAY', 'admin', NOW(), '1');

-- Variant 2: single-row insert.
-- NOTE(review): this reuses id 'test001' from the multi-row variant; given the
-- primary key on `id`, presumably only one of the two variants is meant to be
-- run against the same table — confirm.
INSERT INTO `flow_node_expand`
    (`id`, `default_assignee`, `create_by`, `create_date`, `update_by`, `update_date`, `del_flag`)
VALUES
    ('test001', 'admin', 'system', NOW(), 'system', NOW(), '0');

当用户在 Kingbase source 的实时增量配置页面中将位点配置为 initial 时,Flink TaskManager 日志中会出现如下异常:全量同步阶段 create_date、update_date 两个字段都无法同步到目标端,其他字段可以正常同步;后续的增量同步也全部失效。

2025-11-28 11:12:45,054 ERROR io.debezium.relational.TableSchemaBuilder                    [] - Failed to properly convert data value for 'public.flow_node_expand.update_date' of type datetime for row [test001, admin, system, 2025-11-28T10:33:21.000+0800, system, 2025-11-28T10:33:21.000+0800, 0]:
java.lang.UnsupportedOperationException: null
        at com.qlangtech.plugins.incr.flink.cdc.pglike.KingBaseDateTimeConverter.convertDateTime(KingBaseDateTimeConverter.java:64) ~[tis-flink-cdc-kingbase-plugin.jar:?]
        at io.debezium.relational.CustomConverterRegistry.lambda$getValueConverter$0(CustomConverterRegistry.java:150) ~[tis-flink-cdc-common-plugin-shade-4-debuezium-4.3.0.jar:?]
        at io.debezium.relational.TableSchemaBuilder.lambda$createValueGenerator$5(TableSchemaBuilder.java:297) ~[tis-flink-cdc-common-plugin-shade-4-debuezium-4.3.0.jar:?]
        at io.debezium.relational.TableSchema.valueFromColumnData(TableSchema.java:141) ~[tis-flink-cdc-common-plugin-shade-4-debuezium-4.3.0.jar:?]
        at io.debezium.relational.RelationalChangeRecordEmitter.emitReadRecord(RelationalChangeRecordEmitter.java:111) ~[tis-flink-cdc-common-plugin-shade-4-debuezium-4.3.0.jar:?]
        at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:57) ~[tis-flink-cdc-common-plugin-shade-4-debuezium-4.3.0.jar:?]
        at io.debezium.pipeline.EventDispatcher.dispatchSnapshotEvent(EventDispatcher.java:155) ~[tis-flink-cdc-common-plugin-shade-4-debuezium-4.3.0.jar:?]
        at org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresScanFetchTask$PostgresSnapshotSplitReadTask.createDataEventsForTable(PostgresScanFetchTask.java:340) ~[tis-flink-cdc-kingbase-shade-4-debezium-connector-postgresql-4.3.0.jar:?]
        at org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresScanFetchTask$PostgresSnapshotSplitReadTask.createDataEvents(PostgresScanFetchTask.java:265) ~[tis-flink-cdc-kingbase-shade-4-debezium-connector-postgresql-4.3.0.jar:?]
        at org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresScanFetchTask$PostgresSnapshotSplitReadTask.doExecute(PostgresScanFetchTask.java:255) ~[tis-flink-cdc-kingbase-shade-4-debezium-connector-postgresql-4.3.0.jar:?]
        at org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresScanFetchTask$PostgresSnapshotSplitReadTask.doExecute(PostgresScanFetchTask.java:211) ~[tis-flink-cdc-kingbase-shade-4-debezium-connector-postgresql-4.3.0.jar:?]
        at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:76) ~[tis-flink-cdc-common-plugin-shade-4-debuezium-4.3.0.jar:?]
        at org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresScanFetchTask.executeDataSnapshot(PostgresScanFetchTask.java:112) ~[tis-flink-cdc-kingbase-shade-4-debezium-connector-postgresql-4.3.0.jar:?]
        at org.apache.flink.cdc.connectors.base.source.reader.external.AbstractScanFetchTask.execute(AbstractScanFetchTask.java:71) ~[tis-flink-cdc-kingbase-shade-4-debezium-connector-postgresql-4.3.0.jar:?]
        at org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresScanFetchTask.execute(PostgresScanFetchTask.java:86) ~[tis-flink-cdc-kingbase-shade-4-debezium-connector-postgresql-4.3.0.jar:?]
        at org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.lambda$submitTask$1(IncrementalSourceScanFetcher.java:100) ~[tis-flink-cdc-kingbase-shade-4-debezium-connector-postgresql-4.3.0.jar:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
        at java.lang.Thread.run(Thread.java:834) [?:?]
Image Image Image

Metadata

Metadata

Assignees

No one assigned

    Labels

    bug: Something isn't working

    Type

    Projects

    No projects

    Milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions