in rd-kt/rd-framework/src/main/kotlin/com/jetbrains/rd/framework/impl/RdMap.kt [160:236]
override fun onWireReceived(proto: IProtocol, buffer: AbstractBuffer, ctx: SerializationCtx, dispatchHelper: IRdWireableDispatchHelper) {
val header = buffer.readInt()
val msgVersioned = (header shr versionedFlagShift) != 0
val op = parseFromOrdinal<Op>(header and ((1 shl versionedFlagShift) - 1))
val version = if (msgVersioned) buffer.readLong() else 0
val key = keySzr.read(ctx, buffer)
if (op == Op.Ack) {
dispatchHelper.dispatch {
val errmsg = Sync.lock(pendingForAck) {
if (!msgVersioned) "Received ${Op.Ack} while msg hasn't versioned flag set"
else if (!master) "Received ${Op.Ack} when not a Master"
else pendingForAck[key]?.let { pendingVersion ->
if (pendingVersion < version) "Pending version `$pendingVersion` < ${Op.Ack} version `$version`"
else {
//side effect
if (pendingVersion == version) pendingForAck.remove(key) //else we don't need to remove, silently drop
"" //return good result
}
} ?: "No pending for ${Op.Ack}"
}
if (errmsg.isEmpty())
logReceived.trace { logmsg(Op.Ack, version, key) }
else
logReceived.error { logmsg(Op.Ack, version, key) + " >> $errmsg" }
}
} else {
val isPut = (op == Op.Add || op == Op.Update)
val value = if (isPut) valSzr.read(ctx, buffer) else null
val lifetime = dispatchHelper.lifetime
val definition = tryPreBindValue(lifetime, key, value, true)
logReceived.trace { "onWireReceived:: ${logmsg(op, version, key, value)}" }
dispatchHelper.dispatch {
if (msgVersioned || !master || !isPendingForAck(key)) {
logReceived.trace { "dispatched:: ${logmsg(op, version, key, value)}" }
if (value != null) {
val definitions = tryGetBindDefinitions(lifetime)
if (definitions != null) {
if (op == Op.Update)
definitions[key]?.terminate()
definitions[key] = definition
}
map[key] = value
} else {
val prevDef = tryGetBindDefinitions(lifetime)?.remove(key)
prevDef?.terminate()
map.remove(key)
}
} else {
logReceived.trace { logmsg(op, version, key, value) + " >> REJECTED" }
}
if (msgVersioned) {
proto.wire.send(rdid) { innerBuffer ->
innerBuffer.writeInt((1 shl versionedFlagShift) or Op.Ack.ordinal)
innerBuffer.writeLong(version)
keySzr.write(ctx, innerBuffer, key)
logSend.trace { logmsg(Op.Ack, version, key) }
}
if (master) logReceived.error { "Both ends are masters: $location" }
}
}
}
}