Co-authored-by: tobi <tobi.smethurst@protonmail.com> Reviewed-on: https://codeberg.org/superseriousbusiness/masto-fe-standalone/pulls/88 Co-authored-by: Zoë Bijl <moiety@noreply.codeberg.org> Co-committed-by: Zoë Bijl <moiety@noreply.codeberg.org>
276 lines
6.9 KiB
JavaScript
276 lines
6.9 KiB
JavaScript
// @ts-check
|
|
|
|
import WebSocketClient from "@gamestdio/websocket";
|
|
|
|
/**
|
|
* @type {WebSocketClient | undefined}
|
|
*/
|
|
let sharedConnection;
|
|
|
|
/**
|
|
* @typedef Subscription
|
|
* @property {string} channelName
|
|
* @property {Object.<string, string>} params
|
|
* @property {function(): void} onConnect
|
|
* @property {function(StreamEvent): void} onReceive
|
|
* @property {function(): void} onDisconnect
|
|
*/
|
|
|
|
/**
|
|
* @typedef StreamEvent
|
|
* @property {string} event
|
|
* @property {object} payload
|
|
*/
|
|
|
|
/**
|
|
* @type {Array.<Subscription>}
|
|
*/
|
|
const subscriptions = [];
|
|
|
|
/**
|
|
* @type {Object.<string, number>}
|
|
*/
|
|
const subscriptionCounters = {};
|
|
|
|
/**
|
|
* @param {Subscription} subscription
|
|
*/
|
|
const addSubscription = subscription => {
|
|
subscriptions.push(subscription);
|
|
};
|
|
|
|
/**
|
|
* @param {Subscription} subscription
|
|
*/
|
|
const removeSubscription = subscription => {
|
|
const index = subscriptions.indexOf(subscription);
|
|
|
|
if (index !== -1) {
|
|
subscriptions.splice(index, 1);
|
|
}
|
|
};
|
|
|
|
/**
|
|
* @param {Subscription} subscription
|
|
*/
|
|
const subscribe = ({ channelName, params, onConnect }) => {
|
|
const key = channelNameWithInlineParams(channelName, params);
|
|
|
|
subscriptionCounters[key] = subscriptionCounters[key] || 0;
|
|
|
|
if (subscriptionCounters[key] === 0) {
|
|
// @ts-expect-error
|
|
sharedConnection.send(JSON.stringify({ type: "subscribe", stream: channelName, ...params }));
|
|
}
|
|
|
|
subscriptionCounters[key] += 1;
|
|
onConnect();
|
|
};
|
|
|
|
/**
|
|
* @param {Subscription} subscription
|
|
*/
|
|
const unsubscribe = ({ channelName, params, onDisconnect }) => {
|
|
const key = channelNameWithInlineParams(channelName, params);
|
|
|
|
subscriptionCounters[key] = subscriptionCounters[key] || 1;
|
|
|
|
// @ts-expect-error
|
|
if (subscriptionCounters[key] === 1 && sharedConnection.readyState === WebSocketClient.OPEN) {
|
|
// @ts-expect-error
|
|
sharedConnection.send(JSON.stringify({ type: "unsubscribe", stream: channelName, ...params }));
|
|
}
|
|
|
|
subscriptionCounters[key] -= 1;
|
|
onDisconnect();
|
|
};
|
|
|
|
const sharedCallbacks = {
|
|
connected() {
|
|
subscriptions.forEach(subscription => subscribe(subscription));
|
|
},
|
|
|
|
// @ts-expect-error
|
|
received(data) {
|
|
const { stream } = data;
|
|
|
|
subscriptions.filter(({ channelName, params }) => {
|
|
const streamChannelName = stream[0];
|
|
|
|
if (stream.length === 1) {
|
|
return channelName === streamChannelName;
|
|
}
|
|
|
|
const streamIdentifier = stream[1];
|
|
|
|
if (["hashtag", "hashtag:local"].includes(channelName)) {
|
|
return channelName === streamChannelName && params.tag === streamIdentifier;
|
|
} else if (channelName === "list") {
|
|
return channelName === streamChannelName && params.list === streamIdentifier;
|
|
}
|
|
|
|
return false;
|
|
}).forEach(subscription => {
|
|
subscription.onReceive(data);
|
|
});
|
|
},
|
|
|
|
disconnected() {
|
|
subscriptions.forEach(subscription => unsubscribe(subscription));
|
|
},
|
|
|
|
reconnected() {
|
|
},
|
|
};
|
|
|
|
/**
|
|
* @param {string} channelName
|
|
* @param {Object.<string, string>} params
|
|
* @returns {string}
|
|
*/
|
|
const channelNameWithInlineParams = (channelName, params) => {
|
|
if (Object.keys(params).length === 0) {
|
|
return channelName;
|
|
}
|
|
|
|
return `${channelName}&${Object.keys(params).map(key => `${key}=${params[key]}`).join("&")}`;
|
|
};
|
|
|
|
/**
|
|
* @param {string} channelName
|
|
* @param {Object.<string, string>} params
|
|
* @param {function(Function, Function): { onConnect: (function(): void), onReceive: (function(StreamEvent): void), onDisconnect: (function(): void) }} callbacks
|
|
* @returns {function(): void}
|
|
*/
|
|
// @ts-expect-error
|
|
export const connectStream = (channelName, params, callbacks) => (dispatch, getState) => {
|
|
const streamingAPIBaseURL = getState().getIn(["meta", "streaming_api_base_url"]);
|
|
const accessToken = getState().getIn(["meta", "access_token"]);
|
|
const { onConnect, onReceive, onDisconnect } = callbacks(dispatch, getState);
|
|
|
|
// If we cannot use a websockets connection, we must fall back
|
|
// to using individual connections for each channel
|
|
if (!streamingAPIBaseURL.startsWith("ws")) {
|
|
const connection = createConnection(streamingAPIBaseURL, accessToken, channelNameWithInlineParams(channelName, params), {
|
|
connected() {
|
|
onConnect();
|
|
},
|
|
|
|
received(data) {
|
|
onReceive(data);
|
|
},
|
|
|
|
disconnected() {
|
|
onDisconnect();
|
|
},
|
|
|
|
reconnected() {
|
|
onConnect();
|
|
},
|
|
});
|
|
|
|
return () => {
|
|
connection.close();
|
|
};
|
|
}
|
|
|
|
const subscription = {
|
|
channelName,
|
|
params,
|
|
onConnect,
|
|
onReceive,
|
|
onDisconnect,
|
|
};
|
|
|
|
addSubscription(subscription);
|
|
|
|
// If a connection is open, we can execute the subscription right now. Otherwise,
|
|
// because we have already registered it, it will be executed on connect
|
|
|
|
if (!sharedConnection) {
|
|
sharedConnection = /** @type {WebSocketClient} */ (createConnection(streamingAPIBaseURL, accessToken, "", sharedCallbacks));
|
|
} else if (sharedConnection.readyState === WebSocketClient.OPEN) {
|
|
subscribe(subscription);
|
|
}
|
|
|
|
return () => {
|
|
removeSubscription(subscription);
|
|
unsubscribe(subscription);
|
|
};
|
|
};
|
|
|
|
const KNOWN_EVENT_TYPES = [
|
|
"update",
|
|
"delete",
|
|
"notification",
|
|
"conversation",
|
|
"filters_changed",
|
|
"encrypted_message",
|
|
"announcement",
|
|
"announcement.delete",
|
|
"announcement.reaction",
|
|
];
|
|
|
|
/**
|
|
* @param {MessageEvent} e
|
|
* @param {function(StreamEvent): void} received
|
|
*/
|
|
const handleEventSourceMessage = (e, received) => {
|
|
received({
|
|
event: e.type,
|
|
payload: e.data,
|
|
});
|
|
};
|
|
|
|
/**
|
|
* @param {string} streamingAPIBaseURL
|
|
* @param {string} accessToken
|
|
* @param {string} channelName
|
|
* @param {{ connected: Function, received: function(StreamEvent): void, disconnected: Function, reconnected: Function }} callbacks
|
|
* @returns {WebSocketClient | EventSource}
|
|
*/
|
|
const createConnection = (streamingAPIBaseURL, accessToken, channelName, { connected, received, disconnected, reconnected }) => {
|
|
const params = channelName.split("&");
|
|
|
|
// @ts-expect-error
|
|
channelName = params.shift();
|
|
|
|
if (streamingAPIBaseURL.startsWith("ws")) {
|
|
// @ts-expect-error
|
|
const ws = new WebSocketClient(`${streamingAPIBaseURL}/api/v1/streaming/?${params.join("&")}`, accessToken);
|
|
|
|
// @ts-expect-error
|
|
ws.onopen = connected;
|
|
ws.onmessage = e => received(JSON.parse(e.data));
|
|
// @ts-expect-error
|
|
ws.onclose = disconnected;
|
|
// @ts-expect-error
|
|
ws.onreconnect = reconnected;
|
|
|
|
return ws;
|
|
}
|
|
|
|
channelName = channelName.replace(/:/g, "/");
|
|
|
|
if (channelName.endsWith(":media")) {
|
|
channelName = channelName.replace("/media", "");
|
|
params.push("only_media=true");
|
|
}
|
|
|
|
params.push(`access_token=${accessToken}`);
|
|
|
|
const es = new EventSource(`${streamingAPIBaseURL}/api/v1/streaming/${channelName}?${params.join("&")}`);
|
|
|
|
es.onopen = () => {
|
|
connected();
|
|
};
|
|
|
|
KNOWN_EVENT_TYPES.forEach(type => {
|
|
es.addEventListener(type, e => handleEventSourceMessage(/** @type {MessageEvent} */(e), received));
|
|
});
|
|
|
|
es.onerror = /** @type {function(): void} */ (disconnected);
|
|
|
|
return es;
|
|
};
|