export async function createGlobalSignal()

in src/global_signal.js [29:138]


export async function createGlobalSignal(signal, startValue, signalName, guaranteeStateKey, updateState) {
  const channel = Multipeer.getBinaryMessageChannel(signalName)
  const myUuid = getUuid((await Participants.self).id)
  let state = SparkAutomergeWrapper.init()

  const allPeers = await Participants.getAllOtherParticipants()
  if (allPeers.length === 0) {
    signal.setReceivedAllValues(true)
  }
  const syncStates = {}

  for (const peer of allPeers) {
    const peerUuid = getUuid(peer.id)
    SparkAutomergeWrapper.initPeerSyncState(syncStates, peerUuid)
    peer.isActiveInSameEffect.monitor().subscribe(function (event) {
      const isLatestActive = event.newVal
      if (!isLatestActive) {
        SparkAutomergeWrapper.initPeerSyncState(syncStates, peerUuid)
      }
    })
  }

  let delayTimer;
  let lastDelayTime = null;

  function hasNoChangesForNewPeers() {
    const existingPeerUuids = new Set(allPeers.map(p => getUuid(p.id)))
    for (const [peerUuid, peerSyncState] of Object.entries(syncStates)) {
      if (!existingPeerUuids.has(peerUuid)
        && peerSyncState.allChangesSent !== ALL_CHANGES_SENT) { // new Peer awaiting for changes
        return false
      }
    }
    return true
  }

  channel.onMessage.subscribe((msg) => {
    const [nextBackend, patch, ignoreMessage] = SparkAutomergeWrapper.processMessage(state, syncStates, myUuid, msg)
    if (ignoreMessage) {
      return
    }
    state = nextBackend
    const newValue = SparkAutomergeWrapper.get(state, signalName)

    if (patch) {
      signal.setReceivedAllValues(false)
      signal.compareAndUpdateLocal(newValue)
    }

    // Delay the message passing when the value is updated from the network, to
    // prevent each node from sending too many messages at any one time.
    // Otherwise, all ack and sync messages are duplicated and storm the network.
    if (delayTimer) {
      delayTimer.unsubscribe();
    }

    const currentTime = Date.now();
    if (lastDelayTime === null) {
      lastDelayTime = currentTime;
    }

    if (currentTime - lastDelayTime > LONG_DELAY) {
      // Send update messages immediately if it has been delayed for more than 2 seconds
      sendUpdateMessages(state, syncStates, myUuid, channel);
      lastDelayTime = null;
    } else {
      // Schedule the update messages to be sent 100ms later.
      // This is to reduce the number of update messages responding to other peers' sync messages
      delayTimer = Time.setTimeout(() => {
        sendUpdateMessages(state, syncStates, myUuid, channel);
        lastDelayTime = null;
      }, 100);
    }

    const receivedAllValues = Object.values(syncStates).some(peerState =>
      (peerState.allChangesSent === ALL_CHANGES_SENT && peerState.sharedHeads.length > 0)
    )

    let noChangesForNewPeers = false;
    if (newValue == null) {
      noChangesForNewPeers = hasNoChangesForNewPeers()
    }
    Time.setTimeout(() => {
      signal.setReceivedAllValues(receivedAllValues || noChangesForNewPeers)
    }, 50)
  })

  signal.updateState = function (event) {
    state = guaranteeStateKey(state, signalName, startValue)
    state = updateState(state, signalName, event)
    sendUpdateMessages(state, syncStates, myUuid, channel)
  };

  if (allPeers.length > 0) {
    Time.setTimeout(() => {
      const currentValue = SparkAutomergeWrapper.get(state, signalName)
      if (lastDelayTime === null && currentValue == null) {
        const noPreExistingValuesReceived = Object.values(syncStates).every(peerState =>
          peerState.allChangesSent === null && peerState.sharedHeads.length === 0
        )
        const noChangesForNewPeers = hasNoChangesForNewPeers()
        signal.setReceivedAllValues(noPreExistingValuesReceived || noChangesForNewPeers)
      }
    }, LONG_DELAY)
  }

  sendUpdateMessages(state, syncStates, myUuid, channel)

  signal.getName = () => signalName
}