Перейти к содержанию
    

Кольцевая очередь: мешать ли все в кучу?

Приветствую!

Из названия, как обычно, нифига не понятно; сейчас объясню.

При разработке ПО под МК или другие процессорные девайсы я часто пользуюсь очередьми. Обычные программные FIFO без наворотов и изысков. Все, что медленное (оносительно скорости процессора) - летит через очередь. Например, принты по отладочному UART летят через очередь, всякие транзакции между SPI-часами тоже через нее, чаще всего, прокладываю (конечно же, очереди разные). Короче, там где можно - очередь. Получается некая эмуляция доступа к букету низкоскоростных приблуд без бесполезных фризов процессора на ожидание флагов периферии и т.д. Всякие протоколы обменов у меня, в общем-то, тоже через очереди - по сути, из приемной выгребаю входные сообщения, в передающую кладу то, что надо передать. Вот я и подвел к сути проблемы (да и не проблема это, а всего лишь желание узнать, кто как делает). Допустим, есть протокол SLIP, который пакует пользовательские данные в фреймы и гоняет по UART-проводам. API драйвера предоставляет некую функцию SendMsg(void *data, u32 len), и когда мы хотим отправить N байт сообщения, мы вызываем SendMsg(msg, N). SLIP не накладывает никаких ограничений на длину фреймов, поэтому самый гибкий вариант - уметь передавать фреймы любой длины. Внутри SendMsg() должна подготовить некий дескриптор-заголовок, опираясь на длину сообщения N, записать его в очередь, а затем записать N байт данных в очередь, и при необходимости дать отмашку для запуска UART. Процесс, который непосредственно выгребает данные из очереди (пусть это будет прерывание по завершению передачи байта по UART), должен уметь отделять мух от котлет дескрипторы от данных, чтобы формировать символы-ограничители кадров (обычно я формирую SLIP-фрейм налету, без промежуточного буфера). Т.е. в SendMsg() проверяется, вместится ли в очередь передачи дескриптор (длина сообщения) + количество байт, указанное в дескрипторе: если уместится, то в нее пишется этот дескриптор, а затем - данные. Процесс, выгребающий эти данные из очереди, должен неинтрузивно определить, есть ли данные в очереди: если есть - вычитать дескриптор, по нему определить, сколько данных надо отправить, и если размер занятого количества данных в этой очереди больше или равен этому "сколько", начать отправку (иначе - это ситуация, когда SendMsg() только заполнил дескриптор, но не успел дописать все данные, в этом нет ничего страшного: прерывание UART можно отключить, а SendMsg() все равно запустит процесс отправки по завершении записи в очередь). Логика линейна, от гонок избавлена, все безопасно, но это лишь мне так кажется. Другой вариант - дескрипторы в отдельную очередь класть, данные - в отдельную. Это, КМК, безопаснее с точки зрения целостности очереди. Имея дескриптор определенного размера, мы никогда не получим шанса сдвига его положения в очереди дескрипторов. Т.е. вопрос больше даже религиозный - лить в одну очередь или в разные - кто как делает?

Поделиться сообщением


Ссылка на сообщение
Поделиться на другие сайты

Делаю так:
Есть полезные данные и их длина.
1. Добавляем служебные данные (адрес, контрольная сумма и пр.)
2. Проходим процедурой формирования байтстаффинга и скидываем все это в очередь
3. включаем передачу и в прерывании UART оно само отправляет данные, пока в очереди что-то есть

 


 

 

Поделиться сообщением


Ссылка на сообщение
Поделиться на другие сайты

13 минут назад, iamnot сказал:

Делаю так...

Понял, подход в целом немного другой, просто я так не делаю: представьте обратную задачу - по UART принимается SLIP-поток (почти непрерывно) различных фреймов. Как их собирать в сообщения, когда, а главное - кто тот процесс, который этим занимается? У меня, например, тут же в прерывании по приему байта по UART идет "распаковка" протокола. При обнаружении начала очередного кадра резервирую место в очереди под дескриптор, и зануляю его. Вытаскиваю из потока байты, при необходимости конвертирую согласно фреймингу, пихаю в очередь. Как только обнаружил конец фрейма - заполняю ранее подготовленное место под дескриптор. Как итог - имею очередь из последовательностей "дескриптор-сообщение (соответсвующее этому дескриптору)". С точки зрения архитектуры ПО, ИМХО, проще и логичнее обрабатывать.

Поделиться сообщением


Ссылка на сообщение
Поделиться на другие сайты

42 минуты назад, Arlleex сказал:

Процесс, выгребающий эти данные из очереди, должен неинтрузивно определить, есть ли данные в очереди: если есть - вычитать дескриптор, по нему определить, сколько данных надо отправить, и если размер занятого количества данных в этой очереди больше или равен этому "сколько", начать отправку

Вроде как стандартный путь записи в очередь (корректной с точки зрения доступа из разных процессов/ISR): сперва записать данные полностью и только потом - передвинуть указатель записи. Какой смысл двигать указатель заранее?

И 2-й вопрос: А почему передающий процесс у Вас не может сразу начать передачу SLIP-кадра, не дожидаясь его полной записи в очередь? Ведь на канальном уровне не modbus, а SLIP, который не имеет привязок к времянкам. А значит пишущий процесс может писать данные, продвигая указатель по мере записи, а читающий - вычитывать из очереди, по мере появления там данных, кодировать на лету и передавать в UART.

18 минут назад, Arlleex сказал:

При обнаружении начала очередного кадра резервирую место в очереди под дескриптор, и зануляю его. Вытаскиваю из потока байты, при необходимости конвертирую согласно фреймингу, пихаю в очередь. Как только обнаружил конец фрейма - заполняю ранее подготовленное место под дескриптор.

Занулять зачем? У Вас читающий процесс может прочитать сообщение до того, как оно будет полностью записано в очередь?

Имхо - не лучшее решение. Я в таких случаях (когда один процесс должен писать в очередь, но при этом, до некоторого времени, читающий процесс не должен иметь возможности читать из неё (потому что например - пишущий процесс может передумать и удалить записанное сообщение из очереди)) использую кольцевые буфера с несколькими указателями записи (общее число всех указателей >2). В данном случае можно: "указатель записи", "указатель готовых записанных данных", "указатель чтения". Пишущий процесс, по мере записи данных кадра, продвигает указатель записи, но не трогает "указатель готовых записанных данных". И только когда пишущий процесс решит, что кадр принят полностью, валиден (проверка CRC), он приравнивает "указатель готовых записанных данных" = "указатель записи". Если же он решит, что принятый кадр плохой (например - не сошлась CRC, что выяснилось только к последнему принятому байту кадра), и не стоит его передавать дальше, то он наоборот делает: "указатель записи" = "указатель готовых записанных данных". А читающий процесс берёт данные между "указатель готовых записанных данных" и "указателем чтения". Тут нет лишнего пинания читающего процесса, и он не читает очередь из области ещё не готового кадра.

 

PS: Иногда использовал и большее число указателей (например - для синхронизации обработки данных очереди разными ISR и ядрами CPU). Очень удобно.

PPS: И да - я тоже SLIP кодировал/декодировал "на лету" как Вы. Прямо в ISR.   :wink:

Поделиться сообщением


Ссылка на сообщение
Поделиться на другие сайты

1 час назад, jcxz сказал:

Какой смысл двигать указатель заранее?

Я наверное не совсем правильно описал: у меня один читатель, один писатель. Пишу из потока, читаю из прерывания (чтобы потом отправить вычитанный байт, например). Указатель да, передвигается после записи элементов в очередь. Но чтобы в таком случае записать дескриптор + данные, нужно делать промежуточный буфер, в котором сначала поместить дескриптор, потом скопировать данные, переданные в эту функцию по указателю, чего в одном моем проекте сделать не получится в силу ограниченного ОЗУ. Поэтому функция SendMsg() последовательными записями добавляет в очередь дескриптор, а затем сами данные

u32 SendMsg(u8 msg[], u32 len)
{
  u32 reterr = 0;
  if(len > 0 && DSCTxQ.GetFree() >= sizeof(len) + len)
    DSCTxQ.Write((RBTYPE *)&len, sizeof(len)),         // запись дескриптора (длина полезных данных)
    DSCTxQ.Write(msg, len), DSCUARTTxIrqEn();          // запись полезных данных, активация отправки
  else reterr = 1;
  return reterr;
}

Либо городить еще и реализацию Write(), аргументами которой будет несколько пар "msg-len", чего не хочется (я файлик с реализацией FIFO вообще трогать не хочу:smile:).

Цитата

А почему передающий процесс у Вас не может сразу начать передачу SLIP-кадра, не дожидаясь его полной записи в очередь?

На самом деле все несколько не так... Если очередь была пуста (механизм вычитки из нее и отправки по UART еще не запущен), то вызов SendMsg() добавит текущее сообщение в очередь на отправку и активирует прерывание. Прерывание будет постепенно вычитывать из очереди, формировать все эти обрамления фрейма, перекодировку экскейпов и т.д., но ведь в это время я могу напихивать в очередь еще порций дескрипторов + данных. В этом случае, когда прерывание закончит работу над первым сообщением, увидит в очереди следующий дескриптор + следующее сообщение и начнет обрабатывать его. Т.е. цепочечная обработка вырисовывается. В прерывании предусмотрен случай, когда данные нужно отправить, а в очереди их пока еще нет - т.к. поток не успел до конца дописать передаваемое сообщение, например. Просто выключается прерывание по передаче, а как только в очередь допишутся все остальные данные, SendMsg() активирует это прерывание снова, в котором механизм вычитыки продолжится с места, где остановился прошлый раз.

Цитата

Занулять зачем? У Вас читающий процесс может прочитать сообщение до того, как оно будет полностью записано в очередь?

Тут все несколько интереснее: на передачу занулять не нужно, т.к. я в прикладном коде всегда знаю, сколько мне надо отправить, поэтому дескриптор заполняется вначале и однозначно (см. SendMsg()). А вот прием... Мы же априори не знаем, какой длины сообщение мы ждем: я пихаю байты в очередь по мере поступления их в UART, т.е. где-то в приемном ISR по UART у меня вызвается метод приемной очереди Read(rxbyte, 1) (1 байт). Со стороны приложения (если очень часто мониторить занятость очереди) можно увидеть постепенное ее заполнение. Так вот в данном случае зануление дескриптора даст потоку (который выгребает уже готовенькие сообщения для прикладного ПО) понять, что сообщение в очереди пока что не полностью пришло. Ведь иначе точно также понадобится промежуточный буфер под хранение сообщения (чтобы разом потом записать и дескриптор, и данные). Поэтому у меня процесс вычитыки и обработки сообщений построен периодическим мониторингом кол-ва занятого места в приемной очереди. Как только я вижу, что в ней заполнилось места как минимум на дескриптор (4 байта у меня), я его (этот дескриптор) неинтрузивно вычитываю (там будет 0, пока сообщение не придет полностью (то самое зануление работает именно для этого)). Как только в этом дескрипторе появится что-то, отличное от 0 (ну и меньше максимально допустимого неким define-ом), это и есть len принятого сообщения. Теперь можно полноценно (с перемещением указателей) выкинуть из очереди дескриптор и вычитать данные - это и будет сообщение для прикладного ПО. Вот пример, как я это делаю

Скрытый текст

// поток ОС
static void ExchTask(void)
{
  exch_Init();
  
  while(1)
  {
    if(exch_WaitMsg(10)) // периодический мониторинг почтового ящика
      exch_ParsMsg();    // если че-то есть - парсим
  }
}

// мониторинг почтового ящика
u32 exch_WaitMsg(u32 tim)
{
  u32 len = 0;
  if(ExchRxQ.GetBusy() > sizeof(len))
    ExchRxQ.Peek((RBTYPE *)&len, sizeof(len)); // неинтрузивное чтение (без перемещения указателя чтения)
  if(len == 0) eds_Timeout(tim);               // если пусто - курим
  return len;
}

// парсинг
void ParsMsg(void)
{
  while(1) // писем может поднакопиться, поэтому "разом" обрабатываем все
  {
    u32 len = 0;
    if(ExchRxQ.GetBusy() > sizeof(len))
      ExchRxQ.Peek((RBTYPE *)&len, sizeof(len));
    if(len > 0)
    {
      if(len <= EXCH_MAXMSG_SIZE &&
         len <= sizeof(sExchRxMsg))
      {
        __ALIGNED(4) sExchRxMsg msg;
        ExchRxQ.FreeR(sizeof(len));               // удаляем дескриптор
        ExchRxQ.Read((RBTYPE *)&msg, len);        // читаем сообщение
        if(hw_CalcCRC32((u32 *)&msg, len / 4) == 0)
          MsgProc(&msg);                          // если все гладко - обрабатываем
      }
      else rb_FreeR(&ExchRxQ, sizeof(len) + len); // а если сообщение дюже большое - нафиг нафиг
    }
    else break;
  }
}

 

Поделиться сообщением


Ссылка на сообщение
Поделиться на другие сайты

Дескриптор это самодельная надстройка над данными, которая отделяет пакеты друг от друга?
Если это так то можно не добавлять дескрипторы вообще, то есть: есть очередь (речь о приеме)
в нее складываются данные как есть. Это все происходит в ISP. Так как протокол SLIP уже имеет признаки начала и конца,
то задача отделения одного пакете от другого не стоит.
В задаче просто разбираем очередь и выполняем действия согласно данным. Там же и отбрасываем неправильные пакеты.
При отправке формируем ответное сообщение по принципу как я писал выше.
Итого одна очередь на прием, одна на отправку. Во внутренности фифо лезть не надо.

if(len > 0 && DSCTxQ.GetFree() >= sizeof(len) + len)
вместо + видимо надо *
И фигурных скобок добавить

 

 

Изменено пользователем iamnot

Поделиться сообщением


Ссылка на сообщение
Поделиться на другие сайты

7 минут назад, iamnot сказал:

вместо + видимо надо *

Нет-нет, именно "если в очереди есть место под размер дескриптора + кол-во данных, указаном в этом дескрипторе"...

Поделиться сообщением


Ссылка на сообщение
Поделиться на другие сайты

37 минут назад, Arlleex сказал:

Я наверное не совсем правильно описал: у меня один читатель, один писатель. Пишу из потока, читаю из прерывания (чтобы потом отправить вычитанный байт, например). Указатель да, передвигается после записи элементов в очередь. Но чтобы в таком случае записать дескриптор + данные, нужно делать промежуточный буфер, в котором сначала поместить дескриптор, потом скопировать данные, переданные в эту функцию по указателю, чего в одном моем проекте сделать не получится в силу ограниченного ОЗУ. Поэтому функция SendMsg() последовательными записями добавляет в очередь дескриптор, а затем сами данные

Не очень понятно - зачем "промежуточный буфер"? В вашем примере кода неясно - когда именно передвигается указатель записи в очереди? После каждой Write() или только после Write() завершающей запись всего кадра? Я имел в виду - действовать именно по 2-му варианту. И никаких "промежуточных буферов" не будет нужно. Пускай дескриптор+данные добавляются "последовательными записями", главное - указатель записи очереди передвинуть только после полной записи сообщения.

37 минут назад, Arlleex сказал:

но ведь в это время я могу напихивать в очередь еще порций дескрипторов + данных. В этом случае, когда прерывание закончит работу над первым сообщением, увидит в очереди следующий дескриптор + следующее сообщение и начнет обрабатывать его. Т.е. цепочечная обработка вырисовывается. В прерывании предусмотрен случай, когда данные нужно отправить, а в очереди их пока еще нет - т.к. поток не успел до конца дописать передаваемое сообщение, например.

Видимо Вы хотите сказать, что к моменту начала записи сообщения в очередь передачи, пишущий процесс у вас ещё не знает заранее размера сообщения и не может сообщить передающему потоку его размер (чтобы тот правильно сформировал обрамление кадра)?

В этом случае можно делать канальное кодирование при передаче не в читающем процессе, а в пишущем. Тогда читателю не будут нужны границы кадра. Либо ещё можно передавать размеры кадра через отдельную очередь: пишем данные в очередь тела кадра, но не пишем ничего в очередь границ кадра. В очередь границ кадра пишем сообщение (просто размер записанного кадра) только после записи последнего байта тела. Читатель читает обе очереди, видит что данные в очереди тела появились - формирует начало кадра, и начинает кодировать/передавать тело, подсчитывая количество прочитанных из очереди байт. До тех пор, пока не обнаружит в очереди границ кадра сообщение с размером текущего кадра - только тогда, досчитав нужное число считанных байт, он должен сформировать конец кадра. И читать количество данных в этих очередях нужно в таком порядке: читаем кол-во данных в очереди тела, затем - в очереди границ, потом только читаем сами данные (нужное число байт) из очереди тела. И опять - с начала итерации.

И дескрипторы тут не нужны. Только граница кадра через очередь границ.

Поделиться сообщением


Ссылка на сообщение
Поделиться на другие сайты

45 минут назад, jcxz сказал:

В вашем примере кода неясно - когда именно передвигается указатель записи в очереди?

Указатель записи двигается в конце каждой функции Write(). Для универсальности и законченности, что ли...
Если не двигать, а сделать отдельный метод WriteUpd(), например, там да, буфер не нужен (собственно, об этом Вы и говорите).

Цитата

Видимо Вы хотите сказать, что к моменту начала записи сообщения в очередь передачи, пишущий процесс у вас ещё не знает заранее размера сообщения и не может сообщить передающему потоку его размер (чтобы тот правильно сформировал обрамление кадра)?

Постараюсь однозначно сейчас определить некоторые термины, чтобы мы понимали друг друга и не путались (да и сам я не путался:biggrin:). У девайса есть UART - девайс может передавать и принимать данные в SLIP-кадрах. Внутри ПО организовано 2 очереди: на передачу и на прием. Из клиентского ПО я хочу оперировать "сообщениями" - массивом байт известной длины. Сообщение A добавляется в очередь на передачу с помощью функции SendMsg(A, sizeof(A)). Механизм фрейминга вычитывает из этой очереди сообщение (он их отделяет по дескриптору-длине), формирует-отправляет потихонечку кадр SLIP, назовем его A'. Теперь этот упакованный A' улетел из девайса. Что мы здесь знаем? Знаем, что клиентский код всегда в курсе, сколько он сейчас хочет отправить (sizeof(A)). Фреймер, реализованный как механизм, внедренный в ISR передачи UART-модуля, постепенно разгребает передающую очередь, обнаруживает в нем дескриптор, и отправляет один за одним байты данных из этой очереди в количестве, указанном в дескрипторе. Если UART слишком быстр (или кто-то прервал поток, который пишет в очередь очередное передаваемое сообщение, где-то в середине), передача "приостановится", но как только SendMsg() доотработает, этот механизм возобновится. С этим вопросов нет. Однако есть еще очередь принимаемых сообщений. На входе фреймера у нас A' (т.е. запакованное сообщение), и оно подается не сразу, а байт за байтом. А в очереди принимаемых сообщений с точки зрения клиентского потока я хочу видеть последовательность "дескриптор1-данные1", "дескриптор2-данные2" и т.д. Фреймер, обнаружив спецсимвол начала кадра, будет постепенно заполнять очередь по 1 байту распакованными данными (по мере приема сообщения). Но мы же не знаем, сколько нам придет? Поэтому "резервируем" местечко в очереди (по сути делаем Write() с обнулением выделенной области и запоминанием адресов реальных ячеек памяти), принимаем-распаковываем сообщение в очередь, и как только нашли конец-границу кадра - пишем в это резервированное место полученную длину. В итоге в очереди принимаемых сообщений у нас будет наше сообщение A с дескриптором sizeof(A) в начале. А уже дальше можно проверить и контрольные суммы, и границы допустимого размера сообщения, и т.д. Фреймер, кстати, при нехватке места в приемной очереди, "откатит" ее назад, и вернется в состояние "жду границу кадра/не синхронизирован", откидывая сообщение.

Поделиться сообщением


Ссылка на сообщение
Поделиться на другие сайты

7 минут назад, Arlleex сказал:

Если не двигать, а сделать отдельный метод WriteUpd()

Имхо - самое простое решение.

Поделиться сообщением


Ссылка на сообщение
Поделиться на другие сайты

32 минуты назад, jcxz сказал:

Имхо - самое простое решение.

Получается, что так...

SLIP я привел просто как пример сейчас.

Между делом мне предстоит (скорее всего) написать софт-реализацию I2C-мастера (ногодрыг), и я хочу несколько универсализировать возможности этого мастера. Хочу сделать очередь простейших операций (например, формирование СТАРТ-условия, СТАРТ + отправка N данных + СТОП и т.д.), которую будет разгребать и отрабатывать обычное прерывание по таймеру (или какой-то периодический процесс). Внутри прерывания будет формироваться нужная ногодрыжная последовательность в зависимости от того, что сейчас нужно сделать на шине. В том числе по единому указанию в очереди команд производить "очистку" шины путем подачи 18 СТОП-условий. Да много чего еще. Вот и думаю, отделять дескрипторы команд от самих данных или в ту же кучу валить.

Поделиться сообщением


Ссылка на сообщение
Поделиться на другие сайты

Присоединяйтесь к обсуждению

Вы можете написать сейчас и зарегистрироваться позже. Если у вас есть аккаунт, авторизуйтесь, чтобы опубликовать от имени своего аккаунта.

Гость
Ответить в этой теме...

×   Вставлено с форматированием.   Вставить как обычный текст

  Разрешено использовать не более 75 эмодзи.

×   Ваша ссылка была автоматически встроена.   Отображать как обычную ссылку

×   Ваш предыдущий контент был восстановлен.   Очистить редактор

×   Вы не можете вставлять изображения напрямую. Загружайте или вставляйте изображения по ссылке.

×
×
  • Создать...