Version: 1.0.5

FET Image Agent Payment Protocol Example

This example showcases a paid image generation service where users pay with native FET tokens directly on the Fetch.ai blockchain. The agent verifies on-chain transactions before processing image requests with the ASI1 One API and delivers results through the chat protocol. This implementation demonstrates direct blockchain payment integration without third-party payment processors.

What it demonstrates

  • On-chain FET payment verification using Fetch.ai ledger queries.
  • Seller role implementation in the payment protocol workflow.
  • Integration with ASI1 One API for image generation.
  • Automatic image delivery as markdown image links via tmpfiles.org.
  • Robust error handling and one retry without re-payment on failure.

Project Structure

Key files used in this example:

  • agent.py – Agent setup; includes chat and payment protocols
  • chat_proto.py – Chat protocol and message handlers
  • payment.py – Seller-side payment logic (request, verify FET payment, generate image)
  • client.py – ASI1 One image generation and tmpfiles.org upload
  • shared.py – Shared utilities for creating chat messages

Core Dependencies

The payment protocol requires these imports:

from uagents_core.contrib.protocols.payment import (
    Funds,
    RequestPayment,
    RejectPayment,
    CommitPayment,
    CancelPayment,
    CompletePayment,
    payment_protocol_spec,
)

Additionally, for blockchain verification:

from cosmpy.aerial.client import LedgerClient, NetworkConfig

On-Chain Payment Verification

Instead of relying on a third-party payment processor, this example verifies payments directly on-chain by querying the transaction's events on the Fetch.ai network:

payment.py
import os
import traceback


def verify_fet_payment_to_agent(
    transaction_id: str,
    expected_amount_fet: str,
    sender_fet_address: str,
    recipient_agent_wallet,
    logger,
    use_mainnet: bool = False,
) -> bool:
    """Verify FET payment transaction on Fetch.ai network."""
    try:
        from decimal import Decimal

        from cosmpy.aerial.client import LedgerClient, NetworkConfig

        # use_mainnet=True forces mainnet; otherwise FET_USE_TESTNET decides (default: testnet).
        testnet = (not use_mainnet) and os.getenv("FET_USE_TESTNET", "true").lower() == "true"

        network_config = (
            NetworkConfig.fetchai_stable_testnet()
            if testnet
            else NetworkConfig.fetchai_mainnet()
        )
        ledger = LedgerClient(network_config)
        # 1 FET = 10**18 base units (afet / atestfet). Decimal avoids float rounding.
        expected_amount_atto = int(Decimal(expected_amount_fet) * 10**18)

        if not recipient_agent_wallet:
            logger.error("Recipient agent wallet is not set")
            return False

        expected_recipient = str(recipient_agent_wallet.address())

        logger.info(
            f"Verifying payment of {expected_amount_fet} FET from {sender_fet_address} "
            f"to {expected_recipient} on {'testnet' if testnet else 'mainnet'}"
        )

        tx_response = ledger.query_tx(transaction_id)
        if not tx_response.is_successful():
            logger.error(f"Transaction {transaction_id} was not successful")
            return False

        recipient_found = False
        amount_found = False
        sender_found = False
        denom = "atestfet" if testnet else "afet"

        # Inspect the "transfer" event for the expected recipient, sender, and amount.
        for event_type, event_attrs in tx_response.events.items():
            if event_type == "transfer":
                if event_attrs.get("recipient") == expected_recipient:
                    recipient_found = True
                if event_attrs.get("sender") == sender_fet_address:
                    sender_found = True
                amount_str = event_attrs.get("amount", "")
                if amount_str and amount_str.endswith(denom):
                    try:
                        amount_value = int(amount_str.replace(denom, ""))
                        if amount_value >= expected_amount_atto:
                            amount_found = True
                    except ValueError:
                        # Amount is not a plain integer followed by the denom; ignore it.
                        pass

        if recipient_found and amount_found and sender_found:
            logger.info(f"Payment verified: {transaction_id}")
            return True

        logger.error(
            f"Payment verification failed - recipient: {recipient_found}, "
            f"amount: {amount_found}, sender: {sender_found}"
        )
        return False
    except Exception as e:
        logger.error(f"FET payment verification failed: {e}")
        logger.error(f"Traceback: {traceback.format_exc()}")
        return False
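The unit conversion is the step in the function above that is easiest to get subtly wrong, so here it is in isolation as a minimal sketch. It assumes, as the 10**18 factor above implies, that 1 FET corresponds to 10**18 base units (afet on mainnet, atestfet on testnet). The helper names fet_to_base_units and base_units_to_fet are hypothetical and not part of the example project.

```python
from decimal import Decimal

# Assumption taken from the 10**18 factor used in verify_fet_payment_to_agent.
BASE_UNITS_PER_FET = 10**18


def fet_to_base_units(amount_fet: str) -> int:
    """Convert a decimal FET string such as "0.1" into integer base units."""
    return int(Decimal(amount_fet) * BASE_UNITS_PER_FET)


def base_units_to_fet(amount_base_units: int) -> Decimal:
    """Convert integer base units back into a Decimal FET amount."""
    return Decimal(amount_base_units) / BASE_UNITS_PER_FET


price = fet_to_base_units("0.1")
print(price)
print(base_units_to_fet(price))
```

Keeping the amount as a string and converting through Decimal means the comparison against the on-chain integer amount never depends on binary floating-point rounding.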

Payment Protocol and Image Generation

payment.py defines the accepted funds (FET_FUNDS), stores the agent wallet via set_agent_wallet, sends payment requests with request_payment_from_user, and handles the buyer's CommitPayment and RejectPayment messages. After a verified payment, generate_response_after_payment calls the ASI1 One image API and process_api_result delivers the image or text output to the chat:

payment.py
"""
Payment protocol for ASI1 One LLM API agent.
"""

import os
import traceback
from datetime import datetime, timezone
from uuid import uuid4

from uagents import Context, Protocol
from uagents_core.contrib.protocols.payment import (
    CancelPayment,
    CommitPayment,
    CompletePayment,
    Funds,
    RejectPayment,
    RequestPayment,
    payment_protocol_spec,
)
from uagents_core.contrib.protocols.chat import (
    ChatMessage as AvChatMessage,
    TextContent,
)

from shared import create_text_chat

_agent_wallet = None


def set_agent_wallet(wallet):
    global _agent_wallet
    _agent_wallet = wallet


payment_proto = Protocol(spec=payment_protocol_spec, role="seller")

FET_FUNDS = Funds(currency="FET", amount="0.1", payment_method="fet_direct")
ACCEPTED_FUNDS = [FET_FUNDS]


def verify_fet_payment_to_agent(
    transaction_id: str,
    expected_amount_fet: str,
    sender_fet_address: str,
    recipient_agent_wallet,
    logger,
    use_mainnet: bool = False,
) -> bool:
    """Verify FET payment transaction on Fetch.ai network."""
    try:
        from decimal import Decimal

        from cosmpy.aerial.client import LedgerClient, NetworkConfig

        # use_mainnet=True forces mainnet; otherwise FET_USE_TESTNET decides (default: testnet).
        testnet = (not use_mainnet) and os.getenv("FET_USE_TESTNET", "true").lower() == "true"

        network_config = (
            NetworkConfig.fetchai_stable_testnet()
            if testnet
            else NetworkConfig.fetchai_mainnet()
        )
        ledger = LedgerClient(network_config)
        # 1 FET = 10**18 base units (afet / atestfet). Decimal avoids float rounding.
        expected_amount_atto = int(Decimal(expected_amount_fet) * 10**18)

        if not recipient_agent_wallet:
            logger.error("Recipient agent wallet is not set")
            return False

        expected_recipient = str(recipient_agent_wallet.address())

        logger.info(
            f"Verifying payment of {expected_amount_fet} FET from {sender_fet_address} "
            f"to {expected_recipient} on {'testnet' if testnet else 'mainnet'}"
        )

        tx_response = ledger.query_tx(transaction_id)
        if not tx_response.is_successful():
            logger.error(f"Transaction {transaction_id} was not successful")
            return False

        recipient_found = False
        amount_found = False
        sender_found = False
        denom = "atestfet" if testnet else "afet"

        # Inspect the "transfer" event for the expected recipient, sender, and amount.
        for event_type, event_attrs in tx_response.events.items():
            if event_type == "transfer":
                if event_attrs.get("recipient") == expected_recipient:
                    recipient_found = True
                if event_attrs.get("sender") == sender_fet_address:
                    sender_found = True
                amount_str = event_attrs.get("amount", "")
                if amount_str and amount_str.endswith(denom):
                    try:
                        amount_value = int(amount_str.replace(denom, ""))
                        if amount_value >= expected_amount_atto:
                            amount_found = True
                    except ValueError:
                        # Amount is not a plain integer followed by the denom; ignore it.
                        pass

        if recipient_found and amount_found and sender_found:
            logger.info(f"Payment verified: {transaction_id}")
            return True

        logger.error(
            f"Payment verification failed - recipient: {recipient_found}, "
            f"amount: {amount_found}, sender: {sender_found}"
        )
        return False
    except Exception as e:
        logger.error(f"FET payment verification failed: {e}")
        logger.error(f"Traceback: {traceback.format_exc()}")
        return False


async def request_payment_from_user(
    ctx: Context, user_address: str, description: str | None = None
):
    session = str(ctx.session)

    accepted_funds: list[Funds] = []

    # The price can be overridden with FIXED_FET_AMOUNT; it defaults to 0.1 FET.
    fet_amount = os.getenv("FIXED_FET_AMOUNT", "0.1")
    accepted_funds.append(
        Funds(currency="FET", amount=str(fet_amount), payment_method="fet_direct")
    )

    if not accepted_funds:
        ctx.logger.warning(
            f"[payment] no accepted_funds; cannot send RequestPayment user={user_address} session={session}"
        )
        # create_text_chat fills in the timestamp and msg_id of the chat message.
        await ctx.send(
            user_address,
            create_text_chat(
                "No payment methods are currently available. Please try again in a moment."
            ),
        )
        return

    metadata: dict[str, str] = {
        "agent": "asi1-llm-agent",
        "service": "llm_processing",
    }

    use_testnet = os.getenv("FET_USE_TESTNET", "true").lower() == "true"
    fet_network = "stable-testnet" if use_testnet else "mainnet"
    metadata["fet_network"] = fet_network
    metadata["mainnet"] = "false" if use_testnet else "true"

    if _agent_wallet:
        metadata["provider_agent_wallet"] = str(_agent_wallet.address())

    if description:
        metadata["content"] = description
    else:
        metadata["content"] = (
            "Please complete the payment to proceed. "
            "After payment, I will process your request using ASI1 One LLM."
        )

    # Prefer the wallet address as recipient; fall back to the agent address.
    recipient_addr = str(_agent_wallet.address()) if _agent_wallet else str(ctx.agent.address)

    funds_log = [
        {"method": f.payment_method, "currency": f.currency, "amount": f.amount}
        for f in accepted_funds
    ]
    ctx.logger.info(
        f"[payment] outbound RequestPayment user={user_address} session={session} "
        f"funds={funds_log} metadata={metadata} deadline_seconds=300"
    )

    payment_request = RequestPayment(
        accepted_funds=accepted_funds,
        recipient=recipient_addr,
        deadline_seconds=300,
        reference=session,
        description=description
        or "ASI1 One LLM: after payment, I will process your request",
        metadata=metadata,
    )

    await ctx.send(user_address, payment_request)
    ctx.logger.info(
        f"[payment] RequestPayment sent to {user_address} with recipient {recipient_addr}"
    )


def _allow_retry(ctx: Context, sender: str, session_id: str) -> bool:
    retry_key = f"{sender}:{session_id}:retry_count"
    try:
        current = int(ctx.storage.get(retry_key) or 0)
    except Exception:
        current = 0
    if current >= 1:
        return False
    ctx.storage.set(retry_key, current + 1)
    ctx.storage.set(f"{sender}:{session_id}:awaiting_prompt", True)
    ctx.storage.set(f"{sender}:{session_id}:verified_payment", True)
    return True


@payment_proto.on_message(CommitPayment)
async def handle_commit_payment(ctx: Context, sender: str, msg: CommitPayment):
    ctx.logger.info(f"Received payment commitment from {sender}")
    payment_verified = False
    if msg.funds.payment_method == "fet_direct" and msg.funds.currency == "FET":
        try:
            # The buyer's wallet address is needed to match the transfer's sender.
            buyer_fet_wallet = None
            if isinstance(msg.metadata, dict):
                buyer_fet_wallet = msg.metadata.get("buyer_fet_wallet") or msg.metadata.get(
                    "buyer_fet_address"
                )
            if not buyer_fet_wallet:
                ctx.logger.error("Missing buyer_fet_wallet in CommitPayment.metadata")
            else:
                use_testnet = os.getenv("FET_USE_TESTNET", "true").lower() == "true"
                # Note: the expected amount is taken from the buyer's CommitPayment;
                # it is not compared against the amount originally requested.
                payment_verified = verify_fet_payment_to_agent(
                    transaction_id=msg.transaction_id,
                    expected_amount_fet=str(msg.funds.amount),
                    sender_fet_address=buyer_fet_wallet,
                    recipient_agent_wallet=_agent_wallet,
                    logger=ctx.logger,
                    use_mainnet=not use_testnet,
                )
        except Exception as e:
            ctx.logger.error(f"FET verify error: {e}")
    else:
        ctx.logger.error(f"Unsupported payment method: {msg.funds.payment_method}")

    if payment_verified:
        ctx.logger.info(f"Payment verified successfully from {sender}")
        await ctx.send(sender, CompletePayment(transaction_id=msg.transaction_id))
        await generate_response_after_payment(ctx, sender)
    else:
        ctx.logger.error(f"Payment verification failed from {sender}")
        await ctx.send(
            sender,
            CancelPayment(
                transaction_id=msg.transaction_id,
                reason="Payment verification failed",
            ),
        )


@payment_proto.on_message(RejectPayment)
async def handle_reject_payment(ctx: Context, sender: str, msg: RejectPayment):
    ctx.logger.info(f"Payment rejected by {sender}: {msg.reason}")
    await ctx.send(
        sender,
        create_text_chat(
            "Sorry, you denied the payment. Reply again and I'll send a new payment request."
        ),
    )


async def generate_response_after_payment(ctx: Context, user_address: str):
    from client import call_asi_one_api

    session_id = str(ctx.session)
    prompt = ctx.storage.get(f"prompt:{user_address}:{session_id}") or ctx.storage.get(
        f"current_prompt:{user_address}:{session_id}"
    )
    if not prompt:
        ctx.logger.error("No prompt found in storage")
        await ctx.send(user_address, create_text_chat("Error: No prompt found"))
        return

    ctx.logger.info(f"Processing request for verified payment: {prompt}")
    try:
        result = await call_asi_one_api(prompt=prompt)
        ctx.logger.info(
            f"API result: status={result.get('status')}, "
            f"has_image_url={bool(result.get('image_url'))}, "
            f"has_response_text={bool(result.get('response_text'))}"
        )
        await process_api_result(ctx, user_address, result)
    except Exception as e:
        ctx.logger.error(f"API call error: {e}")
        await ctx.send(user_address, create_text_chat(f"Error processing request: {e}"))


async def process_api_result(ctx: Context, sender: str, result: dict):
    session_id = str(ctx.session)

    if result.get("status") == "failed" or "error" in result:
        err = result.get("error", "Unknown error")
        await ctx.send(sender, create_text_chat(f"Error: {err}"))
        if _allow_retry(ctx, sender, session_id):
            await ctx.send(
                sender,
                create_text_chat(
                    "Request failed, but your payment is valid. "
                    "Send your request again - you won't be charged again."
                ),
            )
        return

    image_url = result.get("image_url")
    if image_url:
        try:
            image_markdown = (
                f"Image generated successfully.\n\n"
                f"![Generated image]({image_url})\n\n"
            )

            await ctx.send(
                sender,
                AvChatMessage(
                    timestamp=datetime.now(timezone.utc),
                    msg_id=uuid4(),
                    content=[
                        TextContent(type="text", text=image_markdown),
                    ],
                ),
            )
            ctx.storage.remove(f"{sender}:{session_id}:retry_count")
            ctx.logger.info(f"Image sent successfully as markdown: {image_url}")
        except Exception as e:
            ctx.logger.error(f"Failed to send image: {e}")
            if _allow_retry(ctx, sender, session_id):
                await ctx.send(
                    sender,
                    create_text_chat(
                        "Could not send image, but your payment is valid. "
                        "Send your request again - no extra charge."
                    ),
                )
            else:
                await ctx.send(
                    sender,
                    create_text_chat(
                        "Could not send image. Please try again or start a new session."
                    ),
                )
        return

    response_text = result.get("response_text")
    if not response_text:
        await ctx.send(
            sender,
            create_text_chat("Response generated but could not retrieve image or text"),
        )
        if _allow_retry(ctx, sender, session_id):
            await ctx.send(
                sender,
                create_text_chat(
                    "Delivery failed, but your payment is valid. "
                    "Send your request again - no extra charge."
                ),
            )
        return

    try:
        await ctx.send(
            sender,
            AvChatMessage(
                timestamp=datetime.now(timezone.utc),
                msg_id=uuid4(),
                content=[
                    TextContent(type="text", text=response_text),
                ],
            ),
        )
        ctx.storage.remove(f"{sender}:{session_id}:retry_count")
        ctx.logger.info("Response sent successfully")
    except Exception as e:
        ctx.logger.error(f"Failed to send response: {e}")
        if _allow_retry(ctx, sender, session_id):
            await ctx.send(
                sender,
                create_text_chat(
                    "Could not send response, but your payment is valid. "
                    "Send your request again - no extra charge."
                ),
            )
        else:
            await ctx.send(
                sender,
                create_text_chat(
                    "Could not send response. Please try again or start a new session."
                ),
            )
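The one-retry rule implemented by _allow_retry can be tried out without running an agent. The sketch below mirrors its logic against a small dictionary-backed stand-in for ctx.storage; the DictStorage class, the allow_retry name, and the sender and session values are hypothetical and only model the get and set calls used above.

```python
class DictStorage:
    """Tiny stand-in for ctx.storage (hypothetical; only get/set are modelled)."""

    def __init__(self) -> None:
        self._data: dict[str, object] = {}

    def get(self, key: str):
        return self._data.get(key)

    def set(self, key: str, value) -> None:
        self._data[key] = value


def allow_retry(storage: DictStorage, sender: str, session_id: str) -> bool:
    """Grant at most one free retry per sender and session."""
    retry_key = f"{sender}:{session_id}:retry_count"
    current = int(storage.get(retry_key) or 0)
    if current >= 1:
        return False
    storage.set(retry_key, current + 1)
    # Re-arm the flags the chat handler checks before skipping a new payment.
    storage.set(f"{sender}:{session_id}:awaiting_prompt", True)
    storage.set(f"{sender}:{session_id}:verified_payment", True)
    return True


storage = DictStorage()
first = allow_retry(storage, "agent1buyer", "session-1")
second = allow_retry(storage, "agent1buyer", "session-1")
print(first, second)  # True False
```

The first failure grants a retry and sets both flags; a second failure in the same session is refused because the counter is already 1. In payment.py the counter is removed again after a successful delivery.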

Image Generation Service

The agent uses ASI1 One image generation and tmpfiles.org for temporary hosting:

client.py
"""ASI1 One LLM API client."""

from __future__ import annotations

import asyncio
import base64
import os
from typing import Any

import requests
from dotenv import load_dotenv

load_dotenv()

ASI_ONE_API_KEY = os.getenv("ASI_ONE_API_KEY")
ASI_ONE_MODEL = os.getenv("ASI_ONE_MODEL", "asi1")
TMPFILES_API_URL = "https://tmpfiles.org/api/v1/upload"


def upload_to_tmpfiles(image_bytes: bytes, filename: str = "asi1_image.png") -> str:
    """Upload image to tmpfiles.org and return the public download URL (https)."""
    try:
        response = requests.post(
            TMPFILES_API_URL,
            files={"file": (filename, image_bytes, "image/png")},
            timeout=120,
        )
        response.raise_for_status()
        response_data = response.json()
    except requests.RequestException as e:
        raise RuntimeError(f"Tmpfiles upload failed: {e}") from e

    raw_url = response_data.get("data", {}).get("url")
    if not raw_url:
        raise RuntimeError(f"Tmpfiles upload returned no URL: {response_data}")

    # Rewrite the page URL into the direct-download (/dl/) form.
    return raw_url.replace("http://tmpfiles.org/", "https://tmpfiles.org/dl/")


async def call_asi_one_api(
    *,
    prompt: str,
    size: str = "auto",
) -> dict[str, Any] | None:
    """Call ASI1 One image generation API with a text prompt and upload image to tmpfiles.org."""
    if not ASI_ONE_API_KEY:
        return {"error": "ASI_ONE_API_KEY is not set", "status": "failed"}

    try:
        url = "https://api.asi1.ai/v1/image/generate"
        payload = {
            "model": ASI_ONE_MODEL,
            "prompt": prompt.strip(),
            "size": size,
        }
        headers = {
            "Authorization": f"Bearer {ASI_ONE_API_KEY}",
            "Content-Type": "application/json",
        }

        # requests is blocking, so run it in a worker thread to keep the agent's event loop free.
        response = await asyncio.to_thread(
            requests.post, url, json=payload, headers=headers, timeout=60
        )
        if not response.ok:
            return {
                "error": f"{response.status_code} Error from ASI image API: {response.text}",
                "status": "failed",
            }

        response_data = response.json()

        # Accept a top-level URL first, then fall back to the first item of "data".
        image_url = response_data.get("image_url") or response_data.get("url")

        if not image_url:
            data_items = response_data.get("data", [])
            if data_items and isinstance(data_items, list):
                first_item = data_items[0] if data_items else {}
                image_url = first_item.get("url")
                if not image_url and first_item.get("b64_json"):
                    try:
                        # Base64 results are decoded and hosted on tmpfiles.org.
                        image_bytes = base64.b64decode(first_item["b64_json"])
                        image_url = await asyncio.to_thread(upload_to_tmpfiles, image_bytes)
                    except Exception as e:
                        return {
                            "error": f"Failed to process base64 image: {str(e)}",
                            "status": "failed",
                        }

        if not image_url:
            return {
                "error": f"ASI image API returned no image URL: {response_data}",
                "status": "failed",
            }

        return {
            "image_url": image_url,
            "status": "success",
        }
    except requests.RequestException as e:
        return {"error": f"Image generation request failed: {str(e)}", "status": "failed"}
    except Exception as e:
        return {"error": str(e), "status": "failed"}
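The URL rewrite in upload_to_tmpfiles only matches URLs that start with http://tmpfiles.org/. As a hedged sketch, the helper below does the same rewrite by parsing the URL, so it would also cover an https page URL or a URL that already uses the /dl/ path. Whether tmpfiles.org returns such variants is an assumption, and to_direct_download_url is a hypothetical name, not part of client.py.

```python
from urllib.parse import urlsplit, urlunsplit


def to_direct_download_url(raw_url: str) -> str:
    """Rewrite a tmpfiles.org page URL into its https /dl/ form."""
    parts = urlsplit(raw_url)
    if parts.netloc != "tmpfiles.org":
        # Not a tmpfiles.org URL: leave it untouched.
        return raw_url
    path = parts.path if parts.path.startswith("/dl/") else "/dl" + parts.path
    return urlunsplit(("https", parts.netloc, path, parts.query, parts.fragment))


print(to_direct_download_url("http://tmpfiles.org/123/asi1_image.png"))
```

The rewrite is idempotent, so applying it to an already rewritten URL leaves it unchanged.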

Chat Message Processing

The chat protocol handles incoming messages and coordinates payment flow:

chat_proto.py
from datetime import datetime, timezone

from uagents import Context, Protocol
from uagents_core.contrib.protocols.chat import (
    ChatAcknowledgement,
    ChatMessage,
    TextContent,
    chat_protocol_spec,
)

from payment import (
    generate_response_after_payment,
    request_payment_from_user,
)

chat_proto = Protocol(spec=chat_protocol_spec)


@chat_proto.on_message(ChatMessage)
async def handle_message(ctx: Context, sender: str, msg: ChatMessage):
    ctx.logger.info(f"Got a message from {sender}: {msg.content}")
    await ctx.send(
        sender,
        ChatAcknowledgement(
            timestamp=datetime.now(timezone.utc), acknowledged_msg_id=msg.msg_id
        ),
    )

    for item in msg.content:
        if isinstance(item, TextContent):
            text = item.text.strip()
            session_id = str(ctx.session)
            awaiting_key = f"{sender}:{session_id}:awaiting_prompt"
            verified_key = f"{sender}:{session_id}:verified_payment"

            # Both flags are set by the retry logic: the payment is already verified,
            # so the new prompt is processed without another payment request.
            if (ctx.storage.has(awaiting_key) or ctx.storage.get(awaiting_key)) and (
                ctx.storage.has(verified_key) or ctx.storage.get(verified_key)
            ):
                ctx.logger.info("Consuming prompt post-payment and processing request")
                ctx.storage.remove(awaiting_key)
                ctx.storage.remove(verified_key)
                ctx.storage.set(f"prompt:{sender}:{session_id}", text)
                ctx.storage.set(f"current_prompt:{sender}:{session_id}", text)
                ctx.storage.set("requesting_user", sender)
                await generate_response_after_payment(ctx, sender)
                return

            # Otherwise store the prompt and ask the user to pay first.
            ctx.logger.info(f"Requesting payment from {sender} for LLM processing")
            payment_description = "Please complete the payment to process this request."
            ctx.storage.set(f"prompt:{sender}:{session_id}", text)
            # Use the same key format that generate_response_after_payment reads.
            ctx.storage.set(f"current_prompt:{sender}:{session_id}", text)
            ctx.storage.remove(f"{sender}:{session_id}:request_recorded")
            await request_payment_from_user(
                ctx,
                sender,
                description=payment_description,
            )
            return


@chat_proto.on_message(ChatAcknowledgement)
async def handle_ack(ctx: Context, sender: str, msg: ChatAcknowledgement):
    ctx.logger.info(
        f"Got an acknowledgement from {sender} for {msg.acknowledged_msg_id}"
    )
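The branching in handle_message reduces to one decision per incoming text: generate immediately, or ask for payment first. The sketch below isolates that decision as a pure function over the same storage key names; the flags dictionary stands in for ctx.storage, and next_action plus its return labels are hypothetical names chosen for illustration.

```python
def next_action(flags: dict[str, bool], sender: str, session_id: str) -> str:
    """Return "generate" if a paid retry is pending, else "request_payment"."""
    awaiting = flags.get(f"{sender}:{session_id}:awaiting_prompt", False)
    verified = flags.get(f"{sender}:{session_id}:verified_payment", False)
    return "generate" if awaiting and verified else "request_payment"


# A fresh session has no flags, so the user is asked to pay.
print(next_action({}, "agent1buyer", "session-1"))

# After a failed generation the retry logic sets both flags.
retry_flags = {
    "agent1buyer:session-1:awaiting_prompt": True,
    "agent1buyer:session-1:verified_payment": True,
}
print(next_action(retry_flags, "agent1buyer", "session-1"))
```

Because the keys include both the sender and the session, a pending retry for one user or session never lets a different one skip payment.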

Helper Functions

Shared utilities for creating chat messages:

shared.py
"""
Shared utilities to avoid circular imports between chat_proto and payment_proto.
"""

from datetime import datetime, timezone
from uuid import uuid4

from uagents_core.contrib.protocols.chat import (
    AgentContent,
    ChatMessage,
    TextContent,
)


def create_text_chat(text: str, end_session: bool = False) -> ChatMessage:
    """Create a text chat message."""
    content: list[AgentContent] = [TextContent(type="text", text=text)]
    if end_session:
        from uagents_core.contrib.protocols.chat import EndSessionContent

        content.append(EndSessionContent(type="end-session"))
    return ChatMessage(
        timestamp=datetime.now(timezone.utc),
        msg_id=uuid4(),
        content=content,
    )

Agent Initialization

The main agent file sets up protocols and wallet using environment-based configuration:

agent.py
# agent.py - ASI1 One LLM Agent with Payment Protocol
import os

from dotenv import load_dotenv
from uagents import Agent, Context

# Load .env before the protocol modules are imported.
load_dotenv()

from chat_proto import chat_proto
from payment import payment_proto, set_agent_wallet

agent = Agent(
    name=os.getenv("AGENT_NAME", "Fet Example Agent"),
    seed=os.getenv("AGENT_SEED_PHRASE", "asi1-llm-agent"),
    port=int(os.getenv("AGENT_PORT", "8000")),
    mailbox=True,
)

agent.include(chat_proto, publish_manifest=True)
agent.include(payment_proto, publish_manifest=True)
set_agent_wallet(agent.wallet)


@agent.on_event("startup")
async def startup(ctx: Context):
    ctx.logger.info(f"Agent started: {agent.wallet.address()}")
    ctx.logger.info("=== ASI1 One LLM Agent ===")
    ctx.logger.info("Accepted: 0.1 FET (direct)")
    ctx.logger.info("LLM via ASI1 One API (ASI_ONE_API_KEY)")
    ctx.logger.info("Chat to request LLM processing")


if __name__ == "__main__":
    agent.run()

Requirements

Install dependencies from requirements.txt:

requirements.txt
uagents==0.23.6
uagents-core==0.4.0
python-dotenv
requests
cosmpy

Configuration

Copy .env.example to .env and set your credentials:

.env.example
# Agent identity (optional; defaults shown)
AGENT_NAME=Fet Example Agent
AGENT_SEED_PHRASE=asi-imagen-agent
AGENT_PORT=8000

# ASI1 API key for image generation (required)
ASI_ONE_API_KEY=your_asi1_api_key_here

# Optional: ASI1 model name
# ASI_ONE_MODEL=asi1

# Fetch.ai network (true = testnet, false = mainnet)
FET_USE_TESTNET=true

Create a .env file with required credentials:

# Copy from .env.example and fill in values
cp .env.example .env
# Edit .env and set ASI_ONE_API_KEY, etc.

Installation & Execution

Set up the environment and run the agent:

# Create virtual environment
python3 -m venv .venv && source .venv/bin/activate

# Install dependencies from requirements.txt
pip install -r requirements.txt

# Configure environment (copy .env.example to .env and set ASI_ONE_API_KEY, etc.)
cp .env.example .env

# Start the agent
python3 agent.py

Usage Flow

  1. User sends message: The user submits an image prompt via chat
  2. Payment request: The agent responds with a payment request for 0.1 FET
  3. User pays: The buyer completes the on-chain FET transfer
  4. Verification: The agent verifies the transaction on the Fetch.ai ledger
  5. Generation: The ASI1 One API creates the image (requires ASI_ONE_API_KEY)
  6. Delivery: The image URL is sent as a markdown image in chat; base64 results are first uploaded to tmpfiles.org

Implementation Details

Payment Verification Setup

  1. Install blockchain library: pip install cosmpy
  2. Network configuration: Set FET_USE_TESTNET in .env (true for testnet, false for mainnet)
  3. Payment definition: In payment.py, define FET_FUNDS = Funds(currency="FET", amount="0.1", payment_method="fet_direct")
  4. Buyer metadata: On CommitPayment, the buyer must provide buyer_fet_wallet (buyer_fet_address is also accepted) in msg.metadata
  5. On-chain check: Agent uses LedgerClient.query_tx() to verify transaction events match expected transfer

Key Implementation Features

  • Blockchain Integration: Direct on-chain verification without payment intermediaries
  • ASI1 One API: Image generation via ASI (ASI_ONE_API_KEY); tmpfiles.org for hosting
  • State Management: Session-based storage tracks payment status and prompts
  • Error Recovery: Failed generations allow one retry without additional payment (_allow_retry)
  • Chat Protocol Output: Images delivered as markdown links in TextContent with tmpfiles.org URLs

The screenshots below show the Fet Example Agent flow: (1) payment request with Reject / Approve FET Payment, and (2) the generated image delivered after successful payment.

1. Payment request — Agent asks for 0.1 FET on Dorado; user can approve or reject.

Fet Example Agent – payment request

2. Generated image — After payment, the agent returns the generated image (e.g. smartwatch on white background).

Fet Example Agent – generated image result

Getting FET Testnet Tokens

To test this agent on the Fetch.ai testnet, you'll need testnet FET tokens in your wallet. Here's how to get them:

  1. Visit the Testnet Faucet: Go to the official Fetch.ai Testnet Faucet to request testnet tokens

  2. Wait for Confirmation: Testnet tokens are typically distributed automatically after a short delay
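Once the faucet has paid out, it can be useful to confirm that the wallet holds enough testnet FET before talking to the agent. The sketch below uses cosmpy's LedgerClient.query_bank_balance for the lookup and a pure helper for the comparison. The function names are hypothetical, the 0.1 FET default mirrors the price used in this example, and transaction fees are deliberately not accounted for.

```python
from decimal import Decimal


def can_afford(balance_atestfet: int, price_fet: str = "0.1") -> bool:
    """Compare a balance in base units against a price given in FET (fees ignored)."""
    return balance_atestfet >= int(Decimal(price_fet) * 10**18)


def query_testnet_balance(address: str) -> int:
    """Query the balance of an address on the Fetch.ai stable testnet (needs network access)."""
    # cosmpy is imported lazily so can_afford stays usable without it installed.
    from cosmpy.aerial.client import LedgerClient, NetworkConfig

    ledger = LedgerClient(NetworkConfig.fetchai_stable_testnet())
    return ledger.query_bank_balance(address)


# Offline check with a hypothetical balance of 1 testnet FET.
print(can_afford(10**18))
```

With a funded wallet, can_afford(query_testnet_balance(your_address)) performs the live check; your_address is a placeholder for your own fetch1... address.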

For detailed instructions and a step-by-step visual guide, check out this blog post or watch the video below: