/*
* JBossMQ, the OpenSource JMS implementation
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jboss.mq.server;
import java.io.File;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.NoSuchElementException;
import javax.management.MBeanRegistration;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.jboss.system.ServiceMBeanSupport;
import org.jboss.mq.SpyMessage;
import javax.jms.JMSException;
import org.jboss.mq.pm.CacheStore;
/**
* This class implements a Message cache so that larger amounts of messages
* can be processed without running out of memory. When memory starts getting tight
* it starts moving messages out of memory and into a file so that they can be recovered
* later.
*
* The locks should be obtained in the following order:
* mr, the relevant message we are working with
* lruCache, when maintaining the usage order
*
* @author Hiram Chirino
* @author David Maplesden
* @author Peter Antman
* @author Adrian Brock
* @version $Revision: 1.17.2.3 $
*
* @jmx.mbean name="jboss.mq:service=MessageCache"
* extends="org.jboss.system.ServiceMBean"
*/
public class MessageCache extends ServiceMBeanSupport implements MessageCacheMBean, MBeanRegistration, Runnable
{
// The cached messages are ordered in an LRU linked list. This object is also
// used as the monitor under which the list, messageCounter and totalCacheSize
// are updated.
private LRUCache lruCache = new LRUCache();
// Source of the id handed to each new MessageReference (see getNextCounter)
private long messageCounter = 0;
// Hit/miss counters exposed through getCacheHits()/getCacheMisses(). They are
// never updated inside this class -- presumably MessageReference maintains
// them (verify against MessageReference).
long cacheHits = 0;
long cacheMisses = 0;
// Backing store used by loadFromStorage/saveToStorage/removeFromStorage;
// resolved in startService() from cacheStoreName and nulled in stopService()
CacheStore cacheStore;
ObjectName cacheStoreName;
// Background thread executing run(); created in startService()
private Thread referenceSoftner;
// Memory thresholds in bytes, compared against the used heap in
// validateSoftReferenceDepth(): softening starts above highMemoryMark and
// reaches full severity at maxMemoryMark
private long highMemoryMark = 1024L * 1000 * 16;
private long maxMemoryMark = 1024L * 1000 * 32;
// Conversion factor used by the memory mark getters/setters. Note that its
// value is 1,024,000 (1024 * 1000), not 2^20.
public static final long ONE_MEGABYTE = 1024L * 1000;
// Incremented in soften() and decremented in run() for every reference
// taken off referenceQueue
int softRefCacheSize = 0;
// Incremented in addInternal() and decremented in removeInternal() when the
// removal is done with clear=true
int totalCacheSize = 0;
// Used to get notified when messages are being deleted by GC
ReferenceQueue referenceQueue = new ReferenceQueue();
// The historical number of softenings
long softenedSize = 0;
// Set when the soft reference depth should be re-validated by run()
boolean checkSoftReferenceDepth = false;
/**
 * Exposes this cache object itself as a managed attribute so that other
 * services can obtain a direct reference to it.
 *
 * @return this MessageCache
 *
 * @jmx.managed-attribute
 */
public MessageCache getInstance()
{
   final MessageCache self = this;
   return self;
}
/**
 * Caches a message that belongs to no queue. The created reference is
 * flagged MessageReference.NOT_STORED.
 *
 * @param message the message to cache
 * @return the reference created for the message
 * @throws javax.jms.JMSException propagated from addInternal
 */
public MessageReference add(SpyMessage message) throws javax.jms.JMSException
{
   final BasicQueue noQueue = null;
   return addInternal(message, noQueue, MessageReference.NOT_STORED);
}
/**
 * Caches a message on behalf of a queue with an explicit stored state.
 * Simply delegates to addInternal.
 *
 * @param message the message to cache
 * @param queue the queue passed on to the new reference
 * @param stored the stored state to set on the new reference
 * @return the reference created for the message
 * @throws javax.jms.JMSException propagated from addInternal
 */
public MessageReference add(SpyMessage message, BasicQueue queue, int stored)
   throws javax.jms.JMSException
{
   final MessageReference created = addInternal(message, queue, stored);
   return created;
}
/**
 * Creates a MessageReference for the message, registers it as the most
 * recently used entry and then checks whether softening is needed.
 *
 * Locks are taken in the documented order: the reference first, then
 * lruCache.
 *
 * @param message the message to cache
 * @param queue the queue handed to the reference, may be null
 * @param stored the stored state to set on the reference
 * @return the newly created reference
 * @throws javax.jms.JMSException propagated from the reference setup or
 *         from validateSoftReferenceDepth
 */
public MessageReference addInternal(SpyMessage message, BasicQueue queue, int stored)
   throws javax.jms.JMSException
{
   final boolean trace = log.isTraceEnabled();

   // Build the reference and give it a fresh id
   final MessageReference mh = new MessageReference();
   final long id = getNextCounter(trace);
   mh.init(this, id, message, queue);
   mh.setStored(stored);

   // Publish it in the LRU list
   if (trace)
   {
      log.trace("add lock aquire message " + mh);
   }
   synchronized (mh)
   {
      if (trace)
      {
         log.trace("add lock aquire lruCache " + mh);
      }
      synchronized (lruCache)
      {
         lruCache.addMostRecent(mh);
         totalCacheSize++;
         if (trace)
         {
            log.trace("add locks release" + mh);
         }
      }
   }

   // A new hard reference may push us over the softening threshold
   validateSoftReferenceDepth();
   return mh;
}
/**
 * Removes a message from the cache, clearing it and resetting the
 * reference in one step.
 *
 * @param mr the reference to remove
 * @throws JMSException propagated from removeInternal
 */
public void remove(MessageReference mr)
   throws JMSException
{
   final boolean clear = true;
   final boolean reset = true;
   removeInternal(mr, clear, reset);
}
/**
 * Removes a message from the cache without returning the reference to the
 * pool. Used in two phase removes for joint cache/persistence.
 *
 * @param mr the reference to remove
 * @throws JMSException propagated from removeInternal
 */
public void removeDelayed(MessageReference mr)
   throws JMSException
{
   final boolean clear = true;
   final boolean reset = false;
   removeInternal(mr, clear, reset);
}
/**
 * Takes a message out of the hard reference list without clearing or
 * resetting it, then counts it as a soft reference. Used in softening.
 *
 * @param mr the reference being softened
 * @throws JMSException propagated from removeInternal
 */
void soften(MessageReference mr)
   throws JMSException
{
   final boolean clear = false;
   final boolean reset = false;
   removeInternal(mr, clear, reset);
   softRefCacheSize++;
}
/**
 * Common removal routine behind remove, removeDelayed and soften.
 *
 * While holding the reference lock, and unless the reference is already
 * marked REMOVED, the reference is unlinked from the LRU list (only if it
 * still holds a hard reference) and, when clear is set, the total count is
 * decremented and the reference is cleared. When reset is set the reference
 * is reset regardless of its stored state.
 *
 * @param mr the reference to remove
 * @param clear whether to decrement the total size and clear the reference
 * @param reset whether to reset the reference (return it to the pool)
 * @throws JMSException propagated from the reference
 */
protected void removeInternal(MessageReference mr, boolean clear, boolean reset)
   throws JMSException
{
   final boolean trace = log.isTraceEnabled();
   if (trace)
   {
      log.trace("remove lock aquire message " + mr + " clear=" + clear + " reset=" + reset);
   }
   synchronized (mr)
   {
      final boolean stillPresent = (mr.stored != MessageReference.REMOVED);
      if (stillPresent)
      {
         if (trace)
         {
            log.trace("remove lock aquire lrucache " + mr + " clear= " + clear + " reset=" + reset);
         }
         synchronized (lruCache)
         {
            // Softened messages are no longer in the LRU list
            if (mr.hardReference != null)
            {
               lruCache.remove(mr);
            }
            if (clear)
            {
               totalCacheSize--;
            }
            if (trace)
            {
               log.trace("remove lock release lrucache " + mr + " clear= " + clear + " reset=" + reset);
            }
         }
         // Will remove it from storage if stored
         if (clear)
         {
            mr.clear();
         }
      }
      // Return to the pool
      if (reset)
      {
         mr.reset();
      }
      if (trace)
      {
         log.trace("remove lock release message " + mr + " clear= " + clear + " reset=" + reset);
      }
   }
}
/**
 * Body of the reference softener thread.
 *
 * The most recently used messages are kept as hard references and older
 * ones are turned into soft references. Softening a message stores it to
 * disk, so it is avoided where possible, but a softened message is not
 * necessarily gone from memory: until the GC reclaims it a reader can still
 * be served without touching the file system. The aim is to always have
 * some soft references available for the GC to reclaim when memory gets
 * tight.
 *
 * Each pass takes the references the GC has enqueued on referenceQueue,
 * reduces softRefCacheSize accordingly and re-validates the soft reference
 * depth. The loop ends when the thread is interrupted or a JMSException is
 * thrown.
 *
 * @see Runnable#run()
 */
public void run()
{
   try
   {
      for (;;)
      {
         // Poll without blocking when a depth check is already pending,
         // otherwise wait up to a second for the GC to enqueue something
         Reference collected = checkSoftReferenceDepth
            ? referenceQueue.poll()
            : referenceQueue.remove(1000);

         if (collected != null)
         {
            // the GC will free a set of messages together, so we drain them
            // all before we validate the soft reference depth.
            do
            {
               softRefCacheSize--;
               collected = referenceQueue.poll();
            }
            while (collected != null);

            if (log.isTraceEnabled())
            {
               log.trace("soft reference cache size is now: "+softRefCacheSize);
            }
            checkSoftReferenceDepth = true;
         }

         if (checkSoftReferenceDepth)
         {
            checkSoftReferenceDepth = validateSoftReferenceDepth();
         }
      }
   }
   catch (JMSException e)
   {
      log.error("Message Cache Thread Stopped: ", e);
   }
   catch (InterruptedException e)
   {
      // Signal to exit the thread.
   }
   log.debug("Thread exiting.");
}
/**
 * Hands out the next message counter value. The counter is read and
 * advanced while holding the lruCache lock.
 *
 * @param trace whether trace logging should be emitted
 * @return the counter value before it was incremented
 */
long getNextCounter(boolean trace)
{
   if (trace)
   {
      log.trace("counter lock aquire " + messageCounter);
   }
   final long next;
   synchronized (lruCache)
   {
      next = messageCounter;
      messageCounter = next + 1;
      if (trace)
      {
         log.trace("counter lock release " + next);
      }
   }
   return next;
}
/**
 * Determines whether it is time to convert some hard references over to
 * soft references and, if so, softens the least recently used message.
 *
 * While the service is STARTED each iteration:
 * - under the lruCache lock, computes how many messages should be soft from
 *   the used heap relative to highMemoryMark/maxMemoryMark and picks the
 *   least recently used message when more than one needs softening;
 * - outside the lruCache lock, takes the message lock and makes it soft
 *   unless it has been removed in the meantime.
 *
 * The memory severity is a value in [0, 1]. When the marks are configured
 * so that maxMemoryMark is not above highMemoryMark, any usage above
 * highMemoryMark is treated as full severity rather than producing a
 * negative value that would silently disable softening.
 *
 * @return true when a message was softened and ended up STORED, false when
 *         no softening is required or the service is not started
 * @throws JMSException propagated from softening the message
 */
boolean validateSoftReferenceDepth()
   throws JMSException
{
   boolean trace = log.isTraceEnabled();
   // Loop until softening is not required or we find a message we can soften
   while (getState() == ServiceMBeanSupport.STARTED)
   {
      MessageReference messageToSoften = null;
      if (trace)
         log.trace("run lock aquire, validateSoftReferenceDepth");
      synchronized (lruCache)
      {
         // how many to change over to soft refs
         int softenCount = 0;
         int hardCount = getHardRefCacheSize();
         int softCount = getSoftRefCacheSize();
         // Only soften when there is more than one hard reference
         // Should probably parameterise this?
         if (hardCount < 2)
            return false;
         // Snapshot the marks so one consistent pair is used for the whole
         // calculation even if a setter runs concurrently
         long high = highMemoryMark;
         long max = maxMemoryMark;
         long currentMem = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
         if (currentMem > high)
         {
            // we need to get more aggressive... how much?? lets get
            // a measurement from 0 to 1
            long range = max - high;
            float severity = 1.0F;
            // A non-positive range means the marks are equal or inverted;
            // being above the high mark then counts as full severity
            if (range > 0)
               severity = Math.min(((float) (currentMem - high)) / range, 1.0F);
            if (trace)
               log.trace("Memory usage serverity=" + severity);
            int totalMessageInMem = hardCount + softCount;
            int howManyShouldBeSoft = (int) ((totalMessageInMem) * severity);
            softenCount = howManyShouldBeSoft - softCount;
         }
         // We can only do so much, somebody else is using all the memory?
         if (softenCount > hardCount)
         {
            log.debug("Soften count " + softenCount + " greater than hard references " + hardCount);
            softenCount = hardCount;
         }
         // Ignore soften counts of 1 since this will happen too often even
         // if the severity is low since it will round up.
         if (softenCount > 1)
         {
            if (trace)
               log.trace("Need to soften " + softenCount + " messages");
            Node node = lruCache.getLeastRecent();
            messageToSoften = (MessageReference) node.data;
         }
         if (trace)
            log.trace("run lock release, validateSoftReferenceDepth");
      }
      // No softening required
      if (messageToSoften == null)
         return false;
      if (trace)
         log.trace("soften lock acquire " + messageToSoften);
      synchronized (messageToSoften)
      {
         // Soften unless it was removed
         if (messageToSoften.messageCache != null &&
            messageToSoften.stored != MessageReference.REMOVED)
         {
            messageToSoften.makeSoft();
            if (messageToSoften.stored == MessageReference.STORED)
            {
               softenedSize++;
               return true;
            }
         }
         else if (trace)
            log.trace("not softening removed message " + messageToSoften);
         if (trace)
            log.trace("soften lock release " + messageToSoften);
      }
   }
   return false;
}
/**
 * Called when a MessageReference is de-referenced so that it moves to the
 * top of the LRU list.
 *
 * A reference that was still hard is repositioned; one that was not is
 * added to the list again, and a soft reference depth check is requested.
 *
 * @param mh the reference that was used
 * @param wasHard whether the reference was hard when it was used
 * @param trace whether trace logging should be emitted
 * @throws JMSException declared for callers; not thrown by the visible code
 */
void messageReferenceUsedEvent(MessageReference mh, boolean wasHard, boolean trace)
   throws JMSException
{
   if (trace)
   {
      log.trace("messageReferenceUsedEvent lock aquire message " + mh + " wasHard=" + wasHard);
   }
   synchronized (mh)
   {
      if (trace)
      {
         log.trace("messageReferenceUsedEvent lock aquire lrucache " + mh + " wasHard=" + wasHard);
      }
      synchronized (lruCache)
      {
         if (!wasHard)
         {
            lruCache.addMostRecent(mh);
         }
         else
         {
            lruCache.makeMostRecent(mh);
         }
         if (trace)
         {
            log.trace("messageReferenceUsedEvent locks released " + mh + " wasHard=" + wasHard);
         }
      }
   }
   // A new hard reference was created, so the depth needs re-checking
   if (!wasHard)
   {
      checkSoftReferenceDepth = true;
   }
}
//////////////////////////////////////////////////////////////////////////////////
// Persistence methods used by the MessageReference.
//////////////////////////////////////////////////////////////////////////////////
/**
 * Reads the message belonging to a reference back from the cache store.
 *
 * @param mh the reference whose message should be loaded
 * @return the message returned by the cache store
 * @throws JMSException propagated from the cache store
 */
SpyMessage loadFromStorage(MessageReference mh) throws JMSException
{
   final Object loaded = cacheStore.loadFromStorage(mh);
   return (SpyMessage) loaded;
}
/**
 * Hands a message to the cache store for the given reference.
 *
 * @param mh the reference the message belongs to
 * @param message the message to store
 * @throws JMSException propagated from the cache store
 */
void saveToStorage(MessageReference mh, SpyMessage message) throws JMSException
{
   final CacheStore store = cacheStore;
   store.saveToStorage(mh, message);
}
/**
 * Asks the cache store to drop whatever it holds for the given reference.
 *
 * @param mh the reference whose stored message should be removed
 * @throws JMSException propagated from the cache store
 */
void removeFromStorage(MessageReference mh) throws JMSException
{
   final CacheStore store = cacheStore;
   store.removeFromStorage(mh);
}
//////////////////////////////////////////////////////////////////////////////////
//
// The following section deals with the JMX interface used to manage the cache
//
//////////////////////////////////////////////////////////////////////////////////
/**
 * Starts the cache service: looks up the cache store through the
 * "Instance" attribute of the configured cacheStoreName MBean and launches
 * the daemon thread that runs this cache's run() method.
 *
 * @throws Exception if the cache store attribute cannot be obtained
 */
protected void startService() throws Exception
{
   final Object store = getServer().getAttribute(cacheStoreName, "Instance");
   cacheStore = (CacheStore) store;

   final Thread softener = new Thread(this, "JBossMQ Cache Reference Softner");
   referenceSoftner = softener;
   softener.setDaemon(true);
   softener.start();
}
/**
 * Stops the cache service: interrupts the reference softener thread, which
 * makes run() exit, and releases the cache store reference.
 *
 * Safe to call when the softener thread was never created or has already
 * been stopped; in that case only the cache store reference is released.
 */
protected void stopService()
{
   synchronized(lruCache)
   {
      // Guard against a missing thread (start never completed, or a
      // repeated stop) instead of failing with a NullPointerException
      if (referenceSoftner != null)
      {
         referenceSoftner.interrupt();
         referenceSoftner = null;
      }
   }
   cacheStore = null;
}
/**
 * Reports how many messages are currently held in the LRU list, read while
 * holding the lruCache lock.
 *
 * @return the size of the LRU list
 *
 * @jmx.managed-attribute
 */
public int getHardRefCacheSize()
{
   final int hardRefs;
   synchronized (lruCache)
   {
      hardRefs = lruCache.size();
   }
   return hardRefs;
}
/**
 * Reports the historical number of softenings.
 *
 * @return the current value of the softenedSize counter
 *
 * @jmx.managed-attribute
 */
public long getSoftenedSize()
{
   return this.softenedSize;
}
/**
 * Reports the current soft reference count.
 *
 * @return the current value of the softRefCacheSize counter
 *
 * @jmx.managed-attribute
 */
public int getSoftRefCacheSize()
{
   return this.softRefCacheSize;
}
/**
 * Reports the total number of messages tracked by the cache.
 *
 * @return the current value of the totalCacheSize counter
 *
 * @jmx.managed-attribute
 */
public int getTotalCacheSize()
{
   return this.totalCacheSize;
}
/**
 * Reports the cache miss counter.
 *
 * @return the current value of cacheMisses
 *
 * @jmx.managed-attribute
 */
public long getCacheMisses()
{
   return this.cacheMisses;
}
/**
 * Reports the cache hit counter.
 *
 * @return the current value of cacheHits
 *
 * @jmx.managed-attribute
 */
public long getCacheHits()
{
   return this.cacheHits;
}
/**
 * Reports the high memory mark converted from bytes using ONE_MEGABYTE.
 *
 * @return highMemoryMark divided by ONE_MEGABYTE
 *
 * @jmx.managed-attribute
 */
public long getHighMemoryMark()
{
   final long bytes = highMemoryMark;
   return bytes / ONE_MEGABYTE;
}
/**
 * Changes the high memory mark. The value is multiplied by ONE_MEGABYTE
 * before it is stored.
 *
 * @param highMemoryMark the new mark, in units of ONE_MEGABYTE
 *
 * @jmx.managed-attribute
 */
public void setHighMemoryMark(long highMemoryMark)
{
   final long bytes = highMemoryMark * ONE_MEGABYTE;
   this.highMemoryMark = bytes;
}
/**
 * Reports the max memory mark converted from bytes using ONE_MEGABYTE.
 *
 * @return maxMemoryMark divided by ONE_MEGABYTE
 *
 * @jmx.managed-attribute
 */
public long getMaxMemoryMark()
{
   final long bytes = maxMemoryMark;
   return bytes / ONE_MEGABYTE;
}
/**
 * Reports the memory currently in use by the JVM (total minus free),
 * converted using ONE_MEGABYTE.
 *
 * @return used memory divided by ONE_MEGABYTE
 *
 * @jmx.managed-attribute
 */
public long getCurrentMemoryUsage()
{
   final long total = Runtime.getRuntime().totalMemory();
   final long free = Runtime.getRuntime().freeMemory();
   return (total - free) / ONE_MEGABYTE;
}
/**
 * Changes the max memory mark. The value is multiplied by ONE_MEGABYTE
 * before it is stored.
 *
 * @param maxMemoryMark the new mark, in units of ONE_MEGABYTE
 *
 * @jmx.managed-attribute
 */
public void setMaxMemoryMark(long maxMemoryMark)
{
   final long bytes = maxMemoryMark * ONE_MEGABYTE;
   this.maxMemoryMark = bytes;
}
/**
 * Supplies the fixed name of this service.
 *
 * @return the string "MessageCache"
 * @see ServiceMBeanSupport#getName()
 */
public String getName()
{
   final String serviceName = "MessageCache";
   return serviceName;
}
/**
 * Records the object name of the cache store MBean. The store itself is
 * looked up from this name when the service is started.
 *
 * @param cacheStoreName the object name of the cache store
 * @see MessageCacheMBean#setCacheStore(ObjectName)
 *
 * @jmx.managed-attribute
 */
public void setCacheStore(ObjectName cacheStoreName)
{
   final ObjectName configured = cacheStoreName;
   this.cacheStoreName = configured;
}
/**
 * Reports the object name of the configured cache store MBean.
 *
 * @return the configured object name, as last set through setCacheStore
 *
 * @jmx.managed-attribute
 */
public ObjectName getCacheStore()
{
   return this.cacheStoreName;
}
/**
 * A simple least-recently-used ordering of objects, kept as a doubly linked
 * list of nodes plus a map from each object to its node. It is pretty much
 * a cut down version of the code in
 * org.jboss.pool.cache.LeastRecentlyUsedCache.
 *
 * The class does no locking of its own.
 */
class LRUCache
{
   int currentSize = 0;
   // maps objects to their nodes
   HashMap map = new HashMap();
   Node mostRecent = null;
   Node leastRecent = null;

   /**
    * Inserts an object at the most recently used end of the list.
    *
    * @param o the object to add
    * @throws RuntimeException if the object is already in the cache
    */
   public void addMostRecent(Object o)
   {
      Node fresh = register(o);
      if (mostRecent != null)
      {
         fresh.lessRecent = mostRecent;
         mostRecent.moreRecent = fresh;
         mostRecent = fresh;
      }
      else
      {
         // first element is both ends of the list
         mostRecent = fresh;
         leastRecent = fresh;
      }
      ++currentSize;
   }

   /**
    * Inserts an object at the least recently used end of the list.
    * Not used anywhere in this file.
    *
    * @param o the object to add
    * @throws RuntimeException if the object is already in the cache
    */
   public void addLeastRecent(Object o)
   {
      Node fresh = register(o);
      if (leastRecent != null)
      {
         fresh.moreRecent = leastRecent;
         leastRecent.lessRecent = fresh;
         leastRecent = fresh;
      }
      else
      {
         // first element is both ends of the list
         mostRecent = fresh;
         leastRecent = fresh;
      }
      ++currentSize;
   }

   /**
    * Creates a node for the object and records it in the map. If the object
    * was already mapped the previous mapping is restored before failing.
    */
   private Node register(Object o)
   {
      Node fresh = new Node();
      fresh.data = o;
      Object previous = map.put(o, fresh);
      if (previous != null)
      {
         map.put(o, previous);
         throw new RuntimeException("Can't add object '"+o+"' to LRUCache that is already in cache.");
      }
      return fresh;
   }

   /**
    * Takes an object out of the map and unlinks its node from the list.
    *
    * @param o the object to remove
    * @throws RuntimeException if the object is not in the cache
    */
   public void remove(Object o)
   {
      Node victim = (Node) map.remove(o);
      if (victim == null)
      {
         throw new RuntimeException("Can't remove object '"+o+"' that is not in cache.");
      }
      Node newer = victim.moreRecent;
      Node older = victim.lessRecent;

      // repair the more recent side
      if (newer != null)
      {
         newer.lessRecent = older;
      }
      else
      {
         // victim was the most recent entry
         mostRecent = older;
         if (older != null)
         {
            older.moreRecent = null;
         }
      }

      // repair the less recent side
      if (older != null)
      {
         older.moreRecent = newer;
      }
      else
      {
         // victim was the least recent entry
         leastRecent = newer;
         if (newer != null)
         {
            newer.lessRecent = null;
         }
      }
      --currentSize;
   }

   /**
    * Moves an object that is already in the cache to the most recently used
    * end of the list.
    *
    * @param o the object to reposition
    * @throws RuntimeException if the object is not in the cache
    */
   public void makeMostRecent(Object o)
   {
      Node node = (Node) map.get(o);
      if (node == null)
      {
         throw new RuntimeException("Can't make most recent object '"+o+"' that is not in cache.");
      }
      Node newer = node.moreRecent;
      if (newer == null)
      {
         // already the most recent entry
         return;
      }
      Node older = node.lessRecent;

      // unlink the node from its current position
      newer.lessRecent = older;
      if (older != null)
      {
         older.moreRecent = newer;
      }
      else
      {
         // node was the least recent entry
         leastRecent = newer;
      }

      // relink it in front of the current head
      node.lessRecent = mostRecent;
      node.moreRecent = null;
      mostRecent.moreRecent = node;
      mostRecent = node;
   }

   /**
    * @return the number of objects in the cache
    */
   public int size()
   {
      return currentSize;
   }

   /**
    * @return the node at the most recently used end, or null when empty
    */
   public Node getMostRecent()
   {
      return mostRecent;
   }

   /**
    * @return the node at the least recently used end, or null when empty
    */
   public Node getLeastRecent()
   {
      return leastRecent;
   }
}

/**
 * One entry in the LRUCache linked list: the cached object and the links to
 * its more and less recently used neighbours (null at the ends of the list).
 */
class Node
{
   Node moreRecent = null;
   Node lessRecent = null;
   Object data = null;
}
}
/*
vim:tabstop=3:expandtab:ai
*/