Files
flyio/internal/api/server.go

258 lines
5.1 KiB
Go

package api
import (
"context"
"crypto/rand"
"encoding/binary"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"sync"
"sync/atomic"
maelstrom "github.com/jepsen-io/maelstrom/demo/go"
)
// readHandler enumerates the implementations that can serve the "read"
// message. Start and End are sentinels bracketing the usable values.
type readHandler uint8

const (
	readHandlerStart = readHandler(iota) // lower sentinel, never a valid choice
	readHandlerBroadcast
	readHandlerCounter
	readHandlerEnd // upper sentinel, never a valid choice
)

const (
	// currentReadHandler selects which "read" implementation setup registers.
	currentReadHandler = readHandlerCounter

	// counterGlobalKey is the key/value-store key holding the shared counter.
	counterGlobalKey = "global_counter_kv"
)
// init checks at program start that currentReadHandler lies strictly between
// the readHandlerStart and readHandlerEnd sentinels, and panics otherwise.
func init() {
	switch {
	case currentReadHandler <= readHandlerStart:
		panic("read handler is too small")
	case currentReadHandler >= readHandlerEnd:
		panic("read handler is too big")
	}
}
// NewServer creates a server bound to node, gives it a random two-byte ID
// prefix read from crypto/rand, attaches a sequential key/value client and
// registers the message handlers on node.
//
// ctx is kept as the base context for the key/value calls made by handlers.
// An error is returned only when the random prefix cannot be read.
func NewServer(ctx context.Context, node *maelstrom.Node) (*server, error) {
	srv := &server{
		baseCtx: ctx,
		node:    node,
	}

	// Fill the prefix in place; nothing else is initialised if this fails.
	if _, err := rand.Read(srv.prefix[:]); err != nil {
		return nil, fmt.Errorf("reading random data: %w", err)
	}

	srv.kv = maelstrom.NewSeqKV(node)
	srv.setup()
	return srv, nil
}
// server holds the state shared by the Maelstrom message handlers.
type server struct {
	// nextSeqId is the per-process sequence number bumped with
	// atomic.AddUint64 in handleGenerate. It is deliberately the first
	// field: on 32-bit platforms sync/atomic only guarantees 64-bit
	// alignment for the first word of an allocated struct, and a misaligned
	// 64-bit atomic operation panics there.
	nextSeqId uint64

	// baseCtx is the context passed to NewServer; handlers use it for
	// key/value store calls.
	baseCtx context.Context

	// node is the Maelstrom node used to register handlers and send replies.
	node *maelstrom.Node

	// kv is the sequential key/value client created in NewServer.
	kv *maelstrom.KV

	// topology is the most recent value received in a "topology" message.
	topology map[string][]string

	// prefix is the random two-byte value prepended to every generated ID.
	prefix [2]byte

	// storeMu guards store.
	storeMu sync.Mutex

	// store accumulates the values received through "broadcast" messages.
	store []int
}
// Run delegates to the underlying Maelstrom node's Run method and returns
// whatever error it reports.
func (s *server) Run() error {
return s.node.Run()
}
// setup registers every message handler on the node. The implementation
// behind "read" is chosen by the compile-time constant currentReadHandler;
// if it matches neither known handler, no "read" handler is registered.
func (s *server) setup() {
	s.node.Handle("generate", s.handleGenerate)
	s.node.Handle("broadcast", genericHandler(s, s.handleBroadcast))
	s.node.Handle("topology", genericHandler(s, s.handleTopology))
	s.node.Handle("add", genericHandler(s, s.handleAdd))

	var read maelstrom.HandlerFunc
	switch currentReadHandler {
	case readHandlerBroadcast:
		read = genericHandler(s, s.handleBroadcastRead)
	case readHandlerCounter:
		read = genericHandler(s, s.handleCounterRead)
	}
	if read != nil {
		s.node.Handle("read", read)
	}
}
// generateResponse is the reply body sent by handleGenerate.
type generateResponse struct {
maelstrom.MessageBody
// ID is the hex-encoded identifier, serialised under the JSON key "id".
ID string `json:"id"`
}
// handleGenerate replies to a "generate" request with a new identifier.
//
// The identifier is 10 bytes, hex-encoded: the server's two random prefix
// bytes followed by the big-endian value of an atomically incremented
// sequence number.
func (s *server) handleGenerate(msg maelstrom.Message) error {
	seq := atomic.AddUint64(&s.nextSeqId, 1)

	var raw [10]byte
	copy(raw[:2], s.prefix[:])
	binary.BigEndian.PutUint64(raw[2:], seq)

	var reply generateResponse
	reply.Type = "generate_ok"
	reply.ID = hex.EncodeToString(raw[:])
	return s.node.Reply(msg, reply)
}
// broadcastMessage is the request body decoded for a "broadcast" message.
type broadcastMessage struct {
// Message is the integer payload, read from the JSON key "message".
Message int `json:"message"`
}
// broadcastResponse is the reply body returned by handleBroadcast; it carries
// only the embedded message body.
type broadcastResponse struct {
maelstrom.MessageBody
}
// handleBroadcast appends the received value to the in-memory store while
// holding storeMu, then acknowledges with a "broadcast_ok" body.
func (s *server) handleBroadcast(msg broadcastMessage) (broadcastResponse, error) {
	s.storeMu.Lock()
	s.store = append(s.store, msg.Message)
	s.storeMu.Unlock()

	var ack broadcastResponse
	ack.Type = "broadcast_ok"
	return ack, nil
}
// genericReadMessage is the (empty) request body used by both read handlers,
// handleBroadcastRead and handleCounterRead.
type genericReadMessage struct{}
// broadcastReadResponse is the reply body returned by handleBroadcastRead.
type broadcastReadResponse struct {
maelstrom.MessageBody
// Messages is a copy of the stored values, serialised under "messages".
Messages []int `json:"messages"`
}
// handleBroadcastRead returns a "read_ok" body containing a snapshot of every
// value stored so far. The snapshot is a fresh, non-nil slice copied under
// storeMu, so the reply does not alias the server's own store.
func (s *server) handleBroadcastRead(msg genericReadMessage) (broadcastReadResponse, error) {
	var out broadcastReadResponse
	out.Type = "read_ok"

	s.storeMu.Lock()
	out.Messages = append(make([]int, 0, len(s.store)), s.store...)
	s.storeMu.Unlock()

	return out, nil
}
// topologyMessage is the request body decoded for a "topology" message.
type topologyMessage struct {
// Topology maps a string key to a list of strings, read from the JSON key
// "topology" (presumably node ID -> neighbour IDs; confirm against the
// Maelstrom protocol).
Topology map[string][]string `json:"topology"`
}
// topologyResponse is the reply body returned by handleTopology; it carries
// only the embedded message body.
type topologyResponse struct {
maelstrom.MessageBody
}
// handleTopology stores the received topology on the server and acknowledges
// with a "topology_ok" body. It never returns an error.
//
// NOTE(review): s.topology is assigned without holding a lock; confirm that
// nothing reads it concurrently.
func (s *server) handleTopology(msg topologyMessage) (resp topologyResponse, err error) {
	resp = topologyResponse{MessageBody: responseByType("topology_ok")}
	s.topology = msg.Topology
	return resp, nil
}
// addMessage is the request body decoded for an "add" message.
type addMessage struct {
// Delta is the amount to add to the counter, read from the JSON key "delta".
Delta int `json:"delta"`
}
// addMessageResponse is the reply body returned by handleAdd; it carries only
// the embedded message body.
type addMessageResponse struct {
maelstrom.MessageBody
}
// handleAdd adds msg.Delta to the integer stored under counterGlobalKey using
// an optimistic read / compare-and-swap loop, and replies with "add_ok" once
// a swap has succeeded.
//
// A KeyDoesNotExist error on the read is treated as a current value of zero.
// A PreconditionFailed error on the swap restarts the loop with a fresh read.
// Every other error, from either call and whether or not it is an
// *maelstrom.RPCError, is returned wrapped; previously a non-RPC error from
// the swap was silently dropped and the request was acknowledged even though
// nothing had been written.
func (s *server) handleAdd(msg addMessage) (resp addMessageResponse, err error) {
	for {
		current, readErr := s.kv.ReadInt(s.baseCtx, counterGlobalKey)
		if readErr != nil {
			var rpcErr *maelstrom.RPCError
			if !errors.As(readErr, &rpcErr) || rpcErr.Code != maelstrom.KeyDoesNotExist {
				return resp, fmt.Errorf("reading current value: %w", readErr)
			}
			// The counter has never been written: start from zero.
			current = 0
		}

		// The final argument is true so the swap may create the key when it
		// does not exist yet.
		casErr := s.kv.CompareAndSwap(s.baseCtx, counterGlobalKey, current, current+msg.Delta, true)
		if casErr == nil {
			break
		}

		var rpcErr *maelstrom.RPCError
		if errors.As(casErr, &rpcErr) && rpcErr.Code == maelstrom.PreconditionFailed {
			// The stored value no longer matches what was read; retry.
			continue
		}
		return resp, fmt.Errorf("updating value: %w", casErr)
	}

	resp.MessageBody = responseByType("add_ok")
	return resp, nil
}
// counterReadResponse is the reply body returned by handleCounterRead.
type counterReadResponse struct {
maelstrom.MessageBody
// Value is the counter value read from the store, serialised under "value".
Value int `json:"value"`
}
// handleCounterRead answers a counter "read" request with the integer stored
// under counterGlobalKey and a "read_ok" body.
//
// A failed read is returned wrapped with %w, so the message is readable and
// callers can still match the cause with errors.Is / errors.As. The previous
// %e verb is a floating-point verb and produced a garbled message while
// discarding the error chain.
func (s *server) handleCounterRead(msg genericReadMessage) (resp counterReadResponse, err error) {
	value, err := s.kv.ReadInt(s.baseCtx, counterGlobalKey)
	if err != nil {
		return resp, fmt.Errorf("reading value: %w", err)
	}

	resp.Value = value
	resp.MessageBody = responseByType("read_ok")
	return resp, nil
}
// genericHandler adapts a typed handler h to a maelstrom.HandlerFunc.
//
// The returned function JSON-decodes the message body into a T, passes it to
// h, and sends h's result back as the reply through s.node. A decode failure
// is returned wrapped; an error from h is returned unchanged and no reply is
// sent by this function.
func genericHandler[T, U any](s *server, h func(msg T) (U, error)) maelstrom.HandlerFunc {
	return func(msg maelstrom.Message) error {
		var req T
		if err := json.Unmarshal(msg.Body, &req); err != nil {
			return fmt.Errorf("unmarshalling body: %w", err)
		}

		reply, err := h(req)
		if err != nil {
			return err
		}
		return s.node.Reply(msg, reply)
	}
}
// responseByType returns a message body whose Type is msgType and whose other
// fields are left at their zero values.
func responseByType(msgType string) maelstrom.MessageBody {
	var body maelstrom.MessageBody
	body.Type = msgType
	return body
}