Skip to content

【离线数仓】拉链表的3种构建方案

Published: at 10:46

在阐述具体的3个方案之前,我先总结下3个方案的优缺点:

方案一:

优点:思路简洁,易于维护。

缺点:初始化时会丢失历史数据的变化,维护成本较高。


方案二:

优点:思路简洁,易于维护,初始化时不会丢失历史数据的变化。

缺点:初始化周期较长,若历史数据量大,会耗费大量时间成本,同时维护成本较高。


方案三:

优点:不会丢失历史数据变化,且接近免维护。

缺点:资源占用较多,若历史数据量大,会带来较高的计算资源成本。

方案一:不考虑历史变化的情况

如果所有的表都是 分区表,在将 快照表 转换为 拉链表 时,我们可以充分利用分区特性,优化查询性能并提高更新效率。分区表通过将数据按照某个字段进行逻辑分割,减少了每次查询时的数据扫描量,非常适合处理大规模数据。本文中,我们将展示如何基于分区表来完成这个转换过程。

场景描述

假设我们有一张按日期分区的 快照表 snapshot_table,其分区字段是 date,记录着每天的用户信息快照。该表的结构如下:

CREATE TABLE snapshot_table (
    id STRING,
    name STRING,
    email STRING,
    update_time TIMESTAMP
) PARTITIONED BY (date STRING)  -- 按日期分区
STORED AS ORC;

目标:将 snapshot_table 转换为拉链表 dim_user_scd

拉链表 dim_user_scd 将跟踪每个用户数据的历史变化,并且也是分区表,按照 start_date 进行分区,以优化查询和更新性能。拉链表的结构如下:

CREATE TABLE dim_user_scd (
    id STRING,
    name STRING,
    email STRING,
    start_date STRING,
    end_date STRING
) PARTITIONED BY (start_date STRING)  -- 按开始日期分区
STORED AS ORC;

实现步骤

1. 初始化拉链表(首次加载)

在初次创建拉链表时,我们需要将最新的快照数据插入到拉链表中。为了确保插入的数据记录当前有效,设置每条记录的 end_date'9999-12-31'

假设我们的快照表从 2024-09-20 开始,我们可以从该日期的分区中加载数据到拉链表:

-- 假设快照表的日期是2024-09-20
INSERT INTO dim_user_scd PARTITION (start_date='2024-09-20')
SELECT
    id,
    name,
    email,
    '2024-09-20' AS start_date,
    '9999-12-31' AS end_date
FROM snapshot_table
WHERE date = '2024-09-20';

这一步将快照表的最新数据导入拉链表,并将其设置为当前有效。


2. 增量更新拉链表

随着每天快照表的更新,我们需要增量更新拉链表,确保数据的历史变化被记录。为了实现增量更新,我们可以依赖两个步骤:

2.1 更新已存在但发生变化的记录

首先,我们更新拉链表中已经存在但发生变化的记录,将其 end_date 更新为当前日期,表示这些记录不再有效。

WITH snapshot AS (
    SELECT
        id,
        name,
        email
    FROM snapshot_table
    WHERE date = '2024-09-21'  -- 假设这是增量数据
),

scd_active AS (
    SELECT
        id,
        name,
        email,
        start_date,
        end_date
    FROM dim_user_scd
    WHERE end_date = '9999-12-31'  -- 当前有效的记录
)

-- 更新已存在但数据变化的记录
INSERT INTO dim_user_scd PARTITION (start_date)
SELECT
    scd.id,
    scd.name,
    scd.email,
    scd.start_date,
    '2024-09-21' AS end_date  -- 结束日期更新为当前日期
FROM scd_active scd
JOIN snapshot sn
ON scd.id = sn.id
WHERE (scd.name != sn.name OR scd.email != sn.email);

这一步会将当前有效的数据(end_date = '9999-12-31')中发生变化的记录,更新为无效(将 end_date 更新为当前日期)。

2.2 插入新增或变化后的新记录

接下来,插入新的或更新后的记录,这些记录表示最新的用户信息,并且设置新的 start_dateend_date

-- 插入新的或变化后的记录
INSERT INTO dim_user_scd PARTITION (start_date='2024-09-21')
SELECT
    sn.id,
    sn.name,
    sn.email,
    '2024-09-21' AS start_date,   -- 当前日期作为生效日期
    '9999-12-31' AS end_date      -- end_date 设置为未来日期,表示当前有效
FROM snapshot sn
LEFT JOIN scd_active scd
ON sn.id = scd.id
WHERE scd.id IS NULL              -- 新增的记录
   OR (scd.name != sn.name OR scd.email != sn.email);  -- 发生变化的记录

这一步将处理发生变化的记录或新增的记录,将其插入到拉链表中,并设置其 start_date 为当前日期,end_date'9999-12-31'


3. 分区优化

分区表可以极大提高数据查询和更新的效率,因此我们可以考虑使用以下策略来进一步优化:

-- 启用动态分区插入
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;

-- 使用动态分区插入更新后的记录
INSERT INTO dim_user_scd PARTITION (start_date)
SELECT
    sn.id,
    sn.name,
    sn.email,
    '2024-09-21' AS start_date,  -- 根据增量数据的日期动态创建分区
    '9999-12-31' AS end_date
FROM snapshot sn;

通过启用动态分区,Hive 会自动为新的 start_date 创建对应的分区,避免手动维护分区。


4. 示例执行流程

假设快照表 snapshot_table 每天更新一次,并且每天的增量数据会被插入到拉链表中。

4.1 首次加载

快照表 2024-09-20 的数据:

-- 快照表 2024-09-20 的数据
id      | name       | email               | date
---------------------------------------------------
1       | Alice      | alice@gmail.com      | 2024-09-20
2       | Bob        | bob@gmail.com        | 2024-09-20
3       | Carol      | carol@gmail.com      | 2024-09-20

首次加载后,拉链表 dim_user_scd 的数据:

-- 拉链表 dim_user_scd(2024-09-20 初始加载)
id      | name       | email               | start_date   | end_date
---------------------------------------------------------------------
1       | Alice      | alice@gmail.com      | 2024-09-20   | 9999-12-31
2       | Bob        | bob@gmail.com        | 2024-09-20   | 9999-12-31
3       | Carol      | carol@gmail.com      | 2024-09-20   | 9999-12-31

4.2 第二天增量更新

快照表 2024-09-21 的数据:

-- 快照表 2024-09-21 的数据
id      | name       | email               | date
---------------------------------------------------
1       | Alice      | alice_new@gmail.com  | 2024-09-21  -- Alice 的 email 发生变化
2       | Bob        | bob@gmail.com        | 2024-09-21  -- 无变化
4       | David      | david@gmail.com      | 2024-09-21  -- 新增用户

增量更新后,拉链表 dim_user_scd 的数据:

-- 拉链表 dim_user_scd(更新后)
id      | name       | email               | start_date   | end_date
---------------------------------------------------------------------
1       | Alice      | alice@gmail.com      | 2024-09-20   | 2024-09-21  -- 原始记录失效
1       | Alice      | alice_new@gmail.com  | 2024-09-21   | 9999-12-31  -- 新记录生效
2       | Bob        | bob@gmail.com        | 2024-09-20   | 9999-12

这个方案的缺点在于历史数据的变化会丢失,也就是我们在做初始化的时候,拉链表的数据只有一种情况,那就是全部的end_date都是9999-12-31,第二天(2024-09-25)数据更新后,也只会出现end_date = 2024-09-25的情况,如果快照表分区的开始时间是2023-09-25,那么过去一年的历史变化都不会在拉链表中体现;

方案二:初始化时候考虑数据的历史变化

问题描述

基于方案一:在将快照表转换为拉链表时,如果直接从快照表的最新分区加载数据,我们只会捕获当前的数据状态,而忽略过去的变化记录。为了正确构建拉链表,我们需要遍历快照表中的所有分区,按日期顺序依次处理每个分区的数据,记录每一天的变化。

解决方案

我们需要对快照表的所有分区进行遍历处理,将每个分区的数据按时间顺序加载到拉链表中,并在插入新的变化记录时适当地更新历史记录的结束时间。

实现步骤

1. 初始化拉链表

首先,创建一个空的拉链表 dim_user_scd,并设置 start_dateend_date 来记录每条记录的生效和失效日期。

CREATE TABLE IF NOT EXISTS dim_user_scd (
    id STRING,
    name STRING,
    email STRING,
    start_date STRING,
    end_date STRING
) PARTITIONED BY (start_date STRING)  -- 按开始日期分区
STORED AS ORC;

2. 按日期顺序处理快照表分区

为了保留历史变化记录,我们需要按顺序处理快照表的每个分区,并将变化记录插入拉链表中。首先,获取快照表的所有分区,然后依次处理每个分区的数据。

-- 获取快照表的所有分区
SHOW PARTITIONS snapshot_table;

假设返回的分区是:

date=2024-09-20
date=2024-09-21
date=2024-09-22

我们需要按日期顺序处理每个分区的记录。

3. 处理每个分区的数据

对于每个分区,我们需要进行以下几步操作:

3.1 更新已有记录的结束时间

对于每个分区,首先查找那些在拉链表中已经存在但数据发生变化的记录,并将其 end_date 更新为当前分区的日期,表示这些记录不再有效。

WITH snapshot AS (
    SELECT
        id,
        name,
        email
    FROM snapshot_table
    WHERE date = '2024-09-21'  -- 当前处理的分区
),

scd_active AS (
    SELECT
        id,
        name,
        email,
        start_date,
        end_date
    FROM dim_user_scd
    WHERE end_date = '9999-12-31'  -- 查找当前有效的记录
)

-- 更新拉链表中已经存在但数据发生变化的记录
INSERT INTO dim_user_scd PARTITION (start_date)
SELECT
    scd.id,
    scd.name,
    scd.email,
    scd.start_date,
    '2024-09-21' AS end_date  -- 当前分区的日期为结束日期
FROM scd_active scd
JOIN snapshot sn
ON scd.id = sn.id
WHERE (scd.name != sn.name OR scd.email != sn.email);
3.2 插入新的或变化的记录

接下来,我们插入那些在当前分区内新增的或发生变化的记录,将其 start_date 设置为当前分区的日期,表示这些记录自该日期开始生效。

-- 插入新的或变化的记录
INSERT INTO dim_user_scd PARTITION (start_date='2024-09-21')
SELECT
    sn.id,
    sn.name,
    sn.email,
    '2024-09-21' AS start_date,   -- 当前分区的日期为生效日期
    '9999-12-31' AS end_date      -- 设置 end_date 为未来日期,表示当前有效
FROM snapshot sn
LEFT JOIN scd_active scd
ON sn.id = scd.id
WHERE scd.id IS NULL              -- 新增的记录
   OR (scd.name != sn.name OR scd.email != sn.email);  -- 发生变化的记录

4. 动态处理所有分区

为了自动化地处理每个分区,可以通过编写一个 Hive 脚本,依次处理每个日期的分区。伪代码如下:

for partition in $(hive -e "SHOW PARTITIONS snapshot_table;")
do
    # 提取日期
    date=$(echo $partition | cut -d'=' -f2)

    # 执行更新和插入操作
    hive -e "
    -- 更新历史记录的 end_date
    WITH snapshot AS (
        SELECT id, name, email FROM snapshot_table WHERE date = '$date'
    ),
    scd_active AS (
        SELECT id, name, email, start_date, end_date FROM dim_user_scd WHERE end_date = '9999-12-31'
    )
    INSERT INTO dim_user_scd PARTITION (start_date)
    SELECT scd.id, scd.name, scd.email, scd.start_date, '$date' AS end_date
    FROM scd_active scd
    JOIN snapshot sn ON scd.id = sn.id
    WHERE (scd.name != sn.name OR scd.email != sn.email);

    -- 插入新的或变化的记录
    INSERT INTO dim_user_scd PARTITION (start_date='$date')
    SELECT sn.id, sn.name, sn.email, '$date' AS start_date, '9999-12-31' AS end_date
    FROM snapshot sn
    LEFT JOIN scd_active scd ON sn.id = scd.id
    WHERE scd.id IS NULL OR (scd.name != sn.name OR scd.email != sn.email);
    "
done

通过此循环脚本,依次处理每个分区,并确保每一天的数据变化都会被记录到拉链表中。


5. 总结

通过上述改进后的方案,我们能够完整保留快照表从开始日期到当前日期的所有数据变化,并正确地将其转换为拉链表。具体步骤如下:

  1. 按日期顺序处理快照表的所有分区:确保每个分区的数据都会被处理,不会遗漏历史变化。
  2. 更新已有记录的结束时间:对于那些已经存在的记录,如果数据发生了变化,将其 end_date 设置为变化发生的日期。
  3. 插入新的或变化的记录:对于新增或更新后的记录,将其插入拉链表,并标记为当前有效。

这样,拉链表将保留所有数据变化的历史,并能够通过分区表优化查询性能。

但这么做也有缺点:1、初始化会耗费很长时间,因为在遍历的过程中,整个执行过程是单进程的,无法并行处理,所以会耗费相当的时间;2、在实际应用中,如果出现过去某个分区数据异常,或者快照表中某个分区异常,那么初始化动作将会重做,维护成本相当高;

方案三:保留历史数据,数据趋于免维护

这个方案是我在过去的某个项目中自创的,我认为是目前最好的方案,思路是在尽可能节约执行时间的前提下,每天初始化一遍,抛砖引玉,希望提供给大家参考。

该方案的目标是通过每日调度快照表数据,构建一张能够跟踪数据变化的拉链表,记录每条数据的有效时间段(start_dateend_date)。此方案采用全分区扫描,使用窗口函数对比相邻分区数据,避免存储重复数据,并为每条记录生成准确的生效和失效时间。

表结构

  1. 快照表 (snapshot_table)

    • name:用户名称
    • email:用户电子邮件
    • date:日期(分区字段)
  2. 拉链表 (dim_user_scd)

    • name:用户名称
    • email:用户电子邮件
    • start_date:数据生效日期
    • end_date:数据失效日期

方案流程

步骤 1:获取快照表所有分区数据

每天从快照表获取所有按 date 分区的数据,通常通过调度系统触发。

-- 获取快照表的所有分区
SHOW PARTITIONS snapshot_table;

步骤 2:对比相邻分区数据,剔除重复的记录

  1. 窗口函数 LAGMAX 的使用

    • 对每个 name 的数据按 date 升序排列。
    • 使用 LAG(email) 来获取上一行的 email 以对比变化。
    • 使用 MAX(date) 获取同一 name 的最大 date
  2. 剔除无变化的数据

    • 如果 email 没有发生变化,则不再保留这条记录。
    • 保留有变化的数据,包括上一行的 emaildate 作为 prev_emailprev_date
WITH ordered_data AS (
    SELECT 
        name,
        email,
        date, 
        LAG(email) OVER (PARTITION BY name ORDER BY date) AS prev_email,  -- 获取上一行的 email
        LAG(date) OVER (PARTITION BY name ORDER BY date) AS prev_date,    -- 获取上一行的 date
        MAX(date) OVER (PARTITION BY name ORDER BY date) AS max_date      -- 获取同一 name 的最大 date
    FROM snapshot_table
),

filtered_data AS (
    SELECT 
        name,
        email,
        date,
        prev_email,
        prev_date,
        max_date
    FROM ordered_data
    WHERE prev_email IS NULL OR email != prev_email  -- 如果 email 不相同,保留该行数据
);

解释


步骤 3:计算 start_dateend_date

  1. 窗口函数 LEAD(prev_date) 生成 next_date
    • 使用 LEAD(prev_date) 获取下一行的 prev_date 作为当前行的 end_date
    • 如果没有下一行数据,则通过 max_date 确定 end_date
      • 如果 max_date 是当前日期,end_date 设为 '9999-12-31' 表示当前数据仍然有效。
      • 否则,将 end_date 设为 max_date,表示历史数据的失效时间。
WITH final_data AS (
    SELECT 
        name,
        email,
        date AS start_date,
        LEAD(prev_date) OVER (PARTITION BY name ORDER BY date) AS next_date,  -- 获取下一行的 prev_date 作为 end_date
        max_date
    FROM filtered_data
),

scd_data AS (
    SELECT
        name,
        email,
        start_date,
        CASE 
            WHEN next_date IS NULL THEN  -- 如果没有下一行数据
                CASE 
                    WHEN max_date = current_date() THEN '9999-12-31'  -- 如果 max_date 是今天,赋值 '9999-12-31'
                    ELSE max_date  -- 否则赋值为 max_date
                END
            ELSE next_date  -- 有下一行时,赋值为 next_date
        END AS end_date
    FROM final_data
);

解释


步骤 4:将结果插入到拉链表

将经过处理的记录插入到拉链表 dim_user_scd 中,记录每条数据的 start_dateend_date

-- 插入结果到拉链表
INSERT INTO dim_user_scd
SELECT 
    name,
    email,
    start_date,
    end_date
FROM scd_data;

解释


完整 SQL 实现

WITH ordered_data AS (
    SELECT 
        name,
        email,
        date, 
        LAG(email) OVER (PARTITION BY name ORDER BY date) AS prev_email,  -- 获取上一行的 email
        LAG(date) OVER (PARTITION BY name ORDER BY date) AS prev_date,    -- 获取上一行的 date
        MAX(date) OVER (PARTITION BY name ORDER BY date) AS max_date      -- 获取同一 name 的最大 date
    FROM snapshot_table
),

filtered_data AS (
    SELECT 
        name,
        email,
        date,
        prev_email,
        prev_date,
        max_date
    FROM ordered_data
    WHERE prev_email IS NULL OR email != prev_email  -- 如果 email 不相同,保留该行数据
),

final_data AS (
    SELECT 
        name,
        email,
        date AS start_date,
        LEAD(prev_date) OVER (PARTITION BY name ORDER BY date) AS next_date,  -- 获取下一行的 prev_date 作为 end_date
        max_date
    FROM filtered_data
),

scd_data AS (
    SELECT
        name,
        email,
        start_date,
        CASE 
            WHEN next_date IS NULL THEN  -- 如果没有下一行数据
                CASE 
                    WHEN max_date = current_date() THEN '9999-12-31'  -- 如果 max_date 是今天,赋值 '9999-12-31'
                    ELSE max_date  -- 否则赋值为 max_date
                END
            ELSE next_date  -- 有下一行时,赋值为 next_date
        END AS end_date
    FROM final_data
)

-- 插入结果到拉链表
INSERT INTO dim_user_scd
SELECT 
    name,
    email,
    start_date,
    end_date
FROM scd_data;

方案优点

  1. 历史数据保留:该方案保证了拉链表中的每条记录都能准确地保留历史数据变化,避免了初始化时丢失数据的问题。
  2. 动态调整有效时间:通过 LEAD(prev_date)max_date 的结合,拉链表能够动态生成每条记录的 start_dateend_date,适应了不同记录的生命周期管理。
  3. 高效处理:利用窗口函数和有条件的过滤逻辑,能够快速剔除无效的重复数据,避免冗余计算和存储。

总结

这个拉链表的构建方案结合了窗口函数的强大功能,通过多次数据处理和时间逻辑的判断,确保每条记录的生效和失效时间清晰,能够准确跟踪数据变化。通过这种方法,可以有效实现历史数据的管理,并且减少了冗余数据的存储和维护工作。

好了,各位,以上是我的分享,大家根据自己的情况,希望对你们有用!