You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

577 lines
18 KiB

  1. #!/usr/bin/env python
  2. """
  3. megawarc is useful if you have .tar full of .warc.gz files and
  4. you really want one big .warc.gz. With megawarc you get your
  5. .warc.gz, but you can still restore the original .tar.
  6. The megawarc tool looks for .warc.gz in the .tar file and
  7. creates three files, the megawarc:
  8. FILE.warc.gz is the concatenated .warc.gz
  9. FILE.tar contains any non-warc files from the .tar
  10. FILE.json.gz contains metadata
  11. You need the JSON file to reconstruct the original .tar from
  12. the .warc.gz and .tar files. The JSON file has the location
  13. of every file from the original .tar file.
  14. METADATA FORMAT
  15. ---------------
  16. One line with a JSON object per file in the .tar.
  17. {
  18. "target": {
  19. "container": "warc" or "tar", (where is this file?)
  20. "offset": number, (where in the tar/warc does this
  21. file start? for files in the tar
  22. this includes the tar header,
  23. which is copied to the tar.)
  24. "size": size (where does this file end?
  25. for files in the tar, this includes
  26. the padding to 512 bytes)
  27. },
  28. "src_offsets": {
  29. "entry": number, (where is this file in the original tar?)
  30. "data": number, (where does the data start? entry+512)
  31. "next_entry": number (where does the next tar entry start)
  32. },
  33. "header_fields": {
  34. ... (parsed fields from the tar header)
  35. },
  36. "header_base64": string (the base64-encoded tar header)
  37. }
  38. In older megawarcs the header is sometimes not base64-encoded:
  39. "header_string": string (the tar header for this entry)
  40. USAGE
  41. -----
  42. megawarc convert FILE
  43. Converts the tar file (containing .warc.gz files) to a megawarc.
  44. It creates FILE.warc.gz, FILE.tar and FILE.json.gz from FILE.
  45. megawarc pack FILE INFILE_1 [[INFILE_2] ...]
  46. Creates a megawarc with basename FILE and recursively adds the
  47. given files and directories to it, as if they were in a tar file.
  48. It creates FILE.warc.gz, FILE.tar and FILE.json.gz.
  49. megawarc restore FILE
  50. Converts the megawarc back to the original tar.
  51. It reads FILE.warc.gz, FILE.tar and FILE.json.gz to make FILE.
  52. """
  53. import base64
  54. import gzip
  55. import json
  56. import os.path
  57. import re
  58. import subprocess
  59. import sys
  60. import tarfile
  61. import zlib
  62. from optparse import OptionParser
  63. try:
  64. from collections import OrderedDict
  65. except ImportError:
  66. from ordereddict import OrderedDict
  67. class ProgressInfo(object):
  68. def __init__(self, maximum):
  69. self._current = 0
  70. self._maximum = maximum
  71. self._previous_percentage = None
  72. self._active = sys.stderr.isatty()
  73. self.print_status()
  74. def update(self, new_value):
  75. self._current = new_value
  76. self.print_status()
  77. def print_status(self):
  78. if not self._active:
  79. return
  80. percentage = int(float(self._current) / float(self._maximum) * 100)
  81. if self._maximum < 0:
  82. # count down
  83. percentage = 100-percentage
  84. percentage = max(0, min(100, percentage))
  85. if self._previous_percentage != percentage:
  86. self._previous_percentage = percentage
  87. sys.stderr.write("\r %3d%%" % percentage)
  88. def clear(self):
  89. if self._active:
  90. sys.stderr.write("\r \r")
  91. self._active = False
  92. # open input_filename and write the data from offset to
  93. # (offset+size) to stream
  94. def copy_to_stream(stream, input_filename, offset, size, verbose=False):
  95. if verbose and size > 10 * 1024 * 1024:
  96. progress = ProgressInfo(-size)
  97. else:
  98. progress = None
  99. try:
  100. with open(input_filename, "r") as f:
  101. f.seek(offset)
  102. to_read = size
  103. while to_read > 0:
  104. buf_size = min(to_read, 4096)
  105. buf = f.read(buf_size)
  106. l = len(buf)
  107. if l < buf_size:
  108. raise Exception("End of file: %d bytes expected, but %d bytes read." % (buf_size, l))
  109. stream.write(buf)
  110. to_read -= l
  111. if progress:
  112. progress.update(-to_read)
  113. finally:
  114. if progress:
  115. progress.clear()
  116. # part of a stream as a file
  117. # (seek relative to an offset)
  118. class RangeFile(object):
  119. def __init__(self, stream, offset, size):
  120. self._stream = stream
  121. self._offset = offset
  122. self._size = size
  123. self._current_rel_offset = 0
  124. self.seek(0)
  125. def tell(self):
  126. return self._current_rel_offset
  127. def seek(self, pos, whence=os.SEEK_SET):
  128. if whence == os.SEEK_SET:
  129. self._current_rel_offset = pos
  130. elif whence == os.SEEK_CUR:
  131. self._current_rel_offset += pos
  132. elif whence == os.SEEK_END:
  133. self._current_rel_offset = self._size + pos
  134. else:
  135. raise Exception("Unknown whence: %d." % whence)
  136. if self._current_rel_offset < 0 or self._current_rel_offset > self._size:
  137. raise Exception("Seek outside file: %d." % self._current_rel_offset)
  138. self._stream.seek(self._offset + self._current_rel_offset)
  139. def read(self, size):
  140. size = min(self._size - self._current_rel_offset, size)
  141. self._current_rel_offset += size
  142. buf = self._stream.read(size)
  143. if len(buf) < size:
  144. raise Exception("Expected to read %d but received %d." % (size, len(buf)))
  145. return buf
  146. # copies while reading
  147. class CopyReader(object):
  148. def __init__(self, in_stream, out_stream):
  149. self._in_stream = in_stream
  150. self._out_stream = out_stream
  151. self._last_read = 0
  152. def tell(self):
  153. return self._in_stream.tell()
  154. def seek(self, pos, whence=os.SEEK_SET):
  155. self._in_stream.seek(pos, whence)
  156. def read(self, size):
  157. pos = self.tell()
  158. if self._last_read < pos:
  159. raise Exception("Last read: %d Current pos: %d" % (self._last_read, pos))
  160. buf = self._in_stream.read(size)
  161. read_before = self._last_read - pos
  162. if read_before == 0:
  163. new_read = buf
  164. else:
  165. new_read = buf[read_before:]
  166. l = len(new_read)
  167. if l > 0:
  168. self._last_read += l
  169. self._out_stream.write(new_read)
  170. return buf
  171. # check for gzip errors
  172. def test_gz(filename, offset, size, verbose=False, copy_to_file=None):
  173. with open(filename, "r") as f_stream:
  174. f = RangeFile(f_stream, offset, size)
  175. if verbose and size > 10 * 1024 * 1024:
  176. progress = ProgressInfo(-size)
  177. else:
  178. progress = None
  179. if copy_to_file:
  180. f = CopyReader(f, copy_to_file)
  181. start_pos = copy_to_file.tell()
  182. try:
  183. with open("/dev/null", "w") as dev_null:
  184. gz = subprocess.Popen(["gunzip", "-tv"],
  185. shell=False,
  186. stdin=subprocess.PIPE,
  187. stdout=dev_null,
  188. stderr=dev_null)
  189. while True:
  190. buf = f.read(4096)
  191. size -= len(buf)
  192. if progress:
  193. progress.update(-size)
  194. if len(buf) > 0:
  195. gz.stdin.write(buf)
  196. else:
  197. break
  198. gz.stdin.close()
  199. ret = gz.wait()
  200. if ret != 0:
  201. raise IOError("Could not decompress warc.gz. gunzip returned %d." % ret)
  202. if progress:
  203. progress.clear()
  204. except (IOError, OSError) as e:
  205. if progress:
  206. progress.clear()
  207. if verbose:
  208. print >>sys.stderr, e
  209. if copy_to_file:
  210. copy_to_file.truncate(start_pos)
  211. copy_to_file.seek(start_pos)
  212. return False
  213. return True
# converting a .tar with warcs to megawarc tar+warc+json
class MegawarcBuilder(object):
  """Converts a tar that contains .warc.gz members into a megawarc.

  Produces three files next to the input:
    FILE.megawarc.warc.gz  - concatenation of the valid .warc.gz members
    FILE.megawarc.tar      - every other member, still in tar format
    FILE.megawarc.json.gz  - one JSON metadata line per member
  """
  def __init__(self, input_filename):
    # verbose is toggled by the caller after construction
    self.verbose = False
    self.input_filename = input_filename
    self.output_warc_filename = input_filename + ".megawarc.warc.gz"
    self.output_tar_filename = input_filename + ".megawarc.tar"
    self.output_json_filename = input_filename + ".megawarc.json.gz"

  def process(self):
    """Walk every member of the input tar and write the three outputs."""
    with open(self.output_warc_filename, "wb") as warc_out:
      with open(self.output_tar_filename, "wb") as tar_out:
        json_out = gzip.open(self.output_json_filename, "wb")
        try:
          tar = tarfile.open(self.input_filename, "r")
          try:
            for tarinfo in tar:
              self.process_entry(tarinfo, warc_out, tar_out, json_out)

            # pad the output tar to a whole number of tar records
            padding = (tarfile.RECORDSIZE - tar_out.tell()) % tarfile.RECORDSIZE
            if padding > 0:
              tar_out.write("\0" * padding)
          finally:
            tar.close()
        finally:
          json_out.close()

  def process_entry(self, entry, warc_out, tar_out, json_out):
    """Copy one tar member to the warc (if it is a valid .warc.gz) or to
    the tar otherwise, and append its metadata line to json_out."""
    with open(self.input_filename, "r") as tar:
      tar.seek(entry.offset)
      # the raw tar header block(s) preceding this member's data
      tar_header = tar.read(entry.offset_data - entry.offset)

    # calculate position of tar entry
    block_size = (len(tar_header) +   # header
                  entry.size +        # data
                  (tarfile.BLOCKSIZE - entry.size) % tarfile.BLOCKSIZE)
    next_offset = entry.offset + block_size

    # offsets in the ORIGINAL tar, needed to restore it later
    d_src_offsets = OrderedDict()
    d_src_offsets["entry"] = entry.offset
    d_src_offsets["data"] = entry.offset_data
    d_src_offsets["next_entry"] = next_offset

    # decide what to do with this entry
    valid_warc_gz = False
    if entry.isfile() and re.search(r"\.warc\.gz", entry.name):
      # this is a .warc.gz
      if self.verbose:
        print >>sys.stderr, "Checking %s" % entry.name
      # add to megawarc while copying to the megawarc.warc.gz
      warc_offset = warc_out.tell()
      valid_warc_gz = test_gz(self.input_filename, entry.offset_data, entry.size,
                              copy_to_file=warc_out, verbose=self.verbose)

    # save in megawarc or in tar
    d_target = OrderedDict()
    if valid_warc_gz:
      # a warc file.gz, add to megawarc
      # (test_gz already copied the data; only record where it landed)
      if self.verbose:
        print >>sys.stderr, "Copied %s to warc" % entry.name
      d_target["container"] = "warc"
      d_target["offset"] = warc_offset
      d_target["size"] = entry.size
    else:
      # not a warc.gz file, add to tar
      # (header + data + block padding are copied verbatim)
      tar_offset = tar_out.tell()
      if self.verbose:
        print >>sys.stderr, "Copying %s to tar" % entry.name
      copy_to_stream(tar_out, self.input_filename, entry.offset, block_size)
      d_target["container"] = "tar"
      d_target["offset"] = tar_offset
      d_target["size"] = block_size

    # store details
    d = OrderedDict()
    d["target"] = d_target
    d["src_offsets"] = d_src_offsets
    d["header_fields"] = entry.get_info("utf-8", {})
    d["header_base64"] = base64.b64encode(tar_header)

    # store metadata: one compact JSON object per line
    json.dump(d, json_out, separators=(',', ':'))
    json_out.write("\n")
# adding .warc.gz and other files to megawarc tar+warc+json
class MegawarcPacker(object):
  """Packs loose files (and directories, recursively) into a megawarc,
  as if they had first been put into a tar file.

  Produces FILE.megawarc.warc.gz, FILE.megawarc.tar and
  FILE.megawarc.json.gz with basename FILE.
  """
  def __init__(self, output_basename):
    # verbose is toggled by the caller after construction
    self.verbose = False
    self.output_basename = output_basename
    self.output_warc_filename = output_basename + ".megawarc.warc.gz"
    self.output_tar_filename = output_basename + ".megawarc.tar"
    self.output_json_filename = output_basename + ".megawarc.json.gz"
    # current write position in the imaginary source tar that the
    # JSON src_offsets refer to
    self.tar_pos = 0

  def process(self, filelist):
    """Add every file, and every file inside every directory, in filelist."""
    with open(self.output_warc_filename, "wb") as warc_out:
      with open(self.output_tar_filename, "wb") as tar_out:
        json_out = gzip.open(self.output_json_filename, "wb")
        try:
          def each_file(arg, dirname, names):
            # os.path.walk callback: process the plain files in dirname
            for n in names:
              n = os.path.join(dirname, n)
              if os.path.isfile(n):
                self.process_file(n, warc_out, tar_out, json_out)

          for filename in filelist:
            if os.path.isdir(filename):
              os.path.walk(filename, each_file, None)
            elif os.path.isfile(filename):
              self.process_file(filename, warc_out, tar_out, json_out)

          # pad the output tar to a whole number of tar records
          padding = (tarfile.RECORDSIZE - tar_out.tell()) % tarfile.RECORDSIZE
          if padding > 0:
            tar_out.write("\0" * padding)
        finally:
          json_out.close()

  def process_file(self, filename, warc_out, tar_out, json_out):
    """Copy one file to the warc (valid .warc.gz) or to the tar, and
    append its metadata line, synthesizing the tar header the file
    would have had in a real tar."""
    # make tar header
    arcname = filename
    arcname = arcname.replace(os.sep, "/")
    arcname = arcname.lstrip("/")

    entry = tarfile.TarInfo()
    statres = os.stat(filename)
    stmd = statres.st_mode
    entry.name = arcname
    entry.mode = stmd
    entry.uid = statres.st_uid
    entry.gid = statres.st_gid
    entry.size = statres.st_size
    entry.mtime = statres.st_mtime
    entry.type = tarfile.REGTYPE

    tar_header = entry.tobuf()

    # find position in imaginary tar
    entry.offset = self.tar_pos

    # calculate position of tar entry
    tar_header_l = len(tar_header)
    block_size = (tar_header_l +      # header
                  entry.size +        # data
                  (tarfile.BLOCKSIZE - entry.size) % tarfile.BLOCKSIZE)
    data_offset = entry.offset + tar_header_l
    next_offset = entry.offset + block_size

    # move to next position in imaginary tar
    self.tar_pos = next_offset

    d_src_offsets = OrderedDict()
    d_src_offsets["entry"] = entry.offset
    d_src_offsets["data"] = data_offset
    d_src_offsets["next_entry"] = next_offset

    # decide what to do with this file
    valid_warc_gz = False
    if re.search(r"\.warc\.gz", filename):
      if self.verbose:
        print >>sys.stderr, "Checking %s" % filename
      # add to megawarc while copying to the megawarc.warc.gz
      warc_offset = warc_out.tell()
      valid_warc_gz = test_gz(filename, 0, entry.size,
                              copy_to_file=warc_out, verbose=self.verbose)

    # save in megawarc or in tar
    d_target = OrderedDict()
    if valid_warc_gz:
      # a warc file.gz, add to megawarc
      # (test_gz already copied the data; only record where it landed)
      if self.verbose:
        print >>sys.stderr, "Copied %s to warc" % filename
      d_target["container"] = "warc"
      d_target["offset"] = warc_offset
      d_target["size"] = entry.size
    else:
      # not a warc.gz file, add to tar
      if self.verbose:
        print >>sys.stderr, "Copying %s to tar" % filename
      tar_offset = tar_out.tell()
      tar_out.write(tar_header)
      copy_to_stream(tar_out, filename, 0, entry.size)
      # pad the data to a whole number of tar blocks
      padding = (tarfile.BLOCKSIZE - entry.size) % tarfile.BLOCKSIZE
      if padding > 0:
        tar_out.write("\0" * padding)
      d_target["container"] = "tar"
      d_target["offset"] = tar_offset
      d_target["size"] = block_size

    # store details
    d = OrderedDict()
    d["target"] = d_target
    d["src_offsets"] = d_src_offsets
    d["header_fields"] = entry.get_info("utf-8", {})
    d["header_base64"] = base64.b64encode(tar_header)

    # store metadata: one compact JSON object per line
    json.dump(d, json_out, separators=(',', ':'))
    json_out.write("\n")
  387. # recreate the original .tar from a megawarc tar+warc+json
  388. class MegawarcRestorer(object):
  389. def __init__(self, output_filename):
  390. self.verbose = False
  391. self.output_filename = output_filename
  392. self.input_warc_filename = output_filename + ".megawarc.warc.gz"
  393. self.input_tar_filename = output_filename + ".megawarc.tar"
  394. self.input_json_filename = output_filename + ".megawarc.json.gz"
  395. def process(self):
  396. json_in = gzip.open(self.input_json_filename, "rb")
  397. try:
  398. with open(self.output_filename, "wb") as tar_out:
  399. for line in json_in:
  400. entry = json.loads(line)
  401. self.process_entry(entry, tar_out)
  402. padding = (tarfile.RECORDSIZE - tar_out.tell()) % tarfile.RECORDSIZE
  403. if padding > 0:
  404. tar_out.write("\0" * padding)
  405. finally:
  406. json_in.close()
  407. def process_entry(self, entry, tar_out):
  408. if entry["target"]["container"] == "warc":
  409. if self.verbose:
  410. print >>sys.stderr, "Copying %s from warc" % entry["header_fields"]["name"]
  411. if "header_base64" in entry:
  412. tar_out.write(base64.b64decode(entry["header_base64"]))
  413. elif "header_string" in entry:
  414. tar_out.write(entry["header_string"])
  415. else:
  416. raise Exception("Missing header_string or header_base64.")
  417. copy_to_stream(tar_out, self.input_warc_filename,
  418. entry["target"]["offset"], entry["target"]["size"])
  419. padding = (tarfile.BLOCKSIZE - entry["target"]["size"]) % tarfile.BLOCKSIZE
  420. if padding > 0:
  421. tar_out.write("\0" * padding)
  422. elif entry["target"]["container"] == "tar":
  423. if self.verbose:
  424. print >>sys.stderr, "Copying %s from tar" % entry["header_fields"]["name"]
  425. copy_to_stream(tar_out, self.input_tar_filename,
  426. entry["target"]["offset"], entry["target"]["size"])
  427. padding = (tarfile.BLOCKSIZE - entry["target"]["size"]) % tarfile.BLOCKSIZE
  428. if padding > 0:
  429. tar_out.write("\0" * padding)
  430. else:
  431. raise Exception("Unkown container: %s for %s" %
  432. (entry["target"]["container"], entry["header_fields"]["name"]))
def main():
  """Command-line entry point: parse options and dispatch to
  convert, pack or restore; clean up partial output on failure."""
  parser = OptionParser(
    usage="Usage: %prog [--verbose] convert FILE\n %prog [--verbose] pack FILE [INFILE [INFILE ...]]\n %prog [--verbose] restore FILE",
    description="""%prog convert FILE converts the tar file (containing .warc.gz files) to a megawarc. A megawarc has three parts: 1. a .warc.gz of the concatenated warc files; 2. a .tar with the non-warc files from the original tar; 3. a .json.gz with metadata that can be used to reconstruct the original tar.
Use %prog pack FILE INFILE ... to create a megawarc containing the files.
Use %prog restore FILE to reconstruct original tar.
"""
  )
  parser.add_option("-v", "--verbose", dest="verbose",
                    action="store_true",
                    help="print status messages", default=False)
  (options, args) = parser.parse_args()

  # every command needs at least a command word and a FILE argument
  if len(args) < 2:
    parser.print_usage()
    exit(1)

  if args[0] == "convert":
    if not os.path.exists(args[1]):
      print >>sys.stderr, "Input file %s does not exist." % args[1]
      exit(1)

    try:
      mwb = MegawarcBuilder(args[1])
      mwb.verbose = options.verbose
      mwb.process()
    except:
      # don't leave partial megawarc files behind; re-raise afterwards
      for ext in (".megawarc.warc.gz", ".megawarc.json.gz", ".megawarc.tar"):
        if os.path.exists(args[1]+ext):
          os.unlink(args[1]+ext)
      raise

  elif args[0] == "pack":
    try:
      mwb = MegawarcPacker(args[1])
      mwb.verbose = options.verbose
      mwb.process(args[2:])
    except:
      # don't leave partial megawarc files behind; re-raise afterwards
      for ext in (".megawarc.warc.gz", ".megawarc.json.gz", ".megawarc.tar"):
        if os.path.exists(args[1]+ext):
          os.unlink(args[1]+ext)
      raise

  elif args[0] == "restore":
    # the warc and json parts must exist up front; the tar part is only
    # opened when an entry actually refers to it
    for ext in (".megawarc.warc.gz", ".megawarc.json.gz"):
      if not os.path.exists(args[1]+ext):
        print >>sys.stderr, "Input file %s does not exist." % (args[1] + ext)
        exit(1)
    if os.path.exists(args[1]):
      print >>sys.stderr, "Outputfile %s already exists." % args[1]
      exit(1)

    try:
      mwr = MegawarcRestorer(args[1])
      mwr.verbose = options.verbose
      mwr.process()
    except:
      # remove the partial output tar; re-raise afterwards
      if os.path.exists(args[1]):
        os.unlink(args[1])
      raise

  else:
    parser.print_usage()
    exit(1)

if __name__ == "__main__":
  main()