The sourcecode for the drift model in a spatially structured population, still poorly documented and with some redundancies.

import random as rd, copy as cp, Tkinter as tk, matplotlib.pyplot as plt, collections as ct
from numpy import log10, ceil, arange


class Pop(list):
    """A class for modelling spatially constrained genetic drift.
    Instantiates a list-like object of depth 2 as an initial
    population. Using the method run(), we simulate genetic drift
    in a spatially structured population where dying individuals
    are preferentially replaced by their neighbours. Includes methods
    to visualize the result.
    """
    
    def __init__(self, pop, het=0, jump = 1, scale = 1, verbose=False):
        """Create a list of length 'pop_size', with individuals
        of 'het(erogeneity)' different genotypes.
        Parameters:
        pop (integer, list, or list of lists) - population,
          or population size if an integer
        het (integer) - number of different (potential) genotypes
          population, can be ommitted when input is in list
          format and the range of possible genotypes equals the
          range of genotypes in the initial populations"""
        print "Creating new population..."
        
        self.verbose = verbose
        
        pop = self.input_decode(pop, het)
        # use number of different types in generation 0
        # as variable 'het' when no  other value is explicitly
        # stated:
        self.pop_size = len(pop[0])
        
        # variable "jump" to get scales correct
        # by storing how many generations have
        # been jumped in a summary:
        try: self.jump = pop.jump 
        except AttributeError: 
            self.jump = jump
        
        # miscellaneous attributes
        self.get_defaults(pop, het)
    
    
    def input_decode(self, pop, het, randrange=rd.randrange):
        # if pop is integer, create new population of length
        # pop from scratch
        if type(pop) == int:
            self.extend([[randrange(het) for i in xrange(pop)]])
        
        # else, expect list type input
        else:
            try:
                # if pop is flat list, converte to embedded
                # list
                if type(pop[0]) == int:
                    self.append(pop)
                
                # if pop has correct dimension, do nothing
                elif type(pop[0][0]) == int:
                    self.extend(pop)
                else:
                    print "It looks like the list is too deep"
            except TypeError:
                print "cannot interpret input type"
        try:
            self.het = pop.het # heterogeneity, number of
                # potential genotypes, copied from input
                # if input is already of type Pop
        except AttributeError:
            if het:
                self.het = het # het as stated if
                    # a non-zero value is given
            else:
                self.het = len(set(self[0])) # het
                    # = actual number of different
                    # genotypes otherwhise
            
        return self 
    
    def get_defaults(self, pop, het):
        
                
        try: self.pop_scale = pop.pop_scale
        except AttributeError:
            self.pop_scale = 1
            
        
        try:
            self.het = pop.het
        
        except AttributeError:
            if self.pop_scale <= 1:
                try:
                    self.het = len(set(pop[0]))
                    if max(pop[0]) != self.het-1:
                        correct = dict((list(set(pop[0]))[i], i) for i in range(self.het))
                        self[0] = ([[correct[entry] for entry in gen] for gen in pop])
                        
                except KeyError:
                    self.het = het
                    
            else:
                self.het = het
                
        
        self.freqs = self.reset_freqs(self)
        
        # Re-use colormap of input if applicable,
        # else get default colormap
        try:
            self.col = pop.col
            self.colors = pop.colors
        except AttributeError:
            self.set_colors()

    def set_colors(self, colormap=None):
            """Function allowing the selection of a different colormap,
            in list format; internally used to acquire default colors."""
            if colormap:
                self.col = colormap
            else:
                self.col = ["black", "red", "blue", "green", "purple", "orange", "yellow", "pink", "cyan","grey"]
            self.colors = dict()
            for j in range(int(ceil(float(self.het)/len(self.col)))):
                self.colors.update((len(self.col)*j+i, self.col[i]) for i in range(len(self.col)))
    
        
    def reset_freqs(self, pop):
        # calculate frequencies for all generations in input
        self.gencounter = -1
        return [pop.freq_calc(gen) for gen in range(len(pop))]
    
    def freq_calc(self, gen):
        # Calculate frequencies for an individual generation
        Ct = ct.Counter()
        for i in self[gen]:
            Ct[i] += 1
            
        self.gencounter +=1
        return Ct
    
    def summary(self, dist=0, pop_reduce=0):
        # produce shortened version,
        # primarily for display purposes
        if dist < 1: # distance/jump between generations that are included
                     # defaults to such that the number in the output does not
                     # exceed 1000
            dist = self.sumstandards()[0]
        if pop_reduce < 1: # distance between adjacent individuals
                           # within generation, default such that
                           # number does not exceed 500
            pop_reduce = self.sumstandards()[1]
        summary = cp.copy(self)
        summary = Pop([[self[i][j] for j in range(0,self.pop_size, pop_reduce)] for i in range(0,len(self), dist)], jump=dist, scale=pop_reduce, het=self.het)
        summary.pop_scale = pop_reduce * self.pop_scale
        summary.freqs = [self.freqs[i] for i in range(0,len(self.freqs),dist)]
        summary.colors, summary.het = self.colors, self.het
        
        
        return summary
    
    def sumstandards(self):
        # default values for 'summary()' below, as a function of the population's size
        return (int(round(len(self)/1000.)), int(round(self.pop_size/500.)))
    
        
    def run(self, death_rate=.5, distance=1,boundary=False, migrations=None, limit=-1, randrange=rd.randrange, uniform=rd.uniform, verbose=False, mutations=None, final_state=1):
        """Core function.
        Parameters:
        :: death_rate :: (float) - between 0 and 1
        :: distance:: (float or int) - standard deviation of position
           of a parent in list relative to the individual it replaces
        :: boundary :: (boolean) - whether the population bends in
           on itself on the edges (False, default) or forms a line
           with disjoint ends (True)
        :: migrations:: (tuple or None): tuple of two integers,
           1st for the size of the area over which migration-induced
           perturbances occur, 2nd for the approximate frequency of such
           migration events
        :: limit :: (int) - Countdown to force the simulation to abort
           before uniformity after a fixed number of generations.
           Negative values produce a simulation until the desired
           degree of uniformity (final_state) is reached
        :: final_state :: (int) - degree of uniformity at which to
           abort the simulation, default 1 (for a uniform population).
           '0' should only be chosen with small populations and
           positive values for mutations
           
        """
        print 'running simulation...'
        self.distance = distance
        self.verbose = verbose
        
        # Abort when a) limit of generations or b) uniformity
        # is reached
        current = cp.copy(self[self.gencounter-1])
        while limit and len(self.freqs[self.gencounter]) > final_state:
            # determine whether migration event happens
            try:
                if migrations and randrange(migrations[1]) == 0:
                    self.migrate(migrations[0])
            except TypeError or IndexError: pass
            
            for i in rd.sample(arange(self.pop_size), int(self.pop_size * death_rate)):
                if death_rate == 1 or uniform(0,1) < death_rate:
                    
                    # mutate at given rate if applicable
                    if mutations and uniform(0,1) < mutations:
                        mutant = randrange(self.het)
                        print "Generation", self.gencounter, "mutating", current[i], "to", mutant
                        current[i] = mutant
                    
                    # if running a version without mutations, or otherwise if
                    # no mutation is to occur, proceed to select a parent
                    # and produce a clone of it
                    else:
                        step = self.get_step()
                        while boundary and (i + step < 0 or i+step >= self.pop_size):
                            step = self.get_step()
                        try: current[i] = self[-1][(i + step)]
                        except IndexError:
                            try:
                                current[i] = self[-1][(i + step)-self.pop_size]
                            except IndexError:
                                current[i] = self[-1][(i + step)%self.pop_size]
            
            # at the end of the cycle, store current population
            self.append(cp.copy(current))
            current_freqs = self.freq_calc(self.gencounter+1)
            self.freqs.append(current_freqs)
            limit -= 1 # countdown for limited simulations
            if verbose:
                print self.gencounter, "generations calculated"
                print "current freqs:"
                print current_freqs
        if verbose:
            print self.freqs[-1]
        
    def get_step(self, gauss = rd.gauss):
        """Helper function for run(), selects parent for the 
        successor of a dying individual."""
        if self.distance:
            return int(round(gauss(0,self.distance)))
        else:
            return rd.choice(arange(-self.pop_size,0))
    
    def migrate(self, width, randrange = rd.randrange):
        """Helper function for run(), only used when the optional
        parameter 'migrations' is set. Shuffles a randomly selected
        subset of the population with paramtrized size at
        parametrized intevals. Bends over the edge of the population
        even when boundaries==True.
        """
        try:
            
            # Default behaviour coded by 0: size of area over which migration
            # takes place is random number between 0 and pop_size/3
            if width == 0:
                width = self.pop_size/3
            
            # starting with a random index
            start = randrange(self.pop_size)
            end = start + randrange(width)
            print "Gen.", len(self), "migrating between", start, end
            
            # creating a copy of a slice of the population with given
            # length as the migration hotspot (with extra tricks to
            # bend over corners).
            if end < self.pop_size:
                hotspot = self[-1][start:end]
            else:
                hotspot = self[-1][start:]+self[-1][:end - self.pop_size]
            
            # shuffle result
            rd.shuffle(hotspot)
            
            # re-insert shuffled result:
            if end <= self.pop_size:
                self[-1][start:end] = hotspot
            else:
                self[-1][:end-self.pop_size] = hotspot[self.pop_size-start:]
                self[-1][start:] = hotspot[:self.pop_size-start]
        
        # if size of mutation hotspot is too large, try half the size
        except ValueError:
            print "Size of migration hotspot too large, trying with size/2"
            self.migrate(width/2)
    
    
    def area_plot(self, length=None, start=0, h_stretch=False, v_stretch=False, marg=50, unique=True, col=None):
        """Plotting the population over time and space, using Tkinter."""
        col = self.col
        
        
        if (unique and self.het > len(col) and
          sum([i != 0 for i in self.freqs[-1]]) <= len(col)):
            self.uniq()
            
        
        
        if length == None or length > len(self):
            length = len(self)
        if start < 0 and len(self)+start < length:
            start = len(self)+start
        elif start < 0 or start >= length:
            start = 0
        reallength = length - start
        
        screenwidth = 1100. - marg*2
        if length > screenwidth or h_stretch==True:
            C_width = screenwidth
            x = screenwidth/reallength
        else:
            C_width = reallength
            x = 1
        
        screenheight = 660. - marg * 2
        if self.pop_size > screenheight or v_stretch==True:
            C_height = screenheight
            y = screenheight/self.pop_size
        else:
            C_height = self.pop_size
            y = 1
        
        app = tk.Tk()
        C = tk.Canvas(app, width=C_width + marg * 2, height=C_height + marg * 2)
        C.grid()
        
        colors = self.colors
        hundreds = [w for w in range(0,len(self), 100)]
        for j in xrange(start,length):
            if j in hundreds:
                print "Generation "+str(int(j*self.jump))+" plotted"
            gen = self[j]
            for k in xrange(self.pop_size):
                try:
                    C.create_rectangle(marg + x*(j-start), marg + y*k, marg + x * (j+1-start), marg + y * (k+1), width = 0, fill = colors[gen[k]])
                except IndexError:
                    print "IndexError at: generation", k, "of", len(gen)
                    print "total (cycled) colors", len(colors)
        
        x_dim = int(log10(self.pop_size * self.pop_scale/2))
        y_dim = int(log10(reallength *self.jump/ 2))
        print str(self.pop_scale)
        for i in range(0,self.pop_size*self.pop_scale+1,10**x_dim):
            C.create_line(marg-10, marg + C_height - i*y/self.pop_scale, marg, marg + C_height - i*y/self.pop_scale)
            C.create_text(marg/2., marg + C_height - i*y/self.pop_scale, text=str(i), anchor="s")
        for i in range(int(round(start*self.jump,-y_dim)), length*self.jump + 2, 10**y_dim):
            C.create_line(marg + x*(i/self.jump-start), marg + C_height, marg + x*(i/self.jump-start), marg+10 + C_height)
            C.create_text(marg + x*(i/self.jump-start), marg * 1.5 + C_height, anchor="n", text=str(i))
        try:
            C.create_line(marg + (self.k-start)*x, marg, marg + (self.k-start)*x, marg + C_height, dash=(3,2))
        except AttributeError: pass
        C.grid(row=0,column=0)
        
        def end():
            app.quit()
        tk.Button(app, command=end, text="Close plot").grid(row=0, column=1)
        
        
        return app
    
        
    def get_k(self):
        """Calculate the first generation at which
        each color uniquely identifies a specific
        genotype"""
        k = 0
        while len(self.freqs[k]) > len(self.col):
                k += 1
        self.k = k
        return k
    
    
    def uniq(self):
        """
        internal function for assuring that all remaining
        types are plotted in distinct colors as soon as that
        is possible"""
        k = self.get_k()
        
        screencolors = [self.colors[j] for j in self.freqs[k]]
        
        print k*self.jump
        if len(screencolors) > len(set(screencolors)):
            indices = [j for j in self.freqs[k]]
            for i in range(len(indices)):
                print "replacing", self.colors[indices[i]], "with", self.col[i]
                self.colors[indices[i]] = self.col[i]
            print "one/one correspondece color/type from generation", k*self.jump
    
    def color_change(self, oldcolor, newcolor):
        """change the color with which to display a selected type
        oldcolor: int (index of color in self.colors) or string
        newcolor: new color, string"""
        self.colors[self.color_to_index(oldcolor)] = newcolor
    
    def color_swap(self, color1, color2):
        """swap the colors used to display two types, identified either
        by their indices or with a string
        """
        if type(color1) == type(color2) == str:
            color1_index, color2_index = self.color_to_index(color1), self.color_to_index(color2)
        elif type(color1)== type(color2) == int:
            color1_index, color2_index = color1, color2
            color1, color2 = self.colors[color1], self.colors[color2]
        else: raise TypeError("incompatible types in input, use either indices (int) or color names (str)")
        
        self.colors[color1_index], self.colors[color2_index] = color2, color1
        
    
    
    def color_to_index(self, colorstring, index=None):
        """get the index associated with a specific
        color (string input) after the point when a
        one/one correspondence has been achieved,
        
        """
        if index == None:
            index = self.get_k()
        if type(colorstring) == int:
            return colorstring
        for i in self.freqs[index]:
            if self.colors[i] == colorstring:
                return i
        
    
    def track(self, colorstring, index=None):
        """track the frequencies of the type represented
        by a particular color, only use when colors uniquely
        identify a type, either because there were sufficiently
        few initially, or after the application of uniq()"""
        if index == None:
            index = self.get_k()
        
        i = self.color_to_index(colorstring, index)
        
        return [gen[i] for gen in self.freqs]
    
    def track_max(self, color):
        """Get a list of generations at which a specific genotype
        (identified by its index or by its associated color at generation
        k) is the most frequent one in the population."""
        if type(color) == int:
            return [i for i in range(self.gencounter) if self.freqs[i][color] == max([self.freqs[i][j] for j in self.freqs[i]])]
        elif type(color) == str:
            return [i for i in range(self.gencounter) if self.freqs[i][self.color_to_index(color)] == max([self.freqs[i][j] for j in self.freqs[i]])]
    
    def max_range(self, color):
        """Get a list of tuples for generations betweeen
        which 'color' is continuously the most frequent
        genotype in the population"""
        maxlist = self.track_max(color)
        maxbegin = [i for i in maxlist if i-1 not in maxlist]
        maxend = [i for i in maxlist if i+1 not in maxlist]
        return [(maxbegin[i], maxend[i]) for i in range(len(maxend))]
    
    def track_plot(self, colors=None):
        
        """Create a line diagram for the frequency/ies of
        one or several genotypes identified by their color
        or index. Default: All genotypes present at k"""
        import matplotlib.pyplot as plt
        if colors == None:
            colors = [color for color in self.freqs[self.k]]
        def plot_colors():
            for color in colors:
                plt.plot(self.track(color))
            
        try:
            plot_colors()
        except TypeError:
            plot_colors([colors])
        res = plt.show()

One thought on “

  1. Pingback: On smurfs – playing around with genetic drift in a spatially structured population | Pan loquens

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s