网络知识 娱乐 【技术种草】我用 1个肉夹馍的钱,搭了整套大数据系统

【技术种草】我用 1个肉夹馍的钱,搭了整套大数据系统

双十一大促,作为一个羊毛党怎么能不参与呢。然后我打算来腾讯云薅一薅羊毛。

下面我分享一下如何用 1 个肉夹馍的钱来搭建一套云上的大数据平台。经过本人反复的钻研,发现薅羊毛这件事简直是太简单了。最后买 MySQL 19.9元,流计算 Oceanus(Flink) 1 元,花了二十几块钱,搭建了这样式的大数据系统。

架构图:

MySQL + Flink + ES

下面就是我薅羊毛的具体步骤:

1. 找到腾讯云双十一活动主页。

2. 购买 MySQL。

在秒杀活动中找到 MySQL, 19块9居然可以用 1 年。

这里尝试一下购买,发现需要选择可用区。这里留了个心眼,选择可用区的时候,需要 MySQL、Flink(流计算 Oceanus) 集群和 ES 集群选择同一可用区。

3. 购买 Flink 集群。

腾讯云的 Flink 平台叫流计算 Oceanus。在双十一活动主页发现 1000 多,幸亏留了个心眼,在流计算 Oceanus 产品主页发现了新用户 1 元即可购买集群。

然后购买集群,然后发现下单过程中还得花 1 块钱买 COS 存储集群,不过也不贵顺便就买了。

4. 购买 Elasticsearch 集群

腾讯云的 ES在双十一这里发现也没有足够多的优惠,而且以企业用户为主。然后也去 ES 产品主页逛了逛,发现居然新用户可以 0 元购买!这不就是免费送么,搞起。

5. 系统搭建。

这样就用肉夹馍的钱(19.9 元买 MySQL + 1元 Flink(流计算 Oceanus) 集群 + 1元 COS 集群 + 0 元 ES 集群)买了一整套大数据组件。

前置准备

1. MySQL 集群准备

1.1 新建 MySQL 集群

进入 MySQL 控制台[1],在【数据库管理】> 【参数设置】中设置参数 binlog_row_image=FULL,便于使用 CDC(Capture Data Change)特性,实现数据的变更实时捕获。

1.2 准备数据

首先创建 testdb 库,并在 testdb 库中创建用户 user 表,并插入数据。

user 表结构:

字段名

类型

含义

user_id

int

用户ID

user_name

varchar(50)

用户名

create_time

timestamp

创建时间

在表中插入2条数据。

INSERT INTO `user` (`user_id`, `user_name`, `create_time`) VALUES (1001, '小明', '2021-10-01 00:00:00');
INSERT INTO `user` (`user_id`, `user_name`, `create_time`) VALUES (1002, 'TONY', '2021-10-02 00:00:00');

1.3 设置参数

点击实例 ID,在实例详情页面点击【数据库管理】进入【参数设置】面板,设置binlog_row_image=FULL来开启数据库变化的同步。

通过MySQL集成数据到 Oceanus (Flink) 集群,可以使用flink-connector-jdbc或者flink-connector-mysq-cdc。使用MySQL-cdc特性时,flink-connector-mysq-cdc 连接器需要设置 MySQL 数据库的参数 binlog_row_image=FULL。

1. 创建 Source

CREATE TABLE `user_source` (
    `user_id` int,
    `user_name` varchar(50),
    PRIMARY KEY (`user_id`) NOT ENFORCED -- 如果要同步的数据库表定义了主键, 则这里也需要定义
) WITH (
    'connector' = 'mysql-cdc',      -- 必须为 'mysql-cdc'
    'hostname' = '10.0.0.158',      -- 数据库的 IP
    'port' = '3306',                -- 数据库的访问端口
    'username' = 'root',            -- 数据库访问的用户名(需要提供 SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, SELECT, RELOAD 权限)
    'password' = 'yourpassword',    -- 数据库访问的密码
    'database-name' = 'testdb',     -- 需要同步的数据库
    'table-name' = 'user'           -- 需要同步的数据表名
);

2. 创建 Sink

-- Elasticsearch 只能作为数据目的表(Sink)写入
-- 参见 https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#elasticsearch-connector

CREATE TABLE es_sink (
    `user_id` INT,
    `user_name` VARCHAR
) WITH (
    'connector.type' = 'elasticsearch',    -- 输出到 Elasticsearch
    'connector.version' = '6',             -- 指定 Elasticsearch 的版本, 例如 '6', '7'. 
    'connector.hosts' = 'http://10.0.0.175:9200', 
    'connector.index' = 'User',     
    'connector.document-type' = 'user',  
    'connector.username' = 'elastic',  
    'connector.password' = 'yourpassword', 

    'update-mode' = 'upsert',              -- 捕捉数据库变化时,需使用 'upsert' 模式 
    'connector.key-delimiter' = '$',       -- 可选参数, 复合主键的连接字符 (默认是 _ 符号)
    'connector.key-null-literal' = 'n/a',  -- 主键为 null 时的替代字符串,默认是 'null'
    'connector.connection-max-retry-timeout' = '300', -- 每次请求的最大超时时间 (ms)
    'format.type' = 'json'                 -- 输出数据格式, 目前只支持 'json'
);

3. 编写业务 SQL

insert into es_sink
(
    select user_id,
    LOWER(user_name) -- LOWER()函数会将用户名转换为小写
    from user_source
);

4. 选择 Connector

点击【保存】>【发布草稿】运行作业。

请根据实际购买的 Elasticsearch 版本选择对应的 Connector ,1.13 版本之后无需选择可自动匹配 Connector。

5. 数据查询

进入 Elasticsearch 控制台[5],点击之前购买的 Elasticsearch 实例,点击右上角【Kibana】,进入 Kibana 查询数据。具体查询方法请参考通过 Kibana 访问集群[7]。

总结

这套大数据系统用 MySQL 连接器持续集成数据库数据变化记录,经过流计算 Oceanus 实现最基础的数据转换功能,最后 Sink 到Elasticsearch 中。这肉夹馍的钱花的太直了,yyds!

参考阅读

1: MySQL 控制台:https://console.cloud.tencent.com/cdb

2: 创建 mysql 实例:https://cloud.tencent.com/document/product/236/46433

3: 流计算 Oceanus 控制台:https://console.cloud.tencent.com/oceanus/overview

4: 创建 Oceanus 独享集群:https://cloud.tencent.com/document/product/849/48298

5: Elasticsearch 控制台:https://console.cloud.tencent.com/es

6: 创建 Elasticsearch 集群:https://cloud.tencent.com/document/product/845/19536

7: 通过 Kibana 访问集群:https://cloud.tencent.com/document/product/845/19541