162 lines
2.9 KiB
Go
162 lines
2.9 KiB
Go
package api
|
|
|
|
import (
|
|
"crypto/rand"
|
|
"encoding/binary"
|
|
"encoding/hex"
|
|
"encoding/json"
|
|
"fmt"
|
|
"sync"
|
|
"sync/atomic"
|
|
|
|
maelstrom "github.com/jepsen-io/maelstrom/demo/go"
|
|
)
|
|
|
|
func NewServer(node *maelstrom.Node) (*server, error) {
|
|
var prefix [2]byte
|
|
_, err := rand.Read(prefix[:])
|
|
if err != nil {
|
|
return nil, fmt.Errorf("reading random data: %w", err)
|
|
}
|
|
|
|
s := &server{
|
|
node: node,
|
|
prefix: prefix,
|
|
}
|
|
|
|
s.setup()
|
|
|
|
return s, nil
|
|
}
|
|
|
|
type server struct {
|
|
node *maelstrom.Node
|
|
|
|
topology map[string][]string
|
|
|
|
nextSeqId uint64
|
|
prefix [2]byte
|
|
|
|
storeMu sync.Mutex
|
|
store []int
|
|
}
|
|
|
|
func (s *server) Run() error {
|
|
return s.node.Run()
|
|
}
|
|
|
|
func (s *server) setup() {
|
|
s.node.Handle("generate", s.handleGenerate)
|
|
s.node.Handle("broadcast", genericHandler(s, s.handleBroadcast))
|
|
s.node.Handle("read", genericHandler(s, s.handleBroadcastRead))
|
|
s.node.Handle("topology", genericHandler(s, s.handleTopology))
|
|
}
|
|
|
|
type generateResponse struct {
|
|
maelstrom.MessageBody
|
|
ID string `json:"id"`
|
|
}
|
|
|
|
func (s *server) handleGenerate(msg maelstrom.Message) error {
|
|
outbody := [10]byte{
|
|
s.prefix[0],
|
|
s.prefix[1],
|
|
}
|
|
|
|
id := atomic.AddUint64(&s.nextSeqId, 1)
|
|
|
|
binary.BigEndian.PutUint64(outbody[2:], id)
|
|
out := generateResponse{
|
|
ID: hex.EncodeToString(outbody[:]),
|
|
}
|
|
|
|
out.Type = "generate_ok"
|
|
|
|
return s.node.Reply(msg, out)
|
|
}
|
|
|
|
// broadcastMessage is the decoded body of a "broadcast" request.
type broadcastMessage struct {
	// Message is the integer payload, read from the "message" key.
	Message int `json:"message"`
}
|
|
|
|
type broadcastResponse struct {
|
|
maelstrom.MessageBody
|
|
}
|
|
|
|
func (s *server) handleBroadcast(msg broadcastMessage) (broadcastResponse, error) {
|
|
value := msg.Message
|
|
|
|
s.storeMu.Lock()
|
|
defer s.storeMu.Unlock()
|
|
|
|
s.store = append(s.store, value)
|
|
|
|
response := broadcastResponse{
|
|
MessageBody: maelstrom.MessageBody{
|
|
Type: "broadcast_ok",
|
|
},
|
|
}
|
|
return response, nil
|
|
}
|
|
|
|
// broadcastReadMessage is the decoded body of a "read" request; no fields of
// the request are used.
type broadcastReadMessage struct{}
|
|
|
|
type broadcastReadResponse struct {
|
|
maelstrom.MessageBody
|
|
|
|
Messages []int `json:"messages"`
|
|
}
|
|
|
|
func (s *server) handleBroadcastRead(msg broadcastReadMessage) (broadcastReadResponse, error) {
|
|
s.storeMu.Lock()
|
|
defer s.storeMu.Unlock()
|
|
|
|
response := broadcastReadResponse{
|
|
MessageBody: maelstrom.MessageBody{
|
|
Type: "read_ok",
|
|
},
|
|
Messages: make([]int, len(s.store)),
|
|
}
|
|
|
|
copy(response.Messages, s.store)
|
|
|
|
return response, nil
|
|
}
|
|
|
|
// topologyMessage is the decoded body of a "topology" request.
type topologyMessage struct {
	// Topology is read from the "topology" key: a map from a string key to a
	// list of strings.
	Topology map[string][]string `json:"topology"`
}
|
|
|
|
type topologyResponse struct{
|
|
maelstrom.MessageBody
|
|
}
|
|
|
|
func (s *server) handleTopology(msg topologyMessage) (topologyResponse, error) {
|
|
s.topology = msg.Topology
|
|
|
|
response := topologyResponse{
|
|
MessageBody: maelstrom.MessageBody{
|
|
Type: "topology_ok",
|
|
},
|
|
}
|
|
|
|
return response, nil
|
|
}
|
|
|
|
func genericHandler[T, U any](s *server, h func(msg T) (U, error)) maelstrom.HandlerFunc {
|
|
return func(msg maelstrom.Message) error {
|
|
var body T
|
|
err := json.Unmarshal(msg.Body, &body)
|
|
if err != nil {
|
|
return fmt.Errorf("unmarshalling body: %w", err)
|
|
}
|
|
|
|
resp, err := h(body)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return s.node.Reply(msg, resp)
|
|
}
|
|
}
|