Learn Event Streaming by Recreating Kafka 🔗
I don't know if I really like Kafka all that much
That said, it's an interesting way for applications to communicate. See, I have been imagining a global Game of Life implementation with distributed realtime events for each cell in the world. While that is a rather far-off dream, it made sense to tackle one of its problems. In the past I have set up Kafka development envs with Nix; this is all pretty easy. It even goes so far as to try to track how many shells are currently running, so Kafka can be shut down when all is said and done. The only reason I do this is because Kafka is written in Java, famous for the adage, "write once, use 80% of the resources everywhere." For better or worse, that has always stunk for me.
Kafka can't be that complicated, right?
That's probably both true and false. The only way to tell would be to try. So while this might be structured like a tutorial, it's really a devlog of the failures to interpret the features.
Goals 🔗
To start we want to create a realtime streaming platform that provides at least:
- A binary protocol
- A protocol built on TCP
- TCP connection multiplexing for consumers and producers
- Event stream and Log-Structured Merge storage
- Be filesystem oriented where possible
- Don't invent everything, just what's needed
- GUI testing and debugging tools
- Protocol client for consumers and producers
Arch Diagram 🔗
Protocol Design 🔗
--- title: "Message" ---
packet-beta
0-31: "Message Size UInt32"
32-63: "Message Type UInt32"
64-95: "Message Payload (Variable Length)"

DevLog 🔗
25 12 2024 🔗
Having now deployed this to k8s via k0s on a local but remote server, I have noticed there are throughput issues. On the same machine it is possible for a high-velocity producer to continuously send events at raw Go runtime speed while the server consumes them. But that server is a little more limited, and we are finding that, whether due to disk access overhead introduced by containerd or that system's slower architecture, we can easily exceed the available threads and crash the app.
I have a couple of ideas of how to handle this:
- Right now I allocate and write to both a log file and a Pebble DB on each message received. I might be better off batching writes and feeding them into Pebble through a channel.
- The condition seems to be limited to producer events, and possibly there is a bug related to how connections are closed. Possibly they are not closing immediately on client close and are waiting 5 seconds, effectively backlogging.
22 12 2024 🔗
There has been some work setting up k0s and learning the toolchains involved. I also integrated a build pipeline using Nix, which allows this project to produce a 40 MB image that is easy to deploy to my local k0s. Intentionally, this k8s instance is on a remote machine so I have at least a small non-localhost network effect when testing. I also refactored how event handlers are declared, under a specific interface for handlers.
func (h *Handlers) ExecuteHandler(name string, ctx context.Context, contract interface{}) (context.Context, error)
func (h *Handlers) ExecuteWithWriteHandler(name string, ctx context.Context, contract interface{}, w io.Writer) (context.Context, error)
While I don't know how normal it is to create functional interfaces in Go, this felt more natural than constructing a type, because I had some regular variance in signatures to control write access.
That said, a message handler did become a type:
type Handlers struct {
    s               *Server
    messageHandlers map[string]func(ctx context.Context, contract interface{}, writer io.Writer) (context.Context, error)
}
Handler registration is then
handlers := NewHandlers(s)
handlers.RegisterHandler(ConsumerRegistrationHandler, consumerRegistration)
handlers.RegisterHandler(ProducerRegistrationHandler, producerRegistration)
handlers.RegisterHandler(PollHandler, pollHandler)
handlers.RegisterHandler(MessageHandler, messageHandler)
And execution looks like
case ConsumerRegistration:
    if ctx, err = h.ExecuteHandler(ConsumerRegistrationHandler, ctx, m); err != nil {
        slog.Error("Error registering consumer", "Error", err)
        cancel()
        break
    }
    ctx = context.WithValue(ctx, "ConsumerGroup", m.ConsumerName)
The variance the aforementioned interface provides relates to whether the handler has access to the connection so it can write messages back to the client. I currently have only a single use case, which involves polling. Since this is based on a TCP connection, we can infer ACK and appropriately handle those errors on the connection.
At this point I decided that a 5s context timeout might not account for long-running connections, so on each message publish we extend the timeout. Generally, my opinion is that if you are actively sending we should keep you alive, and allow timely termination if you pause. One concern is that each time the context timeout is extended we deepen the context object. I assume this causes it to increase in size. I need to do some research to confirm that a connection kept alive for days wouldn't prove a memory leak.
05 11 2024 🔗
Consumer groups
Having implemented a polling mechanism, it came to mind that I might have multiple concurrent consumers polling for messages. So I need to maintain a shared offset for all the consumers registered in a consumer group. So I modified my consumer contract.
type ConsumerRegistration struct {
    ApiContract
    TopicName    string `codec:"topic,string"`
    ConsumerName string `codec:"consumer,string"`
    Offset       uint64 `codec:"offset,uint64"`
}
We now allow consumers to name themselves, and this lets us allocate a new handle for each reading client, so two clients can read from the same file at different offsets.
func declareConsumer(consumerName string, store *EventStore) (string, error) {
    i := 0
    for {
        name := consumerName + "_" + strconv.Itoa(i)
        if exists := store.Get(name); exists == nil {
            return name, nil
        }
        i++
        if i > 1000 {
            return "", errors.New("too many consumers")
        }
    }
}
While a little hacky, and imposing an arbitrary limit of 1000 consumers per group per server, this generates a sequential consumer name for our event store by finding the first open gap in the range 0–1000. I have wondered if I could have a range-like coroutine that retains the global sequence, but I wanted to ensure the list was not exhausted after 1000 names total, only that no more than 1000 could exist concurrently.
11 09 2024 🔗
Addressing the handshake, I decided with some trepidation to use the Go context object to hold state while a handler is looping. The sequence is a little something like this:
sequenceDiagram
Client->>+Server: Open TCP Conn
Server-->Client: OK
loop connection
alt Communicates
Client->>Server: Register Producer
Server->>Client: ACK
Client->>Server: Publish Event
Server->>Client: ACK
else
Client->>Server: Disconnect
end
end
Server->>-Server: CLOSE CONNECTION

During this connection loop we retain a context stack with a timeout of 5 seconds. So each time we connect to the krappy server we have to establish why we are there, and that goroutine then waits until there is more data. Each message is handled by the same goroutine. The choice of using context was intentional, but I could have also stored that data outside the context in the closure formed by the goroutine.
The same process happens with consumer registration. I did want the connection to be as reusable as possible, though, so once a producer is registered that connection could be reused to register a different producer. I don't know if there is a use case for that, but without more reasons to isolate a context I followed this process.
10 09 2024 🔗
The first real learning here was about how Kafka deals with compacted topics. This project is blind to the formal implementation, so I looked for an algorithm designed to collapse a stream of events to its final value. I looked at a number of tree-like patterns that would allow me to collect all the events out of order and then refer to only the latest, but I stopped at the LSM (Log-Structured Merge) tree on Wikipedia, which I discovered was similar to the RocksDB implementation that Kafka Streams uses. I selected Pebble as it is based on the original LevelDB implementation I had used in a previous Erlang project. So it turns out getting a compacted topic is pretty easy, and as long as I can guarantee the write of the produced message I can guarantee that I have a top value.
The two reasons I selected Go for this project were its strong toolchain for concurrency and its speed. Go has great libraries for handling network connections and building servers, but that still left me to understand how best to build something that could handle massive throughput. I will set my benchmarks running on k8s in an x86-64 Linux environment.
But this project also has a lot to do with managing concurrent resources and mutexes, which Go does a pretty good job of too.
So the first release includes full stream retention and a compacted topic using LevelDB in memory.
I created a couple of e2e tools that allow me to hammer the server both as a producer and a consumer, but understanding the lifecycle of a connection is my biggest challenge.
What's ugly about this project is creating an API that requires multiple steps to complete an initial handshake.