在阐述具体的3个方案之前,我先总结下3个方案的优缺点:
方案一:
优点:思路简洁,易于维护。
缺点:初始化时会丢失历史数据的变化,维护成本较高。
方案二:
优点:思路简洁,易于维护,初始化时不会丢失历史数据的变化。
缺点:初始化周期较长,若历史数据量大,会耗费大量时间成本,同时维护成本较高。
方案三:
优点:不会丢失历史数据变化,且接近免维护。
缺点:资源占用较多,若历史数据量大,会带来较高的计算资源成本。
方案一:不考虑历史变化的情况
如果所有的表都是 分区表,在将 快照表 转换为 拉链表 时,我们可以充分利用分区特性,优化查询性能并提高更新效率。分区表通过将数据按照某个字段进行逻辑分割,减少了每次查询时的数据扫描量,非常适合处理大规模数据。本文中,我们将展示如何基于分区表来完成这个转换过程。
场景描述
假设我们有一张按日期分区的 快照表 snapshot_table
,其分区字段是 date
,记录着每天的用户信息快照。该表的结构如下:
CREATE TABLE snapshot_table (
id STRING,
name STRING,
email STRING,
update_time TIMESTAMP
) PARTITIONED BY (date STRING) -- 按日期分区
STORED AS ORC;
目标:将 snapshot_table
转换为拉链表 dim_user_scd
拉链表 dim_user_scd
将跟踪每个用户数据的历史变化,并且也是分区表,按照 start_date
进行分区,以优化查询和更新性能。拉链表的结构如下:
CREATE TABLE dim_user_scd (
id STRING,
name STRING,
email STRING,
start_date STRING,
end_date STRING
) PARTITIONED BY (start_date STRING) -- 按开始日期分区
STORED AS ORC;
实现步骤
1. 初始化拉链表(首次加载)
在初次创建拉链表时,我们需要将最新的快照数据插入到拉链表中。为了确保插入的数据记录当前有效,设置每条记录的 end_date
为 '9999-12-31'
。
假设我们的快照表从 2024-09-20 开始,我们可以从该日期的分区中加载数据到拉链表:
-- 假设快照表的日期是2024-09-20
INSERT INTO dim_user_scd PARTITION (start_date='2024-09-20')
SELECT
id,
name,
email,
'2024-09-20' AS start_date,
'9999-12-31' AS end_date
FROM snapshot_table
WHERE date = '2024-09-20';
这一步将快照表的最新数据导入拉链表,并将其设置为当前有效。
2. 增量更新拉链表
随着每天快照表的更新,我们需要增量更新拉链表,确保数据的历史变化被记录。为了实现增量更新,我们可以依赖两个步骤:
- 更新历史记录的结束日期:对于那些发生变化的记录,需要将之前有效的记录标记为失效(更新
end_date
)。 - 插入新的记录:对于新增或变化的记录,需要插入新的记录,表示该记录的最新状态。
2.1 更新已存在但发生变化的记录
首先,我们更新拉链表中已经存在但发生变化的记录,将其 end_date
更新为当前日期,表示这些记录不再有效。
WITH snapshot AS (
SELECT
id,
name,
email
FROM snapshot_table
WHERE date = '2024-09-21' -- 假设这是增量数据
),
scd_active AS (
SELECT
id,
name,
email,
start_date,
end_date
FROM dim_user_scd
WHERE end_date = '9999-12-31' -- 当前有效的记录
)
-- 更新已存在但数据变化的记录
INSERT INTO dim_user_scd PARTITION (start_date)
SELECT
scd.id,
scd.name,
scd.email,
scd.start_date,
'2024-09-21' AS end_date -- 结束日期更新为当前日期
FROM scd_active scd
JOIN snapshot sn
ON scd.id = sn.id
WHERE (scd.name != sn.name OR scd.email != sn.email);
这一步会将当前有效的数据(end_date = '9999-12-31'
)中发生变化的记录,更新为无效(将 end_date
更新为当前日期)。
2.2 插入新增或变化后的新记录
接下来,插入新的或更新后的记录,这些记录表示最新的用户信息,并且设置新的 start_date
和 end_date
。
-- 插入新的或变化后的记录
INSERT INTO dim_user_scd PARTITION (start_date='2024-09-21')
SELECT
sn.id,
sn.name,
sn.email,
'2024-09-21' AS start_date, -- 当前日期作为生效日期
'9999-12-31' AS end_date -- end_date 设置为未来日期,表示当前有效
FROM snapshot sn
LEFT JOIN scd_active scd
ON sn.id = scd.id
WHERE scd.id IS NULL -- 新增的记录
OR (scd.name != sn.name OR scd.email != sn.email); -- 发生变化的记录
这一步将处理发生变化的记录或新增的记录,将其插入到拉链表中,并设置其 start_date
为当前日期,end_date
为 '9999-12-31'
。
3. 分区优化
分区表可以极大提高数据查询和更新的效率,因此我们可以考虑使用以下策略来进一步优化:
- 分区修剪:由于拉链表是按
start_date
分区的,Hive 在查询或更新时会自动使用分区修剪技术,减少扫描的数据量。只要查询时指定分区条件,Hive 只会扫描相关分区的数据。 - 动态分区插入:Hive 支持动态分区插入,可以在插入新记录时根据
start_date
自动创建分区,这样可以避免手动创建分区的麻烦。
-- 启用动态分区插入
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
-- 使用动态分区插入更新后的记录
INSERT INTO dim_user_scd PARTITION (start_date)
SELECT
sn.id,
sn.name,
sn.email,
'2024-09-21' AS start_date, -- 根据增量数据的日期动态创建分区
'9999-12-31' AS end_date
FROM snapshot sn;
通过启用动态分区,Hive 会自动为新的 start_date
创建对应的分区,避免手动维护分区。
4. 示例执行流程
假设快照表 snapshot_table
每天更新一次,并且每天的增量数据会被插入到拉链表中。
4.1 首次加载
快照表 2024-09-20
的数据:
-- 快照表 2024-09-20 的数据
id | name | email | date
---------------------------------------------------
1 | Alice | alice@gmail.com | 2024-09-20
2 | Bob | bob@gmail.com | 2024-09-20
3 | Carol | carol@gmail.com | 2024-09-20
首次加载后,拉链表 dim_user_scd
的数据:
-- 拉链表 dim_user_scd(2024-09-20 初始加载)
id | name | email | start_date | end_date
---------------------------------------------------------------------
1 | Alice | alice@gmail.com | 2024-09-20 | 9999-12-31
2 | Bob | bob@gmail.com | 2024-09-20 | 9999-12-31
3 | Carol | carol@gmail.com | 2024-09-20 | 9999-12-31
4.2 第二天增量更新
快照表 2024-09-21
的数据:
-- 快照表 2024-09-21 的数据
id | name | email | date
---------------------------------------------------
1 | Alice | alice_new@gmail.com | 2024-09-21 -- Alice 的 email 发生变化
2 | Bob | bob@gmail.com | 2024-09-21 -- 无变化
4 | David | david@gmail.com | 2024-09-21 -- 新增用户
增量更新后,拉链表 dim_user_scd
的数据:
-- 拉链表 dim_user_scd(更新后)
id | name | email | start_date | end_date
---------------------------------------------------------------------
1 | Alice | alice@gmail.com | 2024-09-20 | 2024-09-21 -- 原始记录失效
1 | Alice | alice_new@gmail.com | 2024-09-21 | 9999-12-31 -- 新记录生效
2 | Bob | bob@gmail.com | 2024-09-20 | 9999-12
这个方案的缺点在于历史数据的变化会丢失,也就是我们在做初始化的时候,拉链表的数据只有一种情况,那就是全部的end_date
都是9999-12-31
,第二天(2024-09-25)数据更新后,也只会出现end_date
= 2024-09-25
的情况,如果快照表分区的开始时间是2023-09-25
,那么过去一年的历史变化都不会在拉链表中体现;
方案二:初始化时候考虑数据的历史变化
问题描述
基于方案一:在将快照表转换为拉链表时,如果直接从快照表的最新分区加载数据,我们只会捕获当前的数据状态,而忽略过去的变化记录。为了正确构建拉链表,我们需要遍历快照表中的所有分区,按日期顺序依次处理每个分区的数据,记录每一天的变化。
解决方案
我们需要对快照表的所有分区进行遍历处理,将每个分区的数据按时间顺序加载到拉链表中,并在插入新的变化记录时适当地更新历史记录的结束时间。
实现步骤
1. 初始化拉链表
首先,创建一个空的拉链表 dim_user_scd
,并设置 start_date
和 end_date
来记录每条记录的生效和失效日期。
CREATE TABLE IF NOT EXISTS dim_user_scd (
id STRING,
name STRING,
email STRING,
start_date STRING,
end_date STRING
) PARTITIONED BY (start_date STRING) -- 按开始日期分区
STORED AS ORC;
2. 按日期顺序处理快照表分区
为了保留历史变化记录,我们需要按顺序处理快照表的每个分区,并将变化记录插入拉链表中。首先,获取快照表的所有分区,然后依次处理每个分区的数据。
-- 获取快照表的所有分区
SHOW PARTITIONS snapshot_table;
假设返回的分区是:
date=2024-09-20
date=2024-09-21
date=2024-09-22
我们需要按日期顺序处理每个分区的记录。
3. 处理每个分区的数据
对于每个分区,我们需要进行以下几步操作:
- 找到该分区的数据:从快照表中获取该日期的快照数据。
- 更新拉链表中已有记录的结束时间:对于那些数据发生变化的记录,将其标记为失效。
- 插入新的或变化的记录:对于新增或变化的记录,将其插入拉链表,并标记为当前有效。
3.1 更新已有记录的结束时间
对于每个分区,首先查找那些在拉链表中已经存在但数据发生变化的记录,并将其 end_date
更新为当前分区的日期,表示这些记录不再有效。
WITH snapshot AS (
SELECT
id,
name,
email
FROM snapshot_table
WHERE date = '2024-09-21' -- 当前处理的分区
),
scd_active AS (
SELECT
id,
name,
email,
start_date,
end_date
FROM dim_user_scd
WHERE end_date = '9999-12-31' -- 查找当前有效的记录
)
-- 更新拉链表中已经存在但数据发生变化的记录
INSERT INTO dim_user_scd PARTITION (start_date)
SELECT
scd.id,
scd.name,
scd.email,
scd.start_date,
'2024-09-21' AS end_date -- 当前分区的日期为结束日期
FROM scd_active scd
JOIN snapshot sn
ON scd.id = sn.id
WHERE (scd.name != sn.name OR scd.email != sn.email);
3.2 插入新的或变化的记录
接下来,我们插入那些在当前分区内新增的或发生变化的记录,将其 start_date
设置为当前分区的日期,表示这些记录自该日期开始生效。
-- 插入新的或变化的记录
INSERT INTO dim_user_scd PARTITION (start_date='2024-09-21')
SELECT
sn.id,
sn.name,
sn.email,
'2024-09-21' AS start_date, -- 当前分区的日期为生效日期
'9999-12-31' AS end_date -- 设置 end_date 为未来日期,表示当前有效
FROM snapshot sn
LEFT JOIN scd_active scd
ON sn.id = scd.id
WHERE scd.id IS NULL -- 新增的记录
OR (scd.name != sn.name OR scd.email != sn.email); -- 发生变化的记录
4. 动态处理所有分区
为了自动化地处理每个分区,可以通过编写一个 Hive 脚本,依次处理每个日期的分区。伪代码如下:
for partition in $(hive -e "SHOW PARTITIONS snapshot_table;")
do
# 提取日期
date=$(echo $partition | cut -d'=' -f2)
# 执行更新和插入操作
hive -e "
-- 更新历史记录的 end_date
WITH snapshot AS (
SELECT id, name, email FROM snapshot_table WHERE date = '$date'
),
scd_active AS (
SELECT id, name, email, start_date, end_date FROM dim_user_scd WHERE end_date = '9999-12-31'
)
INSERT INTO dim_user_scd PARTITION (start_date)
SELECT scd.id, scd.name, scd.email, scd.start_date, '$date' AS end_date
FROM scd_active scd
JOIN snapshot sn ON scd.id = sn.id
WHERE (scd.name != sn.name OR scd.email != sn.email);
-- 插入新的或变化的记录
INSERT INTO dim_user_scd PARTITION (start_date='$date')
SELECT sn.id, sn.name, sn.email, '$date' AS start_date, '9999-12-31' AS end_date
FROM snapshot sn
LEFT JOIN scd_active scd ON sn.id = scd.id
WHERE scd.id IS NULL OR (scd.name != sn.name OR scd.email != sn.email);
"
done
通过此循环脚本,依次处理每个分区,并确保每一天的数据变化都会被记录到拉链表中。
5. 总结
通过上述改进后的方案,我们能够完整保留快照表从开始日期到当前日期的所有数据变化,并正确地将其转换为拉链表。具体步骤如下:
- 按日期顺序处理快照表的所有分区:确保每个分区的数据都会被处理,不会遗漏历史变化。
- 更新已有记录的结束时间:对于那些已经存在的记录,如果数据发生了变化,将其
end_date
设置为变化发生的日期。 - 插入新的或变化的记录:对于新增或更新后的记录,将其插入拉链表,并标记为当前有效。
这样,拉链表将保留所有数据变化的历史,并能够通过分区表优化查询性能。
但这么做也有缺点:1、初始化会耗费很长时间,因为在遍历的过程中,整个执行过程是单进程的,无法并行处理,所以会耗费相当的时间;2、在实际应用中,如果出现过去某个分区数据异常,或者快照表中某个分区异常,那么初始化动作将会重做,维护成本相当高;
方案三:保留历史数据,数据趋于免维护
这个方案是我在过去的某个项目中自创的,我认为是目前最好的方案,思路是在尽可能节约执行时间的前提下,每天初始化一遍,抛砖引玉,希望提供给大家参考。
该方案的目标是通过每日调度快照表数据,构建一张能够跟踪数据变化的拉链表,记录每条数据的有效时间段(start_date
到 end_date
)。此方案采用全分区扫描,使用窗口函数对比相邻分区数据,避免存储重复数据,并为每条记录生成准确的生效和失效时间。
表结构
-
快照表 (
snapshot_table
):name
:用户名称email
:用户电子邮件date
:日期(分区字段)
-
拉链表 (
dim_user_scd
):name
:用户名称email
:用户电子邮件start_date
:数据生效日期end_date
:数据失效日期
方案流程
步骤 1:获取快照表所有分区数据
每天从快照表获取所有按 date
分区的数据,通常通过调度系统触发。
-- 获取快照表的所有分区
SHOW PARTITIONS snapshot_table;
步骤 2:对比相邻分区数据,剔除重复的记录
-
窗口函数
LAG
和MAX
的使用:- 对每个
name
的数据按date
升序排列。 - 使用
LAG(email)
来获取上一行的email
以对比变化。 - 使用
MAX(date)
获取同一name
的最大date
。
- 对每个
-
剔除无变化的数据:
- 如果
email
没有发生变化,则不再保留这条记录。 - 保留有变化的数据,包括上一行的
email
和date
作为prev_email
和prev_date
。
- 如果
WITH ordered_data AS (
SELECT
name,
email,
date,
LAG(email) OVER (PARTITION BY name ORDER BY date) AS prev_email, -- 获取上一行的 email
LAG(date) OVER (PARTITION BY name ORDER BY date) AS prev_date, -- 获取上一行的 date
MAX(date) OVER (PARTITION BY name ORDER BY date) AS max_date -- 获取同一 name 的最大 date
FROM snapshot_table
),
filtered_data AS (
SELECT
name,
email,
date,
prev_email,
prev_date,
max_date
FROM ordered_data
WHERE prev_email IS NULL OR email != prev_email -- 如果 email 不相同,保留该行数据
);
解释:
LAG(email)
:获取上一行email
,用于当前行的比较。prev_email
和prev_date
:分别存储上一行的email
和date
,用于后续对比和计算。
步骤 3:计算 start_date
和 end_date
- 窗口函数
LEAD(prev_date)
生成next_date
:- 使用
LEAD(prev_date)
获取下一行的prev_date
作为当前行的end_date
。 - 如果没有下一行数据,则通过
max_date
确定end_date
:- 如果
max_date
是当前日期,end_date
设为'9999-12-31'
表示当前数据仍然有效。 - 否则,将
end_date
设为max_date
,表示历史数据的失效时间。
- 如果
- 使用
WITH final_data AS (
SELECT
name,
email,
date AS start_date,
LEAD(prev_date) OVER (PARTITION BY name ORDER BY date) AS next_date, -- 获取下一行的 prev_date 作为 end_date
max_date
FROM filtered_data
),
scd_data AS (
SELECT
name,
email,
start_date,
CASE
WHEN next_date IS NULL THEN -- 如果没有下一行数据
CASE
WHEN max_date = current_date() THEN '9999-12-31' -- 如果 max_date 是今天,赋值 '9999-12-31'
ELSE max_date -- 否则赋值为 max_date
END
ELSE next_date -- 有下一行时,赋值为 next_date
END AS end_date
FROM final_data
);
解释:
LEAD(prev_date)
:获取下一行的prev_date
作为当前行的end_date
,确保上下记录的时间段对接。CASE
语句:处理next_date
为空的情况,判断max_date
是否是当前日期来动态设定end_date
。
步骤 4:将结果插入到拉链表
将经过处理的记录插入到拉链表 dim_user_scd
中,记录每条数据的 start_date
和 end_date
。
-- 插入结果到拉链表
INSERT INTO dim_user_scd
SELECT
name,
email,
start_date,
end_date
FROM scd_data;
解释:
- 将
scd_data
中生成的有效时间段插入到拉链表中,确保每条记录的生效和失效时间清晰明了。
完整 SQL 实现
WITH ordered_data AS (
SELECT
name,
email,
date,
LAG(email) OVER (PARTITION BY name ORDER BY date) AS prev_email, -- 获取上一行的 email
LAG(date) OVER (PARTITION BY name ORDER BY date) AS prev_date, -- 获取上一行的 date
MAX(date) OVER (PARTITION BY name ORDER BY date) AS max_date -- 获取同一 name 的最大 date
FROM snapshot_table
),
filtered_data AS (
SELECT
name,
email,
date,
prev_email,
prev_date,
max_date
FROM ordered_data
WHERE prev_email IS NULL OR email != prev_email -- 如果 email 不相同,保留该行数据
),
final_data AS (
SELECT
name,
email,
date AS start_date,
LEAD(prev_date) OVER (PARTITION BY name ORDER BY date) AS next_date, -- 获取下一行的 prev_date 作为 end_date
max_date
FROM filtered_data
),
scd_data AS (
SELECT
name,
email,
start_date,
CASE
WHEN next_date IS NULL THEN -- 如果没有下一行数据
CASE
WHEN max_date = current_date() THEN '9999-12-31' -- 如果 max_date 是今天,赋值 '9999-12-31'
ELSE max_date -- 否则赋值为 max_date
END
ELSE next_date -- 有下一行时,赋值为 next_date
END AS end_date
FROM final_data
)
-- 插入结果到拉链表
INSERT INTO dim_user_scd
SELECT
name,
email,
start_date,
end_date
FROM scd_data;
方案优点
- 历史数据保留:该方案保证了拉链表中的每条记录都能准确地保留历史数据变化,避免了初始化时丢失数据的问题。
- 动态调整有效时间:通过
LEAD(prev_date)
和max_date
的结合,拉链表能够动态生成每条记录的start_date
和end_date
,适应了不同记录的生命周期管理。 - 高效处理:利用窗口函数和有条件的过滤逻辑,能够快速剔除无效的重复数据,避免冗余计算和存储。
总结
这个拉链表的构建方案结合了窗口函数的强大功能,通过多次数据处理和时间逻辑的判断,确保每条记录的生效和失效时间清晰,能够准确跟踪数据变化。通过这种方法,可以有效实现历史数据的管理,并且减少了冗余数据的存储和维护工作。