Skip to content

Commit

Permalink
Added support for an adjustable maximum cap on the total memory alloc…
Browse files Browse the repository at this point in the history
…ated for warning notifications from the server, ensuring the driver does not exhaust available memory resources. [Ruei Yang Huang]
  • Loading branch information
Brooke-white committed Nov 19, 2024
1 parent 97314f2 commit 9fd4e6e
Show file tree
Hide file tree
Showing 12 changed files with 310 additions and 177 deletions.
Binary file removed src/.DS_Store
Binary file not shown.
9 changes: 9 additions & 0 deletions src/main/java/com/amazon/redshift/RedshiftProperty.java
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,15 @@ public enum RedshiftProperty {
"Specifies size of ring buffer during fetching result set. Can be specified as specified size or percent of heap memory."),

/**
* Specifies size of buffer during fetching result set. Can be specified as specified size or
* percent of heap memory.
*/
MAX_WARNING_COUNT(
"maxwarningcount",
"100",
"Specifies the maximum number warning objects allowed to be kept in memory per connection, statement, and result set. (\"-1\" being unlimited)"),

/**
* Force one of
* <ul>
* <li>SSPI (Windows transparent single-sign-on)</li>
Expand Down
16 changes: 10 additions & 6 deletions src/main/java/com/amazon/redshift/core/QueryExecutorBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.amazon.redshift.core.v3.RedshiftRowsBlockingQueue;
import com.amazon.redshift.jdbc.AutoSave;
import com.amazon.redshift.jdbc.EscapeSyntaxCallMode;
import com.amazon.redshift.jdbc.RedshiftWarningWrapper;
import com.amazon.redshift.jdbc.PreferQueryMode;
import com.amazon.redshift.logger.LogLevel;
import com.amazon.redshift.logger.RedshiftLogger;
Expand Down Expand Up @@ -54,7 +55,7 @@ public abstract class QueryExecutorBase implements QueryExecutor {
// default value for server versions that don't report standard_conforming_strings
private boolean standardConformingStrings = false;

private SQLWarning warnings;
private RedshiftWarningWrapper warningChain;
private final ArrayList<RedshiftNotification> notifications = new ArrayList<RedshiftNotification>();

private final LruCache<Object, CachedQuery> statementCache;
Expand Down Expand Up @@ -230,10 +231,10 @@ public void sendQueryCancel() throws SQLException {
}

public synchronized void addWarning(SQLWarning newWarning) {
if (warnings == null) {
warnings = newWarning;
if (warningChain == null) {
warningChain = new RedshiftWarningWrapper(newWarning, properties);
} else {
warnings.setNextWarning(newWarning);
warningChain.appendWarning(newWarning);
}
}

Expand All @@ -254,8 +255,11 @@ public synchronized RedshiftNotification[] getNotifications() throws SQLExceptio

@Override
public synchronized SQLWarning getWarnings() {
SQLWarning chain = warnings;
warnings = null;
if (warningChain == null) {
return null;
}
SQLWarning chain = warningChain.getFirstWarning();
warningChain = null;
return chain;
}

Expand Down
26 changes: 17 additions & 9 deletions src/main/java/com/amazon/redshift/core/ResultHandlerBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.util.List;

import java.util.Properties;
import com.amazon.redshift.util.RedshiftException;
import com.amazon.redshift.jdbc.RedshiftWarningWrapper;
import com.amazon.redshift.core.v3.MessageLoopState;
import com.amazon.redshift.core.v3.RedshiftRowsBlockingQueue;

Expand All @@ -24,8 +26,12 @@ public class ResultHandlerBase implements ResultHandler {
private SQLException firstException;
private SQLException lastException;

private SQLWarning firstWarning;
private SQLWarning lastWarning;
private RedshiftWarningWrapper warningChain;
Properties props;

public ResultHandlerBase(Properties inProps) {
this.props = inProps;
}

@Override
public void handleResultRows(Query fromQuery, Field[] fields, List<Tuple> tuples,
Expand All @@ -43,12 +49,11 @@ public void secureProgress() {

@Override
public void handleWarning(SQLWarning warning) {
if (firstWarning == null) {
firstWarning = lastWarning = warning;
return;
if (warningChain == null) {
warningChain = new RedshiftWarningWrapper(warning, props);
} else {
warningChain.appendWarning(warning);
}
lastWarning.setNextException(warning);
lastWarning = warning;
}

@Override
Expand All @@ -75,7 +80,10 @@ public SQLException getException() {

@Override
public SQLWarning getWarning() {
return firstWarning;
if (warningChain == null) {
return null;
}
return warningChain.getFirstWarning();
}

@Override
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/com/amazon/redshift/core/SetupQueryRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.util.List;
import java.util.Properties;

/**
* Poor man's Statement &amp; ResultSet, used for initial queries while we're still initializing the
Expand All @@ -25,6 +26,12 @@ public class SetupQueryRunner {
private static class SimpleResultHandler extends ResultHandlerBase {
private List<Tuple> tuples;

SimpleResultHandler() {
// This class overrided the handleWarning method and ignore warnings.
// No need to handle property value
super(new Properties());
}

List<Tuple> getResults() {
return tuples;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,7 @@ public void doSubprotocolBegin() throws SQLException {
if(RedshiftLogger.isEnable())
logger.log(LogLevel.DEBUG, "Issuing BEGIN before fastpath or copy call.");

ResultHandler handler = new ResultHandlerBase() {
ResultHandler handler = new ResultHandlerBase(properties) {
private boolean sawBegin = false;

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class BatchResultHandler extends ResultHandlerBase {

BatchResultHandler(RedshiftStatementImpl rsStatement, Query[] queries, ParameterList[] parameterLists,
boolean expectGeneratedKeys) {
super(rsStatement.getConnectionProperties());
this.rsStatement = rsStatement;
this.queries = queries;
this.parameterLists = parameterLists;
Expand Down
34 changes: 22 additions & 12 deletions src/main/java/com/amazon/redshift/jdbc/RedshiftConnectionImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Executor;
import java.util.Properties;

public class RedshiftConnectionImpl implements BaseConnection {

Expand Down Expand Up @@ -134,6 +135,7 @@ private enum ReadOnlyBehavior {

private boolean disableIsValidQuery = false;

protected Properties props;
// Default statement prepare threshold.
protected int prepareThreshold;

Expand Down Expand Up @@ -164,7 +166,7 @@ private enum ReadOnlyBehavior {
private final boolean bindStringAsVarchar;

// Current warnings; there might be more on queryExecutor too.
private SQLWarning firstWarning = null;
private RedshiftWarningWrapper warningChain;

// Timer for scheduling TimerTasks for this connection.
// Only instantiated if a task is actually scheduled.
Expand Down Expand Up @@ -243,7 +245,7 @@ public RedshiftConnectionImpl(HostSpec[] hostSpecs,
logger.log(LogLevel.DEBUG, com.amazon.redshift.util.DriverInfo.DRIVER_FULL_NAME);
logger.log(LogLevel.DEBUG, "JVM architecture is " + (RedshiftConnectionImpl.IS_64_BIT_JVM ? "64-bit" : "32-bit"));
}

this.props = info;
RedshiftProperties.evaluateProperties(info);

m_settings = new RedshiftJDBCSettings();
Expand Down Expand Up @@ -617,10 +619,10 @@ public ReplicationProtocol getReplicationProtocol() {
*/
public void addWarning(SQLWarning warn) {
// Add the warning to the chain
if (firstWarning != null) {
firstWarning.setNextWarning(warn);
if (warningChain != null) {
warningChain.appendWarning(warn);
} else {
firstWarning = warn;
warningChain = new RedshiftWarningWrapper(warn, props);
}

}
Expand Down Expand Up @@ -863,6 +865,10 @@ public TypeInfo getTypeInfo() {
return typeCache;
}

public Properties getConnectionProperties() {
return props;
}

@Override
public void addDataType(String type, String name) {
try {
Expand Down Expand Up @@ -961,20 +967,20 @@ public String nativeSQL(String sql) throws SQLException {
public synchronized SQLWarning getWarnings() throws SQLException {
checkClosed();
SQLWarning newWarnings = queryExecutor.getWarnings(); // NB: also clears them.
if (firstWarning == null) {
firstWarning = newWarnings;
if (warningChain == null) {
warningChain = new RedshiftWarningWrapper(newWarnings, props);
} else {
firstWarning.setNextWarning(newWarnings); // Chain them on.
warningChain.appendWarning(newWarnings); // Chain them on.
}

return firstWarning;
return warningChain.getFirstWarning();
}

@Override
public synchronized void clearWarnings() throws SQLException {
checkClosed();
queryExecutor.getWarnings(); // Clear and discard.
firstWarning = null;
warningChain = null;
}

public void setDatabaseMetadataCurrentDbOnly(boolean databaseMetadataCurrentDbOnly) throws SQLException {
Expand Down Expand Up @@ -1088,15 +1094,15 @@ private void executeTransactionCommand(Query query) throws SQLException {
}

try {
getQueryExecutor().execute(query, null, new TransactionCommandHandler(), 0, 0, flags);
getQueryExecutor().execute(query, null, new TransactionCommandHandler(props), 0, 0, flags);
} catch (SQLException e) {
// Don't retry composite queries as it might get partially executed
if (query.getSubqueries() != null || !queryExecutor.willHealOnRetry(e)) {
throw e;
}
query.close();
// retry
getQueryExecutor().execute(query, null, new TransactionCommandHandler(), 0, 0, flags);
getQueryExecutor().execute(query, null, new TransactionCommandHandler(props), 0, 0, flags);
}
}

Expand Down Expand Up @@ -1353,6 +1359,10 @@ public RedshiftNotification[] getNotifications(int timeoutMillis) throws SQLExce
* Handler for transaction queries.
*/
private class TransactionCommandHandler extends ResultHandlerBase {
TransactionCommandHandler(Properties inProps) {
super(inProps);
}

public void handleCompletion() throws SQLException {
SQLWarning warning = getWarning();
if (warning != null) {
Expand Down
37 changes: 22 additions & 15 deletions src/main/java/com/amazon/redshift/jdbc/RedshiftResultSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.Properties;

public class RedshiftResultSet implements ResultSet, com.amazon.redshift.RedshiftRefCursorResultSet {

Expand Down Expand Up @@ -118,7 +119,7 @@ public class RedshiftResultSet implements ResultSet, com.amazon.redshift.Redshif
protected int currentRow = -1; // Index into 'rows' of our currrent row (0-based)
protected int rowOffset; // Offset of row 0 in the actual resultset
protected Tuple thisRow; // copy of the current result row
protected SQLWarning warnings = null; // The warning chain
protected RedshiftWarningWrapper warningChain; // The warning chain
/**
* True if the last obtained column value was SQL NULL as specified by {@link #wasNull}. The value
* is always updated by the {@link #checkResultSet} method.
Expand Down Expand Up @@ -902,8 +903,9 @@ public boolean isLast() throws SQLException {
}
}

Properties inProps = ((RedshiftConnectionImpl) connection).getConnectionProperties();
// Do the actual fetch.
connection.getQueryExecutor().fetch(cursor, new CursorResultHandler(), fetchRows, 0);
connection.getQueryExecutor().fetch(cursor, new CursorResultHandler(inProps), fetchRows, 0);

// Now prepend our one saved row and move to it.
rows.add(0, thisRow);
Expand Down Expand Up @@ -1969,12 +1971,13 @@ public class CursorResultHandler extends ResultHandlerBase {

int resultsettype;

public CursorResultHandler() {
this(0);
public CursorResultHandler(Properties inProps) {
this(0, inProps);
}

public CursorResultHandler(int resultsettype) {
this.resultsettype = resultsettype;
public CursorResultHandler(int resultsettype, Properties inProps) {
super(inProps);
this.resultsettype = resultsettype;
}

@Override
Expand Down Expand Up @@ -2113,9 +2116,9 @@ public boolean next() throws SQLException {
fetchRows = maxRows - rowOffset;
}
}
Properties inProps = ((RedshiftConnectionImpl) connection).getConnectionProperties();
// Execute the fetch and update this resultset.
connection.getQueryExecutor().fetch(cursor, new CursorResultHandler(), fetchRows, 0);
connection.getQueryExecutor().fetch(cursor, new CursorResultHandler(inProps), fetchRows, 0);

currentRow = 0;

Expand Down Expand Up @@ -2163,7 +2166,8 @@ private boolean fetchMoreInQueueFromSuspendedPortal() throws SQLException {
((RedshiftStatementImpl)statement).updateStatementCancleState(StatementCancelState.IDLE, StatementCancelState.IN_QUERY);

// Execute the fetch and update this resultset.
connection.getQueryExecutor().fetch(cursor, new CursorResultHandler(resultsettype), fetchRows, (int)rowCount);
Properties inProps = ((RedshiftConnectionImpl) connection).getConnectionProperties();
connection.getQueryExecutor().fetch(cursor, new CursorResultHandler(resultsettype, inProps), fetchRows, (int)rowCount);

// We should get a new queue
if (queueRows != null) {
Expand Down Expand Up @@ -3088,19 +3092,22 @@ public InputStream getBinaryStream(String columnName) throws SQLException {

public SQLWarning getWarnings() throws SQLException {
checkClosed();
return warnings;
if (warningChain == null) {
return null;
}
return warningChain.getFirstWarning();
}

public void clearWarnings() throws SQLException {
checkClosed();
warnings = null;
warningChain = null;
}

protected void addWarning(SQLWarning warnings) {
if (this.warnings != null) {
this.warnings.setNextWarning(warnings);
protected void addWarning(SQLWarning warnings) throws RedshiftException {
if (this.warningChain != null) {
this.warningChain.appendWarning(warnings);
} else {
this.warnings = warnings;
this.warningChain = new RedshiftWarningWrapper(warnings, ((RedshiftConnectionImpl) this.connection).getConnectionProperties());
}
}

Expand Down
Loading

0 comments on commit 9fd4e6e

Please sign in to comment.