Перейти к содержанию
    

Я сделал такую реализацию.

typedef enum CycBuffState {cbsOk = 0, cbsOverwrite = 1, cbsBuffFull = 2, cbsBuffEmpty = 3, cbsError = 4} enCycBuffState;

typedef struct
{
uint32_t MaxBufSize,
HeadIdx,
TailIdx;
       enCycBuffState State;
uint8_t *Buffer;
} stRingBuffer;

void RingBuf_Init (stRingBuffer *ring_buf, uint8_t *buf, uint32_t max_size)
{
 ring_buf->Buffer = buf;
 ring_buf->MaxBufSize = max_size;
 ring_buf->HeadIdx = ring_buf->TailIdx = 0;
 ring_buf->State = cbsBuffEmpty;
}

enCycBuffState RingBuf_PushByte(stRingBuffer *ring_buf, uint8_t val, uint8_t allow_overwrite)
{
 if ((ring_buf->HeadIdx+1) == ring_buf->TailIdx)
 {
   if (!allow_overwrite)
     return cbsBuffFull;
 }

 ring_buf->Buffer[ring_buf->HeadIdx] = val;
 ring_buf->HeadIdx++;

 if (ring_buf->HeadIdx >= ring_buf->MaxBufSize)
   ring_buf->HeadIdx = 0;

 return cbsOk;
}

enCycBuffState RingBuf_PushBytes(stRingBuffer *ring_buf, uint8_t len, uint8_t *buf, uint8_t allow_overwrite)
{
 uint32_t free_space;

 if (ring_buf->HeadIdx == ring_buf->TailIdx)
 {
   free_space = ring_buf->MaxBufSize;
 }
 else
 {
   if (ring_buf->HeadIdx >= ring_buf->TailIdx)
      free_space = ring_buf->HeadIdx - ring_buf->TailIdx;
    else
      free_space = (ring_buf->MaxBufSize - ring_buf->TailIdx) + ring_buf->HeadIdx;

   if (free_space < len)
     return cbsBuffFull;
 }
 while (len)
 {
    RingBuf_PushByte(ring_buf, *buf, allow_overwrite);
    buf++;
    len--;
 }

 return cbsOk;
}

enCycBuffState RingBuf_PopByte(stRingBuffer *ring_buf, uint8_t *val)
{
 if (ring_buf->HeadIdx == ring_buf->TailIdx)
   return cbsError;

 *val = ring_buf->Buffer[ring_buf->TailIdx];
 ring_buf->Buffer[ring_buf->TailIdx] = 0;
 ring_buf->TailIdx++;

 if (ring_buf->TailIdx >= ring_buf->MaxBufSize)
   ring_buf->TailIdx = 0;

 return cbsOk;
}

enCycBuffState RingBuf_PopBytes(stRingBuffer *ring_buf, uint8_t len, uint8_t *val)
{
 while (len)
 {
   RingBuf_PopByte(ring_buf, val);
   val++;
   len--;
 }

 return cbsOk;
}

 

И потом проверяю (в IAR, в симуляторе)

#include <string.h>
#include "ringbuf.h"

stRingBuffer ring_buf;

uint8_t *temp_buff;
uint8_t *test_buff = "Hello World";
uint8_t *read_buff;

int main()
{
  RingBuf_Init (&ring_buf, temp_buff, 32);
  while (1)
  {
     RingBuf_PushBytes(&ring_buf, strlen(test_buff), test_buff, 0);
     
     RingBuf_PopBytes(&ring_buf, strlen(test_buff), read_buff);
  }
}

Но несмотря на то что я отслеживаю наличие свободного места.

uint32_t free_space;
  
  if (ring_buf->HeadIdx == ring_buf->TailIdx)
  {
    free_space = ring_buf->MaxBufSize;
  }
  else
  {
    if (ring_buf->HeadIdx >= ring_buf->TailIdx)
       free_space = ring_buf->HeadIdx - ring_buf->TailIdx;
     else
       free_space = (ring_buf->MaxBufSize - ring_buf->TailIdx) + ring_buf->HeadIdx;
  
    if (free_space < len)
      return cbsBuffFull;

В отладчике я вижу перезапись буфера. Что я упустил?

Изменено пользователем Jenya7

Поделиться сообщением


Ссылка на сообщение
Поделиться на другие сайты

Много чего, там и переполнение и... проще показать. Вот вам пример снизу, немного адаптировал из другой программы. Самая высокоуровневая функция - это Write.

 

uint32_t BufferFreeSize()
{
if(RdIdx > WrIdx)
	return RdIdx - WrIdx;
else
	return (BUFFER_SIZE - WrIdx) + RdIdx;
}

bool CheckOverEndIdx(uint32_t& pIdx)
{
if(pIdx > BUFFER_SIZE)
{
	pIdx -= (BUFFER_SIZE - 1);

	return true;
}
else
{
	return false;
}
}

void PushBytes_Internal(const uint8_t* const pDataBlock, const uint16_t pBlockSize)
{
uint16_t bw = 0;// количество записанных байт
uint32_t TempWrIdx = WrIdx;// берём текущее значение индекса записи
Service::RetVal rv;

do
{
	uint16_t btw;// количество байт, которые необходимо записать

	CheckOverEndIdx(TempWrIdx);
	if((TempWrIdx + pBlockSize - bw) > BUFFER_SIZE)
		btw = BUFFER_SIZE - TempWrIdx + 1;
	else
		btw = pBlockSize - bw;

	for(uint32_t idx = TempWrIdx, idx_end = idx + btw; idx < idx_end; idx++) BufferData[idx] = pDataBlock[bw++];

	TempWrIdx += btw;
	CheckOverEndIdx(TempWrIdx);
} while(bw < pBlockSize);

WrIdx = TempWrIdx;
}

bool Write(const uint8_t* const pDataBlock, const uint16_t pBlockSize)
{
if(pBlockSize > BUFFER_SIZE) return false;

if(BufferFreeSize() < (pBlockSize + 1))
{
	RdIdx += pBlockSize;
	CheckOverEndIdx(RdIdx);
}

// записываем блок данных
PushBytes_Internal(WrIdx, mNPQPack, pBlockSize, &WrIdx);

return false;
}

Поделиться сообщением


Ссылка на сообщение
Поделиться на другие сайты

Много чего, там и переполнение и... проще показать. Вот вам пример снизу, немного адаптировал из другой программы. Самая высокоуровневая функция - это Write.

Но вы все таки разрешаете запись, освобождая место. А я думаю блокировать запись пока данные не будут считаны.

Поделиться сообщением


Ссылка на сообщение
Поделиться на другие сайты

uint8_t *temp_buff;
uint8_t *read_buff;

Ну и куда они в результате смотрят?

Сделайте так -

uint8_t temp_buff[32];
uint8_t read_buff[32];

 

Поделиться сообщением


Ссылка на сообщение
Поделиться на другие сайты

uint8_t *temp_buff;
uint8_t *read_buff;

Ну и куда они в результате смотрят?

Сделайте так -

uint8_t temp_buff[32];
uint8_t read_buff[32];

да. спасибо. указатели нужно конечно инициализировать.

Поделиться сообщением


Ссылка на сообщение
Поделиться на другие сайты

Но вы все таки разрешаете запись, освобождая место. А я думаю блокировать запись пока данные не будут считаны.

 

if(BufferFreeSize() < (pBlockSize + 1))

{

RdIdx += pBlockSize;

CheckOverEndIdx(RdIdx);

}

 

заменить на

 

if(BufferFreeSize() < (pBlockSize + 1)) return;

Поделиться сообщением


Ссылка на сообщение
Поделиться на другие сайты

Чет мудреные какие-то реализации. Так не проще?

template  <class T> 
class RingBuf
{
public:
RingBuf (unsigned int size) : buf(new T[size]), head(0), sz_contains(0), SZ_MAX(size){};	
~RingBuf() {
	delete [] buf;
}
bool Pop (T &val)
{
	if (sz_contains == 0)
		return false;
	else
	{
		val = buf[head++]; head %= SZ_MAX; sz_contains--;
		return true;
	}		
}
bool Push (T val)
{
	if (!GetFree())
		return false;
	else
	{			
		buf[(head + sz_contains++) % SZ_MAX] = val;			
		return true;
	}	
}
bool Push (T val[] , unsigned int count)
{
	if (count > GetFree())
		return false;
	for (unsigned int i = 0; i < count; i++)		
		Push(val[i]);				
	return true;
}
void Clear() {sz_contains = 0;}
unsigned int GetFree () {return SZ_MAX - sz_contains;}

private:
unsigned int sz_contains;
const unsigned int SZ_MAX;
T *buf;
unsigned int head;	
};

Поделиться сообщением


Ссылка на сообщение
Поделиться на другие сайты

Чет мудреные какие-то реализации. Так не проще?

Нет проверки влезет ли записываемый кусок в свободное место.

 

Поделиться сообщением


Ссылка на сообщение
Поделиться на другие сайты

Нет проверки влезет ли записываемый кусок в свободное место.

У меня поэлементый метод только накидал, счас блок сделаем

Вот

	bool Push (T val[] , unsigned int count)
{
	if (count > GetFree())
		return false;
	for (unsigned int i = 0; i < count; i++)
	{
		Push(val[i]);		
	}
	return true;
}

Использование тривиально

	const int SZ = 4;
RingBuf<int> r(SZ);
int ar[] = {1,2,3};
bool mres = r.Push(ar, sizeof(ar)/sizeof(ar[0])); // несколько элементов	
printf ("push %s  \n", mres ? "OK  ": "FULL  ");
       r.Push(val); // один элемент

Pop для нескольких элементов - аналогично

Поделиться сообщением


Ссылка на сообщение
Поделиться на другие сайты

buf[(head + sz_contains++) % SZ_MAX] = val;

угроза переполнения "sz_contains".

Откуда ? После 2^32 push??

Поделиться сообщением


Ссылка на сообщение
Поделиться на другие сайты

После 2^32 push?

я так на волне паранойи и подумал. Но при внимательном рассмотрении - SZ_MAX имеет такой же тип, что и sz_contains.

 

 

Поделиться сообщением


Ссылка на сообщение
Поделиться на другие сайты

Хм. Зачем все так сложно мудрить, если есть уже давно простейшая рабочая реализация прописанная в букварях по программированию, остается ее только портировать на свой язык.

При чем тут еще от задачи зависит. Если это SPSC, тогда вообще все просто и без блокировок. Если MPSC/MPMC/SPMC - нужны уже блокировки.

Приведу простейший пример SPSC, написан был мной лет 8 назад(то есть просто реализован на ц++ алгоритм из букваря) и очень часто используется по сей день везде(драйверы, очередя сообщений, итд итп)

template<class T, int Size> class Fifo_spsc{
public:
static_assert((Size&(Size-1)) == 0, "Size must be power of 2");
Fifo_spsc(){
	rx = 0;
	wx = 0;
}

// Producer
bool push(const T &v){
	uint32_t w = (wx+1)&(Size-1);
	if(w==rx)return false; //ERR_FIFO_FULL;
	data[wx] = v;
	__DMB(); // just a memory barrier - ensure data has been completely stored
	wx = w; // make item available for reading
	return true;
}

// Consumer
bool pop(T *v){
	uint32_t r = rx;
	if(r==wx)return false; //ERR_FIFO_EMPTY;
	*v = data[r];
	__DMB(); // ensure data has been completely loaded
	rx = (r+1)&(Size-1); // make item available for writing
	return true;
}

// Consumer
void flush(){
	__DMB();
	rx = wx;
}

protected:
uint16_t rx, wx;
T data[size];
};

Памяти занимает - собственно сами данные + 2 16-битных слова.

Хоть и c++, что позволяет использовать этот буфер для любых типов - как простого uint_8t, так и для сложных обьектов. Это базовая реализация для практически всех других более сложных очередей.

Один элемент всегда пустой - это делает код гораздо проще и быстрее, и как ни странно зачастую - меньше требовательным к памяти :)

 

Также, если у нас строго один писатель и один читатель, то можно сделать и чтение не по одному элементу, а сразу массивом.

// Consumer, use with caution
const T* getConsecutiveData_ptr()const{
	if(rx==wx)return 0; // empty
	return &data[rx];
}

int getConsecutiveData_len()const{
	if(wx>rx)return wx-rx;
	else return Size-rx;
}

void popConsecutiveData(unsigned n){
	__DMB();
	rx = (rx+n)&(Size-1);
}

Важно понимать, что связь между producer и consumer должна быть только через функции этого буфера, без каких либо других переменных, вызовов функций итд в обход этого FIFO. Иначе однозначно рано или поздно схватите гонку.

То есть он подходит, например когда producer - это прерывание, а consumer - вечный цикл. Что для большинства задач достаточно.

 

Если у нас еvent-driven-движок (то есть без вечных циклов, все на событиях) - тогда нужна более сложная реализация. Но использовать нужно опять же только ее, без всяких сторонних связей.

Если интересно - расскажу и покажу как, я на этих циклических буферах(в том числе с обьектами не фиксированного размера) уже собаку сьел за много лет, и в своих программах использую только их - как самый простой, эффективный и безопасный интерфейс для связи между обьектами.

 

Но это все C++, зато позволяет эффективно и безопасно работать и писать очень простой код.

Например. Прерывание uart

void U0RxCbk::readyRead(LPC_UART_TypeDef *uart){ // interrupt
    while(Uart::isCharAvailable(uart)){
        bool r = rxfifo.push(Uart::read_char(uart));
        if(!r){
            printf("U0RxCbk::readyRead(): rxfifo full!\n");
        }
    }
}

Приемник - просто функция, которая будет вызвана, когда на это появится время(в порядке приоритета и очереди).

Которая в свою очередь принимает текст NMEA и дальше парсит его.

void Gps::start_receiver(){
    // Устанавливаем обработчик события
    rxfifo.set_onReadyRead([this](){ // собственно сам обработчик
        uint8_t ch;
        while(true){
            bool r = rxfifo.pop(&ch); // читаем из очереди
            parse(ch); // парсим
            if(!r)break; // если очередь стала пустой - выходим
        }
    }, GPS_TASK_PRIORITY); // приоритет обработчика
}

 

Это более старый вариант, привел, чтобы понять как оно работает, скажем так, внутри.

Сейчас я пользуюсь в основном таким:

class GpsReceiver : public UmFifoSingleReceiver<uint8_t, GpsReceiver, GPS_TASK_PRIORITY>{
protected:
    void onItemReceived(uint8_t ch){
        parse(ch);
    }
};

То есть вся логика чтения из буфера реализована один раз отдельно и используется повторно сколько угодно раз.

Так же есть аналогичные реализации, когда приемник может обрабатывать не один элемент - а массив из нескольких(например отправка через DMA), физически - подряд лежащих в нашем ring-buffere.

Overhed-a нет, наоборот такая реализация на шаблонах работает быстрее, чем обычные сишные :) Большинство работы делает компилятор, там даже вызова функции не будет - компилятор функцию parse встроит в недра самой очереди.

Поделиться сообщением


Ссылка на сообщение
Поделиться на другие сайты

самое первое (а там и дальше так)

uint32_t w = (wx+1)&(Size-1);

это не кольцевой буфер, а обычный.

 

Поделиться сообщением


Ссылка на сообщение
Поделиться на другие сайты

Присоединяйтесь к обсуждению

Вы можете написать сейчас и зарегистрироваться позже. Если у вас есть аккаунт, авторизуйтесь, чтобы опубликовать от имени своего аккаунта.

Гость
Ответить в этой теме...

×   Вставлено с форматированием.   Вставить как обычный текст

  Разрешено использовать не более 75 эмодзи.

×   Ваша ссылка была автоматически встроена.   Отображать как обычную ссылку

×   Ваш предыдущий контент был восстановлен.   Очистить редактор

×   Вы не можете вставлять изображения напрямую. Загружайте или вставляйте изображения по ссылке.

×
×
  • Создать...