本文是基于OpenStack Mitaka版本的Trove组件对WSGI和RPC调用流程进行分析。而OpenStack其他组件的处理流程也都大同小异,因此了解了trove的WSGI和RPC调用流程,学习OpenStack其他组件就可以举一反三了。
Trove组件中的wsgi配置文件默认为/etc/trove目录下的api-paste.ini文件,在启动trove-api服务时,将加载该文件进行初始化。在该文件中,主要的配置项如下:
composite:定义Trove服务的入口,M版本中主要包含获取版本信息服务入口与troveapi v1.0服务入口。use则指定了根据请求路径获取不同版本app的规则。
filter:定义了Trove API中使用的过滤器。如authorization过滤器主要用来验证tenant_id等。
app:定义了Trove API应用程序。Trove应用程序主要是versions和troveapp,url为/的请求将映射到该应用程序,而troveapp则会作为管道troveapi的应用程序。
pipeline:定义了Trove API应用程序管道,M版本主要是troveapi,包含了多个filter和一个app。url为/v1.0的请求将映射到该管道进行处理。
另外,在OpenStack中有三种wsgi配置方法(config/egg/call),trove-api初始化时加载api-paste.ini文件使用的是config模式,而composite中匹配版本则使用的call模式。
在trove-api处理请求的整个流程中,相关的类与方法主要由以下几个:
1) versioned_urlmap()方法:该方法位于trove/common/wsgi.py中,主要用于将请求的url路径与composite中定义的版本进行匹配,而该方法中则主要返回了用于实现匹配算法的VersionedURLMap类的可调用对象。当trove接收到请求后,会调用该对象的__call__()方法匹配路径从而确定使用哪个app或pipeline。
2) API类:该类的主要作用是添加URL映射,当接收请求时解析路径,路径匹配成功之后,调用相关方法执行相关操作并响应,API继承了wsgi.py中的Router类。
3) Router类:初始化成员变量_router,该成员变量是routes.middleware.RoutesMiddleware类的实例化对象。该对象为可调用对象,在接收到请求时,首先通过mapper对象进行解析,然后将解析结果传递给Router中实现的_dispatch()方法对请求进行分发。
4) Resource类:在API中添加URL映射时,传入的controller参数一般都为Resource类的实例。该类的主要作用主要有两点:首先,实现了对象的序列化与反序列化;然后,在Router对象分发之后,会进一步通过Resource类中的dispatch()方法得到具体的操作方法。
5) Controller类:在API通过mapper解析到对应路径之后,则会调用具体的Controller对象中具体方法处理请求,并返回响应信息。
6) Result类:主要封装了对请求的响应结果,包括状态码、状态信息以及响应消息体等。
启动trove-api服务时,会调用trove/cmd/api.py中的main()方法对trove-api服务进行初始化。在该方法中,首先会通过CONF获取WSGI的配置文件,并获取服务限制的线程数。
conf_file = CONF.find_file(CONF.api_paste_config) workers = CONF.trove_api_workers or processutils.get_worker_count() 获取到配置文件以后,则会调用wsgi.py中的launch()方法通过配置文件初始化WSGI配置。 launcher = wsgi.launch('trove', CONF.bind_port, conf_file, host=CONF.bind_host, workers=workers) launch()方法参数的主要含义如下:app_name为应用程序名称,在读取WSGI配置文件时,首先会找到与该参数对应的composite或app,Trove中为'trove';port为服务的端口号,Trove默认端口号为8779;conf_file为WSGI的配置文件,Trove默认为'api-paste.ini';host为WSGI服务端主机,默认为'0.0.0.0';workers为服务的线程数。 该方法最终会调用paste插件中的loadapp()方法通过读取配置文件中的配置项初始化trove-api服务。 return deploy.loadapp("config:%s" % paste_config_file, name=app_name) 在调用loadapp()方法时,paste插件会根据composite->pipeline->app->fileter的顺序调用配置文件中各插件对应的factory()方法。 def app_factory(global_conf, **local_conf): return API() 这些factory()方法则主要是初始化一些可调用对象。如trove-api中的API类的对象。 class API(wsgi.Router): """Defines the API routes.""" def __init__(self): mapper = routes.Mapper() super(API, self).__init__(mapper) self._instance_router(mapper) self._cluster_router(mapper) self._datastore_router(mapper) self._flavor_router(mapper) self._versions_router(mapper) self._limits_router(mapper) self._backups_router(mapper) self._configurations_router(mapper) self._modules_router(mapper) 初始化这些对象时,则主要是将trove-api支持的所有操作的URL添加到mapper映射中。如将创建、删除数据库实例等操作添加到mapper映射中。 def _instance_router(self, mapper): instance_resource = InstanceController.create_resource() mapper.connect("/{tenant_id}/instances, controller=instance_resource, action="index", conditions={'method': ['GET']}) mapper.connect("/{tenant_id}/instances", controller=instance_resource, action="create", conditions={'method': ['POST']}) mapper为routes中的Mapper对象,在初始化时主要调用mapper对象的connect方法添加URL映射。该方法的参数主要由:第一个参数为请求的URL路径信息;controller为处理请求对应的Controller类,而在Trove中则传入的是Resource类,该类主要包含了具体操作的Controller对象,序列化对象和反序列化对象;action为Controller中进行操作的具体方法名;conditions中则定义了请求的方法GET、POST、PUT、PATCH等。 初始化完成之后,会初始化WSGI对应的Service并启动服务。 server = base_wsgi.Service(app, port, host=host, backlog=backlog, threads=threads) return service.launch(CONF, server, workers)Trove在M版本中使用OpenStack提供的oslo-messaging依赖包进行RPC调用,在oslo-messaging中与RPC调用相关的类和方法主要由以下几个:
1) Target类:该类主要用于描述客户端发送的消息去哪儿和服务端接收什么消息。其主要成员变量有:
exchange:将接收到的消息分类,并告知消息分发到何种路由何种队列。
topic:是RPC消息的唯一标识,客户端发送topic的消息,服务端则接收处理对应topic的消息。
namespace:服务端可以在一个topic上提供多个方法集合,这个集合通过namespace分开管理。
fanout:如果为True,则将消息发送到所有满足条件的server上,此时会忽略topic指定的内容。
server:服务端标识。
version:标识rpc api的版本。
2) Transport类:实现监听和发送消息的抽象层,具体实现则是由Transport成员变量_driver来定义的。
3) RabbitDriver类:具体实现消息的监听,发送等操作,在OpenStack Trove组件中使用的是RabbitMQ消息队列,因此在调用时会匹配到RabbitDriver类。
4) MassageHandlingServer类:监听消息的服务端。将一个Transport和一个PRCDispatcher联系起来,用于分发和处理消息。
5) RPCDispatcher类:定义了具体的消息分发机制。
6) RPCClient类:消息客户端,利用一个可调用_CallContext对象发送消息。
在OpenStack使用RPC传输和处理消息时,首先会根据消息接收端的Target和endpoints初始化一个RPCDispatcher,然后会根据具体的Transport和定义的RPCDispatcher创建一个MessageHandlingServer接收并处理消息。
当实例化一个RPCClient之后,客户端发送cast或call消息,最终会调用Transport中定义的RabbitDriver对应的send()方法发送消息。
当Server端接收到消息后,则会调用RPCDispatcher对象中的dispatch()方法定义的规则找到具体的Manager处理消息中对应的操作。
如果是call调用,则会监听处理方法时候执行完成,当执行完成之后则返回相应的值。
当启动trove-taskmanager服务时,Trove会调用trove/cmd/taskmanager.py中的main()方法完成服务的初始化操作并启动服务。
@with_initialize(extra_opts=extra_opts) def main(conf): startup(conf, conf.taskmanager_queue) 在调用main()方法之前,会首先调用@with_initialize装饰器,在其中会调用rpc.init()方法初始化RPC调用。而init()方法中主要是获取了一个服务端的Transport,该Transport对象中设置了实际处理消息的驱动Driver,Trove默认使用RabbitMQ进行消息传输,所以初始化时设置的为RabbitDriver。 TRANSPORT = messaging.get_transport(conf, allowed_remote_exmods=exmods, aliases=TRANSPORT_ALIASES) 得到Transport之后,则会进入main()方法,该方法中则调用了startup()方法进行具体的操作。在startup()方法中,首先初始化了一个用于接收消息的服务端RPCService。 server = rpc_service.RpcService( manager=conf.taskmanager_manager, topic=topic, rpc_api_version=rpc_version.RPC_API_VERSION) 传入的参数主要有:manager指定了处理具体操作的Manager类,在trove-taskmanager中为trove.taskmanager.manager.Manager;topic是服务端接收消息的唯一标识,相当于RabbitMQ中的binding key,exchange也是根据topic来进行消息分发的,taskmanager中默认使用的topic为taskmanager;rpc_api_version则表示了rpc api的版本,主要在升级时使用,M版本的Trove使用的是1.0版本的RPC调用。 初始化Service完成之后,则会调用Service的launch()方法启动服务,启动服务时,会调用RPCService中的start()方法启动trove-taskmanager的RPC服务。在start()方法中,最终会调用oslo-messagine的get_rpc_server()方法获取一个用于接收taskmanager消息的服务。 def get_rpc_server(transport, target, endpoints, executor='blocking', serializer=None): dispatcher = rpc_dispatcher.RPCDispatcher(target, endpoints, serializer) return msg_server.MessageHandlingServer(transport, dispatcher, executor) 该方法主要的入参有:transport用于获取处理RabbitMQ消息的RabbitDriver驱动;target用于指定该服务接收什么消息;endpoints则指定了处理消息的RPCDispatcher;然后根据transport、dispatcher和executor创建并返回一个用于接收消息的MessageHandlingServer。创建完成之后,会调用该Server的start()方法启动该Server。一般地,Trove首先会调用taskmanager的api中的方法发送rpc消息。所以,在调用taskmanager的API之前,会先调用其初始化方法。
class API(object): def __init__(self, context): self.context = context super(API, self).__init__() version_cap = self.VERSION_ALIASES.get( CONF.upgrade_levels.taskmanager, CONF.upgrade_levels.taskmanager) target = messaging.Target(topic=CONF.taskmanager_queue, version=version_cap) self.client = self.get_client(target, version_cap) 在调用__init__()方法时,首先创建了一个target,该target指定了消息将发送到哪儿。其中,topic相当于RabbitMQ中的routing key,需要与Server中的topic匹配,不一定需要完全匹配,在trove-taskmanager中,API中的topic与Server中的topic都为taskmanager。得到target之后,会根据该target的对应的版本创建一个RPC客户端。最终会调用trove/rpc.py中的get_client()方法得到一个RPCClient对象。
初始化API得到对应的对象之后,就可以调用具体方法发送消息。这些方法最终都会调用API中的_cast()或_call()方法发送消息。在trove-taskmanager中通常只需要调用_cast()方法。
def _cast(self, method_name, version, **kwargs): LOG.debug("Casting %s" % method_name) with NotificationCastWrapper(self.context, 'taskmanager'): cctxt = self.client.prepare(version=version) cctxt.cast(self.context, method_name, **kwargs) 该方法中,首先调用RPCClient中的prepare()方法获取一个_CallContext对象,然后调用该对象的cast()方法发送消息。该方法的参数主要包含了context表示上下文信息;method_name则表示了当接收到消息后需要调用的处理方法;kwargs中则包含了调用处理方法时所需的参数等。