diff --git a/.gitignore b/.gitignore index adf8f72..ed00a91 100644 --- a/.gitignore +++ b/.gitignore @@ -21,3 +21,6 @@ # Go workspace file go.work +# My ignores +store +flyio diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..3a93bc3 --- /dev/null +++ b/Makefile @@ -0,0 +1,11 @@ +build: + go build -o flyio cmd/main.go + +test_ids: build + maelstrom test -w unique-ids --bin ./flyio --time-limit 30 --rate 1000 --node-count 3 --availability total --nemesis partition + +test_broadcast: build + maelstrom test -w broadcast --bin ./flyio --time-limit 20 --rate 10 --node-count 1 + +maelrun: build + maelstrom serve diff --git a/cmd/main.go b/cmd/main.go new file mode 100644 index 0000000..53e710f --- /dev/null +++ b/cmd/main.go @@ -0,0 +1,40 @@ +package main + +import ( + "context" + "fmt" + "log" + "os" + "os/signal" + + "git.loyso.art/frx/flyio/internal/api" + + maelstrom "github.com/jepsen-io/maelstrom/demo/go" +) + +func main() { + ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt) + defer cancel() + + err := app(ctx) + if err != nil { + log.Fatalf("running app: %v", err) + } + + os.Exit(0) +} + +func app(ctx context.Context,) error { + node := maelstrom.NewNode() + + srv, err := api.NewServer(node) + if err != nil { + return fmt.Errorf("making new server: %w", err) + } + err = srv.Run() + if err != nil { + return fmt.Errorf("running server: %w", err) + } + + return nil +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..397c83d --- /dev/null +++ b/go.mod @@ -0,0 +1,5 @@ +module git.loyso.art/frx/flyio + +go 1.19 + +require github.com/jepsen-io/maelstrom/demo/go v0.0.0-20230321201811-151ad3cff117 diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..676183b --- /dev/null +++ b/go.sum @@ -0,0 +1,2 @@ +github.com/jepsen-io/maelstrom/demo/go v0.0.0-20230321201811-151ad3cff117 h1:B7h6hYmW+bgtHKlHJnqW6bf8mGycFSLYvZAeKIkihnU= +github.com/jepsen-io/maelstrom/demo/go v0.0.0-20230321201811-151ad3cff117/go.mod h1:i6aVIs5AIOOaQF1lAisBm7DDeWM1Iopf+26UxjagsCU= diff --git a/internal/api/server.go b/internal/api/server.go new file mode 100644 index 0000000..f0c7c60 --- /dev/null +++ b/internal/api/server.go @@ -0,0 +1,161 @@ +package api + +import ( + "crypto/rand" + "encoding/binary" + "encoding/hex" + "encoding/json" + "fmt" + "sync" + "sync/atomic" + + maelstrom "github.com/jepsen-io/maelstrom/demo/go" +) + +func NewServer(node *maelstrom.Node) (*server, error) { + var prefix [2]byte + _, err := rand.Read(prefix[:]) + if err != nil { + return nil, fmt.Errorf("reading random data: %w", err) + } + + s := &server{ + node: node, + prefix: prefix, + } + + s.setup() + + return s, nil +} + +type server struct { + node *maelstrom.Node + + topology map[string][]string + + nextSeqId uint64 + prefix [2]byte + + storeMu sync.Mutex + store []int +} + +func (s *server) Run() error { + return s.node.Run() +} + +func (s *server) setup() { + s.node.Handle("generate", s.handleGenerate) + s.node.Handle("broadcast", genericHandler(s, s.handleBroadcast)) + s.node.Handle("read", genericHandler(s, s.handleBroadcastRead)) + s.node.Handle("topology", genericHandler(s, s.handleTopology)) +} + +type generateResponse struct { + maelstrom.MessageBody + ID string `json:"id"` +} + +func (s *server) handleGenerate(msg maelstrom.Message) error { + outbody := [10]byte{ + s.prefix[0], + s.prefix[1], + } + + id := atomic.AddUint64(&s.nextSeqId, 1) + + binary.BigEndian.PutUint64(outbody[2:], id) + out := generateResponse{ + ID: hex.EncodeToString(outbody[:]), + } + + out.Type = "generate_ok" + + return s.node.Reply(msg, out) +} + +type broadcastMessage struct { + Message int `json:"message"` +} + +type broadcastResponse struct { + maelstrom.MessageBody +} + +func (s *server) handleBroadcast(msg broadcastMessage) (broadcastResponse, error) { + value := msg.Message + + s.storeMu.Lock() + defer s.storeMu.Unlock() + + s.store = append(s.store, value) + + response := broadcastResponse{ + MessageBody: maelstrom.MessageBody{ + Type: "broadcast_ok", + }, + } + return response, nil +} + +type broadcastReadMessage struct{} + +type broadcastReadResponse struct { + maelstrom.MessageBody + + Messages []int `json:"messages"` +} + +func (s *server) handleBroadcastRead(msg broadcastReadMessage) (broadcastReadResponse, error) { + s.storeMu.Lock() + defer s.storeMu.Unlock() + + response := broadcastReadResponse{ + MessageBody: maelstrom.MessageBody{ + Type: "read_ok", + }, + Messages: make([]int, len(s.store)), + } + + copy(response.Messages, s.store) + + return response, nil +} + +type topologyMessage struct{ + Topology map[string][]string `json:"topology"` +} + +type topologyResponse struct{ + maelstrom.MessageBody +} + +func (s *server) handleTopology(msg topologyMessage) (topologyResponse, error) { + s.topology = msg.Topology + + response := topologyResponse{ + MessageBody: maelstrom.MessageBody{ + Type: "topology_ok", + }, + } + + return response, nil +} + +func genericHandler[T, U any](s *server, h func(msg T) (U, error)) maelstrom.HandlerFunc { + return func(msg maelstrom.Message) error { + var body T + err := json.Unmarshal(msg.Body, &body) + if err != nil { + return fmt.Errorf("unmarshalling body: %w", err) + } + + resp, err := h(body) + if err != nil { + return err + } + + return s.node.Reply(msg, resp) + } +}