Данная статья продолжает цикл моих переводов статей Jakob Jenkov об оптимизации Java приложений.
Кольцевой буфер - это массив, который используется в качестве очереди. Кольцевой буфер имеет позиции, которые отмечают следующую позицию для чтения и записи в кольцевой буфер. Когда позиция записи достигает конца массива, она возвращается к 0. То же самое верно и для чтения.
Установка позиции чтения и записи обратно на ноль, когда они достигают конца массива, также иногда называется "обтеканием". Именно такое поведение превращает массив в кольцевой буфер. Когда позиция чтения и записи достигает конца массива, они продолжаются с начала, как если бы массив был кольцом. Отсюда и название кольцевой буфер.
В этом руководстве будет объяснено, как работает кольцевой буфер, и показаны две реализации кольцевого буфера Java.
Как работает кольцевой буфер
Кольцевой буфер - это массив с фиксированным размером, содержащий элементы буфера, точно так же, как ограниченная очередь.
В дополнение к массиву с элементами, кольцевой буфер содержит позицию записи, которая указывает на позицию в массиве, в которую должен быть вставлен следующий элемент.
Кольцевой буфер также отслеживает позицию чтения - следующую позицию в массиве для чтения элемента.
Кольцевой буфер также отслеживает свободное и используемое пространство в массиве элементов. Когда позиция записи не вернулась в начало, сделав круг, используемое пространство находится между позицией записи и позицией чтения. Все остальное - свободное пространство. Приведенная ниже диаграмма иллюстрирует кольцевой буфер в этой ситуации:
После того как указатель записи сделал круг, ситуация иная. Внезапно свободное пространство - это пространство между позицией записи и позицией чтения. Все остальное - это используемое пространство. Вот диаграмма, иллюстрирующая тот же кольцевой буфер после прохода по кругу:
Существуют различные способы отслеживания позиции считывания (и, следовательно, используемого и свободного пространства) в кольцевом буфере. Позже в этом руководстве я объясню два различных способа реализации кольцевого буфера в Java.
Кольцевые буферы быстры
Кольцевые буферы - это быстрый способ реализации структур, похожих на очереди. Под "быстрыми" я подразумеваю как то, что они достаточно просты в реализации, так и то, что они работают довольно хорошо.
Использование кольцевых буферов
Кольцевые буферы полезны как для реальных очередей, таких как очередь сообщений, так и для создания данных, которые впоследствии потребляются потоковым способом.
Кольцевые буферы особенно полезны, когда вам нужна жесткая верхняя граница для того, сколько данных может находиться в кольцевом буфере (в очереди).
Если вам не нужна верхняя граница вашей структуры очереди, возможно, вам следует использовать связанный список или кольцевой буфер, который может изменять свой размер (выделять новый, больший массив, когда он заполнен, и перемещать все элементы в этот массив).
В этом руководстве я сосредоточусь только на реализациях ограниченного кольцевого буфера.
Реализации кольцевого буфера
Существует множество способов реализации кольцевых буферов. В этом руководстве по кольцевому буферу я покажу вам два самых простых метода реализации кольцевого буфера. Эти методы являются:
- Использовать коэффициент заполнения
- Использовать маркер переворота
Кольцевой буфер с коэффициентом заполнения
Один из способов отслеживать позицию записи, позицию чтения и количество элементов в кольцевом буфере - использовать переменные под них. Позиция записи отмечает следующую позицию в кольцевом буфере для записи элемента. Количество заполнений показывает, сколько элементов в данный момент хранится в буфере.
При записи элементов в кольцевой буфер ему просто нужно проверить заполнен он или нет. Если он не заполнен, элемент может быть вставлен в позицию записи, а позиция записи перемещена в следующий свободный слот.
Аналогично, при считывании элементов из кольцевого буфера ему просто нужно проверить количество заполнений, чтобы убедиться, что кольцевой буфер пуст. Позиция чтения вычисляется путем вычитания количества заполнений из позиции записи. Если результат этого вычисления отрицательный, позиция записи обошла массив вокруг, и вам нужно добавить размер буфера (массива) к позиции чтения, чтобы получить правильную позицию чтения.
Ниже приведена реализация кольцевого буфера, которая использует счетчик заполнения. Обратите внимание, что он явно не отслеживает позицию чтения, но вычисляет позицию чтения на основе позиции записи и количества заполнений. Обратите также внимание, что поле fill count публичное (не fillCount).
public class RingBufferFillCount {
public Object[] elements = null;
private int capacity = 0;
private int writePos = 0;
private int available = 0;
public RingBufferFillCount(int capacity) {
this.capacity = capacity;
this.elements = new Object[capacity];
}
public void reset() {
this.writePos = 0;
this.available = 0;
}
public int capacity() { return this.capacity; }
public int available(){ return this.available; }
public int remainingCapacity() {
return this.capacity - this.available;
}
public boolean put(Object element){
if(available < capacity){
if(writePos >= capacity){
writePos = 0;
}
elements[writePos] = element;
writePos++;
available++;
return true;
}
return false;
}
public Object take() {
if(available == 0){
return null;
}
int nextSlot = writePos - available;
if(nextSlot < 0){
nextSlot += capacity;
}
Object nextObj = elements[nextSlot];
available--;
return nextObj;
}
}
Кольцевой буфер с маркером переворота
Другой вариант отслеживания позиции чтения, позиции записи и количества элементов в буфере - это просто сохранить позицию чтения в дополнение к позиции записи.
Количество элементов в буфере вычисляется на основе позиции записи и чтения. То, как выглядит вычисление, зависит от того, была ли позиция записи перевернута или нет.
Если позиция записи не прошла круг, то можно просто вычесть позицию чтения из позиции записи, чтобы узнать, сколько элементов находится в буфере. Если позиция записи перевернулась, то доступное пространство равно емкости минус позиция чтения плюс позиция записи.
Чтобы отслеживать, была ли позиция записи перевернута или нет, используется специальный "маркер переворота". Вот откуда взялось название реализации. На самом деле, в большинстве случаев вы могли бы просто проверить, больше или меньше позиция записи, чем позиция чтения, чтобы определить, обернулась ли позиция записи. Но это не работает, когда позиция записи и позиция чтения равны (кольцевой буфер либо полностью заполнен, либо полностью пуст).
Ниже приведена реализация кольцевого буфера, который использует перевернутый маркер и позицию считывания.
public class RingBufferFlipMarker {
public Object[] elements = null;
public int capacity = 0;
public int writePos = 0;
public int readPos = 0;
public boolean flipped = false; //the flip marker
public RingBufferFlipMarker(int capacity) {
this.capacity = capacity;
this.elements = new Object[capacity];
}
public void reset() {
this.writePos = 0;
this.readPos = 0;
this.flipped = false;
}
public int available() {
if(!flipped){
return writePos - readPos;
}
return capacity - readPos + writePos;
}
public int remainingCapacity() {
if(!flipped){
return capacity - writePos;
}
return readPos - writePos;
}
public boolean put(Object element){
if(!flipped){
if(writePos == capacity){
writePos = 0;
flipped = true;
if(writePos < readPos){
elements[writePos++] = element;
return true;
} else {
return false;
}
} else {
elements[writePos++] = element;
return true;
}
} else {
if(writePos < readPos ){
elements[writePos++] = element;
return true;
} else {
return false;
}
}
}
public Object take() {
if(!flipped){
if(readPos < writePos){
return elements[readPos++];
} else {
return null;
}
} else {
if(readPos == capacity){
readPos = 0;
flipped = false;
if(readPos < writePos){
return elements[readPos++];
} else {
return null;
}
} else {
return elements[readPos++];
}
}
}
}
Скорость кольцевого буфера
Мои тесты показали, что кольцевой буфер, использующий счетчик заполнения, работает немного быстрее, чем кольцевой буфер, использующий флип-маркер. Но - разница настолько мала, что почти незначительна.
Пакетные режимы
Я реализовал операции пакетного режима put() и take() для обеих реализаций кольцевого буфера. Реализации с пакетными операциями перечислены далее в этом руководстве. Ввод и получение нескольких элементов одновременно происходит значительно быстрее, чем ввод и получение одного элемента за раз.
Мои тесты показали, что пакетные операции put() и take() обеспечивают до 4 раз большую пропускную способность при одновременном вводе и извлечении одного элемента. Точное количество зависит от размера партии. Меньшие пакеты обеспечивают более высокую пропускную способность, чем меньшие пакеты, поскольку на сжатые циклы копирования массива тратится больше времени.
Пакетные операции реализации кольцевого буфера с использованием маркера позиции чтения + переворота были примерно на 15% быстрее, чем пакетные операции кольцевого буфера с использованием подсчета заполнения.
Параллелизм
Ни одна из двух реализаций, показанных выше, не является потокобезопасной.
У меня сложилось впечатление, что реализацию с использованием позиции чтения и перевернутого маркера проще реализовать в потокобезопасной версии для случая single reader, single writer. Вариант с одним считывателем и одной записью означает, что только один поток помещает элементы в кольцевой буфер. И я имею в виду только один поток вообще, а не одновременно. Только один поток. Тот же принцип применим и к потоку записи. Только один и тот же поток записывает данные в кольцевой буфер. Однако поток чтения не обязательно должен совпадать с потоком записи.
Я еще не реализовал ни одну версию кольцевого буфера с одним считывателем, ни с одной записью, так что на самом деле я не знаю. Я обновлю эту статью, если когда-нибудь сделаю это.
Кольцевой буфер с подсчетом заполнения. Включая пакетные операции
Ниже реализация кольцевого буфера, который использует счетчик заполнения, включая пакетные операции put() и take().
public class RingBufferFillCount {
public Object[] elements = null;
public int capacity = 0;
public int writePos = 0;
public int available = 0;
public RingBufferFillCount(int capacity) {
this.capacity = capacity;
this.elements = new Object[capacity];
}
public void reset() {
this.writePos = 0;
this.available = 0;
}
public int remainingCapacity() {
return this.capacity - this.available;
}
public boolean put(Object element){
if(available < capacity){
if(writePos >= capacity){
writePos = 0;
}
elements[writePos] = element;
writePos++;
available++;
return true;
}
return false;
}
public int put(Object[] newElements){
return put(newElements, newElements.length);
}
public int put(Object[] newElements, int length){
int readPos = 0;
if(this.writePos > this.available){
if(length <= this.capacity - this.writePos){
for(; readPos < length; readPos++){
this.elements[this.writePos++] = newElements[readPos];
}
this.available += readPos;
return length;
} else {
int lastEmptyPos = writePos - available;
for(; this.writePos < this.capacity; this.writePos++){
this.elements[this.writePos] = newElements[readPos++];
}
this.writePos = 0;
int endPos = Math.min(length - readPos, capacity - available - readPos);
for(;this.writePos < endPos; this.writePos++){
this.elements[this.writePos] = newElements[readPos++];
}
this.available += readPos;
return readPos;
}
} else {
int endPos = this.capacity - this.available + this.writePos;
for(; this.writePos < endPos; this.writePos++){
this.elements[this.writePos] = newElements[readPos++];
}
this.available += readPos;
return readPos;
}
}
public Object take() {
if(available == 0){
return null;
}
int nextSlot = writePos - available;
if(nextSlot < 0){
nextSlot += capacity;
}
Object nextObj = elements[nextSlot];
available--;
return nextObj;
}
public int take(Object[] into){
return take(into, into.length);
}
public int take(Object[] into, int length){
int intoPos = 0;
if(available <= writePos){
int nextPos= writePos - available;
int endPos = nextPos + Math.min(available, length);
for(;nextPos < endPos; nextPos++){
into[intoPos++] = this.elements[nextPos];
}
this.available -= intoPos;
return intoPos;
} else {
int nextPos = writePos - available + capacity;
int leftInTop = capacity - nextPos;
if(length <= leftInTop){
//copy directly
for(; intoPos < length; intoPos++){
into[intoPos] = this.elements[nextPos++];
}
this.available -= length;
return length;
} else {
//copy top
for(; nextPos < capacity; nextPos++){
into[intoPos++] = this.elements[nextPos];
}
nextPos = 0;
int leftToCopy = length - intoPos;
int endPos = Math.min(writePos, leftToCopy);
for(;nextPos < endPos; nextPos++){
into[intoPos++] = this.elements[nextPos];
}
this.available -= intoPos;
return intoPos;
}
}
}
}
Кольцевой буфер с использованием маркера переворота. Включая пакетные операции
Ниже реализация кольцевого буфера, который использует позицию чтения и маркер переворота, включая пакетные операции put() и take().
public class RingBufferFlip {
public Object[] elements = null;
public int capacity = 0;
public int writePos = 0;
public int readPos = 0;
public boolean flipped = false;
public RingBufferFlip(int capacity) {
this.capacity = capacity;
this.elements = new Object[capacity];
}
public void reset() {
this.writePos = 0;
this.readPos = 0;
this.flipped = false;
}
public int available() {
if(!flipped){
return writePos - readPos;
}
return capacity - readPos + writePos;
}
public int remainingCapacity() {
if(!flipped){
return capacity - writePos;
}
return readPos - writePos;
}
public boolean put(Object element){
if(!flipped){
if(writePos == capacity){
writePos = 0;
flipped = true;
if(writePos < readPos){
elements[writePos++] = element;
return true;
} else {
return false;
}
} else {
elements[writePos++] = element;
return true;
}
} else {
if(writePos < readPos ){
elements[writePos++] = element;
return true;
} else {
return false;
}
}
}
public int put(Object[] newElements, int length){
int newElementsReadPos = 0;
if(!flipped){
//readPos lower than writePos - free sections are:
//1) from writePos to capacity
//2) from 0 to readPos
if(length <= capacity - writePos){
//new elements fit into top of elements array - copy directly
for(; newElementsReadPos < length; newElementsReadPos++){
this.elements[this.writePos++] = newElements[newElementsReadPos];
}
return newElementsReadPos;
} else {
//new elements must be divided between top and bottom of elements array
//writing to top
for(;this.writePos < capacity; this.writePos++){
this.elements[this.writePos] = newElements[newElementsReadPos++];
}
//writing to bottom
this.writePos = 0;
this.flipped = true;
int endPos = Math.min(this.readPos, length - newElementsReadPos);
for(; this.writePos < endPos; this.writePos++){
this.elements[writePos] = newElements[newElementsReadPos++];
}
return newElementsReadPos;
}
} else {
//readPos higher than writePos - free sections are:
//1) from writePos to readPos
int endPos = Math.min(this.readPos, this.writePos + length);
for(; this.writePos < endPos; this.writePos++){
this.elements[this.writePos] = newElements[newElementsReadPos++];
}
return newElementsReadPos;
}
}
public Object take() {
if(!flipped){
if(readPos < writePos){
return elements[readPos++];
} else {
return null;
}
} else {
if(readPos == capacity){
readPos = 0;
flipped = false;
if(readPos < writePos){
return elements[readPos++];
} else {
return null;
}
} else {
return elements[readPos++];
}
}
}
public int take(Object[] into, int length){
int intoWritePos = 0;
if(!flipped){
//writePos higher than readPos - available section is writePos - readPos
int endPos = Math.min(this.writePos, this.readPos + length);
for(; this.readPos < endPos; this.readPos++){
into[intoWritePos++] = this.elements[this.readPos];
}
return intoWritePos;
} else {
//readPos higher than writePos - available sections are
//top + bottom of elements array
if(length <= capacity - readPos){
//length is lower than the elements available at the top
//of the elements array - copy directly
for(; intoWritePos < length; intoWritePos++){
into[intoWritePos] = this.elements[this.readPos++];
}
return intoWritePos;
} else {
//length is higher than elements available at the top of the elements array
//split copy into a copy from both top and bottom of elements array.
//copy from top
for(; this.readPos < capacity; this.readPos++){
into[intoWritePos++] = this.elements[this.readPos];
}
//copy from bottom
this.readPos = 0;
this.flipped = false;
int endPos = Math.min(this.writePos, length - intoWritePos);
for(; this.readPos < endPos; this.readPos++){
into[intoWritePos++] = this.elements[this.readPos];
}
return intoWritePos;
}
}
}
}
#code #java #performance #ring buffer