Polars中分组列表列求交集的进阶技巧

Polars中分组列表列求交集的进阶技巧

本文探讨了如何在Polars中对包含字符串列表的列进行分组求交集操作。传统的reduce结合列表集合操作往往难以直接实现预期效果。文章提供了一种高效且灵活的解决方案,通过将列表列扁平化,利用行索引和组内计数来识别共同元素,最终重新聚合以获得每个分组内所有列表的交集。此方法避免了复杂的列表操作,转而利用Polars强大的表达式引擎和窗口函数,实现精确的分组交集计算。

1. 问题背景与挑战

在数据处理中,我们经常会遇到包含列表(list)类型数据的列。当需要对这些列表进行分组聚合,并计算每个组内所有列表的交集时,polars提供了一些内置的列表操作,但直接应用它们可能无法达到预期效果。例如,使用 pl.reduce 结合 list.set_intersection 通常会遇到类型不匹配或逻辑不符的问题。

考虑以下示例DataFrame,其中包含 id 和 values(字符串列表)两列:

import polars as pl  df = pl.DataFrame(    {"id": [1,1,2,2,3,3],     "values": [["A", "B"], ["B", "C"], ["A", "B"], ["B", "C"], ["A", "B"], ["B", "C"]]    } )  print(df)

输出:

shape: (6, 2) ┌─────┬────────────┐ │ id  ┆ values     │ │ --- ┆ ---        │ │ i64 ┆ list[str]  │ ╞═════╪════════════╡ │ 1   ┆ ["A", "B"] │ │ 1   ┆ ["B", "C"] │ │ 2   ┆ ["A", "B"] │ │ 2   ┆ ["B", "C"] │ │ 3   ┆ ["A", "B"] │ │ 3   ┆ ["B", "C"] │ └─────┴────────────┘

我们的目标是计算每个 id 组内 values 列所有列表的交集。例如,对于 id=1,其对应的列表是 [“A”, “B”] 和 [“B”, “C”],它们的交集应为 [“B”]。最终期望的输出是:

shape: (3, 2) ┌─────┬───────────┐ │ id  ┆ values    │ │ --- ┆ ---       │ │ i64 ┆ list[str] │ ╞═════╪═══════════╡ │ 1   ┆ ["B"]     │ │ 2   ┆ ["B"]     │ │ 3   ┆ ["B"]     │ └─────┴───────────┘

直接尝试使用 pl.reduce 可能会导致不符合预期的结果:

# 尝试1:reduce直接作用于列 # df.group_by("id").agg( #     pl.reduce(function=lambda acc, x: acc.list.set_intersection(x), #               exprs=pl.col("values")) # ) # 结果:list[list[str]],不是交集  # 尝试2:explode后再reduce # df.group_by("id").agg( #     pl.reduce(function=lambda acc, x: acc.list.set_intersection(x), #               exprs=pl.col("values").explode()) # ) # 结果:list[str],但只是简单拼接,并非交集

这些尝试失败的原因在于 pl.reduce 在这种场景下难以正确处理列表的迭代交集,或者 explode 操作改变了数据结构,使其不再适合直接的集合操作。

2. 解决方案:扁平化、计数与重聚合

解决此问题的核心思路是:将列表列扁平化,然后利用Polars的窗口函数(over)和聚合功能来识别在每个组内所有原始列表中都出现的元素,最后再将这些元素重新聚合为列表。

2.1 步骤分解

  1. 计算组的长度: 获取每个 id 组中原始行的数量。这将用于后续判断一个元素是否在组内所有列表中都出现。
  2. 添加行索引: 为DataFrame添加一个唯一的行索引,以便在扁平化后,我们仍然可以追溯每个元素来自哪一行。
  3. 扁平化列表列: 使用 explode 将 values 列中的每个列表元素展开为单独的行。
  4. 计算元素在组内出现的原始行数: 对于每个扁平化后的元素,计算它在当前 id 组中对应了多少个不同的原始行索引。
  5. 过滤共同元素: 筛选出那些其对应原始行数等于该 id 组总行数的元素。这些元素即为所有原始列表的交集。
  6. 重新聚合: 按照 id 再次分组,并将过滤后的元素聚合回列表。

2.2 代码实现

我们将分步展示上述过程。

第一步:准备数据并添加组长度和行索引

首先,我们添加一个 group_len 列,表示每个 id 组的原始行数。这通过 pl.len().over(“id”) 实现。接着,使用 with_row_index() 添加一个全局唯一的 index 列。

df_prepared = (     df.with_columns(pl.len().over("id").alias("group_len"))     .with_row_index() ) print(df_prepared)

输出:

shape: (6, 3) ┌───────┬─────┬────────────┬───────────┐ │ index ┆ id  ┆ values     ┆ group_len │ │ ---   ┆ --- ┆ ---        ┆ ---       │ │ u32   ┆ i64 ┆ list[str]  ┆ u32       │ ╞═══════╪═════╪════════════╪═══════════╡ │ 0     ┆ 1   ┆ ["A", "B"] │ 2         │ │ 1     ┆ 1   ┆ ["B", "C"] │ 2         │ │ 2     ┆ 2   ┆ ["A", "B"] │ 2         │ │ 3     ┆ 2   ┆ ["B", "C"] │ 2         │ │ 4     ┆ 3   ┆ ["A", "B"] │ 2         │ │ 5     ┆ 3   ┆ ["B", "C"] │ 2         │ └───────┴─────┴────────────┴───────────┘

第二步:扁平化 values 列并计算元素在组内出现的原始行数

现在,我们将 values 列扁平化,然后计算每个 (id, values) 对中,有多少个唯一的原始行索引。如果这个数量等于 group_len,则说明该 values 元素在当前 id 组的所有原始列表中都出现了。

df_exploded_counted = (     df_prepared     .explode("values")     .with_columns(         pl.col("index").n_unique().over("id", "values").alias("n_unique_rows")     ) ) print(df_exploded_counted)

输出:

shape: (12, 5) ┌───────┬─────┬────────┬───────────┬───────────────┐ │ index ┆ id  ┆ values ┆ group_len ┆ n_unique_rows │ │ ---   ┆ --- ┆ ---    ┆ ---       ┆ ---           │ │ u32   ┆ i64 ┆ str    ┆ u32       ┆ u32           │ ╞═══════╪═════╪════════╪═══════════╪═══════════════╡ │ 0     ┆ 1   ┆ A      ┆ 2         ┆ 1             │ │ 0     ┆ 1   ┆ B      ┆ 2         ┆ 2             │ # 'B'在id=1组中来自index 0和1 │ 1     ┆ 1   ┆ B      ┆ 2         ┆ 2             │ │ 1     ┆ 1   ┆ C      ┆ 2         ┆ 1             │ │ 2     ┆ 2   ┆ A      ┆ 2         ┆ 1             │ │ 2     ┆ 2   ┆ B      ┆ 2         ┆ 2             │ # 'B'在id=2组中来自index 2和3 │ 3     ┆ 2   ┆ B      ┆ 2         ┆ 2             │ │ 3     ┆ 2   ┆ C      ┆ 2         ┆ 1             │ │ 4     ┆ 3   ┆ A      ┆ 2         ┆ 1             │ │ 4     ┆ 3   ┆ B      ┆ 2         ┆ 2             │ # 'B'在id=3组中来自index 4和5 │ 5     ┆ 3   ┆ B      ┆ 2         ┆ 2             │ │ 5     ┆ 3   ┆ C      ┆ 2         ┆ 1             │ └───────┴─────┴────────┴───────────┴───────────────┘

从输出可以看出,对于 id=1,元素 “A” 只有 n_unique_rows=1(因为它只在 index=0 的原始行中出现),而元素 “B” 有 n_unique_rows=2(因为它在 index=0 和 index=1 的原始行中都出现过)。由于 group_len 也是2,这表明 “B” 是 id=1 组中所有列表的交集元素。

第三步:过滤并重新聚合

最后一步是过滤出那些 n_unique_rows 等于 group_len 的行,然后按 id 分组并聚合 values 列。为了确保结果列表中没有重复项,我们使用 pl.col.values.unique()。

final_result = (     df.with_columns(pl.len().over("id").alias("group_len"))     .with_row_index()     .explode("values")     .filter(         pl.col("index").n_unique().over("id", "values")         == pl.col("group_len")     )     .group_by("id", maintain_order=True)     .agg(pl.col("values").unique()) # 使用.unique()确保结果列表中元素唯一 )  print(final_result)

最终输出:

shape: (3, 2) ┌─────┬───────────┐ │ id  ┆ values    │ │ --- ┆ ---       │ │ i64 ┆ list[str] │ ╞═════╪═══════════╡ │ 1   ┆ ["B"]     │ │ 2   ┆ ["B"]     │ │ 3   ┆ ["B"]     │ └─────┴───────────┘

这正是我们期望的结果。

3. 注意事项与总结

  • 性能考量: 这种方法涉及 explode 操作,它会增加DataFrame的行数。对于包含非常大列表的超大型数据集,这可能会消耗较多内存。然而,Polars的内部优化通常能很好地处理此类操作。
  • 通用性: 这种扁平化-计数-重聚合的模式不仅适用于列表交集,还可以推广到其他复杂的组内列表操作,例如查找在至少N个列表中出现的元素。
  • maintain_order=True: 在 group_by 中使用 maintain_order=True 可以确保最终结果的组顺序与原始DataFrame中首次出现的组顺序一致。如果顺序不重要,可以省略。
  • unique() 的使用: 在 agg 阶段使用 pl.col(“values”).unique() 是为了确保最终的交集列表只包含唯一的元素,因为在扁平化和过滤过程中,同一个交集元素可能会被多次选中。

通过将列表操作转换为更适合Polars表达式引擎的扁平化和窗口函数操作,我们能够高效且准确地实现分组列表的交集计算。这种方法体现了Polars在处理复杂数据转换时的强大灵活性。

© 版权声明
THE END
喜欢就支持一下吧
点赞13 分享