We just wrapped up the first of a two-part series on Mastering MapReduce together with Curt Monash. We’ve spent a lot of time discussing MapReduce with Curt and wanted to help educate the community on exactly what it is and how it applies to data management and analysis. We’ve published the recorded webcast, and below are the slides we presented from an Aster Data perspective, which outline:
- What is Aster Data’s SQL-MapReduce?
- Example industry applications of SQL-MapReduce
- Walking through the SQL-MapReduce syntax
This post was co-authored by John Cieslewicz, Eric Friedman, and Peter Pawlowski of Aster Data Systems.
One year ago we introduced SQL/MapReduce for the Aster nCluster database, which integrates MapReduce and SQL to enable deep analytics within the database. Pushing computation inside the database and close to the data is increasingly important as data sizes grow exponentially. As SQL/MR turns one year old, we are happy to announce that we will be presenting our SQL/MR innovations next week at the 35th International Conference on Very Large Data Bases (VLDB), the premier international forum for database research.
We developed SQL/MR because we saw a growing gap between the deep analytics and application needs of very large data and the capabilities provided by SQL and traditional relational-only data processing. We call this gap the “SQL Gap.”
SQL and the relational query processing model are well suited for many, but not all, data processing tasks. Some queries are cumbersome, non-intuitive, or impossible to express in SQL (note: now that SQL is Turing complete, nothing is strictly impossible, but it can be very painful and perform very badly) – check out our paper for some examples. Moreover, query optimizers have a limited number of algorithms at their disposal to process data, which leads to convoluted data processing in situations where applying a little domain knowledge can yield a much more straightforward algorithm.
We found traditional user-defined functions (UDFs) to fall short in bridging this gap between SQL and the answers to challenging analytic problems that need to be solved. UDFs are often user-unfriendly, inflexible, and not easily parallelized. SQL/MR functions, in contrast, are designed to be easy to develop, easy to install, and easy to use – providing developers and analysts with a powerful tool to tackle the challenges posed by very large data.
To do this, we integrated the MapReduce programming model with SQL. MapReduce is a well known paradigm for parallel, fault-tolerant data processing that allows developers to write procedural code that is then applied to data in parallel. Pure MapReduce, however, misses out on aspects of SQL and relational data processing that are great – such as query optimizations, managed data, and transactions. By integrating SQL and MapReduce on top of Aster nCluster’s hardware management and fault tolerance, we leverage the strengths of each, resulting in a system that is much more powerful than either in isolation.
SQL/MapReduce At VLDB
SQL/MR is much more than a user-defined function. As our paper title states, a SQL/MR function is self-describing, polymorphic, and parallelizable. Let’s explore each of these characteristics and see why we hope researchers at VLDB will be as excited as we are by SQL/MR.
Self-Describing and Polymorphic
The behavior and output characteristics of a SQL/MR function are determined dynamically at query-time instead of statically at install-time, as is the case with traditional user-defined functions. These characteristics allow SQL/MR functions to behave much more like general-purpose library functions than single-use, application-specific user-defined functions. When a SQL/MR function is used in a query, the nCluster query planner negotiates a contract with the SQL/MR function, providing the function’s input schema and optional user-supplied parameters. In return, the SQL/MR function agrees to a contract that specifies its output schema for the duration of the query. This contract negotiation is what makes SQL/MR functions self-describing, and their ability to be invoked on different input with different optional parameters makes them polymorphic as well. To summarize, by invoking a SQL/MR function on different input, with different optional parameters, the SQL/MR function may export a different output schema and perform different computation – the possibilities are entirely up to the developer!
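The contract negotiation described above can be sketched in ordinary Java. Everything here – the interface, method, and parameter names – is hypothetical, invented purely for illustration; Aster’s actual planner API is not shown in this post:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of query-time "contract negotiation": given the input
// schema and optional user parameters, the function reports the output schema
// it will produce for the duration of this query.
interface PolymorphicFunction {
    List<String> negotiateOutputSchema(List<String> inputSchema,
                                       Map<String, String> parameters);
}

// A toy polymorphic function: it passes its input columns through and appends
// one output column whose name is chosen by an optional user parameter.
class TokenizeFunction implements PolymorphicFunction {
    @Override
    public List<String> negotiateOutputSchema(List<String> inputSchema,
                                              Map<String, String> parameters) {
        String tokenColumn = parameters.getOrDefault("token_column", "token");
        List<String> out = new ArrayList<>(inputSchema);
        out.add(tokenColumn);
        return out;
    }
}
```

Because the output schema is computed from the input schema and parameters at query time, the same function can be reused over tables it has never seen before – which is the heart of the polymorphism claim.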
The SQL/MR programming model, like that of MapReduce, is inherently parallelizable. Developers write procedural code in the language of their choice, but at runtime that code will run in parallel across hundreds of nodes within an nCluster database. Advanced analytics and application code can now be executed in parallel, directly on data stored within nCluster, making nCluster an application-friendly, high-performance data warehouse and data application system. Advanced analytics capabilities that we have already pushed inside nCluster using SQL/MR include click-stream sessionization, general-purpose time series path matching, dynamic and massively parallel data loading from heterogeneous sources, and genetic sequence analysis.
Bridging the Gap
SQL/MapReduce has matured greatly over the past year and is used by our customers in creative ways we never imagined – proof positive that SQL/MapReduce is an effective way to bridge the “SQL Gap” to deeper analytics on very large data. Check out the other application examples and sample code videos on www.asterdata.com, as well as the MapReduce category of this blog.
We’d love to hear from you about any type of analysis you’re doing where stand-alone SQL is becoming overly-complex … and please look us up at the VLDB 2009 show if you’re making the trip to Lyon, France!
(Apart from the authors, Brent Chun, Mohit Aron, Abhishek Marwah, Raghu Venkat, Vinay Bondhugula, and Prasan Roy of Aster Data Systems are notable contributors to the overall SQL/MR effort.)
Aster’s SQL/MR framework (In-Database MapReduce) enables our users to write custom analytic functions (SQL/MR functions) in a programming language like Java or Python, install them in the cluster, and then invoke them from SQL to analyze data stored in nCluster database tables. These SQL/MR functions transform one table into another, but do so in a massively parallel way. As increasingly valuable analytic functions are pushed into the database, the value of constructing a data structure once, and reusing it across a large number of rows, increases substantially. Our API was designed with this in mind.
What does the SQL/MR API look like? The SQL/MR function is given an iterator over a set of input rows, as well as an emitter for outputting rows. We decided on this interface for a number of reasons, one of the most important being the ability to maintain state between rows. We’ve found that many useful analytic functions need to construct some state before processing a row of input, and this state construction should be amortized over as many rows as possible.
Here’s a wireframe of one type of SQL/MR function (a RowFunction):
class RealAsterFunction implements RowFunction {
    public void operateOnSomeRows(RowIterator iterator, RowEmitter outputEmitter) {
        // Construct some data structure once to enable fast processing.
        // Then read each row from the iterator, process it, and emit a result.
    }
}
When this SQL/MR function is invoked in nCluster, the system starts several copies of this function on each node (think: one per CPU core). Each function is given an iterator to the rows that live in its local slice of the data. An alternative design, which is akin to the standard scalar UDF, would have been:
class NotRealAsterFunction implements PossibleRowFunction {
    static void operateOnRow(Row currentRow, RowEmitter outputEmitter) {
        // Process the given row and emit a result.
    }
}
In this design, the static operateOnRow method would be called for each row in the function’s input. State can no longer be easily stored between rows. For simple functions, like computing the absolute value or a substring of a particular column, there’s no need for such inter-row state. But, as we’ve implemented more interesting analytic functions, we’ve found that enabling the storage of such state, or more specifically paying only once for the construction of something complex and then reusing it, has real value. Without the ability to save state between rows, the construction of this state would dominate the function’s execution.
Examples abound. Consider a SQL/MR function which applies a complex model to score the data in the database, whether it’s scoring a customer for insurance risk, scoring an internet user for an ad’s effectiveness, or scoring a snippet of text for its sentiment. These functions often construct a data structure in memory to accelerate scoring, which works very well with the SQL/MR API: build the data structure once and reuse it across a large number of rows.
A sentiment analysis SQL/MR function, designed to classify a set of notes written up by customer service reps or a set of comments posted on a blog, would likely first build a hash table mapping words to sentiment scores, based on some dictionary file. The function would then iterate through each snippet of text, converting each word to its stem and doing a fast lookup in the hash table. Such a persistent data structure accelerates the sentiment scoring of each text snippet.
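As a rough sketch of that pattern, consider the toy scorer below. The four-word dictionary and the one-line stemmer are stand-ins invented for illustration – not Aster’s actual sentiment function – but the shape is the point: the hash table is built once, then reused for every row:

```java
import java.util.HashMap;
import java.util.Map;

// Toy sentiment scorer illustrating "build state once, reuse per row".
class SentimentScorer {
    private final Map<String, Integer> sentiment = new HashMap<>();

    // Build the word -> score hash table once, before any rows are processed.
    // A real function would load this from a dictionary file.
    SentimentScorer() {
        sentiment.put("great", 2);
        sentiment.put("good", 1);
        sentiment.put("bad", -1);
        sentiment.put("terrible", -2);
    }

    // Crude stemmer stand-in: lower-case and strip a trailing "s".
    private static String stem(String word) {
        String w = word.toLowerCase();
        return w.endsWith("s") ? w.substring(0, w.length() - 1) : w;
    }

    // Called once per row (text snippet); reuses the dictionary built above.
    int score(String snippet) {
        int total = 0;
        for (String word : snippet.split("\\W+")) {
            total += sentiment.getOrDefault(stem(word), 0);
        }
        return total;
    }
}
```

In the SQL/MR API, the constructor work would happen once inside `operateOnSomeRows` before the row loop, rather than once per row as a scalar UDF would force.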
Another example is Aster’s nPath SQL/MR function. At a high level, this function looks for patterns in ordered data, with the pattern specified with a regular expression. When nPath runs, it converts the pattern into a data structure optimized for fast, constant-memory pattern matching. If state couldn’t be maintained between rows, there’d be a large price to reconstructing this data structure on each new row.
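The same pay-once principle can be illustrated with Java’s standard regex library: compile the pattern into its optimized matching structure once, then reuse it across every input. nPath’s internal matcher is not shown in this post, so this is only an analogy, with rows stood in by pre-joined strings of page symbols:

```java
import java.util.List;
import java.util.regex.Pattern;

// Illustrates nPath's "compile the pattern once" idea with java.util.regex.
class PathMatcher {
    private final Pattern compiled;

    // Pattern.compile builds the optimized matching structure exactly once.
    PathMatcher(String regex) {
        this.compiled = Pattern.compile(regex);
    }

    // Each input sequence reuses the compiled pattern instead of paying the
    // compilation cost again per row.
    long countMatches(List<String> sequences) {
        return sequences.stream()
                        .filter(s -> compiled.matcher(s).matches())
                        .count();
    }
}
```

If `Pattern.compile` were instead called inside the per-row loop, compilation cost would dominate once the row count grows large – exactly the failure mode the SQL/MR API avoids.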
Repeating the high bit: as increasingly valuable analytic functions are pushed into the database, the value of constructing a data structure once, and reusing it across a large number of rows, increases substantially. The SQL/MR API was designed with this in mind.
When we announced Aster nCluster’s In-Database MapReduce feature last year, many people were intrigued by the new analytics they would be able to do in their database. However, In-Database MapReduce is new and often loaded with a lot of technical discussion on how it’s different from PL/SQL or UDFs, whether it’s suitable for business analysts or developers, and more. What people really want to know is how businesses can take advantage of MapReduce.
I’ve referred to how our customers use In-Database MapReduce (and nPath) for click-stream analytics. In our “MapReduce for Data Warehousing and Analytics” webinar last week, Anand Rajaraman covered several other example applications. Rajaraman is CEO and Founder of Kosmix and Consulting Assistant Professor in the Computer Science Department at Stanford University (full disclosure: Anand is also on the Aster board of directors). After spending some time discussing graph analysis, i.e., finding the shortest path between items, Rajaraman discusses applications in finance, behavioral analytics, text, and statistical analysis that can be easily completed with In-Database MapReduce but are difficult or impossible with SQL alone.
As Rajaraman says, “We need to think beyond conventional relational databases. We need to move on to MapReduce. And the best way of doing that is to combine MapReduce with SQL.”
Our goal at Aster is to build a product that will answer your analytical questions sooner. Sooner doesn’t just mean faster database performance – it means faster answers from the moment you conceive of the question to the moment you get the answer. This means allowing analysts and end-users to easily ask the questions on their mind.
Aster nCluster, our massively-parallel database, has supported SQL from birth. SQL is great in many respects: it allows people of various levels of technical proficiency to ask lots of interesting questions in a relatively straightforward way. SQL’s easy to learn but powerful enough to ask the right questions.
But we’ve realized that in many situations SQL just doesn’t cut it. If you want to sessionize your web clicks, find interesting user paths, run a custom fraud classifier, or tokenize and stem words across documents, you’re out of luck. Enter SQL/MR, one part of our vision of what a 21st-century database system should look like.
Let’s say your data is in nCluster. If your analytic question can be answered using SQL, you don’t have to worry about writing Java or Python. But as soon as something more complicated comes up, you can write a SQL/MR function against our simple API, upload it into the cluster, and have it start operating on your data by invoking it from SQL. How is this related to MapReduce? It turns out that these functions are sufficient to express a full MapReduce dataflow. How are SQL/MR functions different from the UDFs of yore? It’s all about scale, usability, and reusability – all three contributing to you getting your answer sooner.
Scalability
SQL/MR functions play in a massively-parallel sandbox, one with terabytes and terabytes of data, so they’re designed to be readily parallelized. Yes, they accept a table as input and produce a table as output, but they do so in a distributed way at huge scale. They can take as input either rows (think “map”) or well-defined partitions (think “reduce”), which allows nCluster to move data and/or computation around to make sure that the right data is on the right node at the right time. SQL/MR functions are table functions breaking out of the single-node straitjacket. This means you can analyze lots of data fast.
Usability
We want to make sure that developers using our SQL/MR framework spend their time thinking about the analytics, not dealing with infrastructure issues. We have a straightforward API (think: you get a stream of rows and give us back a stream of rows) and a debugging interface that lets you monitor execution of your function across our cluster. Want to write and run a function? One command installs the function, and a single SQL statement invokes it. The data you provide the function is defined in SQL, and the output can be sliced and diced with more SQL – no digging into Java if you want to change a projection, provide the function a different slice of data, or add a sort onto the output. All this allows a developer to get a working function – sooner – and an analyst to tweak the question more readily.
Reusability
We’ve gone to great lengths to make sure that a SQL/MR function, once written, can be leveraged far and wide. As mentioned before, SQL/MR functions are invoked from SQL, which means they can be used by analysts who don’t know anything about Java. They also accept “argument clauses” – custom parameters that integrate nicely with SQL. And our functions are polymorphic: their output schema is dynamically determined by their input, so the same function can be used in a variety of contexts. Any number of people can write functions that you can easily reuse over your data, allowing users to ask their questions faster (since someone has probably asked a similar question in the past).
In fact, we’ve leveraged the SQL/MR framework to build a function that ships with nCluster: nPath. But this is just the first step, and the sky’s the limit. SQL/MR could enable functions for market basket analysis, k-means clustering, support vector machines, natural language processing, and more.
How soon will your questions be answered? I’d love to hear of any ideas you have for analytic functions you’re struggling to write in SQL that you think could be a good fit for SQL/MapReduce.
As a follow-on to the introductory nPath post, I wanted to share a little more depth on the nPath SQL syntax and a more sophisticated example which can be applied in click-stream or Web analytics. I’ll try to keep it concise for my colleagues who don’t want the pretty marketing bow.
SEO and SEM are critical traffic drivers for just about any consumer-facing website. Third-party analytics offerings such as Google Analytics or Omniture provide a great turn-key package of canned reports. However, certain deep analytics on sequential events are simply out of reach – not only of these outsourced analytics services, but also of in-house Web analytics data warehouses implemented on traditional solutions such as Oracle or SQL Server.
For example, suppose we are interested in optimizing our website flow in order to retain and engage visitors driven to us by SEO/SEM. We want to answer the question: for SEO/SEM-driven traffic that stays on our site for only five or fewer pageviews and then leaves, never returning in the same session, what are the top referring search queries and the top paths of navigated pages on our site? In traditional data warehouse solutions, this problem would require a five-way self-join of granular weblog data, which is simply infeasible for large sites such as Myspace.
With the Aster nPath SQL/MR function, this problem can be expressed in a straightforward query that is executed in a very efficient manner in just a single pass over the granular data. The query below returns the top combinations of referral query string (of the entry page of the visit to our site) and on-site navigation path of up to 5 pages before leaving the site:
SELECT entry_refquerystring,
       entry_page || ',' || onsite_pagepath AS onsite_pagepath,
       count(*) AS session_count
FROM nPath(
    ON ( SELECT * FROM clicks WHERE year = 2009 )
    PARTITION BY customerid, sessionid
    ORDER BY timestamp
    MODE ( NONOVERLAPPING )
    PATTERN ( 'Entry.OnSite+.OffSite+$' )
    SYMBOLS (
        domain ILIKE 'mysite.com' AND refdomain ~* 'yahoo.com|google.com|msn.com|live.com' AS Entry,
        domain ILIKE 'mysite.com' AS OnSite,
        domain NOT ILIKE 'mysite.com' AS OffSite
    )
    RESULT (
        first(page OF Entry) AS entry_page,
        first(refquerystring OF Entry) AS entry_refquerystring,
        accumulate(page OF OnSite) AS onsite_pagepath,
        count(* OF OnSite) AS onsitecount_minus1
    )
)
WHERE onsitecount_minus1 < 4
GROUP BY 1,2
ORDER BY 3 DESC;
It may not sound sexy, but analyzing a sequence of events over time is non-trivial in standard SQL. When Aster started providing customers ways to express time-series analysis more elegantly and have it return result sets in 90 seconds as opposed to 90 minutes, we started getting comments like, “That’s beautiful … I would make out with my screen if I could!”.
Crazy, I know.
Time-series, or sequential pattern analysis, is useful in a number of industries and applications such as:
- Price changes over time in financial market data
- Path analysis within click-stream data to define anything from top referring sites to the “golden” paths customers navigate before purchasing
- Patterns which detect deviant activity such as spamming or insurance claims fraud
- Sessionization (mapping each event in a clickstream to a human user session)
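As a sketch of the last item, here is a minimal sessionizer under the usual assumption of a fixed inactivity timeout. This is illustrative only, not Aster’s shipped sessionization function; in SQL/MR terms it would run once per PARTITION (one user), over rows ordered by timestamp:

```java
import java.util.ArrayList;
import java.util.List;

// Toy sessionization sketch: clicks for one user, ordered by time, are
// assigned a session id that increments whenever the gap between consecutive
// clicks exceeds a timeout (e.g., 30 minutes of inactivity).
class Sessionizer {
    static List<Integer> sessionize(List<Long> timestamps, long timeoutMillis) {
        List<Integer> sessionIds = new ArrayList<>();
        int session = 0;
        for (int i = 0; i < timestamps.size(); i++) {
            // Start a new session when the inter-click gap exceeds the timeout.
            if (i > 0 && timestamps.get(i) - timestamps.get(i - 1) > timeoutMillis) {
                session++;
            }
            sessionIds.add(session);
        }
        return sessionIds;
    }
}
```

Note that the logic depends on comparing each row to the previous one – exactly the kind of inter-row state that a one-row-at-a-time scalar UDF cannot easily express, but that the iterator-based SQL/MR API handles naturally.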
More specifically, one customer wanted to improve the effectiveness of their advertising and drive more visitors to their site. They asked us to help determine the top paths people take to get to their site and top paths people take after leaving the site. Knowing the “before” path gave them insight into what sites to place advertisements on to drive traffic to their site. Additionally, pathing helps the customer understand behavior/preferences of users who visit their site (e.g., if espn.com is a top site that is in the path of the 5 pages leading up to the customer site, they know that many visitors like sports).
However, discovering relationships between rows of data is difficult to express in SQL, which must resort to multiple self-joins of the data. These joins dramatically expand the amount of data involved in the query and slow down query performance – not to mention the complexity of developing and parsing these expressions.
What’s the solution? There are a few, but Aster’s approach has been to develop extensions to SQL that execute in-database, in a single pass over the data, in a massively parallel fashion. One such extension is nPath, a SQL-MapReduce (SQL/MR) function that performs regular-expression pattern matching over a sequence of rows. It allows users to:
- Specify any pattern in an ordered collection – a sequence – of rows with symbols;
- Specify additional conditions on the rows matching these symbols; and
- Extract useful information from these row sequences.
I’ll share the syntax of nPath here to give you more context of how the query operates:
FROM nPath(
    ON <input table or query>
    PARTITION BY <partitioning columns>
    ORDER BY <ordering columns>
    MODE ( OVERLAPPING | NONOVERLAPPING )
    PATTERN ( '<pattern of symbols, as a regular expression>' )
    SYMBOLS ( <predicate> AS <symbol>, ... )
    RESULT ( <aggregate>(<column> OF <symbol>) AS <output column>, ... )
)
nPath performs pattern matching and computes aggregates. The results of this computation are the output rows of nPath. The rows from nPath can subsequently be used like any other table in a SQL query: rows from nPath may be filtered with the WHERE clause, joined to rows from other tables, grouped with the GROUP BY clause, and so on.
The result? Incredibly powerful insight into a series of events that indicates a pattern or segment can be expressed in SQL, run in parallel on massive clusters of compute power in an extremely efficient manner via a single pass over the data, and made accessible to business analysts through traditional BI tools.
What do you think? What other methods are people using to tackle these types of problems?