# -*- coding: utf-8 -*-
# Copyright (C) 2017-2020 IRISA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# The original code contained here was initially developed by:
#
# Pierre Vignet.
# IRISA
# Dyliss team
# IRISA Campus de Beaulieu
# 35042 RENNES Cedex, FRANCE
"""
Module used to create a hierarchically-clustered heatmap of boundaries.
"""
from __future__ import unicode_literals
from __future__ import print_function
# Standard imports
import itertools as it
import os
import glob
import csv
from concurrent.futures import ProcessPoolExecutor, as_completed
import multiprocessing as mp
try:
import pandas as pd
import seaborn as sns
except ImportError as e:
print(
"ImportError:", e,
". 'seaborn' and 'pandas' packages are required for this task;\n"
"You might want to install 'cadbiom_cmd' with the following command:\n"
"'pip install cadbiom_cmd[heatmaps]'",
)
exit()
# Library imports
from tools.solutions import get_all_macs
import cadbiom.commons as cm
LOGGER = cm.logger()
def queries_2_clustermap(output_dir, path, *args, **kwargs):
    """Entry point for queries_2_clustermap

    Create a hierarchically-clustered heatmap of boundaries in mac files.

    If ``path`` is a directory, every ``*mac.txt`` file in it is processed in
    parallel (one job per file); otherwise the single file is processed
    synchronously.

    :param output_dir: Output path.
    :param path: Filepath/directory of a/many complete solutions files.
    :type output_dir: <str>
    :type path: <str>
    """
    # Check valid input file/directory
    assert os.path.isfile(path) or os.path.isdir(path)

    if os.path.isdir(path):
        # Recursive search of *mac* files
        # (mac.txt, mac_complete.txt, mac_step.txt)
        # os.path.join copes with a missing trailing separator in `path`
        pattern = os.path.join(path, "*mac.txt")

        # Multiprocessing: one worker per CPU core
        with ProcessPoolExecutor(max_workers=mp.cpu_count()) as executor:
            futures_and_output = {
                executor.submit(payload, output_dir, filepath): filepath  # Job name
                for filepath in glob.glob(pattern)
            }

            nb_errors = 0
            nb_done = 0
            for future in as_completed(futures_and_output):
                job_name = futures_and_output[future]

                # Display results or exceptions if any
                if future.exception() is not None:
                    LOGGER.error(
                        "%s generated an exception: \n%s", job_name, future.exception()
                    )
                    nb_errors += 1
                else:
                    # The end
                    LOGGER.info("%s... \t\t[Done]", job_name)
                    nb_done += 1

            LOGGER.info("Files processed: %s", nb_errors + nb_done)
            assert nb_errors + nb_done != 0, "No *mac.txt files found!"
            LOGGER.info("Ending: %s errors, %s done\nbye.", nb_errors, nb_done)
    else:
        payload(output_dir, path)
def payload(output_dir, filepath):
    """Make a clustermap based on an occurrence matrix for the given solution file

    :param output_dir: Output path.
    :param filepath: Solution filepath.
    :type output_dir: <str>
    :type filepath: <str>
    """
    try:
        matrix_file_path = write_matrix(filepath, output_dir)
        draw_matrix_heatmap(open_dataframe(matrix_file_path), matrix_file_path)
    except (AssertionError, ValueError):
        # Both handlers had identical bodies; merged into one.
        # Print the full traceback before re-raising: when this runs in a
        # worker process, the parent only sees a summarized exception.
        import traceback

        print(traceback.format_exc())
        raise
def write_matrix(filepath, output_dir):
    """Make an occurrence matrix of boundaries found in the given solution file

    Example of CSV produced:

    - Columns: Frontier places
    - Lines: Solution with a '1' in columns corresponding to an occurrence
      of the frontier place.

    .. code-block:: text

        solution_number;boundary_1;boundary_2;...
        1;0;1;...
        2;1;0;...

    :param filepath: Solution filepath.
    :param output_dir: Output path.
    :type filepath: <str>
    :type output_dir: <str>
    :return: Filepath of the CSV file produced. Filename is of the form
        `<solution_file>_sol_matrix.csv`
    :rtype: <str>
    """
    # Return a set of all MAC LINES from a directory or from a file
    mac_places = tuple(frozenset(mac.split()) for mac in get_all_macs(filepath))
    # Get all frontier places from the dataset
    frontier_places = set(it.chain(*mac_places))

    # Build "<solution_file>_sol_matrix.csv" from the solution filename
    # (the "_mac" suffix is stripped first)
    filename = os.path.basename(os.path.splitext(filepath)[0])
    filename = filename.replace("_mac", "")
    # os.path.join: the original concatenation produced a broken path when
    # output_dir had no trailing separator
    matrix_file_path = os.path.join(output_dir, filename + "_sol_matrix.csv")

    with open(matrix_file_path, "w") as f_d_sols:
        # Write headers (boundaries); columns are sorted for reproducibility
        writer_sols = csv.DictWriter(
            f_d_sols,
            delimiter=str(";"),
            restval=0,  # default value for frequency
            fieldnames=["solution_number"] + sorted(frontier_places),
        )
        writer_sols.writeheader()

        # One row per solution: 1 in every column whose place occurs in it
        for i, mac in enumerate(mac_places, 1):
            row_places = {
                common_frontier_place: 1
                for common_frontier_place in mac & frontier_places
            }
            row_places["solution_number"] = i
            writer_sols.writerow(row_places)

    return matrix_file_path
def open_dataframe(filepath):
    """Load an occurrence-matrix CSV file into a Pandas dataframe

    The file is read as ";"-separated, UTF-8 encoded text; its first column
    (the solution number) becomes the row index.

    https://pandas.pydata.org/pandas-docs/stable/generated/pandas.read_csv.html

    :param filepath: Path of the CSV file to load.
    :type filepath: <str>
    :return: Pandas dataframe
    :rtype: <pandas.core.frame.DataFrame>
    """
    dataframe = pd.read_csv(
        filepath,
        sep=str(";"),
        encoding="utf-8",
        index_col=0,  # First column is used as the row labels
    )
    return dataframe
def draw_matrix_heatmap(df, filepath):
    """Draw and save clustermap from the given dataframe

    The SVG file is written next to the matrix file, named
    `<matrix_file>_clustermap.svg`.

    :param df: Pandas dataframe
    :param filepath: Filepath of the matrix. Used to build the SVG file.
    :type df: <pandas.core.frame.DataFrame>
    :type filepath: <str>
    """
    # Center (don't shrink legends) the figure
    # Scale all fonts in your legend and on the axes.
    # => Without this, one label on two is printed due to too big fontsize...
    ## TODO: Find a way to dynamically set the fontsize of xticks
    sns.set(font_scale=0.6, rc={"xtick.labelsize": 8 if df.shape[1] < 90 else 5})
    sns.set_style("ticks")

    # To color groups of problems (in order to check that they are properly
    # grouped during the clustering step), see the {row,col}_colors argument:
    # http://seaborn.pydata.org/examples/structured_heatmap.html
    # Clustering default config:
    # method='single' (Nearest Point Algorithm), metric='euclidean'
    clustergrid = sns.clustermap(
        # Solutions are compared with each other (not places vs solutions,
        # for which df.corr() should be used)
        # => use the dataframe directly
        df,
        # The value at which to center the colormap when plotting divergent data
        # center=0,
        # The mapping from data values to color space. If not provided,
        # the default will depend on whether center is set. vlag, Blues, coolwarm
        # sns.color_palette("PuBu", 10)
        cmap="Blues",
        xticklabels=True,
        yticklabels=False,
        # standard_scale=1,
        # metric="correlation",
        # method='average',
        figsize=(15, 15),  # x, y
    )

    # Customization
    # Top figure title
    clustergrid.fig.suptitle("ClusterMap of <" + filepath + ">", fontsize=15)
    # Mask row dendrogram
    clustergrid.ax_row_dendrogram.set_visible(False)
    # Mask the colorbar (was done twice in the original): occurrences are
    # only 0 or 1, this is not a correlation matrix, so a color scale is useless
    clustergrid.cax.set_visible(False)
    # Axes
    clustergrid.ax_heatmap.set_ylabel("Solutions", fontsize=12)
    clustergrid.ax_heatmap.set_xlabel("Boundaries", fontsize=12)
    clustergrid.ax_heatmap.set_xticklabels(
        clustergrid.ax_heatmap.get_xticklabels(),
        rotation=45,
        horizontalalignment="right",
        fontweight="light",
    )

    # Export
    filename = os.path.basename(os.path.splitext(filepath)[0])
    # bug: don't print the grid in png... :(
    # clustergrid.savefig(
    #     os.path.join(os.path.dirname(filepath), filename + "_clustermap.png"),
    #     dpi=1200
    # )
    # os.path.join: the original 'dirname + "/" + name' produced an absolute
    # path ("/<name>") when filepath had no directory component
    clustergrid.savefig(
        os.path.join(os.path.dirname(filepath), filename + "_clustermap.svg"),
        dpi=1200,
    )