¡@

Home 

OpenStack Study: test_iptables_fwaas.py

OpenStack Index

**** CubicPower OpenStack Study ****

# vim: tabstop=4 shiftwidth=4 softtabstop=4

#

# Copyright 2013 Dell Inc.

# All Rights Reserved.

#

# Licensed under the Apache License, Version 2.0 (the "License"); you may

# not use this file except in compliance with the License. You may obtain

# a copy of the License at

#

# http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT

# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the

# License for the specific language governing permissions and limitations

# under the License.

#

# @author: Rajesh Mohan, Rajesh_Mohan3@Dell.com, DELL Inc.

import mock

from mock import call

from oslo.config import cfg

from neutron.agent.common import config as a_cfg

import neutron.services.firewall.drivers.linux.iptables_fwaas as fwaas

from neutron.tests import base

from neutron.tests.unit import test_api_v2

_uuid = test_api_v2._uuid

FAKE_SRC_PREFIX = '10.0.0.0/24'

FAKE_DST_PREFIX = '20.0.0.0/24'

FAKE_PROTOCOL = 'tcp'

FAKE_SRC_PORT = 5000

FAKE_DST_PORT = 22

FAKE_FW_ID = 'fake-fw-uuid'

**** CubicPower OpenStack Study ****

class IptablesFwaasTestCase(base.BaseTestCase):

**** CubicPower OpenStack Study ****

    def setUp(self):

        super(IptablesFwaasTestCase, self).setUp()

        cfg.CONF.register_opts(a_cfg.ROOT_HELPER_OPTS, 'AGENT')

        self.utils_exec_p = mock.patch(

            'neutron.agent.linux.utils.execute')

        self.utils_exec = self.utils_exec_p.start()

        self.addCleanup(self.utils_exec_p.stop)

        self.iptables_cls_p = mock.patch(

            'neutron.agent.linux.iptables_manager.IptablesManager')

        self.iptables_cls_p.start()

        self.addCleanup(self.iptables_cls_p.stop)

        self.firewall = fwaas.IptablesFwaasDriver()

**** CubicPower OpenStack Study ****

    def _fake_rules_v4(self, fwid, apply_list):

        rule_list = []

        rule1 = {'enabled': True,

                 'action': 'allow',

                 'ip_version': 4,

                 'protocol': 'tcp',

                 'destination_port': '80',

                 'source_ip_address': '10.24.4.2'}

        rule2 = {'enabled': True,

                 'action': 'deny',

                 'ip_version': 4,

                 'protocol': 'tcp',

                 'destination_port': '22'}

        ingress_chain = ('iv4%s' % fwid)[:11]

        egress_chain = ('ov4%s' % fwid)[:11]

        for router_info_inst in apply_list:

            v4filter_inst = router_info_inst.iptables_manager.ipv4['filter']

            v4filter_inst.chains.append(ingress_chain)

            v4filter_inst.chains.append(egress_chain)

        rule_list.append(rule1)

        rule_list.append(rule2)

        return rule_list

**** CubicPower OpenStack Study ****

    def _fake_firewall_no_rule(self):

        rule_list = []

        fw_inst = {'id': FAKE_FW_ID,

                   'admin_state_up': True,

                   'tenant_id': 'tenant-uuid',

                   'firewall_rule_list': rule_list}

        return fw_inst

**** CubicPower OpenStack Study ****

    def _fake_firewall(self, rule_list):

        fw_inst = {'id': FAKE_FW_ID,

                   'admin_state_up': True,

                   'tenant_id': 'tenant-uuid',

                   'firewall_rule_list': rule_list}

        return fw_inst

**** CubicPower OpenStack Study ****

    def _fake_firewall_with_admin_down(self, rule_list):

        fw_inst = {'id': FAKE_FW_ID,

                   'admin_state_up': False,

                   'tenant_id': 'tenant-uuid',

                   'firewall_rule_list': rule_list}

        return fw_inst

**** CubicPower OpenStack Study ****

    def _fake_apply_list(self, router_count=1):

        apply_list = []

        while router_count > 0:

            iptables_inst = mock.Mock()

            v4filter_inst = mock.Mock()

            v6filter_inst = mock.Mock()

            v4filter_inst.chains = []

            v6filter_inst.chains = []

            iptables_inst.ipv4 = {'filter': v4filter_inst}

            iptables_inst.ipv6 = {'filter': v6filter_inst}

            router_info_inst = mock.Mock()

            router_info_inst.iptables_manager = iptables_inst

            apply_list.append(router_info_inst)

            router_count -= 1

        return apply_list

**** CubicPower OpenStack Study ****

    def _setup_firewall_with_rules(self, func, router_count=1):

        apply_list = self._fake_apply_list(router_count=router_count)

        rule_list = self._fake_rules_v4(FAKE_FW_ID, apply_list)

        firewall = self._fake_firewall(rule_list)

        func(apply_list, firewall)

        invalid_rule = '-m state --state INVALID -j DROP'

        est_rule = '-m state --state ESTABLISHED,RELATED -j ACCEPT'

        rule1 = '-p tcp --dport 80  -s 10.24.4.2  -j ACCEPT'

        rule2 = '-p tcp --dport 22    -j DROP'

        ingress_chain = 'iv4%s' % firewall['id']

        egress_chain = 'ov4%s' % firewall['id']

        bname = fwaas.iptables_manager.binary_name

        ipt_mgr_ichain = '%s-%s' % (bname, ingress_chain[:11])

        ipt_mgr_echain = '%s-%s' % (bname, egress_chain[:11])

        for router_info_inst in apply_list:

            v4filter_inst = router_info_inst.iptables_manager.ipv4['filter']

            calls = [call.ensure_remove_chain('iv4fake-fw-uuid'),

                     call.ensure_remove_chain('ov4fake-fw-uuid'),

                     call.ensure_remove_chain('fwaas-default-policy'),

                     call.add_chain('fwaas-default-policy'),

                     call.add_rule('fwaas-default-policy', '-j DROP'),

                     call.add_chain(ingress_chain),

                     call.add_rule(ingress_chain, invalid_rule),

                     call.add_rule(ingress_chain, est_rule),

                     call.add_chain(egress_chain),

                     call.add_rule(egress_chain, invalid_rule),

                     call.add_rule(egress_chain, est_rule),

                     call.add_rule(ingress_chain, rule1),

                     call.add_rule(egress_chain, rule1),

                     call.add_rule(ingress_chain, rule2),

                     call.add_rule(egress_chain, rule2),

                     call.add_rule('FORWARD',

                                   '-o qr-+ -j %s' % ipt_mgr_ichain),

                     call.add_rule('FORWARD',

                                   '-i qr-+ -j %s' % ipt_mgr_echain),

                     call.add_rule('FORWARD',

                                   '-o qr-+ -j %s-fwaas-defau' % bname),

                     call.add_rule('FORWARD',

                                   '-i qr-+ -j %s-fwaas-defau' % bname)]

            v4filter_inst.assert_has_calls(calls)

**** CubicPower OpenStack Study ****

    def test_create_firewall_no_rules(self):

        apply_list = self._fake_apply_list()

        firewall = self._fake_firewall_no_rule()

        self.firewall.create_firewall(apply_list, firewall)

        invalid_rule = '-m state --state INVALID -j DROP'

        est_rule = '-m state --state ESTABLISHED,RELATED -j ACCEPT'

        bname = fwaas.iptables_manager.binary_name

        for ip_version in (4, 6):

            ingress_chain = ('iv%s%s' % (ip_version, firewall['id']))

            egress_chain = ('ov%s%s' % (ip_version, firewall['id']))

            calls = [call.ensure_remove_chain('iv%sfake-fw-uuid' % ip_version),

                     call.ensure_remove_chain('ov%sfake-fw-uuid' % ip_version),

                     call.ensure_remove_chain('fwaas-default-policy'),

                     call.add_chain('fwaas-default-policy'),

                     call.add_rule('fwaas-default-policy', '-j DROP'),

                     call.add_chain(ingress_chain),

                     call.add_rule(ingress_chain, invalid_rule),

                     call.add_rule(ingress_chain, est_rule),

                     call.add_chain(egress_chain),

                     call.add_rule(egress_chain, invalid_rule),

                     call.add_rule(egress_chain, est_rule),

                     call.add_rule('FORWARD',

                                   '-o qr-+ -j %s-fwaas-defau' % bname),

                     call.add_rule('FORWARD',

                                   '-i qr-+ -j %s-fwaas-defau' % bname)]

            if ip_version == 4:

                v4filter_inst = apply_list[0].iptables_manager.ipv4['filter']

                v4filter_inst.assert_has_calls(calls)

            else:

                v6filter_inst = apply_list[0].iptables_manager.ipv6['filter']

                v6filter_inst.assert_has_calls(calls)

**** CubicPower OpenStack Study ****

    def test_create_firewall_with_rules(self):

        self._setup_firewall_with_rules(self.firewall.create_firewall)

**** CubicPower OpenStack Study ****

    def test_create_firewall_with_rules_two_routers(self):

        self._setup_firewall_with_rules(self.firewall.create_firewall,

                                        router_count=2)

**** CubicPower OpenStack Study ****

    def test_update_firewall_with_rules(self):

        self._setup_firewall_with_rules(self.firewall.update_firewall)

**** CubicPower OpenStack Study ****

    def test_delete_firewall(self):

        apply_list = self._fake_apply_list()

        firewall = self._fake_firewall_no_rule()

        self.firewall.delete_firewall(apply_list, firewall)

        ingress_chain = 'iv4%s' % firewall['id']

        egress_chain = 'ov4%s' % firewall['id']

        calls = [call.ensure_remove_chain(ingress_chain),

                 call.ensure_remove_chain(egress_chain),

                 call.ensure_remove_chain('fwaas-default-policy')]

        apply_list[0].iptables_manager.ipv4['filter'].assert_has_calls(calls)

**** CubicPower OpenStack Study ****

    def test_create_firewall_with_admin_down(self):

        apply_list = self._fake_apply_list()

        rule_list = self._fake_rules_v4(FAKE_FW_ID, apply_list)

        firewall = self._fake_firewall_with_admin_down(rule_list)

        self.firewall.create_firewall(apply_list, firewall)

        calls = [call.ensure_remove_chain('iv4fake-fw-uuid'),

                 call.ensure_remove_chain('ov4fake-fw-uuid'),

                 call.ensure_remove_chain('fwaas-default-policy'),

                 call.add_chain('fwaas-default-policy'),

                 call.add_rule('fwaas-default-policy', '-j DROP')]

        apply_list[0].iptables_manager.ipv4['filter'].assert_has_calls(calls)