Heat源码分析
本文主要以一个Stack的创建过程为例进行分析,具体过程通过如下代码逐步跟进。
Heat Stack创建流程图
Heat Stack代码分析
Heat Engine 入口文件heat/engine/service.py下的create_stack
def create_stack(self, cnxt, stack_name, template, params, files, args,
                 owner_id=None):
    """Validate, persist and asynchronously start the creation of a stack.

    The template is parsed and validated into a stack object, the stack is
    stored, and ``_stack_create`` is handed to the thread group manager to
    be run with the stack lock held.

    :param cnxt: request context; it carries the tenant information, like
        the context object used by other modules
    :param stack_name: the name entered when the stack was created
    :param template: the full content of the template file passed with -f
    :param params: stack parameters; empty when none were passed
    :param files: extra files; empty when none were passed
    :param args: request arguments, e.g. ``'disable_rollback': True``
    :param owner_id: passed through to the parse/validate step
    :returns: ``dict(stack.identifier())``
    """
    LOG.info(_('Creating stack %s') % stack_name)

    # The nested ``def _stack_create`` helper is left out of this excerpt;
    # the article expands it separately further down.

    # Key step: parse the template and validate the resulting stack.
    stack = self._parse_template_and_validate_stack(cnxt,
                                                    stack_name,
                                                    template,
                                                    params,
                                                    files,
                                                    args,
                                                    owner_id)

    # Persist the stack to the database.
    stack.store()

    self.thread_group_mgr.start_with_lock(cnxt, stack, self.engine_id,
                                          _stack_create, stack)

    return dict(stack.identifier())
下面转到同一模块下的_parse_template_and_validate_stack
def _parse_template_and_validate_stack(self, cnxt, stack_name, template,
                                       params, files, args, owner_id=None):
    """Turn raw template data into a validated stack object.

    :param cnxt: request context
    :param stack_name: name of the stack being created
    :param template: raw template content
    :param params: environment parameters; updated in place for stack-adopt
    :param files: extra files handed to the template
    :param args: request arguments from which the common parameters are
        extracted
    :param owner_id: forwarded to the stack constructor
    :returns: the validated stack
    """
    # Building the Template performs the version match; an unsupported
    # template version surfaces as an error here.
    parsed = templatem.Template(template, files=files)

    # Resource-type check plus the per-tenant stack limit and the
    # per-stack resource limit.
    self._validate_new_stack(cnxt, stack_name, parsed)

    # For a stack-adopt request the parameters come from the adopt data.
    common_params = api.extract_args(args)
    adopt_key = rpc_api.PARAM_ADOPT_STACK_DATA
    if adopt_key in common_params:
        adopt_environment = common_params[adopt_key]['environment']
        params[rpc_api.STACK_PARAMETERS] = adopt_environment[
            rpc_api.STACK_PARAMETERS]

    # Load the environment (empty in the walk-through this article traces).
    stack_env = environment.Environment(params)
    new_stack = parser.Stack(cnxt, stack_name, parsed, stack_env,
                             owner_id=owner_id,
                             **common_params)

    self._validate_deferred_auth_context(cnxt, new_stack)
    new_stack.validate()
    return new_stack
展开由_parse_template_and_validate_stack调用的heat/engine/template.py中的Template
class Template(collections.Mapping):
    '''A stack template.'''

    # NOTE: only __new__ and __init__ are quoted in this excerpt; the rest
    # of the class is not shown.

    def __new__(cls, template, *args, **kwargs):
        '''Create a new Template of the appropriate class.'''

        if cls != Template:
            # A subclass was instantiated directly; use it unchanged.
            TemplateClass = cls
        else:
            # get_template_class() checks that the template's version is
            # supported and returns the class registered for it.
            TemplateClass = get_template_class(template)

        return super(Template, cls).__new__(TemplateClass)

    def __init__(self, template, template_id=None, files=None):
        '''
        Initialise the template with a JSON object and a set of Parameters
        '''
        self.id = template_id
        self.t = template
        # Fall back to an empty dict when no files were supplied.
        self.files = files or {}
        self.maps = self[self.MAPPINGS]
        self.version = get_version(self.t, _template_classes.keys())
展开同一个模块下的get_template_class
def get_template_class(template_data):
    """Return the template class registered for the template's version.

    The module-level ``_template_classes`` mapping is built on first use
    from the template extension manager.

    :param template_data: the raw template whose version is looked up
    :raises exception.InvalidTemplateVersion: when no class is registered
        for the detected version
    """
    global _template_classes

    if _template_classes is None:
        mgr = _get_template_extension_manager()
        # mgr.names() holds the supported version names, e.g.
        # ['AWSTemplateFormatVersion.2010-09-09']; each name is split on
        # '.' into the tuple used as the mapping key.
        _template_classes = dict((tuple(name.split('.')), mgr[name].plugin)
                                 for name in mgr.names())

    available_versions = _template_classes.keys()
    version = get_version(template_data, available_versions)
    version_type = version[0]
    try:
        # The lookup doubles as the check that the version is supported.
        return _template_classes[version]
    except KeyError:
        # The line below is the article's elision marker: the code that
        # builds `explanation` was left out of the excerpt.
        ..............
        raise exception.InvalidTemplateVersion(explanation=explanation)
这里展开由_parse_template_and_validate_stack调用的heat/engine/service.py中的_validate_new_stack
def _validate_new_stack(self, cnxt, stack_name, parsed_template):
    """Reject a new stack that is invalid, duplicated or over quota.

    :param cnxt: request context
    :param stack_name: name requested for the new stack
    :param parsed_template: template object to validate
    :raises exception.StackValidationFailed: when template validation
        raises anything
    :raises exception.StackExists: when a stack with this name is found
    :raises exception.RequestLimitExceeded: when the tenant's stack limit
        or the per-stack resource limit is exceeded
    """
    # Template-level validation (sections and resource types).
    try:
        parsed_template.validate()
    except Exception as err:
        raise exception.StackValidationFailed(message=six.text_type(err))

    # A stack with the same name must not exist yet.
    existing = db_api.stack_get_by_name(cnxt, stack_name)
    if existing:
        raise exception.StackExists(stack_name=stack_name)

    # Maximum number of stacks a tenant may have (the article notes a
    # default of 100).
    max_stacks = cfg.CONF.max_stacks_per_tenant
    current_count = db_api.stack_count_all(cnxt)
    if not current_count < max_stacks:
        too_many = _("You have reached the maximum stacks per tenant, %d."
                     " Please delete some stacks.") % max_stacks
        raise exception.RequestLimitExceeded(message=too_many)

    # Maximum number of resources per stack (the article notes a default
    # of 1000).
    resource_section = parsed_template[parsed_template.RESOURCES]
    resource_count = len(resource_section)
    if resource_count > cfg.CONF.max_resources_per_stack:
        raise exception.RequestLimitExceeded(
            message=exception.StackResourceLimitExceeded.msg_fmt)
展开由_validate_new_stack调用的heat/engine/template.py下的validate
def validate(self):
    """Validate the template's section names and resource definitions.

    :raises exception.InvalidTemplateSection: when a top-level key of the
        template is not listed in ``self.SECTIONS``
    :raises exception.StackValidationFailed: when a resource has no
        ``Type`` member, or is not a mapping-like object at all
    """
    # In the run traced by the article the keys are
    # [u'heat_template_version', u'description', u'resources'] and
    # self.SECTIONS is ('heat_template_version', 'description',
    # 'parameter_groups', 'parameters', 'resources', 'outputs',
    # '__undefined__').
    for k in self.t.keys():
        if k not in self.SECTIONS:
            raise exception.InvalidTemplateSection(section=k)

    # Check every resource definition.
    for res in self[self.RESOURCES].values():
        try:
            # Each resource must name its type.
            if not res.get('Type'):
                message = _('Every Resource object must '
                            'contain a Type member.')
                raise exception.StackValidationFailed(message=message)
        except AttributeError:
            # `res` has no .get(), i.e. it is not a mapping.
            type_res = type(res)
            # type(u'') is `unicode` on Python 2 and `str` on Python 3, so
            # this check no longer depends on the Python-2-only `unicode`
            # builtin and needs no extra import.
            if isinstance(res, type(u'')):
                type_res = "string"
            message = _('Resources must contain Resource. '
                        'Found a [%s] instead') % type_res
            raise exception.StackValidationFailed(message=message)
展开由_parse_template_and_validate_stack调用的heat/engine/stack.py 下的Stack
class Stack(collections.Mapping):
    # NOTE: this is an excerpt; only part of __init__ is quoted and the
    # rest of the class is not shown.

    def __init__(self, context, stack_name, tmpl, env=None,
                 stack_id=None, action=None, status=None,
                 status_reason='', timeout_mins=None, resolve_data=True,
                 disable_rollback=True, parent_resource=None, owner_id=None,
                 adopt_stack_data=None, stack_user_project_id=None,
                 created_time=None, updated_time=None,
                 user_creds_id=None, tenant_id=None,
                 use_stored_context=False, username=None):
        """Initialise a stack from a context, a name and a template.

        The stack name is validated only when ``owner_id`` is None.

        :raises exception.StackValidationFailed: when the name does not
            start with a letter or contains characters other than
            letters, digits, "_", "-" and "."
        """

        def _validate_stack_name(name):
            # \Z anchors at the very end of the string. The previous `$`
            # also matched just before a trailing newline, so a name such
            # as "mystack\n" was accepted.
            if not re.match(r"[a-zA-Z][a-zA-Z0-9_.-]*\Z", name):
                message = _('Invalid stack name %s must contain '
                            'only alphanumeric or \"_-.\" characters, '
                            'must start with alpha') % name
                raise exception.StackValidationFailed(message=message)

        if owner_id is None:
            _validate_stack_name(stack_name)

        # (elided in the article) a batch of self.xxx attributes is
        # initialised here.

        if use_stored_context:
            self.context = self.stored_context()

        # self.clients is an object exposing the clients Heat supports; the
        # article lists its attributes as:
        # ['_clients', 'auth_token', 'ceilometer', 'cinder', 'client',
        #  'client_plugin', 'context', 'glance', 'heat', 'keystone',
        #  'neutron', 'nova', 'swift', 'trove', 'url_for']
        self.clients = self.context.clients

        # This will use the provided tenant ID when loading the stack
        # from the DB or get it from the context for new stacks.
        self.tenant_id = tenant_id or self.context.tenant_id
        self.username = username or self.context.username

        # Per the article, this initialises the supported clients and the
        # global environment.
        resources.initialise()

        self.env = env or environment.Environment({})
        self.parameters = self.t.parameters(self.identifier(),
                                            user_params=self.env.params)
        self._set_param_stackid()

        if resolve_data:
            self.outputs = self.resolve_static_data(self.t[self.t.OUTPUTS])
        else:
            self.outputs = {}
这里展开由Stack初始化调用的heat/engine/resources/__init__.py下的initialise
def initialise():
    """Set up the client plugins and the global environment once.

    Later calls are no-ops: nothing happens when the module-level
    ``_environment`` has already been set.
    """
    global _environment

    if _environment is None:
        # Initialise the clients via heat/engine/clients/__init__.py's
        # initialise(), which loads them through
        # extension.ExtensionManager (per the article).
        clients.initialise()

        # Build and load the global (non-user) environment, then publish it.
        fresh_env = environment.Environment({}, user_env=False)
        _load_global_environment(fresh_env)
        _environment = fresh_env
展开由_parse_template_and_validate_stack调用的heat/engine/stack.py中validate
def validate(self):
    """Validate every resource of the stack and then every output.

    :raises StackValidationFailed: when a resource's validate() returns a
        truthy result or raises a non-Heat exception, or when validating
        an output's 'Value' snippet raises
    """
    # The line below is the article's elision marker: the beginning of the
    # method was left out of the excerpt.
    ...........
    # Key part. In the traced run self.dependencies is
    # Dependencies([(<heat.engine.resources.server.Server object at
    # 0x515f850>, <heat.engine.resources.volume.CinderVolume object at
    # 0x5096a90>)])
    for res in self.dependencies:
        try:
            # (further lines elided in the article)
            result = res.validate()
        except exception.HeatException as ex:
            # Heat's own exceptions are logged and re-raised unchanged.
            LOG.info(ex)
            raise ex
        except Exception as ex:
            # Anything else is logged with its traceback and wrapped.
            LOG.exception(ex)
            raise StackValidationFailed(message=encodeutils.safe_decode(
                six.text_type(ex)))
        if result:
            raise StackValidationFailed(message=result)

    for val in self.outputs.values():
        snippet = val.get('Value', '')
        try:
            function.validate(snippet)
        except Exception as ex:
            reason = 'Output validation error: %s' % six.text_type(ex)
            raise StackValidationFailed(message=reason)
这里展开由validate调用的heat/engine/resource.py下的validate
def validate(self):
    """Validate this resource's definition, deletion policy and properties.

    :returns: whatever ``self.properties.validate()`` returns
    """
    LOG.info(_('Validating %s') % str(self))

    # self.t is the resource definition taken from the template, e.g.
    # ResourceDefinition {'Type': u'OS::Cinder::Volume',
    #                     'Properties': {u'image': u'cirros', u'size': 1}}
    function.validate(self.t)

    deletion_policy = self.t.deletion_policy()
    self.validate_deletion_policy(deletion_policy)

    properties_result = self.properties.validate()
    return properties_result
展开由heat/engine/service.py下的create_stack调用heat/engine/stack.py下的store
def store(self, backup=False):
    '''
    Store the stack in the database and return its ID
    If self.id is set, we update the existing stack

    :param backup: when true, the stack is stored under
        ``self._backup_name()`` and flagged as a backup
    :returns: ``self.id``
    '''
    # Column values written to the database row.
    s = {
        'name': self._backup_name() if backup else self.name,
        'raw_template_id': self.t.store(self.context),
        'parameters': self.env.user_env_as_dict(),
        'owner_id': self.owner_id,
        'username': self.username,
        'tenant': self.tenant_id,
        'action': self.action,
        'status': self.status,
        'status_reason': self.status_reason,
        'timeout': self.timeout_mins,
        'disable_rollback': self.disable_rollback,
        'stack_user_project_id': self.stack_user_project_id,
        'updated_at': self.updated_time,
        'user_creds_id': self.user_creds_id,
        'backup': backup
    }
    if self.id:
        # Existing stack: update its row in place.
        db_api.stack_update(self.context, self.id, s)
    else:
        if not self.user_creds_id:
            # Create a context containing a trust_id and trustor_user_id
            # if trusts are enabled
            if cfg.CONF.deferred_auth_method == 'trusts':
                keystone = self.clients.client('keystone')
                trust_ctx = keystone.create_trust_context()
                new_creds = db_api.user_creds_create(trust_ctx)
            else:
                new_creds = db_api.user_creds_create(self.context)
            s['user_creds_id'] = new_creds.id
            self.user_creds_id = new_creds.id

        # New stack: create the row and adopt its id and creation time.
        new_s = db_api.stack_create(self.context, s)
        self.id = new_s.id
        self.created_time = new_s.created_at

    self._set_param_stackid()

    return self.id
展开由heat/engine/service.py下的create_stack调用的同一个类下的start_with_lock
def start_with_lock(self, cnxt, stack, engine_id, func, *args, **kwargs):
    """Acquire the stack lock, then run ``func`` in a sub-thread.

    :param cnxt: request context
    :param stack: stack to operate on
    :param engine_id: id of the engine taking the lock
    :param func: callable to run in the sub-thread
    :param args: positional arguments for ``func``
    :param kwargs: keyword arguments for ``func``
    :returns: the thread returned by ``start_with_acquired_lock``
    """
    stack_guard = stack_lock.StackLock(cnxt, stack, engine_id)

    # thread_lock() is a generator used as a context manager around the
    # thread start.
    with stack_guard.thread_lock(stack.id):
        return self.start_with_acquired_lock(stack, stack_guard,
                                             func, *args, **kwargs)
展开由start_with_lock调用的heat/engine/stack_lock.py下的thread_lock
def thread_lock(self, stack_id):
    """
    Acquire a lock and release it only if there is an exception.  The
    release method still needs to be scheduled to be run at the
    end of the thread using the Thread.link method.

    :param stack_id: passed to ``self.release`` when an exception occurs
    """
    # NOTE(review): this generator is used in a `with` statement by
    # start_with_lock, so it is presumably decorated with
    # contextlib.contextmanager in the real file; the decorator is not
    # visible in this excerpt - confirm.
    try:
        # Per the article, acquire() takes the lock through the database.
        self.acquire()
        yield
    except:  # noqa
        # Release the lock, then let the original exception propagate.
        with excutils.save_and_reraise_exception():
            self.release(stack_id)
展开由start_with_lock调用的同一个模块下的start_with_acquired_lock
def start_with_acquired_lock(self, stack, lock, func, *args, **kwargs):
    """
    Start ``func`` in a sub-thread and arrange for the given, already
    acquired, lock to be released once that thread finishes.

    :param stack: Stack to be operated on
    :type stack: heat.engine.parser.Stack
    :param lock: The acquired stack lock
    :type lock: heat.engine.stack_lock.StackLock
    :param func: Callable to be invoked in sub-thread
    :type func: function or instancemethod
    :param args: Args to be passed to func
    :param kwargs: Keyword-args to be passed to func
    :returns: the thread object returned by ``self.start``
    """
    def _release_on_exit(green_thread, *link_args):
        """Link callback: drop the thread argument and release the lock."""
        lock.release(*link_args)

    # Run the callable in its own thread ...
    worker = self.start(stack.id, func, *args, **kwargs)
    # ... and hook the release in via link(), which the article describes
    # as the green thread's "run this when it ends" callback.
    worker.link(_release_on_exit, stack.id)
    return worker
展开由start_with_acquired_lock调用的同一个模块下的start
def start(self, stack_id, func, *args, **kwargs):
    """
    Run ``func`` in a sub-thread belonging to the stack's thread group.

    A thread group is created for ``stack_id`` on first use.

    :returns: whatever the group's ``add_thread`` returns
    """
    groups = self.groups
    if stack_id not in groups:
        groups[stack_id] = threadgroup.ThreadGroup()
    group = groups[stack_id]
    return group.add_thread(func, *args, **kwargs)
展开start调用的add_thread
def add_thread(self, callback, *args, **kwargs):
    """Spawn ``callback`` on the pool and track it as a thread of this group.

    :param callback: callable to spawn
    :param args: positional arguments for ``callback``
    :param kwargs: keyword arguments for ``callback``
    :returns: the ``Thread`` wrapper created for the spawned callable
    """
    spawned = self.pool.spawn(callback, *args, **kwargs)
    wrapper = Thread(spawned, self)
    self.threads.append(wrapper)
    return wrapper
下面看一下heat/engine/service.py下的_stack_create具体实现
def _stack_create(stack):
    # Create or adopt `stack`, then start its watch task on success.
    #
    # NOTE(review): `self` and `cnxt` are not parameters; the article says
    # this helper is defined inside create_stack, so presumably they come
    # from that enclosing scope - confirm.

    if not stack.stack_user_project_id:
        stack.create_stack_user_project_id()

    # Create/Adopt a stack, and create the periodic task if successful
    if stack.adopt_stack_data:
        if not cfg.CONF.enable_stack_adopt:
            raise exception.NotSupported(feature='Stack Adopt')

        stack.adopt()
    else:
        # This is the key call.
        stack.create()

    if (stack.action in (stack.CREATE, stack.ADOPT)
            and stack.status == stack.COMPLETE):
        if self.stack_watch:
            # Schedule a periodic watcher task for this stack
            self.stack_watch.start_watch_task(stack.id, cnxt)
    else:
        LOG.info(_("Stack create failed, status %s") % stack.status)
下面展开由_stack_create调用的heat/engine/stack.py中的create
def create(self):
    """
    Create the stack together with all of its resources.

    The work is driven by a scheduler TaskRunner wrapping
    ``self.stack_task``; ``rollback`` is passed as its ``post_func``.
    """
    def rollback():
        # Nothing to do when rollback is disabled or the create did not
        # end in the (CREATE, FAILED) state.
        if self.disable_rollback:
            return
        if self.state != (self.CREATE, self.FAILED):
            return
        self.delete(action=self.ROLLBACK)

    # Scheduling starts here.
    runner = scheduler.TaskRunner(
        self.stack_task,
        action=self.CREATE,
        reverse=False,
        post_func=rollback,
        error_wait_time=ERROR_WAIT_TIME,
    )
    stack_timeout = self.timeout_secs()
    runner(timeout=stack_timeout)
展开由create调用的heat/engine/scheduler.py下的TaskRunner
class TaskRunner(object):
    """
    Wrapper for a resumable task (co-routine).
    """

    # NOTE: only __init__ is quoted in this excerpt; the article shows
    # __call__ and start separately.

    def __init__(self, task, *args, **kwargs):
        """
        Initialise with a task function, and arguments to be passed to it when
        it is started.

        The task function may be a co-routine that yields control flow between
        steps.
        """
        assert callable(task), "Task is not callable"

        self._task = task
        # Arguments are only stored here; the task itself is not called.
        self._args = args
        self._kwargs = kwargs
        self._runner = None
        self._done = False
        self._timeout = None
        self.name = task_description(task)
这里展开由create调用的heat/engine/stack.py中的timeout_secs
def timeout_secs(self):
    """
    Return the timeout for a stack action, expressed in seconds.

    ``self.timeout_mins`` is converted from minutes when it is set;
    otherwise the configured ``stack_action_timeout`` is returned.
    """
    minutes = self.timeout_mins
    if minutes is not None:
        return minutes * 60
    return cfg.CONF.stack_action_timeout
下面继续回到heat/engine/scheduler.py中的__call__
def __call__(self, wait_time=1, timeout=None):
    """
    Start and run the task to completion.

    The task will sleep for `wait_time` seconds between steps. To avoid
    sleeping, pass `None` for `wait_time`.

    :param timeout: forwarded to ``self.start``
    """
    # In the stack-create run traced by the article, timeout is 3600 here.
    # start() is invoked first.
    self.start(timeout=timeout)
    # ensure that wait is applied only if task has not completed.
    if not self.done():
        self._sleep(wait_time)
    self.run_to_completion(wait_time=wait_time)
start 调用代码如下:
def start(self, timeout=None):
    """
    Initialise the task and run its first step.

    If a timeout is specified, any attempt to step the task after that
    number of seconds has elapsed will result in a Timeout being
    raised inside the task.
    """
    assert self._runner is None, "Task already started"
    assert not self._done, "Task already cancelled"

    LOG.debug('%s starting' % str(self))

    if timeout is not None:
        self._timeout = Timeout(self, timeout)

    # Calling the task yields a generator when the task is a co-routine.
    result = self._task(*self._args, **self._kwargs)
    if isinstance(result, types.GeneratorType):
        self._runner = result
        # Run the first step via self.step().
        self.step()
    else:
        # A plain function has already run to completion; nothing to resume.
        self._runner = False
        self._done = True
        LOG.debug('%s done (not resumable)' % str(self))