"""Core functionality."""
import csv
from io import TextIOWrapper
from tempfile import _TemporaryFileWrapper
from loguru import logger
from smiter.params.default_params import default_mzml_params, default_peak_properties
# Mass added once per unit of charge by calc_mz when computing m/z.
# NOTE(review): presumably the proton mass in unified atomic mass units (Da);
# confirm the unit and the source of the value.
PROTON = 1.00727646677
def calc_mz(mass: float, charge: int):
    """Calculate m/z for a mass carrying ``charge`` charges.

    Both arguments are coerced before use (``float(mass)``, ``int(charge)``),
    so numeric strings are accepted as well.

    Args:
        mass (float): Mass to which the charge carriers are added.
        charge (int): Number of charges; ``PROTON`` is added once per charge.

    Returns:
        float: ``(mass + charge * PROTON) / charge``.

    Raises:
        ZeroDivisionError: If ``charge`` is 0.
    """
    # Coerce mass first, then charge, so conversion errors surface in the
    # same order as the arguments are given.
    numeric_mass = float(mass)
    n_charges = int(charge)
    charged_mass = numeric_mass + (n_charges * PROTON)
    return charged_mass / n_charges
def check_mzml_params(mzml_params: dict) -> dict:
    """Validate mzML parameters and fill in defaults.

    Every key of ``default_mzml_params`` is checked. A parameter counts as
    unset when it is absent from ``mzml_params`` or its value is ``None``.
    Unset parameters receive their default value; a default of ``None`` marks
    the parameter as required.

    Args:
        mzml_params (dict): User supplied parameters. Updated in place.

    Returns:
        dict: The same ``mzml_params`` object with defaults applied.

    Raises:
        ValueError: If a required parameter (default ``None``) is unset.
            ``ValueError`` derives from ``Exception``, so handlers written
            for the previously raised bare ``Exception`` still match.
    """
    logger.info("Checking mzML params")
    for param_name, default_value in default_mzml_params.items():
        if mzml_params.get(param_name) is not None:
            # Explicitly set by the caller; leave untouched.
            continue
        if default_value is None:
            # No fallback available: the caller must supply this parameter.
            raise ValueError(
                f"mzml parameter {param_name} is required but not set!"
            )
        mzml_params[param_name] = default_value
    return mzml_params
def check_peak_properties(peak_properties: dict) -> dict:
    """Validate per-molecule peak properties and fill in defaults.

    For every entry of ``peak_properties`` each key of
    ``default_peak_properties`` is checked. A property counts as unset when
    it is absent from the entry or its value is ``None``. Unset properties
    receive their default value; a default of ``None`` marks the property as
    required.

    Args:
        peak_properties (dict): Mapping of molecule name to its property
            dict. The inner dicts are updated in place.

    Returns:
        dict: The same ``peak_properties`` object with defaults applied.

    Raises:
        ValueError: If a required property (default ``None``) is unset for
            any molecule. ``ValueError`` derives from ``Exception``, so
            handlers written for the previously raised bare ``Exception``
            still match.
    """
    logger.info("Checking peak properties")
    for mol, properties in peak_properties.items():
        for prop_name, default_value in default_peak_properties.items():
            if properties.get(prop_name) is not None:
                # Explicitly set for this molecule; leave untouched.
                continue
            if default_value is None:
                # Name the molecule so the offending entry can be located.
                raise ValueError(
                    f"peak property {prop_name} is required but not set "
                    f"for {mol}!"
                )
            properties[prop_name] = default_value
    return peak_properties
def csv_to_peak_properties(csv_file):
    """Read peak properties from a CSV file with a header row.

    Required columns are ``chemical_formula``, ``trivial_name``,
    ``scan_start_time`` and ``peak_scaling_factor``. The optional columns
    ``charge``, ``sigma`` and ``peak_width`` default to 2, 2 and 30 when the
    column is missing or the cell is blank.

    Every entry gets ``peak_function`` set to ``"gauss_tail"``; only gaussian
    peaks are currently supported when reading from CSV.

    Args:
        csv_file: Path of the CSV file to read.

    Returns:
        dict: Mapping of trivial name to its property dict. Rows that repeat
        a trivial name overwrite the earlier row.

    Raises:
        KeyError: If a required column is missing.
        ValueError: If a numeric cell cannot be converted.
    """
    logger.info(f"Read peak properties from {csv_file}")

    def optional_cell(row, column, default):
        # DictReader yields "" for a blank cell and None for a cell missing
        # from a short row; neither can be converted to a number, so both
        # fall back to the default just like an absent column does.
        value = row.get(column)
        if value is None or not value.strip():
            return default
        return value

    peak_properties = {}
    # newline="" lets the csv module handle line endings itself.
    with open(csv_file, newline="") as fin:
        for row in csv.DictReader(fin):
            trivial_name = row["trivial_name"]
            peak_properties[trivial_name] = {
                "trivial_name": trivial_name,
                "chemical_formula": row["chemical_formula"],
                "charge": int(optional_cell(row, "charge", 2)),
                "scan_start_time": float(row["scan_start_time"]),
                # currently only gaussian peaks from csv
                "peak_function": "gauss_tail",
                "peak_params": {"sigma": float(optional_cell(row, "sigma", 2))},
                "peak_scaling_factor": float(row["peak_scaling_factor"]),
                "peak_width": float(optional_cell(row, "peak_width", 30)),
            }
    return peak_properties
def peak_properties_to_csv(peak_properties, csv_file):
    """Write peak properties to a CSV file.

    One row is written per entry of ``peak_properties``. ``peak_params`` is
    serialised as comma-joined ``key=value`` pairs. Missing
    ``chemical_formula``, ``charge`` and ``peak_scaling_factor`` default to
    ``""``, 2 and 1e3.

    Args:
        peak_properties (dict): Mapping of trivial name to a property dict
            containing at least ``scan_start_time``, ``peak_function``,
            ``peak_params`` and ``peak_width``.
        csv_file: Path to write to, or an already open text-mode file object
            (anything with a ``write`` method, e.g. the object returned by
            ``tempfile.NamedTemporaryFile(mode="w")``).

    Returns:
        The ``name`` attribute of the file that was written, or ``None`` if
        the file object has no name.

    Raises:
        KeyError: If an entry lacks one of the required properties.

    Note:
        The file is always closed before returning, including a file object
        passed in by the caller and including the error case.
    """
    logger.info(f"Write peak properties to {csv_file}")
    # Duck-type instead of checking for TextIOWrapper so that temporary-file
    # wrappers and other file-like objects are not mistaken for paths.
    if not hasattr(csv_file, "write"):
        # newline="" lets the csv module control line endings itself.
        csv_file = open(csv_file, "w", newline="")
    csv_filename = getattr(csv_file, "name", None)
    fieldnames = [
        "chemical_formula",
        "trivial_name",
        "charge",
        "scan_start_time",
        "peak_width",
        "peak_scaling_factor",
        "peak_function",
        "peak_params",
    ]
    try:
        writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
        writer.writeheader()
        for trivial_name, attribs in peak_properties.items():
            writer.writerow(
                {
                    "chemical_formula": attribs.get("chemical_formula", ""),
                    # The mapping key is authoritative for the name column.
                    "trivial_name": trivial_name,
                    "charge": attribs.get("charge", 2),
                    "scan_start_time": attribs["scan_start_time"],
                    "peak_function": attribs["peak_function"],
                    "peak_params": ",".join(
                        f"{key}={val}"
                        for key, val in attribs["peak_params"].items()
                    ),
                    "peak_scaling_factor": attribs.get(
                        "peak_scaling_factor", 1e3
                    ),
                    "peak_width": attribs["peak_width"],
                }
            )
    finally:
        # Release the handle even if a row is malformed.
        csv_file.close()
    return csv_filename