We use cookies on our website.
Docs
API Reference
Utils
Publish-Subscribe Event System

Publish-Subscribe Event System

Every major class in Embedia publishes events at important points in its execution. These events are subscribed by two pre-defiend subscribers in Embedia, namely:

  • an stdout printer that prints everything about the event in a color coded output
  • a sqlite3 backup that backs-up everything about the event to a sqlite3 db located at ~/.embedia/backup.db

You can also create your own subscribers to do whatever you want with the data.

ℹ️

Currently the pubsub system is synchronous. We might make it asynchronous in the future.

Data sent during an event publish

While publishing an event, the below datapoints are sent to the Publish-Subscribe System. Every subscriber of that event will recieve all the 4 datapoints when that happens:

  • event_type: One of the Event enum values (Learn more about: Event enum)
  • id: The instance id of the variable firing the event generated using id(obj) python function
  • timestamp: The current time with the timezone
  • data: A dictionary containing information related to the event

Below we are describing the data dict keys and value types for each event.

  • LLMStart
{
  "prompt": str  # The prompt that is being sent to the LLM
  "prompt_tokens": int | None  # Number of tokens in the prompt if a tokenizer is provided, else None
}
  • LLMEnd
{
  "prompt": str  # The prompt that is being sent to the LLM
  "prompt_tokens": int | None  # Number of tokens in the prompt if a tokenizer is provided, else None
  "completion": str  # The completion recieved from the LLM
  "completion_tokens": int | None  # Number of tokens in the completion if a tokenizer is provided, else None
}
  • ChatLLMInit
{
  "system_role": str  # MessageRole.system
  "system_content": str  # The system prompt
  "system_tokens": int | None  # Number of tokens in the system prompt if a tokenizer is provided, else None
}
  • ChatLLMStart
{
  "msg_role": str  # MessageRole.user
  "msg_content": str  # The message contents being sent to the LLM
  "msg_tokens": int | None  # Number of tokens in the message contents if a tokenizer is provided, else None
}
  • ChatLLMEnd
{
  "msg_role": str  # MessageRole.user
  "msg_content": str  # The message contents being sent to the LLM
  "msg_tokens": int | None  # Number of tokens in the message contents if a tokenizer is provided, else None
  "reply_role": str  # MessageRole.assistant
  "reply_content": str  # The reply message contents recieved from the LLM
  "reply_tokens": int | None  # Number of tokens in the reply message contents if a tokenizer is provided, else None
}
  • ToolStart
{
  "name": str  # The classname of the tool being run generated using `self.__class__.__name__`
  "args": tuple  # The arguments passed to the tool
  "kwargs": dict  # The keyword arguments passed to the tool
}
  • ToolEnd
{
  "name": str  # The classname of the tool being run generated using `self.__class__.__name__`
  "args": tuple  # The arguments passed to the tool
  "kwargs": dict  # The keyword arguments passed to the tool
  "tool_output": Any  # The output of the tool
  "tool_exit_code": int  # The exit code of the tool (0 if successful, 1 if failed)
}
  • EmbeddingStart
{
  "input": Union[List[Any], str]  # The input text to the embedding model
}
  • EmbeddingEnd
{
  "input": Union[List[Any], str]  # The input text to the embedding model
  "embedding": List[Any]  # The embedding created by the embedding model
}
  • AgentStart
{
  "question": str  # The main question asked by the user
}
  • AgentStep
{
  "question": str  # Question the agent is trying to answer in this step
  "tool": str  # The tool name chosen by the agent for answering this question
  "tool_args": dict  # The keyword arguments chosen to be passed to the tool
  "tool_output": Any  # The output of the tool
  "tool_exit_code": int  # The exit code of the tool (0 if successful, 1 if failed)
}
  • AgentEnd
{
  "question": str  # The main question asked by the user
  "answer": str  # The final answer found by the agent
}
  • AgentTimeout
{
  "step_history": List[dict]  # A list of every step (serialized) taken by the agent
  "duration": str  # The number of seconds the agent ran for
  "num_steps": int  # The number of steps the agent took
}

Subscribing to an Event

You can create your own event subscribers in case you want to execute some code whenever a certain event occurs. To do so, you can use the subscribe_event function.

Lets create a custom subscriber for the ChatLLMEnd event. Lets say you want to track the number of tokens that have been consumed (if you are using a third party service like OpenAI) to keep a track of your costs.

from embedia import Event, subscribe_event
 
total_tokens = 0
 
def track_cost(event_type: Event, id: int, timestamp: str, data: Optional[dict] = None) -> None:
    total_tokens += data["msg_tokens"] + data["reply_tokens"]
    total_cost = total_tokens * 0.000002
    print(f"Approximate bill so far: {total_cost} USD")
 
subscribe_event(Event.ChatLLMEnd, track_cost)
ℹ️

Note that this is just an example. Please consider the actual costs of the service you are using and modify the code accordingly.

Running the above code before you use any ChatLLM, will add the track_cost function to the list of subscribers for the ChatLLMEnd event. Hence, every time the ChatLLMEnd event is published, the track_cost function will be called with the data sent by the event publisher. The track_cost function will then calculate and print the approximate bill so far.