Using counters for building real time statistics

Posted on August 12th, 2011

(The High Performance Cassandra Cookbook was based on Apache Cassandra 0.7.x, which did not have counter support.)
Counters are a distributed, eventually consistent, non-atomic counter implementation on top of Apache Cassandra. A column value can now represent a non-atomic counter that is able to receive updates on any of its key's replicas. This recipe shows how to use the counter feature in Cassandra to pre-calculate results. It builds real-time analytics for business web logs, allowing us to count volume at per-day, per-hour, and per-minute granularity.

A sample dataset is saved in a file:

url | username | event_time | time_to_serve_millis
/page1.htm | edward | 2011-01-02 04:01:04 | 45
/page1.htm | stacey | 2011-01-02 04:01:05 | 46
/page1.htm | stacey | 2011-01-02 04:02:07 | 40
/page2.htm | edward | 2011-01-02 04:02:45 | 22

Our goal is to pre-compute two result sets. The first is page counts bucketed by minute:

page | time | count
/page1.htm | 2011-01-02 04:01 | 2
/page1.htm | 2011-01-02 04:02 | 1
/page2.htm | 2011-01-02 04:02 | 1

The second is the cumulative time to serve (in milliseconds) per user, bucketed by hour:

user | time | total_time_to_serve
edward | 2011-01-02 04 | 67
stacey | 2011-01-02 04 | 86
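
To make the two target tables concrete, the roll-up can be sketched in memory before Cassandra is involved. The class below is a hypothetical illustration and not part of the recipe: the name BucketSketch and its fields are invented here, it trims whitespace around each field, and it assumes timestamps in the yyyy-MM-dd HH:mm:ss form that the recipe's loader parses.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.TreeMap;

/** In-memory sketch of the two roll-ups; no Cassandra involved. */
class BucketSketch {
  static final DateTimeFormatter INPUT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
  static final DateTimeFormatter BY_MINUTE = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm");
  static final DateTimeFormatter BY_HOUR = DateTimeFormatter.ofPattern("yyyy-MM-dd HH");

  /** Key is "page | minute", value is the number of hits. */
  final Map<String, Long> pageCountsByMinute = new TreeMap<>();
  /** Key is "user | hour", value is the summed time to serve in milliseconds. */
  final Map<String, Long> serveTimeByUserHour = new TreeMap<>();

  /** Folds one pipe-delimited input row into both roll-ups. */
  void add(String line) {
    String[] parts = line.split("\\|");
    String url = parts[0].trim();
    String user = parts[1].trim();
    LocalDateTime when = LocalDateTime.parse(parts[2].trim(), INPUT);
    long millis = Long.parseLong(parts[3].trim());
    // Same idea as a counter increment: add 1 per hit, or add the serve time.
    pageCountsByMinute.merge(url + " | " + BY_MINUTE.format(when), 1L, Long::sum);
    serveTimeByUserHour.merge(user + " | " + BY_HOUR.format(when), millis, Long::sum);
  }

  public static void main(String[] args) {
    BucketSketch sketch = new BucketSketch();
    sketch.add("/page1.htm | edward | 2011-01-02 04:01:04 | 45");
    sketch.add("/page1.htm | stacey | 2011-01-02 04:01:05 | 46");
    sketch.add("/page1.htm | stacey | 2011-01-02 04:02:07 | 40");
    sketch.add("/page2.htm | edward | 2011-01-02 04:02:45 | 22");
    sketch.pageCountsByMinute.forEach((k, v) -> System.out.println(k + " | " + v));
    sketch.serveTimeByUserHour.forEach((k, v) -> System.out.println(k + " | " + v));
  }
}
```

Feeding it the four sample rows yields the counts and totals listed in the two tables above. The recipe performs the same additions, except that each one becomes a counter increment in Cassandra.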

How to do it…

Create a file hpcbuild/c06/Counter.java.

package hpcas.c06;

import hpcas.c03.*;
import java.io.*;
import java.text.*;
import java.util.*;
import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.ByteBufferUtil;

/* create a class to represent each input row */
class Record {
  String url,username;
  Date date;
  int timeToServe;
}

public class Counter {
  public static void main (String [] args) throws Exception {
    FramedConnWrapper wrap = new FramedConnWrapper(
            Util.envOrProp("host"),
            Integer.parseInt(Util.envOrProp("port")));
    wrap.open();
    try {
      wrap.getClient().set_keyspace("counttest");
      /* the input path is supplied under the name "afile" */
      for (Record r: readRecords(Util.envOrProp("afile"))){
        writeRecord(wrap.getClient(),r);
      }
    } finally {
      /* close the connection even if a write fails */
      wrap.close();
    }
  }
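  /* This method reads an entire file into a list of records.
     The url and username fields are stored exactly as split, so any
     spaces around the '|' separators end up in the row keys. */
  public static List<Record> readRecords(String file) throws Exception {
    DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    List<Record> results = new ArrayList<Record>();
    BufferedReader br = new BufferedReader(new FileReader(new File(file)));
    try {
      String line = null;
      while ( (line = br.readLine()) !=null){
        /* skip blank lines and the header row, if the file has one */
        if (line.trim().length() == 0 || line.startsWith("url |")) {
          continue;
        }
        Record r = new Record();
        String [] parts = line.split("\\|");
        r.url=parts[0];
        r.username=parts[1];
        r.date = df.parse(parts[2].trim());
        r.timeToServe= Integer.parseInt(parts[3].trim());
        results.add(r);
      }
    } finally {
      /* release the file handle even if a line fails to parse */
      br.close();
    }
    return results;
  }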
  /* This method increments two counters in Cassandra for each record */
  public static void writeRecord(Cassandra.Client c, Record r) throws Exception {
    /* Date formats that bucket records by minute, by day, and by hour */
    DateFormat bucketByMinute = new SimpleDateFormat("yyyy-MM-dd HH:mm");
    DateFormat bucketByDay = new SimpleDateFormat("yyyy-MM-dd");
    DateFormat bucketByHour = new SimpleDateFormat("yyyy-MM-dd HH");

    CounterColumn counter = new CounterColumn();
    ColumnParent cp = new ColumnParent("page_counts_by_minute");
    counter.setName(ByteBufferUtil.bytes(bucketByMinute.format(r.date)));
    counter.setValue(1);
    c.add(ByteBufferUtil.bytes( bucketByDay.format(r.date)+"-"+r.url)
            , cp, counter, ConsistencyLevel.ONE);

    CounterColumn counter2 = new CounterColumn();
    ColumnParent cp2 = new ColumnParent("user_usage_by_minute");
    counter2.setName(ByteBufferUtil.bytes(bucketByHour.format(r.date)));
    /* Increment by the serve time rather than by one. Despite the column
       family's name, these columns are bucketed by hour. */
    counter2.setValue(r.timeToServe);
    c.add(ByteBufferUtil.bytes( bucketByDay.format(r.date)+"-"+r.username)
            , cp2, counter2, ConsistencyLevel.ONE);
  }
}

Use the CLI to create the required keyspace and column families.

[default@unknown] create keyspace counttest;
[default@unknown] use counttest;
[default@counttest] create column family page_counts_by_minute with default_validation_class=CounterColumnType and replicate_on_write=true;
[default@counttest] create column family user_usage_by_minute with default_validation_class=CounterColumnType and replicate_on_write=true;

Run the Counter class.

host=127.0.0.1 port=9160 afile=a.txt ant -DclassToRun=hpcas.c06.Counter run

Use list operations in the CLI to confirm the results.

[default@unknown] use counttest;
[default@counttest] assume user_usage_by_minute keys as ascii;
[default@counttest] assume user_usage_by_minute validator as ascii;
[default@counttest] assume user_usage_by_minute comparator as ascii;

[default@counttest] list user_usage_by_minute;
-------------------
RowKey: 2011-01-02- stacey
=> (counter=2011-01-02 04, value=86)
-------------------
RowKey: 2011-01-02- edward
=> (counter=2011-01-02 04, value=67)

[default@counttest] assume page_counts_by_minute keys as ascii;
[default@counttest] assume page_counts_by_minute comparator as ascii;
[default@counttest] assume page_counts_by_minute validator as ascii;

[default@counttest] list page_counts_by_minute;
-------------------
RowKey: 2011-01-02-/page1.htm
=> (counter=2011-01-02 04:01, value=2)
=> (counter=2011-01-02 04:02, value=1)
-------------------
RowKey: 2011-01-02-/page2.htm
=> (counter=2011-01-02 04:02, value=1)

2 Rows Returned.

How it works…

By bucketing on both row keys and column names and incrementing counter columns, the required results are available to read using simple get and list commands. This method of pre-calculation pushes most of the processing burden onto the loading side. The alternative would be an ETL process that has to load, possibly index, and then query the data to get results. With this system, results are available on demand.

There is more…

There are some important differences between counter columns and standard columns.

Counter performance

Unlike a write to a standard column, a write to a counter involves a read on one of the natural endpoints for the counter. This makes counter writes more expensive than typical write operations.

Batching writes

Batching up counter writes on the client side lowers the number of increment calls. While this contradicts the key concept of a counter, it may be a valid optimization depending on how close to real time your results need to be.
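
A minimal sketch of such client-side batching follows. Everything in it is hypothetical: IncrementBuffer, its Sink interface, and the maxPending threshold are names invented for illustration, and the sink stands in for whatever code performs the real increment (for example, a wrapper around the add call used in Counter.java that rethrows failures as unchecked exceptions).

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical buffer that sums increments per counter before sending them. */
class IncrementBuffer {
  /** Stand-in for the code that performs the real counter increment. */
  interface Sink {
    void increment(String rowKey, String columnName, long delta);
  }

  /** Pending deltas keyed by [rowKey, columnName]. */
  private final Map<List<String>, Long> pending = new LinkedHashMap<>();
  private final int maxPending;
  private final Sink sink;

  IncrementBuffer(int maxPending, Sink sink) {
    this.maxPending = maxPending;
    this.sink = sink;
  }

  /** Adds a delta locally; flushes once maxPending distinct counters are buffered. */
  void add(String rowKey, String columnName, long delta) {
    pending.merge(Arrays.asList(rowKey, columnName), delta, Long::sum);
    if (pending.size() >= maxPending) {
      flush();
    }
  }

  /** Sends one increment per buffered counter, carrying the summed delta. */
  void flush() {
    Iterator<Map.Entry<List<String>, Long>> it = pending.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<List<String>, Long> e = it.next();
      sink.increment(e.getKey().get(0), e.getKey().get(1), e.getValue());
      // Drop the entry only after the sink returned without throwing.
      it.remove();
    }
  }

  public static void main(String[] args) {
    IncrementBuffer buffer = new IncrementBuffer(1000,
        (row, col, delta) -> System.out.println(row + " / " + col + " += " + delta));
    buffer.add("2011-01-02-/page1.htm", "2011-01-02 04:01", 1);
    buffer.add("2011-01-02-/page1.htm", "2011-01-02 04:01", 1);
    buffer.add("2011-01-02-/page1.htm", "2011-01-02 04:02", 1);
    buffer.flush(); // two increment calls instead of three
  }
}
```

The trade-off is the one described above: buffered increments are invisible to readers until the next flush, and anything still in the buffer is lost if the client dies.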

Caveats

This recipe buckets on dates using strings. The date should instead be stored as epoch time or in some other form that is not affected by daylight saving time or other time zone related issues. The recipe uses strings for readability.
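
One way to bucket on epoch time is to round each timestamp down to the start of its minute, hour, or day. The sketch below is an illustration under stated assumptions, not the recipe's code: EpochBuckets and floorTo are hypothetical names, timestamps are taken to be milliseconds since the epoch, and day buckets are therefore UTC days. Storing the resulting long as a column name would also need a numeric comparator rather than the ascii assumption used in the CLI session.

```java
import java.time.Instant;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper that truncates epoch timestamps to bucket boundaries. */
class EpochBuckets {
  /** Rounds epochMillis down to the start of the enclosing unit (minute, hour, day). */
  static long floorTo(long epochMillis, TimeUnit unit) {
    long width = unit.toMillis(1);
    // floorMod keeps the result correct for timestamps before 1970 as well.
    return epochMillis - Math.floorMod(epochMillis, width);
  }

  public static void main(String[] args) {
    long t = Instant.parse("2011-01-02T04:01:04Z").toEpochMilli();
    // Print each bucket start as an instant to make the truncation visible.
    System.out.println(Instant.ofEpochMilli(floorTo(t, TimeUnit.MINUTES)));
    System.out.println(Instant.ofEpochMilli(floorTo(t, TimeUnit.HOURS)));
    System.out.println(Instant.ofEpochMilli(floorTo(t, TimeUnit.DAYS)));
  }
}
```

Because the bucket is derived from an absolute instant, two clients in different time zones that see the same event compute the same bucket value.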
