Source code for rtctools_hydraulic_structures.pumping_station_mixin

import re
import numpy as np
import pickle
import logging
import sys
import os
from numpy import inf
from casadi import SX, MX, vertcat, MXFunction, NlpSolver, jacobian, hessian, det, substitute, trace
from rtctools.optimization.timeseries import Timeseries
from rtctools.optimization.optimization_problem import OptimizationProblem
from rtctools.optimization.goal_programming_mixin import Goal
from abc import abstractmethod
from .util import _ObjectParameterWrapper

logger = logging.getLogger("rtctools")


[docs]class Pump(_ObjectParameterWrapper): """ Python Pump object as an interface to the :cpp:class:`~Deltares::HydraulicStructures::PumpingStation::Pump` object in the model. """ def __init__(self, optimization_problem, symbol): super(Pump, self).__init__(optimization_problem) self.optimization_problem = optimization_problem self.symbol = symbol
[docs] def discharge(self): """ Get the state corresponding to the pump discharge. :returns: `MX` expression of the pump discharge. """ # TODO: We would rather use self.symbol + ".Q" as the control # variable, but only top level input variables are allowed. We # therefore use the convention that a symbol exists where all dots are # replaced with underscores. return self.optimization_problem.state(self.symbol.replace('.', '_') + '_Q')
[docs] def head(self): """ Get the state corresponding to the pump head. This depends on the ``head_option`` that was specified by the user. :returns: `MX` expression of the pump head. """ head_option = int(self.optimization_problem.parameters(0)[self.symbol + '.head_option']) if head_option == -1: return self.optimization_problem.state(self.symbol + '.HQUp.H') elif head_option == 0: return (self.optimization_problem.state(self.symbol + '.HQDown.H') - self.optimization_problem.state(self.symbol + '.HQUp.H')) elif head_option == 1: return self.optimization_problem.state(self.symbol + '.HQDown.H') else: raise ValueError( "A pump's head_option should be either -1, 0 or 1. Found a value of '{}'.".format(head_option))
[docs]class Resistance(_ObjectParameterWrapper): """ Python Resistance object as an interface to the :cpp:class:`~Deltares::HydraulicStructures::PumpingStation::Resistance` object in the model. """ def __init__(self, optimization_problem, symbol): super(Resistance, self).__init__(optimization_problem) self.optimization_problem = optimization_problem self.symbol = symbol
[docs] def discharge(self): """ Get the state corresponding to the discharge through the resistance. :returns: `MX` expression of the discharge. """ return self.optimization_problem.state(self.symbol + '.HQUp.Q')
[docs] def head_loss(self): """ Get the state corresponding to the head loss over the resistance. :returns: `MX` expression of the head loss. """ # Can't we use the dot notation instead, as the two are equated in the # Modelica model anyway? return self.optimization_problem.state(self.symbol.replace('.', '_') + '_dH')
[docs]class PumpingStation(_ObjectParameterWrapper): """ Python PumpingStation object as an interface to the :cpp:class:`~Deltares::HydraulicStructures::PumpingStation::PumpingStation` object in the model. """
[docs] def __init__(self, optimization_problem, symbol, pump_symbols=None, **kwargs): """ Initialize the pumping station object. :param optimization_problem: :py:class:`~rtctools.optimization.optimization_problem.OptimizationProblem` instance. :param symbol: Symbol name of the pumping station in the model. :param pump_symbols: Symbol names of the pumps in the pumping station. """ super(PumpingStation, self).__init__(optimization_problem) self.optimization_problem = optimization_problem self.symbol = symbol # NOTE: We use pump symbols to guarantee the order in which we process # the pumps. This is important for e.g. the pump switching matrix, # where we need to know what row represents what pump. self.pump_symbols = pump_symbols self._pumps = None self._resistances = None
[docs] def pumps(self): """ Get a list of :py:class:`Pump` objects that are part of this pumping station in the model. :returns: List of :py:class:`Pump` objects. """ if self._pumps is None: if self.pump_symbols is None: # TODO: Until we are able to guarantee an order in Modelica, we can only come here # if the pump switching matrix is all zeros. matrix = self.pump_switching_matrix if not np.all(matrix == 0): raise Exception("Automatic finding of pumps not allowed with non-zero switching matrix.") _pump_symbols = set() for x in self.optimization_problem.parameters(0).keys(): m = re.search(r'({}\..+?)\.working_area\['.format(self.symbol), x) if m is None: continue else: _pump_symbols.add(m.group(1)) self.pump_symbols = list(_pump_symbols) self._pumps = list((Pump(self.optimization_problem, x) for x in self.pump_symbols)) return self._pumps
[docs] def resistances(self): """ Get a list of :py:class:`Resistance` objects that are part of this pumping station in the model. :returns: List of :py:class:`Resistance` objects. """ if self._resistances is None: _resist_symbols = set() for x in self.optimization_problem.parameters(0).keys(): # TODO: Isn't there a better way to find these components # instead of looking for some type of signature (which can # change). m = re.search(r'({}\..+?)\.C'.format(self.symbol), x) if m is None: continue else: _resist_symbols.add(m.group(1)) self._resistances = list((Resistance(self.optimization_problem, x) for x in _resist_symbols)) return self._resistances
@property def pump_switching_matrix(self): # TODO: Move default values to Modelica, and delete this property # method (i.e. let it be handled automatically by __getattr__) # TODO: For some reason using super() does not work. Why? matrix = _ObjectParameterWrapper.__getattr__(self, 'pump_switching_matrix') # FIXME: Detect placeholder array for JModelica workaround if np.all(matrix == -999): matrix = np.tril(np.ones(matrix.shape), -1) for i in range(matrix.shape[0]): matrix[i, i] = -1 * sum(matrix[i, :]) # Only lower triangle matrices are allowed if not (np.tril(matrix) == matrix).all(): raise Exception("Switching matrices may only contain a non-zeros in the lower triangle.") return matrix @property def pump_switching_constraints(self): # TODO: Move default values to Modelica, and delete this property # method (i.e. let it be handled automatically by __getattr__) constraints = _ObjectParameterWrapper.__getattr__(self, 'pump_switching_constraints') # FIXME: Detect placeholder array for JModelica workaround if np.all(constraints == -999): constraints = np.transpose([np.zeros(self.n_pumps), range(int(self.n_pumps))]) return constraints
class _MinimizePowerGoal(Goal): def __init__(self, optimization_problem, *args, **kwargs): super(_MinimizePowerGoal, self).__init__(*args, **kwargs) energy_price = optimization_problem.get_timeseries('energy_price').values avg_energy_price = sum(energy_price) / len(energy_price) # For maximum power of each pump we use the (overestimated) big M avg_pump_powers = [float(max_power/2) for _, max_power in optimization_problem._pump_power_range_on.values()] total_avg_pump_power = sum(avg_pump_powers) nominal_instantaneous = total_avg_pump_power * avg_energy_price self.function_nominal = nominal_instantaneous * (optimization_problem.times()[-1] - optimization_problem.times()[0]) def function(self, o, ensemble_member): costs = 0.0 times = o.times() for ps in o.pumping_stations(): for p in ps.pumps(): for ts, tf in zip(times[:-1], times[1:]): tstep = tf - ts # TODO: Not pretty to use the same formatting again # Pump power costs += tstep * o.state_at('{}__power'.format(p.symbol), tf) * o.timeseries_at('energy_price', tf) # Start-up energy if p.start_up_energy > 0.0: costs += tstep * o.state_at('{}__switched_on'.format(p.symbol), tf) \ * p.start_up_energy \ * o.timeseries_at('energy_price', tf) # Fixed start-up costs (other than energy) if p.start_up_cost > 0.0: costs += tstep * o.state_at('{}__switched_on'.format(p.symbol), tf) \ * p.start_up_cost # Shut-down energy if p.shut_down_energy > 0.0: costs += tstep * o.state_at('{}__switched_off'.format(p.symbol), tf) \ * p.shut_down_energy \ * o.timeseries_at('energy_price', tf) # Fixed shut-down costs (other than energy) if p.shut_down_cost > 0.0: costs += tstep * o.state_at('{}__switched_off'.format(p.symbol), tf) \ * p.shut_down_cost return costs function_range = (0, 1) priority = 999 order = 1 class _InvalidCacheError(Exception): pass
[docs]class PumpingStationMixin(OptimizationProblem): """ Adds handling of PumpingStation objects in your model to your optimization problem. Relevant parameters and variables are read from the model, and from this data a set of constraints and objectives are automatically generated to minimize cost. """ # TODO: Vijzels are different in that the pump head is just the upstream # head, and the discharge/working area is a non-smooth function that does # not fit a polynomial well. How to handle these? _pumping_station_mx_path_variables = set() # In the post() routine we check if the non-linear equality constraints # (e.g. pump power and resistance head loss) minimized to their equality # constraint. A warning is raised if the relative error exceeds this value: _ineq_relative_error = 0.001 def __init__(self, *args, **kwargs): super(PumpingStationMixin, self).__init__(*args, **kwargs) # We also want to output a bunch of variables that are calculated in # post(), which we return in the extract_results() call. We store them # in this dictionary. self.__additional_results = {} assert 'model_folder' in kwargs self.__model_folder = kwargs['model_folder'] self._hq_subproblem_cache = {} self._hq_subproblem_cache_path = os.path.join(kwargs['model_folder'], '_hq_subproblem_cache.pickle') def pre(self): super(PumpingStationMixin, self).pre() # We cannot handle non-equidistant timesteps yet tsteps = np.diff(self.times()) if not np.all(tsteps == tsteps[0]): raise Exception("Timesteps need to be equidistant.") # TODO: Is a reset necessary? It is a bit double with the statements above. self._pumping_station_mx_path_variables = set() self._pump_discrete_symbols = set() self._pump_status_bounds = {} self._pump_power_bounds = {} self._pump_discharge_bounds = {} self._pump_status_pairs = {} self._pump_power_range_on = {} self._pump_working_area_head_range = {} self._pump_extended_working_area_head_range = {} self._head_range_up = {} self._head_range_down = {} self._head_range_diff = {} # Uses the same mapping as Pump.head_option for easy access self._head_range = {-1: self._head_range_up, 0: self._head_range_diff, 1: self._head_range_down} # Check validity of HQ subproblem cache. We check if _any_ file other # than ourself in the model folder is newer. try: # Calling getmtime() on a file that does not exist returns a vague # platform varying Exception. Try to prevent that from happening. if not os.path.exists(self._hq_subproblem_cache_path): raise _InvalidCacheError("Cache file does not exist") cache_mtime = os.path.getmtime(self._hq_subproblem_cache_path) cache_abspath = os.path.abspath(self._hq_subproblem_cache_path) for root, dir, files in os.walk(self.__model_folder): for f in files: f_abspath = os.path.abspath(os.path.join(root, f)) if cache_abspath != f_abspath and os.path.getmtime(f_abspath) > cache_mtime: raise _InvalidCacheError("Cache no longer valid") with open(self._hq_subproblem_cache_path) as f: self._hq_subproblem_cache = pickle.load(f) except _InvalidCacheError: self._hq_subproblem_cache = {} # Automatic deriviation of maximum head over pumping station. Note # that it is possible that different pumps use different head # definitions (up, down, diff) for the maximum head of the enclosing # pumping station. for ps in self.pumping_stations(): symbol_up = ps.symbol + ".HQUp.H" symbol_down = ps.symbol + ".HQDown.H" # Use bounds if specified, otherwise try deriving bounds from time series for hr, s in [(self._head_range_up, symbol_up), (self._head_range_down, symbol_down)]: m, M = self.bounds().get(s, [None, None]) try: # Find the actual symbol (due to aliasing) mapped_symbol = self.variable(s).getName() ts = self.get_timeseries(mapped_symbol) if m is None or not np.isfinite(m): # Attempt to avoid using time series used as initial conditions only assert not any(np.isnan(ts.values)) m = min(ts.values) logger.info('Using minimum value "{}" in time series "{}" as lower bound for "{}".'.format( m, mapped_symbol, s)) if M is None or not np.isfinite(M): # Attempt to avoid using time series used as initial conditions only assert not any(np.isnan(ts.values)) M = max(ts.values) logger.info('Using maximum value "{}" in time series "{}" as upper bound for "{}".'.format( M, mapped_symbol, s)) except KeyError: # Time series does not exist pass if m is None or M is None or not np.isfinite(m) or not np.isfinite(M): raise Exception( "Specify (finite) bounds or time series for '{}', currently found {}".format(s, (m, M))) hr[ps.symbol] = (m, M) self._head_range_diff[ps.symbol] = [a - b for a, b in zip(self._head_range_down[ps.symbol], reversed(self._head_range_up[ps.symbol]))] # Automatic derivation of discharge and power if not specified by user for ps in self.pumping_stations(): for p in ps.pumps(): discharge_sym = p.symbol.replace('.', '_') + "_Q" head_sym = p.symbol + "_head" power_sym = '{}__power'.format(p.symbol) # Find the maximum discharge and head in the working area Q = MX.sym('Q') H = MX.sym('H') hr = self._head_range[p.head_option][ps.symbol] _, (_, q_max) = self._solve_working_area_subproblem( p.working_area, p.working_area_direction, hr, H, Q, -1 * Q) self._pump_discharge_bounds[discharge_sym] = (0.0, q_max) _, (h_max, _) = self._solve_working_area_subproblem( p.working_area, p.working_area_direction, hr, H, Q, -1 * H) self._pump_working_area_head_range[head_sym] = (0.0, h_max) # TODO: determine h_min, or do we count on it being 0.0? _, (h_max, _) = self._solve_working_area_subproblem( p.working_area, p.working_area_direction, hr, H, Q, -1 * H, 0) _, (h_min, _) = self._solve_working_area_subproblem( p.working_area, p.working_area_direction, hr, H, Q, H, 0) self._pump_extended_working_area_head_range[head_sym] = (h_min, h_max) # Lower power bound (when on) power = 0.0 coeffs = p.power_coefficients for i in range(coeffs.shape[0]): for j in range(coeffs.shape[1]): power += coeffs[i, j] * H**i * Q**j min_power, _ = self._solve_working_area_subproblem( p.working_area, p.working_area_direction, hr, H, Q, power) # For a monotonically increasing convex function we can find # the maximum by checking all vertices of the polygon. We do # this not on the working area (which is not a polygon), but # on an enclosing square. max_power = 0.0 for h, q in [(0, 0), (h_max, 0), (0, q_max), (h_max, q_max)]: power = 0.0 for i in range(coeffs.shape[0]): for j in range(coeffs.shape[1]): power += coeffs[i, j] * h**i * q**j max_power = max(power, max_power) self._pump_power_bounds[power_sym] = (0.0, max_power) self._pump_power_range_on[power_sym] = (min_power, max_power) # Convexity and increasing-with-H check of power coefficients Q = SX.sym('Q') H = SX.sym('H') X = vertcat([Q, H]) for ps in self.pumping_stations(): for p in ps.pumps(): discharge_sym = p.symbol.replace('.', '_') + "_Q" head_sym = p.symbol + "_head" power = 0.0 coeffs = p.power_coefficients for i in range(coeffs.shape[0]): for j in range(coeffs.shape[1]): power += coeffs[i, j] * H**i * Q**j # Check if power is increasing with H (only necessary if there are resistances) if ps.resistances(): sx_jac = jacobian(power, H) sx_hess = hessian(sx_jac, X)[0] sx_determinant = det(sx_hess) sx_trace = trace(sx_hess) # CasADi returns NaN if the expression is still a function of H and/or Q determinant = float(sx_determinant) trace_calculated = float(sx_trace) if np.isnan(determinant): logger.warning('Cannot determine monotonicity in H of power of pump "{}".'.format(p.symbol)) elif determinant < 0.0 or trace_calculated < 0.0: # Concave function of which we are trying to find the minimum --> use enclosing rectangle h_min, h_max = self._pump_working_area_head_range[head_sym] q_min, q_max = self._pump_discharge_bounds[discharge_sym] min_jac = np.inf for h, q in [(h_min, q_min), (h_max, q_min), (h_min, q_max), (h_max, q_max)]: cur_jac = float(substitute(substitute(sx_jac, H, h), Q, q)) min_jac = min(cur_jac, min_jac) if min_jac < 0.0: logger.warning( 'Power of pump "{}" likely not increasing with H in working area.'.format(p.symbol)) else: hr = self._head_range[p.head_option][ps.symbol] # We require convexity on the working area (i.e. when pump is on) minimum_jac, _ = self._solve_working_area_subproblem( p.working_area, p.working_area_direction, hr, H, Q, sx_jac) if minimum_jac < 0.0: logger.error( 'Power of pump "{}" is not increasing with H in working area.'.format(p.symbol)) # Convexity check: sx_hess = hessian(power, X)[0] sx_determinant = det(sx_hess) sx_trace = trace(sx_hess) # CasADi returns NaN if the expression is still a function of H and/or Q determinant = float(sx_determinant) trace_calculated = float(sx_trace) if (not np.isnan(determinant) and determinant < 0.0) or ( not np.isnan(trace_calculated) and trace_calculated < 0.0): logger.error('Non-convex power relationship specified for pump "{}".'.format(p.symbol)) elif np.isnan(determinant): # The determinant is an expression of H and Q. Check if it # is a convex expression, and if so, find the minimum. sx_det_hess = hessian(sx_determinant, X)[0] sx_det_determinant = det(sx_det_hess) det_determinant = float(sx_det_determinant) if not np.isnan(det_determinant): hr = self._head_range[p.head_option][ps.symbol] # We require convexity on the expanded working area, i.e when pump is off minimum_determinant, _ = self._solve_working_area_subproblem( p.working_area, p.working_area_direction, hr, H, Q, sx_determinant, 0) minimum_trace, _ = self._solve_working_area_subproblem( p.working_area, p.working_area_direction, hr, H, Q, sx_trace, 0) if minimum_determinant < 0.0 or minimum_trace < 0.0: logger.error( 'Power for pump "{}" is not convex over extended working area'.format(p.symbol)) # For correct maximum power estimation, we also require # convexity on the rectangular region: # Q \in [0, max_q] # H \in [0, max_h] # where max_q and max_h are the maximum discharge # and pump head when the pump is _on_. bnds = {Q.getName(): self._pump_discharge_bounds[discharge_sym], H.getName(): self._pump_working_area_head_range[head_sym]} minimum_determinant, _ = self._solve_hq_subproblem(H, Q, sx_determinant, bounds=bnds) minimum_trace, _ = self._solve_hq_subproblem(H, Q, sx_trace, bounds=bnds) if minimum_determinant < 0.0 or minimum_trace < 0.0: logger.error( 'Power for pump "{}" is not convex over max. head/discharge range.'.format(p.symbol)) else: logger.warning('Cannot detect convexity of power coefficients of pump "{}".'.format(p.symbol)) else: # Positive determinant --> convex continue for ps in self.pumping_stations(): # Add discharge symbols for each of the pumps in this pumping station # TODO: Add on/off + switched on/off symbols for each pump as well. # Make sure that this is accompanied by also adding constraints on # pump 2 only being able to switch on (or be on?) when pump 1 is on. for p in ps.pumps(): # Define symbol names status_sym = '{}__status'.format(p.symbol) sw_on_sym = '{}__switched_on'.format(p.symbol) sw_off_sym = '{}__switched_off'.format(p.symbol) power_sym = '{}__power'.format(p.symbol) # Store variable names self._pump_discrete_symbols.add(status_sym) self._pump_discrete_symbols.add(sw_on_sym) self._pump_discrete_symbols.add(sw_off_sym) # Store bounds self._pump_status_bounds[status_sym] = (0, 1) self._pump_status_bounds[sw_off_sym] = (0, 1) self._pump_status_bounds[sw_off_sym] = (0, 1) # Generate and store MX variables status_mx = MX.sym(status_sym) sw_on_mx = MX.sym(sw_on_sym) sw_off_mx = MX.sym(sw_off_sym) power_mx = MX.sym(power_sym) self._pumping_station_mx_path_variables.add(status_mx) self._pumping_station_mx_path_variables.add(sw_on_mx) self._pumping_station_mx_path_variables.add(sw_off_mx) self._pumping_station_mx_path_variables.add(power_mx) # Store all symbols together self._pump_status_pairs[p.symbol] = (status_sym, sw_on_sym, sw_off_sym, power_sym) # Store cache to disk with open(self._hq_subproblem_cache_path, 'w') as f: pickle.dump(self._hq_subproblem_cache, f) @abstractmethod
[docs] def pumping_stations(self): """ User problem returns list of :class:`PumpingStation` objects. :returns: A list of pumping stations. """ raise NotImplementedError()
def constraints(self, ensemble_member): constraints = super(PumpingStationMixin, self).constraints(ensemble_member) for ps in self.pumping_stations(): for p in ps.pumps(): status_sym, sw_on_sym, sw_off_sym, _ = self._pump_status_pairs[p.symbol] for t_m1, t in zip(self.times()[:-1], self.times()[1:]): d_t = self.state_at(status_sym, t) d_tm1 = self.state_at(status_sym, t_m1) d_diff = d_t - d_tm1 x = self.state_at(sw_on_sym, t) y = self.state_at(sw_off_sym, t) # x is 1 if and only if pump switched on (else 0) constraints.append((d_t - x, 0, inf)) constraints.append((x - d_diff, 0, inf)) constraints.append((1 - d_tm1 - x, 0, inf)) # y is 1 if and only if pump switched off (else 0) constraints.append((d_tm1 - y, 0, inf)) constraints.append((y + d_diff, 0, inf)) constraints.append((1 - d_t - y, 0, inf)) # TODO: Minimum on and minimum off constraints currently # assume equidistant time steps, and the unit of # "minimum_on/off" is then the number of time steps. Allow for # non-equidistant time steps, and units of an hour. tstep = self.times()[1] - self.times()[0] if p.minimum_on > 0.0: min_on = int(p.minimum_on / tstep) # TODO: Use history data to enforce minimum pump on time at start of # optimization interval. num_tsteps = len(self.times()) for i in range(num_tsteps): sum_on = 0.0 num_tsteps_incl = min(num_tsteps - i, min_on) for j in range(num_tsteps_incl): sum_on += self.state_at(status_sym, self.times()[i+j]) constraints.append((sum_on - num_tsteps_incl * self.state_at(sw_on_sym, self.times()[i]), 0, inf)) if p.minimum_off > 0.0: min_off = int(p.minimum_on / tstep) num_tsteps = len(self.times()) for i in range(num_tsteps): sum_off = 0.0 num_tsteps_incl = min(num_tsteps - i, min_off) for j in range(num_tsteps_incl): sum_off += (1 - self.state_at(status_sym, self.times()[i+j])) constraints.append((sum_off - num_tsteps_incl * self.state_at(sw_off_sym, self.times()[i]), 0, inf)) return constraints def _solve_hq_subproblem(self, H, Q, f, constraints=None, bounds=None): # Caching of the results of this function. SWIG object cannot be # pickled directly, so isntead we just convert everything to a string. pickle_key = (str(H), str(Q), str(f), str(constraints), str(bounds)) try: return self._hq_subproblem_cache[pickle_key] except KeyError: pass # Discharge of pump is always positive if bounds is None: bounds = {Q.getName(): (0.0, np.inf), H.getName(): (-np.inf, np.inf)} if constraints is None: g, lbg, ubg = map(vertcat, ([], [], [])) else: g, lbg, ubg = map(vertcat, zip(*constraints)) # State vector X = vertcat([H, Q]) # Bounds lbx = vertcat([bounds[x.getName()][0] for x in X]) ubx = vertcat([bounds[x.getName()][1] for x in X]) nlp = {'f': f, 'g': g, 'x': X} options = {'print_time': 0, 'print_level': 0} solver = NlpSolver('nlp', 'ipopt', nlp, options) solver.setInput(lbx, 'lbx') solver.setInput(ubx, 'ubx') solver.setInput(lbg, 'lbg') solver.setInput(ubg, 'ubg') solver.setInput(vertcat([0, 0]), 'x0') solver.evaluate() objective_value = float(solver.getOutput('f')[0]) solver_output = solver.getOutput('x') self._hq_subproblem_cache[pickle_key] = objective_value, solver_output return objective_value, solver_output def _solve_working_area_subproblem(self, working_area, working_area_direction, head_range, H, Q, f, pump_status=1): constraints = self._working_area_constraints( working_area, working_area_direction, head_range, H, Q, pump_status) return self._solve_hq_subproblem(H, Q, f, constraints) def _working_area_constraints(self, working_area, working_area_direction, head_range, head, discharge, status): constraints = [] for poly, direction in zip(working_area, working_area_direction): # When the pump is off, we increasing the working area to # include all possible points on the H-axis. We calculate # the minimum needed offset to accomplish this. offset_min_h = 0.0 offset_max_h = 0.0 constr_f = 0.0 for i in range(poly.shape[0]): offset_min_h += head_range[0]**i * poly[i, 0] offset_max_h += head_range[1]**i * poly[i, 0] for j in range(poly.shape[1]): constr_f += poly[i, j] * head**i * discharge**j # TODO: Maybe compensate with more than exactly what is # needed, e.g. with 130% of the calculated offset. For # example, when we have a straight vertical line saying Q # > 0.2 m3/s, we might not want to shift the line to # exactly Q = 0 m3/s when the pump is off. To make it # easier for the mixed integer optimizer to find a good # solution, we probably want Q = 0 m3/s to already be an # acceptable solution for something like status = 0.2. # Apply the working area changes, but only if it increases # the working area size. We do not want to shrink it, even # if we hypothetically could, as we would rather keep the # constraints constant in that case. if np.sign(direction) != np.sign(offset_min_h) and \ np.sign(direction) != np.sign(offset_max_h): # We are violating both the lowest H-value as well as # the highest H-value when the pump is off. We have to # compensate only for the largest difference. max_offset = np.sign(offset_min_h) * np.max(np.abs([offset_min_h, offset_max_h])) constr_f -= (1 - status) * max_offset elif np.sign(direction) != np.sign(offset_min_h): constr_f -= (1 - status) * offset_min_h elif np.sign(direction) != np.sign(offset_max_h): constr_f -= (1 - status) * offset_max_h if direction == -1: constraints.append((constr_f, -inf, 0.0)) elif direction == 1: constraints.append((constr_f, 0.0, inf)) else: raise Exception( "Working area polynomial needs a direction of 1 or -1, but got {}".format(direction)) return constraints def path_constraints(self, ensemble_member): constraints = super(PumpingStationMixin, self).path_constraints(ensemble_member) for ps in self.pumping_stations(): for p in ps.pumps(): status_sym, _, _, power_sym = self._pump_status_pairs[p.symbol] discharge_sym = p.symbol.replace('.', '_') + "_Q" status = self.state(status_sym) hr = self._head_range[p.head_option][ps.symbol] constraints.extend(self._working_area_constraints( p.working_area, p.working_area_direction, hr, p.head(), p.discharge(), status)) # Power calculation which we need for optimization/minimization and constraints. power = 0.0 coeffs = p.power_coefficients for i in range(coeffs.shape[0]): for j in range(coeffs.shape[1]): # TODO: Is it better if we simplify here directly (e.g. x^0 = 1, x^1 = x) power += coeffs[i, j] * p.head()**i * p.discharge()**j m, M = self._pump_power_range_on[power_sym] # NOTE: Inequality constraint for power, as an equality constraint would have to be affine constraints.append((self.state(power_sym) - m * status, 0.0, inf)) constraints.append((self.state(power_sym) - M * status, -inf, 0.0)) constraints.append((self.state(power_sym) - (power - M * (1 - status)), 0.0, inf)) # Pump needs to always have a positive discharge constraints.append((p.discharge(), 0.0, inf)) # Pump needs to have zero discharge when off _, q_max = self._pump_discharge_bounds[discharge_sym] constraints.append((p.discharge() - (status * q_max), -inf, 0.0)) # To handle pump switching constraints easily, we make a vector of # the status symbols of all pumps. switch_matrix = ps.pump_switching_matrix switch_constraints = ps.pump_switching_constraints pump_status_vector = [] for p in ps.pumps(): pump_status_vector.append(self.state(self._pump_status_pairs[p.symbol][0])) for i in range(switch_matrix.shape[0]): if any(switch_matrix[i, :] != 0.0): # A pump can only be on when it is allowed to be according to the pump switching matrix # TODO: We can be multiplying by zero here. Is that worse than not multiplying at all? constraints.append((sum(np.multiply(switch_matrix[i, :], pump_status_vector)), switch_constraints[i, 0], switch_constraints[i, 1])) for r in ps.resistances(): C = r.C if C > 0.0: constraints.append((r.head_loss() - C * r.discharge()**2, 0.0, inf)) # To force the head loss to go to zero, we need an upper bound as well. _, max_head_loss = self._head_range[0][ps.symbol] q_max_dh = (max_head_loss / C)**0.5 constraints.append((max_head_loss / q_max_dh * r.discharge() - r.head_loss(), 0.0, inf)) return constraints @property def path_variables(self): variables = super(PumpingStationMixin, self).path_variables variables.extend(self._pumping_station_mx_path_variables) return variables def goals(self): goals = super(PumpingStationMixin, self).goals() goals.append(_MinimizePowerGoal(self)) return goals def bounds(self): bounds = super(PumpingStationMixin, self).bounds() bounds.update(self._pump_status_bounds) bounds.update(self._pump_power_bounds) bounds.update(self._pump_discharge_bounds) return bounds def variable_is_discrete(self, variable): if variable in self._pump_discrete_symbols: return True else: return super(PumpingStationMixin, self).variable_is_discrete(variable) def post(self): results = self.extract_results() times = self.times() # TODO: If we put the calculated pump head and discharge in the # extract_results() dictionary, should we maybe move the calculation # of these time series to priority_completed() in case that exists (or # invalidate cached results with every optimize() call)? It might be # useful for debugging of intermediate results. # Extract the pump head and pump discharge from the results. path_expressions = [] for ps in self.pumping_stations(): for p in ps.pumps(): path_expressions.append(p.head()) path_expressions.append(p.discharge()) expression = self.map_path_expression(vertcat(path_expressions), 0) f = MXFunction('f', [self.solver_input], [expression]) evaluated_path_expressions = f([self.solver_output])[0] evaluated_path_expressions = np.array(evaluated_path_expressions) # Append pump head and discharge to the results dictionary and # time series export. idx = 0 for ps in self.pumping_stations(): for p in ps.pumps(): head_ts = Timeseries(times, evaluated_path_expressions[:, idx]) head_key = "{}_{}".format(p.symbol, "head") self.set_timeseries(head_key, head_ts, output=True) self.__additional_results[head_key] = head_ts.values idx += 1 discharge_ts = Timeseries(times, evaluated_path_expressions[:, idx]) discharge_key = "{}_{}".format(p.symbol, "discharge") self.set_timeseries(discharge_key, discharge_ts, output=True) self.__additional_results[discharge_key] = discharge_ts.values idx += 1 # Check if the inequality constraints of pump power and resistance # head loss have been succesfully minimized to equality. results = self.extract_results() for ps in self.pumping_stations(): for p in ps.pumps(): head_realised = results[p.symbol + "_head"][1:] discharge_realised = results[p.symbol + "_discharge"][1:] power_realised = results[p.symbol + "__power"][1:] status_realised = results[p.symbol + "__status"][1:] power_target = power_realised * 0.0 coeffs = p.power_coefficients for i in range(coeffs.shape[0]): for j in range(coeffs.shape[1]): power_target += coeffs[i, j] * head_realised**i * discharge_realised**j * status_realised power_error = abs(sum(abs(power_target - power_realised)) / max(sum(power_target), sys.float_info.min)) if power_error > self._ineq_relative_error: logger.warning('Relative power error of {} in pump "{}"'.format(power_error, p.symbol)) for r in ps.resistances(): C = r.C head_loss_realised = results[r.symbol + ".dH"][1:] discharge_realised = results[r.symbol + ".HQUp.Q"][1:] head_loss_target = C * discharge_realised**2 head_loss_error = abs(sum(abs(head_loss_target - head_loss_realised)) / max(sum(head_loss_target), sys.float_info.min)) if head_loss_error > self._ineq_relative_error: logger.warning('Relative head loss error of {} in resistance "{}"'.format( head_loss_error, r.symbol)) # NOTE: If we call super() first, adding output time series with # set_time series has no effect, as e.g. PIMIxin/CSVMixin have already # written their export file. That is why we do it at the end instead. super(PumpingStationMixin, self).post() def extract_results(self, *args, **kwargs): results = super(PumpingStationMixin, self).extract_results(*args, **kwargs) results.update(self.__additional_results) return results
[docs]def plot_operating_points(optimization_problem, output_folder, plot_expanded_working_area=True): """ Plot the working area of each pump with its operating points. """ import matplotlib.pyplot as plt import matplotlib.lines as mlines for i, ps in enumerate(optimization_problem.pumping_stations()): for j, p in enumerate(ps.pumps()): plt.clf() # For the head range, we take the extremes of the head over the # pump encountered during optimization, and the maximum head # inside the working area. hrange = optimization_problem._head_range[p.head_option][ps.symbol] hrange = [float(x) for x in hrange] # Convert DMatrix to float head_sym = p.symbol + "_head" if plot_expanded_working_area: hrange_wa = optimization_problem._pump_extended_working_area_head_range[head_sym] else: hrange_wa = optimization_problem._pump_working_area_head_range[head_sym] hrange_wa = [float(x) for x in hrange_wa] # Convert DMatrix to float hrange = [min(hrange[0], hrange_wa[0]), max(hrange[1], hrange_wa[1])] discharge_sym = p.symbol.replace('.', '_') + "_Q" qrange = optimization_problem._pump_discharge_bounds[discharge_sym] qrange = [float(x) for x in qrange] # Convert DMatrix to float # For the lines, use a little bit wider range for both H and Q extra_space = 0.25 * (qrange[1] - qrange[0]) qs_range = (qrange[0] - extra_space, qrange[1] + extra_space) extra_space = 0.25 * (hrange[1] - hrange[0]) hs_range = (hrange[0] - extra_space, hrange[1] + extra_space) qs = np.linspace(*qs_range) hs = np.linspace(*hs_range)[:, None] # For the x and y limits we use slightly less extra space. This is # to make sure that the contour lines go all the way to the edge # of our plots. extra_space = 0.1 * (qrange[1] - qrange[0]) qplot_range = (qrange[0] - extra_space, qrange[1] + extra_space) extra_space = 0.1 * (hrange[1] - hrange[0]) hplot_range = (hrange[0] - extra_space, hrange[1] + extra_space) plt.xlim(*qplot_range) plt.ylim(*hplot_range) # Plot lines for the horizontal and vertical axes plt.axhline(0, color='black', zorder=1) plt.axvline(0, color='black', zorder=1) wa = p.working_area wa_dir = p.working_area_direction # Plot the working area for w in range(len(wa)): constraints = optimization_problem._working_area_constraints( wa[w:w+1], wa_dir[w:w+1], hrange, hs, qs, 1) plt.contour(qs, hs.ravel(), constraints[0][0], [0], colors='b') if plot_expanded_working_area: constraints = optimization_problem._working_area_constraints( wa[w:w + 1], wa_dir[w:w+1], hrange, hs, qs, 0) plt.contour(qs, hs.ravel(), constraints[0][0], [0], colors='g', linestyles='dashed') plt.plot([0, 0], list(hrange), 'yo', ms=6, label="Head range") results = optimization_problem.extract_results() plt.plot(results[discharge_sym][1:], results[head_sym][1:], 'r+', markeredgewidth=2, label="Operating points") # Manually add legend entries for the working area(s), because contour plots do not handle that automatically handles, _ = plt.gca().get_legend_handles_labels() handles.append(mlines.Line2D([], [], color='b', label='Working area')) if plot_expanded_working_area: handles.append(mlines.Line2D([], [], color='g', linestyle='--', label='Extended working area')) plt.legend(handles=handles) plt.xlabel(r'Discharge [$\mathdefault{m^3\!/s}$]') plt.ylabel(r'Head [$\mathdefault{m}$]') f = plt.gcf() f.set_size_inches(8, 6) f.tight_layout() plt.grid(True) fname = 'QHP_{}.png'.format(p.symbol.replace('.', '_')) fname = os.path.join(output_folder, fname) plt.savefig(fname, bbox_inches='tight', pad_inches=0.1)