Move data source state manager to StateContext (#31439)
* Move data source state manager to StateContext

* Move data source state manager to StateContext

* Move data source state manager to StateContext

* Move data source state manager to StateContext

* Move data source state manager to StateContext
menghaoranss authored May 31, 2024
1 parent 1445d2f commit e383088
Showing 31 changed files with 155 additions and 72 deletions.
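At a glance, this commit moves the per-data-source state service from mode.storage.service (QualifiedDataSourceStatusService) to mode.service.persist (QualifiedDataSourceStatePersistService), exposes it through PersistServiceFacade, and has ContextManager seed its StateContext from the persisted qualified data source states instead of creating an empty StateContext. A minimal sketch of the new construction path, mirroring the updated tests in this diff (Mockito mocks stand in for real metadata and repository; the class name is illustrative):

import static org.mockito.Mockito.mock;

import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
import org.apache.shardingsphere.infra.lock.LockContext;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.metadata.persist.MetaDataPersistService;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.standalone.workerid.generator.StandaloneWorkerIdGenerator;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.mode.metadata.MetaDataContextsFactory;
import org.apache.shardingsphere.mode.spi.PersistRepository;

final class ContextManagerWiringSketch {
    
    static ContextManager build() {
        MetaDataContexts metaDataContexts = MetaDataContextsFactory.create(mock(MetaDataPersistService.class), new ShardingSphereMetaData());
        ComputeNodeInstanceContext instanceContext = new ComputeNodeInstanceContext(
                new ComputeNodeInstance(mock(InstanceMetaData.class)), new StandaloneWorkerIdGenerator(), new ModeConfiguration("Standalone", null),
                mock(LockContext.class), new EventBusContext());
        // New 4th argument ("force"): inside the constructor, PersistServiceFacade is built first and
        // StateContext is then created from getQualifiedDataSourceStatePersistService().loadStates().
        return new ContextManager(metaDataContexts, instanceContext, mock(PersistRepository.class), false);
    }
}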
@@ -25,6 +25,7 @@
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.metadata.database.resource.ResourceMetaData;
 import org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.metadata.persist.MetaDataPersistService;
@@ -76,6 +77,7 @@ void assertExportWithContextManager() {
 
     private ContextManager mockContextManager() {
         ShardingSphereDatabase database = mock(ShardingSphereDatabase.class, RETURNS_DEEP_STUBS);
+        when(database.getResourceMetaData()).thenReturn(mock(ResourceMetaData.class));
         when(database.getResourceMetaData().getStorageUnits()).thenReturn(Collections.singletonMap("ds_0", mock(StorageUnit.class)));
         when(database.getProtocolType()).thenReturn(TypedSPILoader.getService(DatabaseType.class, "FIXTURE"));
         ShardingSphereMetaData metaData = mock(ShardingSphereMetaData.class);
@@ -22,20 +22,12 @@
 import org.apache.shardingsphere.agent.plugin.metrics.core.config.MetricCollectorType;
 import org.apache.shardingsphere.agent.plugin.metrics.core.config.MetricConfiguration;
 import org.apache.shardingsphere.agent.plugin.metrics.core.fixture.collector.MetricsCollectorFixture;
-import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
-import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
-import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
-import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
-import org.apache.shardingsphere.infra.lock.LockContext;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import org.apache.shardingsphere.infra.state.instance.InstanceStateContext;
-import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
 import org.apache.shardingsphere.metadata.persist.MetaDataPersistService;
 import org.apache.shardingsphere.mode.manager.ContextManager;
-import org.apache.shardingsphere.mode.manager.standalone.workerid.generator.StandaloneWorkerIdGenerator;
 import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
 import org.apache.shardingsphere.mode.metadata.MetaDataContextsFactory;
-import org.apache.shardingsphere.mode.spi.PersistRepository;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import org.apache.shardingsphere.test.mock.AutoMockExtension;
 import org.apache.shardingsphere.test.mock.StaticMockSettings;
@@ -82,9 +74,8 @@ void assertExportWithContextManager() {
 
     private ContextManager mockContextManager() {
         MetaDataContexts metaDataContexts = MetaDataContextsFactory.create(mock(MetaDataPersistService.class), new ShardingSphereMetaData());
-        ComputeNodeInstanceContext computeNodeInstanceContext = new ComputeNodeInstanceContext(
-                new ComputeNodeInstance(mock(InstanceMetaData.class)), new StandaloneWorkerIdGenerator(), new ModeConfiguration("Standalone", null),
-                mock(LockContext.class), new EventBusContext());
-        return new ContextManager(metaDataContexts, computeNodeInstanceContext, mock(PersistRepository.class));
+        ContextManager result = mock(ContextManager.class, RETURNS_DEEP_STUBS);
+        when(result.getMetaDataContexts()).thenReturn(metaDataContexts);
+        return result;
     }
 }
@@ -74,6 +74,6 @@ private ContextManager mockContextManager() {
         ComputeNodeInstanceContext computeNodeInstanceContext = new ComputeNodeInstanceContext(
                 new ComputeNodeInstance(mock(InstanceMetaData.class)), new StandaloneWorkerIdGenerator(), new ModeConfiguration("Standalone", null),
                 mock(LockContext.class), new EventBusContext());
-        return new ContextManager(metaDataContexts, computeNodeInstanceContext, mock(PersistRepository.class));
+        return new ContextManager(metaDataContexts, computeNodeInstanceContext, mock(PersistRepository.class), false);
     }
 }
@@ -28,7 +28,6 @@
 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import org.apache.shardingsphere.infra.state.datasource.DataSourceState;
 import org.apache.shardingsphere.mode.manager.ContextManager;
-import org.apache.shardingsphere.mode.storage.service.QualifiedDataSourceStatusService;
 import org.apache.shardingsphere.readwritesplitting.constant.ReadwriteSplittingDataSourceType;
 import org.apache.shardingsphere.readwritesplitting.distsql.statement.AlterReadwriteSplittingStorageUnitStatusStatement;
 import org.apache.shardingsphere.readwritesplitting.exception.ReadwriteSplittingRuleExceptionIdentifier;
@@ -77,8 +76,7 @@ private void checkBeforeUpdate(final AlterReadwriteSplittingStorageUnitStatusSta
 
     private void updateStatus(final ContextManager contextManager, final AlterReadwriteSplittingStorageUnitStatusStatement sqlStatement) {
         DataSourceState status = sqlStatement.isEnable() ? DataSourceState.ENABLED : DataSourceState.DISABLED;
-        new QualifiedDataSourceStatusService(contextManager.getRepository())
-                .changeStatus(database.getName(), sqlStatement.getRuleName(), sqlStatement.getStorageUnitName(), status);
+        contextManager.getPersistServiceFacade().getQualifiedDataSourceStatePersistService().updateState(database.getName(), sqlStatement.getRuleName(), sqlStatement.getStorageUnitName(), status);
     }
 
     @Override
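With this change, DistSQL executors no longer instantiate the status service directly from contextManager.getRepository(); they reach the renamed service through the persist service facade. A minimal sketch of the new update path, assuming a ContextManager built as in the sketch above (the database, group, and storage unit names are placeholders):

import org.apache.shardingsphere.infra.state.datasource.DataSourceState;
import org.apache.shardingsphere.mode.manager.ContextManager;

final class QualifiedDataSourceStateUpdateSketch {
    
    // Mirrors the executor's new call chain: ContextManager -> PersistServiceFacade
    // -> QualifiedDataSourceStatePersistService -> repository.
    static void disableStorageUnit(final ContextManager contextManager) {
        contextManager.getPersistServiceFacade()
                .getQualifiedDataSourceStatePersistService()
                .updateState("sharding_db", "readwrite_ds", "read_ds_0", DataSourceState.DISABLED);
    }
}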
@@ -32,7 +32,6 @@
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
 import org.apache.shardingsphere.mode.metadata.MetaDataContextsFactory;
-import org.apache.shardingsphere.mode.state.StateContext;
 import org.apache.shardingsphere.traffic.rule.TrafficRule;
 import org.apache.shardingsphere.transaction.rule.TransactionRule;
 import org.junit.jupiter.api.BeforeEach;
@@ -70,7 +69,6 @@ void setUp() {
                 mock(MetaDataPersistService.class), new ShardingSphereMetaData(databases, mock(ResourceMetaData.class), globalRuleMetaData, new ConfigurationProperties(new Properties())));
         when(contextManager.getMetaDataContexts()).thenReturn(metaDataContexts);
         when(contextManager.getComputeNodeInstanceContext().getInstance().getState()).thenReturn(new InstanceStateContext());
-        when(contextManager.getStateContext()).thenReturn(new StateContext());
     }
 
     private Map<String, ShardingSphereDatabase> mockDatabases() {
@@ -75,14 +75,14 @@ public final class ContextManager implements AutoCloseable {
 
     private final PersistRepository repository;
 
-    public ContextManager(final MetaDataContexts metaDataContexts, final ComputeNodeInstanceContext computeNodeInstanceContext, final PersistRepository repository) {
+    public ContextManager(final MetaDataContexts metaDataContexts, final ComputeNodeInstanceContext computeNodeInstanceContext, final PersistRepository repository, final boolean force) {
         this.metaDataContexts = new AtomicReference<>(metaDataContexts);
         this.computeNodeInstanceContext = computeNodeInstanceContext;
         this.repository = repository;
         persistServiceFacade = new PersistServiceFacade(repository, computeNodeInstanceContext.getModeConfiguration(), this);
+        stateContext = new StateContext(this.metaDataContexts.get().getMetaData(), persistServiceFacade.getQualifiedDataSourceStatePersistService().loadStates(), force);
         metaDataContextManager = new MetaDataContextManager(this.metaDataContexts, computeNodeInstanceContext, persistServiceFacade);
         executorEngine = ExecutorEngine.createExecutorEngineWithSize(metaDataContexts.getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.KERNEL_EXECUTOR_SIZE));
-        stateContext = new StateContext();
     }
 
     /**
@@ -50,7 +50,7 @@
 import org.apache.shardingsphere.metadata.factory.InternalMetaDataFactory;
 import org.apache.shardingsphere.metadata.persist.MetaDataPersistService;
 import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
-import org.apache.shardingsphere.mode.storage.QualifiedDataSourceStatus;
+import org.apache.shardingsphere.mode.storage.QualifiedDataSourceState;
 
 import javax.sql.DataSource;
 import java.sql.SQLException;
@@ -95,7 +95,7 @@ public static MetaDataContexts create(final MetaDataPersistService persistServic
      * @throws SQLException SQL exception
      */
     public static MetaDataContexts create(final MetaDataPersistService persistService, final ContextManagerBuilderParameter param,
-                                          final ComputeNodeInstanceContext computeNodeInstanceContext, final Map<String, QualifiedDataSourceStatus> statusMap) throws SQLException {
+                                          final ComputeNodeInstanceContext computeNodeInstanceContext, final Map<String, QualifiedDataSourceState> statusMap) throws SQLException {
         boolean isDatabaseMetaDataExisted = !persistService.getDatabaseMetaDataService().loadAllDatabaseNames().isEmpty();
         Map<String, DatabaseConfiguration> effectiveDatabaseConfigs = isDatabaseMetaDataExisted
                 ? createEffectiveDatabaseConfigurations(getDatabaseNames(computeNodeInstanceContext, param.getDatabaseConfigs(), persistService), param.getDatabaseConfigs(), persistService)
@@ -155,7 +155,7 @@ private static void closeGeneratedDataSources(final String databaseName, final M
         }
     }
 
-    private static void checkDataSourceStates(final Map<String, DatabaseConfiguration> databaseConfigs, final Map<String, QualifiedDataSourceStatus> statusMap, final boolean force) {
+    private static void checkDataSourceStates(final Map<String, DatabaseConfiguration> databaseConfigs, final Map<String, QualifiedDataSourceState> statusMap, final boolean force) {
         Map<String, DataSourceState> storageDataSourceStates = getStorageDataSourceStates(statusMap);
         databaseConfigs.forEach((key, value) -> {
             if (!value.getStorageUnits().isEmpty()) {
@@ -164,7 +164,7 @@
         });
     }
 
-    private static Map<String, DataSourceState> getStorageDataSourceStates(final Map<String, QualifiedDataSourceStatus> statusMap) {
+    private static Map<String, DataSourceState> getStorageDataSourceStates(final Map<String, QualifiedDataSourceState> statusMap) {
         Map<String, DataSourceState> result = new HashMap<>(statusMap.size(), 1F);
         statusMap.forEach((key, value) -> {
             List<String> values = Splitter.on(".").splitToList(key);
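The keys split here appear to be qualified data source identifiers of the form database.group.storageUnit, matching the QualifiedDataSource(databaseName, groupName, storageUnitName) triple used by the persist service below; that key layout is an assumption, not something stated in this diff. A small sketch of the parsing under that assumption, with placeholder names:

import com.google.common.base.Splitter;

import java.util.List;

final class QualifiedDataSourceKeySketch {
    
    static void parse() {
        // Assumed key layout: "<database>.<group>.<storageUnit>" (placeholder values below).
        List<String> parts = Splitter.on(".").splitToList("sharding_db.readwrite_ds.read_ds_0");
        String databaseName = parts.get(0);
        String groupName = parts.get(1);
        String storageUnitName = parts.get(2);
        System.out.printf("database=%s, group=%s, storageUnit=%s%n", databaseName, groupName, storageUnitName);
    }
}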
@@ -26,6 +26,7 @@
 import org.apache.shardingsphere.mode.service.persist.MetaDataManagerPersistService;
 import org.apache.shardingsphere.mode.service.persist.PersistServiceBuilder;
 import org.apache.shardingsphere.mode.service.persist.ProcessPersistService;
+import org.apache.shardingsphere.mode.service.persist.QualifiedDataSourceStatePersistService;
 import org.apache.shardingsphere.mode.service.pojo.ShardingSphereSchemaDataAlteredPOJO;
 import org.apache.shardingsphere.mode.spi.PersistRepository;
 import org.apache.shardingsphere.mode.state.StatePersistService;
@@ -46,10 +47,13 @@ public final class PersistServiceFacade {
 
     private final ProcessPersistService processPersistService;
 
+    private final QualifiedDataSourceStatePersistService qualifiedDataSourceStatePersistService;
+
     public PersistServiceFacade(final PersistRepository repository, final ModeConfiguration modeConfiguration, final ContextManager contextManager) {
         metaDataPersistService = new MetaDataPersistService(repository);
         computeNodePersistService = new ComputeNodePersistService(repository);
         statePersistService = new StatePersistService(repository);
+        qualifiedDataSourceStatePersistService = new QualifiedDataSourceStatePersistService(repository);
         PersistServiceBuilder persistServiceBuilder = TypedSPILoader.getService(PersistServiceBuilder.class, modeConfiguration.getType());
         metaDataManagerPersistService = persistServiceBuilder.buildMetaDataManagerPersistService(contextManager);
         processPersistService = persistServiceBuilder.buildProcessPersistService(repository);
@@ -15,15 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.mode.storage.service;
+package org.apache.shardingsphere.mode.service.persist;
 
 import com.google.common.base.Strings;
 import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDataSource;
 import org.apache.shardingsphere.infra.state.datasource.DataSourceState;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.apache.shardingsphere.mode.spi.PersistRepository;
-import org.apache.shardingsphere.mode.storage.QualifiedDataSourceStatus;
+import org.apache.shardingsphere.mode.storage.QualifiedDataSourceState;
 import org.apache.shardingsphere.mode.storage.node.QualifiedDataSourceNode;
 import org.apache.shardingsphere.mode.storage.yaml.YamlQualifiedDataSourceStatus;
 import org.apache.shardingsphere.mode.storage.yaml.YamlQualifiedDataSourceStatusSwapper;
@@ -33,21 +33,21 @@
 import java.util.Map;
 
 /**
- * Qualified data source status service.
+ * Qualified data source state persist service.
  */
 @RequiredArgsConstructor
-public final class QualifiedDataSourceStatusService {
+public final class QualifiedDataSourceStatePersistService {
 
     private final PersistRepository repository;
 
     /**
-     * Load qualified data source status.
+     * Load qualified data source states.
      *
-     * @return qualified data source status
+     * @return qualified data source states
      */
-    public Map<String, QualifiedDataSourceStatus> loadStatus() {
+    public Map<String, QualifiedDataSourceState> loadStates() {
         Collection<String> qualifiedDataSourceNodes = repository.getChildrenKeys(QualifiedDataSourceNode.getRootPath());
-        Map<String, QualifiedDataSourceStatus> result = new HashMap<>(qualifiedDataSourceNodes.size(), 1F);
+        Map<String, QualifiedDataSourceState> result = new HashMap<>(qualifiedDataSourceNodes.size(), 1F);
         qualifiedDataSourceNodes.forEach(each -> {
             String yamlContent = repository.query(QualifiedDataSourceNode.getQualifiedDataSourceNodePath(new QualifiedDataSource(each)));
             if (!Strings.isNullOrEmpty(yamlContent)) {
@@ -58,15 +58,15 @@ public Map<String, QualifiedDataSourceStatus> loadStatus() {
     }
 
     /**
-     * Change qualified data source status.
+     * Update qualified data source state.
      *
      * @param databaseName database name
     * @param groupName group name
      * @param storageUnitName storage unit name
      * @param dataSourceState data source state
      */
-    public void changeStatus(final String databaseName, final String groupName, final String storageUnitName, final DataSourceState dataSourceState) {
-        QualifiedDataSourceStatus status = new QualifiedDataSourceStatus(dataSourceState);
+    public void updateState(final String databaseName, final String groupName, final String storageUnitName, final DataSourceState dataSourceState) {
+        QualifiedDataSourceState status = new QualifiedDataSourceState(dataSourceState);
         repository.persist(QualifiedDataSourceNode.getQualifiedDataSourceNodePath(
                 new QualifiedDataSource(databaseName, groupName, storageUnitName)), YamlEngine.marshal(new YamlQualifiedDataSourceStatusSwapper().swapToYamlConfiguration(status)));
     }
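The renamed service keeps the same repository layout: updateState marshals a QualifiedDataSourceState to YAML and persists it under the qualified data source node path, and loadStates walks the children of the root path and unmarshals each entry. A round-trip sketch against a mocked repository (placeholder names; with a plain mock, loadStates returns an empty map because getChildrenKeys yields no children):

import static org.mockito.Mockito.mock;

import org.apache.shardingsphere.infra.state.datasource.DataSourceState;
import org.apache.shardingsphere.mode.service.persist.QualifiedDataSourceStatePersistService;
import org.apache.shardingsphere.mode.spi.PersistRepository;
import org.apache.shardingsphere.mode.storage.QualifiedDataSourceState;

import java.util.Map;

final class QualifiedDataSourceStatePersistServiceSketch {
    
    static void roundTrip() {
        PersistRepository repository = mock(PersistRepository.class);
        QualifiedDataSourceStatePersistService persistService = new QualifiedDataSourceStatePersistService(repository);
        // Persists a YAML document under the qualified data source node path for sharding_db.readwrite_ds.read_ds_0.
        persistService.updateState("sharding_db", "readwrite_ds", "read_ds_0", DataSourceState.DISABLED);
        // Reads all persisted states back; empty here because the mocked repository has no children.
        Map<String, QualifiedDataSourceState> states = persistService.loadStates();
    }
}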