一、CDC

CDC (Change Data Capture) ,在广义的概念上,只要能捕获数据变更的技术,都可以称为 CDC 。但通常我们说的CDC 技术主要面向数据库(包括常见的mysql,Oracle, MongoDB等)的变更,是一种用于捕获数据库中数据变更的技术。

二、常见CDC的比较

常见的主要包括Flink CDC,DataX,Canal,Sqoop,Kettle,Oracle Goldengate,Debezium等。

  • DataX,Sqoop和kettle的CDC实现技术主要是基于查询的方式实现的,通过离线调度查询作业,实现批处理请求。这种作业方式无法保证数据的一致性,实时性也较差。
  • Flink CDC,Canal,Debezium和Oracle Goldengate是基于日志的CDC技术。这种技术,利用流处理的方式,实时处理日志数据,保证了数据的一致性,为其他服务提供了实时数据。

三、Flink CDC

2020年 Flink cdc 首次在 Flink forward 大会上官宣, 由 Jark Wu & Qingsheng Ren 两位大佬提出。

Flink CDC connector 可以捕获在一个或多个表中发生的所有变更。该模式通常有一个前记录和一个后记录。Flink CDC connector 可以直接在Flink中以非约束模式(流)使用,而不需要使用类似 kafka 之类的中间件中转数据。

四、Flink CDC支持的数据库

PS:

Flink CDC 2.2才新增OceanBase,PolarDB-X,SqlServer,TiDB 四种数据源接入,均支持全量和增量一体化同步。

截止到目前FlinkCDC已经支持12 数据源。

五、阿里实现的FlinkCDC使用示例

依赖引入

    <!-- flink table支持 -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- 阿里实现的flink mysql CDC -->
    <dependency>
      <groupId>com.alibaba.ververica</groupId>
      <artifactId>flink-connector-mysql-cdc</artifactId>
      <version>1.4.0</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>8.0.28</version>
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.80</version>
    </dependency>
    <!-- jackson报错解决 -->
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-core</artifactId>
      <version>${jackson.version}</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>${jackson.version}</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.module</groupId>
      <artifactId>jackson-module-parameter-names</artifactId>
      <version>${jackson.version}</version>
    </dependency>

基于table

package spendreport.cdc;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import java.util.List;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
;
/**
 * @author zhengwen
 **/
public class TestMySqlFlinkCDC {
  public static void main(String[] args) throws Exception {
    //1.创建执行环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    //2.Flink-CDC 将读取 binlog 的位置信息以状态的方式保存在 CK,如果想要做到断点续传, 需要从 Checkpoint 或者 Savepoint 启动程序
    //2.1 开启 Checkpoint,每隔 5 秒钟做一次 CK
    env.enableCheckpointing(5000L);
    //2.2 指定 CK 的一致性语义
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    //2.3 设置任务关闭的时候保留最后一次 CK 数据
    env.getCheckpointConfig().enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    //2.4 指定从 CK 自动重启策略
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
    DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
        .hostname("127.0.0.1")
        .serverTimeZone("GMT 8")  //时区报错增加这个设置
        .port(3306)
        .username("root")
        .password("123456")
        .databaseList("wz")
        .tableList("wz.user_info")  //注意表一定要写库名.表名这种,多个,隔开
        .startupOptions(StartupOptions.initial())
        //自定义转json格式化
        .deserializer(new MyJsonDebeziumDeserializationSchema())
        //自带string格式序列化
        //.deserializer(new StringDebeziumDeserializationSchema())
        .build();
    DataStreamSource<String> streamSource = env.addSource(sourceFunction);
    //TODO 可以keyBy,比如根据table或type,然后开窗处理
    //3.打印数据
    streamSource.print();
    //streamSource.addSink(); 输出
    //4.执行任务
    env.execute("flinkTableCDC");
  }
  private static class MyJsonDebeziumDeserializationSchema implements
      com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema<String> {
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector)
        throws Exception {
      Struct value = (Struct) sourceRecord.value();
      Struct source = value.getStruct("source");
      //获取数据库名称
      String db = source.getString("db");
      String table = source.getString("table");
      //获取数据类型
      String type = Envelope.operationFor(sourceRecord).toString().toLowerCase();
      if (type.equals("create")) {
        type = "insert";
      }
      JSONObject jsonObject = new JSONObject();
      jsonObject.put("database", db);
      jsonObject.put("table", table);
      jsonObject.put("type", type);
      //获取数据data
      Struct after = value.getStruct("after");
      JSONObject dataJson = new JSONObject();
      List<Field> fields = after.schema().fields();
      for (Field field : fields) {
        String field_name = field.name();
        Object fieldValue = after.get(field);
        dataJson.put(field_name, fieldValue);
      }
      jsonObject.put("data", dataJson);
      collector.collect(JSONObject.toJSONString(jsonObject));
    }
    @Override
    public TypeInformation<String> getProducedType() {
      return BasicTypeInfo.STRING_TYPE_INFO;
    }
  }
}

运行效果

PS:

  • 操作数据库的增删改就会立马触发
  • 这里是自定义的序列化转json格式字符串,自带的字符串序列化也是可以的(可以自己试试打印的内容)

基于sql

package spendreport.cdc;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
 * @author zhengwen
 **/
public class TestMySqlFlinkCDC2 {
  public static void main(String[] args) throws Exception {
    //1.创建执行环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    //2.创建 Flink-MySQL-CDC 的 Source
    String connectorName = "mysql-cdc";
    String dbHostName = "127.0.0.1";
    String dbPort = "3306";
    String dbUsername = "root";
    String dbPassword = "123456";
    String dbDatabaseName = "wz";
    String dbTableName = "user_info";
    String tableSql = "CREATE TABLE t_user_info ("
          "id int,mobile varchar(20),"
          "user_name varchar(30),"
          "real_name varchar(60),"
          "id_card varchar(20),"
          "org_name varchar(100),"
          "user_stars int,"
          "create_by int,"
        //   "create_time datetime,"
          "update_by int,"
        //   "update_time datetime,"
          "is_deleted int) "
          " WITH ("
          " 'connector' = '"   connectorName   "',"
          " 'hostname' = '"   dbHostName   "',"
          " 'port' = '"   dbPort   "',"
          " 'username' = '"   dbUsername   "',"
          " 'password' = '"   dbPassword   "',"
          " 'database-name' = '"   dbDatabaseName   "',"
          " 'table-name' = '"   dbTableName   "'"
          ")";
    tableEnv.executeSql(tableSql);
    tableEnv.executeSql("select * from t_user_info").print();
    env.execute();
  }
}

运行效果:

总结

既然是基于日志,那么数据库的配置文件肯定要开启日志功能,这里mysql需要开启内容

server-id=1
log_bin=mysql-bin
binlog_format=ROW  #目前还只能支持行
expire_logs_days=30
binlog_do_db=wz #这里binlog的库如果有多个就再写一行,千万不要写成用,隔开

  • 实时性确实高,比那些自动任务定时取体验号百倍
  • 流示的确实丝滑

最后肯定证明这种方式同步数据可行,而且实时性特高,但是就是不知道我们的目标数据库是否可以开启这些日志配置。UP!

到此这篇关于Flink流处理引擎零基础速通之数据的抽取篇的文章就介绍到这了,更多相关Flink数据的抽取内容请搜索Devmax以前的文章或继续浏览下面的相关文章希望大家以后多多支持Devmax!

Flink流处理引擎零基础速通之数据的抽取篇的更多相关文章

  1. NT IIS下用ODBC连接数据库

    $connection=intodbc_connect建立数据库连接,$query_string="查询记录的条件"如:$query_string="select*fromtable"用$cur=intodbc_exec检索数据库,将记录集放入$cur变量中。再用while{$var1=odbc_result;$var2=odbc_result;...}读取odbc_exec()返回的数据集$cur。最后是odbc_close关闭数据库的连接。odbc_result()函数是取当前记录的指定字段值。

  2. Thinkphp5框架实现获取数据库数据到视图的方法

    这篇文章主要介绍了Thinkphp5框架实现获取数据库数据到视图的方法,涉及thinkPHP5数据库配置、读取、模型操作及视图调用相关操作技巧,需要的朋友可以参考下

  3. 如何在PHP环境中使用ProtoBuf数据格式

    这篇文章主要介绍了如何在PHP环境中使用ProtoBuf数据格式,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下

  4. Python爬取奶茶店数据分析哪家最好喝以及性价比

    这篇文章主要介绍了用Python告诉你奶茶哪家最好喝性价比最高,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习吧

  5. Android本地存储方法浅析介绍

    这篇文章主要介绍了Android本地存储案例,方法简单可以实现存储并达到节省内存的效果,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习吧

  6. 详解Python如何实现Excel数据读取和写入

    这篇文章主要为大家详细介绍了python如何实现对EXCEL数据进行读取和写入,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

  7. Python自动化办公之Excel数据的写入

    这篇文章主要为大家详细介绍一下Python中excel的写入模块- xlsxwriter,并利用该模块实现Excel数据的写入,感兴趣的小伙伴可以了解一下

  8. Python图像运算之图像阈值化处理详解

    这篇文章将详细讲解图像阈值化处理,涉及阈值化处理、固定阈值化处理和自适应阈值化处理,这是图像边缘检测或图像增强等处理的基础,感兴趣的可以了解一下

  9. JavaScript数据扁平化详解

    这篇文章主要为大家介绍了JavaScript数据扁平化,具有一定的参考价值,感兴趣的小伙伴们可以参考一下,希望能够给你带来帮助

  10. Python数据分析之PMI数据图形展示

    这篇文章主要介绍了Python数据分析之PMI数据图形展示,文章介绍了简单的python爬虫,并使用numpy进行了简单的数据处理,最终使用 matplotlib 进行图形绘制,实现了直观的方式展示制造业和非制造业指数图形,需要的朋友可以参考一下

随机推荐

  1. 基于EJB技术的商务预订系统的开发

    用EJB结构开发的应用程序是可伸缩的、事务型的、多用户安全的。总的来说,EJB是一个组件事务监控的标准服务器端的组件模型。基于EJB技术的系统结构模型EJB结构是一个服务端组件结构,是一个层次性结构,其结构模型如图1所示。图2:商务预订系统的构架EntityBean是为了现实世界的对象建造的模型,这些对象通常是数据库的一些持久记录。

  2. Java利用POI实现导入导出Excel表格

    这篇文章主要为大家详细介绍了Java利用POI实现导入导出Excel表格,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

  3. Mybatis分页插件PageHelper手写实现示例

    这篇文章主要为大家介绍了Mybatis分页插件PageHelper手写实现示例,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

  4. (jsp/html)网页上嵌入播放器(常用播放器代码整理)

    网页上嵌入播放器,只要在HTML上添加以上代码就OK了,下面整理了一些常用的播放器代码,总有一款适合你,感兴趣的朋友可以参考下哈,希望对你有所帮助

  5. Java 阻塞队列BlockingQueue详解

    本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景,通过实例代码介绍了Java 阻塞队列BlockingQueue的相关知识,需要的朋友可以参考下

  6. Java异常Exception详细讲解

    异常就是不正常,比如当我们身体出现了异常我们会根据身体情况选择喝开水、吃药、看病、等 异常处理方法。 java异常处理机制是我们java语言使用异常处理机制为程序提供了错误处理的能力,程序出现的错误,程序可以安全的退出,以保证程序正常的运行等

  7. Java Bean 作用域及它的几种类型介绍

    这篇文章主要介绍了Java Bean作用域及它的几种类型介绍,Spring框架作为一个管理Bean的IoC容器,那么Bean自然是Spring中的重要资源了,那Bean的作用域又是什么,接下来我们一起进入文章详细学习吧

  8. 面试突击之跨域问题的解决方案详解

    跨域问题本质是浏览器的一种保护机制,它的初衷是为了保证用户的安全,防止恶意网站窃取数据。那怎么解决这个问题呢?接下来我们一起来看

  9. Mybatis-Plus接口BaseMapper与Services使用详解

    这篇文章主要为大家介绍了Mybatis-Plus接口BaseMapper与Services使用详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

  10. mybatis-plus雪花算法增强idworker的实现

    今天聊聊在mybatis-plus中引入分布式ID生成框架idworker,进一步增强实现生成分布式唯一ID,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

返回
顶部