Source code for cadbiom_cmd.tools.solutions

# -*- coding: utf-8 -*-
# Copyright (C) 2017-2020  IRISA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# The original code contained here was initially developed by:
#
#     Pierre Vignet.
#     IRISA
#     Dyliss team
#     IRISA Campus de Beaulieu
#     35042 RENNES Cedex, FRANCE
"""
This module groups functions directly related to the parsing and the management
of the files generated by the solver of Cadbiom.

Here we find high-level functions to parse or clean *mac* files and to extract
all their data to JSON, a human-readable data interchange format that is easy
to use in programs.

Generic functions
~~~~~~~~~~~~~~~~~
* :meth:`~cadbiom_cmd.tools.solutions.get_query_from_filename`

Handle \*mac_complete.txt files
-------------------------------
* :meth:`~cadbiom_cmd.tools.solutions.load_solutions`
* :meth:`~cadbiom_cmd.tools.solutions.convert_solutions_to_json`

Handle \*mac\* files
--------------------
* :meth:`~cadbiom_cmd.tools.solutions.get_solutions`
* :meth:`~cadbiom_cmd.tools.solutions.get_mac_lines`
* :meth:`~cadbiom_cmd.tools.solutions.get_all_macs`

"""
#from __future__ import unicode_literals
from __future__ import print_function

# Standard imports
import os
import glob
from collections import defaultdict, Counter

# Library imports
import cadbiom.commons as cm

LOGGER = cm.logger()


## Generic #####################################################################

def get_query_from_filename(model_file, solution_file):
    """Return the query string according to the given model and solution filenames

    The query is what remains of the solution file basename once its
    extension, the leading ``<model name>_`` prefix and the trailing
    ``_mac_complete`` or ``_mac`` suffix have been removed.

    :Example:

    .. code-block:: python

        >>> get_query_from_filename(
        ...     "/path/model.bcx",
        ...     "/another_path/model_ENTITY_and_not_ENTITY_mac_complete.txt"
        ... )
        'ENTITY_and_not_ENTITY'

    :param model_file: Path of a bcx model.
    :param solution_file: Path of a solution file (``*mac*`` file).
    :type model_file: <str>
    :type solution_file: <str>
    :return: The query string, or None when the solution filename does not
        end with ``_mac_complete`` or ``_mac``.
    :rtype: <str> or None
    """
    model_filename = os.path.basename(os.path.splitext(model_file)[0])
    solution_filename = os.path.basename(os.path.splitext(solution_file)[0])

    # Remove the model name only when it is a real prefix: a blind
    # str.replace() would also eat occurrences located inside the query.
    prefix = model_filename + '_'
    if solution_filename.startswith(prefix):
        solution_filename = solution_filename[len(prefix):]

    # Test the longest suffix first ('_mac_complete' also contains '_mac'),
    # and only strip a suffix that is really at the end of the name.
    for suffix in ('_mac_complete', '_mac'):
        if solution_filename.endswith(suffix):
            return solution_filename[:-len(suffix)]

    # Not a recognised solution filename
    return None
## Handle *mac_complete.txt files ##############################################
def load_solutions(file):
    """Open a file with many solution/MACs (``*mac_complete.txt`` files)
    and yield them.

    Lines are interpreted as follows, once the trailing newline, tabs and
    spaces are removed and the remaining tabs are replaced by spaces:

    - empty lines and lines starting with ``=`` (separators) are skipped;
    - lines starting with ``%`` are steps of the current solution
      (a step without any event is ignored);
    - any other line is a new solution (set of frontier places).

    Every solution gets its own list of steps, even if the same frontier
    places appear several times in the file. Nothing is yielded for a file
    that does not contain any solution line.

    :Example:

    .. code-block:: python

        >>> solutions = load_solutions('./solution_mac_complete.txt')
        >>> print([solution for solution in solutions])
        [("Ax Bx", [['h2', 'h00'], ['h3'], ['h0', 'h1'], ['hlast']])]

    :param file: File name
    :type file: <str>
    :return: A generator of tuples of "frontier places" and a list of events
        in each step.

        :Example:

        .. code-block:: python

            ("Ax Bx", [['h2', 'h00'], ['h3'], ['h0', 'h1'], ['hlast']])

    :rtype: <tuple <str>, <list>>
    """
    sol = ""
    steps = []

    with open(file, 'r') as f_d:
        for line in f_d:
            # Remove possible \t separator from first line (frontier solution)
            line = line.rstrip('\n').rstrip('\t').replace('\t', ' ')
            # TODO: remove last space ' ' => beware, may be informative...
            # (start transitions have no event names: ori="__start__0")
            line = line.rstrip(' ')

            if line == '' or line[0] == '=':
                # Skip blank lines and solution separators (=====)
                continue

            if line[0] == '%':
                # Step of the current solution; ignore steps with only "% "
                step = line.lstrip('% ')
                if step != '':
                    steps.append(step.split(' '))
                continue

            # New frontier line: flush the previous solution (if any).
            # A fresh list is bound for each solution so that lists already
            # yielded are never mutated, even when the same frontier places
            # appear again later in the file.
            if sol != '':
                yield sol, steps
            sol = line
            steps = []

    # Yield last sol
    if sol != '':
        yield sol, steps
def convert_solutions_to_json(sol_steps, transitions, conditions=True):
    """Build a JSON-serializable description of all events of all solutions
    of a complete MAC file.

    This is a function to quickly search all transition attributes involved
    in a solution.

    :Example:

    .. code-block:: python

        >>> from tools.models import get_transitions
        >>> # Get transitions from the model
        >>> model_transitions = get_transitions('model.bcx')
        >>> decomp_solutions = convert_solutions_to_json(
        ...     load_solutions('./solution_mac_complete.txt'),
        ...     model_transitions,
        ...     conditions=True,
        ... )
        >>> print(decomp_solutions)
        [{
            "solution": "Ax Bx",
            "steps": [
                [{
                    "event": "_h_2",
                    "transitions": [{
                        "ext": "n3",
                        "ori": "Bx"
                    }]
                }],
            ]
        }]

    :param sol_steps: Iterable of solutions. See load_solutions().
        Each item is a tuple of "frontier places" and a list of events in
        each step.
        ``("Bx Ax", [['h2', 'h00'], ['h3'], ['h0', 'h1'], ['hlast']])``
    :param transitions: A dictionary of events as keys, and transitions as
        values. Since many transitions can define an event, values are lists.
        Each transition is a tuple with: origin node, final node, attributes
        like label and condition.
        ``{'h00': [('Ax', 'n1', {'label': 'h00[]'}),]``
        See get_transitions().
    :param conditions: (Optional) Add the condition of each transition to
        the returned data.
    :type sol_steps: <list>
    :type transitions: <dict <list <tuple <str>, <str>, <dict <str>: <str>>>>
    :type conditions: <bool>
    :return: The JSON data for the given solutions: a list with one
        dictionary (keys ``solution`` and ``steps``) per solution.
        An event that is unknown (or has no transition) in ``transitions``
        is logged and replaced by ``{"event": "ERROR, no transition"}``.
    :rtype: <list>
    """

    def get_transition_def(step_event):
        """Dump each transition of the given event to a list of dictionaries.

        .. note:: ori="JUN_nucl_gene" ext="JUN_nucl" event="_h_391"

        :return: A list of dictionaries
            (1 dict for 1 transition in the given event)
        :rtype: <list <dict>>
        """

        def dump(trans):
            # Attributes (trans[2]) are read only if conditions are requested
            dumped = {"ori": trans[0], "ext": trans[1]}
            if conditions:
                dumped["condition"] = trans[2]['condition']
            return dumped

        # Many transitions per event (ex: complex dissociation)
        return [dump(trans) for trans in step_event]

    def decompile_event(event):
        """Return the dictionary describing one event and its transitions."""
        # Structure of transitions:
        # {u'h00': [('Ax', 'n1', {u'label': u'h00[]'}),]
        step_event = transitions.get(event, None)
        if step_event:
            return {
                'event': event,
                'transitions': get_transition_def(step_event),
            }

        # Unknown event (or event without transition)
        LOGGER.error(
            "convert_solutions_to_json:: event not found in the "
            "transitions of the model: %s", event
        )
        LOGGER.error(
            "convert_solutions_to_json:: transitions: %s", transitions
        )
        return {'event': "ERROR, no transition"}

    # sol_steps structure:
    # ("Bx Ax", [[u'h2', u'h00'], [u'h3'], [u'h0', u'h1'], [u'hlast']])
    # solution => steps => events => transitions
    return [
        {
            "solution": sol,
            "steps": [
                [decompile_event(event) for event in step] for step in steps
            ],
        }
        for sol, steps in sol_steps
    ]
## Handle *mac* files ##########################################################
def get_solutions(file_descriptor):
    """Generator of solution lines and corresponding cleaned lines for
    ``*mac*`` files.

    .. note:: This function does not return events! It only yields the
        lines containing solutions (i.e sets of frontier places/boundaries):
        empty lines and lines starting with ``'%'``, ``'='`` or ``' '``
        are skipped.

    .. note:: The places are NOT reordered: the cleaned line keeps the order
        of the original line.

    The last ``'\\n'`` is removed from the original line. The cleaned line
    also loses its trailing ``'\\t'``; tabs in the middle are each replaced
    by one space ``' '``.

    :param file_descriptor: Opened file (or any iterable of lines).
    :type file_descriptor: <file>
    :return: A generator of tuples; each tuple contains the original line
        (without its newline), and the cleaned line.

        :Example:

        For an original line: ``'Z\\tY\\tX\\n'``

        .. code-block:: python

            ('Z\\tY\\tX', 'Z Y X')

    :rtype: <tuple <str>, <str>>
    """
    # First characters of the lines that are not solutions
    # (events, separators, ...)
    ignored_prefixes = ('%', '=', ' ')

    for raw_line in file_descriptor:
        original = raw_line.rstrip('\n')
        # Remove possible \t separator at the end of the frontier solution
        cleaned = original.rstrip('\t').replace('\t', ' ')

        if not cleaned or cleaned.startswith(ignored_prefixes):
            continue

        yield original, cleaned
def get_mac_lines(filepath):
    """Return the set of MAC LINES found in a file.

    This function is based on
    :meth:`~cadbiom_cmd.tools.solutions.get_solutions` that yields mac lines
    and cleaned mac lines; only the cleaned lines are kept here.

    .. note:: You would prefer to use
        :meth:`~cadbiom_cmd.tools.solutions.get_all_macs` which:

        - Can handle a directory path and return all macs in it,
        - Can handle a simple file,
        - Do some verifications on all parsed macs.

    .. note:: We assume that at this point, all MAC lines are sorted in
        alphabetical order.

    .. note:: We return LINES not a set of places.

        :Example:

        .. code-block:: python

            {'Cx Dx', 'Ax Bx'}

    :param filepath: Filepath to be opened and in which solutions will be
        searched.
    :type filepath: <str>
    :return: Set of MAC/CAM from the given file.
    :rtype: <set <str>>
    """
    mac_lines = set()
    with open(filepath, 'r') as handle:
        # The original (uncleaned) line is not needed here
        for _original, cleaned in get_solutions(handle):
            mac_lines.add(cleaned)
    return mac_lines
def get_all_macs(path):
    """Return a set of all MAC LINES from a directory or from a file.

    This function is based on
    :meth:`~cadbiom_cmd.tools.solutions.get_solutions` that returns mac lines
    and stripped mac lines, and
    :meth:`~cadbiom_cmd.tools.solutions.get_mac_lines` that returns only
    mac lines from a file.

    If ``path`` is a directory, all the files matching ``*mac.txt`` directly
    inside it are loaded (with or without a trailing separator in ``path``).

    .. note:: Alternatively we do some verifications here:

        - Print number of MACS per file
        - Print duplicated MACS (duplicates are only logged, not rejected)
        - Print number of MACS

    :param path: File or directory in which solutions will be searched.
    :type path: <str>
    :return: Set of MAC/CAM from the given path.
    :rtype: <frozenset <str>>
    :raises AssertionError: If no file has been processed (``path`` is
        neither a file nor a directory, or the directory does not contain
        any ``*mac.txt`` file).
    """
    # Put all macs in a list, not a set
    # => allow us to detect duplicated macs among all mac files
    total_macs = []
    file_number = 0

    if os.path.isfile(path):
        total_macs = list(get_mac_lines(path))
        file_number = 1
    elif os.path.isdir(path):
        # os.path.join() makes the pattern valid whether or not the given
        # directory ends with a path separator.
        pattern = os.path.join(path, '*mac.txt')
        for file_number, mac_file in enumerate(glob.glob(pattern), 1):
            file_macs = get_mac_lines(mac_file)
            total_macs.extend(file_macs)
            # Print the number of macs for the given file
            LOGGER.info("File %s: MACS: %s", mac_file, len(file_macs))

    LOGGER.info("Files processed: %s", file_number)
    if file_number == 0:
        # Explicit raise (instead of an assert statement): the check must
        # survive "python -O"; the exception type is kept for the callers.
        raise AssertionError("No *mac.txt files found!")

    # Check of duplicated macs (number > 1)
    unique_macs = frozenset(total_macs)
    duplicated_macs = {
        (mac, count) for mac, count in Counter(total_macs).items()
        if count != 1
    }
    if duplicated_macs:
        LOGGER.info("<%s> Duplicated MACS: %s", path, duplicated_macs)

    LOGGER.info("<%s> Number of MACS loaded: %s", path, len(total_macs))
    LOGGER.info("<%s> Number of unique MACS returned: %s", path, len(unique_macs))

    return unique_macs
## Handle *.json files #########################################################