forked from IBM/sarama
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.go
77 lines (67 loc) · 2.03 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package main
import (
"flag"
"fmt"
"log"
"os"
"os/signal"
"strings"
"time"
"github.com/Shopify/sarama"
"go.opentelemetry.io/otel/exporters/stdout"
)
var (
brokers = flag.String("brokers", "localhost:9092", "The Kafka brokers to connect to, as a comma separated list")
topic = flag.String("topic", "default_topic", "The Kafka topic to use")
logger = log.New(os.Stdout, "[OTelInterceptor] ", log.LstdFlags)
)
func main() {
flag.Parse()
if *brokers == "" {
logger.Fatalln("at least one broker is required")
}
splitBrokers := strings.Split(*brokers, ",")
sarama.Logger = log.New(os.Stdout, "[Sarama] ", log.LstdFlags)
// oTel stdout example
pusher, err := stdout.InstallNewPipeline([]stdout.Option{
stdout.WithQuantiles([]float64{0.5, 0.9, 0.99}),
}, nil)
if err != nil {
logger.Fatalf("failed to initialize stdout export pipeline: %v", err)
}
defer pusher.Stop()
// simple sarama producer that adds a new producer interceptor
conf := sarama.NewConfig()
conf.Version = sarama.V0_11_0_0
conf.Producer.Interceptors = []sarama.ProducerInterceptor{NewOTelInterceptor(splitBrokers)}
producer, err := sarama.NewAsyncProducer(splitBrokers, conf)
if err != nil {
panic("Couldn't create a Kafka producer")
}
defer producer.AsyncClose()
// kill -2, trap SIGINT to trigger a shutdown
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
// ticker
bulkSize := 2
duration := 5 * time.Second
ticker := time.NewTicker(duration)
logger.Printf("Starting to produce %v messages every %v", bulkSize, duration)
for {
select {
case t := <-ticker.C:
now := t.Format(time.RFC3339)
logger.Printf("\nproducing %v messages to topic %s at %s", bulkSize, *topic, now)
for i := 0; i < bulkSize; i++ {
producer.Input() <- &sarama.ProducerMessage{
Topic: *topic, Key: nil,
Value: sarama.StringEncoder(fmt.Sprintf("test message %v/%v from kafka-client-go-test at %s", i+1, bulkSize, now)),
}
}
case <-signals:
logger.Println("terminating the program")
logger.Println("Bye :)")
return
}
}
}