Skip to content

Commit

Permalink
Merge from cloudant/clouseau
Browse files Browse the repository at this point in the history
  • Loading branch information
jiangphcn committed Sep 4, 2018
1 parent 4dbd4ba commit 335c400
Show file tree
Hide file tree
Showing 7 changed files with 241 additions and 47 deletions.
15 changes: 14 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,19 @@
# clouseau

Clouseau uses Scalang to expose Lucene functionality via erlang-like nodes.
Expose Lucene features to erlang RPC.

## Configuration options
This guide explains the various clouseau configuration options available, and how to use them to tune clouseau performance and scalability. There are two categories of clouseau options: the first covers JVM tuning (e.g. Xmx), and the second covers the options that go into clouseau.ini.

Clouseau configuration options (as determined by the relevant role in chef-repo) are stored in `/opt/clouseau/etc/clouseau.ini` and some options (about JVM tuning) go into the command used to start and stop clouseau.

Example clouseau configuration options in clouseau.ini:
```
[clouseau]
max_indexes_open=15000
close_if_idle=true
idle_check_interval_secs=600
```

## Running a local dev cluster

Expand Down
33 changes: 25 additions & 8 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ the License.
<dependency>
<groupId>com.boundary</groupId>
<artifactId>scalang-scala_2.9.1</artifactId>
<version>0.28-cloudant2</version>
<version>0.28-cloudant5</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
Expand Down Expand Up @@ -156,17 +156,13 @@ the License.
<version>${scala.plugin.version}</version>
<configuration>
<launchers>
<launcher>
<id>clouseau</id>
<mainClass>com.cloudant.clouseau.Main</mainClass>
</launcher>
<launcher>
<id>clouseau1</id>
<mainClass>com.cloudant.clouseau.Main</mainClass>
<jvmArgs>
<jvmArg>[email protected]</jvmArg>
<jvmArg>-Dclouseau.cookie=monster</jvmArg>
<jvmArg>-Dclouseau.dir=target/clouseau1</jvmArg>
<jvmArg>-Dclouseau.dir=${basedir}/target/clouseau1</jvmArg>
</jvmArgs>
</launcher>
<launcher>
Expand All @@ -175,7 +171,7 @@ the License.
<jvmArgs>
<jvmArg>[email protected]</jvmArg>
<jvmArg>-Dclouseau.cookie=monster</jvmArg>
<jvmArg>-Dclouseau.dir=target/clouseau2</jvmArg>
<jvmArg>-Dclouseau.dir=${basedir}/target/clouseau2</jvmArg>
</jvmArgs>
</launcher>
<launcher>
Expand All @@ -184,7 +180,7 @@ the License.
<jvmArgs>
<jvmArg>[email protected]</jvmArg>
<jvmArg>-Dclouseau.cookie=monster</jvmArg>
<jvmArg>-Dclouseau.dir=target/clouseau3</jvmArg>
<jvmArg>-Dclouseau.dir=${basedir}/target/clouseau3</jvmArg>
</jvmArgs>
</launcher>
</launchers>
Expand Down Expand Up @@ -363,4 +359,25 @@ the License.
</plugins>
</reporting>

<distributionManagement>
<repository>
<id>maven.cloudant.com</id>
<name>maven.cloudant.com-releases</name>
<url>scpexe://maven.cloudant.com/var/www/domains/cloudant.com/maven/htdocs/repo/</url>
</repository>
<snapshotRepository>
<id>maven.cloudant.com</id>
<name>maven.cloudant.com-snapshots</name>
<url>scpexe://maven.cloudant.com/var/www/domains/cloudant.com/maven/htdocs/repo/</url>
</snapshotRepository>
<site>
<id>website</id>
<url>scpexe://maven.cloudant.com/var/www/domains/cloudant.com/maven/htdocs/site/</url>
</site>
</distributionManagement>

<scm>
<connection>scm:git:https://github.com/cloudant-labs/clouseau.git</connection>
</scm>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ case class OpenIndexMsg(peer: Pid, path: String, options: Any)
case class CleanupPathMsg(path: String)
case class RenamePathMsg(dbName: String)
case class CleanupDbMsg(dbName: String, activeSigs: List[String])
case class DiskSizeMsg(path: String)

case class Group1Msg(query: String, field: String, refresh: Boolean, groupSort: Any, groupOffset: Int,
groupLimit: Int)
Expand Down Expand Up @@ -88,6 +89,8 @@ object ClouseauTypeFactory extends TypeFactory {
Some(UpdateDocMsg(id, doc))
case ('delete, 2) =>
Some(DeleteDocMsg(reader.readAs[String]))
case ('disk_size, 2) =>
Some(DiskSizeMsg(reader.readAs[String]))
case ('commit, 2) =>
Some(CommitMsg(toLong(reader.readTerm)))
case ('set_update_seq, 2) =>
Expand Down
14 changes: 14 additions & 0 deletions src/main/scala/com/cloudant/clouseau/IndexManagerService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ class IndexManagerService(ctx: ServiceContext[ConfigurationArgs]) extends Servic
pid ! 'delete
'ok
}
case DiskSizeMsg(path: String) =>
getDiskSize(path)
case 'close_lru =>
lru.close()
'ok
Expand Down Expand Up @@ -146,6 +148,18 @@ class IndexManagerService(ctx: ServiceContext[ConfigurationArgs]) extends Servic
'ignored
}

/**
 * Reports the combined length of the entries directly inside an index directory.
 *
 * `path` is resolved against `rootDir`. Only the immediate entries returned by
 * `File.list()` are counted; sub-directories are not descended into.
 *
 * @param path index directory, relative to `rootDir`
 * @return `('ok, List(('disk_size, bytes)))` on success, or
 *         `('error, 'not_a_directory)` when `File.list()` returns null
 */
private def getDiskSize(path: String) = {
  val indexDir = new File(rootDir, path)
  // File.list() signals "cannot be listed" with null instead of an exception.
  Option(indexDir.list()) match {
    case Some(entryNames) =>
      val totalBytes = entryNames.map(entry => new File(indexDir, entry).length()).sum
      ('ok, List(('disk_size, totalBytes)))
    case None =>
      ('error, 'not_a_directory)
  }
}

private def replyAll(path: String, msg: Any) {
waiters.remove(path) match {
case Some(list) =>
Expand Down
108 changes: 74 additions & 34 deletions src/main/scala/com/cloudant/clouseau/IndexService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import scalang.Pid
import scalang.Reference
import com.spatial4j.core.context.SpatialContext
import com.spatial4j.core.distance.DistanceUtils
import java.util.HashSet

case class IndexServiceArgs(config: Configuration, name: String, queryParser: QueryParser, writer: IndexWriter)
case class HighlightParameters(highlighter: Highlighter, highlightFields: List[String], highlightNumber: Int, analyzers: List[Analyzer])
Expand All @@ -71,6 +72,7 @@ class IndexService(ctx: ServiceContext[IndexServiceArgs]) extends Service(ctx) w
var pendingSeq = updateSeq
var committing = false
var forceRefresh = false
var idle = true

val searchTimer = metrics.timer("searches")
val updateTimer = metrics.timer("updates")
Expand All @@ -80,45 +82,57 @@ class IndexService(ctx: ServiceContext[IndexServiceArgs]) extends Service(ctx) w
// Start committer heartbeat
val commitInterval = ctx.args.config.getInt("commit_interval_secs", 30)
sendEvery(self, 'maybe_commit, commitInterval * 1000)
val countFieldsEnabled = ctx.args.config.getBoolean("clouseau.count_fields", false)
send(self, 'count_fields)

// Check if the index is idle and optionally close it if there is no activity between
// two consecutive idle status checks.
val closeIfIdleEnabled = ctx.args.config.getBoolean("clouseau.close_if_idle", false)
val idleTimeout = ctx.args.config.getInt("clouseau.idle_check_interval_secs", 300)
if (closeIfIdleEnabled) {
sendEvery(self, 'close_if_idle, idleTimeout * 1000)
}

debug("Opened at update_seq %d".format(updateSeq))

override def handleCall(tag: (Pid, Reference), msg: Any): Any = {
idle = false
send('main, ('touch_lru, ctx.args.name))

msg match {
case request: SearchRequest =>
search(request)
case Group1Msg(query: String, field: String, refresh: Boolean, groupSort: Any, groupOffset: Int,
groupLimit: Int) =>
group1(query, field, refresh, groupSort, groupOffset, groupLimit)
case request: Group2Msg =>
group2(request)
case 'get_update_seq =>
('ok, updateSeq)
case UpdateDocMsg(id: String, doc: Document) =>
debug("Updating %s".format(id))
updateTimer.time {
ctx.args.writer.updateDocument(new Term("_id", id), doc)
}
'ok
case DeleteDocMsg(id: String) =>
debug("Deleting %s".format(id))
deleteTimer.time {
ctx.args.writer.deleteDocuments(new Term("_id", id))
}
'ok
case CommitMsg(commitSeq: Long) => // deprecated
pendingSeq = commitSeq
debug("Pending sequence is now %d".format(commitSeq))
'ok
case SetUpdateSeqMsg(newSeq: Long) =>
pendingSeq = newSeq
debug("Pending sequence is now %d".format(newSeq))
'ok
case 'info =>
('ok, getInfo)
}
internalHandleCall(tag, msg)
}

/**
 * Dispatches a synchronous call to the matching index operation.
 *
 * The `tag` argument is accepted but not used by any branch below. There is no
 * default case, so a message that matches none of the patterns raises a
 * `scala.MatchError`.
 *
 * @param tag caller pid and call reference
 * @param msg the decoded request
 * @return the value produced by the selected branch: the result of
 *         `search`/`group1`/`group2`, `'ok`, or an `('ok, value)` tuple
 */
def internalHandleCall(tag: (Pid, Reference), msg: Any): Any = msg match {
// Query requests are handed straight to the corresponding helper.
case request: SearchRequest =>
search(request)
case Group1Msg(query: String, field: String, refresh: Boolean, groupSort: Any, groupOffset: Int,
groupLimit: Int) =>
group1(query, field, refresh, groupSort, groupOffset, groupLimit)
case request: Group2Msg =>
group2(request)
case 'get_update_seq =>
('ok, updateSeq)
// Writes go through the IndexWriter, keyed on the "_id" term, and are timed.
case UpdateDocMsg(id: String, doc: Document) =>
debug("Updating %s".format(id))
updateTimer.time {
ctx.args.writer.updateDocument(new Term("_id", id), doc)
}
'ok
case DeleteDocMsg(id: String) =>
debug("Deleting %s".format(id))
deleteTimer.time {
ctx.args.writer.deleteDocuments(new Term("_id", id))
}
'ok
// Both sequence messages only record the new pending sequence here.
case CommitMsg(commitSeq: Long) => // deprecated
pendingSeq = commitSeq
debug("Pending sequence is now %d".format(commitSeq))
'ok
case SetUpdateSeqMsg(newSeq: Long) =>
pendingSeq = newSeq
debug("Pending sequence is now %d".format(newSeq))
'ok
case 'info =>
('ok, getInfo)
}

override def handleCast(msg: Any) = msg match {
Expand All @@ -139,6 +153,13 @@ class IndexService(ctx: ServiceContext[IndexServiceArgs]) extends Service(ctx) w
exit(msg)
case ('close, reason) =>
exit(reason)
case ('close_if_idle) =>
if (idle) {
exit("Idle Timeout")
}
idle = true
case 'count_fields =>
countFields
case 'delete =>
val dir = ctx.args.writer.getDirectory
ctx.args.writer.close()
Expand All @@ -157,6 +178,25 @@ class IndexService(ctx: ServiceContext[IndexServiceArgs]) extends Service(ctx) w
committing = false
}

/**
 * Logs a warning when the index contains more distinct field names than the
 * configured threshold.
 *
 * Does nothing unless `countFieldsEnabled` is true. The threshold is read from
 * `clouseau.field_count_warn_threshold` (default 5000). Field names are gathered
 * from the field infos of each reader leaf.
 */
def countFields() {
  if (countFieldsEnabled) {
    val leaves = reader.leaves().iterator()
    val warningThreshold = ctx.args.config.
      getInt("clouseau.field_count_warn_threshold", 5000)
    val fields = new HashSet[String]()
    // Both loops stop as soon as the set grows past the threshold, so the scan
    // never collects more names than are needed to decide whether to warn.
    while (leaves.hasNext() && fields.size <= warningThreshold) {
      val fieldInfoIter = leaves.next.reader().getFieldInfos().iterator()
      while (fieldInfoIter.hasNext() && fields.size <= warningThreshold) {
        fields.add(fieldInfoIter.next().name)
      }
    }
    if (fields.size > warningThreshold) {
      warn("Index has more than %d fields, ".format(warningThreshold) +
        "too many fields will lead to heap exhaustion")
    }
  }
}

override def exit(msg: Any) {
debug("Closed with reason: %.1000s".format(msg))
try {
Expand Down
5 changes: 5 additions & 0 deletions src/main/scala/com/cloudant/clouseau/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ object Main extends App {

val name = config.getString("clouseau.name", "[email protected]")
val cookie = config.getString("clouseau.cookie", "monster")
// Announce at startup whether idle indexes will be closed, and how often the
// idle check runs (clouseau.idle_check_interval_secs, default 300 seconds).
val closeIfIdleEnabled = config.getBoolean("clouseau.close_if_idle", false)
val idleTimeout = config.getInt("clouseau.idle_check_interval_secs", 300)
if (closeIfIdleEnabled) {
  logger.info("Idle timeout is enabled and will check the indexer idle status every %d seconds".format(idleTimeout))
}
val nodeconfig = NodeConfig(
typeFactory = ClouseauTypeFactory,
typeEncoder = ClouseauTypeEncoder,
Expand Down
Loading

0 comments on commit 335c400

Please sign in to comment.