Discover the Elegant Simplicity of JSR 166

By Nathan Tippy, OCI Senior Software Engineer

September 2006


Introduction

Concurrent, or multi-threaded, software is not a new technology, but its importance has been growing rapidly. This is primarily due to the low cost of multi-core CPUs, which are becoming common in even the most basic machines. This trend is expected to continue as hardware manufacturers, following Moore's law, cram greater numbers of cores onto a single die at ever lower costs.

In the past, older applications have taken advantage of faster hardware for improved performance, thus extending their functional life. In the future this will be less common because individual CPUs are not expected to become significantly faster; instead, they will do more work in parallel.

There is no better feeling for a developer than knowing that his/her software has withstood the test of time. This is rare, of course, because technology and methodologies don't remain static but continue to progress with the relentless march of time.

It is not advisable to rewrite everything to make use of concurrent algorithms. However, finding places where it makes sense to add multi-threading and "use the right tool for the job" may greatly increase application longevity.

It would require a short course to do justice to the topic of concurrent Java development, so the remainder of this brief focuses on demonstrating a few of the features in the hope that readers will continue further study on their own.

Simple Demonstration of New Concurrency Package

The new concurrency package is not intended to replace the traditional use of synchronized, wait and notify, but it does give the developer a whole new set of design choices. Most of the new classes rely on CAS (Compare and Swap) operations, instead of the traditional synchronized block, for their excellent performance and ease of use.

CAS works just as it sounds. It first checks that the existing value is what the caller expects it to be and, only if so, replaces it with the new value. If another thread has already changed the value, nothing is written and the caller is told that the swap failed. This sounds rather simple, but the check and the replacement must happen as a single atomic operation, which requires support from the CPU. Modern CPUs provide this mechanism, and it is now exposed in the JVM and used heavily in the java.util.concurrent package.
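
As a small illustration of the check-then-replace behavior just described, the sketch below calls compareAndSet twice on an AtomicLong: once with the correct expected value and once with a stale one. The class name CasBasics and the values used are made up for this illustration; compareAndSet and get are the standard AtomicLong methods.

```java
import java.util.concurrent.atomic.AtomicLong;

class CasBasics {

    // Performs two CAS attempts and reports what happened.
    static String demo() {
        AtomicLong value = new AtomicLong(10);

        // The expected value (10) matches the current value, so the swap happens.
        boolean first = value.compareAndSet(10, 11);

        // The expected value (10) is now stale because the value is 11,
        // so nothing is written and false is returned.
        boolean second = value.compareAndSet(10, 12);

        return "first=" + first + " second=" + second + " value=" + value.get();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Running main prints first=true second=false value=11. The failed second call is exactly the situation a retry loop has to handle.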

Example 1 Part 1

Using the AtomicLong class, it is very easy to put together a simple thread-safe sequence number generator. Note that no synchronization is needed because CAS is relied upon to ensure thread safety.

    package com.ociweb.concurrency.atomic.example1;
    import java.util.concurrent.atomic.*;

    public class SequenceNumberGenerator {
        private AtomicLong number = new AtomicLong(0);

        public long nextSequenceNumber() {
            return number.getAndIncrement();
        }
    }

To see CAS at work, take a look at the source code for the getAndIncrement() method within the AtomicLong class. You will find a loop very similar to the one below. Any thread attempting to set a new value for the AtomicLong is required to pass in what it perceives to be the current value. If this snapshot value matches the real current value then the update is done; if they do not match, a new snapshot value is retrieved and the update is attempted again. When several threads are modifying the value simultaneously, many attempts may be required before the value is changed successfully. This characteristic makes CAS less desirable when large numbers of threads contend for relatively few resources, but in most applications this is not the case, so CAS works well.

    while (true) {
        long current = get();
        long next = current + 1;
        if (compareAndSet(current, next))
            return current;
    }

Notice that with CAS the time a single update takes becomes less predictable as more threads attempt to modify the same object. This is because each thread grabs the value, increments it and then attempts to set the new value. Any given thread is only allowed to set the new value if the current value still matches the value it originally read. If other threads are doing the same thing at the same time, this becomes less likely. In most cases this does not cause any performance issues unless a large number of threads is used.
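
To get a feel for how often a CAS attempt fails under contention, the retry loop can be written by hand and instrumented. This is only a sketch: CasRetryCounter and its methods are hypothetical names, and the number of failed attempts will vary from run to run and from machine to machine. What should not vary is the final total, since every increment eventually succeeds.

```java
import java.util.concurrent.atomic.AtomicLong;

class CasRetryCounter {

    // Same shape as the loop above, but returns how many attempts failed.
    static long incrementCountingRetries(AtomicLong counter) {
        long retries = 0;
        while (true) {
            long current = counter.get();
            if (counter.compareAndSet(current, current + 1)) {
                return retries;
            }
            retries++; // another thread changed the value first; try again
        }
    }

    // Starts 'threads' threads that each increment 'perThread' times.
    // Returns {final counter value, total failed attempts}.
    static long[] run(int threads, final int perThread) throws InterruptedException {
        final AtomicLong counter = new AtomicLong(0);
        final AtomicLong totalRetries = new AtomicLong(0);

        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                long retries = 0;
                for (int n = 0; n < perThread; n++) {
                    retries += incrementCountingRetries(counter);
                }
                totalRetries.addAndGet(retries);
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        return new long[] { counter.get(), totalRetries.get() };
    }

    public static void main(String[] args) throws InterruptedException {
        long[] result = run(4, 100000);
        System.out.println("final value: " + result[0]
                + ", failed attempts: " + result[1]);
    }
}
```

Raising the thread count while keeping the work per thread fixed is one way to watch the failed-attempt figure grow.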

Example 1 Part 2

A simple Runnable that asks for the next number until it reaches a predefined limit will be used to demonstrate the number generator. It's never a good practice to kill a thread. Instead, Runnables should be designed to exit on their own after receiving some sort of predefined message or flag. Each thread is responsible for shutting down on its own when the work is complete. This is made possible by passing the sequenceLimit to each Runnable so each thread can compare it with the current sequence number to determine when all the numbers have been generated.

    package com.ociweb.concurrency.atomic.example1;

    public class SequenceRunnable implements Runnable {
        private final int id;
        private final SequenceNumberGenerator generator;
        private final long sequenceLimit;

        protected SequenceRunnable(int id,
                                   SequenceNumberGenerator generator,
                                   long sequenceLimit) {
            this.id = id;
            this.generator = generator;
            this.sequenceLimit = sequenceLimit;
        }

        public void run() {

            boolean workToDo = true;

            while (workToDo) {
                long sequenceNumber =
                    generator.nextSequenceNumber();

                //stop looping once the limit has been reached
                workToDo = (sequenceNumber < sequenceLimit);

                System.out.println("Thread:" + id +
                    " SequenceNumber:" + sequenceNumber);

            }
        }
    }
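
The advice about letting threads exit on their own can also be followed with an explicit flag instead of a shared limit. The following is a minimal sketch and not part of the original example: StoppableWorker, requestStop and demo are hypothetical names, and an AtomicBoolean is assumed as the flag so that the change made by one thread is visible to the worker thread.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

class StoppableWorker implements Runnable {
    private final AtomicBoolean stopRequested = new AtomicBoolean(false);
    private final AtomicLong iterations = new AtomicLong(0);

    // Called from another thread to ask this worker to finish.
    public void requestStop() {
        stopRequested.set(true);
    }

    public long iterations() {
        return iterations.get();
    }

    public void run() {
        // The worker checks the flag itself and returns normally.
        while (!stopRequested.get()) {
            iterations.incrementAndGet(); // stands in for real work
            Thread.yield();
        }
    }

    // Starts a worker, asks it to stop, and reports whether it exited.
    static boolean demo() throws InterruptedException {
        StoppableWorker worker = new StoppableWorker();
        Thread thread = new Thread(worker);
        thread.start();

        Thread.sleep(50);      // let the worker run for a moment
        worker.requestStop();  // no stop() or interrupt() needed
        thread.join(5000);     // wait up to five seconds for it to finish

        return !thread.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("worker exited on its own: " + demo());
    }
}
```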

Example 1 Part 3

For this example the newFixedThreadPool factory method on the Executors class is used to create an Executor instance which will maintain a fixed number of threads. The Executors class supports many thread creation strategies and can greatly simplify the management of threads. Note that the example also makes use of the availableProcessors() method, which returns how many processors are available for use by the JVM at that moment. This method may return 1 if the example is run on a single processor machine. If this is the case, the threadCount is changed to 2 in order to perform a more meaningful demonstration.

    package com.ociweb.concurrency.atomic.example1;

    import java.util.concurrent.Executor;
    import java.util.concurrent.Executors;

    public class SequenceNumberDemo {

        private static final int SEQUENCE_LIMIT = 30;
        private final SequenceNumberGenerator numberGenerator;
        private final int threadCount;
        private final long sequenceLimit;
        private final Executor executor;

        public SequenceNumberDemo(int threads,
                                  long sequenceLimit) {

            this.threadCount = threads;
            this.sequenceLimit = sequenceLimit;
            this.numberGenerator = new SequenceNumberGenerator();
            this.executor = Executors.newFixedThreadPool(threads);
        }

        public static void main(String[] args) {
            int threadCount =
                Runtime.getRuntime().availableProcessors();

            if (threadCount < 2) {
                System.out.println("get a real machine");
                threadCount = 2;
            }

            SequenceNumberDemo app =
                new SequenceNumberDemo(threadCount,
                                       SEQUENCE_LIMIT);
            app.generateSequences();
        }

        private void generateSequences() {

            for (int id = 0; id < threadCount; id++) {
                Runnable runMeSoon = new SequenceRunnable(id,
                                                          numberGenerator,
                                                          sequenceLimit);
                executor.execute(runMeSoon);
            }
        }
    }
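
One detail the listing above leaves out is shutting the pool down. The threads of a fixed thread pool stay alive waiting for more work, so the JVM will typically not exit on its own once the sequence numbers run out. A possible approach, sketched here under the assumption that the pool is held as an ExecutorService (the sub-interface that Executors.newFixedThreadPool returns) rather than a plain Executor, is to call shutdown() after submitting the work and then wait with awaitTermination(). The class PoolShutdownSketch and the ten second timeout are illustrative choices only.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class PoolShutdownSketch {

    // Hands out numbers below 'limit' from 'threads' pool threads and
    // returns how many numbers were handed out in total.
    static long run(int threads, final long limit) throws InterruptedException {
        final AtomicLong next = new AtomicLong(0);
        final AtomicLong handedOut = new AtomicLong(0);

        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int id = 0; id < threads; id++) {
            pool.execute(() -> {
                while (next.getAndIncrement() < limit) {
                    handedOut.incrementAndGet();
                }
            });
        }

        // Accept no new tasks; tasks already submitted still run to completion.
        pool.shutdown();

        // Wait for the workers to finish; cancel them if they take too long.
        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
            pool.shutdownNow();
        }
        return handedOut.get();
    }

    public static void main(String[] args) throws InterruptedException {
        int threads = Math.max(2, Runtime.getRuntime().availableProcessors());
        System.out.println("numbers handed out: " + run(threads, 30));
    }
}
```

With the pool shut down, main returns and the JVM can exit once the last worker finishes.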

How to verify the results of our SequenceNumberGenerator

After implementing a sequence number generator example, how can one go about verifying that it is really doing what is expected? Each sequence number must be verified to ensure it is only used once, and if an error is detected there must be a clean way to exit quickly.

Example 2 Part 1

Adding a simple jumpToSequenceNumber method will let the application exit quickly by passing in the sequenceLimit. The example will be simpler because no extra logic will be needed to handle the early exit condition. When an error is detected by a thread, it will use this new method to set the sequence number to the last number it expects to generate. Once this is done it will be detected by all the other threads and they will end on their own because it will appear to them that all the numbers have been generated.

    package com.ociweb.concurrency.atomic.example2;
    import java.util.concurrent.atomic.*;

    public class SequenceNumberGenerator {
        private AtomicLong number = new AtomicLong(0);

        public long nextSequenceNumber() {
            return number.getAndIncrement();
        }

        public void jumpToSequenceNumber(long newValue) {
            number.set(newValue);
        }
    }

Example 2 Part 2

In order to put the number generator to a more strenuous test, a CountDownLatch was added. The CountDownLatch will force every thread to wait until the count goes down to zero. Once all of the threads have been initialized they will all be released to retrieve sequence numbers at the same time.

Using a List of the same length as the sequenceLimit provides an easy way to ensure no number is used twice. The List's set method will return the previous value for the index in question. If the value returned by the set call is not null then the sequence number has already been used which indicates that something has gone wrong.
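
The duplicate check relies only on the return value of List.set, which can be tried on its own. In this sketch the class DuplicateCheck and the method claim are hypothetical names; the list is built with Arrays.asList over an array of nulls, the same way the demo class later in this example builds it.

```java
import java.util.Arrays;
import java.util.List;

class DuplicateCheck {

    // Records 'owner' at 'index'. Returns true if the slot was empty,
    // false if some owner had already been recorded there.
    static boolean claim(List<Integer> slots, int index, int owner) {
        Integer previous = slots.set(index, owner);
        return previous == null;
    }

    public static void main(String[] args) {
        // A fixed-size list of five empty (null) slots.
        List<Integer> slots = Arrays.asList(new Integer[5]);

        System.out.println(claim(slots, 2, 7)); // first use of index 2
        System.out.println(claim(slots, 2, 8)); // index 2 is used a second time
    }
}
```

This prints true and then false: the second call finds the 7 left by the first, which is the signal the example treats as an error.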

If any errors are detected, the generator's position will be set to the sequenceLimit with the new jumpToSequenceNumber method. In production applications it may be inappropriate to change the sequence number in this way, so one may consider adding a separate isDone boolean in that case. Regardless of how this gets implemented, it's important that the threads exit on their own. Never call stop() on a running thread and don't interrupt one unless its interrupt policy is well understood.

    package com.ociweb.concurrency.atomic.example2;

    import java.util.List;
    import java.util.concurrent.CountDownLatch;

    public class SequenceRunnable implements Runnable {
        private final int id;
        private final SequenceNumberGenerator generator;
        private final long sequenceLimit;
        private final CountDownLatch latch;
        private final List<Integer> sequenceArray;

        protected SequenceRunnable(int id,
                                   SequenceNumberGenerator generator,
                                   long sequenceLimit,
                                   CountDownLatch latch,
                                   List<Integer> sequenceArray) {
            this.id = id;
            this.generator = generator;
            this.sequenceLimit = sequenceLimit;
            this.latch = latch;
            this.sequenceArray = sequenceArray;
        }

        public void run() {

            try {
                latch.countDown();
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();

                //be sure to stop the other threads
                generator.jumpToSequenceNumber(sequenceLimit);

                return;
            }

            long sequenceNumber = generator.nextSequenceNumber();
            for (; sequenceNumber < sequenceLimit;
                   sequenceNumber = generator.nextSequenceNumber()) {

                //check if number is valid
                if (null != sequenceArray.set((int) sequenceNumber, id)) {
                    System.out.println("Thread:" + id +
                        " found sequenceNumber " + sequenceNumber +
                        " was already used.");

                    //be sure to stop all the other threads
                    generator.jumpToSequenceNumber(sequenceLimit);
                }

                //do work here
                System.out.println("Thread:" + id +
                    " SequenceNumber:" + sequenceNumber);

                //uncomment this 'train wreck' to test that the threads exit cleanly

                //if ((id==sequenceArray.get(0))&&(sequenceNumber>5))
                //generator.jumpToSequenceNumber(5);

            }
        }
    }
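
The countDown() followed by await() idiom at the top of run() can be tried in isolation. In the sketch below (StartGateSketch and its counters are hypothetical names), each worker records its arrival, counts the latch down and then waits. Once a worker gets past await(), every other worker must already have arrived, which is what the method checks.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class StartGateSketch {

    // Returns how many workers, once released, saw that all workers had arrived.
    static int run(final int threads) throws InterruptedException {
        final CountDownLatch gate = new CountDownLatch(threads);
        final AtomicInteger arrived = new AtomicInteger(0);
        final AtomicInteger releasedTogether = new AtomicInteger(0);

        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                arrived.incrementAndGet();
                gate.countDown();      // announce that this worker is ready
                try {
                    gate.await();      // block until the count reaches zero
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                // Past the gate: every worker has counted down by now.
                if (arrived.get() == threads) {
                    releasedTogether.incrementAndGet();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        return releasedTogether.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(4) + " workers were released after all had arrived");
    }
}
```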

Example 2 Part 3

A latch and a sequenceArray have been added to the demo class to support the new functionality of the SequenceRunnable class.

    package com.ociweb.concurrency.atomic.example2;

    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.Executor;
    import java.util.concurrent.Executors;

    public class SequenceNumberDemo {

        private static final int SEQUENCE_LIMIT = 30;

        private final SequenceNumberGenerator numberGenerator;
        private final int threadCount;
        private final long sequenceLimit;
        private final Executor executor;

        private final CountDownLatch latch;
        private final List<Integer> sequenceArray;


        public SequenceNumberDemo(int threads, long sequenceLimit) {

            this.threadCount = threads;
            this.sequenceLimit = sequenceLimit;
            this.numberGenerator = new SequenceNumberGenerator();
            this.executor = Executors.newFixedThreadPool(threads);

            this.latch = new CountDownLatch(threads);
            this.sequenceArray = Arrays.asList(new Integer[(int) sequenceLimit]);

        }

        public static void main(String[] args) {
            int threadCount = Runtime.getRuntime().availableProcessors();
            if (threadCount < 2) {
                System.out.println("get a real machine");
                threadCount = 2;
            }

            SequenceNumberDemo app =
                new SequenceNumberDemo(threadCount, SEQUENCE_LIMIT);
            app.generateSequences();
        }

        private void generateSequences() {

            for (int id = 0; id < threadCount; id++) {
                Runnable runMeSoon = new SequenceRunnable(id,
                                                          numberGenerator,
                                                          sequenceLimit,
                                                          latch,
                                                          sequenceArray);
                executor.execute(runMeSoon);
            }

        }
    }

How to monitor our SequenceNumberGenerator

Better monitoring of the sequence numbers and threads can be done now that all the number/thread pairs are being recorded. Using the sequenceArray data, the text output is enhanced to show which sequence numbers were retrieved by each thread.

Example 3 Part 1

When running the previous examples, it may become apparent that one thread will often monopolize the CPU time and get the majority of the sequence numbers. To help limit this problem, the CountDownLatch has been removed and replaced with a CyclicBarrier. The CyclicBarrier does the same thing as a CountDownLatch but it can be used multiple times. This can come in very handy when threads need to be controlled more tightly, such as in particle or life simulations.

Notice that this class gets an iterator from the sequenceArray and walks down it without synchronization while other threads may be modifying the list. Iterating this way is not safe for Lists in general, but it will work in this example because the CopyOnWriteArrayList class is used in the SequenceNumberDemo class. It is left to the reader to put this to the test by passing in other List implementations.

    package com.ociweb.concurrency.atomic.example3;

    import java.util.Iterator;
    import java.util.List;
    import java.util.concurrent.CyclicBarrier;

    public class SequenceRunnable implements Runnable {
        private static final int WAIT_ON_OTHERS_COUNT = 10;
        private final int id;
        private final SequenceNumberGenerator generator;
        private final long sequenceLimit;
        private final CyclicBarrier latch;
        private final List<Integer> sequenceArray;

        protected SequenceRunnable(int id,
                                   SequenceNumberGenerator generator,
                                   long sequenceLimit,
                                   CyclicBarrier latch,
                                   List<Integer> sequenceArray) {
            this.id = id;
            this.generator = generator;
            this.sequenceLimit = sequenceLimit;
            this.latch = latch;
            this.sequenceArray = sequenceArray;
        }

        public void run() {

            try {
                latch.await();
            } catch (Exception e) {

                e.printStackTrace();
                //be sure to stop the other threads
                generator.jumpToSequenceNumber(sequenceLimit);
                return;
            }

            int count = 0;
            long sequenceNumber = generator.nextSequenceNumber();
            for (; sequenceNumber < sequenceLimit;
                   sequenceNumber = generator.nextSequenceNumber()) {

                //check if number is valid
                if (null != sequenceArray.set((int) sequenceNumber, id)) {
                    System.out.println("Thread:" + id +
                        " found sequenceNumber " + sequenceNumber +
                        " was already used.");

                    //be sure to stop all the other threads
                    generator.jumpToSequenceNumber(sequenceLimit);
                }

                //System.out.println("Thread:"+id+
                // " SequenceNumber:"+sequenceNumber);

                //print sequence list without sync!
                Iterator<Integer> verificationIterator =
                    sequenceArray.iterator();

                StringBuilder output = new StringBuilder();
                output.append("id:").append(id).append(' ');
                int pos = 0;
                while (verificationIterator.hasNext()) {
                    Integer currentId = verificationIterator.next();

                    //only add the sequence numbers that belong to me.
                    if ((null != currentId) && (id == currentId)) {
                        output.append(pos).append(' ');
                    }

                    pos++;
                }
                System.out.println(output);

                //wait if you are getting ahead of the others
                //note: this await only returns once every other thread has also
                //reached the barrier; threads that have already finished never
                //will, so a timed await may be needed to avoid waiting forever
                if (++count % WAIT_ON_OTHERS_COUNT == 0) {
                    try {
                        latch.await();

                    } catch (Exception e) {

                        e.printStackTrace();
                        //be sure to stop the other threads
                        generator.jumpToSequenceNumber(sequenceLimit);
                        return;

                    }
                }
            }
        }
    }
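
To see the reuse that distinguishes a CyclicBarrier from a CountDownLatch, the sketch below sends a few threads through the same barrier for several rounds. The names BarrierReuseSketch, run and trips are invented for this illustration; it uses the two-argument CyclicBarrier constructor, whose Runnable is executed once each time the barrier trips. Every thread here takes part in every round, which avoids the situation where one thread waits on parties that have already finished.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

class BarrierReuseSketch {

    // Runs 'threads' workers through 'rounds' rounds and
    // returns how many times the barrier tripped.
    static int run(int threads, final int rounds) throws InterruptedException {
        final AtomicInteger trips = new AtomicInteger(0);

        // The barrier action runs once per trip, in the last thread to arrive.
        final CyclicBarrier barrier = new CyclicBarrier(threads, () -> {
            trips.incrementAndGet();
        });

        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    for (int round = 0; round < rounds; round++) {
                        // The same barrier object is reused for every round.
                        barrier.await();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    // another party was interrupted or timed out; give up
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        return trips.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("barrier tripped " + run(3, 4) + " times");
    }
}
```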

Example 3 Part 2

The latch has been changed to a CyclicBarrier and the sequenceArray is now a CopyOnWriteArrayList. The CyclicBarrier has already been discussed, so there is no need to repeat it here. The CopyOnWriteArrayList does what the name implies: every write or update causes a new copy of the internal array to be created. This allows the array that backs an iterator to remain unchanged until the iterator completes. The consequence of this is that an iterator could see old data, but this is not a problem for this particular example.

A THREAD_COUNT_MULTIPLIER constant has also been added so the thread count can be modified for better demos.

    package com.ociweb.concurrency.atomic.example3;

    import java.util.List;
    import java.util.concurrent.CopyOnWriteArrayList;
    import java.util.concurrent.CyclicBarrier;
    import java.util.concurrent.Executor;
    import java.util.concurrent.Executors;

    public class SequenceNumberDemo {

        private static final int THREAD_COUNT_MULTIPLIER = 2;
        private static final int SEQUENCE_LIMIT = 30;

        private final SequenceNumberGenerator numberGenerator;
        private final int threadCount;
        private final long sequenceLimit;
        private final Executor executor;

        private final CyclicBarrier latch;
        private final List<Integer> sequenceArray;


        public SequenceNumberDemo(int threads, long sequenceLimit) {

            this.threadCount = threads;
            this.sequenceLimit = sequenceLimit;
            this.numberGenerator = new SequenceNumberGenerator();
            this.executor = Executors.newFixedThreadPool(threads);

            this.latch = new CyclicBarrier(threads);

            this.sequenceArray =
                new CopyOnWriteArrayList<Integer>(
                    new Integer[(int) sequenceLimit]);
        }

        public static void main(String[] args) {
            int threadCount = Runtime.getRuntime().availableProcessors();
            if (threadCount < 2) {
                System.out.println("get a real machine");
                threadCount = 2;
            }

            SequenceNumberDemo app =
                new SequenceNumberDemo(threadCount *
                    THREAD_COUNT_MULTIPLIER, SEQUENCE_LIMIT);

            app.generateSequences();
        }

        private void generateSequences() {

            for (int id = 0; id < threadCount; id++) {
                Runnable runMeSoon = new SequenceRunnable(id,
                                                          numberGenerator,
                                                          sequenceLimit,
                                                          latch,
                                                          sequenceArray);
                executor.execute(runMeSoon);
            }
        }
    }
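
The snapshot behavior of CopyOnWriteArrayList iterators is easy to observe in a single thread. The following sketch (SnapshotIteratorSketch is a hypothetical name) takes an iterator, modifies the list, and then compares what the iterator sees with what the list now holds.

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class SnapshotIteratorSketch {

    static String demo() {
        List<Integer> list =
            new CopyOnWriteArrayList<Integer>(new Integer[] { 1, 2, 3 });

        // The iterator is bound to the internal array as it is right now.
        Iterator<Integer> iterator = list.iterator();

        // Each write below replaces the internal array with a fresh copy.
        list.set(0, 99);
        list.add(4);

        // The iterator still walks the old array: 1, 2, 3.
        int first = iterator.next();
        int seen = 1;
        while (iterator.hasNext()) {
            iterator.next();
            seen++;
        }

        return "first=" + first + " seen=" + seen
            + " current=" + list.get(0) + " size=" + list.size();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Running main prints first=1 seen=3 current=99 size=4. The iterator reports the old contents while the list itself already reflects both writes, and no ConcurrentModificationException is thrown.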

Conclusion

As a further exercise, it is left to the reader to add a BlockingQueue to accept the rapidly generated messages before they are sent to the output stream. This is a great pattern to follow whenever there is a source producing more work than one consumer can handle.
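
One possible starting point for that exercise is sketched below. It is not a complete solution: OutputQueueSketch, the DONE marker and the message format are all hypothetical, a LinkedBlockingQueue is assumed as the queue implementation, and the consumer collects messages into a list where a real version would write them to the output stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class OutputQueueSketch {
    // Hypothetical end-of-stream marker placed on the queue after all producers finish.
    private static final String DONE = "<done>";

    // Returns the messages in the order the single consumer received them.
    static List<String> run(int producers, final int messagesEach)
            throws InterruptedException {
        final BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        final List<String> written = new ArrayList<String>();

        // Single consumer: the only thread that touches the "output".
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String message = queue.take(); // blocks while the queue is empty
                    if (DONE.equals(message)) {
                        return;
                    }
                    written.add(message); // stands in for System.out.println
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        Thread[] workers = new Thread[producers];
        for (int id = 0; id < producers; id++) {
            final int workerId = id;
            workers[id] = new Thread(() -> {
                try {
                    for (int n = 0; n < messagesEach; n++) {
                        queue.put("Thread:" + workerId + " message:" + n);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[id].start();
        }

        for (Thread worker : workers) {
            worker.join();
        }
        queue.put(DONE);  // tell the consumer that nothing more is coming
        consumer.join();
        return written;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(3, 5).size() + " messages written");
    }
}
```

The producers never wait on the output stream, only on the queue, and the marker lets the consumer exit on its own in keeping with the rest of the examples.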

There are many more features of the concurrency package that are worth investigation. They are very powerful and can often prevent one from reinventing the wheel.
