in alpha/alpha-spec-saga-akka/src/main/java/org/apache/servicecomb/pack/alpha/spec/saga/akka/SagaActor.java [399:464]
private void beforeStop(BaseEvent event, SagaActorState state, SagaData data){
if (LOG.isDebugEnabled()) {
LOG.debug("stop [{}] {}", data.getGlobalTxId(), state);
}
try{
sagaEndTime = System.currentTimeMillis();
data.setLastState(state);
data.setEndTime(new Date());
data.setTerminated(true);
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(getContext().getSystem())
.stopSagaData(data.getGlobalTxId(), data);
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(context().system()).doSagaEndCounter();
SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(context().system())
.doSagaAvgTime(sagaEndTime - sagaBeginTime);
// destroy self from cluster shard region
getContext().getParent()
.tell(new ShardRegion.Passivate(PoisonPill.getInstance()), getSelf());
// clear self mailbox from persistence
// 已经停止的Actor使用以下两个命令清理,但是 highestSequenceNr 不会被删除,需要手工清理
// 以下基于 journal-redis 说明:
// 假设 globalTxId=ed2cdb9c-e86c-4b01-9f43-8e34704e7694, 那么在 Redis 中会生成三个 key
// journal:persistenceIds
// journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694
// journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694:highestSequenceNr
//
// 1. journal:persistenceIds 是 set 类型, 记录了所有的 globalTxId, 使用 smembers journal:persistenceIds 可以看到
// 2. journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694 是 zset 类型, 记录了这个事务的所有事件
// 使用 zrange journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694 1 -1 可以看到
// 3. journal:persisted:ed2cdb9c-e86c-4b01-9f43-8e34704e7694:highestSequenceNr 是 string 类型, 里面记录这序列号
//
// 何如清理:
// 通过 deleteMessages 和 deleteSnapshot 可以清理部分数据,但是 highestSequenceNr 还是无法自动删除,需要定期手动清理
// 遍历 journal:persistenceIds 集合,用每一条数据item拼接成key journal:persisted:item 和 journal:persisted:item:highestSequenceNr
// 如果没有成对出现就说明是已经终止的actor 那么可以将 journal:persisted:item 从 journal:persistenceIds 删除
// 并删除 journal:persisted:item:highestSequenceNr
//
// 目前可以看到的解释是 https://github.com/akka/akka/issues/21181
//
// Lua script akka-persistence-redis-clean.lua
// local ids = redis.call('smembers','journal:persistenceIds');
// local delkeys = {};
// for k, v in pairs(ids) do
// local jpid = 'journal:persisted:' .. v;
// local jpidnr = 'journal:persisted:' .. v .. ':highestSequenceNr';
// local hasjpid = redis.call('exists',jpid);
// if(hasjpid == 0)
// then
// local hasjpidnr = redis.call('exists',jpidnr);
// if(hasjpidnr == 1)
// then
// redis.call('del',jpidnr);
// table.insert(delkeys,jpid);
// end
// end
// end
// return delkeys;
deleteMessages(lastSequenceNr());
deleteSnapshot(snapshotSequenceNr());
}catch(Exception e){
LOG.error("stop [{}] fail",data.getGlobalTxId());
throw e;
}
}