本文探讨了如何合并来自不同聚类算法、但作用于同一数据集的聚类结果。当不同算法的集群通过共享相同数据项而存在重叠时,需要将这些重叠集群进行传递性合并。文章将阐述此问题本质上是图论中的连通分量发现,并提供基于sql和python/PySpark的解决方案,重点讲解其逻辑、实现步骤及注意事项,以生成统一的最终聚类结果。
1. 理解多算法聚类合并问题
在数据分析实践中,我们常会使用多种聚类算法对同一批数据项进行分析,每种算法可能基于不同的特征或参数生成各自的聚类结果。核心挑战在于,如何将这些独立的聚类结果进行整合,特别是当不同算法产生的集群之间存在重叠(即共享至少一个数据项)时,需要将这些相互关联的集群合并成一个更大的、统一的集群。这种合并必须是传递性的:如果集群A与B重叠,B与C重叠,那么A、B、C都应被合并到同一个最终集群中。
为了确保最终合并的集群有一个确定的标识,我们通常会选择一个确定性规则来生成新的集群键,例如,将合并后所有数据项中的最大ID作为新集群的键。
示例数据与预期输出:
假设我们有以下8个数据项(ID从1到8):
id |
---|
1 |
2 |
… |
8 |
以下是两个不同聚类算法的输出:
聚类结果 1
cluster_key | id |
---|---|
3 | 1 |
3 | 2 |
3 | 3 |
6 | 4 |
6 | 5 |
6 | 6 |
8 | 7 |
8 | 8 |
聚类结果 2
cluster_key | id |
---|---|
1 | 1 |
2 | 2 |
5 | 3 |
5 | 4 |
5 | 5 |
6 | 6 |
8 | 7 |
8 | 8 |
根据“任何共享一个ID的集群都应合并,且合并具有传递性”的规则,我们期望的输出如下:
预期合并输出
cluster_key | id |
---|---|
6 | 1 |
6 | 2 |
6 | 3 |
6 | 4 |
6 | 5 |
6 | 6 |
8 | 7 |
8 | 8 |
逻辑解析:
- 在聚类结果1中,集群{1,2,3}的键是3,集群{4,5,6}的键是6。
- 在聚类结果2中,集群{1}的键是1,集群{2}的键是2,集群{3,4,5}的键是5,集群{6}的键是6。
- 数据项1:在结果1中属于集群3,在结果2中属于集群1。因此,原始集群3和集群1需要合并。
- 数据项2:在结果1中属于集群3,在结果2中属于集群2。因此,原始集群3和集群2需要合并。
- 数据项3:在结果1中属于集群3,在结果2中属于集群5。因此,原始集群3和集群5需要合并。
- 由于合并的传递性,原始集群1、2、3、5、6(来自两个聚类结果)都通过数据项1、2、3、4、5、6关联起来,最终形成一个大集群。这个大集群包含的数据项是{1,2,3,4,5,6},其最大ID为6,故新集群键为6。
- 数据项7和8:在两个聚类结果中都属于键为8的集群,它们是独立的,因此保持不变,新集群键为8。
2. 理论基础:连通分量与传递闭包
上述问题在图论中被称为连通分量(Connected Components)问题,或者更精确地说,是计算特定关系下的传递闭包(Transitive Closure)。
我们可以将每个原始聚类结果中的一个集群视为图中的一个节点。如果两个集群(无论它们来自哪个聚类算法)共享至少一个数据项,那么它们之间就存在一条边。我们的目标是找出这个图中的所有连通分量,即所有相互连接的节点集合。
这种方法可以被描述为基于“非空重叠”的“单链接聚类(Single Link Clustering)”的一种变体,但其应用对象是已有的集群而非原始数据点。它与通常意义上的“共识聚类(Consensus Clustering)”有所不同,共识聚类旨在寻找一个折衷或改进的聚类结果,而这里我们寻求的是通过重叠关系进行完全合并。
3. SQL实现策略
尽管SQL在处理图的传递闭包(尤其是连通分量)方面存在固有限制,特别是对于大型或深度图,递归CTE(Common table Expression)可能面临性能和深度限制。但我们可以利用SQL来完成数据准备和识别直接重叠关系,然后结合外部工具来处理复杂的连通分量计算。
3.1 数据准备与标准化
首先,我们需要将所有聚类结果统一到一个表中,并为每个原始集群分配一个唯一的标识符,以便后续处理。
-- 创建示例数据表 CREATE TABLE Clustering1 (cluster_key INT, id INT); INSERT INTO Clustering1 VALUES (3,1), (3,2), (3,3), (6,4), (6,5), (6,6), (8,7), (8,8); CREATE TABLE Clustering2 (cluster_key INT, id INT); INSERT INTO Clustering2 VALUES (1,1), (2,2), (5,3), (5,4), (5,5), (6,6), (8,7), (8,8); -- 步骤1:标准化集群数据 -- 为每个原始集群分配一个唯一的 original_cluster_id -- 这里我们假设 original_cluster_id 可以通过 (source_table, cluster_key) 唯一标识 -- 或者更简单地,直接生成一个序列号 WITH AllClustersRaw AS ( SELECT 'C1' AS source, cluster_key, id FROM Clustering1 union ALL SELECT 'C2' AS source, cluster_key, id FROM Clustering2 ), ClusteredItems AS ( SELECT ROW_NUMBER() OVER (ORDER BY source, cluster_key) AS original_cluster_id, source, cluster_key, id FROM ( SELECT DISTINCT source, cluster_key, id FROM AllClustersRaw ) AS distinct_clusters_and_items ) SELECT * FROM ClusteredItems;
ClusteredItems 表的示例输出(original_cluster_id 可能因具体数据库和数据而异):
original_cluster_id | source | cluster_key | id |
---|---|---|---|
1 | C1 | 3 | 1 |
1 | C1 | 3 | 2 |
1 | C1 | 3 | 3 |
2 | C1 | 6 | 4 |
… | … | … | … |
5 | C2 | 1 | 1 |
6 | C2 | 2 | 2 |
… | … | … | … |
3.2 识别直接重叠关系
接下来,我们需要找出哪些original_cluster_id之间存在直接重叠(即共享至少一个id)。这将为我们构建图的边列表。
-- 步骤2:识别直接连接的集群对 WITH AllClustersRaw AS ( SELECT 'C1' AS source, cluster_key, id FROM Clustering1 UNION ALL SELECT 'C2' AS source, cluster_key, id FROM Clustering2 ), ClusteredItems AS ( SELECT DENSE_RANK() OVER (ORDER BY source, cluster_key) AS original_cluster_id, source, cluster_key, id FROM ( SELECT DISTINCT source, cluster_key, id FROM AllClustersRaw ) AS distinct_clusters_and_items ) SELECT DISTINCT c1.original_cluster_id AS cluster_a, c2.original_cluster_id AS cluster_b FROM ClusteredItems c1 JOIN ClusteredItems c2 ON c1.id = c2.id AND c1.original_cluster_id < c2.original_cluster_id;
输出示例(代表了图的边):
cluster_a | cluster_b |
---|---|
1 | 5 |
1 | 6 |
1 | 7 |
2 | 7 |
2 | 8 |
3 | 9 |
4 | 9 |
(注意:这里的original_cluster_id是基于DENSE_RANK生成的,与前面的ROW_NUMBER可能不同,但原理一致。例如,如果C1-3的original_cluster_id是1,C2-1是5,那么1,5就是一条边。)
3.3 计算连通分量(SQL挑战与外部工具推荐)
在标准SQL中,直接计算任意图的连通分量(即传递闭包)是复杂的。虽然一些数据库支持递归CTE,可以用于有限深度的路径查找,但将其应用于通用连通分量计算,特别是要为每个分量分配一个统一的merged_component_id,往往效率低下且难以维护。
SQL模拟思路(迭代更新 – 概念性): 对于小型数据集,可以尝试通过迭代更新来模拟Union-Find算法。这需要一个临时表来存储每个original_cluster_id的当前“根”组件ID,并通过多次UPDATE语句来传播这些根ID,直到没有更多的更新发生。
-- 概念性SQL迭代更新(不推荐用于生产环境,尤其大型数据集) -- 假设我们有一个临时表 ComponentRoots (original_cluster_id INT, root_id INT) -- 初始化:每个集群是自己的根 -- INSERT INTO ComponentRoots SELECT original_cluster_id, original_cluster_id FROM (SELECT DISTINCT original_cluster_id FROM ClusteredItems); -- 迭代更新: -- WHILE EXISTS (SELECT 1 FROM ComponentRoots cr1 JOIN ComponentRoots cr2 ON ... WHERE cr1.root_id != cr2.root_id) -- BEGIN -- UPDATE ComponentRoots SET root_id = LEAST(cr1.root_id, cr2.root_id) -- FROM ComponentRoots cr1 JOIN ComponentRoots cr2 ON ... (using the overlap relationships) -- WHERE cr1.root_id != cr2.root_id; -- END;
这种方法通常不是纯粹的SQL查询,而是需要在应用层进行循环控制的批处理操作。
推荐方案: 将步骤3.2中生成的重叠关系(边列表)导出,利用外部编程语言(如Python)的图库或Union-Find算法进行高效计算。这是处理连通分量最健壮和性能最佳的方法。
4. Python/PySpark实现(推荐方案)
Python及其丰富的库生态系统,如networkx(图论库)或通过实现Union-Find数据结构,能够高效地解决连通分量问题。对于大规模数据,PySpark及其GraphFrames库是理想选择。
4.1 使用Union-Find数据结构
Union-Find是一种用于处理不相交集合的算法,非常适合解决连通分量问题。
Union-Find 类实现:
class UnionFind: def __init__(self, elements): self.parent = {e: e for e in elements} self.rank = {e: 0 for e in elements} # Optional: for optimization (union by rank) def find(self, i):