added g-counter support

This commit is contained in:
2023-04-13 00:43:44 +03:00
parent f644078a3a
commit 8bb6797e24
3 changed files with 138 additions and 39 deletions

View File

@ -7,5 +7,8 @@ test_ids: build
test_broadcast: build test_broadcast: build
maelstrom test -w broadcast --bin ./flyio --time-limit 20 --rate 10 --node-count 1 maelstrom test -w broadcast --bin ./flyio --time-limit 20 --rate 10 --node-count 1
test_counter: build
maelstrom test -w g-counter --bin ./flyio --node-count 3 --rate 100 --time-limit 20 --nemesis partition
maelrun: build maelrun: build
maelstrom serve maelstrom serve

View File

@ -13,28 +13,28 @@ import (
) )
func main() { func main() {
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt) ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
defer cancel() defer cancel()
err := app(ctx) err := app(ctx)
if err != nil {
log.Fatalf("running app: %v", err)
}
os.Exit(0)
}
func app(ctx context.Context,) error {
node := maelstrom.NewNode()
srv, err := api.NewServer(node)
if err != nil {
return fmt.Errorf("making new server: %w", err)
}
err = srv.Run()
if err != nil { if err != nil {
return fmt.Errorf("running server: %w", err) log.Fatalf("running app: %v", err)
} }
return nil os.Exit(0)
}
func app(ctx context.Context) error {
node := maelstrom.NewNode()
srv, err := api.NewServer(ctx, node)
if err != nil {
return fmt.Errorf("making new server: %w", err)
}
err = srv.Run()
if err != nil {
return fmt.Errorf("running server: %w", err)
}
return nil
} }

View File

@ -1,10 +1,12 @@
package api package api
import ( import (
"context"
"crypto/rand" "crypto/rand"
"encoding/binary" "encoding/binary"
"encoding/hex" "encoding/hex"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"sync" "sync"
"sync/atomic" "sync/atomic"
@ -12,7 +14,30 @@ import (
maelstrom "github.com/jepsen-io/maelstrom/demo/go" maelstrom "github.com/jepsen-io/maelstrom/demo/go"
) )
func NewServer(node *maelstrom.Node) (*server, error) { type readHandler uint8
const (
readHandlerStart readHandler = iota
readHandlerBroadcast
readHandlerCounter
readHandlerEnd
)
const currentReadHandler = readHandlerCounter
const counterGlobalKey = "global_counter_kv"
func init() {
if currentReadHandler <= readHandlerStart {
panic("read handler is too small")
}
if currentReadHandler >= readHandlerEnd {
panic("read handle is too big")
}
}
func NewServer(ctx context.Context, node *maelstrom.Node) (*server, error) {
var prefix [2]byte var prefix [2]byte
_, err := rand.Read(prefix[:]) _, err := rand.Read(prefix[:])
if err != nil { if err != nil {
@ -20,8 +45,10 @@ func NewServer(node *maelstrom.Node) (*server, error) {
} }
s := &server{ s := &server{
node: node, baseCtx: ctx,
prefix: prefix, node: node,
prefix: prefix,
kv: maelstrom.NewSeqKV(node),
} }
s.setup() s.setup()
@ -30,9 +57,11 @@ func NewServer(node *maelstrom.Node) (*server, error) {
} }
type server struct { type server struct {
node *maelstrom.Node baseCtx context.Context
topology map[string][]string node *maelstrom.Node
kv *maelstrom.KV
topology map[string][]string
nextSeqId uint64 nextSeqId uint64
prefix [2]byte prefix [2]byte
@ -48,8 +77,15 @@ func (s *server) Run() error {
func (s *server) setup() { func (s *server) setup() {
s.node.Handle("generate", s.handleGenerate) s.node.Handle("generate", s.handleGenerate)
s.node.Handle("broadcast", genericHandler(s, s.handleBroadcast)) s.node.Handle("broadcast", genericHandler(s, s.handleBroadcast))
s.node.Handle("read", genericHandler(s, s.handleBroadcastRead))
s.node.Handle("topology", genericHandler(s, s.handleTopology)) s.node.Handle("topology", genericHandler(s, s.handleTopology))
s.node.Handle("add", genericHandler(s, s.handleAdd))
switch currentReadHandler {
case readHandlerCounter:
s.node.Handle("read", genericHandler(s, s.handleCounterRead))
case readHandlerBroadcast:
s.node.Handle("read", genericHandler(s, s.handleBroadcastRead))
}
} }
type generateResponse struct { type generateResponse struct {
@ -99,7 +135,7 @@ func (s *server) handleBroadcast(msg broadcastMessage) (broadcastResponse, error
return response, nil return response, nil
} }
type broadcastReadMessage struct{} type genericReadMessage struct{}
type broadcastReadResponse struct { type broadcastReadResponse struct {
maelstrom.MessageBody maelstrom.MessageBody
@ -107,7 +143,7 @@ type broadcastReadResponse struct {
Messages []int `json:"messages"` Messages []int `json:"messages"`
} }
func (s *server) handleBroadcastRead(msg broadcastReadMessage) (broadcastReadResponse, error) { func (s *server) handleBroadcastRead(msg genericReadMessage) (broadcastReadResponse, error) {
s.storeMu.Lock() s.storeMu.Lock()
defer s.storeMu.Unlock() defer s.storeMu.Unlock()
@ -123,24 +159,78 @@ func (s *server) handleBroadcastRead(msg broadcastReadMessage) (broadcastReadRes
return response, nil return response, nil
} }
type topologyMessage struct{ type topologyMessage struct {
Topology map[string][]string `json:"topology"` Topology map[string][]string `json:"topology"`
} }
type topologyResponse struct{ type topologyResponse struct {
maelstrom.MessageBody maelstrom.MessageBody
} }
func (s *server) handleTopology(msg topologyMessage) (topologyResponse, error) { func (s *server) handleTopology(msg topologyMessage) (resp topologyResponse, err error) {
s.topology = msg.Topology s.topology = msg.Topology
response := topologyResponse{ resp.MessageBody = responseByType("topology_ok")
MessageBody: maelstrom.MessageBody{ return resp, nil
Type: "topology_ok", }
},
}
return response, nil type addMessage struct {
Delta int `json:"delta"`
}
type addMessageResponse struct {
maelstrom.MessageBody
}
func (s *server) handleAdd(msg addMessage) (resp addMessageResponse, err error) {
var done bool
for !done {
current, err := s.kv.ReadInt(s.baseCtx, counterGlobalKey)
if err != nil {
switch terr := err.(type) {
case *maelstrom.RPCError:
if terr.Code == maelstrom.KeyDoesNotExist {
break
}
return resp, fmt.Errorf("reading current value: %w", err)
default:
return resp, fmt.Errorf("reading current value: %w", err)
}
}
next := current + msg.Delta
err = s.kv.CompareAndSwap(s.baseCtx, counterGlobalKey, current, next, true)
if err != nil {
rpcErr := new(maelstrom.RPCError)
if errors.As(err, &rpcErr) {
if rpcErr.Code == maelstrom.PreconditionFailed {
continue
}
return resp, fmt.Errorf("updating value: %w", err)
}
}
break
}
resp.MessageBody = responseByType("add_ok")
return resp, nil
}
type counterReadResponse struct {
maelstrom.MessageBody
Value int `json:"value"`
}
func (s *server) handleCounterRead(msg genericReadMessage) (resp counterReadResponse, err error) {
resp.Value, err = s.kv.ReadInt(s.baseCtx, counterGlobalKey)
if err != nil {
return resp, fmt.Errorf("reading value: %e", err)
}
resp.MessageBody = responseByType("read_ok")
return resp, nil
} }
func genericHandler[T, U any](s *server, h func(msg T) (U, error)) maelstrom.HandlerFunc { func genericHandler[T, U any](s *server, h func(msg T) (U, error)) maelstrom.HandlerFunc {
@ -159,3 +249,9 @@ func genericHandler[T, U any](s *server, h func(msg T) (U, error)) maelstrom.Han
return s.node.Reply(msg, resp) return s.node.Reply(msg, resp)
} }
} }
func responseByType(msgType string) maelstrom.MessageBody {
return maelstrom.MessageBody{
Type: msgType,
}
}