From 264b527f7f5235e664bf7769651a91a06933eab7 Mon Sep 17 00:00:00 2001 From: phufool Date: Fri, 20 Oct 2017 17:33:12 -0400 Subject: [PATCH] Update aerospike client to 4.0.8 --- project/Build.scala | 2 +- .../scala/com/tapad/aerospike/AsSetOps.scala | 53 +++++++++++++++---- .../com/tapad/aerospike/ClientSettings.scala | 22 ++++++-- 3 files changed, 63 insertions(+), 14 deletions(-) diff --git a/project/Build.scala b/project/Build.scala index 41a07d5..6de9c00 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -14,7 +14,7 @@ object Scaerospike extends Build { organization := "com.tapad.scaerospike" ) ++ Seq(libraryDependencies ++= Seq( - "com.aerospike" % "aerospike-client" % "3.0.30", + "com.aerospike" % "aerospike-client" % "4.0.8", "io.netty" % "netty-buffer" % "4.0.23.Final", "org.scalatest" %% "scalatest" % "2.2.6" % "test" ) diff --git a/src/main/scala/com/tapad/aerospike/AsSetOps.scala b/src/main/scala/com/tapad/aerospike/AsSetOps.scala index 510ef23..2f3b076 100644 --- a/src/main/scala/com/tapad/aerospike/AsSetOps.scala +++ b/src/main/scala/com/tapad/aerospike/AsSetOps.scala @@ -1,12 +1,14 @@ package com.tapad.aerospike +import java.util.concurrent.atomic.AtomicReference + import com.aerospike.client.async.AsyncClient -import com.aerospike.client.listener.{DeleteListener, WriteListener, RecordListener, RecordArrayListener} +import com.aerospike.client.listener._ import com.aerospike.client._ -import com.aerospike.client.policy.QueryPolicy +import com.aerospike.client.policy.{Policy, BatchPolicy, QueryPolicy} +import com.aerospike.client.query.{Statement, RecordSet} import scala.concurrent.{Promise, ExecutionContext, Future} -import com.aerospike.client.{AerospikeException, Key} import scala.collection.JavaConverters._ import scala.collection.breakOut @@ -47,10 +49,15 @@ trait AsSetOps[K, V] { */ def putBins(key: K, values: Map[String, V], customTtl: Option[Int] = None) : Future[Unit] - /** + /** * Delete a key. */ def delete(key: K, bin: String = "") : Future[Unit] + + /** + * Executes a query statement + */ + def query(statement: Statement): Future[Map[K, Record]] } /** @@ -63,6 +70,8 @@ private[aerospike] class AsSet[K, V](private final val client: AsyncClient, writeSettings: WriteSettings) (implicit keyGen: KeyGenerator[K], valueMapping: ValueMapping[V], executionContext: ExecutionContext) extends AsSetOps[K, V] { + private final val batchPolicy = readSettings.buildBatchPolicy() + private final val readPolicy = readSettings.buildReadPolicy() private final val queryPolicy = readSettings.buildQueryPolicy() private final val writePolicy = writeSettings.buildWritePolicy() @@ -87,7 +96,7 @@ private[aerospike] class AsSet[K, V](private final val client: AsyncClient, } } - private[aerospike] def query[R](policy: QueryPolicy, + private[aerospike] def query[R](policy: Policy, key: Key, bins: Seq[String] = Seq.empty, extract: Record => R): Future[R] = { val result = Promise[R]() @@ -105,7 +114,7 @@ private[aerospike] class AsSet[K, V](private final val client: AsyncClient, result.future } - private[aerospike] def multiQuery[T](policy: QueryPolicy, + private[aerospike] def multiQuery[T](policy: BatchPolicy, keys: Seq[Key], bins: Seq[String], extract: Record => T): Future[Map[K, T]] = { @@ -134,19 +143,19 @@ private[aerospike] class AsSet[K, V](private final val client: AsyncClient, } def get(key: K, bin: String = ""): Future[Option[V]] = { - query(queryPolicy, genKey(key), bins = Seq(bin), extractSingleBin(bin, _)) + query(readPolicy, genKey(key), bins = Seq(bin), extractSingleBin(bin, _)) } def getBins(key: K, bins: Seq[String]): Future[Map[String, V]] = { - query(queryPolicy, genKey(key), bins = bins, extractMultiBin) + query(readPolicy, genKey(key), bins = bins, extractMultiBin) } def multiGet(keys: Seq[K], bin: String = ""): Future[Map[K, Option[V]]] = { - multiQuery(queryPolicy, keys.map(genKey), bins = Seq(bin), extractSingleBin(bin, _)) + multiQuery(batchPolicy, keys.map(genKey), bins = Seq(bin), extractSingleBin(bin, _)) } def multiGetBins(keys: Seq[K], bins: Seq[String]): Future[Map[K, Map[String, V]]] = { - multiQuery(queryPolicy, keys.map(genKey), bins, extractMultiBin) + multiQuery(batchPolicy, keys.map(genKey), bins, extractMultiBin) } def put(key: K, value: V, bin: String = "", customTtl: Option[Int] = None): Future[Unit] = { @@ -194,6 +203,30 @@ private[aerospike] class AsSet[K, V](private final val client: AsyncClient, } result.future } + + def query(statement: Statement): Future[Map[K, Record]] = { + val result = Promise[Map[K, Record]]() + val records = scala.collection.mutable.Map[K, Record]() + val listener = new RecordSequenceListener { + def onRecord(key: Key, record: Record): Unit = { + records += (key.userKey.getObject.asInstanceOf[K] -> record) + } + + def onSuccess() { + result.success(records.toMap) + } + + def onFailure(exception: AerospikeException) { + result.failure(exception) + } + } + try { + client.query(queryPolicy, listener, statement) + } catch { + case e: Exception => result.failure(e) + } + result.future + } } diff --git a/src/main/scala/com/tapad/aerospike/ClientSettings.scala b/src/main/scala/com/tapad/aerospike/ClientSettings.scala index 4643a05..af3cced 100644 --- a/src/main/scala/com/tapad/aerospike/ClientSettings.scala +++ b/src/main/scala/com/tapad/aerospike/ClientSettings.scala @@ -1,7 +1,7 @@ package com.tapad.aerospike import com.aerospike.client.async.{MaxCommandAction, AsyncClientPolicy} -import com.aerospike.client.policy.{WritePolicy, QueryPolicy} +import com.aerospike.client.policy.{BatchPolicy, Policy, WritePolicy, QueryPolicy} import java.util.concurrent.ExecutorService /** @@ -31,9 +31,25 @@ object ClientSettings { case class ReadSettings(timeout: Int = 0, maxRetries: Int = 2, sleepBetweenRetries: Int = 500, maxConcurrentNodes: Int = 0) { + private[aerospike] def buildReadPolicy() = { + val p = new Policy() + p.setTimeout(timeout) + p.maxRetries = maxRetries + p.sleepBetweenRetries = sleepBetweenRetries + p + } + + private[aerospike] def buildBatchPolicy() = { + val p = new BatchPolicy() + p.setTimeout(timeout) + p.maxRetries = maxRetries + p.sleepBetweenRetries = sleepBetweenRetries + p + } + private[aerospike] def buildQueryPolicy() = { val p = new QueryPolicy() - p.timeout = timeout + p.setTimeout(timeout) p.maxRetries = maxRetries p.sleepBetweenRetries = sleepBetweenRetries p.maxConcurrentNodes = maxConcurrentNodes @@ -49,7 +65,7 @@ case class WriteSettings(expiration: Int = 0, timeout: Int = 0, maxRetries: Int private[aerospike] def buildWritePolicy() = { val p = new WritePolicy() p.expiration = expiration - p.timeout = timeout + p.setTimeout(timeout) p.maxRetries = maxRetries p.sleepBetweenRetries = sleepBetweenRetries p