
Using nodetool setstreamthroughput to adjust decommission and other streaming performance

Posted on August 3rd, 2012

Cassandra uses streaming in several places. Streaming is used when a node joins or leaves the ring. For example, when a node is decommissioned using nodetool decommission, it streams its data to other nodes before leaving the ring. The streaming throughput limit can be adjusted both in the configuration file and at runtime using nodetool or JMX.

How to do it…

In the conf/cassandra.yaml file the default value can be set:

# When unset, the default is 400 Mbps or 50 MB/s.
# stream_throughput_outbound_megabits_per_sec: 400

This setting can also be adjusted at runtime using nodetool. The value here is in MB/s.

bin/nodetool -h cdbeq01 -p 8585 setstreamthroughput 51

Note: These changes need to be done per machine.

How it works…

setstreamthroughput is a rate limit. The default is 50 MB/s (400 Mbps). If you have a high-speed network such as gigabit Ethernet, this value can be set higher to speed up certain operations. For example, a nodetool decommission will finish sooner if the value is raised, assuming no other limiting factor is involved. However, you may wish to set the value lower to reserve bandwidth and other resources for low-latency operations.
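The arithmetic behind the throttle can be sketched in a few lines of Java. This is only a back-of-the-envelope illustration: the class and method names are hypothetical, the 100 GB figure is made up, and the estimate assumes the throttle is the only bottleneck.

```java
/** Hypothetical helper: back-of-the-envelope math for the stream throttle. */
final class StreamThroughputMath {

  /** There are 8 bits per byte, so 400 megabits/s equals 50 megabytes/s. */
  static double megabitsToMegabytes(double megabitsPerSec) {
    return megabitsPerSec / 8.0;
  }

  /**
   * Estimated seconds needed to stream dataMegabytes at the given cap,
   * assuming (hypothetically) that nothing else limits the transfer.
   */
  static double estimateSeconds(double dataMegabytes, double megabitsPerSec) {
    return dataMegabytes / megabitsToMegabytes(megabitsPerSec);
  }

  public static void main(String[] args) {
    double cap = 400; // the default from conf/cassandra.yaml
    System.out.println(megabitsToMegabytes(cap) + " MB/s");
    // An illustrative 100 GB (102400 MB) handed off during a decommission.
    System.out.println(estimateSeconds(102400, cap) + " seconds");
  }
}
```

Doubling the cap halves the estimate, which is why raising the value only helps when disks, CPU, and the network itself can keep up.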

There is more…

Remember that a network with gigabit Ethernet ports may not be capable of supporting 100% utilization on each port at the same time. Consult your network engineer and consider how many streaming or other bandwidth-intensive operations are happening at once when adjusting this setting cluster-wide.

 

Filed under Chapter 4 - Performance Tuning, Chapter 7 Administration | No Comments »

Raising the number of processes for Cassandra

Posted on July 9th, 2012

Red Hat Enterprise Linux 6 and derivatives such as CentOS 6 lower the number of processes a user can start, to avoid accidental fork bombs. Under a high request rate this can cause the operating system to kill the Cassandra process.

How to do it

Change the system limits from 1024 to 10240. Start a new shell for these changes to take effect.

vi /etc/security/limits.d/90-nproc.conf
*          soft    nproc     10240

How it works

An active Cassandra server can create many threads and processes. Normally Cassandra daemonizes and runs as a standard user, not root. This setting allows non-root users to create more processes.

Filed under Chapter 4 - Performance Tuning, Chapter 7 Administration | No Comments »

Using nodetool to temporarily disable a node

Posted on February 6th, 2012

There are times when you would like a Cassandra node to stop receiving requests or to appear down to outside clients. You may want to use this to disable a node and adjust your log4j properties for more debugging, or you may be running some administrative action like repair or rebuildsstables and you want this action to complete as quickly as possible. Being able to disable Cassandra without shutting it down completely can be very handy.

How to do it:

Use nodetool to disable Cassandra only on a specific node.

/usr/local/apache-cassandra-1.0.7/bin/nodetool -h cassandraserver -p 8585 disablegossip
/usr/local/apache-cassandra-1.0.7/bin/nodetool -h cassandraserver -p 8585 disablethrift

How it works:

Gossip is the protocol that Cassandra nodes use to determine which nodes are UP or DOWN. Cassandra nodes gossip to each other and maintain a state of the cluster. When gossip is disabled, other Cassandra nodes will stop forwarding read and write requests to this node because they will believe the node is down. Clients using Thrift will still be able to connect to a node in this state. disablethrift disables the client port 9160, which prevents clients from connecting to this node. Typically you will use both disablethrift and disablegossip together.

There is more:

To re-enable gossip and thrift use the following commands:

/usr/local/apache-cassandra-1.0.7/bin/nodetool -h cassandraserver -p 8585 enablegossip
/usr/local/apache-cassandra-1.0.7/bin/nodetool -h cassandraserver -p 8585 enablethrift

Filed under Chapter 7 Administration | 65 Comments »

Designing your own AbstractType

Posted on October 14th, 2011

(In the Cassandra high performance cookbook we showed how to create an abstract type in cassandra 0.7.X. The API changed again in 0.8.X and again in 1.0.X. Hopefully the API has stabilized.)

(For this recipe we are using a subset of the code from Cassandra-AnyType )

Extending AbstractType gives you the ability to control how data is stored, sorted, and even displayed in Cassandra. For this recipe we develop an AbstractType subclass that serializes Objects into JSON. The columns are sorted by their compareTo() method, not the byte sorting that is the default.

How to do it…

Use git to checkout AnyType. Then use maven to build the project.

$ git clone https://github.com/edwardcapriolo/Cassandra-AnyType
$ cd Cassandra-AnyType
$ mvn install

Sections of the code relevant to the example are highlighted below.


package com.jointhegrid.anytype;

import com.google.gson.Gson;
import java.nio.ByteBuffer;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.db.marshal.MarshalException;
import org.apache.cassandra.utils.ByteBufferUtil;

/* AnyType uses a one byte header to represent the type of the column */
public class AnyType extends AbstractType<ByteBuffer>{
  public static final byte HEADER_GSON_COMPARABLE=(byte) 5;
  public static final AnyType instance = new AnyType();

  AnyType(){

  }

  /* helper method not needed by cassandra*/
  public Any composeAny(ByteBuffer bb) {    
    byte type = bb.get();
    if (type == HEADER_GSON_COMPARABLE){
      if (bb.remaining() == 0){
        return new Any(GsonComparable.class,null);
      } else {
        /* two-byte length: high byte first, then OR in the low byte */
        int classSize = (bb.get() & 0xFF) << 8;
        classSize |= (bb.get() & 0xFF);
        byte [] classB = new byte[classSize];
        bb.get(classB);
        int objSize = (bb.get() & 0xFF) << 8;
        objSize |= (bb.get() & 0xFF);
        byte [] objB = new byte[objSize];
        bb.get(objB);
        Gson g = new Gson();
        try {
          Class c = Class.forName(new String(classB));
          Object o =g.fromJson(new String(objB),c );
          return new Any(GsonComparable.class, o);
        } catch (ClassNotFoundException ex) {
          Logger.getLogger(AnyType.class.getName()).log(Level.SEVERE, null, ex);
        }
        return null;
      }
    }
    return null;
  }

  /* helper method not needed by cassandra*/
  public ByteBuffer decomposeAny(Any t) {
      if (t.getType().equals(GsonComparable.class)){
      ByteBuffer bb=null;
      if (t.getElement()==null){
        bb = ByteBuffer.allocate(1);
        bb.put(HEADER_GSON_COMPARABLE);
      } else {
        Gson g = new Gson();
        String classOfElement = t.getElement().getClass().getName();
        String s = g.toJson(t.getElement());
        bb = ByteBuffer.allocate(
                1 // header
                +(2+classOfElement.getBytes().length) //header and class
                +(2+s.getBytes().length)); //header and data
        bb.put(HEADER_GSON_COMPARABLE);
        bb.put ( (byte) ((classOfElement.getBytes().length >> (7+1)) & 0xFF));
        bb.put( (byte) (classOfElement.getBytes().length & 0xFF) );
        bb.put(classOfElement.getBytes());
        bb.put ( (byte) ((s.getBytes().length >> (7+1)) & 0xFF));
        bb.put( (byte) (s.getBytes().length & 0xFF) );
        bb.put(s.getBytes());
      }
      bb.position(0);
      return bb;
    }
    return null;
  }

  /* this method control how the string will be displayed */
  public String getString(ByteBuffer bb) {
    ByteBuffer other = bb.duplicate();
    return ByteBufferUtil.bytesToHex(bb);
  }

  /* throw an exception here if the data is invalid */
  public void validate(ByteBuffer bb) throws MarshalException {    
  }

  /* This method controls how Cassandra will sort the data */
  public int compare(ByteBuffer o1, ByteBuffer o2) {
    if (null == o1) {
      if (null == o2) {
        return 0;
      } else {
        return -1;
      }
    }
    if (o2==null){
      return 1;
    }

    ByteBuffer one = o1.duplicate();
    ByteBuffer two = o2.duplicate();
    if (o1.remaining() == 0) {
      return o2.remaining() == 0 ? 0 : -1;
    }
    if (o2.remaining() == 0) {
      return 1;
    }
    Any a1 = AnyType.instance.composeAny(one);
    Any b1 = AnyType.instance.composeAny(two);
    if (a1.getType()==null){
      return b1.getType() == null ? 0 : -1;
    }
    if (b1.getType()==null){
      return 1;
    }
    if (a1.getType().equals(b1.getType())){
      if (a1.getElement() == null){
        return b1.getElement() == null? 0: -1;
      }
      if (b1.getElement() == null){
        return 1;
      } else if (one.get(0) == AnyType.HEADER_GSON_COMPARABLE){
        return ((Comparable)a1.getElement()).compareTo((Comparable) b1.getElement());
      } else {
        int res = ((Comparable)a1.getElement()).compareTo((Comparable) b1.getElement());
        return res;
      }
    } else {
      return BytesType.bytesCompare(o1, o2);
    }
  }

  public ByteBuffer compose(ByteBuffer bb) {
    return bb.duplicate();
  }

  public ByteBuffer decompose(ByteBuffer t) {
    return t;
  }

}

package com.jointhegrid.anytype;

public class Any  {

  private Class type;
  private Object element;

  public Any(Object aThing){
    if (aThing !=null) {
      type=aThing.getClass();
      element=aThing;
    }
  }
 
  public Any(Class c, Object aThing){
    type=c;
    element=aThing;
  }

  public Any(){

  }

  public Class getType() {
    return type;
  }

  public void setType(Class type) {
    this.type = type;
  }
 
  public Object getElement() {
    return element;
  }

  public void setElement(Object element) {
    this.element = element;
  }

  public String toString(){
    StringBuilder sb = new StringBuilder();
    sb.append("type:").append(type);
    sb.append(" ");
    sb.append("element:").append(element);
    return sb.toString();
  }

  @Override
  public int hashCode() {
    int hash = 7;
    hash = 67 * hash + (this.type != null ? this.type.hashCode() : 0);
    hash = 67 * hash + (this.element != null ? this.element.hashCode() : 0);
    return hash;
  }

  @Override
  public boolean equals(Object obj) {
    if (obj == null) {
      return false;
    }
    if (getClass() != obj.getClass()) {
      return false;
    }
    final Any other = (Any) obj;
    if (this.type != other.type && (this.type == null || !this.type.equals(other.type))) {
      return false;
    }
    if (this.element != other.element && (this.element == null || !this.element.equals(other.element))) {
      return false;
    }
    return true;
  }

}

The following unit test demonstrates the usage of the class.

package com.jointhegrid.anytype;

public class Stuff implements Comparable{
  int x;
  double d;
  public Stuff(){

  }

  @Override
  public int compareTo(Object o) {
    if (o instanceof Stuff){
      Stuff stuff = (Stuff) o;
      if (this.x == stuff.x){
        return 0;
      } else if (this.x>stuff.x){
        return 1;
      } else {
        return -1;
      }
    }
    return -1;
  }

  @Override
  public String toString() {
    return "Stuff{" + "x=" + x + "d=" + d + '}';
  }

  @Override
  public boolean equals(Object obj) {
    if (obj == null) {
      return false;
    }
    if (getClass() != obj.getClass()) {
      return false;
    }
    final Stuff other = (Stuff) obj;
    if (this.x != other.x) {
      return false;
    }
    if (Double.doubleToLongBits(this.d) != Double.doubleToLongBits(other.d)) {
      return false;
    }
    return true;
  }

  @Override
  public int hashCode() {
    int hash = 7;
    hash = 43 * hash + this.x;
    hash = 43 * hash + (int) (Double.doubleToLongBits(this.d) ^ (Double.doubleToLongBits(this.d) >>> 32));
    return hash;
  }

}

package com.jointhegrid.anytype;

import java.nio.ByteBuffer;
import org.apache.cassandra.thrift.SliceRange;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import java.util.List;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.thrift.TBinaryProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.cassandra.thrift.Cassandra;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.protocol.TProtocol;
import org.apache.cassandra.contrib.utils.service.CassandraServiceDataCleaner;
import org.apache.cassandra.service.EmbeddedCassandraService;
import org.apache.cassandra.thrift.ColumnPath;
import org.apache.cassandra.thrift.Compression;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

public class AnyTypeInAction {

  static EmbeddedCassandraService ecs;

  @BeforeClass
  public static void setUpClass() throws Exception {
    CassandraServiceDataCleaner cleaner = new CassandraServiceDataCleaner();
    cleaner.prepare();
    ecs = new EmbeddedCassandraService();
    ecs.start();
  }

  @AfterClass
  public static void tearDownClass() throws Exception {
  }

  @Before
  public void setUp() {
  }

  @After
  public void tearDown() {
  }

  @Test
  public void rangeTest2WithGSON() throws Exception {
    Cassandra.Client client = getClient();
    /* Use CQL to create the needed keyspaces and column families */
    String query = "CREATE KEYSPACE etest WITH  strategy_options:replication_factor=1 AND strategy_class = 'SimpleStrategy'";
    client.execute_cql_query(ByteBufferUtil.bytes(query), Compression.NONE);
    client.set_keyspace("etest");

    String cf = "create columnfamily cf"
            + " (key bytea primary key) "
            + "with comparator = 'com.jointhegrid.anytype.AnyType' ";
    client.execute_cql_query(ByteBufferUtil.bytes(cf), Compression.NONE);

    /* create instances of stuff that are comparable */
    Stuff s = new Stuff();
    s.x = 5;
    s.d = 90;

    Stuff t = new Stuff();
    t.x = 6;
    t.d = 90;

    Stuff u = new Stuff();
    u.x = 7;
    u.d = 90;

    Any s1 = new Any(GsonComparable.class, s);
    Any s2 = new Any(GsonComparable.class, t);
    Any s3 = new Any(GsonComparable.class, u);

    ColumnParent cp = new ColumnParent("cf");
    Column column = new Column();
    column.name = AnyType.instance.decomposeAny(s1);
    column.setValue("val".getBytes());
    column.setTimestamp(1L);

    client.insert(ByteBuffer.wrap("a".getBytes()), cp, column, ConsistencyLevel.ONE);

    column.name = AnyType.instance.decomposeAny(s2);
    client.insert(ByteBuffer.wrap("a".getBytes()), cp, column, ConsistencyLevel.ONE);

    column.name = AnyType.instance.decomposeAny(s3);
    client.insert(ByteBuffer.wrap("a".getBytes()), cp, column, ConsistencyLevel.ONE);

    SliceRange sr = new SliceRange();
    sr.setStart(new byte[0]);
    sr.setFinish(new byte[0]);
    sr.setCount(2);

    SlicePredicate sp = new SlicePredicate();
    sp.setSlice_range(sr);

    List<ColumnOrSuperColumn> cols = client.get_slice(ByteBufferUtil.bytes("a"),
            cp, sp, ConsistencyLevel.ONE);

    /* ensure that the returned objects are in the order you desire */
    Assert.assertEquals(2, cols.size());
    Assert.assertEquals(s, AnyType.instance.composeAny(cols.get(0).column.name).getElement());
    Assert.assertEquals(t, AnyType.instance.composeAny(cols.get(1).column.name).getElement());

    System.err.println("doing second slice");
    SlicePredicate sp2 = new SlicePredicate();
    SliceRange sr2 = new SliceRange();
    sr2.setStart((AnyType.instance.decomposeAny(s2)));
    sr2.setFinish(new byte[0]);
    sr2.setCount(3);
    sp2.setSlice_range(sr2);

    cols = client.get_slice(ByteBufferUtil.bytes("a"),
            cp, sp2, ConsistencyLevel.ONE);

    Assert.assertEquals(2, cols.size());
    Assert.assertEquals(s2, AnyType.instance.composeAny(cols.get(0).column.name));
    Assert.assertEquals(s3, AnyType.instance.composeAny(cols.get(1).column.name));

  }

}

How it works…

AnyType uses the GSON library to serialize Java objects into JSON strings. While the data is ultimately stored as a ByteBuffer, it does need to be converted into a Java object so the compareTo method can be called. ByteBuffers are efficient; however, it is important to remember that some methods advance their internal position while other methods do not. Underflow or overflow exceptions typically occur when users have mismanaged the position of the ByteBuffer.
Originally Cassandra was schema-less. While it can still be used in this way, attaching type information to columns or designing your own types makes Cassandra something like an object database. Types also support validation, which is important because Cassandra is typically used in a write-before-read manner. Without validation, other systems can insert bad data accidentally.
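The point about ByteBuffer positions can be illustrated with a small standalone sketch. The class and helper names here are hypothetical; the two-byte, high-byte-first length prefix mirrors the one AnyType writes, and only standard java.nio calls are used.

```java
import java.nio.ByteBuffer;

/** Hypothetical demo of relative versus absolute ByteBuffer access. */
final class ByteBufferPositionDemo {

  /** Writes a length as two bytes, high byte first. */
  static void putLength(ByteBuffer bb, int length) {
    bb.put((byte) ((length >> 8) & 0xFF));
    bb.put((byte) (length & 0xFF));
  }

  /** Reads a two-byte length; each relative get() advances the position. */
  static int getLength(ByteBuffer bb) {
    int high = bb.get() & 0xFF;
    int low = bb.get() & 0xFF;
    return (high << 8) | low;
  }

  public static void main(String[] args) {
    ByteBuffer bb = ByteBuffer.allocate(2);
    putLength(bb, 300);
    bb.flip(); // position back to 0, limit at the end of the written data

    ByteBuffer copy = bb.duplicate(); // shares the bytes, has its own position
    System.out.println("length    = " + getLength(copy)); // relative reads move copy only
    System.out.println("copy left = " + copy.remaining());
    System.out.println("orig left = " + bb.remaining());
    System.out.println("first byte (absolute get) = " + bb.get(0));
  }
}
```

Reading through a duplicate() leaves the caller's buffer untouched, which is why compare() in the recipe duplicates both arguments before decoding them.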

Filed under Chapter 9 Coding and Internals, Uncategorized | 62 Comments »

Writing Composite Columns from Java

Posted on August 29th, 2011

Composite columns allow a list of types to be packed into a single column. One of the benefits of composites is that they also sort properly with respect to multiple components. While higher-level clients like Hector and Pycassa already have support for composite columns, support may not yet be in place for the particular client you are using. This recipe shows a simple way to write a composite column from Java. Since this program uses only simple byte arrays, it should be easy to port to another programming language.

The best source of documentation for Composite Columns sits directly in the Cassandra code at ./src/java/org/apache/cassandra/db/marshal/CompositeType.java

/*
* The encoding of a CompositeType column name should be:
*   <component><component><component> …
* where <component> is:
*   <length of value><value><’end-of-component’ byte>
* where <length of value> is a 2 bytes unsigned short the and the
* ‘end-of-component’ byte should always be 0 for actual column name.
*/

How to do it…

Create a file CompositeTool.java.

import java.io.ByteArrayOutputStream;
import java.util.List;
public class CompositeTool {
  public static  byte [] makeComposite(List<byte []> b){
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    for (int i=0;i<b.size() ;i++){
      bos.write( (byte) ((b.get(i).length >> (7+1) ) & 0xFF)) ;
      bos.write( (byte) (b.get(i).length & 0xFF)) ;
      for (int j=0;j<b.get(i).length;j++){
        bos.write(  b.get(i)[j] & 0xFF) ;
      }
      bos.write((byte)0);
    }
    return bos.toByteArray();
  }
}

Also create  CompositeToolTest.java.

package casbase;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnParent;
import java.util.List;
import java.util.ArrayList;
import org.apache.cassandra.thrift.Compression;
import java.nio.ByteBuffer;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.*;
public class CompositeToolTest {
  public CompositeToolTest() {
  }
  /* We used CQL to create the required keyspaces and column families for this test */
  @BeforeClass
  public static void setUpClass() throws Exception {
    ConnWrapper cw = dbTest.getClient();
    cw.getClient().execute_cql_query(
     ByteBuffer.wrap(("create keyspace composite_test with strategy_class "
     + "= 'org.apache.cassandra.locator.SimpleStrategy' and strategy_options:replication_factor=1").getBytes()), Compression.NONE);
    cw.getClient().execute_cql_query(
     ByteBuffer.wrap("USE composite_test".getBytes()), Compression.NONE);
    cw.getClient().execute_cql_query(
     ByteBuffer.wrap(("create columnfamily composite_test (key bytea primary key) "
     + "with comparator = 'CompositeType(AsciiType,AsciiType)'").getBytes()), Compression.NONE);
  }
  @AfterClass
  public static void tearDownClass() throws Exception {
    ConnWrapper cw = dbTest.getClient();
    /*
    * comments are in place so the CLI can be used to view the column
    * remove to make unit tests repeatable
    try {
     cw.getClient().execute_cql_query(ByteBuffer.wrap
      ("drop keyspace composite_test".getBytes()), Compression.NONE);
    } catch (Exception ex) {
    }
   *
   */
  }
  @Test
  public void testAgain() throws Exception {
    List<byte[]> b = new ArrayList<byte[]>();
    b.add("hello".getBytes());
    b.add("goodbye".getBytes());
    byte[] x = CompositeTool.makeComposite(b);
    ConnWrapper cw = dbTest.getClient();
    cw.getClient().set_keyspace("composite_test");
    ColumnParent cp = new ColumnParent("composite_test");
    Column c = new Column();
    c.setName(x);
    c.setValue("again".getBytes());
    c.setTimestamp(System.currentTimeMillis());
    cw.getClient().insert(ByteBuffer.wrap("data".getBytes()), cp, c, ConsistencyLevel.ONE);
  }
}

Use the cli to ensure the composite column is set correctly.
[default@composite_test] use composite_test;
[default@composite_test] list composite_test;
Using default limit of 100
-------------------
RowKey: 64617461
=> (column=hello:goodbye, value=again, timestamp=1314636481883)

1 Row Returned.

How it works…

Composite columns have a simple format; however, many of the higher-level clients deceptively wrap code in abstraction. As our example shows, writing to a composite column is relatively easy.
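Reading a composite name back is just as simple. The following sketch walks the layout described in the CompositeType comment: a two-byte length, the value, then one end-of-component byte. The class and method names are hypothetical, and the encode helper only mirrors makeComposite so that the example is self-contained.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical reader for the <length><value><end-of-component> layout. */
final class CompositeReader {

  /** Same layout as makeComposite: 2-byte length, value, then a 0 byte. */
  static byte[] encode(List<byte[]> parts) {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    for (byte[] value : parts) {
      bos.write((value.length >> 8) & 0xFF);
      bos.write(value.length & 0xFF);
      bos.write(value, 0, value.length);
      bos.write(0); // end-of-component byte
    }
    return bos.toByteArray();
  }

  /** Splits an encoded composite name back into its component values. */
  static List<byte[]> readComposite(byte[] encoded) {
    ByteBuffer bb = ByteBuffer.wrap(encoded);
    List<byte[]> parts = new ArrayList<byte[]>();
    while (bb.remaining() > 0) {
      int length = ((bb.get() & 0xFF) << 8) | (bb.get() & 0xFF);
      byte[] value = new byte[length];
      bb.get(value);
      bb.get(); // skip the end-of-component byte
      parts.add(value);
    }
    return parts;
  }

  public static void main(String[] args) {
    List<byte[]> in = new ArrayList<byte[]>();
    in.add("hello".getBytes(StandardCharsets.US_ASCII));
    in.add("goodbye".getBytes(StandardCharsets.US_ASCII));
    for (byte[] part : readComposite(encode(in))) {
      System.out.println(new String(part, StandardCharsets.US_ASCII));
    }
  }
}
```

Malformed input would surface as a BufferUnderflowException here; a production reader would want to validate the lengths before allocating.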

There is more…

Thanks to thobbs and pcmanus.

Filed under Chapter 9 Coding and Internals, Uncategorized | 64 Comments »

Setting up your host file and DNS system correctly

Posted on August 16th, 2011

Apache Cassandra is a distributed storage system. Like many distributed storage systems, it requires that IP addressing and DNS are set up correctly on all hosts.

Getting ready

For this recipe assume you have three hosts cas01.domain.pvt(10.1.1.1), cas02.domain.pvt(10.1.1.2), cas03.domain.pvt(10.1.1.3)

How to do it…

Ensure your hosts file resolves the DNS name to your routable IP (not localhost) by editing /etc/hosts.

127.0.0.1               localhost.localdomain localhost
10.1.1.1     cas01.domain.pvt cas01

Run the hostname command and ensure the hostname exactly matches your host file

$ hostname
cas01.domain.pvt

Go to another machine on your network and ping this host by hostname.

# ping -c1 cas01.domain.pvt
PING cas01.domain.pvt (10.1.1.1) 56(84) bytes of data.
64 bytes from cas01.domain.pvt (10.1.1.1): icmp_seq=1 ttl=64 time=0.020 ms

How it works…

Unless DNS is set up correctly, hosts will have a difficult time communicating with each other. It is possible to work around DNS by setting rpc_address and listen_address in the conf/cassandra.yaml file, but Java's JMX system, which nodetool uses, relies on RMI and may have trouble connecting to remote hosts if name resolution is not correct.
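A quick way to see what the JVM itself thinks the local hostname resolves to is a small check like the one below. This is a minimal sketch using only the standard java.net API; the class name and the isRoutable helper are hypothetical and not part of Cassandra.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

/** Hypothetical check: does the local hostname resolve to a usable address? */
final class HostnameCheck {

  /** True when the address is neither loopback nor the wildcard address. */
  static boolean isRoutable(InetAddress addr) {
    return !addr.isLoopbackAddress() && !addr.isAnyLocalAddress();
  }

  public static void main(String[] args) {
    try {
      InetAddress local = InetAddress.getLocalHost();
      System.out.println(local.getHostName() + " -> " + local.getHostAddress());
      if (!isRoutable(local)) {
        System.out.println("WARNING: hostname resolves to a loopback address");
      }
    } catch (UnknownHostException e) {
      System.out.println("Hostname does not resolve: " + e.getMessage());
    }
  }
}
```

If the warning appears on a host such as cas01, the /etc/hosts entry is the first place to look.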

There is more…

If possible, try to avoid multiple IP addresses per machine, which can cause applications to auto-detect the wrong IP. Also be aware of IPv6 support, which is enabled by default in many Linux distributions. IPv6 can lead to confusion if enabled but not configured correctly.

At times you may need to set the Java property -Djava.rmi.server.hostname=XXX in the conf/cassandra-env.sh file.

See also…

Information on Java properties that affect the JMX port can be found at http://download.oracle.com/javase/1.4.2/docs/guide/rmi/javarmiproperties.html

For information about how JMX and RMI are challenged by multi-homed systems visit http://weblogs.java.net/blog/emcmanus/archive/2006/12/multihomed_comp.html

Filed under Chapter 1 Getting Started | 64 Comments »

Creating column families with the CompositeType comparator

Posted on August 16th, 2011

(The High Performance Cassandra Cookbook targeted version 0.7.X. Composite types entered the Cassandra tree in version 0.8.1, and some minor changes were made in Cassandra to accommodate them.)
CompositeTypes are an alternative to packing and serializing your own structures into Cassandra's columns. CompositeType allows a user to create a record which is a list of other Cassandra types such as UTF8Type or TimeUUIDType. This recipe shows how to use the CompositeType information inside the comparator's metadata.

How to do it…

Using the Cassandra CLI create a column family with a comparator specified as CompositeType that has two components which are UTF8 Strings.

[default@edstest] use edstest;
[default@edstest] create column family composite_test with comparator = 'CompositeType(UTF8Type,UTF8Type)';

You can insert into the composite type by using a : inside a single quoted string to separate the parts.

[default@edstest] set composite_test['a']['thing1:thing2']= 'wow';
Value inserted.
[default@edstest] get composite_test['a'];
=> (column=thing1:thing2, value=776f77, timestamp=1313540404197000)
Returned 1 results.

Create another composite column family, this time using UTF8Type and IntegerType. Then confirm that you cannot insert data of the wrong type.

[default@edstest] create column family composite_test2 with comparator = 'CompositeType(UTF8Type,IntegerType)';
[default@edstest] set composite_test2 ['a']['thing:1']='stuff';
Value inserted.
[default@edstest] set composite_test2 ['a']['thing:z']='stuff';
org.apache.cassandra.db.marshal.MarshalException: unable to make int from 'z'

[default@edstest] get composite_test2 ['a'];
=> (column=thing:1, value=7374756666, timestamp=1313540630087000)
Returned 1 results.

How it works…

Cassandra stores the CompositeType metadata. This information allows methods such as get_slice to return data in sorted order. This is an advantage over a serialization method like JSON, where Cassandra can only sort at the byte level because it is unaware of the data's structure.
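The difference between byte-level and component-aware ordering can be seen in a small simulation. This sketch does not use Cassandra's comparator: plain "text:number" strings stand in for encoded composite names, and the class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical demo: byte ordering versus component-aware ordering. */
final class CompositeOrderDemo {

  /** Unsigned lexicographic comparison, the way opaque bytes are ordered. */
  static int compareBytes(byte[] a, byte[] b) {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int diff = (a[i] & 0xFF) - (b[i] & 0xFF);
      if (diff != 0) {
        return diff;
      }
    }
    return a.length - b.length;
  }

  /** Compares "text:number" names one component at a time. */
  static int compareTyped(String a, String b) {
    String[] pa = a.split(":");
    String[] pb = b.split(":");
    int first = pa[0].compareTo(pb[0]);
    if (first != 0) {
      return first;
    }
    return Long.compare(Long.parseLong(pa[1]), Long.parseLong(pb[1]));
  }

  public static void main(String[] args) {
    String x = "thing:10";
    String y = "thing:2";
    int raw = compareBytes(x.getBytes(StandardCharsets.UTF_8), y.getBytes(StandardCharsets.UTF_8));
    System.out.println("byte order:  " + (raw < 0 ? x + " first" : y + " first"));
    System.out.println("typed order: " + (compareTyped(x, y) < 0 ? x + " first" : y + " first"));
  }
}
```

With opaque bytes, 10 sorts ahead of 2 because the character '1' is smaller than '2'. Knowing that the second component is an integer restores the numeric order, which is the kind of information the stored comparator metadata provides.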

There is more…

Composite columns may eventually replace ‘super column families’ in cassandra. This change should be transparent to end users of ‘super column families’ as far as the thrift API is concerned.

Filed under Chapter 2 The Cassandra CLI | 46 Comments »

Using counters for building real time statistics

Posted on August 12th, 2011

(The High Performance Cassandra Cookbook was based on Apache Cassandra 0.7.X, which did not have counter support.)
Counters are a distributed, eventually consistent, non-atomic counter implementation on top of Apache Cassandra. A value can now represent a non-atomic counter that is able to receive updates on any of its key's replicas. This recipe shows how to use the counter feature in Cassandra to pre-calculate results. This recipe builds real-time analytics for business web logs, allowing us to count volume at per-day, per-hour, and per-minute granularity.

A sample dataset is saved in a file:

url | username | event_time | time_to_serve_millis
/page1.htm | edward | 2011-01-02 04:01:04 | 45
/page1.htm | stacey | 2011-01-02 04:01:05 | 46
/page1.htm | stacey | 2011-01-02 04:02:07 | 40
/page2.htm | edward | 2011-01-02 04:02:45 | 22

Our goal is to pre-compute two result sets. The first is page counts bucketed by minute:

page | time | count
/page1.htm | 2011-01-02 04:01 | 2
/page1.htm | 2011-01-02 04:02 | 1
/page2.htm | 2011-01-02 04:02 | 1

The second is a count of cumulative resources by user bucketed by hour:

user | time | total_time_to_serve
edward | 2011-01-02 04 | 67
stacey | 2011-01-02 04 | 86

How to do it…

Create a file hpcbuild/c06/Counter.java.

package hpcas.c06;

import hpcas.c03.*;
import java.io.*;
import java.text.*;
import java.util.*;
import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.ByteBufferUtil;

/* create a class to represent each input row */
class Record {
  String url,username;
  Date date;
  int timeToServe;
}

public class Counter {
  public static void main (String [] args) throws Exception {
    FramedConnWrapper wrap = new FramedConnWrapper(
            Util.envOrProp("host"),
            Integer.parseInt(Util.envOrProp("port")));
    wrap.open();
    wrap.getClient().set_keyspace("counttest");
    for (Record r: readRecords(Util.envOrProp("afile"))){
      writeRecord(wrap.getClient(),r);
    }
    wrap.close();
  }
  /* This method reads an entire file into a list of records */
  public static List<Record> readRecords(String file) throws Exception {
    DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    List<Record> results = new ArrayList<Record>();
    BufferedReader br = new BufferedReader(new FileReader(new File(file)));
    String line = null;
    while ( (line = br.readLine()) !=null){
      Record r = new Record();
      String [] parts = line.split("\\|");
      r.url=parts[0];
      r.username=parts[1];
      r.date = df.parse(parts[2]);
      r.timeToServe= Integer.parseInt(parts[3].trim());
      results.add(r);
    }
    return results;
  }
  /* this method writes each record to cassandra */
  public static void writeRecord(Cassandra.Client c, Record r) throws Exception {
    /* we use multiple date formats to bucket records by minute, hour, and day */
    DateFormat bucketByMinute = new SimpleDateFormat("yyyy-MM-dd HH:mm");
    DateFormat bucketByDay = new SimpleDateFormat("yyyy-MM-dd");
    DateFormat bucketByHour = new SimpleDateFormat("yyyy-MM-dd HH");

    CounterColumn counter = new CounterColumn();
    ColumnParent cp = new ColumnParent("page_counts_by_minute");
    counter.setName(ByteBufferUtil.bytes(bucketByMinute.format(r.date)));
    counter.setValue(1);
    c.add(ByteBufferUtil.bytes( bucketByDay.format(r.date)+"-"+r.url)
            , cp, counter, ConsistencyLevel.ONE);

    CounterColumn counter2 = new CounterColumn();
    ColumnParent cp2 = new ColumnParent("user_usage_by_minute");
    counter2.setName(ByteBufferUtil.bytes(bucketByHour.format(r.date)));
    /* Incrementing by a value rather than one. */
    counter2.setValue(r.timeToServe);
    c.add(ByteBufferUtil.bytes( bucketByDay.format(r.date)+"-"+r.username)
            , cp2, counter2, ConsistencyLevel.ONE);
  }
}

Use the CLI to create the required keyspaces and column families.

[default@unknown] create keyspace counttest;
[default@unknown] use counttest;
[default@counttest] create column family page_counts_by_minute with default_validation_class=CounterColumnType and replicate_on_write=true;
[default@counttest] create column family user_usage_by_minute with default_validation_class=CounterColumnType and replicate_on_write=true;

Run the Counter class.

host=127.0.0.1 port=9160 afile=a.txt ant -DclassToRun=hpcas.c06.Counter run

Use list operations in the cli to confirm the results.

[default@unknown] use counttest;
[default@counttest] assume user_usage_by_minute keys as ascii;
[default@counttest] assume user_usage_by_minute validator as ascii;
[default@counttest] assume user_usage_by_minute comparator as ascii;

[default@counttest] list user_usage_by_minute;
-------------------
RowKey: 2011-01-02- stacey
=> (counter=2011-01-02 04, value=86)
-------------------
RowKey: 2011-01-02- edward
=> (counter=2011-01-02 04, value=67)

assume page_counts_by_minute keys as ascii;
assume page_counts_by_minute comparator as ascii;
assume page_counts_by_minute validator as ascii;

[default@counttest] list page_counts_by_minute;
-------------------
RowKey: 2011-01-02-/page1.htm
=> (counter=2011-01-02 04:01, value=2)
=> (counter=2011-01-02 04:02, value=1)
-------------------
RowKey: 2011-01-02-/page2.htm
=> (counter=2011-01-02 04:02, value=1)

2 Rows Returned.

How it works…

By bucketing row keys and column names and inserting into counter columns, the required results are available to read using simple get and list commands. This method of pre-calculation pushes most of the processing burden onto the loading side. This is an alternative to an ETL process, which would have to load, possibly index, and then query data to get results. With this system, results are available on demand.
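The bucketing itself can be tried without a cluster. In the sketch below a nested in-memory map stands in for the page_counts_by_minute column family; the class and its methods are hypothetical, and only the key layout (day plus URL for the row, minute for the column) is taken from the recipe.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory stand-in for the page_counts_by_minute column family. */
final class BucketSketch {

  /** row key -> (column name -> running total) */
  final Map<String, Map<String, Long>> rows = new TreeMap<String, Map<String, Long>>();

  /** Adds delta to one counter, creating the row and column on first use. */
  void add(String rowKey, String column, long delta) {
    Map<String, Long> row = rows.get(rowKey);
    if (row == null) {
      row = new TreeMap<String, Long>();
      rows.put(rowKey, row);
    }
    Long old = row.get(column);
    row.put(column, (old == null ? 0L : old) + delta);
  }

  /** Buckets one page view: the row key is day + url, the column is the minute. */
  void recordPageView(String url, String eventTime) {
    try {
      Date d = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(eventTime);
      String day = new SimpleDateFormat("yyyy-MM-dd").format(d);
      String minute = new SimpleDateFormat("yyyy-MM-dd HH:mm").format(d);
      add(day + "-" + url, minute, 1);
    } catch (ParseException e) {
      throw new IllegalArgumentException("bad timestamp: " + eventTime, e);
    }
  }

  /** Returns the current total, or 0 when the counter was never written. */
  long get(String rowKey, String column) {
    Map<String, Long> row = rows.get(rowKey);
    Long v = (row == null) ? null : row.get(column);
    return v == null ? 0L : v;
  }

  public static void main(String[] args) {
    BucketSketch s = new BucketSketch();
    s.recordPageView("/page1.htm", "2011-01-02 04:01:04");
    s.recordPageView("/page1.htm", "2011-01-02 04:01:05");
    s.recordPageView("/page1.htm", "2011-01-02 04:02:07");
    s.recordPageView("/page2.htm", "2011-01-02 04:02:45");
    System.out.println(s.rows);
  }
}
```

Because every write lands directly in its final bucket, reading a result is a single lookup, which is the trade the recipe makes: more work at load time, none at query time.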

There is more…

There are some important differences between counter columns and standard columns.

Counter performance

Unlike a write to a standard column, a write to a counter involves a read on one of the natural endpoints for the counter. This makes counter writes more intensive than a typical write operation.

Batching writes

Batching counter writes on the client side lowers the number of increment calls. While this contradicts the key concept of a counter, it may be a valid optimization depending on how close to real time your results need to be.
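One way to batch on the client is to sum increments per counter in memory and send one increment per counter when the batch is flushed. The sketch below shows only the aggregation step; CounterBatcher and its methods are hypothetical names, and the actual write to Cassandra is left to the caller.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch only: sums pending deltas per (row key, column name) pair.
// Sending the drained totals to the cluster is not shown.
final class CounterBatcher {
    private final Map<List<String>, Long> pending = new HashMap<>();

    // Records a delta locally instead of issuing an increment call.
    void increment(String rowKey, String column, long delta) {
        pending.merge(List.of(rowKey, column), delta, Long::sum);
    }

    // Returns the summed deltas and clears the batch; call this on a timer or size limit.
    Map<List<String>, Long> drain() {
        Map<List<String>, Long> out = new HashMap<>(pending);
        pending.clear();
        return out;
    }
}
```

Anything still held in the batch is lost if the client fails before it is drained, so the flush interval sets both the staleness of the results and the amount of data at risk.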

Caveats

This recipe buckets on dates using strings. In production, the date should be stored as epoch time or in some other form that is not affected by daylight saving time or other time zone related issues. The recipe uses strings for readability.
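A minimal sketch of epoch-based bucketing follows, assuming buckets are whole days and whole minutes counted from the Unix epoch in UTC. The class and method names are hypothetical.

```java
import java.time.Instant;

// Sketch only: derives bucket numbers from epoch seconds, so the result does not
// depend on the server's time zone or on daylight saving transitions.
final class EpochBuckets {
    static final long SECONDS_PER_DAY = 86_400L;
    static final long SECONDS_PER_MINUTE = 60L;

    // Number of whole days since 1970-01-01T00:00:00Z.
    static long dayBucket(Instant t) {
        return Math.floorDiv(t.getEpochSecond(), SECONDS_PER_DAY);
    }

    // Number of whole minutes since 1970-01-01T00:00:00Z.
    static long minuteBucket(Instant t) {
        return Math.floorDiv(t.getEpochSecond(), SECONDS_PER_MINUTE);
    }

    // Row key in the same shape as the recipe's, with a day number instead of a date string.
    static String rowKey(Instant t, String username) {
        return dayBucket(t) + "-" + username;
    }
}
```

The trade-off is the one the recipe already names: numeric buckets are unambiguous but harder to read in CLI output than date strings.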

Filed under Chapter 6 Schema Design | 44 Comments »

Determining response times with tcprstat

Posted on August 9th, 2011

Tcprstat is a free, open-source TCP analysis tool that watches network traffic and computes the delay between requests and responses. From this it derives response-time statistics and prints them out. This is useful with Cassandra because a node typically serves numerous low-latency reads per second.

Getting ready

This recipe requires the tcprstat utility. More information is available at http://www.percona.com/docs/wiki/tcprstat:start#installing_tcprstat

How to do it…

Download the static binary and set the execute permissions.

# wget http://github.com/downloads/Lowercases/tcprstat/tcprstat-static.v0.3.1.x86_64
# chmod a+x tcprstat-static.v0.3.1.x86_64

Start tcprstat listening for connections on port 9160 with an interval of 1 second for 5 iterations.

$ sudo ./tcprstat-static.v0.3.1.x86_64 -f '%a\t%95M\t%99M\n' -p 9160 -t 1 -n 5
avg 95_max 99_max
34970 319130 390066
80550 388822 1193412
61438 196447 1505527
118754 836736 1528195
244351 2162598 2283161

How it works…

tcprstat requires root access to be able to tap the network device. After initialization it monitors connections and records the timing of requests and responses. It then displays this information in real time. tcprstat can also be used on pcap files captured with tcpdump.
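The raw numbers are easier to read once converted. The sketch below parses output in the three-column format requested above and converts it to milliseconds. It assumes the values are in microseconds, which is how the tcprstat documentation describes its timings; the class and field names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: parses "avg 95_max 99_max" style output captured from tcprstat.
// Assumes each value is a duration in microseconds.
final class TcprstatLines {
    static final class Sample {
        final long avgMicros;
        final long p95MaxMicros;
        final long p99MaxMicros;

        Sample(long avgMicros, long p95MaxMicros, long p99MaxMicros) {
            this.avgMicros = avgMicros;
            this.p95MaxMicros = p95MaxMicros;
            this.p99MaxMicros = p99MaxMicros;
        }

        double avgMillis() {
            return avgMicros / 1000.0;
        }
    }

    // Keeps lines made of three whitespace-separated integers; skips the header and blank lines.
    static List<Sample> parse(String output) {
        List<Sample> samples = new ArrayList<>();
        for (String line : output.split("\\R")) {
            String[] fields = line.trim().split("\\s+");
            if (fields.length != 3 || !fields[0].matches("\\d+")) {
                continue;
            }
            samples.add(new Sample(Long.parseLong(fields[0]),
                                   Long.parseLong(fields[1]),
                                   Long.parseLong(fields[2])));
        }
        return samples;
    }
}
```

Under that assumption, the first interval in the sample output has an average response time of roughly 35 ms.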

Filed under Chapter 13 Monitoring | 45 Comments »

Incrementing and Decrementing Counters from the CLI

Posted on August 7th, 2011

(Because the Cassandra High Performance Cookbook was based on the Cassandra 0.7.X series, counters were still under development as the book was being written.)

How to do it…

Create a column family and set its default_validation_class to CounterColumnType. Refer to the Chapter 2 recipe, Creating a column family from the CLI, for more information.

[default@app] use app;
[default@app] create column family counterCF with default_validation_class=CounterColumnType and replicate_on_write=true;

Use the incr statement to increment the value of a column by 1.
[default@app] incr counterCF [ascii('a')][ascii('x')];
Value incremented.
[default@app] incr counterCF [ascii('a')][ascii('x')];
Value incremented.

Retrieve the value using the standard get.
[default@app] get counterCF[ascii('a')];
=> (counter=78, value=2)
Returned 1 results.

Use the ‘by’ clause to increment counters by a specific number.
[default@app] incr counterCF [ascii('a')][ascii('x')] by 9;
Value incremented.
[default@app] get counterCF[ascii('a')];
=> (counter=78, value=11)
Returned 1 results.

Decrement the counter using the ‘decr’ statement.

[default@app] decr counterCF [ascii('a')][ascii('x')] ;
Value decremented.
[default@app] get counterCF[ascii('a')];
=> (counter=78, value=10)
Returned 1 results.
[default@app] decr counterCF [ascii('a')][ascii('x')] by 2;
Value decremented.
[default@app] get counterCF[ascii('a')];
=> (counter=78, value=8)
Returned 1 results.

Use a signed integer in the ‘by’ clause. Decrementing by a negative number increases the counter.

[default@app] decr counterCF [ascii('a')][ascii('x')] by -2;
Value decremented.
[default@app] get counterCF[ascii('a')];
=> (counter=78, value=10)
Returned 1 results.

How it works…

Counter columns differ from normal columns in that they can only hold an integer value. This value can be incremented or decremented without having to retrieve its current value from the server. This is important because reading before writing is a Cassandra anti-pattern, and because multiple clients can operate on a counter without having to synchronize externally.
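The difference between sending a delta and doing a read followed by a write can be shown with plain arithmetic. The sketch below is an in-memory illustration only, with hypothetical names; it does not model how Cassandra stores or replicates counters.

```java
// Sketch only: contrasts read-then-write updates with delta-based updates.
final class CounterDeltas {
    // Every client reads the stored value first, then each writes back (value read + 1).
    // Later writes overwrite earlier ones, so increments are lost.
    static long readThenWrite(long start, int clients) {
        long stored = start;
        long[] seen = new long[clients];
        for (int i = 0; i < clients; i++) {
            seen[i] = stored;
        }
        for (int i = 0; i < clients; i++) {
            stored = seen[i] + 1;
        }
        return stored;
    }

    // Each client sends only a signed delta; the deltas are summed onto the stored value.
    static long applyDeltas(long start, long... deltas) {
        long value = start;
        for (long delta : deltas) {
            value += delta;
        }
        return value;
    }
}
```

With two clients starting from 8, readThenWrite(8, 2) ends at 9 while applyDeltas(8, 1, 1) ends at 10. Replaying the CLI session above as deltas, applyDeltas(0, 1, 1, 9, -1, -2, 2) gives 10, which matches the final get.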

See Also…

In Chapter 5, the CAP chapter, the recipe Demonstrating how consistency is not a lock or a transaction shows a situation where counters are needed. In Chapter 10, Libraries and Applications, the recipe Using Cages to implement an atomic read and set shows how to do counting with an external locking solution.

See also the bonus recipe, Using counters for real time statistical reporting.

Filed under Chapter 2 The Cassandra CLI | 45 Comments »