// Package api implements the message handlers of a Maelstrom node: ID
// generation, an in-memory broadcast store, topology bookkeeping and a counter
// kept in the key/value store created by maelstrom.NewSeqKV.
package api

import (
	"context"
	"crypto/rand"
	"encoding/binary"
	"encoding/hex"
	"encoding/json"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"

	maelstrom "github.com/jepsen-io/maelstrom/demo/go"
)

// readHandler selects which implementation is registered for "read" messages.
type readHandler uint8

const (
	// readHandlerStart is the lower sentinel; it is not a valid selection.
	readHandlerStart readHandler = iota
	// readHandlerBroadcast answers reads from the local broadcast store.
	readHandlerBroadcast
	// readHandlerCounter answers reads from the counter in the key/value store.
	readHandlerCounter
	// readHandlerEnd is the upper sentinel; it is not a valid selection.
	readHandlerEnd
)

// currentReadHandler is the "read" implementation wired up by setup.
const currentReadHandler = readHandlerCounter

// counterGlobalKey is the key under which the counter value is stored.
const counterGlobalKey = "global_counter_kv"

// init panics when currentReadHandler is not strictly between the two
// sentinel values, so a bad selection is caught at program start.
func init() {
	if currentReadHandler <= readHandlerStart {
		panic("read handler is too small")
	}
	if currentReadHandler >= readHandlerEnd {
		panic("read handler is too big")
	}
}

// NewServer builds a server around node, draws a random two-byte prefix used
// by the "generate" handler, and registers all message handlers on node.
// It returns an error when the random prefix cannot be read.
func NewServer(ctx context.Context, node *maelstrom.Node) (*server, error) {
	var prefix [2]byte
	if _, err := rand.Read(prefix[:]); err != nil {
		return nil, fmt.Errorf("reading random data: %w", err)
	}

	s := &server{
		baseCtx: ctx,
		node:    node,
		prefix:  prefix,
		kv:      maelstrom.NewSeqKV(node),
	}
	s.setup()
	return s, nil
}

// server holds the state shared by all message handlers.
type server struct {
	// baseCtx is passed to every key/value store call.
	baseCtx context.Context
	node    *maelstrom.Node
	kv      *maelstrom.KV

	// topology is the last topology received in a "topology" message.
	topology map[string][]string

	// nextSeqId is incremented atomically for every generated ID.
	nextSeqId uint64
	// prefix is the random two-byte prefix placed in front of generated IDs.
	prefix [2]byte

	// storeMu guards store.
	storeMu sync.Mutex
	store   []int
}

// Run runs the underlying node and returns its result.
func (s *server) Run() error {
	return s.node.Run()
}

// setup registers the handlers on the node. The "read" handler depends on
// currentReadHandler.
func (s *server) setup() {
	s.node.Handle("generate", s.handleGenerate)
	s.node.Handle("broadcast", genericHandler(s, s.handleBroadcast))
	s.node.Handle("topology", genericHandler(s, s.handleTopology))
	s.node.Handle("add", genericHandler(s, s.handleAdd))

	switch currentReadHandler {
	case readHandlerCounter:
		s.node.Handle("read", genericHandler(s, s.handleCounterRead))
	case readHandlerBroadcast:
		s.node.Handle("read", genericHandler(s, s.handleBroadcastRead))
	}
}

// generateResponse is the body of a "generate_ok" reply.
type generateResponse struct {
	maelstrom.MessageBody
	ID string `json:"id"`
}

// handleGenerate replies with an ID that is the hex encoding of ten bytes:
// the server's two-byte random prefix followed by the next sequence number as
// a big-endian uint64.
func (s *server) handleGenerate(msg maelstrom.Message) error {
	outbody := [10]byte{
		s.prefix[0],
		s.prefix[1],
	}
	id := atomic.AddUint64(&s.nextSeqId, 1)
	binary.BigEndian.PutUint64(outbody[2:], id)

	out := generateResponse{
		ID: hex.EncodeToString(outbody[:]),
	}
	out.Type = "generate_ok"
	return s.node.Reply(msg, out)
}

// broadcastMessage is the body of a "broadcast" request.
type broadcastMessage struct {
	Message int `json:"message"`
}

// broadcastResponse is the body of a "broadcast_ok" reply.
type broadcastResponse struct {
	maelstrom.MessageBody
}

// handleBroadcast appends the received value to the local store.
func (s *server) handleBroadcast(msg broadcastMessage) (broadcastResponse, error) {
	s.storeMu.Lock()
	s.store = append(s.store, msg.Message)
	s.storeMu.Unlock()

	return broadcastResponse{MessageBody: responseByType("broadcast_ok")}, nil
}

// genericReadMessage is the (empty) body of a "read" request.
type genericReadMessage struct{}

// broadcastReadResponse is the body of a "read_ok" reply for the broadcast
// store.
type broadcastReadResponse struct {
	maelstrom.MessageBody
	Messages []int `json:"messages"`
}

// handleBroadcastRead replies with a copy of every value in the local store.
func (s *server) handleBroadcastRead(msg genericReadMessage) (broadcastReadResponse, error) {
	s.storeMu.Lock()
	defer s.storeMu.Unlock()

	response := broadcastReadResponse{
		MessageBody: responseByType("read_ok"),
		// Copy so the reply does not alias the slice guarded by storeMu.
		Messages: make([]int, len(s.store)),
	}
	copy(response.Messages, s.store)
	return response, nil
}

// topologyMessage is the body of a "topology" request.
type topologyMessage struct {
	Topology map[string][]string `json:"topology"`
}

// topologyResponse is the body of a "topology_ok" reply.
type topologyResponse struct {
	maelstrom.MessageBody
}

// handleTopology records the received topology on the server.
//
// NOTE(review): s.topology is written without holding a lock; confirm that no
// other goroutine accesses it concurrently.
func (s *server) handleTopology(msg topologyMessage) (resp topologyResponse, err error) {
	s.topology = msg.Topology
	resp.MessageBody = responseByType("topology_ok")
	return resp, nil
}

// addMessage is the body of an "add" request.
type addMessage struct {
	Delta int `json:"delta"`
}

// addMessageResponse is the body of an "add_ok" reply.
type addMessageResponse struct {
	maelstrom.MessageBody
}

// handleAdd adds msg.Delta to the counter stored under counterGlobalKey.
//
// It reads the current value (treating a missing key as 0) and writes the sum
// back with a compare-and-swap, retrying from the read whenever the swap is
// rejected with PreconditionFailed. Every other read or write error is
// returned to the caller, so "add_ok" is only sent after a successful swap.
func (s *server) handleAdd(msg addMessage) (resp addMessageResponse, err error) {
	for {
		var current int
		current, err = s.kv.ReadInt(s.baseCtx, counterGlobalKey)
		switch {
		case err == nil:
		case hasRPCCode(err, maelstrom.KeyDoesNotExist):
			// Nothing has been added yet: start from zero. The swap below
			// is issued with createIfNotExists set, so it creates the key.
			current = 0
		default:
			return addMessageResponse{}, fmt.Errorf("reading current value: %w", err)
		}

		err = s.kv.CompareAndSwap(s.baseCtx, counterGlobalKey, current, current+msg.Delta, true)
		if err == nil {
			break
		}
		if hasRPCCode(err, maelstrom.PreconditionFailed) {
			// The stored value changed between the read and the swap.
			continue
		}
		return addMessageResponse{}, fmt.Errorf("updating value: %w", err)
	}

	resp.MessageBody = responseByType("add_ok")
	return resp, nil
}

// counterReadResponse is the body of a "read_ok" reply for the counter.
type counterReadResponse struct {
	maelstrom.MessageBody
	Value int `json:"value"`
}

// handleCounterRead replies with the value stored under counterGlobalKey.
// A missing key is reported as 0, matching how handleAdd treats it; any other
// read error is returned.
func (s *server) handleCounterRead(msg genericReadMessage) (resp counterReadResponse, err error) {
	value, err := s.kv.ReadInt(s.baseCtx, counterGlobalKey)
	if err != nil && !hasRPCCode(err, maelstrom.KeyDoesNotExist) {
		return counterReadResponse{}, fmt.Errorf("reading value: %w", err)
	}
	if err == nil {
		resp.Value = value
	}

	resp.MessageBody = responseByType("read_ok")
	return resp, nil
}

// genericHandler adapts a typed handler h to a maelstrom.HandlerFunc: it
// decodes the message body into T, calls h, and replies with the returned
// value. Decoding errors and errors from h are returned without replying.
func genericHandler[T, U any](s *server, h func(msg T) (U, error)) maelstrom.HandlerFunc {
	return func(msg maelstrom.Message) error {
		var body T
		if err := json.Unmarshal(msg.Body, &body); err != nil {
			return fmt.Errorf("unmarshalling body: %w", err)
		}

		resp, err := h(body)
		if err != nil {
			return err
		}
		return s.node.Reply(msg, resp)
	}
}

// responseByType returns a message body whose Type is msgType.
func responseByType(msgType string) maelstrom.MessageBody {
	return maelstrom.MessageBody{
		Type: msgType,
	}
}

// hasRPCCode reports whether err is, or wraps, a *maelstrom.RPCError carrying
// the given code.
func hasRPCCode(err error, code int) bool {
	var rpcErr *maelstrom.RPCError
	return errors.As(err, &rpcErr) && rpcErr.Code == code
}