Skip to content

Commit

Permalink
fix(group): remove unnecessary members on ConsumerGroupOffset on api …
Browse files Browse the repository at this point in the history
…and handle it on client side

close #467
  • Loading branch information
tchiotludo committed Oct 21, 2020
1 parent 860b82a commit dde8114
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import Header from '../../../Header/Header';
import Form from '../../../../components/Form/Form';
import Dropdown from 'react-bootstrap/Dropdown';
import DatePicker from '../../../../components/DatePicker';
import { formatDateTime } from '../../../../utils/converters';
import {formatDateTime, groupedTopicOffset} from '../../../../utils/converters';
import Joi from 'joi-browser';
import {
uriConsumerGroup,
Expand All @@ -20,7 +20,7 @@ class ConsumerGroupUpdate extends Form {
clusterId: '',
consumerGroupId: '',
timestamp: '',
groupedTopicOffset: this.props.groupedTopicOffset || {},
topicOffset: this.props.topicOffset || {},
firstOffsets: {},
lastOffsets: {},
formData: {},
Expand All @@ -34,12 +34,12 @@ class ConsumerGroupUpdate extends Form {
const { clusterId, consumerGroupId } = this.props.match.params;

this.setState({ clusterId, consumerGroupId }, () => {
this.getGroupedTopicOffset();
this.getTopicOffset();
});
}

async getGroupedTopicOffset() {
const { clusterId, consumerGroupId, groupedTopicOffset, timestamp} = this.state;
async getTopicOffset() {
const { clusterId, consumerGroupId, topicOffset, timestamp} = this.state;
const momentValue = moment(timestamp);

const date =
Expand All @@ -59,34 +59,36 @@ class ConsumerGroupUpdate extends Form {
: '';

let data = {};
if (JSON.stringify(groupedTopicOffset) === JSON.stringify({})) {
if (JSON.stringify(topicOffset) === JSON.stringify({})) {
data = await this.getApi(uriConsumerGroup(clusterId, consumerGroupId));
data = data.data;
const topicOffset = groupedTopicOffset(data.offsets);

if (data) {
this.setState({ groupedTopicOffset: data.groupedTopicOffset }, () =>
this.createValidationSchema(data.groupedTopicOffset)
this.setState({ topicOffset: topicOffset}, () =>
this.createValidationSchema(topicOffset)
);
} else {
this.setState({ groupedTopicOffset: {} });
this.setState({ topicOffset: {} });
}
} else if (date !== '') {
data = await this.getApi(uriConsumerGroupOffsetsByTimestamp(clusterId, consumerGroupId, date));
data = data.data;
this.handleOffsetsByTimestamp(data);
} else {
this.createValidationSchema(groupedTopicOffset);
this.createValidationSchema(topicOffset);
}
}

createValidationSchema = groupedTopicOffset => {
createValidationSchema = topicOffset => {
let { formData, checked} = this.state;
let firstOffsets = {};
let lastOffsets = {};
let name = '';

Object.keys(groupedTopicOffset).forEach(topidId => {
Object.keys(topicOffset).forEach(topidId => {
checked[topidId] = true;
groupedTopicOffset[topidId].forEach(offset => {
topicOffset[topidId].forEach(offset => {
name = `${topidId}-${offset.partition}`;
this.schema[name] = Joi.number()
.min(offset.firstOffset || 0)
Expand Down Expand Up @@ -180,29 +182,29 @@ class ConsumerGroupUpdate extends Form {
toast.success(`Offsets for '${consumerGroupId}' updated successfully.`);
}

renderGroupedTopicOffset = () => {
const { groupedTopicOffset, checked } = this.state;
rendertopicOffset = () => {
const { topicOffset, checked } = this.state;
const renderedItems = [];

Object.keys(groupedTopicOffset).forEach(topicId => {
Object.keys(topicOffset).forEach(topicId => {
renderedItems.push(
<fieldset id={`fieldset-${topicId}`} key={topicId}>
<legend id={`legend-${topicId}`}>
<input
type="checkbox"
value={topicId}
checked={checked[topicId] || false}
onChange={this.checkedGroupedTopicOffset}/> {topicId}
onChange={this.checkedtopicOffset}/> {topicId}
</legend>
{this.renderPartitionInputs(groupedTopicOffset[topicId], topicId, !checked[topicId])}
{this.renderPartitionInputs(topicOffset[topicId], topicId, !checked[topicId])}
</fieldset>
);
});

return renderedItems;
};

checkedGroupedTopicOffset = (event) => {
checkedtopicOffset = (event) => {
const { checked } = this.state;
checked[event.target.value] = event.target.checked;

Expand Down Expand Up @@ -293,7 +295,7 @@ class ConsumerGroupUpdate extends Form {
value={timestamp}
label={''}
onChange={value => {
this.setState({ timestamp: value }, () => this.getGroupedTopicOffset());
this.setState({ timestamp: value }, () => this.getTopicOffset());
}}
/>
</div>
Expand All @@ -315,7 +317,7 @@ class ConsumerGroupUpdate extends Form {
className="khq-form khq-update-consumer-group-offsets"
onSubmit={() => this.handleSubmit()}
>
{this.renderGroupedTopicOffset()}
{this.rendertopicOffset()}
{this.renderButton(
'Update',
this.handleSubmit,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import React from 'react';
import Table from '../../../components/Table';
import { uriConsumerGroups, uriConsumerGroupDelete } from '../../../utils/endpoints';
import constants from '../../../utils/constants';
import { calculateTopicOffsetLag } from '../../../utils/converters';
import {calculateTopicOffsetLag, groupedTopicOffset} from '../../../utils/converters';
import Header from '../../Header';
import SearchBar from '../../../components/SearchBar';
import Pagination from '../../../components/Pagination';
Expand Down Expand Up @@ -86,7 +86,7 @@ class ConsumerGroupList extends Root {
state: consumerGroup.state,
coordinator: consumerGroup.coordinator.id,
members: consumerGroup.members ? consumerGroup.members.length : 0,
topics: consumerGroup.groupedTopicOffset ? consumerGroup.groupedTopicOffset : {}
topics: consumerGroup.offsets ? groupedTopicOffset(consumerGroup.offsets) : {}
});
});

Expand Down
14 changes: 14 additions & 0 deletions client/src/utils/converters.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,20 @@ export function calculateTopicOffsetLag(topicOffsets, topicId) {
return offsetLag;
}

export function groupedTopicOffset(offsets) {
return (offsets || [])
.reduce((accumulator, r) => {
if (accumulator[r.topic] === undefined) {
accumulator[r.topic] = [];
}

accumulator[r.topic].push(r);

return accumulator;
}, Object.create(null));
}


export function formatDateTime(value, format, utc = false) {
let milli = value.milli || 0;
const date = new Date(
Expand Down
11 changes: 0 additions & 11 deletions client/src/utils/endpoints.js
Original file line number Diff line number Diff line change
Expand Up @@ -226,16 +226,6 @@ export const uriConsumerGroupOffsetsByTimestamp = (clusterId, groupId, timestamp
return `${apiUrl}/${clusterId}/group/${groupId}/offsets/start?timestamp=${timestamp}`;
};

export const uriConsumerGroupGroupedTopicOffset = (clusterId, groupId, timestamp) => {
let uri = `${apiUrl}/group/grouped-topic-offset?clusterId=${clusterId}&groupId=${groupId}`;

if (timestamp !== '') {
uri += `&timestamp=${timestamp}`;
}

return uri;
};

export const uriConsumerGroupDelete = (clusterId, groupId) => {
return `${apiUrl}/${clusterId}/group/${groupId}`;
};
Expand Down Expand Up @@ -315,7 +305,6 @@ export default {
uriDeleteSchema,
uriPreferredSchemaForTopic,
uriSchemaCreate,
uriConsumerGroupGroupedTopicOffset,
uriConsumerGroupUpdate,
uriTopicsConfigs,
uriLatestSchemaVersion,
Expand Down
21 changes: 1 addition & 20 deletions src/main/java/org/akhq/models/ConsumerGroup.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,20 +53,7 @@ public ConsumerGroup(
this.offsets.add(new TopicPartition.ConsumerGroupOffset(
offset.getKey(),
offset.getValue(),
topicOffsets,
this.members
.stream()
.filter(consumer -> consumer.getAssignments()
.stream()
.filter(topicPartition ->
topicPartition.getPartition() == offset.getKey().partition() &&
topicPartition.getTopic().equals(offset.getKey().topic())
)
.collect(Collectors.toList())
.size() > 0
)
.findFirst()
.orElse(null)
topicOffsets
));
}

Expand Down Expand Up @@ -126,10 +113,4 @@ public long getOffsetLag(String topic) {
(a1, a2) -> a1 + a2
);
}

public Map<String, List<TopicPartition.ConsumerGroupOffset>> getGroupedTopicOffset() {
return this.offsets
.stream()
.collect(Collectors.groupingBy(TopicPartition::getTopic));
}
}
6 changes: 1 addition & 5 deletions src/main/java/org/akhq/models/TopicPartition.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ public TopicPartition(String topic, int partition) {
public static class ConsumerGroupOffset extends TopicPartition {
private Optional<Long> offset;
private Optional<String> metadata;
private Optional<Consumer> member;
private Optional<Long> firstOffset;
private Optional<Long> lastOffset;

Expand All @@ -47,22 +46,19 @@ public ConsumerGroupOffset(TopicPartition topicPartition) {

this.offset = Optional.empty();
this.metadata = Optional.empty();
this.member = Optional.empty();
this.firstOffset = Optional.empty();
this.lastOffset = Optional.empty();
}

public ConsumerGroupOffset(
org.apache.kafka.common.TopicPartition topicPartition,
OffsetAndMetadata offsetAndMetadata,
Partition.Offsets partiionOffsets,
Consumer member
Partition.Offsets partiionOffsets
) {
super(topicPartition);

this.offset = offsetAndMetadata != null ? Optional.of(offsetAndMetadata.offset()) : Optional.empty();
this.metadata = offsetAndMetadata != null ? Optional.of(offsetAndMetadata.metadata()) : Optional.empty();
this.member = member != null ? Optional.of(member) : Optional.empty();
this.firstOffset = partiionOffsets != null ? Optional.of(partiionOffsets.getFirstOffset()) : Optional.empty();
this.lastOffset = partiionOffsets != null ? Optional.of(partiionOffsets.getLastOffset()) : Optional.empty();
}
Expand Down

0 comments on commit dde8114

Please sign in to comment.