123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594 |
- import { transports } from "./transports/index.js";
- import { installTimerFunctions, byteLength } from "./util.js";
- import { decode } from "./contrib/parseqs.js";
- import { parse } from "./contrib/parseuri.js";
- import { Emitter } from "@socket.io/component-emitter";
- import { protocol } from "engine.io-parser";
- import { defaultBinaryType } from "./transports/websocket-constructor.js";
/**
 * Engine.IO client socket.
 *
 * Owns a single low-level transport at a time, performs the handshake,
 * buffers outgoing packets until the transport is writable, probes the
 * upgrades advertised by the server and tracks the heartbeat sent by the
 * server.
 */
export class Socket extends Emitter {
    /**
     * Socket constructor.
     *
     * Resolves host / port / scheme from `uri` and/or `opts`, merges the
     * options over the built-in defaults, registers the optional
     * `beforeunload` / `offline` listeners and immediately calls `open()`.
     *
     * The options object handed in by the caller is never modified.
     *
     * @param {String|Object} uri - uri or options
     * @param {Object} opts - options
     */
    constructor(uri, opts = {}) {
        super();
        this.binaryType = defaultBinaryType;
        if (uri && "object" === typeof uri) {
            opts = uri;
            uri = null;
        }
        // Work on a shallow copy: the fields derived below (hostname, secure,
        // port, query) must not leak back into the caller's object.
        opts = Object.assign({}, opts);
        if (uri) {
            const parsedUri = parse(uri);
            opts.hostname = parsedUri.host;
            opts.secure =
                parsedUri.protocol === "https" || parsedUri.protocol === "wss";
            opts.port = parsedUri.port;
            if (parsedUri.query)
                opts.query = parsedUri.query;
        }
        else if (opts.host) {
            opts.hostname = parse(opts.host).host;
        }
        installTimerFunctions(this, opts);
        this.secure =
            null != opts.secure
                ? opts.secure
                : typeof location !== "undefined" && "https:" === location.protocol;
        if (opts.hostname && !opts.port) {
            // if no port is specified manually, use the protocol default
            opts.port = this.secure ? "443" : "80";
        }
        this.hostname =
            opts.hostname ||
                (typeof location !== "undefined" ? location.hostname : "localhost");
        this.port =
            opts.port ||
                (typeof location !== "undefined" && location.port
                    ? location.port
                    : this.secure
                        ? "443"
                        : "80");
        this.transports = opts.transports || [
            "polling",
            "websocket",
            "webtransport",
        ];
        // packets waiting to be written, and how many of them are in flight
        this.writeBuffer = [];
        this.prevBufferLen = 0;
        this.opts = Object.assign({
            path: "/engine.io",
            agent: false,
            withCredentials: false,
            upgrade: true,
            timestampParam: "t",
            rememberUpgrade: false,
            addTrailingSlash: true,
            rejectUnauthorized: true,
            perMessageDeflate: {
                threshold: 1024,
            },
            transportOptions: {},
            closeOnBeforeunload: false,
        }, opts);
        // normalize the path: strip one trailing slash, then re-add it if requested
        this.opts.path =
            this.opts.path.replace(/\/$/, "") +
                (this.opts.addTrailingSlash ? "/" : "");
        if (typeof this.opts.query === "string") {
            this.opts.query = decode(this.opts.query);
        }
        // set on handshake
        this.id = null;
        this.upgrades = null;
        this.pingInterval = null;
        this.pingTimeout = null;
        // set on heartbeat
        this.pingTimeoutTimer = null;
        if (typeof addEventListener === "function") {
            if (this.opts.closeOnBeforeunload) {
                // Firefox closes the connection when the "beforeunload" event is emitted but not Chrome. This event listener
                // ensures every browser behaves the same (no "disconnect" event at the Socket.IO level when the page is
                // closed/reloaded)
                this.beforeunloadEventListener = () => {
                    if (this.transport) {
                        // silently close the transport
                        this.transport.removeAllListeners();
                        this.transport.close();
                    }
                };
                addEventListener("beforeunload", this.beforeunloadEventListener, false);
            }
            if (this.hostname !== "localhost") {
                this.offlineEventListener = () => {
                    this.onClose("transport close", {
                        description: "network connection lost",
                    });
                };
                addEventListener("offline", this.offlineEventListener, false);
            }
        }
        this.open();
    }
    /**
     * Creates transport of the given type.
     *
     * The transport receives the socket options, overridden by the resolved
     * hostname / secure / port and finally by `opts.transportOptions[name]`.
     *
     * @param {String} name - transport name
     * @return {Transport}
     * @private
     */
    createTransport(name) {
        const query = Object.assign({}, this.opts.query);
        // append engine.io protocol identifier
        query.EIO = protocol;
        // transport name
        query.transport = name;
        // session id if we already have one
        if (this.id)
            query.sid = this.id;
        const opts = Object.assign({}, this.opts, {
            query,
            socket: this,
            hostname: this.hostname,
            secure: this.secure,
            port: this.port,
        }, this.opts.transportOptions[name]);
        return new transports[name](opts);
    }
    /**
     * Initializes transport to use and starts probe.
     *
     * Picks "websocket" directly when `rememberUpgrade` is set and a previous
     * websocket connection succeeded; otherwise uses the first configured
     * transport. When no transport is left, an "error" event is emitted
     * asynchronously.
     *
     * @private
     */
    open() {
        let transport;
        if (this.opts.rememberUpgrade &&
            Socket.priorWebsocketSuccess &&
            this.transports.indexOf("websocket") !== -1) {
            transport = "websocket";
        }
        else if (0 === this.transports.length) {
            // Emit error on next tick so it can be listened to
            this.setTimeoutFn(() => {
                this.emitReserved("error", "No transports available");
            }, 0);
            return;
        }
        else {
            transport = this.transports[0];
        }
        this.readyState = "opening";
        // Retry with the next transport if the transport is disabled (jsonp: false)
        try {
            transport = this.createTransport(transport);
        }
        catch (e) {
            // deliberate best-effort: drop the head of the list and try again
            this.transports.shift();
            this.open();
            return;
        }
        transport.open();
        this.setTransport(transport);
    }
    /**
     * Sets the current transport. Disables the existing one (if any).
     *
     * @param {Transport} transport - transport that becomes the active one
     * @private
     */
    setTransport(transport) {
        if (this.transport) {
            this.transport.removeAllListeners();
        }
        // set up transport
        this.transport = transport;
        // set up transport listeners
        transport
            .on("drain", this.onDrain.bind(this))
            .on("packet", this.onPacket.bind(this))
            .on("error", this.onError.bind(this))
            .on("close", (reason) => this.onClose("transport close", reason));
    }
    /**
     * Probes a transport.
     *
     * Opens a second transport next to the active one, sends a "probe" ping
     * through it and, when the matching pong comes back, pauses the active
     * transport and switches over. Any failure emits "upgradeError" and leaves
     * the active transport untouched.
     *
     * @param {String} name - transport name
     * @private
     */
    probe(name) {
        let transport = this.createTransport(name);
        // once set, every callback coming from the probed transport is ignored
        let failed = false;
        Socket.priorWebsocketSuccess = false;
        const onTransportOpen = () => {
            if (failed)
                return;
            transport.send([{ type: "ping", data: "probe" }]);
            transport.once("packet", (msg) => {
                if (failed)
                    return;
                if ("pong" === msg.type && "probe" === msg.data) {
                    this.upgrading = true;
                    this.emitReserved("upgrading", transport);
                    // an "upgrading" listener may have frozen this probe
                    if (!transport)
                        return;
                    Socket.priorWebsocketSuccess = "websocket" === transport.name;
                    this.transport.pause(() => {
                        if (failed)
                            return;
                        if ("closed" === this.readyState)
                            return;
                        cleanup();
                        this.setTransport(transport);
                        transport.send([{ type: "upgrade" }]);
                        this.emitReserved("upgrade", transport);
                        transport = null;
                        this.upgrading = false;
                        this.flush();
                    });
                }
                else {
                    const err = new Error("probe error");
                    // @ts-ignore
                    err.transport = transport.name;
                    this.emitReserved("upgradeError", err);
                }
            });
        };
        function freezeTransport() {
            if (failed)
                return;
            // Any callback called by transport should be ignored since now
            failed = true;
            cleanup();
            transport.close();
            transport = null;
        }
        // Handle any error that happens while probing
        const onerror = (err) => {
            const error = new Error("probe error: " + err);
            // @ts-ignore
            error.transport = transport.name;
            freezeTransport();
            this.emitReserved("upgradeError", error);
        };
        function onTransportClose() {
            onerror("transport closed");
        }
        // When the socket is closed while we're probing
        function onclose() {
            onerror("socket closed");
        }
        // When the socket is upgraded while we're probing
        function onupgrade(to) {
            if (transport && to.name !== transport.name) {
                freezeTransport();
            }
        }
        // Remove all listeners on the transport and on self
        const cleanup = () => {
            transport.removeListener("open", onTransportOpen);
            transport.removeListener("error", onerror);
            transport.removeListener("close", onTransportClose);
            this.off("close", onclose);
            this.off("upgrading", onupgrade);
        };
        transport.once("open", onTransportOpen);
        transport.once("error", onerror);
        transport.once("close", onTransportClose);
        this.once("close", onclose);
        this.once("upgrading", onupgrade);
        if (this.upgrades.indexOf("webtransport") !== -1 &&
            name !== "webtransport") {
            // favor WebTransport
            this.setTimeoutFn(() => {
                if (!failed) {
                    transport.open();
                }
            }, 200);
        }
        else {
            transport.open();
        }
    }
    /**
     * Called when connection is deemed open.
     *
     * Emits "open", flushes anything buffered so far and, if upgrades are
     * enabled, starts one probe per upgrade kept by `filterUpgrades`.
     *
     * @private
     */
    onOpen() {
        this.readyState = "open";
        Socket.priorWebsocketSuccess = "websocket" === this.transport.name;
        this.emitReserved("open");
        this.flush();
        // we check for `readyState` in case an `open`
        // listener already closed the socket
        if ("open" === this.readyState && this.opts.upgrade) {
            let i = 0;
            const l = this.upgrades.length;
            for (; i < l; i++) {
                this.probe(this.upgrades[i]);
            }
        }
    }
    /**
     * Handles a packet.
     *
     * Packets received once the socket is closed (or before it started
     * opening) are ignored.
     *
     * @param {Object} packet - decoded packet with `type` and optional `data`
     * @private
     */
    onPacket(packet) {
        if ("opening" === this.readyState ||
            "open" === this.readyState ||
            "closing" === this.readyState) {
            this.emitReserved("packet", packet);
            // Socket is live - any packet counts
            this.emitReserved("heartbeat");
            this.resetPingTimeout();
            switch (packet.type) {
                case "open":
                    this.onHandshake(JSON.parse(packet.data));
                    break;
                case "ping":
                    this.sendPacket("pong");
                    this.emitReserved("ping");
                    this.emitReserved("pong");
                    break;
                case "error": {
                    // block scope keeps `err` out of the other case clauses
                    const err = new Error("server error");
                    // @ts-ignore
                    err.code = packet.data;
                    this.onError(err);
                    break;
                }
                case "message":
                    this.emitReserved("data", packet.data);
                    this.emitReserved("message", packet.data);
                    break;
            }
        }
    }
    /**
     * Called upon handshake completion.
     *
     * Stores the session id and the heartbeat / payload settings sent by the
     * server, then marks the socket as open.
     *
     * @param {Object} data - handshake obj
     * @private
     */
    onHandshake(data) {
        this.emitReserved("handshake", data);
        this.id = data.sid;
        this.transport.query.sid = data.sid;
        this.upgrades = this.filterUpgrades(data.upgrades);
        this.pingInterval = data.pingInterval;
        this.pingTimeout = data.pingTimeout;
        this.maxPayload = data.maxPayload;
        this.onOpen();
        // In case open handler closes socket
        if ("closed" === this.readyState)
            return;
        this.resetPingTimeout();
    }
    /**
     * Sets and resets ping timeout timer based on server pings.
     *
     * The socket is closed with reason "ping timeout" when no packet arrives
     * within `pingInterval + pingTimeout` milliseconds.
     *
     * @private
     */
    resetPingTimeout() {
        this.clearTimeoutFn(this.pingTimeoutTimer);
        this.pingTimeoutTimer = this.setTimeoutFn(() => {
            this.onClose("ping timeout");
        }, this.pingInterval + this.pingTimeout);
        if (this.opts.autoUnref) {
            this.pingTimeoutTimer.unref();
        }
    }
    /**
     * Called on `drain` event
     *
     * Drops the packets that were just written and either reports "drain" or
     * flushes what was queued in the meantime.
     *
     * @private
     */
    onDrain() {
        this.writeBuffer.splice(0, this.prevBufferLen);
        // setting prevBufferLen = 0 is very important
        // for example, when upgrading, upgrade packet is sent over,
        // and a nonzero prevBufferLen could cause problems on `drain`
        this.prevBufferLen = 0;
        if (0 === this.writeBuffer.length) {
            this.emitReserved("drain");
        }
        else {
            this.flush();
        }
    }
    /**
     * Flush write buffers.
     *
     * Does nothing while the socket is closed, has no transport, the transport
     * is not writable, an upgrade is in progress or the buffer is empty.
     *
     * @private
     */
    flush() {
        if ("closed" !== this.readyState &&
            // no transport exists when every transport failed to be created
            this.transport &&
            this.transport.writable &&
            !this.upgrading &&
            this.writeBuffer.length) {
            const packets = this.getWritablePackets();
            this.transport.send(packets);
            // keep track of current length of writeBuffer
            // splice writeBuffer and callbackBuffer on `drain`
            this.prevBufferLen = packets.length;
            this.emitReserved("flush");
        }
    }
    /**
     * Ensure the encoded size of the writeBuffer is below the maxPayload value sent by the server (only for HTTP
     * long-polling)
     *
     * @return {Array} the packets that may be sent in the next write; always
     *   contains at least the first buffered packet
     * @private
     */
    getWritablePackets() {
        const shouldCheckPayloadSize = this.maxPayload &&
            this.transport.name === "polling" &&
            this.writeBuffer.length > 1;
        if (!shouldCheckPayloadSize) {
            return this.writeBuffer;
        }
        let payloadSize = 1; // first packet type
        for (let i = 0; i < this.writeBuffer.length; i++) {
            const data = this.writeBuffer[i].data;
            if (data) {
                payloadSize += byteLength(data);
            }
            if (i > 0 && payloadSize > this.maxPayload) {
                return this.writeBuffer.slice(0, i);
            }
            payloadSize += 2; // separator + packet type
        }
        return this.writeBuffer;
    }
    /**
     * Sends a message.
     *
     * @param {String} msg - message.
     * @param {Object} options.
     * @param {Function} callback function.
     * @return {Socket} for chaining.
     */
    write(msg, options, fn) {
        this.sendPacket("message", msg, options, fn);
        return this;
    }
    /**
     * Sends a message. Same behavior as `write()`.
     *
     * @param {String} msg - message.
     * @param {Object} options.
     * @param {Function} callback function.
     * @return {Socket} for chaining.
     */
    send(msg, options, fn) {
        this.sendPacket("message", msg, options, fn);
        return this;
    }
    /**
     * Sends a packet.
     *
     * `data` and `options` may be omitted, in which case the callback can be
     * passed in their place. Nothing is queued once the socket is closing or
     * closed.
     *
     * @param {String} type: packet type.
     * @param {String} data.
     * @param {Object} options.
     * @param {Function} fn - callback function.
     * @private
     */
    sendPacket(type, data, options, fn) {
        if ("function" === typeof data) {
            fn = data;
            data = undefined;
        }
        if ("function" === typeof options) {
            fn = options;
            options = null;
        }
        if ("closing" === this.readyState || "closed" === this.readyState) {
            return;
        }
        options = options || {};
        // compression stays enabled unless explicitly set to `false`
        options.compress = false !== options.compress;
        const packet = {
            type: type,
            data: data,
            options: options,
        };
        this.emitReserved("packetCreate", packet);
        this.writeBuffer.push(packet);
        if (fn)
            this.once("flush", fn);
        this.flush();
    }
    /**
     * Closes the connection.
     *
     * Buffered packets are drained first and a running upgrade is awaited
     * before the transport is actually closed.
     *
     * @return {Socket} for chaining.
     */
    close() {
        const close = () => {
            this.onClose("forced close");
            if (this.transport) {
                this.transport.close();
            }
        };
        const cleanupAndClose = () => {
            this.off("upgrade", cleanupAndClose);
            this.off("upgradeError", cleanupAndClose);
            close();
        };
        const waitForUpgrade = () => {
            // wait for upgrade to finish since we can't send packets while pausing a transport
            this.once("upgrade", cleanupAndClose);
            this.once("upgradeError", cleanupAndClose);
        };
        if ("opening" === this.readyState || "open" === this.readyState) {
            this.readyState = "closing";
            // without a transport nothing can ever drain, so do not wait for it
            if (this.transport && this.writeBuffer.length) {
                this.once("drain", () => {
                    if (this.upgrading) {
                        waitForUpgrade();
                    }
                    else {
                        close();
                    }
                });
            }
            else if (this.upgrading) {
                waitForUpgrade();
            }
            else {
                close();
            }
        }
        return this;
    }
    /**
     * Called upon transport error
     *
     * @param {Error} err - error reported by the transport or the server
     * @private
     */
    onError(err) {
        Socket.priorWebsocketSuccess = false;
        this.emitReserved("error", err);
        this.onClose("transport error", err);
    }
    /**
     * Called upon transport close.
     *
     * Tears down timers, transport and global listeners, then emits "close".
     * Calling it on a socket that is already closed is a no-op.
     *
     * @param {String} reason - why the socket is being closed
     * @param {*} description - optional details forwarded to "close" listeners
     * @private
     */
    onClose(reason, description) {
        if ("opening" === this.readyState ||
            "open" === this.readyState ||
            "closing" === this.readyState) {
            // clear timers
            this.clearTimeoutFn(this.pingTimeoutTimer);
            // the transport is missing when none could be created in `open()`
            if (this.transport) {
                // stop event from firing again for transport
                this.transport.removeAllListeners("close");
                // ensure transport won't stay open
                this.transport.close();
                // ignore further transport communication
                this.transport.removeAllListeners();
            }
            if (typeof removeEventListener === "function") {
                removeEventListener("beforeunload", this.beforeunloadEventListener, false);
                removeEventListener("offline", this.offlineEventListener, false);
            }
            // set ready state
            this.readyState = "closed";
            // clear session id
            this.id = null;
            // emit close event
            this.emitReserved("close", reason, description);
            // clean buffers after, so users can still
            // grab the buffers on `close` event
            this.writeBuffer = [];
            this.prevBufferLen = 0;
        }
    }
    /**
     * Filters upgrades, returning only those matching client transports.
     *
     * @param {Array} upgrades - server upgrades
     * @return {Array} upgrades that are also listed in `this.transports`,
     *   in the order sent by the server
     * @private
     */
    filterUpgrades(upgrades) {
        return upgrades.filter((upgrade) => this.transports.indexOf(upgrade) !== -1);
    }
}
// expose the Engine.IO protocol revision on the class
Socket.protocol = protocol;
|