in patched-vscode/cli/src/tunnels/control_server.rs [323:509]
fn make_socket_rpc(
log: log::Logger,
socket_tx: mpsc::Sender<SocketSignal>,
http_delegated: DelegatedSimpleHttp,
launcher_paths: LauncherPaths,
code_server_args: CodeServerArgs,
port_forwarding: Option<PortForwarding>,
requires_auth: AuthRequired,
platform: Platform,
http_requests: HttpRequestsMap,
) -> RpcDispatcher<MsgPackSerializer, HandlerContext> {
let server_bridges = ServerMultiplexer::new();
let mut rpc = RpcBuilder::new(MsgPackSerializer {}).methods(HandlerContext {
did_update: Arc::new(AtomicBool::new(false)),
auth_state: Arc::new(std::sync::Mutex::new(match requires_auth {
AuthRequired::VSDAWithToken(t) => AuthState::WaitingForChallenge(Some(t)),
AuthRequired::VSDA => AuthState::WaitingForChallenge(None),
AuthRequired::None => AuthState::Authenticated,
})),
socket_tx,
log: log.clone(),
launcher_paths,
code_server_args,
code_server: Arc::new(Mutex::new(None)),
server_bridges,
port_forwarding,
platform,
http: Arc::new(FallbackSimpleHttp::new(
ReqwestSimpleHttp::new(),
http_delegated,
)),
http_requests,
});
rpc.register_sync("ping", |_: EmptyObject, _| Ok(EmptyObject {}));
rpc.register_sync("gethostname", |_: EmptyObject, _| handle_get_hostname());
rpc.register_sync("sys_kill", |p: SysKillRequest, c| {
ensure_auth(&c.auth_state)?;
handle_sys_kill(p.pid)
});
rpc.register_sync("fs_stat", |p: FsSinglePathRequest, c| {
ensure_auth(&c.auth_state)?;
handle_stat(p.path)
});
rpc.register_duplex(
"fs_read",
1,
move |mut streams, p: FsSinglePathRequest, c| async move {
ensure_auth(&c.auth_state)?;
handle_fs_read(streams.remove(0), p.path).await
},
);
rpc.register_duplex(
"fs_write",
1,
move |mut streams, p: FsSinglePathRequest, c| async move {
ensure_auth(&c.auth_state)?;
handle_fs_write(streams.remove(0), p.path).await
},
);
rpc.register_duplex(
"fs_connect",
1,
move |mut streams, p: FsSinglePathRequest, c| async move {
ensure_auth(&c.auth_state)?;
handle_fs_connect(streams.remove(0), p.path).await
},
);
rpc.register_duplex(
"net_connect",
1,
move |mut streams, n: NetConnectRequest, c| async move {
ensure_auth(&c.auth_state)?;
handle_net_connect(streams.remove(0), n).await
},
);
rpc.register_async("fs_rm", move |p: FsSinglePathRequest, c| async move {
ensure_auth(&c.auth_state)?;
handle_fs_remove(p.path).await
});
rpc.register_sync("fs_mkdirp", |p: FsSinglePathRequest, c| {
ensure_auth(&c.auth_state)?;
handle_fs_mkdirp(p.path)
});
rpc.register_sync("fs_rename", |p: FsRenameRequest, c| {
ensure_auth(&c.auth_state)?;
handle_fs_rename(p.from_path, p.to_path)
});
rpc.register_sync("fs_readdir", |p: FsSinglePathRequest, c| {
ensure_auth(&c.auth_state)?;
handle_fs_readdir(p.path)
});
rpc.register_sync("get_env", |_: EmptyObject, c| {
ensure_auth(&c.auth_state)?;
handle_get_env()
});
rpc.register_sync(METHOD_CHALLENGE_ISSUE, |p: ChallengeIssueParams, c| {
handle_challenge_issue(p, &c.auth_state)
});
rpc.register_sync(METHOD_CHALLENGE_VERIFY, |p: ChallengeVerifyParams, c| {
handle_challenge_verify(p.response, &c.auth_state)
});
rpc.register_async("serve", move |params: ServeParams, c| async move {
ensure_auth(&c.auth_state)?;
handle_serve(c, params).await
});
rpc.register_async("update", |p: UpdateParams, c| async move {
handle_update(&c.http, &c.log, &c.did_update, &p).await
});
rpc.register_sync("servermsg", |m: ServerMessageParams, c| {
if let Err(e) = handle_server_message(&c.log, &c.server_bridges, m) {
warning!(c.log, "error handling call: {:?}", e);
}
Ok(EmptyObject {})
});
rpc.register_sync("prune", |_: EmptyObject, c| handle_prune(&c.launcher_paths));
rpc.register_async("callserverhttp", |p: CallServerHttpParams, c| async move {
let code_server = c.code_server.lock().await.clone();
handle_call_server_http(code_server, p).await
});
rpc.register_async("forward", |p: ForwardParams, c| async move {
ensure_auth(&c.auth_state)?;
handle_forward(&c.log, &c.port_forwarding, p).await
});
rpc.register_async("unforward", |p: UnforwardParams, c| async move {
ensure_auth(&c.auth_state)?;
handle_unforward(&c.log, &c.port_forwarding, p).await
});
rpc.register_async("acquire_cli", |p: AcquireCliParams, c| async move {
ensure_auth(&c.auth_state)?;
handle_acquire_cli(&c.launcher_paths, &c.http, &c.log, p).await
});
rpc.register_duplex("spawn", 3, |mut streams, p: SpawnParams, c| async move {
ensure_auth(&c.auth_state)?;
handle_spawn(
&c.log,
p,
Some(streams.remove(0)),
Some(streams.remove(0)),
Some(streams.remove(0)),
)
.await
});
rpc.register_duplex(
"spawn_cli",
3,
|mut streams, p: SpawnParams, c| async move {
ensure_auth(&c.auth_state)?;
handle_spawn_cli(
&c.log,
p,
streams.remove(0),
streams.remove(0),
streams.remove(0),
)
.await
},
);
rpc.register_sync("httpheaders", |p: HttpHeadersParams, c| {
if let Some(req) = c.http_requests.lock().unwrap().get(&p.req_id) {
trace!(c.log, "got {} response for req {}", p.status_code, p.req_id);
req.initial_response(p.status_code, p.headers);
} else {
warning!(c.log, "got response for unknown req {}", p.req_id);
}
Ok(EmptyObject {})
});
rpc.register_sync("httpbody", move |p: HttpBodyParams, c| {
let mut reqs = c.http_requests.lock().unwrap();
if let Some(req) = reqs.get(&p.req_id) {
if !p.segment.is_empty() {
req.body(p.segment);
}
if p.complete {
trace!(c.log, "delegated request {} completed", p.req_id);
reqs.remove(&p.req_id);
}
}
Ok(EmptyObject {})
});
rpc.register_sync(
"version",
|_: EmptyObject, _| Ok(VersionResponse::default()),
);
rpc.build(log)
}