Source code for biopax2cadbiom.biopax_converter

# -*- coding: utf-8 -*-
# MIT License
#
# Copyright (c) 2017 IRISA, Jean Coquet, Pierre Vignet, Mateo Boudet
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
# Contributor(s): Jean Coquet, Pierre Vignet, Mateo Boudet

"""
This module is used to translate BioPAX data to CADBIOM models.
"""
from __future__ import unicode_literals
from __future__ import print_function

# Standard imports
import md5
import copy
import os
import re
import itertools as it
from collections import defaultdict
import csv
from logging import DEBUG
import dill
import sympy


# Custom imports
from biopax2cadbiom import sparql_biopaxQueries as query
from biopax2cadbiom.cadbiom_writer import create_cadbiom_model, remove_scc_from_model
import biopax2cadbiom.commons as cm
from biopax2cadbiom.classes import Control
from biopax2cadbiom.reactions import update_reactions, duplicate_complexes
from biopax2cadbiom.transitions import build_transitions
from biopax2cadbiom.tools import parse_uri, get_metrics_from_data

LOGGER = cm.logger()


def add_reactions_and_controllers_to_entities(dictReaction, dictControl, dictPhysicalEntity):
    """Fill the attribute `reactions` of PhysicalEntity objects.

    The `reactions` attribute of an entity is the set of reactions in which
    the entity is involved, either as a participant or as a controller.
    It is used later to know if classes/complexes have to be deconstructed
    (only if a sub-entity is used elsewhere in a reaction).

    .. note:: Roles taken into account:

        - productComponent
        - participantComponent
        - leftComponents
        - rightComponents
        - controller of a reaction

    .. note:: Controls are handled as follows:

        - a control without `controlType` is ignored;
        - a control whose `controlled` URI is not a key of `dictReaction`
          (i.e. a control of another control, like a cascade of Modulations)
          is ignored here; see :meth:`add_controllers_to_reactions`;
        - a controller whose URI is not a key of `dictPhysicalEntity`
          is skipped.

    .. warning:: Participants (product, participant, left, right) are looked
        up directly: a KeyError is raised if one of them is not a key of
        `dictPhysicalEntity`.

    :param dictReaction: Dictionary of biopax reactions,
        created by the function query.get_biopax_reactions()
    :param dictControl: Dictionary of biopax controls,
        created by the function query.get_biopax_controls()
    :param dictPhysicalEntity: Dictionary of biopax physicalEntities,
        created by the function query.get_biopax_physicalentities()
    :type dictReaction: <dict <str>: <Reaction>>
        keys: uris; values reaction objects
    :type dictControl: <dict <str>: <Control>>
        keys: uris; values control objects
    :type dictPhysicalEntity: <dict <str>: <PhysicalEntity>>
        keys: uris; values entity objects
    """
    # Register every reaction on the entities that participate in it
    for reaction in dictReaction.itervalues():
        if reaction.productComponent is not None:
            dictPhysicalEntity[reaction.productComponent].reactions.add(reaction)

        if reaction.participantComponent is not None:
            dictPhysicalEntity[reaction.participantComponent].reactions.add(reaction)

        for entity_uri in reaction.leftComponents | reaction.rightComponents:
            dictPhysicalEntity[entity_uri].reactions.add(reaction)

    # Register every controlled reaction on the entities that control it
    for control in dictControl.itervalues():
        controlled_reaction = dictReaction.get(control.controlled, None)

        # The controlled element must be a known reaction here
        # (controls of controls are excluded), and the control must be typed
        if control.controlType is None or not controlled_reaction:
            continue

        # Plain loop: the calls are made for their side effect only,
        # there is no list to build.
        for controller in control.controllers:
            if controller in dictPhysicalEntity:
                dictPhysicalEntity[controller].reactions.add(controlled_reaction)
def add_controllers_to_reactions(dictReaction, dictControl):
    """Fill the attribute `controllers` of Reaction objects with Control objects.

    Each control that has a `controlType` and whose `controlled` URI is a key
    of `dictReaction` is added to the `controllers` set of that reaction.

    .. note:: A control whose `controlled` URI is not a key of `dictReaction`
        does not target a reaction; it can be a control involved in a cascade
        of controls. Such controls are left untouched here.

    .. note:: Controls without `controlType` are ignored.

    :param dictReaction: Dictionary of biopax reactions,
        created by the function query.get_biopax_reactions()
    :param dictControl: Dictionary of biopax controls,
        created by the function query.get_biopax_controls()
    :type dictReaction: <dict <str>: <Reaction>>
        keys: uris; values reaction objects
    :type dictControl: <dict <str>: <Control>>
        keys: uris; values control objects
    """
    for control in dictControl.itervalues():
        # URI of the controlled element (a Reaction or another Control)
        target_uri = control.controlled

        # Untyped controls are not wanted
        if control.controlType is None:
            continue

        if target_uri in dictReaction:
            # Attach the control object to the reaction it controls
            dictReaction[target_uri].controllers.add(control)
        # Otherwise the controlled URI is not a reaction (possibly a control
        # of a cascade of controls): nothing to do.
def transfer_class_attributes_on_child_entities(entities, dictPhysicalEntity):
    """Transfer modificationFeatures and location of classes on child entities.

    If a member does not carry the same attributes as its class, a deep copy
    of the member is made, the copy inherits the attributes of the class and
    it is registered in `dictPhysicalEntity` under a new URI built as:

        <member uri>_duplicated[_<md5 of class modifications>][_<md5 of class location name>]

    The members of the class are then replaced by these new URIs.
    The function calls itself as long as a pass creates new entities.

    An entity describing the same state may already exist in the BioPAX data;
    such entities are expected to be grouped afterwards by
    :meth:`~biopax2cadbiom.biopax_converter.merge_duplicated_entities`.

    .. note:: Members are not duplicated if the class doesn't provide
        information that is not already in the member (no new modification
        and same location).
        Conflicting modifications (ex: `residue modification, active` and
        `residue modification, inactive`) are not detected; a warning is
        logged when modifications are added to a member that already has some.

    .. note:: About reactions attached to duplicated entities:
        The duplicate receives the reactions of its parent class instead of
        an empty set. The original authors chose this to solve the
        "VirtualCase14" bug: resetting the reactions led to incorrect
        relations between the entities of decompiled classes. The price is
        that an entity may appear in the model although it is not reused
        elsewhere (more entities are preferred to false transitions).

    .. todo:: If the duplicated entity is already in the dictionary:

        - it is already used elsewhere in a class => it should be decompiled
          even if it doesn't participate in any reaction;
        - otherwise, remove the reactions => it is useless to create
          entities that are not used in the model.

    .. warning:: dictPhysicalEntity is modified here.

    :param entities: Dictionary of entities to be processed.
        keys: uris; values entity objects.
        NOTE(review): this argument is not read by the implementation;
        every pass scans the whole `dictPhysicalEntity`.
    :param dictPhysicalEntity: Dictionary of all entities in the model.
        keys: uris; values entity objects
    :type entities: <dict <str>: <PhysicalEntity>>
    :type dictPhysicalEntity: <dict <str>: <PhysicalEntity>>
    """
    # Local import: hashlib replaces the deprecated `md5` module
    import hashlib

    def fingerprint(text):
        """Return the hexadecimal MD5 digest of the given text.

        Text that is not a byte string is encoded in UTF-8 first, so that
        non-ASCII names can be hashed; the digest of ASCII text is unchanged.
        """
        if not isinstance(text, bytes):
            text = text.encode("utf-8")
        return hashlib.md5(text).hexdigest()

    def inherit_location(dup_sub_entity, entity):
        """Copy the location of the given entity to the given subentity.

        The URI of the subentity is completed with a fingerprint of the
        location name.
        """
        dup_sub_entity.uri += '_' + fingerprint(entity.location.name)
        dup_sub_entity.location = entity.location
        dup_sub_entity.location_uri = entity.location_uri

    def duplicate_members(entity, new_entities):
        """Handle the duplication of members of the given entity.

        .. note:: Only modificationFeatures and location are handled.
        .. note:: `new_entities` is filled with the newly registered entities.

        :param entity: An entity object, that is a class with members
            (complexes included).
        :param new_entities: Dictionary updated in place.
            keys: uris; values entity objects
        :type entity: <PhysicalEntity>
        :type new_entities: <dict <str>: <PhysicalEntity>>
        """
        # Memorize the uris of the new entities/members
        temp_members_uris = set()
        # A %s placeholder is required for the extra logging argument
        LOGGER.debug("\nentity class: %s", entity.uri)

        for sub_entity_uri in entity.members:
            LOGGER.debug("subentity tested: %s", sub_entity_uri)

            # Do not process members of already processed classes:
            # the class is left as it is.
            if '_duplicated' in sub_entity_uri:
                return

            sub_entity = dictPhysicalEntity[sub_entity_uri]

            # Modifications of the class that are not in the member
            # (set of (key, value) pairs)
            modifs_sub_entity = sub_entity.modificationFeatures.viewitems()
            modifs_entity = entity.modificationFeatures.viewitems()
            modifs_to_add = modifs_entity - modifs_sub_entity

            if not modifs_to_add and sub_entity.location == entity.location:
                # The class brings nothing new:
                # keep the uri of this member and go to the next one.
                temp_members_uris.add(sub_entity_uri)

                if LOGGER.getEffectiveLevel() == DEBUG:
                    LOGGER.debug(
                        "Subentity has the same modifs and location as the class (DO NOTHING):"
                        "\nentity uri: " + entity.uri +
                        "\nsubentity uri: " + sub_entity.uri +
                        "\nentity name: " + entity.name +
                        "\nsubentity name: " + sub_entity.name +
                        "\nentity modifs: " + str(entity.modificationFeatures) +
                        "\nsubentity modifs: " + str(sub_entity.modificationFeatures)
                    )
                    if entity.location:
                        # location attr may be None
                        LOGGER.debug(
                            "entity location: " + entity.location.name +
                            "\nsubentity location: " + sub_entity.location.name
                        )
                continue

            ####################################################################
            # Here modificationFeatures and/or location have to be reported
            # on the member: work on a deep (recursive) copy of it.
            dup_sub_entity = copy.deepcopy(sub_entity)
            dup_sub_entity.uri += '_duplicated'

            # Inherit the reactions of the parent instead of resetting them
            # (see the docstring). If the duplicated entity is not used
            # elsewhere in the model, this forces its presence in the final
            # model instead of keeping only the class.
            # NOTE(review): the duplicate shares the very same set object as
            # the class (no copy is made) - confirm this aliasing is intended.
            dup_sub_entity.reactions = entity.reactions

            # Inherit modifications that are not already in the member
            if modifs_to_add:
                # Complete the uri with a fingerprint of the modifications
                dup_sub_entity.uri += '_' + fingerprint(
                    shortening_modifications(
                        entity.modificationFeatures, length=None
                    )
                )

                # TODO: check that attributes already specified are not
                # overwritten: we should try to prove that the pre-existing
                # modifications of the member are compatible with those
                # applied by the class. Detecting conflicts is difficult, so
                # we only detect the presence of modifications in the
                # duplicated entity and log them for the user.
                if dup_sub_entity.modificationFeatures:
                    LOGGER.warning(
                        "BioPAX could be inconsistent; Please double check the "
                        "transfer of the following modificationsFeatures.\n"
                        "class %s; subentity %s\n"
                        "OLD subentity features: '%s'; ADDED: '%s'",
                        entity.short_uri, dup_sub_entity.short_uri,
                        dup_sub_entity.modificationFeatures,
                        entity.modificationFeatures
                    )
                # Merge
                dup_sub_entity.modificationFeatures.update(modifs_to_add)

            # Inherit location
            if entity.location:
                if not dup_sub_entity.location:
                    # The member has no location => inherit it
                    inherit_location(dup_sub_entity, entity)
                elif dup_sub_entity.location.name != entity.location.name:
                    # Locations are different => bug
                    LOGGER.error(
                        "BioPAX bug; Please double check the transfer of the "
                        "following location.\n"
                        "class %s; subentity %s\n"
                        "OLD subentity location: '%s'; NEW: '%s'",
                        entity.short_uri, dup_sub_entity.short_uri,
                        dup_sub_entity.location.name, entity.location.name
                    )
                    # Force the inheritance
                    # /!\ This is a fix for Reactome where classes have
                    # different locations of the entities they contain.
                    inherit_location(dup_sub_entity, entity)
                # Otherwise the locations are the same => nothing to do

            # Register the new entity
            if dup_sub_entity.uri not in dictPhysicalEntity:
                # First time this entity is encountered
                new_entities[dup_sub_entity.uri] = dup_sub_entity
                dictPhysicalEntity[dup_sub_entity.uri] = dup_sub_entity
            else:
                LOGGER.warning(
                    "Duplicated entity already in dictPhysicalEntity <%s>",
                    dup_sub_entity.uri
                )
                # The member belongs to a class already processed:
                # give the reactions of the parent to the registered entity
                # (new_entities is updated via reference if the entity is
                # in it).
                dictPhysicalEntity[dup_sub_entity.uri].reactions.update(
                    dup_sub_entity.reactions
                )

            # Memorize the uri of the new entity/member
            temp_members_uris.add(dup_sub_entity.uri)

        # Replace members with new entities
        assert len(entity.members) == len(temp_members_uris)
        entity.members = temp_members_uris

    new_entities = dict()
    # dictPhysicalEntity is modified during the iteration
    # => iterate on a snapshot of its values
    for entity in tuple(dictPhysicalEntity.itervalues()):
        if entity.members and (entity.modificationFeatures or entity.location):
            # A class with modification features or location that have to be
            # reported on its members: old members are replaced by new
            # versions carrying these attributes.
            duplicate_members(entity, new_entities)

    LOGGER.info(
        "transfer_class_attributes_on_child_entities: End of pass; %s",
        len(new_entities)
    )

    if new_entities:
        # Loop as long as there are entities to decompile
        transfer_class_attributes_on_child_entities(new_entities, dictPhysicalEntity)
def merge_duplicated_entities(dictPhysicalEntity, model_path, log_files=True):
    """Merge multiple occurrences of the same entity in the model.

    The duplicates can come from the BioPAX database, as well as from the
    transfer of attributes of classes to their members made in
    :meth:`~biopax2cadbiom.biopax_converter.transfer_class_attributes_on_child_entities`

    Entities are sorted then grouped with the key given by
    :meth:`sort_callback`:

        - entityType
        - entityRef
        - name
        - components_uris
        - location_uri
        - modificationFeatures

    For each group of more than one entity, the first entity is arbitrarily
    taken as the reference: it receives the reactions of the others, and the
    URIs of the others are mapped to it in `dictPhysicalEntity`.

    If `log_files` is True, 3 files are created next to the model:

        - `sort_dumped.csv`: Dump of all entities (sorted but not grouped)
        - `sort_grouped.csv`: Dump of proposed groups
        - `sort_grouped_after_merge.csv`: Dump of definitive groups

    .. note:: About reactions attached to duplicate entities:
        Reactions from all duplicates are merged together.

    .. warning:: If classes with similar attributes are merged, their
        members are considered similar but **are not** merged together.

    .. todo:: During the merge of entities, prefer existing uris in the
        BioPAX model rather than those formed by duplication.

    :param dictPhysicalEntity: Dictionary of biopax physicalEntities,
        created by the function query.get_biopax_physicalentities()
    :param model_path: Filepath of the final model. Log files are written
        in its directory (the current directory if the path has none).
    :key log_files: (optional) If True, csv files are created. Default: True.
    :type dictPhysicalEntity: <dict <str>: <PhysicalEntity>>
        keys: uris; values entity objects
    :type model_path: <str>
    :type log_files: <boolean>
    :return: Dictionary of canonical uris as keys, and lists of
        non-canonical linked uris as values.
    :rtype: <dict <list>>
    """
    group_header = (
        "(entityType, entityRef, name, components_uris, "
        "location_uri, modificationFeatures)\n"
    )

    def get_attrs(elem):
        """Prepare the attributes of an entity for the dump in text files.

        .. note:: Empty attributes (None included) and "[]" are displayed
            as empty strings.

        :param elem: PhysicalEntity
        :type elem: <PhysicalEntity>
        :return: Generator of attributes.
        """
        attrs = (
            elem.short_uri,
            elem.entityRef,
            elem.name,
            # A short version of URIs (without domain names)
            parse_uri(sorted(elem.components_uris)),
            parse_uri(elem.location_uri),
            shortening_modifications(elem.modificationFeatures, None),
        )
        return (attr if attr and attr != "[]" else "" for attr in attrs)

    def dump_entity(f_d, elem):
        """Write one line of `;` separated attributes for the given entity."""
        f_d.write(";".join(get_attrs(elem)).encode("utf8") + str("\n"))

    def dump_group(f_d, key, group):
        """Dump the group of entities in an opened file.

        :param f_d: File descriptor
        :param key: Tuple of the 6 grouping attributes for the given group
        :param group: Tuple of entities
        :type f_d: <open file>
        :type key: <tuple>
        :type group: <tuple>
        """
        header = "---\n" + "({}, {}, {}, {}, {}, {})\n".format(
            key[0], key[1], key[2], parse_uri(key[3]), key[4], key[5]
        )
        # Encoded like the other lines of the file
        f_d.write(header.encode("utf8"))
        # The entities come from the argument, not from the enclosing scope
        for elem in group:
            dump_entity(f_d, elem)

    def get_duplicate_groups(sorted_entities):
        """Return the list of (key, tuple of entities) of the groups that
        contain more than one entity."""
        groups = []
        for key, group in it.groupby(sorted_entities, key=sort_callback):
            # A groupby group can be read only once: materialize it
            gr = tuple(group)
            if len(gr) > 1:
                groups.append((key, gr))
        return groups

    # Sort entities before grouping them
    sorted_entities = sorted(dictPhysicalEntity.itervalues(), key=sort_callback)

    # Output directory; os.path.join() keeps a relative path when
    # model_path has no directory part (instead of pointing to "/").
    output_dir = os.path.dirname(model_path)

    duplicate_groups = get_duplicate_groups(sorted_entities)

    if log_files:
        # Dump all entities (sorted but not grouped)
        with open(os.path.join(output_dir, "sort_dumped.csv"), 'w') as f_d:
            f_d.write("short_uri;entityRef;name;components_uris;location_uri;modificationFeatures\n")
            for elem in sorted_entities:
                dump_entity(f_d, elem)

        # Dump proposed groups
        with open(os.path.join(output_dir, "sort_grouped.csv"), 'w') as f_d:
            f_d.write(group_header)
            for key, gr in duplicate_groups:
                dump_group(f_d, key, gr)

    # Metrics
    nb_groups = len(duplicate_groups)
    nb_grouped_entities = 0
    merged_entities_mapping = defaultdict(list)

    for _, gr in duplicate_groups:
        nb_grouped_entities += len(gr)

        # For debugging purposes: detect the origin of the reactions.
        # See the implications in transfer_class_attributes_on_child_entities()
        dup_found = False
        legit_found = False
        for elem in gr:
            if not elem.reactions:
                continue
            if '_duplicated' in elem.uri:
                # Duplicated element with reactions
                dup_found = True
            else:
                # Non duplicated element with reactions
                legit_found = True

        if dup_found and legit_found:
            LOGGER.warning(
                "Mixed reactions in group (from both duplicate and existing "
                "entities); Entity retained for the group: <%s>", gr[0].uri
            )

        # Merging
        # The first object is arbitrarily taken as the reference
        # PS: Members of classes ARE NOT merged (see docstring)
        reference = gr[0]
        for elem in gr[1:]:
            # Update the reference object
            reference.reactions.update(elem.reactions)
            # Replace the old object by a reference to the reference object
            dictPhysicalEntity[elem.uri] = reference
            merged_entities_mapping[reference.uri].append(elem.uri)

    LOGGER.info("Merge %s entities from %s groups", nb_grouped_entities, nb_groups)

    if not log_files:
        return merged_entities_mapping

    # Dump definitive groups for verification purposes
    # Group after merge: duplicates have the same URIS
    sorted_merged_entities = sorted(dictPhysicalEntity.itervalues(), key=sort_callback)
    with open(os.path.join(output_dir, "sort_grouped_after_merge.csv"), 'w') as f_d:
        f_d.write(group_header)
        for key, gr in get_duplicate_groups(sorted_merged_entities):
            dump_group(f_d, key, gr)

    return merged_entities_mapping
def sort_callback(elem):
    """Return the sort/group key of a PhysicalEntity.

    The key is the tuple:
    (entityType, entityRef, name, sorted components uris, location_uri,
    modificationFeatures)

    The sort of all entities must respect the lexicographic order of all
    attributes. Components uris are converted to a sorted list because the
    comparison of sets is not a lexicographic one, and itertools.groupby
    would be fooled:

        - ['W', 'X'] < ['X', 'Y'] => True
        - {'X', 'W'} < {'X', 'Y'} => False

    .. code-block:: text

        With sorted lists:

        Ater;A;['W', 'X'];http://simulated/test#anywhere;
        A;A;['X', 'Y'];http://simulated/test#anywhere;
        Abis;A;['X', 'Y'];http://simulated/test#anywhere;

        With sets:

        A;A;['Y', 'X'];http://simulated/test#anywhere;
        Abis;A;['Y', 'X'];http://simulated/test#anywhere;
        Ater;A;['X', 'W'];http://simulated/test#anywhere;

    :param elem: PhysicalEntity
    :type elem: <PhysicalEntity>
    :return: Tuple of the 6 attributes used to sort and group entities.
    :rtype: <tuple>
    """
    key_parts = [elem.entityType, elem.entityRef, elem.name]
    # Deterministic, lexicographically comparable version of the components
    key_parts.append(sorted(elem.components_uris))
    key_parts.extend((elem.location_uri, elem.modificationFeatures))
    return tuple(key_parts)
def detect_members_used(dictPhysicalEntity, full_graph=False, keepEmptyClasses=False):
    """Set the attribute `membersUsed` of generic entities (classes).

    `membersUsed` is the set of members involved in at least one reaction.
    It stays empty if the entity does not have members, or if none of its
    members is reused: the entity is then considered as a simple entity.

    .. warning:: A generic entity can be any of the subclasses of
        PhysicalEntity; **a complex can also be a class**.
        See :meth:`develop_complexes`.

    :param dictPhysicalEntity: Dictionary of biopax physicalEntities,
        created by the function query.get_biopax_physicalentities()
    :param full_graph: (optional) If True, all members of all entities are
        considered as used (`membersUsed` is a copy of `members`), even if
        they are not used elsewhere.
    :param keepEmptyClasses: (optional) (deprecated) If some members are not
        used, the uri of the entity itself is added to its `membersUsed`
        with the aim to represent all the members not used.
        => This will break some conversions and unit tests because the
        translation implies the removal of genericity.
        Ignored if `full_graph` is True.
    :type dictPhysicalEntity: <dict <str>: <PhysicalEntity>>
        keys: uris; values entity objects
    :type full_graph: <bool>
    :type keepEmptyClasses: <bool>
    """
    for entity in dictPhysicalEntity.itervalues():

        if full_graph:
            # All classes are developed.
            # A copy is made: `membersUsed` can be modified afterwards
            # (see developComplexEntity) and `members` must not follow.
            entity.membersUsed = set(entity.members)
            continue

        # full_graph is False:
        # detect the members of the current generic entity that are used
        # in at least 1 reaction in the model.
        for sub_entity_uri in entity.members:
            if sub_entity_uri not in dictPhysicalEntity:
                # Members not referenced in the entities may exist
                # TODO: decide what to do with them.
                # EX: http://www.reactome.org/biopax/60/48887#Complex5918
                LOGGER.warning(
                    "The member entity <%s> of <%s> IS NOT in PhysicalEntities. "
                    "Please check your BioPAX file.",
                    sub_entity_uri, entity.uri
                )
                continue

            if dictPhysicalEntity[sub_entity_uri].reactions:
                # The generic entity will be "deconstructed" because
                # it is not elementary.
                entity.membersUsed.add(sub_entity_uri)

        # A class without any reused member is not added alone here:
        # it is considered as a simple entity (empty membersUsed).
        if keepEmptyClasses and len(entity.members) != len(entity.membersUsed):
            # Some members are not used: the entity itself represents them
            entity.membersUsed.add(entity.uri)
def develop_complexes(dictPhysicalEntity):
    """Set the attribute `flat_components` of complexes entities.

    `flat_components` is a list of tuples of component URIs.
    This function depends on :meth:`detect_members_used`.

    Only complexes with an empty `flat_components` are processed: nested
    complexes are developed recursively by :meth:`developComplexEntity`, so
    a sub-complex may already have been processed through its top complex
    before being iterated here.

    Some complexes are classes, and some of these classes may have
    components. For them :meth:`developComplexEntity` produces a new complex
    in a separate dictionary; these new complexes are added to
    `dictPhysicalEntity` at the end.

    .. warning:: dictPhysicalEntity is modified in place.

    :param dictPhysicalEntity: Dictionary of biopax physicalEntities,
        created by the function query.get_biopax_physicalentities()
    :type dictPhysicalEntity: <dict <str>: <PhysicalEntity>>
        keys: uris; values entity objects
    :raises AssertionError: If a new complex has the uri of an existing
        entity.
    """
    new_physical_entities = dict()

    # Plain loop: the calls are made for their side effects only.
    # The test is evaluated when the entity is reached, i.e. after the
    # possible recursive processing of the entity by a previous one.
    for entity in dictPhysicalEntity.itervalues():
        if entity.is_complex and not entity.flat_components:
            developComplexEntity(entity, dictPhysicalEntity, new_physical_entities)

    # Secure the merge of new_physical_entities with dictPhysicalEntity:
    # we must not have an intersection between them
    assert not dictPhysicalEntity.viewkeys() & new_physical_entities.viewkeys()
    dictPhysicalEntity.update(new_physical_entities)

    LOGGER.debug(
        "develop_complexes:: New complexes due to the processing of classes:\n %s",
        new_physical_entities
    )
def developComplexEntity(complex_entity, dictPhysicalEntity, new_physical_entities):
    """Fill the `flat_components` attribute of the given complex.

    Called by :meth:`develop_complexes`.
    The components of the given complex are searched recursively (a complex
    can contain sub-complexes).

    Two attributes of the complex are filled:

        - `flat_components`: list of tuples of uris; one tuple per
          combination of the developed components (cartesian product).
          Classes are replaced by their `membersUsed`.
        - `flat_components_primitives`: flat list of uris of the components
          where classes ARE NOT replaced by their members; items are in the
          same order as in any tuple of `flat_components`.

    ---

    Some complexes are classes, and some of these classes may have
    components. In this case a new complex is produced (deep copy of the
    class, without members) that takes the components of the class:

        - The new complex is added to `new_physical_entities` and must be
          added later to `dictPhysicalEntity`.
          Its uri is completed with the suffix "_not_class".
        - The class is left in `dictPhysicalEntity`; its components are
          erased and the new uri is added to its `membersUsed`.
        - The rest of the function works on the new complex.

    .. note:: A sub-complex without component is processed like any basic
        entity (Cf VirtualCase19: 'B_bottom').

    .. note:: Components that are not in `dictPhysicalEntity` are skipped
        with a warning.

    .. todo:: When a class occurs multiple times through components of
        complexes we should remove it and make a set of primitives.
        This would avoid the cartesian product of members and the
        duplication of complexes on useless flat_components.
        Cf VirtualCase19: 'B' class in C_top and C_bottom.

    Example:

    .. code::

        A: complex composed with components:
            B: complex with components:
                W: protein
                X: generic smallmolecule with members:
                    Y: smallmolecule (used elsewhere)
                    Z: smallmolecule (not used elsewhere)
            C: protein

        For B: developed components = [[Y], [W]]
               flat_components = [(Y, W)]
        For A: developed components = [[C], [(Y, W)]]
               flat_components = [(C, Y, W)]
               flat_components_primitives = [C, X, W]

        If Z had been used elsewhere:
        For A: developed components = [[C], [(Y, W), (Z, W)]]
               flat_components = [(C, Y, W), (C, Z, W)]

    :param complex_entity: Complex entity
    :param dictPhysicalEntity: Dictionary of biopax physicalEntities,
        created by the function query.get_biopax_physicalentities()
    :param new_physical_entities: Dictionary filled with the complexes
        created from classes with components.
    :type complex_entity: <PhysicalEntity>
    :type dictPhysicalEntity: <dict <str>: <PhysicalEntity>>
        keys: uris; values entity objects
    :type new_physical_entities: <dict <str>: <PhysicalEntity>>
    """
    # One item per component: list of alternatives for this component
    alternatives_per_component = []
    # One item per component: list of primitive uris for this component
    primitives_per_component = []

    LOGGER.debug("developComplexEntity:: current complex <%s>", complex_entity.short_uri)

    # Complex/class with components: split it (see docstring)
    if complex_entity.is_class and complex_entity.components_uris:
        not_class_uri = complex_entity.uri + "_not_class"
        # The simple complex becomes a member of the class
        # /!\ not_class_uri is not yet in dictPhysicalEntity
        complex_entity.membersUsed.add(not_class_uri)

        # The copy keeps components_uris but loses the members
        not_class_complex = copy.deepcopy(complex_entity)
        not_class_complex.members = set()
        not_class_complex.membersUsed = set()
        not_class_complex.uri = not_class_uri
        new_physical_entities[not_class_uri] = not_class_complex

        # The class loses its components
        complex_entity.components_uris = set()
        LOGGER.warning("Complex is a class with components <%s>", complex_entity.short_uri)
        complex_entity = not_class_complex

    # PS: non-canonical uris are used here (keys of the dictionary)
    for component_uri in complex_entity.components_uris:

        if component_uri not in dictPhysicalEntity:
            LOGGER.warning(
                "The component <%s> IS NOT in PhysicalEntities. "
                "Please check your BioPAX file.", component_uri
            )
            continue

        component = dictPhysicalEntity[component_uri]
        single = [component_uri]

        # /!\ Complexes can be nested, classes are not developed recursively
        # => the recursion is made only for complexes that are not classes.
        if component.is_complex and not component.is_class:
            if not component.flat_components:
                # Sub-complex not yet developed
                developComplexEntity(component, dictPhysicalEntity, new_physical_entities)

            if component.flat_components:
                # Standard complex with components
                # (Cf virtualCase18, nested complex)
                alternatives_per_component.append(component.flat_components)
                primitives_per_component.append(component.flat_components_primitives)
            else:
                # Still nothing: complex without component,
                # added as a simple entity
                # (Cf virtualCase19, 'B_bottom' is an empty complex)
                alternatives_per_component.append(single)
                primitives_per_component.append(single)

        elif component.is_class:
            # Generic entity (complex/classes included):
            # replaced by its members used; see detect_members_used()
            alternatives_per_component.append(list(component.membersUsed))
            primitives_per_component.append(single)

        else:
            # Simple entity
            alternatives_per_component.append(single)
            primitives_per_component.append(single)

    if alternatives_per_component:
        # Cartesian product of the alternatives of each component.
        # Alternatives coming from sub-complexes are tuples of uris
        # that have to be unpacked:
        # [[C], [(Y, W), (Z, W)]] => (C, (Y, W)) and (C, (Z, W))
        #                         => [(C, Y, W), (C, Z, W)]
        for combination in it.product(*alternatives_per_component):
            flattened = []
            for item in combination:
                if isinstance(item, tuple):
                    flattened.extend(item)
                else:
                    flattened.append(item)
            complex_entity.flat_components.append(tuple(flattened))

        LOGGER.info("developed_components: %s", alternatives_per_component)

    if primitives_per_component:
        # Flat list of primitives, in the same order as any flat_component
        complex_entity.flat_components_primitives = list(
            it.chain.from_iterable(primitives_per_component)
        )
        LOGGER.info("flat_components_primitives: %s", complex_entity.flat_components_primitives)

    LOGGER.info("components_uris: %s", complex_entity.components_uris)
    LOGGER.info("flat_components: %s", complex_entity.flat_components)
def compute_locations_names(dictLocation, numeric_compartments_names=False):
    """Create a cadbiom ID for each location.

    .. warning:: It updates the attribute `cadbiom_name` of every Location
        object stored in dictLocation.

    .. note:: When a location has no name, the last part of its uri is used.
        This fallback is cleaned like real names, so that characters
        forbidden by cadbiom (e.g. '#') never reach the generated names.

    :param dictLocation: Dictionary of biopax locations created by
        query.get_biopax_locations().
        keys: CellularLocationVocabulary uri; values: Location object
    :param numeric_compartments_names: (optional) If True, names of
        compartments will be based on numeric values instead of their
        real names.
    :type dictLocation: <dict>
    :type numeric_compartments_names: <bool>
    :returns: Dict of encoded locations.
        keys: numeric value or real location name; values: Location object
    :rtype: <dict <str>:<Location>>
    """
    idLocationToLocation = dict()

    # Uris are sorted: the numeric index of a location only depends on the
    # set of uris, not on the iteration order of the dictionary.
    for location_index, location_uri in enumerate(sorted(dictLocation)):
        location = dictLocation[location_uri]

        if numeric_compartments_names:
            location_name = str(location_index)
        else:
            # Handle empty location names: keep the last part of the uri
            raw_name = location.name if location.name \
                else location.uri.split('/')[-1]
            # Clean name for correct cadbiom parsing.
            location_name = clean_name(raw_name)

        idLocationToLocation[location_name] = location
        # Update the Location object with its encoded id
        location.cadbiom_name = location_name

    LOGGER.debug("Encoded locations: %s", idLocationToLocation)
    return idLocationToLocation
def add_locations_to_entities(dictPhysicalEntity, dictLocation):
    """Add Location objects to PhysicalEntities.

    The attribute `location` is set only for entities that have a
    `location_uri`; other entities are left untouched.

    :param dictPhysicalEntity: Dictionary of biopax physicalEntities.
        keys: uris; values: entity objects
    :param dictLocation: Dictionary of biopax locations.
        keys: location uris; values: Location objects
    :type dictPhysicalEntity: <dict <str>: <PhysicalEntity>>
    :type dictLocation: <dict>
    :raises KeyError: If the location uri of an entity is not in dictLocation.
    """
    # values() works with Python 2 and 3 (itervalues() is Python 2 only)
    for entity in dictPhysicalEntity.values():
        if entity.location_uri:
            entity.location = dictLocation[entity.location_uri]
def add_modifications_features_to_entities(
        dictPhysicalEntity, dictModificationFeatures):
    """Attach modification features to the corresponding entities.

    The attribute `modificationFeatures` of each known entity is set;
    the entity name itself is not modified here.

    :param dictPhysicalEntity: Dictionary of biopax physicalEntities.
        keys: uris; values: entity objects
    :param dictModificationFeatures: Modifications of entities.
        keys: entity uris; values: modifications of the entity
    :type dictPhysicalEntity: <dict <str>: <PhysicalEntity>>
    :type dictModificationFeatures: <dict>
    """
    # items() works with Python 2 and 3 (iteritems() is Python 2 only)
    for entity_uri, modif in dictModificationFeatures.items():
        try:
            dictPhysicalEntity[entity_uri].modificationFeatures = modif
        except KeyError:
            # Blacklisted entity: deliberately ignored
            pass
def add_xrefs_to_entities(dictPhysicalEntity, dictEntities_db_refs):
    """Add xrefs to entities.

    The attribute `xrefs` of each known entity is set; uris that are not in
    dictPhysicalEntity are ignored.

    :param dictPhysicalEntity: Dictionary of biopax physicalEntities.
        keys: uris; values: entity objects
    :param dictEntities_db_refs: Dictionary of entityRefs.
        keys: uris; values: dict of databases
        keys: database names; values: ids
    :type dictPhysicalEntity: <dict <str>: <PhysicalEntity>>
    :type dictEntities_db_refs: <dict <str>: <dict <str>: <list>>>
    """
    # items() works with Python 2 and 3 (iteritems() is Python 2 only)
    for entity_uri, xrefs in dictEntities_db_refs.items():
        try:
            dictPhysicalEntity[entity_uri].xrefs = xrefs
        except KeyError:
            # Blacklisted entity: deliberately ignored
            pass
def get_pathways_entities(dictReaction, dictControl, dictPhysicalEntity):
    """This function creates the Dictionary pathwayToPhysicalEntities.

    Entities of a pathway are the participants of its reactions and the
    controllers of these reactions. Only uris present in dictPhysicalEntity
    are kept.

    :param dictReaction: Dictionary of biopax reactions,
        created by the function query.get_biopax_reactions()
    :param dictControl: Dictionary of biopax controls,
        created by the function query.get_biopax_controls()
    :param dictPhysicalEntity: Dictionary of biopax physicalEntities,
        created by the function query.get_biopax_physicalentities()
    :type dictReaction: <dict <str>: <Reaction>>
        keys: uris; values reaction objects
    :type dictControl: <dict <str>: <Control>>
        keys: uris; values control objects
    :type dictPhysicalEntity: <dict <str>: <PhysicalEntity>>
        keys: uris; values entity objects
    :returns: pathwayToPhysicalEntities
        keys: pathway uris; values: set of entities involved in the pathway.
    :rtype: <dict <str>: <set>>
    """
    pathwayToPhysicalEntities = defaultdict(set)
    # A real set works with Python 2 and 3 (viewkeys() is Python 2 only)
    allPhysicalEntities = set(dictPhysicalEntity)

    for reaction in dictReaction.values():
        # productComponent/participantComponent can be None: such values are
        # discarded by the intersection with known entities
        physicalEntities = {
            reaction.productComponent,
            reaction.participantComponent,
        }
        physicalEntities |= reaction.leftComponents
        physicalEntities |= reaction.rightComponents
        physicalEntities &= allPhysicalEntities

        for pathway in reaction.pathways:
            pathwayToPhysicalEntities[pathway] |= physicalEntities

    for control in dictControl.values():
        reaction = dictReaction.get(control.controlled, None)
        if not reaction:
            # The controlled element is not a known reaction
            continue

        controllers = control.controllers & allPhysicalEntities
        for pathway in reaction.pathways:
            pathwayToPhysicalEntities[pathway].update(controllers)

    return pathwayToPhysicalEntities
def add_unique_cadbiom_name_to_entities(dictPhysicalEntity):
    """Add `cadbiom_name` attribute to entities in dictPhysicalEntity.

    .. note:: The attribute `cadbiom_name` corresponds to a unique cadbiom ID
        for the entity (Protein, Complex, Class, etc.).

    :param dictPhysicalEntity: Dictionary of biopax physicalEntities,
        created by the function query.get_biopax_physicalentities()
    :type dictPhysicalEntity: <dict <str>: <PhysicalEntity>>
        keys: uris; values entity objects
    """
    # Get all names and entities for them
    # Key: name, value: entities using this name
    entities_cadbiom_names = defaultdict(set)
    for entity in dictPhysicalEntity.values():
        entities_cadbiom_names[build_cadbiom_name(entity)].add(entity)

    # Set of unique cadbiom names (not used more than 1 time)
    # PS: find_unique_synonyms() adds the new names it assigns to this set
    unique_cadbiom_names = {
        cadbiom_name
        for cadbiom_name, entities in entities_cadbiom_names.items()
        if len(entities) == 1
    }

    # Attribution of 1 unique name to ALL entities
    # Key: name, value: entities using this name
    for cadbiom_name, entities in entities_cadbiom_names.items():
        # TODO: test find unique directly here?
        if len(entities) == 1:
            # This name is used only 1 time
            next(iter(entities)).cadbiom_name = cadbiom_name
            continue

        # This name is used by many entities.
        # We decide to replace it by a unique name for each entity
        # and convert it according to cadbiom rules
        # Key: uri, value: unique name
        unique_cadbiom_synonyms = find_unique_synonyms(
            cadbiom_name,
            {entity.uri for entity in entities},
            unique_cadbiom_names,
            dictPhysicalEntity,
        )
        # Set synonyms found to each entity
        for entity in entities:
            entity.cadbiom_name = unique_cadbiom_synonyms[entity.uri]
def add_cadbiom_names_to_entities(dictPhysicalEntity):
    """Fill 'cadbiom_names' attribute of entities.

    The aim is to have the list of elements contained in each entity and
    their names. We process essentially entities with subunits: components
    or members (complexes or classes).

    The attribute 'cadbiom_names' corresponds to a list of unique cadbiom IDs
    for the entity (Complex, Class). Each member of the list is the unique
    cadbiom ID of each subcomponent present in the attribute
    'flat_components'.

    .. warning:: To fill 'cadbiom_names', we first handle complexes that can
        be classes; BUT classes are not necessarily complexes (without
        'flat_components'), so a recursive decomposition is made.
        For that, see :meth:`get_cadbiom_names`

    .. note:: Because complexes are already developed in
        :meth:`developComplexEntity`, this type of entities does not have to
        be decomposed recursively here.

    :param dictPhysicalEntity: Dictionary of biopax physicalEntities,
        created by the function query.get_biopax_physicalentities()
    :type dictPhysicalEntity: <dict <str>: <PhysicalEntity>>
        keys: uris; values entity objects
    """
    # Attribution of names for complexes with subentities
    # PS: Some complexes can be classes here.
    # values() works with Python 2 and 3 (itervalues() is Python 2 only)
    for entity in dictPhysicalEntity.values():
        get_cadbiom_names(entity, dictPhysicalEntity)
def get_cadbiom_names(entity, dictPhysicalEntity):
    """To be called recursively or by :meth:`add_cadbiom_names_to_entities`

    .. note:: See :meth:`add_cadbiom_names_to_entities` for more information.

    .. note:: The attribute 'cadbiom_names' corresponds to a list of unique
        cadbiom IDs for the entity (Complex, Class). Each member of the list
        is the unique cadbiom ID of each subcomponent present in the
        attribute 'flat_components'.

    .. note:: This function is idempotent: an entity whose 'cadbiom_names'
        is already filled is not processed again. Without this, an entity
        reached directly AND as a member of a class would get its names
        appended several times.

    :param entity: A PhysicalEntity.
    :param dictPhysicalEntity: Dictionary of biopax physicalEntities,
        created by the function query.get_biopax_physicalentities()
    :type dictPhysicalEntity: <dict <str>: <PhysicalEntity>>
        keys: uris; values entity objects
    :type entity: <PhysicalEntity>
    :return: List of cadbiom names of the given entity (the attribute
        'cadbiom_names' itself, updated in place).
    :rtype: <list <str>>
    :raises AssertionError: If a class is a member of itself (deprecated
        representation of unused members).
    """
    # Already processed (directly or through a class): do not append again
    if entity.cadbiom_names:
        return entity.cadbiom_names

    ## Complexes
    # flat_components can contain:
    # - 0 item (not a complex BUT can be a class or a simple entity),
    # - >= 1 components => it's a complex that CAN BE a class
    nb_flat_components = len(entity.flat_components)

    if nb_flat_components == 1:
        # 1 sub component: cadbiom_names will contain the parent's name
        entity.cadbiom_names.append(entity.cadbiom_name)

    elif nb_flat_components > 1:
        # Many sub components: each name is made of the parent's name
        # + the names of the subcomponents of the flat component
        for flat_component in entity.flat_components:
            sub_names = "_".join(
                [dictPhysicalEntity[sub_entity_uri].cadbiom_name
                 for sub_entity_uri in flat_component]
            )
            entity.cadbiom_names.append(entity.cadbiom_name + "_" + sub_names)

    ## Classes
    elif entity.is_class:
        # Only classes here (complexes have flat_components >= 1)
        # Class with members (that are used elsewhere or not)
        for sub_entity_uri in entity.membersUsed:
            if sub_entity_uri == entity.uri:
                # /!\ An entity in membersUsed of itself used to represent
                # all the members not used; this is no longer supported.
                raise AssertionError("Deprecated code - Should never be reached")

            # Entities that compose the generic entity
            # => get their names recursively
            entity.cadbiom_names += get_cadbiom_names(
                dictPhysicalEntity[sub_entity_uri], dictPhysicalEntity
            )

    ## Simple entity
    else:
        entity.cadbiom_names.append(entity.cadbiom_name)

    return entity.cadbiom_names
def find_unique_synonyms(cadbiom_name, entity_uris, unique_cadbiom_names,
                         dictPhysicalEntity):
    """Build unique names for the given uris, having the same cadbiom name.

    .. note:: First, we use synonyms from BioPAX database to find a unique
        name. When there is no more usable synonyms to build a unique name,
        we add a version number based on the given cadbiom name for all the
        remaining entities.

    .. note:: The merging procedure for similar entities greatly reduces
        the number of entity groups proposed to this function.

    .. warning:: unique_cadbiom_names is updated in place with the synonyms
        selected here.

    :param cadbiom_name: The redundant cadbiom name
    :param entity_uris: Set of uris of entities having the same name
    :param unique_cadbiom_names: Set of unique cadbiom names already used
    :param dictPhysicalEntity: Dictionary of biopax physicalEntities,
        created by the function query.get_biopax_physicalentities()
    :type cadbiom_name: <str>
    :type entity_uris: <set>
    :type unique_cadbiom_names: <set>
    :type dictPhysicalEntity: <dict <str>: <PhysicalEntity>>
        keys: uris; values entity objects
    :returns: Dictionary of uris as keys and unique names as values.
    :rtype: <dict>
    """
    entities_synonyms = {}

    while len(entity_uris) != len(entities_synonyms):

        # Get a set of synonyms for each entity in entity_uris (if a unique
        # name for the entity is not already found in entities_synonyms)
        temp_entities_synonyms = {}
        for uri in entity_uris:
            if uri in entities_synonyms:
                continue
            entity = dictPhysicalEntity[uri]
            temp_entities_synonyms[uri] = {
                build_cadbiom_name(entity, synonym=synonym)
                for synonym in entity.synonyms
            }

        ## Pruning synonyms
        # Remove synonyms shared by the 2 entities of each pair
        for uri1, uri2 in it.combinations(list(temp_entities_synonyms), 2):
            shared_synonyms = \
                temp_entities_synonyms[uri1] & temp_entities_synonyms[uri2]
            temp_entities_synonyms[uri1] -= shared_synonyms
            temp_entities_synonyms[uri2] -= shared_synonyms

        # Remove cadbiom names already used
        for synonyms in temp_entities_synonyms.values():
            synonyms -= unique_cadbiom_names

        ## Attribution
        nbEntitiesSelected = 0
        for entity_uri, synonyms in temp_entities_synonyms.items():
            if not synonyms:
                continue
            # Here, multiple synonyms subsist from the pruning
            # We know that synonyms contain unique names
            # We take the first name in the set arbitrarily
            # /!\ Do not reuse the name 'cadbiom_name' here: the given
            # redundant name is still required for the version numbers below.
            selected_name = next(iter(synonyms))
            # Assignation to the uri
            entities_synonyms[entity_uri] = selected_name
            # Mark this name as already used
            unique_cadbiom_names.add(selected_name)
            nbEntitiesSelected += 1

        # Here it is the last loop, there are no more usable synonyms
        # => we add a version number based on the given cadbiom name for all
        # remaining entities.
        if nbEntitiesSelected == 0:
            for entity_version, entity_uri \
                    in enumerate(temp_entities_synonyms, 1):
                entities_synonyms[entity_uri] = \
                    "{}_v{}".format(cadbiom_name, entity_version)

    return entities_synonyms
def shortening_modifications(modificationFeatures, length=1):
    """Return a short version of all given modification names and occurences.

    .. note:: Some terms can be corrected before shortening:

        - `residue modification, inactive`: `inactive`
        - `residue modification, active`: `active`

        At least the 3 first letters of corrected terms are kept, in order
        to distinguish `ina(ctive)` from `act(ive)`.

    :param modificationFeatures: Counter of modificationFeatures
    :param length: Length of the shortening; put None for entire strings.
    :type modificationFeatures: <Counter>
    :type length: <int>
    :return: Short and merged version of the given modificationFeatures.
    :rtype: <str>
    """
    conversion_table = {
        'residue modification, inactive': 'inactive',
        'residue modification, active': 'active',
    }
    # Force AT LEAST the 3 first letters for items in conversion_table;
    # a longer requested length must not be reduced to 3.
    limiter = None if length is None else max(length, 3)

    def conversion(term):
        """Return the shortened (and possibly corrected) version of a term"""
        if term in conversion_table:
            return conversion_table[term][:limiter]
        return term[:length]

    # items() works with Python 2 and 3 (iteritems() is Python 2 only)
    return '_'.join(
        [str(nb) + conversion(term)
         for term, nb in modificationFeatures.items()]
    )
def build_cadbiom_name(entity, synonym=None):
    """Get entity name formatted for Cadbiom.

    The name is built from (in this order): the cleaned name (or synonym, or
    uri fragment), the shortened modification features, a '_gene' or '_rna'
    suffix according to the type of the entity, and the cadbiom name of the
    location.

    :param entity: PhysicalEntity for which the name will be encoded.
    :param synonym: (Optional) Synonym that will be used instead of the name
        of the given entity.
    :type entity: <PhysicalEntity>
    :type synonym: <str>
    :return: Encoded name with location if it exists.
    :rtype: <str>
    """
    if synonym:
        name = synonym
    elif entity.name:
        name = entity.name
    else:
        # No name: take the fragment of the uri (part after the last '#').
        # [-1] keeps the whole uri when there is no '#' in it, instead of
        # raising an IndexError.
        name = entity.uri.rsplit("#", 1)[-1]

    # Clean name for correct cadbiom parsing.
    name = clean_name(name)

    # Add modifications and their number to the entity name
    if entity.modificationFeatures:
        name += '_' + shortening_modifications(entity.modificationFeatures)

    # Add '_gene' to the name if the entity is a DNA
    if any(e_type in entity.entityType for e_type in ('Dna', 'DnaRegion')):
        name += '_gene'

    # Add '_rna' to the name if the entity is a RNA
    if any(e_type in entity.entityType for e_type in ('Rna', 'RnaRegion')):
        name += '_rna'

    # Add location id to the name if it exists
    if entity.location:
        name += '_' + entity.location.cadbiom_name

    return name
def clean_name(name):
    """Return the given name made compatible with the cadbiom parser.

    Every character that is not an ASCII letter, a digit or an underscore
    is replaced by an underscore.

    :param name: Name to be cleaned.
    :type name: <str>
    :return: Cleaned name.
    :rtype: <str>
    """
    forbidden_chars = re.compile('([^a-zA-Z0-9_])')
    return forbidden_chars.sub('_', name)
def get_control_group_condition(controls, dictPhysicalEntity, controlled_controls):
    """Get condition for a group of controllers.

    - Activators are linked together by a logical `'OR'`,
    - inhibitors are linked together by a logical `'OR'`,
    - but sets of activators and inhibitors are linked together by a logical
      `'AND'`.

    Cascades of controls are supported here; Each regulation from a nested
    condition is linked by an `'AND'` operator.

    Controls without controlType, or with an unsupported controlType, are
    logged and skipped. If no control of the group is retained, the
    condition is None; i.e. a cascade of controls can be broken if a control
    has an unknown controlType.

    .. warning:: `controlType` can be as follows (entries marked with `*`
        are currently supported because they are general terms; others are
        from EcoCyc and will be logged as errors):

        - ACTIVATION*
        - INHIBITION*
        - INHIBITION-ALLOSTERIC
        - INHIBITION-COMPETITIVE
        - INHIBITION-IRREVERSIBLE
        - INHIBITION-NONCOMPETITIVE
        - INHIBITION-OTHER
        - INHIBITION-UNCOMPETITIVE
        - ACTIVATION-NONALLOSTERIC
        - ACTIVATION-ALLOSTERIC

    .. note:: Controllers/classes are processed in :meth:`get_cadbiom_names`.
        Here we just use cadbiom_names to distinguish entities.

    :param controls: Set of Control objects for a reaction.
    :param dictPhysicalEntity: Dictionary of biopax physicalEntities,
        created by the function query.get_biopax_physicalentities()
    :param controlled_controls: Controls that regulate other controls.
        keys: uris of controlled controls; values: group of Control objects
        that regulate the control (passed as `controls` in the recursion)
    :type dictPhysicalEntity: <dict <str>: <PhysicalEntity>>
        keys: uris; values entity objects
    :type controls: <set <Control>>
    :type controlled_controls: <dict <str>: <set <Control>>>
    :return: Sympy condition or None
    :rtype: <sympy.core.symbol.Symbol> or <None>
    :raises KeyError: If a controller uri is not in dictPhysicalEntity.
    """
    activators_condition = set()
    inhibitors_condition = set()

    def get_control_condition(control):
        """Compute Sympy condition for the given control

        Cascades of controls are supported here; Each regulation from a nested
        condition is linked by an `'AND'` operator.

        This function is agnostic for the controlType attribute
        (it does not deal with the activation or inhibition meaning).

        About `cadbiom_names`:
        Set of cadbiom entities that control the reaction.
        See: :meth:`get_cadbiom_names`, Classes are processed in this function
        to set correct names.

        1 Complex controller = (potentially) multiple variants stored
        in cadbiom_names.

        Resume:
        Variants are glued by a `'OR'`, sets of variants are glued by a `'AND'`
        (because these cofactors act simultaneously to control the reaction).

        - `'OR'`: Handle variants of each controller
        - `'AND'`: Handle controllers (and their sets of variants) actions
        """
        # set(): a name can be present several times in cadbiom_names,
        # only 1 symbol per distinct name is built
        local_cond = sympy.And(
            *[
                sympy.Or(
                    *[
                        sympy.Symbol(name)
                        for name in set(dictPhysicalEntity[controller].cadbiom_names)
                    ]
                )
                for controller in control.controllers
            ]
        )

        # Search controller uri in controlled elements
        if control.uri in controlled_controls:
            # Found: This controller is controlled by another control
            # => begin the control cascade
            # Get the condition of the group of controls that regulate
            # the current control (mutual recursion with the outer function)
            # NOTE(review): no protection against cyclic cascades of controls
            # is visible here - confirm that input data can not contain them.
            sympcond = get_control_group_condition(
                controlled_controls[control.uri],
                dictPhysicalEntity,
                controlled_controls
            )
            if sympcond is not None:
                # Result of a cascade is mandatory: AND operator
                local_cond = sympy.And(local_cond, sympcond)

        return local_cond

    for control in controls:
        # Opti: Early pruning of Controls without expected controlType
        if not control.controlType:
            LOGGER.warning(
                "The Control <%s> should have a controlType", control.uri
            )
            continue
        elif control.controlType not in ("ACTIVATION", "INHIBITION"):
            LOGGER.error(
                "ControlType '%s' not supported (not 'ACTIVATION' or 'INHIBITION'); "
                "The control <%s> will not be retained!",
                control.controlType, control.uri
            )
            continue

        new_cond = get_control_condition(control)
        # Defensive guard: get_control_condition() as written above always
        # returns the result of sympy.And()
        if new_cond is None:
            continue

        if control.controlType == 'ACTIVATION':
            activators_condition.add(new_cond)
        elif control.controlType == 'INHIBITION':
            inhibitors_condition.add(new_cond)

    # Glue activators and inhibitors conditions by a logical AND
    # and put a NOT before all inhibitors
    if activators_condition and inhibitors_condition:
        return sympy.And(sympy.Or(*activators_condition), sympy.Not(sympy.Or(*inhibitors_condition)))
    elif activators_condition:
        return sympy.Or(*activators_condition)
    elif inhibitors_condition:
        return sympy.Not(sympy.Or(*inhibitors_condition))
    else:
        # controlTypes not supported = no condition
        return None
def add_conditions_to_reactions(dictReaction, dictPhysicalEntity, dictControl):
    """Elaborate condition for each event attached to a reaction.

    The attributes `event` and `cadbiomSympyCond` of each reaction are set.
    A reaction without controllers, or for which no condition can be built,
    gets the condition `sympy.true`.

    .. note:: Condition: i.e. guard of transition in Cadbiom formalism.

    :param dictReaction: Dictionary of biopax reactions,
        created by the function query.get_biopax_reactions()
    :param dictPhysicalEntity: Dictionary of biopax physicalEntities,
        created by the function query.get_biopax_physicalentities()
    :param dictControl: Dictionary of biopax controls,
        created by the function query.get_biopax_controls()
    :type dictReaction: <dict <str>: <Reaction>>
        keys: uris; values reaction objects
    :type dictPhysicalEntity: <dict <str>: <PhysicalEntity>>
        keys: uris; values entity objects
    :type dictControl: <dict <str>: <Control>>
        keys: uris; values control objects
    """
    # Reverse mapping: get controlled Control objects
    # i.e. controls whose controlled element is not a reaction
    # Key: uri of the controlled element; value: set of Controls on it
    controlled_elements = defaultdict(set)
    for control in dictControl.values():
        if control.controlled not in dictReaction:
            controlled_elements[control.controlled].add(control)

    # Begin event's numeration from 1 for the event naming
    for event_number, reaction in enumerate(dictReaction.values(), 1):
        reaction.event = "_h_" + str(event_number)

        if not reaction.controllers:
            # If no controllers for this reaction we go to the next one
            reaction.cadbiomSympyCond = sympy.true
            continue

        # Get logical condition based on all controllers that regulate
        # the reaction
        condition = get_control_group_condition(
            reaction.controllers, dictPhysicalEntity, controlled_elements
        )
        # None: no supported control => the transition is not guarded
        reaction.cadbiomSympyCond = \
            sympy.true if condition is None else condition
def get_transitions(dictReaction, dictPhysicalEntity):
    """Return transitions with (ori/ext nodes) and their respective events.

    Types considered as reactions:

        - Conversion
        - BiochemicalReaction
        - ComplexAssembly
        - Transport
        - TransportWithBiochemicalReaction

    Types considered as regulators:

        - Catalysis
        - Control
        - TemplateReactionRegulation

    Types not supported:

        - MolecularInteraction
        - Degradation

    .. warning:: dictPhysicalEntity is modified in place.
        We add "virtual nodes" for genes that are not in BioPAX format.

    .. todo:: handle Degradation types and TRASH nodes => will crash cadbiom
        writer because they are not entities...

    :param dictReaction: Dictionary of biopax reactions,
        created by the function query.get_biopax_reactions()
    :param dictPhysicalEntity: Dictionary of biopax physicalEntities,
        created by the function query.get_biopax_physicalentities()
    :type dictReaction: <dict <str>: <Reaction>>
        keys: uris; values reaction objects
    :type dictPhysicalEntity: <dict <str>: <PhysicalEntity>>
        keys: uris; values entity objects
    :return: Dictionary of transitions and their respective set of events.

        Example:

        .. code:: python

            dictTransition[(left_name, right_name)].append({
                'event': reaction.event,
                'reaction': reaction.uri,
                'sympyCond': reaction.cadbiomSympyCond
            })

    :rtype: <dict <tuple <str>, <str>>: <list <dict>>>
    :raises AssertionError: If the type of a reaction is unknown.
    """
    def update_transitions(left_entity, right_entity, reaction):
        """Add the event of the reaction to the transition left => right

        .. todo:: Move this function and reuse it elsewhere.
        """
        dictTransition[(left_entity, right_entity)].append(
            {
                'event': reaction.event,
                'reaction': reaction.uri,
                'sympyCond': reaction.cadbiomSympyCond
            }
        )

    dictTransition = defaultdict(list)

    reaction_types = ("Conversion", "BiochemicalReaction", "ComplexAssembly",
                      "Transport", "TransportWithBiochemicalReaction")
    regulator_types = ("Catalysis", "Control", "TemplateReactionRegulation")
    garbage_types = ("MolecularInteraction", "Degradation")

    # Keep standard reactions to process them at the end of the following loop
    # because build_transitions now takes an iterable of reactions.
    standard_reactions = list()

    # items() works with Python 2 and 3 (iteritems() is Python 2 only)
    for reaction_uri, reaction in dictReaction.items():

        typeName = reaction.interactionType

        if typeName in reaction_types:
            # TODO: what should be done when 'leftComponents' or
            # 'rightComponents' is empty?
            standard_reactions.append(reaction)

        # TODO: "Degradation" (suppression of entities) is not handled;
        # the abandoned approach was: no right component expected, and
        # 1 transition from each left entity to a "#TRASH" node.

        elif typeName == "TemplateReaction":
            # Reaction of transcription
            # In Cadbiom language: Gene => product of gene
            entityR = reaction.productComponent

            # Sometimes, there is no entityR
            # ex: http://pathwaycommons.org/pc2/#TemplateReaction_3903f25156da4c9000a93bbc85b18572).
            # It is a bug in BioPax.
            if entityR is None:
                LOGGER.error(
                    "BioPAX bug; Transcription reaction without product: <%s>",
                    reaction_uri
                )
                continue

            # Update dictPhysicalEntity with entities corresponding to genes
            # PS: These entities are not in BioPAX formalisation
            cadbiomR = dictPhysicalEntity[entityR]
            # The gene is a copy of its product, identified by the uri
            # of the reaction
            cadbiomL = copy.deepcopy(cadbiomR)
            cadbiomL.cadbiom_name += '_gene'
            cadbiomL.uri = reaction_uri
            # /!\ This modifies dictPhysicalEntity in place
            dictPhysicalEntity[reaction_uri] = cadbiomL

            # /!\ This modifies dictTransition in place
            update_transitions(
                cadbiomL.cadbiom_name, cadbiomR.cadbiom_name, reaction
            )

        elif typeName in regulator_types:
            continue

        elif typeName in garbage_types:
            LOGGER.warning("Type of reaction is deliberately unsupported "
                           "because it is too imprecise: %s - %s",
                           reaction_uri, typeName)

        else:
            # Lazy formatting: arguments are given to the logger
            LOGGER.error("UNEXCEPTED REACTION TYPE: %s - %s",
                         reaction_uri, typeName)
            raise AssertionError("UNEXCEPTED REACTIONTYPE: %s - %s" % (reaction_uri, typeName))

    # /!\ This modifies dictTransition in place
    build_transitions(dictPhysicalEntity, dictTransition, standard_reactions)

    return dictTransition
def filter_controls(controls, pathways_names, blacklisted_entities):
    """Remove pathways and cofactors from controls and keep others entities.

    .. note:: Remove also entities that control pathways.

    .. note:: We want ONLY entities and by default, there are pathways
        + entities.

    .. warning:: The attribute `controllers` of retained controls is updated
        in place.

    :param controls: Dict of Controllers.
        keys: URIs; values: <Control>
    :param pathways_names: Dict of pathways URIs and names.
        keys: URIs; values: names (or uri if no name)
    :param blacklisted_entities: set of entity uris blacklisted
    :type controls: <dict>
    :type pathways_names: <dict>
    :type blacklisted_entities: <set>
    :return: Filtered controllers dict.
        keys: uris of controls; values: <Control>
    :rtype: <dict>
    """
    # A real set works with Python 2 and 3 (viewkeys() is Python 2 only)
    blacklisted_controllers = set(pathways_names) | blacklisted_entities

    filtered_controls = dict()
    for control in controls.values():
        # Remove controls for elements that are blacklisted (Ex: Pathways)
        if control.controlled in blacklisted_controllers:
            continue

        # Remove blacklisted controllers
        # Note: Also remove controls with no remaining controllers
        pruned_controllers = control.controllers - blacklisted_controllers
        if pruned_controllers:
            control.controllers = pruned_controllers
            filtered_controls[control.uri] = control

    return filtered_controls
def filter_entities(dictPhysicalEntity, blacklisted_entities):
    """Remove blacklisted entities from BioPAX entities.

    .. note:: Blacklisted entities are removed from dictPhysicalEntity,
        from components and from members.

    .. warning:: `components_uris` and `members` attributes of retained
        entities are updated in place; the given dictionary itself is not
        modified.

    :param dictPhysicalEntity: Dictionary of biopax physicalEntities,
        created by the function query.get_biopax_physicalentities()
    :param blacklisted_entities: set of entity uris blacklisted
    :type dictPhysicalEntity: <dict <str>: <PhysicalEntity>>
        keys: uris; values entity objects
    :type blacklisted_entities: <set>
    :return: Dictionary of biopax physicalEntities without blacklisted
        entities (the given dictionary if there is no blacklisted entity).
    :rtype: <dict <str>: <PhysicalEntity>>
    """
    # No blacklisted entities => nothing to do
    if not blacklisted_entities:
        return dictPhysicalEntity

    # items()/values() work with Python 2 and 3
    # (viewkeys()/itervalues() are Python 2 only)
    dictPhysicalEntityFiltered = {
        entity_uri: entity
        for entity_uri, entity in dictPhysicalEntity.items()
        if entity_uri not in blacklisted_entities
    }

    # Remove blacklisted entities from members and components
    for entity in dictPhysicalEntityFiltered.values():
        entity.components_uris -= blacklisted_entities
        entity.members -= blacklisted_entities

    return dictPhysicalEntityFiltered
def filter_reactions(dictReaction, blacklisted_entities):
    """Remove blacklisted entities from reactions.

    .. note:: Effects (reactions are updated in place):

        - productComponent and participantComponent can be set to None
        - blacklisted entities are removed from leftComponents and
          rightComponents

    :param dictReaction: Dictionary of biopax reactions,
        created by the function query.get_biopax_reactions()
    :param blacklisted_entities: set of entity uris blacklisted
    :type dictReaction: <dict <str>: <Reaction>>
        keys: uris; values reaction objects
    :type blacklisted_entities: <set>
    :return: The given dictionary of reactions.
    :rtype: <dict <str>: <Reaction>>
    """
    # No blacklisted entities => nothing to do
    if not blacklisted_entities:
        return dictReaction

    # values() works with Python 2 and 3 (itervalues() is Python 2 only)
    for reaction in dictReaction.values():
        reaction.leftComponents -= blacklisted_entities
        reaction.rightComponents -= blacklisted_entities

        if reaction.productComponent in blacklisted_entities:
            reaction.productComponent = None

        if reaction.participantComponent in blacklisted_entities:
            reaction.participantComponent = None

    return dictReaction
def load_blacklisted_entities(blacklist_file):
    """Get all URIs of blacklisted elements in the given file.

    .. note:: The csv can be written with the following delimiters: ',;'.
        In the first column we expect the URI,
        In the second column users can put the corresponding cadbiom name
        (currently not used).

    .. note:: Files with only 1 column (no delimiter at all) and empty
        lines are accepted.

    :param blacklist_file: Filename of the blacklist.
    :type blacklist_file: <str>
    :return: Set of uris.
    :rtype: <set>
    """
    with open(blacklist_file, 'r') as fd:
        # Try to detect csv format
        try:
            dialect = csv.Sniffer().sniff(fd.read(1024), delimiters=',;')
        except csv.Error:
            # The sniffer fails when there is no delimiter in the sample
            # (single column of uris, or empty file) => default dialect
            dialect = csv.excel
        fd.seek(0)
        reader = csv.reader(fd, dialect)

        # Take uri in first position only, right now...
        # Empty lines (empty rows) and empty uris are skipped
        return {
            line[0].strip() for line in reader if line and line[0].strip()
        }
def createControlFromEntityOnBothSides(dictReaction, dictControl):
    """Remove entities on both sides of reactions and create a control instead.

    We believe that these entities present in the reagents and products are
    in fact catalysts without which the reaction can not take place.
    We remove this entity from the reaction and add an `ACTIVATION` controller
    to the list of BioPAX Controls.

    .. note:: This function must be called before adding reactions to
        entities.

    .. warning:: dictReaction (left/right components of reactions) and
        dictControl are modified in place.

    :param dictReaction: Dictionary of biopax reactions,
        created by the function query.get_biopax_reactions()
    :param dictControl: Dictionary of biopax controls,
        created by the function query.get_biopax_controls()
    :type dictReaction: <dict <str>: <Reaction>>
        keys: uris; values reaction objects
    :type dictControl: <dict <str>: <Control>>
        keys: uris; values control objects
    """
    # values() works with Python 2 and 3 (itervalues() is Python 2 only)
    for reaction in dictReaction.values():
        # Get common entities: in leftComponents AND in rightComponents
        # PS: The intersection is a new set, so the components can be
        # modified safely during the iteration.
        common_entities = reaction.leftComponents & reaction.rightComponents

        for common_entity in common_entities:
            # Remove from reaction the entity on both sides
            reaction.leftComponents.remove(common_entity)
            reaction.rightComponents.remove(common_entity)

            # Create an entity control of the reaction
            # that entity is not in original BioPAX data
            # /!\ the control uri is composed of reactionUri + entityUri
            control = Control(
                reaction.uri + "+" + common_entity,
                "#entityOnBothSides",  # interactionType
                "ACTIVATION",  # controlType
                reaction.uri,  # reaction_uri
                common_entity  # controller
            )
            dictControl[control.uri] = control
def assign_missing_names(dictPhysicalEntity):
    """Assign an arbitrary name to entities without displayName

    The chosen name is the first among the sorted synonyms.
    Entities without name and without synonyms are left untouched.

    Some files do not declare the displayName attribute of their entities.
    This has important consequences on the merge of similar entities because
    this process uses this attribute to detect them.

    cf. CellDesigner, Curie files.

    :param dictPhysicalEntity: Dictionary of biopax physicalEntities.
        keys: uris; values: entity objects (updated in place)
    :type dictPhysicalEntity: <dict <str>: <PhysicalEntity>>
    """
    nb = 0
    # values() works with Python 2 and 3 (itervalues() is Python 2 only)
    for entity in dictPhysicalEntity.values():
        if not entity.name and entity.synonyms:
            # min() is the first item of the sorted synonyms
            entity.name = min(entity.synonyms)
            nb += 1

    LOGGER.debug("assign_missing_names: to %s entities without displayName", nb)
def main(params):
    """Entry point

    Here we detect the presence of the pickle backup and its settings.
    If there is no backup or if the user doesn't want to use this functionality,
    queries are made against the triplestore.

    Then, we construct a Cadbiom model with all the retrieved data.

    :param params: Settings of the conversion. The following keys are read:
        `triplestore`, `limit`, `provenance_uri`, `graph_uris`, `backup_file`,
        `backup_queries`, `blacklist_file`, `numeric_compartments`,
        `cadbiom_model`, `full_graph`, `no_scc_fix` and, optionally,
        `show_metrics`.
    :type params: <dict>
    """
    # Set triplestore url
    cm.SPARQL_PATH = params['triplestore']
    # Set the query limitation
    cm.SPARQL_LIMIT = params['limit']

    provenance_uri = params['provenance_uri']
    graph_uris = params['graph_uris']
    backup_file_status = os.path.isfile(params['backup_file'])
    show_metrics = params.get('show_metrics')
    # Remove show_metrics from params (avoid useless invalidation of pickle backup)
    params = {k: v for k, v in params.iteritems() if k != "show_metrics"}

    # Pickle and backup file => load queries
    if params['backup_queries'] and backup_file_status:
        LOGGER.debug("Loading variables...")
        # NOTE: like pickle, dill can execute arbitrary code while loading;
        # the backup file must come from a trusted source.
        # The file is closed as soon as it is loaded: it may be removed below.
        with open(params['backup_file'], "rb") as backup_fd:
            (dictPhysicalEntity, dictReaction, dictLocation, dictPathwayName,
             dictModificationFeatures, dictControl, dictEntities_db_refs,
             blacklisted_entities, saved_params) = dill.load(backup_fd)

        # Check if given parameters are equal to those loaded from backup
        if saved_params != params:
            os.remove(params['backup_file'])
            LOGGER.info(
                "The settings are different from those you have previously entered!"
            )
            # Force queries
            backup_file_status = False

    # No pickle or not backup file => do queries
    if not params['backup_queries'] or not backup_file_status:

        # Load entities to be blacklisted from conditions
        blacklisted_entities = set()
        if params['blacklist_file']:
            blacklisted_entities = \
                load_blacklisted_entities(params['blacklist_file'])

        # Query the SPARQL endpoint
        LOGGER.info("Query the SPARQL endpoint...")

        # Remove blacklisted entities from entities
        dictPhysicalEntity = \
            filter_entities(
                query.get_biopax_physicalentities(graph_uris, provenance_uri),
                blacklisted_entities
            )
        LOGGER.info("Collect PhysicalEntities [done]")

        # Remove blacklisted entities from reactions
        dictReaction = \
            filter_reactions(
                query.get_biopax_reactions(graph_uris, provenance_uri),
                blacklisted_entities
            )
        LOGGER.info("Collect Reactions [done]")

        dictLocation = query.get_biopax_locations(graph_uris)
        LOGGER.info("Collect Locations [done]")

        dictPathwayName = query.get_biopax_pathways(graph_uris, provenance_uri)
        LOGGER.info("Collect Pathways [done]")

        dictModificationFeatures = \
            query.get_biopax_modificationfeatures(graph_uris, provenance_uri)
        LOGGER.info("Collect ModificationFeatures [done]")

        dictEntities_db_refs = \
            query.get_biopax_xrefs(graph_uris, provenance_uri)
        LOGGER.info("Collect CrossReferences [done]")

        # Filter cofactors from controls and remove pathways as controllers
        dictControl = \
            filter_controls(
                query.get_biopax_controls(graph_uris, provenance_uri),
                dictPathwayName,
                blacklisted_entities,
            )
        LOGGER.info("Collect Controls [done]")

    # LOGGING
    # NOTE(review): the nesting level of this section was reconstructed from a
    # source whose indentation was lost; it is assumed to be at function level
    # (i.e. also executed when data comes from the backup) - TODO confirm.
    if LOGGER.getEffectiveLevel() == DEBUG:
        from collections import Counter

        # Count modificationFeatures
        nb_modifs = 0
        for terms in dictModificationFeatures.itervalues():
            nb_modifs += sum(terms.itervalues())
        #LOGGER.debug("Modification features: %s", dictModificationFeatures)
        LOGGER.info("Number of modification features: %s", nb_modifs)

        # Count reference ids
        id_refs = list(
            it.chain.from_iterable(
                ids
                for db_refs in dictEntities_db_refs.itervalues()
                for ids in db_refs.itervalues()
            )
        )

        # Export cross references in separate files for each external database
        databases_ids = defaultdict(set)
        for db_refs in dictEntities_db_refs.itervalues():
            for db, ids in db_refs.iteritems():
                databases_ids[db].update(ids)

        for db, ids in databases_ids.iteritems():
            # Filename pattern: ids_<database_name>_<number_of_ids>
            # (written in the current working directory, without extension)
            with open("ids_%s_%s" % (db, len(ids)), "w") as f_d:
                for ref_id in ids:
                    f_d.write(ref_id + "\n")

        #LOGGER.debug("Reference ids: %s", dictEntities_db_refs)
        LOGGER.info("Number of cross references ids: %s; unique: %s",
                    len(id_refs), len(set(id_refs)))

        # Types of entities
        LOGGER.info(
            "\nTypes of entities: %s\n"
            "Types of reactions: %s\n"
            "Types of controls: %s\n",
            Counter(entity.entityType
                    for entity in dictPhysicalEntity.itervalues()),
            Counter(entity.interactionType
                    for entity in dictReaction.itervalues()),
            Counter(entity.interactionType
                    for entity in dictControl.itervalues())
        )

        # Resume of data retrieved from the database
        LOGGER.info(
            "\ndictPhysicalEntity: %s\n"
            "dictReaction: %s\n"
            "dictLocation: %s\n"
            "dictPathwayName: %s\n"
            "dictModificationFeatures: %s\n"
            "dictControl: %s\n"
            "dictEntities_db_refs: %s\n",
            len(dictPhysicalEntity), len(dictReaction), len(dictLocation),
            len(dictPathwayName), len(dictModificationFeatures),
            len(dictControl), len(dictEntities_db_refs)
        )
        # NOTE(review): blocks until the user presses Enter; this is only
        # reached when the logger is at DEBUG level. Kept as is because it
        # may be a deliberate debugging aid - TODO confirm.
        raw_input('PAUSE')

    # Pickle but no backup file => save queries
    if params['backup_queries'] and not backup_file_status:
        LOGGER.debug("Backup variables...")
        # The context manager guarantees that the backup is flushed and closed
        with open(params['backup_file'], "wb") as backup_fd:
            dill.dump(
                [
                    dictPhysicalEntity, dictReaction, dictLocation,
                    dictPathwayName, dictModificationFeatures, dictControl,
                    dictEntities_db_refs, blacklisted_entities, params
                ],
                backup_fd
            )

    ## Do the magic...
    ## Purge & format data
    # Assign names to entities without displayName attribute
    assign_missing_names(dictPhysicalEntity)

    # Remove entities on both sides of reactions and create a control instead
    createControlFromEntityOnBothSides(dictReaction, dictControl)

    # Before using locations in cadbiom names we have to process locations names
    compute_locations_names(dictLocation, params['numeric_compartments'])

    ## Enrichment
    # Enrichment of reactions and entities
    add_reactions_and_controllers_to_entities(dictReaction, dictControl,
                                              dictPhysicalEntity)
    add_controllers_to_reactions(dictReaction, dictControl)

    # Enrichment of entities with locations (aim: build unique names)
    add_locations_to_entities(dictPhysicalEntity, dictLocation)

    # Enrichment of entities with features (aim: build unique names)
    add_modifications_features_to_entities(
        dictPhysicalEntity, dictModificationFeatures
    )

    # Enrichment of entities with xrefs
    add_xrefs_to_entities(dictPhysicalEntity, dictEntities_db_refs)

    # Transfer class attributes on child entities
    nb_physical_entities = len(dictPhysicalEntity)
    transfer_class_attributes_on_child_entities(dictPhysicalEntity,
                                                dictPhysicalEntity)
    LOGGER.info("PhysicalEntities number, before the transfer "
                "of class attributes: %s; after: %s",
                nb_physical_entities, len(dictPhysicalEntity))

    # Merge duplicated entities
    merged_entities_mapping = merge_duplicated_entities(
        dictPhysicalEntity, params['cadbiom_model']
    )

    ## Build the model
    # Handle generic entities (fill membersUsed attribute of entities)
    detect_members_used(dictPhysicalEntity, params['full_graph'])

    if show_metrics:
        get_metrics_from_data(dictControl, dictReaction,
                              dictPhysicalEntity, dictPathwayName)

    # Develop complexes and classes (fill flat_components attribute of entities)
    develop_complexes(dictPhysicalEntity)

    # Compute unique names
    add_unique_cadbiom_name_to_entities(dictPhysicalEntity)
    add_cadbiom_names_to_entities(dictPhysicalEntity)

    # Forge condition for each event attached to a reaction
    add_conditions_to_reactions(dictReaction, dictPhysicalEntity, dictControl)

    # Duplicate physical entities that are complexes: 1 new complex per flat component
    duplicate_complexes(dictPhysicalEntity)
    LOGGER.info("Duplication of complexes [Done]")

    ## If you want to debug reactions, its here.
    ## See the docstring of the module reactions.py
    # Duplicate reactions by removing genericity of entities
    LOGGER.info("Building reactions...")
    new_dictReaction = dict()
    nb_reactions = len(dictReaction)
    # A progress message is emitted every `quarter` reactions
    # (never if there are less than 4 reactions)
    quarter = nb_reactions // 4
    for index, reaction in enumerate(dictReaction.itervalues(), 1):
        # Get list of reactions for the given reaction
        d = update_reactions(reaction, dictPhysicalEntity,
                             merged_entities_mapping)
        # We must not have an intersection between new reactions and reactions
        # already processed.
        assert not d.viewkeys() & new_dictReaction.viewkeys()

        if quarter and index % quarter == 0:
            # Real percentage of processed reactions: it can not exceed 100%,
            # even if the number of reactions is not a multiple of 4.
            LOGGER.info("Building reactions [%d%%]",
                        100 * index // nb_reactions)

        new_dictReaction.update(d)
    LOGGER.info("Building reactions [Done]")

    # Safely replace dictReaction
    dictReaction = new_dictReaction

    # Compute final transitions
    dictTransition = get_transitions(dictReaction, dictPhysicalEntity)
    LOGGER.info("Building transitions [Done]")

    ## Export
    # Make the Cadbiom model
    create_cadbiom_model(
        dictTransition,
        dictPhysicalEntity,
        dictReaction,
        "_".join(graph_uris),  # model name
        params['cadbiom_model'],  # model path
    )

    # Trigger the built of a model without SCC.
    if not params['no_scc_fix']:
        remove_scc_from_model(params['cadbiom_model'])