Source code for mlpy.stats.dbn.hmm

"""
Hidden Markov Models
====================

.. autosummary::
   :toctree: generated/
   :nosignatures:

   HMM
   DiscreteHMM
   GaussianHMM
   StudentHMM
   GMMHMM

"""
from __future__ import division, print_function, absolute_import
# noinspection PyUnresolvedReferences
from six.moves import range

from abc import ABCMeta, abstractmethod

import numpy as np
# noinspection PyPackageRequirements
from sklearn.mixture import GMM

from ...auxiliary.array import accum, normalize
from ...optimize.algorithms import EM
from ...libs import hmmc
from ..models.mixture import StudentMM
from ..models import markov
from .. import normalize_logspace, nonuniform
from .. import conditional_normal, conditional_student, conditional_mix_normal
from .. import multivariate_normal, multivariate_student


[docs]class HMM(EM): """Hidden Markov Model base class. Representation of a hidden Markov model probability distribution. This class allows for easy evaluation of, sampling from, and maximum-likelihood estimation of the parameters of a HMM. See the instance documentation for details specific to a particular object. Parameters ---------- ncomponents : int Number of states in the model. startprob_prior : array, shape (`ncomponents`,) Initial state occupation prior distribution. startprob : array, shape (`ncomponents`,) Initial state occupation distribution. transmat_prior : array, shape (`ncomponents`, `ncomponents`) Matrix of prior transition probabilities between states. transmat : array, shape (`ncomponents`, `ncomponents`) Matrix of transition probabilities between states. emission : cond_rv_frozen The conditional probability distribution used for the emission. emission_prior : normal_invwishart Initial emission parameters, a normal-inverse Wishart distribution. n_iter : int Number of iterations to perform during training, optional. thresh : float Convergence threshold, optional. verbose : bool Controls if debug information is printed to the console, optional. Attributes ---------- ncomponents : int The number of hidden states. nfeatures : int Dimensionality of the Gaussian emission. startprob_prior : array, shape (`ncomponents`,) Initial state occupation prior distribution. startprob : array, shape (`ncomponents`,) Initial state occupation distribution. transmat_prior : array, shape (`ncomponents`, `ncomponents`) Matrix of prior transition probabilities between states. transmat : array, shape (`ncomponents`, `ncomponents`) Matrix of transition probabilities between states. emission_prior : normal_invwishart Initial emission parameters, a normal-inverse Wishart distribution. emission : cond_rv_frozen The conditional probability distribution used for the emission. Examples -------- >>> from mlpy.stats.dbn.hmm import GaussianHMM >>> model = GaussianHMM(ncomponents=2, startprob_prior=[3, 2]) Create a gaussian hidden Markov model >>> import scipy.io >>> mat = scipy.io.loadmat('data/speechDataDigits4And5.mat')) >>> x = np.hstack([mat['train4'][0], mat['train5'][0]]) Load data used for fitting the HMM and fit the HMM: >>> model.fit(x, n_init=3) .. note:: Adapted from Matlab: | Project: `Probabilistic Modeling Toolkit for Matlab/Octave <https://github.com/probml/pmtk3>`_. | Copyright (2010) Kevin Murphy and Matt Dunham | License: `MIT <https://github.com/probml/pmtk3/blob/5fefd068a2e84ae508684d3e4750bd72a4164ba0/license.txt>`_ """ __metaclass__ = ABCMeta @property def startprob_prior(self): """Vector of initial probabilities for each state. Returns ------- startprob_prior : array, shape (`ncomponents`,) The initial probabilities. """ return self._startprob_prior @startprob_prior.setter def startprob_prior(self, pi_prior): self._startprob_prior = np.ones(self.ncomponents, dtype=float) if pi_prior is None else np.asarray(pi_prior, dtype=np.float64) # check if there exists a component whose value is exactly zero # if so, add a small number and re-normalize if not np.alltrue(self._startprob_prior): normalize(self._startprob_prior) if len(self._startprob_prior) != self.ncomponents: raise ValueError('pi_prior must have length ncomponents') @property def transmat_prior(self): """Transition probability matrix. Returns ------- transmat_prior : array, shape (`ncomponents`, `ncomponents`) Matrix of transition probabilities from each state to every other state. """ return self._transmat_prior @transmat_prior.setter def transmat_prior(self, trans_prior): self._transmat_prior = np.ones((self.ncomponents, self.ncomponents), float) if trans_prior is None else np.asarray(trans_prior, dtype=np.float64) # check if there exists a component whose value is exactly zero # if so, add a small number and re-normalize if not np.alltrue(self._transmat_prior): normalize(self._transmat_prior) if np.asarray(self._transmat_prior).shape != (self.ncomponents, self.ncomponents): self._transmat_prior = np.tile(self._transmat_prior, (self.ncomponents, 1)) def __init__(self, ncomponents=1, startprob_prior=None, startprob=None, transmat_prior=None, transmat=None, emission_prior=None, emission=None, n_iter=None, thresh=None, verbose=None): super(HMM, self).__init__(n_iter, thresh, verbose) self.ncomponents = ncomponents self.nfeatures = 1 self._startprob_prior = None self._transmat_prior = None self._fit_X = None """:type:ndarray[ndarray[float]]""" self.startprob_prior = startprob_prior self.transmat_prior = transmat_prior self.startprob = startprob self.transmat = transmat self.emission_prior = emission_prior self.emission = emission
[docs] def score_samples(self, obs): """Compute the log probability of the evidence. Compute the log probability of the evidence (likelihood) under the model and the posteriors. Parameters ---------- obs : array_like, shape (`n`, `len`, `nfeatures`) Sequence of `nfeatures`-dimensional data points. Each row corresponds to a single point in the sequence. Returns ------- logp : float Log likelihood of the sequence `obs`. posteriors : array_like, shape (`n`, `ncomponents`) Posterior probabilities of each state for each observation """ n = obs.shape[0] logp = np.zeros(n) posterior = np.zeros((n, obs.shape[1], self.ncomponents), dtype=np.float64) for i, x in enumerate(obs): log_b = self.emission.logpdf(x) log_b, scale = normalize_logspace(log_b.T) b = np.exp(log_b) logp[i], alpha = self._forward(self.startprob, self.transmat, b.T) _, gamma = self._backward(self.transmat, b.T, alpha) logp[i] += np.sum(scale) posterior[i] = gamma.T return logp, posterior
[docs] def score(self, obs): """ Compute log probability of the evidence (likelihood) under the model. Parameters ---------- obs : array_like, shape (`n`, `len`, `nfeatures`) Sequence of `nfeatures`-dimensional data points. Each row corresponds to a single point in the sequence. Returns ------- logp : float Log likelihood of the sequence `obs`. """ n = obs.shape[0] logp = np.zeros(n) for i, x in enumerate(obs): log_b = self.emission.logpdf(x) log_b, scale = normalize_logspace(log_b.T) b = np.exp(log_b) logp[i], alpha = self._forward(self.startprob, self.transmat, b.T) logp[i] += np.sum(scale) return logp
[docs] def predict_proba(self, obs): """Compute the posterior probability for each state in the model. Parameters ---------- obs : array_like, shape (`n`, `len`, `nfeatures`) Sequence of `nfeatures`-dimensional data points. Each row corresponds to a single point in the sequence. Returns ------- posteriors : array_like, shape (`n`, `ncomponents`) Posterior probabilities of each state for each observation """ _, posteriors = self.score_samples(obs) return posteriors
[docs] def sample(self, length, size=1): """Generates random samples from the model. Parameters ---------- length : int or ndarray[int] Length of a sample size : int, optional Number of samples to generate. Default is 1. Returns ------- obs : array_like, shape (`n`, `ni`, `nfeatures`) List of samples, where `n` is the number of samples, `ni` is the length of the i-th sample, and each observation has `nfeatures`. hidden_states : array_like, shape (`n`, `ni`) List of hidden states, where `n` is the number of samples, `ni` is the i-th hidden state. """ # noinspection PyTypeChecker length = np.tile(length, size) if not hasattr(length, "__len__") else length assert (length.size == size) hidden_states = np.empty((size, np.max(length)), dtype=np.int32) obs = np.empty((size, np.max(length), self.nfeatures), dtype=np.float64) * np.nan for i in range(size): hidden_states[i] = markov.sample(self.startprob, self.transmat, size=length[i]) for t in range(length[i]): obs[i][t] = self._generate_sample_from_state(hidden_states[i][t]) if size == 1: hidden_states = hidden_states[0] obs = obs[0] return obs, hidden_states
[docs] def decode(self, obs, algorithm="viterbi"): """Find the most likely state sequence. Find the most likely state sequence corresponding to the observation `obs`. Uses the given algorithm for decoding. Parameters ---------- obs : array_like, shape (`nfeatures`, `T`) The local evidence vector. algorithm : {'viterbi', 'map'} Decoder algorithm to be used. Returns ------- best_path : array_like, shape (`n`,) The most likely states for each observation loglik : float Log probability of the maximum likelihood path through the HMM """ algorithm = algorithm if algorithm in frozenset(("viterbi", "map")) else "viterbi" return { "viterbi": self._decode_viterbi, "map": self._decode_map }[algorithm](obs)
[docs] def fit(self, obs, n_init=1): """Estimate model parameters. Parameters ---------- obs : array_like, shape (`n`, `ni`, `nfeatures`) List of observation sequences, where `n` is the number of sequences, `ni` is the length of the i_th observation, and each observation has `nfeatures` features. Returns ------- float : log likelihood of the sequence `obs` """ self.nfeatures = obs[0].shape[0] self._fit_X = obs return self._em(obs, n_init=n_init)
@abstractmethod def _initialize(self, obs, init_count): """Perform initialization step before entering the EM algorithm. Parameters ---------- obs : array_like, shape (`n`, `ni`, `nfeatures`) List of observation sequences, where `n` is the number of sequences, `ni` is the length of the i_th observation, and each observation has `nfeatures` features. init_count : int Restart counter Raises ------ NotImplementedError If the child class does not implement this function. """ raise NotImplementedError @abstractmethod def _generate_sample_from_state(self, state): """Generate a sample from the given current state. Parameters ---------- state : int Current state. Returns ------- sample: int An observation sampled for the given state Raises ------ NotImplementedError If the child class does not implement this function. """ raise NotImplementedError def _decode_viterbi(self, obs, scale=True): """Find most likely (Viterbi) state sequence corresponding to `obs`. Parameters ---------- obs : array_like, shape (`nfeatures`, `T`) The local evidence vector. Returns ------- best_path: array_like, shape (`n`,) The most likely states for each observation loglik: float Log probability of the maximum likelihood path through the HMM """ log_b = self.emission.logpdf(obs.T) obslik = np.exp(log_b) seq_len = obslik.shape[1] delta = np.zeros((self.ncomponents, seq_len), dtype=np.float64) psi = np.zeros((self.ncomponents, seq_len), dtype=np.int32) best_path = np.zeros(seq_len, dtype=np.int32) scales = np.ones(seq_len, dtype=np.float64) delta[:, 0] = self.startprob * obslik[:, 0] if scale: delta[:, 0], n = normalize(delta[:, 0], return_scale=True) scales[0] = 1.0 / n psi[:, 0] = 0 # arbitrary, since there is no predecessor to t=0 for t in range(1, seq_len): for j in range(self.ncomponents): m = delta[:, t - 1] * self.transmat[:, j] delta[j, t] = np.max(m) psi[j, t] = np.argmax(m) delta[j, t] = delta[j, t] * obslik[j, t] if scale: delta[:, t], n = normalize(delta[:, t], return_scale=True) scales[t] = 1.0 / n best_path[seq_len - 1] = np.argmax(delta[:, seq_len - 1]) for t in range(seq_len - 2, -1, -1): best_path[t] = psi[best_path[t + 1], t + 1] if scale: loglik = -np.sum(np.log(scales)) else: p = np.max(delta[:, seq_len - 1]) loglik = np.log(p) return best_path, loglik def _decode_map(self, obs): """Find most likely (MAP) state sequence corresponding to `obs`. Uses the maximum a posteriori estimation. Parameters ---------- obs : array_like, shape (`nfeatures`, `T`) The local evidence vector. Returns ------- best_path: array_like, shape (`n`,) The most likely states for each observation loglik: float Log probability of the maximum likelihood path through the HMM """ _, posteriors = self.score_samples(np.array([obs.T])) best_path = np.argmax(posteriors[0], axis=1) loglik = np.max(posteriors[0], axis=1).sum() return best_path, loglik def _estep(self, obs): """Perform expectation step of the EM algorithm. Parameters ---------- obs : array_like, shape (`n`, `ni`, `nfeatures`) List of observation sequences, where `n` is the number of sequences, `ni` is the length of the i_th observation, and each observation has `nfeatures` features. Returns ------- loglik : float Log likelihood of the observation `obs` """ stacked_obs = self._stack_obs(obs) seq_idx = np.cumsum([0] + [x.shape[1] for x in obs]) nstacked = stacked_obs.shape[0] start_counts = np.zeros(self.ncomponents) trans_counts = np.zeros((self.ncomponents, self.ncomponents)) weights = np.zeros((nstacked, self.ncomponents)) loglik = 0 nobs = obs.shape[0] log_b = self.emission.logpdf(stacked_obs) log_b, scale = normalize_logspace(log_b.T) b = np.exp(log_b) for i in range(nobs): ndx = np.arange(seq_idx[i], seq_idx[i + 1]) bi = b[ndx] logp, alpha = hmmc.forward(self.startprob, self.transmat, bi.T) # logp, alpha = self._forward(self.startprob, self.transmat, bi.T) beta, gamma = hmmc.backward(self.transmat, bi.T, alpha) # beta, gamma = self._backward(self.transmat, bi.T, alpha) loglik += logp xi_summed = hmmc.computeTwoSliceSum(alpha, beta, self.transmat, bi.T) # xi_summed = self._compute_two_slice_sum(alpha, beta, self.transmat, bi.T) start_counts += gamma[:, 0] trans_counts += xi_summed weights[ndx] += gamma.T loglik += np.sum(scale) log_prior = np.dot(np.log(np.ravel(self.transmat) + np.spacing(1)), np.ravel(self.transmat_prior)) + np.dot(np.log(self.startprob + np.spacing(1)), self.startprob_prior) loglik += log_prior # emission component self.startprob = normalize(start_counts + self.startprob_prior) self.transmat = normalize(trans_counts + self.transmat_prior, 1) self.emission.expected_sufficient_statistics(stacked_obs, weights) loglik += self.emission.logprior() return loglik def _mstep(self): """Perform maximization step of the EM algorithm.""" self.emission.fit() def _stack_obs(self, obs): """Stack observations. Stack observations to a sequence of (`n`*`ni`)-by-`nfeatures`-dimensional data points. Each row corresponds to a single data point in a sequence. Parameters ---------- obs : array_like, shape (`n`, `ni`, `nfeatures`) List of observation sequences, where `n` is the number of sequences, `ni` is the length of the i_th observation, and each observation has `nfeatures` features. Returns ------- stacked_obs : array_like, shape (`n`*`ni`, `nfeatures`) List of observation sequences """ stacked_obs = np.empty((0, self.nfeatures), dtype=np.float64) for i in range(obs.shape[0]): # d = obs[i].T if obs[i].shape[0] == self.nfeatures else obs[i] stacked_obs = np.vstack([stacked_obs, obs[i].T]) return stacked_obs def _rand_init(self): """Randomly initialize the prior and the transition probabilities. """ # noinspection PyArgumentList self.startprob = normalize(np.random.rand(self.ncomponents) + self.startprob_prior - 1) self.transmat = normalize( np.random.rand(self.ncomponents, self.ncomponents) + self.transmat_prior - 1, 1) def _init_with_mix_model(self, obs, pz): """Initialize the prior probabilities and transition probabilities. Initialize the prior probabilities and transition probabilities during the initialization step before the EM algorithm. Parameters ---------- obs : array_like, shape (`n`, `ni`, `nfeatures`) List of observation sequences, where `n` is the number of sequences, `ni` is the length of the i_th observation, and each observation has `nfeatures` features. pz : array_like, shape (`n`*`ni`, `ncomponents`) Posterior probability of the observation """ z = np.argmax(pz, axis=1) if self.transmat is None: self.transmat = accum(np.vstack([z[0:-1], z[1::]]).T, 1, size=(self.ncomponents, self.ncomponents)) # regularize self.transmat = normalize(self.transmat + np.ones(self.transmat.shape), 1) if self.startprob is None: seq_idx = np.cumsum([0] + [x.shape[1] for x in obs]) self.startprob = np.bincount(z[seq_idx[0:-1]], minlength=self.ncomponents) self.startprob = normalize(self.startprob + np.ones(self.startprob.shape)) # regularize # noinspection PyMethodMayBeStatic def _forward(self, pi, transmat, softev): n, m = softev.shape scale = np.zeros(m) at = transmat.T alpha = np.zeros((n, m)) alpha[:, 0], scale[0] = normalize(np.ravel(pi) * softev[:, 0], return_scale=True) for t in range(1, m): alpha[:, t], scale[t] = normalize(np.dot(at, alpha[:, t - 1]) * softev[:, t], return_scale=True) loglik = np.sum(np.log(scale + np.finfo(float).eps)) return loglik, alpha # noinspection PyMethodMayBeStatic def _backward(self, transmat, softev, alpha): n, m = softev.shape beta = np.zeros((n, m)) beta[:, m - 1] = np.ones(n) for t in reversed(range(m)): beta[:, t - 1] = normalize(np.dot(transmat, beta[:, t] * softev[:, t])) gamma = normalize(alpha * beta, 0) return beta, gamma # noinspection PyMethodMayBeStatic def _compute_two_slice_sum(self, alpha, beta, transmat, softev): n, m = softev.shape xi_summed = np.zeros((n, n)) for t in reversed(range(1, m)): b = beta[:, t] * softev[:, t] xit = transmat * np.dot(alpha[:, t - 1].reshape(n, 1), b.reshape(1, n)) xi_summed += np.true_divide(xit, np.sum(np.ravel(xit))) return xi_summed
[docs]class DiscreteHMM(HMM): """Hidden Markov Model with discrete(multinomial) emissions. Representation of a hidden Markov model probability distribution. This class allows for easy evaluation of, sampling from, and maximum-likelihood estimation of the parameters of a HMM. Parameters ---------- ncomponents : int Number of states in the model. startprob_prior : array, shape (`ncomponents`,) Initial state occupation prior distribution. startprob : array, shape (`ncomponents`,) Initial state occupation distribution. transmat_prior : array, shape (`ncomponents`, `ncomponents`) Matrix of prior transition probabilities between states. transmat : array, shape (`ncomponents`, `ncomponents`) Matrix of transition probabilities between states. emission : cond_rv_frozen The conditional probability distribution used for the emission. emission_prior : normal_invwishart Initial emission parameters, a normal-inverse Wishart distribution. n_iter : int Number of iterations to perform during training, optional. thresh : float Convergence threshold, optional. verbose : bool Controls if debug information is printed to the console, optional. Attributes ---------- ncomponents : int The number of hidden states. nfeatures : int Dimensionality of the Gaussian emission. startprob_prior : array, shape (`ncomponents`,) Initial state occupation prior distribution. startprob : array, shape (`ncomponents`,) Initial state occupation distribution. transmat_prior : array, shape (`ncomponents`, `ncomponents`) Matrix of prior transition probabilities between states. transmat : array, shape (`ncomponents`, `ncomponents`) Matrix of transition probabilities between states. emission_prior : normal_invwishart Initial emission parameters, a normal-inverse Wishart distribution. emission : cond_rv_frozen The conditional probability distribution used for the emission. Examples -------- >>> from mlpy.stats.dbn.hmm import DiscreteHMM >>> DiscreteHMM(ncomponents=2) ... .. note:: Adapted from Matlab: | Project: `Probabilistic Modeling Toolkit for Matlab/Octave <https://github.com/probml/pmtk3>`_. | Copyright (2010) Kevin Murphy and Matt Dunham | License: `MIT <https://github.com/probml/pmtk3/blob/5fefd068a2e84ae508684d3e4750bd72a4164ba0/license.txt>`_ """ def __init__(self, ncomponents=1, startprob_prior=None, startprob=None, transmat_prior=None, transmat=None, emission_prior=None, emission=None, n_iter=None, thresh=None, verbose=None): super(DiscreteHMM, self).__init__(ncomponents, startprob_prior, startprob, transmat_prior, transmat, emission_prior, emission, n_iter, thresh, verbose) def _generate_sample_from_state(self, state): """Generate a sample from the given current state. Parameters ---------- state : int Current state. Returns ------- sample: int An observation sampled for the given state """ return nonuniform.rvs(self.emission.T[state]) def _initialize(self, obs, init_count): """Perform initialization step before entering the EM algorithm. Parameters ---------- obs : array_like, shape (`n`, `ni`, `nfeatures`) List of observation sequences, where `n` is the number of sequences, `ni` is the length of the i_th observation, and each observation has `nfeatures` features. init_count : int Restart counter Raises ------ NotImplementedError This function is not implemented yet. """ raise NotImplementedError
[docs]class GaussianHMM(HMM): """ Hidden Markov Model with Gaussian emissions. Representation of a hidden Markov model probability distribution. This class allows for easy evaluation of, sampling from, and maximum-likelihood estimation of the parameters of a HMM. Parameters ---------- ncomponents : int Number of states in the model. startprob_prior : array, shape (`ncomponents`,) Initial state occupation prior distribution. startprob : array, shape (`ncomponents`,) Initial state occupation distribution. transmat_prior : array, shape (`ncomponents`, `ncomponents`) Matrix of prior transition probabilities between states. transmat : array, shape (`ncomponents`, `ncomponents`) Matrix of transition probabilities between states. emission : conditional_normal_frozen The conditional probability distribution used for the emission. emission_prior : normal_invwishart Initial emission parameters, a normal-inverse Wishart distribution. n_iter : int Number of iterations to perform during training, optional. thresh : float Convergence threshold, optional. verbose : bool Controls if debug information is printed to the console, optional. Attributes ---------- ncomponents : int The number of hidden states. nfeatures : int Dimensionality of the Gaussian emission. startprob_prior : array, shape (`ncomponents`,) Initial state occupation prior distribution. startprob : array, shape (`ncomponents`,) Initial state occupation distribution. transmat_prior : array, shape (`ncomponents`, `ncomponents`) Matrix of prior transition probabilities between states. transmat : array, shape (`ncomponents`, `ncomponents`) Matrix of transition probabilities between states. emission_prior : normal_invwishart Initial emission parameters, a normal-inverse Wishart distribution. emission : cond_rv_frozen The conditional probability distribution used for the emission. mean : array, shape (`ncomponents`, `nfeatures`) Mean parameters for each state. cov : array, shape (`ncomponents`, `nfeatures`, `nfeatures`) Covariance parameters for each state. Examples -------- >>> from mlpy.stats.dbn.hmm import GaussianHMM >>> GaussianHMM(ncomponents=2) ... .. note:: Adapted from Matlab: | Project: `Probabilistic Modeling Toolkit for Matlab/Octave <https://github.com/probml/pmtk3>`_. | Copyright (2010) Kevin Murphy and Matt Dunham | License: `MIT <https://github.com/probml/pmtk3/blob/5fefd068a2e84ae508684d3e4750bd72a4164ba0/license.txt>`_ """ @property def mean(self): """The mean parameters for each state Returns ------- array, shape (`ncomponents`, `nfeatures`) : Mean parameters for each state. """ return self.emission.mean @property def cov(self): """Covariance parameters for each state. Returns ------- array, shape (`ncomponents`, `nfeatures`, `nfeatures`) : Covariance parameters for each state as a full matrix """ return self.emission.cov def __init__(self, ncomponents=1, startprob_prior=None, startprob=None, transmat_prior=None, transmat=None, emission_prior=None, emission=None, n_iter=None, thresh=None, verbose=None): super(GaussianHMM, self).__init__(ncomponents, startprob_prior, transmat_prior, startprob, transmat, emission_prior, emission, n_iter, thresh, verbose) if emission: self.nfeatures = emission.dim def _generate_sample_from_state(self, state): """Generate a sample from the given current state. Parameters ---------- state : int Current state. Returns ------- sample: int An observation sampled for the given state """ return multivariate_normal.rvs(self.emission.mean[state], self.emission.cov[state], size=1) def _initialize(self, obs, init_count): """Perform initialization step before entering the EM algorithm. Parameters ---------- obs : array_like, shape (`n`, `ni`, `nfeatures`) List of observation sequences, where `n` is the number of sequences, `ni` is the length of the i_th observation, and each observation has `nfeatures` features. init_count : int Restart counter """ if self.emission is None or self.startprob is None or self.transmat is None: if init_count == 0: stacked_obs = self._stack_obs(obs) gmm = GMM(n_components=self.ncomponents, covariance_type='full') gmm.fit(stacked_obs) if self.transmat is None or self.startprob is None: pz = gmm.predict_proba(stacked_obs) self._init_with_mix_model(obs, pz) if self.emission is None: self.emission = conditional_normal(gmm.means_, gmm.covars_, algorithm='map') # regularize MLE for i in range(self.emission.ncomponents): self.emission.cov[i] += np.eye(self.emission.dim) else: stacked_obs = self._stack_obs(obs) mean = np.zeros((self.ncomponents, self.nfeatures)) cov = np.zeros((self.ncomponents, self.nfeatures, self.nfeatures)) for i in range(self.ncomponents): xx = stacked_obs + np.random.randn(stacked_obs.shape[0], stacked_obs.shape[1]) mean[i] = np.mean(xx, 0) cov[i, :, :] = np.cov(xx, rowvar=0) if self.emission is None: self.emission = conditional_normal(mean, cov, algorithm='map') else: self.emission.mean = mean self.emission.cov = cov self._rand_init() if self.emission_prior: self.emission.prior = self.emission_prior
[docs]class StudentHMM(HMM): """ Hidden Markov Model with Student emissions Representation of a hidden Markov model probability distribution. This class allows for easy evaluation of, sampling from, and maximum-likelihood estimation of the parameters of a HMM. Parameters ---------- ncomponents : int Number of states in the model. startprob_prior : array, shape (`ncomponents`,) Initial state occupation prior distribution. startprob : array, shape (`ncomponents`,) Initial state occupation distribution. transmat_prior : array, shape (`ncomponents`, `ncomponents`) Matrix of prior transition probabilities between states. transmat : array, shape (`ncomponents`, `ncomponents`) Matrix of transition probabilities between states. emission : conditional_student_frozen The conditional probability distribution used for the emission. emission_prior : normal_invwishart Initial emission parameters, a normal-inverse Wishart distribution. n_iter : int Number of iterations to perform during training, optional. thresh : float Convergence threshold, optional. verbose : bool Controls if debug information is printed to the console, optional. Attributes ---------- ncomponents : int The number of hidden states. nfeatures : int Dimensionality of the Gaussian emission. startprob_prior : array, shape (`ncomponents`,) Initial state occupation prior distribution. startprob : array, shape (`ncomponents`,) Initial state occupation distribution. transmat_prior : array, shape (`ncomponents`, `ncomponents`) Matrix of prior transition probabilities between states. transmat : array, shape (`ncomponents`, `ncomponents`) Matrix of transition probabilities between states. emission_prior : normal_invwishart Initial emission parameters, a normal-inverse Wishart distribution. emission : cond_rv_frozen The conditional probability distribution used for the emission. Examples -------- >>> from mlpy.stats.dbn.hmm import StudentHMM >>> StudentHMM(ncomponents=2) ... .. note:: Adapted from Matlab: | Project: `Probabilistic Modeling Toolkit for Matlab/Octave <https://github.com/probml/pmtk3>`_. | Copyright (2010) Kevin Murphy and Matt Dunham | License: `MIT <https://github.com/probml/pmtk3/blob/5fefd068a2e84ae508684d3e4750bd72a4164ba0/license.txt>`_ """ def __init__(self, ncomponents=1, startprob_prior=None, startprob=None, transmat_prior=None, transmat=None, emission_prior=None, emission=None, n_iter=None, thresh=None, verbose=None): super(StudentHMM, self).__init__(ncomponents, startprob_prior, startprob, transmat_prior, transmat, emission_prior, emission, n_iter, thresh, verbose) def _generate_sample_from_state(self, state): """Generate a sample from the given current state. Parameters ---------- state : int Current state. Returns ------- sample: int An observation sampled for the given state """ return multivariate_student.rvs(self.emission.mean[state], self.emission.cov[state], self.emission.df, size=1) def _initialize(self, obs, init_count): """Perform initialization step before entering the EM algorithm. Parameters ---------- obs : array_like, shape (`n`, `ni`, `nfeatures`) List of observation sequences, where `n` is the number of sequences, `ni` is the length of the i_th observation, and each observation has `nfeatures` features. init_count : int Restart counter """ df = 10 * np.ones(self.ncomponents) fix_df = False if self.emission and self.emission.df: df = self.emission.df fix_df = True if self.emission is None or self.startprob is None or self.transmat is None: if init_count == 0: stacked_obs = self._stack_obs(obs) model = StudentMM(ncomponents=self.ncomponents, n_iter=10) model.fit(stacked_obs) if self.transmat is None or self.startprob is None: pz = model.predict_proba(stacked_obs) self._init_with_mix_model(obs, pz) cov = model.cond_proba.cov + np.eye(self.nfeatures) mean = model.cond_proba.mean if not fix_df: df = model.cond_proba.df self.emission = conditional_student(mean, cov, df, self.emission_prior) else: stacked_obs = self._stack_obs(obs) mean = np.zeros((self.ncomponents, self.nfeatures)) cov = np.zeros((self.ncomponents, self.nfeatures, self.nfeatures)) for i in range(self.ncomponents): xx = stacked_obs + np.random.randn(stacked_obs.shape[0], stacked_obs.shape[1]) mean[i] = np.mean(xx, 0) cov[i, :, :] = np.cov(xx, rowvar=0) if self.emission is None: self.emission = conditional_student(mean, cov, df, self.emission_prior) else: self.emission.mean = mean self.emission.cov = cov self._rand_init() if self.emission_prior: self.emission.prior = self.emission_prior
[docs]class GMMHMM(HMM): """ Hidden Markov Model with Gaussian mixture emissions. Representation of a hidden Markov model probability distribution. This class allows for easy evaluation of, sampling from, and maximum-likelihood estimation of the parameters of a HMM. Parameters ---------- ncomponents : int Number of states in the model. nmix : int Number of mixtures. startprob_prior : array, shape (`ncomponents`,) Initial state occupation prior distribution. startprob : array, shape (`ncomponents`,) Initial state occupation distribution. transmat_prior : array, shape (`ncomponents`, `ncomponents`) Matrix of prior transition probabilities between states. transmat : array, shape (`ncomponents`, `ncomponents`) Matrix of transition probabilities between states. emission : conditional_mix_normal_frozen The conditional probability distribution used for the emission. emission_prior : normal_invwishart Initial emission parameters, a normal-inverse Wishart distribution. n_iter : int Number of iterations to perform during training, optional. thresh : float Convergence threshold, optional. verbose : bool Controls if debug information is printed to the console, optional. Attributes ---------- ncomponents : int The number of hidden states. nmix : int Number of mixtures. nfeatures : int Dimensionality of the Gaussian emission. startprob_prior : array, shape (`ncomponents`,) Initial state occupation prior distribution. startprob : array, shape (`ncomponents`,) Initial state occupation distribution. transmat_prior : array, shape (`ncomponents`, `ncomponents`) Matrix of prior transition probabilities between states. transmat : array, shape (`ncomponents`, `ncomponents`) Matrix of transition probabilities between states. emission_prior : normal_invwishart Initial emission parameters, a normal-inverse Wishart distribution. emission : cond_rv_frozen The conditional probability distribution used for the emission. Examples -------- >>> from mlpy.stats.dbn.hmm import GMMHMM >>> GMMHMM(ncomponents=2) ... .. note:: Adapted from Matlab: | Project: `Probabilistic Modeling Toolkit for Matlab/Octave <https://github.com/probml/pmtk3>`_. | Copyright (2010) Kevin Murphy and Matt Dunham | License: `MIT <https://github.com/probml/pmtk3/blob/5fefd068a2e84ae508684d3e4750bd72a4164ba0/license.txt>`_ """ def __init__(self, ncomponents=1, nmix=1, startprob_prior=None, startprob=None, transmat_prior=None, transmat=None, emission_prior=None, emission=None, n_iter=None, thresh=None, verbose=None): super(GMMHMM, self).__init__(ncomponents, startprob_prior, startprob, transmat_prior, transmat, emission_prior, emission, n_iter, thresh, verbose) self.nmix = 1 if nmix is None else nmix def _generate_sample_from_state(self, state): """Generate a sample from the given current state. Parameters ---------- state : int Current state. Returns ------- sample: int An observation sampled for the given state Raises ------ NotImplementedError This functionality is not implemented yet. """ raise NotImplementedError def _initialize(self, obs, init_count): """Perform initialization step before entering the EM algorithm. Parameters ---------- obs : array_like, shape (`n`, `ni`, `nfeatures`) List of observation sequences, where `n` is the number of sequences, `ni` is the length of the i_th observation, and each observation has `nfeatures` features. init_count : int Restart counter """ if self.emission is None or self.startprob is None or self.transmat is None: stacked_obs = self._stack_obs(obs) if init_count == 0: gmm = GMM(n_components=self.ncomponents, covariance_type='full') gmm.fit(stacked_obs) if self.emission is None: cov = np.eye(self.nfeatures) + gmm.covars_ # noinspection PyTypeChecker m = np.tile(gmm.weights_, self.ncomponents) m = normalize(m + np.random.rand(m.shape[0], m.shape[1]), 0) self.emission = conditional_mix_normal(gmm.means_, cov, m, self.emission_prior) else: stacked_obs = self._stack_obs(obs) mean = np.zeros((self.ncomponents, self.nfeatures)) cov = np.zeros((self.ncomponents, self.nfeatures, self.nfeatures)) for i in range(self.ncomponents): xx = stacked_obs + np.random.randn(stacked_obs.shape[0], stacked_obs.shape[1]) mean[i] = np.mean(xx, 0) cov[i, :, :] = np.cov(xx, rowvar=0) m = normalize(np.random.rand(self.nmix, self.ncomponents), 0) if self.emission is None: self.emission = conditional_mix_normal(mean, cov, m, self.emission_prior) else: self.emission.mean = mean self.emission.cov = cov self.m = m self._rand_init() if self.emission_prior: self.emission.prior = self.emission_prior