Source code for plotypus.lightcurve

"""
Light curve fitting and plotting functions.
"""

import numpy
numpy.random.seed(0)
from scipy.stats import sem
from sys import stderr
from math import floor
from os import path
import plotypus.utils
from .utils import (verbose_print, make_sure_path_exists,
                    get_signal, get_noise, colvec, mad)
from .periodogram import find_period, Lomb_Scargle, rephase
from .preprocessing import Fourier
from sklearn.cross_validation import cross_val_score
from sklearn.linear_model import LassoLarsIC
from sklearn.pipeline import Pipeline
from sklearn.grid_search import GridSearchCV
from sklearn.utils import ConvergenceWarning
import warnings
warnings.filterwarnings("ignore", category=ConvergenceWarning)
import matplotlib
import matplotlib.pyplot as plt

# Public API of this module: the names exported by a star-import.
__all__ = [
    'make_predictor',
    'get_lightcurve',
    'get_lightcurve_from_file',
    'find_outliers',
    'plot_lightcurve'
]


def make_predictor(regressor=LassoLarsIC(fit_intercept=False),
                   Selector=GridSearchCV, fourier_degree=(2, 25),
                   selector_processes=1,
                   use_baart=False, scoring='r2', scoring_cv=3,
                   **kwargs):
    """make_predictor(regressor=LassoLarsIC(fit_intercept=False), Selector=GridSearchCV, fourier_degree=(2, 25), selector_processes=1, use_baart=False, scoring='r2', scoring_cv=3, **kwargs)

    Builds the predictor object consumed by :func:`get_lightcurve`.

    The predictor is a two-step pipeline (a ``Fourier`` transform followed by
    *regressor*).  Unless *use_baart* is set, the pipeline is wrapped in
    *Selector*, which searches over every Fourier degree in the requested
    range.

    **Parameters**

    regressor : estimator object, optional
        Final ``'Regressor'`` step of the pipeline; also handed to
        ``Fourier`` when *use_baart* is True
        (default ``sklearn.linear_model.LassoLarsIC(fit_intercept=False)``).
    Selector : class, optional
        Model selection class, called as
        ``Selector(pipeline, params, scoring=..., cv=..., n_jobs=...)``
        (default :class:`sklearn.grid_search.GridSearchCV`).
    fourier_degree : 2-tuple, optional
        Lower and upper bounds (both inclusive) on the Fourier degree, in
        that order (default (2, 25)).
    selector_processes : positive integer, optional
        Forwarded to *Selector* as ``n_jobs`` (default 1).
    use_baart : boolean, optional
        If True, *Selector* is not used; the bare pipeline is returned and
        ``Fourier`` receives the degree range and the regressor so it can
        pick the degree itself (default False).
    scoring : str, optional
        Scoring method forwarded to *Selector* (default "r2").
    scoring_cv : positive integer, optional
        Number of cross validation folds forwarded to *Selector* as ``cv``
        (default 3).
    kwargs :
        Accepted for call-site convenience; not used here.

    **Returns**

    out : object with "fit" and "predict" methods
        The pipeline itself when *use_baart* is True, otherwise the
        *Selector* instance wrapping it.
    """
    if use_baart:
        fourier_step = Fourier(degree_range=fourier_degree,
                               regressor=regressor)
    else:
        fourier_step = Fourier()

    steps = [('Fourier', fourier_step), ('Regressor', regressor)]
    pipeline = Pipeline(steps)

    # Baart's criterion selects the degree inside the Fourier step, so no
    # outer search is needed.
    if use_baart:
        return pipeline

    lowest = fourier_degree[0]
    highest = fourier_degree[1]
    search_grid = {'Fourier__degree': list(range(lowest, highest + 1))}
    return Selector(pipeline, search_grid,
                    scoring=scoring, cv=scoring_cv,
                    n_jobs=selector_processes)
def get_lightcurve(data, copy=False, name=None,
                   predictor=None, periodogram=Lomb_Scargle,
                   sigma_clipping=mad,
                   scoring='r2', scoring_cv=3, scoring_processes=1,
                   period=None, min_period=0.2, max_period=32,
                   coarse_precision=1e-5, fine_precision=1e-9,
                   period_processes=1,
                   sigma=20,
                   shift=None,
                   min_phase_cover=0.0, min_observations=1, n_phases=100,
                   verbosity=None, **kwargs):
    """get_lightcurve(data, copy=False, name=None, predictor=None, periodogram=Lomb_Scargle, sigma_clipping=mad, scoring='r2', scoring_cv=3, scoring_processes=1, period=None, min_period=0.2, max_period=32, coarse_precision=1e-5, fine_precision=1e-9, period_processes=1, sigma=20, shift=None, min_phase_cover=0.0, min_observations=1, n_phases=100, verbosity=None, **kwargs)

    Fits a light curve to the given `data` using the specified methods,
    with default behavior defined for all methods.

    The fit is iterative: find the period of the current inliers, rephase,
    fit the predictor, flag outliers, mask them, and repeat until no new
    outliers appear.

    **Parameters**

    data : array-like, shape = [n_samples, 2] or [n_samples, 3]
        Photometry array with columns *time*, *magnitude*, and (optional)
        *error*. *time* should be unphased.
    copy : boolean, optional
        Forwarded to ``numpy.ma.array``. With the default (False) the input
        may be modified in place, because the mask and the first column of
        the working array are overwritten.
    name : string or None, optional
        Name of star being processed.
    predictor : object that has "fit" and "predict" methods, optional
        Object which fits the light curve obtained from *data* after
        rephasing
        (default ``make_predictor(scoring=scoring, scoring_cv=scoring_cv)``).
    periodogram : function, optional
        Function which finds one or more periods. If *period* is already
        provided, the function is not used. Defaults to
        :func:`plotypus.periodogram.Lomb_Scargle`
    sigma_clipping : function, optional
        Function which takes an array and assigns sigma scores to each
        element. Defaults to :func:`plotypus.utils.mad`.
    scoring : str, optional
        Scoring method used by *predictor*. This parameter can be

            * "r2", in which case use :math:`R^2` (the default)
            * "mse", in which case use mean square error
    scoring_cv : positive integer, optional
        Number of cross validation folds used in scoring (default 3).
    scoring_processes : positive integer, optional
        Number of processes to use for scoring cross validation (default 1).
    period : number or None, optional
        Period of oscillation used in the fit. This parameter can be:

            * None, in which case the period is obtained with the given
              *periodogram* function (the default).
            * A single positive number, giving the period to phase *data*.
    min_period : non-negative number, optional
        Lower bound on period obtained by *periodogram* (default 0.2).
    max_period : non-negative number, optional
        Upper bound on period obtained by *periodogram* (default 32.0).
    coarse_precision : positive number, optional
        Precision used in first period search sweep (default 1e-5).
    fine_precision : positive number, optional
        Precision used in second period search sweep (default 1e-9).
    period_processes : positive integer, optional
        Number of processes to use for period finding (default 1).
    sigma : number, optional
        Upper bound on score obtained by *sigma_clipping* for a point to be
        considered an inlier (default 20). A falsy value (0 or None)
        disables outlier rejection: the fit is performed once on the
        current inliers.
    shift : number or None, optional
        Phase shift to apply to light curve if provided. Light curve is
        shifted such that max light occurs at ``phase[0]`` if None given
        (default None).
    min_phase_cover : number on interval [0, 1], optional
        Fraction of binned light curve that must contain points in order to
        proceed. If light curve has insufficient coverage, a message is
        emitted through :func:`plotypus.utils.verbose_print` and None is
        returned (default 0.0).
    min_observations : positive integer, optional
        Minimum number of inlier observations required to attempt a fit;
        with fewer, None is returned (default 1).
    n_phases : positive integer
        Number of equally spaced phases to predict magnitudes at
        (default 100)
    verbosity : list or None, optional
        Verbosity level. See :func:`plotypus.utils.verbose_print`.
    kwargs :
        Accepted for call-site convenience; not used here.

    **Returns**

    out : dict or None
        None if there are too few inliers, insufficient phase coverage, or
        the fit raised a warning. Otherwise the results of the fit in a
        dictionary. The keys are:

            * name : str or None
                The name of the star.
            * period : number
                The star's period.
            * lightcurve : array-like, shape = [n_phases]
                Magnitudes of fitted light curve sampled at sample phases.
            * coefficients : array-like, shape = [n_coeffs]
                Fitted light curve coefficients.
            * dA_0 : non-negative number
                Error on mean magnitude.
            * phased_data : array-like, shape = [n_samples]
                *data* transformed from temporal to phase space.
            * model : predictor object
                The predictor used to fit the light curve.
            * R2 : number
                The :math:`R^2` score of the fit.
            * MSE : number
                The mean square error of the fit.
            * degree : positive integer
                The degree of the Fourier fit.
            * shift : number
                The phase shift applied.
            * coverage : number on interval [0, 1]
                The light curve coverage.

    **See also**

    :func:`get_lightcurve_from_file`
    """
    data = numpy.ma.array(data, copy=copy)
    phases = numpy.linspace(0, 1, n_phases, endpoint=False)
    # TODO: replace dA_0 with error matrix dA
    if predictor is None:
        predictor = make_predictor(scoring=scoring, scoring_cv=scoring_cv)

    while True:
        signal = get_signal(data)
        if len(signal) <= scoring_cv:
            verbose_print(
                "{}: length of signal ({}) less than cv folds ({})".format(
                    name, len(signal), scoring_cv),
                operation="coverage",
                verbosity=verbosity)
            return
        elif len(signal) < min_observations:
            verbose_print(
                "{}: length of signal ({}) "
                "less than min_observations ({})".format(
                    name, len(signal), min_observations),
                operation="coverage",
                verbosity=verbosity)
            return

        # Find the period of the inliers
        if period is not None:
            _period = period
        else:
            verbose_print("{}: finding period".format(name),
                          operation="period",
                          verbosity=verbosity)
            _period = find_period(signal,
                                  min_period, max_period,
                                  coarse_precision, fine_precision,
                                  periodogram, period_processes)

        verbose_print("{}: using period {}".format(name, _period),
                      operation="period",
                      verbosity=verbosity)
        phase, mag, *err = rephase(signal, _period).T

        # TODO: generalize number of bins to function parameter
        # ``coverage_bins``, which defaults to 100, the current hard-coded
        # behavior.
        # Determine whether there is sufficient phase coverage: mark every
        # 1/100-wide phase bin that holds at least one point.
        # NOTE(review): assumes rephase() yields phases in [0, 1) -- a phase
        # of exactly 1.0 would index past the last bin.
        coverage = numpy.zeros((100))
        for p in phase:
            coverage[int(floor(p*100))] = 1
        coverage = sum(coverage)/100
        if coverage < min_phase_cover:
            verbose_print("{}: {} {}".format(name, coverage, min_phase_cover),
                          operation="coverage",
                          verbosity=verbosity)
            verbose_print("Insufficient phase coverage",
                          operation="outlier",
                          verbosity=verbosity)
            return

        # Predict light curve
        with warnings.catch_warnings(record=True) as w:
            try:
                predictor = predictor.fit(colvec(phase), mag)
            except Warning:
                # not sure if this should be only in verbose mode
                print(name, w, file=stderr)
                return

        # Outlier rejection is optional.  Without this exit the loop could
        # never terminate for a falsy sigma, since the only other ``break``
        # lives in the rejection logic below.
        if not sigma:
            break

        # Reject outliers and repeat the process if there are any
        outliers = find_outliers(rephase(data.data, _period), predictor,
                                 sigma, sigma_clipping)
        num_outliers = sum(outliers)[0]

        # getmaskarray always yields a full boolean array, whereas
        # ``data.mask`` is the scalar ``nomask`` when the caller supplied
        # unmasked data, and a scalar cannot be indexed.
        flagged_rows = set(numpy.nonzero(outliers.T[0])[0])
        masked_rows = set(
            numpy.nonzero(numpy.ma.getmaskarray(data).T[0])[0])

        # Converged: nothing flagged, or nothing flagged that was not
        # already masked.
        if num_outliers == 0 or flagged_rows <= masked_rows:
            data.mask = outliers
            break

        verbose_print("{}: {} outliers".format(name, num_outliers),
                      operation="outlier",
                      verbosity=verbosity)
        data.mask = numpy.ma.mask_or(data.mask, outliers)

    # Build light curve and optionally shift to max light
    lightcurve = predictor.predict([[i] for i in phases])
    if shift is None:
        # Magnitudes are inverted: the minimum value is maximum light.
        arg_max_light = lightcurve.argmin()
        lightcurve = numpy.concatenate((lightcurve[arg_max_light:],
                                        lightcurve[:arg_max_light]))
        shift = arg_max_light/len(phases)

    # Replace the time column with the (shifted) phase of every observation,
    # masked or not.
    data.T[0] = rephase(data.data, _period, shift).T[0]

    # Grab the coefficients from the model
    fitted_pipeline = predictor \
        if isinstance(predictor, Pipeline) \
        else predictor.best_estimator_
    coefficients = fitted_pipeline.named_steps['Regressor'].coef_

    # compute R^2 and MSE if they haven't already been
    # (one or zero have been computed, depending on the predictor)
    estimator = predictor.best_estimator_ \
        if hasattr(predictor, 'best_estimator_') \
        else predictor

    def get_score(scoring):
        # Reuse the selector's own score when it was computed with the
        # requested metric; otherwise cross-validate the fitted estimator.
        if hasattr(predictor, 'best_score_') \
                and predictor.scoring == scoring:
            return predictor.best_score_
        return cross_val_score(estimator, colvec(phase), mag,
                               cv=scoring_cv, scoring=scoring,
                               n_jobs=scoring_processes).mean()

    return {'name': name,
            'period': _period,
            'lightcurve': lightcurve,
            'coefficients': coefficients,
            'dA_0': sem(lightcurve),
            'phased_data': data,
            'model': predictor,
            'R2': get_score('r2'),
            'MSE': abs(get_score('mean_squared_error')),
            'degree': estimator.get_params()['Fourier__degree'],
            'shift': shift,
            'coverage': coverage}
def get_lightcurve_from_file(file, *args, use_cols=None, skiprows=0,
                             verbosity=None,
                             **kwargs):
    """get_lightcurve_from_file(file, *args, use_cols=None, skiprows=0, verbosity=None, **kwargs)

    Loads photometry from *file* and fits a light curve to it with
    :func:`get_lightcurve`.

    **Parameters**

    file : str or file
        File or filename to load data from.
    use_cols : iterable or None, optional
        Iterable of columns to read from data file, or None to read all
        columns (default None).
    skiprows : number, optional
        Number of rows to skip at beginning of *file* (default 0)
    verbosity : list or None, optional
        Verbosity level, used here and forwarded to :func:`get_lightcurve`.
    args, kwargs :
        Forwarded unchanged to :func:`get_lightcurve`.

    **Returns**

    out : dict or None
        None if *file* holds no data points; otherwise see
        :func:`get_lightcurve`.
    """
    table = numpy.loadtxt(file, skiprows=skiprows, usecols=use_cols)

    # Nothing to fit: report it and bail out early.
    if len(table) == 0:
        verbose_print("{}: file contains no data points".format(file),
                      operation="coverage",
                      verbosity=verbosity)
        return

    # mask=None yields an explicit all-False mask the fit can update.
    photometry = numpy.ma.array(data=table, mask=None, dtype=float)
    return get_lightcurve(photometry, *args,
                          verbosity=verbosity, **kwargs)
## These functions were used briefly and then not maintained. ## Will make comebacks of some form in a later release. ## # def get_lightcurves_from_file(filename, directories, *args, **kwargs): # return [get_lightcurve_from_file(path.join(d, filename), *args, **kwargs) # for d in directories] # # # def single_periods(data, period, min_points=10, copy=False, *args, **kwargs): # data = numpy.ma.array(data, copy=copy) # time, mag, *err = data.T # # tstart, tfinal = numpy.min(time), numpy.max(time) # periods = numpy.arange(tstart, tfinal+period, period) # data_range = ( # data[numpy.logical_and(time>pstart, time<=pend),:] # for pstart, pend in zip(periods[:-1], periods[1:]) # ) # # return ( # get_lightcurve(d, period=period, *args, **kwargs) # for d in data_range # if d.shape[0] > min_points # ) # # # def single_periods_from_file(filename, *args, use_cols=(0, 1, 2), skiprows=0, # **kwargs): # data = numpy.ma.array(data=numpy.loadtxt(filename, usecols=use_cols, # skiprows=skiprows), # mask=None, dtype=float) # return single_periods(data, *args, **kwargs)
def find_outliers(data, predictor, sigma,
                  method=mad):
    """find_outliers(data, predictor, sigma, method=mad)

    Flags the rows of *data* whose residual from the fitted light curve is
    too large.

    A row is an outlier when its absolute residual exceeds
    ``sigma * method(residuals)`` and, if an error column is present, also
    exceeds that row's own error.

    **Parameters**

    data : array-like, shape = [n_samples, 2] or [n_samples, 3]
        Photometry array containing columns *phase*, *magnitude*, and
        (optional) *error*.
    predictor : object that has a "predict" method
        Fitted model used to predict the magnitude at each phase.
    sigma : number
        Outlier cutoff criteria.
    method : function, optional
        Function to score residuals for outlier detection
        (default :func:`plotypus.utils.mad`).

    **Returns**

    out : array-like, shape = data.shape
        Boolean array indicating the outliers in the given *data* array;
        each row's flag is repeated across all of its columns.
    """
    phase, mag, *errors = data.T

    predicted = predictor.predict(colvec(phase))
    residuals = numpy.absolute(predicted - mag)

    # With no error column the per-point test is vacuously true.
    if errors:
        exceeds_error = residuals > errors[0]
    else:
        exceeds_error = True
    exceeds_cutoff = residuals > sigma * method(residuals)

    flagged = numpy.logical_and(exceeds_error, exceeds_cutoff)

    # Turn the per-row flags into a column and repeat it across every data
    # column so the result can be used directly as a mask for *data*.
    flag_column = numpy.vstack(flagged)
    return numpy.tile(flag_column, data.shape[1])
def plot_lightcurve(name, lightcurve, period, data,
                    output='.', legend=False, sanitize_latex=False,
                    color=True, n_phases=100,
                    err_const=0.005,
                    **kwargs):
    """plot_lightcurve(name, lightcurve, period, data, output='.', legend=False, sanitize_latex=False, color=True, n_phases=100, err_const=0.005, **kwargs)

    Save a plot of the given *lightcurve* to directory *output*.

    Two full phase cycles (0 to 2) are drawn, with inliers, rejected
    outliers and the fitted curve each repeated once.  The plot is drawn on
    the current matplotlib figure, which is cleared afterwards.

    **Parameters**

    name : str
        Name of the star. Used in filename and plot title.
    lightcurve : array-like, shape = [n_phases]
        Fitted lightcurve.
    period : number
        Period of the star; only used in the x-axis label.
    data : array-like, shape = [n_samples, 2] or [n_samples, 3]
        Photometry array containing columns *phase*, *magnitude*, and
        (optional) *error*. The first column is plotted directly on the
        phase axis, so it should already be phased (e.g. the
        ``phased_data`` entry returned by :func:`get_lightcurve`).
    output : str, optional
        Directory to save plot to (default '.').
    legend : boolean, optional
        Whether or not to display legend on plot (default False).
    sanitize_latex : boolean, optional
        Whether or not to pass *name* through
        ``plotypus.utils.sanitize_latex`` before using it as the plot
        title (default False).
    color : boolean, optional
        Whether or not to display color in plot (default True). When
        False, outliers are drawn with visible "x" markers.
    n_phases : integer, optional
        Number of phase points in fit (default 100).
    err_const : number, optional
        Fraction of the magnitude to use as the error bar in absence of an
        error column (default 0.005).
    kwargs :
        Accepted for call-site convenience; not used here.

    **Returns**

    None
    """
    phases = numpy.linspace(0, 1, n_phases, endpoint=False)

    ax = plt.gca()
    # Smaller magnitudes are brighter, so flip the y axis.
    ax.invert_yaxis()
    plt.xlim(0, 2)

    # Plot points used
    phase, mag, *err = get_signal(data).T

    error = err[0] if err else mag*err_const

    inliers = plt.errorbar(numpy.hstack((phase, 1+phase)),
                           numpy.hstack((mag, mag)),
                           yerr=numpy.hstack((error, error)),
                           ls='None',
                           ms=.01, mew=.01, capsize=0)

    # Plot outliers rejected
    phase, mag, *err = get_noise(data).T

    error = err[0] if err else mag*err_const

    outliers = plt.errorbar(numpy.hstack((phase, 1+phase)),
                            numpy.hstack((mag, mag)),
                            yerr=numpy.hstack((error, error)),
                            ls='None', marker='o' if color else 'x',
                            ms=.01 if color else 4,
                            mew=.01 if color else 1,
                            capsize=0 if color else 1)

    # Plot the fitted light curve
    signal, = plt.plot(numpy.hstack((phases, 1+phases)),
                       numpy.hstack((lightcurve, lightcurve)),
                       linewidth=1)

    if legend:
        plt.legend([signal, inliers, outliers],
                   ["Light Curve", "Inliers", "Outliers"],
                   loc='best')

    plt.xlabel('Phase ({0:0.7} day period)'.format(period))
    plt.ylabel('Magnitude')

    # The helper must be reached through the imported ``plotypus.utils``
    # module: there is no bare ``utils`` name in this module, and the
    # boolean parameter shadows the helper's own name.
    plt.title(plotypus.utils.sanitize_latex(name) if sanitize_latex
              else name)
    plt.tight_layout(pad=0.1)
    make_sure_path_exists(output)
    plt.savefig(path.join(output, name))
    plt.clf()