"""CableLabs CCAP XSD output plugin""" # # This plugin outputs XML Schema specific to the CableLabs CCAP OSSI # specification available here: # http://www.cablelabs.com/cablemodem/specifications/ccap.html # __version__ = "20121212" # CABLELABS: VErsion 20121212: # * Built-in function seems to be buggy, re-implementing custom function # CABLELABS: Version 20121211: # * Removed custom is_mandatory() function, found one in statements include # CABLELABS: Version 20121207: # * Another bug fix relating to mandatory containers, deferring NGAA-468 # CABLELABS: Version 20121206: # * Fix bug relating to mandatory containers # CABLELABS: Version 20121205: # * NGAA-468: Remove hardcoding of 'ccap', fix so any module name is usable. # * Code clean-up: removed commented-out unneeded defs # CABLELABS: Version 20121204: # * Fixes relating to mandatory containers # CABLELABS: Version 20121128: # * Add ccap:inlineType extension checking, disables global_complex_types for # parent node only when used with --inline-type # * NGAA-466: reinstated wrapping choice subelements in sequence tags, added # named complex types in these as well (check yang-ext placement in .yang # files) # CABLELABS: Version 20120814: # * fix numbering of non-extension point ext-choiceNNN elements to increment # properly # CABLELABS: Version 20120724: # * fixes bug removing all numbered ext-choice when --ccap-vendor is used, # now only descendants of ccap:extensionPoints are removed # CABLELABS: Version 20120712: # * removes elementFormDefault="qualified" as per NGAA-449 # * removes extraneous numbered ext_choiceX when --ccap-vendor is used # CABLELABS: Version 20120709: # * fixes NGAA-297 #5 and #7 from xml.sax.saxutils import quoteattr from xml.sax.saxutils import escape import optparse import re import sys import pyang from pyang import plugin from pyang import util from pyang import statements from pyang import error yang_to_xsd_types = \ {'int8': 'byte', 'int16': 'short', 'int32': 'int', 'int64': 'long', 'uint8': 'unsignedByte', 'uint16': 'unsignedShort', 'uint32': 'unsignedInt', 'uint64': 'unsignedLong', 'decimal64': 'decimal', 'string': 'string', 'boolean': 'boolean', # enumeration is handled separately # bits is handled separately 'binary': 'base64Binary', 'hexbinary': 'ccapOctetString', # leafref is handled separately 'instance-identifier': 'string', 'identityref': 'QName', # empty is handled separately # union is handled separately } # keyword argument-name yin-element xsd-appinfo yang_keywords = \ {'anyxml': ('name', False, False), 'argument': ('name', False, False), 'augment': ('target-node', False, False), 'base': ('name', False, False), 'belongs-to': ('module', False, True), 'bit': ('name', False, False), 'case': ('name', False, False), 'choice': ('name', False, False), 'config': ('value', False, True), 'contact': ('text', True, True), 'container': ('name', False, False), 'default': ('value', False, True), 'description': ('text', True, False), 'deviate': ('value', False, False), 'deviation': ('target-node', False, False), 'enum': ('name', False, False), 'error-app-tag': ('value', False, True), 'error-message': ('value', True, True), 'extension': ('name', False, False), 'feature': ('name', False, False), 'fraction-digits': ('value', False, False), 'grouping': ('name', False, False), 'identity': ('name', False, False), 'if-feature': ('name', False, False), 'import': ('module', False, True), 'include': ('module', False, True), 'input': (None, None, False), 'key': ('value', False, False), 'leaf': ('name', False, False), 'leaf-list': ('name', False, False), 'length': ('value', False, False), 'list': ('name', False, False), 'mandatory': ('value', False, True), 'max-elements': ('value', False, True), 'min-elements': ('value', False, True), 'module': ('name', False, False), 'must': ('condition', False, True), 'namespace': ('uri', False, False), 'notification': ('name', False, False), 'ordered-by': ('value', False, True), 'organization': ('text', True, True), 'output': (None, None, False), 'path': ('value', False, False), 'pattern': ('value', False, False), 'position': ('value', False, False), 'presence': ('value', False, False), 'prefix': ('value', False, True), 'range': ('value', False, False), 'reference': ('text', True, True), 'refine': ('target-node', False, False), 'require-instance': ('value', False, True), 'revision': ('date', False, True), 'revision-date': ('date', False, True), 'rpc': ('name', False, False), 'status': ('value', False, True), 'submodule': ('name', False, False), 'type': ('name', False, False), 'typedef': ('name', False, False), 'unique': ('tag', False, False), 'units': ('name', False, True), 'uses': ('name', False, False), 'value': ('value', False, False), 'when': ('condition', False, True), 'yang-version': ('value', False, True), 'yin-element': ('value', False, False), } class debug: """instantiate class with positive integer or leave blank for default""" def __init__(self, level=0): self.level = level def pr(self, level, text): """send str(text) to this method to print text above int(level)""" if self.level >= level: print text def get_level(self): return self.level def set_level(self, level): self.level = level # Set this to 0 to disable console debugging output: d = debug(0) d.pr(1, "======== new run ========") def pyang_plugin_init(): plugin.register_plugin(CLXSDPlugin()) class CLXSDPlugin(plugin.PyangPlugin): def add_opts(self, optparser): optlist = [ optparse.make_option("--ccapxsd-global-complex-types", dest="xsd_global_complex_types", action="store_true", help="Make all complex types global instead " "of inline"), optparse.make_option("--ccapxsd-groups", dest="xsd_groups", action="store_true", help="EXPERIMENTAL: does not work yet"), optparse.make_option("--ccapxsd-no-appinfo", dest="xsd_no_appinfo", action="store_true", help="Do not print YANG specific appinfo"), optparse.make_option("--ccapxsd-no-imports", dest="xsd_no_imports", action="store_true", help="Do not generate xs:import elements"), optparse.make_option("--ccapxsd-break-pattern", dest="xsd_break_pattern", action="store_true", help="Break XSD pattern so that they fit " "into RFCs"), optparse.make_option("--ccapxsd-no-lecture", dest="xsd_no_lecture", action="store_true", help="Do not generate the lecture about " "how the XSD can be used"), optparse.make_option("--ccap-vendor", dest="ccap_vendor", action="store_true", help="Use with " "--ccapxsd-global-complex-types to treat " "ccap:extensionPoint as unnamed " "complexType"), optparse.make_option("--inline-type", dest="inline_type", action="store_true", help="Use with " "--ccapxsd-global-complex-types and " "--ccap-vendor NOT to treat " "ccap:inlineType as unnamed complexType"), ] g = optparser.add_option_group("CableLabs CCAP XSD output specific " "options") g.add_options(optlist) def add_output_format(self, fmts): fmts['ccapxsd'] = self def emit(self, ctx, modules, fd): module = modules[0] # cannot do XSD unless everything is ok for our module for (epos, etag, eargs) in ctx.errors: if (epos.top == module and error.is_error(error.err_level(etag))): raise error.EmitError("XSD translation needs a valid module") # we also need to have all other modules found for pre in module.i_prefixes: (modname, revision) = module.i_prefixes[pre] mod = statements.modulename_to_module(module, modname, revision) if mod == None: raise error.EmitError("cannot find module %s, needed by XSD" " translation" % modname) emit_xsd(ctx, module, fd) class DummyFD(object): def write(self, s): pass def expand_locally_defined_typedefs(ctx, module, m): """Create top-level typedefs for all locally defined typedefs.""" for c in m.search('typedef'): c.i_xsd_name = c.arg for inc in module.search('include'): rev = None r = inc.search_one('revision-date') if r is not None: rev = r.arg m = ctx.get_module(inc.arg, rev) for c in m.search('typedef'): c.i_xsd_name = c.arg def gen_name(name, name_list): i = 0 tname = name + '_' + str(i) while util.attrsearch(tname, 'i_xsd_name', name_list): i = i + 1 tname = name + '_' + str(i) return tname def add_typedef(obj): for t in obj.search('typedef'): t.i_xsd_name = gen_name(t.arg, module.search('typedef') + module.i_local_typedefs) module.i_local_typedefs.append(t) if 'i_children' in obj.__dict__: for c in obj.i_children: add_typedef(c) for c in obj.search('grouping'): add_typedef(c) for c in (m.i_children + m.search('import') + m.search('augment') + m.search('grouping')): add_typedef(c) def emit_xsd(ctx, module, fd): if module.keyword == 'submodule': belongs_to = module.search_one('belongs-to') parent_modulename = belongs_to.arg parent_module = ctx.read_module(parent_modulename) if parent_module is not None: i_namespace = parent_module.search_one('namespace').arg i_prefix = parent_module.search_one('prefix').arg else: raise error.EmitError("cannot find module %s, needed by XSD" " translation" % parent_modulename) else: i_namespace = module.search_one('namespace').arg i_prefix = module.search_one('prefix').arg # initialize some XSD specific variables module.i_xsd_namespace = i_namespace module.i_xsd_prefix = i_prefix module.i_gen_typedef = [] module.i_gen_import = [] module.i_gen_augment_idx = 0 module.i_local_typedefs = [] # CABLELABS # To keep track of the per-module choice-ext idx module.i_choiceext_idx = 1 # The list of extensions in cablelabs modules # module.i_extensions = ['extension-point'] mods = [module] for i in module.search('include'): subm = ctx.get_module(i.arg) if subm is not None: mods.append(subm) # make sure the top module imports all modules imported by the # submodule for subimp in subm.search('import'): p = subimp.search_one('prefix').arg (othermodname, otherrevision) = subm.i_prefixes[p] ourprefix = util.dictsearch((othermodname, otherrevision), module.i_prefixes) if ourprefix is None: # we don't have a prefix for this module newprefix = gen_new_import(module, othermodname, otherrevision) newmod = ctx.get_module(othermodname, otherrevision) newmod.i_xsd_prefix = newprefix # make sure we "import" all modules imported by our modules, # recursively. the reason for this is that we might generate a # type reference for leafreafs, and for this the module must be # imported. unused imports are ignored by confdc. handled = [] def add_import(othermodname, otherrevision): if (othermodname, otherrevision) not in handled: # new module handled.append((othermodname, otherrevision)) ourprefix = util.dictsearch((othermodname, otherrevision), module.i_prefixes) if ourprefix is None: # we don't have a prefix for this module newprefix = gen_new_import(module, othermodname, otherrevision) newmod = ctx.get_module(othermodname, otherrevision) newmod.i_xsd_prefix = newprefix else: newmod = ctx.get_module(othermodname, otherrevision) for i in newmod.search('include'): subm = ctx.get_module(i.arg) if subm is not None: for imp in subm.search('import'): r = imp.search_one('revision-date') if r is not None: r = r.arg add_import(imp.arg, r) for imp in newmod.search('import'): r = imp.search_one('revision-date') if r is not None: r = r.arg add_import(imp.arg, r) for imp in module.search('import'): r = imp.search_one('revision-date') if r is not None: r = r.arg add_import(imp.arg, r) # first, create top-level typedefs of local typedefs for m in mods: expand_locally_defined_typedefs(ctx, module, m) prefixes = [module.i_xsd_prefix] + [p for p in module.i_prefixes] if module.i_xsd_prefix in ['xs', 'yin', 'nc', 'ncn']: i = 0 pre = "p" + str(i) while pre in prefixes: i = i + 1 pre = "p" + str(i) prefixes.append(pre) module.i_xsd_prefix = pre has_rpc = False has_notifications = False for c in module.substmts: if c.keyword == 'notification': has_notifications = True elif c.keyword == 'rpc': has_rpc = True fd.write('\n') fd.write(' 0: fd.write(' version="%s"\n' % module.search('revision')[0].arg) fd.write(' xml:lang="en"') handled_modules = [] for m in mods: for pre in m.i_prefixes: (modname, revision) = m.i_prefixes[pre] mod = statements.modulename_to_module(m, modname, revision) if mod in handled_modules or mod.keyword == 'submodule': continue handled_modules.append(mod) if pre in ['xs', 'yin', 'nc', 'ncn']: # someone uses one of our prefixes # generate a new prefix for that module i = 0 pre = "p" + str(i) while pre in prefixes: i = i + 1 pre = "p" + str(i) prefixes.append(pre) mod.i_xsd_prefix = pre if mod == module: uri = mod.i_xsd_namespace else: uri = mod.search_one('namespace').arg fd.write('\n xmlns:' + pre + '=' + quoteattr(uri)) fd.write('>\n\n') if ctx.opts.xsd_no_imports != True: imports = module.search('import') + module.i_gen_import for inc in module.search('include'): rev = None r = inc.search_one('revision-date') if r is not None: rev = r.arg # container, list, choice m = ctx.get_module(inc.arg, rev) imports.extend(m.search('import')) imported = [] for x in imports: rev = None r = x.search_one('revision-date') if r is not None: rev = r.arg mod = ctx.get_module(x.arg, rev) if mod not in imported: date = "" imported.append(mod) uri = mod.search_one('namespace').arg d = mod.search_one('revision') if d is not None: date = "@" + d.arg fd.write(' \n' % (uri, x.arg, date)) if has_rpc and module.arg == 'ietf-netconf': # this is the YANG mdoule for the NETCONF operations # FIXME: when 4741bis has been published, change # the schema location to a http uri fd.write(' ') elif has_rpc: fd.write(' ') if has_notifications: fd.write(' ') if len(imports) > 0 or has_rpc or has_notifications: fd.write('\n') if ctx.opts.xsd_no_lecture != True: fd.write(' \n') fd.write(' \n') fd.write(' The schema describes an instance document' 'consisting\n') fd.write(' of the entire configuration data store, operational\n') fd.write(' data, rpc operations, and notifications.\n') fd.write(' This schema can thus NOT be used as-is to\n') fd.write(' validate NETCONF PDUs.\n') fd.write(' pythonVersion="%s.%s.%s"\n' % (sys.version_info[0], sys.version_info[1], sys.version_info[2])) fd.write(' pyangVersion="%s"\n' % pyang.__version__) fd.write(' ccapXsdPluginVersion="%s"\n' % __version__) fd.write(' \n') fd.write(' \n\n') print_annotation(ctx, fd, ' ', module) ctx.i_pass = 'second' # print typedefs got_typedefs = False for m in mods: if m.search_one('typedef') is not None: got_typedefs = True if got_typedefs: fd.write('\n \n') for m in mods: for c in m.search('typedef'): print_simple_type(ctx, module, fd, ' ', c.search_one('type'), c, ' name="%s"' % c.i_xsd_name, c.search_one('description')) # print locally defined typedefs if len(module.i_local_typedefs) > 0: fd.write('\n \n') for c in module.i_local_typedefs: fd.write('\n') print_simple_type(ctx, module, fd, ' ', c.search_one('type'), c, ' name="%s"' % c.i_xsd_name, c.search_one('description')) # print augments # filter away local augments; they are printed inline in the XSD augment = [a for a in module.search('augment') if a.i_target_node.i_module.arg != module.arg] if len(augment) > 0: fd.write('\n \n') for c in augment: fd.write('\n') print_augment(ctx, module, fd, ' ', c) # print groupings if ctx.opts.xsd_groups and len(module.i_groupings) > 0: fd.write('\n \n') for c in module.i_groupings: fd.write('\n') print_grouping(ctx, module, fd, ' ', module.i_groupings[c]) fd.write('\n') # print data definitions ctx.xsd_ct_names = {} ctx.xsd_ct_queue = [] # CABLELABS 2012-07-05: added 1 attribute of ctx.opts for tracking # --ccap-vendor global complex types toggling: ctx.opts.global_complex_types = ctx.opts.xsd_global_complex_types print_children(ctx, module, fd, module.i_children, ' ', []) # CABLELABS: The ext-type for extensions: fd.write(' \n') fd.write(' \n') fd.write(' \n') fd.write(' \n') fd.write(' \n') fd.write(' \n') while ctx.xsd_ct_queue: (path, uindent, extbase, cn, c, aname) = ctx.xsd_ct_queue.pop() print_complex_type(ctx, module, fd, ' ', path, [], uindent, extbase, cn, c, aname) # then print all generated 'dummy' simpleTypes, if any if len(module.i_gen_typedef) > 0: fd.write('\n \n') for c in module.i_gen_typedef: fd.write('\n') print_simple_type(ctx, module, fd, ' ', c.search_one('type'), c, ' name="%s"' % c.arg, None) fd.write('\n\n') #CABLELABS: 2012-08-17: altered function for more flexibility, added inlineType #special check for ccap extensionPoints #CABLELABS: 2012-11-02: createdcontains_extension function for inlineType #checking def is_descendant_of_extension(search, c, ctx): """return True of c is descendant of a (search) extension""" if c.__dict__['parent'] is not None: d.pr(5, "\n") if c.search_one(search): d.pr(5, "is_descendant_of_extension(" + str(search) + ") returned True") return True else: d.pr(5, "is_descendant_of_extension(" + str(search) + ") recursion...") return is_descendant_of_extension(search, c.parent, ctx) else: d.pr(5, "is_descendant_of_extension(" + str(search) + ") returned False") return False def print_lineage(c, module_prefix="ccap", s=""): if c.__dict__['parent'] is not None: ext_pt = "" if c.search_one((module_prefix, 'extensionPoint')): ext_pt += "[extensionPoint]" if c.search_one((module_prefix, 'inlineType')): ext_pt += "[inlineType]" if is_mandatory(c): ext_pt += "[mandatory]" s = " => %s%s (%s)" % (c.arg, ext_pt, c.keyword) + s print_lineage(c.parent, module_prefix, s) else: d.pr(2, s[4:]) def is_mandatory(c): if c.search_one('mandatory', 'true'): return True elif c.keyword == 'container' and 'i_children' in c.__dict__: for i in c.i_children: if (i.keyword in ['leaf', 'choice', 'anyxml'] and i.search_one('mandatory', 'true')): return True elif (i.keyword in ['list', 'leaf-list'] and i.search_one('min-elements') is not None and int(i.search_one('min-elements').arg) > 0): return True elif i.keyword == 'container' and is_mandatory(i): return True else: return False else: return False def print_children(ctx, module, fd, children, indent, path, uniq=[], uindent=''): # CABLELABS 2012-07-05: debugging statements: d.pr(7, "") d.pr(7, "print_children() call") uses_list = [] for c in children: d.pr(2, c.arg + " (" + c.keyword + ") " + str(len(c.substmts))) print_lineage(c) # CABLELABS 2012-06-29: # This treats ccap extensionPoint and inlineType as if the # --xccapxsd-global-complex-types flag were not set when the # --ccap-vendor flag is set OR when inlineType is used if ctx.opts.xsd_global_complex_types: # CABLELABS 2012-07-05: simplified logic for --ccap-vendor # implementation # CABLELABS 2012-11-12: added ccap:inlineType if ((ctx.opts.ccap_vendor and is_descendant_of_extension(('ccap', 'extensionPoint'), c, ctx)) or (ctx.opts.inline_type and is_descendant_of_extension(('ccap', 'inlineType'), c, ctx))): # contains_extension(('ccap', 'inlineType'), c, ctx))): # c.parent.parent.search_one('ccap', 'inlineType'))): d.pr(6, "\n") d.pr(8, "ctx.opts.inline_type = " + str(ctx.opts.inline_type)) d.pr(6, "Setting gct to False") ctx.opts.global_complex_types = False d.pr(7, "c.keyword = " + str(c.keyword)) else: d.pr(6, "\n") d.pr(8, "ctx.opts.inline_type = " + str(ctx.opts.inline_type)) d.pr(6, "Setting gct to True") ctx.opts.global_complex_types = True d.pr(4, " ctx.opts.global_complex_types = " + str(ctx.opts.global_complex_types)) d.pr(8, "ctx.opts.xsd_global_complex_types = " + str(ctx.opts.xsd_global_complex_types)) if (ctx.opts.xsd_groups and hasattr(c, 'i_uses') and c.i_uses[0].i_grouping.parent.parent == None and c.i_uses[0].search_one('refine') is None): # do not inline this child; print a reference to the grouping # instead - only for top-level groupings if c.i_uses[0] not in uses_list: uses_list.append(c.i_uses[0]) fd.write(indent + '\n' % c.i_uses[0].arg) continue cn = c.keyword if cn in ['container', 'list', 'leaf', 'leaf-list', 'anyxml', 'notification', 'rpc']: default = "" mino = "" maxo = "" atype = "" sgroup = "" extbase = None # ctcount = {} # unused variable # Explicitly produce the default minOccurs and maxOccurs values mino = ' minOccurs="1"' maxo = ' maxOccurs="1"' if path == []: pass elif cn in ['leaf']: is_key = False if ((c.parent.keyword == 'list') and (c.parent.search_one('key') is not None) and (c.arg in c.parent.search_one('key').arg.split())): is_key = True if ((is_key == False) and (c.search_one('mandatory') == None or c.search_one('mandatory').arg != 'true')): mino = 'minOccurs="0" ' if c.search_one('default') != None: default = ' default="%s"' % c.search_one('default').arg elif cn in ['container']: # both presence and organizational containers gets # a minOccurs 0, b/c empty organizational containers # don't have to appear in the XML # mino = ' minOccurs="0" ' # XXX: Added minOccurs="1" for containers that have children # that are mandatory true # if c.search_one('mandatory', 'true'): # mino = 'minOccurs="1" ' # CABLELABS: 2012-11-28: Added recursive checking for mandatory # CABLELABS: 2012-12-04: Removed above logic and moved to # is_mandatory() function # if is_mandatory(c): # mino = 'minOccurs="1" ' # CABLELABS: 2012-12-11: Replaced custom def with built-in: if is_mandatory(c): mino = 'minOccurs="1" ' # CABLELABS: If the container is in a choice statement with # mandatory 'true', all containers need to be minOccurs="1" # CABLELABS: 2012-12-04: This is incorrect according to RFC6020 # elif ((c.parent.parent.keyword == 'choice') and # (c.parent.parent.search_one('mandatory') is not None)): # mino = 'minOccurs="1" ' else: mino = 'minOccurs="0" ' elif cn in ['list', 'leaf-list']: if c.search_one('min-elements') != None: mino = ' minOccurs="%s" ' % \ c.search_one('min-elements').arg else: mino = ' minOccurs="0" ' if c.search_one('max-elements') != None: maxo = ' maxOccurs="%s"' % c.search_one('max-elements').arg else: maxo = ' maxOccurs="unbounded"' elif cn in ['anyxml']: if (c.search_one('mandatory') == None or c.search_one('mandatory').arg != 'true'): mino = ' minOccurs="0" ' if cn in ['leaf', 'leaf-list']: type = c.search_one('type') if type.i_is_derived == False: if type.arg == 'empty': atype = '' else: atype = ' type="%s"' % xsd_type_name(ctx, type, c) elif cn in ['notification']: sgroup = ' substitutionGroup="ncn:notificationContent"' extbase = 'ncn:NotificationContentType' elif cn in ['rpc']: sgroup = ' substitutionGroup="nc:rpcOperation"' extbase = 'nc:rpcOperationType' # CABLELABS 2012-06-29: NGAA-297: changed to local scope variable # for more flexibility with --ccap-vendor option if (cn in ['container', 'list', 'rpc', 'notification'] and ctx.opts.global_complex_types): ctype_name = c.arg ## CABLELABS 2012-06-28: This code was in the original xsd ## module to avoid duplicate-name type overwriting. We may ## want to implement some form of this code. # x = c.parent # idx = 0 ## while ctype_name in ctx.xsd_ct_names: ## already taken; generate a new name ## not ideal... ## CABLELABS: We fail miserably on duplicate ## complexType names with different content. ## Need help with this # if x is not None: # ctype_name = '%s_%s' % (x.arg, ctype_name) # else: # ctype_name = '%s_%d' % (ctype_name, idx) # idx = idx + 1 if ctype_name in ctx.xsd_ct_names: ctx.xsd_ct_names[ctype_name] += 1 else: ctx.xsd_ct_names[ctype_name] = 0 atype = ' type="%s-type"' % c.arg d.pr(8, " ctype_name = " + ctype_name) ## CABLELABS 2012-06-28: NGAA-297 5a, 7b (not all containers) # elif c.search_one(("ccap", "extensionPoint")): # atype = ' type="%s-type"' % c.arg # atype = ' type="ext-type"' # ctype_name = None else: ctype_name = None # CABLELABS 2012-06-29: NGAA-297 (all containers) # TODO: This may need to be changed depending on how inlineType # shakes out(?). if (c.search_one(("ccap", "extensionPoint")) and ('i_children' not in c.__dict__ or (ctx.opts.ccap_vendor is None and ctx.opts.global_complex_types))): # ctx.opts.global_complex_types) or # ctx.opts.inline_type is None)): atype = ' type="ext-type"' # CABLELABS 2012-12-05: NGAA-468: Changed logic to parent module # if c.arg == 'ccap': if c.parent.keyword == "module": fd.write(indent + '\n') inline_end = False # CABLELABS 2012-06-29: NGAA-297: changed to local scope variable # for more flexibility with --ccap-vendor option if (cn in ['container', 'list', 'rpc', 'notification'] and not ctx.opts.global_complex_types): if not has_body: fd.write('>\n') d.pr(7, "calling print_complex_type()") print_complex_type(ctx, module, fd, indent + ' ', path, uniq, uindent, extbase, cn, c, '') elif cn in ['leaf', 'leaf-list']: if c.search_one('type').i_is_derived == True: if not has_body: fd.write('>\n') print_simple_type(ctx, module, fd, indent + ' ', c.search_one('type'), c, '', None) elif c.search_one('type').arg == 'empty': if not has_body: fd.write('>\n') fd.write(indent + ' \n') elif not has_body: inline_end = True elif cn in ['anyxml']: if not has_body: fd.write('>\n') # FIXME: make one such global type, if needed fd.write(indent + ' \n') fd.write(indent + ' \n') fd.write(indent + ' \n') fd.write(indent + ' \n') fd.write(indent + ' \n') # CABLELABS 2012-06-29: NGAA-297: changed to local scope variable # for more flexibility with --ccap-vendor option if (cn in ['container', 'list', 'rpc', 'notification'] and ctx.opts.global_complex_types): d.pr(7, "\ncn = " + cn + ", gct on") ustr = "" for child in c.i_children: # print child.__dict__, "\n" ustr += mk_ustr(module, path + [c.arg], indent + " ", child) if ustr != "": if not has_body: has_body = True fd.write('>\n') fd.write(ustr) if not has_body and not ctx.opts.ccap_vendor: fd.write('/>\n') elif not has_body: fd.write('/>\n') else: fd.write(indent + '\n') # add to queue # CABLELABS: if there is no previous entry w/ the same # ctype_name # print ctype_name # print c.arg # print ctx.xsd_ct_names if ctx.xsd_ct_names[ctype_name] >= 1: # if ctype_name in ctx.xsd_ct_names: pass else: d.pr(3, 'Adding %s (%s) of type="%s-type" to ' 'xsd_ct_queue' % (c.arg, c.keyword, ctype_name)) ctx.xsd_ct_queue.insert(0, (path, uindent, extbase, cn, c, ' name="%s-type"' % ctype_name)) elif inline_end: fd.write('/>\n') else: fd.write(indent + '\n') # CABLELABS 2012-06-29: NGAA-297 5b, 7c # We want to include ext element if ccap extensionPoints are found # when using --ccapxsd-global-complex-types option: if (c.search_one(("ccap", "extensionPoint")) and ctx.opts.xsd_global_complex_types and c.parent.parent.keyword != 'choice'): fd.write(indent + '\n') d.pr(2, "Adding ext element.") elif cn == 'choice': fd.write(indent + '\n') print_description(fd, indent + ' ', c.search_one('description')) for child in c.i_children: # CABLELABS: 2012-11-27: Added for NGAA-466 if child.keyword == 'case': d.pr(6, "calling print_children_type() " "from print_children() -- choice/case") mino = 'minOccurs="0"' if child.parent.search_one('mandatory', 'true'): mino = 'minOccurs="1"' fd.write(indent + ' \n' % (child.arg, mino)) fd.write(indent + ' \n') fd.write(indent + ' \n') print_children(ctx, module, fd, child.i_children, indent + ' ', path) fd.write(indent + ' \n') fd.write(indent + ' \n') fd.write(indent + ' \n') else: # CABLELABS: No need to wrap in sequences # fd.write(indent + ' \n') d.pr(6, "calling print_children() " "from print_children() -- not case") print_children(ctx, module, fd, child.i_children, indent + ' ', path) # CABLELABS: No need to wrap in sequences # fd.write(indent + ' \n') # CABLELABS: Need extenstion elements here, replace with XSLT? # 2012-07-10: remove numbered ext-choice for --ccap-vendor option # 2012-07-24: fix to only remove numbered ext-choices for # descendants of extensions if (ctx.opts.ccap_vendor and is_descendant_of_extension(("ccap", 'extensionPoint'), c, ctx)): pass else: if (c.search_one('mandatory') is not None): fd.write(indent + ' \n' % module.i_choiceext_idx) else: fd.write(indent + ' \n' % module.i_choiceext_idx) module.i_choiceext_idx = module.i_choiceext_idx + 1 fd.write(indent + '\n') def xsd_type_name(ctx, type, parent): if type.arg in yang_to_xsd_types: return "xs:%s" % yang_to_xsd_types[type.arg] elif type.arg == 'leafref': if parent.i_leafref_expanded is False: statements.v_reference_leaf_leafref(ctx, parent) (ref, _pos) = parent.i_leafref_ptr return xsd_type_name(ctx, ref.search_one('type'), ref) # elif ((type.i_typedef != None) and (":" not in type.arg)): # return type.i_typedef.i_xsd_name elif type.i_typedef is not None: if hasattr(type.i_typedef, 'i_xsd_name'): # the type is in our own module return "%s" % type.i_typedef.i_xsd_name elif type.i_typedef.parent.keyword == 'module': # top-level (exported) typedef return "%s:%s" % (type.i_typedef.i_module.i_xsd_prefix, type.i_typedef.arg) elif type.i_typedef.parent.keyword == 'submodule': if (type.i_typedef.parent.i_modulename == type.i_module.arg): # top-level in our own module return type.i_typedef.arg else: # top-level (exported) in submodule othermod = ctx.get_module(type.i_typedef.parent.i_modulename) return "%s:%s" % (othermod.i_xsd_prefix, type.i_typedef.arg) else: # cannot happen print type.arg assert False else: return type.arg def print_augment(ctx, module, fd, indent, augment): i = module.i_gen_augment_idx name = "a" + str(i) while util.attrsearch(name, 'arg', module.search('grouping')) != None: i = i + 1 name = "a" + str(i) module.i_gen_augment_idx = i + 1 fd.write(indent + '\n' % name) print_description(fd, indent + ' ', augment.search_one('description')) fd.write(indent + ' \n') d.pr(6, "calling print_children() from print_augment()") print_children(ctx, module, fd, augment.i_children, indent + ' ', []) fd.write(indent + ' \n') fd.write(indent + '\n') def print_grouping(ctx, module, fd, indent, grouping): fd.write(indent + '\n' % grouping.arg) print_description(fd, indent + ' ', grouping.search_one('description')) fd.write(indent + ' \n') d.pr(6, "calling print_children() from print_grouping()") print_children(ctx, module, fd, grouping.i_children, indent + ' ', []) fd.write(indent + ' \n') fd.write(indent + '\n') def print_description(fd, indent, descr): if descr != None: fd.write(indent + '\n') fd.write(indent + ' \n') fd.write(fmt_text(indent + ' ', descr.arg) + '\n') fd.write(indent + ' \n') fd.write(indent + '\n\n') def gen_new_import(module, modname, revision): i = 0 pre = "p" + str(i) while pre in module.i_prefixes: i = i + 1 pre = "p" + str(i) module.i_prefixes[pre] = (modname, revision) imp = statements.Statement(module, module, None, 'import', modname) if revision is not None: rev = statements.Statement(module, imp, None, 'revision-date', revision) imp.substmts.append(rev) module.i_gen_import.append(imp) return pre def print_complex_type(ctx, module, fd, indent, path, uniq, uindent, extbase, cn, c, aname): # CABLELABS 2012-07-05: debugging statements: d.pr(7, "") d.pr(7, "print_complex_type() call") fd.write(indent + '\n' % aname) extindent = '' if extbase != None: fd.write(indent + ' \n') fd.write(indent + ' \n' % extbase) extindent = ' ' fd.write(indent + extindent + ' \n') if cn == 'rpc': if c.search_one('input') != None: chs = c.search_one('input').i_children else: chs = [] elif cn == 'list': # sort children so that all keys come first k = c.search_one('key') if k is not None: kchs = [] chs = [] keynames = k.arg.split() for ch in c.i_children: if ch.arg in keynames: kchs.append(ch) else: chs.append(ch) chs = kchs + chs else: chs = c.i_children ustr = mk_ustr(module, path, uindent, c) # CABLELABS 2012-08-17: exclude ccap:inlineType from generating keys # when --inline-type flag is used if not ((ctx.opts.ccap_vendor or ctx.opts.inline_type) and ctx.opts.xsd_global_complex_types and # c.search_one(('ccap', 'inlineType'))): is_descendant_of_extension(("ccap", 'inlineType'), c, ctx)): uniq.append(ustr) else: chs = c.i_children # allocate a new object for constraint recording uniqes = [] d.pr(6, "calling print_children() from print_complex_type()") print_children(ctx, module, fd, chs, indent + extindent + ' ', [c.arg] + path, uniqes, indent + extindent) # allow for augments # CABLELABS: No, don't... #fd.write(indent + extindent + ' \n') # CABLELABS: ...but write our own: # CABLELABS 2012-06-29: NGAA-297 5b, 7c # removed following line and relocated to end of print_chilren(): # fd.write(indent + extindent + ' \n') fd.write(indent + extindent + ' \n') if extbase != None: fd.write(indent + ' \n') fd.write(indent + ' \n') # CABLELABS 2012-12-05: for NGAA-468 if c.parent.keyword == "module": # if c.arg == 'ccap': fd.write(indent + ' \n') fd.write(indent + ' \n') fd.write(indent + ' \n') fd.write(indent + ' \n') fd.write(indent + ' \n') fd.write(indent + ' \n') fd.write(indent + ' \n') fd.write(indent + ' \n') fd.write(indent + ' \n') fd.write(indent + ' \n') fd.write(indent + ' \n') fd.write(indent + ' \n') fd.write(indent + ' \n') fd.write(indent + ' \n') fd.write(indent + ' \n') fd.write(indent + ' \n') # write the recorded key and unique constraints (if any) # CABLELABS 2012-07-09: replaced following 3 lines with lines afterwards # plugin was including keys in complex types with --ccap-vendor flag: # for u in uniqes: # fd.write(u) # fd.write(indent + '\n') # if ctx.opts.ccap_vendor: # fd.write(indent + '\n') # for u in uniqes: # fd.write(u) # else: # for u in uniqes: # fd.write(u) # fd.write(indent + '\n') fd.write(indent + '\n') for u in uniqes: fd.write(u) # if not ctx.opts.ccap_vendor: # for u in uniqes: # fd.write(u) # fd.write(indent + '\n') # else: # fd.write(indent + '\n') # for u in uniqes: # fd.write(u) def mk_ustr(module, path, uindent, c): k = c.search_one('key') ustr = "" if k is not None: # record the key constraints to be used by our # parent element ustr += uindent + '\n' % '_'.join(path + [c.arg]) ustr += uindent + ' \n' % c.arg for expr in k.arg.split(): # f = '/'.join([module.i_xsd_prefix + ':' + x f = '/'.join([x for x in expr.split('/')]) ustr += uindent + \ ' \n' % f ustr += uindent + '\n' i = 0 for u in c.search('unique'): ustr += uindent + '\n' % \ ('_'.join(path + [c.arg]), i) ustr += uindent + ' \n' % c.arg for expr in u.arg.split(): f = '/'.join([x for x in expr.split('/')]) ustr += uindent + ' \n' % f ustr += uindent + '\n' i += 1 return ustr def print_simple_type(ctx, module, fd, indent, type, parent, attrstr, descr): d.pr(7, "print_simple_type() call") def gen_new_typedef(module, new_type): i = 0 name = "t" + str(i) all_typedefs = module.search('typedef') + module.i_local_typedefs + \ module.i_gen_typedef while util.attrsearch(name, 'arg', all_typedefs): i = i + 1 name = "t" + str(i) typedef = statements.Statement(module, module, new_type.pos, 'typedef', name) typedef.substmts.append(new_type) module.i_gen_typedef.append(typedef) return name if type.search('bit') != []: fd.write(indent + '\n' % attrstr) print_description(fd, indent + ' ', descr) fd.write(indent + ' \n') fd.write(indent + ' \n') fd.write(indent + ' \n') for bit in type.search('bit'): fd.write(indent + ' \n' % quoteattr(bit.arg)) fd.write(indent + ' \n') fd.write(indent + ' \n') fd.write(indent + ' \n') fd.write(indent + '\n') return fd.write(indent + '\n' % attrstr) print_description(fd, indent + ' ', descr) if type.arg in yang_to_xsd_types: base = xsd_type_name(ctx, type, parent) elif type.arg == 'leafref': base = xsd_type_name(ctx, type, parent) elif type.search('enum') != []: base = 'xs:string' else: base = xsd_type_name(ctx, type, parent) if ((type.search_one('length') != None) and (type.search('pattern') != [])): # this type has both length and pattern, which isn't allowed # in XSD. we solve this by generating a dummy type with the # pattern only, derive from it new_type = type.copy() # remove length and generate the base type with pattern restriction length_stmt = new_type.search_one('length') new_type.substmts.remove(length_stmt) new_type.i_lengths = [] if ctx.i_pass == 'first': base = '' else: base = gen_new_typedef(module, new_type) # reset type new_type = type.copy() # remove patterns and keep the length restriction for p in new_type.search('pattern'): new_type.substmts.remove(p) type = new_type if (len(type.i_lengths) > 1 or len(type.i_ranges) > 1): if type.i_typedef != None: parent = type.i_typedef.search_one('type') if (len(parent.i_lengths) > 1 or len(parent.i_ranges) > 1): # the parent type is translated into a union, and we need # to use a new length facet - this isn't allowed by XSD. # but we make the observation that the length facet in the # parent isn't needed anymore, so we use the parent's parent # as base type, unless the parent's parent has pattern # restrictions also, in which case we generate a new typedef # w/o the lengths if parent.search('pattern') != []: # we have to generate a new derived type with the # pattern restriction only new_type = parent.copy() # remove length length_stmt = new_type.search_one('length') new_type.substmts.remove(length_stmt) new_type.i_lengths = [] # type might be in another module, so we might need to add # a prefix. further, its base type might be in yet another # module, so we might need to change its base type's # name if type.arg.find(":") != -1: [prefix, _name] = type.arg.split(':', 1) if new_type.arg.find(":") == -1: new_type.arg = prefix + ":" + new_type.arg else: # complex case. the other type has a prefix, i.e. # is imported. we might not even import that # module. we have to add an import in order to # cover this case properly [newprefix, newname] = new_type.arg.split(':', 1) (newmodname, newrevision) = \ new_type.i_module.i_prefixes[newprefix] # first, check if we already have the module # imported newprefix = util.dictsearch((newmodname, newrevision), module.i_prefixes) if newprefix != None: # it is imported, use our prefix new_type.arg = newprefix + ':' + newname else: newprefix = gen_new_import(module, newmodname, newrevision) newmod = ctx.get_module(newmodname, newrevision) newmod.i_xsd_prefix = newprefix if ctx.i_pass == 'first': base = '' else: base = gen_new_typedef(module, new_type) else: base = parent.arg fd.write(indent + ' \n') if type.search_one('length') != None: for (lo, hi) in type.i_lengths: fd.write(indent + ' \n') fd.write(indent + ' \n' % base) if hi == None: # FIXME: we don't generate length here currently, # b/c libxml segfaults if base also has min/maxLength... # fd.write(indent + # ' \n' % lo) hi = lo if lo not in ['min', 'max']: fd.write(indent + ' \n' % lo) if hi not in ['min', 'max']: fd.write(indent + ' \n' % hi) fd.write(indent + ' \n') fd.write(indent + ' \n') elif type.search_one('range') != None: for (lo, hi) in type.i_ranges: fd.write(indent + ' \n') fd.write(indent + ' \n' % base) if lo not in ['min', 'max']: fd.write(indent + ' ' '\n' % lo) if hi == None: hi = lo if hi not in ['min', 'max']: fd.write(indent + ' ' '\n' % hi) fd.write(indent + ' \n') fd.write(indent + ' \n') fd.write(indent + ' \n') elif type.search('type') != []: fd.write(indent + ' \n') for t in type.search('type'): print_simple_type(ctx, module, fd, indent + ' ', t, type, '', None) fd.write(indent + ' \n') elif type.search('pattern') != []: def print_pattern(indent, patstr): fd.write(indent + ' \n') def print_ored_patterns(patterns, indent): if len(patterns) == 1: patstr = patterns[0].arg else: patstr = '' for p in patterns[:-1]: patstr += '(' + p.arg + ')|' patstr += '(' + patterns[-1].arg + ')' fd.write(indent + ' \n' % base) print_pattern(indent, patstr) fd.write(indent + ' \n') print_ored_patterns(type.search('pattern'), indent) else: fd.write(indent + ' \n' % base) if len(type.search('enum')) > 0: for e in type.search('enum'): fd.write(indent + ' \n' % quoteattr(e.arg)) elif type.search_one('length') != None: # other cases in union above [(lo, hi)] = type.i_lengths if lo == hi and False: # FIXME: we don't generate length here currently, # b/c libxml segfaults if base also has min/maxLength fd.write(indent + ' \n' % lo) else: if lo not in ['min', 'max']: fd.write(indent + ' \n' % lo) if hi == None: hi = lo if hi not in ['min', 'max']: fd.write(indent + ' \n' % hi) elif type.search_one('range') != None: [(lo, hi)] = type.i_ranges # other cases in union above if lo not in ['min', 'max']: fd.write(indent + ' \n' % lo) if hi == None: hi = lo if hi not in ['min', 'max']: fd.write(indent + ' \n' % hi) elif type.search_one('fraction-digits') is not None: fd.write(indent + ' \n' % type.search_one('fraction-digits').arg) fd.write(indent + ' \n') fd.write(indent + '\n') def print_annotation(ctx, fd, indent, obj, prestr=''): def is_appinfo(keyword): if util.is_prefixed(keyword) == True: return False (argname, argiselem, argappinfo) = yang_keywords[keyword] return argappinfo def do_print(indent, stmt): keyword = stmt.keyword (argname, argiselem, argappinfo) = yang_keywords[keyword] if argname == None: fd.write(indent + '\n') elif argiselem == False: # print argument as an attribute attrstr = argname + '=' + quoteattr(stmt.arg) if len(stmt.substmts) == 0: fd.write(indent + '\n') else: fd.write(indent + '\n') for s in stmt.substmts: do_print(indent + ' ', s) fd.write(indent + '\n') else: # print argument as an element fd.write(indent + '\n') fd.write(indent + ' \n') fd.write(fmt_text(indent + ' ', stmt.arg)) fd.write('\n' + indent + ' \n') for s in stmt.substmts: do_print(indent + ' ', s) fd.write(indent + '\n') stmts = [s for s in obj.substmts if is_appinfo(s.keyword)] if ((ctx.opts.xsd_no_appinfo == False and len(stmts) > 0) or obj.search_one('description') != None): fd.write(prestr) fd.write(indent + '\n') if obj.search_one('description') != None: fd.write(indent + ' \n') fd.write(fmt_text(indent + ' ', obj.search_one('description').arg) + '\n') fd.write(indent + ' \n') if ctx.opts.xsd_no_appinfo == False: fd.write(indent + ' \n') for stmt in stmts: do_print(indent + ' ', stmt) fd.write(indent + ' \n') fd.write(indent + '\n') return True else: return False # FIXME: I don't thik this is strictly correct - we should really just # print the string as-is, since whitespace in XSD is significant. def fmt_text(indent, data): res = [] for line in re.split("(\n)", escape(data)): if line == '': continue if line == '\n': res.extend(line) else: res.extend(indent + line) return ''.join(res)