
A Noob's ElasticSearch Source Code Walkthrough: Startup Flow (Part 2)

zhisheng · 2021-09-05


Background

The previous article covered the first part of the ES startup flow: the entry point in the main method, plus creating the environment Elasticsearch needs to run and its related configuration. The next step is creating the node for that environment.

Creating the Node

Here is the code that creates a new node. (There is a lot of it, over 300 lines, and it is hard to break apart, so the key points are annotated as comments directly in the code.)

public Node(Environment environment) {
    this(environment, Collections.emptyList()); // delegates to the constructor below
}

protected Node(final Environment environment, Collection<Class<? extends Plugin>> classpathPlugins) {
    final List<Closeable> resourcesToClose = new ArrayList<>(); // register everything we need to release in the case of an error
    boolean success = false;
    {
        // use temp logger just to say we are starting. we can't use it later on because the node name might not be set
        Logger logger = Loggers.getLogger(Node.class, NODE_NAME_SETTING.get(environment.settings()));
        logger.info("initializing ...");
    }
    try {
        originalSettings = environment.settings();
        Settings tmpSettings = Settings.builder().put(environment.settings())
            .put(Client.CLIENT_TYPE_SETTING_S.getKey(), CLIENT_TYPE).build();

        // create the node environment as soon as possible, to recover the node id and enable logging
        try {
            // 1. Create the node environment: node name, node ID, shard info, stored metadata,
            //    and the memory reserved for this node
            nodeEnvironment = new NodeEnvironment(tmpSettings, environment);
            resourcesToClose.add(nodeEnvironment);
        } catch (IOException ex) {
            throw new IllegalStateException("Failed to create node environment", ex);
        }
        final boolean hadPredefinedNodeName = NODE_NAME_SETTING.exists(tmpSettings);
        final String nodeId = nodeEnvironment.nodeId();
        tmpSettings = addNodeNameIfNeeded(tmpSettings, nodeId);
        final Logger logger = Loggers.getLogger(Node.class, tmpSettings);
        // this must be captured after the node name is possibly added to the settings
        final String nodeName = NODE_NAME_SETTING.get(tmpSettings);
        if (hadPredefinedNodeName == false) {
            logger.info("node name derived from node ID [{}]; set [{}] to override", nodeId, NODE_NAME_SETTING.getKey());
        } else {
            logger.info("node name [{}], node ID [{}]", nodeName, nodeId);
        }

        // 2. Log JVM-related information
        final JvmInfo jvmInfo = JvmInfo.jvmInfo();
        logger.info(
            "version[{}], pid[{}], build[{}/{}/{}/{}], OS[{}/{}/{}], JVM[{}/{}/{}/{}]",
            // ... (arguments elided)
        );
        logger.info("JVM arguments {}", Arrays.toString(jvmInfo.getInputArguments()));
        // warn if the current version is a pre-release (snapshot) build
        warnIfPreRelease(Version.CURRENT, Build.CURRENT.isSnapshot(), logger);
        // ...
        // 3. Load the modules and plugins via PluginsService
        this.pluginsService = new PluginsService(tmpSettings, environment.configFile(), environment.modulesFile(),
            environment.pluginsFile(), classpathPlugins);
        this.settings = pluginsService.updatedSettings();
        localNodeFactory = new LocalNodeFactory(settings, nodeEnvironment.nodeId());

        // create the environment based on the finalized (processed) view of the settings
        // this is just to make sure that people get the same settings, no matter where they ask them from
        this.environment = new Environment(this.settings, environment.configFile());
        Environment.assertEquivalent(environment, this.environment);

        // thread pools
        final List<ExecutorBuilder<?>> executorBuilders = pluginsService.getExecutorBuilders(settings);

        final ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder[0]));
        resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));
        // adds the context to the DeprecationLogger so that it does not need to be injected everywhere
        DeprecationLogger.setThreadContext(threadPool.getThreadContext());
        resourcesToClose.add(() -> DeprecationLogger.removeThreadContext(threadPool.getThreadContext()));

        // 4. Load additional settings contributed by plugins and thread pools
        final List<Setting<?>> additionalSettings = new ArrayList<>(pluginsService.getPluginSettings());
        final List<String> additionalSettingsFilter = new ArrayList<>(pluginsService.getPluginSettingsFilter());
        for (final ExecutorBuilder<?> builder : threadPool.builders()) {
            additionalSettings.addAll(builder.getRegisteredSettings());
        }
        // 5. Create a node client
        client = new NodeClient(settings, threadPool);

        // 6. Build a series of modules: NodeModule, ClusterModule, IndicesModule, ActionModule,
        //    GatewayModule, SettingsModule, RepositoriesModule, ScriptModule, AnalysisModule
        final ResourceWatcherService resourceWatcherService = new ResourceWatcherService(settings, threadPool);
        final ScriptModule scriptModule = new ScriptModule(settings, pluginsService.filterPlugins(ScriptPlugin.class));
        AnalysisModule analysisModule = new AnalysisModule(this.environment, pluginsService.filterPlugins(AnalysisPlugin.class));
        // this is as early as we can validate settings at this point. we already pass them to ScriptModule
        // as well as ThreadPool so we might be late here already
        final SettingsModule settingsModule = new SettingsModule(this.settings, additionalSettings, additionalSettingsFilter);
        scriptModule.registerClusterSettingsListeners(settingsModule.getClusterSettings());
        // ...
        final UsageService usageService = new UsageService(settings);

        ModulesBuilder modules = new ModulesBuilder();
        // plugin modules must be added here, before others or we can get crazy injection errors...
        for (Module pluginModule : pluginsService.createGuiceModules()) {
            modules.add(pluginModule);
        }
        final MonitorService monitorService = new MonitorService(settings, nodeEnvironment, threadPool, clusterInfoService);
        ClusterModule clusterModule = new ClusterModule(settings, clusterService, clusterPlugins, clusterInfoService);
        modules.add(clusterModule);
        IndicesModule indicesModule = new IndicesModule(pluginsService.filterPlugins(MapperPlugin.class));
        modules.add(indicesModule);

        SearchModule searchModule = new SearchModule(settings, false, pluginsService.filterPlugins(SearchPlugin.class));
        CircuitBreakerService circuitBreakerService = createCircuitBreakerService(settingsModule.getSettings(),
            settingsModule.getClusterSettings());
        resourcesToClose.add(circuitBreakerService);
        modules.add(new GatewayModule());

        PageCacheRecycler pageCacheRecycler = createPageCacheRecycler(settings);
        BigArrays bigArrays = createBigArrays(pageCacheRecycler, circuitBreakerService);
        resourcesToClose.add(bigArrays);
        modules.add(settingsModule);
        // ...
        modules.add(new RepositoriesModule(this.environment, pluginsService.filterPlugins(RepositoryPlugin.class), xContentRegistry));
        final MetaStateService metaStateService = new MetaStateService(settings, nodeEnvironment, xContentRegistry);
        final IndicesService indicesService = new IndicesService(settings, pluginsService, nodeEnvironment, xContentRegistry,
            analysisModule.getAnalysisRegistry(), clusterModule.getIndexNameExpressionResolver(),
            indicesModule.getMapperRegistry(), namedWriteableRegistry, threadPool,
            settingsModule.getIndexScopedSettings(), circuitBreakerService, bigArrays,
            scriptModule.getScriptService(), client, metaStateService);

        Collection<Object> pluginComponents = pluginsService.filterPlugins(Plugin.class).stream()
            .flatMap(p -> p.createComponents(client, clusterService, threadPool, resourceWatcherService,
                scriptModule.getScriptService(), xContentRegistry, environment, nodeEnvironment,
                namedWriteableRegistry).stream())
            .collect(Collectors.toList());

        ActionModule actionModule = new ActionModule(false, settings, clusterModule.getIndexNameExpressionResolver(),
            settingsModule.getIndexScopedSettings(), settingsModule.getClusterSettings(), settingsModule.getSettingsFilter(),
            threadPool, pluginsService.filterPlugins(ActionPlugin.class), client, circuitBreakerService, usageService);
        modules.add(actionModule);

        // 7. Get the RestController, which handles Elasticsearch REST endpoints ("actions")
        //    such as _cat, _all, _cat/health and _cluster
        final RestController restController = actionModule.getRestController();
        // ... (search transport / network setup elided)
        final Consumer<Binder> httpBind;
        final HttpServerTransport httpServerTransport;
        if (networkModule.isHttpEnabled()) {
            httpServerTransport = networkModule.getHttpServerTransportSupplier().get();
            httpBind = b -> {
                b.bind(HttpServerTransport.class).toInstance(httpServerTransport);
            };
        } else {
            httpBind = b -> {
                b.bind(HttpServerTransport.class).toProvider(Providers.of(null));
            };
            httpServerTransport = null;
        }

        final DiscoveryModule discoveryModule = new DiscoveryModule(this.settings, threadPool, transportService,
            namedWriteableRegistry, networkService, clusterService.getMasterService(),
            clusterService.getClusterApplierService(), clusterService.getClusterSettings(),
            pluginsService.filterPlugins(DiscoveryPlugin.class), clusterModule.getAllocationService());

        this.nodeService = new NodeService(settings, threadPool, monitorService, discoveryModule.getDiscovery(),
            transportService, indicesService, pluginsService, circuitBreakerService, scriptModule.getScriptService(),
            httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter(),
            responseCollectorService, searchTransportService);

        final SearchService searchService = newSearchService(clusterService, indicesService, threadPool,
            scriptModule.getScriptService(), bigArrays, searchModule.getFetchPhase(), responseCollectorService);

        final List<PersistentTasksExecutor<?>> tasksExecutors = pluginsService
            .filterPlugins(PersistentTaskPlugin.class).stream()
            .map(p -> p.getPersistentTasksExecutor(clusterService, threadPool, client))
            .flatMap(List::stream)
            .collect(toList());

        final PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(settings, tasksExecutors);
        final PersistentTasksClusterService persistentTasksClusterService =
            new PersistentTasksClusterService(settings, registry, clusterService);
        final PersistentTasksService persistentTasksService = new PersistentTasksService(settings, clusterService, threadPool, client);

        // 8. Bind instances for all the services. This is the core of the whole constructor:
        //    it is how Elasticsearch wires every service together.
        modules.add(b -> {
            b.bind(Node.class).toInstance(this);
            b.bind(NodeService.class).toInstance(nodeService);
            b.bind(NamedXContentRegistry.class).toInstance(xContentRegistry);
            // ... (many more bindings elided)
        });
        // ... (injector creation elided)

        // TODO hack around circular dependencies problems in AllocationService
        clusterModule.getAllocationService().setGatewayAllocator(injector.getInstance(GatewayAllocator.class));

        List<LifecycleComponent> pluginLifecycleComponents = pluginComponents.stream()
            .filter(p -> p instanceof LifecycleComponent)
            .map(p -> (LifecycleComponent) p).collect(Collectors.toList());

        // 9. Use Guice to inject the modules and services (xxxService) into the Elasticsearch environment
        pluginLifecycleComponents.addAll(pluginsService.getGuiceServiceClasses().stream()
            .map(injector::getInstance).collect(Collectors.toList()));
        resourcesToClose.addAll(pluginLifecycleComponents);
        this.pluginLifecycleComponents = Collections.unmodifiableList(pluginLifecycleComponents);
        client.initialize(injector.getInstance(new Key<Map<GenericAction, TransportAction>>() {}),
            () -> clusterService.localNode().getId(), transportService.getRemoteClusterService());

        // if http.enabled is set in elasticsearch.yml (true by default), initialize the RestHandlers
        if (NetworkModule.HTTP_ENABLED.get(settings)) {
            logger.debug("initializing HTTP handlers ...");
            // initialize the RestHandlers that parse cluster commands such as _cat/ and _cat/health
            actionModule.initRestHandlers(() -> clusterService.state().nodes());
        }
        // 10. Initialization is complete
        logger.info("initialized");

        success = true;
    } catch (IOException ex) {
        throw new ElasticsearchException("failed to bind service", ex);
    } finally {
        if (!success) {
            IOUtils.closeWhileHandlingException(resourcesToClose);
        }
    }
}

That really is a lot of code, so here is a summary of what it does (each step is marked with a numbered comment in the code above):

1. Create the node environment: node name, node ID, shard info, stored metadata, and the memory reserved for the node

2. Log JVM-related information

3. Load the relevant modules and plugins via PluginsService (see the modules directory for which modules ship with ES)

4. Load additional configuration settings

5. Create a node client

6. Build a series of modules: NodeModule, ClusterModule, IndicesModule, ActionModule, GatewayModule, SettingsModule, RepositoriesModule, ScriptModule, AnalysisModule

7. Get the RestController, which handles Elasticsearch REST endpoints (ES calls them "actions") such as _cat, _all, _cat/health and _cluster

8. Bind instances for the various services

9. Use Guice to inject the modules and services (xxxService) into the Elasticsearch environment

10. Finish initialization (log "initialized")
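One detail worth calling out in the constructor is its error-handling pattern: every resource is appended to resourcesToClose as soon as it is created, and the finally block closes them all only if success never became true. A minimal, stand-alone sketch of that pattern (the class and the recorded resource names here are invented for illustration; the real code uses IOUtils.closeWhileHandlingException):

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;

public class ResourceCleanupSketch {
    // Records which resources got closed, so the behavior is observable.
    static List<String> closed = new ArrayList<>();

    // Mirrors the Node constructor's pattern: register resources as they are
    // created; close them all only if construction fails partway through.
    static void build(boolean failAtEnd) {
        List<Closeable> resourcesToClose = new ArrayList<>();
        boolean success = false;
        try {
            resourcesToClose.add(() -> closed.add("nodeEnvironment"));
            resourcesToClose.add(() -> closed.add("threadPool"));
            if (failAtEnd) {
                throw new IllegalStateException("failed to bind service");
            }
            success = true;
        } finally {
            if (!success) {
                // Equivalent of IOUtils.closeWhileHandlingException: close each
                // resource, swallowing exceptions so the original one propagates.
                for (Closeable c : resourcesToClose) {
                    try { c.close(); } catch (Exception e) { /* ignored */ }
                }
            }
        }
    }

    public static void main(String[] args) {
        try { build(true); } catch (IllegalStateException expected) { }
        System.out.println(closed);  // both resources were closed on failure
        closed.clear();
        build(false);
        System.out.println(closed);  // nothing closed on the success path
    }
}
```

The success flag is what lets the happy path hand ownership of the resources over to the constructed Node, while any exception before the last line triggers a full rollback.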

Explaining the JarHell error

The earlier article on setting up a source-reading environment mentioned that compiling the ES source with JDK 1.8 produces the following exception:

org.elasticsearch.bootstrap.StartupException: java.lang.IllegalStateException: jar hell!

It is caused by the following code in the setup method:

try {
    // look for jar hell
    final Logger logger = ESLoggerFactory.getLogger(JarHell.class);
    JarHell.checkJarHell(logger::debug);
} catch (IOException | URISyntaxException e) {
    throw new BootstrapException(e);
}

So if you compile with JDK 1.8, you need to comment out the jar-hell check code for the build to succeed.

In my own testing, compiling with JDK 10 does not trigger this error.
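For context on what the check is actually looking for: "jar hell" means the same class file appears on the classpath in more than one jar, so which copy gets loaded depends on classpath order. The real JarHell class also validates JDK versions and manifest entries; the sketch below (all names invented) only illustrates the duplicate-class idea:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class JarHellSketch {
    // Given a map of jar name -> class names it contains, report every class
    // that appears in two different jars (the "jar hell" condition).
    static List<String> findDuplicates(Map<String, List<String>> jarToClasses) {
        Map<String, String> firstSeen = new HashMap<>();
        List<String> conflicts = new ArrayList<>();
        for (Map.Entry<String, List<String>> jar : jarToClasses.entrySet()) {
            for (String clazz : jar.getValue()) {
                String prev = firstSeen.putIfAbsent(clazz, jar.getKey());
                if (prev != null && !prev.equals(jar.getKey())) {
                    conflicts.add("jar hell! class " + clazz
                            + " in " + prev + " and " + jar.getKey());
                }
            }
        }
        return conflicts;
    }

    public static void main(String[] args) {
        Map<String, List<String>> cp = new LinkedHashMap<>();
        cp.put("a.jar", List.of("org.example.Foo", "org.example.Bar"));
        cp.put("b.jar", List.of("org.example.Foo"));
        System.out.println(findDuplicates(cp));
    }
}
```

This also explains why ES treats it as a fatal startup error rather than a warning: a silently shadowed class can produce behavior that is near-impossible to debug later.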

Actually starting the ES node

Back in the static init method of Bootstrap mentioned above, the next step is to actually start the Elasticsearch node:

INSTANCE.start();  // calls the start method below

private void start() throws NodeValidationException {
    node.start();              // actually starts the Elasticsearch node
    keepAliveThread.start();
}
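A quick note on keepAliveThread: Elasticsearch's worker threads are daemon threads, so without something non-daemon still running, the JVM would exit as soon as main() returned. Bootstrap therefore starts one non-daemon thread that simply parks on a latch until shutdown. A simplified sketch of the idea (the field names mirror the real source, but this is not the actual implementation):

```java
import java.util.concurrent.CountDownLatch;

public class KeepAliveSketch {
    final CountDownLatch keepAliveLatch = new CountDownLatch(1);

    // A non-daemon thread parked on a latch is what keeps the JVM alive
    // after main() returns; counting the latch down lets the process exit.
    final Thread keepAliveThread = new Thread(() -> {
        try {
            keepAliveLatch.await();   // block until shutdown is requested
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // bail out if interrupted
        }
    }, "elasticsearch[keepAlive]");

    void start() {
        keepAliveThread.setDaemon(false); // explicit: must NOT be a daemon
        keepAliveThread.start();
    }

    void stop() {
        keepAliveLatch.countDown();   // releases the thread; JVM can exit
    }
}
```

In the real Bootstrap, the latch is counted down from a shutdown hook, which is how Ctrl-C or a kill signal lets the process terminate cleanly.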

Now let's look at the source of the node.start() method called inside that start method:

public Node start() throws NodeValidationException {
    if (!lifecycle.moveToStarted()) {
        return this;
    }

    Logger logger = Loggers.getLogger(Node.class, NODE_NAME_SETTING.get(settings));
    logger.info("starting ...");
    pluginLifecycleComponents.forEach(LifecycleComponent::start);

    // 1. Use Guice to fetch the modules and services registered earlier.
    //    Starting the node really means starting each component inside it,
    //    by calling start() on each instance in turn, like this:
    injector.getInstance(MappingUpdatedAction.class).setClient(client);
    injector.getInstance(IndicesService.class).start();
    // ... (more component start() calls elided)

    final ClusterService clusterService = injector.getInstance(ClusterService.class);

    final NodeConnectionsService nodeConnectionsService = injector.getInstance(NodeConnectionsService.class);
    nodeConnectionsService.start();
    clusterService.setNodeConnectionsService(nodeConnectionsService);

    injector.getInstance(ResourceWatcherService.class).start();
    injector.getInstance(GatewayService.class).start();
    Discovery discovery = injector.getInstance(Discovery.class);
    clusterService.getMasterService().setClusterStatePublisher(discovery::publish);

    // Start the transport service now so the publish address will be added to the local disco node in ClusterService
    TransportService transportService = injector.getInstance(TransportService.class);
    transportService.getTaskManager().setTaskResultsService(injector.getInstance(TaskResultsService.class));
    transportService.start();
    assert localNodeFactory.getNode() != null;
    assert transportService.getLocalNode().equals(localNodeFactory.getNode())
        : "transportService has a different local node than the factory provided";
    final MetaData onDiskMetadata;
    try {
        // we load the global state here (the persistent part of the cluster state stored on disk) to
        // pass it to the bootstrap checks to allow plugins to enforce certain preconditions based on the recovered state.
        if (DiscoveryNode.isMasterNode(settings) || DiscoveryNode.isDataNode(settings)) { // is this a master or data node, per the config?
            onDiskMetadata = injector.getInstance(GatewayMetaState.class).loadMetaState();
        } else {
            onDiskMetadata = MetaData.EMPTY_META_DATA;
        }
        assert onDiskMetadata != null : "metadata is null but shouldn't"; // this is never null
    } catch (IOException e) {
        throw new UncheckedIOException(e);
    }
    validateNodeBeforeAcceptingRequests(new BootstrapContext(settings, onDiskMetadata), transportService.boundAddress(),
        pluginsService.filterPlugins(Plugin.class)
            .stream()
            .flatMap(p -> p.getBootstrapChecks().stream()).collect(Collectors.toList()));

    // 2. Join the cluster, and start the current node
    clusterService.addStateApplier(transportService.getTaskManager());
    // start after transport service so the local disco is known
    discovery.start(); // start before cluster service so that it can set initial state on ClusterApplierService
    clusterService.start();
    assert clusterService.localNode().equals(localNodeFactory.getNode())
        : "clusterService has a different local node than the factory provided";
    transportService.acceptIncomingRequests();
    discovery.startInitialJoin();
    // tribe nodes don't have a master so we shouldn't register an observer
    final TimeValue initialStateTimeout = DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.get(settings);
    if (initialStateTimeout.millis() > 0) {
        final ThreadPool thread = injector.getInstance(ThreadPool.class);
        ClusterState clusterState = clusterService.state();
        ClusterStateObserver observer = new ClusterStateObserver(clusterState, clusterService, null, logger, thread.getThreadContext());
        if (clusterState.nodes().getMasterNodeId() == null) {
            // ... (register an observer and wait on a latch for the initial cluster state; elided)
            try {
                latch.await();
            } catch (InterruptedException e) {
                throw new ElasticsearchTimeoutException("Interrupted while waiting for initial discovery state");
            }
        }
    }

    if (NetworkModule.HTTP_ENABLED.get(settings)) {
        injector.getInstance(HttpServerTransport.class).start();
    }

    if (WRITE_PORTS_FILE_SETTING.get(settings)) {
        if (NetworkModule.HTTP_ENABLED.get(settings)) {
            HttpServerTransport http = injector.getInstance(HttpServerTransport.class);
            writePortsFile("http", http.boundAddress());
        }
        TransportService transport = injector.getInstance(TransportService.class);
        writePortsFile("transport", transport.boundAddress());
    }

    logger.info("started");

    pluginsService.filterPlugins(ClusterPlugin.class).forEach(ClusterPlugin::onNodeStarted);

    return this;
}

The code above mainly does two things:

1. Use Guice to fetch the modules and services registered earlier, then start every component inside the node (by calling the start method on each instance)

2. Log that the node has started
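The guard at the top of start(), `if (!lifecycle.moveToStarted()) return this;`, is what makes starting idempotent: every component carries a small lifecycle state machine, and moveToStarted only succeeds on the first transition. The sketch below is a simplified version of that mechanism (the real Lifecycle class in org.elasticsearch.common.component has more states and throws on illegal transitions):

```java
import java.util.concurrent.atomic.AtomicReference;

public class LifecycleSketch {
    enum State { INITIALIZED, STARTED, STOPPED, CLOSED }

    // One atomic state per component; compareAndSet guarantees that only the
    // first caller observes a successful INITIALIZED -> STARTED transition.
    final AtomicReference<State> state = new AtomicReference<>(State.INITIALIZED);

    boolean moveToStarted() {
        return state.compareAndSet(State.INITIALIZED, State.STARTED);
    }

    boolean moveToStopped() {
        return state.compareAndSet(State.STARTED, State.STOPPED);
    }
}
```

Because Node.start() returns immediately when moveToStarted() fails, calling start() twice is a harmless no-op rather than a double initialization.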

Summary

This article ties the overall startup flow together: creating the Node, then actually starting the ES node. The material was split into two posts because of its length, so we are skipping the fine details for now; once the startup-flow articles are done, we will dig into the details one topic at a time.

Follow me

zhisheng

If you repost this article, please credit the original at: http://www.54tianzhisheng.cn/2018/08/12/es-code03/

Related articles

1. Why would a noob read the ElasticSearch source code?

2. A Noob's ElasticSearch Source Code Walkthrough: Environment Setup

3. Elasticsearch default analyzers

4. Chinese analyzers available for Elasticsearch

5. Custom analyzers in Elasticsearch

6. A beginner's guide to setting up an Elasticsearch full-text search cluster

7. Elasticsearch series (3): ElasticSearch cluster monitoring

8. Elasticsearch series (4): ElasticSearch single-node monitoring

9. Elasticsearch series (5): setting up an ELK real-time log analysis platform

10. How to remotely debug ElasticSearch in IDEA
