Saturday, October 20, 2012

Coprocessor access to HBase internals

By Lars Hofhansl

Most folks familiar with HBase have heard of coprocessors.
Coprocessors come in two flavors: Observers and Endpoints.

An Observer is similar to a database trigger; an Endpoint can be likened to a stored procedure.
This analogy only goes so far, though.

While triggers and stored procedures are (typically) sandboxed and expressed in a high-level language (typically SQL with procedural extensions), coprocessors are written in Java and are designed to extend HBase directly (in the sense of avoiding subclassing the HRegionServer class in order to extend it). Code in a coprocessor will happily shut down a region server by calling System.exit(...)!

On the other hand, coprocessors are strangely limited. Before HBASE-6522 they had no access to a RegionServer's locks and leases, and hence it was impossible to implement check-and-set type operations as a coprocessor (because the row being modified would need to be locked), or to time out expensive server-side data structures (via leases).
HBASE-6522 makes some trivial changes to remedy that.

It was also hard to maintain any kind of shared state in coprocessors.
Keep in mind that region coprocessors are loaded per region and there might be hundreds of regions for a given region server.

Static members won't work reliably, because coprocessor classes are loaded by special classloaders.

HBASE-6505 fixes that too. Now the RegionCoprocessorEnvironment provides a getSharedData() method, which returns a ConcurrentMap. The coprocessor host holds this map only as a weak reference (in a special map with strongly referenced keys and weakly referenced values), while the environment that manages each coprocessor holds it strongly.
That way if the coprocessor is blacklisted (due to throwing an unexpected exception) the coprocessor's environment is removed, and any shared data is immediately available for garbage collection, thus avoiding ugly and error-prone reference counting (maybe this warrants a separate post).

This shared data is per coprocessor class and per regionserver. As long as there is at least one region observer or endpoint active this shared data is not garbage collected and can be accessed to share state between the remaining coprocessors of the same class.
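
A minimal sketch of how such shared state might be used, assuming the map returned by getSharedData() behaves like an ordinary ConcurrentMap<String, Object>. To keep it self-contained the example takes a plain map as a parameter instead of a real coprocessor environment; the class name, the key, and the counter are made up for illustration.

```java
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper. In a real coprocessor the map would come from
// RegionCoprocessorEnvironment.getSharedData(); here it is just a parameter.
class SharedCounter {
    static final String KEY = "requests"; // illustrative key name

    // Returns the one counter shared by all instances, creating it on first use.
    static AtomicLong counter(ConcurrentMap<String, Object> shared) {
        Object existing = shared.get(KEY);
        if (existing == null) {
            AtomicLong fresh = new AtomicLong();
            // putIfAbsent keeps creation race-free: only one caller's counter wins.
            existing = shared.putIfAbsent(KEY, fresh);
            if (existing == null) {
                existing = fresh;
            }
        }
        return (AtomicLong) existing;
    }
}
```

Every coprocessor instance of the same class that calls counter(...) with the same map ends up incrementing the same AtomicLong, no matter which region it was loaded for.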

These changes allow coprocessors to be used for a variety of use cases.
State can be shared across them, allowing coordination between many regions, for example for coordinated queries.
Row locks can be created and released - allowing for check-and-set type operations.
And leases can be used to safely expire expensive data structures or to time out locks among other uses.
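
To illustrate why a check-and-set operation needs a row lock, here is a rough in-memory sketch. It does not use the HBase lock API at all; it stands in plain Java locks for the region's row locks, and RowStore and its methods are hypothetical names.

```java
import java.util.Arrays;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;

// In-memory stand-in for a region: one value and one lock per row key.
class RowStore {
    private final ConcurrentMap<String, byte[]> rows = new ConcurrentHashMap<>();
    private final ConcurrentMap<String, ReentrantLock> locks = new ConcurrentHashMap<>();

    // Writes newValue (must be non-null) only if the row currently holds
    // expected; a null expected means "the row must not exist yet".
    boolean checkAndSet(String row, byte[] expected, byte[] newValue) {
        ReentrantLock lock = locks.computeIfAbsent(row, r -> new ReentrantLock());
        lock.lock(); // the check and the write must happen under the same lock
        try {
            if (!Arrays.equals(rows.get(row), expected)) {
                return false; // somebody else changed the row first
            }
            rows.put(row, newValue);
            return true;
        } finally {
            lock.unlock();
        }
    }
}
```

Without the lock another writer could slip in between the comparison and the write, which is exactly the situation a coprocessor could not guard against before it had access to row locks.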

Update:
I should also mention that RegionObservers already have access to a region's MVCC.

Sunday, October 14, 2012

Secondary Indexes (Part II)

By Lars Hofhansl

This post last week on secondary indexes caused a lot of discussion - which I find interesting, because this is (IMHO) one of the less interesting posts here.

It appears that I did not provide enough context to frame the discussion.

So... what do people mean when they say "secondary index"? It turns out, many things:
  • Reverse Lookup
  • Range Scans
  • Sorting
  • Index Covered Queries
  • etc
The idea outlined in last week's post is good for global Reverse Lookup, partially useful for Range Scans (as long as the range is reasonably small), not very useful for Sorting (all KVs would have to be brought back to the client), and entirely useless for Index Covered Queries (the index might not contain the whole truth).

I also was not clear about the consistency guarantees provided.
The "ragged edge" of current time is always a thorny issue in distributed systems. Without a centralized clock (or other generator of monotonically increasing ids) the notion of "at the same time" is at best a fuzzy one (see also Google's Spanner paper and its notion of current time being an interval bounded by clock precision, rather than a single point).

The idea of last week's indexing is consistent when the index is scanned as of a time in the past (as long as the time is far enough in the past to cover clock skew between servers - since all RegionServers should be running NTP, a few dozen milliseconds should be OK). Note you need my HBASE-4536 (0.94+) for this to be correct with deletes.

Queries for current results lead to interesting anomalies pointed out by Daniel (rephrased here):
  1. a client could read along the index
  2. it just passed index entry X
  3. another client changes the indexed column's value to Y and hence adds index entry Y
  4. the first client now checks back with the main row (whose latest version of the indexed column was changed, so X != Y and the timestamps won't match)
  5. the first client would conclude that the index entry resulted from a partial update, and hence ignore it (to be consistent it should return either the new or the old value)
That is mostly in line with HBase's other operations, with the difference that it won't return a value that did exist when the logical scan operation started.

If that is not good enough it can be addressed in two ways:
  1. Instead of retrieving the latest versions of the KVs of the main row, retrieve all versions. If there is a version of the indexed column that matches the timestamp of the index entry, then use the versions of all other columns with timestamps less than that of the next version of the indexed column (those are all the column values that existed before the index was updated to the new value). If there is no matching version of the indexed column, then ignore the index entry; if the matching version is the latest version, then use the latest versions of all columns.
  2. Have one extra roundtrip: first retrieve all versions of the indexed column(s).
    If no version of the indexed column matches the timestamp, ignore the index entry. If the matching version is the latest one, retrieve the rest of the main row at the same timestamp used to retrieve the indexed column (to guard against another client that could have added later versions in the meantime). If the matching version is not the latest one, retrieve the rest of the row as of the time of the next version (as in #1).
Whether #1 or #2 is more efficient depends on how many versions of all columns have to be retrieved vs. the extra roundtrip for just the indexed column(s).
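
The decision at the heart of both options can be sketched as a small function. This is only an illustration under my own assumptions: the timestamps of all versions of the indexed column have already been fetched into an array, and IndexMatch and readUpperBound are hypothetical names.

```java
import java.util.OptionalLong;

class IndexMatch {
    // versionTimestamps: timestamps of all versions of the indexed column (any order).
    // indexTs: timestamp of the index entry being checked.
    // Returns the exclusive upper bound for the timestamps of the other columns,
    // or empty if no version matches and the index entry should be ignored.
    // Long.MAX_VALUE means the match is the latest version: use the latest of everything.
    static OptionalLong readUpperBound(long[] versionTimestamps, long indexTs) {
        boolean matched = false;
        long next = Long.MAX_VALUE; // smallest timestamp greater than indexTs seen so far
        for (long ts : versionTimestamps) {
            if (ts == indexTs) {
                matched = true;
            } else if (ts > indexTs && ts < next) {
                next = ts;
            }
        }
        return matched ? OptionalLong.of(next) : OptionalLong.empty();
    }
}
```

For versions at timestamps 10, 20 and 30, an index entry at 20 yields a bound of 30, an entry at 30 yields Long.MAX_VALUE, and an entry at 25 yields an empty result.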

That way this should be correct, and makes use of the atomicity guarantees provided by HBase.

The last part that was not mentioned last week is how we should encode the values. All column values to be indexed have to be encoded in a way that lends itself to lexicographic sorting of bytes. This is easy for strings (ignoring locale-specific rules), but harder for integers and even more so for floating point numbers.
The excellent Lily library has code to perform this kind of encoding.
(Incidentally it is also a complete index solution using write ahead logging - into HBase itself - to provide idempotent index transactions).
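
For illustration, here is one common way to make integers and doubles sort correctly as unsigned bytes. This is a generic technique and not necessarily what Lily does; the class and method names are my own.

```java
import java.nio.ByteBuffer;

class SortableBytes {
    // Big-endian with the sign bit flipped, so negative ints sort before positive ones.
    static byte[] encodeInt(int v) {
        return ByteBuffer.allocate(4).putInt(v ^ Integer.MIN_VALUE).array();
    }

    // IEEE 754 bits: flip every bit of a negative number, only the sign bit otherwise.
    static byte[] encodeDouble(double d) {
        long bits = Double.doubleToLongBits(d);
        bits ^= (bits >> 63) | Long.MIN_VALUE;
        return ByteBuffer.allocate(8).putLong(bits).array();
    }

    // Unsigned lexicographic comparison, the way HBase orders row keys.
    static int compare(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int x = a[i] & 0xff;
            int y = b[i] & 0xff;
            if (x != y) {
                return x - y;
            }
        }
        return a.length - b.length;
    }
}
```

With this encoding the byte order of the encoded values matches the numeric order of the originals, so a range scan over the index returns values in numeric order.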

--
 
Lastly, it is also the simplest solution one can possibly get away with that requires no changes to HBase and no write ahead logging.

This all turned out to be a bit more complex than anticipated, so it should definitely be encapsulated in a client side library.

Sunday, October 7, 2012

Musings on Secondary Indexes

By Lars Hofhansl

There has been a lot of discussion about 2ndary indexes in HBase recently.

Since HBase is a distributed system there are many naive ways to do this, but it is not trivial to get it correct. HBase is a consistent key value store, so you would expect the same guarantees from your indexes.

The choices mostly boil down to two solutions:
  1. "Local" (Region level) indexes. These are good for low latency (everything is handled locally at a server). The trade-off is that it is impossible to ask globally where the row matching an index is located. Instead all regions have to be consulted.
  2. Global indexes. These can be used for global index queries, but involve multi server updates for index maintenance.
Here I will focus on the latter: Global Indexes.

It turns out a correct solution can be implemented with relative simplicity without any changes to HBase.
First off there are three important observations when talking about index updates:
  1. An index update is like a transaction
  2. An index update transaction is idempotent
  3. With a bit of discipline partial index transactions can be detected at runtime. I.e. the atomicity can be guaranteed at read-time.
Naively we could just write data to the "main" table and to a special index table.
The index table would be keyed by <columnValue>|<rowkey>. The <rowkey> is necessary to keep the index entries unique.
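
A small sketch of building such an index key from raw bytes. The class name is hypothetical, and it assumes that the '|' separator never occurs in the encoded column value; otherwise splitting the key back into its parts becomes ambiguous.

```java
class IndexKey {
    // Builds <columnValue>|<rowkey> as a single byte array.
    static byte[] of(byte[] columnValue, byte[] rowKey) {
        byte[] key = new byte[columnValue.length + 1 + rowKey.length];
        System.arraycopy(columnValue, 0, key, 0, columnValue.length);
        key[columnValue.length] = '|'; // separator between value and row key
        System.arraycopy(rowKey, 0, key, columnValue.length + 1, rowKey.length);
        return key;
    }
}
```

All entries for the same column value share a common prefix, so a prefix scan over the index table finds every row that holds that value.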

This simple approach breaks down in the face of failures. Say a client writes to these two tables, but fails before one of the changes could be applied. Now we may have the main table update but missed the index update or vice versa.
Notice that it is OK to have a stale index entry (a false index match) but not to have a stale main table entry.

So here is a solution for consistent secondary indexes in HBase:
  • Update: If using TTL, set up the main table to expire slightly before the index table(s).
  • For a Put, first retrieve a local timestamp, then apply all changes to the index table(s) using that timestamp. Then apply the changes to the main table, using the same timestamp.
    The timestamp can safely be created locally (with the same caveat that generally applies to HBase: Updates to the same KV in the same ms lead to more or less undefined results).
  • Upon delete, read back the row, then delete the main table entry, then remove all index entries (using the column values and timestamps read back from the row and using the same delete type - family, column, or version - that we used for the main table). Since we used the same timestamps this will be correct.
    Update:
    As Daniel Gómez Ferro points out, this in fact can only work for version deletes. I.e. only a specific version of a row can be deleted. In order to delete all versions of a column (a column delete marker) of a row or all versions of all columns (a family delete marker), all of these versions would need to be enumerated in the client and be deleted one by one.
Note that if there is a failure we might end up with an extra entry in the index, but we'll never have an entry in the main table and not in the index table.
  • Now upon reading along an index, we find the rows that the index entries point to. When we retrieve the rows we have to verify that a row we read has the expected timestamp for the index column. If not we simply ignore that row/index entry. It must be from a partial index update - either from a concurrent update that is still in progress, or from a partially failed update.
    This is how we assure atomicity at read-time; or in other words we can tolerate a slightly wrong index without affecting correctness.
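
The write order and the read-time check can be sketched with a toy in-memory model. This is not HBase client code: the two maps stand in for the main and index tables, only one indexed column and its latest version are modeled, deletes are left out, and all names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy stand-in for the main table and the index table.
class IndexedTable {
    // main table: rowKey -> timestamp of the latest version of the indexed column
    private final Map<String, Long> mainTs = new HashMap<>();
    // index table: "<columnValue>|<rowkey>" -> timestamp, kept in sorted key order
    private final TreeMap<String, Long> index = new TreeMap<>();

    void writeIndex(String rowKey, String value, long ts) {
        index.put(value + "|" + rowKey, ts);
    }

    void writeMain(String rowKey, long ts) {
        mainTs.put(rowKey, ts);
    }

    // Index first, then the main table, both with the same client-side timestamp.
    void put(String rowKey, String value, long ts) {
        writeIndex(rowKey, value, ts);
        writeMain(rowKey, ts);
    }

    // Reads along the index and keeps only rows whose main table timestamp matches.
    List<String> lookup(String value) {
        List<String> rows = new ArrayList<>();
        String prefix = value + "|";
        for (Map.Entry<String, Long> e : index.tailMap(prefix, true).entrySet()) {
            if (!e.getKey().startsWith(prefix)) {
                break; // left the range of entries for this value
            }
            String rowKey = e.getKey().substring(prefix.length());
            Long ts = mainTs.get(rowKey);
            if (ts != null && ts.equals(e.getValue())) {
                rows.add(rowKey); // timestamps agree: a complete index update
            }
        }
        return rows;
    }
}
```

Calling writeIndex without writeMain simulates a client that failed halfway through a Put: the extra index entry stays behind, but lookup skips it because the main table has no matching timestamp.
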
Lastly we need an M/R job that can build an index from an existing table. Because of the idempotent nature of index transactions - stemming from the fact that everything in HBase is timestamped - we can enable the index on a table (i.e. clients will immediately start to write to both tables) and then run the M/R job. As soon as the M/R job is finished the index is up to date and can be used (the M/R job would update some index metadata in the end to mark the index as available).

That's it. No server side code needed. No locking. And this will scale nicely. Just a little bit of discipline. Discipline that could easily be encapsulated in a client library.

Since partial failures will be rare, and we can filter those at read time, we need not clean the index up.

Because of the timestamps we can also still perform correct time range queries.