--- /dev/null
+ORAN CSAR Package validating Tool
+---------------------------------
+ORAN CSAR package validating tool provides the testers and DevOps engineers to validate the TOSCA version, CSAR version and verify if the yaml file exists in the directory and the enttry definition contents of yaml file.
+
+Source code:
+
+Tool file contents:
+
+main.py
+csar.py
+Toscameta.py
+utlis.py
+
+Usage:
+
+python3 main.py <validate function> -d <destination> [--no-verify-cert] <source>
+
+<source> : source directory where the csar package zip file exists.
+<destination> : destination directory to unzip the csar contents.
+[--no-verify-cert] : Optional parameter.
+
+
+example:
+
+python3 main.py csar-validate -d /tmp/ [--no-verify-cert] CSAR-dest/vnf-vsn.csar
+
+The ORAN CSAR package validation code leverages from vnfsdk CSAR pkg code with modification.
--- /dev/null
+
+import logging
+import os
+import tempfile
+import zipfile
+
+import requests
+from ruamel import yaml
+
+import toscameta
+import utils
+
+LOG = logging.getLogger(__name__)
+
+class _CSARReader(object):
+
+ def __init__(self, source, destination, no_verify_cert=True):
+ if os.path.isdir(destination) and os.listdir(destination):
+ raise ValueError('{0} already exists and is not empty. '
+ 'Please specify the location where the CSAR '
+ 'should be extracted.'.format(destination))
+ downloaded_csar = '://' in source
+ if downloaded_csar:
+ file_descriptor, download_target = tempfile.mkstemp()
+ os.close(file_descriptor)
+ self._download(source, download_target)
+ source = download_target
+ self.source = os.path.expanduser(source)
+ self.destination = os.path.expanduser(destination)
+ self.metadata = None
+ self.manifest = None
+ try:
+ if not os.path.exists(self.source):
+ raise ValueError('{0} does not exists. Please specify a valid CSAR path.'
+ .format(self.source))
+ if not zipfile.is_zipfile(self.source):
+ raise ValueError('{0} is not a valid CSAR.'.format(self.source))
+ self._extract()
+ self._read_metadata()
+ finally:
+ if downloaded_csar:
+ os.remove(self.source)
+
+ @property
+ def created_by(self):
+ return self.metadata.created_by
+
+ @property
+ def csar_version(self):
+ return self.metadata.csar_version
+
+ @property
+ def meta_file_version(self):
+ return self.metadata.meta_file_version
+
+ @property
+ def entry_definitions(self):
+ return self.metadata.entry_definitions
+
+ @property
+ def entry_definitions_yaml(self):
+ with open(os.path.join(self.destination, self.entry_definitions)) as f:
+ return yaml.safe_load(f)
+
+ @property
+ def entry_manifest_file(self):
+ return self.metadata.entry_manifest_file
+
+ @property
+ def entry_history_file(self):
+ return self.metadata.entry_history_file
+
+ @property
+ def entry_tests_dir(self):
+ return self.metadata.entry_tests_dir
+
+ @property
+ def entry_licenses_dir(self):
+ return self.metadata.entry_licenses_dir
+
+ @property
+ def entry_certificate_file(self):
+ return self.metadata.entry_certificate_file
+
+ def _extract(self):
+ LOG.debug('Extracting CSAR contents')
+ if not os.path.exists(self.destination):
+ os.mkdir(self.destination)
+ with zipfile.ZipFile(self.source) as f:
+ f.extractall(self.destination)
+ LOG.debug('CSAR contents successfully extracted')
+
+ def _read_metadata(self):
+ self.metadata = toscameta.create_from_file(self.destination)
+
+ def _download(self, url, target):
+ response = requests.get(url, stream=True)
+ if response.status_code != 200:
+ raise ValueError('Server at {0} returned a {1} status code'
+ .format(url, response.status_code))
+ LOG.info('Downloading {0} to {1}'.format(url, target))
+ with open(target, 'wb') as f:
+ for chunk in response.iter_content(chunk_size=8192):
+ if chunk:
+ f.write(chunk)
+
+
+def read(source, destination, no_verify_cert=False):
+ return _CSARReader(source=source,
+ destination=destination,
+ no_verify_cert=no_verify_cert)
+
--- /dev/null
+
+import sys
+import logging
+import argparse
+import shutil
+import tempfile
+
+import pkg_resources
+
+import csar
+
+LOG = logging.getLogger(__name__)
+
+def csar_validate_func(namespace):
+ try:
+ csar.read(namespace.source,
+ namespace.destination,
+ namespace.no_verify_cert)
+ finally:
+ LOG.debug('Calling rmtree: {}'.format(namespace.destination))
+ shutil.rmtree(namespace.destination, ignore_errors=True)
+
+def parse_args(args_list):
+ """
+ Entry point
+ """
+ parser = argparse.ArgumentParser(description='CSAR Validation tool')
+ parser.add_argument('-v', '--verbose',
+ dest='verbosity',
+ action='count',
+ default=0,
+ help='Set verbosity level (can be passed multiple times)')
+
+ subparsers = parser.add_subparsers(help='csar-validate')
+
+ csar_open = subparsers.add_parser('csar-validate')
+ csar_open.set_defaults(func=csar_validate_func)
+ csar_open.add_argument(
+ 'source',
+ help='CSAR file location')
+ csar_open.add_argument(
+ '-d', '--destination',
+ help='Output directory to extract the CSAR into',
+ required=True)
+ csar_open.add_argument(
+ '--no-verify-cert',
+ action='store_true',
+ help="Do NOT verify the signer's certificate")
+
+ return parser.parse_args(args_list)
+
+
+def init_logging():
+# verbosity = [logging.WARNING, logging.INFO, logging.DEBUG]
+#
+ logging.basicConfig(level=logging.INFO)
+
+def main():
+ args = parse_args(sys.argv[1:])
+# logging.basicConfig(level=logging.DEBUG)
+ init_logging()
+ return args.func(args)
+
+
+if __name__ == '__main__':
+ main()
+
--- /dev/null
+install python3-pip
+pip3 install ruamel.yaml
+pip3 install udatetime
--- /dev/null
+
+import logging
+import os
+import pprint
+
+from ruamel import yaml
+import six
+
+import utils
+
+LOG = logging.getLogger(__name__)
+
+META_FILE = 'TOSCA-Metadata/TOSCA.meta'
+
+META_FILE_VERSION_KEY = 'TOSCA-Meta-File-Version'
+META_FILE_VERSION_VALUE = '1.0'
+META_CSAR_VERSION_KEY = 'CSAR-Version'
+META_CSAR_VERSION_VALUE = '1.1'
+META_CREATED_BY_KEY = 'Created-By'
+META_CREATED_BY_VALUE = 'ORAN'
+
+META_ENTRY_DEFINITIONS_KEY = 'Entry-Definitions'
+
+BASE_META = {
+ META_FILE_VERSION_KEY: META_FILE_VERSION_VALUE,
+ META_CSAR_VERSION_KEY: META_CSAR_VERSION_VALUE,
+}
+
+
+class ToscaMeta(object):
+ META_ENTRY_MANIFEST_FILE_KEY = 'ETSI-Entry-Manifest'
+ META_ENTRY_HISTORY_FILE_KEY = 'ETSI-Entry-Change-Log'
+ META_ENTRY_TESTS_DIR_KEY = 'ETSI-Entry-Tests'
+ META_ENTRY_LICENSES_DIR_KEY = 'ETSI-Entry-Licenses'
+ META_ENTRY_CERT_FILE_KEY = 'ETSI-Entry-Certificate'
+ REQUIRED_KEYS = [
+ META_FILE_VERSION_KEY, META_CSAR_VERSION_KEY,
+ META_CREATED_BY_KEY, META_ENTRY_DEFINITIONS_KEY,
+ META_ENTRY_MANIFEST_FILE_KEY, META_ENTRY_HISTORY_FILE_KEY,
+ META_ENTRY_LICENSES_DIR_KEY, ]
+ OPTIONAL_KEYS = [META_ENTRY_TESTS_DIR_KEY, META_ENTRY_CERT_FILE_KEY]
+
+ def __init__(self, base_dir, entry, manifest=None, changelog=None,
+ licenses=None, tests=None, certificate=None,
+ meta_file_version=META_FILE_VERSION_VALUE,
+ meta_csar_version=META_CSAR_VERSION_VALUE,
+ meta_created_by=META_CREATED_BY_VALUE):
+
+ self.base_dir = base_dir
+
+ metadata = {}
+ metadata[META_FILE_VERSION_KEY] = str(meta_file_version)
+ metadata[META_CSAR_VERSION_KEY] = str(meta_csar_version)
+ metadata[META_CREATED_BY_KEY] = meta_created_by
+ metadata[META_ENTRY_DEFINITIONS_KEY] = entry
+
+ self.metadata = self._validate(metadata)
+
+ def _validate(self, metadata):
+ LOG.debug('CSAR Validating metadata file: {0}'.format(metadata))
+ for (key, value) in six.iteritems(BASE_META):
+ LOG.info('TOSCA.meta: {} {}'.format(key, value))
+ LOG.info('get.meta: {} '.format(metadata.get(key)))
+ if metadata.get(key) != value:
+ raise ValueError('TOSCA.meta: {} must be {}'.format(key, value))
+
+ utils.check_file_dir(root=self.base_dir,
+ entry=metadata.get(META_ENTRY_DEFINITIONS_KEY),
+ msg='Please specify a valid entry point.',
+ check_dir=False)
+ entry_file = os.path.join(self.base_dir,
+ metadata.get(META_ENTRY_DEFINITIONS_KEY))
+ LOG.info('entry file: {} '.format(entry_file))
+ try:
+ with open(entry_file) as f:
+ yaml.safe_load(f)['tosca_definitions_version']
+ except Exception:
+ raise ValueError('Entry file {} is not a valid tosca simple yaml file'.format(entry_file))
+ return metadata
+
+ def dump_as_string(self):
+ s = ""
+ for key in self.REQUIRED_KEYS + self.OPTIONAL_KEYS:
+ if self.metadata.get(key):
+ s += "{}: {}\n".format(key, self.metadata.get(key))
+ return s
+
+ @property
+ def created_by(self):
+ return self.metadata.get(META_CREATED_BY_KEY)
+
+ @property
+ def csar_version(self):
+ return self.metadata.get(META_CSAR_VERSION_KEY)
+
+ @property
+ def meta_file_version(self):
+ return self.metadata.get(META_FILE_VERSION_KEY)
+
+ @property
+ def entry_definitions(self):
+ return self.metadata.get(META_ENTRY_DEFINITIONS_KEY)
+
+ @property
+ def entry_manifest_file(self):
+ return self.metadata.get(self.META_ENTRY_MANIFEST_FILE_KEY)
+
+ @property
+ def entry_history_file(self):
+ return self.metadata.get(self.META_ENTRY_HISTORY_FILE_KEY)
+
+ @property
+ def entry_tests_dir(self):
+ return self.metadata.get(self.META_ENTRY_TESTS_DIR_KEY)
+
+ @property
+ def entry_licenses_dir(self):
+ return self.metadata.get(self.META_ENTRY_LICENSES_DIR_KEY)
+
+ @property
+ def entry_certificate_file(self):
+ return self.metadata.get(self.META_ENTRY_CERT_FILE_KEY)
+
+
+class ToscaMeta241(ToscaMeta):
+ # SOL004 v2.4.1
+ META_ENTRY_MANIFEST_FILE_KEY = 'Entry-Manifest'
+ META_ENTRY_HISTORY_FILE_KEY = 'Entry-Change-Log'
+ META_ENTRY_TESTS_DIR_KEY = 'Entry-Tests'
+ META_ENTRY_LICENSES_DIR_KEY = 'Entry-Licenses'
+ META_ENTRY_CERT_FILE_KEY = 'Entry-Certificate'
+ REQUIRED_KEYS = [
+ META_FILE_VERSION_KEY, META_CSAR_VERSION_KEY,
+ META_CREATED_BY_KEY, META_ENTRY_DEFINITIONS_KEY, ]
+ OPTIONAL_KEYS = [
+ META_ENTRY_MANIFEST_FILE_KEY, META_ENTRY_HISTORY_FILE_KEY,
+ META_ENTRY_LICENSES_DIR_KEY, META_ENTRY_TESTS_DIR_KEY,
+ META_ENTRY_CERT_FILE_KEY, ]
+
+
+class ToscaMeta261(ToscaMeta):
+ # SOL004 v2.6.1
+ pass
+
+
+def create_from_file(base_dir):
+ csar_metafile = os.path.join(base_dir, META_FILE)
+ if not os.path.exists(csar_metafile):
+ raise ValueError('Metadata file {0} is missing from the CSAR'.format(csar_metafile))
+ LOG.debug('CSAR metadata file: {0}'.format(csar_metafile))
+ LOG.debug('Attempting to parse CSAR metadata YAML')
+ with open(csar_metafile) as f:
+ metadata = yaml.safe_load(f)
+ LOG.debug('CSAR metadata:\n{0}'.format(pprint.pformat(metadata)))
+ # By default we assume it's SOL004 2.4.1
+ cls = ToscaMeta241
+ for key in metadata.keys():
+ if key.startswith('ETSI-'):
+ cls = ToscaMeta261
+ break
+ return cls(base_dir,
+ entry=metadata.get(META_ENTRY_DEFINITIONS_KEY),
+ manifest=metadata.get(cls.META_ENTRY_MANIFEST_FILE_KEY),
+ changelog=metadata.get(cls.META_ENTRY_HISTORY_FILE_KEY),
+ licenses=metadata.get(cls.META_ENTRY_LICENSES_DIR_KEY),
+ tests=metadata.get(cls.META_ENTRY_TESTS_DIR_KEY),
+ certificate=metadata.get(cls.META_ENTRY_CERT_FILE_KEY),
+ meta_file_version=metadata.get(META_FILE_VERSION_KEY),
+ meta_csar_version=metadata.get(META_CSAR_VERSION_KEY),
+ meta_created_by=metadata.get(META_CREATED_BY_KEY))
+
--- /dev/null
+#!/bin/sh
+
+echo
+echo "Validating TOSCA Version(Tosca-ver-vnf-vsn.csar) Unit test case 1"
+python3 ../main.py csar-validate -d /tmp/vh2/ --no-verify-cert ../CSAR-dest/Tosca-ver-vnf-vsn.csar
+if [ $? -eq 0 ]
+then
+ echo "TOSCA Unit test case 1 Passed"
+else
+ echo "TOSCA Unit test case 1 Failed"
+fi
+echo
+echo "Validating TOSCA CSAR Version(Tosca-csar-ver-vnf-vsn.csar) Unit test case 2"
+echo
+python3 ../main.py csar-validate -d /tmp/vh2/ --no-verify-cert ../CSAR-dest/Tosca-csar-ver-vnf-vsn.csar
+if [ $? -eq 0 ]
+then
+ echo "TOSCA Unit test case 2 Passed"
+else
+ echo "TOSCA Unit test case 2 Failed"
+fi
+
--- /dev/null
+#!/bin/sh
+
+echo
+echo "Validating vnf-vsn.csar Unit test case 1"
+python3 ../main.py csar-validate -d /tmp/vh2/ --no-verify-cert ../CSAR-dest/vnf-vsn.csar
+if [ $? -eq 0 ]
+then
+ echo "Unit test case 1 Passed"
+else
+ echo "Unit test case 1 Failed"
+fi
+echo
+echo "Validating ns-vsn.csar unit test case 2"
+echo
+python3 ../main.py csar-validate -d /tmp/vh2/ --no-verify-cert ../CSAR-dest/ns-vsn.csar
+echo
+if [ $? -eq 0 ]
+then
+ echo "Unit test case 2 Passed"
+else
+ echo "Unit test case 2 Failed"
+fi
+echo
+
--- /dev/null
+#!/bin/sh
+
+echo
+echo "Validating YAML error(yaml-error-vnf-vsn.csar) Unit test case 1"
+python3 ../main.py csar-validate -d /tmp/vh2/ --no-verify-cert ../CSAR-dest/yaml-error-vnf-vsn.csar
+if [ $? -eq 0 ]
+then
+ echo "YAML error Unit test case 1 Passed"
+else
+ echo "YAML error Unit test case 1 Failed"
+fi
+echo
+echo "Validating YAML file error(yaml-file-error-vnf-vsn.csar) Unit test case 1"
+python3 ../main.py csar-validate -d /tmp/vh2/ --no-verify-cert ../CSAR-dest/yaml-file-error-vnf-vsn.csar
+if [ $? -eq 0 ]
+then
+ echo "YAML file error Unit test case 2 Passed"
+else
+ echo "YAML file error Unit test case 2 Failed"
+fi
+
--- /dev/null
+
+import hashlib
+from io import BytesIO
+import logging
+import os
+import os.path
+import subprocess
+import tempfile
+
+import requests
+import six
+from six.moves.urllib import parse as urlparse
+
+
+LOG = logging.getLogger(__name__)
+
+
+def check_file_dir(root, entry, msg, check_for_non=False, check_dir=False):
+ path = os.path.join(root, entry)
+ LOG.debug('Path is {} ' .format(path))
+ if check_for_non:
+ ret = not os.path.exists(path)
+ error_msg = '{0} already exists. ' + msg
+ elif check_dir:
+ ret = os.path.isdir(path)
+ error_msg = '{0} is not an existing directory. ' + msg
+ else:
+ ret = os.path.isfile(path)
+ error_msg = '{0} is not an existing file. ' + msg
+ if not ret:
+ raise ValueError(error_msg.format(path))
+
+
+def _hash_value_for_file(f, hash_function, block_size=2**20):
+ while True:
+ data = f.read(block_size)
+ if not data:
+ break
+ hash_function.update(data)
+
+ return hash_function.hexdigest()
+
+
+def cal_file_hash(root, path, algo):
+ if algo == 'SHA-256':
+ algo = 'SHA256'
+ elif algo == 'SHA-512':
+ algo = 'SHA512'
+ h = hashlib.new(algo)
+ if urlparse.urlparse(path).scheme:
+ r = requests.get(path)
+ if r.status_code != 200:
+ raise ValueError('Server at {0} returned a {1} status code'
+ .format(path, r.status_code))
+ fp = BytesIO(r.content)
+ return _hash_value_for_file(fp, h)
+ else:
+ with open(os.path.join(root, path), 'rb') as fp:
+ return _hash_value_for_file(fp, h)
+
+
+def _run_cmd(cmd, **kwargs):
+ if isinstance(cmd, list):
+ args = cmd
+ elif isinstance(cmd, str):
+ args = [cmd]
+ else:
+ raise RuntimeError("cmd must be string or list")
+
+ for key, value in six.iteritems(kwargs):
+ args.append(key)
+ if value:
+ args.append(value)
+ try:
+ LOG.debug("Executing %s", args)
+ return str(subprocess.check_output(args).decode('utf-8'))
+ except subprocess.CalledProcessError as e:
+ LOG.error("Executing %s failed with return code %d, output: %s",
+ e.cmd, e.returncode, e.output)
+ raise e
+
+
+def sign(msg_file, cert_file, key_file):
+ args = ["openssl", "cms", "-sign", "-binary"]
+ kwargs = {
+ '-in': os.path.abspath(msg_file),
+ '-signer': os.path.abspath(cert_file),
+ '-inkey': os.path.abspath(key_file),
+ '-outform': 'PEM', }
+
+ return _run_cmd(args, **kwargs)
+
+
+def verify(msg_file, cert_file, cms, no_verify_cert=False):
+ args = ["openssl", "cms", "-verify", "-binary"]
+ if no_verify_cert:
+ args.append("-noverify")
+
+ with tempfile.NamedTemporaryFile(mode='w') as f:
+ f.write(cms)
+ f.flush()
+ kwargs = {
+ '-in': f.name,
+ '-inform': 'PEM',
+ '-content': os.path.abspath(msg_file),
+ '-certfile': os.path.abspath(cert_file), }
+ return _run_cmd(args, **kwargs)
+