Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion examples/bulk_address_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import panos.firewall
import panos.objects


HOSTNAME = "127.0.0.1"
USERNAME = "admin"
PASSWORD = "admin"
Expand Down
1 change: 0 additions & 1 deletion examples/ensure_security_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import panos.firewall
import panos.policies


HOSTNAME = "127.0.0.1"
USERNAME = "admin"
PASSWORD = "admin"
Expand Down
1 change: 1 addition & 0 deletions examples/prisma_access_create_remote_network.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
To use the script, you need to replace the variables below with desired values.

"""

import logging
import os
import sys
Expand Down
3 changes: 2 additions & 1 deletion examples/prisma_access_list_RN_regions_bw.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@
prisma_access_list_RN_regions_bw.py
==========

This script is an example on how to retrieve list of prisma access
This script is an example on how to retrieve list of prisma access
remote networks locations and bandwidth allocation and print it.

"""

__author__ = "bmigette"


Expand Down
3 changes: 2 additions & 1 deletion examples/prisma_access_show_jobs_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@
prisma_access_show_jobs_status.py
==========

This script is an example on how to retrieve list of prisma access
This script is an example on how to retrieve list of prisma access
jobs (commit and push), and how to get details of a specific job

"""

__author__ = "bmigette"


Expand Down
3 changes: 2 additions & 1 deletion examples/prisma_access_show_remote_net_per_tenant.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@
prisma_access_show_remote_net_per_tenant.py
==========

This script is an example on how to retrieve list of prisma access
This script is an example on how to retrieve list of prisma access
tenants and their remote networks

"""

__author__ = "bmigette"


Expand Down
11 changes: 7 additions & 4 deletions panos/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import time
import xml.dom.minidom as minidom
import xml.etree.ElementTree as ET
from xml.sax.saxutils import escape as xml_escape

import pan.commit
import pan.xapi
Expand Down Expand Up @@ -3386,7 +3387,7 @@ def create_import(self, vsys=None):

if vsys != "shared" and vsys is not None and self.XPATH_IMPORT is not None:
xpath = self.xpath_import_base(vsys)
element = "<member>{0}</member>".format(self.uid)
element = "<member>{0}</member>".format(xml_escape(self.uid))
device = self.nearest_pandevice()
device.active().xapi.set(xpath, element, retry_on_peer=True)

Expand Down Expand Up @@ -3830,9 +3831,11 @@ def make_method(cls, super_method_name, super_method):
def method(self, *args, **kwargs):
retry_on_peer = kwargs.pop(
"retry_on_peer",
True
if super_method_name not in ("keygen", "op", "ad_hoc", "export")
else False,
(
True
if super_method_name not in ("keygen", "op", "ad_hoc", "export")
else False
),
)
apply_on_peer = kwargs.pop("apply_on_peer", False)
ha_peer = self.pan_device.ha_peer
Expand Down
15 changes: 10 additions & 5 deletions panos/userid.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import xml.etree.ElementTree as ET
from copy import deepcopy
from xml.sax.saxutils import escape, quoteattr

from pan.xapi import PanXapiError

Expand Down Expand Up @@ -550,7 +551,7 @@ def get_groups(self, style=None):
"<show><user><group><list>",
]
if style is not None:
msg.append("<entry name='{0}'/>".format(style))
msg.append("<entry name={0}/>".format(quoteattr(style)))
msg.append("</list></group></user></show>")
cmd = "".join(msg)
vsys = self.device.vsys or "vsys1"
Expand Down Expand Up @@ -594,7 +595,11 @@ def get_group_members(self, group):
list

"""
cmd = "<show><user><group><name>" + group + "</name></group></user></show>"
cmd = (
"<show><user><group><name>"
+ escape(group)
+ "</name></group></user></show>"
)
vsys = self.device.vsys or "vsys1"

resp = self.device.op(cmd, vsys=vsys, cmd_xml=False)
Expand Down Expand Up @@ -644,12 +649,12 @@ def get_user_tags(self, user=None, prefix=None):
if user is None:
msg.append(
"<all>"
+ "<limit>{0}</limit>".format(limit)
+ "<start-point>{0}</start-point>".format(start)
+ "<limit>{0}</limit>".format(escape(str(limit)))
+ "<start-point>{0}</start-point>".format(escape(str(start)))
+ "</all>"
)
else:
msg.append("<user>{0}</user>".format(user))
msg.append("<user>{0}</user>".format(escape(user)))
msg.append("</registered-user></object></show>")

cmd = ET.fromstring("".join(msg))
Expand Down
1 change: 0 additions & 1 deletion tests/live/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from panos import firewall
from panos import panorama


live_devices = {}
one_fw_per_version = []
one_device_type_per_version = []
Expand Down
28 changes: 27 additions & 1 deletion tests/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
import pan.xapi
import panos.base as Base
import panos.errors as Err

import panos.firewall
import panos.network

OBJECT_NAME = "MyObjectName"
VSYS = "vsys1"
Expand Down Expand Up @@ -1701,5 +1702,30 @@ def test_times_out(self, mocksleep):
assert mocksleep.call_count == 0


class TestVsysOperationsCreateImport(unittest.TestCase):
def test_create_import_escapes_uid(self):
evil_name = "zone&<bad>"
fw = panos.firewall.Firewall(
"fw1", "user", "passwd", "authkey", serial="S", vsys="vsys1"
)
vlan = panos.network.Vlan(evil_name)
fw.add(vlan)

with mock.patch.object(
panos.network.Vlan,
"XPATH_IMPORT",
new_callable=mock.PropertyMock,
return_value="/network/vlan",
):
fw.xapi.set = mock.Mock()
vlan.create_import("vsys1")

args, _ = fw.xapi.set.call_args
element = args[1]
self.assertEqual(element, "<member>zone&amp;&lt;bad&gt;</member>")
parsed = ET.fromstring(element)
self.assertEqual(parsed.text, evil_name)


if __name__ == "__main__":
unittest.main()
3 changes: 1 addition & 2 deletions tests/test_classic_objects.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
""" Tests specifically for classic objects.
"""Tests specifically for classic objects.

Note: All tests in this file are for classic objects. These are to try and
make sure that the fix for classic objects with a self.NAME == None still
work properly.

"""


from panos import device


Expand Down
1 change: 0 additions & 1 deletion tests/test_device_profile_xpaths.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
from panos.panorama import Panorama
from panos.panorama import Template


OBJECTS = {
SnmpServerProfile: [None, SnmpV2cServer, SnmpV3Server],
EmailServerProfile: [
Expand Down
1 change: 0 additions & 1 deletion tests/test_opstate.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
from panos.policies import SecurityRule
from panos.policies import AuditCommentLog


HIT_COUNT_PREFIX = """
<response>
<result>
Expand Down
1 change: 0 additions & 1 deletion tests/test_predefined.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
from panos.objects import ServiceObject
from panos.objects import Tag


PREDEFINED_CONFIG = {
ApplicationContainer: {
"single": "refresh_application",
Expand Down
1 change: 0 additions & 1 deletion tests/test_standards.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
from panos import predefined
from panos import plugins


# Versioning constants.
#
# When checking versioning, this is the highest major version to check for
Expand Down
58 changes: 58 additions & 0 deletions tests/test_userid.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import mock
import sys
import unittest
import xml.etree.ElementTree as ET

import panos.firewall
import panos.panorama
Expand Down Expand Up @@ -99,6 +100,63 @@ def test_batch_untag_user(self):
],
)

def test_get_user_tags_escapes_user(self):
evil = "admin</user><all><limit>999999</limit></all><user>x"
fw = panos.firewall.Firewall(
"fw1", "user", "passwd", "authkey", serial="Serial", vsys="vsys1"
)
empty_response = ET.fromstring(
b"<response status='success'><result/></response>"
)
fw.op = mock.Mock(return_value=empty_response)

fw.userid.get_user_tags(user=evil)

sent = fw.op.call_args[1]["cmd"]
parsed = ET.fromstring(sent)
users = parsed.findall("./object/registered-user/user")
self.assertEqual(len(users), 1)
self.assertEqual(users[0].text, evil)
self.assertIsNone(parsed.find("./object/registered-user/all"))

def test_get_groups_escapes_style(self):
evil = "x'/><evil/><entry name='y"
fw = panos.firewall.Firewall(
"fw1", "user", "passwd", "authkey", serial="Serial", vsys="vsys1"
)
empty_response = ET.fromstring(
b"<response status='success'><result>\nTotal: 0\n</result></response>"
)
fw.op = mock.Mock(return_value=empty_response)

fw.userid.get_groups(style=evil)

sent = fw.op.call_args[0][0]
parsed = ET.fromstring(sent)
entries = parsed.findall("./user/group/list/entry")
self.assertEqual(len(entries), 1)
self.assertEqual(entries[0].attrib["name"], evil)
self.assertIsNone(parsed.find(".//evil"))

def test_get_group_members_escapes_group(self):
evil = "g</name><evil/><name>x"
fw = panos.firewall.Firewall(
"fw1", "user", "passwd", "authkey", serial="Serial", vsys="vsys1"
)
empty_response = ET.fromstring(
b"<response status='success'><result>\nTotal: 0\n</result></response>"
)
fw.op = mock.Mock(return_value=empty_response)

fw.userid.get_group_members(evil)

sent = fw.op.call_args[0][0]
parsed = ET.fromstring(sent)
names = parsed.findall("./user/group/name")
self.assertEqual(len(names), 1)
self.assertEqual(names[0].text, evil)
self.assertIsNone(parsed.find(".//evil"))


if __name__ == "__main__":
unittest.main()
Loading