Source code for cadbiom_cmd.tools.models

# -*- coding: utf-8 -*-
# Copyright (C) 2017-2020  IRISA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# The original code contained here was initially developed by:
#
#     Pierre Vignet.
#     IRISA
#     Dyliss team
#     IRISA Campus de Beaulieu
#     35042 RENNES Cedex, FRANCE
"""
This module groups functions directly related to the management and the
extraction of data of a Cadbiom model.

Here we find high-level functions to manage the logical formulas of the events
and conditions defining the transitions; as well as useful functions to manage
the entities, like to obtain their metadata or the frontier places of the model.
"""
from __future__ import unicode_literals
from __future__ import print_function

# Standard imports
from collections import defaultdict
import re
import json
import itertools as it
from logging import DEBUG

# Library imports
from cadbiom.models.guard_transitions.translators.chart_xml import MakeModelFromXmlFile
from cadbiom.models.biosignal.translators.gt_visitors import compile_cond
from cadbiom.models.biosignal.sig_expr import SigNotExpr, SigIdentExpr
from cadbiom.models.guard_transitions.analyser.ana_visitors import TableVisitor

from cadbiom.models.biosignal.translators.gt_visitors import Reporter, get_conditions_from_event


import cadbiom.commons as cm

LOGGER = cm.logger()


def get_transitions_from_model_file(model_file):
    """Load a model file (bcx format) and extract its transitions.

    :param model_file: Path of the bcx file.
    :type model_file: <str>
    :return: Tuple made of the transitions (see :meth:`get_transitions`)
        and the parser opened on the model.
    :rtype: <dict>, <MakeModelFromXmlFile>
    """
    model_parser = MakeModelFromXmlFile(model_file)
    transitions = get_transitions(model_parser)
    return transitions, model_parser
def get_transitions(parser):
    """Get all transitions in the given parser.

    There are two methods to access the transitions of a model.

    :Example:

    .. code-block:: python

        >>> print(dir(parser))
        ['handler', 'model', 'parser']
        >>> # Direct access
        >>> events = list()
        >>> for transition in parser.model.transition_list:
        ...     events.append(transition.event)
        >>>
        >>> # Indirect access via a handler
        >>> events = list()
        >>> for transitions in parser.handler.top_pile.transitions:
        ...     # transitions is a list of CTransition objects
        ...     for transition in transitions:
        ...         events.append(transition.event)

    .. todo:: This function is relatively perfectible and although it is
        useful and mandatory for the design of networkx graphs based on
        solutions or models, it presents a rather heavy structure which dates
        from the time when the API of Cadbiom (of transition objects) was
        unknown and not documented.

    .. note:: Transitions whose event is an empty string are renamed **in
        place** on the model (``trans.event`` becomes ``"SCC-<origin name>"``).

    :param parser: Parser opened on a bcx file.
    :type parser: <MakeModelFromXmlFile>
    :return: A dictionary of events as keys, and transitions as values.
        Since many transitions can define an event, values are lists.
        Each transition is a tuple with: origin node, final node, and a dict
        of attributes holding the ``label`` (event name) and the ``condition``.
        Example:
        ``{'_h_0': [('Ax', 'n1', {'label': '_h_0', 'condition': 'B'})]}``
    :rtype: <dict <list <tuple <str>, <str>, <dict <str>: <str>>>>
    :raises AssertionError: If no transition was found in the model.
    """
    # NOTE: this function should talk about events instead of transitions...
    # See whether the parser could be returned in order to run the static
    # analysis, or whether a separate function should describe the model
    # itself rather than the graph (which is what get_statistics does).
    transitions = defaultdict(list)

    for trans in parser.model.transition_list:
        # Get the names of clocks.
        # Some events have many clocks (like _h_2755) for the same ori/ext
        # entities, so we have to extract them and their respective conditions.
        if trans.event == "":
            # Null event without clock => start nodes.
            # These nodes are used to resolve the problem of Strongly
            # Connected Components (inactivated cycles in the graph).
            # Naming the event here avoids having a SigConstExpr as event type
            # in get_conditions_from_event(): a "SCC-<origin>" transition is
            # forged for this case.
            trans.event = "SCC-" + trans.ori.name
            events = {trans.event: trans.condition}
        elif re.match(r"_h[\w]+", trans.event):
            # 1 event (with 1 clock); the match is anchored at the start only
            events = {trans.event: trans.condition}
        else:
            # Many events (many clocks with their condition(s)):
            # get the dict of events and conditions
            events = get_conditions_from_event(trans.event)

        # Handle multiple transitions for 1 event
        for event, condition in events.items():
            attributes = {"label": event, "condition": condition}
            transitions[event].append(
                (trans.ori.name, trans.ext.name, attributes)
            )

    LOGGER.info("%s transitions loaded", len(transitions))

    # Explicit raise (instead of an assert statement) so that the check is
    # still performed when Python runs with optimizations (-O).
    if not transitions:
        raise AssertionError(
            "No transitions found in the model ! "
            "Please check the names of events (_h_xxx)"
        )

    # Return a dict instead of a defaultdict to avoid later confusions
    # (masked errors) when searching a transition that is not in the model.
    return dict(transitions)
def get_frontier_places(transitions, all_places):
    """Return frontier places of a model (deducted from its transitions and
    from all places of the model).

    A frontier place is a place of the model that is never the target
    (output) of a transition.

    .. note:: why we use all_places from the model instead of
        (input_places - output_places) to get frontier places ?
        Because some nodes are only in conditions and not in transitions.
        If we don't do that, these nodes are missing when we compute
        valid paths from conditions.

    :param transitions: Model's transitions.
        ``{u'h00': [('Ax', 'n1', {u'label': u'h00[]'}),]}``
        keys: names of events;
        values: list of transitions as tuples (with in/output, and label).
    :param all_places: Iterable of the names of all places of the model.
    :type transitions: <dict>
    :type all_places: <set>
    :return: Set of frontier places.
    :rtype: <set>
    """
    # The second item of each transition tuple is its output place
    output_places = {
        trans[1]
        for event_transitions in transitions.values()
        for trans in event_transitions
    }
    # Keep all places that are never in the "output" position of a transition
    return set(all_places) - output_places
################################################################################
def get_places_from_condition(condition):
    """Parse condition string and return all places, regardless of operators.

    The condition is split on parentheses and whitespace; the tokens that are
    exactly one of the logical operators ``and``, ``or``, ``not`` are dropped.
    Operators are only recognized as standalone tokens, so an operator string
    embedded in an entity name (``android``, ``notch``) is never removed.

    .. note:: This function is only used to get all nodes in a condition when
        we know they are all inhibitors nodes.

    .. todo:: A slower alternative exists: compile the condition with
        ``compile_cond`` and flatten the result of ``decompile_condition``.

    :param condition: Condition string.
    :type condition: <str>
    :return: Set of places.
    :rtype: <set>
    """
    operators = ("and", "or", "not")
    # Parentheses become separators; split() without argument then handles
    # any run of whitespace (spaces, tabs, newlines) and never yields "".
    tokens = condition.replace("(", " ").replace(")", " ").split()
    # Filtering tokens (instead of replacing " op " substrings) also catches
    # an operator at the very start/end of the string and consecutive
    # operators that share a single space.
    return {token for token in tokens if token not in operators}
def parse_condition(condition, all_nodes, inhibitors_nodes):
    """Return valid paths according the given logical formula and nodes;
    and set inhibitors_nodes

    .. note:: inhibitors_nodes is modified(set) by this function.

    :raises AssertionError: If no valid path was found.
    :param condition: Condition string of a transition.
    :param all_nodes: Nodes involved in transitions + frontier places.
    :param inhibitors_nodes: Inactivated nodes in paths of conditions.
        Modified by the function.
    :type condition: <str>
    :type all_nodes: <set>
    :type inhibitors_nodes: <set>
    :return: Set of paths. Each path is a tuple of nodes.
    :rtype: <set>
    """
    LOGGER.debug("parse_condition: %s", condition)

    # Error reporter.
    # Linking the lexer to the model (parser.model.accept(tvi)) would avoid
    # Reporter errors like "-> dec -> Undeclared event or state", but in
    # practice it is time consuming and useless for what we want to do.
    reporter = Reporter()
    tvi = TableVisitor(reporter)

    # Get tree object from condition string
    cond_sexpr = compile_cond(condition, tvi.tab_symb, reporter)
    # Get all possible paths from the condition
    possible_paths = decompile_condition(cond_sexpr, inhibitors_nodes)

    # Prune possible paths according to:
    # - Inhibitor nodes that must be removed because they will never
    #   be in the graph.
    # - All nodes in transitions (ori -> ext) because we know all transitions
    #   in the graph, so we know which entities can be chosen to validate a
    #   path.
    # - All frontier places, that are known entities that can be in conditions
    #   (not only in ori/ext) of transitions.
    # So: authorized nodes = frontier_places + transition_nodes - inhibitors
    valid_paths = {
        tuple(path)
        for path in possible_paths
        if (set(path) - inhibitors_nodes).issubset(all_nodes)
    }

    # Debugging only; isEnabledFor() also covers levels lower than DEBUG
    if LOGGER.isEnabledFor(DEBUG):
        LOGGER.debug("INHIBIT NODES: %s", inhibitors_nodes)
        LOGGER.debug("ALL NODES: %s", all_nodes)
        LOGGER.debug("POSSIBLE PATHS: %s", possible_paths)
        LOGGER.debug("VALID PATHS: %s", valid_paths)

        if len(valid_paths) > 1:
            LOGGER.debug(
                "Multiple valid paths in the model for: %s:\n%s",
                condition, valid_paths
            )
        for path in possible_paths:
            pruned_places = set(path) - inhibitors_nodes
            isinsubset = pruned_places.issubset(all_nodes)
            LOGGER.debug(
                "PRUNED PATH: %s, VALID: %s", pruned_places, isinsubset
            )

    if not valid_paths:
        LOGGER.debug("No valid path in the model for condition: %s", condition)
        # format() instead of str(): with unicode_literals, str() on a
        # non-ASCII unicode condition raises UnicodeEncodeError on Python 2.
        raise AssertionError(
            "No valid path in the model for condition: {}".format(condition)
        )
    return valid_paths
def decompile_condition(tree, inhibitors_nodes):
    """Recursive function to decompile conditions

    :param tree:
        :Example of tree argument:

        .. code-block:: python

            tree = ('H', 'v', (
                ('F', 'v', 'G'),
                '^',
                (
                    ('A', 'v', 'B'),
                    '^',
                    ('C', 'v', ('D', '^', 'E'))
                )
            ))

    :param inhibitors_nodes: Set of inhibitors; the places found under a
        negation are added to it (the set is modified in place).
    :type tree: <expression>
    :type inhibitors_nodes: <set>
    :return: List of paths; each path is a list of entity names.
        An ``or`` concatenates the paths of both operands, an ``and`` builds
        the product of them. Negated entities are still present in the paths:
        they are only recorded in ``inhibitors_nodes`` so that the caller can
        prune them.
    :rtype: <list <list <str>>>
    :raises AssertionError: If a binary operator is neither 'or' nor 'and'.
    """
    if isinstance(tree, str):
        # Terminal node
        return [[tree]]

    if isinstance(tree, SigNotExpr):
        # tree.operand: the negated expression (an entity or a sub-expression)
        operand = tree.operand
        # Every place under the negation is an inhibitor
        inhibitors_nodes.update(get_places_from_condition(str(operand)))
        try:
            # SigIdentExpr if it has a "name" attribute.
            # Only this access is guarded, so that an AttributeError raised
            # elsewhere is not silently swallowed.
            return [[operand.name]]
        except AttributeError:
            # Operand without "name" attribute (expression other than
            # SigIdentExpr): keep decompiling the operand itself
            tree = operand

    if isinstance(tree, SigIdentExpr):
        return [[tree.name]]

    # Bin expression or similar (2 expressions linked with a logical operator)
    op = tree.operator
    lpaths = decompile_condition(tree.left_h, inhibitors_nodes)
    rpaths = decompile_condition(tree.right_h, inhibitors_nodes)

    if op == 'or':
        return list(it.chain(lpaths, rpaths))

    # Explicit raise (instead of an assert statement): an unknown operator
    # must not be silently handled as an 'and' when Python runs with -O.
    if op != 'and':
        raise AssertionError("Unsupported logical operator: {}".format(op))
    return [left + right for left, right in it.product(lpaths, rpaths)]
################################################################################
def get_places_data(places, model):
    """Get a list of JSON data parsed from each given places in the model.

    This function is used by :meth:`cadbiom_cmd.models.low_model_info`.

    .. note:: v1 models return a dict with only 1 key: 'cadbiomName'

    .. note:: Places without usable JSON data (empty, missing, malformed note
        or a note that is not a JSON object) are handled: every field gets its
        default value. This is the case of start nodes (with a name like
        __start__x).

    :Example of JSON data that can be found in the model:

    .. code-block:: python

        {
            "uri": entity.uri,
            "entityType": entity.entityType,
            "names": list(entity.synonyms | set([entity.name])),
            "entityRef": entity.entityRef,
            "location": entity.location.name if entity.location else None,
            "modificationFeatures": dict(entity.modificationFeatures),
            "members": list(entity.members),
            "reactions": [reaction.uri for reaction in entity.reactions],
            "xrefs": entity.xrefs,
        }

    :param places: Iterable of name of places.
    :param model: Model from handler; must expose ``xml_namespace`` and
        ``node_dict`` (place name => node with a ``note`` attribute).
    :type places: <set>
    :type model: <MakeModelFromXmlFile>
    :return: List of data parsed from each given places (same order as the
        iteration order of ``places``).

        .. note:: Here is the list of field retrieved for v2 models:

            - cadbiomName
            - uri
            - entityType
            - entityRef
            - location
            - names
            - xrefs
    :rtype: <list <dict>>
    """
    if model.xml_namespace != "http://cadbiom.genouest.org/v2/":
        # v1 model: return only the name of the place
        return [{"cadbiomName": place_name} for place_name in places]

    # Model type 2 => We use JSON data in each nodes
    data = []
    for place_name in places:
        try:
            json_data = json.loads(model.node_dict[place_name].note)
        except (TypeError, ValueError):
            # No decodable note (i.e. start nodes named __start__x).
            # json_data is reset on every failure so that the data of the
            # previous place can never leak into this one.
            json_data = {}
        if not isinstance(json_data, dict):
            # Valid JSON but not an object: no field can be read from it
            json_data = {}

        data.append({
            # Fixed fields and default values ('' if the field is not present)
            "uri": json_data.get("uri", ""),
            "entityType": json_data.get("entityType", ""),
            "entityRef": json_data.get("entityRef", ""),
            "location": json_data.get("location", ""),
            # Patch: drop null/empty names that should be avoided in
            # cadbiom_writer.build_json_data(); a null list is an empty list
            "names": [name for name in json_data.get("names") or [] if name],
            # A fresh dict is built for each place: the default value is
            # never shared between the returned items
            "xrefs": json_data.get("xrefs") or {},
            # Cadbiom name (name attribute of the xml element)
            "cadbiomName": place_name,
        })
    return data
def get_model_identifier_mapping(model_file, external_identifiers):
    """Get Cadbiom names corresponding to the given external identifiers (xrefs)

    .. note:: This function works only on v2 formated models with JSON
        additional data

    :param model_file: Model file.
    :param external_identifiers: Iterable (usually a set) of external
        identifiers to be mapped.
    :type model_file: <str>
    :type external_identifiers: <set>
    :return: Mapping with external identifiers as keys and sets of cadbiom
        names as values. Only the identifiers found in the model are present;
        the missing ones are reported in the log.
    :rtype: <defaultdict <str>:<set>>
    :raises AssertionError: If the model is not a v2 model.
    """
    # Get the model
    parser = MakeModelFromXmlFile(model_file)
    model = parser.handler.model

    # Explicit raise (instead of an assert statement) so that the check is
    # still performed when Python runs with optimizations (-O).
    if model.xml_namespace != 'http://cadbiom.genouest.org/v2/':
        raise AssertionError(
            "Operation not supported: Only v2 models are supported."
        )

    # Accept any iterable of identifiers
    external_identifiers = set(external_identifiers)

    # Get data of all nodes:
    # [{'xrefs': {'bdd': [values],}, 'cadbiomName': '',}, ...]
    places_data = get_places_data(parser.handler.node_dict.keys(), model)

    # Mapping: external_identifiers as keys and Cadbiom names as values
    mapping = defaultdict(set)
    for place in places_data:
        # Flatten the identifiers of all databases of the place
        place_identifiers = it.chain.from_iterable(place["xrefs"].values())
        for common_id in external_identifiers.intersection(place_identifiers):
            mapping[common_id].add(place["cadbiomName"])

    not_found_identifiers = external_identifiers - set(mapping)
    if not_found_identifiers:
        LOGGER.info(
            "Some identifiers were not found (%s/%s): %s",
            len(not_found_identifiers),
            len(external_identifiers),
            not_found_identifiers,
        )
    return mapping