• 文档
  • 控制台
  • 登录
  • 立即注册
    目前不支持用户自主注册,如需注册账号,请联系400-080-1100
消息队列Kafka用户指南
最近更新时间:

5 部署应用

5.1 SDK

5.1.1 Go SDK

本文介绍如何在 VPC 环境下通过 SASL 接入点接入 CMQ-Kafka 版消息队列收发消息。 

前提条件

在开始开发前,请确保您已经完成以下工作。

· 安装 Go1.16 或以上版本的 SDK。

· 配置合适的 IDE 环境和编码环境。

· 在 CMQ-Kafka 版消息队列控制台创建实例。 

开始开发

下面以 GoLand 作为 IDE 环境来开发。

(1) 创建 go modules 工程。

(2) 新建配置文件 config.yaml。

brokers: XXX:XX,XXX:XX,XXX:XX

saslenable: true

user: XXX

pwd: "XXX"

topic: topic1

comsumergrp: g1

image.png

image.png

(3) 生产消息。

producerDemo.go 示例代码如下:

package main

import (

"encoding/json"

"fmt"

"github.com/Shopify/sarama"

"github.com/rs/zerolog/log"

"gopkg.in/yaml.v3"

"io/ioutil"

"strings"

"time"

)

// Config mirrors config.yaml: the broker endpoints, the SASL switch and
// credentials, and the topic / consumer group used by the demos.
type Config struct {
	// Brokers is a comma-separated list of broker addresses (host:port).
	Brokers string `yaml:"brokers"`
	// SASLEnable turns SASL authentication on. The yaml tag is spelled out so
	// decoding does not depend on the YAML library's default key naming; the
	// json tag is kept for compatibility with any existing JSON use.
	SASLEnable bool `yaml:"saslenable" json:"saslenable"`
	// User is the SASL user name.
	User string `yaml:"user"`
	// Pwd is the SASL password.
	Pwd string `yaml:"pwd"`
	// Topic is the topic to produce to / consume from.
	Topic string `yaml:"topic"`
	// ComsumerGrp is the consumer group ID. The spelling matches the
	// "comsumergrp" key in config.yaml and must not be changed on its own.
	ComsumerGrp string `yaml:"comsumergrp"`
}

// producerClient groups a synchronous producer with the sarama client it was
// created from, so that both can be closed by the owner.
type producerClient struct {
	// client is the underlying connection to the brokers.
	client sarama.Client
	// syncProducer sends messages and blocks until each one is acknowledged.
	syncProducer sarama.SyncProducer
	// createTime is the creation time in Unix seconds.
	createTime int64
}

func newProducerClient(brokers string,conf Config) (*producerClient, error) {

brokerList := strings.Split(brokers, ",")

config := sarama.NewConfig()

config.Version = sarama.V2_4_0_0

config.Producer.RequiredAcks = sarama.WaitForAll // 发送完数据需要 leader 和 follow 都确认

config.Producer.Partitioner = sarama.NewManualPartitioner // 指定一个 partition

config.Producer.Return.Successes = true // 成功交付的消息将在

success channel 返回

config.Metadata.Retry.Max = 1

config.Metadata.Timeout = time.Second * 5

// sasl 配置信息

//开启 SASL 认证

config.Net.SASL.Enable = conf.SASLEnable

//SASL 认证用户名

config.Net.SASL.User = conf.User

//SASL 认证密码

config.Net.SASL.Password = conf.Pwd

fmt.Println("newProducerClient",config)

//创建一个 Client

client, err := sarama.NewClient(brokerList, config)

if err != nil {

return nil, err

}

//创建一个 producer

syncProducer, err := sarama.NewSyncProducerFromClient(client)

if err != nil {

_ = client.Close()

return nil, err

}

producer := &producerClient{

syncProducer: syncProducer,

client: client,

createTime: time.Now().Unix(),

}

return producer, nil

}

// main loads config.yaml, creates a producer and sends a single message whose
// value is the current time to partition 1 of the configured topic.
func main() {
	log.Info().Interface("produce messsage", "begin").Send()

	// Load the broker configuration. Without it there is nothing to connect
	// to, so stop instead of continuing with an empty Config.
	var conf Config
	yamlfile, err := ioutil.ReadFile("config.yaml")
	if err != nil {
		fmt.Println("read config err", err)
		return
	}
	if err := yaml.Unmarshal(yamlfile, &conf); err != nil {
		fmt.Println("unmarsharl config err", err)
		return
	}

	// Echo the configuration with the password masked.
	shown := conf
	if shown.Pwd != "" {
		shown.Pwd = "******"
	}
	fmt.Println("config:", shown)

	brokers := conf.Brokers
	topicName := conf.Topic
	fmt.Println("brokers => ", brokers)
	log.Info().Interface("run", "生产消息").Send()

	// Create the producer.
	producer, err := newProducerClient(brokers, conf)
	if err != nil {
		fmt.Println("newProducerClient error => ", err)
		panic(err)
	}
	// A producer built from an existing client does not own that client:
	// close the producer first, then the client.
	defer func() {
		if err := producer.syncProducer.Close(); err != nil {
			fmt.Println("close producer error => ", err)
		}
		if err := producer.client.Close(); err != nil {
			fmt.Println("close client error => ", err)
		}
	}()

	// Build the message. The manual partitioner sends it to exactly the
	// partition named here, so partition 1 must exist on the topic.
	msg := &sarama.ProducerMessage{
		Topic:     topicName,
		Value:     sarama.StringEncoder(time.Now().String()),
		Partition: 1,
	}

	// Send it; do not report the message as sent when the send failed.
	if _, _, err := producer.syncProducer.SendMessage(msg); err != nil {
		fmt.Println("SendMessage error => ", err)
		return
	}

	jmsg, err := json.Marshal(msg)
	if err != nil {
		fmt.Println("marshal message error => ", err)
		return
	}
	fmt.Println("SendMessage => ", string(jmsg))
}

(4) 消费消息。

consumerDemo.go 示例代码如下:

package main

import (

"context"

"encoding/json"

"fmt"

"github.com/Shopify/sarama"

"github.com/rs/zerolog/log"

"gopkg.in/yaml.v3"

"io/ioutil"

"strings"

"sync"

"time"

)

// consumerGroupClient groups a sarama consumer group with the client it was
// created from.
type consumerGroupClient struct {
	// client is the underlying connection to the brokers.
	client sarama.Client
	// consumer is the consumer group built on top of client.
	consumer sarama.ConsumerGroup
	// createTime is the creation time in Unix seconds.
	createTime int64
	// ready is not set by newConsumerGroupClient; readiness is tracked on the
	// per-call groupConsumer instead.
	ready chan bool
}

// groupConsumer is the handler passed to ConsumerGroup.Consume. It forwards
// every claimed message to msgCh and signals session start through ready.
type groupConsumer struct {
	// msgCh carries claimed messages from ConsumeClaim to the reader.
	msgCh chan *sarama.ConsumerMessage
	// ready is closed by Setup when a session starts.
	ready chan bool
	// limit is the maximum number of messages the reader collects.
	limit int
	// received counts the messages collected so far.
	received int
}

// Setup is invoked when a new session begins, before any ConsumeClaim call.
// It closes c.ready so that a goroutine blocked receiving on it is released.
// It always returns nil.
func (c *groupConsumer) Setup(_ sarama.ConsumerGroupSession) error {
	close(c.ready) // mark the consumer as ready
	return nil
}

// Cleanup is invoked when a session ends, after every ConsumeClaim goroutine
// has returned. Nothing needs to be released here, so it always returns nil.
func (c *groupConsumer) Cleanup(_ sarama.ConsumerGroupSession) error {
	return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().

func (c *groupConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim

sarama.ConsumerGroupClaim) error {

// NOTE:

// Do not move the code below to a goroutine.

// The `ConsumeClaim` itself is called within a goroutine, see:

// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29

for message := range claim.Messages() {

fmt.Println(fmt.Sprintf("Message claimed: value = %s, timestamp = %v,

topic = %s", string(message.Value), message.Timestamp, message.Topic))

session.MarkMessage(message, "")

c.msgCh <- message

}

return nil

}

func newConsumerGroupClient(brokers string, groupID string, conf Config)

(*consumerGroupClient, error) {

brokerList := strings.Split(brokers, ",")

config := sarama.NewConfig()

config.Version = sarama.V2_4_0_0

config.Metadata.Retry.Max = 1

config.Metadata.Timeout = time.Second * 5

config.Consumer.Offsets.Initial = sarama.OffsetOldest

config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin

// sasl 配置信息

//开启 SASL 验证

config.Net.SASL.Enable = conf.SASLEnable

//SASL 认证用户名

config.Net.SASL.User = conf.User

//SASL 认证密码

config.Net.SASL.Password = conf.Pwd

//创建一个 Client

baseClient, err := sarama.NewClient(brokerList, config)

if err != nil {

return nil, err

}

//创建 consumer

client, err := sarama.NewConsumerGroupFromClient(groupID, baseClient)

if err != nil {

_ = baseClient.Close()

return nil, err

}

consumer := &consumerGroupClient{

consumer: client,

client: baseClient,

createTime: time.Now().Unix(),

}

return consumer, nil

}

func (c *consumerGroupClient) ConsumeMessage(topics string, timeoutSec int, limit int)

([]*sarama.ConsumerMessage, error) {

/**

* Setup a new Sarama consumer group

*/

if limit == -1 {

limit = 999999999

}

consumer := groupConsumer{

ready: make(chan bool),

msgCh: make(chan *sarama.ConsumerMessage),

limit: limit,

}

defer close(consumer.msgCh)

var messages []*sarama.ConsumerMessage

ctx, ctxCancel := context.WithCancel(context.Background())

wg := &sync.WaitGroup{}

wg.Add(1)

go func() {

defer wg.Done()

for {

// `Consume` should be called inside an infinite loop, when a

// server-side rebalance happens, the consumer session will need

to be

// recreated to get the new claims

pConsumer := &consumer

if err := c.consumer.Consume(ctx, strings.Split(topics, ","),

pConsumer); err != nil {

fmt.Println("Error from consumer:", err)

}

// check if context was cancelled, signaling that the consumer should

stop

if ctx.Err() != nil {

fmt.Println("Consume Error from consumer:", ctx.Err())

return

}

consumer.ready = make(chan bool) // exited by rebalance, so not ready,

call Consume again

}

}()

<-consumer.ready // Await till the consumer has been set up the first time(channel

closed in SetUp())

fmt.Println("Sarama consumer up and running!...")

// read until timeout or limit

reading := true

for reading {

select {

case <-ctx.Done():

fmt.Println("consumer group terminating: context cancelled")

reading = false

case <-time.After(time.Second * time.Duration(timeoutSec)):

fmt.Println("consumer group terminating: by timeout")

reading = false

case msg := <-consumer.msgCh:

if msg == nil {

reading = false

} else {

messages = append(messages, msg)

consumer.received++

if consumer.received == consumer.limit {

fmt.Println("consumer group terminating: by count

limit")

reading = false

}

}

}

}

ctxCancel()

wg.Wait()

return messages, nil

}

// main loads config.yaml, joins the configured consumer group and prints the
// messages read from the configured topic as JSON. Reading stops after 10
// seconds without a new message.
func main() {
	log.Info().Interface("consumer messsage", "begin").Send()

	// Load the configuration. Without it there is nothing to connect to, so
	// stop instead of continuing with an empty Config.
	var conf Config
	yamlfile, err := ioutil.ReadFile("config.yaml")
	if err != nil {
		fmt.Println("read config err", err)
		return
	}
	if err := yaml.Unmarshal(yamlfile, &conf); err != nil {
		fmt.Println("unmarsharl config err", err)
		return
	}

	// Echo the configuration with the password masked.
	shown := conf
	if shown.Pwd != "" {
		shown.Pwd = "******"
	}
	fmt.Println("config:", shown)

	brokers := conf.Brokers
	groupID := conf.ComsumerGrp
	topicName := conf.Topic

	group, err := newConsumerGroupClient(brokers, groupID, conf)
	if err != nil {
		fmt.Println("newConsumerGroupClient error => ", err)
		panic(err)
	}
	// A consumer group built from an existing client does not own that
	// client: close the group first, then the client.
	defer func() {
		if err := group.consumer.Close(); err != nil {
			fmt.Println("close consumer group error => ", err)
		}
		if err := group.client.Close(); err != nil {
			fmt.Println("close client error => ", err)
		}
	}()

	// Consume messages; do not report success when consuming failed.
	msg, err := group.ConsumeMessage(topicName, 10, -1)
	if err != nil {
		fmt.Println("ConsumeMessage error => ", err)
		return
	}

	jmsg, err := json.Marshal(msg)
	if err != nil {
		fmt.Println("marshal messages error => ", err)
		return
	}
	fmt.Println("ConsumeMessage success => ", string(jmsg))
}




意见反馈

文档内容是否对您有帮助?

如您有其他疑问,您也可以通过在线客服来与我们联系探讨 在线客服

联系我们
回到顶部