The Artima Developer Community
Distributing synchronization across threads

3 replies on 1 page. Most recent reply: Dec 9, 2006 11:30 PM by Gregg Wonderly

Gregg Wonderly

Posts: 317
Nickname: greggwon
Registered: Apr, 2003

Distributing synchronization across threads
Posted: Oct 25, 2006 10:26 PM
Summary
The Java keyword synchronized is the simplest form of concurrency control in Java. With the work by Doug Lea and notable others on the new java.util.concurrent package, there are now more tools. When dealing with highly contested resources, distributing the locking is key.
I recently came up against a concurrency issue in the net.jini.loader.pref.PreferredClassProvider class in the 2.1 version of the Jini Technology Starter Kit (JTSK). During class loader creation by the PreferredClassProvider, a HashMap is used as a lock to make sure that a single class loader is created for each codebase associated with the classes being resolved.

The section of code that does this class loader creation is wrapped in one large

synchronized( loaderTable) {

...lots of code, including network access...

}
block. That block holds a global lock, which injects long delays when multiple network paths are active and some of them are much slower than the others.

The code in the middle of the synchronized block does two things. The first bit uses the synchronization to lock access to the map while unreferenced class loaders are scavenged from it. This work is going to be pretty quick on average.

The second bit of code is where the ClassLoader is created. Creating the ClassLoader takes several steps that are particular to that ClassLoader, and these shouldn't lock out access to ClassLoaders that already exist.

However, the structure of the code does just that. The lock only needs to keep out multiple threads that are all trying to access the same codebase ClassLoader.

The java.util.concurrent.ConcurrentHashMap class in JDK 1.5 and later provides a really powerful mechanism for resolving certain concurrency issues with minimal locking. Below is a more complete representation of the original code structure.

synchronized( loaderTable ) {
    /*
     * Take this opportunity to remove from the table entries
     * whose weak references have been cleared.
     */
    Object ref;
    while ((ref = refQueue.poll()) != null) {
	if (ref instanceof LoaderKey) {
	    LoaderKey key = (LoaderKey) ref;
	    loaderTable.remove(key);
	} else if (ref instanceof LoaderEntry) {
	    LoaderEntry entry = (LoaderEntry) ref;
	    if (!entry.removed) {        // ignore entries removed below
		loaderTable.remove(entry.key);
	    }
	}
    }

    /*
     * Look up the codebase URL path and parent class loader pair
     * in the table of RMI class loaders.
     */
    LoaderKey key = new LoaderKey(urls, parent);
    LoaderEntry entry = (LoaderEntry) loaderTable.get(key);

    ClassLoader loader;
    if (entry == null ||
	(loader = (ClassLoader) entry.get()) == null)
    {
	/*
	 * If entry was in table but its weak reference was cleared,
	 * remove it from the table and mark it as explicitly cleared,
	 * so that new matching entry that we put in the table will
	 * not be erroneously removed when this entry is processed
	 * from the weak reference queue.
	 */
	if (entry != null) {
	    loaderTable.remove(key);
	    entry.removed = true;
	}

	/*
	 * An existing loader with the given URL path and
	 * parent was not found.  Perform the following steps
	 * to obtain an appropriate loader:
	 * 
	 * Search for an ancestor of the parent class loader
	 * whose export urls match the parameter URL path
	 * 
	 * If a matching ancestor could not be found, create a
	 * new class loader instance for the requested
	 * codebase URL path and parent class loader.  The
	 * loader instance is created within an access control
	 * context restricted to the permissions necessary to
	 * load classes from its codebase URL path.
	 */
	loader = findOriginLoader(urls, parent);

	if (loader == null) {
	    loader = createClassLoader(urls, parent, requireDlPerm);
	}

	/*
	 * Finally, create an entry to hold the new loader with a
	 * weak reference and store it in the table with the key.
	 */
	entry = new LoaderEntry(key, loader);
	loaderTable.put(key, entry);
    }
    return loader;
}
To make a long story short, this code can be changed so that it looks like the following, and suddenly the concurrency is confined to locks on the individual ClassLoaders and fast network paths can quickly pass through with little contention.

// Declared somewhere at the class level.
int coreCount = ... 1 ...
int maxCount = ... 10 ...
int secondsAlive = ... 5 ...

// The queue for the class loader creation activities
LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();

// The executor using the queue.
ThreadPoolExecutor exec = new ThreadPoolExecutor( coreCount, maxCount,
	secondsAlive, TimeUnit.SECONDS, queue );

// The map holding the created futures.
ConcurrentHashMap<LoaderKey, FutureTask<LoaderEntry>> loaderFutures =
	new ConcurrentHashMap<LoaderKey, FutureTask<LoaderEntry>>();


// The replacement code structure...

/*
 * Look up the codebase URL path and parent class loader pair
 * in the table of RMI class loaders.
 */
final LoaderKey key = new LoaderKey(urls, parent);
final ClassLoader curLoader;

/*
 * Clean up the table and establish that we really don't
 * have a usable entry in this synchronized block.
 */
synchronized( loaderTable ) {
    /*
     * Take this opportunity to remove from the table entries
     * whose weak references have been cleared.
     */
    Object ref;
    while ((ref = refQueue.poll()) != null) {
        if (ref instanceof LoaderKey) {
            LoaderKey lkey = (LoaderKey) ref;
            loaderTable.remove(lkey);
            loaderFutures.remove( lkey );
        } else if (ref instanceof LoaderEntry) {
            LoaderEntry entry = (LoaderEntry) ref;
            if (!entry.removed) {    // ignore entries removed below
                loaderTable.remove(entry.key);
                loaderFutures.remove( entry.key );
            }
        }
    }

    /*
     * Get an atomic view of the current class loader's visibility.
     * The contents of loaderTable and loaderFutures should mirror
     * each other.  We need a hard reference to the loader if it
     * is still alive, so that we don't lose it before the method
     * exits.  So, we grab that reference here, but we don't use it
     * anywhere as the 'curLoader' value, we just use that reference
     * to keep it alive until the Future.get() call returns the LoaderEntry
     * that references the ClassLoader, and we get the ClassLoader from
     * that LoaderEntry.
     */

    // Get any existing entry for this loader.
    final LoaderEntry lastEntry = (LoaderEntry) loaderTable.get(key);

    // Assign and hold a reference to the loader if present
    // so that GC doesn't take it away.
    if (lastEntry == null ||
            (curLoader = (ClassLoader) lastEntry.get()) == null) {

        // loader gone, remove future so that we will recreate/locate
        // the needed loader.
        loaderFutures.remove( key );

         /*
          * If entry was in table but its weak reference was cleared,
          * remove it from the table and mark it as explicitly cleared,
          * so that new matching entry that we put in the table will
          * not be erroneously removed when this entry is processed
          * from the weak reference queue.
          */
        loaderTable.remove( key );
        if( lastEntry != null )
            lastEntry.removed = true;
    }
}

// If we get here, we know that we have to generate the class
// loader entry by either finding the existing one, or creating
// a new one.  The winning "putIfAbsent()" call below will designate
// the FutureTask that will be run to do the work.
FutureTask<LoaderEntry> fut = new FutureTask<LoaderEntry>(
    new Callable<LoaderEntry>() {
        public LoaderEntry call() {

            /*
             * An existing loader with the given URL path and
             * parent was not found.  Perform the following steps
             * to obtain an appropriate loader:
             * 
             * Search for an ancestor of the parent class loader
             * whose export urls match the parameter URL path
             * 
             * If a matching ancestor could not be found, create a
             * new class loader instance for the requested
             * codebase URL path and parent class loader.  The
             * loader instance is created within an access control
             * context restricted to the permissions necessary to
             * load classes from its codebase URL path.
             */
            ClassLoader loader = findOriginLoader( urls, parent );

            if( loader == null ) {
                loader = createClassLoader(urls, parent, requireDlPerm);
            }

            /*
             * Finally, create an entry to hold the new loader with a
             * weak reference and store it in the table with the key.
             */
            LoaderEntry entry = new LoaderEntry(key, loader);
            synchronized( loaderTable ) {
                loaderTable.put(key, entry);
            }
            return entry;
        }
    }
);

// Try to add this FutureTask to the map.  If one is already there, we get
// back the existing entry and can just get() that value.
// Otherwise, we need to queue the future to run, and then get the value
// returned.
FutureTask<LoaderEntry> runfut = loaderFutures.putIfAbsent( key, fut );
try {
    if( runfut != null ) {
        if( logger.isLoggable( Level.FINER ) )
            logger.finer("waiting for loader for: "+key );
        return (ClassLoader) runfut.get().get();
    } else {
        if( logger.isLoggable( Level.FINER ) )
            logger.finer("starting loader task for: "+key );
        exec.execute( fut );
        if( logger.isLoggable( Level.FINER ) )
            logger.finer("getting loader for: "+key );
        return (ClassLoader) fut.get().get();
    }
} catch( InterruptedException ex ) {
    logger.log( Level.SEVERE, ex.toString(), ex );
    throw (NoClassDefFoundError)new NoClassDefFoundError(
        ex.toString() ).initCause( ex );
} catch( ExecutionException ex ) {
    logger.log( Level.SEVERE, ex.toString(), ex );
    throw (NoClassDefFoundError)new NoClassDefFoundError( 
        ex.toString() ).initCause( ex );
}


If you study this code, you can see that we've traded the use of the global lock on the class loader creation steps for a distributed set of locks that are keyed by the specific classloader needed. The FutureTask usage provides a nice way to associate the lock with something that provides access to the needed object (the ClassLoader instance associated with the LoaderKey).
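The per-key locking idea can be shown in isolation. The following is only a minimal sketch, not the JTSK code: PerKeyCache, PerKeyCacheDemo, and the "codebase-A" key are hypothetical names made up for illustration, and the winning thread runs the FutureTask itself instead of handing it to an executor as the code above does. It also assumes that dropping a failed task from the map, so that a later call can retry, is the behavior you want.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not part of the JTSK: one FutureTask per key
// serves as that key's lock and as the holder of the finished value.
class PerKeyCache<K, V> {
    private final ConcurrentHashMap<K, FutureTask<V>> futures =
        new ConcurrentHashMap<K, FutureTask<V>>();

    V get(K key, Callable<V> creator)
            throws InterruptedException, ExecutionException {
        FutureTask<V> task = new FutureTask<V>(creator);
        FutureTask<V> existing = futures.putIfAbsent(key, task);
        if (existing == null) {
            // This thread won the race for the key, so it does the slow work.
            existing = task;
            task.run();
        }
        try {
            // Other threads block here, but only on the task for their own key.
            return existing.get();
        } catch (ExecutionException ex) {
            // Assumption: drop a failed task so that a later call can retry.
            futures.remove(key, existing);
            throw ex;
        }
    }
}

class PerKeyCacheDemo {
    public static void main(String[] args) throws Exception {
        final PerKeyCache<String, String> cache =
            new PerKeyCache<String, String>();
        final AtomicInteger creations = new AtomicInteger();
        final Callable<String> slowCreator = new Callable<String>() {
            public String call() throws Exception {
                creations.incrementAndGet();
                Thread.sleep(100);   // stands in for slow network access
                return "loader for codebase-A";
            }
        };
        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(new Runnable() {
                public void run() {
                    try {
                        cache.get("codebase-A", slowCreator);
                    } catch (Exception ex) {
                        throw new RuntimeException(ex);
                    }
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        // All eight threads asked for the same key, so only one creation ran.
        System.out.println("creations = " + creations.get());
    }
}
```

Running the task in the calling thread is a design choice of this sketch, not something the original code does. One thing to keep in mind when using an executor instead: a ThreadPoolExecutor backed by an unbounded LinkedBlockingQueue does not start threads beyond its core pool size, because new threads are only added once the queue refuses a task.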

The java.util.concurrent package provides a lot of nice tools for managing concurrency and dealing with resource contention. Thanks to Doug Lea and the rest of the group for a great addition to the Java platform!


James Watson

Posts: 2024
Nickname: watson
Registered: Sep, 2005

Re: Distributing synchronization across threads Posted: Oct 26, 2006 9:55 AM
Even without the new concurrent libraries, wouldn't something like this be an improvement over the original?

Map keys = new HashMap();
Map loaders = new HashMap();
 
 
public Object getKey(Object lookup)
{
  Object key;
 
  synchronized(keys) {
    key = keys.get(lookup);
    if (key == null) {
       key = lookup;
       keys.put(key, key);
    }
  }
 
  return key;
}
 
public Loader getLoader(Object lookup)
{
  Object key = getKey(lookup);
 
  synchronized(key) {
    WeakReference loaderRef;
 
    synchronized(loaders) {
      loaderRef = (WeakReference) loaders.get(key);
    }
 
    Loader loader;
 
    if (loaderRef == null || (loader = (Loader) loaderRef.get()) == null) {
        loader = createLoader(lookup);
        loaderRef = new WeakReference(loader);
 
        synchronized(loaders) { 
          loaders.put(key, loaderRef);
        }
    }
 
    return loader;
  }
}

Cameron Purdy

Posts: 186
Nickname: cpurdy
Registered: Dec, 2004

Re: Distributing synchronization across threads Posted: Oct 26, 2006 10:29 PM
We saw something scarily / eerily similar a few years ago. I can't disclose names or details, but the system in question would synchronize on a global, then do a synchronous request/response using HTTP to a compute server farm.

It didn't get a lot done.

So they added compute servers.

It still didn't get a lot done.

So they added threads to the server that was dishing off requests to the compute farm.

It still didn't get a lot done.

When we pointed out that no more than one "job" could be processing at a given time, regardless of the number of threads and the number of compute servers, they were a bit embarrassed, because in retrospect it was obvious.


Peace,

Cameron Purdy
http://tangosol.com/

Gregg Wonderly

Posts: 317
Nickname: greggwon
Registered: Apr, 2003

Re: Distributing synchronization across threads Posted: Dec 9, 2006 11:30 PM
> When we pointed out that no more than one "job" could be
> processing at a given time, regardless of the number of
> threads and the number of compute servers, they were a bit
> embarrassed, because in retrospect it was obvious.

I missed your response, Cameron. These cases exist in multiple places. Your message caused me to think about it some more. It looks like they finally fixed java.sql.DriverManager in Java SE 6 so that getConnection() is no longer synchronized. In applications using lots of different databases, if one was down and something was trying to connect to it, all the other database connections would hang on entry to getConnection() until the TCP connect timed out.

There are lots of things that people do to make their code thread safe that are not 'application safe' :-)

Hope you've had a great 2006!
