Skip to content

Commit

Permalink
Refactor EtcdRepository watch consume (#29738)
Browse files Browse the repository at this point in the history
* Refactor EtcdRepository watch consume

* Fix checkstyle

* Fix spotless
  • Loading branch information
zhaojinchao95 authored Jan 16, 2024
1 parent 4ad497a commit a308ac7
Showing 1 changed file with 19 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.KeyValue;
Expand All @@ -33,6 +34,7 @@
import io.etcd.jetcd.watch.WatchEvent;
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
import org.apache.shardingsphere.mode.repository.cluster.etcd.props.EtcdProperties;
Expand All @@ -44,14 +46,20 @@

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

/**
* Registry repository of ETCD.
*/
@Slf4j
public final class EtcdRepository implements ClusterPersistRepository {

private static final ExecutorService EVENT_LISTENER_EXECUTOR = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Etcd-EventListener-%d").build());

private Client client;

private EtcdProperties etcdProps;
Expand Down Expand Up @@ -148,8 +156,7 @@ public void watch(final String key, final DataChangedEventListener dataChangedEv
for (WatchEvent each : response.getEvents()) {
Type type = getEventChangedType(each);
if (Type.IGNORED != type) {
dataChangedEventListener.onChange(new DataChangedEvent(each.getKeyValue().getKey().toString(StandardCharsets.UTF_8),
each.getKeyValue().getValue().toString(StandardCharsets.UTF_8), type));
dispatchEvent(dataChangedEventListener, each, type);
}
}
});
Expand All @@ -173,9 +180,19 @@ private Type getEventChangedType(final WatchEvent event) {
}
}

private void dispatchEvent(final DataChangedEventListener dataChangedEventListener, final WatchEvent event, final Type type) {
CompletableFuture.runAsync(() -> dataChangedEventListener.onChange(new DataChangedEvent(event.getKeyValue().getKey().toString(StandardCharsets.UTF_8),
event.getKeyValue().getValue().toString(StandardCharsets.UTF_8), type)), EVENT_LISTENER_EXECUTOR).whenComplete((unused, throwable) -> {
if (null != throwable) {
log.error("Dispatch event failed", throwable);
}
});
}

@Override
public void close() {
client.close();
EVENT_LISTENER_EXECUTOR.shutdown();
}

@Override
Expand Down

0 comments on commit a308ac7

Please sign in to comment.