UniSet 2.32.1
UNetReceiver.h
1/*
2 * Copyright (c) 2015 Pavel Vainerman.
3 *
4 * This program is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU Lesser General Public License as
6 * published by the Free Software Foundation, version 2.1.
7 *
8 * This program is distributed in the hope that it will be useful, but
9 * WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 * Lesser General Lesser Public License for more details.
12 *
13 * You should have received a copy of the GNU Lesser General Public License
14 * along with this program. If not, see <http://www.gnu.org/licenses/>.
15 */
16// -----------------------------------------------------------------------------
17#ifndef UNetReceiver_H_
18#define UNetReceiver_H_
19// -----------------------------------------------------------------------------
20#include <ostream>
21#include <memory>
22#include <string>
23#include <vector>
24#include <unordered_map>
25#include <sigc++/sigc++.h>
26#include <ev++.h>
27#include "UniSetObject.h"
28#include "Trigger.h"
29#include "Mutex.h"
30#include "SMInterface.h"
31#include "SharedMemory.h"
32#include "UDPPacket.h"
33#include "CommonEventLoop.h"
34#include "UNetTransport.h"
35// --------------------------------------------------------------------------
36namespace uniset
37{
38 // -----------------------------------------------------------------------------
39 /* Основная идея: сделать проверку очерёдности пакетов, но при этом использовать UDP.
40 * ===============
41 * В данных передаётся номер пакета. На случай если несколько пакетов придут не в той последовательности
42 * что были посланы, сделан циклический буфер (буфер сразу выделяет память при старте).
43 * Т.к. номер получаемых пакетов монотонно растёт на основе него вычисляется индекс
44 * куда поместить пакет в буфере. Есть два индекса
45 * rnum - (read number) номер последнего обработанного пакета + 1
46 * wnum - (write number) номер следующего ожидаемого пакета (номер последнего принятого + 1)
47 * WARNING: Если придёт два пакета с одинаковым номером, то новый пакет перезатрёт прошлый в буфере
48 *
49 * При этом обработка ведётся по порядку (только пакеты идущие подряд)
50 * как только встречается "дырка" происходит ожидание её "заполения". Если в течение времени (lostTimeout)
51 * "дырка" не исчезает, увеличивается счётчик потерянных пакетов и обработка продолжается с нового места.
52 * Т.к. используется libev и нет многопоточной работы, события обрабатываются последовательно.
53 * Раз в updatetime msec происходит обновление данных в SM, все накопившиеся пакеты обрабатываются
54 * либо пока не встретиться "дырка", либо пока rnum не догонит wnum.
55 *
56 * КЭШ
57 * ===
58 * Для оптимизации работы с SM, т.к. в пакетах приходят только пары [id,value] сделан кэш итераторов.
59 * Идея проста: сделан вектор размером с количеством принимаемых данных. В векторе хранятся итераторы (и всё что необходимо).
60 * Порядковый номер данных в пакете является индексом в кэше.
61 * Для защиты от изменения последовательности внутри пакета, в кэше хранится ID сохраняемого датчика, и если он не совпадёт с тем,
62 * ID который пришёл в пакете - элемент кэша обновляется.
63 * Если количество пришедших данных не совпадают с размером кэша - кэш обновляется.
64 *
65 * КЭШ (ДОПОЛНЕНИЕ)
66 * ===
67 * Т.к. в общем случае, данные могут быть разбиты на несколько (много) пакетов, то для каждого из них выделен свой кэш и создан отдельный
68 * map, ключом в котором является идентификатор данных (см. UDPMessage::getDataID()).
69 * Кэш в map добавляется тогда, когда приходит пакет с новым UDPMessage::getDataID() и в дальнейшим он используется для этого пакета.
70 * В текущей реализации размер map не контролируется (завязан на UDPMessage::getDataID()) и рассчитан на статичность пакетов,
71 * т.е. на то что UNetSender не будет с течением времени менять количество отправляемых пакетов
72 * (работать будет, просто в map останутся лежать записи для неиспользуемых пакетов)
73 *
74 * ОПТИМИЗАЦИЯ
75 * ===
76 * В кэше так же хранится crc последних принятых данных. Если crc совпадает с тем, что пришло в пакете, то обработки не происходит.
77 * crc хранится отдельно для дискретных и отдельно для аналоговых датчиков.
78 * Эту оптимизацию можно отключить параметром --prefix-recv-ignore-crc или recvIgnoreCRC="1" в конф. файле.
79 *
80 * Обработка сбоев в номере пакетов
81 * =========================================================================
82 * Если в какой-то момент расстояние между rnum и wnum превышает maxDifferens пакетов
83 * то считается, что произошёл сбой или узел который посылал пакеты - перезагрузился
84 * Идёт попытка обработать все текущие пакеты (до первой дырки), а дальше происходит
85 * реинициализация и обработка продолжается с нового номера.
86 *
87 * =========================================================================
88 * ОПТИМИЗАЦИЯ N1: см. UNetSender.h. Если номер последнего принятого пакета не менялся, пакет не обрабатываем.
89 *
90 * Создание соединения (открытие сокета)
91 * ======================================
92 * Попытка создать сокет производиться сразу в конструкторе, если это не получается,
93 * то создаётся таймер (evCheckConnection), который периодически (checkConnectionTime) пытается вновь
94 * открыть сокет.. и так бесконечно, пока не получится. Это важно для систем, где в момент загрузки программы
95 * (в момент создания объекта UNetReceiver) ещё может быть не поднята сеть или какой-то сбой с сетью и требуется
96 * ожидание (без вылета программы) пока "внешняя система мониторинга" не поднимет сеть.
97 * Если такая логика не требуется, то можно задать в конструкторе
98 * последним аргументом флаг nocheckconnection=true, тогда при создании объекта UNetReceiver, в конструкторе будет
99 * выкинуто исключение при неудачной попытке создания соединения.
100 */
101 // -----------------------------------------------------------------------------
102 class UNetReceiver final:
103 protected EvWatcher,
104 public std::enable_shared_from_this<UNetReceiver>
105 {
106 public:
107 UNetReceiver( std::unique_ptr<UNetReceiveTransport>&& transport, const std::shared_ptr<SMInterface>& smi
108 , bool nocheckConnection = false
109 , const std::string& prefix = "unet" );
110 virtual ~UNetReceiver();
111
112 void start();
113 void stop();
114
115 inline std::string getName() const noexcept
116 {
117 return myname;
118 }
119
120 // блокировать сохранение данных в SM
121 void setLockUpdate( bool st ) noexcept;
122 bool isLockUpdate() const noexcept;
123
124 void resetTimeout() noexcept;
125
126 bool isInitOK() const noexcept;
127 bool isRecvOK() const noexcept;
128 size_t getLostPacketsNum() const noexcept;
129
130 void setReceiveTimeout( timeout_t msec ) noexcept;
131 void setUpdatePause( timeout_t msec ) noexcept;
132 void setLostTimeout( timeout_t msec ) noexcept;
133 void setPrepareTime( timeout_t msec ) noexcept;
134 void setCheckConnectionPause( timeout_t msec ) noexcept;
135 void setMaxDifferens( unsigned long set ) noexcept;
136 void setEvrunTimeout(timeout_t msec ) noexcept;
137 void setInitPause( timeout_t msec ) noexcept;
138 void setBufferSize( size_t sz ) noexcept;
139 void setMaxReceiveAtTime( size_t sz ) noexcept;
140 void setIgnoreCRC( bool set ) noexcept;
141
142 void setRespondID( uniset::ObjectId id, bool invert = false ) noexcept;
143 void setLostPacketsID( uniset::ObjectId id ) noexcept;
144 void setModeID( uniset::ObjectId id ) noexcept;
145
146 void forceUpdate() noexcept; // пересохранить очередной пакет в SM даже если данные не менялись
147
148 inline std::string getTransportID() const noexcept
149 {
150 return transport->ID();
151 }
152
154 enum Event
155 {
157 evTimeout
158 };
159
160 enum class Mode : int
161 {
162 mEnabled = 0,
163 mDisabled = 1
164 };
165
166 typedef sigc::slot<void, const std::shared_ptr<UNetReceiver>&, Event> EventSlot;
167 void connectEvent( EventSlot sl ) noexcept;
168
169 // --------------------------------------------------------------------
170 inline std::shared_ptr<DebugStream> getLog() noexcept
171 {
172 return unetlog;
173 }
174
175 std::string getShortInfo() const noexcept;
176
177 protected:
178
179 const std::shared_ptr<SMInterface> shm;
180 std::shared_ptr<DebugStream> unetlog;
181
182 enum ReceiveRetCode
183 {
184 retOK = 0,
185 retError = 1,
186 retNoData = 2
187 };
188
189 ReceiveRetCode receive() noexcept;
190 void update() noexcept;
191 void callback( ev::io& watcher, int revents ) noexcept;
192 void readEvent( ev::io& watcher ) noexcept;
193 void updateEvent( ev::periodic& watcher, int revents ) noexcept;
194 void checkConnectionEvent( ev::periodic& watcher, int revents ) noexcept;
195 void statisticsEvent( ev::periodic& watcher, int revents ) noexcept;
196 void onForceUpdate( ev::async& watcher, int revents ) noexcept;
197 void initEvent( ev::timer& watcher, int revents ) noexcept;
198 virtual void evprepare( const ev::loop_ref& eloop ) noexcept override;
199 virtual void evfinish(const ev::loop_ref& eloop ) noexcept override;
200 virtual std::string wname() const noexcept override
201 {
202 return myname;
203 }
204
205 void initIterators() noexcept;
206 bool createConnection( bool throwEx = false );
207 bool checkConnection();
208 size_t rnext( size_t num );
209
210 private:
211 UNetReceiver() {}
212
213 timeout_t updatepause = { 100 };
215 std::unique_ptr<UNetReceiveTransport> transport;
216 std::string addr;
217 std::string myname;
218 ev::io evReceive;
219 ev::periodic evCheckConnection;
220 ev::periodic evStatistic;
221 ev::periodic evUpdate;
222 ev::timer evInitPause;
223 ev::async evForceUpdate;
224
225 // счётчики для подсчёта статистики
226 size_t recvCount = { 0 };
227 size_t upCount = { 0 };
228 std::chrono::steady_clock::time_point t_start;
229 std::chrono::steady_clock::time_point t_end;
230 std::chrono::steady_clock::time_point t_stats;
231
232 // текущая статистика
233 struct Stats
234 {
235 float recvPerSec = {0};
236 float upPerSec = {0};
237 size_t upProcessingTime_microsec = {0};
238 size_t recvProcessingTime_microsec = {0};
239 };
240
241 Stats stats;
242
243 // делаем loop общим.. одним на всех!
244 static CommonEventLoop loop;
245
246 double checkConnectionTime = { 10.0 }; // sec
247 std::mutex checkConnMutex;
248
249 PassiveTimer ptRecvTimeout;
250 PassiveTimer ptPrepare;
251 timeout_t recvTimeout = { 5000 }; // msec
252 timeout_t prepareTime = { 2000 };
253 timeout_t evrunTimeout = { 15000 };
254 timeout_t lostTimeout = { 200 };
255 size_t maxReceiveCount = { 5 }; // количество читаемых за один раз
256
257 double initPause = { 5.0 }; // пауза на начальную инициализацию (сек)
258 std::atomic_bool initOK = { false };
259
260 PassiveTimer ptLostTimeout;
261 size_t lostPackets = { 0 };
264 IOController::IOStateList::iterator itRespond;
265 bool respondInvert = { false };
266 uniset::ObjectId sidLostPackets = { uniset::DefaultObjectId };
267 IOController::IOStateList::iterator itLostPackets;
268
269 // режим работы
271 IOController::IOStateList::iterator itMode;
272 Mode mode = { Mode::mEnabled };
273
274 std::atomic_bool activated = { false };
275
276 size_t cbufSize = { 100 };
277 std::vector<UniSetUDP::UDPMessage> cbuf; // circular buffer
278 size_t wnum = { 1 };
279 size_t rnum = { 0 };
280 UniSetUDP::UDPMessage* pack; // текущий обрабатываемый пакет
281
285 size_t maxDifferens = { 20 };
286
287 std::atomic_bool lockUpdate = { false };
289 EventSlot slEvent;
290 Trigger trTimeout;
291 std::mutex tmMutex;
292
293 struct CacheItem
294 {
295 long id = { uniset::DefaultObjectId };
296 IOController::IOStateList::iterator ioit;
297
298 CacheItem():
299 id(uniset::DefaultObjectId) {}
300 };
301 typedef std::vector<CacheItem> CacheVec;
302
303 struct CacheInfo
304 {
305 uint16_t crc;
306 CacheVec items;
307
308 CacheInfo(): crc(0) {}
309 };
310
311 // ключом является UDPMessage::getDataID()
312 typedef std::unordered_map<long, CacheInfo> CacheMap;
313 CacheMap d_icache_map;
314 CacheMap a_icache_map;
315 size_t cacheMissed; // количество промахов
316 bool ignoreCRC = { false };
318 Trigger trOnMode;
320 CacheInfo* getDCache( UniSetUDP::UDPMessage* upack ) noexcept;
321 CacheInfo* getACache( UniSetUDP::UDPMessage* pack ) noexcept;
322 };
323 // --------------------------------------------------------------------------
324} // end of namespace uniset
325// -----------------------------------------------------------------------------
326namespace std
327{
328 std::string to_string( const uniset::UNetReceiver::Mode& p );
329}
330// -----------------------------------------------------------------------------
331#endif // UNetReceiver_H_
332// -----------------------------------------------------------------------------
Definition DebugStream.h:62
Definition CommonEventLoop.h:19
Definition UNetReceiver.h:105
Event
Definition UNetReceiver.h:155
@ evTimeout
Definition UNetReceiver.h:157
@ evOK
Definition UNetReceiver.h:156
Mode
Definition UNetReceiver.h:161
STL namespace.
Definition Calibration.h:27
const ObjectId DefaultObjectId
Definition UniSetTypes.h:71
long ObjectId
Definition UniSetTypes_i.idl:30