loop.go 788 B

123456789101112131415161718192021222324252627282930313233343536373839404142
  1. package wel
  2. import (
  3. "active/tools"
  4. "encoding/json"
  5. "fmt"
  6. "git.jiaxianghudong.com/webs/pkg/xkafka"
  7. )
  8. type WelYaml struct {
  9. WelKafka xkafka.Options `json:"wel_kafka" yaml:"wel_kafka"`
  10. }
  // hallSend is the queue that SendWel writes to and LoopSend reads from.
  //
  // It is allocated when the package is initialized so that it is never nil:
  // a send on a nil channel blocks forever, which would hang any SendWel call
  // made before the queue exists.
  var hallSend = make(chan map[string]interface{}, 1000)
  14. func SendWel(msg map[string]interface{}) {
  15. hallSend <- msg
  16. }
  17. func LoopSend() {
  18. config := WelYaml{}
  19. hallSend = make(chan map[string]interface{}, 1000)
  20. tools.ReloadConfig("./active.yaml", &config)
  21. producer := xkafka.Producer{}
  22. err := producer.Init(config.WelKafka)
  23. fmt.Printf("kafka %+v \n", config)
  24. if err != nil {
  25. fmt.Println("kafka 没有配置", err)
  26. return
  27. }
  28. for {
  29. select {
  30. case msg := <-hallSend:
  31. bytes, _ := json.Marshal(msg)
  32. fmt.Println("debugs kafka", string(bytes))
  33. producer.SendMessage(string(bytes))
  34. }
  35. }
  36. }