# Standard Content and Dispatch handlers for the Bioformat IO system

from xml.sax import handler
from Martel import Parser, Dispatch
from Bio import Std, Decode

###################################

# Helper functions to make functions

def _assert_unclaimed(klass, *method_names):
    """Assert that `klass` does not already have any of `method_names`."""
    for name in method_names:
        assert not hasattr(klass, name), "existing method exists"


def _build_methods(source, namespace=None):
    """Execute generated `source` and return the namespace with its defs.

    `namespace` supplies the globals visible to the generated functions
    (for example {"Decode": Decode}); a fresh dict is used when omitted.
    """
    if namespace is None:
        namespace = {}
    # The call form of exec is accepted by both Python 2 and Python 3,
    # unlike the Python-2-only "exec source in namespace" statement.
    exec(source, namespace)
    return namespace


def _install_pair(klass, tag, source, namespace=None):
    """Build `start`/`end` from `source`; attach as start_<tag>/end_<tag>.

    Returns the namespace so callers can pick out additional generated
    functions.
    """
    methods = _build_methods(source, namespace)
    setattr(klass, "start_" + tag, methods["start"])
    setattr(klass, "end_" + tag, methods["end"])
    return methods


def add_int_handler(klass, tag, attrname):
    """Add start_<tag>/end_<tag> methods storing the text as an int.

    The generated end method sets self.<attrname> to
    int(self.get_characters()).
    """
    _assert_unclaimed(klass, "start_" + tag, "end_" + tag)
    source = """if 1:
    def start(self, tag, attrs):
        self.save_characters()
    def end(self, tag):
        self.%s = int(self.get_characters())
""" % attrname
    _install_pair(klass, tag, source)


def add_text_handler(klass, tag, attrname):
    """Add start_<tag>/end_<tag> methods storing the text unchanged.

    The generated end method sets self.<attrname> to
    self.get_characters().
    """
    _assert_unclaimed(klass, "start_" + tag, "end_" + tag)
    source = """if 1:
    def start(self, tag, attrs):
        self.save_characters()
    def end(self, tag):
        self.%s = self.get_characters()
""" % attrname
    _install_pair(klass, tag, source)


def add_text_dict_handler(klass, tag, attrname, key):
    """Add start_<tag>/end_<tag> methods storing the text in a mapping.

    The generated end method sets self.<attrname>["<key>"] to
    self.get_characters().
    """
    _assert_unclaimed(klass, "start_" + tag, "end_" + tag)
    source = """if 1:
    def start(self, tag, attrs):
        self.save_characters()
    def end(self, tag):
        self.%s["%s"] = self.get_characters()
""" % (attrname, key)
    _install_pair(klass, tag, source)


def add_text_decode_handler(klass, tag, attrname):
    """Add start_<tag>/end_<tag> methods storing optionally decoded text.

    The generated start method remembers the element's "bioformat:decode"
    attribute (None when absent).  The generated end method reads the
    characters, passes them through Decode.make_decoder(<that attribute>)
    when one was given, and stores the result in self.<attrname>.
    """
    _assert_unclaimed(klass, "start_" + tag, "end_" + tag)
    # The text is fetched before decoding and the decoded value is what
    # gets stored; previously the decoder was applied to an undefined
    # name and its result was thrown away.
    source = """if 1:
    def start(self, tag, attrs):
        self.save_characters()
        self._decode_%s = attrs.get("bioformat:decode", None)
    def end(self, tag):
        s = self.get_characters()
        if self._decode_%s is not None:
            s = Decode.make_decoder(self._decode_%s)(s)
        self.%s = s
""" % (tag, tag, tag, attrname)
    _install_pair(klass, tag, source, {"Decode": Decode})


def add_first_text_handler(klass, tag, attrname):
    """Add start_<tag>/end_<tag> methods keeping only the first text seen.

    The generated methods only save and store characters while
    self.<attrname> is None, so later occurrences of the tag leave an
    already-set attribute untouched.
    """
    _assert_unclaimed(klass, "start_" + tag, "end_" + tag)
    source = """if 1:
    def start(self, tag, attrs):
        if self.%s is None:
            self.save_characters()
    def end(self, tag):
        if self.%s is None:
            self.%s = self.get_characters()
""" % (attrname, attrname, attrname)
    _install_pair(klass, tag, source)


def add_text_block_handler(klass, tag, joinattr, defaultjoin, attrname):
    """Add handlers collecting <tag> texts inside a <tag>_block element.

    Four methods are attached to `klass`:
      start_<tag>_block -- builds a join function with Decode.make_decoder
                           from the block's `joinattr` attribute (falling
                           back to `defaultjoin`) and resets the line list
      start_<tag> / end_<tag> -- append each element's text to the list
      end_<tag>_block   -- stores the joined lines in self.<attrname>
    """
    _assert_unclaimed(klass,
                      "start_" + tag, "end_" + tag,
                      "start_" + tag + "_block", "end_" + tag + "_block")
    fields = {
        "tag": tag,
        "joinattr": joinattr,
        "defaultjoin": defaultjoin,
        "attrname": attrname,
    }
    source = """if 1:
    def start_block(self, tag, attrs):
        self._%(tag)s_join_func = Decode.make_decoder(attrs.get(%(joinattr)r, %(defaultjoin)r))
        self._%(tag)s_lines = []
    def end_block(self, tag):
        self.%(attrname)s = self._%(tag)s_join_func(self._%(tag)s_lines)
    def start(self, tag, attrs):
        self.save_characters()
    def end(self, tag):
        self._%(tag)s_lines.append(self.get_characters())
""" % fields
    methods = _install_pair(klass, tag, source, {"Decode": Decode})
    setattr(klass, "start_" + tag + "_block", methods["start_block"])
    setattr(klass, "end_" + tag + "_block", methods["end_block"])


def add_value_handler(klass, tag, attrname):
    """Add start_<tag>/end_<tag> methods storing named, decodable values.

    The generated start method reads the element's required "name"
    attribute and optional "bioformat:decode" attribute.  The generated
    end method decodes the text when a decoder was named and stores it in
    self.<attrname>[<name>].
    """
    _assert_unclaimed(klass, "start_" + tag, "end_" + tag)
    source = """if 1:
    def start(self, tag, attrs):
        self._%(tag)s_name = attrs["name"]
        self._%(tag)s_decode = attrs.get("bioformat:decode", None)
        self.save_characters()
    def end(self, tag):
        s = self.get_characters()
        if self._%(tag)s_decode is not None:
            s = Decode.make_decoder(self._%(tag)s_decode)(s)
        self.%(attrname)s[self._%(tag)s_name] = s
""" % {"tag": tag, "attrname": attrname}
    _install_pair(klass, tag, source, {"Decode": Decode})


#################################

class ConvertHandler(handler.ContentHandler):
    """Used to read records and produce output"""

    def __init__(self, record_builder, writer, record_tag = "record"):
        """Remember the builder, the writer and the record element name.

        Events inside each `record_tag` element are forwarded to
        `record_builder`; its `document` is passed to `writer.write`
        when the record element ends.
        """
        handler.ContentHandler.__init__(self)
        self.record_builder = record_builder
        self.writer = writer
        self.record_tag = record_tag

    def startDocument(self):
        """Start outside of any record, discarding character data."""
        self.inside_record = 0
        self.characters = self.ignore_characters

    def startElement(self, tag, attrs):
        """Forward the element, opening a builder document at a record."""
        if self.inside_record:
            self.record_builder.startElement(tag, attrs)
        elif tag == self.record_tag:
            self.record_builder.startDocument()
            self.inside_record = 1
            # Route character data straight to the builder while inside
            # the record.
            self.characters = self.record_builder.characters
            self.record_builder.startElement(tag, attrs)

    def endElement(self, tag):
        """Forward the element; write the document when the record ends."""
        # NOTE(review): the record check is reconstructed as nested under
        # inside_record -- confirm against the original layout.
        if self.inside_record:
            self.record_builder.endElement(tag)
            if tag == self.record_tag:
                self.record_builder.endDocument()
                self.writer.write(self.record_builder.document)
                self.inside_record = 0
                self.characters = self.ignore_characters

    def ignore_characters(self, s):
        """Discard character data seen outside of a record."""
        pass


class ConvertDispatchHandler(Dispatch.Dispatcher):
    """Used to read records and produce output through a Dispatcher"""

    def __init__(self, record_builder, writer, record_tag = "record"):
        """Acquire `record_builder` and write its document per record."""
        # The end_<record_tag> attribute is set before the Dispatcher is
        # initialised, as in the original code.
        setattr(self, "end_" + record_tag, self.write_record)
        Dispatch.Dispatcher.__init__(self,
                                     remap = {record_tag: "bioformat:"}
                                     )
        self.acquire(record_builder)
        self.record_builder = record_builder
        self.writer = writer
        self.record_tag = record_tag

    def write_record(self, tag):
        """Pass the builder's current document to the writer."""
        self.writer.write(self.record_builder.document)


class RecognizeHandler(handler.ContentHandler, handler.ErrorHandler):
    """Content and error handler recording whether parsing succeeded.

    `recognized` starts at 1 and is cleared, with the exception saved in
    `exc`, when an error other than Parser.ParserIncompleteException is
    reported.
    """

    def __init__(self):
        handler.ContentHandler.__init__(self)
        self.recognized = 1
        self.exc = None

    def fatalError(self, exc):
        """Record the failure (unless merely incomplete) and re-raise."""
        if not isinstance(exc, Parser.ParserIncompleteException):
            self.recognized = 0
            self.exc = exc
        # NOTE(review): reconstructed as an unconditional re-raise; the
        # collapsed source does not show whether it sat inside the else
        # branch -- confirm.
        raise exc

    error = fatalError

    def endElement(self, tag):
        """Raise a ParserException once a complete record has been seen."""
        if tag == "record":
            raise Parser.ParserException("we finished a record!")


class Handle_dbid(Dispatch.Callback):
    """Call the callback with each dbid's text and its attributes."""

    def start_dbid(self, tag, attrs):
        self.attrs = attrs
        self.save_characters()

    def end_dbid(self, tag):
        text = self.get_characters()
        self.callback(text, self.attrs)


class Handle_description(Dispatch.Callback):
    """Call the callback with the joined descriptions of each block."""

    def start_description_block(self, tag, attrs):
        """Choose the join function from the block's "join" attribute."""
        j = attrs.get("join", None)
        if j is None:
            self.join_fctn = Decode.join_fixspaces
        else:
            self.join_fctn = Decode.make_typechecked_decoder(j, list, str)
        self.descriptions = []

    def start_description(self, tag, attrs):
        self.save_characters()

    def end_description(self, tag):
        self.descriptions.append(self.get_characters())

    def end_description_block(self, tag):
        self.callback(self.join_fctn(self.descriptions))


#### There can be multiple dbxref_dbids in a dbxref
# DR EMBL; X64411; CAA45756.1; -.
#  <..dbname style="swiss">EMBL
#  X64411
#  CAA45756.1
#
###
# DR P35156, YPUI_BACSU, F;
#  P35156
#  YPUI_BACSU
#
#

def _fixup_sp_pattern(exp):
    """Turn a Martel dbxref expression into a compiled regular expression.

    The expression is restricted to its dbxref_dbname and dbxref_dbid
    groups, which are renamed "dbname", "primary_dbid" and
    "secondary_dbid" and stripped of their attributes.

    Returns (compiled pattern, dbname style, primary id type,
    secondary id type).
    """
    import re
    import Martel
    exp = Martel.select_names(exp, (Std.dbxref_dbname.tag,
                                    Std.dbxref_dbid.tag))

    name_groups = exp._find_groups(Std.dbxref_dbname.tag)
    assert len(name_groups) == 1
    name_group = name_groups[0]
    name_group.name = "dbname"
    dbstyle = name_group.attrs["style"]
    name_group.attrs = {}

    id_groups = exp._find_groups(Std.dbxref_dbid.tag)
    assert len(id_groups) == 2
    primary, secondary = id_groups
    primary.name = "primary_dbid"
    primary_type = primary.attrs["type"]
    primary.attrs = {}
    secondary.name = "secondary_dbid"
    secondary_type = secondary.attrs["type"]
    secondary.attrs = {}

    # Anchor at the end so the whole string has to match.
    pat = re.compile(str(exp) + "$")
    return pat, dbstyle, primary_type, secondary_type

# Turns out these 'fast' versions speed up the dbxref code by about
# a factor of 2.

# DR PIR; S08427; S08427.
_fast_dbxref_sp_general_data = None
def _fast_dbxref_sp_general(s):
    """Parse a general-style dbxref string into two callback tuples.

    The compiled pattern is built on first use from
    sprot38.real_DR_general and cached in _fast_dbxref_sp_general_data.

    Returns two tuples of the form (dbstyle, dbname, idtype, dbid, 0),
    one for the primary and one for the secondary identifier.
    Raises AssertionError when `s` does not match the pattern.
    """
    global _fast_dbxref_sp_general_data
    if _fast_dbxref_sp_general_data is None:
        from Bio.expressions.swissprot import sprot38
        _fast_dbxref_sp_general_data = _fixup_sp_pattern(
                                                sprot38.real_DR_general)

    pat, dbstyle, primary_type, secondary_type = _fast_dbxref_sp_general_data

    m = pat.match(s)
    if m is None:
        # Raised explicitly so the check is not stripped by "python -O".
        raise AssertionError("Ill-formated sp-general dxbref: %r" % (s,))
    dbname = m.group("dbname")
    return (
        (dbstyle, dbname, primary_type, m.group("primary_dbid"), 0),
        (dbstyle, dbname, secondary_type, m.group("secondary_dbid"), 0)
        )

# DR PFAM; PF01018; GTP1_OBG; 1.
# DR PROSITE; PS00905; GTP1_OBG; 1.

_fast_dbxref_sp_prosite_data = None
def _fast_dbxref_sp_prosite(s):
    """Parse a prosite-style dbxref string into two callback tuples.

    The compiled pattern is built on first use from
    sprot38.real_DR_prosite and cached in _fast_dbxref_sp_prosite_data.

    Returns two tuples of the form (dbstyle, dbname, idtype, dbid, 0),
    one for the primary and one for the secondary identifier.
    Raises AssertionError when `s` does not match the pattern.
    """
    global _fast_dbxref_sp_prosite_data
    if _fast_dbxref_sp_prosite_data is None:
        from Bio.expressions.swissprot import sprot38
        _fast_dbxref_sp_prosite_data = _fixup_sp_pattern(
                                                sprot38.real_DR_prosite)

    pat, dbstyle, primary_type, secondary_type = _fast_dbxref_sp_prosite_data

    m = pat.match(s)
    if m is None:
        # Raised explicitly so the check is not stripped by "python -O".
        raise AssertionError("Ill-formated sp-prosite dxbref: %r" % (s,))
    dbname = m.group("dbname")
    return (
        (dbstyle, dbname, primary_type, m.group("primary_dbid"), 0),
        (dbstyle, dbname, secondary_type, m.group("secondary_dbid"), 0)
        )


# DR EMBL; M36407; AAA33110.1; -.
_fast_dbxref_sp_embl_data = None
def _fast_dbxref_sp_embl(s):
    """Parse an embl-style dbxref string into two callback tuples.

    The compiled pattern is built on first use from sprot38.real_DR_embl
    and cached in _fast_dbxref_sp_embl_data.

    Returns two tuples of the form (dbstyle, dbname, idtype, dbid, 0),
    one for the primary and one for the secondary identifier.
    Raises AssertionError when `s` does not match the pattern.
    """
    global _fast_dbxref_sp_embl_data
    if _fast_dbxref_sp_embl_data is None:
        from Bio.expressions.swissprot import sprot38
        _fast_dbxref_sp_embl_data = _fixup_sp_pattern(
                                                sprot38.real_DR_embl)

    pat, dbstyle, primary_type, secondary_type = _fast_dbxref_sp_embl_data

    m = pat.match(s)
    if m is None:
        # Raised explicitly so the check is not stripped by "python -O".
        raise AssertionError("Ill-formated sp-embl dxbref: %r" % (s,))
    dbname = m.group("dbname")
    return (
        (dbstyle, dbname, primary_type, m.group("primary_dbid"), 0),
        (dbstyle, dbname, secondary_type, m.group("secondary_dbid"), 0)
        )

# Maps the "style" attribute of a fast_dbxref element to its parser.
_fast_dbxref_parser_table = {
    "sp-general": _fast_dbxref_sp_general,
    "sp-prosite": _fast_dbxref_sp_prosite,
    "sp-embl": _fast_dbxref_sp_embl,
    }

class Handle_dbxref(Dispatch.Callback):
    """Report database cross references to the callback.

    The callback is called as
    callback(dbname_style, dbname, idtype, dbid, negate), once per dbid.
    """

    def __init__(self, callback):
        Dispatch.Callback.__init__(self, callback)
        self.supported_features.append("fast-sp-dbxref")
        # Set to None while a fast_dbxref element is being handled so
        # that end_dbxref does not report anything in that window.
        self.slow_callback = self.callback

    def start_dbxref(self, tag, attrs):
        self.negate = 0
        self.dbname = None
        self.dbids = []
        self.info = []

    def start_dbxref_dbname(self, tag, attrs):
        assert self.dbname is None, "cannot set the dbname twice"
        self.dbname_style = attrs.get("style", "unknown")
        self.save_characters()

    def end_dbxref_dbname(self, tag):
        self.dbname = self.get_characters()

    def start_dbxref_dbid(self, tag, attrs):
        """Record (style, dbname, idtype) for the id about to be read."""
        idtype = attrs.get("type", "primary")
        explicit_dbname = attrs.get("dbname", None)
        if explicit_dbname is None:
            assert self.dbname is not None, "must set the dbname"
            self.info.append( (self.dbname_style, self.dbname, idtype) )
        else:
            self.info.append( ("bioformat", explicit_dbname, idtype) )
        self.save_characters()

    def end_dbxref_dbid(self, tag):
        self.dbids.append(self.get_characters())

    def start_dbxref_negate(self, tag, attrs):
        self.negate = 1

    def end_dbxref(self, tag):
        """Report every collected dbid unless the fast path is active."""
        cb = self.slow_callback
        if cb is None:
            return
        negate = self.negate
        for (dbname_style, dbname, idtype), dbid in zip(self.info,
                                                        self.dbids):
            cb(dbname_style, dbname, idtype, dbid, negate)

    def start_fast_dbxref(self, tag, attrs):
        """Select the fast parser named by the "style" attribute."""
        style = attrs["style"]
        self._fast_parser = _fast_dbxref_parser_table[style]
        self.save_characters()
        self.slow_callback = None

    def end_fast_dbxref(self, tag):
        for info in self._fast_parser(self.get_characters()):
            self.callback(*info)
        self.slow_callback = self.callback

##################
class Handle_sequence(Dispatch.Callback):
    """Report each sequence block to the callback.

    The callback receives one tuple (alphabet, seq, gapchar, stopchar).
    """
    global_alphabet = None

    def start_(self, tag, attrs):
        self.global_alphabet = None

    def start_sequence_block(self, tag, attrs):
        """Read the block's alphabet, gapchar, stopchar and join attrs."""
        self.local_alphabet = attrs.get("alphabet", None)
        self.gapchar = attrs.get("gapchar", None)
        self.stopchar = attrs.get("stopchar", None)
        j = attrs.get("join", None)
        if j is not None:
            self.join_func = Decode.make_typechecked_decoder(j, list, str)
        else:
            self.join_func = None
        self.sequences = []

    def end_sequence_block(self, tag):
        """Join the collected pieces and pass the result to the callback."""
        f = self.join_func
        if f is not None:
            # Call the join function itself; "self.f" does not exist.
            seq = f(self.sequences)
        else:
            seq = "".join(self.sequences).replace(" ", "")
        alphabet = self.local_alphabet or self.global_alphabet or "unknown"
        self.callback( (alphabet, seq, self.gapchar, self.stopchar) )

    def start_alphabet(self, tag, attrs):
        self.global_alphabet = attrs["alphabet"]

    def start_sequence(self, tag, attrs):
        self.save_characters()

    def end_sequence(self, tag):
        self.sequences.append(self.get_characters())

class Feature:
    """A feature: name, description, location and list of qualifiers."""

    def __init__(self, name, description, location, qualifiers):
        self.name = name
        self.description = description
        self.location = location
        self.qualifiers = qualifiers

    def __str__(self):
        return "Feature %r %r %s num_qualifiers = %d" % \
               (self.name, self.description, self.location,
                len(self.qualifiers))


class Handle_feature_location(Dispatch.Callback):
    """Report each feature's location as callback(style, (a, b)).

    (a, b) is (location_start, location_end) when either was given,
    otherwise (joined location text, None).
    """

    def __init__(self, callback, settings = None):
        Dispatch.Callback.__init__(self, callback)
        # Compare against None, not truthiness: callers may pass an
        # (initially empty) dict that they fill in later and share.
        if settings is None:
            settings = {}
        self.settings = settings

    def start_feature(self, tag, attrs):
        """Reset the location state and pick the style and join function."""
        # .get() so that a handler created without shared settings falls
        # back to None instead of raising KeyError.
        self.location_style = attrs.get(
            "location-style", self.settings.get("location-style", None))
        j = attrs.get("join-feature", None)
        if j is None:
            self.text_join_func = "".join
        else:
            self.text_join_func = Decode.make_typechecked_decoder(j, list, str)

        self.location_start = None
        self.location_end = None
        self.text_lines = []

    def end_feature(self, tag):
        """Report the location; raise TypeError if both forms were given."""
        if self.location_start or self.location_end:
            if self.text_lines:
                raise TypeError("Cannot have both location text and start/end")
            self.callback(self.location_style,
                          (self.location_start, self.location_end))
        else:
            self.callback(self.location_style,
                          (self.text_join_func(self.text_lines), None))

    def start_feature_location(self, tag, attrs):
        self.save_characters()

    def end_feature_location(self, tag):
        self.text_lines.append(self.get_characters())

add_text_handler(Handle_feature_location, "feature_location_start",
                 "location_start")
add_text_handler(Handle_feature_location, "feature_location_end",
                 "location_end")

##################################

class Handle_feature_qualifier(Dispatch.Callback):
    """Report each qualifier as callback(name, joined description)."""

    def __init__(self, callback, settings):
        self.settings = settings
        Dispatch.Callback.__init__(self, callback)

    def start_feature_qualifier(self, tag, attrs):
        """Reset the state and choose the description join function."""
        self.name = None
        self.description = []
        qj = attrs.get("join-qualifier", None)
        if qj is None:
            self.join = self.settings["qualifier_join_func"]
        else:
            self.join = Decode.make_typechecked_decoder(qj, list, str)

    def end_feature_qualifier(self, tag):
        self.callback(self.name, self.join(self.description))

    def start_feature_qualifier_description(self, tag, attrs):
        self.save_characters()

    def end_feature_qualifier_description(self, tag):
        self.description.append(self.get_characters())

add_text_handler(Handle_feature_qualifier, "feature_qualifier_name", "name")

####################

class Handle_features(Dispatch.Callback):
    """Collect Feature objects; report the list at end of each block."""

    def __init__(self, callback):
        Dispatch.Callback.__init__(self, callback)
        # The same dict object is handed to both sub-handlers so the
        # values set in start_feature_block are visible to them.
        self.settings = {}

        self.acquire(Handle_feature_location(self.add_location, self.settings))

        self.acquire(Handle_feature_qualifier(self.add_feature_qualifier,
                                              self.settings))

    def start_feature_block(self, tag, attrs):
        """Read the block-level join/location settings; reset the list."""
        jf = attrs.get("join-description", None)
        if jf is None:
            self.join_feature_description = Decode.join_fixspaces
        else:
            self.join_feature_description = Decode.make_typechecked_decoder(
                jf, list, str)

        self.settings["location-style"] = attrs.get("location-style", None)

        jq = attrs.get("join-qualifier", None)
        if jq is None:
            self.settings["qualifier_join_func"] = Decode.join_fixspaces
        else:
            self.settings["qualifier_join_func"] = \
                          Decode.make_typechecked_decoder(jq, list, str)
        self.features = []

    def end_feature_block(self, tag):
        self.callback(self.features)
        self.features = None

    def start_feature(self, tag, attrs):
        self.name = None
        self.description = []
        self.location = None
        self.qualifiers = []

    def start_feature_description(self, tag, attrs):
        self.save_characters()

    def end_feature_description(self, tag):
        self.description.append(self.get_characters())

    def end_feature(self, tag):
        self.features.append(Feature(
            self.name,
            self.join_feature_description(self.description),
            self.location,
            self.qualifiers))

    def add_feature_qualifier(self, name, description):
        self.qualifiers.append((name, description))

    def add_location(self, style, location_info):
        self.location = (style, location_info)

add_text_handler(Handle_features, "feature_name", "name")


############## Search handlers

class Handle_hsp_seqalign(Dispatch.Callback):
    """Accumulate the alignment text of one hsp; pass self to callback."""

    def start_hsp(self, tag, attrs):
        self.query_name = None     # "Query"
        self.subject_name = None   # "Sbjct"

        self.query_seq = ""        # the actual text of the sequence
        self.homology_seq = ""
        self.subject_seq = ""

        self.query_start_loc = None
        self.query_end_loc = None

        self.subject_start_loc = None
        self.subject_end_loc = None

    def end_hsp(self, tag):
        self.callback(self)

    def start_hsp_seqalign(self, tag, attrs):
        self.sub_leader = None

    def start_hsp_seqalign_query_seq(self, tag, attrs):
        self.save_characters()

    def end_hsp_seqalign_query_seq(self, tag):
        s = self.get_characters()
        self.query_seq += s
        self.sub_query_seq_len = len(s)

    def start_hsp_seqalign_homology_seq(self, tag, attrs):
        self.save_characters()

    def end_hsp_seqalign_homology_seq(self, tag):
        """Append the slice of the homology line under the query text."""
        query_leader = self.leader_size
        query_seq_len = self.sub_query_seq_len
        line = self.get_characters()
        # Skip as many characters as the query leader had, then take as
        # many as the query sequence on this line had.
        s = line[query_leader:query_leader+query_seq_len]
        assert len(s) == query_seq_len, (len(s), query_seq_len, line)
        self.homology_seq += s

    def start_hsp_seqalign_subject_seq(self, tag, attrs):
        self.save_characters()

    def end_hsp_seqalign_subject_seq(self, tag):
        self.subject_seq += self.get_characters()

    def start_hsp_seqalign_query_leader(self, tag, attrs):
        self.save_characters()

    def end_hsp_seqalign_query_leader(self, tag):
        self.leader_size = len(self.get_characters())

add_first_text_handler(Handle_hsp_seqalign, "hsp_seqalign_query_name",
                       "query_name")

add_first_text_handler(Handle_hsp_seqalign, "hsp_seqalign_subject_name",
                       "subject_name")

add_first_text_handler(Handle_hsp_seqalign, "hsp_seqalign_query_start",
                       "query_start_loc")
add_text_handler(Handle_hsp_seqalign, "hsp_seqalign_query_end",
                 "query_end_loc")

add_first_text_handler(Handle_hsp_seqalign, "hsp_seqalign_subject_start",
                       "subject_start_loc")
add_text_handler(Handle_hsp_seqalign, "hsp_seqalign_subject_end",
                 "subject_end_loc")


#############################

class Handle_hsp(Dispatch.Callback):
    """Report each hsp as callback(hsp_values, hsp_info, strands, frames)."""

    def __init__(self, callback):
        Dispatch.Callback.__init__(self, callback)
        self.acquire(Handle_hsp_seqalign(self.add_hsp_seqs))

    def start_hsp(self, tag, attrs):
        self.hsp_values = {}      # expect, p, identities, ...
        self.strands = {}
        self.frames = {}

    def end_hsp(self, tag):
        self.callback(self.hsp_values,
                      self.hsp_info,
                      self.strands, self.frames,
                      )

    def start_hsp_strand(self, tag, attrs):
        self.strands[attrs["which"]] = attrs["strand"]

    def start_hsp_frame(self, tag, attrs):
        self.getting_frame = attrs["which"]
        self.save_characters()

    def end_hsp_frame(self, tag):
        self.frames[self.getting_frame] = self.get_characters()
        self.getting_frame = None

    def add_hsp_seqs(self, hsp_info):
        self.hsp_info = hsp_info

    def start_hsp_value(self, tag, attrs):
        self.value_convert = attrs.get("bioformat:decode", None)
        self.value_name = attrs["name"]
        self.save_characters()

    def end_hsp_value(self, tag):
        """Store the value under its name, decoded when a decoder is named."""
        s = self.get_characters()
        # The decision is based on the decode attribute, not the value
        # name: values without a decoder are stored as plain text.
        convert = self.value_convert
        if convert is not None:
            if convert == "float":
                s = float(s)
            else:
                s = Decode.make_decoder(convert)(s)
        self.hsp_values[self.value_name] = s

#############################


class Handle_search_table(Dispatch.Callback):
    """Report each search table as a list of (description, values)."""

    def start_search_table_value(self, tag, attrs):
        self.value_name = attrs["name"]
        self.value_decode = attrs.get("bioformat:decode", None)
        self.save_characters()

    def end_search_table_value(self, tag):
        """Store the value under its name, decoded when a decoder is named."""
        s = self.get_characters()
        x = self.value_decode
        if x is not None:
            if x == "int":
                s = int(s)
            elif x == "float":
                s = float(s)
            else:
                s = Decode.make_decoder(x)(s)
        self.values[self.value_name] = s

    def start_search_table(self, tag, attrs):
        self.data = []

    def end_search_table(self, tag):
        self.callback(self.data)
        self.data = None

    def start_search_table_entry(self, tag, attrs):
        self.description = None
        self.values = {}

    def end_search_table_entry(self, tag):
        self.data.append( (self.description, self.values) )
        self.description = self.values = None

add_text_handler(Handle_search_table, "search_table_description",
                 "description")

#############################

class Handle_search_header(Dispatch.Callback):
    """Report the search header fields to the callback as one dict."""

    def start_(self, tag, attrs):
        self.dict = {}
        self.query_description = None

    def end_search_header(self, tag):
        d = self.dict
        d["query_description"] = self.query_description
        self.callback(d)

add_text_block_handler(Handle_search_header, "query_description",
                       "join-query", "join|fixspaces", "query_description")

add_text_dict_handler(Handle_search_header, "application_name",
                      "dict", "appname")
add_text_dict_handler(Handle_search_header, "application_version",
                      "dict", "appversion")
add_text_dict_handler(Handle_search_header, "database_name",
                      "dict", "dbname")
add_text_dict_handler(Handle_search_header, "database_num_sequences",
                      "dict", "db_num_sequences")
add_text_dict_handler(Handle_search_header, "database_num_letters",
                      "dict", "db_num_letters")
add_text_dict_handler(Handle_search_header, "query_size",
                      "dict", "query_size")


#############################

class Handle_search_info(Dispatch.Callback):
    """Report the collected search parameters and statistics dicts."""

    def start_(self, tag, attrs):
        self.parameters = {}
        self.statistics = {}

    def end_(self, tag):
        self.callback(self.parameters, self.statistics)

add_value_handler(Handle_search_info, "search_parameter", "parameters")
add_value_handler(Handle_search_info, "search_statistic", "statistics")