# -*- coding: utf-8 -*-
## Filename : CLUnfolder.py
## Author(s) : Michel Le Borgne
## Created : 22 mars 2012
## Revision :
## Source :
##
## Copyright 2012 - 2020 IRISA/IRSET
##
## This library is free software; you can redistribute it and/or modify it
## under the terms of the GNU General Public License as published
## by the Free Software Foundation; either version 2.1 of the License, or
## any later version.
##
## This library is distributed in the hope that it will be useful, but
## WITHOUT ANY WARRANTY, WITHOUT EVEN THE IMPLIED WARRANTY OF
## MERCHANTABILITY OR FITNESS FOR A PARTICULAR PURPOSE. The software and
## documentation provided here under is on an "as is" basis, and IRISA has
## no obligations to provide maintenance, support, updates, enhancements
## or modifications.
## In no event shall IRISA be liable to any party for direct, indirect,
## special, incidental or consequential damages, including lost profits,
## arising out of the use of this software and its documentation, even if
## IRISA have been advised of the possibility of such damage. See
## the GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this library; if not, write to the Free Software Foundation,
## Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
##
## The original code contained here was initially developed by:
##
## Michel Le Borgne.
## IRISA
## Symbiose team
## IRISA Campus de Beaulieu
## 35042 RENNES Cedex, FRANCE
##
##
## Contributor(s): Geoffroy Andrieux IRSET
##
"""Main engine for constraints unfolding and solving.
Process from the initialization with a query to the finding of a solution.
MCLSimpleQuery (reminder):
Object containing 2 main types of attributes describing properties:
- Attributes in text format: These are logical formulas that are humanly
readable.
Ex: `start_prop, inv_prop, final_prop, variant_prop`
- Attributes in DIMACS format: These are logical formulas encoded in
the form of clauses containing numerical values.
Ex: `dim_start, dim_inv, dim_final, dim_variant`
---
About shifting clauses and variables:
Initialization of the dynamic system by __forward_init_dynamic() iterates on
all literals of the system.
If a literal is a t+1 literal, its value is shifted to the right or to the
left depending on whether its sign is positive or negative
(addition vs subtraction of the shift_step value which is the number of
variables in the system).
Otherwise, the literal is a t literal and its value stays untouched.
t+1 and t literals are generated by CLDynSys object directly from the current
model.
Shifting a clause consists in iterating over all its literals and shifting
their values to the right or to the left depending on whether their values are
positive or negative.
\*constraints unfolder attributes are shifted at every step and submitted to
the solver.
---
init_with_query(query):
Copy of the query attributes to temporary attributes of the CLUnfolder.
Textual properties of query are compiled into numeric clauses in
init_forward_unfolding(), just at the beginning of squery_is_satisfied()
and squery_solve().
ALL textual properties are susceptible to add new auxiliary variables in the
system (increase shift_step). This is why any shift must be made after the
initialization.
init_forward_unfolding():
Organization of the initialization of the system (its clauses) for
the current request and first variables shift.
Initialized attributes:
- `__initial_constraints`:
Query data + negative values of all places that are not frontiers.
- `__final_constraints`:
Query data
- `__invariant_constraints`:
List of Query data
- `__precomputed_variant_constraints`:
Query data (automatic merge of textual and DIMACS forms)
- `__variant_constraints`:
Initialized with the first item of `__precomputed_variant_constraints`
- `__precomputed_dynamic_constraints`:
Initialized with the whole previously shifted system
if the size of the system hasn't changed, and if no auxiliary
clauses have been added meanwhile.
Shifted attributes with __shift_step (nb of variables in the system):
- __forward_init_dynamic(): __dynamic_constraints:
Shift dynamic_system.clauses + dynamic_system.aux_clauses + init
- __shift_final(): __final_constraints:
Shift + replace __final_constraints
- __shift_invariant(): __invariant_constraints:
Shift + append of the last element of __invariant_constraints
PS: __variant_constraints is already initialized for the first step
The following functions use the following methods to decompile textual
properties or events (__compile_property(), __compile_event()) and to
code them into DIMACS clauses constraints (__code_clause()).
__code_clause() is susceptible to add new auxiliary numerical variables
during this process if a variable is not found in __var_code_table and in
__aux_code_table.
- __init_initial_constraint_0, __init_final_constraint_0, __init_invariant_constraint_0
Call __code_clause + __compile_property
- __init_variant_constraints_0
Call __code_clause + __compile_event
shift():
Shift all clauses of the system to prepare it for the next step.
Called at each step by:
- squery_solve()
- squery_is_satisfied()
Modified attributes:
- __shift_dynamic(): __dynamic_constraints:
Shift + append of the last element of __dynamic_constraints
- __shift_variant(): __variant_constraints:
Shift clauses of the current step in __precomputed_variant_constraints,
and append them to __variant_constraints.
- __shift_invariant()
...
- __shift_final()
...
PS: __initial_constraints are left as is for now
(shifted only with BACKWARD shift direction).
"""
from __future__ import print_function
from packaging import version
#from pyCryptoMS import CryptoMS
from pycryptosat import Solver as CryptoMS
from pycryptosat import __version__ as solver_version
from cadbiom.models.biosignal.translators.gt_visitors import compile_cond, compile_event
from cadbiom.models.clause_constraints.CLDynSys import Clause, Literal
from cadbiom.models.clause_constraints.mcl.MCLSolutions import RawSolution, MCLException
from cadbiom import commons as cm
# C++ API
from _cadbiom import (
shift_clause,
shift_dimacs_clauses,
forward_code,
forward_init_dynamic,
)
# Standard imports
from logging import DEBUG
import itertools as it
LOGGER = cm.logger()
[docs]class CLUnfolder(object):
"""
When loading a model in a MCLAnalyser object, a CLUnfolder is generated which
implies the compilation of the dynamical system into a clause constraint
dynamical system and the DIMACS coding of all the variables.
This coding cannot be changed later.
The unfolding of constraints is performed efficiently on the numeric form
of the clause constraints. The CLUnfolder provides various methods to convert
variable names to DIMACS code and back, to extract the frontier of a model
and to extract information from raw solutions.
Dynamic constraints unfolding management:
Each variable is coded as an integer (in var_code_table and var_list).
Each clause is represented as a list of signed integers (DIMACS coding).
During unfolding, clauses are shifted either to the future (forward) or
to the past (backward). The shift operator is a simple addition
of self.__shift_step. The shift direction depends only on initializations.
self.__shift_step depends on the number of variables so it is impossible
to add variables while generating a trajectory unfolding.
Glossary:
- ground variables/ground DIMACS code: variables at time t=0 (not shifted).
- solution: a solution of the logical dynamic constraint from SAT solver
- state vector: list of DIMACS code of original (not shifted) variables
corresponding to a step.
- trajectory: list of state_vectors
Some attributes (Please read the constructor comments for much more information):
- `__*_property`: Logical formulas in text format from the current query.
- `__dimacs_*`: Clauses in DIMACS format from the current query.
- `__*_constraints`: CLUnfolder attributes for unfolding and shift;
initialized from the query attributes.
"""
def __init__(self, dynamic_system, debug=False):
"""
:param dynamic_system: Describe a dynamic system in clause form.
:key debug: (Optional) Used to activate debug mode in the Unfolder.
i.e. `__var_list` attribute will be ordered, and initializations of
the unfolder will be reproductible.
Default: False.
:type dynamic_system: <CLDynSys>
:type debug: <boolean>
"""
self.dynamic_system = dynamic_system # symbolic clause dynamic system
# shift_step equals to the total number of coded variables of the system
# (including inputs, entities, clocks/events, auxiliary variables)
# This number can change if auxiliary clauses are added.
# shift_step_init: the shift step ss (if n is X_0 code, n+ss is X_1 code)
self.shift_step_init = dynamic_system.get_var_number()
self.__shift_step = self.shift_step_init
# About unfolder lock and shift_step:
# shift_step must be frozen in order to avoid problems during the
# unflatten() step of RawSolutions.
# => Each step MUST have the same number of variables.
# Thus, we lock the Unfolder by turning self.__locked to True.
# See __shift_clause() and __shift_ground_clause()
self.__locked = False
self.__current_step = 1 # current number of unfolding
# For reachability optimisation. Number of shifts before checking
# the final property
self.__steps_before_check = 0
# assign a DIMACS code number to each variable (invariant)
# Create mappings between names and values of the variables of the system
# dynamic_system.base_var_set : Set of ALL variables of the dynamic system
# (including inputs, entities, clocks/events, auxiliary variables).
self.__var_code_table = dict() # Mapping: name -> DIMACS code (!= 0)
self.__var_list = ['##'] # Mapping: DIMACS code (!= 0) -> name
# Fix order of variables names with a list:
# - indexes of __var_list are the values of the variables at these indexes
# - values begin from 1 (0 is a blacklisted value)
if debug:
base_var_set = sorted(list(dynamic_system.base_var_set))
else:
base_var_set = list(dynamic_system.base_var_set)
self.__var_list += base_var_set
# keys: are the names of the variables, values: their values
self.__var_code_table = {
var_name: var_num for var_num, var_name in enumerate(base_var_set, 1)
}
# Optimization: for forward_init_dynamic() and forward_code() (mostly
# implemented in C.
# Precompute __var_code_table with the literal names in "future" format;
# i.e with a last char '`' at the end.
# Thus it is easy to search them directly in the dict (in O(1)),
# instead of slicing them.
# Note: "future" literals are already in self.dynamic_system.clauses
# and maybe in self.dynamic_system.aux_clauses
temp_var_code_table = {
var_name + "`": var_num
for var_name, var_num in self.__var_code_table.iteritems()
}
self.__var_code_table.update(temp_var_code_table)
assert len(self.__var_code_table) == 2 * self.__shift_step
# Include auxiliary clause to eliminate undesirable solutions
# If A->B has clock h, an aux constraint added is h included in h when A
# Must be set to False for inhibitors computations
self.__include_aux_clauses = True
# Flag used to know if the set of auxiliary clauses has changed between
# 2 initializations of the Unfolder by a query
# (i.e. properties of the queries are different).
# Cf init_forward_unfolding()
self.__include_aux_clauses_changed = False
self.__lit_cpt = self.shift_step_init + 1 # counter for aux. var. coding
# Same mapping tools but for auxiliary variables introduced by properties
# compilation. Cf __code_clause()
self.__aux_code_table = dict() # name to code
self.__aux_list = [] # code to name
# No frontier init:
# Set of DIMACS codes of the simple variables/places that are not frontiers
# Used to disable all variables of places in the model (except frontiers)
self.no_frontier_values = frozenset(
self.__var_code_table[not_front_name]
for not_front_name in self.dynamic_system.no_frontiers
)
# Used in __prune_initial_no_frontier_constraint_0() in order to
# compute values of literals that are specified in user constraints.
# These literals should not be blindly disabled by default in __no_frontier_init
self.initial_values = set()
# Ordered list of DIMACS codes of the places that are not frontiers
# => Clauses of negatives values
# Ex: [[-1], [-2], ...]
# The values are usually a subset of no_frontier_values
self.__no_frontier_init = list()
# Frontiers:
# Ordered list of DIMACS codes of the frontier variables/places
self.frontier_values = [
self.__var_code_table[front_name]
for front_name in self.dynamic_system.frontiers
]
self.frontier_values.sort()
## TODO: Utiliser une liste triée est-il utile ici ?
## si non passer utiliser un frozenset et supprimer les casts partout
## Cf RawSolution.frontier_pos_and_neg_values BACKWARD, encore utilisé avec index
## Cf TestCLUnfolder.test_frontier indexable mais peut etre contourné
# Precompute convenient attributes for:
# frontiers_pos_and_neg:
# - RawSolution.frontier_pos_and_neg_values
# (set operation with solution variables)
# frontiers_negative_values:
# - MCLQuery.from_frontier_sol_new_timing
# - MCLQuery.frontiers_negative_values
# - MCLAnalyser.__solve_with_inact_fsolution
# - TestCLUnfolder.test_prune
#
# Set of frontier positive and negative values
# (all frontiers and their opposite version).
# => operations with sets are much faster
self.frontiers_negative_values = frozenset(
-frontier for frontier in self.frontier_values
)
self.frontiers_pos_and_neg = self.frontiers_negative_values | frozenset(
self.frontier_values
)
# DIMACS codes of the input variables/places for extraction (inv)
self.__inputs = frozenset(
self.__var_code_table[inp] for inp in self.dynamic_system.inputs
)
# DIMACS codes of the free_clocks variables/places for extraction (invariant)
self.__free_clocks = frozenset(
self.__var_code_table[fcl] for fcl in self.dynamic_system.free_clocks
)
# Binding for a merged version of __inputs and __free_clocks
# Convenient attribute for RawSolution.extract_act_input_clock_seq()
# DIMACS codes of the input and free_clocks variables
self.inputs_and_free_clocks = self.__inputs | self.__free_clocks
# Properties to be checked
self.reset()
# Logical constraints:
# Result from unfolding of base constraints
# Boolean vectors signification:
# X: Current state of places (activated/unactivated)
# X': Future state of places
# H: Current 'free' events (present/not present) => ?
# I: Current inputs => ?
self.__dynamic_constraints = [] # DIMACS clauses: X' = f(X,H,I)
self.__precomputed_dynamic_constraints = []
# Simple temporal properties:
# SP(X0): Initial property/start property; Never change at each step.
# IP(X): Invariant property
# VP(X): Variant property
# List of logical formulas of properties forced at each step
# It's the trajectory of events of a solution.
# FP(X): Final property
self.__initial_constraints = [] # DIMACS clauses: C(X_0) <list <list <int>>>
self.__final_constraints = [] # idem: C(X_n)
self.__invariant_constraints = [] # DIMACS clauses: C(X_i))
self.__variant_constraints = [] # variant constraints along trajectory
self.__shift_direction = None # FORWARD or BACKWARD
# statistics on solver
self.__stats = False
self.__nb_vars = 0
self.__nb_clauses = 0
[docs] def reset(self):
"""Reset the unfolder before a new query
Reset only properties and dimacs clauses from the current query;
AND `__precomputed_variant_constraints`.
`__initial_constraints`, `__final_constraints`, `__invariant_constraints`,
`__variant_constraints` and `__dynamic_constraints` ARE NOT reset here
(see :meth:`init_forward_unfolding`).
This function is called from the constructor and during
:meth:`MCLAnalyser.sq_is_satisfiable()
<cadbiom.models.clause_constraints.mcl.MCLAnalyser.MCLAnalyser.sq_is_satisfiable>`
and :meth:`MCLAnalyser.sq_is_satisfiable()
<cadbiom.models.clause_constraints.mcl.MCLAnalyser.MCLAnalyser.sq_solutions>`
following the call of :meth:`init_with_query`.
"""
# Properties to be checked
self.__initial_property = None # logical formula - literal boolean expression
self.__dimacs_initial = None # list of DIMACS clauses
self.__final_property = None # logical formula
self.__dimacs_final = None # list of DIMACS clauses
self.__invariant_property = None # logical formula
self.__dimacs_invariant = None # list of DIMACS clauses
self.__variant_property = None # list<logic formulas>
self.__dimacs_variant = None # list<list<DIMACS clauses>>
# list of variant temporal constraints in DIMACS ground code
self.__precomputed_variant_constraints = None # list<list<DIMACS clauses>>
# Note: temp_initial_values is used in:
# __init_initial_constraint_0()
# __init_invariant_constraint_0()
# See __prune_initial_no_frontier_constraint_0()
self.temp_initial_values = set()
# If this function is called from the constructor,
# the following variables are just redefined here.
# For reachability optimisation. Number of shifts before checking
# the final property
self.__steps_before_check = 0
self.__shift_direction = None # FORWARD or BACKWARD
self.__locked = False
self.__lit_cpt = self.shift_step_init + 1
[docs] def check_query(self, query):
"""Check textual and DIMACS properties of the given query
If textual properties have changed since the last initialization of the
CLUnfolder, new auxiliary variables will be added to the dynamic system.
Also check if values of literals are in the limits of the dynamical
system (shift_step_init). If it's not the case, a ValueError exception
will be raised. Indeed, literals **can't** be used without a proper init.
.. TODO:: Keep in mind that adding auxiliary variables is possible in
the future.
Il faut remplir __aux_code_table, __aux_list, jusqu'à la valeur
du litéral inconnu dans le système et incrémenter les
variables en conséquence.
S'inspirer de __code_clause()
.. TODO:: We do not test if the textual events of query.variant_prop
involve events that are not in the model...
Called by:
:meth:`init_with_query`
:param query: Query built with start, invariant, final, variant properties
in textual and/or DIMACS forms.
:type query: <MCLSimpleQuery>
"""
# Check textual properties
if (
self.__initial_property != query.start_prop
or self.__final_property != query.final_prop
or self.__invariant_property != query.inv_prop
or self.__variant_property != query.variant_prop
):
# Properties have changed:
# The system will be modified; Auxiliary clauses will be added.
# Do not reload cached constraints. Cf init_forward_unfolding()
self.__include_aux_clauses_changed = True
## TODO: à repasser dans init_with_query ?
# Ne pas faire ce test systématiquement implique que les propriétés
# texte ne sont jamais modifiées entre chaque requete.
# Check literals
if query.dim_inv:
# Check dimacs invariant: Initial state only (will be shifted later)
# Ex: [[12]]
assert len(query.dim_inv) == 1
# Check values of literals
values = set(it.chain(*query.dim_start))
values.update(it.chain(*query.dim_final))
values.update(it.chain(*query.dim_inv))
if query.dim_variant:
# Handle empty steps:
# Ex: [[[12], [1]], [[58], [47]], []]
g = (step for step in query.dim_variant if step)
[values.update(it.chain(*step)) for step in g]
# Get all values that are not in the initial dynamic system
out_of_range_values = {val for val in values if abs(val) > self.shift_step_init}
if not out_of_range_values:
return
# The literal comes from an auxilary clause; not from the dynamic model.
raise ValueError(
"%s variable(s) is/are not in the initial system.\n"
"Insertion of literals from auxiliary clauses is not allowed here.\n"
"Since the auxiliary clauses are reset here, this literal is suspect\n"
"and its use dangerous because its value is not reproductible\n"
"(the value can refer to a different constraint from one run to another)."
% out_of_range_values
)
[docs] def init_with_query(self, query, check_query=True):
"""Initialise the unfolder with the given query
Following attributes are used from the query:
start_prop, dim_start, inv_prop, dim_inv, final_prop, dim_final,
variant_prop, dim_variant, steps_before_check
Textual properties of query are compiled into numeric clauses in
init_forward_unfolding(), just at the beginning of squery_is_satisfied()
and squery_solve().
This compilation step is costly due to ANTLR performances...
.. warning:: ALL textual properties are susceptible to add new auxiliary
variables in the system (increase shift_step). Thus, the system
will be rebuilt.
:param query: Query built with start, invariant, final, variant properties
in textual and/or DIMACS forms.
:key check_query: Check textual and DIMACS properties of the given query.
.. warning:: This option should be disable for performance reasons;
or if you know what you are doing.
You will assume in particular that:
- textual properties are not modified
- values of literals in DIMACS properties are already described
in the model (new literals are not allowed).
:type query: <MCLSimpleQuery>
:type check_query: <boolean>
"""
if check_query:
self.check_query(query)
# Reset the unfolder before a new query
self.reset()
# Init with query properties and clauses
self.__initial_property = query.start_prop # logical formula in text
self.__dimacs_initial = query.dim_start
self.__final_property = query.final_prop # logical formula in text
self.__dimacs_final = query.dim_final
self.__invariant_property = query.inv_prop # logical formula in text
self.__dimacs_invariant = query.dim_inv
# It's the trajectory of events of a solution.
self.__variant_property = query.variant_prop # logical formula in text
self.__dimacs_variant = query.dim_variant
# For reachability optimisation. Number of shifts before checking
# the final property (default 0)
self.__steps_before_check = query.steps_before_check
[docs] def set_stats(self):
"""Enable solver statistics (for tests)"""
self.__stats = True
self.__nb_vars = 0
self.__nb_clauses = 0
[docs] def unset_stats(self):
"""Disable solver statistics (never used)"""
self.__stats = False
def stats(self):
    """Display solver statistics

    The numbers of variables and clauses counted so far are sent to the
    module logger at INFO level.
    """
    # The counters are passed as lazy %-style arguments: a placeholder is
    # mandatory in the message, otherwise the logging module reports a
    # formatting error instead of emitting the record.
    LOGGER.info("NB Variables in solver: %s", self.__nb_vars)
    LOGGER.info("NB Clauses in solver: %s", self.__nb_clauses)
[docs] def set_include_aux_clauses(self, val):
"""Flag to include auxiliary clause to eliminate undesirable solutions.
If A->B has clock h, an aux constraint added is h included in h when A
Must be set to False for inhibitors computations.
.. warning:: This function impacts the shift_step number.
Tested by:
- __forward_init_dynamic
- __backward_init_dynamic
"""
self.__include_aux_clauses = val
self.__include_aux_clauses_changed = True
## Internal variable access for tests (never used) #########################
@property
def dynamic_constraints(self):
"""For tests: returns coded dynamic constraints"""
return self.__dynamic_constraints
@property
def initial_constraints(self):
"""For tests: returns coded initial constraints"""
return self.__initial_constraints
@property
def invariant_constraints(self):
"""For tests: returns coded invariant constraints"""
return self.__invariant_constraints
@property
def variant_constraints(self):
"""For tests: returns coded variant constraints"""
return self.__variant_constraints
@property
def final_constraints(self):
"""For tests: returns coded final constraints"""
return self.__final_constraints
## Variables management ####################################################
[docs] def var_names_in_clause(self, clause):
"""Return the names of the variables from values in the given numeric clause
(DEBUG never used)
"""
return [self.get_var_indexed_name(var) for var in clause]
[docs] def var_dimacs_code(self, var_name):
"""Returns DIMACS code of var_name (string) variable (for tests)"""
return self.__var_code_table[var_name]
[docs] def get_system_var_number(self):
"""Get number of variables in the clause constraint dynamical system
(including inputs, entities, clocks/events, auxiliary variables) (for tests)
.. note:: This number should be equal to the number of variables
in self.get_var_number()
"""
return self.dynamic_system.get_var_number()
[docs] def get_var_number(self):
"""Get number of principal variables (properties excluded) in the unfolder
(including inputs, entities, clocks/events, auxiliary variables) (for tests)
.. note:: This number should be equal to the number of variables
in self.get_system_var_number()
"""
# Remove the blacklisted first item (##) for variables naming
return len(self.__var_list) - 1
[docs] def get_var_name(self, var_num):
"""Get name of the variable
.. seealso:: Explanations on :meth:`get_var_indexed_name`
@param var_num: DIMACS literal coding of an initial variable
@return: name of the variable
"""
# Get the original var_code without shifts
var_code = (abs(var_num) - 1) % self.__shift_step + 1
if var_code <= self.shift_step_init:
# Variable num is less than the number of the variables in the system
# => variable from the initial system
return self.__var_list[var_code]
else:
# Auxiliary variable introduced by properties compilation
return self.__aux_list[var_code - self.shift_step_init - 1]
[docs] def get_var_indexed_name(self, var_num):
"""Get name of the variable with the time index appended
.. note:: time index is the number of steps since the begining of the
simulation.
:Example - Only standard variables::
__shift_step = 2 (number of variables in the system)
shift_step_init = 2 (number of variables in the system at the initial state)
__var_list = ["##", "n1", "n2"]
Virtual list of indexes in the state vector of a solution:
[0, 1, 2, 3, 4, 5, 6, 7, 8, ...]
#|n1,n2|n1,n2|n1,n2|n1,n2|
ti=0 ti=1 ti=2 ti=3
Given var_num = 7:
index of var_name in __var_list: ((7-1) % 2) + 1 = 1
var_name = "n1"
time index: (7 - 1) / 2 = 3
=> return: "n1_3"
:Example - With an auxiliary variable::
__shift_step = 2 (There is 1 auxiliary variable)
shift_step_init = 1
__var_list = ["##", "n1"]
__aux_list = ["_lit0"]
Virtual list of indexes in the state vector of a solution:
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, ...]
#|n1,_lit0|n1,_lit0|n1,_lit0|
ti=0 ti=1 ti=2
If the index is > shift_step_init, then it is an auxiliary
variable. In order to get the index in __aux_list we have to
substract:
- `1` because on the contrary of __var_list, __aux_list begins at index 0
- `shift_step_init` because value of the first aux variable
follows the last value of __var_list
Given var_num = 6
var_code of var_name: ((6-1) % 2) + 1 = 2
2 > shift_step_init
index of var_name in __aux_list:
var_code - shift_step_init - 1
2 -1 -1 = 0
var_name = "_lit0"
=> return: "_lit0_2"
:param var_num: DIMACS literal coding
:type var_num: <int>
:return: name of the variable with the time index appended
:rtype: <str>
"""
varnum1 = abs(var_num)
# Get the original var_code without shifts
var_code = (varnum1 - 1) % self.__shift_step + 1
var_step = (varnum1 - var_code) / self.__shift_step
if var_code <= self.shift_step_init:
# Variable num is less than the number of the variables in the system
# => variable from the initial system
return self.__var_list[var_code] + "_%s" % var_step
else:
# Auxiliary variable introduced by properties compilation
index = var_code - self.shift_step_init - 1
return self.__aux_list[index] + "_%s" % var_step
@property
def free_clocks(self):
"""Get the DIMACS codes of the free_clocks variables"""
return self.__free_clocks
@property
def inputs(self):
"""Get the DIMACS codes of the input variables"""
return self.__inputs
@property
def shift_direction(self):
"""
@return: string "FORWARD" or "BACKWARD"
"""
return self.__shift_direction
@property
def shift_step(self):
"""
@return: the shift step ss (if n is X_0 code, n+ss is X_1 code)
"""
return self.__shift_step
@property
def current_step(self):
"""
@return: the current number of unfolding
"""
return self.__current_step
## Translation from names to num codes #####################################
## The translation depends on the shift direction
def __forward_code(self, clause):
    """Deprecated, directly included in C++ module

    .. seealso:: :meth:`forward_init_dynamic`

    Numerically code a clause with the numeric codes found in
    self.__var_code_table for a base variable x,
    and numeric_code +/- shift_step (according to the sign) for x' variable.

    The work is delegated to the C++ function ``forward_code``. The former
    pure-Python implementation it replaced behaved as follows, for each
    literal of the clause:

    - name ending with a backquote (t+1 variable): the code of the name
      without its last char, plus shift_step; negated if the literal sign
      is false;
    - otherwise (t variable): the code of the name; negated if the literal
      sign is false.

    .. note:: Future variables x' are noted "name`" in Literal names.
        The absolute value of these variables will increase by shift_step
        in the next step.

    :param clause: A Clause object
    :return: The DIMACS coding of the forward shifted clause
    """
    code_table = self.__var_code_table
    step = self.__shift_step
    # New API via C++ module
    return forward_code(clause, code_table, step)
def __backward_code(self, clause):
"""(never used, backward is partially implemented)
numerically code a clause with the numeric code found in
self.__var_code_table + shift_step for a base variable x,
and numeric_code for x' variable integer coding increases in past steps
@param clause: a Clause object
@return: the DIMACS coding of the backward shifted clause
"""
num_clause = []
for lit in clause.literals:
if lit.name[-1] == "`": # t+1 variable
num_clause.append(
-self.__var_code_table[lit.name[:-1]]
if not lit.sign
else self.__var_code_table[lit.name[:-1]]
)
else: # t variable, base variable
num_clause.append(
-(self.__var_code_table[lit.name] + self.__shift_step)
if not lit.sign
else (self.__var_code_table[lit.name] + self.__shift_step)
)
return num_clause
def __code_clause(self, clause):
"""Numerically code a clause
The numerical values are found in:
- self.__var_code_table for variables in the dynamic system,
- or in self.__aux_code_table for other auxiliary variables;
We assume all variables are ground variables (not shifted).
.. warning:: This function MODIFIES SHIFT_STEP because we add a variable
in the system if a variable is found neither in __var_code_table nor
in __aux_code_table.
Thus, a new auxiliary variable will be created.
:param clause:
:return: List of numerical values corresponding to the literals
in the given clause.
:type clause: <Clause>
:rtype: <list <int>>
"""
num_clause = []
for lit in clause.literals:
# Get variable value with the given name
name = lit.name
if name in self.__var_code_table:
var_cod = self.__var_code_table[name]
elif name in self.__aux_code_table:
var_cod = self.__aux_code_table[name]
else:
# Create an auxiliary variable - modifies shift_step (nb of variables)
self.__shift_step += 1
# Mapping name to value code
self.__aux_code_table[name] = self.__shift_step
# Mapping value code to name
self.__aux_list.append(name)
# Define var_cod
var_cod = self.__shift_step
# Add the sign to the value
lit_code = var_cod if lit.sign else -var_cod
num_clause.append(lit_code)
return num_clause
## Dynamic initialisations #################################################
## TODO: move near init_forward_unfolding
def __forward_init_dynamic(self):
    """Dynamics initialisations.

    Set dynamic constraints for a forward one step: X1 = f(X0)

    Numerically code clauses with the numeric codes found in
    self.__var_code_table for a base variable x,
    and numeric_code +/- shift_step (according to the sign) for x' variable.

    __dynamic_constraints is a list of lists of numeric clauses (lists of ints)
    Each sublist of __dynamic_constraints corresponds to a step in the unfolder;
    the last step is the last element.

    The coding itself is delegated to the C++ function
    ``forward_init_dynamic``; the auxiliary clauses of the dynamic system are
    handed to it only when `__include_aux_clauses` is set.

    .. seealso:: :meth:`__forward_code`

    .. note:: Called by :meth:`init_forward_unfolding`

    .. note:: Future variables x' are noted "name`" in Literal names.
        The absolute value of these variables will increase by shift_step
        in the next step.
    """
    system = self.dynamic_system
    # Arguments common to both cases
    call_args = [system.clauses, self.__var_code_table, self.__shift_step]
    if self.__include_aux_clauses:
        # Auxiliary clauses are supported (default)
        call_args.append(system.aux_clauses)

    # New API via C++ module
    self.__dynamic_constraints = forward_init_dynamic(*call_args)

    # __dynamic_constraints is initialized and
    # has now taken into account the auxiliary clauses flag
    self.__include_aux_clauses_changed = False
def __backward_init_dynamic(self):
    """Build the numeric dynamic constraints for one backward step: X0 = f(X1)

    (never used, backward is partially implemented)

    Every clause of the dynamic system is coded with
    :meth:`__backward_code`; the auxiliary clauses are coded as well when
    ``__include_aux_clauses`` is set. The resulting list of numeric clauses
    becomes the single step stored in ``__dynamic_constraints``.

    .. note:: Called by init_backward_unfolding()
    """
    system = self.dynamic_system
    encode = self.__backward_code

    first_step = [encode(clause) for clause in system.clauses]
    if self.__include_aux_clauses:
        first_step.extend([encode(clause) for clause in system.aux_clauses])

    # One sublist per step; only the initial step exists for now
    self.__dynamic_constraints = [first_step]
## Shifting variables: implementation of the shift operator ################
def __shift_clause(self, numeric_clause):
    """Shift a clause by one step (the current __shift_step)

    (not used anymore, replaced by direct use of the C++ code)

    The work is delegated to ``shift_clause`` from the C++ module.
    According to the former pure Python implementations it replaces,
    `shift_step` is added to positive literals and subtracted from
    negative literals::

        [(lit + shift_step) if lit > 0 else (lit - shift_step)
         for lit in numeric_clause]

    About unfolder lock and shift_step:
        shift_step must be frozen in order to avoid problems during the
        unflatten() step of RawSolutions: each step MUST have the same
        number of variables. This is why ``__locked`` is turned to True
        here.

    :param numeric_clause: DIMACS clause
    :return: The value returned by ``shift_clause`` for this clause.

    .. warning:: Locks the unfolder.
    """
    # Freeze the unfolder to avoid modifications of shift_step
    self.__locked = True

    step_width = self.__shift_step
    return shift_clause(numeric_clause, step_width)
def __shift_ground_clause(self, numeric_clause, nb_steps):
    """Shift a ground clause by the given number of steps

    Each literal moves away from zero by (nb_steps * __shift_step):
    the offset is added to positive literals and subtracted from the
    others.

    Why?
        This is used for clauses that were not shifted during the previous
        steps. It is the case of variant_constraints, which come from a
        reloaded/forced trajectory and are composed of ground variables.

    About unfolder lock and shift_step:
        shift_step must be frozen in order to avoid problems during the
        unflatten() step of RawSolutions: each step MUST have the same
        number of variables. This is why ``__locked`` is turned to True
        here.

    Called by: __shift_variant()

    :param numeric_clause: DIMACS clause
    :param nb_steps: Number of shifts asked
    :return: New list with the shifted literals.

    .. warning:: Locks the unfolder.
    """
    # Freeze the unfolder to avoid modifications of shift_step
    self.__locked = True

    offset = self.__shift_step * nb_steps
    shifted_clause = []
    for literal in numeric_clause:
        if literal > 0:
            shifted_clause.append(literal + offset)
        else:
            shifted_clause.append(literal - offset)
    return shifted_clause
def __shift_dynamic(self):
    """Append the dynamics X' = f(X,I,C) of the next step

    Two cases:

    - ``__precomputed_dynamic_constraints`` already holds an entry for the
      current step: this entry is appended to ``__dynamic_constraints``
      as is (no shift).
    - otherwise the last item of ``__dynamic_constraints`` is shifted by
      ``__shift_step`` with ``shift_dimacs_clauses`` (C++ module) and the
      result is appended.

    __dynamic_constraints is a list of lists of DIMACS encoded clauses
    (lists of ints).

    .. note:: Called by shift()
    """
    step = self.__current_step
    saved_steps = self.__precomputed_dynamic_constraints

    if len(saved_steps) > step:
        # Recall the saved state of the system for the next step
        self.__dynamic_constraints.append(saved_steps[step])
        LOGGER.info(
            "shift_dynamic:: OPTI DO NOT SHIFT; "
            "Reload dynamic constraints; step: %s",
            step,
        )
    else:
        # System has not been shifted for this step until now
        LOGGER.info(
            "shift_dynamic:: SHIFT; "
            "Set dynamic constraints; step: %s; constraints nb: %s",
            step,
            len(self.__dynamic_constraints),
        )
        last_step_clauses = self.__dynamic_constraints[-1]  # <list <list <int>>>
        self.__dynamic_constraints.append(
            shift_dimacs_clauses(last_step_clauses, self.__shift_step)
        )
def __shift_initial(self):
    """Shift the initialisation condition by one step

    ``__initial_constraints`` is replaced by the result of
    ``shift_dimacs_clauses`` applied with the current ``__shift_step``.

    .. note:: Called by:

        - shift()
        - init_backward_unfolding()
    """
    current_clauses = self.__initial_constraints
    step_width = self.__shift_step
    self.__initial_constraints = shift_dimacs_clauses(current_clauses, step_width)
def __shift_final(self):
    """Shift the final property by one step

    ``__final_constraints`` is replaced by the result of
    ``shift_dimacs_clauses`` applied with the current ``__shift_step``.

    .. note:: Called by:

        - shift()
        - init_forward_unfolding()
    """
    current_clauses = self.__final_constraints
    step_width = self.__shift_step
    self.__final_constraints = shift_dimacs_clauses(current_clauses, step_width)
def __shift_invariant(self):
    """Shift the invariant property by one step

    The last element of ``__invariant_constraints`` is shifted by
    ``__shift_step`` with ``shift_dimacs_clauses`` (C++ module) and the
    result is appended to ``__invariant_constraints``.
    Nothing is done when there is no invariant constraint.

    .. note:: Called by:

        - shift()
        - init_forward_unfolding()
        - init_backward_unfolding()
    """
    invariant_steps = self.__invariant_constraints
    if not invariant_steps:
        # No invariant property: nothing to shift
        return

    next_step = shift_dimacs_clauses(invariant_steps[-1], self.__shift_step)
    invariant_steps.append(next_step)
def __shift_variant(self):
    """Shift variant property - depends on unfolding direction

    Nothing is done when ``__precomputed_variant_constraints`` is empty.

    Process (FORWARD direction only):

        - Take the clauses (constraints) of the current step in
          ``__precomputed_variant_constraints``
        - Shift each of them with :meth:`__shift_ground_clause`
          by ``__current_step`` steps
        - Append them to ``__variant_constraints``

    Example:

        current_step: 2 (shift for step 3)
        [[], [[4], [4]], [[6], [7]], []]
        => Only [[6], [7]] is shifted

    :raises MCLException: If the shift direction is "BACKWARD"
        (not implemented) or unknown.

    .. note:: Called by shift()
    """
    precomputed_steps = self.__precomputed_variant_constraints
    if not precomputed_steps:
        return

    direction = self.__shift_direction
    if direction == "FORWARD":
        # Constraint t0 already included from the query during initialisation
        step = self.__current_step
        shifted_clauses = [
            self.__shift_ground_clause(clause, step)
            for clause in precomputed_steps[step]
        ]
        self.__variant_constraints += shifted_clauses
        return

    if direction == "BACKWARD":
        raise MCLException("Shift direction 'BACKWARD' is not yet implemented")

    raise MCLException("Shift unsupported direction: " + direction)
def shift(self):
    """Shift all clauses of the system to prepare it for the next step.

    self.__current_step is incremented here!

    Modified attributes:

        - __dynamic_constraints[-1] (last element)
            Shift + append of the last element of __dynamic_constraints
        - __invariant_constraints[-1] (last element)
            Shift + append of the last element of __invariant_constraints
        - __precomputed_variant_constraints[current_step]
            Shift clauses of the current step in
            __precomputed_variant_constraints, and append them to
            __variant_constraints.
        - __final_constraints if __shift_direction is "FORWARD"
        - __initial_constraints if __shift_direction is "BACKWARD"

    PS: __initial_constraints are left as is for now
    (shifted only with BACKWARD shift direction)

    The shift direction is validated *before* any constraint is touched,
    so that an unset/unknown direction does not leave the unfolder with
    some constraints shifted and others not.

    Called at each step by:

        - squery_solve()
        - squery_is_satisfied()

    :raises MCLException: If the shift direction is neither "FORWARD"
        nor "BACKWARD".
    """
    # Freeze the unfolder to avoid modifications of shift_step
    self.__locked = True

    direction = self.__shift_direction
    if direction not in ("FORWARD", "BACKWARD"):
        # Shift direction must be set.
        # %-formatting keeps the message usable even if the direction is
        # not a string (string concatenation would raise a TypeError).
        raise MCLException("Shift unsupported direction: %s" % (direction,))

    self.__shift_dynamic()
    self.__shift_invariant()
    self.__shift_variant()

    if direction == "FORWARD":
        self.__shift_final()
    else:
        self.__shift_initial()

    # Increment the current step
    self.__current_step += 1

    LOGGER.info("shift:: done; new current_step: %s", self.__current_step)
## Coding of properties ####################################################
def __compile_property(self, property_text):
    """Compile a property (logical formula) into clauses

    The text is parsed and type checked by ``compile_cond`` with the symbol
    table of dynamic_system; errors are reported through the dynamic_system
    reporter. The resulting tree is then visited by a ``PropertyVisitor``
    which produces the clauses.

    .. warning:: MODIFIES __lit_cpt for numbering of auxiliary variables
        (not coding)

    .. note:: Called by:

        - __init_initial_constraint_0()
        - __init_final_constraint_0()
        - __init_invariant_constraint_0()

    :param property_text: Logical formula in text format.
    :return: The ``clauses`` attribute of the visitor.
    :raises MCLException: If the unfolder is locked, or if the parser
        returns no tree for the given text.
    """
    if self.__locked:
        raise MCLException("Trying to compile property with a locked unfolder")

    # Syntax analyser and type checker:
    # condition expression => tree representation
    system = self.dynamic_system
    syntax_tree = compile_cond(property_text, system.symb_tab, system.report)
    if syntax_tree is None:
        raise MCLException(
            "error during parsing of '%s', please check the property syntax"
            % property_text
        )

    # The visitor starts numbering its auxiliary variables from __lit_cpt
    # and the counter is saved afterwards: this avoids name collisions
    visitor = PropertyVisitor(self.__lit_cpt)
    syntax_tree.accept(visitor)
    self.__lit_cpt = visitor.cpt
    return visitor.clauses
def __compile_event(self, property_text):
    """Compile an event (biosignal expression) into clauses

    The text is parsed and type checked by ``compile_event`` with the
    symbol table of dynamic_system; errors are reported through the
    dynamic_system reporter. The resulting tree is then visited by a
    ``PropertyVisitor`` which produces the clauses.

    .. warning:: MODIFIES __lit_cpt for numbering of auxiliary variables
        (not coding)

    .. note:: Called by:

        - __init_variant_constraints_0()

    :param property_text: Event expression in text format.
    :return: The ``clauses`` attribute of the visitor.
    :raises MCLException: If the unfolder is locked, or if the parser
        returns no tree for the given text.
    """
    if self.__locked:
        raise MCLException("Trying to compile event with a locked unfolder")

    # Syntax analyser and type checker:
    # event expression => tree representation.
    # The parser also returns the state events (s#> ..) used and the free
    # clocks used in the event expression; both are ignored here.
    system = self.dynamic_system
    event_tree, _state_events, _free_clocks = compile_event(
        property_text,
        system.symb_tab,
        True,  # Collect free clocks (events)
        system.report,
    )
    if event_tree is None:
        raise MCLException(
            "error during parsing of '%s', please check the event syntax"
            % property_text
        )

    # The visitor starts numbering its auxiliary variables from __lit_cpt
    # and the counter is saved afterwards: this avoids name collisions
    visitor = PropertyVisitor(self.__lit_cpt)
    event_tree.accept(visitor)
    self.__lit_cpt = visitor.cpt
    return visitor.clauses
def __init_initial_constraint_0(self, no_frontier_init=True):
    """Code initial constraints in numerical clauses.

    Initialisation of `__initial_constraints`:
    List of DIMACS clauses (lists of values).

    Automatic merge of textual and DIMACS forms:
    the clauses compiled and coded from `__initial_property` (text format)
    come first, followed by the clauses of `__dimacs_initial`.

    We have to wait until all variables are num coded before shifting
    anything!

    .. note:: Compilation of properties in text format (logical formulas)
        to numeric format is expensive when they must be parsed at each
        query.

    .. TODO:: Optimize this. The literals due to __initial_property (and by
        extension __no_frontier_init) may be invariable but are reprocessed
        at each reload of the unfolder, while the literals of banished
        solutions are a variable part that grows at each run. Caching
        __initial_constraints could be possible if an extra set of
        constraints was dedicated to banished solutions.

    :key no_frontier_init: (Optional) When True, the absolute values of the
        literals coded from the textual property are added to
        `temp_initial_values`, so that the corresponding places are not
        disabled at the initialization step. Default: True

        .. warning:: Only the textual properties are taken into account.
            See :meth:`__prune_initial_no_frontier_constraint_0`.
            DIMACS properties are not checked.
    :type no_frontier_init: <boolean>
    """
    constraints = []
    self.__initial_constraints = constraints

    text_property = self.__initial_property
    if text_property:
        # Compile the property taken from the current query (costly...)
        compiled_clauses = self.__compile_property(text_property)
        constraints.extend(
            [self.__code_clause(clause) for clause in compiled_clauses]
        )

        if no_frontier_init:
            # TODO: useless if __initial_property has not changed...
            self.temp_initial_values.update(
                abs(val) for clause in constraints for val in clause
            )

    dimacs_clauses = self.__dimacs_initial
    if dimacs_clauses:
        # Add DIMACS clauses
        constraints.extend(dimacs_clauses)
def __init_final_constraint_0(self):
    """Code final constraints in numerical clauses.

    Initialisation of `__final_constraints`:
    List of DIMACS clauses (lists of values).

    Automatic merge of textual and DIMACS forms:
    the clauses compiled and coded from `__final_property` (text format)
    come first, followed by the clauses of `__dimacs_final`.

    We have to wait until all variables are num coded before shifting
    anything!

    .. note:: Compilation of properties in text format (logical formulas)
        to numeric format is expensive when they must be parsed at each
        query.
    """
    constraints = []
    self.__final_constraints = constraints

    text_property = self.__final_property
    if text_property:
        # Compile the property into numeric form
        # Ex: [$Px$] => self.__final_constraints = [[7]]
        compiled_clauses = self.__compile_property(text_property)
        constraints.extend(
            [self.__code_clause(clause) for clause in compiled_clauses]
        )

    dimacs_clauses = self.__dimacs_final
    if dimacs_clauses:
        # Add DIMACS clauses
        constraints.extend(dimacs_clauses)
def __init_invariant_constraint_0(self, no_frontier_init=True):
    """Code invariant constraints in numerical clauses.

    Initialisation of `__invariant_constraints`:
    List of lists of DIMACS clauses (lists of values); only the initial
    step (t0) is built here, the next ones are appended by the shifts.

    :Example:

        [[[12], [1]], [[58], [47]], [[104], [93]]]
         t0           t1            t2

    Automatic merge of textual and DIMACS forms:
    the clauses compiled and coded from `__invariant_property` (text
    format) come first, followed by **all** the clauses of
    `__dimacs_invariant`. Both end up in the same initial step.

    We have to wait until all variables are num coded before shifting
    anything!

    .. note:: Why a list of lists?
        The invariant property must be verified at each step of the
        satisfiability search (just like variant_constraints).
        `1 step = 1 item = 1 set of clauses`, enforcing the property
        defined at the beginning.

    .. note:: Compilation of properties in text format (logical formulas)
        to numeric format is expensive when they must be parsed at each
        query.

    .. note:: NOTE(review): `__dimacs_invariant` is assumed to be a flat
        list of DIMACS clauses, which is how the DIMACS-only case has
        always used it - to be confirmed against the query class.

    :key no_frontier_init: (Optional) When True, the absolute values of the
        literals coded from the textual property are added to
        `temp_initial_values`, so that the corresponding places are not
        disabled at the initialization step. Default: True

        .. warning:: Only the textual properties are taken into account.
            See :meth:`__prune_initial_no_frontier_constraint_0`.
            DIMACS properties are not checked.
    :type no_frontier_init: <boolean>
    """
    self.__invariant_constraints = list()

    if self.__invariant_property:
        # Compile the property into numeric form: this is the step t0
        initial_step = [
            self.__code_clause(clause)
            for clause in self.__compile_property(self.__invariant_property)
        ]
        self.__invariant_constraints.append(initial_step)

        if no_frontier_init:
            # TODO: useless if __invariant_property has not changed...
            self.temp_initial_values.update(
                abs(val) for clause in initial_step for val in clause
            )

    if self.__dimacs_invariant:
        # Add DIMACS clauses
        if self.__invariant_constraints:
            # Merge into the initial step. Every DIMACS clause is kept:
            # appending only the first one silently dropped the others.
            self.__invariant_constraints[0].extend(self.__dimacs_invariant)
        else:
            # Copy, so that the list owned by the query is never shared
            # with the constraints of the unfolder
            self.__invariant_constraints.append(list(self.__dimacs_invariant))
def __init_variant_constraints_0(self):
    """Code variant constraints in numerical clauses.

    .. TODO:: variant_property is a list of steps; each step has a logical
        formula involving events (not places: __compile_event() is used,
        not __compile_property())
        => set of **events** that must be valid at each step.

    Automatic merge of textual and DIMACS forms:
    If variant_property is set, each property is compiled into clauses
    and these clauses are coded. Clauses already in DIMACS form are added
    step by step (DIMACS clauses first). The two forms must have the same
    number of steps in order to allow a merge.

    Attributes initialized:

        self.__precomputed_variant_constraints:
            Built here and used later to shift current variant properties
            and update self.__variant_constraints in __shift_variant().
            Left untouched when neither form is set.

        self.__variant_constraints:
            - the only element passed to the solver.
            - initialised with a copy of the first item of
              self.__precomputed_variant_constraints

    :raises MCLException: If the textual and DIMACS forms do not have the
        same number of steps; or, when precomputed constraints exist, if
        the shift direction is "BACKWARD" (not implemented) or unknown.
    """
    self.__variant_constraints = list()

    text_steps = self.__variant_property
    dimacs_steps = self.__dimacs_variant

    # Coding of variant properties
    # Take __variant_property in priority over __dimacs_variant
    if text_steps:
        coded_steps = []
        self.__precomputed_variant_constraints = coded_steps
        # One list of coded clauses per step of __variant_property
        coded_steps.extend(
            [
                [self.__code_clause(clause) for clause in self.__compile_event(prop)]
                for prop in text_steps
            ]
        )

        if dimacs_steps:
            # Add DIMACS clauses; the number of steps must be equal
            if len(text_steps) != len(dimacs_steps):
                raise MCLException(
                    "Incoherent variant properties; sizes of "
                    "__variant_property and __dimacs_variant differ"
                )

            # Merge together all steps content
            # Ex:
            # coded steps due to __variant_property conversion:
            # [[], [[4]], [], []]
            # __dimacs_variant:
            # [[], [[4]], [[6], [7]], []]
            # Result:
            # [[], [[4], [4]], [[6], [7]], []]
            self.__precomputed_variant_constraints = [
                list(dimacs_step) + list(coded_step)
                for dimacs_step, coded_step in zip(dimacs_steps, coded_steps)
            ]
    elif dimacs_steps:
        # No __variant_property, keep only __dimacs_variant
        self.__precomputed_variant_constraints = dimacs_steps

    # Initialisation of __variant_constraints
    precomputed_steps = self.__precomputed_variant_constraints
    if not precomputed_steps:
        return

    direction = self.__shift_direction
    if direction == "FORWARD":
        # For now, keep only the events of the first step (t0)
        # Other steps are taken during shift() calls
        self.__variant_constraints = list(precomputed_steps[0])
    elif direction == "BACKWARD":
        raise MCLException("Shift direction 'BACKWARD' is not yet implemented")
    else:
        raise MCLException("Shift unsupported direction: " + direction)
def __prune_initial_no_frontier_constraint_0(self, no_frontier_init=True):
    """Prevent the places mentioned in constraints from being inactivated
    at the initial state of the system.

    Why? Inactivating them would return an unsatisfiable system whatever
    the constraints.

    How? Get all values in the recently compiled initial and invariant
    constraints (`temp_initial_values`). Then, build __no_frontier_init
    with the allowed places and put it in front of __initial_constraints.

    Positive side-effect: __no_frontier_init is cached as much as possible
    from a run to another: it is reused as is when `temp_initial_values`
    equals the `initial_values` kept from the previous run.

    `__initial_constraints` is a list of DIMACS clauses (lists of values)
    composed of 4 types of clauses:

        1 - non frontier literals (computed here)
        2 - literals from text properties (initial/invariant)
        3 - literals from DIMACS properties (initial/invariant)
        4 - clauses from banished solutions
            (see :meth:`__init_initial_constraint_0`)

    If the textual properties do not change from one run to another,
    then only the clauses related to the banishment of the solutions
    change.

    :Example:

        Places that are not frontiers are inactivated;
        initial_constraints will be:

            [[-1],]

        Then you want to force the literal 1 to be activated under certain
        conditions in start_property or invariant_property of a query.
        You don't want the following initial_constraints, because the
        system is unsatisfiable whatever the other constraints:

            [[-1], [1],]

    .. note:: About reproducible runs and constraints order:
        The order of clauses impacts the final result;
        In order to keep the results obtained with the old library,
        the literals of non frontiers must be placed at the beginning
        of __initial_constraints (not at the end).

    :key no_frontier_init: (Optional) When False, nothing is done.
        Default: True
    :type no_frontier_init: <boolean>
    """
    if not no_frontier_init:
        return

    # Note: temp_initial_values comes from:
    # __init_initial_constraint_0
    # __init_invariant_constraint_0
    user_values = self.temp_initial_values

    if user_values == self.initial_values and self.__no_frontier_init:
        # __no_frontier_init is already initialized according to the
        # same initial/invariant textual properties
        LOGGER.info("RELOAD __no_frontier_init")
    else:
        if user_values:
            # Different initial_values since the last run:
            # remove the values found in user constraints
            LOGGER.info("INIT prune __no_frontier_init")
            allowed_values = self.no_frontier_values - user_values
            self.__no_frontier_init = [[-val] for val in allowed_values]
        else:
            # Take all the places that are not frontiers.
            # Iterating over dynamic_system.no_frontiers (instead of the
            # sorted no_frontier_values) keeps previous runs reproducible.
            LOGGER.info("INIT basic __no_frontier_init")
            code_table = self.__var_code_table
            self.__no_frontier_init = [
                [-code_table[place_name]]
                for place_name in self.dynamic_system.no_frontiers
            ]

        # Keep temp_initial_values for the next run
        self.initial_values = user_values

    # Disable all variables of places (entities) in the model (except
    # frontiers). A new list is built with the non frontier literals first
    # (no "+="): required to stay reproducible with the former runs.
    self.__initial_constraints = self.__no_frontier_init + self.__initial_constraints
## Whole initialisations ###################################################
def init_forward_unfolding(self):
    """Initialisation before generating constraints - forward trajectory

    Organization of the initialization of the system (its clauses) for
    the current request and first variables shift.

    .. warning:: ALL textual properties are susceptible to add new
        auxiliary variables in the system (increase shift_step).
        This is why the shift is made after initialization.

    Initialized attributes:

        - __initial_constraints:
            Query data + negative values of all places that are not
            frontiers.
        - __final_constraints:
            Query data
        - __invariant_constraints:
            List of Query data
        - __precomputed_variant_constraints:
            Query data (automatic merge of textual and DIMACS forms)
        - __variant_constraints:
            Initialized with the first item of
            __precomputed_variant_constraints
        - __precomputed_dynamic_constraints:
            Initialized with the whole previously shifted system
            if the size of the system hasn't changed, and if no auxiliary
            clauses have been added meanwhile. Emptied otherwise.

    Shifted attributes with __shift_step (nb of variables in the system):

        - __dynamic_constraints:
            Shift dynamic_system.clauses + dynamic_system.aux_clauses + init
        - __final_constraints:
            Shift + replace __final_constraints
        - __invariant_constraints:
            Shift + append of the last element of __invariant_constraints

    PS: __variant_constraints is already initialized for the first step.

    The following functions use the following methods to compile textual
    properties or events (__compile_property(), __compile_event()) and to
    code them into DIMACS clauses constraints (__code_clause()).
    __code_clause() is susceptible to add new auxiliary numeric variables
    during this process if a variable is not found in __var_code_table and
    in __aux_code_table.

        - __init_initial_constraint_0, __init_final_constraint_0,
          __init_invariant_constraint_0
            Call __code_clause + __compile_property
        - __init_variant_constraints_0
            Call __code_clause + __compile_event

    Called at the beginning of squery_is_satisfied() and squery_solve()

    .. TODO:: Beware: for now the literals found in the DIMACS attributes
        of the query must already exist in the model.
        When they are added to the ``*_constraints`` attributes, the values
        missing from dynamic_constraints are never added to
        __aux_code_table and __aux_list; shift_step is not incremented
        either!
    """
    old_shift_step = self.__shift_step

    self.__shift_direction = "FORWARD"
    self.__current_step = 1
    self.__shift_step = self.shift_step_init  # back to basic!
    self.__aux_code_table = dict()  # flush auxiliary variables
    self.__aux_list = []  # idem

    # Init properties to generate all variable num codes
    ## TODO: Similar code with init_backward_unfolding, move this into a function
    self.__init_initial_constraint_0()
    self.__init_final_constraint_0()
    self.__init_invariant_constraint_0()
    self.__init_variant_constraints_0()
    # Should be after init_initial and init_invariant
    self.__prune_initial_no_frontier_constraint_0()

    # Now shifting is possible
    ## TODO: check shift_direction too (not urgent, backward is not usable)
    # PS: no check if the system has changed with the same number of literals
    # but different ones. This is assumed by init_with_query().
    # __include_aux_clauses_changed is modified only if text properties
    # have changed if check_query is activated.
    # __shift_step is only impacted by text properties.
    if (
        old_shift_step != self.__shift_step
        or self.__include_aux_clauses_changed
        or not self.__dynamic_constraints
    ):
        # The size of the system has changed,
        # or aux_clauses flag has been updated (so the system has changed),
        # or the system has never been initialized before.
        LOGGER.info("init_forward_unfolding:: New dynamic constraints")
        self.__forward_init_dynamic()
        self.__precomputed_dynamic_constraints = []
    else:
        # Optimization for __dynamic_constraints
        # Main settings of the unfolder haven't changed and the system
        # was initialized before.
        LOGGER.info("init_forward_unfolding:: Reload dynamic constraints")
        # Save the whole system
        self.__precomputed_dynamic_constraints = self.__dynamic_constraints
        # Recall the initial state of the system
        self.__dynamic_constraints = [self.__dynamic_constraints[0]]

    self.__shift_final()
    self.__shift_invariant()
def init_backward_unfolding(self):
    """Initialisation before generating constraints - backward trajectory

    Never used (and backward is not implemented in
    __init_variant_constraints_0).

    Process:

        - Reset the unfolding state: direction "BACKWARD", current step 0,
          __shift_step back to shift_step_init, auxiliary variables
          flushed.
        - Initialize the initial, final, invariant and variant constraints
          from the query, then prune the non frontier initial constraints.
          All variables must be num coded before shifting anything.
        - Build the backward dynamic constraints, then shift the initial
          and invariant constraints.
    """
    self.__shift_direction = "BACKWARD"
    self.__current_step = 0
    self.__shift_step = self.shift_step_init  # back to basic!
    self.__aux_code_table = dict()  # flush auxiliary variables
    self.__aux_list = []  # idem

    # Init properties to generate all variable num codes
    self.__init_initial_constraint_0()
    self.__init_final_constraint_0()
    self.__init_invariant_constraint_0()
    self.__init_variant_constraints_0()
    # Should be after init_initial and init_invariant
    self.__prune_initial_no_frontier_constraint_0()

    # Now shifting is possible
    self.__backward_init_dynamic()
    self.__shift_initial()
    self.__shift_invariant()
## Solver interface ########################################################
def __load_solver(self, solv):
    """Add all the current clauses in the solver

    Clauses are added in this order: final, invariant (all steps), variant,
    dynamic (all steps), initial. When the effective level of the logger
    is DEBUG, the size and the content of each set of constraints are
    logged afterwards.

    .. note:: Called by:

        - __constraints_satisfied()
        - __msolve_constraints()

    .. todo::
        Avoid reloading the solver every time just for a few changed
        variables.
        Ex: __initial_constraints is changed only because of
        __initial_property and __dimacs_initial changes.
        Ex: __dynamic_constraints is huge and stays the same.

    :param solv: Solver object exposing an ``add_clauses`` method.
    """
    # Invariant and dynamic constraints hold one list of clauses per step
    all_steps = it.chain.from_iterable

    solv.add_clauses(self.__final_constraints)
    solv.add_clauses(all_steps(self.__invariant_constraints))
    solv.add_clauses(self.__variant_constraints)
    solv.add_clauses(all_steps(self.__dynamic_constraints))
    solv.add_clauses(self.__initial_constraints)

    if LOGGER.getEffectiveLevel() != DEBUG:
        return

    LOGGER.debug("Load new solver !!")
    debug_report = (
        (">> final constraints: %s", self.__final_constraints),
        (">> trajectory invariant constraints: %s", self.__invariant_constraints),
        (">> trajectory variant constraints: %s", self.__variant_constraints),
        (">> dynamic constraints: %s", self.__dynamic_constraints),
        (">> initial constraints: %s", self.__initial_constraints),
    )
    for message, constraints in debug_report:
        LOGGER.debug(message, len(constraints))
        LOGGER.debug(str(constraints))
def __sync_stats(self, solver):
    """Sync local data (nb vars, nb clauses) with solver state
    if stats are activated (never besides in test environment).

    The local counters only grow: a value is stored only when the solver
    reports more than what is currently recorded.

    :param solver: SAT solver instance. ``nb_vars`` is called as a method
        when ``solver_version`` is older than 5.6.9, and read as an
        attribute (like ``nb_clauses``) otherwise.

    .. note:: It's the only way to set __nb_vars and __nb_clauses attributes.
    """
    if version.parse(solver_version) < version.parse("5.6.9"):
        # Older bindings: nb_vars is a method; __nb_clauses is left untouched
        if solver.nb_vars() > self.__nb_vars:
            self.__nb_vars = solver.nb_vars()
    else:
        # 5.6.9 and later: nb_vars is a plain attribute
        if solver.nb_vars > self.__nb_vars:
            self.__nb_vars = solver.nb_vars
        # Starting from pycryptosat 5.2, nb_clauses is private
        # Starting from 5.6.9 the attribute is exposed
        # NOTE(review): this check is assumed to belong to the `else`
        # branch, as the two comments above suggest - confirm the nesting.
        if solver.nb_clauses > self.__nb_clauses:
            self.__nb_clauses = solver.nb_clauses
def __constraints_satisfied(self):
    """Ask the SAT solver if the query/system is satisfiable with the
    clauses of the current number of steps.

    A new solver is created and loaded with all the current clauses at
    each call.

    .. warning:: vvars (frontier values) are not tested. Thus the
        satisfiability is not tested for interesting variables like in
        squery_solve().

    :return: The answer of the solver: True if the problem is satisfiable,
        False otherwise.
    :rtype: <boolean>
    """
    sat_solver = CryptoMS()
    # Load all clauses into the solver
    self.__load_solver(sat_solver)

    # If stats are activated (never besides in test environment):
    # sync local data (nb vars, nb clauses) with solver state
    if self.__stats:
        self.__sync_stats(sat_solver)

    # Is the problem satisfiable ?
    is_satisfiable = sat_solver.is_satisfiable()
    return is_satisfiable
def __msolve_constraints(self, max_sol, vvars):
    """Get solutions from the solver.

    A fresh solver is created, loaded with all the clauses, and asked for
    at most ``max_sol`` solutions differing on ``vvars``.

    The result is always a tuple, whatever the logging level (the DEBUG
    path used to return a list while the regular path returned a tuple).

    :param max_sol: The maximum number of solutions to be returned
    :param vvars: Variables for which solutions must differ.
        In practice, it is a list with the values (Dimacs code)
        of all frontier places of the system.
    :type max_sol: <int>
    :type vvars: <list <int>> or <frozenset <int>> or any iterable
    :return: A tuple of RawSolution objects.
        RawSolution objects contain a solution got from SAT solver with all
        variable parameters from the unfolder.
    :rtype: <tuple <RawSolution>>
    """
    solver = CryptoMS()
    # Load all clauses into the solver
    self.__load_solver(solver)

    if self.__stats:
        # If stats are activated (never besides in test environment):
        # sync local data (nb vars, nb clauses) with solver state
        self.__sync_stats(solver)

    # Solutions as returned by the solver (sequence of tuples of literals)
    lintsol = solver.msolve_selected(max_sol, vvars)

    if LOGGER.getEffectiveLevel() == DEBUG:
        # Size report only; it must not influence the returned value
        LOGGER.info("__msolve_constraints :: max_sol : %s", max_sol)
        LOGGER.info(
            "__msolve_constraints :: sol size : %s",
            [len(sol) for sol in lintsol],
        )

    # Make a RawSolution for each solution (tuple of literals)
    return tuple(RawSolution(solint, self) for solint in lintsol)
def squery_is_satisfied(self, max_step):
    """Tell whether the loaded query can be satisfied within max_step steps.

    Two strategies are used:

    - If __invariant_property is set and __final_property is not set,
      the system is shifted while the current step is <= max_step and a
      single satisfiability test is made afterwards.
    - Otherwise, one test is made after each shift() call for each
      remaining step, stopping at the first satisfiable step.

    .. warning:: vvars (frontier values) are not tested. Thus the
        satisfiability is not tested for interesting variables like in
        squery_solve().

    .. note:: Incoherent step settings are corrected on the fly:
        max_step is capped to len(__precomputed_variant_constraints) - 1
        when precomputed variant constraints exist, and
        __steps_before_check is lowered to max_step - 1 when it is
        >= max_step (the attribute itself is updated).

    :param max_step: The horizon on which the properties must be satisfied.
    :type max_step: <int>
    :return: The outcome of the satisfiability test(s); False when no test
        could be made.
    :rtype: <boolean>
    """
    # Initialization
    self.init_forward_unfolding()

    # Horizon adjustment
    precomputed = self.__precomputed_variant_constraints
    if precomputed:
        max_step = min(max_step, len(precomputed) - 1)

    invariant_only = self.__invariant_property and not self.__final_property
    if invariant_only:
        # Unfold everything first, then ask the solver exactly once
        while self.__current_step <= max_step:
            self.shift()
        return self.__constraints_satisfied()

    # From here: one test per remaining step.
    # __steps_before_check must stay below max_step, otherwise the
    # "auto shift loop" below could push __current_step past max_step
    # and no test at all would be made.
    if self.__steps_before_check >= max_step:
        self.__steps_before_check = max_step - 1

    # Auto shift loop: shift without checking (optimization);
    # nothing is tested before the specified limit.
    LOGGER.info(
        "Satis test:: OPTI AUTO SHIFT from current_step: %s "
        "to __steps_before_check: %s; max_step: %s",
        self.__current_step,
        self.__steps_before_check,
        max_step,
    )
    while self.__current_step < self.__steps_before_check:
        self.shift()

    outcome = False
    while self.__current_step < max_step:
        self.shift()
        outcome = self.__constraints_satisfied()
        LOGGER.info("Satis test:: done, current_step %s", self.__current_step)
        if outcome:
            return True

    # No satisfiable step found (or no step left to test)
    return outcome
def squery_solve(self, vvars, max_step, max_sol, max_user_step=None):
    """Search max_sol number of solutions with max_step steps, involving
    variables of interest (frontiers).

    Assert a query is loaded (start_condition, inv_condition, final_condition)

    If __invariant_property is set and __final_property is not set,
    1 unique solution search is made only after all shift() calls for each
    remaining step from the current step.
    Otherwise, 1 search is made for each remaining step after each shift()
    call and the solutions of all steps are accumulated.

    .. note:: This function checks and corrects incoherent data on step number.
        __steps_before_check can't be >= max_step.
        max_step is capped to len(__precomputed_variant_constraints) - 1.

    .. note:: About the auto-adjustment of the maximum number of steps
        (max_step) by means of max_user_step:

        When there is no more solution for a given step, but this number of
        steps is lower than the limit set by the user, we can benefit from
        the shifts already made on the model and avoid rebuilding the
        problem for a new satisfiability test at the next step.

        Searching for a property is already a satisfiability test.
        Nothing more and nothing less is done by the solver.

        In practice squery_solve is able to increment max_step by itself,
        up to max_user_step, during the solving stage.

        There is no risk of increasing min_step during the pruning stage of
        a known solution because:

        - the problem is not less constrained than when the solution to be
          pruned was found => at least 1 solution is always returned.
        - max_user_step is left to None.

    :param vvars: Variables for which solutions must differ.
        In practice, it is a list with the values (Dimacs code)
        of all frontier places of the system.
    :param max_step: bounded horizon for computations
    :param max_sol: The maximum number of solutions to be returned
        (passed as is to each solver call).
    :key max_user_step: (Optional) If max_user_step is set, we increment the
        current max_step until it reaches max_user_step. This saves some
        cycles by reusing the current CLUnfolder for an extra step.

        .. seealso:: :meth:`cadbiom.models.clause_constraints.mcl.MCLAnalyser.__mac_exhaustive_search`
    :type vvars: <list <int>>
    :type max_step: <int>
    :type max_sol: <int>
    :type max_user_step: <int> or None
    :return: RawSolution objects.
        RawSolution objects contain a solution got from SAT solver with all
        variable parameters from the unfolder.
        In the invariant-only case this is the value returned by
        __msolve_constraints; otherwise it is a list accumulating the
        solutions found at each step.
    :rtype: <list <RawSolution>> or <tuple <RawSolution>>
    """
    # Initialization
    self.init_forward_unfolding()

    # Horizon adjustment
    if self.__precomputed_variant_constraints:
        max_step = min(max_step, len(self.__precomputed_variant_constraints) - 1)

    if self.__invariant_property and not self.__final_property:
        # Shift all remaining steps
        while self.__current_step <= max_step:
            self.shift()
        # Solve the problem
        return self.__msolve_constraints(max_sol, vvars)

    # Check and correct for incoherent data on step number
    # __steps_before_check can't be >= max_step, otherwise
    # __current_step could be > max_step due to shift() of "auto shift loop"
    # and this function will not search any solution,
    # and will return an empty list.
    if self.__steps_before_check >= max_step:
        self.__steps_before_check = max_step - 1

    # Auto shift loop: Shift without checking (optimization)
    # Do not search solutions before the specified limit
    LOGGER.info(
        "Solve:: OPTI AUTO SHIFT from current_step: %s "
        "to __steps_before_check: %s; max_step: %s",
        self.__current_step,
        self.__steps_before_check,
        max_step,
    )
    while self.__current_step < self.__steps_before_check:
        self.shift()

    # Search for solutions for each remaining step
    # from __current_step = __steps_before_check
    # (__current_step is incremented by the last shift() above)...
    # ... to max_step excluded.
    # Ex: __steps_before_check = 3, __current_step = 3, max_step = 4
    # => we search solutions only for 1 step (step 4)
    #
    # When __steps_before_check is 0 by default, or less than
    # (max_step - 1), we need to search solutions for every steps
    # until max_step is reached.
    # Otherwise, __steps_before_check is set to (max_step - 1) only
    # to allow us to search solutions for the last step (max_step).
    raw_solutions = list()
    while self.__current_step < max_step:
        self.shift()
        temp_raw_solutions = self.__msolve_constraints(max_sol, vvars)

        # Diagnostics go through the module logger: a library must not
        # write to stdout on every solving step.
        LOGGER.debug(
            "Solve:: Nb raw_solutions: %s; steps_before_check: %s; "
            "max_step: %s; current_step: %s",
            len(temp_raw_solutions),
            self.__steps_before_check,
            max_step,
            self.__current_step,
        )

        if (
            not temp_raw_solutions
            and max_user_step
            and max_step + 1 <= max_user_step
        ):
            # No more solution for this step; if max_user_step is set
            # we increment the current max_step in order to reuse the
            # current CLUnfolder.
            max_step += 1
            LOGGER.info(
                "Solve:: Auto-adjust max_step from %s to %s ",
                max_step - 1,
                max_step,
            )
            continue

        raw_solutions += temp_raw_solutions

    return raw_solutions
###############################################################################
class PropertyVisitor(object):
    """Translate a SigExpression on states and events into a set of clauses.

    Each ``visit_*`` method returns the literal standing for the visited
    sub-expression, or None when the node is the root of the formula: in
    that case the formula itself is asserted through a unit clause appended
    to :attr:`clauses`.

    Binary nodes introduce an auxiliary variable named ``_lit<n>`` which is
    made equivalent to the node with three clauses.

    Type checking must be done.

    .. note:: ``top`` is switched off by the first non-leaf node visited
        and is not switched back on by the visit methods.
    """

    def __init__(self, aux_cpt=0):
        """
        :param aux_cpt: First index used to name auxiliary variables.
        :type aux_cpt: <int>
        """
        self.cpt = aux_cpt  # for auxiliary variable naming
        self.clauses = []  # generated clauses
        self.top = True  # root of the formula?

    def _next_aux_name(self):
        """Return a fresh auxiliary variable name and advance the counter."""
        name = "_lit" + str(self.cpt)
        self.cpt += 1
        return name

    def _assert_or_return(self, top, lit):
        """Assert lit as a unit clause at the root, otherwise hand it back."""
        if top:
            self.clauses.append(Clause([lit]))
            return None
        return lit

    def _visit_binary(self, left_h, right_h, operator):
        """Encode ``x <=> (left_h operator right_h)`` with a fresh variable x.

        The auxiliary variable is allocated before the operands are visited
        (left first, then right), so the numbering of nested auxiliary
        variables follows a pre-order traversal.

        :param operator: 'and' or 'or'.
        :return: The literal x, or None when the node is the root.
        """
        top = self.top
        self.top = False

        # the internal nodes will be affected with the name '_lit'
        name = self._next_aux_name()
        newl = Literal(name, True)
        notnl = Literal(name, False)

        # recursive visits
        nl1 = left_h.accept(self)
        notnl1 = nl1.lit_not()
        nl2 = right_h.accept(self)
        notnl2 = nl2.lit_not()

        # clauses generation
        if operator == 'and':  # x = lh and rh
            self.clauses.extend([
                Clause([nl1, notnl]),  # lh or not x
                Clause([nl2, notnl]),  # rh or not x
                Clause([notnl1, notnl2, newl]),  # not lh or not rh or x
            ])
        elif operator == 'or':  # x = lh or rh
            self.clauses.extend([
                Clause([notnl1, newl]),  # not lh or x
                Clause([notnl2, newl]),  # not rh or x
                Clause([nl1, nl2, notnl]),  # lh or rh or not x
            ])
        # NOTE(review): any other operator emits no clause at all, which
        # leaves x unconstrained; confirm whether this should be an error.

        return self._assert_or_return(top, newl)

    def visit_sig_ident(self, tnode):
        """
        ident -> literal

        :return: The positive literal named after the identifier,
            or None when the identifier is the whole formula.
        """
        return self._assert_or_return(self.top, Literal(tnode.name, True))

    def visit_sig_not(self, node):
        """
        translate a not

        :return: The negation of the operand's literal,
            or None when the node is the root.
        """
        top = self.top
        self.top = False
        # compile operand to nl <=> formula
        newl = node.operand.accept(self)
        notnl = newl.lit_not()
        return self._assert_or_return(top, notnl)

    def visit_sig_sync(self, binnode):
        """
        translate a binary (or/and) expression

        :return: The auxiliary literal equivalent to the expression,
            or None when the node is the root.
        """
        return self._visit_binary(
            binnode.left_h, binnode.right_h, binnode.operator
        )

    def visit_sig_const(self, exp):
        """
        translate a constant expression

        At the root, a True constant generates nothing and a False constant
        is rejected. Elsewhere, a fresh auxiliary variable is forced to the
        value of the constant.

        :return: The positive auxiliary literal, or None at the root.
        :raises TypeError: If the whole formula is the constant False.
        """
        if self.top:
            if exp.is_const_false():
                raise TypeError("No constant False property allowed")
            # always present or satisfied
            return None

        # the internal nodes will be affected with the name '_lit'
        newl = Literal(self._next_aux_name(), True)
        # clause generation: x = True, or x = False
        lit = newl if exp.value else newl.lit_not()
        self.clauses.append(Clause([lit]))
        return newl

    ## Time operators ##########################################################

    def visit_sig_default(self, exp):
        """
        default translation: encoded as x = lh or rh

        :return: The auxiliary literal equivalent to the expression,
            or None when the node is the root.
        """
        return self._visit_binary(exp.left_h, exp.right_h, 'or')

    def visit_sig_when(self, exp):
        """
        when translation: encoded as x = lh and rh

        :return: The auxiliary literal equivalent to the expression,
            or None when the node is the root.
        """
        return self._visit_binary(exp.left_h, exp.right_h, 'and')