Fabain4 IT路上的潜行者

Heat Stack Create 代码分析


Heat源码分析

本文主要以一个Stack创建过程分析,具体过程通过如下代码跟进。

Heat Stack创建流程图

Heat Stack代码分析

Heat Engine 入口文件heat/engine/service.py下的create_stack

  def create_stack(self, cnxt, stack_name, template, params, files, args,
                     owner_id=None):

        # 这里的cnxt包含租户信息,类似于其它模块的contex
        # 这里的stack_name为创建时输入的名字
        # 这里的template是整个创建时-f 模板文件的内容
        # params, files, 如果没有传递为空
        # args为'disable_rollback': True
        LOG.info(_('Creating stack %s') % stack_name)

        # 这里的def _stack_create先抛开,后面单独展开

        # 这里属于重点内容
        stack = self._parse_template_and_validate_stack(cnxt,
                                                        stack_name,
                                                        template,
                                                        params,
                                                        files,
                                                        args,
                                                        owner_id)

        # 这里调用stack存储进数据库
        stack.store()

        self.thread_group_mgr.start_with_lock(cnxt, stack, self.engine_id,
                                              _stack_create, stack)

        return dict(stack.identifier())

下面转到同意模块下的_parse_template_and_validate_stack

 def _parse_template_and_validate_stack(self, cnxt, stack_name, template,
                                           params, files, args, owner_id=None):
        
        # 这里进行模板版本匹配,如果版本不在支持的范围内,会出现错误
        tmpl = templatem.Template(template, files=files)
        # 这里调用进行资源类型判定与租户允许的stack数及per_stack判定
        self._validate_new_stack(cnxt, stack_name, tmpl)

        # If it is stack-adopt, use parameters from adopt_stack_data
        common_params = api.extract_args(args)

        if rpc_api.PARAM_ADOPT_STACK_DATA in common_params:
            params[rpc_api.STACK_PARAMETERS] = common_params[
                rpc_api.PARAM_ADOPT_STACK_DATA]['environment'][
                    rpc_api.STACK_PARAMETERS]

        # 加载环境资源,这里为空
        env = environment.Environment(params)
        stack = parser.Stack(cnxt, stack_name, tmpl, env,
                             owner_id=owner_id,
                             **common_params)

        self._validate_deferred_auth_context(cnxt, stack)
        # 这里
        stack.validate()
        return stack

展开由_parse_template_and_validate_stack调用的heat/engine/template.py中的Template

class Template(collections.Mapping):
    '''A stack template.'''

    def __new__(cls, template, *args, **kwargs):
        '''Create a new Template of the appropriate class.'''

        if cls != Template:
            TemplateClass = cls
        else:
            
            # 这里调用get_template_class去调用模板的版本是否合法,以及获取版本
            TemplateClass = get_template_class(template)

        return super(Template, cls).__new__(TemplateClass)

    def __init__(self, template, template_id=None, files=None):
        '''
        Initialise the template with a JSON object and a set of Parameters
        '''
        self.id = template_id
        self.t = template
        self.files = files or {}
        self.maps = self[self.MAPPINGS]
        self.version = get_version(self.t, _template_classes.keys())

展开同一个模块下的get_template_class

def get_template_class(template_data):
    global _template_classes

    if _template_classes is None:
        mgr = _get_template_extension_manager()
       
        # 这里的mgr.names包含了支持的版本名字如['AWSTemplateFormatVersion.2010-09-09']
        _template_classes = dict((tuple(name.split('.')), mgr[name].plugin)
                                 for name in mgr.names())

    available_versions = _template_classes.keys()
    version = get_version(template_data, available_versions)
    version_type = version[0]
    try:
        
        # 去判断了版本是否合法
        return _template_classes[version]
    except KeyError:
        ..............
        raise exception.InvalidTemplateVersion(explanation=explanation)

这里展开由_parse_template_and_validate_stack调用的heat/engine/service.py中的_validate_new_stack

def _validate_new_stack(self, cnxt, stack_name, parsed_template):
        try:
           
            # 这里调用进行资源类型判定
            parsed_template.validate()
        except Exception as ex:
            raise exception.StackValidationFailed(message=six.text_type(ex))

        # 判定stack_name,如果存在就会报错
        if db_api.stack_get_by_name(cnxt, stack_name):
            raise exception.StackExists(stack_name=stack_name)

        # 获取租户允许active的最大值,默认100
        tenant_limit = cfg.CONF.max_stacks_per_tenant
        if db_api.stack_count_all(cnxt) >= tenant_limit:
            message = _("You have reached the maximum stacks per tenant, %d."
                        " Please delete some stacks.") % tenant_limit
            raise exception.RequestLimitExceeded(message=message)

        # 这里的per_stack默认值是1000
        num_resources = len(parsed_template[parsed_template.RESOURCES])
        if num_resources > cfg.CONF.max_resources_per_stack:
            message = exception.StackResourceLimitExceeded.msg_fmt
            raise exception.RequestLimitExceeded(message=message)

展开由_validate_new_stack调用的heat/engine/template.py下的validate

def validate(self):

        # self.t.keys为[u'heat_template_version', u'description', u'resources']
        for k in self.t.keys():
            # 这里判断k的值是否在self.SECTIONS里面
            # self.SECTIONS的值为('heat_template_version', 'description', 
            # 'parameter_groups', 'parameters', 'resources', 'outputs', '__undefined__')
            if k not in self.SECTIONS:
                raise exception.InvalidTemplateSection(section=k)

        # check resources
        # 这里获取各类资源类型
        for res in self[self.RESOURCES].values():
            try:
                # 对资源类型进行判定
                if not res.get('Type'):
                    message = _('Every Resource object must '
                                'contain a Type member.')
                    raise exception.StackValidationFailed(message=message)
            except AttributeError:
                type_res = type(res)
                if isinstance(res, unicode):
                    type_res = "string"
                message = _('Resources must contain Resource. '
                            'Found a [%s] instead') % type_res
                raise exception.StackValidationFailed(message=message)

展开由_parse_template_and_validate_stack调用的heat/engine/stack.py 下的Stack

class Stack(collections.Mapping):

    def __init__(self, context, stack_name, tmpl, env=None,
                 stack_id=None, action=None, status=None,
                 status_reason='', timeout_mins=None, resolve_data=True,
                 disable_rollback=True, parent_resource=None, owner_id=None,
                 adopt_stack_data=None, stack_user_project_id=None,
                 created_time=None, updated_time=None,
                 user_creds_id=None, tenant_id=None,
                 use_stored_context=False, username=None):

        def _validate_stack_name(name):
            if not re.match("[a-zA-Z][a-zA-Z0-9_.-]*$", name):
                message = _('Invalid stack name %s must contain '
                            'only alphanumeric or \"_-.\" characters, '
                            'must start with alpha') % name
                raise exception.StackValidationFailed(message=message)

        if owner_id is None:
            _validate_stack_name(stack_name)

        # self.xxxx 初始化了一堆数据

        if use_stored_context:
            self.context = self.stored_context()

        # 这里的self.clients是一个类,里面包含了heat支持的客户端
        # ['_clients', 'auth_token', 'ceilometer', 'cinder', 'client', 'client_plugin', 
        # 'context', 'glance', 'heat', 'keystone', 'neutron', 'nova', 'swift', 'trove', 'url_for']
        self.clients = self.context.clients

        # This will use the provided tenant ID when loading the stack
        # from the DB or get it from the context for new stacks.
        self.tenant_id = tenant_id or self.context.tenant_id
        self.username = username or self.context.username

        # 这里进行了heat支持的客户端初始化与env初始化
        resources.initialise()

        self.env = env or environment.Environment({})
        self.parameters = self.t.parameters(self.identifier(),
                                            user_params=self.env.params)
        self._set_param_stackid()

        if resolve_data:
            self.outputs = self.resolve_static_data(self.t[self.t.OUTPUTS])
        else:
            self.outputs = {}

这里展开由Stack初始化调用的heat/engine/resources/init.py下的initialise

def initialise():
    global _environment
    if _environment is not None:
        return

    # 初始化客户端,调用heat/engine/clients/__init__.py中的initialise
    # 进行extension.ExtensionManager加载
    clients.initialise()

    # 这里初始化env
    global_env = environment.Environment({}, user_env=False)
    _load_global_environment(global_env)
    _environment = global_env

展载载开由_parse_template_and_validate_stack调用的heat/engine/stack.py中validate

def validate(self):
        ...........
        # 这里是重点self.dependencies的值为Dependencies([(<heat.engine.resources.server.Server
        #  object at 0x515f850>, <heat.engine.resources.volume.CinderVolume object at 0x5096a90>)])
        for res in self.dependencies:
            try:
                # ....
                result = res.validate()
            except exception.HeatException as ex:
                LOG.info(ex)
                raise ex
            except Exception as ex:
                LOG.exception(ex)
                raise StackValidationFailed(message=encodeutils.safe_decode(
                                            six.text_type(ex)))
            if result:
                raise StackValidationFailed(message=result)

            for val in self.outputs.values():
                snippet = val.get('Value', '')
                try:
                    function.validate(snippet)
                except Exception as ex:
                    reason = 'Output validation error: %s' % six.text_type(ex)
                    raise StackValidationFailed(message=reason)

这里展开由validate调用的heat/engine/resource.py下的validate

 def validate(self):
        LOG.info(_('Validating %s') % str(self))

        # self.t的值为模板中的资源ResourceDefinition {'Type': u'OS::Cinder::Volume',
        # 'Properties': {u'image': u'cirros', u'size': 1}}
        function.validate(self.t)
        self.validate_deletion_policy(self.t.deletion_policy())
        return self.properties.validate()

展开由heat/engine/service.py下的create_stack调用heat/engine/stack.py下的store

 def store(self, backup=False):
        '''
        Store the stack in the database and return its ID
        If self.id is set, we update the existing stack
        '''
        s = {
            'name': self._backup_name() if backup else self.name,
            'raw_template_id': self.t.store(self.context),
            'parameters': self.env.user_env_as_dict(),
            'owner_id': self.owner_id,
            'username': self.username,
            'tenant': self.tenant_id,
            'action': self.action,
            'status': self.status,
            'status_reason': self.status_reason,
            'timeout': self.timeout_mins,
            'disable_rollback': self.disable_rollback,
            'stack_user_project_id': self.stack_user_project_id,
            'updated_at': self.updated_time,
            'user_creds_id': self.user_creds_id,
            'backup': backup
        }
        if self.id:
            db_api.stack_update(self.context, self.id, s)
        else:
            if not self.user_creds_id:
                # Create a context containing a trust_id and trustor_user_id
                # if trusts are enabled
                if cfg.CONF.deferred_auth_method == 'trusts':
                    keystone = self.clients.client('keystone')
                    trust_ctx = keystone.create_trust_context()
                    new_creds = db_api.user_creds_create(trust_ctx)
                else:
                    new_creds = db_api.user_creds_create(self.context)
                s['user_creds_id'] = new_creds.id
                self.user_creds_id = new_creds.id

            new_s = db_api.stack_create(self.context, s)
            self.id = new_s.id
            self.created_time = new_s.created_at

        self._set_param_stackid()

        return self.id

展开由heat/engine/service.py下的create_stack调用的同一个类下的start_with_lock

def start_with_lock(self, cnxt, stack, engine_id, func, *args, **kwargs):
        
        lock = stack_lock.StackLock(cnxt, stack, engine_id)
        # 获取锁的生成器实现上下文管理
        with lock.thread_lock(stack.id):
            th = self.start_with_acquired_lock(stack, lock,
                                               func, *args, **kwargs)
            return th

展开由start_with_locak调用的heat/engine/stack_lock.py下的thread_lock

 def thread_lock(self, stack_id):
        """
        Acquire a lock and release it only if there is an exception.  The
        release method still needs to be scheduled to be run at the
        end of the thread using the Thread.link method.
        """
        try:
            # 这里的acquire就是通过数据库获取一把锁
            self.acquire()
            yield
        except:  # noqa
            with excutils.save_and_reraise_exception():
                self.release(stack_id)

展开由start_with_locak同一个模块下的th = self.start_with_acquired_lock

def start_with_acquired_lock(self, stack, lock, func, *args, **kwargs):
        """
        Run the given method in a sub-thread and release the provided lock
        when the thread finishes.

        :param stack: Stack to be operated on
        :type stack: heat.engine.parser.Stack
        :param lock: The acquired stack lock
        :type lock: heat.engine.stack_lock.StackLock
        :param func: Callable to be invoked in sub-thread
        :type func: function or instancemethod
        :param args: Args to be passed to func
        :param kwargs: Keyword-args to be passed to func

        """
        def release(gt, *args):
            """
            Callback function that will be passed to GreenThread.link().
            """
            lock.release(*args)

        # 调用了协程
        th = self.start(stack.id, func, *args, **kwargs)
        # 这里的link就是greenthread的link,用于在协程结束的时候做些什么
        th.link(release, stack.id)
        return th

展开由start_with_acquired调用的同一个模块下的start

def start(self, stack_id, func, *args, **kwargs):
        """
        Run the given method in a sub-thread.
        """
        if stack_id not in self.groups:
            self.groups[stack_id] = threadgroup.ThreadGroup()
        return self.groups[stack_id].add_thread(func, *args, **kwargs)

展开start调用的add_thread

def add_thread(self, callback, *args, **kwargs):
        gt = self.pool.spawn(callback, *args, **kwargs)
        th = Thread(gt, self)
        self.threads.append(th)
        return th

下面看一下heat/engine/service.py下的_stack_create具体实现

def _stack_create(stack):

            if not stack.stack_user_project_id:
                stack.create_stack_user_project_id()

            # Create/Adopt a stack, and create the periodic task if successful
            if stack.adopt_stack_data:
                if not cfg.CONF.enable_stack_adopt:
                    raise exception.NotSupported(feature='Stack Adopt')

                stack.adopt()
            else:
                # 重点在这里
                stack.create()

            if (stack.action in (stack.CREATE, stack.ADOPT)
                    and stack.status == stack.COMPLETE):
                if self.stack_watch:
                    # Schedule a periodic watcher task for this stack
                    self.stack_watch.start_watch_task(stack.id, cnxt)
            else:
                LOG.info(_("Stack create failed, status %s") % stack.status)

下面展开由_stack_create调用的heat/engine/stack.py中的create

def create(self):
        '''
        Create the stack and all of the resources.
        '''
        def rollback():
            if not self.disable_rollback and self.state == (self.CREATE,
                                                            self.FAILED):
                self.delete(action=self.ROLLBACK)

        # 这里开始进行调度
        creator = scheduler.TaskRunner(self.stack_task,
                                       action=self.CREATE,
                                       reverse=False,
                                       post_func=rollback,
                                       error_wait_time=ERROR_WAIT_TIME)
        creator(timeout=self.timeout_secs())

展开由create调用的heat/engine/scheduler.py下的TaskRunner

class TaskRunner(object):
    """
    Wrapper for a resumable task (co-routine).
    """

    def __init__(self, task, *args, **kwargs):
        """
        Initialise with a task function, and arguments to be passed to it when
        it is started.

        The task function may be a co-routine that yields control flow between
        steps.
        """
        assert callable(task), "Task is not callable"

        self._task = task
        self._args = args
        self._kwargs = kwargs
        self._runner = None
        self._done = False
        self._timeout = None
        self.name = task_description(task)

这里展开由create调用的heat/engine/stack.py中的timeout_secs

def timeout_secs(self):
        '''
        Return the stack action timeout in seconds.
        '''
        if self.timeout_mins is None:
            return cfg.CONF.stack_action_timeout

        return self.timeout_mins * 60

下面继续回到heat/engine/scheduler.py中的__call__

def __call__(self, wait_time=1, timeout=None):
        """
        Start and run the task to completion.

        The task will sleep for `wait_time` seconds between steps. To avoid
        sleeping, pass `None` for `wait_time`.
        """
        # 这里的timeout的值为3600
        # 这里调用了start
        self.start(timeout=timeout)
        # ensure that wait is applied only if task has not completed.
        if not self.done():
            self._sleep(wait_time)
        self.run_to_completion(wait_time=wait_time)

start 调用代码如下:

def start(self, timeout=None):
        """
        Initialise the task and run its first step.

        If a timeout is specified, any attempt to step the task after that
        number of seconds has elapsed will result in a Timeout being
        raised inside the task.
        """
        assert self._runner is None, "Task already started"
        assert not self._done, "Task already cancelled"

        LOG.debug('%s starting' % str(self))

        if timeout is not None:
            self._timeout = Timeout(self, timeout)

        # 这里调用了self._task得到了一个生成器
        result = self._task(*self._args, **self._kwargs)
        if isinstance(result, types.GeneratorType):
            self._runner = result
            # 开始调用self.setp 
            self.step()
        else:
            self._runner = False
            self._done = True
            LOG.debug('%s done (not resumable)' % str(self))

上一篇 Nova resize

下一篇 Heat 介绍

Comments