import codecs, time, urllib, re, htmlentitydefs
from xml.sax import xmlreader, SAXException
import Datatypes, ReseekFile, MultiDict
from xml.sax.handler import feature_external_ges

import POM


def _construct_pattern():
    """Build the HTML entity regex and the entity-name lookup table.

    Returns an ``(entity_pattern, defs)`` pair: a compiled pattern whose
    group 1 is the entity name found between "&" and ";", and a dict
    mapping every name in htmlentitydefs.entitydefs to a one-character
    unicode string.

    Raises AssertionError if an entitydefs value is neither a single
    Latin-1 character nor a "&#NNN;" numeric reference.
    """
    n = max([len(x) for x in htmlentitydefs.entitydefs.keys()])
    # Several standard names contain digits ("sup2", "frac12", "there4"),
    # so digits must be accepted after the leading letter.  A name which
    # matches but is not defined is left untouched by unescape_entities.
    entity_pattern = re.compile(r"&([a-zA-Z][a-zA-Z0-9]{0,%d});" % (n - 1))
    defs = {}
    for k, v in htmlentitydefs.entitydefs.items():
        if len(v) == 1:
            defs[k] = unicode(v, "latin-1")
        elif v[:2] == "&#" and v[-1] == ";":
            defs[k] = unichr(int(v[2:-1]))
        else:
            raise AssertionError("Unexpected entitydef value: %r" % v)
    return entity_pattern, defs

_entity_pattern, entitydefs = _construct_pattern()


def _load_module(name):
    """Import the dotted module ``name`` and return the innermost module.

    __import__ returns the top-level package, so the remaining terms are
    resolved with getattr.
    """
    mod = __import__(name)
    for term in name.split(".")[1:]:
        mod = getattr(mod, term)
    return mod


class GetObject:
    """Callback which remembers the object it was called with as ``obj``."""

    def __call__(self, obj):
        self.obj = obj


class UsePOMParser:
    """Parse a file with the POM parser for one of the Bio.EUtils DTDs."""

    def __init__(self, module_name):
        # module_name is the DTD module's name inside Bio.EUtils.DTDs
        self.module_name = "Bio.EUtils.DTDs." + module_name

    def parse_using_dtd(self, file):
        """Parse ``file`` and return the object handed to the callback."""
        module = _load_module(self.module_name)
        cb = GetObject()
        parser = POM.get_parser(callback = cb, module = module)
        # This tells the parser to not resolve the NCBI DTDs
        try:
            parser.setFeature(feature_external_ges, 0)
        except SAXException:
            # Best effort: a parser which rejects the feature is used as-is
            pass
        parser.parse(file)
        return cb.obj


# Pull out the "ERROR", "ErrorList", and "WarningList" terms
def _check_for_errors(pom):
    """Return ``(errmsg, errors, warnings)`` extracted from ``pom``.

    errmsg is the text of the "ERROR" element, or None when there is none.
    errors and warnings are lists built by looking up each child's class
    name in Datatypes.problem_category_mapping and calling the result with
    the child's text.
    """
    errmsg = None
    errors = []
    warnings = []

    err = pom.get("ERROR", None)
    if err is not None:
        errmsg = err.tostring()

    for x in pom.get("ErrorList", []):
        errors.append(
            Datatypes.problem_category_mapping[x.__class__.__name__](
                x.tostring()))
    for x in pom.get("WarningList", []):
        warnings.append(
            Datatypes.problem_category_mapping[x.__class__.__name__](
                x.tostring()))
    return errmsg, errors, warnings


def _check_for_bad_input_stream(infile, force_encoding = 1):
    """Sniff the start of ``infile`` for known non-XML / error responses.

    Reads the first 500 characters through a ReseekFile, rewinds, and
    raises Datatypes.EUtilsError when the head of the stream looks like one
    of the recognised error responses.  Otherwise returns a file-like
    object to parse; when ``force_encoding`` is true and the XML
    declaration carries no encoding, that object recodes Latin-1 input to
    UTF-8.
    """
    reseekfile = ReseekFile.ReseekFile(infile)
    s = reseekfile.read(500)
    reseekfile.seek(0)
    reseekfile.nobuffer()

    lines = s.split("\n")
    if len(lines) > 3:
        # NOTE(review): the literal compared against here was lost from
        # the file; "<Html>" (matched case-insensitively) is a
        # reconstruction -- confirm against a real NCBI error page.
        if lines[0].strip().lower() == "<html>":
            # The earlier test compared str.find() against 1 instead of
            # -1, so a page without the message died with IndexError on
            # the [0] below.  Test the extraction result directly instead.
            found = re.findall(r"Error occured:([^<]+)", lines[2])
            if found:
                raise Datatypes.EUtilsError(urllib.unquote(found[0]))
            raise Datatypes.EUtilsError("Unknown error:\n" +
                                        reseekfile.read(1000))

        # On error, fetch can return a valid XML document, but not one
        # which matches the DTD.  Rather than change the DTD (which is
        # pubmed_020114.dtd) I'll check it here to raise the error.
        # XXX HACK!
        if lines[2] == "<pmFetchResult>":
            # <pmFetchResult>
            # \t<ERROR>Empty id list - nothing todo</ERROR>
            # </pmFetchResult>
            s = "Unable to parse pmFetchResult error message"
            if len(lines) > 4:
                found = re.findall(r"<ERROR>([^>]+)</ERROR>", lines[3])
                if found:
                    s = found[0]
            raise Datatypes.EUtilsError(s)

        # This happens when you choose a database which doesn't exist
        # Are there other reasons?  Probably yes, if you choose
        # other illegal parameters.
        # NOTE(review): the prefix and the message text below were lost
        # from the file and are reconstructed -- confirm.
        if lines[0].startswith("<!doctype"):
            raise Datatypes.EUtilsError("Parameter not allowed")

    if force_encoding and lines[0].startswith('<?xml version="1.0"?>'):
        # Doesn't use an encoding, which means the XML is supposed
        # to be in UTF-8 encoding.  However, it seems NCBI uses
        # Latin-1 so we need to translate the Latin-1 input to
        # UTF-8 output else the XML parsers will fail for non-ASCII
        # characters.
        reseekfile = codecs.EncodedFile(reseekfile, "utf-8", "iso-8859-1")

    return reseekfile


##############################

def parse_search(infile, webenv_ref = None):
    """Parse an eSearch response into a Datatypes.SearchResult.

    ``webenv_ref`` is a one-element list; slot 0 is overwritten with the
    unquoted WebEnv only when the response contains one.  When omitted, a
    throw-away list is used.

    Raises Datatypes.EUtilsSearchError when the server reports
    "Can't run executor" for anything other than a plain "nothing
    matched" search, and TypeError for an unrecognised or incomplete
    TranslationStack.
    """
    if webenv_ref is None:
        # A fresh list per call, so no state is shared between calls
        webenv_ref = [None]

    # Need to pull out the webenv from the input stream
    infile = _check_for_bad_input_stream(infile)

    xml_parser = UsePOMParser("eSearch_020511")
    pom = xml_parser.parse_using_dtd(infile)

    errmsg, errors, warnings = _check_for_errors(pom)

    # The DTD declares
    #   ErrorList (PhraseNotFound*,FieldNotFound*)
    #   WarningList (PhraseIgnored*,
    #                QuotedPhraseNotFound*,
    #                OutputMessage*)

    # If it's only "PhraseNotFound" errors, with an
    # OutputMessage of "No items found." then I personally
    # think that should be considered the same as a search
    # which returned no results.

    # Set things up for an empty match
    webenv = None
    query_key = None
    count = 0
    retmax = 0
    retstart = 0
    ids = []
    translation_set = {}
    expression = None

    nothing_matched = 0
    if errmsg == "Can't run executor":
        # Check that the error list only contains PhraseNotFound terms
        flg = 1
        for x in errors:
            if x.category != "PhraseNotFound":
                flg = 0
                break
        if flg:
            # Okay, only PhraseNotFound.  Make sure there is
            # only one OutputMessage, with the text "No items found."
            # (Eg, an OutputMessage of 'Query syntax error.' means
            # there was a real problem.)
            msgs = [x for x in warnings if x.category == "OutputMessage"]
            if len(msgs) == 1 and msgs[0].text == "No items found.":
                nothing_matched = 1

        if not nothing_matched:
            # This is an error
            raise Datatypes.EUtilsSearchError(errmsg,
                                              errors,
                                              warnings)

    # In other words, check if something matched
    if not nothing_matched:
        ## Get WebEnv, if it exists
        if pom.get_element("WebEnv") is not None:
            s = pom["WebEnv"].tostring()
            webenv = urllib.unquote(s)
            # ONLY change webenv_ref if there's a new one
            webenv_ref[0] = webenv

        # Other simple fields
        if pom.get_element("QueryKey") is not None:
            query_key = pom["QueryKey"].tostring()

        count = int(pom["Count"].tostring())
        retmax = int(pom["RetMax"].tostring())
        retstart = int(pom["RetStart"].tostring())

        # The identifiers (if any)
        # NOTE: not a DBIds because the search result doesn't list the
        # database searched!
        ids = [x.tostring() for x in pom["IdList"].find_elements("Id")]

        # TranslationSet
        translation_set = {}
        for ele in pom["TranslationSet"]:
            translation_set[urllib.unquote_plus(ele["From"].tostring())] = \
                urllib.unquote_plus(ele["To"].tostring())

        # Convert the RPN TranslationStack into an Expression
        stack = []
        try:
            translation_stack = pom["TranslationStack"]
        except IndexError:
            translation_stack = []

        for ele in translation_stack:
            if ele.__class__.__name__ == "TermSet":
                stack.append(Datatypes.Term(
                    term = urllib.unquote_plus(ele["Term"].tostring()),
                    field = urllib.unquote_plus(ele["Field"].tostring()),
                    count = int(ele["Count"].tostring()),
                    explode = ele["Explode"].tostring()))
            elif ele.__class__.__name__ == "OP":
                s = ele.tostring().strip()
                if s == "AND":
                    stack[-2:] = [stack[-2] & stack[-1]]
                elif s == "OR":
                    stack[-2:] = [stack[-2] | stack[-1]]
                elif s == "RANGE":
                    stack[-2:] = [Datatypes.Range(stack[-2], stack[-1])]
                elif s == "NOT":
                    stack[-2:] = [Datatypes.Not(stack[-2], stack[-1])]
                elif s == "GROUP":
                    # GROUP doesn't appear to do any more than put an extra
                    # parenthesis around ANDs and ORs -- can't find any
                    # specific documentation on its role
                    # So right now it is redundant and just ignore it
                    pass
                else:
                    raise TypeError("Unknown OP code: %r" % (s,))
            else:
                raise TypeError("Unknown TranslationStack element: %r" %
                                (ele.__class__.__name__,))

        # hack -- it appears as if the translation stack is sometimes
        # missing an AND at the end, which I guess is supposed to be
        # implicit.  For instance, doing a text word search plus date
        # range leaves off a trailing AND to link the final elements.
        if len(stack) == 2:
            stack[-2:] = [stack[-2] & stack[-1]]

        if len(stack) > 1:
            raise TypeError("Incomplete TranslationStack: %r" % stack)
        elif not stack:
            stack = [None]

        expression = stack[0]

    # Return either the parsed result or the empty-match defaults set above
    search_result = Datatypes.SearchResult(count, retmax, retstart, ids,
                                           translation_set, expression,
                                           webenv, query_key, errors,
                                           warnings, time.time())
    return search_result


###########################

def parse_post(infile, webenv_ref):
    """Parse an ePost response into a Datatypes.PostResult.

    Stores the unquoted WebEnv in ``webenv_ref[0]``.  Raises
    Datatypes.EUtilsError when the response carries an ERROR element.
    """
    # It doesn't look like I need check for a bad input stream
    # since I can only generate two types of error messages

    # ePost_020511.dtd
    xml_parser = UsePOMParser("ePost_020511")
    pom = xml_parser.parse_using_dtd(infile)

    # If there was an ERROR, raise it now
    errmsg, errors, warnings = _check_for_errors(pom)
    if errmsg is not None:
        raise Datatypes.EUtilsError(errmsg)

    # Get any invalid identifiers
    invalid_ids = [x.tostring() for x in pom.get("InvalidIdList", [])]

    # Otherwise, get the WebEnv string
    s = pom["WebEnv"].tostring()
    webenv = urllib.unquote(s)
    webenv_ref[0] = webenv

    query_key = pom["QueryKey"].tostring()

    return Datatypes.PostResult(webenv, query_key, invalid_ids, time.time())


###############################

# PubDate: '2000 Feb 1' or '1975 Jun' or '1995'
# BLAH!  PubMed 8318652 also has "1993 May-Jun" as its date
# The group names are the ones convert_summary_Date_string reads back
# ("year", "month", "day", "month1").
# NOTE(review): "month2" is a reconstructed name; no visible code reads it.
_pubdate_format1 = re.compile(
    r"(?P<year>\d{4})( (?P<month>[A-Za-z]{3})( (?P<day>\d+))?)?$")
_pubdate_format2 = re.compile(
    r"(?P<year>\d{4}) (?P<month1>[A-Za-z]{3})-(?P<month2>[A-Za-z]{3})")

_month_names_to_number = {
    None: 1,  # a missing month group defaults to January
    "Jan": 1,
    "Feb": 2,
    "Mar": 3,
    "Apr": 4,
    "May": 5,
    "Jun": 6,
    "Jul": 7,
    "Aug": 8,
    "Sep": 9,
    "Oct": 10,
    "Nov": 11,
    "Dec": 12,
    }

# Ignoring the hour and minute parts -- they seem to be either
# midnight or 09:00 and since I don't know the timezone it seems
# rather pointless
# EntrezDate: 2000/02/17 09:00
_entrezdate_format = re.compile(
    r"(?P<year>\d+)/(?P<month>\d+)/(?P<day>\d+)")

# This may not be the right way to do this.
# Perhaps should keep the string and only translate upon request
# to a given time format?
def convert_summary_Date(x):
    """Convert a summary "Date" item to a Datatypes.Date from its text."""
    return convert_summary_Date_string(x.tostring())


def convert_summary_Date_string(s):
    """Convert a date string to a Datatypes.Date.

    Tries, in order, the forms "2000 Feb 15" / "1975 Jun" / "1995",
    "1993 May-Jun" and "2000/02/17 ...".  A missing month or day becomes 1;
    a month range uses its first month.

    Raises TypeError when ``s`` matches none of the formats.
    """
    # Can be in one of several different formats
    m = _pubdate_format1.match(s)
    if m is not None:
        # 2000 Feb 15
        d = {}
        d["year"] = int(m.group("year"))
        d["month"] = _month_names_to_number[m.group("month")]
        try:
            d["day"] = int(m.group("day"))
        except TypeError:
            # if this is None
            d["day"] = 1
        return Datatypes.Date(**d)

    m = _pubdate_format2.match(s)
    if m is not None:
        # 1993 May-Jun
        d = {}
        d["year"] = int(m.group("year"))
        d["month"] = _month_names_to_number[m.group("month1")]
        d["day"] = 1
        return Datatypes.Date(**d)

    m = _entrezdate_format.match(s)
    if m is not None:
        return Datatypes.Date(year = int(m.group("year")),
                              month = int(m.group("month")),
                              day = int(m.group("day")))

    raise TypeError("Unknown date format: %s" % (s,))


def unescape_entities(s):
    """Replace "&name;" HTML entities in ``s`` with their characters.

    Names missing from ``entitydefs`` are left exactly as written.  A
    string without any "&" is returned through unicode() unchanged.
    """
    if "&" not in s:
        return unicode(s)
    terms = []
    i = 0
    defs = entitydefs
    for m in _entity_pattern.finditer(s):
        # Text between the previous entity and this one
        terms.append(s[i:m.start()])
        try:
            terms.append(defs[m.group(1)])
        except KeyError:
            # Unknown entity name: keep the original "&name;" text
            terms.append(m.group(0))
        i = m.end()
    terms.append(s[i:])
    return "".join(terms)


def convert_summary_String(x):
    """Return the item's text with HTML entities unescaped."""
    # The text may have HTML entity definitions .. convert as needed
    #
    # XXX Is this correct?  Most other characters are properly
    # encoded.  This may mean that the data provider messed up and
    # sent data in the wrong format.
    return unescape_entities(x.tostring())


def convert_summary_Integer(x):
    """Return the item's text converted with int()."""
    return int(x.tostring())


def convert_summary_Unknown(x):
    """Return the item's text unchanged."""
    return x.tostring()


def convert_summary_List(x):
    """Convert the nested "Item" elements of a List item."""
    # XXX I'm not doing this as a list..  Should I?
    return convert_summary_Items(x.find_elements("Item"))


def convert_summary_Items(x):
    """Convert an iterable of Item elements to an OrderedMultiDict.

    Each item is stored under its ``Name`` after conversion by the parser
    registered for its ``Type`` in summary_type_parser_table.  A repeated
    name is reported on standard output and the item is still stored.
    """
    d = MultiDict.OrderedMultiDict()
    for item in x:
        name = item.Name
        if name in d:
            # One parenthesised expression: prints the same text whether
            # print is a statement or a function.
            print("Found multiple Items named %r!" % (name,))
        d[name] = summary_type_parser_table[item.Type](item)
    return d


# Maps an Item's Type attribute to the function which converts it
summary_type_parser_table = {
    "String": convert_summary_String,
    "Integer": convert_summary_Integer,
    "Unknown": convert_summary_Unknown,
    "Date": convert_summary_Date,
    "List": convert_summary_List,
    }


def parse_summary_xml(infile):
    """Parse an eSummary response into a list of Datatypes.Summary.

    Raises Datatypes.EUtilsError when the response carries an ERROR
    element.
    """
    infile = _check_for_bad_input_stream(infile)
    xml_parser = UsePOMParser("eSummary_020511")
    pom = xml_parser.parse_using_dtd(infile)
    errmsg, errors, warnings = _check_for_errors(pom)
    if errmsg is not None:
        raise Datatypes.EUtilsError(errmsg)

    results = []
    for docsum in pom:
        # "uid" rather than "id" so the builtin is not shadowed
        uid = docsum["Id"].tostring()
        d = convert_summary_Items(docsum.find_elements("Item"))
        results.append(Datatypes.Summary(uid, d))
    return results


###############################

# XML
def parse_fetch_publication_xml(infile):
    """Parse a fetched publication document with the pubmed_020114 DTD."""
    infile = _check_for_bad_input_stream(infile, force_encoding = 0)
    xml_parser = UsePOMParser("pubmed_020114")
    return xml_parser.parse_using_dtd(infile)


def parse_fetch_sequence_xml(infile):
    """Not implemented; always raises NotImplementedError."""
    raise NotImplementedError


# Identifier list ("\n" separated)
# Useful for "uilist", "acc", and a few others
def parse_fetch_identifiers(infile):
    """Return the non-blank lines of ``infile`` with whitespace stripped."""
    infile = _check_for_bad_input_stream(infile)
    # Filter after stripping so that "\r\n" and whitespace-only lines do
    # not turn into empty identifiers.
    stripped = [line.strip() for line in infile.readlines()]
    return [ident for ident in stripped if ident]


###############################

def _check_for_link_errors(pom):
    """Raise if ``pom`` is not a single, error-free LinkSet document.

    Raises Datatypes.EUtilsError for a missing LinkSet or for an ERROR
    element at the top level or inside the LinkSet, and AssertionError when
    the number of LinkSet elements is not exactly one.
    """
    if not pom.has_key("LinkSet"):
        if pom.has_key("ERROR"):
            raise Datatypes.EUtilsError(pom["ERROR"].tostring())
        raise Datatypes.EUtilsError("Server failed to process request")

    if len(pom.find_elements("LinkSet")) != 1:
        raise AssertionError(
            "Did not expect to find more than one LinkSet in the XML")
    linkset = pom["LinkSet"]
    if linkset.has_key("ERROR"):
        raise Datatypes.EUtilsError(linkset["ERROR"].tostring())


def _parse_link(infile):
    """Parse an eLink response and return it after checking for errors."""
    #infile = _check_for_bad_input_stream(infile)
    # Need this, as seen in
    # http://www.ncbi.nlm.nih.gov/entrez/eutils/elink.fcgi?dbfrom=pubmed&cmd=llinks&db=pubmed&id=10611131%2C12085853
    # which has an a-with-umlaut in Latin-1 encoding
    infile = codecs.EncodedFile(infile, "utf-8", "iso-8859-1")
    xml_parser = UsePOMParser("eLink_020511")
    pom = xml_parser.parse_using_dtd(infile)
    _check_for_link_errors(pom)
    return pom


def parse_neighbor_links(infile):
    """Parse an eLink neighbor response into a Datatypes.NeighborLinkSet.

    Raises Datatypes.EUtilsError when a LinkSetDb carries an ERROR element
    (in addition to the errors raised while loading the document).
    """
    pom = _parse_link(infile)
    pom_linkset = pom["LinkSet"]
    dbfrom = pom_linkset["DbFrom"].tostring().lower()
    idlist = [x.tostring() for x in pom_linkset["IdList"].find_elements("Id")]

    linksetdbs = MultiDict.OrderedMultiDict()
    for pom_linksetdb in pom_linkset.find_elements("LinkSetDb"):
        if pom_linksetdb.has_key("ERROR"):
            raise Datatypes.EUtilsError(pom_linksetdb["ERROR"].tostring())
        dbto = pom_linksetdb["DbTo"].tostring().lower()
        linkname = pom_linksetdb["LinkName"].tostring()
        links = []
        for pom_link in pom_linksetdb.find_elements("Link"):
            # Score is optional; keep None when it is absent
            score = pom_link.get("Score")
            if score is not None:
                score = int(score.tostring())
            links.append(Datatypes.Link(pom_link["Id"].tostring(), score))
        linksetdbs[linkname] = Datatypes.LinkSetDb(dbto, linkname, links)

    # dbfrom was already lower-cased above
    return Datatypes.NeighborLinkSet(Datatypes.DBIds(dbfrom, idlist),
                                     linksetdbs)


def parse_lcheck(infile):
    """Parse an eLink check response into a Datatypes.CheckLinkSet.

    The HasLinkOut / HasNeighbor attributes are turned into 1 for "Y" and
    0 for anything else, including a missing attribute.
    """
    pom = _parse_link(infile)
    pom_linkset = pom["LinkSet"]
    dbfrom = pom_linkset["DbFrom"].tostring().lower()
    idchecks = []
    for ele in pom_linkset["IdCheckList"].find_elements("Id"):
        has_linkout = getattr(ele, "HasLinkOut", "N")
        has_linkout = {"Y": 1}.get(has_linkout, 0)
        has_neighbor = getattr(ele, "HasNeighbor", "N")
        has_neighbor = {"Y": 1}.get(has_neighbor, 0)
        idchecks.append(Datatypes.IdCheck(ele.tostring(),
                                          has_linkout,
                                          has_neighbor))
    return Datatypes.CheckLinkSet(dbfrom, idchecks)

parse_ncheck = parse_lcheck


def _get_opt_string(ele, name):
    """Return the text of child ``name``, or None if missing or empty."""
    x = ele.get(name)
    if x is None:
        return None
    s = x.tostring()
    if not s:
        return None
    return s


def parse_llinks(infile):
    """Parse an eLink llinks response into a Datatypes.LinksLinkSet."""
    pom = _parse_link(infile)
    pom_linkset = pom["LinkSet"]
    dbfrom = pom_linkset["DbFrom"].tostring().lower()
    idurlsets = []
    for ele in pom_linkset["IdUrlList"].find_elements("IdUrlSet"):
        # "uid" rather than "id" so the builtin is not shadowed
        uid = ele["Id"].tostring()
        objurls = []
        for pom_objurl in ele.find_elements("ObjUrl"):
            url = _get_opt_string(pom_objurl, "Url")
            linkname = _get_opt_string(pom_objurl, "LinkName")
            subject_types = [x.tostring() for x in
                             pom_objurl.find_elements("SubjectType")]
            attributes = [s.tostring() for s in
                          pom_objurl.find_elements("Attribute")]

            pom_provider = pom_objurl["Provider"]
            provider_name = pom_provider["Name"].tostring()
            provider_name_abbr = pom_provider["NameAbbr"].tostring()
            provider_id = pom_provider["Id"].tostring()
            provider_url = _get_opt_string(pom_provider, "Url")
            provider_icon_url = _get_opt_string(pom_provider, "IconUrl")
            provider = Datatypes.Provider(provider_name,
                                          provider_name_abbr,
                                          provider_id,
                                          provider_url,
                                          provider_icon_url)

            objurl = Datatypes.ObjUrl(subject_types, provider, linkname,
                                      url, attributes)
            objurls.append(objurl)
        idurlsets.append(Datatypes.IdUrlSet(uid, objurls))
    return Datatypes.LinksLinkSet(dbfrom, idurlsets)

parse_prlinks = parse_llinks


def parse_link_xml(infile):
    """Parse an eLink response and return the raw parsed document."""
    infile = _check_for_bad_input_stream(infile)
    xml_parser = UsePOMParser("eLink_020511")
    return xml_parser.parse_using_dtd(infile)