custos-client-sdks/custos_jupyterhub_authenticator/build/lib/custosauthenticator/custos.py [72:194]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    @default("authorize_url")
    def _authorize_url_default(self):
        return "{}auth".format(self.iam_uri)

    @default("token_url")
    def _token_url_default(self):
        return "https://{}/apiserver/identity-management/v1.0.0/token".format(self.custos_host)

    # Configurable list of OAuth scopes to request. `_validate_scope` makes
    # sure 'openid' is always part of the accepted value.
    scope = List(
        Unicode(),
        help="""The OAuth scopes to request.
        See cilogon_scope.md for details.
        At least 'openid' is required.
        """,
        config=True,
        default_value=['openid', 'email', 'org.cilogon.userinfo'],
    )

    # Configurable list that each group's 'id' is checked against when
    # deciding whether a user may start a server.
    allowed_groups = List(config=True)

    @validate('scope')
    def _validate_scope(self, proposal):
        """ensure openid is requested"""

        if 'openid' not in proposal.value:
            return ['openid'] + proposal.value
        return proposal.value

    async def authenticate(self, handler, data=None):
        """Exchange the OAuth ``code`` for a token and look up the Custos user.

        The ``code`` request argument is posted to ``token_url``; the
        resulting access token is then used to query the Custos ``user``
        endpoint on ``custos_host``.

        Returns:
            A dict with the user's ``name`` (the ``username`` field of the
            user response) and an ``auth_state`` dict holding
            ``token_response``, ``access_token`` and ``custos_user``.
        """
        auth_code = handler.get_argument("code")

        # Both requests carry base64("client_id:client_secret") as the
        # bearer credential.
        credentials = "{}:{}".format(self.client_id, self.client_secret)
        encoded_credentials = base64.b64encode(credentials.encode('utf-8')).decode('utf-8')
        request_headers = {
            "Accept": "application/json",
            "User-Agent": "JupyterHub",
            "Authorization": "Bearer {}".format(encoded_credentials),
        }

        # Step 1: trade the authorization code for a token. The parameters
        # travel in the query string; the POST body is empty.
        token_query = {
            'client_id': self.client_id,
            'client_secret': self.client_secret,
            'redirect_uri': self.oauth_callback_url,
            'code': auth_code,
            'grant_type': 'authorization_code',
        }
        token_request = HTTPRequest(
            url_concat(self.token_url, token_query),
            headers=request_headers,
            method="POST",
            body='',
        )
        token_response = await self.fetch(token_request)
        access_token = token_response['access_token']

        # Step 2: ask Custos which user the access token belongs to.
        user_endpoint = "https://{}/apiserver/identity-management/v1.0.0/user".format(self.custos_host)
        user_request = HTTPRequest(
            url_concat(user_endpoint, {'access_token': access_token}),
            headers=request_headers,
        )
        custos_user = await self.fetch(user_request)

        # auth_state keeps the raw token response and the full user model so
        # they can be used for provisioning in the Lab/Notebook environment;
        # access_token is duplicated at the top level for existing consumers.
        return {
            "name": custos_user['username'],
            "auth_state": {
                'token_response': token_response,
                'access_token': access_token,
                'custos_user': custos_user,
            },
        }

    async def pre_spawn_start(self, user, spawner):
        """Authorize the user and expose the access token to the spawner.

        When the user has auth state, the user must pass
        ``is_user_authorized_to_spawn_server``; the stored access token is
        then placed in the spawner environment as ``UPSTREAM_TOKEN``.
        Without auth state nothing is checked or set.

        Raises:
            HTTPError: 401 when the user is not authorized to start a server.
        """
        app_log.debug("Calling pre_spawn_start")
        stored_state = await user.get_auth_state()

        if stored_state:
            is_allowed = await self.is_user_authorized_to_spawn_server(user.name)
            if not is_allowed:
                raise HTTPError(401, "User {} is not authorized to start a server".format(user.name))
            spawner.environment['UPSTREAM_TOKEN'] = stored_state['access_token']
        else:
            # auth_state not enabled
            app_log.debug("Auth state not enabled")

    async def is_user_authorized_to_spawn_server(self, username):
        """Check whether ``username`` belongs to one of ``allowed_groups``.

        Sends a GET request to ``group_uri`` with the query parameter
        ``profile.username=<username>`` and inspects the ``groups`` entry of
        the response.

        Args:
            username: Name of the user whose group memberships are looked up.

        Returns:
            True if the ``id`` of any returned group is in
            ``allowed_groups``; False otherwise, including when the response
            has no (or an empty/null) ``groups`` entry.
        """
        # The request carries base64("client_id:client_secret") as the
        # bearer credential.
        credentials = "{}:{}".format(self.client_id, self.client_secret)
        auth_string = base64.b64encode(credentials.encode('utf-8')).decode('utf-8')
        headers = {"Accept": "application/json", "User-Agent": "JupyterHub",
                   "Authorization": "Bearer {}".format(auth_string)}

        # Look up the groups the given user is a member of.
        url = url_concat(self.group_uri, {'profile.username': username})
        req = HTTPRequest(url, headers=headers, method="GET")
        group_response = await self.fetch(req)

        # Treat a missing or null 'groups' entry as "no memberships" so the
        # caller gets a clean denial instead of an unhandled KeyError.
        user_groups = group_response.get('groups') or []

        allowed = self.allowed_groups
        return any(group['id'] in allowed for group in user_groups)
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



