Commit 689057b

Merge remote-tracking branch 'origin/3.4.x'

ewencp committed Sep 18, 2017
2 parents 8d8ee5d + 2180b7a
Showing 12 changed files with 289 additions and 305 deletions.
91 changes: 0 additions & 91 deletions checkstyle/checkstyle.xml

This file was deleted.

14 changes: 14 additions & 0 deletions checkstyle/suppressions.xml
@@ -0,0 +1,14 @@
+<?xml version="1.0"?>
+<!DOCTYPE suppressions PUBLIC
+    "-//Puppy Crawl//DTD Suppressions 1.1//EN"
+    "http://www.puppycrawl.com/dtds/suppressions_1_1.dtd">
+
+<suppressions>
+
+    <!-- Intentionally high coupling to pull together config classes -->
+    <suppress
+        checks="ClassDataAbstractionCoupling"
+        files="(S3OutputStream|S3SinkConnectorConfig).java"
+    />
+
+</suppressions>
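For context on how a file like this takes effect: Checkstyle only applies suppressions when the main configuration loads them through its standard SuppressionFilter module. A minimal sketch; the enclosing Checker module and the file path below are assumptions, not part of this commit:

<module name="Checker">
    <!-- Hypothetical wiring: load the suppressions file added above. -->
    <module name="SuppressionFilter">
        <property name="file" value="checkstyle/suppressions.xml"/>
    </module>
</module>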
S3SinkConnectorConfig.java
@@ -119,8 +119,8 @@ public class S3SinkConnectorConfig extends StorageSinkConnectorConfig {
CREDENTIALS_PROVIDER_CLASS_DEFAULT,
new CredentialsProviderValidator(),
Importance.LOW,
-"Credentials provider or provider chain to use for authentication to AWS. By default the "
-    + " connector uses 'DefaultAWSCredentialsProviderChain'.",
+"Credentials provider or provider chain to use for authentication to AWS"
+    + ". By default the connector uses 'DefaultAWSCredentialsProviderChain'.",
group,
++orderInGroup,
Width.LONG,
@@ -150,8 +150,9 @@ public class S3SinkConnectorConfig extends StorageSinkConnectorConfig {
Type.STRING,
AVRO_CODEC_DEFAULT,
Importance.LOW,
-"The Avro compression codec to be used for output files. Available values: null, deflate, "
-    + "snappy and bzip2 (codec source is org.apache.avro.file.CodecFactory)",
+"The Avro compression codec to be used for output files. Available "
+    + "values: null, deflate, snappy and bzip2 (codec source is "
+    + "org.apache.avro.file.CodecFactory)",
group,
++orderInGroup,
Width.LONG,
@@ -200,7 +201,7 @@ public String getBucketName() {
return getString(S3_BUCKET_CONFIG);
}

-public String getSSEA() {
+public String getSsea() {
return getString(SSEA_CONFIG);
}

@@ -214,7 +215,10 @@ public AWSCredentialsProvider getCredentialsProvider() {
return ((Class<? extends AWSCredentialsProvider>)
getClass(S3SinkConnectorConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG)).newInstance();
} catch (IllegalAccessException | InstantiationException e) {
-throw new ConnectException("Invalid class for: " + S3SinkConnectorConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG, e);
+throw new ConnectException(
+    "Invalid class for: " + S3SinkConnectorConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG,
+    e
+);
}
}

@@ -265,10 +269,18 @@ public void ensureValid(String name, Object value) {
}
Number number = (Number) value;
if (number.longValue() < min) {
-throw new ConfigException(name, value, "Part size must be at least: " + min + " bytes (5MB)");
+throw new ConfigException(
+    name,
+    value,
+    "Part size must be at least: " + min + " bytes (5MB)"
+);
}
if (number.longValue() > max) {
-throw new ConfigException(name, value, "Part size must be no more: " + Integer.MAX_VALUE + " bytes (~2GB)");
+throw new ConfigException(
+    name,
+    value,
+    "Part size must be no more: " + Integer.MAX_VALUE + " bytes (~2GB)"
+);
}
}
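A standalone sketch of the bounds enforced here; the class and config names below are illustrative, not the connector's API. The floor matches S3's 5 MB minimum multipart part size, and Integer.MAX_VALUE caps a part at roughly 2 GB:

public class PartSizeCheck {
    private static final long MIN = 5L * 1024 * 1024;   // 5242880 bytes (5MB), S3 multipart minimum
    private static final long MAX = Integer.MAX_VALUE;  // 2147483647 bytes (~2GB)

    static void ensureValid(String name, long value) {
        if (value < MIN) {
            throw new IllegalArgumentException(
                name + "=" + value + ": part size must be at least " + MIN + " bytes (5MB)");
        }
        if (value > MAX) {
            throw new IllegalArgumentException(
                name + "=" + value + ": part size must be no more than " + MAX + " bytes (~2GB)");
        }
    }

    public static void main(String[] args) {
        ensureValid("s3.part.size", 25L * 1024 * 1024); // 25 MB passes both bounds
    }
}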

@@ -294,7 +306,11 @@ private static class RegionValidator implements ConfigDef.Validator {
public void ensureValid(String name, Object region) {
String regionStr = ((String) region).toLowerCase().trim();
if (RegionUtils.getRegion(regionStr) == null) {
-throw new ConfigException(name, region, "Value must be one of: " + Utils.join(RegionUtils.getRegions(), ", "));
+throw new ConfigException(
+    name,
+    region,
+    "Value must be one of: " + Utils.join(RegionUtils.getRegions(), ", ")
+);
}
}
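The same check in isolation: RegionUtils is the AWS SDK v1 helper already referenced above, and getRegion returns null for names it does not recognize. The wrapper class and region string here are illustrative:

import com.amazonaws.regions.RegionUtils;

public class RegionCheck {
    public static void main(String[] args) {
        String regionStr = "us-west-2".toLowerCase().trim();
        // RegionUtils.getRegion(...) yields null for unknown region names
        if (RegionUtils.getRegion(regionStr) == null) {
            throw new IllegalArgumentException("Unknown AWS region: " + regionStr);
        }
        System.out.println("Region is valid: " + regionStr);
    }
}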

@@ -311,7 +327,11 @@ public void ensureValid(String name, Object provider) {
&& AWSCredentialsProvider.class.isAssignableFrom((Class<?>) provider)) {
return;
}
-throw new ConfigException(name, provider, "Class must extend: " + AWSCredentialsProvider.class);
+throw new ConfigException(
+    name,
+    provider,
+    "Class must extend: " + AWSCredentialsProvider.class
+);
}

@Override
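Read together with getCredentialsProvider() above: a class passes this validator if it implements AWSCredentialsProvider, and it also needs a public no-arg constructor because the connector instantiates it via newInstance(). A hypothetical provider satisfying both, with hard-coded keys purely for illustration:

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;

public class StaticTestCredentialsProvider implements AWSCredentialsProvider {
    // Implicit public no-arg constructor, as newInstance() requires.

    @Override
    public AWSCredentials getCredentials() {
        // Illustrative fixed credentials; never hard-code real keys.
        return new BasicAWSCredentials("test-access-key", "test-secret-key");
    }

    @Override
    public void refresh() {
        // Nothing to refresh for static credentials.
    }
}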
@@ -330,7 +350,7 @@ public static ConfigDef getConfig() {

ConfigDef visible = new ConfigDef();
for (ConfigDef.ConfigKey key : everything.values()) {
-if(!blacklist.contains(key.name)) {
+if (!blacklist.contains(key.name)) {
visible.define(key);
}
}
S3SinkTask.java
@@ -98,7 +98,12 @@ public void start(Map<String, String> props) {
Class<? extends S3Storage> storageClass =
(Class<? extends S3Storage>)
connectorConfig.getClass(StorageCommonConfig.STORAGE_CLASS_CONFIG);
-storage = StorageFactory.createStorage(storageClass, S3SinkConnectorConfig.class, connectorConfig, url);
+storage = StorageFactory.createStorage(
+    storageClass,
+    S3SinkConnectorConfig.class,
+    connectorConfig,
+    url
+);
if (!storage.bucketExists()) {
throw new DataException("No-existent S3 bucket: " + connectorConfig.getBucketName());
}
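storage.bucketExists() is opaque in this diff; under the AWS SDK v1 used by this connector, a check of that shape would typically delegate to the client. A sketch, with the client wiring and bucket name assumed:

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;

public class BucketCheckDemo {
    public static void main(String[] args) {
        AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient(); // default region/credentials chain
        String bucket = "my-example-bucket";                 // illustrative name
        if (!s3.doesBucketExist(bucket)) {
            throw new IllegalStateException("Bucket does not exist: " + bucket);
        }
    }
}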
@@ -108,8 +113,8 @@ public void start(Map<String, String> props) {

open(context.assignment());
log.info("Started S3 connector task with assigned partitions: {}", assignment);
-} catch (ClassNotFoundException | IllegalAccessException | InstantiationException | InvocationTargetException
-    | NoSuchMethodException e) {
+} catch (ClassNotFoundException | IllegalAccessException | InstantiationException
+    | InvocationTargetException | NoSuchMethodException e) {
throw new ConnectException("Reflection exception: ", e);
} catch (AmazonClientException e) {
throw new ConnectException(e);
@@ -123,21 +128,30 @@ public String version() {

@Override
public void open(Collection<TopicPartition> partitions) {
-// assignment should be empty, either because this is the initial call or because it follows a call to "close".
+// assignment should be empty, either because this is the initial call or because it follows
+// a call to "close".
assignment.addAll(partitions);
for (TopicPartition tp : assignment) {
-TopicPartitionWriter writer =
-    new TopicPartitionWriter(tp, writerProvider, partitioner, connectorConfig, context, time);
+TopicPartitionWriter writer = new TopicPartitionWriter(
+    tp,
+    writerProvider,
+    partitioner,
+    connectorConfig,
+    context,
+    time
+);
topicPartitionWriters.put(tp, writer);
}
}

@SuppressWarnings("unchecked")
-private Format<S3SinkConnectorConfig, String> newFormat() throws ClassNotFoundException, IllegalAccessException,
-    InstantiationException, InvocationTargetException,
-    NoSuchMethodException {
+private Format<S3SinkConnectorConfig, String> newFormat()
+    throws ClassNotFoundException, IllegalAccessException, InstantiationException,
+        InvocationTargetException, NoSuchMethodException {
Class<Format<S3SinkConnectorConfig, String>> formatClass =
-(Class<Format<S3SinkConnectorConfig, String>>) connectorConfig.getClass(S3SinkConnectorConfig.FORMAT_CLASS_CONFIG);
+(Class<Format<S3SinkConnectorConfig, String>>) connectorConfig.getClass(
+    S3SinkConnectorConfig.FORMAT_CLASS_CONFIG
+);
return formatClass.getConstructor(S3Storage.class).newInstance(storage);
}
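newFormat() uses the same reflective pattern as the validators above: resolve a Class from configuration, verify it against the expected interface, then invoke a known constructor. A self-contained sketch of that pattern with made-up names:

import java.lang.reflect.Constructor;

public class ReflectiveFactoryDemo {
    interface Format { String extension(); }

    public static class JsonFormat implements Format {
        private final String charset;
        public JsonFormat(String charset) { this.charset = charset; }
        @Override public String extension() { return ".json"; }
    }

    public static void main(String[] args) throws Exception {
        // In the connector the class name comes from config; hard-coded here.
        Class<?> clazz = Class.forName("ReflectiveFactoryDemo$JsonFormat");
        if (!Format.class.isAssignableFrom(clazz)) {
            throw new IllegalArgumentException("Class must implement Format: " + clazz);
        }
        Constructor<?> ctor = clazz.getConstructor(String.class);
        Format format = (Format) ctor.newInstance("UTF-8");
        System.out.println(format.extension()); // prints .json
    }
}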

@@ -155,7 +169,8 @@ private Partitioner<FieldSchema> newPartitioner(S3SinkConnectorConfig config)
Map<String, ?> originals = config.originals();
for (String originalKey : originals.keySet()) {
if (!plainValues.containsKey(originalKey)) {
-// pass any additional configs down to the partitioner so that custom partitioners can have their own configs
+// pass any additional configs down to the partitioner so that custom partitioners can
+// have their own configs
plainValues.put(originalKey, originals.get(originalKey));
}
}
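The loop above forwards any config keys the connector itself did not define, so a custom partitioner can read its own settings. A self-contained sketch of that pass-through, with illustrative keys:

import java.util.HashMap;
import java.util.Map;

public class PassThroughDemo {
    public static void main(String[] args) {
        Map<String, Object> parsed = new HashMap<>();      // values the connector defined
        parsed.put("path.format", "YYYY/MM/dd");

        Map<String, Object> originals = new HashMap<>();   // everything the user supplied
        originals.put("path.format", "YYYY/MM/dd");
        originals.put("my.partitioner.salt", "42");        // unknown to the connector

        Map<String, Object> plainValues = new HashMap<>(parsed);
        for (Map.Entry<String, Object> entry : originals.entrySet()) {
            // pass additional configs down so custom partitioners see them
            plainValues.putIfAbsent(entry.getKey(), entry.getValue());
        }
        System.out.println(plainValues); // includes my.partitioner.salt
    }
}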
@@ -187,7 +202,9 @@ public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
}

@Override
-public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
+public Map<TopicPartition, OffsetAndMetadata> preCommit(
+    Map<TopicPartition, OffsetAndMetadata> offsets
+) {
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
for (TopicPartition tp : assignment) {
Long offset = topicPartitionWriters.get(tp).getOffsetToCommitAndReset();
(Diffs for the remaining 8 changed files are not shown.)
