[tests] use raw tcp for bad url request test

curl doesn't allow urls with spaces anymore (which is a good thing)

Change-Id: Id735a6e5d37a19a5ad4efc46830f8ec63a427415
master
Stefan Bühler 3 months ago committed by Stefan Bühler
parent 29e57d3005
commit 2879d7ccc2

@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
import socket
import pycurl
from io import BytesIO
@ -22,7 +23,8 @@ class CurlRequestException(Exception):
def __init__(self, value): self.value = value
def __str__(self): return repr(self.value)
class CurlRequest(TestBase):
class _BaseRequest(TestBase):
URL = None
SCHEME = "http"
PORT = 0 # offset to Env.port
@ -36,11 +38,12 @@ class CurlRequest(TestBase):
EXPECT_RESPONSE_HEADERS = []
def __init__(self, parent):
super(CurlRequest, self).__init__(parent)
super().__init__(parent)
self.resp_header_list = []
self.resp_headers = { }
self.resp_first_line = None
self.resp_body = None
self.resp_code = 0
def _recv_header(self, header):
header = header.decode("utf-8").rstrip()
@ -67,6 +70,66 @@ class CurlRequest(TestBase):
else:
self.resp_headers[key] = value
def _decode(self, method, data):
if 'x-gzip' == method or 'gzip' == method:
header = data[:10]
if b"\x1f\x8b\x08\x00\x00\x00\x00\x00" != header[:8]:
raise CurlRequestException("Unsupported content-encoding gzip header")
return zlib.decompress(data[10:], -15)
elif 'deflate' == method:
return zlib.decompress(data, -15)
elif 'compress' == method:
raise CurlRequestException("Unsupported content-encoding %s" % method)
elif 'x-bzip2' == method or 'bzip2' == method:
return bz2.decompress(data)
else:
raise CurlRequestException("Unsupported content-encoding %s" % method)
def ResponseBody(self):
if None == self.resp_body:
body = self.buffer.getvalue()
if "content-encoding" in self.resp_headers:
cenc = self.resp_headers["content-encoding"]
body = self._decode(cenc, body)
self.resp_body = body.decode('utf-8')
return self.resp_body
def _checkResponse(self):
if Env.debugRequests:
self.dump()
if not self.CheckResponse():
return False
if None != self.EXPECT_RESPONSE_CODE:
if self.resp_code != self.EXPECT_RESPONSE_CODE:
raise CurlRequestException("Unexpected response code %i (wanted %i)" % (self.resp_code, self.EXPECT_RESPONSE_CODE))
if None != self.EXPECT_RESPONSE_BODY:
if self.ResponseBody() != self.EXPECT_RESPONSE_BODY:
raise CurlRequestException("Unexpected response body")
for (k, v) in self.EXPECT_RESPONSE_HEADERS:
if v == None:
if k.lower() in self.resp_headers:
raise CurlRequestException("Got unwanted response header '%s' = '%s'" % (k, self.resp_headers[k.lower()]))
else:
if not k.lower() in self.resp_headers:
raise CurlRequestException("Didn't get wanted response header '%s'" % (k))
v1 = self.resp_headers[k.lower()]
if v1 != v:
raise CurlRequestException("Unexpected response header '%s' = '%s' (wanted '%s')" % (k, v1, v))
return True
def CheckResponse(self):
return True
class CurlRequest(_BaseRequest):
def PrepareRequest(self, curl, reqheaders):
pass
def Run(self):
if None == self.URL:
raise BasicException("You have to set URL in your CurlRequest instance")
@ -96,13 +159,14 @@ class CurlRequest(TestBase):
c.setopt(pycurl.FOLLOWLOCATION, 1)
c.setopt(pycurl.MAXREDIRS, 5)
self.curl = c
self.buffer = b
self.PrepareRequest(reqheaders)
self.PrepareRequest(c, reqheaders)
c.perform()
self.resp_code = c.getinfo(pycurl.RESPONSE_CODE)
try:
if not self._checkResponse():
raise CurlRequestException("Response check failed")
@ -112,19 +176,14 @@ class CurlRequest(TestBase):
raise
finally:
c.close()
self.curl = None
return True
def PrepareRequest(self, reqheaders):
pass
def dump(self):
c = self.curl
Env.log.flush()
log("Dumping request for test '%s'" % self.name)
log("Curl request: URL = %s://%s:%i%s" % (self.SCHEME, self.vhost, Env.port + self.PORT, self.URL))
log("Curl response code: %i " % (c.getinfo(pycurl.RESPONSE_CODE)))
log("Curl response code: %i " % (self.resp_code))
log("Curl response headers:")
for (k, v) in self.resp_header_list:
log(" %s: %s" % (k, v))
@ -132,59 +191,76 @@ class CurlRequest(TestBase):
log(self.ResponseBody())
Env.log.flush()
def _decode(self, method, data):
if 'x-gzip' == method or 'gzip' == method:
header = data[:10]
if b"\x1f\x8b\x08\x00\x00\x00\x00\x00" != header[:8]:
raise CurlRequestException("Unsupported content-encoding gzip header")
return zlib.decompress(data[10:], -15)
elif 'deflate' == method:
return zlib.decompress(data, -15)
elif 'compress' == method:
raise CurlRequestException("Unsupported content-encoding %s" % method)
elif 'x-bzip2' == method or 'bzip2' == method:
return bz2.decompress(data)
else:
raise CurlRequestException("Unsupported content-encoding %s" % method)
def ResponseBody(self):
if None == self.resp_body:
body = self.buffer.getvalue()
if "content-encoding" in self.resp_headers:
cenc = self.resp_headers["content-encoding"]
body = self._decode(cenc, body)
self.resp_body = body.decode('utf-8')
return self.resp_body
class RawRequest(_BaseRequest):
def __init__(self, parent):
super().__init__(parent)
self.request = b''
def _checkResponse(self):
c = self.curl
if Env.debugRequests:
self.dump()
def Run(self):
if None == self.URL:
raise Exception("You have to set URL in your CurlRequest instance")
reqheaders = ["Host: " + self.vhost] + self.REQUEST_HEADERS
if None != self.ACCEPT_ENCODING:
reqheaders += ["Accept-Encoding: " + self.ACCEPT_ENCODING]
if self.SCHEME != 'http':
raise Exception(f"Unsupported scheme {self.scheme!r}")
if None != self.POST:
raise Exception(f"POST not supported")
if None != self.AUTH:
raise Exception(f"AUTH not supported")
if not self.CheckResponse():
return False
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as tcp_con:
tcp_con.settimeout(2)
tcp_con.connect(('127.0.0.2', Env.port + self.PORT))
if None != self.EXPECT_RESPONSE_CODE:
code = c.getinfo(pycurl.RESPONSE_CODE)
if code != self.EXPECT_RESPONSE_CODE:
raise CurlRequestException("Unexpected response code %i (wanted %i)" % (code, self.EXPECT_RESPONSE_CODE))
req = f'GET {self.URL} HTTP/1.1\r\n'.encode()
for hdr in reqheaders:
req += hdr.encode() + b'\r\n'
# not implementing content-length handling / keep-alive / ...
req += b'Connection: close\r\n'
req += b'\r\n'
self.request = req
if None != self.EXPECT_RESPONSE_BODY:
if self.ResponseBody() != self.EXPECT_RESPONSE_BODY:
raise CurlRequestException("Unexpected response body")
tcp_con.sendall(req)
response = b''
while True:
chunk = tcp_con.recv(4069)
if not chunk:
break
response += chunk
for (k, v) in self.EXPECT_RESPONSE_HEADERS:
if v == None:
if k.lower() in self.resp_headers:
raise CurlRequestException("Got unwanted response header '%s' = '%s'" % (k, self.resp_headers[k.lower()]))
else:
if not k.lower() in self.resp_headers:
raise CurlRequestException("Didn't get wanted response header '%s'" % (k))
v1 = self.resp_headers[k.lower()]
if v1 != v:
raise CurlRequestException("Unexpected response header '%s' = '%s' (wanted '%s')" % (k, v1, v))
while True:
if not b'\n' in response:
raise Exception("Response missing header end")
line, response = response.split(b'\n', maxsplit=1)
if not line or line == b'\r':
break
self._recv_header(line)
return True
self.buffer = BytesIO(response)
self.resp_code = int(self.resp_first_line.split()[1])
try:
if not self._checkResponse():
raise CurlRequestException("Response check failed")
except:
if not Env.debugRequests:
self.dump()
raise
def CheckResponse(self):
return True
def dump(self):
Env.log.flush()
log("Dumping request for test '%s'" % self.name)
log("Request:")
for line in self.request.splitlines():
log(f" {line.decode()}")
log("Response code: %i " % (self.resp_code))
log("Response headers:")
for (k, v) in self.resp_header_list:
log(" %s: %s" % (k, v))
log("Response body:")
log(self.ResponseBody())
Env.log.flush()

@ -84,7 +84,7 @@ env.set "INFO" => "%{req.query}";
show_env_info;
"""
class TestBadRequest1(CurlRequest):
class TestBadRequest1(RawRequest):
# unencoded query
URL = "/?complicated?query= $"
EXPECT_RESPONSE_CODE = 400

@ -131,10 +131,9 @@ class TestUploadLargeChunked1(CurlRequest):
EXPECT_RESPONSE_CODE = 200
REQUEST_HEADERS = ["Transfer-Encoding: chunked"]
def PrepareRequest(self, reqheaders):
c = self.curl
c.setopt(c.UPLOAD, 1)
c.setopt(pycurl.READFUNCTION, ChunkedBodyReader(BODY).read)
def PrepareRequest(self, curl, reqheaders):
curl.setopt(curl.UPLOAD, 1)
curl.setopt(pycurl.READFUNCTION, ChunkedBodyReader(BODY).read)
class TestChunkedEncoding1(CurlRequest):
URL = "/chunkedencodingcheck.cgi"
@ -157,10 +156,9 @@ class TestExitErrorUpload1(CurlRequest):
EXPECT_RESPONSE_CODE = 404
REQUEST_HEADERS = ["Transfer-Encoding: chunked"]
def PrepareRequest(self, reqheaders):
c = self.curl
c.setopt(c.UPLOAD, 1)
c.setopt(pycurl.READFUNCTION, DelayedChunkedBodyReader("test").read)
def PrepareRequest(self, curl, reqheaders):
curl.setopt(curl.UPLOAD, 1)
curl.setopt(pycurl.READFUNCTION, DelayedChunkedBodyReader("test").read)
class Test(GroupTest):
group = [

@ -24,12 +24,11 @@ class TestTryEtag1(CurlRequest):
EXPECT_RESPONSE_CODE = 304
ACCEPT_ENCODING = None
def PrepareRequest(self, reqheaders):
def PrepareRequest(self, curl, reqheaders):
global retrieved_etag1
if retrieved_etag1 == None:
raise CurlRequestException("Don't have a etag value to request")
c = self.curl
c.setopt(c.HTTPHEADER, reqheaders + ["If-None-Match: " + retrieved_etag1])
curl.setopt(curl.HTTPHEADER, reqheaders + ["If-None-Match: " + retrieved_etag1])
def CheckResponse(self):
global retrieved_etag1
@ -60,12 +59,11 @@ class TestTryEtag2(CurlRequest):
EXPECT_RESPONSE_BODY = ""
EXPECT_RESPONSE_CODE = 304
def PrepareRequest(self, reqheaders):
def PrepareRequest(self, curl, reqheaders):
global retrieved_etag2
if retrieved_etag2 == None:
raise CurlRequestException("Don't have a etag value to request")
c = self.curl
c.setopt(c.HTTPHEADER, reqheaders + ["If-None-Match: " + retrieved_etag2])
curl.setopt(curl.HTTPHEADER, reqheaders + ["If-None-Match: " + retrieved_etag2])
def CheckResponse(self):
global retrieved_etag1

Loading…
Cancel
Save