Source code for cadbiom.models.clause_constraints.mcl.CLUnfolder

# -*- coding: utf-8 -*-
## Filename    : CLUnfolder.py
## Author(s)   : Michel Le Borgne
## Created     : 22 mars 2012
## Revision    :
## Source      :
##
## Copyright 2012 - 2020 IRISA/IRSET
##
## This library is free software; you can redistribute it and/or modify it
## under the terms of the GNU General Public License as published
## by the Free Software Foundation; either version 2.1 of the License, or
## any later version.
##
## This library is distributed in the hope that it will be useful, but
## WITHOUT ANY WARRANTY, WITHOUT EVEN THE IMPLIED WARRANTY OF
## MERCHANTABILITY OR FITNESS FOR A PARTICULAR PURPOSE.  The software and
## documentation provided here under is on an "as is" basis, and IRISA has
## no obligations to provide maintenance, support, updates, enhancements
## or modifications.
## In no event shall IRISA be liable to any party for direct, indirect,
## special, incidental or consequential damages, including lost profits,
## arising out of the use of this software and its documentation, even if
## IRISA have been advised of the possibility of such damage.  See
## the GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this library; if not, write to the Free Software Foundation,
## Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
##
## The original code contained here was initially developed by:
##
##     Michel Le Borgne.
##     IRISA
##     Symbiose team
##     IRISA  Campus de Beaulieu
##     35042 RENNES Cedex, FRANCE
##
##
## Contributor(s): Geoffroy Andrieux IRSET
##
"""Main engine for constraints unfolding and solving.

Process from the intialization with a query to the finding of a solution.


MCLSimpleQuery (reminder):
    Object containing 2 main types of attributes describing properties:
        - Attributes in text format: These are logical formulas that are humanly
          readable.
          Ex: `start_prop, inv_prop, final_prop, variant_prop`
        - Attributes in DIMACS format: These are logical formulas encoded in
          the form of clauses containing numerical values.
          Ex: `dim_start, dim_inv, dim_final, dim_variant`

---

About shifting clauses and variables:
    Initialization of the dynamic system by __forward_init_dynamic() iterates on
    all literals of the system.
    If a literal is a t+1 literal, its value is shifted to the right or to the
    left depending on whether its sign is positive or negative
    (addition vs soustraction of the shift_step value which is the number of
    variables in the system).

    Otherwise, the literal is a t literal and its value stays untouched.

    t+1 and t literals are generated by CLDynSys object directly from the current
    model.

    Shifting a clause consists to iterates on all its literals and shift their
    values to the right or to the left depending on whether their values are
    positive or negative.

    \*constraints unfolder attributes are shifted at every step and submitted to
    the solver.

---

init_with_query(query):
    Copy of the query attributes to temporary attributes of the CLUnfolder.

    Textual properties of query are compiled into numeric clauses in
    init_forward_unfolding(), just at the beginning of squery_is_satisfied()
    and squery_solve().

    ALL textual properties are susceptible to add new auxiliary variables in the
    system (increase shift_step). This is why any shift must be made after the
    initialization.


init_forward_unfolding():
    Organization of the initialization of the system (its clauses) for
    the current request and first variables shift.

    Initialized attributes:
        - `__initial_constraints`:
          Query data + negative values of all places that are not frontiers.
        - `__final_constraints`:
          Query data
        - `__invariant_constraints`:
          List of Query data
        - `__precomputed_variant_constraints`:
          Query data (automatic merge of textual and DIMACS forms)
        - `__variant_constraints`:
          Initialized with the first item of `__precomputed_variant_constraints`
        - `__precomputed_dynamic_constraints`:
          Initialized with the whole previously shifted system
          if the size of the system hasn't changed, and if no auxiliary
          clauses has been added meanwhile.

    Shifted attributes with __shift_step (nb of variables in the system):
        - __forward_init_dynamic(): __dynamic_constraints:
          Shift dynamic_system.clauses + dynamic_system.aux_clauses + init
        - __shift_final(): __final_constraints:
          Shift + replace __final_constraints
        - __shift_invariant(): __invariant_constraints:
          Shift + append of the last element of __invariant_constraints

        PS: __variant_constraints si already initialized for the first step

    The following functions use the following methods to decompile textual
    properties or events (__compile_property(), __compile_event()) and to
    code them into DIMACS clauses constraints (__code_clause()).

    __code_clause() is susceptible to add new auxiliary numerical variables
    during this process if a variable is not found in __var_code_table and in
    __aux_code_table.

    - __init_initial_constraint_0, __init_final_constraint_0, __init_invariant_constraint_0
      Call __code_clause + __compile_property
    - __init_variant_constraints_0
      Call __code_clause + __compile_event


shift():
    Shift all clauses of the system to prepare it for the next step.

    Called at each step by:
        - squery_solve()
        - squery_is_satisfied()

    Modified attributes:
        - __shift_dynamic(): __dynamic_constraints:
          Shift + append of the last element of __dynamic_constraints
        - __shift_variant(): __variant_constraints:
          Shift clauses of the current step in __precomputed_variant_constraints,
          and append them to __variant_constraints.
        - __shift_invariant()
          ...
        - __shift_final()
          ...

        PS: __initial_constraints are left as this for now
        (shifted only with BACKWARD shift direction).

"""
from __future__ import print_function
from packaging import version

#from pyCryptoMS import CryptoMS
from pycryptosat import Solver as CryptoMS
from pycryptosat import __version__ as solver_version
from cadbiom.models.biosignal.translators.gt_visitors import compile_cond, compile_event
from cadbiom.models.clause_constraints.CLDynSys import Clause, Literal
from cadbiom.models.clause_constraints.mcl.MCLSolutions import RawSolution, MCLException
from cadbiom import commons as cm

# C++ API
from _cadbiom import (
    shift_clause,
    shift_dimacs_clauses,
    forward_code,
    forward_init_dynamic,
)

# Standard imports
from logging import DEBUG
import itertools as it

LOGGER = cm.logger()


[docs]class CLUnfolder(object): """ When loading a model in a MCLAnalyser object, a CLUnfolder is generated which implies the compilation of the dynamical system into a clause constraint dynamical system and the DIMACS coding of all the variables. This coding cannot be changed later. The unfolding of constraints is performed efficiently on the numeric form of the clause constraints. The CLUnfolder provide various method to convert variable names to DIMACS code and back, to extract the frontier of a model and to extract informations from raw solutions. Dynamic constraints unfolding management: Each variable is coded as an integer (in var_code_table and var_list). Each clause is represented as a list of signed integers (DIMACS coding). During unfolding, clauses are shifted either to the future (forward) or to the past (backward). The shift operator is a simple addition of self.__shift_step. The shift direction depends only on initializations. self.__shift_step depends on the number of variables so it is impossible to add variables while generating a trajectory unfolding. Glossary: - ground variables/ground DIMACS code: variables at time t=0 (not shifted). - solution: a solution of the logical dynamic constraint from SAT solver - state vector: list of DIMACS code of original (not shifted) variables corresponding to a step. - trajectory: list of state_vectors Some attributes (Please read the constructor comments for much more information): - `__*_property`: Logical formulas in text format from the current query. - `__dimacs_*`: Clauses in DIMACS format from the current query. - `__*_constraints`: CLUnfolder attributes for unfolding and shift; initialized from the query attributes. """ def __init__(self, dynamic_system, debug=False): """ :param dynamic_system: Describe a dynamic system in clause form. :key debug: (Optional) Used to activate debug mode in the Unfolder. i.e. `__var_list` attribute will be ordered, and initializations of the unfolder will be reproductible. Default: False. :type dynamic_system: <CLDynSys> :type debug: <boolean> """ self.dynamic_system = dynamic_system # symbolic clause dynamic system # shift_step equals to the total number of coded variables of the system # (including inputs, entities, clocks/events, auxiliary variables) # This number can change if auxiliary clauses are added. # shift_step_init: the shift step ss (if n is X_0 code, n+ss is X_1 code) self.shift_step_init = dynamic_system.get_var_number() self.__shift_step = self.shift_step_init # About unfolder lock and shift_step: # shift_step must be frozen in order to avoid problems during the # unflatten() step of RawSolutions. # => Each step MUST have the same number of variables. # Thus, we lock the Unfolder by turning self.__locked to True. # See __shift_clause() and __shift_ground_clause() self.__locked = False self.__current_step = 1 # current number of unfolding # For reachability optimisation. Number of shifts before checking # the final property self.__steps_before_check = 0 # assign a DIMACS code number to each variable (invariant) # Create mappings between names and values of the variables of the system # dynamic_system.base_var_set : Set of ALL variables of the dynamic system # (including inputs, entities, clocks/events, auxiliary variables). self.__var_code_table = dict() # Mapping: name -> DIMACS code (!= 0) self.__var_list = ['##'] # Mapping: DIMACS code (!= 0) -> name # Fix order of variables names with a list: # - indexes of __var_list are the values of the variables at these indexes # - values begin from 1 (0 is a blacklisted value) if debug: base_var_set = sorted(list(dynamic_system.base_var_set)) else: base_var_set = list(dynamic_system.base_var_set) self.__var_list += base_var_set # keys: are the names of the variables, values: their values self.__var_code_table = { var_name: var_num for var_num, var_name in enumerate(base_var_set, 1) } # Optimization: for forward_init_dynamic() and forward_code() (mostly # implemented in C. # Precompute __var_code_table with the literal names in "future" format; # i.e with a last char '`' at the end. # Thus it is easy to search them directly in the dict (in O(1)), # instead of slicing them. # Note: "future" literals are already in self.dynamic_system.clauses # and maybe in self.dynamic_system.aux_clauses temp_var_code_table = { var_name + "`": var_num for var_name, var_num in self.__var_code_table.iteritems() } self.__var_code_table.update(temp_var_code_table) assert len(self.__var_code_table) == 2 * self.__shift_step # Include auxiliary clause to eliminate undesirable solutions # If A->B has clock h, an aux constraint added is h included in h when A # Must be set to False for inhibitors computations self.__include_aux_clauses = True # Flag used to know if the set of auxiliary clauses has changed between # 2 initializations of the Unfolder by a query # (i.e. properties of the queries are different). # Cf init_forward_unfolding() self.__include_aux_clauses_changed = False self.__lit_cpt = self.shift_step_init + 1 # counter for aux. var. coding # Same mapping tools but for auxiliary variables introduced by properties # compilation. Cf __code_clause() self.__aux_code_table = dict() # name to code self.__aux_list = [] # code to name # No frontier init: # Set of DIMACS codes of the simple variables/places that are not frontiers # Used to disable all variables of places in the model (except frontiers) self.no_frontier_values = frozenset( self.__var_code_table[not_front_name] for not_front_name in self.dynamic_system.no_frontiers ) # Used in __prune_initial_no_frontier_constraint_0() in order to # compute values of literals that are specified in user constraints. # These literals should not be blindly disabled by default in __no_frontier_init self.initial_values = set() # Ordered list of DIMACS codes of the places that are not frontiers # => Clauses of negatives values # Ex: [[-1], [-2], ...] # The values are usually a subset of no_frontier_values self.__no_frontier_init = list() # Frontiers: # Ordered list of DIMACS codes of the frontier variables/places self.frontier_values = [ self.__var_code_table[front_name] for front_name in self.dynamic_system.frontiers ] self.frontier_values.sort() ## TODO: Utiliser une liste triée est-il utile ici ? ## si non passer utiliser un frozenset et supprimer les casts partout ## Cf RawSolution.frontier_pos_and_neg_values BACKWARD, encore utilisé avec index ## Cf TestCLUnfolder.test_frontier indexable mais peut etre contourné # Precompute convenient attributes for: # frontiers_pos_and_neg: # - RawSolution.frontier_pos_and_neg_values # (set operation with solution variables) # frontiers_negative_values: # - MCLQuery.from_frontier_sol_new_timing # - MCLQuery.frontiers_negative_values # - MCLAnalyser.__solve_with_inact_fsolution # - TestCLUnfolder.test_prune # # Set of frontier positive and negative values # (all frontiers and their opposite version). # => operations with sets are much faster self.frontiers_negative_values = frozenset( -frontier for frontier in self.frontier_values ) self.frontiers_pos_and_neg = self.frontiers_negative_values | frozenset( self.frontier_values ) # DIMACS codes of the input variables/places for extraction (inv) self.__inputs = frozenset( self.__var_code_table[inp] for inp in self.dynamic_system.inputs ) # DIMACS codes of the free_clocks variables/places for extraction (invariant) self.__free_clocks = frozenset( self.__var_code_table[fcl] for fcl in self.dynamic_system.free_clocks ) # Binding for a merged version of __inputs and __free_clocks # Convenient attribute for RawSolution.extract_act_input_clock_seq() # DIMACS codes of the input and free_clocks variables self.inputs_and_free_clocks = self.__inputs | self.__free_clocks # Properties to be checked self.reset() # Logical constraints: # Result from unfolding of base constraints # Boolean vectors signification: # X: Current state of places (activated/unactivated) # X': Future state of places # H: Current 'free' events (present/not present) => ? # I: Current inputs => ? self.__dynamic_constraints = [] # DIMACS clauses: X' = f(X,H,I) self.__precomputed_dynamic_constraints = [] # Simple temporal properties: # SP(X0): Initial property/start property; Never change at each step. # IP(X): Invariant property # VP(X): Variant property # List of logical formulas of properties forced at each step # It's the trajectory of events of a solution. # FP(X): Final property self.__initial_constraints = [] # DIMACS clauses: C(X_0) <list <list <int>>> self.__final_constraints = [] # idem: C(X_n) self.__invariant_constraints = [] # DIMACS clauses: C(X_i)) self.__variant_constraints = [] # variant constraints along trajectory self.__shift_direction = None # FORWARD or BACKWARD # statistics on solver self.__stats = False self.__nb_vars = 0 self.__nb_clauses = 0
[docs] def reset(self): """Reset the unfolder before a new query Reset only properties and dimacs clauses from the current query; AND `__precomputed_variant_constraints`. `__initial_constraints`, `__final_constraints`, `__invariant_constraints`, `__variant_constraints` and `__dynamic_constraints` ARE NOT reset here (see :meth:`init_forward_unfolding`). This function is called from the constructor and during :meth:`MCLAnalyser.sq_is_satisfiable() <cadbiom.models.clause_constraints.mcl.MCLAnalyser.MCLAnalyser.sq_is_satisfiable>` and :meth:`MCLAnalyser.sq_is_satisfiable() <cadbiom.models.clause_constraints.mcl.MCLAnalyser.MCLAnalyser.sq_solutions>` following the call of :meth:`init_with_query`. """ # Properties to be checked self.__initial_property = None # logical formula - literal boolean expression self.__dimacs_initial = None # list of DIMACS clauses self.__final_property = None # logical formula self.__dimacs_final = None # list of DIMACS clauses self.__invariant_property = None # logical formula self.__dimacs_invariant = None # list of DIMACS clauses self.__variant_property = None # list<logic formulas> self.__dimacs_variant = None # list<list<DIMACS clauses>> # list of variant temporal constraints in DIMACS ground code self.__precomputed_variant_constraints = None # list<list<DIMACS clauses>> # Note: temp_initial_values is used in: # __init_initial_constraint_0() # __init_invariant_constraint_0() # See __prune_initial_no_frontier_constraint_0() self.temp_initial_values = set() # If this function is called from the constructor, # the following variables are just redefined here. # For reachability optimisation. Number of shifts before checking # the final property self.__steps_before_check = 0 self.__shift_direction = None # FORWARD or BACKWARD self.__locked = False self.__lit_cpt = self.shift_step_init + 1
[docs] def check_query(self, query): """Check textual and DIMACS properties of the given query If textual properties have changed since the last initialization of the CLUnfolder, new auxiliary variables will be added to the dynamic system. Also check if values of literals are in the limits of the dynamical system (shift_step_init). If it's not the case, a ValueError exception will be raised. Indeed, literals **can't** be used without a proper init. .. TODO:: Keep in mind that adding auxiliary variables is possible in the future. Il faut remplir __aux_code_table, __aux_list, jusqu'à la valeur du litéral inconnu dans le système et incrémenter les variables en conséquence. S'inspirer de __code_clause() .. TODO:: We do not test if the textual events of query.variant_prop involve events that are not in the model... Called by: :meth:`init_with_query` :param query: Query built with start, invariant, final, variant properties in textual and/or DIMACS forms. :type query: <MCLSimpleQuery> """ # Check textual properties if ( self.__initial_property != query.start_prop or self.__final_property != query.final_prop or self.__invariant_property != query.inv_prop or self.__variant_property != query.variant_prop ): # Properties have changed: # The system will be modified; Auxiliary clauses will be added. # Do not reload cached constraints. Cf init_forward_unfolding() self.__include_aux_clauses_changed = True ## TODO: à repasser dans init_with_query ? # Ne pas faire ce test systématiquement implique que les propriétés # texte ne sont jamais modifiées entre chaque requete. # Check literals if query.dim_inv: # Check dimacs invariant: Initial state only (will be shifted later) # Ex: [[12]] assert len(query.dim_inv) == 1 # Check values of literals values = set(it.chain(*query.dim_start)) values.update(it.chain(*query.dim_final)) values.update(it.chain(*query.dim_inv)) if query.dim_variant: # Handle empty steps: # Ex: [[[12], [1]], [[58], [47]], []] g = (step for step in query.dim_variant if step) [values.update(it.chain(*step)) for step in g] # Get all values that are not in the initial dynamic system out_of_range_values = {val for val in values if abs(val) > self.shift_step_init} if not out_of_range_values: return # The literal comes from an auxilary clause; not from the dynamic model. raise ValueError( "%s variable(s) is/are not in the initial system.\n" "Insertion of literals from auxiliary clauses is not allowed here.\n" "Since the auxiliary clauses are reset here, this literal is suspect\n" "and its use dangerous because its value is not reproductible\n" "(the value can refer to a different constraint from one run to another)." % out_of_range_values )
[docs] def init_with_query(self, query, check_query=True): """Initialise the unfolder with the given query Following attributes are used from the query: start_prop, dim_start, inv_prop, dim_inv, final_prop, dim_final, variant_prop, dim_variant, steps_before_check Textual properties of query are compiled into numeric clauses in init_forward_unfolding(), just at the beginning of squery_is_satisfied() and squery_solve(). This compilation step is costly due to ANTLR performances... .. warning:: ALL textual properties are susceptible to add new auxiliary variables in the system (increase shift_step). Thus, the system will be rebuilt. :param query: Query built with start, invariant, final, variant properties in textual and/or DIMACS forms. :key check_query: Check textual and DIMACS properties of the given query. .. warning:: This option should be disable for performance reasons; or if you know what you are doing. You will assume in particular that: - textual properties are not modified - values of literals in DIMACS properties are already described in the model (new literals are not allowed). :type query: <MCLSimpleQuery> :type check_query: <boolean> """ if check_query: self.check_query(query) # Reset the unfolder before a new query self.reset() # Init with query properties and clauses self.__initial_property = query.start_prop # logical formula in text self.__dimacs_initial = query.dim_start self.__final_property = query.final_prop # logical formula in text self.__dimacs_final = query.dim_final self.__invariant_property = query.inv_prop # logical formula in text self.__dimacs_invariant = query.dim_inv # It's the trajectory of events of a solution. self.__variant_property = query.variant_prop # logical formula in text self.__dimacs_variant = query.dim_variant # For reachability optimisation. Number of shifts before checking # the final property (default 0) self.__steps_before_check = query.steps_before_check
[docs] def set_stats(self): """Enable solver statistics (for tests)""" self.__stats = True self.__nb_vars = 0 self.__nb_clauses = 0
[docs] def unset_stats(self): """Disable solver statistics (never used)""" self.__stats = False
[docs] def stats(self): """Display solver statistics""" LOGGER.info("NB Variables in solver:", self.__nb_vars) LOGGER.info("NB Clauses in solver:", self.__nb_clauses)
[docs] def set_include_aux_clauses(self, val): """Flag to include auxiliary clause to eliminate undesirable solutions. If A->B has clock h, an aux constraint added is h included in h when A Must be set to False for inhibitors computations. .. warning:: This function impacts the shift_step number. Tested by: - __forward_init_dynamic - __backward_init_dynamic """ self.__include_aux_clauses = val self.__include_aux_clauses_changed = True
## Internal variable access for tests (never used) ######################### @property def dynamic_constraints(self): """For tests: returns coded dynamic constraints""" return self.__dynamic_constraints @property def initial_constraints(self): """For tests: returns coded initial constraints""" return self.__initial_constraints @property def invariant_constraints(self): """For tests: returns coded invariant constraints""" return self.__invariant_constraints @property def variant_constraints(self): """For tests: returns coded variant constraints""" return self.__variant_constraints @property def final_constraints(self): """For tests: returns coded final constraints""" return self.__final_constraints ## Variables management ####################################################
[docs] def var_names_in_clause(self, clause): """Return the names of the variables from values in the given numeric clause (DEBUG never used) """ return [self.get_var_indexed_name(var) for var in clause]
[docs] def var_dimacs_code(self, var_name): """Returns DIMACS code of var_name (string) variable (for tests)""" return self.__var_code_table[var_name]
[docs] def get_system_var_number(self): """Get number of variables in the clause constraint dynamical system (including inputs, entities, clocks/events, auxiliary variables) (for tests) .. note:: This number should be equal to the number of variables in self.get_var_number() """ return self.dynamic_system.get_var_number()
[docs] def get_var_number(self): """Get number of principal variables (properties excluded) in the unfolder (including inputs, entities, clocks/events, auxiliary variables) (for tests) .. note:: This number should be equal to the number of variables in self.get_system_var_number() """ # Remove the blacklisted first item (##) for variables naming return len(self.__var_list) - 1
[docs] def get_var_name(self, var_num): """Get name of the variable .. seealso:: Explanations on :meth:`get_var_indexed_name` @param var_num: DIMACS literal coding of an initial variable @return: name of the variable """ # Get the original var_code without shifts var_code = (abs(var_num) - 1) % self.__shift_step + 1 if var_code <= self.shift_step_init: # Variable num is less than the number of the variables in the system # => variable from the initial system return self.__var_list[var_code] else: # Auxiliary variable introduced by properties compilation return self.__aux_list[var_code - self.shift_step_init - 1]
[docs] def get_var_indexed_name(self, var_num): """Get name of the variable with the time index appended .. note:: time index is the number of steps since the begining of the simulation. :Example - Only standard variables:: __shift_step = 2 (number of variables in the system) shift_step_init = 2 (number of variables in the system at the initial state) __var_list = ["##", "n1", "n2"] Virtual list of indexes in the state vector of a solution: [0, 1, 2, 3, 4, 5, 6, 7, 8, ...] #|n1,n2|n1,n2|n1,n2|n1,n2| ti=0 ti=1 ti=2 ti=3 Given var_num = 7: index of var_name in __var_list: ((7-1) % 2) + 1 = 1 var_name = "n1" time index: (7 - 1) / 2 = 3 => return: "n1_3" :Example - With an auxiliary variable:: __shift_step = 2 (There is 1 auxiliary variable) shift_step_init = 1 __var_list = ["##", "n1"] __aux_list = ["_lit0"] Virtual list of indexes in the state vector of a solution: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, ...] #|n1,_lit0|n1,_lit0|n1,_lit0| ti=0 ti=1 ti=2 If the index is > shift_step_init, then it is an auxiliary variable. In order to get the index in __aux_list we have to substract: - `1` because on the contrary of __var_list, __aux_list begins at index 0 - `shift_step_init` because value of the first aux variable follows the last value of __var_list Given var_num = 6 var_code of var_name: ((6-1) % 2) + 1 = 2 2 > shift_step_init index of var_name in __aux_list: var_code - shift_step_init - 1 2 -1 -1 = 0 var_name = "_lit0" => return: "_lit0_2" :param var_num: DIMACS literal coding :type var_num: <int> :return: name of the variable with the time index appended :rtype: <str> """ varnum1 = abs(var_num) # Get the original var_code without shifts var_code = (varnum1 - 1) % self.__shift_step + 1 var_step = (varnum1 - var_code) / self.__shift_step if var_code <= self.shift_step_init: # Variable num is less than the number of the variables in the system # => variable from the initial system return self.__var_list[var_code] + "_%s" % var_step else: # Auxiliary variable introduced by properties compilation index = var_code - self.shift_step_init - 1 return self.__aux_list[index] + "_%s" % var_step
@property def free_clocks(self): """Get the DIMACS codes of the free_clocks variables""" return self.__free_clocks @property def inputs(self): """Get the DIMACS codes of the input variables""" return self.__inputs @property def shift_direction(self): """ @return: string "FORWARD" or "BACKWARD" """ return self.__shift_direction @property def shift_step(self): """ @return: the shift step ss (if n is X_0 code, n+ss is X_1 code) """ return self.__shift_step @property def current_step(self): """ @return: the current number of unfolding """ return self.__current_step ## Translation from names to num codes ##################################### ## The translation depends on the shift direction def __forward_code(self, clause): """Deprecated, directly included in C++ module .. seelalso:: :meth:`forward_init_dynamic` Numerically code a clause with the numeric codes found in self.__var_code_table for a base variable x, and numeric_code +/- shift_step (according to the sign) for x' variable. .. note:: Future variables x' are noted "name`" in Literal names. The absolute value of these variables will increase by shift_step in the next step. :param clause: A Clause object :return: The DIMACS coding of the forward shifted clause """ # Old API # num_clause = [] # for lit in clause.literals: # if lit.name[-1] == '`': # t+1 variable # num_clause.append( # -(self.__var_code_table[lit.name[:-1]] + self.__shift_step) \ # if not lit.sign \ # else (self.__var_code_table[lit.name[:-1]] + self.__shift_step) # ) # else: # t variable # num_clause.append( # -self.__var_code_table[lit.name] \ # if not lit.sign else self.__var_code_table[lit.name] # ) # return num_clause # New API via C++ module return forward_code(clause, self.__var_code_table, self.__shift_step) def __backward_code(self, clause): """(never used, backward is partially implemented) numerically code a clause with the numeric code found in self.__var_code_table + shift_step for a base variable x, and numeric_code for x' variable integer coding increases in past steps @param clause: a Clause object @return: the DIMACS coding of the backward shifted clause """ num_clause = [] for lit in clause.literals: if lit.name[-1] == "`": # t+1 variable num_clause.append( -self.__var_code_table[lit.name[:-1]] if not lit.sign else self.__var_code_table[lit.name[:-1]] ) else: # t variable, base variable num_clause.append( -(self.__var_code_table[lit.name] + self.__shift_step) if not lit.sign else (self.__var_code_table[lit.name] + self.__shift_step) ) return num_clause def __code_clause(self, clause): """Numerically code a clause The numerical values are found in: - self.__var_code_table for variables in the dynamic system, - or in self.__aux_code_table for other auxiliary variables; We assume all variables are ground variables (not shifted). .. warning:: This function MODIFIES SHIFT_STEP because we add a variable in the system if a variable is found neither in __var_code_table nor in __aux_code_table. Thus, a new auxiliary variable will be created. :param clause: :return: List of numerical values corresponding to the literals in the given clause. :type clause: <Clause> :rtype: <list <int>> """ num_clause = [] for lit in clause.literals: # Get variable value with the given name name = lit.name if name in self.__var_code_table: var_cod = self.__var_code_table[name] elif name in self.__aux_code_table: var_cod = self.__aux_code_table[name] else: # Create an auxiliary variable - modifies shift_step (nb of variables) self.__shift_step += 1 # Mapping name to value code self.__aux_code_table[name] = self.__shift_step # Mapping value code to name self.__aux_list.append(name) # Define var_cod var_cod = self.__shift_step # Add the sign to the value lit_code = var_cod if lit.sign else -var_cod num_clause.append(lit_code) return num_clause ## Dynamic initialisations ################################################# ## TODO: move near init_forward_unfolding def __forward_init_dynamic(self): """Dynamics initialisations. Set dynamic constraints for a forward one step: X1 = f(X0) Numerically code clauses with the numeric codes found in self.__var_code_table for a base variable x, and numeric_code +/- shift_step (according to the sign) for x' variable. __dynamic_constraints is a list of lists of numeric clauses (lists of ints) Each sublist of __dynamic_constraints corresponds to a step in the unfolder; the last step is the last element. .. seealso:: :meth:`__forward_code` .. note:: Called by :meth:`init_forward_unfolding` .. note:: Future variables x' are noted "name`" in Literal names. The absolute value of these variables will increase by shift_step in the next step. """ # New API via C++ module if self.__include_aux_clauses: # Auxiliary clauses are supported (default) self.__dynamic_constraints = forward_init_dynamic( self.dynamic_system.clauses, self.__var_code_table, self.__shift_step, self.dynamic_system.aux_clauses, ) else: self.__dynamic_constraints = forward_init_dynamic( self.dynamic_system.clauses, self.__var_code_table, self.__shift_step ) # __dynamic_constraints is initialized and # has now taken into account the auxiliary clauses flag self.__include_aux_clauses_changed = False def __backward_init_dynamic(self): """Dynamics initialisations. (never used, backward is partially implemented) Set dynamic constraints for a forward one step: X0 = f(X1) .. note:: Called by init_backward_unfolding() """ num_clause_list = [ self.__backward_code(clause) for clause in self.dynamic_system.clauses ] if self.__include_aux_clauses: num_clause_list += [ self.__backward_code(clause) for clause in self.dynamic_system.aux_clauses ] self.__dynamic_constraints = [num_clause_list] ## Shifting variables: implementation of the shift operator ################ def __shift_clause(self, numeric_clause): """Shift a clause for the current __shift_step (no used anymore, replaced by direct use of C++ code) Basically, `shift_step` is added to positive variables and subtracted from negative variables in `numeric_clause`. About unfolder lock and shift_step: self.shift_step must be frozen in order to avoid problems during the unflatten() step of RawSolutions. => Each step MUST have the same number of variables. Thus, we lock the Unfolder by turning self.__locked to True. @param numeric_clause: DIMACS clause @warning: lock the unfolder """ # Froze unfolder to avoid modifications of shift_step self.__locked = True # Old API # Less efficient with abs() # return [(abs(lit) + self.__shift_step) * (-1 if lit < 0 else 1) # for lit in numeric_clause] # More efficient with ternary assignment # return [(lit + self.__shift_step) if lit > 0 else (lit - self.__shift_step) # for lit in numeric_clause] # New API via C++ module return shift_clause(numeric_clause, self.__shift_step) def __shift_ground_clause(self, numeric_clause, nb_steps): """Shift a clause for the given step number Each literal will be shifted by (nb_steps * __shift_step). Why ? Such function is used for clauses that are not already shifted during previous steps. It is the case of variant_constraints which come from a reloaded/forced trajectory and is composed of ground variables. About unfolder lock and shift_step: self.shift_step must be frozen in order to avoid problems during the unflatten() step of RawSolutions. => Indeed, each step MUST have the same number of variables. Thus, we lock the Unfolder by turning self.__locked to True. Called by: __shift_variant() @param numeric_clause: DIMACS clause @param nb_steps: number of shifts asked @warning: lock the unfolder """ # Froze unfolder to avoid modifications of shift_step self.__locked = True shift_offset = self.__shift_step * nb_steps return [ (lit + shift_offset) if lit > 0 else (lit - shift_offset) for lit in numeric_clause ] def __shift_dynamic(self): """Shift clauses representing the dynamics X' = f(X,I,C) Shift the last item of self.__dynamic_constraints and append the result to self.__dynamic_constraints. .. note:: Called by shift() """ if len(self.__precomputed_dynamic_constraints) > self.__current_step: # Recall the saved state of the system for the next step self.__dynamic_constraints.append( self.__precomputed_dynamic_constraints[self.__current_step] ) LOGGER.info( "shift_dynamic:: OPTI DO NOT SHIFT; " "Reload dynamic constraints; step: %s", self.__current_step, ) return # System has not been shifted for this step until now LOGGER.info( "shift_dynamic:: SHIFT; " "Set dynamic constraints; step: %s; constraints nb: %s", self.__current_step, len(self.__dynamic_constraints), ) # Old API # self.__dynamic_constraints.append( # [self.__shift_clause(clause) # for clause in self.__dynamic_constraints[-1]] # ) # New API via C++ module # __dynamic_constraints: # List of lists of DIMACS encoded clauses (lists of ints) self.__dynamic_constraints.append( shift_dimacs_clauses( self.__dynamic_constraints[-1], # <list <list <int>>> self.__shift_step ) ) def __shift_initial(self): """Shift initialisation condition Shift + replace __initial_constraints .. note:: Called by: - shift() - init_backward_unfolding() """ self.__initial_constraints = shift_dimacs_clauses( self.__initial_constraints, self.__shift_step ) def __shift_final(self): """Shift final property Shift + replace __final_constraints .. note:: Called by: - shift() - init_forward_unfolding() """ self.__final_constraints = shift_dimacs_clauses( self.__final_constraints, self.__shift_step ) def __shift_invariant(self): """Shift invariant property Shift + append of the last element of __invariant_constraints .. note:: Called by: - shift() - init_forward_unfolding() - init_backward_unfolding() """ if self.__invariant_constraints: # New API via C++ module self.__invariant_constraints.append( shift_dimacs_clauses( self.__invariant_constraints[-1], self.__shift_step ) ) def __shift_variant(self): """Shift variant property - depends on unfolding direction Shift clauses of the current step in __precomputed_variant_constraints, and append them to __variant_constraints. Process: - Take the clauses (constraint) of the current step - Shift the clauses - Append them to __variant_constraints Example: current_step: 2 (shift for step 3) [[], [[4], [4]], [[6], [7]], []] => Only [[6], [7]] is shifted .. note:: Called by shift() """ if not self.__precomputed_variant_constraints: return if self.__shift_direction == "FORWARD": # Constraint t0 already included from the query during initialisation current_constraint = self.__precomputed_variant_constraints[self.__current_step] # Shift constraints at current step and append them to __variant_constraints self.__variant_constraints += [ self.__shift_ground_clause(clause, self.__current_step) for clause in current_constraint ] elif self.__shift_direction == "BACKWARD": raise MCLException("Shift direction 'BACKWARD' is not yet implemented") else: raise MCLException("Shift unsupported direction: " + self.__shift_direction)
[docs] def shift(self): """Shift all clauses of the system to prepare it for the next step. self.__current_step is incremented here! Modified attributes: - __dynamic_constraints[-1] (last element) Shift + append of the last element of __dynamic_constraints - __invariant_constraints[-1] (last element) Shift + append of the last element of __invariant_constraints - __precomputed_variant_constraints[current_step] Shift clauses of the current step in __precomputed_variant_constraints, and append them to __variant_constraints. - __final_constraints if __shift_direction is "FORWARD" - __initial_constraints if __shift_direction is "BACKWARD" PS: __initial_constraints are left as this for now (shifted only with BACKWARD shift direction) Called at each step by: - squery_solve() - squery_is_satisfied() """ # Froze unfolder to avoid modifications of shift_step self.__locked = True self.__shift_dynamic() self.__shift_invariant() self.__shift_variant() if self.__shift_direction == "FORWARD": self.__shift_final() elif self.__shift_direction == "BACKWARD": self.__shift_initial() else: # Shift direction must be set raise MCLException("Shift unsupported direction: " + self.__shift_direction) # Increment the current step self.__current_step += 1 LOGGER.info("shift:: done; new current_step: %s", self.__current_step)
## Coding of properties #################################################### def __compile_property(self, property_text): """Compile a property (logical formula) into clauses Type checking uses the symbol table of dynamic_system. Error reporting is made through the dynamic_system reporter. .. warning:: MODIFIES __lit_cpt for numbering of auxiliary variables (not coding) .. note:: Called by: - __init_initial_constraint_0() - __init_final_constraint_0() - __init_invariant_constraint_0() """ if self.__locked: raise MCLException("Trying to compile property with a locked unfolder") # Syntax analyser and type checker # Compile a condition expression to a tree representation tree_prop = compile_cond( property_text, self.dynamic_system.symb_tab, self.dynamic_system.report ) if tree_prop is None: raise MCLException( "error during parsing of '%s', please check the property syntax" % property_text ) # Avoid name collisions of aux var prop_visitor = PropertyVisitor(self.__lit_cpt) tree_prop.accept(prop_visitor) self.__lit_cpt = prop_visitor.cpt # avoid name collisions of aux var return prop_visitor.clauses def __compile_event(self, property_text): """Compile an event (biosignal expression) into clauses Type checking uses the symbol table of dynamic_system. Error reporting is made through the dynamic_system reporter. .. warning:: MODIFIES __lit_cpt for numbering of auxiliary variables (not coding) .. note:: Called by: - __init_variant_constraints_0() """ if self.__locked: raise MCLException("Trying to compile event with a locked unfolder") # Syntax analyser and type checker # Compile an event expression to a tree representation # tree_prop is the event expression, # ste the state events (s#> ..) used and # fcl is the free clocks used in the event expression. tree_prop, ste, fcl = compile_event( property_text, self.dynamic_system.symb_tab, True, # Collect free clocks (events) self.dynamic_system.report, ) if tree_prop is None: raise MCLException( "error during parsing of '%s', please check the event syntax" % property_text ) ## DEBUG clauses encoding print("treeprop, ste, fcl", tree_prop, ste, fcl) ## DEBUG clauses encoding print("lit cpt before", self.__lit_cpt) # Avoid name collisions of aux var prop_visitor = PropertyVisitor(self.__lit_cpt) tree_prop.accept(prop_visitor) self.__lit_cpt = prop_visitor.cpt # avoid name collisions of aux var ## DEBUG clauses encoding print("event visitor clauses", prop_visitor.clauses) ## DEBUG clauses encoding print("lit cpt after", self.__lit_cpt) # raise Exception("event") return prop_visitor.clauses def __init_initial_constraint_0(self, no_frontier_init=True): """Code initial constraints in a numerical clause. Initialisation of `__initial_constraints`: List of DIMACS clauses (lists of values). Automatic merge of textual and DIMACS forms: If initial property is set (in text format of logical formulas, or in dimacs format of numerical values), this function generates constraints clauses in numerical form. We have to wait until all variables are num coded before shifting anything! .. note:: Compilation of properties in text format (logical formulas) to numeric format is expensive when they must be parsed at each query. :key no_frontier_init: (Optional) Boolean to force all variables of places/entities that are not frontiers to be disabled at the initialization step. Default: True .. warning:: We avoid inactivation/activation collisions if such places are explicitly mentioned in textual properties (only). See :meth:`__prune_initial_no_frontier_constraint_0`. DIMACS properties are not checked. :type no_frontier_init: <boolean> """ self.__initial_constraints = list() ## TODO: optimize this... ## The list of literals due to __initial_property (and by extension ## __no_frontier_init) may be invariable and is reprocessed here at ## each reload of the unfolder. ## The literals in banished solutions represent a variable part of ## __initial_constraints and become more and more numerous at each run. ## __initial_constraints is huge and will be, shifted at every step for ## backward search (not used), or simply reloaded at each step as this ## for forward search. ## Caching __initial_constraints could be possible if an extra set of ## constraints is dedicated to banished solutions... if self.__initial_property: # Compile initial property from text logical formulas to numeric form # __initial_property: is taken from the current query # Costly... self.__initial_constraints += [ self.__code_clause(clause) for clause in self.__compile_property(self.__initial_property) ] if no_frontier_init: # TODO: useless if __initial_property has not changed... self.temp_initial_values.update( abs(val) for val in it.chain(*self.__initial_constraints) ) if self.__dimacs_initial: # Add DIMACS clauses self.__initial_constraints += self.__dimacs_initial def __init_final_constraint_0(self): """Code final constraints in a numerical clause. Automatic merge of textual and DIMACS forms: If final property is set (in text format of logical formulas, or in dimacs format of numerical values), this function generates constraints clauses in numerical form. Initialisation of self.__final_constraints: List of DIMACS clauses (lists of values). We have to wait until all variables are num coded before shifting anything! .. note:: Compilation of properties in text format (logical formulas) to numeric format is expensive when they must be parsed at each query. """ self.__final_constraints = list() if self.__final_property: # compile initial (X0) property into numeric form # Ex: [$Px$] => self.__final_constraints = [[7]] self.__final_constraints += [ self.__code_clause(clause) for clause in self.__compile_property(self.__final_property) ] if self.__dimacs_final: # Add DIMACS clauses self.__final_constraints += self.__dimacs_final def __init_invariant_constraint_0(self, no_frontier_init=True): """Code final constraints in a numerical clause. Automatic merge of textual and DIMACS forms: If invariant property is set (in text format of logical formulas, or in DIMACS format of numeric values), this function generates constraints clauses in numeric form. Initialisation of self.__invariant_constraints: List of lists of DIMACS clauses (lists of values). We have to wait until all variables are num coded before shifting anything! :Example: [[[12], [1]], [[58], [47]], [[104], [93]]] t0 t1 t2 .. note:: __invariant_property and __dimacs_invariant should contain at most 1 item; it's the initial state only (will be shifted later). .. note:: pourquoi une liste de listes ? La propriété invariante doit etre vérifiée à chaque étape de la recherche de satisfiabilité (tout comme variant_constraints). `1 étape = 1 item = 1 ensemble de clauses`, forçant le respect de la propriété définie au départ. .. note:: Compilation of properties in text format (logical formulas) to numeric format is expensive when they must be parsed at each query. :key no_frontier_init: (Optional) Boolean to force all variables of places/entities that are not frontiers to be disabled at the initialization step. Default: True .. warning:: We avoid inactivation/activation collisions if such places are explicitly mentioned in textual properties (only). See :meth:`__prune_initial_no_frontier_constraint_0`. DIMACS properties are not checked. :type no_frontier_init: <boolean> """ self.__invariant_constraints = list() if self.__invariant_property: # compile initial (X0) property into numeric form self.__invariant_constraints.append( [ self.__code_clause(clause) for clause in self.__compile_property(self.__invariant_property) ] ) if no_frontier_init: # TODO: useless if __invariant_property has not changed... self.temp_initial_values.update( abs(val) for val in it.chain(*self.__invariant_constraints[0]) ) if self.__dimacs_invariant: # Add DIMACS clauses if self.__invariant_constraints: # Merge initial state self.__invariant_constraints[0].append(self.__dimacs_invariant[0]) else: self.__invariant_constraints.append(self.__dimacs_invariant) def __init_variant_constraints_0(self): """Code variant constraints in a numerical clause. .. TODO:: variant_property list d'étapes, chaque étape a une formule logique impliquant les évènements (et des places? NON, on utilise __compile_event() et pas __compile_property()) => ensemble **d'events** devant etre valides à chaque étape Automatic merge of textual and DIMACS forms: If variant_property is set, compile each property into clauses and encode these clauses. Clauses already in DIMACS form are added. The two variant constraint forms must be compatible (same length) in order to allow a merge. Attributes initialized: self.__precomputed_variant_constraints: Built here and is used later to shift current variant properties and update self.__variant_constraints in __shift_variant(). self.__variant_constraints: - the only element passed to the solver. - initilised with the first item of self.__precomputed_variant_constraints """ self.__variant_constraints = list() # Coding of variant properties # Take __variant_property in priority over __dimacs_variant if self.__variant_property: self.__precomputed_variant_constraints = list() # compile initial (X0) property into numeric form # For each property in step of __variant_property self.__precomputed_variant_constraints += [ [self.__code_clause(clause) for clause in self.__compile_event(prop)] for prop in self.__variant_property ] if self.__dimacs_variant: # Add DIMACS clauses # Check if the number of steps is equal if len(self.__variant_property) != len(self.__dimacs_variant): raise MCLException( "Incoherent variant properties; sizes of " "__variant_property and __dimacs_variant differ" ) # Merge together all steps content # Ex: # __precomputed_variant_constraints due to __variant_property convertion: # [[], [[4]], [], []] # __dimacs_variant: # [[], [[4]], [[6], [7]], []] # Result: # [[], [[4], [4]], [[6], [7]], []] self.__precomputed_variant_constraints = [ list(it.chain(*cpl)) for cpl in zip( self.__dimacs_variant, self.__precomputed_variant_constraints ) ] elif self.__dimacs_variant: # No __variant_property, keep only __dimacs_variant self.__precomputed_variant_constraints = self.__dimacs_variant # Initialisation of __variant_constraints if self.__precomputed_variant_constraints: if self.__shift_direction == "FORWARD": # For now, keep only the events of the first step (t0) # Other steps are taken during shift() calls self.__variant_constraints = list( self.__precomputed_variant_constraints[0] ) elif self.__shift_direction == "BACKWARD": raise MCLException("Shift direction 'BACKWARD' is not yet implemented") else: raise MCLException("Shift unsupported direction: " + self.__shift_direction) def __prune_initial_no_frontier_constraint_0(self, no_frontier_init=True): """Prevent the places mentioned in constraints to be inactivated at the initial state of the system. Why? This will return an unsatisfiable system whatever the constraints. How? Get all values in the recently compiled initial and invariant contraints. Then, built __no_frontier_init with allowed places and append it to initial_constraints. Positive side-effect: __no_frontier_init is cached as much as possible from a run to another. `__initial_constraints` is a list of DIMACS clauses (lists of values) composed of 4 types of clauses: 1 - non frontier literals (computed here) 2 - literals from text properties (initial/invariant) 3 - literals from DIMACS properties (initial/invariant) 4 - clauses from banished solutions (see :meth:`__init_initial_constraint_0`) If the textual properties do not change from one run to another, then only the clauses related to the banishment of the solutions change. :Example: Places that are not frontiers are inactivated; initial_constraints will be: [[-1],] Then you want to force the literal 1 to be activated under certain conditions in start_property or invariant_property of a query. You don't want to have the following initial_constraints because the system is still unsatisfiable whatever the constraints: [[-1], [1],] .. note:: About reproductible runs and constraints order: The order of clauses impacts the final result; In order to keep the results obtained with the old library; the literals of non boundaries must be placed at the beginning of __initial_constraints (not at the beginning). """ if not no_frontier_init: return # Note: temp_initial_values comes from: # __init_initial_constraint_0 # __init_invariant_constraint_0 if self.temp_initial_values == self.initial_values and self.__no_frontier_init: # __no_frontier_init is already initialiazed according to the # same initial/invariant textual properties LOGGER.info("RELOAD __no_frontier_init") # pas de += => pas reproductible avec les anciens runs self.__initial_constraints = self.__no_frontier_init + self.__initial_constraints return if self.temp_initial_values: # Different initial_values since the last run # Prune no_frontier_values (remove values in user constraints from them) LOGGER.info("INIT prune __no_frontier_init") pruned_no_frontier_values = self.no_frontier_values - self.temp_initial_values self.__no_frontier_init = [[-val] for val in pruned_no_frontier_values] else: # Take all no_frontier_values LOGGER.info("INIT basic __no_frontier_init") # Not reproductible with previous runs # self.__no_frontier_init = [ # [-not_front_val] # for not_front_val in sorted(self.no_frontier_values) # ] # Reproductible self.__no_frontier_init = [ [-self.__var_code_table[not_front_name]] for not_front_name in self.dynamic_system.no_frontiers ] # Keep temp_initial_values for the next run # LOGGER.debug( # "previous initial values: %s; vs new initial values: %s", # self.initial_values, self.temp_initial_values # ) self.initial_values = self.temp_initial_values # Disable all variables of places (entities) in the model (except frontiers) # pas de += => pas reproductible avec les anciens runs self.__initial_constraints = self.__no_frontier_init + self.__initial_constraints ## Whole initialisations ###################################################
[docs] def init_forward_unfolding(self): """Initialisation before generating constraints - forward trajectory Organization of the initialization of the system (its clauses) for the current request and first variables shift. .. warning:: ALL textual properties are susceptible to add new auxiliary variables in the system (increase shift_step). This is why the shift is made after initialization. Initialized attributes: - __initial_constraints: Query data + negative values of all places that are not frontiers. - __final_constraints: Query data - __invariant_constraints: List of Query data - __precomputed_variant_constraints: Query data (automatic merge of textual and DIMACS forms) - __variant_constraints: Initialized with the first item of __precomputed_variant_constraints - __precomputed_dynamic_constraints: Initialized with the whole previously shifted system if the size of the system hasn't changed, and if no auxiliary clauses has been added meanwhile. Shifted attributes with __shift_step (nb of variables in the system): - __dynamic_constraints: Shift dynamic_system.clauses + dynamic_system.aux_clauses + init - __final_constraints: Shift + replace __final_constraints - __invariant_constraints: Shift + append of the last element of __invariant_constraints PS: __variant_constraints si already initialized for the first step. The following functions use the following methods to decompile textual properties or events (__compile_property(), __compile_event()) and to code them into DIMACS clauses constraints (__code_clause()). __code_clause() is susceptible to add new auxiliary numeric variables during this process if a variable is not found in __var_code_table and in __aux_code_table. - __init_initial_constraint_0, __init_final_constraint_0, __init_invariant_constraint_0 Call __code_clause + __compile_property - __init_variant_constraints_0 Call __code_clause + __compile_event Called at the begining of squery_is_satisfied() and squery_solve() .. TODO:: Attention pour le moment les litéraux présents dans les attributs DIMACS de la query doivent déjà etre présents dans le modèle. Lorsqu'ils sont ajoutés aux variables \*constraints, à aucun moment les valeurs absentes de dynamic_constraints ne sont ajoutées à __aux_code_table et __aux_list; shift_step n'est pas non plus incrémenté !! """ old_shift_step = self.__shift_step self.__shift_direction = "FORWARD" self.__current_step = 1 self.__shift_step = self.shift_step_init # back to basic! self.__aux_code_table = dict() # flush auxiliary variables self.__aux_list = [] # idem # Init properties to generate all variable num codes ## TODO: Similar code with init_backward_unfolding, move this into a function self.__init_initial_constraint_0() self.__init_final_constraint_0() self.__init_invariant_constraint_0() self.__init_variant_constraints_0() # Should be after init_initial and init_invariant self.__prune_initial_no_frontier_constraint_0() # Now shifting is possible ## TODO: check shift_direction too (not urgent, backward is not usable) # PS: no check if the system has changed with the same number of literals # but different ones. This is assumed by init_with_query(). # __include_aux_clauses_changed is modified only if text properties # has changed if check_query is activated. # __shift_step is only impacted by text properties. if ( old_shift_step != self.__shift_step or self.__include_aux_clauses_changed or not self.__dynamic_constraints ): # The size of the system has changed, # or aux_clauses flag has been updated (so the system has changed), # or the system has never been initialized before. LOGGER.info("init_forward_unfolding:: New dynamic constraints") self.__forward_init_dynamic() self.__precomputed_dynamic_constraints = [] else: # Optimization for __dynamic_constraints # Main settings of the unfolder haven't changed and the system # was initialized before. LOGGER.info("init_forward_unfolding:: Reload dynamic constraints") # Save the whole system self.__precomputed_dynamic_constraints = self.__dynamic_constraints # Recall the initial state of the system self.__dynamic_constraints = [self.__dynamic_constraints[0]] self.__shift_final() self.__shift_invariant()
[docs] def init_backward_unfolding(self): """Initialisation before generating constraints - backward trajectory Never used (and backward not implemented in __init_variant_constraints_0) """ self.__shift_direction = "BACKWARD" self.__current_step = 0 self.__shift_step = self.shift_step_init # back to basic! self.__aux_code_table = dict() # flush auxiliary variables self.__aux_list = [] # idem # Init properties to generate all variable num codes self.__init_initial_constraint_0() self.__init_final_constraint_0() self.__init_invariant_constraint_0() self.__init_variant_constraints_0() # Should be after init_initial and init_invariant self.__prune_initial_no_frontier_constraint_0() # Now shifting is possible self.__backward_init_dynamic() self.__shift_initial() self.__shift_invariant()
## Solver interface ######################################################## def __load_solver(self, solv): """Add all the current clauses in the solver .. note:: Called by: - __constraints_satisfied() - __msolve_constraints() .. todo:: Avoid to reload the solver everytime just for a few changed variables. Ex: __initial_constraints is changed only because of __initial_property and __dimacs_initial changes. Ex: __dynamic_constraints is huge and stays the same. """ # New API via C++ module solv.add_clauses(self.__final_constraints) solv.add_clauses(it.chain(*self.__invariant_constraints)) solv.add_clauses(self.__variant_constraints) solv.add_clauses(it.chain(*self.__dynamic_constraints)) solv.add_clauses(self.__initial_constraints) if LOGGER.getEffectiveLevel() == DEBUG: # final LOGGER.debug("Load new solver !!") LOGGER.debug(">> final constraints: %s", len(self.__final_constraints)) LOGGER.debug(str(self.__final_constraints)) # trajectory invariant LOGGER.debug(">> trajectory invariant constraints: %s", len(self.__invariant_constraints)) LOGGER.debug(str(self.__invariant_constraints)) # trajectory variant LOGGER.debug(">> trajectory variant constraints: %s", len(self.__variant_constraints)) LOGGER.debug(str(self.__variant_constraints)) # dynamics LOGGER.debug(">> dynamic constraints: %s", len(self.__dynamic_constraints)) LOGGER.debug(str(self.__dynamic_constraints)) # initial LOGGER.debug(">> initial constraints: %s", len(self.__initial_constraints)) LOGGER.debug(str(self.__initial_constraints)) def __sync_stats(self, solver): """Sync local data (nb vars, nb clauses) with solver state if stats are activated (never besides in test environment). .. note:: Its the only way to set __nb_vars and __nb_clauses attributes. """ if version.parse(solver_version) < version.parse("5.6.9"): if solver.nb_vars() > self.__nb_vars: self.__nb_vars = solver.nb_vars() else: if solver.nb_vars > self.__nb_vars: self.__nb_vars = solver.nb_vars # Starting from pycryptosat 5.2, nb_clauses is private # Starting from 5.6.9 the attribute is exposed if solver.nb_clauses > self.__nb_clauses: self.__nb_clauses = solver.nb_clauses def __constraints_satisfied(self): """Ask the SAT solver if the query/system is satisfiable in the given number of steps. .. warning:: vvars (frontier values) are not tested. Thus the satisfiability is not tested for interesting variables like in squery_solve(). :return: True if the problem is satisfiable, False otherwise. :rtype: <boolean> """ solver = CryptoMS() # Load all clauses into the solver self.__load_solver(solver) if self.__stats: # If stats are activated (never besides in test environment): # sync local data (nb vars, nb clauses) with solver state self.__sync_stats(solver) # Is the problem satisfiable ? return solver.is_satisfiable() def __msolve_constraints(self, max_sol, vvars): """Get solutions from the solver :param max_sol: The maximum number of solutions to be returned :param vvars: Variables for which solutions must differ. In practice, it is a list with the values (Dimacs code) of all frontier places of the system. :type max_sol: <int> :type vvars: <list <int>> or <frozenset <int>> or any iterable :return: A tuple of RawSolution objects RawSolution objects contain a solution got from SAT solver with all variable parameters from the unfolder :rtype: <tuple <RawSolution>> """ solver = CryptoMS() # Load all clauses into the solver self.__load_solver(solver) if self.__stats: # If stats are activated (never besides in test environment): # sync local data (nb vars, nb clauses) with solver state self.__sync_stats(solver) if LOGGER.getEffectiveLevel() == DEBUG: # Get List of solutions (list of tuples of literals) lintsol = solver.msolve_selected(max_sol, vvars) LOGGER.info("__msolve_constraints :: max_sol : %s", max_sol) LOGGER.info("__msolve_constraints :: sol size : %s", [len(sol) for sol in lintsol]) # Make a RawSolution for each solution (tuple of literals) return [RawSolution(solint, self) for solint in lintsol] return tuple( RawSolution(solint, self) for solint in solver.msolve_selected(max_sol, vvars) )
[docs] def squery_is_satisfied(self, max_step): """Ask the SAT solver if the query/system is satisfiable in the given number of steps. If __invariant_property is set and __final_property is not set, 1 satisfiability test is made only after all shift() calls for each remaining step from the current step. Otherwise, 1 test is made for each remaining step after each shift() call. .. warning:: vvars (frontier values) are not tested. Thus the satisfiability is not tested for interesting variables like in squery_solve(). .. note:: This function checks and corrects incoherent data on step number. __steps_before_check can't be >= max_step. max_step can't be >= to __precomputed_variant_constraints. :param max_step: The horizon on which the properties must be satisfied. :type max_step: <int> """ # Initialization self.init_forward_unfolding() # Horizon adjustment if self.__precomputed_variant_constraints: max_step = min(max_step, len(self.__precomputed_variant_constraints) - 1) if self.__invariant_property and not self.__final_property: # 1 satisfiability test is made only after all shift() calls for each step # Shift all remaining steps while self.__current_step <= max_step: self.shift() return self.__constraints_satisfied() else: # 1 test is made for each remaining step after each shift() # Check and correct for incoherent data on step number # __steps_before_check can't be >= max_step, otherwise # __current_step could be > max_step due to shift() of "auto shift loop" # and this function will not search any solution, # and will return an empty list. if self.__steps_before_check >= max_step: self.__steps_before_check = max_step - 1 # Auto shift loop: Shift without checking (optimization) # Do not search solutions before the specified limit LOGGER.info( "Satis test:: OPTI AUTO SHIFT from current_step: %s " "to __steps_before_check: %s; max_step: %s", self.__current_step, self.__steps_before_check, max_step, ) while self.__current_step < self.__steps_before_check: self.shift() satisfiability_test = False while self.__current_step < max_step: # Shift the system self.shift() # Test satisfiability satisfiability_test = self.__constraints_satisfied() LOGGER.info("Satis test:: done, current_step %s", self.__current_step) if satisfiability_test: return True # Return the satisfiability status in all cases return satisfiability_test
[docs] def squery_solve(self, vvars, max_step, max_sol, max_user_step=None): """Search max_sol number of solutions with max_step steps, involving variables of interest (frontiers). Assert a query is loaded (start_condition, inv_condition, final_condition) If __invariant_property is set and __final_property is not set, 1 unique solution search is made only after all shift() calls for each remaining step from the current step. Otherwise, 1 test is made for each remaining step after each shift() call. .. note:: This function checks and corrects incoherent data on step number. __steps_before_check can't be >= max_step. max_step can't be >= to __precomputed_variant_constraints. .. note:: À propos de l'auto-ajustement du nombre maximal d'étapes (max_step) à l'aide de max_user_step: Lorsqu'il n'y a plus de solutions pour 1 étape donnée, mais que ce nombre d'étapes est < au nombre fixé par l'utilisateur, on peut bénéficier des shifts précédents sur le modèle et on éviter de reconstruire le problème pour un nouveau test de satisfiabilité à l'étape suivante. Le fait de rechercher 1 propriété est déjà un test de satisfiabilité. Rien de plus ou de moins n'est fait par le solveur. Concrètement squery_solve est capable d'incrémenter tout seul le max_step jusqu'au max_user_step durant l'étape de solving. Aucun risque d'augmenter le min_step lors de l'étape d'élagage d'une solution connue car : - le pb n'est pas moins contraint que lorsqu'il a trouvé la solution à élaguer => 1 solution au moins est toujours retournée. - max_user_step est laissé à None :param max_step: bounded horizon for computations :param max_sol: The maximum number of solutions to be returned :param vvars: Variables for which solutions must differ. In practice, it is a list with the values (Dimacs code) of all frontier places of the system. :key max_user_step: (Optional) If max_user_step is set, we increment the current max_step until it reaches max_user_step. This saves some cycles by reusing the current CLUnfolder for an extra step. .. seealso:: :meth:`cadbiom.models.clause_constraints.mcl.MCLAnalyser.__mac_exhaustive_search` :type max_step: <int> :type max_sol: <int> :type max_user_step: <int> or None :type vvars: <list <int>> :return: RawSolution objects RawSolution objects contain a solution got from SAT solver with all variable parameters from the unfolder :rtype: <tuple <RawSolution>> """ # Initialization self.init_forward_unfolding() # Horizon adjustment if self.__precomputed_variant_constraints: max_step = min(max_step, len(self.__precomputed_variant_constraints) - 1) if self.__invariant_property and not self.__final_property: # Shift all remaining steps while self.__current_step <= max_step: self.shift() # Solve the problem return self.__msolve_constraints(max_sol, vvars) else: # Check and correct for incoherent data on step number # __steps_before_check can't be >= max_step, otherwise # __current_step could be > max_step due to shift() of "auto shift loop" # and this function will not search any solution, # and will return an empty list. if self.__steps_before_check >= max_step: self.__steps_before_check = max_step - 1 # Auto shift loop: Shift without checking (optimization) # Do not search solutions before the specified limit LOGGER.info( "Solve:: OPTI AUTO SHIFT from current_step: %s " "to __steps_before_check: %s; max_step: %s", self.__current_step, self.__steps_before_check, max_step, ) while self.__current_step < self.__steps_before_check: self.shift() # Search for solutions for each remaining step # from __current_step = __steps_before_check # (__current_step is incremented by the last shift() above)... # ... to max_step excluded. # Ex: __steps_before_check = 3, __current_step = 3, max_step = 4 # => we search solutions only for 1 step (step 4) # # When __steps_before_check is 0 by default, or less than # (max_step - 1), we need to search solutions for every steps # until max_step is reached. # Otherwise, __steps_before_check is set to (maxstep - 1) only # to allow us to search solutions for the last step (max_step). raw_solutions = list() while self.__current_step < max_step: self.shift() temp_raw_solutions = self.__msolve_constraints(max_sol, vvars) print("Nb raw_solutions:", len(temp_raw_solutions), "steps_before_check", self.__steps_before_check, "max_step", max_step, "current_step", self.__current_step ) if ( not temp_raw_solutions and max_user_step and max_step + 1 <= max_user_step ): # No more solution for this step; if max_user_step is set # we increment the current max_step in order to reuse the #  current CLUnfolder. max_step += 1 LOGGER.info( "Solve:: Auto-adjust max_step from %s to %s ", max_step - 1, max_step, ) continue raw_solutions += temp_raw_solutions return raw_solutions
###############################################################################
[docs]class PropertyVisitor(object): """ SigExpression on states and events into a set of clauses Type checking must be done. """ def __init__(self, aux_cpt=0): self.cpt = aux_cpt # for auxiliary variable naming self.clauses = [] # generated clauses self.top = True # root of the formula?
[docs] def visit_sig_ident(self, tnode): """ ident -> literal """ name = tnode.name new_lit = Literal(name, True) if self.top: self.clauses.append(Clause([new_lit])) return None return new_lit
[docs] def visit_sig_not(self, node): """ translate a not """ top = self.top self.top = False # compile operand to nl <=> formula newl = node.operand.accept(self) notnl = newl.lit_not() if top: # we are at the root self.clauses.append(Clause([notnl])) return None return notnl # if we are not at the root
[docs] def visit_sig_sync(self, binnode): """ translate a binary (or/and) expression """ top = self.top self.top = False # the internal nodes will be affected with the name '_lit' name = "_lit" + str(self.cpt) self.cpt += 1 newl = Literal(name, True) notnl = Literal(name, False) # recursive visits operator = binnode.operator nl1 = binnode.left_h.accept(self) notnl1 = nl1.lit_not() nl2 = binnode.right_h.accept(self) notnl2 = nl2.lit_not() # clauses generation if operator == 'and': # x = lh and rh self.clauses.extend([ Clause([nl1, notnl]), # not x or not lh Clause([nl2, notnl]), # not x or not rh Clause([notnl1, notnl2, newl]) # x or not lh or not rh ]) if operator == 'or': # x = lh or rh self.clauses.extend([ Clause([notnl1, newl]), # x or not lh Clause([notnl2, newl]), # x or not rh Clause([nl1, nl2, notnl]) # not x or not lh or not rh ]) if top: self.clauses.append(Clause([newl])) return None return newl
[docs] def visit_sig_const(self, exp): """ translate a constant expression """ top = self.top if top: if exp.is_const_false(): raise TypeError("No constant False property allowed") else: # always present or satisfied return None # the internal nodes will be affected with the name '_lit' name = "_lit" + str(self.cpt) self.cpt += 1 newl = Literal(name, True) # clause generation if exp.value: lit = newl # x = True else: lit = newl.lit_not() self.clauses.append(Clause([lit])) return newl
## Time operators ##########################################################
[docs] def visit_sig_default(self, exp): """ default translation """ top = self.top self.top = False # the internal nodes will be affected the name '_lit' name = "_lit" + str(self.cpt) self.cpt += 1 newl = Literal(name, True) notnl = Literal(name, False) # recursive visits nl1 = exp.left_h.accept(self) notnl1 = nl1.lit_not() nl2 = exp.right_h.accept(self) notnl2 = nl2.lit_not() # clause generation: x = lh or rh cl1 = Clause([notnl1, newl]) # x or not lh cl2 = Clause([notnl2, newl]) # x or not rh cl3 = Clause([nl1, nl2, notnl]) # not x or not lh or not rh self.clauses.append(cl1) self.clauses.append(cl2) self.clauses.append(cl3) if top: self.clauses.append(Clause([newl])) return None return newl
[docs] def visit_sig_when(self, exp): """ when translation """ top = self.top self.top = False # the internal nodes will be affected the name '_lit' name = "_lit" + str(self.cpt) self.cpt += 1 newl = Literal(name, True) notnl = Literal(name, False) # recursive visits nl1 = exp.left_h.accept(self) notnl1 = nl1.lit_not() nl2 = exp.right_h.accept(self) notnl2 = nl2.lit_not() # clause generation: x = lh and rh cl1 = Clause([nl1, notnl]) # not x or not lh cl2 = Clause([nl2, notnl]) # not x or not rh cl3 = Clause([notnl1, notnl2, newl]) # x or not lh or not rh self.clauses.append(cl1) self.clauses.append(cl2) self.clauses.append(cl3) if top: self.clauses.append(Clause([newl])) return None return newl