From 07ec247b53fc52b35fc4c02a41214b4fad93b693 Mon Sep 17 00:00:00 2001 From: "pakintada@gmail.com" Date: Fri, 6 Sep 2024 14:44:44 +0700 Subject: [PATCH] feat(upgrade): :sparkles: WIP auto upgrade pipeline Add upgrade listener to server --- server/data/redis.go | 4 ++++ server/server.go | 41 ++++++++++++++++++++++++++++++++++++----- 2 files changed, 40 insertions(+), 5 deletions(-) diff --git a/server/data/redis.go b/server/data/redis.go index 072475f..fb20eb6 100644 --- a/server/data/redis.go +++ b/server/data/redis.go @@ -209,3 +209,7 @@ func (r *RedisCli) Add(key string, value interface{}) error { return err.Err() } + +func (r *RedisCli) Subscribe(topic string) *redis.PubSub { + return r.Client.Subscribe(context.Background(), topic) +} diff --git a/server/server.go b/server/server.go index 252ceeb..b25c029 100644 --- a/server/server.go +++ b/server/server.go @@ -22,13 +22,14 @@ import ( "strings" "github.com/jmoiron/sqlx" + "github.com/redis/go-redis/v9" "github.com/go-chi/chi/v5" "github.com/go-chi/cors" "go.uber.org/zap" ) -const VERSION = "1.0.1b" +const VERSION = "1.0.31" type Server struct { server *http.Server @@ -40,12 +41,19 @@ type Server struct { taoLogger *logger.TaoLogger } +var update_noti *redis.PubSub + func NewServer(cfg *config.ServerConfig, oauthService oauth.OAuthService) *Server { taoLogger := logger.NewTaoLogger(cfg) taoLogger.Log = taoLogger.Log.Named("Server") redisClient := data.NewRedisClient("redis:6379", "") + update_noti = redisClient.Subscribe("updater.noti") + _, err := update_noti.Receive(context.Background()) + if err != nil { + taoLogger.Log.Error("Failed to subscribe to updater.noti", zap.Error(err)) + } return &Server{ server: &http.Server{Addr: fmt.Sprintf(":%d", cfg.ServerPort)}, @@ -66,6 +74,7 @@ func (s *Server) Run() error { // log.Printf("Server running on %s", s.server.Addr) s.taoLogger.Log.Info("Server running", zap.String("addr", s.server.Addr)) s.taoLogger.Log.Info("Version", zap.String("version", VERSION)) + go s.UpgradeListener(context.Background()) defer func(Log *zap.Logger) { err := Log.Sync() @@ -199,14 +208,36 @@ func (s *Server) createHandler() { }) // display all routes [DEBUG] - chi.Walk(r, func(method string, route string, handler http.Handler, middlewares ...func(http.Handler) http.Handler) error { - //fmt.Println(method, " ---> ", route) - return nil - }) + // chi.Walk(r, func(method string, route string, handler http.Handler, middlewares ...func(http.Handler) http.Handler) error { + // //fmt.Println(method, " ---> ", route) + // return nil + // }) s.server.Handler = r } +func (s *Server) UpgradeListener(ctx context.Context) { + if update_noti != nil { + s.taoLogger.Log.Debug("Subscribed to updater.noti") + ch := update_noti.Channel() + + for msg := range ch { + switch msg.Channel { + case "updater.noti": + s.taoLogger.Log.Info("Received upgrade notification") + // s.server.Shutdown(ctx) + payload := msg.Payload + s.taoLogger.Log.Debug("Received payload", zap.Any("payload", payload)) + if payload == "new_version" { + s.taoLogger.Log.Info("New version available, shutting down server") + s.server.Shutdown(ctx) + } + } + } + } + +} + func (s *Server) Shutdown(ctx context.Context) error { return s.server.Shutdown(ctx) }