sub.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. package model
  2. import (
  3. "fmt"
  4. "math/rand"
  5. "sync/atomic"
  6. "time"
  7. "active/constant"
  8. "active/internal/model/scratchcard"
  9. "git.jiaxianghudong.com/go/logs"
  10. "git.jiaxianghudong.com/webs/pkg/rds"
  11. "github.com/go-redis/redis"
  12. )
  13. // GetAllActiceByType 活动数据
  14. func GetAllActiceByType(activeType string) ActiveSwitchAll {
  15. // 先读取缓存
  16. if data, ok := ActiveCache.Load("AllActive"); ok {
  17. if v, ok := data.(map[string]ActiveSwitchAll)[activeType]; ok {
  18. logs.Info("get AllActive by cahce!")
  19. return v
  20. }
  21. }
  22. var activeSwitchModel ActiveSwitchAll
  23. am, _ := activeSwitchModel.Get(activeType)
  24. if am.ID == "" {
  25. return am
  26. }
  27. c := map[string]ActiveSwitchAll{}
  28. c[activeType] = am
  29. ActiveCache.Store("AllActive", c)
  30. return am
  31. }
  32. // GetAllActice 数据缓存
  33. func GetAllActice() []ActiveSwitchAll {
  34. // 先读取缓存
  35. if data, ok := ActiveCacheArr.Load("AllActiveArr"); ok {
  36. return data.([]ActiveSwitchAll)
  37. }
  38. od, err := GetActiveSwitch()
  39. if err != nil {
  40. return od
  41. }
  42. logs.Info("Set AllActiveArr cahce success!")
  43. ActiveCacheArr.Store("AllActiveArr", od)
  44. return od
  45. }
  46. // SubRedisConfig 运行时配置订阅
  47. func SubRedisConfig() {
  48. r := rds.Register("redis8")
  49. _, err := r.Ping().Result()
  50. if err != nil {
  51. fmt.Println("配置订阅错误", err)
  52. }
  53. // 监听test频道
  54. pubSub := r.Subscribe(constant.SUBACTOPIC)
  55. // 订阅
  56. go func() {
  57. fmt.Println("配置订阅启动4")
  58. var receipt interface{}
  59. var err error
  60. for {
  61. receipt, err = pubSub.Receive()
  62. if err != nil {
  63. fmt.Println(err)
  64. }
  65. fmt.Println("配置订阅消息来了2")
  66. switch v := receipt.(type) {
  67. case *redis.Message: // 单个订阅subscribe
  68. fmt.Printf("%s: message: %s\n", v.Channel, v.Payload)
  69. switch v.Payload {
  70. case "AllActive":
  71. // 清除内容
  72. ActiveCache.Delete("AllActive")
  73. ActiveCacheArr.Delete("AllActiveArr")
  74. atomic.CompareAndSwapInt32(&rebateExpireAt, 1, 0)
  75. atomic.CompareAndSwapInt32(&invitebeanExpireAt, 1, 0)
  76. atomic.CompareAndSwapInt32(&scratchcard.ScratchCard, 1, 0)
  77. // 添加上传更新piggybank函数
  78. if constant.PiggyBank {
  79. sleep := rand.Int() % 1000
  80. time.Sleep(time.Duration(sleep) * time.Millisecond)
  81. val := rds.Redis.Get("piggybanksend_lock").Val()
  82. if val == "" {
  83. rds.Redis.Set("piggybanksend_lock", time.Now().Unix(), 30*time.Second)
  84. PubPiggyBankConf()
  85. }
  86. }
  87. fmt.Println("AllActive,内容订阅完毕")
  88. break
  89. default:
  90. }
  91. case error:
  92. fmt.Println("你好")
  93. return
  94. default:
  95. fmt.Println("pong....")
  96. }
  97. }
  98. }()
  99. }