查看原文
其他

一文读懂 SuperEdge 拓扑算法

杜杨浩,腾讯云高级工程师,热衷于开源、容器和Kubernetes。目前主要从事镜像仓库、Kubernetes集群高可用&备份还原,以及边缘计算相关研发工作。

前言

SuperEdge 介绍

SuperEdge 是基于原生 Kubernetes 的边缘容器管理系统。该系统把云原生能力扩展到边缘侧,很好的实现了云端对边缘端的管理和控制。同时 superedge 自研了 service group 实现了基于边缘计算的服务访问控制,极大简化了应用从云端部署到边缘端的过程。

SuperEdge service group拓扑感知特性

SuperEdge service group 利用 application-grid-wrapper 实现拓扑感知,完成了同一个 nodeunit 内服务的闭环访问。

在深入分析 application-grid-wrapper 之前,这里先简单介绍一下社区 Kubernetes 原生支持的拓扑感知特性[1]

Kubernetes service topology awareness 特性于v1.17发布 alpha 版本,用于实现路由拓扑以及就近访问特性。用户需要在service 中添加 topologyKeys 字段标示拓扑key类型,只有具有相同拓扑域的 endpoint 会被访问到,目前有三种 topologyKeys 可供选择:

  • "kubernetes.io/hostname":访问本节点内(kubernetes.io/hostname label value相同)的endpoint,如果没有则service访问失败
  • "topology.kubernetes.io/zone":访问相同zone域内(topology.kubernetes.io/zone label value相同)的endpoint,如果没有则service访问失败
  • "topology.kubernetes.io/region":访问相同region域内(topology.kubernetes.io/region label value 相同)的 endpoint,如果没有则 service 访问失败

除了单独填写如上某一个拓扑key之外,还可以将这些key构造成列表进行填写,例如:["kubernetes.io/hostname", "topology.kubernetes.io/zone", "topology.kubernetes.io/region"],这表示:优先访问本节点内的 endpoint;如果不存在,则访问同一个 zone 内的 endpoint;如果再不存在,则访问同一个 region 内的 endpoint,如果都不存在则访问失败。

另外,还可以在列表最后(只能最后一项)添加"*"表示:如果前面拓扑域都失败,则访问任何有效的 endpoint,也即没有限制拓扑了,示例如下:
# A Service that prefers node local, zonal, then regional endpoints but falls back to cluster wide endpoints.
apiVersion: v1
kind: Service
metadata:  
  name: my-service
spec:  
  selector:    
    app: my-app  
  ports:    
    - protocol: TCP      
      port: 80      
      targetPort: 9376  
  topologyKeys:    
    - "kubernetes.io/hostname"
    - "topology.kubernetes.io/zone" 
    - "topology.kubernetes.io/region"
    - "*"

而service group 实现的拓扑感知和社区对比,有如下区别:

  • service group 拓扑 key 可以自定义,也即为 gridUniqKey,使用起来更加灵活;而社区实现目前只有三种选择:"kubernetes.io/hostname","topology.kubernetes.io/zone"以及"topology.kubernetes.io/region"。
  • service group 只能填写一个拓扑 key,也即只能访问本拓扑域内有效的 endpoint,无法访问其它拓扑域的 endpoint;而社区可以通过 topologyKey 列表以及"*"实现其它备选拓扑域 endpoint 的访问。

service group 实现的拓扑感知,service 配置如下:

# A Service that only prefers node zone1al endpoints.
apiVersion: v1
kind: Service
metadata:  
  annotations:    
    topologyKeys: '["zone1"]'  
  labels:    
    superedge.io/grid-selector: servicegrid-demo  
  name: servicegrid-demo-svc
spec:  
  ports:  
  - port: 80    
    protocol: TCP    
    targetPort: 8080  
  selector:    
    appGrid: echo

在介绍完 service group 实现的拓扑感知后,我们深入到源码分析实现细节。同样的,这里以一个使用示例开始分析:

# step1: labels edge nodes
$ kubectl  get nodes
NAME    STATUS   ROLES    AGE   VERSIO
Nnode0   Ready    <none>   16d   v1.16.7
node1    Ready    <none>   16d   v1.16.7
node2    Ready    <none>   16d   v1.16.7
# nodeunit1(nodegroup and servicegroup zone1)
$ kubectl --kubeconfig config label nodes node0 zone1=nodeunit1  
# nodeunit2(nodegroup and servicegroup zone1)
$ kubectl --kubeconfig config label nodes node1 zone1=nodeunit2
$ kubectl --kubeconfig config label nodes node2 zone1=nodeunit2

...

# step3: deploy echo ServiceGrid
$ cat <<EOF | kubectl --kubeconfig config apply -f -
apiVersion: superedge.io/v1
kind: ServiceGrid
metadata:  
  name: servicegrid-demo  
  namespace: default
spec:  
  gridUniqKey: zone1  
  template:    
    selector:      
      appGrid: echo    
    ports:    
    - protocol: TCP      
      port: 80      
      targetPort: 8080
EOF
servicegrid.superedge.io/servicegrid-demo created
# note that there is only one relevant service generated
$ kubectl  get svc
NAME              TYPE        CLUSTER-IP        EXTERNAL-IP   PORT(S)   AGE
kubernetes          ClusterIP   192.168.0.1       <none>        443/TCP   16d
servicegrid-demo-svc   ClusterIP   192.168.6.139     <none>        80/TCP    10m
    
# step4: access servicegrid-demo-svc(service topology and closed-looped)
# execute on node0
$ curl 192.168.6.139|grep "node name"        node name:      node0
# execute on node1 and node2
$ curl 192.168.6.139|grep "node name" 
       node name:      node2
$ curl 192.168.6.139|grep "node name"
       node name:      node1

在创建完 ServiceGrid CR 后,ServiceGrid Controller 负责根据 ServiceGrid产生对应的 service (包含由 serviceGrid.Spec.GridUniqKey 构成的 topologyKeys annotations);而 application-grid-wrapper 根据 service 实现拓扑感知,下面依次分析。

ServiceGrid Controller 分析

ServiceGrid Controller 逻辑和 DeploymentGrid Controller 整体一致,如下:

  • 1、创建并维护 service group 需要的若干 CRDs(包括:ServiceGrid)
  • 2、监听 ServiceGrid event,并填充 ServiceGrid 到工作队列中;循环从队列中取出 ServiceGrid 进行解析,创建并且维护对应的 service
  • 3、监听 service event,并将相关的  ServiceGrid 塞到工作队列中进行上述处理,协助上述逻辑达到整体 reconcile 逻辑

注意这里区别于 DeploymentGrid Controller:

  • 一个 ServiceGrid 对象只产生一个 service
  • 只需额外监听 service event,无需监听 node 事件。因为 node 的CRUD与 ServiceGrid 无关
  • ServiceGrid 对应产生的 service,命名为:{ServiceGrid}-svc
func (sgc *ServiceGridController) syncServiceGrid(key string) error {    
    startTime := time.Now()    
    klog.V(4).Infof("Started syncing service grid %q (%v)", key, startTime)    
    defer func() {        
      klog.V(4).Infof("Finished syncing service grid %q (%v)", key, time.Since(startTime))    
    }()    
    
    namespace, name, err := cache.SplitMetaNamespaceKey(key)    
    if err != nil {        
      return err    
    }    
    
    sg, err := sgc.svcGridLister.ServiceGrids(namespace).Get(name)    
    if errors.IsNotFound(err) {              
      klog.V(2).Infof("service grid %v has been deleted", key)        
        return nil    
    }    
    if err != nil {        
        return err    
    }    
    
    if sg.Spec.GridUniqKey == "" {           
    sgc.eventRecorder.Eventf(sg, corev1.EventTypeWarning, "Empty""This service grid has an empty grid key")        
        return nil    
    }    
    
    // get service workload list of this grid    
    svcList, err := sgc.getServiceForGrid(sg)    
    if err != nil {        
        return err    
    }    
    
    if sg.DeletionTimestamp != nil {        
      return nil    
    }    
    
    // sync service grid relevant services workload    
    return sgc.reconcile(sg, svcList)
    }
    
func (sgc *ServiceGridController) getServiceForGrid(sg *crdv1.ServiceGrid) ([]*corev1.Service, error) {    
  svcList, err := sgc.svcLister.Services(sg.Namespace).List(labels.Everything())    
  if err != nil {        
    return nil, err    
  }    
  
  labelSelector, err := common.GetDefaultSelector(sg.Name)    
  if err != nil {        
     return nil, err    
  }   
     
  canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error)
  {        
       fresh, err := 
  sgc.crdClient.SuperedgeV1().ServiceGrids(sg.Namespace).Get(context.TODO(), sg.Name, metav1.GetOptions{})        
      if err != nil {            
        return nil, err        
      }        
      if fresh.UID != sg.UID {           
       return nil, fmt.Errorf("orignal service grid %v/%v is gone: got uid %v, wanted %v", sg.Namespace,                  
     sg.Name, fresh.UID, sg.UID)        
      }        
      return fresh, nil    
    })    
      
    cm := controller.NewServiceControllerRefManager(sgc.svcClient, sg, labelSelector, util.ControllerKind, canAdoptFunc)    
    return cm.ClaimService(svcList)
}
    
func (sgc *ServiceGridController) reconcile(g *crdv1.ServiceGrid, svcList 
[]*corev1.Service) error {    
    var (        
        adds    []*corev1.Service     
        updates []*corev1.Service     
        deletes []*corev1.Service    
    )    
    
    sgTargetSvcName := util.GetServiceName(g)    
    isExistingSvc := false    
    for _, svc := range svcList {     
        if svc.Name == sgTargetSvcName {            
            isExistingSvc = true     
            template := util.KeepConsistence(g, svc)            
            if !apiequality.Semantic.DeepEqual(template, svc) {           
                updates = append(updates, template)            
            }        
        } else {            
            deletes = append(deletes, svc)        
        }    
    }    
    
    if !isExistingSvc {        
        adds = append(adds, util.CreateService(g))    
    }    
    
    return sgc.syncService(adds, updates, deletes)
}

func CreateService(sg *crdv1.ServiceGrid) *corev1.Service {    
    svc := &corev1.Service{        
        ObjectMeta: metav1.ObjectMeta{            
          Name:      GetServiceName(sg),            
          Namespace: sg.Namespace,   
          // Append existed ServiceGrid labels to service to be created  
          Labels: func() map[string]string {                
              if sg.Labels != nil { 
                  newLabels := sg.Labels                    
                  newLabels[common.GridSelectorName] = sg.Name             
                  newLabels[common.GridSelectorUniqKeyName] = sg.Spec.GridUniqKey                    
                  return newLabels              
       } else {               
                  return map[string]string{                        
                      common.GridSelectorName:        sg.Name,             
                      common.GridSelectorUniqKeyName: sg.Spec.GridUniqKey, 
              }               
          }            
      }(),            
      Annotations: make(map[string]string),       
    },        
    Spec: sg.Spec.Template,   
  }    
  
  keys := make([]string, 1)    
  keys[0] = sg.Spec.GridUniqKey    
  keyData, _ := json.Marshal(keys)     
  svc.Annotations[common.TopologyAnnotationsKey] = string(keyData)    
  
  return svc
}

由于逻辑与 DeploymentGrid 类似,这里不展开细节,重点关注 application-grid-wrapper 部分。

application-grid-wrapper 分析

在 ServiceGrid Controller 创建完 service 之后,application-grid-wrapper 的作用就开始启动了:

apiVersion: v1
kind: Service
metadata:  
  annotations:    
    topologyKeys: '["zone1"]'  
  creationTimestamp: "2021-03-03T07:33:30Z"  
  labels:    
    superedge.io/grid-selector: servicegrid-demo  
    name: servicegrid-demo-svc  
    namespace: default  
    ownerReferences:  
    - apiVersion: superedge.io/v1    
      blockOwnerDeletion: true    
      controller: true    
      kind: ServiceGrid    
      name: servicegrid-demo    
      uid: 78c74d3c-72ac-4e68-8c79-f1396af5a581  
    resourceVersion: "127987090"  
    selfLink: /api/v1/namespaces/default/services/servicegrid-demo-svc  
    uid: 8130ba7b-c27e-4c3a-8ceb-4f6dd0178dfc
spec:  
    clusterIP: 192.168.161.1  
    ports:  
    - port: 80    
    protocol: TCP    
    targetPort: 8080  
  selector:    
    appGrid: echo  
  sessionAffinity: None  
  type: ClusterIP
status:  
  loadBalancer: {}

为了实现 Kubernetes 零侵入,需要在 kube-proxy 与 apiserver 通信之间添加一层 wrapper,架构如下:

调用链路如下:

kube-proxy -> application-grid-wrapper -> lite-apiserver -> kube-apiserver

因此 application-grid-wrapper 会起服务,接受来自 kube-proxy 的请求,如下:

func (s *interceptorServer) Run(debug bool, bindAddress string, insecure bool, caFile, certFile, keyFile string) error {    
    ...    
    klog.Infof("Start to run interceptor server")    
    /* filter     
    */    
    server := &http.Server{Addr: bindAddress, Handler: s.buildFilterChains(debug)}    
    
    if insecure {        
        return server.ListenAndServe()    
    }    
    ...    
    server.TLSConfig = tlsConfig    
    return server.ListenAndServeTLS("""")
}

func (s *interceptorServer) buildFilterChains(debug bool) http.Handler {   
    handler := http.Handler(http.NewServeMux())    
    
    handler = s.interceptEndpointsRequest(handler)    
    handler = s.interceptServiceRequest(handler)    
    handler = s.interceptEventRequest(handler)    
    handler = s.interceptNodeRequest(handler)    
    handler = s.logger(handler)    
    
  if debug {        
      handler = s.debugger(handler)   
  }    
  
  return handler
}
这里会首先创建 interceptorServer,然后注册处理函数,由外到内依次如下:
  • debug:接受 debug 请求,返回 wrapper pprof 运行信息

  • logger:打印请求日志

  • node:接受 kube-proxy node GET(/api/v1/nodes/{node})请求,并返回 node信息

  • event:接受 kube-proxy events POST(/events)请求,并将请求转发给 lite-apiserver

func (s *interceptorServer) interceptEventRequest(handler http.Handler) http.Handler {  
   return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {         if r.Method != http.MethodPost || !strings.HasSuffix(r.URL.Path, "/events") {          
         handler.ServeHTTP(w, r)     
         return      
     }      
     
     targetURL, _ := url.Parse(s.restConfig.Host)      
     reverseProxy := httputil.NewSingleHostReverseProxy(targetURL)      
     reverseProxy.Transport, _ = rest.TransportFor(s.restConfig)      
     reverseProxy.ServeHTTP(w, r) 
    })
  }
  • service:接受 kube-proxy service List&Watch(/api/v1/services)请求,并根据 storageCache 内容返回(GetServices)

  • endpoint:接受 kube-proxy endpoint List&Watch (/api/v1/endpoints)请求,并根据 storageCache 内容返回(GetEndpoints)

下面先重点分析 cache 部分的逻辑,然后再回过头来分析具体的 http handler List&Watch 处理逻辑。

wrapper 为了实现拓扑感知,自己维护了一个 cache,包括:node,service,endpoint。可以看到在 setupInformers 中注册了这三类资源的处理函数:

type storageCache struct {    
    // hostName is the nodeName of node which application-grid-wrapper deploys on    
    hostName         string    
    wrapperInCluster bool    
    
    // mu lock protect the following map structure    
    mu           sync.RWMutex    
    servicesMap  map[types.NamespacedName]*serviceContainer    
    endpointsMap map[types.NamespacedName]*endpointsContainer    
    nodesMap     map[types.NamespacedName]*nodeContainer    
    
    // service watch channel    
    serviceChan chan<- watch.Event   
    // endpoints watch channel    
    endpointsChan chan<- watch.Event
}
...
func NewStorageCache(hostName string, wrapperInCluster bool, serviceNotifier, endpointsNotifier chan watch.Event) *storageCache {    
   msc := &storageCache{        
       hostName:         hostName,   
       wrapperInCluster: wrapperInCluster,        
       servicesMap:      make(map[types.NamespacedName]*serviceContainer), 
       endpointsMap:     make(map[types.NamespacedName]*endpointsContainer),        
       nodesMap:         make(map[types.NamespacedName]*nodeContainer),  
       serviceChan:      serviceNotifier,        
       endpointsChan:    endpointsNotifier,    
    }    
    
    return msc
}
...
func (s *interceptorServer) Run(debug bool, bindAddress string, insecure bool, caFile, certFile, keyFile string) error {    
   ...    
   if err := s.setupInformers(ctx.Done()); err != nil {        
     return err    
  }    
  
   klog.Infof("Start to run interceptor server")    
   /* filter     
   */    
  server := &http.Server{Addr: bindAddress, Handler: s.buildFilterChains(debug)}    
  ...    
  return server.ListenAndServeTLS("""")
}

func (s *interceptorServer) setupInformers(stop <-chan struct{}) error {   
    klog.Infof("Start to run service and endpoints informers")    
    noProxyName, err := labels.NewRequirement(apis.LabelServiceProxyName, selection.DoesNotExist, nil)    
    if err != nil {        
     klog.Errorf("can't parse proxy label, %v", err)        
     return err    
    }    
    
    noHeadlessEndpoints, err := labels.NewRequirement(v1.IsHeadlessService, selection.DoesNotExist, nil)    
    if err != nil {        
        klog.Errorf("can't parse headless label, %v", err)        
        return err    
    }    
        
    labelSelector := labels.NewSelector()    
    labelSelector = labelSelector.Add(*noProxyName, *noHeadlessEndpoints) 
    
    resyncPeriod := time.Minute * 5   
    client := kubernetes.NewForConfigOrDie(s.restConfig)    
    nodeInformerFactory := informers.NewSharedInformerFactory(client, resyncPeriod)    
    informerFactory := informers.NewSharedInformerFactoryWithOptions(client, resyncPeriod,        
      informers.WithTweakListOptions(func(options *metav1.ListOptions) {          
     options.LabelSelector = labelSelector.String()        
       }))    
         
    nodeInformer := nodeInformerFactory.Core().V1().Nodes().Informer()    
    serviceInformer := informerFactory.Core().V1().Services().Informer()    
    endpointsInformer := informerFactory.Core().V1().Endpoints().Informer()   
    
    /*    
    */    
    nodeInformer.AddEventHandlerWithResyncPeriod(s.cache.NodeEventHandler(), resyncPeriod)    
    serviceInformer.AddEventHandlerWithResyncPeriod(s.cache.ServiceEventHandler(), resyncPeriod)    
   
    endpointsInformer.AddEventHandlerWithResyncPeriod(s.cache.EndpointsEventHandler(), resyncPeriod)    
    
    go nodeInformer.Run(stop)    
    go serviceInformer.Run(stop)    
    go endpointsInformer.Run(stop)    
    
    if !cache.WaitForNamedCacheSync("node", stop,        
       nodeInformer.HasSynced,        
       serviceInformer.HasSynced,     
       endpointsInformer.HasSynced) {      
      return fmt.Errorf("can't sync informers")    
    }    
    
    return nil
}

func (sc *storageCache) NodeEventHandler() cache.ResourceEventHandler {   
     return &nodeHandler{cache: sc}
}

func (sc *storageCache) ServiceEventHandler() cache.ResourceEventHandler {    
    return &serviceHandler{cache: sc}

}
func (sc *storageCache) EndpointsEventHandler() cache.ResourceEventHandler {    
    return &endpointsHandler{cache: sc}
}
这里依次分析 NodeEventHandler,ServiceEventHandler 以及 EndpointsEventHandler,如下:

1、NodeEventHandler

NodeEventHandler 负责监听 node 资源相关 event,并将 node 以及 node Labels 添加到 storageCache.nodesMap 中(key为nodeName,value为node以及node labels)。

func (nh *nodeHandler) add(node *v1.Node) {    
    sc := nh.cache    
    
    sc.mu.Lock()    
    
    nodeKey := types.NamespacedName{Namespace: node.Namespace, Name: node.Name}    
    klog.Infof("Adding node %v", nodeKey)    
    sc.nodesMap[nodeKey] = &nodeContainer{        
        node:   node,        
        labels: node.Labels,    
    }    
    // update endpoints    
    changedEps := sc.rebuildEndpointsMap()    
    
    sc.mu.Unlock()    
    
    for _, eps := range changedEps { 
        sc.endpointsChan <- eps    
        }
}

func (nh *nodeHandler) update(node *v1.Node) {    
    sc := nh.cache    
    
    sc.mu.Lock()    
        nodeKey := types.NamespacedName{Namespace: node.Namespace, Name: node.Name}    
    klog.Infof("Updating node %v", nodeKey)    
    nodeContainer, found := sc.nodesMap[nodeKey]    
    if !found {        
        sc.mu.Unlock()        
        klog.Errorf("Updating non-existed node %v", nodeKey)        
        return    
    }    
    
    nodeContainer.node = node    
    // return directly when labels of node stay unchanged    
    if reflect.DeepEqual(node.Labels, nodeContainer.labels) {        
        sc.mu.Unlock()        
        return    
    }    
    nodeContainer.labels = node.Labels    
    
    // update endpoints    
    changedEps := sc.rebuildEndpointsMap()    
    
    sc.mu.Unlock()    
    
    for _, eps := range changedEps {        
     sc.endpointsChan <- eps  
    }
}
...

同时由于 node 的改变会影响 endpoint,因此会调用 rebuildEndpointsMap 刷新 storageCache.endpointsMap。

// rebuildEndpointsMap updates all endpoints stored in storageCache.endpointsMap dynamically and constructs relevant modified events
func (sc *storageCache) rebuildEndpointsMap() []watch.Event {   
    evts := make([]watch.Event, 0)   
    for name, endpointsContainer := range sc.endpointsMap {       
        newEps := pruneEndpoints(sc.hostName, sc.nodesMap, sc.servicesMap, endpointsContainer.endpoints, sc.wrapperInCluster)   
        if apiequality.Semantic.DeepEqual(newEps, endpointsContainer.modified) {            
            continue        
        }        
        sc.endpointsMap[name].modified = newEps        
        evts = append(evts, watch.Event{            
            Type:   watch.Modified,      
            Object: newEps,     
       })    
   }    
   return evts
}

rebuildEndpointsMap 是 cache 的核心函数,同时也是拓扑感知的算法实现:

// pruneEndpoints filters endpoints using serviceTopology rules combined by services topologyKeys and node labels
func pruneEndpoints(hostName string,
    nodes map[types.NamespacedName]*nodeContainer,    
    services map[types.NamespacedName]*serviceContainer,    
    eps *v1.Endpoints, wrapperInCluster bool) *v1.Endpoints {   
    
    epsKey := types.NamespacedName{Namespace: eps.Namespace, Name: eps.Name}  
    
    if wrapperInCluster {       
     eps = genLocalEndpoints(eps) 
    }   
    
    // dangling endpoints    
    svc, ok := services[epsKey] 
    if !ok {        
        klog.V(4).Infof("Dangling endpoints %s, %+#v", eps.Name, eps.Subsets)        
        return eps    
    }   
        
    // normal service    
    if len(svc.keys) == 0 {     
        klog.V(4).Infof("Normal endpoints %s, %+#v", eps.Name, eps.Subsets)    
        return eps   
    }    
    
    // topology endpoints    
    newEps := eps.DeepCopy()    
    for si := range newEps.Subsets { 
        subnet := &newEps.Subsets[si]
        subnet.Addresses = filterConcernedAddresses(svc.keys, hostName, nodes, subnet.Addresses)        
        subnet.NotReadyAddresses = filterConcernedAddresses(svc.keys, hostName, nodes, subnet.NotReadyAddresses)    
    }    
    klog.V(4).Infof("Topology endpoints %s: subnets from %+#v to %+#v", eps.Name, eps.Subsets, newEps.Subsets) 
    
    return newEps
}

// filterConcernedAddresses aims to filter out endpoints addresses within the same node unit
func filterConcernedAddresses(topologyKeys []string, hostName string, nodes 
map[types.NamespacedName]*nodeContainer,    
    addresses []v1.EndpointAddress) []v1.EndpointAddress {    
    hostNode, found := nodes[types.NamespacedName{Name: hostName}]    
    if !found {        
        return nil    
    }    
    
    filteredEndpointAddresses := make([]v1.EndpointAddress, 0)    
    for i := range addresses {       
        addr := addresses[i]        
        if nodeName := addr.NodeName; nodeName != nil {            
             epsNode, found := nodes[types.NamespacedName{Name: *nodeName}]           
             if !found {             
                 continue            
             }            
             if hasIntersectionLabel(topologyKeys, hostNode.labels, epsNode.labels) {                
              filteredEndpointAddresses = append(filteredEndpointAddresses, addr)            
             }        
        }    
    }    
    
    return filteredEndpointAddresses 
}

func hasIntersectionLabel(keys []string, n1, n2 map[string]string) bool { 
    if n1 == nil || n2 == nil {      
        return false    
    }    
    
    for _, key := range keys {       
        val1, v1found := n1[key]     
        val2, v2found := n2[key]    
        
        if v1found && v2found && val1 == val2 {            
        return true        
        }    
    }    
    
    return false   
}

算法逻辑如下:

  • 判断 endpoint 是否为 default kubernetes service,如果是,则将该 endpoint 转化为 wrapper 所在边缘节点的 lite-apiserver 地址(127.0.0.1)和端口(51003)。
apiVersion: v1
kind: Endpoints
metadata:  
  annotations:    
    superedge.io/local-endpoint: 127.0.0.1    
    superedge.io/local-port: "51003" 
  name: kubernetes  
  namespace: default
subsets:
- addresses: 
  - ip: 172.31.0.60  
  ports:  
  - name: https    
  port: xxx    
  protocol: TCP
func genLocalEndpoints(eps *v1.Endpoints) *v1.Endpoints {    
    if eps.Namespace != metav1.NamespaceDefault || eps.Name != MasterEndpointName {        
        return eps    
    }    
    
    klog.V(4).Infof("begin to gen local ep %v", eps)    
    ipAddress, e := eps.Annotations[EdgeLocalEndpoint]    
    if !e {        
        return eps    
    }    
        
    portStr, e := eps.Annotations[EdgeLocalPort]    
    if !e {        
        return eps    
    }    
    
    klog.V(4).Infof("get local endpoint %s:%s", ipAddress, portStr)    
    port, err := strconv.ParseInt(portStr, 10, 32)    
    if err != nil {        
        klog.Errorf("parse int %s err %v", portStr, err)        
        return eps    
    }  
    
    ip := net.ParseIP(ipAddress)   
    if ip == nil {        
        klog.Warningf("parse ip %s nil", ipAddress)        
        return eps    
    }    
    
    nep := eps.DeepCopy()    
    nep.Subsets = []v1.EndpointSubset{        
       {            
           Addresses: []v1.EndpointAddress{                
              {                    
                   IP: ipAddress,   
                },            
            },            
            Ports: []v1.EndpointPort{                
                 {                    
                     Protocol: v1.ProtocolTCP,                    
                     Port:     int32(port),                    
                    Name:     "https",                
                },         
             },      
         },   
    }  
      
    klog.V(4).Infof("gen new endpoint complete %v", nep)      
    return nep
}
这样做的目的是使边缘节点上的服务采用集群内 (InCluster) 方式访问的 apiserver 为本地的 lite-apiserver,而不是云端的 apiserver。
  • 从 storageCache.servicesMap cache 中根据 endpoint 名称(namespace/name) 取出对应 service,如果该 service 没有 topologyKeys 则无需做拓扑转化(非 service group)。

func getTopologyKeys(objectMeta *metav1.ObjectMeta) []string {   
    if !hasTopologyKey(objectMeta) { 
        return nil    
    }   
    
    var keys []string   
    keyData := objectMeta.Annotations[TopologyAnnotationsKey]    
    if err := json.Unmarshal([]byte(keyData), &keys); err != nil {        
        klog.Errorf("can't parse topology keys %s, %v", keyData, err)       
        return nil    
    }    
    
    return keys
}
  • 调用 filterConcernedAddresses 过滤 endpoint.Subsets Addresses 以及 NotReadyAddresses,只保留同一个 service topologyKeys 中的 endpoint。

// filterConcernedAddresses aims to filter out endpoints addresses within the same node unit
func filterConcernedAddresses(topologyKeys []string, hostName string, nodes 
map[types.NamespacedName]*nodeContainer,    
    addresses []v1.EndpointAddress) []v1.EndpointAddress {    
    hostNode, found := nodes[types.NamespacedName{Name: hostName}]    
    if !found {        
        return nil   
    }
        
    filteredEndpointAddresses := make([]v1.EndpointAddress, 0)    
    for i := range addresses {       
        addr := addresses[i]        
        if nodeName := addr.NodeName; nodeName != nil {            
         epsNode, found := nodes[types.NamespacedName{Name: *nodeName}]   
         if !found {                
             continue            
         }           
         if hasIntersectionLabel(topologyKeys, hostNode.labels, epsNode.labels) {                
             filteredEndpointAddresses = append(filteredEndpointAddresses, addr)            
            }       
        }   
    }    
    
    return filteredEndpointAddresses
}
    func hasIntersectionLabel(keys []string, n1, n2 map[string]string) bool {   
      if n1 == nil || n2 == nil {       
      return false    
     }   
    
     for _, key := range keys {       
       val1, v1found := n1[key]       
       val2, v2found := n2[key]       
    
      if v1found && v2found && val1 == val2 {           
          return true        
       }    
   }    
  
   return false
}
注意:如果 wrapper 所在边缘节点没有 service topologyKeys 标签,则也无法访问该 service。
回到 rebuildEndpointsMap,在调用 pruneEndpoints 刷新了同一个拓扑域内的 endpoint 后,会将修改后的 endpoints 赋值给 storageCache .endpointsMap [endpoint]. modified (该字段记录了拓扑感知后修改的endpoints)。
func (nh *nodeHandler) add(node *v1.Node) {    
    sc := nh.cache   
    
    sc.mu.Lock()    
    
    nodeKey := types.NamespacedName{Namespace: node.Namespace, Name: node.Name}    
    klog.Infof("Adding node %v", nodeKey)    
    sc.nodesMap[nodeKey] = &nodeContainer{        
        node:   node,        
        labels: node.Labels,    
    }    
    // update endpoints    
    changedEps := sc.rebuildEndpointsMap()    
    
    sc.mu.Unlock()    
    
    for _, eps := range changedEps { 
        sc.endpointsChan <- eps    
    }
}

// rebuildEndpointsMap updates all endpoints stored in storageCache.endpointsMap dynamically and constructs relevant modified events
func (sc *storageCache) rebuildEndpointsMap() []watch.Event {    
    evts := make([]watch.Event, 0)  
    for name, endpointsContainer := range sc.endpointsMap {        
        newEps := pruneEndpoints(sc.hostName, sc.nodesMap, sc.servicesMap, endpointsContainer.endpoints, sc.wrapperInCluster)        
        if apiequality.Semantic.DeepEqual(newEps, endpointsContainer.modified) {            
            continue        
        }        
        sc.endpointsMap[name].modified = newEps        
        evts = append(evts, watch.Event{            
            Type:   watch.Modified,   
            Object: newEps,        
        })    
    }    
    return evts
}
另外,如果 endpoints (拓扑感知后修改的 endpoints)发生改变,会构建 watch event,传递给 endpoints handler (interceptEndpointsRequest)处理。

2、ServiceEventHandler

storageCache.servicesMap 结构体 key 为 service 名称(namespace/name),value 为 serviceContainer,包含如下数据:

  • svc:service对象
  • keys:service topologyKeys
对于 service 资源的改动,这里用 Update event 说明:
func (sh *serviceHandler) update(service *v1.Service) {    
    sc := sh.cache   
    
    sc.mu.Lock()    
    serviceKey := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}    
    klog.Infof("Updating service %v", serviceKey)    
    newTopologyKeys := getTopologyKeys(&service.ObjectMeta)    
    serviceContainer, found := sc.servicesMap[serviceKey]    
    if !found {        
        sc.mu.Unlock()        
        klog.Errorf("update non-existed service, %v", serviceKey)        
        return    
    }   
   
    sc.serviceChan <- watch.Event{   
        Type:   watch.Modified,       
        Object: service,    
    }    
    
    serviceContainer.svc = service   
    // return directly when topologyKeys of service stay unchanged    
    if reflect.DeepEqual(serviceContainer.keys, newTopologyKeys) {        
        sc.mu.Unlock()        
        return    
    }    
    
    serviceContainer.keys = newTopologyKeys    
    
    // update endpoints    
    changedEps := sc.rebuildEndpointsMap()    
    sc.mu.Unlock()    
    
    for _, eps := range changedEps { 
        sc.endpointsChan <- eps    
    }
}
逻辑如下:
  • 获取 service topologyKeys
  • 构建 service event.Modified event
  • 比较 service topologyKeys 与已经存在的是否有差异
  • 如果有差异则更新 topologyKeys,且调用 rebuildEndpointsMap 刷新该 service 对应的endpoints,如果 endpoints 发生变化,则构建 endpoints watch event,传递给 endpoints handler (interceptEndpointsRequest)处理。

3、EndpointsEventHandler

storageCache.endpointsMap 结构体 key 为 endpoints 名称(namespace/name),value 为 endpointsContainer,包含如下数据:

  • endpoints:拓扑修改前的 endpoints
  • modified:拓扑修改后的 endpoints
对于 endpoints 资源的改动,这里用 Update event 说明:
func (eh *endpointsHandler) update(endpoints *v1.Endpoints) {    
    sc := eh.cache   
     
    sc.mu.Lock()   
    endpointsKey := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}    
    klog.Infof("Updating endpoints %v", endpointsKey)    
    
    endpointsContainer, found := sc.endpointsMap[endpointsKey]    
    if !found {        
        sc.mu.Unlock()       
        klog.Errorf("Updating non-existed endpoints %v", endpointsKey)    
        return    
    }    
    endpointsContainer.endpoints = endpoints    
    newEps := pruneEndpoints(sc.hostName, sc.nodesMap, sc.servicesMap, endpoints, sc.wrapperInCluster)    
    changed := !apiequality.Semantic.DeepEqual(endpointsContainer.modified, newEps)    
    if changed {        
        endpointsContainer.modified = newEps   
    }    
    sc.mu.Unlock()    
    
    if changed {       
        sc.endpointsChan <- watch.Event{            
            Type:   watch.Modified,   
            Object: newEps,       
        }   
    }
}
逻辑如下:
  • 更新 endpointsContainer.endpoint 为新的 endpoints 对象
  • 调用 pruneEndpoints 获取拓扑刷新后的 endpoints
  • 比较 endpointsContainer.modified 与新刷新后的 endpoints
  • 如果有差异则更新 endpointsContainer.modified,则构建 endpoints watch event,传递给 endpoints handler (interceptEndpointsRequest)处理。
在分析完 NodeEventHandler,ServiceEventHandler 以及 EndpointsEventHandler 之后,我们回到具体的 http handler List&Watch 处理逻辑上,这里以 endpoints 为例:
func (s *interceptorServer) interceptEndpointsRequest(handler http.Handler) http.Handler {    
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 
        if r.Method != http.MethodGet || 
!strings.HasPrefix(r.URL.Path, "/api/v1/endpoints") {            
            handler.ServeHTTP(w, r)   
            return       
        }        
        
        queries := r.URL.Query()     
        acceptType := r.Header.Get("Accept")        
        info, found := s.parseAccept(acceptType, s.mediaSerializer)       
        if !found {            
            klog.Errorf("can't find %s serializer", acceptType)           
            w.WriteHeader(http.StatusBadRequest)            
            return        
        }        
        
        encoder := 
scheme.Codecs.EncoderForVersion(info.Serializer, v1.SchemeGroupVersion)   
        // list request        
        if queries.Get("watch") == "" {           
            w.Header().Set("Content-Type", info.MediaType)            
            allEndpoints := s.cache.GetEndpoints()           
            epsItems := make([]v1.Endpoints, 0, len(allEndpoints))        
            for _, eps := range allEndpoints {                
                epsItems = append(epsItems, *eps)            
            }            
            
            epsList := &v1.EndpointsList{                
                Items: epsItems,     
             }           
             
          err := encoder.Encode(epsList, w)            
          if err != nil {             
              klog.Errorf("can't marshal endpoints list, %v", err)        
w.WriteHeader(http.StatusInternalServerError)               
              return            
          }    
            
              return       
      }       
         
      // watch request       
      timeoutSecondsStr := r.URL.Query().Get("timeoutSeconds")   
      timeout := time.Minute        
      if timeoutSecondsStr != "" {  
          timeout, _ = time.ParseDuration(fmt.Sprintf("%ss", timeoutSecondsStr))       
      }        
        
      timer := time.NewTimer(timeout)      
     defer timer.Stop()       
      
      flusher, ok := w.(http.Flusher)      
      if !ok {            
          klog.Errorf("unable to start watch - can't get http.Flusher: %#v", w)            
w.WriteHeader(http.StatusMethodNotAllowed)            
          return        
        }    
        
        e := restclientwatch.NewEncoder(           
streaming.NewEncoder(info.StreamSerializer.Framer.NewFrameWriter(w),       
scheme.Codecs.EncoderForVersion(info.StreamSerializer, v1.SchemeGroupVersion)),            
          encoder)        
      if info.MediaType == runtime.ContentTypeProtobuf {         
          w.Header().Set("Content-Type"
    runtime.ContentTypeProtobuf+";stream=watch")       
      } else {           
          w.Header().Set("Content-Type", runtime.ContentTypeJSON)        
      }        
      w.Header().Set("Transfer-Encoding""chunked")        
      w.WriteHeader(http.StatusOK)   
      flusher.Flush()        
      for {            
          select {            
          case <-r.Context().Done(): 
              return            
          case <-timer.C:            
              return           
          case evt := <-s.endpointsWatchCh:               
              klog.V(4).Infof("Send endpoint watch event: %+#v", evt)     
              err := e.Encode(&evt)   
              if err != nil {         
              klog.Errorf("can't encode watch event, %v", err)             
              return                
          }               
          
          if len(s.endpointsWatchCh) == 0 {                    
              flusher.Flush()         
            }            
         }         
      }    
  })
}

逻辑如下:

  • 如果为 List请求,则调用 GetEndpoints 获取拓扑修改后的 endpoints 列表,并返

func (sc *storageCache) GetEndpoints() []*v1.Endpoints {    
    sc.mu.RLock()   
    defer sc.mu.RUnlock()   
    
    epList := make([]*v1.Endpoints, 0, 
len(sc.endpointsMap))    
    for _, v := range sc.endpointsMap {        
        epList = append(epList, v.modified)    
    }    
    return epList
}
  • 如果为 Watch 请求,则不断从 storageCache.endpointsWatchCh 管道中接受 watch event,并返回 interceptServiceRequest 逻辑与 interceptEndpointsRequest 一致,这里不再赘述 。

总结

  • SuperEdge service group 利用 application-grid-wrapper 实现拓扑感知,完成了同一个 nodeunit 内服务的闭环访问
  • service group 实现的拓扑感知和 Kubernetes 社区原生实现对比,有如下区别:
    • service group 拓扑 key 可以自定义,也即为 gridUniqKey,使用起来更加灵活;而社区实现目前只有三种选择:"kubernetes.io/hostname","topology.kubernetes.io/zone"以及"topology.kubernetes.io/region"
    • service group 只能填写一个拓扑 key,也即只能访问本拓扑域内有效的 endpoint,无法访问其它拓扑域的 endpoint;而社区可以通过 topologyKey 列表以及"*"实现其它备选拓扑域 endpoint的访问
  • ServiceGrid Controller 负责根据 ServiceGrid 产生对应的 service(包含由serviceGrid.Spec.GridUniqKey 构成的 topologyKeys annotations),逻辑和 DeploymentGrid Controller 整体一致,如下:
    • 创建并维护 service group 需要的若干CRDs(包括:ServiceGrid)
    • 监听 ServiceGrid event,并填充 ServiceGrid到工作队列中;循环从队列中取出 ServiceGrid 进行解析,创建并且维护对应的 service
    • 监听 service event,并将相关的 ServiceGrid 塞到工作队列中进行上述处理,协助上述逻辑达到整体 reconcile 逻辑
  • 为了实现 Kubernetes 零侵入,需要在 kube-proxy 与 apiserver 通信之间添加一层 wrapper,调用链路如下:kube-proxy -> application-grid-wrapper -> lite-apiserver -> kube-apiserver
  • application-grid-wrapper 是一个 http server,接受来自 kube-proxy 的请求,同时维护一个资源缓存,处理函数由外到内依次如下:
    • debug:接受 debug 请求,返回 wrapper pprof 运行信息
    • logger:打印请求日志
    • node:接受 kube-proxy node GET (/api/v1/nodes/{node}) 请求,并返回 node 信息
    • event:接受 kube-proxy events POST (/events) 请求,并将请求转发给 lite-apiserver
    • service:接受 kube-proxy service List&Watch (/api/v1/services) 请求,并根据 storageCache 内容返回 (GetServices)
    • endpoint:接受 kube-proxy endpoint List&Watch (/api/v1/endpoints) 请求,并根据 storageCache 内容返回(GetEndpoints)
  • wrapper 为了实现拓扑感知,维护了一个资源 cache,包括:node,service,endpoint,同时注册了相关 event 处理函数。核心拓扑算法逻辑为:调用 filterConcernedAddresses 过滤 endpoint.Subsets Addresses 以及 NotReadyAddresses,只保留同一个 service topologyKeys 中的 endpoint。另外,如果 wrapper 所在边缘节点没有 service topologyKeys 标签,则也无法访问该service
  • wrapper 接受来自 kube-proxy 对 endpoints 以及 service 的 List&Watch 请求,以endpoints 为例:如果为List 请求,则调用 GetEndpoints 获取拓扑修改后的 endpoints 列表,并返回;如果为 Watch 请求,则不断从storageCache.endpointsWatchCh 管道中接受 watch event,并返回。service 逻辑与 endpoints 一致

展望

目前 SuperEdge service group 实现的拓扑算法功能更加灵活方便,如何处理与 Kubernetes 社区 service topology awareness 之间的关系值得探索,建议将 SuperEdge 拓扑算法推到社区。


【参考资料】

[1] 拓扑感知特性:【 https://kubernetes.io/docs/concepts/services-networking/service-topology/】

[2] duyanghao kubernetes-reading-notes: 【https://github.com/duyanghao/kubernetes-reading-notes/blob/master/superedge/service-group/README.md】



  往期精选推荐  

    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存