12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667 |
- package broadcast
- import (
- "active/tools"
- "encoding/json"
- "fmt"
- "git.jiaxianghudong.com/webs/pkg/hall"
- "git.jiaxianghudong.com/webs/pkg/xkafka"
- "sync"
- )
- type HallConfig struct {
- HallConfig []*hall.Server `json:"hall_config" yaml:"hall_config"`
- HallKafka xkafka.Options `json:"hall_kafka" yaml:"hall_kafka"`
- }
- var (
- sendPool = sync.Pool{
- New: func() interface{} { return new(MsgFormat) },
- }
- hallSend chan *MsgFormat
- )
// MsgFormat is a single queued broadcast request.
type MsgFormat struct {
	// NoticeList holds the user IDs the broadcast is addressed to. It is
	// excluded from this struct's own JSON encoding.
	NoticeList []uint32 `json:"-"`

	// Number is the message payload, encoded under the JSON key "number".
	Number string `json:"number"`
}
- func SendBroadcast(msg string, userid uint32) {
- msgFormat := sendPool.Get().(*MsgFormat)
- msgFormat.NoticeList = []uint32{userid}
- msgFormat.Number = msg
- hallSend <- msgFormat
- }
- func LoopSend() {
- config := HallConfig{}
- hallSend = make(chan *MsgFormat)
- tools.ReloadConfig("./active.yaml", &config)
- producer := xkafka.Producer{}
- err := producer.Init(config.HallKafka)
- if err != nil {
- fmt.Println("没有配置", err)
- return
- }
- for {
- select {
- case msg := <-hallSend:
- list := []int64{}
- for i := range msg.NoticeList {
- list = append(list, int64(msg.NoticeList[i]))
- }
- message := xkafka.Message{
- Msg: msg.Number,
- NoticeList: list,
- T: "broadcast",
- SubT: "number",
- }
- bytes, _ := json.Marshal(message)
- producer.SendMessage(string(bytes))
- msg.NoticeList = make([]uint32, 0)
- sendPool.Put(msg)
- }
- }
- }
|