'''
Functions related to chromatograms and mass tracks.
Use integers for RT scan numbers and intensities.
Flexible binning based on ppm accuracy.
Use mz tol (default 5 pmm) in XIC construction.
XICs without neighbors within x ppm are considered specific (i.e. high selectivity).
Low selectivity regions will be still inspected to determine the true number of XICs.
'''
from operator import itemgetter
import numpy as np
from scipy import interpolate
from scipy.ndimage import uniform_filter1d
from statsmodels.nonparametric.smoothers_lowess import lowess
from .mass_functions import (check_close_mzs,
nn_cluster_by_mz_seeds)
INTENSITY_DATA_TYPE = np.int64
# int32 uses less memory - for large data one can check if int32 is safe,
# i.e. under 2E9, or transform data
# -----------------------------------------------------------------------------
# mass Tracks
# -----------------------------------------------------------------------------
[docs]
def bin_to_mass_tracks(bin_data_tuples, rt_length, mz_tolerance_ppm=5):
'''
Construct mass tracks from data that may exceed proper m/z range (i.e. over mz_tolerance_ppm).
A mass track is an EIC for full RT range, without separating the mass traces.
Imperfect ROIs will be examined in extract_massTracks_ and merged if needed.
Parameters
----------
bin_data_tuples : list[tuple]
a flexible bin by units of 0.001 amu, in format of
[(mz, scan_num, intensity_int), ...]. This may or may not be within mz_tolerance_ppm.
rt_length : int
full number of scans.
mz_tolerance_ppm: float, optional, default: 5
Returns
-------
a list of massTracks as [(mz, np.array(intensities at full rt range)), ...].
See also
--------
extract_single_track_fullrt_length
'''
bin_data_tuples.sort() # by m/z, ascending
mz_range = bin_data_tuples[-1][0] - bin_data_tuples[0][0]
mz_tolerance = bin_data_tuples[0][0] * 0.000001 * mz_tolerance_ppm
# important: double tol_ range here as mean_mz falls in single tol_
if mz_range < mz_tolerance * 2:
return [extract_single_track_fullrt_length(bin_data_tuples, rt_length)]
else:
ROIs = build_chromatogram_by_mz_clustering(bin_data_tuples, rt_length, mz_tolerance)
return [extract_single_track_fullrt_length(R, rt_length) for R in ROIs]
[docs]
def build_chromatogram_intensity_aware(bin_data_tuples, rt_length, mz_tolerance):
'''
Chromatogram builder as in ADAP for testing.
Start with highest intensity, going down by intensity and include data points within mz_tolerance.
Repeat until no track is detected. Without requiring continuous RT,
which is handled in extract_single_track_fullrt_length.
Default in asari is build_chromatogram_by_mz_clustering.
Parameters
----------
bin_data_tuples: list[tuple]
a flexible bin in format of [(mz, scan_num, intensity_int), ...].
rt_length: int
number of scans (NOT USED, remove from call signature in future)
mz_tolerance_ppm: float
m/z tolerance in part-per-million. Used to seggregate m/z regions here.
Returns
-------
assigned:
separated bins of [(mz, scan_num, intensity_int), ...],
prototype of extracted ion chromatograms to be used by extract_single_track_fullrt_length.
'''
bin_data_tuples.sort(key=itemgetter(2), reverse=True)
assigned, remaining = [], bin_data_tuples
while remaining:
seed, tmp_remaining = [remaining[0]], []
for T in remaining[1:]:
if abs(T[0] - seed[0][0]) < mz_tolerance:
seed.append(T)
else:
tmp_remaining.append(T)
assigned.append( seed )
remaining = tmp_remaining
return assigned
[docs]
def build_chromatogram_by_mz_clustering(bin_data_tuples, rt_length, mz_tolerance):
'''
Generates a list of prototype extracted ion chromatograms to be used
by extract_single_track_fullrt_length.
Parameters
----------
bin_data_tuples : list[tuple]
a flexible bin in format of [(mz, scan_num, intensity_int), ...].
rt_length: int
number of scans (NOT USED, remove from call signature in future)
mz_tolerance : float
precomputed based on m/z and ppm, e.g. 5 ppm of 80 = 0.0004; 5 ppm of 800 = 0.0040.
Returns
-------
A list of clusters as separated bins, each bin as [(mz, scan_num, intensity_int), ...]
'''
return nn_cluster_by_mz_seeds(bin_data_tuples, mz_tolerance, presorted=False)
[docs]
def merge_two_mass_tracks(T1, T2):
'''
Merge two mass tracks, each massTrack as ( mz, np.array(intensities at full rt range) ).
Parameters
----------
T1 : list
a massTrack rerpesented as ( mz, np.array(intensities at full rt range))
T2: list
a massTrack rerpesented as ( mz, np.array(intensities at full rt range))
Returns
-------
The merged mass track. The mz value is averaged between tracks, intensities are summed pair-wise.
'''
return ( 0.5 * (T1[0] + T2[0]), T1[1] + T2[1] )
[docs]
def get_thousandth_bins(mzTree, mz_tolerance_ppm=5, min_timepoints=5, min_peak_height=1000):
'''
Bin an mzTree into a list of data bins, to feed to `bin_to_mass_tracks`.
These data bins can form a single mass track, or span larger m/z region
if the m/z values cannot be resolved into discrete tracks here.
Parameters
----------
mzTree: dict[list[tuples]]
indexed data points, {thousandth_mz: [(mz, ii, intensity_int)...], ...}.
(all data points indexed by m/z to thousandth precision, i.e. 0.001 amu).
mz_tolerance_ppm: float, optional, default: 5
m/z tolerance in part-per-million. Used to seggregate m/z regsions here.
min_timepoints: int, optional, default: 5
minimal consecutive scans to be considered real signal.
min_peak_height: float, optional, default: 1000
a bin is not considered if the max intensity < min_peak_height.
Returns
-------
a list of flexible bins, [ [(mz, scan_num, intensity_int), ...], ... ]
'''
def __rough_check_consecutive_scans__(datatuples, gap_allowed=2, min_timepoints=min_timepoints):
# check if the mass trace has at least min_timepoints consecutive scans
# datatuples are a list of data points in format of (mz_int, scan_num, intensity_int)
_checked = True
check_max_len = 4 * min_timepoints # give longer traces a pass for now
if len(datatuples) < check_max_len:
min_check_val = gap_allowed + min_timepoints -1 # step is 4 in five consecutive values without gap
rts = sorted([x[1] for x in datatuples])
steps = [rts[ii]-rts[ii-min_timepoints+1] for ii in range(min_timepoints-1, len(rts))]
if min(steps) > min_check_val:
_checked = False
return _checked
def __check_min_peak_height__(datatuples, min_peak_height):
return max([x[2] for x in datatuples]) >= min_peak_height
tol_ = 0.000001 * mz_tolerance_ppm
ks = sorted([k for k,v in mzTree.items() if len(v) >= min_timepoints]) # ascending order enforced
bins_of_bins = []
tmp = [ks[0]]
for ii in range(1, len(ks)):
_delta = ks[ii] - ks[ii-1]
# merge adjacent bins if they are next to each other or within ppm tolerance
if _delta==1 or _delta < tol_ * ks[ii]:
tmp.append(ks[ii])
else:
bins_of_bins.append(tmp)
tmp = [ks[ii]]
bins_of_bins.append(tmp)
good_bins = []
for bin in bins_of_bins:
datatuples = []
for b in bin:
datatuples += mzTree[b]
# check the presence of min consecutive RT in small traces, to filter out more noises
# e.g. in an example datafile, 5958 reduced to 4511 traces
if __check_min_peak_height__(datatuples, min_peak_height) and __rough_check_consecutive_scans__(datatuples):
good_bins.append(datatuples)
# del mzTree
return good_bins
# -----------------------------------------------------------------------------
# retention time alignment
# -----------------------------------------------------------------------------
[docs]
def rt_lowess_calibration(good_landmark_peaks,
selected_reference_landmark_peaks,
sample_rt_numbers, reference_rt_numbers, num_iterations, sample_name, outdir):
'''
This is the alignment function of retention time between samples.
Use LOWESS, Locally Weighted Scatterplot Smoothing, to create
correspondence between sample_rt_numbers, reference_rt_numbers.
Predicted numbers are skipped when outside real sample boundaries.
Parameters
----------
good_landmark_peaks : list[peak]
landmark peaks selected from this working sample.
Landmark peaks are usually defined by 13C/12C patterns.
selected_reference_landmark_peaks : list[peak]
landmark peaks selected from the reference sample,
matched and equal-length to good_landmark_peaks.
sample_rt_numbers : list
all scan numbers in this sample.
reference_rt_numbers : list
all scan numbers in the ref sample.
sample_name : str
sample's name
Returns
-------
rt_cal_dict : dict
dictionary converting scan number in sample_rt_numbers to calibrated integer values.
Range matched. Only changed numbers are kept for efficiency.
reverse_rt_cal_dict : dict
from ref RT scan numbers to sample RT scan numbers.
Range matched. Only changed numbers are kept for efficiency.
Note
----
LOWESS method available in statsmodels.nonparametric.smoothers_lowess, v 0.12, 0.13+
https://www.statsmodels.org/stable/generated/statsmodels.nonparametric.smoothers_lowess.lowess.html
But xvals have to be forced as float array until the fix is in new release.
See __hacked_lowess__.
'''
# force left and right ends, to prevent runaway curve functions
reference_rt_bound = max(reference_rt_numbers)
sample_rt_bound = max(sample_rt_numbers)
rt_rightend_ = 1.1 * sample_rt_bound
xx, yy = [-0.1 * sample_rt_bound,]*3, [-0.1 * sample_rt_bound,]*3
rt_cal = clean_rt_calibration_points(
[(x[0]['apex'], x[1]['apex']) for x in zip(good_landmark_peaks, selected_reference_landmark_peaks)]
)
xx += [L[0] for L in rt_cal] + [rt_rightend_]*3
yy += [L[1] for L in rt_cal] + [rt_rightend_]*3
# scale frac parameter like a sigmoid of number of data points when len(rt_cal) is in (50,150).
FRAC = 0.6 - 0.004*(len(rt_cal)-50)
FRAC = max(0.2, min(FRAC, 0.6)) # bound frac in (0.2, 0.6)
# This requires statsmodels > v 0.12.
# float conversion on xvals is to bypass a bug in statsmodels, which was fixed today 2022-01-27
#lowess_predicted = lowess(yy, xx, frac= .2, it=1, xvals=np.array(sample_rt_numbers, dtype=float))
#
# downgrade now for compatibility to older statsmodels
lowess_predicted = __hacked_lowess__(yy, xx, frac=FRAC, it=num_iterations, xvals=sample_rt_numbers)
interf = interpolate.interp1d(lowess_predicted, sample_rt_numbers, fill_value="extrapolate")
ref_interpolated = interf( reference_rt_numbers )
lowess_predicted = [int(round(ii)) for ii in lowess_predicted]
rt_cal_dict = dict(
[(x,y) for x,y in zip(sample_rt_numbers, lowess_predicted) if x!=y and 0<=y<=reference_rt_bound] )
ref_interpolated = [int(round(ii)) for ii in ref_interpolated]
reverse_rt_cal_dict = dict(
[(x,y) for x,y in zip(reference_rt_numbers, ref_interpolated) if x!=y and 0<=y<=sample_rt_bound] )
return rt_cal_dict, reverse_rt_cal_dict
[docs]
def clean_rt_calibration_points(rt_cal_pairs):
'''
Remove redundant RT calibration data points and outliers (out of 3x stdev).
This does not force even distribution of calibration data points.
Parameters
----------
rt_cal_pairs : list of paired scan numbers from this sample and from the reference sample.
Returns
-------
rt_cal_pairs : clean and sorted version of rt_cal_pairs.
'''
rt_cal_pairs = [(x, x[0]-x[1]) for x in set(rt_cal_pairs)]
_deltas = [x[1] for x in rt_cal_pairs]
m, std3x = np.mean(_deltas), np.std(_deltas) * 3
_low, _high = m-std3x, m+std3x
return sorted([x[0] for x in rt_cal_pairs if _low < x[1] < _high])
[docs]
def rt_lowess_calibration_debug(good_landmark_peaks,
selected_reference_landmark_peaks,
sample_rt_numbers, reference_rt_numbers, num_iterations, sample_name, outdir):
'''
This is the debug version of rt_lowess_calibration.
Parameters
----------
good_landmark_peaks : list[peak]
landmark peaks selected from this working sample.
Landmark peaks are usually defined by 13C/12C patterns.
selected_reference_landmark_peaks : list[peak]
landmark peaks selected from the reference sample,
matched and equal-length to good_landmark_peaks.
sample_rt_numbers : list
all scan numbers in this sample.
reference_rt_numbers : list
all scan numbers in the ref sample.
sample_name : str
sample's name
Returns
-------
rt_cal_dict : dict
dictionary converting scan number in sample_rt_numbers to calibrated integer values.
Range matched. Only changed numbers are kept for efficiency.
reverse_rt_cal_dict : dict
from ref RT scan numbers to sample RT scan numbers.
Range matched. Only changed numbers are kept for efficiency.
Outputs
-------
A PDF file showing the LOWESS regression result.
Note
----
LOWESS method available in statsmodels.nonparametric.smoothers_lowess, v 0.12, 0.13+
https://www.statsmodels.org/stable/generated/statsmodels.nonparametric.smoothers_lowess.lowess.html
But xvals have to be forced as float array until the fix is in new release.
See __hacked_lowess__.
'''
import matplotlib.pyplot as plt
import os
# force left and right ends, to prevent runaway curve functions
reference_rt_bound = max(reference_rt_numbers)
sample_rt_bound = max(sample_rt_numbers)
rt_rightend_ = 1.1 * sample_rt_bound
xx, yy = [-0.1 * sample_rt_bound,]*3, [-0.1 * sample_rt_bound,]*3
rt_cal = clean_rt_calibration_points(
[(x[0]['apex'], x[1]['apex']) for x in zip(good_landmark_peaks, selected_reference_landmark_peaks)]
)
xx += [L[0] for L in rt_cal] + [rt_rightend_]*3
yy += [L[1] for L in rt_cal] + [rt_rightend_]*3
# scale frac parameter like a sigmoid of number of data points when len(rt_cal) is in (50,150).
FRAC = 0.6 - 0.004*(len(rt_cal)-50)
FRAC = max(0.2, min(FRAC, 0.6)) # bound frac in (0.2, 0.6)
# This requires statsmodels > v 0.12.
# float conversion on xvals is to bypass a bug in statsmodels, which was fixed today 2022-01-27
#lowess_predicted = lowess(yy, xx, frac= .2, it=1, xvals=np.array(sample_rt_numbers, dtype=float))
#
# downgrade now for compatibility to older statsmodels
lowess_predicted = __hacked_lowess__(yy, xx, frac=FRAC, it=num_iterations, xvals=sample_rt_numbers)
interf = interpolate.interp1d(lowess_predicted, sample_rt_numbers, fill_value="extrapolate")
ref_interpolated = interf( reference_rt_numbers )
lowess_predicted = [int(round(ii)) for ii in lowess_predicted]
rt_cal_dict = dict(
[(x,y) for x,y in zip(sample_rt_numbers, lowess_predicted) if x!=y and 0<=y<=reference_rt_bound] )
ref_interpolated = [int(round(ii)) for ii in ref_interpolated]
reverse_rt_cal_dict = dict(
[(x,y) for x,y in zip(reference_rt_numbers, ref_interpolated) if x!=y and 0<=y<=sample_rt_bound] )
plt.figure()
# export result figures for lowess regression
# Create the scatter plot and line plot
plt.scatter(xx[3:-3], yy[3:-3], label='Landmark peaks', color='black', marker='o')
plt.plot(sample_rt_numbers, lowess_predicted, label='Alignment function', color='red', linestyle='-')
# Customize your plot
plt.xlabel('Sample Retention Time (sec)')
plt.ylabel('Reference Retention Time (sec)')
plt.title('Retention Time Alignment')
plt.legend()
# Save the figure as a PNG file
plt.savefig(os.path.join(outdir, 'export', sample_name + '_rtime_alignment_result.png'))
return rt_cal_dict, reverse_rt_cal_dict
def __hacked_lowess__(yy, xx, frac, it, xvals):
'''
This was workaround of a problem with Statsmodel 0.13, which was dealt by casting xvals as floats.
Not checking possible range error.
The bug was reported and fixed in newer Statsmodel, thus this can be phased out in future.
'''
lxy = lowess(yy, xx, frac, it)
newx, newy = list(zip(*lxy))
interf = interpolate.interp1d(newx, newy)
return interf(xvals)
[docs]
def savitzky_golay_spline(good_landmark_peaks, selected_reference_landmark_peaks, sample_rt_numbers, reference_rt_numbers):
'''
Placeholder.
Modified Savitzky-Golay filter followed by spline fitting - pls follow format in rt_lowess.
Because our data are not equally spaced, sav-gol method may produce undesired errors.
# UnivariateSpline can't handle redundant values -
spl = UnivariateSpline(xx, yy, )
sample.rt_calibration_function = spl
# rt_remap_dict will be used for index mapping to the reference sample;
for ii in sample.rt_numbers:
sample.rt_remap_dict[ii] = round(spl(ii), None)
'''
pass
[docs]
def dwt_rt_calibrate(good_landmark_peaks, selected_reference_landmark_peaks, sample_rt_numbers, reference_rt_numbers):
'''
Placeholder.
Not implemented.
'''
pass
[docs]
def remap_intensity_track(intensity_track, new, rt_cal_dict):
'''
Remap intensity_track based on rt_cal_dict, used by constructors.MassGrid.remap_intensity_track.
Parameters
----------
intensity_track : list[int]
list of intensity values from a mass track.
new : np.zeros
new copy of np.zeros of RT length, possible longer than intensity_track,
because samples may have different RT lengthes.
rt_cal_dict : dict
sample specific mapping dictionary of RT.
dictionary converting scan number in sample_rt_numbers to calibrated integer values.
Returns
-------
Updated list of intensity, using coordinates in composite mass track.
'''
new[ :intensity_track.shape[0]] = intensity_track.copy()
for k,v in rt_cal_dict.items():
new[v] = intensity_track[k]
return new
# -----------------------------------------------------------------------------
# smoothing functions
# -----------------------------------------------------------------------------
[docs]
def smooth_moving_average(list_intensity, size=9):
'''
Smooth data of a noisy mass track using simple moving average.
Parameters
----------
list_intensity : list[int]
list of intensity values from a mass track.
size : int, optional, default: 9
window size for moving average.
Returns
-------
New list of smoothed intensity values.
Note
----
For very noise data, one may use smooth_lowess.
'''
return uniform_filter1d(list_intensity, size, mode='nearest')
[docs]
def smooth_lowess(list_intensity, frac=0.02):
'''
Smooth data of a very noisy mass track via LOWESS regression.
Parameters
----------
list_intensity : list[int]
list of intensity values from a mass track.
frac : float, optional, default: 0.02
fraction of data used in LOWESS regression.
Returns
-------
New list of smoothed intensity values.
Note
----
smooth_moving_average is preferred for most data. LOWESS is not good for small peaks.
'''
lxy = lowess(list_intensity, range(len(list_intensity)), frac=frac, it=1)
_, newy = list(zip(*lxy))
return newy