#*****************************************************************************
# calls.py (part of the blacktie package)
#
# (c) 2013 - Augustine Dunn
# James Laboratory
# Department of Biochemistry and Molecular Biology
# University of California Irvine
# wadunn83@gmail.com
#
# Licenced under the GNU General Public License 3.0 license.
#******************************************************************************
"""
####################
calls.py
####################
Code defining classes to represent and execute pipeline program calls.
"""
import os
import sys
import base64
import traceback
import re
import time
import socket
import shutil
import errno
from collections import defaultdict

from mako.template import Template

from blacktie.utils.misc import Bunch,bunchify
from blacktie.utils.misc import email_notification
from blacktie.utils.misc import get_time
from blacktie.utils.misc import uniques
from blacktie.utils.externals import runExternalApp,mkdirp
from blacktie.utils import errors
class BaseCall(object):
    """
    Defines common methods for all program call types.

    Child classes set ``self.prog_name`` *before* calling ``BaseCall.__init__``
    and then fill in ``self.prog_yargs``, ``self.out_dir``, ``self.opt_dict``
    and ``self.arg_str``.
    """

    def __init__(self,yargs,email_info,run_id,run_logs,conditions,mode='analyze'):
        """
        initializes a ``BaseCall`` object

        :param yargs: argument tree generated by parsing the yaml config file
        :param email_info: Bunch() object containing keys: ``email_from``, ``email_to``, ``email_li``
        :param run_id: id for the whole set of calls
        :param run_logs: the directory where log file should be put
        :param conditions: one or a list of condition-dictionaries from ``yargs.condition_queue``
        :param mode: choices = ['analyze','dry_run','qsub_script']
        :returns: an initialized ``BaseCall`` object
        """
        self._hostname = socket.gethostname()
        self.mode = mode
        self.yargs = yargs
        self.email_info = email_info
        self.run_id = run_id
        self.log_dir = run_logs
        self.prgbar_regex = yargs.prgbar_regex
        self._conditions = conditions
        self.prog_yargs = None # over-ride in child __init__
        self.arg_str = None # over-ride in child __init__
        self.out_dir = None # over-ride in child __init__

    def _flag_out_dir(self):
        """
        renames out directory, prepending 'FAILED' flag: equivalent of ``mv tophat_Aa0 FAILED.tophat_Aa0``

        If the out directory does not exist (the program never created it)
        nothing happens; any other ``OSError`` is re-raised.
        On success ``self.out_dir`` is updated to the new path.
        """
        src = os.path.abspath(self.out_dir)
        new_path = os.path.join(os.path.dirname(src), "FAILED.%s" % (os.path.basename(src)))
        try:
            os.rename(src, new_path)
        except OSError as exc:
            # check errno rather than the (locale-dependent) message text
            if exc.errno == errno.ENOENT:
                return
            raise
        self.out_dir = new_path

    def init_log_file(self):
        """
        creates empty log file for this call and stores its path in ``self.log_file``

        Only done in 'analyze' mode; other modes never write logs.
        """
        if self.mode != 'analyze':
            return
        log_path = "%s/%s.log" % (self.log_dir.rstrip('/'),self.call_id)
        # open with 'w' to create/truncate, so every call starts with an empty log
        with open(log_path,'w'):
            pass
        self.log_file = os.path.abspath(log_path)

    def get_condition_id(self,condition_dict):
        """
        Constructs condition ID

        :param condition_dict: a dictionary containing condition info like name, replicate_id, etc.
        :returns: an ID used to construct the call_id of a call.
        """
        return "%s_%s" % (condition_dict['name'],condition_dict['replicate_id'])

    def set_call_id(self):
        """
        builds and stores this call's call ID in ``self.call_id``

        An int/str ``self._conditions`` is treated as an experiment id of a
        "group" type call: it is stored in ``self.experiment_id`` and
        ``self._conditions`` is replaced by ``self.yargs.groups[experiment_id]``.

        :raises errors.SanityCheckError: if ``self._conditions`` is of an unsupported type
        """
        if isinstance(self._conditions,(int,str)):
            # this should mean that we are dealing with a "group" type call
            self.experiment_id = self._conditions
            self._conditions = self.yargs.groups[self.experiment_id]
            condition_ids = [self.get_condition_id(x) for x in self._conditions]
            self.call_id = "%s_%s" % (self.prog_name,".".join(condition_ids))
        elif isinstance(self._conditions,dict):
            self.call_id = "%s_%s" % (self.prog_name,self.get_condition_id(self._conditions))
        else:
            raise errors.SanityCheckError('type(self._conditions) should be either int, str, or dict. It is: %s' % (type(self._conditions)))

    def _send_email(self,email_sub,email_body):
        """
        sends one notification email using ``self.email_info`` and the
        SMTP settings in ``yargs.run_options.custom_smtp``
        """
        e = self.email_info
        server_info = self.yargs.run_options.custom_smtp
        email_notification(e.email_from, e.email_to, email_sub, email_body, base64.b64decode(e.email_li), server_info)

    def notify_start_of_call(self):
        """
        sends notification email informing user that ``self.call_id`` has been initiated
        """
        report_time = get_time()
        email_sub = "[SITREP from %s] Run %s - Starting %s at %s" % (self._hostname,self.run_id,self.call_id,report_time)
        email_body = "%s\n\n%s" % (email_sub,self.cmd_string)
        self._send_email(email_sub,email_body)

    def notify_end_of_call(self):
        """
        sends notification email informing user that ``self.call_id`` has exited
        """
        report_time = get_time()
        email_sub = "[SITREP from %s] Run %s - Exited %s at %s" % (self._hostname,self.run_id,self.call_id,report_time)
        # repeat subject in body, followed by the program's stderr
        email_body = email_sub + "\n\n ==> stderr <==\n\n%s" % (self.stderr_msg)
        self._send_email(email_sub,email_body)

    def build_out_dir_path(self):
        """
        builds correct ``out_dir`` path based on state of ``self``

        :returns: ``out_dir`` (``<base_dir>/<call_id>``)
        """
        base_dir = self.yargs.run_options.base_dir.rstrip('/')
        return "%s/%s" % (base_dir,self.call_id)

    def init_opt_dict(self):
        """
        builds a dict with non-job-specific values set and job-specific values set to False
        based on option names in the yaml file for this phase

        Options whose yaml value is ``from_conditions`` (and ``positional_args``)
        are left as False for the child class to fill in.

        :returns: partially populated ``opt_dict`` (a plain dict: missing keys raise ``KeyError``)
        """
        opt_dict = dict.fromkeys(self.prog_yargs.keys(), False)
        for opt in opt_dict:
            # positional args are handled separately by each child class
            if opt == 'positional_args':
                continue
            opt_val = self.prog_yargs[opt]
            if opt_val != "from_conditions":
                opt_dict[opt] = opt_val
        return opt_dict

    def construct_options_list(self):
        """
        converts ``opt_dict`` into list encoding proper options to send to the current program: saves to ``self.options_list``.

        * a value of exactly ``False`` means "option not used" and is skipped
        * single-letter options get ``-x``, longer ones ``--long-name``
        * a value whose string form is ``'True'`` is emitted as a bare flag
        """
        options_list = []
        for opt, value in self.opt_dict.items():
            if value is False:
                continue
            options_list.append(('-%s' if len(opt) == 1 else '--%s') % (opt))
            value_str = str(value)
            if value_str != 'True':
                options_list.append(value_str)
        self.options_list = options_list

    def purge_progress_bars(self, stderr_str):
        """
        removes the dynamic progress bars included in some output in case user did not turn them off

        :param stderr_str: multi-line program output
        :returns: ``stderr_str`` without the lines matched by ``self.prgbar_regex``
        """
        # prgbar regex compiled outside scope to avoid re-compilation overhead
        kept = [line for line in stderr_str.split('\n') if self.prgbar_regex.search(line) is None]
        return '\n'.join(kept)

    def log_msg(self,log_msg=''):
        """
        * opens ``self.log_file``
        * writes ``log_msg``
        * closes ``self.log_file``

        Only done in 'analyze' mode.
        """
        if self.mode == 'analyze':
            with open(self.log_file,'a') as log:
                log.write('\n%s\n' % (log_msg))

    def log_start(self):
        """
        records start of call in ``self.log_file``
        """
        if self.mode == 'analyze':
            self.log_msg(log_msg='[start %s]\n' % (self.call_id))

    def log_end(self):
        """
        records command string used, program output, and the end of call in ``self.log_file``

        Also strips progress bars from ``self.stderr_msg`` in place.
        """
        if self.mode == 'analyze':
            self.stderr_msg = self.purge_progress_bars(self.stderr_msg)
            err_msg = "%s\n\n%s\n[end %s]" % (self.cmd_string,self.stderr_msg,self.call_id)
            self.log_msg(log_msg=err_msg)

    def build_qsub(self):
        """
        Builds and writes this CallObject's qsub script to current working directory
        using options provided under the "qsub_options" sub-tree in the yaml config file.

        The script is written to ``<call_id>.qsub.sh``.
        """
        nicknames = {'tophat':'th',
                     'cufflinks':'cl',
                     'cuffmerge':'cm',
                     'cuffdiff':'cd',
                     'blacktie-cummerbund':'cb',}
        qsub_options = self.yargs.qsub_options
        # set keyword args for template
        kw = Bunch()
        kw.queues = qsub_options.queues
        kw.datahome = qsub_options.datahome
        kw.core_range = qsub_options.core_range
        kw.email_addy = self.email_info.email_to
        kw.call_id = self.call_id
        # unknown programs fall back to their full name instead of raising KeyError
        nickname = nicknames.get(self.prog_name, self.prog_name)
        kw.job_name = "%s_%s" % (nickname, '_'.join(self.call_id.split('_')[1:]))
        kw.out_dir = self.out_dir
        kw.ld_library_path = qsub_options.ld_library_path
        # need to make sure we use the number of cores that the SGE gave us;
        # programs without a '-p' option (e.g. blacktie-cummerbund) are left untouched
        cmd_str = self.cmd_string
        cores = self.opt_dict.get('p', False)
        if cores is not False:
            cmd_str = cmd_str.replace('-p %s' % (cores),'-p $CORES')
        kw.cmd_str = cmd_str
        qsub_template = Template(filename=qsub_options.template)
        qsub_string = qsub_template.render(**kw)
        with open('%s.qsub.sh' % (self.call_id),'w') as out_file:
            out_file.write(qsub_string)

    def execute(self):
        """
        calls correct program, records results, and manages errors

        * 'analyze': runs the program, logging and emailing start/end. On
          ``errors.SystemCallError`` or ``KeyboardInterrupt`` the out dir is
          flagged FAILED, the user is emailed and the run moves on; any other
          exception is reported and re-raised.
        * 'dry_run': prints the command line.
        * 'qsub_script': writes a qsub script via ``build_qsub``.

        :raises errors.BlacktieError: for an unknown ``self.mode``
        """
        self.cmd_string = "%s %s" % (self.prog_name,self.arg_str)
        if self.mode == 'analyze':
            try:
                self.notify_start_of_call()
                self.log_start()
                self.stdout_msg,self.stderr_msg = runExternalApp(progName=self.prog_name,argStr=self.arg_str)
                self.log_end()
                self.notify_end_of_call()
            # KeyboardInterrupt derives from BaseException, not Exception, so it
            # must be listed explicitly for its "MOVING ON" branch to be reachable.
            except (Exception, KeyboardInterrupt) as exc:
                email_body = self.purge_progress_bars(traceback.format_exc())
                self.stdout_msg = "\nError in call. Check error log.\n"
                self.stderr_msg = email_body
                self.log_end()
                self._flag_out_dir()
                if isinstance(exc,errors.SystemCallError):
                    email_sub = "[SITREP from %s] Run %s experienced SystemCallError in call %s. MOVING ON." % (self._hostname,self.run_id,self.call_id)
                    self._send_email(email_sub,email_body)
                elif isinstance(exc,KeyboardInterrupt):
                    email_sub = "[SITREP from %s] Run %s experienced KeyboardInterrupt in call %s. MOVING ON." % (self._hostname,self.run_id,self.call_id)
                    self._send_email(email_sub,email_body)
                else:
                    email_sub = "[SITREP from %s] Run %s experienced unhandled exception in call %s. EXITING." % (self._hostname,self.run_id,self.call_id)
                    self._send_email(email_sub,email_body)
                    raise
        # DRY RUN
        elif self.mode == 'dry_run':
            print(self.cmd_string + '\n')
        # QSUB SCRIPT
        elif self.mode == 'qsub_script':
            self.build_qsub()
        else:
            raise errors.BlacktieError()
class TophatCall(BaseCall):
    """
    Manage a single call to tophat and store associated run data.
    """

    def __init__(self,yargs,email_info,run_id,run_logs,conditions,mode):
        """
        initializes the ``TophatCall`` object

        :param yargs: argument tree generated by parsing the yaml config file
        :param email_info: Bunch() object containing keys: ``email_from``, ``email_to``, ``email_li``
        :param run_id: id for the whole set of calls
        :param run_logs: the directory where log file should be put
        :param conditions: one or a list of condition-dictionaries from ``yargs.condition_queue``
        :param mode: choices = ['analyze','dry_run','qsub_script']
        :returns: an initialized ``TophatCall`` object
        """
        self.prog_name = 'tophat'
        BaseCall.__init__(self,yargs,email_info,run_id,run_logs,conditions,mode)
        self.prog_yargs = self.yargs.tophat_options
        self.set_call_id()
        self.init_log_file()
        self.out_dir = self.get_out_dir()
        # set up options for program call
        self.opt_dict = self.init_opt_dict()
        self.opt_dict['o'] = self.out_dir
        self.opt_dict['G'] = self.get_gtf_anno()
        self.construct_options_list()
        # now the positional args
        bowtie_index = self.get_bt_idx()
        left_reads = self.get_lt_reads()
        right_reads = self.get_rt_reads()
        # combine and save arg_str; tophat's right-reads argument is optional
        # (single-end data), so an empty/False/None value is simply omitted
        # instead of leaving a dangling '' or crashing ' '.join()
        self.options_list.extend([bowtie_index,left_reads])
        if right_reads:
            self.options_list.append(right_reads)
        self.arg_str = ' '.join(self.options_list)

    def _join_reads(self,reads):
        """
        Returns ``reads`` as tophat's comma-separated file list: a list/tuple
        is comma-joined, a single path string is returned unchanged (joining a
        string would put a comma between every character).
        """
        if isinstance(reads,(list,tuple)):
            return ','.join(reads)
        return reads

    def get_out_dir(self):
        """
        Handles ``yaml_config.tophat_options.o: from_conditions``.
        """
        option = self.prog_yargs.o
        if option == 'from_conditions':
            return self.build_out_dir_path()
        return option

    def get_gtf_anno(self):
        """
        Handles ``yaml_config.tophat_options.G: from_conditions``.
        """
        option = self.prog_yargs.G
        if option == 'from_conditions':
            return self._conditions['gtf_annotation']
        return option

    def get_bt_idx(self):
        """
        Handles ``yaml_config.tophat_options.positional_args.bowtie2_index: from_conditions``.

        The condition's index name is resolved inside ``run_options.bowtie_indexes_dir``.
        """
        option = self.prog_yargs.positional_args.bowtie2_index
        if option == 'from_conditions':
            bt_idx_dir = self.yargs.run_options.bowtie_indexes_dir.rstrip('/')
            bt_idx_name = self._conditions['bowtie2_index']
            return "%s/%s" % (bt_idx_dir,bt_idx_name)
        return option

    def get_lt_reads(self):
        """
        Handles ``yaml_config.tophat_options.positional_args.left_reads: from_conditions``.
        """
        option = self.prog_yargs.positional_args.left_reads
        if option == 'from_conditions':
            return self._join_reads(self._conditions['left_reads'])
        return option

    def get_rt_reads(self):
        """
        Handles ``yaml_config.tophat_options.positional_args.right_reads: from_conditions``.
        """
        option = self.prog_yargs.positional_args.right_reads
        if option == 'from_conditions':
            return self._join_reads(self._conditions['right_reads'])
        return option
class CufflinksCall(BaseCall):
    """
    Manage a single call to cufflinks and store associated run data.
    """

    def __init__(self,yargs,email_info,run_id,run_logs,conditions,mode):
        """
        initializes the ``CufflinksCall`` object

        :param yargs: argument tree generated by parsing the yaml config file
        :param email_info: Bunch() object containing keys: ``email_from``, ``email_to``, ``email_li``
        :param run_id: id for the whole set of calls
        :param run_logs: the directory where log file should be put
        :param conditions: one or a list of condition-dictionaries from ``yargs.condition_queue``
        :param mode: choices = ['analyze','dry_run','qsub_script']
        :returns: an initialized ``CufflinksCall`` object

        .. todo:: **DONE** add support for --GTF in addition to currently supported --GTF-guide
        """
        self.prog_name = 'cufflinks'
        BaseCall.__init__(self,yargs,email_info,run_id,run_logs,conditions,mode)
        self.prog_yargs = self.yargs.cufflinks_options
        self.set_call_id()
        self.init_log_file()
        self.out_dir = self.get_out_dir()
        # set up options for program call
        self.opt_dict = self.init_opt_dict()
        self.verify_options()
        self.opt_dict['o'] = self.out_dir
        self.opt_dict['GTF-guide'] = self.get_gtf_anno_guide()
        self.opt_dict['GTF'] = self.get_gtf_anno()
        self.opt_dict['frag-bias-correct'] = self.get_genome()
        self.opt_dict['mask-file'] = self.get_mask_file()
        self.construct_options_list()
        # now the positional args
        self.accepted_hits = self.get_accepted_hits()
        # combine and save arg_str
        self.options_list.append(self.accepted_hits)
        self.arg_str = ' '.join(self.options_list)

    def verify_options(self):
        """
        Makes sure that conflicting options were not imported from yaml config file.

        GTF and GTF-guide should not be used together but both can be omitted;
        an omitted one is stored as False in ``self.prog_yargs`` so later
        lookups do not raise ``KeyError``.

        :raises errors.SanityCheckError: if both "GTF" and "GTF-guide" are set

        .. todo:: **DONE** GTF and GTF-guide should not be used together but both can be ommited
        """
        options = self.prog_yargs
        try:
            gtf = bool(options['GTF'])
        except KeyError:
            gtf = False
            options['GTF'] = gtf
        try:
            gtf_guide = bool(options['GTF-guide'])
        except KeyError:
            gtf_guide = False
            options['GTF-guide'] = gtf_guide
        if gtf and gtf_guide:
            raise errors.SanityCheckError('"GTF" and "GTF-guide" option were non-False. Please only set one or the other.')

    def get_out_dir(self):
        """
        Handles ``yaml_config.cufflinks_options.o: from_conditions``.
        """
        option = self.prog_yargs.o
        if option == 'from_conditions':
            return self.build_out_dir_path()
        return option

    def get_gtf_anno_guide(self):
        """
        Handles ``yaml_config.cufflinks_options.GTF-guide: from_conditions``.
        """
        option = self.prog_yargs['GTF-guide']
        if option == 'from_conditions':
            return self._conditions['gtf_annotation']
        return option

    def get_gtf_anno(self):
        """
        Handles ``yaml_config.cufflinks_options.GTF: from_conditions``.
        """
        option = self.prog_yargs['GTF']
        if option == 'from_conditions':
            return self._conditions['gtf_annotation']
        return option

    def get_genome(self):
        """
        Handles ``yaml_config.cufflinks_options.frag-bias-correct: from_conditions``.

        The option is optional (like ``mask-file``): if it is absent from the
        yaml file, False is returned so the flag is not passed.
        """
        try:
            option = self.prog_yargs['frag-bias-correct']
        except KeyError:
            return False
        if option == 'from_conditions':
            return self._conditions['genome_seq']
        return option

    def get_accepted_hits(self):
        """
        Handles ``yaml_config.cufflinks_options.positional_args.accepted_hits: from_conditions``.
        """
        option = self.prog_yargs.positional_args.accepted_hits
        if option == 'from_conditions':
            return self.get_bam_path()
        return option

    def get_bam_path(self):
        """
        Supports ``self.get_accepted_hits()``.

        Uses the matching tophat call record if one is in memory; otherwise
        guesses ``<base_dir>/tophat_<condition_id>/accepted_hits.bam``.

        :raises errors.MissingArgumentError: in 'analyze' mode when the guessed bam does not exist
        """
        condition_id = self.get_condition_id(self._conditions)
        th_call_id = "tophat_%s" % (condition_id)
        try:
            th_out_dir = self.yargs.call_records[th_call_id].out_dir
            bam_path = "%s/accepted_hits.bam" % (th_out_dir.rstrip('/'))
        except (KeyError,AttributeError):
            msg = "WARNING: unable to find matching tophat call record in memory for condition: %s\nAttempting to find corresponding tophat outfile in your base_dir." \
                % (condition_id)
            self.log_msg(log_msg=msg)
            # try to guess correct tophat out directory
            base_dir = self.yargs.run_options.base_dir
            bam_path = "%s/%s/accepted_hits.bam" % (base_dir.rstrip('/'),th_call_id)
            if self.mode == 'analyze' and not os.path.exists(bam_path):
                #: ``.. todo:: build framework to handle this non-fatally``
                raise errors.MissingArgumentError("I could not find an appropriate accepted_hits.bam file. Failed to find: %s" \
                    % (bam_path))
        return bam_path

    def get_mask_file(self):
        """
        Handles ``yaml_config.cufflinks_options.mask-file: from_conditions``.

        Returns False when the option is absent from the yaml file, or when it
        is ``from_conditions`` but the condition has no ``mask_file`` key.
        """
        try:
            option = self.prog_yargs['mask-file']
            if option == 'from_conditions':
                return self._conditions['mask_file']
            return option
        except KeyError:
            return False
class CuffmergeCall(BaseCall):
    """
    Manage a single call to cuffmerge and store associated run data.
    """

    def __init__(self,yargs,email_info,run_id,run_logs,conditions,mode):
        """
        initializes the ``CuffmergeCall`` object

        :param yargs: argument tree generated by parsing the yaml config file
        :param email_info: Bunch() object containing keys: ``email_from``, ``email_to``, ``email_li``
        :param run_id: id for the whole set of calls
        :param run_logs: the directory where log file should be put
        :param conditions: one or a list of condition-dictionaries from ``yargs.condition_queue``
        :param mode: choices = ['analyze','dry_run','qsub_script']
        :returns: an initialized ``CuffmergeCall`` object
        """
        self.prog_name = 'cuffmerge'
        BaseCall.__init__(self,yargs,email_info,run_id,run_logs,conditions,mode)
        self.prog_yargs = self.yargs.cuffmerge_options
        self.set_call_id()
        self.init_log_file()
        self.out_dir = self.get_out_dir()
        # set up options for program call
        self.opt_dict = self.init_opt_dict()
        self.opt_dict['o'] = self.out_dir
        self.opt_dict['ref-gtf'] = self.get_gtf_anno()
        self.opt_dict['ref-sequence'] = self.get_genome()
        self.construct_options_list()
        # now the positional args
        assembly_list = self.get_cufflinks_gtfs()
        # combine and save arg_str
        self.options_list.append(assembly_list)
        self.arg_str = ' '.join(self.options_list)

    def _agreed_condition_value(self,key,opt_name):
        """
        Returns the single value of ``condition[key]`` shared by every
        condition of this experiment.

        :raises errors.InvalidFileFormatError: if the conditions disagree (or there are none)
        """
        values = set([c[key] for c in self._conditions])
        if len(values) == 1:
            return values.pop()
        raise errors.InvalidFileFormatError('CHECK YAML CONFIG FILE: Conditions in experiment %s do not agree on which "%s" to use: %s.' \
            % (self.experiment_id,opt_name,values))

    def get_out_dir(self):
        """
        Handles ``yaml_config.cuffmerge_options.o: from_conditions``.
        """
        option = self.prog_yargs.o
        if option == 'from_conditions':
            return self.build_out_dir_path()
        return option

    def get_gtf_anno(self):
        """
        Handles ``yaml_config.cuffmerge_options.ref-gtf: from_conditions``.

        All conditions must agree on their ``gtf_annotation``.
        """
        option = self.prog_yargs['ref-gtf']
        if option == 'from_conditions':
            return self._agreed_condition_value('gtf_annotation','ref-gtf')
        return option

    def get_genome(self):
        """
        Handles ``yaml_config.cuffmerge_options.ref-sequence: from_conditions``.

        All conditions must agree on their ``genome_seq``.
        """
        option = self.prog_yargs['ref-sequence']
        if option == 'from_conditions':
            return self._agreed_condition_value('genome_seq','ref-sequence')
        return option

    def get_cufflinks_gtfs(self):
        """
        Handles ``yaml_config.cuffmerge_options.positional_args.assembly_list: from_conditions``.

        Writes one cufflinks ``transcripts.gtf`` path per line to
        ``<out_dir>/assembly_list.txt`` and returns that file's absolute path.
        In 'dry_run' mode only the path is computed and nothing is written.
        """
        option = self.prog_yargs.positional_args.assembly_list
        if option != 'from_conditions':
            return option
        paths = [self.get_cuffGTF_path(condition) for condition in self._conditions]
        assembly_list_path = os.path.abspath("%s/assembly_list.txt" % (self.out_dir.rstrip('/')))
        if self.mode == 'dry_run':
            # A dry run only needs the path for the printed command. Writing the
            # file and then rmtree-ing out_dir would destroy an out_dir left over
            # from a previous real run.
            return assembly_list_path
        mkdirp(self.out_dir)
        with open(assembly_list_path,'w') as assembly_list_file:
            assembly_list_file.write("\n".join(paths))
        return assembly_list_path

    def get_cuffGTF_path(self,condition):
        """
        Supports ``self.get_cufflinks_gtfs()``.

        Uses the matching cufflinks call record if one is in memory; otherwise
        guesses ``<base_dir>/cufflinks_<condition_id>/transcripts.gtf``.

        :raises errors.MissingArgumentError: in 'analyze' mode when the guessed gtf does not exist
        """
        condition_id = self.get_condition_id(condition)
        cl_call_id = "cufflinks_%s" % (condition_id)
        try:
            cl_out_dir = self.yargs.call_records[cl_call_id].out_dir
            gtf_path = "%s/transcripts.gtf" % (cl_out_dir.rstrip('/'))
        except (KeyError,AttributeError):
            msg = "WARNING: unable to find matching cufflinks call record in memory for condition: %s\nAttempting to find corresponding cufflinks outfile in your base_dir." \
                % (condition_id)
            self.log_msg(log_msg=msg)
            # try to guess correct cufflinks out directory
            base_dir = self.yargs.run_options.base_dir
            gtf_path = "%s/%s/transcripts.gtf" % (base_dir.rstrip('/'),cl_call_id)
            if self.mode == 'analyze' and not os.path.exists(gtf_path):
                #: .. todo:: build framework to handle this non-fatally
                raise errors.MissingArgumentError("I could not find an appropriate transcripts.gtf file. Failed to find: %s" \
                    % (gtf_path))
        return gtf_path
class CuffdiffCall(BaseCall):
    """
    Manage a single call to cuffdiff and store associated run data.
    """

    def __init__(self,yargs,email_info,run_id,run_logs,conditions,mode):
        """
        initializes the ``CuffdiffCall`` object

        :param yargs: argument tree generated by parsing the yaml config file
        :param email_info: Bunch() object containing keys: ``email_from``, ``email_to``, ``email_li``
        :param run_id: id for the whole set of calls
        :param run_logs: the directory where log file should be put
        :param conditions: one or a list of condition-dictionaries from ``yargs.condition_queue``
        :param mode: choices = ['analyze','dry_run','qsub_script']
        :returns: an initialized ``CuffdiffCall`` object
        """
        self.prog_name = 'cuffdiff'
        BaseCall.__init__(self,yargs,email_info,run_id,run_logs,conditions,mode)
        self.prog_yargs = self.yargs.cuffdiff_options
        self.set_call_id()
        self.init_log_file()
        self.out_dir = self.get_out_dir()
        # set up options for program call; example yaml section:
        ##cuffdiff_options:
        ##o: from_conditions
        ##labels: from_conditions
        ##frag-bias-correct: from_conditions
        ##positional_args:
        ##transcripts_gtf: from_conditions
        ##sample_bams: from_conditions
        self.opt_dict = self.init_opt_dict()
        self.opt_dict['o'] = self.out_dir
        self.opt_dict['labels'] = self.get_labels()
        self.opt_dict['mask-file'] = self.get_mask_file()
        self.opt_dict['frag-bias-correct'] = self.get_genome()
        self.construct_options_list()
        # now the positional args
        transcripts_gtf = self.get_cuffmerge_gtf()
        sample_bams = self.get_sample_bams()
        # combine and save arg_str
        self.options_list.extend([transcripts_gtf,sample_bams])
        self.arg_str = ' '.join(self.options_list)

    def _agreed_condition_value(self,key,opt_name):
        """
        Returns the single value of ``condition[key]`` shared by every
        condition of this experiment.

        :raises errors.InvalidFileFormatError: if the conditions disagree (or there are none)
        """
        values = set([c[key] for c in self._conditions])
        if len(values) == 1:
            return values.pop()
        raise errors.InvalidFileFormatError('CHECK YAML CONFIG FILE: Conditions in experiment %s do not agree on which "%s" to use: %s.' \
            % (self.experiment_id,opt_name,values))

    def get_out_dir(self):
        """
        Handles ``yaml_config.cuffdiff_options.o: from_conditions``.
        """
        option = self.prog_yargs.o
        if option == 'from_conditions':
            return self.build_out_dir_path()
        return option

    def get_genome(self):
        """
        Handles ``yaml_config.cuffdiff_options.frag-bias-correct: from_conditions``.

        All conditions must agree on their ``genome_seq``. The option is
        optional (like ``mask-file``): if absent, False is returned.
        """
        try:
            option = self.prog_yargs['frag-bias-correct']
        except KeyError:
            return False
        if option == 'from_conditions':
            return self._agreed_condition_value('genome_seq','frag-bias-correct')
        return option

    def get_sample_bams(self):
        """
        Handles ``yaml_config.cuffdiff_options.positional_args.sample_bams: from_conditions``.

        Replicate bams of one condition (same ``name``) are comma-joined and the
        groups are space-separated, e.g. ``" s1_r1.bam,s1_r2.bam s2_r1.bam "``.
        Groups follow the first-seen order of condition names, which is the
        same order ``get_labels`` uses, so labels and bam groups line up.
        """
        option = self.prog_yargs.positional_args.sample_bams
        if option != 'from_conditions':
            return option
        # Group by the condition name itself: the old substring match of a
        # path-derived prefix put e.g. 'WT2' replicates into the 'WT' group.
        names = []
        bams_by_name = {}
        for condition in self._conditions:
            name = condition['name']
            if name not in bams_by_name:
                names.append(name)
                bams_by_name[name] = []
            bams_by_name[name].append(self.get_bam_path(condition))
        return ' '.join([','.join(bams_by_name[name]) for name in names])

    def get_bam_path(self,condition):
        """
        Supports ``self.get_sample_bams()``.

        Uses the matching tophat call record if one is in memory; otherwise
        guesses ``<base_dir>/tophat_<condition_id>/accepted_hits.bam``.

        :raises errors.MissingArgumentError: in 'analyze' mode when the guessed bam does not exist
        """
        condition_id = self.get_condition_id(condition)
        th_call_id = "tophat_%s" % (condition_id)
        try:
            th_out_dir = self.yargs.call_records[th_call_id].out_dir
            bam_path = "%s/accepted_hits.bam" % (th_out_dir.rstrip('/'))
        except (KeyError,AttributeError):
            msg = "WARNING: unable to find matching tophat call record in memory for condition: %s\nAttempting to find corresponding tophat outfile in your base_dir." \
                % (condition_id)
            self.log_msg(log_msg=msg)
            # try to guess correct tophat out directory
            base_dir = self.yargs.run_options.base_dir
            bam_path = "%s/%s/accepted_hits.bam" % (base_dir.rstrip('/'),th_call_id)
            if self.mode == 'analyze' and not os.path.exists(bam_path):
                #: .. todo:: build framework to handle this non-fatally
                raise errors.MissingArgumentError("I could not find an appropriate accepted_hits.bam file. Failed to find: %s" \
                    % (bam_path))
        return bam_path

    def get_mask_file(self):
        """
        Handles ``yaml_config.cuffdiff_options.mask-file: from_conditions``.

        All conditions must agree on their ``mask_file``. Returns False when the
        option is absent, or when it is ``from_conditions`` but a condition has
        no ``mask_file`` key.
        """
        try:
            option = self.prog_yargs['mask-file']
            if option == 'from_conditions':
                return self._agreed_condition_value('mask_file','mask-file')
            return option
        except KeyError:
            return False

    def get_labels(self):
        """
        Handles ``yaml_config.cuffdiff_options.labels: from_conditions``.

        :returns: comma-separated unique condition names in first-seen order
        """
        option = self.prog_yargs['labels']
        if option == 'from_conditions':
            labels = uniques([condition['name'] for condition in self._conditions])
            return ','.join(labels)
        return option

    def get_cuffmerge_gtf(self):
        """
        Handles ``yaml_config.cuffdiff_options.positional_args.transcripts_gtf: from_conditions``.

        An explicit (non-``from_conditions``) yaml value is used as-is; an
        absent value is treated as ``from_conditions``. Otherwise the
        matching cuffmerge call record is used, falling back to
        ``<base_dir>/cuffmerge_<ids>/merged.gtf``.

        :raises errors.MissingArgumentError: in 'analyze' mode when the guessed gtf does not exist
        """
        try:
            option = self.prog_yargs.positional_args.transcripts_gtf
        except (AttributeError,KeyError):
            option = 'from_conditions'
        if option and option != 'from_conditions':
            return option
        # only swap the program-name prefix, not any match inside condition ids
        cm_call_id = self.call_id.replace(self.prog_name,'cuffmerge',1)
        try:
            cm_out_dir = self.yargs.call_records[cm_call_id].out_dir
            gtf_path = "%s/merged.gtf" % (cm_out_dir.rstrip('/'))
        except (KeyError,AttributeError):
            msg = "WARNING: unable to find matching cuffmerge call record in memory for experiment: %s\nAttempting to find corresponding cuffmerge outfile in your base_dir." \
                % (cm_call_id)
            self.log_msg(log_msg=msg)
            # try to guess correct cuffmerge out directory
            base_dir = self.yargs.run_options.base_dir
            gtf_path = "%s/%s/merged.gtf" % (base_dir.rstrip('/'),cm_call_id)
            if self.mode == 'analyze' and not os.path.exists(gtf_path):
                #: .. todo:: build framework to handle this non-fatally
                raise errors.MissingArgumentError("I could not find an appropriate merged.gtf file. Failed to find: %s" \
                    % (gtf_path))
        return gtf_path
class CummerbundCall(BaseCall):
    """
    Manage a single call to blacktie-cummerbund script and store associated run data.
    """

    def __init__(self,yargs,email_info,run_id,run_logs,conditions,mode):
        """
        initializes the ``CummerbundCall`` object

        :param yargs: argument tree generated by parsing the yaml config file
        :param email_info: Bunch() object containing keys: ``email_from``, ``email_to``, ``email_li``
        :param run_id: id for the whole set of calls
        :param run_logs: the directory where log file should be put
        :param conditions: one or a list of condition-dictionaries from ``yargs.condition_queue``
        :param mode: choices = ['analyze','dry_run','qsub_script']
        :returns: an initialized ``CummerbundCall`` object
        """
        self.prog_name = 'blacktie-cummerbund'
        BaseCall.__init__(self,yargs,email_info,run_id,run_logs,conditions,mode)
        self.prog_yargs = self.yargs.cummerbund_options
        self.set_call_id()
        self.init_log_file()
        self.out_dir = self.get_out_dir()
        # set up options for program call: cuffdiff-dir, gtf-path and out are
        # resolved from the matching cuffdiff/cuffmerge calls when set to
        # 'from_conditions'; there are no positional args
        self.opt_dict = self.init_opt_dict()
        self.opt_dict['cuffdiff-dir'] = self.get_cuffdiff_dir()
        self.opt_dict['gtf-path'] = self.get_cuffmerge_gtf()
        self.opt_dict['out'] = self.out_dir
        self.construct_options_list()
        # combine and save arg_str
        self.arg_str = ' '.join(self.options_list)

    def _yaml_option(self,opt_name):
        """
        Returns ``self.prog_yargs[opt_name]``, treating an absent or falsy
        value as ``'from_conditions'``.
        """
        try:
            option = self.prog_yargs[opt_name]
        except KeyError:
            return 'from_conditions'
        return option if option else 'from_conditions'

    def get_cuffdiff_dir(self):
        """
        Handles ``yaml_config.cummerbund_options.cuffdiff-dir: from_conditions``.

        An explicit yaml value is used as-is. Otherwise the matching cuffdiff
        call record is used, falling back to ``<base_dir>/cuffdiff_<ids>``.

        :raises errors.MissingArgumentError: in 'analyze' mode when the guessed directory does not exist
        """
        option = self._yaml_option('cuffdiff-dir')
        if option != 'from_conditions':
            return option
        # only swap the program-name prefix, not any match inside condition ids
        cd_call_id = self.call_id.replace(self.prog_name,'cuffdiff',1)
        try:
            cuffdiff_dir = self.yargs.call_records[cd_call_id].out_dir
        except (KeyError,AttributeError):
            msg = "WARNING: unable to find matching cuffdiff call record in memory for experiment: %s\nAttempting to find corresponding cuffdiff out_dir in your base_dir." \
                % (cd_call_id)
            self.log_msg(log_msg=msg)
            # try to guess correct cuffdiff out directory
            base_dir = self.yargs.run_options.base_dir
            cuffdiff_dir = "%s/%s" % (base_dir.rstrip('/'),cd_call_id)
            if self.mode == 'analyze' and not os.path.exists(cuffdiff_dir):
                #: .. todo:: build framework to handle this non-fatally
                raise errors.MissingArgumentError("I could not find an appropriate cuffdiff-dir directory. Failed to find: %s" \
                    % (cuffdiff_dir))
        return cuffdiff_dir

    def get_out_dir(self):
        """
        Handles ``yaml_config.cummerbund_options.out: from_conditions``.
        """
        option = self.prog_yargs.out
        if option == 'from_conditions':
            return self.build_out_dir_path()
        return option

    def get_cuffmerge_gtf(self):
        """
        Handles ``yaml_config.cummerbund_options.gtf-path: from_conditions``.

        An explicit yaml value is used as-is. Otherwise the matching cuffmerge
        call record is used, falling back to ``<base_dir>/cuffmerge_<ids>/merged.gtf``.

        :raises errors.MissingArgumentError: in 'analyze' mode when the guessed gtf does not exist
        """
        option = self._yaml_option('gtf-path')
        if option != 'from_conditions':
            return option
        cm_call_id = self.call_id.replace(self.prog_name,'cuffmerge',1)
        try:
            cm_out_dir = self.yargs.call_records[cm_call_id].out_dir
            gtf_path = "%s/merged.gtf" % (cm_out_dir.rstrip('/'))
        except (KeyError,AttributeError):
            msg = "WARNING: unable to find matching cuffmerge call record in memory for experiment: %s\nAttempting to find corresponding cuffmerge outfile in your base_dir." \
                % (cm_call_id)
            self.log_msg(log_msg=msg)
            # try to guess correct cuffmerge out directory
            base_dir = self.yargs.run_options.base_dir
            gtf_path = "%s/%s/merged.gtf" % (base_dir.rstrip('/'),cm_call_id)
            if self.mode == 'analyze' and not os.path.exists(gtf_path):
                #: .. todo:: build framework to handle this non-fatally
                raise errors.MissingArgumentError("I could not find an appropriate merged.gtf file. Failed to find: %s" \
                    % (gtf_path))
        return gtf_path