diff --git a/ia-upload-stream b/ia-upload-stream
index c40947c..a0a2d76 100755
--- a/ia-upload-stream
+++ b/ia-upload-stream
@@ -101,6 +101,19 @@ def readinto_size_limit(fin, fout, size, blockSize = 1048576):
 		size -= len(d)
 
 
+def get_part(f, partSize, progress):
+	data = io.BytesIO()
+	with maybe_file_progress_bar(progress, data, 'write', 'reading input') as w:
+		readinto_size_limit(f, w, partSize)
+	data.seek(0)
+	size = len(data.getbuffer())
+	logger.info('Calculating MD5')
+	h = hashlib.md5(data.getbuffer())
+	logger.info(f'MD5: {h.hexdigest()}')
+	contentMd5 = base64.b64encode(h.digest()).decode('ascii')
+	return (data, size, contentMd5)
+
+
 @contextlib.contextmanager
 def file_progress_bar(f, mode, description, size = None):
 	if size is None:
@@ -164,12 +177,14 @@ def maybe_file_progress_bar(progress, f, *args, **kwargs):
 
 def upload_one(url, uploadId, partNumber, data, contentMd5, size, headers, progress, tries, timeout):
 	r = None # For UploadError in case of a timeout
+	if partNumber:
+		url = f'{url}?partNumber={partNumber}&uploadId={uploadId}'
 	for attempt in range(1, tries + 1):
 		if attempt > 1:
 			logger.info(f'Retrying part {partNumber}')
 		try:
 			with maybe_file_progress_bar(progress, data, 'read', f'uploading {partNumber}', size = size) as w:
-				r = requests.put(f'{url}?partNumber={partNumber}&uploadId={uploadId}', headers = {**headers, 'Content-MD5': contentMd5}, data = w, timeout = timeout)
+				r = requests.put(url, headers = {**headers, 'Content-MD5': contentMd5}, data = w, timeout = timeout)
 		except (ConnectionError, requests.exceptions.RequestException) as e:
 			err = f'error {type(e).__module__}.{type(e).__name__} {e!s}'
 		else:
@@ -208,12 +223,24 @@ def upload(item, filename, metadata, *, iaConfigFile = None, partSize = 100*1024
 
 	url = f'https://s3.us.archive.org/{item}/(unknown)'
 	headers = {'Authorization': f'LOW {access}:{secret}'}
 
+	metadataHeaders = metadata_to_headers(metadata)
+	initialHeaders = {**headers, 'x-amz-auto-make-bucket': '1', **metadataHeaders}
+
+	# Always read the first part
+	data, size, contentMd5 = get_part(f, partSize, progress)
+
+	# If the file is only a single part anyway, use the normal PUT API instead of multipart because IA can process that *much* faster.
+	if uploadId is None and parts is None and complete and size < partSize:
+		logger.info(f'Uploading in one piece ({size} bytes)')
+		partNumber, eTag = upload_one(url, None, 0, data, contentMd5, size, initialHeaders, progress, tries, partTimeout)
+		logger.info(f'Upload OK, ETag: {eTag}')
+		logger.info('Done!')
+		return
 	if uploadId is None:
 		# Initiate multipart upload
 		logger.info(f'Initiating multipart upload for (unknown) in {item}')
-		metadataHeaders = metadata_to_headers(metadata)
-		r = requests.post(f'{url}?uploads', headers = {**headers, 'x-amz-auto-make-bucket': '1', **metadataHeaders}, timeout = TIMEOUT)
+		r = requests.post(f'{url}?uploads', headers = initialHeaders, timeout = TIMEOUT)
 		if r.status_code != 200:
 			raise UploadError(f'Could not initiate multipart upload; got status {r.status_code} from IA S3', r = r)
 		# Fight me!
@@ -241,22 +268,18 @@
 	parts = []
 	tasks = collections.deque()
 	with concurrent.futures.ThreadPoolExecutor(max_workers = concurrency) as executor:
-		for partNumber in itertools.count(start = len(parts) + 1):
+		logger.info(f'Uploading part {len(parts) + 1} ({size} bytes)')
+		task = executor.submit(upload_one, url, uploadId, len(parts) + 1, data, contentMd5, size, headers, progress, tries, partTimeout)
+		tasks.append(task)
+
+		for partNumber in itertools.count(start = len(parts) + 2):
 			while len(tasks) >= concurrency:
 				wait_first(tasks, parts)
-			data = io.BytesIO()
-			with maybe_file_progress_bar(progress, data, 'write', 'reading input') as w:
-				readinto_size_limit(f, w, partSize)
-			data.seek(0)
-			size = len(data.getbuffer())
+			data, size, contentMd5 = get_part(f, partSize, progress)
 			if not size:
 				# We're done!
 				break
 			logger.info(f'Uploading part {partNumber} ({size} bytes)')
-			logger.info('Calculating MD5')
-			h = hashlib.md5(data.getbuffer())
-			logger.info(f'MD5: {h.hexdigest()}')
-			contentMd5 = base64.b64encode(h.digest()).decode('ascii')
 			task = executor.submit(upload_one, url, uploadId, partNumber, data, contentMd5, size, headers, progress, tries, partTimeout)
 			tasks.append(task)
 