[Data Loss / Corruption] Loading Errors when Using Multiple Connections Sequentially #43

Open · mharradon opened this issue Sep 11, 2024 · 1 comment

@mharradon

Loading data from multiple connections sequentially can cause a variety of corrupted-load errors. I believe this is due to the logic in decoder_dict_from_ctx not differentiating connections properly when caching loaded decoder dictionaries:

// we cache the instantiated decoder dictionaries keyed by (DbConnection, dict_id)
// DbConnection would ideally be db.path() because it's the same for multiple connections
// to the same db, but that would be less robust (e.g. in-memory databases)
lazy_static::lazy_static! {
    static ref DICTS: RwLock<LruCache<(usize, i32), Arc<DecoderDictionary<'static>>>> =
        RwLock::new(LruCache::with_expiry_duration(Duration::from_secs(10)));
}

let id: i32 = ctx.get(arg_index)?;
let db = unsafe { ctx.get_connection()? }; // SAFETY: This might be unsafe depending on how the connection is used. See https://github.com/rusqlite/rusqlite/issues/643#issuecomment-640181213
let db_handle_pointer = unsafe { db.handle() } as usize; // SAFETY: We're only getting the pointer as an int, not using the raw connection
let mut dicts_write = DICTS.write().unwrap();
let entry = dicts_write.entry((db_handle_pointer, id));
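My reading of why this key is fragile (an assumption on my part, not a verified trace of the extension): cache entries live for 10 seconds, but a connection can be closed and a new one opened within that window, and the allocator may hand the new connection the same address. The (pointer, dict_id) key then collides and the new connection decodes with the old database's dictionary. Here is a minimal Python sketch of the address-reuse hazard, using plain objects and id() as stand-ins for raw connection handles:

# Hypothetical sketch of the pointer-reuse hazard; FakeConn stands in for a
# raw sqlite3 connection handle, and `cache` for the extension's DICTS map.
class FakeConn:
    pass

cache = {}

a = FakeConn()
cache[id(a)] = "decoder dict trained on a.sqlite"
del a  # "close" the connection; its memory address may now be recycled

b = FakeConn()  # CPython often reuses the just-freed address for same-sized objects
stale = cache.get(id(b))
if stale is not None:
    # the new connection would silently decode with the old database's
    # dictionary, matching the "Corrupted block detected" failure mode
    print(f"key collision: new connection inherited {stale!r}")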

Here is a reproducing snippet. There is a fair amount of boilerplate, but essentially the code creates two SQLite DBs, writes and compresses data in each, and then loads from both to validate the data.

import random
import sqlite3
import platform
from time import sleep
from math import ceil
from os import remove
import json
from os import environ
from multiprocessing import Pool, set_start_method
import logging

def gen_data():
    NUM_RUNS = 1024
    MAX_DUPS = 128
    NUM_ROWS = 1024

    full_dat = [xi 
                for xs in [[random.randint(0, 255)] * random.randint(1, MAX_DUPS) for _ in range(NUM_RUNS)]
                for xi in xs]
    split_inds = [0] + sorted([random.choice(range(len(full_dat))) + 1 for _ in range(NUM_ROWS)])
    dat = [str(full_dat[start_ind:end_ind]) for start_ind, end_ind in zip(split_inds[:-1], split_inds[1:])]
    return dat

def load_zstd(conn):
    with conn:
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute("PRAGMA auto_vacuum=1")
        conn.commit()
        conn.execute("VACUUM")
        conn.commit()

    with conn:
        conn.enable_load_extension(True)
    with conn:
        if platform.system().lower() != "darwin":
            conn.load_extension('libsqlite_zstd.so')
        else:
            conn.load_extension('libsqlite_zstd.dylib')
    with conn:
        conn.enable_load_extension(False)

def create_table_with_compression(connection):
    column_specs = [('entry_index', 'INTEGER', 'PRIMARY KEY'), 
                    ('entry_data', 'BLOB')]
    column_spec_strs = [' '.join(column_spec) for column_spec in column_specs]
    create_statement = f"CREATE TABLE t({', '.join(column_spec_strs)})"

    with connection:
        connection.execute(create_statement)
        connection.commit()

    compress_config = {
        'table': 't',
        'column': 'entry_data',
        'compression_level': 15,
        'dict_chooser': "'a'",
        'min_dict_size_bytes_for_training': 256,
        'dict_size_ratio': 0.1,
        'train_dict_samples_ratio': 100.0,
    }

    min_compress_size = ceil(1 / (compress_config['dict_size_ratio'] * compress_config['train_dict_samples_ratio']))

    with connection:
        connection.execute("SELECT zstd_enable_transparent(?)",
                           (json.dumps(compress_config),))
        connection.commit()

def insert_data_into_table(connection, data):
    with connection:
        connection.executemany(f"INSERT OR REPLACE INTO t VALUES(?, ?)",
                               enumerate(data))
        connection.commit()

def incremental_compress(connection):
    with connection:
        compression_result = connection.execute(f"SELECT zstd_incremental_maintenance(60, 1.0)")
        connection.commit()

def gen_db(filename, data):
    connection = sqlite3.connect(filename)

    load_zstd(connection)

    create_table_with_compression(connection)
    insert_data_into_table(connection, data)
    incremental_compress(connection)

    connection.close()

def load_db(filename):
    logging.warning(f"Loading {filename}")

    connection = sqlite3.connect(filename)
    load_zstd(connection)
    result = [xi[0].decode('ascii') for xi in connection.execute("SELECT entry_data FROM t")]
    connection.close()

    logging.warning(f"Loaded {filename}")
    return result

GEN_MULTIPROC = True
LOAD_MULTIPROC = False
MAX_TRIES = 64

if __name__ == "__main__":
    set_start_method('spawn')
    environ["SQLITE_ZSTD_LOG"] = "warn"
    a_dat = gen_data()
    b_dat = gen_data()

    for i in range(MAX_TRIES):
        try:
            remove('a.sqlite')
        except FileNotFoundError:
            pass
        try:
            remove('b.sqlite')
        except FileNotFoundError:
            pass

        if GEN_MULTIPROC:
            with Pool(2) as p:
                p.starmap(gen_db, 
                          (('a.sqlite', a_dat),
                           ('b.sqlite', b_dat))) 
        else:
            gen_db('a.sqlite', a_dat)
            gen_db('b.sqlite', b_dat)

        environ["SQLITE_ZSTD_LOG"] = "debug"
        if LOAD_MULTIPROC:
            with Pool(2) as p:
                a_loaded, b_loaded = p.map(load_db, ['a.sqlite',
                                                     'b.sqlite'])
        else:
            a_loaded = load_db('a.sqlite')
            b_loaded = load_db('b.sqlite')

        environ["SQLITE_ZSTD_LOG"] = "warn"

        assert(all(ai == aii for ai, aii in zip(a_dat, a_loaded)))
        assert(all(bi == bii for bi, bii in zip(b_dat, b_loaded)))

This generates output like the following (the SQLITE_ZSTD_LOG level is adjusted in the code so the key indicator is visible):

WARNING:root:Loading a.sqlite
[2024-09-11T14:58:24Z INFO  sqlite_zstd::create_extension] [sqlite-zstd] initialized
[2024-09-11T14:58:24Z DEBUG sqlite_zstd::dict_management] loading decoder dictionary 1 (should only happen once per 10s)
WARNING:root:Loaded a.sqlite
WARNING:root:Loading b.sqlite
[2024-09-11T14:58:24Z INFO  sqlite_zstd::create_extension] [sqlite-zstd] initialized
[2024-09-11T14:58:24Z DEBUG sqlite_zstd::dict_management] loading decoder dictionary 1 (should only happen once per 10s)
WARNING:root:Loaded b.sqlite
WARNING:root:Loading a.sqlite
[2024-09-11T14:58:25Z INFO  sqlite_zstd::create_extension] [sqlite-zstd] initialized
[2024-09-11T14:58:25Z DEBUG sqlite_zstd::dict_management] loading decoder dictionary 1 (should only happen once per 10s)
WARNING:root:Loaded a.sqlite
WARNING:root:Loading b.sqlite
[2024-09-11T14:58:25Z INFO  sqlite_zstd::create_extension] [sqlite-zstd] initialized
[2024-09-11T14:58:25Z DEBUG sqlite_zstd::dict_management] loading decoder dictionary 1 (should only happen once per 10s)
WARNING:root:Loaded b.sqlite
WARNING:root:Loading a.sqlite
[2024-09-11T14:58:25Z INFO  sqlite_zstd::create_extension] [sqlite-zstd] initialized
[2024-09-11T14:58:25Z DEBUG sqlite_zstd::dict_management] loading decoder dictionary 1 (should only happen once per 10s)
WARNING:root:Loaded a.sqlite
WARNING:root:Loading b.sqlite
[2024-09-11T14:58:25Z INFO  sqlite_zstd::create_extension] [sqlite-zstd] initialized
[2024-09-11T14:58:25Z DEBUG sqlite_zstd::dict_management] loading decoder dictionary 1 (should only happen once per 10s)
WARNING:root:Loaded b.sqlite
WARNING:root:Loading a.sqlite
[2024-09-11T14:58:25Z INFO  sqlite_zstd::create_extension] [sqlite-zstd] initialized
[2024-09-11T14:58:25Z DEBUG sqlite_zstd::dict_management] loading decoder dictionary 1 (should only happen once per 10s)
WARNING:root:Loaded a.sqlite
WARNING:root:Loading b.sqlite
[2024-09-11T14:58:25Z INFO  sqlite_zstd::create_extension] [sqlite-zstd] initialized
Traceback (most recent call last):
  File "/Users/mharradon/tmp/sqlite-ztd-error/error.py", line 138, in <module>
    b_loaded = load_db('b.sqlite')
  File "/Users/mharradon/tmp/sqlite-ztd-error/error.py", line 96, in load_db
    result = [xi[0].decode('ascii') for xi in connection.execute("SELECT entry_data FROM t")]
  File "/Users/mharradon/tmp/sqlite-ztd-error/error.py", line 96, in <listcomp>
    result = [xi[0].decode('ascii') for xi in connection.execute("SELECT entry_data FROM t")]
sqlite3.OperationalError: decoding

Caused by:
    Corrupted block detected

I suspect the code quoted above is the culprit, as each decoding error is missing the preceding "DEBUG sqlite_zstd::dict_management] loading decoder dictionary 1 (should only happen once per 10s)" line, which suggests a stale cached dictionary is being reused instead of reloaded for the new connection. Also, as far as I've seen, the issue can be worked around in two ways:

  1. Load databases in fully independent processes (LOAD_MULTIPROC = True in the demo script)
  2. sleep(10) between accessing different connections (see the sketch after this list)
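Workaround 2, expressed against the demo script above (my assumption being that the sleep works because it lets the 10-second LruCache expiry from the quoted Rust code evict the stale entry):

from time import sleep

a_loaded = load_db('a.sqlite')
sleep(10)  # allow the extension's 10s dictionary-cache expiry to evict a.sqlite's entry
b_loaded = load_db('b.sqlite')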

I originally observed this problem in a setting where the databases were written and read by separate systems at different times, so I don't suspect issues due to interaction between compression and decompression. I can't rule out that compression has a similar version of this issue on its own.

@mharradon (Author)

I have attempted a (very sloppy) fix in #44. Apologies, I'm no Rust author 😆. I can confirm that it fixes the above issue with both GEN_MULTIPROC and LOAD_MULTIPROC set to False, and that changes to both the encoder and the decoder parts were necessary to achieve that. Hopefully, if nothing else, that can point in the direction of a nicer fix.

Thanks for the great project!
