A Third Choice
I’m working on a project right now that uses Cassandra as one of its databases, and I need to determine how I’m going to bucket my data. Unfortunately, the most common bucketing solutions have significant limitations in my use case.
I’d like to propose a new way to bucket data that uses a probabilistic method to get an approximate count for when a new bucket should be created.
Quick Cassandra Bucketing Background
If you already understand how Cassandra partitioning and bucketing work, feel free to skip to the next section. For everyone else, here’s a quick high-level overview to set the stage.
Cassandra breaks the traditional database model by structuring data in a way that is tailored to a particular query (or narrow set of queries). This requires determining ahead of time how your data will be accessed, but it rewards the effort with massive gains in throughput for that narrow set of queries.1
As part of the setup, every Cassandra table needs to define a column, or columns, that will be used to determine which “partition” each row of data belongs to. Cassandra will ensure all data in the same partition is stored on the same nodes, so queries for information in a single partition can be accomplished quickly. Cassandra will also automatically distribute the partitions across its nodes pseudo-evenly with a hashing algorithm for the sake of load balancing. When set up properly, read queries should only be pulling information from a single partition. Queries that cross partitions will be significantly slower than those that come from a single partition.2
Getting the partitions “right-sized” is important. Performance degrades if the partitions get too large. Cassandra can officially hold 2 billion cells (rows × columns) per partition, but the commonly cited practical limit is around 100MB of data per partition. Very small partitions can also be a problem by causing additional overhead in the hashing/node lookup process, so it’s typically not a good idea to set up your data in a way that only has a few dozen (or fewer) rows per partition. However, it’s not as much of a performance concern as oversized partitions.
The limit on the minimum size for a partition is typically something more practical. Since we want to make sure our read queries are only hitting a single partition whenever possible, all of the data that is queried together needs to be partitioned together. In most cases, this is the requirement that forces partition sizes to increase. This can be a problem when you have an indefinite stream of data going into a single table. For example, suppose you are logging data from a group of sensors. You could partition out the data for each sensor (perhaps by a sensor ID column), but eventually, the stream of information from even a single sensor is going to exceed the 100MB soft limit.
This is where bucketing comes in. Bucketing is where you add another column whose sole purpose is to break up the partitions. Bucketing strategies generally fall into one of two categories: time-based and count-based. Both have their own advantages and disadvantages.
Time-based is the simplest solution and works by using the extra bucketing column to break up the partitions by time. Using the sensor example from above, you could have a bucket column that is just a numerical representation of the month and year the data was logged (e.g. “202602”). This would automatically create a new partition for the sensor data every month when the bucket value changes.3 This can easily be tuned based on the expected rate of data (e.g. daily, yearly, every 6 hours, etc).
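As a minimal sketch of the time-based approach, the bucket value can be derived directly from the row's timestamp. The function names and the schema in the comment are hypothetical and only illustrate the sensor example.

```python
from datetime import datetime, timezone

# Hypothetical CQL schema for the sensor example (names are assumptions):
#   CREATE TABLE sensor_data (
#       sensor_id text, bucket int, logged_at timestamp, reading double,
#       PRIMARY KEY ((sensor_id, bucket), logged_at)
#   );
# The partition key is (sensor_id, bucket), so a new bucket value
# means a new partition for the same sensor.


def monthly_bucket(ts: datetime) -> int:
    """Year and month packed into one integer, e.g. 202602."""
    return ts.year * 100 + ts.month


def hourly_bucket(ts: datetime) -> int:
    """Finer-grained variant for sensors with a high data rate."""
    return int(ts.strftime("%Y%m%d%H"))


logged_at = datetime(2026, 2, 14, 9, 30, tzinfo=timezone.utc)
print(monthly_bucket(logged_at))  # 202602
print(hourly_bucket(logged_at))   # 2026021409
```

Switching between the two functions is the "tuning" knob: the partition key stays the same, and only the granularity of the bucket value changes.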
Time-based approaches have a major drawback though. What they gain in simplicity, they lose in adaptability. What if the rate of data from your sensor is highly variable? What if you sometimes get 50MB in an hour, but other times it takes a month or longer to get that much information? In this case, you’re basically forced to use hourly bucketing because the drawbacks of oversized partitions are greater than the drawbacks of undersized partitions, but there is still going to be a weakness: future queries for data from that sensor may need to iterate through dozens of partitions to find enough data. This is where count-based strategies come in.
In count-based bucketing strategies, we fix this problem by simply counting the number of rows that are added to the database and switching to a new bucket exactly when the partition is right-sized. In the sensor example above, let’s assume the average sensor entry is 1KB and we want to target a 10MB partition size. That means we want 10,000 entries in each partition, so we’ll just take the entry count, divide by 10,000, round down, and that will be the entry in our bucket column. Easy-peasy right-sized buckets! So what’s the problem here?
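The arithmetic is small enough to show in a few lines of Python. This sketch assumes entries are numbered from 0 and that the running count is already available from somewhere, which is exactly the expensive part.

```python
ROWS_PER_BUCKET = 10_000  # 10MB target / 1KB average entry


def count_bucket(entry_count: int) -> int:
    """Bucket value for the entry with the given zero-based count."""
    return entry_count // ROWS_PER_BUCKET


print(count_bucket(9_999))   # 0, last entry of the first bucket
print(count_bucket(10_000))  # 1, first entry of the second bucket
```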
The problem is that keeping track of the entry count comes with its own overhead. Cassandra doesn’t inherently keep track of the number of rows in each partition, and doing so in an additional database can potentially create an additional workload that is orders of magnitude larger than the main data load itself. Depending on how your data is distributed, and how critical it is that every database user gets the exact same information, it may also be necessary to implement some kind of locking mechanism to ensure that only one user can be inserting data at a time.
There are times when this is still a viable strategy, particularly when you already create an incrementing item number for some other purpose (e.g. an order number), but there are plenty of other times where using a count-based strategy entirely negates the throughput benefits you may have hoped to get from using Cassandra in the first place.
In this blog post, I suggest a third strategy.
The Idea of Probabilistic Bucketing
Time-based and count-based bucketing solutions are perfectly acceptable in many cases, but there is one situation where both of them fall short: when the rate of information flow is highly variable, highly unpredictable, and too large for efficient counting. Such situations are fairly common.
As a specific example, Discord used Cassandra (and now uses ScyllaDB, a Cassandra clone) in exactly this kind of situation. There is no way for Discord to accurately predict the flow of messages into each chat server. Some servers are going to be orders of magnitude more active than others. Some servers are going to have significant spikes in activity at times that are unpredictable (for Discord at a large scale). Their solution was to tune a time-based solution for the most active servers, and accept the performance cost in less active servers (since, ya know, they’re less active anyway).
I’m facing a very similar challenge, but I wanted to avoid the problem of tiny/empty partitions.
I decided to use a modified count-based solution where I only occasionally increment the count based on a random factor.
For example, in my case, the item ID numbers that I’m already generating end with several bytes of random data. I can just look at 9 bits of random data and increment the count if those bits are all 1s. This means I’ll be incrementing the counter an average of 1 time for every 512 inserts.
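A minimal sketch of that bit check in Python, assuming the ID is a byte string whose trailing bytes are random. The 16-byte layout and the function name are hypothetical and are not taken from the actual project.

```python
import secrets

RANDOM_BITS = 9
MASK = (1 << RANDOM_BITS) - 1  # 0b111111111 == 511


def should_increment(item_id: bytes) -> bool:
    """True when the lowest 9 bits of the ID are all 1s (a 1-in-512 event)."""
    tail = int.from_bytes(item_id[-2:], "big")  # last 2 bytes cover 9 bits
    return (tail & MASK) == MASK


item_id = secrets.token_bytes(16)  # stand-in for a real item ID
print(should_increment(item_id))
```

Because the check reuses randomness that is already in the ID, every process that sees the same ID reaches the same decision without any extra coordination.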
I can then increment the bucket when the count reaches 40. This gives me about a 1 in 1 quadrillion chance that I will have either fewer than 4,200 or more than 58,000 rows in the partition when the bucket increments. Both of those extremes are acceptable outcomes. In the more common cases, it’s around 1 in 1,000 that there will be fewer than 12,000 rows or more than 32,000 rows in a given partition. That’s a pretty good range!
The numbers can also be tuned for different desired outcomes. Increasing the likelihood of incrementing the counter (e.g. looking at fewer bits of random data) will improve the ability to target narrower partition size ranges, but will increase the load on the row counter. Changing the target count will, obviously, increase and decrease the upper and lower range of likely partition sizes. 1/512 and 40 work well for me.4
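For anyone who would rather tune these numbers in code than in a spreadsheet, here is a rough Python equivalent of the BINOM.DIST formula from the footnote. It uses only the standard library; the function names are my own, and the off-by-one question of whether the triggering row lands in the old or new bucket is ignored.

```python
from math import exp, lgamma, log, log1p


def binom_cdf(k: int, n: int, p: float) -> float:
    """P(X <= k) for X ~ Binomial(n, p), summed in log space to avoid overflow."""
    total = 0.0
    for i in range(k + 1):
        log_pmf = (lgamma(n + 1) - lgamma(i + 1) - lgamma(n - i + 1)
                   + i * log(p) + (n - i) * log1p(-p))
        total += exp(log_pmf)
    return total


def prob_more_rows_than(rows: int, count: int = 40, odds: float = 1 / 512) -> float:
    """Chance the counter is still below `count` after `rows` inserts."""
    return binom_cdf(count - 1, rows, odds)


def prob_fewer_rows_than(rows: int, count: int = 40, odds: float = 1 / 512) -> float:
    """Chance the counter reached `count` before `rows` inserts were made."""
    return 1.0 - prob_more_rows_than(rows - 1, count, odds)


print(f"more than 20,000 rows:  {prob_more_rows_than(20_000):.2%}")
print(f"more than 32,000 rows:  {prob_more_rows_than(32_000):.4%}")
print(f"fewer than 12,000 rows: {prob_fewer_rows_than(12_000):.4%}")
```

Changing `count` or `odds` and rerunning gives a quick feel for how the likely partition size range moves.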
It’s essentially still a count-based bucketing strategy, but the counter is incremented substantially fewer times.
A quick summary of options:
Time-Based
- Pros
  - Simple
  - No counting overhead
- Cons
  - Prone to tiny and/or empty buckets
- Best For
  - Situations with consistent data flow
Count-Based
- Pros
  - Precise bucket sizing
  - Predictable partition transitions
- Cons
  - Significant counting overhead
- Best For
  - Situations where the count is tracked anyway
  - Situations where bucket sizing is critical and the overhead is worth it (although I’d consider if Cassandra is really the right DB in this case)
Probabilistic-Based
- Pros
  - No tiny/empty buckets
  - Massive reduction in counting overhead
- Cons
  - Less control over bucket sizing
  - More complex to coordinate in distributed environments
- Best For
  - Situations with unpredictable and highly-variable data flows
Probabilistic Bucketing Implementation
In a situation where only one process is ever accessing the data, implementing the counter is fairly simple. Some pseudocode:
CHANGE_BUCKET_COUNT = 40
DO_INCREMENT_NUM = 511  // 9 random bits, all 1s

ProbCounterUpdate(database, bucketId, randomBits)
{
    randomNumber = BitsToNum(randomBits);
    // Only about 1 in 512 inserts gets past this check
    if (randomNumber == DO_INCREMENT_NUM)
    {
        database.IncrementBucketCounter(bucketId);
        bucketData = database.GetBucketData(bucketId);
        if (bucketData.counter >= CHANGE_BUCKET_COUNT)
        {
            // Start a new partition and begin counting again
            database.IncrementBucket(bucketId);
            database.ResetBucketCounter(bucketId);
        }
    }
}
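To sanity-check the single-process logic, here is a small Python simulation. The `InMemoryBucketStore` class is a hypothetical stand-in for the counter table, and the random bits come from a seeded generator rather than from real item IDs. With 1/512 odds and a target count of 40, the average partition should land near 40 × 512 = 20,480 rows.

```python
import random

CHANGE_BUCKET_COUNT = 40  # counter value that triggers a new bucket
DO_INCREMENT_NUM = 511    # 9 random bits, all set


class InMemoryBucketStore:
    """Hypothetical stand-in for the table that tracks bucket counters."""

    def __init__(self):
        self.counter = {}  # partition key -> sampled insert count
        self.bucket = {}   # partition key -> current bucket number

    def increment_counter(self, key):
        self.counter[key] = self.counter.get(key, 0) + 1
        return self.counter[key]

    def increment_bucket(self, key):
        self.bucket[key] = self.bucket.get(key, 0) + 1
        self.counter[key] = 0  # reset so the next partition starts fresh


def prob_counter_update(store, key, random_bits):
    """Return True when this insert caused a switch to a new bucket."""
    if random_bits != DO_INCREMENT_NUM:
        return False  # the vast majority of inserts stop here
    if store.increment_counter(key) >= CHANGE_BUCKET_COUNT:
        store.increment_bucket(key)
        return True
    return False


def simulate(num_buckets, seed=0):
    """Insert rows until num_buckets partitions have closed; return their sizes."""
    rng = random.Random(seed)
    store = InMemoryBucketStore()
    sizes, rows = [], 0
    while len(sizes) < num_buckets:
        rows += 1
        if prob_counter_update(store, "sensor-1", rng.getrandbits(9)):
            sizes.append(rows)
            rows = 0
    return sizes


sizes = simulate(100)
print(min(sizes), sum(sizes) // len(sizes), max(sizes))
```

The store is touched only on the rare inserts that pass the bit check, which is where the reduction in counting overhead comes from.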
In a distributed environment, additional coordination logic will be necessary to make sure all of the agents querying the database will swap to the new bucket at the same time. This could be accomplished with locking. I have another solution that I haven’t been able to test effectively (i.e. in an actual distributed environment), so I’ll relegate it to a footnote.5
A real solution is likely to have some kind of caching in place as well, so a more realistic implementation will probably look more like this:
CHANGE_BUCKET_COUNT = 40
DO_INCREMENT_NUM = 511  // 9 random bits, all 1s

ProbCounterUpdate(database, cache, bucketId, randomBits)
{
    randomNumber = BitsToNum(randomBits);
    if (randomNumber == DO_INCREMENT_NUM)
    {
        database.IncrementBucketCounter(bucketId);
        // Refresh the cached copy so the check below sees the new count
        cache.UpdateBucket(bucketId);
        bucketData = cache.GetBucketData(bucketId);
        if (bucketData.counter >= CHANGE_BUCKET_COUNT)
        {
            PreUpdateCoord(bucketData);
            database.IncrementBucket(bucketId);
            database.ResetBucketCounter(bucketId);
            cache.UpdateBucket(bucketId);
            PostUpdateCoord(bucketData);
        }
    }
}
On the read query side, it’s also pretty straightforward. There is a possibility that your results will be split across two partitions (i.e. the current bucket and the prior bucket). There are two basic approaches to accounting for this. You can preemptively query both partitions and stitch the information together, or you can query the current partition, count the results, and then query the prior partition when necessary. I decided to go with the latter because I expect that one query will be enough in most cases.
The need to check multiple partitions is similar to time-based bucketing, with two key advantages. First, you automatically know you are at the end of the results, and don’t need to query a prior partition, when the current bucket is 0. Second, it’s less likely that you will even need to search additional partitions, and in the case that you do, it’s essentially certain that you will only need to check one additional partition for the required number of rows (unless you are requesting a very large number of rows).
GetRows(...other required parameters..., partitionKey, numRows, bucket = null)
{
    if (bucket is null)
        bucket = cache.GetBucket(partitionKey);
    ...other logic to build query...
    results = database.ExecuteReadQuery();
    // The recursive call keeps walking back through older buckets on its own,
    // so a single check is enough here (a loop would never terminate when
    // fewer than numRows rows exist in total)
    if (results.Count < numRows && bucket > 0)
    {
        extraResults = GetRows(...other required parameters..., partitionKey, numRows - results.Count, bucket - 1);
        results = Combine(results, extraResults);
    }
    return results;
}
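The same read path can be sketched as a runnable Python function. This version swaps the recursion for a loop that steps back one bucket at a time, and it uses a plain dictionary as a hypothetical stand-in for the database, keyed by (partition key, bucket) with rows stored newest first.

```python
def get_rows(partitions, partition_key, num_rows, bucket):
    """Collect up to num_rows rows, starting at `bucket` and walking back to bucket 0."""
    results = []
    while len(results) < num_rows and bucket >= 0:
        rows = partitions.get((partition_key, bucket), [])
        results.extend(rows[: num_rows - len(results)])
        bucket -= 1  # bucket 0 is the oldest, so the loop always terminates
    return results


# Hypothetical data: bucket 1 is the current partition, bucket 0 the prior one.
partitions = {
    ("sensor-1", 1): ["r5", "r4"],
    ("sensor-1", 0): ["r3", "r2", "r1"],
}

print(get_rows(partitions, "sensor-1", 4, bucket=1))
# ['r5', 'r4', 'r3', 'r2']
```

In the common case the first partition already holds enough rows and the loop body runs once, which matches the "query the current partition first" approach described above.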
Summary
Highly variable and highly unpredictable data flow environments are typically the worst-case scenario for Cassandra bucketing strategies. I believe that this probabilistic bucket strategy is able to provide a “best of both worlds” solution to this problem.
It eliminates the tiny/empty partition problem of time-based bucketing strategies while massively reducing the overhead in count-based bucketing strategies. The main drawbacks are limited control over partition sizing (relative to count-based solutions) and that it doesn’t eliminate the requirement for external counting infrastructure (relative to time-based solutions).
It’s not a perfect strategy that fits all situations, but I believe there is a niche out there that desperately needs a better solution, and that probabilistic bucketing can be that solution.
- It also means that other queries outside of that narrow range will be prohibitively slow, so plan carefully.
- In cases where you need data from multiple partitions, it’s better to have your application send separate queries for each partition. Another consideration during setup is “clustering”, which essentially pre-sorts the data based on a designated column (or columns). Clustering is also critical for performance, but it’s not particularly relevant to bucketing.
- Cassandra routes insert queries to the appropriate partition on the fly. If an insert needs to go to a partition that doesn’t exist, it just creates the partition. In this example, where the partition is determined by the sensor’s ID and a bucket value, when a new insert shows up at the beginning of the month with the same sensor ID and new bucket value, it automatically starts putting the new data in a new partition.
- Handy Excel formula for working this out:
=BINOM.DIST($count-1, $target, $odds, TRUE)
$count = count when bucket will be incremented (e.g. 40)
$target = target number of rows to check
$odds = odds of incrementing the counter on any given insert (e.g. 1/512)
For example, entering =BINOM.DIST(40-1, 20000, 1/512, TRUE) shows that there is a 53.85% chance, with my selection of 512 and 40, that any particular partition will have more than 20,000 rows.
- My solution adds a virtual “Bucket State” to each bucket count tracker. Every bucket will be in one of three states: “Steady State”, “Pre-Change”, or “Post-Change”. The buckets don’t actually have any kind of state property in the database. Rather, the state is implied based on the values in lastChange and nextChange properties that are stored in the database. Most of the time, buckets are in Steady State.
When the bucket change counter number is reached, the bucket is moved into a Pre-Change state. A future nextChange time is set and the bucket is incremented, but the counter is not reset. In this state, all agents continue to use the prior bucket (i.e. “bucket - 1”) until the nextChange time is reached.
At the nextChange time, the bucket moves into Post-Change state. In this state, all agents start using the new bucket number. The first agent to access the bucket will reset the counter to 0. There is a possibility that several agents will reset the counter to 0 at the same time, but that’s inconsequential (and preferred to the overhead of a formal locking system). After the counter is reset, the bucket is considered to be in Steady State again.
I use a local cache for bucket data that is critical for ensuring all agents swap buckets at the same time. The nextChange property is set to ensure that, by the time the bucket changeover happens, all caches will either have retrieved the Pre-Change state (including the nextChange time) or will have stale enough data that they’re going to request fresh bucket data from the database anyway.