Add timeouts

master
JustAnotherArchivist, 1 year ago
commit edf1dd417c
1 changed file with 15 additions and 10 deletions
ia-upload-stream

@@ -26,6 +26,8 @@ import types
 
 
 logger = logging.getLogger()
+# Timeout used for everything except part uploads
+TIMEOUT = 60
 
 
 class UploadError(Exception):
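
A note on the new constant, since requests' timeout is easy to misread: it is not a deadline on the whole transfer, but a bound on the connection attempt and on each subsequent wait on the socket; it also accepts a (connect, read) tuple to set the two phases separately. A minimal sketch, using a hypothetical URL:

    import requests

    # A float bounds each socket-level wait: up to 60 s to connect, then up
    # to 60 s for each read from the server. It is not a cap on total time.
    r = requests.get('https://example.org/', timeout = 60)

    # A (connect, read) tuple bounds the two phases independently.
    r = requests.get('https://example.org/', timeout = (10, 60))

This is presumably also why the part uploads below do not simply reuse TIMEOUT: while a large part body is being sent, the server can legitimately stay silent for far longer than 60 seconds, so the commit threads a separate, optional timeout through to them instead.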
@@ -160,13 +162,14 @@ def maybe_file_progress_bar(progress, f, *args, **kwargs):
 		yield f
 
 
-def upload_one(url, uploadId, partNumber, data, contentMd5, size, headers, progress, tries):
+def upload_one(url, uploadId, partNumber, data, contentMd5, size, headers, progress, tries, timeout):
+	r = None # For UploadError in case of a timeout
 	for attempt in range(1, tries + 1):
 		if attempt > 1:
 			logger.info(f'Retrying part {partNumber}')
 		try:
 			with maybe_file_progress_bar(progress, data, 'read', f'uploading {partNumber}', size = size) as w:
-				r = requests.put(f'{url}?partNumber={partNumber}&uploadId={uploadId}', headers = {**headers, 'Content-MD5': contentMd5}, data = w)
+				r = requests.put(f'{url}?partNumber={partNumber}&uploadId={uploadId}', headers = {**headers, 'Content-MD5': contentMd5}, data = w, timeout = timeout)
 		except (ConnectionError, requests.exceptions.RequestException) as e:
 			err = f'error {type(e).__module__}.{type(e).__name__} {e!s}'
 		else:
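
Two details worth spelling out here. A timed-out attempt surfaces as requests.exceptions.Timeout (ConnectTimeout or ReadTimeout), which subclasses RequestException, so the existing except clause already catches it and the loop retries. And the new r = None guards the failure path: if every attempt times out before any response arrives, r would otherwise be unbound when the function raises its final UploadError with r = r. A quick check of the exception hierarchy:

    import requests

    # Both timeout flavours fall under the RequestException branch that
    # upload_one already catches, so no new except clause is needed.
    assert issubclass(requests.exceptions.ConnectTimeout, requests.exceptions.Timeout)
    assert issubclass(requests.exceptions.ReadTimeout, requests.exceptions.Timeout)
    assert issubclass(requests.exceptions.Timeout, requests.exceptions.RequestException)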
@@ -197,7 +200,7 @@ def wait_first(tasks, parts):
 	logger.info(f'Upload of part {partNumber} OK, ETag: {eTag}')
 
 
-def upload(item, filename, metadata, *, iaConfigFile = None, partSize = 100*1024*1024, tries = 3, concurrency = 1, queueDerive = True, keepOldVersion = True, complete = True, uploadId = None, parts = None, progress = True):
+def upload(item, filename, metadata, *, iaConfigFile = None, partSize = 100*1024*1024, tries = 3, partTimeout = None, concurrency = 1, queueDerive = True, keepOldVersion = True, complete = True, uploadId = None, parts = None, progress = True):
 	f = sys.stdin.buffer
 
 	# Read `ia` config
@@ -210,7 +213,7 @@ def upload(item, filename, metadata, *, iaConfigFile = None, partSize = 100*1024
 	# Initiate multipart upload
 	logger.info(f'Initiating multipart upload for {filename} in {item}')
 	metadataHeaders = metadata_to_headers(metadata)
-	r = requests.post(f'{url}?uploads', headers = {**headers, 'x-amz-auto-make-bucket': '1', **metadataHeaders})
+	r = requests.post(f'{url}?uploads', headers = {**headers, 'x-amz-auto-make-bucket': '1', **metadataHeaders}, timeout = TIMEOUT)
 	if r.status_code != 200:
 		raise UploadError(f'Could not initiate multipart upload; got status {r.status_code} from IA S3', r = r)
 	# Fight me!
@@ -223,7 +226,7 @@ def upload(item, filename, metadata, *, iaConfigFile = None, partSize = 100*1024
 	# Wait for the item to exist; if the above created the item, it takes a little while for IA to actually create the bucket, and uploads would fail with a 404 until then.
 	for attempt in range(1, tries + 1):
 		logger.info(f'Checking for existence of {item}')
-		r = requests.get(f'https://s3.us.archive.org/{item}/', headers = headers)
+		r = requests.get(f'https://s3.us.archive.org/{item}/', headers = headers, timeout = TIMEOUT)
 		if r.status_code == 200:
 			break
 		sleepTime = min(3 ** attempt, 30)
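
As an aside, this existence check (unchanged here apart from the timeout) retries with capped exponential backoff, sleeping min(3 ** attempt, 30) seconds between attempts; the schedule works out to:

    # Backoff schedule of the existence check: exponential, capped at 30 s.
    for attempt in range(1, 6):
        print(attempt, min(3 ** attempt, 30))  # 3, 9, 27, 30, 30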
@@ -255,7 +258,7 @@ def upload(item, filename, metadata, *, iaConfigFile = None, partSize = 100*1024
 			logger.info(f'MD5: {h.hexdigest()}')
 			contentMd5 = base64.b64encode(h.digest()).decode('ascii')
 
-			task = executor.submit(upload_one, url, uploadId, partNumber, data, contentMd5, size, headers, progress, tries)
+			task = executor.submit(upload_one, url, uploadId, partNumber, data, contentMd5, size, headers, progress, tries, partTimeout)
 			tasks.append(task)
 		while tasks:
 			wait_first(tasks, parts)
@@ -274,7 +277,7 @@ def upload(item, filename, metadata, *, iaConfigFile = None, partSize = 100*1024
 		for attempt in range(1, tries + 1):
 			if attempt > 1:
 				logger.info('Retrying completion request')
-			r = requests.post(f'{url}?uploadId={uploadId}', headers = {**headers, **extraHeaders}, data = completeData)
+			r = requests.post(f'{url}?uploadId={uploadId}', headers = {**headers, **extraHeaders}, data = completeData, timeout = TIMEOUT)
 			if r.status_code == 200:
 				break
 			retrying = f', retrying' if attempt < tries else ''
@@ -295,7 +298,7 @@ def list_uploads(item, *, tries = 3):
 			return super().init_poolmanager(*args, **kwargs)
 
 	for attempt in range(1, tries + 1):
-		r = requests.get(url, allow_redirects = False)
+		r = requests.get(url, allow_redirects = False, timeout = TIMEOUT)
 		if r.status_code == 307 and '.s3dns.us.archive.org' in r.headers['Location']:
 			s3dnsUrl = r.headers['Location']
 			s3dnsUrl = s3dnsUrl.replace('http://', 'https://')
@@ -303,7 +306,7 @@ def list_uploads(item, *, tries = 3):
 			domain = s3dnsUrl[8:s3dnsUrl.find('/', 9)]
 			s = requests.Session()
 			s.mount(f'https://{domain}/', IAS3CertificateFixHTTPAdapter())
-			r = s.get(s3dnsUrl)
+			r = s.get(s3dnsUrl, timeout = TIMEOUT)
 			if r.status_code == 200:
 				print(f'In-progress uploads for {item} (initiation datetime, upload ID, filename):')
 				for upload in re.findall(r'<Upload>.*?</Upload>', r.text):
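
For readers unfamiliar with the adapter dance above: requests.Session.mount binds a transport adapter to a URL prefix, and every request sent through the session to a URL under that prefix goes via the mounted adapter. A minimal sketch with the stock HTTPAdapter standing in for the script's IAS3CertificateFixHTTPAdapter, on a hypothetical prefix:

    import requests

    s = requests.Session()
    # Requests to URLs under this prefix now use the mounted adapter; the
    # script mounts its own subclass to fix certificate validation for the
    # .s3dns.us.archive.org hosts.
    s.mount('https://example.org/', requests.adapters.HTTPAdapter(max_retries = 3))
    print(s.get_adapter('https://example.org/anything'))  # the adapter above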
@@ -330,7 +333,7 @@ def abort(item, filename, uploadId, *, iaConfigFile = None, tries = 3):
 	for attempt in range(1, tries + 1):
 		if attempt > 1:
 			logger.info('Retrying abort request')
-		r = requests.delete(f'{url}?uploadId={uploadId}', headers = headers)
+		r = requests.delete(f'{url}?uploadId={uploadId}', headers = headers, timeout = TIMEOUT)
 		if r.status_code == 204:
 			break
 		retrying = f', retrying' if attempt < tries else ''
@@ -374,6 +377,7 @@ def main():
 	parser.add_argument('--clobber', dest = 'keepOldVersion', action = 'store_false', help = 'enable clobbering existing files')
 	parser.add_argument('--ia-config-file', dest = 'iaConfigFile', metavar = 'FILE', help = 'path to the ia CLI config file (default: search the same paths as ia)')
 	parser.add_argument('--tries', type = int, default = 3, metavar = 'N', help = 'retry on S3 errors (default: 3)')
+	parser.add_argument('--timeout', type = float, default = None, metavar = 'SECONDS', help = 'timeout for part uploads (default: unlimited)')
 	parser.add_argument('--concurrency', '--concurrent', type = int, default = 1, metavar = 'N', help = 'upload N parts in parallel (default: 1)')
 	parser.add_argument('--no-complete', dest = 'complete', action = 'store_false', help = 'disable completing the upload when stdin is exhausted')
 	parser.add_argument('--no-progress', dest = 'progress', action = 'store_false', help = 'disable progress bar')
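
The flag behaves as follows (sketched with a stripped-down, hypothetical parser): when absent, argparse leaves the value at None, which upload passes through as partTimeout and upload_one hands to requests.put unchanged, i.e. part uploads never time out; when given, the string is parsed as a float.

    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('--timeout', type = float, default = None, metavar = 'SECONDS')

    print(parser.parse_args([]).timeout)                    # None: parts never time out
    print(parser.parse_args(['--timeout', '300']).timeout)  # 300.0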
@@ -404,6 +408,7 @@ def main():
 		iaConfigFile = args.iaConfigFile,
 		partSize = args.partSize,
 		tries = args.tries,
+		partTimeout = args.timeout,
 		concurrency = args.concurrency,
 		queueDerive = args.queueDerive,
 		keepOldVersion = args.keepOldVersion,

