Pandas oddity

What's going on here?

In [91]:
import pandas as pd
In [104]:
from numpy import nan
from pandas import NaT, Timestamp  # Timestamp is used in the cells below
In [119]:
df = pd.DataFrame({
    'timestamp_utc': {6: Timestamp('2019-10-08 03:40:00+0000', tz='UTC'), 
                      7: Timestamp('2019-10-08 04:40:00+0000', tz='UTC')}, 
})
In [120]:
df
Out[120]:
timestamp_utc
6 2019-10-08 03:40:00+00:00
7 2019-10-08 04:40:00+00:00
In [124]:
df.loc[:, 'hour'] = df['timestamp_utc'].dt.floor(freq='1 H')
df
Out[124]:
timestamp_utc hour
6 2019-10-08 03:40:00+00:00 2019-10-08 03:00:00+00:00
7 2019-10-08 04:40:00+00:00 2019-10-08 04:00:00+00:00
In [125]:
df = pd.DataFrame({
    'timestamp_utc': {6: Timestamp('2019-10-08 03:40:00+0000', tz='UTC')}, 
})
In [126]:
df.loc[:, 'hour'] = df['timestamp_utc'].dt.floor(freq='1 H')
df
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-126-4f1ec8ec51e9> in <module>
----> 1 df.loc[:, 'hour'] = df['timestamp_utc'].dt.floor(freq='1 H')
      2 df

~/covid-19/venv/lib/python3.6/site-packages/pandas/core/indexing.py in __setitem__(self, key, value)
    669             key = com.apply_if_callable(key, self.obj)
    670         indexer = self._get_setitem_indexer(key)
--> 671         self._setitem_with_indexer(indexer, value)
    672 
    673     def _validate_key(self, key, axis: int):

~/covid-19/venv/lib/python3.6/site-packages/pandas/core/indexing.py in _setitem_with_indexer(self, indexer, value)
    848                             indexer, self.obj.axes
    849                         )
--> 850                         self._setitem_with_indexer(new_indexer, value)
    851 
    852                         return self.obj

~/covid-19/venv/lib/python3.6/site-packages/pandas/core/indexing.py in _setitem_with_indexer(self, indexer, value)
   1010                     labels, value, plane_indexer, lplane_indexer, self.obj
   1011                 ):
-> 1012                     setter(labels[0], value)
   1013 
   1014                 # per label values

~/covid-19/venv/lib/python3.6/site-packages/pandas/core/indexing.py in setter(item, v)
    960                     s._consolidate_inplace()
    961                     s = s.copy()
--> 962                     s._data = s._data.setitem(indexer=pi, value=v)
    963                     s._maybe_update_cacher(clear=True)
    964 

~/covid-19/venv/lib/python3.6/site-packages/pandas/core/internals/managers.py in setitem(self, **kwargs)
    559 
    560     def setitem(self, **kwargs):
--> 561         return self.apply("setitem", **kwargs)
    562 
    563     def putmask(self, **kwargs):

~/covid-19/venv/lib/python3.6/site-packages/pandas/core/internals/managers.py in apply(self, f, filter, **kwargs)
    440                 applied = b.apply(f, **kwargs)
    441             else:
--> 442                 applied = getattr(b, f)(**kwargs)
    443             result_blocks = _extend_blocks(applied, result_blocks)
    444 

~/covid-19/venv/lib/python3.6/site-packages/pandas/core/internals/blocks.py in setitem(self, indexer, value)
    856                 if not is_dtype_equal(self.dtype, dtype):
    857                     b = self.astype(dtype)
--> 858                     return b.setitem(indexer, value)
    859 
    860         # value must be storeable at this moment

~/covid-19/venv/lib/python3.6/site-packages/pandas/core/internals/blocks.py in setitem(self, indexer, value)
    906 
    907             try:
--> 908                 values = values.astype(arr_value.dtype)
    909             except ValueError:
    910                 pass

TypeError: data type not understood
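
The traceback bottoms out in values.astype(arr_value.dtype): a plain numpy array is being asked to cast to the pandas-only datetime64[ns, UTC] dtype, which numpy does not understand. A workaround that appears to side-step the failing .loc setitem path is plain column assignment (a sketch, not a root-cause fix):

# Direct assignment creates a fresh tz-aware column instead of setting
# values into an existing block, so the failing astype call never runs.
df['hour'] = df['timestamp_utc'].dt.floor(freq='1 H')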

A Python decorator puzzle

In [14]:
import pandas as pd

I recently wrote an ETL job to be run in a data processing pipeline. The job fetches data from four database tables in our data lake, stores each in a pandas DataFrame, and outputs a single combined DataFrame. In (simplified) pseudocode:

In [44]:
def fetch_data_x(t_start: pd.Timestamp, t_end: pd.Timestamp):
    print(f"Fetching data X between {t_start} and {t_end}")
    return t_start, t_end
    
def fetch_data_y(t_start: pd.Timestamp, t_end: pd.Timestamp):
    print(f"Fetching data Y between {t_start} and {t_end}")
    return t_start, t_end

Since the fetching operations are the most time-consuming part of the pipeline, I wanted to cache the fetched results, so I wrote a decorator.

What's wrong with this piece of code?

In [45]:
from pathlib import Path
import pickle
def cache_result(cache_dir='./tmp'):
    def decorator(func):
        def wrapped(t_start, t_end):
            cache_dir = Path(cache_dir)
            function_details = f"{func.__code__.co_name},{t_start.isoformat()},{t_end.isoformat}"
            cache_filepath = cache_dir.joinpath(function_details)
            
            try:
                print(f"Reading from cache: {cache_filepath}")
                with open(cache_filepath, 'rb') as f:
                    res = pickle.load(f)
            except:
                res = func(t_start, t_end)
                print(f"Writing to cache: {cache_filepath}")
                with open(cache_filepath, 'wb') as f:
                    pickle.dump(res, f)
            return res
        return wrapped
    return decorator
In [46]:
@cache_result()
def fetch_data_x(t_start: pd.Timestamp, t_end: pd.Timestamp):
    print(f"Fetching data X between {t_start} and {t_end}")
    return t_start, t_end
    
@cache_result()
def fetch_data_y(t_start: pd.Timestamp, t_end: pd.Timestamp):
    print(f"Fetching data Y between {t_start} and {t_end}")
    return t_start, t_end
    
In [47]:
fetch_data_x(pd.Timestamp('2020-01-01'), pd.Timestamp('2020-02-02'))
---------------------------------------------------------------------------
UnboundLocalError                         Traceback (most recent call last)
<ipython-input-47-9ccb94c9170a> in <module>
----> 1 fetch_data_x(pd.Timestamp('2020-01-01'), pd.Timestamp('2020-02-02'))

<ipython-input-45-d206a3d6af1a> in wrapped(t_start, t_end)
      4     def decorator(func):
      5         def wrapped(t_start, t_end):
----> 6             cache_dir = Path(cache_dir)
      7             function_details = f"{func.__code__.co_name},{t_start.isoformat()},{t_end.isoformat}"
      8             cache_filepath = cache_dir.joinpath(function_details)

UnboundLocalError: local variable 'cache_dir' referenced before assignment

What is going on?

This took me quite a while to figure out. I read through a couple of in-depth descriptions of how to write decorators, but was none the wiser.

In the end, I figured out that this UnboundLocalError has less to do with decorators than with Python's scoping rules. The rule is: if a name is assigned anywhere inside a function, Python treats that name as local throughout the function. We are therefore allowed to reference a variable defined in an outer scope from an inner scope, but the moment we also assign to that name, every reference to it becomes a reference to the (possibly not-yet-assigned) local variable. More details follow:

Referencing a variable defined in an outer scope: OK

In [51]:
foo = "Hello "

def hello(x):
    return foo+x
hello("world")
Out[51]:
'Hello world'

Assigning to a variable already defined in an outer scope: also fine. A new local variable is created; the variable in the outer scope is not modified.

In [56]:
foo = "Hello "

def hello(x):
    foo = "Hi! "
    return foo+x
hello("world")

hello("world")
Out[56]:
'Hi! world'
In [55]:
print(f"foo is: {foo}")
foo is: Hello 

All hell breaks loose if you do both, i.e. reference foo and, in the same function, assign to it:

In [57]:
foo = "Hello "

def hello(x):
    if foo == "Hello":
        foo = "Hi"
    return foo+x

hello("world")
---------------------------------------------------------------------------
UnboundLocalError                         Traceback (most recent call last)
<ipython-input-57-5b276360a2fe> in <module>
      6     return foo+x
      7 
----> 8 hello("world")

<ipython-input-57-5b276360a2fe> in hello(x)
      2 
      3 def hello(x):
----> 4     if foo == "Hello":
      5         foo = "Hi"
      6     return foo+x

UnboundLocalError: local variable 'foo' referenced before assignment
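
If you genuinely want to rebind the outer name, Python provides explicit escape hatches: the global statement for module-level names and nonlocal for names in an enclosing function (so nonlocal cache_dir inside wrapped would also have silenced the error above). A minimal sketch of the global case:

foo = "Hello"

def hello(x):
    # Tell the compiler that foo refers to the module-level name,
    # so assigning to it does not create a new local variable.
    global foo
    if foo == "Hello":
        foo = "Hi"
    return foo + x

hello(" world")  # returns 'Hi world'; the module-level foo is now 'Hi'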

The fix I actually went with is simpler: give the local variable a different name (cache_dir_path), so it no longer shadows the closure variable. Here is the fixed decorator:

In [58]:
from functools import wraps

def cache_result(cache_dir='./tmp'):
    def decorator(func):
        @wraps(func)  # preserve the wrapped function's name and docstring
        def wrapped(t_start, t_end):
            # A differently-named local no longer shadows the closure variable.
            cache_dir_path = Path(cache_dir)
            function_details = f"{func.__code__.co_name}_{t_start.isoformat()}_{t_end.isoformat()}"
            cache_filepath = cache_dir_path.joinpath(function_details)
            cache_dir_path.mkdir(parents=True, exist_ok=True)
            try:
                print(f"Reading from cache: {cache_filepath}")
                with open(cache_filepath, 'rb') as f:
                    res = pickle.load(f)
            except Exception:  # cache miss, or an unreadable cache entry
                print(f"Failed to read from cache: {cache_filepath}")
                res = func(t_start, t_end)
                print(f"Writing to cache: {cache_filepath}")
                with open(cache_filepath, 'wb') as f:
                    pickle.dump(res, f)
            return res
        return wrapped
    return decorator

@cache_result()
def fetch_data_x(t_start: pd.Timestamp, t_end: pd.Timestamp):
    print(f"Fetching data X between {t_start} and {t_end}")
    return t_start, t_end
    
@cache_result()
def fetch_data_y(t_start: pd.Timestamp, t_end: pd.Timestamp):
    print(f"Fetching data Y between {t_start} and {t_end}")
    return t_start, t_end
    
In [49]:
fetch_data_x(pd.Timestamp('2020-01-02'), pd.Timestamp('2020-02-02'))
Reading from cache: tmp/fetch_data_x_2020-01-02T00:00:00_2020-02-02T00:00:00
Out[49]:
(Timestamp('2020-01-02 00:00:00'), Timestamp('2020-02-02 00:00:00'))

Python's stdout buffering and systemd

Running a Python script as a systemd service

This is quite simple: first, we need to create a systemd unit file at /etc/systemd/system/foo.service:

[Unit]
Description=Abracadabra
After=network.target

[Service]
Type=simple
User=bob
Restart=on-failure
RestartSec=3
ExecStart=/usr/bin/python3 /home/bob/test_daemon.py
[Install]
WantedBy=multi-user.target

Then, we can enable and start the service with:

sudo systemctl enable foo.service
sudo systemctl start foo

What happens to stdout and stderr?

They end up in the systemd journal and can be inspected with journalctl -u foo. Assuming test_daemon.py is:

from datetime import datetime
from time import sleep

print("Started abracadabra")

while True:
    curtime = datetime.now().strftime('%X')
    print("Hi! Current time is: ", curtime)
    sleep(2)

we should see

$ sudo journalctl -u foo
Okt 09 21:07:18 etna python3[13499]: Started abracadabra
Okt 09 21:07:18 etna python3[13499]: Hi! Current time is:  21:07:18
Okt 09 21:07:20 etna python3[13499]: Hi! Current time is:  21:07:20
Okt 09 21:07:22 etna python3[13499]: Hi! Current time is:  21:07:22
Okt 09 21:07:24 etna python3[13499]: Hi! Current time is:  21:07:24
Okt 09 21:07:26 etna python3[13499]: Hi! Current time is:  21:07:26
Okt 09 21:07:28 etna python3[13499]: Hi! Current time is:  21:07:28
Okt 09 21:07:30 etna python3[13499]: Hi! Current time is:  21:07:30

Redirecting stdout/stderr to a file

We can ask Systemd to do this by adding these lines in /etc/systemd/system/foo.service

[Service]
---snip---
ExecStart=/usr/bin/python3 /home/bob/test_daemon.py
StandardOutput=file:/var/log/foo/stdout
StandardError=file:/var/log/foo/stderr
---snip---

But nothing gets written to /var/log/foo/stdout!

This is because Python block-buffers stdout when it is not attached to a terminal. If we want to see our daemon's output as soon as it is printed, we need to disable this buffering, which can be done by starting Python with the -u flag. The systemd unit file thus changes to:

[Unit]
Description=Abracadabra
After=network.target

[Service]
Type=simple
User=bob
Restart=on-failure
RestartSec=3
ExecStart=/usr/bin/python3 -u /home/bob/test_daemon.py
StandardOutput=file:/var/log/foo/stdout
StandardError=file:/var/log/foo/stderr
[Install]
WantedBy=multi-user.target

That's it! Now /var/log/foo/stdout should be populated with whatever test_daemon.py prints.
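
An equivalent approach is to leave the command line alone and disable buffering via Python's standard PYTHONUNBUFFERED environment variable, set through systemd's Environment= directive:

[Service]
---snip---
Environment=PYTHONUNBUFFERED=1
ExecStart=/usr/bin/python3 /home/bob/test_daemon.py
---snip---

Either way, remember to run sudo systemctl daemon-reload and restart the service after editing the unit file.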

Logging in Python

Python has a very powerful logging system. Adding logging to a Python program is as simple as:

In [1]:
import logging
logger = logging.getLogger(__name__)  # note: the variable __name__, not the string '__name__'

def function_that_logs(x, y):
    logger.info(f"I was called with x={x}, y={y}")

However, calling function_that_logs won't print the logged message anywhere yet:

In [4]:
function_that_logs(12, 13)

This is because a freshly created logger has no handlers attached, and the default level (WARNING) filters out our INFO message. If logging were fully on by default, applications would be inundated with messages from every library they use. A few steps are needed before our logged message gets anywhere.

In [5]:
# Set the logger level. DEBUG is the lowest level, so nothing is filtered out.
logger.setLevel(logging.DEBUG)
# Create a handler for our logger. StreamHandler prints to stderr by default.
handler = logging.StreamHandler()
# The handler has its own level, which must be set too.
handler.setLevel(logging.DEBUG)
# Finally, attach the handler to the logger.
logger.addHandler(handler)
In [7]:
function_that_logs(12, 13)
I was called with x=12, y=13

Finally, our logged message appears. A logger can have multiple handlers attached, and the logging.handlers module ships many kinds, e.g. HTTPHandler (sends logs to a web server) and SMTPHandler (sends logs via email).
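
For instance, to fan the same messages out to both stderr and a file, we can attach a second handler (a sketch; 'app.log' is just an arbitrary filename):

file_handler = logging.FileHandler('app.log')  # appends to app.log
file_handler.setLevel(logging.INFO)
logger.addHandler(file_handler)

function_that_logs(12, 13)  # now printed to stderr and written to app.log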

Custom formatters

We might sometimes want additional information to be logged along with the message, for example the time the log record was created. This can be achieved by setting the formatter of our log handler:

In [31]:
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(funcName)s - %(message)s')
handler.setFormatter(formatter)
logger.handlers = []        # drop the previously attached handlers
logger.addHandler(handler)  # re-attach just this one, now with a formatter

Where do properties like asctime or funcName come from? They are attributes of LogRecord objects. On each logging call, a LogRecord is created, carrying a lot of information about the call site, including the calling code's module name, file path and line number. For full control over the output, we can also subclass logging.Formatter:

In [12]:
class CustomFormatter(logging.Formatter):
    def format(self, record):
        # Ignore the record entirely and emit a constant string.
        return "Hello"
In [13]:
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
handler.setLevel(logging.DEBUG)
formatter = CustomFormatter()
handler.setFormatter(formatter)
logger.addHandler(handler)
In [14]:
function_that_logs(12, 13)
I was called with x=12, y=13
Hello

Both lines appear because the logger now has two handlers: the original one prints the plain message, while the newly added one formats every record as "Hello".
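
A slightly more useful custom formatter would actually read the record's attributes, for example (an illustrative sketch, not from the original post):

class LevelTagFormatter(logging.Formatter):
    # Prefix each message with its level name, pulled off the LogRecord.
    def format(self, record):
        return f"[{record.levelname}] {record.getMessage()}"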

Creating this website

I wanted a personal homepage/blog. I very often jot down my thoughts in Jupyter notebooks or Markdown documents, so I looked for a tool that can create a website out of documents in these two formats. Since I love CI/CD pipelines and wanted my future site to be deployed automatically to GitLab Pages on each commit, I decided to use a static site generator (SSG). There are quite a few good SSGs: Pelican, Jekyll and Hugo, to name a few.

I settled on Nikola because I liked its documentation, its out-of-the-box support for Jupyter notebooks, and the ease of configuring it via a Python file.

I roughly followed the guide by Jaakko Luttinen. The steps I took were:

  • Installed Nikola in a virtualenv.
  • Ran nikola init to initialize the website.
  • Made Jupyter notebooks the default format for posts in conf.py (see the sketch after this list).
  • Enabled MathJax rendering of $...$ delimited text.
  • Chose a theme by playing around with nikola subtheme.
  • Added a CC-BY-SA license.
  • Configured GitLab CI so that on each commit a fresh copy of the site is deployed to GitLab Pages.
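
For reference, making notebooks the default post format amounts to putting the .ipynb glob first in conf.py's POSTS tuple, roughly like this (a sketch from memory of Nikola's configuration format, not my exact conf.py):

# In conf.py: each entry is (wildcard, destination, template);
# the first entry's extension becomes the default for 'nikola new_post'.
POSTS = (
    ("posts/*.ipynb", "posts", "post.tmpl"),
    ("posts/*.md", "posts", "post.tmpl"),
    ("posts/*.rst", "posts", "post.tmpl"),
)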