ARTEMIS-CRIB
 
Loading...
Searching...
No Matches
custom_hadd.cpp
Go to the documentation of this file.
#include "Compression.h"
#include "ROOT/StringConv.hxx"
#include "ROOT/TIOFeatures.hxx"
#include "TClass.h"
#include "TFile.h"
#include "THashList.h"
#include "TKey.h"
#include "TSystem.h"
#include "TUUID.h"
#include "snprintf.h"
#include <ROOT/RConfig.hxx>

#include <cctype>
#include <climits>
#include <cstdlib>
#include <cstring>
#include <fstream>
#include <iostream>
#include <memory>
#include <numeric>
#include <sstream>
#include <string>
#include <vector>

#include "TFileMerger.h"
#ifndef R__WIN32
#include "ROOT/TProcessExecutor.hxx"
#endif
24
25////////////////////////////////////////////////////////////////////////////////
26
27int main(int argc, char **argv) {
28 if (argc < 3 || "-h" == std::string(argv[1]) || "--help" == std::string(argv[1])) {
29 std::cerr << "\nusage: hadd [-a] [-f] [-f[0-9]] [-fk] [-ff] [-k] [-O] [-T] [-v V] "
30 << "[-j J] [-dbg] [-d D] [-n N] [-cachesize CACHESIZE] [-experimental-io-features "
31 << "EXPERIMENTAL_IO_FEATURES] TARGET SOURCES \n\n"
32 << "This program will add histograms, trees and other objects from a list\n"
33 << "of ROOT files and write them to a target ROOT file. The target file is\n"
34 << "newly created and must not exist, or if -f (\" force \") is given, must\n"
35 << "not be one of the source files.\n\n"
36 << "It is copied from ROOT source file and linked artemis library\n";
37 return (argc == 2 && ("-h" == std::string(argv[1]) || "--help" == std::string(argv[1])))
38 ? 0
39 : 1;
40 }
41
42 ROOT::TIOFeatures features;
43 Bool_t append = kFALSE;
44 Bool_t force = kFALSE;
45 Bool_t skip_errors = kFALSE;
46 Bool_t reoptimize = kFALSE;
47 Bool_t noTrees = kFALSE;
48 Bool_t keepCompressionAsIs = kFALSE;
49 Bool_t useFirstInputCompression = kFALSE;
50 Bool_t multiproc = kFALSE;
51 Bool_t debug = kFALSE;
52 Int_t maxopenedfiles = 0;
53 Int_t verbosity = 99;
54 TString cacheSize;
55 SysInfo_t s;
56 gSystem->GetSysInfo(&s);
57 auto nProcesses = s.fCpus;
58 auto workingDir = gSystem->TempDirectory();
59 int outputPlace = 0;
60 int ffirst = 2;
61 Int_t newcomp = -1;
62 for (int a = 1; a < argc; ++a) {
63 if (strcmp(argv[a], "-T") == 0) {
64 noTrees = kTRUE;
65 ++ffirst;
66 } else if (strcmp(argv[a], "-a") == 0) {
67 append = kTRUE;
68 ++ffirst;
69 } else if (strcmp(argv[a], "-f") == 0) {
70 force = kTRUE;
71 ++ffirst;
72 } else if (strcmp(argv[a], "-k") == 0) {
73 skip_errors = kTRUE;
74 ++ffirst;
75 } else if (strcmp(argv[a], "-O") == 0) {
76 reoptimize = kTRUE;
77 ++ffirst;
78 } else if (strcmp(argv[a], "-dbg") == 0) {
79 debug = kTRUE;
80 verbosity = kTRUE;
81 ++ffirst;
82 } else if (strcmp(argv[a], "-d") == 0) {
83 if (a + 1 != argc && argv[a + 1][0] != '-') {
84 if (gSystem->AccessPathName(argv[a + 1])) {
85 std::cerr << "Error: could not access the directory specified: " << argv[a + 1]
86 << ". We will use the system's temporal directory.\n";
87 } else {
88 workingDir = argv[a + 1];
89 }
90 ++a;
91 ++ffirst;
92 } else {
93 std::cout << "-d: no directory specified. We will use the system's temporal directory.\n";
94 }
95 ++ffirst;
96 } else if (strcmp(argv[a], "-j") == 0) {
97 // If the number of processes is not specified, use the default.
98 if (a + 1 != argc && argv[a + 1][0] != '-') {
99 // number of processes specified
100 Long_t request = 1;
101 for (char *c = argv[a + 1]; *c != '\0'; ++c) {
102 if (!isdigit(*c)) {
103 // Wrong number of Processes. Use the default:
104 std::cerr << "Error: could not parse the number of processes to run in parallel passed after -j: "
105 << argv[a + 1] << ". We will use the system maximum.\n";
106 request = 0;
107 break;
108 }
109 }
110 if (request == 1) {
111 request = strtol(argv[a + 1], 0, 10);
112 if (request < kMaxLong && request >= 0) {
113 nProcesses = (Int_t)request;
114 ++a;
115 ++ffirst;
116 std::cout << "Parallelizing with " << nProcesses << " processes.\n";
117 } else {
118 std::cerr << "Error: could not parse the number of processes to use passed after -j: " << argv[a + 1]
119 << ". We will use the default value (number of logical cores).\n";
120 }
121 }
122 }
123 multiproc = kTRUE;
124 ++ffirst;
125 } else if (strcmp(argv[a], "-cachesize=") == 0) {
126 int size;
127 static constexpr size_t arglen = std::char_traits<char>::length("-cachesize=");
128 auto parseResult = ROOT::FromHumanReadableSize(argv[a] + arglen, size);
129 if (parseResult == ROOT::EFromHumanReadableSize::kParseFail) {
130 std::cerr << "Error: could not parse the cache size passed after -cachesize: "
131 << argv[a + 1] << ". We will use the default value.\n";
132 } else if (parseResult == ROOT::EFromHumanReadableSize::kOverflow) {
133 double m;
134 const char *munit = nullptr;
135 ROOT::ToHumanReadableSize(INT_MAX, false, &m, &munit);
136 std::cerr << "Error: the cache size passed after -cachesize is too large: "
137 << argv[a + 1] << " is greater than " << m << munit
138 << ". We will use the default value.\n";
139 } else {
140 cacheSize = "cachesize=";
141 cacheSize.Append(argv[a] + 1);
142 }
143 ++ffirst;
144 } else if (strcmp(argv[a], "-cachesize") == 0) {
145 if (a + 1 >= argc) {
146 std::cerr << "Error: no cache size number was provided after -cachesize.\n";
147 } else {
148 int size;
149 auto parseResult = ROOT::FromHumanReadableSize(argv[a + 1], size);
150 if (parseResult == ROOT::EFromHumanReadableSize::kParseFail) {
151 std::cerr << "Error: could not parse the cache size passed after -cachesize: "
152 << argv[a + 1] << ". We will use the default value.\n";
153 } else if (parseResult == ROOT::EFromHumanReadableSize::kOverflow) {
154 double m;
155 const char *munit = nullptr;
156 ROOT::ToHumanReadableSize(INT_MAX, false, &m, &munit);
157 std::cerr << "Error: the cache size passed after -cachesize is too large: "
158 << argv[a + 1] << " is greater than " << m << munit
159 << ". We will use the default value.\n";
160 ++a;
161 ++ffirst;
162 } else {
163 cacheSize = "cachesize=";
164 cacheSize.Append(argv[a + 1]);
165 ++a;
166 ++ffirst;
167 }
168 }
169 ++ffirst;
170 } else if (!strcmp(argv[a], "-experimental-io-features")) {
171 if (a + 1 >= argc) {
172 std::cerr << "Error: no IO feature was specified after -experimental-io-features; ignoring\n";
173 } else {
174 std::stringstream ss;
175 ss.str(argv[++a]);
176 ++ffirst;
177 std::string item;
178 while (std::getline(ss, item, ',')) {
179 if (!features.Set(item)) {
180 std::cerr << "Ignoring unknown feature request: " << item << std::endl;
181 }
182 }
183 }
184 ++ffirst;
185 } else if (strcmp(argv[a], "-n") == 0) {
186 if (a + 1 >= argc) {
187 std::cerr << "Error: no maximum number of opened was provided after -n.\n";
188 } else {
189 Long_t request = strtol(argv[a + 1], 0, 10);
190 if (request < kMaxLong && request >= 0) {
191 maxopenedfiles = (Int_t)request;
192 ++a;
193 ++ffirst;
194 } else {
195 std::cerr << "Error: could not parse the max number of opened file passed after -n: " << argv[a + 1] << ". We will use the system maximum.\n";
196 }
197 }
198 ++ffirst;
199 } else if (strcmp(argv[a], "-v") == 0) {
200 if (a + 1 == argc || argv[a + 1][0] == '-') {
201 // Verbosity level was not specified use the default:
202 verbosity = 99;
203 // if (a+1 >= argc) {
204 // std::cerr << "Error: no verbosity level was provided after -v.\n";
205 } else {
206 Bool_t hasFollowupNumber = kTRUE;
207 for (char *c = argv[a + 1]; *c != '\0'; ++c) {
208 if (!isdigit(*c)) {
209 // Verbosity level was not specified use the default:
210 hasFollowupNumber = kFALSE;
211 break;
212 }
213 }
214 if (hasFollowupNumber) {
215 Long_t request = strtol(argv[a + 1], 0, 10);
216 if (request < kMaxLong && request >= 0) {
217 verbosity = (Int_t)request;
218 ++a;
219 ++ffirst;
220 } else {
221 verbosity = 99;
222 std::cerr << "Error: could not parse the verbosity level passed after -v: " << argv[a + 1] << ". We will use the default value (99).\n";
223 }
224 }
225 }
226 ++ffirst;
227 } else if (argv[a][0] == '-') {
228 bool farg = false;
229 if (force && argv[a][1] == 'f') {
230 // Bad argument
231 std::cerr << "Error: Using option " << argv[a] << " more than once is not supported.\n";
232 ++ffirst;
233 farg = true;
234 }
235 const char *prefix = "";
236 if (argv[a][1] == 'f' && argv[a][2] == 'k') {
237 farg = true;
238 force = kTRUE;
239 keepCompressionAsIs = kTRUE;
240 prefix = "k";
241 }
242 if (argv[a][1] == 'f' && argv[a][2] == 'f') {
243 farg = true;
244 force = kTRUE;
245 useFirstInputCompression = kTRUE;
246 if (argv[a][3] != '\0') {
247 std::cerr << "Error: option -ff should not have any suffix: " << argv[a] << " (suffix has been ignored)\n";
248 }
249 }
250 char ft[7];
251 for (int alg = 0; !useFirstInputCompression && alg <= 5; ++alg) {
252 for (int j = 0; j <= 9; ++j) {
253 const int comp = (alg * 100) + j;
254 snprintf(ft, 7, "-f%s%d", prefix, comp);
255 if (!strcmp(argv[a], ft)) {
256 farg = true;
257 force = kTRUE;
258 newcomp = comp;
259 break;
260 }
261 }
262 }
263 if (!farg) {
264 // Bad argument
265 std::cerr << "Error: option " << argv[a] << " is not a supported option.\n";
266 }
267 ++ffirst;
268 } else if (!outputPlace) {
269 outputPlace = a;
270 }
271 }
272
273 gSystem->Load("libTreePlayer");
274
275 const char *targetname = 0;
276 if (outputPlace) {
277 targetname = argv[outputPlace];
278 } else {
279 targetname = argv[ffirst - 1];
280 }
281
282 if (verbosity > 1) {
283 std::cout << "chadd Target file: " << targetname << std::endl;
284 }
285
286 TFileMerger fileMerger(kFALSE, kFALSE);
287 fileMerger.SetMsgPrefix("chadd");
288 fileMerger.SetPrintLevel(verbosity - 1);
289 if (maxopenedfiles > 0) {
290 fileMerger.SetMaxOpenedFiles(maxopenedfiles);
291 }
292 // The following section will collect all input filenames into a vector,
293 // including those listed within an indirect file.
294 // If any file can not be accessed, it will error out, unless skip_errors is true
295 std::vector<std::string> allSubfiles;
296 for (int a = ffirst; a < argc; ++a) {
297 if (a == outputPlace)
298 continue;
299 if (argv[a] && argv[a][0] == '@') {
300 std::ifstream indirect_file(argv[a] + 1);
301 if (!indirect_file.is_open()) {
302 std::cerr << "chadd could not open indirect file " << (argv[a] + 1) << std::endl;
303 if (!skip_errors)
304 return 1;
305 } else {
306 std::string line;
307 while (indirect_file) {
308 if (std::getline(indirect_file, line) && line.length()) {
309 if (gSystem->AccessPathName(line.c_str(), kReadPermission) == kTRUE) {
310 std::cerr << "chadd could not validate the file name \"" << line << "\" within indirect file "
311 << (argv[a] + 1) << std::endl;
312 if (!skip_errors)
313 return 1;
314 } else
315 allSubfiles.emplace_back(line);
316 }
317 }
318 }
319 } else {
320 const std::string line = argv[a];
321 if (gSystem->AccessPathName(line.c_str(), kReadPermission) == kTRUE) {
322 std::cerr << "chadd could not validate argument \"" << line << "\" as input file " << std::endl;
323 if (!skip_errors)
324 return 1;
325 } else
326 allSubfiles.emplace_back(line);
327 }
328 }
329 if (allSubfiles.empty()) {
330 std::cerr << "chadd could not find any valid input file " << std::endl;
331 return 1;
332 }
333 // The next snippet determines the output compression if unset
334 if (newcomp == -1) {
335 if (useFirstInputCompression || keepCompressionAsIs) {
336 // grab from the first file.
337 TFile *firstInput = TFile::Open(allSubfiles.front().c_str());
338 if (firstInput && !firstInput->IsZombie())
339 newcomp = firstInput->GetCompressionSettings();
340 else
341 newcomp = ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault;
342 delete firstInput;
343 fileMerger.SetMergeOptions(TString("first_source_compression"));
344 } else {
345 newcomp = ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault;
346 fileMerger.SetMergeOptions(TString("default_compression"));
347 }
348 }
349 if (verbosity > 1) {
350 if (keepCompressionAsIs && !reoptimize)
351 std::cout << "chadd compression setting for meta data: " << newcomp << '\n';
352 else
353 std::cout << "chadd compression setting for all output: " << newcomp << '\n';
354 }
355 if (append) {
356 if (!fileMerger.OutputFile(targetname, "UPDATE", newcomp)) {
357 std::cerr << "chadd error opening target file for update :" << argv[ffirst - 1] << "." << std::endl;
358 exit(2);
359 }
360 } else if (!fileMerger.OutputFile(targetname, force, newcomp)) {
361 std::cerr << "chadd error opening target file (does " << argv[ffirst - 1] << " exist?)." << std::endl;
362 if (!force)
363 std::cerr << "Pass \"-f\" argument to force re-creation of output file." << std::endl;
364 exit(1);
365 }
366
367 auto step = (allSubfiles.size() + nProcesses - 1) / nProcesses;
368 if (multiproc && step < 3) {
369 // At least 3 files per process
370 step = 3;
371 nProcesses = (allSubfiles.size() + step - 1) / step;
372 std::cout << "Each process should handle at least 3 files for efficiency.";
373 std::cout << " Setting the number of processes to: " << nProcesses << std::endl;
374 }
375 if (nProcesses == 1)
376 multiproc = kFALSE;
377
378 std::vector<std::string> partialFiles;
379
380#ifndef R__WIN32
381 // this is commented out only to try to prevent false positive detection
382 // from several anti-virus engines on Windows, and multiproc is not
383 // supported on Windows anyway
384 if (multiproc) {
385 auto uuid = TUUID();
386 auto partialTail = uuid.AsString();
387 for (auto i = 0; (i * step) < allSubfiles.size(); i++) {
388 std::stringstream buffer;
389 buffer << workingDir << "/partial" << i << "_" << partialTail << ".root";
390 partialFiles.emplace_back(buffer.str());
391 }
392 }
393#endif
394
395 auto mergeFiles = [&](TFileMerger &merger) {
396 if (reoptimize) {
397 merger.SetFastMethod(kFALSE);
398 } else {
399 if (!keepCompressionAsIs && merger.HasCompressionChange()) {
400 // Don't warn if the user explicitly requested re-optimization.
401 std::cout << "chadd Sources and Target have different compression settings\n";
402 std::cout << "chadd merging will be slower" << std::endl;
403 }
404 }
405 merger.SetNotrees(noTrees);
406 merger.SetMergeOptions(TString(merger.GetMergeOptions()) + " " + cacheSize);
407 merger.SetIOFeatures(features);
408 Bool_t status;
409 if (append)
410 status = merger.PartialMerge(TFileMerger::kIncremental | TFileMerger::kAll);
411 else
412 status = merger.Merge();
413 return status;
414 };
415
416 auto sequentialMerge = [&](TFileMerger &merger, int start, int nFiles) {
417 for (auto i = start; i < (start + nFiles) && i < static_cast<int>(allSubfiles.size()); i++) {
418 if (!merger.AddFile(allSubfiles[i].c_str())) {
419 if (skip_errors) {
420 std::cerr << "chadd skipping file with error: " << allSubfiles[i] << std::endl;
421 } else {
422 std::cerr << "chadd exiting due to error in " << allSubfiles[i] << std::endl;
423 return kFALSE;
424 }
425 }
426 }
427 return mergeFiles(merger);
428 };
429
430 auto parallelMerge = [&](int start) {
431 TFileMerger mergerP(kFALSE, kFALSE);
432 mergerP.SetMsgPrefix("chadd");
433 mergerP.SetPrintLevel(verbosity - 1);
434 if (maxopenedfiles > 0) {
435 mergerP.SetMaxOpenedFiles(maxopenedfiles / nProcesses);
436 }
437 if (!mergerP.OutputFile(partialFiles[start / step].c_str(), newcomp)) {
438 std::cerr << "chadd error opening target partial file" << std::endl;
439 exit(1);
440 }
441 return sequentialMerge(mergerP, start, step);
442 };
443
444 auto reductionFunc = [&]() {
445 for (const auto &pf : partialFiles) {
446 fileMerger.AddFile(pf.c_str());
447 }
448 return mergeFiles(fileMerger);
449 };
450
451 Bool_t status;
452
453#ifndef R__WIN32
454 if (multiproc) {
455 ROOT::TProcessExecutor p(nProcesses);
456 auto res = p.Map(parallelMerge, ROOT::TSeqI(0, allSubfiles.size(), step));
457 status = std::accumulate(res.begin(), res.end(), 0U) == partialFiles.size();
458 if (status) {
459 status = reductionFunc();
460 } else {
461 std::cout << "chadd failed at the parallel stage" << std::endl;
462 }
463 if (!debug) {
464 for (const auto &pf : partialFiles) {
465 gSystem->Unlink(pf.c_str());
466 }
467 }
468 } else {
469 status = sequentialMerge(fileMerger, 0, allSubfiles.size());
470 }
471#else
472 status = sequentialMerge(fileMerger, 0, allSubfiles.size());
473#endif
474
475 if (status) {
476 if (verbosity == 1) {
477 std::cout << "chadd merged " << allSubfiles.size() << " (" << fileMerger.GetMergeList()->GetEntries()
478 << ") input (partial) files into " << targetname << ".\n";
479 }
480 return 0;
481 } else {
482 if (verbosity == 1) {
483 std::cout << "chadd failure during the merge of " << allSubfiles.size() << " ("
484 << fileMerger.GetMergeList()->GetEntries() << ") input (partial) files into " << targetname << ".\n";
485 }
486 return 1;
487 }
488}
int main(int argc, char **argv)
return to the guide