Source code for smiter.fragmentation_functions

"""Callables for fragmenting molecules.

Upon calling the callable, a list/np.array of mz and intensities should be returned.
Arguments should be passed via *args and **kwargs.
"""
from abc import ABC, abstractmethod
from typing import Dict, List, Tuple, Union
import sys
import shutil
import subprocess
import os
import csv

import numpy as np
import pandas as pd
import pyqms
from loguru import logger
from pyteomics import mass

import smiter
from peptide_fragmentor import PeptideFragment0r
from smiter.ext.nucleoside_fragment_kb import (
    KB_FRAGMENTATION_INFO as pyrnams_nucleoside_fragment_kb,
)
from smiter.lib import calc_mz

try:
    from smiter.ext.nucleoside_fragment_kb import KB_FRAGMENTATION_INFO
except ImportError:  # pragma: no cover
    print("Nucleoside fragmentation KB not available")  # pragma: no cover


class AbstractFragmentor(ABC):
    """Interface that every fragmentor class in this module implements.

    Both ``__init__`` and ``fragment`` are abstract, so the class cannot be
    instantiated directly and a subclass has to override both of them.
    """

    @abstractmethod
    def __init__(self):  # pragma: no cover
        """Set up the fragmentor; concrete subclasses must override this."""

    @abstractmethod
    def fragment(self, entity):  # pragma: no cover
        """Fragment ``entity``; concrete subclasses must override this.

        Args:
            entity: The molecule (or molecules) to fragment.
        """
class PeptideFragmentor(AbstractFragmentor):
    """Fragment peptides with ``peptide_fragmentor.PeptideFragment0r``."""

    def __init__(self, *args, **kwargs):
        """Create the underlying fragmentor and remember the call options.

        Args:
            *args: Stored as ``self.args``; not used elsewhere in this class.
            **kwargs: Stored as ``self.kwargs`` and forwarded to every
                ``PeptideFragment0r.fragment`` call made by ``fragment``.
        """
        logger.info("Initialize PeptideFragmentor")
        self.args = args
        self.kwargs = kwargs
        self.fragger = PeptideFragment0r()

    def fragment(self, entities):
        """Fragment one or several peptides into a combined peak list.

        Args:
            entities: A single peptide string or an iterable of peptide
                strings. A bare string is treated as one peptide.

        Returns:
            np.ndarray: Array with one row per fragment. Column 0 holds the
            ``"mz"`` column of the concatenated result tables, column 1 a
            constant intensity of 100. When ``entities`` is empty an array
            of shape ``(0, 2)`` is returned.
        """
        if isinstance(entities, str):
            entities = [entities]
        frames = [
            self.fragger.fragment(entity, **self.kwargs) for entity in entities
        ]
        if not frames:
            # pd.concat raises ValueError when given nothing to concatenate.
            return np.empty((0, 2))
        final_table = pd.concat(frames)
        # Every fragment gets the same placeholder intensity.
        intensities = np.full(len(final_table), 100)
        return np.stack((final_table["mz"], intensities), axis=1)
class PeptideFragmentorPyteomics(AbstractFragmentor):
    """Fragment peptides using ``pyteomics.mass.fast_mass``."""

    def __init__(self, *args, **kwargs):
        """Accept and ignore any arguments; no state is stored."""
        pass

    def _fragments(self, peptide, types=("b", "y"), maxcharge=1):
        """Yield fragment m/z values for a single peptide.

        Args:
            peptide: Peptide sequence; it is sliced into prefixes and suffixes.
            types: Ion types to generate. Types whose first letter is one of
                ``a``, ``b`` or ``c`` are computed from the prefix
                ``peptide[:i]``, all others from the suffix ``peptide[i:]``.
            maxcharge: Highest charge state; charges ``1..maxcharge`` are used.

        Yields:
            The value returned by ``mass.fast_mass`` for each combination of
            cleavage position, ion type and charge.
        """
        # NOTE(review): the range ends at len(peptide) - 2, so the longest
        # prefix and the single-residue suffix are never produced - confirm
        # that this is intended.
        for i in range(1, len(peptide) - 1):
            for ion_type in types:
                for charge in range(1, maxcharge + 1):
                    if ion_type[0] in "abc":
                        yield mass.fast_mass(
                            peptide[:i], ion_type=ion_type, charge=charge
                        )
                    else:
                        yield mass.fast_mass(
                            peptide[i:], ion_type=ion_type, charge=charge
                        )

    def fragment(self, entities):
        """Fragment one or several peptides into a combined peak list.

        Args:
            entities: A single peptide string or an iterable of peptide
                strings. A bare string is treated as one peptide, matching
                the other fragmentors in this module, instead of being
                iterated character by character.

        Returns:
            np.ndarray: Array with one row per fragment. Column 0 holds the
            m/z values, column 1 a constant intensity of 100.
        """
        if isinstance(entities, str):
            entities = [entities]
        mz = []
        for entity in entities:
            mz.extend(self._fragments(entity))
        # Every fragment gets the same placeholder intensity.
        intensities = np.full(len(mz), 100)
        return np.stack((np.array(mz), intensities), axis=1)
class NucleosideFragmentor(AbstractFragmentor):
    """Look up nucleoside fragment m/z values from a fragmentation knowledge base."""

    def __init__(
        self,
        nucleotide_fragment_kb: Union[Dict[str, dict], None] = None,
        raise_error_for_non_existing_fragments=True,
    ):
        """Precompute the fragment m/z values of every nucleoside in the KB.

        Args:
            nucleotide_fragment_kb: Mapping of nucleoside name to a dict that
                has a ``"fragments"`` entry; each fragment in it must provide
                a ``"formula"``. When ``None``, the knowledge base imported
                from ``smiter.ext.nucleoside_fragment_kb`` is used.
            raise_error_for_non_existing_fragments: Stored on the instance
                under the same name. ``fragment`` does not read it; it uses
                its own argument of that name.
        """
        logger.info("Initialize NucleosideFragmentor")
        if nucleotide_fragment_kb is None:
            nucleotide_fragment_kb = pyrnams_nucleoside_fragment_kb
        self.raise_error_for_non_existing_fragments = (
            raise_error_for_non_existing_fragments
        )
        nuc_to_fragments: Dict[str, List[float]] = {}
        # One composition object is reused for every formula.
        cc = pyqms.chemical_composition.ChemicalComposition()
        for nuc_name, nuc_dict in nucleotide_fragment_kb.items():
            fragment_mzs = []
            for frag_cc_dict in nuc_dict["fragments"].values():
                cc.use(f"+{frag_cc_dict['formula']}")
                fragment_mzs.append(calc_mz(cc._mass(), 1))
            nuc_to_fragments[nuc_name] = fragment_mzs
        self.nuc_to_fragments = nuc_to_fragments

    def fragment(
        self, entities: Union[list, str], raise_error_for_non_existing_fragments=False
    ):
        """Return the merged fragment peaks of the given nucleosides.

        Args:
            entities: A nucleoside name or a list of names. A bare string is
                treated as a single name.
            raise_error_for_non_existing_fragments: When exactly ``True``, a
                name missing from the precomputed table raises ``KeyError``;
                otherwise unknown names contribute no peaks.

        Returns:
            np.ndarray: Array of shape ``(n, 2)`` with the sorted, de-duplicated
            m/z values in column 0 and an intensity of 1 in column 1. The
            shape is ``(0, 2)`` when no peaks are found.

        Raises:
            KeyError: For an unknown name when
                ``raise_error_for_non_existing_fragments`` is ``True``.
        """
        if isinstance(entities, str):
            entities = [entities]
        # should overlapping peaks be divided into two very similar ones?
        unique_mz = set()
        for entity in entities:
            if raise_error_for_non_existing_fragments is True:
                unique_mz.update(self.nuc_to_fragments[entity])
            else:
                unique_mz.update(self.nuc_to_fragments.get(entity, []))
        peaks = np.array([(mz, 1) for mz in sorted(unique_mz)])
        # Keep two columns even when there are no peaks at all.
        return peaks.reshape(-1, 2)
class LipidFragmentor(AbstractFragmentor):
    """Look up lipid fragment m/z values from a LipidCreator transition list."""

    def __init__(
        self,
        lipid_input_csv: str = None,
        raise_error_for_non_existing_fragments=True,
    ):
        """Use LipidCreator to calculate precursor transitions of lipids.

        LipidCreator is run as a subprocess in ``transitionlist`` mode. It is
        asked to write ``lipid_output.csv`` into the current working directory.
        That file is read into ``self.lip_to_fragments`` and then removed.

        Args:
            lipid_input_csv: Path of the input file handed to LipidCreator.
            raise_error_for_non_existing_fragments: Stored on the instance
                under the same name. ``fragment`` does not read it; it uses
                its own argument of that name.

        Raises:
            TypeError: If ``lipid_input_csv`` is ``None``.
            FileNotFoundError: If ``LipidCreator.exe`` is not on ``PATH``
                (linux/darwin) or the output file was not written.
        """
        if lipid_input_csv is None:
            raise TypeError("lipid_input_csv must be a file path, not None")
        self.raise_error_for_non_existing_fragments = (
            raise_error_for_non_existing_fragments
        )
        self.lip_to_fragments: Dict[str, List[float]] = {}

        commands: List[str] = []
        if sys.platform in ("linux", "darwin"):
            # The .exe is started through mono on these platforms.
            commands.append("mono")
            lipid_creator_path = shutil.which("LipidCreator.exe")
            if lipid_creator_path is None:
                raise FileNotFoundError("LipidCreator.exe was not found on PATH")
        else:
            # will this work under windows?
            lipid_creator_path = "LipidCreator"
        output_csv = "lipid_output.csv"
        commands.extend(
            [lipid_creator_path, "transitionlist", lipid_input_csv, output_csv]
        )
        try:
            proc = subprocess.run(commands)
            if proc.returncode != 0:
                # Only warn: the exit-code convention of LipidCreator is not
                # known here, and a missing output file fails on open() below.
                logger.warning(
                    f"LipidCreator exited with return code {proc.returncode}"
                )
            with open(output_csv, newline="") as fin:
                for row in csv.DictReader(fin):
                    self.lip_to_fragments.setdefault(
                        row["PrecursorName"], []
                    ).append(float(row["ProductMz"]))
        finally:
            # Do not leave the intermediate file behind, even on failure.
            if os.path.exists(output_csv):
                os.remove(output_csv)

    def fragment(
        self, entities: Union[list, str], raise_error_for_non_existing_fragments=False
    ):
        """Return the merged fragment peaks of the given lipids.

        Args:
            entities: A lipid precursor name or a list of names. A bare string
                is treated as a single name.
            raise_error_for_non_existing_fragments: When exactly ``True``, a
                name missing from the transition table raises ``KeyError``;
                otherwise unknown names contribute no peaks.

        Returns:
            np.ndarray: Array of shape ``(n, 2)`` with the sorted, de-duplicated
            m/z values in column 0 and an intensity of 1 in column 1. The
            shape is ``(0, 2)`` when no peaks are found.

        Raises:
            KeyError: For an unknown name when
                ``raise_error_for_non_existing_fragments`` is ``True``.
        """
        if isinstance(entities, str):
            entities = [entities]
        # should overlapping peaks be divided into two very similar ones?
        unique_mz = set()
        for entity in entities:
            if raise_error_for_non_existing_fragments is True:
                unique_mz.update(self.lip_to_fragments[entity])
            else:
                unique_mz.update(self.lip_to_fragments.get(entity, []))
        peaks = np.array([(mz, 1) for mz in sorted(unique_mz)])
        # Keep two columns even when there are no peaks at all.
        return peaks.reshape(-1, 2)