Friday, January 15, 2010

RoadMap to SaaS

I have observed a common pattern across multiple enterprises moving from a traditional web app to SaaS. This post tries to capture that pattern along with a number of lessons learned. I use a J2EE web app architecture to illustrate, but the same principles apply to other technology platforms.

Stage 1: Some working Web App

At the very beginning, we have a web application that works well. We analyze the functions of the web application and group the implementation classes accordingly.

Stage 2: Separate functionality across processes

We analyze the functions and partition them into different processes (JVMs). The partitioning needs to be coarse-grained, and the processes communicate with each other via service interfaces, each exposed by a Facade class. The service interface can be any remote object invocation interface or XML over HTTP. RESTful web services are the de facto choice for the service interface.

Stage 3: Move different processes to different machines

To scale out beyond a single server's capacity, we move each process to a separate machine. Note that the machine can be a physical machine or a virtual machine running on top of a hypervisor.

Stage 4: Build service pools

If the service itself is stateless, we can easily scale out the service capacity by putting multiple machines (running the same service) into a server pool. A network load balancer will be used to spread the workload evenly to the member servers.

When the workload increases, more machines can be added to the pool to boost the overall capacity of the service. The elastic scalability offered by cloud computing providers makes growing and shrinking the pool even more rapid, and can hence handle dynamic workloads more effectively.
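As a rough illustration of the pool idea above, here is a minimal round-robin sketch in plain Java. The class name `ServicePool` and the server names are hypothetical, and in practice this job is done by a network load balancer rather than by application code.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical round-robin pool; the server names are placeholders.
class ServicePool {
    private final List<String> members = new CopyOnWriteArrayList<>();
    private final AtomicLong counter = new AtomicLong();

    // Grow the pool when the workload increases.
    void add(String server) {
        members.add(server);
    }

    // Shrink the pool when the workload drops.
    void remove(String server) {
        members.remove(server);
    }

    // Pick the next member in rotation. Because the service is stateless,
    // any member can handle any request.
    String next() {
        List<String> snapshot = List.copyOf(members);
        if (snapshot.isEmpty()) {
            throw new IllegalStateException("service pool is empty");
        }
        int index = (int) (counter.getAndIncrement() % snapshot.size());
        return snapshot.get(index);
    }

    public static void main(String[] args) {
        ServicePool pool = new ServicePool();
        pool.add("app-1");
        pool.add("app-2");
        for (int i = 0; i < 4; i++) {
            System.out.println(pool.next());
        }
    }
}
```

Round-robin is only one possible policy; a real balancer may also weigh members by load or health.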

Stage 5: Scale the data tier by partitioning

After we scale out the processing tier, we find that the data tier becomes the bottleneck. So we also need to distribute the data access workload by partitioning the database according to the data key. There are a number of classical techniques for building such a distributed database.
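To make key-based partitioning concrete, the sketch below uses simple modulo hashing, which is only one of several possible techniques. `ShardedStore` is a hypothetical name, and each in-memory map merely stands in for one database partition.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical key-based partitioner; each Map stands in for one database shard.
class ShardedStore {
    private final List<Map<String, String>> shards = new ArrayList<>();

    ShardedStore(int shardCount) {
        for (int i = 0; i < shardCount; i++) {
            shards.add(new HashMap<>());
        }
    }

    // Map a data key to a shard index. floorMod keeps the result
    // non-negative even when hashCode() is negative.
    int shardFor(String key) {
        return Math.floorMod(key.hashCode(), shards.size());
    }

    void put(String key, String value) {
        shards.get(shardFor(key)).put(key, value);
    }

    String get(String key) {
        return shards.get(shardFor(key)).get(key);
    }

    public static void main(String[] args) {
        ShardedStore store = new ShardedStore(4);
        store.put("customer-42", "Alice");
        System.out.println("customer-42 lives on shard " + store.shardFor("customer-42"));
        System.out.println(store.get("customer-42"));
    }
}
```

One known drawback of plain modulo hashing is that changing the number of shards remaps most keys; schemes such as consistent hashing are often used to limit that.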

Stage 6: Add Cache to reduce server load

If the application has a high read/write ratio, or has some tolerance for stale data, we can add a cache layer to reduce the load on the actual services. Clients check the cache before sending a request to the service.

We need to make sure the cached items remain fresh. There are various schemes to achieve this: for example, cached items can expire after a certain timeout, or an explicit invalidation request can be made for specific cached items when the corresponding backend data has changed.

We can use a local cache (residing on the same machine as the client) or a distributed cache engine such as Memcached or Oracle Coherence.
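The two freshness schemes mentioned in this stage, timeout-based expiry and explicit invalidation, can be sketched as a small local cache. This is a minimal illustration under assumed names (`ExpiringCache`, the `service` function, the injected clock); it does not use the Memcached or Coherence APIs.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.LongSupplier;

// Hypothetical local cache; not part of Memcached or Coherence.
class ExpiringCache<K, V> {
    private static final class Entry<V> {
        final V value;
        final long expiresAt;

        Entry(V value, long expiresAt) {
            this.value = value;
            this.expiresAt = expiresAt;
        }
    }

    private final Map<K, Entry<V>> entries = new ConcurrentHashMap<>();
    private final long ttlMillis;
    private final LongSupplier clock;     // injectable so expiry can be tested
    private final Function<K, V> service; // stand-in for the real service call

    ExpiringCache(long ttlMillis, LongSupplier clock, Function<K, V> service) {
        this.ttlMillis = ttlMillis;
        this.clock = clock;
        this.service = service;
    }

    // Check the cache first; call the service only on a miss or an expired entry.
    V get(K key) {
        long now = clock.getAsLong();
        Entry<V> cached = entries.get(key);
        if (cached != null && now < cached.expiresAt) {
            return cached.value;
        }
        V fresh = service.apply(key);
        entries.put(key, new Entry<>(fresh, now + ttlMillis));
        return fresh;
    }

    // Explicit invalidation, to be called when the backend data changes.
    void invalidate(K key) {
        entries.remove(key);
    }

    public static void main(String[] args) {
        ExpiringCache<String, String> cache = new ExpiringCache<>(
                5_000, System::currentTimeMillis, key -> "value-of-" + key);
        System.out.println(cache.get("user:1")); // miss: calls the service
        System.out.println(cache.get("user:1")); // hit: served from the cache
        cache.invalidate("user:1");              // backend data changed
        System.out.println(cache.get("user:1")); // miss again
    }
}
```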

Stage 7: Consider which services to expose to the public

At this point, we want to expose some of the services to the public, either because this can bring revenue to our company or because it can facilitate better B2B integration with our business partners. There are many considerations in deciding what to expose, such as security, scalability, service level agreements, and utilization tracking.

Stage 8: Deploy an ingress service gateway

Once we have decided which services to expose, we use a specialized ingress service gateway to handle the concerns outlined above. Most XML service gateways are equipped with message validation, security checking, message transformation, and routing logic.

Stage 9: Deploy an egress service gateway

We not only provide services to the public, but may also consume other public services. In this case we deploy an egress service gateway, which can help to look up service provider endpoints, extract the service policies of the public service providers, and so on.

One important function of the egress service gateway is to manage my dependencies on external service providers. It typically keeps a list of equivalent service providers together with their availability and response time, and routes my request to one of them according to my selection criteria (e.g. cheapest, most reliable, lowest latency).
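The provider selection described above might look roughly like the following sketch. The `Provider` fields, the `EgressRouter` class, and the example endpoints are all hypothetical; a real gateway would refresh availability and latency from live measurements.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical record of what the gateway knows about one external provider.
class Provider {
    final String endpoint;
    final boolean available;
    final double costPerCall;
    final long avgLatencyMillis;

    Provider(String endpoint, boolean available, double costPerCall, long avgLatencyMillis) {
        this.endpoint = endpoint;
        this.available = available;
        this.costPerCall = costPerCall;
        this.avgLatencyMillis = avgLatencyMillis;
    }
}

class EgressRouter {
    // Selection criteria expressed as comparators; the smallest value wins.
    static final Comparator<Provider> CHEAPEST =
            Comparator.comparingDouble((Provider p) -> p.costPerCall);
    static final Comparator<Provider> LOWEST_LATENCY =
            Comparator.comparingLong((Provider p) -> p.avgLatencyMillis);

    // Skip providers that are currently down, then pick the best remaining one.
    static Optional<Provider> select(List<Provider> providers, Comparator<Provider> criterion) {
        return providers.stream()
                .filter(p -> p.available)
                .min(criterion);
    }

    public static void main(String[] args) {
        List<Provider> providers = List.of(
                new Provider("https://a.example/api", true, 0.05, 120),
                new Provider("https://b.example/api", true, 0.02, 300),
                new Provider("https://c.example/api", false, 0.01, 50));
        select(providers, CHEAPEST).ifPresent(p -> System.out.println("cheapest: " + p.endpoint));
        select(providers, LOWEST_LATENCY).ifPresent(p -> System.out.println("fastest: " + p.endpoint));
    }
}
```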

Stage 10: Implement service version control

My service will evolve after being exposed to the public. In the ideal case, only the service implementation changes and not the service interface, so there is no need to change the client code.

But most likely it is not that ideal. There are chances that I need to change the service interface or the message format. In this case, I may need to run multiple versions of the service simultaneously to make sure I am not breaking any existing clients. This means the ingress service gateway needs to be intelligent about routing each client request to the right version of the service implementation.

A typical way is to maintain a matrix of versions and keep track of their compatibility. For example, we can use a release convention in which all minor releases are required to be backward compatible but major releases are not. With this compatibility matrix, the ingress gateway can determine the client version from the incoming request and route it to the server that has the latest compatible version.
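Under the release convention above (minor releases backward compatible, major releases not), the routing decision can be sketched as follows. The `Version` and `VersionRouter` classes and the "major.minor" string format are assumptions made for this illustration.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical "major.minor" version, e.g. "2.3".
class Version {
    final int major;
    final int minor;

    Version(int major, int minor) {
        this.major = major;
        this.minor = minor;
    }

    static Version parse(String text) {
        String[] parts = text.split("\\.");
        return new Version(Integer.parseInt(parts[0]), Integer.parseInt(parts[1]));
    }

    // A deployed version can serve a client if the major version matches
    // and the deployed minor version is at least the client's.
    boolean canServe(Version client) {
        return major == client.major && minor >= client.minor;
    }

    @Override
    public String toString() {
        return major + "." + minor;
    }
}

class VersionRouter {
    // Route to the latest deployed version that is compatible with the client.
    static Optional<Version> route(Version client, List<Version> deployed) {
        return deployed.stream()
                .filter(v -> v.canServe(client))
                .max(Comparator.comparingInt((Version v) -> v.minor));
    }

    public static void main(String[] args) {
        List<Version> deployed = List.of(
                Version.parse("1.0"), Version.parse("1.2"),
                Version.parse("2.0"), Version.parse("2.1"));
        System.out.println(route(Version.parse("1.1"), deployed));
        System.out.println(route(Version.parse("3.0"), deployed));
    }
}
```

An empty result means no deployed version is compatible, in which case the gateway would have to reject the request.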

Stage 11: Outsource infrastructure to a public cloud provider

Purchasing the necessary hardware and maintaining it can be very costly, especially when the computing resources sit idle part of the time. Idle time is usually unavoidable because we need to provision resources for the peak workload, so they are underused during non-peak hours.

For more efficient use of computing resources, we can consider a public cloud provider such as Amazon AWS or Microsoft Azure.

But it is important to note that running an application in the cloud may require redesigning the application to cope with some unique characteristics of the cloud environment, such as high network latency and bandwidth cost, and to live with an eventually consistent DB.

Tuesday, January 12, 2010

Notes on Oracle Coherence

Oracle Coherence is a distributed cache that is functionally comparable to Memcached. On top of the basic cache API functions, it has some additional capabilities that are attractive for building large-scale enterprise applications.

The API is based on the Java Map (Hashtable) interface, which provides key/value store semantics where the value can be any Java Serializable object. Coherence allows data to be stored in multiple caches, each identified by a unique name (which it calls a "named cache").

The code examples below are extracted from a great presentation by Brian Oliver of Oracle.

The common usage pattern is to first locate a cache by its name, and then act on the cache.

Basic cache function (Map, JCache)
  • Get data by key
  • Update data by key
  • Remove data by key
NamedCache nc = CacheFactory.getCache("mine");

Object previous = nc.put("key", "hello world");

Object current = nc.get("key");

int size = nc.size();

Object value = nc.remove("key");

Set keys = nc.keySet();

Set entries = nc.entrySet();

boolean exists = nc.containsKey("key");

Cache Modification Event Listener (ObservableMap)

You can register an event listener on a cache to catch certain change events that happen within the cache.
  • New cache item is inserted
  • Existing cache item is deleted
  • Existing cache item is updated
NamedCache nc = CacheFactory.getCache("stocks");

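nc.addMapListener(new MapListener() {
    public void entryInserted(MapEvent mapEvent) { /* a new item was inserted */ }
    public void entryUpdated(MapEvent mapEvent) { /* an existing item was updated */ }
    public void entryDeleted(MapEvent mapEvent) { /* an existing item was deleted */ }
});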

View of Filtered Cache (QueryMap)

You can also define a "view" by providing a "filter", which is basically a boolean function. Only items for which this function evaluates to true will be visible in the view.

NamedCache nc = CacheFactory.getCache("people");

Set keys =
nc.keySet(new LikeFilter("getLastName", "%Stone%"));

Set entries =
nc.entrySet( new EqualsFilter("getAge", 35));

Continuous Query Support (ContinuousQueryCache)

The view can also be used as a "continuous query". All newly arriving data that fulfills the filter criteria will be included automatically in the view.

NamedCache nc = CacheFactory.getCache("stocks");

NamedCache expensiveItems =
new ContinuousQueryCache(nc,
new GreaterFilter("getPrice", 1000));

Parallel Query Support (InvocableMap)

We can also spread a query execution and partial aggregation across all nodes and have them execute in parallel, followed by the final aggregation.
NamedCache nc = CacheFactory.getCache("stocks");

Double total =
(Double)nc.aggregate(AlwaysFilter.INSTANCE,
new DoubleSum("getQuantity"));

Set symbols =
(Set)nc.aggregate(new EqualsFilter("getOwner", "Larry"),
new DistinctValues("getSymbol"));
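The two-phase idea behind this section (partial aggregation on each node, then a final aggregation) can be mimicked in plain Java without Coherence. In this sketch each inner list stands in for the data held by one node, and `PartialAggregation` is a hypothetical name.

```java
import java.util.List;

// Plain-Java stand-in for parallel aggregation; not the Coherence API.
class PartialAggregation {
    // Phase 1: each partition is summed independently (possibly in parallel).
    // Phase 2: the partial sums are combined into the final total.
    static double total(List<List<Double>> partitions) {
        return partitions.parallelStream()
                .mapToDouble(partition -> partition.stream()
                        .mapToDouble(Double::doubleValue)
                        .sum())
                .sum();
    }

    public static void main(String[] args) {
        List<List<Double>> quantitiesPerNode = List.of(
                List.of(100.0, 250.0),
                List.of(75.0),
                List.of(300.0, 25.0));
        System.out.println(total(quantitiesPerNode));
    }
}
```

Only the partial sums travel back to the caller, which is what keeps this pattern cheap compared with pulling every item across the network.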

Parallel Execution Processing Support (InvocableMap)

We can ship a piece of processing logic to all nodes, which will execute it in parallel.
NamedCache nc = CacheFactory.getCache("stocks");

nc.invokeAll(new EqualsFilter("getSymbol", "ORCL"),
new StockSplitProcessor());

class StockSplitProcessor extends AbstractProcessor {
    public Object process(InvocableMap.Entry entry) {
        Stock stock = (Stock) entry.getValue();
        stock.quantity *= 2;
        entry.setValue(stock); // write the modified value back to the cache
        return null;
    }
}

Implementation Architecture

Oracle Coherence runs on a cluster of identical server machines connected via a network. Within each server, multiple layers of software provide a unified data storage and processing abstraction over the distributed environment.

Smart Data Proxy

The application typically runs within a node of the cluster as well. The cache interface is implemented by a set of smart data proxies, which know the location of the master (primary) and slave (backup) copies of the data based on its key.

Read through with 2 level cache

When the client reads data from the proxy, the proxy first tries to find the data in a local cache (also called the "near cache") on the same machine. If it is not found there, the smart proxy then looks in the distributed cache (also called the L2 cache) for the corresponding copy. Since this is a read, either a master or a slave copy is fine. If the smart proxy cannot find the data in the distributed cache, it looks up the data in the backend DB. The returned data is then propagated back to the client, and the caches are populated along the way.
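The read path just described can be sketched in plain Java as follows. This is only an illustration of the lookup order, not Coherence code: `ReadThroughProxy` is a hypothetical class, a plain map stands in for the distributed (L2) cache, and a function stands in for the backend DB.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Plain-Java stand-in for the smart proxy's read path; not the Coherence API.
class ReadThroughProxy {
    private final Map<String, String> nearCache = new HashMap<>(); // local, same machine
    private final Map<String, String> distributedCache;            // stand-in for the L2 cache
    private final Function<String, String> database;               // stand-in for the backend DB

    ReadThroughProxy(Map<String, String> distributedCache, Function<String, String> database) {
        this.distributedCache = distributedCache;
        this.database = database;
    }

    String get(String key) {
        // 1. Try the near cache first.
        String value = nearCache.get(key);
        if (value != null) {
            return value;
        }
        // 2. Then try the distributed cache.
        value = distributedCache.get(key);
        if (value == null) {
            // 3. Finally fall back to the database and populate the L2 cache.
            value = database.apply(key);
            if (value == null) {
                return null; // not found anywhere
            }
            distributedCache.put(key, value);
        }
        // Populate the near cache on the way back to the client.
        nearCache.put(key, value);
        return value;
    }

    public static void main(String[] args) {
        Map<String, String> l2 = new HashMap<>();
        ReadThroughProxy proxy = new ReadThroughProxy(l2, key -> "row-for-" + key);
        System.out.println(proxy.get("42")); // loaded from the database stand-in
        System.out.println(proxy.get("42")); // served from the near cache
    }
}
```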

Master/Slave data partitioning

Updating data (insert, update, delete) is done in the reverse way. Under the master/slave architecture, all updates go to the master node that owns that piece of data. Coherence supports two modes of update: "write through" and "write behind". "Write through" updates the backend DB immediately after updating the master copy, but before updating the slave copy, and therefore keeps the DB always up to date. "Write behind" updates the slave copy and then the DB asynchronously. Data loss is possible in "write behind" mode, but it has higher throughput because multiple writes can be merged into a single write, resulting in fewer writes to the DB.
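To show why write behind needs fewer DB writes, here is a minimal plain-Java sketch of write coalescing. `WriteBehindCache` and its `databaseWriter` callback are hypothetical, and `flush()` is called explicitly here, whereas a real system would run it asynchronously in the background.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Plain-Java stand-in for write-behind coalescing; not the Coherence API.
class WriteBehindCache {
    private final Map<String, String> cache = new HashMap<>();
    private final Map<String, String> pending = new LinkedHashMap<>(); // dirty entries
    private final BiConsumer<String, String> databaseWriter;           // stand-in for the DB

    WriteBehindCache(BiConsumer<String, String> databaseWriter) {
        this.databaseWriter = databaseWriter;
    }

    // Update the cache right away and remember the key as dirty.
    // A later update to the same key overwrites the pending value,
    // so several updates collapse into one database write.
    void put(String key, String value) {
        cache.put(key, value);
        pending.put(key, value);
    }

    String get(String key) {
        return cache.get(key);
    }

    // Write every dirty entry to the database once and return the write count.
    // Entries still pending when a node fails are the ones that can be lost.
    int flush() {
        int writes = 0;
        for (Map.Entry<String, String> entry : pending.entrySet()) {
            databaseWriter.accept(entry.getKey(), entry.getValue());
            writes++;
        }
        pending.clear();
        return writes;
    }

    public static void main(String[] args) {
        WriteBehindCache cache = new WriteBehindCache(
                (key, value) -> System.out.println("DB write: " + key + "=" + value));
        cache.put("ORCL", "100");
        cache.put("ORCL", "200");
        cache.put("IBM", "50");
        System.out.println("database writes: " + cache.flush());
    }
}
```

A write-through variant would simply call `databaseWriter` inside `put`, trading throughput for a DB that is always up to date.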

Moving processing logic towards data

While extracting data from the cache into the application is the typical way of processing data, it does not scale well when a large volume of data needs to be processed. Instead of shipping the data to the processing logic, a much more efficient way is to ship the processing logic to where the data resides. This is exactly why Oracle Coherence provides the InvocableMap interface, through which the client can supply a "processor" class that gets shipped to every node, where processing is conducted on local data. Moving code towards data distributed across many nodes also enables parallel processing, because every node can now process its local data in parallel.

The processor logic is shipped into the processing queue of the execution node, where an active processor dequeues each processor object and executes it. Note that this execution is performed serially; in other words, the node completely finishes one processing job before proceeding to the next. There is no need to worry about multi-threading issues and no need to use locks, and therefore no deadlock issue.
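The serial execution model can be imitated in plain Java with a single-threaded executor acting as the node's processing queue. This is a sketch under assumed names (`ExecutionNode`, `invoke`), not how Coherence is implemented internally.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// Plain-Java stand-in for one execution node; not the Coherence API.
class ExecutionNode {
    // Only the single worker thread ever touches this map, so no locks are needed.
    private final Map<String, Integer> localData = new HashMap<>();

    // One worker thread means queued jobs run strictly one after another.
    private final ExecutorService worker = Executors.newSingleThreadExecutor(runnable -> {
        Thread thread = new Thread(runnable, "node-worker");
        thread.setDaemon(true);
        return thread;
    });

    // Queue a processor, wait until the worker has run it, and return its result.
    <R> R invoke(Function<Map<String, Integer>, R> processor) {
        Callable<R> job = () -> processor.apply(localData);
        try {
            return worker.submit(job).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    void shutdown() {
        worker.shutdown();
    }

    public static void main(String[] args) {
        ExecutionNode node = new ExecutionNode();
        node.invoke(data -> data.put("ORCL", 100));
        node.invoke(data -> data.computeIfPresent("ORCL", (symbol, qty) -> qty * 2));
        System.out.println(node.invoke(data -> data.get("ORCL")));
        node.shutdown();
    }
}
```

Callers may invoke the node from many threads at once; the queue serializes the jobs, which is what removes the need for locking inside a processor.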