Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose configuration options on the session context #87

Merged
merged 2 commits into from
Sep 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ public final long getPointer() {

abstract void doClose(long pointer) throws Exception;

// Ensure native library is loaded before any proxy object is used
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is needed when a SessionConfig is created before SessionContexts has been used.

static {
// Static initializer: runs once when this class is initialized, so the JNI loader is invoked
// before any subclass can call one of its native methods.
JNILoader.load();
}

@Override
public final void close() throws Exception {
if (closed.compareAndSet(false, true)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package org.apache.arrow.datafusion;

/** Configures options related to query execution */
@SuppressWarnings("UnusedReturnValue")
public class ExecutionOptions {
  private final SessionConfig config;

  /**
   * Create an options accessor backed by the given session configuration
   *
   * @param config the {@link SessionConfig} whose native pointer is read and updated
   */
  ExecutionOptions(SessionConfig config) {
    this.config = config;
  }

  /**
   * Get execution options related to reading Parquet data
   *
   * @return {@link ParquetOptions} instance for this config
   */
  public ParquetOptions parquet() {
    return new ParquetOptions(config);
  }

  /**
   * Get the batch size
   *
   * @return batch size
   */
  public long batchSize() {
    return SessionConfig.getExecutionOptionsBatchSize(config.getPointer());
  }

  /**
   * Set the size of batches to use when creating new data batches
   *
   * @param batchSize the batch size to set
   * @return the modified {@link ExecutionOptions} instance
   * @throws IllegalArgumentException if {@code batchSize} is negative
   */
  public ExecutionOptions withBatchSize(long batchSize) {
    // Fail fast in Java rather than handing a meaningless negative size to native code.
    requireNonNegative(batchSize, "batchSize");
    SessionConfig.setExecutionOptionsBatchSize(config.getPointer(), batchSize);
    return this;
  }

  /**
   * Get whether batch coalescing is enabled
   *
   * @return whether batch coalescing is enabled
   */
  public boolean coalesceBatches() {
    return SessionConfig.getExecutionOptionsCoalesceBatches(config.getPointer());
  }

  /**
   * Set whether to enable batch coalescing
   *
   * @param enabled whether to enable batch coalescing
   * @return the modified {@link ExecutionOptions} instance
   */
  public ExecutionOptions withCoalesceBatches(boolean enabled) {
    SessionConfig.setExecutionOptionsCoalesceBatches(config.getPointer(), enabled);
    return this;
  }

  /**
   * Get whether statistics collection is enabled
   *
   * @return whether statistics collection is enabled
   */
  public boolean collectStatistics() {
    return SessionConfig.getExecutionOptionsCollectStatistics(config.getPointer());
  }

  /**
   * Set whether to enable statistics collection
   *
   * @param enabled whether to enable statistics collection
   * @return the modified {@link ExecutionOptions} instance
   */
  public ExecutionOptions withCollectStatistics(boolean enabled) {
    SessionConfig.setExecutionOptionsCollectStatistics(config.getPointer(), enabled);
    return this;
  }

  /**
   * Get the target number of partitions
   *
   * @return number of partitions
   */
  public long targetPartitions() {
    return SessionConfig.getExecutionOptionsTargetPartitions(config.getPointer());
  }

  /**
   * Set the target number of partitions
   *
   * @param targetPartitions the number of partitions to set
   * @return the modified {@link ExecutionOptions} instance
   * @throws IllegalArgumentException if {@code targetPartitions} is negative
   */
  public ExecutionOptions withTargetPartitions(long targetPartitions) {
    // Fail fast in Java rather than handing a meaningless negative count to native code.
    requireNonNegative(targetPartitions, "targetPartitions");
    SessionConfig.setExecutionOptionsTargetPartitions(config.getPointer(), targetPartitions);
    return this;
  }

  /** Throws {@link IllegalArgumentException} when {@code value} is below zero. */
  private static void requireNonNegative(long value, String name) {
    if (value < 0) {
      throw new IllegalArgumentException(name + " cannot be negative: " + value);
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package org.apache.arrow.datafusion;

import java.util.Optional;

/** Configures options specific to reading Parquet data */
@SuppressWarnings("UnusedReturnValue")
public class ParquetOptions {
  private final SessionConfig config;

  /**
   * Create an options accessor backed by the given session configuration
   *
   * @param config the {@link SessionConfig} whose native pointer is read and updated
   */
  ParquetOptions(SessionConfig config) {
    this.config = config;
  }

  /**
   * Get whether parquet data page level metadata (Page Index) statistics are used
   *
   * @return whether using the page index is enabled
   */
  public boolean enablePageIndex() {
    return SessionConfig.getParquetOptionsEnablePageIndex(config.getPointer());
  }

  /**
   * Set whether to use parquet data page level metadata (Page Index) statistics to reduce the
   * number of rows decoded.
   *
   * @param enabled whether using the page index is enabled
   * @return the modified {@link ParquetOptions} instance
   */
  public ParquetOptions withEnablePageIndex(boolean enabled) {
    SessionConfig.setParquetOptionsEnablePageIndex(config.getPointer(), enabled);
    return this;
  }

  /**
   * Get whether pruning is enabled, meaning reading row groups will be skipped based on metadata
   *
   * @return whether pruning is enabled
   */
  public boolean pruning() {
    return SessionConfig.getParquetOptionsPruning(config.getPointer());
  }

  /**
   * Set whether pruning is enabled, meaning reading row groups will be skipped based on metadata
   *
   * @param enabled whether to enable pruning
   * @return the modified {@link ParquetOptions} instance
   */
  public ParquetOptions withPruning(boolean enabled) {
    SessionConfig.setParquetOptionsPruning(config.getPointer(), enabled);
    return this;
  }

  /**
   * Get whether file metadata is skipped, to avoid schema conflicts
   *
   * @return whether metadata is skipped
   */
  public boolean skipMetadata() {
    return SessionConfig.getParquetOptionsSkipMetadata(config.getPointer());
  }

  /**
   * Set whether file metadata is skipped, to avoid schema conflicts
   *
   * @param enabled whether to skip metadata
   * @return the modified {@link ParquetOptions} instance
   */
  public ParquetOptions withSkipMetadata(boolean enabled) {
    SessionConfig.setParquetOptionsSkipMetadata(config.getPointer(), enabled);
    return this;
  }

  /**
   * Get the metadata size hint
   *
   * @return metadata size hint value, or an empty {@link Optional} when the native layer reports
   *     a negative value
   */
  public Optional<Long> metadataSizeHint() {
    long sizeHint = SessionConfig.getParquetOptionsMetadataSizeHint(config.getPointer());
    // A negative value from the native call is mapped to "no hint configured".
    return sizeHint < 0 ? Optional.empty() : Optional.of(sizeHint);
  }

  /**
   * Set the metadata size hint, which is used to attempt to read the full metadata at once rather
   * than needing one read to get the metadata size and then a second read for the metadata itself.
   *
   * @param metadataSizeHint the metadata size hint; an empty {@link Optional} is passed to the
   *     native layer as {@code -1}
   * @return the modified {@link ParquetOptions} instance
   * @throws IllegalArgumentException if a hint is present and negative
   */
  public ParquetOptions withMetadataSizeHint(Optional<Long> metadataSizeHint) {
    // -1 is the sentinel handed to the native setter when no hint is supplied.
    long value = -1L;
    if (metadataSizeHint.isPresent()) {
      value = metadataSizeHint.get();
      if (value < 0) {
        // Negative values would collide with the "no hint" sentinel, so reject them explicitly.
        throw new IllegalArgumentException("metadataSizeHint cannot be negative");
      }
    }
    SessionConfig.setParquetOptionsMetadataSizeHint(config.getPointer(), value);
    return this;
  }

  /**
   * Get whether filter pushdown is enabled, so filters are applied during parquet decoding
   *
   * @return whether filter pushdown is enabled
   */
  public boolean pushdownFilters() {
    return SessionConfig.getParquetOptionsPushdownFilters(config.getPointer());
  }

  /**
   * Set whether filter pushdown is enabled, so filters are applied during parquet decoding
   *
   * @param enabled whether to pushdown filters
   * @return the modified {@link ParquetOptions} instance
   */
  public ParquetOptions withPushdownFilters(boolean enabled) {
    SessionConfig.setParquetOptionsPushdownFilters(config.getPointer(), enabled);
    return this;
  }

  /**
   * Get whether filter reordering is enabled to minimize evaluation cost
   *
   * @return whether filter reordering is enabled
   */
  public boolean reorderFilters() {
    return SessionConfig.getParquetOptionsReorderFilters(config.getPointer());
  }

  /**
   * Set whether filter reordering is enabled to minimize evaluation cost
   *
   * @param enabled whether to reorder filters
   * @return the modified {@link ParquetOptions} instance
   */
  public ParquetOptions withReorderFilters(boolean enabled) {
    SessionConfig.setParquetOptionsReorderFilters(config.getPointer(), enabled);
    return this;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package org.apache.arrow.datafusion;

import java.util.function.Consumer;

/** Configuration for creating a {@link SessionContext} using {@link SessionContexts#withConfig} */
public class SessionConfig extends AbstractProxy implements AutoCloseable {
  /** Create a new default {@link SessionConfig} */
  public SessionConfig() {
    super(create());
  }

  /**
   * Get options related to query execution
   *
   * @return {@link ExecutionOptions} instance for this config
   */
  public ExecutionOptions executionOptions() {
    return new ExecutionOptions(this);
  }

  /**
   * Get options specific to parsing SQL queries
   *
   * @return {@link SqlParserOptions} instance for this config
   */
  public SqlParserOptions sqlParserOptions() {
    return new SqlParserOptions(this);
  }

  /**
   * Modify this session configuration and then return it, to simplify use in a try-with-resources
   * statement
   *
   * @param configurationCallback Callback used to update the configuration
   * @return This {@link SessionConfig} instance after being updated
   */
  public SessionConfig withConfiguration(Consumer<SessionConfig> configurationCallback) {
    configurationCallback.accept(this);
    return this;
  }

  /**
   * Release the native configuration by passing its pointer to the native destroy function
   *
   * @param pointer native pointer obtained from {@link #create()}
   */
  @Override
  void doClose(long pointer) {
    destroy(pointer);
  }

  // Lifecycle native methods: create() supplies the pointer handed to the AbstractProxy
  // constructor, destroy() is invoked with that pointer from doClose.

  private static native long create();

  private static native void destroy(long pointer);

  // All accessors below take the native pointer of a SessionConfig as their first argument.

  // ExecutionOptions native methods

  static native long getExecutionOptionsBatchSize(long pointer);

  static native void setExecutionOptionsBatchSize(long pointer, long batchSize);

  static native boolean getExecutionOptionsCoalesceBatches(long pointer);

  static native void setExecutionOptionsCoalesceBatches(long pointer, boolean enabled);

  static native boolean getExecutionOptionsCollectStatistics(long pointer);

  static native void setExecutionOptionsCollectStatistics(long pointer, boolean enabled);

  static native long getExecutionOptionsTargetPartitions(long pointer);

  static native void setExecutionOptionsTargetPartitions(long pointer, long targetPartitions);

  // ParquetOptions native methods

  static native boolean getParquetOptionsEnablePageIndex(long pointer);

  static native void setParquetOptionsEnablePageIndex(long pointer, boolean enabled);

  static native boolean getParquetOptionsPruning(long pointer);

  static native void setParquetOptionsPruning(long pointer, boolean enabled);

  static native boolean getParquetOptionsSkipMetadata(long pointer);

  static native void setParquetOptionsSkipMetadata(long pointer, boolean enabled);

  static native long getParquetOptionsMetadataSizeHint(long pointer);

  static native void setParquetOptionsMetadataSizeHint(long pointer, long value);

  static native boolean getParquetOptionsPushdownFilters(long pointer);

  static native void setParquetOptionsPushdownFilters(long pointer, boolean enabled);

  static native boolean getParquetOptionsReorderFilters(long pointer);

  static native void setParquetOptionsReorderFilters(long pointer, boolean enabled);

  // SqlParserOptions native methods

  static native boolean getSqlParserOptionsParseFloatAsDecimal(long pointer);

  static native void setSqlParserOptionsParseFloatAsDecimal(long pointer, boolean enabled);

  static native boolean getSqlParserOptionsEnableIdentNormalization(long pointer);

  static native void setSqlParserOptionsEnableIdentNormalization(long pointer, boolean enabled);

  static native String getSqlParserOptionsDialect(long pointer);

  static native void setSqlParserOptionsDialect(long pointer, String dialect);
}
Loading