From 85a718f70518e52df106ca95c947199264a37211 Mon Sep 17 00:00:00 2001
From: George Smyrnis
Date: Fri, 8 Mar 2024 13:18:28 -0600
Subject: [PATCH 1/7] Add support for pretokenized tars.

---
 .../datapreprocess/ray/tokenize_shuffle.py | 34 ++++++++++++++++---
 1 file changed, 29 insertions(+), 5 deletions(-)

diff --git a/open_lm/datapreprocess/ray/tokenize_shuffle.py b/open_lm/datapreprocess/ray/tokenize_shuffle.py
index 665dbbb5..1b8ea290 100644
--- a/open_lm/datapreprocess/ray/tokenize_shuffle.py
+++ b/open_lm/datapreprocess/ray/tokenize_shuffle.py
@@ -90,6 +90,7 @@ class RawFileType(enum.Enum):
     ZSTD_JSONL_COMPRESSED = 2
     GZIP_JSONL_COMPRESSED = 3
     TAR = 4
+    TAR_PRETOK = 5
     UNKNOWN = -1
 
 
@@ -118,13 +119,17 @@ def tar_reader(fh: BinaryIO, content_key: str):
     """
     content_key: where in the tarfile to find the text/tokens. Options:
         "txt" - read text file as string
+        "json" - read json file
         "json:key" - read json[key] as string
+        "json.gz" - same as json, but gzipped
+        "json.gz:key" - same as json:key, but gzipped
        "npy" - read numpy array as tokens
     """
+    # TODO(gsmyrnis) - it is unclear whether some of these modes (namely npy) are still useful - consider
+    # removing them in the future.
     content_ext = content_key.split(":")[0]
     buffer = io.BytesIO(fh.read())
     with tarfile.open(fileobj=buffer, mode="r") as tar:
-        samples = []
         for member in tar.getmembers():
             if member.isfile() and member.name.endswith(f".{content_ext}"):
                 with tar.extractfile(member) as fileobj:
@@ -132,8 +137,20 @@
                     if content_ext == "txt":
                         content = fileobj.read().decode("utf-8")
                     elif content_ext == "json":
-                        json_dict, json_key = json.load(fileobj), content_key.split(":")[1]
-                        content = json_dict[json_key]
+                        json_data = json.load(fileobj)
+                        if isinstance(json_data, dict):
+                            json_key = content_key.split(":")[1]
+                            content = json_data[json_key]
+                        else:
+                            content = json_data
+                    elif content_ext == "json.gz":
+                        with gzip.open(fileobj, "rb") as fileobj_unzip:
+                            json_data = json.load(fileobj_unzip)
+                        if isinstance(json_data, dict):
+                            json_key = content_key.split(":")[1]
+                            content = json_data[json_key]
+                        else:
+                            content = json_data
                     elif content_ext == "npy":
                         token_array = np.load(io.BytesIO(fileobj.read()), allow_pickle=True)
                         content = token_array.reshape(-1).tolist()
@@ -256,6 +273,7 @@ def preprocess(
     do_sample: bool = False,
     sources: enum.Enum = None,
     source_counter: GlobalCounter = None,
+    pretok_tars: bool = False
 ):
     tokenizer_fn, vocab_size = tokenizer
     rng = random.Random(hash(key) + seed)
@@ -273,7 +291,10 @@
         pbar = tqdm(file_reader(fh), mininterval=10)
         pbar.set_description(key)
         for string in pbar:
-            tokens = tokenizer_fn(string)
+            if file_type == RawFileType.TAR and pretok_tars:
+                tokens = string
+            else:
+                tokens = tokenizer_fn(string)
             tokens.append(EOT)
             buffer += tokens
             while len(buffer) >= seqlen:
@@ -308,7 +329,7 @@
     return []
 
 
-def process_keys(data, tokenizer, seqlen, seed, content_key, do_sample, sources=None, source_counters=None):
+def process_keys(data, tokenizer, seqlen, seed, content_key, do_sample, pretok_tars, sources=None, source_counters=None):
     path = data["path"]
 
     if path.startswith("s3"):
@@ -337,6 +358,7 @@
         do_sample=do_sample,
         sources=sources,
         source_counter=source_counter,
+        pretok_tars=pretok_tars
     )
 
     # Ensure that all operations on the file handle are done within this block
@@ -569,6 +591,7 @@ def main(args):
         "--ray_dashboard_host", type=str,
default="127.0.0.1" ) # default is localhost; for slurm jobs do 0.0.0.0 parser.add_argument("--suffixes", nargs="+", default=[".json", ".jsonl", ".zst", ".zstd", ".tar", ".gz"]) + parser.add_argument("--pretok-tars", action="store_true", help="Assume tars contain pretokenized data.") args = parser.parse_args(args) if args.do_sample: @@ -650,6 +673,7 @@ def main(args): seed=args.seed, content_key=content_key, do_sample=args.do_sample, + pretok_tars=args.pretok_tars, sources=Sources, source_counters=source_counters, ) From 96a97489b3a9807db75dd0e25c1dc0b6e92ab0e9 Mon Sep 17 00:00:00 2001 From: George Smyrnis Date: Fri, 8 Mar 2024 13:38:55 -0600 Subject: [PATCH 2/7] Formatting. --- open_lm/datapreprocess/ray/tokenize_shuffle.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/open_lm/datapreprocess/ray/tokenize_shuffle.py b/open_lm/datapreprocess/ray/tokenize_shuffle.py index 1b8ea290..551ec00a 100644 --- a/open_lm/datapreprocess/ray/tokenize_shuffle.py +++ b/open_lm/datapreprocess/ray/tokenize_shuffle.py @@ -273,7 +273,7 @@ def preprocess( do_sample: bool = False, sources: enum.Enum = None, source_counter: GlobalCounter = None, - pretok_tars: bool = False + pretok_tars: bool = False, ): tokenizer_fn, vocab_size = tokenizer rng = random.Random(hash(key) + seed) @@ -329,7 +329,9 @@ def preprocess( return [] -def process_keys(data, tokenizer, seqlen, seed, content_key, do_sample, pretok_tars, sources=None, source_counters=None): +def process_keys( + data, tokenizer, seqlen, seed, content_key, do_sample, pretok_tars, sources=None, source_counters=None +): path = data["path"] if path.startswith("s3"): @@ -358,7 +360,7 @@ def process_keys(data, tokenizer, seqlen, seed, content_key, do_sample, pretok_t do_sample=do_sample, sources=sources, source_counter=source_counter, - pretok_tars=pretok_tars + pretok_tars=pretok_tars, ) # Ensure that all operations on the file handle are done within this block @@ -591,7 +593,7 @@ def main(args): "--ray_dashboard_host", type=str, default="127.0.0.1" ) # default is localhost; for slurm jobs do 0.0.0.0 parser.add_argument("--suffixes", nargs="+", default=[".json", ".jsonl", ".zst", ".zstd", ".tar", ".gz"]) - parser.add_argument("--pretok-tars", action="store_true", help="Assume tars contain pretokenized data.") + parser.add_argument("--pretok_tars", action="store_true", help="Assume tars contain pretokenized data.") args = parser.parse_args(args) if args.do_sample: From 69bbf4b961a735e6a67098cbf823e316aa13b007 Mon Sep 17 00:00:00 2001 From: George Smyrnis Date: Fri, 8 Mar 2024 17:23:59 -0600 Subject: [PATCH 3/7] Add automated test. 
---
 tests/test_tokenize_shuffle.py | 34 ++++++++++++++++++++++++++++++++++
 1 file changed, 34 insertions(+)

diff --git a/tests/test_tokenize_shuffle.py b/tests/test_tokenize_shuffle.py
index 97d73169..0388ddba 100644
--- a/tests/test_tokenize_shuffle.py
+++ b/tests/test_tokenize_shuffle.py
@@ -115,3 +115,37 @@ def test_tokenize_shuffle_local_read_local_write():
         total += len(x["json.gz"])
     assert total == NUM_TOKENS
     assert exit_value == 0
+
+
+def test_tokenize_shuffle_with_pretokenized():
+    content_len = 2048
+    NUM_TOKENS = 24508089
+    # download a small test json file and store at ./test_input
+    os.system("mkdir test_input")
+    os.system("mkdir test_output")
+    os.system(
+        "wget -O ./test_input/wikipedia_sample.jsonl https://huggingface.co/datasets/togethercomputer/RedPajama-Data-1T-Sample/resolve/main/wikipedia_sample.jsonl"
+    )
+    # run tokenize script
+    exit_value_1 = os.system(
+        f"python open_lm/datapreprocess/ray/tokenize_shuffle.py --input ./test_input --content_key text --seqlen {content_len} --output ./test_output/"
+    )
+    assert exit_value_1 == 0
+
+    os.system("cp -r ./test_output ./test_input_2a")
+    os.system("cp -r ./test_output ./test_input_2b")
+    os.system("mkdir test_output_2")
+
+    exit_value_2 = os.system(
+        f"python open_lm/datapreprocess/ray/tokenize_shuffle.py --input ./test_input_2a,./test_input_2b --content_key json.gz --seqlen {content_len} --output ./test_output_2 --pretok_tars"
+    )
+    assert exit_value_2 == 0
+
+    tars = [os.path.join("test_output_2", fname) for fname in os.listdir("test_output_2") if fname.endswith(".tar")]
+    total = 0
+    for tar in tars:
+        ds = wds.WebDataset(tar).decode()
+        for x in ds:
+            assert len(x["json.gz"]) == content_len + 1
+            total += len(x["json.gz"])
+    assert total == 2 * NUM_TOKENS
\ No newline at end of file

From e60874e4d2183e72bbccba31c9fdd8926f431182 Mon Sep 17 00:00:00 2001
From: George Smyrnis
Date: Fri, 8 Mar 2024 18:27:27 -0600
Subject: [PATCH 4/7] Debugging attempts.

---
 open_lm/datapreprocess/ray/tokenize_shuffle.py | 4 ++++
 tests/test_tokenize_shuffle.py                 | 4 ++--
 2 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/open_lm/datapreprocess/ray/tokenize_shuffle.py b/open_lm/datapreprocess/ray/tokenize_shuffle.py
index 551ec00a..e7273112 100644
--- a/open_lm/datapreprocess/ray/tokenize_shuffle.py
+++ b/open_lm/datapreprocess/ray/tokenize_shuffle.py
@@ -596,6 +596,9 @@ def main(args):
     parser.add_argument("--pretok_tars", action="store_true", help="Assume tars contain pretokenized data.")
 
     args = parser.parse_args(args)
+
+    assert not args.pretok_tars or args.suffixes == [".tar"], "Mixing tokenized and untokenized data is currently not supported."
+ if args.do_sample: Sources, SAMPLING_FREQUENCIES = load_from_yaml(args.default_dataset_yaml) logger.info(f"SOURCES:\n {Sources}") @@ -637,6 +640,7 @@ def main(args): input_paths = input_paths[: args.subset] if args.subfraction is not None: input_paths = input_paths[: int(args.subfraction * len(input_paths))] + print("Files considered: \n", input_paths) print(f"num files ={len(input_paths)}") num_files = len(input_paths) diff --git a/tests/test_tokenize_shuffle.py b/tests/test_tokenize_shuffle.py index 0388ddba..516f524a 100644 --- a/tests/test_tokenize_shuffle.py +++ b/tests/test_tokenize_shuffle.py @@ -137,7 +137,7 @@ def test_tokenize_shuffle_with_pretokenized(): os.system("mkdir test_output_2") exit_value_2 = os.system( - f"python open_lm/datapreprocess/ray/tokenize_shuffle.py --input ./test_input_2a,./test_input_2b --content_key json.gz --seqlen {content_len} --output ./test_output_2 --pretok_tars" + f"python open_lm/datapreprocess/ray/tokenize_shuffle.py --input ./test_input_2a,./test_input_2b --content_key json.gz --seqlen {content_len} --output ./test_output_2 --pretok_tars --suffixes .tar" ) assert exit_value_2 == 0 @@ -148,4 +148,4 @@ def test_tokenize_shuffle_with_pretokenized(): for x in ds: assert len(x["json.gz"]) == content_len + 1 total += len(x["json.gz"]) - assert total == 2 * NUM_TOKENS \ No newline at end of file + assert total == 2 * NUM_TOKENS From d200b856257a06c332e61ce10b351c7427b894f4 Mon Sep 17 00:00:00 2001 From: George Smyrnis Date: Fri, 8 Mar 2024 19:36:17 -0600 Subject: [PATCH 5/7] Debugging. --- open_lm/datapreprocess/ray/tokenize_shuffle.py | 2 +- tests/test_tokenize_shuffle.py | 13 ++++++++++--- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/open_lm/datapreprocess/ray/tokenize_shuffle.py b/open_lm/datapreprocess/ray/tokenize_shuffle.py index e7273112..75c18adf 100644 --- a/open_lm/datapreprocess/ray/tokenize_shuffle.py +++ b/open_lm/datapreprocess/ray/tokenize_shuffle.py @@ -295,7 +295,7 @@ def preprocess( tokens = string else: tokens = tokenizer_fn(string) - tokens.append(EOT) + tokens.append(EOT) buffer += tokens while len(buffer) >= seqlen: if do_sample: diff --git a/tests/test_tokenize_shuffle.py b/tests/test_tokenize_shuffle.py index 516f524a..5aa6a912 100644 --- a/tests/test_tokenize_shuffle.py +++ b/tests/test_tokenize_shuffle.py @@ -132,8 +132,10 @@ def test_tokenize_shuffle_with_pretokenized(): ) assert exit_value_1 == 0 - os.system("cp -r ./test_output ./test_input_2a") - os.system("cp -r ./test_output ./test_input_2b") + os.system("mkdir test_input_2a") + os.system("mkdir test_input_2b") + os.system("cp -r ./test_output/00000001.tar ./test_input_2a/") + os.system("cp -r ./test_output/00000002.tar ./test_input_2b/") os.system("mkdir test_output_2") exit_value_2 = os.system( @@ -148,4 +150,9 @@ def test_tokenize_shuffle_with_pretokenized(): for x in ds: assert len(x["json.gz"]) == content_len + 1 total += len(x["json.gz"]) - assert total == 2 * NUM_TOKENS + + os.system("rm -rf test_input_2a") + os.system("rm -rf test_input_2b") + os.system("rm -rf test_output_2") + + assert total == NUM_TOKENS From a8282c810c25994a7c9d4a7ac281d316cf0d4275 Mon Sep 17 00:00:00 2001 From: George Smyrnis Date: Sat, 9 Mar 2024 21:55:03 -0600 Subject: [PATCH 6/7] Formatting. 
---
 open_lm/datapreprocess/ray/tokenize_shuffle.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/open_lm/datapreprocess/ray/tokenize_shuffle.py b/open_lm/datapreprocess/ray/tokenize_shuffle.py
index 75c18adf..63e07fd2 100644
--- a/open_lm/datapreprocess/ray/tokenize_shuffle.py
+++ b/open_lm/datapreprocess/ray/tokenize_shuffle.py
@@ -597,7 +597,9 @@ def main(args):
 
     args = parser.parse_args(args)
 
-    assert not args.pretok_tars or args.suffixes == [".tar"], "Mixing tokenized and untokenized data is currently not supported."
+    assert not args.pretok_tars or args.suffixes == [
+        ".tar"
+    ], "Mixing tokenized and untokenized data is currently not supported."
 
     if args.do_sample:
         Sources, SAMPLING_FREQUENCIES = load_from_yaml(args.default_dataset_yaml)

From c5c2b1b89918aa18ce4fa226bad23efd3066f4d7 Mon Sep 17 00:00:00 2001
From: George Smyrnis
Date: Sun, 10 Mar 2024 14:12:15 -0500
Subject: [PATCH 7/7] Fix duplicate file names.

---
 open_lm/datapreprocess/ray/tokenize_shuffle.py |  2 +-
 tests/test_tokenize_shuffle.py                 | 17 +++++------------
 2 files changed, 6 insertions(+), 13 deletions(-)

diff --git a/open_lm/datapreprocess/ray/tokenize_shuffle.py b/open_lm/datapreprocess/ray/tokenize_shuffle.py
index 63e07fd2..e0ba33fa 100644
--- a/open_lm/datapreprocess/ray/tokenize_shuffle.py
+++ b/open_lm/datapreprocess/ray/tokenize_shuffle.py
@@ -251,7 +251,7 @@ def _flush_buffer(self, folder, counter):
             tokens = [int(x) for x in self.buffer[i]["tokens"]]
             token_count += len(tokens)
             json_string = json.dumps(tokens)
-            uid = hashlib.md5(json_string.encode()).hexdigest()
+            uid = f"{tar_index_str}_{i:0{digits}}"
             sample = {"__key__": uid, "json.gz": json_string}
             sink.write(sample)
         bio.seek(0)
diff --git a/tests/test_tokenize_shuffle.py b/tests/test_tokenize_shuffle.py
index 5aa6a912..7e2240fc 100644
--- a/tests/test_tokenize_shuffle.py
+++ b/tests/test_tokenize_shuffle.py
@@ -132,18 +132,15 @@ def test_tokenize_shuffle_with_pretokenized():
     )
     assert exit_value_1 == 0
 
-    os.system("mkdir test_input_2a")
-    os.system("mkdir test_input_2b")
-    os.system("cp -r ./test_output/00000001.tar ./test_input_2a/")
-    os.system("cp -r ./test_output/00000002.tar ./test_input_2b/")
-    os.system("mkdir test_output_2")
+    os.system("cp -r ./test_output ./test_input/2a/")
+    os.system("cp -r ./test_output ./test_input/2b/")
 
     exit_value_2 = os.system(
-        f"python open_lm/datapreprocess/ray/tokenize_shuffle.py --input ./test_input_2a,./test_input_2b --content_key json.gz --seqlen {content_len} --output ./test_output_2 --pretok_tars --suffixes .tar"
+        f"python open_lm/datapreprocess/ray/tokenize_shuffle.py --input ./test_input/2a,./test_input/2b --content_key json.gz --seqlen {content_len} --output ./test_output/2 --pretok_tars --suffixes .tar"
     )
     assert exit_value_2 == 0
 
-    tars = [os.path.join("test_output_2", fname) for fname in os.listdir("test_output_2") if fname.endswith(".tar")]
+    tars = [os.path.join("test_output/2", fname) for fname in os.listdir("test_output/2") if fname.endswith(".tar")]
     total = 0
     for tar in tars:
         ds = wds.WebDataset(tar).decode()
         for x in ds:
@@ -151,8 +148,4 @@ def test_tokenize_shuffle_with_pretokenized():
             assert len(x["json.gz"]) == content_len + 1
             total += len(x["json.gz"])
 
-    os.system("rm -rf test_input_2a")
-    os.system("rm -rf test_input_2b")
-    os.system("rm -rf test_output_2")
-
-    assert total == NUM_TOKENS
+    assert total == 2 * NUM_TOKENS
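
A few notes on the final state of the series. The on-disk shard format is unchanged: _flush_buffer still writes tars whose members are gzipped JSON lists of token ids, which is exactly what the new json.gz branch of tar_reader consumes. Below is a minimal standalone sketch of that read path, useful for inspecting a shard outside the Ray pipeline; read_pretok_tar is a hypothetical helper name, not part of open_lm.

import gzip
import io
import json
import tarfile
from typing import BinaryIO, Iterator, List

def read_pretok_tar(fh: BinaryIO, content_key: str = "json.gz") -> Iterator[List[int]]:
    """Yield one token list per matching member of a pretokenized tar shard."""
    content_ext = content_key.split(":")[0]
    buffer = io.BytesIO(fh.read())
    with tarfile.open(fileobj=buffer, mode="r") as tar:
        for member in tar.getmembers():
            if member.isfile() and member.name.endswith(f".{content_ext}"):
                with tar.extractfile(member) as fileobj:
                    with gzip.open(fileobj, "rb") as unzipped:
                        json_data = json.load(unzipped)
                # A bare list is already the whole token sequence; a dict
                # keeps it under the key given after the colon in content_key.
                if isinstance(json_data, dict):
                    yield json_data[content_key.split(":")[1]]
                else:
                    yield json_data

Run against a shard produced by the first pass of the test, every yielded list should have length content_len + 1, matching the assertion in test_tokenize_shuffle_with_pretokenized.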
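
Patch 5's one-line indentation change is the behavioral core of the series: with --pretok_tars, each sample read from a tar is already a list of token ids, and the first tokenize pass already inserted EOT at every document boundary before chunking, so appending another EOT per sample would inject spurious tokens into the re-chunked stream. A sketch of the corrected control flow, where document_tokens is a hypothetical name and tokenize/EOT stand in for open_lm's tokenizer function and its end-of-text token id:

from typing import Callable, List

def document_tokens(item, pretok: bool, tokenize: Callable[[str], List[int]], EOT: int) -> List[int]:
    """Return the token ids one input item contributes to the chunking buffer."""
    if pretok:
        # A pretokenized sample is a fixed-length slice of a stream that
        # already carries EOT at every document boundary, so nothing may
        # be appended here (the pre-patch-5 code appended EOT anyway).
        return list(item)
    tokens = tokenize(item)
    tokens.append(EOT)  # raw text: mark the document boundary ourselves
    return tokens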
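
Patch 7's uid change fixes a genuine collision: the md5 of the JSON payload is identical for identical token sequences, and the test deliberately feeds two copies of the same shards (test_input/2a and test_input/2b), so the content-hash scheme produced duplicate __key__ values and hence the duplicate file names of the commit title inside the output tars. A small demonstration; tar_index_str and digits are illustrative stand-ins for the values _flush_buffer has in scope:

import hashlib
import json

# Two distinct samples that happen to carry the same tokens...
tokens_a = [5, 17, 256, 0]
tokens_b = [5, 17, 256, 0]

# ...collide under the old content-hash scheme: same JSON, same md5,
# same __key__, and therefore duplicate member names in the tar.
uid_a = hashlib.md5(json.dumps(tokens_a).encode()).hexdigest()
uid_b = hashlib.md5(json.dumps(tokens_b).encode()).hexdigest()
assert uid_a == uid_b

# The positional scheme from patch 7 stays unique regardless of content.
tar_index_str, digits = "00000001", 4  # illustrative values
uids = [f"{tar_index_str}_{i:0{digits}}" for i in range(3)]
assert uids == ["00000001_0000", "00000001_0001", "00000001_0002"]

This is presumably also why patch 5 temporarily restricted the test to two distinct tars and expected NUM_TOKENS, while patch 7 can duplicate the entire first-pass output again and recover the full 2 * NUM_TOKENS.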