'''
To-do: add extended_adducts to function arguments.
'''
from .model import *
[docs]class khipu_diagnosis(Khipu):
'''Added diagnostic and exploratory functions to khipu class.
They should be run after khipu.build_khipu().
'''
[docs] def show_trimming(self):
'''Show what nodes are removed from input_network. After running self.build_khipu.
'''
print("nodes_to_use: ", self.nodes_to_use)
print("redundant_nodes: ", self.redundant_nodes)
[docs] def build_diagnostic_tree(self, input_network, depth_limit = 10):
'''Build diagnostic tree using input nodes.
Updates
-------
self.diagnostic_tree as an instance of treelib.Tree.
Examples
--------
(tree here is diagnostic_tree)
>>> KP = model.khipu()
>>> KP.build_tree(big[0], peak_dict, edge_dict)
>>>
>>> KP.tree.show()
116.0707@138.3
└── 117.0741@138.5
├── 120.084@136.8
└── 121.0874@136.8
>>> KP = model.khipu()
>>>
>>> KP.build_tree(big[10], peak_dict, edge_dict)
[]
>>>
>>> KP.tree.show()
85.0744@110.6
└── 126.1009@110.6
└── 127.1043@110.6
├── 121.0842@109.9
├── 121.0842@110.6
├── 122.0876@109.7
│ ├── 120.0808@109.9
│ └── 120.0808@110.6
├── 122.0876@110.4
├── 124.0943@110.8
└── 125.0976@110.6
Note:
A minimum_spanning_tree will have all necessary patterns to cover full khipu, but not unique pattern.
'''
T = nx.minimum_spanning_tree(input_network)
peak_dict = self.feature_dict
tree = treelib.Tree()
root_node = self.root
tree.create_node(make_peak_tag(peak_dict[root_node]), root_node, data=peak_dict[root_node])
for node in T[root_node]:
tree.create_node(make_peak_tag(peak_dict[node]), node, parent=root_node, data=peak_dict[node])
remaining = [E for E in T.edges() if root_node not in E]
while remaining and depth_limit > 0:
tmp = []
for edge in remaining:
if edge[0] in tree:
node = edge[1]
tree.create_node(make_peak_tag(peak_dict[node]), node, parent=edge[0], data=peak_dict[node])
elif edge[1] in tree:
node = edge[0]
tree.create_node(make_peak_tag(peak_dict[node]), node, parent=edge[1], data=peak_dict[node])
else:
tmp.append(edge)
remaining = tmp
depth_limit -= 1
if remaining:
print(remaining)
return tree
def build_diagnostic_tree_full(self):
self.diagnostic_tree = self.build_diagnostic_tree(self.input_network)
print("Diagnostic tree with all input nodes: ")
self.diagnostic_tree.show()
def build_diagnostic_tree_clean(self):
self.clean_tree = self.build_diagnostic_tree(self.clean_network)
print("Minimal khipu tree: ")
self.clean_tree.show()
def build_khipu_tree(self):
peak_dict = self.feature_dict
tree = treelib.Tree()
root_node = str(round(self.neutral_mass, 4))
tree.create_node(str(root_node), root_node)
# tree depth = 2, defined by khipu
for n in self.khipu_grid.columns:
pdata = peak_dict.get(n, {})
plabel = n
if pdata:
plabel = make_peak_tag(pdata)
tree.create_node(plabel, n, parent=root_node, data=pdata)
for m in self.khipu_grid.index:
b = self.khipu_grid.loc[m, n]
if b:
tree.create_node(make_peak_tag(peak_dict[b]), b, parent=n, data=peak_dict[b])
self.khipu_tree = tree
print("Aligned khipu tree: ")
self.khipu_tree.show()
[docs] def snap_features_to_grid_by_best_mz(self, expected_grid_mz_values):
'''Alternative method to create khipu_grid, by best matching each feature to the expected_grid_mz_values.
This has major problems when there's confusion btw close values,
as expected_grid_mz_values can also be shifted by measurement error in root m/z.
'''
expected = np.array(expected_grid_mz_values).T
khipu_grid = pd.DataFrame( np.empty(expected.shape, dtype=np.str),
index=self.isotope_index,
columns=self.adduct_index,
dtype=str)
for x in self.sorted_mz_peak_ids:
ii = np.argmin(abs(expected - x[0]))
khipu_grid.iloc[np.unravel_index(ii, expected.shape)] = x[1]
return khipu_grid
# -----------------------------------------------------------------------------
[docs]def local_read_file(infile,
start_col=3,
end_col=6,
isotope_search_patterns=isotope_search_patterns,
adduct_search_patterns=adduct_search_patterns,
mz_tolerance_ppm=5,
rt_tolerance=2, ):
'''The input feature table must be a tab delimited file, with the first four columns as:
ID, m/z, retention_time, intensity.
Example data at '../testdata/full_Feature_table.tsv'.
'''
print("Working on file ", infile)
flist = read_features_from_text(open(infile).read(),
id_col=0, mz_col=1, rtime_col=2, intensity_cols=(start_col, end_col), delimiter="\t"
)
subnetworks, peak_dict, edge_dict = peaks_to_networks(flist,
isotope_search_patterns,
adduct_search_patterns,
mz_tolerance_ppm,
rt_tolerance
)
return subnetworks, peak_dict, edge_dict
[docs]def khipu_annotate(args):
'''Automated pre-annotation using Khipu on a feature table.
args as from parser.parse_args().
One can follow the steps here to construct a custom workflow,
using scripting or notebooks.
'''
adduct_patterns = adduct_search_patterns
delta_mz = PROTON
if args.mode == 'neg':
adduct_patterns = adduct_search_patterns_neg
delta_mz = -PROTON
subnetworks, peak_dict, edge_dict = local_read_file(infile=args.input,
start_col=args.start,
end_col=args.end,
isotope_search_patterns=isotope_search_patterns,
adduct_search_patterns=adduct_patterns,
mz_tolerance_ppm=args.ppm,
rt_tolerance=args.rtol,
)
WV = Weavor(peak_dict, isotope_search_patterns=isotope_search_patterns,
adduct_search_patterns=adduct_patterns,
mz_tolerance_ppm=args.ppm, mode=args.mode)
print("\n")
print("Initial khipu search grid: ")
print(WV.mzgrid)
print("\n")
khipu_list = graphs_to_khipu_list(
subnetworks, WV, mz_tolerance_ppm=args.ppm,
)
khipu_list, all_assigned_peaks = extend_khipu_list(khipu_list, peak_dict,
extended_adducts, mz_tolerance_ppm=args.ppm,
rt_tolerance=args.rtol)
print("\n\n ~~~~~~ Got %d khipus, with %d features ~~~~~~~ \n\n"
%(len(khipu_list), len(all_assigned_peaks)))
empCpds = export_empCpd_khipu_list(khipu_list)
outfile = 'khipu_test_empricalCompounds.json'
if args.output:
outfile = args.output + '.json'
with open(outfile, 'w', encoding='utf-8') as f:
json.dump(empCpds, f, ensure_ascii=False, indent=2)
singleton_ion = list(WV.mzgrid.columns)[0]
export_khipu_table(outfile.replace(".json", ".tsv"), peak_dict, empCpds, singleton_ion, delta_mz)
[docs]def peaklist_to_khipu_list(peaklist,
isotope_search_patterns=isotope_search_patterns,
adduct_search_patterns=adduct_search_patterns,
mz_tolerance_ppm=5,
rt_tolerance=2,
mode='pos'):
'''A wrapper function easier for local data analysis.
peaklist : list of input peaks/features in JSON notion as defined in Khipu or MetDataModel.
return khipu_list, all_assigned_peaks
'''
subnetworks, peak_dict, edge_dict = peaks_to_networks(peaklist,
isotope_search_patterns,
adduct_search_patterns,
mz_tolerance_ppm,
rt_tolerance
)
WV = Weavor(peak_dict, isotope_search_patterns=isotope_search_patterns,
adduct_search_patterns=adduct_search_patterns,
mz_tolerance_ppm=mz_tolerance_ppm,
mode=mode)
print("\n")
print("Initial khipu search grid: ")
print(WV.mzgrid)
print("\n")
khipu_list = graphs_to_khipu_list(
subnetworks, WV, mz_tolerance_ppm=mz_tolerance_ppm,
)
khipu_list, all_assigned_peaks = extend_khipu_list(khipu_list, peak_dict,
extended_adducts, mz_tolerance_ppm=mz_tolerance_ppm,
rt_tolerance=rt_tolerance)
return khipu_list, all_assigned_peaks
[docs]def graphs_to_khipu_list(subnetworks, WeavorInstance, mz_tolerance_ppm):
'''Generate full khipu_list from subnetworks,
including iterative khipus based on features pruned out of initial subnetwork.
'''
khipu_list, khipu_list_valid = [], []
for g in subnetworks:
if g.number_of_edges() > 0:
KP = Khipu(g)
KP.build_khipu(WeavorInstance, mz_tolerance_ppm)
khipu_list.append(KP)
while KP.redundant_nodes and KP.pruned_network and KP.pruned_network.edges():
# make sure pruned_network is connected
more_subnets = [KP.pruned_network.subgraph(c).copy()
for c in nx.connected_components(KP.pruned_network)]
for _G in more_subnets:
KP = Khipu(_G)
KP.build_khipu(WeavorInstance, mz_tolerance_ppm)
khipu_list.append(KP)
# assign IDs
ii = 0
for KP in khipu_list:
if KP.valid:
base_mz = str(round(KP.neutral_mass, 4))
ii += 1
KP.id = 'kp' + str(ii) + "_" + str(base_mz)
khipu_list_valid.append(KP)
return khipu_list_valid
[docs]def extend_khipu_list(khipu_list, peak_dict, adduct_search_patterns_extended,
mz_tolerance_ppm=5, rt_tolerance=2):
'''Update khipus by extended adduct search.
Returns updated khipu_list and list_assigned_peaks.
'''
list_assigned_peaks = []
for KP in khipu_list:
list_assigned_peaks += KP.nodes_to_use
unassigned_peaks = [v for x,v in peak_dict.items() if x not in list_assigned_peaks]
mztree = build_centurion_tree(unassigned_peaks)
for KP in khipu_list:
added_peaks = KP.extended_search(mztree,
adduct_search_patterns_extended, mz_tolerance_ppm, rt_tolerance)
list_assigned_peaks += added_peaks
return khipu_list, set(list_assigned_peaks)
def export_json_khipu_list(khipu_list):
J = []
for KP in khipu_list:
J.append(KP.export_json())
return J
[docs]def export_empCpd_khipu_list(khipu_list):
'''Export all khipus in khipu_list to a list of empirical compounds, which is JSON compatible.
Wrapper of khipu.format_to_epds().
A small number of features are "undetermined", as they came due to initial edges but violate DAG rules.
They are sent off for a new khipu.
'''
J = []
for KP in khipu_list:
J.append(KP.format_to_epds(id=KP.id))
return J
[docs]def export_khipu_table(outfile, peak_dict, json_khipu_list, singleton_ion='M+H+', delta_mz=1.0073):
'''Write tab delimited file of khipu_list.
delta_mz is +proton if pos mode, -protone if neg mode, for singletons.
'''
header = ['id', 'mz', 'rtime', 'interim_id', 'neutral_formula_mass', 'isotope', 'modification', 'ion_relation']
s = '\t'.join(header).replace('interim_id', 'empCpd') + '\n'
in_khipu_list = []
for J in json_khipu_list:
for F in J["MS1_pseudo_Spectra"]:
in_khipu_list.append(F['id'])
s += '\t'.join([
F['id'], str(F['mz']), str(F['rtime']), J['interim_id'], str(J['neutral_formula_mass']),
F.get('isotope', ''), F.get('modification', ''), F['ion_relation']
]) + '\n'
# other peaks
for k,F in peak_dict.items():
if k not in in_khipu_list:
s += '\t'.join([
F['id'], str(F['mz']), str(F['rtime']), 'singleton', str(round(F['mz']-delta_mz,4)),
'M0', singleton_ion, ''
]) + '\n'
with open(outfile, 'w') as f:
f.write(s)