From 82ffe304c718af44109acc3c02e8f6cf8378f21b Mon Sep 17 00:00:00 2001 From: miketwc1984 Date: Thu, 21 Mar 2024 21:55:22 -0400 Subject: [PATCH] lmdb and sql fixes --- bin/storage-cli.js | 8 +++++- bin/storage-migrate.js | 19 ++++++++++-- bundle | 2 +- engines/Lmdb.js | 49 +++++++++++++++++-------------- engines/SQL.js | 65 +++++++++++++++++++++--------------------- 5 files changed, 85 insertions(+), 58 deletions(-) diff --git a/bin/storage-cli.js b/bin/storage-cli.js index 0888cb0..864feaf 100755 --- a/bin/storage-cli.js +++ b/bin/storage-cli.js @@ -47,7 +47,13 @@ args = args.get(); // simple hash // copy debug flag into config (for standalone) config.Storage.debug = args.debug; -// disable storage transactions for CLI +// indicate that you want to enable engine level transaction +// to pack multiple crud operation in one transaction +// this is default for setup/migration, to avoid use --notrx argument +if(cmd == 'install' || cmd == 'setup') config.Storage.trans = true +if(args.notrx) config.Storage.trans = false + +// disable storage transactions for CLI (this is storage level transaction) config.Storage.transactions = false; var print = function (msg) { diff --git a/bin/storage-migrate.js b/bin/storage-migrate.js index 1cd4dc2..c44bf3b 100644 --- a/bin/storage-migrate.js +++ b/bin/storage-migrate.js @@ -76,10 +76,13 @@ var StorageMigrator = { // massage config, override logger config.Storage.logger = self.logger; config.Storage.log_event_types = { all: 1 }; - + config.NewStorage.logger = self.logger; config.NewStorage.log_event_types = { all: 1 }; - + // if engine supports (sql and lmdb) begin transaction + config.NewStorage.trans = true + if(args.notrx) config.NewStorage.trans = false + // start both standalone storage instances async.series( [ @@ -100,8 +103,12 @@ var StorageMigrator = { self.logPrint( 3, "Switching to user: " + config.uid ); process.setuid( config.uid ); } + + self.logPrint(2, "before test"); self.testStorage(); + // self.startMigration(); + } ); // series }, @@ -110,18 +117,26 @@ var StorageMigrator = { // test both old and new storage var self = this; this.logDebug(3, "Testing storage engines"); + self.logPrint(2, "test begin test"); async.series( [ function(callback) { self.oldStorage.get('global/users', callback); + self.logPrint(2, "test old users"); }, function(callback) { self.newStorage.put('test/test1', { "foo1": "bar1" }, function(err) { if (err) return callback(err); + self.logPrint(2, "test new foo test"); + + // throw new Error('before del') self.newStorage.delete('test/test1', function(err) { + + self.logPrint(2, "test new before delete", err); if (err) return callback(err); + self.logPrint(2, "test new delete"); callback(); }); diff --git a/bundle b/bundle index c191a5c..1868e55 100755 --- a/bundle +++ b/bundle @@ -411,7 +411,7 @@ if [ ! -e 'package.json' ]; then fi if [ "$lmdb" = 1 ]; then - npm i lmdb --loglevel silent + npm i lmdb@2.9.4 --loglevel silent fi cd - &>/dev/null diff --git a/engines/Lmdb.js b/engines/Lmdb.js index f993b89..5818652 100644 --- a/engines/Lmdb.js +++ b/engines/Lmdb.js @@ -11,7 +11,6 @@ // }, const Component = require("pixl-server/component"); -// const Component = require("pixl-component"); const { Readable } = require('stream'); const lmdb = require("lmdb"); @@ -44,12 +43,19 @@ module.exports = class LmdbEngine extends Component { async setup(callback) { - // this.db = new Level(this.dbpath, { valueEncoding: 'json' }) + const self = this; + + let db = lmdb.open(this.dbpath, { valueEncoding: 'json' }) + this.db = db // // if db is in use by other process we'll get an error here // await this.db.open() - - this.db = lmdb.open(this.dbpath, this.config) + + if(this.storage.config.get('trans')) { + // beginTransaction + db.transactionSync(() => new Promise(resolve => self.commit = resolve)); + + } callback(); } @@ -77,7 +83,7 @@ module.exports = class LmdbEngine extends Component { err.message = "Failed to store object: " + key + ": " + err; self.logError('error', '' + err); if (callback) callback(err); - } + } } putStream(key, inp, callback) { @@ -109,8 +115,6 @@ module.exports = class LmdbEngine extends Component { else { callback(new NoSuchKeyError(key), null) } - // this.db.doesExist(key) ? - // callback(null, { mod: 1, len: 0 }) : callback(new NoSuchKeyError(key, 'head'), null) } catch (err) { err.message = "Failed to head key: " + key + ": " + err.message; @@ -122,7 +126,7 @@ module.exports = class LmdbEngine extends Component { get(key, callback) { // fetch LevelDB value by given key const self = this; - + key = this.prepKey(key); let isBinary = self.storage.isBinaryKey(key) @@ -148,7 +152,7 @@ module.exports = class LmdbEngine extends Component { finally { callback(getError, val) } - + } getStream(key, callback) { @@ -171,25 +175,25 @@ module.exports = class LmdbEngine extends Component { } - delete(key, callback) { + async delete(key, callback) { // delete LevelDb key given key var self = this; key = this.prepKey(key); this.logDebug(9, "Deleting LevelDb Object: " + key); - let delError = null + let delError = null; - this.db.del(key, function (err, deleted) { - if (!err && !deleted) delError = new NoSuchKeyError(key) - if (err) { - self.logError('lmdb', "Failed to delete object: " + key + ": " + err); - delError = err - } - else self.logDebug(9, "Delete complete: " + key); + try { + delError = await this.db.del(key) ? self.logDebug(9, "Delete complete: " + key) : new NoSuchKeyError(key) + } + catch (err) { + self.logDebug(9, "Delete complete: " + key); + delError = err + } + + callback(delError) - callback(delError); - }); } runMaintenance(callback) { @@ -197,10 +201,11 @@ module.exports = class LmdbEngine extends Component { callback(); } - shutdown(callback) { + async shutdown(callback) { // shutdown storage this.logDebug(2, "Closing Lmdb"); - if (this.db) this.db.close(); + if(this.commit) await this.commit(); + if (this.db) await this.db.close(); callback(); } diff --git a/engines/SQL.js b/engines/SQL.js index 3a96c75..6e30dbd 100644 --- a/engines/SQL.js +++ b/engines/SQL.js @@ -63,8 +63,9 @@ module.exports = class SQLEngine extends Component { // setup SQL connection const self = this; const sql_config = this.config.get(); - - this.db = knex(sql_config) + + let db = knex(sql_config); + this.db = db; this.db.client.pool.on('createSuccess', () => { self.logDebug(3, "SQL connected successfully") @@ -124,6 +125,8 @@ module.exports = class SQLEngine extends Component { }) } + if(self.storage.config.get('trans')) this.trx = await db.transaction() + if (!self.storage.started) return callback(); } @@ -139,9 +142,11 @@ module.exports = class SQLEngine extends Component { */ async put(key, value, callback) { // store key+value in SQL - var self = this; + const self = this; + const db = this.trx || this.db + key = this.prepKey(key); - + if (this.storage.isBinaryKey(key)) { this.logDebug(9, "Storing SQL Binary Object: " + key, '' + value.length + ' bytes'); } @@ -153,10 +158,10 @@ module.exports = class SQLEngine extends Component { // For oracle/mssql use MERGE statement, for other drivers use "INSEERT/ON CONFLICT" mechanism try { if (this.mergeStmt) { // this.client === 'mssql' || this.client === 'oracledb' - await this.db.raw(this.mergeStmt, [ key, Buffer.from(value)]) + await db.raw(this.mergeStmt, [ key, Buffer.from(value)]) } else { - await this.db(this.tableName) + await db(this.tableName) .insert({ K: key, V: Buffer.from(value), updated: this.db.fn.now() }) .onConflict('K') .merge() @@ -176,8 +181,8 @@ module.exports = class SQLEngine extends Component { putStream(key, inp, callback) { // store key+value in SQL using read stream - var self = this; - + const self = this; + // There is no common way to stream BLOBs from SQL // So, we have to do this the RAM-hard way... @@ -193,13 +198,15 @@ module.exports = class SQLEngine extends Component { async head(key, callback) { // head value by given key. Just return blob size - var self = this; + const self = this; + const db = this.trx || this.db + key = this.prepKey(key); try { - let rows = await this.db(this.tableName).where('K', key).select([ - this.db.raw(`${this.getBlobSizeFn} as len`), - this.db.raw('1 as mod') + let rows = await db(this.tableName).where('K', key).select([ + db.raw(`${this.getBlobSizeFn} as len`), + db.raw('1 as mod') ]) if (rows.length > 0) { callback(null, rows[0]); @@ -221,13 +228,14 @@ module.exports = class SQLEngine extends Component { async get(key, callback) { // fetch SQL value given key - var self = this; + const self = this; + const db = this.trx || this.db key = this.prepKey(key); - + this.logDebug(9, "Fetching SQL Object: " + key); try { - let data = (await this.db(this.tableName).where('K', key).select(["K as key", 'V as value']))[0] // expected {key: key, value: value} + let data = (await db(this.tableName).where('K', key).select(["K as key", 'V as value']))[0] // expected {key: key, value: value} let result = (data || {}).value if (result) { if (self.storage.isBinaryKey(key)) { @@ -328,32 +336,24 @@ module.exports = class SQLEngine extends Component { async delete(key, callback) { // delete SQL key given key - var self = this; + const self = this; + const db = this.trx || this.db key = this.prepKey(key); this.logDebug(9, "Deleting SQL Object: " + key); - let ERR + let delError try { - let d = await this.db(this.tableName).where('K', key).del() - - if (d > 0) { - self.logDebug(9, "Delete complete: " + key); - if (callback) callback(null) - } else { - ERR = new Error("Failed to fetch key: " + key + ": Not found"); - ERR.code = "NoSuchKey"; - - } - + let d = await db(this.tableName).where('K', key).del() + delError = d > 0 ? self.logDebug(9, "Delete complete: " + key) : new Error("Failed to fetch key: " + key + ": Not found"); } catch (err) { self.logError('sql', "Failed to delete object: " + key + ": " + err); - ERR = err + delError = err } - if (callback) callback(ERR) + callback(delError) } @@ -362,10 +362,11 @@ module.exports = class SQLEngine extends Component { callback(); } - shutdown(callback) { + async shutdown(callback) { // shutdown storage this.logDebug(2, "Shutting down SQL"); - this.db.destroy() + if(this.trx) await this.trx.commit() + await this.db.destroy() callback(); }