diff --git a/ia-upload-stream b/ia-upload-stream index bb5c551..020b29a 100755 --- a/ia-upload-stream +++ b/ia-upload-stream @@ -26,6 +26,8 @@ import types logger = logging.getLogger() +# Timeout used for everything except part uploads +TIMEOUT = 60 class UploadError(Exception): @@ -160,13 +162,14 @@ def maybe_file_progress_bar(progress, f, *args, **kwargs): yield f -def upload_one(url, uploadId, partNumber, data, contentMd5, size, headers, progress, tries): +def upload_one(url, uploadId, partNumber, data, contentMd5, size, headers, progress, tries, timeout): + r = None # For UploadError in case of a timeout for attempt in range(1, tries + 1): if attempt > 1: logger.info(f'Retrying part {partNumber}') try: with maybe_file_progress_bar(progress, data, 'read', f'uploading {partNumber}', size = size) as w: - r = requests.put(f'{url}?partNumber={partNumber}&uploadId={uploadId}', headers = {**headers, 'Content-MD5': contentMd5}, data = w) + r = requests.put(f'{url}?partNumber={partNumber}&uploadId={uploadId}', headers = {**headers, 'Content-MD5': contentMd5}, data = w, timeout = timeout) except (ConnectionError, requests.exceptions.RequestException) as e: err = f'error {type(e).__module__}.{type(e).__name__} {e!s}' else: @@ -197,7 +200,7 @@ def wait_first(tasks, parts): logger.info(f'Upload of part {partNumber} OK, ETag: {eTag}') -def upload(item, filename, metadata, *, iaConfigFile = None, partSize = 100*1024*1024, tries = 3, concurrency = 1, queueDerive = True, keepOldVersion = True, complete = True, uploadId = None, parts = None, progress = True): +def upload(item, filename, metadata, *, iaConfigFile = None, partSize = 100*1024*1024, tries = 3, partTimeout = None, concurrency = 1, queueDerive = True, keepOldVersion = True, complete = True, uploadId = None, parts = None, progress = True): f = sys.stdin.buffer # Read `ia` config @@ -210,7 +213,7 @@ def upload(item, filename, metadata, *, iaConfigFile = None, partSize = 100*1024 # Initiate multipart upload logger.info(f'Initiating multipart upload for {filename} in {item}') metadataHeaders = metadata_to_headers(metadata) - r = requests.post(f'{url}?uploads', headers = {**headers, 'x-amz-auto-make-bucket': '1', **metadataHeaders}) + r = requests.post(f'{url}?uploads', headers = {**headers, 'x-amz-auto-make-bucket': '1', **metadataHeaders}, timeout = TIMEOUT) if r.status_code != 200: raise UploadError(f'Could not initiate multipart upload; got status {r.status_code} from IA S3', r = r) # Fight me! @@ -223,7 +226,7 @@ def upload(item, filename, metadata, *, iaConfigFile = None, partSize = 100*1024 # Wait for the item to exist; if the above created the item, it takes a little while for IA to actually create the bucket, and uploads would fail with a 404 until then. for attempt in range(1, tries + 1): logger.info(f'Checking for existence of {item}') - r = requests.get(f'https://s3.us.archive.org/{item}/', headers = headers) + r = requests.get(f'https://s3.us.archive.org/{item}/', headers = headers, timeout = TIMEOUT) if r.status_code == 200: break sleepTime = min(3 ** attempt, 30) @@ -255,7 +258,7 @@ def upload(item, filename, metadata, *, iaConfigFile = None, partSize = 100*1024 logger.info(f'MD5: {h.hexdigest()}') contentMd5 = base64.b64encode(h.digest()).decode('ascii') - task = executor.submit(upload_one, url, uploadId, partNumber, data, contentMd5, size, headers, progress, tries) + task = executor.submit(upload_one, url, uploadId, partNumber, data, contentMd5, size, headers, progress, tries, partTimeout) tasks.append(task) while tasks: wait_first(tasks, parts) @@ -274,7 +277,7 @@ def upload(item, filename, metadata, *, iaConfigFile = None, partSize = 100*1024 for attempt in range(1, tries + 1): if attempt > 1: logger.info('Retrying completion request') - r = requests.post(f'{url}?uploadId={uploadId}', headers = {**headers, **extraHeaders}, data = completeData) + r = requests.post(f'{url}?uploadId={uploadId}', headers = {**headers, **extraHeaders}, data = completeData, timeout = TIMEOUT) if r.status_code == 200: break retrying = f', retrying' if attempt < tries else '' @@ -295,7 +298,7 @@ def list_uploads(item, *, tries = 3): return super().init_poolmanager(*args, **kwargs) for attempt in range(1, tries + 1): - r = requests.get(url, allow_redirects = False) + r = requests.get(url, allow_redirects = False, timeout = TIMEOUT) if r.status_code == 307 and '.s3dns.us.archive.org' in r.headers['Location']: s3dnsUrl = r.headers['Location'] s3dnsUrl = s3dnsUrl.replace('http://', 'https://') @@ -303,7 +306,7 @@ def list_uploads(item, *, tries = 3): domain = s3dnsUrl[8:s3dnsUrl.find('/', 9)] s = requests.Session() s.mount(f'https://{domain}/', IAS3CertificateFixHTTPAdapter()) - r = s.get(s3dnsUrl) + r = s.get(s3dnsUrl, timeout = TIMEOUT) if r.status_code == 200: print(f'In-progress uploads for {item} (initiation datetime, upload ID, filename):') for upload in re.findall(r'.*?', r.text): @@ -330,7 +333,7 @@ def abort(item, filename, uploadId, *, iaConfigFile = None, tries = 3): for attempt in range(1, tries + 1): if attempt > 1: logger.info('Retrying abort request') - r = requests.delete(f'{url}?uploadId={uploadId}', headers = headers) + r = requests.delete(f'{url}?uploadId={uploadId}', headers = headers, timeout = TIMEOUT) if r.status_code == 204: break retrying = f', retrying' if attempt < tries else '' @@ -374,6 +377,7 @@ def main(): parser.add_argument('--clobber', dest = 'keepOldVersion', action = 'store_false', help = 'enable clobbering existing files') parser.add_argument('--ia-config-file', dest = 'iaConfigFile', metavar = 'FILE', help = 'path to the ia CLI config file (default: search the same paths as ia)') parser.add_argument('--tries', type = int, default = 3, metavar = 'N', help = 'retry on S3 errors (default: 3)') + parser.add_argument('--timeout', type = float, default = None, metavar = 'SECONDS', help = 'timeout for part uploads (default: unlimited)') parser.add_argument('--concurrency', '--concurrent', type = int, default = 1, metavar = 'N', help = 'upload N parts in parallel (default: 1)') parser.add_argument('--no-complete', dest = 'complete', action = 'store_false', help = 'disable completing the upload when stdin is exhausted') parser.add_argument('--no-progress', dest = 'progress', action = 'store_false', help = 'disable progress bar') @@ -404,6 +408,7 @@ def main(): iaConfigFile = args.iaConfigFile, partSize = args.partSize, tries = args.tries, + partTimeout = args.timeout, concurrency = args.concurrency, queueDerive = args.queueDerive, keepOldVersion = args.keepOldVersion,