in src/global_signal.js [29:138]
/**
 * Connects `signal` to a state object that is synchronised with the other
 * participants over the binary message channel named `signalName`.
 *
 * The function:
 *  - creates a local state via `SparkAutomergeWrapper.init()` and one sync
 *    state per participant returned by `Participants.getAllOtherParticipants()`;
 *  - subscribes to the channel, feeds every incoming message through
 *    `SparkAutomergeWrapper.processMessage`, pushes patched values into
 *    `signal.compareAndUpdateLocal` and maintains `signal.setReceivedAllValues`;
 *  - installs `signal.updateState(event)` and `signal.getName()` on `signal`;
 *  - sends an initial round of update messages before resolving.
 *
 * @param {object} signal - Object exposing `setReceivedAllValues(boolean)` and
 *   `compareAndUpdateLocal(value)`; it is mutated to receive `updateState`
 *   and `getName`.
 * @param {*} startValue - Forwarded to `guaranteeStateKey` on every local update.
 * @param {string} signalName - Used both as the channel name and as the key
 *   read from the state with `SparkAutomergeWrapper.get`.
 * @param {function} guaranteeStateKey - Called as
 *   `guaranteeStateKey(state, signalName, startValue)`; its return value
 *   replaces the local state.
 * @param {function} updateState - Called as
 *   `updateState(state, signalName, event)`; its return value replaces the
 *   local state.
 * @returns {Promise<void>} Resolves once all subscriptions are registered and
 *   the initial update messages have been handed to `sendUpdateMessages`.
 */
export async function createGlobalSignal(signal, startValue, signalName, guaranteeStateKey, updateState) {
  // Quiet period after an incoming message before update messages are sent.
  const SEND_DEBOUNCE_MS = 100;
  // Delay before the recomputed "received all values" flag is published.
  const RECEIVED_FLAG_DELAY_MS = 50;

  const channel = Multipeer.getBinaryMessageChannel(signalName);

  // The two participant lookups do not depend on each other, so they are
  // requested concurrently instead of one after the other.
  const [selfParticipant, allPeers] = await Promise.all([
    Participants.self,
    Participants.getAllOtherParticipants(),
  ]);
  const myUuid = getUuid(selfParticipant.id);
  let state = SparkAutomergeWrapper.init();

  if (allPeers.length === 0) {
    // Nobody else is present, so there is nothing to wait for.
    signal.setReceivedAllValues(true);
  }

  const syncStates = {};
  // UUIDs of the peers that were present at start-up. `allPeers` never
  // changes, so the set is built once here rather than on every lookup.
  const existingPeerUuids = new Set();
  for (const peer of allPeers) {
    const peerUuid = getUuid(peer.id);
    existingPeerUuids.add(peerUuid);
    SparkAutomergeWrapper.initPeerSyncState(syncStates, peerUuid);
    peer.isActiveInSameEffect.monitor().subscribe((event) => {
      // Re-initialise the peer's sync state whenever it stops being active.
      if (!event.newVal) {
        SparkAutomergeWrapper.initPeerSyncState(syncStates, peerUuid);
      }
    });
  }

  let delayTimer;
  // Timestamp of the first message of the current debounce window, or null
  // when no delayed send is pending.
  let lastDelayTime = null;

  /**
   * True when every sync state that does not belong to a start-up peer
   * (i.e. a peer that appeared later) is marked ALL_CHANGES_SENT.
   */
  function hasNoChangesForNewPeers() {
    return Object.entries(syncStates).every(([peerUuid, peerSyncState]) =>
      existingPeerUuids.has(peerUuid)
      || peerSyncState.allChangesSent === ALL_CHANGES_SENT
    );
  }

  /** Sends update messages using the state as it is at call time. */
  function sendUpdates() {
    sendUpdateMessages(state, syncStates, myUuid, channel);
  }

  /** Sends update messages and closes the current debounce window. */
  function flushUpdates() {
    sendUpdates();
    lastDelayTime = null;
  }

  /**
   * Debounces outgoing update messages that are triggered by incoming ones,
   * so that ack/sync messages are not duplicated and do not storm the network.
   */
  function scheduleUpdates() {
    if (delayTimer) {
      delayTimer.unsubscribe();
    }
    const currentTime = Date.now();
    if (lastDelayTime === null) {
      lastDelayTime = currentTime;
    }
    if (currentTime - lastDelayTime > LONG_DELAY) {
      // The send has already been postponed for longer than LONG_DELAY:
      // stop waiting and send right away.
      flushUpdates();
    } else {
      // Otherwise (re)start the short timer; further incoming messages
      // within the window push the send back again.
      delayTimer = Time.setTimeout(() => {
        flushUpdates();
      }, SEND_DEBOUNCE_MS);
    }
  }

  channel.onMessage.subscribe((msg) => {
    const [nextBackend, patch, ignoreMessage] =
      SparkAutomergeWrapper.processMessage(state, syncStates, myUuid, msg);
    if (ignoreMessage) {
      return;
    }
    state = nextBackend;
    const newValue = SparkAutomergeWrapper.get(state, signalName);
    if (patch) {
      signal.setReceivedAllValues(false);
      signal.compareAndUpdateLocal(newValue);
    }

    scheduleUpdates();

    // Evaluated after scheduling, because an immediate send may touch syncStates.
    const receivedAllValues = Object.values(syncStates).some((peerState) =>
      peerState.allChangesSent === ALL_CHANGES_SENT && peerState.sharedHeads.length > 0
    );
    // Only relevant while no value has been stored under signalName yet.
    const noChangesForNewPeers = newValue == null && hasNoChangesForNewPeers();
    Time.setTimeout(() => {
      signal.setReceivedAllValues(receivedAllValues || noChangesForNewPeers);
    }, RECEIVED_FLAG_DELAY_MS);
  });

  signal.updateState = function (event) {
    state = guaranteeStateKey(state, signalName, startValue);
    state = updateState(state, signalName, event);
    // Local changes are sent immediately, without debouncing.
    sendUpdates();
  };

  if (allPeers.length > 0) {
    // Fallback check after LONG_DELAY: if no delayed send is pending and there
    // is still no value, decide from the sync states whether anything can
    // still be expected.
    Time.setTimeout(() => {
      const currentValue = SparkAutomergeWrapper.get(state, signalName);
      if (lastDelayTime === null && currentValue == null) {
        const noPreExistingValuesReceived = Object.values(syncStates).every((peerState) =>
          peerState.allChangesSent === null && peerState.sharedHeads.length === 0
        );
        const noChangesForNewPeers = hasNoChangesForNewPeers();
        signal.setReceivedAllValues(noPreExistingValuesReceived || noChangesForNewPeers);
      }
    }, LONG_DELAY);
  }

  sendUpdates();
  signal.getName = () => signalName;
}