Source code for ruspy.estimation.mpec

"""
This module contains all the key functions used to estimate the model using MPEC.
"""
from functools import partial

import numpy as np
from scipy.optimize._numdiff import approx_derivative

from ruspy.estimation.est_cost_params import like_hood_data
from ruspy.model_code.choice_probabilities import choice_prob_gumbel
from ruspy.model_code.cost_functions import calc_obs_costs


[docs]def mpec_loglike_cost_params( maint_func, maint_func_dev, num_states, num_params, state_mat, decision_mat, disc_fac, scale, gradient, mpec_params, grad, ): """ Calculate the negative partial log likelihood for MPEC depending on cost parameters as well as the discretized expected values. Parameters ---------- maint_func: func see :ref:`maint_func` maint_func_dev: func see :ref:`maint_func` num_states : int The size of the state space. num_params : int Length of cost parameter vector. state_mat : numpy.array see :ref:`state_mat` decision_mat : numpy.array see :ref:`decision_mat` disc_fac : numpy.float see :ref:`disc_fac` scale : numpy.float see :ref:`scale` gradient : str Indicates whether analytical or numerical gradient should be used. mpec_params : numpy.array see :ref:`mpec_params` grad : numpy.array, optional The gradient of the function. The default is np.array([]). Returns ------- log_like: float Contains the negative partial log likelihood for the given parameters. """ if grad.size > 0: if gradient == "No": # numerical gradient partial_loglike_mpec = partial( mpec_loglike_cost_params, maint_func, maint_func_dev, num_states, num_params, state_mat, decision_mat, disc_fac, scale, gradient, grad=np.array([]), ) grad[:] = approx_derivative( partial_loglike_mpec, mpec_params, method="2-point" ) else: # analytical gradient grad[:] = mpec_loglike_cost_params_derivative( maint_func, maint_func_dev, num_states, num_params, disc_fac, scale, decision_mat, state_mat, mpec_params, ) costs = calc_obs_costs(num_states, maint_func, mpec_params[num_states:], scale) p_choice = choice_prob_gumbel(mpec_params[0:num_states], costs, disc_fac) log_like = like_hood_data(np.log(p_choice), decision_mat, state_mat) return float(log_like)
[docs]def mpec_constraint( maint_func, maint_func_dev, num_states, num_params, trans_mat, disc_fac, scale, gradient, result, mpec_params, grad, ): """ Calulate the constraint of MPEC. Parameters ---------- maint_func: func see :ref:`maint_func` maint_func_dev: func see :ref:`maint_func` num_states : int The size of the state space. num_params : int The number of parameters to be estimated. trans_mat : numpy.array see :ref:`trans_mat` disc_fac : numpy.float see :ref:`disc_fac` scale : numpy.float see :ref:`scale` gradient : str Indicates whether analytical or numerical gradient should be used. result : numpy.array Contains the left hand side of the constraint minus the right hand side for the nlopt solver. This should be zero for the constraint to hold. mpec_params : numpy.array see :ref:`mpec_params` grad : numpy.array, optional The gradient of the function. The default is np.array([]). Returns ------- None. """ if grad.size > 0: if gradient == "No": # numerical jacobian partial_constr_mpec_deriv = wrap_nlopt_constraint( mpec_constraint, args=( maint_func, maint_func_dev, num_states, num_params, trans_mat, disc_fac, scale, gradient, ), ) grad[:, :] = approx_derivative( partial_constr_mpec_deriv, mpec_params, method="2-point" ) else: # analytical jacobian grad[:, :] = mpec_constraint_derivative( maint_func, maint_func_dev, num_states, num_params, disc_fac, scale, trans_mat, mpec_params, ) ev = mpec_params[0:num_states] obs_costs = calc_obs_costs(num_states, maint_func, mpec_params[num_states:], scale) maint_value = disc_fac * ev - obs_costs[:, 0] repl_value = disc_fac * ev[0] - obs_costs[0, 1] - obs_costs[0, 0] # Select the minimal absolute value to rescale the value vector for the # exponential function. ev_max = np.max(np.array(maint_value, repl_value)) log_sum = ev_max + np.log( np.exp(maint_value - ev_max) + np.exp(repl_value - ev_max) ) ev_new = np.dot(trans_mat, log_sum) if result.size > 0: result[:] = ev_new - ev return ev_new - ev
[docs]def mpec_loglike_cost_params_derivative( maint_func, maint_func_dev, num_states, num_params, disc_fac, scale, decision_mat, state_mat, mpec_params, ): """ Computing the analytical gradient of the objective function for MPEC. Parameters ---------- maint_func: func see :ref:`maint_func` maint_func_dev: func see :ref:`maint_func` num_states : int The size of the state space. num_params : int Length of cost parameter vector. disc_fac : numpy.float see :ref:`disc_fac` scale : numpy.float see :ref:`scale` decision_mat : numpy.array see :ref:`decision_mat` state_mat : numpy.array see :ref:`state_mat` mpec_params : numpy.array see :ref:`mpec_params` Returns ------- gradient : numpy.array Vector that holds the derivative of the negative log likelihood function to the parameters. """ # Calculate choice probabilities costs = calc_obs_costs(num_states, maint_func, mpec_params[num_states:], scale) p_choice = choice_prob_gumbel(mpec_params[0:num_states], costs, disc_fac) # calculate the derivative based on the model derivative_both = mpec_loglike_cost_params_derivative_model( num_states, num_params, disc_fac, scale, maint_func_dev, p_choice ) # Calculate actual gradient depending on the given data # get decision matrix into the needed shape decision_mat_temp = np.vstack( ( np.tile(decision_mat[0], (num_states + num_params, 1)), np.tile(decision_mat[1], (num_states + num_params, 1)), ) ) # calculate the gradient gradient_temp = -np.sum( decision_mat_temp * np.dot(derivative_both, state_mat), axis=1 ) # bring the calculated gradient into the correct shape gradient = np.reshape(gradient_temp, (num_states + num_params, 2), order="F").sum( axis=1 ) return gradient
[docs]def mpec_constraint_derivative( maint_func, maint_func_dev, num_states, num_params, disc_fac, scale, trans_mat, mpec_params, ): """ Calculating the analytical Jacobian of the MPEC constraint. Parameters ---------- maint_func: func see :ref:`maint_func` maint_func_dev: func see :ref:`maint_func` num_states : int The size of the state space. num_params : int Length of cost parameter vector. disc_fac : numpy.float see :ref:`disc_fac` scale : numpy.float see :ref:`scale` trans_mat : numpy.array see :ref:`trans_mat` mpec_params : numpy.array see :ref:`mpec_params` Returns ------- jacobian : numpy.array Jacobian of the MPEC constraint. """ # Calculate a vector representing 1 divided by the right hand side of the MPEC # constraint ev = mpec_params[0:num_states] obs_costs = calc_obs_costs(num_states, maint_func, mpec_params[num_states:], scale) maint_value = disc_fac * ev - obs_costs[:, 0] repl_value = disc_fac * ev[0] - obs_costs[0, 1] - obs_costs[0, 0] ev_max = np.max(np.array(maint_value, repl_value)) exp_centered_maint_value = np.exp(maint_value - ev_max) exp_centered_repl_value = np.exp(repl_value - ev_max) log_sum_denom = 1 / (exp_centered_maint_value + exp_centered_repl_value) jacobian = np.zeros((num_states, num_states + num_params)) # Calculate derivative to EV(0) jacobian[:, 0] = np.dot( disc_fac * exp_centered_repl_value * trans_mat, log_sum_denom ) jacobian[0, 0] = ( jacobian[0, 0] + (1 - log_sum_denom[0] * exp_centered_repl_value) * disc_fac * trans_mat[0, 0] ) # Calculate derivative to EV(1) until EV(num_states) jacobian[:, 1:num_states] = ( trans_mat[:, 1:] * log_sum_denom[1:] * disc_fac * exp_centered_maint_value[1:] ) # Calculate derivative to RC jacobian[:, num_states] = np.dot( trans_mat, -exp_centered_repl_value * log_sum_denom ) # Calculate derivative to maintenance cost parameters log_sum_denom_temp = np.reshape(log_sum_denom, (num_states, 1)) maint_cost_difference_dev = np.reshape( (-exp_centered_maint_value * maint_func_dev(num_states, scale).T).T - exp_centered_repl_value * maint_func_dev(num_states, scale)[0], (num_states, num_params - 1), ) jacobian[:, num_states + 1 :] = np.reshape( np.dot(trans_mat, log_sum_denom_temp * maint_cost_difference_dev), (num_states, num_params - 1), ) # Calculate the Jacobian of EV ev_jacobian = np.hstack((np.eye(num_states), np.zeros((num_states, num_params)))) jacobian = jacobian - ev_jacobian return jacobian
def mpec_loglike_cost_params_derivative_model( num_states, num_params, disc_fac, scale, maint_func_dev, p_choice ): """ generates the derivative of the log likelihood function of mpec depending on the model characteristics. Parameters ---------- num_states : int The size of the state space. num_params : int Length of cost parameter vector. disc_fac : numpy.float see :ref:`disc_fac` scale : numpy.float see :ref:`scale` maint_func_dev : func see :ref:`maint_func` p_choice : np.array num_states x 2 matrix that contains the calculated conditional choice probabilities. Returns ------- derivative_both : np.array gives out the derivative of the log likelihood function depending on the model characteristics. """ # Create matrix that represents d[V(0)-V(x)]/ d[theta] (depending on x) payoff_difference_derivative = np.zeros((num_states + num_params, num_states)) payoff_difference_derivative[0, 1:] = disc_fac payoff_difference_derivative[1:num_states, 1:] = -disc_fac * np.eye(num_states - 1) payoff_difference_derivative[num_states, :] = -1 payoff_difference_derivative[num_states + 1 :, :] = ( -maint_func_dev(num_states, scale)[0] + maint_func_dev(num_states, scale) ).T # Calculate derivative depending on whether d is 0 or 1 derivative_d0 = -payoff_difference_derivative * p_choice[:, 1] derivative_d1 = payoff_difference_derivative * p_choice[:, 0] derivative_both = np.vstack((derivative_d0, derivative_d1)) return derivative_both def wrap_mpec_loglike(args): ncalls = [0] def function_wrapper(*x0): ncalls[0] += 1 return mpec_loglike_cost_params(*(args + x0)) return ncalls, function_wrapper def wrap_nlopt_likelihood(function, args): def function_wrapper(mpec_params, grad): result = function(mpec_params, *args, grad) return result return function_wrapper def wrap_nlopt_constraint(function, args): def function_wrapper(mpec_params): result = function( result=np.array([]), mpec_params=mpec_params, *args, grad=np.array([]) ) return result return function_wrapper def wrap_ipopt_likelihood(function, args): ncalls = [0] def function_wrapper(mpec_params): ncalls[0] += 1 return function( *args, gradient=None, mpec_params=mpec_params, grad=np.array([]) ) return ncalls, function_wrapper def wrap_ipopt_constraint(function, args): def function_wrapper(mpec_params): result = function( *args, gradient=None, result=np.array([]), mpec_params=mpec_params, grad=np.array([]), ) return result return function_wrapper