代码拉取完成,页面将自动刷新
from scipy.io import arff
import pandas as pd
import numpy as np
from scipy.sparse import issparse
from sklearn.metrics import mutual_info_score
from scipy.stats.stats import pearsonr
from sklearn.metrics import make_scorer
from sklearn.metrics import recall_score
from sklearn.model_selection import cross_val_score
from sklearn import model_selection
from sklearn import metrics
# Ensemble classifier: combines several classifiers into a single model.
class EnsembleClassifiers:
    """Average the positive-class probabilities of incrementally trained members.

    Members are fitted one at a time through :meth:`fit`; :meth:`predict`
    averages column 1 of ``predict_proba`` over the members fitted so far.
    """

    def __init__(self, clf_list):
        """Store the candidate classifiers; none of them is fitted yet.

        Args:
            clf_list: Sequence of objects exposing ``fit(X, y)`` and
                ``predict_proba(X)``.
        """
        self.clf_list = clf_list
        self.n_classifiers = len(clf_list)
        # Slot i holds the fitted member i, or None while it is untrained.
        self.trained_classifiers = [None] * self.n_classifiers
        # Indices of the fitted members, each listed once.
        self.trained_ids = []

    def fit(self, X, y, clf_id):
        """Fit (or refit) the member at index ``clf_id`` on ``X``, ``y``."""
        clf = self.clf_list[clf_id]
        clf.fit(X, y)
        self.trained_classifiers[clf_id] = clf
        # Refitting a member must not register it twice, otherwise predict()
        # would query the same classifier repeatedly.
        if clf_id not in self.trained_ids:
            self.trained_ids.append(clf_id)

    def predict(self, X):
        """Return the mean of ``predict_proba(X)[:, 1]`` over fitted members.

        Only members that have been fitted take part in the average, so
        untrained slots no longer pull the result towards zero.

        Raises:
            ValueError: If no member has been fitted yet.
        """
        if not self.trained_ids:
            raise ValueError("predict() called before any classifier was fitted")
        member_probabilities = np.column_stack([
            self.trained_classifiers[i].predict_proba(X)[:, 1]
            for i in self.trained_ids
        ])
        return member_probabilities.mean(axis=1)
# Load one of the known datasets from disk.
def read_dataset(directory, dataset_name):
    """Load a dataset and split it into a feature matrix and a label vector.

    ``"Ant"`` and ``"Camel"`` are read from ``<dataset_name>.csv`` with the
    label in column ``bug``. ``"KC3"``, ``"PC2"``, ``"PC4"`` and ``"MC1"`` are
    read from ``<dataset_name>.arff`` with the label in column ``Defective``,
    which is passed through ``mapit`` to obtain integer codes.

    Args:
        directory: Folder containing the dataset files; a trailing path
            separator is optional.
        dataset_name: One of the dataset names listed above.

    Returns:
        Tuple ``(X, y)`` of NumPy arrays: the remaining columns and the labels.

    Raises:
        ValueError: If ``dataset_name`` is not a known dataset.
    """
    import os  # local import so the block does not depend on the file header

    if dataset_name in ("Ant", "Camel"):
        frame = pd.read_csv(os.path.join(directory, dataset_name + '.csv'))
        y = frame.pop('bug')
    elif dataset_name in ("KC3", "PC2", "PC4", "MC1"):
        data, _meta = arff.loadarff(os.path.join(directory, dataset_name + '.arff'))
        frame = pd.DataFrame(data)
        y = mapit(frame.pop('Defective'))
    else:
        # Previously this branch only printed and then crashed on unbound
        # names at the return statement; fail with an explicit error instead.
        raise ValueError("dataset %s does not exist" % dataset_name)
    return np.array(frame), np.array(y)
# Helper used while loading datasets.
def mapit(vector):
    """Replace each value of ``vector`` by its integer code.

    Codes are the positions of the values in ``np.unique(vector)``, i.e.
    ``0 .. k-1`` following the order returned by ``np.unique``.

    Args:
        vector: A ``pd.Series`` or any array-like accepted by ``pd.Series``.

    Returns:
        A ``pd.Series`` of codes; a Series input keeps its index.
    """
    if not isinstance(vector, pd.Series):
        vector = pd.Series(vector)
    distinct = np.unique(vector)
    code_of = pd.Series(np.arange(len(distinct)), index=distinct)
    return vector.map(code_of)
# Score every feature against the target and rank the features by score.
def rank_features(X, y, corr='fisher'):
    """Rank the columns of ``X`` by a per-feature relevance score.

    Args:
        X: 2-D feature matrix indexed as ``X[:, feature]``.
        y: Target vector passed unchanged to the scoring function.
        corr: Scoring method: ``'pearson'`` (absolute Pearson correlation via
            ``pearsonr``), ``'fisher'`` (``fisher_crit``), ``'mutual_info'``
            (``mutual_info_score``) or ``'info_gain'`` (``information_gain``).

    Returns:
        Tuple ``(rank_list, scores)``: feature indices ordered from highest to
        lowest score, and the scores in that same order. Features whose score
        is NaN are placed last.

    Raises:
        ValueError: If ``corr`` is not one of the supported methods.
    """
    supported = ('pearson', 'fisher', 'mutual_info', 'info_gain')
    if corr not in supported:
        # An unknown method used to yield two empty lists without any hint.
        raise ValueError("unknown corr %r; expected one of %s" % (corr, supported))

    if corr == 'pearson':
        def score_feature(column, target):
            return abs(pearsonr(column, target)[0])
    else:
        # 'mutual_info' and 'info_gain' were registered here but never
        # dispatched by the old if/elif chain; they are now actually used.
        correlation_functions = {
            'fisher': fisher_crit,
            'mutual_info': mutual_info_score,
            'info_gain': information_gain
        }
        score_feature = correlation_functions[corr]

    n_features = X.shape[1]
    results = [(feature, score_feature(X[:, feature], y))
               for feature in range(n_features)]

    def sort_key(pair):
        # NaN compares false against everything and would leave the order
        # arbitrary, so rank NaN-scored features below all real scores.
        return float("-inf") if np.isnan(pair[1]) else pair[1]

    # Stable descending sort: ties keep their original feature order.
    results.sort(key=sort_key, reverse=True)
    rank_list = [pair[0] for pair in results]
    scores = [pair[1] for pair in results]
    return rank_list, scores
# Compute the Fisher-style score of two vectors.
def fisher_crit(v1, v2):
    """Return ``|mean(v1) - mean(v2)| / (var(v1) + var(v2))``.

    Args:
        v1: Array-like or SciPy sparse matrix (densified before use).
        v2: Array-like.

    Returns:
        The ratio above, or ``0.0`` when both variances are zero, so callers
        never receive NaN or inf from a division by zero.
    """
    if issparse(v1):
        # toarray() yields a plain ndarray rather than the legacy np.matrix.
        v1 = v1.toarray()
    spread = np.var(v1) + np.var(v2)
    if spread == 0:
        return 0.0
    return abs(np.mean(v1) - np.mean(v2)) / spread
# Compute the "information gain" score of two vectors.
def information_gain(v1, v2):
    """Return ``|mean(v1) - mean(v2)| / (var(v1) + var(v2))``.

    NOTE(review): despite its name, this computes exactly the same ratio as
    ``fisher_crit`` and no entropy-based quantity; confirm whether a true
    information-gain formula was intended.

    Args:
        v1: Array-like or SciPy sparse matrix (densified before use).
        v2: Array-like.

    Returns:
        The ratio above, or ``0.0`` when both variances are zero, so callers
        never receive NaN or inf from a division by zero.
    """
    if issparse(v1):
        # toarray() yields a plain ndarray rather than the legacy np.matrix.
        v1 = v1.toarray()
    spread = np.var(v1) + np.var(v2)
    if spread == 0:
        return 0.0
    return abs(np.mean(v1) - np.mean(v2)) / spread
# Train and evaluate `clf` on growing prefixes of the ranked features to
# obtain its performance curve.
def compute_feature_curve(clf, X, y, ft_ranks, step_size=1, score_name="AUC", cv=10):
    """Cross-validate ``clf`` on the top-k ranked features for increasing k.

    Args:
        clf: Estimator handed to ``cross_val_score``.
        X: 2-D feature matrix indexed as ``X[:, columns]``.
        y: Target vector.
        ft_ranks: Feature indices ordered from most to least relevant.
        step_size: Number of features added between two evaluations.
        score_name: ``"AUC"`` (scoring ``'roc_auc'``) or ``"GMeans"``
            (scorer built from ``g_mean_metric``).
        cv: Cross-validation setting forwarded to ``cross_val_score``.

    Returns:
        Tuple ``(scores, selected_features)``: the mean cross-validation score
        for each evaluated k, and the list of those k values.

    Raises:
        ValueError: If ``score_name`` is not supported.
    """
    if score_name == "AUC":
        score_function = 'roc_auc'
    elif score_name == "GMeans":
        score_function = make_scorer(g_mean_metric)
    else:
        # Previously an unknown name surfaced later as an unbound variable.
        raise ValueError("unknown score_name %r; expected 'AUC' or 'GMeans'" % (score_name,))

    n_features = X.shape[1]
    # Feature-set sizes evaluated (x axis) and the score at each size (y axis).
    selected_features = []
    scores = []
    for n_selected in range(step_size, n_features + 1, step_size):
        fold_scores = cross_val_score(clf, X[:, ft_ranks[:n_selected]], y,
                                      cv=cv, scoring=score_function)
        score = np.mean(fold_scores)
        selected_features.append(n_selected)
        scores.append(score)
        print('%s score: %.3f with %s features...' % (score_name, score, n_selected))
    # np.amax raises on an empty list (e.g. step_size larger than n_features).
    if scores:
        print('Best score achieved : %.3f \n' % np.amax(scores))
    return scores, selected_features
# Scoring helper for the performance curves: the G-mean criterion.
def g_mean_metric(y_true, y_pred):
    """Return the geometric mean of recall and specificity.

    Predictions are binarised with ``>= 0.5``. Label ``1`` is the positive
    class and label ``0`` the negative class.

    Args:
        y_true: Array-like of true labels.
        y_pred: Array-like of predicted labels or scores.

    Returns:
        ``sqrt(recall * specificity)``, or ``0.0`` when ``y_true`` contains no
        positive or no negative sample (the case without negatives used to
        raise ``ZeroDivisionError``).
    """
    truth = np.asarray(y_true)
    predicted = (np.asarray(y_pred) >= 0.5).astype(int)

    is_positive = truth == 1
    is_negative = truth == 0
    n_positive = np.count_nonzero(is_positive)
    n_negative = np.count_nonzero(is_negative)
    if n_positive == 0 or n_negative == 0:
        # One of the two rates is undefined; report the worst score.
        return 0.0

    recall = np.count_nonzero(is_positive & (predicted == 1)) / float(n_positive)
    specificity = np.count_nonzero(is_negative & (predicted == 0)) / float(n_negative)
    return np.sqrt(recall * specificity)
# Greedy forward selection, recording the classifier's score on each
# greedily grown feature set.
def greedy_selection(clf, X, y, score_name="AUC"):
    """Grow a feature set greedily, one best-scoring feature per round.

    In every round each unselected feature is tentatively added to the
    current set and scored with 10-fold ``cross_val_score``; the feature
    giving the highest mean score is kept.

    Args:
        clf: Estimator handed to ``cross_val_score``.
        X: 2-D feature matrix indexed as ``X[:, columns]``.
        y: Target vector.
        score_name: ``"AUC"`` (scoring ``'roc_auc'``) or ``"GMeans"``
            (scorer built from ``g_mean_metric``).

    Returns:
        Tuple ``(scores, sizes)``: the best score of each round, and the array
        ``1 .. len(scores)`` of feature-set sizes.

    Raises:
        ValueError: If ``score_name`` is not supported.
    """
    if score_name == "AUC":
        score_function = 'roc_auc'
    elif score_name == "GMeans":
        score_function = make_scorer(g_mean_metric)
    else:
        # Previously an unknown name surfaced later as an unbound variable.
        raise ValueError("unknown score_name %r; expected 'AUC' or 'GMeans'" % (score_name,))

    n_features = X.shape[1]
    selected_features = []
    scores = []
    for _ in range(n_features):
        # Start below any real score so that a round whose best score is 0.0
        # still picks a feature instead of reusing a stale one.
        maximum = float("-inf")
        best_feature = None
        for j in range(n_features):
            if j in selected_features:
                continue
            score = np.mean(cross_val_score(clf, X[:, selected_features + [j]], y,
                                            cv=10, scoring=score_function))
            if score > maximum:
                maximum = score
                best_feature = j
        if best_feature is None:
            # Every candidate scored NaN; nothing meaningful can be added.
            break
        scores.append(maximum)
        selected_features.append(best_feature)
        # Report the score of the chosen feature, not of the last candidate.
        print('%s score: %.3f with features: %s ...' % (score_name, maximum, selected_features))
    # Same summary line as the other curve functions in this module.
    global_max = max(scores + [0.0])
    print('Best score achieved : %.3f \n' % global_max)
    return scores, np.arange(len(selected_features)) + 1
# Compute the feature curve of the APE ensemble.
def compute_ape_feature_curve(classifiers, n_classifiers, X, y, ft_ranks, step_size=1, score_name="AUC"):
    """Score the ensemble on the top-k ranked features for increasing k.

    Each feature subset is evaluated with ``ape_train``. ``score_name`` is
    only used to label the progress output; it does not select a metric here.

    Args:
        classifiers: Classifiers forwarded to ``ape_train``.
        n_classifiers: Number of classifiers forwarded to ``ape_train``.
        X: 2-D feature matrix indexed as ``X[:, columns]``.
        y: Target vector.
        ft_ranks: Feature indices ordered from most to least relevant.
        step_size: Number of features added between two evaluations.
        score_name: Label printed in front of each score.

    Returns:
        Tuple ``(scores, selected_features)``: the ``ape_train`` score for
        each evaluated k, and the list of those k values.
    """
    # Feature-set sizes evaluated (x axis) and the score at each size (y axis).
    selected_features = []
    scores = []
    n_features = X.shape[1]
    for n_selected in range(step_size, n_features + 1, step_size):
        feature_set = X[:, ft_ranks[:n_selected]]
        score = ape_train(classifiers, n_classifiers, feature_set, y)
        selected_features.append(n_selected)
        scores.append(score)
        # Print the score just computed; indexing scores by feature count
        # went out of range as soon as step_size exceeded 1.
        print('%s score: %.3f with %s features...' % (score_name, score, n_selected))
    # np.amax raises on an empty list (e.g. step_size larger than n_features).
    if scores:
        print('Best score achieved : %.3f \n' % np.amax(scores))
    return scores, selected_features
# Greedy forward selection, recording the APE ensemble's score on each
# greedily grown feature set.
def compute_ape_feature_curve_with_greedy(classifiers, n_classifiers, X, y, score_name="AUC"):
    """Grow a feature set greedily using ``ape_train`` as the scorer.

    In every round each unselected feature is tentatively added to the
    current set and scored with ``ape_train``; the feature giving the highest
    score is kept. ``score_name`` is only used to label the progress output;
    it does not select a metric here.

    Args:
        classifiers: Classifiers forwarded to ``ape_train``.
        n_classifiers: Number of classifiers forwarded to ``ape_train``.
        X: 2-D feature matrix indexed as ``X[:, columns]``.
        y: Target vector.
        score_name: Label printed in front of each score.

    Returns:
        Tuple ``(scores, sizes)``: the best score of each round, and the array
        ``1 .. len(scores)`` of feature-set sizes.
    """
    n_features = X.shape[1]
    # Features chosen so far, in the order they were added.
    selected_features = []
    # Best score obtained in each round.
    scores = []
    for _ in range(n_features):
        # Start below any real score so that a round whose best score is 0.0
        # still picks a feature instead of reusing a stale one.
        maximum = float("-inf")
        best_feature = None
        for j in range(n_features):
            if j in selected_features:
                continue
            feature_set = X[:, selected_features + [j]]
            score = ape_train(classifiers, n_classifiers, feature_set, y)
            # Keep the candidate that is best within the current round.
            if score > maximum:
                maximum = score
                best_feature = j
        if best_feature is None:
            # Every candidate scored NaN; nothing meaningful can be added.
            break
        scores.append(maximum)
        selected_features.append(best_feature)
        # Report the score of the chosen feature, not of the last candidate.
        print('%s score: %.3f with features: %s ...' % (score_name, maximum, selected_features))
    # Best score over all rounds, floored at 0.0.
    global_max = max(scores + [0.0])
    print('Best score achieved : %.3f \n' % global_max)
    return scores, np.arange(len(selected_features)) + 1
# Train and score the APE ensemble on the given feature set.
def ape_train(classifiers, n_classifiers, feature_set, y):
    """Cross-validate the ensemble and return the AUC of the full ensemble.

    Uses ``StratifiedKFold(n_splits=10)``. Within each fold a fresh
    ``EnsembleClassifiers`` is built and its members are fitted one after
    another on that fold's training split; after each addition the ensemble
    is scored with ``roc_auc_score`` on the fold's test split. Every member
    that contributes to a fold's prediction has therefore been fitted on that
    fold's training data only. (The previous loop order kept earlier members
    fitted on the last fold's training split while scoring other folds.)

    Args:
        classifiers: Sequence of classifiers for the ensemble.
        n_classifiers: How many of them (from index 0) to use; must be between
            1 and ``len(classifiers)``.
        feature_set: Feature matrix, indexed by row index arrays.
        y: Target vector; converted with ``np.asarray`` before indexing.

    Returns:
        Mean AUC over the folds of the ensemble made of all ``n_classifiers``
        members.

    Raises:
        ValueError: If ``n_classifiers`` is outside ``1 .. len(classifiers)``.
    """
    if n_classifiers < 1 or n_classifiers > len(classifiers):
        raise ValueError(
            "n_classifiers must be between 1 and %d, got %r"
            % (len(classifiers), n_classifiers)
        )

    y = np.asarray(y)
    skf = model_selection.StratifiedKFold(n_splits=10)
    # fold_scores[i] collects, per fold, the AUC of the first i + 1 members.
    fold_scores = [[] for _ in range(n_classifiers)]
    for train_index, test_index in skf.split(feature_set, y):
        X_train, X_test = feature_set[train_index], feature_set[test_index]
        y_train, y_test = y[train_index], y[test_index]
        ensemble = EnsembleClassifiers(classifiers)
        for i in range(n_classifiers):
            ensemble.fit(X_train, y_train, i)
            y_pred = ensemble.predict(X_test)
            fold_scores[i].append(metrics.roc_auc_score(y_test, y_pred))

    auc_scores = np.array([np.mean(per_fold) for per_fold in fold_scores])
    for i, auc in enumerate(auc_scores):
        print("Score: %.3f, n_classifiers: %d" % (auc, i + 1))
    # Score of the ensemble that uses all requested members.
    return auc_scores[n_classifiers - 1]
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。