#!/usr/bin/env python
"""
Generate MusicBrainz TRM signatures for the tracks of an audio CD.

The script waits for a disc, looks it up on a MusicBrainz server, rips the
opening of every track with cdparanoia, computes a TRM signature for each
ripped file and submits the signatures in a single query.

Settings (server, username, password) are read from 'cd2trm.ini' in the
current directory; a template is written there on the first run.
"""

import ConfigParser
import getopt
import os
import sys
import wave  # should come with python
import webbrowser
from time import sleep

import musicbrainz

# so query names are shorter
q = musicbrainz

# SERVER = 'musicbrainz.eorbit.net'
SERVER = 'musicbrainz.org'

DEFAULT_DEVICE = '/dev/cdrom'
CONFIG_FILE = 'cd2trm.ini'

# Largest difference (milliseconds) tolerated between a track's length on
# the disc and the duration stored in the database.
MAX_DURATION_DIFF_MS = 3000

# Error text that main() treats as "re-authenticate and retry".
SESSION_EXPIRED_MSG = ("Query failed: Session key expired. "
                       "Please Authenticate again.")

# NOTE(review): these two templates carry no markup around their
# placeholders, and their exact line breaks are not recoverable from this
# copy of the file -- confirm both against the upstream script before
# relying on the submitted query.
query = """\
%s
@SESSID@
@SESSKEY@
cd2trm/0.8.0
"""

queryItem = """\
%s
%s
"""


def _shellQuote(text):
    """Quote text so a POSIX shell passes it through as one literal word."""
    return "'" + text.replace("'", "'\\''") + "'"


def _checkTrackLengths(mbObj, trackLengths):
    """
    Compare the disc's track lengths with the durations in the database.

    @param mbObj: query object with an album already selected
    @param trackLengths: dict mapping track number to length in seconds
    @returns: False as soon as one track differs by more than
        MAX_DURATION_DIFF_MS, True otherwise
    """
    numTracks = mbObj.GetResultInt(q.MBE_AlbumGetNumTracks)
    for track in range(1, numTracks + 1):
        dura = mbObj.GetResultInt1(q.MBE_AlbumGetTrackDuration, track)
        if abs(dura - trackLengths[track] * 1000) > MAX_DURATION_DIFF_MS:
            return False
    return True


def getCDInfo(mbObj, device=None):
    """
    Wait for a CD, look it up on MusicBrainz and print what was found.

    @param mbObj: the musicbrainz.mb() query object to use
    @param device: CD-ROM device to poll and, when the disc is unknown, to
        eject.  When None, cdparanoia and eject are run without a device
        argument.
    @returns: a list of tuples containing (track id, track length in secs),
        or None when the disc is not matched to exactly one album or the
        user declines to continue after a track length mismatch
    """
    if device is None:
        pollCmd = 'cdparanoia -Qq 2>/dev/null'
        ejectCmd = 'eject'
    else:
        pollCmd = ('cdparanoia --force-cdrom-device=%s -Qq 2>/dev/null'
                   % _shellQuote(device))
        ejectCmd = 'eject %s' % _shellQuote(device)

    print("Insert CD...")
    # A non-zero status from the quiet TOC query is taken as "no disc yet".
    while os.system(pollCmd):
        sleep(0.5)

    mbObj.SetDepth(1)
    mbObj.Query(q.MBQ_GetCDTOC)
    first = mbObj.GetResultInt(q.MBE_TOCGetFirstTrack)
    last = mbObj.GetResultInt(q.MBE_TOCGetLastTrack)
    trackLengths = {}
    for track in range(first, last + 1):
        # The TOC entry for track N is read at ordinal N + 1 (presumably
        # the first entry describes something other than a track -- TODO
        # confirm against the libmusicbrainz documentation).
        sectors = mbObj.GetResultInt1(q.MBE_TOCGetTrackNumSectors, track + 1)
        trackLengths[track] = sectors // 75  # 75 sectors per second

    cdid = mbObj.GetResultData(q.MBE_TOCGetCDIndexId)
    print("querying musicbrainz.org to see if this cd ('%s') is on there..."
          % cdid)
    mbObj.QueryWithArgs(q.MBQ_GetCDInfoFromCDIndexId, [cdid])

    if mbObj.GetResultInt(q.MBE_GetNumAlbums) != 1:
        # Not matched to exactly one album: send the user to the web
        # submission page.
        url = mbObj.GetWebSubmitURL()
        if url:
            print("opening web browser to '%s'..." % url)
            webbrowser.open_new(url)
            print("Import this CD in your webbrowser and then come back here")
            os.system(ejectCmd)
        else:
            print("Couldn't get cdid... maybe there's no cd in drive?")
        return None

    print("Yes and here's the info:")
    mbObj.Select1(q.MBS_SelectAlbum, 1)
    album = mbObj.GetResultData(q.MBE_AlbumGetAlbumName)
    artistId = mbObj.GetIDFromURL(
        mbObj.GetResultData(q.MBE_AlbumGetAlbumArtistId))
    variousArtists = (artistId == q.MBI_VARIOUS_ARTIST_ID)
    if variousArtists:
        print("\t%s" % album)
    else:
        artist = mbObj.GetResultData1(q.MBE_AlbumGetArtistName, 1)
        print("\t%s / %s" % (artist, album))

    ret = []
    numTracks = mbObj.GetResultInt(q.MBE_AlbumGetNumTracks)
    for ii in range(1, numTracks + 1):
        trackURI = mbObj.GetResultData1(q.MBE_AlbumGetTrackId, ii)
        trackId = mbObj.GetIDFromURL(trackURI)
        ret.append((trackId, trackLengths[ii]))

        name = mbObj.GetResultData1(q.MBE_AlbumGetTrackName, ii)
        track = mbObj.GetOrdinalFromList(q.MBE_AlbumGetTrackList, trackURI)
        dura = mbObj.GetResultInt1(q.MBE_AlbumGetTrackDuration, ii)
        # database duration (milliseconds) next to the length read off the
        # disc (seconds), both shown as m:ss
        mbdura = "%d:%02d" % divmod(dura // 1000, 60)
        ourdura = "%d:%02d" % divmod(trackLengths[ii], 60)
        if variousArtists:
            artist = mbObj.GetResultData1(q.MBE_AlbumGetArtistName, ii)
            print("\t%02d - %s - %s (%s) [%s]"
                  % (track, artist, name, mbdura, ourdura))
        else:
            print("\t%02d - %s (%s) [%s]" % (track, name, mbdura, ourdura))
    print("")
    print("\thttp://musicbrainz.org/showalbum.html?discid=%s" % cdid)
    print("")

    if not _checkTrackLengths(mbObj, trackLengths):
        print("""\
The time lengths on this CD do not match what the Musicbrainz database say
they should be. It could be that this CD is linked to the wrong album.

Go to the album's page that this CD is linked to and verify that it is the
correct one

http://musicbrainz.org/showalbum.html?discid=%s

If not you may want to delete this cd's CDIndex id %s from the album.
""" % (cdid, cdid))
        print("Should we go on? [N/y]")
        choice = sys.stdin.readline().strip()
        if choice.lower() != 'y':
            return None
    return ret


def getSignature(filename, songLength=None):
    """
    Compute the TRM signature of an audio file.

    @param filename: path of the audio file; only '.wav' is supported
    @param songLength: when given (and non-zero), passed to the TRM
        generator via SetSongLength(); main() passes the track length in
        seconds
    @returns: whatever trm.FinalizeSignature() returns
    @raises SystemError: if the file extension is not '.wav'
    """
    ext = os.path.splitext(filename)[1]
    if ext.lower() != '.wav':
        raise SystemError("Unsupported audio file.")

    ff = WavWrapper(filename)
    try:
        info = ff.info()
        trm = musicbrainz.trm()
        # 16 bits per sample is hard-coded, as in the original script.
        trm.SetPCMDataInfo(info.rate, info.channels, 16)
        if songLength:
            trm.SetSongLength(songLength)
        while True:
            (buff, numBytes, bit) = ff.read()
            if numBytes == 0:
                break  # end of file
            # A true result stops the feeding loop (presumably the
            # generator has seen enough audio -- TODO confirm).
            if trm.GenerateSignature(buff):
                break
        return trm.FinalizeSignature()
    finally:
        # Close the wave file even if signature generation fails.
        ff.close()


class WavWrapper(object):
    """
    Make the wave module act more like ogg.vorbis.VorbisFile
    """

    def __init__(self, filename):
        self.ff = wave.open(filename, 'r')

    def read(self):
        """
        These docs are from ogg.vorbis.VorbisFile.read()

        @returns: Returns a tuple: (x, y, z) where x is a buffer object of
            the data read, y is the number of bytes read, and z is whatever
            the bitstream value means (no clue).  Here z is always None.
        @returntype: tuple
        """
        buff = self.ff.readframes(4096)
        return (buff, len(buff), None)

    def info(self):
        """Return an AudioInfo with the file's frame rate and channels."""
        return AudioInfo(self.ff.getframerate(), self.ff.getnchannels())

    def close(self):
        """Close the underlying wave file."""
        self.ff.close()


class AudioInfo(object):
    """Sample rate and channel count of an audio stream."""

    def __init__(self, rate, channels):
        self.rate = rate
        self.channels = channels


def usage():
    """Print a short command-line help text."""
    print("%s: generate a MusicBrainz TRM signature" % sys.argv[0])
    print("  -d, --device=DEV  CD-ROM device to use (default %s)"
          % DEFAULT_DEVICE)
    print("  -h, --help        show this message")


def writeDefaultConfig():
    """Write a template 'cd2trm.ini' with empty username and password."""
    cp = ConfigParser.ConfigParser()
    cp.add_section('cd2trm')
    cp.set('cd2trm', 'server', SERVER)
    cp.set('cd2trm', 'username', '')
    cp.set('cd2trm', 'password', '')
    with open(CONFIG_FILE, 'w') as ff:
        cp.write(ff)


def auth(mb, cp):
    """Authenticate mb with the username/password stored in config cp."""
    mb.Authenticate(cp.get('cd2trm', 'username'),
                    cp.get('cd2trm', 'password'))


def main():
    """
    Command-line entry point.

    Loops forever: wait for a disc, rip and fingerprint every track, submit
    the signatures, eject, and wait for the next disc.
    """
    device = DEFAULT_DEVICE
    try:
        optlist, args = getopt.getopt(sys.argv[1:], 'hd:',
                                      ['help', 'device='])
    except getopt.GetoptError as err:
        sys.stderr.write("%s\n" % err)
        usage()
        raise SystemExit(2)
    for opt, value in optlist:
        if opt in ("-h", "--help"):
            usage()
            return
        if opt in ("-d", "--device"):
            device = value

    if not os.path.isfile(CONFIG_FILE):
        writeDefaultConfig()
        raise SystemExit("Fill out your username/password in '%s'"
                         % CONFIG_FILE)

    cp = ConfigParser.ConfigParser()
    cp.read(CONFIG_FILE)

    mb = musicbrainz.mb()
    mb.SetServer(cp.get('cd2trm', 'server'), 80)
    mb.SetDepth(2)
    mb.SetDevice(device)
    # mb.SetDebug(1)
    auth(mb, cp)

    while True:
        trackIds = None
        while not trackIds:
            trackIds = getCDInfo(mb, device)

        queryItems = []
        numTracks = len(trackIds)
        for trackNum, (trackId, trackLength) in enumerate(trackIds, 1):
            print("Ripping track %d of %d..." % (trackNum, numTracks))
            tempfilename = 'temp%s-%d.wav' % (device.replace('/', '-'),
                                              trackNum)
            # The span is quoted because '[0:40]' would otherwise be a
            # shell glob pattern.
            span = '%d-%d[0:40]' % (trackNum, trackNum)
            retval = os.system(
                'cdparanoia --force-cdrom-device=%s --abort-on-skip'
                ' --output-wav %s %s'
                % (_shellQuote(device), _shellQuote(span),
                   _shellQuote(tempfilename)))
            # Terminated by signal 2 (SIGINT): the user pressed Ctrl-C.
            if os.WTERMSIG(retval) == 2:
                raise SystemExit
            if not os.path.isfile(tempfilename):
                print("skipping because cdparanoia couldn't read the disk "
                      "perfectly")
                continue

            print("Getting TRM for track %d..." % trackNum)
            try:
                sig = getSignature(tempfilename, songLength=trackLength)
            finally:
                # Never leave the ripped temp file behind.
                os.unlink(tempfilename)
            queryItems.append(queryItem % (trackId, sig))

        print("Submiting all track TRM sigs...")
        myquery = query % ''.join(queryItems)
        try:
            mb.Query(myquery)
        except musicbrainz.MusicBrainzError as err:
            if str(err) != SESSION_EXPIRED_MSG:
                raise
            # Session timed out while ripping: log in again, retry once.
            auth(mb, cp)
            mb.Query(myquery)

        print("Ejecting cd...")
        os.system('eject %s' % _shellQuote(device))


if __name__ == '__main__':
    main()