UniSet 2.32.1
UWebSocketGate.h
1/*
2 * Copyright (c) 2017 Pavel Vainerman.
3 *
4 * This program is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU Lesser General Public License as
6 * published by the Free Software Foundation, version 2.1.
7 *
8 * This program is distributed in the hope that it will be useful, but
9 * WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 * Lesser General Lesser Public License for more details.
12 *
13 * You should have received a copy of the GNU Lesser General Public License
14 * along with this program. If not, see <http://www.gnu.org/licenses/>.
15 */
16// --------------------------------------------------------------------------
20// --------------------------------------------------------------------------
21#ifndef UWebSocketGate_H_
22#define UWebSocketGate_H_
23// --------------------------------------------------------------------------
24#include <queue>
25#include <memory>
26#include <mutex>
27#include <condition_variable>
28#include <chrono>
29#include <ev++.h>
30#include <sigc++/sigc++.h>
31#include <Poco/JSON/Object.h>
32#include <Poco/Net/WebSocket.h>
33#include <Poco/ObjectPool.h>
34#include "UniSetTypes.h"
35#include "LogAgregator.h"
36#include "UniSetObject.h"
37#include "DebugStream.h"
38#include "SharedMemory.h"
39#include "SMInterface.h"
40#include "EventLoopServer.h"
41#include "UTCPStream.h"
42#include "UHttpRequestHandler.h"
43#include "UHttpServer.h"
44#include "UTCPCore.h"
45#include "RunLock.h"
46// -------------------------------------------------------------------------
47namespace uniset
48{
49 //------------------------------------------------------------------------------------------
190 public UniSetObject,
191 public EventLoopServer
192#ifndef DISABLE_REST_API
193 , public Poco::Net::HTTPRequestHandler
194#endif
195 {
196 public:
197 UWebSocketGate( uniset::ObjectId id, xmlNode* cnode
198 , uniset::ObjectId shmID
199 , const std::shared_ptr<SharedMemory>& ic = nullptr
200 , const std::string& prefix = "-ws" );
201
202 virtual ~UWebSocketGate();
203
205 static std::shared_ptr<UWebSocketGate> init_wsgate( int argc, const char* const* argv
206 , uniset::ObjectId shmID
207 , const std::shared_ptr<SharedMemory>& ic = nullptr
208 , const std::string& prefix = "ws-" );
209
211 static void help_print();
212
213 inline std::shared_ptr<DebugStream> log()
214 {
215 return mylog;
216 }
217 inline std::shared_ptr<uniset::LogAgregator> logAgregator() noexcept
218 {
219 return loga;
220 }
221
222#ifndef DISABLE_REST_API
223 virtual void handleRequest( Poco::Net::HTTPServerRequest& req, Poco::Net::HTTPServerResponse& resp ) override;
224 void onWebSocketSession( Poco::Net::HTTPServerRequest& req, Poco::Net::HTTPServerResponse& resp );
225#endif
226
227 static Poco::JSON::Object::Ptr error_to_json( std::string_view err );
228 static void fill_error_json( Poco::JSON::Object::Ptr& p, std::string_view err );
229
230 protected:
231
232 class UWebSocket;
233
234 virtual bool activateObject() override;
235 virtual bool deactivateObject() override;
236 virtual void sysCommand( const uniset::SystemMessage* sm ) override;
237 void run( bool async );
238 virtual void evfinish() override;
239 virtual void evprepare() override;
240 void onCheckBuffer( ev::timer& t, int revents );
241 void onActivate( ev::async& watcher, int revents ) ;
242 void onCommand( ev::async& watcher, int revents );
243
244#ifndef DISABLE_REST_API
245 void httpWebSocketPage( std::ostream& out, Poco::Net::HTTPServerRequest& req, Poco::Net::HTTPServerResponse& resp );
246 void httpWebSocketConnectPage(std::ostream& out, Poco::Net::HTTPServerRequest& req,
247 Poco::Net::HTTPServerResponse& resp, const std::string& params );
248
249 std::shared_ptr<UWebSocket> newWebSocket(Poco::Net::HTTPServerRequest* req, Poco::Net::HTTPServerResponse* resp, const Poco::URI::QueryParameters& qp );
250 void delWebSocket( std::shared_ptr<UWebSocket>& ws );
251
252 Poco::JSON::Object::Ptr respError( Poco::Net::HTTPServerResponse& resp, Poco::Net::HTTPResponse::HTTPStatus s, const std::string& message );
253 void makeResponseAccessHeader( Poco::Net::HTTPServerResponse& resp );
254#endif
255 void terminate();
256
257 ev::async wsactivate; // активация WebSocket-ов
258 std::shared_ptr<ev::async> wscmd;
259
260 void checkMessages( ev::timer& t, int revents );
261 virtual void sensorInfo( const uniset::SensorMessage* sm ) override;
262 virtual uniset::SimpleInfo* getInfo( const char* userparam = 0 ) override;
263 ev::timer iocheck;
264 double check_sec = { 0.05 };
265 int maxMessagesProcessing = { 200 };
266
267 std::shared_ptr<DebugStream> mylog;
268 std::shared_ptr<uniset::LogAgregator> loga;
269 std::shared_ptr<SMInterface> shm;
270 std::unique_ptr<uniset::RunLock> runlock;
271
272 std::shared_ptr<uniset::LogServer> logserv;
273 std::string logserv_host = {""};
274 int logserv_port = { 0 };
275
276#ifndef DISABLE_REST_API
277 std::shared_ptr<Poco::Net::HTTPServer> httpserv;
278 std::string httpHost = { "" };
279 int httpPort = { 0 };
280 std::string httpCORS_allow = { "*" };
281
282 double wsHeartbeatTime_sec = { 3.0 };
283 double wsSendTime_sec = { 0.2 };
284 size_t wsMaxSend = { 5000 };
285 size_t wsMaxCmd = { 200 };
286 double wsPongTimeout_sec = {5.0 };
287
288 int jpoolCapacity = { 200 };
289 int jpoolPeakCapacity = { 5000 };
290
299 public Poco::Net::WebSocket
300 {
301 public:
302 UWebSocket( Poco::Net::HTTPServerRequest* req,
303 Poco::Net::HTTPServerResponse* resp,
304 int jpoolCapacity = 100,
305 int jpoolPeakCapacity = 500 );
306
307 virtual ~UWebSocket();
308
309 std::string getInfo() const noexcept;
310
311 bool isActive();
312 void set( ev::dynamic_loop& loop, std::shared_ptr<ev::async> a );
313
314 void send( ev::timer& t, int revents );
315 void ping( ev::timer& t, int revents );
316 void read( ev::io& io, int revents );
317 void pong( ev::timer& t, int revents );
318
319 struct sinfo
320 {
321 sinfo( const std::string& _cmd, uniset::ObjectId _id ): id(_id), cmd(_cmd) {}
322
323 std::string err; // ошибка при работе с датчиком (например при заказе)
325 std::string cmd = "";
326 long value = { 0 }; // set value
327 // cache
328 std::string name;
329 };
330
331 void ask( uniset::ObjectId id );
332 void del( uniset::ObjectId id );
333 void get( uniset::ObjectId id );
334 void set( uniset::ObjectId id, long value );
335 void freeze( uniset::ObjectId id, long value );
336 void unfreeze( uniset::ObjectId id );
337 void sensorInfo( const uniset::SensorMessage* sm );
338 void doCommand( const std::shared_ptr<SMInterface>& ui );
339 static Poco::JSON::Object::Ptr to_short_json( const std::shared_ptr<sinfo>& si );
340 static Poco::JSON::Object::Ptr to_json( const uniset::SensorMessage* sm, const std::shared_ptr<sinfo>& si );
341 static void fill_short_json( Poco::JSON::Object::Ptr& p, const std::shared_ptr<sinfo>& si );
342 static void fill_json( Poco::JSON::Object::Ptr& p, const uniset::SensorMessage* sm, const std::shared_ptr<sinfo>& si );
343
344 void term();
345 void waitCompletion();
346
347 // настройка
348 void setHearbeatTime( const double& sec );
349 void setSendPeriod( const double& sec );
350 void setMaxSendCount( size_t val );
351 void setMaxCmdCount( size_t val );
352 void setPongTimeout( const double& set );
353
354 std::shared_ptr<DebugStream> mylog;
355
356 protected:
357
358 void write();
359 void sendResponse( const std::shared_ptr<sinfo>& si );
360 void sendShortResponse( const std::shared_ptr<sinfo>& si );
361 void onCommand( std::string_view cmd );
362 void sendError( std::string_view message );
363 void returnObjectToPool( Poco::JSON::Object::Ptr& json );
364
365 ev::timer iosend;
366 double send_sec = { 0.5 };
367 size_t maxsend = { 5000 };
368 size_t maxcmd = { 200 };
369 const int Kbuf = { 10 }; // коэффициент для буфера сообщений (maxsend умножается на Kbuf)
370 static const size_t sbufLen = 100 * 1024;
371 char sbuf[sbufLen]; // буфер используемый для преобразования json в потом байт (см. send)
372
373 ev::timer ioping;
374 double ping_sec = { 3.0 };
375 static const std::string ping_str;
376 ev::timer iopong;
377 double pongTimeout_sec = { 5.0 };
378 size_t pongCounter = { 0 };
379
380 ev::io iorecv;
381 char rbuf[64 * 1024];
382 timeout_t recvTimeout = { 200 }; // msec
383 std::shared_ptr<ev::async> cmdsignal;
384
385 std::mutex finishmut;
386 std::condition_variable finish;
387
388 std::atomic_bool cancelled = { false };
389
390 std::unordered_map<uniset::ObjectId, std::shared_ptr<sinfo> > smap;
391 std::queue< std::shared_ptr<sinfo> > qcmd; // очередь команд
392
393 Poco::Net::HTTPServerRequest* req;
394 Poco::Net::HTTPServerResponse* resp;
395
396 // очередь json-на отправку
397 std::queue<Poco::JSON::Object::Ptr> jbuf;
398 std::unique_ptr<Poco::ObjectPool< Poco::JSON::Object, Poco::JSON::Object::Ptr >> jpoolSM;
399 std::unique_ptr<Poco::ObjectPool< Poco::JSON::Object, Poco::JSON::Object::Ptr >> jpoolErr;
400 std::unique_ptr<Poco::ObjectPool< Poco::JSON::Object, Poco::JSON::Object::Ptr >> jpoolShortSM;
401
402 // очередь данных на посылку..
403 std::unique_ptr<Poco::ObjectPool< uniset::UTCPCore::Buffer >> wbufpool;
404 std::queue<uniset::UTCPCore::Buffer*> wbuf;
405 size_t maxsize; // рассчитывается сходя из max_send (см. конструктор)
406 };
407
409 {
410 public:
411
412 UWebSocketGuard( std::shared_ptr<UWebSocket>& s, UWebSocketGate* g ):
413 ws(s), wsgate(g) {}
414
416 {
417 wsgate->delWebSocket(ws);
418 }
419
420 private:
421 std::shared_ptr<UWebSocket> ws;
422 UWebSocketGate* wsgate;
423 };
424
425 friend class UWebSocketGuard;
426
427 std::list<std::shared_ptr<UWebSocket>> wsocks;
428 uniset::uniset_rwmutex wsocksMutex;
429 size_t maxwsocks = { 50 }; // максимальное количество websocket-ов
430
432 public Poco::Net::HTTPRequestHandlerFactory
433 {
434 public:
437
438 virtual Poco::Net::HTTPRequestHandler* createRequestHandler( const Poco::Net::HTTPServerRequest& req ) override;
439
440 private:
441 UWebSocketGate* wsgate;
442 };
443#endif
444
445 private:
446
447 };
448 // ----------------------------------------------------------------------------------
449} // end of namespace uniset
450//------------------------------------------------------------------------------------------
451#endif
The EventLoopServer class Реализация общей части всех процессов использующих libev....
Definition EventLoopServer.h:18
Definition MessageType.h:127
Definition MessageType.h:171
Definition UWebSocketGate.h:409
Definition UWebSocketGate.h:300
timeout_t recvTimeout
Definition UWebSocketGate.h:382
Definition UWebSocketGate.h:195
virtual bool activateObject() override
Активизация объекта (переопределяется для необходимых действий после активизации)
Definition UWebSocketGate.cc:687
static std::shared_ptr< UWebSocketGate > init_wsgate(int argc, const char *const *argv, uniset::ObjectId shmID, const std::shared_ptr< SharedMemory > &ic=nullptr, const std::string &prefix="ws-")
Definition UWebSocketGate.cc:373
virtual bool deactivateObject() override
Деактивация объекта (переопределяется для необходимых действий при завершении работы)
Definition UWebSocketGate.cc:677
static void help_print()
Definition UWebSocketGate.cc:389
Definition UniSetObject.h:80
std::shared_ptr< UInterface > ui
Definition UniSetObject.h:136
Definition Mutex.h:32
Definition Calibration.h:27
const ObjectId DefaultObjectId
Definition UniSetTypes.h:71
long ObjectId
Definition UniSetTypes_i.idl:30
Definition UniSetTypes_i.idl:65
Definition UWebSocketGate.h:320