技术分享|Hangfire深度实践
源宝导读:Hangfire是一个开源的.NET任务调度框架,提供了内置集成化的控制台,可以直观明了的查看作业调度情况。本文将介绍Hangfire使用, ERP中接入Hangfire任务完成耗时操作场景以及Hangfire特性深度实践。
一、背景
在ERP中存在大量的耗时操作场景,例如售楼的房源生成,会同时生成上万个房间,用户可能会等待长达一分钟,并且不能离开当前页面。
基于此场景平台需要提供后台作业+实时消息通知机制,房间生成操作调整为后台作业,生成成功后推送站点通知告知用户。
在平台中还有一个调度服务需要周期性执行定时任务,例如发送邮件、短信等。
定时任务调度,是一个老生常谈的问题。网上有许多定时任务调度的解决方案,之前使用定时任务调度框架Quartz.Net,基于此框架开发调度服务。
在此基础上有很多功能都需要自己开发,而Hangfire自带后台任务调度面板,包含日志等一系列功能,也可以在后台手动执行任务。
在该篇文章中主要介绍一下什么是Hangfire,Hangfire的基本特征与优点和ERP中应用实践。
二、Hangfire介绍
Hangfire是一个开源的.NET任务调度框架,提供了内置集成化的控制台,可以直观明了的查看作业调度情况,并且Hangfire不需要依赖于单独的应用程序执行(如:Windows服务,Windows计划),并且支持持久性存储。
下图描述了Hangfire的主要架构:
从上图看,它是由:客户端、作业存储、服务端组成的。
后台作业由 Hangfire Server 处理。它实现一组专用(非线程池的)后台线程,用于从作业存储中取出作业并处理,服务端还负责自动删除旧数据以保持作业存储干净。
Hangfire为任何一个作业存储使用可靠的处理算法,因此您可以内置在Web应用程序中处理,而不会担心在应用程序重新启动,进程终止等情况下丢失后台作业。
使用示例
本示例使用ASP.NET Core宿主承载Hangfire。
新建ASP.NET Core空项目,然后通过在程序包管理控制台中输入安装命令安装Hangfire所需NuGet包:
Install-Package Hangfire.Core
Install-Package Hangfire.AspNetCore
使用SQL Server作为存储时需要安装的NuGet包:
Install-Package Hangfire.SqlServer
也可以通过快速编辑.csproj 文件中的PackageReference来快速引用:
<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.App" />
<PackageReference Include="Hangfire.Core" Version="1.7.24" />
<PackageReference Include="Hangfire.SqlServer" Version="1.7.24" />
<PackageReference Include="Hangfire.AspNetCore" Version="1.7.24" />
</ItemGroup>
在SQL Server数据库中创建一个HangfireDemo数据库,然后在程序appsettings.json中配置数据库连接:
{
"ConnectionStrings": {
"HangfireConnection": "Server=.\\SQL2016;Database=HangfireDemo;User ID=sa;Password=;"
},
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft": "Warning",
"Microsoft.Hosting.Lifetime": "Information"
}
},
"AllowedHosts": "*"
}
接着在项目的Startup.cs ConfigureServices方法注册Hangfire相关服务:
public void ConfigureServices(IServiceCollection services)
{
// Add Hangfire services.
services.AddHangfire(configuration => configuration
.SetDataCompatibilityLevel(CompatibilityLevel.Version_170)
.UseSimpleAssemblyNameTypeSerializer()
.UseRecommendedSerializerSettings()
.UseSqlServerStorage(Configuration.GetConnectionString("HangfireConnection"), new SqlServerStorageOptions
{
CommandBatchMaxTimeout = TimeSpan.FromMinutes(5),
SlidingInvisibilityTimeout = TimeSpan.FromMinutes(5),
QueuePollInterval = TimeSpan.Zero,
UseRecommendedIsolationLevel = true,
DisableGlobalLocks = true
}));
// Add the processing server as IHostedService
services.AddHangfireServer();
services.AddMvc();
}
在Configure 方法配置面板及路由:
public void Configure(IApplicationBuilder app, IWebHostEnvironment env, IBackgroundJobClient backgroundJobs)
{
if (env.IsDevelopment())
{
app.UseDeveloperExceptionPage();
}
app.UseRouting();
app.UseHangfireDashboard();
backgroundJobs.Enqueue(() => Console.WriteLine("Hello world from Hangfire!"));
app.UseEndpoints(endpoints =>
{
endpoints.MapGet("/", async context =>
{
await context.Response.WriteAsync("Hello World!");
});
});
}
运行程序,访问 http://localhost:5000/hangfire ,可以看到仪表盘。
控制台会输出 Hello world from Hangfire! ,在作业的完成中可以看到一条完成的记录。
然后在数据库中Hangfire会自动生成Hangfire.前缀的相关表,来持久化任务等信息。
Hangfire ERP应用实践
以房源生成为例
1.先定义一个后台作业计算需要的参数类型。
/// <summary>
/// 批量生成房源的参数
///</summary>
public class BatchGenerateRoomJobArgs
{
///<summary>
/// 所属楼栋
///</summary>
public Guid BldGUID { get; set; }
///<summary>
/// 房间个数
///</summary>
public int RoomNumber { get; set; }
}
作业参数不需要实现接口或继承基类,应该是可序列化的,因为该参数需要被序列化并存储到数据库中。
作业管理默认使用的是 JSON 序列化的方式。参数应该做到简洁,不应该包含实体或者其他非序列化的对象。正如示例所展示的,应该仅存储实体的Id并从仓储内得到该实体的作业。
2.然后实现作业执行器。
///<summary>
/// 批量生成房源后台作业执行器
///</summary>
[BackgroundJobName("slxt-generate-room", "批量生成房源", "0011")]
public class BatchGenerateRoomJob : IBackgroundJob
{
public override void Execute(BatchGenerateRoomJobArgs args)
{
// 产生房源的实现代码
}
}
作业执行器需要实现IBackgroundJob接口,实现Execute方法。
作业类型上需要BackgroundJobNameAttribute标记,在参数中说明作业的标识、名称和所属子系统。
作业上下文:
没有HTTP上下文:无法获取Http上下文对象,也可能无法使用Session之类的服务器对象。
数据库操作:作业执行器已经初始化了数据库连接上下文,如果启用了应用分库,也会将数据库切换到作业归属应用的数据库上下文中,因此,可以像普通代码一样使用CPQuery或实体服务来操作数据库。
注册后台作业
在应用初始化过程中,将作业执行器和参数注册到后台作业列表中。
internal static class AppInitializer
{
public static void Init()
{
BackgroundJobRegister.Add<BatchGenerateRoomJob, BatchGenerateRoomJobArgs>();
}
}
运行后台作业
继续以批量生成房源为例,示例代码如下:
///<summary>
/// 房源生成应用服务
///</summary>
public class RoomAppService : AppService
{
private readonly IBackgroundJobManager _backgroundJobManager = MyServiceLocator.GetService<ibackgroundjobmanager< a="">>();
///<summary>
/// 批量生成房源的异步作业
///<summary>
///<returns></returns>
public virtual void BatchGenerateRoom(Guid buildGuid, int roomNumber)
{
var batchGenerateRoomJobArgs = new BatchGenerateRoomJobArgs { BldGUID = buildGuid, RoomNumber = roomNumber };
// 执行后台作业
// 将作业参数加入后台作业队列中
_backgroundJobManager.Enqueue(batchGenerateRoomJobArgs);
}
}
将作业参数加入到执行队列中后,作业就会由Hangfire自动调度执行。
执行结果通知: 作业执行完成后,可以自行通知用户(例如邮件、消息等),也可以使用实时消息推送方案来发出用户通知。
异常处理: 作业在执行过程中,如果出现异常,会记录系统异常日志,如果是业务异常(BusinessLogicException),会停止执行并将任务标记为失败;其他类型的异常会自动重试,如果重试次数达到上线(10次),作业将标记为失败,不再执行。
Hangfire ERP深度实践
在ERP中深度集成了Hangfire,对于Hangfire 的一些默认信息以及特性做了一些改造和适配,下面一一介绍深度特性。
Hangfire替换默认重试机制
首先对于Hangfire增加过滤器,替换掉Hangfire的默认重试机制,使重试控制在自己的手中。
同时支持用户在自己的Job中打标控制任务的重试次数及间隔时间,使任务控制更加灵活和业务更贴合。
实现IJobFilterProvider 接口,实现获取Filter的方法
internal class HangfireIocJobFilterProvider : IJobFilterProvider
{
public IEnumerableGetFilters(Job job)
{
if (job == null)
return new List();
var jobFilters = JobFilterProviders.Providers.GetFilters(job).ToList();
// 使用ExceptionFilterAttribute替换默认的重试
jobFilters.RemoveAll(x => x.Instance is AutomaticRetryAttribute);
// 获取自定义的BackgroundJobAutomaticRetryAttribute的重试次数、重试间隔时间参数
// 此处Job为ExecutorJobAdapter,实际的Job在Adapter的第一个泛型参数中
var types = job.Type.GetGenericArguments();
var retryAttr = BackgroundJobAutomaticRetryAttribute.GetByJobType(types);
if (retryAttr == null)
jobFilters.Add(new JobFilter(new ExceptionFilterAttribute(), JobFilterScope.Type, null));
else
jobFilters.Add(new JobFilter(new ExceptionFilterAttribute(retryAttr.Attempts, retryAttr.DelaysInSeconds), JobFilterScope.Type, null));
return jobFilters;
}
}
然后将这个实现设置到BackgroundJobServer中:
private BackgroundJobServer CreateJobServer()
{
var jobOptions = new BackgroundJobServerOptions
{
FilterProvider = GetService(),
};
var server = new BackgroundJobServer(jobOptions, JobStorage.Current);
return server;
}
这样就替换掉默认重试机制 AutomaticRetryAttribute为ExceptionFilterAttribute。后续任务在失败时就会依据设置的间隔及间隔评论进行重试,对于未打标的Job还是执行默认的重试次数3次,间隔也是默认。
Hangfire任务通用参数
ERP中还有一个场景,针对每个任务创建的时候都需要参数应用、任务名称、任务描述。
这时就可以使用Hangfire的Filter,Filter的实现跟MVC的Action Filter是一样的,都是责任链模式。
创建的时候实现的是IClientFilter,消费执行时候如果要实现过滤需要实现IServerFilter。
以下实现IClientFilter,添加参数应用、任务名称、任务描述:
internal class JobAttachParameterFilterAttribute:JobFilterAttribute,IClientFilter
{
public void OnCreating(CreatingContext filterContext)
{
if(filterContext==null)
throw new ArgumentNullException(nameof(filterContext));
//这里由于调度任务中AsyncJobStartArguments实现了IBackgroundJobConfigurationGetter
//所以需要根据Args获取到配置信息
var context=filterContext.Job.Args.OfType<executecontext< a="">>().FirstOrDefault();
var jobType=filterContext.Job.Type.GetGenericArguments().FirstOrDefault();
if (context == null||jobType==null)
return;
var config = BackgroundJobContainer.Instance.GetJobConfigByJobTypeOrArg(context.GetArg(),jobType);
filterContext.SetJobParameter("Application", config.AppCode);
filterContext.SetJobParameter( "JobName", config.JobName);
filterContext.SetJobParameter( "JobDescription", config.JobDescription);
}
public void OnCreated(CreatedContext filterContext)
{
}
}
在注册Job时添加的信息或者实现IBackgroundJobConfigurationGetter 接口的信息就会被写入到Hangfire的Job中,这样避免重复传递参数,通过拦截根据上下文直接获取。
Hangfire实践经验分享
问题1:
产品在做并发测试时,大量插入了Hangfire Job,但是Hangfire Job插入很慢,导致任务大量累积,并不能做到高性能。
原因:
ERP 中Hangfire 默认使用SQL Server存储任务,这时默认会使用全局锁,在Hangfire入队的时候会导致入队是串行执行的。
解决方案:
使用Hangfire提供的特性,将全局锁改为使用局部锁。也即将DisableGlobalLocks、UsePageLocksOnDequeue、UseRecommendedIsolationLevel设为true。
部分调整代码如下:
JobStorage.Current = new SqlServerStorage(connectionInfo.MasterDb, new SqlServerStorageOptions
{
UseRecommendedIsolationLevel = true,
UsePageLocksOnDequeue = true,
DisableGlobalLocks = true
});
问题2:
ERP中有个数据分发任务,在分库场景中会自动分发组织和用户等数据,默认5s会分发一次。在主系统中新增一个组织后,发现在子系统无法引入这个组织,但是过一会又可以引入。
实际分发是成功,但是分发任务的时间远远大于5s。
原因:
分析任务执行日志,发现时间分发任务间隔为15s,而且一直都是15s执行一次。明明配置的5s执行分发一次,为何会变为15s?
打开Hangfire代码分析,发现Hangfire默认设置的等待时间为15s,如果调度器队列里面没有任务,默认等待15s,再进入队列取数据,从而导致配置的5s未生效而变为15s。
主要代码位于RecurringJobScheduler类中:
等待时间来源配置BackgroundJobServerOptions.SchedulePollingInterval 属性,默认代码中为15s
解决方案:
在BackgroundJobServerOptions 设置属性调整为3s,这样可以保证5s周期的任务可以正确执行。对于更短周期,可以将此属性调整为0,即无任务时不做等待。
部分调整代码如下:
var jobOptions = new BackgroundJobServerOptions
{
StopTimeout = TimeSpan.FromSeconds(15),
ShutdownTimeout = TimeSpan.FromSeconds(30),
SchedulePollingInterval = TimeSpan.FromSeconds(3),
WorkerCount = 40,
ServerName = GetHangfireServiceName()
};
其中参数说明:
ServerName:由于Hangfire是一个Storage,Client,Server结构的程序,该字段配置一个进程里面多个Server实例的服务名称;
WorkerCount:初始化工作线程数,可以最多同时执行任务个数;
Queues:Hangfire可以设置工作队列,如果想设置工作的优先级或者根据服务拆分任务,可以使用Queues;
StopTimeout:设置该参数之后,如果遇到了服务器的关机事件,Hangfire继续在设置的时间内执行任务,而不是立刻中断;
ShutdownTimeout:配置心跳检查进程的取消的超时时间;
SchedulePollingInterval:任务调度轮询间隔,Hangfire使用轮询的机制对队列中的Job进行处理,如果队列里面没有Job,Hangfire会默认等待15秒,然后再去队列取数。
可以根据实际场景灵活调整。
Hangfire最佳实践
Job方法的参数要简单
方法调用(即作业)在Hangfire创建作业过程中参数会使用JSON序列化转换为 JSON 字符串。
如果传递的是复杂实体或大型对象,包括阵列,最好将其放入数据库,然后传递主键等关键信息到Job方法。
以下示例,最佳实践:
public void Method(int entityId) { }
不建议做法:
public void Method(Entity entity) { }
方法可重复执行
重复执行意味着方法可以在执行过程中间中断,然后再次安全调用。中断可能由许多不同的情况(例如,服务器关闭等)引起,Hangfire 将尝试进行多次重试处理。
如果你的任务不支持重复执行,可能会存在很多问题。例如,使用Job发送电子邮件,多次重试会导致,发送给同一收件人多封电子邮件。
例如发送邮件示例代码,最佳实践:
public void Method(int deliveryId)
{
if (_emailService.IsNotDelivered(deliveryId))
{
_emailService.Send("person@example.com", "Hello!");
_emailService.SetDelivered(deliveryId);
}
}
不建议做法:
public void Method()
{
_emailService.Send("person@example.com", "Hello!");
}
三、总结
Hangfire 提供多种多样的特性,满足灵活的配置,而且拥有直观的面板,可以方便查看Job信息及异常信息,而且还可以在面板中手动执行任务,Hangfire 是一个优秀的任务调度框架。
ERP 接入Hangfire,提供后台任务的简单调用方式,对于后台执行任务及长耗时作业提供了强大的支持,使ERP支持更加丰富完善,具备更高性能。
张同学: 研发工程师,目前负责建模平台相关研发工作。