实现 Merkle Patricia Tree
date
Jan 14, 2023
slug
implement-mpt
status
Published
tags
Blockchain
Ethereum
summary
Merkle Patricia Tree (MPT) 是以太坊的一种存储结构。它不仅可以支持增删查改的操作,而且它还能对查询的结果提供可验证的功能。
type
Post
一、前置工作
1.1 哈希函数
以太坊中使用的哈希函数是 Keccak256,是一种 SHA3 的变种。将数据传入后,这个函数会返回该数据的哈希值(256位,64个十六进制数)。
func Keccak256(data ...[]byte) []byte {
d := sha3.NewLegacyKeccak256()
for _, b := range data {
d.Write(b)
}
return d.Sum(nil)
}1.2 Nibble(半字节)
关于 nibble 的信息请浏览以太坊中的半字节(nibble),这里只会具体讲述它在 MPT 里的作用。
虽然 Go 实现 nibble 只能使用 byte 类型去替代,不能做到真正的半字节。但是这只是在内存中,后续存储到磁盘前,会将相邻的两个半字节拼接成一个字节,再存储到磁盘中。
type Nibble byte1.2.1 nibble 是 MPT 查询时最小的移动单位
以太坊中的以账户为例,对于一个存储的键值对
(key, value) ,key 存放的是 Keccak256(accountAddress),即用户地址(区块链中唯一)的哈希值。value 存放的是 rlp.EncodeToBytes(accountData)),即对应用户数据的 RLP 编码。而在 MPT 的寻找数据存放的过程中,最小的移动单位是 nibble,不是 byte。假设现在有一个字节切片:
[1, 2, 3, 4, 5],在 MPT 的眼中则是:[0, 1, 0, 2, 0, 3, 0, 4, 0, 5]。1.2.2 nibble 可作为前缀标记 MPT 节点类型
在 MPT 有三种类型的节点:extension, leaf 和 branch。它们在序列化和反序列化时,需要对它们进行区分。但只有 extension 和 leaf 类型的节点需要被区分,branch 节点的序列化方式已经和它们区别开了。
extension 和 leaf 类型节点在序列化只需要两个值:Path、Next(extension)或 Path、Value(leaf),使用 RLP 时序列化完全一致,必须添加前缀进行区分。这两个类型的节点的 Path 最小移动单位是 nibble,会出现组成 Path 的 nibble 为奇数个的情况,所以 Path 的 nibble 最后一定要拼接成 byte。
hex char bits | node type partial path length
----------------------------------------------------------
0 0000 | extension even
1 0001 | extension odd
2 0010 | terminating (leaf) even
3 0011 | terminating (leaf) odd因为这两个原因,给 extension 和 leaf 类型的节点添加前缀,不仅可以对节点类型进行区分,也能将 Path 的 nibble 补齐成偶数个。下面的例子中,每个字符都可认为是一个 nibble:
key: 12 34 56 | extension: 00 12 34 56
key: 12 34 5 | extension: 11 23 45
---------------------------------------------------
key: 12 34 56 | leaf: 20 12 34 56
key: 12 34 5 | leaf: 31 23 45偶数个 nibble 的 Path 如果添加单个前缀就会变成奇数个,所以偶数个 nibble 的 Path 添加前缀时一次添加两个 nibble。第一个为前缀标志位,第二个永远为 0。奇数个 nibble 的 Path 仅用添加一个前缀标志位即可变成偶数个 nibble。
1.2.3 比较重要的函数
ToPrefixed:根据传入的 nibble 切片添加前缀
func ToPrefixed(ns []Nibble, isLeafNode bool) []Nibble {
var prefixBytes []Nibble
if len(ns)%2 > 0 {
prefixBytes = []Nibble{1}
} else {
prefixBytes = []Nibble{0, 0}
}
prefixed := make([]Nibble, 0, len(prefixBytes)+len(ns))
prefixed = append(prefixed, prefixBytes...)
prefixed = append(prefixed, ns...)
if isLeafNode {
prefixed[0] += 2
}
return prefixed
}- ToBytes:将传入的 nibble 切片拼接成 byte 切片
func ToBytes(ns []Nibble) []byte {
buf := make([]byte, 0, len(ns)/2)
for i := 0; i < len(ns); i += 2 {
b := byte(ns[i]<<4) + byte(ns[i+1])
buf = append(buf, b)
}
return buf
}二、MPT 节点

2.1 节点接口(Node Interface)
如果一个变量是节点类型,它一定要实现
Hash 函数和 Raw 函数。前者会返回该节点的哈希值,后者会返回该节点数据的 RLP 序列化值。type Node interface {
Hash() []byte
Raw() []interface{}
}2.2 空节点(Empty Node)
一个节点什么也没有就是空节点。比如在 Go 中如果一个
Node 类型的值为 nil,那它就是空节点。空节点调用 Raw 返回的是空字节切片,因为什么也没有。空节点调用 Hash 返回的是一个固定的哈希值 "56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421"。这个固定的哈希值是由 0x80(空字节数组的 RLP 编码值) 经过 Keccak256 计算得到 keccak-256在线计算 - StrErr.com
- Raw:返回空字节切片
- Hash:返回固定哈希值
2.3 拓展节点(Extension Node)
extension 节点用于合并多个路径上共同的 nibble。它只会储存两个值:shared nibble 和 next node。
序列化时 shared nibble 会添加前缀,而 next node 会根据它序列化值的长度,选择是否使用 next node 的哈希值。(如果 next node 长度大于等于 32,则使用哈希值,如果 next node 序列化值的长度小于 32,则可以直接将序列化值存放在 extension 节点中)
func (e ExtensionNode) Raw() []interface{} {
hashes := make([]interface{}, 2)
hashes[0] = ToBytes(ToPrefixed(e.Path, false))
if len(Serialize(e.Next)) >= 32 {
hashes[1] = e.Next.Hash()
} else {
hashes[1] = e.Next.Raw()
}
return hashes
}2.4 叶子节点(Leaf Node)
leaf 节点用于存放某个路径的终点。它只会储存两个值:path 和 value。
序列化时只需要给 path 添加前缀,而 value 原封不动。
func (l LeafNode) Raw() []interface{} {
path := ToBytes(ToPrefixed(l.Path, true))
raw := []interface{}{path, l.Value}
return raw
}2.5 分支节点(Branch Node)
branch 节点既有 extension 节点的 next node 指针,也能像 leaf 节点一样存放一个值。它总共存放 17 个值,即 16 个 next node 和 1 个 value。
序列化时,16 个 next node 像 extension 一样处理,next node 序列化值长度大于等于 32,则使用哈希值,反之使用序列化值。1 个 value 像 leaf 一样处理, value 原封不动。
func (b BranchNode) Raw() []interface{} {
hashes := make([]interface{}, 17)
for i := 0; i < 16; i++ {
if b.Branches[i] == nil {
hashes[i] = EmptyNodeRaw
} else {
node := b.Branches[i]
if len(Serialize(node)) >= 32 {
hashes[i] = node.Hash()
} else {
hashes[i] = node.Raw()
}
}
}
hashes[16] = b.Value
return hashes
}三、MPT 操作
3.1 Get
func (t *Trie) Get(key []byte) ([]byte, bool) {
// node 和 nibbles 在 for 循环过程中会不断变化
// node: 当前整停留的节点
// nibbles: 当前剩余匹配的 nibbles
node := t.root
nibbles := FromBytes(key)
for {
// 当停留在一个 EmptyNode,查询失败
if IsEmptyNode(node) {
return nil, false
}
// 当停留在一个 LeafNode,看看其 Path 和剩余 nibble 是否完全匹配
// 如果不匹配,查询失败
// 如果完全匹配,返回 LeafNode 的 value
if leaf, ok := node.(*LeafNode); ok {
matched := PrefixMatchedLen(leaf.Path, nibbles)
if matched != len(leaf.Path) || matched != len(nibbles) {
return nil, false
}
return leaf.Value, true
}
// 当停留在一个 BranchNode,看看是否有剩余的 nibbles
// 如果没有剩余的 nibbles,则返回 BranchNode 存放的 value
// 如果有剩余的 nibbles,修改剩余 nibbles 和当前停留 node
if branch, ok := node.(*BranchNode); ok {
if len(nibbles) == 0 {
return branch.Value, branch.HasValue()
}
b, remaining := nibbles[0], nibbles[1:]
nibbles = remaining
node = branch.Branches[b]
continue
}
// 当停留在一个 ExtensionNode
// 如果 ExtensionNode 的 Path 大于匹配长度,则无法找到该 value
// 如果 ExtensionNode 的 Path 与剩余 nibbles 部分匹配,则修改剩余 nibbles 和当前停留 node
if ext, ok := node.(*ExtensionNode); ok {
matched := PrefixMatchedLen(ext.Path, nibbles)
if matched < len(ext.Path) {
return nil, false
}
nibbles = nibbles[matched:]
node = ext.Next
continue
}
panic("not found")
}
}3.2 Put
Put 将键值对添加到 trie 中,一般来说,规则是:
- 当停在一个 EmptyNode 时,用剩余的路径替换为一个新的 LeafNode。
- 当停止到 LeafNode 时,将其转换为 ExtensionNode 并添加一个新的 BranchNode 和一个新的 LeafNode。
- 当在一个 ExtensionNode 停止时,将其转换为另一个路径更短的 ExtensionNode,并创建一个指向该 ExtensionNode 的新 BranchNode。
func (t *Trie) Put(key []byte, value []byte) {
// node 和 nibbles 在 for 循环过程中会不断变化
// node: 当前整停留的节点
// nibbles: 当前剩余匹配的 nibbles
node := &t.root
nibbles := FromBytes(key)
for {
// 当停在一个 EmptyNode,用剩余路径替换为一个新的 LeafNode
if IsEmptyNode(*node) {
leaf := NewLeafNodeFromNibbles(nibbles, value)
*node = leaf
return
}
// 当停止到 LeafNode
if leaf, ok := (*node).(*LeafNode); ok {
matched := PrefixMatchedLen(leaf.Path, nibbles)
// 如果剩余 nibbles 全部匹配,更新 value
// 结束 Put
if matched == len(nibbles) && matched == len(leaf.Path) {
newLeaf := NewLeafNodeFromNibbles(leaf.Path, value)
*node = newLeaf
return
}
// 如果 Path 不匹配,该 LeafNode 会分裂,需要一个 BranchNode
branch := NewBranchNode()
// 分裂时因为停留的是 LeafNode,看看匹配 nibbles 的长度
// 如果匹配的长度和停留的 LeafNode 一致,则 BranchNode 设置 value
// 如果匹配的长度等于剩余 nibbles,意味着新 Put 的 value 会存放到该 BranchNode 中
if matched == len(leaf.Path) {
branch.SetValue(leaf.Value)
}
if matched == len(nibbles) {
branch.SetValue(value)
}
// 匹配了部分 nibbles,意味着有重叠的路径。
// 这步结束后,node 被 ExtensionNode 或 BranchNode 替换
if matched > 0 {
// 这些重叠的路径需要创建一个 ExtensionNode
ext := NewExtensionNode(leaf.Path[:matched], branch)
*node = ext
} else {
// 如果没有匹配的长度,不需要创建 ExensionNode
*node = branch
}
// L 01020304 hello
// + 010203 world(set 到了 BranchNode)
if matched < len(leaf.Path) {
// branchNibble: 0
// leafNibbles: 4
// newLeaf: (4, hello)
branchNibble, leafNibbles := leaf.Path[matched], leaf.Path[matched+1:]
newLeaf := NewLeafNodeFromNibbles(leafNibbles, leaf.Value) // not :matched+1
branch.SetBranch(branchNibble, newLeaf)
}
// L 01020304 hello(set 到了 BranchNode)
// + 010203040506 world
if matched < len(nibbles) {
// branchNibble: 0
// leafNibbles: 506
// newLeaf: (506, world)
branchNibble, leafNibbles := nibbles[matched], nibbles[matched+1:]
newLeaf := NewLeafNodeFromNibbles(leafNibbles, value)
branch.SetBranch(branchNibble, newLeaf)
}
return
}
// 当停止到 BranchNode
if branch, ok := (*node).(*BranchNode); ok {
// 如果剩余 nibbles 长度为 0,说明 Put 的 value 可存放在该 BranchNode
if len(nibbles) == 0 {
branch.SetValue(value)
return
}
// 如有剩余 nibbles 长度,像链表一样切换 nibbles 和 node
b, remaining := nibbles[0], nibbles[1:]
nibbles = remaining
node = &branch.Branches[b]
continue
}
// 当在一个 ExtensionNode 停止
if ext, ok := (*node).(*ExtensionNode); ok {
matched := PrefixMatchedLen(ext.Path, nibbles)
if matched < len(ext.Path) {
// E 01020304
// + 010203 good
// extNibbles: 010203
// branchNibble: 0
// extRemainingnibbles: 4
extNibbles, branchNibble, extRemainingnibbles := ext.Path[:matched], ext.Path[matched], ext.Path[matched+1:]
branch := NewBranchNode()
if len(extRemainingnibbles) == 0 {
// BranchNode 直接承接原 ExtensionNode 的 Next
// E 0102030
// + 010203 good
branch.SetBranch(branchNibble, ext.Next)
} else {
// E 01020304
// + 010203 good
// 需要创建新 ExtensionNode 承接原 ExtensionNode 的 Next
// 新 BranchNode 指向它
newExt := NewExtensionNode(extRemainingnibbles, ext.Next)
branch.SetBranch(branchNibble, newExt)
}
if matched < len(nibbles) {
// E 0102030405
// + 01023456 good
// nodeBranchNibble: 3
// nodeLeafNibble: 456
// remainingLeaf: (456, good)
nodeBranchNibble, nodeLeafNibbles := nibbles[matched], nibbles[matched+1:]
remainingLeaf := NewLeafNodeFromNibbles(nodeLeafNibbles, value)
branch.SetBranch(nodeBranchNibble, remainingLeaf)
} else if matched == len(nibbles) {
// E 01020304
// + 010203 good
branch.SetValue(value)
} else {
panic(fmt.Sprintf("too many matched (%v > %v)", matched, len(nibbles)))
}
// 没有共同的路径,不需要创建 Extension
// E 01020304
// + 1234 good
if len(extNibbles) == 0 {
*node = branch
} else {
// 创建拥有共同路径的 ExtensionNode,并替换原 ExtensionNode
*node = NewExtensionNode(extNibbles, branch)
}
return
}
// E 01020304
// + 0102030506 good
nibbles = nibbles[matched:]
node = &ext.Next
continue
}
panic("unknown type")
}
}3.3 Proof
以太坊中验证时存取验证路径的节点是使用实现该接口的变量,如何验证我们实现的 MPT 是符合以太坊标准的?可以使用 go-ethereum 的 VerifyProof 函数实现验证给定根哈希下 Key 的证明。go-ethereum 的 VerifyProof 函数需要传入三个参数:
rootHash, key 和 proof。如果验证成功,VerifyProof 会返回对应 Key 的 Value。如果验证失败,VerifyProof 会返回错误。第三个参数
proof 需要实现该接口,而我们将要实现的 Prove 函数,将会返回一个实现该接口的变量,该变量拥有一个 Key 验证时需要获取的节点序列化信息。proof 变量存放的 Key 是节点的哈希值,Value 是节点的 Raw 经过 RLP 序列化的值:type KeyValueReader interface {
Has(key []byte) (bool, error)
Get(key []byte) ([]byte, error)
}// Prove 的过程和 Get 的过程相似
// 只不过它会将搜索路径上的节点存放到 proof 变量中
// 如果对应 KV 不存在会返回 false
func (t *Trie) Prove(key []byte) (Proof, bool) {
proof := NewProofDB()
node := t.root
nibbles := FromBytes(key)
for {
// 将搜索路径上的节点存放到 proof 变量中
proof.Put(Hash(node), Serialize(node))
if IsEmptyNode(node) {
return nil, false
}
if leaf, ok := node.(*LeafNode); ok {
matched := PrefixMatchedLen(leaf.Path, nibbles)
if matched != len(leaf.Path) || matched != len(nibbles) {
return nil, false
}
return proof, true
}
if branch, ok := node.(*BranchNode); ok {
if len(nibbles) == 0 {
return proof, branch.HasValue()
}
b, remaining := nibbles[0], nibbles[1:]
nibbles = remaining
node = branch.Branches[b]
continue
}
if ext, ok := node.(*ExtensionNode); ok {
matched := PrefixMatchedLen(ext.Path, nibbles)
if matched < len(ext.Path) {
return nil, false
}
nibbles = nibbles[matched:]
node = ext.Next
continue
}
panic("not found")
}
}3.4 VerifyProof
暂时先用以太坊官网的函数替代,验证实现的 MPT 是符合以太坊标准的。
func VerifyProof(rootHash []byte, key []byte, proof Proof) (value []byte, err error) {
return trie.VerifyProof(common.BytesToHash(rootHash), key, proof)
}四、MPT 使用
4.1 State Trie
func TestStorageProofExample(t *testing.T) {
// 1ee3017a85544556ea847c203623a9c84efdb77fa4951a5b01296d9aacefc5f7
account1Hash := crypto.Keccak256(common.HexToAddress("0x24264ae01b1abbc9a91e18926818ad5cbf39017b").Bytes())
accountState1, err := rlp.EncodeToBytes([]interface{}{
uint64(1), // Nonce
(new(big.Int)).SetInt64(1e18), // 1 ETH
EmptyNodeHash, // Empty StorageHash
crypto.Keccak256([]byte("")), // Empty CodeHash
})
require.NoError(t, err)
// 1eeced8d7a011c27d9aed60517c8e596509852f1d208a8a6d6f16d17ea5da204
account2Hash := crypto.Keccak256(common.HexToAddress("0x3a844bb6252b584f76febb40c941ec898df9bc23").Bytes())
accountState2, err := rlp.EncodeToBytes([]interface{}{
uint64(3), // Nonce
(new(big.Int)).SetInt64(2e18), // 2 ETH
EmptyNodeHash, // Empty StorageHash
crypto.Keccak256([]byte("")), // Empty CodeHash
})
require.NoError(t, err)
// 以上两个账户哈希值在开头有一些共同的部分: "1ee",
// 当储存进 MPT 时,会创建一个 ExtensionNode
// 使用两个账户创建一个 world state
worldStateTrie := NewTrie()
worldStateTrie.Put(account1Hash, accountState1)
worldStateTrie.Put(account2Hash, accountState2)
// 计算 state trie 的根哈希值
stateRoot := worldStateTrie.Hash()
// 为账户 1 的状态创建 proof
accountStateProof, ok := worldStateTrie.Prove(account1Hash)
require.True(t, ok)
// 针对 stateRoot 验证 proof,获取对应账户 state
validAccountState, err := VerifyProof(stateRoot, account1Hash, accountStateProof)
require.NoError(t, err)
// 再次检查帐户状态是否与原始帐户状态相同
require.True(t, bytes.Equal(validAccountState, accountState1))
}