Source: fluxCommunicationMessagesSender.js

  1. /* eslint-disable no-underscore-dangle */
  2. const LRU = require('lru-cache');
  3. const WebSocket = require('ws');
  4. const log = require('../lib/log');
  5. const serviceHelper = require('./serviceHelper');
  6. const fluxNetworkHelper = require('./fluxNetworkHelper');
  7. const verificationHelper = require('./verificationHelper');
  8. const messageHelper = require('./messageHelper');
  9. const {
  10. outgoingConnections, outgoingPeers, incomingPeers, incomingConnections,
  11. } = require('./utils/establishedConnections');
  12. const myMessageCache = new LRU(1000);
  13. let response = messageHelper.createErrorMessage();
  14. /**
  15. * To send to all peers.
  16. * @param {object} data Data.
  17. * @param {object[]} wsList Web socket list.
  18. */
  19. async function sendToAllPeers(data, wsList) {
  20. try {
  21. const removals = [];
  22. const ipremovals = [];
  23. // wsList is always a sublist of outgoingConnections
  24. const outConList = wsList || outgoingConnections;
  25. // eslint-disable-next-line no-restricted-syntax
  26. for (const client of outConList) {
  27. try {
  28. // eslint-disable-next-line no-await-in-loop
  29. await serviceHelper.delay(25);
  30. if (client.readyState === WebSocket.OPEN) {
  31. if (!data) {
  32. const pingTime = new Date().getTime();
  33. client.ping('flux'); // do ping with flux str instead
  34. const foundPeer = outgoingPeers.find((peer) => peer.ip === client._socket.remoteAddress);
  35. if (foundPeer) {
  36. foundPeer.lastPingTime = pingTime;
  37. }
  38. } else {
  39. client.send(data);
  40. }
  41. } else {
  42. throw new Error(`Connection to ${client._socket.remoteAddress} is not open`);
  43. }
  44. } catch (e) {
  45. removals.push(client);
  46. try {
  47. const ip = client._socket.remoteAddress;
  48. const foundPeer = outgoingPeers.find((peer) => peer.ip === ip);
  49. ipremovals.push(foundPeer);
  50. // eslint-disable-next-line no-use-before-define
  51. fluxNetworkHelper.closeConnection(ip);
  52. } catch (err) {
  53. log.error(err);
  54. }
  55. }
  56. }
  57. for (let i = 0; i < ipremovals.length; i += 1) {
  58. const peerIndex = outgoingPeers.indexOf(ipremovals[i]);
  59. if (peerIndex > -1) {
  60. outgoingPeers.splice(peerIndex, 1);
  61. }
  62. }
  63. for (let i = 0; i < removals.length; i += 1) {
  64. const ocIndex = outgoingConnections.indexOf(removals[i]);
  65. if (ocIndex > -1) {
  66. outgoingConnections.splice(ocIndex, 1);
  67. }
  68. }
  69. } catch (error) {
  70. log.error(error);
  71. }
  72. }
  73. /**
  74. * To send to all incoming connections.
  75. * @param {object} data Data.
  76. * @param {object[]} wsList Web socket list.
  77. */
  78. async function sendToAllIncomingConnections(data, wsList) {
  79. try {
  80. const removals = [];
  81. const ipremovals = [];
  82. // wsList is always a sublist of incomingConnections
  83. const incConList = wsList || incomingConnections;
  84. // eslint-disable-next-line no-restricted-syntax
  85. for (const client of incConList) {
  86. try {
  87. // eslint-disable-next-line no-await-in-loop
  88. await serviceHelper.delay(25);
  89. if (client.readyState === WebSocket.OPEN) {
  90. if (!data) {
  91. client.ping('flux'); // do ping with flux str instead
  92. } else {
  93. client.send(data);
  94. }
  95. } else {
  96. throw new Error(`Connection to ${client._socket.remoteAddress} is not open`);
  97. }
  98. } catch (e) {
  99. removals.push(client);
  100. try {
  101. const ip = client._socket.remoteAddress;
  102. const foundPeer = incomingPeers.find((peer) => peer.ip === ip);
  103. ipremovals.push(foundPeer);
  104. fluxNetworkHelper.closeIncomingConnection(ip, [], client); // this is wrong
  105. } catch (err) {
  106. log.error(err);
  107. }
  108. }
  109. }
  110. for (let i = 0; i < ipremovals.length; i += 1) {
  111. const peerIndex = incomingPeers.indexOf(ipremovals[i]);
  112. if (peerIndex > -1) {
  113. incomingPeers.splice(peerIndex, 1);
  114. }
  115. }
  116. for (let i = 0; i < removals.length; i += 1) {
  117. const ocIndex = incomingConnections.indexOf(removals[i]);
  118. if (ocIndex > -1) {
  119. incomingConnections.splice(ocIndex, 1);
  120. }
  121. }
  122. } catch (error) {
  123. log.error(error);
  124. }
  125. }
  126. /**
  127. * To get Flux message signature.
  128. * @param {object} message Message.
  129. * @param {string} privatekey Private key.
  130. * @returns {string} Signature.
  131. */
  132. async function getFluxMessageSignature(message, privatekey) {
  133. const privKey = await fluxNetworkHelper.getFluxNodePrivateKey(privatekey);
  134. const signature = await verificationHelper.signMessage(message, privKey);
  135. return signature;
  136. }
  137. /**
  138. * To serialise and sign a Flux broadcast.
  139. * @param {object} dataToBroadcast Data to broadcast. Contains version, timestamp, pubKey, signature and data.
  140. * @param {string} privatekey Private key.
  141. * @returns {string} Data string (serialised data object).
  142. */
  143. async function serialiseAndSignFluxBroadcast(dataToBroadcast, privatekey) {
  144. const version = 1;
  145. const timestamp = Date.now();
  146. const pubKey = await fluxNetworkHelper.getFluxNodePublicKey(privatekey);
  147. const message = serviceHelper.ensureString(dataToBroadcast);
  148. const messageToSign = version + message + timestamp;
  149. const signature = await getFluxMessageSignature(messageToSign, privatekey);
  150. // version 1 specifications
  151. // message contains version, timestamp, pubKey, signature and data. Data is a stringified json. Signature is signature of version+stringifieddata+timestamp
  152. // signed by the priv key corresponding to pubkey attached
  153. // data object contains version, timestamp of signing, signature, pubKey, data object. further data object must at least contain its type as string to determine it further.
  154. const dataObj = {
  155. version,
  156. timestamp,
  157. pubKey,
  158. signature,
  159. data: dataToBroadcast,
  160. };
  161. const dataString = JSON.stringify(dataObj);
  162. return dataString;
  163. }
  164. /**
  165. * To sign and send message to web socket.
  166. * @param {object} message Message.
  167. * @param {object} ws Web Socket.
  168. */
  169. async function sendMessageToWS(message, ws) {
  170. try {
  171. const messageSigned = await serialiseAndSignFluxBroadcast(message);
  172. try {
  173. ws.send(messageSigned);
  174. } catch (e) {
  175. console.error(e);
  176. }
  177. } catch (error) {
  178. log.error(error);
  179. }
  180. }
  181. /**
  182. * To respond with app message.
  183. * @param {object} message Message.
  184. * @param {object} ws Web socket.
  185. * @returns {void} Return statement is only used here to interrupt the function and nothing is returned.
  186. */
  187. async function respondWithAppMessage(message, ws) {
  188. try {
  189. // check if we have it database of permanent appMessages
  190. // eslint-disable-next-line global-require
  191. const appsService = require('./appsService');
  192. const tempMesResponse = myMessageCache.get(serviceHelper.ensureString(message));
  193. if (tempMesResponse) {
  194. sendMessageToWS(tempMesResponse, ws);
  195. return;
  196. }
  197. console.log(serviceHelper.ensureString(message));
  198. const appMessage = await appsService.checkAppMessageExistence(message.data.hash) || await appsService.checkAppTemporaryMessageExistence(message.data.hash);
  199. if (appMessage) {
  200. // const permanentAppMessage = {
  201. // type: messageType,
  202. // version: typeVersion,
  203. // appSpecifications: appSpecFormatted,
  204. // hash: messageHASH,
  205. // timestamp,
  206. // signature,
  207. // txid,
  208. // height,
  209. // valueSat,
  210. // };
  211. // a temporary appmessage looks like this:
  212. // const newMessage = {
  213. // appSpecifications: message.appSpecifications || message.zelAppSpecifications,
  214. // type: message.type,
  215. // version: message.version,
  216. // hash: message.hash,
  217. // timestamp: message.timestamp,
  218. // signature: message.signature,
  219. // createdAt: new Date(message.timestamp),
  220. // expireAt: new Date(validTill),
  221. // };
  222. const temporaryAppMessage = { // specification of temp message
  223. type: appMessage.type,
  224. version: appMessage.version,
  225. appSpecifications: appMessage.appSpecifications || appMessage.zelAppSpecifications,
  226. hash: appMessage.hash,
  227. timestamp: appMessage.timestamp,
  228. signature: appMessage.signature,
  229. };
  230. myMessageCache.set(serviceHelper.ensureString(message), temporaryAppMessage);
  231. sendMessageToWS(temporaryAppMessage, ws);
  232. }
  233. // else do nothing. We do not have this message. And this Flux would be requesting it from other peers soon too.
  234. } catch (error) {
  235. log.error(error);
  236. }
  237. }
  238. /**
  239. * To broadcast message to outgoing peers. Data is serialised and sent to outgoing peers.
  240. * @param {object} dataToBroadcast Data to broadcast.
  241. */
  242. async function broadcastMessageToOutgoing(dataToBroadcast) {
  243. const serialisedData = await serialiseAndSignFluxBroadcast(dataToBroadcast);
  244. await sendToAllPeers(serialisedData);
  245. }
  246. /**
  247. * To broadcast message to incoming peers. Data is serialised and sent to incoming peers.
  248. * @param {object} dataToBroadcast Data to broadcast.
  249. */
  250. async function broadcastMessageToIncoming(dataToBroadcast) {
  251. const serialisedData = await serialiseAndSignFluxBroadcast(dataToBroadcast);
  252. await sendToAllIncomingConnections(serialisedData);
  253. }
  254. /**
  255. * To broadcast message from user to outgoing peers. Data is serialised and sent to outgoing peers. Only accessible by admins and Flux team members.
  256. * @param {object} req Request.
  257. * @param {object} res Response.
  258. */
  259. async function broadcastMessageToOutgoingFromUser(req, res) {
  260. try {
  261. let { data } = req.params;
  262. data = data || req.query.data;
  263. if (data === undefined || data === null) {
  264. throw new Error('No message to broadcast attached.');
  265. }
  266. const authorized = await verificationHelper.verifyPrivilege('adminandfluxteam', req);
  267. if (authorized === true) {
  268. await broadcastMessageToOutgoing(data);
  269. const message = messageHelper.createSuccessMessage('Message successfully broadcasted to Flux network');
  270. response = message;
  271. } else {
  272. response = messageHelper.errUnauthorizedMessage();
  273. }
  274. res.json(response);
  275. } catch (error) {
  276. log.error(error);
  277. const errorResponse = messageHelper.createErrorMessage(
  278. error.message || error,
  279. error.name,
  280. error.code,
  281. );
  282. res.json(errorResponse);
  283. }
  284. }
  285. /**
  286. * To broadcast message from user to outgoing peers after data is processed. Processed data is serialised and sent to outgoing peers. Only accessible by admins and Flux team members.
  287. * @param {object} req Request.
  288. * @param {object} res Response.
  289. */
  290. async function broadcastMessageToOutgoingFromUserPost(req, res) {
  291. let body = '';
  292. req.on('data', (data) => {
  293. body += data;
  294. });
  295. req.on('end', async () => {
  296. try {
  297. if (body === undefined || body === '') {
  298. throw new Error('No message to broadcast attached.');
  299. }
  300. const processedBody = serviceHelper.ensureObject(body);
  301. const authorized = await verificationHelper.verifyPrivilege('adminandfluxteam', req);
  302. if (authorized === true) {
  303. await broadcastMessageToOutgoing(processedBody);
  304. const message = messageHelper.createSuccessMessage('Message successfully broadcasted to Flux network');
  305. response = message;
  306. } else {
  307. response = messageHelper.errUnauthorizedMessage();
  308. }
  309. res.json(response);
  310. } catch (error) {
  311. log.error(error);
  312. const errorResponse = messageHelper.createErrorMessage(
  313. error.message || error,
  314. error.name,
  315. error.code,
  316. );
  317. res.json(errorResponse);
  318. }
  319. });
  320. }
  321. /**
  322. * To broadcast message from user to incoming peers. Data is serialised and sent to incoming peers. Only accessible by admins and Flux team members.
  323. * @param {object} req Request.
  324. * @param {object} res Response.
  325. */
  326. async function broadcastMessageToIncomingFromUser(req, res) {
  327. try {
  328. let { data } = req.params;
  329. data = data || req.query.data;
  330. if (data === undefined || data === null) {
  331. throw new Error('No message to broadcast attached.');
  332. }
  333. const authorized = await verificationHelper.verifyPrivilege('adminandfluxteam', req);
  334. if (authorized === true) {
  335. await broadcastMessageToIncoming(data);
  336. const message = messageHelper.createSuccessMessage('Message successfully broadcasted to Flux network');
  337. response = message;
  338. } else {
  339. response = messageHelper.errUnauthorizedMessage();
  340. }
  341. res.json(response);
  342. } catch (error) {
  343. log.error(error);
  344. const errorResponse = messageHelper.createErrorMessage(
  345. error.message || error,
  346. error.name,
  347. error.code,
  348. );
  349. res.json(errorResponse);
  350. }
  351. }
  352. /**
  353. * To broadcast message from user to incoming peers after data is processed. Processed data is serialised and sent to incoming peers. Only accessible by admins and Flux team members.
  354. * @param {object} req Request.
  355. * @param {object} res Response.
  356. */
  357. async function broadcastMessageToIncomingFromUserPost(req, res) {
  358. let body = '';
  359. req.on('data', (data) => {
  360. body += data;
  361. });
  362. req.on('end', async () => {
  363. try {
  364. if (body === undefined || body === '') {
  365. throw new Error('No message to broadcast attached.');
  366. }
  367. const processedBody = serviceHelper.ensureObject(body);
  368. const authorized = await verificationHelper.verifyPrivilege('adminandfluxteam', req);
  369. if (authorized === true) {
  370. await broadcastMessageToIncoming(processedBody);
  371. const message = messageHelper.createSuccessMessage('Message successfully broadcasted to Flux network');
  372. response = message;
  373. } else {
  374. response = messageHelper.errUnauthorizedMessage();
  375. }
  376. res.json(response);
  377. } catch (error) {
  378. log.error(error);
  379. const errorResponse = messageHelper.createErrorMessage(
  380. error.message || error,
  381. error.name,
  382. error.code,
  383. );
  384. res.json(errorResponse);
  385. }
  386. });
  387. }
  388. /**
  389. * To broadcast message from user. Handles messages to outgoing and incoming peers. Only accessible by admins and Flux team members.
  390. * @param {object} req Request.
  391. * @param {object} res Response.
  392. */
  393. async function broadcastMessageFromUser(req, res) {
  394. try {
  395. let { data } = req.params;
  396. data = data || req.query.data;
  397. if (data === undefined || data === null) {
  398. throw new Error('No message to broadcast attached.');
  399. }
  400. const authorized = await verificationHelper.verifyPrivilege('adminandfluxteam', req);
  401. if (authorized === true) {
  402. await broadcastMessageToOutgoing(data);
  403. await broadcastMessageToIncoming(data);
  404. const message = messageHelper.createSuccessMessage('Message successfully broadcasted to Flux network');
  405. response = message;
  406. } else {
  407. response = messageHelper.errUnauthorizedMessage();
  408. }
  409. res.json(response);
  410. } catch (error) {
  411. log.error(error);
  412. const errorResponse = messageHelper.createErrorMessage(
  413. error.message || error,
  414. error.name,
  415. error.code,
  416. );
  417. res.json(errorResponse);
  418. }
  419. }
  420. /**
  421. * To broadcast message from user after data is processed. Handles messages to outgoing and incoming peers. Only accessible by admins and Flux team members.
  422. * @param {object} req Request.
  423. * @param {object} res Response.
  424. */
  425. async function broadcastMessageFromUserPost(req, res) {
  426. let body = '';
  427. req.on('data', (data) => {
  428. body += data;
  429. });
  430. req.on('end', async () => {
  431. try {
  432. if (body === undefined || body === '') {
  433. throw new Error('No message to broadcast attached.');
  434. }
  435. const processedBody = serviceHelper.ensureObject(body);
  436. const authorized = await verificationHelper.verifyPrivilege('adminandfluxteam', req);
  437. if (authorized === true) {
  438. await broadcastMessageToOutgoing(processedBody);
  439. await broadcastMessageToIncoming(processedBody);
  440. const message = messageHelper.createSuccessMessage('Message successfully broadcasted to Flux network');
  441. response = message;
  442. } else {
  443. response = messageHelper.errUnauthorizedMessage();
  444. }
  445. res.json(response);
  446. } catch (error) {
  447. log.error(error);
  448. const errorResponse = messageHelper.createErrorMessage(
  449. error.message || error,
  450. error.name,
  451. error.code,
  452. );
  453. res.json(errorResponse);
  454. }
  455. });
  456. }
  457. // how long can this take?
  458. /**
  459. * To broadcast temporary app message.
  460. * @param {object} message Message.
  461. */
  462. async function broadcastTemporaryAppMessage(message) {
  463. /* message object
  464. * @param type string
  465. * @param version number
  466. * @param appSpecifications object
  467. * @param hash string - messageHash(type + version + JSON.stringify(appSpecifications) + timestamp + signature))
  468. * @param timestamp number
  469. * @param signature string
  470. */
  471. log.info(message);
  472. // no verification of message before broadcasting. Broadcasting happens always after data have been verified and are stored in our db. It is up to receiving node to verify it and store and rebroadcast.
  473. if (typeof message !== 'object' || typeof message.type !== 'string' || typeof message.version !== 'number' || typeof message.appSpecifications !== 'object' || typeof message.signature !== 'string' || typeof message.timestamp !== 'number' || typeof message.hash !== 'string') {
  474. throw new Error('Invalid Flux App message for storing');
  475. }
  476. // to all outoing
  477. await broadcastMessageToOutgoing(message); // every outgoing peer AT LEAST 50ms - suppose 40 outgoing - 0.8 seconds
  478. // to all incoming. Delay broadcast in case message is processing
  479. await broadcastMessageToIncoming(message); // every incoing peer AT LEAST 50ms. Suppose 50 incoming - 1 second
  480. }
  481. module.exports = {
  482. sendToAllPeers,
  483. sendMessageToWS,
  484. sendToAllIncomingConnections,
  485. respondWithAppMessage,
  486. serialiseAndSignFluxBroadcast,
  487. getFluxMessageSignature,
  488. broadcastMessageToOutgoingFromUser,
  489. broadcastMessageToOutgoingFromUserPost,
  490. broadcastMessageToIncomingFromUser,
  491. broadcastMessageToIncomingFromUserPost,
  492. broadcastMessageToIncoming,
  493. broadcastMessageFromUser,
  494. broadcastMessageFromUserPost,
  495. broadcastTemporaryAppMessage,
  496. broadcastMessageToOutgoing,
  497. };