• 文档
  • 控制台
  • 登录
  • 立即注册
    目前不支持用户自主注册,如需注册账号,请联系400-080-1100
数据传输服务DTS用户指南
最近更新时间:

6 数据订阅

6.1 简介

数据订阅功能可以实现获取关系数据库的实时增量数据,用户能够根据自身业务需求自由消费增量数据,例如实现缓存更新策略、业务异步解耦、异构数据源数据实时同步及含复杂 ETL 的数据实时同步等多种业务场景。

数据订阅的订阅对象应支持库和表。用户可以根据需要订阅某几个表的增量数据。

6.2 准备数据库账号

在配置数据订阅任务时,需要分别设置源数据库和目标数据库的数据库账号,该账号将用于数据订阅。因为数据库类型的不同,对数据库账号权限的要求有所区别。因此,请在设置订阅任务前,完成数据库账号的创建和授权。

源库的数据库账号所需权限

1.png

6.3 创建和管理订阅任务

6.3.1 操作步骤

1. 开通数据传输服务

(1) 登录 CECSTACK 专属云控制台。

(2) 单击左侧菜单栏777.png,选择“产品与服务 > 数据传输服务 DTS”,进入服务列表页面。

(3) 单击页面右上角的“开通数据传输服务”,进入数据传输服务开通页面。

1.png

1.png

(4) 在参数配置页,按需配置相应参数,完成后单击“免费创建”。

(5) 在确认配置页面,确认要购买产品的配置信息无误后,单击“免费创建”即可开通数据传输服务。

(6) 数据传输服务开通后,显示于服务列表中。

1.png

2. 创建订阅任务

(1) 在服务列表页,单击操作列“查看服务”,进入实例详情页面。

(2) 单击页面右上角“创建任务”,进入创建新任务页面。

2.png

(3) 配置数据库信息。

3.png

1.png

2.png

(4) 数据库信息配置完成后,单击“下一步”,配置任务对象。

1.png

(5) 勾选订阅对象,单击“下一步”,进入“预检查与启动”页面。

1.png

数据订阅任务正式启动前,会对网络和权限进行预检查。只有检查通过后,才能执行数据订阅任务。

(6) 预检查通过后,单击“启动”,订阅任务开始执行。

3. 查看订阅任务

(1) 单击左侧导航栏“任务列表”,进入任务列表页面。

(2) 在任务列表页,输入任务名称单击搜索图标,查询目标任务。

(3) 订阅任务状态显示“任务运行中”,则表示任务运行正常。

1.png

(4) 您可登录数据库客户端,查看数据订阅结果。

4. 停止订阅任务

订阅任务不会自动结束,您需要手动结束订阅任务。

(5) 单击左侧导航栏“任务列表”,进入任务列表页面。

(6) 选择待停止的任务,单击操作列“停止”。

1.png

(7) 在弹出的停止任务确认框中,单击“确定”,即可手动结束订阅任务。

6.4 消费订阅数据

6.4.1 使用 Kafka 客户端消费订阅数据

6.4.1.1 任务简介

当订阅任务创建完成后,DTS 开始将源库的增量数据复制到 Kafka 中,您可以根据任务列表中的Kafka 地址和 Topic 进行消费。如果采用 auto commit(自动提交),可能会因为数据还没被消费完

就执行了提交操作,从而丢失部分数据,建议采用手动提交的方式以避免该问题。

Kafka 中的数据以 protobuf 进行格式化,您可以通过电子云提供的 protobuf idl 编译成各种不同编程语言的数据结构定义。

6.4.1.2 订阅数据的数据结构

syntax = "proto3";

package com.cestc.dts.binlog.protocol;


option java_package = "com.cestc.dts.binlog.protocol.lib";

option java_outer_classname = "Entry";

option optimize_for = SPEED;


/****************************************************************

* message model

*如果要在 Enum 中新增类型,确保以前的类型的下标值不变.

****************************************************************/

message Entry {

     /**协议头部信息**/

    Header                                 header                           = 1;

    ///**打散后的事件类型**/  [default = ROWDATA]

    oneof entryType_present{

            EntryType                      entryType                      = 2;

    }


    /**传输的二进制数组**/

    bytes                               storeValue                    = 3;

}


/**message Header**/

message Header {

    /**协议的版本号**/ //[default = 1]

    oneof version_present {

            int32                        version                          = 1;

    }


    /**binlog/redolog 文件名**/

    string                           logfileName                 = 2;


    /**binlog/redolog 文件的偏移位置**/

    int64                            logfileOffset                 = 3;


    /**服务端 serverId**/

    int64                            serverId                    = 4;


    /** 变更数据的编码 **/

    string                               serverenCode       = 5;


    /**变更数据的执行时间 **/

    int64                                 executeTime                = 6;


    /** 变更数据的来源**/ //[default = MYSQL]

    oneof sourceType_present {

    Type sourceType                                                   = 7;

    }


    /** 变更数据的 schemaname**/

    string                                       schemaName         = 8;

    /**变更数据的 tablename**/

    string                                       tableName             = 9;


    /**每个 event 的长度**/

    int64                                        eventLength           = 10;


    /**数据变更类型**/ // [default = UPDATE]

    oneof eventType_present {

            EventType                              eventType                  = 11;

    }


    /**预留扩展**/

    repeated Pair                                          props                          = 12;


     /**当前事务的 gitd**/

    string                               gtid                                = 13;

}


/**每个字段的数据结构**/

message Column {

    /**字段下标**/

    int32               index                     =            1;


    /**字段 java 中类型**/

    int32                sqlType                 =            2;


    /**字段名称(忽略大小写),在 mysql 中是没有的**/

    string               name              =             3;


    /**是否是主键**/

    bool                 isKey               =             4;


    /**如果 EventType=UPDATE,用于标识这个字段值是否有修改**/

    bool          updated                =                5;


    /** 标识是否为空  **/ //[default = false]

    oneof isNull_present {

            bool         isNull                   =                  6;

    }



    /**预留扩展**/

    repeated  Pair                      props                     =                 7;


    /** 字段值,timestamp,Datetime 是一个时间格式的文本 **/

    string               value                   =               8;


    /** 对应数据对象原始长度 **/

    int32                length                 =               9;


    /**字段 mysql 类型**/

    string                mysqlType         =              10;

}


message RowData {


    /** 字段信息,增量数据(修改前,删除前) **/

    repeated Column                   beforeColumns       =            1;


    /** 字段信息,增量数据(修改后,新增后) **/

    repeated Column                   afterColumns =                  2;


    /**预留扩展**/

    repeated Pair                          props                       =          3;

}


 /**message row 每行变更数据的数据结构**/

message RowChange {


    /**tableId,由数据库产生**/

    int64                        tableId                     =                   1;


    /**数据变更类型**/ //[default = UPDATE]

    oneof eventType_present {

             EventType               eventType                  =                 2;

    }



    /** 标识是否是 ddl 语句 **/ // [default = false]

    oneof isDdl_present {

            bool                  isDdl                        =                        10;

    }


    /** ddl/query 的 sql 语句 **/

    string                        sql                         =                   11;


    /** 一次数据库变更可能存在多行 **/

    repeated RowData                rowDatas              =                     12;


    /**预留扩展**/

    repeated Pair                             props                              =                    13;

    /** ddl/query 的 schemaName,会存在跨库 ddl,需要保留执行 ddl 的当前 schemaName **/

    string                     ddlSchemaName           =                  14;

}


/**开始事务的一些信息**/

message TransactionBegin{


    /**已废弃,请使用 header 里的 executeTime**/

    int64                           executeTime            =            1;


    /**已废弃,Begin 里不提供事务 id**/

    string                          transactionId           =             2;


    /**预留扩展**/

    repeated Pair                          props                          =                     3;


    /**执行的 thread Id**/

    int64                           threadId               =              4;

}


/**结束事务的一些信息**/

message TransactionEnd{


    /**已废弃,请使用 header 里的 executeTime**/

    int64                           executeTime                =          1;


    /**事务号**

    string t                        ransactionId                =           2;


    /**预留扩展**/

    repeated Pair                           props                        =                   3;

}

/**预留扩展**/

message Pair{

    string                    key                       =                      1;

    string                    value                    =                      2;

}


/**打散后的事件类型,主要用于标识事务的开始,变更数据,结束**/

enum EntryType{

    ENTRYTYPECOMPATIBLEPROTO2 = 0;

    TRANSACTIONBEGIN                  =             1;

    ROWDATA                                   =             2;

    TRANSACTIONEND                     =             3;

    /** 心跳类型,内部使用,外部暂不可见,可忽略 **/

    HEARTBEAT                                 =             4;

    GTIDLOG                                  =               5;

}


/** 事件类型 **/

enum EventType {


    EVENTTYPECOMPATIBLEPROTO2 = 0;

        INSERT                   =                1;

        UPDATE                  =                2;

        DELETE                   =                3;

       CREATE         =                   4;

       ALTER            =                   5;

       ERASE            =                   6;

       QUERY           =                   7;

       TRUNCATE     =                  8;

       RENAME                  =               9;

       /**CREATE INDEX**/

       CINDEX           =                 10;

       DINDEX                    =               11;

       GTID                    =               12;

       /** XA **/

       XACOMMIT         =               13;

       XAROLLBACK       =              14;

       /** MASTER HEARTBEAT **/

       MHEARTBEAT      =                15;

}


/**数据库类型**/

enum Type {

     TYPECOMPATIBLEPROTO2 = 0;

        ORACLE          =             1;

         MYSQL           =             2;

         PGSQL            =             3;

}


图6-1 Entry 数据结构

1.png

图6-2 Header 数据结构

2.png

图6-3 RowChange 数据结构

1.png

图6-4 RowData 数据结构

1.png

图6-5 Column 数据结构

1.png

意见反馈

文档内容是否对您有帮助?

如您有其他疑问,您也可以通过在线客服来与我们联系探讨 在线客服

联系我们
回到顶部