K-Means clustering (see an external reference for the algorithm's theory).
Usage: instantiate the class, then call fit() and predict(); both take data as an np.array of shape N x M.
class MyKMeans:
    """Minimal K-Means clustering with an sklearn-like interface.

    Instantiate, then call ``fit(X)`` and ``predict(x)``; both take an
    ``np.ndarray`` of shape (N, M).

    Attributes:
        labels_: cluster index of each training sample; set by ``fit``.
        cluster_centers_: array of shape (n_clusters, M); set by ``fit``.
    """

    def __init__(self, n_clusters=3, n_init=3, max_iter=300):
        """
        :param n_clusters: number of cluster centers
        :param n_init: number of random restarts; the run with the lowest
            within-cluster sum of squared distances (inertia) is kept
        :param max_iter: Lloyd iterations per restart
        """
        self.n_clusters = n_clusters
        self.n_init = n_init
        self.max_iter = max_iter
        # Per-instance state. (Storing these as class attributes, as the
        # original did, silently shares them between instances.)
        self.labels_ = None
        self.cluster_centers_ = None

    def fit(self, X):
        """Cluster ``X`` (N x M); keep the best of ``n_init`` restarts.

        :param X: training data, shape (N, M)
        :return: ``self`` (fluent, sklearn-style)
        """
        best_inertia = None
        best_centers = None
        best_labels = None
        for _ in range(self.n_init):
            self.__init_cluster_centers(X)
            for _ in range(self.max_iter):
                self.__assign_labels(X)
                self.__update_cluster_centers(X)
            # One final assignment so labels and inertia are consistent
            # with the *final* centers of this restart.
            inertia = self.__assign_labels(X)
            if best_inertia is None or inertia < best_inertia:
                best_inertia = inertia
                best_centers = self.cluster_centers_
                best_labels = self.labels_
        self.cluster_centers_ = best_centers
        self.labels_ = best_labels
        return self

    def predict(self, x):
        """Return the nearest-center index for each row of ``x`` (N x M)."""
        distances = self.__sq_distances(x)
        return np.argmin(distances, axis=1)

    def __sq_distances(self, X):
        # Broadcast to (k, N, M), reduce the feature axis, transpose to
        # (N, k): squared Euclidean distance of every sample to every center.
        diff = X - self.cluster_centers_[:, np.newaxis, :]
        return np.sum(diff ** 2, axis=2).T

    def __assign_labels(self, X):
        # Assign each sample to its nearest center and return the inertia:
        # the sum of squared distances to the *assigned* center only.
        distances = self.__sq_distances(X)
        self.labels_ = np.argmin(distances, axis=1)
        return float(distances[np.arange(X.shape[0]), self.labels_].sum())

    def __update_cluster_centers(self, X):
        # Move every center to the mean of the samples assigned to it,
        # accumulating sums/counts in one vectorized pass.
        k, m = self.cluster_centers_.shape
        sums = np.zeros((k, m))
        counts = np.zeros(k)
        np.add.at(sums, self.labels_, X)
        np.add.at(counts, self.labels_, 1)
        # An empty cluster keeps its previous center instead of producing
        # NaN via a 0/0 division.
        nonempty = counts > 0
        new_centers = self.cluster_centers_.copy()
        new_centers[nonempty] = sums[nonempty] / counts[nonempty, np.newaxis]
        self.cluster_centers_ = new_centers

    def __init_cluster_centers(self, X):
        # Seed the run with n_clusters distinct rows of X chosen at random.
        indexes = random.sample(range(X.shape[0]), self.n_clusters)
        self.cluster_centers_ = np.array([X[i] for i in indexes])