Tuesday, April 17, 2007

Event Processing in 2007 and beyond

An Essay, by Ashwin Jayaprakash (Apr 2007)

A sure sign of a technology becoming mature and widely accepted is when developers start losing interest in it and crave something more. How many of us still take pride in designing and writing applications that use Stateless Session Beans, Message Driven Beans and ActionForms? Let's face it: the tools have matured, and we have Blueprints and Design Patterns that guide us at every phase of a project. All the (remaining) vendors now offer more or less similar sets of features. Clustering, WebService stacks, and basic tooling are givens now.
We have grudgingly moved towards some sort of interoperability. Some of us feel that XML is overused and misused, but still love XQuery and XPath. SOAP took its own time to evolve from Draft specifications, and when it did, most of us realized that it was too raw and bloated to do anything really useful. So we had to wait for all the sister specifications like Addressing, Transaction and Delivery to reach a decent level of sophistication - or switched to service APIs that don't have formal specifications, like REST or XML-RPC. In the meantime, other specifications were fighting for relevance - BPEL, Workflow and others to handle Orchestration - all relying on WebServices to do the dirty work. And when we were not looking, somebody slid in SOA and ESB.

Where are we really headed?

What will we poor Developers have to fiddle around with over the next few years? After years of manoeuvring and fighting, we finally have specifications for almost everything - enough for the CTO and his/her team of Architects, in companies where Technology is not a core business, to think at a higher, abstract level and treat IT Projects merely as "enablers". If we filter out all the fluff from ESB, SOA, WS, WSA and the other technologies that hide behind catchy acronyms, we have to acknowledge that Software is slowly moving towards commoditization. The Electronics/Hardware industry achieved that several decades ago. It was probably easier for them because they left the hard part - "Programmable logic and customization" - to the Software industry.
We have 2 widely used programming platforms. XML, XPath and XQuery to transform and massage data from one format to another. WebServices and their related protocols and formats to send and receive data - usually an XML payload, mostly within the Company's Intranet. BPEL to handle more complex workflows, some of which extend beyond the Intranet. ESB to provide the platform that streamlines all the different formats and protocols we use, both internally and externally. So on and so forth. Essentially, we finally have things that resemble "Software Components". The situation sounds similar to the Motorcar revolution we've all heard of, which happened at the beginning of the previous century. The Internal combustion engine became more efficient (think of Java/JEE and .Net), Coach builders started making Car bodies, accessories and trimmings (think Packaging - XML and related technologies), good roads and extensive Highway networks got built (think WebServices, ESB etc) and then gradually Traffic cops got replaced by complex Traffic lights and other control networks (think BPEL and competing Workflow/Orchestration standards).
What we will see in the future will be more intelligent IT Systems - and if you want to carry the Motorcar analogy even further, something like Cars that can drive themselves and a "Big-Brother"-like network that co-ordinates all traffic. We in the Software industry are definitely heading that way. RFID is finally commercially viable. RFID and the technologies it will foster will lead to more prevalent and intelligent Warehouses, smarter Assembly lines, and better integration between Traffic control, Accident avoidance and GPS Navigation systems - a host of systems that can automatically respond to feedback and act quickly. Automatic Trading is one among them. One of the technologies crucial to such development is Event Processing. This is a broad category of complementary technologies - Business Activity Monitoring, Event Stream Processing, Complex Event Processing, Event Driven Architecture. Some of these might appear to be combinations of the others. To keep it simple, we'll label them all as Event Processing technologies.
Some of these technologies have been around for several decades in one form or another. Control Systems have custom, in-built, realtime, hardwired Event Processing. Some companies have been using Rule Engines, either directly as Inference Engines or just to respond to Facts (as Events). Simple Database Triggers and Stored Procedures are still popular. In short, Event Processing in its new avatar is going to become more widely used in the future. Many Event Processing Products and Projects have also been stepping out of their niches by offering general purpose stacks that can be integrated into almost any kind of application.

The tenets of Event Processing

Event Processing rests on three facilities: a way to consume the state(s) of a System as discrete Events, a language or User Interface to specify the conditions/properties to look for in those Events - from one System or across Systems (Correlation) - and a way to specify the actions to be performed when such situations arise.
It is no accident that Event Processing looks like a cross between Database Query processing and Rule Engines; there is no denying that it evolved from existing technologies. The distinguishing characteristics of Event Processing, however, are the constructs it provides - Time based boundaries, fixed size moving Windows, Aggregation and Partitioning/Grouping of Events - applied to a stream of incoming Events, along with a facility to continuously scan those Events for telltale signs, all expressed as a concise Query.
Database Triggers, which execute SQL/Stored Procedures every time an Event/Row is inserted, can provide rudimentary Event Processing functionality. Rule Engines can be used to discern information from Facts being asserted. But none of these technologies alone provides a concise and yet very expressive Domain Specific Language, along with a platform specifically built to constantly monitor/process live streams of Events.
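To frame what such a platform does at its core, here is a minimal Java sketch (hypothetical interfaces, not any product's API): a continuous query is a standing condition-plus-action pair applied to every Event as it arrives, instead of a one-shot query run against stored data.

```java
import java.util.function.Consumer;
import java.util.function.Predicate;

/**
 * Bare-bones illustration of a continuous query: the condition
 * ("what to look for") and the action ("what to do") are registered
 * once and then applied to every arriving Event, forever.
 */
public class ContinuousQuery<E> {
    private final Predicate<E> condition;
    private final Consumer<E> action;

    public ContinuousQuery(Predicate<E> condition, Consumer<E> action) {
        this.condition = condition;
        this.action = action;
    }

    /** Called once per arriving Event. */
    public void onEvent(E event) {
        if (condition.test(event)) {
            action.accept(event);
        }
    }

    public static void main(String[] args) {
        // Hypothetical example: alert on any negative stock quantity.
        ContinuousQuery<Integer> lowStock = new ContinuousQuery<>(
                qty -> qty < 0,
                qty -> System.out.println("alert: quantity " + qty));
        lowStock.onEvent(5);   // condition false, no alert
        lowStock.onEvent(-3);  // prints "alert: quantity -3"
    }
}
```

Real Event Processors add what this sketch lacks: Windows, Aggregates and cross-Stream Correlation, all expressed declaratively.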
There are just a handful of Event Processing Stacks today. Some are very high end, some equally high end but addressing a specific vertical like Automatic Trading, some general purpose stacks meant for everyone, and a few that have been re-packaged and/or re-labelled and shoved into the fray.
Each Stack is implemented in a different way, which shows in the differing syntax of their Query constructs, but the semantics are similar.
Let's look at how one such Event Processor can be used to solve those real-world problems that have been nagging us. No, I don't mean Global Warming, Poverty or the AIDS epidemic. We'll leave that to SOA and ESB.
StreamCruncher is an Event Processor written in Java. It uses a Database to maintain the Events and other artifacts. Since latency and performance are crucial to Event Processing, Embedded and In-Memory Databases are preferred. It builds on the SQL language and capabilities that we have come to rely on so much, extending them with an "Event Processing Query Language". The Event Processing Kernel is multi-threaded and processes Events (and Queries) in stages, to leverage the power of Multi-Core Processors. The Database does some of the heavy lifting, handling the parts it does best - Joining, Sorting etc.
Imagine that there is a small community of farmers who grow Organic Oats, Wheat, Corn and other stuff. They sell their produce and a few other products - Premium Breakfast Cereal, Peanut Butter, Free-range Eggs etc - as a Co-operative. To reach a wider customer base, they opened a small Website 2 years ago to cater to online orders. Recently, when an international TV Station featured this Community's products on their Health show, online sales skyrocketed and almost crashed and burned their Servers. So, they are compelled to upgrade their Website and add a few features to ease the burden on their overworked, small IT team.

Online Retail Store

This is what they want for their Online Store: they have quite a few Customers - Spas and Department store chains around the nation - that have arrangements with the Farm to place regular, automatic, bulk Orders. These Customers are treated as Priority Customers, and there are Service Level Agreements to fulfill their Orders (verify Payment, alert the Warehouse and Shipment teams etc) within 30 minutes of the Order being placed. If 30 minutes pass without an Order being "Fulfilled", an alert is raised for further investigation.
We create 2 Streams to receive Events - one for Orders placed and another for Order Fulfillments. They receive a constant stream of Events, which have to be monitored for SLA failures. This can be accomplished by a simple Event Processing Query in StreamCruncher, using the Window constructs and the "Alert..Using..When.." Correlation clause. This is a classic Correlation example.
To fully understand all the features and syntax of the Query language and the procedure to set up Input and Output Event Streams and other operations, please consult the Documentation. Here's the skinny on the syntax: a Sliding Window is defined like this - "store last N", a Time based Window like this - "store last N milliseconds/seconds/minutes/hours/days" and a Tumbling Window like this - "store latest N". The Stream can be divided into small Partitions, with Windows defined at specific levels, like this: "partition by country, state, city store last N seconds". This creates separate Time based Windows at the "Country > State > City" level.
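To make the Time based Window semantics concrete, here is a small Java sketch (an illustration of the "store last N" behaviour, not StreamCruncher's actual API): as each Event arrives, anything older than the Window's span is expelled.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Illustrative Time based sliding Window: keeps only Events from the last N millis. */
public class TimeWindow {
    private final long spanMillis;
    private final Deque<long[]> events = new ArrayDeque<>(); // each entry: {timestamp, value}

    public TimeWindow(long spanMillis) {
        this.spanMillis = spanMillis;
    }

    /** Add an Event, then expel everything at or past the age limit. */
    public void add(long timestamp, long value) {
        events.addLast(new long[]{timestamp, value});
        while (!events.isEmpty() && events.peekFirst()[0] <= timestamp - spanMillis) {
            events.removeFirst();
        }
    }

    public int size() {
        return events.size();
    }

    public static void main(String[] args) {
        TimeWindow window = new TimeWindow(30 * 60 * 1000L); // "store last 30 minutes"
        window.add(0L, 1);
        window.add(10 * 60 * 1000L, 2);
        window.add(35 * 60 * 1000L, 3);  // expels the Event from t=0
        System.out.println(window.size()); // prints 2
    }
}
```

A real Kernel would also expel Events on a timer rather than only when new Events arrive, but the Window invariant is the same.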
SLA Failure Alert:
select unful_order_id, unful_cust_id
from alert
    order_event.order_id as unful_order_id,
    order_event.customer_id as unful_cust_id
using
    cust_order (partition store last 30 minutes
          where customer_id in
          (select customer_id from priority_customer))
         as order_event correlate on order_id,
    fulfillment (partition store last 30 minutes
           where customer_id in
           (select customer_id from priority_customer))
         as fulfillment_event correlate on order_id
when
    present(order_event and not fulfillment_event);
Let's analyze the Query. There are 2 Tables - cust_order and fulfillment - into which the 2 Event streams flow. The Query looks like an SQL Join Query, although this syntax is more expressive, yet concise. The Alert clause is specifically for performing Event Correlation across multiple Streams; the other scenarios described further ahead use syntax that is only a minor extension of SQL. The Query keeps running on the 2 Streams, monitoring them and generating Output Events as SLA Failure Alerts. The "Alert..Using.." clause consumes Events from the 2 Streams. The "correlate on" clause is like the "join on" clause in SQL - it specifies the column/Event property to use to match Events from different Streams. The "When.." clause specifies the Pattern to look for. This Query outputs an Event when a Customer Order Event expires without a corresponding Fulfillment Event arriving within 30 minutes of the Order being placed, thus producing an SLA Breach Alert. Such Queries are also called Pattern Matching Queries, as they are configured to look for the presence or absence of specific combinations of Events.
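The "present(order_event and not fulfillment_event)" pattern can be sketched in plain Java (illustrative only - class and method names here are invented, not StreamCruncher's API): hold each Order for its 30-minute Window, drop it if a matching Fulfillment arrives, and raise an alert for whatever expires unmatched.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Illustrative SLA correlation: an alert fires when an Order leaves
 * its 30-minute Window without a Fulfillment matched on order_id.
 */
public class SlaMonitor {
    private static final long SLA_MILLIS = 30 * 60 * 1000L;
    private final Map<String, Long> pendingOrders = new HashMap<>(); // orderId -> time placed

    public void onOrder(String orderId, long timestamp) {
        pendingOrders.put(orderId, timestamp);
    }

    public void onFulfillment(String orderId) {
        pendingOrders.remove(orderId); // matched in time: no alert will fire
    }

    /** Expel Orders whose 30-minute Window has expired; return them as alerts. */
    public List<String> expire(long now) {
        List<String> alerts = new ArrayList<>();
        pendingOrders.entrySet().removeIf(entry -> {
            if (now - entry.getValue() >= SLA_MILLIS) {
                alerts.add(entry.getKey());
                return true;
            }
            return false;
        });
        return alerts;
    }

    public static void main(String[] args) {
        SlaMonitor monitor = new SlaMonitor();
        monitor.onOrder("ORD-1", 0L);
        monitor.onOrder("ORD-2", 0L);
        monitor.onFulfillment("ORD-1");
        System.out.println(monitor.expire(30 * 60 * 1000L)); // prints [ORD-2]
    }
}
```

The declarative Query expresses exactly this bookkeeping - window expiry, matching and alerting - without the hand-written plumbing.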
Since the Farm has these contracts with their corporate consumers, they have to maintain a minimum number of items in stock in their Warehouses distributed around the nation, to avoid an SLA breach.
We have to set up a mechanism that constantly monitors the items in stock in the Warehouses. When an Order is placed, the products and their quantities are deducted from the Warehouse's stock. Products are supplied from different Warehouses, based on the location closest to the Ship-to address. Warehouses are also stocked periodically, when products and quantities are added to the Warehouse's stock. So, we receive 2 kinds of Events - Orders, which reduce the Stock and so carry a negative number for quantity, and Re-stocks, which increase the Stock and carry a positive number.
We also use a regular Database Table to store the minimum stock that must be maintained for each product. When the stock for a product goes below the prescribed quantity, an alert should be raised.
Warehouse Re-Stock Alert:
select country, state, city, item_sku, sum_item_qty
from warehouse (partition by country, state, city, item_sku
   store latest 500
   with pinned sum(item_qty) entrance only as sum_item_qty)
   as stock_level_events, stock_level_master
where stock_level_events.$row_status is new
    and stock_level_events.item_sku = stock_level_master.item_sku
    and stock_level_events.sum_item_qty < stock_level_master.min_qty;
The "partition by country, state, city, item_sku" means that separate Windows are maintained based on the Event's Country, State, City and Product. An Aggregate function - "sum(item_qty)" is used to maintain the sum of the quantities of the products in the Window.
The Window is a Tumbling Window, where Events are consumed in one cycle and discarded in the next; their lifespan is just one cycle. Since the aggregate here is the Sum of the "item_qty" of all the Events currently in the Window, it would normally change even when Events expire. But we don't want the Sum to change when an Event expires - we want to keep the sum of all the items passing through the Window. The "entrance only" clause in the Aggregate function informs the Kernel to calculate the aggregate when an Event enters the Window and to ignore the Event when it is expelled from the Window. Since Order quantities are negative numbers and Re-stock quantities are positive, the aggregate automatically stores the total quantity in the Warehouse.
Whenever the Sum goes below the minimum value specified in the "stock_level_master" Table, an alert is raised.
The "pinned" clause is used to hold/pin the Aggregates even if all Events in the Window expire. Otherwise, for Time based Windows and Tumbling Windows, the Aggregate, if declared, expires along with the Window when all Events in that Window expire, and gets re-created when new Events arrive.
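The "pinned ... entrance only" semantics can be captured in a few lines of Java (a sketch of the behaviour, not StreamCruncher's API): the sum is updated when an Event enters the Window but never reversed when the Event expires, so it tracks the running net stock even though the Events themselves are short-lived.

```java
/**
 * Illustrative "pinned" aggregate with "entrance only" semantics:
 * Order Events carry negative quantities, Re-stock Events positive
 * ones, so the running sum is the net quantity in the Warehouse.
 */
public class PinnedSum {
    private long sum = 0; // "pinned": survives even when the Window empties

    /** Aggregate is calculated when the Event enters the Window. */
    public void onEnter(long itemQty) {
        sum += itemQty;
    }

    /** "entrance only": expiry of the Event is ignored, sum is not reversed. */
    public void onExpire(long itemQty) {
        // intentionally a no-op
    }

    public long sum() {
        return sum;
    }

    public static void main(String[] args) {
        PinnedSum stock = new PinnedSum();
        stock.onEnter(500);   // Re-stock: +500 units
        stock.onEnter(-120);  // Order: -120 units
        stock.onExpire(500);  // Event tumbles out of the Window: sum unchanged
        System.out.println(stock.sum()); // prints 380
    }
}
```

Without "entrance only", onExpire would subtract the quantity back out and the sum would collapse to zero each cycle, defeating the stock-level check.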
The ability to merge an Event Stream with a regular Table reduces the effort required to maintain the Master data - minimum stock values, adding/removing Products etc - because it is a regular DB Table, and the Query's behaviour is driven by that Master Table (stock_level_master).
Orders are placed from all over the nation. To reduce the overhead of shipping smaller orders, the process is streamlined by aggregating orders being shipped to the same City, which are then taken care of by local distributors. This way, the Farm can take advantage of bulk-shipment offers.
Shipment directives are held for 30 minutes. If 5 such directives to the same City accumulate within those 30 minutes, they are dispatched as a batch even before the 30 minutes expire, thus saving the overhead of shipping smaller orders individually.
Shipment Aggregator:
select country, state, city, item_sku, item_qty,
 order_time, order_id
from cust_order
(partition by country, state, city
store last 30 minutes max 5)
as order_events
where order_events.$row_status is dead;
This Query produces Output Events when an Event gets expelled from the Window - as indicated by the "$row_status is dead" clause - either after it has stayed in the Window for 30 minutes, or earlier, as part of an "Aggregated Order" when the Window fills up with 5 Order Events.
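A hand-rolled version of the "store last 30 minutes max 5" Window might look like this in Java (an illustrative sketch with invented names, not the product's API): directives are dispatched either when 5 accumulate for a City or when the oldest one turns 30 minutes old.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Illustrative "last 30 minutes max 5" Window for one City's
 * shipment directives: dispatch when the Window fills with 5,
 * or when the oldest held directive reaches the 30-minute limit.
 */
public class ShipmentBatcher {
    private static final long HOLD_MILLIS = 30 * 60 * 1000L;
    private static final int MAX_BATCH = 5;
    private final List<long[]> held = new ArrayList<>(); // each entry: {timestamp, orderId}

    /** Accept a directive; returns the batch dispatched by this arrival, if any. */
    public List<Long> onDirective(long timestamp, long orderId) {
        held.add(new long[]{timestamp, orderId});
        if (held.size() >= MAX_BATCH) {
            return dispatchAll(); // Window full: dispatch before the 30 minutes expire
        }
        return List.of();
    }

    /** Timer tick: dispatch if the oldest directive has been held for 30 minutes. */
    public List<Long> onTick(long now) {
        if (!held.isEmpty() && now - held.get(0)[0] >= HOLD_MILLIS) {
            return dispatchAll();
        }
        return List.of();
    }

    private List<Long> dispatchAll() {
        List<Long> batch = new ArrayList<>();
        for (long[] entry : held) {
            batch.add(entry[1]);
        }
        held.clear();
        return batch;
    }

    public static void main(String[] args) {
        ShipmentBatcher batcher = new ShipmentBatcher();
        for (long id = 1; id <= 4; id++) {
            batcher.onDirective(0L, id); // nothing dispatched yet
        }
        System.out.println(batcher.onDirective(0L, 5L)); // prints [1, 2, 3, 4, 5]
    }
}
```

The declarative Query replaces all of this per-City bookkeeping with a single "partition by country, state, city store last 30 minutes max 5" clause.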
The Website itself has to be spruced up with some nifty features. To draw the attention of Online visitors, the top 5 best selling items over the past 30 days are to be displayed. This is accomplished by a Query that uses a Chained Partition clause, where the results from one Partition are re-directed to another Partition, which slices and dices the intermediate data into its final form.
Best Selling Items:
select order_country, order_state, order_category,
 order_item_sku, order_total_qty
from cust_order
  (partition by order_country, order_state, order_category,
   order_item_sku store last 30 days
   with sum(order_quantity) as order_total_qty)
  to
  (partition by order_country, order_state, order_category
   store highest 5 using order_total_qty
   with update group order_country, order_state, order_category,
   order_item_sku where $row_status is new) as order_events
where order_events.$row_status is not dead;
This Query uses a "Chained Partition", where 2 different Partition clauses are chained by means of the "to" keyword: the results from the first Partition are fed to the second Partition, whose results in turn form the input to the Query. Such chaining is required when a single Partition cannot accomplish the task. The first Partition stores a rolling sum of the total Quantity sold at the "Country > State > Category > SKU" level, and those Sums are sent to the second Partition, which uses the "..is new" clause to pick up only the newly calculated/changed Sums. The second Partition uses a "Highest Rows/Events" Window - a slight variation of the Sliding Window - to maintain an up-to-date list of the "top 5" selling items in each "Country > State > Category".
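The two chained stages can be mimicked in Java to show the data flow (an illustration of the idea, with invented names - not StreamCruncher's API): stage one folds Orders into per-SKU running totals, and stage two turns the current totals into a "top 5" list.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Illustrative Chained Partition: stage 1 keeps a running sum of
 * quantity sold per SKU (one Category assumed, for brevity); stage 2
 * selects the 5 best-selling SKUs from the current sums.
 */
public class BestSellers {
    private final Map<String, Long> totalBySku = new HashMap<>(); // stage 1 sums

    /** Stage 1: fold an Order into the per-SKU running total. */
    public void onOrder(String sku, long quantity) {
        totalBySku.merge(sku, quantity, Long::sum);
    }

    /** Stage 2: the "store highest 5 using order_total_qty" step. */
    public List<String> top5() {
        return totalBySku.entrySet().stream()
                .sorted(Map.Entry.<String, Long>comparingByValue(Comparator.reverseOrder()))
                .limit(5)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        BestSellers best = new BestSellers();
        best.onOrder("oats-1kg", 10);
        best.onOrder("wheat-1kg", 25);
        System.out.println(best.top5()); // prints [wheat-1kg, oats-1kg]
    }
}
```

In the real Query, the 30-day expiry and the per-"Country > State > Category" grouping are handled by the Partition clauses themselves; this sketch only shows why two stages are needed - the top-5 step consumes the sums the first step produces.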
As can be seen from the examples above, Event Processing is quite versatile and can be used to implement many Use Cases concisely and efficiently. Event Processors are built to handle high data rates, which greatly simplifies development and reduces the maintenance cost of the applications using them.