import os
from Ft.Lib import Uri
from Ft.Xml.Domlette import NonvalidatingReader
from Ft.Rdf.Model import Model
from Ft.Rdf.RdfsHandler import RdfsHandler, RdfsConstraintViolation
from Ft.Rdf.Serializers.Dom import Serializer
from Ft.Rdf.Statement import Statement
DATABASE_NAME = os.environ.get('RDF_TEST_DB', 'test')
RDFS_XML = """
The class of persons
The class of users.
Educational institutions, insurance providers, practices, etc.
The class of moderators.
"""
RDF_XML = """
Gen. Robert E. Lee
Spartacus Fortissimus
sparta-fort
sparta-fort@rblrebels.com
Le Brigand Robespierre
guillot
guillot@rblrebels.com
RBL Rebels
"""
def init(tester):
tester.startTest("Prepare driver's database")
driver = tester.test_data['driver']
if driver.ExistsDb(DATABASE_NAME):
driver.DestroyDb(DATABASE_NAME)
driver.CreateDb(DATABASE_NAME)
tester.testDone()
def test_schemahandler(tester):
uri_base = Uri.OsPathToUri(__file__, attemptAbsolute=True)
db = tester.test_data['driver'].GetDb(DATABASE_NAME)
db.begin()
try:
tester.startTest("Create model with core RDFS statements")
sh = RdfsHandler()
m = Model(db, sh)
total = len(m.statements())
tester.compare(192, total)
tester.testDone()
tester.startTest("Add user RDFS statements")
uri = uri_base + '-INTERNAL-RDFS-XML-STRING'
schemadoc = NonvalidatingReader.parseString(RDFS_XML, uri)
serializer = Serializer()
m.suspendSchema()
serializer.deserialize(m, schemadoc, scope='http://schema.spam.com/')
m.resumeSchema()
total = len(m.statements())
tester.compare(220, total)
tester.testDone()
tester.startTest("Validate all statements")
m.checkConsistency()
tester.testDone()
tester.startTest("Add & validate user RDF statements")
uri = uri_base + '-INTERNAL-RDF-XML-STRING'
datadoc = NonvalidatingReader.parseString(RDF_XML, uri)
serializer.deserialize(m, datadoc, 'http://spam.com/data')
total = len(m.statements())
tester.compare(237, total)
tester.testDone()
tester.startTest("Misc validation tests")
tester.compare(True, sh.isInstance('http://rblrebels.com/~robelee', 'http://schema.spam.com/#Person'))
tester.compare(True, sh.isInstance('http://rblrebels.com/~spartacus', 'http://schema.spam.com/#Person'))
tester.compare(True, sh.isSubClass('http://schema.spam.com/#Moderator', 'http://schema.spam.com/#Person'))
tester.testDone()
tester.startTest("Add invalid statement")
s = Statement('http://rblrebels.com/~robelee', 'http://schema.spam.com/#represents', 'ILLEGAL')
tester.testException(m.add, (s,), RdfsConstraintViolation)
tester.testDone()
except:
db.rollback()
raise
db.rollback()
tester.groupDone()
def Test(tester):
tester.startGroup('RDF Schema Handler')
init(tester)
test_schemahandler(tester)
return