DM-exp-2/code/fpgrowth.py
fly6516 99acd6447f refactor(fpgrowth): 优化 FP-Growth 算法代码
- 添加代码注释,解释每个函数和关键步骤的作用
- 优化变量命名,使其更具可读性和一致性
- 调整代码结构,增加空行和缩进,提高可读性- 移除冗余代码和不必要的注释
2025-03-14 10:18:11 +08:00

104 lines
4.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
from __future__ import print_function # 导入Python 2兼容的print函数
import pandas as pd # 导入pandas库用于数据处理
from collections import defaultdict # 导入defaultdict用于构建默认字典
class FPNode:
def __init__(self, item=None, count=0, parent=None):
self.item = item # 节点对应的项
self.count = count # 节点的计数
self.parent = parent # 节点的父节点
self.children = {} # 节点的子节点字典
self.next = None # 指向下一个相同项的节点
def build_fp_tree(data, min_support):
# 构建FP树
header_table = defaultdict(int) # 初始化头表默认值为0
for transaction in data: # 遍历每一条事务
for item in transaction: # 遍历事务中的每一项
header_table[item] += 1 # 统计每一项的支持度
# 移除不满足最小支持度的项
header_table = {k: v for k, v in header_table.items() if v >= min_support}
if not header_table: # 如果头表为空返回None
return None, None
# 初始化头表
for k in header_table: # 为头表中的每一项初始化链表指针
header_table[k] = [header_table[k], None]
root = FPNode() # 创建FP树的根节点
for transaction in data: # 遍历每一条事务
filtered_items = [item for item in transaction if item in header_table] # 过滤掉不满足最小支持度的项
if filtered_items: # 如果过滤后的事务非空
filtered_items.sort(key=lambda x: header_table[x][0], reverse=True) # 按支持度降序排序
update_fp_tree(filtered_items, root, header_table) # 更新FP树
return root, header_table # 返回FP树和头表
def update_fp_tree(items, node, header_table):
# 更新FP树
if items[0] in node.children: # 如果当前项已经在子节点中
node.children[items[0]].count += 1 # 增加子节点的计数
else: # 如果当前项不在子节点中
new_node = FPNode(item=items[0], count=1, parent=node) # 创建新节点
node.children[items[0]] = new_node # 将新节点添加到子节点字典中
update_header_table(header_table, items[0], new_node) # 更新头表指针
if len(items) > 1: # 如果还有剩余项
update_fp_tree(items[1:], node.children[items[0]], header_table) # 递归更新FP树
def update_header_table(header_table, item, target_node):
# 更新头表指针
if header_table[item][1] is None: # 如果链表为空
header_table[item][1] = target_node # 将新节点作为链表头
else: # 如果链表非空
current = header_table[item][1] # 从链表头开始
while current.next: # 遍历链表
current = current.next # 找到链表尾
current.next = target_node # 将新节点添加到链表尾
def mine_fp_tree(header_table, prefix, min_support, frequent_itemsets):
# 挖掘FP树中的频繁项集
sorted_items = [item[0] for item in sorted(header_table.items(), key=lambda x: x[1][0])] # 按支持度升序排序
for item in sorted_items: # 遍历每一项
new_prefix = prefix.copy() # 复制前缀
new_prefix.add(item) # 将当前项加入前缀
frequent_itemsets.append(new_prefix) # 将新前缀加入频繁项集
conditional_pattern_bases = find_prefix_paths(item, header_table) # 找到条件模式基
conditional_fp_tree, conditional_header_table = build_fp_tree(conditional_pattern_bases, min_support) # 构建条件FP树
if conditional_header_table: # 如果条件头表非空
mine_fp_tree(conditional_header_table, new_prefix, min_support, frequent_itemsets) # 递归挖掘条件FP树
def find_prefix_paths(base_item, header_table):
# 找到条件模式基
paths = []
node = header_table[base_item][1]
while node:
path = []
ascend_tree(node, path)
if path:
paths.append(path)
node = node.next
return paths
def ascend_tree(node, path):
# 从节点向上遍历树
while node.parent and node.parent.item:
path.append(node.parent.item)
node = node.parent
def find_frequent_itemsets(data, min_support):
# 主函数使用FP-Growth算法挖掘频繁项集
root, header_table = build_fp_tree(data, min_support) # 构建FP树和头表
if not root: # 如果FP树为空
return [] # 返回空列表
frequent_itemsets = [] # 初始化频繁项集列表
mine_fp_tree(header_table, set(), min_support, frequent_itemsets) # 挖掘频繁项集
return frequent_itemsets # 返回频繁项集