override fun onWireReceived()

in rd-kt/rd-framework/src/main/kotlin/com/jetbrains/rd/framework/impl/RdMap.kt [160:236]


    override fun onWireReceived(proto: IProtocol, buffer: AbstractBuffer, ctx: SerializationCtx, dispatchHelper: IRdWireableDispatchHelper) {
        val header = buffer.readInt()
        val msgVersioned = (header shr versionedFlagShift) != 0
        val op = parseFromOrdinal<Op>(header and ((1 shl versionedFlagShift) - 1))

        val version = if (msgVersioned) buffer.readLong() else 0

        val key = keySzr.read(ctx, buffer)

        if (op == Op.Ack) {
            dispatchHelper.dispatch {
                val errmsg = Sync.lock(pendingForAck) {
                    if (!msgVersioned) "Received ${Op.Ack} while msg hasn't versioned flag set"
                    else if (!master) "Received ${Op.Ack} when not a Master"
                    else pendingForAck[key]?.let { pendingVersion ->
                        if (pendingVersion < version) "Pending version `$pendingVersion` < ${Op.Ack} version `$version`"
                        else {
                            //side effect
                            if (pendingVersion == version) pendingForAck.remove(key) //else we don't need to remove, silently drop
                            "" //return good result
                        }
                    } ?: "No pending for ${Op.Ack}"
                }

                if (errmsg.isEmpty())
                    logReceived.trace { logmsg(Op.Ack, version, key) }
                else
                    logReceived.error { logmsg(Op.Ack, version, key) + " >> $errmsg" }
            }

        } else {
            val isPut = (op == Op.Add || op == Op.Update)
            val value = if (isPut) valSzr.read(ctx, buffer) else null

            val lifetime = dispatchHelper.lifetime
            val definition = tryPreBindValue(lifetime, key, value, true)

            logReceived.trace { "onWireReceived:: ${logmsg(op, version, key, value)}" }

            dispatchHelper.dispatch {
                if (msgVersioned || !master || !isPendingForAck(key)) {
                    logReceived.trace { "dispatched:: ${logmsg(op, version, key, value)}" }

                    if (value != null) {
                        val definitions = tryGetBindDefinitions(lifetime)
                        if (definitions != null) {
                            if (op == Op.Update)
                                definitions[key]?.terminate()

                            definitions[key] = definition
                        }

                        map[key] = value
                    } else {
                        val prevDef = tryGetBindDefinitions(lifetime)?.remove(key)
                        prevDef?.terminate()
                        map.remove(key)
                    }

                } else {
                    logReceived.trace { logmsg(op, version, key, value) + " >> REJECTED" }
                }

                if (msgVersioned) {
                    proto.wire.send(rdid) { innerBuffer ->
                        innerBuffer.writeInt((1 shl versionedFlagShift) or Op.Ack.ordinal)
                        innerBuffer.writeLong(version)
                        keySzr.write(ctx, innerBuffer, key)

                        logSend.trace { logmsg(Op.Ack, version, key) }
                    }

                    if (master) logReceived.error { "Both ends are masters: $location" }
                }
            }
        }
    }