fn make_socket_rpc()

in patched-vscode/cli/src/tunnels/control_server.rs [323:509]


fn make_socket_rpc(
	log: log::Logger,
	socket_tx: mpsc::Sender<SocketSignal>,
	http_delegated: DelegatedSimpleHttp,
	launcher_paths: LauncherPaths,
	code_server_args: CodeServerArgs,
	port_forwarding: Option<PortForwarding>,
	requires_auth: AuthRequired,
	platform: Platform,
	http_requests: HttpRequestsMap,
) -> RpcDispatcher<MsgPackSerializer, HandlerContext> {
	let server_bridges = ServerMultiplexer::new();
	let mut rpc = RpcBuilder::new(MsgPackSerializer {}).methods(HandlerContext {
		did_update: Arc::new(AtomicBool::new(false)),
		auth_state: Arc::new(std::sync::Mutex::new(match requires_auth {
			AuthRequired::VSDAWithToken(t) => AuthState::WaitingForChallenge(Some(t)),
			AuthRequired::VSDA => AuthState::WaitingForChallenge(None),
			AuthRequired::None => AuthState::Authenticated,
		})),
		socket_tx,
		log: log.clone(),
		launcher_paths,
		code_server_args,
		code_server: Arc::new(Mutex::new(None)),
		server_bridges,
		port_forwarding,
		platform,
		http: Arc::new(FallbackSimpleHttp::new(
			ReqwestSimpleHttp::new(),
			http_delegated,
		)),
		http_requests,
	});

	rpc.register_sync("ping", |_: EmptyObject, _| Ok(EmptyObject {}));
	rpc.register_sync("gethostname", |_: EmptyObject, _| handle_get_hostname());
	rpc.register_sync("sys_kill", |p: SysKillRequest, c| {
		ensure_auth(&c.auth_state)?;
		handle_sys_kill(p.pid)
	});
	rpc.register_sync("fs_stat", |p: FsSinglePathRequest, c| {
		ensure_auth(&c.auth_state)?;
		handle_stat(p.path)
	});
	rpc.register_duplex(
		"fs_read",
		1,
		move |mut streams, p: FsSinglePathRequest, c| async move {
			ensure_auth(&c.auth_state)?;
			handle_fs_read(streams.remove(0), p.path).await
		},
	);
	rpc.register_duplex(
		"fs_write",
		1,
		move |mut streams, p: FsSinglePathRequest, c| async move {
			ensure_auth(&c.auth_state)?;
			handle_fs_write(streams.remove(0), p.path).await
		},
	);
	rpc.register_duplex(
		"fs_connect",
		1,
		move |mut streams, p: FsSinglePathRequest, c| async move {
			ensure_auth(&c.auth_state)?;
			handle_fs_connect(streams.remove(0), p.path).await
		},
	);
	rpc.register_duplex(
		"net_connect",
		1,
		move |mut streams, n: NetConnectRequest, c| async move {
			ensure_auth(&c.auth_state)?;
			handle_net_connect(streams.remove(0), n).await
		},
	);
	rpc.register_async("fs_rm", move |p: FsSinglePathRequest, c| async move {
		ensure_auth(&c.auth_state)?;
		handle_fs_remove(p.path).await
	});
	rpc.register_sync("fs_mkdirp", |p: FsSinglePathRequest, c| {
		ensure_auth(&c.auth_state)?;
		handle_fs_mkdirp(p.path)
	});
	rpc.register_sync("fs_rename", |p: FsRenameRequest, c| {
		ensure_auth(&c.auth_state)?;
		handle_fs_rename(p.from_path, p.to_path)
	});
	rpc.register_sync("fs_readdir", |p: FsSinglePathRequest, c| {
		ensure_auth(&c.auth_state)?;
		handle_fs_readdir(p.path)
	});
	rpc.register_sync("get_env", |_: EmptyObject, c| {
		ensure_auth(&c.auth_state)?;
		handle_get_env()
	});
	rpc.register_sync(METHOD_CHALLENGE_ISSUE, |p: ChallengeIssueParams, c| {
		handle_challenge_issue(p, &c.auth_state)
	});
	rpc.register_sync(METHOD_CHALLENGE_VERIFY, |p: ChallengeVerifyParams, c| {
		handle_challenge_verify(p.response, &c.auth_state)
	});
	rpc.register_async("serve", move |params: ServeParams, c| async move {
		ensure_auth(&c.auth_state)?;
		handle_serve(c, params).await
	});
	rpc.register_async("update", |p: UpdateParams, c| async move {
		handle_update(&c.http, &c.log, &c.did_update, &p).await
	});
	rpc.register_sync("servermsg", |m: ServerMessageParams, c| {
		if let Err(e) = handle_server_message(&c.log, &c.server_bridges, m) {
			warning!(c.log, "error handling call: {:?}", e);
		}
		Ok(EmptyObject {})
	});
	rpc.register_sync("prune", |_: EmptyObject, c| handle_prune(&c.launcher_paths));
	rpc.register_async("callserverhttp", |p: CallServerHttpParams, c| async move {
		let code_server = c.code_server.lock().await.clone();
		handle_call_server_http(code_server, p).await
	});
	rpc.register_async("forward", |p: ForwardParams, c| async move {
		ensure_auth(&c.auth_state)?;
		handle_forward(&c.log, &c.port_forwarding, p).await
	});
	rpc.register_async("unforward", |p: UnforwardParams, c| async move {
		ensure_auth(&c.auth_state)?;
		handle_unforward(&c.log, &c.port_forwarding, p).await
	});
	rpc.register_async("acquire_cli", |p: AcquireCliParams, c| async move {
		ensure_auth(&c.auth_state)?;
		handle_acquire_cli(&c.launcher_paths, &c.http, &c.log, p).await
	});
	rpc.register_duplex("spawn", 3, |mut streams, p: SpawnParams, c| async move {
		ensure_auth(&c.auth_state)?;
		handle_spawn(
			&c.log,
			p,
			Some(streams.remove(0)),
			Some(streams.remove(0)),
			Some(streams.remove(0)),
		)
		.await
	});
	rpc.register_duplex(
		"spawn_cli",
		3,
		|mut streams, p: SpawnParams, c| async move {
			ensure_auth(&c.auth_state)?;
			handle_spawn_cli(
				&c.log,
				p,
				streams.remove(0),
				streams.remove(0),
				streams.remove(0),
			)
			.await
		},
	);
	rpc.register_sync("httpheaders", |p: HttpHeadersParams, c| {
		if let Some(req) = c.http_requests.lock().unwrap().get(&p.req_id) {
			trace!(c.log, "got {} response for req {}", p.status_code, p.req_id);
			req.initial_response(p.status_code, p.headers);
		} else {
			warning!(c.log, "got response for unknown req {}", p.req_id);
		}
		Ok(EmptyObject {})
	});
	rpc.register_sync("httpbody", move |p: HttpBodyParams, c| {
		let mut reqs = c.http_requests.lock().unwrap();
		if let Some(req) = reqs.get(&p.req_id) {
			if !p.segment.is_empty() {
				req.body(p.segment);
			}
			if p.complete {
				trace!(c.log, "delegated request {} completed", p.req_id);
				reqs.remove(&p.req_id);
			}
		}
		Ok(EmptyObject {})
	});
	rpc.register_sync(
		"version",
		|_: EmptyObject, _| Ok(VersionResponse::default()),
	);

	rpc.build(log)
}