| #!/usr/bin/env python3 |
| # SPDX-License-Identifier: LGPL-2.1-or-later |
| # |
| # systemd-sysv-generator integration test |
| # |
| # © 2015 Canonical Ltd. |
| # Author: Martin Pitt <martin.pitt@ubuntu.com> |
| |
| import collections |
| import os |
| import shutil |
| import subprocess |
| import sys |
| import tempfile |
| import unittest |
| |
| from configparser import RawConfigParser |
| from glob import glob |
| |
| sysv_generator = './systemd-sysv-generator' |
| |
| class MultiDict(collections.OrderedDict): |
| def __setitem__(self, key, value): |
| if isinstance(value, list) and key in self: |
| self[key].extend(value) |
| else: |
| super(MultiDict, self).__setitem__(key, value) |
| |
| class SysvGeneratorTest(unittest.TestCase): |
| def setUp(self): |
| self.workdir = tempfile.mkdtemp(prefix='sysv-gen-test.') |
| self.init_d_dir = os.path.join(self.workdir, 'init.d') |
| os.mkdir(self.init_d_dir) |
| self.rcnd_dir = self.workdir |
| self.unit_dir = os.path.join(self.workdir, 'systemd') |
| os.mkdir(self.unit_dir) |
| self.out_dir = os.path.join(self.workdir, 'output') |
| os.mkdir(self.out_dir) |
| |
| def tearDown(self): |
| shutil.rmtree(self.workdir) |
| |
| # |
| # Helper methods |
| # |
| |
| def run_generator(self, expect_error=False): |
| '''Run sysv-generator. |
| |
| Fail if stderr contains any "Fail", unless expect_error is True. |
| Return (stderr, filename -> ConfigParser) pair with output to stderr and |
| parsed generated units. |
| ''' |
| env = os.environ.copy() |
| env['SYSTEMD_LOG_LEVEL'] = 'debug' |
| env['SYSTEMD_LOG_TARGET'] = 'console' |
| env['SYSTEMD_SYSVINIT_PATH'] = self.init_d_dir |
| env['SYSTEMD_SYSVRCND_PATH'] = self.rcnd_dir |
| env['SYSTEMD_UNIT_PATH'] = self.unit_dir |
| gen = subprocess.Popen( |
| [sysv_generator, 'ignored', 'ignored', self.out_dir], |
| stdout=subprocess.PIPE, stderr=subprocess.PIPE, |
| universal_newlines=True, env=env) |
| (out, err) = gen.communicate() |
| if not expect_error: |
| self.assertFalse('Fail' in err, err) |
| self.assertEqual(gen.returncode, 0, err) |
| |
| results = {} |
| for service in glob(self.out_dir + '/*.service'): |
| if os.path.islink(service): |
| continue |
| try: |
| # for python3 we need here strict=False to parse multiple |
| # lines with the same key |
| cp = RawConfigParser(dict_type=MultiDict, strict=False) |
| except TypeError: |
| # RawConfigParser in python2 does not have the strict option |
| # but it allows multiple lines with the same key by default |
| cp = RawConfigParser(dict_type=MultiDict) |
| cp.optionxform = lambda o: o # don't lower-case option names |
| with open(service) as f: |
| cp.readfp(f) |
| results[os.path.basename(service)] = cp |
| |
| return (err, results) |
| |
| def add_sysv(self, fname, keys, enable=False, prio=1): |
| '''Create a SysV init script with the given keys in the LSB header |
| |
| There are sensible default values for all fields. |
| If enable is True, links will be created in the rcN.d dirs. In that |
| case, the priority can be given with "prio" (default to 1). |
| |
| Return path of generated script. |
| ''' |
| name_without_sh = fname.endswith('.sh') and fname[:-3] or fname |
| keys.setdefault('Provides', name_without_sh) |
| keys.setdefault('Required-Start', '$local_fs') |
| keys.setdefault('Required-Stop', keys['Required-Start']) |
| keys.setdefault('Default-Start', '2 3 4 5') |
| keys.setdefault('Default-Stop', '0 1 6') |
| keys.setdefault('Short-Description', 'test {} service'.format(name_without_sh)) |
| keys.setdefault('Description', 'long description for test {} service'.format(name_without_sh)) |
| script = os.path.join(self.init_d_dir, fname) |
| with open(script, 'w') as f: |
| f.write('#!/bin/init-d-interpreter\n### BEGIN INIT INFO\n') |
| for k, v in keys.items(): |
| if v is not None: |
| f.write('#{:>20} {}\n'.format(k + ':', v)) |
| f.write('### END INIT INFO\ncode --goes here\n') |
| os.chmod(script, 0o755) |
| |
| if enable: |
| def make_link(prefix, runlevel): |
| d = os.path.join(self.rcnd_dir, 'rc{}.d'.format(runlevel)) |
| if not os.path.isdir(d): |
| os.mkdir(d) |
| os.symlink('../init.d/' + fname, os.path.join(d, prefix + fname)) |
| |
| for rl in keys['Default-Start'].split(): |
| make_link('S%02i' % prio, rl) |
| for rl in keys['Default-Stop'].split(): |
| make_link('K%02i' % (99 - prio), rl) |
| |
| return script |
| |
| def assert_enabled(self, unit, targets): |
| '''assert that a unit is enabled in precisely the given targets''' |
| |
| all_targets = ['multi-user', 'graphical'] |
| |
| # should be enabled |
| for target in all_targets: |
| link = os.path.join(self.out_dir, '{}.target.wants'.format(target), unit) |
| if target in targets: |
| unit_file = os.readlink(link) |
| # os.path.exists() will fail on a dangling symlink |
| self.assertTrue(os.path.exists(link)) |
| self.assertEqual(os.path.basename(unit_file), unit) |
| else: |
| self.assertFalse(os.path.exists(link), |
| '{} unexpectedly exists'.format(link)) |
| |
| # |
| # test cases |
| # |
| |
| def test_nothing(self): |
| '''no input files''' |
| |
| results = self.run_generator()[1] |
| self.assertEqual(results, {}) |
| self.assertEqual(os.listdir(self.out_dir), []) |
| |
| def test_simple_disabled(self): |
| '''simple service without dependencies, disabled''' |
| |
| self.add_sysv('foo', {}, enable=False) |
| err, results = self.run_generator() |
| self.assertEqual(len(results), 1) |
| |
| # no enablement links or other stuff |
| self.assertEqual(os.listdir(self.out_dir), ['foo.service']) |
| |
| s = results['foo.service'] |
| self.assertEqual(s.sections(), ['Unit', 'Service']) |
| self.assertEqual(s.get('Unit', 'Description'), 'LSB: test foo service') |
| # $local_fs does not need translation, don't expect any dependency |
| # fields here |
| self.assertEqual(set(s.options('Unit')), |
| set(['Documentation', 'SourcePath', 'Description'])) |
| |
| self.assertEqual(s.get('Service', 'Type'), 'forking') |
| init_script = os.path.join(self.init_d_dir, 'foo') |
| self.assertEqual(s.get('Service', 'ExecStart'), |
| '{} start'.format(init_script)) |
| self.assertEqual(s.get('Service', 'ExecStop'), |
| '{} stop'.format(init_script)) |
| |
| self.assertNotIn('Overwriting', err) |
| |
| def test_simple_enabled_all(self): |
| '''simple service without dependencies, enabled in all runlevels''' |
| |
| self.add_sysv('foo', {}, enable=True) |
| err, results = self.run_generator() |
| self.assertEqual(list(results), ['foo.service']) |
| self.assert_enabled('foo.service', ['multi-user', 'graphical']) |
| self.assertNotIn('Overwriting', err) |
| |
| def test_simple_escaped(self): |
| '''simple service without dependencies, that requires escaping the name''' |
| |
| self.add_sysv('foo+', {}) |
| self.add_sysv('foo-admin', {}) |
| err, results = self.run_generator() |
| self.assertEqual(set(results), {'foo-admin.service', 'foo\\x2b.service'}) |
| self.assertNotIn('Overwriting', err) |
| |
| def test_simple_enabled_some(self): |
| '''simple service without dependencies, enabled in some runlevels''' |
| |
| self.add_sysv('foo', {'Default-Start': '2 4'}, enable=True) |
| err, results = self.run_generator() |
| self.assertEqual(list(results), ['foo.service']) |
| self.assert_enabled('foo.service', ['multi-user']) |
| |
| def test_lsb_macro_dep_single(self): |
| '''single LSB macro dependency: $network''' |
| |
| self.add_sysv('foo', {'Required-Start': '$network'}) |
| s = self.run_generator()[1]['foo.service'] |
| self.assertEqual(set(s.options('Unit')), |
| set(['Documentation', 'SourcePath', 'Description', 'After', 'Wants'])) |
| self.assertEqual(s.get('Unit', 'After'), 'network-online.target') |
| self.assertEqual(s.get('Unit', 'Wants'), 'network-online.target') |
| |
| def test_lsb_macro_dep_multi(self): |
| '''multiple LSB macro dependencies''' |
| |
| self.add_sysv('foo', {'Required-Start': '$named $portmap'}) |
| s = self.run_generator()[1]['foo.service'] |
| self.assertEqual(set(s.options('Unit')), |
| set(['Documentation', 'SourcePath', 'Description', 'After'])) |
| self.assertEqual(s.get('Unit', 'After').split(), ['nss-lookup.target', 'rpcbind.target']) |
| |
| def test_lsb_deps(self): |
| '''LSB header dependencies to other services''' |
| |
| # also give symlink priorities here; they should be ignored |
| self.add_sysv('foo', {'Required-Start': 'must1 must2', |
| 'Should-Start': 'may1 ne_may2'}, |
| enable=True, prio=40) |
| self.add_sysv('must1', {}, enable=True, prio=10) |
| self.add_sysv('must2', {}, enable=True, prio=15) |
| self.add_sysv('may1', {}, enable=True, prio=20) |
| # do not create ne_may2 |
| err, results = self.run_generator() |
| self.assertEqual(sorted(results), |
| ['foo.service', 'may1.service', 'must1.service', 'must2.service']) |
| |
| # foo should depend on all of them |
| self.assertEqual(sorted(results['foo.service'].get('Unit', 'After').split()), |
| ['may1.service', 'must1.service', 'must2.service', 'ne_may2.service']) |
| |
| # other services should not depend on each other |
| self.assertFalse(results['must1.service'].has_option('Unit', 'After')) |
| self.assertFalse(results['must2.service'].has_option('Unit', 'After')) |
| self.assertFalse(results['may1.service'].has_option('Unit', 'After')) |
| |
| def test_symlink_prio_deps(self): |
| '''script without LSB headers use rcN.d priority''' |
| |
| # create two init.d scripts without LSB header and enable them with |
| # startup priorities |
| for prio, name in [(10, 'provider'), (15, 'consumer')]: |
| with open(os.path.join(self.init_d_dir, name), 'w') as f: |
| f.write('#!/bin/init-d-interpreter\ncode --goes here\n') |
| os.fchmod(f.fileno(), 0o755) |
| |
| d = os.path.join(self.rcnd_dir, 'rc2.d') |
| if not os.path.isdir(d): |
| os.mkdir(d) |
| os.symlink('../init.d/' + name, os.path.join(d, 'S{:>2}{}'.format(prio, name))) |
| |
| err, results = self.run_generator() |
| self.assertEqual(sorted(results), ['consumer.service', 'provider.service']) |
| self.assertFalse(results['provider.service'].has_option('Unit', 'After')) |
| self.assertEqual(results['consumer.service'].get('Unit', 'After'), |
| 'provider.service') |
| |
| def test_multiple_provides(self): |
| '''multiple Provides: names''' |
| |
| self.add_sysv('foo', {'Provides': 'foo bar baz'}) |
| err, results = self.run_generator() |
| self.assertEqual(list(results), ['foo.service']) |
| self.assertEqual(set(results['foo.service'].options('Unit')), |
| set(['Documentation', 'SourcePath', 'Description'])) |
| # should create symlinks for the alternative names |
| for f in ['bar.service', 'baz.service']: |
| self.assertEqual(os.readlink(os.path.join(self.out_dir, f)), |
| 'foo.service') |
| self.assertNotIn('Overwriting', err) |
| |
| def test_provides_escaped(self): |
| '''a script that Provides: a name that requires escaping''' |
| |
| self.add_sysv('foo', {'Provides': 'foo foo+'}) |
| err, results = self.run_generator() |
| self.assertEqual(list(results), ['foo.service']) |
| self.assertEqual(os.readlink(os.path.join(self.out_dir, 'foo\\x2b.service')), |
| 'foo.service') |
| self.assertNotIn('Overwriting', err) |
| |
| def test_same_provides_in_multiple_scripts(self): |
| '''multiple init.d scripts provide the same name''' |
| |
| self.add_sysv('foo', {'Provides': 'foo common'}, enable=True, prio=1) |
| self.add_sysv('bar', {'Provides': 'bar common'}, enable=True, prio=2) |
| err, results = self.run_generator() |
| self.assertEqual(sorted(results), ['bar.service', 'foo.service']) |
| # should create symlink for the alternative name for either unit |
| self.assertIn(os.readlink(os.path.join(self.out_dir, 'common.service')), |
| ['foo.service', 'bar.service']) |
| |
| def test_provide_other_script(self): |
| '''init.d scripts provides the name of another init.d script''' |
| |
| self.add_sysv('foo', {'Provides': 'foo bar'}, enable=True) |
| self.add_sysv('bar', {'Provides': 'bar'}, enable=True) |
| err, results = self.run_generator() |
| self.assertEqual(sorted(results), ['bar.service', 'foo.service']) |
| # we do expect an overwrite here, bar.service should overwrite the |
| # alias link from foo.service |
| self.assertIn('Overwriting', err) |
| |
| def test_nonexecutable_script(self): |
| '''ignores non-executable init.d script''' |
| |
| os.chmod(self.add_sysv('foo', {}), 0o644) |
| err, results = self.run_generator() |
| self.assertEqual(results, {}) |
| |
| def test_sh_suffix(self): |
| '''init.d script with .sh suffix''' |
| |
| self.add_sysv('foo.sh', {}, enable=True) |
| err, results = self.run_generator() |
| s = results['foo.service'] |
| |
| self.assertEqual(s.sections(), ['Unit', 'Service']) |
| # should not have a .sh |
| self.assertEqual(s.get('Unit', 'Description'), 'LSB: test foo service') |
| |
| # calls correct script with .sh |
| init_script = os.path.join(self.init_d_dir, 'foo.sh') |
| self.assertEqual(s.get('Service', 'ExecStart'), |
| '{} start'.format(init_script)) |
| self.assertEqual(s.get('Service', 'ExecStop'), |
| '{} stop'.format(init_script)) |
| |
| self.assert_enabled('foo.service', ['multi-user', 'graphical']) |
| |
| def test_sh_suffix_with_provides(self): |
| '''init.d script with .sh suffix and Provides:''' |
| |
| self.add_sysv('foo.sh', {'Provides': 'foo bar'}) |
| err, results = self.run_generator() |
| # ensure we don't try to create a symlink to itself |
| self.assertNotIn('itself', err) |
| self.assertEqual(list(results), ['foo.service']) |
| self.assertEqual(results['foo.service'].get('Unit', 'Description'), |
| 'LSB: test foo service') |
| |
| # should create symlink for the alternative name |
| self.assertEqual(os.readlink(os.path.join(self.out_dir, 'bar.service')), |
| 'foo.service') |
| |
| def test_hidden_files(self): |
| '''init.d script with hidden file suffix''' |
| |
| script = self.add_sysv('foo', {}, enable=True) |
| # backup files (not enabled in rcN.d/) |
| shutil.copy(script, script + '.dpkg-new') |
| shutil.copy(script, script + '.dpkg-dist') |
| shutil.copy(script, script + '.swp') |
| shutil.copy(script, script + '.rpmsave') |
| |
| err, results = self.run_generator() |
| self.assertEqual(list(results), ['foo.service']) |
| |
| self.assert_enabled('foo.service', ['multi-user', 'graphical']) |
| |
| def test_backup_file(self): |
| '''init.d script with backup file''' |
| |
| script = self.add_sysv('foo', {}, enable=True) |
| # backup files (not enabled in rcN.d/) |
| shutil.copy(script, script + '.bak') |
| shutil.copy(script, script + '.old') |
| shutil.copy(script, script + '.tmp') |
| shutil.copy(script, script + '.new') |
| |
| err, results = self.run_generator() |
| print(err) |
| self.assertEqual(sorted(results), ['foo.service', 'foo.tmp.service']) |
| |
| # ensure we don't try to create a symlink to itself |
| self.assertNotIn('itself', err) |
| |
| self.assert_enabled('foo.service', ['multi-user', 'graphical']) |
| self.assert_enabled('foo.bak.service', []) |
| self.assert_enabled('foo.old.service', []) |
| |
| def test_existing_native_unit(self): |
| '''existing native unit''' |
| |
| with open(os.path.join(self.unit_dir, 'foo.service'), 'w') as f: |
| f.write('[Unit]\n') |
| |
| self.add_sysv('foo.sh', {'Provides': 'foo bar'}, enable=True) |
| err, results = self.run_generator() |
| self.assertEqual(list(results), []) |
| # no enablement or alias links, as native unit is disabled |
| self.assertEqual(os.listdir(self.out_dir), []) |
| |
| |
| if __name__ == '__main__': |
| unittest.main(testRunner=unittest.TextTestRunner(stream=sys.stdout, verbosity=2)) |