Skip to content

Commit

Permalink
lmdb and sql fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
mikeTWC1984 committed Mar 22, 2024
1 parent 93629ef commit 82ffe30
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 58 deletions.
8 changes: 7 additions & 1 deletion bin/storage-cli.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,13 @@ args = args.get(); // simple hash
// copy debug flag into config (for standalone)
config.Storage.debug = args.debug;

// disable storage transactions for CLI
// indicate that you want to enable engine level transaction
// to pack multiple crud operation in one transaction
// this is default for setup/migration, to avoid use --notrx argument
if(cmd == 'install' || cmd == 'setup') config.Storage.trans = true
if(args.notrx) config.Storage.trans = false

// disable storage transactions for CLI (this is storage level transaction)
config.Storage.transactions = false;

var print = function (msg) {
Expand Down
19 changes: 17 additions & 2 deletions bin/storage-migrate.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,13 @@ var StorageMigrator = {
// massage config, override logger
config.Storage.logger = self.logger;
config.Storage.log_event_types = { all: 1 };

config.NewStorage.logger = self.logger;
config.NewStorage.log_event_types = { all: 1 };

// if engine supports (sql and lmdb) begin transaction
config.NewStorage.trans = true
if(args.notrx) config.NewStorage.trans = false

// start both standalone storage instances
async.series(
[
Expand All @@ -100,8 +103,12 @@ var StorageMigrator = {
self.logPrint( 3, "Switching to user: " + config.uid );
process.setuid( config.uid );
}

self.logPrint(2, "before test");

self.testStorage();
// self.startMigration();

}
); // series
},
Expand All @@ -110,18 +117,26 @@ var StorageMigrator = {
// test both old and new storage
var self = this;
this.logDebug(3, "Testing storage engines");
self.logPrint(2, "test begin test");

async.series(
[
function(callback) {
self.oldStorage.get('global/users', callback);
self.logPrint(2, "test old users");
},
function(callback) {
self.newStorage.put('test/test1', { "foo1": "bar1" }, function(err) {
if (err) return callback(err);
self.logPrint(2, "test new foo test");

// throw new Error('before del')

self.newStorage.delete('test/test1', function(err) {

self.logPrint(2, "test new before delete", err);
if (err) return callback(err);
self.logPrint(2, "test new delete");

callback();
});
Expand Down
2 changes: 1 addition & 1 deletion bundle
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ if [ ! -e 'package.json' ]; then
fi

if [ "$lmdb" = 1 ]; then
npm i lmdb --loglevel silent
npm i lmdb@2.9.4 --loglevel silent
fi

cd - &>/dev/null
Expand Down
49 changes: 27 additions & 22 deletions engines/Lmdb.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
// },

const Component = require("pixl-server/component");
// const Component = require("pixl-component");
const { Readable } = require('stream');
const lmdb = require("lmdb");

Expand Down Expand Up @@ -44,12 +43,19 @@ module.exports = class LmdbEngine extends Component {

async setup(callback) {

// this.db = new Level(this.dbpath, { valueEncoding: 'json' })
const self = this;

let db = lmdb.open(this.dbpath, { valueEncoding: 'json' })
this.db = db

// // if db is in use by other process we'll get an error here
// await this.db.open()

this.db = lmdb.open(this.dbpath, this.config)

if(this.storage.config.get('trans')) {
// beginTransaction
db.transactionSync(() => new Promise(resolve => self.commit = resolve));

}

callback();
}
Expand Down Expand Up @@ -77,7 +83,7 @@ module.exports = class LmdbEngine extends Component {
err.message = "Failed to store object: " + key + ": " + err;
self.logError('error', '' + err);
if (callback) callback(err);
}
}
}

putStream(key, inp, callback) {
Expand Down Expand Up @@ -109,8 +115,6 @@ module.exports = class LmdbEngine extends Component {
else {
callback(new NoSuchKeyError(key), null)
}
// this.db.doesExist(key) ?
// callback(null, { mod: 1, len: 0 }) : callback(new NoSuchKeyError(key, 'head'), null)
}
catch (err) {
err.message = "Failed to head key: " + key + ": " + err.message;
Expand All @@ -122,7 +126,7 @@ module.exports = class LmdbEngine extends Component {
get(key, callback) {
// fetch LevelDB value by given key
const self = this;

key = this.prepKey(key);

let isBinary = self.storage.isBinaryKey(key)
Expand All @@ -148,7 +152,7 @@ module.exports = class LmdbEngine extends Component {
finally {
callback(getError, val)
}

}

getStream(key, callback) {
Expand All @@ -171,36 +175,37 @@ module.exports = class LmdbEngine extends Component {
}


delete(key, callback) {
async delete(key, callback) {
// delete LevelDb key given key
var self = this;
key = this.prepKey(key);

this.logDebug(9, "Deleting LevelDb Object: " + key);

let delError = null
let delError = null;

this.db.del(key, function (err, deleted) {
if (!err && !deleted) delError = new NoSuchKeyError(key)
if (err) {
self.logError('lmdb', "Failed to delete object: " + key + ": " + err);
delError = err
}
else self.logDebug(9, "Delete complete: " + key);
try {
delError = await this.db.del(key) ? self.logDebug(9, "Delete complete: " + key) : new NoSuchKeyError(key)
}
catch (err) {
self.logDebug(9, "Delete complete: " + key);
delError = err
}

callback(delError)

callback(delError);
});
}

runMaintenance(callback) {
// run daily maintenance
callback();
}

shutdown(callback) {
async shutdown(callback) {
// shutdown storage
this.logDebug(2, "Closing Lmdb");
if (this.db) this.db.close();
if(this.commit) await this.commit();
if (this.db) await this.db.close();
callback();
}

Expand Down
65 changes: 33 additions & 32 deletions engines/SQL.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@ module.exports = class SQLEngine extends Component {
// setup SQL connection
const self = this;
const sql_config = this.config.get();

this.db = knex(sql_config)

let db = knex(sql_config);
this.db = db;

this.db.client.pool.on('createSuccess', () => {
self.logDebug(3, "SQL connected successfully")
Expand Down Expand Up @@ -124,6 +125,8 @@ module.exports = class SQLEngine extends Component {
})
}

if(self.storage.config.get('trans')) this.trx = await db.transaction()

if (!self.storage.started) return callback();

}
Expand All @@ -139,9 +142,11 @@ module.exports = class SQLEngine extends Component {
*/
async put(key, value, callback) {
// store key+value in SQL
var self = this;
const self = this;
const db = this.trx || this.db

key = this.prepKey(key);

if (this.storage.isBinaryKey(key)) {
this.logDebug(9, "Storing SQL Binary Object: " + key, '' + value.length + ' bytes');
}
Expand All @@ -153,10 +158,10 @@ module.exports = class SQLEngine extends Component {
// For oracle/mssql use MERGE statement, for other drivers use "INSEERT/ON CONFLICT" mechanism
try {
if (this.mergeStmt) { // this.client === 'mssql' || this.client === 'oracledb'
await this.db.raw(this.mergeStmt, [ key, Buffer.from(value)])
await db.raw(this.mergeStmt, [ key, Buffer.from(value)])
}
else {
await this.db(this.tableName)
await db(this.tableName)
.insert({ K: key, V: Buffer.from(value), updated: this.db.fn.now() })
.onConflict('K')
.merge()
Expand All @@ -176,8 +181,8 @@ module.exports = class SQLEngine extends Component {

putStream(key, inp, callback) {
// store key+value in SQL using read stream
var self = this;

const self = this;
// There is no common way to stream BLOBs from SQL
// So, we have to do this the RAM-hard way...

Expand All @@ -193,13 +198,15 @@ module.exports = class SQLEngine extends Component {

async head(key, callback) {
// head value by given key. Just return blob size
var self = this;
const self = this;
const db = this.trx || this.db

key = this.prepKey(key);

try {
let rows = await this.db(this.tableName).where('K', key).select([
this.db.raw(`${this.getBlobSizeFn} as len`),
this.db.raw('1 as mod')
let rows = await db(this.tableName).where('K', key).select([
db.raw(`${this.getBlobSizeFn} as len`),
db.raw('1 as mod')
])
if (rows.length > 0) {
callback(null, rows[0]);
Expand All @@ -221,13 +228,14 @@ module.exports = class SQLEngine extends Component {

async get(key, callback) {
// fetch SQL value given key
var self = this;
const self = this;
const db = this.trx || this.db
key = this.prepKey(key);

this.logDebug(9, "Fetching SQL Object: " + key);

try {
let data = (await this.db(this.tableName).where('K', key).select(["K as key", 'V as value']))[0] // expected {key: key, value: value}
let data = (await db(this.tableName).where('K', key).select(["K as key", 'V as value']))[0] // expected {key: key, value: value}
let result = (data || {}).value
if (result) {
if (self.storage.isBinaryKey(key)) {
Expand Down Expand Up @@ -328,32 +336,24 @@ module.exports = class SQLEngine extends Component {

async delete(key, callback) {
// delete SQL key given key
var self = this;
const self = this;
const db = this.trx || this.db
key = this.prepKey(key);

this.logDebug(9, "Deleting SQL Object: " + key);

let ERR
let delError

try {
let d = await this.db(this.tableName).where('K', key).del()

if (d > 0) {
self.logDebug(9, "Delete complete: " + key);
if (callback) callback(null)
} else {
ERR = new Error("Failed to fetch key: " + key + ": Not found");
ERR.code = "NoSuchKey";

}

let d = await db(this.tableName).where('K', key).del()
delError = d > 0 ? self.logDebug(9, "Delete complete: " + key) : new Error("Failed to fetch key: " + key + ": Not found");
}
catch (err) {
self.logError('sql', "Failed to delete object: " + key + ": " + err);
ERR = err
delError = err
}

if (callback) callback(ERR)
callback(delError)

}

Expand All @@ -362,10 +362,11 @@ module.exports = class SQLEngine extends Component {
callback();
}

shutdown(callback) {
async shutdown(callback) {
// shutdown storage
this.logDebug(2, "Shutting down SQL");
this.db.destroy()
if(this.trx) await this.trx.commit()
await this.db.destroy()
callback();
}

Expand Down

0 comments on commit 82ffe30

Please sign in to comment.