diff --git a/.github/workflows/ci-preemptive.sh b/.github/workflows/ci-preemptive.sh
new file mode 100755
index 00000000..1a244eaf
--- /dev/null
+++ b/.github/workflows/ci-preemptive.sh
@@ -0,0 +1,36 @@
+#!/usr/bin/env sh
+
+set -ex
+
+CARGO=cargo
+if [ "${CROSS}" = "1" ]; then
+ export CARGO_NET_RETRY=5
+ export CARGO_NET_TIMEOUT=10
+
+ cargo install cross
+ CARGO=cross
+fi
+
+# If a test crashes, we want to know which one it was.
+export RUST_TEST_THREADS=1
+export RUST_BACKTRACE=1
+
+# test open-coroutine-core mod
+cd "${PROJECT_DIR}"/core
+"${CARGO}" test --target "${TARGET}" --features preemptive
+"${CARGO}" test --target "${TARGET}" --features preemptive --release
+
+# test open-coroutine
+cd "${PROJECT_DIR}"/open-coroutine
+"${CARGO}" test --target "${TARGET}" --features preemptive
+"${CARGO}" test --target "${TARGET}" --features preemptive --release
+
+# test io_uring
+if [ "${TARGET}" = "x86_64-unknown-linux-gnu" ]; then
+ cd "${PROJECT_DIR}"/core
+ "${CARGO}" test --target "${TARGET}" --no-default-features --features io_uring,preemptive
+ "${CARGO}" test --target "${TARGET}" --no-default-features --features io_uring,preemptive --release
+ cd "${PROJECT_DIR}"/open-coroutine
+ "${CARGO}" test --target "${TARGET}" --no-default-features --features io_uring,preemptive
+ "${CARGO}" test --target "${TARGET}" --no-default-features --features io_uring,preemptive --release
+fi
diff --git a/.github/workflows/ci.sh b/.github/workflows/ci.sh
new file mode 100755
index 00000000..47b5a23c
--- /dev/null
+++ b/.github/workflows/ci.sh
@@ -0,0 +1,36 @@
+#!/usr/bin/env sh
+
+set -ex
+
+CARGO=cargo
+if [ "${CROSS}" = "1" ]; then
+ export CARGO_NET_RETRY=5
+ export CARGO_NET_TIMEOUT=10
+
+ cargo install cross
+ CARGO=cross
+fi
+
+# If a test crashes, we want to know which one it was.
+export RUST_TEST_THREADS=1
+export RUST_BACKTRACE=1
+
+# test open-coroutine-core mod
+cd "${PROJECT_DIR}"/core
+"${CARGO}" test --target "${TARGET}"
+"${CARGO}" test --target "${TARGET}" --release
+
+# test open-coroutine
+cd "${PROJECT_DIR}"/open-coroutine
+"${CARGO}" test --target "${TARGET}"
+"${CARGO}" test --target "${TARGET}" --release
+
+# test io_uring
+if [ "${TARGET}" = "x86_64-unknown-linux-gnu" ]; then
+ cd "${PROJECT_DIR}"/core
+ "${CARGO}" test --target "${TARGET}" --no-default-features --features io_uring
+ "${CARGO}" test --target "${TARGET}" --no-default-features --features io_uring --release
+ cd "${PROJECT_DIR}"/open-coroutine
+ "${CARGO}" test --target "${TARGET}" --no-default-features --features io_uring
+ "${CARGO}" test --target "${TARGET}" --no-default-features --features io_uring --release
+fi
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 175b9adb..7791551c 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -7,256 +7,103 @@ on:
pull_request:
paths-ignore:
- '**.md'
- workflow_dispatch:
env:
CARGO_TERM_COLOR: always
jobs:
- lints:
- name: Run cargo fmt and cargo clippy
- runs-on: ubuntu-latest
+ test:
+ runs-on: ${{ matrix.os }}
steps:
- - name: Checkout sources
- uses: actions/checkout@v2
- - name: Install toolchain
- uses: actions-rs/toolchain@v1
+ - uses: actions/checkout@v4
+ - uses: actions-rs/toolchain@v1
with:
- profile: minimal
- toolchain: stable
+ toolchain: ${{ matrix.target == 'i686-pc-windows-gnu' && format('{0}-i686-pc-windows-gnu', matrix.channel) || matrix.channel }}
+ target: ${{ matrix.target }}
override: true
- components: rustfmt, clippy
- - name: cargo fmt --check
- uses: actions-rs/cargo@v1
+ components: rustfmt
+ - uses: actions-rs/cargo@v1
with:
command: fmt
args: --all -- --check
- - name: Run cargo clippy
- uses: actions-rs/cargo@v1
+ - name: Run cargo deny
+ if: ${{ contains(matrix.os, 'ubuntu') }}
+ uses: EmbarkStudios/cargo-deny-action@v2
with:
- command: clippy
- args: -- -D warnings
+ log-level: warn
+ command: check
+ arguments: --all-features
+ - name: Run tests
+ env:
+ CHANNEL: ${{ matrix.channel }}
+ CROSS: ${{ !startsWith(matrix.target, 'x86_64') && contains(matrix.target, 'linux') && '1' || '0' }}
+ TARGET: ${{ matrix.target }}
+ OS: ${{ matrix.os }}
+ PROJECT_DIR: ${{ github.workspace }}
+ run: sh .github/workflows/ci.sh
+ - name: Run preemptive tests
+ if: always()
+ env:
+ CHANNEL: ${{ matrix.channel }}
+ CROSS: ${{ !startsWith(matrix.target, 'x86_64') && contains(matrix.target, 'linux') && '1' || '0' }}
+ TARGET: ${{ matrix.target }}
+ OS: ${{ matrix.os }}
+ PROJECT_DIR: ${{ github.workspace }}
+ run: sh .github/workflows/ci-preemptive.sh
- linux:
- name: Test ${{ matrix.rust }} on ubuntu-latest
- runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
- rust:
- - stable
- - nightly
- steps:
- - name: Checkout sources
- uses: actions/checkout@v2
- - name: Install Rust (${{ matrix.rust }})
- uses: actions-rs/toolchain@v1
- with:
- profile: minimal
- toolchain: ${{ matrix.rust }}
- override: true
- - name: Run cargo clean
- run: |
- cd ${{ github.workspace }}
- /home/runner/.cargo/bin/cargo clean
- - name: Run cargo release test compile
- uses: actions-rs/cargo@v1
- with:
- command: test
- args: --release --all --no-run
- - name: Run cargo release test
- run: sudo bash -c "sudo -u runner RUSTUP_TOOLCHAIN=${{ matrix.rust }} RUST_BACKTRACE=1 /home/runner/.cargo/bin/cargo test --release --all"
- - name: Run cargo release preemptive example
- if: always()
- run: |
- cd ${{ github.workspace }}/open-coroutine-core
- sudo bash -c "sudo -u runner RUSTUP_TOOLCHAIN=${{ matrix.rust }} RUST_BACKTRACE=1 /home/runner/.cargo/bin/cargo run --example preemptive --release --features preemptive-schedule"
- - name: Run cargo release test io_uring
- if: always()
- run: |
- cd ${{ github.workspace }}/open-coroutine-iouring
- sudo bash -c "sudo -u runner RUSTUP_TOOLCHAIN=${{ matrix.rust }} RUST_BACKTRACE=1 /home/runner/.cargo/bin/cargo test --release"
- - name: Run cargo release sleep not coroutine example
- if: always()
- run: |
- cd ${{ github.workspace }}/examples
- sudo bash -c "sudo -u runner RUSTUP_TOOLCHAIN=${{ matrix.rust }} RUST_BACKTRACE=1 /home/runner/.cargo/bin/cargo run --example sleep_not_co --release"
- - name: Run cargo release sleep coroutine example
- if: always()
- run: |
- cd ${{ github.workspace }}/examples
- sudo bash -c "sudo -u runner RUSTUP_TOOLCHAIN=${{ matrix.rust }} RUST_BACKTRACE=1 /home/runner/.cargo/bin/cargo run --example sleep_co --release"
- - name: Run cargo release socket not coroutine example
- if: always()
- run: |
- cd ${{ github.workspace }}/examples
- sudo bash -c "sudo -u runner RUSTUP_TOOLCHAIN=${{ matrix.rust }} RUST_BACKTRACE=1 /home/runner/.cargo/bin/cargo run --example socket_not_co --release"
- - name: Run cargo release socket coroutine server example
- if: always()
- run: |
- cd ${{ github.workspace }}/examples
- sudo bash -c "sudo -u runner RUSTUP_TOOLCHAIN=${{ matrix.rust }} RUST_BACKTRACE=1 /home/runner/.cargo/bin/cargo run --example socket_co_server --release"
- - name: Run cargo release socket coroutine client example
- if: always()
- run: |
- cd ${{ github.workspace }}/examples
- sudo bash -c "sudo -u runner RUSTUP_TOOLCHAIN=${{ matrix.rust }} RUST_BACKTRACE=1 /home/runner/.cargo/bin/cargo run --example socket_co_client --release"
- - name: Run cargo release socket coroutine example
- if: always()
- run: |
- cd ${{ github.workspace }}/examples
- sudo bash -c "sudo -u runner RUSTUP_TOOLCHAIN=${{ matrix.rust }} RUST_BACKTRACE=1 /home/runner/.cargo/bin/cargo run --example socket_co --release"
- - name: Run cargo release socket not coroutine with io_uring example
- if: always()
- run: |
- cd ${{ github.workspace }}/examples
- sudo bash -c "sudo -u runner RUSTUP_TOOLCHAIN=${{ matrix.rust }} RUST_BACKTRACE=1 /home/runner/.cargo/bin/cargo run --example socket_not_co --release --features io_uring"
- - name: Run cargo release socket coroutine server with io_uring example
- if: always()
- run: |
- cd ${{ github.workspace }}/examples
- sudo bash -c "sudo -u runner RUSTUP_TOOLCHAIN=${{ matrix.rust }} RUST_BACKTRACE=1 /home/runner/.cargo/bin/cargo run --example socket_co_server --release --features io_uring"
- - name: Run cargo release socket coroutine client with io_uring example
- if: always()
- run: |
- cd ${{ github.workspace }}/examples
- sudo bash -c "sudo -u runner RUSTUP_TOOLCHAIN=${{ matrix.rust }} RUST_BACKTRACE=1 /home/runner/.cargo/bin/cargo run --example socket_co_client --release --features io_uring"
- - name: Run cargo release socket coroutine with io_uring example
- if: always()
- run: |
- cd ${{ github.workspace }}/examples
- sudo bash -c "sudo -u runner RUSTUP_TOOLCHAIN=${{ matrix.rust }} RUST_BACKTRACE=1 /home/runner/.cargo/bin/cargo run --example socket_co --release --features io_uring"
+ target: [
+ x86_64-unknown-linux-gnu,
+ i686-unknown-linux-gnu,
+ aarch64-unknown-linux-gnu,
+ armv7-unknown-linux-gnueabihf,
+ riscv64gc-unknown-linux-gnu,
+ thumbv7neon-unknown-linux-gnueabihf,
+# mips64-unknown-linux-muslabi64,
+# loongarch64-unknown-linux-gnu,
+# s390x-unknown-linux-gnu,
- macos:
- name: Test ${{ matrix.rust }} on ${{ matrix.os }}
- runs-on: ${{ matrix.os }}
- strategy:
- fail-fast: false
- matrix:
- rust:
- - stable
- - nightly
- os:
- - macos-latest
- - macos-14
- steps:
- - name: Checkout sources
- uses: actions/checkout@v2
- - name: Install Rust (${{ matrix.rust }})
- uses: actions-rs/toolchain@v1
- with:
- profile: minimal
- toolchain: ${{ matrix.rust }}
- override: true
- - name: Run cargo clean
- run: |
- cd ${{ github.workspace }}
- /Users/runner/.cargo/bin/cargo clean
- - name: Run cargo release test compile
- uses: actions-rs/cargo@v1
- with:
- command: test
- args: --release --all --no-run
- - name: Run cargo release test
- run: sudo bash -c "sudo -u runner RUSTUP_TOOLCHAIN=${{ matrix.rust }} RUST_BACKTRACE=1 /Users/runner/.cargo/bin/cargo test --release --all"
- - name: Run cargo release preemptive example
- if: always()
- run: |
- cd ${{ github.workspace }}/open-coroutine-core
- sudo bash -c "sudo -u runner RUSTUP_TOOLCHAIN=${{ matrix.rust }} RUST_BACKTRACE=1 /Users/runner/.cargo/bin/cargo run --example preemptive --release --features preemptive-schedule"
- - name: Run cargo release sleep not coroutine example
- if: always()
- run: |
- cd ${{ github.workspace }}/examples
- sudo bash -c "sudo -u runner RUSTUP_TOOLCHAIN=${{ matrix.rust }} RUST_BACKTRACE=1 /Users/runner/.cargo/bin/cargo run --example sleep_not_co --release"
- - name: Run cargo release sleep coroutine example
- if: always()
- run: |
- cd ${{ github.workspace }}/examples
- sudo bash -c "sudo -u runner RUSTUP_TOOLCHAIN=${{ matrix.rust }} RUST_BACKTRACE=1 /Users/runner/.cargo/bin/cargo run --example sleep_co --release"
- - name: Run cargo release socket not coroutine example
- if: always()
- run: |
- cd ${{ github.workspace }}/examples
- sudo bash -c "sudo -u runner RUSTUP_TOOLCHAIN=${{ matrix.rust }} RUST_BACKTRACE=1 /Users/runner/.cargo/bin/cargo run --example socket_not_co --release"
- - name: Run cargo release socket coroutine server example
- if: always()
- run: |
- cd ${{ github.workspace }}/examples
- sudo bash -c "sudo -u runner RUSTUP_TOOLCHAIN=${{ matrix.rust }} RUST_BACKTRACE=1 /Users/runner/.cargo/bin/cargo run --example socket_co_server --release"
- - name: Run cargo release socket coroutine client example
- if: always()
- run: |
- cd ${{ github.workspace }}/examples
- sudo bash -c "sudo -u runner RUSTUP_TOOLCHAIN=${{ matrix.rust }} RUST_BACKTRACE=1 /Users/runner/.cargo/bin/cargo run --example socket_co_client --release"
- - name: Run cargo release socket coroutine example
- if: always()
- run: |
- cd ${{ github.workspace }}/examples
- sudo bash -c "sudo -u runner RUSTUP_TOOLCHAIN=${{ matrix.rust }} RUST_BACKTRACE=1 /Users/runner/.cargo/bin/cargo run --example socket_co --release"
+ x86_64-apple-darwin,
+ aarch64-apple-darwin,
- windows:
- name: Test ${{ matrix.rust }} on windows-latest
- runs-on: windows-latest
- strategy:
- fail-fast: false
- matrix:
- rust:
- # stable is not supported due to static-detour in retour crate
-# - stable-x86_64-pc-windows-gnu
- - nightly-x86_64-pc-windows-gnu
- steps:
- - name: Checkout sources
- uses: actions/checkout@v2
- - name: Install Rust (${{ matrix.rust }})
- uses: actions-rs/toolchain@v1
- with:
- profile: minimal
- toolchain: ${{ matrix.rust }}
- override: true
- - name: Run cargo clean
- run: |
- cd ${{ github.workspace }}
- C://Users//runneradmin//.cargo//bin//cargo.exe clean
- - name: Run cargo release test compile
- uses: actions-rs/cargo@v1
- with:
- command: test
- args: --release --all --no-run
- - name: Run cargo release test
- run: bash -c "RUSTUP_TOOLCHAIN=${{ matrix.rust }} RUST_BACKTRACE=1 C://Users//runneradmin//.cargo//bin//cargo.exe test --release --all"
- - name: Run cargo release preemptive example
- if: always()
- run: |
- cd ${{ github.workspace }}/open-coroutine-core
- bash -c "RUSTUP_TOOLCHAIN=${{ matrix.rust }} RUST_BACKTRACE=1 C://Users//runneradmin//.cargo//bin//cargo.exe run --example preemptive --release --features preemptive-schedule"
- - name: Run cargo release sleep not coroutine example
- if: always()
- run: |
- cd ${{ github.workspace }}/examples
- bash -c "RUSTUP_TOOLCHAIN=${{ matrix.rust }} RUST_BACKTRACE=1 C://Users//runneradmin//.cargo//bin//cargo.exe run --example sleep_not_co --release"
- - name: Run cargo release sleep coroutine example
- if: always()
- run: |
- cd ${{ github.workspace }}/examples
- bash -c "RUSTUP_TOOLCHAIN=${{ matrix.rust }} RUST_BACKTRACE=1 C://Users//runneradmin//.cargo//bin//cargo.exe run --example sleep_co --release"
- - name: Run cargo release socket not coroutine example
- if: always()
- run: |
- cd ${{ github.workspace }}/examples
- bash -c "RUSTUP_TOOLCHAIN=${{ matrix.rust }} RUST_BACKTRACE=1 C://Users//runneradmin//.cargo//bin//cargo.exe run --example socket_not_co --release"
- - name: Run cargo release socket coroutine server example
- if: always()
- run: |
- cd ${{ github.workspace }}/examples
- bash -c "RUSTUP_TOOLCHAIN=${{ matrix.rust }} RUST_BACKTRACE=1 C://Users//runneradmin//.cargo//bin//cargo.exe run --example socket_co_server --release"
- - name: Run cargo release socket coroutine client example
- if: always()
- run: |
- cd ${{ github.workspace }}/examples
- bash -c "RUSTUP_TOOLCHAIN=${{ matrix.rust }} RUST_BACKTRACE=1 C://Users//runneradmin//.cargo//bin//cargo.exe run --example socket_co_client --release"
-# - name: Run cargo release socket coroutine example
-# if: always()
-# run: |
-# cd ${{ github.workspace }}/examples
-# bash -c "RUSTUP_TOOLCHAIN=${{ matrix.rust }} RUST_BACKTRACE=1 C://Users//runneradmin//.cargo//bin//cargo.exe run --example socket_co --release"
+ x86_64-pc-windows-gnu,
+ i686-pc-windows-gnu,
+ x86_64-pc-windows-msvc,
+ i686-pc-windows-msvc,
+ ]
+ channel: [ 1.81.0, nightly-2024-08-02 ]
+ include:
+ - target: x86_64-unknown-linux-gnu
+ os: ubuntu-latest
+ - target: i686-unknown-linux-gnu
+ os: ubuntu-latest
+ - target: aarch64-unknown-linux-gnu
+ os: ubuntu-latest
+ - target: armv7-unknown-linux-gnueabihf
+ os: ubuntu-latest
+ - target: riscv64gc-unknown-linux-gnu
+ os: ubuntu-latest
+ - target: thumbv7neon-unknown-linux-gnueabihf
+ os: ubuntu-latest
+# - target: mips64-unknown-linux-muslabi64
+# os: ubuntu-latest
+# - target: loongarch64-unknown-linux-gnu
+# os: ubuntu-latest
+# - target: s390x-unknown-linux-gnu
+# os: ubuntu-latest
+
+ - target: x86_64-apple-darwin
+ os: macos-latest
+ - target: aarch64-apple-darwin
+ os: macos-14
+
+ - target: x86_64-pc-windows-gnu
+ os: windows-latest
+ - target: i686-pc-windows-gnu
+ os: windows-latest
+ - target: x86_64-pc-windows-msvc
+ os: windows-latest
+ - target: i686-pc-windows-msvc
+ os: windows-latest
diff --git a/.github/workflows/pr-audit.yml b/.github/workflows/pr-audit.yml
index c6ff721e..a02666dc 100644
--- a/.github/workflows/pr-audit.yml
+++ b/.github/workflows/pr-audit.yml
@@ -13,7 +13,7 @@ jobs:
runs-on: ubuntu-latest
if: "!contains(github.event.head_commit.message, 'ci skip')"
steps:
- - uses: actions/checkout@v2
+ - uses: actions/checkout@v4
- name: Install cargo-audit
uses: actions-rs/cargo@v1
diff --git a/.gitignore b/.gitignore
index b8aed76b..df5c3508 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,16 +1,3 @@
-# Generated by Cargo
-# will have compiled files and executables
-/target/
-
-# idea ignore
-.idea/
-*.ipr
-*.iml
-*.iws
-
-# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries
-# More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html
-*.lock
-
-# These are backup files generated by rustfmt
-**/*.rs.bk
+/target
+.idea
+*.lock
\ No newline at end of file
diff --git a/Cargo.toml b/Cargo.toml
index 9f46fcac..d4efe6ff 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,12 +1,57 @@
[workspace]
resolver = "2"
members = [
- "open-coroutine-timer",
- "open-coroutine-queue",
- "open-coroutine-iouring",
- "open-coroutine-core",
- "open-coroutine-hooks",
- "open-coroutine-macros",
- "open-coroutine",
- "examples"
+ "core",
+ "hook",
+ "macros",
+ "open-coroutine"
]
+
+[workspace.package]
+version = "0.6.10"
+edition = "2021"
+authors = ["zhangzicheng@apache.org"]
+repository = "https://github.com/acl-dev/open-coroutine"
+license = "Apache-2.0"
+readme = "README.md"
+
+[workspace.dependencies]
+open-coroutine-core = { path = "core", version = "0.6.0" }
+open-coroutine-hook = { path = "hook", version = "0.6.0" }
+open-coroutine-macros = { path = "macros", version = "0.6.0" }
+
+tracing = { version = "0.1", default-features = false }
+tracing-subscriber = { version = "0.3", default-features = false }
+tracing-appender = { version = "0.2", default-features = false }
+cargo_metadata = { version = "0.18", default-features = false }
+mio = { version = "1.0", default-features = false }
+
+cfg-if = "1.0.0"
+polling = "2.8.0"
+
+libc = "0.2"
+rand = "0.8"
+st3 = "0.4"
+crossbeam-deque = "0.8"
+time = "0.3"
+corosensei = "0.2"
+core_affinity = "0.8"
+crossbeam-utils = "0.8"
+nix = "0.29"
+io-uring = "0.7"
+windows-sys = "0.59"
+anyhow = "1.0"
+slab = "0.4"
+backtrace = "0.3"
+minhook = "0.6"
+psm = "0.1"
+
+once_cell = "1"
+dashmap = "6"
+num_cpus = "1"
+uuid = "1"
+derivative = "2"
+tempfile = "3"
+cc = "1"
+syn = "2"
+quote = "1"
diff --git a/README.md b/README.md
index 8c0ae5ed..3afb5f09 100644
--- a/README.md
+++ b/README.md
@@ -160,7 +160,6 @@ nanosleep hooked
### todo
-- [ ] support scalable stack
- [ ] support and compatibility for AF_XDP socket
- [ ] hook other syscall maybe interrupt by signal
@@ -204,6 +203,10 @@ nanosleep hooked
- [ ] support `#[open_coroutine::join]` macro to wait coroutines
+### 0.6.x
+
+- [x] support scalable stack
+
### 0.5.x
- [x] refactor syscall state, distinguish between state and innerState
diff --git a/TODO.md b/TODO.md
deleted file mode 100644
index 794dd45f..00000000
--- a/TODO.md
+++ /dev/null
@@ -1,3 +0,0 @@
-EventLoops does not perform load balancing
-
-refactor CI
\ No newline at end of file
diff --git a/core/Cargo.toml b/core/Cargo.toml
new file mode 100644
index 00000000..39e20026
--- /dev/null
+++ b/core/Cargo.toml
@@ -0,0 +1,93 @@
+[package]
+name = "open-coroutine-core"
+version.workspace = true
+edition.workspace = true
+authors.workspace = true
+description = "The open-coroutine is a simple, efficient and generic coroutine library."
+repository.workspace = true
+keywords = ["runtime", "coroutine", "hook", "preempt", "work-steal"]
+categories = ["concurrency", "asynchronous", "os", "network-programming", "wasm"]
+license.workspace = true
+readme.workspace = true
+
+[dependencies]
+cfg-if.workspace = true
+once_cell.workspace = true
+dashmap.workspace = true
+num_cpus.workspace = true
+rand.workspace = true
+st3.workspace = true
+crossbeam-deque.workspace = true
+tracing = { workspace = true, default-features = false, optional = true }
+tracing-subscriber = { workspace = true, features = [
+ "fmt",
+ "local-time"
+], default-features = false, optional = true }
+time = { workspace = true, optional = true }
+corosensei = { workspace = true, optional = true }
+uuid = { workspace = true, features = [
+ "v4",
+ "fast-rng",
+], optional = true }
+derivative = { workspace = true, optional = true }
+core_affinity = { workspace = true, optional = true }
+crossbeam-utils = { workspace = true, optional = true }
+psm.workspace = true
+
+[target.'cfg(unix)'.dependencies]
+libc.workspace = true
+nix = { workspace = true, features = ["signal"] }
+mio = { workspace = true, features = [
+ "net",
+ "os-poll",
+ "os-ext",
+], default-features = false, optional = true }
+
+[target.'cfg(target_os = "linux")'.dependencies]
+io-uring = { workspace = true, optional = true }
+
+[target.'cfg(windows)'.dependencies]
+windows-sys = { workspace = true, features = [
+ "Win32_System_IO",
+ "Win32_Foundation",
+ "Win32_System_Kernel",
+ "Win32_System_Threading",
+ "Win32_Networking_WinSock",
+ "Win32_System_SystemInformation",
+ "Win32_System_Diagnostics_Debug",
+] }
+polling = { workspace = true, optional = true }
+
+[build-dependencies]
+cfg-if.workspace = true
+
+[target.'cfg(target_os = "linux")'.build-dependencies]
+cc.workspace = true
+
+[dev-dependencies]
+anyhow.workspace = true
+slab.workspace = true
+backtrace.workspace = true
+
+[features]
+# Print some help log.
+# Enable for default.
+log = ["tracing", "tracing-subscriber", "time"]
+
+# low-level raw coroutine
+korosensei = ["corosensei", "uuid", "nix/pthread", "derivative"]
+
+# Provide preemptive scheduling implementation.
+# Enable for default.
+preemptive = ["korosensei"]
+
+# Provide net API abstraction and implementation.
+net = ["korosensei", "polling", "mio", "crossbeam-utils", "core_affinity"]
+
+# Provide io_uring adaptation, this feature only works in linux.
+io_uring = ["net", "io-uring"]
+
+# Provide syscall implementation.
+syscall = ["net"]
+
+default = ["log", "syscall"]
\ No newline at end of file
diff --git a/open-coroutine-iouring/build.rs b/core/build.rs
similarity index 53%
rename from open-coroutine-iouring/build.rs
rename to core/build.rs
index cda86a94..84c5d843 100644
--- a/open-coroutine-iouring/build.rs
+++ b/core/build.rs
@@ -2,12 +2,8 @@ fn main() {
cfg_if::cfg_if! {
if #[cfg(target_os = "linux")] {
cc::Build::new()
- .cpp(true)
.warnings(true)
- .flag("-Wall")
- .flag("-std=c++11")
- .flag("-c")
- .file("cpp_src/version.cpp")
+ .file("c_src/version.c")
.compile("version");
}
}
diff --git a/open-coroutine-iouring/cpp_src/version.cpp b/core/c_src/version.c
similarity index 69%
rename from open-coroutine-iouring/cpp_src/version.cpp
rename to core/c_src/version.c
index 6e60b02c..19817801 100644
--- a/open-coroutine-iouring/cpp_src/version.cpp
+++ b/core/c_src/version.c
@@ -1,4 +1,4 @@
-#include "version.h"
+#include
int linux_version_code() {
return LINUX_VERSION_CODE;
diff --git a/open-coroutine-core/src/pool/creator.rs b/core/src/co_pool/creator.rs
similarity index 78%
rename from open-coroutine-core/src/pool/creator.rs
rename to core/src/co_pool/creator.rs
index e35ec3a3..544107b5 100644
--- a/open-coroutine-core/src/pool/creator.rs
+++ b/core/src/co_pool/creator.rs
@@ -1,18 +1,18 @@
-use crate::common::{Current, Pool};
-use crate::constants::CoroutineState;
+use crate::co_pool::CoroutinePool;
+use crate::common::constants::CoroutineState;
use crate::coroutine::listener::Listener;
-use crate::pool::CoroutinePool;
-use crate::scheduler::{SchedulableCoroutine, SchedulableCoroutineState, SchedulableListener};
+use crate::coroutine::local::CoroutineLocal;
+use crate::scheduler::SchedulableCoroutineState;
use std::sync::atomic::Ordering;
#[repr(C)]
#[derive(Debug, Default)]
pub(crate) struct CoroutineCreator {}
-impl Listener<(), (), Option> for CoroutineCreator {
+impl Listener<(), Option> for CoroutineCreator {
fn on_state_changed(
&self,
- _: &SchedulableCoroutine,
+ _: &CoroutineLocal,
_: SchedulableCoroutineState,
new_state: SchedulableCoroutineState,
) {
@@ -41,5 +41,3 @@ impl Listener<(), (), Option> for CoroutineCreator {
}
}
}
-
-impl SchedulableListener for CoroutineCreator {}
diff --git a/core/src/co_pool/mod.rs b/core/src/co_pool/mod.rs
new file mode 100644
index 00000000..716ef662
--- /dev/null
+++ b/core/src/co_pool/mod.rs
@@ -0,0 +1,440 @@
+use crate::co_pool::creator::CoroutineCreator;
+use crate::co_pool::task::Task;
+use crate::common::beans::BeanFactory;
+use crate::common::constants::PoolState;
+use crate::common::work_steal::{LocalQueue, WorkStealQueue};
+use crate::common::{get_timeout_time, now, CondvarBlocker};
+use crate::coroutine::suspender::Suspender;
+use crate::scheduler::{SchedulableCoroutine, Scheduler};
+use crate::{impl_current_for, impl_display_by_debug, impl_for_named, trace};
+use dashmap::DashMap;
+use std::cell::Cell;
+use std::io::{Error, ErrorKind};
+use std::ops::{Deref, DerefMut};
+use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
+use std::sync::{Arc, Condvar, Mutex};
+use std::time::Duration;
+
+/// Task abstraction and impl.
+pub mod task;
+
+/// Coroutine pool state abstraction and impl.
+mod state;
+
+/// Creator for coroutine pool.
+mod creator;
+
+/// The coroutine pool impls.
+#[repr(C)]
+#[derive(Debug)]
+pub struct CoroutinePool<'p> {
+ //协程池状态
+ state: Cell,
+ //任务队列
+ task_queue: LocalQueue<'p, Task<'p>>,
+ //工作协程组
+ workers: Scheduler<'p>,
+ //当前协程数
+ running: AtomicUsize,
+ //尝试取出任务失败的次数
+ pop_fail_times: AtomicUsize,
+ //最小协程数,即核心协程数
+ min_size: AtomicUsize,
+ //最大协程数
+ max_size: AtomicUsize,
+ //非核心协程的最大存活时间,单位ns
+ keep_alive_time: AtomicU64,
+ //阻滞器
+ blocker: Arc,
+ //正在等待结果的
+ waits: DashMap<&'p str, Arc<(Mutex, Condvar)>>,
+ //任务执行结果
+ results: DashMap, &'p str>>,
+}
+
+impl Drop for CoroutinePool<'_> {
+ fn drop(&mut self) {
+ if std::thread::panicking() {
+ return;
+ }
+ self.stop(Duration::from_secs(30))
+ .unwrap_or_else(|_| panic!("Failed to stop coroutine pool {} !", self.name()));
+ assert_eq!(
+ PoolState::Stopped,
+ self.state(),
+ "The coroutine pool is not stopped !"
+ );
+ assert_eq!(
+ 0,
+ self.get_running_size(),
+ "There are still tasks in progress !"
+ );
+ assert_eq!(
+ 0,
+ self.task_queue.len(),
+ "There are still tasks to be carried out !"
+ );
+ }
+}
+
+impl Default for CoroutinePool<'_> {
+ fn default() -> Self {
+ Self::new(
+ format!("open-coroutine-pool-{:?}", std::thread::current().id()),
+ crate::common::constants::DEFAULT_STACK_SIZE,
+ 0,
+ 65536,
+ 0,
+ )
+ }
+}
+
+impl<'p> Deref for CoroutinePool<'p> {
+ type Target = Scheduler<'p>;
+
+ fn deref(&self) -> &Self::Target {
+ &self.workers
+ }
+}
+
+impl DerefMut for CoroutinePool<'_> {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.workers
+ }
+}
+
+impl_for_named!(CoroutinePool<'p>);
+
+impl_current_for!(COROUTINE_POOL, CoroutinePool<'p>);
+
+impl_display_by_debug!(CoroutinePool<'p>);
+
+impl<'p> CoroutinePool<'p> {
+ /// Create a new `CoroutinePool` instance.
+ #[must_use]
+ pub fn new(
+ name: String,
+ stack_size: usize,
+ min_size: usize,
+ max_size: usize,
+ keep_alive_time: u64,
+ ) -> Self {
+ let mut workers = Scheduler::new(name, stack_size);
+ workers.add_listener(CoroutineCreator::default());
+ CoroutinePool {
+ state: Cell::new(PoolState::Running),
+ workers,
+ running: AtomicUsize::new(0),
+ pop_fail_times: AtomicUsize::new(0),
+ min_size: AtomicUsize::new(min_size),
+ max_size: AtomicUsize::new(max_size),
+ task_queue: BeanFactory::get_or_default::>>(
+ crate::common::constants::TASK_GLOBAL_QUEUE_BEAN,
+ )
+ .local_queue(),
+ keep_alive_time: AtomicU64::new(keep_alive_time),
+ blocker: Arc::default(),
+ results: DashMap::new(),
+ waits: DashMap::default(),
+ }
+ }
+
+ /// Set the minimum coroutine number in this pool.
+ pub fn set_min_size(&self, min_size: usize) {
+ self.min_size.store(min_size, Ordering::Release);
+ }
+
+ /// Get the minimum coroutine number in this pool
+ pub fn get_min_size(&self) -> usize {
+ self.min_size.load(Ordering::Acquire)
+ }
+
+ /// Gets the number of coroutines currently running in this pool.
+ pub fn get_running_size(&self) -> usize {
+ self.running.load(Ordering::Acquire)
+ }
+
+ /// Set the maximum coroutine number in this pool.
+ pub fn set_max_size(&self, max_size: usize) {
+ self.max_size.store(max_size, Ordering::Release);
+ }
+
+ /// Get the maximum coroutine number in this pool.
+ pub fn get_max_size(&self) -> usize {
+ self.max_size.load(Ordering::Acquire)
+ }
+
+ /// Set the maximum idle time running in this pool.
+ /// `keep_alive_time` has `ns` units.
+ pub fn set_keep_alive_time(&self, keep_alive_time: u64) {
+ self.keep_alive_time
+ .store(keep_alive_time, Ordering::Release);
+ }
+
+ /// Get the maximum idle time running in this pool.
+ /// Returns in `ns` units.
+ pub fn get_keep_alive_time(&self) -> u64 {
+ self.keep_alive_time.load(Ordering::Acquire)
+ }
+
+ /// Returns `true` if the task queue is empty.
+ pub fn is_empty(&self) -> bool {
+ self.size() == 0
+ }
+
+ /// Returns the number of tasks owned by this pool.
+ pub fn size(&self) -> usize {
+ self.task_queue.len()
+ }
+
+ /// Stop this coroutine pool.
+ pub fn stop(&mut self, dur: Duration) -> std::io::Result<()> {
+ match self.state() {
+ PoolState::Running => {
+ assert_eq!(PoolState::Running, self.stopping()?);
+ _ = self.try_timed_schedule_task(dur)?;
+ assert_eq!(PoolState::Stopping, self.stopped()?);
+ Ok(())
+ }
+ PoolState::Stopping => Err(Error::new(ErrorKind::Other, "should never happens")),
+ PoolState::Stopped => Ok(()),
+ }
+ }
+
+ /// Submit a new task to this pool.
+ ///
+ /// Allow multiple threads to concurrently submit task to the pool,
+ /// but only allow one thread to execute scheduling.
+ pub fn submit_task(
+ &self,
+ name: Option,
+ func: impl FnOnce(Option) -> Option + 'p,
+ param: Option,
+ ) -> std::io::Result {
+ match self.state() {
+ PoolState::Running => {}
+ PoolState::Stopping | PoolState::Stopped => {
+ return Err(Error::new(
+ ErrorKind::Other,
+ "The coroutine pool is stopping or stopped !",
+ ))
+ }
+ }
+ let name = name.unwrap_or(format!("{}@{}", self.name(), uuid::Uuid::new_v4()));
+ self.submit_raw_task(Task::new(name.clone(), func, param));
+ Ok(name)
+ }
+
+ /// Submit new task to this pool.
+ ///
+ /// Allow multiple threads to concurrently submit task to the pool,
+ /// but only allow one thread to execute scheduling.
+ pub(crate) fn submit_raw_task(&self, task: Task<'p>) {
+ self.task_queue.push_back(task);
+ self.blocker.notify();
+ }
+
+ /// Attempt to obtain task results with the given `task_name`.
+ pub fn try_get_task_result(&self, task_name: &str) -> Option, &'p str>> {
+ self.results.remove(task_name).map(|(_, r)| r)
+ }
+
+ /// Use the given `task_name` to obtain task results, and if no results are found,
+ /// block the current thread for `wait_time`.
+ ///
+ /// # Errors
+ /// if timeout
+ pub fn wait_task_result(
+ &self,
+ task_name: &str,
+ wait_time: Duration,
+ ) -> std::io::Result, &str>> {
+ let key = Box::leak(Box::from(task_name));
+ if let Some(r) = self.try_get_task_result(key) {
+ self.notify(key);
+ drop(self.waits.remove(key));
+ return Ok(r);
+ }
+ if SchedulableCoroutine::current().is_some() {
+ let timeout_time = get_timeout_time(wait_time);
+ loop {
+ _ = self.try_run();
+ if let Some(r) = self.try_get_task_result(key) {
+ return Ok(r);
+ }
+ if timeout_time.saturating_sub(now()) == 0 {
+ return Err(Error::new(ErrorKind::TimedOut, "wait timeout"));
+ }
+ }
+ }
+ let arc = if let Some(arc) = self.waits.get(key) {
+ arc.clone()
+ } else {
+ let arc = Arc::new((Mutex::new(true), Condvar::new()));
+ assert!(self.waits.insert(key, arc.clone()).is_none());
+ arc
+ };
+ let (lock, cvar) = &*arc;
+ drop(
+ cvar.wait_timeout_while(
+ lock.lock()
+ .map_err(|e| Error::new(ErrorKind::Other, format!("{e}")))?,
+ wait_time,
+ |&mut pending| pending,
+ )
+ .map_err(|e| Error::new(ErrorKind::Other, format!("{e}")))?,
+ );
+ if let Some(r) = self.try_get_task_result(key) {
+ self.notify(key);
+ assert!(self.waits.remove(key).is_some());
+ return Ok(r);
+ }
+ Err(Error::new(ErrorKind::TimedOut, "wait timeout"))
+ }
+
+ fn can_recycle(&self) -> bool {
+ match self.state() {
+ PoolState::Running => false,
+ PoolState::Stopping | PoolState::Stopped => true,
+ }
+ }
+
+ /// Try to create a coroutine in this pool.
+ ///
+ /// # Errors
+ /// if create failed.
+ fn try_grow(&self) -> std::io::Result<()> {
+ if self.task_queue.is_empty() {
+ // No task to run
+ trace!("The coroutine pool:{} has no task !", self.name());
+ return Ok(());
+ }
+ let create_time = now();
+ self.submit_co(
+ move |suspender, ()| {
+ loop {
+ let pool = Self::current().expect("current pool not found");
+ if pool.try_run().is_some() {
+ pool.reset_pop_fail_times();
+ continue;
+ }
+ let running = pool.get_running_size();
+ if now().saturating_sub(create_time) >= pool.get_keep_alive_time()
+ && running > pool.get_min_size()
+ || pool.can_recycle()
+ {
+ return None;
+ }
+ _ = pool.pop_fail_times.fetch_add(1, Ordering::Release);
+ match pool.pop_fail_times.load(Ordering::Acquire).cmp(&running) {
+ //让出CPU给下一个协程
+ std::cmp::Ordering::Less => suspender.suspend(),
+ //减少CPU在N个无任务的协程中空轮询
+ std::cmp::Ordering::Equal | std::cmp::Ordering::Greater => {
+ pool.blocker.clone().block(Duration::from_millis(1));
+ pool.reset_pop_fail_times();
+ }
+ }
+ }
+ },
+ None,
+ )
+ }
+
+ /// Try to create a coroutine in this pool.
+ ///
+ /// # Errors
+ /// if create failed.
+ pub fn submit_co(
+ &self,
+ f: impl FnOnce(&Suspender<(), ()>, ()) -> Option + 'static,
+ stack_size: Option,
+ ) -> std::io::Result<()> {
+ if self.get_running_size() >= self.get_max_size() {
+ trace!(
+ "The coroutine pool:{} has reached its maximum size !",
+ self.name()
+ );
+ return Err(Error::new(
+ ErrorKind::Other,
+ "The coroutine pool has reached its maximum size !",
+ ));
+ }
+ self.deref().submit_co(f, stack_size).map(|()| {
+ _ = self.running.fetch_add(1, Ordering::Release);
+ })
+ }
+
+ fn reset_pop_fail_times(&self) {
+ self.pop_fail_times.store(0, Ordering::Release);
+ }
+
+ fn try_run(&self) -> Option<()> {
+ self.task_queue.pop_front().map(|task| {
+ let (task_name, result) = task.run();
+ assert!(
+ self.results.insert(task_name.clone(), result).is_none(),
+ "The previous result was not retrieved in a timely manner"
+ );
+ self.notify(&task_name);
+ })
+ }
+
+ fn notify(&self, task_name: &str) {
+ if let Some(arc) = self.waits.get(task_name) {
+ let (lock, cvar) = &**arc;
+ let mut pending = lock.lock().expect("notify task failed");
+ *pending = false;
+ cvar.notify_one();
+ }
+ }
+
+ /// Schedule the tasks.
+ ///
+ /// Allow multiple threads to concurrently submit task to the pool,
+ /// but only allow one thread to execute scheduling.
+ ///
+ /// # Errors
+ /// see `try_timeout_schedule`.
+ pub fn try_schedule_task(&mut self) -> std::io::Result<()> {
+ self.try_timeout_schedule_task(u64::MAX).map(|_| ())
+ }
+
+ /// Try scheduling the tasks for up to `dur`.
+ ///
+ /// Allow multiple threads to concurrently submit task to the scheduler,
+ /// but only allow one thread to execute scheduling.
+ ///
+ /// # Errors
+ /// see `try_timeout_schedule`.
+ pub fn try_timed_schedule_task(&mut self, dur: Duration) -> std::io::Result {
+ self.try_timeout_schedule_task(get_timeout_time(dur))
+ }
+
+ /// Attempt to schedule the tasks before the `timeout_time` timestamp.
+ ///
+ /// Allow multiple threads to concurrently submit task to the scheduler,
+ /// but only allow one thread to execute scheduling.
+ ///
+ /// Returns the left time in ns.
+ ///
+ /// # Errors
+ /// if change to ready fails.
+ pub fn try_timeout_schedule_task(&mut self, timeout_time: u64) -> std::io::Result {
+ match self.state() {
+ PoolState::Running | PoolState::Stopping => {
+ drop(self.try_grow());
+ }
+ PoolState::Stopped => {
+ return Err(Error::new(
+ ErrorKind::Other,
+ "The coroutine pool is stopped !",
+ ))
+ }
+ }
+ Self::init_current(self);
+ let left_time = self.try_timeout_schedule(timeout_time);
+ Self::clean_current();
+ left_time
+ }
+}
diff --git a/core/src/co_pool/state.rs b/core/src/co_pool/state.rs
new file mode 100644
index 00000000..234448ed
--- /dev/null
+++ b/core/src/co_pool/state.rs
@@ -0,0 +1,45 @@
+use crate::co_pool::CoroutinePool;
+use crate::common::constants::PoolState;
+use std::io::{Error, ErrorKind};
+
+impl CoroutinePool<'_> {
+ /// running -> stopping
+ ///
+ /// # Errors
+ /// if change state fails.
+ pub(crate) fn stopping(&self) -> std::io::Result {
+ self.change_state(PoolState::Running, PoolState::Stopping)
+ }
+
+ /// stopping -> stopped
+ ///
+ /// # Errors
+ /// if change state fails.
+ pub(crate) fn stopped(&self) -> std::io::Result {
+ self.change_state(PoolState::Stopping, PoolState::Stopped)
+ }
+
+ /// Get the state of this coroutine.
+ pub fn state(&self) -> PoolState {
+ self.state.get()
+ }
+
+ fn change_state(
+ &self,
+ old_state: PoolState,
+ new_state: PoolState,
+ ) -> std::io::Result {
+ let current = self.state();
+ if current == new_state {
+ return Ok(old_state);
+ }
+ if current == old_state {
+ assert_eq!(old_state, self.state.replace(new_state));
+ return Ok(old_state);
+ }
+ Err(Error::new(
+ ErrorKind::Other,
+ format!("{} unexpected {current}->{:?}", self.name(), new_state),
+ ))
+ }
+}
diff --git a/core/src/co_pool/task.rs b/core/src/co_pool/task.rs
new file mode 100644
index 00000000..0553c749
--- /dev/null
+++ b/core/src/co_pool/task.rs
@@ -0,0 +1,79 @@
+use crate::catch;
+use derivative::Derivative;
+
+/// 做C兼容时会用到
+pub type UserTaskFunc = extern "C" fn(usize) -> usize;
+
+/// The task impls.
+#[repr(C)]
+#[derive(Derivative)]
+#[derivative(Debug)]
+pub struct Task<'t> {
+ name: String,
+ #[derivative(Debug = "ignore")]
+ func: Box) -> Option + 't>,
+ param: Option,
+}
+
+impl<'t> Task<'t> {
+ /// Create a new `Task` instance.
+ pub fn new(
+ name: String,
+ func: impl FnOnce(Option) -> Option + 't,
+ param: Option,
+ ) -> Self {
+ Task {
+ name,
+ func: Box::new(func),
+ param,
+ }
+ }
+
+ /// execute the task
+ ///
+ /// # Errors
+ /// if an exception occurred while executing this task.
+ pub fn run<'e>(self) -> (String, Result