# -*- coding: utf-8 -*-
# Copyright (C) 2017-2020 IRISA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# The original code contained here was initially developed by:
#
# Pierre Vignet.
# IRISA
# Dyliss team
# IRISA Campus de Beaulieu
# 35042 RENNES Cedex, FRANCE
"""
This module groups functions directly related to the parsing and the management
of the files generated by the solver of Cadbiom.
Here we find high-level functions to parse or clean *mac* files, and to extract
all their data to JSON, a human-readable data interchange format that is
easy to process programmatically.
Generic functions
~~~~~~~~~~~~~~~~~
* :meth:`~cadbiom_cmd.tools.solutions.get_query_from_filename`
Handle \\*mac_complete.txt files
-------------------------------
* :meth:`~cadbiom_cmd.tools.solutions.load_solutions`
* :meth:`~cadbiom_cmd.tools.solutions.convert_solutions_to_json`
Handle \\*mac\\* files
--------------------
* :meth:`~cadbiom_cmd.tools.solutions.get_solutions`
* :meth:`~cadbiom_cmd.tools.solutions.get_mac_lines`
* :meth:`~cadbiom_cmd.tools.solutions.get_all_macs`
"""
#from __future__ import unicode_literals
from __future__ import print_function
# Standard imports
import os
import glob
from collections import defaultdict, Counter
# Library imports
import cadbiom.commons as cm
LOGGER = cm.logger()
## Generic #####################################################################
def get_query_from_filename(model_file, solution_file):
    r"""Return the query string according to the given model and solution filenames

    The query is what remains of the solution file basename (extension
    removed) once the leading ``<model name>_`` prefix and the trailing
    ``_mac_complete`` or ``_mac`` suffix have been stripped.

    :Example:

    .. code-block:: python

        >>> get_query_from_filename(
        ...     "/path/model.bcx",
        ...     "/another_path/model_ENTITY_and_not_ENTITY_mac_complete.txt"
        ... )
        'ENTITY_and_not_ENTITY'

    :param model_file: Path of a bcx model.
    :param solution_file: Path of a solution file (\*mac\* file).
    :type model_file: <str>
    :type solution_file: <str>
    :return: The query string, or None if the solution filename does not end
        with ``_mac_complete`` or ``_mac``.
    :rtype: <str> or None
    """
    model_filename = os.path.basename(os.path.splitext(model_file)[0])
    solution_filename = os.path.basename(os.path.splitext(solution_file)[0])

    # Remove the model name only when it is the leading prefix: a blind
    # str.replace() would also eat occurrences located inside the query
    # itself (ex: model "TGF" and query "TGF_active").
    prefix = model_filename + '_'
    if solution_filename.startswith(prefix):
        solution_filename = solution_filename[len(prefix):]

    # Remove the suffix; the longest one is tested first because
    # "_mac_complete" does not end with "_mac" but a query may contain either.
    for suffix in ('_mac_complete', '_mac'):
        if solution_filename.endswith(suffix):
            return solution_filename[:-len(suffix)]

    # Not a *mac* file name
    return None
## Handle *mac_complete.txt files ##############################################
def load_solutions(file):
    r"""Open a file with many solution/MACs (\*mac_complete.txt files)
    and yield them.

    A solution is a line of frontier places followed by lines beginning
    with ``%`` which list the events of each step.
    Blank lines and separator lines (beginning with ``=``) are ignored.
    Every frontier line produces its own solution with its own list of steps,
    even if the same frontier places appear several times in the file.
    Nothing is yielded for a file without any frontier line.

    :Example:

    .. code-block:: python

        >>> solutions = load_solutions('./solution_mac_complete.txt')
        >>> print([solution for solution in solutions])
        [('Ax Bx', [['h2', 'h00'], ['h3'], ['h0', 'h1'], ['hlast']])]

    :param file: File name
    :type file: <str>
    :return: A generator of tuples of "frontier places" and a list of
        events in each step.

        :Example:

        .. code-block:: python

            ("Ax Bx", [['h2', 'h00'], ['h3'], ['h0', 'h1'], ['hlast']])

    :rtype: <generator <tuple <str>, <list>>>
    """
    # Current solution (frontier places) and its steps.
    # A fresh list is bound for each frontier line, so that lists already
    # yielded are never modified afterwards, and steps of a previous
    # occurrence of the same frontier are never inherited.
    sol = ""
    steps = []

    with open(file, 'r') as f_d:
        for line in f_d:
            # Remove possible \t separator from first line (frontier solution)
            line = line.rstrip('\n').rstrip('\t').replace('\t', ' ')
            # TODO: remove last space ' ' => beware, may be informative...
            # (start transitions have no event names: ori="__start__0")
            line = line.rstrip(' ')

            if line == '' or line[0] == '=':
                # Skip blank lines and solution separator in some files (=====)
                continue

            if line[0] == '%':
                # Step line; ignore steps with only "% "
                step = line.lstrip('% ')
                if step != '':
                    steps.append(step.split(' '))
                continue

            # Frontier line: the previous solution (if any) is complete
            if sol != '':
                yield sol, steps
            sol = line
            steps = []

    # Yield last sol (the file is already closed at this point)
    if sol != '':
        yield sol, steps
def convert_solutions_to_json(sol_steps, transitions, conditions=True):
    """Build a JSON-serializable description of all the given solutions.

    Each event name found in the steps of a solution is replaced by the
    definition of the transitions attached to it in the model.
    This is a function to quickly search all transition attributes involved
    in a solution.

    :Example:

    .. code-block:: python

        >>> from tools.models import get_transitions
        >>> # Get transitions from the model
        >>> model_transitions = get_transitions('model.bcx')
        >>> decomp_solutions = convert_solutions_to_json(
        ...     load_solutions('./solution_mac_complete.txt'),
        ...     model_transitions,
        ...     conditions=True,
        ... )
        >>> print(decomp_solutions)
        [{
            "solution": "Ax Bx",
            "steps": [
                [{
                    "event": "_h_2",
                    "transitions": [{
                        "ext": "n3",
                        "ori": "Bx"
                    }]
                }],
            ]
        }]

    :param sol_steps: Iterable of solutions. See load_solutions().
        Each item is a tuple of "frontier places" and a list of events in
        each step.
        ``("Bx Ax", [['h2', 'h00'], ['h3'], ['h0', 'h1'], ['hlast']])``
    :param transitions: A dictionary of events as keys, and transitions as
        values. Since many transitions can define an event, values are lists.
        Each transition is a tuple with: origin node, final node, attributes
        like label and condition.
        ``{'h00': [('Ax', 'n1', {'label': 'h00[]'}),]``
        See get_transitions().
    :param conditions: (Optional) Add to each transition its condition,
        read from the ``'condition'`` key of the transition attributes.
    :type sol_steps: <list> or <generator>
    :type transitions: <dict <list <tuple <str>, <str>, <dict <str>: <str>>>>
    :type conditions: <bool>
    :return: One dictionary per solution, with the keys ``"solution"`` and
        ``"steps"``. An event without transitions in the model is logged and
        reported with the name ``"ERROR, no transition"``, without any
        ``"transitions"`` key.

        Example:

        .. code-block:: javascript

            [{
                "solution": "Ax Bx",
                "steps": [
                    [{
                        "event": "_h_2",
                        "transitions": [{
                            "ext": "n3",
                            "ori": "Bx"
                        }]
                    }],
                ]
            }]

    :rtype: <list>
    """

    def describe_transition(transition):
        """Return the dictionary describing 1 transition of an event.

        .. note:: ori="JUN_nucl_gene" ext="JUN_nucl" event="_h_391"
        """
        description = {"ori": transition[0], "ext": transition[1]}
        if conditions:
            description["condition"] = transition[2]['condition']
        return description

    def describe_event(event_name):
        """Return the dictionary describing an event and its transitions."""
        # Structure of transitions:
        # {u'h00': [('Ax', 'n1', {u'label': u'h00[]'}),]
        event_transitions = transitions.get(event_name)
        if event_transitions:
            # Many transitions per event (ex: complex dissociation)
            return {
                'event': event_name,
                'transitions': [
                    describe_transition(transition)
                    for transition in event_transitions
                ],
            }

        # Unknown event, or event without transitions
        LOGGER.error(
            "convert_solutions_to_json:: event not found in the "
            "transitions of the model: %s",
            event_name
        )
        LOGGER.error(
            "convert_solutions_to_json:: transitions: %s",
            transitions
        )
        return {'event': "ERROR, no transition"}

    # sol_steps structure:
    # ("Bx Ax", [[u'h2', u'h00'], [u'h3'], [u'h0', u'h1'], [u'hlast']])
    return [
        {
            "solution": frontier_places,
            "steps": [
                [describe_event(event_name) for event_name in step]
                for step in steps
            ],
        }
        for frontier_places, steps in sol_steps
    ]
## Handle *mac* files ##########################################################
def get_solutions(file_descriptor):
    """Generator of solution lines and corresponding stripped lines for
    \\*mac\\* file.

    .. note:: This function does not return events! It is just original lines
        and cleaned lines containing solutions (i.e sets of frontier
        places/boundaries).

    For the cleaned line, we remove the last ``'\\n'`` and the trailing
    ``'\\t'``; every remaining tab is replaced by one space ``' '``.
    The original line only loses its last ``'\\n'``.
    Places are NOT reordered.

    Lines that are empty once cleaned, and lines beginning with ``'%'``,
    ``'='`` or ``' '`` are skipped.

    :param file_descriptor: Opened file (or any iterable of lines).
    :type file_descriptor: <file>
    :return: A generator of tuples; each tuple contains the original line,
        and the cleaned line.

        :Example:

        For an original line: ``'Z\\tY\\tX\\n'``

        .. code-block:: python

            ('Z\\tY\\tX', 'Z Y X')

    :rtype: <tuple <str>, <str>>
    """
    # Events (%), separators (=) and lines beginning with a space
    # are not solutions
    skipped_prefixes = ('%', '=', ' ')

    for raw_line in file_descriptor:
        original_line = raw_line.rstrip('\n')
        # Remove possible \t separator at the end of the frontier solution
        cleaned_line = original_line.rstrip('\t').replace('\t', ' ')

        if cleaned_line and not cleaned_line.startswith(skipped_prefixes):
            yield original_line, cleaned_line
def get_mac_lines(filepath):
    """Return the set of cleaned MAC LINES found in ONE file.

    The file is read with :meth:`~cadbiom_cmd.tools.solutions.get_solutions`
    which yields original mac lines and stripped mac lines; only the stripped
    lines are kept here.

    .. note:: You would prefer to use
        :meth:`~cadbiom_cmd.tools.solutions.get_all_macs` which:

        - Can handle a directory path and return all macs in it,
        - Can handle a simple file,
        - Do some verifications on all parsed macs.

    .. note:: We assume that at this point, all MAC lines are sorted in
        alphabetical order.

    .. note:: We return LINES not a set of places.

    :Example:

    .. code-block:: python

        {'Cx Dx', 'Ax Bx'}

    :param filepath: Filepath to be opened and in which solutions will be
        returned.
    :type filepath: <str>
    :return: Set of MAC/CAM from the given file.
    :rtype: <set <str>>
    """
    mac_lines = set()
    with open(filepath, 'r') as mac_file:
        for _, cleaned_line in get_solutions(mac_file):
            mac_lines.add(cleaned_line)
    return mac_lines
def get_all_macs(path):
    """Return a set of all MAC LINES from a directory or from a file.

    This function is based on :meth:`~cadbiom_cmd.tools.solutions.get_solutions`
    that returns mac lines and stripped mac lines, and
    :meth:`~cadbiom_cmd.tools.solutions.get_mac_lines` that returns only mac
    lines from a file.

    If the path is a directory, all the files matching ``*mac.txt`` directly
    inside it are loaded (with or without a trailing separator in the path).

    .. note:: Alternatively we do some verifications here:

        - Log the number of MACS per file
        - Log the number of files processed
        - Log duplicated MACS (MACS found in more than one file of the
          directory); they are reported but do not raise any error
        - Log the number of MACS loaded and the number of unique MACS

    :param path: Path of a file or of a directory in which solutions
        will be searched.
    :type path: <str>
    :return: Set of MAC/CAM from the given path.
    :rtype: <frozenset <str>>
    :raises AssertionError: If the path is neither a file nor a directory,
        or if the directory does not contain any ``*mac.txt`` file.
    """
    # Put all macs in a list, not a set
    # => allow us to detect duplicated macs among all mac files
    total_macs = list()
    file_number = 0

    if os.path.isfile(path):
        total_macs = list(get_mac_lines(path))
        file_number = 1
    elif os.path.isdir(path):
        # os.path.join() makes the pattern valid whether or not the directory
        # path ends with a separator
        # (path + '*mac.txt' would search siblings named "<dir>...mac.txt")
        pattern = os.path.join(path, '*mac.txt')
        for file_number, mac_file in enumerate(glob.glob(pattern), 1):
            # Get set of macs for each file
            temp_macs = get_mac_lines(mac_file)
            total_macs += list(temp_macs)
            # Print the number of macs for the given file
            LOGGER.info("File %s: MACS: %s", mac_file, len(temp_macs))

    LOGGER.info("Files processed: %s", file_number)
    if file_number == 0:
        # Explicit raise instead of an assert statement: the check must
        # survive "python -O" while callers still get an AssertionError.
        raise AssertionError("No *mac.txt files found!")

    # Check of duplicated macs (number > 1)
    unique_macs = frozenset(total_macs)
    duplicated_macs = {(k, v) for k, v in Counter(total_macs).items() if v != 1}
    if duplicated_macs:
        LOGGER.info("<%s> Duplicated MACS: %s", path, duplicated_macs)

    LOGGER.info("<%s> Number of MACS loaded: %s", path, len(total_macs))
    LOGGER.info("<%s> Number of unique MACS returned: %s", path, len(unique_macs))

    return unique_macs
## Handle *.json files #########################################################