Skip to content

Commit f551c5b

Browse files
Skyeden-3020 | retransimission with detached consumers (#1938)
* SKYEDEN-3020 | Retransmission with detached consumers * SKYEDEN-3020 | Refactor * SKYEDEN-3020 | Refactor * SKYEDEN-3271 | comment + removed move offsets to the end from the subscription diagnostic tab + improved retransmission UI * SKYEDEN-3020 | changes related to PR + frontend tests * SKYEDEN-3020 | resolve conflicts * SKYEDEN-3020 | changes related to CR - added toString for PartitionOffset class
1 parent da5a91f commit f551c5b

File tree

28 files changed

+407
-325
lines changed

28 files changed

+407
-325
lines changed

hermes-api/src/main/java/pl/allegro/tech/hermes/api/ConsumerGroup.java

+8
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,14 @@ public Set<ConsumerGroupMember> getMembers() {
4141
return members;
4242
}
4343

44+
public boolean isStable() {
45+
return state.equals("Stable");
46+
}
47+
48+
public boolean isEmpty() {
49+
return state.equals("Empty");
50+
}
51+
4452
@Override
4553
public boolean equals(Object o) {
4654
if (this == o) {

hermes-common/src/main/java/pl/allegro/tech/hermes/common/kafka/offset/PartitionOffset.java

+12
Original file line numberDiff line numberDiff line change
@@ -52,4 +52,16 @@ public boolean equals(Object obj) {
5252
public int hashCode() {
5353
return Objects.hash(topic, partition, offset);
5454
}
55+
56+
@Override
57+
public String toString() {
58+
return "PartitionOffset{"
59+
+ "topic="
60+
+ topic
61+
+ ", partition="
62+
+ partition
63+
+ ", offset="
64+
+ offset
65+
+ '}';
66+
}
5567
}

hermes-console/json-server/server.ts

+3-5
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,6 @@ server.post('/query/subscriptions', (req, res) => {
2020
res.jsonp(subscriptions);
2121
});
2222

23-
server.post('/topics/*/subscriptions/*/moveOffsetsToTheEnd', (req, res) => {
24-
res.sendStatus(200);
25-
});
26-
2723
server.post('/topicSubscriptions', (req, res) => {
2824
res.sendStatus(200);
2925
});
@@ -83,7 +79,9 @@ server.post('/offline-retransmission/tasks', (req, res) => {
8379
server.put(
8480
'/topics/:topic/subscriptions/:subscroption/retransmission',
8581
(req, res) => {
86-
res.sendStatus(200);
82+
setTimeout(() => {
83+
res.sendStatus(200);
84+
}, 2000);
8785
},
8886
);
8987

hermes-console/src/api/hermes-client/index.ts

-9
Original file line numberDiff line numberDiff line change
@@ -310,15 +310,6 @@ export function fetchDashboardUrl(path: string): ResponsePromise<DashboardUrl> {
310310
return axios.get<DashboardUrl>(path);
311311
}
312312

313-
export function moveSubscriptionOffsets(
314-
topicName: string,
315-
subscription: string,
316-
): ResponsePromise<null> {
317-
return axios.post<null>(
318-
`/topics/${topicName}/subscriptions/${subscription}/moveOffsetsToTheEnd`,
319-
);
320-
}
321-
322313
export function removeTopic(topic: String): ResponsePromise<void> {
323314
return axios.delete(`/topics/${topic}`);
324315
}

hermes-console/src/composables/consumer-groups/use-consumer-groups/useConsumerGroups.spec.ts

-57
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,9 @@ import { createTestingPinia } from '@pinia/testing';
33
import { dummyConsumerGroups } from '@/dummy/consumerGroups';
44
import { dummySubscription } from '@/dummy/subscription';
55
import { dummyTopic } from '@/dummy/topic';
6-
import {
7-
expectNotificationDispatched,
8-
notificationStoreSpy,
9-
} from '@/utils/test-utils';
106
import {
117
fetchConsumerGroupsErrorHandler,
128
fetchConsumerGroupsHandler,
13-
moveSubscriptionOffsetsHandler,
149
} from '@/mocks/handlers';
1510
import { setActivePinia } from 'pinia';
1611
import { setupServer } from 'msw/node';
@@ -81,56 +76,4 @@ describe('useConsumerGroups', () => {
8176
expect(error.value.fetchConsumerGroups).not.toBeNull();
8277
});
8378
});
84-
85-
it('should show message that moving offsets was successful', async () => {
86-
// given
87-
server.use(
88-
moveSubscriptionOffsetsHandler({
89-
topicName,
90-
subscriptionName,
91-
statusCode: 200,
92-
}),
93-
);
94-
server.listen();
95-
const notificationStore = notificationStoreSpy();
96-
97-
const { moveOffsets } = useConsumerGroups(topicName, subscriptionName);
98-
99-
// when
100-
moveOffsets();
101-
102-
// then
103-
await waitFor(() => {
104-
expectNotificationDispatched(notificationStore, {
105-
type: 'success',
106-
title: 'notifications.subscriptionOffsets.move.success',
107-
});
108-
});
109-
});
110-
111-
it('should show message that moving offsets was unsuccessful', async () => {
112-
// given
113-
server.use(
114-
moveSubscriptionOffsetsHandler({
115-
topicName,
116-
subscriptionName,
117-
statusCode: 500,
118-
}),
119-
);
120-
server.listen();
121-
122-
const notificationStore = notificationStoreSpy();
123-
const { moveOffsets } = useConsumerGroups(topicName, subscriptionName);
124-
125-
// when
126-
moveOffsets();
127-
128-
// then
129-
await waitFor(() => {
130-
expectNotificationDispatched(notificationStore, {
131-
type: 'error',
132-
title: 'notifications.subscriptionOffsets.move.failure',
133-
});
134-
});
135-
});
13679
});

hermes-console/src/composables/consumer-groups/use-consumer-groups/useConsumerGroups.ts

+1-34
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,10 @@
1-
import { dispatchErrorNotification } from '@/utils/notification-utils';
2-
import {
3-
fetchConsumerGroups as getConsumerGroups,
4-
moveSubscriptionOffsets,
5-
} from '@/api/hermes-client';
1+
import { fetchConsumerGroups as getConsumerGroups } from '@/api/hermes-client';
62
import { ref } from 'vue';
7-
import { useGlobalI18n } from '@/i18n';
8-
import { useNotificationsStore } from '@/store/app-notifications/useAppNotifications';
93
import type { ConsumerGroup } from '@/api/consumer-group';
104
import type { Ref } from 'vue';
115

126
export interface UseConsumerGroups {
137
consumerGroups: Ref<ConsumerGroup[] | undefined>;
14-
moveOffsets: () => void;
158
loading: Ref<boolean>;
169
error: Ref<UseConsumerGroupsErrors>;
1710
}
@@ -43,36 +36,10 @@ export function useConsumerGroups(
4336
}
4437
};
4538

46-
const moveOffsets = async () => {
47-
const notificationsStore = useNotificationsStore();
48-
try {
49-
await moveSubscriptionOffsets(topicName, subscriptionName);
50-
await notificationsStore.dispatchNotification({
51-
title: useGlobalI18n().t(
52-
'notifications.subscriptionOffsets.move.success',
53-
{
54-
subscriptionName,
55-
},
56-
),
57-
text: '',
58-
type: 'success',
59-
});
60-
} catch (e: any) {
61-
await dispatchErrorNotification(
62-
e,
63-
notificationsStore,
64-
useGlobalI18n().t('notifications.subscriptionOffsets.move.failure', {
65-
subscriptionName,
66-
}),
67-
);
68-
}
69-
};
70-
7139
fetchConsumerGroups();
7240

7341
return {
7442
consumerGroups,
75-
moveOffsets,
7643
loading,
7744
error,
7845
};

hermes-console/src/composables/subscription/use-subscription/useSubscription.spec.ts

+67-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { afterEach } from 'vitest';
1+
import { afterEach, expect } from 'vitest';
22
import {
33
createRetransmissionHandler,
44
fetchSubscriptionErrorHandler,
@@ -18,7 +18,6 @@ import {
1818
dummySubscriptionHealth,
1919
dummySubscriptionMetrics,
2020
} from '@/dummy/subscription';
21-
import { expect } from 'vitest';
2221
import {
2322
expectNotificationDispatched,
2423
notificationStoreSpy,
@@ -391,6 +390,39 @@ describe('useSubscription', () => {
391390
});
392391
});
393392

393+
[200, 500].forEach((statusCode) => {
394+
it(`should correctly manage the state of retransmission with status code ${statusCode}`, async () => {
395+
// given
396+
server.use(
397+
createRetransmissionHandler({
398+
topicName: dummySubscription.topicName,
399+
subscriptionName: dummySubscription.name,
400+
statusCode,
401+
delayMs: 100,
402+
}),
403+
);
404+
server.listen();
405+
406+
const { retransmitMessages, retransmitting } = useSubscription(
407+
dummySubscription.topicName,
408+
dummySubscription.name,
409+
);
410+
411+
expect(retransmitting.value).toBeFalsy();
412+
413+
// when
414+
retransmitMessages(new Date().toISOString());
415+
416+
// then
417+
await waitFor(() => {
418+
expect(retransmitting.value).toBeTruthy();
419+
});
420+
await waitFor(() => {
421+
expect(retransmitting.value).toBeFalsy();
422+
});
423+
});
424+
});
425+
394426
it('should show message that skipping all messages was successful', async () => {
395427
// given
396428
server.use(
@@ -448,6 +480,39 @@ describe('useSubscription', () => {
448480
});
449481
});
450482
});
483+
484+
[200, 500].forEach((statusCode) => {
485+
it(`should correctly manage the state of skipping all messages with status code ${statusCode}`, async () => {
486+
// given
487+
server.use(
488+
createRetransmissionHandler({
489+
topicName: dummySubscription.topicName,
490+
subscriptionName: dummySubscription.name,
491+
statusCode,
492+
delayMs: 100,
493+
}),
494+
);
495+
server.listen();
496+
497+
const { skipAllMessages, skippingAllMessages } = useSubscription(
498+
dummySubscription.topicName,
499+
dummySubscription.name,
500+
);
501+
502+
expect(skippingAllMessages.value).toBeFalsy();
503+
504+
// when
505+
skipAllMessages();
506+
507+
// then
508+
await waitFor(() => {
509+
expect(skippingAllMessages.value).toBeTruthy();
510+
});
511+
await waitFor(() => {
512+
expect(skippingAllMessages.value).toBeFalsy();
513+
});
514+
});
515+
});
451516
});
452517

453518
function expectErrors(

hermes-console/src/composables/subscription/use-subscription/useSubscription.ts

+12-1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ export interface UseSubscription {
3232
subscriptionLastUndeliveredMessage: Ref<SentMessageTrace | null>;
3333
trackingUrls: Ref<TrackingUrl[] | undefined>;
3434
loading: Ref<boolean>;
35+
retransmitting: Ref<boolean>;
36+
skippingAllMessages: Ref<boolean>;
3537
error: Ref<UseSubscriptionsErrors>;
3638
removeSubscription: () => Promise<boolean>;
3739
suspendSubscription: () => Promise<boolean>;
@@ -73,7 +75,8 @@ export function useSubscription(
7375
fetchSubscriptionLastUndeliveredMessage: null,
7476
getSubscriptionTrackingUrls: null,
7577
});
76-
78+
const retransmitting = ref(false);
79+
const skippingAllMessages = ref(false);
7780
const fetchSubscription = async () => {
7881
try {
7982
loading.value = true;
@@ -233,6 +236,7 @@ export function useSubscription(
233236
};
234237

235238
const retransmitMessages = async (from: string): Promise<boolean> => {
239+
retransmitting.value = true;
236240
try {
237241
await retransmitSubscriptionMessages(topicName, subscriptionName, {
238242
retransmissionDate: from,
@@ -257,10 +261,13 @@ export function useSubscription(
257261
}),
258262
);
259263
return false;
264+
} finally {
265+
retransmitting.value = false;
260266
}
261267
};
262268

263269
const skipAllMessages = async (): Promise<boolean> => {
270+
skippingAllMessages.value = true;
264271
const tomorrowDate = new Date();
265272
tomorrowDate.setDate(tomorrowDate.getDate() + 1);
266273
try {
@@ -290,6 +297,8 @@ export function useSubscription(
290297
),
291298
);
292299
return false;
300+
} finally {
301+
skippingAllMessages.value = false;
293302
}
294303
};
295304

@@ -305,6 +314,8 @@ export function useSubscription(
305314
subscriptionLastUndeliveredMessage,
306315
trackingUrls,
307316
loading,
317+
retransmitting,
318+
skippingAllMessages,
308319
error,
309320
removeSubscription,
310321
suspendSubscription,

hermes-console/src/i18n/en-US/index.ts

-4
Original file line numberDiff line numberDiff line change
@@ -625,10 +625,6 @@ const en_US = {
625625
reason: 'Reason',
626626
timestamp: 'Timestamp',
627627
},
628-
moveOffsets: {
629-
tooltip: 'Move subscription offsets to the end',
630-
button: 'MOVE OFFSETS',
631-
},
632628
},
633629
search: {
634630
collection: {

0 commit comments

Comments
 (0)