#!/usr/bin/env python
# Copyright (C) 2015 University of Southern California and
# Nan Hua
#
# Authors: Nan Hua
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# Prerequests:
# IMP 2.5 is required for this module
__author__ = "Nan Hua"
__credits__ = ["Nan Hua","Ke Gong","Harianto Tjong"]
__license__ = "GPL"
__version__ = "0.0.1"
__email__ = "nhua@usc.edu"
import matrix as alabmatrix #alabmatrix
import utils as alabutils #alabutils
import time
import numpy as np
import IMP
import IMP.core
import IMP.container
import IMP.algebra
import IMP.atom
import random
import h5py
import logging
import io #system io
import re
[docs]class tadmodel(object):
"""
A wrapper to do IMP modeling in TAD level beads
Parameters
----------
probfile : alab.matrix.contactmatrix instant
probability matrix at TAD level, alab.matrix.contactmatrix hdf5 format is required
nucleusRadius : float
radius of nucleus, default 5000(nm)
contactRange : int
folds for surface-surface contact coefficient
level : loglevel,
default:None will record everything during caculation
debug, info, warning, error, critical are supported
"""
def __init__(self,probfile,nucleusRadius=5000.0,chromosomeOccupancy=0.2,contactRange=1,level=None):
self.probmat = alabmatrix.contactmatrix(probfile)
self.nbead = len(self.probmat)
#setup log
LEVELS={'debug':logging.DEBUG,'info':logging.INFO,'warning':logging.WARNING,'error':logging.ERROR,'critical':logging.CRITICAL}
loglevel = LEVELS.get(level,logging.NOTSET)
self.logger = logging.getLogger()
self.logger.setLevel(loglevel)
self._log_capture_string = io.StringIO()
chhandler = logging.StreamHandler(self._log_capture_string)
chhandler.setLevel(loglevel)
self.logger.addHandler(chhandler)
self.logger.setLevel(loglevel)
#CONST
#rscale = 1.38 # 20% occupancy
self.occupancy = chromosomeOccupancy #chromosome occupancy in nucleus, defined as diploid_domain_total_volume/nuclear_volume
self.nucleusRadius = nucleusRadius # nm
#cdensity = 107.45 # bp/nm assuming 197 bp/nucleosomes and 6 nucleosome/11 nm
#kscale = (0.75*15**2)**(1.0/3.0) # 3/4*r**2 where r=15nm
self.contactRange = contactRange # surface to surface distance scale of (r1+r2)
# for which 2 beads are considered as contact
self.genome = alabutils.genome(self.probmat.genome)
rho = self.occupancy*self.nucleusRadius**3/(2*sum(self.genome.info['length']))
#get radius of each bead
self.beadRadius = [(rho * (index['end'] - index['start'])) ** (1.0/3.0) for index in self.probmat.idx]
#self.beadRadius = [rscale * kscale * ((index['end'] - index['start'])/cdensity) ** (1.0/3.0) for index in self.probmat.idx]
#calculate the total volumn of DNA (diploid) and nucleus
dnavol = sum(4. * 3.1415/3. * np.array(self.beadRadius)**3) * 2
nucvol = (4*3.1415/3)*self.nucleusRadius**3
#And chromosome occupancy
dnaocc = dnavol / nucvol
self.logger.debug(u'Occupancy: %.2f with Rnuc %d'%(dnaocc,self.nucleusRadius))
#diploid Rb; 2xtotal haploid beads
self.beadRadius = self.beadRadius + self.beadRadius
# Chromosome territory apply
#self.genome = alabutils.genome(self.probmat.genome)
cscale=1.0
chrvol = nucvol * self.genome.info['length']/sum(self.genome.info['length'])/2
self.chromRadius=cscale*((chrvol/4*3/3.1415)**(1./3.))
#record starting time
self.model = IMP.Model()
self.chain = IMP.container.ListSingletonContainer(self.model)
self.restraints = IMP.RestraintSet(self.model)
#IMP.set_check_level(IMP.USAGE)
IMP.set_check_level(IMP.NONE)
IMP.set_log_level(IMP.SILENT)
#setup nucleus envelope
self.center = IMP.algebra.Vector3D(0,0,0)
#=========================
[docs] def updateScoringFunction(self,restraintset=None):
if restraintset is None:
self.scoringFunction = IMP.core.RestraintsScoringFunction(self.restraints)
else:
self.scoringFunction = IMP.core.RestraintsScoringFunction(restraintset)
return self.scoringFunction
[docs] def cache_coordinates(self):
i = -1
for p in self.chain.get_particles():
i += 1
pattr = IMP.core.XYZR(p)
self.xyz[i,0] = pattr.get_x()
self.xyz[i,1] = pattr.get_y()
self.xyz[i,2] = pattr.get_z()
self.r[i] = pattr.get_radius()
#---
[docs] def set_coordinates(self,coordinates=None):
if coordinates is None:
randomCoordinates = True
random.seed()
randomCap = IMP.algebra.Sphere3D(self.center, 1.5*self.nucleusRadius)
else:
randomCoordinates = False
self.xyz = np.zeros((2*self.nbead,3))
self.r = np.zeros((2*self.nbead,1))
for i in range(2*self.nbead):
if (randomCoordinates):
coor = IMP.algebra.get_random_vector_in(randomCap)
else:
coor = IMP.algebra.Vector3D(coordinates[i,0],coordinates[i,1],coordinates[i,2])
sph = IMP.algebra.Sphere3D(coor,self.beadRadius[i])
p0 = IMP.Particle(self.model)
self.chain.add(p0)
ma = IMP.atom.Mass.setup_particle(p0,1) #set mass = 1 g/mol
sp = IMP.core.XYZR.setup_particle(p0,sph)
sp.set_coordinates_are_optimized(True)
#-
self.cache_coordinates()
#--
[docs] def set_excludedVolume(self,ksping=1,slack=10):
# Set up excluded volume
self.excludedVolumeRestraint = IMP.core.ExcludedVolumeRestraint(self.chain,ksping,slack)
self.restraints.add_restraint(self.excludedVolumeRestraint) #1
return self.excludedVolumeRestraint
[docs] def set_nucleusEnvelope(self,kspring):
# Set up Nucleus cap #
ubnuc = IMP.core.HarmonicUpperBound(self.nucleusRadius,kspring)
ssnuc = IMP.core.DistanceToSingletonScore(ubnuc,self.center) #center-to-center distance
self.nucleusEnvelopeRestraint = IMP.container.SingletonsRestraint(ssnuc,self.chain)
self.restraints.add_restraint(self.nucleusEnvelopeRestraint) #2
return self.nucleusEnvelopeRestraint
[docs] def set_consecutiveBeads(self,lowprob=0.1):
# Setup consecutive bead upper bound constraints
self.consecutiveBeadRestraints = self._get_consecutiveBeadRestraints(lowprob=lowprob,kspring=10)
for rs in self.consecutiveBeadRestraints: #3
self.restraints.add_restraint(rs)
self.logger.debug(u"Total consecutive bead restraints %d"%(len(self.consecutiveBeadRestraints)))
print "Total consecutive bead restraints %d"%(len(self.consecutiveBeadRestraints))
return self.consecutiveBeadRestraints
[docs] def set_fmaxRestraints(self,kspring=1):
self.fmaxRestraints = self._get_fmaxRestraints(kspring=kspring)
for rs in self.fmaxRestraints: #4
self.restraints.add_restraint(rs)
self.logger.debug(u"Total fmax restraints %d"% (len(self.fmaxRestraints)))
print "Total fmax restraints %d"% (len(self.fmaxRestraints))
return self.fmaxRestraints
#=========================chromosome territory functions
[docs] def CondenseChromosome(self,rrange=0.5):
"""
Collapse chains around centromere beads
Parameters
----------
rrange : float
scale parameter in [0,1] for the radius limit
"""
i = -1
for chrom in self.genome.info['chrom']:
i += 1
#find centromere
cenbead = np.flatnonzero((self.probmat.idx['chrom'] == chrom) & (self.probmat.idx['flag'] == 'CEN'))[0]
p0A=IMP.core.XYZ(self.chain.get_particles()[cenbead]) #fetch indexes
p0B=IMP.core.XYZ(self.chain.get_particles()[cenbead+self.nbead])
coorA = p0A.get_coordinates()
coorB = p0B.get_coordinates()
rlimit = self.chromRadius[i]*rrange
for j in np.flatnonzero(self.probmat.idx['chrom'] == chrom):
p1A=IMP.core.XYZ(self.chain.get_particles()[j])
p1B=IMP.core.XYZ(self.chain.get_particles()[j+self.nbead])
dx=(2*random.random()-1)*rlimit
dy=(2*random.random()-1)*rlimit
dz=(2*random.random()-1)*rlimit
randA = coorA
randA[0] += dx
randA[1] += dy
randA[2] += dz
dx=(2*random.random()-1)*rlimit
dy=(2*random.random()-1)*rlimit
dz=(2*random.random()-1)*rlimit
randB = coorB
randB[0] += dx
randB[1] += dy
randB[2] += dz
p1A.set_coordinates(randA) #placed nearby cen
p1B.set_coordinates(randB) #placed nearby cen
#--
#--
#=============================end chromosome terriory
#=============================restraint set functions
def _get_beadDistanceRestraint(self,bead1,bead2,dist,kspring=1):
"""
get distance upper bound restraint to bead1 and bead2
Parameters
-----------
bead1,bead2 : int
bead id
dist : int
distance upper bound
kspring : int
harmonic constant k
Return
------
restraint
"""
restraintName = "Bead (%d,%d) : %f k = %.1f" % (bead1,bead2,dist,kspring)
ds = IMP.core.SphereDistancePairScore(IMP.core.HarmonicUpperBound(dist,kspring))
pr = IMP.core.PairRestraint(self.model,ds,IMP.ParticlePair(self.chain.get_indexes()[bead1],self.chain.get_indexes()[bead2]),restraintName)
return pr
def _get_consecutiveBeadRestraints(self,lowprob=0.1,kspring=10):
"""
calculate distance constraints to consecutive beads
Parameters
-----------
lowprob : Min probility for consecutive beads
"""
consecRestraints = []
for i in range(self.nbead-1):
if self.probmat.idx[i]['chrom'] != self.probmat.idx[i+1]['chrom']:
continue
p = max(self.probmat.matrix[i,i+1],lowprob)
b1 = i;b2 = i+1
b3 = b1 + self.nbead
b4 = b2 + self.nbead
#calculate upper bound for consecutive domains
consecDist = consecutiveDistanceByProbability(self.beadRadius[b1],self.beadRadius[b2],p,self.contactRange+1)
# set k = 10 to be strong interaction
rs1 = self._get_beadDistanceRestraint(b1,b2,consecDist,kspring)
rs2 = self._get_beadDistanceRestraint(b3,b4,consecDist,kspring)
#push restraint into list
consecRestraints.append(rs1)
consecRestraints.append(rs2)
if i>0 and self.probmat.idx[i]['chrom'] == self.probmat.idx[i-1]['chrom'] and self.probmat.idx[i]['flag']!="domain" and self.probmat.idx[i-1]!="gap":
p = max(self.probmat.matrix[i-1,i+1],lowprob)
b1 = i-1;b2 = i+1
b3 = b1 + self.nbead
b4 = b2 + self.nbead
consecDist = consecutiveDistanceByProbability(self.beadRadius[b1],self.beadRadius[b2],p,self.contactRange+1)
rs1 = self._get_beadDistanceRestraint(b1,b2,consecDist,kspring)
rs2 = self._get_beadDistanceRestraint(b3,b4,consecDist,kspring)
consecRestraints.append(rs1)
consecRestraints.append(rs2)
#---
#-------
return consecRestraints
#--------------probmat restraints
def _get_minPairRestraints(self,bpair,dist,minnum,kspring = 1):
"""
Return restraint decorater of min pair restraints
for minnum out of bpairs are satisfied
Parameters
-----------
bpair : tuple list
contact pair candidates
dist : int
distance upperbound for contact
minnum : int
minimun number of pairs required to satisify
contactRange : int
scale of (r1+r2) where a contact is defined
"""
ambi = IMP.container.ListPairContainer(self.model)
restraintName = "Bead [ "
for p in bpair:
restraintName += "(%d,%d) " % (p[0],p[1])
p0 = self.chain.get_particles()[p[0]]
p1 = self.chain.get_particles()[p[1]]
pair = IMP.ParticlePair(p0,p1)
ambi.add(pair)
restraintName += "] : %f k = %.1f" % (dist,kspring)
ds = IMP.core.SphereDistancePairScore(IMP.core.HarmonicUpperBound(dist,kspring))
minpr = IMP.container.MinimumPairRestraint(ds,ambi,minnum,restraintName)
return minpr
def _get_fmaxRestraints(self,kspring=1):
"""
return restraints list for prob=1.0
"""
fmaxrs = []
for i in range(self.nbead):
for j in range(i+1,self.nbead):
if self.probmat.matrix[i,j] <= 0.999:
continue
if self.probmat.idx[i]['chrom'] == self.probmat.idx[j]['chrom']: #intra
if j-i > 1:
rs1 = self._get_beadDistanceRestraint(i,j,self.contactRange*(self.beadRadius[i]+self.beadRadius[j]),kspring)
rs2 = self._get_beadDistanceRestraint(i+self.nbead,j+self.nbead,self.contactRange*(self.beadRadius[i]+self.beadRadius[j]),kspring)
fmaxrs.append(rs1)
fmaxrs.append(rs2)
else: #inter
bpair = [(i,j),(i,j+self.nbead),(i+self.nbead,j),(i+self.nbead,j+self.nbead)] #bead pair
minprrs = self._get_minPairRestraints(bpair,self.contactRange*(self.beadRadius[i]+self.beadRadius[j]),minnum=2,kspring=kspring)
fmaxrs.append(minprrs)
#--
#--
#--
return fmaxrs
def _get_contactRestraints(self,actdist,kspring=1):
"""
return restraints list given actdist list
Parameters
----------
actdist : array like
Activation distanct array [i,j,dist]
"""
intracontactrs = []
intercontactrs = []
for (b1,b2,dcutoff) in actdist:
lastdist1 = surfaceDistance(self.chain,(b1,b2))
lastdist2 = surfaceDistance(self.chain,(b1+self.nbead,b2+self.nbead))
if self.probmat.idx[b1]['chrom'] == self.probmat.idx[b2]['chrom']: #intra chromosome
if max(lastdist1,lastdist2) <= dcutoff:
rs1 = self._get_beadDistanceRestraint(b1,b2,self.contactRange*(self.beadRadius[b1]+self.beadRadius[b2]),kspring)
rs2 = self._get_beadDistanceRestraint(b1+self.nbead,b2+self.nbead,self.contactRange*(self.beadRadius[b1]+self.beadRadius[b2]),kspring)
intracontactrs.append(rs1)
intracontactrs.append(rs2)
elif min(lastdist1,lastdist2) <= dcutoff:
bpair = [(b1,b2),(b1+self.nbead,b2+self.nbead)] #bead pair
minprrs = self._get_minPairRestraints(bpair,self.contactRange*(self.beadRadius[b1]+self.beadRadius[b2]),minnum=1)
intracontactrs.append(minprrs)
#else none contact enfored
else:
lastdist3 = surfaceDistance(self.chain,(b1,b2+self.nbead))
lastdist4 = surfaceDistance(self.chain,(b1+self.nbead,b2))
sdists = sorted([lastdist1,lastdist2,lastdist3,lastdist4])
if sdists[1] <= dcutoff:
nchoose = 2
elif sdists[0] <= dcutoff:
nchoose = 1
else: #non enforced
continue
bpair = [(b1,b2),(b1,b2+self.nbead),(b1+self.nbead,b2),(b1+self.nbead,b2+self.nbead)] #bead pair
minprrs = self._get_minPairRestraints(bpair,self.contactRange*(self.beadRadius[b1]+self.beadRadius[b2]),minnum=nchoose)
intercontactrs.append(minprrs)
return intracontactrs, intercontactrs
#==========================end probmat restraint
#-------------------------modeling
[docs] def cgstep(self,step,silent=False):
"""
perform conjugate gradient on model using scoring function sf
"""
t0 = time.time()
self.cache_coordinates()
o = IMP.core.ConjugateGradients(self.model)
o.set_check_level(IMP.USAGE)
o.set_scoring_function(self.scoringFunction)
try:
s = o.optimize(step)
except:
#tt = str(np.random.random_integers(1000))+'.txt'
#np.savetxt(tt,self.xyz,fmt='%.4f')
#print tt
#self.cache_coordinates()
#np.savetxt(tt+'.af',self.xyz,fmt='%.4f')
s = o.optimize(step)
if not silent:
self.logger.info(u'CG %d steps done @ %.1fs score = %f'%(step,alabutils.timespend(t0),s))
print 'CG %d steps done @ %.1fs score = %f'%(step,alabutils.timespend(t0),s)
return s
[docs] def mdstep(self,t,step,gamma=0.1,silent=False):
t0 = time.time()
#self.cache_coordinates()
xyzr = self.chain.get_particles()
o = IMP.atom.MolecularDynamics(self.model)
o.set_scoring_function(self.scoringFunction)
md = IMP.atom.VelocityScalingOptimizerState(self.model,xyzr,t)
#md = IMP.atom.LangevinThermostatOptimizerState(self.model,xyzr,t,gamma)
o.add_optimizer_state(md)
s = o.optimize(step)
o.remove_optimizer_state(md)
if not silent:
self.logger.info(u'MD %d steps done @ %.1fs score = %f'%(step,alabutils.timespend(t0),s))
print 'MD %d steps done @ %.1fs score = %f'%(step,alabutils.timespend(t0),s)
return s
[docs] def mdstep_withChromosomeTerritory(self,t,step):
"""
perform an mdstep with chromosome terriory restraint
Parameters
-----------
t : int
temperature
step : int
optimization steps
"""
t0 = time.time()
chrContainers=[]
for chrom in self.genome.info['chrom']:
chromContainer1=IMP.container.ListSingletonContainer(self.model,'Container %s s1'%chrom)
chromContainer2=IMP.container.ListSingletonContainer(self.model,'Container %s s2'%chrom)
for j in np.flatnonzero(self.probmat.idx['chrom'] == chrom):
p=self.chain.get_particle(j)
chromContainer1.add(p)
p=self.chain.get_particle(j+self.nbead)
chromContainer2.add(p)
chrContainers.append(chromContainer1)
chrContainers.append(chromContainer2)
# set each chromosome to different containers
for st in range(step/10):
ctRestraintSet = IMP.RestraintSet(self.model)
for i in range(len(self.genome.info['chrom'])):
comxyz = centerOfMass(chrContainers[2*i])
comc = IMP.algebra.Vector3D(comxyz[0],comxyz[1],comxyz[2])
ub = IMP.core.HarmonicUpperBound(self.chromRadius[i],0.2)
ss = IMP.core.DistanceToSingletonScore(ub,comc)
ct_rs = IMP.container.SingletonsRestraint(ss,chrContainers[2*i])
ctRestraintSet.add_restraint(ct_rs)
#another one
comxyz = centerOfMass(chrContainers[2*i+1])
comc = IMP.algebra.Vector3D(comxyz[0],comxyz[1],comxyz[2])
ub = IMP.core.HarmonicUpperBound(self.chromRadius[i],0.2)
ss = IMP.core.DistanceToSingletonScore(ub,comc)
ct_rs = IMP.container.SingletonsRestraint(ss,chrContainers[2*i+1])
ctRestraintSet.add_restraint(ct_rs)
self.updateScoringFunction([self.restraints,ctRestraintSet])
s = self.mdstep(t,10,silent=True)
#---
#s = mdstep(model,chain,sf,t,1000)
self.updateScoringFunction()
self.logger.info(u'CT-MD %d steps done @ %.1fs score = %f'%(step,alabutils.timespend(t0),s))
print 'CT-MD %d steps done @ %.1fs score = %f'%(step,alabutils.timespend(t0),s)
return s
[docs] def SimulatedAnnealing(self,hot,cold,nc=10,nstep=500):
"""
perform a cycle of simulated annealing from hot to cold
"""
t0 = time.time()
dt = (hot-cold)/nc
for i in range(nc):
t = hot-dt*i
s = self.mdstep(t,nstep,silent=True)
self.logger.info(u" Temp=%d Step=%d Time=%.1fs Score = %.8f"%(t,nstep,alabutils.timespend(t0),s))
print " Temp=%d Step=%d Time=%.1fs Score = %.8f"%(t,nstep,alabutils.timespend(t0),s)
s= self.mdstep(cold,nstep,silent=True)
self.logger.info(u" Temp=%d Step=%d Time=%.1fs Score = %.8f"%(cold,nstep,alabutils.timespend(t0),s))
print " Temp=%d Step=%d Time=%.1fs Score = %.8f"%(cold,nstep,alabutils.timespend(t0),s)
self.cgstep(100,silent=True)
[docs] def SimulatedAnnealing_Scored(self,hot,cold,nc=10,nstep=500,lowscore=10):
"""perform a cycle of simulated annealing but stop if reach low score"""
t0 = time.time()
dt = (hot-cold)/nc
for i in range(nc):
t = hot-dt*i
self.mdstep(t,nstep,silent=True)
score = self.cgstep(100,silent=True)
self.logger.info(u" Temp=%s Step=%s Time=%.1fs Score=%.8f"%(t,nstep,alabutils.timespend(t0),score))
print " Temp=%s Step=%s Time=%.1fs Score=%.8f"%(t,nstep,alabutils.timespend(t0),score)
if score < lowscore:
return t,score
break
self.mdstep(cold,300,silent=True)
score = self.cgstep(100,silent=True)
self.logger.info(u" Temp=%s Step=%s Time=%.1fs Score=%.8f"%(cold,nstep,alabutils.timespend(t0),score))
print " Temp=%s Step=%s Time=%.1fs Score=%.8f"%(cold,nstep,alabutils.timespend(t0),score)
return t,score
[docs] def shrinkingOptimization(self,drange,shrinkScore,minscore,interScale):
radStartShrink = int((1+drange)*self.nucleusRadius)
radEndShrink = int((1-drange)*self.nucleusRadius)
nucExpand=drange+1.3
radExpand = int(nucExpand*self.nucleusRadius)
incr = int(interScale*self.nucleusRadius)
nucrads = range(radEndShrink,radStartShrink,incr)
nucrads.append(radStartShrink)
nucrads = sorted(nucrads, reverse=True)
self.logger.debug(u"Optimization with decreasing NE %s"%(nucrads))
print "Optimization with decreasing NE %s"%(nucrads)
expanded=False
self.logger.debug(u"\t--- Start shrinking ---")
print "\t--- Start shrinking ---"
self.restraints.remove_restraint(self.nucleusEnvelopeRestraint) #will be replaced by temporary
for r_nuc in nucrads:
ubnuc = IMP.core.HarmonicUpperBound(r_nuc,1.0)
ssnuc = IMP.core.DistanceToSingletonScore(ubnuc,self.center)
rnuc = IMP.container.SingletonsRestraint(ssnuc,self.chain)
self.updateScoringFunction([self.restraints,rnuc])
temp, score = self.SimulatedAnnealing_Scored(2000,300,nc=2, lowscore=shrinkScore)
score = self.cgstep(500,silent=True)
self.logger.debug(u"Score optimizing temporary NE %dmm done: %.8f"%(r_nuc,score))
print "Score optimizing temporary NE %dmm done: %.8f"%(r_nuc,score)
if score > minscore:
if expanded == False: #haven't done expansion steps before
self.logger.debug(u"\t --- Start expanding ---")
print "\t --- Start expanding ---"
ubexpend = IMP.core.HarmonicUpperBound(radExpand,1.0)
ssexpend = IMP.core.DistanceToSingletonScore(ubexpend,self.center)
rexpend = IMP.container.SingletonsRestraint(ssexpend,self.chain)
expanded = True
self.updateScoringFunction([self.restraints,rexpend])
temp, score = self.SimulatedAnnealing_Scored(75000,500,nstep=1000, nc=2, lowscore=shrinkScore)
self.logger.debug(u"\t ...back to shrinking...")
print "\t ...back to shrinking..."
else:
self.logger.debug(u'\t Score is still high after expansion: %.8f' %(score))
print '\t Score is still high after expansion: %.8f' %(score)
break
#-
#--
# Putting back original radius cap #
self.restraints.add_restraint(self.nucleusEnvelopeRestraint) #2
self.updateScoringFunction()
temp, score = self.SimulatedAnnealing_Scored(2000,300, nc=2, nstep=1000, lowscore=1)
self.logger.debug(u"Recover nucleus %.1f nm at T=%.1f K, score: %.8f"%(self.nucleusRadius,temp,score))
print "Recover nucleus %.1f nm at T=%.1f K, score: %.8f"%(self.nucleusRadius,temp,score)
return 0
#==================utils
[docs] def evaluateRestraints(self,restraintset,tolerance=0.05):
total = 0
for rs in restraintset:
score = rs.get_score()
rsstr = rs.get_name()
dist,k = re.findall(': (\d+.\d+) k = (\d+.\d+)',rsstr)[0]
dist = float(dist)
k = float(k)
if (2*score/k)**0.5 > tolerance*dist:
total += 1
self.logger.warning(u"%s %f" % (rsstr,score))
print "%s %f" % (rsstr,score)
return total
[docs] def savepym(self,filename):
pymfile = IMP.display.PymolWriter(filename)
g = IMP.core.XYZRsGeometry(self.chain)
g.set_name('beads')
g.set_color(IMP.display.Color(1,1,1))
pymfile.add_geometry(g)
[docs] def savepym_withChromosome(self,filename,s=1,v=1):
import colorsys
pymfile = IMP.display.PymolWriter(filename)
nchrom = len(self.genome.info['chrom'])
h = np.array(range(nchrom+1))*2.0/(nchrom+1)/3
i = -1
for chrom in self.genome.info['chrom']:
i += 1
chromContainer1=IMP.container.ListSingletonContainer(self.model,'Container %s s1'%chrom)
chromContainer2=IMP.container.ListSingletonContainer(self.model,'Container %s s2'%chrom)
for j in np.flatnonzero(self.probmat.idx['chrom'] == chrom):
p=self.chain.get_particle(j)
chromContainer1.add(p)
p=self.chain.get_particle(j+self.nbead)
chromContainer2.add(p)
chrColor = colorsys.hsv_to_rgb(h[i],s,v)
color = IMP.display.Color(chrColor[0],chrColor[1],chrColor[2])
g1 = IMP.core.XYZRsGeometry(chromContainer1)
g1.set_name(chrom+' s1')
g1.set_color(color)
pymfile.add_geometry(g1)
g2 = IMP.core.XYZRsGeometry(chromContainer2)
g2.set_name(chrom+' s2')
g2.set_color(color)
pymfile.add_geometry(g2)
[docs] def saveCoordinates(self,filename,prefix):
import cPickle
import cStringIO
if (filename[-4:] != '.hms'):
filename += '.hms'
log_contents = self._log_capture_string.getvalue()
#self._log_capture_string.close()
#print log_contents
pymhandler = cStringIO.StringIO()
self.savepym_withChromosome(pymhandler)
#xyz = np.zeros((len(self.chain.get_particles()),3))
#r = np.zeros((len(self.chain.get_particles()),1))
#i = -1
#for p in self.chain.get_particles():
#i += 1
#pattr = IMP.core.XYZR(p)
#xyz[i,0] = pattr.get_x()
#xyz[i,1] = pattr.get_y()
#xyz[i,2] = pattr.get_z()
#r[i] = pattr.get_radius()
#---
self.cache_coordinates()
h5f = h5py.File(filename,'a')
if not 'genome' in h5f.keys():
h5f.create_dataset('genome',data = cPickle.dumps(self.probmat.genome))
if not 'idx' in h5f.keys():
h5f.create_dataset('idx',data = self.probmat.idx,compression='gzip')
if prefix in h5f.keys():
grp = h5f[prefix]
if 'xyz' in grp.keys():
grp['xyz'][...] = self.xyz
else:
grp.create_dataset('xyz',data=self.xyz,compression='gzip')
if 'r' in grp.keys():
grp['r'][...] = self.r
else:
grp.create_dataset('r',data=self.r,compression='gzip')
if 'log' in grp.keys():
grp['log'][...] = cPickle.dumps(log_contents)
else:
grp.create_dataset('log',data=cPickle.dumps(log_contents))
if 'pym' in grp.keys():
grp['pym'][...] = cPickle.dumps(pymhandler.getvalue())
else:
grp.create_dataset('pym',data=cPickle.dumps(pymhandler.getvalue()))
else:
grp = h5f.create_group(prefix)
grp.create_dataset('xyz',data=self.xyz,compression='gzip')
grp.create_dataset('r',data=self.r,compression='gzip')
grp.create_dataset('log',data=cPickle.dumps(log_contents))
grp.create_dataset('pym',data=cPickle.dumps(pymhandler.getvalue()))
h5f.close()
#====================================end tadmodel
def consecutiveDistanceByProbability(r1,r2,p,xcontact=2):
"""
Upper bound distance constraints for consecutive domains
return surface to surface distance.
Parameters
-----------
r1,r2 : int
Radius for beads
p : float Probability for contact
xcontact : int
scaling of (r1+r2) where a contact is defined. By default,
center to center distance D = 2*(r1+r2) is defined as contact.
"""
if p > 0:
d = (r1+r2)*(1. + (xcontact**3-1)/p)**(1./3.)
else:
d = 100*(r1+r2) # just a big number
return d-r1-r2 # surface to surface distance
def centerOfMass(chain):
xyzm = np.zeros((len(chain.get_particles()),4))
i = -1
for p in chain.get_particles():
i += 1
pattr = IMP.core.XYZR(p)
xyzm[i,3] = pattr.get_radius()**3
xyzm[i,0] = pattr.get_x()*xyzm[i,3]
xyzm[i,1] = pattr.get_y()*xyzm[i,3]
xyzm[i,2] = pattr.get_z()*xyzm[i,3]
#---
mass = sum(xyzm[:,3])
return (sum(xyzm[:,0])/mass,sum(xyzm[:,1])/mass,sum(xyzm[:,2])/mass)
def surfaceDistance(chain,pair):
'''
calculate surface distance for a particle index pair in chain container
'''
p1 = chain.get_particles()[pair[0]]
p2 = chain.get_particles()[pair[1]]
p1attr = IMP.core.XYZR(p1)
p2attr = IMP.core.XYZR(p2)
x1,y1,z1,r1 = [p1attr.get_x(),p1attr.get_y(),p1attr.get_z(),p1attr.get_radius()]
x2,y2,z2,r2 = [p2attr.get_x(),p2attr.get_y(),p2attr.get_z(),p2attr.get_radius()]
return np.linalg.norm([x1-x2,y1-y2,z1-z2])-r1-r2
#============================i/o
def readCoordinates(filename,prefix):
h5f = h5py.File(filename,'r')
try:
grp = h5f[prefix]
except:
raise RuntimeError, "Group name %s doesn't exist!"%(prefix)
xyz = h5f[prefix]['xyz'][:]
r = h5f[prefix]['r'][:]
h5f.close()
return xyz,r