from collections import Counter import numpy as np from sklearn.base import BaseEstimator, ClassifierMixin # 导入BaseEstimator和ClassifierMixin class C45DecisionTree(BaseEstimator, ClassifierMixin): # 继承BaseEstimator和ClassifierMixin def __init__(self): self.tree = None # 初始化决策树结构为空 def entropy(self, y): """ 计算信息熵 (Entropy) :param y: 标签列表 :return: 信息熵值 """ counts = np.bincount(y) # 统计每个类别的数量 probabilities = counts / len(y) # 计算每个类别的概率 return -np.sum([p * np.log2(p) for p in probabilities if p > 0]) # 使用信息熵公式计算熵值 def information_gain(self, X, y, feature_index): """ 计算信息增益 (Information Gain) :param X: 特征矩阵 :param y: 标签列表 :param feature_index: 当前特征索引 :return: 信息增益值 """ total_entropy = self.entropy(y) # 计算总熵 values, counts = np.unique(X[:, feature_index], return_counts=True) # 获取当前特征的唯一值及其数量 weighted_entropy = sum((counts[i] / len(y)) * self.entropy(y[X[:, feature_index] == value]) for i, value in enumerate(values)) # 计算加权熵 return total_entropy - weighted_entropy # 信息增益等于总熵减去加权熵 def fit(self, X, y): """ 构建决策树 :param X: 特征矩阵 :param y: 标签列表 """ self.tree = self._build_tree(X, y) # 调用递归函数构建决策树 return self # 返回自身以符合scikit-learn的接口规范 def _build_tree(self, X, y): """ 递归构建决策树 :param X: 特征矩阵 :param y: 标签列表 :return: 决策树节点 """ if len(np.unique(y)) == 1: # 如果所有样本属于同一类别,则返回该类别 return y[0] if X.shape[1] == 0: # 如果没有剩余特征,则返回多数类别 return Counter(y).most_common(1)[0][0] # 选择信息增益最大的特征 best_feature = np.argmax([self.information_gain(X, y, i) for i in range(X.shape[1])]) values = np.unique(X[:, best_feature]) # 获取当前特征的唯一值 tree = {best_feature: {}} # 构建树节点,使用字典表示 for value in values: sub_X = X[X[:, best_feature] == value] # 划分子集,获取当前特征值对应的样本 sub_y = y[X[:, best_feature] == value] # 获取对应的标签 tree[best_feature][value] = self._build_tree(sub_X, sub_y) # 递归构建子树 return tree def predict_sample(self, tree, x): """ 预测单个样本 :param tree: 决策树 :param x: 单个样本 :return: 预测类别 """ if not isinstance(tree, dict): # 如果是叶子节点,直接返回类别 return tree feature = list(tree.keys())[0] # 获取当前节点的特征 value = x[feature] # 获取样本在该特征上的值 subtree = tree[feature].get(value) # 获取对应的子树 if subtree is None: # 如果子树不存在,返回多数类别 return Counter(x).most_common(1)[0][0] return self.predict_sample(subtree, x) # 递归预测 def predict(self, X): """ 预测多个样本 :param X: 特征矩阵 :return: 预测类别列表 """ predictions = [self.predict_sample(self.tree, x) for x in X] # 对每个样本调用predict_sample方法 return np.array(predictions, dtype=int) # 确保返回的预测结果是整数类型