| // SPDX-License-Identifier: GPL-2.0 |
| /* |
| * Copyright (c) 2025 NVIDIA Corporation. |
| */ |
| #define _GNU_SOURCE |
| #include <stdio.h> |
| #include <unistd.h> |
| #include <signal.h> |
| #include <time.h> |
| #include <bpf/bpf.h> |
| #include <scx/common.h> |
| #include <sys/wait.h> |
| #include <sched.h> |
| #include <pthread.h> |
| #include "scx_test.h" |
| #include "dequeue.bpf.skel.h" |
| |
| #define NUM_WORKERS 8 |
| #define AFFINITY_HAMMER_MS 500 |
| |
| /* |
| * Worker function that creates enqueue/dequeue events via CPU work and |
| * sleep. |
| */ |
| static void worker_fn(int id) |
| { |
| int i; |
| volatile int sum = 0; |
| |
| for (i = 0; i < 1000; i++) { |
| volatile int j; |
| |
| /* Do some work to trigger scheduling events */ |
| for (j = 0; j < 10000; j++) |
| sum += j; |
| asm volatile("" : : "r"(sum)); |
| |
| /* Sleep to trigger dequeue */ |
| usleep(1000 + (id * 100)); |
| } |
| |
| exit(0); |
| } |
| |
| /* |
| * This thread changes workers' affinity from outside so that some changes |
| * hit tasks while they are still in the scheduler's queue and trigger |
| * property-change dequeues. |
| */ |
| static void *affinity_hammer_fn(void *arg) |
| { |
| pid_t *pids = arg; |
| cpu_set_t cpuset; |
| int i = 0, n = NUM_WORKERS; |
| struct timespec start, now; |
| |
| clock_gettime(CLOCK_MONOTONIC, &start); |
| while (1) { |
| int w = i % n; |
| int cpu = (i / n) % 4; |
| |
| CPU_ZERO(&cpuset); |
| CPU_SET(cpu, &cpuset); |
| sched_setaffinity(pids[w], sizeof(cpuset), &cpuset); |
| i++; |
| |
| /* Check elapsed time every 256 iterations to limit gettime cost */ |
| if ((i & 255) == 0) { |
| long long elapsed_ms; |
| |
| clock_gettime(CLOCK_MONOTONIC, &now); |
| elapsed_ms = (now.tv_sec - start.tv_sec) * 1000LL + |
| (now.tv_nsec - start.tv_nsec) / 1000000; |
| if (elapsed_ms >= AFFINITY_HAMMER_MS) |
| break; |
| } |
| } |
| return NULL; |
| } |
| |
| static enum scx_test_status run_scenario(struct dequeue *skel, u32 scenario, |
| const char *scenario_name) |
| { |
| struct bpf_link *link; |
| pid_t pids[NUM_WORKERS]; |
| pthread_t hammer; |
| |
| int i, status; |
| u64 enq_start, deq_start, |
| dispatch_deq_start, change_deq_start, bpf_queue_full_start; |
| u64 enq_delta, deq_delta, |
| dispatch_deq_delta, change_deq_delta, bpf_queue_full_delta; |
| |
| /* Set the test scenario */ |
| skel->bss->test_scenario = scenario; |
| |
| /* Record starting counts */ |
| enq_start = skel->bss->enqueue_cnt; |
| deq_start = skel->bss->dequeue_cnt; |
| dispatch_deq_start = skel->bss->dispatch_dequeue_cnt; |
| change_deq_start = skel->bss->change_dequeue_cnt; |
| bpf_queue_full_start = skel->bss->bpf_queue_full; |
| |
| link = bpf_map__attach_struct_ops(skel->maps.dequeue_ops); |
| SCX_FAIL_IF(!link, "Failed to attach struct_ops for scenario %s", scenario_name); |
| |
| /* Fork worker processes to generate enqueue/dequeue events */ |
| for (i = 0; i < NUM_WORKERS; i++) { |
| pids[i] = fork(); |
| SCX_FAIL_IF(pids[i] < 0, "Failed to fork worker %d", i); |
| |
| if (pids[i] == 0) { |
| worker_fn(i); |
| /* Should not reach here */ |
| exit(1); |
| } |
| } |
| |
| /* |
| * Run an "affinity hammer" so that some property changes hit tasks |
| * while they are still in BPF custody (e.g., in user DSQ or BPF |
| * queue), triggering SCX_DEQ_SCHED_CHANGE dequeues. |
| */ |
| SCX_FAIL_IF(pthread_create(&hammer, NULL, affinity_hammer_fn, pids) != 0, |
| "Failed to create affinity hammer thread"); |
| pthread_join(hammer, NULL); |
| |
| /* Wait for all workers to complete */ |
| for (i = 0; i < NUM_WORKERS; i++) { |
| SCX_FAIL_IF(waitpid(pids[i], &status, 0) != pids[i], |
| "Failed to wait for worker %d", i); |
| SCX_FAIL_IF(status != 0, "Worker %d exited with status %d", i, status); |
| } |
| |
| bpf_link__destroy(link); |
| |
| SCX_EQ(skel->data->uei.kind, EXIT_KIND(SCX_EXIT_UNREG)); |
| |
| /* Calculate deltas */ |
| enq_delta = skel->bss->enqueue_cnt - enq_start; |
| deq_delta = skel->bss->dequeue_cnt - deq_start; |
| dispatch_deq_delta = skel->bss->dispatch_dequeue_cnt - dispatch_deq_start; |
| change_deq_delta = skel->bss->change_dequeue_cnt - change_deq_start; |
| bpf_queue_full_delta = skel->bss->bpf_queue_full - bpf_queue_full_start; |
| |
| printf("%s:\n", scenario_name); |
| printf(" enqueues: %lu\n", (unsigned long)enq_delta); |
| printf(" dequeues: %lu (dispatch: %lu, property_change: %lu)\n", |
| (unsigned long)deq_delta, |
| (unsigned long)dispatch_deq_delta, |
| (unsigned long)change_deq_delta); |
| printf(" BPF queue full: %lu\n", (unsigned long)bpf_queue_full_delta); |
| |
| /* |
| * Validate enqueue/dequeue lifecycle tracking. |
| * |
| * For scenarios 0, 1, 3, 4 (local and global DSQs from |
| * ops.select_cpu() and ops.enqueue()), both enqueues and dequeues |
| * should be 0 because tasks bypass the BPF scheduler entirely: |
| * tasks never enter BPF scheduler's custody. |
| * |
| * For scenarios 2, 5, 6 (user DSQ or BPF internal queue) we expect |
| * both enqueues and dequeues. |
| * |
| * The BPF code does strict state machine validation with |
| * scx_bpf_error() to ensure the workflow semantics are correct. |
| * |
| * If we reach this point without errors, the semantics are |
| * validated correctly. |
| */ |
| if (scenario == 0 || scenario == 1 || |
| scenario == 3 || scenario == 4) { |
| /* Tasks bypass BPF scheduler completely */ |
| SCX_EQ(enq_delta, 0); |
| SCX_EQ(deq_delta, 0); |
| SCX_EQ(dispatch_deq_delta, 0); |
| SCX_EQ(change_deq_delta, 0); |
| } else { |
| /* |
| * User DSQ from ops.enqueue() or ops.select_cpu(): tasks |
| * enter BPF scheduler's custody. |
| * |
| * Also validate 1:1 enqueue/dequeue pairing. |
| */ |
| SCX_GT(enq_delta, 0); |
| SCX_GT(deq_delta, 0); |
| SCX_EQ(enq_delta, deq_delta); |
| } |
| |
| return SCX_TEST_PASS; |
| } |
| |
| static enum scx_test_status setup(void **ctx) |
| { |
| struct dequeue *skel; |
| |
| skel = dequeue__open(); |
| SCX_FAIL_IF(!skel, "Failed to open skel"); |
| SCX_ENUM_INIT(skel); |
| SCX_FAIL_IF(dequeue__load(skel), "Failed to load skel"); |
| |
| *ctx = skel; |
| |
| return SCX_TEST_PASS; |
| } |
| |
| static enum scx_test_status run(void *ctx) |
| { |
| struct dequeue *skel = ctx; |
| enum scx_test_status status; |
| |
| status = run_scenario(skel, 0, "Scenario 0: Local DSQ from ops.select_cpu()"); |
| if (status != SCX_TEST_PASS) |
| return status; |
| |
| status = run_scenario(skel, 1, "Scenario 1: Global DSQ from ops.select_cpu()"); |
| if (status != SCX_TEST_PASS) |
| return status; |
| |
| status = run_scenario(skel, 2, "Scenario 2: User DSQ from ops.select_cpu()"); |
| if (status != SCX_TEST_PASS) |
| return status; |
| |
| status = run_scenario(skel, 3, "Scenario 3: Local DSQ from ops.enqueue()"); |
| if (status != SCX_TEST_PASS) |
| return status; |
| |
| status = run_scenario(skel, 4, "Scenario 4: Global DSQ from ops.enqueue()"); |
| if (status != SCX_TEST_PASS) |
| return status; |
| |
| status = run_scenario(skel, 5, "Scenario 5: User DSQ from ops.enqueue()"); |
| if (status != SCX_TEST_PASS) |
| return status; |
| |
| status = run_scenario(skel, 6, "Scenario 6: BPF queue from ops.enqueue()"); |
| if (status != SCX_TEST_PASS) |
| return status; |
| |
| printf("\n=== Summary ===\n"); |
| printf("Total enqueues: %lu\n", (unsigned long)skel->bss->enqueue_cnt); |
| printf("Total dequeues: %lu\n", (unsigned long)skel->bss->dequeue_cnt); |
| printf(" Dispatch dequeues: %lu (no flag, normal workflow)\n", |
| (unsigned long)skel->bss->dispatch_dequeue_cnt); |
| printf(" Property change dequeues: %lu (SCX_DEQ_SCHED_CHANGE flag)\n", |
| (unsigned long)skel->bss->change_dequeue_cnt); |
| printf(" BPF queue full: %lu\n", |
| (unsigned long)skel->bss->bpf_queue_full); |
| printf("\nAll scenarios passed - no state machine violations detected\n"); |
| printf("-> Validated: Local DSQ dispatch bypasses BPF scheduler\n"); |
| printf("-> Validated: Global DSQ dispatch bypasses BPF scheduler\n"); |
| printf("-> Validated: User DSQ dispatch triggers ops.dequeue() callbacks\n"); |
| printf("-> Validated: Dispatch dequeues have no flags (normal workflow)\n"); |
| printf("-> Validated: Property change dequeues have SCX_DEQ_SCHED_CHANGE flag\n"); |
| printf("-> Validated: No duplicate enqueues or invalid state transitions\n"); |
| |
| return SCX_TEST_PASS; |
| } |
| |
| static void cleanup(void *ctx) |
| { |
| struct dequeue *skel = ctx; |
| |
| dequeue__destroy(skel); |
| } |
| |
| struct scx_test dequeue_test = { |
| .name = "dequeue", |
| .description = "Verify ops.dequeue() semantics", |
| .setup = setup, |
| .run = run, |
| .cleanup = cleanup, |
| }; |
| |
| REGISTER_SCX_TEST(&dequeue_test) |