in mixer.go [150:284]
// main configures and runs the kernels mixer proxy.
//
// It builds two reverse proxies: one to the remote kernels mixer (addressed
// either by the remoteURL flag or by the mixerProject/mixerRegion/mixerHost
// flags) and one to the Jupyter server on localhost at the jupyterPort flag.
// Requests for kernelspecs, kernels and sessions are served by handlers that
// are given both backends; every other path goes to the local proxy.
//
// Each incoming request passes through optional logging, an optional token
// check, an XSRF check and (for non-websocket requests) a timeout before it is
// routed. The server listens on the IPv6 loopback address at the port flag and
// main only returns by way of log.Fatal.
func main() {
	flag.Parse()

	// Resolve the mixer address: an explicit remote URL wins; otherwise it is
	// derived from the project, region and host flags.
	var mixerURL *url.URL
	var err error
	if *remoteURL == "" {
		if *mixerProject == "" || *mixerRegion == "" {
			log.Fatal("You must specify the project and region of the kernels mixer")
		}
		mixerURL, err = url.Parse(fmt.Sprintf("https://%s-dot-%s.%s", *mixerProject, *mixerRegion, *mixerHost))
	} else {
		mixerURL, err = url.Parse(*remoteURL)
	}
	if err != nil {
		log.Fatalf("Failure parsing the URL for the kernel mixer: %v", err)
	}

	// proxyErrorHandler returns a ReverseProxy error handler shared by both
	// proxies. The logged message is logPrefix, a space, and the error.
	proxyErrorHandler := func(logPrefix string) func(http.ResponseWriter, *http.Request, error) {
		return func(w http.ResponseWriter, r *http.Request, err error) {
			util.Log(r, fmt.Sprintf("%s %v", logPrefix, err))
			if websocket.IsWebSocketUpgrade(r) {
				// Do not report the error via the response status code, as if we do then JupyterLab will
				// not retry the websocket connection and will unnecessarily report the kernel as disconnected.
				return
			}
			// Report the proxy error using the same status as the default error handler.
			w.WriteHeader(http.StatusBadGateway)
		}
	}

	// Remote backend: reverse proxy to the kernels mixer.
	mixerProxy := httputil.NewSingleHostReverseProxy(mixerURL)
	baseDirector := mixerProxy.Director
	mixerProxy.Director = func(r *http.Request) {
		baseDirector(r)
		if errs := util.ModifyProxiedRequestForHost(r, mixerURL.Host); len(errs) > 0 {
			util.Log(r, fmt.Sprintf("Unexpected errors modifying proxied request headers: %v\n", errs))
		}
		clearExternalOriginForWebsocketRequests(r)
	}
	mixerProxy.ErrorHandler = proxyErrorHandler("Error forwarding a request to the kernels mixer:")

	tokenSource := oauth2.ReuseTokenSource(nil, tokenSourceFunc(gcloudToken))
	// Do the initial token fetch at startup. A failure here is not fatal, since
	// every request fetches the token again, but it is logged so that a broken
	// credential setup is visible immediately.
	if _, err := tokenSource.Token(); err != nil {
		log.Printf("Failure fetching the initial token: %v", err)
	}

	// Replace any caller-supplied Authorization header with a bearer token from
	// the token source before forwarding to the mixer.
	wrappedSessionsProxy := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		token, err := tokenSource.Token()
		if err != nil {
			msg := fmt.Sprintf("failure generating the authorization header: %v", err)
			util.Log(r, msg)
			http.Error(w, msg, util.HTTPStatusCode(err))
			return
		}
		r.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token.AccessToken))
		mixerProxy.ServeHTTP(w, r)
	})
	remoteBackend := backends.New(remoteBackendName, remoteResourceNameSuffix, mixerURL.Host, wrappedSessionsProxy)

	// Local backend: reverse proxy to the Jupyter server on localhost.
	localBackendHost := fmt.Sprintf("localhost:%d", *jupyterPort)
	localURL, err := url.Parse("http://" + localBackendHost)
	if err != nil {
		log.Fatalf("Failure parsing the URL for the locally-running Jupyter Lab instance: %v", err)
	}
	localProxy := httputil.NewSingleHostReverseProxy(localURL)
	localProxy.ErrorHandler = proxyErrorHandler(
		fmt.Sprintf("Error forwarding a request to the local Jupyter server. Verify %s is active.", localURL.String()))
	if len(*jupyterToken) > 0 {
		// Forward the configured token to the local server as a "token" query
		// parameter on every proxied request.
		localProxyBaseDirector := localProxy.Director
		localProxy.Director = func(r *http.Request) {
			localProxyBaseDirector(r)
			q := r.URL.Query()
			q.Set("token", *jupyterToken)
			r.URL.RawQuery = q.Encode()
		}
	}
	localBackend := backends.New(localBackendName, localResourceNameSuffix, localBackendHost, localProxy)

	// Route the API paths to handlers that know about both backends; anything
	// else falls through to the local proxy.
	kernelSpecsHandler := kernelspecs.Handler(localBackend, remoteBackend)
	kernelsHandler := kernels.Handler(localBackend, remoteBackend)
	sessionsHandler := sessions.Handler(localBackend, remoteBackend)
	mux := http.NewServeMux()
	mux.Handle("/api/kernelspecs", kernelSpecsHandler)
	mux.Handle("/api/kernelspecs/", kernelSpecsHandler)
	mux.Handle("/kernelspecs/", kernelSpecsHandler)
	mux.Handle("/api/kernels", kernelsHandler)
	mux.Handle("/api/kernels/", kernelsHandler)
	mux.Handle("/api/sessions", sessionsHandler)
	mux.Handle("/api/sessions/", sessionsHandler)
	mux.Handle("/", localProxy)

	// Front door: logging, token check, XSRF check and request timeout, then
	// the routing mux above. This is registered on http.DefaultServeMux, which
	// is what ListenAndServe uses when it is given a nil handler below.
	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		if *logRequestHeaders {
			util.Log(r, fmt.Sprintf("Request headers: %+v", r.Header))
		}
		if *logAllRequestResponses {
			// buff stays nil unless response bodies are being logged.
			var buff *bytes.Buffer
			if *logAllResponseBodies {
				buff = new(bytes.Buffer)
				// Once the handler returns, log the captured body from a
				// separate goroutine rather than inline in the handler.
				defer func() {
					go func(respBuff *bytes.Buffer) {
						util.Log(r, fmt.Sprintf("Response body: %q", respBuff.String()))
					}(buff)
				}()
			}
			w = util.NewLoggingResponseWriter(w, r, buff)
		}
		if len(*jupyterToken) > 0 {
			// When a token is configured, the request must carry it in the
			// "token" header.
			if token := r.Header.Get("token"); token != *jupyterToken {
				util.Log(r, fmt.Sprintf("Token mismatch: %q", token))
				http.Error(w, http.StatusText(http.StatusForbidden), http.StatusForbidden)
				return
			}
		}
		if err := util.CheckXSRF(r); err != nil {
			util.Log(r, fmt.Sprintf("XSRF error: %v", err))
			http.Error(w, err.Error(), util.HTTPStatusCode(err))
			return
		}
		if !websocket.IsWebSocketUpgrade(r) {
			// Websocket upgrades are exempt from the timeout applied to every
			// other request.
			ctx, cancel := context.WithTimeout(r.Context(), *contextRequestTimeout)
			defer cancel()
			r = r.WithContext(ctx)
		}
		mux.ServeHTTP(w, r)
	})

	// Listen on the IPv6 loopback address only.
	localAddress := fmt.Sprintf("[::1]:%d", *port)
	log.Printf("Listening on %q...\n", localAddress)
	log.Fatal(http.ListenAndServe(localAddress, nil))
}