[tests] reorganize python tests

Change-Id: I7127b0e1cd16f713bca7310c04bd049aa2f74570
master
Stefan Bühler 3 months ago
parent f046f4fac1
commit a5e8c80bf8

@ -5,8 +5,8 @@ Howto write a test case:
Create a file "t-*.py" for your testcase and add a "Test" class to it, like:
from base import *
from requests import *
from base import GroupTest
from requests import CurlRequest
class Test(CurlRequest):
...
@ -44,16 +44,16 @@ need more details :)
"""
from __future__ import print_function
import importlib
import os
import imp
import sys
import traceback
import re
import subprocess
import inspect
from service import *
from pylt.service import Lighttpd
__all__ = [ "Env", "Tests", "TestBase", "GroupTest", "eprint", "log" ]
@ -78,6 +78,7 @@ def fix_test_name(name):
if (name[-1:] != '/'): name = name + '/'
return name
def load_test_file(name):
path = os.path.join(Env.sourcedir, 'tests', name)
file = open(path)
@ -86,12 +87,15 @@ def load_test_file(name):
file.close()
return module
def vhostname(testname):
return '.'.join(reversed(testname[1:-1].split('/'))).lower()
def regex_subvhosts(vhost):
return '(^|\\.)' + re.escape(vhost)
# basic interface
class TestBase(object):
config = None
@ -246,10 +250,12 @@ var.vhosts = var.vhosts + [ "%s" => {
def FeatureCheck(self):
return True
def class2testname(name):
if name.startswith("Test"): name = name[4:]
return name
class GroupTest(TestBase):
runnable = False
@ -339,20 +345,21 @@ class Tests(object):
self.vhosts_config += config
def LoadTests(self):
files = os.listdir(os.path.join(Env.sourcedir, "tests"))
files = filter(lambda x: x[-3:] == '.py', files)
files = filter(lambda x: x[:2] == 't-', files)
files = sorted(files)
module_names = sorted(
entry.removesuffix('.py')
for entry in os.listdir(os.path.join(os.path.dirname(__file__), 'tests'))
if entry.startswith('t-') and entry.endswith('.py')
)
mods = []
for f in files:
mods.append(load_test_file(f))
for mod_name in module_names:
mods.append(importlib.import_module(f'pylt.tests.{mod_name}'))
for m in mods:
t = m.Test()
for mod in mods:
t = mod.Test()
t.name = fix_test_name(t.name)
if '/' == t.name:
(n, _) = os.path.splitext(os.path.basename(m.__file__))
(n, _) = os.path.splitext(os.path.basename(mod.__file__))
t.name = fix_test_name(n[2:])
t._register(self)
@ -547,7 +554,7 @@ allow_listen ["127.0.0.2:{gnutlsport}", "127.0.0.2:{opensslport}"];
return not failed
def Cleanup(self):
# eprint("cleanup_files: %s, cleanup_dirs: %s" % (self.prepared_files, self.prepared_dirs))
# eprint("cleanup_files: %s, cleanup_dirs: %s" % (self.prepared_files, self.prepared_dirs))
if not Env.no_cleanup and not self.failed:
log("[Start] Cleanup services")
@ -574,11 +581,12 @@ allow_listen ["127.0.0.2:{gnutlsport}", "127.0.0.2:{opensslport}"];
shutil.rmtree(cache_disk_etag_dir)
os.mkdir(cache_disk_etag_dir)
except BaseException as e:
eprint("Couldn't clear directory '%s': %s" % (fname, e))
eprint("Couldn't clear directory '%s': %s" % (cache_disk_etag_dir, e))
self.CleanupDir(os.path.join("tmp", "cache_etag"))
log("[Done] Cleanup tests")
## helpers for prepare/cleanup
## helpers for prepare/cleanup
def _preparefile(self, fname, content, mode = 0o644):
if fname in self.prepared_files:
raise BaseException("File '%s' already exists!" % fname)

@ -2,10 +2,12 @@
import time
__all__ = [ 'LogFile', 'RemoveEscapeSeq' ]
ATTRS = [ 'closed', 'encoding', 'errors', 'mode', 'name', 'newlines', 'softspace' ]
class LogFile(object):
def __init__(self, file, **clones):
self.file = file
@ -21,12 +23,12 @@ class LogFile(object):
if name in ATTRS:
return delattr(self.file, name)
else:
return super(LogFile, self).__delattr__(name, value)
return super(LogFile, self).__delattr__(name)
def __getattr__(self, name):
if name in ATTRS:
return getattr(self.file, name)
else:
return super(LogFile, self).__getattr__(name, value)
return super(LogFile, self).__getattr__(name)
def __getattribute__(self, name):
if name in ATTRS:
return self.file.__getattribute__(name)
@ -85,6 +87,7 @@ class LogFile(object):
return self.write(''.join(args))
def xreadlines(self, *args, **kwargs): return self.file.xreadlines(*args, **kwargs)
class RemoveEscapeSeq(object):
def __init__(self, file):
self.file = file
@ -99,12 +102,12 @@ class RemoveEscapeSeq(object):
if name in ATTRS:
return delattr(self.file, name)
else:
return super(RemoveEscapeSeq, self).__delattr__(name, value)
return super(RemoveEscapeSeq, self).__delattr__(name)
def __getattr__(self, name):
if name in ATTRS:
return getattr(self.file, name)
else:
return super(RemoveEscapeSeq, self).__getattr__(name, value)
return super(RemoveEscapeSeq, self).__getattr__(name)
def __getattribute__(self, name):
if name in ATTRS:
return self.file.__getattribute__(name)

@ -4,13 +4,14 @@ import socket
import pycurl
from io import BytesIO
import sys
import zlib
import bz2
import os
from base import *
import pycurl
from pylt.base import eprint, log, Env, TestBase
TEST_TXT="""Hi!
0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef
@ -19,6 +20,7 @@ TEST_TXT="""Hi!
0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef
"""
class CurlRequestException(Exception):
def __init__(self, value): self.value = value
def __str__(self): return repr(self.value)
@ -132,7 +134,7 @@ class CurlRequest(_BaseRequest):
def Run(self):
if None == self.URL:
raise BasicException("You have to set URL in your CurlRequest instance")
raise BaseException("You have to set URL in your CurlRequest instance")
reqheaders = ["Host: " + self.vhost] + self.REQUEST_HEADERS
if None != self.ACCEPT_ENCODING:
reqheaders += ["Accept-Encoding: " + self.ACCEPT_ENCODING]

@ -0,0 +1,159 @@
# -*- coding: utf-8 -*-
import os
import sys
from tempfile import mkdtemp
from optparse import OptionParser
from .logfile import LogFile, RemoveEscapeSeq
from .base import Env, Tests, eprint
def which(program):
def is_exe(fpath):
return os.path.exists(fpath) and os.access(fpath, os.X_OK)
fpath, fname = os.path.split(program)
if fpath:
if is_exe(program):
return program
else:
for path in os.environ["PATH"].split(os.pathsep):
exe_file = os.path.join(path, program)
if is_exe(exe_file):
return exe_file
return None
def find_port(port):
if port >= 1024 and port < (65536-8):
return port
from random import Random
r = Random(os.getpid())
return r.randint(1024, 65536-8)
class ArgumentError(Exception):
def __init__(self, value): self.value = value
def __str__(self): return repr(self.value)
def parse_args():
parser = OptionParser()
parser.add_option("--angel", help = "Path to angel binary (required)")
parser.add_option("--worker", help = "Path to worker binary (required)")
parser.add_option("--plugindir", help = "Path to plugin directory (required)")
parser.add_option("-k", "--no-cleanup", help = "Keep temporary files, no cleanup", action = "store_true", default = False)
parser.add_option("-p", "--port", help = "Use [port,port+7] as tcp ports on 127.0.0.2 (default: 8088; use 0 for random port)", default = 8088, type = "int")
parser.add_option("-t", "--test", help = "Run specific test", action = "append", dest = "tests", default = [])
parser.add_option("-c", "--force-cleanup", help = "Keep no temporary files (overwrites -k)", action = "store_true", default = False)
parser.add_option("--strace", help = "Strace services", action = "store_true", default = False)
parser.add_option("--truss", help = "Truss services", action = "store_true", default = False)
parser.add_option("--debug-requests", help = "Dump requests", action = "store_true", default = False)
parser.add_option("--no-angel", help = "Spawn lighttpd worker directly", action = "store_true", default = False)
parser.add_option("--debug", help = "Show service logs on console", action = "store_true", default = False)
parser.add_option("--wait", help = "Wait for services to exit on first signal", action = "store_true", default = False)
parser.add_option("--valgrind", help = "Run worker with valgrind from angel", action = "store_true", default = False)
parser.add_option("--valgrind-leak", help = "Run valgrind with memory leak check; takes an empty string or a valgrind suppression file", action="store", default = False)
(options, args) = parser.parse_args()
if not options.angel or not options.worker or not options.plugindir:
raise ArgumentError("Missing required arguments")
if options.force_cleanup: options.no_cleanup = False
return options
def setup_env():
options = parse_args()
Env.angel = os.path.abspath(options.angel)
Env.worker = os.path.abspath(options.worker)
Env.plugindir = os.path.abspath(options.plugindir)
Env.no_cleanup = options.no_cleanup
Env.force_cleanup = options.force_cleanup
Env.port = find_port(options.port)
Env.tests = options.tests
Env.sourcedir = os.path.dirname(os.path.dirname(os.path.abspath(os.path.dirname(__file__))))
Env.contribdir = os.path.join(Env.sourcedir, "contrib")
Env.debugRequests = options.debug_requests
Env.strace = options.strace
Env.truss = options.truss
Env.no_angel = options.no_angel
Env.debug = options.debug
Env.wait = options.wait
Env.valgrind = options.valgrind
Env.valgrind_leak = options.valgrind_leak
if Env.valgrind or Env.valgrind_leak:
Env.valgrind = which('valgrind')
Env.color = sys.stdin.isatty()
Env.COLOR_RESET = Env.color and "\033[0m" or ""
Env.COLOR_BLUE = Env.color and "\033[1;34m" or ""
Env.COLOR_GREEN = Env.color and "\033[1;32m" or ""
Env.COLOR_YELLOW = Env.color and "\033[1;33m" or ""
Env.COLOR_RED = Env.color and "\033[1;31m" or ""
Env.COLOR_CYAN = Env.color and "\033[1;36m" or ""
Env.fcgi_cgi = which('fcgi-cgi')
Env.dir = mkdtemp(suffix='-l2-tests')
Env.defaultwww = os.path.join(Env.dir, "www", "default")
Env.log = open(os.path.join(Env.dir, "tests.log"), "w")
if Env.color: Env.log = RemoveEscapeSeq(Env.log)
if Env.debug:
logfile = Env.log
Env.log = LogFile(sys.stdout, **{ "[log]": logfile })
sys.stderr = LogFile(sys.stderr, **{ "[stderr]": logfile })
sys.stdout = LogFile(sys.stdout, **{ "[stdout]": logfile })
else:
Env.log = LogFile(Env.log)
sys.stderr = LogFile(sys.stderr, **{ "[stderr]": Env.log })
sys.stdout = LogFile(sys.stdout, **{ "[stdout]": Env.log })
def main():
setup_env()
failed = False
try:
# run tests
tests = Tests()
tests.LoadTests()
failed = True
try:
tests.Prepare()
except:
import traceback
eprint(traceback.format_exc())
else:
if tests.Run():
failed = False
finally:
tests.Cleanup()
if not Env.no_cleanup and not failed:
os.remove(os.path.join(Env.dir, "tests.log"))
except:
import traceback
eprint(traceback.format_exc())
failed = True
finally:
try:
if Env.force_cleanup:
import shutil
shutil.rmtree(Env.dir)
elif not Env.no_cleanup and not failed:
os.rmdir(Env.dir)
except OSError:
eprint("Couldn't delete temporary directory '%s', probably not empty (perhaps due to some errors)" % Env.dir)
if failed:
sys.exit(1)
else:
sys.exit(0)

@ -8,7 +8,7 @@ import select
import signal
import time
import base
from pylt import base
__all__ = [ "Service", "ServiceException", "devnull", "Lighttpd", "FastCGI" ]

@ -1,7 +1,8 @@
# -*- coding: utf-8 -*-
from base import *
from requests import *
from pylt.base import GroupTest
from pylt.requests import CurlRequest, TEST_TXT
class TestAlias1(CurlRequest):
URL = "/alias1"
@ -12,6 +13,7 @@ class TestAlias1(CurlRequest):
alias "/alias1" => var.default_docroot + "/test.txt";
"""
class TestAlias2(CurlRequest):
URL = "/alias2"
EXPECT_RESPONSE_BODY = TEST_TXT
@ -21,6 +23,7 @@ class TestAlias2(CurlRequest):
alias "/alias1" => "/nothing", "/alias2" => var.default_docroot + "/test.txt";
"""
class TestAlias3(CurlRequest):
URL = "/alias3/test.txt"
EXPECT_RESPONSE_BODY = TEST_TXT
@ -30,6 +33,7 @@ class TestAlias3(CurlRequest):
alias "/alias3" => var.default_docroot + "/";
"""
class Test(GroupTest):
group = [
TestAlias1,

@ -1,7 +1,8 @@
# -*- coding: utf-8 -*-
from base import *
from requests import *
from pylt.base import GroupTest
from pylt.requests import CurlRequest
class TestSimple(CurlRequest):
vhost = "xyz.abc.basic-docroot"
@ -9,34 +10,40 @@ class TestSimple(CurlRequest):
EXPECT_RESPONSE_BODY = "/var/www/xyz.abc.basic-docroot/htdocs"
EXPECT_RESPONSE_CODE = 200
class TestSubdir(CurlRequest):
vhost = "xyz.abc.basic-docroot"
URL = "/?subdir"
EXPECT_RESPONSE_BODY = "/var/www/basic-docroot/xyz.abc.basic-docroot/htdocs"
EXPECT_RESPONSE_CODE = 200
class TestSubdirOpenRange(CurlRequest):
vhost = "test.xyz.abc.basic-docroot"
URL = "/?subdir-open-range"
EXPECT_RESPONSE_BODY = "/var/www/basic-docroot/test.xyz.abc/htdocs"
EXPECT_RESPONSE_CODE = 200
class TestSubdirFixedRange(CurlRequest):
vhost = "test.xyz.abc.basic-docroot"
URL = "/?subdir-fixed-range"
EXPECT_RESPONSE_BODY = "/var/www/basic-docroot/xyz.abc/htdocs"
EXPECT_RESPONSE_CODE = 200
class TestCascade(CurlRequest):
URL = "/?cascade"
EXPECT_RESPONSE_BODY = "/"
EXPECT_RESPONSE_CODE = 200
class TestCascadeFallback(CurlRequest):
URL = "/?cascade-fallback"
EXPECT_RESPONSE_BODY = "/var/www/fallback/htdocs"
EXPECT_RESPONSE_CODE = 200
class Test(GroupTest):
group = [
TestSimple,

@ -1,7 +1,8 @@
# -*- coding: utf-8 -*-
from base import *
from requests import *
from pylt.base import GroupTest, TestBase
from pylt.requests import CurlRequest, RawRequest, TEST_TXT
LUA_SHOW_ENV_INFO="""
@ -23,6 +24,7 @@ class TestSimpleRequest(CurlRequest):
EXPECT_RESPONSE_CODE = 200
EXPECT_RESPONSE_HEADERS = [("Content-Type", "text/plain; charset=utf-8")]
class TestSimpleRequestStatus(CurlRequest):
URL = "/test.txt"
EXPECT_RESPONSE_BODY = TEST_TXT
@ -33,12 +35,14 @@ static_no_fail;
set_status 403;
"""
class TestSimpleRespond(CurlRequest):
URL = "/test.txt"
EXPECT_RESPONSE_BODY = "hello"
EXPECT_RESPONSE_CODE = 200
config = 'respond "hello";'
class TestIndex1(CurlRequest):
URL = "/"
EXPECT_RESPONSE_BODY = TEST_TXT
@ -48,6 +52,7 @@ defaultaction;
index "test.txt";
"""
class TestIndex2(CurlRequest):
URL = "/"
EXPECT_RESPONSE_BODY = TEST_TXT
@ -57,6 +62,7 @@ defaultaction;
index "index.html", "test.txt";
"""
class TestIndexNotExisting1(CurlRequest):
URL = "/not-existing"
EXPECT_RESPONSE_CODE = 404
@ -65,6 +71,7 @@ defaultaction;
index "index.html", "test.txt";
"""
class TestIndex3(CurlRequest):
URL = "/not-existing"
EXPECT_RESPONSE_BODY = TEST_TXT
@ -74,6 +81,7 @@ defaultaction;
index "/index.html", "/test.txt";
"""
class TestSimpleInfo(CurlRequest):
URL = "/?a_simple_query"
EXPECT_RESPONSE_BODY = "a_simple_query"
@ -89,6 +97,7 @@ class TestBadRequest1(RawRequest):
URL = "/?complicated?query= $"
EXPECT_RESPONSE_CODE = 400
class TestStaticExcludeExtensions1(CurlRequest):
URL = "/test.php"
EXPECT_RESPONSE_CODE = 403
@ -97,6 +106,7 @@ defaultaction;
static.exclude_extensions ".php";
"""
class TestStaticExcludeExtensions2(CurlRequest):
URL = "/test.php"
EXPECT_RESPONSE_CODE = 403
@ -105,6 +115,7 @@ defaultaction;
static.exclude_extensions (".php", ".py");
"""
class TestServerTag(CurlRequest):
URL = "/test.txt"
EXPECT_RESPONSE_BODY = TEST_TXT
@ -115,6 +126,7 @@ defaultaction;
server.tag "apache - no really!";
"""
class TestConditionalHeader1(CurlRequest):
URL = "/"
EXPECT_RESPONSE_BODY = "a"
@ -127,6 +139,7 @@ if req.header["X-Select"] == "a" {
}
"""
class TestConditionalHeader2(CurlRequest):
URL = "/"
EXPECT_RESPONSE_BODY = "b"
@ -138,6 +151,7 @@ if req.header["X-Select"] == "a" {
}
"""
class TestSimplePattern1(CurlRequest):
URL = "/"
EXPECT_RESPONSE_CODE = 403
@ -147,6 +161,7 @@ class TestSimplePattern1(CurlRequest):
respond 403 => "%{req.header[X-Select]}";
"""
class ProvideStatus(TestBase):
runnable = False
vhost = "status"
@ -155,6 +170,7 @@ setup { module_load "mod_status"; }
status.info;
"""
class Test(GroupTest):
group = [
TestSimpleRequest,

@ -1,11 +1,14 @@
# -*- coding: utf-8 -*-
from base import *
from requests import *
from service import FastCGI
from io import BytesIO
import time
import hashlib
import pycurl
from pylt.base import Env, GroupTest
from pylt.requests import CurlRequest
from pylt.service import FastCGI
def generate_body(seed, size):
i = 0
@ -15,6 +18,7 @@ def generate_body(seed, size):
i += 1
return body.getvalue()[:size]
class CGI(FastCGI):
name = "fcgi_cgi"
binary = [ Env.fcgi_cgi ]
@ -83,20 +87,24 @@ class TestPathInfo1(CurlRequest):
EXPECT_RESPONSE_BODY = "/abc/xyz"
EXPECT_RESPONSE_CODE = 200
class TestRequestUri1(CurlRequest):
URL = "/envcheck.cgi/abc/xyz?REQUEST_URI"
EXPECT_RESPONSE_BODY = "/envcheck.cgi/abc/xyz?REQUEST_URI"
EXPECT_RESPONSE_CODE = 200
BODY = generate_body('hello world', 2*1024*1024)
BODY_SHA1 = hashlib.sha1(BODY).hexdigest()
class TestUploadLarge1(CurlRequest):
URL = "/uploadcheck.cgi"
POST = BODY
EXPECT_RESPONSE_BODY = BODY_SHA1
EXPECT_RESPONSE_CODE = 200
class ChunkedBodyReader:
def __init__(self, body, chunksize = 32*1024):
self.body = body
@ -110,6 +118,7 @@ class ChunkedBodyReader:
self.pos += size
return self.body[current:current+size]
class DelayedChunkedBodyReader:
def __init__(self, body, chunksize = 32*1024):
self.body = body
@ -135,21 +144,25 @@ class TestUploadLargeChunked1(CurlRequest):
curl.setopt(curl.UPLOAD, 1)
curl.setopt(pycurl.READFUNCTION, ChunkedBodyReader(BODY).read)
class TestChunkedEncoding1(CurlRequest):
URL = "/chunkedencodingcheck.cgi"
EXPECT_RESPONSE_BODY = "Hello World!\n"
EXPECT_RESPONSE_CODE = 200
class TestStderr1(CurlRequest):
URL = "/stderr.cgi"
EXPECT_RESPONSE_BODY = "Not a real error"
EXPECT_RESPONSE_CODE = 404
class TestExitError1(CurlRequest):
URL = "/exiterror.cgi"
EXPECT_RESPONSE_BODY = "Not a real error"
EXPECT_RESPONSE_CODE = 404
class TestExitErrorUpload1(CurlRequest):
URL = "/exiterror.cgi"
EXPECT_RESPONSE_BODY = "Not a real error"
@ -160,6 +173,7 @@ class TestExitErrorUpload1(CurlRequest):
curl.setopt(curl.UPLOAD, 1)
curl.setopt(pycurl.READFUNCTION, DelayedChunkedBodyReader("test").read)
class Test(GroupTest):
group = [
TestPathInfo1,

@ -1,7 +1,8 @@
# -*- coding: utf-8 -*-
from base import *
from requests import *
from pylt.base import GroupTest
from pylt.requests import CurlRequest, TEST_TXT
class DeflateRequest(CurlRequest):
URL = "/test.txt"
@ -16,22 +17,28 @@ class DeflateRequest(CurlRequest):
class TestGzip(DeflateRequest):
ACCEPT_ENCODING = 'gzip'
class TestXGzip(DeflateRequest):
ACCEPT_ENCODING = 'x-gzip'
class TestDeflate(DeflateRequest):
ACCEPT_ENCODING = 'deflate'
# not supported
#class TestCompress(DeflateRequest):
# ACCEPT_ENCODING = 'compress'
class TestBzip2(DeflateRequest):
ACCEPT_ENCODING = 'bzip2'
class TestXBzip2(DeflateRequest):
ACCEPT_ENCODING = 'x-bzip2'
class TestDisableDeflate(CurlRequest):
URL = "/test.txt?nodeflate"
EXPECT_RESPONSE_BODY = TEST_TXT

@ -1,28 +1,33 @@
# -*- coding: utf-8 -*-
from base import *
from requests import *
from pylt.base import GroupTest
from pylt.requests import CurlRequest, TEST_TXT
class TestDirlist(CurlRequest):
URL = "/foo/"
EXPECT_RESPONSE_CODE = 200
EXPECT_RESPONSE_HEADERS = [("Content-Type", "text/html; charset=utf-8")]
class TestRedirectDir(CurlRequest):
URL = "/foo"
EXPECT_RESPONSE_CODE = 301
EXPECT_RESPONSE_HEADERS = [("Location", "http://dirlist/foo/")]
class TestRedirectDirWithQuery(CurlRequest):
URL = "/foo?bar=baz"
EXPECT_RESPONSE_CODE = 301
EXPECT_RESPONSE_HEADERS = [("Location", "http://dirlist/foo/?bar=baz")]
class TestRedirectDirWithQueryAndSpecialChars(CurlRequest):
URL = "/f%3f%20o?bar=baz"
EXPECT_RESPONSE_CODE = 301
EXPECT_RESPONSE_HEADERS = [("Location", "http://dirlist/f%3f%20o/?bar=baz")]
class Test(GroupTest):
group = [
TestDirlist,

@ -1,7 +1,8 @@
# -*- coding: utf-8 -*-
from base import *
from requests import *
from pylt.base import GroupTest
from pylt.requests import CurlRequest
class TestPatternCapture(CurlRequest):
# use capture from previous regex conditional
@ -15,6 +16,7 @@ if req.path =~ "(.*)" {
EXPECT_RESPONSE_BODY = "/path/"
EXPECT_RESPONSE_CODE = 200
class TestPatternCaptureRange(CurlRequest):
# use capture from previous regex conditional
config = """
@ -27,6 +29,7 @@ if req.path =~ "/([^/]*)/(.*)" {
EXPECT_RESPONSE_BODY = "pathxyz"
EXPECT_RESPONSE_CODE = 200
class TestPatternCaptureRevRange(CurlRequest):
# use capture from previous regex conditional
config = """
@ -39,6 +42,7 @@ if req.path =~ "/([^/]*)/(.*)" {
EXPECT_RESPONSE_BODY = "xyzpath"
EXPECT_RESPONSE_CODE = 200
class TestPatternEncodingPath(CurlRequest):
# encoding path
config = """
@ -49,6 +53,7 @@ show_env_info;
EXPECT_RESPONSE_BODY = "/complicated%3fpath%3d%20%24"
EXPECT_RESPONSE_CODE = 200
class TestPatternCombine(CurlRequest):
# combine several pieces
config = """
@ -61,6 +66,7 @@ show_env_info;
def Prepare(self):
self.EXPECT_RESPONSE_BODY = "Abc:/complicated%3fpath%3d%20%24:a_simple_query:" + self.vhost
class TestPatternEscape(CurlRequest):
config = """
env.set "INFO" => "\\\\%\\\\?\\\\$\\\\%{req.path}";

@ -1,10 +1,12 @@
# -*- coding: utf-8 -*-
from base import *
from requests import *
from pylt.base import GroupTest
from pylt.requests import CurlRequest, CurlRequestException, TEST_TXT
retrieved_etag1 = None
class TestGetEtag1(CurlRequest):
URL = "/test.txt"
EXPECT_RESPONSE_BODY = TEST_TXT
@ -14,10 +16,11 @@ class TestGetEtag1(CurlRequest):
def CheckResponse(self):
global retrieved_etag1
if not 'etag' in self.resp_headers: # lowercase keys!
raise CurlRequestException("Response missing etag header" % (k, v1, v))
raise CurlRequestException("Response missing etag header")
retrieved_etag1 = self.resp_headers['etag'] # lowercase keys!
return super(TestGetEtag1, self).CheckResponse()
class TestTryEtag1(CurlRequest):
URL = "/test.txt"
EXPECT_RESPONSE_BODY = ""
@ -33,7 +36,7 @@ class TestTryEtag1(CurlRequest):
def CheckResponse(self):
global retrieved_etag1
if not 'etag' in self.resp_headers: # lowercase keys!
raise CurlRequestException("Response missing etag header" % (k, v1, v))
raise CurlRequestException("Response missing etag header")
etag = self.resp_headers['etag'] # lowercase keys!
if retrieved_etag1 != etag:
raise CurlRequestException("Response unexpected etag header response header '%s' (wanted '%s')" % (etag, retrieved_etag1))
@ -42,6 +45,7 @@ class TestTryEtag1(CurlRequest):
retrieved_etag2 = None
class TestGetEtag2(CurlRequest):
URL = "/test.txt"
EXPECT_RESPONSE_BODY = TEST_TXT
@ -50,10 +54,11 @@ class TestGetEtag2(CurlRequest):
def CheckResponse(self):
global retrieved_etag2
if not 'etag' in self.resp_headers: # lowercase keys!
raise CurlRequestException("Response missing etag header" % (k, v1, v))
raise CurlRequestException("Response missing etag header")
retrieved_etag2 = self.resp_headers['etag'] # lowercase keys!
return super(TestGetEtag2, self).CheckResponse()
class TestTryEtag2(CurlRequest):
URL = "/test.txt"
EXPECT_RESPONSE_BODY = ""
@ -69,7 +74,7 @@ class TestTryEtag2(CurlRequest):
global retrieved_etag1
global retrieved_etag2
if not 'etag' in self.resp_headers: # lowercase keys!
raise CurlRequestException("Response missing etag header" % (k, v1, v))
raise CurlRequestException("Response missing etag header")
etag = self.resp_headers['etag'] # lowercase keys!
if retrieved_etag1 == etag:
raise CurlRequestException("Response has same etag header as uncompressed response '%s' (wanted '%s')" % (etag, retrieved_etag2))
@ -77,5 +82,6 @@ class TestTryEtag2(CurlRequest):
raise CurlRequestException("Response unexpected etag header response header '%s' (wanted '%s')" % (etag, retrieved_etag2))
return super(TestTryEtag2, self).CheckResponse()
class Test(GroupTest):
group = [TestGetEtag1, TestTryEtag1, TestGetEtag2, TestTryEtag2]

@ -1,7 +1,8 @@
# -*- coding: utf-8 -*-
from base import *
from requests import *
from pylt.base import GroupTest
from pylt.requests import CurlRequest, TEST_TXT
class TestSimpleRequest(CurlRequest):
PORT = 1
@ -12,6 +13,7 @@ class TestSimpleRequest(CurlRequest):
EXPECT_RESPONSE_HEADERS = [("Content-Type", "text/plain; charset=utf-8")]
vhost = "test1.ssl"
class TestSNI(CurlRequest):
PORT = 1
SCHEME = "https"

@ -1,7 +1,8 @@
# -*- coding: utf-8 -*-
from base import *
from requests import *
from pylt.base import eprint, log, GroupTest
from pylt.requests import CurlRequest
class TestHeaderAdd(CurlRequest):
# use capture from previous regex conditional
@ -19,6 +20,7 @@ respond;
raise BaseException("Unexpected headers 'Test-Header'")
return True
class TestHeaderAppend(CurlRequest):
# use capture from previous regex conditional
config = """
@ -35,6 +37,7 @@ respond;
raise BaseException("Unexpected headers 'Test-Header'")
return True
class TestHeaderOverwrite(CurlRequest):
# use capture from previous regex conditional
config = """
@ -51,6 +54,7 @@ respond;
raise BaseException("Unexpected headers 'Test-Header'")
return True
class TestHeaderRemove(CurlRequest):
# use capture from previous regex conditional
config = """
@ -67,6 +71,7 @@ respond;
raise BaseException("Unexpected headers 'Test-Header'")
return True
class Test(GroupTest):
group = [
TestHeaderAdd,

@ -1,13 +1,13 @@
# -*- coding: utf-8 -*-
from base import GroupTest
from requests import CurlRequest
from service import Service
import socket
import os
import base
import time
from pylt import base
from pylt.requests import CurlRequest
from pylt.service import Service
class Memcached(Service):
name = "memcached"
@ -34,12 +34,14 @@ class Memcached(Service):
base.eprint("Couldn't delete socket '%s': %s" % (self.sockfile, e))
self.tests.CleanupDir(os.path.join("tmp", "sockets"))
class TestStore1(CurlRequest):
URL = "/"
EXPECT_RESPONSE_BODY = "Hello World!"
EXPECT_RESPONSE_CODE = 200
EXPECT_RESPONSE_HEADERS = [("X-Memcached-Hit", "false")]
class TestLookup1(CurlRequest):
URL = "/"
EXPECT_RESPONSE_BODY = "Hello World!"
@ -52,7 +54,8 @@ class TestLookup1(CurlRequest):
time.sleep(0.2)
return super(TestLookup1, self).Run()
class Test(GroupTest):
class Test(base.GroupTest):
group = [
TestStore1,
TestLookup1,

@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
from base import *
from requests import *
from pylt.base import GroupTest
from pylt.requests import CurlRequest
class TestMimeType1(CurlRequest):
@ -10,18 +10,21 @@ class TestMimeType1(CurlRequest):
EXPECT_RESPONSE_CODE = 200
EXPECT_RESPONSE_HEADERS = [ ("Content-Type", "text/plain; charset=utf-8") ]
class TestMimeType2(CurlRequest):
URL = "/test.xt"
EXPECT_RESPONSE_BODY = ""
EXPECT_RESPONSE_CODE = 200
EXPECT_RESPONSE_HEADERS = [ ("Content-Type", "text/plain") ]
class TestMimeType3(CurlRequest):
URL = "/test.rxt"
EXPECT_RESPONSE_BODY = ""
EXPECT_RESPONSE_CODE = 200
EXPECT_RESPONSE_HEADERS = [ ("Content-Type", "text/strange") ]
class Test(GroupTest):
group = [TestMimeType1,TestMimeType2,TestMimeType3]

@ -1,7 +1,8 @@
# -*- coding: utf-8 -*-
from base import *
from requests import *
from pylt.base import GroupTest
from pylt.requests import CurlRequest
#userI:passI for I in [1..4] with [apr-md5, crypt, plain and apr-sha]
PASSWORDS="""user1:$apr1$mhpONdUp$xSRcAbK2F6hLFUzW59tzW/
@ -14,36 +15,43 @@ user4:{SHA}LbTBgR9CRYKpD41+53mVzwGNlEM=
DIGESTPASSWORDS="""user5:Realm1:b0590e8c95605dd708226b552fc86a22
"""
class TestAprMd5Fail(CurlRequest):
URL = "/test.txt"
EXPECT_RESPONSE_CODE = 401
AUTH = "user1:test1"
class TestAprMd5Success(CurlRequest):
URL = "/test.txt"
EXPECT_RESPONSE_CODE = 200
AUTH = "user1:pass1"
class TestCryptFail(CurlRequest):
URL = "/test.txt"
EXPECT_RESPONSE_CODE = 401
AUTH = "user2:test2"
class TestCryptSuccess(CurlRequest):
URL = "/test.txt"
EXPECT_RESPONSE_CODE = 200
AUTH = "user2:pass2"
class TestPlainFail(CurlRequest):
URL = "/test.txt?plain"
EXPECT_RESPONSE_CODE = 401
AUTH = "user3:test3"
class TestPlainSuccess(CurlRequest):
URL = "/test.txt?plain"
EXPECT_RESPONSE_CODE = 200
AUTH = "user3:pass3"
class TestAprSha1Fail(CurlRequest):
URL = "/test.txt"
EXPECT_RESPONSE_CODE = 401
@ -54,30 +62,36 @@ class TestAprSha1Success(CurlRequest):
EXPECT_RESPONSE_CODE = 200
AUTH = "user4:pass4"
class TestDigestFail(CurlRequest):
URL = "/test.txt?digest"
EXPECT_RESPONSE_CODE = 401
AUTH = "user5:test5"
class TestDigestSuccess(CurlRequest):
URL = "/test.txt?digest"
EXPECT_RESPONSE_CODE = 200
AUTH = "user5:pass5"
class TestRequireUserDeny(CurlRequest):
URL = "/test.txt?require1"
EXPECT_RESPONSE_CODE = 403
AUTH = "user4:pass4"
class TestRequireUserSuccess(CurlRequest):
URL = "/test.txt?require1"
EXPECT_RESPONSE_CODE = 200
AUTH = "user1:pass1"
class TestDeny(CurlRequest):
URL = "/test.txt?deny"
EXPECT_RESPONSE_CODE = 403
class Test(GroupTest):
group = [
TestAprMd5Fail, TestAprMd5Success,

@ -1,7 +1,8 @@
# -*- coding: utf-8 -*-
from base import *
from requests import *
from pylt.base import GroupTest
from pylt.requests import CurlRequest
LUA_TEST_OPTIONS="""
@ -17,14 +18,17 @@ actions = {
"""
class TestSetupOption(CurlRequest):
URL = "/"
EXPECT_RESPONSE_HEADERS = [("Server", "lighttpd 2.0 with lua")]
class TestChangeOption(CurlRequest):
URL = "/?change"
EXPECT_RESPONSE_HEADERS = [("Server", "lighttpd 2.0 with modified lua")]
class Test(GroupTest):
group = [
TestSetupOption, TestChangeOption,

@ -1,10 +1,12 @@
# -*- coding: utf-8 -*-
from base import *
from requests import *
import socketserver
import threading
from pylt.base import Env, GroupTest
from pylt.requests import CurlRequest, TEST_TXT
class HttpBackendHandler(socketserver.StreamRequestHandler):
def handle(self):
keepalive = True
@ -72,6 +74,7 @@ class HttpBackend(socketserver.ThreadingMixIn, socketserver.TCPServer):
self.listen_thread.daemon = True
self.listen_thread.start()
class TestSimple(CurlRequest):
URL = "/test.txt"
EXPECT_RESPONSE_CODE = 200
@ -83,6 +86,7 @@ self_proxy;
"""
no_docroot = True
# backend gets encoded %2F
class TestProxiedRewrittenEncodedURL(CurlRequest):
URL = "/foo%2Ffile?abc"
@ -94,6 +98,7 @@ rewrite_raw "/foo(.*)" => "/dest$1";
backend_proxy;
"""
# backend gets decoded %2F
class TestProxiedRewrittenDecodedURL(CurlRequest):
URL = "/foo%2Ffile?abc"
@ -105,6 +110,7 @@ rewrite "/foo(.*)" => "/dest$1";
backend_proxy;
"""
# fake a backend forcing keep-alive mode
class TestBackendForcedKeepalive(CurlRequest):
URL = "/keepalive"
@ -115,6 +121,7 @@ class TestBackendForcedKeepalive(CurlRequest):
backend_proxy;
"""
# have backend "Upgrade"
class TestBackendUpgrade(CurlRequest):
URL = "/upgrade/custom"
@ -125,6 +132,7 @@ class TestBackendUpgrade(CurlRequest):
backend_proxy;
"""
class TestBackendDelayedChunk(CurlRequest):
URL = "/chunked/delay"
EXPECT_RESPONSE_BODY = "Hi!\n"
@ -134,6 +142,7 @@ class TestBackendDelayedChunk(CurlRequest):
backend_proxy;
"""
class TestBackendNoLength(CurlRequest):
URL = "/nolength"
EXPECT_RESPONSE_BODY = "Hello world"
@ -143,6 +152,7 @@ class TestBackendNoLength(CurlRequest):
backend_proxy;
"""
class Test(GroupTest):
group = [
TestSimple,

@ -1,7 +1,8 @@
# -*- coding: utf-8 -*-
from base import *
from requests import *
from pylt.base import GroupTest
from pylt.requests import CurlRequest, TEST_TXT
class TestSimpleRequest(CurlRequest):
PORT = 2
@ -12,6 +13,7 @@ class TestSimpleRequest(CurlRequest):
EXPECT_RESPONSE_HEADERS = [("Content-Type", "text/plain; charset=utf-8")]
vhost = "test1.ssl"
class Test(GroupTest):
group = [
TestSimpleRequest

@ -1,7 +1,8 @@
# -*- coding: utf-8 -*-
from base import *
from requests import *
from pylt.base import GroupTest
from pylt.requests import CurlRequest, TEST_TXT
class TestRewrite1(CurlRequest):
URL = "/somefile"
@ -13,6 +14,7 @@ rewrite "^/somefile$" => "/test.txt";
defaultaction;
"""
class TestRewrite2(CurlRequest):
URL = "/somefile"
EXPECT_RESPONSE_BODY = TEST_TXT
@ -22,6 +24,8 @@ class TestRewrite2(CurlRequest):
rewrite "/somethingelse" => "/nothing", "^/somefile$" => "/test.txt";
defaultaction;
"""
class Test(GroupTest):
plain_config = """
setup { module_load "mod_rewrite"; }

@ -1,7 +1,8 @@
# -*- coding: utf-8 -*-
from base import *
from requests import *
from pylt.base import GroupTest
from pylt.requests import CurlRequest, TEST_TXT
class TestRewrite1(CurlRequest):
URL = "/somefile"
@ -13,6 +14,7 @@ rewrite "^/somefile$" => "/test.txt";
defaultaction;
"""
class TestRewrite2(CurlRequest):
URL = "/somefile"
EXPECT_RESPONSE_BODY = TEST_TXT
@ -23,6 +25,7 @@ rewrite "/somethingelse" => "/nothing", "^/somefile$" => "/test.txt";
defaultaction;
"""
# match decoded and simplified paths by default
class TestRewrite3(CurlRequest):
URL = "/http://some%2Ffile"
@ -33,6 +36,7 @@ rewrite "/http:/some(/.*)" => "/dest$1";
respond 200 => "%{req.path}";
"""
# match raw paths and simplify path
class TestRewrite4(CurlRequest):
URL = "/http://some%2Ffile"
@ -43,6 +47,7 @@ rewrite_raw "(/http://some%2F.*)" => "/dest$1";
respond 200 => "%{req.path}";
"""
# match and write raw paths
class TestRewrite5(CurlRequest):
URL = "/http://some%2Ffile"
@ -53,6 +58,7 @@ rewrite_raw "(/http://some%2F.*)" => "/dest$1";
respond 200 => "%{req.raw_path}";
"""
# raw match and write query string
class TestRewrite6(CurlRequest):
URL = "/http://some%2Ffile"
@ -63,6 +69,7 @@ rewrite_raw