Merge branch 'master' into master-2

# Conflicts:
#	LICENSE
This commit is contained in:
stubbfel
2015-03-27 00:06:28 +01:00
parent 8f42134415
commit 7cf755493c
29 changed files with 669 additions and 0 deletions

111
.gitignore vendored Normal file
View File

@@ -0,0 +1,111 @@
# Created by https://www.gitignore.io
### PyCharm ###
# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm
*.iml
## Directory-based project format:
.idea/
# if you remove the above rule, at least ignore the following:
# User-specific stuff:
# .idea/workspace.xml
# .idea/tasks.xml
# .idea/dictionaries
# Sensitive or high-churn files:
# .idea/dataSources.ids
# .idea/dataSources.xml
# .idea/sqlDataSources.xml
# .idea/dynamic.xml
# .idea/uiDesigner.xml
# Gradle:
# .idea/gradle.xml
# .idea/libraries
# Mongo Explorer plugin:
# .idea/mongoSettings.xml
## File-based project format:
*.ipr
*.iws
## Plugin-specific files:
# IntelliJ
/out/
# mpeltonen/sbt-idea plugin
.idea_modules/
# JIRA plugin
atlassian-ide-plugin.xml
# Crashlytics plugin (for Android Studio and IntelliJ)
com_crashlytics_export_strings.xml
crashlytics.properties
crashlytics-build.properties
### Python ###
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
# C extensions
*.so
# Distribution / packaging
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
*.egg-info/
.installed.cfg
*.egg
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test.py / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*,cover
# Translations
*.mo
*.pot
# Django stuff:
*.log
# Sphinx documentation
docs/_build/
# PyBuilder
target/

View File

@@ -1,6 +1,10 @@
The MIT License (MIT)
<<<<<<< HEAD
Copyright (c) 2015 stubbfel
=======
Copyright (c) 2015 Felix Stubbe
>>>>>>> master
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
@@ -18,5 +22,9 @@ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
<<<<<<< HEAD
SOFTWARE.
=======
SOFTWARE.
>>>>>>> master

14
MANIFEST Normal file
View File

@@ -0,0 +1,14 @@
# file GENERATED by distutils, do NOT edit
setup.py
src/__init__.py
src/fake_services/__init__.py
src/fake_services/service/__init__.py
src/fake_services/service/networkservice/__init__.py
src/fake_services/service/webservice/__init__.py
src/fake_services/service/webservice/fake_http_request_handler.py
src/fake_services/service/webservice/fake_web_server.py
src/fake_services/service/webservice/fake_web_server_manager.py
src/fake_services/service/webservice/file_content_response.py
src/fake_services/utility/__init__.py
src/fake_services/utility/network/__init__.py
src/fake_services/utility/network/ip_address_manager.py

17
example/runwebserver.py Normal file
View File

@@ -0,0 +1,17 @@
#!/usr/bin/env python3
__author__ = 'dev'
from fake_services.service.webservice.fake_web_server_manager import FakeWebServerManager
config = {
"/1/a/3/foo/bar.php":
[
{
"response_content_path": "cgi-bin/testA.html"
}
]
}
server = FakeWebServerManager(server_port=80, requests_config=config)
server.start_server()
#server.stop_server()

11
example/setips.py Normal file
View File

@@ -0,0 +1,11 @@
#!/usr/bin/env python3
__author__ = 'dev'
import fake_services.utility.network.ip_address_manager as ip
# set ips
ip.add_ip_addresses(["1.1.1.1", "2.2.2.2"], "eth0")
# unset ips
#ip.remove_ip_addresses(["1.1.1.1", "2.2.2.2"], "eth0")

14
setup.py Normal file
View File

@@ -0,0 +1,14 @@
from distutils.core import setup
setup(
name='FakeServices',
version='0.1',
packages=['', 'fake_services', 'fake_services.service', 'fake_services.service.webservice',
'fake_services.service.networkservice', 'fake_services.utility', 'fake_services.utility.network'],
package_dir={'': 'src'},
url='http://tcreate',
license='The MIT License (MIT) Copyright (c) 2015 Felix Stubbe Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.',
author='stubbfel',
author_email='stubbfel@gmail.com',
description='Helps to create many services on one maschine. Its manage IP-Addesses and response with static content of certain requests'
)

0
src/__init__.py Normal file
View File

View File

@@ -0,0 +1 @@
__author__ = 'dev'

View File

@@ -0,0 +1 @@
__author__ = 'dev'

View File

@@ -0,0 +1 @@
__author__ = 'dev'

View File

@@ -0,0 +1 @@
__author__ = 'dev'

View File

@@ -0,0 +1,74 @@
__author__ = 'dev'
import os
import re
import urllib.parse
from http.server import CGIHTTPRequestHandler
HEADERS_HOST_PARAMETER_KEY_NAME = "Host"
REQUEST_LINE_ENCODING = "iso-8859-1"
RESPONSE_PATH_PARAMETER_KEY_NAME = "rpath"
HOST_PATTERN_KEY_NAME = "host_pattern"
RESPONSE_CONTENT_PATH_KEY_NAME = "response_content_path"
class FakeHTTPRequestHandler(CGIHTTPRequestHandler):
def do_GET(self):
self.__set_path_setting()
CGIHTTPRequestHandler.do_GET(self)
def do_HEAD(self):
self.__set_path_setting()
CGIHTTPRequestHandler.do_HEAD(self)
def do_POST(self):
self.command = "GET"
self.do_GET()
def __set_path_setting(self):
response_content_path = None
if self.server.requests_config is not None:
server_path = self.__get_server_path()
if server_path in self.server.requests_config:
request_config = self.server.requests_config[server_path]
response_content_path = self.__get_response_content_path(request_config)
if response_content_path is not None:
self.path = self.server.request_handle_script_path
parent_path = os.path.join(os.path.sep, os.path.dirname(self.path))
if parent_path not in self.cgi_directories:
self.cgi_directories.append(parent_path)
request_data = {
RESPONSE_PATH_PARAMETER_KEY_NAME: response_content_path
}
url_parameter = urllib.parse.urlencode(request_data)
self.path = self.path + "?" + url_parameter
else:
self.path = "/404"
def __get_response_content_path(self, request_config):
sorted_configs = sorted(request_config, key=len, reverse=True)
server_host = self.__get_server_host()
for config in sorted_configs:
if HOST_PATTERN_KEY_NAME in config:
result = re.search(config[HOST_PATTERN_KEY_NAME], server_host)
if result is None:
continue
if RESPONSE_CONTENT_PATH_KEY_NAME in config:
return config[RESPONSE_CONTENT_PATH_KEY_NAME]
return None
def __get_server_path(self):
request_line = str(self.raw_requestline, REQUEST_LINE_ENCODING).rstrip('\r\n')
words = request_line.split()
if len(words) < 2:
return ""
return words[1]
def __get_server_host(self):
return self.headers[HEADERS_HOST_PARAMETER_KEY_NAME]

View File

@@ -0,0 +1,11 @@
from http.server import HTTPServer
__author__ = 'dev'
class FakeWebServer(HTTPServer):
def __init__(self, server_address, request_handler_class, request_handle_script_path, requests_config):
super().__init__(server_address, request_handler_class)
self.request_handle_script_path = request_handle_script_path
self.requests_config = requests_config

View File

@@ -0,0 +1,39 @@
__author__ = 'dev'
import os
import stat
import threading
import pkgutil
import shutil
from fake_services.service.webservice.fake_http_request_handler import FakeHTTPRequestHandler
from fake_services.service.webservice.fake_web_server import FakeWebServer
class FakeWebServerManager:
def __init__(self, server_port, requests_config=None, request_handle_script_path=None):
cgi_path = request_handle_script_path
if cgi_path is None:
cgi_path = pkgutil.get_loader("service.webservice.file_content_response").path
newPath = os.path.basename(cgi_path)
if not os.path.exists(newPath):
shutil.copyfile(cgi_path, newPath)
cgi_path = newPath
if not os.path.exists(cgi_path):
raise FileNotFoundError
if not os.access(cgi_path, os.X_OK):
st = os.stat(cgi_path)
os.chmod(cgi_path, st.st_mode | stat.S_IEXEC)
self.server = FakeWebServer(('', server_port), FakeHTTPRequestHandler, cgi_path, requests_config)
def start_server(self):
threading.Thread(target=self.server.serve_forever).start()
def stop_server(self):
self.server.shutdown()

View File

@@ -0,0 +1,15 @@
#!/usr/bin/env python3
__author__ = 'dev'
import cgi
from fake_services.service.webservice.fake_http_request_handler import RESPONSE_PATH_PARAMETER_KEY_NAME
response_file_path = cgi.FieldStorage()[RESPONSE_PATH_PARAMETER_KEY_NAME].value
response_file = open(response_file_path, "r")
response_file_content = response_file.read()
response_file.close()
print("\n")
print(response_file_content)

View File

@@ -0,0 +1 @@
__author__ = 'dev'

View File

@@ -0,0 +1 @@
__author__ = 'dev'

View File

@@ -0,0 +1,34 @@
__author__ = 'dev'
import subprocess
import ipaddress
SUDO_CMD = "sudo"
IP_CMD = "ip"
ADDR_ARG = "addr"
ADD_ARG = "add"
RM_ARG = "del"
DEV_ARG = "dev"
def add_ip_address(ip_address, network_card):
add_or_remove_address(ip_address, network_card, ADD_ARG)
def remove_ip_address(ip_address, network_card):
add_or_remove_address(ip_address, network_card, RM_ARG)
def add_ip_addresses(ip_addresses, network_card):
for ip_address in ip_addresses:
add_or_remove_address(ip_address, network_card, ADD_ARG)
def remove_ip_addresses(ip_addresses, network_card):
for ip_address in ip_addresses:
add_or_remove_address(ip_address, network_card, RM_ARG)
def add_or_remove_address(ip_address, network_card, cmd):
ipaddress.ip_address(ip_address)
subprocess.call([SUDO_CMD, IP_CMD, ADDR_ARG, cmd, ip_address, DEV_ARG, network_card])

View File

@@ -0,0 +1,10 @@
#!/usr/bin/env python3
__author__ = 'dev'
response_file_path = "cgi-bin/test.html"
response_file = open(response_file_path, "r")
response_file_content = response_file.read()
response_file.close()
print("\n")
print(response_file_content)

View File

@@ -0,0 +1,7 @@
<html>
<Title>Hello in HTML</Title>
<body>
<p>Hello There!</p>
<p><b>Hi There!</b></p>
</body>
</html>

View File

@@ -0,0 +1,9 @@
#!/usr/bin/env python3
print("""
<html>
<Title>Hello in HTML</Title>
<body>
<p>Hello There!</p>
<p><b>Hi There!</b></p>
</body>
</html> """)

View File

@@ -0,0 +1,7 @@
<html>
<Title>Hello in HTMLA</Title>
<body>
<p>Hello There!</p>
<p><b>Hi There!</b></p>
</body>
</html>

View File

@@ -0,0 +1,7 @@
<html>
<Title>Hello in HTMLB</Title>
<body>
<p>Hello There!</p>
<p><b>Hi There!</b></p>
</body>
</html>

View File

@@ -0,0 +1,7 @@
<html>
<Title>Hello in HTMLC</Title>
<body>
<p>Hello There!</p>
<p><b>Hi There!</b></p>
</body>
</html>

View File

@@ -0,0 +1,15 @@
#!/usr/bin/env python3
__author__ = 'dev'
import cgi
from fake_services.service.webservice.fake_http_request_handler import RESPONSE_PATH_PARAMETER_KEY_NAME
response_file_path = cgi.FieldStorage()[RESPONSE_PATH_PARAMETER_KEY_NAME].value
response_file = open(response_file_path, "r")
response_file_content = response_file.read()
response_file.close()
print("\n")
print(response_file_content)

View File

@@ -0,0 +1,15 @@
#!/usr/bin/env python3
__author__ = 'dev'
import cgi
from fake_services.service.webservice.fake_http_request_handler import RESPONSE_PATH_PARAMETER_KEY_NAME
response_file_path = cgi.FieldStorage()[RESPONSE_PATH_PARAMETER_KEY_NAME].value
response_file = open(response_file_path, "r")
response_file_content = response_file.read()
response_file.close()
print("\n")
print(response_file_content)

View File

@@ -0,0 +1,9 @@
{
"":
{
"host_pattern": "10.0.0.1",
"path_pattern": "index.php",
"protocol_pattern": "http",
"response_content_path": "cgi-bin/test.html"
}
}

View File

@@ -0,0 +1,156 @@
__author__ = 'dev'
import os
import stat
import unittest
import urllib.request
import urllib.parse
from fake_services.service.webservice.fake_web_server_manager import FakeWebServerManager
CGI_TEST_SCRIPT_PATH = "cgi-bin/test.py"
CGI_WRONG_SCRIPT_PATH = "cgi"
CGI_FILE_SCRIPT_PATH = "cgi-bin/file_content_response.py"
CGI_VAR_BIN_SCRIPT_PATH = "pybin/file_content_response.py"
SERVER_PORT = 8080
TEST_URL = "http://0.0.0.0:" + str(SERVER_PORT)
TEST_URL_EXPAND = TEST_URL + "/1/a/3/foo/bar.php"
DEFAULT_CONFIG = {
"/": [{"response_content_path": "cgi-bin/testA.html"}],
"/1/a/3/foo/bar.php":
[
{
"host_pattern": "^0.0.0.0$",
"response_content_path": "cgi-bin/testA.html"
},
{
"host_pattern": "0.0.0.0:8080",
"response_content_path": "cgi-bin/testB.html"
},
{
"response_content_path": "cgi-bin/testC.html"
}
]
}
class TestFakeSever(unittest.TestCase):
def setUp(self):
try:
os.chdir("service/webservice")
except:
pass
def tearDown(self):
try:
os.chdir("../..")
except:
pass
def test_moved_request(self):
server = FakeWebServerManager(server_port=SERVER_PORT, request_handle_script_path=CGI_TEST_SCRIPT_PATH, requests_config=DEFAULT_CONFIG)
server.start_server()
content = urllib.request.urlopen(TEST_URL).read().decode('utf-8')
content2 = urllib.request.urlopen(TEST_URL_EXPAND).read().decode('utf-8')
server.stop_server()
self.assertNotEqual("", content)
self.assertEqual(content, content2)
self.assertFalse(content.startswith("#"))
def test_not_found_script(self):
self.assertRaises(FileNotFoundError, FakeWebServerManager, SERVER_PORT, None, CGI_WRONG_SCRIPT_PATH)
FakeWebServerManager(server_port=SERVER_PORT)
def test_set_executable(self):
st = os.stat(CGI_FILE_SCRIPT_PATH)
if os.access(CGI_FILE_SCRIPT_PATH, os.X_OK):
os.chmod(CGI_FILE_SCRIPT_PATH, st.st_mode ^ stat.S_IEXEC)
self.assertFalse(os.access(CGI_FILE_SCRIPT_PATH, os.X_OK))
FakeWebServerManager(server_port=SERVER_PORT, request_handle_script_path=CGI_FILE_SCRIPT_PATH, requests_config=DEFAULT_CONFIG)
self.assertTrue(os.access(CGI_FILE_SCRIPT_PATH, os.X_OK))
def test_file_request(self):
server = FakeWebServerManager(server_port=SERVER_PORT, request_handle_script_path=CGI_FILE_SCRIPT_PATH, requests_config=DEFAULT_CONFIG)
server.start_server()
content = urllib.request.urlopen(TEST_URL).read().decode('utf-8')
content2 = urllib.request.urlopen(TEST_URL_EXPAND).read().decode('utf-8')
server.stop_server()
self.assertNotEqual("", content)
self.assertEqual(content, content2)
self.assertFalse(content.startswith("#"))
def test_file_var_bin_request(self):
server = FakeWebServerManager(server_port=SERVER_PORT, request_handle_script_path=CGI_FILE_SCRIPT_PATH, requests_config=DEFAULT_CONFIG)
server.start_server()
content = urllib.request.urlopen(TEST_URL_EXPAND).read().decode('utf-8')
server.stop_server()
self.assertNotEqual("", content)
self.assertFalse(content.startswith("#"))
def test_file_var_bin_request_post(self):
server = FakeWebServerManager(server_port=SERVER_PORT, request_handle_script_path=CGI_FILE_SCRIPT_PATH, requests_config=DEFAULT_CONFIG)
server.start_server()
content = urllib.request.urlopen(TEST_URL_EXPAND).read().decode('utf-8')
data = urllib.parse.urlencode({'q': 'Status'})
content2 = urllib.request.urlopen(TEST_URL_EXPAND, data.encode('utf-8')).read().decode('utf-8')
server.stop_server()
self.assertNotEqual("", content)
self.assertFalse(content.startswith("#"))
self.assertEqual(content, content2)
def test_config_request(self):
config = {
"/test_request":
[
{
"host_pattern": "10.0.0.1",
"response_content_path": "cgi-bin/test.html"
}
],
"/1/a/3/foo/bar.php":
[
{
"host_pattern": "^0.0.0.0$",
"response_content_path": "cgi-bin/testA.html"
},
{
"host_pattern": "0.0.0.0:8080",
"response_content_path": "cgi-bin/testB.html"
},
{
"response_content_path": "cgi-bin/testC.html"
}
]
}
server = FakeWebServerManager(server_port=SERVER_PORT, requests_config=config,
request_handle_script_path=CGI_VAR_BIN_SCRIPT_PATH)
server.start_server()
content = urllib.request.urlopen(TEST_URL_EXPAND).read().decode('utf-8')
server.stop_server()
self.assertNotEqual("", content)
self.assertFalse(content.startswith("#"))
def test_config_request_default_script(self):
path = os.path.abspath("cgi-bin/testA.html")
config = {
"/1/a/3/foo/bar.php":
[
{
"response_content_path": path
}
]
}
server = FakeWebServerManager(server_port=SERVER_PORT, requests_config=config)
server.start_server()
content = urllib.request.urlopen(TEST_URL_EXPAND).read().decode('utf-8')
server.stop_server()
self.assertNotEqual("", content)
self.assertFalse(content.startswith("#"))
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,73 @@
__author__ = 'dev'
import os
import re
import unittest
import fake_services.utility.network.ip_address_manager as ip
FIRST_IP_ADDRESS = "1.1.1.1"
SECOND_IP_ADDRESS = "1.1.1.2"
NETWORK_DEVICE_NAME = "enp20s0"
class TestIpManager(unittest.TestCase):
def tearDown(self):
ip.remove_ip_address(FIRST_IP_ADDRESS, NETWORK_DEVICE_NAME)
ip.remove_ip_address(SECOND_IP_ADDRESS, NETWORK_DEVICE_NAME)
def test_add_address(self):
self.assertFalse(self.ping_able(FIRST_IP_ADDRESS))
ip.add_ip_address(FIRST_IP_ADDRESS, NETWORK_DEVICE_NAME)
self.assertTrue(self.ping_able(FIRST_IP_ADDRESS))
def test_add_addresses(self):
self.assertFalse(self.ping_able(FIRST_IP_ADDRESS))
self.assertFalse(self.ping_able(SECOND_IP_ADDRESS))
ip.add_ip_addresses([FIRST_IP_ADDRESS,SECOND_IP_ADDRESS], NETWORK_DEVICE_NAME)
self.assertTrue(self.ping_able(FIRST_IP_ADDRESS))
self.assertTrue(self.ping_able(SECOND_IP_ADDRESS))
def test_rm_addresses(self):
self.assertFalse(self.ping_able(FIRST_IP_ADDRESS))
self.assertFalse(self.ping_able(SECOND_IP_ADDRESS))
ip.add_ip_addresses([FIRST_IP_ADDRESS,SECOND_IP_ADDRESS], NETWORK_DEVICE_NAME)
self.assertTrue(self.ping_able(FIRST_IP_ADDRESS))
self.assertTrue(self.ping_able(SECOND_IP_ADDRESS))
ip.remove_ip_addresses([FIRST_IP_ADDRESS,SECOND_IP_ADDRESS], NETWORK_DEVICE_NAME)
self.assertFalse(self.ping_able(FIRST_IP_ADDRESS))
self.assertFalse(self.ping_able(SECOND_IP_ADDRESS))
def test_rm_address(self):
self.assertFalse(self.ping_able(FIRST_IP_ADDRESS))
ip.add_ip_address(FIRST_IP_ADDRESS, NETWORK_DEVICE_NAME)
self.assertTrue(self.ping_able(FIRST_IP_ADDRESS))
ip.remove_ip_address(FIRST_IP_ADDRESS, NETWORK_DEVICE_NAME)
self.assertFalse(self.ping_able(FIRST_IP_ADDRESS))
def test_massive_adds(self):
ip_prefix = "10.0.0."
ip_list = []
for i in range(0, 255):
ip_list.append(ip_prefix + (str(i)))
try:
ip.add_ip_addresses(ip_list, NETWORK_DEVICE_NAME)
for ip_address in ip_list:
self.assertTrue(self.ping_able(ip_address))
finally:
ip.remove_ip_addresses(ip_list, NETWORK_DEVICE_NAME)
def ping_able(self, ip_address):
cmd = "ping -c1 " + ip_address
r = "".join(str(os.popen(cmd).readlines()))
if re.search("64 bytes from", r):
return True
else:
return False
if __name__ == '__main__':
unittest.main()