Many questions come to mind with any message broker. One of them is what happens to a message once it is consumed.
This is where offset management comes in. Kafka does not delete a message when it is read; the message stays in the log until retention removes it, and Kafka tracks each consumer group's position in a partition as an offset. When auto-commit is disabled, we have to commit the offset manually. By committing the offset, the consumer acknowledges that it has successfully processed the message. This ensures that the consumer will not reprocess the same message upon restart.
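A minimal sketch of manual committing, assuming the kafka-python client (the auto_offset_reset spelling used in this post suggests it). The topic name, group id, broker address and the handle callback are hypothetical placeholders, not the values from my project.

```python
def build_consumer(topic="stock-prices", servers="localhost:9092"):
    """Create a consumer with auto-commit disabled (all names are placeholders)."""
    from kafka import KafkaConsumer  # kafka-python, imported lazily

    return KafkaConsumer(
        topic,
        bootstrap_servers=servers,
        group_id="stats-consumer",
        enable_auto_commit=False,  # offsets are committed by hand below
        auto_offset_reset="latest",
    )


def process_batch(consumer, handle, timeout_ms=1000):
    """Poll once, process every record, then commit the consumed offsets."""
    batches = consumer.poll(timeout_ms=timeout_ms)  # {TopicPartition: [records]}
    processed = 0
    for records in batches.values():
        for record in records:
            handle(record)
            processed += 1
    if processed:
        # Commit only after processing succeeded, so a crash before this
        # line leads to redelivery rather than a silently skipped message.
        consumer.commit()
    return processed
```

Committing after processing gives at-least-once behaviour: a message may be seen twice after a crash, but it is not lost.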
But I had to reprocess the data to show the statistics in rotation, so I implemented the following.
Initial Setup:
- The consumer is set up with auto_offset_reset="latest" to start from the latest message.
- The initial seek operation to the last 5 minutes is done within the seek_to_latest_five_minutes method.
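The seek to "5 minutes ago" can be sketched roughly like this. It assumes a kafka-python consumer and uses its offsets_for_times, seek and seek_to_end calls; the body is an illustration of the idea, not a copy of my seek_to_latest_five_minutes method, and the now_ms parameter is added only to make it easy to try out.

```python
import time

FIVE_MINUTES_MS = 5 * 60 * 1000


def seek_to_latest_five_minutes(consumer, now_ms=None):
    """Move every assigned partition to the first offset from the last 5 minutes."""
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    cutoff_ms = now_ms - FIVE_MINUTES_MS

    partitions = list(consumer.assignment())
    # offsets_for_times maps each partition to the earliest offset whose
    # timestamp is >= cutoff_ms, or to None when no such message exists.
    found = consumer.offsets_for_times({tp: cutoff_ms for tp in partitions})

    for tp in partitions:
        hit = found.get(tp)
        if hit is None:
            consumer.seek_to_end(tp)  # nothing recent: wait for new messages
        else:
            consumer.seek(tp, hit.offset)
    return cutoff_ms
```

When the consumer uses subscribe(), the assignment is empty until the first poll() has run, so the seek has to happen after that.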
Looping Logic:
- The consume_messages method regularly checks if the timeout period (timeout_seconds) has passed.
- If the timeout is reached, the consumer seeks to the offset corresponding to 5 minutes ago.
- The consumer then processes messages within the loop.
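The loop above could look something like the following sketch. The reseek and handle callbacks, and the max_polls and clock parameters, are hypothetical additions so that the function can be run without a broker; the real method runs until it is stopped.

```python
import time


def consume_messages(consumer, handle, reseek, timeout_seconds=300,
                     max_polls=None, clock=time.monotonic):
    """Poll in a loop and call reseek(consumer) whenever the timeout elapses."""
    last_seek = clock()
    polls = 0
    while max_polls is None or polls < max_polls:
        if clock() - last_seek >= timeout_seconds:
            reseek(consumer)  # jump back to the start of the 5-minute window
            last_seek = clock()
        batches = consumer.poll(timeout_ms=1000)  # {TopicPartition: [records]}
        for records in batches.values():
            for record in records:
                handle(record)
        polls += 1
    return polls
```

Using a monotonic clock for the timeout avoids surprises if the system clock is adjusted while the consumer is running.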
Message Filtering:
- Messages older than 5 minutes are filtered out using the is_last_five_minutes method.
- If an older message is encountered, the processing loop breaks, ensuring that only recent messages are processed.
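A small sketch of the filter, assuming each record exposes a timestamp attribute in epoch milliseconds (as kafka-python's ConsumerRecord does). The take_recent helper is a hypothetical name used here only to show the break.

```python
import time

WINDOW_MS = 5 * 60 * 1000


def is_last_five_minutes(timestamp_ms, now_ms=None):
    """Return True if a timestamp (epoch ms) falls within the last 5 minutes."""
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    return now_ms - timestamp_ms <= WINDOW_MS


def take_recent(records, now_ms=None):
    """Collect records until the first one older than the window, then stop."""
    recent = []
    for record in records:
        if not is_last_five_minutes(record.timestamp, now_ms):
            break  # stop at the first old message, as described above
        recent.append(record)
    return recent
```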
Re-seeking:
- The seek_to_latest_five_minutes method is called to adjust the offset to the last 5 minutes’ worth of messages.
Here is a video of how it rotates.
This is how we ensure the consumer only processes messages produced in the last 5 minutes and keeps looping through them without starting from the beginning: it continually re-seeks to the offset of the first message within the last 5 minutes.
If no new messages were produced in the last five minutes, the consumer may appear to hang until new messages arrive. But I wanted to show messages even when the producer is not producing, so I had to use a cache system.
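The cache idea, in its simplest form, could be sketched like this. The RecentMessageCache class and messages_to_show function are hypothetical names for illustration and assume an in-memory cache; my actual cache logic is still rough, as noted further down.

```python
from collections import deque


class RecentMessageCache:
    """Keep the most recent messages so they can be replayed when the topic is idle."""

    def __init__(self, max_items=1000):
        self._items = deque(maxlen=max_items)  # oldest entries drop off first

    def add(self, message):
        self._items.append(message)

    def replay(self):
        return list(self._items)


def messages_to_show(polled, cache):
    """Return fresh messages if there are any, otherwise the cached ones."""
    if polled:
        for message in polled:
            cache.add(message)
        return list(polled)
    return cache.replay()
```

With this shape the UI always has something to display: fresh messages when the producer is active, and the last known ones when it is not.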
There are lots of things that are involved here.
- Aggregating symbols, as the finance literature calls the identifiers of stocks listed on the exchanges
- Used UK and US stock exchange data
- Dropped Finnhub because storing historic data for the last year was a premium feature
- Used an incredible library for Yahoo Finance data called yfinance
- It provides data in DataFrame format, which I transformed into the format for Kafka
- Tweaked the producer and introduced recycling of the threads to achieve more efficiency
- Fetching the last 5 minutes of data in the consumer and discarding older messages
- Also configured the consumer so that if a timeout happens, or if the producer is not producing anymore, it reprocesses the existing data (this part was time-consuming)
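As an illustration of the DataFrame-to-Kafka step, here is a rough sketch. It assumes a frame shaped like the one returned by yfinance.Ticker(symbol).history(...), with a datetime index and Close and Volume columns; the payload field names are hypothetical and not the exact format I used.

```python
import json


def frame_to_messages(frame, symbol):
    """Turn each row of a price DataFrame into (key, value) bytes for a producer."""
    messages = []
    for index, row in frame.iterrows():
        payload = {
            "symbol": symbol,
            "timestamp_ms": int(index.timestamp() * 1000),
            "close": float(row["Close"]),
            "volume": int(row["Volume"]),
        }
        key = symbol.encode("utf-8")  # same key keeps a symbol on one partition
        value = json.dumps(payload).encode("utf-8")
        messages.append((key, value))
    return messages
```

Each pair can then be passed to a producer's send call as key and value.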
What else?
I am still skeptical of the log processing part. It picks up from the last committed message and does not go to the very beginning, but still.
And the cache system should be more robust at reprocessing the messages when the producer is not producing or the last partition is reached. The logic needs to be revised for this part. Maybe today is not the day to achieve this. Who can fight with pre-written destiny?
Time will teach me what I want to achieve. The next step is to find a way in the consumer service to integrate the stored historic data with the realtime data so that a trading algorithm can be implemented.
Should I integrate a Redis cache to store the latest price value and compare it with the historic data in order to give a buy or sell, or profit or loss, decision?
How do I keep the latest data? I think I could implement the same in the consumer handler and add a delay, so that Kafka is fetched with a delay of 5 minutes and the results are displayed in the UI for the user to make a decision.
Kafka had me so preoccupied that I only just realized I was keeping records from only one London exchange and one US exchange. I already have to grab the statistics for 10 symbols for better decisions. In real life this is going to be more difficult, as one needs to fetch data from multiple sources.
Again, I will stop for today and pick up this work later. It's time to enjoy biriyani.