#!/usr/bin/env python
"""
megawarc is useful if you have a .tar full of .warc.gz files and
you really want one big .warc.gz. With megawarc you get your
.warc.gz, but you can still restore the original .tar.

The megawarc tool looks for .warc.gz in the .tar file and
creates three files, the megawarc:
  FILE.warc.gz   is the concatenated .warc.gz
  FILE.tar       contains any non-warc files from the .tar
  FILE.json.gz   contains metadata

You need the JSON file to reconstruct the original .tar from
the .warc.gz and .tar files. The JSON file has the location
of every file from the original .tar file.


METADATA FORMAT
---------------
One line with a JSON object per file in the .tar.
{
   "target": {
     "container": "warc" or "tar", (where is this file?)
     "offset": number,             (where in the tar/warc does this
                                    file start? for files in the tar
                                    this includes the tar header,
                                    which is copied to the tar.)
     "size": size                  (where does this file end?
                                    for files in the tar, this includes
                                    the padding to 512 bytes)
   },
   "src_offsets": {
     "entry": number,        (where is this file in the original tar?)
     "data": number,         (where does the data start? entry+512)
     "next_entry": number    (where does the next tar entry start)
   },
   "header_fields": {
     ...                     (parsed fields from the tar header)
   },
   "header_string": string   (the tar header for this entry)
}


USAGE
-----
megawarc convert FILE
  Converts the tar file (containing .warc.gz files) to a megawarc.
  It creates FILE.warc.gz, FILE.tar and FILE.json.gz from FILE.

megawarc pack FILE INFILE_1 [[INFILE_2] ...]
  Creates a megawarc with basename FILE and recursively adds the
  given files and directories to it, as if they were in a tar file.
  It creates FILE.warc.gz, FILE.tar and FILE.json.gz.

megawarc restore FILE
  Converts the megawarc back to the original tar.
  It reads FILE.warc.gz, FILE.tar and FILE.json.gz to make FILE.
"""
import gzip
import json
import os.path
import re
import sys
import tarfile

from optparse import OptionParser
try:
    from collections import OrderedDict
except ImportError:
    from ordereddict import OrderedDict

# modify tarfile.TarInfo to keep the original tar headers
tarfile.TarInfo.orig_frombuf = tarfile.TarInfo.frombuf

@classmethod
def keepbuf_frombuf(cls, buf):
    entry = cls.orig_frombuf(buf)
    entry.buf = buf
    return entry

tarfile.TarInfo.frombuf = keepbuf_frombuf
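# With the patch above, every TarInfo yielded while iterating the source tar
# carries its raw 512-byte header block in entry.buf; MegawarcBuilder stores
# that buffer verbatim as "header_string", which is what lets `restore`
# rebuild a byte-identical tar later.
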
# open input_filename and write the data from offset to
# (offset+size) to stream
def copy_to_stream(stream, input_filename, offset, size):
    with open(input_filename, "r") as f:
        f.seek(offset)

        to_read = size
        while to_read > 0:
            buf_size = min(to_read, 4096)
            buf = f.read(buf_size)
            if len(buf) < buf_size:
                raise Exception("End of file: %d bytes expected, but %d bytes read." % (buf_size, len(buf)))
            stream.write(buf)
            to_read -= len(buf)

    stream.flush()
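# Usage sketch (hypothetical names): copy the 1024 bytes that start at offset
# 512 of "example.tar" to an already-open output stream:
#
#   copy_to_stream(out_stream, "example.tar", 512, 1024)
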
# part of a stream as a file
# (seek relative to an offset)
class RangeFile(object):
    def __init__(self, stream, offset, size):
        self._stream = stream
        self._offset = offset
        self._size = size

        self._current_rel_offset = 0

    def tell(self):
        return self._current_rel_offset

    def seek(self, pos, whence=os.SEEK_SET):
        if whence == os.SEEK_SET:
            self._current_rel_offset = pos
        elif whence == os.SEEK_CUR:
            self._current_rel_offset += pos
        elif whence == os.SEEK_END:
            self._current_rel_offset = self._size + pos
        else:
            raise Exception("Unknown whence: %d." % whence)
        if self._current_rel_offset < 0 or self._current_rel_offset > self._size:
            raise Exception("Seek outside file: %d." % self._current_rel_offset)
        self._stream.seek(self._offset + self._current_rel_offset)

    def read(self, size):
        size = min(self._size - self._current_rel_offset, size)
        self._current_rel_offset += size
        buf = self._stream.read(size)
        if len(buf) < size:
            raise Exception("Expected to read %d but received %d." % (size, len(buf)))
        return buf
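# Usage sketch (hypothetical filename and offsets): expose bytes 1024..2047 of
# an open file as a small file-like object and hand it to gzip, which is
# exactly how test_gz() below checks embedded .warc.gz members:
#
#   with open("example.tar", "r") as f:
#       member = RangeFile(f, 1024, 1024)
#       gz = gzip.GzipFile(fileobj=member, mode="rb")
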
# check for gzip errors
def test_gz(filename, offset, size, verbose=False):
    with open(filename, "r") as f_stream:
        f = RangeFile(f_stream, offset, size)
        try:
            gz = gzip.GzipFile(fileobj=f, mode="rb")
            while True:
                buf = gz.read(4096)
                if len(buf) == 0:
                    break
        except (IOError, ValueError) as e:
            if verbose:
                print >>sys.stderr, e
            return False
    return True
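# Note: test_gz returns True only if the whole range decompresses without an
# IOError or ValueError; anything that fails this check is treated as a
# non-warc file and ends up in the .megawarc.tar container instead.
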
# converting a .tar with warcs to megawarc tar+warc+json
class MegawarcBuilder(object):
    def __init__(self, input_filename):
        self.verbose = False
        self.input_filename = input_filename
        self.output_warc_filename = input_filename + ".megawarc.warc.gz"
        self.output_tar_filename = input_filename + ".megawarc.tar"
        self.output_json_filename = input_filename + ".megawarc.json.gz"

    def process(self):
        with open(self.output_warc_filename, "wb") as warc_out:
            with open(self.output_tar_filename, "wb") as tar_out:
                with gzip.open(self.output_json_filename, "wb") as json_out:
                    with tarfile.open(self.input_filename, "r") as tar:
                        for tarinfo in tar:
                            self.process_entry(tarinfo, warc_out, tar_out, json_out)

                        tar_out.flush()
                        padding = (tarfile.RECORDSIZE - tar_out.tell()) % tarfile.RECORDSIZE
                        if padding > 0:
                            tar_out.write("\0" * padding)

    def process_entry(self, entry, warc_out, tar_out, json_out):
        # calculate position of tar entry
        block_size = (tarfile.BLOCKSIZE +  # header
                      entry.size +         # data
                      (tarfile.BLOCKSIZE - entry.size) % tarfile.BLOCKSIZE)
        data_offset = entry.offset + tarfile.BLOCKSIZE
        next_offset = entry.offset + block_size

        d_src_offsets = OrderedDict()
        d_src_offsets["entry"] = entry.offset
        d_src_offsets["data"] = data_offset
        d_src_offsets["next_entry"] = next_offset

        # decide what to do with this entry
        valid_warc_gz = False
        if entry.isfile() and re.search(r"\.warc\.gz", entry.name):
            if self.verbose:
                print >>sys.stderr, "Checking %s" % entry.name
            valid_warc_gz = test_gz(self.input_filename, data_offset, entry.size, self.verbose)
            if not valid_warc_gz:
                if self.verbose:
                    print >>sys.stderr, "Invalid gzip %s" % entry.name

        # save in megawarc or in tar
        d_target = OrderedDict()
        if valid_warc_gz:
            # a warc file.gz, add to megawarc
            warc_offset = warc_out.tell()
            if self.verbose:
                print >>sys.stderr, "Copying %s to warc" % entry.name
            copy_to_stream(warc_out, self.input_filename, data_offset, entry.size)

            d_target["container"] = "warc"
            d_target["offset"] = warc_offset
            d_target["size"] = entry.size

        else:
            # not a warc.gz file, add to tar
            tar_offset = tar_out.tell()
            if self.verbose:
                print >>sys.stderr, "Copying %s to tar" % entry.name
            copy_to_stream(tar_out, self.input_filename, entry.offset, block_size)

            d_target["container"] = "tar"
            d_target["offset"] = tar_offset
            d_target["size"] = block_size

        # store details
        d = OrderedDict()
        d["target"] = d_target
        d["src_offsets"] = d_src_offsets
        d["header_fields"] = entry.get_info("utf-8", {})
        d["header_string"] = entry.buf

        # store metadata
        json.dump(d, json_out, separators=(',', ':'))
        json_out.write("\n")
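# Usage sketch, equivalent to `megawarc convert upload.tar` ("upload.tar" is a
# hypothetical input; main() below does the same thing plus cleanup on error):
#
#   mwb = MegawarcBuilder("upload.tar")
#   mwb.verbose = True
#   mwb.process()   # writes upload.tar.megawarc.warc.gz, .megawarc.tar, .megawarc.json.gz
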
# adding .warc.gz and other files to megawarc tar+warc+json
class MegawarcPacker(object):
    def __init__(self, output_basename):
        self.verbose = False

        self.output_basename = output_basename
        self.output_warc_filename = output_basename + ".megawarc.warc.gz"
        self.output_tar_filename = output_basename + ".megawarc.tar"
        self.output_json_filename = output_basename + ".megawarc.json.gz"

        self.tar_pos = 0

    def process(self, filelist):
        with open(self.output_warc_filename, "wb") as warc_out:
            with open(self.output_tar_filename, "wb") as tar_out:
                with gzip.open(self.output_json_filename, "wb") as json_out:
                    def each_file(arg, dirname, names):
                        for n in names:
                            n = os.path.join(dirname, n)
                            if os.path.isfile(n):
                                self.process_file(n, warc_out, tar_out, json_out)

                    for filename in filelist:
                        if os.path.isdir(filename):
                            os.path.walk(filename, each_file, None)
                        elif os.path.isfile(filename):
                            self.process_file(filename, warc_out, tar_out, json_out)

                    tar_out.flush()
                    padding = (tarfile.RECORDSIZE - tar_out.tell()) % tarfile.RECORDSIZE
                    if padding > 0:
                        tar_out.write("\0" * padding)

    def process_file(self, filename, warc_out, tar_out, json_out):
        # make tar header
        arcname = filename
        arcname = arcname.replace(os.sep, "/")
        arcname = arcname.lstrip("/")

        entry = tarfile.TarInfo()
        statres = os.stat(filename)
        stmd = statres.st_mode
        entry.name = arcname
        entry.mode = stmd
        entry.uid = statres.st_uid
        entry.gid = statres.st_gid
        entry.size = statres.st_size
        entry.mtime = statres.st_mtime
        entry.type = tarfile.REGTYPE

        # find position in imaginary tar
        entry.offset = self.tar_pos

        # calculate position of tar entry
        block_size = (tarfile.BLOCKSIZE +  # header
                      entry.size +         # data
                      (tarfile.BLOCKSIZE - entry.size) % tarfile.BLOCKSIZE)
        data_offset = entry.offset + tarfile.BLOCKSIZE
        next_offset = entry.offset + block_size

        # move to next position in imaginary tar
        self.tar_pos = next_offset

        d_src_offsets = OrderedDict()
        d_src_offsets["entry"] = entry.offset
        d_src_offsets["data"] = data_offset
        d_src_offsets["next_entry"] = next_offset

        # decide what to do with this file
        valid_warc_gz = False
        if re.search(r"\.warc\.gz", filename):
            if self.verbose:
                print >>sys.stderr, "Checking %s" % filename
            valid_warc_gz = test_gz(filename, 0, entry.size, self.verbose)
            if not valid_warc_gz:
                if self.verbose:
                    print >>sys.stderr, "Invalid gzip %s" % filename

        # save in megawarc or in tar
        d_target = OrderedDict()
        if valid_warc_gz:
            # a warc file.gz, add to megawarc
            warc_offset = warc_out.tell()
            if self.verbose:
                print >>sys.stderr, "Copying %s to warc" % filename
            copy_to_stream(warc_out, filename, 0, entry.size)

            d_target["container"] = "warc"
            d_target["offset"] = warc_offset
            d_target["size"] = entry.size

        else:
            # not a warc.gz file, add to tar
            tar_offset = tar_out.tell()
            if self.verbose:
                print >>sys.stderr, "Copying %s to tar" % filename
            tar_out.write(entry.tobuf())
            copy_to_stream(tar_out, filename, 0, entry.size)
            padding = (tarfile.BLOCKSIZE - entry.size) % tarfile.BLOCKSIZE
            if padding > 0:
                tar_out.write("\0" * padding)
            tar_out.flush()

            d_target["container"] = "tar"
            d_target["offset"] = tar_offset
            d_target["size"] = block_size

        # store details
        d = OrderedDict()
        d["target"] = d_target
        d["src_offsets"] = d_src_offsets
        d["header_fields"] = entry.get_info("utf-8", {})
        d["header_string"] = entry.tobuf()

        # store metadata
        json.dump(d, json_out, separators=(',', ':'))
        json_out.write("\n")
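# Usage sketch, equivalent to `megawarc pack upload dir1 extra.warc.gz`
# (hypothetical names): directories are walked recursively and every regular
# file is recorded as if it sat at self.tar_pos in an imaginary tar:
#
#   mwp = MegawarcPacker("upload")
#   mwp.verbose = True
#   mwp.process(["dir1", "extra.warc.gz"])
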
# recreate the original .tar from a megawarc tar+warc+json
class MegawarcRestorer(object):
    def __init__(self, output_filename):
        self.verbose = False
        self.output_filename = output_filename
        self.input_warc_filename = output_filename + ".megawarc.warc.gz"
        self.input_tar_filename = output_filename + ".megawarc.tar"
        self.input_json_filename = output_filename + ".megawarc.json.gz"

    def process(self):
        with gzip.open(self.input_json_filename, "rb") as json_in:
            with open(self.output_filename, "wb") as tar_out:
                for line in json_in:
                    entry = json.loads(line)
                    self.process_entry(entry, tar_out)

                tar_out.flush()
                padding = (tarfile.RECORDSIZE - tar_out.tell()) % tarfile.RECORDSIZE
                if padding > 0:
                    tar_out.write("\0" * padding)

    def process_entry(self, entry, tar_out):
        if entry["target"]["container"] == "warc":
            if self.verbose:
                print >>sys.stderr, "Copying %s from warc" % entry["header_fields"]["name"]
            tar_out.write(entry["header_string"])
            copy_to_stream(tar_out, self.input_warc_filename,
                           entry["target"]["offset"], entry["target"]["size"])
            padding = (tarfile.BLOCKSIZE - entry["target"]["size"]) % tarfile.BLOCKSIZE
            if padding > 0:
                tar_out.write("\0" * padding)

        elif entry["target"]["container"] == "tar":
            if self.verbose:
                print >>sys.stderr, "Copying %s from tar" % entry["header_fields"]["name"]
            copy_to_stream(tar_out, self.input_tar_filename,
                           entry["target"]["offset"], entry["target"]["size"])
            padding = (tarfile.BLOCKSIZE - entry["target"]["size"]) % tarfile.BLOCKSIZE
            if padding > 0:
                tar_out.write("\0" * padding)

        else:
            raise Exception("Unknown container: %s for %s" %
                            (entry["target"]["container"], entry["header_fields"]["name"]))
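# Usage sketch, equivalent to `megawarc restore upload.tar` (hypothetical name):
#
#   mwr = MegawarcRestorer("upload.tar")
#   mwr.process()   # reads upload.tar.megawarc.{warc.gz,tar,json.gz}, writes upload.tar
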
def main():
    parser = OptionParser(
        usage="Usage: %prog [--verbose] convert FILE\n       %prog [--verbose] pack FILE [INFILE [INFILE ...]]\n       %prog [--verbose] restore FILE",
        description="""%prog convert FILE converts the tar file (containing .warc.gz files) to a megawarc. A megawarc has three parts: 1. a .warc.gz of the concatenated warc files; 2. a .tar with the non-warc files from the original tar; 3. a .json.gz with metadata that can be used to reconstruct the original tar.
Use %prog pack FILE INFILE ... to create a megawarc containing the files.
Use %prog restore FILE to reconstruct the original tar.
"""
    )
    parser.add_option("-v", "--verbose", dest="verbose",
                      action="store_true",
                      help="print status messages", default=False)
    (options, args) = parser.parse_args()

    if len(args) < 2:
        parser.print_usage()
        exit(1)

    if args[0] == "convert":
        if not os.path.exists(args[1]):
            print >>sys.stderr, "Input file %s does not exist." % args[1]
            exit(1)

        try:
            mwb = MegawarcBuilder(args[1])
            mwb.verbose = options.verbose
            mwb.process()
        except:
            for ext in (".megawarc.warc.gz", ".megawarc.json.gz", ".megawarc.tar"):
                if os.path.exists(args[1] + ext):
                    os.unlink(args[1] + ext)
            raise

    elif args[0] == "pack":
        try:
            mwb = MegawarcPacker(args[1])
            mwb.verbose = options.verbose
            mwb.process(args[2:])
        except:
            for ext in (".megawarc.warc.gz", ".megawarc.json.gz", ".megawarc.tar"):
                if os.path.exists(args[1] + ext):
                    os.unlink(args[1] + ext)
            raise

    elif args[0] == "restore":
        for ext in (".megawarc.warc.gz", ".megawarc.json.gz"):
            if not os.path.exists(args[1] + ext):
                print >>sys.stderr, "Input file %s does not exist." % (args[1] + ext)
                exit(1)
        if os.path.exists(args[1]):
            print >>sys.stderr, "Output file %s already exists." % args[1]
            exit(1)

        try:
            mwr = MegawarcRestorer(args[1])
            mwr.verbose = options.verbose
            mwr.process()
        except:
            if os.path.exists(args[1]):
                os.unlink(args[1])
            raise

    else:
        parser.print_usage()
        exit(1)

if __name__ == "__main__":
    main()