xhall.go 1.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. package broadcast
  2. import (
  3. "active/tools"
  4. "encoding/json"
  5. "fmt"
  6. "git.jiaxianghudong.com/webs/pkg/hall"
  7. "git.jiaxianghudong.com/webs/pkg/xkafka"
  8. "sync"
  9. )
  10. type HallConfig struct {
  11. HallConfig []*hall.Server `json:"hall_config" yaml:"hall_config"`
  12. HallKafka xkafka.Options `json:"hall_kafka" yaml:"hall_kafka"`
  13. }
  14. var (
  15. sendPool = sync.Pool{
  16. New: func() interface{} { return new(MsgFormat) },
  17. }
  18. hallSend chan *MsgFormat
  19. )
  20. type MsgFormat struct {
  21. NoticeList []uint32 `json:"-"`
  22. Number string `json:"number"`
  23. }
  24. func SendBroadcast(msg string, userid uint32) {
  25. msgFormat := sendPool.Get().(*MsgFormat)
  26. msgFormat.NoticeList = []uint32{userid}
  27. msgFormat.Number = msg
  28. hallSend <- msgFormat
  29. }
  30. func LoopSend() {
  31. config := HallConfig{}
  32. hallSend = make(chan *MsgFormat)
  33. tools.ReloadConfig("./active.yaml", &config)
  34. producer := xkafka.Producer{}
  35. err := producer.Init(config.HallKafka)
  36. if err != nil {
  37. fmt.Println("没有配置", err)
  38. return
  39. }
  40. for {
  41. select {
  42. case msg := <-hallSend:
  43. list := []int64{}
  44. for i := range msg.NoticeList {
  45. list = append(list, int64(msg.NoticeList[i]))
  46. }
  47. message := xkafka.Message{
  48. Msg: msg.Number,
  49. NoticeList: list,
  50. T: "broadcast",
  51. SubT: "number",
  52. }
  53. bytes, _ := json.Marshal(message)
  54. producer.SendMessage(string(bytes))
  55. msg.NoticeList = make([]uint32, 0)
  56. sendPool.Put(msg)
  57. }
  58. }
  59. }