in bindings/go/src/_stacktester/stacktester.go [439:900]
func (sm *StackMachine) processInst(idx int, inst tuple.Tuple) {
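// A panic carrying an fdb.Error is converted into a packed ("ERROR", code) tuple stored on the stack; any other panic is re-raised.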
defer func() {
if r := recover(); r != nil {
switch r := r.(type) {
case fdb.Error:
sm.store(idx, []byte(tuple.Tuple{[]byte("ERROR"), []byte(fmt.Sprintf("%d", r.Code))}.Pack()))
default:
panic(r)
}
}
}()
var e error
op := inst[0].(string)
if sm.verbose {
fmt.Printf("%d. Instruction is %s (%v)\n", idx, op, sm.prefix)
fmt.Printf("Stack from [")
sm.dumpStack()
fmt.Printf(" ] (%d)\n", len(sm.stack))
}
var t fdb.Transactor
var rt fdb.ReadTransactor
var isDB bool
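// Ops suffixed with _SNAPSHOT run against a snapshot view of the current transaction, and ops suffixed with _DATABASE run directly against the database; the suffix is stripped before dispatch.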
switch {
case strings.HasSuffix(op, "_SNAPSHOT"):
rt = sm.currentTransaction().Snapshot()
op = op[:len(op)-9]
case strings.HasSuffix(op, "_DATABASE"):
t = db
rt = db
op = op[:len(op)-9]
isDB = true
default:
t = sm.currentTransaction()
rt = sm.currentTransaction()
}
switch {
case op == "PUSH":
sm.store(idx, inst[1])
case op == "DUP":
entry := sm.stack[len(sm.stack)-1]
sm.store(entry.idx, entry.item)
case op == "EMPTY_STACK":
sm.stack = []stackEntry{}
case op == "SWAP":
depth := sm.waitAndPop().item.(int64)
sm.stack[len(sm.stack)-1], sm.stack[len(sm.stack)-1-int(depth)] = sm.stack[len(sm.stack)-1-int(depth)], sm.stack[len(sm.stack)-1]
case op == "POP":
sm.stack = sm.stack[:len(sm.stack)-1]
case op == "SUB":
var x, y *big.Int
switch x1 := sm.waitAndPop().item.(type) {
case *big.Int:
x = x1
case int64:
x = big.NewInt(x1)
case uint64:
x = new(big.Int)
x.SetUint64(x1)
}
switch y1 := sm.waitAndPop().item.(type) {
case *big.Int:
y = y1
case int64:
y = big.NewInt(y1)
case uint64:
y = new(big.Int)
y.SetUint64(y1)
}
sm.store(idx, x.Sub(x, y))
case op == "CONCAT":
str1 := sm.waitAndPop().item
str2 := sm.waitAndPop().item
switch str1 := str1.(type) {
case string:
sm.store(idx, str1+str2.(string))
case []byte:
sm.store(idx, append(str1, str2.([]byte)...))
default:
panic("Invalid CONCAT parameter")
}
case op == "NEW_TRANSACTION":
sm.newTransaction()
case op == "USE_TRANSACTION":
sm.switchTransaction(sm.waitAndPop().item.([]byte))
case op == "ON_ERROR":
sm.store(idx, sm.currentTransaction().OnError(fdb.Error{int(sm.waitAndPop().item.(int64))}))
case op == "GET_READ_VERSION":
_, e = rt.ReadTransact(func(rtr fdb.ReadTransaction) (interface{}, error) {
sm.lastVersion = rtr.GetReadVersion().MustGet()
sm.store(idx, []byte("GOT_READ_VERSION"))
return nil, nil
})
if e != nil {
panic(e)
}
case op == "SET":
key := fdb.Key(sm.waitAndPop().item.([]byte))
value := sm.waitAndPop().item.([]byte)
sm.executeMutation(t, func(tr fdb.Transaction) (interface{}, error) {
tr.Set(key, value)
return nil, nil
}, isDB, idx)
case op == "LOG_STACK":
prefix := sm.waitAndPop().item.([]byte)
entries := make(map[int]stackEntry)
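// Drain the whole stack, writing it out under prefix in batches of at most 100 entries.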
for len(sm.stack) > 0 {
entries[len(sm.stack)-1] = sm.waitAndPop()
if len(entries) == 100 {
sm.logStack(entries, prefix)
entries = make(map[int]stackEntry)
}
}
sm.logStack(entries, prefix)
case op == "GET":
key := fdb.Key(sm.waitAndPop().item.([]byte))
res, e := rt.ReadTransact(func(rtr fdb.ReadTransaction) (interface{}, error) {
return rtr.Get(key), nil
})
if e != nil {
panic(e)
}
sm.store(idx, res.(fdb.FutureByteSlice))
case op == "GET_ESTIMATED_RANGE_SIZE":
r := sm.popKeyRange()
_, e := rt.ReadTransact(func(rtr fdb.ReadTransaction) (interface{}, error) {
_ = rtr.GetEstimatedRangeSizeBytes(r).MustGet()
sm.store(idx, []byte("GOT_ESTIMATED_RANGE_SIZE"))
return nil, nil
})
if e != nil {
panic(e)
}
case op == "GET_RANGE_SPLIT_POINTS":
r := sm.popKeyRange()
chunkSize := sm.waitAndPop().item.(int64)
_, e := rt.ReadTransact(func(rtr fdb.ReadTransaction) (interface{}, error) {
_ = rtr.GetRangeSplitPoints(r, chunkSize).MustGet()
sm.store(idx, []byte("GOT_RANGE_SPLIT_POINTS"))
return nil, nil
})
if e != nil {
panic(e)
}
case op == "COMMIT":
sm.store(idx, sm.currentTransaction().Commit())
case op == "RESET":
sm.currentTransaction().Reset()
case op == "CLEAR":
key := fdb.Key(sm.waitAndPop().item.([]byte))
sm.executeMutation(t, func(tr fdb.Transaction) (interface{}, error) {
tr.Clear(key)
return nil, nil
}, isDB, idx)
case op == "SET_READ_VERSION":
sm.currentTransaction().SetReadVersion(sm.lastVersion)
case op == "WAIT_FUTURE":
entry := sm.waitAndPop()
sm.store(entry.idx, entry.item)
case op == "GET_COMMITTED_VERSION":
sm.lastVersion, e = sm.currentTransaction().GetCommittedVersion()
if e != nil {
panic(e)
}
sm.store(idx, []byte("GOT_COMMITTED_VERSION"))
case op == "GET_APPROXIMATE_SIZE":
_ = sm.currentTransaction().GetApproximateSize().MustGet()
sm.store(idx, []byte("GOT_APPROXIMATE_SIZE"))
case op == "GET_VERSIONSTAMP":
sm.store(idx, sm.currentTransaction().GetVersionstamp())
case op == "GET_KEY":
sel := sm.popSelector()
prefix := sm.waitAndPop().item.([]byte)
res, e := rt.ReadTransact(func(rtr fdb.ReadTransaction) (interface{}, error) {
return rtr.GetKey(sel).MustGet(), nil
})
if e != nil {
panic(e)
}
key := res.(fdb.Key)
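// Clamp the resolved key to the range covered by prefix: keys before the prefix collapse to the prefix itself, keys at or beyond its end collapse to strinc(prefix).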
if bytes.HasPrefix(key, prefix) {
sm.store(idx, key)
} else if bytes.Compare(key, prefix) < 0 {
sm.store(idx, prefix)
} else {
s, e := fdb.Strinc(prefix)
if e != nil {
panic(e)
}
sm.store(idx, s)
}
case strings.HasPrefix(op, "GET_RANGE"):
var r fdb.Range
switch op[9:] {
case "_STARTS_WITH":
r = sm.popPrefixRange()
case "_SELECTOR":
r = fdb.SelectorRange{sm.popSelector(), sm.popSelector()}
case "":
r = sm.popKeyRange()
}
ro := sm.popRangeOptions()
var prefix []byte
if op[9:] == "_SELECTOR" {
prefix = sm.waitAndPop().item.([]byte)
}
res, e := rt.ReadTransact(func(rtr fdb.ReadTransaction) (interface{}, error) {
return rtr.GetRange(r, ro).GetSliceOrPanic(), nil
})
if e != nil {
panic(e)
}
sm.pushRange(idx, res.([]fdb.KeyValue), prefix)
case strings.HasPrefix(op, "CLEAR_RANGE"):
var er fdb.ExactRange
switch op[11:] {
case "_STARTS_WITH":
er = sm.popPrefixRange()
case "":
er = sm.popKeyRange()
}
sm.executeMutation(t, func(tr fdb.Transaction) (interface{}, error) {
tr.ClearRange(er)
return nil, nil
}, isDB, idx)
case op == "TUPLE_PACK":
var tup tuple.Tuple
count := sm.waitAndPop().item.(int64)
for i := 0; i < int(count); i++ {
tup = append(tup, sm.waitAndPop().item)
}
sm.store(idx, tup.Pack())
case op == "TUPLE_PACK_WITH_VERSIONSTAMP":
var tup tuple.Tuple
prefix := sm.waitAndPop().item.([]byte)
c := sm.waitAndPop().item.(int64)
for i := 0; i < int(c); i++ {
tup = append(tup, sm.waitAndPop().item)
}
packed, err := tup.PackWithVersionstamp(prefix)
if err != nil && strings.Contains(err.Error(), "No incomplete") {
sm.store(idx, []byte("ERROR: NONE"))
} else if err != nil {
sm.store(idx, []byte("ERROR: MULTIPLE"))
} else {
sm.store(idx, []byte("OK"))
sm.store(idx, packed)
}
case op == "TUPLE_UNPACK":
t, e := tuple.Unpack(fdb.Key(sm.waitAndPop().item.([]byte)))
if e != nil {
panic(e)
}
for _, el := range t {
sm.store(idx, []byte(tuple.Tuple{el}.Pack()))
}
case op == "TUPLE_SORT":
count := sm.waitAndPop().item.(int64)
tuples := make([]tuple.Tuple, count)
for i := 0; i < int(count); i++ {
tuples[i], e = tuple.Unpack(fdb.Key(sm.waitAndPop().item.([]byte)))
if e != nil {
panic(e)
}
}
sort.Sort(byBytes(tuples))
for _, tup := range tuples {
sm.store(idx, tup.Pack())
}
case op == "ENCODE_FLOAT":
valBytes := sm.waitAndPop().item.([]byte)
var val float32
binary.Read(bytes.NewBuffer(valBytes), binary.BigEndian, &val)
sm.store(idx, val)
case op == "ENCODE_DOUBLE":
valBytes := sm.waitAndPop().item.([]byte)
var val float64
binary.Read(bytes.NewBuffer(valBytes), binary.BigEndian, &val)
sm.store(idx, val)
case op == "DECODE_FLOAT":
val := sm.waitAndPop().item.(float32)
var ibuf bytes.Buffer
binary.Write(&ibuf, binary.BigEndian, val)
sm.store(idx, ibuf.Bytes())
case op == "DECODE_DOUBLE":
val := sm.waitAndPop().item.(float64)
var ibuf bytes.Buffer
binary.Write(&ibuf, binary.BigEndian, val)
sm.store(idx, ibuf.Bytes())
case op == "TUPLE_RANGE":
var tup tuple.Tuple
count := sm.waitAndPop().item.(int64)
for i := 0; i < int(count); i++ {
tup = append(tup, sm.waitAndPop().item)
}
bk, ek := tup.FDBRangeKeys()
sm.store(idx, []byte(bk.FDBKey()))
sm.store(idx, []byte(ek.FDBKey()))
case op == "START_THREAD":
newsm := newStackMachine(sm.waitAndPop().item.([]byte), verbose)
sm.threads.Add(1)
go func() {
newsm.Run()
sm.threads.Done()
}()
case op == "WAIT_EMPTY":
prefix := sm.waitAndPop().item.([]byte)
er, e := fdb.PrefixRange(prefix)
if e != nil {
panic(e)
}
db.Transact(func(tr fdb.Transaction) (interface{}, error) {
v := tr.GetRange(er, fdb.RangeOptions{}).GetSliceOrPanic()
if len(v) != 0 {
panic(fdb.Error{1020})
}
return nil, nil
})
sm.store(idx, []byte("WAITED_FOR_EMPTY"))
case op == "READ_CONFLICT_RANGE":
e = sm.currentTransaction().AddReadConflictRange(fdb.KeyRange{fdb.Key(sm.waitAndPop().item.([]byte)), fdb.Key(sm.waitAndPop().item.([]byte))})
if e != nil {
panic(e)
}
sm.store(idx, []byte("SET_CONFLICT_RANGE"))
case op == "WRITE_CONFLICT_RANGE":
e = sm.currentTransaction().AddWriteConflictRange(fdb.KeyRange{fdb.Key(sm.waitAndPop().item.([]byte)), fdb.Key(sm.waitAndPop().item.([]byte))})
if e != nil {
panic(e)
}
sm.store(idx, []byte("SET_CONFLICT_RANGE"))
case op == "READ_CONFLICT_KEY":
e = sm.currentTransaction().AddReadConflictKey(fdb.Key(sm.waitAndPop().item.([]byte)))
if e != nil {
panic(e)
}
sm.store(idx, []byte("SET_CONFLICT_KEY"))
case op == "WRITE_CONFLICT_KEY":
e = sm.currentTransaction().AddWriteConflictKey(fdb.Key(sm.waitAndPop().item.([]byte)))
if e != nil {
panic(e)
}
sm.store(idx, []byte("SET_CONFLICT_KEY"))
case op == "ATOMIC_OP":
opname := strings.Replace(strings.Title(strings.Replace(strings.ToLower(sm.waitAndPop().item.(string)), "_", " ", -1)), " ", "", -1)
key := fdb.Key(sm.waitAndPop().item.([]byte))
ival := sm.waitAndPop().item
value := ival.([]byte)
sm.executeMutation(t, func(tr fdb.Transaction) (interface{}, error) {
reflect.ValueOf(tr).MethodByName(opname).Call([]reflect.Value{reflect.ValueOf(key), reflect.ValueOf(value)})
return nil, nil
}, isDB, idx)
case op == "DISABLE_WRITE_CONFLICT":
sm.currentTransaction().Options().SetNextWriteNoWriteConflictRange()
case op == "CANCEL":
sm.currentTransaction().Cancel()
case op == "UNIT_TESTS":
db.Options().SetLocationCacheSize(100001)
db.Options().SetMaxWatches(10001)
db.Options().SetDatacenterId("dc_id")
db.Options().SetMachineId("machine_id")
db.Options().SetSnapshotRywEnable()
db.Options().SetSnapshotRywDisable()
db.Options().SetTransactionLoggingMaxFieldLength(1000)
db.Options().SetTransactionTimeout(100000)
db.Options().SetTransactionTimeout(0)
db.Options().SetTransactionMaxRetryDelay(100)
db.Options().SetTransactionRetryLimit(10)
db.Options().SetTransactionRetryLimit(-1)
db.Options().SetTransactionCausalReadRisky()
db.Options().SetTransactionIncludePortInAddress()
if !fdb.IsAPIVersionSelected() {
log.Fatal("API version should be selected")
}
apiVersion := fdb.MustGetAPIVersion()
if apiVersion == 0 {
log.Fatal("API version is 0")
}
e1 := fdb.APIVersion(apiVersion + 1)
if e1 != nil {
fdbE := e1.(fdb.Error)
if fdbE.Code != 2201 {
panic(e1)
}
} else {
log.Fatal("Was not stopped from selecting two API versions")
}
e2 := fdb.APIVersion(apiVersion - 1)
if e2 != nil {
fdbE := e2.(fdb.Error)
if fdbE.Code != 2201 {
panic(e2)
}
} else {
log.Fatal("Was not stopped from selecting two API versions")
}
fdb.MustAPIVersion(apiVersion)
_, e := db.Transact(func(tr fdb.Transaction) (interface{}, error) {
tr.Options().SetPrioritySystemImmediate()
tr.Options().SetPriorityBatch()
tr.Options().SetCausalReadRisky()
tr.Options().SetCausalWriteRisky()
tr.Options().SetReadYourWritesDisable()
tr.Options().SetReadSystemKeys()
tr.Options().SetAccessSystemKeys()
tr.Options().SetTransactionLoggingMaxFieldLength(1000)
tr.Options().SetTimeout(60 * 1000)
tr.Options().SetRetryLimit(50)
tr.Options().SetMaxRetryDelay(100)
tr.Options().SetUsedDuringCommitProtectionDisable()
tr.Options().SetDebugTransactionIdentifier("my_transaction")
tr.Options().SetLogTransaction()
tr.Options().SetReadLockAware()
tr.Options().SetLockAware()
tr.Options().SetIncludePortInAddress()
return tr.Get(fdb.Key("\xff")).MustGet(), nil
})
if e != nil {
panic(e)
}
sm.testWatches()
sm.testLocality()
case strings.HasPrefix(op, "DIRECTORY_"):
sm.de.processOp(sm, op[10:], isDB, idx, t, rt)
default:
log.Fatalf("Unhandled operation %s\n", string(inst[0].([]byte)))
}
if sm.verbose {
fmt.Printf(" to [")
sm.dumpStack()
fmt.Printf(" ] (%d)\n\n", len(sm.stack))
}
runtime.Gosched()
}