package wel import ( "active/tools" "encoding/json" "fmt" "git.jiaxianghudong.com/webs/pkg/xkafka" ) type WelYaml struct { WelKafka xkafka.Options `json:"wel_kafka" yaml:"wel_kafka"` } var ( hallSend chan map[string]interface{} ) func SendWel(msg map[string]interface{}) { hallSend <- msg } func LoopSend() { config := WelYaml{} hallSend = make(chan map[string]interface{}, 1000) tools.ReloadConfig("./active.yaml", &config) producer := xkafka.Producer{} err := producer.Init(config.WelKafka) fmt.Printf("kafka %+v \n", config) if err != nil { fmt.Println("kafka 没有配置", err) return } for { select { case msg := <-hallSend: bytes, _ := json.Marshal(msg) fmt.Println("debugs kafka", string(bytes)) producer.SendMessage(string(bytes)) } } }