
Show Me the Code | How to Write a Good Mesos Framework

2017-03-27 敏国, 数人云 (Dataman Cloud)

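"Scheduling" has come up a lot over the past two years. Using a scheduler to allocate resources and manage application life cycles has made operations far more convenient. Swan, the Mesos scheduler open-sourced by 数人云 (Dataman Cloud), is an application scheduling framework built on Mesos. It helps users publish applications easily, perform rolling updates, and run health checks and failover according to user-specified policies.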


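Teaching someone to fish beats handing them a fish, so here 小数 brings you a code-level article from one of our engineers: a thorough look at how to write a Mesos Framework.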


Running Tasks



When we receive an offer from Mesos, we can either accept it or decline it. To run tasks, accept the offer; otherwise, decline it. Sample code:

  

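func (s *Scheduler) ReceivedOffers(offers []*mesos.Offer) {
    for _, offer := range offers {
        cpus, mem, disk := OfferedResources(offer)
        var tasks []*mesos.TaskInfo

        // Keep adding tasks while work remains and the offer still has room.
        for taskLaunched < tasksCount &&
            cpus >= NeededCpus &&
            mem >= NeededMem &&
            disk >= NeededDisk {
            task, err := BuildTask(offer)
            if err != nil {
                // This function returns nothing, so log and stop filling this offer.
                logrus.Errorf("Build task failed: %s", err.Error())
                break
            }

            taskInfo := BuildTaskInfo(offer, task)
            tasks = append(tasks, taskInfo)

            // Deduct what this task consumes, using the same constants
            // that the loop condition checks.
            taskLaunched++
            cpus -= NeededCpus
            mem -= NeededMem
            disk -= NeededDisk
        }

        if _, err := s.LaunchTasks(offer, tasks); err != nil {
            logrus.Errorf("Launch tasks failed: %s", err.Error())
        }
    }
}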
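ReceivedOffers relies on an OfferedResources helper that the article does not show. The following is a minimal sketch of how such a helper might sum the scalar resources in an offer. The Resource and Offer types here are simplified, hypothetical stand-ins for the real mesos.Resource and mesos.Offer messages, introduced only so the example compiles on its own.

```go
package main

import "fmt"

// Resource is a simplified, hypothetical stand-in for mesos.Resource:
// a resource name plus a scalar value.
type Resource struct {
	Name   string
	Scalar float64
}

// Offer is a simplified, hypothetical stand-in for mesos.Offer.
type Offer struct {
	ID        string
	Resources []Resource
}

// OfferedResources sums the scalar cpus, mem and disk resources in an offer.
// The same resource name can appear in several entries, so values are
// accumulated rather than overwritten. Other resource names are ignored.
func OfferedResources(offer *Offer) (cpus, mem, disk float64) {
	for _, r := range offer.Resources {
		switch r.Name {
		case "cpus":
			cpus += r.Scalar
		case "mem":
			mem += r.Scalar
		case "disk":
			disk += r.Scalar
		}
	}
	return cpus, mem, disk
}

func main() {
	offer := &Offer{ID: "offer-1", Resources: []Resource{
		{Name: "cpus", Scalar: 2},
		{Name: "mem", Scalar: 1024},
		{Name: "cpus", Scalar: 2},
		{Name: "disk", Scalar: 4096},
		{Name: "ports", Scalar: 0},
	}}
	cpus, mem, disk := OfferedResources(offer)
	fmt.Println(cpus, mem, disk)
}
```

Accumulating instead of overwriting matters when an offer carries more than one entry for the same resource name.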

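OfferedResources(offer) computes the resources the offer provides, namely the amount of CPU, memory and disk. BuildTaskInfo builds a TaskInfo struct that Mesos understands, and LaunchTasks sends Mesos the command to create the tasks. Launching tasks is in fact the act of accepting a Mesos offer. Sample code: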

 

func (s *Scheduler) LaunchTasks(offer *mesos.Offer, tasks []*mesos.TaskInfo) (*http.Response, error) {
        logrus.Infof("Launch %d tasks with offer %s", len(tasks), offer.GetId().GetValue())

        // Accepting the offer and launching tasks on it are a single ACCEPT call.
        call := &sched.Call{
                FrameworkId: s.framework.GetId(),
                Type:        sched.Call_ACCEPT.Enum(),
                Accept: &sched.Call_Accept{
                        OfferIds: []*mesos.OfferID{
                                offer.GetId(),
                        },
                        Operations: []*mesos.Offer_Operation{
                                {
                                        Type: mesos.Offer_Operation_LAUNCH.Enum(),
                                        Launch: &mesos.Offer_Operation_Launch{
                                                TaskInfos: tasks,
                                        },
                                },
                        },
                        // Treat the unused part of this offer as refused for 1 second.
                        Filters: &mesos.Filters{RefuseSeconds: proto.Float64(1)},
                },
        }

        // The return type matches what KillTask returns from the same s.send helper.
        return s.send(call)
}

 

After that, if the tasks were created successfully, they should show up in the Mesos UI.
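The section says an unwanted offer should be declined but only shows the accept path. As a rough sketch, the body of a DECLINE call for the Mesos v1 scheduler HTTP API could be built as below. The struct and function names (declineCall, BuildDeclineCall) are hypothetical and exist only for this example; Swan itself builds calls from the generated sched.Call types, as in the snippets above.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// value wraps an ID the way the Mesos JSON API does: {"value": "..."}.
type value struct {
	Value string `json:"value"`
}

type filters struct {
	RefuseSeconds float64 `json:"refuse_seconds"`
}

type decline struct {
	OfferIDs []value `json:"offer_ids"`
	Filters  filters `json:"filters"`
}

// declineCall is a hand-written, hypothetical mirror of the DECLINE call body.
type declineCall struct {
	FrameworkID value   `json:"framework_id"`
	Type        string  `json:"type"`
	Decline     decline `json:"decline"`
}

// BuildDeclineCall returns the JSON body that declines one offer and asks
// Mesos not to re-offer those resources for refuseSeconds.
func BuildDeclineCall(frameworkID, offerID string, refuseSeconds float64) ([]byte, error) {
	call := declineCall{
		FrameworkID: value{Value: frameworkID},
		Type:        "DECLINE",
		Decline: decline{
			OfferIDs: []value{{Value: offerID}},
			Filters:  filters{RefuseSeconds: refuseSeconds},
		},
	}
	return json.Marshal(call)
}

func main() {
	body, err := BuildDeclineCall("framework-1", "offer-1", 5)
	if err != nil {
		fmt.Println("marshal failed:", err)
		return
	}
	fmt.Println(string(body))
}
```

A longer refuse_seconds value keeps Mesos from re-sending resources the framework has no use for right now, at the cost of seeing them again later.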



Task Status Updates



Task status is updated through Mesos statusUpdate events. Mesos task states fall roughly into the following:


TaskState_TASK_STAGING          TaskState = 6
TaskState_TASK_STARTING         TaskState = 0
TaskState_TASK_RUNNING          TaskState = 1
TaskState_TASK_KILLING          TaskState = 8
TaskState_TASK_FINISHED         TaskState = 2
TaskState_TASK_FAILED           TaskState = 3
TaskState_TASK_KILLED           TaskState = 4
TaskState_TASK_ERROR            TaskState = 7
TaskState_TASK_LOST             TaskState = 5
TaskState_TASK_DROPPED          TaskState = 9
TaskState_TASK_UNREACHABLE      TaskState = 10
TaskState_TASK_GONE             TaskState = 11
TaskState_TASK_GONE_BY_OPERATOR TaskState = 12
TaskState_TASK_UNKNOWN          TaskState = 13

 


Sample code for handling a status update:

 

func (s *Scheduler) status(status *mesos.TaskStatus) {
    state := status.GetState()
    taskId := status.TaskId.GetValue()

    // Log the transition; this also keeps taskId from being an unused variable.
    logrus.Infof("Task %s is now %s", taskId, state.String())

    // doSomething is a placeholder for the real per-state handling.
    switch state {
    case mesos.TaskState_TASK_STAGING:
        doSomething()
    case mesos.TaskState_TASK_STARTING:
        doSomething()
    case mesos.TaskState_TASK_RUNNING:
        doSomething()
    case mesos.TaskState_TASK_FINISHED:
        doSomething()
    case mesos.TaskState_TASK_FAILED:
        doSomething()
    case mesos.TaskState_TASK_KILLED:
        doSomething()
    case mesos.TaskState_TASK_LOST:
        doSomething()
    }
}

                        

 

The above is only sample code; for the actual handling, see https://github.com/Dataman-Cloud/swan.
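One thing a status handler commonly has to decide is whether a task is finished for good and whether it should be rescheduled. The sketch below shows one way to express that. TaskState here is a local stand-in that reuses the numeric values listed above so the example compiles without the Mesos bindings. Both the grouping of states in IsTerminal and the NeedsReschedule policy are assumptions of this example, not Swan's actual logic.

```go
package main

import "fmt"

// TaskState is a local stand-in for mesos.TaskState, using the values listed above.
type TaskState int32

const (
	TaskStarting       TaskState = 0
	TaskRunning        TaskState = 1
	TaskFinished       TaskState = 2
	TaskFailed         TaskState = 3
	TaskKilled         TaskState = 4
	TaskLost           TaskState = 5
	TaskStaging        TaskState = 6
	TaskError          TaskState = 7
	TaskKilling        TaskState = 8
	TaskDropped        TaskState = 9
	TaskUnreachable    TaskState = 10
	TaskGone           TaskState = 11
	TaskGoneByOperator TaskState = 12
	TaskUnknown        TaskState = 13
)

// IsTerminal reports whether this sketch treats the state as final,
// meaning the task is not expected to change state again.
func IsTerminal(s TaskState) bool {
	switch s {
	case TaskFinished, TaskFailed, TaskKilled, TaskError,
		TaskLost, TaskDropped, TaskGone, TaskGoneByOperator:
		return true
	}
	return false
}

// NeedsReschedule is a hypothetical failover policy: restart any task that
// ended without finishing normally or being killed on request.
func NeedsReschedule(s TaskState) bool {
	return IsTerminal(s) && s != TaskFinished && s != TaskKilled
}

func main() {
	for _, s := range []TaskState{TaskRunning, TaskFinished, TaskFailed, TaskUnreachable} {
		fmt.Printf("state=%d terminal=%v reschedule=%v\n", s, IsTerminal(s), NeedsReschedule(s))
	}
}
```

Keeping the classification in small pure functions lets the switch in the handler stay short and makes the policy easy to test.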




Killing Tasks


A task is killed by sending Mesos a message of type Call_KILL that names the ID of the task to kill. Sample code:

 

func (s *Scheduler) KillTask(task *types.Task) (*http.Response, error) {
        call := &sched.Call{
                FrameworkId: s.framework.GetId(),
                Type:        sched.Call_KILL.Enum(),
                Kill: &sched.Call_Kill{
                        TaskId: &mesos.TaskID{
                                Value: proto.String(task.ID),
                        },
                        AgentId: &mesos.AgentID{
                                Value: task.AgentId,
                        },
                },
        }

        // Check for nil before reading KillPolicy; without a policy the call
        // falls back to the default grace period.
        if task.KillPolicy != nil && task.KillPolicy.Duration != 0 {
                // DurationInfo is expressed in nanoseconds. The factor is kept from
                // the original and is only correct if Duration is in milliseconds.
                duration := proto.Int64(task.KillPolicy.Duration * 1000 * 1000)
                call.Kill.KillPolicy = &mesos.KillPolicy{
                        GracePeriod: &mesos.DurationInfo{
                                Nanoseconds: duration,
                        },
                }
        }

        return s.send(call)
}

 

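Here Type specifies the message type, FrameworkId is the ID under which the current Framework is registered with Mesos, task.ID is the ID of the task to kill, and task.AgentId is the ID of the agent the task is running on.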

 

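killPolicy is a custom policy for graceful termination. It specifies the graceful-termination timeout, duration: Mesos first sends the task a SIGTERM so that it has time to clean up, and if the task has not exited once that period has elapsed, Mesos sends SIGKILL to force-kill it. The period is set by duration; the Mesos default is 3 seconds.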
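Because the grace period ends up in a nanosecond field, unit mistakes are easy to make when multiplying by hand. One possible way to avoid them, assuming the policy can be held as a Go time.Duration (an assumption; the article does not state the unit of its duration field), is to let the standard library do the conversion. GracePeriodNanos is a hypothetical helper name.

```go
package main

import (
	"fmt"
	"time"
)

// GracePeriodNanos converts a grace period into the nanosecond count that a
// DurationInfo-style field expects. ok is false when no grace period should
// be set, so the caller can leave the kill policy empty.
func GracePeriodNanos(d time.Duration) (nanos int64, ok bool) {
	if d <= 0 {
		return 0, false
	}
	return d.Nanoseconds(), true
}

func main() {
	if n, ok := GracePeriodNanos(3 * time.Second); ok {
		fmt.Println("grace period in nanoseconds:", n)
	}
	if _, ok := GracePeriodNanos(0); !ok {
		fmt.Println("no grace period configured, kill policy left unset")
	}
}
```

With this shape the caller writes the period in readable units such as 3 * time.Second and never handles the scale factor directly.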



Reconnecting to Mesos


The Framework talks to Mesos over a long-lived connection. In some situations the connection can fail, and the Framework then has to reconnect to Mesos. Sample code:

 

func (s *Scheduler) subscribe() error {
    logrus.Infof("Subscribe with mesos master %s", s.Master)

    call := &sched.Call{
        Type: sched.Call_SUBSCRIBE.Enum(),
        Subscribe: &sched.Call_Subscribe{
            FrameworkInfo: s.framework,
        },
    }

    // If we already hold a framework ID, send it so that Mesos treats this
    // as a re-subscription of the same framework.
    if s.framework.Id != nil {
        call.FrameworkId = &mesos.FrameworkID{
            Value: proto.String(s.framework.Id.GetValue()),
        }
    }

    resp, err := s.send(call)
    if err != nil {
        return err
    }

    // A successful SUBSCRIBE answers 200 OK and keeps the response body open
    // as the event stream.
    if resp.StatusCode != http.StatusOK {
        return fmt.Errorf("Subscribe with unexpected response status: %d", resp.StatusCode)
    }

    // Consume events in the background; handleEvents closes the body.
    go s.handleEvents(resp)

    return nil
}

 

func (s *Scheduler) resubscribe() {
    for {
        logrus.Warn("Connection to mesos got error, reconnect")

        err := s.subscribe()
        if err == nil {
            return
        }
        logrus.Errorf("%s", err.Error())

        // Wait 2 seconds before the next attempt.
        time.Sleep(2 * time.Second)
    }
}

 

func (s *Scheduler) handleEvents(resp *http.Response) {
    defer resp.Body.Close()

    r := NewReader(resp.Body)
    dec := json.NewDecoder(r)

    for {
        event := new(sched.Event)
        if err := dec.Decode(event); err != nil {
            // Decoding failed, so treat the connection as broken and reconnect.
            go s.resubscribe()
            return
        }

        // doSomeWork is a placeholder for the real per-event handling.
        switch event.GetType() {
        case sched.Event_SUBSCRIBED:
            doSomeWork()
        case sched.Event_OFFERS:
            doSomeWork()
        case sched.Event_RESCIND:
            doSomeWork()
        case sched.Event_UPDATE:
            doSomeWork()
        case sched.Event_MESSAGE:
            doSomeWork()
        case sched.Event_FAILURE:
            doSomeWork()
        case sched.Event_ERROR:
            doSomeWork()
        case sched.Event_HEARTBEAT:
            doSomeWork()
        }
    }
}
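handleEvents wraps the response body in a reader before decoding. The Mesos scheduler HTTP API streams events in RecordIO format, where each record is its length in bytes as a decimal number, a newline, and then that many bytes of payload. The sketch below shows one way such a reader might work. RecordReader, frame and readAll are hypothetical names for this example, and this is not the NewReader used in the article's code.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strconv"
	"strings"
)

// RecordReader reads RecordIO-framed messages from a stream.
type RecordReader struct {
	r *bufio.Reader
}

func NewRecordReader(r io.Reader) *RecordReader {
	return &RecordReader{r: bufio.NewReader(r)}
}

// Next returns the payload of the next record. It returns io.EOF when the
// stream ends, and another error if the length line is malformed or the
// payload is cut short.
func (rr *RecordReader) Next() ([]byte, error) {
	line, err := rr.r.ReadString('\n')
	if err != nil {
		return nil, err
	}
	size, err := strconv.ParseUint(strings.TrimSpace(line), 10, 64)
	if err != nil {
		return nil, fmt.Errorf("invalid record length %q: %v", line, err)
	}
	buf := make([]byte, size)
	if _, err := io.ReadFull(rr.r, buf); err != nil {
		return nil, err
	}
	return buf, nil
}

// frame encodes one payload as a RecordIO record.
func frame(payload string) string {
	return strconv.Itoa(len(payload)) + "\n" + payload
}

// readAll decodes every record in stream until the stream ends.
func readAll(stream string) ([]string, error) {
	rr := NewRecordReader(strings.NewReader(stream))
	var out []string
	for {
		rec, err := rr.Next()
		if err == io.EOF {
			return out, nil
		}
		if err != nil {
			return out, err
		}
		out = append(out, string(rec))
	}
}

func main() {
	stream := frame(`{"type":"HEARTBEAT"}`) + frame(`{"type":"OFFERS"}`)
	records, err := readAll(stream)
	fmt.Println(records, err)
}
```

Each payload returned by Next is one complete JSON event, so it can be handed to json.Unmarshal without worrying about where one event ends and the next begins.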

 

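The resubscribe function re-registers with Mesos. If registration fails, it retries every 2 seconds until the connection succeeds. Once registered, event handling continues in a new goroutine. For details on reconnecting, see the Disconnections section of the Mesos documentation.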
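The loop above waits a fixed 2 seconds between attempts. A possible variation, shown here only as a sketch, is capped exponential backoff, which retries quickly at first and then slows down if the master stays unreachable. This swaps the article's fixed delay for a different technique. Backoff, RetryWithBackoff, errNotConnected and noSleep are hypothetical names, and the sleep function is injected so the example runs instantly.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errNotConnected and noSleep exist only to drive the demonstration.
var errNotConnected = errors.New("not connected")
var noSleep = func(time.Duration) {}

// Backoff returns the wait before retry number attempt (starting at 0):
// base doubled once per previous attempt, never exceeding max.
func Backoff(attempt int, base, max time.Duration) time.Duration {
	d := base
	for i := 0; i < attempt; i++ {
		d *= 2
		if d >= max {
			return max
		}
	}
	if d > max {
		return max
	}
	return d
}

// RetryWithBackoff calls fn until it returns nil, sleeping for a growing,
// capped delay after each failure. It returns the number of calls made.
func RetryWithBackoff(fn func() error, base, max time.Duration, sleep func(time.Duration)) int {
	for attempt := 0; ; attempt++ {
		if err := fn(); err == nil {
			return attempt + 1
		}
		sleep(Backoff(attempt, base, max))
	}
}

func main() {
	failures := 3
	subscribe := func() error { // stand-in for s.subscribe()
		if failures > 0 {
			failures--
			return errNotConnected
		}
		return nil
	}
	logSleep := func(d time.Duration) { fmt.Println("would wait", d) }
	calls := RetryWithBackoff(subscribe, 2*time.Second, 30*time.Second, logSleep)
	fmt.Println("subscribed after", calls, "calls")
}
```

In a real scheduler the sleep argument would be time.Sleep and fn would be the subscribe method; the cap keeps the retry interval bounded during a long outage.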


That wraps up this five-step Mesos Framework tutorial. For more code, head to https://github.com/Dataman-Cloud/swan. Stars and forks are welcome.



