Commit ef535d7

Authored by shahzad31, emilioalvap, and vigneshshanmugam

fix: calculate push page size based on monitor size per batch (#993)

* retry on payload
* add payload based chunking
* add payload based chunking
* Fix unit tests
* conflicts resolution 101
* update tests
* PR feedback
* PR feedback
* PR feedback
* show only for 413
* keep size as mb and kb

Co-authored-by: emilioalvap <emilio.alvarezpineiro@elastic.co>
Co-authored-by: vigneshshanmugam <vignesh.shanmugam22@gmail.com>

1 parent 65b9553 · commit ef535d7

6 files changed (+143, −32 lines)

__tests__/push/utils.test.ts

Lines changed: 39 additions & 1 deletion
@@ -23,7 +23,7 @@
  *
  */
 
-import { normalizeMonitorName } from '../../src/push/utils';
+import { getSizedBatches, normalizeMonitorName } from '../../src/push/utils';
 
 describe('Push Utils', () => {
   it("normalize monitor's name", () => {
@@ -38,4 +38,42 @@ describe('Push Utils', () => {
     expect(normalizeMonitorName('x.y.z')).toBe('x_y_z');
     expect(normalizeMonitorName('foo:bar:blah')).toBe('foo_bar_blah');
   });
+
+  describe('getSizedChunks', () => {
+    it('should return empty array when input is empty', () => {
+      expect(getSizedBatches([], new Map(), 100, 100)).toEqual([]);
+    });
+
+    it('should return chunks of input array based on maxChunkSizeKB', () => {
+      const input = [
+        { id: '1' },
+        { id: '2' },
+        { id: '3' },
+        { id: '4' },
+        { id: '5' },
+        { id: '6' },
+      ];
+      // map of id and sizes
+      const sizes = new Map([
+        ['1', 10 * 1024],
+        ['2', 20 * 1024],
+        ['3', 30 * 1024],
+        ['4', 40 * 1024],
+        ['5', 50 * 1024],
+        ['6', 60 * 1024],
+      ]);
+      expect(getSizedBatches(input, sizes, 1000, 100)).toEqual([input]);
+      expect(getSizedBatches(input, sizes, 100, 3)).toEqual([
+        [input[0], input[1], input[2]],
+        [input[3], input[4]],
+        [input[5]],
+      ]);
+      expect(getSizedBatches(input, sizes, 100, 2)).toEqual([
+        [input[0], input[1]],
+        [input[2], input[3]],
+        [input[4]],
+        [input[5]],
+      ]);
+    });
+  });
 });
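Note (not part of the commit): the expected splits above follow from the KB conversion getSizedBatches applies to the byte sizes in the map. A quick standalone check of the arithmetic:

// Byte sizes from the test, converted the same way the implementation does (Math.round(bytes / 1000)).
const bytesPerMonitor = [10 * 1024, 20 * 1024, 30 * 1024, 40 * 1024, 50 * 1024, 60 * 1024];
const kbPerMonitor = bytesPerMonitor.map(b => Math.round(b / 1000)); // [10, 20, 31, 41, 51, 61]
// With maxBatchSizeKB = 100 and maxBatchItems = 3:
//   batch 1: 10 + 20 + 31 = 61 KB, then the 3-item cap forces a new batch
//   batch 2: 41 + 51 = 92 KB, adding 61 KB would exceed 100 KB
//   batch 3: 61 KB
console.log(kbPerMonitor);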

src/push/index.ts

Lines changed: 33 additions & 16 deletions
@@ -50,10 +50,12 @@ import {
   bulkGetMonitors,
   bulkPutMonitors,
   createMonitorsLegacy,
-  CHUNK_SIZE,
+  BATCH_SIZE,
+  MAX_PAYLOAD_SIZE_KB,
 } from './kibana_api';
 import {
-  getChunks,
+  getBatches,
+  getSizedBatches,
   isBulkAPISupported,
   isLightweightMonitorSupported,
   logDiff,
@@ -102,7 +104,10 @@ export async function push(monitors: Monitor[], options: PushOptions) {
     for (const value of sizes.values()) {
       totalSize += value;
     }
-    progress('total size of the monitors payload is ' + printBytes(totalSize));
+    progress(
+      `total size of the ${sizes.size} monitors payload is ` +
+        printBytes(totalSize)
+    );
   }
 
   if (options.dryRun) {
@@ -115,11 +120,19 @@ export async function push(monitors: Monitor[], options: PushOptions) {
   const updatedMonitors = new Set<string>([...changedIDs, ...newIDs]);
   if (updatedMonitors.size > 0) {
     const updatedMonSchemas = schemas.filter(s => updatedMonitors.has(s.id));
-    const chunks = getChunks(updatedMonSchemas, CHUNK_SIZE);
-    for (const chunk of chunks) {
+    const batches = getSizedBatches(
+      updatedMonSchemas,
+      sizes,
+      MAX_PAYLOAD_SIZE_KB,
+      BATCH_SIZE
+    );
+    if (batches.length > 1) {
+      progress(`Monitors will be pushed as ${batches.length} batches.`);
+    }
+    for (const batch of batches) {
       await liveProgress(
-        bulkPutMonitors(options, chunk),
-        `creating or updating ${chunk.length} monitors`
+        bulkPutMonitors(options, batch),
+        `creating or updating ${batch.length} monitors`
       );
     }
   }
@@ -136,11 +149,11 @@ export async function push(monitors: Monitor[], options: PushOptions) {
       options.yes
     );
   }
-  const chunks = getChunks(Array.from(removedIDs), CHUNK_SIZE);
-  for (const chunk of chunks) {
+  const batches = getBatches(Array.from(removedIDs), BATCH_SIZE);
+  for (const batch of batches) {
     await liveProgress(
-      bulkDeleteMonitors(options, chunk),
-      `deleting ${chunk.length} monitors`
+      bulkDeleteMonitors(options, batch),
+      `deleting ${batch.length} monitors`
     );
   }
 }
@@ -317,14 +330,18 @@ export async function pushLegacy(monitors: Monitor[], options: PushOptions) {
   }
 
   let schemas: MonitorSchema[] = [];
+  let sizes: Map<string, number> = new Map();
   if (monitors.length > 0) {
     progress(`preparing ${monitors.length} monitors`);
-    ({ schemas } = await buildMonitorSchema(monitors, false));
-    const chunks = getChunks(schemas, 10);
-    for (const chunk of chunks) {
+    ({ schemas, sizes } = await buildMonitorSchema(monitors, false));
+    const batches = getSizedBatches(schemas, sizes, MAX_PAYLOAD_SIZE_KB, 10);
+    if (batches.length > 1) {
+      progress(`Monitors will be pushed as ${batches.length} batches.`);
+    }
+    for (const batch of batches) {
       await liveProgress(
-        createMonitorsLegacy({ schemas: chunk, keepStale: true, options }),
-        `creating or updating ${chunk.length} monitors`
+        createMonitorsLegacy({ schemas: batch, keepStale: true, options }),
+        `creating or updating ${batch.length} monitors`
       );
     }
   } else {
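To summarize the change to push: bulk updates are now split with getSizedBatches using the per-monitor sizes returned by buildMonitorSchema, while deletes keep a plain count-based split via getBatches. A minimal sketch of the update loop, with the repo's liveProgress/bulkPutMonitors helpers replaced by a stubbed uploader (illustration only, not part of the commit):

import { getSizedBatches } from './utils';
import { BATCH_SIZE, MAX_PAYLOAD_SIZE_KB } from './kibana_api';

async function pushInBatches<T extends { id?: string }>(
  schemas: T[],
  sizes: Map<string, number>,
  upload: (batch: T[]) => Promise<void>
) {
  // Split by payload size (KB) and item count, mirroring the loop added above.
  const batches = getSizedBatches(schemas, sizes, MAX_PAYLOAD_SIZE_KB, BATCH_SIZE);
  if (batches.length > 1) {
    console.log(`Monitors will be pushed as ${batches.length} batches.`);
  }
  for (const batch of batches) {
    // One bulk request per batch keeps each payload under the size cap.
    await upload(batch);
  }
}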

src/push/kibana_api.ts

Lines changed: 7 additions & 3 deletions
@@ -36,8 +36,9 @@ import {
 } from './request';
 import { generateURL } from './utils';
 
-// Default chunk size for bulk put / delete
-export const CHUNK_SIZE = parseInt(process.env.CHUNK_SIZE) || 100;
+// Default batch size for bulk put / delete
+export const BATCH_SIZE = parseInt(process.env.CHUNK_SIZE) || 250;
+export const MAX_PAYLOAD_SIZE_KB = 50 * 1000; // 50 MB in KB
 
 export type PutResponse = {
   createdMonitors: string[];
@@ -49,12 +50,15 @@ export async function bulkPutMonitors(
   options: PushOptions,
   schemas: MonitorSchema[]
 ) {
+  const url = generateURL(options, 'bulk_update') + '/_bulk_update';
+
   const resp = await sendReqAndHandleError<PutResponse>({
-    url: generateURL(options, 'bulk_update') + '/_bulk_update',
+    url,
     method: 'PUT',
     auth: options.auth,
     body: JSON.stringify({ monitors: schemas }),
   });
+
   const { failedMonitors } = resp;
   if (failedMonitors && failedMonitors.length > 0) {
     throw formatFailedMonitors(failedMonitors);
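Two details worth noting here (observations, not part of the diff): the renamed BATCH_SIZE constant still reads the CHUNK_SIZE environment variable, so existing overrides keep working while the default rises from 100 to 250; and MAX_PAYLOAD_SIZE_KB uses the same 1 KB = 1000 bytes convention as getSizedBatches. A small sketch of the effective limits:

// Effective defaults after this change (assuming no CHUNK_SIZE override is set).
const batchSize = parseInt(process.env.CHUNK_SIZE ?? '', 10) || 250; // monitors per bulk request
const maxPayloadKB = 50 * 1000; // ~50 MB per bulk request, expressed in KB
console.log({ batchSize, maxPayloadKB });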

src/push/monitor.ts

Lines changed: 2 additions & 1 deletion
@@ -50,6 +50,7 @@ export type MonitorSchema = Omit<MonitorConfig, 'locations'> & {
   content?: string;
   filter?: Monitor['filter'];
   hash?: string;
+  size?: number;
 };
 
 // Abbreviated monitor info, as often returned by the API,
@@ -300,7 +301,7 @@ export function buildMonitorFromYaml(
   });
 
   /**
-   * Params support is only available for lighweight monitors
+   * Params support is only available for lightweight monitors
    * post 8.7.2 stack
    */
   if (isParamOptionSupported(options.kibanaVersion)) {
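The new optional size field on MonitorSchema lines up with the sizes map that buildMonitorSchema now returns to index.ts. The exact computation is not shown in this diff; a plausible, purely illustrative way to measure a per-monitor payload would be the serialized schema length:

// Hypothetical illustration only — the real buildMonitorSchema computation is not part of this commit.
function approximateSize(schema: { id: string; content?: string }): number {
  return Buffer.byteLength(JSON.stringify(schema), 'utf-8');
}

const sizes = new Map<string, number>();
const schema = { id: 'example-monitor', content: 'step("load page", () => {});' };
sizes.set(schema.id, approximateSize(schema)); // bytes, later rounded to KB by getSizedBatches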

src/push/request.ts

Lines changed: 26 additions & 7 deletions
@@ -58,8 +58,14 @@ export async function sendReqAndHandleError<T>(
   options: APIRequestOptions
 ): Promise<T> {
   const { statusCode, body } = await sendRequest(options);
+
   return (
-    await handleError(statusCode, options.url, body)
+    await handleError(
+      statusCode,
+      options.url,
+      body,
+      statusCode === 413 ? `${options.body?.length} bytes sent` : ''
+    )
   ).json() as Promise<T>;
 }
 
@@ -74,7 +80,8 @@ type APIError = {
 export async function handleError(
   statusCode: number,
   url: string,
-  body: Dispatcher.ResponseData['body']
+  body: Dispatcher.ResponseData['body'],
+  extraMessage?: string
 ): Promise<Dispatcher.ResponseData['body']> {
   if (statusCode === 404) {
     throw formatNotFoundError(url, await body.text());
@@ -84,9 +91,19 @@
       const resp = await body.text();
       parsed = JSON.parse(resp) as APIError;
     } catch (e) {
-      throw formatAPIError(statusCode, 'unexpected error', e.message);
+      throw formatAPIError(
+        statusCode,
+        'unexpected error',
+        e.message,
+        extraMessage
+      );
     }
-    throw formatAPIError(statusCode, parsed.error, parsed.message);
+    throw formatAPIError(
+      statusCode,
+      parsed.error,
+      parsed.message,
+      extraMessage
+    );
   }
 
   return body;
@@ -111,16 +128,18 @@ export function formatNotFoundError(url: string, message: string) {
 }
 
 export function formatAPIError(
-  statuCode: number,
+  statusCode: number,
   error: string,
-  message: string
+  message: string,
+  extraMessage = ''
 ) {
   let outer = bold(`${symbols['failed']} Error\n`);
   let inner = bold(
-    `${symbols['failed']} monitor creation failed - ${statuCode}:${error}\n`
+    `${symbols['failed']} monitor creation failed - ${statusCode}:${error}\n`
   );
   inner += indent(message, ' ');
   outer += indent(inner);
+  outer += extraMessage ? indent(extraMessage, ' ') : '';
   return red(outer);
 }
 
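Net effect of the request.ts changes: when Kibana answers 413 (payload too large), the formatted error now also reports how many bytes were sent. A simplified, self-contained sketch of that message assembly, using plain strings in place of the repo's bold/red/indent/symbols helpers:

// Sketch only — mirrors how extraMessage is threaded through formatAPIError for 413 responses.
function formatAPIErrorSketch(
  statusCode: number,
  error: string,
  message: string,
  extraMessage = ''
): string {
  let out = 'Error\n';
  out += `  monitor creation failed - ${statusCode}:${error}\n`;
  out += `  ${message}\n`;
  if (extraMessage) {
    out += `  ${extraMessage}\n`; // appended only when present (413 responses)
  }
  return out;
}

const requestBody = JSON.stringify({ monitors: [] });
const extra = `${requestBody.length} bytes sent`; // handleError passes this only when statusCode === 413
console.log(formatAPIErrorSketch(413, 'Request Entity Too Large', 'payload exceeded the allowed size', extra));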

src/push/utils.ts

Lines changed: 36 additions & 4 deletions
@@ -90,12 +90,44 @@ export function printBytes(bytes: number) {
   return `${bytes.toFixed(1)} ${BYTE_UNITS[exponent]}`;
 }
 
-export function getChunks<T>(arr: Array<T>, size: number): Array<T[]> {
-  const chunks = [];
+export function getBatches(arr: string[], size: number): string[][] {
+  const batches = [];
   for (let i = 0; i < arr.length; i += size) {
-    chunks.push(arr.slice(i, i + size));
+    batches.push(arr.slice(i, i + size));
   }
-  return chunks;
+  return batches;
+}
+
+export function getSizedBatches<T extends { id?: string }>(
+  arr: Array<T>,
+  sizes: Map<string, number>,
+  maxBatchSizeKB: number,
+  maxBatchItems: number
+): Array<T[]> {
+  const batches: Array<T[]> = [];
+  let currentBatch: T[] = [];
+  let currentSize = 0;
+
+  for (const item of arr) {
+    const sizeKB = item.id ? Math.round(sizes.get(item.id) / 1000) : 1;
+    // If adding the current item would exceed limits, create a new chunk
+    if (
+      currentBatch.length >= maxBatchItems ||
+      currentSize + sizeKB > maxBatchSizeKB
+    ) {
+      batches.push(currentBatch);
+      currentBatch = [];
+      currentSize = 0;
+    }
+    currentBatch.push(item);
+    currentSize += sizeKB;
+  }
+
+  // Push the last chunk if it contains any items
+  if (currentBatch.length > 0) {
+    batches.push(currentBatch);
+  }
+  return batches;
 }
 
 type Operation =
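A short usage sketch for the new helper (assuming the import path used by the test file above):

import { getSizedBatches } from '../../src/push/utils';

// Byte sizes per monitor id; getSizedBatches rounds them to KB internally.
const monitors = [{ id: 'a' }, { id: 'b' }, { id: 'c' }];
const sizes = new Map([
  ['a', 40_000], // ~40 KB
  ['b', 70_000], // ~70 KB
  ['c', 20_000], // ~20 KB
]);

// With a 100 KB size cap and up to 10 items per batch:
// 'a' + 'b' would be 110 KB, so 'b' starts a new batch; 'b' + 'c' fits at 90 KB.
const batches = getSizedBatches(monitors, sizes, 100, 10);
// -> [[{ id: 'a' }], [{ id: 'b' }, { id: 'c' }]]
console.log(batches);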
