Persisting logs in DB for each task in Celery (+ FastAPI)

At FereAI.xyz, we use Celery at scale. Every time the trading agent wants to perform a trade, a manual sell is executed, or emails have to be sent for scheduled jobs, the action goes through Celery.

Celery is great, but I have always missed the feature from Airflow where you can see the logs of each individual task run and use them to diagnose or debug a problem. So I decided to build something similar.

Requirements

I wanted a solution where

  • All logs from Celery jobs are stored in the DB along with their task ID, status and a few other parameters
  • Logging works out of the box for Celery, FastAPI and standalone usage (e.g. Jupyter notebooks)

Here’s what I did

A database table for storing logs

# base.py (shared declarative base)
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()


# The log model lives in its own module and imports the shared Base
from datetime import datetime

from sqlalchemy import Column, DateTime, Integer, String, Text
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy.sql.expression import func

from .base import Base


class BatchedLog(Base):
  __tablename__ = "batched_log"

  id = Column(Integer, primary_key=True, autoincrement=True)
  task_id = Column(String(255), nullable=True, index=True)
  task_name = Column(String(255), nullable=True, index=True)
  status = Column(String(255), nullable=True, index=True)
  logs = Column(Text)  # store the collected log records as JSON
  created_at: Mapped[datetime] = mapped_column(
    DateTime(timezone=True),
    nullable=False,
    default=func.now(),
    index=True,
  )
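
Once the logs are persisted, reading them back for a given task is a plain query. A minimal sketch (import paths are illustrative) using the get_trade_db_session() session factory that appears later in this post:

import json

from .models import BatchedLog  # wherever you keep the model above
from .db import get_trade_db_session  # your DB session factory


def get_task_logs(task_id: str) -> list:
  """Return the decoded log records stored for a Celery task."""
  with get_trade_db_session() as session:
    row = (
      session.query(BatchedLog)
      .filter(BatchedLog.task_id == task_id)
      .order_by(BatchedLog.created_at.desc())
      .first()
    )
    return json.loads(row.logs) if row else []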

Logger config

import contextvars
import logging
import sys

from celery.app.log import TaskFormatter

# Create and configure the logger
logger = logging.getLogger("trader")
logger.setLevel(logging.DEBUG)

# Remove existing handlers to avoid duplication
if logger.hasHandlers():
  logger.handlers.clear()

# Stream handler for outputting to stdout
stream_handler = logging.StreamHandler()
stream_handler.setLevel(logging.DEBUG)

# Formatter for the log messages
formatter = TaskFormatter(
  "%(asctime)s - %(task_id)s - %(task_name)s - %(name)s - %(threadName)s - %(levelname)s - %(message)s"
)
stream_handler.setFormatter(formatter)

logger.addHandler(stream_handler)

# Prevent propagation to the root logger
logger.propagate = False

task_logs = contextvars.ContextVar("task_logs", default=[])

# Define a context variable to store the logger
logger_context = contextvars.ContextVar("logger_context", default=None)


def get_logger():
  """Retrieve the current logger from the context variable."""
  lc = logger_context.get()
  if lc is None:
    # detect a notebook kernel via sys.modules rather than env vars
    logger.name = "Jupyter" if "ipykernel" in sys.modules else "Standalone"
    return logger
  return lc
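
Outside Celery and FastAPI (a plain script or a notebook), nothing sets the context variable, so get_logger() simply falls back to the stream logger:

from logger_config import get_logger  # adjust the import path to your project

logger = get_logger()  # no context set, so this is the "Standalone"/"Jupyter" logger
logger.info("Hello from a plain script")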

Celery app

import json
import logging
import traceback
from uuid import UUID

from celery.app import Celery
from celery.app.log import TaskFormatter
from celery.schedules import crontab
from celery.signals import after_setup_task_logger
from celery.utils.log import get_task_logger
from celery.signals import task_prerun, task_postrun, task_failure
from celery.result import AsyncResult

from .logger_config import task_logs, logger_context, get_logger

# BatchedLog, get_trade_db_session and get_redis_url come from your own project
# modules (the model shown above, a DB session factory and a broker URL helper).

app = Celery("trader", broker=get_redis_url(), backend=get_redis_url())
app.conf.task_logging_level = logging.DEBUG

# Logger that tasks write to; its records are captured by TaskLogHandler below
celery_task_logger = get_task_logger("trader")

# Signal to initialize logs
@task_prerun.connect
def initialize_logs(task_id=None, task=None, args=None, kwargs=None, **extras):
  logger_context.set(celery_task_logger)
  task_logs.set([])
  celery_task_logger.info(f"Logger set for Celery task: {task_id}")


# Signal to persist logs on task completion
@task_postrun.connect
def persist_logs_on_completion(
  task_id=None, task=None, args=None, kwargs=None, retval=None, **extras
):

  result = AsyncResult(task_id)
  status = result.status

  task_name = task.name if task else "unknown"

  logs = task_logs.get()
  print(f"Persisting logs for task_id={task_id}")

  # Save logs to the database
  with get_trade_db_session() as session:
    session.merge(
      BatchedLog(
        task_id=task_id,
        status=status.lower(),
        task_name=task_name,
        logs=json.dumps(logs),
      )
    )
    session.commit()
  logger_context.set(None)

# Signal to handle task failure
@task_failure.connect
def handle_failure(
  sender=None,
  task_id=None,
  exception=None,
  args=None,
  kwargs=None,
  einfo=None,
  **extras,
):
  logger = get_logger()
  result = AsyncResult(task_id)
  status = result.status

  task_name = sender.name if sender else "unknown"

  logger.error(f"Task failed: {exception}")
  if einfo is not None:
    logger.error(str(einfo))  # einfo already carries the formatted traceback

  logs = task_logs.get()
  print(f"Task failed. Persisting logs for task_id={task_id}")

  # Save logs to the database
  with get_trade_db_session() as session:
    session.merge(
      BatchedLog(
        task_id=task_id,
        status=status.lower(),
        task_name=task_name,
        logs=json.dumps(logs),
      )
    )
    session.commit()

class TaskLogHandler(logging.Handler):
  def emit(self, record):
    logs = task_logs.get()
    logs.append(
      {
        "level": record.levelname,
        "message": record.getMessage(),
        "time": record.created,
        "filename": record.filename,
        "lineno": record.lineno,
        "funcName": record.funcName,
        "thread": record.threadName,
      }
    )
    task_logs.set(logs)


@after_setup_task_logger.connect
def setup_task_logger(logger, *args, **kwargs):
  task_handler = TaskLogHandler()
  logger.addHandler(task_handler)
  for handler in logger.handlers:
    handler.setFormatter(
      TaskFormatter(
        "%(asctime)s - %(task_id)s - %(task_name)s - %(name)s - %(levelname)s - %(message)s"
      )
    )

@app.task
def test_task():
  celery_task_logger.info("Starting main task")
  try:
    # stand-in for the real work; start_agents() is a hypothetical helper
    agents_started = start_agents()
    celery_task_logger.info(
      f"Started {len(agents_started)} agents with UUIDs: {agents_started}"
    )
    return agents_started
  except Exception as e:
    celery_task_logger.error(f"Task failed: {e}")
    celery_task_logger.error(traceback.format_exc())
    raise
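
To see it end to end, queue the task, wait for it, and then look up its logs, for example with the hypothetical get_task_logs() helper sketched earlier:

result = test_task.delay()
result.get(timeout=60)  # wait for the worker to finish

for record in get_task_logs(result.id):
  print(record["level"], record["message"])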

Any other custom classes or functions should also use get_logger:

from .logger_config import get_logger

class MyClass:

  def __init__(self):
    self.logger = get_logger()
  
  def foo(self):
    self.logger.info("Inside foo")

FastAPI app

from fastapi import FastAPI, Request

from .logger_config import get_logger, logger_context

app = FastAPI(
...
)

@app.middleware("http")
async def set_fastapi_logger(request: Request, call_next):
  # Assign a request-specific logger to the logger context
  if not hasattr(request.state, "logger"):
    request.state.logger = get_logger()
  logger_context.set(request.state.logger)
  response = await call_next(request)
  # Clear the logger context after request handling
  logger_context.set(None)
  return response
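
With the middleware in place, any endpoint (and anything it calls) can keep using get_logger(); a minimal, hypothetical route:

@app.get("/health")
async def health():
  logger = get_logger()  # resolves to the request-scoped logger set by the middleware
  logger.info("Health check hit")
  return {"status": "ok"}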

Using websockets with Autogen

Autogen provides a default implementation of websockets, documented at https://microsoft.github.io/autogen/docs/notebooks/agentchat_websockets. However, it has some limitations. Notably:

  1. It isn’t compatible with ASGI servers running multiple instances of the FastAPI app.

For a production-grade deployment, you typically want many instances of FastAPI or Django served behind an ASGI server like uvicorn.

So, this post demonstrates an alternative approach that works seamlessly and scales as demand grows.

Requirements

Some extra dependencies are needed for this notebook, which can be installed via pip:

pip install pyautogen[websockets] fastapi uvicorn

Define your Agents

import autogen
from datetime import datetime

agent = autogen.ConversableAgent(
        name="chatbot",
        system_message="Complete a task given to you and reply TERMINATE when the task is done. If asked about the weather, use tool 'weather_forecast(city)' to get the weather forecast for a city.",
        llm_config={
            "config_list": autogen.config_list_from_json(
                env_or_file="OAI_CONFIG_LIST",
                filter_dict={
                    "model": ["gpt-4", "gpt-3.5-turbo", "gpt-3.5-turbo-16k"],
                },
            ),
            "stream": True,
        },
    )

user_proxy = autogen.UserProxyAgent(
        name="user_proxy",
        system_message="A proxy for the user.",
        is_termination_msg=lambda x: x.get("content", "") and x.get("content", "").rstrip().endswith("TERMINATE"),
        human_input_mode="NEVER",
        max_consecutive_auto_reply=10,
        code_execution_config=False,
    )

def weather_forecast(city: str) -> str:
        return f"The weather forecast for {city} at {datetime.now()} is sunny."

autogen.register_function(
        weather_forecast, caller=agent, executor=user_proxy, description="Weather forecast for a city"
    )

Create a custom IOStream that handles websocket connections

import asyncio
import json
import logging
import threading
from typing import Any

from autogen.io.base import IOStream
from fastapi import WebSocket

logger = logging.getLogger(__name__)

class CustomIOWebsockets(IOStream):
  r"""A websocket input/output stream.

  Attributes:
      _websocket (WebSocket): The websocket server.
  """

  def __init__(self, websocket: WebSocket) -> None:
    """Initialize the websocket input/output stream.

    Args:
        websocket (ServerConnection): The websocket server.

    Raises:
        ImportError: If the websockets module is not available.
    """
    self._websocket = websocket

  @staticmethod
  async def handler(websocket: WebSocket, on_connect, *args, **kwargs) -> None:
    """The handler function for the websocket server."""
    logger.debug(
      f" - CustomIOWebsockets._handler(): Client connected on {websocket}"
    )
    # create a new IOWebsockets instance using the websocket that is
    # create when a client connects
    iowebsocket = CustomIOWebsockets(websocket)
    with CustomIOWebsockets.set_default(iowebsocket):
      # call the on_connect function
      await on_connect(iowebsocket, *args, **kwargs)

  @property
  def websocket(self) -> "WebSocket":
    """The URI of the websocket server."""
    return self._websocket


  def print(
    self, *objects: Any, sep: str = " ", end: str = "\n", flush: bool = False
  ) -> None:
    r"""Print data to the output stream.

    Args:
        objects (any): The data to print.
        sep (str, optional): The separator between objects.
        Defaults to " ".
        end (str, optional): The end of the output. Defaults to "\n".
        flush (bool, optional): Whether to flush the output.
        Defaults to False.
    """
    if objects and isinstance(objects[0], dict):
      _xs = sep.join(map(json.dumps, objects)) + end
    else:
      _xs = sep.join(map(str, objects)) + end
    if _xs:
      xs = {"type": "websocket.send", "text": _xs}

      def send_async():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
          loop.run_until_complete(self._websocket.send(xs))
        finally:
          loop.close()

      thread = threading.Thread(target=send_async)
      thread.start()
      thread.join()

  async def input(self, prompt: str = "", *, password: bool = False) -> str:
    """Read a line from the input stream.

    Args:
        prompt (str, optional): The prompt to display. Defaults to "".
        password (bool, optional): Whether to read a password.
        Defaults to False.

    Returns:
        str: The line read from the input stream.
    """
    if prompt != "":
      await self._websocket.send_text(prompt)

    msg = await self._websocket.receive_text()

    return msg.decode("utf-8") if isinstance(msg, bytes) else msg

Define your on_connect handler

async def chat_on_connect(
  iostream: CustomIOWebsockets,
) -> None:
  logger.debug(
    " - on_connect(): Connected to client using CustomIOWebsockets "
    f"{iostream}"
  )

  logger.debug(" - on_connect(): Receiving message from client.")

  msg = json.loads(await iostream.input())  # await the input method
  query = msg["query"]
  user_proxy.initiate_chat(
    agent,
    message=query,
  )

Adding this to FastAPI

from fastapi import FastAPI, WebSocket
from fastapi.websockets import WebSocketState
from starlette.websockets import WebSocketDisconnect

app = FastAPI(
  title="WebSockets with Autogen",
  description="""A better websocket with Autogen""",
  version="1.0.1"
)

@app.websocket("/chat")
async def websocket_endpoint_v2(
  websocket: WebSocket,
):
  await websocket.accept()
  try:
    origin = websocket.headers.get("origin")
    await CustomIOWebsockets.handler(
      websocket,
      chat_on_connect,
    )
  except WebSocketDisconnect:
    logger.info("Client disconnected")
    pass
  except Exception as e:
    logger.exception(f"An error occurred: {e}")
    # send the error message to the client
    if websocket.client_state == WebSocketState.CONNECTED:
      try:
        await websocket.send_text("An internal server error occurred. Closing.")
      except RuntimeError:
        pass
  finally:
    if websocket.client_state == WebSocketState.CONNECTED:
      try:
        await websocket.close()
      except RuntimeError:
        pass

Start your ASGI server

uvicorn server:app --workers 3 --ws auto
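
To try it out, a minimal client might look like this. This is a sketch using the websockets package; it assumes uvicorn's default port 8000, and the payload shape matches the {"query": ...} JSON that chat_on_connect expects:

# pip install websockets
import asyncio
import json

import websockets


async def main():
  async with websockets.connect("ws://localhost:8000/chat") as ws:
    # chat_on_connect expects a JSON message with a "query" field
    await ws.send(json.dumps({"query": "What is the weather in Berlin?"}))
    # print whatever the agents stream back until the server closes the socket
    try:
      while True:
        print(await ws.recv())
    except websockets.ConnectionClosed:
      pass


asyncio.run(main())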

Building Farcaster Frame in Python using PyCaster

PyCaster Demo Frame Screenshot

Summary: This post guides you through building a Farcaster frame in Python. We’ve used the Meroku dApp Store Kit APIs to build a mini app store frame. You’ll also get some handy functions to help build your own frame in Python.

The frame is available at https://pycaster-demo.dappstore.app

Farcaster is currently buzzing with activity, especially since the introduction of frames a few weeks ago. This development is particularly thrilling for me, given my background. My career began at a social gaming development company, where we were among the pioneers in adopting the OpenSocial framework around 2010, allowing us to launch applications and games across various social networks. This effectively leveled the playing field, giving every network access to tools comparable to those available through Facebook’s Apps framework.

The introduction of Farcaster frames represents a similarly transformative opportunity, opening up endless possibilities for engagement within the ecosystem. However, most resources for creating frames are tailored towards the JavaScript ecosystem. As a Python developer, I found the learning curve somewhat steep, which motivated me to create a comprehensive guide once I successfully developed my own frame.

In this blog post, I aim to demystify the process and provide a roadmap for others like me. The topics covered include:

  • Understanding the Frames Interaction Flow
  • How to Build an App Store
  • Setting Up a Flask Application
  • Employing Advanced Techniques

This guide is designed to bridge the gap for Python developers looking to explore Farcaster frames, ensuring a smoother and more accessible entry into this exciting space.

Understanding Frames Interaction Flow

Frames as state machines

Consider frames as intricate state machines: they begin with an initial state, and users can navigate through the application via specific actions, such as clicking buttons or entering text. These actions trigger transitions to new states, with each state accessible only through predefined pathways. This mechanism ensures a controlled and intuitive user interaction flow within your application.

Frames as the frontend for your app

Frames serve as the dynamic frontend of your application. To facilitate this, you will need to implement a series of server-side operations that respond to POST requests, each time rendering a new “frame” based on user interactions. This setup allows for a responsive and engaging user experience, as your application dynamically adapts to user inputs and actions.

The following image from the Farcaster docs specifies the frame interaction flow.

Building an App Store

Creating Your Own App Store with Ease

Leveraging the Meroku Protocol’s Dapp Store Kit APIs, building an App Store is straightforward and efficient.

Functionality of Our App Store

Our App Store will feature a dynamic listing of Farcaster apps, displayed in a random sequence. This simplicity is intentional, focusing on the core functionality to ensure a seamless user experience. While the Meroku’s Dapp Store Kit APIs offer a broad spectrum of additional features—such as app screenshots, user ratings, rankings, and categorization—we’ll concentrate on the essentials for this guide. The exploration of these advanced functionalities is left as an exercise for the reader, allowing for creative expansion beyond the basics.

Implementation Steps

  1. Acquire a Meroku Developer API Key: Begin by completing this form to obtain your developer API key.
  2. Integrate with the Dapp Registry Search Endpoint: Utilize the /api/v1/dapp/search endpoint, specifying storeKey as farcaster. This request fetches a list of apps available on Farcaster, which your App Store will use to populate its listings.
  3. Display Random Apps: Ensure that each time the frame is loaded, a random Farcaster app is showcased to the user. This adds an element of discovery and variety to the user experience.
  4. Enhance with User Interaction: Respond to user actions by presenting a different app or enabling users to share their thoughts on the app they’ve explored. This interactive element enriches the engagement with your App Store.

The steps outlined provide a solid foundation for creating a minimalist yet functional App Store. By focusing on these key aspects, you can launch an App Store that not only showcases the diverse applications within Farcaster but also encourages exploration and user interaction.

Getting Farcaster Apps Registry

So we’ll write the building blocks for these actions in Python. This is easy: just a bunch of API calls.

import http.client
import json
import os


def get_apps():
  conn = http.client.HTTPSConnection("api.meroku.store")
  headers = {
    'Accept': "application/json",
    'apikey': os.getenv("MEROKU_API_KEY")
  }
  conn.request("GET", "/api/v1/dapp/search?storeKey=farcaster", headers=headers)
  res = conn.getresponse()
  if res.status != 200:
    return []
  data = res.read()
  data = json.loads(data)["data"]
  return data

Creating an image card for the app

Since Farcaster Frames only show images, you need to be able to take the app data and convert it into an image. This is the part where frameworks like Satori etc have an edge. I couldn’t find something highly recommended and had to build some custom functions for this part. If you’re aware of something better, do let me know.

Steps for this are:

  1. Choosing a background image
  2. Deciding which contents & images should be placed on the background image
  3. Deciding the positions, font size etc of the placements.

The majority of code for this lies at src/lib/image.py. The class ImageComponent can be used to define the elements on the image and their positioning. Using ImageComponent and generate_app_image you can generate any kind of image you want to.

from src.lib.image import ImageComponent, generate_app_image
def gen_image_for_dapp(_app):
  image_stack = []
  
  app_logo = ImageComponent(
    ImageComponent.EXTERNAL_IMAGE,
    position=(100, 100),
    external_img_url=_app['images']['logo'],
    display_type=ImageComponent.DISPLAY_TYPE_CIRCLE,
    circle_radius=60
  )
  image_stack.append(app_logo)
  
  app_name = ImageComponent(
    ImageComponent.TEXT,
    position=(120, 0),
    text=_app['name'],
    font_size=40,
    font_color=(128, 128, 128)
  )
  image_stack.append(app_name)
  
  description = ImageComponent(
    ImageComponent.TEXT,
    position=(0, 200),
    text=_app['description'],
    font_size=30,
    font_color=(0, 0, 0)
  )
  image_stack.append(description)
  
  img = generate_app_image(image_stack)
  
  return img

The Frames Backend (aka Flask app)

We will have the structure of a standard flask app as follows.
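
One plausible layout (matching the paths used later in this post; file names other than src/lib/image.py and src/templates/index.html are illustrative):

.
├── app.py              # Flask routes: index, frame image, action, redirect
├── src
│   ├── lib
│   │   └── image.py    # ImageComponent, generate_app_image
│   └── templates
│       └── index.html  # frame markup with the fc:frame meta tags
└── requirements.txt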

Now we can build the index frame (the zero state) with simple code as follows.

app.py

import random
from urllib.parse import quote

from flask import Flask, redirect, render_template, request, send_file, url_for

# get_apps() and gen_image_for_dapp() are the helpers defined earlier in this post

app_url = "pycaster-demo.dappstore.app"  # your public hostname
app = Flask(__name__, template_folder='src/templates')


@app.route('/')
def index():
  apps = get_apps()
  app_img_url = url_for('frame_image', app_id=apps[0]['dappId'])
  image_url = f"https://{ app_url }{ app_img_url }"
  post_url = f"https://{ app_url }{ url_for('action', app_id=apps[0]['dappId']) }"
  return render_template('index.html', image_url=image_url, post_url=post_url)


@app.route('/frame/image/<app_id>')
def frame_image(app_id):
  apps = get_apps()
  apps = filter(lambda x: x['dappId'] == app_id, apps)
  _app = next(apps, None)

  img = gen_image_for_dapp(_app)
  img.seek(0)
  return send_file(img, mimetype='image/png')

And your templates/index.html file looks like this:

<!DOCTYPE html>
<html lang="en">
  <head>
    <meta charset="UTF-8" />
    <meta name="viewport" content="width=device-width, initial-scale=1.0" />
    <meta property="og:title" content="FarCaster AppStore"/>
    <meta property="og:image" content="{{ image_url }}"/>
    <meta property="fc:frame" content="vNext"/>
    <meta property="fc:frame:image" content="{{ image_url }}"/>
    <meta property="fc:frame:post_url" content="{{ post_url }}"/>
    <meta property="fc:frame:button:1" content="Find another App"/>
    <meta property="fc:frame:button:2" content="Visit App"/>
    <meta property="fc:frame:button:2:action" content="post_redirect" />
    <meta property="fc:frame:button:3" content="Re-Cast"/>
    <meta property="fc:frame:button:3:action" content="post_redirect" />
    <title>FarCaster AppStore</title>
  </head>
  <body>
    <p>Hello Farcaster AppStore</p>
  </body>
</html>

Frames are rendered on the basis of their fc:frame tags. To learn more about the specs of these tags, see the Frames specification.

Here the first button picks another app, the second button visits the app (external redirect) and the third button re-casts this app (external redirect).

Both of these external redirects cause the Frames UI to send a POST request to the URL specified in the fc:frame:post_url meta property.

So now we will write a POST handler that takes these fields and acts accordingly.

But first, a word about casting

When we want to have a user cast from the frame, we will need to redirect them to a Composer Cast Intent URL. A notion doc is available at https://warpcast.notion.site/Cast-composer-intents-73f2c44884c6474ab53031abfb5f1be3 that describes this in depth.

A cast intent URL looks like https://warpcast.com/~/compose?text=Hello%20world!. When someone visits this URL, Warpcast opens the compose modal.

This experience works great on desktop. On mobile, however, a direct hit to this URL leads to the Warpcast download page, which then redirects to the App Store or Play Store, losing the desired cast intent. Hence it’s necessary to have an app redirect URL that redirects to Warpcast:

POST Action Controller -> Redirects to “Redirector” Page -> Redirects to Warpcast Intent URL

The app.py code for this is below. It’s important to pass the FULL URL to the redirect call, because the Frames spec specifies that the redirect must be a full URL.

@app.route('/redirect/<app_id>')
def redirect_url(app_id: str):
  apps = get_apps()
  apps = filter(lambda x: x['dappId'] == app_id, apps)
  _app = next(apps, None)
  link_url = f"https://explorer.meroku.org/dapp?id={ app_id }"
  cast_text = f"Check out {_app['name']} on Meroku!"
  cast_text = quote(cast_text)
  
  cast_intent_url = f"https://warpcast.com/~/compose?text={cast_text}&embeds[]={link_url}"
  
  return redirect(cast_intent_url, 302)

Now we are ready to write our POST action controller. It’s also simple:

@app.route('/action/<app_id>', methods=['POST'])
def action(app_id: str):
  data = request.get_json(silent=True)
  untrusted_data = data['untrustedData']
  buttonIndex = untrusted_data['buttonIndex']
  
  if buttonIndex == 1:
    apps = get_apps()
    idx = random.randint(0, len(apps) - 1)
    app_img_url = url_for('frame_image', app_id=apps[idx]['dappId'])
    image_url = f"https://{ app_url }{ app_img_url }"
    post_url = f"https://{ app_url }{ url_for('action', app_id=apps[idx]['dappId']) }"
    return render_template('index.html', image_url=image_url, post_url=post_url)
  
  elif buttonIndex == 2:
    user_id = f"fc_user:{ untrusted_data['fid'] }"
    redirect_url = f"https://api.meroku.store/api/v1/o/view/{ app_id }?userId={ user_id }"
    return redirect(redirect_url, 302)
  
  elif buttonIndex == 3:
    redirect_url = f"https://{ app_url }{ url_for('redirect_url', app_id=app_id) }"
    return redirect(redirect_url, 302)

A few important things to note in the above code:

  1. Since the same action controller will be called for each button in the frame, you must handle the behavior by switching on button index.
  2. Each redirect should be a complete URL. Without this, Frames won’t redirect your users properly.

That’s it. Congratulations. You now know everything you need to build a Farcaster frame in Python.

Advanced Techniques

  1. Caching: Since frames need a response within 5 seconds, cache whatever you can, especially the images. In this codebase you will find Redis used to cache the images and other data points related to users; depending on your frames, this can cut image generation time a lot (see the sketch after this list).
  2. Parallel API calls: Make API calls in parallel wherever possible, again to reduce your server's response time.
  3. Prefer a faster LLM over the latest one: This is again due to the 5-second hard timeout for frames. Choose the fastest model that does the job.
  4. Working with images can be tricky. Use the ImageComponent class and the utility functions to build images.
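
For example, caching the generated frame image in Redis is a small wrapper around gen_image_for_dapp. A minimal sketch using the redis-py client (the key naming and TTL are illustrative, and it assumes gen_image_for_dapp returns a BytesIO as above):

import redis

r = redis.Redis(host="localhost", port=6379)


def cached_app_image(app_id: str, _app) -> bytes:
  """Return the PNG bytes for an app card, generating them only on a cache miss."""
  key = f"frame:image:{app_id}"
  cached = r.get(key)
  if cached is not None:
    return cached
  img = gen_image_for_dapp(_app)  # BytesIO from the image helper above
  data = img.getvalue()
  r.set(key, data, ex=3600)  # keep it for an hour
  return data

In frame_image you would then wrap the returned bytes in a BytesIO before handing them to send_file.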

If you found this post helpful, cast about it. Click here to cast.

Code & Usage

The source code is available at https://github.com/merokudao/farcaster-appstore-frame

The repo can be used as boilerplate for your own projects. You can fork it and repurpose it for your use case. If you need any help, reach out to me on Twitter or Farcaster.

The Doors of Perception – Extracts

By its very nature every embodied spirit is doomed to suffer and enjoy in solitude. Sensations, feelings, insights, fancies – all these are private and, except through symbols and at second hand, incommunicable. We can pool information about experiences, but never the experiences themselves. From family to nation, every human group is a society of island universes.

Neither agreeable nor disagreeable, it just is.

The suggestion is that the function of the brain and nervous system and sense organs is in the main eliminative and not productive. Each person is at each moment capable of remembering all that has ever happened to him and of perceiving everything that is happening everywhere in the universe. The function of the brain and nervous system is to protect us from being overwhelmed and confused by this mass of largely useless and irrelevant knowledge, by shutting out most of what we should otherwise perceive or remember at any moment, and leaving only that very small and special selection which is likely to be practically useful. 

According to such a theory, each one of us is potentially Mind at Large. But in so far as we are animals, our business is at all costs to survive. To make biological survival possible, Mind at Large has to be funneled through the reducing valve of the brain and nervous system. What comes out at the other end is a measly trickle of the kind of consciousness which will help us to stay alive on the surface of this particular planet.

Most men and women lead lives at the worst so painful, at the best so monotonous, poor and limited that the urge to escape, the longing to transcend themselves if only for a few moments, is and has always been one of the principal appetites of the soul.

Solr gem and the XML ruby gem: why it matters

If you use the solr-ruby gem, you should be aware that it attempts to create an XML doc from the provided doc hash. It first tries to use ‘xml/libxml’ and, if that’s not available, falls back to REXML. Using libxml is recommended.

All you need to do is


gem install libxml-ruby

If you are lucky, that’s all you need. However, many times you will face issues like:


Building native extensions. This could take a while...
ERROR: Error installing libxml-ruby:
ERROR: Failed to build gem native extension.

/usr/bin/ruby extconf.rb
checking for socket() in -lsocket... no
checking for gethostbyname() in -lnsl... yes
checking for atan() in -lm... no
checking for atan() in -lm... yes
checking for inflate() in -lz... no
checking for inflate() in -lzlib... no
checking for inflate() in -lzlib1... no
checking for inflate() in -llibz... no
*** extconf.rb failed ***
Could not create Makefile due to some reason, probably lack of
necessary libraries and/or headers. Check the mkmf.log file for more
details. You may need configuration options.

If this is the case, you should install zlib-devel and libxml2-devel packages.


yum -y install zlib-devel
yum -y install libxml2-devel
gem install libxml-ruby

That sorts it all. You are good to go.

ruby fetch_hash ArgumentError: NULL pointer given

Recently, one of our servers started throwing the following error:

irb(main):001:0> require 'rubygems'
=> true
irb(main):002:0> require 'mysql'
=> true
irb(main):003:0> conn = Mysql.connect('db01', 'xxxxxx', 'xxxxxx', 'employee')
=> #
irb(main):004:0> a = conn.query("SELECT * FROM employees WHERE id >= 8500 AND id < 9000")
=> #
irb(main):005:0> a.fetch_hash
ArgumentError: NULL pointer given
    from (irb):5:in `fetch_hash'
	from (irb):5
irb(main):006:0> exit

The fetch_hash method is a standard method and normally works. Here are the versions of MySQL and Ruby that I used:

-bash-3.2$ mysql --version
mysql  Ver 14.12 Distrib 5.0.89, for unknown-linux-gnu (x86_64) using readline 5.1

-bash-3.2$ ruby --version
ruby 1.8.7 (2009-06-12 patchlevel 174) [x86_64-linux]

If you face this issue on your servers or anywhere else, try the following workaround. It has worked for me, and it should work for you as well.

  1. Uninstall the MySQL-shared-compat package
  2. Reinstall the mysql gem (see the commands below)
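
On the affected server that translated to something along these lines (adjust the package manager and package names to your distro):

yum -y remove MySQL-shared-compat
gem uninstall mysql
gem install mysql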