## Filename : static_analysis.py
## Author(s) : Michel Le Borgne
## Created : 02/2012
## Revision :
## Source :
##
## Copyright 2012 - 2020 IRISA/IRSET
##
## This library is free software; you can redistribute it and/or modify it
## under the terms of the GNU General Public License as published
## by the Free Software Foundation; either version 2.1 of the License, or
## any later version.
##
## This library is distributed in the hope that it will be useful, but
## WITHOUT ANY WARRANTY, WITHOUT EVEN THE IMPLIED WARRANTY OF
## MERCHANTABILITY OR FITNESS FOR A PARTICULAR PURPOSE. The software and
## documentation provided here under is on an "as is" basis, and IRISA has
## no obligations to provide maintenance, support, updates, enhancements
## or modifications.
## In no event shall IRISA be liable to any party for direct, indirect,
## special, incidental or consequential damages, including lost profits,
## arising out of the use of this software and its documentation, even if
## IRISA have been advised of the possibility of such damage. See
## the GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this library; if not, write to the Free Software Foundation,
## Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
##
## The original code contained here was initially developed by:
##
## Michel Le Borgne.
## IRISA/IRSET
## Symbiose team
## IRISA Campus de Beaulieu
## 35042 RENNES Cedex, FRANCE
##
##
## Contributor(s): Geoffroy Andrieux
##
"""
Classes for static analysis
"""
# Standard imports
from __future__ import print_function
import networkx as nx
from collections import OrderedDict, Counter
import json
# Custom imports
from antlr4 import FileStream, CommonTokenStream
from cadbiom.models.biosignal.translators.gt_visitors import compile_cond, compile_event
from cadbiom.models.guard_transitions.translators.cadlangLexer import cadlangLexer
from cadbiom.models.guard_transitions.translators.cadlangParser import cadlangParser
from cadbiom.models.guard_transitions.translators.chart_xml import MakeModelFromXmlFile
from cadbiom.models.guard_transitions.translators.chart_xml_pid import \
MakeModelFromPidFile
from cadbiom.models.guard_transitions.chart_model import ChartModel
from cadbiom.models.guard_transitions.analyser.ana_visitors import EstimExpVisitor, \
TableVisitor, FrontierVisitor, SigExpIdCollectVisitor
import cadbiom.commons as cm
LOGGER = cm.logger()
class StaticAnalyzer(object):
    """Main class for static analysis of a Cadbiom chart model."""

    def __init__(self, reporter):
        """
        @param reporter: error reporter used by the parsers/compilers
        """
        self.reporter = reporter
        # Chart model under analysis; set by one of the build_from_* methods
        self.model = None
        # Reserved for influence computations (not used in this module)
        self.influence_dict = dict()
[docs] def build_from_chart_model(self, chart_model):
"""
For technical purpose
"""
self.model = chart_model
[docs] def build_from_cadlang(self, file_name):
"""
Build a StaticAnalyser from a .cal file of PID database
@param file_name: str - path of .cal file
"""
crep = self.reporter
fstream = FileStream(file_name)
lexer = cadlangLexer(fstream)
lexer.set_error_reporter(crep)
parser = cadlangParser(CommonTokenStream(lexer))
parser.set_error_reporter(crep)
model = ChartModel(file_name)
parser.cad_model(model)
# chart model
self.model = parser.model
[docs] def build_from_chart_file(self, file_name):
"""
Build a StaticAnalyser from a .bcx file
@param file_name: str - path of the .bcx file
"""
parsing = MakeModelFromXmlFile(file_name)
# chart model
self.model = parsing.model
[docs] def build_from_pid_file(self, file_name, has_clock=True,
ai_interpretation=0):
"""
Build an StaticAnalyser from a .xml file of PID database
@param file_name: str - path of .xml file
"""
parser = MakeModelFromPidFile(file_name, self.reporter,
has_clock, ai_interpretation)
# chart model
self.model = parser.model
[docs] def get_frontier_node_names(self):
"""
returns a (list<string>) list of frontier place names
"""
fvi = FrontierVisitor()
self.model.accept(fvi)
return fvi.frontier
[docs] def get_gene_places(self):
"""Return gene places
.. note:: We assume that places representing a gene contain
the '_gene' string in the name
:return: List of places.
:rtype: <list <place>>
"""
return [place for pname, place in self.model.node_dict.iteritems()
if '_gene' in pname]
[docs] def get_stats_model_data(self):
"""Return info from the model
:return: Dictionary:
List of keys:
- model_name: Name of the model
- places: Number of places in the model (length of node_dict attr)
- transitions: Number of transitions in the model (length of transition_list)
:rtype: <dict>
"""
return {
'model_name': self.model.name,
'places': len(self.model.node_dict),
'transitions': len(self.model.transition_list)
}
[docs] def get_stats_model_structure_data(self):
"""Return occurences of various types of nodes in the model
.. note::
- Nb input nodes
- Nb frontier nodes (no incoming transitions)
- Nb terminal nodes (no outgoing transitions)
- Nb isolated nodes (frontier places in conditions/not used)
:return: Dict with occurences of input_nodes, frontier_nodes,
final_nodes, isolated_nodes.
:rtype: <dict>
"""
input_cpt = 0
front_cpt = 0
final_cpt = 0
isolated_cpt = 0
# Iterate on model nodes
for node_name, node in self.model.node_dict.iteritems():
# Check node status
nb_outgoing_trans = len(node.outgoing_trans)
nb_incoming_trans = len(node.incoming_trans)
if node.is_input():
input_cpt += 1
if nb_outgoing_trans == 0:
final_cpt += 1
if nb_incoming_trans == 0:
front_cpt += 1
if nb_outgoing_trans == 0 and nb_incoming_trans == 0:
isolated_cpt += 1
return {
'input_nodes': input_cpt,
'frontier_nodes': front_cpt,
'final_nodes': final_cpt,
'isolated_nodes': isolated_cpt,
}
[docs] def get_stats_entities_data(self):
"""Return occurences of various locations and entity types in the model
.. note:: Start nodes (with a name like __start__x) are handled even
with no JSON data.
They are counted in the other_types and other_locations fields.
:Example:
.. code-block:: python
static_analyser = StaticAnalyzer(Reporter())
static_analyser.build_from_chart_model(model)
locations, entity_types = \
static_analyser.get_stats_entities_data()
:return: locations, entity_types
.. note:: For v1 models, entity_types will be an empty dictionary.
:rtype: <Counter>, <Counter>
"""
if self.model.xml_namespace == 'http://cadbiom.genouest.org/v2/':
# Model type 2 => We use JSON data in each nodes
entity_types = Counter()
locations = Counter()
# Iterate on model nodes
for node_name, node in self.model.node_dict.iteritems():
try:
# Model type 2 => We use JSON data in each nodes
json_data = json.loads(node.note)
except ValueError as e:
if e.message == 'No JSON object could be decoded':
json_data = dict()
# Count occurences of types of entities
entity_types[json_data.get('entityType', 'other_types')] += 1
# Count occurences of locations
locations[json_data.get('location', 'other_locations')] += 1
# Specific detection of genes
if '_gene' in node_name:
locations['gene'] += 1
return locations, entity_types
else:
# Model type 1 => We are supposed to know locations before the load
location_names = (
'_gene', '_transMb', '_nucl', '_intToMb', '_cy', '_plasmaMb',
'_exCellRegion', '_ccJct', '_en', '_siteofdouble_strandbreak',
'_membraneraft', '_rna',
)
# Init counter of locations
locations = Counter()
# Iterate on model nodes
for node_name, node in self.model.node_dict.iteritems():
loc_found = False
for location_name in location_names:
if location_name in node_name:
if loc_found:
LOGGER.warning(
"get_statistics::Multiple locations: {}, {}".format(
node_name, location_name))
locations[location_name] += 1
loc_found = True
# Warning: this line also counts "virtual" "start nodes"
# like __start__0
if not loc_found:
locations['other_locations'] += 1
return locations, Counter()
[docs] def get_statistics(self, sfile=False):
"""Merge various information on the model and its nodes
.. note:: In models written in verson (without JSON extra-data),
we assume PID encoding to detect locations of entities.
.. warning:: Start nodes (__start__0) are counted here in
'other_locations' and 'other_types'.
:param arg1: Opened file (optional).
:type arg1: <open file>
:return: Information (status of nodes and their cellular locations).
:rtype: <str>
"""
model_data = self.get_stats_model_data()
model_structure_data = self.get_stats_model_structure_data()
locations, entity_types = self.get_stats_entities_data()
# Forge data header
data = OrderedDict({
"Model name": model_data['model_name'],
"Nb places/entities": model_data['places'],
"Nb transitions loaded": model_data['transitions'],
"Nb input nodes": model_structure_data['input_nodes'],
"Nb frontier nodes/boundaries (no incoming transitions)": \
model_structure_data['frontier_nodes'],
"Nb terminal nodes (no outgoing transitions)": \
model_structure_data['final_nodes'],
"Nb isolated nodes (frontier places in conditions/not used)": \
model_structure_data['isolated_nodes'],
})
# Hacky hacky separator
data["---- "] = "----"
# Forge data locations and entity types
for entity_type, occurences in entity_types.most_common():
data["Nb " + entity_type] = occurences
for location_name, occurences in locations.most_common():
data["Nb " + location_name] = occurences
# Forge text
text = "\n".join([k + ': ' + str(v) for k, v in data.iteritems()])
# Dump text if an opened file is given
if sfile != False:
sfile.write(text)
return text
[docs] def get_frontier_scc(self):
"""Find initial strongly connected components in the transition graph
.. warning:: If the model is modified but not reloaded, KeyError is
raised (see below); we inform the user of that...
:return: List of list of node names in frontier strongly connected components
:rtype: <list <list <str>>>
"""
# Get directed graph
directed_graph = self.make_transition_dg()
# generator on the strongly connectec components
scc_list = nx.strongly_connected_components(directed_graph)
frontier_list = []
tvi = TableVisitor(self.reporter)
self.model.accept(tvi)
for scc in scc_list:
# eliminate isolated frontier nodes
if len(scc) <= 1:
continue
frontier = True
# generate symbol table for partial evaluation
# all places in scc are inactivated (-1)
symbol_table = dict.fromkeys(scc, -1)
pe_visitor = EstimExpVisitor(symbol_table)
# find all incoming transitions for each node in scc
LOGGER.debug("scc: " + str(scc))
for node_name in scc:
try:
# raises keyerror if the model isn't reloaded manually..
chart_node = self.model.node_dict[node_name]
except KeyError:
return list([[
"Error: Please try by \
saving & reloading the model manually"]])
for in_trans in chart_node.incoming_trans:
LOGGER.debug(
"Node {}, trans: {}".format(
node_name,
(in_trans.ori.name, in_trans.ext.name)
)
)
# if the transition comes from outside scc,
# we try to detect if scc may be reached with this trans
# Note: this time we use the real symb_table with
# frontiers active; not a fake one with all places in scc
# disabled.
# TODO: detect if the in_transition comes from a frontier
# may be sufficient to detect if the SCC is reachable.
if in_trans.ori.name not in scc :
cond = self.__tr_estim(in_trans, pe_visitor,
tvi.tab_symb)
# a mon avis, ne renvoie jamais un resultat superieur a 0
# car tvi.tab_symb initialise toutes les places a 0
# et pe_visitor est initialise avec les places de la SCC a -1
# le 1 (True) semble impossible (sauf oposition de signe
# avec l'operateur not).
# LOGGER.warning(
# "scc validated as a cyle ?: " + str(cond)
# )
# Guard of the transition is True
# (not False or indeterminate),
# so scc may be reached from other part of the model
# => switch to next scc
if cond >= 0:
frontier = False
break
if frontier:
LOGGER.debug("frontier found")
frontier_list.append(scc)
LOGGER.debug("SCC found: " + str(frontier_list))
# Nothing found => return empty result
if len(frontier_list) == 0:
return [[]]
return frontier_list
[docs] def get_basal_activated_genes(self):
"""
Basal activated genes are genes which are activated even if all the places present
in activation condition, are inactivate
"""
tvi = TableVisitor(self.reporter)
self.model.accept(tvi)
lgene = self.get_gene_places()
lbag = []
for gep in lgene:
# If different: maybe the place is not a gene but contains _gene
# in its name...
if len(gep.outgoing_trans) != 1:
continue
pe_visitor = EstimExpVisitor(None) # all variables set to False
trans = gep.outgoing_trans[0] # assume only one transition
trans_val = self.__tr_estim(trans, pe_visitor, tvi.tab_symb)
if trans_val == 1:
lbag.append(gep.name)
lbag.sort()
return lbag
[docs] def get_why_basal_genes(self):
"""
More complete than get_basal_activated_genes. Returns a list of pairs (g,lpv) where
g is a basal activated gene and lpv are the principal variables (isolated_inhib., dominant_inhib,
essential_act, dominant activator)
"""
res = []
node_dict = self.model.node_dict
lbag = self.get_basal_activated_genes()
for gep in lbag:
gnode = node_dict[gep]
trans = gnode.outgoing_trans[0]
pva = self.get_tr_principal_variables(trans)
res.append((gep, pva))
# Nothing found => return empty result
if len(res) == 0:
return [[]]
return res
def __tr_estim(self, trans, e_visitor, tab_symb):
"""
Given a partial assignation of variables implemented in tab_symb, estimate if
the guard of a transition is True, False or indeterminate. The variables not present
in the symbol table are considered as indeterminates
If tab_symb is None, all variables are assigned the value False and the condition of the
guard is estimated alone.
"""
ev_estim = 1
cond_estim = 1
if trans.condition:
cond_sexpr = compile_cond(trans.condition, tab_symb, self.reporter)
cond_estim = cond_sexpr.accept(e_visitor)
if not e_visitor.symb_table:
return cond_estim
if trans.event:
ev_sexpr = compile_event(trans.event, tab_symb,
True, self.reporter)[0]
ev_estim = ev_sexpr.accept(e_visitor)
return min(ev_estim, cond_estim)
def __get_tr_node_variables(self, cond_sexpr, ev_sexpr):
"""
Get the node idents in the guard expression of
the transition condition and event
"""
lst1 = []
lst2 = []
lcl = []
icv = SigExpIdCollectVisitor()
if cond_sexpr:
# condition expressions contains only node ident
lst1 = cond_sexpr.accept(icv)
if ev_sexpr:
ll2 = ev_sexpr.accept(icv)
for ident in ll2:
if self.model.node_dict.has_key(ident): # state
lst2.append(ident)
else: # clock
lcl.append(ident)
lst = lst1 + lst2
lst.sort()
# eliminate doubles
if len(lst)>0:
nres = [lst[0]]
for i in range(len(lst)-1):
if lst[i+1] != lst[i]:
nres.append(lst[i+1])
else:
nres = []
if len(lcl)>0:
cres = [lcl[0]]
for i in range(len(lcl)-1):
if lcl[i+1] != lcl[i]:
cres.append(lcl[i+1])
else:
cres = []
return nres , cres
# for test
[docs] def test_get_tr_node_variables(self, trans):
"""
externalize a private method for test only
"""
tvi = TableVisitor(self.reporter)
self.model.accept(tvi)
if trans.condition:
cond_sexpr = compile_cond(trans.condition, tvi.tab_symb,
self.reporter)
else:
cond_sexpr = None
if trans.event:
ev_sexpr, see, free_clocks = compile_event(trans.event,
tvi.tab_symb,
True, self.reporter)
else:
ev_sexpr = None
lvar = self.__get_tr_node_variables(cond_sexpr, ev_sexpr)
return lvar
[docs] def get_tr_principal_variables(self, trans):
"""
Compute the essentiel, inhibitors, dominant inhibitors,
essential activators and dominant activators of a transition tr.
"""
ein = [] # essential inhibitors
din = [] # dominant inhibitors
eac = [] # essentiel activators
dac = [] # dominant activators
tvi = TableVisitor(self.reporter)
self.model.accept(tvi)
# compile guard expression
if trans.condition:
cond_sexpr = compile_cond(trans.condition, tvi.tab_symb,
self.reporter)
else:
cond_sexpr = None
if trans.event:
ev_sexpr, see, free_clocks = compile_event(trans.event,
tvi.tab_symb,
True, self.reporter)
else:
ev_sexpr = None
lvar , lcl = self.__get_tr_node_variables(cond_sexpr, ev_sexpr)
for ident in lvar:
# essential inhibitor (F => (guard=T))
assign_table = dict()
assign_table[ident] = -1
for clo in lcl:
assign_table[clo] = 1 # clocks are assigned to Present
pe_visitor = EstimExpVisitor(assign_table)
if cond_sexpr:
cpe = cond_sexpr.accept(pe_visitor)
else:
cpe = 1
if ev_sexpr:
epe = ev_sexpr.accept(pe_visitor)
else:
epe = 1
if min(cpe, epe) == 1:
ein.append(ident)
# dominant inhibitor (T => (guard=F))
assign_table = dict()
assign_table[ident] = 1
for clo in lcl:
assign_table[clo] = 1 # clocks are assigned to Present
pe_visitor = EstimExpVisitor(assign_table)
if cond_sexpr:
cpe = cond_sexpr.accept(pe_visitor)
else:
cpe = 1
if ev_sexpr:
epe = ev_sexpr.accept(pe_visitor)
else:
epe = 1
if min(cpe, epe) == -1:
din.append(ident)
# essential activator (F => (guard=F))
assign_table = dict()
assign_table[ident] = -1
for clo in lcl:
assign_table[clo] = 1 # clocks are assigned to Present
pe_visitor = EstimExpVisitor(assign_table)
if cond_sexpr:
cpe = cond_sexpr.accept(pe_visitor)
else:
cpe = 1
if ev_sexpr:
epe = ev_sexpr.accept(pe_visitor)
else:
epe = 1
if min(cpe, epe) == -1:
eac.append(ident)
# dominant activator (T => (guard=T))
assign_table = dict()
assign_table[ident] = 1
for clo in lcl:
assign_table[clo] = 1 # clocks are assigned to Present
pe_visitor = EstimExpVisitor(assign_table)
if cond_sexpr:
cpe = cond_sexpr.accept(pe_visitor)
else:
cpe = 1
if ev_sexpr:
epe = ev_sexpr.accept(pe_visitor)
else:
epe = 1
if min(cpe, epe) == 1:
dac.append(ident)
return [ein, din, eac, dac]
[docs] def make_transition_dg(self):
"""
Build the directed graph of transitions (flow graph)
@return: a Networkx digraph
"""
graph = nx.DiGraph()
edges = [(trans.ori.name, trans.ext.name)
for trans in self.model.transition_list]
graph.add_edges_from(edges)
graph.add_nodes_from(self.model.node_dict.keys())
return graph
[docs] def make_dependence_dg(self, weight = False):
"""
Build the directed graph of transitions and conditions (dependence graph)
if weight : dependencies from transitions have a weight of 2, dependencies from conditions have a weight of 1
else : all dependencies have a weight of 1
@return: a Networkx digraph
"""
graph = nx.DiGraph()
for trans in self.model.transition_list :
ori_name = trans.ori.name
ext_name = trans.ext.name
lno , lcl = self.test_get_tr_node_variables(trans)
for cond in lno :
graph.add_edges_from([(cond, ext_name)], weight=1)
if weight :
graph.add_edges_from([(ori_name, ext_name)], weight=2)
else :
graph.add_edges_from([(ori_name, ext_name)], weight=1)
return graph
[docs] def make_full_dependence_dg(self, weight = False):
"""
Build the directed graph of transitions and conditions (dependence graph)
Conditions also influence input
@return: a Networkx digraph
"""
graph = nx.DiGraph()
for trans in self.model.transition_list :
ori_name = trans.ori.name
ext_name = trans.ext.name
lno , lcl = self.test_get_tr_node_variables(trans)
for cond in lno :
graph.add_edges_from([(cond, ext_name)], weight=1)
# conditions influence input
graph.add_edges_from([(cond, ori_name)], weight=1)
if weight :
graph.add_edges_from([(ori_name, ext_name)], weight=2)
else :
graph.add_edges_from([(ori_name, ext_name)], weight=1)
return graph
# def make_full_dependence_weighted_dg(self):
# """
# Build the directed graph of transitions and conditions
# (dependence graph)
# Dependences from transitions have a weight of 2
# Dependences from conditions have a weight of 1
# Conditions also influence input
# @return: a Networkx digraph
# """
# G=nx.DiGraph()
# for trans in self.model.transition_list :
# ori_name = trans.ori.name
# ext_name = trans.ext.name
#
# ln , lh = self.test_get_tr_node_variables(trans)
# for cond in ln :
# G.add_edges_from([(cond,ext_name)], weight=1)
# # conditions influence input
# G.add_edges_from([(cond,ori_name)], weight=1)
#
# G.add_edges_from([(ori_name,ext_name)], weight=2)
# return G
# def condition_count(self):
# cond_dict=dict()
# for trans in self.model.transition_list :
# if trans.condition != '':
# cond_name_list = get_property_places(trans.condition)
# for cond in cond_name_list :
# if cond_dict.has_key(cond) :
# cond_dict[cond]+=1
# else :
# cond_dict[cond]=1
# cond_items = cond_dict.items()
# cond_items.sort(cmpval)
#
# cond_moy=0.0
# for cond_couple in cond_items :
# cond_moy+=cond_couple[1]
# print 'cond moy : ',cond_moy/len(cond_dict.keys())
# print 'cond median : ',cond_items[len(cond_dict.keys())/2][1]
# print 'cond count : ',cond_items[:10]
# return cond_items
[docs] def get_predecessors(self, dgraph, node, rank=None):
"""
return all node predecessors if rank=None, predecessors under the rank else
@return: a list of str predecessor name
"""
if not rank :
pred_list = dgraph.predecessors(node)
pred_list.append(node)
for pred in pred_list:
sec_pred_list = dgraph.predecessors(pred)
for sec_p in sec_pred_list:
if sec_p not in pred_list:
pred_list.append(sec_p)
# print len(pred_list), pred_list
else :
self.ranked_list = []
pred_list = [node]
already_visited = []
for i in range(rank):
new_pred_list = []
for node in pred_list:
if node not in already_visited:
new_pred_list.extend(dgraph.predecessors(node))
already_visited.append(node)
self.ranked_list.append(new_pred_list)
pred_list.extend(new_pred_list)
# print i, len(list(set(new_pred_list)))
# print i,len(new_pred_list),new_pred_list
return list(set(pred_list))
[docs] def get_successors(self, dgraph, node, rank=None):
"""
return all node successors if rank=None, successors under the rank else
@return: a list of str successor name
"""
if not rank :
suc_list = dgraph.successors(node)
suc_list.append(node)
for suc in suc_list:
sec_suc_list = dgraph.successors(suc)
for sec_s in sec_suc_list:
if sec_s not in suc_list:
suc_list.append(sec_s)
# print len(pred_list), pred_list
else :
self.ranked_list = []
suc_list = [node]
already_visited = []
for i in range(rank):
new_suc_list = []
for node in suc_list:
if node not in already_visited:
new_suc_list.extend(dgraph.successors(node))
already_visited.append(node)
self.ranked_list.append(new_suc_list)
suc_list.extend(new_suc_list)
# print i,len(new_pred_list),new_pred_list
return list(set(suc_list))
[docs] def get_local_neighbors(self, dgraph, node):
"""
return node direct neighbors (successors and predecessors) in a same list
@return: a list of str neighbor name
"""
neighbor_list = []
neighbor_list.extend(dgraph.predecessors(node))
neighbor_list.extend(dgraph.successors(node))
return list(set(neighbor_list))
# def get_list_neighbors(self, dgraph, node_list):
# neighbor_list = []
# for node in node_list :
# neighbor_list.extend(self.get_node_neighbors(dgraph, node))
# final_list = []
# final_list.extend(neighbor_list)
# final_list.extend(node_list)
# return list(set(final_list))
# def change_node_att(self, g, node_list, att, value):
# for node in node_list:
# g.node[node].size = value
# return g
def get_propagated_places(self, frontier_places, property_places):
final_list = []
already_visited = []
condition = []
new_frontier = frontier_places
while len(new_frontier) != 0 :
save_frontier = new_frontier
new_frontier = []
for place in save_frontier :
node = self.model.node_dict[place]
for trans in node.outgoing_trans :
ext_place = trans.ext.name
if ext_place not in already_visited :
already_visited.append(ext_place)
new_frontier.append(ext_place)
lno , lcl = self.test_get_tr_node_variables(trans)
condition.extend(lno)
final_list.extend(new_frontier)
new_frontier = property_places
while len(new_frontier) != 0 :
save_frontier = new_frontier
new_frontier = []
for place in save_frontier :
node = self.model.node_dict[place]
for trans in node.incoming_trans :
ori_place = trans.ori.name
if ori_place not in already_visited :
already_visited.append(ori_place)
new_frontier.append(ori_place)
lno , lcl = self.test_get_tr_node_variables(trans)
condition.extend(lno)
final_list.extend(new_frontier)
new_frontier = property_places
while len(new_frontier) != 0 :
save_frontier = new_frontier
new_frontier = []
for place in save_frontier :
node = self.model.node_dict[place]
for trans in node.outgoing_trans :
ext_place = trans.ext.name
if ext_place not in already_visited :
already_visited.append(ext_place)
new_frontier.append(ext_place)
lno , lcl = self.test_get_tr_node_variables(trans)
condition.extend(lno)
final_list.extend(new_frontier)
tr_graph = self.make_transition_dg()
to_check_for_perm = []
to_check_for_perm.extend(final_list)
to_check_for_perm.extend(condition)
to_check_for_perm.extend(property_places)
to_check_for_perm = list(set(to_check_for_perm))
perm_list = []
for place in to_check_for_perm :
predecessors = self.get_predecessors(tr_graph, place, None)
for pred in predecessors :
if self.model.node_dict[pred].is_perm():
if pred not in perm_list :
perm_list.append(pred)
for place in perm_list :
successors = self.get_successors(tr_graph, place, None)
final_list.extend(successors)
final_list = list(set(final_list))
print(len(final_list))
return final_list
def get_frontier_successors(self, tr_graph, frontier_places, property_places, perm = True):
final_list = []
to_check = frontier_places
#Add permanent nodes
# if perm :
# for node in self.model.node_dict.keys():
# if self.model.node_dict[node].is_perm():
# to_check.append(node)
final_list.extend(frontier_places)
final_list.extend(to_check)
for place in to_check :
successors = self.get_successors(tr_graph, place, None)
final_list.extend(successors)
final_list = list(set(final_list))
perm_list = []
important_places = []
act_graph = self.make_dependence_dg()
for place in to_check :
successors = self.get_successors(act_graph, place, None)
important_places.extend(successors)
important_places.extend(property_places)
important_places = list(set(important_places))
for place in important_places :
predecessors = self.get_predecessors(tr_graph, place, None)
for pred in predecessors :
if self.model.node_dict[pred].is_perm():
perm_list.append(pred)
perm_list = list(set(perm_list))
for place in perm_list :
successors = self.get_successors(tr_graph, place, None)
final_list.extend(successors)
print(len(list(set(final_list))))
print(list(set(final_list)))
return list(set(final_list))
[docs] def remove_nodes_not_in_list(self, graph, node_list):
"""
@return: a Networkx graph (or digraph) without nodes NOT in the input list
"""
to_remove = graph.nodes()
for node in to_remove :
if node not in node_list:
graph.remove_node(node)
return graph
[docs] def remove_nodes_in_list(self, graph, node_list):
"""
@return: a Networkx graph (or digraph) without nodes in the input list
"""
to_remove = graph.nodes()
for node in to_remove :
if node in node_list:
graph.remove_node(node)
return graph
[docs] def in_degree_cut_off(self, dgraph, degree):
"""
@return: a Networkx digraph without nodes with input degree upper than degree parameter
"""
in_degree = dgraph.in_degree()
node_list = dgraph.nodes()
for node in node_list :
if in_degree[node] > degree :
dgraph.remove_node(node)
return dgraph
[docs] def out_degree_cut_off(self, dgraph, degree):
"""
@return: a Networkx digraph without nodes with output degree upper than degree parameter
"""
out_degree = dgraph.out_degree()
node_list = dgraph.nodes()
for node in node_list :
if out_degree[node] > degree :
dgraph.remove_node(node)
return dgraph
[docs] def export_2_graphml(self, graph, graphml_file):
"""
As it says
@param graph: the graph to export
@param graphml_file: filename
"""
nx.write_graphml(graph, graphml_file)
[docs] def export_2_dot(self, graph, dot_file):
"""
As it says
@param graph: the graph to export
@param dot_file: filename
"""
nx.write_dot(graph, dot_file)
def stat_on_graph(graph):
    """
    Print basic statistics of an undirected Networkx graph: order, size,
    connectivity, density, clustering and degree distribution.
    """
    print('number of nodes : ', graph.order())
    print('number of edges : ', graph.number_of_edges())
    print('number of connected components : ',
          nx.number_connected_components(graph))
    if nx.is_connected(graph):
        # diameter = max of shortest path lengths (defined only if connected)
        print('diameter : ', nx.diameter(graph))
        print('average path length : ', nx.average_shortest_path_length(graph))
    print('density : ', nx.density(graph))
    print('clustering : ', nx.clustering(graph))
    # dict() also accepts the DegreeView returned by networkx>=2; sorted()
    # with key+reverse replaces the Python2-only list.sort(cmp) idiom
    # (descending by degree, stable on ties, like the original cmpval)
    degree = sorted(dict(graph.degree()).items(),
                    key=lambda item: item[1], reverse=True)
    degree_moy = 0.0
    for degree_couple in degree:
        degree_moy += degree_couple[1]
    print('degree moy : ', degree_moy / graph.order())
    # // : integer index required under Python 3 as well
    print('degree median : ', degree[len(degree) // 2][1])
    print('degree : ', degree[:10])
def stat_on_dgraph(dgraph):
    """
    Print basic statistics of a Networkx digraph: order, size, number of
    strongly connected components and in/out degree distributions.
    """
    print('number of nodes : ', dgraph.order())
    print('number of edges : ', dgraph.number_of_edges())
    nscc = nx.number_strongly_connected_components(dgraph)
    print('number of strongly connected components : ', nscc)
    # dict() also accepts the views returned by networkx>=2; sorted() with
    # key+reverse replaces the Python2-only list.sort(cmp) idiom
    # (descending by degree, stable on ties, like the original cmpval)
    in_degree = sorted(dict(dgraph.in_degree()).items(),
                       key=lambda item: item[1], reverse=True)
    print('in degree : ', in_degree[:10])
    out_degree = sorted(dict(dgraph.out_degree()).items(),
                        key=lambda item: item[1], reverse=True)
    print('out degree : ', out_degree[:10])
def view_graph(graph):
    """
    Draw an interaction graph with matplotlib; nodes are colored according
    to their number of neighbors.
    """
    import matplotlib.pyplot as plt
    pos = nx.spring_layout(graph, iterations=5)
    low = []
    medium = []
    high = []
    hyper_high = []
    for node in graph:
        # list(): networkx>=2 returns an iterator without len()
        nb_neighbors = len(list(graph.neighbors(node)))
        if nb_neighbors <= 2:
            low.append(node)
        elif nb_neighbors <= 5:
            medium.append(node)
        elif nb_neighbors <= 10:
            high.append(node)
        else:
            hyper_high.append(node)
    nx.draw_networkx_nodes(graph, pos, nodelist=low, node_color='blue')
    nx.draw_networkx_nodes(graph, pos, nodelist=medium,
                           node_color='deepskyblue')
    nx.draw_networkx_nodes(graph, pos, nodelist=high, node_color='hotpink')
    nx.draw_networkx_nodes(graph, pos, nodelist=hyper_high,
                           node_color='crimson')
    nx.draw_networkx_edges(graph, pos)
    # nx.draw_networkx_labels(graph, pos)
    plt.show()
def view_weighted_graph(graph, rank):
    """
    Draw a weighted dependence graph with matplotlib: node size and color
    follow the rank lists, edge style follows the edge weight.
    """
    import matplotlib.pyplot as plt
    layout = nx.spring_layout(graph, iterations=10)
    heavy_edges = []
    light_edges = []
    for src, dst, attrs in graph.edges(data=True):
        if attrs['weight'] > 1:
            heavy_edges.append((src, dst))
        else:
            light_edges.append((src, dst))
    # nodes: size and color decrease with the rank
    nx.draw_networkx_nodes(graph, layout, node_size=100, nodelist=rank[0],
                           node_color='crimson')
    nx.draw_networkx_nodes(graph, layout, node_size=75, nodelist=rank[1],
                           node_color='hotpink')
    nx.draw_networkx_nodes(graph, layout, node_size=50, nodelist=rank[2],
                           node_color='deepskyblue')
    nx.draw_networkx_nodes(graph, layout, node_size=25, nodelist=rank[3],
                           node_color='blue')
    # edges
    nx.draw_networkx_edges(graph, layout, edgelist=heavy_edges,
                           width=0.5, alpha=0.5)
    nx.draw_networkx_edges(graph, layout, edgelist=light_edges,
                           width=0.5, alpha=0.5,
                           edge_color='dodgerblue', style='dashed')
    # labels
    nx.draw_networkx_labels(graph, layout, font_size=10,
                            font_family='sans-serif')
    plt.axis('off')
    # plt.savefig("weighted_graph.png") # save as png
    plt.show()  # display
def cmpval(xxx, yyy):
    """
    Utility comparator (old `cmp` convention): orders pairs by their second
    item, in descending order. Returns -1, 0 or 1.
    """
    if xxx[1] == yyy[1]:
        return 0
    return -1 if xxx[1] > yyy[1] else 1