⏲️ 作业调度和回调#

反馈数据集#

注意

截至目前,FeedbackDataset 不提供作业调度支持。如果您想使用它,您需要使用其他数据集之一。要获取有关数据集差异的更多信息,您可以查看此处

其他数据集#

本指南简要介绍了 Argilla 监听器。Argilla 监听器使您能够构建细粒度的复杂工作流程作为后台进程,就像与 Argilla 直接集成的低调作业调度替代方案。

主要目标是方便用户定义和自定义他们的 Argilla 体验。请注意,关于使用 small-text 进行主动学习的教程是监听器功能强大的一个很好的例子。或者,您可以查看Python 客户端以熟悉它。

此功能是实验性的,您可以预期 Python API 会有一些更改。请在 Github 上报告您遇到的任何问题。此外,Jupyter Notebooks 可能需要完全重启以确保所有后台进程都已正确停止。

安装依赖项#

为了使用监听器,您需要安装以下依赖项

%pip install argilla[listeners] -qqq

基础知识#

监听器是装饰器,它包装了您想要调度的函数。通过定义一个 queryupdate_records 函数会获得两个变量:1) 我们从数据集和查询中获取的 records,以及 2) 包含函数参数的 ctx,例如 querydataset

import argilla as rg
from argilla.listeners import listener

@listener(
    dataset="my_dataset", # dataset to get record from
    query="lucene query", # https://docs.v1.argilla.com.cn/en/latest/guides/query_datasets.html
    execution_interval_in_seconds=3, # interval to check execution of `update_records`
)
def update_records(records, ctx):
    # records get the records that adhere to the query
    for rec in records:
        # do something ,e.g., train a model, change records
        rec.metadata = {"updated": True}

    # ctx hold the listener info
    name = ctx.__listener__.dataset
    rg.log(name, records)

然后我们可以通过调用函数来启动监听器

update_records.start()
update_records.stop()

高级#

条件执行#

我们可以为预期记录数设置一个条件,以便在实际执行装饰函数之前需要满足该条件。

@listener(
    dataset="my_dataset", # dataset to get record from
    condition=lambda search: search.total == 10, # only executes if `query` results in 10 records
)

@listener(
    dataset="my_dataset", # dataset to get record from
    condition=lambda search: search.total > 10, #  only executes if `query` results in more than 10 records
)

可更新的 query_params#

在执行循环期间,可以更新和更改 query_params,以便根据查询的输出进行灵活查询。

@listener(
    dataset="uber-reviews", # dataset to get record from
    query="metadata.batch_id:{batch_id}",
    batch_id=0
)
def update_records(records, ctx):
    # next iteration the query is executed with batch_id = 1
    ctx.query_params["batch_id"] += 1

指标#

潜在的操作(如报告)可以基于 Argilla 提供的指标完成。

@listener(
    dataset="my_dataset", # dataset to get record from
    metrics=["F1"]
)
def update_records(records, ctx):
    # next iteration the query is executed with batch_id = 1
    print(ctx.metrics)

不加载记录#

有时我们可能只想监听,而无需直接加载和处理文档。

@listener(
    dataset="my_dataset", # dataset to get record from
    query_records=False
)
def update_records(ctx):
    # Don`t load the records
    pass