Source code for ruspy.estimation.estimation

"""
This module contains the main function for the estimation process.
"""
import time
from functools import partial

import nlopt
import numpy as np
from estimagic.optimization.optimize import minimize

try:
    import ipopt  # noqa:F401

    optional_package_is_available = True
except ImportError:
    optional_package_is_available = False

if optional_package_is_available:
    from ipopt import minimize_ipopt
from scipy.optimize._numdiff import approx_derivative

from ruspy.model_code.cost_functions import calc_obs_costs
from ruspy.model_code.fix_point_alg import calc_fixp
from ruspy.estimation import config
from ruspy.estimation.est_cost_params import create_state_matrix
from ruspy.estimation.estimation_interface import select_model_parameters
from ruspy.estimation.estimation_interface import select_optimizer_options
from ruspy.estimation.estimation_transitions import create_transition_matrix
from ruspy.estimation.estimation_transitions import estimate_transitions
from ruspy.estimation.mpec import mpec_constraint
from ruspy.estimation.mpec import mpec_constraint_derivative
from ruspy.estimation.mpec import mpec_loglike_cost_params
from ruspy.estimation.mpec import mpec_loglike_cost_params_derivative
from ruspy.estimation.mpec import wrap_ipopt_constraint
from ruspy.estimation.mpec import wrap_ipopt_likelihood
from ruspy.estimation.mpec import wrap_mpec_loglike


[docs]def estimate( init_dict, df, ): """ Estimation function of ruspy. This function coordinates the estimation process of the ruspy package. Parameters ---------- init_dict : dictionary see :ref:`init_dict` df : pandas.DataFrame see :ref:`df` Returns ------- results_transition_params : dictionary see :ref:`result_trans` results_cost_params : dictionary see :ref:`result_costs` """ transition_results = estimate_transitions(df) endog = df.loc[(slice(None), slice(1, None)), "decision"].to_numpy(int) states = df.loc[(slice(None), slice(1, None)), "state"].to_numpy(int) args = ( disc_fac, num_states, maint_func, maint_func_dev, num_params, scale, ) = select_model_parameters(init_dict) decision_mat = np.vstack(((1 - endog), endog)) trans_mat = create_transition_matrix(num_states, np.array(transition_results["x"])) state_mat = create_state_matrix(states, num_states) optimizer_options = select_optimizer_options(init_dict, num_params, num_states) if "approach" in init_dict["optimizer"]: approach = init_dict["optimizer"]["approach"] else: raise ValueError("The key 'approach' must be in the optimizer dictionairy") if approach == "NFXP": alg_details = {} if "alg_details" not in init_dict else init_dict["alg_details"] results_transition_params, results_cost_params = estimate_nfxp( *args, decision_mat, trans_mat, state_mat, optimizer_options, transition_results, alg_details, ) elif approach == "MPEC" and optimizer_options["algorithm"] == "ipopt": results_transition_params, results_cost_params = estimate_mpec_ipopt( *args, decision_mat, trans_mat, state_mat, optimizer_options, transition_results, ) elif approach == "MPEC" and optimizer_options["algorithm"] != "ipopt": results_transition_params, results_cost_params = estimate_mpec_nlopt( *args, decision_mat, trans_mat, state_mat, optimizer_options, transition_results, ) else: raise ValueError( f"{approach} is not implemented. Only MPEC or NFXP are valid choices" ) return results_transition_params, results_cost_params
[docs]def estimate_nfxp( disc_fac, num_states, maint_func, maint_func_dev, num_params, scale, decision_mat, trans_mat, state_mat, optimizer_options, transition_results, alg_details, ): """ Estimation function for the nested fixed point algorithm in ruspy. Parameters ---------- disc_fac : numpy.float see :ref:`disc_fac` num_states : int The size of the state space. maint_func: func see :ref: `maint_func` maint_func_dev: func see :ref: `maint_func_dev` num_params : int The number of parameters to be estimated. scale : numpy.float see :ref:`scale` decision_mat : numpy.array see :ref:`decision_mat` trans_mat : numpy.array see :ref:`trans_mat` state_mat : numpy.array see :ref:`state_mat` optimizer_options : dict The options chosen for the optimization algorithm in the initialization dictionairy. transition_results : dict The results from ``estimate_transitions``. alg_details : dict see :ref: `alg_details` Returns ------- transition_results : dictionary see :ref:`result_trans` result_cost_params : dictionary see :ref:`result_costs` """ criterion_temp = optimizer_options.pop("criterion") n_evaluations, criterion = wrap_nfxp_criterion(criterion_temp) kwargs = { "maint_func": maint_func, "maint_func_dev": maint_func_dev, "num_states": num_states, "trans_mat": trans_mat, "state_mat": state_mat, "decision_mat": decision_mat, "disc_fac": disc_fac, "scale": scale, "alg_details": alg_details, } result_cost_params = {} tic = time.perf_counter() min_result = minimize( criterion, criterion_kwargs=kwargs, gradient_kwargs=kwargs, **optimizer_options, ) toc = time.perf_counter() timing = toc - tic result_cost_params["x"] = min_result[1]["value"].to_numpy() result_cost_params["fun"] = min_result[0]["fitness"] if min_result[0]["status"] == "success": result_cost_params["status"] = 1 else: result_cost_params["status"] = 0 result_cost_params["message"] = min_result[0]["message"] result_cost_params["jac"] = min_result[0]["jacobian"] result_cost_params["n_evaluations"] = min_result[0]["n_evaluations"] result_cost_params["n_iterations"] = min_result[0]["n_iterations"] result_cost_params["n_contraction_steps"] = config.total_contr_count result_cost_params["n_newt_kant_steps"] = config.total_newt_kant_count result_cost_params["time"] = timing config.total_contr_count = 0 config.total_newt_kant_count = 0 return transition_results, result_cost_params
[docs]def estimate_mpec_nlopt( disc_fac, num_states, maint_func, maint_func_dev, num_params, scale, decision_mat, trans_mat, state_mat, optimizer_options, transition_results, ): """ Estimation function of Mathematical Programming with Equilibrium Constraints (MPEC) in ruspy. Parameters ---------- disc_fac : numpy.float see :ref:`disc_fac` num_states : int The size of the state space. maint_func: func see :ref: `maint_func` maint_func_dev: func see :ref: `maint_func_dev` num_params : int The number of parameters to be estimated. scale : numpy.float see :ref:`scale` decision_mat : numpy.array see :ref:`decision_mat` trans_mat : numpy.array see :ref:`trans_mat` state_mat : numpy.array see :ref:`state_mat` optimizer_options : dict The options chosen for the optimization algorithm in the initialization dictionairy. transition_results : dict The results from ``estimate_transitions``. Returns ------- transition_results : dictionary see :ref:`result_trans` mpec_cost_parameters : dictionary see :ref:`result_costs` """ gradient = optimizer_options.pop("gradient") # Calculate partial functions needed for nlopt n_evaluations, partial_loglike_mpec = wrap_mpec_loglike( args=( maint_func, maint_func_dev, num_states, num_params, state_mat, decision_mat, disc_fac, scale, gradient, ) ) partial_constr_mpec = partial( mpec_constraint, maint_func, maint_func_dev, num_states, num_params, trans_mat, disc_fac, scale, gradient, ) # set up nlopt opt = nlopt.opt( eval("nlopt." + optimizer_options.pop("algorithm")), num_states + num_params ) opt.set_min_objective(partial_loglike_mpec) opt.add_equality_mconstraint( partial_constr_mpec, np.full(num_states, 1e-6), ) # supply user choices params = optimizer_options.pop("params") if "get_expected_values" in optimizer_options: get_expected_values = optimizer_options.pop("get_expected_values") else: get_expected_values = "No" if "set_local_optimizer" in optimizer_options: sub = nlopt.opt( # noqa: F841 eval("nlopt." + optimizer_options.pop("set_local_optimizer")), num_states + num_params, ) exec("opt.set_local_optimizer(sub)") for key, _value in optimizer_options.items(): exec("sub." + key + "(_value)") for key, _value in optimizer_options.items(): exec("opt." + key + "(_value)") # Solving nlopt tic = time.perf_counter() if get_expected_values == "Yes": obs_costs = calc_obs_costs(num_states, maint_func, params, scale) ev = calc_fixp(trans_mat, obs_costs, disc_fac)[0] params = np.concatenate((ev, params)) result = opt.optimize(params) toc = time.perf_counter() timing = toc - tic mpec_cost_parameters = {} mpec_cost_parameters["x"] = result mpec_cost_parameters["fun"] = opt.last_optimum_value() if opt.last_optimize_result() > 0: mpec_cost_parameters["status"] = True else: mpec_cost_parameters["status"] = False mpec_cost_parameters["n_iterations"] = opt.get_numevals() mpec_cost_parameters["n_evaluations"] = n_evaluations[0] mpec_cost_parameters["reason"] = opt.last_optimize_result() mpec_cost_parameters["time"] = timing return transition_results, mpec_cost_parameters
[docs]def estimate_mpec_ipopt( disc_fac, num_states, maint_func, maint_func_dev, num_params, scale, decision_mat, trans_mat, state_mat, optimizer_options, transition_results, ): """ Estimation function of Mathematical Programming with Equilibrium Constraints (MPEC) in ruspy. Parameters ---------- disc_fac : numpy.float see :ref:`disc_fac` num_states : int The size of the state space. maint_func: func see :ref: `maint_func` maint_func_dev: func see :ref: `maint_func_dev` num_params : int The number of parameters to be estimated. scale : numpy.float see :ref:`scale` decision_mat : numpy.array see :ref:`decision_mat` trans_mat : numpy.array see :ref:`trans_mat` state_mat : numpy.array see :ref:`state_mat` optimizer_options : dict The options chosen for the optimization algorithm in the initialization dictionairy. transition_results : dict The results from ``estimate_transitions``. Returns ------- transition_results : dictionary see :ref:`result_trans` mpec_cost_parameters : dictionary see :ref:`result_costs` """ if not optional_package_is_available: raise NotImplementedError( """To use this you need to install cyipopt. If you are mac or Linux user the command is $ conda install -c conda-forge cyipopt. If you use Windows you have to install from source. A description can be found here: https://github.com/matthias-k/cyipopt""" ) del optimizer_options["algorithm"] gradient = optimizer_options.pop("gradient") params = optimizer_options.pop("params") lower_bounds = optimizer_options.pop("set_lower_bounds") upper_bounds = optimizer_options.pop("set_upper_bounds") bounds = np.vstack((lower_bounds, upper_bounds)).T bounds = list(map(tuple, bounds)) if "get_expected_values" in optimizer_options: get_expected_values = optimizer_options.pop("get_expected_values") else: get_expected_values = "No" n_evaluations, neg_criterion = wrap_ipopt_likelihood( mpec_loglike_cost_params, args=( maint_func, maint_func_dev, num_states, num_params, state_mat, decision_mat, disc_fac, scale, ), ) constraint_func = wrap_ipopt_constraint( mpec_constraint, args=( maint_func, maint_func_dev, num_states, num_params, trans_mat, disc_fac, scale, ), ) if gradient == "No": def approx_gradient(params): fun = approx_derivative(neg_criterion, params, method="2-point") return fun gradient_func = approx_gradient def approx_jacobian(params): fun = approx_derivative(constraint_func, params, method="2-point") return fun jacobian_func = approx_jacobian else: gradient_func = partial( mpec_loglike_cost_params_derivative, maint_func, maint_func_dev, num_states, num_params, disc_fac, scale, decision_mat, state_mat, ) jacobian_func = partial( mpec_constraint_derivative, maint_func, maint_func_dev, num_states, num_params, disc_fac, scale, trans_mat, ) constraints = { "type": "eq", "fun": constraint_func, "jac": jacobian_func, } tic = time.perf_counter() if get_expected_values == "Yes": obs_costs = calc_obs_costs(num_states, maint_func, params, scale) ev = calc_fixp(trans_mat, obs_costs, disc_fac)[0] params = np.concatenate((ev, params)) results_ipopt = minimize_ipopt( neg_criterion, params, bounds=bounds, jac=gradient_func, constraints=constraints, **optimizer_options, ) toc = time.perf_counter() timing = toc - tic mpec_cost_parameters = {} mpec_cost_parameters["x"] = results_ipopt["x"] mpec_cost_parameters["fun"] = results_ipopt["fun"] if results_ipopt["success"] is True: mpec_cost_parameters["status"] = True else: mpec_cost_parameters["status"] = False mpec_cost_parameters["n_iterations"] = results_ipopt["nit"] mpec_cost_parameters["n_evaluations"] = results_ipopt["nfev"] mpec_cost_parameters["time"] = timing mpec_cost_parameters["n_evaluations_total"] = n_evaluations[0] return transition_results, mpec_cost_parameters
def wrap_nfxp_criterion(function): ncalls = [0] def function_wrapper(*wrapper_args, **wrapper_kwargs): ncalls[0] += 1 return function(*wrapper_args, **wrapper_kwargs) return ncalls, function_wrapper