Java Rate Class Code Examples


This article collects typical usage examples of the Java class org.apache.kafka.common.metrics.stats.Rate. If you are wondering what the Rate class does, how to use it, or what it looks like in real code, the curated examples below should help.

The Rate class belongs to the org.apache.kafka.common.metrics.stats package. A total of 20 code examples of the Rate class are shown below, sorted by popularity by default.
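
Before diving into the examples, the following minimal sketch shows the pattern they all share: create a Sensor on a Metrics registry, attach one or more Rate stats under a MetricName, and record values as events occur. The sensor, group, and metric names here are invented for illustration, and the snippet is written against the same client metrics API the examples below use; it is a sketch, not code from any of the listed projects.

import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.Rate;

public class RateUsageSketch {
    public static void main(String[] args) {
        Metrics metrics = new Metrics();                     // registry with the default MetricConfig
        Sensor requests = metrics.sensor("demo-requests");   // hypothetical sensor name

        // Rate over the recorded values themselves, e.g. bytes per second
        MetricName byteRate = metrics.metricName("demo-byte-rate", "demo-group",
                "Average number of bytes recorded per second");
        requests.add(byteRate, new Rate());

        // Rate over the number of record() calls, e.g. requests per second
        MetricName requestRate = metrics.metricName("demo-request-rate", "demo-group",
                "Average number of record() calls per second");
        requests.add(requestRate, new Rate(new Count()));

        requests.record(1024);   // one request carrying 1024 bytes
        requests.record(2048);   // another request carrying 2048 bytes

        System.out.println(metrics.metrics().get(byteRate).value());     // approx. bytes/second
        System.out.println(metrics.metrics().get(requestRate).value());  // approx. record() calls/second

        metrics.close();
    }
}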

Example 1: BufferPool

import org.apache.kafka.common.metrics.stats.Rate; // import the required package/class
/**
 * Create a new buffer pool
 *
 * @param memory The maximum amount of memory that this buffer pool can allocate
 * @param poolableSize The buffer size to cache in the free list rather than deallocating
 * @param metrics instance of Metrics
 * @param time time instance
 * @param metricGrpName logical group name for metrics
 */
public BufferPool(long memory, int poolableSize, Metrics metrics, Time time, String metricGrpName) {
    this.poolableSize = poolableSize;
    this.lock = new ReentrantLock();
    this.free = new ArrayDeque<>();
    this.waiters = new ArrayDeque<>();
    this.totalMemory = memory;
    this.availableMemory = memory;
    this.metrics = metrics;
    this.time = time;
    this.waitTime = this.metrics.sensor(WAIT_TIME_SENSOR_NAME);
    MetricName metricName = metrics.metricName("bufferpool-wait-ratio",
                                               metricGrpName,
                                               "The fraction of time an appender waits for space allocation.");
    this.waitTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines: 25, Source: BufferPool.java
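
A note on the wait-ratio metric above: Rate divides the windowed total of recorded values by the length of the elapsed window, converted to the time unit passed to its constructor. Because BufferPool records the time an appender spends waiting in nanoseconds, and Rate(TimeUnit.NANOSECONDS) expresses the window in nanoseconds as well, the quotient is a dimensionless fraction (roughly nanoseconds spent waiting divided by nanoseconds elapsed), which is why the metric reads as a ratio rather than a throughput.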


Example 2: FetchManagerMetrics

import org.apache.kafka.common.metrics.stats.Rate; // import the required package/class
private FetchManagerMetrics(Metrics metrics, FetcherMetricsRegistry metricsRegistry) {
    this.metrics = metrics;
    this.metricsRegistry = metricsRegistry;

    this.bytesFetched = metrics.sensor("bytes-fetched");
    this.bytesFetched.add(metrics.metricInstance(metricsRegistry.fetchSizeAvg), new Avg());
    this.bytesFetched.add(metrics.metricInstance(metricsRegistry.fetchSizeMax), new Max());
    this.bytesFetched.add(metrics.metricInstance(metricsRegistry.bytesConsumedRate), new Rate());

    this.recordsFetched = metrics.sensor("records-fetched");
    this.recordsFetched.add(metrics.metricInstance(metricsRegistry.recordsPerRequestAvg), new Avg());
    this.recordsFetched.add(metrics.metricInstance(metricsRegistry.recordsConsumedRate), new Rate());

    this.fetchLatency = metrics.sensor("fetch-latency");
    this.fetchLatency.add(metrics.metricInstance(metricsRegistry.fetchLatencyAvg), new Avg());
    this.fetchLatency.add(metrics.metricInstance(metricsRegistry.fetchLatencyMax), new Max());
    this.fetchLatency.add(metrics.metricInstance(metricsRegistry.fetchRequestRate), new Rate(new Count()));

    this.recordsFetchLag = metrics.sensor("records-lag");
    this.recordsFetchLag.add(metrics.metricInstance(metricsRegistry.recordsLagMax), new Max());
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines: 22, Source: Fetcher.java


Example 3: ConsumerCoordinatorMetrics

import org.apache.kafka.common.metrics.stats.Rate; // import the required package/class
private ConsumerCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) {
    this.metricGrpName = metricGrpPrefix + "-coordinator-metrics";

    this.commitLatency = metrics.sensor("commit-latency");
    this.commitLatency.add(metrics.metricName("commit-latency-avg",
        this.metricGrpName,
        "The average time taken for a commit request"), new Avg());
    this.commitLatency.add(metrics.metricName("commit-latency-max",
        this.metricGrpName,
        "The max time taken for a commit request"), new Max());
    this.commitLatency.add(metrics.metricName("commit-rate",
        this.metricGrpName,
        "The number of commit calls per second"), new Rate(new Count()));

    Measurable numParts =
        new Measurable() {
            public double measure(MetricConfig config, long now) {
                return subscriptions.assignedPartitions().size();
            }
        };
    metrics.addMetric(metrics.metricName("assigned-partitions",
        this.metricGrpName,
        "The number of partitions currently assigned to this consumer"), numParts);
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines: 25, Source: ConsumerCoordinator.java


Example 4: BufferPool

import org.apache.kafka.common.metrics.stats.Rate; // import the required package/class
/**
 * Create a new buffer pool
 * 
 * @param memory The maximum amount of memory that this buffer pool can allocate
 * @param poolableSize The buffer size to cache in the free list rather than deallocating
 * @param metrics instance of Metrics
 * @param time time instance
 * @param metricGrpName logical group name for metrics
 */
public BufferPool(long memory, int poolableSize, Metrics metrics, Time time, String metricGrpName) {
    this.poolableSize = poolableSize;
    this.lock = new ReentrantLock();
    this.free = new ArrayDeque<ByteBuffer>();
    this.waiters = new ArrayDeque<Condition>();
    this.totalMemory = memory;
    this.availableMemory = memory;
    this.metrics = metrics;
    this.time = time;
    this.waitTime = this.metrics.sensor("bufferpool-wait-time");
    MetricName metricName = metrics.metricName("bufferpool-wait-ratio",
                                               metricGrpName,
                                               "The fraction of time an appender waits for space allocation.");
    this.waitTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
}
 
Developer: txazo, Project: kafka, Lines: 25, Source: BufferPool.java


Example 5: ConsumerCoordinatorMetrics

import org.apache.kafka.common.metrics.stats.Rate; // import the required package/class
public ConsumerCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) {
    this.metrics = metrics;
    this.metricGrpName = metricGrpPrefix + "-coordinator-metrics";

    this.commitLatency = metrics.sensor("commit-latency");
    this.commitLatency.add(metrics.metricName("commit-latency-avg",
        this.metricGrpName,
        "The average time taken for a commit request"), new Avg());
    this.commitLatency.add(metrics.metricName("commit-latency-max",
        this.metricGrpName,
        "The max time taken for a commit request"), new Max());
    this.commitLatency.add(metrics.metricName("commit-rate",
        this.metricGrpName,
        "The number of commit calls per second"), new Rate(new Count()));

    Measurable numParts =
        new Measurable() {
            public double measure(MetricConfig config, long now) {
                return subscriptions.assignedPartitions().size();
            }
        };
    metrics.addMetric(metrics.metricName("assigned-partitions",
        this.metricGrpName,
        "The number of partitions currently assigned to this consumer"), numParts);
}
 
Developer: txazo, Project: kafka, Lines: 26, Source: ConsumerCoordinator.java


Example 6: StreamsMetricsThreadImpl

import org.apache.kafka.common.metrics.stats.Rate; // import the required package/class
StreamsMetricsThreadImpl(final Metrics metrics, final String groupName, final String prefix, final Map<String, String> tags) {
    super(metrics, groupName, tags);
    commitTimeSensor = metrics.sensor(prefix + ".commit-latency", Sensor.RecordingLevel.INFO);
    commitTimeSensor.add(metrics.metricName("commit-latency-avg", this.groupName, "The average commit time in ms", this.tags), new Avg());
    commitTimeSensor.add(metrics.metricName("commit-latency-max", this.groupName, "The maximum commit time in ms", this.tags), new Max());
    commitTimeSensor.add(metrics.metricName("commit-rate", this.groupName, "The average per-second number of commit calls", this.tags), new Rate(new Count()));

    pollTimeSensor = metrics.sensor(prefix + ".poll-latency", Sensor.RecordingLevel.INFO);
    pollTimeSensor.add(metrics.metricName("poll-latency-avg", this.groupName, "The average poll time in ms", this.tags), new Avg());
    pollTimeSensor.add(metrics.metricName("poll-latency-max", this.groupName, "The maximum poll time in ms", this.tags), new Max());
    pollTimeSensor.add(metrics.metricName("poll-rate", this.groupName, "The average per-second number of record-poll calls", this.tags), new Rate(new Count()));

    processTimeSensor = metrics.sensor(prefix + ".process-latency", Sensor.RecordingLevel.INFO);
    processTimeSensor.add(metrics.metricName("process-latency-avg", this.groupName, "The average process time in ms", this.tags), new Avg());
    processTimeSensor.add(metrics.metricName("process-latency-max", this.groupName, "The maximum process time in ms", this.tags), new Max());
    processTimeSensor.add(metrics.metricName("process-rate", this.groupName, "The average per-second number of process calls", this.tags), new Rate(new Count()));

    punctuateTimeSensor = metrics.sensor(prefix + ".punctuate-latency", Sensor.RecordingLevel.INFO);
    punctuateTimeSensor.add(metrics.metricName("punctuate-latency-avg", this.groupName, "The average punctuate time in ms", this.tags), new Avg());
    punctuateTimeSensor.add(metrics.metricName("punctuate-latency-max", this.groupName, "The maximum punctuate time in ms", this.tags), new Max());
    punctuateTimeSensor.add(metrics.metricName("punctuate-rate", this.groupName, "The average per-second number of punctuate calls", this.tags), new Rate(new Count()));

    taskCreatedSensor = metrics.sensor(prefix + ".task-created", Sensor.RecordingLevel.INFO);
    taskCreatedSensor.add(metrics.metricName("task-created-rate", this.groupName, "The average per-second number of newly created tasks", this.tags), new Rate(new Count()));

    tasksClosedSensor = metrics.sensor(prefix + ".task-closed", Sensor.RecordingLevel.INFO);
    tasksClosedSensor.add(metrics.metricName("task-closed-rate", this.groupName, "The average per-second number of closed tasks", this.tags), new Rate(new Count()));

    skippedRecordsSensor = metrics.sensor(prefix + ".skipped-records");
    skippedRecordsSensor.add(metrics.metricName("skipped-records-rate", this.groupName, "The average per-second number of skipped records.", this.tags), new Rate(new Sum()));

}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines: 33, Source: StreamThread.java


Example 7: maybeRegisterTopicMetrics

import org.apache.kafka.common.metrics.stats.Rate; // import the required package/class
private void maybeRegisterTopicMetrics(String topic) {
    // if one sensor of the metrics has been registered for the topic,
    // then all other sensors should have been registered; and vice versa
    String topicRecordsCountName = "topic." + topic + ".records-per-batch";
    Sensor topicRecordCount = this.metrics.getSensor(topicRecordsCountName);
    if (topicRecordCount == null) {
        Map<String, String> metricTags = Collections.singletonMap("topic", topic);
        String metricGrpName = "producer-topic-metrics";

        topicRecordCount = this.metrics.sensor(topicRecordsCountName);
        MetricName m = this.metrics.metricName("record-send-rate", metricGrpName, metricTags);
        topicRecordCount.add(m, new Rate());

        String topicByteRateName = "topic." + topic + ".bytes";
        Sensor topicByteRate = this.metrics.sensor(topicByteRateName);
        m = this.metrics.metricName("byte-rate", metricGrpName, metricTags);
        topicByteRate.add(m, new Rate());

        String topicCompressionRateName = "topic." + topic + ".compression-rate";
        Sensor topicCompressionRate = this.metrics.sensor(topicCompressionRateName);
        m = this.metrics.metricName("compression-rate", metricGrpName, metricTags);
        topicCompressionRate.add(m, new Avg());

        String topicRetryName = "topic." + topic + ".record-retries";
        Sensor topicRetrySensor = this.metrics.sensor(topicRetryName);
        m = this.metrics.metricName("record-retry-rate", metricGrpName, metricTags);
        topicRetrySensor.add(m, new Rate());

        String topicErrorName = "topic." + topic + ".record-errors";
        Sensor topicErrorSensor = this.metrics.sensor(topicErrorName);
        m = this.metrics.metricName("record-error-rate", metricGrpName, metricTags);
        topicErrorSensor.add(m, new Rate());
    }
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines: 35, Source: Sender.java


Example 8: maybeRegisterConnectionMetrics

import org.apache.kafka.common.metrics.stats.Rate; // import the required package/class
public void maybeRegisterConnectionMetrics(String connectionId) {
    if (!connectionId.isEmpty() && metricsPerConnection) {
        // if one sensor of the metrics has been registered for the connection,
        // then all other sensors should have been registered; and vice versa
        String nodeRequestName = "node-" + connectionId + ".bytes-sent";
        Sensor nodeRequest = this.metrics.getSensor(nodeRequestName);
        if (nodeRequest == null) {
            String metricGrpName = metricGrpPrefix + "-node-metrics";

            Map<String, String> tags = new LinkedHashMap<>(metricTags);
            tags.put("node-id", "node-" + connectionId);

            nodeRequest = sensor(nodeRequestName);
            MetricName metricName = metrics.metricName("outgoing-byte-rate", metricGrpName, tags);
            nodeRequest.add(metricName, new Rate());
            metricName = metrics.metricName("request-rate", metricGrpName, "The average number of requests sent per second.", tags);
            nodeRequest.add(metricName, new Rate(new Count()));
            metricName = metrics.metricName("request-size-avg", metricGrpName, "The average size of all requests in the window..", tags);
            nodeRequest.add(metricName, new Avg());
            metricName = metrics.metricName("request-size-max", metricGrpName, "The maximum size of any request sent in the window.", tags);
            nodeRequest.add(metricName, new Max());

            String nodeResponseName = "node-" + connectionId + ".bytes-received";
            Sensor nodeResponse = sensor(nodeResponseName);
            metricName = metrics.metricName("incoming-byte-rate", metricGrpName, tags);
            nodeResponse.add(metricName, new Rate());
            metricName = metrics.metricName("response-rate", metricGrpName, "The average number of responses received per second.", tags);
            nodeResponse.add(metricName, new Rate(new Count()));

            String nodeTimeName = "node-" + connectionId + ".latency";
            Sensor nodeRequestTime = sensor(nodeTimeName);
            metricName = metrics.metricName("request-latency-avg", metricGrpName, tags);
            nodeRequestTime.add(metricName, new Avg());
            metricName = metrics.metricName("request-latency-max", metricGrpName, tags);
            nodeRequestTime.add(metricName, new Max());
        }
    }
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines: 39, Source: Selector.java


Example 9: testCleanupMemoryAvailabilityOnMetricsException

import org.apache.kafka.common.metrics.stats.Rate; // import the required package/class
@PrepareForTest({Sensor.class, MetricName.class})
@Test
public void testCleanupMemoryAvailabilityOnMetricsException() throws Exception {
    Metrics mockedMetrics = createNiceMock(Metrics.class);
    Sensor mockedSensor = createNiceMock(Sensor.class);
    MetricName metricName = createNiceMock(MetricName.class);

    expect(mockedMetrics.sensor(BufferPool.WAIT_TIME_SENSOR_NAME)).andReturn(mockedSensor);

    mockedSensor.record(anyDouble(), anyLong());
    expectLastCall().andThrow(new OutOfMemoryError());
    expect(mockedMetrics.metricName(anyString(), eq(metricGroup), anyString())).andReturn(metricName);
    mockedSensor.add(metricName, new Rate(TimeUnit.NANOSECONDS));

    replay(mockedMetrics, mockedSensor, metricName);

    BufferPool bufferPool = new BufferPool(2, 1, mockedMetrics, time,  metricGroup);
    bufferPool.allocate(1, 0);
    try {
        bufferPool.allocate(2, 1000);
        assertTrue("Expected oom.", false);
    } catch (OutOfMemoryError expected) {
    }
    assertEquals(1, bufferPool.availableMemory());
    assertEquals(0, bufferPool.queued());
    //This shouldn't timeout
    bufferPool.allocate(1, 0);
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines: 29, Source: BufferPoolTest.java


Example 10: testSampledStatInitialValue

import org.apache.kafka.common.metrics.stats.Rate; // import the required package/class
@Test
public void testSampledStatInitialValue() {
    // initialValue from each SampledStat is set as the initialValue on its Sample.
    // The only way to test the initialValue is to infer it by having a SampledStat
    // with expired Stats, because their values are reset to the initial values.
    // Most implementations of combine on SampledStat end up returning the default
    // value, so we can use this. This doesn't work for Percentiles though.
    // This test looks a lot like testOldDataHasNoEffect because it's the same
    // flow that leads to this state.
    Max max = new Max();
    Min min = new Min();
    Avg avg = new Avg();
    Count count = new Count();
    Rate.SampledTotal sampledTotal = new Rate.SampledTotal();

    long windowMs = 100;
    int samples = 2;
    MetricConfig config = new MetricConfig().timeWindow(windowMs, TimeUnit.MILLISECONDS).samples(samples);
    max.record(config, 50, time.milliseconds());
    min.record(config, 50, time.milliseconds());
    avg.record(config, 50, time.milliseconds());
    count.record(config, 50, time.milliseconds());
    sampledTotal.record(config, 50, time.milliseconds());
    time.sleep(samples * windowMs);

    assertEquals(Double.NEGATIVE_INFINITY, max.measure(config, time.milliseconds()), EPS);
    assertEquals(Double.MAX_VALUE, min.measure(config, time.milliseconds()), EPS);
    assertEquals(0.0, avg.measure(config, time.milliseconds()), EPS);
    assertEquals(0, count.measure(config, time.milliseconds()), EPS);
    assertEquals(0.0, sampledTotal.measure(config, time.milliseconds()), EPS);
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines: 32, Source: MetricsTest.java


Example 11: testRateWindowing

import org.apache.kafka.common.metrics.stats.Rate; // import the required package/class
@Test
public void testRateWindowing() throws Exception {
    // Use the default time window. Set 3 samples
    MetricConfig cfg = new MetricConfig().samples(3);
    Sensor s = metrics.sensor("test.sensor", cfg);
    s.add(metrics.metricName("test.rate", "grp1"), new Rate(TimeUnit.SECONDS));

    int sum = 0;
    int count = cfg.samples() - 1;
    // Advance 1 window after every record
    for (int i = 0; i < count; i++) {
        s.record(100);
        sum += 100;
        time.sleep(cfg.timeWindowMs());
    }

    // Sleep for half the window.
    time.sleep(cfg.timeWindowMs() / 2);

    // prior to any time passing
    double elapsedSecs = (cfg.timeWindowMs() * (cfg.samples() - 1) + cfg.timeWindowMs() / 2) / 1000.0;

    KafkaMetric km = metrics.metrics().get(metrics.metricName("test.rate", "grp1"));
    assertEquals("Rate(0...2) = 2.666", sum / elapsedSecs, km.value(), EPS);
    assertEquals("Elapsed Time = 75 seconds", elapsedSecs,
            ((Rate) km.measurable()).windowSize(cfg, time.milliseconds()) / 1000, EPS);
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines: 28, Source: MetricsTest.java
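
For reference, the numbers in the assertion messages above follow from the default MetricConfig time window of 30 seconds: the test records 100 in each of two consecutive windows and then sleeps for half a window, so the elapsed time is 2 × 30 s + 15 s = 75 s and the expected rate is 200 / 75 ≈ 2.666, matching "Rate(0...2) = 2.666" and "Elapsed Time = 75 seconds".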


Example 12: maybeRegisterTopicMetrics

import org.apache.kafka.common.metrics.stats.Rate; // import the required package/class
public void maybeRegisterTopicMetrics(String topic) {
    // if one sensor of the metrics has been registered for the topic,
    // then all other sensors should have been registered; and vice versa
    String topicRecordsCountName = "topic." + topic + ".records-per-batch";
    Sensor topicRecordCount = this.metrics.getSensor(topicRecordsCountName);
    if (topicRecordCount == null) {
        Map<String, String> metricTags = new LinkedHashMap<String, String>();
        metricTags.put("topic", topic);
        String metricGrpName = "producer-topic-metrics";

        topicRecordCount = this.metrics.sensor(topicRecordsCountName);
        MetricName m = this.metrics.metricName("record-send-rate", metricGrpName, metricTags);
        topicRecordCount.add(m, new Rate());

        String topicByteRateName = "topic." + topic + ".bytes";
        Sensor topicByteRate = this.metrics.sensor(topicByteRateName);
        m = this.metrics.metricName("byte-rate", metricGrpName, metricTags);
        topicByteRate.add(m, new Rate());

        String topicCompressionRateName = "topic." + topic + ".compression-rate";
        Sensor topicCompressionRate = this.metrics.sensor(topicCompressionRateName);
        m = this.metrics.metricName("compression-rate", metricGrpName, metricTags);
        topicCompressionRate.add(m, new Avg());

        String topicRetryName = "topic." + topic + ".record-retries";
        Sensor topicRetrySensor = this.metrics.sensor(topicRetryName);
        m = this.metrics.metricName("record-retry-rate", metricGrpName, metricTags);
        topicRetrySensor.add(m, new Rate());

        String topicErrorName = "topic." + topic + ".record-errors";
        Sensor topicErrorSensor = this.metrics.sensor(topicErrorName);
        m = this.metrics.metricName("record-error-rate", metricGrpName, metricTags);
        topicErrorSensor.add(m, new Rate());
    }
}
 
Developer: txazo, Project: kafka, Lines: 36, Source: Sender.java


Example 13: recordTopicFetchMetrics

import org.apache.kafka.common.metrics.stats.Rate; // import the required package/class
public void recordTopicFetchMetrics(String topic, int bytes, int records) {
    Map<String, String> metricTags = new HashMap<>();
    metricTags.put("topic", topic.replace(".", "_"));

    // record bytes fetched
    String name = "topic." + topic + ".bytes-fetched";
    Sensor bytesFetched = this.metrics.getSensor(name);
    if (bytesFetched == null) {
        bytesFetched = this.metrics.sensor(name);
        bytesFetched.add(this.metrics.metricName("fetch-size-avg",
                this.metricGrpName,
                "The average number of bytes fetched per request for topic " + topic,
                metricTags), new Avg());
        bytesFetched.add(this.metrics.metricName("fetch-size-max",
                this.metricGrpName,
                "The maximum number of bytes fetched per request for topic " + topic,
                metricTags), new Max());
        bytesFetched.add(this.metrics.metricName("bytes-consumed-rate",
                this.metricGrpName,
                "The average number of bytes consumed per second for topic " + topic,
                metricTags), new Rate());
    }
    bytesFetched.record(bytes);

    // record records fetched
    name = "topic." + topic + ".records-fetched";
    Sensor recordsFetched = this.metrics.getSensor(name);
    if (recordsFetched == null) {
        recordsFetched = this.metrics.sensor(name);
        recordsFetched.add(this.metrics.metricName("records-per-request-avg",
                this.metricGrpName,
                "The average number of records in each request for topic " + topic,
                metricTags), new Avg());
        recordsFetched.add(this.metrics.metricName("records-consumed-rate",
                this.metricGrpName,
                "The average number of records consumed per second for topic " + topic,
                metricTags), new Rate());
    }
    recordsFetched.record(records);
}
 
Developer: txazo, Project: kafka, Lines: 41, Source: Fetcher.java


Example 14: addPartitionSensors

import org.apache.kafka.common.metrics.stats.Rate; // import the required package/class
void addPartitionSensors(int partition) {
  Sensor recordsProducedSensor = metrics.sensor("records-produced-partition-" + partition);
  recordsProducedSensor.add(new MetricName("records-produced-rate-partition-" + partition, METRIC_GROUP_NAME,
      "The average number of records per second that are produced to this partition", _tags), new Rate());
  _recordsProducedPerPartition.put(partition, recordsProducedSensor);

  Sensor errorsSensor = metrics.sensor("produce-error-partition-" + partition);
  errorsSensor.add(new MetricName("produce-error-rate-partition-" + partition, METRIC_GROUP_NAME,
      "The average number of errors per second when producing to this partition", _tags), new Rate());
  _produceErrorPerPartition.put(partition, errorsSensor);
}
 
Developer: linkedin, Project: kafka-monitor, Lines: 12, Source: ProduceService.java


Example 15: buildSensors

import org.apache.kafka.common.metrics.stats.Rate; // import the required package/class
private List<TopicSensors.SensorMetric<ProducerRecord>> buildSensors(String key) {
  List<TopicSensors.SensorMetric<ProducerRecord>> sensors = new ArrayList<>();

  // Note: synchronized due to metrics registry not handling concurrent add/check-exists activity in a reliable way
  synchronized (metrics) {
    addSensor(key, "messages-per-sec", new Rate(), sensors, false);
    addSensor(key, "total-messages", new Total(), sensors, false);
    addSensor(key, "failed-messages", new Total(), sensors, true);
    addSensor(key, "failed-messages-per-sec", new Rate(), sensors, true);
  }
  return sensors;
}
 
Developer: confluentinc, Project: ksql, Lines: 13, Source: ProducerCollector.java


Example 16: buildSensors

import org.apache.kafka.common.metrics.stats.Rate; // import the required package/class
private List<TopicSensors.SensorMetric<ConsumerRecord>> buildSensors(String key) {

    List<TopicSensors.SensorMetric<ConsumerRecord>> sensors = new ArrayList<>();

    // Note: synchronized due to metrics registry not handling concurrent add/check-exists activity in a reliable way
    synchronized (this.metrics) {
      addSensor(key, "messages-per-sec", new Rate(), sensors, false);
      addSensor(key, "c-total-messages", new Total(), sensors, false);
      addSensor(key, "c-failed-messages", new Total(), sensors, true);
      addSensor(key, "failed-messages-per-sec", new Rate(), sensors, true);
    }
    return sensors;
  }
 
Developer: confluentinc, Project: ksql, Lines: 14, Source: ConsumerCollector.java


Example 17: addThroughputMetrics

import org.apache.kafka.common.metrics.stats.Rate; // import the required package/class
private void addThroughputMetrics(String scopeName, Sensor sensor, String entityName, String opName, Map<String, String> tags) {
    maybeAddMetric(sensor, metrics.metricName(entityName + "-" + opName + "-rate", groupNameFromScope(scopeName),
        "The average number of occurrence of " + entityName + " " + opName + " operation per second.", tags), new Rate(new Count()));
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines: 5, Source: StreamsMetricsImpl.java


Example 18: GroupCoordinatorMetrics

import org.apache.kafka.common.metrics.stats.Rate; // import the required package/class
public GroupCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) {
    this.metricGrpName = metricGrpPrefix + "-coordinator-metrics";

    this.heartbeatLatency = metrics.sensor("heartbeat-latency");
    this.heartbeatLatency.add(metrics.metricName("heartbeat-response-time-max",
        this.metricGrpName,
        "The max time taken to receive a response to a heartbeat request"), new Max());
    this.heartbeatLatency.add(metrics.metricName("heartbeat-rate",
        this.metricGrpName,
        "The average number of heartbeats per second"), new Rate(new Count()));

    this.joinLatency = metrics.sensor("join-latency");
    this.joinLatency.add(metrics.metricName("join-time-avg",
            this.metricGrpName,
            "The average time taken for a group rejoin"), new Avg());
    this.joinLatency.add(metrics.metricName("join-time-max",
            this.metricGrpName,
            "The max time taken for a group rejoin"), new Max());
    this.joinLatency.add(metrics.metricName("join-rate",
            this.metricGrpName,
            "The number of group joins per second"), new Rate(new Count()));

    this.syncLatency = metrics.sensor("sync-latency");
    this.syncLatency.add(metrics.metricName("sync-time-avg",
            this.metricGrpName,
            "The average time taken for a group sync"), new Avg());
    this.syncLatency.add(metrics.metricName("sync-time-max",
            this.metricGrpName,
            "The max time taken for a group sync"), new Max());
    this.syncLatency.add(metrics.metricName("sync-rate",
            this.metricGrpName,
            "The number of group syncs per second"), new Rate(new Count()));

    Measurable lastHeartbeat =
        new Measurable() {
            public double measure(MetricConfig config, long now) {
                return TimeUnit.SECONDS.convert(now - heartbeat.lastHeartbeatSend(), TimeUnit.MILLISECONDS);
            }
        };
    metrics.addMetric(metrics.metricName("last-heartbeat-seconds-ago",
        this.metricGrpName,
        "The number of seconds since the last controller heartbeat was sent"),
        lastHeartbeat);
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines: 45, Source: AbstractCoordinator.java


Example 19: SelectorMetrics

import org.apache.kafka.common.metrics.stats.Rate; // import the required package/class
public SelectorMetrics(Metrics metrics, String metricGrpPrefix, Map<String, String> metricTags, boolean metricsPerConnection) {
    this.metrics = metrics;
    this.metricGrpPrefix = metricGrpPrefix;
    this.metricTags = metricTags;
    this.metricsPerConnection = metricsPerConnection;
    String metricGrpName = metricGrpPrefix + "-metrics";
    StringBuilder tagsSuffix = new StringBuilder();

    for (Map.Entry<String, String> tag: metricTags.entrySet()) {
        tagsSuffix.append(tag.getKey());
        tagsSuffix.append("-");
        tagsSuffix.append(tag.getValue());
    }
    // Monitor connection closes: use a Rate to record the number of connections closed per second
    this.connectionClosed = sensor("connections-closed:" + tagsSuffix.toString());
    MetricName metricName = metrics.metricName("connection-close-rate", metricGrpName, "Connections closed per second in the window.", metricTags);
    this.connectionClosed.add(metricName, new Rate());

    // Monitor connection creation: use a Rate to record the number of connections created per second
    this.connectionCreated = sensor("connections-created:" + tagsSuffix.toString());
    metricName = metrics.metricName("connection-creation-rate", metricGrpName, "New connections established per second in the window.", metricTags);
    this.connectionCreated.add(metricName, new Rate());

    // Monitor network operations: use a Rate to record the total number of reads and writes per second across all connections
    this.bytesTransferred = sensor("bytes-sent-received:" + tagsSuffix.toString());
    metricName = metrics.metricName("network-io-rate", metricGrpName, "The average number of network operations (reads or writes) on all connections per second.", metricTags);
    bytesTransferred.add(metricName, new Rate(new Count()));

    // Metrics related to outgoing requests
    this.bytesSent = sensor("bytes-sent:" + tagsSuffix.toString(), bytesTransferred);
    metricName = metrics.metricName("outgoing-byte-rate", metricGrpName, "The average number of outgoing bytes sent per second to all servers.", metricTags);
    this.bytesSent.add(metricName, new Rate());
    metricName = metrics.metricName("request-rate", metricGrpName, "The average number of requests sent per second.", metricTags);
    this.bytesSent.add(metricName, new Rate(new Count()));
    metricName = metrics.metricName("request-size-avg", metricGrpName, "The average size of all requests in the window..", metricTags);
    this.bytesSent.add(metricName, new Avg());
    metricName = metrics.metricName("request-size-max", metricGrpName, "The maximum size of any request sent in the window.", metricTags);
    this.bytesSent.add(metricName, new Max());

    // Metrics related to incoming responses
    this.bytesReceived = sensor("bytes-received:" + tagsSuffix.toString(), bytesTransferred);
    metricName = metrics.metricName("incoming-byte-rate", metricGrpName, "Bytes/second read off all sockets", metricTags);
    this.bytesReceived.add(metricName, new Rate());
    metricName = metrics.metricName("response-rate", metricGrpName, "Responses received sent per second.", metricTags);
    this.bytesReceived.add(metricName, new Rate(new Count()));

    // Metrics for the select() call (select rate and I/O wait time)
    this.selectTime = sensor("select-time:" + tagsSuffix.toString());
    metricName = metrics.metricName("select-rate", metricGrpName, "Number of times the I/O layer checked for new I/O to perform per second", metricTags);
    this.selectTime.add(metricName, new Rate(new Count()));
    metricName = metrics.metricName("io-wait-time-ns-avg", metricGrpName, "The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds.", metricTags);
    this.selectTime.add(metricName, new Avg());
    metricName = metrics.metricName("io-wait-ratio", metricGrpName, "The fraction of time the I/O thread spent waiting.", metricTags);
    this.selectTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));

    // Metrics for time spent doing I/O per select() call
    this.ioTime = sensor("io-time:" + tagsSuffix.toString());
    metricName = metrics.metricName("io-time-ns-avg", metricGrpName, "The average length of time for I/O per select call in nanoseconds.", metricTags);
    this.ioTime.add(metricName, new Avg());
    metricName = metrics.metricName("io-ratio", metricGrpName, "The fraction of time the I/O thread spent doing I/O", metricTags);
    this.ioTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));

    metricName = metrics.metricName("connection-count", metricGrpName, "The current number of active connections.", metricTags);
    topLevelMetricNames.add(metricName);
    this.metrics.addMetric(metricName, new Measurable() {
        public double measure(MetricConfig config, long now) {
            return channels.size();
        }
    });
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines: 71, Source: Selector.java


Example 20: testSimpleStats

import org.apache.kafka.common.metrics.stats.Rate; // import the required package/class
@Test
public void testSimpleStats() throws Exception {
    ConstantMeasurable measurable = new ConstantMeasurable();

    metrics.addMetric(metrics.metricName("direct.measurable", "grp1", "The fraction of time an appender waits for space allocation."), measurable);
    Sensor s = metrics.sensor("test.sensor");
    s.add(metrics.metricName("test.avg", "grp1"), new Avg());
    s.add(metrics.metricName("test.max", "grp1"), new Max());
    s.add(metrics.metricName("test.min", "grp1"), new Min());
    s.add(metrics.metricName("test.rate", "grp1"), new Rate(TimeUnit.SECONDS));
    s.add(metrics.metricName("test.occurences", "grp1"), new Rate(TimeUnit.SECONDS, new Count()));
    s.add(metrics.metricName("test.count", "grp1"), new Count());
    s.add(new Percentiles(100, -100, 100, BucketSizing.CONSTANT,
                         new Percentile(metrics.metricName("test.median", "grp1"), 50.0),
                         new Percentile(metrics.metricName("test.perc99_9", "grp1"), 99.9)));

    Sensor s2 = metrics.sensor("test.sensor2");
    s2.add(metrics.metricName("s2.total", "grp1"), new Total());
    s2.record(5.0);

    int sum = 0;
    int count = 10;
    for (int i = 0; i < count; i++) {
        s.record(i);
        sum += i;
    }
    // prior to any time passing
    double elapsedSecs = (config.timeWindowMs() * (config.samples() - 1)) / 1000.0;
    assertEquals(String.format("Occurrences(0...%d) = %f", count, count / elapsedSecs), count / elapsedSecs,
                 metrics.metrics().get(metrics.metricName("test.occurences", "grp1")).value(), EPS);

    // pretend 2 seconds passed...
    long sleepTimeMs = 2;
    time.sleep(sleepTimeMs * 1000);
    elapsedSecs += sleepTimeMs;

    assertEquals("s2 reflects the constant value", 5.0, metrics.metrics().get(metrics.metricName("s2.total", "grp1")).value(), EPS);
    assertEquals("Avg(0...9) = 4.5", 4.5, metrics.metrics().get(metrics.metricName("test.avg", "grp1")).value(), EPS);
    assertEquals("Max(0...9) = 9", count - 1, metrics.metrics().get(metrics.metricName("test.max", "grp1")).value(), EPS);
    assertEquals("Min(0...9) = 0", 0.0, metrics.metrics().get(metrics.metricName(& 
