| #!/usr/bin/env python3 | 
 | # SPDX-License-Identifier: GPL-2.0 | 
 | # | 
 | # Run a perf script command multiple times in parallel, using perf script | 
 | # options --cpu and --time so that each job processes a different chunk | 
 | # of the data. | 
 | # | 
 | # Copyright (c) 2024, Intel Corporation. | 
 |  | 
 | import subprocess | 
 | import argparse | 
 | import pathlib | 
 | import shlex | 
 | import time | 
 | import copy | 
 | import sys | 
 | import os | 
 | import re | 
 |  | 
 | glb_prog_name = "parallel-perf.py" | 
 | glb_min_interval = 10.0 | 
 | glb_min_samples = 64 | 
 |  | 
 | class Verbosity(): | 
 |  | 
 | 	def __init__(self, quiet=False, verbose=False, debug=False): | 
 | 		self.normal    = True | 
 | 		self.verbose   = verbose | 
 | 		self.debug     = debug | 
 | 		self.self_test = True | 
 | 		if self.debug: | 
 | 			self.verbose = True | 
 | 		if self.verbose: | 
 | 			quiet = False | 
 | 		if quiet: | 
 | 			self.normal = False | 
 |  | 
 | # Manage work (Start/Wait/Kill), as represented by a subprocess.Popen command | 
 | class Work(): | 
 |  | 
 | 	def __init__(self, cmd, pipe_to, output_dir="."): | 
 | 		self.popen = None | 
 | 		self.consumer = None | 
 | 		self.cmd = cmd | 
 | 		self.pipe_to = pipe_to | 
 | 		self.output_dir = output_dir | 
 | 		self.cmdout_name = f"{output_dir}/cmd.txt" | 
 | 		self.stdout_name = f"{output_dir}/out.txt" | 
 | 		self.stderr_name = f"{output_dir}/err.txt" | 
 |  | 
 | 	def Command(self): | 
 | 		sh_cmd = [ shlex.quote(x) for x in self.cmd ] | 
 | 		return " ".join(self.cmd) | 
 |  | 
 | 	def Stdout(self): | 
 | 		return open(self.stdout_name, "w") | 
 |  | 
 | 	def Stderr(self): | 
 | 		return open(self.stderr_name, "w") | 
 |  | 
 | 	def CreateOutputDir(self): | 
 | 		pathlib.Path(self.output_dir).mkdir(parents=True, exist_ok=True) | 
 |  | 
 | 	def Start(self): | 
 | 		if self.popen: | 
 | 			return | 
 | 		self.CreateOutputDir() | 
 | 		with open(self.cmdout_name, "w") as f: | 
 | 			f.write(self.Command()) | 
 | 			f.write("\n") | 
 | 		stdout = self.Stdout() | 
 | 		stderr = self.Stderr() | 
 | 		if self.pipe_to: | 
 | 			self.popen = subprocess.Popen(self.cmd, stdout=subprocess.PIPE, stderr=stderr) | 
 | 			args = shlex.split(self.pipe_to) | 
 | 			self.consumer = subprocess.Popen(args, stdin=self.popen.stdout, stdout=stdout, stderr=stderr) | 
 | 		else: | 
 | 			self.popen = subprocess.Popen(self.cmd, stdout=stdout, stderr=stderr) | 
 |  | 
 | 	def RemoveEmptyErrFile(self): | 
 | 		if os.path.exists(self.stderr_name): | 
 | 			if os.path.getsize(self.stderr_name) == 0: | 
 | 				os.unlink(self.stderr_name) | 
 |  | 
 | 	def Errors(self): | 
 | 		if os.path.exists(self.stderr_name): | 
 | 			if os.path.getsize(self.stderr_name) != 0: | 
 | 				return [ f"Non-empty error file {self.stderr_name}" ] | 
 | 		return [] | 
 |  | 
 | 	def TidyUp(self): | 
 | 		self.RemoveEmptyErrFile() | 
 |  | 
 | 	def RawPollWait(self, p, wait): | 
 | 		if wait: | 
 | 			return p.wait() | 
 | 		return p.poll() | 
 |  | 
 | 	def Poll(self, wait=False): | 
 | 		if not self.popen: | 
 | 			return None | 
 | 		result = self.RawPollWait(self.popen, wait) | 
 | 		if self.consumer: | 
 | 			res = result | 
 | 			result = self.RawPollWait(self.consumer, wait) | 
 | 			if result != None and res == None: | 
 | 				self.popen.kill() | 
 | 				result = None | 
 | 			elif result == 0 and res != None and res != 0: | 
 | 				result = res | 
 | 		if result != None: | 
 | 			self.TidyUp() | 
 | 		return result | 
 |  | 
 | 	def Wait(self): | 
 | 		return self.Poll(wait=True) | 
 |  | 
 | 	def Kill(self): | 
 | 		if not self.popen: | 
 | 			return | 
 | 		self.popen.kill() | 
 | 		if self.consumer: | 
 | 			self.consumer.kill() | 
 |  | 
 | def KillWork(worklist, verbosity): | 
 | 	for w in worklist: | 
 | 		w.Kill() | 
 | 	for w in worklist: | 
 | 		w.Wait() | 
 |  | 
 | def NumberOfCPUs(): | 
 | 	return os.sysconf("SC_NPROCESSORS_ONLN") | 
 |  | 
 | def NanoSecsToSecsStr(x): | 
 | 	if x == None: | 
 | 		return "" | 
 | 	x = str(x) | 
 | 	if len(x) < 10: | 
 | 		x = "0" * (10 - len(x)) + x | 
 | 	return x[:len(x) - 9] + "." + x[-9:] | 
 |  | 
 | def InsertOptionAfter(cmd, option, after): | 
 | 	try: | 
 | 		pos = cmd.index(after) | 
 | 		cmd.insert(pos + 1, option) | 
 | 	except: | 
 | 		cmd.append(option) | 
 |  | 
 | def CreateWorkList(cmd, pipe_to, output_dir, cpus, time_ranges_by_cpu): | 
 | 	max_len = len(str(cpus[-1])) | 
 | 	cpu_dir_fmt = f"cpu-%.{max_len}u" | 
 | 	worklist = [] | 
 | 	pos = 0 | 
 | 	for cpu in cpus: | 
 | 		if cpu >= 0: | 
 | 			cpu_dir = os.path.join(output_dir, cpu_dir_fmt % cpu) | 
 | 			cpu_option = f"--cpu={cpu}" | 
 | 		else: | 
 | 			cpu_dir = output_dir | 
 | 			cpu_option = None | 
 |  | 
 | 		tr_dir_fmt = "time-range" | 
 |  | 
 | 		if len(time_ranges_by_cpu) > 1: | 
 | 			time_ranges = time_ranges_by_cpu[pos] | 
 | 			tr_dir_fmt += f"-{pos}" | 
 | 			pos += 1 | 
 | 		else: | 
 | 			time_ranges = time_ranges_by_cpu[0] | 
 |  | 
 | 		max_len = len(str(len(time_ranges))) | 
 | 		tr_dir_fmt += f"-%.{max_len}u" | 
 |  | 
 | 		i = 0 | 
 | 		for r in time_ranges: | 
 | 			if r == [None, None]: | 
 | 				time_option = None | 
 | 				work_output_dir = cpu_dir | 
 | 			else: | 
 | 				time_option = "--time=" + NanoSecsToSecsStr(r[0]) + "," + NanoSecsToSecsStr(r[1]) | 
 | 				work_output_dir = os.path.join(cpu_dir, tr_dir_fmt % i) | 
 | 				i += 1 | 
 | 			work_cmd = list(cmd) | 
 | 			if time_option != None: | 
 | 				InsertOptionAfter(work_cmd, time_option, "script") | 
 | 			if cpu_option != None: | 
 | 				InsertOptionAfter(work_cmd, cpu_option, "script") | 
 | 			w = Work(work_cmd, pipe_to, work_output_dir) | 
 | 			worklist.append(w) | 
 | 	return worklist | 
 |  | 
 | def DoRunWork(worklist, nr_jobs, verbosity): | 
 | 	nr_to_do = len(worklist) | 
 | 	not_started = list(worklist) | 
 | 	running = [] | 
 | 	done = [] | 
 | 	chg = False | 
 | 	while True: | 
 | 		nr_done = len(done) | 
 | 		if chg and verbosity.normal: | 
 | 			nr_run = len(running) | 
 | 			print(f"\rThere are {nr_to_do} jobs: {nr_done} completed, {nr_run} running", flush=True, end=" ") | 
 | 			if verbosity.verbose: | 
 | 				print() | 
 | 			chg = False | 
 | 		if nr_done == nr_to_do: | 
 | 			break | 
 | 		while len(running) < nr_jobs and len(not_started): | 
 | 			w = not_started.pop(0) | 
 | 			running.append(w) | 
 | 			if verbosity.verbose: | 
 | 				print("Starting:", w.Command()) | 
 | 			w.Start() | 
 | 			chg = True | 
 | 		if len(running): | 
 | 			time.sleep(0.1) | 
 | 		finished = [] | 
 | 		not_finished = [] | 
 | 		while len(running): | 
 | 			w = running.pop(0) | 
 | 			r = w.Poll() | 
 | 			if r == None: | 
 | 				not_finished.append(w) | 
 | 				continue | 
 | 			if r == 0: | 
 | 				if verbosity.verbose: | 
 | 					print("Finished:", w.Command()) | 
 | 				finished.append(w) | 
 | 				chg = True | 
 | 				continue | 
 | 			if verbosity.normal and not verbosity.verbose: | 
 | 				print() | 
 | 			print("Job failed!\n    return code:", r, "\n    command:    ", w.Command()) | 
 | 			if w.pipe_to: | 
 | 				print("    piped to:   ", w.pipe_to) | 
 | 			print("Killing outstanding jobs") | 
 | 			KillWork(not_finished, verbosity) | 
 | 			KillWork(running, verbosity) | 
 | 			return False | 
 | 		running = not_finished | 
 | 		done += finished | 
 | 	errorlist = [] | 
 | 	for w in worklist: | 
 | 		errorlist += w.Errors() | 
 | 	if len(errorlist): | 
 | 		print("Errors:") | 
 | 		for e in errorlist: | 
 | 			print(e) | 
 | 	elif verbosity.normal: | 
 | 		print("\r"," "*50, "\rAll jobs finished successfully", flush=True) | 
 | 	return True | 
 |  | 
 | def RunWork(worklist, nr_jobs=NumberOfCPUs(), verbosity=Verbosity()): | 
 | 	try: | 
 | 		return DoRunWork(worklist, nr_jobs, verbosity) | 
 | 	except: | 
 | 		for w in worklist: | 
 | 			w.Kill() | 
 | 		raise | 
 | 	return True | 
 |  | 
 | def ReadHeader(perf, file_name): | 
 | 	return subprocess.Popen([perf, "script", "--header-only", "--input", file_name], stdout=subprocess.PIPE).stdout.read().decode("utf-8") | 
 |  | 
 | def ParseHeader(hdr): | 
 | 	result = {} | 
 | 	lines = hdr.split("\n") | 
 | 	for line in lines: | 
 | 		if ":" in line and line[0] == "#": | 
 | 			pos = line.index(":") | 
 | 			name = line[1:pos-1].strip() | 
 | 			value = line[pos+1:].strip() | 
 | 			if name in result: | 
 | 				orig_name = name | 
 | 				nr = 2 | 
 | 				while True: | 
 | 					name = f"{orig_name} {nr}" | 
 | 					if name not in result: | 
 | 						break | 
 | 					nr += 1 | 
 | 			result[name] = value | 
 | 	return result | 
 |  | 
 | def HeaderField(hdr_dict, hdr_fld): | 
 | 	if hdr_fld not in hdr_dict: | 
 | 		raise Exception(f"'{hdr_fld}' missing from header information") | 
 | 	return hdr_dict[hdr_fld] | 
 |  | 
 | # Represent the position of an option within a command string | 
 | # and provide the option value and/or remove the option | 
 | class OptPos(): | 
 |  | 
 | 	def Init(self, opt_element=-1, value_element=-1, opt_pos=-1, value_pos=-1, error=None): | 
 | 		self.opt_element = opt_element		# list element that contains option | 
 | 		self.value_element = value_element	# list element that contains option value | 
 | 		self.opt_pos = opt_pos			# string position of option | 
 | 		self.value_pos = value_pos		# string position of value | 
 | 		self.error = error			# error message string | 
 |  | 
 | 	def __init__(self, args, short_name, long_name, default=None): | 
 | 		self.args = list(args) | 
 | 		self.default = default | 
 | 		n = 2 + len(long_name) | 
 | 		m = len(short_name) | 
 | 		pos = -1 | 
 | 		for opt in args: | 
 | 			pos += 1 | 
 | 			if m and opt[:2] == f"-{short_name}": | 
 | 				if len(opt) == 2: | 
 | 					if pos + 1 < len(args): | 
 | 						self.Init(pos, pos + 1, 0, 0) | 
 | 					else: | 
 | 						self.Init(error = f"-{short_name} option missing value") | 
 | 				else: | 
 | 					self.Init(pos, pos, 0, 2) | 
 | 				return | 
 | 			if opt[:n] == f"--{long_name}": | 
 | 				if len(opt) == n: | 
 | 					if pos + 1 < len(args): | 
 | 						self.Init(pos, pos + 1, 0, 0) | 
 | 					else: | 
 | 						self.Init(error = f"--{long_name} option missing value") | 
 | 				elif opt[n] == "=": | 
 | 					self.Init(pos, pos, 0, n + 1) | 
 | 				else: | 
 | 					self.Init(error = f"--{long_name} option expected '='") | 
 | 				return | 
 | 			if m and opt[:1] == "-" and opt[:2] != "--" and short_name in opt: | 
 | 				ipos = opt.index(short_name) | 
 | 				if "-" in opt[1:]: | 
 | 					hpos = opt[1:].index("-") | 
 | 					if hpos < ipos: | 
 | 						continue | 
 | 				if ipos + 1 == len(opt): | 
 | 					if pos + 1 < len(args): | 
 | 						self.Init(pos, pos + 1, ipos, 0) | 
 | 					else: | 
 | 						self.Init(error = f"-{short_name} option missing value") | 
 | 				else: | 
 | 					self.Init(pos, pos, ipos, ipos + 1) | 
 | 				return | 
 | 		self.Init() | 
 |  | 
 | 	def Value(self): | 
 | 		if self.opt_element >= 0: | 
 | 			if self.opt_element != self.value_element: | 
 | 				return self.args[self.value_element] | 
 | 			else: | 
 | 				return self.args[self.value_element][self.value_pos:] | 
 | 		return self.default | 
 |  | 
 | 	def Remove(self, args): | 
 | 		if self.opt_element == -1: | 
 | 			return | 
 | 		if self.opt_element != self.value_element: | 
 | 			del args[self.value_element] | 
 | 		if self.opt_pos: | 
 | 			args[self.opt_element] = args[self.opt_element][:self.opt_pos] | 
 | 		else: | 
 | 			del args[self.opt_element] | 
 |  | 
 | def DetermineInputFileName(cmd): | 
 | 	p = OptPos(cmd, "i", "input", "perf.data") | 
 | 	if p.error: | 
 | 		raise Exception(f"perf command {p.error}") | 
 | 	file_name = p.Value() | 
 | 	if not os.path.exists(file_name): | 
 | 		raise Exception(f"perf command input file '{file_name}' not found") | 
 | 	return file_name | 
 |  | 
 | def ReadOption(args, short_name, long_name, err_prefix, remove=False): | 
 | 	p = OptPos(args, short_name, long_name) | 
 | 	if p.error: | 
 | 		raise Exception(f"{err_prefix}{p.error}") | 
 | 	value = p.Value() | 
 | 	if remove: | 
 | 		p.Remove(args) | 
 | 	return value | 
 |  | 
 | def ExtractOption(args, short_name, long_name, err_prefix): | 
 | 	return ReadOption(args, short_name, long_name, err_prefix, True) | 
 |  | 
 | def ReadPerfOption(args, short_name, long_name): | 
 | 	return ReadOption(args, short_name, long_name, "perf command ") | 
 |  | 
 | def ExtractPerfOption(args, short_name, long_name): | 
 | 	return ExtractOption(args, short_name, long_name, "perf command ") | 
 |  | 
 | def PerfDoubleQuickCommands(cmd, file_name): | 
 | 	cpu_str = ReadPerfOption(cmd, "C", "cpu") | 
 | 	time_str = ReadPerfOption(cmd, "", "time") | 
 | 	# Use double-quick sampling to determine trace data density | 
 | 	times_cmd = ["perf", "script", "--ns", "--input", file_name, "--itrace=qqi"] | 
 | 	if cpu_str != None and cpu_str != "": | 
 | 		times_cmd.append(f"--cpu={cpu_str}") | 
 | 	if time_str != None and time_str != "": | 
 | 		times_cmd.append(f"--time={time_str}") | 
 | 	cnts_cmd = list(times_cmd) | 
 | 	cnts_cmd.append("-Fcpu") | 
 | 	times_cmd.append("-Fcpu,time") | 
 | 	return cnts_cmd, times_cmd | 
 |  | 
 | class CPUTimeRange(): | 
 | 	def __init__(self, cpu): | 
 | 		self.cpu = cpu | 
 | 		self.sample_cnt = 0 | 
 | 		self.time_ranges = None | 
 | 		self.interval = 0 | 
 | 		self.interval_remaining = 0 | 
 | 		self.remaining = 0 | 
 | 		self.tr_pos = 0 | 
 |  | 
 | def CalcTimeRangesByCPU(line, cpu, cpu_time_ranges, max_time): | 
 | 	cpu_time_range = cpu_time_ranges[cpu] | 
 | 	cpu_time_range.remaining -= 1 | 
 | 	cpu_time_range.interval_remaining -= 1 | 
 | 	if cpu_time_range.remaining == 0: | 
 | 		cpu_time_range.time_ranges[cpu_time_range.tr_pos][1] = max_time | 
 | 		return | 
 | 	if cpu_time_range.interval_remaining == 0: | 
 | 		time = TimeVal(line[1][:-1], 0) | 
 | 		time_ranges = cpu_time_range.time_ranges | 
 | 		time_ranges[cpu_time_range.tr_pos][1] = time - 1 | 
 | 		time_ranges.append([time, max_time]) | 
 | 		cpu_time_range.tr_pos += 1 | 
 | 		cpu_time_range.interval_remaining = cpu_time_range.interval | 
 |  | 
 | def CountSamplesByCPU(line, cpu, cpu_time_ranges): | 
 | 	try: | 
 | 		cpu_time_ranges[cpu].sample_cnt += 1 | 
 | 	except: | 
 | 		print("exception") | 
 | 		print("cpu", cpu) | 
 | 		print("len(cpu_time_ranges)", len(cpu_time_ranges)) | 
 | 		raise | 
 |  | 
 | def ProcessCommandOutputLines(cmd, per_cpu, fn, *x): | 
 | 	# Assume CPU number is at beginning of line and enclosed by [] | 
 | 	pat = re.compile(r"\s*\[[0-9]+\]") | 
 | 	p = subprocess.Popen(cmd, stdout=subprocess.PIPE) | 
 | 	while True: | 
 | 		line = p.stdout.readline() | 
 | 		if line: | 
 | 			line = line.decode("utf-8") | 
 | 			if pat.match(line): | 
 | 				line = line.split() | 
 | 				if per_cpu: | 
 | 					# Assumes CPU number is enclosed by [] | 
 | 					cpu = int(line[0][1:-1]) | 
 | 				else: | 
 | 					cpu = 0 | 
 | 				fn(line, cpu, *x) | 
 | 		else: | 
 | 			break | 
 | 	p.wait() | 
 |  | 
 | def IntersectTimeRanges(new_time_ranges, time_ranges): | 
 | 	pos = 0 | 
 | 	new_pos = 0 | 
 | 	# Can assume len(time_ranges) != 0 and len(new_time_ranges) != 0 | 
 | 	# Note also, there *must* be at least one intersection. | 
 | 	while pos < len(time_ranges) and new_pos < len(new_time_ranges): | 
 | 		# new end < old start => no intersection, remove new | 
 | 		if new_time_ranges[new_pos][1] < time_ranges[pos][0]: | 
 | 			del new_time_ranges[new_pos] | 
 | 			continue | 
 | 		# new start > old end => no intersection, check next | 
 | 		if new_time_ranges[new_pos][0] > time_ranges[pos][1]: | 
 | 			pos += 1 | 
 | 			if pos < len(time_ranges): | 
 | 				continue | 
 | 			# no next, so remove remaining | 
 | 			while new_pos < len(new_time_ranges): | 
 | 				del new_time_ranges[new_pos] | 
 | 			return | 
 | 		# Found an intersection | 
 | 		# new start < old start => adjust new start = old start | 
 | 		if new_time_ranges[new_pos][0] < time_ranges[pos][0]: | 
 | 			new_time_ranges[new_pos][0] = time_ranges[pos][0] | 
 | 		# new end > old end => keep the overlap, insert the remainder | 
 | 		if new_time_ranges[new_pos][1] > time_ranges[pos][1]: | 
 | 			r = [ time_ranges[pos][1] + 1, new_time_ranges[new_pos][1] ] | 
 | 			new_time_ranges[new_pos][1] = time_ranges[pos][1] | 
 | 			new_pos += 1 | 
 | 			new_time_ranges.insert(new_pos, r) | 
 | 			continue | 
 | 		# new [start, end] is within old [start, end] | 
 | 		new_pos += 1 | 
 |  | 
 | def SplitTimeRangesByTraceDataDensity(time_ranges, cpus, nr, cmd, file_name, per_cpu, min_size, min_interval, verbosity): | 
 | 	if verbosity.normal: | 
 | 		print("\rAnalyzing...", flush=True, end=" ") | 
 | 		if verbosity.verbose: | 
 | 			print() | 
 | 	cnts_cmd, times_cmd = PerfDoubleQuickCommands(cmd, file_name) | 
 |  | 
 | 	nr_cpus = cpus[-1] + 1 if per_cpu else 1 | 
 | 	if per_cpu: | 
 | 		nr_cpus = cpus[-1] + 1 | 
 | 		cpu_time_ranges = [ CPUTimeRange(cpu) for cpu in range(nr_cpus) ] | 
 | 	else: | 
 | 		nr_cpus = 1 | 
 | 		cpu_time_ranges = [ CPUTimeRange(-1) ] | 
 |  | 
 | 	if verbosity.debug: | 
 | 		print("nr_cpus", nr_cpus) | 
 | 		print("cnts_cmd", cnts_cmd) | 
 | 		print("times_cmd", times_cmd) | 
 |  | 
 | 	# Count the number of "double quick" samples per CPU | 
 | 	ProcessCommandOutputLines(cnts_cmd, per_cpu, CountSamplesByCPU, cpu_time_ranges) | 
 |  | 
 | 	tot = 0 | 
 | 	mx = 0 | 
 | 	for cpu_time_range in cpu_time_ranges: | 
 | 		cnt = cpu_time_range.sample_cnt | 
 | 		tot += cnt | 
 | 		if cnt > mx: | 
 | 			mx = cnt | 
 | 		if verbosity.debug: | 
 | 			print("cpu:", cpu_time_range.cpu, "sample_cnt", cnt) | 
 |  | 
 | 	if min_size < 1: | 
 | 		min_size = 1 | 
 |  | 
 | 	if mx < min_size: | 
 | 		# Too little data to be worth splitting | 
 | 		if verbosity.debug: | 
 | 			print("Too little data to split by time") | 
 | 		if nr == 0: | 
 | 			nr = 1 | 
 | 		return [ SplitTimeRangesIntoN(time_ranges, nr, min_interval) ] | 
 |  | 
 | 	if nr: | 
 | 		divisor = nr | 
 | 		min_size = 1 | 
 | 	else: | 
 | 		divisor = NumberOfCPUs() | 
 |  | 
 | 	interval = int(round(tot / divisor, 0)) | 
 | 	if interval < min_size: | 
 | 		interval = min_size | 
 |  | 
 | 	if verbosity.debug: | 
 | 		print("divisor", divisor) | 
 | 		print("min_size", min_size) | 
 | 		print("interval", interval) | 
 |  | 
 | 	min_time = time_ranges[0][0] | 
 | 	max_time = time_ranges[-1][1] | 
 |  | 
 | 	for cpu_time_range in cpu_time_ranges: | 
 | 		cnt = cpu_time_range.sample_cnt | 
 | 		if cnt == 0: | 
 | 			cpu_time_range.time_ranges = copy.deepcopy(time_ranges) | 
 | 			continue | 
 | 		# Adjust target interval for CPU to give approximately equal interval sizes | 
 | 		# Determine number of intervals, rounding to nearest integer | 
 | 		n = int(round(cnt / interval, 0)) | 
 | 		if n < 1: | 
 | 			n = 1 | 
 | 		# Determine interval size, rounding up | 
 | 		d, m = divmod(cnt, n) | 
 | 		if m: | 
 | 			d += 1 | 
 | 		cpu_time_range.interval = d | 
 | 		cpu_time_range.interval_remaining = d | 
 | 		cpu_time_range.remaining = cnt | 
 | 		# Init. time ranges for each CPU with the start time | 
 | 		cpu_time_range.time_ranges = [ [min_time, max_time] ] | 
 |  | 
 | 	# Set time ranges so that the same number of "double quick" samples | 
 | 	# will fall into each time range. | 
 | 	ProcessCommandOutputLines(times_cmd, per_cpu, CalcTimeRangesByCPU, cpu_time_ranges, max_time) | 
 |  | 
 | 	for cpu_time_range in cpu_time_ranges: | 
 | 		if cpu_time_range.sample_cnt: | 
 | 			IntersectTimeRanges(cpu_time_range.time_ranges, time_ranges) | 
 |  | 
 | 	return [cpu_time_ranges[cpu].time_ranges for cpu in cpus] | 
 |  | 
 | def SplitSingleTimeRangeIntoN(time_range, n): | 
 | 	if n <= 1: | 
 | 		return [time_range] | 
 | 	start = time_range[0] | 
 | 	end   = time_range[1] | 
 | 	duration = int((end - start + 1) / n) | 
 | 	if duration < 1: | 
 | 		return [time_range] | 
 | 	time_ranges = [] | 
 | 	for i in range(n): | 
 | 		time_ranges.append([start, start + duration - 1]) | 
 | 		start += duration | 
 | 	time_ranges[-1][1] = end | 
 | 	return time_ranges | 
 |  | 
 | def TimeRangeDuration(r): | 
 | 	return r[1] - r[0] + 1 | 
 |  | 
 | def TotalDuration(time_ranges): | 
 | 	duration = 0 | 
 | 	for r in time_ranges: | 
 | 		duration += TimeRangeDuration(r) | 
 | 	return duration | 
 |  | 
 | def SplitTimeRangesByInterval(time_ranges, interval): | 
 | 	new_ranges = [] | 
 | 	for r in time_ranges: | 
 | 		duration = TimeRangeDuration(r) | 
 | 		n = duration / interval | 
 | 		n = int(round(n, 0)) | 
 | 		new_ranges += SplitSingleTimeRangeIntoN(r, n) | 
 | 	return new_ranges | 
 |  | 
 | def SplitTimeRangesIntoN(time_ranges, n, min_interval): | 
 | 	if n <= len(time_ranges): | 
 | 		return time_ranges | 
 | 	duration = TotalDuration(time_ranges) | 
 | 	interval = duration / n | 
 | 	if interval < min_interval: | 
 | 		interval = min_interval | 
 | 	return SplitTimeRangesByInterval(time_ranges, interval) | 
 |  | 
 | def RecombineTimeRanges(tr): | 
 | 	new_tr = copy.deepcopy(tr) | 
 | 	n = len(new_tr) | 
 | 	i = 1 | 
 | 	while i < len(new_tr): | 
 | 		# if prev end + 1 == cur start, combine them | 
 | 		if new_tr[i - 1][1] + 1 == new_tr[i][0]: | 
 | 			new_tr[i][0] = new_tr[i - 1][0] | 
 | 			del new_tr[i - 1] | 
 | 		else: | 
 | 			i += 1 | 
 | 	return new_tr | 
 |  | 
 | def OpenTimeRangeEnds(time_ranges, min_time, max_time): | 
 | 	if time_ranges[0][0] <= min_time: | 
 | 		time_ranges[0][0] = None | 
 | 	if time_ranges[-1][1] >= max_time: | 
 | 		time_ranges[-1][1] = None | 
 |  | 
 | def BadTimeStr(time_str): | 
 | 	raise Exception(f"perf command bad time option: '{time_str}'\nCheck also 'time of first sample' and 'time of last sample' in perf script --header-only") | 
 |  | 
 | def ValidateTimeRanges(time_ranges, time_str): | 
 | 	n = len(time_ranges) | 
 | 	for i in range(n): | 
 | 		start = time_ranges[i][0] | 
 | 		end   = time_ranges[i][1] | 
 | 		if i != 0 and start <= time_ranges[i - 1][1]: | 
 | 			BadTimeStr(time_str) | 
 | 		if start > end: | 
 | 			BadTimeStr(time_str) | 
 |  | 
 | def TimeVal(s, dflt): | 
 | 	s = s.strip() | 
 | 	if s == "": | 
 | 		return dflt | 
 | 	a = s.split(".") | 
 | 	if len(a) > 2: | 
 | 		raise Exception(f"Bad time value'{s}'") | 
 | 	x = int(a[0]) | 
 | 	if x < 0: | 
 | 		raise Exception("Negative time not allowed") | 
 | 	x *= 1000000000 | 
 | 	if len(a) > 1: | 
 | 		x += int((a[1] + "000000000")[:9]) | 
 | 	return x | 
 |  | 
 | def BadCPUStr(cpu_str): | 
 | 	raise Exception(f"perf command bad cpu option: '{cpu_str}'\nCheck also 'nrcpus avail' in perf script --header-only") | 
 |  | 
 | def ParseTimeStr(time_str, min_time, max_time): | 
 | 	if time_str == None or time_str == "": | 
 | 		return [[min_time, max_time]] | 
 | 	time_ranges = [] | 
 | 	for r in time_str.split(): | 
 | 		a = r.split(",") | 
 | 		if len(a) != 2: | 
 | 			BadTimeStr(time_str) | 
 | 		try: | 
 | 			start = TimeVal(a[0], min_time) | 
 | 			end   = TimeVal(a[1], max_time) | 
 | 		except: | 
 | 			BadTimeStr(time_str) | 
 | 		time_ranges.append([start, end]) | 
 | 	ValidateTimeRanges(time_ranges, time_str) | 
 | 	return time_ranges | 
 |  | 
 | def ParseCPUStr(cpu_str, nr_cpus): | 
 | 	if cpu_str == None or cpu_str == "": | 
 | 		return [-1] | 
 | 	cpus = [] | 
 | 	for r in cpu_str.split(","): | 
 | 		a = r.split("-") | 
 | 		if len(a) < 1 or len(a) > 2: | 
 | 			BadCPUStr(cpu_str) | 
 | 		try: | 
 | 			start = int(a[0].strip()) | 
 | 			if len(a) > 1: | 
 | 				end = int(a[1].strip()) | 
 | 			else: | 
 | 				end = start | 
 | 		except: | 
 | 			BadCPUStr(cpu_str) | 
 | 		if start < 0 or end < 0 or end < start or end >= nr_cpus: | 
 | 			BadCPUStr(cpu_str) | 
 | 		cpus.extend(range(start, end + 1)) | 
 | 	cpus = list(set(cpus)) # Remove duplicates | 
 | 	cpus.sort() | 
 | 	return cpus | 
 |  | 
 | class ParallelPerf(): | 
 |  | 
 | 	def __init__(self, a): | 
 | 		for arg_name in vars(a): | 
 | 			setattr(self, arg_name, getattr(a, arg_name)) | 
 | 		self.orig_nr = self.nr | 
 | 		self.orig_cmd = list(self.cmd) | 
 | 		self.perf = self.cmd[0] | 
 | 		if os.path.exists(self.output_dir): | 
 | 			raise Exception(f"Output '{self.output_dir}' already exists") | 
 | 		if self.jobs < 0 or self.nr < 0 or self.interval < 0: | 
 | 			raise Exception("Bad options (negative values): try -h option for help") | 
 | 		if self.nr != 0 and self.interval != 0: | 
 | 			raise Exception("Cannot specify number of time subdivisions and time interval") | 
 | 		if self.jobs == 0: | 
 | 			self.jobs = NumberOfCPUs() | 
 | 		if self.nr == 0 and self.interval == 0: | 
 | 			if self.per_cpu: | 
 | 				self.nr = 1 | 
 | 			else: | 
 | 				self.nr = self.jobs | 
 |  | 
 | 	def Init(self): | 
 | 		if self.verbosity.debug: | 
 | 			print("cmd", self.cmd) | 
 | 		self.file_name = DetermineInputFileName(self.cmd) | 
 | 		self.hdr = ReadHeader(self.perf, self.file_name) | 
 | 		self.hdr_dict = ParseHeader(self.hdr) | 
 | 		self.cmd_line = HeaderField(self.hdr_dict, "cmdline") | 
 |  | 
 | 	def ExtractTimeInfo(self): | 
 | 		self.min_time = TimeVal(HeaderField(self.hdr_dict, "time of first sample"), 0) | 
 | 		self.max_time = TimeVal(HeaderField(self.hdr_dict, "time of last sample"), 0) | 
 | 		self.time_str = ExtractPerfOption(self.cmd, "", "time") | 
 | 		self.time_ranges = ParseTimeStr(self.time_str, self.min_time, self.max_time) | 
 | 		if self.verbosity.debug: | 
 | 			print("time_ranges", self.time_ranges) | 
 |  | 
 | 	def ExtractCPUInfo(self): | 
 | 		if self.per_cpu: | 
 | 			nr_cpus = int(HeaderField(self.hdr_dict, "nrcpus avail")) | 
 | 			self.cpu_str = ExtractPerfOption(self.cmd, "C", "cpu") | 
 | 			if self.cpu_str == None or self.cpu_str == "": | 
 | 				self.cpus = [ x for x in range(nr_cpus) ] | 
 | 			else: | 
 | 				self.cpus = ParseCPUStr(self.cpu_str, nr_cpus) | 
 | 		else: | 
 | 			self.cpu_str = None | 
 | 			self.cpus = [-1] | 
 | 		if self.verbosity.debug: | 
 | 			print("cpus", self.cpus) | 
 |  | 
 | 	def IsIntelPT(self): | 
 | 		return self.cmd_line.find("intel_pt") >= 0 | 
 |  | 
 | 	def SplitTimeRanges(self): | 
 | 		if self.IsIntelPT() and self.interval == 0: | 
 | 			self.split_time_ranges_for_each_cpu = \ | 
 | 				SplitTimeRangesByTraceDataDensity(self.time_ranges, self.cpus, self.orig_nr, | 
 | 								  self.orig_cmd, self.file_name, self.per_cpu, | 
 | 								  self.min_size, self.min_interval, self.verbosity) | 
 | 		elif self.nr: | 
 | 			self.split_time_ranges_for_each_cpu = [ SplitTimeRangesIntoN(self.time_ranges, self.nr, self.min_interval) ] | 
 | 		else: | 
 | 			self.split_time_ranges_for_each_cpu = [ SplitTimeRangesByInterval(self.time_ranges, self.interval) ] | 
 |  | 
 | 	def CheckTimeRanges(self): | 
 | 		for tr in self.split_time_ranges_for_each_cpu: | 
 | 			# Re-combined time ranges should be the same | 
 | 			new_tr = RecombineTimeRanges(tr) | 
 | 			if new_tr != self.time_ranges: | 
 | 				if self.verbosity.debug: | 
 | 					print("tr", tr) | 
 | 					print("new_tr", new_tr) | 
 | 				raise Exception("Self test failed!") | 
 |  | 
 | 	def OpenTimeRangeEnds(self): | 
 | 		for time_ranges in self.split_time_ranges_for_each_cpu: | 
 | 			OpenTimeRangeEnds(time_ranges, self.min_time, self.max_time) | 
 |  | 
 | 	def CreateWorkList(self): | 
 | 		self.worklist = CreateWorkList(self.cmd, self.pipe_to, self.output_dir, self.cpus, self.split_time_ranges_for_each_cpu) | 
 |  | 
 | 	def PerfDataRecordedPerCPU(self): | 
 | 		if "--per-thread" in self.cmd_line.split(): | 
 | 			return False | 
 | 		return True | 
 |  | 
 | 	def DefaultToPerCPU(self): | 
 | 		# --no-per-cpu option takes precedence | 
 | 		if self.no_per_cpu: | 
 | 			return False | 
 | 		if not self.PerfDataRecordedPerCPU(): | 
 | 			return False | 
 | 		# Default to per-cpu for Intel PT data that was recorded per-cpu, | 
 | 		# because decoding can be done for each CPU separately. | 
 | 		if self.IsIntelPT(): | 
 | 			return True | 
 | 		return False | 
 |  | 
 | 	def Config(self): | 
 | 		self.Init() | 
 | 		self.ExtractTimeInfo() | 
 | 		if not self.per_cpu: | 
 | 			self.per_cpu = self.DefaultToPerCPU() | 
 | 		if self.verbosity.debug: | 
 | 			print("per_cpu", self.per_cpu) | 
 | 		self.ExtractCPUInfo() | 
 | 		self.SplitTimeRanges() | 
 | 		if self.verbosity.self_test: | 
 | 			self.CheckTimeRanges() | 
 | 		# Prefer open-ended time range to starting / ending with min_time / max_time resp. | 
 | 		self.OpenTimeRangeEnds() | 
 | 		self.CreateWorkList() | 
 |  | 
 | 	def Run(self): | 
 | 		if self.dry_run: | 
 | 			print(len(self.worklist),"jobs:") | 
 | 			for w in self.worklist: | 
 | 				print(w.Command()) | 
 | 			return True | 
 | 		result = RunWork(self.worklist, self.jobs, verbosity=self.verbosity) | 
 | 		if self.verbosity.verbose: | 
 | 			print(glb_prog_name, "done") | 
 | 		return result | 
 |  | 
 | def RunParallelPerf(a): | 
 | 	pp = ParallelPerf(a) | 
 | 	pp.Config() | 
 | 	return pp.Run() | 
 |  | 
 | def Main(args): | 
 | 	ap = argparse.ArgumentParser( | 
 | 		prog=glb_prog_name, formatter_class = argparse.RawDescriptionHelpFormatter, | 
 | 		description = | 
 | """ | 
 | Run a perf script command multiple times in parallel, using perf script options | 
 | --cpu and --time so that each job processes a different chunk of the data. | 
 | """, | 
 | 		epilog = | 
 | """ | 
 | Follow the options by '--' and then the perf script command e.g. | 
 |  | 
 | 	$ perf record -a -- sleep 10 | 
 | 	$ parallel-perf.py --nr=4 -- perf script --ns | 
 | 	All jobs finished successfully | 
 | 	$ tree parallel-perf-output/ | 
 | 	parallel-perf-output/ | 
 | 	├── time-range-0 | 
 | 	│   ├── cmd.txt | 
 | 	│   └── out.txt | 
 | 	├── time-range-1 | 
 | 	│   ├── cmd.txt | 
 | 	│   └── out.txt | 
 | 	├── time-range-2 | 
 | 	│   ├── cmd.txt | 
 | 	│   └── out.txt | 
 | 	└── time-range-3 | 
 | 	    ├── cmd.txt | 
 | 	    └── out.txt | 
 | 	$ find parallel-perf-output -name cmd.txt | sort | xargs grep -H . | 
 | 	parallel-perf-output/time-range-0/cmd.txt:perf script --time=,9466.504461499 --ns | 
 | 	parallel-perf-output/time-range-1/cmd.txt:perf script --time=9466.504461500,9469.005396999 --ns | 
 | 	parallel-perf-output/time-range-2/cmd.txt:perf script --time=9469.005397000,9471.506332499 --ns | 
 | 	parallel-perf-output/time-range-3/cmd.txt:perf script --time=9471.506332500, --ns | 
 |  | 
 | Any perf script command can be used, including the use of perf script options | 
 | --dlfilter and --script, so that the benefit of running parallel jobs | 
 | naturally extends to them also. | 
 |  | 
 | If option --pipe-to is used, standard output is first piped through that | 
 | command. Beware, if the command fails (e.g. grep with no matches), it will be | 
 | considered a fatal error. | 
 |  | 
 | Final standard output is redirected to files named out.txt in separate | 
 | subdirectories under the output directory. Similarly, standard error is | 
 | written to files named err.txt. In addition, files named cmd.txt contain the | 
 | corresponding perf script command. After processing, err.txt files are removed | 
 | if they are empty. | 
 |  | 
 | If any job exits with a non-zero exit code, then all jobs are killed and no | 
 | more are started. A message is printed if any job results in a non-empty | 
 | err.txt file. | 
 |  | 
 | There is a separate output subdirectory for each time range. If the --per-cpu | 
 | option is used, these are further grouped under cpu-n subdirectories, e.g. | 
 |  | 
 | 	$ parallel-perf.py --per-cpu --nr=2 -- perf script --ns --cpu=0,1 | 
 | 	All jobs finished successfully | 
 | 	$ tree parallel-perf-output | 
 | 	parallel-perf-output/ | 
 | 	├── cpu-0 | 
 | 	│   ├── time-range-0 | 
 | 	│   │   ├── cmd.txt | 
 | 	│   │   └── out.txt | 
 | 	│   └── time-range-1 | 
 | 	│       ├── cmd.txt | 
 | 	│       └── out.txt | 
 | 	└── cpu-1 | 
 | 	    ├── time-range-0 | 
 | 	    │   ├── cmd.txt | 
 | 	    │   └── out.txt | 
 | 	    └── time-range-1 | 
 | 	        ├── cmd.txt | 
 | 	        └── out.txt | 
 | 	$ find parallel-perf-output -name cmd.txt | sort | xargs grep -H . | 
 | 	parallel-perf-output/cpu-0/time-range-0/cmd.txt:perf script --cpu=0 --time=,9469.005396999 --ns | 
 | 	parallel-perf-output/cpu-0/time-range-1/cmd.txt:perf script --cpu=0 --time=9469.005397000, --ns | 
 | 	parallel-perf-output/cpu-1/time-range-0/cmd.txt:perf script --cpu=1 --time=,9469.005396999 --ns | 
 | 	parallel-perf-output/cpu-1/time-range-1/cmd.txt:perf script --cpu=1 --time=9469.005397000, --ns | 
 |  | 
 | Subdivisions of time range, and cpus if the --per-cpu option is used, are | 
 | expressed by the --time and --cpu perf script options respectively. If the | 
 | supplied perf script command has a --time option, then that time range is | 
 | subdivided, otherwise the time range given by 'time of first sample' to | 
 | 'time of last sample' is used (refer perf script --header-only). Similarly, the | 
 | supplied perf script command may provide a --cpu option, and only those CPUs | 
 | will be processed. | 
 |  | 
 | To prevent time intervals becoming too small, the --min-interval option can | 
 | be used. | 
 |  | 
 | Note there is special handling for processing Intel PT traces. If an interval is | 
 | not specified and the perf record command contained the intel_pt event, then the | 
 | time range will be subdivided in order to produce subdivisions that contain | 
 | approximately the same amount of trace data. That is accomplished by counting | 
 | double-quick (--itrace=qqi) samples, and choosing time ranges that encompass | 
 | approximately the same number of samples. In that case, time ranges may not be | 
 | the same for each CPU processed. For Intel PT, --per-cpu is the default, but | 
 | that can be overridden by --no-per-cpu. Note, for Intel PT, double-quick | 
 | decoding produces 1 sample for each PSB synchronization packet, which in turn | 
 | come after a certain number of bytes output, determined by psb_period (refer | 
 | perf Intel PT documentation). The minimum number of double-quick samples that | 
 | will define a time range can be set by the --min_size option, which defaults to | 
 | 64. | 
 | """) | 
 | 	ap.add_argument("-o", "--output-dir", default="parallel-perf-output", help="output directory (default 'parallel-perf-output')") | 
 | 	ap.add_argument("-j", "--jobs", type=int, default=0, help="maximum number of jobs to run in parallel at one time (default is the number of CPUs)") | 
 | 	ap.add_argument("-n", "--nr", type=int, default=0, help="number of time subdivisions (default is the number of jobs)") | 
 | 	ap.add_argument("-i", "--interval", type=float, default=0, help="subdivide the time range using this time interval (in seconds e.g. 0.1 for a tenth of a second)") | 
 | 	ap.add_argument("-c", "--per-cpu", action="store_true", help="process data for each CPU in parallel") | 
 | 	ap.add_argument("-m", "--min-interval", type=float, default=glb_min_interval, help=f"minimum interval (default {glb_min_interval} seconds)") | 
 | 	ap.add_argument("-p", "--pipe-to", help="command to pipe output to (optional)") | 
 | 	ap.add_argument("-N", "--no-per-cpu", action="store_true", help="do not process data for each CPU in parallel") | 
 | 	ap.add_argument("-b", "--min_size", type=int, default=glb_min_samples, help="minimum data size (for Intel PT in PSBs)") | 
 | 	ap.add_argument("-D", "--dry-run", action="store_true", help="do not run any jobs, just show the perf script commands") | 
 | 	ap.add_argument("-q", "--quiet", action="store_true", help="do not print any messages except errors") | 
 | 	ap.add_argument("-v", "--verbose", action="store_true", help="print more messages") | 
 | 	ap.add_argument("-d", "--debug", action="store_true", help="print debugging messages") | 
 | 	cmd_line = list(args) | 
 | 	try: | 
 | 		split_pos = cmd_line.index("--") | 
 | 		cmd = cmd_line[split_pos + 1:] | 
 | 		args = cmd_line[:split_pos] | 
 | 	except: | 
 | 		cmd = None | 
 | 		args = cmd_line | 
 | 	a = ap.parse_args(args=args[1:]) | 
 | 	a.cmd = cmd | 
 | 	a.verbosity = Verbosity(a.quiet, a.verbose, a.debug) | 
 | 	try: | 
 | 		if a.cmd == None: | 
 | 			if len(args) <= 1: | 
 | 				ap.print_help() | 
 | 				return True | 
 | 			raise Exception("Command line must contain '--' before perf command") | 
 | 		return RunParallelPerf(a) | 
 | 	except Exception as e: | 
 | 		print("Fatal error: ", str(e)) | 
 | 		if a.debug: | 
 | 			raise | 
 | 		return False | 
 |  | 
 | if __name__ == "__main__": | 
 | 	if not Main(sys.argv): | 
 | 		sys.exit(1) |