Skip to content

Commit

Permalink
Support for Avro files.
Browse files Browse the repository at this point in the history
  • Loading branch information
piotrczarnas committed Jan 11, 2025
1 parent 27986c6 commit 08a3c71
Show file tree
Hide file tree
Showing 19 changed files with 323 additions and 8 deletions.
8 changes: 4 additions & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# 1.10.1

* Small UI fixes to open pages directly from a URL.
* Fix problems when installing on Windows using pip, when Python was installed from Windows Store and uses a deeply nested folder structure
# 1.11.0

* Fix problems when importing files from a non-existent folder
* Upgrade DuckDB to 1.1.3
* Support Avro files
2 changes: 1 addition & 1 deletion dqops/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
<tablesaw.version>0.43.1</tablesaw.version>
<license.licenseName>apache_v2</license.licenseName>
<npm.build.script>build</npm.build.script>
<duckdb.version>1.1.2</duckdb.version>
<duckdb.version>1.1.3</duckdb.version>
</properties>

<licenses>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ const DatabaseConnection = ({
nameOfDatabase === 'CSV' ||
nameOfDatabase === 'Parquet' ||
nameOfDatabase === 'JSON' ||
nameOfDatabase === 'Avro' ||
nameOfDatabase === 'Iceberg' ||
nameOfDatabase === 'Delta Lake'
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@ import {
DuckdbParametersSpecFilesFormatTypeEnum,
IcebergFileFormatSpec,
JsonFileFormatSpec,
ParquetFileFormatSpec
ParquetFileFormatSpec,
AvroFileFormatSpec
} from '../../api';
import SectionWrapper from '../Dashboard/SectionWrapper';
import Select from '../Select';
import CsvFormatConfiguration from './FormatsConfiguration/CsvFormatConfiguration';
import IcebergFormatConfiguration from './FormatsConfiguration/IcebergFormatConfiguration';
import JsonFormatConfiguration from './FormatsConfiguration/JsonFormatConfiguration';
import ParquetFormatConfiguration from './FormatsConfiguration/ParquetFormatConfiguration';
import AvroFormatConfiguration from './FormatsConfiguration/AvroFormatConfiguration';
import { TConfiguration } from './TConfiguration';

type TFileFormatConfigurationProps = {
Expand All @@ -37,6 +39,10 @@ const sourceFilesTypeOptions = [
label: 'Parquet',
value: DuckdbParametersSpecFilesFormatTypeEnum.parquet
},
{
label: 'Avro',
value: DuckdbParametersSpecFilesFormatTypeEnum.avro
},
{
label: 'Iceberg',
value: DuckdbParametersSpecFilesFormatTypeEnum.iceberg
Expand Down Expand Up @@ -76,6 +82,12 @@ export default function FileFormatConfiguration({
return fileFormatType === DuckdbParametersSpecFilesFormatTypeEnum.parquet;
}

/**
 * Type guard that narrows the configuration to the Avro file format spec.
 *
 * The decision is based only on the currently selected `fileFormatType`;
 * the `config` object itself is not inspected.
 */
function isAvroFileFormatSpec(
  config: TConfiguration
): config is AvroFileFormatSpec {
  const avroSelected =
    DuckdbParametersSpecFilesFormatTypeEnum.avro === fileFormatType;
  return avroSelected;
}

function isIcebergFileFormatSpec(
config: TConfiguration
): config is IcebergFileFormatSpec {
Expand Down Expand Up @@ -108,6 +120,14 @@ export default function FileFormatConfiguration({
/>
) : null;
}
case DuckdbParametersSpecFilesFormatTypeEnum.avro: {
return isAvroFileFormatSpec(configuration) ? (
<AvroFormatConfiguration
configuration={configuration}
onChangeConfiguration={onChangeConfiguration}
/>
) : null;
}
case DuckdbParametersSpecFilesFormatTypeEnum.iceberg: {
return isIcebergFileFormatSpec(configuration) ? (
<IcebergFormatConfiguration
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import React, { useMemo } from 'react';
import { AvroFileFormatSpec } from '../../../api';
import { TConfigurationItemRowBoolean } from './RowItem/TConfigurationItemRowBoolean';
import FormatConfigurationRenderer from '../FormatConfigurationRenderer';

/**
 * Props of the Avro file format configuration form.
 */
type TAvroConfigurationProps = {
  /** Avro file format settings currently shown in the form. */
  configuration: AvroFileFormatSpec;
  /** Called with the changed subset of Avro settings when the user edits an option. */
  onChangeConfiguration: (params: Partial<AvroFileFormatSpec>) => void;
};

/**
 * Renders the Avro-specific file format options.
 *
 * The form exposes one boolean option, "Filename", bound to
 * `configuration.filename`. There are no string options for Avro, so an
 * empty list is passed to the shared renderer.
 *
 * @param configuration Avro file format settings to display.
 * @param onChangeConfiguration Callback receiving a partial update of the settings.
 */
export default function AvroFormatConfiguration({
  configuration,
  onChangeConfiguration
}: TAvroConfigurationProps) {
  // The memoized rows capture onChangeConfiguration, so it must be listed as a
  // dependency; otherwise a replaced callback would never be picked up.
  const avroConfigurationBooleans: TConfigurationItemRowBoolean[] = useMemo(
    () => [
      {
        label: 'Filename',
        value: configuration?.filename,
        onChange: (value) => onChangeConfiguration({ filename: value })
      }
    ],
    [configuration, onChangeConfiguration]
  );

  return (
    <FormatConfigurationRenderer
      configurationStrings={[]}
      configurationBooleans={avroConfigurationBooleans}
      type="Avro"
    />
  );
}
2 changes: 2 additions & 0 deletions dqops/src/main/frontend/src/components/SvgIcon/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import { ReactComponent as CommentSvg } from './svg/comment.svg';
import { ReactComponent as ConfigurationSvg } from './svg/configuration.svg';
import { ReactComponent as CopyTextSvg } from './svg/copy-text.svg';
import { ReactComponent as CsvSvg } from './svg/csv-icon.svg';
import { ReactComponent as AvroSvg } from './svg/avro-icon.svg';
import { ReactComponent as DashboardsSvg } from './svg/dashboards.svg';
import { ReactComponent as DataDictionarySvg } from './svg/data-dictionary.svg';
import { ReactComponent as DataSourcesSvg } from './svg/data_sources.svg';
Expand Down Expand Up @@ -245,6 +246,7 @@ const iconsMap: any = {
datadictionary: DataDictionarySvg,
duckdb: DuckdbSvg,
csv: CsvSvg,
avro: AvroSvg,
json: JsonSvg,
parquet: ParquetSvg,
comment: CommentSvg,
Expand Down
20 changes: 20 additions & 0 deletions dqops/src/main/frontend/src/components/SvgIcon/svg/avro-icon.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
3 changes: 3 additions & 0 deletions dqops/src/main/frontend/src/pages/CreateConnection/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,9 @@ const CreateConnection = () => {
case 'Parquet':
fileFormat = DuckdbParametersSpecFilesFormatTypeEnum.parquet;
break;
case 'Avro':
fileFormat = DuckdbParametersSpecFilesFormatTypeEnum.avro;
break;
case 'Iceberg':
fileFormat = DuckdbParametersSpecFilesFormatTypeEnum.iceberg;
break;
Expand Down
6 changes: 6 additions & 0 deletions dqops/src/main/frontend/src/shared/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,12 @@ export const databaseOptions: IDatabaseOption[] = [
iconName: 'csv',
displayName: 'CSV'
},
{
type: ConnectionModelProviderTypeEnum.duckdb,
name: 'Avro',
iconName: 'avro',
displayName: 'Avro'
},
{
type: ConnectionModelProviderTypeEnum.databricks,
name: 'Databricks',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.dqops.metadata.id.ChildHierarchyNodeFieldMap;
import com.dqops.metadata.id.ChildHierarchyNodeFieldMapImpl;
import com.dqops.metadata.sources.BaseProviderParametersSpec;
import com.dqops.metadata.sources.fileformat.avro.AvroFileFormatSpec;
import com.dqops.metadata.sources.fileformat.csv.CsvFileFormatSpec;
import com.dqops.metadata.sources.fileformat.deltalake.DeltaLakeFileFormatSpec;
import com.dqops.metadata.sources.fileformat.iceberg.IcebergFileFormatSpec;
Expand Down Expand Up @@ -64,6 +65,7 @@ public class DuckdbParametersSpec extends BaseProviderParametersSpec
put("csv", o -> o.csv);
put("json", o -> o.json);
put("parquet", o -> o.parquet);
put("avro", o -> o.avro);
put("iceberg", o -> o.iceberg);
put("delta_lake", o -> o.deltaLake);
}
Expand Down Expand Up @@ -101,6 +103,11 @@ public class DuckdbParametersSpec extends BaseProviderParametersSpec
@JsonSerialize(using = IgnoreEmptyYamlSerializer.class)
private ParquetFileFormatSpec parquet;

@JsonPropertyDescription("Avro file format specification.")
@JsonInclude(JsonInclude.Include.NON_EMPTY)
@JsonSerialize(using = IgnoreEmptyYamlSerializer.class)
private AvroFileFormatSpec avro;

@JsonPropertyDescription("Iceberg file format specification.")
@JsonInclude(JsonInclude.Include.NON_EMPTY)
@JsonSerialize(using = IgnoreEmptyYamlSerializer.class)
Expand Down Expand Up @@ -281,6 +288,24 @@ public void setParquet(ParquetFileFormatSpec parquet) {
propagateHierarchyIdToField(parquet, "parquet");
}

/**
 * Gets the Avro file format specification held by this object.
 * @return Avro file format specification, as currently stored in the {@code avro} field.
 */
public AvroFileFormatSpec getAvro() {
    return this.avro;
}

/**
 * Stores a new Avro file format specification.
 * Reports the change through {@code setDirtyIf} when the new value is not equal to the current one,
 * then hands the new value to {@code propagateHierarchyIdToField} under the "avro" field name.
 * @param avro New Avro file format specification.
 */
public void setAvro(AvroFileFormatSpec avro) {
    // Compare before overwriting the field, otherwise the change would never be detected.
    boolean avroChanged = !Objects.equals(this.avro, avro);
    setDirtyIf(avroChanged);

    this.avro = avro;
    propagateHierarchyIdToField(avro, "avro");
}

/**
* Returns the Iceberg table format specification.
* @return Iceberg table format specification.
Expand Down Expand Up @@ -551,6 +576,7 @@ public boolean isSetHivePartitioning(){
case csv: return getCsv() != null && getCsv().getHivePartitioning() != null && getCsv().getHivePartitioning();
case json: return getJson() != null && getJson().getHivePartitioning() != null && getJson().getHivePartitioning();
case parquet: return getParquet() != null && getParquet().getHivePartitioning() != null && getParquet().getHivePartitioning();
case avro: return false; // not supported by DuckDB
}
}
return false;
Expand Down Expand Up @@ -588,6 +614,8 @@ public String getFullExtension(){
return fileTypeExtension + (compressionExtension == null ? "" : compressionExtension);
}
}
// avro does not support compression

return fileTypeExtension;
}

Expand All @@ -604,6 +632,7 @@ public boolean isFormatSetForType(){
case csv: return this.getCsv() != null;
case json: return this.getJson() != null;
case parquet: return this.getParquet() != null;
case avro: return this.getAvro() != null;
case iceberg: return this.getIceberg() != null;
case delta_lake: return this.getDeltaLake() != null;
default: throw new RuntimeException("The file format is not supported : " + filesFormatType);
Expand Down Expand Up @@ -681,6 +710,9 @@ public DuckdbParametersSpec expandAndTrim(SecretValueProvider secretValueProvide
if(cloned.json != null){
cloned.json = cloned.json.expandAndTrim(secretValueProvider, lookupContext);
}
if(cloned.avro != null){
cloned.avro = cloned.avro.expandAndTrim(secretValueProvider, lookupContext);
}
cloned.user = secretValueProvider.expandValue(cloned.user, lookupContext);
cloned.password = secretValueProvider.expandValue(cloned.password, lookupContext);
cloned.region = secretValueProvider.expandValue(cloned.region, lookupContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,8 @@ private List<String> getAvailableExtensions() {
"aws",
"azure",
"iceberg",
"delta"
"delta",
"avro"
);
}

Expand All @@ -293,6 +294,10 @@ private void registerExtensions() {
availableExtensionList.stream().forEach(extensionName -> {
try {
String installExtensionQuery = "INSTALL " + extensionName;
if (Objects.equals(extensionName, "avro")) {
installExtensionQuery += " FROM community"; // https://duckdb.org/2024/12/09/duckdb-avro-extension.html
}

this.executeCommand(installExtensionQuery, JobCancellationToken.createDummyJobCancellationToken());
String loadExtensionQuery = "LOAD " + extensionName;
this.executeCommand(loadExtensionQuery, JobCancellationToken.createDummyJobCancellationToken());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ public enum DuckdbFilesFormatType {
@JsonProperty("parquet")
parquet,

@JsonProperty("avro")
avro,

@JsonProperty("iceberg")
iceberg,

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import com.dqops.metadata.sources.fileformat.FileFormatSpec;
import com.dqops.metadata.sources.fileformat.FilePathListSpec;
import com.dqops.metadata.sources.fileformat.ParquetFileFormatSpec;
import com.dqops.metadata.sources.fileformat.avro.AvroFileFormatSpec;
import com.dqops.metadata.sources.fileformat.csv.CsvFileFormatSpec;
import com.dqops.metadata.sources.fileformat.deltalake.DeltaLakeFileFormatSpec;
import com.dqops.metadata.sources.fileformat.iceberg.IcebergFileFormatSpec;
Expand Down Expand Up @@ -1032,4 +1033,12 @@ public interface HierarchyNodeResultVisitor<P, R> {
* @return Accept's result.
*/
R accept(ConnectionSimilarityIndexListImpl connectionSimilarityIndexWrappers, P parameter);

/**
 * Accepts Avro file format configuration settings.
 * @param avroFileFormatSpec Avro file format specification.
 * @param parameter Additional visitor's parameter.
 * @return Accept's result.
 */
R accept(AvroFileFormatSpec avroFileFormatSpec, P parameter);
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import com.dqops.metadata.sources.fileformat.FileFormatSpec;
import com.dqops.metadata.sources.fileformat.FilePathListSpec;
import com.dqops.metadata.sources.fileformat.ParquetFileFormatSpec;
import com.dqops.metadata.sources.fileformat.avro.AvroFileFormatSpec;
import com.dqops.metadata.sources.fileformat.csv.CsvFileFormatSpec;
import com.dqops.metadata.sources.fileformat.deltalake.DeltaLakeFileFormatSpec;
import com.dqops.metadata.sources.fileformat.iceberg.IcebergFileFormatSpec;
Expand Down Expand Up @@ -1380,4 +1381,16 @@ public TreeNodeTraversalResult accept(ConnectionSimilarityIndexWrapperImpl conne
public TreeNodeTraversalResult accept(ConnectionSimilarityIndexListImpl connectionSimilarityIndexWrappers, T parameter) {
return TreeNodeTraversalResult.TRAVERSE_CHILDREN;
}

/**
 * Accepts Avro file format configuration settings.
 * This implementation does not inspect the node or the parameter.
 *
 * @param avroFileFormatSpec Avro file format specification.
 * @param parameter Additional visitor's parameter.
 * @return Always {@link TreeNodeTraversalResult#TRAVERSE_CHILDREN}.
 */
@Override
public TreeNodeTraversalResult accept(AvroFileFormatSpec avroFileFormatSpec, T parameter) {
return TreeNodeTraversalResult.TRAVERSE_CHILDREN;
}
}
Loading

0 comments on commit 08a3c71

Please sign in to comment.