package broadcast import ( "active/tools" "encoding/json" "fmt" "git.jiaxianghudong.com/webs/pkg/hall" "git.jiaxianghudong.com/webs/pkg/xkafka" "sync" ) type HallConfig struct { HallConfig []*hall.Server `json:"hall_config" yaml:"hall_config"` HallKafka xkafka.Options `json:"hall_kafka" yaml:"hall_kafka"` } var ( sendPool = sync.Pool{ New: func() interface{} { return new(MsgFormat) }, } hallSend chan *MsgFormat ) type MsgFormat struct { NoticeList []uint32 `json:"-"` Number string `json:"number"` } func SendBroadcast(msg string, userid uint32) { msgFormat := sendPool.Get().(*MsgFormat) msgFormat.NoticeList = []uint32{userid} msgFormat.Number = msg hallSend <- msgFormat } func LoopSend() { config := HallConfig{} hallSend = make(chan *MsgFormat) tools.ReloadConfig("./active.yaml", &config) producer := xkafka.Producer{} err := producer.Init(config.HallKafka) if err != nil { fmt.Println("没有配置", err) return } for { select { case msg := <-hallSend: list := []int64{} for i := range msg.NoticeList { list = append(list, int64(msg.NoticeList[i])) } message := xkafka.Message{ Msg: msg.Number, NoticeList: list, T: "broadcast", SubT: "number", } bytes, _ := json.Marshal(message) producer.SendMessage(string(bytes)) msg.NoticeList = make([]uint32, 0) sendPool.Put(msg) } } }