2
0
Fork 0

[tests] add deflate support and tests

This commit is contained in:
Stefan Bühler 2013-05-24 10:57:09 +02:00
parent 1402e8987c
commit b1611a2047
3 changed files with 96 additions and 6 deletions

View File

@ -333,6 +333,7 @@ setup {{
module_load (
"mod_accesslog",
"mod_deflate",
"mod_dirlist",
"mod_lua",
"mod_vhost"
@ -354,6 +355,12 @@ defaultaction = {{
docroot "{Env.defaultwww}";
}};
do_deflate = {{
if request.is_handled {{
deflate;
}}
}};
var.vhosts = [];
var.reg_vhosts = [];
""".format(Env = Env, errorlog = errorlog, accesslog = accesslog)
@ -376,6 +383,9 @@ var.vhosts = var.vhosts + [ "default" => {
} ];
vhost.map var.vhosts;
static;
do_deflate;
"""
Env.lighttpdconf = self.PrepareFile("conf/lighttpd.conf", self.config)

View File

@ -4,6 +4,8 @@ import pycurl
import StringIO
import sys
import zlib
import bz2
from base import *
@ -17,6 +19,8 @@ class CurlRequest(TestBase):
PORT = 0 # offset to Env.port
AUTH = None
POST = None
REQUEST_HEADERS = []
ACCEPT_ENCODING = "deflate, gzip"
EXPECT_RESPONSE_BODY = None
EXPECT_RESPONSE_CODE = None
@ -27,6 +31,7 @@ class CurlRequest(TestBase):
self.resp_header_list = []
self.resp_headers = { }
self.resp_first_line = None
self.resp_body = None
def _recv_header(self, header):
header = header.rstrip()
@ -47,6 +52,7 @@ class CurlRequest(TestBase):
key = key.strip()
value = value.strip()
self.resp_header_list.append((key, value))
key = key.lower()
if self.resp_headers.has_key(key):
self.resp_headers[key] += ", " + value
else:
@ -55,9 +61,12 @@ class CurlRequest(TestBase):
def Run(self):
if None == self.URL:
raise BasicException("You have to set URL in your CurlRequest instance")
reqheaders = ["Host: " + self.vhost] + self.REQUEST_HEADERS
if None != self.ACCEPT_ENCODING:
reqheaders += ["Accept-Encoding: " + self.ACCEPT_ENCODING]
c = pycurl.Curl()
c.setopt(pycurl.URL, self.SCHEME + ("://127.0.0.2:%i" % (Env.port + self.PORT)) + self.URL)
c.setopt(pycurl.HTTPHEADER, ["Host: " + self.vhost])
c.setopt(pycurl.HTTPHEADER, reqheaders)
c.setopt(pycurl.NOSIGNAL, 1)
c.setopt(pycurl.TIMEOUT, 2)
b = StringIO.StringIO()
@ -103,9 +112,33 @@ class CurlRequest(TestBase):
for (k, v) in self.resp_header_list:
print >> Env.log, " %s: %s" % (k, v)
print >> Env.log, "Curl response body:"
print >> Env.log, self.buffer.getvalue()
print >> Env.log, self.ResponseBody()
Env.log.flush()
def _decode(self, method, data):
if 'x-gzip' == method or 'gzip' == method:
header = data[:10]
if "\x1f\x8b\x08\x00\x00\x00\x00\x00" != header[:8]:
raise CurlRequestException("Unsupported content-encoding gzip header")
return zlib.decompress(data[10:], -15)
elif 'deflate' == method:
return zlib.decompress(data, -15)
elif 'compress' == method:
raise CurlRequestException("Unsupported content-encoding %s" % method)
elif 'x-bzip2' == method or 'bzip2' == method:
return bz2.decompress(data)
else:
raise CurlRequestException("Unsupported content-encoding %s" % method)
def ResponseBody(self):
if None == self.resp_body:
body = self.buffer.getvalue()
if self.resp_headers.has_key("content-encoding"):
cenc = self.resp_headers["content-encoding"]
body = self._decode(cenc, body)
self.resp_body = body
return self.resp_body
def _checkResponse(self):
c = self.curl
if Env.debugRequests:
@ -120,14 +153,13 @@ class CurlRequest(TestBase):
raise CurlRequestException("Unexpected response code %i (wanted %i)" % (code, self.EXPECT_RESPONSE_CODE))
if None != self.EXPECT_RESPONSE_BODY:
body = self.buffer.getvalue()
if body != self.EXPECT_RESPONSE_BODY:
if self.ResponseBody() != self.EXPECT_RESPONSE_BODY:
raise CurlRequestException("Unexpected response body")
for (k, v) in self.EXPECT_RESPONSE_HEADERS:
if not self.resp_headers.has_key(k):
if not self.resp_headers.has_key(k.lower()):
raise CurlRequestException("Didn't get wanted response header '%s'" % (k))
v1 = self.resp_headers[k]
v1 = self.resp_headers[k.lower()]
if v1 != v:
raise CurlRequestException("Unexpected response header '%s' = '%s' (wanted '%s')" % (k, v1, v))

48
tests/t-deflate.py Normal file
View File

@ -0,0 +1,48 @@
# -*- coding: utf-8 -*-
from base import *
from requests import *
TEST_TXT="""Hi!
0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef
0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef
0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef
0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef
"""
class DeflateRequest(CurlRequest):
URL = "/test.txt"
EXPECT_RESPONSE_BODY = TEST_TXT
EXPECT_RESPONSE_CODE = 200
EXPECT_RESPONSE_HEADERS = [("Vary", "Accept-Encoding")]
def Prepare(self):
self.EXPECT_RESPONSE_HEADERS = self.EXPECT_RESPONSE_HEADERS + [ ("Content-Encoding", self.ACCEPT_ENCODING) ]
class TestGzip(DeflateRequest):
ACCEPT_ENCODING = 'gzip'
class TestXGzip(DeflateRequest):
ACCEPT_ENCODING = 'x-gzip'
class TestDeflate(DeflateRequest):
ACCEPT_ENCODING = 'deflate'
# not supported
#class TestCompress(DeflateRequest):
# ACCEPT_ENCODING = 'compress'
class TestBzip2(DeflateRequest):
ACCEPT_ENCODING = 'bzip2'
class TestXBzip2(DeflateRequest):
ACCEPT_ENCODING = 'x-bzip2'
class Test(GroupTest):
group = [TestGzip, TestXGzip, TestDeflate, TestBzip2, TestXBzip2]
def Prepare(self):
self.PrepareVHostFile("test.txt", TEST_TXT)
# deflate is enabled global too; force it here anyway
self.config = """static; do_deflate;"""