Skip to content

Commit

Permalink
MINOR: cleanup ProcessorContextImplTest (#18682)
Browse files Browse the repository at this point in the history
Reviewers: Bill Bejeck <[email protected]>
  • Loading branch information
mjsax authored Jan 24, 2025
1 parent 40890fa commit ad79b4a
Showing 1 changed file with 0 additions and 168 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,6 @@ private ProcessorContextImpl getStandbyContext() {

@Test
public void globalKeyValueStoreShouldBeReadOnly() {
foreachSetUp();

when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
when(stateManager.globalStore(anyString())).thenReturn(null);

Expand Down Expand Up @@ -173,8 +171,6 @@ public void globalKeyValueStoreShouldBeReadOnly() {

@Test
public void globalTimestampedKeyValueStoreShouldBeReadOnly() {
foreachSetUp();

when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
when(stateManager.globalStore(anyString())).thenReturn(null);

Expand Down Expand Up @@ -299,8 +295,6 @@ public void globalSessionStoreShouldBeReadOnly() {

@Test
public void localKeyValueStoreShouldNotAllowInitOrClose() {
foreachSetUp();

when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
when(stateManager.globalStore(anyString())).thenReturn(null);

Expand Down Expand Up @@ -343,8 +337,6 @@ public void localKeyValueStoreShouldNotAllowInitOrClose() {

@Test
public void localTimestampedKeyValueStoreShouldNotAllowInitOrClose() {
foreachSetUp();

when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
when(stateManager.globalStore(anyString())).thenReturn(null);

Expand Down Expand Up @@ -521,8 +513,6 @@ public void localSessionStoreShouldNotAllowInitOrClose() {

@Test
public void shouldNotSendRecordHeadersToChangelogTopic() {
foreachSetUp();

when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
when(stateManager.registeredChangelogPartitionFor(REGISTERED_STORE_NAME)).thenReturn(CHANGELOG_PARTITION);

Expand Down Expand Up @@ -553,18 +543,9 @@ public void shouldNotSendRecordHeadersToChangelogTopic() {

@Test
public void shouldSendRecordHeadersToChangelogTopicWhenConsistencyEnabled() {
foreachSetUp();

when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
when(stateManager.registeredChangelogPartitionFor(REGISTERED_STORE_NAME)).thenReturn(CHANGELOG_PARTITION);

context = buildProcessorContextImpl(streamsConfig, stateManager);

final StreamTask task = mock(StreamTask.class);
context.transitionToActive(task, null, null);

mockProcessorNodeWithLocalKeyValueStore();

final Position position = Position.emptyPosition();
final Headers headers = new RecordHeaders();
headers.add(ChangelogRecordDeserializationHelper.CHANGELOG_VERSION_HEADER_RECORD_CONSISTENCY);
Expand Down Expand Up @@ -593,17 +574,6 @@ public void shouldSendRecordHeadersToChangelogTopicWhenConsistencyEnabled() {

@Test
public void shouldThrowUnsupportedOperationExceptionOnLogChange() {
foreachSetUp();

when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);

context = buildProcessorContextImpl(streamsConfig, stateManager);

final StreamTask task = mock(StreamTask.class);
context.transitionToActive(task, null, null);

mockProcessorNodeWithLocalKeyValueStore();

context = getStandbyContext();
assertThrows(
UnsupportedOperationException.class,
Expand All @@ -613,17 +583,6 @@ public void shouldThrowUnsupportedOperationExceptionOnLogChange() {

@Test
public void shouldThrowUnsupportedOperationExceptionOnGetStateStore() {
foreachSetUp();

when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);

context = buildProcessorContextImpl(streamsConfig, stateManager);

final StreamTask task = mock(StreamTask.class);
context.transitionToActive(task, null, null);

mockProcessorNodeWithLocalKeyValueStore();

context = getStandbyContext();
assertThrows(
UnsupportedOperationException.class,
Expand All @@ -633,17 +592,6 @@ public void shouldThrowUnsupportedOperationExceptionOnGetStateStore() {

@Test
public void shouldThrowUnsupportedOperationExceptionOnForward() {
foreachSetUp();

when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);

context = buildProcessorContextImpl(streamsConfig, stateManager);

final StreamTask task = mock(StreamTask.class);
context.transitionToActive(task, null, null);

mockProcessorNodeWithLocalKeyValueStore();

context = getStandbyContext();
assertThrows(
UnsupportedOperationException.class,
Expand All @@ -653,17 +601,6 @@ public void shouldThrowUnsupportedOperationExceptionOnForward() {

@Test
public void shouldThrowUnsupportedOperationExceptionOnForwardWithTo() {
foreachSetUp();

when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);

context = buildProcessorContextImpl(streamsConfig, stateManager);

final StreamTask task = mock(StreamTask.class);
context.transitionToActive(task, null, null);

mockProcessorNodeWithLocalKeyValueStore();

context = getStandbyContext();
assertThrows(
UnsupportedOperationException.class,
Expand All @@ -673,17 +610,6 @@ public void shouldThrowUnsupportedOperationExceptionOnForwardWithTo() {

@Test
public void shouldThrowUnsupportedOperationExceptionOnCommit() {
foreachSetUp();

when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);

context = buildProcessorContextImpl(streamsConfig, stateManager);

final StreamTask task = mock(StreamTask.class);
context.transitionToActive(task, null, null);

mockProcessorNodeWithLocalKeyValueStore();

context = getStandbyContext();
assertThrows(
UnsupportedOperationException.class,
Expand All @@ -693,17 +619,6 @@ public void shouldThrowUnsupportedOperationExceptionOnCommit() {

@Test
public void shouldThrowUnsupportedOperationExceptionOnSchedule() {
foreachSetUp();

when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);

context = buildProcessorContextImpl(streamsConfig, stateManager);

final StreamTask task = mock(StreamTask.class);
context.transitionToActive(task, null, null);

mockProcessorNodeWithLocalKeyValueStore();

context = getStandbyContext();
assertThrows(
UnsupportedOperationException.class,
Expand All @@ -713,17 +628,6 @@ public void shouldThrowUnsupportedOperationExceptionOnSchedule() {

@Test
public void shouldThrowUnsupportedOperationExceptionOnTopic() {
foreachSetUp();

when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);

context = buildProcessorContextImpl(streamsConfig, stateManager);

final StreamTask task = mock(StreamTask.class);
context.transitionToActive(task, null, null);

mockProcessorNodeWithLocalKeyValueStore();

context = getStandbyContext();
assertThrows(
UnsupportedOperationException.class,
Expand All @@ -733,17 +637,6 @@ public void shouldThrowUnsupportedOperationExceptionOnTopic() {

@Test
public void shouldThrowUnsupportedOperationExceptionOnPartition() {
foreachSetUp();

when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);

context = buildProcessorContextImpl(streamsConfig, stateManager);

final StreamTask task = mock(StreamTask.class);
context.transitionToActive(task, null, null);

mockProcessorNodeWithLocalKeyValueStore();

context = getStandbyContext();
assertThrows(
UnsupportedOperationException.class,
Expand All @@ -753,17 +646,6 @@ public void shouldThrowUnsupportedOperationExceptionOnPartition() {

@Test
public void shouldThrowUnsupportedOperationExceptionOnOffset() {
foreachSetUp();

when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);

context = buildProcessorContextImpl(streamsConfig, stateManager);

final StreamTask task = mock(StreamTask.class);
context.transitionToActive(task, null, null);

mockProcessorNodeWithLocalKeyValueStore();

context = getStandbyContext();
assertThrows(
UnsupportedOperationException.class,
Expand All @@ -773,17 +655,6 @@ public void shouldThrowUnsupportedOperationExceptionOnOffset() {

@Test
public void shouldThrowUnsupportedOperationExceptionOnTimestamp() {
foreachSetUp();

when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);

context = buildProcessorContextImpl(streamsConfig, stateManager);

final StreamTask task = mock(StreamTask.class);
context.transitionToActive(task, null, null);

mockProcessorNodeWithLocalKeyValueStore();

context = getStandbyContext();
assertThrows(
UnsupportedOperationException.class,
Expand All @@ -793,17 +664,6 @@ public void shouldThrowUnsupportedOperationExceptionOnTimestamp() {

@Test
public void shouldThrowUnsupportedOperationExceptionOnCurrentNode() {
foreachSetUp();

when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);

context = buildProcessorContextImpl(streamsConfig, stateManager);

final StreamTask task = mock(StreamTask.class);
context.transitionToActive(task, null, null);

mockProcessorNodeWithLocalKeyValueStore();

context = getStandbyContext();
assertThrows(
UnsupportedOperationException.class,
Expand All @@ -813,17 +673,6 @@ public void shouldThrowUnsupportedOperationExceptionOnCurrentNode() {

@Test
public void shouldThrowUnsupportedOperationExceptionOnSetRecordContext() {
foreachSetUp();

when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);

context = buildProcessorContextImpl(streamsConfig, stateManager);

final StreamTask task = mock(StreamTask.class);
context.transitionToActive(task, null, null);

mockProcessorNodeWithLocalKeyValueStore();

context = getStandbyContext();
assertThrows(
UnsupportedOperationException.class,
Expand All @@ -833,17 +682,6 @@ public void shouldThrowUnsupportedOperationExceptionOnSetRecordContext() {

@Test
public void shouldThrowUnsupportedOperationExceptionOnRecordContext() {
foreachSetUp();

when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);

context = buildProcessorContextImpl(streamsConfig, stateManager);

final StreamTask task = mock(StreamTask.class);
context.transitionToActive(task, null, null);

mockProcessorNodeWithLocalKeyValueStore();

context = getStandbyContext();
assertThrows(
UnsupportedOperationException.class,
Expand All @@ -853,8 +691,6 @@ public void shouldThrowUnsupportedOperationExceptionOnRecordContext() {

@Test
public void shouldMatchStreamTime() {
foreachSetUp();

when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);

context = buildProcessorContextImpl(streamsConfig, stateManager);
Expand All @@ -870,8 +706,6 @@ public void shouldMatchStreamTime() {

@Test
public void shouldAddAndGetProcessorKeyValue() {
foreachSetUp();

when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);

context = buildProcessorContextImpl(streamsConfig, stateManager);
Expand All @@ -891,8 +725,6 @@ public void shouldAddAndGetProcessorKeyValue() {

@Test
public void shouldSetAndGetProcessorMetaData() {
foreachSetUp();

context = buildProcessorContextImpl(streamsConfig, stateManager);

mockProcessorNodeWithLocalKeyValueStore();
Expand Down

0 comments on commit ad79b4a

Please sign in to comment.