If you have been using Google Collections lately, chances are you have used one of the static transform methods it provides.
    List<String> source = Lists.newArrayList("a", "b", "c");
    ...
    Iterables.transform(source, Functions.toInt());
By default, the above transform executes your list transformations on the current thread. While this approach works well, and is in fact preferable in the majority of cases, you may find yourself in a situation where you need to leverage the power of concurrency.
Some examples that might benefit from concurrency:
- Transformations that interact with the network.
- Transformations that read from a database.
- Transformations that require significant computation.
The first two examples provide excellent opportunities for exploiting concurrency. Both are likely to trigger a low-level network operation that blocks the current thread until a remote process has completed. That is valuable time in which another thread could be doing something useful while the first one is blocked.
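To make the point about blocked time concrete, here is a minimal sketch using only the JDK. The `slowLookup` method is a hypothetical stand-in for a network or database call (it just sleeps for 200 ms), and the class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class BlockingDemo {
    // Hypothetical stand-in for a network or database call: it only blocks.
    static String slowLookup(String key) {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return key.toUpperCase();
    }

    // Runs every lookup on the calling thread, one after another.
    static long sequentialMillis(List<String> keys) {
        long start = System.nanoTime();
        for (String key : keys) {
            slowLookup(key);
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    // Runs the lookups on a fixed pool so that the waiting overlaps.
    static long pooledMillis(List<String> keys, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            long start = System.nanoTime();
            List<Future<String>> futures = new ArrayList<>();
            for (String key : keys) {
                Callable<String> task = () -> slowLookup(key);
                futures.add(pool.submit(task));
            }
            for (Future<String> future : futures) {
                future.get(); // wait for each lookup to finish
            }
            return (System.nanoTime() - start) / 1_000_000;
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("a", "b", "c");
        System.out.println("sequential: " + sequentialMillis(keys) + " ms");
        System.out.println("pooled:     " + pooledMillis(keys, 3) + " ms");
    }
}
```

With three lookups and three threads, the sequential run has to wait for each sleep in turn, while the pooled run should take roughly as long as a single lookup, because the three waits overlap.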
That sounds great, but how do we go about introducing some concurrency to our collection processing? Well, we will need to implement our own version of the transform method, one that processes elements using a number of threads.
First things first, what does transform actually do? It’s pretty simple and can be boiled down to something like this:
    public <F, T> List<T> transform(List<F> src, Function<F, T> fn) {
        List<T> result = Lists.newArrayList();
        for (F element : src) {
            result.add(fn.apply(element));
        }
        return result;
    }
Nothing too complicated: grab each element in the list and apply the given function to it. The Google version is a lot more elegant in reality, but the main point is to show how the transformation is applied:
... fn.apply(element) ...
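As an aside on the "more elegant" Google version: `Lists.transform` is documented as returning a lazy view, so the function is applied when an element is read rather than up front. The sketch below shows that idea with JDK types only. It is not Google's code; it uses `java.util.function.Function` in place of the Google Collections `Function`, and the class and method names are hypothetical.

```java
import java.util.AbstractList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

class LazyTransformSketch {
    // Returns a read-only view; fn runs each time an element is accessed.
    static <F, T> List<T> lazyTransform(final List<F> src,
                                        final Function<? super F, ? extends T> fn) {
        return new AbstractList<T>() {
            @Override
            public T get(int index) {
                return fn.apply(src.get(index));
            }

            @Override
            public int size() {
                return src.size();
            }
        };
    }

    public static void main(String[] args) {
        List<Integer> lengths = lazyTransform(Arrays.asList("a", "bb", "ccc"), String::length);
        // Nothing has been transformed yet; this call applies the function once.
        System.out.println(lengths.get(2));
    }
}
```

The eager loop above is easier to parallelise, which is why the rest of this post sticks with it.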
In order to make our element transformation concurrent we first need to wrap it up in something that can be offloaded to another thread. For this the Callable interface introduced in Java 5 is ideal.
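    // fn and element are declared final so the anonymous class can capture them.
    public <F, T> List<T> transform(List<F> src, final Function<F, T> fn) {
        ...
        for (final F element : src) {
            // Wrap the transformation of a single element so another thread can run it.
            Callable<T> transformTask = new Callable<T>() {
                public T call() throws Exception {
                    return fn.apply(element);
                }
            };
        }
        ...
    }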
Once transformations are represented as callable objects we need to ship them off to something that can coordinate a bunch of threads to get them all executed. The ExecutorService is a nice, easy way to get this done.
    public <F, T> List<T> transform(List<F> src, final Function<F, T> fn) {
        ExecutorService service = Executors.newFixedThreadPool(3);
        List<Future<T>> resultFuture = Lists.newArrayList();
        for (final F element : src) {
            Callable<T> transformTask = new Callable<T>() {
                public T call() throws Exception {
                    return fn.apply(element);
                }
            };
            // Hand the task to the pool and keep the Future so we can collect the result later.
            resultFuture.add(service.submit(transformTask));
        }
        ...
    }
The ExecutorService returns a Future for each submitted callable, which allows us to track its progress. After we have scheduled our transformations we want the current thread to wait until everything has completed. Future.get provides exactly this behaviour: it blocks until the task has finished and then returns its result.
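    import com.google.common.base.Function;
    import com.google.common.collect.Lists;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public <F, T> List<T> transform(List<F> src, final Function<F, T> fn) {
        ExecutorService service = Executors.newFixedThreadPool(3);
        try {
            List<Future<T>> resultFuture = Lists.newArrayList();
            for (final F element : src) {
                Callable<T> transformTask = new Callable<T>() {
                    public T call() throws Exception {
                        return fn.apply(element);
                    }
                };
                resultFuture.add(service.submit(transformTask));
            }
            // Collect results in submission order; get() blocks until each task is done.
            List<T> result = Lists.newArrayList();
            for (Future<T> future : resultFuture) {
                try {
                    result.add(future.get());
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
            return result;
        } finally {
            // Release the pool's threads; otherwise they linger and can keep the JVM alive.
            service.shutdown();
        }
    }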
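One detail worth knowing about Future.get: if the callable throws, the exception does not surface directly. It comes back wrapped in an ExecutionException, with the original available through getCause(). The small sketch below illustrates this using only the JDK; the class and method names are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FutureGetSketch {
    // Submits a task that always fails and reports what get() hands back.
    static String describeFailure() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> future = pool.submit(new Callable<Integer>() {
                public Integer call() {
                    return Integer.parseInt("not a number");
                }
            });
            try {
                future.get();
                return "no failure";
            } catch (ExecutionException e) {
                // The task's own exception is the cause of the ExecutionException.
                return e.getCause().getClass().getSimpleName();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return "interrupted";
            }
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(describeFailure()); // prints "NumberFormatException"
    }
}
```

In the transform method above, this means the RuntimeException thrown to the caller wraps an ExecutionException, which in turn wraps whatever the function threw.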
And that’s pretty much it. Our transformations are queued up in the executor service and executed in parallel. The current thread blocks until the last transformation is complete, and the transformed results are then returned in the same order as the source list.
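If you do not have Google Collections on the classpath, the same approach can be sketched with the JDK alone. This variant is an assumption-laden rewrite rather than the method built above: it swaps in `java.util.function.Function` and `ArrayList`, uses `invokeAll` instead of individual `submit` calls, and takes the thread count as a parameter. The class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

class ConcurrentTransform {
    static <F, T> List<T> transform(List<F> src,
                                    Function<? super F, ? extends T> fn,
                                    int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            // One task per element.
            List<Callable<T>> tasks = new ArrayList<>();
            for (F element : src) {
                tasks.add(() -> fn.apply(element));
            }
            // invokeAll blocks until every task is done and returns the
            // futures in the same order as the task list.
            List<T> result = new ArrayList<>(src.size());
            for (Future<T> future : pool.invokeAll(tasks)) {
                result.add(future.get());
            }
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            // Rethrow with the function's own exception as the cause.
            throw new RuntimeException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Integer> lengths = transform(Arrays.asList("a", "bb", "ccc"), String::length, 3);
        System.out.println(lengths); // prints [1, 2, 3]
    }
}
```

Creating a pool per call keeps the sketch short. In a real application you would probably share one executor across calls rather than paying the thread start-up cost every time.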
Now all you need to do is replace the relevant imports of the Google transform method with your own concurrent version and you’re done. If you are using compacted-collections, however, things are even easier. You can switch to parallel processing mode with a single fluent method call. It’s as easy as selecting the number of threads you want:
    // Standard transformations
    FluentList.listOf("a", "b", "c").map(Functions.toInt());

    // Parallel transformations
    FluentList.listOf("a", "b", "c").parallel(3).map(Functions.toInt());
This will apply the parallel processing strategy to all subsequent transform and filter operations. Everything works exactly as it would with the default single-threaded strategy. This allows you to start out with a lightweight single-threaded approach and easily move up to the big guns if and when performance starts to become a problem.
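For the curious, here is one way a fluent list could carry a processing strategy from call to call. To be clear, this is not the compacted-collections implementation, which I am not reproducing here. `MapStrategy`, `MiniList` and `parallelStrategy` are hypothetical names invented for this sketch, and it covers only map, not filter.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

class StrategySketch {
    // Hypothetical strategy: decides how a function is applied across a list.
    interface MapStrategy {
        <F, T> List<T> map(List<F> src, Function<? super F, ? extends T> fn);
    }

    // Default strategy: apply the function on the calling thread.
    static final MapStrategy SEQUENTIAL = new MapStrategy() {
        @Override
        public <F, T> List<T> map(List<F> src, Function<? super F, ? extends T> fn) {
            List<T> out = new ArrayList<>();
            for (F element : src) {
                out.add(fn.apply(element));
            }
            return out;
        }
    };

    // Parallel strategy: apply the function on a short-lived fixed pool.
    static MapStrategy parallelStrategy(final int threads) {
        return new MapStrategy() {
            @Override
            public <F, T> List<T> map(List<F> src, Function<? super F, ? extends T> fn) {
                ExecutorService pool = Executors.newFixedThreadPool(threads);
                try {
                    List<Callable<T>> tasks = new ArrayList<>();
                    for (F element : src) {
                        tasks.add(() -> fn.apply(element));
                    }
                    List<T> out = new ArrayList<>();
                    for (Future<T> future : pool.invokeAll(tasks)) {
                        out.add(future.get());
                    }
                    return out;
                } catch (Exception e) {
                    throw new RuntimeException(e);
                } finally {
                    pool.shutdown();
                }
            }
        };
    }

    // Hypothetical fluent wrapper that remembers its strategy across calls.
    static final class MiniList<E> {
        private final List<E> items;
        private final MapStrategy strategy;

        private MiniList(List<E> items, MapStrategy strategy) {
            this.items = items;
            this.strategy = strategy;
        }

        @SafeVarargs
        static <E> MiniList<E> listOf(E... items) {
            return new MiniList<>(Arrays.asList(items), SEQUENTIAL);
        }

        // Same elements, different strategy for every later map call.
        MiniList<E> parallel(int threads) {
            return new MiniList<>(items, parallelStrategy(threads));
        }

        <T> MiniList<T> map(Function<? super E, ? extends T> fn) {
            List<T> mapped = strategy.map(items, fn);
            return new MiniList<>(mapped, strategy);
        }

        List<E> toList() {
            return items;
        }
    }
}
```

Because each call returns a new wrapper holding the same strategy, a single `parallel(3)` early in the chain affects every `map` that follows, and the calling code otherwise looks identical to the single-threaded version.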
Check it out. I’m no concurrency expert, so any feedback on areas for improvement is welcome.