Circular Buffer or Ring Buffer is an interesting data structure, that I honestly didn't have the chance or the need to use before. Until, I decided to connect to LMAX and start getting their feed in real time. It's a quite known problem in computer science literature, the thing that you learn in your first algorithm course, The Producer-Consumer problem. Using a semaphore or a blocking queue or concurrent queue.. will do the trick, but I wanted to have a fast producer and a fast consumer without blocking, controlling the memory size and reducing CPU usage.
Here I am searching the web, searching in my library if I missed any interesting data structure for my case, even s The Algorithm Design Manual
by Steven S. Skiena that I consider a must have, didn't find the answer. After sometimes a friend of mine told me about a presentation How to do 100k TPS less than 1ms Latency my surprise was is that the same people that I want to connect to their system already solved this problem. Great presentation, Martin Thompson talks about "Mechanical Harmony" and how they did achieve it in LMAX, one of the interesting point that I tried later for my case was the single, fixed-sized buffer. Looking at few implementation online, from the C++ wikipedia exemple to one academic implementation of Neal R. Wagner.
After few Unit test where I varied Producer-Consumer speed, I found that sometimes the consumer read empty values where it shouldn't. Looking again the dequeue method
Here I am searching the web, searching in my library if I missed any interesting data structure for my case, even s The Algorithm Design Manual
After few Unit test where I varied Producer-Consumer speed, I found that sometimes the consumer read empty values where it shouldn't. Looking again the dequeue method
public T dequeue() { if (!isEmpty()) { readCount++; size--; head = (head + 1) % maxSize; T result = queue[head]; queue[head] = null; return result; } else { return null; } }There is a chance that we read the head before writing it even if we did pass the condition that the queue is not empty. Therefore I added king of double check, but without the locking, see below.
import java.util.concurrent.atomic.AtomicInteger; public class CircularBuffer{ private final int maxSize; private volatile int head = 0; private volatile int readCount = 0; private volatile int writeCount = 0; private volatile int tail = 0; private final AtomicInteger size = new AtomicInteger(0); private final T[] queue; @SuppressWarnings("unchecked") public CircularBuffer(int size) { this.maxSize = size; this.queue = (T[]) new Object[this.maxSize]; } public int getReadCount() { return readCount; } public int getWriteCount() { return writeCount; } public int getSize() { return size.get(); } public T dequeue() { if (!isEmpty()) { int tmp = (head + 1) % maxSize; T result = queue[tmp]; if (result == null) { return null; } readCount++; size.decrementAndGet(); head = tmp; queue[head] = null; return result; } else { return null; } } @Override public String toString() { return "Write #" + writeCount + " Read #" + readCount + " Size #" + size.toString(); } public void enqueue(T element) { if (!isFull()) { writeCount++; size.incrementAndGet(); tail = (tail + 1) % maxSize; queue[tail] = element; } else { throw new RuntimeException(" Circular Buffer is Full"); } } public boolean isEmpty() { return size.get() == 0; } public boolean isFull() { return maxSize == size.get(); } public synchronized String printStatus(javascript:void(0)) { StringBuffer sb = new StringBuffer(); for (int i = 0; i < maxSize; i++) { if (queue[i] != null) sb.append("\nqueue[" + i + "]=" + queue[i]); } return sb.toString(); } }
No comments:
Post a Comment