Skip to content

A type-safe, asynchronous, composable Scala extension to the HBase Client API

License

Notifications You must be signed in to change notification settings

SvenvDam/HTypes

Repository files navigation

Build Status

HTypes

A type-safe, asynchronous, composable Scala extension to the HBase Client API

HTypes is a simple Scala extension to the Apache HBase API with no dependencies. It assumes you have the HBase client API available as a dependency in your project. It adds the following improvements to the Java API:

  • Asynchronous execution of queries.
  • Automatic conversion between objects and an encoded form.
  • Automatic conversion between common primitive types and byte arrays (and the option to add more yourself).

Installation

libraryDependencies += "com.svenvandam" %% "htypes-core" % "0.5"

Example

Basic usage

import com.svenvandam.htypes.Implicits._
import com.svenvandam.htypes.model._
import com.svenvandam.htypes.hbase._
import com.svenvandam.htypes.hbase.query.{PutUtils, GetUtils}
import org.apache.hadoop.hbase.client.{Connection, Put, Scan}
import org.apache.hadoop.hbase._


/** Example type that is encoded to and decoded from HBase rows in the snippets below.
  *
  * Declared `final` because case classes are not meant to be extended.
  *
  * @param id   identifier of the user
  * @param name name of the user
  * @param age  age of the user
  */
final case class User(id: String, name: String, age: Int)

// Typeclass instance that en/decodes a User to and from an HBase-compatible Row.
// The type is spelled out explicitly: implicit definitions without a declared type make
// implicit resolution fragile in Scala 2 and are rejected by Scala 3.

implicit val userCodec: RowCodec[User] = new RowCodec[User] {
  private val ageColumn = Column("profile", "age")
  private val nameColumn = Column("profile", "name")

  // The columns this codec reads and writes.
  def getColumns: Set[Column] = Set(
    ageColumn,
    nameColumn
  )

  // Uses the user's id as the row key and stores age and name under their columns.
  // The second CellValue argument is left as None (presumably "no explicit timestamp" — confirm).
  def encode(user: User): Row = Row(
    user.id,
    Map(
      ageColumn -> CellValue(user.age, None),
      nameColumn -> CellValue(user.name, None)
    )
  )

  // Yields None as soon as the key, one of the two cells, or one of the conversions yields None.
  def decode(row: Row): Option[User] = for {
    id                      <- row.key.as[String]
    CellValue(ageBytes, _)  <- row.values.get(ageColumn)
    age                     <- ageBytes.as[Int]
    CellValue(nameBytes, _) <- row.values.get(nameColumn)
    name                    <- nameBytes.as[String]
  } yield User(id, name, age)
}

// Obtaining a Connection is up to you; getConnection() stands for your own setup code.

val conn: Connection = getConnection()

val table = conn.getTable(TableName.valueOf("MyTable"))

// A User is automatically encoded to an HBase-compatible form and stored in a Put query.
// NOTE(review): presumably relies on the implicit userCodec being in scope — confirm.

val put = PutUtils.createFrom(User("id123", "Alice", 30))

table.put(put)

// Automatically add all columns associated with User to the scan.

val scan = new Scan().addColumnsFrom[User]

// Automatically convert the scan result to a series of time-versioned Users.

val scanResult = table.getScanner(scan).as[User]

// Walk the nested scan result and print every decoded user with its timestamp.
for {
  usersInScan       <- scanResult
  userValues        <- usersInScan
  (user, timestamp) <- userValues
} println(s"Found user $user at timestamp $timestamp")

Wrapping side-effects

// HTypes lets you execute queries in an effect wrapper
// You have to define a backend to execute your query in (Future, Task, IO, etc)
// An EffectBackend instance for Future is provided by HTypes

import com.svenvandam.htypes.effect.FutureEffectBackend
import scala.concurrent.ExecutionContext.Implicits.global
// Future appears in the type annotation below, so it has to be imported as well.
import scala.concurrent.Future

// The implicit backend decides which effect type the *Effect methods return.
implicit val asyncBackend = FutureEffectBackend()

// With the Future backend in scope, the put is executed inside a Future.
val f: Future[Unit] = table.putEffect(put)

// Runs only when the Future completes successfully.
f.foreach(_ => println("Put result!"))

Composing encoders and decoders

import com.svenvandam.htypes.Implicits._
import com.svenvandam.htypes.model._
import com.svenvandam.htypes.hbase._

/** A user together with a session identifier.
  *
  * Declared `final` because case classes are not meant to be extended.
  *
  * @param id        identifier of the user
  * @param name      name of the user
  * @param age       age of the user
  * @param SessionId session identifier attached to the user
  */
final case class UserWithSession(id: String, name: String, age: Int, SessionId: Long)

/** Example type that is encoded to and decoded from HBase rows.
  *
  * @param id   identifier of the user
  * @param name name of the user
  * @param age  age of the user
  */
final case class User(id: String, name: String, age: Int)

// Codec that en/decodes a User to and from a Row, using the "profile" column family.
val userCodec = new RowCodec[User] {
  private val ageCol = Column("profile", "age")
  private val nameCol = Column("profile", "name")

  // Raw content of the cell stored under `column`, if the row has one.
  private def bytesAt(row: Row, column: Column) =
    row.values.get(column).collect { case CellValue(bytes, _) => bytes }

  // The columns this codec reads and writes.
  def getColumns = Set(ageCol, nameCol)

  // The user's id becomes the row key; age and name are stored under their columns.
  def encode(user: User): Row = {
    val cells = Map(
      ageCol -> CellValue(user.age, None),
      nameCol -> CellValue(user.name, None)
    )
    Row(user.id, cells)
  }

  // Yields None as soon as the key, a cell, or a conversion yields None.
  def decode(row: Row): Option[User] =
    for {
      userId   <- row.key.as[String]
      userAge  <- bytesAt(row, ageCol).flatMap(_.as[Int])
      userName <- bytesAt(row, nameCol).flatMap(_.as[String])
    } yield User(userId, userName, userAge)
}


// Placeholder: supply your own way of producing a session id.
def getNewSessionId(): Long = ???

// Mapping an existing RowDecoder: every decoded User is turned into a
// UserWithSession carrying a newly obtained session id.
val userSessionDecoder: RowDecoder[UserWithSession] =
  userCodec.map(decoded =>
    UserWithSession(decoded.id, decoded.name, decoded.age, getNewSessionId())
  )

// Contramapping an existing RowEncoder: a UserWithSession is first reduced to a
// plain User, which the existing codec then encodes.
val userSessionEncoder: RowEncoder[UserWithSession] =
  userCodec.contramap[UserWithSession](session =>
    User(session.id, session.name, session.age)
  )

// combining decoders

/** Purchase information about a user.
  *
  * Declared `final` because case classes are not meant to be extended.
  *
  * @param id             identifier of the user
  * @param lastBoughtItem the item the user bought last
  */
final case class UserInfo(id: String, lastBoughtItem: Int)

/** A user's profile fields combined with the purchase information.
  *
  * @param id             identifier of the user
  * @param name           name of the user
  * @param age            age of the user
  * @param lastBoughtItem the item the user bought last
  */
final case class UserWithInfo(id: String, name: String, age: Int, lastBoughtItem: Int)

// Column holding the last bought item.
val productColumn = Column("history", "last_bought_item")

// Reads the row key as the user id and the last bought item from productColumn.
// Yields None as soon as the key, the cell, or a conversion yields None.
def decode(row: Row): Option[UserInfo] =
  row.key.as[String].flatMap { userId =>
    row.values
      .get(productColumn)
      .collect { case CellValue(rawProduct, _) => rawProduct }
      .flatMap(_.as[Int])
      .map(lastBought => UserInfo(userId, lastBought))
  }

// Decoder built from the decode function and the set of columns it needs.
val userInfoDecoder = RowDecoder(decode, Set(productColumn))

// Combines the User codec with the UserInfo decoder; the results of both are
// merged into a single UserWithInfo.
val userWithInfoDecoder: RowDecoder[UserWithInfo] = {
  val merge = (profile: User, purchases: UserInfo) =>
    UserWithInfo(profile.id, profile.name, profile.age, purchases.lastBoughtItem)

  userCodec.combine(userInfoDecoder, merge)
}

About

A type-safe, asynchronous, composable Scala extension to the HBase Client API

Topics

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages