Browse Source

[tests] python3 compability

Change-Id: I09c8eaf0cbe4a74b795aeb2a3b04f0abc39383c8
master
Stefan Bühler 1 year ago
parent
commit
9263deaeef
12 changed files with 124 additions and 113 deletions
  1. +62
    -51
      tests/base.py
  2. +19
    -18
      tests/requests.py
  3. +1
    -1
      tests/run-memcached.py
  4. +2
    -2
      tests/run-scgi-envcheck.py
  5. +4
    -6
      tests/runtests.py
  6. +8
    -8
      tests/service.py
  7. +10
    -9
      tests/t-cgi.py
  8. +4
    -4
      tests/t-etag.py
  9. +8
    -8
      tests/t-header-modify.py
  10. +2
    -2
      tests/t-memcached.py
  11. +2
    -2
      tests/t-scgi.py
  12. +2
    -2
      tests/t-secdownload.py

+ 62
- 51
tests/base.py View File

@@ -27,7 +27,7 @@ A test intance has the following attributes:
if the subtest doesn't provide a config
* runnable: whether to call Run
* todo: whether the test is expected to fail
* subdomains: whether a config should be used for (^|\.)vhost (i.e. vhost and all subdomains)
* subdomains: whether a config should be used for (^|\\.)vhost (i.e. vhost and all subdomains)

You can create files and directories in Prepare with TestBase.{PrepareVHostFile,PrepareFile,PrepareDir};
they will get removed on cleanup automatically (if the test was successful).
@@ -43,6 +43,7 @@ and the class will do everything for you. have a look at the class if you
need more details :)
"""

from __future__ import print_function

import os
import imp
@@ -54,7 +55,16 @@ import inspect

from service import *

__all__ = [ "Env", "Tests", "TestBase", "GroupTest" ]
__all__ = [ "Env", "Tests", "TestBase", "GroupTest", "eprint", "log" ]


def log(*args, **kwargs):
print(*args, file=Env.log, **kwargs)


def eprint(*args, **kwargs):
print(*args, file=sys.stderr, **kwargs)


class Dict(object):
pass
@@ -158,24 +168,25 @@ var.vhosts = var.vhosts + [ "%s" => {

def _run(self):
failed = False
print >> Env.log, "[Start] Running test %s" % (self.name)
log("[Start] Running test %s" % (self.name))
try:
if not self.Run():
failed = True
if self.todo:
print >> Env.log, "Test %s failed" % (self.name)
log("Test %s failed" % (self.name))
else:
print >> sys.stderr, "Test %s failed" % (self.name)
except BaseException, e:
eprint("Test %s failed" % (self.name))
except BaseException as e:
failed = True
if self.todo:
print >> Env.log, "Test %s failed:" % (self.name)
print >> Env.log, traceback.format_exc(10)
log("Test %s failed:" % (self.name))
log(traceback.format_exc(10))
else:
print >> sys.stderr, "Test %s failed: %s" % (self.name, e)
print >> Env.log, "Test %s failed:" % (self.name)
print >> Env.log, traceback.format_exc(10)
print >> Env.log, "[Done] Running test %s [result=%s]" % (self.name, failed and "Failed" or "Succeeded")
eprint("Test %s failed: %s" % (self.name, e))
eprint(traceback.format_exc(10))
log("Test %s failed:" % (self.name))
log(traceback.format_exc(10))
log("[Done] Running test %s [result=%s]" % (self.name, failed and "Failed" or "Succeeded"))
self._test_failed = failed and not self.todo
return not failed

@@ -197,12 +208,12 @@ var.vhosts = var.vhosts + [ "%s" => {
self.tests.CleanupDir(dirname)

# public
def PrepareVHostFile(self, fname, content, mode = 0644):
def PrepareVHostFile(self, fname, content, mode = 0o644):
"""remembers which files have been prepared and while remove them on cleanup; returns absolute pathname"""
fname = os.path.join('www', 'vhosts', self.vhost, fname)
return self.PrepareFile(fname, content, mode = mode)

def PrepareFile(self, fname, content, mode = 0644):
def PrepareFile(self, fname, content, mode = 0o644):
"""remembers which files have been prepared and while remove them on cleanup; returns absolute pathname"""
self._test_cleanup_files.append(fname)
return self.tests.PrepareFile(fname, content, mode = mode)
@@ -213,7 +224,7 @@ var.vhosts = var.vhosts + [ "%s" => {
return self.tests.PrepareDir(dirname)

def MissingFeature(self, feature):
print >> sys.stderr, Env.COLOR_RED + ("Skipping test '%s' due to missing '%s'" % (self.name, feature)) + Env.COLOR_RESET
eprint(Env.COLOR_RED + ("Skipping test '%s' due to missing '%s'" % (self.name, feature)) + Env.COLOR_RESET)
return False

# implement these yourself
@@ -242,7 +253,7 @@ class GroupTest(TestBase):
module_classes = inspect.getmembers(sys.modules[test_module], lambda member: inspect.isclass(member) and member.__module__ == test_module)
for name, obj in module_classes:
if name.startswith("Test") and not obj in self.group and obj != self.__class__:
print >> sys.stderr, "Test class", name, "not listed in group test list in", test_module, ".", self.__class__.__name__
eprint("Test class", name, "not listed in group test list in", test_module, ".", self.__class__.__name__)
self.subtests = []
for c in self.group:
t = c(self)
@@ -296,7 +307,7 @@ class Tests(object):

def add_test(self, test):
name = test.name
if self.tests_dict.has_key(name):
if name in self.tests_dict:
raise BaseException("Test '%s' already defined" % name)
self.tests_dict[name] = test
if test.runnable:
@@ -325,7 +336,7 @@ class Tests(object):
files = os.listdir(os.path.join(Env.sourcedir, "tests"))
files = filter(lambda x: x[-3:] == '.py', files)
files = filter(lambda x: x[:2] == 't-', files)
files.sort()
files = sorted(files)

mods = []
for f in files:
@@ -340,7 +351,7 @@ class Tests(object):
t._register(self)

def Prepare(self):
print >> Env.log, "[Start] Preparing tests"
log("[Start] Preparing tests")
cache_disk_etag_dir = self.PrepareDir(os.path.join("tmp", "cache_etag"))
errorlog = self.PrepareFile("log/error.log", "")
errorconfig = Env.debug and " " or """log [ default => "file:%s" ];""" % (errorlog)
@@ -419,7 +430,7 @@ var.reg_vhosts = [];
self.vhosts_config = ""

for t in self.tests:
print >> Env.log, "[Start] Preparing test '%s'" % (t.name)
log("[Start] Preparing test '%s'" % (t.name))
t._prepare()

self.config += self.vhosts_config
@@ -469,16 +480,16 @@ allow_listen "127.0.0.2:{Env.port}";
allow_listen ["127.0.0.2:{gnutlsport}", "127.0.0.2:{opensslport}"];
""".format(Env = Env, gnutlsport = Env.port+1, opensslport = Env.port + 2, valgrindconfig = valgrindconfig))

print >> Env.log, "[Done] Preparing tests"
log("[Done] Preparing tests")

print >> Env.log, "[Start] Preparing services"
log("[Start] Preparing services")
for s in self.services:
try:
s._prepare()
except:
self.failed = True
raise
print >> Env.log, "[Done] Preparing services"
log("[Done] Preparing services")

def _progress(self, i, n):
s = str(n)
@@ -491,7 +502,7 @@ allow_listen ["127.0.0.2:{gnutlsport}", "127.0.0.2:{opensslport}"];
DONE = Env.COLOR_YELLOW + "[DONE]" + Env.COLOR_RESET

testcount = len(self.run)
print >> Env.log, "[Start] Running tests"
log("[Start] Running tests")
Env.log.flush()
sys.stdout.flush()
sys.stderr.flush()
@@ -502,13 +513,13 @@ allow_listen ["127.0.0.2:{gnutlsport}", "127.0.0.2:{opensslport}"];
for t in self.run:
result = t._run()
if t.todo:
print >> sys.stdout, Env.COLOR_CYAN + self._progress(i, testcount) + fmt.format(t.name) + (result and DONE or TODO)
print(Env.COLOR_CYAN + self._progress(i, testcount) + fmt.format(t.name) + (result and DONE or TODO))
if result:
self.stat_done += 1
else:
self.stat_todo += 1
else:
print >> sys.stdout, Env.COLOR_CYAN + self._progress(i, testcount) + fmt.format(t.name) + (result and PASS or FAIL)
print(Env.COLOR_CYAN + self._progress(i, testcount) + fmt.format(t.name) + (result and PASS or FAIL))
if result:
self.stat_pass += 1
else:
@@ -519,9 +530,9 @@ allow_listen ["127.0.0.2:{gnutlsport}", "127.0.0.2:{opensslport}"];
sys.stdout.flush()
sys.stderr.flush()
self.failed = failed
print >> sys.stdout, ("%i out of %i tests passed (%.2f%%), %i tests failed, %i todo items, %i todo items are ready" %
(self.stat_pass, testcount, (100.0 * self.stat_pass)/testcount, self.stat_fail, self.stat_todo, self.stat_done))
print >> Env.log, "[Done] Running tests [result=%s]" % (failed and "Failed" or "Succeeded")
print(("%i out of %i tests passed (%.2f%%), %i tests failed, %i todo items, %i todo items are ready" %
(self.stat_pass, testcount, (100.0 * self.stat_pass)/testcount, self.stat_fail, self.stat_todo, self.stat_done)))
log("[Done] Running tests [result=%s]" % (failed and "Failed" or "Succeeded"))

Env.log.flush()
sys.stdout.flush()
@@ -530,20 +541,20 @@ allow_listen ["127.0.0.2:{gnutlsport}", "127.0.0.2:{opensslport}"];
return not failed

def Cleanup(self):
# print >> sys.stderr, "cleanup_files: %s, cleanup_dirs: %s" % (self.prepared_files, self.prepared_dirs)
# eprint("cleanup_files: %s, cleanup_dirs: %s" % (self.prepared_files, self.prepared_dirs))

if not Env.no_cleanup and not self.failed:
print >> Env.log, "[Start] Cleanup services"
log("[Start] Cleanup services")
for s in self.services:
s._cleanup()
print >> Env.log, "[Done] Cleanup services"
log("[Done] Cleanup services")
else:
print >> Env.log, "[Start] Stopping services"
log("[Start] Stopping services")
for s in self.services:
s._stop()
print >> Env.log, "[Done] Stopping services"
log("[Done] Stopping services")

print >> Env.log, "[Start] Cleanup tests"
log("[Start] Cleanup tests")
for t in self.tests:
t._cleanup()
if not Env.no_cleanup and not self.failed:
@@ -556,14 +567,14 @@ allow_listen ["127.0.0.2:{gnutlsport}", "127.0.0.2:{opensslport}"];
import shutil
shutil.rmtree(cache_disk_etag_dir)
os.mkdir(cache_disk_etag_dir)
except BaseException, e:
print >>sys.stderr, "Couldn't clear directory '%s': %s" % (fname, e)
except BaseException as e:
eprint("Couldn't clear directory '%s': %s" % (fname, e))
self.CleanupDir(os.path.join("tmp", "cache_etag"))
print >> Env.log, "[Done] Cleanup tests"
log("[Done] Cleanup tests")

## helpers for prepare/cleanup
def _preparefile(self, fname, content, mode = 0644):
if self.prepared_files.has_key(fname):
def _preparefile(self, fname, content, mode = 0o644):
if fname in self.prepared_files:
raise BaseException("File '%s' already exists!" % fname)
else:
path = os.path.join(Env.dir, fname)
@@ -574,18 +585,18 @@ allow_listen ["127.0.0.2:{gnutlsport}", "127.0.0.2:{opensslport}"];
self.prepared_files[fname] = 1

def _cleanupfile(self, fname):
if self.prepared_files.has_key(fname):
if fname in self.prepared_files:
try:
os.remove(os.path.join(Env.dir, fname))
except BaseException, e:
print >>sys.stderr, "Couldn't delete file '%s': %s" % (fname, e)
except BaseException as e:
eprint("Couldn't delete file '%s': %s" % (fname, e))
return False
return True
else:
return False

def _preparedir(self, dirname):
if self.prepared_dirs.has_key(dirname):
if dirname in self.prepared_dirs:
self.prepared_dirs[dirname] += 1
else:
os.mkdir(os.path.join(Env.dir, dirname))
@@ -596,29 +607,29 @@ allow_listen ["127.0.0.2:{gnutlsport}", "127.0.0.2:{opensslport}"];
if 0 == self.prepared_dirs[dirname]:
try:
os.rmdir(os.path.join(Env.dir, dirname))
except BaseException, e:
print >>sys.stderr, "Couldn't delete directory '%s': %s" % (dirname, e)
except BaseException as e:
eprint("Couldn't delete directory '%s': %s" % (dirname, e))

def PrepareFile(self, fname, content, mode = 0644):
path = filter(lambda x: x != '', fname.split('/'))
def PrepareFile(self, fname, content, mode = 0o644):
path = list(filter(lambda x: x != '', fname.split('/')))
for i in range(1, len(path)):
self._preparedir(os.path.join(*path[0:i]))
self._preparefile(os.path.join(*path), content, mode = mode)
return os.path.join(Env.dir, *path)

def PrepareDir(self, dirname):
path = filter(lambda x: x != '', dirname.split('/'))
path = list(filter(lambda x: x != '', dirname.split('/')))
for i in range(1, len(path)+1):
self._preparedir(os.path.join(*path[0:i]))
return os.path.join(Env.dir, *path)

def CleanupDir(self, dirname):
path = filter(lambda x: x != '', dirname.split('/'))
path = list(filter(lambda x: x != '', dirname.split('/')))
for i in reversed(range(1, len(path)+1)):
self._cleanupdir(os.path.join(*path[0:i]))

def CleanupFile(self, fname):
path = filter(lambda x: x != '', fname.split('/'))
path = list(filter(lambda x: x != '', fname.split('/')))
if not self._cleanupfile(os.path.join(*path)):
return False
for i in reversed(range(1, len(path))):


+ 19
- 18
tests/requests.py View File

@@ -1,7 +1,8 @@
# -*- coding: utf-8 -*-

import pycurl
import StringIO

from io import BytesIO

import sys
import zlib
@@ -42,26 +43,26 @@ class CurlRequest(TestBase):
self.resp_body = None

def _recv_header(self, header):
header = header.rstrip()
header = header.decode("utf-8").rstrip()
if None == self.resp_first_line:
self.resp_first_line = header
return
if header == "":
if None != self.resp_first_line and self.resp_first_line.split(" ", 2)[1] == '100':
if Env.debugRequests:
print >> Env.log, "Handling 100 Continue: '%s'" % self.resp_first_line
log("Handling 100 Continue: '%s'" % self.resp_first_line)
self.resp_first_line = None
return
try:
(key, value) = header.split(":", 1)
except:
print >> sys.stderr, "Couldn't parse header '%s'" % header
eprint("Couldn't parse header '%s'" % header)
raise
key = key.strip()
value = value.strip()
self.resp_header_list.append((key, value))
key = key.lower()
if self.resp_headers.has_key(key):
if key in self.resp_headers:
self.resp_headers[key] += ", " + value
else:
self.resp_headers[key] = value
@@ -85,7 +86,7 @@ class CurlRequest(TestBase):
# ssl connections sometimes have timeout issues. could be entropy related..
# use 10 second timeout instead of 2 for ssl - only 3 requests, shouldn't hurt
c.setopt(pycurl.TIMEOUT, ("http" == self.SCHEME and 2) or 10)
b = StringIO.StringIO()
b = BytesIO()
c.setopt(pycurl.WRITEFUNCTION, b.write)
c.setopt(pycurl.HEADERFUNCTION, self._recv_header)
if None != self.POST: c.setopt(pycurl.POSTFIELDS, self.POST)
@@ -121,20 +122,20 @@ class CurlRequest(TestBase):
def dump(self):
c = self.curl
Env.log.flush()
print >> Env.log, "Dumping request for test '%s'" % self.name
print >> Env.log, "Curl request: URL = %s://%s:%i%s" % (self.SCHEME, self.vhost, Env.port + self.PORT, self.URL)
print >> Env.log, "Curl response code: %i " % (c.getinfo(pycurl.RESPONSE_CODE))
print >> Env.log, "Curl response headers:"
log("Dumping request for test '%s'" % self.name)
log("Curl request: URL = %s://%s:%i%s" % (self.SCHEME, self.vhost, Env.port + self.PORT, self.URL))
log("Curl response code: %i " % (c.getinfo(pycurl.RESPONSE_CODE)))
log("Curl response headers:")
for (k, v) in self.resp_header_list:
print >> Env.log, " %s: %s" % (k, v)
print >> Env.log, "Curl response body:"
print >> Env.log, self.ResponseBody()
log(" %s: %s" % (k, v))
log("Curl response body:")
log(self.ResponseBody())
Env.log.flush()

def _decode(self, method, data):
if 'x-gzip' == method or 'gzip' == method:
header = data[:10]
if "\x1f\x8b\x08\x00\x00\x00\x00\x00" != header[:8]:
if b"\x1f\x8b\x08\x00\x00\x00\x00\x00" != header[:8]:
raise CurlRequestException("Unsupported content-encoding gzip header")
return zlib.decompress(data[10:], -15)
elif 'deflate' == method:
@@ -149,10 +150,10 @@ class CurlRequest(TestBase):
def ResponseBody(self):
if None == self.resp_body:
body = self.buffer.getvalue()
if self.resp_headers.has_key("content-encoding"):
if "content-encoding" in self.resp_headers:
cenc = self.resp_headers["content-encoding"]
body = self._decode(cenc, body)
self.resp_body = body
self.resp_body = body.decode('utf-8')
return self.resp_body

def _checkResponse(self):
@@ -174,10 +175,10 @@ class CurlRequest(TestBase):

for (k, v) in self.EXPECT_RESPONSE_HEADERS:
if v == None:
if self.resp_headers.has_key(k.lower()):
if k.lower() in self.resp_headers:
raise CurlRequestException("Got unwanted response header '%s' = '%s'" % (k, self.resp_headers[k.lower()]))
else:
if not self.resp_headers.has_key(k.lower()):
if not k.lower() in self.resp_headers:
raise CurlRequestException("Didn't get wanted response header '%s'" % (k))
v1 = self.resp_headers[k.lower()]
if v1 != v:


+ 1
- 1
tests/run-memcached.py View File

@@ -57,7 +57,7 @@ class MemcacheDB:
return cas

def get(self, key):
if not self.d.has_key(key): return None
if not key in self.d: return None
entry = self.d[key]
if entry.expired():
self.d.pop(key)


+ 2
- 2
tests/run-scgi-envcheck.py View File

@@ -27,9 +27,9 @@ def parsereq(data):
key, value = header[0:2]
header = header[2:]
if '' == key: raise Exception("invalid request: empty key")
if env.has_key(key): raise Exception("invalid request: duplicate key")
if key in env: raise Exception("invalid request: duplicate key")
env[key] = value
if not env.has_key('SCGI') or env['SCGI'] != '1':
if not 'SCGI' in env or env['SCGI'] != '1':
raise Exception("invalid request: missing/broken SCGI=1 header")
return {'env': env, 'body': body}



+ 4
- 6
tests/runtests.py View File

@@ -5,7 +5,7 @@ import os
import sys
from logfile import LogFile, RemoveEscapeSeq

from base import Env, Tests
from base import Env, Tests, eprint

from optparse import OptionParser

@@ -120,7 +120,7 @@ try:
tests.Prepare()
except:
import traceback
print >> sys.stderr, traceback.format_exc()
eprint(traceback.format_exc())
else:
if tests.Run():
failed = False
@@ -130,7 +130,7 @@ try:
os.remove(os.path.join(Env.dir, "tests.log"))
except:
import traceback
print >> sys.stderr, traceback.format_exc()
eprint(traceback.format_exc())
failed = True
finally:
try:
@@ -140,9 +140,7 @@ finally:
elif not Env.no_cleanup and not failed:
os.rmdir(Env.dir)
except OSError:
print >> sys.stderr, "Couldn't delete temporary directory '%s', probably not empty (perhaps due to some errors)" % Env.dir

Env.log.close()
eprint("Couldn't delete temporary directory '%s', probably not empty (perhaps due to some errors)" % Env.dir)

if failed:
sys.exit(1)


+ 8
- 8
tests/service.py View File

@@ -52,7 +52,7 @@ class Service(object):
return devnull()

def fork(self, *args, **kwargs):
if kwargs.has_key('inp'):
if 'inp' in kwargs:
inp = kwargs['inp']
else:
inp = devnull()
@@ -71,7 +71,7 @@ class Service(object):
tlog = self.tests.PrepareFile("log/truss-%s.log" % self.name, "")
args = trussargs + [ tlog ] + list(args)

print >> base.Env.log, "Spawning '%s': %s" % (self.name, ' '.join(args))
base.log("Spawning '%s': %s" % (self.name, ' '.join(args)))
proc = subprocess.Popen(args, stdin = inp, stdout = logfile, stderr = logfile, close_fds = True, preexec_fn = preexec)
if None != inp: inp.close()
if None != logfile: logfile.close()
@@ -85,7 +85,7 @@ class Service(object):
if None == proc: return
self.proc = None
if None == proc.poll():
print >> base.Env.log, "Terminating service (%s) '%s'" % (ss, self.name)
base.log("Terminating service (%s) '%s'" % (ss, self.name))
try:
os.killpg(proc.pid, s)
s = signal.SIGTERM
@@ -93,11 +93,11 @@ class Service(object):
proc.terminate()
except:
pass
print >> base.Env.log, "Waiting for service '%s'" % (self.name)
base.log("Waiting for service '%s'" % (self.name))
if base.Env.wait: proc.wait()
while not procwait(proc):
try:
print >> base.Env.log, "Terminating service (%s) '%s'" % (ss, self.name)
base.log("Terminating service (%s) '%s'" % (ss, self.name))
os.killpg(proc.pid, s)
s = signal.SIGKILL
ss = "SIGKILL"
@@ -172,7 +172,7 @@ class Lighttpd(Service):
args = [base.Env.worker, '-m', base.Env.plugindir, '-c', base.Env.lighttpdconf, '-t']
if base.Env.valgrind:
args = [base.Env.valgrind] + args
print >> base.Env.log, "Testing lighttpd config: %s" % (' '.join(args))
base.log("Testing lighttpd config: %s" % (' '.join(args)))
proc = subprocess.Popen(args, stdin = inp, stdout = logfile, stderr = logfile, close_fds = True)
if None != inp: inp.close()
logfile.close()
@@ -212,6 +212,6 @@ class FastCGI(Service):
if None != self.sockfile:
try:
os.remove(self.sockfile)
except BaseException, e:
print >>sys.stderr, "Couldn't delete socket '%s': %s" % (self.sockfile, e)
except BaseException as e:
base.eprint("Couldn't delete socket '%s': %s" % (self.sockfile, e))
self.tests.CleanupDir(os.path.join("tmp", "sockets"))

+ 10
- 9
tests/t-cgi.py View File

@@ -3,16 +3,17 @@
from base import *
from requests import *
from service import FastCGI
from io import BytesIO
import time
import hashlib

def generate_body(seed, size):
i = 0
body = ''
while len(body) < size:
body += hashlib.sha1(seed + str(i)).digest()
body = BytesIO()
while body.tell() < size:
body.write(hashlib.sha1((seed + str(i)).encode('utf-8')).digest())
i += 1
return body[:size]
return body.getvalue()[:size]

class CGI(FastCGI):
name = "fcgi_cgi"
@@ -199,8 +200,8 @@ cgi = {{
return True

def Prepare(self):
self.PrepareVHostFile("envcheck.cgi", SCRIPT_ENVCHECK, mode = 0755)
self.PrepareVHostFile("uploadcheck.cgi", SCRIPT_UPLOADCHECK, mode = 0755)
self.PrepareVHostFile("chunkedencodingcheck.cgi", SCRIPT_CHUNKEDENCODINGCHECK, mode = 0755)
self.PrepareVHostFile("stderr.cgi", SCRIPT_STDERRCHECK, mode = 0755)
self.PrepareVHostFile("exiterror.cgi", SCRIPT_EXITERRORCHECK, mode = 0755)
self.PrepareVHostFile("envcheck.cgi", SCRIPT_ENVCHECK, mode = 0o755)
self.PrepareVHostFile("uploadcheck.cgi", SCRIPT_UPLOADCHECK, mode = 0o755)
self.PrepareVHostFile("chunkedencodingcheck.cgi", SCRIPT_CHUNKEDENCODINGCHECK, mode = 0o755)
self.PrepareVHostFile("stderr.cgi", SCRIPT_STDERRCHECK, mode = 0o755)
self.PrepareVHostFile("exiterror.cgi", SCRIPT_EXITERRORCHECK, mode = 0o755)

+ 4
- 4
tests/t-etag.py View File

@@ -13,7 +13,7 @@ class TestGetEtag1(CurlRequest):

def CheckResponse(self):
global retrieved_etag1
if not self.resp_headers.has_key('etag'): # lowercase keys!
if not 'etag' in self.resp_headers: # lowercase keys!
raise CurlRequestException("Response missing etag header" % (k, v1, v))
retrieved_etag1 = self.resp_headers['etag'] # lowercase keys!
return super(TestGetEtag1, self).CheckResponse()
@@ -33,7 +33,7 @@ class TestTryEtag1(CurlRequest):

def CheckResponse(self):
global retrieved_etag1
if not self.resp_headers.has_key('etag'): # lowercase keys!
if not 'etag' in self.resp_headers: # lowercase keys!
raise CurlRequestException("Response missing etag header" % (k, v1, v))
etag = self.resp_headers['etag'] # lowercase keys!
if retrieved_etag1 != etag:
@@ -50,7 +50,7 @@ class TestGetEtag2(CurlRequest):

def CheckResponse(self):
global retrieved_etag2
if not self.resp_headers.has_key('etag'): # lowercase keys!
if not 'etag' in self.resp_headers: # lowercase keys!
raise CurlRequestException("Response missing etag header" % (k, v1, v))
retrieved_etag2 = self.resp_headers['etag'] # lowercase keys!
return super(TestGetEtag2, self).CheckResponse()
@@ -70,7 +70,7 @@ class TestTryEtag2(CurlRequest):
def CheckResponse(self):
global retrieved_etag1
global retrieved_etag2
if not self.resp_headers.has_key('etag'): # lowercase keys!
if not 'etag' in self.resp_headers: # lowercase keys!
raise CurlRequestException("Response missing etag header" % (k, v1, v))
etag = self.resp_headers['etag'] # lowercase keys!
if retrieved_etag1 == etag:


+ 8
- 8
tests/t-header-modify.py View File

@@ -13,9 +13,9 @@ respond;
URL = "/path?simple_query"

def CheckResponse(self):
h = map(lambda x: x[1], filter(lambda x: x[0] == "Test-Header", self.resp_header_list))
h = [ value for key, value in self.resp_header_list if key == "Test-Header" ]
if len(h) != 2 or h[0] != "simple_query" or h[1] != "/path":
print >>Env.log, repr(h)
eprint(repr(h))
raise BaseException("Unexpected headers 'Test-Header'")
return True

@@ -29,9 +29,9 @@ respond;
URL = "/path?simple_query"

def CheckResponse(self):
h = map(lambda x: x[1], filter(lambda x: x[0] == "Test-Header", self.resp_header_list))
h = [ value for key, value in self.resp_header_list if key == "Test-Header" ]
if len(h) != 1 or h[0] != "simple_query, /path":
print >>Env.log, repr(h)
log(repr(h))
raise BaseException("Unexpected headers 'Test-Header'")
return True

@@ -45,9 +45,9 @@ respond;
URL = "/path?simple_query"

def CheckResponse(self):
h = map(lambda x: x[1], filter(lambda x: x[0] == "Test-Header", self.resp_header_list))
h = [ value for key, value in self.resp_header_list if key == "Test-Header" ]
if len(h) != 1 or h[0] != "/path":
print >>Env.log, repr(h)
log(repr(h))
raise BaseException("Unexpected headers 'Test-Header'")
return True

@@ -61,9 +61,9 @@ respond;
URL = "/path?simple_query"

def CheckResponse(self):
h = map(lambda x: x[1], filter(lambda x: x[0] == "Test-Header", self.resp_header_list))
h = [ value for key, value in self.resp_header_list if key == "Test-Header" ]
if len(h) != 0:
print >>Env.log, repr(h)
log(repr(h))
raise BaseException("Unexpected headers 'Test-Header'")
return True



+ 2
- 2
tests/t-memcached.py View File

@@ -30,8 +30,8 @@ class Memcached(Service):
if None != self.sockfile:
try:
os.remove(self.sockfile)
except BaseException, e:
print >>sys.stderr, "Couldn't delete socket '%s': %s" % (self.sockfile, e)
except BaseException as e:
base.eprint("Couldn't delete socket '%s': %s" % (self.sockfile, e))
self.tests.CleanupDir(os.path.join("tmp", "sockets"))

class TestStore1(CurlRequest):


+ 2
- 2
tests/t-scgi.py View File

@@ -27,8 +27,8 @@ class SCGI(Service):
if None != self.sockfile:
try:
os.remove(self.sockfile)
except BaseException, e:
print >>sys.stderr, "Couldn't delete socket '%s': %s" % (self.sockfile, e)
except BaseException as e:
base.eprint("Couldn't delete socket '%s': %s" % (self.sockfile, e))
self.tests.CleanupDir(os.path.join("tmp", "sockets"))




+ 2
- 2
tests/t-secdownload.py View File

@@ -3,14 +3,14 @@
from base import *
from requests import *
import time
from md5 import md5
from hashlib import md5

def securl(prefix, path, secret, tstamp = None):
if tstamp == None: tstamp = time.time()
tstamp = '%x' % int(tstamp)
md5content = secret + path + tstamp
if prefix[-1] != '/': prefix += '/'
return prefix + md5(md5content).hexdigest() + '/' + tstamp + path
return prefix + md5(md5content.encode('utf-8')).hexdigest() + '/' + tstamp + path

class SecdownloadFail(CurlRequest):
URL = "/test.txt"


Loading…
Cancel
Save