Skip to content

Commit

Permalink
fixed issues with async execution of python methods
Browse files Browse the repository at this point in the history
  • Loading branch information
kvey committed Sep 4, 2024
1 parent ec9bab5 commit bf7cf65
Show file tree
Hide file tree
Showing 27 changed files with 212 additions and 262 deletions.
2 changes: 1 addition & 1 deletion .python-version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.11.9
3.12
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ brew install \
protobuf \
# We are investigating if this is necessary or can be removed
libiconv \
python@3.11 \
python@3.12 \
# Chidori uses uv for handling python dependencies
uv

Expand Down
4 changes: 0 additions & 4 deletions toolchain/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion toolchain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ members = [
"chidori-prompt-format",
"chidori-static-analysis",
"chidori-debugger",
"chidori-tsne",
]
resolver = "2"

Expand Down
1 change: 0 additions & 1 deletion toolchain/book_src/ARCHITECTURE.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ Chidori consists of the following crates:
- `chidori-debugger` contains a UI for visualizing and debugging Chidori executed programs.
- `chidori-prompt-format` implements handlebars-like templating with support for tracing composition
- `chidori-static-analysis` implements our parsing and extraction of control-flow from Python and TypeScript source code
- `chidori-tsne` (IGNORE) - not yet implemented, in the future we'd like to add support for visualizing embeddings within our debugger


### Chidori Core
Expand Down
2 changes: 1 addition & 1 deletion toolchain/book_src/COMMON_ERRORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
## Error: `ld: library 'python3.12' not found`

Solution:
Set PYO3_PYTHON=python3.11 when building chidori-debugger to your currently installed python version.
Set PYO3_PYTHON=python3.12 when building chidori-debugger to your currently installed python version.



2 changes: 1 addition & 1 deletion toolchain/chidori-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -122,4 +122,4 @@ anyhow = "1.0.82"


[package.metadata.pyo3]
python = "/opt/homebrew/bin/python3.11"
python = "/opt/homebrew/bin/python3.12"
6 changes: 3 additions & 3 deletions toolchain/chidori-core/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ fn add_extension_module_link_args(triple: &Triple) -> io::Result<()> {
writeln!(writer, "cargo:rustc-cdylib-link-arg=-undefined")?;
writeln!(writer, "cargo:rustc-cdylib-link-arg=dynamic_lookup")?;
writeln!(writer, "cargo:rustc-link-search=native=/opt/homebrew/Cellar/libiconv/1.17/lib")?;
writeln!(writer, "cargo:rustc-link-search=native=/opt/homebrew/Cellar/python@3.11/3.11.9_1/Frameworks/Python.framework/Versions/3.11/lib")?;
writeln!(writer, "cargo:rustc-link-lib=dylib=python3.11")?;
println!("cargo:warning=Linking against Python 3.11");
writeln!(writer, "cargo:rustc-link-search=native=/opt/homebrew/Cellar/python@3.12/3.12.5/Frameworks/Python.framework/Versions/3.12/lib")?;
writeln!(writer, "cargo:rustc-link-lib=dylib=python3.12")?;
println!("cargo:warning=Linking against Python 3.12");

// Assuming the toolchain directory is part of the RUSTUP_HOME environment variable
let home_directory = home_dir().expect("Could not find the home directory");
Expand Down
37 changes: 36 additions & 1 deletion toolchain/chidori-core/examples/core10_concurrency/core.md
Original file line number Diff line number Diff line change
@@ -1 +1,36 @@
# Explaining and demonstrating our concurrency support
# Explaining and demonstrating our concurrency support

This is a python function that is invoking a prompt by name, kwargs to this
invocation are passed to the prompt. Prompts are async and return strings.
```python (run_prompt_cell)
async def first_letter(s):
return s.replace("-", "").strip()[0]

async def run_prompt(number_of_states):
out = ""
for state in (await get_states_first_letters(num=number_of_states)).split('\n'):
out += await first_letter(state)
return "demo" + out
```

This is the prompt itself. The cell name is used to refer to the prompt output when it is satisfied
by globally available values. The fn key is used to name the prompt in the context of a function invocation.
```prompt (states)
---
model: gpt-3.5-turbo
fn: get_states_first_letters
---
List the first {{num}} US states to be added to the union.
Return this as a `-` bulleted list with the name of the state on each line.
```

A unit test demonstrates the invocation of the prompt by the function.
```python (entry)
import unittest

class TestMarshalledValues(unittest.IsolatedAsyncioTestCase):
async def test_run_prompt(self):
self.assertEqual(await run_prompt(5), "demoDPNGC")

unittest.TextTestRunner().run(unittest.TestLoader().loadTestsFromTestCase(TestMarshalledValues))
```
20 changes: 20 additions & 0 deletions toolchain/chidori-core/examples/core2_marshalling/core.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,16 @@ Chidori.assertEq(x6, [1, 2, 3]);

// TODO: marshalling of functions is not currently supported
// Chidori.assertEq(typeof x9, "function");


// These will appear in the UI
const jsX0 = x0;
const jsX1 = x1;
const jsX2 = x2;
const jsX3 = x3;
const jsX4 = x4;
const jsX5 = x5;
const jsX6 = x6;
```


Expand Down Expand Up @@ -83,5 +93,15 @@ class TestMarshalledValues(unittest.TestCase):
self.assertEqual(y6, [1,2,3])

unittest.TextTestRunner().run(unittest.TestLoader().loadTestsFromTestCase(TestMarshalledValues))


# These will appear in the UI
pyY0 = y0
pyY1 = y1
pyY2 = y2
pyY3 = y3
pyY4 = y4
pyY5 = y5
pyY6 = y6
```

Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import { assertEquals } from "https://deno.land/std@0.221.0/assert/mod.ts";
Deno.test("async addition test", async () => {
assertEquals(await add_two(2), 4);
});

const resultJs = await add_two(2)
```

### Demonstrates defining a function in javascript and calling it in python
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,7 @@ Return this as a `-` bulleted list with the name of the state on each line.

A unit test demonstrates the invocation of the prompt by the function.
```python (entry)
import unittest
result = await run_prompt(5)
```

class TestMarshalledValues(unittest.IsolatedAsyncioTestCase):
async def test_run_prompt(self):
self.assertEqual(await run_prompt(5), "demoDPNGC")

unittest.TextTestRunner().run(unittest.TestLoader().loadTestsFromTestCase(TestMarshalledValues))
```
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ def add_two_numbers(a, b):
```


```prompt (add_population)
```prompt (declare_add_population)
---
fn: add_population
model: gpt-3.5-turbo
fn: add_population
import:
- add_two_numbers
---
Expand Down
2 changes: 1 addition & 1 deletion toolchain/chidori-core/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ authors = [{ name = "Colton Pierson", email = "colton@thousandbirds.ai" }]
dependencies = [
]
readme = "README.md"
requires-python = "== 3.11"
requires-python = "== 3.12"

[tool.rye]
managed = true
Expand Down
34 changes: 13 additions & 21 deletions toolchain/chidori-core/src/cells/code_cell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,27 +78,19 @@ pub fn code_cell(execution_state_id: ExecutionNodeId, cell: &CodeCell, range: &T
let s = s.clone();
let cell = cell.clone();
async move {
let result = tokio::task::spawn_blocking(move || {
let runtime = tokio::runtime::Runtime::new().unwrap();
let result = runtime.block_on(crate::library::std::code::runtime_deno::source_code_run_deno(
&s,
&cell.source_code,
&x,
&cell.function_invocation,
));
match result {
Ok(v) =>
Ok(OperationFnOutput {
has_error: false,
execution_state: None,
output: Ok(v.0),
stdout: v.1,
stderr: v.2,
}),
Err(e) => panic!("{:?}", e),
}
}).await.unwrap();
result
let result = crate::library::std::code::runtime_deno::source_code_run_deno(
&s,
&cell.source_code,
&x,
&cell.function_invocation,
).await?;
Ok(OperationFnOutput {
has_error: false,
execution_state: None,
output: result.0,
stdout: result.1,
stderr: result.2,
})
}.boxed()
}),
))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ impl ExecutionGraph {
_ = tokio::time::sleep(Duration::from_millis(10)) => {
match receiver.try_recv() {
Ok((resulting_execution_state, oneshot)) => {
println!("==== Received dispatch event {:?}", resulting_execution_state);
println!("==== Execution Graph received dispatch event {:?}", resulting_execution_state);

let s = resulting_execution_state.clone();
match &resulting_execution_state {
Expand Down Expand Up @@ -366,7 +366,7 @@ impl ExecutionGraph {
ExecutionStateEvaluation::Error(_) => unreachable!("Cannot get state from a future state"),
ExecutionStateEvaluation::EvalFailure(_) => unreachable!("Cannot get state from a future state"),
};
println!("Inserting into graph {:?}", &resulting_state_id);
println!("Resulting state received from progress_graph {:?}", &resulting_state_id);
// TODO: if state already exists how to handle
state_id_to_state.deref_mut().insert(resulting_state_id.clone(), new_state.clone());
execution_graph.deref_mut()
Expand All @@ -390,11 +390,10 @@ impl ExecutionGraph {
(ExecutionNodeId, ExecutionStateEvaluation), // the resulting total state of this step
Vec<(usize, OperationFnOutput)>, // values emitted by operations during this step
)> {
println!("step_execution_with_previous_state");
let previous_state = match previous_state {
ExecutionStateEvaluation::Complete(state) => state,
ExecutionStateEvaluation::Executing(..) => panic!("Cannot step an execution state that is still executing"),
ExecutionStateEvaluation::Error(_) => unreachable!("Cannot get state from a future state"),
ExecutionStateEvaluation::EvalFailure(_) => unreachable!("Cannot get state from a future state"),
_ => { panic!("Stepping execution should only occur against completed states") }
};
let eval_state = previous_state.determine_next_operation()?;
let (new_state, outputs) = previous_state.step_execution(eval_state).await?;
Expand Down Expand Up @@ -444,6 +443,7 @@ impl ExecutionGraph {
(ExecutionNodeId, ExecutionStateEvaluation), // the resulting total state of this step
Vec<(usize, OperationFnOutput)>, // values emitted by operations during this step
)> {
println!("external_step_execution");
let state = self.get_state_at_id(prev_execution_id);
if let Some(state) = state {
self.step_execution_with_previous_state(&state).await
Expand Down
18 changes: 11 additions & 7 deletions toolchain/chidori-core/src/execution/execution/execution_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ pub enum ExecutionStateErrors {
CellExecutionUnexpectedFailure(ExecutionNodeId, String),
#[error("unknown execution state error")]
Unknown(String),
#[error("anyhow error")]
#[error("Anyhow Error: {0}")]
AnyhowError(String),
}

Expand Down Expand Up @@ -131,7 +131,7 @@ impl Debug for ExecutionStateEvaluation {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ExecutionStateEvaluation::Complete(ref state) => f.debug_tuple("Complete").field(state).finish(),
ExecutionStateEvaluation::Executing(..) => f.debug_tuple("Executing").field(&format!("Future state evaluating")).finish(),
ExecutionStateEvaluation::Executing(ref state) => f.debug_tuple("Executing").field(state).finish(),
ExecutionStateEvaluation::Error(_) => unreachable!("Cannot get state from a future state"),
ExecutionStateEvaluation::EvalFailure(_) => unreachable!("Cannot get state from a future state"),
}
Expand Down Expand Up @@ -187,6 +187,9 @@ pub struct ExecutionState {
/// Map of operation_id -> OperationNode definition
pub operation_by_id: ImHashMap<OperationId, Arc<Mutex<OperationNode>>>,

/// Map of operation_id -> Cell definition
pub cells_by_id: ImHashMap<OperationId, CellTypes>,

/// This is a mapping of function names to operation ids. Function calls are dispatched to the associated
/// OperationId that they are initialized by. When a function is invoked, it is dispatched to the operation
/// node that initialized it where we re-use that OperationNode's runtime in order to invoke the function.
Expand Down Expand Up @@ -225,8 +228,8 @@ fn render_map_as_table(exec_state: &ExecutionState) -> String {
|---|---|"));
for key in exec_state.state.keys() {
if let Some(val) = exec_state.state_get(key) {
table.push_str(&format!(indoc!(r"
| {} | {:?} |" ), key, val));
table.push_str(&format!(indoc!(r"| {} | {:?} |" ), key, val));
table.push_str("\n");
}
}
table.push_str("\n");
Expand Down Expand Up @@ -255,7 +258,7 @@ async fn pause_future_with_oneshot(execution_state_evaluation: ExecutionStateEva
}
// let recv = oneshot_receiver.await.expect("Failed to receive oneshot signal");
}
println!("Continuing from oneshot signal");
println!("============= should resume =============");
RkyvSerializedValue::Null
};
sender.send((execution_state_evaluation, Some(oneshot_sender))).await.expect("Failed to send oneshot signal to the graph receiver");
Expand All @@ -281,6 +284,7 @@ impl Default for ExecutionState {
fresh_values: Default::default(),
operation_name_to_id: Default::default(),
operation_by_id: Default::default(),
cells_by_id: Default::default(),
function_name_to_metadata: Default::default(),
has_been_set: Default::default(),
dependency_map: Default::default(),
Expand Down Expand Up @@ -537,7 +541,7 @@ impl ExecutionState {
})
};
operation_node.id = op_id;

s.cells_by_id.insert(op_id, operation_node.cell.clone());
s.operation_by_id.insert(op_id, Arc::new(Mutex::new(operation_node)));
s.update_callable_functions();
s.exec_queue.push_back(op_id);
Expand Down Expand Up @@ -836,7 +840,7 @@ impl ExecutionState {
let operation_count = self.operation_by_id.keys().count();
let mut count_loops = 0;
loop {
println!("looping {:?}", self.exec_queue);
println!("looping {:?} {:?}", self.exec_queue, count_loops);
count_loops += 1;
if count_loops >= operation_count * 2 {
return Err(Error::msg("Looped through all operations without detecting an execution"));
Expand Down
Loading

0 comments on commit bf7cf65

Please sign in to comment.