From 8bb6797e247b814c92f8be137082f84ea1cba2a7 Mon Sep 17 00:00:00 2001 From: Aleksandr Trushkin Date: Thu, 13 Apr 2023 00:43:44 +0300 Subject: [PATCH] added g-counter support --- Makefile | 3 + cmd/main.go | 40 ++++++------ internal/api/server.go | 134 +++++++++++++++++++++++++++++++++++------ 3 files changed, 138 insertions(+), 39 deletions(-) diff --git a/Makefile b/Makefile index 3a93bc3..e4a8310 100644 --- a/Makefile +++ b/Makefile @@ -7,5 +7,8 @@ test_ids: build test_broadcast: build maelstrom test -w broadcast --bin ./flyio --time-limit 20 --rate 10 --node-count 1 +test_counter: build + maelstrom test -w g-counter --bin ./flyio --node-count 3 --rate 100 --time-limit 20 --nemesis partition + maelrun: build maelstrom serve diff --git a/cmd/main.go b/cmd/main.go index 53e710f..74ea79c 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -13,28 +13,28 @@ import ( ) func main() { - ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt) - defer cancel() + ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt) + defer cancel() - err := app(ctx) - if err != nil { - log.Fatalf("running app: %v", err) - } - - os.Exit(0) -} - -func app(ctx context.Context,) error { - node := maelstrom.NewNode() - - srv, err := api.NewServer(node) - if err != nil { - return fmt.Errorf("making new server: %w", err) - } - err = srv.Run() + err := app(ctx) if err != nil { - return fmt.Errorf("running server: %w", err) + log.Fatalf("running app: %v", err) } - return nil + os.Exit(0) +} + +func app(ctx context.Context) error { + node := maelstrom.NewNode() + + srv, err := api.NewServer(ctx, node) + if err != nil { + return fmt.Errorf("making new server: %w", err) + } + err = srv.Run() + if err != nil { + return fmt.Errorf("running server: %w", err) + } + + return nil } diff --git a/internal/api/server.go b/internal/api/server.go index f0c7c60..dec6028 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -1,10 +1,12 @@ package api import ( + "context" "crypto/rand" "encoding/binary" "encoding/hex" "encoding/json" + "errors" "fmt" "sync" "sync/atomic" @@ -12,7 +14,30 @@ import ( maelstrom "github.com/jepsen-io/maelstrom/demo/go" ) -func NewServer(node *maelstrom.Node) (*server, error) { +type readHandler uint8 + +const ( + readHandlerStart readHandler = iota + readHandlerBroadcast + readHandlerCounter + readHandlerEnd +) + +const currentReadHandler = readHandlerCounter + +const counterGlobalKey = "global_counter_kv" + +func init() { + if currentReadHandler <= readHandlerStart { + panic("read handler is too small") + } + + if currentReadHandler >= readHandlerEnd { + panic("read handle is too big") + } +} + +func NewServer(ctx context.Context, node *maelstrom.Node) (*server, error) { var prefix [2]byte _, err := rand.Read(prefix[:]) if err != nil { @@ -20,8 +45,10 @@ func NewServer(node *maelstrom.Node) (*server, error) { } s := &server{ - node: node, - prefix: prefix, + baseCtx: ctx, + node: node, + prefix: prefix, + kv: maelstrom.NewSeqKV(node), } s.setup() @@ -30,9 +57,11 @@ func NewServer(node *maelstrom.Node) (*server, error) { } type server struct { - node *maelstrom.Node + baseCtx context.Context - topology map[string][]string + node *maelstrom.Node + kv *maelstrom.KV + topology map[string][]string nextSeqId uint64 prefix [2]byte @@ -48,8 +77,15 @@ func (s *server) Run() error { func (s *server) setup() { s.node.Handle("generate", s.handleGenerate) s.node.Handle("broadcast", genericHandler(s, s.handleBroadcast)) - s.node.Handle("read", genericHandler(s, s.handleBroadcastRead)) s.node.Handle("topology", genericHandler(s, s.handleTopology)) + s.node.Handle("add", genericHandler(s, s.handleAdd)) + + switch currentReadHandler { + case readHandlerCounter: + s.node.Handle("read", genericHandler(s, s.handleCounterRead)) + case readHandlerBroadcast: + s.node.Handle("read", genericHandler(s, s.handleBroadcastRead)) + } } type generateResponse struct { @@ -99,7 +135,7 @@ func (s *server) handleBroadcast(msg broadcastMessage) (broadcastResponse, error return response, nil } -type broadcastReadMessage struct{} +type genericReadMessage struct{} type broadcastReadResponse struct { maelstrom.MessageBody @@ -107,7 +143,7 @@ type broadcastReadResponse struct { Messages []int `json:"messages"` } -func (s *server) handleBroadcastRead(msg broadcastReadMessage) (broadcastReadResponse, error) { +func (s *server) handleBroadcastRead(msg genericReadMessage) (broadcastReadResponse, error) { s.storeMu.Lock() defer s.storeMu.Unlock() @@ -123,24 +159,78 @@ func (s *server) handleBroadcastRead(msg broadcastReadMessage) (broadcastReadRes return response, nil } -type topologyMessage struct{ - Topology map[string][]string `json:"topology"` +type topologyMessage struct { + Topology map[string][]string `json:"topology"` } -type topologyResponse struct{ +type topologyResponse struct { maelstrom.MessageBody } -func (s *server) handleTopology(msg topologyMessage) (topologyResponse, error) { - s.topology = msg.Topology +func (s *server) handleTopology(msg topologyMessage) (resp topologyResponse, err error) { + s.topology = msg.Topology - response := topologyResponse{ - MessageBody: maelstrom.MessageBody{ - Type: "topology_ok", - }, - } + resp.MessageBody = responseByType("topology_ok") + return resp, nil +} - return response, nil +type addMessage struct { + Delta int `json:"delta"` +} + +type addMessageResponse struct { + maelstrom.MessageBody +} + +func (s *server) handleAdd(msg addMessage) (resp addMessageResponse, err error) { + var done bool + for !done { + current, err := s.kv.ReadInt(s.baseCtx, counterGlobalKey) + if err != nil { + switch terr := err.(type) { + case *maelstrom.RPCError: + if terr.Code == maelstrom.KeyDoesNotExist { + break + } + return resp, fmt.Errorf("reading current value: %w", err) + default: + return resp, fmt.Errorf("reading current value: %w", err) + } + } + + next := current + msg.Delta + err = s.kv.CompareAndSwap(s.baseCtx, counterGlobalKey, current, next, true) + if err != nil { + rpcErr := new(maelstrom.RPCError) + if errors.As(err, &rpcErr) { + if rpcErr.Code == maelstrom.PreconditionFailed { + continue + } + return resp, fmt.Errorf("updating value: %w", err) + } + } + break + } + + resp.MessageBody = responseByType("add_ok") + return resp, nil +} + +type counterReadResponse struct { + maelstrom.MessageBody + + Value int `json:"value"` +} + +func (s *server) handleCounterRead(msg genericReadMessage) (resp counterReadResponse, err error) { + resp.Value, err = s.kv.ReadInt(s.baseCtx, counterGlobalKey) + if err != nil { + return resp, fmt.Errorf("reading value: %e", err) + } + + resp.MessageBody = responseByType("read_ok") + + return resp, nil } func genericHandler[T, U any](s *server, h func(msg T) (U, error)) maelstrom.HandlerFunc { @@ -159,3 +249,9 @@ func genericHandler[T, U any](s *server, h func(msg T) (U, error)) maelstrom.Han return s.node.Reply(msg, resp) } } + +func responseByType(msgType string) maelstrom.MessageBody { + return maelstrom.MessageBody{ + Type: msgType, + } +}