src/google/appengine/api/apiproxy_stub.py (58 lines of code) (raw):
#!/usr/bin/env python
#
# Copyright 2007 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Base class for implementing API proxy stubs."""
import logging
import random
import threading
from google.appengine.api import apiproxy_rpc
from google.appengine.api import request_info
from google.appengine.runtime import apiproxy_errors
MAX_REQUEST_SIZE = 1 << 20
REQ_SIZE_EXCEEDS_LIMIT_MSG_TEMPLATE = ('The request to API call %s.%s() was too'
' large.')
logging.getLogger('google.appengine.api.stubs').setLevel(logging.INFO)
class APIProxyStub(object):
"""Base class for implementing API proxy stub classes.
To implement an API proxy stub:
- Extend this class.
- Override `__init__` to pass in appropriate default service name.
- Implement service methods as `_Dynamic_<method>(request, response)`.
"""
_ACCEPTS_REQUEST_ID = False
THREADSAFE = False
def __init__(self, service_name, max_request_size=MAX_REQUEST_SIZE,
request_data=None):
"""Constructor.
Args:
service_name: Service name expected for all calls.
max_request_size: `int`. Maximum allowable size of the incoming request.
An `apiproxy_errors.RequestTooLargeError` will be raised if the inbound
request exceeds this size. Default is 1 MB. Subclasses can override it.
request_data: A `request_info.RequestInfo` instance used to look up
state associated with the request that generated an API call.
"""
self.__service_name = service_name
self.__max_request_size = max_request_size
self.request_data = request_data or request_info._local_request_info
self._mutex = threading.RLock()
self.__error = None
self.__error_dict = {}
def CreateRPC(self):
"""Creates RPC object instance.
Returns:
An instance of RPC.
"""
return apiproxy_rpc.RPC(stub=self)
def CheckRequest(self, service, call, request):
"""Check if a request meet some common restrictions.
Args:
service: Must be name as provided to `service_name` of constructor.
call: A string representing the rpc to make.
request: A protocol buffer of the type corresponding to `call`.
"""
assert service == self.__service_name, ('Expected "%s" service name, '
'was "%s"' % (self.__service_name,
service))
if request.ByteSize() > self.__max_request_size:
raise apiproxy_errors.RequestTooLargeError(
REQ_SIZE_EXCEEDS_LIMIT_MSG_TEMPLATE % (service, call))
messages = []
assert request.IsInitialized(messages), messages
def MakeSyncCall(self, service, call, request, response, request_id=None):
"""The main RPC entry point.
Args:
service: Must be name as provided to `service_name` of constructor.
call: A string representing the rpc to make. Must be part of
the underlying services methods and impemented by `_Dynamic_<call>`.
request: A protocol buffer of the type corresponding to `call`.
response: A protocol buffer of the type corresponding to `call`.
request_id: A unique string identifying the request associated with the
API call.
"""
self.CheckRequest(service, call, request)
exception_type, frequency = self.__error_dict.get(call, (None, None))
if exception_type and frequency:
if random.random() <= frequency:
raise exception_type
if self.__error:
if random.random() <= self.__error_rate:
raise self.__error
method = getattr(self, '_Dynamic_' + call)
if self._ACCEPTS_REQUEST_ID:
method(request, response, request_id)
else:
method(request, response)
def SetError(self, error, method=None, error_rate=1):
"""Set an error condition that may be raised when calls made to stub.
If a method is specified, the error will only apply to that call.
The error rate is applied to the method specified or all calls if
method is not set.
Args:
error: An instance of `apiproxy_errors.Error` or `None` for no error.
method: A string representing the method that the error will affect.
error_rate: a number from `[0, 1]` that sets the chance of the error,
defaults to `1`.
"""
assert error is None or isinstance(error, apiproxy_errors.Error)
if method and error:
self.__error_dict[method] = error, error_rate
else:
self.__error_rate = error_rate
self.__error = error
def Synchronized(method):
"""Decorator to acquire a mutex around an `APIProxyStub` method.
Args:
method: An unbound method of `APIProxyStub` or a subclass.
Returns:
The `method`, altered such it acquires `self._mutex` throughout its
execution.
"""
def WrappedMethod(self, *args, **kwargs):
with self._mutex:
return method(self, *args, **kwargs)
return WrappedMethod