Spaces:
Runtime error
Runtime error
| """Run ray tests. | |
| poetry add protobuf="^3.20.1" | |
| """ | |
| import multiprocessing | |
| import os | |
| from multiprocessing import Pool | |
| from pathlib import Path | |
| # import joblib | |
| import more_itertools as mit | |
| import numpy as np | |
| import ray | |
| from about_time import about_time | |
| from logzero import logger | |
| from radio_embed import radio_embed | |
| num_cpus = multiprocessing.cpu_count() | |
| filename = "fangfang-en.txt" | |
| lines = Path(filename).read_text("utf8").splitlines() | |
| lst = [_.strip() for _ in lines if _.strip()] | |
| # with about_time() as dur: res1 = radio_embed("\n".join(lst)) | |
| # 143.72 s | |
| # ray.init(num_cpus=num_cpus) | |
| def test_pool(func, args_): | |
| """Test.""" | |
| with Pool(num_cpus) as pool: | |
| ret = pool.map(func, args_) | |
| # pool.close() | |
| # pool.join() | |
| return ret | |
| args = "\n".join(lst) | |
| args = ["\n".join(elm) for elm in mit.divide(num_cpus, lst)] | |
| # with about_time() as dur2: res2 = test_pool(radio_embed, args) | |
| # print(dur2.duration) | |
| # 26.5s about 1/6 of | |
| # res2a = np.concatenate(res2) | |
| # np.allclose(res1, res2a, rtol=1e-05, atol=1e-07) | |
| # %timeit ret = joblib.Parallel(n_jobs=num_cpus, backend='loky', verbose=0)(joblib.delayed(radio_embed)(arg) for arg in args) | |
| # 28.1 s ± 1.08 s per loop (mean ± std. dev. of 7 runs, 1 loop each) | |
| # with about_time() as dur4: ret4 = joblib.Parallel(n_jobs=num_cpus, backend='multiprocessing', verbose=0)(joblib.delayed(radio_embed)(arg) for arg in args) | |
| # dur4.duration 28.48s | |
| # ret4a = np.concatenate(ret4) | |
| # assert np.allclose(res1, ret4a, rtol=1e-05, atol=1e-07) | |
| os.environ["TOKENIZERS_PARALLELISM"] = "false" | |
| if not ray.is_initialized(): | |
| ray.init(num_cpus=num_cpus) | |
| def ray_embed(text): | |
| """Embed text to d-512.""" | |
| return radio_embed(text) | |
| def main(): | |
| """Run.""" | |
| _ = """ | |
| with about_time() as dur: | |
| res1 = radio_embed("\n".join(lst)) | |
| print(dur.duration_human) | |
| # 143.72 s 137 s | |
| # """ | |
| with about_time() as dur5: | |
| _ = [ray_embed.remote(arg) for arg in args] | |
| res5 = ray.get(_) | |
| print(num_cpus, dur5.duration_human) # 40s | |
| res5a = np.concatenate(res5) | |
| # _ = np.allclose(res1, res5a, rtol=1e-05, atol=1e-07) | |
| # print(_) | |
| ray.shutdown() | |
| ray.init(num_cpus=num_cpus // 2) | |
| with about_time() as dur5a: | |
| _ = [ray_embed.remote(arg) for arg in args] | |
| res6 = ray.get(_) | |
| print(num_cpus // 2, dur5a.duration_human) # 40s | |
| res6a = np.concatenate(res6) | |
| _ = np.allclose(res5a, res6a, rtol=1e-05, atol=1e-07) | |
| logger.info(" res5a allclose to res6a? %s", _) | |
| ray.shutdown() | |
| ray.init(num_cpus=2) | |
| with about_time() as dur7: | |
| _ = [ray_embed.remote(arg) for arg in args] | |
| res7 = ray.get(_) | |
| print(2, dur7.duration_human) # 90s | |
| res7a = np.concatenate(res7) | |
| _ = np.allclose(res5a, res7a, rtol=1e-05, atol=1e-07) | |
| logger.info(" res5a allclose to res7a? %s", _) | |
| # num_cpus - 1 | |
| ray.shutdown() | |
| ray.init(num_cpus=num_cpus - 1) | |
| with about_time() as dur8: | |
| _ = [ray_embed.remote(arg) for arg in args] | |
| res8 = ray.get(_) | |
| print(num_cpus - 1, dur8.duration_human) # 44s | |
| res8a = np.concatenate(res8) | |
| _ = np.allclose(res5a, res8a, rtol=1e-05, atol=1e-07) | |
| logger.info(" res5a allclose to res8a? %s", _) | |
| print(num_cpus, dur5.duration_human) # 32s | |
| print(num_cpus // 2, dur5a.duration_human) # 38s | |
| print(2, dur7.duration_human) # 90s | |
| print(num_cpus - 1, dur8.duration_human) # 44s | |
| if __name__ == "__main__": | |
| main() | |