Summary: 590 instances, 551 unique

Count  Text
1  // TODO: potentially logic to check clock if we exceeded total backoff timeout here (and then return nil)
1  internal let ownerID: ActorIdentity // TODO: can be just identity
1  // TODO: we are sending the ping here to initiate cluster membership. Once available this should do a state sync instead
1  "case \(decl.nameFirstLowercased)(/*TODO: MODULE.*/GeneratedActor.Messages.\(decl.name))"
1  // TODO: remove this and replace by the infrastructure which is now Swift's `actor`
1  // FIXME: this is obviously not a good idea
1  // self._serialization = nil // FIXME: need to release serialization
1  // FIXME: terrible hack, instead just store the id then?
1  /// Not intended for general use. TODO: Make internal if possible.
1  // TODO: TopLevelDataEncoder
1  // TODO: a bit lazy implementation
1  // TODO: we need to be able to abstract over Coders to collapse this into "giveMeACoder().decode()"
1  // TODO: we should make it dedicated to dispatch() rather than raw executing perhaps? This way it can take care of fairness things
1  associatedtype Message: Codable // TODO: & Sendable
1  /// without a leader to perform the unreachable -> down move. // TODO keep thinking if we can do better here, we could to a quorum downing IMHO, and remove this impl completely as it's very "bad".
1  case nodeTerminated(UniqueNode) // TODO: more additional info?
1  // FIXME: is there some way to express that actually, Metadata is INSIDE Payload so I only want to pass the "envelope" myself...?
1  return true // TODO: would be better to avoid them ending up here at all, this means that likely a double dead letter was sent
1  // TODO: what if already stopped or failed
1  // TODO: shall we make those return something async-capable, or is our assumption that we invoke these in the serialization pools enough at least until proven wrong?
1  // // TODO: implement this rather as "high priority peer to gossip to"
1  // TODO: we could require it to be async as well or something
1  // self.instrumentation.actorSubscribed(key: anyKey, address: self.id._unwrapActorAddress) // FIXME: remove the address parameter, it does not make sense anymore
1  // TODO: func haveNotYetSeen(version: VersionVector): [UniqueNode]
1  // // TODO: assert the refs match type?
1  // @available(*, deprecated, message: "use 'actor cluster' transport version instead") // TODO: deprecate
1  // TODO: avoid the box part?
1  /// Pool members may be local or remote, // TODO: and there should be ways to say "prefer local or something".
1  // FIXME: we should not add Codable conformance onto a stdlib type, but rather fix this in stdlib
1  let shootTheNodeWriteTimeout: NIO.TimeAmount = .seconds(10) // FIXME: hardcoded last write timeout...
1  // TODO: deprecate, we should not need this explicit type
1  // TODO: special handle $ask- replies; those mean we got a reply to an ask which timed out already
1  // TODO: This can be optimized and it's enough if we keep a digest of the gossips; this way ACKs can just send the digest as well saving space.
1  return self.onHandshakeFailed(context, state, with: fromNode, error: error) // FIXME: implement this basically disassociate() right away?
1  // TODO: wasteful representation, keeping for now to iterate on handshake -- ktoso
1  // TODO: validate all the niling out; can we null out the cell itself?
2  fatalError("selected dispatcher [\(props.dispatcher)] not implemented yet; ") // FIXME: remove any not implemented ones simply from API
1  // TODO: Dry up setting this metadata
1  "messageAccess": "public", // TODO: allow non public actor messages
1  // TODO: should we share this, or have a separate ELG for IO?
1  case .default: dispatcher = self._dispatcher // TODO: this is dispatcher inheritance, not sure about it
1  // TODO: It would be lovely to be able to express this in the type system as "actor owned" or "actor local" to some actor instance.
2  // TODO: once we can abstract over Coders all these could go away most likely (and accept a generic TopLevelCoder)
1  // TODO: should this always be personalized dead letters instead?
1  // TODO: no graceful steps implemented today yet) leave the cluster.
1  // TODO: implement special actor ref instead of using real actor
1  // TODO: more docs
2  // TODO: the reason the `_` keys are not cancelled is because we want to cancel timers in _restartPrepare but we need "our restart timer" to remain
1  self.imports.append("\(node)") // TODO: more special type, since cross module etc
1  return bootstrap.connect(host: targetNode.host, port: Int(targetNode.port)) // TODO: separate setup from using it
1  let selectedPeers = logic.selectPeers(allPeers) // TODO: OrderedSet would be the right thing here...
1  // TODO: can we skip the Any... and use the underlying existential somehow?
2  let hint = hintOverride ?? _typeName(type) // FIXME: _mangledTypeName https://github.com/apple/swift/pull/30318
1  // FIXME: connecting to the singleton through proxy incurs an extra hop. an optimization would be
1  // TODO: https://github.com/apple/swift-distributed-actors/issues/605 use the serialization infra from the system rather
1  // TODO: leave should perhaps return a future or something to await on.
1  // TODO: expectTermination(of: ...) maybe nicer wording?
1  case pinnedThread // TODO: implement pinned thread dispatcher
1  // FIXME: PASS IN FROM ASSOCIATION SINCE MUST SURVIVE CONNECTIONS! // TODO: tests about killing connections the hard way
1  // // TODO: or rather, ask the logic if it wants to eagerly push?
1  var newWorkers = Array(listing.refs) // TODO: smarter logic here, remove dead ones etc; keep stable round robin while new listing arrives
1  return deadLetters.system // FIXME: do we really need this
1  // // FIXME: this needs more work...
1  // TODO: if we made system carry system.time we could always count from that point in time with a TimeAmount; require Clock and settings then
1  // TODO: Future; TODO2: no need for this at all now since we have async await
1  // TODO: implement noLongerThan: .seconds(30), where total time is taken from actor system clock
1  // TODO: avoid the lock...
2  // TODO: move to async function
1  // TODO: remove actors as we notify about them
1  // TODO: find a better name; it is not exactly "fish for message" though, that can ignore messages for a while, this one does not
1  fatalError("Can't \(#function) \(watchee) @ (\(watchee.id)), does not seem to be managed by ActorSystem") // TODO: handle more gracefully, i.e. say that we can't watch that actor
1  // TODO: complete impl
1  // TODO: make more async than seining like this, also with check interval rather than spin, or use the blocking queue properly
1  // TODO: stop all children? depends which style we'll end up with...
1  // TODO: make a directive here
1  // TODO: quite inefficient way to scan it, tho list is short
1  // FIXME: this can be removed once issue #458 lands
1  // TODO: log this warning only "once in while" after buffer becomes full
1  // FIXME: We are currently using a timed `poll` instead of indefinitely
1  // FIXME: make sure that if the peer terminated, we don't add it again in here, receptionist would be better then to power this...
1  // FIXME: Implement me (!), we need to make the storage a counter
1  UniqueNode node = 1; // TODO oneof { senderNode | recipientNode | node }
1  // FIXME: document that may crash, it may right?
1  /// TODO oneof { senderNode | recipientNode | node }
1  // TODO: this is mostly only a placeholder impl; we'd need a proper wheel timer most likely
1  // TODO: metadata
1  /// // TODO the scheme how we configure this may need some more re-thinking.
1  // TODO: round robin or what strategy?
1  // TODO: we may be able to pull this off by implementing the "root" as traversable and then we expose it to the Serialization() impl
1  var runResult = ActorRunResult.continueRunning // TODO: hijack the run_length, and reformulate it as "fuel", and set it to zero when we need to stop
1  // TODO: does not handle blocking longer than `within` well
1  // TODO: Terrible lock which we want to get rid of; it means that every remote send has to content against all other sends about getting this ref
1  internal struct TheOneWhoHasNoParent: _ReceivesSystemMessages { // FIXME: fix the name
1  public var systemName: String // TODO: some other name, to signify "this is just for humans"?
1  case handshakeFailed(Node, Error) // TODO: remove?
1  settings.register(WallTimeClock.self) // TODO: can be removed once https://github.com/apple/swift/pull/30318 lands
1  // TODO: When `delay`, we have to be careful not to change ordering of the messages, meaning that if delayed, then all following messages have to be delayed as well.
1  context.system._resolve(context: .init(address: ._swim(on: self.asUniqueNode!), system: context.system)) // TODO: the ! is not so nice
1  // TODO: collapse into one String and index into it?
1  // TODO: should use default `within` from TestKit
1  // FIXME: Avoid the copying, needs SwiftProtobuf changes
1  // FIXME: validate format = self.format?
1  let existenceConfirmed = true // TODO: implement support for existence confirmed or drop it?
1  // TODO: duplicates some logic from _traverse implementation on Actor system (due to initialization dances), see if we can remove the duplication of this
1  // TODO: how to better express this; so it can be maintained by the OpLog itself
1  // TODO: keep and remove logics
2  return // TODO: This restriction could be lifted; perhaps we can direct the register to the right node?
1  // TODO: Document since users need to implement these
1  // TODO: compacting ([+A, -A] -> no need to store A)
1  let allPeers: [AddressableActorRef] = Array(self.peers).map { $0.asAddressable } // TODO: some protocol Addressable so we can avoid this mapping?
1  // TODO: There is tons of compression opportunity about not having to send full tables around in general, but for now we will just send them around
2  ) // TODO: metadata pattern
1  // TODO: probably remove those?
3  context.log.error("Receptionist error caught: \(error)") // TODO: simplify, but we definitely cannot escalate here
1  // TODO: "flags" incl. isSystemMessage
1  // TODO: we should not rely on NIO for futures
1  /// Base backoff strategy to use in handshake retries // TODO: move it around somewhere so only handshake cares?
1  import struct Foundation.Data // FIXME: would want to not have to use Data in our infra as it forces us to copy
1  // TODO: want to eventually not have this; also move to more structured logging perhaps...
1  // TODO: remove this
1  // TODO: replace with a special minimal `_ActorRef` that does not require spawning or scheduling.
1  // TODO: Implement stricter-round robin, the same way as our SWIM impl does, see `nextMemberToPing`
1  // TODO: has to modify restart counters here and supervise with modified supervisor
1  // TODO: allow configuring dispatcher for the probe or always use the calling thread one
1  // TODO: We have discussed and wanted to "do your own" rather than import the NIO ones, but not entirely sold on the usefulness of replicating them -- ktoso
1  // TODO: We are doing a timed poll here to guarantee that we
1  guard let identifier = identifierAny as? GossipIdentifier else { // FIXME: just force GossipIdentifier to be codable, avoid this hacky dance?
1  // TODO: we need to be able to abstract over Coders to collapse this into "giveMeACoder().encode()"
1  // TODO: should deadLetters be special, since watching it is nonsense?
1  case associatedNodes(_ActorRef>) // TODO: better type here
1  state.events.publish(.membershipChange(change)) // TODO: need a test where a leader observes a replacement, and we ensure that it does not end up signalling up or removal twice?
1  // FIXME: optimize ack reply; this can contain only rows of seen tables where we are "ahead" (and always "our" row)
1  case requestMembershipChange(Cluster.Event) // TODO: make a command
1  return // TODO: "drop" the message
1  // TODO: func remoteActorMessageSerializeFailed
1  // var maxRedeliveryTicksWithoutACK = 10_000 // TODO settings
1  try self.onSubscribe(context: context, message: message) // FIXME: would kill the receptionist!
1  // TODO: (optimization) tell `ActorSingletonManager` on `to` node that this node is handing off (https://github.com/apple/swift-distributed-actors/issues/329)
1  // TODO: diffing is not super well tested, may lose up numbers
2  // TODO: use specific dimensions if shell has it configured or groups etc
1  // TODO: Document and API guarantees
1  // TODO: don't forget to include config in string repr once we do it
2  // TODO: not entirely happy about the added weight, but I suppose avoiding going all the way "into" the settings on each send is even worse?
1  // TODO: can likely be optimized more
1  // TODO: This is done "automatically" once we do log compaction
1  // TODO: likely better as class hierarchy, by we'll see...
1  // TODO: we COULD aggressively re-deliver right now here though this is only an optimization
1  let group: EventLoopGroup = settings.eventLoopGroup ?? settings.makeDefaultEventLoopGroup() // TODO: share the loop with client side?
1  signalQueue: _LinkedBlockingQueue<_SystemMessage>, // TODO: maybe we don't need this one
2  // self.state.log.trace("Done rejecting handshake.") // TODO: something more, terminate the association?
1  // TODO: do we HAVE to do this in the Receptionist?
1  // TODO: pass in settings rather than create them here
1  // TODO: currently disabled warnings as errors because of Sendable check noise and work in progress on different toolchains
1  // _ = context.close() // TODO: maybe?
1  // TODO: record message types by type
1  // TODO: should probably .escalate instead;
1  // FIXME: make this internal (!)
1  return TestMatchers(it: self, callSite: callSiteInfo).toBeEmpty() // TODO: lazy impl, should get "expected empty" messages etc
1  // TODO: worth making it Proto representable or not?
1  // TODO: abstract how we keep them, for round robin / random etc
1  // TODO: could we optimize the case when the target is _local_ and _terminated_ so we don't have to do the watch dance (heavy if we did it always),
1  // TODO: document if cheap (AFAICS yes)
1  // TODO: would be nice to be able to also intercept system messages hm...
1  // TODO: lookup separately?
1  // TODO: make it cleaner? though we decided to go with manual peer management as the ClusterShell owns it, hm
1  // TODO: more metadata (from Envelope) (e.g. sender)
1  // FIXME: implement this once we have the Kill or Down command on cluster shell
1  // TODO: could be configurable to escalate once restarts exhausted
1  log[metadataKey: "actor/path"] = "/system/serialization" // TODO: this is a fake path, we could use log source: here if it gets merged
1  // TODO: move to Time.swift?
1  // FIXME: a complete impl would need to "resolve the types" to know if it happens to be a dist protocol
1  // TODO: compact the log whenever we know all members of the cluster have seen
1  let removalDeadline: Deadline // TODO: cluster should have timer to try to remove those periodically
1  // MARK: System extensions to support watching // TODO: move those into context, and make the ActorIdentity the context
1  // TODO: can we make this honor the run length like `Mailbox` does?
1  settings.register(ActorAddress.self, serializerID: .foundationJSON) // TODO: this was protobuf
1  // TODO: Figure out why this can ever happen
1  // TODO: drop message when it fails to be serialized?
1  // FIXME: this will break with nexting...
1  // TODO: may also want to return "these were removed" if we need to make any internal cleanup
1  /// Tombstones are slightly lighter than a real association, and are kept for a maximum of `settings.cluster.associationTombstoneTTL` TODO: make this setting (!!!)
1  // TODO: create message processing metrics
1  // TODO: accept hex and url encoded things as well
1  // FIXME: can this be removed?
1  // TODO: Implement "setup" inside settings, so that parts of bootstrap can be done there, e.g. by end users without digging into remoting internals
1  log[metadataKey: "actor/path"] = "/system/transport.client" // TODO: this is a fake path, we could use log source: here if it gets merged
1  // TODO: This style of implementation queue -> channel swapping can only ever work with coarse locking and is just temporary
1  return // TODO: drop the message
1  // TODO: in case we'd get a new connection the redeliveries must remain... so we always need to poll for the remotecontrol from association? the association would keep the buffers?
1  // TODO: Eventually: probably also best as not enum but a bunch of factories?
1  // TODO: Not optimal since we always do traverseAll rather than follow the Path of the context
1  // TODO: mark as unsafe mode only
2  // TODO: metric for dead letter: self.instrumentation.deadLetter(message: message, from: nil)
1  // TODO: such messages should go over a priority lane
1  nonisolated var id: AnyActorIdentity { get } // FIXME: replace with DistributedActor conformance
1  try self.becomeNext(behavior: .ignore) // TODO: make .drop once implemented
1  // TODO: make sure we only handle ONCE?
1  // TODO: We are using an internal function here to allow us to automatically enable the more strict mode in release builds.
1  remainingWorkers.removeAll { ref in ref.address == terminated.address } // TODO: removeFirst is enough, but has no closure version
1  // FIXME: carry the return type raw in the reply enum
1  internal let metrics: ActorSystemMetrics // TODO: rather, do this via instrumentation
1  // TODO: maybe store also at what time we sent the handshake, so we can diagnose if we should reject replies for being late etc
1  // TODO: remove and always just use the Any.Type
1  // FIXME: implement by scheduling checks rather than spinning
1  public typealias ReplicaVersion = (replicaID: ReplicaID, version: Version) // TODO: struct?
1  // FIXME: Why is this here and not in concurrency helpers?
1  // TODO: do we need to separate server and client sides? Sounds like a reasonable thing to do.
1  traceLog_Mailbox(self.address.path, "Tombstone arrived in dead letters. TODO: make sure these dont happen")
3  // TODO: annoyance; init MUST be defined here rather than in extension since it is required
1  // TODO: instead analyse the type syntax?
1  log[metadataKey: "actor/path"] = "/system/transport.server" // TODO: this is a fake path, we could use log source: here if it gets merged
2  // TODO: also record thr delay between submitting and starting serialization work here?
1  // TODO: Optimization: gap collapsing: [+a,+b,+c,-c] -> [+a,+b,gap(until:4)]
1  // TODO: get the name more properly
1  // FIXME: protect the naming context access and name reservation; add a test
1  // TODO: model the states to express this can not happen // there is a client side state machine and a server side one
1  // TODO: public var identity: ActorIdentity = Path + Name
1  // TODO: nicer representation though needs endiannees dance since this is then encoded in little endian; revisit how and where to represent once we have a solid handshake -- ktoso
1  // FIXME: if we had signal handlers, invoke it
1  // TODO: in the future it can get more stateful behaviors if we wanted to or separate inbound/outbound etc.
1  // TODO: we could merge ACK and NACK if NACKs were to carry "the gap"
1  /// must be taken on the cluster layer, by using and checking for tombstones. // TODO: make a nasty test for this, a simple one we got; See MembershipGossipSeenTableTests
1  // TODO: determine what custom one to use, proto or what else
1  settings.register(_OperationLogClusterReceptionist.AckOps.self) // TODO: can be removed once https://github.com/apple/swift/pull/30318 lands
2  // FIXME: This will go away once https://github.com/apple/swift/pull/30318 is merged and we can rely on summoning types
1  // FIXME: make the promise dance here
1  // FIXME: This current implementation doesn't work on Linux. It is disabled
1  // TODO: This seems worse to implement since I can't pass through my "reads of lazy cause rendering"
1  // TODO: conversions are naive here, we'd want to express this more nicely...
1  // TODO: owner can be used to create metrics for the stash of specific owner
1  // TODO: configurable "when to flush"
1  /// and the pull from thereon happens directly between those peers, which the recipient MAY flow control if it so wanted to; TODO: more detailed flow control rather than just the maxChunk?
1  let buffer = serialized.buffer.asByteBuffer(allocator: self.serialization.allocator) // FIXME: yes we double allocate, no good ways around it today
1  // TODO: those only apply when bounded mailboxes
1  // TODO: pretty sub-optimal, but we'll flatten this all out eventually
2  // TODO: metrics how often we really have to copy
1  () // TODO: revisit logging more details here
1  /// - (TODO: this can just be an actor listening to events once we have events subbing) the shell queries `downingProvider` for decision for downing the node
1  settings.register(ErrorEnvelope.self) // TODO: can be removed once https://github.com/apple/swift/pull/30318 lands
1  case continuation(() throws -> _Behavior) // TODO: make it a Carry type for better debugging
1  // TODO: better type printout so we know we only handle SpecificSignal with this one
1  // TODO: guard that the target node is actually "us"? i.e. if we're exposed over various protocols and/or ports etc?
1  case terminated(ref: AddressableActorRef, existenceConfirmed: Bool, addressTerminated: Bool) // TODO: more additional info? // TODO: send terminated PATH, not ref, sending to it does not make sense after all
1  // TODO: this is just to prevent a DI crash because of enums without cases and Codable
1  // FIXME: death watcher is incomplete, should handle snapshot!!
1  let members = self.membership.members(withStatus: [.joining, .up, .leaving]) // FIXME: we should not require joining nodes in convergence, can losen up a bit here I hope
1  // TODO: signal "gossip round complete" perhaps?
1  // TODO: revise surface API what we want to expose; stopping by just name may be okey?
1  // TODO: ask errors should be lovely and include where they were asked from (source loc)
1  // TODO: matters perhaps only for metrics where we'd want to "please count this specific type of error" so leaving this logic as-is
1  /// Guaranteed to be keyed with `.actorAddress`. // FIXME ensure this
1  // FIXME: Hack to stop from subReceive. Should we allow this somehow?
1  MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount) // TODO: share pool with others
1  // TODO: A pool can be configured to terminate itself when any of its workers terminate or attempt to spawn replacements.
1  case shutdown(BlockingReceptacle) // TODO: could be NIO future
1  return try! Guest.resolve(self._guest._ref.asAddressable.asAnyActorIdentity, using: system) // FIXME: cleanup these APIs, should never need throws, resolve earlier
1  // TODO: would there be cases where we want to reconnect the discovery mechanism instead? (we could handle it here)
1  private var membership: Cluster.Membership // FIXME: we need to ensure the membership is always up to date -- we need the initial snapshot or a diff from a zero state etc.
1  // TODO: Workaround for https://bugs.swift.org/browse/SR-12315 "Extension of nested type does not have access to types it is nested in"
1  // TODO: make interval so we know the length of how long an actor processes a message
1  // TODO: not extensively tested but should just-work™ since we treat NIO as plain thread pool basically here.
1  // TODO: explain this more
1  // TODO: restructure it somehow, perhaps we dont need the full abstraction like this
1  private weak var myself: DistributedActor? // TODO: make this just store the address instead?
1  // TODO: want to reconcile those into one, and allow /dead as well
1  /// "too late" or be dropped for some other reason, one may mark it using the TODO: "dont log me as dead letter" protocol.
1  // TODO: eventually replace with our own scheduler implementation
1  // TODO: func remoteActorMessageDeserializeEndFailed
1  // FIXME: this is evil?
1  // FIXME: Avoid the copying, needs SwiftProtobuf changes
1  // Slightly modified version of NIO's Heap. TODO: upstream changes
1  // TODO: Finally change the triple tuple into a specific type with more helpers
1  // TODO: If we used some bits for system message queue count, we could avoid this issue... Consider this at some point perhaps
1  /// If the alternative behavior contains a `.setup` or other deferred behavior, it will be canonicalized on its first execution // TODO: make a test for it
1  self.instrumentation = system.settings.instrumentation.makeActorInstrumentation(self, address) // TODO: could be in association, per node
1  try self.onLookup(context: context, message: message) // FIXME: would kill the receptionist!
1  // TODO: last resort, print error (system could be going down)
1  let associatedNodes = try probe.expectMessage() // TODO: use interval here
1  // effects are signalled via the ClusterShell, not here (it will also perform a merge) // TODO: a bit duplicated, could we maintain it here?
1  /// actor system logs, to other external systems. TODO: Note also node roles, which we do not have yet... those are dynamic key/value pairs paired to a unique node.
1  // TODO: TopLevelDataDecoder
1  // ==== Declare mangled names of some known popular types // TODO: hardcoded mangled name until we have _mangledTypeName
1  // FIXME: Implement me (!), we need to make the storage a counter
1  // TODO: Allow plugins to register types...?
1  // TODO: instead back with a String and keep a pos to index quickly into the name for Substring?
1  case associated(channel: Channel) // TODO: _InternalActorTransport.Node/Peer/Target ???
1  // FIXME: super naive... replace with something more proper
1  self.handlePingRequestResponse(response: .timeout(target: peerToPing, pingRequestOrigin: context.myself, timeout: pingTimeout, sequenceNumber: 0), pinged: peerToPing, context: context) // FIXME: that sequence number...
1  // TODO: would be some actual logic, that we can plug and play
1  // TODO: here we can either log... or dispatch to actor... or invoke Logging. etc
1  let equalityHackRef = try! system._resolveStub(identity: identity) // FIXME: cleanup the try!
1  // TODO: "race to become the host", all nodes race and try CAS-like to set themselves as leader -- this we could do with cas-paxos perhaps or similar; it is less predictable which node wins, which can be good or bad
1  self._children.insert(ref) // TODO: separate adapters collection?
1  indirect case intercept(behavior: _Behavior, with: _Interceptor) // TODO: for printing it would be nicer to have "supervised" here, though, modeling wise it is exactly an intercept
1  // TODO: update the min
1  self._cluster_members_removed = .init(label: clusterMembersLabel, dimensions: [("status", Cluster.MemberStatus.removed.rawValue)]) // TODO: this is equal to number of stored tombstones kind of
1  // TODO: this will change to subscribing to cluster events once those land
1  // TODO: document how to deal with `protocol` message accepting actors, those should be very rare.
2  """) // TODO: metadata: self.outboundSystemMessages.metadata
1  // TODO: This will be replaced by proper timer keys which can express such need eventually.
1  // FIXME: also close all associations (!!!)
1  // TODO: Optimization: head collapsing: [+a,+b,+c,-b,-a] -> [gap(until:2),+c,-b]
1  // TODO: This style can only ever work since we lock around the entirety of enqueueing messages and this setting; make it such that we don't need the lock eventually
1  public typealias ActorMessage = Codable // FIXME: MAKE THIS SENDABLE: & Sendable
1  /// // TODO: implement SWIMs selection in terms of this
1  // FIXME: this should never happen; tombstone must always be taken in by the actor as last message
1  // FIXME: this is a weak workaround around not having "extensions" (unique object per actor system)
1  // TODO: slow/fast ticks: When we know there's nothing new to share with others, we use the slow tick (which should be increased to 5 seconds or less)
1  // TODO: reply "registered"?
1  // // TODO: remove this most likely
1  // TODO: In situations which need strong guarantees, this leadership election scheme does NOT provide strong enough
1  self._myCell = ref // TODO: atomic?
1  return // TODO: error instead
2  fatalError("TODO other actor path roots not supported; Was: \(shell)")
1  // FIXME: if the actor has lifecycle hooks, call them
1  // TODO: validate if it is for the same UID or not, if not, we may be in trouble?
1  // TODO: callback into client or not?
1  // TODO: note to self measurements of rate can be done in two ways:
1  // TODO: trim some information?
1  fatalError("Can not resolve actor refs without CodingUserInfoKey.actorSerializationContext set!") // TODO: better message
1  let event = SWIM.MemberStatusChangedEvent(previousStatus: previousStatus, member: member) // FIXME: make SWIM emit an option of the event
1  guard isDistributedProtocol else { // FIXME: detect DistributedActor constrained protocol
1  // TODO: This could be expressed as some "actorable.implements(protocol)"
1  // FIXME: we need the sender() to attach properly
1  // TODO: consider ReadWriteLock lock, these accesses are very strongly read only biased
1  // TODO: Maybe offeringToSpeakAtVersion or something like that?
1  // TODO: make this nicer... the ID could serve as the ref
1  // TODO: Those read bad, make one that is from/to in params?
1  // TODO: could be configurable to escalate once restarts exhausted
1  // TODO: "oldest node"
1  // ("bytes dumper", DumpRawBytesDebugHandler(role: .client, log: log)), // FIXME: make available via compilation flag
1  // TODO: optimize the "op / delta buffer" which this really is, kins of like in Op based CRDTs
1  // return try! Guest.resolve(self._guest._ref.asAddressable.asAnyActorIdentity, using: system) // FIXME: cleanup these APIs, should never need throws, resolve earlier
1  // FIXME: we have to add "own node" since we're not getting the .snapshot... so we have to manually act as if..
1  // TODO: make nicer for auto completion? (.constant) etc
1  // TODO: clock + limit "max total wait time" etc
1  // TODO: this could take into account roles, if we do them
1  // TODO: it is known dead, optimize the resolve?
1  // FIXME: ensure that we never have a seen entry for a non-member
1  shootTheOtherNodePromise.fail(TimeoutError(message: "Timed out writing final STONITH to \(remoteNode), should close forcefully.", timeout: .seconds(10))) // FIXME: same timeout but diff type
1  hasher.combine(self.name) // FIXME: take into account enclosing scope
1  // TODO: benchmark
1  case .signalHandling(let recvMsg, _): return try recvMsg.interpretMessage(context: context, message: message) // TODO: should we keep the signal handler even if not .same? // TODO: more signal handling tests
1  // TODO: is this a change to coalesce some system messages? what if someone did some silly watching the same exact thing many times?
1  // MARK: Plugins // TODO: rename since may be confused with package plugins?
1  var registeredKeys: Set = [] // TODO: OR we store it directly as registeredUnderKeys/subscribedToKeys in the dict
1  recipientPath: wireEnvelope.recipient.path, // TODO: use addresses
1  // TODO: the thing is, I think we can express the entire "wait for children to stop" as a behavior, and no need to make it special implementation in the cell
1  // TODO: handling "unhandled" would be good here... though I think type wise this won't fly, since we care about signal too
1  self.nextPeriodicAckPermittedDeadline[peer] = Deadline.fromNow(nextPeriodicAckAllowedIn) // TODO: context.system.timeSource
2  /// - this may cause the other peer to pull (ack) from any other peer receptionist, if it notices it is "behind" with regards to any of them. // FIXME: what if a peer notices "twice" so we also need to prevent a timer from resending that ack?
1  Array(self.segments.dropLast()) == maybeParentPath.segments // TODO: more efficient impl, without the copying
1  // TODO: carry same data as Envelope -- baggage etc
1  // TODO: not sure if we'd need this or not in reality, we'll see... executing futures safely would be more interesting perhaps
1  // FIXME: expose addMember after all
1  /// and dropped which happens only after an extended period of time. // FIXME: That period of time is not implemented
1  // TODO: make this O(1) by allowing wrapper type to equality check only on NodeID
1  // TODO: We could also implement taking a Clock, and using it see if there's a total limit exceeded
2  () // ignore all other messages // TODO: why?
1  try onDownAction(context.system) // TODO: return a future and run with a timeout
1  // TODO: how can we move the spawn somewhere else so we don't have to pass in the system or context?
1  // TODO: consider receptionist instead of this; we're "early" but receptionist could already be spreading its info to this node, since we associated.
1  return TestMatchers(it: self, callSite: callSiteInfo).toBeNotEmpty() // TODO: lazy impl, should get "expected non-empty" messages etc
1  let system: ActorSystem // TODO: maybe don't need to store it and access via clusterShell?
1  // TODO: definitely good, though likely not as first thing We can base it on Akka's recent "Affinity" one,
1  // TODO: we do not keep any "future" messages and rely on them being re-sent, this is most likely fine (and is in reality in other impls),
1  // TODO: in reality should be FQN, for cross module support
1  // FIXME: change this completely
1  // TODO: should cache perhaps also associations to inject them eagerly to actor refs?
1  settings.register(DistributedActors.OpLogDistributedReceptionist.PushOps.self) // TODO: can be removed once https://github.com/apple/swift/pull/30318 lands
1  self.appendLiteral("[\(ref.address)]") // TODO: make those address
1  // TODO: sender and other metadata
1  return // TODO: "drop" the message rather than dead letter it?
1  // TODO: Would be nice to not need this type at all; though initialization dance prohibiting self access makes this a bit hard
1  // TODO: add test for sending raw SwiftProtobuf.Message
1  // FIXME: super naive... replace with something more proper
2  // FIXME: encode as authority/URI with optimized parser here, this will be executed many many times...
1  // TODO: soundness check; we can't immediately send it to dead letters just yet since first all user messages
1  settings.register(_OperationLogClusterReceptionist.PushOps.self) // TODO: can be removed once https://github.com/apple/swift/pull/30318 lands
1  // TODO: perhaps we instead just warn and ignore this; since it should be harmless
1  // ("bytes dumper", DumpRawBytesDebugHandler(role: .server, log: log)), // FIXME: only include for debug -DSACT_TRACE_NIO things?
2  // TODO: also flush when a key has seen e.g. 100 changes?
1  public protocol Signal: NonTransportableActorMessage, Sendable {} // FIXME: we could allow them as Codable, we never send them over the wire, but people might manually if they wanted to I suppose
1  // TODO: tweak logging some more, this is actually not scary in racy handshakes; so it may happen often
2  // TODO: realistically we ARE becoming a transport and thus should be able to remove 'transports' entirely
1  // TODO: pretty sub-optimal, but we'll flatten this all out eventually
1  #endif // TODO: make the \next printout nice TODO dont log messages (could leak pass etc)
1  // FIXME: don't retry on rejections; those are final; just failures are not, clarify this
1  for key in self.installedTimers.keys where includeSystemTimers || !key.isSystemTimer { // TODO: represent with "system timer key" type?
1  effectiveMetadata: self.context.effectiveMetadata(overrides: metadata), // TODO: should force lazies
2  // TODO: potential for coalescing some ACKs here; schedule "lets write back in 300ms"
1  return .same // TODO: make .drop once implemented
1  case restart(atMost: Int, within: TimeAmount?, backoff: BackoffStrategy?) // TODO: would like to remove the `?` and model more properly
2  // TODO: would be nice to be able to abstract over the coders (using TopLevelDecoder-like types) then rename this to `AnyCodableSerializer`
1  // FIXME: should have more proper config section
1  // TODO: change since serialization which can throw should be shipped of to a future
1  let node: Node // TODO: allow carrying UniqueNode
1  // FIXME: there is a Linux PMC API you can use to get this, but it's
1  let len = message.lengthOfBytes(using: .utf8) // TODO: optimize for ascii?
2  // TODO: This should pick a few at random rather than ping everyone
1  // FIXME: this should technically offload onto storage and then apply them again...
1  // TODO: would this mean that we cannot implement re-delivery inside the NIO layer as we do today?
1  fileprivate init(_ nanoseconds: Value) { // FIXME: Needed the copy since this constructor
1  /// // TODO: This should default to some nice binary format rather than JSON.
1  // TODO: not used
1  self.log.warning("Actor threw error, reason: [\(error)]:\(type(of: error)). Terminating.") // TODO: configurable logging? in props?
1  let threadId = pthread_self() // TODO: since pthread_threadid_np not available, how to get an id?
1  if let watcher = instance as? (DistributedActor & DistributedActors.LifecycleWatch) { // TODO: cleanup once LifecycleWatch implies DistributedActor
1  // TODO: and their logging rate should be configurable
1  // TODO: let awaitAtLeast: Int // before starting to direct traffic
1  // TODO: make Error
2  // FIXME: PASS IN FROM ASSOCIATION SINCE MUST SURVIVE CONNECTIONS !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
1  // TODO: can this instead be a CellDelegate?
1  // TODO: concurrency... lock the ref as others may read it?
1  // TODO: carry metadata from Envelope
1  // self.negotiateCapabilities(...) // TODO: We may want to negotiate other options
1  // TODO: how to better hide such more nasty assertions?
1  instance.onMembershipChanged(change) // TODO: return and interpret directives
1  // TODO: or hide it completely somehow; too dangerous
1  // TODO: (optimization) tell `ActorSingletonManager` on `from` node that this node is taking over (https://github.com/apple/swift-distributed-actors/issues/329)
1  // TODO: Ensure we don't read faster than we can write by adding the BackPressureHandler into the pipeline.
1  self.log.error("\(message)") // TODO: configurable logging? in props?
1  // TODO: maybe move log and settings outside of state into the shell?
1  // TODO: hope to remove this once a StdOutLogHandler lands that has formatting support;
1  // TODO: collapse those two and only use the instrumentation points, also for metrics
1  // TODO: just force GossipIdentifier to be codable, avoid this dance?
1  // TODO: Do we really need to store them at all?
1  /// // TODO: detailed docs on how to use this for a serialization changing rollout of a type
1  // TODO: if/when we'd have some election scheme that is async, e.g. "vote" then this timeout should NOT be infinite and should be handled properly
1  sendAndDropAsDeadLetter() // TODO: "Drop" rather than DeadLetter
1  nonisolated var actorTransport: ActorTransport { get } // FIXME: replace with DistributedActor conformance
1  let backtrace: [String] // TODO: Could be worth it to carry it as struct rather than the raw string?
1  // TODO: Exact semantics remain to be defined. Reserved likely to be used for flags "connection modes" etc "don't connect me, I just send 1 message" etc?
1  // let data = try self.jsonString().data(using: .utf8)! // TODO allow a "debug mode with json payloads?"
1  return bootstrap.bind(host: bindAddress.node.host, port: Int(bindAddress.node.port)) // TODO: separate setup from using it
1  // TODO: OrderedSet would be the right thing here to be honest...
1  self.genericParameterDecls.map { $0.parameterDecl } // TODO: must handle where clauses better
1  try self.validateUniqueName(name) // FIXME: reserve name
1  // TODO: there is no TokenKind.mutatingKeyword in swift-syntax and it's expressed as .identifier("mutating"), could be a bug/omission
2  // FIXME: THIS IS A WORKAROUND UNTIL WE CAN GET MANGLED NAMES https://github.com/apple/swift/pull/30318
1  var registeredKeys: Set = [] // TODO: OR we store it directly as registeredUnderKeys/subscribedToKeys in the dict
1  enum Message: NonTransportableActorMessage { // TODO: make it codable, transportability depends on the Event really
1  // TODO: we could include node ids or similar if we wanted snowflakes...
1  // TODO: This should be internal, but is forced to be public by `_deserializeDeliver` on references.
2  // TODO: The looping through transports could be ineffective... but realistically we dont have many
2  // TODO: generalize this such that we can do props -> dimensions -> done, and not special case the system ones
1  // TODO: decide if we log or crash when new things reg ensured during runtime
1  res.append((nil, "_replyTo", "_ActorRef>")) // TODO: make the same with the error envelope
2  /// Such relationships must be confirmed by using the `_ActorContext.children.hasChild(:UniqueActorPath)` method. TODO: this does not exist yet
1  // TODO: maybe a specialized one... for ask?
1  // TODO: rename ActorRunResult -- the mailbox run is "the run", this is more like the actors per reduction directive... need to not overload the name "run"
1  // TODO: association id for logging?
1  // TODO: Discuss naming of `Worker`
1  let encoder = TopLevelBytesBlobEncoder(allocator: self.allocator) // TODO: make it not a class?
1  // TODO: consider if we need abstraction / does it cost us?
1  log.debug("\(Self.self) deinit, cancelling [\(installedTimers.count)] timers") // TODO: include actor address
1  // TODO: make proper .ordinalString function
1  // TODO: the remote REJECTING must not trigger backoffs
1  // TODO: we always want to call "through" the supervisor, make it more obvious that that should be the case internal API wise?
1  // TODO: how to guard in iOS etc here?
1  // TODO: assert the refs match type?
1  // TODO: clear after a few days, or some max count of nodes, use sorted set for this
1  // TODO: accept association id?
1  // TODO: should we disallow mixing ReplicaID types somehow?
1  // FIXME: MessageDispatcher is going to be replaced by custom Executor types in Swift itself
1  self.nextPeriodicAckPermittedDeadline[peer.id] = Deadline.fromNow(nextPeriodicAckAllowedIn) // TODO: system.timeSource
1  instance.onActorWatched(by: watcher, remoteNode: remoteNode) // TODO: return and interpret directives
1  // TODO: discuss naming of `InternalMessageDispatcher`
1  // TODO: if no more workers may want to issue warnings or timeouts
1  return try interceptor.interceptSignal(target: behavior, context: context, signal: signal) // TODO: do we need to try?
1  // TODO: Thought; we could detect if we're nested in a top-level JSON that we should encode as json perhaps, since proto can do this?
1  // case dispatch(qosClass: Dispatch.DispatchQoS.QoSClass) // TODO: we want diff actors to be able to run on diff priorities, thus this setting
1  // TODO: Reimplement association such that we don't need locks here
1  // TODO: Would want to rename; this is really protocol + host + port, and a "cute name for humans" we on purpose do not take the name as part or identity
1  // FIXME: if we'd expose lifecycle hooks, call them
1  // FIXME: see if we can restructure this to avoid these nil/then-set dance
1  metadata: [ // TODO: carry reason why -- was it gossip, manual or other?
1  /// Offer a new listing to the subscription stream. // FIXME: implement this by offering single elements (!!!)
1  // TODO: does this stay like this?
1  // FIXME: optimize so we don't alloc into the String() here
1  // TODO: we can likely optimize not having to call "through" supervisor if we are .stop anyway
1  // FIXME: sometimes we could encode raw and not via the Data -- think about it and fix it
1  // TODO: Alternatively locking on system message things could be a solution... Though heavy one.
1  // TODO: does not work cross module yet (it would break)
1  // TODO: test removing non existing member
1  // TODO: "$box\(self.name)" would be nicer, but it is reserved
1  // TODO: register types until https://github.com/apple/swift/pull/30318 is merged?
1  settings.register(BestEffortStringError.self) // TODO: can be removed once https://github.com/apple/swift/pull/30318 lands
1  // TODO: this actually would be dispatching to the logging infra (has ticket)
1  // FIXME: recipient address, not just path
1  // FIXME: https://github.com/apple/swift-distributed-actors/issues/552
1  // TODO: call into an connection error?
1  // TODO: maybe conform to Sequence?
1  static let watch = 0 // TODO: UNWATCH!?
1  // FIXME: improve this to always pass around AddressableActorRef rather than just address (in receptionist Subscribe message), remove this trick then
1  // FIXME: reserve the name, atomically
1  // TODO: redeliver everything
1  // TODO: move instrumentation into the transport?
1  // FIXME: implement once cluster.down() is available issue #848
2  // FIXME: enable these assertions
1  next = .unhandled // TODO: could be .drop
2  fatalError("TODO: handle more gracefully") // TODO: handle more gracefully, i.e. say that we can't watch that actor
1  self.onConnectionError(HandshakeConnectionError(node: self.remoteNode, message: "Handshake timed out")) // TODO: improve msgs
1  // TODO: This is not a full solution, however lessens the amount of instances in which we may enqueue to a terminating actor
1  // TODO: reformulate as Wire.accept / reject?
1  // TODO: converge into one tree
1  // TODO: soundness check if this isn't about handshaking with a replacement, then we should continue;
1  // TODO: represent with "system timer key" type?
1  // TODO: clean this up since it's used by tests only (e.g., EventStreamConsumer)
1  // TODO: make not recursive perhaps since could blow up on large chain?
1  // TODO: Could apply the dynamic lookup tricks we do in tracing to make this looking up more painless / safe...
1  state.log.warning("Aborting incoming handshake: \(error)") // TODO: remove
1  // TODO: lock inside provider, not here
1  // FIXME: implement giving up reconnecting
1  existingWatchers.insert(watcher) // FIXME: we have to remove it once it terminates...
1  func _tellOrDeadLetter(_ message: Any, file: String, line: UInt) // TODO: This must die?
1  let adaptedAddress = try self.address.makeChildAddress(name: name, incarnation: .random()) // TODO: actor name to BE the identity
1  self.timers.cancelAll() // TODO: cancel all except the restart timer
3  // TODO: all these to accept trace context or something similar
1  /// TODO: wasteful representation, keeping for now to iterate on handshake -- ktoso
1  self.segments = value.segments.map { $0.value } // TODO: avoiding the mapping could be nice... store segments as strings?
1  // TODO: used to be but too much hassle: throw ActorPathError.illegalActorPathElement(name: name, illegal: "\(c)", index: pos)
3  // TODO: can we hide this? Relates to: https://bugs.swift.org/browse/SR-5880
1  // TODO: rather could we send messages to self._deadLetters with enough info so it handles properly?
1  state.events.publish(.membershipChange(change)) // TODO: need a test where a leader observes a replacement, and we ensure that it does not end up signalling up or removal twice?
1  // TODO: dedup with the boxed one
1  // TODO: hope to remove this one
1  // TODO: perhaps we can figure out where `to` is next and hand over gracefully?
1  // TODO: let trace: TraceMetadata
1  public struct WorkerPoolSettings { // TODO: need the Codable?
1  // TODO: this can be optimized a bit more I suppose, with a reverse lookup table
1  // TODO: we could use this to make TestProbes more "real" rather than wrappers
1  try self.becomeNext(behavior: next) // FIXME: make sure we don't drop the behavior...?
1  system.cluster.ref.tell(.query(.associatedNodes(probe.ref))) // TODO: ask would be nice here
1  case downCommand(Node) // TODO: add reason
1  // TODO: CPU Affinity when pinning
1  // TODO: a bit terrible; perhaps key should be Node and then confirm by UniqueNode?
1  case .signalHandlingAsync(let recvMsg, _): return try recvMsg.interpretMessage(context: context, message: message) // TODO: should we keep the signal handler even if not .same? // TODO: more signal handling tests
1  // TODO: not in love that we have to do logic like this here... with a plain book to continue running or not it is easier
1  // TODO: optimize by carrying ID in envelope of if we need to special handle this as system message
1  self._children.insert(adapter) // TODO: separate adapters collection?
1  import struct Foundation.Data // TODO: would refer to not go "through" Data as our target always is ByteBuffer
default: // TODO: remove the use of default: it is the devil
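Several entries above circle one recurring idea: bounding retries not only by attempt count but by a total elapsed-time budget checked against a clock (for example "potentially logic to check clock if we exceeded total backoff timeout here (and then return nil)", "implement noLongerThan: .seconds(30), where total time is taken from actor system clock", and "clock + limit \"max total wait time\" etc"). The sketch below only illustrates that idea; it is not the library's actual backoff API, and the names `TotalTimeoutBackoff` and `retryWithBackoff` are hypothetical.

```swift
import Foundation

/// Illustrative only: an exponential backoff that also gives up once a total
/// wall-clock budget is exhausted, signalling "stop retrying" by returning nil.
/// Names and shapes are hypothetical, not the library's real types.
struct TotalTimeoutBackoff {
    private var nextDelay: TimeInterval   // delay to use before the next attempt
    private let multiplier: Double        // growth factor applied after each attempt
    private let maxDelay: TimeInterval    // cap for any single delay
    private let deadline: Date            // absolute point in time after which we stop

    init(initialDelay: TimeInterval, multiplier: Double = 2.0,
         maxDelay: TimeInterval = 30, totalTimeout: TimeInterval) {
        self.nextDelay = initialDelay
        self.multiplier = multiplier
        self.maxDelay = maxDelay
        self.deadline = Date().addingTimeInterval(totalTimeout)
    }

    /// Returns the delay to wait before the next retry,
    /// or nil once the total backoff timeout has been exceeded.
    mutating func next(now: Date = Date()) -> TimeInterval? {
        guard now < deadline else {
            return nil // total budget used up; caller should stop retrying
        }
        let delay = min(nextDelay, maxDelay)
        nextDelay = min(nextDelay * multiplier, maxDelay)
        // never sleep past the deadline itself
        return min(delay, deadline.timeIntervalSince(now))
    }
}

/// Usage sketch: retry a throwing operation until it succeeds or the budget runs out.
func retryWithBackoff<T>(totalTimeout: TimeInterval, _ operation: () throws -> T) throws -> T {
    var backoff = TotalTimeoutBackoff(initialDelay: 0.1, totalTimeout: totalTimeout)
    while true {
        do {
            return try operation()
        } catch {
            guard let delay = backoff.next() else { throw error } // budget exhausted
            Thread.sleep(forTimeInterval: delay)
        }
    }
}
```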