kafka 入门分享
一开始自己docker 跑起来一个kafka 但是踩了很多坑 容器跑不起来 后来又容器内无法消费 本地连接失败等
后来发现镜像依赖的不一样 最后用的bitnami/zookeeper 和 bitnami/kafka
上代码
docker-compose.yml
version: '3'
services:
zookeeper:
image: bitnami/zookeeper:latest
ports:
- "2181:2181"
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka:
image: bitnami/kafka:latest
depends_on: [ zookeeper ]
ports:
- "9092:9092"
- "9093:9093"
environment:
- KAFKA_ADVERTISED_HOST_NAME=kafka
- KAFKA_CREATE_TOPICS="test:1:1"
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
- KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://localhost:9092,EXTERNAL://localhost:9093
- KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
volumes:
- /data/product/zj_bigdata/data/kafka/docker.sock:/var/run/docker.sock
kafka-manager:
image: sheepkiller/kafka-manager
depends_on: [ zookeeper ]
ports:
- "9190:9000"
environment:
ZK_HOSTS: zookeeper:2181
producer.go
package main
import (
"context"
"log"
"time"
"github.com/segmentio/kafka-go"
)
func main() {
topic := "gotest"
partition := 0
conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9093", topic, partition)
if err != nil {
log.Fatal("failed to dial leader:", err)
}
conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
_, err = conn.WriteMessages(
kafka.Message{Value: []byte("one!")},
kafka.Message{Value: []byte("two!")},
kafka.Message{Value: []byte("three!")},
)
if err != nil {
log.Fatal("failed to write messages:", err)
}
if err := conn.Close(); err != nil {
log.Fatal("failed to close writer:", err)
}
}
consumer.go
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/segmentio/kafka-go"
)
func main() {
topic := "gotest"
partition := 0
conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9093", topic, partition)
if err != nil {
log.Fatal("failed to dial leader:", err)
}
conn.SetReadDeadline(time.Now().Add(10 * time.Second))
batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max
b := make([]byte, 10e3) // 10KB max per message
for {
n, err := batch.Read(b)
if err != nil {
break
}
fmt.Println(string(b[:n]))
}
if err := batch.Close(); err != nil {
log.Fatal("failed to close batch:", err)
}
if err := conn.Close(); err != nil {
log.Fatal("failed to close connection:", err)
}
}
参考连接