diff --git a/lib/queuePopulator/BucketFileLogReader.js b/lib/queuePopulator/BucketFileLogReader.js
index dcf5b7dad..af98abe51 100644
--- a/lib/queuePopulator/BucketFileLogReader.js
+++ b/lib/queuePopulator/BucketFileLogReader.js
@@ -6,13 +6,14 @@ const LogReader = require('./LogReader');
 class BucketFileLogReader extends LogReader {
     constructor(params) {
         const { zkClient, kafkaConfig, dmdConfig, logger,
-                extensions, metricsProducer, metricsHandler } = params;
+                extensions, extensionNames, metricsProducer, metricsHandler } = params;
         super({ zkClient, kafkaConfig, logConsumer: null,
                 logId: `bucketFile_${dmdConfig.logName}`, logger,
                 extensions, metricsProducer, metricsHandler });
 
         this._dmdConfig = dmdConfig;
         this._log = logger;
+        this._extensionNames = extensionNames;
         this._log.info('initializing bucketfile log reader', {
             method: 'BucketFileLogReader.constructor',
             dmdConfig,
@@ -45,6 +46,7 @@ class BucketFileLogReader extends LogReader {
 
     getMetricLabels() {
         return {
+            origin: this._extensionNames,
             logName: 'bucket-file',
             logId: this._dmdConfig.logName,
         };
diff --git a/lib/queuePopulator/KafkaLogReader.js b/lib/queuePopulator/KafkaLogReader.js
index a5be6cae4..0d7637321 100644
--- a/lib/queuePopulator/KafkaLogReader.js
+++ b/lib/queuePopulator/KafkaLogReader.js
@@ -19,7 +19,7 @@
      */
     constructor(params) {
         const { zkClient, kafkaConfig, zkConfig, qpKafkaConfig,
-            logger, extensions, metricsProducer, metricsHandler } = params;
+            logger, extensions, extensionNames, metricsProducer, metricsHandler } = params;
         // conf contains global kafka and queuePoplator kafka configs
         const conf = {
             hosts: kafkaConfig.hosts,
@@ -33,6 +33,7 @@ class KafkaLogReader extends LogReader {
             logId: `kafka_${qpKafkaConfig.logName}`,
             logger, extensions, metricsProducer, metricsHandler });
         this._kafkaConfig = conf;
+        this._extensionNames = extensionNames;
     }
 
     /**
@@ -67,6 +68,7 @@
      */
     getMetricLabels() {
         return {
+            origin: this._extensionNames,
             logName: 'kafka-log',
             logId: this._kafkaConfig.logName,
         };
diff --git a/lib/queuePopulator/MongoLogReader.js b/lib/queuePopulator/MongoLogReader.js
index ac9a93568..e215214f2 100644
--- a/lib/queuePopulator/MongoLogReader.js
+++ b/lib/queuePopulator/MongoLogReader.js
@@ -5,7 +5,7 @@ const LogReader = require('./LogReader');
 class MongoLogReader extends LogReader {
     constructor(params) {
         const { zkClient, kafkaConfig, zkConfig, mongoConfig,
-            logger, extensions, metricsProducer, metricsHandler } = params;
+            logger, extensions, extensionNames, metricsProducer, metricsHandler } = params;
         logger.info('initializing mongo log reader', {
             method: 'MongoLogReader.constructor',
             mongoConfig });
@@ -14,6 +14,7 @@ class MongoLogReader extends LogReader {
             logId: `mongo_${mongoConfig.logName}`,
             logger, extensions, metricsProducer, metricsHandler });
         this._mongoConfig = mongoConfig;
+        this._extensionNames = extensionNames;
     }
 
     /**
@@ -44,6 +45,7 @@ class MongoLogReader extends LogReader {
 
     getMetricLabels() {
         return {
+            origin: this._extensionNames,
             logName: 'mongo-log',
             logId: this._mongoConfig.logName,
         };
diff --git a/lib/queuePopulator/QueuePopulator.js b/lib/queuePopulator/QueuePopulator.js
index 035145436..82fc90ace 100644
--- a/lib/queuePopulator/QueuePopulator.js
+++ b/lib/queuePopulator/QueuePopulator.js
@@ -23,6 +23,10 @@ const {
     startCircuitBreakerMetricsExport,
 } = require('../CircuitBreaker');
 
+// origin is the name of the workflow that triggered the processing
+// of the log entries. The label can include multiple values when
+// the queue populator is shared between multiple workflows (e.g.
+// replication, lifecycle, notification).
 const metricLabels = ['origin', 'logName', 'logId'];
 
 /**
@@ -36,25 +40,25 @@ const metricLabels = ['origin', 'logName', 'logId'];
  */
 const logReadOffsetMetric = ZenkoMetrics.createGauge({
-    name: 's3_replication_read_offset',
+    name: 's3_backbeat_populator_read_offset',
     help: 'Current read offset of metadata journal',
     labelNames: metricLabels,
 });
 
 const logSizeMetric = ZenkoMetrics.createGauge({
-    name: 's3_replication_log_size',
+    name: 's3_backbeat_populator_log_size',
     help: 'Current size of metadata journal',
     labelNames: metricLabels,
 });
 
 const logTimestamp = ZenkoMetrics.createGauge({
-    name: 's3_replication_log_timestamp',
+    name: 's3_backbeat_populator_log_timestamp',
     help: 'Last timestamp read from the metadata journal',
     labelNames: metricLabels,
 });
 
 const messageMetrics = ZenkoMetrics.createCounter({
-    name: 's3_replication_populator_messages_total',
+    name: 's3_backbeat_populator_messages_total',
     help: 'Total number of Kafka messages produced by the queue populator',
     labelNames: [...metricLabels, 'publishStatus'],
 });
 
@@ -71,10 +75,6 @@ const byteMetrics = ZenkoMetrics.createCounter({
     labelNames: metricLabels,
 });
 
-const defaultLabels = {
-    origin: 'replication',
-};
-
 const notificationEvent = ZenkoMetrics.createCounter({
     name: 's3_notification_queue_populator_events_total',
     help: 'Total number of oplog events processed by notification extension',
@@ -90,12 +90,12 @@ const notificationEvent = ZenkoMetrics.createCounter({
  * @property {GaugeSet} logSize - Set the log size metric
  */
 const metricsHandler = {
-    messages: wrapCounterInc(messageMetrics, defaultLabels),
-    objects: wrapCounterInc(objectMetrics, defaultLabels),
-    bytes: wrapCounterInc(byteMetrics, defaultLabels),
-    logReadOffset: wrapGaugeSet(logReadOffsetMetric, defaultLabels),
-    logSize: wrapGaugeSet(logSizeMetric, defaultLabels),
-    logTimestamp: wrapGaugeSet(logTimestamp, defaultLabels),
+    messages: wrapCounterInc(messageMetrics, {}),
+    objects: wrapCounterInc(objectMetrics, {}),
+    bytes: wrapCounterInc(byteMetrics, {}),
+    logReadOffset: wrapGaugeSet(logReadOffsetMetric, {}),
+    logSize: wrapGaugeSet(logSizeMetric, {}),
+    logTimestamp: wrapGaugeSet(logTimestamp, {}),
     notifEvent: wrapCounterInc(notificationEvent, {}),
 };
 
@@ -301,6 +301,7 @@ class QueuePopulator {
     }
 
     _setupLogSources() {
+        const extensionNames = this._loadedExtensions.join(',');
         switch (this.qpConfig.logSource) {
         case 'bucketd':
             // initialization of log source is deferred until the
@@ -317,6 +318,7 @@ class QueuePopulator {
                     mongoConfig: this.qpConfig.mongo,
                     logger: this.log,
                     extensions: this._extensions,
+                    extensionNames,
                     metricsProducer: this._mProducer,
                     metricsHandler,
                 }),
@@ -334,6 +336,7 @@ class QueuePopulator {
                     ),
                     logger: this.log,
                     extensions: this._extensions,
+                    extensionNames,
                     metricsProducer: this._mProducer,
                     metricsHandler,
                 }),
@@ -347,6 +350,7 @@ class QueuePopulator {
                     dmdConfig: this.qpConfig.dmd,
                     logger: this.log,
                     extensions: this._extensions,
+                    extensionNames,
                     metricsProducer: this._mProducer,
                     metricsHandler,
                 }),
@@ -385,6 +389,7 @@ class QueuePopulator {
                 raftId: token,
                 logger: this.log,
                 extensions: this._extensions,
+                extensionNames: this._loadedExtensions.join(','),
                 metricsProducer: this._mProducer,
                 metricsHandler,
             }));
diff --git a/lib/queuePopulator/RaftLogReader.js b/lib/queuePopulator/RaftLogReader.js
index 24061e037..a0622c9b9 100644
--- a/lib/queuePopulator/RaftLogReader.js
+++ b/lib/queuePopulator/RaftLogReader.js
@@ -7,7 +7,7 @@ const LogReader = require('./LogReader');
 class RaftLogReader extends LogReader {
     constructor(params) {
         const { zkClient, kafkaConfig, bucketdConfig, httpsConfig,
-                raftId, logger, extensions, metricsProducer, metricsHandler } = params;
+                raftId, logger, extensions, extensionNames, metricsProducer, metricsHandler } = params;
         const { host, port } = bucketdConfig;
         logger.info('initializing raft log reader', {
             method: 'RaftLogReader.constructor',
@@ -27,6 +27,7 @@ class RaftLogReader extends LogReader {
         super({ zkClient, kafkaConfig, logConsumer,
                 logId: `raft_${raftId}`, logger, extensions, metricsProducer, metricsHandler });
         this.raftId = raftId;
+        this._extensionNames = extensionNames;
     }
 
     getLogInfo() {
@@ -35,6 +36,7 @@
 
     getMetricLabels() {
         return {
+            origin: this._extensionNames,
             logName: 'raft-log',
             logId: this.raftId,
         };
diff --git a/monitoring/replication/dashboard.json b/monitoring/replication/dashboard.json
index 9a1ecc273..5d429542f 100644
--- a/monitoring/replication/dashboard.json
+++ b/monitoring/replication/dashboard.json
@@ -1972,7 +1972,7 @@
       "targets": [
         {
           "datasource": null,
-          "expr": "clamp_min(\n max(mongodb_mongod_replset_oplog_head_timestamp{\n namespace=\"${namespace}\", job=\"${job_mongod}\"})\n -\n min(s3_replication_log_timestamp{namespace=\"${namespace}\",\n job=\"${job_queue_populator}\"}),\n0)",
+          "expr": "clamp_min(\n max(mongodb_mongod_replset_oplog_head_timestamp{\n namespace=\"${namespace}\", job=\"${job_mongod}\"})\n -\n min(s3_backbeat_populator_log_timestamp{namespace=\"${namespace}\",\n job=\"${job_queue_populator}\"}),\n0)",
           "format": "time_series",
           "hide": false,
           "instant": false,
diff --git a/monitoring/replication/dashboard.py b/monitoring/replication/dashboard.py
index c92244e00..462b91182 100644
--- a/monitoring/replication/dashboard.py
+++ b/monitoring/replication/dashboard.py
@@ -462,7 +462,7 @@ def up(component: str, expr: str = None, title: str = None, **kwargs):
         ' max(mongodb_mongod_replset_oplog_head_timestamp{',
         ' namespace="${namespace}", job="${job_mongod}"})',
         ' -',
-        ' min(s3_replication_log_timestamp{namespace="${namespace}",',
+        ' min(s3_backbeat_populator_log_timestamp{namespace="${namespace}",',
         ' job="${job_queue_populator}"}),',
         '0)',
     ]),
diff --git a/tests/unit/lib/queuePopulator/LogReader.spec.js b/tests/unit/lib/queuePopulator/LogReader.spec.js
index 87b45a905..d541b0b73 100644
--- a/tests/unit/lib/queuePopulator/LogReader.spec.js
+++ b/tests/unit/lib/queuePopulator/LogReader.spec.js
@@ -9,6 +9,10 @@ const { errors } = require('arsenal');
 const { Logger } = require('werelogs');
 
 const LogReader = require('../../../../lib/queuePopulator/LogReader');
+const KafkaLogReader = require('../../../../lib/queuePopulator/KafkaLogReader');
+const BucketFileLogReader = require('../../../../lib/queuePopulator/BucketFileLogReader');
+const RaftLogReader = require('../../../../lib/queuePopulator/RaftLogReader');
+const MongoLogReader = require('../../../../lib/queuePopulator/MongoLogReader');
 
 
 class MockLogConsumer {
@@ -505,4 +509,67 @@
             });
         });
     });
+
+    describe('getMetricLabels', () => {
+        [{
+            name: 'KafkaLogReader',
+            Reader: KafkaLogReader,
+            config: {
+                kafkaConfig: {
+                    hosts: 'localhost:9092',
+                },
+                qpKafkaConfig: {
+                    logName: 'test-log',
+                },
+            },
+            logName: 'kafka-log',
+        }, {
+            name: 'BucketFileLogReader',
+            Reader: BucketFileLogReader,
+            config: {
+                dmdConfig: {
+                    logName: 'test-log',
+                    host: 'localhost',
+                    port: 8000,
+                },
+            },
+            logName: 'bucket-file',
+        }, {
+            name: 'RaftLogReader',
+            Reader: RaftLogReader,
+            config: {
+                raftId: 'test-log',
+                bucketdConfig: {
+                    host: 'localhost',
+                    port: 8000,
+                },
+            },
+            logName: 'raft-log',
+        }, {
+            name: 'MongoLogReader',
+            Reader: MongoLogReader,
+            config: {
+                mongoConfig: {
+                    logName: 'test-log',
+                    host: 'localhost',
+                    port: 8000,
+                },
+            },
+            logName: 'mongo-log',
+        }].forEach(params => {
+            it(`should return proper ${params.name} metrics labels`, () => {
+                const reader = new params.Reader({
+                    ...params.config,
+                    logger: new Logger('test:LogReader'),
+                    extensionNames: 'replication,lifecycle,notification',
+                });
+                const expectedLabels = {
+                    logId: 'test-log',
+                    logName: params.logName,
+                    origin: 'replication,lifecycle,notification',
+                };
+                assert.deepStrictEqual(reader.getMetricLabels(), expectedLabels);
+            });
+        });
+    });
 });
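
Note for reviewers: dropping `defaultLabels` means `origin` is no longer pinned to `replication`; each log reader now supplies it through `getMetricLabels()`, built from the comma-joined names of the loaded extensions. Below is a minimal sketch of the label-merging behavior this patch relies on, using prom-client directly — the `wrapCounterInc` here is a hypothetical reimplementation for illustration only; the real helper lives in backbeat's metrics utilities, and arsenal's `ZenkoMetrics` wraps the same prom-client API:

```js
const { Counter, register } = require('prom-client');

// Same name/help/labels as the renamed s3_backbeat_populator_messages_total.
const messageMetrics = new Counter({
    name: 's3_backbeat_populator_messages_total',
    help: 'Total number of Kafka messages produced by the queue populator',
    labelNames: ['origin', 'logName', 'logId', 'publishStatus'],
});

// Hypothetical stand-in for backbeat's wrapCounterInc: merges per-call
// labels over the defaults. With the defaults now set to {}, every label
// (including origin) must come from the caller, i.e. getMetricLabels().
function wrapCounterInc(metric, defaultLabels) {
    return (labels, value = 1) =>
        metric.inc({ ...defaultLabels, ...labels }, value);
}

const incMessages = wrapCounterInc(messageMetrics, {});

// A populator shared by three extensions reports all of them in `origin`:
incMessages({
    origin: 'replication,lifecycle,notification',
    logName: 'mongo-log',
    logId: 'test-log',
    publishStatus: 'success',
});

register.metrics().then(output => console.log(output));
```

The comma-joined `origin` keeps series cardinality low (one series per populator deployment rather than one per extension), at the cost of requiring regex matchers such as `origin=~".*lifecycle.*"` when a dashboard needs to filter on a single workflow.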