Skip to content

Commit

Permalink
Update aerospike client to 4.0.8
Browse files Browse the repository at this point in the history
  • Loading branch information
phufool committed Nov 8, 2017
1 parent f3418db commit 264b527
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 14 deletions.
2 changes: 1 addition & 1 deletion project/Build.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ object Scaerospike extends Build {
organization := "com.tapad.scaerospike"
) ++ Seq(libraryDependencies ++=
Seq(
"com.aerospike" % "aerospike-client" % "3.0.30",
"com.aerospike" % "aerospike-client" % "4.0.8",
"io.netty" % "netty-buffer" % "4.0.23.Final",
"org.scalatest" %% "scalatest" % "2.2.6" % "test"
)
Expand Down
53 changes: 43 additions & 10 deletions src/main/scala/com/tapad/aerospike/AsSetOps.scala
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package com.tapad.aerospike

import java.util.concurrent.atomic.AtomicReference

import com.aerospike.client.async.AsyncClient
import com.aerospike.client.listener.{DeleteListener, WriteListener, RecordListener, RecordArrayListener}
import com.aerospike.client.listener._
import com.aerospike.client._
import com.aerospike.client.policy.QueryPolicy
import com.aerospike.client.policy.{Policy, BatchPolicy, QueryPolicy}
import com.aerospike.client.query.{Statement, RecordSet}

import scala.concurrent.{Promise, ExecutionContext, Future}
import com.aerospike.client.{AerospikeException, Key}
import scala.collection.JavaConverters._
import scala.collection.breakOut

Expand Down Expand Up @@ -47,10 +49,15 @@ trait AsSetOps[K, V] {
*/
def putBins(key: K, values: Map[String, V], customTtl: Option[Int] = None) : Future[Unit]

/**
/**
* Delete a key.
*/
def delete(key: K, bin: String = "") : Future[Unit]

/**
* Executes a query statement
*/
def query(statement: Statement): Future[Map[K, Record]]
}

/**
Expand All @@ -63,6 +70,8 @@ private[aerospike] class AsSet[K, V](private final val client: AsyncClient,
writeSettings: WriteSettings)
(implicit keyGen: KeyGenerator[K], valueMapping: ValueMapping[V], executionContext: ExecutionContext) extends AsSetOps[K, V] {

private final val batchPolicy = readSettings.buildBatchPolicy()
private final val readPolicy = readSettings.buildReadPolicy()
private final val queryPolicy = readSettings.buildQueryPolicy()
private final val writePolicy = writeSettings.buildWritePolicy()

Expand All @@ -87,7 +96,7 @@ private[aerospike] class AsSet[K, V](private final val client: AsyncClient,
}
}

private[aerospike] def query[R](policy: QueryPolicy,
private[aerospike] def query[R](policy: Policy,
key: Key,
bins: Seq[String] = Seq.empty, extract: Record => R): Future[R] = {
val result = Promise[R]()
Expand All @@ -105,7 +114,7 @@ private[aerospike] class AsSet[K, V](private final val client: AsyncClient,
result.future
}

private[aerospike] def multiQuery[T](policy: QueryPolicy,
private[aerospike] def multiQuery[T](policy: BatchPolicy,
keys: Seq[Key],
bins: Seq[String],
extract: Record => T): Future[Map[K, T]] = {
Expand Down Expand Up @@ -134,19 +143,19 @@ private[aerospike] class AsSet[K, V](private final val client: AsyncClient,
}

def get(key: K, bin: String = ""): Future[Option[V]] = {
query(queryPolicy, genKey(key), bins = Seq(bin), extractSingleBin(bin, _))
query(readPolicy, genKey(key), bins = Seq(bin), extractSingleBin(bin, _))
}

def getBins(key: K, bins: Seq[String]): Future[Map[String, V]] = {
query(queryPolicy, genKey(key), bins = bins, extractMultiBin)
query(readPolicy, genKey(key), bins = bins, extractMultiBin)
}

def multiGet(keys: Seq[K], bin: String = ""): Future[Map[K, Option[V]]] = {
multiQuery(queryPolicy, keys.map(genKey), bins = Seq(bin), extractSingleBin(bin, _))
multiQuery(batchPolicy, keys.map(genKey), bins = Seq(bin), extractSingleBin(bin, _))
}

def multiGetBins(keys: Seq[K], bins: Seq[String]): Future[Map[K, Map[String, V]]] = {
multiQuery(queryPolicy, keys.map(genKey), bins, extractMultiBin)
multiQuery(batchPolicy, keys.map(genKey), bins, extractMultiBin)
}

def put(key: K, value: V, bin: String = "", customTtl: Option[Int] = None): Future[Unit] = {
Expand Down Expand Up @@ -194,6 +203,30 @@ private[aerospike] class AsSet[K, V](private final val client: AsyncClient,
}
result.future
}

def query(statement: Statement): Future[Map[K, Record]] = {
val result = Promise[Map[K, Record]]()
val records = scala.collection.mutable.Map[K, Record]()
val listener = new RecordSequenceListener {
def onRecord(key: Key, record: Record): Unit = {
records += (key.userKey.getObject.asInstanceOf[K] -> record)
}

def onSuccess() {
result.success(records.toMap)
}

def onFailure(exception: AerospikeException) {
result.failure(exception)
}
}
try {
client.query(queryPolicy, listener, statement)
} catch {
case e: Exception => result.failure(e)
}
result.future
}
}


22 changes: 19 additions & 3 deletions src/main/scala/com/tapad/aerospike/ClientSettings.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.tapad.aerospike

import com.aerospike.client.async.{MaxCommandAction, AsyncClientPolicy}
import com.aerospike.client.policy.{WritePolicy, QueryPolicy}
import com.aerospike.client.policy.{BatchPolicy, Policy, WritePolicy, QueryPolicy}
import java.util.concurrent.ExecutorService

/**
Expand Down Expand Up @@ -31,9 +31,25 @@ object ClientSettings {


case class ReadSettings(timeout: Int = 0, maxRetries: Int = 2, sleepBetweenRetries: Int = 500, maxConcurrentNodes: Int = 0) {
private[aerospike] def buildReadPolicy() = {
val p = new Policy()
p.setTimeout(timeout)
p.maxRetries = maxRetries
p.sleepBetweenRetries = sleepBetweenRetries
p
}

private[aerospike] def buildBatchPolicy() = {
val p = new BatchPolicy()
p.setTimeout(timeout)
p.maxRetries = maxRetries
p.sleepBetweenRetries = sleepBetweenRetries
p
}

private[aerospike] def buildQueryPolicy() = {
val p = new QueryPolicy()
p.timeout = timeout
p.setTimeout(timeout)
p.maxRetries = maxRetries
p.sleepBetweenRetries = sleepBetweenRetries
p.maxConcurrentNodes = maxConcurrentNodes
Expand All @@ -49,7 +65,7 @@ case class WriteSettings(expiration: Int = 0, timeout: Int = 0, maxRetries: Int
private[aerospike] def buildWritePolicy() = {
val p = new WritePolicy()
p.expiration = expiration
p.timeout = timeout
p.setTimeout(timeout)
p.maxRetries = maxRetries
p.sleepBetweenRetries = sleepBetweenRetries
p
Expand Down

0 comments on commit 264b527

Please sign in to comment.