Sunday, November 23, 2014

Handling Requests Asynchronously in Java Using Jersey 2.13 and Glassfish 4.1


Recent Jersey 2.x releases include a new API for asynchronous request processing, along with an excellent example, server-async-standalone-webapp.
In this article, I'll show how to run it and what results we can get by leveraging async processing.

First, we need to download the Glassfish 4.1 Web Profile (https://glassfish.java.net/download.html).
Then start it using the command:

./asadmin start-domain

Next, we need to download the Jersey examples bundle from https://jersey.java.net/download.html and compile the server-async-standalone example by running 'mvn package'.

Then we can deploy it using the following command:

./asadmin deploy <path>/jersey/examples/server-async-standalone/webapp/target/server-async-standalone-webapp.war

Now we are ready to log in to the Glassfish Admin Console (http://localhost:4848) and see the status of the deployed application:



Now we're ready to run the client GUI application, which allows us to run tests against the server. Go to 'server-async-standalone/client' and run 'mvn exec:java'.
The application looks like the screenshots below. I ran the sync and async tests with 100 requests each and saw the response time improve from about 20 seconds to 1.2 seconds.

Sync

Async


Here is the code for the sync variant:

    @GET
    @Path("sync/{echo}")
    public String syncEcho(@PathParam("echo") final String echo) {
        try {
            // Simulate a long-running operation while holding the HTTP worker thread.
            Thread.sleep(SLEEP_TIME_IN_MILLIS);
        } catch (final InterruptedException ex) {
            throw new ServiceUnavailableException();
        }
        return echo;
    }

And here is the async variant:

private static final ExecutorService TASK_EXECUTOR = Executors.newCachedThreadPool();
...
    @GET
    @Path("async/{echo}")
    public void asyncEcho(@PathParam("echo") final String echo, @Suspended final AsyncResponse ar) {
        // Return immediately; the suspended response is completed later from a worker thread.
        TASK_EXECUTOR.submit(new Runnable() {

            @Override
            public void run() {
                try {
                    // Simulate a long-running operation off the HTTP worker thread.
                    Thread.sleep(SLEEP_TIME_IN_MILLIS);
                } catch (final InterruptedException ex) {
                    ar.cancel();
                    return; // don't resume a cancelled response
                }
                ar.resume(echo);
            }
        });
    }

Note that in the async example the number of background threads is unbounded (TASK_EXECUTOR is a cached thread pool without a limit), so in reality async mode does not reduce the amount of resources (threads) consumed by the server; a bounded alternative is sketched after the thread dump below.
In sync mode, the requests hold the HTTP executor's threads. The default number of HTTP worker threads in Glassfish is 5, which explains the roughly 20-second response time for 100 requests: 100 requests / 5 threads × the ~1-second simulated delay ≈ 20 seconds.

"http-listener-1(4)" daemon prio=6 tid=0x000000000cc7d000 nid=0xcf8 waiting on condition [0x00000000110ad000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
at java.lang.Thread.sleep(Native Method)
at org.glassfish.jersey.examples.server.async.LongRunningEchoResource.syncEcho(LongRunningEchoResource.java:77)
at sun.reflect.GeneratedMethodAccessor437.invoke(Unknown Source)
...
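
Coming back to the unbounded TASK_EXECUTOR: if keeping the server's thread count in check matters, the cached pool can be swapped for a bounded java.util.concurrent.ThreadPoolExecutor. A minimal sketch (the pool and queue sizes are illustrative, not taken from the Jersey example):

private static final ExecutorService TASK_EXECUTOR = new ThreadPoolExecutor(
        10,                                      // core pool size (illustrative)
        50,                                      // maximum pool size (illustrative)
        60L, TimeUnit.SECONDS,                   // idle threads above the core are reclaimed after a minute
        new ArrayBlockingQueue<Runnable>(1000)); // at most 1000 queued tasks before submissions are rejected

With this setup a saturated pool rejects further submissions with a RejectedExecutionException, which the resource method could translate into a 503 via ar.resume(...), instead of letting the thread count grow without bound.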

Note that this is just an example that illustrates how to enable async processing. In real-world scenarios there is often a third-party service invoked from the server side using the Jersey Client. If that service slows down, the whole HTTP thread pool on the server can be exhausted, preventing other requests from being processed.
The suggested solution in this case is to switch to the async Jersey Client together with an async service implementation.
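
As a rough sketch of that approach, the JAX-RS 2.0 async client can be combined with @Suspended, so that no HTTP worker thread blocks while the third-party call is in flight (the resource path and target URL below are made up for illustration):

    @GET
    @Path("proxy/{echo}")
    public void proxyEcho(@PathParam("echo") final String echo, @Suspended final AsyncResponse ar) {
        // Call the (hypothetical) slow third-party service asynchronously; the callback
        // resumes the suspended server response when the client call completes.
        ClientBuilder.newClient()
                .target("http://third-party.example.com/api/echo/" + echo)
                .request()
                .async()
                .get(new InvocationCallback<String>() {

                    @Override
                    public void completed(final String response) {
                        ar.resume(response);
                    }

                    @Override
                    public void failed(final Throwable throwable) {
                        ar.resume(throwable);
                    }
                });
    }

In production code the Client instance would be created once and reused, since building a new client per request is relatively expensive.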

Async server processing can also help in cases where there are many slow clients, for example 20k mobile clients that send payloads in chunks with 1-second delays. This scenario can easily bring down a synchronous server-side implementation.

Async processing is gaining momentum on the server side nowadays, and there are multiple technologies that enable it, among them NodeJS, Netty, Akka and the Play Framework. Jersey async processing is one of them.





Saturday, July 5, 2014

Implementing a Distributed Counter Service Using Hazelcast, Jersey 2 and Guice

Modern web services are often required to scale horizontally in order to handle growing load.
Here I present an example distributed counter service, which uses a modern technology stack consisting of Hazelcast, Jersey 2 and Guice.

The sample is based on an excellent example posted by Piersy and can be downloaded from here:
http://github.com/rafalrusin/jersey2-guice-example-with-test

The counter is just a shared value across all participating nodes within a cluster. All nodes can query the value and increase it by a specified delta. Note that Hazelcast handles synchronization within the distributed environment. In order to update the state atomically, we use a Hazelcast EntryProcessor.

@Singleton
public class CounterService {
    // Distributed map assumed to come from a Guice-injected HazelcastInstance (map name illustrative)
    private final IMap<String, Integer> map;

    @Inject
    public CounterService(HazelcastInstance hazelcast) {
        this.map = hazelcast.getMap("counter");
    }

    public int increase(final int delta) {
        return (Integer) map.executeOnKey("counterKey", new CounterEntryProcessor(delta));
    }
}

public class CounterEntryProcessor implements EntryProcessor<String, Integer>, EntryBackupProcessor<String, Integer> {
    private final int delta;

    public CounterEntryProcessor(int delta) {
        this.delta = delta;
    }

    @Override
    public Integer process(Map.Entry<String, Integer> entry) {
        int newValue = entry.getValue() + delta;
        entry.setValue(newValue);
        return newValue;
    }

    // Apply the same change on backup replicas so they stay consistent with the owning node.
    @Override
    public void processBackup(Map.Entry<String, Integer> entry) {
        process(entry);
    }

    @Override
    public EntryBackupProcessor<String, Integer> getBackupProcessor() {
        return this;
    }
}

The service is just a Guice singleton. It has an operation called 'increase', which takes a delta and creates an EntryProcessor job; Hazelcast submits the job to the node that currently owns the value, and that node applies the update atomically.
Hazelcast is a library that implements the Java Collections in a distributed fashion. It handles replication, cluster membership and distributed locking.
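
For a feel of the API, here is a minimal standalone sketch (the map name and key are arbitrary): every JVM that executes this code joins the same cluster and sees the same map contents.

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;

public class HazelcastDemo {
    public static void main(String[] args) {
        // Starts a cluster member; additional JVMs running the same code discover and join it.
        HazelcastInstance hz = Hazelcast.newHazelcastInstance();
        // A java.util.Map view whose entries are partitioned across the cluster and replicated to backups.
        IMap<String, Integer> map = hz.getMap("demo");
        map.put("counterKey", 0);
        System.out.println(map.get("counterKey"));
    }
}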

The web service is a simple JAX-RS REST resource. The following code invokes the Guice service layer from the resource implementation:

@RequestScoped
@Path("counter")
public class CounterResource {

    private CounterService service;

    @Inject
    public CounterResource(CounterService service) {
        this.service = service;
    }

    @POST
    @Consumes(MediaType.TEXT_PLAIN)
    @Produces(MediaType.TEXT_PLAIN)
    public String increase(String delta) {
        return "" + service.increase(Integer.parseInt(delta));
    }
}

We also have a test case that brings up the whole stack and makes a request to the service. The stack is very lightweight: it runs Jersey on embedded Tomcat, and the test case completes within a few seconds.
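
The shape of such a test is roughly the following; the EmbeddedServer helper and the port are hypothetical stand-ins for whatever the repository uses to boot the stack, while the JAX-RS client calls are standard.

import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Entity;
import javax.ws.rs.core.MediaType;

import org.junit.Assert;
import org.junit.Test;

public class CounterResourceTest {

    @Test
    public void increaseReturnsNewCounterValue() {
        // Hypothetical helper that starts the embedded Tomcat + Jersey + Guice stack on port 8090.
        EmbeddedServer server = EmbeddedServer.start(8090);
        try {
            String result = ClientBuilder.newClient()
                    .target("http://localhost:8090/webapp/api/counter")
                    .request(MediaType.TEXT_PLAIN)
                    .post(Entity.text("1"), String.class);
            // The response body is the new counter value, rendered as plain text.
            Assert.assertTrue(result.matches("-?\\d+"));
        } finally {
            server.stop();
        }
    }
}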

In order to run the example, you need to build the distribution first (run dist.sh), then start the nodes in separate shells and issue curl POST requests to manipulate the counter state. Here's a sample interaction:

./run.sh 8090
./run.sh 8092
...
Jul 05, 2014 10:20:06 AM com.hazelcast.cluster.ClusterService
INFO: [10.0.0.12]:5702 [dev] [3.2.3]

Members [2] {
        Member [10.0.0.12]:5701
        Member [10.0.0.12]:5702 this
}

Jul 05, 2014 10:20:08 AM com.hazelcast.core.LifecycleService
INFO: [10.0.0.12]:5702 [dev] [3.2.3] Address[10.0.0.12]:5702 is STARTED
...

$ curl -d '1' -H 'Content-Type: text/plain;' http://localhost:8092/webapp/api/counter
22
$ curl -d '1' -H 'Content-Type: text/plain;' http://localhost:8090/webapp/api/counter
23
$ curl -d '5' -H 'Content-Type: text/plain;' http://localhost:8090/webapp/api/counter
28

I think that Hazelcast is a very good step forward for distributed computing. For the everyday programmer, it provides a set of primitives to manipulate in order to implement a distributed system, and it is very easy to integrate into an existing project, whether a Java EE or a standalone app.

Saturday, June 14, 2014

Running the Hadoop 2.2.0 wordcount example under Windows

Recently I went to the Hadoop Summit in San Jose (http://hadoopsummit.org/san-jose/). The conference was quite interesting (apart from a few boring talks). I found out that HortonWorks is pushing hard to bring Hadoop into the enterprise with Hadoop 2.x and YARN. I love this idea, since there seems to be no good standard for distributed containers in Java these days (forget about JEE clustering).

Surprisingly enough, it looks like Hadoop 2.2.0 is supported natively on Windows, which IMO is a great achievement and a sign of the platform getting more mature.
In this article I show how to run a simple WordCount example on Hadoop 2.2.0 under Windows.

First of all, you need to compile the Hadoop 2.2.0 distribution, which takes a lot of time (and sometimes tweaking of pom files). I uploaded a precompiled version here. Then edit the Windows environment variables: add the bin directory to PATH and set HADOOP_HOME to point to the distribution directory.

Then you need to format the namenode and run the example. The following commands do that:

E:\test>hdfs namenode -format

E:\hadoop-2.2.0\sbin>start-dfs

E:\hadoop-2.2.0\sbin>start-yarn
starting yarn daemons

E:\test>hdfs dfs -mkdir /input

E:\test>hdfs dfs -copyFromLocal words.txt /input

E:\test>hdfs dfs -cat /input/words.txt
...

E:\test>yarn jar E:\hadoop-2.2.0\share\hadoop\mapreduce\hadoop-mapreduce-examples-2.2.0.jar wordcount /input/words.txt /output
14/06/14 14:29:47 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
14/06/14 14:29:47 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
14/06/14 14:29:48 INFO input.FileInputFormat: Total input paths to process : 1
...

E:\test>hdfs dfs -cat  /output/part-r-00000
abc1    1
abc2    3
abc3    1
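
For reference, the bundled wordcount example is essentially the classic MapReduce program: a mapper that emits a (word, 1) pair per token and a reducer that sums the counts per word, which is what produces the "word<TAB>count" lines above. A condensed sketch of that shape (class names here are illustrative, not the exact bundled source):

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountSketch {

    // Emits (word, 1) for every whitespace-separated token of every input line.
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, ONE);
            }
        }
    }

    // Sums the counts emitted for each word.
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private final IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }
}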

I hope that Hadoop 2.x gains wide adoption in the enterprise environment, since the industry needs a next-gen standard for distributed apps.