#!/usr/bin/python from __future__ import print_function from Bio import SeqIO from Bio.Seq import Seq from collections import OrderedDict import sys import argparse # https://github.com/jrjhealey/bioinfo-tools/blob/master/Mutate.py # https://www.biostars.org/p/336891/ # TODO: # - create some logic to 'group' mutations that will be applied to the same sequence, to # make all switches at once # - This will also probably break the verbose transversion output so the maths will need replacing # - Create the ability to support INDELS (will also require pairwise alignment so that # hamming distances remain meaningful. def get_args(): """Parse command line arguments""" desc = "Mutate fasta sequences based on a file of sequence mappings." epi = ( "This script takes a mapfile of the form:\n" " SequenceID,A123B\n" " SequenceID,X456Y\n" "And performs substitutions/mutations. At preset it only does one SNP per sequence.\n" ) try: parser = argparse.ArgumentParser( description=desc, epilog=epi, formatter_class=argparse.RawTextHelpFormatter ) parser.add_argument( "mutation_file", action="store", help='File of mutation mappings like so: "SeqID,X123Y"', ) parser.add_argument( "sequences", action="store", help="File of sequences to be mutated (fasta only).", ) parser.add_argument( "-v", "--verbose", action="store_true", help="Verbose behaviour, printing parameters of the script.", ) parser.add_argument( "-o", "--outfile", action="store", help="Output file for mutated sequences (default STDOUT).", ) if len(sys.argv) == 1: parser.print_help(sys.stderr) exit(1) except: sys.stderr.write( "An exception occurred with argument parsing. Check your provided options.\n" ) return parser.parse_args() class Mutation(object): """A class wrapper for sequence IDs so that duplicate IDs can be used in a dictionary""" def __init__(self, name): self.name = name def __repr__(self): return "'" + self.name + "'" def __str__(self): return self.name def parse_mapfile(mapfile): """Return a dict of mapped mutations. File should resemble: SequenceID,A123B SequenceID2,X234Y Sequence IDs should exactly match the fasta headers, as parsed by BioPython. (">" symbols are optional) """ with open(mapfile, "r") as handle: mut_dict = OrderedDict() for line in handle: id, change = line.lstrip(">").rstrip("\n").split(",") mut_dict[Mutation(id)] = change for k, v in mut_dict.items(): assert v[0].isalpha(), ( "First character of mutation map is not a valid letter. Got: %s" % v[0] ) assert v[-1].isalpha(), ( "Last character of mutation map is not a valid letter. Got: %s" % v[-1] ) assert v[1:-1].isdigit(), ( "Location string of mutation map is not a valid number. Got: %s" % v[1:-1] ) return mut_dict def morph(orig, loc, new, mutableseq, verbose): """Perform actual sequence change (polymorphism only at present)""" # Shift location to offset 0-based index loc = loc - 1 assert mutableseq[loc] == orig, ( "Sequence does not match the mutation file for pre-exising residue. Expected %s , got %s " % (orig, mutableseq[loc]) ) if verbose is True: print( "Performing change: {} -> {}, at location: {} (0 based)".format( orig, new, loc ) ) mutableseq[loc] = new return mutableseq def hamming_distance(s1, s2): """Return the Hamming distance between equal-length sequences""" if len(s1) != len(s2): raise ValueError("Undefined for sequences of unequal length") return sum(ch1 != ch2 for ch1, ch2 in zip(s1.upper(), s2.upper())) def main(): args = get_args() if args.outfile is not None: ofh = open(args.outfile, "w") # Parse the mutation file (get mutations by sequence) mutations = parse_mapfile(args.mutation_file) if args.verbose is True: print("Got mutations:") print(mutations) # Iterate all sequences and make any substitutions necessary for record in SeqIO.parse(args.sequences, "fasta"): for k, v in mutations.items(): mutable = record.seq.upper().tomutable() # mutable = record.seq.tomutable() # print("MO:", mutable) if k.name == record.id[0:4]: # BEWARE HARDCODING # print("k.name:", k.name, "record.id:", record.id) orig = v[0] print("orig:", orig) new = v[-1] loc = int(v[1:-1]) if args.verbose: print(record.id) newseq = morph(orig, loc, new, mutable, args.verbose) # print("NS is:", newseq) if args.verbose is True: print("Original: " + record.seq.upper()) print( str((" " * int(loc - 2 + 11))) + "V" ) # Padded string to show where switch happened (not sure how it'll deal with line wrapping print("New: " + newseq) print( "Distance: " + str(hamming_distance(str(record.seq), str(newseq))) ) if args.outfile is not None: ofh.write(">%s_%s\n%s\n" % (record.id, v, newseq)) if args.verbose is False: print(">%s_%s\n%s\n" % (record.id, v, newseq)) if args.outfile is not None: ofh.close() if __name__ == "__main__": main()