代码拉取完成,页面将自动刷新
package main
import (
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
"regexp"
"strconv"
"strings"
"sync"
"time"
"github.com/Shopify/sarama"
"github.com/krallistic/kazoo-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
plog "github.com/prometheus/common/log"
"github.com/prometheus/common/version"
"github.com/rcrowley/go-metrics"
"gopkg.in/alecthomas/kingpin.v2"
)
const (
namespace = "kafka"
clientID = "kafka_exporter"
)
var (
clusterBrokers *prometheus.Desc
topicPartitions *prometheus.Desc
topicCurrentOffset *prometheus.Desc
topicOldestOffset *prometheus.Desc
topicPartitionLeader *prometheus.Desc
topicPartitionReplicas *prometheus.Desc
topicPartitionInSyncReplicas *prometheus.Desc
topicPartitionUsesPreferredReplica *prometheus.Desc
topicUnderReplicatedPartition *prometheus.Desc
consumergroupCurrentOffset *prometheus.Desc
consumergroupCurrentOffsetSum *prometheus.Desc
consumergroupLag *prometheus.Desc
consumergroupLagSum *prometheus.Desc
consumergroupLagZookeeper *prometheus.Desc
consumergroupMembers *prometheus.Desc
)
// Exporter collects Kafka stats from the given server and exports them using
// the prometheus metrics package.
type Exporter struct {
client sarama.Client
topicFilter *regexp.Regexp
groupFilter *regexp.Regexp
mu sync.Mutex
useZooKeeperLag bool
zookeeperClient *kazoo.Kazoo
nextMetadataRefresh time.Time
metadataRefreshInterval time.Duration
}
type kafkaOpts struct {
uri []string
useSASL bool
useSASLHandshake bool
saslUsername string
saslPassword string
saslMechanism string
useTLS bool
tlsCAFile string
tlsCertFile string
tlsKeyFile string
tlsInsecureSkipTLSVerify bool
kafkaVersion string
useZooKeeperLag bool
uriZookeeper []string
labels string
metadataRefreshInterval string
serviceName string
kerberosConfigPath string
realm string
keyTabPath string
kerberosAuthType string
}
// CanReadCertAndKey returns true if the certificate and key files already exists,
// otherwise returns false. If lost one of cert and key, returns error.
func CanReadCertAndKey(certPath, keyPath string) (bool, error) {
certReadable := canReadFile(certPath)
keyReadable := canReadFile(keyPath)
if certReadable == false && keyReadable == false {
return false, nil
}
if certReadable == false {
return false, fmt.Errorf("error reading %s, certificate and key must be supplied as a pair", certPath)
}
if keyReadable == false {
return false, fmt.Errorf("error reading %s, certificate and key must be supplied as a pair", keyPath)
}
return true, nil
}
// If the file represented by path exists and
// readable, returns true otherwise returns false.
func canReadFile(path string) bool {
f, err := os.Open(path)
if err != nil {
return false
}
defer f.Close()
return true
}
// NewExporter returns an initialized Exporter.
func NewExporter(opts kafkaOpts, topicFilter string, groupFilter string) (*Exporter, error) {
var zookeeperClient *kazoo.Kazoo
config := sarama.NewConfig()
config.ClientID = clientID
kafkaVersion, err := sarama.ParseKafkaVersion(opts.kafkaVersion)
if err != nil {
return nil, err
}
config.Version = kafkaVersion
if opts.useSASL {
// Convert to lowercase so that SHA512 and SHA256 is still valid
opts.saslMechanism = strings.ToLower(opts.saslMechanism)
switch opts.saslMechanism {
case "scram-sha512":
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} }
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)
case "scram-sha256":
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA256} }
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA256)
case "gssapi":
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeGSSAPI)
config.Net.SASL.GSSAPI.ServiceName = opts.serviceName
config.Net.SASL.GSSAPI.KerberosConfigPath = opts.kerberosConfigPath
config.Net.SASL.GSSAPI.Realm = opts.realm
config.Net.SASL.GSSAPI.Username = opts.saslUsername
if opts.kerberosAuthType == "keytabAuth" {
config.Net.SASL.GSSAPI.AuthType = sarama.KRB5_KEYTAB_AUTH
config.Net.SASL.GSSAPI.KeyTabPath = opts.keyTabPath
} else {
config.Net.SASL.GSSAPI.AuthType = sarama.KRB5_USER_AUTH
config.Net.SASL.GSSAPI.Password = opts.saslPassword
}
case "plain":
default:
return nil, fmt.Errorf(
`invalid sasl mechanism "%s": can only be "scram-sha256", "scram-sha512" or "plain"`,
opts.saslMechanism,
)
}
config.Net.SASL.Enable = true
config.Net.SASL.Handshake = opts.useSASLHandshake
if opts.saslUsername != "" {
config.Net.SASL.User = opts.saslUsername
}
if opts.saslPassword != "" {
config.Net.SASL.Password = opts.saslPassword
}
}
if opts.useTLS {
config.Net.TLS.Enable = true
config.Net.TLS.Config = &tls.Config{
RootCAs: x509.NewCertPool(),
InsecureSkipVerify: opts.tlsInsecureSkipTLSVerify,
}
if opts.tlsCAFile != "" {
if ca, err := ioutil.ReadFile(opts.tlsCAFile); err == nil {
config.Net.TLS.Config.RootCAs.AppendCertsFromPEM(ca)
} else {
return nil, err
}
}
canReadCertAndKey, err := CanReadCertAndKey(opts.tlsCertFile, opts.tlsKeyFile)
if err != nil {
return nil, errors.Wrap(err, "error reading cert and key")
}
if canReadCertAndKey {
cert, err := tls.LoadX509KeyPair(opts.tlsCertFile, opts.tlsKeyFile)
if err == nil {
config.Net.TLS.Config.Certificates = []tls.Certificate{cert}
} else {
return nil, err
}
}
}
if opts.useZooKeeperLag {
plog.Infoln("Using zookeeper lag, so connecting to zookeeper")
zookeeperClient, err = kazoo.NewKazoo(opts.uriZookeeper, nil)
if err != nil {
return nil, errors.Wrap(err, "error connecting to zookeeper")
}
}
interval, err := time.ParseDuration(opts.metadataRefreshInterval)
if err != nil {
return nil, errors.Wrap(err, "Cannot parse metadata refresh interval")
}
config.Metadata.RefreshFrequency = interval
client, err := sarama.NewClient(opts.uri, config)
if err != nil {
return nil, errors.Wrap(err, "Error Init Kafka Client")
}
plog.Infoln("Done Init Clients")
// Init our exporter.
return &Exporter{
client: client,
topicFilter: regexp.MustCompile(topicFilter),
groupFilter: regexp.MustCompile(groupFilter),
useZooKeeperLag: opts.useZooKeeperLag,
zookeeperClient: zookeeperClient,
nextMetadataRefresh: time.Now(),
metadataRefreshInterval: interval,
}, nil
}
// Describe describes all the metrics ever exported by the Kafka exporter. It
// implements prometheus.Collector.
func (e *Exporter) Describe(ch chan<- *prometheus.Desc) {
ch <- clusterBrokers
ch <- topicCurrentOffset
ch <- topicOldestOffset
ch <- topicPartitions
ch <- topicPartitionLeader
ch <- topicPartitionReplicas
ch <- topicPartitionInSyncReplicas
ch <- topicPartitionUsesPreferredReplica
ch <- topicUnderReplicatedPartition
ch <- consumergroupCurrentOffset
ch <- consumergroupCurrentOffsetSum
ch <- consumergroupLag
ch <- consumergroupLagZookeeper
ch <- consumergroupLagSum
}
// Collect fetches the stats from configured Kafka location and delivers them
// as Prometheus metrics. It implements prometheus.Collector.
func (e *Exporter) Collect(ch chan<- prometheus.Metric) {
var wg = sync.WaitGroup{}
ch <- prometheus.MustNewConstMetric(
clusterBrokers, prometheus.GaugeValue, float64(len(e.client.Brokers())),
)
offset := make(map[string]map[int32]int64)
now := time.Now()
if now.After(e.nextMetadataRefresh) {
plog.Info("Refreshing client metadata")
if err := e.client.RefreshMetadata(); err != nil {
plog.Errorf("Cannot refresh topics, using cached data: %v", err)
}
e.nextMetadataRefresh = now.Add(e.metadataRefreshInterval)
}
topics, err := e.client.Topics()
if err != nil {
plog.Errorf("Cannot get topics: %v", err)
return
}
getTopicMetrics := func(topic string) {
defer wg.Done()
if !e.topicFilter.MatchString(topic) {
return
}
partitions, err := e.client.Partitions(topic)
if err != nil {
plog.Errorf("Cannot get partitions of topic %s: %v", topic, err)
return
}
ch <- prometheus.MustNewConstMetric(
topicPartitions, prometheus.GaugeValue, float64(len(partitions)), topic,
)
e.mu.Lock()
offset[topic] = make(map[int32]int64, len(partitions))
e.mu.Unlock()
for _, partition := range partitions {
broker, err := e.client.Leader(topic, partition)
if err != nil {
plog.Errorf("Cannot get leader of topic %s partition %d: %v", topic, partition, err)
} else {
ch <- prometheus.MustNewConstMetric(
topicPartitionLeader, prometheus.GaugeValue, float64(broker.ID()), topic, strconv.FormatInt(int64(partition), 10),
)
}
currentOffset, err := e.client.GetOffset(topic, partition, sarama.OffsetNewest)
if err != nil {
plog.Errorf("Cannot get current offset of topic %s partition %d: %v", topic, partition, err)
} else {
e.mu.Lock()
offset[topic][partition] = currentOffset
e.mu.Unlock()
ch <- prometheus.MustNewConstMetric(
topicCurrentOffset, prometheus.GaugeValue, float64(currentOffset), topic, strconv.FormatInt(int64(partition), 10),
)
}
oldestOffset, err := e.client.GetOffset(topic, partition, sarama.OffsetOldest)
if err != nil {
plog.Errorf("Cannot get oldest offset of topic %s partition %d: %v", topic, partition, err)
} else {
ch <- prometheus.MustNewConstMetric(
topicOldestOffset, prometheus.GaugeValue, float64(oldestOffset), topic, strconv.FormatInt(int64(partition), 10),
)
}
replicas, err := e.client.Replicas(topic, partition)
if err != nil {
plog.Errorf("Cannot get replicas of topic %s partition %d: %v", topic, partition, err)
} else {
ch <- prometheus.MustNewConstMetric(
topicPartitionReplicas, prometheus.GaugeValue, float64(len(replicas)), topic, strconv.FormatInt(int64(partition), 10),
)
}
inSyncReplicas, err := e.client.InSyncReplicas(topic, partition)
if err != nil {
plog.Errorf("Cannot get in-sync replicas of topic %s partition %d: %v", topic, partition, err)
} else {
ch <- prometheus.MustNewConstMetric(
topicPartitionInSyncReplicas, prometheus.GaugeValue, float64(len(inSyncReplicas)), topic, strconv.FormatInt(int64(partition), 10),
)
}
if broker != nil && replicas != nil && len(replicas) > 0 && broker.ID() == replicas[0] {
ch <- prometheus.MustNewConstMetric(
topicPartitionUsesPreferredReplica, prometheus.GaugeValue, float64(1), topic, strconv.FormatInt(int64(partition), 10),
)
} else {
ch <- prometheus.MustNewConstMetric(
topicPartitionUsesPreferredReplica, prometheus.GaugeValue, float64(0), topic, strconv.FormatInt(int64(partition), 10),
)
}
if replicas != nil && inSyncReplicas != nil && len(inSyncReplicas) < len(replicas) {
ch <- prometheus.MustNewConstMetric(
topicUnderReplicatedPartition, prometheus.GaugeValue, float64(1), topic, strconv.FormatInt(int64(partition), 10),
)
} else {
ch <- prometheus.MustNewConstMetric(
topicUnderReplicatedPartition, prometheus.GaugeValue, float64(0), topic, strconv.FormatInt(int64(partition), 10),
)
}
if e.useZooKeeperLag {
ConsumerGroups, err := e.zookeeperClient.Consumergroups()
if err != nil {
plog.Errorf("Cannot get consumer group %v", err)
}
for _, group := range ConsumerGroups {
offset, _ := group.FetchOffset(topic, partition)
if offset > 0 {
consumerGroupLag := currentOffset - offset
ch <- prometheus.MustNewConstMetric(
consumergroupLagZookeeper, prometheus.GaugeValue, float64(consumerGroupLag), group.Name, topic, strconv.FormatInt(int64(partition), 10),
)
}
}
}
}
}
for _, topic := range topics {
wg.Add(1)
go getTopicMetrics(topic)
}
wg.Wait()
getConsumerGroupMetrics := func(broker *sarama.Broker) {
defer wg.Done()
if err := broker.Open(e.client.Config()); err != nil && err != sarama.ErrAlreadyConnected {
plog.Errorf("Cannot connect to broker %d: %v", broker.ID(), err)
return
}
defer broker.Close()
groups, err := broker.ListGroups(&sarama.ListGroupsRequest{})
if err != nil {
plog.Errorf("Cannot get consumer group: %v", err)
return
}
groupIds := make([]string, 0)
for groupId := range groups.Groups {
if e.groupFilter.MatchString(groupId) {
groupIds = append(groupIds, groupId)
}
}
describeGroups, err := broker.DescribeGroups(&sarama.DescribeGroupsRequest{Groups: groupIds})
if err != nil {
plog.Errorf("Cannot get describe groups: %v", err)
return
}
for _, group := range describeGroups.Groups {
offsetFetchRequest := sarama.OffsetFetchRequest{ConsumerGroup: group.GroupId, Version: 1}
for _, member := range group.Members {
assignment, err := member.GetMemberAssignment()
if err != nil {
plog.Errorf("Cannot get GetMemberAssignment of group member %v : %v", member, err)
return
}
for topic, partions := range assignment.Topics {
for _, partition := range partions {
offsetFetchRequest.AddPartition(topic, partition)
}
}
}
ch <- prometheus.MustNewConstMetric(
consumergroupMembers, prometheus.GaugeValue, float64(len(group.Members)), group.GroupId,
)
offsetFetchResponse, err := broker.FetchOffset(&offsetFetchRequest)
if err != nil {
plog.Errorf("Cannot get offset of group %s: %v", group.GroupId, err)
continue
}
for topic, partitions := range offsetFetchResponse.Blocks {
// If the topic is not consumed by that consumer group, skip it
topicConsumed := false
for _, offsetFetchResponseBlock := range partitions {
// Kafka will return -1 if there is no offset associated with a topic-partition under that consumer group
if offsetFetchResponseBlock.Offset != -1 {
topicConsumed = true
break
}
}
if !topicConsumed {
continue
}
var currentOffsetSum int64
var lagSum int64
for partition, offsetFetchResponseBlock := range partitions {
err := offsetFetchResponseBlock.Err
if err != sarama.ErrNoError {
plog.Errorf("Error for partition %d :%v", partition, err.Error())
continue
}
currentOffset := offsetFetchResponseBlock.Offset
currentOffsetSum += currentOffset
ch <- prometheus.MustNewConstMetric(
consumergroupCurrentOffset, prometheus.GaugeValue, float64(currentOffset), group.GroupId, topic, strconv.FormatInt(int64(partition), 10),
)
e.mu.Lock()
if offset, ok := offset[topic][partition]; ok {
// If the topic is consumed by that consumer group, but no offset associated with the partition
// forcing lag to -1 to be able to alert on that
var lag int64
if offsetFetchResponseBlock.Offset == -1 {
lag = -1
} else {
lag = offset - offsetFetchResponseBlock.Offset
lagSum += lag
}
ch <- prometheus.MustNewConstMetric(
consumergroupLag, prometheus.GaugeValue, float64(lag), group.GroupId, topic, strconv.FormatInt(int64(partition), 10),
)
} else {
plog.Errorf("No offset of topic %s partition %d, cannot get consumer group lag", topic, partition)
}
e.mu.Unlock()
}
ch <- prometheus.MustNewConstMetric(
consumergroupCurrentOffsetSum, prometheus.GaugeValue, float64(currentOffsetSum), group.GroupId, topic,
)
ch <- prometheus.MustNewConstMetric(
consumergroupLagSum, prometheus.GaugeValue, float64(lagSum), group.GroupId, topic,
)
}
}
}
if len(e.client.Brokers()) > 0 {
for _, broker := range e.client.Brokers() {
wg.Add(1)
go getConsumerGroupMetrics(broker)
}
wg.Wait()
} else {
plog.Errorln("No valid broker, cannot get consumer group metrics")
}
}
func init() {
metrics.UseNilMetrics = true
prometheus.MustRegister(version.NewCollector("kafka_exporter"))
}
func main() {
var (
listenAddress = kingpin.Flag("web.listen-address", "Address to listen on for web interface and telemetry.").Default(":9308").String()
metricsPath = kingpin.Flag("web.telemetry-path", "Path under which to expose metrics.").Default("/metrics").String()
topicFilter = kingpin.Flag("topic.filter", "Regex that determines which topics to collect.").Default(".*").String()
groupFilter = kingpin.Flag("group.filter", "Regex that determines which consumer groups to collect.").Default(".*").String()
logSarama = kingpin.Flag("log.enable-sarama", "Turn on Sarama logging.").Default("false").Bool()
opts = kafkaOpts{}
)
kingpin.Flag("kafka.server", "Address (host:port) of Kafka server.").Default("kafka:9092").StringsVar(&opts.uri)
kingpin.Flag("sasl.enabled", "Connect using SASL/PLAIN.").Default("false").BoolVar(&opts.useSASL)
kingpin.Flag("sasl.handshake", "Only set this to false if using a non-Kafka SASL proxy.").Default("true").BoolVar(&opts.useSASLHandshake)
kingpin.Flag("sasl.username", "SASL user name.").Default("").StringVar(&opts.saslUsername)
kingpin.Flag("sasl.password", "SASL user password.").Default("").StringVar(&opts.saslPassword)
kingpin.Flag("sasl.mechanism", "The SASL SCRAM SHA algorithm sha256 or sha512 as mechanism").Default("").StringVar(&opts.saslMechanism)
kingpin.Flag("sasl.service-name", "Service name when using kerberos Auth").Default("").StringVar(&opts.serviceName)
kingpin.Flag("sasl.kerberos-config-path", "Kerberos config path").Default("").StringVar(&opts.kerberosConfigPath)
kingpin.Flag("sasl.realm", "Kerberos realm").Default("").StringVar(&opts.realm)
kingpin.Flag("sasl.kerberos-auth-type", "Kerberos auth type. Either 'keytabAuth' or 'userAuth'").Default("").StringVar(&opts.kerberosAuthType)
kingpin.Flag("sasl.keytab-path", "Kerberos keytab file path").Default("").StringVar(&opts.keyTabPath)
kingpin.Flag("tls.enabled", "Connect using TLS.").Default("false").BoolVar(&opts.useTLS)
kingpin.Flag("tls.ca-file", "The optional certificate authority file for TLS client authentication.").Default("").StringVar(&opts.tlsCAFile)
kingpin.Flag("tls.cert-file", "The optional certificate file for client authentication.").Default("").StringVar(&opts.tlsCertFile)
kingpin.Flag("tls.key-file", "The optional key file for client authentication.").Default("").StringVar(&opts.tlsKeyFile)
kingpin.Flag("tls.insecure-skip-tls-verify", "If true, the server's certificate will not be checked for validity. This will make your HTTPS connections insecure.").Default("false").BoolVar(&opts.tlsInsecureSkipTLSVerify)
kingpin.Flag("kafka.version", "Kafka broker version").Default(sarama.V1_0_0_0.String()).StringVar(&opts.kafkaVersion)
kingpin.Flag("use.consumelag.zookeeper", "if you need to use a group from zookeeper").Default("false").BoolVar(&opts.useZooKeeperLag)
kingpin.Flag("zookeeper.server", "Address (hosts) of zookeeper server.").Default("localhost:2181").StringsVar(&opts.uriZookeeper)
kingpin.Flag("kafka.labels", "Kafka cluster name").Default("").StringVar(&opts.labels)
kingpin.Flag("refresh.metadata", "Metadata refresh interval").Default("30s").StringVar(&opts.metadataRefreshInterval)
plog.AddFlags(kingpin.CommandLine)
kingpin.Version(version.Print("kafka_exporter"))
kingpin.HelpFlag.Short('h')
kingpin.Parse()
plog.Infoln("Starting kafka_exporter", version.Info())
plog.Infoln("Build context", version.BuildContext())
labels := make(map[string]string)
// Protect against empty labels
if opts.labels != "" {
for _, label := range strings.Split(opts.labels, ",") {
splitted := strings.Split(label, "=")
if len(splitted) >= 2 {
labels[splitted[0]] = splitted[1]
}
}
}
setup(*listenAddress, *metricsPath, *topicFilter, *groupFilter, *logSarama, opts, labels)
}
func setup(
listenAddress string,
metricsPath string,
topicFilter string,
groupFilter string,
logSarama bool,
opts kafkaOpts,
labels map[string]string,
) {
clusterBrokers = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "", "brokers"),
"Number of Brokers in the Kafka Cluster.",
nil, labels,
)
topicPartitions = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "topic", "partitions"),
"Number of partitions for this Topic",
[]string{"topic"}, labels,
)
topicCurrentOffset = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "topic", "partition_current_offset"),
"Current Offset of a Broker at Topic/Partition",
[]string{"topic", "partition"}, labels,
)
topicOldestOffset = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "topic", "partition_oldest_offset"),
"Oldest Offset of a Broker at Topic/Partition",
[]string{"topic", "partition"}, labels,
)
topicPartitionLeader = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "topic", "partition_leader"),
"Leader Broker ID of this Topic/Partition",
[]string{"topic", "partition"}, labels,
)
topicPartitionReplicas = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "topic", "partition_replicas"),
"Number of Replicas for this Topic/Partition",
[]string{"topic", "partition"}, labels,
)
topicPartitionInSyncReplicas = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "topic", "partition_in_sync_replica"),
"Number of In-Sync Replicas for this Topic/Partition",
[]string{"topic", "partition"}, labels,
)
topicPartitionUsesPreferredReplica = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "topic", "partition_leader_is_preferred"),
"1 if Topic/Partition is using the Preferred Broker",
[]string{"topic", "partition"}, labels,
)
topicUnderReplicatedPartition = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "topic", "partition_under_replicated_partition"),
"1 if Topic/Partition is under Replicated",
[]string{"topic", "partition"}, labels,
)
consumergroupCurrentOffset = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "consumergroup", "current_offset"),
"Current Offset of a ConsumerGroup at Topic/Partition",
[]string{"consumergroup", "topic", "partition"}, labels,
)
consumergroupCurrentOffsetSum = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "consumergroup", "current_offset_sum"),
"Current Offset of a ConsumerGroup at Topic for all partitions",
[]string{"consumergroup", "topic"}, labels,
)
consumergroupLag = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "consumergroup", "lag"),
"Current Approximate Lag of a ConsumerGroup at Topic/Partition",
[]string{"consumergroup", "topic", "partition"}, labels,
)
consumergroupLagZookeeper = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "consumergroupzookeeper", "lag_zookeeper"),
"Current Approximate Lag(zookeeper) of a ConsumerGroup at Topic/Partition",
[]string{"consumergroup", "topic", "partition"}, nil,
)
consumergroupLagSum = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "consumergroup", "lag_sum"),
"Current Approximate Lag of a ConsumerGroup at Topic for all partitions",
[]string{"consumergroup", "topic"}, labels,
)
consumergroupMembers = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "consumergroup", "members"),
"Amount of members in a consumer group",
[]string{"consumergroup"}, labels,
)
if logSarama {
sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
}
exporter, err := NewExporter(opts, topicFilter, groupFilter)
if err != nil {
plog.Fatalln(err)
}
defer exporter.client.Close()
prometheus.MustRegister(exporter)
http.Handle(metricsPath, promhttp.Handler())
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(`<html>
<head><title>Kafka Exporter</title></head>
<body>
<h1>Kafka Exporter</h1>
<p><a href='` + metricsPath + `'>Metrics</a></p>
</body>
</html>`))
})
http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
// need more specific sarama check
w.Write([]byte("ok"))
})
plog.Infoln("Listening on", listenAddress)
plog.Fatal(http.ListenAndServe(listenAddress, nil))
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。