Source code for pyts.classification.classification

"""The :mod:`pyts.classification` module includes classification algorithms.

Implemented algorithms are:
- k-nearest neighbors
- SAX-VSM
- Bag-of-SFA Symbols in Vector Space (BOSSVS)
"""

from __future__ import division
from __future__ import unicode_literals
from __future__ import print_function
from __future__ import absolute_import
from builtins import range
from future import standard_library
import numpy as np
from sklearn.utils.validation import check_array, check_X_y, check_is_fitted
from sklearn.utils.multiclass import check_classification_targets
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.preprocessing import LabelEncoder
from sklearn.neighbors import KNeighborsClassifier
from sklearn.feature_extraction.text import TfidfVectorizer
from ..bow import BOW
from ..quantization import SAX, SFA
from ..utils import dtw, fast_dtw, numerosity_reduction


standard_library.install_aliases()


class KNNClassifier(BaseEstimator, ClassifierMixin):
    """k-nearest neighbors classifier.

    Parameters
    ----------
    n_neighbors : int, optional (default = 1)
        Number of neighbors to use.

    weights : str or callable, optional (default = 'uniform')
        Weight function used in prediction. Possible values:

        - 'uniform' : uniform weights. All points in each neighborhood
          are weighted equally.
        - 'distance' : weight points by the inverse of their distance.
          In this case, closer neighbors of a query point will have a
          greater influence than neighbors which are further away.
        - [callable] : a user-defined function which accepts an array of
          distances, and returns an array of the same shape containing
          the weights.

    algorithm : {'auto', 'ball_tree', 'kd_tree', 'brute'}, optional
        Algorithm used to compute the nearest neighbors. Note: fitting on
        sparse input will override the setting of this parameter, using
        brute force.

    leaf_size : int, optional (default = 30)
        Leaf size passed to BallTree or KDTree. This can affect the speed
        of the construction and query, as well as the memory required to
        store the tree. The optimal value depends on the nature of the
        problem.

    metric : string or DistanceMetric object (default = 'minkowski')
        The distance metric to use for the tree. The default metric is
        minkowski, and with p=2 it is equivalent to the standard Euclidean
        metric. See the documentation of the DistanceMetric class for a
        list of available metrics. 'dtw' and 'fast_dtw' are also
        available.

    p : integer, optional (default = 2)
        Power parameter for the Minkowski metric. When p = 1, this is
        equivalent to using manhattan_distance (l1), and
        euclidean_distance (l2) for p = 2. For arbitrary p,
        minkowski_distance (l_p) is used.

    metric_params : dict, optional (default = None)
        Additional keyword arguments for the metric function.

    n_jobs : int, optional (default = 1)
        The number of parallel jobs to run for neighbors search. If
        ``n_jobs=-1``, then the number of jobs is set to the number of
        CPU cores. Doesn't affect the :meth:`fit` method.

    """

    def __init__(self, n_neighbors=1, weights='uniform', algorithm='auto',
                 leaf_size=30, p=2, metric='minkowski', metric_params=None,
                 n_jobs=1, **kwargs):
        self.n_neighbors = n_neighbors
        self.weights = weights
        self.algorithm = algorithm
        self.leaf_size = leaf_size
        self.p = p
        self.metric = metric
        self.metric_params = metric_params
        self.n_jobs = n_jobs
        self.kwargs = kwargs
    def fit(self, X, y):
        """Fit the model according to the given training data.

        Parameters
        ----------
        X : array-like, shape = [n_samples, n_features]
            Training vector, where n_samples is the number of samples and
            n_features is the number of features.

        y : array-like, shape = [n_samples]
            Class labels for each data sample.

        Returns
        -------
        self : object
            Returns self.

        """
        X, y = check_X_y(X, y)

        # Replace the 'dtw' and 'fast_dtw' shortcuts with the actual
        # distance functions; any other metric is passed through unchanged.
        if self.metric == 'dtw':
            metric = dtw
        elif self.metric == 'fast_dtw':
            metric = fast_dtw
        else:
            metric = self.metric

        self._clf = KNeighborsClassifier(self.n_neighbors, self.weights,
                                         self.algorithm, self.leaf_size,
                                         self.p, metric, self.metric_params,
                                         self.n_jobs, **self.kwargs)
        self._clf.fit(X, y)
        return self
    def predict(self, X):
        """Predict the class labels for the provided data.

        Parameters
        ----------
        X : array-like, shape = [n_samples, n_features]

        Returns
        -------
        y : array-like, shape = [n_samples]
            Class labels for each data sample.

        """
        check_is_fitted(self, '_clf')
        X = check_array(X)
        return self._clf.predict(X)
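# Usage sketch for KNNClassifier (illustrative only; the data is random
# and the parameter values are assumptions, not recommendations):
#
# >>> import numpy as np
# >>> rng = np.random.RandomState(42)
# >>> X_train = rng.randn(20, 50)   # 20 univariate series of length 50
# >>> y_train = rng.randint(2, size=20)
# >>> X_test = rng.randn(5, 50)
# >>> clf = KNNClassifier(n_neighbors=1, metric='dtw')
# >>> clf.fit(X_train, y_train).predict(X_test).shape
# (5,)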
class SAXVSMClassifier(BaseEstimator, ClassifierMixin):
    """Classifier based on SAX-VSM representation and tf-idf statistics.

    Parameters
    ----------
    n_bins : int (default = 4)
        Number of bins (also known as the size of the alphabet).

    quantiles : {'gaussian', 'empirical'} (default = 'empirical')
        The way to compute quantiles. If 'gaussian', quantiles from a
        gaussian distribution N(0,1) are used. If 'empirical', empirical
        quantiles are used.

    window_size : int (default = 4)
        Size of the window (i.e. the size of each word).

    numerosity_reduction : bool (default = True)
        If True, keep only one occurrence of each run of back-to-back
        identical words.

    use_idf : bool (default = True)
        Enable inverse-document-frequency reweighting.

    smooth_idf : bool (default = True)
        Smooth idf weights by adding one to document frequencies, as if
        an extra document was seen containing every term in the
        collection exactly once. Prevents zero divisions.

    sublinear_tf : bool (default = False)
        Apply sublinear tf scaling, i.e. replace tf with 1 + log(tf).

    Attributes
    ----------
    vocabulary_ : dict
        A mapping of feature indices to terms.

    tfidf_ : sparse matrix, shape = [n_classes, n_words]
        Term-document matrix.

    idf_ : array, shape = [n_features], or None
        The learned idf vector (global term weights) when
        ``use_idf=True``, None otherwise.

    stop_words_ : set
        Terms that were ignored by the tf-idf vectorizer. This is only
        available if no vocabulary was given.

    """

    def __init__(self, n_bins=4, quantiles='empirical', window_size=4,
                 numerosity_reduction=True, use_idf=True, smooth_idf=True,
                 sublinear_tf=False):
        self.n_bins = n_bins
        self.quantiles = quantiles
        self.window_size = window_size
        self.numerosity_reduction = numerosity_reduction
        self.use_idf = use_idf
        self.smooth_idf = smooth_idf
        self.sublinear_tf = sublinear_tf
    def fit(self, X, y):
        """Fit the model according to the given training data.

        Parameters
        ----------
        X : array-like, shape = [n_samples, n_features]
            Training vector, where n_samples is the number of samples and
            n_features is the number of features.

        y : array-like, shape = [n_samples]
            Class labels for each data sample.

        Returns
        -------
        self : object
            Returns self.

        """
        # Check parameters
        if not isinstance(self.n_bins, int):
            raise TypeError("'n_bins' must be an integer.")
        if self.n_bins < 2:
            raise ValueError("'n_bins' must be greater than or equal to 2.")
        if self.n_bins > 26:
            raise ValueError("'n_bins' must be lower than or equal to 26.")
        if self.quantiles not in ['gaussian', 'empirical']:
            raise ValueError("'quantiles' must be either 'gaussian' or "
                             "'empirical'.")
        if not isinstance(self.use_idf, (int, float)):
            raise TypeError("'use_idf' must be a boolean.")
        if not isinstance(self.smooth_idf, (int, float)):
            raise TypeError("'smooth_idf' must be a boolean.")
        if not isinstance(self.sublinear_tf, (int, float)):
            raise TypeError("'sublinear_tf' must be a boolean.")

        X, y = check_X_y(X, y)
        check_classification_targets(y)
        le = LabelEncoder()
        y_ind = le.fit_transform(y)
        self._classes = le.classes_
        n_classes = self._classes.size

        # SAX and BOW transformations
        sax = SAX(self.n_bins, self.quantiles)
        X_sax = sax.fit_transform(X)
        bow = BOW(self.window_size, self.numerosity_reduction)
        X_bow = bow.fit_transform(X_sax)

        # Concatenate the words of all samples from the same class into a
        # single document per class, then compute its tf-idf vector.
        X_class = [' '.join(X_bow[y_ind == classe])
                   for classe in range(n_classes)]

        tfidf = TfidfVectorizer(norm=None, use_idf=self.use_idf,
                                smooth_idf=self.smooth_idf,
                                sublinear_tf=self.sublinear_tf)
        if self.window_size == 1:
            tfidf.set_params(tokenizer=self._tok)
        self.tfidf_ = tfidf.fit_transform(X_class)
        self.vocabulary_ = {value: key
                            for key, value in tfidf.vocabulary_.items()}
        self.stop_words_ = tfidf.stop_words_
        if self.use_idf:
            self.idf_ = tfidf.idf_
        else:
            self.idf_ = None
        self._tfidf = tfidf
        self._sax = sax
        self._bow = bow
        return self
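    # Note on the per-class documents built in `fit` above: SAX-VSM pools
    # the words of all training series that share a label into one text
    # document per class. A toy illustration (the words are made up):
    #
    # >>> import numpy as np
    # >>> X_bow = np.array(['ab bc', 'bc cd', 'ab ab'])
    # >>> y_ind = np.array([0, 1, 0])
    # >>> [' '.join(X_bow[y_ind == c]) for c in range(2)]
    # ['ab bc ab ab', 'bc cd']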
    def predict(self, X):
        """Predict the class labels for the provided data.

        Parameters
        ----------
        X : array-like, shape = [n_samples, n_features]

        Returns
        -------
        y : array-like, shape = [n_samples]
            Class labels for each data sample.

        """
        check_is_fitted(self, ['vocabulary_', 'tfidf_', 'idf_',
                               'stop_words_', '_tfidf'])

        # SAX and BOW transformations
        X_sax = self._sax.transform(X)
        X_bow = self._bow.transform(X_sax)

        # `transform` applies the learned idf weights; divide them out so
        # that test samples are represented by raw term frequencies only.
        X_transformed = self._tfidf.transform(X_bow)
        if self.use_idf:
            X_transformed /= self._tfidf.idf_

        y_pred = cosine_similarity(X_transformed, self.tfidf_).argmax(axis=1)
        return self._classes[y_pred]
    def _tok(self, x):
        # Tokenizer used when words consist of a single character: the
        # default tokenizer of TfidfVectorizer drops one-character tokens.
        return x.split(' ')
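# Usage sketch for SAXVSMClassifier (illustrative only; the data is random
# and the parameter values are assumptions):
#
# >>> import numpy as np
# >>> rng = np.random.RandomState(42)
# >>> X_train = rng.randn(20, 48)
# >>> y_train = rng.randint(2, size=20)
# >>> clf = SAXVSMClassifier(n_bins=4, window_size=4)
# >>> clf.fit(X_train, y_train).predict(X_train).shape
# (20,)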
class BOSSVSClassifier(BaseEstimator, ClassifierMixin):
    """Bag-of-SFA Symbols in Vector Space.

    Parameters
    ----------
    n_coefs : None or int (default = None)
        The number of Fourier coefficients to keep. If ``n_coefs=None``,
        all Fourier coefficients are returned. If ``n_coefs`` is an
        integer, the ``n_coefs`` most significant Fourier coefficients
        are returned if ``anova=True``, otherwise the first ``n_coefs``
        Fourier coefficients are returned. An even number is required
        (for real and imaginary values) if ``anova=False``.

    window_size : int
        Window length used to extract subsequences from each time series.

    norm_mean : bool (default = True)
        If True, center the data before scaling. If ``norm_mean=True``
        and ``anova=False``, the first Fourier coefficient will be
        dropped.

    norm_std : bool (default = True)
        If True, scale the data to unit variance.

    n_bins : int (default = 4)
        The number of bins. Ignored if ``quantiles='entropy'``.

    quantiles : {'gaussian', 'empirical'} (default = 'empirical')
        The way to compute quantiles. If 'gaussian', quantiles from a
        gaussian distribution N(0,1) are used. If 'empirical', empirical
        quantiles are used.

    variance_selection : bool (default = False)
        If True, the Fourier coefficients with low variance are removed.

    variance_threshold : float (default = 0.)
        Fourier coefficients with a training-set variance lower than this
        threshold will be removed. Ignored if
        ``variance_selection=False``.

    numerosity_reduction : bool (default = True)
        Whether or not numerosity reduction is applied. When the same
        word occurs several times in a row, only one instance of this
        word is kept if ``numerosity_reduction=True``, otherwise all
        instances are kept.

    smooth_idf : bool (default = True)
        Smooth idf weights by adding one to document frequencies, as if
        an extra document was seen containing every term in the
        collection exactly once. Prevents zero divisions.

    sublinear_tf : bool (default = True)
        Apply sublinear tf scaling, i.e. replace tf with 1 + log(tf).

    Attributes
    ----------
    vocabulary_ : dict
        A mapping of feature indices to terms.

    """

    def __init__(self, n_coefs, window_size, norm_mean=True, norm_std=True,
                 n_bins=4, quantiles='empirical', variance_selection=False,
                 variance_threshold=0., numerosity_reduction=True,
                 smooth_idf=True, sublinear_tf=True):
        self.n_coefs = n_coefs
        self.window_size = window_size
        self.norm_mean = norm_mean
        self.norm_std = norm_std
        self.n_bins = n_bins
        self.quantiles = quantiles
        self.variance_selection = variance_selection
        self.variance_threshold = variance_threshold
        self.numerosity_reduction = numerosity_reduction
        self.smooth_idf = smooth_idf
        self.sublinear_tf = sublinear_tf
    def fit(self, X, y, overlapping=True):
        """Fit the model according to the given training data.

        Parameters
        ----------
        X : array-like, shape = [n_samples, n_features]
            Training vector, where n_samples is the number of samples and
            n_features is the number of features.

        y : array-like, shape = [n_samples]
            Class labels for each data sample.

        overlapping : bool (default = True)
            If True, overlapping windows are used for the training phase.

        Returns
        -------
        self : object

        """
        # Check input data
        X, y = check_X_y(X, y)
        check_classification_targets(y)
        le = LabelEncoder()
        y_ind = le.fit_transform(y)
        self._classes = le.classes_
        n_classes = self._classes.size
        n_samples, n_features = X.shape

        # Check parameters
        if (not isinstance(self.n_coefs, int)) and (self.n_coefs is not None):
            raise TypeError("'n_coefs' must be None or an integer.")
        if isinstance(self.n_coefs, int) and self.n_coefs < 2:
            raise ValueError("'n_coefs' must be greater than or equal to 2.")
        if isinstance(self.n_coefs, int) and self.n_coefs % 2 != 0:
            raise ValueError("'n_coefs' must be an even integer.")
        if not isinstance(self.window_size, int):
            raise TypeError("'window_size' must be an integer.")
        if self.window_size > n_features:
            raise ValueError("'window_size' must be lower than or equal to "
                             "the size of each time series.")
        if isinstance(self.n_coefs, int) and self.n_coefs > self.window_size:
            raise ValueError("'n_coefs' must be lower than or equal to "
                             "'window_size'.")
        if not isinstance(self.norm_mean, (int, float)):
            raise TypeError("'norm_mean' must be a boolean.")
        if not isinstance(self.norm_std, (int, float)):
            raise TypeError("'norm_std' must be a boolean.")
        if not isinstance(self.n_bins, int):
            raise TypeError("'n_bins' must be an integer.")
        if self.n_bins < 2:
            raise ValueError("'n_bins' must be greater than or equal to 2.")
        if self.quantiles not in ['empirical', 'gaussian']:
            raise ValueError("'quantiles' must be either 'gaussian' or "
                             "'empirical'.")
        if not isinstance(self.variance_selection, (int, float)):
            raise TypeError("'variance_selection' must be a boolean.")
        if not isinstance(self.variance_threshold, (int, float)):
            raise TypeError("'variance_threshold' must be a float.")
        if not isinstance(self.numerosity_reduction, (int, float)):
            raise TypeError("'numerosity_reduction' must be a boolean.")
        if not isinstance(self.smooth_idf, (int, float)):
            raise TypeError("'smooth_idf' must be a boolean.")
        if not isinstance(self.sublinear_tf, (int, float)):
            raise TypeError("'sublinear_tf' must be a boolean.")
        if not isinstance(overlapping, (int, float)):
            raise TypeError("'overlapping' must be a boolean.")

        self.vocabulary_ = {}

        # Extract sliding (overlapping) or back-to-back (non-overlapping)
        # windows from each time series.
        if overlapping:
            n_windows = n_features - self.window_size + 1
            X_window = np.asarray([X[:, i: i + self.window_size]
                                   for i in range(n_windows)])
            X_window = X_window.reshape(n_samples * n_windows, -1, order='F')
        else:
            n_windows = n_features // self.window_size
            remainder = n_features % self.window_size
            if remainder == 0:
                window_idx = np.array_split(np.arange(0, n_features),
                                            n_windows)
            else:
                split_idx = np.arange(self.window_size,
                                      n_windows * (self.window_size + 1),
                                      self.window_size)
                window_idx = np.split(np.arange(0, n_features),
                                      split_idx)[:-1]
            X_window = X[:, window_idx].reshape(n_samples * n_windows, -1)

        # SFA transformation: each window becomes one word.
        sfa = SFA(self.n_coefs, False, self.norm_mean, self.norm_std,
                  self.n_bins, self.quantiles, self.variance_selection,
                  self.variance_threshold)
        tfidf = TfidfVectorizer(ngram_range=(1, 1),
                                smooth_idf=self.smooth_idf,
                                sublinear_tf=self.sublinear_tf)

        X_sfa = sfa.fit_transform(X_window)
        X_sfa = np.apply_along_axis(lambda x: ''.join(x),
                                    1, X_sfa).reshape(n_samples, -1)
        word_size = len(X_sfa[0, 0])
        if word_size == 1:
            tfidf.set_params(tokenizer=self._tok)
        if self.numerosity_reduction:
            X_sfa = np.apply_along_axis(numerosity_reduction, 1, X_sfa)
        else:
            X_sfa = np.apply_along_axis(lambda x: ' '.join(x), 1, X_sfa)

        # One tf-idf document per class, as in SAX-VSM.
        X_class = np.array([' '.join(X_sfa[y_ind == i])
                            for i in range(n_classes)])
        X_tfidf = tfidf.fit_transform(X_class)
        for key, value in tfidf.vocabulary_.items():
            self.vocabulary_[value] = key
        self._sfa = sfa
        self._tfidf = tfidf
        self.tfidf_ = X_tfidf
        return self
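    # Note on the non-overlapping branch in `fit` above: the series is cut
    # into ``n_features // window_size`` back-to-back windows, and any
    # trailing remainder is dropped. A toy illustration with n_features=10
    # and window_size=4 (the values are made up):
    #
    # >>> import numpy as np
    # >>> n_features, window_size = 10, 4
    # >>> n_windows = n_features // window_size            # 2 windows
    # >>> split_idx = np.arange(window_size,
    # ...                       n_windows * (window_size + 1), window_size)
    # >>> np.split(np.arange(n_features), split_idx)[:-1]
    # [array([0, 1, 2, 3]), array([4, 5, 6, 7])]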
    def predict(self, X):
        """Predict the class labels for the provided data.

        Parameters
        ----------
        X : array-like, shape = [n_samples, n_features]

        Returns
        -------
        y : array-like, shape = [n_samples]
            Class labels for each data sample.

        """
        # Check fitted
        check_is_fitted(self, ['tfidf_', '_sfa', '_tfidf', 'vocabulary_'])

        # Check X
        X = check_array(X)
        n_samples, n_features = X.shape

        # Overlapping windows are always used at prediction time.
        n_windows = n_features - self.window_size + 1
        X_window = np.asarray([X[:, i: i + self.window_size]
                               for i in range(n_windows)])
        X_window = X_window.reshape(n_samples * n_windows, -1, order='F')

        X_sfa = self._sfa.transform(X_window)
        X_sfa = np.apply_along_axis(lambda x: ''.join(x),
                                    1, X_sfa).reshape(n_samples, -1)
        if self.numerosity_reduction:
            X_sfa = np.apply_along_axis(numerosity_reduction, 1, X_sfa)
        else:
            X_sfa = np.apply_along_axis(lambda x: ' '.join(x), 1, X_sfa)

        # Divide out the idf weights so that test samples are represented
        # by raw term frequencies, then rank classes by cosine similarity.
        tf = self._tfidf.transform(X_sfa) / self._tfidf.idf_
        y_pred = cosine_similarity(tf, self.tfidf_).argmax(axis=1)
        return self._classes[y_pred]
    def _tok(self, x):
        return x.split(' ')
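# Usage sketch for BOSSVSClassifier (illustrative only; the data is random
# and the parameter values are assumptions):
#
# >>> import numpy as np
# >>> rng = np.random.RandomState(42)
# >>> X_train = rng.randn(20, 64)
# >>> y_train = rng.randint(2, size=20)
# >>> clf = BOSSVSClassifier(n_coefs=4, window_size=16)
# >>> clf.fit(X_train, y_train).predict(X_train).shape
# (20,)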