⏲️ 作业调度和回调#
反馈数据集#
注意
截至目前,FeedbackDataset
不提供作业调度支持。如果您想使用它,您需要使用其他数据集之一。要获取有关数据集差异的更多信息,您可以查看此处。
其他数据集#
本指南简要介绍了 Argilla 监听器。Argilla 监听器使您能够构建细粒度的复杂工作流程作为后台进程,就像与 Argilla 直接集成的低调作业调度替代方案。
主要目标是方便用户定义和自定义他们的 Argilla 体验。请注意,关于使用 small-text 进行主动学习的教程是监听器功能强大的一个很好的例子。或者,您可以查看Python 客户端以熟悉它。
此功能是实验性的,您可以预期 Python API 会有一些更改。请在 Github 上报告您遇到的任何问题。此外,Jupyter Notebooks 可能需要完全重启以确保所有后台进程都已正确停止。
安装依赖项#
为了使用监听器,您需要安装以下依赖项
%pip install argilla[listeners] -qqq
基础知识#
监听器是装饰器,它包装了您想要调度的函数。通过定义一个 query
,update_records
函数会获得两个变量:1) 我们从数据集和查询中获取的 records
,以及 2) 包含函数参数的 ctx
,例如 query
和 dataset
。
import argilla as rg
from argilla.listeners import listener
@listener(
dataset="my_dataset", # dataset to get record from
query="lucene query", # https://docs.v1.argilla.com.cn/en/latest/guides/query_datasets.html
execution_interval_in_seconds=3, # interval to check execution of `update_records`
)
def update_records(records, ctx):
# records get the records that adhere to the query
for rec in records:
# do something ,e.g., train a model, change records
rec.metadata = {"updated": True}
# ctx hold the listener info
name = ctx.__listener__.dataset
rg.log(name, records)
然后我们可以通过调用函数来启动监听器
update_records.start()
update_records.stop()
高级#
条件执行#
我们可以为预期记录数设置一个条件,以便在实际执行装饰函数之前需要满足该条件。
@listener(
dataset="my_dataset", # dataset to get record from
condition=lambda search: search.total == 10, # only executes if `query` results in 10 records
)
@listener(
dataset="my_dataset", # dataset to get record from
condition=lambda search: search.total > 10, # only executes if `query` results in more than 10 records
)
可更新的 query_params
#
在执行循环期间,可以更新和更改 query_params
,以便根据查询的输出进行灵活查询。
@listener(
dataset="uber-reviews", # dataset to get record from
query="metadata.batch_id:{batch_id}",
batch_id=0
)
def update_records(records, ctx):
# next iteration the query is executed with batch_id = 1
ctx.query_params["batch_id"] += 1
指标#
潜在的操作(如报告)可以基于 Argilla 提供的指标完成。
@listener(
dataset="my_dataset", # dataset to get record from
metrics=["F1"]
)
def update_records(records, ctx):
# next iteration the query is executed with batch_id = 1
print(ctx.metrics)
不加载记录#
有时我们可能只想监听,而无需直接加载和处理文档。
@listener(
dataset="my_dataset", # dataset to get record from
query_records=False
)
def update_records(ctx):
# Don`t load the records
pass