Status of all nodes in goraft

Question

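I can offer a couple of possible solutions:

  1. The node can announce "I am the new leader" by sending a network request to the other nodes once it has become the new master. Because raftServer is an unexported field of the server package, and ListenAndServe blocks until the HTTP server stops, this code has to live in server.go rather than after the ListenAndServe call in main.go:
// server.go already imports bytes, fmt, log, net/http and goraft/raft.

// announceIfLeader is a point-in-time check: call it after
// s.raftServer.Start(), or wherever leadership may have changed.
func (s *Server) announceIfLeader() {
    if s.raftServer.State() == raft.Leader {
        sendLeaderMessage()
    }
}

func sendLeaderMessage() {
    // Create a JSON payload for the message
    payload := []byte(`{"message": "I am the new LEADER"}`)

    // Send a POST request to the other nodes. The ports come from the
    // question; the sender's own port should be left out of this list.
    nodes := []string{"2002", "2003", "2004"}
    for _, node := range nodes {
        url := fmt.Sprintf("http://localhost:%s/leader", node)
        resp, err := http.Post(url, "application/json", bytes.NewReader(payload))
        if err != nil {
            log.Printf("Failed to send leader message to node %s: %v", node, err)
            continue
        }
        resp.Body.Close()
    }
}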

Then, in each node's server code, you can add a handler for the /leader route to receive and process the message from the new leader.
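A receiving handler for the /leader route could look roughly like the sketch below. It uses only the standard library so that it runs on its own. The names leaderNotice, leaderHandler and postNotice are hypothetical and are not part of raftd, and the payload shape is assumed to match the JSON sent by sendLeaderMessage.

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"net/http/httptest"
	"strings"
)

// leaderNotice mirrors the assumed JSON payload {"message": "..."}.
type leaderNotice struct {
	Message string `json:"message"`
}

// leaderHandler accepts POST /leader, decodes the notice and logs it.
func leaderHandler(w http.ResponseWriter, req *http.Request) {
	if req.Method != http.MethodPost {
		http.Error(w, "POST required", http.StatusMethodNotAllowed)
		return
	}
	var n leaderNotice
	if err := json.NewDecoder(req.Body).Decode(&n); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	log.Printf("leader notice from %s: %s", req.RemoteAddr, n.Message)
	w.WriteHeader(http.StatusNoContent)
}

// postNotice sends one request to the handler in-process (no network)
// and returns the HTTP status code the handler produced.
func postNotice(body string) int {
	req := httptest.NewRequest(http.MethodPost, "/leader", strings.NewReader(body))
	rec := httptest.NewRecorder()
	leaderHandler(rec, req)
	return rec.Code
}

func main() {
	fmt.Println(postNotice(`{"message": "I am the new LEADER"}`)) // valid notice
	fmt.Println(postNotice(`not json`))                           // malformed body
}
```

In the raftd server from the question, a method with the same body would presumably be registered next to the existing routes, for example with s.router.HandleFunc("/leader", s.leaderHandler).Methods("POST").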

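  2. Another approach is to use the event listener mechanism provided by the Raft library. Since raftServer is only reachable inside the server package, register the listener in ListenAndServe in server.go, right after the Raft server has been created:
func (s *Server) ListenAndServe(leader string) error {
    // ... s.raftServer has been created with raft.NewServer ...

    // Register a listener for state changes before starting the Raft server
    s.raftServer.AddEventListener(raft.StateChangeEventType, func(e raft.Event) {
        // Value() holds the new state, PrevValue() the previous one
        if state, ok := e.Value().(string); ok && state == raft.Leader {
            // Run in a goroutine so the HTTP calls do not block Raft
            go sendLeaderMessage()
        }
    })

    s.raftServer.Start()
    // ...
}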

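This way, the listener fires when the node becomes the new master, and you can call sendLeaderMessage from it to send the message.

Note that these are only some of the possible solutions. Which one fits depends on your requirements and code structure, so you may need to adjust them to your situation.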

I have a cluster of 4 nodes: 2001, 2002, 2003 and 2004.
They are bound together using goraft.
Suppose 2001 is the master server.
When it fails, another node becomes the master.
What I want is for the node that becomes the new master to send a message saying "I am the new LEADER".
How can I achieve that?
I am using goraft with the raftd implementation.
The source code is attached below.

main.go - for the client

package main

import (
	"flag"
	"fmt"
	"github.com/goraft/raft"
	"github.com/goraft/raftd/command"
	"github.com/goraft/raftd/server"
	"log"
	"math/rand"
	"os"
	"time"
	"strconv"
)

var verbose bool
var trace bool
var debug bool
var host string
var port int
var join string

func init() {
	flag.Parse()
	flag.BoolVar(&verbose, "v", false, "verbose logging")
	flag.BoolVar(&trace, "trace", false, "Raft trace debugging")
	flag.BoolVar(&debug, "debug", false, "Raft debugging")
	flag.StringVar(&host, "h", "localhost", "hostname")
	p,_:=strconv.Atoi(flag.Arg(1))
	flag.IntVar(&port, "p", p, "port")
	flag.StringVar(&join, "join", "", "host:port of leader to join")
	flag.Usage = func() {
		fmt.Fprintf(os.Stderr, "Usage: %s [arguments] <data-path> \n", os.Args[0])
		flag.PrintDefaults()
	}
}

func main() {
	log.SetFlags(0)
	flag.Parse()
	if verbose {
		log.Print("Verbose logging enabled.")
	}
	if trace {
		raft.SetLogLevel(raft.Trace)
		log.Print("Raft trace debugging enabled.")
	} else if debug {
		raft.SetLogLevel(raft.Debug)
		log.Print("Raft debugging enabled.")
	}

	rand.Seed(time.Now().UnixNano())
	
	// Setup commands.
	raft.RegisterCommand(&command.WriteCommand{})

	// Set the data directory.
	if flag.NArg() == 0 {
		flag.Usage()
		log.Fatal("Data path argument required")
	}
	path := flag.Arg(0)
	if err := os.MkdirAll(path, 0744); err != nil {
		log.Fatalf("Unable to create path: %v", err)
	}

	log.SetFlags(log.LstdFlags)
	s := server.New(path, host, port)
	log.Fatal(s.ListenAndServe("localhost:2001"))
	fmt.Println("I am changing my status");
}

main.go - for the server, i.e. 2001

package main

import (
	"flag"
	"fmt"
	"github.com/goraft/raft"
	"github.com/goraft/raftd/command"
	"github.com/goraft/raftd/server"
	"log"
	"math/rand"
	"os"
	"time"
	"strconv"
)

var verbose bool
var trace bool
var debug bool
var host string
var port int
var join string

func init() {
	flag.Parse()
	flag.BoolVar(&verbose, "v", false, "verbose logging")
	flag.BoolVar(&trace, "trace", false, "Raft trace debugging")
	flag.BoolVar(&debug, "debug", false, "Raft debugging")
	flag.StringVar(&host, "h", "localhost", "hostname")
	p,_:=strconv.Atoi(flag.Arg(1))
	flag.IntVar(&port, "p", p, "port")
	flag.StringVar(&join, "join", "", "host:port of leader to join")
	flag.Usage = func() {
		fmt.Fprintf(os.Stderr, "Usage: %s [arguments] <data-path> \n", os.Args[0])
		flag.PrintDefaults()
	}
}

func main() {
	log.SetFlags(0)
	flag.Parse()
	if verbose {
		log.Print("Verbose logging enabled.")
	}
	if trace {
		raft.SetLogLevel(raft.Trace)
		log.Print("Raft trace debugging enabled.")
	} else if debug {
		raft.SetLogLevel(raft.Debug)
		log.Print("Raft debugging enabled.")
	}

	rand.Seed(time.Now().UnixNano())
	
	// Setup commands.
	raft.RegisterCommand(&command.WriteCommand{})

	// Set the data directory.
	if flag.NArg() == 0 {
		flag.Usage()
		log.Fatal("Data path argument required")
	}
	path := flag.Arg(0)
	if err := os.MkdirAll(path, 0744); err != nil {
		log.Fatalf("Unable to create path: %v", err)
	}

	log.SetFlags(log.LstdFlags)
	s := server.New(path, host, port)
	log.Fatal(s.ListenAndServe(join))
}

Common server.go code

package server

import (
	"bytes"
	"encoding/json"
	"fmt"
	"github.com/goraft/raft"
	"github.com/goraft/raftd/command"
	"github.com/goraft/raftd/db"
	"github.com/gorilla/mux"
	"io/ioutil"
	"log"
	"math/rand"
	"net/http"
	"path/filepath"
	"sync"
	"time"
)

// The raftd server is a combination of the Raft server and an HTTP
// server which acts as the transport.
type Server struct {
	name       string
	host       string
	port       int
	path       string
	router     *mux.Router
	raftServer raft.Server
	httpServer *http.Server
	db         *db.DB
	mutex      sync.RWMutex
}

// Creates a new server.
func New(path string, host string, port int) *Server {
	s := &Server{
		host:   host,
		port:   port,
		path:   path,
		db:     db.New(),
		router: mux.NewRouter(),
	}

	// Read existing name or generate a new one.
	if b, err := ioutil.ReadFile(filepath.Join(path, "name")); err == nil {
		s.name = string(b)
	} else {
		s.name = fmt.Sprintf("%07x", rand.Int())[0:7]
		if err = ioutil.WriteFile(filepath.Join(path, "name"), []byte(s.name), 0644); err != nil {
			panic(err)
		}
	}

	return s
}

// Returns the connection string.
func (s *Server) connectionString() string {
	return fmt.Sprintf("http://%s:%d", s.host, s.port)
}

// Starts the server.
func (s *Server) ListenAndServe(leader string) error {
	var err error

	log.Printf("Initializing Raft Server: %s", s.path)

	// Initialize and start Raft server.
	transporter := raft.NewHTTPTransporter("/raft", 200*time.Millisecond)
	s.raftServer, err = raft.NewServer(s.name, s.path, transporter, nil, s.db, "")
	if err != nil {
		log.Fatal(err)
	}
	transporter.Install(s.raftServer, s)
	s.raftServer.Start()

	if leader != "" {
		// Join to leader if specified.

		log.Println("Attempting to join leader:", leader)

		if !s.raftServer.IsLogEmpty() {
			log.Fatal("Cannot join with an existing log")
		}
		if err := s.Join(leader); err != nil {
			log.Fatal(err)
		}

	} else if s.raftServer.IsLogEmpty() {
		// Initialize the server by joining itself.

		log.Println("Initializing new cluster")

		_, err := s.raftServer.Do(&raft.DefaultJoinCommand{
			Name:             s.raftServer.Name(),
			ConnectionString: s.connectionString(),
		})
		if err != nil {
			log.Fatal(err)
		}

	} else {
		log.Println("Recovered from log")
	}

	log.Println("Initializing HTTP server")

	// Initialize and start HTTP server.
	s.httpServer = &http.Server{
		Addr:    fmt.Sprintf(":%d", s.port),
		Handler: s.router,
	}

	s.router.HandleFunc("/db/{key}", s.readHandler).Methods("GET")
	s.router.HandleFunc("/db/{key}", s.writeHandler).Methods("POST")
	s.router.HandleFunc("/join", s.joinHandler).Methods("POST")

	log.Println("Listening at:", s.connectionString())

	return s.httpServer.ListenAndServe()
}

// This is a hack around Gorilla mux not providing the correct net/http
// HandleFunc() interface.
func (s *Server) HandleFunc(pattern string, handler func(http.ResponseWriter, *http.Request)) {
	s.router.HandleFunc(pattern, handler)
}

// Joins to the leader of an existing cluster.
func (s *Server) Join(leader string) error {
	command := &raft.DefaultJoinCommand{
		Name:     s.raftServer.Name(),
		ConnectionString: s.connectionString(),
	}

	var b bytes.Buffer
	json.NewEncoder(&b).Encode(command)
	resp, err := http.Post(fmt.Sprintf("http://%s/join", leader), "application/json", &b)
	if err != nil {
		return err
	}
	resp.Body.Close()

	return nil
}

func (s *Server) joinHandler(w http.ResponseWriter, req *http.Request) {
	command := &raft.DefaultJoinCommand{}

	if err := json.NewDecoder(req.Body).Decode(&command); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	if _, err := s.raftServer.Do(command); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
}

func (s *Server) readHandler(w http.ResponseWriter, req *http.Request) {
	vars := mux.Vars(req)
	value := s.db.Get(vars["key"])
	w.Write([]byte(value))
}

func (s *Server) writeHandler(w http.ResponseWriter, req *http.Request) {
	vars := mux.Vars(req)

	// Read the value from the POST body.
	b, err := ioutil.ReadAll(req.Body)
	if err != nil {
		w.WriteHeader(http.StatusBadRequest)
		return
	}
	value := string(b)

	// Execute the command against the Raft server.
	_, err = s.raftServer.Do(command.NewWriteCommand(vars["key"], value))
	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
	}
}

Please give some solutions.

Answer 1

Score: 1

I did it.

I just inserted a new line into the goraft library code where leader selection happens.

To do the same, open goraft's server.go file and make the following change.

Original server.go, lines 287-309

// Sets the state of the server.
func (s *server) setState(state string) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	// Temporarily store previous values.
	prevState := s.state
	prevLeader := s.leader

	// Update state and leader.
	s.state = state
	if state == Leader {
		s.leader = s.Name()
		s.syncedPeer = make(map[string]bool)
	}

	// Dispatch state and leader change events.
	s.DispatchEvent(newEvent(StateChangeEventType, s.state, prevState))

	if prevLeader != s.leader {
		s.DispatchEvent(newEvent(LeaderChangeEventType, s.leader, prevLeader))
	}
}

Edited server.go

// Sets the state of the server.
func (s *server) setState(state string) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	// Temporarily store previous values.
	prevState := s.state
	prevLeader := s.leader

	// Update state and leader.
	s.state = state
	if state == Leader {
		s.leader = s.Name()
		s.syncedPeer = make(map[string]bool)
	}

	// Dispatch state and leader change events.
	s.DispatchEvent(newEvent(StateChangeEventType, s.state, prevState))

	if prevLeader != s.leader {
		fmt.Println("I am the Leader..!!  ",s.connectionString,"   ",s.path)
		s.DispatchEvent(newEvent(LeaderChangeEventType, s.leader, prevLeader))
	}
}

With this change, the node that becomes the active master prints its own connection string and storage path on its console.
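The setState function shown above already dispatches a state-change event each time the state changes, so the same announcement could in principle be made from a listener instead of from a patched copy of the library. The sketch below is a standard-library model of that dispatch pattern and not goraft's actual code: node, event, listener, addListener and run are hypothetical names chosen for illustration.

```go
package main

import "fmt"

const (
	follower  = "follower"
	candidate = "candidate"
	leader    = "leader"
)

// event carries the new and previous state, similar in spirit to the
// value/previous-value pair passed to newEvent in setState.
type event struct {
	value string
	prev  string
}

// listener is a callback invoked on every state change.
type listener func(event)

// node is a toy stand-in for a Raft server that only tracks its state.
type node struct {
	name      string
	state     string
	listeners []listener
}

// addListener registers a callback for state changes.
func (n *node) addListener(l listener) {
	n.listeners = append(n.listeners, l)
}

// setState updates the state and notifies every registered listener.
func (n *node) setState(state string) {
	prev := n.state
	n.state = state
	for _, l := range n.listeners {
		l(event{value: state, prev: prev})
	}
}

// run drives one node through an election and collects the announcements
// made by a listener that reacts only to the transition into leader.
func run() []string {
	var out []string
	n := &node{name: "2002", state: follower}
	n.addListener(func(e event) {
		if e.value == leader && e.prev != leader {
			out = append(out, fmt.Sprintf("node %s is the new leader", n.name))
		}
	})
	n.setState(candidate)
	n.setState(leader)
	n.setState(leader) // repeated state, no second announcement
	return out
}

func main() {
	for _, line := range run() {
		fmt.Println(line)
	}
}
```

Keeping the reaction in a listener leaves the library source untouched, which makes upgrading the dependency easier than carrying a local patch.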

huangapple
  • Posted on February 17, 2016, 17:19:35
  • When reposting, please keep the link to this article: https://go.coder-hub.com/35452424.html