Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

close writer before soft deletion of searchindex #25

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 19 additions & 21 deletions src/main/scala/com/cloudant/clouseau/IndexCleanupService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@ package com.cloudant.clouseau
import com.yammer.metrics.scala._
import java.io.File
import java.util.regex.Pattern
import java.text.SimpleDateFormat
import java.util.Calendar
import java.util.TimeZone
import org.apache.log4j.Logger
import scalang._

Expand All @@ -31,19 +28,10 @@ class IndexCleanupService(ctx: ServiceContext[ConfigurationArgs]) extends Servic
val dir = new File(rootDir, path)
logger.info("Removing %s".format(path))
recursivelyDelete(dir)
case RenamePathMsg(dbName: String) =>
val srcDir = new File(rootDir, dbName)
val sdf = new SimpleDateFormat("yyyyMMdd'.'HHmmss")
sdf.setTimeZone(TimeZone.getTimeZone("UTC"))
val sdfNow = sdf.format(Calendar.getInstance().getTime())
// move timestamp information in dbName to end of destination path
// for example, from foo.1234567890 to foo.20170912.092828.deleted.1234567890
val destPath = dbName.dropRight(10) + sdfNow + ".deleted." + dbName.takeRight(10)
val destDir = new File(rootDir, destPath)
logger.info("Renaming '%s' to '%s'".format(
srcDir.getAbsolutePath, destDir.getAbsolutePath)
)
rename(srcDir, destDir)
case RenamePathMsg(path: String) =>
logger.info("Soft-deleting " + path)
val pattern = Pattern.compile(path + "/([0-9a-f]+)$")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this a different pattern from the one we use in CleanupDbMsg?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For CleanupDbMsg, the value passed is the bare database name, whereas for SoftDeletePathMsg the database shard name is passed, e.g. shards/00000000-ffffffff/foo.1234567890. So the regexp is different for these two messages.

rename(rootDir, path, pattern)
case CleanupDbMsg(dbName: String, activeSigs: List[String]) =>
logger.info("Cleaning up " + dbName)
val pattern = Pattern.compile("shards/[0-9a-f]+-[0-9a-f]+/" + dbName + "\\.[0-9]+/([0-9a-f]+)$")
Expand Down Expand Up @@ -78,13 +66,23 @@ class IndexCleanupService(ctx: ServiceContext[ConfigurationArgs]) extends Servic
fileOrDir.delete
}

private def rename(srcDir: File, destDir: File) {
if (!srcDir.isDirectory) {
private def rename(fileOrDir: File, path: String, includePattern: Pattern) {
if (!fileOrDir.isDirectory) {
return
}
if (!srcDir.renameTo(destDir)) {
logger.error("Failed to rename directory from '%s' to '%s'".format(
srcDir.getAbsolutePath, destDir.getAbsolutePath))
for (file <- fileOrDir.listFiles) {
rename(file, path, includePattern)
}

val m = includePattern.matcher(fileOrDir.getAbsolutePath)
if (m.find) {
logger.info("Soft-deleting index " + m.group)
call('main, ('rename, m.group)) match {
case 'ok =>
'ok
case ('error, 'not_found) =>
Utils.rename(rootDir, path, fileOrDir.getName)
}
}
}

Expand Down
12 changes: 12 additions & 0 deletions src/main/scala/com/cloudant/clouseau/IndexManagerService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,18 @@ class IndexManagerService(ctx: ServiceContext[ConfigurationArgs]) extends Servic
pid ! 'delete
'ok
}
case ('rename, path: String) =>
  // Soft-delete the index at `path`: close its writer, move the index
  // directory aside via Utils.rename, then ask the index process to exit.
  lru.get(path) match {
    case null =>
      // Nothing is registered in the LRU for this path.
      ('error, 'not_found)
    case pid: Pid =>
      // Address the process bound above. The symbol literal 'pid would
      // instead target a process registered under the name "pid" and leave
      // the bound variable unused.
      // NOTE(review): confirm that IndexService answers 'close_writer and
      // 'exit_with_deleted as calls (and not only as plain messages).
      call(pid, 'close_writer)
      // `path` has the form "<db path>/<sig>"; split at the last '/'.
      val sep = path.lastIndexOf('/')
      val dbpath = path.substring(0, sep)
      val sig = path.substring(sep + 1)
      Utils.rename(rootDir, dbpath, sig)
      call(pid, 'exit_with_deleted)
      'ok
  }
case DiskSizeMsg(path: String) =>
getDiskSize(path)
case 'close_lru =>
Expand Down
14 changes: 12 additions & 2 deletions src/main/scala/com/cloudant/clouseau/IndexService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,12 @@ class IndexService(ctx: ServiceContext[IndexServiceArgs]) extends Service(ctx) w
exit("Idle Timeout")
}
idle = true
case 'close_writer =>
debug("Closing writer")
ctx.args.writer.close()
'ok
case 'exit_with_deleted =>
exit('deleted)
case 'count_fields =>
countFields
case 'delete =>
Expand Down Expand Up @@ -210,7 +216,6 @@ class IndexService(ctx: ServiceContext[IndexServiceArgs]) extends Service(ctx) w
}

override def exit(msg: Any) {
debug("Closed with reason: %.1000s".format(msg))
try {
reader.close()
} catch {
Expand All @@ -220,7 +225,12 @@ class IndexService(ctx: ServiceContext[IndexServiceArgs]) extends Service(ctx) w
ctx.args.writer.rollback()
} catch {
case e: AlreadyClosedException => 'ignored
case e: IOException => warn("Error while closing writer", e)
case e: IOException =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is the right pattern. rollback() is already a close() (but without the commit), so the fallback to a failed rollback() is just the isLocked/unlock portion (the finally clause).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, let me remove the close() line.

val dir = ctx.args.writer.getDirectory
if (IndexWriter.isLocked(dir)) {
IndexWriter.unlock(dir);
}
warn("Error while closing writer", e)
} finally {
super.exit(msg)
}
Expand Down
35 changes: 35 additions & 0 deletions src/main/scala/com/cloudant/clouseau/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ package com.cloudant.clouseau
import org.apache.lucene.index.Term
import org.apache.lucene.util.BytesRef
import org.apache.lucene.util.NumericUtils
import org.apache.log4j.Logger
import java.io.File
import java.io.IOException
import java.util.Calendar
import java.util.TimeZone
import java.text.SimpleDateFormat

object Utils {

Expand All @@ -29,4 +35,33 @@ object Utils {
new BytesRef(string)
}

def rename(rootDir: File, dbName: String, sig: String) {
val logger = Logger.getLogger("clouseau.utils")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The rootDir here is always the same, so don't pass it in; simply fetch it within this method. This will make it clearer which directory we are renaming.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like there is no ctx in Utils, so I'll keep this for now.

val srcParentDir = new File(rootDir, dbName)
val sdf = new SimpleDateFormat("yyyyMMdd'.'HHmmss")
sdf.setTimeZone(TimeZone.getTimeZone("UTC"))
val sdfNow = sdf.format(Calendar.getInstance().getTime())
// move timestamp information in dbName to end of destination path
// for example, from foo.1234567890 to foo.20170912.092828.deleted.1234567890
val destParentPath = dbName.dropRight(10) + sdfNow + ".deleted." + dbName.takeRight(10)
val destParentDir = new File(rootDir, destParentPath)
logger.info("Renaming '%s' to '%s'".format(
srcParentDir.getAbsolutePath, destParentDir.getAbsolutePath)
)
if (!srcParentDir.isDirectory) {
return
}
if (!destParentDir.exists) {
destParentDir.mkdirs
}

val srcDir = new File(srcParentDir, sig)
val destDir = new File(destParentDir, sig)

if (!srcDir.renameTo(destDir)) {
logger.error("Failed to rename directory from '%s' to '%s'".format(
srcDir.getAbsolutePath, destDir.getAbsolutePath))
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ class IndexCleanupServiceSpec extends SpecificationWithJUnit {
"the index clean-up service" should {

"rename index when database is deleted" in new cleanup_service {
node.cast(service, RenamePathMsg("foo.1234567890")) must be equalTo 'ok
node.cast(cleanup, RenamePathMsg("shards/00000000-ffffffff/foo.1234567890")) must be equalTo 'ok
Thread.sleep(1000)
val indexdir = new File("target", "indexes")
val indexdir = new File(new File(new File("target", "indexes"), "shards"), "00000000-ffffffff")
var subdirlist = List[String]()

for (file <- indexdir.listFiles if file.getName contains ".deleted") {
Expand All @@ -41,7 +41,9 @@ class IndexCleanupServiceSpec extends SpecificationWithJUnit {
trait cleanup_service extends RunningNode {
val config = new SystemConfiguration()
val args = new ConfigurationArgs(config)
val service = node.spawnService[IndexCleanupService, ConfigurationArgs](args)
val cleanup = node.spawnService[IndexCleanupService, ConfigurationArgs](args)
var manager = node.spawnService[IndexManagerService, ConfigurationArgs]('main, args)

val mbox = node.spawnMbox

val dir = new File("target", "indexes")
Expand All @@ -51,7 +53,8 @@ trait cleanup_service extends RunningNode {
}
}

val foodir = new File(new File("target", "indexes"), "foo.1234567890")
val foodir = new File(new File(new File(new File(new File("target", "indexes"), "shards"),
"00000000-ffffffff"), "foo.1234567890"), "5838a59330e52227a58019dc1b9edd6e")
if (!foodir.exists) {
foodir.mkdirs
}
Expand Down