Skip to content

Commit fb9bc26

Browse files
authored
feat: implement vm message extraction (#1027)
* feat: implement vm message extraction - closes #978
1 parent ead40c9 commit fb9bc26

File tree

12 files changed

+365
-8
lines changed

12 files changed

+365
-8
lines changed

chain/indexer/integrated/processor/state.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
poweractors "github.com/filecoin-project/lily/chain/actors/builtin/power"
2323
rewardactors "github.com/filecoin-project/lily/chain/actors/builtin/reward"
2424
verifregactors "github.com/filecoin-project/lily/chain/actors/builtin/verifreg"
25+
"github.com/filecoin-project/lily/tasks/messageexecutions/vm"
2526

2627
"github.com/filecoin-project/lily/tasks"
2728
// actor tasks
@@ -557,6 +558,8 @@ func MakeProcessors(api tasks.DataSource, indexerTasks []string) (*IndexerProces
557558
out.TipsetsProcessors[t] = gasecontask.NewTask(api)
558559
case tasktype.MultisigApproval:
559560
out.TipsetsProcessors[t] = msapprovaltask.NewTask(api)
561+
case tasktype.VmMessage:
562+
out.TipsetsProcessors[t] = vm.NewTask(api)
560563

561564
//
562565
// Blocks

chain/indexer/integrated/processor/state_internal_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/filecoin-project/lily/chain/actors/builtin/reward"
1616
"github.com/filecoin-project/lily/chain/actors/builtin/verifreg"
1717
"github.com/filecoin-project/lily/chain/indexer/tasktype"
18+
"github.com/filecoin-project/lily/tasks/messageexecutions/vm"
1819

1920
"github.com/filecoin-project/lily/tasks/actorstate"
2021
inittask "github.com/filecoin-project/lily/tasks/actorstate/init_"
@@ -48,7 +49,7 @@ func TestNewProcessor(t *testing.T) {
4849
require.Equal(t, t.Name(), proc.name)
4950
require.Len(t, proc.actorProcessors, 21)
5051
require.Len(t, proc.tipsetProcessors, 5)
51-
require.Len(t, proc.tipsetsProcessors, 9)
52+
require.Len(t, proc.tipsetsProcessors, 10)
5253
require.Len(t, proc.builtinProcessors, 1)
5354

5455
require.Equal(t, message.NewTask(nil), proc.tipsetsProcessors[tasktype.Message])
@@ -60,6 +61,7 @@ func TestNewProcessor(t *testing.T) {
6061
require.Equal(t, internalparsedmessage.NewTask(nil), proc.tipsetsProcessors[tasktype.InternalParsedMessage])
6162
require.Equal(t, gaseconomy.NewTask(nil), proc.tipsetsProcessors[tasktype.MessageGasEconomy])
6263
require.Equal(t, msapprovals.NewTask(nil), proc.tipsetsProcessors[tasktype.MultisigApproval])
64+
require.Equal(t, vm.NewTask(nil), proc.tipsetsProcessors[tasktype.VmMessage])
6365

6466
require.Equal(t, headers.NewTask(), proc.tipsetProcessors[tasktype.BlockHeader])
6567
require.Equal(t, parents.NewTask(), proc.tipsetProcessors[tasktype.BlockParent])

chain/indexer/integrated/processor/state_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,6 @@ func TestMakeProcessorsAllTasks(t *testing.T) {
347347
require.NoError(t, err)
348348
require.Len(t, proc.ActorProcessors, 21)
349349
require.Len(t, proc.TipsetProcessors, 5)
350-
require.Len(t, proc.TipsetsProcessors, 9)
350+
require.Len(t, proc.TipsetsProcessors, 10)
351351
require.Len(t, proc.ReportProcessors, 1)
352352
}

chain/indexer/tasktype/table_tasks.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ const (
2424
ParsedMessage = "parsed_message"
2525
InternalMessage = "internal_messages"
2626
InternalParsedMessage = "internal_parsed_messages"
27+
VmMessage = "vm_messages"
2728
MultisigTransaction = "multisig_transaction"
2829
ChainPower = "chain_power"
2930
PowerActorClaim = "power_actor_claim"
@@ -62,6 +63,7 @@ var AllTableTasks = []string{
6263
ParsedMessage,
6364
InternalMessage,
6465
InternalParsedMessage,
66+
VmMessage,
6567
MultisigTransaction,
6668
ChainPower,
6769
PowerActorClaim,
@@ -100,6 +102,7 @@ var TableLookup = map[string]struct{}{
100102
ParsedMessage: {},
101103
InternalMessage: {},
102104
InternalParsedMessage: {},
105+
VmMessage: {},
103106
MultisigTransaction: {},
104107
ChainPower: {},
105108
PowerActorClaim: {},
@@ -138,6 +141,7 @@ var TableComment = map[string]string{
138141
ParsedMessage: ``,
139142
InternalMessage: ``,
140143
InternalParsedMessage: ``,
144+
VmMessage: ``,
141145
MultisigTransaction: ``,
142146
ChainPower: ``,
143147
PowerActorClaim: ``,
@@ -178,8 +182,9 @@ var TableFieldComments = map[string]map[string]string{
178182
"DealID": "Identifier for the deal.",
179183
"EndEpoch": "The epoch at which this deal with end.",
180184
"Height": "Epoch at which this deal proposal was added or changed.",
185+
"IsString": "When true Label contains a valid UTF-8 string encoded in base64. When false Label contains raw bytes encoded in base64. Related to FIP: https://github.com/filecoin-project/FIPs/blob/master/FIPS/fip-0027.md",
181186
"IsVerified": "Deal is with a verified provider.",
182-
"Label": "An arbitrary client chosen label to apply to the deal.",
187+
"Label": "An arbitrary client chosen label to apply to the deal. The value is base64 encoded before persisting.",
183188
"PaddedPieceSize": "The piece size in bytes with padding.",
184189
"PieceCID": "CID of a sector piece. A Piece is an object that represents a whole or part of a File.",
185190
"ProviderCollateral": "The amount of FIL (in attoFIL) the provider has pledged as collateral. The Provider deal collateral is only slashed when a sector is terminated before the deal expires.",
@@ -197,6 +202,21 @@ var TableFieldComments = map[string]map[string]string{
197202
ParsedMessage: {},
198203
InternalMessage: {},
199204
InternalParsedMessage: {},
205+
VmMessage: {
206+
"ActorCode": "ActorCode of To (receiver).",
207+
"Cid": "Cid of the message.",
208+
"ExitCode": "ExitCode of message execution.",
209+
"From": "From sender of message.",
210+
"GasUsed": "GasUsed by message.",
211+
"Height": "Height message was executed at.",
212+
"Method": "Method called on To (receiver).",
213+
"Params": "Params contained in message.",
214+
"Returns": "Return value of message.",
215+
"Source": "On-chain message triggering the message.",
216+
"StateRoot": "StateRoot message was applied to.",
217+
"To": "To receiver of message.",
218+
"Value": "Value attoFIL contained in message.",
219+
},
200220
MultisigTransaction: {
201221
"To": "Transaction State",
202222
},

chain/indexer/tasktype/tasks.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ var TaskLookup = map[string][]string{
7979
ImplicitMessageTask: {
8080
InternalMessage,
8181
InternalParsedMessage,
82+
VmMessage,
8283
},
8384
ChainConsensusTask: {
8485
ChainConsensus,

chain/indexer/tasktype/tasks_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ func TestMakeTaskNamesAlias(t *testing.T) {
6666
},
6767
{
6868
taskAlias: tasktype.ImplicitMessageTask,
69-
tasks: []string{tasktype.InternalMessage, tasktype.InternalParsedMessage},
69+
tasks: []string{tasktype.InternalMessage, tasktype.InternalParsedMessage, tasktype.VmMessage},
7070
},
7171
{
7272
taskAlias: tasktype.ChainConsensusTask,
@@ -99,9 +99,8 @@ func TestMakeAllTaskAliasNames(t *testing.T) {
9999
require.Len(t, actual, len(storage.Models))
100100
}
101101

102-
const TotalTableTasks = 35
103-
104102
func TestMakeAllTaskNames(t *testing.T) {
103+
const TotalTableTasks = 36
105104
actual, err := tasktype.MakeTaskNames(tasktype.AllTableTasks)
106105
require.NoError(t, err)
107106
// if this test fails it means a new task name was added, update the above test

lens/util/repo.go

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"fmt"
88
"reflect"
99
"strings"
10+
"time"
1011

1112
"github.com/filecoin-project/go-bitfield"
1213
"github.com/filecoin-project/go-state-types/exitcode"
@@ -282,6 +283,34 @@ func ParseParams(params []byte, method abi.MethodNum, actCode cid.Cid) (string,
282283
return string(b), m.Name, err
283284
}
284285

286+
func ParseReturn(ret []byte, method abi.MethodNum, actCode cid.Cid) (string, string, error) {
287+
m, found := ActorRegistry.Methods[actCode][method]
288+
if !found {
289+
return "", "", fmt.Errorf("unknown method %d for actor %s", method, actCode)
290+
}
291+
292+
// if the actor method doesn't expect returns don't parse them
293+
if m.Ret == reflect.TypeOf(new(abi.EmptyValue)) {
294+
return "", m.Name, nil
295+
}
296+
297+
p := reflect.New(m.Ret.Elem()).Interface().(cbg.CBORUnmarshaler)
298+
if err := p.UnmarshalCBOR(bytes.NewReader(ret)); err != nil {
299+
actorName := builtin.ActorNameByCode(actCode)
300+
return "", m.Name, fmt.Errorf("cbor decode into %s %s:(%s.%d) failed: %v", m.Name, actorName, actCode, method, err)
301+
}
302+
303+
b, err := MarshalWithOverrides(p, map[reflect.Type]marshaller{
304+
reflect.TypeOf(bitfield.BitField{}): bitfieldCountMarshaller,
305+
})
306+
if err != nil {
307+
return "", "", fmt.Errorf("failed to parse message return method: %d, actor code: %s, return: %s: %w", method, actCode, string(ret), err)
308+
}
309+
310+
return string(b), m.Name, err
311+
312+
}
313+
285314
func MethodAndParamsForMessage(m *types.Message, destCode cid.Cid) (string, string, error) {
286315
// Method is optional, zero means a plain value transfer
287316
if m.Method == 0 {
@@ -308,6 +337,71 @@ func MethodAndParamsForMessage(m *types.Message, destCode cid.Cid) (string, stri
308337
return method, params, nil
309338
}
310339

340+
type MessageParamsReturn struct {
341+
MethodName string
342+
Params string
343+
Return string
344+
}
345+
346+
func MethodParamsReturnForMessage(m *MessageTrace, destCode cid.Cid) (*MessageParamsReturn, error) {
347+
// Method is optional, zero means a plain value transfer
348+
if m.Message.Method == 0 {
349+
return &MessageParamsReturn{
350+
MethodName: "Send",
351+
Params: "",
352+
Return: "",
353+
}, nil
354+
}
355+
356+
if !destCode.Defined() {
357+
return nil, fmt.Errorf("missing actor code")
358+
}
359+
360+
params, _, err := ParseParams(m.Message.Params, m.Message.Method, destCode)
361+
if err != nil {
362+
log.Warnf("failed to parse parameters of message %s: %v", m.Message.Cid(), err)
363+
return nil, fmt.Errorf("unknown method for actor type %s method %d: %w", destCode.String(), int64(m.Message.Method), err)
364+
}
365+
ret, method, err := ParseReturn(m.Receipt.Return, m.Message.Method, destCode)
366+
if err != nil {
367+
log.Warnf("failed to parse return of message %s: %v", m.Message.Cid(), err)
368+
return nil, fmt.Errorf("unknown method for actor type %s method %d: %w", destCode.String(), int64(m.Message.Method), err)
369+
}
370+
371+
return &MessageParamsReturn{
372+
MethodName: method,
373+
Params: params,
374+
Return: ret,
375+
}, nil
376+
}
377+
378+
func walkExecutionTrace(et *types.ExecutionTrace, trace *[]*MessageTrace) {
379+
for _, sub := range et.Subcalls {
380+
*trace = append(*trace, &MessageTrace{
381+
Message: sub.Msg,
382+
Receipt: sub.MsgRct,
383+
Error: sub.Error,
384+
Duration: sub.Duration,
385+
GasCharge: sub.GasCharges,
386+
})
387+
walkExecutionTrace(&sub, trace) //nolint:scopelint,gosec
388+
}
389+
}
390+
391+
type MessageTrace struct {
392+
Message *types.Message
393+
Receipt *types.MessageReceipt
394+
Error string
395+
Duration time.Duration
396+
GasCharge []*types.GasTrace
397+
}
398+
399+
func GetChildMessagesOf(m *lens.MessageExecution) []*MessageTrace {
400+
var out []*MessageTrace
401+
walkExecutionTrace(&m.Ret.ExecutionTrace, &out)
402+
return out
403+
}
404+
311405
func ActorNameAndFamilyFromCode(c cid.Cid) (name string, family string, err error) {
312406
if !c.Defined() {
313407
return "", "", fmt.Errorf("cannot derive actor name from undefined CID")

model/actors/market/dealproposal.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,7 @@ type MarketDealProposal struct {
4949
// An arbitrary client chosen label to apply to the deal. The value is base64 encoded before persisting.
5050
Label string
5151

52-
// When true Label contains a valid UTF-8 string encoded in base64. When false Label contains raw bytes encoded in base64.
53-
// Related to FIP: https://github.com/filecoin-project/FIPs/blob/master/FIPS/fip-0027.md
52+
// When true Label contains a valid UTF-8 string encoded in base64. When false Label contains raw bytes encoded in base64. Related to FIP: https://github.com/filecoin-project/FIPs/blob/master/FIPS/fip-0027.md
5453
IsString bool
5554
}
5655

model/messages/vm.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package messages
2+
3+
import (
4+
"context"
5+
6+
"go.opencensus.io/tag"
7+
8+
"github.com/filecoin-project/lily/metrics"
9+
"github.com/filecoin-project/lily/model"
10+
)
11+
12+
type VMMessage struct {
13+
tableName struct{} `pg:"vm_messages"` // nolint: structcheck
14+
15+
// Height message was executed at.
16+
Height int64 `pg:",pk,notnull,use_zero"`
17+
// StateRoot message was applied to.
18+
StateRoot string `pg:",pk,notnull"`
19+
// Cid of the message.
20+
Cid string `pg:",pk,notnull"`
21+
// On-chain message triggering the message.
22+
Source string `pg:",pk,notnull"`
23+
24+
// From sender of message.
25+
From string `pg:",notnull"`
26+
// To receiver of message.
27+
To string `pg:",notnull"`
28+
// Value attoFIL contained in message.
29+
Value string `pg:"type:numeric,notnull"`
30+
// Method called on To (receiver).
31+
Method uint64 `pg:",notnull,use_zero"`
32+
// ActorCode of To (receiver).
33+
ActorCode string `pg:",notnull"`
34+
// ExitCode of message execution.
35+
ExitCode int64 `pg:",notnull,use_zero"`
36+
// GasUsed by message.
37+
GasUsed int64 `pg:",notnull,use_zero"`
38+
// Params contained in message.
39+
Params string `pg:",type:jsonb"`
40+
// Return value of message.
41+
Returns string `pg:",type:jsonb"`
42+
}
43+
44+
func (v *VMMessage) Persist(ctx context.Context, s model.StorageBatch, version model.Version) error {
45+
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "vm_messages"))
46+
stop := metrics.Timer(ctx, metrics.PersistDuration)
47+
defer stop()
48+
49+
metrics.RecordCount(ctx, metrics.PersistModel, 1)
50+
return s.PersistModel(ctx, v)
51+
}
52+
53+
type VMMessageList []*VMMessage
54+
55+
func (vl VMMessageList) Persist(ctx context.Context, s model.StorageBatch, version model.Version) error {
56+
if len(vl) == 0 {
57+
return nil
58+
}
59+
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "vm_messages"))
60+
stop := metrics.Timer(ctx, metrics.PersistDuration)
61+
defer stop()
62+
63+
metrics.RecordCount(ctx, metrics.PersistModel, len(vl))
64+
return s.PersistModel(ctx, vl)
65+
}

schemas/v1/8_vm_messages.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package v1
2+
3+
func init() {
4+
patches.Register(
5+
8,
6+
`
7+
CREATE TABLE {{ .SchemaName | default "public"}}.vm_messages (
8+
height bigint NOT NULL,
9+
state_root text NOT NULL,
10+
cid text NOT NULL,
11+
source text,
12+
"from" text NOT NULL,
13+
"to" text NOT NULL,
14+
value numeric NOT NULL,
15+
method bigint NOT NULL,
16+
actor_code text NOT NULL,
17+
exit_code bigint NOT NULL,
18+
gas_used bigint NOT NULL,
19+
params jsonb,
20+
returns jsonb
21+
);
22+
ALTER TABLE ONLY {{ .SchemaName | default "public"}}.vm_messages ADD CONSTRAINT vm_messages_pkey PRIMARY KEY (height, state_root, cid, source);
23+
CREATE INDEX vm_messages_height_idx ON {{ .SchemaName | default "public"}}.vm_messages USING BTREE (height);
24+
CREATE INDEX vm_messages_from_idx ON {{ .SchemaName | default "public"}}.vm_messages USING HASH ("from");
25+
CREATE INDEX vm_messages_to_idx ON {{ .SchemaName | default "public"}}.vm_messages USING HASH ("to");
26+
CREATE INDEX vm_messages_actor_code_method_idx ON {{ .SchemaName | default "public"}}.vm_messages USING BTREE (actor_code, method);
27+
28+
29+
COMMENT ON COLUMN {{ .SchemaName | default "public"}}.vm_messages.height IS 'Height message was executed at.';
30+
COMMENT ON COLUMN {{ .SchemaName | default "public"}}.vm_messages.state_root IS 'CID of the parent state root at which this message was executed.';
31+
COMMENT ON COLUMN {{ .SchemaName | default "public"}}.vm_messages.cid IS 'CID of the message (note this CID does not appear on chain).';
32+
COMMENT ON COLUMN {{ .SchemaName | default "public"}}.vm_messages.source IS 'CID of the on-chain message that caused this message to be sent.';
33+
COMMENT ON COLUMN {{ .SchemaName | default "public"}}.vm_messages."from" IS 'Address of the actor that sent the message.';
34+
COMMENT ON COLUMN {{ .SchemaName | default "public"}}.vm_messages."to" IS 'Address of the actor that received the message.';
35+
COMMENT ON COLUMN {{ .SchemaName | default "public"}}.vm_messages.value IS 'Amount of FIL (in attoFIL) transferred by this message.';
36+
COMMENT ON COLUMN {{ .SchemaName | default "public"}}.vm_messages.method IS 'The method number invoked on the recipient actor. Only unique to the actor the method is being invoked on. A method number of 0 is a plain token transfer - no method execution';
37+
COMMENT ON COLUMN {{ .SchemaName | default "public"}}.vm_messages.actor_code IS 'The CID of the actor that received the message.';
38+
COMMENT ON COLUMN {{ .SchemaName | default "public"}}.vm_messages.exit_code IS 'The exit code that was returned as a result of executing the message.';
39+
COMMENT ON COLUMN {{ .SchemaName | default "public"}}.vm_messages.gas_used IS 'A measure of the amount of resources (or units of gas) consumed, in order to execute a message.';
40+
COMMENT ON COLUMN {{ .SchemaName | default "public"}}.vm_messages.params IS 'Message parameters parsed and serialized as a JSON object.';
41+
COMMENT ON COLUMN {{ .SchemaName | default "public"}}.vm_messages.returns IS 'Result returned from executing a message parsed and serialized as a JSON object.';
42+
`,
43+
)
44+
}

0 commit comments

Comments
 (0)