Designing your own AbstractType
Posted on October 14th, 2011
(In the Cassandra high performance cookbook we showed how to create an abstract type in cassandra 0.7.X. The API changed again in 0.8.X and again in 1.0.X. Hopefully the API has stabilized.)
(For this recipe we are using a subset of the code from Cassandra-AnyType )
Extending AbstractType gives you the ability to control how data is stored, sorted, and even displayed in Cassandra. For this recipe we develop an AbstractType subclass that serializes Objects into JSON. The columns are sorted by their compareTo() method, not by the byte sorting that is the default.
How to do it…
Use git to checkout AnyType. Then use maven to build the project.
$ git clone https://github.com/edwardcapriolo/Cassandra-AnyType
$ cd Cassandra-AnyType
$ mvn install
Sections of the code relevant to the example are highlighted below.
package com.jointhegrid.anytype;
import com.google.gson.Gson;
import java.nio.ByteBuffer;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.db.marshal.MarshalException;
import org.apache.cassandra.utils.ByteBufferUtil;
/* AnyType uses a one byte header to represent the type of the column */
/**
 * AbstractType whose values are Java objects serialized as JSON and ordered by
 * the objects' own {@code compareTo()} rather than by raw bytes.
 *
 * Wire format written by {@link #decomposeAny(Any)}:
 * <pre>
 *   [1 byte  header = HEADER_GSON_COMPARABLE]
 *   [2 bytes class-name length, high byte first][class-name bytes]
 *   [2 bytes JSON length, high byte first][JSON bytes]
 * </pre>
 * A lone header byte (nothing after it) represents a null element.
 */
public class AnyType extends AbstractType<ByteBuffer> {

    /** Header byte marking a value stored as class name + Gson JSON. */
    public static final byte HEADER_GSON_COMPARABLE = (byte) 5;

    /** Largest field that fits behind the two-byte unsigned length prefix. */
    private static final int MAX_FIELD_LENGTH = 0xFFFF;

    /**
     * Charset used for the class name and JSON payload. Fixed so the encoding
     * does not depend on the JVM's platform default.
     */
    private static final java.nio.charset.Charset UTF_8 =
            java.nio.charset.Charset.forName("UTF-8");

    public static final AnyType instance = new AnyType();

    AnyType() {
    }

    /**
     * Helper (not required by Cassandra): decodes a buffer produced by
     * {@link #decomposeAny(Any)}.
     *
     * Reads with relative gets, so the position of {@code bb} is advanced; pass
     * a {@code duplicate()} if the caller still needs the buffer.
     *
     * @param bb buffer positioned at the header byte
     * @return the decoded value, an {@code Any} with a null element when only
     *         the header is present, or {@code null} when the header is not
     *         {@link #HEADER_GSON_COMPARABLE} or the stored class cannot be loaded
     */
    public Any composeAny(ByteBuffer bb) {
        byte type = bb.get();
        if (type != HEADER_GSON_COMPARABLE) {
            return null;
        }
        if (bb.remaining() == 0) {
            return new Any(GsonComparable.class, null);
        }
        byte[] classB = readField(bb);
        byte[] objB = readField(bb);
        try {
            // NOTE(review): the class name comes straight from the stored bytes and
            // Class.forName loads and initializes whatever class it names; only use
            // this on data from writers you trust.
            Class<?> c = Class.forName(new String(classB, UTF_8));
            Object o = new Gson().fromJson(new String(objB, UTF_8), c);
            return new Any(GsonComparable.class, o);
        } catch (ClassNotFoundException ex) {
            Logger.getLogger(AnyType.class.getName()).log(Level.SEVERE, null, ex);
            return null;
        }
    }

    /**
     * Helper (not required by Cassandra): encodes {@code t} in the wire format
     * described on this class.
     *
     * @param t value to encode
     * @return a buffer positioned at 0, or {@code null} when the type of
     *         {@code t} is not {@code GsonComparable}
     * @throws IllegalArgumentException if the class name or the JSON is longer
     *         than 65535 bytes and so cannot be described by the length prefix
     */
    public ByteBuffer decomposeAny(Any t) {
        if (!GsonComparable.class.equals(t.getType())) {
            return null;
        }
        if (t.getElement() == null) {
            ByteBuffer headerOnly = ByteBuffer.allocate(1);
            headerOnly.put(HEADER_GSON_COMPARABLE);
            headerOnly.position(0);
            return headerOnly;
        }
        byte[] classBytes = t.getElement().getClass().getName().getBytes(UTF_8);
        byte[] jsonBytes = new Gson().toJson(t.getElement()).getBytes(UTF_8);
        requireFits(classBytes, "class name");
        requireFits(jsonBytes, "JSON payload");

        ByteBuffer bb = ByteBuffer.allocate(
                1                            // header
                + (2 + classBytes.length)    // length prefix and class name
                + (2 + jsonBytes.length));   // length prefix and data
        bb.put(HEADER_GSON_COMPARABLE);
        writeField(bb, classBytes);
        writeField(bb, jsonBytes);
        bb.position(0);
        return bb;
    }

    /** Reads one length-prefixed field, combining both length bytes (high byte first). */
    private static byte[] readField(ByteBuffer bb) {
        int size = ((bb.get() & 0xFF) << 8) | (bb.get() & 0xFF);
        byte[] field = new byte[size];
        bb.get(field);
        return field;
    }

    /** Writes one field preceded by its two-byte length (high byte first). */
    private static void writeField(ByteBuffer bb, byte[] field) {
        bb.put((byte) ((field.length >> 8) & 0xFF));
        bb.put((byte) (field.length & 0xFF));
        bb.put(field);
    }

    /** Rejects fields whose length would be truncated by the two-byte prefix. */
    private static void requireFits(byte[] field, String what) {
        if (field.length > MAX_FIELD_LENGTH) {
            throw new IllegalArgumentException(what + " is " + field.length
                    + " bytes; at most " + MAX_FIELD_LENGTH + " fit in the length prefix");
        }
    }

    /* this method controls how the value will be displayed */
    public String getString(ByteBuffer bb) {
        return ByteBufferUtil.bytesToHex(bb);
    }

    /* throw an exception here if the data is invalid; currently accepts everything */
    public void validate(ByteBuffer bb) throws MarshalException {
    }

    /**
     * Controls how Cassandra sorts the data.
     *
     * Null and empty buffers sort first. Otherwise both buffers are decoded
     * (from duplicates, so the arguments' positions are untouched) and, when
     * both hold the same type, ordered by the element's {@code compareTo()};
     * a null element sorts before a non-null one. Values that cannot be
     * decoded, or whose types differ, are ordered by their raw bytes.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public int compare(ByteBuffer o1, ByteBuffer o2) {
        if (o1 == null) {
            return o2 == null ? 0 : -1;
        }
        if (o2 == null) {
            return 1;
        }
        if (o1.remaining() == 0) {
            return o2.remaining() == 0 ? 0 : -1;
        }
        if (o2.remaining() == 0) {
            return 1;
        }

        Any a1 = composeAny(o1.duplicate());
        Any b1 = composeAny(o2.duplicate());
        // composeAny returns null for an unknown header or an unloadable class;
        // fall back to byte order instead of failing with a NullPointerException.
        if (a1 == null || b1 == null) {
            return BytesType.bytesCompare(o1, o2);
        }

        if (a1.getType() == null) {
            return b1.getType() == null ? 0 : -1;
        }
        if (b1.getType() == null) {
            return 1;
        }
        if (!a1.getType().equals(b1.getType())) {
            return BytesType.bytesCompare(o1, o2);
        }

        if (a1.getElement() == null) {
            return b1.getElement() == null ? 0 : -1;
        }
        if (b1.getElement() == null) {
            return 1;
        }
        return ((Comparable) a1.getElement()).compareTo(b1.getElement());
    }

    /* values are handed to callers as raw buffers; a duplicate protects the original's position */
    public ByteBuffer compose(ByteBuffer bb) {
        return bb.duplicate();
    }

    public ByteBuffer decompose(ByteBuffer t) {
        return t;
    }
}
package com.jointhegrid.anytype;
public class Any {
private Class type;
private Object element;
public Any(Object aThing){
if (aThing !=null) {
type=aThing.getClass();
element=aThing;
}
}
public Any(Class c, Object aThing){
type=c;
element=aThing;
}
public Any(){
}
public Class getType() {
return type;
}
public void setType(Class type) {
this.type = type;
}
public Object getElement() {
return element;
}
public void setElement(Object element) {
this.element = element;
}
public String toString(){
StringBuilder sb = new StringBuilder();
sb.append("type:").append(type);
sb.append(" ");
sb.append("element:").append(element);
return sb.toString();
}
@Override
public int hashCode() {
int hash = 7;
hash = 67 * hash + (this.type != null ? this.type.hashCode() : 0);
hash = 67 * hash + (this.element != null ? this.element.hashCode() : 0);
return hash;
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
final Any other = (Any) obj;
if (this.type != other.type && (this.type == null || !this.type.equals(other.type))) {
return false;
}
if (this.element != other.element && (this.element == null || !this.element.equals(other.element))) {
return false;
}
return true;
}
}
The following is a unit test that demonstrates the usage of the class.
package com.jointhegrid.anytype;
public class Stuff implements Comparable{
int x;
double d;
public Stuff(){
}
@Override
public int compareTo(Object o) {
if (o instanceof Stuff){
Stuff stuff = (Stuff) o;
if (this.x == stuff.x){
return 0;
} else if (this.x>stuff.x){
return 1;
} else {
return -1;
}
}
return -1;
}
@Override
public String toString() {
return "Stuff{" + "x=" + x + "d=" + d + '}';
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
final Stuff other = (Stuff) obj;
if (this.x != other.x) {
return false;
}
if (Double.doubleToLongBits(this.d) != Double.doubleToLongBits(other.d)) {
return false;
}
return true;
}
@Override
public int hashCode() {
int hash = 7;
hash = 43 * hash + this.x;
hash = 43 * hash + (int) (Double.doubleToLongBits(this.d) ^ (Double.doubleToLongBits(this.d) >>> 32));
return hash;
}
}
package com.jointhegrid.anytype;
import java.nio.ByteBuffer;
import org.apache.cassandra.thrift.SliceRange;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import java.util.List;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.thrift.TBinaryProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.cassandra.thrift.Cassandra;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.protocol.TProtocol;
import org.apache.cassandra.contrib.utils.service.CassandraServiceDataCleaner;
import org.apache.cassandra.service.EmbeddedCassandraService;
import org.apache.cassandra.thrift.ColumnPath;
import org.apache.cassandra.thrift.Compression;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
/**
 * Integration test: starts an embedded Cassandra, creates a column family
 * whose comparator is AnyType, inserts three columns named by Stuff objects,
 * and checks that slices come back in Stuff.compareTo() order.
 */
public class AnyTypeInAction {

    static EmbeddedCassandraService ecs;

    @BeforeClass
    public static void setUpClass() throws Exception {
        new CassandraServiceDataCleaner().prepare();
        ecs = new EmbeddedCassandraService();
        ecs.start();
    }

    @AfterClass
    public static void tearDownClass() throws Exception {
    }

    @Before
    public void setUp() {
    }

    @After
    public void tearDown() {
    }

    /** Builds a Stuff with the given x; d is 90 for every instance in this test. */
    private static Stuff stuffWithX(int x) {
        Stuff made = new Stuff();
        made.x = x;
        made.d = 90;
        return made;
    }

    /** Builds a slice predicate over [start, end-of-row) limited to count columns. */
    private static SlicePredicate sliceFrom(SliceRange range, int count) {
        range.setFinish(new byte[0]);
        range.setCount(count);
        SlicePredicate predicate = new SlicePredicate();
        predicate.setSlice_range(range);
        return predicate;
    }

    @Test
    public void rangeTest2WithGSON() throws Exception {
        Cassandra.Client client = getClient();

        /* Use CQL to create the needed keyspaces and column families */
        String createKeyspace = "CREATE KEYSPACE etest WITH strategy_options:replication_factor=1 AND strategy_class = 'SimpleStrategy'";
        client.execute_cql_query(ByteBufferUtil.bytes(createKeyspace), Compression.NONE);
        client.set_keyspace("etest");
        String createColumnFamily = "create columnfamily cf"
                + " (key bytea primary key) "
                + "with comparator = 'com.jointhegrid.anytype.AnyType' ";
        client.execute_cql_query(ByteBufferUtil.bytes(createColumnFamily), Compression.NONE);

        /* create instances of stuff that are comparable */
        Stuff s = stuffWithX(5);
        Stuff t = stuffWithX(6);
        Stuff u = stuffWithX(7);
        Any s1 = new Any(GsonComparable.class, s);
        Any s2 = new Any(GsonComparable.class, t);
        Any s3 = new Any(GsonComparable.class, u);

        /* one Column object is reused; only its name changes between inserts */
        ColumnParent cp = new ColumnParent("cf");
        Column column = new Column();
        column.setValue("val".getBytes());
        column.setTimestamp(1L);
        for (Any columnName : new Any[] {s1, s2, s3}) {
            column.name = AnyType.instance.decomposeAny(columnName);
            client.insert(ByteBuffer.wrap("a".getBytes()), cp, column, ConsistencyLevel.ONE);
        }

        /* first slice: from the start of the row, at most two columns */
        SliceRange fromRowStart = new SliceRange();
        fromRowStart.setStart(new byte[0]);
        SlicePredicate firstTwo = sliceFrom(fromRowStart, 2);
        List<ColumnOrSuperColumn> cols = client.get_slice(ByteBufferUtil.bytes("a"),
                cp, firstTwo, ConsistencyLevel.ONE);

        /* ensure that the returned objects are in the order you desire */
        Assert.assertEquals(2, cols.size());
        Assert.assertEquals(s, AnyType.instance.composeAny(cols.get(0).column.name).getElement());
        Assert.assertEquals(t, AnyType.instance.composeAny(cols.get(1).column.name).getElement());

        /* second slice: start at the middle column, at most three columns */
        System.err.println("doing second slice");
        SliceRange fromMiddle = new SliceRange();
        fromMiddle.setStart(AnyType.instance.decomposeAny(s2));
        SlicePredicate fromSecond = sliceFrom(fromMiddle, 3);
        cols = client.get_slice(ByteBufferUtil.bytes("a"),
                cp, fromSecond, ConsistencyLevel.ONE);
        Assert.assertEquals(2, cols.size());
        Assert.assertEquals(s2, AnyType.instance.composeAny(cols.get(0).column.name));
        Assert.assertEquals(s3, AnyType.instance.composeAny(cols.get(1).column.name));
    }
}
How it works…
AnyType uses the GSON library to serialize Java objects into JSON strings. While the data is ultimately stored as a ByteBuffer, it does need to be converted into a Java object so the compareTo method can be called. ByteBuffers are efficient; however, it is important to remember that some methods advance their internal position while other methods do not. Underflow or Overflow exceptions typically occur when users have mismanaged the position of the ByteBuffer.
Originally Cassandra was schema-less. While it still can be used in this way attaching type information to columns or designing your own types makes Cassandra something like an object database. Types also support validation which is important because Cassandra is typically used in a write before read manner. Without validation other systems can insert bad data accidentally.
Filed under Chapter 9 Coding and Internals, Uncategorized | 62 Comments »
Writing Composite Columns from Java
Posted on August 29th, 2011
Composite columns allow a list of types to be packed into a single column. One of the benefits of composites is that they also sort properly with respect to multiple components. While higher level clients like Hector and Pycassa already have support for composite columns, support may not yet be in place for the particular client you are using. This recipe shows a simple way to write a Composite Column from Java. Since this program uses only simple byte arrays, it should be easy to port this program to another programming language.
The best source of documentation for Composite Columns sits directly in the Cassandra code at ./src/java/org/apache/cassandra/db/marshal/CompositeType.java
/*
* The encoding of a CompositeType column name should be:
* <component><component><component> …
* where <component> is:
* <length of value><value><’end-of-component’ byte>
* where <length of value> is a 2 bytes unsigned short the and the
* ‘end-of-component’ byte should always be 0 for actual column name.
*/
How to do it…
Create a file CompositeTool.java.
import java.io.ByteArrayOutputStream; import java.util.List;
/**
 * Builds the byte encoding of a CompositeType column name from its components.
 *
 * Each component is written as a two-byte unsigned length (high byte first),
 * the component bytes, and a single 0 'end-of-component' byte.
 */
public class CompositeTool {

    /** Largest component that the two-byte unsigned length prefix can describe. */
    private static final int MAX_COMPONENT_LENGTH = 0xFFFF;

    /**
     * Encodes the given components, in order, into one composite column name.
     *
     * @param b the raw bytes of each component; an empty list yields an empty array
     * @return the concatenated encoding of all components
     * @throws IllegalArgumentException if a component is longer than 65535 bytes,
     *         since its length would otherwise be silently truncated and the
     *         resulting column name would be corrupt
     */
    public static byte[] makeComposite(List<byte[]> b) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        for (byte[] component : b) {
            if (component.length > MAX_COMPONENT_LENGTH) {
                throw new IllegalArgumentException("composite component is "
                        + component.length + " bytes; the maximum is " + MAX_COMPONENT_LENGTH);
            }
            bos.write((component.length >> 8) & 0xFF);   // length, high byte
            bos.write(component.length & 0xFF);          // length, low byte
            bos.write(component, 0, component.length);   // value
            bos.write(0);                                // end-of-component
        }
        return bos.toByteArray();
    }
}
Also create CompositeToolTest.java.
package casbase;
import org.apache.cassandra.thrift.Column; import org.apache.cassandra.thrift.ColumnParent; import java.util.List; import java.util.ArrayList; import org.apache.cassandra.thrift.Compression; import java.nio.ByteBuffer; import org.apache.cassandra.thrift.ConsistencyLevel; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import static org.junit.Assert.*;
/**
 * Writes one composite column ("hello":"goodbye") through the Thrift API so
 * it can be inspected afterwards from the CLI.
 */
public class CompositeToolTest {

    public CompositeToolTest() {
    }

    /* We use CQL to create the required keyspace and column family for this test */
    @BeforeClass
    public static void setUpClass() throws Exception {
        ConnWrapper cw = dbTest.getClient();
        String[] statements = {
            "create keyspace composite_test with strategy_class "
                    + "= 'org.apache.cassandra.locator.SimpleStrategy' and strategy_options:replication_factor=1",
            "USE composite_test",
            "create columnfamily composite_test (key bytea primary key) "
                    + "with comparator = 'CompositeType(AsciiType,AsciiType)'"
        };
        for (String cql : statements) {
            cw.getClient().execute_cql_query(ByteBuffer.wrap(cql.getBytes()), Compression.NONE);
        }
    }

    @AfterClass
    public static void tearDownClass() throws Exception {
        ConnWrapper cw = dbTest.getClient();
        /*
         * The drop below is commented out so the CLI can be used to view the
         * column after the test; remove the comment markers to make the unit
         * test repeatable.
        try {
            cw.getClient().execute_cql_query(ByteBuffer.wrap
            ("drop keyspace composite_test".getBytes()), Compression.NONE);
        } catch (Exception ex) {
        }
         *
         */
    }

    @Test
    public void testAgain() throws Exception {
        /* the two components of the composite column name */
        List<byte[]> components = new ArrayList<byte[]>();
        components.add("hello".getBytes());
        components.add("goodbye".getBytes());
        byte[] compositeName = CompositeTool.makeComposite(components);

        ConnWrapper cw = dbTest.getClient();
        cw.getClient().set_keyspace("composite_test");

        ColumnParent parent = new ColumnParent("composite_test");
        Column column = new Column();
        column.setName(compositeName);
        column.setValue("again".getBytes());
        column.setTimestamp(System.currentTimeMillis());
        cw.getClient().insert(ByteBuffer.wrap("data".getBytes()), parent, column, ConsistencyLevel.ONE);
    }
}
Use the cli to ensure the composite column is set correctly.
[default@composite_test] use composite_test;
[default@composite_test] list composite_test;
Using default limit of 100
——————-
RowKey: 64617461
=> (column=hello:goodbye, value=again, timestamp=1314636481883)
1 Row Returned.
How it works…
Composite columns have a simple format; however, many of the higher level clients deceptively wrap code in abstraction. As our example shows, writing to a composite column is relatively easy.
There is more…
thanks thobbs and pcmanus
Filed under Chapter 9 Coding and Internals, Uncategorized | 64 Comments »