#!/usr/bin/env python
"""
megawarc is useful if you have a .tar full of .warc.gz files and
you really want one big .warc.gz. With megawarc you get your
.warc.gz, but you can still restore the original .tar.

The megawarc tool looks for .warc.gz files in the .tar and
creates three files, the megawarc:
  FILE.warc.gz   is the concatenated .warc.gz
  FILE.tar       contains any non-warc files from the .tar
  FILE.json.gz   contains metadata

You need the JSON file to reconstruct the original .tar from
the .warc.gz and .tar files. The JSON file has the location
of every file from the original .tar file.


METADATA FORMAT
---------------
One line with a JSON object per file in the .tar.
{
  "target": {
    "container": "warc" or "tar",  (where is this file?)
    "offset": number,              (where in the tar/warc does this
                                    file start? for files in the tar
                                    this includes the tar header,
                                    which is copied to the tar.)
    "size": size                   (where does this file end?
                                    for files in the tar, this includes
                                    the padding to 512 bytes)
  },
  "src_offsets": {
    "entry": number,               (where is this file in the original tar?)
    "data": number,                (where does the data start? entry+512)
    "next_entry": number           (where does the next tar entry start?)
  },
  "header_fields": {
    ...                            (parsed fields from the tar header)
  },
  "header_string": string          (the tar header for this entry)
}
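
For example, the metadata line for a valid .warc.gz stored as the first
entry of the original .tar might look roughly like this (the values are
illustrative, header_fields is abbreviated, and the real file puts each
object on a single line):

{"target":{"container":"warc","offset":0,"size":11533},
 "src_offsets":{"entry":0,"data":512,"next_entry":12288},
 "header_fields":{"name":"example-00000.warc.gz","size":11533},
 "header_string":"..."}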

USAGE
-----
megawarc convert FILE
  Converts the tar file (containing .warc.gz files) to a megawarc.
  It creates FILE.warc.gz, FILE.tar and FILE.json.gz from FILE.

megawarc pack FILE INFILE_1 [[INFILE_2] ...]
  Creates a megawarc with basename FILE and recursively adds the
  given files and directories to it, as if they were in a tar file.
  It creates FILE.warc.gz, FILE.tar and FILE.json.gz.

megawarc restore FILE
  Converts the megawarc back to the original tar.
  It reads FILE.warc.gz, FILE.tar and FILE.json.gz to make FILE.
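
For example, a hypothetical round trip with a tar file named archive.tar
(the name is only illustrative):

  megawarc --verbose convert archive.tar

writes archive.tar.megawarc.warc.gz, archive.tar.megawarc.tar and
archive.tar.megawarc.json.gz next to the input (the names produced by
this script include the .megawarc part). Running

  megawarc restore archive.tar

in a directory that contains those three files but not archive.tar
itself recreates the original archive.tar.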
  50. """
import gzip
import json
import os.path
import re
import sys
import tarfile
import zlib

from optparse import OptionParser

try:
  from collections import OrderedDict
except ImportError:
  from ordereddict import OrderedDict


# modify tarfile.TarInfo to keep the original tar headers
tarfile.TarInfo.orig_frombuf = tarfile.TarInfo.frombuf

@classmethod
def keepbuf_frombuf(cls, buf):
  entry = cls.orig_frombuf(buf)
  entry.buf = buf
  return entry

tarfile.TarInfo.frombuf = keepbuf_frombuf


# open input_filename and write the data from offset to
# (offset+size) to stream
def copy_to_stream(stream, input_filename, offset, size):
  with open(input_filename, "rb") as f:
    f.seek(offset)

    to_read = size
    while to_read > 0:
      buf_size = min(to_read, 4096)
      buf = f.read(buf_size)
      if len(buf) < buf_size:
        raise Exception("End of file: %d bytes expected, but %d bytes read." % (buf_size, len(buf)))

      stream.write(buf)
      to_read -= len(buf)

  stream.flush()


# part of a stream as a file
# (seek relative to an offset)
class RangeFile(object):
  def __init__(self, stream, offset, size):
    self._stream = stream
    self._offset = offset
    self._size = size

    self._current_rel_offset = 0

  def tell(self):
    return self._current_rel_offset

  def seek(self, pos, whence=os.SEEK_SET):
    if whence == os.SEEK_SET:
      self._current_rel_offset = pos
    elif whence == os.SEEK_CUR:
      self._current_rel_offset += pos
    elif whence == os.SEEK_END:
      self._current_rel_offset = self._size + pos
    else:
      raise Exception("Unknown whence: %d." % whence)

    if self._current_rel_offset < 0 or self._current_rel_offset > self._size:
      raise Exception("Seek outside file: %d." % self._current_rel_offset)

    self._stream.seek(self._offset + self._current_rel_offset)

  def read(self, size):
    size = min(self._size - self._current_rel_offset, size)
    self._current_rel_offset += size
    buf = self._stream.read(size)
    if len(buf) < size:
      raise Exception("Expected to read %d but received %d." % (size, len(buf)))
    return buf


# check for gzip errors
def test_gz(filename, offset, size, verbose=False):
  with open(filename, "rb") as f_stream:
    f = RangeFile(f_stream, offset, size)
    try:
      gz = gzip.GzipFile(fileobj=f, mode="rb")
      while True:
        buf = gz.read(4096)
        if len(buf) == 0:
          break
    except (IOError, ValueError, zlib.error) as e:
      if verbose:
        print >>sys.stderr, e
      return False
  return True


# converting a .tar with warcs to megawarc tar+warc+json
class MegawarcBuilder(object):
  def __init__(self, input_filename):
    self.verbose = False
    self.input_filename = input_filename
    self.output_warc_filename = input_filename + ".megawarc.warc.gz"
    self.output_tar_filename = input_filename + ".megawarc.tar"
    self.output_json_filename = input_filename + ".megawarc.json.gz"

  def process(self):
    with open(self.output_warc_filename, "wb") as warc_out:
      with open(self.output_tar_filename, "wb") as tar_out:
        with gzip.open(self.output_json_filename, "wb") as json_out:
          with tarfile.open(self.input_filename, "r") as tar:
            for tarinfo in tar:
              self.process_entry(tarinfo, warc_out, tar_out, json_out)

            tar_out.flush()
            padding = (tarfile.RECORDSIZE - tar_out.tell()) % tarfile.RECORDSIZE
            if padding > 0:
              tar_out.write("\0" * padding)

  def process_entry(self, entry, warc_out, tar_out, json_out):
    # calculate position of tar entry
    block_size = (tarfile.BLOCKSIZE +  # header
                  entry.size +         # data
                  (tarfile.BLOCKSIZE - entry.size) % tarfile.BLOCKSIZE)
    data_offset = entry.offset + tarfile.BLOCKSIZE
    next_offset = entry.offset + block_size

    d_src_offsets = OrderedDict()
    d_src_offsets["entry"] = entry.offset
    d_src_offsets["data"] = data_offset
    d_src_offsets["next_entry"] = next_offset

    # decide what to do with this entry
    valid_warc_gz = False
    if entry.isfile() and re.search(r"\.warc\.gz", entry.name):
      if self.verbose:
        print >>sys.stderr, "Checking %s" % entry.name
      valid_warc_gz = test_gz(self.input_filename, data_offset, entry.size, self.verbose)
      if not valid_warc_gz:
        if self.verbose:
          print >>sys.stderr, "Invalid gzip %s" % entry.name

    # save in megawarc or in tar
    d_target = OrderedDict()
    if valid_warc_gz:
      # a valid .warc.gz, add to megawarc
      warc_offset = warc_out.tell()
      if self.verbose:
        print >>sys.stderr, "Copying %s to warc" % entry.name
      copy_to_stream(warc_out, self.input_filename, data_offset, entry.size)

      d_target["container"] = "warc"
      d_target["offset"] = warc_offset
      d_target["size"] = entry.size

    else:
      # not a warc.gz file, add to tar
      tar_offset = tar_out.tell()
      if self.verbose:
        print >>sys.stderr, "Copying %s to tar" % entry.name
      copy_to_stream(tar_out, self.input_filename, entry.offset, block_size)

      d_target["container"] = "tar"
      d_target["offset"] = tar_offset
      d_target["size"] = block_size

    # store details
    d = OrderedDict()
    d["target"] = d_target
    d["src_offsets"] = d_src_offsets
    d["header_fields"] = entry.get_info("utf-8", {})
    d["header_string"] = entry.buf

    # store metadata
    json.dump(d, json_out, separators=(',', ':'))
    json_out.write("\n")


# adding .warc.gz and other files to megawarc tar+warc+json
class MegawarcPacker(object):
  def __init__(self, output_basename):
    self.verbose = False
    self.output_basename = output_basename
    self.output_warc_filename = output_basename + ".megawarc.warc.gz"
    self.output_tar_filename = output_basename + ".megawarc.tar"
    self.output_json_filename = output_basename + ".megawarc.json.gz"

    self.tar_pos = 0

  def process(self, filelist):
    with open(self.output_warc_filename, "wb") as warc_out:
      with open(self.output_tar_filename, "wb") as tar_out:
        with gzip.open(self.output_json_filename, "wb") as json_out:
          def each_file(arg, dirname, names):
            for n in names:
              n = os.path.join(dirname, n)
              if os.path.isfile(n):
                self.process_file(n, warc_out, tar_out, json_out)

          for filename in filelist:
            if os.path.isdir(filename):
              os.path.walk(filename, each_file, None)
            elif os.path.isfile(filename):
              self.process_file(filename, warc_out, tar_out, json_out)

          tar_out.flush()
          padding = (tarfile.RECORDSIZE - tar_out.tell()) % tarfile.RECORDSIZE
          if padding > 0:
            tar_out.write("\0" * padding)

  def process_file(self, filename, warc_out, tar_out, json_out):
    # make tar header
    arcname = filename
    arcname = arcname.replace(os.sep, "/")
    arcname = arcname.lstrip("/")

    entry = tarfile.TarInfo()
    statres = os.stat(filename)
    stmd = statres.st_mode
    entry.name = arcname
    entry.mode = stmd
    entry.uid = statres.st_uid
    entry.gid = statres.st_gid
    entry.size = statres.st_size
    entry.mtime = statres.st_mtime
    entry.type = tarfile.REGTYPE

    tar_header = entry.tobuf()

    # find position in imaginary tar
    entry.offset = self.tar_pos

    # calculate position of tar entry
    block_size = (len(tar_header) +    # header
                  entry.size +         # data
                  (tarfile.BLOCKSIZE - entry.size) % tarfile.BLOCKSIZE)
    data_offset = entry.offset + len(tar_header)
    next_offset = entry.offset + block_size

    # move to next position in imaginary tar
    self.tar_pos = next_offset

    d_src_offsets = OrderedDict()
    d_src_offsets["entry"] = entry.offset
    d_src_offsets["data"] = data_offset
    d_src_offsets["next_entry"] = next_offset

    # decide what to do with this file
    valid_warc_gz = False
    if re.search(r"\.warc\.gz", filename):
      if self.verbose:
        print >>sys.stderr, "Checking %s" % filename
      valid_warc_gz = test_gz(filename, 0, entry.size, self.verbose)
      if not valid_warc_gz:
        if self.verbose:
          print >>sys.stderr, "Invalid gzip %s" % filename

    # save in megawarc or in tar
    d_target = OrderedDict()
    if valid_warc_gz:
      # a valid .warc.gz, add to megawarc
      warc_offset = warc_out.tell()
      if self.verbose:
        print >>sys.stderr, "Copying %s to warc" % filename
      copy_to_stream(warc_out, filename, 0, entry.size)

      d_target["container"] = "warc"
      d_target["offset"] = warc_offset
      d_target["size"] = entry.size

    else:
      # not a warc.gz file, add to tar
      tar_offset = tar_out.tell()
      if self.verbose:
        print >>sys.stderr, "Copying %s to tar" % filename
      tar_out.write(tar_header)
      copy_to_stream(tar_out, filename, 0, entry.size)
      padding = (tarfile.BLOCKSIZE - entry.size) % tarfile.BLOCKSIZE
      if padding > 0:
        tar_out.write("\0" * padding)
      tar_out.flush()

      d_target["container"] = "tar"
      d_target["offset"] = tar_offset
      d_target["size"] = block_size

    # store details
    d = OrderedDict()
    d["target"] = d_target
    d["src_offsets"] = d_src_offsets
    d["header_fields"] = entry.get_info("utf-8", {})
    d["header_string"] = tar_header

    # store metadata
    json.dump(d, json_out, separators=(',', ':'))
    json_out.write("\n")


# recreate the original .tar from a megawarc tar+warc+json
class MegawarcRestorer(object):
  def __init__(self, output_filename):
    self.verbose = False
    self.output_filename = output_filename
    self.input_warc_filename = output_filename + ".megawarc.warc.gz"
    self.input_tar_filename = output_filename + ".megawarc.tar"
    self.input_json_filename = output_filename + ".megawarc.json.gz"

  def process(self):
    with gzip.open(self.input_json_filename, "rb") as json_in:
      with open(self.output_filename, "wb") as tar_out:
        for line in json_in:
          entry = json.loads(line)
          self.process_entry(entry, tar_out)

        tar_out.flush()
        padding = (tarfile.RECORDSIZE - tar_out.tell()) % tarfile.RECORDSIZE
        if padding > 0:
          tar_out.write("\0" * padding)

  def process_entry(self, entry, tar_out):
    if entry["target"]["container"] == "warc":
      if self.verbose:
        print >>sys.stderr, "Copying %s from warc" % entry["header_fields"]["name"]
      tar_out.write(entry["header_string"])
      copy_to_stream(tar_out, self.input_warc_filename,
                     entry["target"]["offset"], entry["target"]["size"])
      padding = (tarfile.BLOCKSIZE - entry["target"]["size"]) % tarfile.BLOCKSIZE
      if padding > 0:
        tar_out.write("\0" * padding)

    elif entry["target"]["container"] == "tar":
      if self.verbose:
        print >>sys.stderr, "Copying %s from tar" % entry["header_fields"]["name"]
      copy_to_stream(tar_out, self.input_tar_filename,
                     entry["target"]["offset"], entry["target"]["size"])
      padding = (tarfile.BLOCKSIZE - entry["target"]["size"]) % tarfile.BLOCKSIZE
      if padding > 0:
        tar_out.write("\0" * padding)

    else:
      raise Exception("Unknown container: %s for %s" %
                      (entry["target"]["container"], entry["header_fields"]["name"]))


def main():
  parser = OptionParser(
    usage=("Usage: %prog [--verbose] convert FILE\n"
           "       %prog [--verbose] pack FILE [INFILE [INFILE ...]]\n"
           "       %prog [--verbose] restore FILE"),
    description="""%prog convert FILE converts the tar file (containing .warc.gz files) to a megawarc. A megawarc has three parts: 1. a .warc.gz of the concatenated warc files; 2. a .tar with the non-warc files from the original tar; 3. a .json.gz with metadata that can be used to reconstruct the original tar.
Use %prog pack FILE INFILE ... to create a megawarc containing the files.
Use %prog restore FILE to reconstruct the original tar.
"""
  )
  parser.add_option("-v", "--verbose", dest="verbose",
                    action="store_true",
                    help="print status messages", default=False)
  (options, args) = parser.parse_args()

  if len(args) < 2:
    parser.print_usage()
    exit(1)

  if args[0] == "convert":
    if not os.path.exists(args[1]):
      print >>sys.stderr, "Input file %s does not exist." % args[1]
      exit(1)

    try:
      mwb = MegawarcBuilder(args[1])
      mwb.verbose = options.verbose
      mwb.process()
    except:
      for ext in (".megawarc.warc.gz", ".megawarc.json.gz", ".megawarc.tar"):
        if os.path.exists(args[1]+ext):
          os.unlink(args[1]+ext)
      raise

  elif args[0] == "pack":
    try:
      mwb = MegawarcPacker(args[1])
      mwb.verbose = options.verbose
      mwb.process(args[2:])
    except:
      for ext in (".megawarc.warc.gz", ".megawarc.json.gz", ".megawarc.tar"):
        if os.path.exists(args[1]+ext):
          os.unlink(args[1]+ext)
      raise

  elif args[0] == "restore":
    for ext in (".megawarc.warc.gz", ".megawarc.json.gz"):
      if not os.path.exists(args[1]+ext):
        print >>sys.stderr, "Input file %s does not exist." % (args[1] + ext)
        exit(1)
    if os.path.exists(args[1]):
      print >>sys.stderr, "Output file %s already exists." % args[1]
      exit(1)

    try:
      mwr = MegawarcRestorer(args[1])
      mwr.verbose = options.verbose
      mwr.process()
    except:
      if os.path.exists(args[1]):
        os.unlink(args[1])
      raise

  else:
    parser.print_usage()
    exit(1)


if __name__ == "__main__":
  main()