Sh3ll
OdayForums


Server : Apache
System : Linux server1.cgrithy.com 3.10.0-1160.95.1.el7.x86_64 #1 SMP Mon Jul 24 13:59:37 UTC 2023 x86_64
User : nobody ( 99)
PHP Version : 8.1.23
Disable Function : NONE
Directory :  /etc/apache2/conf.d/modsec_vendor_configs/OWASP3/tests/regression/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //etc/apache2/conf.d/modsec_vendor_configs/OWASP3/tests/regression/CRS_Tests.py
from subprocess import TimeoutExpired
from ftw import logchecker, testrunner, http
from ftw.ruleset import Input
import pytest
import os

CRS_HEADER = 'X-CRS-Test'

def test_crs(test, logchecker_obj):
    runner = testrunner.TestRunner()
    for stage in test.stages:
        runner.run_stage(stage, logchecker_obj)


class FooLogChecker(logchecker.LogChecker):
    def __init__(self, config):
        super(FooLogChecker, self).__init__()
        self.log_location = self.find_log_location(config)
        self.backwards_reader = BackwardsReader(self.log_location)
        self.start_marker = None
        self.end_marker = None

    def mark_start(self, stage_id):
        self.start_marker = self.find_marker(stage_id)

    def mark_end(self, stage_id):
        self.end_marker = self.find_marker(stage_id)

    def find_marker(self, stage_id):
        stage_id_bytes = stage_id.encode('utf-8')
        header_bytes = CRS_HEADER.encode('utf-8')
        def try_once():
            self.mark_and_flush_log(stage_id)
            self.backwards_reader.reset()
            return self.backwards_reader.readline() or b''
            
        line = try_once()
        while not (header_bytes in line and stage_id_bytes in line):
            line = try_once()
        return line

    def get_logs(self):
        logs = []
        # At this point we're already at the end marker
        for line in self.backwards_reader.readlines():
            if line == self.start_marker:
                break

            logs.append(line.decode('utf-8'))
        return logs

    def mark_and_flush_log(self, header_value):
        """
        Send a valid request to the server with a special header that will
        generate an entry in the log. We can use this to flush the log and to
        mark the output so we know where our test output is.
        """
        http.HttpUA().send_request(Input(
            headers={
                'Host': 'localhost',
                'User-Agent': 'CRS',
                'Accept': '*/*',
                CRS_HEADER: header_value
            },
            version='HTTP/1.0'))

    @staticmethod
    def find_log_location(config):
        key = 'log_location_linux' 
        # First, try to find the log configuration from config.ini
        if key in config:
            return config[key]
        else:
            # Now we could check for the configuration that was passed
            # on the command line. Unfortunately, we use a default, so we
            # don't know whether it was *actually* on the command line.
            # Let's try to find the Docker container instead.
            import os.path
            import subprocess
            prefix = os.path.join('tests', 'logs')
            log_file_name = 'error.log'
            directory_name = 'modsec2-apache'
            process = subprocess.Popen(
                'docker ps --format "{{.Names}}"',
                shell=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE)
            try:
                out, _ = process.communicate(timeout=10)
            except TimeoutExpired:
                out = ''
            if b'modsec3-nginx' in out:
                directory_name = 'modsec3-nginx'
            return os.path.join(prefix, directory_name, log_file_name)



@pytest.fixture(scope='session')
def logchecker_obj(config):
    return FooLogChecker(config)

# Adapted from http://code.activestate.com/recipes/120686-read-a-text-file-backwards/
class BackwardsReader:
  def __init__(self, file, blksize=4096):
    """initialize the internal structures"""
    self.file = file
    # how big of a block to read from the file...
    self.blksize = blksize
    self.f = open(file, 'rb')

    self.reset()

  def readline(self):
    while len(self.data) == 1 and ((self.blkcount * self.blksize) < self.size):
      self.blkcount = self.blkcount + 1
      line = self.data[0]
      try:
        self.f.seek(-self.blksize * self.blkcount, os.SEEK_END) # read from end of file
        self.data = (self.f.read(self.blksize) + line).split(b'\n')
      except IOError:  # can't seek before the beginning of the file
        self.f.seek(0)
        self.data = (self.f.read(self.size - (self.blksize * (self.blkcount-1))) + line).split(b'\n')

    if len(self.data) == 0:
      return ""

    line = self.data.pop()
    return line + b'\n'

  def readlines(self):
      line = self.readline()
      while line:
          yield line
          line = self.readline()
        
  def reset(self):
    # get the file size
    self.size = os.stat(self.file)[6]
    # how many blocks we've read
    self.blkcount = 1
    # if the file is smaller than the blocksize, read a block,
    # otherwise, read the whole thing...
    if self.size > self.blksize:
      self.f.seek(-self.blksize * self.blkcount, 2) # read from end of file
    self.data = self.f.read(self.blksize).split(b'\n')
    # strip the last item if it's empty...  a byproduct of the last line having
    # a newline at the end of it
    if not self.data[-1]:
      self.data.pop()

ZeroDay Forums Mini