本文整理汇总了Java中org.apache.kafka.common.metrics.stats.Rate类的典型用法代码示例。如果您正苦于以下问题:Java Rate类的具体用法?Java Rate怎么用?Java Rate使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。
Rate类属于org.apache.kafka.common.metrics.stats包,在下文中一共展示了Rate类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: BufferPool
import org.apache.kafka.common.metrics.stats.Rate; //导入依赖的package包/类
/**
 * Builds a buffer pool and registers its wait-ratio metric on the wait-time sensor.
 *
 * @param memory        the maximum amount of memory this pool may allocate; also the initially available amount
 * @param poolableSize  the buffer size that is cached in the free list instead of being deallocated
 * @param metrics       the metrics registry used to create the wait-time sensor and its metric name
 * @param time          the time instance stored for later use by the pool
 * @param metricGrpName the logical group name under which the metric is registered
 */
public BufferPool(long memory, int poolableSize, Metrics metrics, Time time, String metricGrpName) {
    // Collaborators handed in by the caller.
    this.metrics = metrics;
    this.time = time;

    // Memory accounting: the whole budget is available at construction time.
    this.totalMemory = memory;
    this.availableMemory = memory;
    this.poolableSize = poolableSize;

    // Lock plus the queues for free buffers and waiting allocators.
    this.lock = new ReentrantLock();
    this.free = new ArrayDeque<>();
    this.waiters = new ArrayDeque<>();

    // Attach a nanosecond-based Rate to the wait-time sensor.
    this.waitTime = metrics.sensor(WAIT_TIME_SENSOR_NAME);
    this.waitTime.add(
        metrics.metricName(
            "bufferpool-wait-ratio",
            metricGrpName,
            "The fraction of time an appender waits for space allocation."),
        new Rate(TimeUnit.NANOSECONDS));
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:25,代码来源:BufferPool.java
示例2: FetchManagerMetrics
import org.apache.kafka.common.metrics.stats.Rate; //导入依赖的package包/类
/**
 * Creates the fetch-related sensors and attaches the statistics described by the registry templates.
 *
 * @param metrics         the metrics registry that owns the sensors
 * @param metricsRegistry the templates from which the metric instances are created
 */
private FetchManagerMetrics(Metrics metrics, FetcherMetricsRegistry metricsRegistry) {
    this.metrics = metrics;
    this.metricsRegistry = metricsRegistry;

    // Bytes fetched: average, maximum and rate.
    bytesFetched = metrics.sensor("bytes-fetched");
    bytesFetched.add(metrics.metricInstance(metricsRegistry.fetchSizeAvg), new Avg());
    bytesFetched.add(metrics.metricInstance(metricsRegistry.fetchSizeMax), new Max());
    bytesFetched.add(metrics.metricInstance(metricsRegistry.bytesConsumedRate), new Rate());

    // Records fetched: average per request and rate.
    recordsFetched = metrics.sensor("records-fetched");
    recordsFetched.add(metrics.metricInstance(metricsRegistry.recordsPerRequestAvg), new Avg());
    recordsFetched.add(metrics.metricInstance(metricsRegistry.recordsConsumedRate), new Rate());

    // Fetch latency: average, maximum, and a count-based request rate.
    fetchLatency = metrics.sensor("fetch-latency");
    fetchLatency.add(metrics.metricInstance(metricsRegistry.fetchLatencyAvg), new Avg());
    fetchLatency.add(metrics.metricInstance(metricsRegistry.fetchLatencyMax), new Max());
    fetchLatency.add(metrics.metricInstance(metricsRegistry.fetchRequestRate), new Rate(new Count()));

    // Record lag: maximum only.
    recordsFetchLag = metrics.sensor("records-lag");
    recordsFetchLag.add(metrics.metricInstance(metricsRegistry.recordsLagMax), new Max());
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:22,代码来源:Fetcher.java
示例3: ConsumerCoordinatorMetrics
import org.apache.kafka.common.metrics.stats.Rate; //导入依赖的package包/类
/**
 * Registers the commit-latency sensor and the assigned-partitions gauge.
 *
 * @param metrics         the metrics registry to register with
 * @param metricGrpPrefix prefix of the metric group; "-coordinator-metrics" is appended to it
 */
private ConsumerCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) {
    metricGrpName = metricGrpPrefix + "-coordinator-metrics";

    // Commit latency: average, maximum, and a count-based rate of commit calls.
    commitLatency = metrics.sensor("commit-latency");
    commitLatency.add(
        metrics.metricName("commit-latency-avg", metricGrpName, "The average time taken for a commit request"),
        new Avg());
    commitLatency.add(
        metrics.metricName("commit-latency-max", metricGrpName, "The max time taken for a commit request"),
        new Max());
    commitLatency.add(
        metrics.metricName("commit-rate", metricGrpName, "The number of commit calls per second"),
        new Rate(new Count()));

    // Gauge that reads the current assignment size each time it is measured.
    Measurable assignedPartitionCount = new Measurable() {
        public double measure(MetricConfig config, long now) {
            return subscriptions.assignedPartitions().size();
        }
    };
    metrics.addMetric(
        metrics.metricName(
            "assigned-partitions",
            metricGrpName,
            "The number of partitions currently assigned to this consumer"),
        assignedPartitionCount);
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:25,代码来源:ConsumerCoordinator.java
示例4: BufferPool
import org.apache.kafka.common.metrics.stats.Rate; //导入依赖的package包/类
/**
 * Builds a buffer pool and registers its wait-ratio metric on the "bufferpool-wait-time" sensor.
 *
 * @param memory        the maximum amount of memory this pool may allocate; also the initially available amount
 * @param poolableSize  the buffer size that is cached in the free list instead of being deallocated
 * @param metrics       the metrics registry used to create the sensor and its metric name
 * @param time          the time instance stored for later use by the pool
 * @param metricGrpName the logical group name under which the metric is registered
 */
public BufferPool(long memory, int poolableSize, Metrics metrics, Time time, String metricGrpName) {
    // Collaborators handed in by the caller.
    this.metrics = metrics;
    this.time = time;

    // Memory accounting: the whole budget is available at construction time.
    this.totalMemory = memory;
    this.availableMemory = memory;
    this.poolableSize = poolableSize;

    // Lock plus the queues for free buffers and waiting allocators.
    this.lock = new ReentrantLock();
    this.free = new ArrayDeque<ByteBuffer>();
    this.waiters = new ArrayDeque<Condition>();

    // Attach a nanosecond-based Rate to the wait-time sensor.
    this.waitTime = metrics.sensor("bufferpool-wait-time");
    this.waitTime.add(
        metrics.metricName(
            "bufferpool-wait-ratio",
            metricGrpName,
            "The fraction of time an appender waits for space allocation."),
        new Rate(TimeUnit.NANOSECONDS));
}
开发者ID:txazo,项目名称:kafka,代码行数:25,代码来源:BufferPool.java
示例5: ConsumerCoordinatorMetrics
import org.apache.kafka.common.metrics.stats.Rate; //导入依赖的package包/类
/**
 * Stores the registry, then registers the commit-latency sensor and the assigned-partitions gauge.
 *
 * @param metrics         the metrics registry to keep and register with
 * @param metricGrpPrefix prefix of the metric group; "-coordinator-metrics" is appended to it
 */
public ConsumerCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) {
    this.metrics = metrics;
    metricGrpName = metricGrpPrefix + "-coordinator-metrics";

    // Commit latency: average, maximum, and a count-based rate of commit calls.
    commitLatency = metrics.sensor("commit-latency");
    commitLatency.add(
        metrics.metricName("commit-latency-avg", metricGrpName, "The average time taken for a commit request"),
        new Avg());
    commitLatency.add(
        metrics.metricName("commit-latency-max", metricGrpName, "The max time taken for a commit request"),
        new Max());
    commitLatency.add(
        metrics.metricName("commit-rate", metricGrpName, "The number of commit calls per second"),
        new Rate(new Count()));

    // Gauge that reads the current assignment size each time it is measured.
    Measurable partitionGauge = new Measurable() {
        public double measure(MetricConfig config, long now) {
            return subscriptions.assignedPartitions().size();
        }
    };
    metrics.addMetric(
        metrics.metricName(
            "assigned-partitions",
            metricGrpName,
            "The number of partitions currently assigned to this consumer"),
        partitionGauge);
}
开发者ID:txazo,项目名称:kafka,代码行数:26,代码来源:ConsumerCoordinator.java
示例6: StreamsMetricsThreadImpl
import org.apache.kafka.common.metrics.stats.Rate; //导入依赖的package包/类
StreamsMetricsThreadImpl(final Metrics metrics, final String groupName, final String prefix, final Map<String, String> tags) {
super(metrics, groupName, tags);
commitTimeSensor = metrics.sensor(prefix + ".commit-latency", Sensor.RecordingLevel.INFO);
commitTimeSensor.add(metrics.metricName("commit-latency-avg", this.groupName, "The average commit time in ms", this.tags), new Avg());
commitTimeSensor.add(metrics.metricName("commit-latency-max", this.groupName, "The maximum commit time in ms", this.tags), new Max());
commitTimeSensor.add(metrics.metricName("commit-rate", this.groupName, "The average per-second number of commit calls", this.tags), new Rate(new Count()));
pollTimeSensor = metrics.sensor(prefix + ".poll-latency", Sensor.RecordingLevel.INFO);
pollTimeSensor.add(metrics.metricName("poll-latency-avg", this.groupName, "The average poll time in ms", this.tags), new Avg());
pollTimeSensor.add(metrics.metricName("poll-latency-max", this.groupName, "The maximum poll time in ms", this.tags), new Max());
pollTimeSensor.add(metrics.metricName("poll-rate", this.groupName, "The average per-second number of record-poll calls", this.tags), new Rate(new Count()));
processTimeSensor = metrics.sensor(prefix + ".process-latency", Sensor.RecordingLevel.INFO);
processTimeSensor.add(metrics.metricName("process-latency-avg", this.groupName, "The average process time in ms", this.tags), new Avg());
processTimeSensor.add(metrics.metricName("process-latency-max", this.groupName, "The maximum process time in ms", this.tags), new Max());
processTimeSensor.add(metrics.metricName("process-rate", this.groupName, "The average per-second number of process calls", this.tags), new Rate(new Count()));
punctuateTimeSensor = metrics.sensor(prefix + ".punctuate-latency", Sensor.RecordingLevel.INFO);
punctuateTimeSensor.add(metrics.metricName("punctuate-latency-avg", this.groupName, "The average punctuate time in ms", this.tags), new Avg());
punctuateTimeSensor.add(metrics.metricName("punctuate-latency-max", this.groupName, "The maximum punctuate time in ms", this.tags), new Max());
punctuateTimeSensor.add(metrics.metricName("punctuate-rate", this.groupName, "The average per-second number of punctuate calls", this.tags), new Rate(new Count()));
taskCreatedSensor = metrics.sensor(prefix + ".task-created", Sensor.RecordingLevel.INFO);
taskCreatedSensor.add(metrics.metricName("task-created-rate", this.groupName, "The average per-second number of newly created tasks", this.tags), new Rate(new Count()));
tasksClosedSensor = metrics.sensor(prefix + ".task-closed", Sensor.RecordingLevel.INFO);
tasksClosedSensor.add(metrics.metricName("task-closed-rate", this.groupName, "The average per-second number of closed tasks", this.tags), new Rate(new Count()));
skippedRecordsSensor = metrics.sensor(prefix + ".skipped-records");
skippedRecordsSensor.add(metrics.metricName("skipped-records-rate", this.groupName, "The average per-second number of skipped records.", this.tags), new Rate(new Sum()));
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:33,代码来源:StreamThread.java
示例7: maybeRegisterTopicMetrics
import org.apache.kafka.common.metrics.stats.Rate; //导入依赖的package包/类
/**
 * Registers the per-topic producer sensors unless they already exist.
 *
 * The records-per-batch sensor is used as the marker: if it is present, all other sensors for the
 * topic are expected to be present as well, and vice versa.
 *
 * @param topic the topic whose sensors should exist after this call
 */
private void maybeRegisterTopicMetrics(String topic) {
    final String sensorPrefix = "topic." + topic;
    final String recordsSensorName = sensorPrefix + ".records-per-batch";
    if (this.metrics.getSensor(recordsSensorName) != null) {
        return;
    }

    final Map<String, String> tags = Collections.singletonMap("topic", topic);
    final String group = "producer-topic-metrics";

    Sensor recordsSensor = this.metrics.sensor(recordsSensorName);
    recordsSensor.add(this.metrics.metricName("record-send-rate", group, tags), new Rate());

    Sensor bytesSensor = this.metrics.sensor(sensorPrefix + ".bytes");
    bytesSensor.add(this.metrics.metricName("byte-rate", group, tags), new Rate());

    // Compression is tracked as an average, not a rate, despite the metric name.
    Sensor compressionSensor = this.metrics.sensor(sensorPrefix + ".compression-rate");
    compressionSensor.add(this.metrics.metricName("compression-rate", group, tags), new Avg());

    Sensor retrySensor = this.metrics.sensor(sensorPrefix + ".record-retries");
    retrySensor.add(this.metrics.metricName("record-retry-rate", group, tags), new Rate());

    Sensor errorSensor = this.metrics.sensor(sensorPrefix + ".record-errors");
    errorSensor.add(this.metrics.metricName("record-error-rate", group, tags), new Rate());
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:35,代码来源:Sender.java
示例8: maybeRegisterConnectionMetrics
import org.apache.kafka.common.metrics.stats.Rate; //导入依赖的package包/类
/**
 * Registers the per-connection sensors (bytes sent, bytes received, latency) unless they already exist.
 *
 * Nothing is registered when the connection id is empty or per-connection metrics are disabled.
 * The bytes-sent sensor is used as the marker: if it is present, all other sensors for the
 * connection are expected to be present as well, and vice versa.
 *
 * @param connectionId the connection whose sensors should exist after this call
 */
public void maybeRegisterConnectionMetrics(String connectionId) {
    if (connectionId.isEmpty() || !metricsPerConnection) {
        return;
    }

    final String nodeId = "node-" + connectionId;
    final String requestSensorName = nodeId + ".bytes-sent";
    if (this.metrics.getSensor(requestSensorName) != null) {
        return;
    }

    final String group = metricGrpPrefix + "-node-metrics";
    final Map<String, String> tags = new LinkedHashMap<>(metricTags);
    tags.put("node-id", nodeId);

    // Outgoing traffic: byte rate, request rate, and request size average/maximum.
    Sensor requestSensor = sensor(requestSensorName);
    requestSensor.add(metrics.metricName("outgoing-byte-rate", group, tags), new Rate());
    requestSensor.add(
        metrics.metricName("request-rate", group, "The average number of requests sent per second.", tags),
        new Rate(new Count()));
    requestSensor.add(
        metrics.metricName("request-size-avg", group, "The average size of all requests in the window..", tags),
        new Avg());
    requestSensor.add(
        metrics.metricName("request-size-max", group, "The maximum size of any request sent in the window.", tags),
        new Max());

    // Incoming traffic: byte rate and response rate.
    Sensor responseSensor = sensor(nodeId + ".bytes-received");
    responseSensor.add(metrics.metricName("incoming-byte-rate", group, tags), new Rate());
    responseSensor.add(
        metrics.metricName("response-rate", group, "The average number of responses received per second.", tags),
        new Rate(new Count()));

    // Request latency: average and maximum.
    Sensor latencySensor = sensor(nodeId + ".latency");
    latencySensor.add(metrics.metricName("request-latency-avg", group, tags), new Avg());
    latencySensor.add(metrics.metricName("request-latency-max", group, tags), new Max());
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:39,代码来源:Selector.java
示例9: testCleanupMemoryAvailabilityOnMetricsException
import org.apache.kafka.common.metrics.stats.Rate; //导入依赖的package包/类
/**
 * Checks that the pool's accounting is restored when recording to the wait-time sensor throws.
 *
 * The mocked sensor throws OutOfMemoryError from record(); after the failed allocation the test
 * expects one byte to be available and no queued waiters, and a further allocation to succeed.
 */
@PrepareForTest({Sensor.class, MetricName.class})
@Test
public void testCleanupMemoryAvailabilityOnMetricsException() throws Exception {
Metrics mockedMetrics = createNiceMock(Metrics.class);
Sensor mockedSensor = createNiceMock(Sensor.class);
MetricName metricName = createNiceMock(MetricName.class);
// Record phase: the pool's constructor is expected to ask for the wait-time sensor.
expect(mockedMetrics.sensor(BufferPool.WAIT_TIME_SENSOR_NAME)).andReturn(mockedSensor);
// Any record() call on the sensor fails with OutOfMemoryError; expectLastCall() refers to the call just above.
mockedSensor.record(anyDouble(), anyLong());
expectLastCall().andThrow(new OutOfMemoryError());
expect(mockedMetrics.metricName(anyString(), eq(metricGroup), anyString())).andReturn(metricName);
mockedSensor.add(metricName, new Rate(TimeUnit.NANOSECONDS));
replay(mockedMetrics, mockedSensor, metricName);
// Pool with a total of 2 and a poolable size of 1.
BufferPool bufferPool = new BufferPool(2, 1, mockedMetrics, time, metricGroup);
bufferPool.allocate(1, 0);
try {
// Requests more than is currently available; presumably the second argument is the max wait in ms - TODO confirm.
bufferPool.allocate(2, 1000);
assertTrue("Expected oom.", false);
} catch (OutOfMemoryError expected) {
// Intentionally ignored: the error thrown by the mocked sensor is the expected outcome.
}
assertEquals(1, bufferPool.availableMemory());
assertEquals(0, bufferPool.queued());
//This shouldn't timeout
bufferPool.allocate(1, 0);
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:29,代码来源:BufferPoolTest.java
示例10: testSampledStatInitialValue
import org.apache.kafka.common.metrics.stats.Rate; //导入依赖的package包/类
/**
 * Infers each SampledStat's initial value by letting all of its samples expire.
 *
 * Expired samples are reset to the stat's initial value, and most combine() implementations then
 * return that default, so measuring after expiry exposes it. This does not work for Percentiles.
 * The flow mirrors testOldDataHasNoEffect because the same sequence leads to this state.
 */
@Test
public void testSampledStatInitialValue() {
    final long windowLengthMs = 100;
    final int sampleCount = 2;
    MetricConfig config = new MetricConfig().timeWindow(windowLengthMs, TimeUnit.MILLISECONDS).samples(sampleCount);

    Max max = new Max();
    Min min = new Min();
    Avg avg = new Avg();
    Count count = new Count();
    Rate.SampledTotal sampledTotal = new Rate.SampledTotal();

    // Record one value into every stat.
    max.record(config, 50, time.milliseconds());
    min.record(config, 50, time.milliseconds());
    avg.record(config, 50, time.milliseconds());
    count.record(config, 50, time.milliseconds());
    sampledTotal.record(config, 50, time.milliseconds());

    // Advance past every window so that all samples have expired.
    time.sleep(sampleCount * windowLengthMs);

    assertEquals(Double.NEGATIVE_INFINITY, max.measure(config, time.milliseconds()), EPS);
    assertEquals(Double.MAX_VALUE, min.measure(config, time.milliseconds()), EPS);
    assertEquals(0.0, avg.measure(config, time.milliseconds()), EPS);
    assertEquals(0, count.measure(config, time.milliseconds()), EPS);
    assertEquals(0.0, sampledTotal.measure(config, time.milliseconds()), EPS);
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:32,代码来源:MetricsTest.java
示例11: testRateWindowing
import org.apache.kafka.common.metrics.stats.Rate; //导入依赖的package包/类
/**
 * Checks the rate and the measured window size after recording across several sample windows.
 *
 * One value of 100 is recorded per window for (samples - 1) windows, then half a window passes.
 * The expected rate is the recorded sum divided by the elapsed seconds, and the Rate's window
 * size is expected to equal that elapsed time.
 *
 * @throws Exception declared for the test signature; nothing in this method throws a checked exception itself
 */
@Test
public void testRateWindowing() throws Exception {
    // Use the default time window. Set 3 samples
    MetricConfig cfg = new MetricConfig().samples(3);
    Sensor s = metrics.sensor("test.sensor", cfg);
    s.add(metrics.metricName("test.rate", "grp1"), new Rate(TimeUnit.SECONDS));

    int sum = 0;
    int count = cfg.samples() - 1;
    // Advance 1 window after every record
    for (int i = 0; i < count; i++) {
        s.record(100);
        sum += 100;
        time.sleep(cfg.timeWindowMs());
    }

    // Sleep for half the window.
    time.sleep(cfg.timeWindowMs() / 2);

    // Expected elapsed time: the full windows slept in the loop plus the half window above.
    // NOTE(review): the literal figures in the assertion messages assume a 30 s default window - confirm.
    double elapsedSecs = (cfg.timeWindowMs() * (cfg.samples() - 1) + cfg.timeWindowMs() / 2) / 1000.0;

    KafkaMetric km = metrics.metrics().get(metrics.metricName("test.rate", "grp1"));
    assertEquals("Rate(0...2) = 2.666", sum / elapsedSecs, km.value(), EPS);
    // Divide by 1000.0 so that a millisecond window size is not truncated to whole seconds
    // before being compared with the fractional expected value.
    assertEquals("Elapsed Time = 75 seconds", elapsedSecs,
        ((Rate) km.measurable()).windowSize(cfg, time.milliseconds()) / 1000.0, EPS);
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:28,代码来源:MetricsTest.java
示例12: maybeRegisterTopicMetrics
import org.apache.kafka.common.metrics.stats.Rate; //导入依赖的package包/类
/**
 * Registers the per-topic producer sensors unless they already exist.
 *
 * The records-per-batch sensor is used as the marker: if it is present, all other sensors for the
 * topic are expected to be present as well, and vice versa.
 *
 * @param topic the topic whose sensors should exist after this call
 */
public void maybeRegisterTopicMetrics(String topic) {
    final String recordsSensorName = "topic." + topic + ".records-per-batch";
    Sensor recordsSensor = this.metrics.getSensor(recordsSensorName);
    if (recordsSensor == null) {
        final String group = "producer-topic-metrics";
        Map<String, String> tags = new LinkedHashMap<String, String>();
        tags.put("topic", topic);

        recordsSensor = this.metrics.sensor(recordsSensorName);
        MetricName recordSendRate = this.metrics.metricName("record-send-rate", group, tags);
        recordsSensor.add(recordSendRate, new Rate());

        Sensor bytesSensor = this.metrics.sensor("topic." + topic + ".bytes");
        MetricName byteRate = this.metrics.metricName("byte-rate", group, tags);
        bytesSensor.add(byteRate, new Rate());

        // Compression is tracked as an average, not a rate, despite the metric name.
        Sensor compressionSensor = this.metrics.sensor("topic." + topic + ".compression-rate");
        MetricName compressionRate = this.metrics.metricName("compression-rate", group, tags);
        compressionSensor.add(compressionRate, new Avg());

        Sensor retrySensor = this.metrics.sensor("topic." + topic + ".record-retries");
        MetricName retryRate = this.metrics.metricName("record-retry-rate", group, tags);
        retrySensor.add(retryRate, new Rate());

        Sensor errorSensor = this.metrics.sensor("topic." + topic + ".record-errors");
        MetricName errorRate = this.metrics.metricName("record-error-rate", group, tags);
        errorSensor.add(errorRate, new Rate());
    }
}
开发者ID:txazo,项目名称:kafka,代码行数:36,代码来源:Sender.java
示例13: recordTopicFetchMetrics
import org.apache.kafka.common.metrics.stats.Rate; //导入依赖的package包/类
/**
 * Records the bytes and records fetched for a topic, creating the per-topic sensors on first use.
 *
 * The "topic" tag value has dots replaced by underscores; sensor names and metric descriptions
 * use the topic name unchanged.
 *
 * @param topic   the topic the fetch belongs to
 * @param bytes   the number of bytes to record
 * @param records the number of records to record
 */
public void recordTopicFetchMetrics(String topic, int bytes, int records) {
    final Map<String, String> tags = new HashMap<>();
    tags.put("topic", topic.replace(".", "_"));
    final String sensorPrefix = "topic." + topic;

    // Bytes fetched: average, maximum and rate.
    final String bytesSensorName = sensorPrefix + ".bytes-fetched";
    Sensor bytesSensor = this.metrics.getSensor(bytesSensorName);
    if (bytesSensor == null) {
        bytesSensor = this.metrics.sensor(bytesSensorName);
        bytesSensor.add(
            this.metrics.metricName(
                "fetch-size-avg",
                this.metricGrpName,
                "The average number of bytes fetched per request for topic " + topic,
                tags),
            new Avg());
        bytesSensor.add(
            this.metrics.metricName(
                "fetch-size-max",
                this.metricGrpName,
                "The maximum number of bytes fetched per request for topic " + topic,
                tags),
            new Max());
        bytesSensor.add(
            this.metrics.metricName(
                "bytes-consumed-rate",
                this.metricGrpName,
                "The average number of bytes consumed per second for topic " + topic,
                tags),
            new Rate());
    }
    bytesSensor.record(bytes);

    // Records fetched: average per request and rate.
    final String recordsSensorName = sensorPrefix + ".records-fetched";
    Sensor recordsSensor = this.metrics.getSensor(recordsSensorName);
    if (recordsSensor == null) {
        recordsSensor = this.metrics.sensor(recordsSensorName);
        recordsSensor.add(
            this.metrics.metricName(
                "records-per-request-avg",
                this.metricGrpName,
                "The average number of records in each request for topic " + topic,
                tags),
            new Avg());
        recordsSensor.add(
            this.metrics.metricName(
                "records-consumed-rate",
                this.metricGrpName,
                "The average number of records consumed per second for topic " + topic,
                tags),
            new Rate());
    }
    recordsSensor.record(records);
}
开发者ID:txazo,项目名称:kafka,代码行数:41,代码来源:Fetcher.java
示例14: addPartitionSensors
import org.apache.kafka.common.metrics.stats.Rate; //导入依赖的package包/类
/**
 * Creates the records-produced and produce-error sensors for one partition and stores them in
 * the per-partition maps.
 *
 * @param partition the partition number, used in both sensor and metric names
 */
void addPartitionSensors(int partition) {
    // Produced records rate.
    Sensor producedSensor = metrics.sensor("records-produced-partition-" + partition);
    MetricName producedRate = new MetricName(
        "records-produced-rate-partition-" + partition,
        METRIC_GROUP_NAME,
        "The average number of records per second that are produced to this partition",
        _tags);
    producedSensor.add(producedRate, new Rate());
    _recordsProducedPerPartition.put(partition, producedSensor);

    // Produce error rate.
    Sensor errorSensor = metrics.sensor("produce-error-partition-" + partition);
    MetricName errorRate = new MetricName(
        "produce-error-rate-partition-" + partition,
        METRIC_GROUP_NAME,
        "The average number of errors per second when producing to this partition",
        _tags);
    errorSensor.add(errorRate, new Rate());
    _produceErrorPerPartition.put(partition, errorSensor);
}
开发者ID:linkedin,项目名称:kafka-monitor,代码行数:12,代码来源:ProduceService.java
示例15: buildSensors
import org.apache.kafka.common.metrics.stats.Rate; //导入依赖的package包/类
/**
 * Builds the producer-side sensors for the given key.
 *
 * Registration runs while holding the monitor of the metrics registry because the registry does
 * not handle concurrent add/check-exists activity reliably.
 *
 * @param key the key passed through to addSensor for every sensor
 * @return the list that addSensor populated
 */
private List<TopicSensors.SensorMetric<ProducerRecord>> buildSensors(String key) {
    final List<TopicSensors.SensorMetric<ProducerRecord>> result = new ArrayList<>();
    synchronized (metrics) {
        // Final flag: presumably marks the sensor as an error sensor - TODO confirm against addSensor.
        addSensor(key, "messages-per-sec", new Rate(), result, false);
        addSensor(key, "total-messages", new Total(), result, false);
        addSensor(key, "failed-messages", new Total(), result, true);
        addSensor(key, "failed-messages-per-sec", new Rate(), result, true);
    }
    return result;
}
开发者ID:confluentinc,项目名称:ksql,代码行数:13,代码来源:ProducerCollector.java
示例16: buildSensors
import org.apache.kafka.common.metrics.stats.Rate; //导入依赖的package包/类
/**
 * Builds the consumer-side sensors for the given key.
 *
 * Registration runs while holding the monitor of the metrics registry because the registry does
 * not handle concurrent add/check-exists activity reliably.
 *
 * @param key the key passed through to addSensor for every sensor
 * @return the list that addSensor populated
 */
private List<TopicSensors.SensorMetric<ConsumerRecord>> buildSensors(String key) {
    final List<TopicSensors.SensorMetric<ConsumerRecord>> result = new ArrayList<>();
    synchronized (this.metrics) {
        // Final flag: presumably marks the sensor as an error sensor - TODO confirm against addSensor.
        addSensor(key, "messages-per-sec", new Rate(), result, false);
        addSensor(key, "c-total-messages", new Total(), result, false);
        addSensor(key, "c-failed-messages", new Total(), result, true);
        addSensor(key, "failed-messages-per-sec", new Rate(), result, true);
    }
    return result;
}
开发者ID:confluentinc,项目名称:ksql,代码行数:14,代码来源:ConsumerCollector.java
示例17: addThroughputMetrics
import org.apache.kafka.common.metrics.stats.Rate; //导入依赖的package包/类
/**
 * Adds a count-based rate metric named "&lt;entity&gt;-&lt;operation&gt;-rate" to the sensor via maybeAddMetric.
 *
 * @param scopeName  scope from which the metric group name is derived
 * @param sensor     the sensor that receives the metric
 * @param entityName entity part of the metric name and description
 * @param opName     operation part of the metric name and description
 * @param tags       tags attached to the metric name
 */
private void addThroughputMetrics(String scopeName, Sensor sensor, String entityName, String opName, Map<String, String> tags) {
    final String rateName = entityName + "-" + opName + "-rate";
    final String group = groupNameFromScope(scopeName);
    final String description =
        "The average number of occurrence of " + entityName + " " + opName + " operation per second.";
    maybeAddMetric(sensor, metrics.metricName(rateName, group, description, tags), new Rate(new Count()));
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:5,代码来源:StreamsMetricsImpl.java
示例18: GroupCoordinatorMetrics
import org.apache.kafka.common.metrics.stats.Rate; //导入依赖的package包/类
/**
 * Registers the heartbeat, join and sync latency sensors plus the last-heartbeat gauge.
 *
 * @param metrics         the metrics registry to register with
 * @param metricGrpPrefix prefix of the metric group; "-coordinator-metrics" is appended to it
 */
public GroupCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) {
    metricGrpName = metricGrpPrefix + "-coordinator-metrics";

    // Heartbeat: maximum response time and a count-based rate.
    heartbeatLatency = metrics.sensor("heartbeat-latency");
    heartbeatLatency.add(
        metrics.metricName(
            "heartbeat-response-time-max",
            metricGrpName,
            "The max time taken to receive a response to a heartbeat request"),
        new Max());
    heartbeatLatency.add(
        metrics.metricName("heartbeat-rate", metricGrpName, "The average number of heartbeats per second"),
        new Rate(new Count()));

    // Group join: average, maximum and a count-based rate.
    joinLatency = metrics.sensor("join-latency");
    joinLatency.add(
        metrics.metricName("join-time-avg", metricGrpName, "The average time taken for a group rejoin"),
        new Avg());
    joinLatency.add(
        metrics.metricName("join-time-max", metricGrpName, "The max time taken for a group rejoin"),
        new Max());
    joinLatency.add(
        metrics.metricName("join-rate", metricGrpName, "The number of group joins per second"),
        new Rate(new Count()));

    // Group sync: average, maximum and a count-based rate.
    syncLatency = metrics.sensor("sync-latency");
    syncLatency.add(
        metrics.metricName("sync-time-avg", metricGrpName, "The average time taken for a group sync"),
        new Avg());
    syncLatency.add(
        metrics.metricName("sync-time-max", metricGrpName, "The max time taken for a group sync"),
        new Max());
    syncLatency.add(
        metrics.metricName("sync-rate", metricGrpName, "The number of group syncs per second"),
        new Rate(new Count()));

    // Gauge: whole seconds between the measurement time and the last heartbeat send.
    Measurable secondsSinceHeartbeat = new Measurable() {
        public double measure(MetricConfig config, long now) {
            return TimeUnit.MILLISECONDS.toSeconds(now - heartbeat.lastHeartbeatSend());
        }
    };
    metrics.addMetric(
        metrics.metricName(
            "last-heartbeat-seconds-ago",
            metricGrpName,
            "The number of seconds since the last controller heartbeat was sent"),
        secondsSinceHeartbeat);
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:45,代码来源:AbstractCoordinator.java
示例19: SelectorMetrics
import org.apache.kafka.common.metrics.stats.Rate; //导入依赖的package包/类
/**
 * Stores the configuration and registers the selector-wide sensors and the connection-count gauge.
 *
 * Every sensor name is suffixed with the concatenated "key-value" pairs of the metric tags.
 *
 * @param metrics              the metrics registry to register with
 * @param metricGrpPrefix      prefix of the metric group; "-metrics" is appended to it
 * @param metricTags           tags attached to every metric and folded into the sensor names
 * @param metricsPerConnection flag stored for later use by the per-connection registration
 */
public SelectorMetrics(Metrics metrics, String metricGrpPrefix, Map<String, String> metricTags, boolean metricsPerConnection) {
    this.metrics = metrics;
    this.metricGrpPrefix = metricGrpPrefix;
    this.metricTags = metricTags;
    this.metricsPerConnection = metricsPerConnection;

    final String group = metricGrpPrefix + "-metrics";

    // Build the sensor-name suffix once: each tag contributes "<key>-<value>" with no separator between tags.
    StringBuilder suffixBuilder = new StringBuilder();
    for (Map.Entry<String, String> tag : metricTags.entrySet()) {
        suffixBuilder.append(tag.getKey()).append("-").append(tag.getValue());
    }
    final String suffix = suffixBuilder.toString();

    // Connection closes: rate of connections closed per second.
    this.connectionClosed = sensor("connections-closed:" + suffix);
    this.connectionClosed.add(
        metrics.metricName("connection-close-rate", group, "Connections closed per second in the window.", metricTags),
        new Rate());

    // Connection creations: rate of connections created per second.
    this.connectionCreated = sensor("connections-created:" + suffix);
    this.connectionCreated.add(
        metrics.metricName("connection-creation-rate", group, "New connections established per second in the window.", metricTags),
        new Rate());

    // Network operations: count-based rate of reads and writes across all connections.
    this.bytesTransferred = sensor("bytes-sent-received:" + suffix);
    this.bytesTransferred.add(
        metrics.metricName("network-io-rate", group, "The average number of network operations (reads or writes) on all connections per second.", metricTags),
        new Rate(new Count()));

    // Sent requests; bytesTransferred is passed to sensor(), presumably as the parent sensor - TODO confirm.
    this.bytesSent = sensor("bytes-sent:" + suffix, bytesTransferred);
    this.bytesSent.add(
        metrics.metricName("outgoing-byte-rate", group, "The average number of outgoing bytes sent per second to all servers.", metricTags),
        new Rate());
    this.bytesSent.add(
        metrics.metricName("request-rate", group, "The average number of requests sent per second.", metricTags),
        new Rate(new Count()));
    this.bytesSent.add(
        metrics.metricName("request-size-avg", group, "The average size of all requests in the window..", metricTags),
        new Avg());
    this.bytesSent.add(
        metrics.metricName("request-size-max", group, "The maximum size of any request sent in the window.", metricTags),
        new Max());

    // Received responses; bytesTransferred is passed to sensor() here as well.
    this.bytesReceived = sensor("bytes-received:" + suffix, bytesTransferred);
    this.bytesReceived.add(
        metrics.metricName("incoming-byte-rate", group, "Bytes/second read off all sockets", metricTags),
        new Rate());
    this.bytesReceived.add(
        metrics.metricName("response-rate", group, "Responses received sent per second.", metricTags),
        new Rate(new Count()));

    // Select calls: call rate, average wait time, and the wait ratio.
    this.selectTime = sensor("select-time:" + suffix);
    this.selectTime.add(
        metrics.metricName("select-rate", group, "Number of times the I/O layer checked for new I/O to perform per second", metricTags),
        new Rate(new Count()));
    this.selectTime.add(
        metrics.metricName("io-wait-time-ns-avg", group, "The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds.", metricTags),
        new Avg());
    this.selectTime.add(
        metrics.metricName("io-wait-ratio", group, "The fraction of time the I/O thread spent waiting.", metricTags),
        new Rate(TimeUnit.NANOSECONDS));

    // I/O time: average time per select call and the I/O ratio.
    this.ioTime = sensor("io-time:" + suffix);
    this.ioTime.add(
        metrics.metricName("io-time-ns-avg", group, "The average length of time for I/O per select call in nanoseconds.", metricTags),
        new Avg());
    this.ioTime.add(
        metrics.metricName("io-ratio", group, "The fraction of time the I/O thread spent doing I/O", metricTags),
        new Rate(TimeUnit.NANOSECONDS));

    // Gauge for the number of channels; its name is also added to topLevelMetricNames.
    MetricName connectionCountName =
        metrics.metricName("connection-count", group, "The current number of active connections.", metricTags);
    topLevelMetricNames.add(connectionCountName);
    this.metrics.addMetric(connectionCountName, new Measurable() {
        public double measure(MetricConfig config, long now) {
            return channels.size();
        }
    });
}
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:71,代码来源:Selector.java
示例20: testSimpleStats
import org.apache.kafka.common.metrics.stats.Rate; //导入依赖的package包/类
@Test
public void testSimpleStats() throws Exception {
ConstantMeasurable measurable = new ConstantMeasurable();
metrics.addMetric(metrics.metricName("direct.measurable", "grp1", "The fraction of time an appender waits for space allocation."), measurable);
Sensor s = metrics.sensor("test.sensor");
s.add(metrics.metricName("test.avg", "grp1"), new Avg());
s.add(metrics.metricName("test.max", "grp1"), new Max());
s.add(metrics.metricName("test.min", "grp1"), new Min());
s.add(metrics.metricName("test.rate", "grp1"), new Rate(TimeUnit.SECONDS));
s.add(metrics.metricName("test.occurences", "grp1"), new Rate(TimeUnit.SECONDS, new Count()));
s.add(metrics.metricName("test.count", "grp1"), new Count());
s.add(new Percentiles(100, -100, 100, BucketSizing.CONSTANT,
new Percentile(metrics.metricName("test.median", "grp1"), 50.0),
new Percentile(metrics.metricName("test.perc99_9", "grp1"), 99.9)));
Sensor s2 = metrics.sensor("test.sensor2");
s2.add(metrics.metricName("s2.total", "grp1"), new Total());
s2.record(5.0);
int sum = 0;
int count = 10;
for (int i = 0; i < count; i++) {
s.record(i);
sum += i;
}
// prior to any time passing
double elapsedSecs = (config.timeWindowMs() * (config.samples() - 1)) / 1000.0;
assertEquals(String.format("Occurrences(0...%d) = %f", count, count / elapsedSecs), count / elapsedSecs,
metrics.metrics().get(metrics.metricName("test.occurences", "grp1")).value(), EPS);
// pretend 2 seconds passed...
long sleepTimeMs = 2;
time.sleep(sleepTimeMs * 1000);
elapsedSecs += sleepTimeMs;
assertEquals("s2 reflects the constant value", 5.0, metrics.metrics().get(metrics.metricName("s2.total", "grp1")).value(), EPS);
assertEquals("Avg(0...9) = 4.5", 4.5, metrics.metrics().get(metrics.metricName("test.avg", "grp1")).value(), EPS);
assertEquals("Max(0...9) = 9", count - 1, metrics.metrics().get(metrics.metricName("test.max", "grp1")).value(), EPS);
assertEquals("Min(0...9) = 0", 0.0, metrics.metrics().get(metrics.metricName(&
|
请发表评论