Skip to content

Commit

Permalink
fix(core): ensure that the database connection is closed when nx exits (
Browse files Browse the repository at this point in the history
#28821)

<!-- Please make sure you have read the submission guidelines before
posting an PR -->
<!--
https://github.com/nrwl/nx/blob/master/CONTRIBUTING.md#-submitting-a-pr
-->

<!-- Please make sure that your commit message follows our format -->
<!-- Example: `fix(nx): must begin with lowercase` -->

<!-- If this is a particularly complex change or feature addition, you
can request a dedicated Nx release for this pull request branch. Mention
someone from the Nx team or the `@nrwl/nx-pipelines-reviewers` and they
will confirm if the PR warrants its own release for testing purposes,
and generate it for you if appropriate. -->

## Current Behavior
<!-- This is the behavior we have today -->

DB connections are sometimes left open.

## Expected Behavior
<!-- This is the behavior we should expect with the changes in this PR
-->

DB connections are closed when the process exits

## Related Issue(s)
<!-- Please link the issue being fixed so it gets closed when this is
merged. -->

Fixes #

(cherry picked from commit bc1a6cf)
  • Loading branch information
Cammisuli authored and FrozenPandaz committed Nov 6, 2024
1 parent 4c91df6 commit 89311c7
Show file tree
Hide file tree
Showing 9 changed files with 105 additions and 39 deletions.
9 changes: 9 additions & 0 deletions packages/nx/bin/nx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import { assertSupportedPlatform } from '../src/native/assert-supported-platform
import { performance } from 'perf_hooks';
import { setupWorkspaceContext } from '../src/utils/workspace-context';
import { daemonClient } from '../src/daemon/client/client';
import { removeDbConnections } from '../src/utils/db-connection';

function main() {
if (
Expand Down Expand Up @@ -274,4 +275,12 @@ const getLatestVersionOfNx = ((fn: () => string) => {
return () => cache || (cache = fn());
})(_getLatestVersionOfNx);

function nxCleanup() {
removeDbConnections();
}
process.on('exit', nxCleanup);
process.on('SIGINT', nxCleanup);
process.on('SIGTERM', nxCleanup);
process.on('SIGHUP', nxCleanup);

main();
3 changes: 3 additions & 0 deletions packages/nx/src/daemon/server/shutdown-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
DaemonProjectGraphError,
ProjectGraphError,
} from '../../project-graph/error-types';
import { removeDbConnections } from '../../utils/db-connection';

export const SERVER_INACTIVITY_TIMEOUT_MS = 10800000 as const; // 10800000 ms = 3 hours

Expand Down Expand Up @@ -71,6 +72,8 @@ export async function handleServerProcessTermination({
deleteDaemonJsonProcessCache();
cleanupPlugins();

removeDbConnections();

serverLogger.log(`Server stopped because: "${reason}"`);
} finally {
process.exit(0);
Expand Down
99 changes: 66 additions & 33 deletions packages/nx/src/native/db/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ use std::thread;
use std::time::Duration;
use tracing::trace;

#[derive(Default)]
pub struct NxDbConnection {
pub conn: Connection,
pub conn: Option<Connection>,
}

const MAX_RETRIES: u32 = 20;
Expand Down Expand Up @@ -53,77 +54,109 @@ macro_rules! retry_db_operation_when_busy {

impl NxDbConnection {
pub fn new(connection: Connection) -> Self {
Self { conn: connection }
Self {
conn: Some(connection),
}
}

pub fn execute<P: Params + Clone>(&self, sql: &str, params: P) -> Result<usize> {
retry_db_operation_when_busy!(self.conn.execute(sql, params.clone()))
.map_err(|e| anyhow::anyhow!("DB execute error: \"{}\", {:?}", sql, e))
if let Some(conn) = &self.conn {
retry_db_operation_when_busy!(conn.execute(sql, params.clone()))
.map_err(|e| anyhow::anyhow!("DB execute error: \"{}\", {:?}", sql, e))
} else {
anyhow::bail!("No database connection available")
}
}

pub fn execute_batch(&self, sql: &str) -> Result<()> {
retry_db_operation_when_busy!(self.conn.execute_batch(sql))
.map_err(|e| anyhow::anyhow!("DB execute batch error: \"{}\", {:?}", sql, e))
if let Some(conn) = &self.conn {
retry_db_operation_when_busy!(conn.execute_batch(sql))
.map_err(|e| anyhow::anyhow!("DB execute batch error: \"{}\", {:?}", sql, e))
} else {
anyhow::bail!("No database connection available")
}
}

pub fn prepare(&self, sql: &str) -> Result<Statement> {
retry_db_operation_when_busy!(self.conn.prepare(sql))
.map_err(|e| anyhow::anyhow!("DB prepare error: \"{}\", {:?}", sql, e))
if let Some(conn) = &self.conn {
retry_db_operation_when_busy!(conn.prepare(sql))
.map_err(|e| anyhow::anyhow!("DB prepare error: \"{}\", {:?}", sql, e))
} else {
anyhow::bail!("No database connection available")
}
}

pub fn transaction<T>(
&mut self,
transaction_operation: impl Fn(&Connection) -> rusqlite::Result<T>,
) -> Result<T> {
let transaction = retry_db_operation_when_busy!(self.conn.transaction())
.map_err(|e| anyhow::anyhow!("DB transaction error: {:?}", e))?;
if let Some(conn) = self.conn.as_mut() {
let transaction = retry_db_operation_when_busy!(conn.transaction())
.map_err(|e| anyhow::anyhow!("DB transaction error: {:?}", e))?;

let result = transaction_operation(&transaction)
.map_err(|e| anyhow::anyhow!("DB transaction operation error: {:?}", e))?;
let result = transaction_operation(&transaction)
.map_err(|e| anyhow::anyhow!("DB transaction operation error: {:?}", e))?;

transaction
.commit()
.map_err(|e| anyhow::anyhow!("DB transaction commit error: {:?}", e))?;
transaction
.commit()
.map_err(|e| anyhow::anyhow!("DB transaction commit error: {:?}", e))?;

Ok(result)
Ok(result)
} else {
anyhow::bail!("No database connection available")
}
}

pub fn query_row<T, P, F>(&self, sql: &str, params: P, f: F) -> Result<Option<T>>
where
P: Params + Clone,
F: FnOnce(&Row<'_>) -> rusqlite::Result<T> + Clone,
{
retry_db_operation_when_busy!(self
.conn
.query_row(sql, params.clone(), f.clone())
.optional())
.map_err(|e| anyhow::anyhow!("DB query error: \"{}\", {:?}", sql, e))
if let Some(conn) = &self.conn {
retry_db_operation_when_busy!(conn.query_row(sql, params.clone(), f.clone()).optional())
.map_err(|e| anyhow::anyhow!("DB query error: \"{}\", {:?}", sql, e))
} else {
anyhow::bail!("No database connection available")
}
}

pub fn close(self) -> rusqlite::Result<(), (Connection, Error)> {
self.conn
.close()
.inspect_err(|e| trace!("Error in close: {:?}", e))
pub fn close(self) -> Result<()> {
trace!("Closing database connection");
if let Some(conn) = self.conn {
conn.close()
.map_err(|(_, err)| anyhow::anyhow!("Unable to close connection: {:?}", err))
} else {
anyhow::bail!("No database connection available")
}
}

pub fn pragma_update<V>(
&self,
schema_name: Option<DatabaseName<'_>>,
pragma_name: &str,
pragma_value: V,
) -> rusqlite::Result<()>
) -> Result<()>
where
V: ToSql + Clone,
{
retry_db_operation_when_busy!(self.conn.pragma_update(
schema_name,
pragma_name,
pragma_value.clone()
))
if let Some(conn) = &self.conn {
retry_db_operation_when_busy!(conn.pragma_update(
schema_name,
pragma_name,
pragma_value.clone()
))
.map_err(|e| anyhow::anyhow!("DB pragma update error: {:?}", e))
} else {
anyhow::bail!("No database connection available")
}
}

pub fn busy_handler(&self, callback: Option<fn(i32) -> bool>) -> Result<()> {
retry_db_operation_when_busy!(self.conn.busy_handler(callback))
.map_err(|e| anyhow::anyhow!("DB busy handler error: {:?}", e))
if let Some(conn) = &self.conn {
retry_db_operation_when_busy!(conn.busy_handler(callback))
.map_err(|e| anyhow::anyhow!("DB busy handler error: {:?}", e))
} else {
anyhow::bail!("No database connection available")
}
}
}
6 changes: 3 additions & 3 deletions packages/nx/src/native/db/initialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,10 @@ pub(super) fn initialize_db(nx_version: String, db_path: &Path) -> anyhow::Resul
create_metadata_table(&mut c, &nx_version)?;
c
}
check @ _ => {
trace!("Incompatible database because: {:?}", check);
reason => {
trace!("Incompatible database because: {:?}", reason);
trace!("Disconnecting from existing incompatible database");
c.close().map_err(|(_, error)| anyhow::Error::from(error))?;
c.close()?;
trace!("Removing existing incompatible database");
remove_file(db_path)?;

Expand Down
8 changes: 7 additions & 1 deletion packages/nx/src/native/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::native::{db::connection::NxDbConnection, hasher::hash};
use napi::bindgen_prelude::External;
use std::fs::create_dir_all;
use std::path::PathBuf;
use std::process;
use std::{mem, process};
use tracing::{trace, trace_span};

#[napi]
Expand Down Expand Up @@ -40,3 +40,9 @@ pub fn connect_to_nx_db(
Ok(External::new(c))
})
}

#[napi]
pub fn close_db_connection(mut connection: External<NxDbConnection>) -> anyhow::Result<()> {
let conn = mem::take(connection.as_mut());
conn.close()
}
2 changes: 2 additions & 0 deletions packages/nx/src/native/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ export interface CachedResult {
outputsPath: string
}

export declare export function closeDbConnection(connection: ExternalObject<NxDbConnection>): void

export declare export function connectToNxDb(cacheDir: string, nxVersion: string, dbName?: string | undefined | null): ExternalObject<NxDbConnection>

export declare export function copy(src: string, dest: string): void
Expand Down
1 change: 1 addition & 0 deletions packages/nx/src/native/native-bindings.js
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,7 @@ module.exports.TaskDetails = nativeBinding.TaskDetails
module.exports.TaskHasher = nativeBinding.TaskHasher
module.exports.Watcher = nativeBinding.Watcher
module.exports.WorkspaceContext = nativeBinding.WorkspaceContext
module.exports.closeDbConnection = nativeBinding.closeDbConnection
module.exports.connectToNxDb = nativeBinding.connectToNxDb
module.exports.copy = nativeBinding.copy
module.exports.EventType = nativeBinding.EventType
Expand Down
7 changes: 6 additions & 1 deletion packages/nx/src/native/tasks/task_history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,12 @@ impl NxTaskHistory {
}

fn setup(&self) -> anyhow::Result<()> {
array::load_module(&self.db.conn)?;
array::load_module(
self.db
.conn
.as_ref()
.expect("Database connection should be available"),
)?;
self.db
.execute_batch(
"
Expand Down
9 changes: 8 additions & 1 deletion packages/nx/src/utils/db-connection.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { connectToNxDb, ExternalObject } from '../native';
import { closeDbConnection, connectToNxDb, ExternalObject } from '../native';
import { workspaceDataDirectory } from './cache-directory';
import { version as NX_VERSION } from '../../package.json';

Expand All @@ -18,6 +18,13 @@ export function getDbConnection(
return connection;
}

export function removeDbConnections() {
for (const connection of dbConnectionMap.values()) {
closeDbConnection(connection);
}
dbConnectionMap.clear();
}

function getEntryOrSet<TKey, TVal>(
map: Map<TKey, TVal>,
key: TKey,
Expand Down

0 comments on commit 89311c7

Please sign in to comment.