[tests] Add FastCGI service and use it in new cgi test

personal/stbuehler/wip
Stefan Bühler 12 years ago
parent 11fa781f98
commit 6013a9aa9a
  1. 89
      tests/base.py
  2. 28
      tests/runtests.py
  3. 60
      tests/service.py
  4. 56
      tests/t-cgi.py

@ -19,7 +19,7 @@ There are some basic test classes you can derive from:
Each test class can provide Prepare, Run and Cleanup handlers (CurlRequest already provides a Run handler).
A test intance has the following attributes:
* config: vhost config (error/access log and vhost handling gets added)
* config: vhost config (error/access log/docroot and vhost handling gets added)
* plain_config: gets added before all vhost configs (define global actions here)
* name: unique test name, has a sane default
* vhost: the vhost name; must be unique if a config is provided;
@ -101,7 +101,8 @@ class TestBase(object):
self.tests = tests
if not self.vhost: self.vhost = vhostname(self.name)
self.vhostdir = os.path.join(Env.dir, 'www', 'vhosts', self.vhost)
tests.add_test(self)
if self.FeatureCheck():
tests.add_test(self)
def _prepare(self):
self.Prepare()
@ -117,10 +118,11 @@ class TestBase(object):
var.reg_vhosts = var.reg_vhosts + [ "%s" : ${
log = [ "*": "file:%s" ];
accesslog = "%s";
docroot "%s";
%s
}
];
""" % (self.name, regex_subvhosts(self.vhost), errorlog, accesslog, self.config)
""" % (self.name, regex_subvhosts(self.vhost), errorlog, accesslog, self.vhostdir, self.config)
else:
config = """
# %s
@ -128,10 +130,11 @@ var.reg_vhosts = var.reg_vhosts + [ "%s" : ${
var.vhosts = var.vhosts + [ "%s" : ${
log = [ "*": "file:%s" ];
accesslog = "%s";
docroot "%s";
%s
}
];
""" % (self.name, self.vhost, errorlog, accesslog, self.config)
""" % (self.name, self.vhost, errorlog, accesslog, self.vhostdir, self.config)
self.tests.append_vhosts_config(config)
@ -176,21 +179,24 @@ var.vhosts = var.vhosts + [ "%s" : ${
self.tests.CleanupDir(dirname)
# public
def PrepareVHostFile(self, fname, content):
def PrepareVHostFile(self, fname, content, mode = 0644):
"""remembers which files have been prepared and while remove them on cleanup; returns absolute pathname"""
fname = 'www/vhosts/' + self.vhost + '/' + fname
return self.tests.PrepareFile(fname, content)
fname = os.path.join('www', 'vhosts', self.vhost, fname)
return self.PrepareFile(fname, content, mode = mode)
def PrepareFile(self, fname, content):
def PrepareFile(self, fname, content, mode = 0644):
"""remembers which files have been prepared and while remove them on cleanup; returns absolute pathname"""
self._test_cleanup_files.append(fname)
return self.tests.PrepareFile(fname, content)
return self.tests.PrepareFile(fname, content, mode = mode)
def PrepareDir(self, dirname):
"""remembers which directories have been prepared and while remove them on cleanup; returns absolute pathname"""
self._test_cleanup_dirs.append(fname)
return self.tests.PrepareDir(dirname)
def MissingFeature(self, feature):
print >> sys.stderr, Env.COLOR_RED + ("Skipping test '%s' due to missing '%s'" % (self.name, feature)) + Env.COLOR_RESET
return False
# implement these yourself
def Prepare(self):
@ -202,6 +208,9 @@ var.vhosts = var.vhosts + [ "%s" : ${
def Cleanup(self):
pass
def FeatureCheck(self):
return True
def class2testname(name):
if name.startswith("Test"): name = name[4:]
return name
@ -387,17 +396,10 @@ allow-listen {{ ip "127.0.0.1:{Env.port}"; }}
return ("[{0:>%i}" % len(s)).format(i) + "/" + s + "]"
def Run(self):
COLOR_RESET = Env.color and "\033[0m" or ""
COLOR_BLUE = Env.color and "\033[1;34m" or ""
COLOR_GREEN = Env.color and "\033[1;32m" or ""
COLOR_YELLOW = Env.color and "\033[1;33m" or ""
COLOR_RED = Env.color and "\033[1;31m" or ""
COLOR_CYAN = Env.color and "\033[1;36m" or ""
PASS = COLOR_GREEN + "[PASS]" + COLOR_RESET
FAIL = COLOR_RED + "[FAIL]" + COLOR_RESET
TODO = COLOR_YELLOW + "[TODO]" + COLOR_RESET
DONE = COLOR_YELLOW + "[DONE]" + COLOR_RESET
PASS = Env.COLOR_GREEN + "[PASS]" + Env.COLOR_RESET
FAIL = Env.COLOR_RED + "[FAIL]" + Env.COLOR_RESET
TODO = Env.COLOR_YELLOW + "[TODO]" + Env.COLOR_RESET
DONE = Env.COLOR_YELLOW + "[DONE]" + Env.COLOR_RESET
testcount = len(self.run)
print >> Env.log, "[Start] Running tests"
@ -405,19 +407,19 @@ allow-listen {{ ip "127.0.0.1:{Env.port}"; }}
sys.stdout.flush()
sys.stderr.flush()
fmt = COLOR_BLUE + " {0:<%i} " % self.testname_len
fmt = Env.COLOR_BLUE + " {0:<%i} " % self.testname_len
failed = False
i = 1
for t in self.run:
result = t._run()
if t.todo:
print >> sys.stdout, COLOR_CYAN + self._progress(i, testcount) + fmt.format(t.name) + (result and DONE or TODO)
print >> sys.stdout, Env.COLOR_CYAN + self._progress(i, testcount) + fmt.format(t.name) + (result and DONE or TODO)
if result:
self.stat_done += 1
else:
self.stat_todo += 1
else:
print >> sys.stdout, COLOR_CYAN + self._progress(i, testcount) + fmt.format(t.name) + (result and PASS or FAIL)
print >> sys.stdout, Env.COLOR_CYAN + self._progress(i, testcount) + fmt.format(t.name) + (result and PASS or FAIL)
if result:
self.stat_pass += 1
else:
@ -463,13 +465,15 @@ allow-listen {{ ip "127.0.0.1:{Env.port}"; }}
print >> Env.log, "[Done] Cleanup tests"
## helpers for prepare/cleanup
def _preparefile(self, fname, content):
def _preparefile(self, fname, content, mode = 0644):
if self.prepared_files.has_key(fname):
raise BaseException("File '%s' already exists!" % fname)
else:
f = open(os.path.join(Env.dir, fname), "w")
path = os.path.join(Env.dir, fname)
f = open(path, "w")
f.write(content)
f.close()
os.chmod(path, mode)
self.prepared_files[fname] = 1
def _cleanupfile(self, fname):
@ -498,21 +502,21 @@ allow-listen {{ ip "127.0.0.1:{Env.port}"; }}
except BaseException, e:
print >>sys.stderr, "Couldn't delete directory '%s': %s" % (dirname, e)
def PrepareFile(self, fname, content):
def PrepareFile(self, fname, content, mode = 0644):
path = filter(lambda x: x != '', fname.split('/'))
for i in range(1, len(path)):
self._preparedir(os.path.join(*path[0:i]))
self._preparefile(os.path.join(*path), content)
self._preparefile(os.path.join(*path), content, mode = mode)
return os.path.join(Env.dir, *path)
def PrepareDir(self, dirname):
path = filter(lambda x: x != '', fname.split('/'))
path = filter(lambda x: x != '', dirname.split('/'))
for i in range(1, len(path)+1):
self._preparedir(os.path.join(*path[0:i]))
return os.path.join(Env.dir, *path)
def CleanupDir(self, dirname):
path = filter(lambda x: x != '', fname.split('/'))
path = filter(lambda x: x != '', dirname.split('/'))
for i in reversed(range(1, len(path)+1)):
self._cleanupdir(os.path.join(*path[0:i]))
@ -522,30 +526,3 @@ allow-listen {{ ip "127.0.0.1:{Env.port}"; }}
return False
for i in reversed(range(1, len(path))):
self._cleanupdir(os.path.join(*path[0:i]))
class Lighttpd(Service):
name = "lighttpd"
def TestConfig(self):
logfile = open(self.log, "w")
inp = self.devnull()
args = [Env.worker, '-m', Env.plugindir, '-c', Env.lighttpdconf, '-t']
print >> Env.log, "Testing lighttpd config: %s" % (' '.join(args))
proc = subprocess.Popen(args, stdin = inp, stdout = logfile, stderr = logfile, close_fds = True)
if None != inp: inp.close()
logfile.close()
status = proc.wait()
if 0 != status:
os.system("cat '%s'" % self.log)
raise BaseException("testing lighttpd config failed with returncode %i" % (status))
def Prepare(self):
self.TestConfig()
self.portfree(Env.port)
if Env.no_angel:
self.fork(Env.worker, '-m', Env.plugindir, '-c', Env.lighttpdconf)
else:
self.fork(Env.angel, '-m', Env.plugindir, '-c', Env.angelconf)
self.waitconnect(Env.port)

@ -11,6 +11,23 @@ from optparse import OptionParser
from tempfile import mkdtemp
def which(program):
def is_exe(fpath):
return os.path.exists(fpath) and os.access(fpath, os.X_OK)
fpath, fname = os.path.split(program)
if fpath:
if is_exe(program):
return program
else:
for path in os.environ["PATH"].split(os.pathsep):
exe_file = os.path.join(path, program)
if is_exe(exe_file):
return exe_file
return None
def find_port(port):
if port >= 1024 and port < (65536-8):
return port
@ -55,9 +72,18 @@ Env.luadir = os.path.join(os.path.dirname(Env.sourcedir), "doc")
Env.debugRequests = options.debug_requests
Env.strace = options.strace
Env.truss = options.truss
Env.color = sys.stdin.isatty()
Env.no_angel = options.no_angel
Env.color = sys.stdin.isatty()
Env.COLOR_RESET = Env.color and "\033[0m" or ""
Env.COLOR_BLUE = Env.color and "\033[1;34m" or ""
Env.COLOR_GREEN = Env.color and "\033[1;32m" or ""
Env.COLOR_YELLOW = Env.color and "\033[1;33m" or ""
Env.COLOR_RED = Env.color and "\033[1;31m" or ""
Env.COLOR_CYAN = Env.color and "\033[1;36m" or ""
Env.fcgi_cgi = which('fcgi-cgi')
Env.dir = mkdtemp(dir = os.getcwd())
Env.defaultwww = os.path.join(Env.dir, "www", "default")

@ -9,7 +9,7 @@ import signal
import base
__all__ = [ "Service", "ServiceException", "devnull" ]
__all__ = [ "Service", "ServiceException", "devnull", "Lighttpd", "FastCGI" ]
class ServiceException(Exception):
def __init__(self, value): self.value = value
@ -40,11 +40,15 @@ class Service(object):
def devnull(self):
return devnull()
def fork(self, *args):
def fork(self, *args, **kwargs):
if kwargs.has_key('inp'):
inp = kwargs['inp']
else:
inp = devnull()
if None == self.name:
raise ServiceException("Service needs a name!")
logfile = open(self.log, "w")
inp = devnull()
if base.Env.strace:
slog = self.tests.PrepareFile("log/strace-%s.log" % self.name, "")
@ -130,4 +134,52 @@ class Service(object):
pass
def Stop(self):
pass
pass
class Lighttpd(Service):
name = "lighttpd"
def TestConfig(self):
logfile = open(self.log, "w")
inp = self.devnull()
args = [base.Env.worker, '-m', base.Env.plugindir, '-c', base.Env.lighttpdconf, '-t']
print >> base.Env.log, "Testing lighttpd config: %s" % (' '.join(args))
proc = subprocess.Popen(args, stdin = inp, stdout = logfile, stderr = logfile, close_fds = True)
if None != inp: inp.close()
logfile.close()
status = proc.wait()
if 0 != status:
os.system("cat '%s'" % self.log)
raise BaseException("testing lighttpd config failed with returncode %i" % (status))
def Prepare(self):
self.TestConfig()
self.portfree(base.Env.port)
if base.Env.no_angel:
self.fork(base.Env.worker, '-m', base.Env.plugindir, '-c', base.Env.lighttpdconf)
else:
self.fork(base.Env.angel, '-m', base.Env.plugindir, '-c', base.Env.angelconf)
self.waitconnect(base.Env.port)
class FastCGI(Service):
binary = [None]
def __init__(self):
self.sockfile = os.path.join(base.Env.dir, "tmp", "sockets", self.name + ".sock")
super(FastCGI, self).__init__()
def Prepare(self):
sockdir = self.tests.PrepareDir(os.path.join("tmp", "sockets"))
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.bind(self.sockfile)
sock.listen(8)
self.fork(*self.binary, inp = sock)
def Cleanup(self):
if None != self.sockfile:
try:
os.remove(self.sockfile)
except BaseException, e:
print >>sys.stderr, "Couldn't delete socket '%s': %s" % (self.sockfile, e)
self.tests.CleanupDir(os.path.join("tmp", "sockets"))

@ -0,0 +1,56 @@
# -*- coding: utf-8 -*-
from base import *
from requests import *
from service import FastCGI
class CGI(FastCGI):
name = "fcgi_cgi"
binary = [ Env.fcgi_cgi ]
SCRIPT_PATHINFO="""#!/bin/sh
echo -en 'Status: 200\\r\\nContent-Type: text/plain\\r\\n\\r\\n'
echo -n ${PATH_INFO}
"""
class TestPathInfo1(CurlRequest):
URL = "/pathinfo.cgi/abc/xyz"
EXPECT_RESPONSE_BODY = "/abc/xyz"
EXPECT_RESPONSE_CODE = 200
class Test(GroupTest):
group = [
TestPathInfo1,
]
config = """
pathinfo;
if phys.exists and phys.path =$ ".cgi" {
cgi;
} else {
cgi;
}
"""
def FeatureCheck(self):
if None == Env.fcgi_cgi:
return self.MissingFeature('fcgi-cgi')
cgi = CGI()
self.plain_config = """
setup {{ module_load "mod_fastcgi"; }}
cgi {{
fastcgi "unix:{socket}";
}}
""".format(socket = cgi.sockfile)
self.tests.add_service(cgi)
return True
def Prepare(self):
self.PrepareVHostFile("pathinfo.cgi", SCRIPT_PATHINFO, mode = 0755)
Loading…
Cancel
Save