# -*- coding: utf-8 -*-
"""
This module borrows and adapts `Pipeline` from `sklearn.pipeline` and
`TransformerMixin` from `sklearn.base` in the scikit-learn framework
(commit hash d205638475ca542dc46862652e3bb0be663a8eac, to be precise).
Both are BSD licensed and allow for this sort of thing; attribution is given
as a comment above each class. License reproduced below:

BSD 3-Clause License

Copyright (c) 2007-2022 The scikit-learn developers.
All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:

* Redistributions of source code must retain the above copyright notice, this
  list of conditions and the following disclaimer.

* Redistributions in binary form must reproduce the above copyright notice,
  this list of conditions and the following disclaimer in the documentation
  and/or other materials provided with the distribution.

* Neither the name of the copyright holder nor the names of its
  contributors may be used to endorse or promote products derived from
  this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
from collections import defaultdict
from itertools import islice
from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple

try:
    from typing_extensions import Protocol
except ImportError:  # pragma: no cover - depends on the environment
    # Fall back to the standard-library ``Protocol`` (Python 3.8+) so the
    # module stays importable when ``typing_extensions`` is not installed.
    from typing import Protocol


class TransformerProtocol(Protocol):
    """Structural type for objects that expose ``fit`` and ``transform``."""

    # Annotated as returning a transformer, which is what allows the
    # ``fit(...).transform(...)`` chaining used throughout this module.
    fit: Callable[..., "TransformerProtocol"]
    # Takes the data to transform and returns the transformed data.
    transform: Callable[[Any], Any]


# Author: Gael Varoquaux
# License: BSD 3 clause
class TransformerMixin(TransformerProtocol):
    """Mixin class for all transformers."""

    def fit_transform(self, X: Any, y: Optional[Any] = None, **fit_params: Any) -> Any:
        """
        Fit to data, then transform it.

        Fits transformer to X and y with optional parameters fit_params
        and returns a transformed version of X.

        Parameters
        ----------
        X : ndarray of shape (n_samples, n_features)
            Training set.

        y : ndarray of shape (n_samples,), default=None
            Target values.

        **fit_params : dict
            Additional fit parameters.

        Returns
        -------
        X_new : ndarray array of shape (n_samples, n_features_new)
            Transformed array.
        """
        # Non-optimized default implementation; override when a better
        # method is possible for a given transformer.
        if y is None:
            # fit method of arity 1 (unsupervised transformation)
            return self.fit(X, **fit_params).transform(X)
        # fit method of arity 2 (supervised transformation)
        return self.fit(X, y, **fit_params).transform(X)


# Author: Edouard Duchesnay
#         Gael Varoquaux
#         Virgile Fritsch
#         Alexandre Gramfort
#         Lars Buitinck
# License: BSD
class Pipeline:
    """
    Chain of named transformers followed by a final estimator.

    Parameters
    ----------
    steps : list of (name, estimator) tuples
        Every step but the last must implement ``transform`` and one of
        ``fit`` / ``fit_transform``, or be ``None`` / ``'passthrough'``.
        The last step must implement ``fit`` or be ``None`` /
        ``'passthrough'``.

    verbose : bool, default=False
        Stored on the instance; within this module it only controls whether
        ``_log_message`` returns a message or ``None``.

    Raises
    ------
    ValueError
        If ``steps`` is empty.
    TypeError
        If a step does not implement the required methods.
    """

    def __init__(self, steps, verbose=False):
        self.steps = steps
        self.verbose = verbose
        self._validate_steps()

    def _validate_steps(self) -> None:
        """Check that every step exposes the methods the pipeline relies on."""
        if not self.steps:
            # Without this guard the unpacking below fails with an opaque
            # "not enough values to unpack" ValueError.
            raise ValueError("Pipeline requires at least one (name, estimator) step")

        _, estimators = zip(*self.steps)

        # validate estimators
        transformers = estimators[:-1]
        estimator = estimators[-1]

        for t in transformers:
            if t is None or t == "passthrough":
                continue
            if not (hasattr(t, "fit") or hasattr(t, "fit_transform")) or not hasattr(
                t, "transform"
            ):
                raise TypeError(
                    "All intermediate steps should be "
                    "transformers and implement fit and transform "
                    "or be the string 'passthrough' "
                    "'%s' (type %s) doesn't" % (t, type(t))
                )

        # We allow last estimator to be None as an identity transformation
        if (
            estimator is not None
            and estimator != "passthrough"
            and not hasattr(estimator, "fit")
        ):
            raise TypeError(
                "Last step of Pipeline should implement fit "
                "or be the string 'passthrough'. "
                "'%s' (type %s) doesn't" % (estimator, type(estimator))
            )

    def _iter(
        self, with_final: bool = True, filter_passthrough: bool = True
    ) -> Iterator[Tuple[int, str, Any]]:
        """
        Generate (idx, name, trans) tuples from self.steps

        When ``with_final`` is False the last step is left out. When
        ``filter_passthrough`` is True, 'passthrough' and None transformers
        are filtered out.
        """
        stop = len(self.steps)
        if not with_final:
            stop -= 1

        for idx, (name, trans) in enumerate(islice(self.steps, 0, stop)):
            if not filter_passthrough:
                yield idx, name, trans
            elif trans is not None and trans != "passthrough":
                yield idx, name, trans

    def __len__(self) -> int:
        """
        Returns the length of the Pipeline
        """
        return len(self.steps)

    def __getitem__(self, ind):
        """Returns a sub-pipeline or a single estimator in the pipeline

        Indexing with an integer will return an estimator; using a slice
        returns another Pipeline instance which copies a slice of this
        Pipeline. This copy is shallow: modifying (or fitting) estimators in
        the sub-pipeline will affect the larger pipeline and vice-versa.
        However, replacing a value in `steps` will not affect a copy.
        Any other kind of index is looked up as a step name.

        Raises
        ------
        ValueError
            If a slice with a step other than 1 is given, or if the slice
            selects no steps.
        """
        if isinstance(ind, slice):
            if ind.step not in (1, None):
                raise ValueError("Pipeline slicing only supports a step of 1")
            # Carry ``verbose`` over so the sub-pipeline behaves like its
            # parent instead of silently resetting to the default.
            return self.__class__(self.steps[ind], verbose=self.verbose)
        try:
            _, est = self.steps[ind]
        except TypeError:
            # Not an int, try get step by name
            return self.named_steps[ind]
        return est

    @property
    def _estimator_type(self):
        """Estimator type reported by the final step."""
        return self.steps[-1][1]._estimator_type

    @property
    def named_steps(self) -> Dict[str, Any]:
        """Mapping of step name to step object, rebuilt on every access."""
        return dict(self.steps)

    @property
    def _final_estimator(self):
        """Last step of the pipeline, with ``None`` mapped to 'passthrough'."""
        estimator = self.steps[-1][1]
        return "passthrough" if estimator is None else estimator

    def _log_message(self, step_idx: int) -> Optional[str]:
        """Return a progress message for a step, or None when not verbose."""
        if not self.verbose:
            return None
        name, _ = self.steps[step_idx]

        return "(step %d of %d) Processing %s" % (step_idx + 1, len(self.steps), name)

    # Estimator interface

    def _fit(self, X, y=None, **fit_params):
        """
        Fit and apply every step except the last one.

        Returns
        -------
        (Xt, final_fit_params) : tuple
            The data after all intermediate steps and the fit parameters
            addressed to the final step (``{}`` when it is a passthrough).

        Raises
        ------
        ValueError
            If a fit parameter is not of the form ``stepname__parameter``.
        KeyError
            If the ``stepname`` prefix of a fit parameter does not name a
            non-None step.
        """
        # shallow copy of steps - this should really be steps_
        self.steps = list(self.steps)
        self._validate_steps()

        # Route each ``step__param`` keyword to the step it is addressed to.
        fit_params_steps: Dict[str, Dict[str, Any]] = {
            name: {} for name, step in self.steps if step is not None
        }
        for pname, pval in fit_params.items():
            if "__" not in pname:
                raise ValueError(
                    "Pipeline.fit does not accept the {} parameter. "
                    "You can pass parameters to specific steps of your "
                    "pipeline using the stepname__parameter format, e.g. "
                    "`Pipeline.fit(X, y, logisticregression__sample_weight"
                    "=sample_weight)`.".format(pname)
                )
            step, param = pname.split("__", 1)
            fit_params_steps[step][param] = pval

        for step_idx, name, transformer in self._iter(
            with_final=False, filter_passthrough=False
        ):
            if transformer is None or transformer == "passthrough":
                continue

            # Fit the current transformer and push the data through it.
            X, fitted_transformer = _fit_transform_one(
                transformer, X, y, None, **fit_params_steps[name]
            )
            # Store whatever transformer object the helper handed back as
            # the step's transformer.
            self.steps[step_idx] = (name, fitted_transformer)

        if self._final_estimator == "passthrough":
            return X, {}
        return X, fit_params_steps[self.steps[-1][0]]

    def fit(self, X, y=None, **fit_params):
        """Fit the model

        Fit all the transforms one after the other and transform the
        data, then fit the transformed data using the final estimator.

        Parameters
        ----------
        X : iterable
            Training data. Must fulfill input requirements of first step of the
            pipeline.

        y : iterable, default=None
            Training targets. Must fulfill label requirements for all steps of
            the pipeline.

        **fit_params : dict of string -> object
            Parameters passed to the ``fit`` method of each step, where
            each parameter name is prefixed such that parameter ``p`` for step
            ``s`` has key ``s__p``.

        Returns
        -------
        self : Pipeline
            This estimator
        """
        Xt, final_fit_params = self._fit(X, y, **fit_params)
        if self._final_estimator != "passthrough":
            self._final_estimator.fit(Xt, y, **final_fit_params)
        return self

    def fit_transform(self, X, y=None, **fit_params):
        """Fit the model and transform with the final estimator

        Fits all the transforms one after the other and transforms the
        data, then uses fit_transform on transformed data with the final
        estimator.

        Parameters
        ----------
        X : iterable
            Training data. Must fulfill input requirements of first step of the
            pipeline.

        y : iterable, default=None
            Training targets. Must fulfill label requirements for all steps of
            the pipeline.

        **fit_params : dict of string -> object
            Parameters passed to the ``fit`` method of each step, where
            each parameter name is prefixed such that parameter ``p`` for step
            ``s`` has key ``s__p``.

        Returns
        -------
        Xt : array-like of shape (n_samples, n_transformed_features)
            Transformed samples
        """
        last_step = self._final_estimator
        Xt, final_fit_params = self._fit(X, y, **fit_params)
        if last_step == "passthrough":
            return Xt
        if hasattr(last_step, "fit_transform"):
            return last_step.fit_transform(Xt, y, **final_fit_params)
        return last_step.fit(Xt, y, **final_fit_params).transform(Xt)

    @property
    def transform(self):
        """Apply transforms, and transform with the final estimator

        This also works where final estimator is ``None``: all prior
        transformations are applied.

        Parameters
        ----------
        X : iterable
            Data to transform. Must fulfill input requirements of first step
            of the pipeline.

        Returns
        -------
        Xt : array-like of shape (n_samples, n_transformed_features)
        """
        # _final_estimator is None or has transform, otherwise attribute error
        # XXX: Handling the None case means we can't use if_delegate_has_method
        if self._final_estimator != "passthrough":
            # Bare attribute access on purpose: it raises AttributeError when
            # the final estimator cannot transform.
            self._final_estimator.transform
        return self._transform

    def _transform(self, X):
        """Run ``X`` through ``transform`` of every non-passthrough step."""
        Xt = X
        for _, _, transform in self._iter():
            Xt = transform.transform(Xt)
        return Xt

    @property
    def classes_(self):
        """``classes_`` attribute of the final step."""
        return self.steps[-1][-1].classes_

    @property
    def _pairwise(self):
        # check if first estimator expects pairwise input
        return getattr(self.steps[0][1], "_pairwise", False)

    @property
    def n_features_in_(self):
        # delegate to first step
        return self.steps[0][1].n_features_in_


def _name_estimators(estimators) -> List[Tuple[str, Any]]:
    """
    Generate names for estimators.

    Strings name themselves; any other object is named after its lowercased
    type name. Names occurring more than once get a ``-1``, ``-2``, ...
    suffix in order of appearance.
    """
    names = [
        estimator if isinstance(estimator, str) else type(estimator).__name__.lower()
        for estimator in estimators
    ]

    namecount: Dict[str, int] = defaultdict(int)
    for name in names:
        namecount[name] += 1

    # Only names that occur more than once need a disambiguating suffix.
    for k, v in list(namecount.items()):
        if v == 1:
            del namecount[k]

    # Walk backwards so the remaining count of each duplicated name is also
    # the 1-based position of the current occurrence.
    for i in reversed(range(len(estimators))):
        name = names[i]
        if name in namecount:
            names[i] += "-%d" % namecount[name]
            namecount[name] -= 1

    return list(zip(names, estimators))


def make_pipeline(*steps, **kwargs) -> Pipeline:
    """Construct a Pipeline from the given estimators.

    This is a shorthand for the Pipeline constructor; it does not require, and
    does not permit, naming the estimators. Instead, their names will be set
    to the lowercase of their types automatically.

    Parameters
    ----------
    *steps : list of estimators.

    verbose : bool, default=False
        Forwarded to the ``Pipeline`` constructor as its ``verbose`` flag.

    Returns
    -------
    p : Pipeline

    Raises
    ------
    TypeError
        If any keyword argument other than ``verbose`` is given.
    """
    verbose = kwargs.pop("verbose", False)
    if kwargs:
        raise TypeError(
            'Unknown keyword arguments: "{}"'.format(list(kwargs.keys())[0])
        )
    return Pipeline(_name_estimators(steps), verbose=verbose)


def _transform_one(transformer, X, y, weight, **fit_params):
    """
    Transform ``X`` with ``transformer``.

    If ``weight`` is not ``None``, the result will be multiplied by
    ``weight``. ``y`` and ``fit_params`` are accepted but not used.
    """
    res = transformer.transform(X)
    # if we have a weight for this transformer, multiply output
    if weight is None:
        return res
    return res * weight


def _fit_transform_one(transformer, X, y, weight, **fit_params):
    """
    Fits ``transformer`` to ``X`` and ``y``. The transformed result is returned
    with the fitted transformer. If ``weight`` is not ``None``, the result will
    be multiplied by ``weight``.
    """
    if hasattr(transformer, "fit_transform"):
        res = transformer.fit_transform(X, y, **fit_params)
    else:
        res = transformer.fit(X, y, **fit_params).transform(X)

    if weight is None:
        return res, transformer
    return res * weight, transformer