合约事件推送

标签:java-sdk 事件订阅 Event


1. 功能简介

合约事件推送功能提供了合约事件的异步推送机制,客户端向节点发送注册请求,在请求中携带客户端关注的合约事件的参数,节点根据请求参数对请求区块范围的Event Log进行过滤,将结果分次推送给客户端。

2. 交互协议

客户端与节点的交互基于WebSocket协议。交互分为三个阶段:注册请求,节点回复,Event Log数据推送。

2.1 注册请求

客户端向节点发送事件推送的注册请求:

// request sample:
{
  "fromBlock": "latest",
  "toBlock": "latest",
  "addresses": [
    "0xca5ed56862869c25da0bdf186e634aac6c6361ee"
  ],
  "topics": [
    "0x91c95f04198617c60eaf2180fbca88fc192db379657df0e412a9f7dd4ebbe95d"
  ],
  "groupID": "group0",
  "filterID": "bb31e4ec086c48e18f21cb994e2e5967"
}
  • filerID:字符串类型,每次请求唯一,标记一次注册任务

  • groupID:字符串类型,群组ID

  • fromBlock:整形字符串,初始区块。“latest” 当前块高

  • toBlock:整形字符串,最终区块。“latest” 处理至当前块高时,继续等待新区块

  • addresses:字符串或者字符串数组:字符串表示单个合约地址,数组为多个合约地址,数组可以为空

  • topics:字符串类型或者数组类型:字符串表示单个topic,数组为多个topic,数组可以为空

2.2 节点回复

节点接受客户端注册请求时,会对请求参数进行校验,将是否成功接受该注册请求结果回复给客户端。

// response sample:
{
  "filterID": "bb31e4ec086c48e18f21cb994e2e5967",
  "result": 0
}
  • filterID:字符串类型,每次请求唯一,标记一次注册任务

  • result:整形,返回结果。0成功,其余为失败状态码

2.3 Event Log数据推送

节点验证客户端注册请求成功之后,根据客户端请求参数条件,向客户端推送Event Log数据。

// event log push sample:
{
  "filterID": "bb31e4ec086c48e18f21cb994e2e5967",
  "result": 0,
  "logs": [
    
  ]
}
  • filterID:字符串类型,每次请求唯一,标记一次注册任务

  • result:整形 0:Event Log数据推送 1:推送完成。客户端一次注册请求对应节点的数据推送会有多次(请求区块范围比较大或者等待新的区块),result字段为1时说明节点推送已经结束

  • logs:Log对象数组,result为0时有效

3. Java SDK 合约事件教程

注册接口

Java SDK中org.fisco.bcos.sdk.v3.eventsub.EventSubscribe类提供合约事件的注册接口,用户可以调用subscribeEvent向节点发送注册请求,并设置回调函数。

  public String subscribeEvent(EventSubParams params, EventSubCallback callback);

params注册参数

事件回调请求注册的参数:

public class EventSubParams {
    private BigInteger fromBlock;   
    private BigInteger toBlock;
    private List<String> addresses;
    private List<List<String>> topics;
}

callback回调对象

public interface EventSubCallback {
    void onReceiveLog(String eventSubId, int status, List<EventLog> logs);
}
  • status 回调返回状态:

    0       : 正常推送,此时logs为节点推送的事件日志
    1       : 推送完成,执行区间的区块都已经处理
    42000   : 其他错误 
    -41000  : 参数无效,客户端验证参数错误返回
    -41001  : 参数错误,节点验证参数错误返回
    -41002  : 群组不存在
    -41003  : 请求错误的区块区间
    -41004  : 节点推送数据格式错误
    -41005  : 请求发送超时
    -41006  : 客户端无订阅权限
    -41007  : 事件尚未注册,取消订阅失败
  • logs表示回调的Event Log对象列表,status为0有效。默认值null,可以在子类中通过org.fisco.bcos.sdk.v3.abi.ContractCodec解析以下EventLog对象的data字段。

  // EventLog 对象
  public class EventLog {
    private String logIndex;
    private String transactionIndex;
    private String transactionHash;
    private String blockNumber;
    private String address;
    private String data;
    private List<String> topics;
  }
  • 实现回调对象

Java SDK对回调类EventSubCallback无默认实现,用户可以通过继承EventSubCallback类,重写onReceiveLog接口,实现自己的回调逻辑处理。

class SubscribeCallback implements EventSubCallback {
    public void onReceiveLog(String eventSubId, int status, List<EventLog> logs) {
        // ADD CODE
    }
}

注意:onReceiveLog接口多次回调的logs有重复的可能性,可以根据EventLog对象中的blockNumber,transactionIndex,logIndex进行去重

topic工具

org.fisco.bcos.sdk.v3.codec.abi.TopicTools提供将各种类型参数转换为对应topic的工具,用户设置EventSubParamstopics参数可以使用。

 class TopicTools {
    // int1/uint1~uint1/uint256 
    public static String integerToTopic(BigInteger i)
    // bool
    public static String boolToTopic(boolean b)
    // address
    public static String addressToTopic(String s)
    // string
    public static String stringToTopic(String s)
    // bytes
    public static String bytesToTopic(byte[] b)
    // byte1~byte32
    public static String byteNToTopic(byte[] b)
}

4. 示例

这里以Asset合约的TransferEvent为例说明,给出合约事件推送的一些场景供用户参考。

contract Asset {
    event TransferEvent(int256 ret, string indexed from_account, string indexed to_account, uint256 indexed amount);
    event TransferAccountEvent(string,string);

    function transfer(string from_account, string to_account, uint256 amount) public returns(int256) {
        // 结果
        int result = 0;

        // 其他逻辑,省略

        // TransferEvent 保存结果以及接口参数
        TransferEvent(result, from_account, to_account, amount);

        // TransferAccountEvent 保存账号
        TransferAccountEvent(from_account, to_account);
    }
}
  • 场景1:将链上所有/最新的事件回调至客户端

        // 初始化EventSubscribe
        EventSubscribe eventSubscribe = EventSubscribe.build(group, configOption);
        eventSubscribe.start();
        
        // 参数设置
        EventSubParams params = new EventSubParams();

        // 全部Event fromBlock设置为 -1 
        params.setFromBlock(-1);

        // toBlock设置为-1,处理至最新区块继续等待新的区块
        params.setToBlock(-1);
   
        // 注册事件
        eventSubscribe.subscribeEvent(params, 
                (eventSubId, status, logs) -> {
                    System.out.println("event sub id: " + eventSubId);
                    System.out.println(" \t status: " + status);
                    System.out.println(" \t logs: " + logs);
                });
  • 场景2: 将Asset合约最新的TransferEvent事件回调至客户端

        // 初始化EventSubscribe
        EventSubscribe eventSubscribe = EventSubscribe.build(group, configOption);
        eventSubscribe.start();
        
        // 设置参数
        EventSubParams params = new EventSubParams();

        // 从订阅时的最新区块区块开始,fromBlock设置为-1
        params.setFromBlock(-1);
        // toBlock设置为-1,处理至最新区块继续等待新的区块
        params.setToBlock(-1);
        
        // topic0,TransferEvent(int256,string,string,uint256)
        params.addTopic(0, TopicTools.stringToTopic("TransferEvent(int256,string,string,uint256)");

        // 注册事件
        eventSubscribe.subscribeEvent(params, 
                (eventSubId, status, logs) -> {
                    System.out.println("event sub id: " + eventSubId);
                    System.out.println(" \t status: " + status);
                    System.out.println(" \t logs: " + logs);
                });
  • 场景3: 将指定地址的Asset合约最新的TransferEvent事件回调至客户端

合约地址: String addr = "0x06922a844c542df030a2a2be8f835892db99f324";

        // 初始化EventSubscribe
        EventSubscribe eventSubscribe = EventSubscribe.build(group, configOption);
        eventSubscribe.start();

        String addr = "0x06922a844c542df030a2a2be8f835892db99f324";
        
        // 设置参数
        EventSubParams params = new EventSubParams();

        // 从订阅时的最新区块区块开始,fromBlock设置为-1
        params.setFromBlock(-1);
        // toBlock设置为-1,处理至最新区块继续等待新的区块
        params.setToBlock(-1);

        // addresses设置为asset地址,匹配该合约地址
        params.addAddress("0x06922a844c542df030a2a2be8f835892db99f324");

        // topic0,TransferEvent(int256,string,string,uint256)
        params.addTopic(0, TopicTools.stringToTopic("TransferEvent(int256,string,string,uint256)");

        // 注册事件
        eventSubscribe.subscribeEvent(params, 
                (eventSubId, status, logs) -> {
                    System.out.println("event sub id: " + eventSubId);
                    System.out.println(" \t status: " + status);
                    System.out.println(" \t logs: " + logs);
                });
  • 场景4: 将指定地址的Asset合约所有TransferEvent事件回调至客户端

合约地址: String addr = "0x06922a844c542df030a2a2be8f835892db99f324";

        // 其他初始化逻辑,省略
        
        // 设置参数
        EventSubParams params = new EventSubParams();

        // 从最初区块开始,fromBlock设置为1
        params.setFromBlock(1);
        // toBlock设置为-1,处理至最新区块继续等待新的区块
        params.setToBlock(-1);

        // addresses设置为asset地址,匹配该合约地址
        params.addAddress("0x06922a844c542df030a2a2be8f835892db99f324");

        // topic0,TransferEvent(int256,string,string,uint256)
        params.addTopic(0, TopicTools.stringToTopic("TransferEvent(int256,string,string,uint256)");

        // 注册事件
        eventSubscribe.subscribeEvent(params, 
                (eventSubId, status, logs) -> {
                    System.out.println("event sub id: " + eventSubId);
                    System.out.println(" \t status: " + status);
                    System.out.println(" \t logs: " + logs);
                });

4. 解析例子

Asset合约为例,描述合约部署、调用、注册事件及解析节点推送事件内容的实现。注意:对增加了indexed属性的事件参数,均不进行解码,在相应位置上直接记录,其余非indexed属性的事件参数将进行解码。

        String contractAddress = "";
        try {
            AssembleTransactionProcessor manager =
                    TransactionProcessorFactory.createAssembleTransactionProcessor(
                            client, client.getCryptoSuite().createKeyPair(), abiFile, binFile);
            // deploy
            TransactionResponse response = manager.deployByContractLoader("Asset", Lists.newArrayList());
            if (!response.getTransactionReceipt().isStatusOK()) {
                return;
            }
            contractAddress = response.getContractAddress();
            // call function with event
            List<Object> paramsSetValues = new ArrayList<Object>();
            paramsSetValues.add("Alice");
            paramsSetValues.add("Bob");
            paramsSetValues.add(new BigInteger("100"));
            TransactionResponse transactionResponse =
                    manager.sendTransactionAndGetResponse(
                            contractAddress, abi, "transfer", paramsSetValues);
            logger.info("transaction response : " + JsonUtils.toJson(transactionResponse));
        } catch (Exception e) {
            logger.error("exception:", e);
        }
        
        // subscribe event
        EventSubscribe eventSubscribe = EventSubscribe.build(group, configOption);
        eventSubscribe.start();

        EventSubParams eventLogParams = new EventSubParams();
        eventLogParams.setFromBlock(-1);
        eventLogParams.setToBlock(-1);
        eventLogParams.addAddress(contractAddress);
        CryptoSuite invalidCryptoSuite =
                new CryptoSuite(client.getCryptoSuite().getCryptoTypeConfig());
        TopicTools topicTools = new TopicTools(invalidCryptoSuite);

        eventLogParams.setTopics(topics);
        eventLogParams.addTopic(0,topicTools.stringToTopic("TransferEvent(int256,string,string,uint256)"));
        eventLogParams.addTopic(0,topicTools.stringToTopic("TransferAccountEvent(string,string)"));

        class SubscribeCallback implements EventSubCallback {
            public transient Semaphore semaphore = new Semaphore(1, true);

            SubscribeCallback() {
                try {
                    semaphore.acquire(1);
                } catch (InterruptedException e) {
                    logger.error("error :", e);
                    Thread.currentThread().interrupt();
                }
            }

            @Override
            public void onReceiveLog(String eventId, int status, List<EventLog> logs) {
                String str = "status in onReceiveLog : " + status;
                logger.debug(str);
                semaphore.release();

                // decode event
                if (logs != null) {
                    for (EventLog log : logs) {
                        logger.debug(
                                " blockNumber: {}, txIndex:{}, data: {}",
                                log.getBlockNumber(),
                                log.getTransactionIndex(),
                                log.getData());
                        ContractCodec abiCodec = new ContractCodec(client.getCryptoSuite(), false);
                        try {
                            List<Object> list = abiCodec.decodeEvent(abi, "TransferEvent", log);
                            logger.debug("decode event log content, " + list);
                            // list = [0,
                            // 0x81376b9868b292a46a1c486d344e427a3088657fda629b5f4a647822d329cd6a,
                            // 0x28cac318a86c8a0a6a9156c2dba2c8c2363677ba0514ef616592d81557e679b6,
                            // 0x0000000000000000000000000000000000000000000000000000000000000064]
                            // 后三个事件参数均为indexed属性
                        } catch (ContractCodecException e) {
                            logger.error("decode event log error, " + e.getMessage());
                        }
                        try {
                            List<Object> list =
                                    abiCodec.decodeEvent(abi, "TransferAccountEvent", log);
                            logger.debug("decode event log content, " + list);
                            // list = [Alice, Bob]
                        } catch (ContractCodecException e) {
                            logger.error("decode event log error, " + e.getMessage());
                        }
                    }
                }
            }
        }

        SubscribeCallback subscribeEventCallback1 = new SubscribeCallback();
        String registerId =
                eventSubscribe.subscribeEvent(eventLogParams, subscribeEventCallback1);
        try {
            subscribeEventCallback1.semaphore.acquire(1);
            subscribeEventCallback1.semaphore.release();
            logger.info("subscribe successful, registerId is " + registerId);
        } catch (InterruptedException e) {
            logger.error("system error:", e);
            Thread.currentThread().interrupt();
        }