gateway-m2/main.go
2025-11-10 15:16:38 +07:00

268 lines
5.8 KiB
Go

package main
import (
"context"
"encoding/json"
"fmt"
"log"
"net"
"net/http"
"net/http/httputil"
"net/url"
"os"
"strings"
"sync"
"time"
pb "forth.rd/tbm-gateway/registry"
"github.com/go-chi/chi/v5"
"github.com/golang-jwt/jwt/v5"
"github.com/joho/godotenv"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// var jwtSecret = []byte(os.Getenv("JWT_SECRET"))
// var apiKey = os.Getenv("API_KEY")
// Process-wide settings. main assigns them from environment variables before
// any server starts handling traffic.
var (
	// jwtSecret is the HMAC secret verifyJWT supplies when validating bearer
	// tokens; apiKey is the value authMiddleware accepts in the X-API-Key
	// header.
	jwtSecret, apiKey string

	// registrySecret is the token the gRPC Register call must present.
	registrySecret string

	// devMode is set by main to true when REGISTRY_SECRET is empty.
	devMode bool
)
// Instance is one registered backend address of a service.
type Instance struct {
	URL      string    // address the instance was registered under
	LastSeen time.Time // time of the most recent Register or Heartbeat
	Healthy  bool      // set to true by Register and Heartbeat
}

// Registry is an in-memory table of service instances keyed by service name,
// together with a per-service round-robin counter. All access to the maps
// goes through mu.
type Registry struct {
	mu       sync.RWMutex
	services map[string][]*Instance // service name -> registered instances
	rr       map[string]int         // service name -> round-robin counter
}

// NewRegistry returns an empty Registry that is ready for use.
func NewRegistry() *Registry {
	return &Registry{
		services: make(map[string][]*Instance),
		rr:       make(map[string]int),
	}
}

// touch refreshes the instance of service name registered under url, marking
// it healthy and stamping LastSeen with the current time. It reports whether
// such an instance exists. The caller must hold r.mu for writing.
func (r *Registry) touch(name, url string) bool {
	for _, inst := range r.services[name] {
		if inst.URL == url {
			inst.LastSeen = time.Now()
			inst.Healthy = true
			return true
		}
	}
	return false
}

// Register adds url as an instance of service name. Registering an address
// that is already present only refreshes it, so the call is idempotent.
func (r *Registry) Register(name, url string) {
	r.mu.Lock()
	defer r.mu.Unlock()

	if r.touch(name, url) {
		return
	}
	r.services[name] = append(r.services[name], &Instance{
		URL:      url,
		LastSeen: time.Now(),
		Healthy:  true,
	})
}

// Heartbeat refreshes an already registered instance. Heartbeats for an
// unknown name/url pair are ignored; they never create an instance.
func (r *Registry) Heartbeat(name, url string) {
	r.mu.Lock()
	defer r.mu.Unlock()

	r.touch(name, url)
}

// Deregister removes the instance registered under url from service name and
// keeps every other instance. Unknown services are left untouched, so the
// call cannot create empty map entries.
func (r *Registry) Deregister(name, url string) {
	r.mu.Lock()
	defer r.mu.Unlock()

	insts, ok := r.services[name]
	if !ok {
		return
	}

	// Filter in place, keeping everything that is NOT the target address.
	kept := insts[:0]
	for _, inst := range insts {
		if inst.URL != url {
			kept = append(kept, inst)
		}
	}
	// Drop the now-unused tail pointers so removed instances can be collected.
	for i := len(kept); i < len(insts); i++ {
		insts[i] = nil
	}
	r.services[name] = kept
}

// NextHealty returns the next healthy instance of service name in round-robin
// order, or nil when the service has no instances or none is healthy. It
// takes the write lock because it advances the round-robin counter.
//
// The name keeps its original spelling ("Healty") because callers use it.
func (r *Registry) NextHealty(name string) *Instance {
	r.mu.Lock()
	defer r.mu.Unlock()

	insts := r.services[name]
	// At most one full lap: every instance is inspected once per call.
	for range insts {
		idx := r.rr[name] % len(insts)
		r.rr[name]++
		if inst := insts[idx]; inst.Healthy {
			return inst
		}
	}
	return nil
}
// registryServer exposes a Registry over gRPC; an instance is handed to
// pb.RegisterRegistryServer in startGRPCServer.
type registryServer struct {
// Embedded generated type; presumably supplies default implementations for
// RPCs this type does not define itself (generated code not visible here).
pb.UnimplementedRegistryServer
// r is the registry that every RPC handler reads from or mutates.
r *Registry
}
// Register is the gRPC entry point for announcing a service instance. The
// request token has to equal registrySecret; otherwise the call is rejected
// with codes.PermissionDenied. Accepted requests are stored in the registry
// and acknowledged with Ok set to true.
func (s *registryServer) Register(ctx context.Context, info *pb.ServiceInfo) (*pb.RegisterResponse, error) {
	authorized := info.Token == registrySecret
	if !authorized {
		return nil, status.Error(codes.PermissionDenied, "invalid token")
	}

	name, addr := info.Name, info.Url
	s.r.Register(name, addr)

	ack := &pb.RegisterResponse{Ok: true}
	return ack, nil
}
// Heartbeat is the gRPC entry point for keep-alive messages. It forwards the
// name/url pair to the registry and always acknowledges with Ok set to true,
// whether or not a matching instance exists. No token is checked here.
func (s *registryServer) Heartbeat(ctx context.Context, hb *pb.ServiceHeartbeat) (*pb.HeartbeatResponse, error) {
	name, addr := hb.Name, hb.Url
	s.r.Heartbeat(name, addr)

	ack := &pb.HeartbeatResponse{Ok: true}
	return ack, nil
}
// Deregister is the gRPC entry point for withdrawing a service instance. It
// forwards the name/url pair to the registry and always acknowledges with Ok
// set to true. No token is checked here.
func (s *registryServer) Deregister(ctx context.Context, id *pb.ServiceID) (*pb.DeregisterResponse, error) {
	name, addr := id.Name, id.Url
	s.r.Deregister(name, addr)

	ack := &pb.DeregisterResponse{Ok: true}
	return ack, nil
}
// GetServiceList returns one ServiceInfo (name and URL) for every instance
// currently held in the registry. The registry is read under its read lock;
// the order of the entries follows map iteration and is therefore unspecified.
func (s *registryServer) GetServiceList(ctx context.Context, _ *pb.Empty) (*pb.ServiceList, error) {
	reg := s.r
	reg.mu.RLock()
	defer reg.mu.RUnlock()

	list := new(pb.ServiceList)
	for svcName := range reg.services {
		for _, inst := range reg.services[svcName] {
			entry := &pb.ServiceInfo{Name: svcName, Url: inst.URL}
			list.Services = append(list.Services, entry)
		}
	}
	return list, nil
}
// startGRPCServer binds TCP port 50051, registers the registry service backed
// by reg, and serves gRPC on a background goroutine. It returns as soon as
// the listener is bound; failing to bind terminates the process.
func startGRPCServer(reg *Registry) {
	lis, err := net.Listen("tcp", ":50051")
	if err != nil {
		log.Fatal(err)
	}

	s := grpc.NewServer()
	pb.RegisterRegistryServer(s, &registryServer{r: reg})
	log.Println("gRPC Registry listening on :50051")

	go func() {
		// Serve blocks until the server stops; surface the reason instead of
		// letting the registry disappear silently.
		if err := s.Serve(lis); err != nil {
			log.Printf("gRPC Registry stopped serving: %v", err)
		}
	}()
}
// verifyJWT parses tokenString and checks its signature against jwtSecret.
// Only HMAC-signed tokens are accepted. It returns nil for a valid token and
// a non-nil error otherwise; parse failures are wrapped so the cause is kept.
func verifyJWT(tokenString string) error {
	token, err := jwt.Parse(tokenString, func(t *jwt.Token) (interface{}, error) {
		if _, ok := t.Method.(*jwt.SigningMethodHMAC); !ok {
			return nil, fmt.Errorf("unexpected signing method")
		}
		// Never verify against an empty key: such a signature is trivially
		// forgeable.
		if jwtSecret == "" {
			return nil, fmt.Errorf("jwt secret is not configured")
		}
		// The HMAC signing methods only accept the key as []byte; handing
		// them a string makes every verification fail with an invalid key
		// type error.
		return []byte(jwtSecret), nil
	})
	if err != nil {
		return fmt.Errorf("invalid token: %w", err)
	}
	if !token.Valid {
		return fmt.Errorf("invalid token")
	}
	return nil
}
// authMiddleware wraps next with the gateway's authentication rules. A
// request is passed through when one of the following holds:
//
//   - its path is exactly one of the public paths listed below;
//   - apiKey is non-empty and the X-API-Key header equals it;
//   - the Authorization header carries a "Bearer " token that verifyJWT accepts.
//
// Every other request is answered with 401 Unauthorized.
func authMiddleware(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// Public paths are matched exactly. A prefix match would also let
		// paths such as "/healthz-admin/..." skip authentication and reach
		// the catch-all proxy route.
		//
		// NOTE(review): the router in this file serves the registry dump at
		// "/_registry" (single underscore), so the "/__registry" entry does
		// not exempt that route — confirm which spelling is intended.
		switch r.URL.Path {
		case "/healthz", "/__registry":
			next.ServeHTTP(w, r)
			return
		}

		// NOTE(review): plain == is not a constant-time comparison; consider
		// crypto/subtle.ConstantTimeCompare for the API key.
		if apiKey != "" && r.Header.Get("X-API-Key") == apiKey {
			next.ServeHTTP(w, r)
			return
		}

		if authHeader := r.Header.Get("Authorization"); strings.HasPrefix(authHeader, "Bearer ") {
			if err := verifyJWT(strings.TrimPrefix(authHeader, "Bearer ")); err == nil {
				next.ServeHTTP(w, r)
				return
			}
		}

		http.Error(w, "Unauthorized", http.StatusUnauthorized)
	})
}
// main wires the gateway together: it loads configuration from the
// environment, starts the gRPC registry, runs a background sweep that evicts
// stale instances, and serves the authenticated HTTP reverse proxy on :8080.
func main() {
	// A missing .env file is not fatal: the variables may already be present
	// in the process environment and are validated right below.
	if err := godotenv.Load(); err != nil {
		log.Printf("godotenv: %v (continuing with process environment)", err)
	}

	jwtSecret = os.Getenv("JWT_SECRET")
	apiKey = os.Getenv("API_KEY")
	if jwtSecret == "" || apiKey == "" {
		// log.Fatal prints the reason and exits with status 1.
		log.Fatal("env value not ok: JWT_SECRET and API_KEY must both be set")
	}

	registrySecret = os.Getenv("REGISTRY_SECRET")
	devMode = registrySecret == ""

	reg := NewRegistry()
	startGRPCServer(reg)

	// Every 30 seconds drop instances that have not registered or sent a
	// heartbeat within the last 60 seconds. Runs for the process lifetime.
	go func() {
		ticker := time.NewTicker(30 * time.Second)
		defer ticker.Stop()
		for range ticker.C {
			reg.mu.Lock()
			for name, list := range reg.services {
				fresh := make([]*Instance, 0, len(list))
				for _, inst := range list {
					if time.Since(inst.LastSeen) < 60*time.Second {
						fresh = append(fresh, inst)
					}
				}
				reg.services[name] = fresh
			}
			reg.mu.Unlock()
		}
	}()

	r := chi.NewRouter()
	r.Use(authMiddleware)

	r.Get("/healthz", func(w http.ResponseWriter, _ *http.Request) {
		if _, err := w.Write([]byte("gateway ok")); err != nil {
			log.Printf("healthz: write response: %v", err)
		}
	})

	r.Get("/_registry", func(w http.ResponseWriter, _ *http.Request) {
		// Encode while holding the lock, write after releasing it, so a slow
		// client cannot block registry writers.
		reg.mu.RLock()
		body, err := json.Marshal(reg.services)
		reg.mu.RUnlock()
		if err != nil {
			log.Printf("registry dump: encode: %v", err)
			http.Error(w, "failed to encode registry", http.StatusInternalServerError)
			return
		}
		w.Header().Set("Content-Type", "application/json")
		if _, err := w.Write(append(body, '\n')); err != nil {
			log.Printf("registry dump: write response: %v", err)
		}
	})

	r.HandleFunc("/*", func(w http.ResponseWriter, req *http.Request) {
		// The first path segment names the target service; the remainder is
		// the path forwarded to the chosen instance.
		seg, rest, _ := strings.Cut(strings.TrimLeft(req.URL.Path, "/"), "/")

		inst := reg.NextHealty(seg)
		if inst == nil {
			http.Error(w, "no healthy instance", http.StatusServiceUnavailable)
			return
		}

		target, err := url.Parse(inst.URL)
		if err != nil {
			// A nil target would make the reverse proxy constructor panic.
			log.Printf("proxy: service %q has invalid URL %q: %v", seg, inst.URL, err)
			http.Error(w, "bad upstream address", http.StatusBadGateway)
			return
		}

		req.URL.Path = "/" + rest
		// RawPath still describes the unstripped path; clear it so the
		// forwarded URL is derived from the rewritten Path.
		req.URL.RawPath = ""
		httputil.NewSingleHostReverseProxy(target).ServeHTTP(w, req)
	})

	srv := &http.Server{
		Addr:    ":8080",
		Handler: r,
		// Bound the time a client may take to send request headers.
		ReadHeaderTimeout: 10 * time.Second,
	}
	log.Println("HTTP Gateway running on :8080")
	log.Fatal(srv.ListenAndServe())
}