test_token_plugins.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. # vim: tabstop=4 shiftwidth=4 softtabstop=4
  2. """ Unit tests for Token plugins"""
  3. import unittest
  4. from unittest.mock import patch, mock_open, MagicMock
  5. from jwcrypto import jwt
  6. from websockify.token_plugins import ReadOnlyTokenFile, JWTTokenApi
  7. class ReadOnlyTokenFileTestCase(unittest.TestCase):
  8. patch('os.path.isdir', MagicMock(return_value=False))
  9. def test_empty(self):
  10. plugin = ReadOnlyTokenFile('configfile')
  11. config = ""
  12. pyopen = mock_open(read_data=config)
  13. with patch("websockify.token_plugins.open", pyopen, create=True):
  14. result = plugin.lookup('testhost')
  15. pyopen.assert_called_once_with('configfile')
  16. self.assertIsNone(result)
  17. patch('os.path.isdir', MagicMock(return_value=False))
  18. def test_simple(self):
  19. plugin = ReadOnlyTokenFile('configfile')
  20. config = "testhost: remote_host:remote_port"
  21. pyopen = mock_open(read_data=config)
  22. with patch("websockify.token_plugins.open", pyopen, create=True):
  23. result = plugin.lookup('testhost')
  24. pyopen.assert_called_once_with('configfile')
  25. self.assertIsNotNone(result)
  26. self.assertEqual(result[0], "remote_host")
  27. self.assertEqual(result[1], "remote_port")
  28. patch('os.path.isdir', MagicMock(return_value=False))
  29. def test_tabs(self):
  30. plugin = ReadOnlyTokenFile('configfile')
  31. config = "testhost:\tremote_host:remote_port"
  32. pyopen = mock_open(read_data=config)
  33. with patch("websockify.token_plugins.open", pyopen, create=True):
  34. result = plugin.lookup('testhost')
  35. pyopen.assert_called_once_with('configfile')
  36. self.assertIsNotNone(result)
  37. self.assertEqual(result[0], "remote_host")
  38. self.assertEqual(result[1], "remote_port")
  39. class JWSTokenTestCase(unittest.TestCase):
  40. def test_asymmetric_jws_token_plugin(self):
  41. plugin = JWTTokenApi("./tests/fixtures/public.pem")
  42. key = jwt.JWK()
  43. private_key = open("./tests/fixtures/private.pem", "rb").read()
  44. key.import_from_pem(private_key)
  45. jwt_token = jwt.JWT({"alg": "RS256"}, {'host': "remote_host", 'port': "remote_port"})
  46. jwt_token.make_signed_token(key)
  47. result = plugin.lookup(jwt_token.serialize())
  48. self.assertIsNotNone(result)
  49. self.assertEqual(result[0], "remote_host")
  50. self.assertEqual(result[1], "remote_port")
  51. def test_asymmetric_jws_token_plugin_with_illigal_key_exception(self):
  52. plugin = JWTTokenApi("wrong.pub")
  53. key = jwt.JWK()
  54. private_key = open("./tests/fixtures/private.pem", "rb").read()
  55. key.import_from_pem(private_key)
  56. jwt_token = jwt.JWT({"alg": "RS256"}, {'host': "remote_host", 'port': "remote_port"})
  57. jwt_token.make_signed_token(key)
  58. result = plugin.lookup(jwt_token.serialize())
  59. self.assertIsNone(result)
  60. @patch('time.time')
  61. def test_jwt_valid_time(self, mock_time):
  62. plugin = JWTTokenApi("./tests/fixtures/public.pem")
  63. key = jwt.JWK()
  64. private_key = open("./tests/fixtures/private.pem", "rb").read()
  65. key.import_from_pem(private_key)
  66. jwt_token = jwt.JWT({"alg": "RS256"}, {'host': "remote_host", 'port': "remote_port", 'nbf': 100, 'exp': 200 })
  67. jwt_token.make_signed_token(key)
  68. mock_time.return_value = 150
  69. result = plugin.lookup(jwt_token.serialize())
  70. self.assertIsNotNone(result)
  71. self.assertEqual(result[0], "remote_host")
  72. self.assertEqual(result[1], "remote_port")
  73. @patch('time.time')
  74. def test_jwt_early_time(self, mock_time):
  75. plugin = JWTTokenApi("./tests/fixtures/public.pem")
  76. key = jwt.JWK()
  77. private_key = open("./tests/fixtures/private.pem", "rb").read()
  78. key.import_from_pem(private_key)
  79. jwt_token = jwt.JWT({"alg": "RS256"}, {'host': "remote_host", 'port': "remote_port", 'nbf': 100, 'exp': 200 })
  80. jwt_token.make_signed_token(key)
  81. mock_time.return_value = 50
  82. result = plugin.lookup(jwt_token.serialize())
  83. self.assertIsNone(result)
  84. @patch('time.time')
  85. def test_jwt_late_time(self, mock_time):
  86. plugin = JWTTokenApi("./tests/fixtures/public.pem")
  87. key = jwt.JWK()
  88. private_key = open("./tests/fixtures/private.pem", "rb").read()
  89. key.import_from_pem(private_key)
  90. jwt_token = jwt.JWT({"alg": "RS256"}, {'host': "remote_host", 'port': "remote_port", 'nbf': 100, 'exp': 200 })
  91. jwt_token.make_signed_token(key)
  92. mock_time.return_value = 250
  93. result = plugin.lookup(jwt_token.serialize())
  94. self.assertIsNone(result)
  95. def test_symmetric_jws_token_plugin(self):
  96. plugin = JWTTokenApi("./tests/fixtures/symmetric.key")
  97. secret = open("./tests/fixtures/symmetric.key").read()
  98. key = jwt.JWK()
  99. key.import_key(kty="oct",k=secret)
  100. jwt_token = jwt.JWT({"alg": "HS256"}, {'host': "remote_host", 'port': "remote_port"})
  101. jwt_token.make_signed_token(key)
  102. result = plugin.lookup(jwt_token.serialize())
  103. self.assertIsNotNone(result)
  104. self.assertEqual(result[0], "remote_host")
  105. self.assertEqual(result[1], "remote_port")
  106. def test_symmetric_jws_token_plugin_with_illigal_key_exception(self):
  107. plugin = JWTTokenApi("wrong_sauce")
  108. secret = open("./tests/fixtures/symmetric.key").read()
  109. key = jwt.JWK()
  110. key.import_key(kty="oct",k=secret)
  111. jwt_token = jwt.JWT({"alg": "HS256"}, {'host': "remote_host", 'port': "remote_port"})
  112. jwt_token.make_signed_token(key)
  113. result = plugin.lookup(jwt_token.serialize())
  114. self.assertIsNone(result)
  115. def test_asymmetric_jwe_token_plugin(self):
  116. plugin = JWTTokenApi("./tests/fixtures/private.pem")
  117. private_key = jwt.JWK()
  118. public_key = jwt.JWK()
  119. private_key_data = open("./tests/fixtures/private.pem", "rb").read()
  120. public_key_data = open("./tests/fixtures/public.pem", "rb").read()
  121. private_key.import_from_pem(private_key_data)
  122. public_key.import_from_pem(public_key_data)
  123. jwt_token = jwt.JWT({"alg": "RS256"}, {'host': "remote_host", 'port': "remote_port"})
  124. jwt_token.make_signed_token(private_key)
  125. jwe_token = jwt.JWT(header={"alg": "RSA-OAEP", "enc": "A256CBC-HS512"},
  126. claims=jwt_token.serialize())
  127. jwe_token.make_encrypted_token(public_key)
  128. result = plugin.lookup(jwt_token.serialize())
  129. self.assertIsNotNone(result)
  130. self.assertEqual(result[0], "remote_host")
  131. self.assertEqual(result[1], "remote_port")