in chat/protocols/matrix/lib/matrix-sdk/sync.js [893:1305]
/**
 * Apply a single sync response to the client.
 *
 * Sections are handled strictly in this order: presence, global account
 * data, to-device messages, rooms (invite, join, leave, knock), the
 * notification timeline, device-list changes, and finally one-time /
 * fallback key counts. Rooms within each category are processed one after
 * another via `promiseMapSeries`.
 *
 * @param {object} syncEventData - Sync bookkeeping; only `oldSyncToken` and
 *   `fromCache` are read in this method.
 * @param {object} data - The sync response body (shape sketched below).
 * @returns {Promise<void>} Resolves once every section has been processed.
 *   A failure while injecting events into a *joined* room is logged and
 *   swallowed; a failure from any other awaited call rejects the promise.
 */
async processSyncResponse(syncEventData, data) {
const client = this.client;
// data looks like:
// {
// next_batch: $token,
// presence: { events: [] },
// account_data: { events: [] },
// device_lists: { changed: ["@user:server", ... ]},
// to_device: { events: [] },
// device_one_time_keys_count: { signed_curve25519: 42 },
// rooms: {
// invite: {
// $roomid: {
// invite_state: { events: [] }
// }
// },
// join: {
// $roomid: {
// state: { events: [] },
// timeline: { events: [], prev_batch: $token, limited: true },
// ephemeral: { events: [] },
// summary: {
// m.heroes: [ $user_id ],
// m.joined_member_count: $count,
// m.invited_member_count: $count
// },
// account_data: { events: [] },
// unread_notifications: {
// highlight_count: 0,
// notification_count: 0,
// }
// }
// },
// leave: {
// $roomid: {
// state: { events: [] },
// timeline: { events: [], prev_batch: $token }
// }
// }
// }
// }
// In addition to the fields sketched above, the code below also reads
// rooms.knock[$roomid].knock_state, the per-room unread thread notification
// object (looked up under _sync.UNREAD_THREAD_NOTIFICATIONS name/altName), and
// device_unused_fallback_key_types (or its "org.matrix.msc2732."-prefixed form).
// TODO-arch:
// - Each event we pass through needs to be emitted via 'event', can we
// do this in one place?
// - The isBrandNewRoom boilerplate is boilerplatey.
// handle presence events (User objects)
if (Array.isArray(data.presence?.events)) {
data.presence.events.filter(_utils.noUnsafeEventProps).map(client.getEventMapper()).forEach(function (presenceEvent) {
// Reuse the stored User for the sender if there is one; otherwise create
// it and add it to the store. Either way the presence event is applied
// to the user before the generic Event emit.
let user = client.store.getUser(presenceEvent.getSender());
if (user) {
user.setPresenceEvent(presenceEvent);
} else {
user = _user.User.createUser(presenceEvent.getSender(), client);
user.setPresenceEvent(presenceEvent);
client.store.storeUser(user);
}
client.emit(_client.ClientEvent.Event, presenceEvent);
});
}
// handle non-room account_data
if (Array.isArray(data.account_data?.events)) {
const events = data.account_data.events.filter(_utils.noUnsafeEventProps).map(client.getEventMapper());
// Snapshot whatever the store currently holds for each incoming type
// *before* overwriting it, so that the AccountData emit below can hand
// listeners the previous event alongside the new one.
const prevEventsMap = events.reduce((m, c) => {
m[c.getType()] = client.store.getAccountData(c.getType());
return m;
}, {});
client.store.storeAccountDataEvents(events);
events.forEach(function (accountDataEvent) {
// Honour push rules that come down the sync stream but also
// honour push rules that were previously cached. Base rules
// will be updated when we receive push rules via getPushRules
// (see sync) before syncing over the network.
if (accountDataEvent.getType() === _event.EventType.PushRules) {
const rules = accountDataEvent.getContent();
client.setPushRules(rules);
}
const prevEvent = prevEventsMap[accountDataEvent.getType()];
client.emit(_client.ClientEvent.AccountData, accountDataEvent, prevEvent);
// (forEach ignores this return value.)
return accountDataEvent;
});
}
// handle to-device events
if (data.to_device && Array.isArray(data.to_device.events) && data.to_device.events.length > 0) {
let toDeviceMessages = data.to_device.events.filter(_utils.noUnsafeEventProps);
// When crypto callbacks are configured they get to preprocess the raw
// messages first; whatever they resolve with replaces the list that is
// mapped and emitted below.
if (this.syncOpts.cryptoCallbacks) {
toDeviceMessages = await this.syncOpts.cryptoCallbacks.preprocessToDeviceMessages(toDeviceMessages);
}
// Filled in by the first pass below, consulted by the second pass.
const cancelledKeyVerificationTxns = [];
toDeviceMessages.map(client.getEventMapper({
toDevice: true
})).map(toDeviceEvent => {
// map is a cheap inline forEach
// We want to flag m.key.verification.start events as cancelled
// if there's an accompanying m.key.verification.cancel event, so
// we pull out the transaction IDs from the cancellation events
// so we can flag the verification events as cancelled in the loop
// below.
if (toDeviceEvent.getType() === "m.key.verification.cancel") {
const txnId = toDeviceEvent.getContent()["transaction_id"];
if (txnId) {
cancelledKeyVerificationTxns.push(txnId);
}
}
// as mentioned above, .map is a cheap inline forEach, so return
// the unmodified event.
return toDeviceEvent;
}).forEach(function (toDeviceEvent) {
// Second pass: this only starts after the .map() above has visited every
// event, so a cancellation is honoured even when it arrives later in the
// batch than the start/request it cancels.
const content = toDeviceEvent.getContent();
if (toDeviceEvent.getType() == "m.room.message" && content.msgtype == "m.bad.encrypted") {
// the mapper already logged a warning.
_logger.logger.log("Ignoring undecryptable to-device event from " + toDeviceEvent.getSender());
return;
}
if (toDeviceEvent.getType() === "m.key.verification.start" || toDeviceEvent.getType() === "m.key.verification.request") {
const txnId = content["transaction_id"];
if (cancelledKeyVerificationTxns.includes(txnId)) {
toDeviceEvent.flagCancelled();
}
}
client.emit(_client.ClientEvent.ToDeviceEvent, toDeviceEvent);
});
} else {
// no more to-device events: we can stop polling with a short timeout.
this.catchingUp = false;
}
// the returned json structure is a bit crap, so make it into a
// nicer form (array) after applying sanity to make sure we don't fail
// on missing keys (on the off chance)
// Each resulting entry is used below as the per-room response object with
// two extra properties, `room` and `isBrandNewRoom` (presumably attached by
// mapSyncResponseToRoomArray, which is defined elsewhere - TODO confirm).
let inviteRooms = [];
let joinRooms = [];
let leaveRooms = [];
let knockRooms = [];
if (data.rooms) {
if (data.rooms.invite) {
inviteRooms = this.mapSyncResponseToRoomArray(data.rooms.invite);
}
if (data.rooms.join) {
joinRooms = this.mapSyncResponseToRoomArray(data.rooms.join);
}
if (data.rooms.leave) {
leaveRooms = this.mapSyncResponseToRoomArray(data.rooms.leave);
}
if (data.rooms.knock) {
knockRooms = this.mapSyncResponseToRoomArray(data.rooms.knock);
}
}
// Start this sync with an empty list; it is read again near the end of
// this method when the notification timeline is updated (presumably
// processEventsForNotifs appends to it - TODO confirm).
this.notifEvents = [];
// Handle invites
await (0, _utils.promiseMapSeries)(inviteRooms, async inviteObj => {
const room = inviteObj.room;
const stateEvents = this.mapSyncEventsFormat(inviteObj.invite_state, room);
await this.injectRoomEvents(room, stateEvents);
// The inviter is taken to be the sender of the RoomMember state event
// whose state key is our own user ID (undefined if there is no such event).
const inviter = room.currentState.getStateEvents(_event.EventType.RoomMember, client.getUserId())?.getSender();
const crypto = client.crypto;
if (crypto) {
// Take the parked shared-history sessions for this room out of the crypto
// store and import only those whose senderId is the inviter; they are
// added with the sharedHistory and untrusted flags set.
const parkedHistory = await crypto.cryptoStore.takeParkedSharedHistory(room.roomId);
for (const parked of parkedHistory) {
if (parked.senderId === inviter) {
await crypto.olmDevice.addInboundGroupSession(room.roomId, parked.senderKey, parked.forwardingCurve25519KeyChain, parked.sessionId, parked.sessionKey, parked.keysClaimed, true, {
sharedHistory: true,
untrusted: true
});
}
}
}
if (inviteObj.isBrandNewRoom) {
room.recalculate();
client.store.storeRoom(room);
client.emit(_client.ClientEvent.Room, room);
} else {
// Update room state for invite->reject->invite cycles
room.recalculate();
}
// Generic Event emits come last, after the room has been recalculated
// (and stored/announced if it is new).
stateEvents.forEach(function (e) {
client.emit(_client.ClientEvent.Event, e);
});
});
// Handle joins
await (0, _utils.promiseMapSeries)(joinRooms, async joinObj => {
const room = joinObj.room;
const stateEvents = this.mapSyncEventsFormat(joinObj.state, room);
// Prevent events from being decrypted ahead of time
// this helps large account to speed up faster
// room::decryptCriticalEvent is in charge of decrypting all the events
// required for a client to function properly
const events = this.mapSyncEventsFormat(joinObj.timeline, room, false);
// Ephemeral and room account-data events are mapped without a room argument.
const ephemeralEvents = this.mapSyncEventsFormat(joinObj.ephemeral);
const accountDataEvents = this.mapSyncEventsFormat(joinObj.account_data);
const encrypted = this.isRoomEncrypted(room, stateEvents, events);
// We store the server-provided value first so it's correct when any of the events fire.
if (joinObj.unread_notifications) {
/**
* We track unread notifications ourselves in encrypted rooms, so don't
* bother setting it here. We trust our calculations better than the
* server's for this case, and therefore will assume that our non-zero
* count is accurate.
* XXX: this is known faulty as the push rule for `.m.room.encrypted` may be disabled so server
* may issue notification counts of 0 which we wrongly trust.
* https://github.com/matrix-org/matrix-spec-proposals/pull/2654 would fix this
*
* @see import("./client").fixNotificationCountOnDecryption
*/
if (!encrypted || joinObj.unread_notifications.notification_count === 0) {
// In an encrypted room, if the room has notifications enabled then it's typical for
// the server to flag all new messages as notifying. However, some push rules calculate
// events as ignored based on their event contents (e.g. ignoring msgtype=m.notice messages)
// so we want to calculate this figure on the client in all cases.
room.setUnreadNotificationCount(_room.NotificationCountType.Total, joinObj.unread_notifications.notification_count ?? 0);
}
if (!encrypted || room.getUnreadNotificationCount(_room.NotificationCountType.Highlight) <= 0) {
// If the locally stored highlight count is zero, use the server provided value.
room.setUnreadNotificationCount(_room.NotificationCountType.Highlight, joinObj.unread_notifications.highlight_count ?? 0);
}
}
// The stable name is tried first, then the alternative name.
const unreadThreadNotifications = joinObj[_sync.UNREAD_THREAD_NOTIFICATIONS.name] ?? joinObj[_sync.UNREAD_THREAD_NOTIFICATIONS.altName];
if (unreadThreadNotifications) {
// This mirrors the logic above for rooms: take the *total* notification count from
// the server for unencrypted rooms or if it's zero. Any threads not present in this
// object implicitly have zero notifications, so start by clearing the total counts
// for all such threads.
room.resetThreadUnreadNotificationCountFromSync(Object.keys(unreadThreadNotifications));
for (const [threadId, unreadNotification] of Object.entries(unreadThreadNotifications)) {
if (!encrypted || unreadNotification.notification_count === 0) {
room.setThreadUnreadNotificationCount(threadId, _room.NotificationCountType.Total, unreadNotification.notification_count ?? 0);
}
const hasNoNotifications = room.getThreadUnreadNotificationCount(threadId, _room.NotificationCountType.Highlight) <= 0;
// (The inner `encrypted &&` is redundant: it is always true when the
// right-hand side of `||` is evaluated.)
if (!encrypted || encrypted && hasNoNotifications) {
room.setThreadUnreadNotificationCount(threadId, _room.NotificationCountType.Highlight, unreadNotification.highlight_count ?? 0);
}
}
} else {
// No thread notification data at all for this room: reset without a
// list of thread IDs.
room.resetThreadUnreadNotificationCountFromSync();
}
// Default a missing timeline section so the prev_batch / limited reads
// below are safe. Note the timeline events were already mapped above.
joinObj.timeline = joinObj.timeline || {};
if (joinObj.isBrandNewRoom) {
// set the back-pagination token. Do this *before* adding any
// events so that clients can start back-paginating.
// NOTE(review): the strict `!== null` check lets `undefined` through, so
// when prev_batch is absent the token is set to undefined - confirm
// that this is intended.
if (joinObj.timeline.prev_batch !== null) {
room.getLiveTimeline().setPaginationToken(joinObj.timeline.prev_batch, _eventTimeline.EventTimeline.BACKWARDS);
}
} else if (joinObj.timeline.limited) {
let limited = true;
// we've got a limited sync, so we *probably* have a gap in the
// timeline, so should reset. But we might have been peeking or
// paginating and already have some of the events, in which
// case we just want to append any subsequent events to the end
// of the existing timeline.
//
// This is particularly important in the case that we already have
// *all* of the events in the timeline - in that case, if we reset
// the timeline, we'll end up with an entirely empty timeline,
// which we'll try to paginate but not get any new events (which
// will stop us linking the empty timeline into the chain).
//
// Scan from the newest event backwards and stop at the first one the
// room already has in a timeline.
for (let i = events.length - 1; i >= 0; i--) {
const eventId = events[i].getId();
if (room.getTimelineForEvent(eventId)) {
debuglog(`Already have event ${eventId} in limited sync - not resetting`);
limited = false;
// we might still be missing some of the events before i;
// we don't want to be adding them to the end of the
// timeline because that would put them out of order.
// (This mutates `events` in place: the first i entries are dropped and
// the already-known event at index i becomes the first element.)
events.splice(0, i);
// XXX: there's a problem here if the skipped part of the
// timeline modifies the state set in stateEvents, because
// we'll end up using the state from stateEvents rather
// than the later state from timelineEvents. We probably
// need to wind stateEvents forward over the events we're
// skipping.
break;
}
}
if (limited) {
// Second argument: null when syncOpts allows resetting the entire
// timeline for this room, otherwise the previous sync token (or null
// if there is none).
room.resetLiveTimeline(joinObj.timeline.prev_batch, this.syncOpts.canResetEntireTimeline(room.roomId) ? null : syncEventData.oldSyncToken ?? null);
// We have to assume any gap in any timeline is
// reason to stop incrementally tracking notifications and
// reset the timeline.
client.resetNotifTimelineSet();
}
}
// process any crypto events *before* emitting the RoomStateEvent events. This
// avoids a race condition if the application tries to send a message after the
// state event is processed, but before crypto is enabled, which then causes the
// crypto layer to complain.
if (this.syncOpts.cryptoCallbacks) {
// Both the state section and the (possibly trimmed) timeline are checked
// for RoomEncryption state events with an empty state key.
for (const e of stateEvents.concat(events)) {
if (e.isState() && e.getType() === _event.EventType.RoomEncryption && e.getStateKey() === "") {
await this.syncOpts.cryptoCallbacks.onCryptoEvent(room, e);
}
}
}
// A failure here is logged and swallowed, so the remaining steps for this
// room (and the remaining rooms) still run.
try {
await this.injectRoomEvents(room, stateEvents, events, syncEventData.fromCache);
} catch (e) {
_logger.logger.error(`Failed to process events on room ${room.roomId}:`, e);
}
// set summary after processing events,
// because it will trigger a name calculation
// which needs the room state to be up to date
if (joinObj.summary) {
room.setSummary(joinObj.summary);
}
// we deliberately don't add ephemeral events to the timeline
room.addEphemeralEvents(ephemeralEvents);
// we deliberately don't add accountData to the timeline
room.addAccountData(accountDataEvents);
room.recalculate();
if (joinObj.isBrandNewRoom) {
client.store.storeRoom(room);
client.emit(_client.ClientEvent.Room, room);
}
this.processEventsForNotifs(room, events);
// Generic Event emits, in a fixed order: state, timeline, ephemeral,
// then room account data.
const emitEvent = e => client.emit(_client.ClientEvent.Event, e);
stateEvents.forEach(emitEvent);
events.forEach(emitEvent);
ephemeralEvents.forEach(emitEvent);
accountDataEvents.forEach(emitEvent);
// Decrypt only the last message in all rooms to make sure we can generate a preview
// And decrypt all events after the recorded read receipt to ensure an accurate
// notification count
// (The call's result is not awaited here.)
room.decryptCriticalEvents();
});
// Handle leaves (e.g. kicked rooms)
await (0, _utils.promiseMapSeries)(leaveRooms, async leaveObj => {
const room = leaveObj.room;
const stateEvents = this.mapSyncEventsFormat(leaveObj.state, room);
// Unlike the join path, the timeline is mapped without the third `false`
// argument, and injectRoomEvents is neither given fromCache nor wrapped
// in try/catch, so a failure here rejects the whole sync processing.
const events = this.mapSyncEventsFormat(leaveObj.timeline, room);
const accountDataEvents = this.mapSyncEventsFormat(leaveObj.account_data);
await this.injectRoomEvents(room, stateEvents, events);
room.addAccountData(accountDataEvents);
room.recalculate();
if (leaveObj.isBrandNewRoom) {
client.store.storeRoom(room);
client.emit(_client.ClientEvent.Room, room);
}
this.processEventsForNotifs(room, events);
stateEvents.forEach(function (e) {
client.emit(_client.ClientEvent.Event, e);
});
events.forEach(function (e) {
client.emit(_client.ClientEvent.Event, e);
});
accountDataEvents.forEach(function (e) {
client.emit(_client.ClientEvent.Event, e);
});
});
// Handle knocks
// Same sequence as the invite path, minus the parked shared-history import.
await (0, _utils.promiseMapSeries)(knockRooms, async knockObj => {
const room = knockObj.room;
const stateEvents = this.mapSyncEventsFormat(knockObj.knock_state, room);
await this.injectRoomEvents(room, stateEvents);
if (knockObj.isBrandNewRoom) {
room.recalculate();
client.store.storeRoom(room);
client.emit(_client.ClientEvent.Room, room);
} else {
// Update room state for knock->leave->knock cycles
room.recalculate();
}
stateEvents.forEach(function (e) {
client.emit(_client.ClientEvent.Event, e);
});
});
// update the notification timeline, if appropriate.
// we only do this for live events, as otherwise we can't order them sanely
// in the timeline relative to ones paginated in by /notifications.
// XXX: we could fix this by making EventTimeline support chronological
// ordering... but it doesn't, right now.
// ("Live" is decided by the presence of a previous sync token.)
if (syncEventData.oldSyncToken && this.notifEvents.length) {
// Sorted in place, oldest timestamp first, so events are added in order.
this.notifEvents.sort(function (a, b) {
return a.getTs() - b.getTs();
});
this.notifEvents.forEach(function (event) {
// Silently skipped when the client has no notification timeline set.
client.getNotifTimelineSet()?.addLiveEvent(event);
});
}
// Handle device list updates
if (data.device_lists) {
if (this.syncOpts.cryptoCallbacks) {
await this.syncOpts.cryptoCallbacks.processDeviceLists(data.device_lists);
} else {
// FIXME if we *don't* have a crypto module, we still need to
// invalidate the device lists. But that would require a
// substantial bit of rework :/.
}
}
// Handle one_time_keys_count and unused fallback keys
// Skipped entirely when there are no crypto callbacks. The stable
// fallback-key field is preferred; the MSC2732-prefixed one is only used
// when the stable field is null/undefined.
await this.syncOpts.cryptoCallbacks?.processKeyCounts(data.device_one_time_keys_count, data.device_unused_fallback_key_types ?? data["org.matrix.msc2732.device_unused_fallback_key_types"]);
}