in src/google/appengine/ext/ndb/context.py [0:0]
def transaction(self, callback, **ctx_options):
options = _make_ctx_options(ctx_options, TransactionOptions)
propagation = TransactionOptions.propagation(options)
if propagation is None:
propagation = TransactionOptions.NESTED
mode = datastore_rpc.TransactionMode.READ_WRITE
if ctx_options.get('read_only', False):
mode = datastore_rpc.TransactionMode.READ_ONLY
parent = self
if propagation == TransactionOptions.NESTED:
if self.in_transaction():
raise datastore_errors.BadRequestError(
'Nested transactions are not supported.')
elif propagation == TransactionOptions.MANDATORY:
if not self.in_transaction():
raise datastore_errors.BadRequestError(
'Requires an existing transaction.')
result = callback()
if isinstance(result, tasklets.Future):
result = yield result
raise tasklets.Return(result)
elif propagation == TransactionOptions.ALLOWED:
if self.in_transaction():
result = callback()
if isinstance(result, tasklets.Future):
result = yield result
raise tasklets.Return(result)
elif propagation == TransactionOptions.INDEPENDENT:
while parent.in_transaction():
parent = parent._parent_context
if parent is None:
raise datastore_errors.BadRequestError(
'Context without non-transactional ancestor')
else:
raise datastore_errors.BadArgumentError(
'Invalid propagation value (%s).' % (propagation,))
app = TransactionOptions.app(options) or key_module._DefaultAppId()
retries = TransactionOptions.retries(options)
if retries is None:
retries = 3
yield parent.flush()
transaction = None
tconn = None
for _ in range(1 + max(0, retries)):
previous_transaction = (
transaction
if mode == datastore_rpc.TransactionMode.READ_WRITE else None)
transaction = yield (parent._conn.async_begin_transaction(
options, app,
previous_transaction,
mode))
tconn = datastore_rpc.TransactionalConnection(
adapter=parent._conn.adapter,
config=parent._conn.config,
transaction=transaction,
_api_version=parent._conn._api_version)
tctx = parent.__class__(conn=tconn,
auto_batcher_class=parent._auto_batcher_class,
parent_context=parent)
tctx._old_ds_conn = datastore._GetConnection()
ok = False
try:
tctx.set_memcache_policy(parent.get_memcache_policy())
tctx.set_memcache_timeout_policy(parent.get_memcache_timeout_policy())
tasklets.set_context(tctx)
datastore._SetConnection(tconn)
try:
try:
result = callback()
if isinstance(result, tasklets.Future):
result = yield result
finally:
yield tctx.flush()
except GeneratorExit:
raise
except Exception:
t, e, tb = sys.exc_info()
tconn.async_rollback(options)
if issubclass(t, datastore_errors.Rollback):
return
else:
six.reraise(t, e, tb)
else:
ok = yield tconn.async_commit(options)
if ok:
parent._cache.update(tctx._cache)
yield parent._clear_memcache(tctx._cache)
raise tasklets.Return(result)
finally:
datastore._SetConnection(tctx._old_ds_conn)
del tctx._old_ds_conn
if ok:
for on_commit_callback in tctx._on_commit_queue:
on_commit_callback()
tconn.async_rollback(options)
raise datastore_errors.TransactionFailedError(
'The transaction could not be committed. Please try again.')