Sunday, April 22, 2007

StreamCruncher 1.13 Release Candidate is ready!

1) This version includes support for Oracle 10g and has been tested on Oracle Database 10g Enterprise Edition (10.2.0).

10g, being an Enterprise grade Database, requires Tuning by a DB Expert before you start using it as the underlying Database for StreamCruncher. I don't claim to be an Oracle expert, so I'd ask my DBA to set up the Database for very low Latency, deferred Disk flushes and larger Page and Cache sizes, and let him/her translate that into the necessary Oracle Configuration changes. People have been creating TableSpaces on RAM Drives, mostly to host Indexes for Tables that are constantly modified and heavily contended. I'd also consider creating the whole Database on such a RAM Drive.

StreamCruncher also creates Tables and Indexes for internal purposes. You'll have to ensure that the Schema in which these get created (usually the User name provided in the StreamCruncher DB Config file) is on the TableSpace that has been Tuned and Configured for this purpose.
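
As a rough illustration only - your DBA will pick names, paths and sizes appropriate to your setup - the tuned TableSpace could be created on a RAM-backed mount and made the default for the StreamCruncher User, so that the Kernel's internal Tables and Indexes land there:

-- Hypothetical names, path and sizes; adjust to your environment.
CREATE TABLESPACE sc_fast_ts
    DATAFILE '/ramdisk/oracle/sc_fast01.dbf' SIZE 512M
    EXTENT MANAGEMENT LOCAL SEGMENT SPACE MANAGEMENT AUTO;

-- The User/Schema named in the StreamCruncher DB Config file.
CREATE USER streamcruncher IDENTIFIED BY "change_me"
    DEFAULT TABLESPACE sc_fast_ts
    QUOTA UNLIMITED ON sc_fast_ts;

GRANT CONNECT, RESOURCE TO streamcruncher;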

Another good thing to tell the DBA is the way in which Events/Rows are operated on in the Database by StreamCruncher. In any Internal Table, Events are mostly pumped by one Thread and consumed by another Thread - very similar to a Queue or a Conveyor Belt. Updates are done on an Indexed Column, mostly on a small set of Rows that are usually in the Page Cache. All DB access (Insert/Update/Delete) by StreamCruncher on its Internal Tables is through Indexes - some Unique, some not.

Remember, Oracle Database Tuning is an Industry in itself. Make sure you've tuned your setup well!

2) Another small Concurrency issue in the Kernel has also been fixed - the last of this kind of issue, hopefully. So, I've finally got rid of the "Unique Index Violation" errors I used to get only on Multi-processor machines. Version 1.12 had it fixed for Single Processor machines. I also have to admit that this fix affects performance on Single Processor machines too, though the increase is only about 10-13%.

Tuesday, April 17, 2007

Event Processing in 2007 and beyond

An Essay, by Ashwin Jayaprakash (Apr 2007)
Website:
http://www.streamcruncher.com

A sure sign of a technology becoming mature and widely accepted is when developers start losing interest in it and crave something more. How many of us still take pride in designing and writing applications that use Stateless Session Beans, Message Driven Beans and ActionForms anymore? Let's face it: the tools have matured, and we have Blueprints and Design Patterns that guide us at every phase of a project. All the (remaining) vendors now offer more or less similar sets of features. Clustering, WebService stacks, and basic tooling are givens now.
We have grudgingly moved towards some sort of interoperability. Some of us feel that XML is overused and misused, but still love XQuery and XPath. SOAP took its own time to evolve from Draft specifications. When it did, most of us realized that it was too raw and bloated to do anything really useful. So, we had to wait for all the sister specifications like Addressing, Transaction and Delivery to reach a decent level of sophistication - or switched to service APIs that don't have formal specifications, like REST or XML-RPC. In the meantime, there were other specifications fighting for relevance - BPEL, Workflow and others to handle Orchestration - all of which relied on WebServices to do the dirty work. And when we weren't looking, somebody slid in SOA and ESB.

Where are we really headed?

What will we poor Developers have to fiddle around with over the next few years? After years of manoeuvring and fighting, we finally have specifications for almost everything - enough for the CTO and his/her team of Architects, in companies where Technology is not a core business, to think at a higher, more abstract level and treat IT Projects merely as "enablers". If we filter out all the fluff from ESB, SOA, WS, WSA and the other technologies that hide behind catchy acronyms, we have to acknowledge that Software is slowly moving towards commoditization. The Electronics/Hardware industry achieved that several decades ago. It was probably easier for them because they left the hard part - "Programmable logic and customization" - to the Software industry.
We have 2 widely used programming languages. XML, XPath and XQuery to transform and massage data from one format to another. WebServices and their related protocols and formats to send and receive data - usually an XML payload, and mostly within the Company's Intranet. BPEL to handle more complex workflows, some of which extend beyond the Intranet. ESB provides the platform to streamline all the different formats and protocols that we use both internally and externally. So on and so forth. Essentially, we now have things that finally resemble "Software Components". The situation sounds similar to the Motorcar revolution we've all heard of, which happened at the beginning of the previous century. The Internal combustion engine became more efficient (think of Java/JEE and .Net), Coach builders started making Car bodies, accessories and trimmings (think Packaging - XML and related technologies), good roads and extensive Highway networks got built (think WebServices, ESB etc) and then gradually Traffic cops got replaced by complex Traffic lights and other control networks (think BPEL and competing Workflow/Orchestration standards).
What we will see in the future will be more intelligent IT Systems - and if you want to carry the Motorcar analogy even further, something like Cars that can drive themselves and a "Big-Brother" like network that co-ordinates all traffic. We in the Software industry are definitely heading that way. RFID is finally commercially viable. RFID and the technologies it will foster will lead to more prevalent and intelligent Warehouses, smarter Assembly lines, and better integration between Traffic control, Accident avoidance and GPS Navigation systems - a host of systems that can automatically respond to feedback and act quickly. Automatic Trading is one among them. One of the technologies crucial to such development is Event Processing. This is a broad category of complementary technologies - Business Activity Monitoring, Event Stream Processing, Complex Event Processing, Event Driven Architecture. Some of these might appear to be combinations of the others. To keep it simple, we'll label them all as Event Processing technologies.
Some of these technologies have been around for several decades in one form or another. Control Systems have custom, in-built, realtime, hardwired Event Processing. Some companies have been using Rule Engines either directly as Inference Engines or just to respond to Facts (as Events). Simple Database Triggers and Stored Procedures are still popular. In short, Event Processing in its new avatar is going to become more widely used in the future. We have also been observing many Event Processing Products and Projects stepping out of their niches by offering general purpose stacks that can be integrated into almost any kind of application.

The tenets of Event Processing

A facility to consume the state(s) of a System as discrete Events, a language or User Interface to specify the conditions/properties to look for in those Events from one System or across Systems (Correlation), and a way to define actions to be performed when such situations arise.
It is no accident that Event Processing looks like a cross between Database Query processing and Rule Engines. There is no denying that the technology evolved from existing technologies. However, the distinguishing characteristics of Event Processing technology are the constructs it provides to specify Time based boundaries, fixed size moving Windows, Aggregation and Partitioning/Grouping of Events on a stream of incoming Events, and a facility to continuously scan these Events for telltale signs - all of it expressed as a concise Query.
Database Triggers, which in turn execute SQL/Stored Procedures every time an Event/Row is inserted, can provide rudimentary Event Processing functionality. Rule Engines can be used to discern information from Facts being asserted. But none of these technologies alone provides a concise and yet very expressive Domain Specific Language along with a platform built specifically to constantly monitor/process live streams of Events.
There are just a handful of Event Processing Stacks today. Some are very high end; some are very high end but address a specific vertical like Automatic Trading; some are general purpose stacks meant for everyone; and a few have been re-packaged and/or re-labelled and shoved into the fray.
Each Stack is implemented in a different way, which shows in the differing syntax of their Query constructs, but the semantics are similar.
Let's look at how one such Event Processor can be used to solve those real-world problems that have been nagging us. No, I don't mean Global Warming, Poverty or the AIDS epidemic. We'll leave that to SOA and ESB.
StreamCruncher is an Event Processor written in Java. It uses a Database to maintain the Events and other artifacts. Since latency and performance are crucial to Event Processing, Embedded and In-Memory Databases are preferred. It exposes the SQL language and its capabilities that we have come to rely so much on, by extending it with an "Event Processing Query Language". The Event Processing Kernel is multi-threaded, and processes Events (and Queries) in stages, to leverage the power of Multi-Core Processors. The Database does some of the heavy lifting by doing the parts it does best like Joining, Sorting etc.
Imagine that there is a small community of farmers who grow Organic Oats, Wheat, Corn and other crops. They sell their produce and a few other products like Premium Breakfast Cereal, Peanut Butter, Free-range Eggs etc as a Co-operative. To reach a wider customer base, they opened a small Website 2 years ago to cater to online orders. Recently, when an international TV Station featured this Community's products on their Health show, their online sales skyrocketed and almost crashed and burned their Servers. So, they are compelled to upgrade their Website and add a few features to it to ease the burden on their overworked, small IT team.

Online Retail Store

This is what they want for their Online Store: They have quite a few Customers like Spas and Department store chains around the nation, which have arrangements with the Farm, where they place regular, automatic, bulk Orders. These Customers are treated as Priority Customers and they have Service Level Agreements to fulfill the Orders (Verify Payment, alert the Warehouse and Shipment team etc) within 30 minutes of placing the Order. If 30 minutes pass without an Order being "Fulfilled", then an alert is raised for further investigation.
We create 2 Streams to receive Events - one for Orders placed and another for Order Fulfillments. They receive a constant stream of Events, which have to be monitored for SLA failures. This can be accomplished by a simple Event Processing Query in StreamCruncher, using the Window constructs and the "Alert..Using..When.." Correlation clause. This is a classic Correlation example.
To fully understand all the features and syntax of the Query language and the procedure to set up Input and Output Event Streams and other operations, please consult the Documentation. Here's the skinny on the syntax: a Sliding Window is defined like this - "store last N", a Time based Window like this - "store last N milliseconds/seconds/minutes/hours/days" and a Tumbling Window like this - "store latest N". The Stream can be divided into small Partitions with Windows defined at specific levels, like this - "partition by country, state, city store last N seconds". This creates Time based Windows at the "Country > State > City" level.
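
To give a feel for how these constructs appear in a Query's "from" clause, here is a rough sketch (the Stream and column names are borrowed from the examples that follow; treat the grammar in the Documentation as authoritative):

cust_order (partition by store last 100) as order_events -- Sliding Window over the last 100 Events
cust_order (partition by store last 30 seconds) as order_events -- Time based Window
cust_order (partition by store latest 100) as order_events -- Tumbling Window
cust_order (partition by country, state, city store last 30 seconds) as order_events -- Time based Windows at the "Country > State > City" level
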
SLA Failure Alert:
select unful_order_id, unful_cust_id
from
alert
order_event.order_id as unful_order_id,
order_event.customer_id as unful_cust_id
using
cust_order (partition by store last 30 minutes
          where customer_id in
          (select customer_id from priority_customer))
         as order_event correlate on order_id,
fulfillment (partition by store last 30 minutes
           where customer_id in
           (select customer_id from priority_customer))
         as fulfillment_event correlate on order_id
when
present(order_event and not fulfillment_event);
Let's analyze the Query. There are 2 Tables - cust_order and fulfillment - into which the 2 Event streams flow. The Query looks like an SQL Join Query, although this syntax is more expressive, yet concise. The Alert clause is specifically for performing Event Correlation across multiple Streams. The other scenarios described further ahead use syntax that is only a minor extension of SQL, unlike the Alert clause. The Query keeps running against the 2 Streams, monitoring them and generating Output Events as SLA Failure Alerts. The "Alert..Using.." clause consumes Events from the 2 Streams. The "correlate on" clause is like the "join on" clause in SQL, which specifies the column/Event property to use to match Events from different Streams. The "When.." clause specifies the Pattern to look for. This Query is meant to output an Event when a Customer Order Event expires without a corresponding Fulfillment Event arriving within 30 minutes of the Order being placed, thus producing an SLA Breach Alert. Such Queries are also called Pattern Matching Queries, as they are configured to look for the presence or absence of specific combinations of Events.
Since the Farm has these contracts with their corporate consumers, they have to maintain a minimum number of items in stock in their Warehouses distributed around the nation, to avoid an SLA breach.
We have to set up a mechanism which constantly monitors the items in stock in the Warehouses. When an Order is placed, the products and their quantities are deducted from the Warehouse's stock. Products are supplied from different Warehouses based on the location closest to the Ship-to address. Warehouses are also stocked periodically, and those products and quantities are added to the Warehouse's stock. So, we receive 2 kinds of Events - Orders, which reduce the Stock and so carry a negative number for quantity, and Re-stocks, which increase the Stock.
We also use a regular Database Table to store the minimum stock that must be maintained for each product. When the stock for a product goes below the prescribed quantity, an alert should be raised.
Warehouse Re-Stock Alert:
select country, state, city, item_sku, sum_item_qty,
 stock_min_level
from warehouse (partition by country, state, city, item_sku
   store latest 500
   with pinned sum(item_qty) entrance only as sum_item_qty)
   as stock_level_events,
   stock_level_master
where stock_level_events.$row_status is new
    and stock_level_events.item_sku =
    stock_level_master.stock_item_sku
    and stock_level_events.sum_item_qty <
    stock_level_master.stock_min_level;
 
The "partition by country, state, city, item_sku" means that separate Windows are maintained based on the Event's Country, State, City and Product. An Aggregate function - "sum(item_qty)" is used to maintain the sum of the quantities of the products in the Window.
The Window is a Tumbling Window, where Events are consumed in one cycle and discarded in the next cycle. Their lifespan is just one cycle. Since the aggregate here is the Sum of the "item_qty" of all the Events currently in the Window, it changes even when Events expire. But we don't want the Sum to change when an Event expires - we just want to keep a running sum of all the items passing through the Window. The "entrance only" clause in the Aggregate function tells the Kernel to calculate the aggregate when an Event enters the Window and to ignore it when the Event is expelled from the Window. Since Order quantities are negative numbers and Re-stock quantities are positive, the aggregate automatically holds the total quantity in the Warehouse.
Whenever the Sum goes below the minimum value specified in the "stock_level_master" Table, an alert is raised.
The "pinned" clause is used to hold/pin up all the Aggregates even if all Events in the Window expire. Otherwise, for "Time based windows" and "Tumbling Windows", the Aggregate if declared, expires along with the Window when all Events in that Window expire. It gets re-created when new Events arrive.
The ability to join an Event Stream with a regular Table reduces the effort needed to maintain the Master data - minimum stock levels, adding/removing Products etc - because it is just a regular DB Table. And the Query's behaviour is driven by this Master Table (stock_level_master).
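
For illustration, the Master Table could be as simple as the sketch below (the column names match the Query above, but the actual DDL and data are up to the application); maintaining the minimum levels is then plain SQL:

-- Hypothetical DDL for the Master Table referenced by the Query.
create table stock_level_master (
    stock_item_sku   varchar(32) primary key,
    stock_min_level  integer not null
);

-- Ordinary SQL maintains the Master data; the Query itself never changes.
insert into stock_level_master (stock_item_sku, stock_min_level) values ('OAT-5KG', 200);
update stock_level_master set stock_min_level = 350 where stock_item_sku = 'OAT-5KG';
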
Orders are placed from all over the nation. To reduce the overhead of shipping smaller orders, the process is streamlined by aggregating orders being shipped to the same City, which are then taken care of by local distributors. This way, the Farm can avail itself of bulk-shipment offers.
Shipment directives are held for 30 minutes. If more than 5 such directives to the same City are received within those 30 minutes, they are dispatched in batches of 5 even before the 30 minutes expire, thus saving the overhead of shipping smaller orders individually.
Shipment Aggregator:
select country, state, city, item_sku, item_qty,
 order_time, order_id
from cust_order
(partition by country, state, city
store last 30 minutes max 5)
as order_events
where order_events.$row_status is dead;
This Query produces Output Events when an Event gets expelled from the Window, as indicated by the "$row_status is dead" clause - either after it has stayed in the Window for 30 minutes, or earlier, as part of an "Aggregated Order" when the Window fills up with 5 Order Events.
The Website itself has to be spruced up with some nifty features. To draw the attention of Online visitors, the top 5 best-selling items over the past 30 days are to be displayed. This is accomplished by a Query that uses a Chained Partition clause, where the results from one Partition are re-directed to another Partition, which slices and dices the intermediate data into the final form.
Best Selling Items:
select order_country, order_state, order_category,
 order_item_sku, order_total_qty
from cust_order
  (partition by order_country, order_state, order_category,
   order_item_sku store last 30 days
   with sum(order_quantity) as order_total_qty)
to
  (partition by order_country, order_state, order_category
   store highest 5 using order_total_qty
   with update group order_country, order_state, order_category,
   order_item_sku where $row_status is new) as order_events
where order_events.$row_status is not dead;
This Query uses a "Chained Partition", where 2 different Partition clauses are chained be means of the the "to" keyword and the results from the first Partition are fed to the second Partition, whose results form the input to the Query. Such chaining is required when a single Partition cannot accomplish the task. The first Partition stores a rolling sum of the total Quantity sold at the "Country > State > Category > SKU" level and those Sums are sent to the second Partition which uses the "..is new" clause to pick up only the newly calculated/changed Sums. The second Partition uses a "Highest Rows/Events" window, which is a slight variation of the Sliding Window to maintain an up-to-date list of the "top 5" selling items in each "Country > State > Category".
As can be seen from the examples above, Event Processing is quite versatile and can be used to implement many Use Cases concisely and efficiently. Event Processors are built to handle high data rates, which greatly simplifies development and reduces the maintenance cost of the applications that use them.

Friday, April 13, 2007

Finally!! StreamCruncher 1.12 is ready and it's no longer a Beta version. This is the Release Candidate.

(Also, performance test results - read further. Hint: 8,000 TPS!! on a 1.6 GHz Laptop)

I found the time to review some parts of the Kernel code. It turned out that there were small things here and there that needed fixing. Since the Kernel is heavily multi-threaded, it was important that locking be reduced. As a result, CAS (Compare-And-Set) operations (Java 1.5+) are now used in many places. This is much faster than actually waiting on a lock and then realising that the logic in the protected section did not have to be executed anyway.

After fixing these issues, I modified the "TimeWindowFPerfTest" class to capture more metrics. Apart from just calculating the Average Latency added to each Event by the Kernel in a Straight/Simple processing case, this Test now calculates the average total time it takes to insert rows into the Database and for the Kernel to publish them.

The Test was already described before. This time, with the bug fixes, there were no Index-violation exceptions. So, on my Laptop running Windows XP Home with 1 GB RAM and a single 1.6 GHz Intel Centrino Processor, I ran the "TimeWindowFPerfTest" performance test using the Sun JDK 1.6 and StreamCruncher 1.12 with the H2 Database.

I redirected the verbose Console output to a log file and thereby eliminated the otherwise excessive overhead added by the Console logging. This way, I also have proof of all the Tests that were performed.

The Test uses a Thread to generate and pump 'X' events in one shot without pausing. A Query with Time based Partition is defined on this Stream. The Window size is 5 seconds. A "$row_status is new" clause is used to output only the new Events that arrive at the Window and not the ones that exit the Window when their 5 seconds are over. This way, an accurate measurement of how much overhead the Kernel is imposing can be calculated. The total time taken for the entire batch to be inserted and for it to be pumped out of the Kernel can also be calculated. This can then be used to calculate the Transactions per second - the most important metric.
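
In essence, the Test Query would look something like the sketch below (the Stream and column names here are illustrative, not the actual Test code):

select event_id, event_value
from test_stream (partition by store last 5 seconds) as test_events
where test_events.$row_status is new;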

The Test pumps these 'X' events, waits long enough for the Events to clear the area, and then pumps the same number again... and again. At the end of the Test, the results are retrieved, verified and the Averages are calculated.

Ok, here it comes. Keep in mind that this is a single CPU and that the Event "pumper" and the Kernel are running in parallel. The H2 Database (current version) is completely single-Threaded - so there's no concurrency at all, even though StreamCruncher supports concurrent operations.

I ran 3 rounds for each configuration and here are the results:

Set 1 (4000 Events per Batch):

Set 1 - Round 1
Total events published: 36000. Each batch was of size:4000. Avg time to publish each event (Latency in Msecs): 224.0
Avg time (in Msecs) to insert 4000 Events into the DB: 418.0
Avg time (in Msecs) to process 4000 Events by the Kernel: 376.0
Avg time (in Msecs) for the insertion of first Event in the batch of 4000 Events into DB to publication of last Event in batch by Kernel: 598.0

Set 1 - Round 2
Total events published: 36000. Each batch was of size:4000. Avg time to publish each event (Latency in Msecs): 199.0
Avg time (in Msecs) to insert 4000 Events into the DB: 428.0
Avg time (in Msecs) to process 4000 Events by the Kernel: 397.0
Avg time (in Msecs) for the insertion of first Event in the batch of 4000 Events into DB to publication of last Event in batch by Kernel: 600.0

Set 1 - Round 3
Total events published: 36000. Each batch was of size:4000. Avg time to publish each event (Latency in Msecs): 261.0
Avg time (in Msecs) to insert 4000 Events into the DB: 387.0
Avg time (in Msecs) to process 4000 Events by the Kernel: 336.0
Avg time (in Msecs) for the insertion of first Event in the batch of 4000 Events into DB to publication of last Event in batch by Kernel: 591.0

Set 2 (8000 Events per Batch):

Set 2 - Round 1
Total events published: 64000. Each batch was of size:8000. Avg time to publish each event (Latency in Msecs): 378.0
Avg time (in Msecs) to insert 8000 Events into the DB: 603.0
Avg time (in Msecs) to process 8000 Events by the Kernel: 699.0
Avg time (in Msecs) for the insertion of first Event in the batch of 8000 Events into DB to publication of last Event in batch by Kernel: 1044.0

Set 2 - Round 2
Total events published: 64000. Each batch was of size:8000. Avg time to publish each event (Latency in Msecs): 457.0
Avg time (in Msecs) to insert 8000 Events into the DB: 533.0
Avg time (in Msecs) to process 8000 Events by the Kernel: 666.0
Avg time (in Msecs) for the insertion of first Event in the batch of 8000 Events into DB to publication of last Event in batch by Kernel: 1013.0

Set 2 - Round 3
Total events published: 64000. Each batch was of size:8000. Avg time to publish each event (Latency in Msecs): 392.0
Avg time (in Msecs) to insert 8000 Events into the DB: 593.0
Avg time (in Msecs) to process 8000 Events by the Kernel: 839.0
Avg time (in Msecs) for the insertion of first Event in the batch of 8000 Events into DB to publication of last Event in batch by Kernel: 1064.0

Set 3 (10,000 Events per Batch):

Set 3 - Round 1
Total events published: 70000. Each batch was of size:10000. Avg time to publish each event (Latency in Msecs): 491.0
Avg time (in Msecs) to insert 10000 Events into the DB: 705.0
Avg time (in Msecs) to process 10000 Events by the Kernel: 783.0
Avg time (in Msecs) for the insertion of first Event in the batch of 10000 Events into DB to publication of last Event in batch by Kernel: 1220.0

Set 3 - Round 2
Total events published: 70000. Each batch was of size:10000. Avg time to publish each event (Latency in Msecs): 518.0
Avg time (in Msecs) to insert 10000 Events into the DB: 689.0
Avg time (in Msecs) to process 10000 Events by the Kernel: 845.0
Avg time (in Msecs) for the insertion of first Event in the batch of 10000 Events into DB to publication of last Event in batch by Kernel: 1198.0

Set 3 - Round 3
Total events published: 70000. Each batch was of size:10000. Avg time to publish each event (Latency in Msecs): 513.0
Avg time (in Msecs) to insert 10000 Events into the DB: 647.0
Avg time (in Msecs) to process 10000 Events by the Kernel: 743.0
Avg time (in Msecs) for the insertion of first Event in the batch of 10000 Events into DB to publication of last Event in batch by Kernel: 1151.0

While the Tests were running, I kept noticing that the CPU usage, even for the 10K Set, was not rising above ~15%, and that only for 1-second periods - which is quite puzzling. It might be because the Producer and the Consumer Threads are not really running in parallel, since the one common resource - the Database - is always locked by one of these 2 (sets of) Threads. I was expecting the CPU to peak and the Tests to crumble at the 10K Set. But they didn't, which is a very good sign.

This means that StreamCruncher can do about 8,000 Transactions Per Second (Straight/Simple Processing) on a very ordinary setup - in the 8,000-Event Sets, the average time from the insertion of the first Event to the publication of the last was roughly 1 second - and it should perform considerably better on better hardware (more Cores and/or CPUs) and commercial Databases. This, combined with Horizontal Partitioning of the Stream data (using the Pre-Filters and multiple Queries to split the Events and process them in parallel), should produce fantastic performance.
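
As a rough sketch of what such Horizontal Partitioning might look like (hypothetical column values, borrowing the Pre-Filter style used inside the Partition clauses of the earlier examples), two Queries could each take a disjoint slice of the same Stream and run in parallel:

select order_id, country from cust_order
   (partition by store last 5 seconds where country in ('US', 'CA'))
   as slice_one where slice_one.$row_status is new;

select order_id, country from cust_order
   (partition by store last 5 seconds where country not in ('US', 'CA'))
   as slice_two where slice_two.$row_status is new;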

The test results/logs can be downloaded from here.

Monday, April 09, 2007

StreamCruncher 1.11 Beta is available. No changes to the code though - I had forgotten to update the Syntax diagram for 1.10 Beta, which introduced the "$diff" and other custom Provider changes.

Saturday, April 07, 2007

StreamCruncher 1.10 Beta is ready! This release includes:

1) $diff clause for In-built Aggregate Functions along with custom Baseline Provider feature. ClusterHealthTest demonstrates this new feature.

2) Custom Window size Provider should now be declared in the Query, statically. TimeWFPartitionWinSizeProviderTest demonstrates this change.

3) A simple class, StandAloneDemo, shows how to run a sample using just the Java main(..) method. sc_run_standalone.bat can be used to run the Demo.