# -*- coding: utf-8 -*-
#
# sas.py - SAS assessment for PDB-IHM
#
# Copyright (C) 2019-2025 Arthur Zalevsky, Sai Ganesan, Benjamin M. Webb, Brinda Vallat
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""
SAS assessment for PDB-IHM
"""
import os
import re
import subprocess
import numpy as np
import pandas as pd
import requests
import json
from sklearn.linear_model import LinearRegression
from decimal import Decimal
from mmcif_io import GetInputInformation
from subprocess import run
import operator
import logging
from io import StringIO
import sys
import pkgutil
from pathlib import Path
saspath = pkgutil.get_loader('sasciftools').path
saspath = str(Path(saspath).parent)
sys.path.insert(0, saspath)
from mmCif import mmcifIO
import utility
import ihm
[docs]
class SasValidation(GetInputInformation):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.version = self.get_atsas_version()
self.dataset = GetInputInformation.get_dataset_comp(self)
self.imagepath = '../static/images/'
self.saslink = 'https://sasbdb.org/media/sascif/sascif_files/'
# self.sasentry = 'https://sasbdb.org/rest-api/entry/summary/'
self.sasbdb_ids = self.get_sasbdb_ids()
self.sascif_dicts = self.get_sascif_dicts()
self.intensities = self.get_intensities()
self.intensities = self.modify_intensity()
# self.data_dic = self.get_data_from_SASBDB()
[docs]
def get_atsas_version(self, tool: str = 'datcmp') -> str:
""" Get ATSAS version """
line = subprocess.check_output(
[tool, '--version'],
stderr=subprocess.STDOUT, text=True).strip()
q = re.search(f'^{tool}, ATSAS (?P<version>.*)\nCopyright', line)
version = q.group("version")
return version
[docs]
def get_sas_ids(self) -> list:
'''
function to get all SASBDB codes used in the model,
returns a list of SASBDB codes
'''
sas_ids = []
for indx, datatype in enumerate(self.dataset['Database name']):
if str(datatype) == 'SASBDB':
sas_id = self.dataset['Data access code'][indx]
sas_ids.append(sas_id)
return sas_ids
[docs]
def get_sasbdb_ids(self) -> list:
'''
function to get all SASBDB codes used in the model,
returns a list of SASBDB codes
'''
sasbdb_ids = []
for dataset in self.system.orphan_datasets:
if isinstance(dataset.location, ihm.location.SASBDBLocation):
try:
code = dataset.location.access_code
except AttributeError as e:
logging.error('Missing SASBDB accession code')
logging.error(e)
else:
sasbdb_ids.append(code)
return sasbdb_ids
[docs]
def get_sascif_dicts(self):
sascif_dicts = {}
sasCIFIn = mmcifIO.CifFileReader()
for code in self.sasbdb_ids:
sascif_fn = self.get_sascif_file(code, self.cache)
if sascif_fn is not None:
sascif_dicts[code] = sasCIFIn.read(sascif_fn)
return sascif_dicts
[docs]
def check_sascif_dicts(self):
return(['True' for x in self.sascif_dicts.keys()])
[docs]
def get_sascif_file(self, code, output_dir='.'):
'''
get data from SASCIF files
'''
url = f'{self.saslink}{code}.sascif'
fn = Path(output_dir, f'{code}.sascif')
# Check if we already requested the data
if os.path.isfile(fn):
logging.info(f'Found {fn} in cache!')
elif not os.path.isfile(fn):
response = requests.get(url)
response.encoding = 'ascii'
if response.status_code != 200:
logging.error(
f"Unable to fetch data for {code} from SASBDB, "
"please check the entry ID")
fn = None
else:
with open(fn, 'w') as f:
f.write(response.text)
return fn
[docs]
def get_intensities(self) -> dict:
'''
get intensity data from SASCIF file
'''
ints = {}
for code in self.sascif_dicts.keys():
sascif = self.sascif_dicts[code]
main = f'{code}_MAIN'
if main not in sascif:
raise(KeyError(f'Missing MAIN dataset for {code}'))
data = sascif[main]['_sas_scan_intensity']
try:
I_df = pd.DataFrame({
'id': np.array(data['id'], dtype=int),
'Q': np.array(data['momentum_transfer'], dtype=float),
'I': np.array(data['intensity'], dtype=float),
'E': np.array(data['intensity_su_counting'], dtype=float),
})
except:
raise(ValueError("Can't parse intensities from {code}"))
ints[code] = I_df
return ints
[docs]
def modify_intensity(self) -> dict:
'''
modify intensity data to calcualte errors and log values
'''
Int_dict_modify = {}
rg_and_io = self.get_rg_and_io()
for code, I_df in self.intensities.items():
data = self.sascif_dicts[code][f'{code}_MAIN']
unit = data['_sas_scan']['unit']
unitm = self.get_scan_unit_mult(unit)
Rg = rg_and_io[code][0]
IO = rg_and_io[code][1]
dim_num = Rg * Rg / IO
I_df = I_df[I_df['I']-I_df['E'] > 0]
I_df['Q'] = I_df['Q'] * unitm
I_df['err_x'] = I_df.apply(
lambda row: (row['Q'], row['Q']), axis=1)
I_df['err_y'] = I_df.apply(
lambda row: (
np.log10(row['I'] - row['E']),
np.log10(row['I'] + row['E'])),
axis=1)
I_df['logI'] = np.log10(I_df['I'])
I_df['logQ'] = np.log10(I_df['Q'])
I_df['logX'] = I_df.apply(lambda row: (
row['logQ'], row['logQ']), axis=1)
I_df['Ky'] = I_df['Q'] * I_df['Q'] * I_df['I'] * dim_num
I_df['Kx'] = I_df['Q'] * Rg
I_df['Px'] = I_df['Q']**4
I_df['Px'].round(3)
I_df['Py'] = I_df['Px']*I_df['I']
Int_dict_modify[code] = I_df
return Int_dict_modify
[docs]
def get_rg_for_plot(self) -> dict:
'''
get Rg values from SASCIF file, if unavailabel, get it from JSON
'''
Rg_dict = {}
for code in self.sascif_dicts.keys():
sascif = self.sascif_dicts[code]
main = f'{code}_MAIN'
data = sascif[main]['_sas_result']
rgs = []
rg = round(float(data['Rg_from_Guinier']), 2)
rgs.append(rg)
rg = round(float(data['Rg_from_PR']), 2)
rgs.append(rg)
Rg_dict[code] = rgs
return Rg_dict
[docs]
def get_rg_and_io(self) -> dict:
'''
get rg information from SASCIF file
'''
rg_and_io = {}
for code in self.sascif_dicts.keys():
sascif = self.sascif_dicts[code]
main = f'{code}_MAIN'
data = sascif[main]['_sas_result']
rg = float(data['Rg_from_PR'])
io = float(data['I0_from_PR'])
rg_and_io[code] = (rg, io)
return rg_and_io
[docs]
def get_rg_table_many(self) -> dict:
'''
get rg information from multiple SASCIF files
'''
rg_table = {'SASDB ID': [], 'R<sub>g</sub>': [],
'R<sub>g</sub> error': [], 'MW': [], 'MW error': []}
for code in self.sascif_dicts.keys():
sascif = self.sascif_dicts[code]
main = f'{code}_MAIN'
data = sascif[main]['_sas_result']
rg_table['SASDB ID'].append(code)
try:
val = f'{float(data["Rg_from_Guinier"]):.2f} nm'
except ValueError:
val = utility.NA
rg_table['R<sub>g</sub>'].append(val)
try:
val = f'{float(data["Rg_from_Guinier_error"]):.2f} nm'
except ValueError:
val = utility.NA
rg_table['R<sub>g</sub> error'].append(val)
try:
val = f'{float(data["MW_standard"]):.1f} kDa'
except ValueError:
val = utility.NA
rg_table['MW'].append(val)
try:
val = f'{float(data["MW_standard_error"]):.1f} kDa'
except ValueError:
val = utility.NA
rg_table['MW error'].append(val)
return rg_table
[docs]
def get_fits_for_plot(self) -> dict:
'''
get chi-squared values from SASCIF files
'''
fit_dict = {}
for code in self.sascif_dicts.keys():
fits = []
sascif = self.sascif_dicts[code]
for k, data in sascif.items():
if re.search('FIT', k):
chisq = round(float(data['_sas_model_fitting_details']['chi_square']), 2)
fits.append(chisq)
if len(fits) > 0:
fit_dict[code] = fits
return fit_dict
[docs]
def get_pofr(self) -> dict:
'''
get pair-dist distribution from SASCIF files
'''
pofr_dict = {}
for code in self.sascif_dicts.keys():
data = {}
sascif = self.sascif_dicts[code]
main = f'{code}_MAIN'
data = sascif[main]['_sas_p_of_R']
try:
I_df = pd.DataFrame({
'id': np.array(data['id'], dtype=int),
'ordinal': np.array(data['ordinal'], dtype=int),
'R': np.array(data['r'], dtype=float),
'P': np.array(data['P'], dtype=float),
'E': np.array(data['P_error'], dtype=float),
})
except:
raise(ValueError("Can't parse sas_p_of_R from {code}"))
pofr_dict[code] = I_df
return pofr_dict
[docs]
def get_pvals(self) -> dict:
'''
get p-values from ATSAS
'''
num_of_fits = self.get_number_of_fits()
pval_table = {'SASDB ID': [], 'Model': [], 'χ²': [], 'P-value': []}
for code in self.sascif_dicts.keys():
f1fn = Path(self.cache, f'{code}_fit1.dat')
f2fn = Path(self.cache, f'{code}_fit2.dat')
f3fn = Path(self.cache, f'{code}_pval.dat')
sascif = self.sascif_dicts[code]
main = f'{code}_MAIN'
data = sascif[main]
refX = np.array(data['_sas_scan_intensity']['momentum_transfer'], dtype=float)
refY = np.array(data['_sas_scan_intensity']['intensity'], dtype=float)
refS = np.array(data['_sas_scan_intensity']['intensity_su_counting'], dtype=float)
fits = []
c = 0
for k, v in sascif.items():
if re.search('FIT', k):
c += 1
chisq = round(float(v['_sas_model_fitting_details']['chi_square']), 2)
pval_table['SASDB ID'].append(code)
pval_table['Model'].append(c)
fitX = np.array(v['_sas_model_fitting']['momentum_transfer'], dtype=float)
fit_refY = np.array(v['_sas_model_fitting']['intensity'], dtype=float)
fitY = np.array(v['_sas_model_fitting']['fit'], dtype=float)
fit_1 = pd.DataFrame({
'Q': fitX,
'Ie': fit_refY
})
fit_2 = pd.DataFrame({
'Q': fitX,
'Ib': fitY
})
fit_1.to_csv(f1fn, header=False, index=False)
fit_2.to_csv(f2fn, header=False, index=False)
with open(f3fn, 'w+') as f:
run(['datcmp', f1fn, f2fn], stdout=f)
with open(f3fn, 'r') as f:
all_lines = [j.strip().split()
for i, j in enumerate(f.readlines())]
p_val = [all_lines[i+1][4]
for i, j in enumerate(all_lines) if 'adj' in j][0]
for fn in [f1fn, f2fn, f3fn]:
os.remove(fn)
pval_table['P-value'].append('%.2E' % Decimal(p_val))
pval_table['χ²'].append('%.2f' % chisq)
if c == 0:
pval_table['SASDB ID'].append(code)
pval_table['Model'].append(utility.NA)
pval_table['P-value'].append(utility.NA)
return pval_table
[docs]
def get_pofr_ext(self) -> dict:
'''
get pair-distance details from SASCIF files
'''
pofr_dict = {}
for code in self.sascif_dicts.keys():
sascif = self.sascif_dicts[code]
main = f'{code}_MAIN'
data = self.sascif_dicts[code][f'{code}_MAIN']
unit = data['_sas_scan']['unit']
unitm = self.get_scan_unit_mult(unit)
data = sascif[main]['_sas_p_of_R_extrapolated_intensity']
pdf_re = pd.DataFrame({
'Q': np.array(data['momentum_transfer'], dtype=float),
'I': np.array(data['intensity_reg'], dtype=float)
})
pdf_re['Q'] = pdf_re['Q'] * unitm
pdf_re['logI'] = np.log10(pdf_re['I'])
pofr_dict[code] = pdf_re
return pofr_dict
[docs]
def get_pofr_errors(self) -> dict:
'''
get pair-distance details and errors from JSON files
'''
pofr_dict = self.get_pofr_ext()
Int_dict = self.intensities
compiled_dict = {}
for code in self.sascif_dicts.keys():
I_df = Int_dict[code]
I_df_dict = dict(zip(I_df.Q, I_df.I))
I_df_err_dict = dict(zip(I_df.Q, I_df.E))
p_df = pofr_dict[code]
p_df_dict = dict(zip(p_df.Q, p_df.I))
errors = []
for Q, I in p_df_dict.items():
data_Q = self.findMinDiff(list(I_df_dict.keys()), Q)
if data_Q != 9999:
data_I = I_df_dict[data_Q]
delta_I = (I-data_I)
if I_df_err_dict[data_Q] != 0:
wt_delta_I = delta_I/I_df_err_dict[data_Q]
else:
wt_delta_I = 0
errors.append([Q, delta_I, wt_delta_I])
errors_df = pd.DataFrame(errors, columns=['Q', 'R', 'WR'])
compiled_dict[code] = errors_df
return compiled_dict
[docs]
def findMinDiff(self, listn: list, num: int) -> int:
'''
quick min diff operation for calculating errors
'''
list_sub = [(i, abs(j-num)) for i, j in enumerate(listn)]
list_sort = sorted(list_sub, key=operator.itemgetter(1))
if list_sort[0][1] < 0.00001:
return listn[list_sort[0][0]]
else:
return 9999
[docs]
@staticmethod
def get_scan_unit_mult(unit) -> int:
return {'1/A': 10.0, '1/nm': 1.0}[unit]
[docs]
def get_Guinier_data(self) -> (dict, dict):
'''
get Guinier plot data from JSON files
'''
Int_dict = self.intensities
Guinier_dict = {}
Guinier_score = {}
for code, val in Int_dict.items():
data = self.sascif_dicts[code][f'{code}_MAIN']
unit = data['_sas_scan']['unit']
unitm = self.get_scan_unit_mult(unit)
rg = float(data['_sas_result']['Rg_from_PR'])
G_df = val.astype({'Q': float, 'I': float, 'E': float})
G_df['lnI'] = np.log(G_df['I'])
# dmax = float(data_dic[code]['pddf_dmax'])
# index_low = int(data_dic[code]['guinier_point_first'])
# index_high = int(data_dic[code]['guinier_point_last'])
# q_min = math.pi/(dmax*10)
q_max = 1.3 / rg
G_df_range = G_df[G_df['Q'] < q_max].copy()
G_df_range['Q'] = G_df['Q']
G_df_range['Q2'] = G_df_range['Q']**2
X = G_df_range[['Q2']].values
y = G_df_range['lnI'].values
regression = LinearRegression(fit_intercept=True)
regression.fit(X, y)
G_df_range['y_pred'] = regression.predict(X)
G_df_range['res'] = y-regression.predict(X)
# G_df_range['Q2A'] = G_df_range['Q2']
score = '%.2f' % regression.score(X, y)
Guinier_score[code] = score
Guinier_dict[code] = G_df_range
return Guinier_score, Guinier_dict
[docs]
def get_parameters_vol_many(self) -> dict:
'''
get volume details from SASCIF files
'''
parameter_table = {'SASDB ID': [], 'Estimated Volume': [], 'Porod Volume': [], 'Specific Volume': [],
'Sample Contrast': [], 'Sample Concentration': []}
for code in self.sascif_dicts.keys():
sascif = self.sascif_dicts[code]
main = f'{code}_MAIN'
parameter_table['SASDB ID'].append(code)
data = sascif[main]['_sas_sample']
try:
val = f'{float(data["specimen_concentration"]):.2f} mg/mL'
except ValueError:
val = utility.NA
parameter_table['Sample Concentration'].append(val)
try:
val = f'{float(data["contrast"]):.2f}'
except ValueError:
val = utility.NA
parameter_table['Sample Contrast'].append(val)
try:
val = f'{float(data["specific_vol"]):.2f} nm\u00b3'
except ValueError:
val = utility.NA
parameter_table['Specific Volume'].append(val)
data = sascif[main]['_sas_result']
try:
val = f'{float(data["Porod_volume"]):.2f} nm\u00b3'
except ValueError:
val = utility.NA
parameter_table['Porod Volume'].append(val)
try:
val = f'{float(data["estimated_volume"]):.2f} nm\u00b3'
except ValueError:
val = utility.NA
parameter_table['Estimated Volume'].append(val)
return parameter_table
[docs]
def get_parameters_mw_many(self) -> dict:
'''
get MW details from SASCIF files
'''
parameter_table = {'SASDB ID': [], 'Chemical composition MW': [
], 'Standard MW': [], 'Porod Volume/MW': []}
for code in self.sascif_dicts.keys():
sascif = self.sascif_dicts[code]
main = f'{code}_MAIN'
data = sascif[main]['_sas_result']
parameter_table['SASDB ID'].append(code)
try:
val = f'{float(data["experimental_MW"]):.1f} kDa'
except ValueError:
val = utility.NA
parameter_table['Chemical composition MW'].append(val)
try:
val = f'{float(data["MW_standard"]):.1f} kDa'
except ValueError:
val = utility.NA
parameter_table['Standard MW'].append(val)
try:
Porod_MW = round(float(data['MW_Porod']), 2)
Porod_V = round(float(data['Porod_volume']), 2)
val = f'{(Porod_V / Porod_MW):.2f} nm\u00b3/kDa'
except ValueError:
val = utility.NA
parameter_table['Porod Volume/MW'].append(val)
return parameter_table
[docs]
def get_pddf(self) -> dict:
'''
get p(r) data from JSON
'''
pofr_dic = self.get_pofr()
pddf_dic = {}
for key, val in pofr_dic.items():
pd_df = val.astype({'P': float, 'R': float, 'E': float})
pd_df['R'] = pd_df['R']/10
pd_df['err_x'] = pd_df.apply(
lambda row: (row['R'], row['R']), axis=1)
pd_df['err_y'] = pd_df.apply(lambda row: (
row['P']-row['E'], row['P']+row['E']), axis=1)
pddf_dic[key] = pd_df
return pddf_dic
[docs]
def get_pddf_info(self) -> dict:
'''
get p(r) related info from JSON
'''
pddf_info = {'SASDB ID': [], 'Software used': [],
'D<sub>max</sub>': [], 'D<sub>max</sub> error': [], 'R<sub>g</sub>': [], 'R<sub>g</sub> error': []}
for code in self.sascif_dicts.keys():
sascif = self.sascif_dicts[code]
main = f'{code}_MAIN'
data = sascif[main]['_sas_p_of_R_details']
pddf_info['Software used'].append(str(data['software_p_of_R']))
data = sascif[main]['_sas_result']
try:
val = f'{float(data["D_max"]):.3f} nm'
except ValueError:
val = utility.NA
pddf_info['D<sub>max</sub>'].append(val)
try:
val = f'{float(data["Rg_from_PR"]):.3f} nm'
except ValueError:
val = utility.NA
pddf_info['R<sub>g</sub>'].append(val)
try:
val = f'{float(data["Dmax_error"]):.3f} nm'
except ValueError:
val = utility.NA
pddf_info['D<sub>max</sub> error'].append(val)
try:
val = f'{float(data["Rg_from_PR_error"]):.3f} nm'
except ValueError:
val = utility.NA
pddf_info['R<sub>g</sub> error'].append(val)
pddf_info['SASDB ID'].append(code)
return pddf_info
[docs]
def get_number_of_fits(self) -> dict:
'''
get number of fits
'''
num_of_fits = {}
for code in self.sascif_dicts.keys():
c = 0
sascif = self.sascif_dicts[code]
for k in sascif.keys():
if re.search('FIT', k):
c += 1
num_of_fits[code] = c
return num_of_fits
[docs]
def get_total_number_of_fits(self) -> int:
c = 0
for k, v in self.get_number_of_fits().items():
c += v
return c
[docs]
def get_sasdb_code_fits(self) -> list:
'''
get asumber of fits per SASBDB ID
'''
fit_dict = self.get_number_of_fits()
return list(fit_dict.values())
[docs]
def get_fit_data(self) -> dict:
'''
get fit information to make plots
'''
num_of_fits = self.get_number_of_fits()
data_fit = {}
for code in self.sascif_dicts.keys():
sascif = self.sascif_dicts[code]
main = f'{code}_MAIN'
data = sascif[main]
refX = np.array(data['_sas_scan_intensity']['momentum_transfer'], dtype=float)
refY = np.array(data['_sas_scan_intensity']['intensity'], dtype=float)
refS = np.array(data['_sas_scan_intensity']['intensity_su_counting'], dtype=float)
unit = data['_sas_scan']['unit']
unitm = self.get_scan_unit_mult(unit)
num = num_of_fits[code]
fits = {}
c = 0
for k, data in sascif.items():
if re.search('FIT', k):
fitX = np.array(data['_sas_model_fitting']['momentum_transfer'], dtype=float)
fit_refY = np.array(data['_sas_model_fitting']['intensity'], dtype=float)
fitY = np.array(data['_sas_model_fitting']['fit'], dtype=float)
fitS = np.array([refS[np.argmin(np.abs(refX - x))] for x in fitX], dtype=float)
f_df = pd.DataFrame({
'Q': fitX * unitm,
'Ie': fit_refY,
'Ib': fitY,
'E': fitS,
})
f_df['logIe'] = np.log10(f_df['Ie'])
f_df['logIb'] = np.log10(f_df['Ib'])
f_df['r'] = f_df['Ie']-f_df['Ib']
if f_df['E'].isnull().values.any():
f_df['rsigma'] = 0
else:
f_df['rsigma'] = f_df['r']/f_df['E']
f_df['logr'] = f_df['logIe']-f_df['logIb']
f_df['r2a'] = (f_df['Ib']-f_df['Ie'].mean())**2
f_df['r2b'] = (f_df['Ie']-f_df['Ie'].mean())**2
fits[c] = (self.get_fit_r2(f_df), f_df)
c += 1
if c == 0:
fits[0] = (0, pd.DataFrame())
data_fit[code] = fits
return data_fit
[docs]
def get_fit_r2(self, df: pd.DataFrame) -> int:
rsquared = df['r2a'].sum()/df['r2b'].sum()
return round(rsquared, 2)