安装dora-v0.3.2
export DORA_VERSION=v0.3.2 # Check for the latest release
export ARCHITECTURE=$(uname -m)
wget <https://github.com/dora-rs/dora/releases/download/${DORA_VERSION}/dora-${DORA_VERSION}-${ARCHITECTURE}-Linux.zip>
unzip dora-${DORA_VERSION}-${ARCHITECTURE}-Linux.zip
pip install dora-rs==${DORA_VERSION}
PATH=$PATH:$(pwd)
dora --help
cargo install dora-cli # In case of issues, you can try to add `--locked`
pip install dora-rs
实验二:基于Dora创建基于数据流的应用程序
参考https://dora-rs.ai/docs/guides/getting-started/conversation_py 创建一个基于rust语言的Dora项目,并达到与教程里的python代码一样的效果.
# 创建rust dora项目
dora new conversation_rs --kind dataflow --lang rust
# 新建名为talker的node
dora new --kind custom-node talker
# 初始化dora执行环境(生成daemon和coordinator)
dora up
# 启动一个命名为conversation的dataflow
dora start dataflow.yml --name conversation
# 查看名为conversation中listener node的logs
dora logs conversation listener
.
├── Cargo.lock
├── Cargo.toml
├── dataflow.yml
├── listener
│ ├── Cargo.toml
│ └── src
│ └── main.rs
└── talker
├── Cargo.toml
└── src
└── main.rs
4 directories, 7 files
// in ./listener/src/main.rs
use dora_node_api::{arrow::array::{Array, StringArray}, DoraNode, Event};
use std::error::Error;
fn main() -> Result<(), Box<dyn Error>> {
let (_node, mut events) = DoraNode::init_from_env()?;
while let Some(event) = events.recv() {
match event {
Event::Input {
id,
metadata: _,
data,
} => match id.as_str() {
_other => {
// data = ArrowData(StringArray["Hello World",])
let output = data.0.as_any(); // fetch dyn Any
// downcast to StringArray
if let Some(res) = output.downcast_ref::<StringArray>() {
eprintln!("Received input {}", res.value(0)); // get first element
}
},
},
_ => {}
}
}
Ok(())
}
// ./talker/src/main.rs
use dora_node_api::{dora_core::config::DataId, DoraNode, Event, IntoArrow, MetadataParameters};
use std::error::Error;
fn main() -> Result<(), Box<dyn Error>> {
let (mut node, mut events) = DoraNode::init_from_env()?;
while let Some(event) = events.recv() {
match event {
Event::Input {
id,
metadata: _,
data: _,
} => match id.as_str() {
other => {
eprintln!("Received input `{other}`");
// set id(should be equivalent to the outputs set in dataflow.yml)
let output_id = DataId::from("speech".to_owned());
// set parameter to default
let parameter = MetadataParameters::default();
// using into_arrow to get ArrowData
let str = "Hello World".into_arrow();
// send out ArrowData by send_output
node.send_output(output_id, parameter, str)?;
},
},
_ => {}
}
}
Ok(())
}
nodes:
- id: talker
custom:
source: ./target/debug/talker
build: cargo build -p talker
inputs:
tick: dora/timer/secs/1
outputs:
- speech
- id: listener
custom:
source: ./target/debug/listener
build: cargo build -p listener
inputs:
speech: talker/speech
build
参数无法自动构建,目前需手动构建后方可执行dora start
实验三:发送结构化的数据
修改上面建立的dora应用项目,让talker节点发送如下的
struct { name: String, age: u8 }
结构体数据,并让listener监听数据,打印出