You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

458 lines
15 KiB

  1. #!/usr/bin/env python
  2. """
  3. megawarc is useful if you have .tar full of .warc.gz files and
  4. you really want one big .warc.gz. With megawarc you get your
  5. .warc.gz, but you can still restore the original .tar.
  6. The megawarc tool looks for .warc.gz in the .tar file and
  7. creates three files, the megawarc:
  8. FILE.warc.gz is the concatenated .warc.gz
  9. FILE.tar contains any non-warc files from the .tar
  10. FILE.json.gz contains metadata
  11. You need the JSON file to reconstruct the original .tar from
  12. the .warc.gz and .tar files. The JSON file has the location
  13. of every file from the original .tar file.
  14. METADATA FORMAT
  15. ---------------
  16. One line with a JSON object per file in the .tar.
  17. {
  18. "target": {
  19. "container": "warc" or "tar", (where is this file?)
  20. "offset": number, (where in the tar/warc does this
  21. file start? for files in the tar
  22. this includes the tar header,
  23. which is copied to the tar.)
  24. "size": size (where does this file end?
  25. for files in the tar, this includes
  26. the padding to 512 bytes)
  27. },
  28. "src_offsets": {
  29. "entry": number, (where is this file in the original tar?)
  30. "data": number, (where does the data start? entry+512)
  31. "next_entry": number (where does the next tar entry start)
  32. },
  33. "header_fields": {
  34. ... (parsed fields from the tar header)
  35. },
  36. "header_string": string (the tar header for this entry)
  37. }
  38. USAGE
  39. -----
  40. megawarc convert FILE
  41. Converts the tar file (containing .warc.gz files) to a megawarc.
  42. It creates FILE.warc.gz, FILE.tar and FILE.json.gz from FILE.
  43. megawarc pack FILE INFILE_1 [[INFILE_2] ...]
  44. Creates a megawarc with basename FILE and recursively adds the
  45. given files and directories to it, as if they were in a tar file.
  46. It creates FILE.warc.gz, FILE.tar and FILE.json.gz.
  47. megawarc restore FILE
  48. Converts the megawarc back to the original tar.
  49. It reads FILE.warc.gz, FILE.tar and FILE.json.gz to make FILE.
  50. """
  51. import gzip
  52. import json
  53. import os.path
  54. import re
  55. import sys
  56. import tarfile
  57. import zlib
  58. from optparse import OptionParser
  59. try:
  60. from collections import OrderedDict
  61. except ImportError:
  62. from ordereddict import OrderedDict
  63. # modify tarfile.TarInfo to keep the original tar headers
  64. tarfile.TarInfo.orig_frombuf = tarfile.TarInfo.frombuf
  65. @classmethod
  66. def keepbuf_frombuf(cls, buf):
  67. entry = cls.orig_frombuf(buf)
  68. entry.buf = buf
  69. return entry
  70. tarfile.TarInfo.frombuf = keepbuf_frombuf
  71. # open input_filename and write the data from offset to
  72. # (offset+size) to stream
  73. def copy_to_stream(stream, input_filename, offset, size):
  74. with open(input_filename, "r") as f:
  75. f.seek(offset)
  76. to_read = size
  77. while to_read > 0:
  78. buf_size = min(to_read, 4096)
  79. buf = f.read(buf_size)
  80. if len(buf) < buf_size:
  81. raise Exception("End of file: %d bytes expected, but %d bytes read." % (buf_size, len(buf)))
  82. stream.write(buf)
  83. to_read -= len(buf)
  84. stream.flush()
  85. # converting a .tar with warcs to megawarc tar+warc+json
  86. class MegawarcBuilder(object):
  87. def __init__(self, input_filename):
  88. self.verbose = False
  89. self.input_filename = input_filename
  90. self.output_warc_filename = input_filename + ".megawarc.warc.gz"
  91. self.output_tar_filename = input_filename + ".megawarc.tar"
  92. self.output_json_filename = input_filename + ".megawarc.json.gz"
  93. def process(self):
  94. with open(self.output_warc_filename, "wb") as warc_out:
  95. with open(self.output_tar_filename, "wb") as tar_out:
  96. with gzip.open(self.output_json_filename, "wb") as json_out:
  97. with tarfile.open(self.input_filename, "r") as tar:
  98. for tarinfo in tar:
  99. self.process_entry(tarinfo, warc_out, tar_out, json_out)
  100. tar_out.flush()
  101. padding = (tarfile.RECORDSIZE - tar_out.tell()) % tarfile.RECORDSIZE
  102. if padding > 0:
  103. tar_out.write("\0" * padding)
  104. def test_gz(self, offset, size):
  105. with open(self.input_filename, "r") as f:
  106. z = zlib.decompressobj(15 + 32)
  107. f.seek(offset)
  108. to_read = size
  109. while to_read > 0:
  110. buf_size = min(to_read, 4096)
  111. buf = f.read(buf_size)
  112. if len(buf) < buf_size:
  113. # end of file, not a valid gz
  114. return False
  115. else:
  116. z.decompress(buf)
  117. to_read -= len(buf)
  118. if z.flush()!="":
  119. # remaining uncompressed data
  120. return False
  121. return True
  122. def process_entry(self, entry, warc_out, tar_out, json_out):
  123. # calculate position of tar entry
  124. block_size = (tarfile.BLOCKSIZE + # header
  125. entry.size + # data
  126. (tarfile.BLOCKSIZE - entry.size) % tarfile.BLOCKSIZE)
  127. data_offset = entry.offset + tarfile.BLOCKSIZE
  128. next_offset = entry.offset + block_size
  129. d_src_offsets = OrderedDict()
  130. d_src_offsets["entry"] = entry.offset
  131. d_src_offsets["data"] = data_offset
  132. d_src_offsets["next_entry"] = next_offset
  133. # decide what to do with this entry
  134. valid_warc_gz = False
  135. if entry.isfile() and re.search(r"\.warc\.gz", entry.name):
  136. if self.verbose:
  137. print >>sys.stderr, "Checking %s" % entry.name
  138. valid_warc_gz = self.test_gz(data_offset, entry.size)
  139. if not valid_warc_gz:
  140. if self.verbose:
  141. print >>sys.stderr, "Invalid gzip %s" % entry.name
  142. # save in megawarc or in tar
  143. d_target = OrderedDict()
  144. if valid_warc_gz:
  145. # a warc file.gz, add to megawarc
  146. warc_offset = warc_out.tell()
  147. if self.verbose:
  148. print >>sys.stderr, "Copying %s to warc" % entry.name
  149. copy_to_stream(warc_out, self.input_filename, data_offset, entry.size)
  150. d_target["container"] = "warc"
  151. d_target["offset"] = warc_offset
  152. d_target["size"] = entry.size
  153. else:
  154. # not a warc.gz file, add to tar
  155. tar_offset = tar_out.tell()
  156. if self.verbose:
  157. print >>sys.stderr, "Copying %s to tar" % entry.name
  158. copy_to_stream(tar_out, self.input_filename, entry.offset, block_size)
  159. d_target["container"] = "tar"
  160. d_target["offset"] = tar_offset
  161. d_target["size"] = block_size
  162. # store details
  163. d = OrderedDict()
  164. d["target"] = d_target
  165. d["src_offsets"] = d_src_offsets
  166. d["header_fields"] = entry.get_info("utf-8", {})
  167. d["header_string"] = entry.buf
  168. # store metadata
  169. json.dump(d, json_out, separators=(',', ':'))
  170. json_out.write("\n")
  171. # adding .warc.gz and other files to megawarc tar+warc+json
  172. class MegawarcPacker(object):
  173. def __init__(self, output_basename):
  174. self.verbose = False
  175. self.output_basename = output_basename
  176. self.output_warc_filename = output_basename + ".megawarc.warc.gz"
  177. self.output_tar_filename = output_basename + ".megawarc.tar"
  178. self.output_json_filename = output_basename + ".megawarc.json.gz"
  179. self.tar_pos = 0
  180. def process(self, filelist):
  181. with open(self.output_warc_filename, "wb") as warc_out:
  182. with open(self.output_tar_filename, "wb") as tar_out:
  183. with gzip.open(self.output_json_filename, "wb") as json_out:
  184. def each_file(arg, dirname, names):
  185. for n in names:
  186. n = os.path.join(dirname, n)
  187. if os.path.isfile(n):
  188. self.process_file(n, warc_out, tar_out, json_out)
  189. for filename in filelist:
  190. if os.path.isdir(filename):
  191. os.path.walk(filename, each_file, None)
  192. elif os.path.isfile(filename):
  193. self.process_file(filename, warc_out, tar_out, json_out)
  194. tar_out.flush()
  195. padding = (tarfile.RECORDSIZE - tar_out.tell()) % tarfile.RECORDSIZE
  196. if padding > 0:
  197. tar_out.write("\0" * padding)
  198. def test_gz(self, filename, size):
  199. with open(filename, "r") as f:
  200. z = zlib.decompressobj(15 + 32)
  201. to_read = size
  202. while to_read > 0:
  203. buf_size = min(to_read, 4096)
  204. buf = f.read(buf_size)
  205. if len(buf) < buf_size:
  206. # end of file, not a valid gz
  207. return False
  208. else:
  209. z.decompress(buf)
  210. to_read -= len(buf)
  211. if z.flush()!="":
  212. # remaining uncompressed data
  213. return False
  214. return True
  215. def process_file(self, filename, warc_out, tar_out, json_out):
  216. # make tar header
  217. arcname = filename
  218. arcname = arcname.replace(os.sep, "/")
  219. arcname = arcname.lstrip("/")
  220. entry = tarfile.TarInfo()
  221. statres = os.stat(filename)
  222. stmd = statres.st_mode
  223. entry.name = arcname
  224. entry.mode = stmd
  225. entry.uid = statres.st_uid
  226. entry.gid = statres.st_gid
  227. entry.size = statres.st_size
  228. entry.mtime = statres.st_mtime
  229. entry.type = tarfile.REGTYPE
  230. # find position in imaginary tar
  231. entry.offset = self.tar_pos
  232. # calculate position of tar entry
  233. block_size = (tarfile.BLOCKSIZE + # header
  234. entry.size + # data
  235. (tarfile.BLOCKSIZE - entry.size) % tarfile.BLOCKSIZE)
  236. data_offset = entry.offset + tarfile.BLOCKSIZE
  237. next_offset = entry.offset + block_size
  238. # move to next position in imaginary tar
  239. self.tar_pos = next_offset
  240. d_src_offsets = OrderedDict()
  241. d_src_offsets["entry"] = entry.offset
  242. d_src_offsets["data"] = data_offset
  243. d_src_offsets["next_entry"] = next_offset
  244. # decide what to do with this file
  245. valid_warc_gz = False
  246. if re.search(r"\.warc\.gz", filename):
  247. if self.verbose:
  248. print >>sys.stderr, "Checking %s" % filename
  249. valid_warc_gz = self.test_gz(filename, entry.size)
  250. if not valid_warc_gz:
  251. if self.verbose:
  252. print >>sys.stderr, "Invalid gzip %s" % filename
  253. # save in megawarc or in tar
  254. d_target = OrderedDict()
  255. if valid_warc_gz:
  256. # a warc file.gz, add to megawarc
  257. warc_offset = warc_out.tell()
  258. if self.verbose:
  259. print >>sys.stderr, "Copying %s to warc" % filename
  260. copy_to_stream(warc_out, filename, 0, entry.size)
  261. d_target["container"] = "warc"
  262. d_target["offset"] = warc_offset
  263. d_target["size"] = entry.size
  264. else:
  265. # not a warc.gz file, add to tar
  266. tar_offset = tar_out.tell()
  267. if self.verbose:
  268. print >>sys.stderr, "Copying %s to tar" % filename
  269. tar_out.write(entry.tobuf())
  270. copy_to_stream(tar_out, filename, 0, entry.size)
  271. padding = (tarfile.BLOCKSIZE - entry.size) % tarfile.BLOCKSIZE
  272. if padding > 0:
  273. tar_out.write("\0" * padding)
  274. tar_out.flush()
  275. d_target["container"] = "tar"
  276. d_target["offset"] = tar_offset
  277. d_target["size"] = block_size
  278. # store details
  279. d = OrderedDict()
  280. d["target"] = d_target
  281. d["src_offsets"] = d_src_offsets
  282. d["header_fields"] = entry.get_info("utf-8", {})
  283. d["header_string"] = entry.tobuf()
  284. # store metadata
  285. json.dump(d, json_out, separators=(',', ':'))
  286. json_out.write("\n")
  287. # recreate the original .tar from a megawarc tar+warc+json
  288. class MegawarcRestorer(object):
  289. def __init__(self, output_filename):
  290. self.verbose = False
  291. self.output_filename = output_filename
  292. self.input_warc_filename = output_filename + ".megawarc.warc.gz"
  293. self.input_tar_filename = output_filename + ".megawarc.tar"
  294. self.input_json_filename = output_filename + ".megawarc.json.gz"
  295. def process(self):
  296. with gzip.open(self.input_json_filename, "rb") as json_in:
  297. with open(self.output_filename, "wb") as tar_out:
  298. for line in json_in:
  299. entry = json.loads(line)
  300. self.process_entry(entry, tar_out)
  301. tar_out.flush()
  302. padding = (tarfile.RECORDSIZE - tar_out.tell()) % tarfile.RECORDSIZE
  303. if padding > 0:
  304. tar_out.write("\0" * padding)
  305. def process_entry(self, entry, tar_out):
  306. if entry["target"]["container"] == "warc":
  307. if self.verbose:
  308. print >>sys.stderr, "Copying %s from warc" % entry["header_fields"]["name"]
  309. tar_out.write(entry["header_string"])
  310. copy_to_stream(tar_out, self.input_warc_filename,
  311. entry["target"]["offset"], entry["target"]["size"])
  312. padding = (tarfile.BLOCKSIZE - entry["target"]["size"]) % tarfile.BLOCKSIZE
  313. if padding > 0:
  314. tar_out.write("\0" * padding)
  315. elif entry["target"]["container"] == "tar":
  316. if self.verbose:
  317. print >>sys.stderr, "Copying %s from tar" % entry["header_fields"]["name"]
  318. copy_to_stream(tar_out, self.input_tar_filename,
  319. entry["target"]["offset"], entry["target"]["size"])
  320. padding = (tarfile.BLOCKSIZE - entry["target"]["size"]) % tarfile.BLOCKSIZE
  321. if padding > 0:
  322. tar_out.write("\0" * padding)
  323. else:
  324. raise Exception("Unkown container: %s for %s" %
  325. (entry["target"]["container"], entry["header_fields"]["name"]))
  326. def main():
  327. parser = OptionParser(
  328. usage="Usage: %prog [--verbose] convert FILE\n %prog [--verbose] pack FILE [INFILE [INFILE ...]]\n %prog [--verbose] restore FILE",
  329. description="""%prog convert FILE converts the tar file (containing .warc.gz files) to a megawarc. A megawarc has three parts: 1. a .warc.gz of the concatenated warc files; 2. a .tar with the non-warc files from the original tar; 3. a .json.gz with metadata that can be used to reconstruct the original tar.
  330. Use %prog pack FILE INFILE ... to create a megawarc containing the files.
  331. Use %prog restore FILE to reconstruct original tar.
  332. """
  333. )
  334. parser.add_option("-v", "--verbose", dest="verbose",
  335. action="store_true",
  336. help="print status messages", default=False)
  337. (options, args) = parser.parse_args()
  338. if len(args) < 2:
  339. parser.print_usage()
  340. exit(1)
  341. if args[0] == "convert":
  342. if not os.path.exists(args[1]):
  343. print >>sys.stderr, "Input file %s does not exist." % args[1]
  344. exit(1)
  345. try:
  346. mwb = MegawarcBuilder(args[1])
  347. mwb.verbose = options.verbose
  348. mwb.process()
  349. except:
  350. for ext in (".megawarc.warc.gz", ".megawarc.json.gz", ".megawarc.tar"):
  351. if os.path.exists(args[1]+ext):
  352. os.unlink(args[1]+ext)
  353. raise
  354. elif args[0] == "pack":
  355. try:
  356. mwb = MegawarcPacker(args[1])
  357. mwb.verbose = options.verbose
  358. mwb.process(args[2:])
  359. except:
  360. for ext in (".megawarc.warc.gz", ".megawarc.json.gz", ".megawarc.tar"):
  361. if os.path.exists(args[1]+ext):
  362. os.unlink(args[1]+ext)
  363. raise
  364. elif args[0] == "restore":
  365. for ext in (".megawarc.warc.gz", ".megawarc.json.gz"):
  366. if not os.path.exists(args[1]+ext):
  367. print >>sys.stderr, "Input file %s does not exist." % (args[1] + ext)
  368. exit(1)
  369. if os.path.exists(args[1]):
  370. print >>sys.stderr, "Outputfile %s already exists." % args[1]
  371. exit(1)
  372. try:
  373. mwr = MegawarcRestorer(args[1])
  374. mwr.verbose = options.verbose
  375. mwr.process()
  376. except:
  377. if os.path.exists(args[1]):
  378. os.unlink(args[1])
  379. raise
  380. else:
  381. parser.print_usage()
  382. exit(1)
  383. if __name__ == "__main__":
  384. main()