/* * JBossMQ, the OpenSource JMS implementation * * Distributable under LGPL license. * See terms of license at gnu.org. */ package org.jboss.mq.server; import java.lang.ref.ReferenceQueue; import java.lang.ref.SoftReference; import javax.jms.DeliveryMode; import javax.jms.JMSException; import org.jboss.logging.Logger; import org.jboss.mq.MessagePool; import org.jboss.mq.SpyMessage; import org.jboss.mq.SpyMessage.Header; /** * This class holds a reference to an actual Message. Where it is actually * at may vary. The reference it holds may be a: * * * @author Hiram Chirino * @author Peter Antman * @version $Revision: 1.10.2.5 $ */ public class MessageReference implements Comparable { static Logger log = Logger.getLogger(MessageReference.class); /** * The message is not persisted */ public static final int NOT_STORED = 1; /** * The message is persisted */ public static final int STORED = 2; /** * It was a persistent message for a joint * cache store/persistent manager. * This states guards against double * removal from the cache while keeping * error checking for incorrect double removal * No message should be at this state for very long */ public static final int REMOVED = 3; public long referenceId; public SpyMessage hardReference; // These fields are copied over from the messae itself.. // they are used too often to not have them handy. public byte jmsPriority; public long messageId; public int jmsDeliveryMode; public long messageScheduledDelivery; public long messageExpiration; // Data used on the server public BasicQueue queue; public MessageCache messageCache; public SoftReference softReference; public int stored; transient public Object persistData; MessageReference() { } //init and reset methods for use by object pool void init(MessageCache messageCache, long referenceId, SpyMessage message, BasicQueue queue) throws JMSException { this.messageCache = messageCache; this.hardReference = message; this.referenceId = referenceId; this.jmsPriority = (byte) message.getJMSPriority(); this.messageId = message.header.messageId; this.stored = NOT_STORED; this.jmsDeliveryMode = message.header.jmsDeliveryMode; if (message.propertyExists(SpyMessage.PROPERTY_SCHEDULED_DELIVERY)) { this.messageScheduledDelivery = message.getLongProperty(SpyMessage.PROPERTY_SCHEDULED_DELIVERY); } this.messageExpiration = message.header.jmsExpiration; this.queue = queue; this.persistData = null; } void reset() { if (hardReference != null) MessagePool.releaseMessage(hardReference); else { hardReference = (SpyMessage) softReference.get(); if (hardReference != null) MessagePool.releaseMessage(hardReference); } //clear refs so gc can collect unused objects if (softReference != null && softReference.get() != null) messageCache.softRefCacheSize--; this.messageCache = null; this.hardReference = null; this.softReference = null; this.queue = null; this.stored = 0; this.jmsDeliveryMode = 0; this.persistData = null; } public SpyMessage getMessage() throws JMSException { boolean trace = log.isTraceEnabled(); if( trace ) log.trace("getMessage lock aquire " + toString()); SpyMessage result = null; synchronized (this) { if (hardReference == null) { makeHard(); result = hardReference; messageCache.messageReferenceUsedEvent(this, false, trace); } else { result = hardReference; messageCache.cacheHits++; messageCache.messageReferenceUsedEvent(this, true, trace); } if( trace ) log.trace("getMessage lock released " + toString()); return result; } } /** * Returns true if this message reference has expired. */ public boolean isExpired() { if (messageExpiration == 0) return false; long ts = System.currentTimeMillis(); return messageExpiration < ts; } /** * Determines whether the message is persistent in the sense * that it survives a crash */ public boolean isPersistent() { return queue instanceof PersistentQueue && jmsDeliveryMode == DeliveryMode.PERSISTENT; } /** * Determines the persistent for storing the message */ public String getPersistentKey() { return queue.getDescription(); } /** * We could optimize caching by keeping the headers but not the body. * The server will uses the headers more often than the body and the * headers take up much message memory than the body * * For now just return the message. */ public SpyMessage.Header getHeaders() throws javax.jms.JMSException { return getMessage().header; } void clear() throws JMSException { boolean trace = log.isTraceEnabled(); if( trace ) log.trace("clear lock aquire " + toString()); synchronized (this) { if (stored == STORED) messageCache.removeFromStorage(this); stored = MessageReference.REMOVED; if( trace ) log.trace("clear lock relased " + toString()); } } public void invalidate() throws JMSException { boolean trace = log.isTraceEnabled(); if( trace ) log.trace("invalidate lock aquire " + toString()); synchronized (this) { if (stored == STORED) { if (hardReference == null) { makeHard(); messageCache.messageReferenceUsedEvent(this, false, trace); } messageCache.removeFromStorage(this); } if( trace ) log.trace("invalidate lock relased " + toString()); } } void makeSoft() throws JMSException { boolean trace = log.isTraceEnabled(); if( trace ) log.trace("makeSoft lock aquire " + toString()); synchronized (this) { // Attempt to soften a removed message if (stored == REMOVED) throw new JMSException("CACHE ERROR: makeSoft() on a removed message " + this); // It is already soft if (softReference != null) { // Sanity check if (stored == NOT_STORED) throw new JMSException("CACHE ERROR: soft reference to unstored message " + this); return; } if (stored == NOT_STORED) messageCache.saveToStorage(this, hardReference); // HACK: allow the jdbc2 driver to reject saveToStorage for persistent messages // it is just about to persist the message when it gets some cpu if (stored != STORED) { if (trace) log.trace("saveToStorage rejected by cache " + toString()); return; } softReference = new SoftReference(hardReference, messageCache.referenceQueue); // We don't need the hard ref anymore.. messageCache.soften(this); hardReference = null; if( trace ) log.trace("makeSoft lock released " + toString()); } } /** * Called from A PeristenceManager/CacheStore, * to let us know that this message is already stored on disk. */ public void setStored(int stored) { this.stored = stored; } void makeHard() throws JMSException { boolean trace = log.isTraceEnabled(); if( trace ) log.trace("makeHard lock aquire " + toString()); synchronized (this) { // Attempt to harden a removed message if (stored == REMOVED) throw new JMSException("CACHE ERROR: makeHard() on a removed message " + this); // allready hard if (hardReference != null) return; // Get the object via the softref hardReference = (SpyMessage) softReference.get(); // It might have been removed from the cache due to memory constraints if (hardReference == null) { // load it from disk. hardReference = messageCache.loadFromStorage(this); messageCache.cacheMisses++; } else { messageCache.cacheHits++; } // Since we have hard ref, we do not need the soft one. if (softReference != null && softReference.get() != null) messageCache.softRefCacheSize--; softReference = null; if( trace ) log.trace("makeHard lock released " + toString()); } } public boolean equals(Object o) { try { return referenceId == ((MessageReference) o).referenceId; } catch (Throwable e) { return false; } } /** * This method allows message to be order on the server queues * by priority and the order that they came in on. * * @see Comparable#compareTo(Object) */ public int compareTo(Object o) { MessageReference sm = (MessageReference) o; if (jmsPriority > sm.jmsPriority) { return -1; } if (jmsPriority < sm.jmsPriority) { return 1; } return (int) (messageId - sm.messageId); } /** * For debugging */ public String toString() { StringBuffer buffer = new StringBuffer(100); if (messageCache == null) buffer.append(" NOT IN CACHE hashCode=").append(hashCode()); else { buffer.append(referenceId); buffer.append(" msg=").append(messageId); if (hardReference != null) buffer.append(" hard"); if (softReference != null) buffer.append(" soft"); switch (stored) { case NOT_STORED: buffer.append(" NOT_STORED"); break; case STORED: buffer.append(" STORED"); break; case REMOVED: buffer.append(" REMOVED"); break; } switch (jmsDeliveryMode) { case DeliveryMode.NON_PERSISTENT: buffer.append(" NON_PERSISTENT"); break; case DeliveryMode.PERSISTENT: buffer.append(" PERSISTENT"); break; } if (persistData != null) buffer.append(" persistData=").append(persistData); if (queue != null) buffer.append(" queue=").append(queue.getDescription()); else buffer.append(" NO_QUEUE"); buffer.append(" priority=").append(jmsPriority); buffer.append(" hashCode=").append(hashCode()); } return buffer.toString(); } }