import os, sys
import h5py
import numpy as np
import pytraj as pt
from .. import utils
from .. import printit, config
__all__ = [
"Trajectory",
"MisatoTraj",
]
[docs]class Trajectory(pt.Trajectory):
"""
This class represents the base-class for trajectory handling in the Nearl package.
This class inherits from pytraj.Trajectory.
Attributes
----------
top_filename : str
The topology file of the trajectory
traj_filename : str
The trajectory file of the trajectory
mask : str
The mask to select the atoms to be saved
mask_indices : np.ndarray
The indices of the atoms selected by the mask
atoms : np.ndarray
The per-atom index of the trajectory
residues : np.ndarray
The per-residue index of the trajectory
Methods
-------
identity()
Return the identity of the trajectory
copy_traj()
Return a copy of the trajectory object
make_index()
Prepare the per-atom/per-residue index for the further trajectory processing
write_frame()
Save the trajectory to the file
add_dummy_points()
Add additional points to a frame for visual inspection
Notes
-----
Three types of trajectory initialization are supported:
1. File-based trajectory initialization (traj_src and pdb_src are strings)
2. Pytraj-based trajectory initialization (traj_src is pytraj.Trajectory)
3. Self-based trajectory initialization (traj_src is self)
All structures are regarded as trajectories (static PDB is view as a trajectory with only 1 frame).
Examples
--------
>>> from nearl.io import Trajectory
>>> traj = Trajectory("traj.nc", "top.pdb")
"""
def __init__(self, traj_src = None, pdb_src = None, **kwarg):
"""
Initialize the trajectory object with the trajectory and topology files
Parameters
----------
traj_src : trajectory_like
The trajectory like or filename to be loaded
pdb_src : topology_like
The topology like or filename to be loaded
"""
# Set the keyword arguments for slicing/masking trajectory;
stride = kwarg.get("stride", None)
frame_indices = kwarg.get("frame_indices", None)
mask = kwarg.get("mask", None)
if config.verbose():
printit(f"{self.__class__.__name__}: Loading trajectory {traj_src} with topology {pdb_src}")
printit(f"{self.__class__.__name__}: stride: {stride}; frame_indices: {frame_indices}; mask: {mask}")
# NOTE: If both stride and frame_indices are given, stride will be respected;
# NOTE: If none of stride or frame_indices are given, all frames will be loaded;
if isinstance(traj_src, str) and isinstance(pdb_src, str):
# File name-based trajectory initialization
tmptraj = pt.load(traj_src, pdb_src, stride=stride, frame_indices=frame_indices, mask=mask)
timeinfo = tmptraj.time
boxinfo = tmptraj._boxes
elif isinstance(traj_src, str) and (pdb_src is None):
# In the case that the trajectory is self-consistent e.g. PDB file
tmptraj = pt.load(traj_src, mask=mask)
timeinfo = tmptraj.time
boxinfo = tmptraj._boxes
elif isinstance(traj_src, (pt.Trajectory, self.__class__)):
# Pytraj or self-based trajectory initialization
if mask is not None:
tmptraj = traj_src[mask]
else:
tmptraj = traj_src
timeinfo = tmptraj.time
boxinfo = tmptraj._boxes
elif (traj_src is None) and (pdb_src is None):
# Initialize an empty object
super().__init__()
return
else:
raise ValueError(f"Invalid input for traj source ({type(traj_src)}) and pdb_src ({type(pdb_src)})")
# NOTE: Adding mask in the first pt.load function causes lose of time information
# if mask is not None:
# tmptraj = tmptraj[mask]
top = tmptraj.top
xyz = tmptraj.xyz
assert tmptraj.top.n_atoms == tmptraj.xyz.shape[1], f"The number of atoms in the topology and the coordinates should be the same, rather than {tmptraj.top.n_atoms} and {tmptraj.xyz.shape[1]}"
# Set basic attributes for pytraj.Trajectory;
super().__init__(xyz=xyz, top=top, velocity=tmptraj.velocities, force=tmptraj.forces)
self._boxes = boxinfo
self.time = timeinfo
self._life_holder = tmptraj._life_holder
self._frame_holder = tmptraj._frame_holder
# Non-pytraj attributes to facilitate further trajectory processing;
self.top_filename = pdb_src
self.traj_filename = traj_src
self.mask = mask
if "identity" in kwarg.keys():
self.identity_ = kwarg["identity"]
else:
self.identity_ = None
if config.verbose() or config.debug():
printit(f"The identity of this trajectory is: {self.identity}", file=sys.stderr)
# Prepare the per-atom/per-residue index for the further trajectory processing;
self.atoms = None
self.residues = None
self.make_index()
# def __getitem__(self, index):
# # Get the return from its parent pt.Trajectory;
# self._life_holder = super().__getitem__(index)
# if isinstance(self._life_holder, pt.Frame):
# pass
# else:
# self._life_holder.top_filename = self.top_filename
# self._life_holder.traj_filename = self.traj_filename
# self._life_holder.mask = self.mask
# self._life_holder.make_index()
# return self._life_holder
@property
def identity(self):
"""
Return the identity of the trajectory used for metadata retrieval.
Returns
-------
str
By default, it returns the trajectory file name
"""
if self.identity_ is not None:
return self.identity_
else:
return self.traj_filename
[docs] def copy_traj(self):
"""
Return a copy of the trajectory object
"""
xyzcopy = self.xyz.copy()
topcopy = self.top.copy()
thecopy = pt.Trajectory(xyz=xyzcopy, top=topcopy,
velocity=self.velocities.copy() if self.velocities else None,
force=self.forces.copy() if self.velocities else None)
thecopy._boxes = self._boxes
thecopy.time = self.time
thecopy._life_holder = self._life_holder
thecopy._frame_holder = self._frame_holder
return thecopy
[docs] def make_index(self):
"""
Prepare the per-atom/per-residue index for the further trajectory processing;
"""
self.atoms = np.array([i for i in self.top.atoms])
self.residues = np.array([i for i in self.top.residues])
[docs] def write_frame(self, frames, outfile="", mask="", **kwarg):
"""
Save the trajectory to the file
Parameters
----------
frames : int, list_list or slice
The frame indices to be saved
outfile : str
The file name to save the trajectory
mask : str
The mask to select the atoms to be saved
kwarg : dict
Additional keyword arguments for the pytraj.save function
Examples
--------
>>> from nearl.io import Trajectory
>>> traj = Trajectory("traj.nc", "top.pdb")
>>> traj.write_frame(0, "frame0.pdb", mask=":1-10")
"""
if isinstance(frames, int):
# Write one frame to the file
frame_coords = np.array([self.xyz[frames]])
elif isinstance(frames, (list, tuple)):
tmp_indices = np.array(frames, dtype=int)
frame_coords = self.xyz[tmp_indices]
elif isinstance(frames, (slice, np.ndarray)):
tmp_indices = np.arange(self.n_frames)[frames]
frame_coords = self.xyz[tmp_indices]
if len(mask) > 0:
atom_sel = self.top.select(mask)
frame_coords = frame_coords[:, atom_sel, :]
top = self.top[atom_sel]
else:
top = self.top
if len(outfile) > 0:
tmp_traj = pt.Trajectory(xyz=frame_coords, top=top)
pt.save(outfile, tmp_traj, overwrite=True, **kwarg)
[docs] def add_dummy_points(self, coordinates, elements = None, frame_idx=0, outfile=""):
"""
Add additional points to a frame for visual inspection
Parameters
----------
coordinates : list
The list of coordinates to be added
elements : str or list
The list of element symbols to be added
frame_idx : int
The frame index to add the dummy points
outfile : str
The file name to save the new trajectory
"""
if elements is None:
elements = ["H"] * len(coordinates)
elif isinstance(elements, str):
elements = [elements] * len(coordinates)
elif isinstance(elements, (list, tuple)):
assert len(elements) == len(coordinates), "The length of elements should be the same as the coordinates"
elements = list(elements)
newframe = pt.Frame(self[frame_idx])
newtop = pt.Topology(self.top)
if self.residues is not None:
maxid = self.residues[-1].index
else:
maxid = [i.index for i in self.traj.top.residues][-1]
print(f"Before: {newframe.xyz.shape}")
for i, c in enumerate(coordinates):
therid = (maxid + i + 2)
theatom = pt.Atom(name='CL', charge=0.04, mass=17.0, resname="BOX", type="H", resid=therid)
theres = pt.Residue(name='BOX', resid=therid, chainID=2)
newtop.add_atom(theatom, theres)
newframe.append_xyz(np.array([c]).astype(np.float64))
print(f"After addition newframe {newframe.xyz.shape}, {newtop.n_atoms}")
thexyz = np.array([newframe.xyz])
# Write the new trajectory to the file
if len(outfile) > 0:
newtraj = pt.Trajectory(xyz=thexyz, top=newtop)
pt.save(outfile, newtraj, overwrite=True)
[docs]class MisatoTraj(Trajectory):
"""
Built-in implementation of the Misato HDF5 trajectory.
Original paper:
Siebenmorgen, T., Menezes, F., Benassou, S., Merdivan, E., Kesselheim, S., Piraud, M., Theis, F.J., Sattler, M. and Popowicz, G.M., 2023. MISATO-Machine learning dataset of protein-ligand complexes for structure-based drug discovery. bioRxiv, pp.2023-05.
Attributes
----------
pdbcode : str
The PDB code as the identity of the trajectory
topfile : str
The topology file of the trajectory
trajfile : str
The trajectory file of the trajectory
Notes
-----
The trajectory type has to be manually defined when pushing to the trajectory loader (see example).
Solvents and ions are stripped for the alignment of the coordinates with the topology.
This module uses relative path to the `misatodir` to retrieve the trajectory. The following files are required to load the trajectory:
1. The topology file ({misatodir}/parameter_restart_files_MD/{pdbcode}/production.top.gz)
2. The trajectory file ({misatodir}/MD.hdf5)
.. tip::
Since there is no explicit annotation for the ligand part, we use a ligand indices map to
extract the ligand part of the protein.
Examples
--------
>>> from nearl.io import TrajectoryLoader, MisatoTraj
>>> trajs = [('5WIJ', '/MieT5/DataSets/misato_database/'),
('4ZX0', '/MieT5/DataSets/misato_database/'),
('3EOV', '/MieT5/DataSets/misato_database/'),
('4K6W', '/MieT5/DataSets/misato_database/'),
('1KTI', '/MieT5/DataSets/misato_database/')
]
>>> trajloader = TrajectoryLoader(trajs, trajtype=MisatoTraj)
"""
def __init__(self, pdbcode, misatodir, **kwarg):
"""
Initialize the MisatoTraj object with the PDB code and the Misato directory.
Parameters
----------
pdbcode : str
The PDB code of the trajectory to be loaded
misatodir : str
The directory of the Misato MD simulation output
Notes
-----
The trajectory is firstly read as a pytraj.Trajectory object and then converted to the Nearl Trajectory object.
"""
# Needs dbfile and parm_folder;
self.topfile = f"{misatodir}/parameter_restart_files_MD/{pdbcode.lower()}/production.top.gz"
if not os.path.exists(self.topfile):
raise FileNotFoundError(f"The topology file of PDB {pdbcode} is not found ({self.topfile})")
self.trajfile = os.path.join(misatodir, f"MD.hdf5")
if not os.path.exists(self.trajfile):
raise FileNotFoundError(f"The trajectory file is not found ({self.trajfile})")
# NOTE: Get the PDB code in the standard format, lowercase and replace superceded PDB codes
self.pdbcode = pdbcode
if config.verbose():
printit(f"{self.__class__.__name__}: Loading trajectory {pdbcode} with topology {self.topfile}")
top = pt.load_topology(self.topfile)
# ! IMPORTANT: Remove water and ions to align the coordinates with the topology
res = set([i.name for i in top.residues])
if "WAT" in res:
top.strip(":WAT")
if "Cl-" in res:
top.strip(":Cl-")
if "Na+" in res:
top.strip(":Na+")
if config.verbose():
printit(f"{self.__class__.__name__}: Topology loaded with {top.n_atoms} atoms")
with h5py.File(self.trajfile, "r") as hdf:
keys = hdf.keys()
if pdbcode.upper() in keys:
coord = hdf[f"/{pdbcode.upper()}/trajectory_coordinates"]
if config.verbose():
printit(f"{self.__class__.__name__}: Trajectory loaded with {coord.shape[0]} frames and {coord.shape[1]} atoms")
# Parse frames (Only one from stride and frame_indices will take effect) and masks
if "stride" in kwarg.keys() and kwarg["stride"] is not None:
slice_frame = np.s_[::int(kwarg["stride"])]
elif "frame_indices" in kwarg.keys() and kwarg["frame_indices"] is not None:
slice_frame = np.s_[kwarg["frame_indices"]]
else:
slice_frame = np.s_[:]
if "mask" in kwarg.keys() and kwarg["mask"] is not None:
slice_atom = np.s_[top.select(kwarg["mask"])]
top = top[slice_atom]
else:
slice_atom = np.s_[:]
ret_traj = pt.Trajectory(xyz=coord[slice_frame, slice_atom, :], top=top)
else:
raise ValueError(f"Not found the key for PDB code {pdbcode.upper()} in the HDF5 trajectory file.")
if kwarg.get("superpose", False):
if kwarg.get("mask", None) is not None:
printit(f"{self.__class__.__name__}: Superpose the trajectory with mask {kwarg['mask']}")
pt.superpose(ret_traj, mask="@CA")
else:
printit(f"{self.__class__.__name__}: Superpose the trajectory with default mask @CA")
pt.superpose(ret_traj, mask="@CA")
assert ret_traj.xyz.shape.__len__() == 3 , f"Fatal: The shape of the trajectory {self.identity} is {ret_traj.xyz.shape}!!! It should be (n_frames, n_atoms, 3). "
printit("Result traj: ", ret_traj)
# Pytraj trajectory-based initialization
super().__init__(ret_traj)
@property
def identity(self):
"""
Return the PDB code as the identity of the trajectory
Returns
-------
str
The PDB code of the trajectory
"""
return utils.get_pdbcode(self.pdbcode)