Go-Job让你的任务调度不再繁琐|得物技术
目录
一、背景
二、架构设计
1. 整体架构
2. 服务端设计
2.1 服务组成
2.2 任务设计
2.2.1 任务生成
2.2.2 任务匹配
2.2.3 任务分片
2.2.4 任务生命周期
2.2.5 任务与执行器
3. SDK设计
3.1 任务执行
3.2 连接管理
3.2.1 建立连接
3.2.2 断线重连
三、实战指南
1. 代码开发
2. 触发器创建
3. 任务查看
四、成果与展望
一
背景
二
架构设计
整体架构
namespace:命名空间,用于资源隔离。
handler:任务处理类,用户自定义实现。
worker:运行handler的业务服务,通过SDK接入,与Go-Job服务端通信。
trigger:调度平台上的触发器,包含调度规则、模式、超时配置等。
task:根据调度规则生成的任务信息。
runInstance:任务执行的最小单元。
Web控制台:负责任务配置管理,包括任务创建/编辑、历史任务查看、权限控制等; 调度服务:负责触发器管理、任务生成、任务调度匹配等; 执行器:接收并执行任务,同时将自身状态上报给调度服务。
通过SDK注册实现脚本逻辑的函数; 通过在Web控制台创建定时任务; 通过调度服务负责任务的定时创建,并下发给匹配的执行器; 执行器执行接收到的任务,并上报任务状态。
(GPT理解的架构)
服务端设计
服务组成
Controller:服务集群的统一入口,提供RESTful API和RPC接口服务,所有任务的增删改查以及客户端SDK的连接操作都由Controller处理,并将相应的操作转发到其它模块。 提供Web控制台管理的REST API接口(包括命名空间管理、任务管理、数据查询等操作)。 提供SDK连接管理能力,负责维护客户端通过SDK接入的长连接状态。 作为请求控制入口,所有的请求都需要经过Controller来实现转发。 Trigger:调度服务的核心模块,通过监听触发器状态,定时创建任务,并推送到Matching服务用于任务匹配。 提供命名空间、触发器等数据的增删改查能力。 监听所有触发器,负责在规定时间到达时创建任务。 将生成的任务推送到Matching服务。 Matching:负责分配调度任务到接入客户端的节点上,通过接收Trigger推送的待匹配任务,基于匹配策略将其与接入的客户端节点进行匹配,并将匹配成功的任务和客户端进行绑定操作,推送完整的任务信息给SDK。
(GPT理解的服务关系)
任务设计
任务生成
时效性保证:整个任务触发队列的消费者支持水平扩容,通过监控生产/消费速率,能够灵活的通过增加消费节点的形式来增加消费者实例从而提高处理并发度,减少排队时间。 数据一致性: 单次触发事件的消费,保证在一个事务下进行,基于持久化的特性,支持失败重试,确保任务最终被触发一次。 健康检查: 定时检查已注册未停用的触发器清单,通过计算它们的调度规则结合它们最近一次触发的任务信息,综合确认触发器任务生成的健康状况,如果监测到未正常创建任务的触发器,则生产异常报告,并推送给运维人员。
任务匹配
queueManage: 职责:负责管理不同脚本的任务队列状态,包括队列的创建、删除等操作。通过管理不同的taskQueue来组织任务,根据任务的命名空间和脚本ID生成唯一索引,以确保不同脚本队列的唯一性。 taskQueue: 职责:存储等待调度的任务。调度器通过queueManage获取对应的taskQueue,从中获取当前脚本等待调度的任务。 内部子队列:
activeQ:基于触发时间排序的优先队列,触发时间与当前时间越接近的任务会优先被取出。 deadlineQ:已匹配或已超时的任务会移入此队列,并最终归档。
clientCache: 职责:维护所有客户端节点的请求状态。 功能:每个客户端的任务请求都会经历以下四个主要步骤:
入队 (inQueue):客户端请求进入等待队列。 等待 (wait):客户端在队列中等待任务分配。 超时/获取任务 (timeout/matched):在等待过程中,如果等待超时则进入超时处理状态,如果获取到任务则进入任务绑定状态。 返回 (return):任务匹配完毕后,客户端返回。
过滤 (filter): 职责:剔除状态为已超时、已匹配、已关闭等不可调度的任务,确保进入匹配阶段的任务都是可执行的。 匹配 (match): 职责:将经过过滤的任务和客户端进行匹配,基于标签 (label) 的匹配机制,只有双方标签一致的情况下,任务才能被分配给客户端。此机制目前用于支持染色环境、蓝绿部署等高级功能。
任务生成:新的任务生成后,进入对应的任务队列 (taskQueue) 中等待调度。 触发调度:当检测到新任务或新客户端接入时,触发一次匹配动作。 过滤和匹配: 调度器首先对任务进行过滤,剔除不可调度的任务。 过滤后的任务和客户端进入匹配阶段,基于标签 (label) 进行匹配。
任务分片
Task A:未设置分片,由单台pod执行,最终耗时为8分钟。 Task B:设置了4个分片,因此每个分片处理1/4的业务数据,每个子任务耗时2分钟。由于分片任务并发执行,最终耗时为2分钟。
func (w *HelloHandler) Do(ctx job.Context) error {
info := job.GetTaskInfo(ctx)
fmt.Printf("完整参数 :%s \n", info.GetParam())
fmt.Printf("分片参数 :%s \n", info.GetShardParam())
fmt.Printf("当前执行id :%s \n", info.GetRunInstanceId())
fmt.Printf("分片id :%d \n", info.GetShardNum())
fmt.Printf("任务id :%v \n", info.GetTaskId())
...
return nil
}
任务生命周期
创建 (Created):触发器被触发后,会创建一个新的任务,此时任务的初始状态即为Created状态。在此阶段,系统会创建任务并初始化相关数据,并推送到Matching服务等待动态。 等待执行 (WaitingToRun):任务在此状态表示已经和执行器进行了匹配,等待被执行器接收并开始执行。执行器通过拉取请求获得任务并创建runInstance。 执行中 (Running):任务正在被执行器处理。执行器上报任务开始事件,任务状态更新为“执行中”。 取消 (Canceled):任务在执行过程中可能因人为干预或系统策略被取消。任务被取消后,状态更新为“取消”。 完成 (RunToCompletion):任务成功执行并完成。执行器上报任务完成事件,任务状态更新为“完成”,并结束生命周期。 失败 (Faulted):任务执行过程中可能会失败。失败的任务状态更新为“失败”,系统可能会触发重试策略。
任务与执行器
独立队列:不同脚本的任务队列相互独立,保证任务互不干预。 动态扩展:执行器能够根据任务数量动态扩展或缩减节点数量,提高系统的灵活性和适应性。
任务数大于执行器数
任务数小于执行器数
假设有3个执行器,但脚本A的任务数少于3个,此时第三个执行器将处于等待任务的状态,直到有新的任务生成并匹配成功后才能继续执行任务。
(GPT理解的任务设计)
SDK设计
连接管理: 负责建立和维护与gRPC服务器的连接。该模块确保在连接断开时,能够及时检测并重新连接。 数据收发: 负责通过gRPC Stream双向流发送和接收数据,包括接收任务和将结果返回给服务端。 任务执行:负责异步执行任务对应Do函数方法。 健康检查: 通过定期发送心跳包来维持连接的活跃状态,并监控连接的健康状况。在检测到故障时,触发相应的处理机制,如重启模块或重新建立连接。 配置管理: 负责管理SDK依赖的各种配置信息,包括环境变量、label信息、服务端地址等。这些配置可以通过外部配置文件或动态更新。
任务执行
接收任务 (recvTask):从数据收发模块接收任务数据。 初始化任务上下文 (initTaskCtx):初始化任务处理所需的上下文信息(函数初始化、context初始化、超时设置等)。 执行任务 (Process Do):执行对应任务函数逻辑并返回执行结果。 任务完成 (finishTask):任务执行结束后的清理工作,包括对象销毁,结果上报等。
连接管理
建立连接
未连接状态: 客户端 (client) 和服务端 (service) 初始处于未连接状态。 发起连接请求: 客户端向服务端发起gRPC stream连接请求。 服务端接收连接请求。 连接成功通知: 服务端成功接收连接请求后,发送连接成功通知给客户端。 客户端接收到连接成功通知后,触发OnConnected事件。 客户端初始化: 在OnConnected事件中,客户端通过该连接注册相关的脚本信息。 服务端初始化: 服务端在连接建立后,开始监测心跳状态。 服务端注册client节点信息,准备处理任务。 数据交互: 双方完成连接初始化后,基于该链接开始进行数据交互,包含任务请求、下发、状态上报等操作。 断线重连
连接异常断开: 客户端和服务端检测到连接异常断开。 触发重连事件: 客户端触发Reconnecting事件,并开始尝试重新建立连接。 定时发起一次连接请求,直到连接成功。 服务端处理断开事件: 服务端检测到连接断开,触发OnDisconnected事件。 移除客户端注册的信息,清理相关资源。 重新建立连接: 客户端再次向服务端发起gRPC stream连接请求。 服务端接收连接请求并处理。 连接恢复: 服务端成功接收连接请求后,发送连接成功通知给客户端。 客户端接收到连接成功通知后,触发Reconnected事件。 重新初始化: 客户端在Reconnected事件中,重新注册相关的脚本。 服务端重新注册client节点信息,继续处理任务。 数据交互恢复: 链接重新建立后,基于该链接开始进行数据交互,包含任务请求、下发、状态上报等操作。
(GPT理解的SDK)
三
实战指南
代码开发
在这段旅程中,将通过简单的5个步骤来实现一个自定义脚本,轻松接入调度平台。
go-job-sdk
import "go-job-sdk/config"
Config struct {
JobConfig config.Config `yaml:"jobConfig"`
}
在Do接口内,实现业务逻辑
import (
"go-job-sdk/job"
)
type HelloHandler struct{}
func (w *HelloHandler) Do(ctx job.Context) error {
// 可以通过GetTaskInfo方法可以从上下文中获取任务的基本信息
info := job.GetTaskInfo(ctx)
fmt.Printf("参数 :%s \n", info.Param)
fmt.Printf("当前执行id :%s \n", info.RunInstanceId)
fmt.Printf("分片id :%d \n", info.ShardNum)
fmt.Printf("执行超时时间 :%v \n", info.RunTimeout)
fmt.Printf("任务id :%v \n", info.TaskId)
return nil
}
import (
...
"go-job-sdk/config"
"go-job-sdk/job"
"go-job-sdk/worker"
)
func main() {
group := worker.NewWorkerGroup(context.Background(), cfg.JobConfig)
group.Add("hello-world1", &HelloHandler{}) // key 服务内唯一
group.Add("hello-world2", &HelloHandler{}) // key 服务内唯一
// 启动: 连接并注册脚本到go-job
if err := group.Start(); err != nil {
fmt.Println(err)
return
}
// 该方法会block在这里. 若不需要block, 可以不调用wait方法
if err := group.Wait(); err != nil {
fmt.Println(err)
return
}
}
jobConfig:
app: "demo" # 脚本所在服务名称
disable: true # 是否停止job注册 默认:false
jobCenterService:
rpcAddress: "xx" # 调度平台服务地址
namespace: "test" # 命名空间
handlers:
hello-world1: # [脚本名称]关闭指定名称的脚本
disable: true
触发器创建
支持丰富配置,包括调度策略、重试次数、分片设置、执行超时、任务超时、上下线设置、参数设置、染色环境、告警配置等。
任务查看
控制台支持查看触发器的调度历史及任务执行详情。
四
成果与展望
任务监控与告警 可视化监控:开发实时监控仪表盘,提供任务执行状态、资源使用情况等关键信息的可视化展示。 高级告警配置:支持更多维度的告警配置,如任务执行时间过长、资源超限等,并提供多渠道通知方式(如邮件、短信、电话等)。 丰富的调度策略 多层级调度:支持基于业务优先级的多层级调度策略,确保关键任务优先执行。 节假日和工作日调度:增加如节假日、工作日等场景的调度策略,满足不同场景的调度需求。 安全与合规 权限管理:加强用户权限管理,细化权限控制,确保系统数据安全。
往期回顾
文 / fred
关注得物技术,每周一、三、五更新技术干货
要是觉得文章对你有帮助的话,欢迎评论转发点赞~
未经得物技术许可严禁转载,否则依法追究法律责任。
“
扫码添加小助手微信
如有任何疑问,或想要了解更多技术资讯,请添加小助手微信: