FET Image Agent Payment Protocol Example
This example showcases a paid image generation service where users pay with native FET tokens directly on the Fetch.ai blockchain. The agent verifies on-chain transactions before processing image requests with the ASI1 One API and delivers results through the chat protocol. This implementation demonstrates direct blockchain payment integration without third-party payment processors.
What it demonstrates
- On-chain FET payment verification using Fetch.ai ledger queries.
- Seller role implementation in the payment protocol workflow.
- Integration with ASI1 One API for image generation.
- Automatic image delivery as markdown image links via tmpfiles.org.
- Robust error handling and one retry without re-payment on failure.
Project Structure
Key files used in this example:
- agent.py – Agent setup; includes chat and payment protocols
- chat_proto.py – Chat protocol and message handlers
- payment.py – Seller-side payment logic (request, verify FET payment, generate image)
- client.py – ASI1 One image generation and tmpfiles.org upload
- shared.py – Shared utilities for creating chat messages
Core Dependencies
The payment protocol requires these imports:
from uagents_core.contrib.protocols.payment import (
    Funds,
    RequestPayment,
    RejectPayment,
    CommitPayment,
    CancelPayment,
    CompletePayment,
    payment_protocol_spec,
)
Additionally, for blockchain verification:
from cosmpy.aerial.client import LedgerClient, NetworkConfig
On-Chain Payment Verification
Unlike payment flows that rely on a third-party processor, this example verifies payment directly on-chain: it looks up the transaction by hash on the Fetch.ai network and checks the sender, recipient, and amount in its transfer event:
import os
import traceback


def verify_fet_payment_to_agent(
    transaction_id: str,
    expected_amount_fet: str,
    sender_fet_address: str,
    recipient_agent_wallet,
    logger,
    use_mainnet: bool = False,
) -> bool:
    """Verify FET payment transaction on Fetch.ai network."""
    try:
        from cosmpy.aerial.client import LedgerClient, NetworkConfig

        # use_mainnet=True overrides the FET_USE_TESTNET environment variable.
        testnet = (
            False
            if use_mainnet
            else os.getenv("FET_USE_TESTNET", "true").lower() == "true"
        )
        network_config = (
            NetworkConfig.fetchai_stable_testnet()
            if testnet
            else NetworkConfig.fetchai_mainnet()
        )
        ledger = LedgerClient(network_config)
        # On-chain amounts use base units: 1 FET = 10**18 afet (atestfet on testnet).
        expected_amount_base = int(float(expected_amount_fet) * 10**18)
        if not recipient_agent_wallet:
            logger.error("Recipient agent wallet is not set")
            return False
        expected_recipient = str(recipient_agent_wallet.address())
        logger.info(
            f"Verifying payment of {expected_amount_fet} FET from {sender_fet_address} "
            f"to {expected_recipient} on {'testnet' if testnet else 'mainnet'}"
        )
        tx_response = ledger.query_tx(transaction_id)
        if not tx_response.is_successful():
            logger.error(f"Transaction {transaction_id} was not successful")
            return False

        recipient_found = False
        amount_found = False
        sender_found = False
        denom = "atestfet" if testnet else "afet"
        # Only "transfer" events carry the sender, recipient, and amount.
        for event_type, event_attrs in tx_response.events.items():
            if event_type == "transfer":
                if event_attrs.get("recipient") == expected_recipient:
                    recipient_found = True
                if event_attrs.get("sender") == sender_fet_address:
                    sender_found = True
                amount_str = event_attrs.get("amount", "")
                if amount_str and amount_str.endswith(denom):
                    try:
                        amount_value = int(amount_str.replace(denom, ""))
                        if amount_value >= expected_amount_base:
                            amount_found = True
                    except ValueError:
                        # Amount is not a plain integer in the expected denom.
                        pass
        if recipient_found and amount_found and sender_found:
            logger.info(f"Payment verified: {transaction_id}")
            return True
        logger.error(
            f"Payment verification failed - recipient: {recipient_found}, "
            f"amount: {amount_found}, sender: {sender_found}"
        )
        return False
    except Exception as e:
        logger.error(f"FET payment verification failed: {e}")
        logger.error(f"Traceback: {traceback.format_exc()}")
        return False
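The verification function converts the expected FET amount to base units before comparing it with the on-chain value. The following is a minimal sketch of that conversion using `Decimal` rather than `float`, so the result does not depend on binary floating-point rounding. The helper name `fet_to_afet` and the constant `AFET_PER_FET` are hypothetical and not part of the example project; the 10**18 factor is the one used in the code above.

```python
from decimal import Decimal, InvalidOperation

# Same scale factor as the verification code: 1 FET = 10**18 base units.
AFET_PER_FET = 10**18


def fet_to_afet(amount_fet: str) -> int:
    """Convert a decimal FET amount string to an integer number of base units."""
    try:
        value = Decimal(amount_fet)
    except InvalidOperation as exc:
        raise ValueError(f"Not a valid FET amount: {amount_fet!r}") from exc
    if not value.is_finite() or value <= 0:
        raise ValueError(f"FET amount must be positive and finite: {amount_fet!r}")
    base_units = value * AFET_PER_FET
    # Reject amounts that are finer than one base unit instead of truncating.
    if base_units != base_units.to_integral_value():
        raise ValueError(f"More than 18 decimal places: {amount_fet!r}")
    return int(base_units)
```

A caller could pass the result as the minimum amount to compare against the integer parsed from the transfer event.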
Payment Protocol and Image Generation
payment.py defines FET_FUNDS, set_agent_wallet, and request_payment_from_user, and handles the CommitPayment and RejectPayment messages. After a payment is verified, generate_response_after_payment calls the ASI1 One image API and process_api_result delivers the image or text output to chat:
"""
Payment protocol for ASI1 One LLM API agent.
"""
import os
import traceback
from datetime import datetime, timezone
from uuid import uuid4
from uagents import Context, Protocol
from uagents_core.contrib.protocols.payment import (
    CancelPayment,
    CommitPayment,
    CompletePayment,
    Funds,
    RejectPayment,
    RequestPayment,
    payment_protocol_spec,
)
from uagents_core.contrib.protocols.chat import (
    ChatMessage as AvChatMessage,
    TextContent,
)
from shared import create_text_chat

_agent_wallet = None


def set_agent_wallet(wallet):
    global _agent_wallet
    _agent_wallet = wallet


payment_proto = Protocol(spec=payment_protocol_spec, role="seller")
FET_FUNDS = Funds(currency="FET", amount="0.1", payment_method="fet_direct")
ACCEPTED_FUNDS = [FET_FUNDS]
def verify_fet_payment_to_agent(
    transaction_id: str,
    expected_amount_fet: str,
    sender_fet_address: str,
    recipient_agent_wallet,
    logger,
    use_mainnet: bool = False,
) -> bool:
    """Verify FET payment transaction on Fetch.ai network."""
    try:
        from cosmpy.aerial.client import LedgerClient, NetworkConfig

        # use_mainnet=True overrides the FET_USE_TESTNET environment variable.
        testnet = (
            False
            if use_mainnet
            else os.getenv("FET_USE_TESTNET", "true").lower() == "true"
        )
        network_config = (
            NetworkConfig.fetchai_stable_testnet()
            if testnet
            else NetworkConfig.fetchai_mainnet()
        )
        ledger = LedgerClient(network_config)
        # On-chain amounts use base units: 1 FET = 10**18 afet (atestfet on testnet).
        expected_amount_base = int(float(expected_amount_fet) * 10**18)
        if not recipient_agent_wallet:
            logger.error("Recipient agent wallet is not set")
            return False
        expected_recipient = str(recipient_agent_wallet.address())
        logger.info(
            f"Verifying payment of {expected_amount_fet} FET from {sender_fet_address} "
            f"to {expected_recipient} on {'testnet' if testnet else 'mainnet'}"
        )
        tx_response = ledger.query_tx(transaction_id)
        if not tx_response.is_successful():
            logger.error(f"Transaction {transaction_id} was not successful")
            return False

        recipient_found = False
        amount_found = False
        sender_found = False
        denom = "atestfet" if testnet else "afet"
        # Only "transfer" events carry the sender, recipient, and amount.
        for event_type, event_attrs in tx_response.events.items():
            if event_type == "transfer":
                if event_attrs.get("recipient") == expected_recipient:
                    recipient_found = True
                if event_attrs.get("sender") == sender_fet_address:
                    sender_found = True
                amount_str = event_attrs.get("amount", "")
                if amount_str and amount_str.endswith(denom):
                    try:
                        amount_value = int(amount_str.replace(denom, ""))
                        if amount_value >= expected_amount_base:
                            amount_found = True
                    except ValueError:
                        # Amount is not a plain integer in the expected denom.
                        pass
        if recipient_found and amount_found and sender_found:
            logger.info(f"Payment verified: {transaction_id}")
            return True
        logger.error(
            f"Payment verification failed - recipient: {recipient_found}, "
            f"amount: {amount_found}, sender: {sender_found}"
        )
        return False
    except Exception as e:
        logger.error(f"FET payment verification failed: {e}")
        logger.error(f"Traceback: {traceback.format_exc()}")
        return False
async def request_payment_from_user(
    ctx: Context, user_address: str, description: str | None = None
):
    session = str(ctx.session)
    accepted_funds: list[Funds] = []
    # The price can be overridden with the FIXED_FET_AMOUNT environment variable.
    fet_amount = os.getenv("FIXED_FET_AMOUNT", "0.1")
    accepted_funds.append(
        Funds(currency="FET", amount=str(fet_amount), payment_method="fet_direct")
    )
    if not accepted_funds:
        ctx.logger.warning(
            f"[payment] no accepted_funds; cannot send RequestPayment user={user_address} session={session}"
        )
        # create_text_chat fills in the timestamp and msg_id of the chat message.
        await ctx.send(
            user_address,
            create_text_chat(
                "No payment methods are currently available. Please try again in a moment."
            ),
        )
        return

    metadata: dict[str, str] = {
        "agent": "asi1-llm-agent",
        "service": "llm_processing",
    }
    use_testnet = os.getenv("FET_USE_TESTNET", "true").lower() == "true"
    fet_network = "stable-testnet" if use_testnet else "mainnet"
    metadata["fet_network"] = fet_network
    metadata["mainnet"] = "false" if use_testnet else "true"
    if _agent_wallet:
        metadata["provider_agent_wallet"] = str(_agent_wallet.address())
    if description:
        metadata["content"] = description
    else:
        metadata["content"] = (
            "Please complete the payment to proceed. "
            "After payment, I will process your request using ASI1 One LLM."
        )

    # Payments go to the agent's wallet address; fall back to the agent address.
    recipient_addr = str(_agent_wallet.address()) if _agent_wallet else str(ctx.agent.address)
    funds_log = [
        {"method": f.payment_method, "currency": f.currency, "amount": f.amount}
        for f in accepted_funds
    ]
    ctx.logger.info(
        f"[payment] outbound RequestPayment user={user_address} session={session} "
        f"funds={funds_log} metadata={metadata} deadline_seconds=300"
    )
    payment_request = RequestPayment(
        accepted_funds=accepted_funds,
        recipient=recipient_addr,
        deadline_seconds=300,
        reference=session,
        description=description
        or "ASI1 One LLM: after payment, I will process your request",
        metadata=metadata,
    )
    await ctx.send(user_address, payment_request)
    ctx.logger.info(
        f"[payment] RequestPayment sent to {user_address} with recipient {recipient_addr}"
    )
def _allow_retry(ctx: Context, sender: str, session_id: str) -> bool:
    retry_key = f"{sender}:{session_id}:retry_count"
    try:
        current = int(ctx.storage.get(retry_key) or 0)
    except Exception:
        current = 0
    if current >= 1:
        return False
    ctx.storage.set(retry_key, current + 1)
    ctx.storage.set(f"{sender}:{session_id}:awaiting_prompt", True)
    ctx.storage.set(f"{sender}:{session_id}:verified_payment", True)
    return True
@payment_proto.on_message(CommitPayment)
async def handle_commit_payment(ctx: Context, sender: str, msg: CommitPayment):
    ctx.logger.info(f"Received payment commitment from {sender}")
    payment_verified = False
    if msg.funds.payment_method == "fet_direct" and msg.funds.currency == "FET":
        try:
            buyer_fet_wallet = None
            if isinstance(msg.metadata, dict):
                buyer_fet_wallet = msg.metadata.get("buyer_fet_wallet") or msg.metadata.get(
                    "buyer_fet_address"
                )
            if not buyer_fet_wallet:
                ctx.logger.error("Missing buyer_fet_wallet in CommitPayment.metadata")
            else:
                use_testnet = os.getenv("FET_USE_TESTNET", "true").lower() == "true"
                # Check against the price this agent requested, not the amount
                # the buyer reports in CommitPayment, which the buyer controls.
                expected_amount_fet = os.getenv("FIXED_FET_AMOUNT", FET_FUNDS.amount)
                payment_verified = verify_fet_payment_to_agent(
                    transaction_id=msg.transaction_id,
                    expected_amount_fet=str(expected_amount_fet),
                    sender_fet_address=buyer_fet_wallet,
                    recipient_agent_wallet=_agent_wallet,
                    logger=ctx.logger,
                    use_mainnet=not use_testnet,
                )
        except Exception as e:
            ctx.logger.error(f"FET verify error: {e}")
    else:
        ctx.logger.error(f"Unsupported payment method: {msg.funds.payment_method}")

    if payment_verified:
        ctx.logger.info(f"Payment verified successfully from {sender}")
        await ctx.send(sender, CompletePayment(transaction_id=msg.transaction_id))
        await generate_response_after_payment(ctx, sender)
    else:
        ctx.logger.error(f"Payment verification failed from {sender}")
        await ctx.send(
            sender,
            CancelPayment(
                transaction_id=msg.transaction_id,
                reason="Payment verification failed",
            ),
        )
@payment_proto.on_message(RejectPayment)
async def handle_reject_payment(ctx: Context, sender: str, msg: RejectPayment):
    ctx.logger.info(f"Payment rejected by {sender}: {msg.reason}")
    await ctx.send(
        sender,
        create_text_chat(
            "Sorry, you denied the payment. Reply again and I'll send a new payment request."
        ),
    )
async def generate_response_after_payment(ctx: Context, user_address: str):
    from client import call_asi_one_api

    session_id = str(ctx.session)
    prompt = ctx.storage.get(f"prompt:{user_address}:{session_id}") or ctx.storage.get(
        f"current_prompt:{user_address}:{session_id}"
    )
    if not prompt:
        ctx.logger.error("No prompt found in storage")
        await ctx.send(user_address, create_text_chat("Error: No prompt found"))
        return
    ctx.logger.info(f"Processing request for verified payment: {prompt}")
    try:
        result = await call_asi_one_api(prompt=prompt)
        ctx.logger.info(
            f"API result: status={result.get('status')}, "
            f"has_image_url={bool(result.get('image_url'))}, "
            f"has_response_text={bool(result.get('response_text'))}"
        )
        await process_api_result(ctx, user_address, result)
    except Exception as e:
        ctx.logger.error(f"API call error: {e}")
        await ctx.send(user_address, create_text_chat(f"Error processing request: {e}"))
async def process_api_result(ctx: Context, sender: str, result: dict):
    session_id = str(ctx.session)
    if result.get("status") == "failed" or "error" in result:
        err = result.get("error", "Unknown error")
        await ctx.send(sender, create_text_chat(f"Error: {err}"))
        if _allow_retry(ctx, sender, session_id):
            await ctx.send(
                sender,
                create_text_chat(
                    "Request failed, but your payment is valid. "
                    "Send your request again - you won't be charged again."
                ),
            )
        return

    image_url = result.get("image_url")
    if image_url:
        try:
            # Deliver the image as a markdown image link inside a text message.
            image_markdown = (
                "Image generated successfully.\n\n"
                f"![Generated image]({image_url})\n\n"
            )
            await ctx.send(
                sender,
                AvChatMessage(
                    timestamp=datetime.now(timezone.utc),
                    msg_id=uuid4(),
                    content=[
                        TextContent(type="text", text=image_markdown),
                    ],
                ),
            )
            ctx.storage.remove(f"{sender}:{session_id}:retry_count")
            ctx.logger.info(f"Image sent successfully as markdown: {image_url}")
        except Exception as e:
            ctx.logger.error(f"Failed to send image: {e}")
            if _allow_retry(ctx, sender, session_id):
                await ctx.send(
                    sender,
                    create_text_chat(
                        "Could not send image, but your payment is valid. "
                        "Send your request again - no extra charge."
                    ),
                )
            else:
                await ctx.send(
                    sender,
                    create_text_chat(
                        "Could not send image. Please try again or start a new session."
                    ),
                )
        return

    response_text = result.get("response_text")
    if not response_text:
        await ctx.send(
            sender,
            create_text_chat("Response generated but could not retrieve image or text"),
        )
        if _allow_retry(ctx, sender, session_id):
            await ctx.send(
                sender,
                create_text_chat(
                    "Delivery failed, but your payment is valid. "
                    "Send your request again - no extra charge."
                ),
            )
        return

    try:
        await ctx.send(
            sender,
            AvChatMessage(
                timestamp=datetime.now(timezone.utc),
                msg_id=uuid4(),
                content=[
                    TextContent(type="text", text=response_text),
                ],
            ),
        )
        ctx.storage.remove(f"{sender}:{session_id}:retry_count")
        ctx.logger.info("Response sent successfully")
    except Exception as e:
        ctx.logger.error(f"Failed to send response: {e}")
        if _allow_retry(ctx, sender, session_id):
            await ctx.send(
                sender,
                create_text_chat(
                    "Could not send response, but your payment is valid. "
                    "Send your request again - no extra charge."
                ),
            )
        else:
            await ctx.send(
                sender,
                create_text_chat(
                    "Could not send response. Please try again or start a new session."
                ),
            )
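The CommitPayment handler above only proceeds when the buyer's FET address is present in the message metadata, under either buyer_fet_wallet or buyer_fet_address. The sketch below isolates that lookup as a plain function so it can be tested without an agent. The name `extract_buyer_wallet`, the whitespace stripping, and the example address are assumptions added for illustration and are not part of payment.py.

```python
from typing import Any, Optional

# Metadata keys the seller-side handler accepts, in order of preference.
BUYER_WALLET_KEYS = ("buyer_fet_wallet", "buyer_fet_address")


def extract_buyer_wallet(metadata: Any) -> Optional[str]:
    """Return the buyer's FET address from CommitPayment metadata, or None."""
    if not isinstance(metadata, dict):
        return None
    for key in BUYER_WALLET_KEYS:
        value = metadata.get(key)
        # Unlike the handler, this sketch also ignores blank strings.
        if isinstance(value, str) and value.strip():
            return value.strip()
    return None


# Hypothetical metadata a buyer might attach to CommitPayment.
example_metadata = {"buyer_fet_wallet": "fetch1exampleexampleexample"}
```

A buyer-side client would need to include one of these keys alongside the transaction hash, since the seller compares that address with the sender recorded in the transfer event.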
Image Generation Service
client.py calls the ASI1 One image generation API. When the API returns base64 image data instead of a URL, the image is uploaded to tmpfiles.org for temporary hosting:
"""ASI1 One LLM API client."""
from __future__ import annotations
import asyncio
import base64
import os
from typing import Any
import requests
from dotenv import load_dotenv
load_dotenv()
ASI_ONE_API_KEY = os.getenv("ASI_ONE_API_KEY")
ASI_ONE_MODEL = os.getenv("ASI_ONE_MODEL", "asi1")
TMPFILES_API_URL = "https://tmpfiles.org/api/v1/upload"
def upload_to_tmpfiles(image_bytes: bytes, filename: str = "asi1_image.png") -> str:
    """Upload image to tmpfiles.org and return the public download URL (https)."""
    try:
        response = requests.post(
            TMPFILES_API_URL,
            files={"file": (filename, image_bytes, "image/png")},
            timeout=120,
        )
        response.raise_for_status()
        response_data = response.json()
    except requests.RequestException as e:
        raise RuntimeError(f"Tmpfiles upload failed: {e}") from e
    raw_url = response_data.get("data", {}).get("url")
    if not raw_url:
        raise RuntimeError(f"Tmpfiles upload returned no URL: {response_data}")
    # Rewrite the page URL into its direct-download form.
    return raw_url.replace("http://tmpfiles.org/", "https://tmpfiles.org/dl/")
async def call_asi_one_api(
    *,
    prompt: str,
    size: str = "auto",
) -> dict[str, Any]:
    """Call the ASI1 One image generation API; upload base64 results to tmpfiles.org."""
    if not ASI_ONE_API_KEY:
        return {"error": "ASI_ONE_API_KEY is not set", "status": "failed"}
    try:
        url = "https://api.asi1.ai/v1/image/generate"
        payload = {
            "model": ASI_ONE_MODEL,
            "prompt": prompt.strip(),
            "size": size,
        }
        headers = {
            "Authorization": f"Bearer {ASI_ONE_API_KEY}",
            "Content-Type": "application/json",
        }
        # requests is blocking, so run it in a worker thread to keep the
        # agent's event loop responsive while the image is generated.
        response = await asyncio.to_thread(
            requests.post, url, json=payload, headers=headers, timeout=60
        )
        if not response.ok:
            return {
                "error": f"{response.status_code} Error from ASI image API: {response.text}",
                "status": "failed",
            }
        response_data = response.json()
        # Accept a top-level URL first, then fall back to the first "data" item.
        image_url = response_data.get("image_url") or response_data.get("url")
        if not image_url:
            data_items = response_data.get("data", [])
            if data_items and isinstance(data_items, list):
                first_item = data_items[0] if data_items else {}
                image_url = first_item.get("url")
                if not image_url and first_item.get("b64_json"):
                    try:
                        image_bytes = base64.b64decode(first_item["b64_json"])
                        image_url = await asyncio.to_thread(upload_to_tmpfiles, image_bytes)
                    except Exception as e:
                        return {
                            "error": f"Failed to process base64 image: {str(e)}",
                            "status": "failed",
                        }
        if not image_url:
            return {
                "error": f"ASI image API returned no image URL: {response_data}",
                "status": "failed",
            }
        return {
            "image_url": image_url,
            "status": "success",
        }
    except requests.RequestException as e:
        return {"error": f"Image generation request failed: {str(e)}", "status": "failed"}
    except Exception as e:
        return {"error": str(e), "status": "failed"}
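The client above accepts several response shapes: a top-level image_url or url, or a data list whose first item holds either url or b64_json. The sketch below isolates that lookup order as a pure function so it can be exercised without network access. The function name `extract_image_ref` and its tagged-tuple return value are hypothetical, and the response shapes are only the ones the client code checks for, not a confirmed description of the API.

```python
import base64
from typing import Any, Optional, Tuple


def extract_image_ref(response_data: dict[str, Any]) -> Optional[Tuple[str, Any]]:
    """Return ("url", str) or ("bytes", bytes) for the first image found, else None."""
    direct = response_data.get("image_url") or response_data.get("url")
    if direct:
        return ("url", direct)
    items = response_data.get("data")
    if isinstance(items, list) and items and isinstance(items[0], dict):
        first = items[0]
        if first.get("url"):
            return ("url", first["url"])
        if first.get("b64_json"):
            # Raw bytes still need hosting (for example an upload) before delivery.
            return ("bytes", base64.b64decode(first["b64_json"]))
    return None
```

Separating parsing from uploading this way keeps the network-dependent step (the tmpfiles.org upload) in one place.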
Chat Message Processing
The chat protocol handles incoming messages and coordinates payment flow:
from datetime import datetime, timezone
from uagents import Context, Protocol
from uagents_core.contrib.protocols.chat import (
    ChatAcknowledgement,
    ChatMessage,
    TextContent,
    chat_protocol_spec,
)
from payment import (
    generate_response_after_payment,
    request_payment_from_user,
)
chat_proto = Protocol(spec=chat_protocol_spec)
@chat_proto.on_message(ChatMessage)
async def handle_message(ctx: Context, sender: str, msg: ChatMessage):
    ctx.logger.info(f"Got a message from {sender}: {msg.content}")
    await ctx.send(
        sender,
        ChatAcknowledgement(
            timestamp=datetime.now(timezone.utc), acknowledged_msg_id=msg.msg_id
        ),
    )
    for item in msg.content:
        if isinstance(item, TextContent):
            text = item.text.strip()
            session_id = str(ctx.session)
            awaiting_key = f"{sender}:{session_id}:awaiting_prompt"
            verified_key = f"{sender}:{session_id}:verified_payment"
            # A free retry is pending: treat this message as the prepaid prompt.
            if (ctx.storage.has(awaiting_key) or ctx.storage.get(awaiting_key)) and (
                ctx.storage.has(verified_key) or ctx.storage.get(verified_key)
            ):
                ctx.logger.info("Consuming prompt post-payment and processing request")
                ctx.storage.remove(awaiting_key)
                ctx.storage.remove(verified_key)
                ctx.storage.set(f"prompt:{sender}:{session_id}", text)
                ctx.storage.set(f"current_prompt:{sender}:{session_id}", text)
                ctx.storage.set("requesting_user", sender)
                await generate_response_after_payment(ctx, sender)
                return
            # Otherwise store the prompt and ask for payment first.
            ctx.logger.info(f"Requesting payment from {sender} for LLM processing")
            payment_description = "Please complete the payment to process this request."
            ctx.storage.set(f"prompt:{sender}:{session_id}", text)
            # Same scoped key that generate_response_after_payment reads as a fallback.
            ctx.storage.set(f"current_prompt:{sender}:{session_id}", text)
            ctx.storage.remove(f"{sender}:{session_id}:request_recorded")
            await request_payment_from_user(
                ctx,
                sender,
                description=payment_description,
            )
            return


@chat_proto.on_message(ChatAcknowledgement)
async def handle_ack(ctx: Context, sender: str, msg: ChatAcknowledgement):
    ctx.logger.info(
        f"Got an acknowledgement from {sender} for {msg.acknowledged_msg_id}"
    )
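The chat handler makes one decision per incoming text message: either a prepaid retry is pending and the prompt is processed immediately, or a payment request is sent. The sketch below models that decision with a plain dict standing in for ctx.storage. The function name `next_action` and its string return values are hypothetical; the key names are the ones used in the handler above.

```python
def next_action(storage: dict, sender: str, session_id: str) -> str:
    """Decide what the chat handler does with an incoming text message."""
    awaiting_key = f"{sender}:{session_id}:awaiting_prompt"
    verified_key = f"{sender}:{session_id}:verified_payment"
    if storage.get(awaiting_key) and storage.get(verified_key):
        # A paid retry is pending: consume both flags so it is used only once.
        del storage[awaiting_key]
        del storage[verified_key]
        return "generate"
    return "request_payment"
```

Because both flags are removed when they are consumed, a second message in the same session goes back to the payment path.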
Helper Functions
Shared utilities for creating chat messages:
"""
Shared utilities to avoid circular imports between chat_proto and payment_proto.
"""
from datetime import datetime, timezone
from uuid import uuid4
from uagents_core.contrib.protocols.chat import (
    AgentContent,
    ChatMessage,
    TextContent,
)


def create_text_chat(text: str, end_session: bool = False) -> ChatMessage:
    """Create a text chat message."""
    content: list[AgentContent] = [TextContent(type="text", text=text)]
    if end_session:
        from uagents_core.contrib.protocols.chat import EndSessionContent

        content.append(EndSessionContent(type="end-session"))
    return ChatMessage(
        timestamp=datetime.now(timezone.utc),
        msg_id=uuid4(),
        content=content,
    )
Agent Initialization
The main agent file sets up protocols and wallet using environment-based configuration:
# agent.py - ASI1 One LLM Agent with Payment Protocol
import os
from dotenv import load_dotenv
from uagents import Agent, Context
load_dotenv()
from chat_proto import chat_proto
from payment import payment_proto, set_agent_wallet
agent = Agent(
    name=os.getenv("AGENT_NAME", "Fet Example Agent"),
    seed=os.getenv("AGENT_SEED_PHRASE", "asi1-llm-agent"),
    port=int(os.getenv("AGENT_PORT", "8000")),
    mailbox=True,
)
agent.include(chat_proto, publish_manifest=True)
agent.include(payment_proto, publish_manifest=True)
# Give the payment module the wallet that should receive FET payments.
set_agent_wallet(agent.wallet)


@agent.on_event("startup")
async def startup(ctx: Context):
    ctx.logger.info(f"Agent started: {agent.wallet.address()}")
    ctx.logger.info("=== ASI1 One LLM Agent ===")
    ctx.logger.info("Accepted: 0.1 FET (direct)")
    ctx.logger.info("LLM via ASI1 One API (ASI_ONE_API_KEY)")
    ctx.logger.info("Chat to request LLM processing")


if __name__ == "__main__":
    agent.run()
Requirements
Install dependencies from requirements.txt:
uagents==0.23.6
uagents-core==0.4.0
python-dotenv
requests
cosmpy
Configuration
Copy .env.example to .env and set your credentials:
# Agent identity (optional; defaults shown)
AGENT_NAME=Fet Example Agent
AGENT_SEED_PHRASE=asi-imagen-agent
AGENT_PORT=8000
# ASI1 API key for image generation (required)
ASI_ONE_API_KEY=your_asi1_api_key_here
# Optional: ASI1 model name
# ASI_ONE_MODEL=asi1
# Fetch.ai network (true = testnet, false = mainnet)
FET_USE_TESTNET=true
To create the .env file from the template:
# Copy from .env.example and fill in values
cp .env.example .env
# Edit .env and set ASI_ONE_API_KEY, etc.
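The FET_USE_TESTNET setting controls both the network the agent queries and the denomination it expects in transfer events. The sketch below mirrors the comparison used in payment.py so the effect of different values is easy to see. The names `FetNetwork` and `resolve_network` are hypothetical; the network labels and denominations are the ones that appear in the example code.

```python
from typing import Mapping, NamedTuple


class FetNetwork(NamedTuple):
    name: str      # label placed in the RequestPayment metadata
    denom: str     # base denomination expected in transfer events
    testnet: bool


def resolve_network(env: Mapping[str, str]) -> FetNetwork:
    """Mirror the FET_USE_TESTNET check: only the value "true" selects testnet."""
    use_testnet = env.get("FET_USE_TESTNET", "true").lower() == "true"
    if use_testnet:
        return FetNetwork("stable-testnet", "atestfet", True)
    return FetNetwork("mainnet", "afet", False)
```

Calling `resolve_network(os.environ)` would reproduce the agent's choice. Note that under this comparison values such as 1 or yes select mainnet, because only the string true (in any letter case) counts as testnet.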
Installation & Execution
Set up the environment and run the agent:
# Create virtual environment
python3 -m venv .venv && source .venv/bin/activate
# Install dependencies from requirements.txt
pip install -r requirements.txt
# Configure environment (copy .env.example to .env and set ASI_ONE_API_KEY, etc.)
cp .env.example .env
# Start the agent
python3 agent.py
Usage Flow
- User sends message: Submit an image prompt via chat
- Payment request: Agent responds with a payment request for 0.1 FET
- User pays: Buyer completes on-chain FET transfer
- Verification: Agent verifies transaction on Fetch.ai ledger
- Generation: ASI1 One API creates the image (ASI_ONE_API_KEY)
- Delivery: Image URL (uploaded to tmpfiles.org when the API returns base64 data) is sent to chat as a markdown image
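The delivery step can be sketched as two small string helpers: one that turns a tmpfiles.org page URL into its direct-download form, and one that wraps an image URL in markdown for a text chat message. Both function names are hypothetical, the alt text is a placeholder, and handling an https page URL is an assumption, since client.py only rewrites the http form.

```python
def to_direct_download_url(page_url: str) -> str:
    """Rewrite a tmpfiles.org page URL into its /dl/ direct-download form."""
    for prefix in ("http://tmpfiles.org/", "https://tmpfiles.org/"):
        if page_url.startswith(prefix):
            path = page_url[len(prefix):]
            # Avoid inserting "dl/" twice if the URL is already a download link.
            if path.startswith("dl/"):
                path = path[len("dl/"):]
            return "https://tmpfiles.org/dl/" + path
    return page_url  # not a tmpfiles.org URL: leave untouched


def image_markdown(image_url: str, alt_text: str = "Generated image") -> str:
    """Format an image URL as a markdown image for a text chat message."""
    return f"Image generated successfully.\n\n![{alt_text}]({image_url})\n\n"
```

The resulting string is what would be placed in the text field of a TextContent item.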
Implementation Details
Payment Verification Setup
- Install blockchain library: pip install cosmpy
- Network configuration: Set FET_USE_TESTNET in .env (true for testnet, false for mainnet)
- Payment definition: In payment.py, define FET_FUNDS = Funds(currency="FET", amount="0.1", payment_method="fet_direct")
- Buyer metadata: On CommitPayment, buyer must provide buyer_fet_wallet in msg.metadata
- On-chain check: Agent uses LedgerClient.query_tx() to verify transaction events match expected transfer
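The on-chain check in the last step can be sketched as a pure function over the events of a transaction, which makes the matching rule testable without a ledger connection. This is a simplified variant, not the code from payment.py: it assumes the events are available as a mapping from event type to a flat attribute dict (the shape the example's loop iterates over), and the name `transfer_matches` and its parameters are hypothetical.

```python
from typing import Mapping


def transfer_matches(
    events: Mapping[str, Mapping[str, str]],
    *,
    recipient: str,
    sender: str,
    min_amount: int,
    denom: str,
) -> bool:
    """Check a transaction's "transfer" event against the expected payment."""
    transfer = events.get("transfer")
    if not transfer:
        return False
    amount_str = transfer.get("amount", "")
    if not amount_str.endswith(denom):
        return False
    digits = amount_str[: -len(denom)]
    # Require a plain ASCII integer, e.g. "100000000000000000atestfet".
    if not (digits.isascii() and digits.isdigit()):
        return False
    return (
        transfer.get("recipient") == recipient
        and transfer.get("sender") == sender
        and int(digits) >= min_amount
    )
```

Here `min_amount` is expressed in base units, so 0.1 FET corresponds to 10**17.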
Key Implementation Features
- Blockchain Integration: Direct on-chain verification without payment intermediaries
- ASI1 One API: Image generation via ASI (ASI_ONE_API_KEY); tmpfiles.org for hosting
- State Management: Session-based storage tracks payment status and prompts
- Error Recovery: Failed generations allow one retry without additional payment (_allow_retry)
- Chat Protocol Output: Images delivered as markdown links in TextContent with tmpfiles.org URLs
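The error-recovery behaviour can be illustrated with a dict standing in for the agent's storage. The sketch below follows the same key names as _allow_retry but is a simplified stand-in, not the project code: the `max_retries` parameter is an added assumption (the example hard-codes a single retry).

```python
def allow_retry(storage: dict, sender: str, session_id: str, max_retries: int = 1) -> bool:
    """Grant a free retry if the session's retry budget is not yet used up."""
    retry_key = f"{sender}:{session_id}:retry_count"
    used = int(storage.get(retry_key) or 0)
    if used >= max_retries:
        return False
    storage[retry_key] = used + 1
    # These two flags let the chat handler treat the next message as prepaid.
    storage[f"{sender}:{session_id}:awaiting_prompt"] = True
    storage[f"{sender}:{session_id}:verified_payment"] = True
    return True
```

The budget is tracked per sender and session, so a failure in one session does not use up the retry of another.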
The screenshots below show the Fet Example Agent flow: (1) payment request with Reject / Approve FET Payment, and (2) the generated image delivered after successful payment.
1. Payment request — Agent asks for 0.1 FET on Dorado; user can approve or reject.

2. Generated image — After payment, the agent returns the generated image (e.g. smartwatch on white background).

Getting FET Testnet Tokens
To test this agent on the Fetch.ai testnet, you'll need testnet FET tokens in your wallet. Here's how to get them:
- Visit the Testnet Faucet: Go to the official Fetch.ai Testnet Faucet to request testnet tokens
- Wait for Confirmation: Testnet tokens are typically distributed automatically after a short delay
For detailed instructions and a step-by-step visual guide, check out this blog post or watch the video below: