Publish-Subscribe Event System
Every major class in Embedia publishes events at important points in its execution. Embedia ships with two predefined subscribers for these events:
- a stdout printer that prints everything about the event in color-coded output
- a sqlite3 backup that backs up everything about the event to a sqlite3 database located at ~/.embedia/backup.db
You can also create your own subscribers to do whatever you want with the data.
Currently the pubsub system is synchronous. We might make it asynchronous in the future.
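To illustrate what "synchronous" means here, below is a minimal publish-subscribe registry. It is a hypothetical sketch, not Embedia's internal implementation, and the names Registry, subscribe and publish are assumptions made for this example. The publish method calls every subscriber in registration order and returns only after all of them have finished, so a slow subscriber delays the code that published the event.

```python
from collections import defaultdict
from typing import Any, Callable, DefaultDict, List

# Subscriber signature used on this page: (event_type, id, timestamp, data)
Subscriber = Callable[[Any, int, str, dict], None]


class Registry:
    """Hypothetical synchronous pub-sub registry (not Embedia's actual code)."""

    def __init__(self) -> None:
        self._subscribers: DefaultDict[Any, List[Subscriber]] = defaultdict(list)

    def subscribe(self, event_type: Any, fn: Subscriber) -> None:
        self._subscribers[event_type].append(fn)

    def publish(self, event_type: Any, id: int, timestamp: str, data: dict) -> None:
        # Synchronous: each subscriber runs to completion, in registration order,
        # before publish() returns to the caller.
        for fn in self._subscribers[event_type]:
            fn(event_type, id, timestamp, data)


calls: List[str] = []
registry = Registry()
registry.subscribe("ToolEnd", lambda e, i, t, d: calls.append("first"))
registry.subscribe("ToolEnd", lambda e, i, t, d: calls.append("second"))
registry.publish("ToolEnd", 1, "2024-01-01T00:00:00+00:00", {})
print(calls)  # ['first', 'second']
```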
Data sent during an event publish
While publishing an event, the following datapoints are sent to the publish-subscribe system. Every subscriber of that event receives all four datapoints:
- event_type: One of the Event enum values
- id: The instance id of the object firing the event, generated using Python's id(obj) function
- timestamp: The current time, including the timezone
- data: A dictionary containing information related to the event
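As a rough illustration of these four datapoints, the hypothetical helper make_event_payload below assembles them as the list describes: the id comes from Python's built-in id(obj), and the timestamp is the current time with a timezone. Using the local timezone via datetime.now().astimezone() and formatting it as an ISO 8601 string are assumptions of this sketch; Embedia's exact timestamp format may differ.

```python
from datetime import datetime
from typing import Any, Dict


def make_event_payload(obj: Any, event_type: Any, data: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: collect the four datapoints sent with an event."""
    return {
        "event_type": event_type,
        "id": id(obj),  # instance id of the object firing the event
        # Current local time with its UTC offset (assumed ISO 8601 format)
        "timestamp": datetime.now().astimezone().isoformat(),
        "data": data,
    }


class DummyTool:  # stand-in for any object that fires events
    pass


tool = DummyTool()
payload = make_event_payload(tool, "ToolStart", {"name": "DummyTool", "args": (), "kwargs": {}})
print(payload["id"] == id(tool))  # True
```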
The keys and value types of the data dictionary for each event are described below.
LLMStart
{
"prompt": str # The prompt that is being sent to the LLM
"prompt_tokens": int | None # Number of tokens in the prompt if a tokenizer is provided, else None
}
LLMEnd
{
"prompt": str # The prompt that is being sent to the LLM
"prompt_tokens": int | None # Number of tokens in the prompt if a tokenizer is provided, else None
"completion": str # The completion received from the LLM
"completion_tokens": int | None # Number of tokens in the completion if a tokenizer is provided, else None
}
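Because prompt_tokens and completion_tokens are None when no tokenizer is provided, a subscriber that adds them up should guard against None. The sketch below is hypothetical: the names llm_total_tokens and log_llm_usage are made up, and you would register the subscriber with subscribe_event(Event.LLMEnd, log_llm_usage), assuming the enum member is named like the heading above.

```python
from typing import Any, Optional


def llm_total_tokens(data: dict) -> Optional[int]:
    """Sum prompt and completion tokens from LLMEnd data, or None if either is unknown."""
    prompt_tokens = data.get("prompt_tokens")
    completion_tokens = data.get("completion_tokens")
    if prompt_tokens is None or completion_tokens is None:
        return None  # no tokenizer was provided, so counts are unavailable
    return prompt_tokens + completion_tokens


def log_llm_usage(event_type: Any, id: int, timestamp: str, data: Optional[dict] = None) -> None:
    """Hypothetical LLMEnd subscriber that prints the token usage of one call."""
    total = llm_total_tokens(data or {})
    if total is None:
        print(f"[{timestamp}] LLM call from object {id}: token counts unavailable")
    else:
        print(f"[{timestamp}] LLM call from object {id}: {total} tokens")


log_llm_usage("LLMEnd", 42, "2024-01-01T00:00:00+00:00",
              {"prompt": "Hi", "prompt_tokens": 3, "completion": "Hello!", "completion_tokens": 5})
```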
ChatLLMInit
{
"system_role": str # MessageRole.system
"system_content": str # The system prompt
"system_tokens": int | None # Number of tokens in the system prompt if a tokenizer is provided, else None
}
ChatLLMStart
{
"msg_role": str # MessageRole.user
"msg_content": str # The message contents being sent to the LLM
"msg_tokens": int | None # Number of tokens in the message contents if a tokenizer is provided, else None
}
ChatLLMEnd
{
"msg_role": str # MessageRole.user
"msg_content": str # The message contents being sent to the LLM
"msg_tokens": int | None # Number of tokens in the message contents if a tokenizer is provided, else None
"reply_role": str # MessageRole.assistant
"reply_content": str # The reply message contents received from the LLM
"reply_tokens": int | None # Number of tokens in the reply message contents if a tokenizer is provided, else None
}
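Since ChatLLMEnd carries both the outgoing message and the reply, a subscriber can keep a running transcript of a conversation. This is a hypothetical sketch: record_chat_turn and transcript are made-up names, the sample role and content values are invented, and you would register it with subscribe_event(Event.ChatLLMEnd, record_chat_turn).

```python
from typing import Any, List, Optional, Tuple

# Hypothetical in-memory transcript: (role, content) pairs in the order they occurred
transcript: List[Tuple[str, str]] = []


def record_chat_turn(event_type: Any, id: int, timestamp: str, data: Optional[dict] = None) -> None:
    """Hypothetical ChatLLMEnd subscriber that keeps a running chat transcript."""
    if data is None:
        return
    transcript.append((data["msg_role"], data["msg_content"]))
    transcript.append((data["reply_role"], data["reply_content"]))


record_chat_turn("ChatLLMEnd", 7, "2024-01-01T00:00:00+00:00", {
    "msg_role": "user", "msg_content": "What is 2 + 2?", "msg_tokens": None,
    "reply_role": "assistant", "reply_content": "4", "reply_tokens": None,
})
for role, content in transcript:
    print(f"{role}: {content}")
```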
ToolStart
{
"name": str # The classname of the tool being run generated using `self.__class__.__name__`
"args": tuple # The arguments passed to the tool
"kwargs": dict # The keyword arguments passed to the tool
}
ToolEnd
{
"name": str # The classname of the tool being run generated using `self.__class__.__name__`
"args": tuple # The arguments passed to the tool
"kwargs": dict # The keyword arguments passed to the tool
"tool_output": Any # The output of the tool
"tool_exit_code": int # The exit code of the tool (0 if successful, 1 if failed)
}
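The tool_exit_code field makes it easy to watch for failing tools. The hypothetical subscriber below keeps only runs with a non-zero exit code; the names collect_tool_failures and failed_tools, as well as the tool name "MyTool" and its outputs in the sample calls, are made up. You would register it with subscribe_event(Event.ToolEnd, collect_tool_failures).

```python
from typing import Any, Dict, List, Optional

# Hypothetical store of failed tool runs
failed_tools: List[Dict[str, Any]] = []


def collect_tool_failures(event_type: Any, id: int, timestamp: str, data: Optional[dict] = None) -> None:
    """Hypothetical ToolEnd subscriber that records runs with a non-zero exit code."""
    if data is None or data["tool_exit_code"] == 0:
        return  # exit code 0 means the tool succeeded
    failed_tools.append({
        "timestamp": timestamp,
        "name": data["name"],
        "args": data["args"],
        "kwargs": data["kwargs"],
        "output": data["tool_output"],
    })


collect_tool_failures("ToolEnd", 1, "t0", {"name": "MyTool", "args": (), "kwargs": {"x": 0},
                                           "tool_output": "ZeroDivisionError", "tool_exit_code": 1})
collect_tool_failures("ToolEnd", 1, "t1", {"name": "MyTool", "args": (), "kwargs": {"x": 2},
                                           "tool_output": 0.5, "tool_exit_code": 0})
print(len(failed_tools))  # 1
```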
EmbeddingStart
{
"input": Union[List[Any], str] # The input text to the embedding model
}
EmbeddingEnd
{
"input": Union[List[Any], str] # The input text to the embedding model
"embedding": List[Any] # The embedding created by the embedding model
}
AgentStart
{
"question": str # The main question asked by the user
}
AgentStep
{
"question": str # Question the agent is trying to answer in this step
"tool": str # The tool name chosen by the agent for answering this question
"tool_args": dict # The keyword arguments chosen to be passed to the tool
"tool_output": Any # The output of the tool
"tool_exit_code": int # The exit code of the tool (0 if successful, 1 if failed)
}
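AgentStep events can be turned into a compact, one-line-per-step trace of an agent run. The subscriber below is a hypothetical sketch (trace_agent_step and step_log are made-up names, and the question, tool name and arguments in the sample call are invented); you would register it with subscribe_event(Event.AgentStep, trace_agent_step).

```python
from typing import Any, List, Optional

# Hypothetical list of human-readable step summaries
step_log: List[str] = []


def trace_agent_step(event_type: Any, id: int, timestamp: str, data: Optional[dict] = None) -> None:
    """Hypothetical AgentStep subscriber that summarizes each step on one line."""
    if data is None:
        return
    status = "ok" if data["tool_exit_code"] == 0 else "failed"
    line = f"{data['tool']}({data['tool_args']}) -> {status}: {data['question']}"
    step_log.append(line)
    print(line)


trace_agent_step("AgentStep", 5, "t0", {"question": "What is x?", "tool": "Search",
                                        "tool_args": {"query": "x"}, "tool_output": "x is 1",
                                        "tool_exit_code": 0})
```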
AgentEnd
{
"question": str # The main question asked by the user
"answer": str # The final answer found by the agent
}
AgentTimeout
{
"step_history": List[dict] # A list of every step (serialized) taken by the agent
"duration": str # The number of seconds the agent ran for
"num_steps": int # The number of steps the agent took
}
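The AgentTimeout data is enough to estimate how long each step took on average. Since duration is documented as a str, this sketch assumes it holds a plain number of seconds that float() can parse; the names seconds_per_step and report_timeout are hypothetical, and you would register the subscriber with subscribe_event(Event.AgentTimeout, report_timeout).

```python
from typing import Any, Optional


def seconds_per_step(data: dict) -> Optional[float]:
    """Average seconds per step from AgentTimeout data (None if no steps were taken)."""
    if data["num_steps"] == 0:
        return None
    # duration is documented as a str holding a number of seconds (assumed float-parsable)
    return float(data["duration"]) / data["num_steps"]


def report_timeout(event_type: Any, id: int, timestamp: str, data: Optional[dict] = None) -> None:
    """Hypothetical AgentTimeout subscriber that prints a short summary."""
    if data is None:
        return
    avg = seconds_per_step(data)
    suffix = f" (~{avg:.1f} s per step)" if avg is not None else ""
    print(f"Agent {id} timed out after {data['duration']} s and {data['num_steps']} steps{suffix}")


report_timeout("AgentTimeout", 3, "t0", {"step_history": [], "duration": "30", "num_steps": 4})
```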
Subscribing to an Event
You can create your own event subscribers to execute code whenever a certain event occurs. To register a subscriber, use the subscribe_event function.
Let's create a custom subscriber for the ChatLLMEnd event. Suppose you want to track the number of tokens consumed (if you are using a third-party service like OpenAI) to keep track of your costs.
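from typing import Optional
from embedia import Event, subscribe_event

total_tokens = 0

def track_cost(event_type: Event, id: int, timestamp: str, data: Optional[dict] = None) -> None:
    global total_tokens  # update the module-level counter instead of creating a local
    if data is None:
        return
    # Token counts are None when no tokenizer is provided; count those as 0
    total_tokens += (data["msg_tokens"] or 0) + (data["reply_tokens"] or 0)
    total_cost = total_tokens * 0.000002  # example rate in USD per token
    print(f"Approximate bill so far: {total_cost} USD")
subscribe_event(Event.ChatLLMEnd, track_cost)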
Note that this is just an example. Please consider the actual costs of the service you are using and modify the code accordingly.
Running the above code before you use any ChatLLM adds the track_cost function to the list of subscribers for the ChatLLMEnd event. From then on, every time the ChatLLMEnd event is published, track_cost is called with the data sent by the event publisher, and it calculates and prints the approximate bill so far.
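If you would rather not rely on a module-level global, the same cost tracking can live inside a callable object. This sketch assumes that subscribe_event accepts any callable with the subscriber signature, not only plain functions; the class name CostTracker, its usd_per_token parameter and the example rate are hypothetical.

```python
from typing import Any, Optional


class CostTracker:
    """Hypothetical callable subscriber that accumulates ChatLLMEnd token usage."""

    def __init__(self, usd_per_token: float) -> None:
        self.usd_per_token = usd_per_token
        self.total_tokens = 0

    def __call__(self, event_type: Any, id: int, timestamp: str, data: Optional[dict] = None) -> None:
        if data is None:
            return
        # Missing token counts (no tokenizer) are counted as 0
        self.total_tokens += (data.get("msg_tokens") or 0) + (data.get("reply_tokens") or 0)
        print(f"Approximate bill so far: {self.total_cost:.6f} USD")

    @property
    def total_cost(self) -> float:
        return self.total_tokens * self.usd_per_token


tracker = CostTracker(usd_per_token=0.000002)
tracker("ChatLLMEnd", 1, "t0", {"msg_tokens": 150, "reply_tokens": 350})
tracker("ChatLLMEnd", 1, "t1", {"msg_tokens": None, "reply_tokens": 500})
print(tracker.total_tokens)  # 1000
```

Under that assumption, you would register the instance with subscribe_event(Event.ChatLLMEnd, tracker) and read tracker.total_cost at any point. Keeping the counter on the object also lets you run several independent trackers, for example one per model or per rate.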