"""The :mod:`pyts.decomposition` module includes decomposition algorithms.
Implemented algorithms are:
- Singular Spectrum Analysis
"""
from __future__ import division
from __future__ import unicode_literals
from __future__ import print_function
from __future__ import absolute_import
from builtins import range
from future import standard_library
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.utils.validation import check_array
standard_library.install_aliases()
class SSA(BaseEstimator, TransformerMixin):
"""Singular Spectrum Analysis.
Parameters
----------
window_size : int
The size of the sliding window.
grouping : None, int or array-like (default = None)
The way the elementary matrices are grouped. If None,
no grouping is performed. If an integer, the number of
groups is equal to this integer. If array-like, each element
must be a array-like containing the indices for each group.
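
    Examples
    --------
    A minimal usage sketch (assuming ``SSA`` is importable from
    :mod:`pyts.decomposition`; the random data below is illustrative only):

    >>> import numpy as np
    >>> from pyts.decomposition import SSA
    >>> X = np.random.randn(3, 50)
    >>> ssa = SSA(window_size=10)
    >>> X_new = ssa.transform(X)
    >>> X_new.shape
    (3, 10, 50)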
"""
def __init__(self, window_size, grouping=None):
self.window_size = window_size
self.grouping = grouping
    def fit(self, X=None, y=None):
        """Do nothing and return the estimator unchanged.

        This method is here to ensure compatibility with the
        scikit-learn API.

        Parameters
        ----------
        X
            Ignored
        y
            Ignored
        """
return self
    def transform(self, X):
        """Transform the provided data.

        Parameters
        ----------
        X : array-like, shape = [n_samples, n_features]
            Data to transform.

        Returns
        -------
        X_new : array-like, shape = [n_samples, n_splits, n_features]
            Transformed data. The value of ``n_splits`` depends on the
            value of ``grouping``. If ``grouping=None``, ``n_splits`` is
            equal to ``window_size``. If ``grouping`` is an integer,
            ``n_splits`` is equal to ``grouping``. If ``grouping`` is
            array-like, ``n_splits`` is equal to the length of ``grouping``.
"""
# Check input data
X = check_array(X)
# Shape parameters
n_samples, n_features = X.shape
# Check parameters
if not isinstance(self.window_size, int):
raise TypeError("'window_size' must be an integer.")
        if self.window_size < 1:
            raise ValueError("'window_size' must be greater than or equal "
                             "to 1.")
        if self.window_size > n_features:
            raise ValueError("'window_size' must be lower than or equal to "
                             "the size of each time series.")
if not (self.grouping is None or
isinstance(self.grouping, (int, list, tuple, np.ndarray))):
raise TypeError("'grouping' must be either None, an integer "
"or array-like.")
if isinstance(self.grouping, int) and self.grouping > self.window_size:
raise ValueError("If 'grouping' is an integer, it must be "
"lower than or equal to 'window_size'.")
if isinstance(self.grouping, (list, tuple, np.ndarray)):
idx = np.concatenate(self.grouping)
diff = np.setdiff1d(idx, np.arange(self.window_size))
if diff.size > 0:
raise ValueError("If 'grouping' is array-like, all values in "
"'grouping' must be lower than 'window_size'."
" {0} is not lower than "
"{1}.".format(diff[0], self.window_size))
return np.apply_along_axis(self._ssa, 1, X, n_features,
self.window_size, self.grouping)
def _ssa(self, ts, ts_size, window_size, grouping):
n_lags = ts_size - window_size + 1
# First step: create a matrix of lagged vectors
X = np.array([ts[i: i + window_size] for i in range(n_lags)]).T
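        # Illustration (added for clarity): with ts = [1, 2, 3, 4, 5] and
        # window_size = 3, n_lags = 3 and the trajectory matrix is
        #     X = [[1, 2, 3],
        #          [2, 3, 4],
        #          [3, 4, 5]]
        # Each column is the lagged window ts[j:j + window_size], so every
        # anti-diagonal of X is constant (X is a Hankel matrix).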
        # Second step: compute the eigenvalues and eigenvectors of the
        # matrix multiplied by its transpose, then compute the
        # elementary matrices
w, v = np.linalg.eigh(X.dot(X.T))
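        # eigh returns eigenvalues in increasing order; reverse both so
        # that components are sorted by decreasing eigenvalue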
w, v = w[::-1], v[:, ::-1]
elementary_matrices = np.empty((window_size, window_size, n_lags))
for i in range(window_size):
elementary_matrices[i] = np.outer(v[:, i], v[:, i]).dot(X)
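        # Note (added for clarity): the eigenvectors of the symmetric
        # matrix X.dot(X.T) form an orthonormal basis, so the elementary
        # matrices sum back to the trajectory matrix:
        # np.allclose(elementary_matrices.sum(axis=0), X) holds.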
# Third step: group elementary matrices
if grouping is None:
grouping_size = window_size
grouped_matrices = elementary_matrices
        else:
            if isinstance(grouping, int):
                grouping = np.array_split(np.arange(window_size), grouping)
            grouping_size = len(grouping)
            grouped_matrices = np.zeros((grouping_size, window_size, n_lags))
            for i, group in enumerate(grouping):
                grouped_matrices[i] = elementary_matrices[group].sum(axis=0)
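        # Illustration (added for clarity): with window_size = 5 and
        # grouping = 2, np.array_split(np.arange(5), 2) yields
        # [array([0, 1, 2]), array([3, 4])], so the first grouped matrix
        # sums the three leading elementary matrices and the second sums
        # the remaining two.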
        # Fourth step: decompose the grouped matrices back into time
        # series via diagonal averaging. Transpose the matrices if needed
        # so that each one has its smaller dimension first; 'gap' is the
        # larger of the two dimensions.
        if window_size >= n_lags:
            grouped_matrices = np.transpose(grouped_matrices, axes=[0, 2, 1])
            gap = window_size
        else:
            gap = n_lags
y = np.zeros((grouping_size, ts_size))
first_row = [(0, col) for col in range(n_lags)]
last_col = [(row, n_lags - 1) for row in range(1, window_size)]
indices = first_row + last_col
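        # Note (added for clarity): each (i, j) pair only matters through
        # the sum i + j, which enumerates each of the ts_size
        # anti-diagonals exactly once; the pairs never index into the
        # matrices themselves.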
for group in range(grouping_size):
for i, j in indices:
y[group, i + j] = np.diag(np.fliplr(grouped_matrices[group]),
gap - i - j - 1).mean()
return y
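

if __name__ == '__main__':
    # Minimal self-check sketch (an illustrative addition, not part of
    # the original module): with grouping=None, the elementary matrices
    # sum to the trajectory matrix and diagonal averaging is linear, so
    # the extracted components must add back up to the original series.
    rng = np.random.RandomState(42)
    x = np.cumsum(rng.randn(1, 100), axis=1)
    x_ssa = SSA(window_size=20).transform(x)
    assert x_ssa.shape == (1, 20, 100)
    assert np.allclose(x_ssa.sum(axis=1), x)
    print('SSA components sum back to the original series.')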