from abc import ABC, abstractmethod
import math
from typing import Any, Callable, List, NamedTuple, Optional, Tuple, Type

import numpy as np

try:
    import matplotlib.pyplot as plt
except ImportError:
    # Plotting is only needed when figures are drawn; keep the numeric code
    # importable on machines without matplotlib.
    plt = None


# Utilities

class NormalParameters(NamedTuple):
    '''
    Parameters of one Gaussian component used to generate a dataset.

    Attributes:
        `size`: the number of data points in the dataset
        `mean`: the mean of the distribution, one entry per dimension
        `cov`: the covariance matrix of the distribution (dimension > 1)
        `scale`: the standard deviation of the distribution (dimension = 1)

    Exactly one of `cov` / `scale` is expected to be set, depending on the
    dimension of `mean`; the other one stays `None`.
    '''

    size: int
    mean: Tuple[float, ...]
    cov: Optional[List[List[float]]] = None
    scale: Optional[float] = None


class UniformParameters(NamedTuple):
    '''
    Parameters of a uniform distribution used to generate a dataset.

    Attributes:
        `size`: the number of data points in the dataset
        `intervals`: the range of each dimension, given as a pair of arrays
            (presumably lower bounds and upper bounds -- confirm against the
            code that consumes it, which is not part of this section)
    '''

    size: int
    intervals: Tuple[np.ndarray, np.ndarray]


def distance(point_1: np.ndarray, point_2: np.ndarray) -> float:
    '''
    Args:
        `point_1`: shape(d), any array-like
        `point_2`: shape(d), any array-like

    Return:
        The Euclidean distance between two points.
    '''

    # `asarray` lets callers pass plain tuples / lists (e.g. the `mean` of a
    # `NormalParameters`) without converting them first.
    return np.linalg.norm(np.asarray(point_1) - np.asarray(point_2))


def multinormal_pdf(x: np.ndarray, mean: np.ndarray, cov: np.ndarray) -> float:
    '''
    The probability density function of a multivariate Gaussian distribution
    with given parameters.

    Args:
        `x`: an observation, shape(d), any array-like
        `mean`: the mean of the distribution, shape(d), any array-like
        `cov`: the covariance matrix of the distribution, shape(d, d)

    Return:
        f(x | mean, cov)

    Raises:
        `numpy.linalg.LinAlgError`: if `cov` is singular.
    '''

    x = np.asarray(x, dtype=float)
    mean = np.asarray(mean, dtype=float)
    cov = np.asarray(cov, dtype=float)

    dim: int = mean.shape[0]
    cov_det: float = np.linalg.det(cov)
    # Normalising constant: ((2 * pi) ** d * |cov|) ** (-1 / 2).
    const: float = (((2 * math.pi) ** dim) * cov_det) ** (-1 / 2)
    centered: np.ndarray = x - mean
    # Solve cov @ y = (x - mean) instead of inverting `cov` explicitly.
    exponent: float = -np.dot(centered, np.linalg.solve(cov, centered)) / 2
    return const * math.exp(exponent)


def normal_pdf(x: float, mean: float, scale: float) -> float:
    '''
    The probability density function of a Gaussian distribution with given
    parameters.

    Args:
        `x`: an observation
        `mean`: the mean of the distribution
        `scale`: the standard deviation of the distribution (must be non-zero)

    Return:
        f(x | mean, scale)
    '''

    const = (2 * math.pi) ** (-1 / 2) / scale
    exponent = -((x - mean) / scale) ** 2 / 2
    return const * math.exp(exponent)


def assert_(var_name: str, got: Any, expected: Any) -> None:
    '''
    Check that a value matches the expected one.

    Args:
        `var_name`: variable name for logging
        `got`: actual value
        `expected`: expected value

    Raises:
        `AssertionError`: if `got` does not equal `expected`.
    '''

    # Raise explicitly instead of using the `assert` statement, which is
    # removed when Python runs with `-O` and would silently skip the check.
    if not got == expected:
        raise AssertionError(
            f'Assertion failed for {var_name}: expected {expected}, got {got}'
        )
# Tester

class TestSuite:
    '''
    Synthetic Gaussian datasets plus a driver that trains, evaluates and
    plots the clustering models on them.
    '''

    def __init__(self, seed: Any = None) -> None:
        '''
        Args:
            `seed`: forwarded to `np.random.default_rng`. Leave it as `None`
                for fresh entropy on every run, or pass an integer to make
                the generated datasets reproducible.
        '''

        self.rng: np.random.Generator = np.random.default_rng(seed)

    def generate_normal(self, param: 'NormalParameters') -> np.ndarray:
        '''
        Generate a dataset from a Gaussian distribution with given parameters.

        Args:
            `param`: parameters used to generate a dataset, unpacked as
                (size, mean, cov, scale)

        Return:
            shape(N, d) when `mean` has more than one entry, otherwise a flat
            array of shape(N)
        '''

        size, mean, cov, scale = param
        if len(mean) > 1:
            return self.rng.multivariate_normal(mean, cov, size)
        return self.rng.normal(mean[0], scale, size)

    def combine(self, *datasets: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        '''
        Combine several datasets into a single shuffled dataset.

        Args:
            `*datasets`: a tuple of datasets needed to combine

        Return:
            `dataset`: shape(N, d), where N is the total size of all datasets
            `labels`: shape(N), the index of the source dataset of each point
        '''

        dataset: np.ndarray = np.concatenate(datasets)
        labels: np.ndarray = np.concatenate([
            np.full(part.shape[0], index, dtype=int)
            for index, part in enumerate(datasets)
        ])
        # Shuffle with the suite's own generator (not the global NumPy state)
        # so that a seeded suite always yields the same ordering. Points and
        # labels are permuted together to stay aligned.
        order = self.rng.permutation(dataset.shape[0])
        return dataset[order], labels[order]

    def generate_data(
        self, *params: 'NormalParameters'
    ) -> Tuple[np.ndarray, int]:
        '''
        Generate a dataset for tests.

        Args:
            `params`: a tuple of parameters to generate datasets, one
                Gaussian cluster per entry

        Return:
            `dataset`: shape(N, d)
            `n_clusters`: the number of clusters to partition into
        '''

        # The true labels are not needed by the unsupervised models.
        dataset, _labels = self.combine(
            *(self.generate_normal(p) for p in params)
        )
        return dataset, len(params)

    def train(
        self, train_data: np.ndarray, model: 'Model'
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        '''
        Train a model with training data.

        Args:
            `train_data`: shape(N, d)
            `model`: the model that we need to train

        Return:
            `train_labels`: the predicted labels of training data, shape(N)
            `centroids`: the centroids calculated from training data,
                shape(k, d); `None` for a model type not listed below
            `gaps`: the gap statistics of each k, shape(k + 1); `None` unless
                the model is a `ClusteringAlgorithm`
        '''

        model.fit(train_data)
        train_labels = model.predict(train_data)
        centroids: np.ndarray = None
        gaps: np.ndarray = None

        # Each model class exposes its cluster centres under a different
        # attribute name.
        if isinstance(model, KMeans):
            centroids = model.centroids
        elif isinstance(model, GaussianMixture):
            centroids = model.means
        elif isinstance(model, ClusteringAlgorithm):
            centroids = model.best_model.centroids
            gaps = model.gaps
        return train_labels, centroids, gaps

    def evaluate(self, test_data: np.ndarray, model: 'Model') -> np.ndarray:
        '''
        Evaluate a model with testing data.

        Args:
            `test_data`: shape(N, d)
            `model`: the model that we need to evaluate

        Return:
            `test_labels`: the predicted labels of testing data, shape(N)
        '''

        return model.predict(test_data)

    def test_data_1(self) -> Tuple[np.ndarray, int]:
        '''Three 2-D Gaussian clusters (800 / 200 / 1000 points).'''

        return self.generate_data(
            NormalParameters(
                size=800,
                mean=(1, 2),
                cov=[[73, 0], [0, 22]],
            ),
            NormalParameters(
                size=200,
                mean=(16, -5),
                cov=[[21.2, 0], [0, 32.1]],
            ),
            NormalParameters(
                size=1000,
                mean=(10, 22),
                cov=[[10, 5], [5, 10]],
            ),
        )

    def test_data_2(self) -> Tuple[np.ndarray, int]:
        '''Five 2-D Gaussian clusters (800 / 400 / 1000 / 500 / 600 points).'''

        return self.generate_data(
            NormalParameters(
                size=800,
                mean=(1, 0),
                cov=[[73, 0], [0, 22]],
            ),
            NormalParameters(
                size=400,
                mean=(20, 15),
                cov=[[21.2, 0], [0, 32.1]],
            ),
            NormalParameters(
                size=1000,
                mean=(10, -22),
                cov=[[10, 5], [5, 10]],
            ),
            NormalParameters(
                size=500,
                mean=(-12, -6),
                cov=[[7, 3], [3, 16]],
            ),
            NormalParameters(
                size=600,
                mean=(-15, 17),
                cov=[[15, 0], [0, 12]],
            ),
        )

    def test_data_3(self) -> Tuple[np.ndarray, int]:
        '''Three 3-D Gaussian clusters (800 / 500 / 800 points).'''

        return self.generate_data(
            NormalParameters(
                size=800,
                mean=(-6, 3, 5),
                cov=[[73, 0, 0], [0, 50, 0], [0, 0, 22]],
            ),
            NormalParameters(
                size=500,
                mean=(12, 0, -10),
                cov=[[20, 5, 0], [5, 20, 0], [0, 0, 20]],
            ),
            NormalParameters(
                size=800,
                mean=(10, -20, 0),
                cov=[[10, 1, 3], [1, 10, 0], [3, 0, 10]],
            ),
        )

    def test_data_4(self) -> Tuple[np.ndarray, int]:
        '''Three 1-D Gaussian clusters (100 / 150 / 100 points).'''

        return self.generate_data(
            NormalParameters(
                size=100,
                mean=(-20,),
                scale=2,
            ),
            NormalParameters(
                size=150,
                mean=(0,),
                scale=1,
            ),
            NormalParameters(
                size=100,
                mean=(15,),
                scale=2,
            ),
        )

    def test_data_5(self) -> Tuple[np.ndarray, int]:
        '''
        Three 2-D Gaussian clusters (800 / 500 / 500 points) with a large
        variance along x and a small variance along y.
        '''

        return self.generate_data(
            NormalParameters(
                size=800,
                mean=(0, -5),
                cov=[[73, 0], [0, 2]],
            ),
            NormalParameters(
                size=500,
                mean=(-3, 0),
                cov=[[100, 0], [0, 2]],
            ),
            NormalParameters(
                size=500,
                mean=(2, 5),
                cov=[[70, 1], [1, 3]],
            ),
        )

    def run(self) -> None:
        '''
        Run all the enabled test cases: generate the data, train on the first
        80 % of the points, predict on the rest and save the figures.
        '''

        # Each entry is (figure name, dataset factory, model class,
        # n_clusters). An `n_clusters` of 0 means "use the true number of
        # clusters of the dataset". Uncomment an entry to enable it.
        testcases: List[Tuple[
            str, Callable[[], Tuple[np.ndarray, int]], Type['Model'], int
        ]] = [
            ('k-means_1', self.test_data_1, KMeans, 0),
            # ('k-means_2', self.test_data_2, KMeans, 0),
            # ('k-means_3', self.test_data_3, KMeans, 0),
            # ('k-means_4', self.test_data_4, KMeans, 0),
            # ('k-means_5', self.test_data_5, KMeans, 0),
            # ('GaussianMixture_1', self.test_data_1, GaussianMixture, 0),
            # ('GaussianMixture_2', self.test_data_2, GaussianMixture, 0),
            # ('GaussianMixture_3', self.test_data_3, GaussianMixture, 0),
            # ('GaussianMixture_4', self.test_data_4, GaussianMixture, 0),
            # ('GaussianMixture_5', self.test_data_5, GaussianMixture, 0),
            # ('auto-k-means_1', self.test_data_1, ClusteringAlgorithm, 10),
            # ('auto-k-means_2', self.test_data_2, ClusteringAlgorithm, 10),
            # ('auto-k-means_3', self.test_data_3, ClusteringAlgorithm, 10),
            # ('auto-k-means_4', self.test_data_4, ClusteringAlgorithm, 10),
            # ('auto-k-means_5', self.test_data_5, ClusteringAlgorithm, 10),
        ]

        for name, get_dataset, model_class, n_clusters in testcases:
            # Obtain training data and testing data (80 / 20 split; the
            # dataset is already shuffled by `combine`).
            dataset, real_n_clusters = get_dataset()
            train_size: int = math.floor(dataset.shape[0] * 0.8)
            train_data: np.ndarray = dataset[:train_size]
            test_data: np.ndarray = dataset[train_size:]

            # Train the model with training data
            model = model_class(n_clusters or real_n_clusters)
            train_labels, centroids, gaps = self.train(train_data, model)

            # Evaluate the model with testing data
            test_labels = self.evaluate(test_data, model)

            # Visualize the datasets with labels
            visualize(name + '_train', train_data, train_labels, centroids)
            visualize(name + '_test', test_data, test_labels, centroids)

            # Visualize the gap statistics, which only exist for models that
            # pick the number of clusters themselves
            if gaps is not None:
                visualize_gaps(name + '_gaps', gaps)

            print(f'{name}: Done.')
def _save_figure(name: str) -> None:
    '''Save the current pyplot figure as `img/<name>` and clear it.'''

    import os

    import matplotlib.pyplot as plt

    # `savefig` does not create missing directories, so make sure the output
    # folder exists before writing into it.
    os.makedirs('img', exist_ok=True)
    plt.savefig(f'img/{name}')
    # Reset the shared figure so the next plot starts from a blank canvas.
    plt.clf()


def visualize(
    name: str,
    dataset: np.ndarray,
    labels: np.ndarray,
    centroids: np.ndarray = None,
) -> None:
    '''
    Visualize a dataset with labels.

    Multi-dimensional data is drawn as a scatter plot of its first two
    coordinates (any further coordinates are ignored); one-dimensional data
    is drawn along a horizontal line at y = 0.

    Args:
        `name`: the output filename when saving the figure (under `img/`)
        `dataset`: shape(N, d), or shape(N) for one-dimensional data
        `labels`: shape(N), used to colour the points
        `centroids`: shape(k, d), drawn as larger translucent black dots;
            omitted from the plot when `None`

    Raises:
        `AssertionError`: if `dataset` and `labels` differ in length.
    '''

    import matplotlib.pyplot as plt

    assert_('dataset.shape[0]', dataset.shape[0], labels.shape[0])

    # Plot the data points and the centroids.
    if len(dataset.shape) > 1:
        plt.scatter(dataset[:, 0], dataset[:, 1], c=labels, s=30)
        if centroids is not None:
            plt.scatter(
                centroids[:, 0], centroids[:, 1], c='black', s=100, alpha=0.5,
            )
    else:
        # The y axis carries no information for 1-D data, so hide its ticks
        # whether or not centroids are drawn.
        plt.yticks([])
        plt.scatter(dataset, np.zeros(dataset.shape[0]), c=labels, s=30)
        if centroids is not None:
            plt.scatter(
                centroids, np.zeros(centroids.shape[0]),
                c='black', s=100, alpha=0.5,
            )

    # Save the figure to a local file.
    _save_figure(name)


def visualize_gaps(name: str, gaps: np.ndarray) -> None:
    '''
    Visualize the gap statistics.

    Args:
        `name`: the output filename when saving the figure (under `img/`)
        `gaps`: the gap statistics of each k, shape(k + 1); entry 0 is not
            plotted
    '''

    import matplotlib.pyplot as plt

    # Plot the gap statistics for k = 1 .. len(gaps) - 1.
    indices = np.arange(1, gaps.shape[0], dtype=int)
    plt.xticks(indices)
    plt.plot(indices, gaps[indices], '-bo')

    # Save the figure to a local file.
    _save_figure(name)


if __name__ == '__main__':
    TestSuite().run()