From bbf7bbec4f4f6ca1532c0ebd487729eed4c1097a Mon Sep 17 00:00:00 2001 From: jiangph Date: Thu, 22 Aug 2019 17:42:38 +0800 Subject: [PATCH] close writer before soft-deletion --- .../clouseau/IndexCleanupService.scala | 40 +++++++++---------- .../clouseau/IndexManagerService.scala | 12 ++++++ .../com/cloudant/clouseau/IndexService.scala | 14 ++++++- .../scala/com/cloudant/clouseau/Utils.scala | 35 ++++++++++++++++ .../clouseau/IndexCleanupServiceSpec.scala | 11 +++-- 5 files changed, 85 insertions(+), 27 deletions(-) diff --git a/src/main/scala/com/cloudant/clouseau/IndexCleanupService.scala b/src/main/scala/com/cloudant/clouseau/IndexCleanupService.scala index c0ba4c7f..c6b288b7 100644 --- a/src/main/scala/com/cloudant/clouseau/IndexCleanupService.scala +++ b/src/main/scala/com/cloudant/clouseau/IndexCleanupService.scala @@ -15,9 +15,6 @@ package com.cloudant.clouseau import com.yammer.metrics.scala._ import java.io.File import java.util.regex.Pattern -import java.text.SimpleDateFormat -import java.util.Calendar -import java.util.TimeZone import org.apache.log4j.Logger import scalang._ @@ -31,19 +28,10 @@ class IndexCleanupService(ctx: ServiceContext[ConfigurationArgs]) extends Servic val dir = new File(rootDir, path) logger.info("Removing %s".format(path)) recursivelyDelete(dir) - case RenamePathMsg(dbName: String) => - val srcDir = new File(rootDir, dbName) - val sdf = new SimpleDateFormat("yyyyMMdd'.'HHmmss") - sdf.setTimeZone(TimeZone.getTimeZone("UTC")) - val sdfNow = sdf.format(Calendar.getInstance().getTime()) - // move timestamp information in dbName to end of destination path - // for example, from foo.1234567890 to foo.20170912.092828.deleted.1234567890 - val destPath = dbName.dropRight(10) + sdfNow + ".deleted." + dbName.takeRight(10) - val destDir = new File(rootDir, destPath) - logger.info("Renaming '%s' to '%s'".format( - srcDir.getAbsolutePath, destDir.getAbsolutePath) - ) - rename(srcDir, destDir) + case RenamePathMsg(path: String) => + logger.info("Soft-deleting " + path) + val pattern = Pattern.compile(path + "/([0-9a-f]+)$") + rename(rootDir, path, pattern) case CleanupDbMsg(dbName: String, activeSigs: List[String]) => logger.info("Cleaning up " + dbName) val pattern = Pattern.compile("shards/[0-9a-f]+-[0-9a-f]+/" + dbName + "\\.[0-9]+/([0-9a-f]+)$") @@ -78,13 +66,23 @@ class IndexCleanupService(ctx: ServiceContext[ConfigurationArgs]) extends Servic fileOrDir.delete } - private def rename(srcDir: File, destDir: File) { - if (!srcDir.isDirectory) { + private def rename(fileOrDir: File, path: String, includePattern: Pattern) { + if (!fileOrDir.isDirectory) { return } - if (!srcDir.renameTo(destDir)) { - logger.error("Failed to rename directory from '%s' to '%s'".format( - srcDir.getAbsolutePath, destDir.getAbsolutePath)) + for (file <- fileOrDir.listFiles) { + rename(file, path, includePattern) + } + + val m = includePattern.matcher(fileOrDir.getAbsolutePath) + if (m.find) { + logger.info("Soft-deleting index " + m.group) + call('main, ('rename, m.group)) match { + case 'ok => + 'ok + case ('error, 'not_found) => + Utils.rename(rootDir, path, fileOrDir.getName) + } } } diff --git a/src/main/scala/com/cloudant/clouseau/IndexManagerService.scala b/src/main/scala/com/cloudant/clouseau/IndexManagerService.scala index 7a838cd6..0f5143ca 100644 --- a/src/main/scala/com/cloudant/clouseau/IndexManagerService.scala +++ b/src/main/scala/com/cloudant/clouseau/IndexManagerService.scala @@ -119,6 +119,18 @@ class IndexManagerService(ctx: ServiceContext[ConfigurationArgs]) extends Servic pid ! 'delete 'ok } + case ('rename, path: String) => + lru.get(path) match { + case null => + ('error, 'not_found) + case pid: Pid => + call('pid, ('close_writer)) + val dbpath = path.substring(0, path.lastIndexOf('/')) + val sig = path.substring(path.lastIndexOf('/') + 1) + Utils.rename(rootDir, dbpath, sig) + call('pid, ('exit_with_deleted)) + 'ok + } case DiskSizeMsg(path: String) => getDiskSize(path) case 'close_lru => diff --git a/src/main/scala/com/cloudant/clouseau/IndexService.scala b/src/main/scala/com/cloudant/clouseau/IndexService.scala index 0f1c4b99..7d28eeaa 100644 --- a/src/main/scala/com/cloudant/clouseau/IndexService.scala +++ b/src/main/scala/com/cloudant/clouseau/IndexService.scala @@ -169,6 +169,12 @@ class IndexService(ctx: ServiceContext[IndexServiceArgs]) extends Service(ctx) w exit("Idle Timeout") } idle = true + case 'close_writer => + debug("Closing writer") + ctx.args.writer.close() + 'ok + case 'exit_with_deleted => + exit('deleted) case 'count_fields => countFields case 'delete => @@ -210,7 +216,6 @@ class IndexService(ctx: ServiceContext[IndexServiceArgs]) extends Service(ctx) w } override def exit(msg: Any) { - debug("Closed with reason: %.1000s".format(msg)) try { reader.close() } catch { @@ -220,7 +225,12 @@ class IndexService(ctx: ServiceContext[IndexServiceArgs]) extends Service(ctx) w ctx.args.writer.rollback() } catch { case e: AlreadyClosedException => 'ignored - case e: IOException => warn("Error while closing writer", e) + case e: IOException => + val dir = ctx.args.writer.getDirectory + if (IndexWriter.isLocked(dir)) { + IndexWriter.unlock(dir); + } + warn("Error while closing writer", e) } finally { super.exit(msg) } diff --git a/src/main/scala/com/cloudant/clouseau/Utils.scala b/src/main/scala/com/cloudant/clouseau/Utils.scala index 7b07f1ec..a58c2057 100644 --- a/src/main/scala/com/cloudant/clouseau/Utils.scala +++ b/src/main/scala/com/cloudant/clouseau/Utils.scala @@ -15,6 +15,12 @@ package com.cloudant.clouseau import org.apache.lucene.index.Term import org.apache.lucene.util.BytesRef import org.apache.lucene.util.NumericUtils +import org.apache.log4j.Logger +import java.io.File +import java.io.IOException +import java.util.Calendar +import java.util.TimeZone +import java.text.SimpleDateFormat object Utils { @@ -29,4 +35,33 @@ object Utils { new BytesRef(string) } + def rename(rootDir: File, dbName: String, sig: String) { + val logger = Logger.getLogger("clouseau.utils") + val srcParentDir = new File(rootDir, dbName) + val sdf = new SimpleDateFormat("yyyyMMdd'.'HHmmss") + sdf.setTimeZone(TimeZone.getTimeZone("UTC")) + val sdfNow = sdf.format(Calendar.getInstance().getTime()) + // move timestamp information in dbName to end of destination path + // for example, from foo.1234567890 to foo.20170912.092828.deleted.1234567890 + val destParentPath = dbName.dropRight(10) + sdfNow + ".deleted." + dbName.takeRight(10) + val destParentDir = new File(rootDir, destParentPath) + logger.info("Renaming '%s' to '%s'".format( + srcParentDir.getAbsolutePath, destParentDir.getAbsolutePath) + ) + if (!srcParentDir.isDirectory) { + return + } + if (!destParentDir.exists) { + destParentDir.mkdirs + } + + val srcDir = new File(srcParentDir, sig) + val destDir = new File(destParentDir, sig) + + if (!srcDir.renameTo(destDir)) { + logger.error("Failed to rename directory from '%s' to '%s'".format( + srcDir.getAbsolutePath, destDir.getAbsolutePath)) + } + } + } diff --git a/src/test/scala/com/cloudant/clouseau/IndexCleanupServiceSpec.scala b/src/test/scala/com/cloudant/clouseau/IndexCleanupServiceSpec.scala index 0c815bb9..91a78768 100644 --- a/src/test/scala/com/cloudant/clouseau/IndexCleanupServiceSpec.scala +++ b/src/test/scala/com/cloudant/clouseau/IndexCleanupServiceSpec.scala @@ -23,9 +23,9 @@ class IndexCleanupServiceSpec extends SpecificationWithJUnit { "the index clean-up service" should { "rename index when database is deleted" in new cleanup_service { - node.cast(service, RenamePathMsg("foo.1234567890")) must be equalTo 'ok + node.cast(cleanup, RenamePathMsg("shards/00000000-ffffffff/foo.1234567890")) must be equalTo 'ok Thread.sleep(1000) - val indexdir = new File("target", "indexes") + val indexdir = new File(new File(new File("target", "indexes"), "shards"), "00000000-ffffffff") var subdirlist = List[String]() for (file <- indexdir.listFiles if file.getName contains ".deleted") { @@ -41,7 +41,9 @@ class IndexCleanupServiceSpec extends SpecificationWithJUnit { trait cleanup_service extends RunningNode { val config = new SystemConfiguration() val args = new ConfigurationArgs(config) - val service = node.spawnService[IndexCleanupService, ConfigurationArgs](args) + val cleanup = node.spawnService[IndexCleanupService, ConfigurationArgs](args) + var manager = node.spawnService[IndexManagerService, ConfigurationArgs]('main, args) + val mbox = node.spawnMbox val dir = new File("target", "indexes") @@ -51,7 +53,8 @@ trait cleanup_service extends RunningNode { } } - val foodir = new File(new File("target", "indexes"), "foo.1234567890") + val foodir = new File(new File(new File(new File(new File("target", "indexes"), "shards"), + "00000000-ffffffff"), "foo.1234567890"), "5838a59330e52227a58019dc1b9edd6e") if (!foodir.exists) { foodir.mkdirs }