Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix queue populator shared metrics #2612

Open
wants to merge 3 commits into
base: development/9.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion lib/queuePopulator/BucketFileLogReader.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ const LogReader = require('./LogReader');
class BucketFileLogReader extends LogReader {
constructor(params) {
const { zkClient, kafkaConfig, dmdConfig, logger,
extensions, metricsProducer, metricsHandler } = params;
extensions, extensionNames, metricsProducer, metricsHandler } = params;
super({ zkClient, kafkaConfig, logConsumer: null,
logId: `bucketFile_${dmdConfig.logName}`, logger, extensions,
metricsProducer, metricsHandler });

this._dmdConfig = dmdConfig;
this._log = logger;
this._extensionNames = extensionNames;
this._log.info('initializing bucketfile log reader', {
method: 'BucketFileLogReader.constructor',
dmdConfig,
Expand Down Expand Up @@ -45,6 +46,7 @@ class BucketFileLogReader extends LogReader {

getMetricLabels() {
return {
origin: this._extensionNames,
Kerkesni marked this conversation as resolved.
Show resolved Hide resolved
logName: 'bucket-file',
logId: this._dmdConfig.logName,
};
Expand Down
4 changes: 3 additions & 1 deletion lib/queuePopulator/KafkaLogReader.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class KafkaLogReader extends LogReader {
*/
constructor(params) {
const { zkClient, kafkaConfig, zkConfig, qpKafkaConfig,
logger, extensions, metricsProducer, metricsHandler } = params;
logger, extensions, extensionNames, metricsProducer, metricsHandler } = params;
// conf contains global kafka and queuePoplator kafka configs
const conf = {
hosts: kafkaConfig.hosts,
Expand All @@ -33,6 +33,7 @@ class KafkaLogReader extends LogReader {
logId: `kafka_${qpKafkaConfig.logName}`, logger, extensions,
metricsProducer, metricsHandler });
this._kafkaConfig = conf;
this._extensionNames = extensionNames;
}

/**
Expand Down Expand Up @@ -67,6 +68,7 @@ class KafkaLogReader extends LogReader {
*/
getMetricLabels() {
return {
origin: this._extensionNames,
logName: 'kafka-log',
logId: this._kafkaConfig.logName,
};
Expand Down
4 changes: 3 additions & 1 deletion lib/queuePopulator/MongoLogReader.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ const LogReader = require('./LogReader');
class MongoLogReader extends LogReader {
constructor(params) {
const { zkClient, kafkaConfig, zkConfig, mongoConfig,
logger, extensions, metricsProducer, metricsHandler } = params;
logger, extensions, extensionNames, metricsProducer, metricsHandler } = params;
logger.info('initializing mongo log reader',
{ method: 'MongoLogReader.constructor',
mongoConfig });
Expand All @@ -14,6 +14,7 @@ class MongoLogReader extends LogReader {
logId: `mongo_${mongoConfig.logName}`, logger, extensions,
metricsProducer, metricsHandler });
this._mongoConfig = mongoConfig;
this._extensionNames = extensionNames;
}

/**
Expand Down Expand Up @@ -44,6 +45,7 @@ class MongoLogReader extends LogReader {

getMetricLabels() {
return {
origin: this._extensionNames,
logName: 'mongo-log',
logId: this._mongoConfig.logName,
};
Expand Down
33 changes: 19 additions & 14 deletions lib/queuePopulator/QueuePopulator.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
startCircuitBreakerMetricsExport,
} = require('../CircuitBreaker');

// origin is the name of the workflow that triggered the processing
// of the log entries. The label can include multiple values when
// the queue populator is shared between multiple workflows (e.g.
// replication, lifecycle, notification).
const metricLabels = ['origin', 'logName', 'logId'];

/**
Expand All @@ -36,25 +40,25 @@
*/

const logReadOffsetMetric = ZenkoMetrics.createGauge({
name: 's3_replication_read_offset',
name: 's3_backbeat_populator_read_offset',
help: 'Current read offset of metadata journal',
labelNames: metricLabels,
});

const logSizeMetric = ZenkoMetrics.createGauge({
name: 's3_replication_log_size',
name: 's3_backbeat_populator_log_size',
help: 'Current size of metadata journal',
labelNames: metricLabels,
});

const logTimestamp = ZenkoMetrics.createGauge({
name: 's3_replication_log_timestamp',
name: 's3_backbeat_populator_log_timestamp',
help: 'Last timestamp read from the metadata journal',
labelNames: metricLabels,
});

const messageMetrics = ZenkoMetrics.createCounter({
name: 's3_replication_populator_messages_total',
name: 's3_backbeat_populator_messages_total',
help: 'Total number of Kafka messages produced by the queue populator',
labelNames: [...metricLabels, 'publishStatus'],
});
Expand All @@ -71,10 +75,6 @@
labelNames: metricLabels,
});

const defaultLabels = {
origin: 'replication',
};

const notificationEvent = ZenkoMetrics.createCounter({
name: 's3_notification_queue_populator_events_total',
help: 'Total number of oplog events processed by notification extension',
Expand All @@ -90,12 +90,12 @@
* @property {GaugeSet} logSize - Set the log size metric
*/
const metricsHandler = {
messages: wrapCounterInc(messageMetrics, defaultLabels),
objects: wrapCounterInc(objectMetrics, defaultLabels),
bytes: wrapCounterInc(byteMetrics, defaultLabels),
logReadOffset: wrapGaugeSet(logReadOffsetMetric, defaultLabels),
logSize: wrapGaugeSet(logSizeMetric, defaultLabels),
logTimestamp: wrapGaugeSet(logTimestamp, defaultLabels),
messages: wrapCounterInc(messageMetrics, {}),
objects: wrapCounterInc(objectMetrics, {}),
bytes: wrapCounterInc(byteMetrics, {}),
logReadOffset: wrapGaugeSet(logReadOffsetMetric, {}),
logSize: wrapGaugeSet(logSizeMetric, {}),
logTimestamp: wrapGaugeSet(logTimestamp, {}),
notifEvent: wrapCounterInc(notificationEvent, {}),
};

Expand Down Expand Up @@ -301,6 +301,7 @@
}

_setupLogSources() {
const extensionNames = this._loadedExtensions.join(',');

Check warning on line 304 in lib/queuePopulator/QueuePopulator.js

View check run for this annotation

Codecov / codecov/patch/Backbeat

lib/queuePopulator/QueuePopulator.js#L304

Added line #L304 was not covered by tests
switch (this.qpConfig.logSource) {
case 'bucketd':
// initialization of log source is deferred until the
Expand All @@ -317,6 +318,7 @@
mongoConfig: this.qpConfig.mongo,
logger: this.log,
extensions: this._extensions,
extensionNames,
metricsProducer: this._mProducer,
metricsHandler,
}),
Expand All @@ -334,6 +336,7 @@
),
logger: this.log,
extensions: this._extensions,
extensionNames,
metricsProducer: this._mProducer,
metricsHandler,
}),
Expand All @@ -347,6 +350,7 @@
dmdConfig: this.qpConfig.dmd,
logger: this.log,
extensions: this._extensions,
extensionNames,
metricsProducer: this._mProducer,
metricsHandler,
}),
Expand Down Expand Up @@ -385,6 +389,7 @@
raftId: token,
logger: this.log,
extensions: this._extensions,
extensionNames: this._loadedExtensions.join(','),
metricsProducer: this._mProducer,
metricsHandler,
}));
Expand Down
4 changes: 3 additions & 1 deletion lib/queuePopulator/RaftLogReader.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const LogReader = require('./LogReader');
class RaftLogReader extends LogReader {
constructor(params) {
const { zkClient, kafkaConfig, bucketdConfig, httpsConfig,
raftId, logger, extensions, metricsProducer, metricsHandler } = params;
raftId, logger, extensions, extensionNames, metricsProducer, metricsHandler } = params;
const { host, port } = bucketdConfig;
logger.info('initializing raft log reader',
{ method: 'RaftLogReader.constructor',
Expand All @@ -27,6 +27,7 @@ class RaftLogReader extends LogReader {
super({ zkClient, kafkaConfig, logConsumer, logId: `raft_${raftId}`,
logger, extensions, metricsProducer, metricsHandler });
this.raftId = raftId;
this._extensionNames = extensionNames;
}

getLogInfo() {
Expand All @@ -35,6 +36,7 @@ class RaftLogReader extends LogReader {

getMetricLabels() {
return {
origin: this._extensionNames,
logName: 'raft-log',
logId: this.raftId,
};
Expand Down
2 changes: 1 addition & 1 deletion monitoring/replication/dashboard.json
Original file line number Diff line number Diff line change
Expand Up @@ -1972,7 +1972,7 @@
"targets": [
{
"datasource": null,
"expr": "clamp_min(\n max(mongodb_mongod_replset_oplog_head_timestamp{\n namespace=\"${namespace}\", job=\"${job_mongod}\"})\n -\n min(s3_replication_log_timestamp{namespace=\"${namespace}\",\n job=\"${job_queue_populator}\"}),\n0)",
"expr": "clamp_min(\n max(mongodb_mongod_replset_oplog_head_timestamp{\n namespace=\"${namespace}\", job=\"${job_mongod}\"})\n -\n min(s3_backbeat_populator_log_timestamp{namespace=\"${namespace}\",\n job=\"${job_queue_populator}\"}),\n0)",
"format": "time_series",
"hide": false,
"instant": false,
Expand Down
2 changes: 1 addition & 1 deletion monitoring/replication/dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ def up(component: str, expr: str = None, title: str = None, **kwargs):
' max(mongodb_mongod_replset_oplog_head_timestamp{',
' namespace="${namespace}", job="${job_mongod}"})',
' -',
' min(s3_replication_log_timestamp{namespace="${namespace}",',
' min(s3_backbeat_populator_log_timestamp{namespace="${namespace}",',
' job="${job_queue_populator}"}),',
'0)',
]),
Expand Down
67 changes: 67 additions & 0 deletions tests/unit/lib/queuePopulator/LogReader.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ const { errors } = require('arsenal');
const { Logger } = require('werelogs');

const LogReader = require('../../../../lib/queuePopulator/LogReader');
const KafkaLogReader = require('../../../../lib/queuePopulator/KafkaLogReader');
const BucketFileLogReader = require('../../../../lib/queuePopulator/BucketFileLogReader');
const RaftLogReader = require('../../../../lib/queuePopulator/RaftLogReader');
const MongoLogReader = require('../../../../lib/queuePopulator/MongoLogReader');


class MockLogConsumer {
Expand Down Expand Up @@ -505,4 +509,67 @@ describe('LogReader', () => {
});
});
});

describe('getMetricLabels', () => {
    // Parameterized over every LogReader subclass: each entry supplies the
    // minimal constructor config plus the logName value the reader is
    // expected to report in its metric labels.
    [{
        name: 'KafkaLogReader',
        Reader: KafkaLogReader,
        config: {
            kafkaConfig: {
                hosts: 'localhost:9092',
            },
            qpKafkaConfig: {
                logName: 'test-log',
            },
        },
        logName: 'kafka-log',
    }, {
        name: 'BucketFileLogReader',
        Reader: BucketFileLogReader,
        config: {
            dmdConfig: {
                logName: 'test-log',
                host: 'localhost',
                port: 8000,
            },
        },
        logName: 'bucket-file',
    }, {
        name: 'RaftLogReader',
        Reader: RaftLogReader,
        config: {
            raftId: 'test-log',
            bucketdConfig: {
                host: 'localhost',
                port: 8000,
            },
        },
        logName: 'raft-log',
    }, {
        // NOTE: was mislabeled 'RaftLogReader' (copy-paste error); this
        // case exercises MongoLogReader, so the test title must say so.
        name: 'MongoLogReader',
        Reader: MongoLogReader,
        config: {
            mongoConfig: {
                logName: 'test-log',
                host: 'localhost',
                port: 8000,
            },
        },
        logName: 'mongo-log',
    }].forEach(params => {
        it(`should return proper ${params.name} metrics labels`, () => {
            const reader = new params.Reader({
                ...params.config,
                logger: new Logger('test:LogReader'),
                extensionNames: 'replication,lifecycle,notification',
            });
            // All readers must expose the shared `origin` label carrying
            // the comma-separated extension names, alongside their own
            // logName/logId values.
            const expectedLabels = {
                logId: 'test-log',
                logName: params.logName,
                origin: 'replication,lifecycle,notification',
            };
            assert.deepStrictEqual(reader.getMetricLabels(), expectedLabels);
        });
    });
});
});
Loading