socket.js 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594
  1. import { transports } from "./transports/index.js";
  2. import { installTimerFunctions, byteLength } from "./util.js";
  3. import { decode } from "./contrib/parseqs.js";
  4. import { parse } from "./contrib/parseuri.js";
  5. import { Emitter } from "@socket.io/component-emitter";
  6. import { protocol } from "engine.io-parser";
  7. import { defaultBinaryType } from "./transports/websocket-constructor.js";
  8. export class Socket extends Emitter {
  9. /**
  10. * Socket constructor.
  11. *
  12. * @param {String|Object} uri - uri or options
  13. * @param {Object} opts - options
  14. */
  15. constructor(uri, opts = {}) {
  16. super();
  17. this.binaryType = defaultBinaryType;
  18. this.writeBuffer = [];
  19. if (uri && "object" === typeof uri) {
  20. opts = uri;
  21. uri = null;
  22. }
  23. if (uri) {
  24. uri = parse(uri);
  25. opts.hostname = uri.host;
  26. opts.secure = uri.protocol === "https" || uri.protocol === "wss";
  27. opts.port = uri.port;
  28. if (uri.query)
  29. opts.query = uri.query;
  30. }
  31. else if (opts.host) {
  32. opts.hostname = parse(opts.host).host;
  33. }
  34. installTimerFunctions(this, opts);
  35. this.secure =
  36. null != opts.secure
  37. ? opts.secure
  38. : typeof location !== "undefined" && "https:" === location.protocol;
  39. if (opts.hostname && !opts.port) {
  40. // if no port is specified manually, use the protocol default
  41. opts.port = this.secure ? "443" : "80";
  42. }
  43. this.hostname =
  44. opts.hostname ||
  45. (typeof location !== "undefined" ? location.hostname : "localhost");
  46. this.port =
  47. opts.port ||
  48. (typeof location !== "undefined" && location.port
  49. ? location.port
  50. : this.secure
  51. ? "443"
  52. : "80");
  53. this.transports = opts.transports || [
  54. "polling",
  55. "websocket",
  56. "webtransport",
  57. ];
  58. this.writeBuffer = [];
  59. this.prevBufferLen = 0;
  60. this.opts = Object.assign({
  61. path: "/engine.io",
  62. agent: false,
  63. withCredentials: false,
  64. upgrade: true,
  65. timestampParam: "t",
  66. rememberUpgrade: false,
  67. addTrailingSlash: true,
  68. rejectUnauthorized: true,
  69. perMessageDeflate: {
  70. threshold: 1024,
  71. },
  72. transportOptions: {},
  73. closeOnBeforeunload: false,
  74. }, opts);
  75. this.opts.path =
  76. this.opts.path.replace(/\/$/, "") +
  77. (this.opts.addTrailingSlash ? "/" : "");
  78. if (typeof this.opts.query === "string") {
  79. this.opts.query = decode(this.opts.query);
  80. }
  81. // set on handshake
  82. this.id = null;
  83. this.upgrades = null;
  84. this.pingInterval = null;
  85. this.pingTimeout = null;
  86. // set on heartbeat
  87. this.pingTimeoutTimer = null;
  88. if (typeof addEventListener === "function") {
  89. if (this.opts.closeOnBeforeunload) {
  90. // Firefox closes the connection when the "beforeunload" event is emitted but not Chrome. This event listener
  91. // ensures every browser behaves the same (no "disconnect" event at the Socket.IO level when the page is
  92. // closed/reloaded)
  93. this.beforeunloadEventListener = () => {
  94. if (this.transport) {
  95. // silently close the transport
  96. this.transport.removeAllListeners();
  97. this.transport.close();
  98. }
  99. };
  100. addEventListener("beforeunload", this.beforeunloadEventListener, false);
  101. }
  102. if (this.hostname !== "localhost") {
  103. this.offlineEventListener = () => {
  104. this.onClose("transport close", {
  105. description: "network connection lost",
  106. });
  107. };
  108. addEventListener("offline", this.offlineEventListener, false);
  109. }
  110. }
  111. this.open();
  112. }
  113. /**
  114. * Creates transport of the given type.
  115. *
  116. * @param {String} name - transport name
  117. * @return {Transport}
  118. * @private
  119. */
  120. createTransport(name) {
  121. const query = Object.assign({}, this.opts.query);
  122. // append engine.io protocol identifier
  123. query.EIO = protocol;
  124. // transport name
  125. query.transport = name;
  126. // session id if we already have one
  127. if (this.id)
  128. query.sid = this.id;
  129. const opts = Object.assign({}, this.opts, {
  130. query,
  131. socket: this,
  132. hostname: this.hostname,
  133. secure: this.secure,
  134. port: this.port,
  135. }, this.opts.transportOptions[name]);
  136. return new transports[name](opts);
  137. }
  138. /**
  139. * Initializes transport to use and starts probe.
  140. *
  141. * @private
  142. */
  143. open() {
  144. let transport;
  145. if (this.opts.rememberUpgrade &&
  146. Socket.priorWebsocketSuccess &&
  147. this.transports.indexOf("websocket") !== -1) {
  148. transport = "websocket";
  149. }
  150. else if (0 === this.transports.length) {
  151. // Emit error on next tick so it can be listened to
  152. this.setTimeoutFn(() => {
  153. this.emitReserved("error", "No transports available");
  154. }, 0);
  155. return;
  156. }
  157. else {
  158. transport = this.transports[0];
  159. }
  160. this.readyState = "opening";
  161. // Retry with the next transport if the transport is disabled (jsonp: false)
  162. try {
  163. transport = this.createTransport(transport);
  164. }
  165. catch (e) {
  166. this.transports.shift();
  167. this.open();
  168. return;
  169. }
  170. transport.open();
  171. this.setTransport(transport);
  172. }
  173. /**
  174. * Sets the current transport. Disables the existing one (if any).
  175. *
  176. * @private
  177. */
  178. setTransport(transport) {
  179. if (this.transport) {
  180. this.transport.removeAllListeners();
  181. }
  182. // set up transport
  183. this.transport = transport;
  184. // set up transport listeners
  185. transport
  186. .on("drain", this.onDrain.bind(this))
  187. .on("packet", this.onPacket.bind(this))
  188. .on("error", this.onError.bind(this))
  189. .on("close", (reason) => this.onClose("transport close", reason));
  190. }
  191. /**
  192. * Probes a transport.
  193. *
   * Creates a second transport named `name`, opens it and sends a
   * `ping`/"probe" packet over it. When the matching `pong`/"probe" comes
   * back, the current transport is paused, the probed transport is installed
   * through `setTransport`, an `upgrade` packet is sent over it and the write
   * buffer is flushed. An error or close seen while probing emits
   * `upgradeError` and closes the probed transport.
   *
  194. * @param {String} name - transport name
  195. * @private
  196. */
  197. probe(name) {
  198. let transport = this.createTransport(name);
  // Set once by freezeTransport(); every callback below checks it and bails out.
  199. let failed = false;
  // Reset here; set again below once the probe pong has been received.
  200. Socket.priorWebsocketSuccess = false;
  201. const onTransportOpen = () => {
  202. if (failed)
  203. return;
  204. transport.send([{ type: "ping", data: "probe" }]);
  205. transport.once("packet", (msg) => {
  206. if (failed)
  207. return;
  208. if ("pong" === msg.type && "probe" === msg.data) {
  209. this.upgrading = true;
  210. this.emitReserved("upgrading", transport);
  // `transport` is nulled by freezeTransport().
  // NOTE(review): presumably an "upgrading" listener can trigger that - confirm.
  211. if (!transport)
  212. return;
  213. Socket.priorWebsocketSuccess = "websocket" === transport.name;
  // The switch-over runs in the callback handed to the current transport's pause().
  214. this.transport.pause(() => {
  215. if (failed)
  216. return;
  217. if ("closed" === this.readyState)
  218. return;
  219. cleanup();
  220. this.setTransport(transport);
  221. transport.send([{ type: "upgrade" }]);
  222. this.emitReserved("upgrade", transport);
  223. transport = null;
  224. this.upgrading = false;
  225. this.flush();
  226. });
  227. }
  228. else {
  // Any other first packet counts as a failed probe.
  229. const err = new Error("probe error");
  230. // @ts-ignore
  231. err.transport = transport.name;
  232. this.emitReserved("upgradeError", err);
  233. }
  234. });
  235. };
  // The function declarations below are hoisted and refer to `cleanup` and
  // `onerror`, which are defined further down; none of them is invoked before
  // the listeners are attached at the end of this method.
  236. function freezeTransport() {
  237. if (failed)
  238. return;
  239. // Any callback called by transport should be ignored since now
  240. failed = true;
  241. cleanup();
  242. transport.close();
  243. transport = null;
  244. }
  245. // Handle any error that happens while probing
  246. const onerror = (err) => {
  247. const error = new Error("probe error: " + err);
  248. // @ts-ignore
  249. error.transport = transport.name;
  250. freezeTransport();
  251. this.emitReserved("upgradeError", error);
  252. };
  253. function onTransportClose() {
  254. onerror("transport closed");
  255. }
  256. // When the socket is closed while we're probing
  257. function onclose() {
  258. onerror("socket closed");
  259. }
  260. // When the socket is upgraded while we're probing
  261. function onupgrade(to) {
  // Only a differently named transport abandons this probe.
  262. if (transport && to.name !== transport.name) {
  263. freezeTransport();
  264. }
  265. }
  266. // Remove all listeners on the transport and on self
  267. const cleanup = () => {
  268. transport.removeListener("open", onTransportOpen);
  269. transport.removeListener("error", onerror);
  270. transport.removeListener("close", onTransportClose);
  271. this.off("close", onclose);
  272. this.off("upgrading", onupgrade);
  273. };
  274. transport.once("open", onTransportOpen);
  275. transport.once("error", onerror);
  276. transport.once("close", onTransportClose);
  277. this.once("close", onclose);
  278. this.once("upgrading", onupgrade);
  // When "webtransport" is among the upgrades, every other probe is opened
  // 200 ms later, and only if it has not been frozen in the meantime.
  279. if (this.upgrades.indexOf("webtransport") !== -1 &&
  280. name !== "webtransport") {
  281. // favor WebTransport
  282. this.setTimeoutFn(() => {
  283. if (!failed) {
  284. transport.open();
  285. }
  286. }, 200);
  287. }
  288. else {
  289. transport.open();
  290. }
  291. }
  292. /**
  293. * Called when connection is deemed open.
  294. *
  295. * @private
  296. */
  297. onOpen() {
  298. this.readyState = "open";
  299. Socket.priorWebsocketSuccess = "websocket" === this.transport.name;
  300. this.emitReserved("open");
  301. this.flush();
  302. // we check for `readyState` in case an `open`
  303. // listener already closed the socket
  304. if ("open" === this.readyState && this.opts.upgrade) {
  305. let i = 0;
  306. const l = this.upgrades.length;
  307. for (; i < l; i++) {
  308. this.probe(this.upgrades[i]);
  309. }
  310. }
  311. }
  312. /**
  313. * Handles a packet.
  314. *
  315. * @private
  316. */
  317. onPacket(packet) {
  318. if ("opening" === this.readyState ||
  319. "open" === this.readyState ||
  320. "closing" === this.readyState) {
  321. this.emitReserved("packet", packet);
  322. // Socket is live - any packet counts
  323. this.emitReserved("heartbeat");
  324. this.resetPingTimeout();
  325. switch (packet.type) {
  326. case "open":
  327. this.onHandshake(JSON.parse(packet.data));
  328. break;
  329. case "ping":
  330. this.sendPacket("pong");
  331. this.emitReserved("ping");
  332. this.emitReserved("pong");
  333. break;
  334. case "error":
  335. const err = new Error("server error");
  336. // @ts-ignore
  337. err.code = packet.data;
  338. this.onError(err);
  339. break;
  340. case "message":
  341. this.emitReserved("data", packet.data);
  342. this.emitReserved("message", packet.data);
  343. break;
  344. }
  345. }
  346. else {
  347. }
  348. }
  349. /**
  350. * Called upon handshake completion.
  351. *
  352. * @param {Object} data - handshake obj
  353. * @private
  354. */
  355. onHandshake(data) {
  356. this.emitReserved("handshake", data);
  357. this.id = data.sid;
  358. this.transport.query.sid = data.sid;
  359. this.upgrades = this.filterUpgrades(data.upgrades);
  360. this.pingInterval = data.pingInterval;
  361. this.pingTimeout = data.pingTimeout;
  362. this.maxPayload = data.maxPayload;
  363. this.onOpen();
  364. // In case open handler closes socket
  365. if ("closed" === this.readyState)
  366. return;
  367. this.resetPingTimeout();
  368. }
  369. /**
  370. * Sets and resets ping timeout timer based on server pings.
  371. *
  372. * @private
  373. */
  374. resetPingTimeout() {
  375. this.clearTimeoutFn(this.pingTimeoutTimer);
  376. this.pingTimeoutTimer = this.setTimeoutFn(() => {
  377. this.onClose("ping timeout");
  378. }, this.pingInterval + this.pingTimeout);
  379. if (this.opts.autoUnref) {
  380. this.pingTimeoutTimer.unref();
  381. }
  382. }
  383. /**
  384. * Called on `drain` event
  385. *
  386. * @private
  387. */
  388. onDrain() {
  389. this.writeBuffer.splice(0, this.prevBufferLen);
  390. // setting prevBufferLen = 0 is very important
  391. // for example, when upgrading, upgrade packet is sent over,
  392. // and a nonzero prevBufferLen could cause problems on `drain`
  393. this.prevBufferLen = 0;
  394. if (0 === this.writeBuffer.length) {
  395. this.emitReserved("drain");
  396. }
  397. else {
  398. this.flush();
  399. }
  400. }
  401. /**
  402. * Flush write buffers.
  403. *
  404. * @private
  405. */
  406. flush() {
  407. if ("closed" !== this.readyState &&
  408. this.transport.writable &&
  409. !this.upgrading &&
  410. this.writeBuffer.length) {
  411. const packets = this.getWritablePackets();
  412. this.transport.send(packets);
  413. // keep track of current length of writeBuffer
  414. // splice writeBuffer and callbackBuffer on `drain`
  415. this.prevBufferLen = packets.length;
  416. this.emitReserved("flush");
  417. }
  418. }
  419. /**
  420. * Ensure the encoded size of the writeBuffer is below the maxPayload value sent by the server (only for HTTP
  421. * long-polling)
  422. *
  423. * @private
  424. */
  425. getWritablePackets() {
  426. const shouldCheckPayloadSize = this.maxPayload &&
  427. this.transport.name === "polling" &&
  428. this.writeBuffer.length > 1;
  429. if (!shouldCheckPayloadSize) {
  430. return this.writeBuffer;
  431. }
  432. let payloadSize = 1; // first packet type
  433. for (let i = 0; i < this.writeBuffer.length; i++) {
  434. const data = this.writeBuffer[i].data;
  435. if (data) {
  436. payloadSize += byteLength(data);
  437. }
  438. if (i > 0 && payloadSize > this.maxPayload) {
  439. return this.writeBuffer.slice(0, i);
  440. }
  441. payloadSize += 2; // separator + packet type
  442. }
  443. return this.writeBuffer;
  444. }
  445. /**
  446. * Sends a message.
  447. *
  448. * @param {String} msg - message.
  449. * @param {Object} options.
  450. * @param {Function} callback function.
  451. * @return {Socket} for chaining.
  452. */
  453. write(msg, options, fn) {
  454. this.sendPacket("message", msg, options, fn);
  455. return this;
  456. }
  457. send(msg, options, fn) {
  458. this.sendPacket("message", msg, options, fn);
  459. return this;
  460. }
  461. /**
  462. * Sends a packet.
  463. *
  464. * @param {String} type: packet type.
  465. * @param {String} data.
  466. * @param {Object} options.
  467. * @param {Function} fn - callback function.
  468. * @private
  469. */
  470. sendPacket(type, data, options, fn) {
  471. if ("function" === typeof data) {
  472. fn = data;
  473. data = undefined;
  474. }
  475. if ("function" === typeof options) {
  476. fn = options;
  477. options = null;
  478. }
  479. if ("closing" === this.readyState || "closed" === this.readyState) {
  480. return;
  481. }
  482. options = options || {};
  483. options.compress = false !== options.compress;
  484. const packet = {
  485. type: type,
  486. data: data,
  487. options: options,
  488. };
  489. this.emitReserved("packetCreate", packet);
  490. this.writeBuffer.push(packet);
  491. if (fn)
  492. this.once("flush", fn);
  493. this.flush();
  494. }
  495. /**
  496. * Closes the connection.
  497. */
  498. close() {
  499. const close = () => {
  500. this.onClose("forced close");
  501. this.transport.close();
  502. };
  503. const cleanupAndClose = () => {
  504. this.off("upgrade", cleanupAndClose);
  505. this.off("upgradeError", cleanupAndClose);
  506. close();
  507. };
  508. const waitForUpgrade = () => {
  509. // wait for upgrade to finish since we can't send packets while pausing a transport
  510. this.once("upgrade", cleanupAndClose);
  511. this.once("upgradeError", cleanupAndClose);
  512. };
  513. if ("opening" === this.readyState || "open" === this.readyState) {
  514. this.readyState = "closing";
  515. if (this.writeBuffer.length) {
  516. this.once("drain", () => {
  517. if (this.upgrading) {
  518. waitForUpgrade();
  519. }
  520. else {
  521. close();
  522. }
  523. });
  524. }
  525. else if (this.upgrading) {
  526. waitForUpgrade();
  527. }
  528. else {
  529. close();
  530. }
  531. }
  532. return this;
  533. }
  534. /**
  535. * Called upon transport error
  536. *
  537. * @private
  538. */
  539. onError(err) {
  540. Socket.priorWebsocketSuccess = false;
  541. this.emitReserved("error", err);
  542. this.onClose("transport error", err);
  543. }
  544. /**
  545. * Called upon transport close.
  546. *
  547. * @private
  548. */
  549. onClose(reason, description) {
  550. if ("opening" === this.readyState ||
  551. "open" === this.readyState ||
  552. "closing" === this.readyState) {
  553. // clear timers
  554. this.clearTimeoutFn(this.pingTimeoutTimer);
  555. // stop event from firing again for transport
  556. this.transport.removeAllListeners("close");
  557. // ensure transport won't stay open
  558. this.transport.close();
  559. // ignore further transport communication
  560. this.transport.removeAllListeners();
  561. if (typeof removeEventListener === "function") {
  562. removeEventListener("beforeunload", this.beforeunloadEventListener, false);
  563. removeEventListener("offline", this.offlineEventListener, false);
  564. }
  565. // set ready state
  566. this.readyState = "closed";
  567. // clear session id
  568. this.id = null;
  569. // emit close event
  570. this.emitReserved("close", reason, description);
  571. // clean buffers after, so users can still
  572. // grab the buffers on `close` event
  573. this.writeBuffer = [];
  574. this.prevBufferLen = 0;
  575. }
  576. }
  577. /**
  578. * Filters upgrades, returning only those matching client transports.
  579. *
  580. * @param {Array} upgrades - server upgrades
  581. * @private
  582. */
  583. filterUpgrades(upgrades) {
  584. const filteredUpgrades = [];
  585. let i = 0;
  586. const j = upgrades.length;
  587. for (; i < j; i++) {
  588. if (~this.transports.indexOf(upgrades[i]))
  589. filteredUpgrades.push(upgrades[i]);
  590. }
  591. return filteredUpgrades;
  592. }
  593. }
  // Expose the protocol identifier imported from "engine.io-parser" as a
  // static property of the Socket class (the same value is sent as the `EIO`
  // query parameter by createTransport).
  594. Socket.protocol = protocol;