iCTF 2013 "secretvault" Writeup - Why ECB is a Bad Idea

Sat 07 December 2013 by Javex

So I wasn't too fond of this year's iCTF with their ever returning complexity that drives your focus away from the actual exploiting and security fixes. The concept sounded great but as with the previous two years it was just to much work besides acutally exploiting stuff. And the results showed that - we didn't do that great, but teams with no exploits at all made top ranks. The best move is not to play at all.

Besides that, there were still some fun challenges on different levels of difficulty. This one is a pretty easy crypto challenge called "secretvault".

Extracting the Source Code

The first thing you see when you look at the service are a lot of binary files:

├── Crypto.Cipher.AES.so
├── _ctypes.so
├── _heapq.so
├── _io.so
├── _multiprocessing.so
├── bz2.so
├── datetime.so
├── libpython2.7.so.1.0
├── pyexpat.so
├── readline.so
├── secretvault
├── server.log
├── start.sh
├── stop.sh
├── termios.so

Looking at all the names we quickly see this is not a binary challenge but a Python one and it contains compiled bytecode packed into a binary. The data is stored using ZIP so we can unzip it:

$ unzip -d testing secretvault
Archive:  secretvault
  inflating: testing/Crypto/__init__.pyc
  inflating: testing/Crypto/Cipher/__init__.pyc
  inflating: testing/Crypto/Cipher/AES.pyc
  inflating: testing/Queue.pyc
  ...
  inflating: testing/__main__.pyc
  ...
  inflating: testing/xmlrpclib.pyc
  inflating: testing/zipfile.pyc

Looking at the files we quickly see that the __main__ file is most likely the actual service, so we retrieve its sourcecode using uncompyle2:

$ uncompyle2 -o . __main__.pyc
# 2013.12.07 19:16:42 CET
+++ okay decompyling __main__.pyc
# decompiled 1 files: 1 okay, 0 failed, 0 verify failed
# 2013.12.07 19:16:43 CET

$ mv __main__.pyc_dis __main__.py

Let's take a look at the source code:

#Embedded file name: secretvault.py
import socket
import sys
import Crypto.Cipher.AES
import base64
import multiprocessing
import datetime
import time
import os
import os.path
from multiprocessing import Lock, Manager
import logging

class NullDevice:

    def write(self, s):
        pass

    def flush(self):
        pass


sys.stdout = NullDevice()
sys.stderr = NullDevice()
filesdir = '/var/ictf/services/secretvault'
FORMAT = '%(asctime)-15s %(message)s'
logger = logging.getLogger('server')
logging.disable(logging.CRITICAL)
lock = Lock()
allowed_commands = set(['ls', 'cat secret.txt', 'cat server.py'])
aes_key = os.urandom(32)
secret = os.urandom(200)
root = 'rootadministrator'
manager = Manager()
db = manager.dict()

def safe_string(s):
    return s.isalnum()


def cleanup():
    global filesdir
    try:
        now = datetime.datetime.now()
        fl = os.listdir(filesdir)
        for filename in fl:
            try:
                filepath = os.path.join(filesdir, filename)
                if os.path.isfile(filepath):
                    t = datetime.datetime.fromtimestamp(os.path.getctime(filepath))
                    if now - t > datetime.timedelta(seconds=600):
                        os.remove(filepath)
            except Exception as e:
                pass

    except Exception as e:
        pass


def authenticate(s, db, lock):
    lock.acquire()
    try:
        fields = s.split('|')
        if len(fields) < 2:
            raise Exception('Invalid format')
            return ''
        user = fields[0]
        user = user.strip()
        passwd = fields[1]
        passwd = passwd.strip()
        print user, passwd
        if not (safe_string(user) and safe_string(passwd)):
            raise Exception('Unsafe')
            return ''
        if user == root:
            if passwd == secret:
                print user, 'authenticated'
                return user
            else:
                raise Exception('Unauthorized')
                return ''
        else:
            if user in db and db[user] == passwd:
                print user, 'authenticated'
                return user
            if user not in db:
                print user, 'authenticated'
                db[user] = passwd
                return user
            return ''
    except:
        raise
    finally:
        lock.release()


def create_token(user):
    try:
        print 'creating the token'
        t = datetime.datetime.now()
        plain_token = '%d|%d|%d|%d|%d|%s|CHALLENGE' % (t.year,
         t.month,
         t.day,
         t.hour,
         t.minute,
         user)
        l1 = len('%d|%d|%d|%d|%d|' % (t.year,
         t.month,
         t.day,
         t.hour,
         t.minute))
        padded_plain_token = ' ' * (32 - l1) + plain_token
        l = len(padded_plain_token)
        padded_plain_token = padded_plain_token + ' ' * (16 - l % 16)
        aes = Crypto.Cipher.AES.new(aes_key, Crypto.Cipher.AES.MODE_ECB)
        enc_token = aes.encrypt(padded_plain_token)
        b64_token = base64.b64encode(enc_token)
        return b64_token
    except Exception as e:
        logger.error('%s' % str(e))
        raise


def parse_command(msg):
    try:
        fs = msg.split(',')
        b64_c = fs[0]
        command = fs[1]
        command = command.strip()
        print 'command=', command
        print 'fields=', fs
        c = base64.b64decode(b64_c)
        aes = Crypto.Cipher.AES.new(aes_key, Crypto.Cipher.AES.MODE_ECB)
        m = aes.decrypt(c)
        print 'm=', m
        fields = m.split('|')
        year = int(fields[0])
        month = int(fields[1])
        day = int(fields[2])
        hour = int(fields[3])
        minute = int(fields[4])
        user = fields[5]
        user = user.strip()
        if not safe_string(user):
            raise Exception('Invalid Username')
            return ''
        if 'CHALLENGE' not in fields[6]:
            raise Exception('Invalid Token')
            return ''
        print 'mfields=', fields
        ts = datetime.datetime(year, month, day, hour, minute)
        t = datetime.datetime.now()
        d = t - ts
        if d > datetime.timedelta(seconds=120):
            print 'expired'
            raise Exception('Expired')
            return ''
        if command == 'ls':
            print 'ls'
            try:
                pipe = os.popen('ls')
                out = pipe.read()
                pipe.close()
            except Exception as e:
                out = ''

            return out
        if command == 'cat server.py':
            print 'read src'
            try:
                pipe = os.popen('cat server.py')
                out = pipe.read()
                pipe.close()
            except Exception as e:
                out = ''
                raise

            return out
        if command == 'retv secret':
            print 'retv'
            if user == root:
                try:
                    pipe = os.popen('cat secret.txt')
                    out = pipe.read()
                    pipe.close()
                except Exception as e:
                    out = ''
                    raise

                return out
            else:
                try:
                    try:
                        users_vault = '%s/secret_of_%s.txt' % (filesdir, user)
                        pipe = os.popen('cat "%s"' % users_vault)
                        out = pipe.read()
                        pipe.close()
                        print 'out =', out
                    except Exception as e:
                        out = ''
                        raise

                except:
                    out = ''

                return out
        elif command.startswith('write '):
            print 'write'
            to_write = command[len('write '):]
            if user == root:
                try:
                    f = open('secret.txt', 'wt')
                    f.write(to_write)
                    f.close()
                except:
                    print 'failed'
                    raise

            else:
                try:
                    users_vault = '%s/secret_of_%s.txt' % (filesdir, user)
                    f = open(users_vault, 'wt')
                    f.write(to_write)
                    f.close()
                except:
                    print 'failed'
                    raise

        return ''
    except Exception as e:
        print e, 'msg=', msg
        raise
        return ''


def handle(connection, address, db, lock):
    try:
        connection.send('user|pass:')
        data = connection.recv(1024)
        print data, 'read'
        user = authenticate(data, db, lock)
        if len(user) == 0:
            raise Exception('Not authenticated')
        print 'user %s authenticated' % user
        print 'creating token for user', user
        tok = create_token(user)
        print 'token=', tok
        connection.send(tok)
        while True:
            connection.send('\ncookie,command:')
            data = ''
            tries = 0
            while len(data) == 0:
                if tries > 1:
                    connection.close()
                    return
                data = connection.recv(1024)
                tries += 1

            print 'data=', data
            out = parse_command(data)
            connection.send(out)

    except Exception as e:
        print e
    finally:
        try:
            connection.close()
        except:
            pass


def main():
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server_address = ('0.0.0.0', int(sys.argv[1]))
    sock.bind(server_address)
    sock.listen(1)
    processes = []
    lasttime = datetime.datetime.now()
    lastcleanup = datetime.datetime.now()
    firstcleanup = True
    try:
        while True:
            now = datetime.datetime.now()
            d = now - lasttime
            cleanupd = now - lastcleanup
            if cleanupd > datetime.timedelta(seconds=600):
                try:
                    if firstcleanup or not firstcleanup and not cleanupprocess.is_alive():
                        cleanupprocess = multiprocessing.Process(target=cleanup)
                        cleanupprocess.daemon = True
                        cleanupprocess.start()
                        lastcleanup = now
                        firstcleanup = False
                except Exception as e:
                    print e

            if d > datetime.timedelta(seconds=60) and False:
                lasttime = now
                tobe_removed = []
                print 'Terminating processes'
                for i in range(len(processes)):
                    if now - processes[i][1] > datetime.timedelta(seconds=60):
                        try:
                            print 'Terminated'
                        except Exception as e:
                            pass

                        tobe_removed.append(i)
                        try:
                            print 'Closed'
                        except Exception as e:
                            pass

                    else:
                        break

                for i in sorted(tobe_removed, reverse=True):
                    del processes[i]

            print >> sys.stderr, 'waiting for a connection'
            try:
                connection, client_address = sock.accept()
            except Exception as e:
                continue

            try:
                connection.settimeout(5)
                process = multiprocessing.Process(target=handle, args=(connection,
                 client_address,
                 db,
                 lock))
                process.daemon = True
                process.start()
            except Exception as e:
                print 'Cannot create the process', e
                raise

    except Exception as e:
        print e


if __name__ == '__main__':
    main()

How the Service Works

Lots of stuff is going on here so let me break it down for you. You connect to the socket and provide a username and password. If it exists, you are authenticated, if it is new it is created. There is also a special root user but we will not take a look at it in this writeup. When login is finished you get a base64 string as a token/cookie which you provide on all commands.

Once you have your cookie, you can execute the following commands:

  • ls: List files in working directory
  • cat server.py: Get sourcecode (Go figure, we didn't need to extract it)
  • retv secret: Get your users secret (deployed with write command)
  • write ...: Write arbitrary data to a file (this is where the secret is written by the gameserver)

Just using the service legitimately, we could find out all users by issuing the ls command, because the write command writes the users data in a file called secret_of_USERNAME.txt. However, with the information provided for the service, we get a so-called flag_id: It describes which flag the server wants. So we don't need to find out the username.

Here is how a legitimate run of the service woud look like:

user|pass:aaa|bbb
MzRJ854nDTJrUeStflCKqNgAxLz5SnxZl39eyjq30EYtU6goua2Lhf96LsCN5atd
cookie,command:MzRJ854nDTJrUeStflCKqNgAxLz5SnxZl39eyjq30EYtU6goua2Lhf96LsCN5atd,ls
secret_of_zjdG05qz5bB4I0ApV3AS4.txt

In this case, we logged in as user aaa with password bbb and got a base64 encoded cookie. Using that we executed the ls command, which in our test environment returns a single file.

Authentication

Our main problem, is how we can get the secret of another user. Let's first look at the authentication phase:

def authenticate(s, db, lock):
    lock.acquire()
    try:
        fields = s.split('|')
        if len(fields) < 2:
            raise Exception('Invalid format')
            return ''
        user = fields[0]
        user = user.strip()
        passwd = fields[1]
        passwd = passwd.strip()
        print user, passwd
        if not (safe_string(user) and safe_string(passwd)):
            raise Exception('Unsafe')
            return ''
        if user == root:
            if passwd == secret:
                print user, 'authenticated'
                return user
            else:
                raise Exception('Unauthorized')
                return ''
        else:
            if user in db and db[user] == passwd:
                print user, 'authenticated'
                return user
            if user not in db:
                print user, 'authenticated'
                db[user] = passwd
                return user
            return ''
    except:
        raise
    finally:
        lock.release()

Ignoring the root-part of the logic we notice several things. First of all, we see that we can provide more than two parameters to this function. The split is done on | but the length is checked for smaller than 2 not exactly two, so something like this is valid: aaa|bbb|ccc. Now ccc will be ignored but is allowed to be present.

Secondly, we see how a user is either created or authenticated, so we can log in using arbitrary usernames and passwords as long as they don't exist yet. Next, we note that the password and username are stripped (which we don't use, but it might be another possibility). The check for safe_string disallows any non-alphanumeric characters. We conclude that this function has some odds but looks fairly good - no direct exploit here.

Creation of Token

So let's look at the cookie being created:

def create_token(user):
    try:
        print 'creating the token'
        t = datetime.datetime.now()
        plain_token = '%d|%d|%d|%d|%d|%s|CHALLENGE' % (t.year,
         t.month,
         t.day,
         t.hour,
         t.minute,
         user)
        l1 = len('%d|%d|%d|%d|%d|' % (t.year,
         t.month,
         t.day,
         t.hour,
         t.minute))
        padded_plain_token = ' ' * (32 - l1) + plain_token
        l = len(padded_plain_token)
        padded_plain_token = padded_plain_token + ' ' * (16 - l % 16)
        aes = Crypto.Cipher.AES.new(aes_key, Crypto.Cipher.AES.MODE_ECB)
        enc_token = aes.encrypt(padded_plain_token)
        b64_token = base64.b64encode(enc_token)
        return b64_token
    except Exception as e:
        logger.error('%s' % str(e))
        raise

It includes a timestamp and the username into the tookie and we see that the token is padded to be exactly of length 32 for the timestamp part. The interesting part is the encryption, as it states AES.MODE_ECB. All available red lights should go off: This should be investigated. Taking a look at the input for encryption (padded_plain_token), we see that it consists of 32 bytes for the timestamp and a padded mount of 16-byte blocks for the username plus the word CHALLENGE is appended. Each value is separated by the | character. A final plain token looks like this: "     12|6|23|12|username|CHALLENGE    " where the spaces are adjusted to meet the block requirement.

Turning ECB Mode into an Exploit

Now we note that AES has a blocksize of 16 bytes which happens to be what the string was padded for: two blocks of timestamp, then any number of blocks, padded with spaces, for the username + challenge keyword. And this is where ECB comes in: Since the blocks are not linked, each block will always be the same independently of previous or later blocks. Thus a username like AAAAAAAAAAAAAAAABBB has the exact same first username block as a username like AAAAAAAAAAAAAAAACCC. Also, the username DDDDDDDDDDDDDDDDBBB will produce the exact same second block as the first username.

So while we cannot authenticate as an existing user (see above), we can get cookies for a user whose first 16 bytes are the same and one for a user whose remaining 5 bytes (gameserver gives random 21-byte usernames) are the same. Combining those we can split up the blocks, take the first two as the timestamp, then one of the following two blocks each from the parts of the username.

So it seems we can create a cookie for arbitrary usernames. Let's see where that gets us when executing a command:

def parse_command(msg):
    try:
        fs = msg.split(',')
        b64_c = fs[0]
        command = fs[1]
        command = command.strip()
        c = base64.b64decode(b64_c)
        aes = Crypto.Cipher.AES.new(aes_key, Crypto.Cipher.AES.MODE_ECB)
        m = aes.decrypt(c)
        fields = m.split('|')
        #...
        user = fields[5]

The username is taken from the cookie and looking at the commands above, this means we can execute the retv secret command for any user (i.e. the one the gameserver gives us).

That's it, now we gotta turn this into an exploit:

import socket
import os
import re
import base64
from time import sleep
import sys

HOST, PORT = "localhost", 1337
#HOST, PORT = "10.14.57.2", 2190
HOST, PORT = "10.14.57.2", 31337
USERNAME = "Be65QawKnybmXRQr35Cf"
#s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

class Exploit():

    def execute(self, ip, port, flag_id):
        # DO STUFF TO GET FLAG
        self.flag = exploit(ip, port, flag_id)

    def result(self):
        return {'FLAG' : self.flag}

def init():
    s = socket.socket()
    s.connect((HOST, PORT))
    username = USERNAME
    pwd = "A"*10
    print s.recv(1024),
    cmd = "%s|%s\n" % (username, pwd)
    print cmd,
    s.sendall(cmd)
    cookie = s.recv(1024).strip()
    print "Cookie: ", cookie
    print s.recv(1024),
    cmd = "%s,write FLGABCDEFG\n" % cookie
    print cmd,
    s.sendall(cmd)
    s.close()
    return cookie


def build_usercookie(ip, port, username):
    #cookie = login("A"*5, "B"*5)
    s = socket.socket()
    s.connect((ip, port))
    part1, part2 = username[:16], username[16:]
    assert len(part1) == 16
    assert len(part2) < 16
    print s.recv(1024),

    # Build part1 username
    username = "%s%s" % (part1, "A"*3)
    password = "B" * 5
    cmd = "%s|%s\n" % (username, password)
    print cmd,
    s.sendall(cmd)
    cookie = s.recv(1024).strip()
    print "Cookie for user %s: %s" % (username, cookie)
    cookie_data = base64.b64decode(cookie)
    # username blocks: username|CHALLENGE
    assert len(cookie_data) == 64
    block1 = cookie_data[32:48]
    s.close()

    # Build part2 username
    s = socket.socket()
    s.connect((ip, port))
    print s.recv(1024),
    username ="%s%s|CHALLENGE" % ("A" * 16, part2)
    password = "B" * 5
    cmd = "%s|%s\n" % (username, password)
    print cmd,
    s.sendall(cmd)
    cookie = s.recv(1024).strip()
    print "Cookie for user %s: %s" % (username, cookie)
    cookie_data = base64.b64decode(cookie)
    # username blocks: username|CHALLENGE
    assert len(cookie_data) == 64
    block2 = cookie_data[48:]
    timeblock = cookie_data[:32]
    assert len(timeblock) == 32
    assert len(block1) == 16
    assert len(block2) == 16
    final_cookie_data = timeblock + block1 + block2
    return s, base64.b64encode(final_cookie_data)


def retv_secret(s, cookie):
    print s.recv(1024),
    cmd = "%s,retv secret\n" % cookie
    print cmd,
    s.sendall(cmd)
    flag = s.recv(1024)
    s.close()
    return flag.strip()


def exploit(ip, port, username):
    s, cookie = build_usercookie(ip, port, username)
    return retv_secret(s, cookie)


if __name__ == '__main__':
    a = Exploit()
    a.execute(HOST, PORT, USERNAME)
    print a.result()

Note

This exploit does some stuff around it, not all of which makes total sense. For example, appending |CHALLENGE to the username for the second part is likely unnecessary, but we didn't modify it once it worked.

Final Words

I enjoyed the challenge because it was pretty simple crypto stuff that showed how ECB really is. In fact, right before the CTF I walked past an image stating "yo momma is so dumb she uses ecb" :-).

What I did not like was the exploit infrastructure. In theory it would go like this:

  1. Test exploit locally
  2. Test exploit against own vulnbox
  3. Test exploit using the test_exploit.py script provided
  4. Submit exploit where it gets tested against an unpatched vulnbox to make sure it works in the iCTF infrastructure.

Once all those steps are passed, one could assume the exploit is working fine. However, you don't have much to debug on (besides me putting a lot of assertions there that could fail on a problem) but then again you shouldn't need them. But, after over two hours my exploit failed to provide a single flag. So I performed all those steps again, made some adjustments (mostly organizational and cosmetic) and submitted the exploit again. And the flags magically came in. Maybe there was a bug in my exploit. But I don't know, because I cannot monitor its execution properly. Instead I have to rely on pure luck whether it works or not.

Adding to this came a horrible website for providing information that, while being built with AngularJS, required being reloaded automatically with JavaScript on. I thought the purpose of a client-side template engine was that you could update parts dynamically and re-render the template? What kept happening was that the site was so slow, that the automatic reload came in most of the time exactly when the data was loaded. But if you disabled the reload, you couldn't just press F5, because it would always stay on the old data (it kept the so-called tick count in the parameters where you had to delete it on each refresh).

This being the third iCTF in a row where we had massive problems with their infrastructure but not with the services / exploits themselves, I find myself wishing they'd move their manpower of designing fancy websites with JavaScript rendered graphs that you can drag around on to a simple and fun CTF experience. Because if you would have turned down all those complex rendering and self-hosting exploits you could provide a great CTF experience and focus on designing cool services.

Besides that, I want to thank the guys over at UCBS for organizing this huge CTF and putting all the work into it.


Comments