#!/usr/bin/env python3
# Searches for the direct S3 endpoint of a bucket that is reachable through a CDN or
# proxy URL: lists the bucket at that URL, then probes known storage providers for a
# bucket of the same name whose first object (key + last-modified time) matches.
import re
import sys

import requests
import urllib3

# Patterns for extracting values from S3 ListBucketResult / error XML responses.
# NOTE(review): the angle-bracket tag literals in these patterns had been stripped
# (most likely by an HTML sanitiser during extraction); they are restored here from
# the documented S3 API response format. Without them every pattern matched the
# empty string at position 0, so nothing could ever be extracted or validated.
RESPONSE_PATTERN = re.compile(
	r'''^<\?xml version=(["'])1\.0\1 encoding=(["'])UTF-8\2\?>''' '\n?'
	r'''<ListBucketResult xmlns=(["'])http://s3\.amazonaws\.com/doc/2006-03-01/\3>'''
)
NAME_PATTERN = re.compile(r'<Name>([^<]*)</Name>')
KEY_PATTERN = re.compile(r'<Key>([^<]*)</Key>')
MTIME_PATTERN = re.compile(r'<LastModified>([^<]*)</LastModified>')
REDIRECT_PATTERN = re.compile(
	r'''^<\?xml version=(["'])1\.0\1 encoding=(["'])UTF-8\2\?>''' '\n?'
	r'<Error><Code>PermanentRedirect</Code>'
)
REDIRECT_TARGET_ENDPOINT_PATTERN = re.compile(r'<Endpoint>([^<]*)</Endpoint>')
REDIRECT_TARGET_BUCKET_PATTERN = re.compile(r'<Bucket>([^<]*)</Bucket>')

# Candidate listing-URL templates per provider; {} is replaced with the bucket name.
PROVIDERS = {
	'amazon': ['https://s3.amazonaws.com/{}/', 'https://{}.s3.amazonaws.com/'],
	'google': ['https://storage.googleapis.com/{}/'],
	'scaleway': ['https://s3.nl-ams.scw.cloud/{}/', 'https://s3.fr-par.scw.cloud/{}/'],
	'wasabi': ['https://s3.wasabisys.com/{}/'],
}

# AWS S3 buckets whose names contain a . are broken because AWS can't be bothered to
# serve valid TLS certs for them — hence verify = False on the probe requests, and
# this silences urllib3's InsecureRequestWarning spam for those fetches.
urllib3.disable_warnings()


def fetch_with_redirect(url):
	'''Fetch url, following a single S3 PermanentRedirect response if one is returned.

	Returns a (response, final_url, body) tuple.
	Raises RuntimeError when a redirect lacks its endpoint/bucket, or when a 200
	response does not look like an S3 bucket listing.
	'''
	print(f'Fetching {url}', file = sys.stderr)
	r = requests.get(url, verify = False, timeout = 60)
	print(f'{r.status_code} {url}', file = sys.stderr)
	body = r.text
	if r.status_code == 301 and REDIRECT_PATTERN.match(body):
		m = REDIRECT_TARGET_ENDPOINT_PATTERN.search(body)
		if not m:
			raise RuntimeError('Could not get redirect endpoint')
		endpoint = m.group(1)
		m = REDIRECT_TARGET_BUCKET_PATTERN.search(body)
		if not m:
			raise RuntimeError('Could not get redirect bucket')
		bucket = m.group(1)
		# Diagnostics go to stderr so stdout only ever carries the final result.
		# (These two prints previously went to stdout, unlike every other diagnostic.)
		print(f'Redirect to endpoint {endpoint!r} bucket {bucket!r}', file = sys.stderr)
		url = f'https://{endpoint}/{bucket}/'
		print(f'Fetching {url}', file = sys.stderr)
		# Path-style URL on the regional endpoint, so the TLS cert is valid here
		# and certificate verification can stay enabled.
		r = requests.get(url, timeout = 60)
		print(f'{r.status_code} {url}', file = sys.stderr)
		body = r.text
	if r.status_code == 200 and not RESPONSE_PATTERN.match(body):
		raise RuntimeError(f'Invalid body: {body[:200]}...')
	return r, url, body


def find(url, providers):
	'''List the bucket behind url, then probe each provider in providers for a bucket
	with the same name whose first object matches by key and last-modified time.
	Prints any match to stdout; diagnostics go to stderr.'''
	_, _, body = fetch_with_redirect(url)

	# Get bucket name
	m = NAME_PATTERN.search(body)
	if not m:
		raise RuntimeError('Could not find bucket name')
	name = m.group(1)
	if '&' in name:
		# An XML entity in the name would need unescaping before reuse in a URL.
		raise RuntimeError(f'Unsupported bucket name: {name!r}')

	# Get name and mtime of first object
	m = KEY_PATTERN.search(body)
	if m:
		firstKey = m.group(1)
		m = MTIME_PATTERN.search(body)
		if not m:
			raise RuntimeError('Got key but no mtime')
		firstMtime = m.group(1)
	else:
		print('Warning: no key found, cannot verify that it is the same bucket', file = sys.stderr)
		firstKey, firstMtime = None, None

	# Start searching
	for provider in providers:
		for testUrlTemplate in PROVIDERS[provider]:
			testUrl = testUrlTemplate.format(name)
			r, testUrl, body = fetch_with_redirect(testUrl)
			if r.status_code != 200:
				continue
			# Compare first object; without one we can only report the 200 above.
			if not firstKey:
				continue
			m = KEY_PATTERN.search(body)
			if not m:
				print(f'No key in {testUrl}', file = sys.stderr)
				continue
			testFirstKey = m.group(1)
			m = MTIME_PATTERN.search(body)
			if not m:
				print(f'Got key but no mtime in {testUrl}', file = sys.stderr)
				continue
			testFirstMtime = m.group(1)
			if (firstKey, firstMtime) == (testFirstKey, testFirstMtime):
				print(f'Found the bucket: {url} == {testUrl}')


if __name__ == '__main__':
	if not 2 <= len(sys.argv) <= 3 or sys.argv[1] in ('--help', '-h'):
		print('Usage: s3-bucket-find-direct-url URL [PROVIDER]', file = sys.stderr)
		print("Searches for an S3 bucket that's available at URL (e.g. CDN or proxy), optionally filtered by PROVIDER", file = sys.stderr)
		print(f'Providers: {", ".join(PROVIDERS)}', file = sys.stderr)
		sys.exit(1)
	url = sys.argv[1]
	providers = (sys.argv[2],) if len(sys.argv) == 3 else tuple(PROVIDERS.keys())
	find(url, providers)