实现 Merkle Patricia Tree

date
Jan 14, 2023
slug
implement-mpt
status
Published
tags
Blockchain
Ethereum
summary
Merkle Patricia Tree (MPT) 是以太坊的一种存储结构。它不仅可以支持增删查改的操作,而且它还能对查询的结果提供可验证的功能。
type
Post

一、前置工作

1.1 哈希函数

以太坊中使用的哈希函数是 Keccak256,是一种 SHA3 的变种。将数据传入后,这个函数会返回该数据的哈希值(256位,64个十六进制数)。
func Keccak256(data ...[]byte) []byte {
	d := sha3.NewLegacyKeccak256()
	for _, b := range data {
		d.Write(b)
	}
	return d.Sum(nil)
}

1.2 Nibble(半字节)

关于 nibble 的信息请浏览以太坊中的半字节(nibble),这里只会具体讲述它在 MPT 里的作用。
虽然 Go 实现 nibble 只能使用 byte 类型去替代,不能做到真正的半字节。但是这只是在内存中,后续存储到磁盘前,会将相邻的两个半字节拼接成一个字节,再存储到磁盘中。
type Nibble byte

1.2.1 nibble 是 MPT 查询时最小的移动单位

以太坊中的以账户为例,对于一个存储的键值对 (key, value)key 存放的是 Keccak256(accountAddress),即用户地址(区块链中唯一)的哈希值。value 存放的是 rlp.EncodeToBytes(accountData)),即对应用户数据的 RLP 编码。
而在 MPT 的寻找数据存放的过程中,最小的移动单位是 nibble,不是 byte。假设现在有一个字节切片:[1, 2, 3, 4, 5],在 MPT 的眼中则是:[0, 1, 0, 2, 0, 3, 0, 4, 0, 5]

1.2.2 nibble 可作为前缀标记 MPT 节点类型

在 MPT 有三种类型的节点:extension, leaf 和 branch。它们在序列化和反序列化时,需要对它们进行区分。但只有 extension 和 leaf 类型的节点需要被区分,branch 节点的序列化方式已经和它们区别开了。
extension 和 leaf 类型节点在序列化只需要两个值:Path、Next(extension)或 Path、Value(leaf),使用 RLP 时序列化完全一致,必须添加前缀进行区分。这两个类型的节点的 Path 最小移动单位是 nibble,会出现组成 Path 的 nibble 为奇数个的情况,所以 Path 的 nibble 最后一定要拼接成 byte。
hex char    bits    |    node type partial     path length
----------------------------------------------------------
   0        0000    |       extension              even
   1        0001    |       extension              odd
   2        0010    |   terminating (leaf)         even
   3        0011    |   terminating (leaf)         odd
因为这两个原因,给 extension 和 leaf 类型的节点添加前缀,不仅可以对节点类型进行区分,也能将 Path 的 nibble 补齐成偶数个。下面的例子中,每个字符都可认为是一个 nibble:
key: 12 34 56       |      extension: 00 12 34 56
key: 12 34 5        |      extension: 11 23 45
---------------------------------------------------
key: 12 34 56       |      leaf:      20 12 34 56
key: 12 34 5        |      leaf:      31 23 45
偶数个 nibble 的 Path 如果添加单个前缀就会变成奇数个,所以偶数个 nibble 的 Path 添加前缀时一次添加两个 nibble。第一个为前缀标志位,第二个永远为 0。奇数个 nibble 的 Path 仅用添加一个前缀标志位即可变成偶数个 nibble。

1.2.3 比较重要的函数

  • ToPrefixed:根据传入的 nibble 切片添加前缀
func ToPrefixed(ns []Nibble, isLeafNode bool) []Nibble {
	var prefixBytes []Nibble

	if len(ns)%2 > 0 {
		prefixBytes = []Nibble{1}
	} else {
		prefixBytes = []Nibble{0, 0}
	}

	prefixed := make([]Nibble, 0, len(prefixBytes)+len(ns))
	prefixed = append(prefixed, prefixBytes...)
	prefixed = append(prefixed, ns...)

	if isLeafNode {
		prefixed[0] += 2
	}

	return prefixed
}
  • ToBytes:将传入的 nibble 切片拼接成 byte 切片
func ToBytes(ns []Nibble) []byte {
	buf := make([]byte, 0, len(ns)/2)

	for i := 0; i < len(ns); i += 2 {
		b := byte(ns[i]<<4) + byte(ns[i+1])
		buf = append(buf, b)
	}

	return buf
}

二、MPT 节点

notion image

2.1 节点接口(Node Interface)

如果一个变量是节点类型,它一定要实现 Hash 函数和 Raw 函数。前者会返回该节点的哈希值,后者会返回该节点数据的 RLP 序列化值。
type Node interface {
	Hash() []byte
	Raw() []interface{}
}

2.2 空节点(Empty Node)

一个节点什么也没有就是空节点。比如在 Go 中如果一个 Node 类型的值为 nil,那它就是空节点。空节点调用 Raw 返回的是空字节切片,因为什么也没有。空节点调用 Hash 返回的是一个固定的哈希值 "56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421"
这个固定的哈希值是由 0x80(空字节数组的 RLP 编码值) 经过 Keccak256 计算得到 keccak-256在线计算 - StrErr.com
  • Raw:返回空字节切片
  • Hash:返回固定哈希值

2.3 拓展节点(Extension Node)

extension 节点用于合并多个路径上共同的 nibble。它只会储存两个值:shared nibble 和 next node。
序列化时 shared nibble 会添加前缀,而 next node 会根据它序列化值的长度,选择是否使用 next node 的哈希值。(如果 next node 长度大于等于 32,则使用哈希值,如果 next node 序列化值的长度小于 32,则可以直接将序列化值存放在 extension 节点中)
func (e ExtensionNode) Raw() []interface{} {
	hashes := make([]interface{}, 2)

	hashes[0] = ToBytes(ToPrefixed(e.Path, false))
	if len(Serialize(e.Next)) >= 32 {
		hashes[1] = e.Next.Hash()
	} else {
		hashes[1] = e.Next.Raw()
	}

	return hashes
}

2.4 叶子节点(Leaf Node)

leaf 节点用于存放某个路径的终点。它只会储存两个值:path 和 value。
序列化时只需要给 path 添加前缀,而 value 原封不动。
func (l LeafNode) Raw() []interface{} {
	path := ToBytes(ToPrefixed(l.Path, true))
	raw := []interface{}{path, l.Value}
	return raw
}

2.5 分支节点(Branch Node)

branch 节点既有 extension 节点的 next node 指针,也能像 leaf 节点一样存放一个值。它总共存放 17 个值,即 16 个 next node 和 1 个 value。
序列化时,16 个 next node 像 extension 一样处理,next node 序列化值长度大于等于 32,则使用哈希值,反之使用序列化值。1 个 value 像 leaf 一样处理, value 原封不动。
func (b BranchNode) Raw() []interface{} {
	hashes := make([]interface{}, 17)
	for i := 0; i < 16; i++ {
		if b.Branches[i] == nil {
			hashes[i] = EmptyNodeRaw
		} else {
			node := b.Branches[i]
			if len(Serialize(node)) >= 32 {
				hashes[i] = node.Hash()
			} else {
				hashes[i] = node.Raw()
			}
		}
	}

	hashes[16] = b.Value
	return hashes
}

三、MPT 操作

3.1 Get

func (t *Trie) Get(key []byte) ([]byte, bool) {
	// node 和 nibbles 在 for 循环过程中会不断变化
	// node: 当前整停留的节点
	// nibbles: 当前剩余匹配的 nibbles
	node := t.root
	nibbles := FromBytes(key)

	for {
		// 当停留在一个 EmptyNode,查询失败
		if IsEmptyNode(node) {
			return nil, false
		}
		
		// 当停留在一个 LeafNode,看看其 Path 和剩余 nibble 是否完全匹配
		// 如果不匹配,查询失败
		// 如果完全匹配,返回 LeafNode 的 value
		if leaf, ok := node.(*LeafNode); ok {
			matched := PrefixMatchedLen(leaf.Path, nibbles)
			if matched != len(leaf.Path) || matched != len(nibbles) {
				return nil, false
			}
			return leaf.Value, true
		}
		
		// 当停留在一个 BranchNode,看看是否有剩余的 nibbles
		// 如果没有剩余的 nibbles,则返回 BranchNode 存放的 value
		// 如果有剩余的 nibbles,修改剩余 nibbles 和当前停留 node
		if branch, ok := node.(*BranchNode); ok {
			if len(nibbles) == 0 {
				return branch.Value, branch.HasValue()
			}

			b, remaining := nibbles[0], nibbles[1:]
			nibbles = remaining
			node = branch.Branches[b]

			continue
		}
		
		// 当停留在一个 ExtensionNode
		// 如果 ExtensionNode 的 Path 大于匹配长度,则无法找到该 value
		// 如果 ExtensionNode 的 Path 与剩余 nibbles 部分匹配,则修改剩余 nibbles 和当前停留 node
		if ext, ok := node.(*ExtensionNode); ok {
			matched := PrefixMatchedLen(ext.Path, nibbles)

			if matched < len(ext.Path) {
				return nil, false
			}

			nibbles = nibbles[matched:]
			node = ext.Next
			continue
		}

		panic("not found")
	}
}

3.2 Put

Put 将键值对添加到 trie 中,一般来说,规则是:
  • 当停在一个 EmptyNode 时,用剩余的路径替换为一个新的 LeafNode。
  • 当停止到 LeafNode 时,将其转换为 ExtensionNode 并添加一个新的 BranchNode 和一个新的 LeafNode。
  • 当在一个 ExtensionNode 停止时,将其转换为另一个路径更短的 ExtensionNode,并创建一个指向该 ExtensionNode 的新 BranchNode。
func (t *Trie) Put(key []byte, value []byte) {
	// node 和 nibbles 在 for 循环过程中会不断变化
	// node: 当前整停留的节点
	// nibbles: 当前剩余匹配的 nibbles
	node := &t.root               
	nibbles := FromBytes(key)

	for {
		// 当停在一个 EmptyNode,用剩余路径替换为一个新的 LeafNode
		if IsEmptyNode(*node) {
			leaf := NewLeafNodeFromNibbles(nibbles, value)
			*node = leaf
			return
		}
		
		// 当停止到 LeafNode
		if leaf, ok := (*node).(*LeafNode); ok {
			matched := PrefixMatchedLen(leaf.Path, nibbles)

			// 如果剩余 nibbles 全部匹配,更新 value
			// 结束 Put
			if matched == len(nibbles) && matched == len(leaf.Path) {
				newLeaf := NewLeafNodeFromNibbles(leaf.Path, value)
				*node = newLeaf
				return
			}

			// 如果 Path 不匹配,该 LeafNode 会分裂,需要一个 BranchNode
			branch := NewBranchNode()
			// 分裂时因为停留的是 LeafNode,看看匹配 nibbles 的长度
			// 如果匹配的长度和停留的 LeafNode 一致,则 BranchNode 设置 value
			// 如果匹配的长度等于剩余 nibbles,意味着新 Put 的 value 会存放到该 BranchNode 中
			if matched == len(leaf.Path) {
				branch.SetValue(leaf.Value)
			}
			if matched == len(nibbles) {
				branch.SetValue(value)
			}

			// 匹配了部分 nibbles,意味着有重叠的路径。
			// 这步结束后,node 被 ExtensionNode 或 BranchNode 替换
			if matched > 0 {
				// 这些重叠的路径需要创建一个 ExtensionNode
				ext := NewExtensionNode(leaf.Path[:matched], branch)
				*node = ext
			} else {
				// 如果没有匹配的长度,不需要创建 ExensionNode
				*node = branch
			}

			// L 01020304 hello
			// + 010203   world(set 到了 BranchNode)
			if matched < len(leaf.Path) {
				// branchNibble: 0
				// leafNibbles:  4
				// newLeaf: (4, hello)
				branchNibble, leafNibbles := leaf.Path[matched], leaf.Path[matched+1:]
				newLeaf := NewLeafNodeFromNibbles(leafNibbles, leaf.Value) // not :matched+1
				branch.SetBranch(branchNibble, newLeaf)
			}

			// L 01020304 hello(set 到了 BranchNode)
			// + 010203040506 world
			if matched < len(nibbles) {
				// branchNibble: 0
				// leafNibbles:  506
				// newLeaf: (506, world)
				branchNibble, leafNibbles := nibbles[matched], nibbles[matched+1:]
				newLeaf := NewLeafNodeFromNibbles(leafNibbles, value)
				branch.SetBranch(branchNibble, newLeaf)
			}

			return
		}

		// 当停止到 BranchNode
		if branch, ok := (*node).(*BranchNode); ok {
			// 如果剩余 nibbles 长度为 0,说明 Put 的 value 可存放在该 BranchNode
			if len(nibbles) == 0 {
				branch.SetValue(value)
				return
			}
			
			// 如有剩余 nibbles 长度,像链表一样切换 nibbles 和 node
			b, remaining := nibbles[0], nibbles[1:]
			nibbles = remaining
			node = &branch.Branches[b]
			continue
		}

		// 当在一个 ExtensionNode 停止
		if ext, ok := (*node).(*ExtensionNode); ok {
			matched := PrefixMatchedLen(ext.Path, nibbles)
			if matched < len(ext.Path) {
				// E 01020304
				// + 010203 good
				// extNibbles: 010203
				// branchNibble: 0
				// extRemainingnibbles: 4
				extNibbles, branchNibble, extRemainingnibbles := ext.Path[:matched], ext.Path[matched], ext.Path[matched+1:]
				branch := NewBranchNode()
				if len(extRemainingnibbles) == 0 {
					// BranchNode 直接承接原 ExtensionNode 的 Next
					// E 0102030
					// + 010203 good
					branch.SetBranch(branchNibble, ext.Next)
				} else {
					// E 01020304
					// + 010203 good
					// 需要创建新 ExtensionNode 承接原 ExtensionNode 的 Next
					// 新 BranchNode 指向它
					newExt := NewExtensionNode(extRemainingnibbles, ext.Next)
					branch.SetBranch(branchNibble, newExt)
				}
				
				if matched < len(nibbles) {
					// E 0102030405
					// + 01023456 good
					// nodeBranchNibble: 3
					// nodeLeafNibble:   456
					// remainingLeaf:    (456, good)
					nodeBranchNibble, nodeLeafNibbles := nibbles[matched], nibbles[matched+1:]
					remainingLeaf := NewLeafNodeFromNibbles(nodeLeafNibbles, value)
					branch.SetBranch(nodeBranchNibble, remainingLeaf)
				} else if matched == len(nibbles) {
					// E 01020304
					// + 010203 good
					branch.SetValue(value)
				} else {
					panic(fmt.Sprintf("too many matched (%v > %v)", matched, len(nibbles)))
				}

				// 没有共同的路径,不需要创建 Extension
				// E 01020304
				// + 1234 good
				if len(extNibbles) == 0 {
					*node = branch
				} else {
					// 创建拥有共同路径的 ExtensionNode,并替换原 ExtensionNode
					*node = NewExtensionNode(extNibbles, branch)
				}
				return
			}
			
			// E 01020304
			// + 0102030506 good
			nibbles = nibbles[matched:]
			node = &ext.Next
			continue
		}

		panic("unknown type")
	}
}

3.3 Proof

以太坊中验证时存取验证路径的节点是使用实现该接口的变量,如何验证我们实现的 MPT 是符合以太坊标准的?可以使用 go-ethereum 的 VerifyProof 函数实现验证给定根哈希下 Key 的证明。go-ethereum 的 VerifyProof 函数需要传入三个参数:rootHash, keyproof。如果验证成功,VerifyProof 会返回对应 Key 的 Value。如果验证失败,VerifyProof 会返回错误。
第三个参数 proof 需要实现该接口,而我们将要实现的 Prove 函数,将会返回一个实现该接口的变量,该变量拥有一个 Key 验证时需要获取的节点序列化信息。proof 变量存放的 Key 是节点的哈希值,Value 是节点的 Raw 经过 RLP 序列化的值:
type KeyValueReader interface {
    Has(key []byte) (bool, error)
    Get(key []byte) ([]byte, error)
}
// Prove 的过程和 Get 的过程相似
// 只不过它会将搜索路径上的节点存放到 proof 变量中
// 如果对应 KV 不存在会返回 false
func (t *Trie) Prove(key []byte) (Proof, bool) {
	proof := NewProofDB()
	node := t.root
	nibbles := FromBytes(key)

	for {
		// 将搜索路径上的节点存放到 proof 变量中
		proof.Put(Hash(node), Serialize(node))

		if IsEmptyNode(node) {
			return nil, false
		}

		if leaf, ok := node.(*LeafNode); ok {
			matched := PrefixMatchedLen(leaf.Path, nibbles)
			if matched != len(leaf.Path) || matched != len(nibbles) {
				return nil, false
			}

			return proof, true
		}

		if branch, ok := node.(*BranchNode); ok {
			if len(nibbles) == 0 {
				return proof, branch.HasValue()
			}

			b, remaining := nibbles[0], nibbles[1:]
			nibbles = remaining
			node = branch.Branches[b]
			continue
		}

		if ext, ok := node.(*ExtensionNode); ok {
			matched := PrefixMatchedLen(ext.Path, nibbles)

			if matched < len(ext.Path) {
				return nil, false
			}

			nibbles = nibbles[matched:]
			node = ext.Next
			continue
		}

		panic("not found")
	}
}

3.4 VerifyProof

暂时先用以太坊官网的函数替代,验证实现的 MPT 是符合以太坊标准的。
func VerifyProof(rootHash []byte, key []byte, proof Proof) (value []byte, err error) {
	return trie.VerifyProof(common.BytesToHash(rootHash), key, proof)
}

四、MPT 使用

4.1 State Trie

func TestStorageProofExample(t *testing.T) {
	// 1ee3017a85544556ea847c203623a9c84efdb77fa4951a5b01296d9aacefc5f7
	account1Hash := crypto.Keccak256(common.HexToAddress("0x24264ae01b1abbc9a91e18926818ad5cbf39017b").Bytes())
	accountState1, err := rlp.EncodeToBytes([]interface{}{
		uint64(1),                     // Nonce
		(new(big.Int)).SetInt64(1e18), // 1 ETH
		EmptyNodeHash,                 // Empty StorageHash
		crypto.Keccak256([]byte("")),  // Empty CodeHash
	})
	require.NoError(t, err)

	// 1eeced8d7a011c27d9aed60517c8e596509852f1d208a8a6d6f16d17ea5da204
	account2Hash := crypto.Keccak256(common.HexToAddress("0x3a844bb6252b584f76febb40c941ec898df9bc23").Bytes())
	accountState2, err := rlp.EncodeToBytes([]interface{}{
		uint64(3),                     // Nonce
		(new(big.Int)).SetInt64(2e18), // 2 ETH
		EmptyNodeHash,                 // Empty StorageHash
		crypto.Keccak256([]byte("")),  // Empty CodeHash
	})
	require.NoError(t, err)

	// 以上两个账户哈希值在开头有一些共同的部分: "1ee",
	// 当储存进 MPT 时,会创建一个 ExtensionNode

	// 使用两个账户创建一个 world state
	worldStateTrie := NewTrie()
	worldStateTrie.Put(account1Hash, accountState1)
	worldStateTrie.Put(account2Hash, accountState2)

	// 计算 state trie 的根哈希值
	stateRoot := worldStateTrie.Hash()

	// 为账户 1 的状态创建 proof
	accountStateProof, ok := worldStateTrie.Prove(account1Hash)
	require.True(t, ok)

	// 针对 stateRoot 验证 proof,获取对应账户 state
	validAccountState, err := VerifyProof(stateRoot, account1Hash, accountStateProof)
	require.NoError(t, err)

	// 再次检查帐户状态是否与原始帐户状态相同
	require.True(t, bytes.Equal(validAccountState, accountState1))
}
 

© therainisme 2025